embedded_mqttc/network/
mod.rs1
2use core::future::Future;
3
4use embytes_buffer::{Buffer, BufferReader, BufferWriter, ReadWrite};
5use embedded_io_async::{ErrorType, Read, ReadReady, Write, WriteReady};
6use thiserror::Error;
7
8use crate::fmt::Debug2Format;
9
10#[cfg(feature = "embassy")]
11pub mod embassy;
12
13#[cfg(feature = "std")]
14pub mod std;
15
16pub mod fake;
17
18pub mod mqtt;
19
20#[derive(Debug, PartialEq, Clone, Error)]
21#[cfg_attr(feature = "defmt", derive(defmt::Format))]
22pub enum NetworkError {
23
24 #[error("Could not find host")]
25 HostNotFound,
26
27 #[error("DNS Request failed")]
28 DnsFailed,
29
30 #[error("sending / retrieving from network failed")]
31 ConnectionFailed,
32
33 #[cfg(feature = "embassy")]
34 #[error("failed to connect to tcp endpoint")]
35 ConnectError(embassy_net::tcp::ConnectError),
36}
37
/// Wraps an `embassy_net` TCP connect error in [`NetworkError::ConnectError`],
/// so that `?` can be used on connect results.
#[cfg(feature = "embassy")]
impl From<embassy_net::tcp::ConnectError> for NetworkError {
    fn from(connect_error: embassy_net::tcp::ConnectError) -> Self {
        NetworkError::ConnectError(connect_error)
    }
}
44
45
46
47pub trait TryRead: ErrorType {
48 fn try_read(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<usize, Self::Error>>;
49}
50
51impl <T> TryRead for T where T: Read + ReadReady{
52 async fn try_read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
53 if self.read_ready()? {
54 self.read(buf).await
55 } else {
56 Ok(0)
57 }
58 }
59}
60
61pub trait TryWrite: ErrorType {
62 fn try_write(&mut self, buf: &[u8]) -> impl Future<Output = Result<usize, Self::Error>>;
63}
64
65impl <T> TryWrite for T where T: Write + WriteReady{
66 async fn try_write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
67 if self.write_ready()? {
68 self.write(buf).await
69 } else {
70 Ok(0)
71 }
72 }
73}
74
75pub trait NetworkConnection: Read + Write + TryWrite + TryRead + ErrorType {
76
77 fn connect(&mut self) -> impl Future<Output = Result<(), NetworkError>>;
79
80}
81
82pub trait NetwordSendReceive {
83 fn send_all(&mut self, buffer: &mut impl BufferReader) -> impl Future<Output = Result<(), NetworkError>>;
84 fn send<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> impl Future<Output = Result<usize, NetworkError>>;
85 fn try_send<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> impl Future<Output = Result<usize, NetworkError>>;
86
87 fn receive<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> impl Future<Output = Result<usize, NetworkError>>;
88 fn try_receive<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> impl Future<Output = Result<usize, NetworkError>>;
89
90}
91
92impl <C> NetwordSendReceive for C where C: NetworkConnection {
93
94 async fn send_all(&mut self, buffer: &mut impl BufferReader) -> Result<(), NetworkError> {
95 self.write_all(buffer)
96 .await.map_err(|e| {
97 error!("error sending to network: {}", Debug2Format(e));
98 NetworkError::ConnectionFailed
99 })?;
100
101 Ok(())
102 }
103
104 async fn send<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> Result<usize, NetworkError> {
106 let reader = buf.create_reader();
107 let result = self.write(&reader[..]).await
108 .map_err(|e| {
109 error!("error sending to network: {}", Debug2Format(e));
110 NetworkError::ConnectionFailed
111 });
112 match result {
113 Ok(n) => {
114 reader.add_bytes_read(n);
115 trace!("sent {} bytes to network", n);
116 Ok(n)
117 },
118 Err(e) => {
119 Err(e)
120 },
121 }
122 }
123
124 async fn try_send<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> Result<usize, NetworkError> {
125
126 let reader = buf.create_reader();
127 let result = self.try_write(&reader[..]).await
128 .map_err(|e| {
129 error!("error try_sending to network: {}", Debug2Format(e));
130 NetworkError::ConnectionFailed}
131 );
132 match result {
133 Ok(n) => {
134 reader.add_bytes_read(n);
135 trace!("sent {} bytes to network", n);
136 Ok(n)
137 },
138 Err(e) => {
139 Err(e)
140 },
141 }
142 }
143
144 async fn receive<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> Result<usize, NetworkError> {
145 if ! buf.ensure_remaining_capacity() {
146 warn!("cannot receive from network: buffer is full");
147 return Ok(0);
148 }
149
150 let mut writer = buf.create_writer();
151
152 let result = self.read(&mut writer).await
153 .map_err(|e| {
154 error!("error receive from network: {}", Debug2Format(e));
155 NetworkError::ConnectionFailed
156 });
157
158 match result {
159 Ok(n) => {
160 writer.commit(n).unwrap();
162
163 trace!("received {} bytes from network", n);
164 Ok(n)
165 },
166 Err(e) => {
167 Err(e)
168 },
169 }
170 }
171
172 async fn try_receive<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> Result<usize, NetworkError> {
173 if ! buf.ensure_remaining_capacity() {
174 warn!("cannot receive from network: buffer is full");
175 return Ok(0);
176 }
177
178 let mut writer = buf.create_writer();
179
180 let result = self.try_read(&mut writer).await
181 .map_err(|e| {
182 error!("error try_receive from network: {}", Debug2Format(e));
183 NetworkError::ConnectionFailed
184 });
185
186 match result {
187 Ok(n) => {
188 writer.commit(n).unwrap();
190
191 trace!("received {} bytes from network", n);
192 Ok(n)
193 },
194 Err(e) => {
195 Err(e)
196 },
197 }
198 }
199
200}