Skip to main content

kevy_client/
subscribe.rs

1//! Pub/sub consumer side — a connection dedicated to receiving messages.
2//!
3//! `SUBSCRIBE` / `PSUBSCRIBE` morph a connection into a one-way event
4//! stream: the client no longer sends ordinary commands and instead reads
5//! an unbounded sequence of `subscribe`, `message`, `pmessage`,
6//! `unsubscribe`, … frames until the connection is closed. That semantic
7//! doesn't fit the one-shot `Connection::request` shape, so subscribed
8//! traffic gets its own type, [`Subscriber`].
9//!
10//! Two backends, switched on the URL:
11//! - `kevy://` / `redis://` / `tcp://` — dedicated TCP socket
12//! - `mem://<name>` / `file:///path` — in-process bus, via the URL
13//!   registry in [`crate::resolve_store`]. Anonymous `mem://` (no name)
14//!   has no bus and is rejected; use a named bus to actually receive
15//!   messages from a [`crate::Connection::publish`] on the same URL.
16//!
17//! ```no_run
18//! use kevy_client::{Subscriber, PubsubEvent};
19//!
20//! let mut sub = Subscriber::open("kevy://localhost:6379", &[b"news"])?;
21//! loop {
22//!     if let PubsubEvent::Message { channel, payload } = sub.recv()? {
23//!         println!("{}: {}", String::from_utf8_lossy(&channel),
24//!                            String::from_utf8_lossy(&payload));
25//!     }
26//! }
27//! # Ok::<(), std::io::Error>(())
28//! ```
29
30use std::io::{self, Read, Write};
31use std::net::TcpStream;
32use std::time::Duration;
33
34use kevy_embedded::Subscription;
35use kevy_resp::{Reply, encode_command};
36
37#[cfg(test)]
38use crate::subscribe_io::classify;
39use crate::subscribe_io::{frame_to_event, invalid, recv_remote, send_to, shape};
40use crate::{Target, parse_url, resolve_store};
41
42/// One subscribed connection. Owns either a TCP socket or an in-process
43/// [`Subscription`]; the variant is chosen by the URL scheme in
44/// [`Subscriber::open`] / [`Subscriber::connect`].
45#[derive(Debug)]
46pub struct Subscriber {
47    inner: Inner,
48}
49
50#[derive(Debug)]
51enum Inner {
52    /// TCP RESP2 connection, drained one reply at a time.
53    Remote {
54        stream: TcpStream,
55        buf: Vec<u8>,
56    },
57    /// In-process bus subscription. `timeout` mirrors the TCP
58    /// `SO_RCVTIMEO` behaviour for [`Subscriber::recv`] / [`Subscriber::set_read_timeout`].
59    Embedded {
60        subscription: Subscription,
61        timeout: Option<Duration>,
62    },
63}
64
/// A single pub/sub frame, whether it came off the wire or the in-process bus.
///
/// The `channel` of [`PubsubEvent::Unsubscribe`] and the `pattern` of
/// [`PubsubEvent::Punsubscribe`] are `None` when the server acknowledges
/// with a nil bulk ("unsubscribed from everything"), which is the shape
/// Redis uses on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum PubsubEvent {
    /// Acknowledgement of `SUBSCRIBE`; one is produced per channel.
    Subscribe {
        /// The channel that has just been subscribed.
        channel: Vec<u8>,
        /// Number of channels + patterns the connection is subscribed to now.
        count: i64,
    },
    /// Acknowledgement of `PSUBSCRIBE`; one is produced per pattern.
    Psubscribe {
        /// The pattern that has just been subscribed.
        pattern: Vec<u8>,
        /// Number of channels + patterns the connection is subscribed to now.
        count: i64,
    },
    /// Acknowledgement of `UNSUBSCRIBE`. `channel` is `None` when the
    /// server reports that no channels were subscribed (nil bulk).
    Unsubscribe {
        /// The channel that was dropped, or `None` for "all" / "none".
        channel: Option<Vec<u8>>,
        /// Number of channels + patterns that remain subscribed.
        count: i64,
    },
    /// Acknowledgement of `PUNSUBSCRIBE`. `pattern` is `None` when the
    /// server reports that no patterns were subscribed.
    Punsubscribe {
        /// The pattern that was dropped, or `None` for "all" / "none".
        pattern: Option<Vec<u8>>,
        /// Number of channels + patterns that remain subscribed.
        count: i64,
    },
    /// A `PUBLISH` delivered on a channel this connection subscribed to.
    Message {
        /// Channel the message was published to.
        channel: Vec<u8>,
        /// Payload bytes, exactly as published (no encoding is assumed).
        payload: Vec<u8>,
    },
    /// A `PUBLISH` delivered because its channel matched one of this
    /// connection's patterns.
    Pmessage {
        /// The pattern that the channel matched.
        pattern: Vec<u8>,
        /// Channel the message was published to.
        channel: Vec<u8>,
        /// Payload bytes, exactly as published.
        payload: Vec<u8>,
    },
}
121
122impl Subscriber {
123    /// Open a fresh connection without subscribing to anything yet. Call
124    /// [`Self::subscribe`] / [`Self::psubscribe`] next.
125    ///
126    /// Accepted URLs:
127    /// - `kevy://`, `redis://`, `tcp://` — TCP RESP server
128    /// - `mem://<name>`, `file:///path` — in-process shared bus
129    /// - `mem://` (anonymous), `rediss://`, `kevys://`, `redis://user:pass@…`
130    ///   are rejected with [`io::ErrorKind::Unsupported`]
131    pub fn connect(url: &str) -> io::Result<Self> {
132        let target = parse_url(url)?;
133        let inner = match target {
134            Target::EmbedMemoryAnonymous => {
135                return Err(io::Error::new(
136                    io::ErrorKind::Unsupported,
137                    "anonymous mem:// has no other producer; use mem://<name> for a shared bus",
138                ));
139            }
140            Target::EmbedMemoryNamed(_) | Target::EmbedPersist(_) => Inner::Embedded {
141                subscription: resolve_store(&target)?.subscribe(&[]),
142                timeout: None,
143            },
144            Target::Remote(remote_url) => {
145                let (host, port) = remote_host_port(&remote_url)?;
146                let stream = TcpStream::connect((host.as_str(), port))?;
147                stream.set_nodelay(true).ok();
148                Inner::Remote {
149                    stream,
150                    buf: Vec::with_capacity(8192),
151                }
152            }
153        };
154        Ok(Self { inner })
155    }
156
157    /// Open and subscribe to one or more channels in one step. Returns
158    /// `ErrorKind::InvalidInput` if `channels` is empty (use
159    /// [`Self::connect`] for an empty start).
160    pub fn open(url: &str, channels: &[&[u8]]) -> io::Result<Self> {
161        if channels.is_empty() {
162            return Err(io::Error::new(
163                io::ErrorKind::InvalidInput,
164                "Subscriber::open needs ≥ 1 channel — use Subscriber::connect() for empty start",
165            ));
166        }
167        let mut s = Self::connect(url)?;
168        s.subscribe(channels)?;
169        Ok(s)
170    }
171
172    /// `SUBSCRIBE channel [channel ...]`. Per-channel `Subscribe` acks
173    /// are delivered via [`Self::recv`].
174    pub fn subscribe(&mut self, channels: &[&[u8]]) -> io::Result<()> {
175        if channels.is_empty() {
176            return Err(io::Error::new(
177                io::ErrorKind::InvalidInput,
178                "SUBSCRIBE needs ≥ 1 channel",
179            ));
180        }
181        match &mut self.inner {
182            Inner::Remote { stream, .. } => send_to(stream, b"SUBSCRIBE", channels),
183            Inner::Embedded { subscription, .. } => {
184                subscription.subscribe(channels);
185                Ok(())
186            }
187        }
188    }
189
190    /// `PSUBSCRIBE pattern [pattern ...]`. Patterns use Redis glob syntax
191    /// (`*`, `?`, `[…]`).
192    pub fn psubscribe(&mut self, patterns: &[&[u8]]) -> io::Result<()> {
193        if patterns.is_empty() {
194            return Err(io::Error::new(
195                io::ErrorKind::InvalidInput,
196                "PSUBSCRIBE needs ≥ 1 pattern",
197            ));
198        }
199        match &mut self.inner {
200            Inner::Remote { stream, .. } => send_to(stream, b"PSUBSCRIBE", patterns),
201            Inner::Embedded { subscription, .. } => {
202                subscription.psubscribe(patterns);
203                Ok(())
204            }
205        }
206    }
207
208    /// `UNSUBSCRIBE [channel ...]`. Empty `channels` unsubscribes from
209    /// every channel (Redis wire semantics).
210    pub fn unsubscribe(&mut self, channels: &[&[u8]]) -> io::Result<()> {
211        match &mut self.inner {
212            Inner::Remote { stream, .. } => send_to(stream, b"UNSUBSCRIBE", channels),
213            Inner::Embedded { subscription, .. } => {
214                subscription.unsubscribe(channels);
215                Ok(())
216            }
217        }
218    }
219
220    /// `PUNSUBSCRIBE [pattern ...]`. Empty `patterns` unsubscribes from
221    /// every pattern.
222    pub fn punsubscribe(&mut self, patterns: &[&[u8]]) -> io::Result<()> {
223        match &mut self.inner {
224            Inner::Remote { stream, .. } => send_to(stream, b"PUNSUBSCRIBE", patterns),
225            Inner::Embedded { subscription, .. } => {
226                subscription.punsubscribe(patterns);
227                Ok(())
228            }
229        }
230    }
231
232    /// Block until the next pubsub frame arrives. Apply
233    /// [`Self::set_read_timeout`] for bounded blocking.
234    /// Connection close / bus tear-down yields `ErrorKind::UnexpectedEof`.
235    pub fn recv(&mut self) -> io::Result<PubsubEvent> {
236        match &mut self.inner {
237            Inner::Remote { stream, buf } => recv_remote(stream, buf),
238            Inner::Embedded {
239                subscription,
240                timeout,
241            } => {
242                let frame = match *timeout {
243                    Some(d) => subscription.recv_timeout(d)?,
244                    None => subscription.recv()?,
245                };
246                Ok(frame_to_event(frame))
247            }
248        }
249    }
250
251    /// Block until the next published `Message` / `Pmessage` arrives,
252    /// silently skipping subscription-acknowledgement frames
253    /// ([`PubsubEvent::Subscribe`] / [`Unsubscribe`] / [`Psubscribe`] /
254    /// [`Punsubscribe`]) along the way.
255    ///
256    /// This is the form most callers want — almost no consumer of
257    /// pubsub needs to see the ack frames (they're a wire-protocol
258    /// detail), so a loop+match around [`Self::recv`] is essentially
259    /// boilerplate. Returns `(channel, payload)`. For pattern matches,
260    /// `channel` is the concrete channel the publisher used (matching
261    /// Redis's `pmessage` shape, where `pattern` is discarded — use
262    /// [`Self::recv`] directly if you need it).
263    ///
264    /// Errors from [`Self::recv`] (connection close, timeout, etc.)
265    /// propagate unchanged.
266    pub fn recv_message(&mut self) -> io::Result<(Vec<u8>, Vec<u8>)> {
267        loop {
268            match self.recv()? {
269                PubsubEvent::Message { channel, payload } => return Ok((channel, payload)),
270                PubsubEvent::Pmessage { channel, payload, .. } => {
271                    return Ok((channel, payload));
272                }
273                // Ack frames — keep waiting for the next real message.
274                PubsubEvent::Subscribe { .. }
275                | PubsubEvent::Psubscribe { .. }
276                | PubsubEvent::Unsubscribe { .. }
277                | PubsubEvent::Punsubscribe { .. } => continue,
278            }
279        }
280    }
281
282    /// Negotiate RESP3 on this connection by sending `HELLO 3` and
283    /// draining the ack. Subsequent `SUBSCRIBE` / `PSUBSCRIBE` /
284    /// `PUBLISH` deliveries arrive as push frames (`>N\r\n…`) instead
285    /// of the legacy RESP2 array shape (`*N\r\n…`); [`Self::recv`]
286    /// accepts both transparently, so existing code keeps working with
287    /// no other changes.
288    ///
289    /// Remote-only: the embedded backend has no proto negotiation
290    /// concept (frames go through the in-process bus typed). Calling
291    /// `hello3` on an embedded [`Subscriber`] returns
292    /// [`io::ErrorKind::Unsupported`].
293    ///
294    /// Must be called BEFORE any [`Self::subscribe`] /
295    /// [`Self::psubscribe`] — Redis requires `HELLO` be the first
296    /// command on a connection that uses it.
297    pub fn hello3(&mut self) -> io::Result<PubsubEvent> {
298        match &mut self.inner {
299            Inner::Embedded { .. } => Err(io::Error::new(
300                io::ErrorKind::Unsupported,
301                "HELLO 3 is a remote/TCP-only operation; embedded backend has no proto switch",
302            )),
303            Inner::Remote { stream, buf } => {
304                let mut frame = Vec::new();
305                encode_command(&mut frame, &[b"HELLO".to_vec(), b"3".to_vec()]);
306                stream.write_all(&frame)?;
307                // The HELLO 3 ack itself comes back as a RESP3 Map
308                // (`%7\r\n…`). parse_reply accepts it (P1); we drain
309                // and discard since the proto switch is the actual
310                // semantic — the body's just server metadata.
311                let mut chunk = [0u8; 4096];
312                loop {
313                    match kevy_resp::parse_reply(buf) {
314                        Ok(Some((reply, used))) => {
315                            buf.drain(..used);
316                            // Reply::Map / Reply::Array both acceptable
317                            // (a server that rejected V3 would emit an
318                            // Error reply — fall through to the error
319                            // branch below).
320                            return match reply {
321                                Reply::Map(_) | Reply::Array(_) => {
322                                    Ok(PubsubEvent::Subscribe {
323                                        channel: b"HELLO".to_vec(),
324                                        count: 3,
325                                    })
326                                }
327                                Reply::Error(e) => Err(io::Error::other(
328                                    String::from_utf8_lossy(&e).into_owned(),
329                                )),
330                                other => Err(invalid(format!(
331                                    "unexpected HELLO 3 reply shape: {}",
332                                    shape(&other)
333                                ))),
334                            };
335                        }
336                        Ok(None) => {}
337                        Err(_) => {
338                            return Err(io::Error::new(
339                                io::ErrorKind::InvalidData,
340                                "malformed HELLO 3 reply",
341                            ));
342                        }
343                    }
344                    let n = stream.read(&mut chunk)?;
345                    if n == 0 {
346                        return Err(io::Error::new(
347                            io::ErrorKind::UnexpectedEof,
348                            "server closed connection during HELLO 3",
349                        ));
350                    }
351                    buf.extend_from_slice(&chunk[..n]);
352                }
353            }
354        }
355    }
356
357    /// Apply (or clear) a read timeout. After setting `Some(dur)`,
358    /// [`Self::recv`] returns an `io::Error` of kind `WouldBlock` /
359    /// `TimedOut` when no frame arrives within `dur`.
360    pub fn set_read_timeout(&mut self, dur: Option<Duration>) -> io::Result<()> {
361        match &mut self.inner {
362            Inner::Remote { stream, .. } => stream.set_read_timeout(dur),
363            Inner::Embedded { timeout, .. } => {
364                *timeout = dur;
365                Ok(())
366            }
367        }
368    }
369
370    /// Borrowing iterator over every pubsub frame — ack frames included.
371    /// Each `next()` is one blocking [`Self::recv`]. Terminates (`None`)
372    /// when the underlying stream / bus is gone (`ErrorKind::UnexpectedEof`);
373    /// every other error is surfaced as `Some(Err(_))` so the caller can
374    /// decide whether to retry (e.g. a read timeout) or break.
375    ///
376    /// kevy stays 0-deps so this is a `std::iter::Iterator`, not a
377    /// `futures::Stream`. Async runtimes consume it via
378    /// `spawn_blocking` (see `docs/pubsub.md`).
379    pub fn events(&mut self) -> SubscriberEvents<'_> {
380        SubscriberEvents { sub: self }
381    }
382
383    /// Borrowing iterator that silently skips `(p)?(un)?subscribe` acks
384    /// and yields the payload tuples consumers actually want. Mirrors
385    /// [`Self::recv_message`] in iterator form. For `Pmessage` the
386    /// pattern is discarded — fall back to [`Self::events`] if you need it.
387    pub fn messages(&mut self) -> SubscriberMessages<'_> {
388        SubscriberMessages { sub: self }
389    }
390}
391
392/// Iterator returned by [`Subscriber::events`]. Yields every pubsub
393/// frame (acks + payloads). See the method docs for termination + error
394/// semantics.
395#[derive(Debug)]
396pub struct SubscriberEvents<'a> {
397    sub: &'a mut Subscriber,
398}
399
400impl Iterator for SubscriberEvents<'_> {
401    type Item = io::Result<PubsubEvent>;
402    fn next(&mut self) -> Option<Self::Item> {
403        match self.sub.recv() {
404            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => None,
405            other => Some(other),
406        }
407    }
408}
409
410/// Iterator returned by [`Subscriber::messages`]. Yields one
411/// `(channel, payload)` per published `message` / `pmessage`; ack frames
412/// are silently consumed and not yielded.
413#[derive(Debug)]
414pub struct SubscriberMessages<'a> {
415    sub: &'a mut Subscriber,
416}
417
418impl Iterator for SubscriberMessages<'_> {
419    type Item = io::Result<(Vec<u8>, Vec<u8>)>;
420    fn next(&mut self) -> Option<Self::Item> {
421        match self.sub.recv_message() {
422            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => None,
423            other => Some(other),
424        }
425    }
426}
427
428// `send_to` / `recv_remote` / `frame_to_event` / `classify` and the
429// per-field reply unwrap helpers live in [`crate::subscribe_io`] —
430// split out so this file stays under the 500-LOC house rule.
431
432// ─────────────────────────────────────────────────────────────────────────
433// Remote host:port extraction. Reuses the same authority parsing logic
434// kevy-resp-client::from_url applies, but only needs host+port (pub/sub
435// is global, not db-scoped — any /N path segment is ignored).
436// ─────────────────────────────────────────────────────────────────────────
437
/// Extract `(host, port)` from a remote URL shaped like
/// `scheme://authority[/path][?query][#fragment]`.
///
/// Only the authority is inspected; the scheme and anything after the
/// authority (path, query, fragment) are ignored. The port defaults to
/// 6379 when the authority does not carry one. A bracketed IPv6 literal
/// (`[::1]` or `[::1]:6379`) is accepted and returned WITHOUT the
/// brackets, which is the form `TcpStream::connect((host, port))` can
/// parse as an address.
///
/// # Errors
///
/// - `ErrorKind::InvalidInput` — no `://`, an unparsable port, an empty
///   host, or a malformed bracketed host.
/// - `ErrorKind::Unsupported` — the authority contains userinfo
///   (`user:pass@host`).
fn remote_host_port(url: &str) -> io::Result<(String, u16)> {
    const DEFAULT_PORT: u16 = 6379;
    let invalid_input = |msg: String| io::Error::new(io::ErrorKind::InvalidInput, msg);

    let (_scheme, rest) = url
        .split_once("://")
        .ok_or_else(|| invalid_input("URL missing '://'".to_owned()))?;

    // The authority ends at the first '/', '?' or '#'. Everything after
    // that is ignored, so it must not influence host/port/userinfo checks.
    let authority = rest
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or("");

    if authority.contains('@') {
        return Err(io::Error::new(
            io::ErrorKind::Unsupported,
            "userinfo (user:pass@host) is unsupported — kevy has no AUTH",
        ));
    }

    let (host, port_text) = match authority.strip_prefix('[') {
        // Bracketed IPv6 literal: the colons inside the brackets belong
        // to the address, so the port separator is looked for only
        // after the closing ']'.
        Some(bracketed) => {
            let (addr, tail) = bracketed.split_once(']').ok_or_else(|| {
                invalid_input(format!("unterminated '[' in host: {authority}"))
            })?;
            let port_text = if tail.is_empty() {
                None
            } else {
                Some(tail.strip_prefix(':').ok_or_else(|| {
                    invalid_input(format!("unexpected text after ']': {tail}"))
                })?)
            };
            (addr, port_text)
        }
        None => match authority.rsplit_once(':') {
            Some((h, p)) => (h, Some(p)),
            None => (authority, None),
        },
    };

    let port = match port_text {
        Some(p) => p
            .parse::<u16>()
            .map_err(|_| invalid_input(format!("bad port: {p}")))?,
        None => DEFAULT_PORT,
    };

    if host.is_empty() {
        return Err(invalid_input("empty host".to_owned()));
    }
    Ok((host.to_owned(), port))
}
463
464#[cfg(test)]
465#[path = "subscribe_tests.rs"]
466mod tests;