Skip to main content

kevy_rt/
route.rs

1//! [`Route`] — how each command maps onto shards. Returned by
2//! [`crate::Commands::route`] / carried in [`crate::ResolvedCmd`]; the
3//! runtime's `start_command` matches on it to pick a dispatch shape.
4
5use crate::exec_slowlog::SlowlogSub;
6
7/// How a command maps onto shards.
8#[derive(Debug)]
9pub enum Route {
10    /// Keyless; execute on the connection's own shard (e.g. PING).
11    Local,
12    /// Single-key; route by `args[idx]`.
13    Single(usize),
14    /// `args[1..]` are keys; delete each on its shard, sum the counts.
15    DelKeys,
16    /// `args[1..]` are keys; count existing across shards.
17    ExistsKeys,
18    /// Sum every shard's key count.
19    Dbsize,
20    /// Flush every shard.
21    Flush,
22    /// Snapshot every shard's store to disk, synchronously (`SAVE` —
23    /// blocks until durable, the Redis contract for the explicit form).
24    Save,
25    /// `BGSAVE` — collect a COW view per shard and persist in the
26    /// background; the command returns once the views are frozen.
27    BgSave,
28    /// `BGREWRITEAOF` — rebuild every shard's AOF from in-memory state.
29    /// Synchronous in v1.0 (each shard blocks for its own rewrite duration).
30    RewriteAof,
31    /// `MSET` — `args[1..]` are key/value pairs, routed per key's shard.
32    MSet,
33    /// `MGET` — `args[1..]` are keys; values gathered in request order.
34    MGet,
35    /// `SINTER` / `SUNION` / `SDIFF` — `args[1..]` are set keys.
36    SInter,
37    SUnion,
38    SDiff,
39    /// `KEYS pattern` — every shard returns its matching keys.
40    Keys(Option<Vec<u8>>),
41    /// `SCAN` (cursor-0 approximation) — like KEYS but replies `[cursor, keys]`.
42    Scan(Option<Vec<u8>>),
43    /// `RANDOMKEY` — one arbitrary key across all shards.
44    RandomKey,
45    /// `SUBSCRIBE` / `UNSUBSCRIBE` — connection-level (modifies this conn).
46    Subscribe,
47    Unsubscribe,
48    /// `PSUBSCRIBE pattern [pattern ...]` / `PUNSUBSCRIBE [pattern ...]` —
49    /// like Subscribe/Unsubscribe but the conn registers Redis-glob
50    /// patterns; `PUBLISH` to a matching channel delivers a `pmessage`
51    /// frame. Connection-level (modifies this conn + shared pattern
52    /// registry).
53    Psubscribe,
54    Punsubscribe,
55    /// `PUBLISH channel message` — delivered to subscribers on every core.
56    Publish,
57    /// `WATCH key [key ...]` — fan-out to record per-shard versions, then
58    /// stash the (key, version) pairs in the conn's `watched` set so the
59    /// next `EXEC` can validate them. Connection-level.
60    Watch,
61    /// `UNWATCH` — clear the conn's `watched` set. Connection-level, local.
62    Unwatch,
63    /// `HELLO [protover [AUTH user pass] [SETNAME name]]` — server
64    /// handshake; on `HELLO 3` flips the conn into RESP3 mode (per-conn
65    /// `proto` field). Reply shape itself is proto-aware (V2: array of
66    /// pairs; V3: Map). Connection-level, dispatch via the
67    /// [`crate::Commands::hello_reply`] hook so embedders set their own server
68    /// metadata.
69    Hello,
70    /// `RENAME source destination` / `RENAMENX source destination`. The
71    /// runtime handles the two-shard decision: same-shard renames go
72    /// through one atomic [`crate::Store::rename`] on the owning shard; cross-
73    /// shard renames use the Take→Put orchestrator (lands in v2-3b;
74    /// v2-3a emits `-CROSSSHARD ...` for that case).
75    Rename {
76        /// `true` for `RENAMENX` (no overwrite — reply `:0` if dst exists).
77        nx: bool,
78    },
79    /// `SLOWLOG GET / LEN / RESET / HELP`. The sub-command + parsed
80    /// args are pre-decoded at routing time so the runtime knows
81    /// whether to short-circuit (HELP / error) or fan out across
82    /// shards (GET / LEN / RESET). See [`crate::parse_slowlog_sub`].
83    Slowlog(SlowlogSub),
84    /// Non-blocking `XREAD` / `XREADGROUP` over **multiple** streams — fan
85    /// each stream out to its owning shard and merge the per-stream replies
86    /// in request order (single-stream forms still route via
87    /// [`Self::Single`]). Each element is `(stream key, last-seen id)`;
88    /// `count` is the optional `COUNT` cap applied per stream; `group`
89    /// `Some` makes each per-shard sub-query an `XREADGROUP` (a write —
90    /// PEL / last-delivered updates happen on each stream's owning shard
91    /// and are AOF-logged there as the rewritten single-stream command).
92    /// The command set builds this only for the non-blocking, ≥2-stream
93    /// forms; blocking reads park on the origin shard instead (see the
94    /// cross-shard BLOCK arbiter).
95    XReadGather {
96        streams: Vec<(Vec<u8>, Vec<u8>)>,
97        count: Option<usize>,
98        group: Option<XGroupCtx>,
99    },
100}
101
/// The `GROUP <name> <consumer>` (+ `NOACK`) context an `XREADGROUP`
/// gather carries to each per-stream sub-query.
///
/// Every field is a plain owned value, so the type derives `Clone`,
/// `PartialEq` and `Eq` in addition to `Debug`: a context can be copied
/// for each per-stream sub-query and compared by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct XGroupCtx {
    /// Consumer-group name.
    pub group: Vec<u8>,
    /// Consumer name within the group.
    pub consumer: Vec<u8>,
    /// `NOACK` — deliver without adding to the PEL.
    pub noack: bool,
}