kevy_rt/blocked.rs
// Module-wide `expect(dead_code)`: the stream-blocking pieces
// (`BlockKind::XReadBlock`, `BlockKind::XReadGroupBlock`, …) have no callers
// until the follow-up XREAD BLOCK / XREADGROUP BLOCK sprints wire them up.
// Because this is `expect` rather than `allow`, rustc emits an
// `unfulfilled_lint_expectations` warning once nothing in this module is dead
// any more — that is the cue to delete the attribute. While it is in place it
// also hides any *other* unused item in this module.
#![expect(
    dead_code,
    reason = "stream BlockKind / BlockHint variants land in v2-7d.3 / .4"
)]
9
10//! Per-shard blocked-client registry, shared by `BLPOP` / `BRPOP` /
11//! `XREAD BLOCK` / `XREADGROUP BLOCK`.
12//!
//! Design: when a command blocks, the conn's `argv` and `proto` are stashed
//! under every key it watches. A subsequent write to any of those keys wakes
//! the oldest waiter on that key (FIFO per key, matching Redis) and retries
//! its command; a periodic tick sweeps waiters past their `deadline_ms` and
//! writes a nil reply for each.
17//!
18//! The registry holds no reactor / socket state — `Shard` owns the wake +
19//! reply emission paths. `BlockedClients::pop_*` returns the bookkeeping;
20//! the caller decides what RESP frame to write.
21
22use crate::Commands;
23use crate::shard::Shard;
24use kevy_resp::{Argv, RespVersion};
25use std::collections::{HashMap, VecDeque};
26use std::time::{SystemTime, UNIX_EPOCH};
27
/// Unix wall-clock milliseconds — the time base both the dispatcher (when
/// computing a waiter's `deadline_ms = now_ms + timeout_ms`) and the reactor
/// tick (when checking `deadline_ms <= now_ms`) read.
///
/// Because this is wall-clock time, system-time steps (NTP step, manual clock
/// change) shift when waiters expire: a backwards jump makes them expire late,
/// a forward jump makes them expire early. That is accepted — BLOCK is not a
/// wall-clock contract.
///
/// Returns `0` if the clock reads earlier than the Unix epoch, and saturates at
/// `u64::MAX` rather than truncating the `u128` millisecond count.
#[inline]
pub(crate) fn unix_now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // Clamp before narrowing so the `as` cast below is lossless.
        .map_or(0, |since_epoch| {
            since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
        })
}
39
40/// Emit the RESP nil reply that a timed-out blocking command returns.
41/// Shape depends on both proto and kind:
42/// - RESP3: `_\r\n` (the null type) for all kinds.
43/// - RESP2 `BLPOP` / `BRPOP`: nil array `*-1\r\n` (Redis returns nil array
44/// so the multi-bulk reply slot stays well-typed).
45/// - RESP2 `XREAD` / `XREADGROUP`: nil bulk `$-1\r\n` (matches "no streams
46/// updated in this window" — also Redis's choice).
47pub(crate) fn encode_block_timeout(out: &mut Vec<u8>, kind: BlockKind, proto: RespVersion) {
48 match (proto, kind) {
49 (RespVersion::V3, _) => out.extend_from_slice(b"_\r\n"),
50 (RespVersion::V2, BlockKind::Blpop | BlockKind::Brpop) => {
51 out.extend_from_slice(b"*-1\r\n");
52 }
53 (RespVersion::V2, BlockKind::XReadBlock | BlockKind::XReadGroupBlock) => {
54 out.extend_from_slice(b"$-1\r\n");
55 }
56 }
57}
58
/// Which blocking command a waiter is parked in. Stored on every
/// `BlockedClient` copy and passed to `encode_block_timeout`, together with
/// the waiter's protocol version, when the waiter times out.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BlockKind {
    /// `BLPOP`.
    Blpop,
    /// `BRPOP`.
    Brpop,
    /// `XREAD ... BLOCK`.
    XReadBlock,
    /// `XREADGROUP ... BLOCK`.
    XReadGroupBlock,
}
68
/// How a command wants to block, if at all. Returned by
/// [`Commands::resolve`] inside [`crate::ResolvedCmd`] so the verb-table
/// lookup happens once per command. `None` is the default and carries no
/// allocation, which suits the overwhelmingly common non-blocking verbs.
///
/// `keys` is every key the conn watches (≥ 1). The dispatcher picks the
/// park strategy from them:
/// - **single key on the conn's own shard** → the in-shard fast path
///   (`BlockedClients`): register + wake without any cross-core hop.
/// - **single remote key, or any multi-key form** → the cross-shard
///   arbiter (`block_xshard`): the conn parks on its origin
///   shard and watch registrations fan out to each key's owning shard.
///
/// For `BLPOP` / `BRPOP` the keys are list keys; for `XREAD BLOCK` /
/// `XREADGROUP BLOCK` they are the STREAMS keys (in request order).
#[derive(Clone, Debug, Default)]
pub enum BlockHint {
    /// The command does not block.
    #[default]
    None,
    /// The command may park the conn until one of `keys` is written or the
    /// timeout elapses.
    Block {
        /// Which blocking command this is.
        kind: BlockKind,
        /// Watched keys (≥ 1); ordering as described on the enum.
        keys: Vec<Vec<u8>>,
        /// `0` = block forever (Redis convention). Anything else is the
        /// wall-clock millis the dispatcher will add to `unix_now_ms()` to
        /// derive the waiter's `deadline_ms`.
        timeout_ms: u64,
    },
}
97
/// One parked waiter as stored under a single watched key. `BlockedClients::add`
/// clones the same waiter into every key the conn watches, so a conn blocked
/// on N keys has N copies that differ only in which queue holds them.
pub(crate) struct BlockedClient {
    /// Connection that issued the blocking command.
    pub(crate) conn_id: u64,
    /// Which blocking command is parked; passed to `encode_block_timeout` on
    /// timeout.
    pub(crate) kind: BlockKind,
    /// Unix-ms wall clock when this waiter expires. `u64::MAX` = block forever.
    pub(crate) deadline_ms: u64,
    /// The original command, re-dispatched when the waiter is woken.
    pub(crate) argv: Argv,
    /// Protocol version captured at block time; selects RESP2 vs RESP3 for
    /// both the retried reply and the timeout nil.
    pub(crate) proto: RespVersion,
}
106
/// Blocked-waiter registry: a FIFO of waiter copies per watched key, plus a
/// secondary index from conn to the keys it registered under, so all of a
/// conn's copies can be located and removed on wake / close.
#[derive(Default)]
pub(crate) struct BlockedClients {
    /// Watched key → waiter copies in arrival order. Every method that empties
    /// a queue also removes its key, so a key is present only while at least
    /// one waiter is parked on it.
    by_key: HashMap<Vec<u8>, VecDeque<BlockedClient>>,
    /// Conn id → every key that conn registered under.
    by_conn: HashMap<u64, Vec<Vec<u8>>>,
}
113
114impl BlockedClients {
115 pub(crate) fn new() -> Self {
116 Self::default()
117 }
118
119 /// Was a write on `key` watched by any blocker? `is_empty()` short-circuit
120 /// keeps the hot push/xadd path free of map lookups when nothing's parked.
121 #[inline]
122 pub(crate) fn is_empty(&self) -> bool {
123 self.by_key.is_empty()
124 }
125
126 #[inline]
127 pub(crate) fn is_watched(&self, key: &[u8]) -> bool {
128 self.by_key.contains_key(key)
129 }
130
131 /// Register one waiter on each of `keys`. The same waiter is cloned into
132 /// every key's FIFO; the wake path drops the surviving copies via
133 /// `drop_for_conn` once any one fires (so a multi-key BLPOP woken by key
134 /// A does not also fire on a later push to key B).
135 pub(crate) fn add(
136 &mut self,
137 conn_id: u64,
138 keys: &[Vec<u8>],
139 kind: BlockKind,
140 deadline_ms: u64,
141 argv: Argv,
142 proto: RespVersion,
143 ) {
144 for key in keys {
145 let bc = BlockedClient {
146 conn_id,
147 kind,
148 deadline_ms,
149 argv: argv.clone(),
150 proto,
151 };
152 self.by_key.entry(key.clone()).or_default().push_back(bc);
153 }
154 self.by_conn.insert(conn_id, keys.to_vec());
155 }
156
157 /// Pop and return the oldest waiter on `key`. Caller must then call
158 /// `drop_for_conn(waiter.conn_id)` to scrub copies on this conn's other
159 /// watched keys (multi-key BLPOP), then retry `waiter.argv`.
160 pub(crate) fn pop_oldest_on_key(&mut self, key: &[u8]) -> Option<BlockedClient> {
161 let queue = self.by_key.get_mut(key)?;
162 let waiter = queue.pop_front();
163 if queue.is_empty() {
164 self.by_key.remove(key);
165 }
166 waiter
167 }
168
169 /// Drop every waiter copy belonging to `conn_id`. Called on (a) successful
170 /// wake (purge stale copies on other keys), and (b) connection close.
171 pub(crate) fn drop_for_conn(&mut self, conn_id: u64) {
172 let Some(keys) = self.by_conn.remove(&conn_id) else {
173 return;
174 };
175 for key in keys {
176 let Some(queue) = self.by_key.get_mut(&key) else {
177 continue;
178 };
179 queue.retain(|w| w.conn_id != conn_id);
180 if queue.is_empty() {
181 self.by_key.remove(&key);
182 }
183 }
184 }
185
186 /// Pop one representative waiter per conn whose `deadline_ms <= now_ms`.
187 /// All copies on the conn's other watched keys are removed too, so each
188 /// expired conn fires exactly one timeout reply.
189 pub(crate) fn pop_expired(&mut self, now_ms: u64) -> Vec<BlockedClient> {
190 let conns = self.expired_conn_ids(now_ms);
191 let mut out = Vec::with_capacity(conns.len());
192 for conn_id in conns {
193 if let Some(rep) = self.representative(conn_id) {
194 out.push(rep);
195 }
196 self.drop_for_conn(conn_id);
197 }
198 out
199 }
200
201 fn expired_conn_ids(&self, now_ms: u64) -> Vec<u64> {
202 let mut seen: Vec<u64> = Vec::new();
203 for queue in self.by_key.values() {
204 for w in queue {
205 if w.deadline_ms <= now_ms && !seen.contains(&w.conn_id) {
206 seen.push(w.conn_id);
207 }
208 }
209 }
210 seen
211 }
212
213 fn representative(&self, conn_id: u64) -> Option<BlockedClient> {
214 let keys = self.by_conn.get(&conn_id)?;
215 let first_key = keys.first()?;
216 let queue = self.by_key.get(first_key)?;
217 queue
218 .iter()
219 .find(|w| w.conn_id == conn_id)
220 .map(|w| BlockedClient {
221 conn_id: w.conn_id,
222 kind: w.kind,
223 deadline_ms: w.deadline_ms,
224 argv: w.argv.clone(),
225 proto: w.proto,
226 })
227 }
228}
229
230impl<C: Commands> Shard<C> {
231 /// Periodic reactor tick: fire one timeout reply per blocked waiter whose
232 /// `deadline_ms <= now`. Cheap when no one is parked (`is_empty()` short-
233 /// circuit). Called from both the epoll and io_uring reactor loops on the
234 /// same cadence as the active-TTL reaper.
235 pub(crate) fn tick_blocked_timeouts(&mut self) {
236 if self.blocked.is_empty() {
237 return;
238 }
239 let now_ms = unix_now_ms();
240 for w in self.blocked.pop_expired(now_ms) {
241 let Some(conn) = self.conns.get_mut(&w.conn_id) else {
242 continue;
243 };
244 conn.blocked = false;
245 encode_block_timeout(&mut conn.output, w.kind, w.proto);
246 self.dirty.push(w.conn_id);
247 }
248 }
249
250 /// Wake the oldest waiter on `key` (FIFO, matching Redis) and retry its
251 /// command. Called by the dispatcher after a write that may have produced
252 /// new data for blocked readers — `LPUSH` / `RPUSH` for `BLPOP` /
253 /// `BRPOP`; `XADD` for `XREAD BLOCK` / `XREADGROUP BLOCK`. The retry
254 /// re-runs the original command via `Commands::dispatch_into`; if the
255 /// data has already been consumed in a race window, the retry sees an
256 /// empty list / stream and a `None` from this fn — the waiter has
257 /// already been popped out of the registry so it stays unblocked (the
258 /// next tick or a fresh client request resolves it). One push wakes one
259 /// waiter only (Redis semantics — a single LPUSH does not feed two
260 /// BLPOP clients).
261 pub(crate) fn wake_blocked_on_key(&mut self, key: &[u8]) {
262 if self.blocked.is_empty() {
263 return;
264 }
265 let Some(waiter) = self.blocked.pop_oldest_on_key(key) else {
266 return;
267 };
268 self.blocked.drop_for_conn(waiter.conn_id);
269 let Some(conn) = self.conns.get_mut(&waiter.conn_id) else {
270 return;
271 };
272 conn.blocked = false;
273 let proto = waiter.proto;
274 match proto {
275 RespVersion::V2 => self
276 .commands
277 .dispatch_into(&mut self.store, &waiter.argv, &mut conn.output),
278 RespVersion::V3 => self
279 .commands
280 .dispatch_into_resp3(&mut self.store, &waiter.argv, &mut conn.output),
281 }
282 conn.next_emit += 1;
283 self.dirty.push(waiter.conn_id);
284 }
285}