Skip to main content

kevy_embedded/
pubsub.rs

1//! In-process pub/sub bus for embedded `Store`.
2//!
3//! Mirrors the Redis/kevy server pub/sub semantics inside a single process:
4//! `Store::publish` walks the channel + pattern subscriber tables and
5//! enqueues a [`PubsubFrame`] onto each matching [`Subscription`]'s
6//! `std::sync::mpsc` channel. Each `Subscription` drains its own queue via
7//! [`Subscription::recv`] / [`Subscription::recv_timeout`] /
8//! [`Subscription::try_recv`].
9//!
10//! The bus lives inside `Inner` and is reached only under the embedded
11//! mutex; per-publish we clone the matching senders out, drop the lock,
12//! then `send()` — so a slow receiver can't stall publishes on unrelated
13//! channels.
14
15use std::collections::HashSet;
16use std::io;
17use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender, TryRecvError, channel};
18use std::sync::{Arc, Mutex, RwLock};
19use std::time::Duration;
20
21use crate::store::Inner;
22
23/// One pub/sub event delivered to a [`Subscription`].
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub enum PubsubFrame {
26    /// Ack: `SUBSCRIBE` succeeded on `channel`.
27    Subscribe {
28        /// Channel that was just subscribed.
29        channel: Vec<u8>,
30        /// Total channels + patterns this subscription holds after the op.
31        count: usize,
32    },
33    /// Ack: `PSUBSCRIBE` succeeded on `pattern`.
34    Psubscribe {
35        /// Pattern that was just subscribed.
36        pattern: Vec<u8>,
37        /// Total channels + patterns this subscription holds after the op.
38        count: usize,
39    },
40    /// Ack: `UNSUBSCRIBE` removed `channel` (or "all", when `None`).
41    Unsubscribe {
42        /// Channel that was just unsubscribed (`None` = "all").
43        channel: Option<Vec<u8>>,
44        /// Total channels + patterns still held after the op.
45        count: usize,
46    },
47    /// Ack: `PUNSUBSCRIBE` removed `pattern` (or "all", when `None`).
48    Punsubscribe {
49        /// Pattern that was just unsubscribed (`None` = "all").
50        pattern: Option<Vec<u8>>,
51        /// Total channels + patterns still held after the op.
52        count: usize,
53    },
54    /// A `PUBLISH` reached a channel this subscription holds directly.
55    Message {
56        /// Channel the publish was made to.
57        channel: Vec<u8>,
58        /// Raw payload bytes.
59        payload: Vec<u8>,
60    },
61    /// A `PUBLISH` reached a channel matching one of this subscription's
62    /// patterns.
63    Pmessage {
64        /// Pattern the channel matched.
65        pattern: Vec<u8>,
66        /// Channel the publish was made to.
67        channel: Vec<u8>,
68        /// Raw payload bytes.
69        payload: Vec<u8>,
70    },
71}
72
73// `BusEntry` + `PubsubBus` live in [`crate::pubsub_bus`] — split out so
74// this file stays under the 500-LOC house rule. Re-exported below so
75// `crate::store::Inner` keeps its existing `pubsub::PubsubBus` import.
76pub(crate) use crate::pubsub_bus::PubsubBus;
77
78/// A handle to one subscription — owns the receive end of the bus channel.
79///
80/// Drop unsubscribes from everything automatically. While the handle is
81/// alive, [`recv`](Self::recv) / [`recv_timeout`](Self::recv_timeout) /
82/// [`try_recv`](Self::try_recv) drain queued [`PubsubFrame`]s in arrival
83/// order.
84///
85/// **Threading.** `Subscription` is `Send + Sync` —
86/// `Arc<Subscription>` works, so multiple async tasks (or
87/// `spawn_blocking` jobs) can share one subscription and call `recv`
88/// concurrently. The underlying `std::sync::mpsc::Receiver` is
89/// !Sync, so we wrap it (and the matching ack `Sender`) in a `Mutex`;
90/// concurrent `recv` callers serialise on that lock, with each call
91/// receiving a *different* frame in arrival order (single-consumer
92/// semantics — NOT broadcast fanout). `try_recv` is non-blocking even
93/// under contention: if the lock is held by a blocking `recv`,
94/// `try_recv` returns `Ok(None)` rather than waiting.
95///
96/// If you need broadcast fanout (every subscriber sees every message),
97/// open a separate `Subscription` per consumer — they're cheap.
98#[allow(missing_debug_implementations)]
99pub struct Subscription {
100    inner: Arc<RwLock<Inner>>,
101    // Keeps the AOF/reaper alive as long as a Subscription does — so
102    // dropping every `Store` clone while a subscriber is still active
103    // leaves the keyspace intact until the subscriber also goes away.
104    _guard: Arc<crate::store::DropGuard>,
105    // `Receiver<T>` is `Send + !Sync`; wrap so `Subscription: Sync`.
106    // Hot path (recv) acquires + holds the lock during the blocking
107    // wait — single consumer at a time; concurrent recv callers
108    // serialise and each get a different frame. See type-level
109    // doc-comment for the trade-off.
110    receiver: Mutex<Receiver<PubsubFrame>>,
111    // `Sender<T>` is also !Sync (Send + Clone but cannot be shared by
112    // reference across threads). Wrap so the ack-frame path (called
113    // from subscribe/unsubscribe / Drop) can run from any thread.
114    sender: Mutex<Sender<PubsubFrame>>,
115    id: u64,
116    channels: HashSet<Vec<u8>>,
117    patterns: HashSet<Vec<u8>>,
118}
119
120impl Subscription {
121    pub(crate) fn new(inner: Arc<RwLock<Inner>>, guard: Arc<crate::store::DropGuard>) -> Self {
122        let (sender, receiver) = channel();
123        let id = inner
124            .write()
125            .unwrap_or_else(|p| p.into_inner())
126            .bus
127            .alloc_id();
128        Self {
129            inner,
130            _guard: guard,
131            receiver: Mutex::new(receiver),
132            sender: Mutex::new(sender),
133            id,
134            channels: HashSet::new(),
135            patterns: HashSet::new(),
136        }
137    }
138
139    /// Clone of the inbound `Sender`. Used both for ack frames (Subscribe /
140    /// Unsubscribe / ...) and to register a sender clone inside
141    /// `PubsubBus`. Calling this acquires the sender lock briefly (~20 ns).
142    fn sender_clone(&self) -> Sender<PubsubFrame> {
143        self.sender
144            .lock()
145            .unwrap_or_else(|p| p.into_inner())
146            .clone()
147    }
148
149    /// `SUBSCRIBE channel [channel ...]`. Per-channel `Subscribe` acks are
150    /// enqueued onto the receive queue in order.
151    pub fn subscribe(&mut self, channels: &[&[u8]]) {
152        let s = self.sender_clone();
153        let mut g = self.inner.write().unwrap_or_else(|p| p.into_inner());
154        for ch in channels {
155            let owned = ch.to_vec();
156            let added = g.bus.add_channel(self.id, &s, owned.clone());
157            if added {
158                self.channels.insert(owned.clone());
159            }
160            let count = g.bus.count_for(self.id);
161            let _ = s.send(PubsubFrame::Subscribe {
162                channel: owned,
163                count,
164            });
165        }
166    }
167
168    /// `PSUBSCRIBE pattern [pattern ...]`. Patterns use Redis glob syntax
169    /// (`*`, `?`, `[abc]`).
170    pub fn psubscribe(&mut self, patterns: &[&[u8]]) {
171        let s = self.sender_clone();
172        let mut g = self.inner.write().unwrap_or_else(|p| p.into_inner());
173        for pat in patterns {
174            let owned = pat.to_vec();
175            let added = g.bus.add_pattern(self.id, &s, owned.clone());
176            if added {
177                self.patterns.insert(owned.clone());
178            }
179            let count = g.bus.count_for(self.id);
180            let _ = s.send(PubsubFrame::Psubscribe {
181                pattern: owned,
182                count,
183            });
184        }
185    }
186
187    /// `UNSUBSCRIBE [channel ...]`. Empty `channels` removes every channel
188    /// subscription this handle holds (matching the Redis wire shape:
189    /// individual ack frames for each channel that was actually removed,
190    /// or a single `Unsubscribe { channel: None }` if none were held).
191    pub fn unsubscribe(&mut self, channels: &[&[u8]]) {
192        if channels.is_empty() {
193            self.drain_channel_subs();
194            return;
195        }
196        let s = self.sender_clone();
197        let mut g = self.inner.write().unwrap_or_else(|p| p.into_inner());
198        for ch in channels {
199            let owned = ch.to_vec();
200            let _ = g.bus.remove_channel(self.id, &owned);
201            self.channels.remove(&owned);
202            let count = g.bus.count_for(self.id);
203            let _ = s.send(PubsubFrame::Unsubscribe {
204                channel: Some(owned),
205                count,
206            });
207        }
208    }
209
210    /// `PUNSUBSCRIBE [pattern ...]`. Empty `patterns` removes every pattern.
211    pub fn punsubscribe(&mut self, patterns: &[&[u8]]) {
212        if patterns.is_empty() {
213            self.drain_pattern_subs();
214            return;
215        }
216        let s = self.sender_clone();
217        let mut g = self.inner.write().unwrap_or_else(|p| p.into_inner());
218        for pat in patterns {
219            let owned = pat.to_vec();
220            let _ = g.bus.remove_pattern(self.id, &owned);
221            self.patterns.remove(&owned);
222            let count = g.bus.count_for(self.id);
223            let _ = s.send(PubsubFrame::Punsubscribe {
224                pattern: Some(owned),
225                count,
226            });
227        }
228    }
229
230    fn drain_channel_subs(&mut self) {
231        let s = self.sender_clone();
232        let owned: Vec<Vec<u8>> = self.channels.drain().collect();
233        let mut g = self.inner.write().unwrap_or_else(|p| p.into_inner());
234        if owned.is_empty() {
235            let count = g.bus.count_for(self.id);
236            let _ = s.send(PubsubFrame::Unsubscribe { channel: None, count });
237            return;
238        }
239        for ch in owned {
240            let _ = g.bus.remove_channel(self.id, &ch);
241            let count = g.bus.count_for(self.id);
242            let _ = s.send(PubsubFrame::Unsubscribe {
243                channel: Some(ch),
244                count,
245            });
246        }
247    }
248
249    fn drain_pattern_subs(&mut self) {
250        let s = self.sender_clone();
251        let owned: Vec<Vec<u8>> = self.patterns.drain().collect();
252        let mut g = self.inner.write().unwrap_or_else(|p| p.into_inner());
253        if owned.is_empty() {
254            let count = g.bus.count_for(self.id);
255            let _ = s.send(PubsubFrame::Punsubscribe { pattern: None, count });
256            return;
257        }
258        for p in owned {
259            let _ = g.bus.remove_pattern(self.id, &p);
260            let count = g.bus.count_for(self.id);
261            let _ = s.send(PubsubFrame::Punsubscribe {
262                pattern: Some(p),
263                count,
264            });
265        }
266    }
267
268    /// Block until one frame is queued. `Err(io::ErrorKind::UnexpectedEof)`
269    /// once the underlying bus tears down (last `Store` clone dropped).
270    ///
271    /// Acquires the receiver mutex for the entire blocking wait — other
272    /// `recv`/`recv_timeout` callers serialise behind this one. Concurrent
273    /// `try_recv` calls return `Ok(None)` while a `recv` is blocked (no
274    /// wait on the lock); see the type-level doc for the trade-off.
275    pub fn recv(&self) -> io::Result<PubsubFrame> {
276        let g = self.receiver.lock().unwrap_or_else(|p| p.into_inner());
277        g.recv()
278            .map_err(|_| io::Error::new(io::ErrorKind::UnexpectedEof, "bus closed"))
279    }
280
281    /// Bounded blocking recv. `Err(io::ErrorKind::TimedOut)` when `dur`
282    /// elapses; `Err(io::ErrorKind::UnexpectedEof)` when the bus is gone.
283    pub fn recv_timeout(&self, dur: Duration) -> io::Result<PubsubFrame> {
284        let g = self.receiver.lock().unwrap_or_else(|p| p.into_inner());
285        g.recv_timeout(dur).map_err(|e| match e {
286            RecvTimeoutError::Timeout => io::Error::from(io::ErrorKind::TimedOut),
287            RecvTimeoutError::Disconnected => {
288                io::Error::new(io::ErrorKind::UnexpectedEof, "bus closed")
289            }
290        })
291    }
292
293    /// Non-blocking recv. `Ok(None)` if the queue is empty;
294    /// `Err(UnexpectedEof)` when the bus is gone.
295    ///
296    /// Uses `try_lock` so a concurrent blocking `recv` doesn't make
297    /// `try_recv` itself block — lock contention is reported as `Ok(None)`
298    /// (semantically: "no frame available right now"). Same shape callers
299    /// already handle for an empty queue.
300    pub fn try_recv(&self) -> io::Result<Option<PubsubFrame>> {
301        let g = match self.receiver.try_lock() {
302            Ok(g) => g,
303            Err(_) => return Ok(None),
304        };
305        match g.try_recv() {
306            Ok(f) => Ok(Some(f)),
307            Err(TryRecvError::Empty) => Ok(None),
308            Err(TryRecvError::Disconnected) => {
309                Err(io::Error::new(io::ErrorKind::UnexpectedEof, "bus closed"))
310            }
311        }
312    }
313}
314
315impl std::fmt::Debug for Subscription {
316    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
317        f.debug_struct("Subscription")
318            .field("id", &self.id)
319            .field("channels", &self.channels.len())
320            .field("patterns", &self.patterns.len())
321            .finish_non_exhaustive()
322    }
323}
324
325impl Drop for Subscription {
326    fn drop(&mut self) {
327        // Best-effort cleanup. Recover from poison (a panic elsewhere left the
328        // bus intact) so our entries are always removed.
329        let mut g = self.inner.write().unwrap_or_else(|p| p.into_inner());
330        g.bus.remove_all_for(self.id);
331    }
332}
333
334#[cfg(test)]
335#[path = "pubsub_tests.rs"]
336mod tests;