1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
//! [`Route`] — how each command maps onto shards. Returned by
//! [`crate::Commands::route`] / carried in [`crate::ResolvedCmd`]; the
//! runtime's `start_command` matches on it to pick a dispatch shape.
use crate::exec_slowlog::SlowlogSub;
/// Dispatch shape of a command: how it maps onto shards.
///
/// Variant order is kept stable on purpose; the derived `Debug` output and
/// any exhaustive `match` elsewhere follow the declaration below.
#[derive(Debug)]
pub enum Route {
    /// No key involved; runs on the connection's own shard (e.g. PING).
    Local,
    /// Exactly one key, located at `args[idx]`; routing follows that key.
    Single(usize),
    /// Every entry of `args[1..]` is a key: each one is deleted on its
    /// shard and the counts are summed.
    DelKeys,
    /// Every entry of `args[1..]` is a key: existing keys are counted
    /// across shards.
    ExistsKeys,
    /// Adds up the key count of every shard.
    Dbsize,
    /// Flushes every shard.
    Flush,
    /// `SAVE` — synchronous snapshot of every shard's store to disk;
    /// blocks until durable (the Redis contract for the explicit form).
    Save,
    /// `BGSAVE` — a COW view is collected per shard and persisted in the
    /// background; the command returns once the views are frozen.
    BgSave,
    /// `BGREWRITEAOF` — every shard's AOF is rebuilt from in-memory state.
    /// Synchronous in v1.0 (each shard blocks for its own rewrite duration).
    RewriteAof,
    /// `MSET` — `args[1..]` holds key/value pairs, each routed to its
    /// key's shard.
    MSet,
    /// `MGET` — `args[1..]` holds keys; values are gathered in request
    /// order.
    MGet,
    /// `SINTER` — `args[1..]` are set keys.
    SInter,
    /// `SUNION` — `args[1..]` are set keys.
    SUnion,
    /// `SDIFF` — `args[1..]` are set keys.
    SDiff,
    /// `KEYS pattern` — each shard returns the keys it holds that match.
    ///
    /// NOTE(review): the payload is presumably the pattern bytes; what
    /// `None` stands for is not visible from this file — confirm at the
    /// construction site.
    Keys(Option<Vec<u8>>),
    /// `SCAN` (cursor-0 approximation) — same fan-out as KEYS, but the
    /// reply is shaped `[cursor, keys]`.
    ///
    /// NOTE(review): payload presumably mirrors [`Self::Keys`] — confirm.
    Scan(Option<Vec<u8>>),
    /// `RANDOMKEY` — a single arbitrary key picked across all shards.
    RandomKey,
    /// `SUBSCRIBE` — connection-level (modifies this conn).
    Subscribe,
    /// `UNSUBSCRIBE` — connection-level (modifies this conn).
    Unsubscribe,
    /// `PSUBSCRIBE pattern [pattern ...]` — like `Subscribe`, except the
    /// conn registers Redis-glob patterns; a `PUBLISH` to a matching
    /// channel delivers a `pmessage` frame. Connection-level (modifies
    /// this conn + shared pattern registry).
    Psubscribe,
    /// `PUNSUBSCRIBE [pattern ...]` — the pattern counterpart of
    /// `Unsubscribe`. Connection-level (modifies this conn + shared
    /// pattern registry).
    Punsubscribe,
    /// `PUBLISH channel message` — delivered to subscribers on every core.
    Publish,
    /// `WATCH key [key ...]` — fans out to record per-shard versions, then
    /// stores the (key, version) pairs in the conn's `watched` set so the
    /// next `EXEC` can validate them. Connection-level.
    Watch,
    /// `UNWATCH` — empties the conn's `watched` set. Connection-level,
    /// local.
    Unwatch,
    /// `HELLO [protover [AUTH user pass] [SETNAME name]]` — server
    /// handshake. `HELLO 3` flips the conn into RESP3 mode (per-conn
    /// `proto` field), and the reply shape is itself proto-aware (V2:
    /// array of pairs; V3: Map). Connection-level; dispatched through the
    /// [`crate::Commands::hello_reply`] hook so embedders set their own
    /// server metadata.
    Hello,
    /// `RENAME source destination` / `RENAMENX source destination`.
    ///
    /// The runtime makes the two-shard decision: a same-shard rename goes
    /// through one atomic [`crate::Store::rename`] on the owning shard,
    /// while a cross-shard rename uses the Take→Put orchestrator (lands in
    /// v2-3b; v2-3a emits `-CROSSSHARD ...` for that case).
    Rename {
        /// `true` for `RENAMENX` (no overwrite — reply `:0` if dst exists).
        nx: bool,
    },
    /// `SLOWLOG GET / LEN / RESET / HELP`.
    ///
    /// The sub-command and its parsed args are decoded at routing time, so
    /// the runtime already knows whether to short-circuit (HELP / error)
    /// or fan out across shards (GET / LEN / RESET). See
    /// [`crate::parse_slowlog_sub`].
    Slowlog(SlowlogSub),
    /// Non-blocking `XREAD` / `XREADGROUP` over **multiple** streams.
    ///
    /// Each stream is fanned out to its owning shard and the per-stream
    /// replies are merged in request order (single-stream forms still
    /// route via [`Self::Single`]). With a group present every per-shard
    /// sub-query is an `XREADGROUP`, i.e. a write: PEL / last-delivered
    /// updates happen on each stream's owning shard and are AOF-logged
    /// there as the rewritten single-stream command.
    ///
    /// The command set builds this only for the non-blocking, ≥2-stream
    /// forms; blocking reads park on the origin shard instead (see the
    /// cross-shard BLOCK arbiter).
    XReadGather {
        /// One `(stream key, last-seen id)` pair per requested stream.
        streams: Vec<(Vec<u8>, Vec<u8>)>,
        /// Optional `COUNT` cap, applied per stream.
        count: Option<usize>,
        /// `Some` turns each per-shard sub-query into an `XREADGROUP`.
        group: Option<XGroupCtx>,
    },
}
/// Consumer-group context for an `XREADGROUP` gather.
///
/// Holds the `GROUP <name> <consumer>` pair (plus the `NOACK` flag) that
/// the gather hands to each per-stream sub-query.
#[derive(Debug)]
pub struct XGroupCtx {
    /// Name of the consumer group.
    pub group: Vec<u8>,
    /// Name of the consumer inside that group.
    pub consumer: Vec<u8>,
    /// `NOACK` flag — entries are delivered without being added to the PEL.
    pub noack: bool,
}