kevy_rt/lua_wake_bridge.rs
1//! Bridge from Lua's `redis.call` writes to the runtime's blocked-
2//! waiter wake hook.
3//!
4//! ## The problem (v1.27.3 root cause)
5//!
6//! When a Lua script calls `redis.call('XADD', ...)`, the kevy-side
7//! dispatch closure in `kevy::cmd_lua` routes the call through
8//! `Commands::dispatch_into(&mut Store, ...)` directly — that hits the
9//! shard's `Store`, but it does NOT go through the runtime's
10//! `commit_write` path which is where [`Shard::wake_key`] fires for
11//! parked `BLPOP` / `BRPOP` / `XREAD BLOCK` / `BZPOPMIN` waiters.
12//!
13//! Net effect under v1.27.3-dev: BullMQ Worker's `BZPOPMIN` on the
14//! marker key, and `QueueEvents`' `XREAD BLOCK` on the events stream,
15//! both fail to wake when an EVAL script writes the trigger value.
16//! Jobs still complete (visible in `getJobCounts`) but the wake-driven
17//! pipeline is missing, leaving listeners hanging until their own
18//! timeout cycle.
19//!
20//! ## The bridge
21//!
22//! Pure thread-local buffer. The dispatch closure pushes affected
23//! write keys here after each wake-triggering `redis.call`; the
24//! runtime drains and fires `wake_key` for each after the EVAL
25//! returns. Single-threaded per shard so a `Cell<Vec<...>>` would
26//! suffice, but `RefCell<Vec<...>>` gives a cleaner `take` shape.
27//!
//! Near-zero overhead when no Lua write happens during a dispatch (the
//! buffer stays empty → the drain reduces to a single empty-check).
30
31use std::cell::RefCell;
32
thread_local! {
    /// Per-thread list of keys touched by Lua-issued writes that have not
    /// yet been handed back to the runtime.
    ///
    /// Appended to by `push_lua_wake_key` and emptied by
    /// `drain_lua_wake_buffer`. Const-initialised to an empty `Vec`, so
    /// setting it up performs no heap allocation.
    static LUA_WAKE_BUFFER: RefCell<Vec<Vec<u8>>> = const {
        RefCell::new(Vec::new())
    };
}
36
37/// Lua's `redis.call` dispatch closure calls this after every
38/// wake-triggering write (LPUSH / RPUSH / XADD / ZADD / ZINCRBY).
39/// The runtime drains via [`drain_lua_wake_buffer`] after the outer
40/// EVAL dispatch returns and fires `wake_key` for each.
41///
42/// Cheap: one thread-local lookup + one `Vec::push` per call.
43pub fn push_lua_wake_key(key: &[u8]) {
44 LUA_WAKE_BUFFER.with(|b| b.borrow_mut().push(key.to_vec()));
45}
46
47/// Drain the per-shard Lua wake buffer. The runtime calls this once
48/// after every top-level command dispatch (the EVAL/EVALSHA case is
49/// what fills it; every other verb's drain is a single empty-check).
50pub(crate) fn drain_lua_wake_buffer() -> Vec<Vec<u8>> {
51 LUA_WAKE_BUFFER.with(|b| std::mem::take(&mut *b.borrow_mut()))
52}