kevy_client_async/
subscriber.rs1use std::io;
20
21use kevy_resp::Reply;
22
23use crate::codec::AsyncRespCodec;
24use crate::pubsub::{PubsubEvent, classify};
25use crate::url::parse_url;
26
/// TCP stream type of the async runtime selected through a Cargo feature.
///
/// One alias is defined per runtime feature (`tokio`, `smol`, `async-std`). Each alias is gated
/// only on its own feature, so enabling more than one of them defines `DefaultTransport` more
/// than once and enabling none leaves it undefined.
#[cfg(feature = "tokio")]
type DefaultTransport = tokio::net::TcpStream;
/// See the `tokio` variant: the same alias, backed by smol's TCP stream.
#[cfg(feature = "smol")]
type DefaultTransport = smol::net::TcpStream;
/// See the `tokio` variant: the same alias, backed by async-std's TCP stream.
#[cfg(feature = "async-std")]
type DefaultTransport = async_std::net::TcpStream;
33
/// Opens the default transport to `host:port` by delegating to the tokio runtime shim.
///
/// # Errors
///
/// Returns whatever I/O error `crate::rt_tokio::connect` reports.
#[cfg(feature = "tokio")]
async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
    use crate::rt_tokio::connect;

    connect(host, port).await
}
/// Opens the default transport to `host:port` by delegating to the smol runtime shim.
///
/// # Errors
///
/// Returns whatever I/O error `crate::rt_smol::connect` reports.
#[cfg(feature = "smol")]
async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
    use crate::rt_smol::connect;

    connect(host, port).await
}
/// Opens the default transport to `host:port` by delegating to the async-std runtime shim.
///
/// # Errors
///
/// Returns whatever I/O error `crate::rt_async_std::connect` reports.
#[cfg(feature = "async-std")]
async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
    use crate::rt_async_std::connect;

    connect(host, port).await
}
46
47pub struct AsyncSubscriber {
50 codec: AsyncRespCodec<DefaultTransport>,
51}
52
53impl AsyncSubscriber {
54 pub async fn connect(url: &str) -> io::Result<Self> {
57 let parsed = parse_url(url)?;
58 let transport = connect_default(&parsed.host, parsed.port).await?;
59 Ok(Self {
60 codec: AsyncRespCodec::new(transport),
61 })
62 }
63
64 pub async fn open(url: &str, channels: &[&[u8]]) -> io::Result<Self> {
66 if channels.is_empty() {
67 return Err(io::Error::new(
68 io::ErrorKind::InvalidInput,
69 "AsyncSubscriber::open needs ≥ 1 channel — use connect() for empty start",
70 ));
71 }
72 let mut s = Self::connect(url).await?;
73 s.subscribe(channels).await?;
74 Ok(s)
75 }
76
77 pub async fn subscribe(&mut self, channels: &[&[u8]]) -> io::Result<()> {
80 if channels.is_empty() {
81 return Err(io::Error::new(
82 io::ErrorKind::InvalidInput,
83 "SUBSCRIBE needs ≥ 1 channel",
84 ));
85 }
86 self.send_with_args(b"SUBSCRIBE", channels).await
87 }
88
89 pub async fn psubscribe(&mut self, patterns: &[&[u8]]) -> io::Result<()> {
91 if patterns.is_empty() {
92 return Err(io::Error::new(
93 io::ErrorKind::InvalidInput,
94 "PSUBSCRIBE needs ≥ 1 pattern",
95 ));
96 }
97 self.send_with_args(b"PSUBSCRIBE", patterns).await
98 }
99
100 pub async fn unsubscribe(&mut self, channels: &[&[u8]]) -> io::Result<()> {
102 self.send_with_args(b"UNSUBSCRIBE", channels).await
103 }
104
105 pub async fn punsubscribe(&mut self, patterns: &[&[u8]]) -> io::Result<()> {
107 self.send_with_args(b"PUNSUBSCRIBE", patterns).await
108 }
109
110 pub async fn recv(&mut self) -> io::Result<PubsubEvent> {
112 let reply = self.codec.read_reply().await?;
113 classify(reply)
114 }
115
116 pub async fn recv_message(&mut self) -> io::Result<(Vec<u8>, Vec<u8>)> {
122 loop {
123 match self.recv().await? {
124 PubsubEvent::Message { channel, payload }
125 | PubsubEvent::Pmessage {
126 channel, payload, ..
127 } => return Ok((channel, payload)),
128 _ => continue,
129 }
130 }
131 }
132
133 pub async fn hello3(&mut self) -> io::Result<PubsubEvent> {
139 let reply = self
140 .codec
141 .request(&[b"HELLO".to_vec(), b"3".to_vec()])
142 .await?;
143 match reply {
144 Reply::Map(_) | Reply::Array(_) => Ok(PubsubEvent::Subscribe {
145 channel: b"HELLO".to_vec(),
146 count: 3,
147 }),
148 Reply::Error(e) => Err(io::Error::other(
149 String::from_utf8_lossy(&e).into_owned(),
150 )),
151 other => Err(io::Error::new(
152 io::ErrorKind::InvalidData,
153 format!("unexpected HELLO 3 reply shape: {other:?}"),
154 )),
155 }
156 }
157
158 async fn send_with_args(&mut self, verb: &[u8], args: &[&[u8]]) -> io::Result<()> {
159 let mut argv = Vec::with_capacity(args.len() + 1);
160 argv.push(verb.to_vec());
161 argv.extend(args.iter().map(|a| a.to_vec()));
162 self.codec.send(&argv).await
163 }
164}