Skip to main content

kevy_client/
subscribe.rs

1//! Pub/sub consumer side — a connection dedicated to receiving messages.
2//!
3//! `SUBSCRIBE` / `PSUBSCRIBE` morph a connection into a one-way event
4//! stream: the client no longer sends ordinary commands and instead reads
5//! an unbounded sequence of `subscribe`, `message`, `pmessage`,
6//! `unsubscribe`, … frames until the connection is closed. That semantic
7//! doesn't fit the one-shot `Connection::request` shape, so subscribed
8//! traffic gets its own type, [`Subscriber`].
9//!
10//! Two backends, switched on the URL:
11//! - `kevy://` / `redis://` / `tcp://` — dedicated TCP socket
12//! - `mem://<name>` / `file:///path` — in-process bus, via the URL
13//!   registry in [`crate::resolve_store`]. Anonymous `mem://` (no name)
14//!   has no bus and is rejected; use a named bus to actually receive
15//!   messages from a [`crate::Connection::publish`] on the same URL.
16//!
17//! ```no_run
18//! use kevy_client::{Subscriber, PubsubEvent};
19//!
20//! let mut sub = Subscriber::open("kevy://localhost:6379", &[b"news"])?;
21//! loop {
22//!     if let PubsubEvent::Message { channel, payload } = sub.recv()? {
23//!         println!("{}: {}", String::from_utf8_lossy(&channel),
24//!                            String::from_utf8_lossy(&payload));
25//!     }
26//! }
27//! # Ok::<(), std::io::Error>(())
28//! ```
29
30use std::io::{self, Read, Write};
31use std::net::TcpStream;
32use std::time::Duration;
33
34use kevy_embedded::Subscription;
35use kevy_resp::{Reply, encode_command};
36
37#[cfg(test)]
38use crate::subscribe_io::classify;
39use crate::subscribe_io::{frame_to_event, invalid, recv_remote, send_to, shape};
40use crate::{Target, parse_url, resolve_store};
41
42/// One subscribed connection. Owns either a TCP socket or an in-process
43/// [`Subscription`]; the variant is chosen by the URL scheme in
44/// [`Subscriber::open`] / [`Subscriber::connect`].
45#[derive(Debug)]
46pub struct Subscriber {
47    inner: Inner,
48}
49
50#[derive(Debug)]
51enum Inner {
52    /// TCP RESP2 connection, drained one reply at a time.
53    Remote {
54        stream: TcpStream,
55        buf: Vec<u8>,
56    },
57    /// In-process bus subscription. `timeout` mirrors the TCP
58    /// `SO_RCVTIMEO` behaviour for [`Subscriber::recv`] / [`Subscriber::set_read_timeout`].
59    Embedded {
60        subscription: Subscription,
61        timeout: Option<Duration>,
62    },
63}
64
65/// One pubsub frame received from the bus or the wire.
66///
67/// `Unsubscribe` / `Punsubscribe`'s `channel` / `pattern` is `None` when
68/// the server is acknowledging "unsubscribed from everything" with a nil
69/// bulk — matching the Redis wire shape.
70#[non_exhaustive]
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub enum PubsubEvent {
73    /// `SUBSCRIBE` ack — one per channel the client subscribed to.
74    Subscribe {
75        /// Channel that was just subscribed.
76        channel: Vec<u8>,
77        /// Total number of channels + patterns the connection is now subscribed to.
78        count: i64,
79    },
80    /// `PSUBSCRIBE` ack — one per pattern.
81    Psubscribe {
82        /// Pattern that was just subscribed.
83        pattern: Vec<u8>,
84        /// Total number of channels + patterns the connection is now subscribed to.
85        count: i64,
86    },
87    /// `UNSUBSCRIBE` ack — `channel: None` when the server is reporting
88    /// "no channels were subscribed" (the spec's nil bulk).
89    Unsubscribe {
90        /// Channel that was just unsubscribed (`None` for "all" / "none").
91        channel: Option<Vec<u8>>,
92        /// Total number of channels + patterns still subscribed.
93        count: i64,
94    },
95    /// `PUNSUBSCRIBE` ack — pattern `None` when the server is reporting
96    /// "no patterns were subscribed".
97    Punsubscribe {
98        /// Pattern that was just unsubscribed (`None` for "all" / "none").
99        pattern: Option<Vec<u8>>,
100        /// Total number of channels + patterns still subscribed.
101        count: i64,
102    },
103    /// Plain `PUBLISH` delivery on a subscribed channel.
104    Message {
105        /// Channel the publish was made to.
106        channel: Vec<u8>,
107        /// Raw payload bytes (no encoding assumed).
108        payload: Vec<u8>,
109    },
110    /// Pattern-match delivery: a `PUBLISH` to a channel that matched one
111    /// of this connection's patterns.
112    Pmessage {
113        /// Pattern the channel matched.
114        pattern: Vec<u8>,
115        /// Channel the publish was made to.
116        channel: Vec<u8>,
117        /// Raw payload bytes.
118        payload: Vec<u8>,
119    },
120}
121
122impl Subscriber {
123    /// Open a fresh connection without subscribing to anything yet. Call
124    /// [`Self::subscribe`] / [`Self::psubscribe`] next.
125    ///
126    /// Accepted URLs:
127    /// - `kevy://`, `redis://`, `tcp://` — TCP RESP server
128    /// - `mem://<name>`, `file:///path` — in-process shared bus
129    /// - `mem://` (anonymous), `rediss://`, `kevys://`, `redis://user:pass@…`
130    ///   are rejected with [`io::ErrorKind::Unsupported`]
131    pub fn connect(url: &str) -> io::Result<Self> {
132        let target = parse_url(url)?;
133        let inner = match target {
134            Target::EmbedMemoryAnonymous => {
135                return Err(io::Error::new(
136                    io::ErrorKind::Unsupported,
137                    "anonymous mem:// has no other producer; use mem://<name> for a shared bus",
138                ));
139            }
140            Target::EmbedMemoryNamed(_) | Target::EmbedPersist(_) => Inner::Embedded {
141                subscription: resolve_store(&target)?.subscribe(&[]),
142                timeout: None,
143            },
144            Target::Remote(remote_url) => {
145                let (host, port) = remote_host_port(&remote_url)?;
146                let stream = TcpStream::connect((host.as_str(), port))?;
147                stream.set_nodelay(true).ok();
148                Inner::Remote {
149                    stream,
150                    buf: Vec::with_capacity(8192),
151                }
152            }
153        };
154        Ok(Self { inner })
155    }
156
157    /// Open and subscribe to one or more channels in one step. Returns
158    /// `ErrorKind::InvalidInput` if `channels` is empty (use
159    /// [`Self::connect`] for an empty start).
160    pub fn open(url: &str, channels: &[&[u8]]) -> io::Result<Self> {
161        if channels.is_empty() {
162            return Err(io::Error::new(
163                io::ErrorKind::InvalidInput,
164                "Subscriber::open needs ≥ 1 channel — use Subscriber::connect() for empty start",
165            ));
166        }
167        let mut s = Self::connect(url)?;
168        s.subscribe(channels)?;
169        Ok(s)
170    }
171
172    /// `SUBSCRIBE channel [channel ...]`. Per-channel `Subscribe` acks
173    /// are delivered via [`Self::recv`].
174    pub fn subscribe(&mut self, channels: &[&[u8]]) -> io::Result<()> {
175        if channels.is_empty() {
176            return Err(io::Error::new(
177                io::ErrorKind::InvalidInput,
178                "SUBSCRIBE needs ≥ 1 channel",
179            ));
180        }
181        match &mut self.inner {
182            Inner::Remote { stream, .. } => send_to(stream, b"SUBSCRIBE", channels),
183            Inner::Embedded { subscription, .. } => {
184                subscription.subscribe(channels);
185                Ok(())
186            }
187        }
188    }
189
190    /// `PSUBSCRIBE pattern [pattern ...]`. Patterns use Redis glob syntax
191    /// (`*`, `?`, `[…]`).
192    pub fn psubscribe(&mut self, patterns: &[&[u8]]) -> io::Result<()> {
193        if patterns.is_empty() {
194            return Err(io::Error::new(
195                io::ErrorKind::InvalidInput,
196                "PSUBSCRIBE needs ≥ 1 pattern",
197            ));
198        }
199        match &mut self.inner {
200            Inner::Remote { stream, .. } => send_to(stream, b"PSUBSCRIBE", patterns),
201            Inner::Embedded { subscription, .. } => {
202                subscription.psubscribe(patterns);
203                Ok(())
204            }
205        }
206    }
207
208    /// `UNSUBSCRIBE [channel ...]`. Empty `channels` unsubscribes from
209    /// every channel (Redis wire semantics).
210    pub fn unsubscribe(&mut self, channels: &[&[u8]]) -> io::Result<()> {
211        match &mut self.inner {
212            Inner::Remote { stream, .. } => send_to(stream, b"UNSUBSCRIBE", channels),
213            Inner::Embedded { subscription, .. } => {
214                subscription.unsubscribe(channels);
215                Ok(())
216            }
217        }
218    }
219
220    /// `PUNSUBSCRIBE [pattern ...]`. Empty `patterns` unsubscribes from
221    /// every pattern.
222    pub fn punsubscribe(&mut self, patterns: &[&[u8]]) -> io::Result<()> {
223        match &mut self.inner {
224            Inner::Remote { stream, .. } => send_to(stream, b"PUNSUBSCRIBE", patterns),
225            Inner::Embedded { subscription, .. } => {
226                subscription.punsubscribe(patterns);
227                Ok(())
228            }
229        }
230    }
231
232    /// Block until the next pubsub frame arrives. Apply
233    /// [`Self::set_read_timeout`] for bounded blocking.
234    /// Connection close / bus tear-down yields `ErrorKind::UnexpectedEof`.
235    pub fn recv(&mut self) -> io::Result<PubsubEvent> {
236        match &mut self.inner {
237            Inner::Remote { stream, buf } => recv_remote(stream, buf),
238            Inner::Embedded {
239                subscription,
240                timeout,
241            } => {
242                let frame = match *timeout {
243                    Some(d) => subscription.recv_timeout(d)?,
244                    None => subscription.recv()?,
245                };
246                Ok(frame_to_event(frame))
247            }
248        }
249    }
250
251    /// Block until the next published `Message` / `Pmessage` arrives,
252    /// silently skipping subscription-acknowledgement frames
253    /// ([`PubsubEvent::Subscribe`] / [`PubsubEvent::Unsubscribe`] /
254    /// [`PubsubEvent::Psubscribe`] / [`PubsubEvent::Punsubscribe`]) along
255    /// the way.
256    ///
257    /// This is the form most callers want — almost no consumer of
258    /// pubsub needs to see the ack frames (they're a wire-protocol
259    /// detail), so a loop+match around [`Self::recv`] is essentially
260    /// boilerplate. Returns `(channel, payload)`. For pattern matches,
261    /// `channel` is the concrete channel the publisher used (matching
262    /// Redis's `pmessage` shape, where `pattern` is discarded — use
263    /// [`Self::recv`] directly if you need it).
264    ///
265    /// Errors from [`Self::recv`] (connection close, timeout, etc.)
266    /// propagate unchanged.
267    pub fn recv_message(&mut self) -> io::Result<(Vec<u8>, Vec<u8>)> {
268        loop {
269            match self.recv()? {
270                PubsubEvent::Message { channel, payload } => return Ok((channel, payload)),
271                PubsubEvent::Pmessage { channel, payload, .. } => {
272                    return Ok((channel, payload));
273                }
274                // Ack frames — keep waiting for the next real message.
275                PubsubEvent::Subscribe { .. }
276                | PubsubEvent::Psubscribe { .. }
277                | PubsubEvent::Unsubscribe { .. }
278                | PubsubEvent::Punsubscribe { .. } => continue,
279            }
280        }
281    }
282
283    /// Negotiate RESP3 on this connection by sending `HELLO 3` and
284    /// draining the ack. Subsequent `SUBSCRIBE` / `PSUBSCRIBE` /
285    /// `PUBLISH` deliveries arrive as push frames (`>N\r\n…`) instead
286    /// of the legacy RESP2 array shape (`*N\r\n…`); [`Self::recv`]
287    /// accepts both transparently, so existing code keeps working with
288    /// no other changes.
289    ///
290    /// Remote-only: the embedded backend has no proto negotiation
291    /// concept (frames go through the in-process bus typed). Calling
292    /// `hello3` on an embedded [`Subscriber`] returns
293    /// [`io::ErrorKind::Unsupported`].
294    ///
295    /// Must be called BEFORE any [`Self::subscribe`] /
296    /// [`Self::psubscribe`] — Redis requires `HELLO` be the first
297    /// command on a connection that uses it.
298    pub fn hello3(&mut self) -> io::Result<PubsubEvent> {
299        match &mut self.inner {
300            Inner::Embedded { .. } => Err(io::Error::new(
301                io::ErrorKind::Unsupported,
302                "HELLO 3 is a remote/TCP-only operation; embedded backend has no proto switch",
303            )),
304            Inner::Remote { stream, buf } => {
305                let mut frame = Vec::new();
306                encode_command(&mut frame, &[b"HELLO".to_vec(), b"3".to_vec()]);
307                stream.write_all(&frame)?;
308                // The HELLO 3 ack itself comes back as a RESP3 Map
309                // (`%7\r\n…`). parse_reply accepts it (P1); we drain
310                // and discard since the proto switch is the actual
311                // semantic — the body's just server metadata.
312                let mut chunk = [0u8; 4096];
313                loop {
314                    match kevy_resp::parse_reply(buf) {
315                        Ok(Some((reply, used))) => {
316                            buf.drain(..used);
317                            // Reply::Map / Reply::Array both acceptable
318                            // (a server that rejected V3 would emit an
319                            // Error reply — fall through to the error
320                            // branch below).
321                            return match reply {
322                                Reply::Map(_) | Reply::Array(_) => {
323                                    Ok(PubsubEvent::Subscribe {
324                                        channel: b"HELLO".to_vec(),
325                                        count: 3,
326                                    })
327                                }
328                                Reply::Error(e) => Err(io::Error::other(
329                                    String::from_utf8_lossy(&e).into_owned(),
330                                )),
331                                other => Err(invalid(format!(
332                                    "unexpected HELLO 3 reply shape: {}",
333                                    shape(&other)
334                                ))),
335                            };
336                        }
337                        Ok(None) => {}
338                        Err(_) => {
339                            return Err(io::Error::new(
340                                io::ErrorKind::InvalidData,
341                                "malformed HELLO 3 reply",
342                            ));
343                        }
344                    }
345                    let n = stream.read(&mut chunk)?;
346                    if n == 0 {
347                        return Err(io::Error::new(
348                            io::ErrorKind::UnexpectedEof,
349                            "server closed connection during HELLO 3",
350                        ));
351                    }
352                    buf.extend_from_slice(&chunk[..n]);
353                }
354            }
355        }
356    }
357
358    /// Apply (or clear) a read timeout. After setting `Some(dur)`,
359    /// [`Self::recv`] returns an `io::Error` of kind `WouldBlock` /
360    /// `TimedOut` when no frame arrives within `dur`.
361    pub fn set_read_timeout(&mut self, dur: Option<Duration>) -> io::Result<()> {
362        match &mut self.inner {
363            Inner::Remote { stream, .. } => stream.set_read_timeout(dur),
364            Inner::Embedded { timeout, .. } => {
365                *timeout = dur;
366                Ok(())
367            }
368        }
369    }
370
371    /// Borrowing iterator over every pubsub frame — ack frames included.
372    /// Each `next()` is one blocking [`Self::recv`]. Terminates (`None`)
373    /// when the underlying stream / bus is gone (`ErrorKind::UnexpectedEof`);
374    /// every other error is surfaced as `Some(Err(_))` so the caller can
375    /// decide whether to retry (e.g. a read timeout) or break.
376    ///
377    /// kevy stays 0-deps so this is a `std::iter::Iterator`, not a
378    /// `futures::Stream`. Async runtimes consume it via
379    /// `spawn_blocking` (see `docs/pubsub.md`).
380    pub fn events(&mut self) -> SubscriberEvents<'_> {
381        SubscriberEvents { sub: self }
382    }
383
384    /// Borrowing iterator that silently skips `(p)?(un)?subscribe` acks
385    /// and yields the payload tuples consumers actually want. Mirrors
386    /// [`Self::recv_message`] in iterator form. For `Pmessage` the
387    /// pattern is discarded — fall back to [`Self::events`] if you need it.
388    pub fn messages(&mut self) -> SubscriberMessages<'_> {
389        SubscriberMessages { sub: self }
390    }
391}
392
393/// Iterator returned by [`Subscriber::events`]. Yields every pubsub
394/// frame (acks + payloads). See the method docs for termination + error
395/// semantics.
396#[derive(Debug)]
397pub struct SubscriberEvents<'a> {
398    sub: &'a mut Subscriber,
399}
400
401impl Iterator for SubscriberEvents<'_> {
402    type Item = io::Result<PubsubEvent>;
403    fn next(&mut self) -> Option<Self::Item> {
404        match self.sub.recv() {
405            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => None,
406            other => Some(other),
407        }
408    }
409}
410
411/// Iterator returned by [`Subscriber::messages`]. Yields one
412/// `(channel, payload)` per published `message` / `pmessage`; ack frames
413/// are silently consumed and not yielded.
414#[derive(Debug)]
415pub struct SubscriberMessages<'a> {
416    sub: &'a mut Subscriber,
417}
418
419impl Iterator for SubscriberMessages<'_> {
420    type Item = io::Result<(Vec<u8>, Vec<u8>)>;
421    fn next(&mut self) -> Option<Self::Item> {
422        match self.sub.recv_message() {
423            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => None,
424            other => Some(other),
425        }
426    }
427}
428
429// `send_to` / `recv_remote` / `frame_to_event` / `classify` and the
430// per-field reply unwrap helpers live in [`crate::subscribe_io`] —
431// split out so this file stays under the 500-LOC house rule.
432
433// ─────────────────────────────────────────────────────────────────────────
434// Remote host:port extraction. Reuses the same authority parsing logic
435// kevy-resp-client::from_url applies, but only needs host+port (pub/sub
436// is global, not db-scoped — any /N path segment is ignored).
437// ─────────────────────────────────────────────────────────────────────────
438
439fn remote_host_port(url: &str) -> io::Result<(String, u16)> {
440    let (_scheme, rest) = url.split_once("://").ok_or_else(|| {
441        io::Error::new(io::ErrorKind::InvalidInput, "URL missing '://'")
442    })?;
443    if rest.contains('@') {
444        return Err(io::Error::new(
445            io::ErrorKind::Unsupported,
446            "userinfo (user:pass@host) is unsupported — kevy has no AUTH",
447        ));
448    }
449    let authority = rest.split('/').next().unwrap_or("");
450    let (host, port) = match authority.rsplit_once(':') {
451        Some((h, p)) => {
452            let port: u16 = p.parse().map_err(|_| {
453                io::Error::new(io::ErrorKind::InvalidInput, format!("bad port: {p}"))
454            })?;
455            (h.to_string(), port)
456        }
457        None => (authority.to_string(), 6379),
458    };
459    if host.is_empty() {
460        return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty host"));
461    }
462    Ok((host, port))
463}
464
465#[cfg(test)]
466#[path = "subscribe_tests.rs"]
467mod tests;