ntex_redis/cmd/
pubsub.rs

1use super::{utils, Command, CommandError};
2use ntex::util::{Bytes, Either};
3use std::convert::TryFrom;
4
5use crate::codec::{BulkString, Request, Response};
6
// Message-type tags. `SubscribeItem::try_from` compares the first element of a
// pub/sub reply array against these to decide which `SubscribeItem` variant to
// build.

// Tags mapped to `SubscribeItem::Subscribed` / `SubscribeItem::UnSubscribed`.
const TYPE_SUBSCRIBE: Bytes = Bytes::from_static(b"subscribe");
const TYPE_UNSUBSCRIBE: Bytes = Bytes::from_static(b"unsubscribe");
const TYPE_SSUBSCRIBE: Bytes = Bytes::from_static(b"ssubscribe");
const TYPE_SUNSUBSCRIBE: Bytes = Bytes::from_static(b"sunsubscribe");
const TYPE_PSUBSCRIBE: Bytes = Bytes::from_static(b"psubscribe");
const TYPE_PUNSUBSCRIBE: Bytes = Bytes::from_static(b"punsubscribe");
// Tags mapped to `SubscribeItem::Message`.
const TYPE_MESSAGE: Bytes = Bytes::from_static(b"message");
const TYPE_SMESSAGE: Bytes = Bytes::from_static(b"smessage");
const TYPE_PMESSAGE: Bytes = Bytes::from_static(b"pmessage");
16
/// Marker trait with no methods; this module implements it for
/// `SubscribeOutputCommand` and `UnSubscribeOutputCommand`.
pub trait PubSubCommand {}
18
/// A decoded pub/sub reply, produced by `SubscribeItem::try_from(Response)`.
#[derive(Debug, PartialEq, Eq)]
pub enum SubscribeItem {
    /// Reply tagged `subscribe`, `ssubscribe` or `psubscribe`; carries the
    /// element that follows the tag in the reply array.
    Subscribed(Bytes),
    /// Reply tagged `unsubscribe`, `sunsubscribe` or `punsubscribe`; carries
    /// the element that follows the tag in the reply array.
    UnSubscribed(Bytes),
    /// Reply tagged `message`, `smessage` or `pmessage`.
    Message {
        /// `Some` only when the reply array had four elements (the element
        /// right after the tag); `None` for three-element replies.
        pattern: Option<Bytes>,
        /// Element preceding the payload in the reply array.
        channel: Bytes,
        /// Last element of the reply array; must have been a bytes value.
        payload: Bytes,
    },
}
29
30struct MessagePayload(Either<Bytes, i64>);
31
32impl TryFrom<Response> for MessagePayload {
33    type Error = (&'static str, Response);
34
35    fn try_from(val: Response) -> Result<Self, Self::Error> {
36        match val {
37            Response::Bytes(bytes) => Ok(MessagePayload(Either::Left(bytes))),
38            Response::Integer(number) => Ok(MessagePayload(Either::Right(number))),
39            _ => Err(("Not a bytes object or integer", val)),
40        }
41    }
42}
43
44impl TryFrom<Response> for SubscribeItem {
45    type Error = CommandError;
46
47    fn try_from(val: Response) -> Result<Self, Self::Error> {
48        let (mtype, pattern, channel, payload) = match val {
49            Response::Array(ary) => match ary.len() {
50                // subscribe or ssubscribe message
51                3 => {
52                    let mut ary_iter = ary.into_iter();
53                    (
54                        Bytes::try_from(ary_iter.next().expect("No value"))?,
55                        None,
56                        Bytes::try_from(ary_iter.next().expect("No value"))?,
57                        MessagePayload::try_from(ary_iter.next().expect("No value"))?,
58                    )
59                }
60                // psubscribe message
61                4 => {
62                    let mut ary_iter = ary.into_iter();
63                    (
64                        Bytes::try_from(ary_iter.next().expect("No value"))?,
65                        Some(Bytes::try_from(ary_iter.next().expect("No value"))?),
66                        Bytes::try_from(ary_iter.next().expect("No value"))?,
67                        MessagePayload::try_from(ary_iter.next().expect("No value"))?,
68                    )
69                }
70                _ => {
71                    return Err(CommandError::Output(
72                        "Array needs to be 3 or 4 elements",
73                        Response::Array(ary),
74                    ))
75                }
76            },
77            _ => return Err(CommandError::Output("Unexpected value", val)),
78        };
79
80        match &mtype {
81            s if s == &TYPE_SUBSCRIBE || s == &TYPE_SSUBSCRIBE || s == &TYPE_PSUBSCRIBE => {
82                Ok(SubscribeItem::Subscribed(channel))
83            }
84            s if s == &TYPE_UNSUBSCRIBE || s == &TYPE_SUNSUBSCRIBE || s == &TYPE_PUNSUBSCRIBE => {
85                Ok(SubscribeItem::UnSubscribed(channel))
86            }
87            s if s == &TYPE_MESSAGE || s == &TYPE_SMESSAGE || s == &TYPE_PMESSAGE => {
88                if let Some(payload) = payload.0.left() {
89                    Ok(SubscribeItem::Message {
90                        pattern,
91                        channel,
92                        payload,
93                    })
94                } else {
95                    Err(CommandError::Output(
96                        "Subscription message payload is not bytes",
97                        Response::Nil,
98                    ))
99                }
100            }
101            _ => Err(CommandError::Output(
102                "Subscription message type unknown",
103                Response::Bytes(mtype),
104            )),
105        }
106    }
107}
108
109pub struct SubscribeOutputCommand(pub(crate) Request);
110
111impl Command for SubscribeOutputCommand {
112    type Output = SubscribeItem;
113
114    fn to_request(self) -> Request {
115        self.0
116    }
117
118    fn to_output(val: Response) -> Result<Self::Output, CommandError> {
119        SubscribeItem::try_from(val)
120    }
121}
122
123pub struct UnSubscribeOutputCommand(pub(crate) Request);
124
125impl Command for UnSubscribeOutputCommand {
126    type Output = SubscribeItem;
127
128    fn to_request(self) -> Request {
129        self.0
130    }
131
132    fn to_output(val: Response) -> Result<Self::Output, CommandError> {
133        SubscribeItem::try_from(val)
134    }
135}
136
137/// PUBLISH redis command
138pub fn Publish<T, V>(key: T, value: V) -> utils::IntOutputCommand
139where
140    BulkString: From<T> + From<V>,
141{
142    utils::IntOutputCommand(Request::Array(vec![
143        Request::from_static("PUBLISH"),
144        Request::BulkString(key.into()),
145        Request::BulkString(value.into()),
146    ]))
147}
148
149/// SPUBLISH redis command
150pub fn SPublish<T, V>(key: T, value: V) -> utils::IntOutputCommand
151where
152    BulkString: From<T> + From<V>,
153{
154    utils::IntOutputCommand(Request::Array(vec![
155        Request::from_static("SPUBLISH"),
156        Request::BulkString(key.into()),
157        Request::BulkString(value.into()),
158    ]))
159}
160
161/// SUBSCRIBE redis command
162pub fn Subscribe<T>(channels: Vec<T>) -> SubscribeOutputCommand
163where
164    BulkString: From<T>,
165{
166    let mut req = Request::from_static("SUBSCRIBE");
167    for channel in channels {
168        req = req.add(Request::BulkString(channel.into()));
169    }
170    SubscribeOutputCommand(req)
171}
172
173/// UNSUBSCRIBE redis command
174pub fn UnSubscribe<T>(channels: Option<Vec<T>>) -> UnSubscribeOutputCommand
175where
176    BulkString: From<T>,
177{
178    let mut req = Request::from_static("UNSUBSCRIBE");
179    if let Some(channels) = channels {
180        for channel in channels {
181            req = req.add(Request::BulkString(channel.into()));
182        }
183    }
184    UnSubscribeOutputCommand(req)
185}
186
187/// SSUBSCRIBE redis command
188pub fn SSubscribe<T>(channels: Vec<T>) -> SubscribeOutputCommand
189where
190    BulkString: From<T>,
191{
192    let mut req = Request::from_static("SSUBSCRIBE");
193    for channel in channels {
194        req = req.add(Request::BulkString(channel.into()));
195    }
196    SubscribeOutputCommand(req)
197}
198
199/// SUNSUBSCRIBE redis command
200pub fn SUnSubscribe<T>(channels: Option<Vec<T>>) -> UnSubscribeOutputCommand
201where
202    BulkString: From<T>,
203{
204    let mut req = Request::from_static("SUNSUBSCRIBE");
205    if let Some(channels) = channels {
206        for channel in channels {
207            req = req.add(Request::BulkString(channel.into()));
208        }
209    }
210    UnSubscribeOutputCommand(req)
211}
212
213/// PSUBSCRIBE redis command
214pub fn PSubscribe<T>(channels: Vec<T>) -> SubscribeOutputCommand
215where
216    BulkString: From<T>,
217{
218    let mut req = Request::from_static("PSUBSCRIBE");
219    for channel in channels {
220        req = req.add(Request::BulkString(channel.into()));
221    }
222    SubscribeOutputCommand(req)
223}
224
225/// PUNSUBSCRIBE redis command
226pub fn PUnSubscribe<T>(channels: Option<Vec<T>>) -> UnSubscribeOutputCommand
227where
228    BulkString: From<T>,
229{
230    let mut req = Request::from_static("PUNSUBSCRIBE");
231    if let Some(channels) = channels {
232        for channel in channels {
233            req = req.add(Request::BulkString(channel.into()));
234        }
235    }
236    UnSubscribeOutputCommand(req)
237}
238
// Tag both command wrappers defined in this module with the `PubSubCommand`
// marker trait.
impl PubSubCommand for SubscribeOutputCommand {}
impl PubSubCommand for UnSubscribeOutputCommand {}