1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
//! Single-target dispatch hot path — extracted from [`crate::exec`] to
//! keep that file under the 500-LOC house rule.
//!
//! [`Shard::start_single`] is the entry point for the vast majority of
//! commands (any `Route::Local` or `Route::Single` — GET/SET/INCR/PING/
//! the lot). It first tries [`Shard::try_inline_local`], the in-order
//! reply-straight-to-conn fast path, and only falls back to the
//! pending-slot machinery when that can't fire (out-of-order seq, a
//! cross-shard hop, or a block-and-park).
use crate::Commands;
use crate::message::{Agg, DispatchMeta};
use crate::shard::Shard;
use kevy_resp::{ArgvView, RespVersion};
use std::time::Instant;
impl<C: Commands> Shard<C> {
    /// Single-target command (keyless `Local` or single-key `Single`) — the
    /// overwhelming majority (GET/SET/INCR/PING/…). Skips the
    /// `Vec<(shard, Op)>` allocation + the aggregation fold loop entirely.
    ///
    /// `meta` carries the origin resolve()'s write-side facts so no later
    /// stage (local post-write or the owning shard) re-parses the verb.
    /// `proto` rides per-cmd from `handle_command`'s single conns probe so
    /// a V2 + V3 mix on the same owning shard each gets the right reply
    /// shape.
    ///
    /// Dispatch order:
    /// 1. `shard == self.id` and [`Shard::try_inline_local`] accepts →
    ///    reply written straight into the conn's output, done.
    /// 2. Otherwise a pending slot is pushed for `conn_id`, then:
    ///    - local: dispatch off the borrowed argv and fold the part at `seq`;
    ///    - remote: queue an owned (pool-recycled) argv in
    ///      `request_batch[shard]` for the owning shard.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn start_single<A: ArgvView + ?Sized>(
        &mut self,
        conn_id: u64,
        seq: u64,
        proto: RespVersion,
        args: &A,
        shard: usize,
        is_quit: bool,
        block_hint: crate::BlockHint,
        meta: DispatchMeta,
    ) {
        // In-order local fast path: `seq == next_emit` and no prior cmd is
        // pending, so write straight to the conn's output and return.
        if shard == self.id
            && self.try_inline_local(conn_id, args, is_quit, proto, block_hint, meta)
        {
            return;
        }
        self.push_pending_slot(conn_id, 1, Agg::First(None), is_quit);
        if shard == self.id {
            // Local-but-not-fast-path (e.g. a prior cmd is still pending):
            // dispatch straight off the borrowed argv — no owned
            // materialise needed.
            let part = self.run_dispatch(args, proto, meta);
            self.fold(conn_id, seq, part);
        } else {
            // Cross-shard forward: materialise owned at the handoff —
            // into a pool-recycled Argv, so the steady state mallocs
            // nothing. The -c50 single-shard hot path never reaches here.
            let argv = self.argv_pool.take_filled(args);
            self.request_batch[shard].push((conn_id, seq, argv, proto, meta));
        }
    }

    /// Try to dispatch a single-shard local command straight to the
    /// connection's output buffer (no PendingSlot, no fold, no reply Vec).
    /// Only valid when `seq == next_emit`, i.e. nothing is pending.
    ///
    /// Returns `true` iff the command was handled here — either its reply
    /// was written inline, or it wrote nothing and was parked as a blocking
    /// command. Returns `false` (with nothing dispatched) when the conn is
    /// unknown or has pending replies; the caller then takes a fallback path.
    ///
    /// One conns probe covers the whole inline path — pending check,
    /// dispatch into `conn.output` (disjoint field borrows: commands /
    /// store / conn), wrote-reply check, and the next_emit/closing
    /// bookkeeping.
    #[inline]
    pub(crate) fn try_inline_local<A: ArgvView + ?Sized>(
        &mut self,
        conn_id: u64,
        args: &A,
        is_quit: bool,
        proto: RespVersion,
        block_hint: crate::BlockHint,
        meta: DispatchMeta,
    ) -> bool {
        let Some(conn) = self.conns.get_mut(&conn_id) else {
            return false;
        };
        if !conn.pending.is_empty() {
            return false;
        }
        // SLOWLOG OFF (`slower_than_micros < 0`) skips the clock pair. Taken
        // only once the inline path is committed, so the bail-outs above cost
        // no clock read. `self.slowlog` is disjoint from the `conn` borrow.
        let t0 = (self.slowlog.slower_than_micros >= 0).then(Instant::now);
        let out_pre_len = conn.output.len();
        // V2 is the default + the hot path; V3 only fires after HELLO 3.
        match proto {
            RespVersion::V2 => self
                .commands
                .dispatch_into(&mut self.store, args, &mut conn.output),
            RespVersion::V3 => self
                .commands
                .dispatch_into_resp3(&mut self.store, args, &mut conn.output),
        }
        let wrote_reply = conn.output.len() > out_pre_len;
        // Park-on-miss for BLPOP / BRPOP / XREAD BLOCK that wrote nothing:
        // the reply is deferred to the wake / timeout path, so `next_emit`
        // is not advanced here.
        if !wrote_reply
            && let crate::BlockHint::Block { kind, keys, timeout_ms } = block_hint
        {
            self.slowlog_maybe(t0, args);
            self.park_dispatch(conn_id, args, kind, keys, timeout_ms, proto);
            return true;
        }
        conn.next_emit += 1;
        if is_quit {
            conn.closing = true;
        }
        self.slowlog_maybe(t0, args);
        if meta.is_write {
            self.post_write_housekeeping(args, meta);
        }
        true
    }

    /// Record `args` in the SLOWLOG if a start instant was captured
    /// (`None` means SLOWLOG is off — a no-op).
    fn slowlog_maybe<A: ArgvView + ?Sized>(&mut self, t0: Option<Instant>, args: &A) {
        if let Some(t0) = t0 {
            // `as_micros()` is u128; saturate rather than truncate.
            let elapsed = u64::try_from(t0.elapsed().as_micros()).unwrap_or(u64::MAX);
            self.slowlog_record(args, elapsed);
        }
    }

    /// Park a blocking command whose `dispatch_into` produced no reply.
    /// `timeout_ms == 0` means "block forever" (deadline `u64::MAX`);
    /// otherwise the deadline is now + `timeout_ms`, saturating.
    ///
    /// Picks the strategy from the watched keys:
    /// - **single key on this shard** → the in-shard fast path
    ///   ([`crate::blocked::BlockedClients`]): freeze the replay argv (via
    ///   `block_serve_argv` then `resolve_block_argv` for `$`), register
    ///   the waiter locally — no cross-core hop — and set `Conn.blocked`.
    /// - **single remote key, or any multi-key form** → the cross-shard
    ///   arbiter ([`crate::block_xshard`]): the conn parks here (its origin
    ///   shard) and watch registrations fan out to each key's owning shard.
    ///   NOTE(review): `Conn.blocked` is presumably set inside
    ///   `park_blocked_xshard`; that is not visible in this file.
    fn park_dispatch<A: ArgvView + ?Sized>(
        &mut self,
        conn_id: u64,
        args: &A,
        kind: crate::BlockKind,
        keys: Vec<Vec<u8>>,
        timeout_ms: u64,
        proto: RespVersion,
    ) {
        let deadline_ms = if timeout_ms == 0 {
            u64::MAX
        } else {
            crate::blocked::unix_now_ms().saturating_add(timeout_ms)
        };
        if keys.len() == 1 && self.shard_of(&keys[0]) == self.id {
            // In-shard fast path: narrow to the one key + freeze `$`.
            let serve = self.commands.block_serve_argv(args, kind, &keys[0]);
            let serve = self.commands.resolve_block_argv(&mut self.store, &serve, kind);
            self.blocked.add(
                conn_id,
                std::slice::from_ref(&keys[0]),
                kind,
                deadline_ms,
                serve,
                proto,
            );
            if let Some(conn) = self.conns.get_mut(&conn_id) {
                conn.blocked = true;
            }
        } else {
            let entries =
                crate::block_xshard::build_serve_entries(&self.commands, args, kind, &keys);
            self.park_blocked_xshard(conn_id, kind, entries, deadline_ms, proto);
        }
    }

    /// Post-`dispatch_into` work for a write — runs after the inline fast
    /// path here and after [`Shard::run_dispatch`] (the local fallback +
    /// forwarded paths): WATCH version bump, AOF append, keyspace notify,
    /// and BLOCK reactor wake on the written key. Each step is cheap when
    /// its feature is unused on this shard.
    pub(crate) fn post_write_housekeeping<A: ArgvView + ?Sized>(
        &mut self,
        args: &A,
        meta: DispatchMeta,
    ) {
        // The WATCH bump uses `meta.key_idx` straight from the origin
        // resolve() — no verb re-parse. An out-of-range index is ignored
        // rather than panicking. `bump_if_watched` is an empty-map lookup
        // when nothing is WATCH-ed; `maybe_notify_dispatch` is an
        // empty-flags check when notify_keyspace_events is off.
        if let Some(idx) = meta.key_idx
            && (idx as usize) < args.len()
        {
            self.store.bump_if_watched(&args[idx as usize]);
        }
        if self.aof.is_some() {
            self.log_write(args);
        }
        self.maybe_notify_dispatch(args);
        // BLOCK wake: if this write targets a key a waiter is parked on,
        // wake it. Gated on `wake_idx` (None for non-wake writes). The key
        // is borrowed straight from `args` — which does not alias `self` —
        // so a wake-eligible write no longer pays a per-write key copy.
        if let Some(idx) = meta.wake_idx
            && let Some(key) = args.get(idx as usize)
        {
            self.wake_key(key);
        }
    }

    /// Wake both block registries for a write that landed on `key`: the
    /// in-shard fast path ([`crate::blocked::BlockedClients`]) and the
    /// cross-shard arbiter ([`crate::block_xshard`]). Each is guarded by an
    /// `is_empty()` short-circuit, so with no waiters the cost is two
    /// branches. Called from the local write path here and from the
    /// cross-shard forwarded write path in [`crate::exec_op`].
    pub(crate) fn wake_key(&mut self, key: &[u8]) {
        if !self.blocked.is_empty() {
            self.wake_blocked_on_key(key);
        }
        if !self.xwaiters.is_empty() && self.xwaiters.is_watched(key) {
            self.target_wake_xshard(key);
        }
    }
}