embedded_mqttc/network/
mod.rs

1
2use core::future::Future;
3
4use embytes_buffer::{Buffer, BufferReader, BufferWriter, ReadWrite};
5use embedded_io_async::{ErrorType, Read, ReadReady, Write, WriteReady};
6use thiserror::Error;
7
8use crate::fmt::Debug2Format;
9
10#[cfg(feature = "embassy")]
11pub mod embassy;
12
13#[cfg(feature = "std")]
14pub mod std;
15
16pub mod fake;
17
18pub mod mqtt;
19
20#[derive(Debug, PartialEq, Clone, Error)]
21#[cfg_attr(feature = "defmt", derive(defmt::Format))]
22pub enum NetworkError {
23
24    #[error("Could not find host")]
25    HostNotFound,
26
27    #[error("DNS Request failed")]
28    DnsFailed,
29
30    #[error("sending / retrieving from network failed")]
31    ConnectionFailed,
32
33    #[cfg(feature = "embassy")]
34    #[error("failed to connect to tcp endpoint")]
35    ConnectError(embassy_net::tcp::ConnectError),
36}
37
38#[cfg(feature = "embassy")]
39impl From<embassy_net::tcp::ConnectError> for NetworkError {
40    fn from(value: embassy_net::tcp::ConnectError) -> Self {
41        Self::ConnectError(value)
42    }
43}
44
45
46
47pub trait TryRead: ErrorType {
48    fn try_read(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<usize, Self::Error>>;
49}
50
51impl <T> TryRead for T where T: Read + ReadReady{
52    async fn try_read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
53        if self.read_ready()? {
54            self.read(buf).await
55        } else {
56            Ok(0)
57        }
58    }
59}
60
61pub trait TryWrite: ErrorType {
62    fn try_write(&mut self, buf: &[u8]) -> impl Future<Output = Result<usize, Self::Error>>;
63}
64
65impl <T> TryWrite for T where T: Write + WriteReady{
66    async fn try_write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
67        if self.write_ready()? {
68            self.write(buf).await
69        } else {
70            Ok(0)
71        }
72    }
73}
74
75pub trait NetworkConnection: Read + Write + TryWrite + TryRead + ErrorType {
76
77    /// Used to establish a connection and reconnect after a connection fail
78    fn connect(&mut self) -> impl Future<Output = Result<(), NetworkError>>;
79    
80}
81
82pub trait NetwordSendReceive {
83    fn send_all(&mut self, buffer: &mut impl BufferReader) -> impl Future<Output = Result<(), NetworkError>>;
84    fn send<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> impl Future<Output = Result<usize, NetworkError>>;
85    fn try_send<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> impl Future<Output = Result<usize, NetworkError>>;
86
87    fn receive<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> impl Future<Output = Result<usize, NetworkError>>;
88    fn try_receive<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> impl Future<Output = Result<usize, NetworkError>>;
89
90}
91
92impl <C> NetwordSendReceive for C where C: NetworkConnection {
93
94    async fn send_all(&mut self, buffer: &mut impl BufferReader) -> Result<(), NetworkError> {
95        self.write_all(buffer)
96            .await.map_err(|e| {
97                error!("error sending to network: {}", Debug2Format(e));
98                NetworkError::ConnectionFailed
99            })?;
100
101        Ok(())
102    }
103
104    /// Send data from the buffer to the network and block is the network is not ready
105    async fn send<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> Result<usize, NetworkError> {
106        let reader = buf.create_reader();
107        let result = self.write(&reader[..]).await
108            .map_err(|e| {
109                error!("error sending to network: {}", Debug2Format(e));
110                NetworkError::ConnectionFailed
111            });
112        match result {
113            Ok(n) => {
114                reader.add_bytes_read(n);
115                trace!("sent {} bytes to network", n);
116                Ok(n)
117            },
118            Err(e) => {
119                Err(e)
120            },
121        }
122    }
123
124    async fn try_send<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> Result<usize, NetworkError> {
125
126        let reader = buf.create_reader();
127        let result = self.try_write(&reader[..]).await
128            .map_err(|e| {
129                error!("error try_sending to network: {}", Debug2Format(e));
130                NetworkError::ConnectionFailed}
131            );
132        match result {
133            Ok(n) => {
134                reader.add_bytes_read(n);
135                trace!("sent {} bytes to network", n);
136                Ok(n)
137            },
138            Err(e) => {
139                Err(e)
140            },
141        }
142    }
143
144    async fn receive<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> Result<usize, NetworkError> {
145        if ! buf.ensure_remaining_capacity() {
146            warn!("cannot receive from network: buffer is full");
147            return Ok(0);
148        }
149        
150        let mut writer = buf.create_writer();
151
152        let result = self.read(&mut writer).await
153            .map_err(|e| {
154                error!("error receive from network: {}", Debug2Format(e));
155                NetworkError::ConnectionFailed
156            });
157
158        match result {
159            Ok(n) => {
160                // Error is unwrapped because read() should ensure that not too many bytes are written
161                writer.commit(n).unwrap();
162
163                trace!("received {} bytes from network", n);
164                Ok(n)
165            },
166            Err(e) => {
167                Err(e)
168            },
169        }
170    }
171
172    async fn try_receive<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> Result<usize, NetworkError> {
173        if ! buf.ensure_remaining_capacity() {
174            warn!("cannot receive from network: buffer is full");
175            return Ok(0);
176        }
177        
178        let mut writer = buf.create_writer();
179
180        let result = self.try_read(&mut writer).await
181            .map_err(|e| {
182                error!("error try_receive from network: {}", Debug2Format(e));
183                NetworkError::ConnectionFailed
184            });
185
186        match result {
187            Ok(n) => {
188                // Error is unwrapped because read() should ensure that not too many bytes are written
189                writer.commit(n).unwrap();
190
191                trace!("received {} bytes from network", n);
192                Ok(n)
193            },
194            Err(e) => {
195                Err(e)
196            },
197        }
198    }
199
200}