1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
//! Pub/sub fast-path methods on `Shard`. `SUBSCRIBE` / `UNSUBSCRIBE` /
//! `PUBLISH` and the per-loop publish-batch flush. Split out so [`crate::exec`]
//! stays under the 500-LOC house rule; everything is still on the same
//! `impl<C: Commands> Shard<C>`.
use crate::Commands;
use crate::message::{Agg, Inbound, Part, PendingSlot};
use crate::reduce::pubsub_message;
use crate::shard::Shard;
use kevy_resp::{ArgvView, encode_array_len, encode_bulk, encode_integer, encode_null_bulk};
impl<C: Commands> Shard<C> {
pub(crate) fn do_subscribe<A: ArgvView + ?Sized>(
&mut self,
conn_id: u64,
seq: u64,
args: &A,
subscribe: bool,
) {
let verb: &[u8] = if subscribe {
b"subscribe"
} else {
b"unsubscribe"
};
// Channels: the explicit args, or (UNSUBSCRIBE with none) all current subs.
let channels: Vec<Vec<u8>> = match self.conns.get(&conn_id) {
None => return,
Some(_) if args.len() > 1 => (1..args.len()).map(|i| args[i].to_vec()).collect(),
Some(c) => c.sub.iter().cloned().collect(),
};
// Track which channels actually changed (sub/unsub is idempotent) so the
// shared registry count stays exact.
let mut changed: Vec<Vec<u8>> = Vec::new();
let reply = {
let Some(c) = self.conns.get_mut(&conn_id) else {
return;
};
let mut out = Vec::new();
if channels.is_empty() {
encode_array_len(&mut out, 3);
encode_bulk(&mut out, verb);
encode_null_bulk(&mut out);
encode_integer(&mut out, c.sub.len() as i64);
}
for ch in &channels {
let did = if subscribe {
c.sub.insert(ch.clone())
} else {
c.sub.remove(ch)
};
if did {
changed.push(ch.clone());
}
encode_array_len(&mut out, 3);
encode_bulk(&mut out, verb);
encode_bulk(&mut out, ch);
encode_integer(&mut out, c.sub.len() as i64);
}
out
};
// Reflect the change in the shared registry (PUBLISH reads it).
if !changed.is_empty() {
let bit = 1u64 << self.id;
let mut reg = self.pubsub.write().expect("pubsub registry");
for ch in &changed {
if subscribe {
let e = reg.entry(ch.clone()).or_insert((0, 0));
e.0 += 1;
e.1 |= bit;
} else {
let drop = match reg.get_mut(ch) {
Some(e) => {
e.0 = e.0.saturating_sub(1);
e.0 == 0
}
None => false,
};
if drop {
reg.remove(ch);
}
}
}
}
if let Some(c) = self.conns.get_mut(&conn_id) {
c.pending.push_back(PendingSlot {
remaining: 1,
agg: Agg::First(None),
done: None,
});
}
self.fold(conn_id, seq, Part::Reply(reply));
}
/// PUBLISH: reply with the receiver count read **locally** from the shared
/// registry (no cross-shard aggregation), then deliver the message
/// fire-and-forget to exactly the shards that hold a subscriber (in
/// parallel; no replies fold back). Replaces the old all-shards SumInt
/// fan-out, which cost ~2N cross-core ops per publish (N sends + N replies).
pub(crate) fn do_publish<A: ArgvView + ?Sized>(
&mut self,
conn_id: u64,
seq: u64,
args: &A,
) {
let (count, bits) = self
.pubsub
.read()
.expect("pubsub registry")
.get(&args[1])
.copied()
.unwrap_or((0, 0));
let mut reply = Vec::new();
encode_integer(&mut reply, count as i64);
if let Some(c) = self.conns.get_mut(&conn_id) {
c.pending.push_back(PendingSlot {
remaining: 1,
agg: Agg::First(None),
done: None,
});
}
self.fold(conn_id, seq, Part::Reply(reply));
if bits != 0 {
// Share one payload across all target shards (Arc, no per-target byte
// clone). Deliver locally inline; queue remote shards into per-target
// batches flushed once per drain (see `flush_publish`).
let m = std::sync::Arc::new((args[1].to_vec(), args[2].to_vec()));
for s in 0..self.nshards {
if bits & (1u64 << s) == 0 {
continue;
}
if s == self.id {
self.deliver_publish(&m.0, &m.1);
} else {
self.publish_batch[s].push(m.clone());
}
}
}
}
/// Append a pub/sub message to every local subscriber of `channel`; returns
/// the count delivered and marks them dirty for the reactor to flush.
pub(crate) fn deliver_publish(&mut self, channel: &[u8], msg: &[u8]) -> usize {
let ids: Vec<u64> = self
.conns
.iter()
.filter(|(_, c)| c.sub.contains(channel))
.map(|(id, _)| *id)
.collect();
if ids.is_empty() {
return 0;
}
let message = pubsub_message(channel, msg);
for id in &ids {
if let Some(c) = self.conns.get_mut(id) {
c.output.extend_from_slice(&message);
}
}
self.dirty.extend_from_slice(&ids);
ids.len()
}
/// Flush each shard's accumulated pub/sub batch as one cross-core message —
/// a flood of PUBLISHes costs one send per target shard per drain, not one
/// per message. Call once per reactor loop iteration.
#[inline]
pub(crate) fn flush_publish(&mut self) {
// Outer-empty short-circuit: the common hot path has no pub/sub.
if self.publish_batch.iter().all(|b| b.is_empty()) {
return;
}
for s in 0..self.nshards {
if s == self.id || self.publish_batch[s].is_empty() {
continue;
}
let batch = std::mem::take(&mut self.publish_batch[s]);
self.send_to(s, Inbound::DeliverPublish(batch));
}
}
}