redis_pubsub/
message.rs

1use thiserror::Error;
2
3use super::parser;
4use crate::Error;
5
6#[derive(Debug)]
7pub enum Message {
8    Subscription {
9        channel: String,
10        subscriptions: i64,
11    },
12    Unsubscription {
13        channel: String,
14        subscriptions: i64,
15    },
16    Message {
17        channel: String,
18        message: String,
19    },
20    PatternSubscription {
21        channel: String,
22        subscriptions: i64,
23    },
24    PatternUnsubscription {
25        channel: String,
26        subscriptions: i64,
27    },
28    PatternMessage {
29        pattern: String,
30        channel: String,
31        message: String,
32    },
33    Connected,
34    Disconnected(Error),
35    Error(Error),
36}
37
38#[derive(Error, Debug)]
39pub enum ParserError {
40    #[error("The response has an invalid format.")]
41    MalformedResponse,
42    #[error("The provided channel was invalid.")]
43    InvalidChannel,
44    #[error("The provided amount of subscribers is invalid.")]
45    InvalidSubscriberCount,
46    #[error("The provided pattern is invalid.")]
47    InvalidPattern,
48}
49
50impl Message {
51    /// Parse the response to a message.
52    ///
53    /// # Errors
54    /// Returns an error if the response has unexpected types.
55    pub fn from_response(res: parser::Response) -> crate::Result<Self> {
56        // Make sure the response is a array.
57        let arr = match res {
58            parser::Response::Array(arr) => Ok(arr),
59            _ => Err(ParserError::MalformedResponse),
60        }?;
61
62        // Get the first element of the array.
63        let channel = match arr.get(0) {
64            Some(parser::Response::Bulk(channel)) => Ok(channel.as_str()),
65            _ => Err(ParserError::MalformedResponse),
66        }?;
67
68        // Match on the first element text.
69        match channel.to_lowercase().as_str() {
70            "subscribe" => Self::from_subscribe(&arr),
71            "unsubscribe" => Self::from_unsubscribe(&arr),
72            "message" => Self::from_message(&arr),
73            "pmessage" => Self::from_pmessage(&arr),
74            "psubscribe" => Self::from_psubscribe(&arr),
75            "punsubscribe" => Self::from_punsubscribe(&arr),
76            _ => Err(Error::ParserError(ParserError::MalformedResponse)),
77        }
78    }
79
80    /// parse the subscription message.
81    fn from_subscribe(res: &[parser::Response]) -> crate::Result<Self> {
82        let channel = match res.get(1) {
83            Some(parser::Response::Bulk(channel)) => Ok((*channel).clone()),
84            _ => Err(ParserError::InvalidChannel),
85        }?;
86
87        let subscriptions = match res.get(2) {
88            Some(parser::Response::Integer(subscriptions)) => Ok(*subscriptions),
89            _ => Err(ParserError::InvalidSubscriberCount),
90        }?;
91
92        Ok(Self::Subscription {
93            channel,
94            subscriptions,
95        })
96    }
97
98    fn from_psubscribe(res: &[parser::Response]) -> crate::Result<Self> {
99        let channel = match res.get(1) {
100            Some(parser::Response::Bulk(channel)) => Ok((*channel).clone()),
101            _ => Err(ParserError::InvalidChannel),
102        }?;
103
104        let subscriptions = match res.get(2) {
105            Some(parser::Response::Integer(subscriptions)) => Ok(*subscriptions),
106            _ => Err(ParserError::InvalidSubscriberCount),
107        }?;
108
109        Ok(Self::PatternSubscription {
110            channel,
111            subscriptions,
112        })
113    }
114
115    /// parse the unsubscription message.
116    fn from_unsubscribe(res: &[parser::Response]) -> crate::Result<Self> {
117        let channel = match res.get(1) {
118            Some(parser::Response::Bulk(channel)) => Ok((*channel).clone()),
119            _ => Err(ParserError::InvalidChannel),
120        }?;
121
122        let subscriptions = match res.get(2) {
123            Some(parser::Response::Integer(subscriptions)) => Ok(*subscriptions),
124            _ => Err(ParserError::InvalidSubscriberCount),
125        }?;
126
127        Ok(Self::Unsubscription {
128            channel,
129            subscriptions,
130        })
131    }
132
133    fn from_punsubscribe(res: &[parser::Response]) -> crate::Result<Self> {
134        let channel = match res.get(1) {
135            Some(parser::Response::Bulk(channel)) => Ok((*channel).clone()),
136            _ => Err(ParserError::InvalidChannel),
137        }?;
138
139        let subscriptions = match res.get(2) {
140            Some(parser::Response::Integer(subscriptions)) => Ok(*subscriptions),
141            _ => Err(ParserError::InvalidSubscriberCount),
142        }?;
143
144        Ok(Self::PatternUnsubscription {
145            channel,
146            subscriptions,
147        })
148    }
149
150    /// parse the response to a message.
151    fn from_message(res: &[parser::Response]) -> crate::Result<Self> {
152        let channel = match res.get(1) {
153            Some(parser::Response::Bulk(channel)) => Ok((*channel).clone()),
154            _ => Err(ParserError::InvalidChannel),
155        }?;
156
157        let message = match res.get(2) {
158            Some(parser::Response::Bulk(message)) => Ok((*message).clone()),
159            _ => Err(ParserError::InvalidSubscriberCount),
160        }?;
161
162        Ok(Self::Message { channel, message })
163    }
164
165    /// parse the response to a pattern message
166    fn from_pmessage(res: &[parser::Response]) -> crate::Result<Self> {
167        let pattern = match res.get(1) {
168            Some(parser::Response::Bulk(pattern)) => Ok((*pattern).clone()),
169            _ => Err(ParserError::InvalidPattern),
170        }?;
171
172        let channel = match res.get(2) {
173            Some(parser::Response::Bulk(channel)) => Ok((*channel).clone()),
174            _ => Err(ParserError::InvalidChannel),
175        }?;
176
177        let message = match res.get(3) {
178            Some(parser::Response::Bulk(message)) => Ok((*message).clone()),
179            _ => Err(ParserError::InvalidSubscriberCount),
180        }?;
181
182        Ok(Self::PatternMessage {
183            pattern,
184            channel,
185            message,
186        })
187    }
188}
189
190impl Message {
191    #[must_use]
192    #[inline]
193    pub const fn is_subscription(&self) -> bool {
194        matches!(self, Self::Subscription { .. })
195    }
196
197    #[must_use]
198    #[inline]
199    pub const fn is_pattern_subscription(&self) -> bool {
200        matches!(self, Self::PatternSubscription { .. })
201    }
202
203    #[must_use]
204    #[inline]
205    pub const fn is_unsubscription(&self) -> bool {
206        matches!(self, Self::Unsubscription { .. })
207    }
208
209    #[must_use]
210    #[inline]
211    pub const fn is_pattern_unsubscription(&self) -> bool {
212        matches!(self, Self::PatternUnsubscription { .. })
213    }
214
215    #[must_use]
216    #[inline]
217    pub const fn is_message(&self) -> bool {
218        matches!(self, Self::Message { .. })
219    }
220
221    #[must_use]
222    #[inline]
223    pub const fn is_pattern_message(&self) -> bool {
224        matches!(self, Self::PatternMessage { .. })
225    }
226
227    #[must_use]
228    #[inline]
229    pub const fn is_connected(&self) -> bool {
230        matches!(self, Self::Connected)
231    }
232
233    #[must_use]
234    #[inline]
235    pub const fn is_disconnected(&self) -> bool {
236        matches!(self, Self::Disconnected(_))
237    }
238
239    #[must_use]
240    #[inline]
241    pub const fn is_error(&self) -> bool {
242        matches!(self, Self::Error(_))
243    }
244}