kevy_client/subscribe.rs
1//! Pub/sub consumer side — a connection dedicated to receiving messages.
2//!
3//! `SUBSCRIBE` / `PSUBSCRIBE` morph a connection into a one-way event
4//! stream: the client no longer sends ordinary commands and instead reads
5//! an unbounded sequence of `subscribe`, `message`, `pmessage`,
6//! `unsubscribe`, … frames until the connection is closed. That semantic
7//! doesn't fit the one-shot `Connection::request` shape, so subscribed
8//! traffic gets its own type, [`Subscriber`].
9//!
10//! Two backends, switched on the URL:
11//! - `kevy://` / `redis://` / `tcp://` — dedicated TCP socket
12//! - `mem://<name>` / `file:///path` — in-process bus, via the URL
13//! registry in [`crate::resolve_store`]. Anonymous `mem://` (no name)
14//! has no bus and is rejected; use a named bus to actually receive
15//! messages from a [`crate::Connection::publish`] on the same URL.
16//!
17//! ```no_run
18//! use kevy_client::{Subscriber, PubsubEvent};
19//!
20//! let mut sub = Subscriber::open("kevy://localhost:6379", &[b"news"])?;
21//! loop {
22//! if let PubsubEvent::Message { channel, payload } = sub.recv()? {
23//! println!("{}: {}", String::from_utf8_lossy(&channel),
24//! String::from_utf8_lossy(&payload));
25//! }
26//! }
27//! # Ok::<(), std::io::Error>(())
28//! ```
29
30use std::io::{self, Read, Write};
31use std::net::TcpStream;
32use std::time::Duration;
33
34use kevy_embedded::Subscription;
35use kevy_resp::{Reply, encode_command};
36
37#[cfg(test)]
38use crate::subscribe_io::classify;
39use crate::subscribe_io::{frame_to_event, invalid, recv_remote, send_to, shape};
40use crate::{Target, parse_url, resolve_store};
41
42/// One subscribed connection. Owns either a TCP socket or an in-process
43/// [`Subscription`]; the variant is chosen by the URL scheme in
44/// [`Subscriber::open`] / [`Subscriber::connect`].
45#[derive(Debug)]
46pub struct Subscriber {
47 inner: Inner,
48}
49
50#[derive(Debug)]
51enum Inner {
52 /// TCP RESP2 connection, drained one reply at a time.
53 Remote {
54 stream: TcpStream,
55 buf: Vec<u8>,
56 },
57 /// In-process bus subscription. `timeout` mirrors the TCP
58 /// `SO_RCVTIMEO` behaviour for [`Subscriber::recv`] / [`Subscriber::set_read_timeout`].
59 Embedded {
60 subscription: Subscription,
61 timeout: Option<Duration>,
62 },
63}
64
65/// One pubsub frame received from the bus or the wire.
66///
67/// `Unsubscribe` / `Punsubscribe`'s `channel` / `pattern` is `None` when
68/// the server is acknowledging "unsubscribed from everything" with a nil
69/// bulk — matching the Redis wire shape.
70#[non_exhaustive]
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub enum PubsubEvent {
73 /// `SUBSCRIBE` ack — one per channel the client subscribed to.
74 Subscribe {
75 /// Channel that was just subscribed.
76 channel: Vec<u8>,
77 /// Total number of channels + patterns the connection is now subscribed to.
78 count: i64,
79 },
80 /// `PSUBSCRIBE` ack — one per pattern.
81 Psubscribe {
82 /// Pattern that was just subscribed.
83 pattern: Vec<u8>,
84 /// Total number of channels + patterns the connection is now subscribed to.
85 count: i64,
86 },
87 /// `UNSUBSCRIBE` ack — `channel: None` when the server is reporting
88 /// "no channels were subscribed" (the spec's nil bulk).
89 Unsubscribe {
90 /// Channel that was just unsubscribed (`None` for "all" / "none").
91 channel: Option<Vec<u8>>,
92 /// Total number of channels + patterns still subscribed.
93 count: i64,
94 },
95 /// `PUNSUBSCRIBE` ack — pattern `None` when the server is reporting
96 /// "no patterns were subscribed".
97 Punsubscribe {
98 /// Pattern that was just unsubscribed (`None` for "all" / "none").
99 pattern: Option<Vec<u8>>,
100 /// Total number of channels + patterns still subscribed.
101 count: i64,
102 },
103 /// Plain `PUBLISH` delivery on a subscribed channel.
104 Message {
105 /// Channel the publish was made to.
106 channel: Vec<u8>,
107 /// Raw payload bytes (no encoding assumed).
108 payload: Vec<u8>,
109 },
110 /// Pattern-match delivery: a `PUBLISH` to a channel that matched one
111 /// of this connection's patterns.
112 Pmessage {
113 /// Pattern the channel matched.
114 pattern: Vec<u8>,
115 /// Channel the publish was made to.
116 channel: Vec<u8>,
117 /// Raw payload bytes.
118 payload: Vec<u8>,
119 },
120}
121
122impl Subscriber {
123 /// Open a fresh connection without subscribing to anything yet. Call
124 /// [`Self::subscribe`] / [`Self::psubscribe`] next.
125 ///
126 /// Accepted URLs:
127 /// - `kevy://`, `redis://`, `tcp://` — TCP RESP server
128 /// - `mem://<name>`, `file:///path` — in-process shared bus
129 /// - `mem://` (anonymous), `rediss://`, `kevys://`, `redis://user:pass@…`
130 /// are rejected with [`io::ErrorKind::Unsupported`]
131 pub fn connect(url: &str) -> io::Result<Self> {
132 let target = parse_url(url)?;
133 let inner = match target {
134 Target::EmbedMemoryAnonymous => {
135 return Err(io::Error::new(
136 io::ErrorKind::Unsupported,
137 "anonymous mem:// has no other producer; use mem://<name> for a shared bus",
138 ));
139 }
140 Target::EmbedMemoryNamed(_) | Target::EmbedPersist(_) => Inner::Embedded {
141 subscription: resolve_store(&target)?.subscribe(&[]),
142 timeout: None,
143 },
144 Target::Remote(remote_url) => {
145 let (host, port) = remote_host_port(&remote_url)?;
146 let stream = TcpStream::connect((host.as_str(), port))?;
147 stream.set_nodelay(true).ok();
148 Inner::Remote {
149 stream,
150 buf: Vec::with_capacity(8192),
151 }
152 }
153 };
154 Ok(Self { inner })
155 }
156
157 /// Open and subscribe to one or more channels in one step. Returns
158 /// `ErrorKind::InvalidInput` if `channels` is empty (use
159 /// [`Self::connect`] for an empty start).
160 pub fn open(url: &str, channels: &[&[u8]]) -> io::Result<Self> {
161 if channels.is_empty() {
162 return Err(io::Error::new(
163 io::ErrorKind::InvalidInput,
164 "Subscriber::open needs ≥ 1 channel — use Subscriber::connect() for empty start",
165 ));
166 }
167 let mut s = Self::connect(url)?;
168 s.subscribe(channels)?;
169 Ok(s)
170 }
171
172 /// `SUBSCRIBE channel [channel ...]`. Per-channel `Subscribe` acks
173 /// are delivered via [`Self::recv`].
174 pub fn subscribe(&mut self, channels: &[&[u8]]) -> io::Result<()> {
175 if channels.is_empty() {
176 return Err(io::Error::new(
177 io::ErrorKind::InvalidInput,
178 "SUBSCRIBE needs ≥ 1 channel",
179 ));
180 }
181 match &mut self.inner {
182 Inner::Remote { stream, .. } => send_to(stream, b"SUBSCRIBE", channels),
183 Inner::Embedded { subscription, .. } => {
184 subscription.subscribe(channels);
185 Ok(())
186 }
187 }
188 }
189
190 /// `PSUBSCRIBE pattern [pattern ...]`. Patterns use Redis glob syntax
191 /// (`*`, `?`, `[…]`).
192 pub fn psubscribe(&mut self, patterns: &[&[u8]]) -> io::Result<()> {
193 if patterns.is_empty() {
194 return Err(io::Error::new(
195 io::ErrorKind::InvalidInput,
196 "PSUBSCRIBE needs ≥ 1 pattern",
197 ));
198 }
199 match &mut self.inner {
200 Inner::Remote { stream, .. } => send_to(stream, b"PSUBSCRIBE", patterns),
201 Inner::Embedded { subscription, .. } => {
202 subscription.psubscribe(patterns);
203 Ok(())
204 }
205 }
206 }
207
208 /// `UNSUBSCRIBE [channel ...]`. Empty `channels` unsubscribes from
209 /// every channel (Redis wire semantics).
210 pub fn unsubscribe(&mut self, channels: &[&[u8]]) -> io::Result<()> {
211 match &mut self.inner {
212 Inner::Remote { stream, .. } => send_to(stream, b"UNSUBSCRIBE", channels),
213 Inner::Embedded { subscription, .. } => {
214 subscription.unsubscribe(channels);
215 Ok(())
216 }
217 }
218 }
219
220 /// `PUNSUBSCRIBE [pattern ...]`. Empty `patterns` unsubscribes from
221 /// every pattern.
222 pub fn punsubscribe(&mut self, patterns: &[&[u8]]) -> io::Result<()> {
223 match &mut self.inner {
224 Inner::Remote { stream, .. } => send_to(stream, b"PUNSUBSCRIBE", patterns),
225 Inner::Embedded { subscription, .. } => {
226 subscription.punsubscribe(patterns);
227 Ok(())
228 }
229 }
230 }
231
232 /// Block until the next pubsub frame arrives. Apply
233 /// [`Self::set_read_timeout`] for bounded blocking.
234 /// Connection close / bus tear-down yields `ErrorKind::UnexpectedEof`.
235 pub fn recv(&mut self) -> io::Result<PubsubEvent> {
236 match &mut self.inner {
237 Inner::Remote { stream, buf } => recv_remote(stream, buf),
238 Inner::Embedded {
239 subscription,
240 timeout,
241 } => {
242 let frame = match *timeout {
243 Some(d) => subscription.recv_timeout(d)?,
244 None => subscription.recv()?,
245 };
246 Ok(frame_to_event(frame))
247 }
248 }
249 }
250
251 /// Block until the next published `Message` / `Pmessage` arrives,
252 /// silently skipping subscription-acknowledgement frames
253 /// ([`PubsubEvent::Subscribe`] / [`Unsubscribe`] / [`Psubscribe`] /
254 /// [`Punsubscribe`]) along the way.
255 ///
256 /// This is the form most callers want — almost no consumer of
257 /// pubsub needs to see the ack frames (they're a wire-protocol
258 /// detail), so a loop+match around [`Self::recv`] is essentially
259 /// boilerplate. Returns `(channel, payload)`. For pattern matches,
260 /// `channel` is the concrete channel the publisher used (matching
261 /// Redis's `pmessage` shape, where `pattern` is discarded — use
262 /// [`Self::recv`] directly if you need it).
263 ///
264 /// Errors from [`Self::recv`] (connection close, timeout, etc.)
265 /// propagate unchanged.
266 pub fn recv_message(&mut self) -> io::Result<(Vec<u8>, Vec<u8>)> {
267 loop {
268 match self.recv()? {
269 PubsubEvent::Message { channel, payload } => return Ok((channel, payload)),
270 PubsubEvent::Pmessage { channel, payload, .. } => {
271 return Ok((channel, payload));
272 }
273 // Ack frames — keep waiting for the next real message.
274 PubsubEvent::Subscribe { .. }
275 | PubsubEvent::Psubscribe { .. }
276 | PubsubEvent::Unsubscribe { .. }
277 | PubsubEvent::Punsubscribe { .. } => continue,
278 }
279 }
280 }
281
282 /// Negotiate RESP3 on this connection by sending `HELLO 3` and
283 /// draining the ack. Subsequent `SUBSCRIBE` / `PSUBSCRIBE` /
284 /// `PUBLISH` deliveries arrive as push frames (`>N\r\n…`) instead
285 /// of the legacy RESP2 array shape (`*N\r\n…`); [`Self::recv`]
286 /// accepts both transparently, so existing code keeps working with
287 /// no other changes.
288 ///
289 /// Remote-only: the embedded backend has no proto negotiation
290 /// concept (frames go through the in-process bus typed). Calling
291 /// `hello3` on an embedded [`Subscriber`] returns
292 /// [`io::ErrorKind::Unsupported`].
293 ///
294 /// Must be called BEFORE any [`Self::subscribe`] /
295 /// [`Self::psubscribe`] — Redis requires `HELLO` be the first
296 /// command on a connection that uses it.
297 pub fn hello3(&mut self) -> io::Result<PubsubEvent> {
298 match &mut self.inner {
299 Inner::Embedded { .. } => Err(io::Error::new(
300 io::ErrorKind::Unsupported,
301 "HELLO 3 is a remote/TCP-only operation; embedded backend has no proto switch",
302 )),
303 Inner::Remote { stream, buf } => {
304 let mut frame = Vec::new();
305 encode_command(&mut frame, &[b"HELLO".to_vec(), b"3".to_vec()]);
306 stream.write_all(&frame)?;
307 // The HELLO 3 ack itself comes back as a RESP3 Map
308 // (`%7\r\n…`). parse_reply accepts it (P1); we drain
309 // and discard since the proto switch is the actual
310 // semantic — the body's just server metadata.
311 let mut chunk = [0u8; 4096];
312 loop {
313 match kevy_resp::parse_reply(buf) {
314 Ok(Some((reply, used))) => {
315 buf.drain(..used);
316 // Reply::Map / Reply::Array both acceptable
317 // (a server that rejected V3 would emit an
318 // Error reply — fall through to the error
319 // branch below).
320 return match reply {
321 Reply::Map(_) | Reply::Array(_) => {
322 Ok(PubsubEvent::Subscribe {
323 channel: b"HELLO".to_vec(),
324 count: 3,
325 })
326 }
327 Reply::Error(e) => Err(io::Error::other(
328 String::from_utf8_lossy(&e).into_owned(),
329 )),
330 other => Err(invalid(format!(
331 "unexpected HELLO 3 reply shape: {}",
332 shape(&other)
333 ))),
334 };
335 }
336 Ok(None) => {}
337 Err(_) => {
338 return Err(io::Error::new(
339 io::ErrorKind::InvalidData,
340 "malformed HELLO 3 reply",
341 ));
342 }
343 }
344 let n = stream.read(&mut chunk)?;
345 if n == 0 {
346 return Err(io::Error::new(
347 io::ErrorKind::UnexpectedEof,
348 "server closed connection during HELLO 3",
349 ));
350 }
351 buf.extend_from_slice(&chunk[..n]);
352 }
353 }
354 }
355 }
356
357 /// Apply (or clear) a read timeout. After setting `Some(dur)`,
358 /// [`Self::recv`] returns an `io::Error` of kind `WouldBlock` /
359 /// `TimedOut` when no frame arrives within `dur`.
360 pub fn set_read_timeout(&mut self, dur: Option<Duration>) -> io::Result<()> {
361 match &mut self.inner {
362 Inner::Remote { stream, .. } => stream.set_read_timeout(dur),
363 Inner::Embedded { timeout, .. } => {
364 *timeout = dur;
365 Ok(())
366 }
367 }
368 }
369
370 /// Borrowing iterator over every pubsub frame — ack frames included.
371 /// Each `next()` is one blocking [`Self::recv`]. Terminates (`None`)
372 /// when the underlying stream / bus is gone (`ErrorKind::UnexpectedEof`);
373 /// every other error is surfaced as `Some(Err(_))` so the caller can
374 /// decide whether to retry (e.g. a read timeout) or break.
375 ///
376 /// kevy stays 0-deps so this is a `std::iter::Iterator`, not a
377 /// `futures::Stream`. Async runtimes consume it via
378 /// `spawn_blocking` (see `docs/pubsub.md`).
379 pub fn events(&mut self) -> SubscriberEvents<'_> {
380 SubscriberEvents { sub: self }
381 }
382
383 /// Borrowing iterator that silently skips `(p)?(un)?subscribe` acks
384 /// and yields the payload tuples consumers actually want. Mirrors
385 /// [`Self::recv_message`] in iterator form. For `Pmessage` the
386 /// pattern is discarded — fall back to [`Self::events`] if you need it.
387 pub fn messages(&mut self) -> SubscriberMessages<'_> {
388 SubscriberMessages { sub: self }
389 }
390}
391
392/// Iterator returned by [`Subscriber::events`]. Yields every pubsub
393/// frame (acks + payloads). See the method docs for termination + error
394/// semantics.
395#[derive(Debug)]
396pub struct SubscriberEvents<'a> {
397 sub: &'a mut Subscriber,
398}
399
400impl Iterator for SubscriberEvents<'_> {
401 type Item = io::Result<PubsubEvent>;
402 fn next(&mut self) -> Option<Self::Item> {
403 match self.sub.recv() {
404 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => None,
405 other => Some(other),
406 }
407 }
408}
409
410/// Iterator returned by [`Subscriber::messages`]. Yields one
411/// `(channel, payload)` per published `message` / `pmessage`; ack frames
412/// are silently consumed and not yielded.
413#[derive(Debug)]
414pub struct SubscriberMessages<'a> {
415 sub: &'a mut Subscriber,
416}
417
418impl Iterator for SubscriberMessages<'_> {
419 type Item = io::Result<(Vec<u8>, Vec<u8>)>;
420 fn next(&mut self) -> Option<Self::Item> {
421 match self.sub.recv_message() {
422 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => None,
423 other => Some(other),
424 }
425 }
426}
427
428// `send_to` / `recv_remote` / `frame_to_event` / `classify` and the
429// per-field reply unwrap helpers live in [`crate::subscribe_io`] —
430// split out so this file stays under the 500-LOC house rule.
431
432// ─────────────────────────────────────────────────────────────────────────
433// Remote host:port extraction. Reuses the same authority parsing logic
434// kevy-resp-client::from_url applies, but only needs host+port (pub/sub
435// is global, not db-scoped — any /N path segment is ignored).
436// ─────────────────────────────────────────────────────────────────────────
437
438fn remote_host_port(url: &str) -> io::Result<(String, u16)> {
439 let (_scheme, rest) = url.split_once("://").ok_or_else(|| {
440 io::Error::new(io::ErrorKind::InvalidInput, "URL missing '://'")
441 })?;
442 if rest.contains('@') {
443 return Err(io::Error::new(
444 io::ErrorKind::Unsupported,
445 "userinfo (user:pass@host) is unsupported — kevy has no AUTH",
446 ));
447 }
448 let authority = rest.split('/').next().unwrap_or("");
449 let (host, port) = match authority.rsplit_once(':') {
450 Some((h, p)) => {
451 let port: u16 = p.parse().map_err(|_| {
452 io::Error::new(io::ErrorKind::InvalidInput, format!("bad port: {p}"))
453 })?;
454 (h.to_string(), port)
455 }
456 None => (authority.to_string(), 6379),
457 };
458 if host.is_empty() {
459 return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty host"));
460 }
461 Ok((host, port))
462}
463
464#[cfg(test)]
465#[path = "subscribe_tests.rs"]
466mod tests;