embedded_redis/subscription/
client.rs

1use crate::commands::builder::CommandBuilder;
2use crate::commands::hello::HelloCommand;
3use crate::commands::Command;
4use crate::network::protocol::Protocol;
5use crate::network::timeout::Timeout;
6use crate::network::{Client, CommandErrors};
7use crate::subscription::messages::{DecodeError, Message as PushMessage, ToPushMessage};
8use bytes::Bytes;
9use embedded_nal::TcpClientStack;
10use embedded_time::Clock;
11
12/// Subscription errors
13#[derive(Debug, Eq, PartialEq, Clone)]
14pub enum Error {
15    /// Error while sending SUBSCRIBE or UNSUBSCRIBE command
16    CommandError(CommandErrors),
17    /// Upstream time error
18    ClockError,
19    /// Network error receiving or sending data
20    TcpError,
21    /// Error while decoding a push message. Either Redis sent invalid data or there is a decoder bug.
22    DecodeError,
23    /// Subscription or Unsubscription was not confirmed by Redis within time limit. Its recommended to close/reconnect the socket to avoid
24    /// subsequent errors based on invalid state.
25    Timeout,
26}
27
28/// A published subscription message
29#[derive(Debug, Clone)]
30pub struct Message {
31    /// The channel the message has been published to
32    pub channel: Bytes,
33
34    /// The actual payload
35    pub payload: Bytes,
36}
37
38/// Client for handling subscriptions
39///
40/// L: Number of subscribed topics
41#[derive(Debug)]
42pub struct Subscription<'a, N: TcpClientStack, C: Clock, P: Protocol, const L: usize>
43where
44    HelloCommand: Command<<P as Protocol>::FrameType>,
45    <P as Protocol>::FrameType: From<CommandBuilder>,
46    <P as Protocol>::FrameType: ToPushMessage,
47{
48    client: Client<'a, N, C, P>,
49
50    /// List of subscribed topics
51    channels: [Bytes; L],
52
53    /// Confirmed + active subscription
54    subscribed: bool,
55}
56
57impl<'a, N, C, P, const L: usize> Subscription<'a, N, C, P, L>
58where
59    N: TcpClientStack,
60    C: Clock,
61    P: Protocol,
62    HelloCommand: Command<<P as Protocol>::FrameType>,
63    <P as Protocol>::FrameType: From<CommandBuilder>,
64    <P as Protocol>::FrameType: ToPushMessage,
65{
66    pub fn new(client: Client<'a, N, C, P>, topics: [Bytes; L]) -> Self {
67        Self {
68            client,
69            channels: topics,
70            subscribed: false,
71        }
72    }
73
74    /// Receives a message. Returns None in case no message is pending
75    pub fn receive(&mut self) -> Result<Option<Message>, Error> {
76        loop {
77            let message = self.receive_message()?;
78
79            if message.is_none() {
80                return Ok(None);
81            }
82
83            if let PushMessage::Publish(channel, payload) = message.unwrap() {
84                return Ok(Some(Message { channel, payload }));
85            }
86        }
87    }
88
89    /// Starts the subscription and waits for confirmation
90    pub(crate) fn subscribe(mut self) -> Result<Self, Error> {
91        let mut cmd = CommandBuilder::new("SUBSCRIBE");
92        for topic in &self.channels {
93            cmd = cmd.arg(topic);
94        }
95
96        self.client.network.send_frame(cmd.into()).map_err(Error::CommandError)?;
97        self.wait_for_confirmation(|message| message == PushMessage::SubConfirmation(self.channels.len()))?;
98
99        self.subscribed = true;
100        Ok(self)
101    }
102
103    /// Unsubscribes from all topics and waits for confirmation
104    ///
105    /// *If this fails, it's recommended to clos the connection to avoid subsequent errors caused by invalid state*
106    pub fn unsubscribe(mut self) -> Result<(), Error> {
107        self.close()
108    }
109
110    /// Unsubscribes from all topics and waits for confirmation
111    pub(crate) fn close(&mut self) -> Result<(), Error> {
112        self.subscribed = false;
113        let cmd = CommandBuilder::new("UNSUBSCRIBE");
114
115        self.client.network.send_frame(cmd.into()).map_err(Error::CommandError)?;
116        self.wait_for_confirmation(|message| message == PushMessage::UnSubConfirmation(0))?;
117
118        Ok(())
119    }
120
121    /// Waits for the confirmation of all topics
122    fn wait_for_confirmation<F: Fn(PushMessage) -> bool>(&self, is_confirmation: F) -> Result<(), Error> {
123        let timeout =
124            Timeout::new(self.client.clock, self.client.timeout_duration).map_err(|_| Error::ClockError)?;
125
126        while !timeout.expired().map_err(|_| Error::ClockError)? {
127            if let Some(message) = self.receive_message()? {
128                if is_confirmation(message) {
129                    return Ok(());
130                }
131            }
132        }
133
134        Err(Error::Timeout)
135    }
136
137    /// Receives and decodes the next message. Returns None in case no message is pending or not complete yet.
138    fn receive_message(&self) -> Result<Option<PushMessage>, Error> {
139        // Receive all pending data
140        loop {
141            if let Err(error) = self.client.network.receive_chunk() {
142                match error {
143                    nb::Error::Other(_) => return Err(Error::TcpError),
144                    nb::Error::WouldBlock => break,
145                };
146            }
147        }
148
149        let frame = self.client.network.take_next_frame();
150        if frame.is_none() {
151            return Ok(None);
152        }
153
154        match frame.unwrap().decode_push() {
155            Ok(message) => Ok(Some(message)),
156            Err(error) => match error {
157                DecodeError::ProtocolViolation => Err(Error::DecodeError),
158                DecodeError::IntegerOverflow => Err(Error::DecodeError),
159            },
160        }
161    }
162
163    /// Prevents the automatic unsubscription when client is dropped
164    #[cfg(test)]
165    pub(crate) fn set_unsubscribed(&mut self) {
166        self.subscribed = false;
167    }
168}
169
170impl<N, C, P, const L: usize> Drop for Subscription<'_, N, C, P, L>
171where
172    N: TcpClientStack,
173    C: Clock,
174    P: Protocol,
175    HelloCommand: Command<<P as Protocol>::FrameType>,
176    <P as Protocol>::FrameType: From<CommandBuilder>,
177    <P as Protocol>::FrameType: ToPushMessage,
178{
179    fn drop(&mut self) {
180        if self.subscribed {
181            let _ = self.close();
182        }
183    }
184}