1use std::io;
12
13use kevy_resp::Reply;
14
/// One decoded pub/sub frame, as produced by `classify`.
///
/// Each variant corresponds to one frame kind, identified by the frame's
/// first element (`subscribe`, `psubscribe`, `unsubscribe`, `punsubscribe`,
/// `message`, `pmessage`). Channel names, patterns and payloads are kept as
/// raw bytes; no UTF-8 validation is performed on them.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PubsubEvent {
    /// A `subscribe` frame.
    Subscribe {
        /// Channel name taken from the frame's second element.
        channel: Vec<u8>,
        /// Integer taken from the frame's third element.
        /// NOTE(review): presumably the number of subscriptions active on the
        /// connection, per the Redis pub/sub protocol — confirm.
        count: i64,
    },
    /// A `psubscribe` frame.
    Psubscribe {
        /// Pattern taken from the frame's second element.
        pattern: Vec<u8>,
        /// Integer taken from the frame's third element.
        /// NOTE(review): presumably the active subscription count — confirm.
        count: i64,
    },
    /// An `unsubscribe` frame.
    Unsubscribe {
        /// Channel name, or `None` when the frame carried a nil/null in the
        /// channel position.
        channel: Option<Vec<u8>>,
        /// Integer taken from the frame's third element.
        /// NOTE(review): presumably the remaining subscription count — confirm.
        count: i64,
    },
    /// A `punsubscribe` frame.
    Punsubscribe {
        /// Pattern, or `None` when the frame carried a nil/null in the
        /// pattern position.
        pattern: Option<Vec<u8>>,
        /// Integer taken from the frame's third element.
        /// NOTE(review): presumably the remaining subscription count — confirm.
        count: i64,
    },
    /// A `message` frame.
    Message {
        /// Second element of the frame: the channel name.
        channel: Vec<u8>,
        /// Third element of the frame: the message body.
        payload: Vec<u8>,
    },
    /// A `pmessage` frame.
    Pmessage {
        /// Second element of the frame: the pattern.
        pattern: Vec<u8>,
        /// Third element of the frame: the channel name.
        channel: Vec<u8>,
        /// Fourth element of the frame: the message body.
        payload: Vec<u8>,
    },
}
64
65pub(crate) fn classify(reply: Reply) -> io::Result<PubsubEvent> {
68 let items = match reply {
69 Reply::Array(v) | Reply::Push(v) => v,
70 Reply::Error(e) => return Err(io::Error::other(String::from_utf8_lossy(&e).into_owned())),
71 other => {
72 return Err(invalid(format!(
73 "pubsub: expected array/push, got {}",
74 shape(&other)
75 )));
76 }
77 };
78
79 let mut it = items.into_iter();
80 let kind = take_bulk(
81 it.next().ok_or_else(|| invalid("pubsub: empty frame"))?,
82 "kind",
83 )?;
84
85 match kind.as_slice() {
86 b"subscribe" => {
87 let channel = take_bulk(
88 it.next().ok_or_else(|| invalid("subscribe: missing channel"))?,
89 "channel",
90 )?;
91 let count = take_int(
92 it.next().ok_or_else(|| invalid("subscribe: missing count"))?,
93 "count",
94 )?;
95 Ok(PubsubEvent::Subscribe { channel, count })
96 }
97 b"psubscribe" => {
98 let pattern = take_bulk(
99 it.next().ok_or_else(|| invalid("psubscribe: missing pattern"))?,
100 "pattern",
101 )?;
102 let count = take_int(
103 it.next().ok_or_else(|| invalid("psubscribe: missing count"))?,
104 "count",
105 )?;
106 Ok(PubsubEvent::Psubscribe { pattern, count })
107 }
108 b"unsubscribe" => {
109 let channel = take_bulk_or_nil(
110 it.next().ok_or_else(|| invalid("unsubscribe: missing channel"))?,
111 "channel",
112 )?;
113 let count = take_int(
114 it.next().ok_or_else(|| invalid("unsubscribe: missing count"))?,
115 "count",
116 )?;
117 Ok(PubsubEvent::Unsubscribe { channel, count })
118 }
119 b"punsubscribe" => {
120 let pattern = take_bulk_or_nil(
121 it.next()
122 .ok_or_else(|| invalid("punsubscribe: missing pattern"))?,
123 "pattern",
124 )?;
125 let count = take_int(
126 it.next()
127 .ok_or_else(|| invalid("punsubscribe: missing count"))?,
128 "count",
129 )?;
130 Ok(PubsubEvent::Punsubscribe { pattern, count })
131 }
132 b"message" => {
133 let channel = take_bulk(
134 it.next().ok_or_else(|| invalid("message: missing channel"))?,
135 "channel",
136 )?;
137 let payload = take_bulk(
138 it.next().ok_or_else(|| invalid("message: missing payload"))?,
139 "payload",
140 )?;
141 Ok(PubsubEvent::Message { channel, payload })
142 }
143 b"pmessage" => {
144 let pattern = take_bulk(
145 it.next().ok_or_else(|| invalid("pmessage: missing pattern"))?,
146 "pattern",
147 )?;
148 let channel = take_bulk(
149 it.next().ok_or_else(|| invalid("pmessage: missing channel"))?,
150 "channel",
151 )?;
152 let payload = take_bulk(
153 it.next().ok_or_else(|| invalid("pmessage: missing payload"))?,
154 "payload",
155 )?;
156 Ok(PubsubEvent::Pmessage {
157 pattern,
158 channel,
159 payload,
160 })
161 }
162 other => Err(invalid(format!(
163 "unknown pubsub kind: {}",
164 String::from_utf8_lossy(other)
165 ))),
166 }
167}
168
169fn take_bulk(r: Reply, field: &str) -> io::Result<Vec<u8>> {
170 match r {
171 Reply::Bulk(v) | Reply::Simple(v) => Ok(v),
172 other => Err(invalid(format!(
173 "pubsub field {field}: expected bulk, got {}",
174 shape(&other)
175 ))),
176 }
177}
178
179fn take_bulk_or_nil(r: Reply, field: &str) -> io::Result<Option<Vec<u8>>> {
180 match r {
181 Reply::Bulk(v) | Reply::Simple(v) => Ok(Some(v)),
182 Reply::Nil | Reply::Null => Ok(None),
183 other => Err(invalid(format!(
184 "pubsub field {field}: expected bulk/nil, got {}",
185 shape(&other)
186 ))),
187 }
188}
189
190fn take_int(r: Reply, field: &str) -> io::Result<i64> {
191 match r {
192 Reply::Int(n) => Ok(n),
193 other => Err(invalid(format!(
194 "pubsub field {field}: expected int, got {}",
195 shape(&other)
196 ))),
197 }
198}
199
200fn shape(r: &Reply) -> &'static str {
201 match r {
202 Reply::Simple(_) => "simple",
203 Reply::Error(_) => "error",
204 Reply::Int(_) => "int",
205 Reply::Bulk(_) => "bulk",
206 Reply::Nil | Reply::Null => "nil",
207 Reply::Array(_) => "array",
208 Reply::Map(_) => "map",
209 Reply::Set(_) => "set",
210 Reply::Double(_) => "double",
211 Reply::Boolean(_) => "boolean",
212 Reply::Verbatim { .. } => "verbatim",
213 Reply::BigNumber(_) => "bignumber",
214 Reply::Push(_) => "push",
215 Reply::BlobError(_) => "bloberror",
216 }
217}
218
/// Builds an `io::Error` of kind `InvalidData` carrying `msg` as its text.
///
/// Accepts anything convertible into a `String`, so both string literals
/// and `format!` results can be passed directly.
fn invalid(msg: impl Into<String>) -> io::Error {
    let text: String = msg.into();
    let kind = io::ErrorKind::InvalidData;
    io::Error::new(kind, text)
}