embedded_mqttc/network/
mqtt.rs

1use core::future::Future;
2
3use embytes_buffer::{BufferReader, BufferWriter};
4use mqttrs2::{decode_slice_with_len, encode_slice, Packet};
5use thiserror::Error;
6
7use super::fake::{BufferedStream, ServerConnection};
8
9#[derive(Debug, PartialEq, Clone, Error)]
10#[cfg_attr(feature = "defmt", derive(defmt::Format))]
11pub enum MqttPacketError {
12
13    #[error("error whie encoding / decoding packet")]
14    CodecError,
15    
16    #[error("error whie writing / reading packet")]
17    IoError(embedded_io_async::ErrorKind),
18
19    #[error("something wrong with the network")]
20    NetworkError(super::NetworkError),
21
22    #[error("buffer has not enaugh space left to write packet")]
23    NotEnaughBufferSpace
24}
25
26pub trait WriteMqttPacketMut {
27    fn write_mqtt_packet_sync(&mut self, packet: &Packet<'_>) -> Result<(), MqttPacketError>;
28}
29
30impl <T> WriteMqttPacketMut for T where T: BufferWriter {
31    fn write_mqtt_packet_sync(&mut self, packet: &Packet<'_>) -> Result<(), MqttPacketError> {
32        
33
34        let result = encode_slice(packet, self);
35
36        match result {
37            Ok(n) => {
38                self.commit(n).unwrap();
39                Ok(())
40            },
41            Err(mqttrs2::Error::WriteZero) => {
42                Err(MqttPacketError::NotEnaughBufferSpace)
43            }
44            Err(e) => {
45                error!("cannot write mqtt packet: {}", e);
46                Err(MqttPacketError::CodecError)
47            },
48        }
49    }
50}
51
52pub trait WriteMqttPacket {
53    fn write_mqtt_packet(&self, packet: &Packet<'_>) -> impl Future<Output = Result<(), MqttPacketError>>;
54}
55
56impl <const N: usize> WriteMqttPacket for  BufferedStream<N> {
57    async fn write_mqtt_packet(&self, packet: &Packet<'_>) -> Result<(), MqttPacketError> {
58        let mut bytes = [0; 256];
59        let n = encode_slice(packet, &mut bytes)
60            .map_err(|_| MqttPacketError::CodecError)?;
61
62        let mut to_write = &bytes[..n];
63
64        while ! to_write.is_empty() {
65            let n = self.write_async(&bytes[..n]).await 
66                .map_err(|e| MqttPacketError::IoError(e))?;
67
68            to_write = &to_write[n..];
69        }
70
71        Ok(())
72    }
73}
74
75impl <'a, const N: usize> WriteMqttPacket for ServerConnection<'a, N> {
76    fn write_mqtt_packet(&self, packet: &Packet<'_>) -> impl Future<Output = Result<(), MqttPacketError>>{
77        self.out_stream.write_mqtt_packet(packet)
78    }
79}
80
81pub trait ReadMqttPacket {
82    fn read_packet<'a>(&'a self) -> Result<Option<Packet<'a>>, MqttPacketError>;
83}
84
85impl <T> ReadMqttPacket for T where T: BufferReader + ?Sized {
86    fn read_packet<'a>(&'a self) -> Result<Option<Packet<'a>>, MqttPacketError> {
87        let nop = decode_slice_with_len(&self)
88            .map_err(|_| MqttPacketError::CodecError)?;
89
90        if let Some((n, p)) = nop {
91            self.add_bytes_read(n);
92            Ok(Some(p))
93        } else {
94            Ok(None)
95        }
96    }
97}
98