embedded_redis/subscription/
client.rs1use crate::commands::builder::CommandBuilder;
2use crate::commands::hello::HelloCommand;
3use crate::commands::Command;
4use crate::network::protocol::Protocol;
5use crate::network::timeout::Timeout;
6use crate::network::{Client, CommandErrors};
7use crate::subscription::messages::{DecodeError, Message as PushMessage, ToPushMessage};
8use bytes::Bytes;
9use embedded_nal::TcpClientStack;
10use embedded_time::Clock;
11
12#[derive(Debug, Eq, PartialEq, Clone)]
14pub enum Error {
15 CommandError(CommandErrors),
17 ClockError,
19 TcpError,
21 DecodeError,
23 Timeout,
26}
27
28#[derive(Debug, Clone)]
30pub struct Message {
31 pub channel: Bytes,
33
34 pub payload: Bytes,
36}
37
38#[derive(Debug)]
42pub struct Subscription<'a, N: TcpClientStack, C: Clock, P: Protocol, const L: usize>
43where
44 HelloCommand: Command<<P as Protocol>::FrameType>,
45 <P as Protocol>::FrameType: From<CommandBuilder>,
46 <P as Protocol>::FrameType: ToPushMessage,
47{
48 client: Client<'a, N, C, P>,
49
50 channels: [Bytes; L],
52
53 subscribed: bool,
55}
56
57impl<'a, N, C, P, const L: usize> Subscription<'a, N, C, P, L>
58where
59 N: TcpClientStack,
60 C: Clock,
61 P: Protocol,
62 HelloCommand: Command<<P as Protocol>::FrameType>,
63 <P as Protocol>::FrameType: From<CommandBuilder>,
64 <P as Protocol>::FrameType: ToPushMessage,
65{
66 pub fn new(client: Client<'a, N, C, P>, topics: [Bytes; L]) -> Self {
67 Self {
68 client,
69 channels: topics,
70 subscribed: false,
71 }
72 }
73
74 pub fn receive(&mut self) -> Result<Option<Message>, Error> {
76 loop {
77 let message = self.receive_message()?;
78
79 if message.is_none() {
80 return Ok(None);
81 }
82
83 if let PushMessage::Publish(channel, payload) = message.unwrap() {
84 return Ok(Some(Message { channel, payload }));
85 }
86 }
87 }
88
89 pub(crate) fn subscribe(mut self) -> Result<Self, Error> {
91 let mut cmd = CommandBuilder::new("SUBSCRIBE");
92 for topic in &self.channels {
93 cmd = cmd.arg(topic);
94 }
95
96 self.client.network.send_frame(cmd.into()).map_err(Error::CommandError)?;
97 self.wait_for_confirmation(|message| message == PushMessage::SubConfirmation(self.channels.len()))?;
98
99 self.subscribed = true;
100 Ok(self)
101 }
102
103 pub fn unsubscribe(mut self) -> Result<(), Error> {
107 self.close()
108 }
109
110 pub(crate) fn close(&mut self) -> Result<(), Error> {
112 self.subscribed = false;
113 let cmd = CommandBuilder::new("UNSUBSCRIBE");
114
115 self.client.network.send_frame(cmd.into()).map_err(Error::CommandError)?;
116 self.wait_for_confirmation(|message| message == PushMessage::UnSubConfirmation(0))?;
117
118 Ok(())
119 }
120
121 fn wait_for_confirmation<F: Fn(PushMessage) -> bool>(&self, is_confirmation: F) -> Result<(), Error> {
123 let timeout =
124 Timeout::new(self.client.clock, self.client.timeout_duration).map_err(|_| Error::ClockError)?;
125
126 while !timeout.expired().map_err(|_| Error::ClockError)? {
127 if let Some(message) = self.receive_message()? {
128 if is_confirmation(message) {
129 return Ok(());
130 }
131 }
132 }
133
134 Err(Error::Timeout)
135 }
136
137 fn receive_message(&self) -> Result<Option<PushMessage>, Error> {
139 loop {
141 if let Err(error) = self.client.network.receive_chunk() {
142 match error {
143 nb::Error::Other(_) => return Err(Error::TcpError),
144 nb::Error::WouldBlock => break,
145 };
146 }
147 }
148
149 let frame = self.client.network.take_next_frame();
150 if frame.is_none() {
151 return Ok(None);
152 }
153
154 match frame.unwrap().decode_push() {
155 Ok(message) => Ok(Some(message)),
156 Err(error) => match error {
157 DecodeError::ProtocolViolation => Err(Error::DecodeError),
158 DecodeError::IntegerOverflow => Err(Error::DecodeError),
159 },
160 }
161 }
162
163 #[cfg(test)]
165 pub(crate) fn set_unsubscribed(&mut self) {
166 self.subscribed = false;
167 }
168}
169
170impl<N, C, P, const L: usize> Drop for Subscription<'_, N, C, P, L>
171where
172 N: TcpClientStack,
173 C: Clock,
174 P: Protocol,
175 HelloCommand: Command<<P as Protocol>::FrameType>,
176 <P as Protocol>::FrameType: From<CommandBuilder>,
177 <P as Protocol>::FrameType: ToPushMessage,
178{
179 fn drop(&mut self) {
180 if self.subscribed {
181 let _ = self.close();
182 }
183 }
184}