Skip to main content

kevy_client_async/
pubsub.rs

1//! `PubsubEvent` enum — mirror of the type in
2//! `kevy_client::subscribe::PubsubEvent`. Lives here so the async
3//! subscriber doesn't have to dep on the blocking client crate.
4//!
5//! TODO: this is the third PubsubEvent in the workspace shape-wise
6//! (kevy-embedded's `PubsubFrame`, `kevy-client::subscribe::PubsubEvent`,
7//! and this one). When the fourth client lands, lift this and the
8//! RESP-shape classifier into a `kevy-pubsub` stone — same trigger as
9//! the `kevy-url` deferral noted in `url.rs`.
10
11use std::io;
12
13use kevy_resp::Reply;
14
/// A single pubsub frame decoded from the connection.
///
/// Acknowledgement variants carry the subscription `count` reported in the
/// frame; delivery variants carry the published bytes untouched.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum PubsubEvent {
    /// Acknowledgement of a `SUBSCRIBE`, one per channel.
    Subscribe {
        /// The channel that has just been subscribed.
        channel: Vec<u8>,
        /// Combined number of channels and patterns subscribed.
        count: i64,
    },
    /// Acknowledgement of a `PSUBSCRIBE`, one per pattern.
    Psubscribe {
        /// The pattern that has just been subscribed.
        pattern: Vec<u8>,
        /// Combined number of channels and patterns subscribed.
        count: i64,
    },
    /// Acknowledgement of an `UNSUBSCRIBE`.
    Unsubscribe {
        /// The channel that was dropped; `None` when the frame carried a
        /// nil bulk (the "all"/"none" case).
        channel: Option<Vec<u8>>,
        /// Combined number of channels and patterns still subscribed.
        count: i64,
    },
    /// Acknowledgement of a `PUNSUBSCRIBE`.
    Punsubscribe {
        /// The pattern that was dropped; `None` when the frame carried a
        /// nil bulk (the "all"/"none" case).
        pattern: Option<Vec<u8>>,
        /// Combined number of channels and patterns still subscribed.
        count: i64,
    },
    /// A `PUBLISH` delivered on a subscribed channel.
    Message {
        /// Channel the message was published to.
        channel: Vec<u8>,
        /// Payload bytes, uninterpreted.
        payload: Vec<u8>,
    },
    /// A `PUBLISH` delivered because its channel matched a subscribed pattern.
    Pmessage {
        /// The pattern that the channel matched.
        pattern: Vec<u8>,
        /// Channel the message was published to.
        channel: Vec<u8>,
        /// Payload bytes, uninterpreted.
        payload: Vec<u8>,
    },
}
64
65/// Turn a RESP reply into a [`PubsubEvent`]. Handles both RESP2
66/// (`*N\r\n…` arrays) and RESP3 (`>N\r\n…` push frames).
67pub(crate) fn classify(reply: Reply) -> io::Result<PubsubEvent> {
68    let items = match reply {
69        Reply::Array(v) | Reply::Push(v) => v,
70        Reply::Error(e) => return Err(io::Error::other(String::from_utf8_lossy(&e).into_owned())),
71        other => {
72            return Err(invalid(format!(
73                "pubsub: expected array/push, got {}",
74                shape(&other)
75            )));
76        }
77    };
78
79    let mut it = items.into_iter();
80    let kind = take_bulk(
81        it.next().ok_or_else(|| invalid("pubsub: empty frame"))?,
82        "kind",
83    )?;
84
85    match kind.as_slice() {
86        b"subscribe" => {
87            let channel = take_bulk(
88                it.next().ok_or_else(|| invalid("subscribe: missing channel"))?,
89                "channel",
90            )?;
91            let count = take_int(
92                it.next().ok_or_else(|| invalid("subscribe: missing count"))?,
93                "count",
94            )?;
95            Ok(PubsubEvent::Subscribe { channel, count })
96        }
97        b"psubscribe" => {
98            let pattern = take_bulk(
99                it.next().ok_or_else(|| invalid("psubscribe: missing pattern"))?,
100                "pattern",
101            )?;
102            let count = take_int(
103                it.next().ok_or_else(|| invalid("psubscribe: missing count"))?,
104                "count",
105            )?;
106            Ok(PubsubEvent::Psubscribe { pattern, count })
107        }
108        b"unsubscribe" => {
109            let channel = take_bulk_or_nil(
110                it.next().ok_or_else(|| invalid("unsubscribe: missing channel"))?,
111                "channel",
112            )?;
113            let count = take_int(
114                it.next().ok_or_else(|| invalid("unsubscribe: missing count"))?,
115                "count",
116            )?;
117            Ok(PubsubEvent::Unsubscribe { channel, count })
118        }
119        b"punsubscribe" => {
120            let pattern = take_bulk_or_nil(
121                it.next()
122                    .ok_or_else(|| invalid("punsubscribe: missing pattern"))?,
123                "pattern",
124            )?;
125            let count = take_int(
126                it.next()
127                    .ok_or_else(|| invalid("punsubscribe: missing count"))?,
128                "count",
129            )?;
130            Ok(PubsubEvent::Punsubscribe { pattern, count })
131        }
132        b"message" => {
133            let channel = take_bulk(
134                it.next().ok_or_else(|| invalid("message: missing channel"))?,
135                "channel",
136            )?;
137            let payload = take_bulk(
138                it.next().ok_or_else(|| invalid("message: missing payload"))?,
139                "payload",
140            )?;
141            Ok(PubsubEvent::Message { channel, payload })
142        }
143        b"pmessage" => {
144            let pattern = take_bulk(
145                it.next().ok_or_else(|| invalid("pmessage: missing pattern"))?,
146                "pattern",
147            )?;
148            let channel = take_bulk(
149                it.next().ok_or_else(|| invalid("pmessage: missing channel"))?,
150                "channel",
151            )?;
152            let payload = take_bulk(
153                it.next().ok_or_else(|| invalid("pmessage: missing payload"))?,
154                "payload",
155            )?;
156            Ok(PubsubEvent::Pmessage {
157                pattern,
158                channel,
159                payload,
160            })
161        }
162        other => Err(invalid(format!(
163            "unknown pubsub kind: {}",
164            String::from_utf8_lossy(other)
165        ))),
166    }
167}
168
169fn take_bulk(r: Reply, field: &str) -> io::Result<Vec<u8>> {
170    match r {
171        Reply::Bulk(v) | Reply::Simple(v) => Ok(v),
172        other => Err(invalid(format!(
173            "pubsub field {field}: expected bulk, got {}",
174            shape(&other)
175        ))),
176    }
177}
178
179fn take_bulk_or_nil(r: Reply, field: &str) -> io::Result<Option<Vec<u8>>> {
180    match r {
181        Reply::Bulk(v) | Reply::Simple(v) => Ok(Some(v)),
182        Reply::Nil | Reply::Null => Ok(None),
183        other => Err(invalid(format!(
184            "pubsub field {field}: expected bulk/nil, got {}",
185            shape(&other)
186        ))),
187    }
188}
189
190fn take_int(r: Reply, field: &str) -> io::Result<i64> {
191    match r {
192        Reply::Int(n) => Ok(n),
193        other => Err(invalid(format!(
194            "pubsub field {field}: expected int, got {}",
195            shape(&other)
196        ))),
197    }
198}
199
200fn shape(r: &Reply) -> &'static str {
201    match r {
202        Reply::Simple(_) => "simple",
203        Reply::Error(_) => "error",
204        Reply::Int(_) => "int",
205        Reply::Bulk(_) => "bulk",
206        Reply::Nil | Reply::Null => "nil",
207        Reply::Array(_) => "array",
208        Reply::Map(_) => "map",
209        Reply::Set(_) => "set",
210        Reply::Double(_) => "double",
211        Reply::Boolean(_) => "boolean",
212        Reply::Verbatim { .. } => "verbatim",
213        Reply::BigNumber(_) => "bignumber",
214        Reply::Push(_) => "push",
215        Reply::BlobError(_) => "bloberror",
216    }
217}
218
/// Build an [`io::Error`] of kind [`io::ErrorKind::InvalidData`] whose
/// payload is `msg` converted to an owned `String`.
fn invalid(msg: impl Into<String>) -> io::Error {
    let text: String = msg.into();
    io::Error::new(io::ErrorKind::InvalidData, text)
}