kevy_client/subscribe.rs
1//! Pub/sub consumer side — a connection dedicated to receiving messages.
2//!
3//! `SUBSCRIBE` / `PSUBSCRIBE` morph a connection into a one-way event
4//! stream: the client no longer sends ordinary commands and instead reads
5//! an unbounded sequence of `subscribe`, `message`, `pmessage`,
6//! `unsubscribe`, … frames until the connection is closed. That semantic
7//! doesn't fit the one-shot `Connection::request` shape, so subscribed
8//! traffic gets its own type, [`Subscriber`].
9//!
10//! Two backends, switched on the URL:
11//! - `kevy://` / `redis://` / `tcp://` — dedicated TCP socket
12//! - `mem://<name>` / `file:///path` — in-process bus, via the URL
13//! registry in [`crate::resolve_store`]. Anonymous `mem://` (no name)
14//! has no bus and is rejected; use a named bus to actually receive
15//! messages from a [`crate::Connection::publish`] on the same URL.
16//!
17//! ```no_run
18//! use kevy_client::{Subscriber, PubsubEvent};
19//!
20//! let mut sub = Subscriber::open("kevy://localhost:6379", &[b"news"])?;
21//! loop {
22//! if let PubsubEvent::Message { channel, payload } = sub.recv()? {
23//! println!("{}: {}", String::from_utf8_lossy(&channel),
24//! String::from_utf8_lossy(&payload));
25//! }
26//! }
27//! # Ok::<(), std::io::Error>(())
28//! ```
29
30use std::io::{self, Read, Write};
31use std::net::TcpStream;
32use std::time::Duration;
33
34use kevy_embedded::Subscription;
35use kevy_resp::{Reply, encode_command};
36
37#[cfg(test)]
38use crate::subscribe_io::classify;
39use crate::subscribe_io::{frame_to_event, invalid, recv_remote, send_to, shape};
40use crate::{Target, parse_url, resolve_store};
41
42/// One subscribed connection. Owns either a TCP socket or an in-process
43/// [`Subscription`]; the variant is chosen by the URL scheme in
44/// [`Subscriber::open`] / [`Subscriber::connect`].
45#[derive(Debug)]
46pub struct Subscriber {
47 inner: Inner,
48}
49
50#[derive(Debug)]
51enum Inner {
52 /// TCP RESP2 connection, drained one reply at a time.
53 Remote {
54 stream: TcpStream,
55 buf: Vec<u8>,
56 },
57 /// In-process bus subscription. `timeout` mirrors the TCP
58 /// `SO_RCVTIMEO` behaviour for [`Subscriber::recv`] / [`Subscriber::set_read_timeout`].
59 Embedded {
60 subscription: Subscription,
61 timeout: Option<Duration>,
62 },
63}
64
65/// One pubsub frame received from the bus or the wire.
66///
67/// `Unsubscribe` / `Punsubscribe`'s `channel` / `pattern` is `None` when
68/// the server is acknowledging "unsubscribed from everything" with a nil
69/// bulk — matching the Redis wire shape.
70#[non_exhaustive]
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub enum PubsubEvent {
73 /// `SUBSCRIBE` ack — one per channel the client subscribed to.
74 Subscribe {
75 /// Channel that was just subscribed.
76 channel: Vec<u8>,
77 /// Total number of channels + patterns the connection is now subscribed to.
78 count: i64,
79 },
80 /// `PSUBSCRIBE` ack — one per pattern.
81 Psubscribe {
82 /// Pattern that was just subscribed.
83 pattern: Vec<u8>,
84 /// Total number of channels + patterns the connection is now subscribed to.
85 count: i64,
86 },
87 /// `UNSUBSCRIBE` ack — `channel: None` when the server is reporting
88 /// "no channels were subscribed" (the spec's nil bulk).
89 Unsubscribe {
90 /// Channel that was just unsubscribed (`None` for "all" / "none").
91 channel: Option<Vec<u8>>,
92 /// Total number of channels + patterns still subscribed.
93 count: i64,
94 },
95 /// `PUNSUBSCRIBE` ack — pattern `None` when the server is reporting
96 /// "no patterns were subscribed".
97 Punsubscribe {
98 /// Pattern that was just unsubscribed (`None` for "all" / "none").
99 pattern: Option<Vec<u8>>,
100 /// Total number of channels + patterns still subscribed.
101 count: i64,
102 },
103 /// Plain `PUBLISH` delivery on a subscribed channel.
104 Message {
105 /// Channel the publish was made to.
106 channel: Vec<u8>,
107 /// Raw payload bytes (no encoding assumed).
108 payload: Vec<u8>,
109 },
110 /// Pattern-match delivery: a `PUBLISH` to a channel that matched one
111 /// of this connection's patterns.
112 Pmessage {
113 /// Pattern the channel matched.
114 pattern: Vec<u8>,
115 /// Channel the publish was made to.
116 channel: Vec<u8>,
117 /// Raw payload bytes.
118 payload: Vec<u8>,
119 },
120}
121
122impl Subscriber {
123 /// Open a fresh connection without subscribing to anything yet. Call
124 /// [`Self::subscribe`] / [`Self::psubscribe`] next.
125 ///
126 /// Accepted URLs:
127 /// - `kevy://`, `redis://`, `tcp://` — TCP RESP server
128 /// - `mem://<name>`, `file:///path` — in-process shared bus
129 /// - `mem://` (anonymous), `rediss://`, `kevys://`, `redis://user:pass@…`
130 /// are rejected with [`io::ErrorKind::Unsupported`]
131 pub fn connect(url: &str) -> io::Result<Self> {
132 let target = parse_url(url)?;
133 let inner = match target {
134 Target::EmbedMemoryAnonymous => {
135 return Err(io::Error::new(
136 io::ErrorKind::Unsupported,
137 "anonymous mem:// has no other producer; use mem://<name> for a shared bus",
138 ));
139 }
140 Target::EmbedMemoryNamed(_) | Target::EmbedPersist(_) => Inner::Embedded {
141 subscription: resolve_store(&target)?.subscribe(&[]),
142 timeout: None,
143 },
144 Target::Remote(remote_url) => {
145 let (host, port) = remote_host_port(&remote_url)?;
146 let stream = TcpStream::connect((host.as_str(), port))?;
147 stream.set_nodelay(true).ok();
148 Inner::Remote {
149 stream,
150 buf: Vec::with_capacity(8192),
151 }
152 }
153 };
154 Ok(Self { inner })
155 }
156
157 /// Open and subscribe to one or more channels in one step. Returns
158 /// `ErrorKind::InvalidInput` if `channels` is empty (use
159 /// [`Self::connect`] for an empty start).
160 pub fn open(url: &str, channels: &[&[u8]]) -> io::Result<Self> {
161 if channels.is_empty() {
162 return Err(io::Error::new(
163 io::ErrorKind::InvalidInput,
164 "Subscriber::open needs ≥ 1 channel — use Subscriber::connect() for empty start",
165 ));
166 }
167 let mut s = Self::connect(url)?;
168 s.subscribe(channels)?;
169 Ok(s)
170 }
171
172 /// `SUBSCRIBE channel [channel ...]`. Per-channel `Subscribe` acks
173 /// are delivered via [`Self::recv`].
174 pub fn subscribe(&mut self, channels: &[&[u8]]) -> io::Result<()> {
175 if channels.is_empty() {
176 return Err(io::Error::new(
177 io::ErrorKind::InvalidInput,
178 "SUBSCRIBE needs ≥ 1 channel",
179 ));
180 }
181 match &mut self.inner {
182 Inner::Remote { stream, .. } => send_to(stream, b"SUBSCRIBE", channels),
183 Inner::Embedded { subscription, .. } => {
184 subscription.subscribe(channels);
185 Ok(())
186 }
187 }
188 }
189
190 /// `PSUBSCRIBE pattern [pattern ...]`. Patterns use Redis glob syntax
191 /// (`*`, `?`, `[…]`).
192 pub fn psubscribe(&mut self, patterns: &[&[u8]]) -> io::Result<()> {
193 if patterns.is_empty() {
194 return Err(io::Error::new(
195 io::ErrorKind::InvalidInput,
196 "PSUBSCRIBE needs ≥ 1 pattern",
197 ));
198 }
199 match &mut self.inner {
200 Inner::Remote { stream, .. } => send_to(stream, b"PSUBSCRIBE", patterns),
201 Inner::Embedded { subscription, .. } => {
202 subscription.psubscribe(patterns);
203 Ok(())
204 }
205 }
206 }
207
208 /// `UNSUBSCRIBE [channel ...]`. Empty `channels` unsubscribes from
209 /// every channel (Redis wire semantics).
210 pub fn unsubscribe(&mut self, channels: &[&[u8]]) -> io::Result<()> {
211 match &mut self.inner {
212 Inner::Remote { stream, .. } => send_to(stream, b"UNSUBSCRIBE", channels),
213 Inner::Embedded { subscription, .. } => {
214 subscription.unsubscribe(channels);
215 Ok(())
216 }
217 }
218 }
219
220 /// `PUNSUBSCRIBE [pattern ...]`. Empty `patterns` unsubscribes from
221 /// every pattern.
222 pub fn punsubscribe(&mut self, patterns: &[&[u8]]) -> io::Result<()> {
223 match &mut self.inner {
224 Inner::Remote { stream, .. } => send_to(stream, b"PUNSUBSCRIBE", patterns),
225 Inner::Embedded { subscription, .. } => {
226 subscription.punsubscribe(patterns);
227 Ok(())
228 }
229 }
230 }
231
232 /// Block until the next pubsub frame arrives. Apply
233 /// [`Self::set_read_timeout`] for bounded blocking.
234 /// Connection close / bus tear-down yields `ErrorKind::UnexpectedEof`.
235 pub fn recv(&mut self) -> io::Result<PubsubEvent> {
236 match &mut self.inner {
237 Inner::Remote { stream, buf } => recv_remote(stream, buf),
238 Inner::Embedded {
239 subscription,
240 timeout,
241 } => {
242 let frame = match *timeout {
243 Some(d) => subscription.recv_timeout(d)?,
244 None => subscription.recv()?,
245 };
246 Ok(frame_to_event(frame))
247 }
248 }
249 }
250
251 /// Block until the next published `Message` / `Pmessage` arrives,
252 /// silently skipping subscription-acknowledgement frames
253 /// ([`PubsubEvent::Subscribe`] / [`PubsubEvent::Unsubscribe`] /
254 /// [`PubsubEvent::Psubscribe`] / [`PubsubEvent::Punsubscribe`]) along
255 /// the way.
256 ///
257 /// This is the form most callers want — almost no consumer of
258 /// pubsub needs to see the ack frames (they're a wire-protocol
259 /// detail), so a loop+match around [`Self::recv`] is essentially
260 /// boilerplate. Returns `(channel, payload)`. For pattern matches,
261 /// `channel` is the concrete channel the publisher used (matching
262 /// Redis's `pmessage` shape, where `pattern` is discarded — use
263 /// [`Self::recv`] directly if you need it).
264 ///
265 /// Errors from [`Self::recv`] (connection close, timeout, etc.)
266 /// propagate unchanged.
267 pub fn recv_message(&mut self) -> io::Result<(Vec<u8>, Vec<u8>)> {
268 loop {
269 match self.recv()? {
270 PubsubEvent::Message { channel, payload } => return Ok((channel, payload)),
271 PubsubEvent::Pmessage { channel, payload, .. } => {
272 return Ok((channel, payload));
273 }
274 // Ack frames — keep waiting for the next real message.
275 PubsubEvent::Subscribe { .. }
276 | PubsubEvent::Psubscribe { .. }
277 | PubsubEvent::Unsubscribe { .. }
278 | PubsubEvent::Punsubscribe { .. } => continue,
279 }
280 }
281 }
282
283 /// Negotiate RESP3 on this connection by sending `HELLO 3` and
284 /// draining the ack. Subsequent `SUBSCRIBE` / `PSUBSCRIBE` /
285 /// `PUBLISH` deliveries arrive as push frames (`>N\r\n…`) instead
286 /// of the legacy RESP2 array shape (`*N\r\n…`); [`Self::recv`]
287 /// accepts both transparently, so existing code keeps working with
288 /// no other changes.
289 ///
290 /// Remote-only: the embedded backend has no proto negotiation
291 /// concept (frames go through the in-process bus typed). Calling
292 /// `hello3` on an embedded [`Subscriber`] returns
293 /// [`io::ErrorKind::Unsupported`].
294 ///
295 /// Must be called BEFORE any [`Self::subscribe`] /
296 /// [`Self::psubscribe`] — Redis requires `HELLO` be the first
297 /// command on a connection that uses it.
298 pub fn hello3(&mut self) -> io::Result<PubsubEvent> {
299 match &mut self.inner {
300 Inner::Embedded { .. } => Err(io::Error::new(
301 io::ErrorKind::Unsupported,
302 "HELLO 3 is a remote/TCP-only operation; embedded backend has no proto switch",
303 )),
304 Inner::Remote { stream, buf } => {
305 let mut frame = Vec::new();
306 encode_command(&mut frame, &[b"HELLO".to_vec(), b"3".to_vec()]);
307 stream.write_all(&frame)?;
308 // The HELLO 3 ack itself comes back as a RESP3 Map
309 // (`%7\r\n…`). parse_reply accepts it (P1); we drain
310 // and discard since the proto switch is the actual
311 // semantic — the body's just server metadata.
312 let mut chunk = [0u8; 4096];
313 loop {
314 match kevy_resp::parse_reply(buf) {
315 Ok(Some((reply, used))) => {
316 buf.drain(..used);
317 // Reply::Map / Reply::Array both acceptable
318 // (a server that rejected V3 would emit an
319 // Error reply — fall through to the error
320 // branch below).
321 return match reply {
322 Reply::Map(_) | Reply::Array(_) => {
323 Ok(PubsubEvent::Subscribe {
324 channel: b"HELLO".to_vec(),
325 count: 3,
326 })
327 }
328 Reply::Error(e) => Err(io::Error::other(
329 String::from_utf8_lossy(&e).into_owned(),
330 )),
331 other => Err(invalid(format!(
332 "unexpected HELLO 3 reply shape: {}",
333 shape(&other)
334 ))),
335 };
336 }
337 Ok(None) => {}
338 Err(_) => {
339 return Err(io::Error::new(
340 io::ErrorKind::InvalidData,
341 "malformed HELLO 3 reply",
342 ));
343 }
344 }
345 let n = stream.read(&mut chunk)?;
346 if n == 0 {
347 return Err(io::Error::new(
348 io::ErrorKind::UnexpectedEof,
349 "server closed connection during HELLO 3",
350 ));
351 }
352 buf.extend_from_slice(&chunk[..n]);
353 }
354 }
355 }
356 }
357
358 /// Apply (or clear) a read timeout. After setting `Some(dur)`,
359 /// [`Self::recv`] returns an `io::Error` of kind `WouldBlock` /
360 /// `TimedOut` when no frame arrives within `dur`.
361 pub fn set_read_timeout(&mut self, dur: Option<Duration>) -> io::Result<()> {
362 match &mut self.inner {
363 Inner::Remote { stream, .. } => stream.set_read_timeout(dur),
364 Inner::Embedded { timeout, .. } => {
365 *timeout = dur;
366 Ok(())
367 }
368 }
369 }
370
371 /// Borrowing iterator over every pubsub frame — ack frames included.
372 /// Each `next()` is one blocking [`Self::recv`]. Terminates (`None`)
373 /// when the underlying stream / bus is gone (`ErrorKind::UnexpectedEof`);
374 /// every other error is surfaced as `Some(Err(_))` so the caller can
375 /// decide whether to retry (e.g. a read timeout) or break.
376 ///
377 /// kevy stays 0-deps so this is a `std::iter::Iterator`, not a
378 /// `futures::Stream`. Async runtimes consume it via
379 /// `spawn_blocking` (see `docs/pubsub.md`).
380 pub fn events(&mut self) -> SubscriberEvents<'_> {
381 SubscriberEvents { sub: self }
382 }
383
384 /// Borrowing iterator that silently skips `(p)?(un)?subscribe` acks
385 /// and yields the payload tuples consumers actually want. Mirrors
386 /// [`Self::recv_message`] in iterator form. For `Pmessage` the
387 /// pattern is discarded — fall back to [`Self::events`] if you need it.
388 pub fn messages(&mut self) -> SubscriberMessages<'_> {
389 SubscriberMessages { sub: self }
390 }
391}
392
393/// Iterator returned by [`Subscriber::events`]. Yields every pubsub
394/// frame (acks + payloads). See the method docs for termination + error
395/// semantics.
396#[derive(Debug)]
397pub struct SubscriberEvents<'a> {
398 sub: &'a mut Subscriber,
399}
400
401impl Iterator for SubscriberEvents<'_> {
402 type Item = io::Result<PubsubEvent>;
403 fn next(&mut self) -> Option<Self::Item> {
404 match self.sub.recv() {
405 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => None,
406 other => Some(other),
407 }
408 }
409}
410
411/// Iterator returned by [`Subscriber::messages`]. Yields one
412/// `(channel, payload)` per published `message` / `pmessage`; ack frames
413/// are silently consumed and not yielded.
414#[derive(Debug)]
415pub struct SubscriberMessages<'a> {
416 sub: &'a mut Subscriber,
417}
418
419impl Iterator for SubscriberMessages<'_> {
420 type Item = io::Result<(Vec<u8>, Vec<u8>)>;
421 fn next(&mut self) -> Option<Self::Item> {
422 match self.sub.recv_message() {
423 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => None,
424 other => Some(other),
425 }
426 }
427}
428
429// `send_to` / `recv_remote` / `frame_to_event` / `classify` and the
430// per-field reply unwrap helpers live in [`crate::subscribe_io`] —
431// split out so this file stays under the 500-LOC house rule.
432
433// ─────────────────────────────────────────────────────────────────────────
434// Remote host:port extraction. Reuses the same authority parsing logic
435// kevy-resp-client::from_url applies, but only needs host+port (pub/sub
436// is global, not db-scoped — any /N path segment is ignored).
437// ─────────────────────────────────────────────────────────────────────────
438
439fn remote_host_port(url: &str) -> io::Result<(String, u16)> {
440 let (_scheme, rest) = url.split_once("://").ok_or_else(|| {
441 io::Error::new(io::ErrorKind::InvalidInput, "URL missing '://'")
442 })?;
443 if rest.contains('@') {
444 return Err(io::Error::new(
445 io::ErrorKind::Unsupported,
446 "userinfo (user:pass@host) is unsupported — kevy has no AUTH",
447 ));
448 }
449 let authority = rest.split('/').next().unwrap_or("");
450 let (host, port) = match authority.rsplit_once(':') {
451 Some((h, p)) => {
452 let port: u16 = p.parse().map_err(|_| {
453 io::Error::new(io::ErrorKind::InvalidInput, format!("bad port: {p}"))
454 })?;
455 (h.to_string(), port)
456 }
457 None => (authority.to_string(), 6379),
458 };
459 if host.is_empty() {
460 return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty host"));
461 }
462 Ok((host, port))
463}
464
465#[cfg(test)]
466#[path = "subscribe_tests.rs"]
467mod tests;