kevy_embedded/pubsub.rs
1//! In-process pub/sub bus for embedded `Store`.
2//!
3//! Mirrors the Redis/kevy server pub/sub semantics inside a single process:
4//! `Store::publish` walks the channel + pattern subscriber tables and
5//! enqueues a [`PubsubFrame`] onto each matching [`Subscription`]'s
6//! `std::sync::mpsc` channel. Each `Subscription` drains its own queue via
7//! [`Subscription::recv`] / [`Subscription::recv_timeout`] /
8//! [`Subscription::try_recv`].
9//!
10//! The bus lives inside `Inner` and is reached only under the embedded
11//! mutex; per-publish we clone the matching senders out, drop the lock,
12//! then `send()` — so a slow receiver can't stall publishes on unrelated
13//! channels.
14
15use std::collections::HashSet;
16use std::io;
17use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender, TryRecvError, channel};
18use std::sync::{Arc, Mutex};
19use std::time::Duration;
20
21use crate::store::Inner;
22
/// One pub/sub event delivered to a [`Subscription`].
///
/// The first four variants are acknowledgements that a `Subscription`
/// enqueues onto its own receive queue when its subscribe / unsubscribe
/// methods run; `Message` and `Pmessage` carry published payloads.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PubsubFrame {
    /// Ack: `SUBSCRIBE` succeeded on `channel`.
    Subscribe {
        /// Channel that was just subscribed.
        channel: Vec<u8>,
        /// Total channels + patterns this subscription holds after the op.
        count: usize,
    },
    /// Ack: `PSUBSCRIBE` succeeded on `pattern`.
    Psubscribe {
        /// Pattern that was just subscribed.
        pattern: Vec<u8>,
        /// Total channels + patterns this subscription holds after the op.
        count: usize,
    },
    /// Ack: `UNSUBSCRIBE` removed `channel` (or "all", when `None`).
    Unsubscribe {
        /// Channel that was just unsubscribed. `Subscription` emits `None`
        /// only when an unsubscribe-all request found no channel
        /// subscriptions to remove.
        channel: Option<Vec<u8>>,
        /// Total channels + patterns still held after the op.
        count: usize,
    },
    /// Ack: `PUNSUBSCRIBE` removed `pattern` (or "all", when `None`).
    Punsubscribe {
        /// Pattern that was just unsubscribed. `Subscription` emits `None`
        /// only when an unsubscribe-all request found no pattern
        /// subscriptions to remove.
        pattern: Option<Vec<u8>>,
        /// Total channels + patterns still held after the op.
        count: usize,
    },
    /// A `PUBLISH` reached a channel this subscription holds directly.
    Message {
        /// Channel the publish was made to.
        channel: Vec<u8>,
        /// Raw payload bytes.
        payload: Vec<u8>,
    },
    /// A `PUBLISH` reached a channel matching one of this subscription's
    /// patterns.
    Pmessage {
        /// Pattern the channel matched.
        pattern: Vec<u8>,
        /// Channel the publish was made to.
        channel: Vec<u8>,
        /// Raw payload bytes.
        payload: Vec<u8>,
    },
}
72
73// `BusEntry` + `PubsubBus` live in [`crate::pubsub_bus`] — split out so
74// this file stays under the 500-LOC house rule. Re-exported below so
75// `crate::store::Inner` keeps its existing `pubsub::PubsubBus` import.
76pub(crate) use crate::pubsub_bus::PubsubBus;
77
78/// A handle to one subscription — owns the receive end of the bus channel.
79///
80/// Drop unsubscribes from everything automatically. While the handle is
81/// alive, [`recv`](Self::recv) / [`recv_timeout`](Self::recv_timeout) /
82/// [`try_recv`](Self::try_recv) drain queued [`PubsubFrame`]s in arrival
83/// order.
84///
85/// **Threading.** `Subscription` is `Send + Sync` —
86/// `Arc<Subscription>` works, so multiple async tasks (or
87/// `spawn_blocking` jobs) can share one subscription and call `recv`
88/// concurrently. The underlying `std::sync::mpsc::Receiver` is
89/// !Sync, so we wrap it (and the matching ack `Sender`) in a `Mutex`;
90/// concurrent `recv` callers serialise on that lock, with each call
91/// receiving a *different* frame in arrival order (single-consumer
92/// semantics — NOT broadcast fanout). `try_recv` is non-blocking even
93/// under contention: if the lock is held by a blocking `recv`,
94/// `try_recv` returns `Ok(None)` rather than waiting.
95///
96/// If you need broadcast fanout (every subscriber sees every message),
97/// open a separate `Subscription` per consumer — they're cheap.
98#[allow(missing_debug_implementations)]
99pub struct Subscription {
100 inner: Arc<Mutex<Inner>>,
101 // Keeps the AOF/reaper alive as long as a Subscription does — so
102 // dropping every `Store` clone while a subscriber is still active
103 // leaves the keyspace intact until the subscriber also goes away.
104 _guard: Arc<crate::store::DropGuard>,
105 // `Receiver<T>` is `Send + !Sync`; wrap so `Subscription: Sync`.
106 // Hot path (recv) acquires + holds the lock during the blocking
107 // wait — single consumer at a time; concurrent recv callers
108 // serialise and each get a different frame. See type-level
109 // doc-comment for the trade-off.
110 receiver: Mutex<Receiver<PubsubFrame>>,
111 // `Sender<T>` is also !Sync (Send + Clone but cannot be shared by
112 // reference across threads). Wrap so the ack-frame path (called
113 // from subscribe/unsubscribe / Drop) can run from any thread.
114 sender: Mutex<Sender<PubsubFrame>>,
115 id: u64,
116 channels: HashSet<Vec<u8>>,
117 patterns: HashSet<Vec<u8>>,
118}
119
120impl Subscription {
121 pub(crate) fn new(inner: Arc<Mutex<Inner>>, guard: Arc<crate::store::DropGuard>) -> Self {
122 let (sender, receiver) = channel();
123 let id = inner
124 .lock()
125 .unwrap_or_else(|p| p.into_inner())
126 .bus
127 .alloc_id();
128 Self {
129 inner,
130 _guard: guard,
131 receiver: Mutex::new(receiver),
132 sender: Mutex::new(sender),
133 id,
134 channels: HashSet::new(),
135 patterns: HashSet::new(),
136 }
137 }
138
139 /// Clone of the inbound `Sender`. Used both for ack frames (Subscribe /
140 /// Unsubscribe / ...) and to register a sender clone inside
141 /// `PubsubBus`. Calling this acquires the sender lock briefly (~20 ns).
142 fn sender_clone(&self) -> Sender<PubsubFrame> {
143 self.sender
144 .lock()
145 .unwrap_or_else(|p| p.into_inner())
146 .clone()
147 }
148
149 /// `SUBSCRIBE channel [channel ...]`. Per-channel `Subscribe` acks are
150 /// enqueued onto the receive queue in order.
151 pub fn subscribe(&mut self, channels: &[&[u8]]) {
152 let s = self.sender_clone();
153 let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
154 for ch in channels {
155 let owned = ch.to_vec();
156 let added = g.bus.add_channel(self.id, &s, owned.clone());
157 if added {
158 self.channels.insert(owned.clone());
159 }
160 let count = g.bus.count_for(self.id);
161 let _ = s.send(PubsubFrame::Subscribe {
162 channel: owned,
163 count,
164 });
165 }
166 }
167
168 /// `PSUBSCRIBE pattern [pattern ...]`. Patterns use Redis glob syntax
169 /// (`*`, `?`, `[abc]`).
170 pub fn psubscribe(&mut self, patterns: &[&[u8]]) {
171 let s = self.sender_clone();
172 let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
173 for pat in patterns {
174 let owned = pat.to_vec();
175 let added = g.bus.add_pattern(self.id, &s, owned.clone());
176 if added {
177 self.patterns.insert(owned.clone());
178 }
179 let count = g.bus.count_for(self.id);
180 let _ = s.send(PubsubFrame::Psubscribe {
181 pattern: owned,
182 count,
183 });
184 }
185 }
186
187 /// `UNSUBSCRIBE [channel ...]`. Empty `channels` removes every channel
188 /// subscription this handle holds (matching the Redis wire shape:
189 /// individual ack frames for each channel that was actually removed,
190 /// or a single `Unsubscribe { channel: None }` if none were held).
191 pub fn unsubscribe(&mut self, channels: &[&[u8]]) {
192 if channels.is_empty() {
193 self.drain_channel_subs();
194 return;
195 }
196 let s = self.sender_clone();
197 let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
198 for ch in channels {
199 let owned = ch.to_vec();
200 let _ = g.bus.remove_channel(self.id, &owned);
201 self.channels.remove(&owned);
202 let count = g.bus.count_for(self.id);
203 let _ = s.send(PubsubFrame::Unsubscribe {
204 channel: Some(owned),
205 count,
206 });
207 }
208 }
209
210 /// `PUNSUBSCRIBE [pattern ...]`. Empty `patterns` removes every pattern.
211 pub fn punsubscribe(&mut self, patterns: &[&[u8]]) {
212 if patterns.is_empty() {
213 self.drain_pattern_subs();
214 return;
215 }
216 let s = self.sender_clone();
217 let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
218 for pat in patterns {
219 let owned = pat.to_vec();
220 let _ = g.bus.remove_pattern(self.id, &owned);
221 self.patterns.remove(&owned);
222 let count = g.bus.count_for(self.id);
223 let _ = s.send(PubsubFrame::Punsubscribe {
224 pattern: Some(owned),
225 count,
226 });
227 }
228 }
229
230 fn drain_channel_subs(&mut self) {
231 let s = self.sender_clone();
232 let owned: Vec<Vec<u8>> = self.channels.drain().collect();
233 let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
234 if owned.is_empty() {
235 let count = g.bus.count_for(self.id);
236 let _ = s.send(PubsubFrame::Unsubscribe { channel: None, count });
237 return;
238 }
239 for ch in owned {
240 let _ = g.bus.remove_channel(self.id, &ch);
241 let count = g.bus.count_for(self.id);
242 let _ = s.send(PubsubFrame::Unsubscribe {
243 channel: Some(ch),
244 count,
245 });
246 }
247 }
248
249 fn drain_pattern_subs(&mut self) {
250 let s = self.sender_clone();
251 let owned: Vec<Vec<u8>> = self.patterns.drain().collect();
252 let mut g = self.inner.lock().unwrap_or_else(|p| p.into_inner());
253 if owned.is_empty() {
254 let count = g.bus.count_for(self.id);
255 let _ = s.send(PubsubFrame::Punsubscribe { pattern: None, count });
256 return;
257 }
258 for p in owned {
259 let _ = g.bus.remove_pattern(self.id, &p);
260 let count = g.bus.count_for(self.id);
261 let _ = s.send(PubsubFrame::Punsubscribe {
262 pattern: Some(p),
263 count,
264 });
265 }
266 }
267
268 /// Block until one frame is queued. `Err(io::ErrorKind::UnexpectedEof)`
269 /// once the underlying bus tears down (last `Store` clone dropped).
270 ///
271 /// Acquires the receiver mutex for the entire blocking wait — other
272 /// `recv`/`recv_timeout` callers serialise behind this one. Concurrent
273 /// `try_recv` calls return `Ok(None)` while a `recv` is blocked (no
274 /// wait on the lock); see the type-level doc for the trade-off.
275 pub fn recv(&self) -> io::Result<PubsubFrame> {
276 let g = self.receiver.lock().unwrap_or_else(|p| p.into_inner());
277 g.recv()
278 .map_err(|_| io::Error::new(io::ErrorKind::UnexpectedEof, "bus closed"))
279 }
280
281 /// Bounded blocking recv. `Err(io::ErrorKind::TimedOut)` when `dur`
282 /// elapses; `Err(io::ErrorKind::UnexpectedEof)` when the bus is gone.
283 pub fn recv_timeout(&self, dur: Duration) -> io::Result<PubsubFrame> {
284 let g = self.receiver.lock().unwrap_or_else(|p| p.into_inner());
285 g.recv_timeout(dur).map_err(|e| match e {
286 RecvTimeoutError::Timeout => io::Error::from(io::ErrorKind::TimedOut),
287 RecvTimeoutError::Disconnected => {
288 io::Error::new(io::ErrorKind::UnexpectedEof, "bus closed")
289 }
290 })
291 }
292
293 /// Non-blocking recv. `Ok(None)` if the queue is empty;
294 /// `Err(UnexpectedEof)` when the bus is gone.
295 ///
296 /// Uses `try_lock` so a concurrent blocking `recv` doesn't make
297 /// `try_recv` itself block — lock contention is reported as `Ok(None)`
298 /// (semantically: "no frame available right now"). Same shape callers
299 /// already handle for an empty queue.
300 pub fn try_recv(&self) -> io::Result<Option<PubsubFrame>> {
301 let g = match self.receiver.try_lock() {
302 Ok(g) => g,
303 Err(_) => return Ok(None),
304 };
305 match g.try_recv() {
306 Ok(f) => Ok(Some(f)),
307 Err(TryRecvError::Empty) => Ok(None),
308 Err(TryRecvError::Disconnected) => {
309 Err(io::Error::new(io::ErrorKind::UnexpectedEof, "bus closed"))
310 }
311 }
312 }
313}
314
315impl std::fmt::Debug for Subscription {
316 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
317 f.debug_struct("Subscription")
318 .field("id", &self.id)
319 .field("channels", &self.channels.len())
320 .field("patterns", &self.patterns.len())
321 .finish_non_exhaustive()
322 }
323}
324
325impl Drop for Subscription {
326 fn drop(&mut self) {
327 // Best-effort cleanup. If the underlying Inner is poisoned we still
328 // remove our entries; the AtomicBool / send stuff doesn't care.
329 if let Ok(mut g) = self.inner.lock() {
330 g.bus.remove_all_for(self.id);
331 } else if let Ok(mut g) = self.inner.clear_poison_and_lock() {
332 // Mutex::clear_poison + reacquire is stable since Rust 1.77; we
333 // pin rust-version=1.95 so this is available. The `else` branch
334 // above is unreachable in practice given we always recover from
335 // poison ourselves; left here so the cleanup is total.
336 g.bus.remove_all_for(self.id);
337 }
338 }
339}
340
341/// Tiny helper trait so `Drop` can recover from poison without
342/// pulling in the explicit `poison.into_inner()` dance. Local to the
343/// module; not part of the public API.
344trait LockExt<'a, T> {
345 fn clear_poison_and_lock(&'a self) -> std::sync::LockResult<std::sync::MutexGuard<'a, T>>;
346}
347
348impl<'a, T> LockExt<'a, T> for Mutex<T> {
349 fn clear_poison_and_lock(&'a self) -> std::sync::LockResult<std::sync::MutexGuard<'a, T>> {
350 self.clear_poison();
351 self.lock()
352 }
353}
354
// Unit tests are kept in `pubsub_tests.rs` and compiled only for test builds.
#[cfg(test)]
#[path = "pubsub_tests.rs"]
mod tests;