use crate::commands::builder::CommandBuilder;
use crate::commands::hello::HelloCommand;
use crate::commands::Command;
use crate::network::protocol::Protocol;
use crate::network::timeout::Timeout;
use crate::network::{Client, CommandErrors};
use crate::subscription::messages::{DecodeError, Message as PushMessage, ToPushMessage};
use bytes::Bytes;
use embedded_nal::TcpClientStack;
use embedded_time::Clock;
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum Error {
CommandError(CommandErrors),
ClockError,
TcpError,
DecodeError,
Timeout,
}
#[derive(Debug, Clone)]
pub struct Message {
pub channel: Bytes,
pub payload: Bytes,
}
#[derive(Debug)]
pub struct Subscription<'a, N: TcpClientStack, C: Clock, P: Protocol, const L: usize>
where
HelloCommand: Command<<P as Protocol>::FrameType>,
<P as Protocol>::FrameType: From<CommandBuilder>,
<P as Protocol>::FrameType: ToPushMessage,
{
client: Client<'a, N, C, P>,
channels: [Bytes; L],
subscribed: bool,
}
impl<'a, N, C, P, const L: usize> Subscription<'a, N, C, P, L>
where
N: TcpClientStack,
C: Clock,
P: Protocol,
HelloCommand: Command<<P as Protocol>::FrameType>,
<P as Protocol>::FrameType: From<CommandBuilder>,
<P as Protocol>::FrameType: ToPushMessage,
{
pub fn new(client: Client<'a, N, C, P>, topics: [Bytes; L]) -> Self {
Self {
client,
channels: topics,
subscribed: false,
}
}
pub fn receive(&mut self) -> Result<Option<Message>, Error> {
loop {
let message = self.receive_message()?;
if message.is_none() {
return Ok(None);
}
if let PushMessage::Publish(channel, payload) = message.unwrap() {
return Ok(Some(Message { channel, payload }));
}
}
}
pub(crate) fn subscribe(mut self) -> Result<Self, Error> {
let mut cmd = CommandBuilder::new("SUBSCRIBE");
for topic in &self.channels {
cmd = cmd.arg(topic);
}
self.client.network.send_frame(cmd.into()).map_err(Error::CommandError)?;
self.wait_for_confirmation(|message| message == PushMessage::SubConfirmation(self.channels.len()))?;
self.subscribed = true;
Ok(self)
}
pub fn unsubscribe(mut self) -> Result<(), Error> {
self.close()
}
pub(crate) fn close(&mut self) -> Result<(), Error> {
self.subscribed = false;
let cmd = CommandBuilder::new("UNSUBSCRIBE");
self.client.network.send_frame(cmd.into()).map_err(Error::CommandError)?;
self.wait_for_confirmation(|message| message == PushMessage::UnSubConfirmation(0))?;
Ok(())
}
fn wait_for_confirmation<F: Fn(PushMessage) -> bool>(&self, is_confirmation: F) -> Result<(), Error> {
let timeout =
Timeout::new(self.client.clock, self.client.timeout_duration).map_err(|_| Error::ClockError)?;
while !timeout.expired().map_err(|_| Error::ClockError)? {
if let Some(message) = self.receive_message()? {
if is_confirmation(message) {
return Ok(());
}
}
}
Err(Error::Timeout)
}
fn receive_message(&self) -> Result<Option<PushMessage>, Error> {
loop {
if let Err(error) = self.client.network.receive_chunk() {
match error {
nb::Error::Other(_) => return Err(Error::TcpError),
nb::Error::WouldBlock => break,
};
}
}
let frame = self.client.network.take_next_frame();
if frame.is_none() {
return Ok(None);
}
match frame.unwrap().decode_push() {
Ok(message) => Ok(Some(message)),
Err(error) => match error {
DecodeError::ProtocolViolation => Err(Error::DecodeError),
DecodeError::IntegerOverflow => Err(Error::DecodeError),
},
}
}
#[cfg(test)]
pub(crate) fn set_unsubscribed(&mut self) {
self.subscribed = false;
}
}
impl<N, C, P, const L: usize> Drop for Subscription<'_, N, C, P, L>
where
N: TcpClientStack,
C: Clock,
P: Protocol,
HelloCommand: Command<<P as Protocol>::FrameType>,
<P as Protocol>::FrameType: From<CommandBuilder>,
<P as Protocol>::FrameType: ToPushMessage,
{
fn drop(&mut self) {
if self.subscribed {
let _ = self.close();
}
}
}