Skip to main content

kevy_rt/
blocked.rs

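// The pieces consumed by the follow-up XREAD BLOCK / XREADGROUP BLOCK
// sprints (`BlockKind::XReadBlock`, `BlockKind::XReadGroupBlock`,
// `BlockHint::XReadBlock`, …) have no callers yet, so `dead_code` is expected
// for now. Once those sprints connect callers and nothing in this module is
// dead any more, the expectation below goes unfulfilled and rustc warns
// (`unfulfilled_lint_expectations`), signalling that the attribute should be
// removed.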
5#![expect(
6    dead_code,
7    reason = "stream BlockKind / BlockHint variants land in v2-7d.3 / .4"
8)]
9
10//! Per-shard blocked-client registry, shared by `BLPOP` / `BRPOP` /
11//! `XREAD BLOCK` / `XREADGROUP BLOCK`.
12//!
//! Design: when a command blocks, the conn's `argv` and `proto` are stashed
//! under every key it watches. A subsequent write to any of those keys wakes
//! the oldest waiter (FIFO per key, matching Redis); a periodic tick sweeps
//! waiters past their `deadline_ms` and fires a nil reply.
//!
//! The registry holds no reactor / socket state — `Shard` owns the wake +
//! reply emission paths. The `BlockedClients::pop_*` methods return the
//! bookkeeping; the caller decides what RESP frame to write.
21
22use crate::Commands;
23use crate::shard::Shard;
24use kevy_resp::{Argv, RespVersion};
25use std::collections::{HashMap, VecDeque};
26use std::time::{SystemTime, UNIX_EPOCH};
27
/// Current Unix wall-clock time in milliseconds.
///
/// This is the single time base for blocking waits: the dispatcher adds a
/// timeout to it to obtain a waiter's `deadline_ms`, and the reactor tick
/// compares `deadline_ms` against it. Because it is wall-clock time, clock
/// adjustments (NTP slew, manual change) shift expiry: a backwards jump can
/// make a waiter expire late. That is accepted — BLOCK is not a wall-clock
/// contract.
///
/// Returns `0` if the system clock reads earlier than the Unix epoch.
#[inline]
pub(crate) fn unix_now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` is `u128`; the narrowing cast cannot truncate until
        // roughly 584 million years after the epoch.
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // Clock set before 1970.
        Err(_) => 0,
    }
}
39
40/// Emit the RESP nil reply that a timed-out blocking command returns.
41/// Shape depends on both proto and kind:
42/// - RESP3: `_\r\n` (the null type) for all kinds.
43/// - RESP2 `BLPOP` / `BRPOP`: nil array `*-1\r\n` (Redis returns nil array
44///   so the multi-bulk reply slot stays well-typed).
45/// - RESP2 `XREAD` / `XREADGROUP`: nil bulk `$-1\r\n` (matches "no streams
46///   updated in this window" — also Redis's choice).
47pub(crate) fn encode_block_timeout(out: &mut Vec<u8>, kind: BlockKind, proto: RespVersion) {
48    match (proto, kind) {
49        (RespVersion::V3, _) => out.extend_from_slice(b"_\r\n"),
50        (RespVersion::V2, BlockKind::Blpop | BlockKind::Brpop) => {
51            out.extend_from_slice(b"*-1\r\n");
52        }
53        (RespVersion::V2, BlockKind::XReadBlock | BlockKind::XReadGroupBlock) => {
54            out.extend_from_slice(b"$-1\r\n");
55        }
56    }
57}
58
/// The blocking command a waiter is parked in.
///
/// Selects both the shape of the timeout-nil reply and how the command is
/// retried when the waiter is woken.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockKind {
    /// Parked in `BLPOP`.
    Blpop,
    /// Parked in `BRPOP`.
    Brpop,
    /// Parked in `XREAD BLOCK`.
    XReadBlock,
    /// Parked in `XREADGROUP BLOCK`.
    XReadGroupBlock,
}
68
/// How a command wants to block, if at all. Returned by
/// [`Commands::resolve`] inside [`crate::ResolvedCmd`] so the verb-table
/// lookup happens once per command. `None` is the zero-cost default for
/// every non-blocking verb (≥ 99.9 % of dispatches in steady state).
///
/// `keys` is every key the conn watches (≥ 1). The dispatcher picks the
/// park strategy from them:
/// - **single key on the conn's own shard** → the in-shard fast path
///   (`BlockedClients`): register + wake without any cross-core hop.
/// - **single remote key, or any multi-key form** → the cross-shard
///   arbiter (`block_xshard`): the conn parks on its origin
///   shard and watch registrations fan out to each key's owning shard.
///
/// For `BLPOP` / `BRPOP` the keys are list keys; for `XREAD BLOCK` /
/// `XREADGROUP BLOCK` they are the STREAMS keys (in request order).
#[derive(Clone, Debug, Default)]
pub enum BlockHint {
    /// The command does not block (the `Default`).
    #[default]
    None,
    /// The command wants to park on `keys` for up to `timeout_ms`.
    Block {
        /// Which blocking command is asking to park.
        kind: BlockKind,
        /// Every key the conn watches, in request order.
        keys: Vec<Vec<u8>>,
        /// `0` = block forever (Redis convention). Anything else is the
        /// wall-clock millis the dispatcher will add to `unix_now_ms()` to
        /// derive the waiter's `deadline_ms`.
        timeout_ms: u64,
    },
}
97
/// One parked waiter: what is needed to retry the blocked command on wake,
/// or to emit the matching timeout reply.
pub(crate) struct BlockedClient {
    /// Id of the connection that issued the blocking command.
    pub(crate) conn_id: u64,
    /// Blocking command the conn is parked in; selects the timeout reply shape.
    pub(crate) kind: BlockKind,
    /// Unix-ms wall clock when this waiter expires. `u64::MAX` = block forever.
    pub(crate) deadline_ms: u64,
    /// The original command arguments, re-dispatched when the waiter is woken.
    pub(crate) argv: Argv,
    /// RESP version the reply (wake or timeout) is encoded in.
    pub(crate) proto: RespVersion,
}
106
/// FIFO per key; secondary index by conn for O(1) cleanup on wake / close.
#[derive(Default)]
pub(crate) struct BlockedClients {
    /// Watched key → waiters parked on it, oldest at the front. A key is
    /// removed from the map as soon as its queue drains.
    by_key: HashMap<Vec<u8>, VecDeque<BlockedClient>>,
    /// Conn id → the keys that conn is registered under, so all of its
    /// copies in `by_key` can be found without scanning every queue.
    by_conn: HashMap<u64, Vec<Vec<u8>>>,
}
113
114impl BlockedClients {
115    pub(crate) fn new() -> Self {
116        Self::default()
117    }
118
119    /// Was a write on `key` watched by any blocker? `is_empty()` short-circuit
120    /// keeps the hot push/xadd path free of map lookups when nothing's parked.
121    #[inline]
122    pub(crate) fn is_empty(&self) -> bool {
123        self.by_key.is_empty()
124    }
125
126    #[inline]
127    pub(crate) fn is_watched(&self, key: &[u8]) -> bool {
128        self.by_key.contains_key(key)
129    }
130
131    /// Register one waiter on each of `keys`. The same waiter is cloned into
132    /// every key's FIFO; the wake path drops the surviving copies via
133    /// `drop_for_conn` once any one fires (so a multi-key BLPOP woken by key
134    /// A does not also fire on a later push to key B).
135    pub(crate) fn add(
136        &mut self,
137        conn_id: u64,
138        keys: &[Vec<u8>],
139        kind: BlockKind,
140        deadline_ms: u64,
141        argv: Argv,
142        proto: RespVersion,
143    ) {
144        for key in keys {
145            let bc = BlockedClient {
146                conn_id,
147                kind,
148                deadline_ms,
149                argv: argv.clone(),
150                proto,
151            };
152            self.by_key.entry(key.clone()).or_default().push_back(bc);
153        }
154        self.by_conn.insert(conn_id, keys.to_vec());
155    }
156
157    /// Pop and return the oldest waiter on `key`. Caller must then call
158    /// `drop_for_conn(waiter.conn_id)` to scrub copies on this conn's other
159    /// watched keys (multi-key BLPOP), then retry `waiter.argv`.
160    pub(crate) fn pop_oldest_on_key(&mut self, key: &[u8]) -> Option<BlockedClient> {
161        let queue = self.by_key.get_mut(key)?;
162        let waiter = queue.pop_front();
163        if queue.is_empty() {
164            self.by_key.remove(key);
165        }
166        waiter
167    }
168
169    /// Drop every waiter copy belonging to `conn_id`. Called on (a) successful
170    /// wake (purge stale copies on other keys), and (b) connection close.
171    pub(crate) fn drop_for_conn(&mut self, conn_id: u64) {
172        let Some(keys) = self.by_conn.remove(&conn_id) else {
173            return;
174        };
175        for key in keys {
176            let Some(queue) = self.by_key.get_mut(&key) else {
177                continue;
178            };
179            queue.retain(|w| w.conn_id != conn_id);
180            if queue.is_empty() {
181                self.by_key.remove(&key);
182            }
183        }
184    }
185
186    /// Pop one representative waiter per conn whose `deadline_ms <= now_ms`.
187    /// All copies on the conn's other watched keys are removed too, so each
188    /// expired conn fires exactly one timeout reply.
189    pub(crate) fn pop_expired(&mut self, now_ms: u64) -> Vec<BlockedClient> {
190        let conns = self.expired_conn_ids(now_ms);
191        let mut out = Vec::with_capacity(conns.len());
192        for conn_id in conns {
193            if let Some(rep) = self.representative(conn_id) {
194                out.push(rep);
195            }
196            self.drop_for_conn(conn_id);
197        }
198        out
199    }
200
201    fn expired_conn_ids(&self, now_ms: u64) -> Vec<u64> {
202        let mut seen: Vec<u64> = Vec::new();
203        for queue in self.by_key.values() {
204            for w in queue {
205                if w.deadline_ms <= now_ms && !seen.contains(&w.conn_id) {
206                    seen.push(w.conn_id);
207                }
208            }
209        }
210        seen
211    }
212
213    fn representative(&self, conn_id: u64) -> Option<BlockedClient> {
214        let keys = self.by_conn.get(&conn_id)?;
215        let first_key = keys.first()?;
216        let queue = self.by_key.get(first_key)?;
217        queue
218            .iter()
219            .find(|w| w.conn_id == conn_id)
220            .map(|w| BlockedClient {
221                conn_id: w.conn_id,
222                kind: w.kind,
223                deadline_ms: w.deadline_ms,
224                argv: w.argv.clone(),
225                proto: w.proto,
226            })
227    }
228}
229
230impl<C: Commands> Shard<C> {
231    /// Periodic reactor tick: fire one timeout reply per blocked waiter whose
232    /// `deadline_ms <= now`. Cheap when no one is parked (`is_empty()` short-
233    /// circuit). Called from both the epoll and io_uring reactor loops on the
234    /// same cadence as the active-TTL reaper.
235    pub(crate) fn tick_blocked_timeouts(&mut self) {
236        if self.blocked.is_empty() {
237            return;
238        }
239        let now_ms = unix_now_ms();
240        for w in self.blocked.pop_expired(now_ms) {
241            let Some(conn) = self.conns.get_mut(&w.conn_id) else {
242                continue;
243            };
244            conn.blocked = false;
245            encode_block_timeout(&mut conn.output, w.kind, w.proto);
246            self.dirty.push(w.conn_id);
247        }
248    }
249
250    /// Wake the oldest waiter on `key` (FIFO, matching Redis) and retry its
251    /// command. Called by the dispatcher after a write that may have produced
252    /// new data for blocked readers — `LPUSH` / `RPUSH` for `BLPOP` /
253    /// `BRPOP`; `XADD` for `XREAD BLOCK` / `XREADGROUP BLOCK`. The retry
254    /// re-runs the original command via `Commands::dispatch_into`; if the
255    /// data has already been consumed in a race window, the retry sees an
256    /// empty list / stream and a `None` from this fn — the waiter has
257    /// already been popped out of the registry so it stays unblocked (the
258    /// next tick or a fresh client request resolves it). One push wakes one
259    /// waiter only (Redis semantics — a single LPUSH does not feed two
260    /// BLPOP clients).
261    pub(crate) fn wake_blocked_on_key(&mut self, key: &[u8]) {
262        if self.blocked.is_empty() {
263            return;
264        }
265        let Some(waiter) = self.blocked.pop_oldest_on_key(key) else {
266            return;
267        };
268        self.blocked.drop_for_conn(waiter.conn_id);
269        let Some(conn) = self.conns.get_mut(&waiter.conn_id) else {
270            return;
271        };
272        conn.blocked = false;
273        let proto = waiter.proto;
274        match proto {
275            RespVersion::V2 => self
276                .commands
277                .dispatch_into(&mut self.store, &waiter.argv, &mut conn.output),
278            RespVersion::V3 => self
279                .commands
280                .dispatch_into_resp3(&mut self.store, &waiter.argv, &mut conn.output),
281        }
282        conn.next_emit += 1;
283        self.dirty.push(waiter.conn_id);
284    }
285}