// mqtt_async_embedded/transport.rs

/// Field-less, zero-sized error marker.
///
/// The type carries no data; it only provides `Debug`, `Clone` and `Copy`
/// (plus `defmt::Format` when the `defmt` feature is enabled). As the name
/// suggests, it is presumably a stand-in error for code that has no richer
/// transport error to report -- confirm against its users.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct ErrorPlaceHolder;
16
/// Asynchronous byte transport used to carry MQTT traffic.
///
/// Implementors supply a way to push raw bytes out and to pull raw bytes in;
/// the trait itself imposes no framing.
pub trait MqttTransport {
    /// Error produced by a failed send or receive.
    ///
    /// Only `Debug` is required, so any diagnostic-printable type qualifies.
    type Error: core::fmt::Debug;

    /// Transmits the bytes in `buf`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the implementation fails to send.
    async fn send(&mut self, buf: &[u8]) -> Result<(), Self::Error>;

    /// Receives bytes into `buf`, returning how many bytes were written to it.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the implementation fails to receive.
    async fn recv(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
}
33
34impl TransportError for ErrorPlaceHolder {}
36
37pub trait TransportError: core::fmt::Debug {}
39
/// TCP-backed transport built on an `embassy_net` socket.
///
/// Only compiled when the `transport-smoltcp` feature is enabled.
#[cfg(feature = "transport-smoltcp")]
pub struct TcpTransport<'a> {
    /// Socket that all reads and writes go through.
    socket: embassy_net::tcp::TcpSocket<'a>,
    /// Longest time a single socket read is allowed to take before it is
    /// abandoned.
    timeout: Duration,
}
46
#[cfg(feature = "transport-smoltcp")]
impl<'a> TcpTransport<'a> {
    /// Creates a transport over `socket` whose reads are bounded by `timeout`.
    pub fn new(socket: embassy_net::tcp::TcpSocket<'a>, timeout: Duration) -> Self {
        Self { socket, timeout }
    }

    /// Performs one socket read into `buf`, racing it against the configured
    /// timeout.
    ///
    /// The nested result separates the two failure classes:
    ///
    /// * outer `Err` -- the timer fired first (`MqttError::Timeout`), or the
    ///   read completed with zero bytes, which is reported as
    ///   `ProtocolError::InvalidResponse`;
    /// * inner `Err` -- the socket itself failed (`MqttError::Transport`);
    /// * `Ok(Ok(n))` -- `n > 0` bytes were written to the front of `buf`.
    async fn read_with_timeout(
        &mut self,
        buf: &mut [u8],
    ) -> Result<Result<usize, MqttError<embassy_net::tcp::Error>>, MqttError<embassy_net::tcp::Error>>
    {
        use futures::future::{select, Either};

        // `select` only accepts `Unpin` futures, and the future produced by an
        // `async fn` such as `read` is not `Unpin`; pinning both operands on
        // the stack satisfies the bound without allocating.
        let read_fut = core::pin::pin!(self.socket.read(buf));
        let timer = core::pin::pin!(Timer::after(self.timeout));

        // Matching the raw outputs (instead of pre-wrapping them in `Ok`/`Err`)
        // keeps the match exhaustive, so no `unreachable!()` arm is needed.
        match select(read_fut, timer).await {
            Either::Left((Ok(0), _)) => {
                Err(MqttError::Protocol(super::error::ProtocolError::InvalidResponse))
            }
            Either::Left((Ok(n), _)) => Ok(Ok(n)),
            Either::Left((Err(e), _)) => Ok(Err(MqttError::Transport(e))),
            Either::Right(((), _)) => Err(MqttError::Timeout),
        }
    }
}
79
#[cfg(feature = "transport-smoltcp")]
impl MqttTransport for TcpTransport<'_> {
    type Error = MqttError<embassy_net::tcp::Error>;

    /// Writes all of `buf` to the socket.
    ///
    /// # Errors
    ///
    /// A socket write failure is wrapped in `MqttError::Transport`.
    async fn send(&mut self, buf: &[u8]) -> Result<(), Self::Error> {
        let outcome = self.socket.write_all(buf).await;
        outcome.map_err(MqttError::Transport)
    }

    /// Reads from the socket into `buf` via `read_with_timeout` and returns
    /// the number of bytes received.
    ///
    /// # Errors
    ///
    /// Every error reported by `read_with_timeout`, at either level of its
    /// nested result, is returned unchanged.
    async fn recv(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        // `?` propagates the outer error; the remaining inner `Result` already
        // has this method's return type.
        self.read_with_timeout(buf).await?
    }
}
96