embedded_mqttc/network/
mqtt.rs1use core::future::Future;
2
3use embytes_buffer::{BufferReader, BufferWriter};
4use mqttrs2::{decode_slice_with_len, encode_slice, Packet};
5use thiserror::Error;
6
7use super::fake::{BufferedStream, ServerConnection};
8
9#[derive(Debug, PartialEq, Clone, Error)]
10#[cfg_attr(feature = "defmt", derive(defmt::Format))]
11pub enum MqttPacketError {
12
13 #[error("error whie encoding / decoding packet")]
14 CodecError,
15
16 #[error("error whie writing / reading packet")]
17 IoError(embedded_io_async::ErrorKind),
18
19 #[error("something wrong with the network")]
20 NetworkError(super::NetworkError),
21
22 #[error("buffer has not enaugh space left to write packet")]
23 NotEnaughBufferSpace
24}
25
26pub trait WriteMqttPacketMut {
27 fn write_mqtt_packet_sync(&mut self, packet: &Packet<'_>) -> Result<(), MqttPacketError>;
28}
29
30impl <T> WriteMqttPacketMut for T where T: BufferWriter {
31 fn write_mqtt_packet_sync(&mut self, packet: &Packet<'_>) -> Result<(), MqttPacketError> {
32
33
34 let result = encode_slice(packet, self);
35
36 match result {
37 Ok(n) => {
38 self.commit(n).unwrap();
39 Ok(())
40 },
41 Err(mqttrs2::Error::WriteZero) => {
42 Err(MqttPacketError::NotEnaughBufferSpace)
43 }
44 Err(e) => {
45 error!("cannot write mqtt packet: {}", e);
46 Err(MqttPacketError::CodecError)
47 },
48 }
49 }
50}
51
52pub trait WriteMqttPacket {
53 fn write_mqtt_packet(&self, packet: &Packet<'_>) -> impl Future<Output = Result<(), MqttPacketError>>;
54}
55
56impl <const N: usize> WriteMqttPacket for BufferedStream<N> {
57 async fn write_mqtt_packet(&self, packet: &Packet<'_>) -> Result<(), MqttPacketError> {
58 let mut bytes = [0; 256];
59 let n = encode_slice(packet, &mut bytes)
60 .map_err(|_| MqttPacketError::CodecError)?;
61
62 let mut to_write = &bytes[..n];
63
64 while ! to_write.is_empty() {
65 let n = self.write_async(&bytes[..n]).await
66 .map_err(|e| MqttPacketError::IoError(e))?;
67
68 to_write = &to_write[n..];
69 }
70
71 Ok(())
72 }
73}
74
75impl <'a, const N: usize> WriteMqttPacket for ServerConnection<'a, N> {
76 fn write_mqtt_packet(&self, packet: &Packet<'_>) -> impl Future<Output = Result<(), MqttPacketError>>{
77 self.out_stream.write_mqtt_packet(packet)
78 }
79}
80
81pub trait ReadMqttPacket {
82 fn read_packet<'a>(&'a self) -> Result<Option<Packet<'a>>, MqttPacketError>;
83}
84
85impl <T> ReadMqttPacket for T where T: BufferReader + ?Sized {
86 fn read_packet<'a>(&'a self) -> Result<Option<Packet<'a>>, MqttPacketError> {
87 let nop = decode_slice_with_len(&self)
88 .map_err(|_| MqttPacketError::CodecError)?;
89
90 if let Some((n, p)) = nop {
91 self.add_bytes_read(n);
92 Ok(Some(p))
93 } else {
94 Ok(None)
95 }
96 }
97}
98