Skip to main content

kevy_rt/
blocked.rs

// Temporary lint expectation for items that have no callers yet — chiefly
// the stream variants (`BlockKind::XReadBlock`, `BlockKind::XReadGroupBlock`)
// consumed by the follow-up XREAD BLOCK / XREADGROUP BLOCK work. Because this
// is `expect` rather than `allow`, once nothing in this module triggers
// `dead_code` any more the compiler reports `unfulfilled_lint_expectations`,
// which is the cue to delete this attribute.
#![expect(
    dead_code,
    reason = "stream BlockKind / BlockHint variants land in v2-7d.3 / .4"
)]
9
//! Per-shard blocked-client registry, shared by `BLPOP` / `BRPOP` /
//! `BZPOPMIN` / `BRPOPLPUSH` / `XREAD BLOCK` / `XREADGROUP BLOCK`.
12//!
13//! Design: when a command blocks, the conn's `argv` + `proto` is stashed
14//! under every key it watches. A subsequent write to any of those keys wakes
15//! the oldest waiter (FIFO per key, matching Redis); a periodic tick sweeps
16//! waiters past their `deadline_ms` and fires a nil reply.
17//!
18//! The registry holds no reactor / socket state — `Shard` owns the wake +
19//! reply emission paths. `BlockedClients::pop_*` returns the bookkeeping;
20//! the caller decides what RESP frame to write.
21
22use crate::Commands;
23use crate::shard::Shard;
24use kevy_resp::{Argv, RespVersion};
25use std::collections::{HashMap, VecDeque};
26use std::time::{SystemTime, UNIX_EPOCH};
27
/// Current Unix wall-clock time in milliseconds.
///
/// This is the single time base shared by the dispatcher, which computes a
/// waiter's `deadline_ms = now_ms + timeout_ms`, and the reactor tick, which
/// tests `deadline_ms <= now_ms`. System-clock jumps (NTP slew, manual
/// changes) are tolerated. A backwards jump can make a waiter expire late,
/// which is acceptable because BLOCK timeouts are not a wall-clock contract.
/// A clock reading earlier than the epoch yields `0`.
#[inline]
pub(crate) fn unix_now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // Clock set before 1970: clamp to the epoch itself.
        Err(_) => 0,
    }
}
39
40/// Emit the RESP nil reply that a timed-out blocking command returns.
41/// Shape depends on both proto and kind:
42/// - RESP3: `_\r\n` (the null type) for all kinds.
43/// - RESP2 `BLPOP` / `BRPOP`: nil array `*-1\r\n` (Redis returns nil array
44///   so the multi-bulk reply slot stays well-typed).
45/// - RESP2 `XREAD` / `XREADGROUP`: nil bulk `$-1\r\n` (matches "no streams
46///   updated in this window" — also Redis's choice).
47pub(crate) fn encode_block_timeout(out: &mut Vec<u8>, kind: BlockKind, proto: RespVersion) {
48    match (proto, kind) {
49        (RespVersion::V3, _) => out.extend_from_slice(b"_\r\n"),
50        (RespVersion::V2, BlockKind::Blpop | BlockKind::Brpop | BlockKind::Bzpopmin) => {
51            out.extend_from_slice(b"*-1\r\n");
52        }
53        (RespVersion::V2, BlockKind::XReadBlock | BlockKind::XReadGroupBlock) => {
54            out.extend_from_slice(b"$-1\r\n");
55        }
56        // BRPOPLPUSH on timeout returns nil bulk (the would-be moved
57        // element). Same shape as XREAD timeout.
58        (RespVersion::V2, BlockKind::Brpoplpush) => {
59            out.extend_from_slice(b"$-1\r\n");
60        }
61    }
62}
63
/// The blocking command a parked waiter belongs to.
///
/// Consulted twice in a waiter's life: on timeout, to choose the shape of
/// the nil reply, and on wake-up, to re-run the right command.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockKind {
    /// `BLPOP key [key ...] timeout` — blocking pop from the head of a list.
    Blpop,
    /// `BRPOP key [key ...] timeout` — blocking pop from the tail of a list.
    Brpop,
    /// `BZPOPMIN key [key ...] timeout` — wait for a sorted set to gain a
    /// member, then pop the lowest-scored one. It parks and is served just
    /// like `BLPOP`; its reply carries a third bulk holding the score.
    Bzpopmin,
    /// `BRPOPLPUSH source destination timeout` — atomically right-pop from
    /// `source` and left-push onto `destination`, waiting on `source` alone.
    /// A successful reply is a single bulk containing the moved element.
    /// Superseded by BLMOVE since Redis 6.2, but Bee Queue and many older
    /// clients still send it.
    Brpoplpush,
    /// `XREAD ... BLOCK ... STREAMS ...` — blocking stream read.
    XReadBlock,
    /// `XREADGROUP ... BLOCK ... STREAMS ...` — blocking consumer-group read.
    XReadGroupBlock,
}
84
85/// How a command wants to block, if at all. Returned by
86/// [`Commands::resolve`] inside [`crate::ResolvedCmd`] so the verb-table
87/// lookup happens once per command. `None` is the zero-cost default for
88/// every non-blocking verb (≥ 99.9 % of dispatches in steady state).
89///
90/// `keys` is every key the conn watches (≥ 1). The dispatcher picks the
91/// park strategy from them:
92/// - **single key on the conn's own shard** → the in-shard fast path
93///   (`BlockedClients`): register + wake without any cross-core hop.
94/// - **single remote key, or any multi-key form** → the cross-shard
95///   arbiter (`block_xshard`): the conn parks on its origin
96///   shard and watch registrations fan out to each key's owning shard.
97///
98/// For `BLPOP` / `BRPOP` the keys are list keys; for `XREAD BLOCK` /
99/// `XREADGROUP BLOCK` they are the STREAMS keys (in request order).
100#[derive(Clone, Debug, Default)]
101pub enum BlockHint {
102    #[default]
103    None,
104    Block {
105        kind: BlockKind,
106        keys: Vec<Vec<u8>>,
107        /// `0` = block forever (Redis convention). Anything else is the
108        /// wall-clock millis the dispatcher will add to `unix_now_ms()` to
109        /// derive the waiter's `deadline_ms`.
110        timeout_ms: u64,
111    },
112}
113
114pub(crate) struct BlockedClient {
115    pub(crate) conn_id: u64,
116    pub(crate) kind: BlockKind,
117    /// Unix-ms wall clock when this waiter expires. `u64::MAX` = block forever.
118    pub(crate) deadline_ms: u64,
119    pub(crate) argv: Argv,
120    pub(crate) proto: RespVersion,
121}
122
123/// FIFO per key; secondary index by conn for O(1) cleanup on wake / close.
124#[derive(Default)]
125pub(crate) struct BlockedClients {
126    by_key: HashMap<Vec<u8>, VecDeque<BlockedClient>>,
127    by_conn: HashMap<u64, Vec<Vec<u8>>>,
128}
129
130impl BlockedClients {
131    pub(crate) fn new() -> Self {
132        Self::default()
133    }
134
135    /// Was a write on `key` watched by any blocker? `is_empty()` short-circuit
136    /// keeps the hot push/xadd path free of map lookups when nothing's parked.
137    #[inline]
138    pub(crate) fn is_empty(&self) -> bool {
139        self.by_key.is_empty()
140    }
141
142    #[inline]
143    pub(crate) fn is_watched(&self, key: &[u8]) -> bool {
144        self.by_key.contains_key(key)
145    }
146
147    /// Register one waiter on each of `keys`. The same waiter is cloned into
148    /// every key's FIFO; the wake path drops the surviving copies via
149    /// `drop_for_conn` once any one fires (so a multi-key BLPOP woken by key
150    /// A does not also fire on a later push to key B).
151    pub(crate) fn add(
152        &mut self,
153        conn_id: u64,
154        keys: &[Vec<u8>],
155        kind: BlockKind,
156        deadline_ms: u64,
157        argv: Argv,
158        proto: RespVersion,
159    ) {
160        for key in keys {
161            let bc = BlockedClient {
162                conn_id,
163                kind,
164                deadline_ms,
165                argv: argv.clone(),
166                proto,
167            };
168            self.by_key.entry(key.clone()).or_default().push_back(bc);
169        }
170        self.by_conn.insert(conn_id, keys.to_vec());
171    }
172
173    /// Pop and return the oldest waiter on `key`. Caller must then call
174    /// `drop_for_conn(waiter.conn_id)` to scrub copies on this conn's other
175    /// watched keys (multi-key BLPOP), then retry `waiter.argv`.
176    pub(crate) fn pop_oldest_on_key(&mut self, key: &[u8]) -> Option<BlockedClient> {
177        let queue = self.by_key.get_mut(key)?;
178        let waiter = queue.pop_front();
179        if queue.is_empty() {
180            self.by_key.remove(key);
181        }
182        waiter
183    }
184
185    /// Drop every waiter copy belonging to `conn_id`. Called on (a) successful
186    /// wake (purge stale copies on other keys), and (b) connection close.
187    pub(crate) fn drop_for_conn(&mut self, conn_id: u64) {
188        let Some(keys) = self.by_conn.remove(&conn_id) else {
189            return;
190        };
191        for key in keys {
192            let Some(queue) = self.by_key.get_mut(&key) else {
193                continue;
194            };
195            queue.retain(|w| w.conn_id != conn_id);
196            if queue.is_empty() {
197                self.by_key.remove(&key);
198            }
199        }
200    }
201
202    /// Pop one representative waiter per conn whose `deadline_ms <= now_ms`.
203    /// All copies on the conn's other watched keys are removed too, so each
204    /// expired conn fires exactly one timeout reply.
205    pub(crate) fn pop_expired(&mut self, now_ms: u64) -> Vec<BlockedClient> {
206        let conns = self.expired_conn_ids(now_ms);
207        let mut out = Vec::with_capacity(conns.len());
208        for conn_id in conns {
209            if let Some(rep) = self.representative(conn_id) {
210                out.push(rep);
211            }
212            self.drop_for_conn(conn_id);
213        }
214        out
215    }
216
217    fn expired_conn_ids(&self, now_ms: u64) -> Vec<u64> {
218        let mut seen: Vec<u64> = Vec::new();
219        for queue in self.by_key.values() {
220            for w in queue {
221                if w.deadline_ms <= now_ms && !seen.contains(&w.conn_id) {
222                    seen.push(w.conn_id);
223                }
224            }
225        }
226        seen
227    }
228
229    fn representative(&self, conn_id: u64) -> Option<BlockedClient> {
230        let keys = self.by_conn.get(&conn_id)?;
231        let first_key = keys.first()?;
232        let queue = self.by_key.get(first_key)?;
233        queue
234            .iter()
235            .find(|w| w.conn_id == conn_id)
236            .map(|w| BlockedClient {
237                conn_id: w.conn_id,
238                kind: w.kind,
239                deadline_ms: w.deadline_ms,
240                argv: w.argv.clone(),
241                proto: w.proto,
242            })
243    }
244}
245
246impl<C: Commands> Shard<C> {
247    /// Periodic reactor tick: fire one timeout reply per blocked waiter whose
248    /// `deadline_ms <= now`. Cheap when no one is parked (`is_empty()` short-
249    /// circuit). Called from both the epoll and io_uring reactor loops on the
250    /// same cadence as the active-TTL reaper.
251    pub(crate) fn tick_blocked_timeouts(&mut self) {
252        if self.blocked.is_empty() {
253            return;
254        }
255        let now_ms = unix_now_ms();
256        for w in self.blocked.pop_expired(now_ms) {
257            let Some(conn) = self.conns.get_mut(&w.conn_id) else {
258                continue;
259            };
260            conn.blocked = false;
261            encode_block_timeout(&mut conn.output, w.kind, w.proto);
262            self.dirty.push(w.conn_id);
263        }
264    }
265
266    /// Wake the oldest waiter on `key` (FIFO, matching Redis) and retry its
267    /// command. Called by the dispatcher after a write that may have produced
268    /// new data for blocked readers — `LPUSH` / `RPUSH` for `BLPOP` /
269    /// `BRPOP`; `XADD` for `XREAD BLOCK` / `XREADGROUP BLOCK`. The retry
270    /// re-runs the original command via `Commands::dispatch_into`; if the
271    /// data has already been consumed in a race window, the retry sees an
272    /// empty list / stream and a `None` from this fn — the waiter has
273    /// already been popped out of the registry so it stays unblocked (the
274    /// next tick or a fresh client request resolves it). One push wakes one
275    /// waiter only (Redis semantics — a single LPUSH does not feed two
276    /// BLPOP clients).
277    pub(crate) fn wake_blocked_on_key(&mut self, key: &[u8]) {
278        if self.blocked.is_empty() {
279            return;
280        }
281        let Some(waiter) = self.blocked.pop_oldest_on_key(key) else {
282            return;
283        };
284        self.blocked.drop_for_conn(waiter.conn_id);
285        let Some(conn) = self.conns.get_mut(&waiter.conn_id) else {
286            return;
287        };
288        conn.blocked = false;
289        let proto = waiter.proto;
290        match proto {
291            RespVersion::V2 => self
292                .commands
293                .dispatch_into(&mut self.store, &waiter.argv, &mut conn.output),
294            RespVersion::V3 => self
295                .commands
296                .dispatch_into_resp3(&mut self.store, &waiter.argv, &mut conn.output),
297        }
298        conn.next_emit += 1;
299        self.dirty.push(waiter.conn_id);
300    }
301}