embedded_mqttc/network/
mod.rs1#![cfg_attr(not(feature = "std"), no_std)]
2
3use core::future::Future;
4
5use embytes_buffer::{Buffer, BufferReader, BufferWriter, ReadWrite};
6use embedded_io_async::{ErrorType, Read, ReadReady, Write, WriteReady};
7use thiserror::Error;
8
9
10use crate::fmt::Debug2Format;
11
12
13#[cfg(feature = "embassy")]
14pub mod embassy;
15
16#[cfg(feature = "std")]
17pub mod std;
18
19pub mod fake;
20
21pub mod mqtt;
22
23#[derive(Debug, PartialEq, Clone, Error)]
24#[cfg_attr(feature = "defmt", derive(defmt::Format))]
25pub enum NetworkError {
26
27 #[error("Could not find host")]
28 HostNotFound,
29
30 #[error("DNS Request failed")]
31 DnsFailed,
32
33 #[error("sending / retrieving from network failed")]
34 ConnectionFailed,
35
36 #[cfg(feature = "embassy")]
37 #[error("failed to connect to tcp endpoint")]
38 ConnectError(embassy_net::tcp::ConnectError),
39}
40
41#[cfg(feature = "embassy")]
42impl From<embassy_net::tcp::ConnectError> for NetworkError {
43 fn from(value: embassy_net::tcp::ConnectError) -> Self {
44 Self::ConnectError(value)
45 }
46}
47
48
49
50pub trait TryRead: ErrorType {
51 fn try_read(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<usize, Self::Error>>;
52}
53
54impl <T> TryRead for T where T: Read + ReadReady{
55 async fn try_read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
56 if self.read_ready()? {
57 self.read(buf).await
58 } else {
59 Ok(0)
60 }
61 }
62}
63
64pub trait TryWrite: ErrorType {
65 fn try_write(&mut self, buf: &[u8]) -> impl Future<Output = Result<usize, Self::Error>>;
66}
67
68impl <T> TryWrite for T where T: Write + WriteReady{
69 async fn try_write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
70 if self.write_ready()? {
71 self.write(buf).await
72 } else {
73 Ok(0)
74 }
75 }
76}
77
78pub trait NetworkConnection: Read + Write + TryWrite + TryRead + ErrorType {
79
80 fn connect(&mut self) -> impl Future<Output = Result<(), NetworkError>>;
82
83}
84
85pub trait NetwordSendReceive {
86 fn send_all(&mut self, buffer: &mut impl BufferReader) -> impl Future<Output = Result<(), NetworkError>>;
87 fn send<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> impl Future<Output = Result<usize, NetworkError>>;
88 fn try_send<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> impl Future<Output = Result<usize, NetworkError>>;
89
90 fn receive<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> impl Future<Output = Result<usize, NetworkError>>;
91 fn try_receive<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> impl Future<Output = Result<usize, NetworkError>>;
92
93}
94
95impl <C> NetwordSendReceive for C where C: NetworkConnection {
96
97 async fn send_all(&mut self, buffer: &mut impl BufferReader) -> Result<(), NetworkError> {
98 self.write_all(buffer)
99 .await.map_err(|e| {
100 error!("error sending to network: {}", Debug2Format(e));
101 NetworkError::ConnectionFailed
102 })?;
103
104 Ok(())
105 }
106
107 async fn send<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> Result<usize, NetworkError> {
109 let reader = buf.create_reader();
110 let result = self.write(&reader[..]).await
111 .map_err(|e| {
112 error!("error sending to network: {}", Debug2Format(e));
113 NetworkError::ConnectionFailed
114 });
115 match result {
116 Ok(n) => {
117 reader.add_bytes_read(n);
118 trace!("sent {} bytes to network", n);
119 Ok(n)
120 },
121 Err(e) => {
122 Err(e)
123 },
124 }
125 }
126
127 async fn try_send<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> Result<usize, NetworkError> {
128
129 let reader = buf.create_reader();
130 let result = self.try_write(&reader[..]).await
131 .map_err(|e| {
132 error!("error try_sending to network: {}", Debug2Format(e));
133 NetworkError::ConnectionFailed}
134 );
135 match result {
136 Ok(n) => {
137 reader.add_bytes_read(n);
138 trace!("sent {} bytes to network", n);
139 Ok(n)
140 },
141 Err(e) => {
142 Err(e)
143 },
144 }
145 }
146
147 async fn receive<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> Result<usize, NetworkError> {
148 if ! buf.ensure_remaining_capacity() {
149 warn!("cannot receive from network: buffer is full");
150 return Ok(0);
151 }
152
153 let mut writer = buf.create_writer();
154
155 let result = self.read(&mut writer).await
156 .map_err(|e| {
157 error!("error receive from network: {}", Debug2Format(e));
158 NetworkError::ConnectionFailed
159 });
160
161 match result {
162 Ok(n) => {
163 writer.commit(n).unwrap();
165
166 trace!("received {} bytes from network", n);
167 Ok(n)
168 },
169 Err(e) => {
170 Err(e)
171 },
172 }
173 }
174
175 async fn try_receive<T: AsMut<[u8]> + AsRef<[u8]>>(&mut self, buf: &mut Buffer<T>) -> Result<usize, NetworkError> {
176 if ! buf.ensure_remaining_capacity() {
177 warn!("cannot receive from network: buffer is full");
178 return Ok(0);
179 }
180
181 let mut writer = buf.create_writer();
182
183 let result = self.try_read(&mut writer).await
184 .map_err(|e| {
185 error!("error try_receive from network: {}", Debug2Format(e));
186 NetworkError::ConnectionFailed
187 });
188
189 match result {
190 Ok(n) => {
191 writer.commit(n).unwrap();
193
194 trace!("received {} bytes from network", n);
195 Ok(n)
196 },
197 Err(e) => {
198 Err(e)
199 },
200 }
201 }
202
203}