1use super::{utils, Command, CommandError};
2use ntex::util::{Bytes, Either};
3use std::convert::TryFrom;
4
5use crate::codec::{BulkString, Request, Response};
6
7const TYPE_SUBSCRIBE: Bytes = Bytes::from_static(b"subscribe");
8const TYPE_UNSUBSCRIBE: Bytes = Bytes::from_static(b"unsubscribe");
9const TYPE_SSUBSCRIBE: Bytes = Bytes::from_static(b"ssubscribe");
10const TYPE_SUNSUBSCRIBE: Bytes = Bytes::from_static(b"sunsubscribe");
11const TYPE_PSUBSCRIBE: Bytes = Bytes::from_static(b"psubscribe");
12const TYPE_PUNSUBSCRIBE: Bytes = Bytes::from_static(b"punsubscribe");
13const TYPE_MESSAGE: Bytes = Bytes::from_static(b"message");
14const TYPE_SMESSAGE: Bytes = Bytes::from_static(b"smessage");
15const TYPE_PMESSAGE: Bytes = Bytes::from_static(b"pmessage");
16
/// Marker trait for commands that belong to the pub/sub family.
///
/// It declares no methods; implementing it only tags a type so that generic
/// code can require `C: PubSubCommand`.
pub trait PubSubCommand {}
18
19#[derive(Debug, PartialEq, Eq)]
20pub enum SubscribeItem {
21 Subscribed(Bytes),
22 UnSubscribed(Bytes),
23 Message {
24 pattern: Option<Bytes>,
25 channel: Bytes,
26 payload: Bytes,
27 },
28}
29
30struct MessagePayload(Either<Bytes, i64>);
31
32impl TryFrom<Response> for MessagePayload {
33 type Error = (&'static str, Response);
34
35 fn try_from(val: Response) -> Result<Self, Self::Error> {
36 match val {
37 Response::Bytes(bytes) => Ok(MessagePayload(Either::Left(bytes))),
38 Response::Integer(number) => Ok(MessagePayload(Either::Right(number))),
39 _ => Err(("Not a bytes object or integer", val)),
40 }
41 }
42}
43
44impl TryFrom<Response> for SubscribeItem {
45 type Error = CommandError;
46
47 fn try_from(val: Response) -> Result<Self, Self::Error> {
48 let (mtype, pattern, channel, payload) = match val {
49 Response::Array(ary) => match ary.len() {
50 3 => {
52 let mut ary_iter = ary.into_iter();
53 (
54 Bytes::try_from(ary_iter.next().expect("No value"))?,
55 None,
56 Bytes::try_from(ary_iter.next().expect("No value"))?,
57 MessagePayload::try_from(ary_iter.next().expect("No value"))?,
58 )
59 }
60 4 => {
62 let mut ary_iter = ary.into_iter();
63 (
64 Bytes::try_from(ary_iter.next().expect("No value"))?,
65 Some(Bytes::try_from(ary_iter.next().expect("No value"))?),
66 Bytes::try_from(ary_iter.next().expect("No value"))?,
67 MessagePayload::try_from(ary_iter.next().expect("No value"))?,
68 )
69 }
70 _ => {
71 return Err(CommandError::Output(
72 "Array needs to be 3 or 4 elements",
73 Response::Array(ary),
74 ))
75 }
76 },
77 _ => return Err(CommandError::Output("Unexpected value", val)),
78 };
79
80 match &mtype {
81 s if s == &TYPE_SUBSCRIBE || s == &TYPE_SSUBSCRIBE || s == &TYPE_PSUBSCRIBE => {
82 Ok(SubscribeItem::Subscribed(channel))
83 }
84 s if s == &TYPE_UNSUBSCRIBE || s == &TYPE_SUNSUBSCRIBE || s == &TYPE_PUNSUBSCRIBE => {
85 Ok(SubscribeItem::UnSubscribed(channel))
86 }
87 s if s == &TYPE_MESSAGE || s == &TYPE_SMESSAGE || s == &TYPE_PMESSAGE => {
88 if let Some(payload) = payload.0.left() {
89 Ok(SubscribeItem::Message {
90 pattern,
91 channel,
92 payload,
93 })
94 } else {
95 Err(CommandError::Output(
96 "Subscription message payload is not bytes",
97 Response::Nil,
98 ))
99 }
100 }
101 _ => Err(CommandError::Output(
102 "Subscription message type unknown",
103 Response::Bytes(mtype),
104 )),
105 }
106 }
107}
108
109pub struct SubscribeOutputCommand(pub(crate) Request);
110
111impl Command for SubscribeOutputCommand {
112 type Output = SubscribeItem;
113
114 fn to_request(self) -> Request {
115 self.0
116 }
117
118 fn to_output(val: Response) -> Result<Self::Output, CommandError> {
119 SubscribeItem::try_from(val)
120 }
121}
122
123pub struct UnSubscribeOutputCommand(pub(crate) Request);
124
125impl Command for UnSubscribeOutputCommand {
126 type Output = SubscribeItem;
127
128 fn to_request(self) -> Request {
129 self.0
130 }
131
132 fn to_output(val: Response) -> Result<Self::Output, CommandError> {
133 SubscribeItem::try_from(val)
134 }
135}
136
137pub fn Publish<T, V>(key: T, value: V) -> utils::IntOutputCommand
139where
140 BulkString: From<T> + From<V>,
141{
142 utils::IntOutputCommand(Request::Array(vec![
143 Request::from_static("PUBLISH"),
144 Request::BulkString(key.into()),
145 Request::BulkString(value.into()),
146 ]))
147}
148
149pub fn SPublish<T, V>(key: T, value: V) -> utils::IntOutputCommand
151where
152 BulkString: From<T> + From<V>,
153{
154 utils::IntOutputCommand(Request::Array(vec![
155 Request::from_static("SPUBLISH"),
156 Request::BulkString(key.into()),
157 Request::BulkString(value.into()),
158 ]))
159}
160
161pub fn Subscribe<T>(channels: Vec<T>) -> SubscribeOutputCommand
163where
164 BulkString: From<T>,
165{
166 let mut req = Request::from_static("SUBSCRIBE");
167 for channel in channels {
168 req = req.add(Request::BulkString(channel.into()));
169 }
170 SubscribeOutputCommand(req)
171}
172
173pub fn UnSubscribe<T>(channels: Option<Vec<T>>) -> UnSubscribeOutputCommand
175where
176 BulkString: From<T>,
177{
178 let mut req = Request::from_static("UNSUBSCRIBE");
179 if let Some(channels) = channels {
180 for channel in channels {
181 req = req.add(Request::BulkString(channel.into()));
182 }
183 }
184 UnSubscribeOutputCommand(req)
185}
186
187pub fn SSubscribe<T>(channels: Vec<T>) -> SubscribeOutputCommand
189where
190 BulkString: From<T>,
191{
192 let mut req = Request::from_static("SSUBSCRIBE");
193 for channel in channels {
194 req = req.add(Request::BulkString(channel.into()));
195 }
196 SubscribeOutputCommand(req)
197}
198
199pub fn SUnSubscribe<T>(channels: Option<Vec<T>>) -> UnSubscribeOutputCommand
201where
202 BulkString: From<T>,
203{
204 let mut req = Request::from_static("SUNSUBSCRIBE");
205 if let Some(channels) = channels {
206 for channel in channels {
207 req = req.add(Request::BulkString(channel.into()));
208 }
209 }
210 UnSubscribeOutputCommand(req)
211}
212
213pub fn PSubscribe<T>(channels: Vec<T>) -> SubscribeOutputCommand
215where
216 BulkString: From<T>,
217{
218 let mut req = Request::from_static("PSUBSCRIBE");
219 for channel in channels {
220 req = req.add(Request::BulkString(channel.into()));
221 }
222 SubscribeOutputCommand(req)
223}
224
225pub fn PUnSubscribe<T>(channels: Option<Vec<T>>) -> UnSubscribeOutputCommand
227where
228 BulkString: From<T>,
229{
230 let mut req = Request::from_static("PUNSUBSCRIBE");
231 if let Some(channels) = channels {
232 for channel in channels {
233 req = req.add(Request::BulkString(channel.into()));
234 }
235 }
236 UnSubscribeOutputCommand(req)
237}
238
239impl PubSubCommand for SubscribeOutputCommand {}
240impl PubSubCommand for UnSubscribeOutputCommand {}