Skip to main content

kevy_client_async/
subscriber.rs

1//! Async mirror of `kevy_client::Subscriber` — TCP-only.
2//!
3//! A subscribed RESP connection cannot send normal commands, so this
4//! is a separate type from [`crate::AsyncConnection`] (matching the
5//! blocking client's split).
6//!
7//! What is NOT mirrored from the blocking surface:
8//! - `set_read_timeout`: in async land timeouts are runtime-level
9//!   (`tokio::time::timeout`, `async_io::Timer`); a socket-level
10//!   `SO_RCVTIMEO` makes no sense when the read itself is non-blocking.
11//!   Wrap a `recv()` future with your runtime's timeout primitive.
12//! - `events()` / `messages()` blocking iterators: the async-native
13//!   shape is a `Stream`, deferred to a future iteration. For now
14//!   loop `recv().await` / `recv_message().await` directly.
15//! - `mem://` / `file://` embed schemes: already rejected by the URL
16//!   parser; embedded pub/sub is in-process synchronous, blocking
17//!   client is strictly faster.
18
19use std::io;
20
21use kevy_resp::Reply;
22
23use crate::codec::AsyncRespCodec;
24use crate::pubsub::{PubsubEvent, classify};
25use crate::url::parse_url;
26
// Runtime selection: each supported async runtime contributes one
// `DefaultTransport` alias and one `connect_default` shim, both gated on
// the matching Cargo feature.
//
// NOTE(review): the three cfgs are not mutually exclusive in this file, so
// enabling two runtime features at once would define these items twice —
// presumably that combination is rejected elsewhere in the crate; confirm.

// ---- tokio ----

/// TCP stream type used when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
type DefaultTransport = tokio::net::TcpStream;

/// Connect to `host:port` by delegating to `crate::rt_tokio::connect`.
#[cfg(feature = "tokio")]
async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
    crate::rt_tokio::connect(host, port).await
}

// ---- smol ----

/// TCP stream type used when the `smol` feature is enabled.
#[cfg(feature = "smol")]
type DefaultTransport = smol::net::TcpStream;

/// Connect to `host:port` by delegating to `crate::rt_smol::connect`.
#[cfg(feature = "smol")]
async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
    crate::rt_smol::connect(host, port).await
}

// ---- async-std ----

/// TCP stream type used when the `async-std` feature is enabled.
#[cfg(feature = "async-std")]
type DefaultTransport = async_std::net::TcpStream;

/// Connect to `host:port` by delegating to `crate::rt_async_std::connect`.
#[cfg(feature = "async-std")]
async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
    crate::rt_async_std::connect(host, port).await
}
46
47/// Subscribed async TCP-RESP connection. Mirrors
48/// [`kevy_client::Subscriber`] for TCP backends.
49pub struct AsyncSubscriber {
50    codec: AsyncRespCodec<DefaultTransport>,
51}
52
53impl AsyncSubscriber {
54    /// Open a fresh connection without subscribing yet. Call
55    /// [`Self::subscribe`] / [`Self::psubscribe`] next.
56    pub async fn connect(url: &str) -> io::Result<Self> {
57        let parsed = parse_url(url)?;
58        let transport = connect_default(&parsed.host, parsed.port).await?;
59        Ok(Self {
60            codec: AsyncRespCodec::new(transport),
61        })
62    }
63
64    /// Open and subscribe to one or more channels in one step.
65    pub async fn open(url: &str, channels: &[&[u8]]) -> io::Result<Self> {
66        if channels.is_empty() {
67            return Err(io::Error::new(
68                io::ErrorKind::InvalidInput,
69                "AsyncSubscriber::open needs ≥ 1 channel — use connect() for empty start",
70            ));
71        }
72        let mut s = Self::connect(url).await?;
73        s.subscribe(channels).await?;
74        Ok(s)
75    }
76
77    /// `SUBSCRIBE channel [channel ...]`. Per-channel Subscribe acks
78    /// arrive via [`Self::recv`].
79    pub async fn subscribe(&mut self, channels: &[&[u8]]) -> io::Result<()> {
80        if channels.is_empty() {
81            return Err(io::Error::new(
82                io::ErrorKind::InvalidInput,
83                "SUBSCRIBE needs ≥ 1 channel",
84            ));
85        }
86        self.send_with_args(b"SUBSCRIBE", channels).await
87    }
88
89    /// `PSUBSCRIBE pattern [pattern ...]`.
90    pub async fn psubscribe(&mut self, patterns: &[&[u8]]) -> io::Result<()> {
91        if patterns.is_empty() {
92            return Err(io::Error::new(
93                io::ErrorKind::InvalidInput,
94                "PSUBSCRIBE needs ≥ 1 pattern",
95            ));
96        }
97        self.send_with_args(b"PSUBSCRIBE", patterns).await
98    }
99
100    /// `UNSUBSCRIBE [channel ...]`. Empty list = unsubscribe all.
101    pub async fn unsubscribe(&mut self, channels: &[&[u8]]) -> io::Result<()> {
102        self.send_with_args(b"UNSUBSCRIBE", channels).await
103    }
104
105    /// `PUNSUBSCRIBE [pattern ...]`. Empty list = unsubscribe all.
106    pub async fn punsubscribe(&mut self, patterns: &[&[u8]]) -> io::Result<()> {
107        self.send_with_args(b"PUNSUBSCRIBE", patterns).await
108    }
109
110    /// Await the next pubsub frame. Connection close = `UnexpectedEof`.
111    pub async fn recv(&mut self) -> io::Result<PubsubEvent> {
112        let reply = self.codec.read_reply().await?;
113        classify(reply)
114    }
115
116    /// Skip subscription-ack frames and return the next published
117    /// `Message` / `Pmessage`. Returns `(channel, payload)`; for
118    /// pattern matches `channel` is the concrete publish channel
119    /// (the matched pattern is discarded — use [`Self::recv`] if you
120    /// need it).
121    pub async fn recv_message(&mut self) -> io::Result<(Vec<u8>, Vec<u8>)> {
122        loop {
123            match self.recv().await? {
124                PubsubEvent::Message { channel, payload }
125                | PubsubEvent::Pmessage {
126                    channel, payload, ..
127                } => return Ok((channel, payload)),
128                _ => continue,
129            }
130        }
131    }
132
133    /// Negotiate RESP3 on this connection (`HELLO 3`). Must run BEFORE
134    /// any subscribe — Redis spec requires HELLO be the first command.
135    /// Returns a synthetic [`PubsubEvent::Subscribe`] marker (matching
136    /// the blocking client) so callers can pattern-match a uniform
137    /// type.
138    pub async fn hello3(&mut self) -> io::Result<PubsubEvent> {
139        let reply = self
140            .codec
141            .request(&[b"HELLO".to_vec(), b"3".to_vec()])
142            .await?;
143        match reply {
144            Reply::Map(_) | Reply::Array(_) => Ok(PubsubEvent::Subscribe {
145                channel: b"HELLO".to_vec(),
146                count: 3,
147            }),
148            Reply::Error(e) => Err(io::Error::other(
149                String::from_utf8_lossy(&e).into_owned(),
150            )),
151            other => Err(io::Error::new(
152                io::ErrorKind::InvalidData,
153                format!("unexpected HELLO 3 reply shape: {other:?}"),
154            )),
155        }
156    }
157
158    async fn send_with_args(&mut self, verb: &[u8], args: &[&[u8]]) -> io::Result<()> {
159        let mut argv = Vec::with_capacity(args.len() + 1);
160        argv.push(verb.to_vec());
161        argv.extend(args.iter().map(|a| a.to_vec()));
162        self.codec.send(&argv).await
163    }
164}