kevy_rt/blocked.rs
// Module-wide `dead_code` expectation. Items reserved for the follow-up
// XREAD BLOCK / XREADGROUP BLOCK sprints (e.g. `BlockKind::XReadBlock`,
// `BlockKind::XReadGroupBlock`) are not wired to callers yet.
//
// Because this is `expect` rather than `allow`, the attribute tracks its own
// staleness. Once those sprints connect the last unused item, no `dead_code`
// lint fires here. rustc then emits `unfulfilled_lint_expectations`, flagging
// this attribute for removal.
//
// Until then, note that it also silences any *other* dead code in this module.
#![expect(
    dead_code,
    reason = "stream BlockKind / BlockHint variants land in v2-7d.3 / .4"
)]
9
10//! Per-shard blocked-client registry, shared by `BLPOP` / `BRPOP` /
11//! `XREAD BLOCK` / `XREADGROUP BLOCK`.
12//!
//! Design: when a command blocks, the conn's `argv` and `proto` are stashed
//! under every key it watches. A subsequent write to any of those keys wakes
15//! the oldest waiter (FIFO per key, matching Redis); a periodic tick sweeps
16//! waiters past their `deadline_ms` and fires a nil reply.
17//!
18//! The registry holds no reactor / socket state — `Shard` owns the wake +
19//! reply emission paths. `BlockedClients::pop_*` returns the bookkeeping;
20//! the caller decides what RESP frame to write.
21
22use crate::Commands;
23use crate::shard::Shard;
24use kevy_resp::{Argv, RespVersion};
25use std::collections::{HashMap, VecDeque};
26use std::time::{SystemTime, UNIX_EPOCH};
27
/// Unix wall-clock milliseconds: the single time base shared by two sites.
/// - The dispatcher reads it when computing a waiter's
///   `deadline_ms = now_ms + timeout_ms`.
/// - The reactor tick reads it when checking `deadline_ms <= now_ms`.
///
/// Because this is wall-clock time, clock adjustments (NTP step, manual
/// change) leak into BLOCK timeouts:
/// - a backwards jump makes waiters expire late;
/// - a forwards jump makes them expire early.
///
/// That is accepted, since BLOCK timeouts are best-effort rather than a
/// precise wall-clock contract.
///
/// Returns `0` if the system clock reads earlier than the Unix epoch. The
/// `u128` millisecond count saturates at `u64::MAX` instead of being
/// silently truncated.
#[inline]
pub(crate) fn unix_now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
        })
}
39
40/// Emit the RESP nil reply that a timed-out blocking command returns.
41/// Shape depends on both proto and kind:
42/// - RESP3: `_\r\n` (the null type) for all kinds.
43/// - RESP2 `BLPOP` / `BRPOP`: nil array `*-1\r\n` (Redis returns nil array
44/// so the multi-bulk reply slot stays well-typed).
45/// - RESP2 `XREAD` / `XREADGROUP`: nil bulk `$-1\r\n` (matches "no streams
46/// updated in this window" — also Redis's choice).
47pub(crate) fn encode_block_timeout(out: &mut Vec<u8>, kind: BlockKind, proto: RespVersion) {
48 match (proto, kind) {
49 (RespVersion::V3, _) => out.extend_from_slice(b"_\r\n"),
50 (RespVersion::V2, BlockKind::Blpop | BlockKind::Brpop | BlockKind::Bzpopmin) => {
51 out.extend_from_slice(b"*-1\r\n");
52 }
53 (RespVersion::V2, BlockKind::XReadBlock | BlockKind::XReadGroupBlock) => {
54 out.extend_from_slice(b"$-1\r\n");
55 }
56 // BRPOPLPUSH on timeout returns nil bulk (the would-be moved
57 // element). Same shape as XREAD timeout.
58 (RespVersion::V2, BlockKind::Brpoplpush) => {
59 out.extend_from_slice(b"$-1\r\n");
60 }
61 }
62}
63
/// Which blocking command a waiter is parked in.
///
/// Selects the timeout-nil reply shape in `encode_block_timeout`. The
/// in-shard wake path (`Shard::wake_blocked_on_key`) does not match on it;
/// it re-runs the waiter's stored `argv` instead.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BlockKind {
    /// `BLPOP key [key ...] timeout`: watches list keys.
    Blpop,
    /// `BRPOP key [key ...] timeout`: watches list keys.
    Brpop,
    /// `BZPOPMIN key [key ...] timeout`: blocks until a sorted set has a
    /// member, then pops the lowest-scored one.
    ///
    /// Uses the same arm-and-serve flow as `BLPOP`; the reply shape adds a
    /// third bulk (the score).
    Bzpopmin,
    /// `BRPOPLPUSH source destination timeout`: atomic blocking right-pop
    /// from `source` plus left-push to `destination`.
    ///
    /// Parks on `source` only. On success the reply is a single bulk holding
    /// the moved element; on timeout it is a nil bulk.
    ///
    /// Deprecated since Redis 6.2 in favour of BLMOVE, but Bee Queue (and
    /// many older clients) still emit it.
    Brpoplpush,
    /// `XREAD ... BLOCK ms STREAMS ...`: watches the STREAMS keys, in
    /// request order.
    XReadBlock,
    /// `XREADGROUP ... BLOCK ms STREAMS ...`: watches the STREAMS keys, in
    /// request order.
    XReadGroupBlock,
}
84
/// How a command wants to block, if at all.
///
/// Returned by [`Commands::resolve`] inside [`crate::ResolvedCmd`], so the
/// verb-table lookup happens once per command. `None` is the zero-cost
/// default for every non-blocking verb (≥ 99.9 % of dispatches in steady
/// state).
///
/// `keys` is every key the conn watches (≥ 1). The dispatcher picks the
/// park strategy from them:
/// - **single key on the conn's own shard** → the in-shard fast path
///   (`BlockedClients`): register + wake without any cross-core hop.
/// - **single remote key, or any multi-key form** → the cross-shard
///   arbiter (`block_xshard`): the conn parks on its origin shard, and watch
///   registrations fan out to each key's owning shard.
///
/// For `BLPOP` / `BRPOP` the keys are list keys. For `XREAD BLOCK` /
/// `XREADGROUP BLOCK` they are the STREAMS keys (in request order).
#[derive(Clone, Debug, Default)]
pub enum BlockHint {
    /// The command never blocks. This is the `Default`.
    #[default]
    None,
    /// The command may park its conn on `keys` until a write wakes it or the
    /// timeout fires.
    Block {
        /// Which blocking command this is; selects the timeout reply shape.
        kind: BlockKind,
        /// Every key the conn watches (≥ 1).
        keys: Vec<Vec<u8>>,
        /// `0` = block forever (Redis convention).
        ///
        /// Any other value is the wall-clock millis the dispatcher adds to
        /// `unix_now_ms()` to derive the waiter's `deadline_ms`.
        timeout_ms: u64,
    },
}
113
/// One parked waiter: everything needed either to retry its command on wake
/// or to emit its timeout reply.
///
/// `BlockedClients::add` stores one copy per watched key.
pub(crate) struct BlockedClient {
    /// Key into the shard's `conns` map; the reply is written to that conn.
    pub(crate) conn_id: u64,
    /// Which blocking command parked; selects the timeout-nil shape.
    pub(crate) kind: BlockKind,
    /// Unix-ms wall clock when this waiter expires. `u64::MAX` = block forever.
    pub(crate) deadline_ms: u64,
    /// The original command, re-dispatched verbatim on wake.
    pub(crate) argv: Argv,
    /// Protocol the conn speaks. Picks RESP2 vs RESP3 encoding for both the
    /// retried reply and the timeout nil.
    pub(crate) proto: RespVersion,
}
122
/// Per-shard registry of parked waiters.
///
/// `by_key` keeps one FIFO per watched key, oldest waiter at the front. A key
/// is removed as soon as its queue empties, so an empty `by_key` means
/// nothing is parked.
///
/// `by_conn` is a secondary index from conn id to the keys that conn
/// registered on. Cleanup on wake / close therefore visits only that conn's
/// queues rather than every key. Each visited queue is still filtered
/// linearly, so cleanup is not O(1).
#[derive(Default)]
pub(crate) struct BlockedClients {
    /// Watched key → waiters parked on it, oldest first.
    by_key: HashMap<Vec<u8>, VecDeque<BlockedClient>>,
    /// Conn id → every key that conn has a waiter copy on.
    by_conn: HashMap<u64, Vec<Vec<u8>>>,
}
129
130impl BlockedClients {
131 pub(crate) fn new() -> Self {
132 Self::default()
133 }
134
135 /// Was a write on `key` watched by any blocker? `is_empty()` short-circuit
136 /// keeps the hot push/xadd path free of map lookups when nothing's parked.
137 #[inline]
138 pub(crate) fn is_empty(&self) -> bool {
139 self.by_key.is_empty()
140 }
141
142 #[inline]
143 pub(crate) fn is_watched(&self, key: &[u8]) -> bool {
144 self.by_key.contains_key(key)
145 }
146
147 /// Register one waiter on each of `keys`. The same waiter is cloned into
148 /// every key's FIFO; the wake path drops the surviving copies via
149 /// `drop_for_conn` once any one fires (so a multi-key BLPOP woken by key
150 /// A does not also fire on a later push to key B).
151 pub(crate) fn add(
152 &mut self,
153 conn_id: u64,
154 keys: &[Vec<u8>],
155 kind: BlockKind,
156 deadline_ms: u64,
157 argv: Argv,
158 proto: RespVersion,
159 ) {
160 for key in keys {
161 let bc = BlockedClient {
162 conn_id,
163 kind,
164 deadline_ms,
165 argv: argv.clone(),
166 proto,
167 };
168 self.by_key.entry(key.clone()).or_default().push_back(bc);
169 }
170 self.by_conn.insert(conn_id, keys.to_vec());
171 }
172
173 /// Pop and return the oldest waiter on `key`. Caller must then call
174 /// `drop_for_conn(waiter.conn_id)` to scrub copies on this conn's other
175 /// watched keys (multi-key BLPOP), then retry `waiter.argv`.
176 pub(crate) fn pop_oldest_on_key(&mut self, key: &[u8]) -> Option<BlockedClient> {
177 let queue = self.by_key.get_mut(key)?;
178 let waiter = queue.pop_front();
179 if queue.is_empty() {
180 self.by_key.remove(key);
181 }
182 waiter
183 }
184
185 /// Drop every waiter copy belonging to `conn_id`. Called on (a) successful
186 /// wake (purge stale copies on other keys), and (b) connection close.
187 pub(crate) fn drop_for_conn(&mut self, conn_id: u64) {
188 let Some(keys) = self.by_conn.remove(&conn_id) else {
189 return;
190 };
191 for key in keys {
192 let Some(queue) = self.by_key.get_mut(&key) else {
193 continue;
194 };
195 queue.retain(|w| w.conn_id != conn_id);
196 if queue.is_empty() {
197 self.by_key.remove(&key);
198 }
199 }
200 }
201
202 /// Pop one representative waiter per conn whose `deadline_ms <= now_ms`.
203 /// All copies on the conn's other watched keys are removed too, so each
204 /// expired conn fires exactly one timeout reply.
205 pub(crate) fn pop_expired(&mut self, now_ms: u64) -> Vec<BlockedClient> {
206 let conns = self.expired_conn_ids(now_ms);
207 let mut out = Vec::with_capacity(conns.len());
208 for conn_id in conns {
209 if let Some(rep) = self.representative(conn_id) {
210 out.push(rep);
211 }
212 self.drop_for_conn(conn_id);
213 }
214 out
215 }
216
217 fn expired_conn_ids(&self, now_ms: u64) -> Vec<u64> {
218 let mut seen: Vec<u64> = Vec::new();
219 for queue in self.by_key.values() {
220 for w in queue {
221 if w.deadline_ms <= now_ms && !seen.contains(&w.conn_id) {
222 seen.push(w.conn_id);
223 }
224 }
225 }
226 seen
227 }
228
229 fn representative(&self, conn_id: u64) -> Option<BlockedClient> {
230 let keys = self.by_conn.get(&conn_id)?;
231 let first_key = keys.first()?;
232 let queue = self.by_key.get(first_key)?;
233 queue
234 .iter()
235 .find(|w| w.conn_id == conn_id)
236 .map(|w| BlockedClient {
237 conn_id: w.conn_id,
238 kind: w.kind,
239 deadline_ms: w.deadline_ms,
240 argv: w.argv.clone(),
241 proto: w.proto,
242 })
243 }
244}
245
246impl<C: Commands> Shard<C> {
247 /// Periodic reactor tick: fire one timeout reply per blocked waiter whose
248 /// `deadline_ms <= now`. Cheap when no one is parked (`is_empty()` short-
249 /// circuit). Called from both the epoll and io_uring reactor loops on the
250 /// same cadence as the active-TTL reaper.
251 pub(crate) fn tick_blocked_timeouts(&mut self) {
252 if self.blocked.is_empty() {
253 return;
254 }
255 let now_ms = unix_now_ms();
256 for w in self.blocked.pop_expired(now_ms) {
257 let Some(conn) = self.conns.get_mut(&w.conn_id) else {
258 continue;
259 };
260 conn.blocked = false;
261 encode_block_timeout(&mut conn.output, w.kind, w.proto);
262 self.dirty.push(w.conn_id);
263 }
264 }
265
266 /// Wake the oldest waiter on `key` (FIFO, matching Redis) and retry its
267 /// command. Called by the dispatcher after a write that may have produced
268 /// new data for blocked readers — `LPUSH` / `RPUSH` for `BLPOP` /
269 /// `BRPOP`; `XADD` for `XREAD BLOCK` / `XREADGROUP BLOCK`. The retry
270 /// re-runs the original command via `Commands::dispatch_into`; if the
271 /// data has already been consumed in a race window, the retry sees an
272 /// empty list / stream and a `None` from this fn — the waiter has
273 /// already been popped out of the registry so it stays unblocked (the
274 /// next tick or a fresh client request resolves it). One push wakes one
275 /// waiter only (Redis semantics — a single LPUSH does not feed two
276 /// BLPOP clients).
277 pub(crate) fn wake_blocked_on_key(&mut self, key: &[u8]) {
278 if self.blocked.is_empty() {
279 return;
280 }
281 let Some(waiter) = self.blocked.pop_oldest_on_key(key) else {
282 return;
283 };
284 self.blocked.drop_for_conn(waiter.conn_id);
285 let Some(conn) = self.conns.get_mut(&waiter.conn_id) else {
286 return;
287 };
288 conn.blocked = false;
289 let proto = waiter.proto;
290 match proto {
291 RespVersion::V2 => self
292 .commands
293 .dispatch_into(&mut self.store, &waiter.argv, &mut conn.output),
294 RespVersion::V3 => self
295 .commands
296 .dispatch_into_resp3(&mut self.store, &waiter.argv, &mut conn.output),
297 }
298 conn.next_emit += 1;
299 self.dirty.push(waiter.conn_id);
300 }
301}