1use crate::{
2 error::Error,
3 socket::{Socket, SocketFamily, SocketProtocol, SocketType, SplitSocketHandle},
4 CancellationToken, LteLink,
5};
6use core::net::SocketAddr;
7
8pub struct TcpStream {
10 inner: Socket,
11}
12
13macro_rules! impl_receive {
14 () => {
15 pub async fn receive<'buf>(&self, buf: &'buf mut [u8]) -> Result<&'buf mut [u8], Error> {
18 self.receive_with_cancellation(buf, &Default::default())
19 .await
20 }
21
22 pub async fn receive_with_cancellation<'buf>(
25 &self,
26 buf: &'buf mut [u8],
27 token: &CancellationToken,
28 ) -> Result<&'buf mut [u8], Error> {
29 let max_receive_len = 1024.min(buf.len());
30 let received_bytes = self
31 .socket()
32 .receive(&mut buf[..max_receive_len], token)
33 .await?;
34 Ok(&mut buf[..received_bytes])
35 }
36
37 pub async fn receive_exact<'buf>(
43 &self,
44 buf: &'buf mut [u8],
45 ) -> Result<(), (Error, &'buf mut [u8])> {
46 self.receive_exact_with_cancellation(buf, &Default::default())
47 .await
48 }
49
50 pub async fn receive_exact_with_cancellation<'buf>(
56 &self,
57 buf: &'buf mut [u8],
58 token: &CancellationToken,
59 ) -> Result<(), (Error, &'buf mut [u8])> {
60 let mut received_bytes = 0;
61
62 while received_bytes < buf.len() {
63 match self
64 .receive_with_cancellation(&mut buf[received_bytes..], token)
65 .await
66 {
67 Ok(received_data) => received_bytes += received_data.len(),
68 Err(e) => return Err((e.into(), &mut buf[..received_bytes])),
69 }
70 }
71
72 Ok(())
73 }
74 };
75}
76
77macro_rules! impl_write {
78 () => {
79 pub async fn write(&self, buf: &[u8]) -> Result<(), Error> {
81 self.write_with_cancellation(buf, &Default::default()).await
82 }
83
84 pub async fn write_with_cancellation(
86 &self,
87 buf: &[u8],
88 token: &CancellationToken,
89 ) -> Result<(), Error> {
90 let mut written_bytes = 0;
91
92 while written_bytes < buf.len() {
93 let max_write_len = 1024.min(buf.len() - written_bytes);
95 written_bytes += self
96 .socket()
97 .write(&buf[written_bytes..][..max_write_len], token)
98 .await?;
99 }
100
101 Ok(())
102 }
103 };
104}
105
106impl TcpStream {
107 pub async fn connect(addr: SocketAddr) -> Result<Self, Error> {
109 Self::connect_with_cancellation(addr, &Default::default()).await
110 }
111
112 pub async fn connect_with_cancellation(
114 addr: SocketAddr,
115 token: &CancellationToken,
116 ) -> Result<Self, Error> {
117 let lte_link = LteLink::new().await?;
118
119 token.as_result()?;
120
121 let family = match addr {
122 SocketAddr::V4(_) => SocketFamily::Ipv4,
123 SocketAddr::V6(_) => SocketFamily::Ipv6,
124 };
125
126 let socket = Socket::create(family, SocketType::Stream, SocketProtocol::Tcp).await?;
127
128 match unsafe { socket.connect(addr, token).await } {
129 Ok(_) => {
130 lte_link.deactivate().await?;
131 Ok(TcpStream { inner: socket })
132 }
133 Err(e) => {
134 lte_link.deactivate().await?;
135 socket.deactivate().await?;
136 Err(e)
137 }
138 }
139 }
140
141 pub fn as_raw_fd(&self) -> i32 {
143 self.inner.as_raw_fd()
144 }
145
146 fn socket(&self) -> &Socket {
147 &self.inner
148 }
149
150 pub async fn split_owned(self) -> Result<(OwnedTcpReadStream, OwnedTcpWriteStream), Error> {
152 let (read_split, write_split) = self.inner.split().await?;
153
154 Ok((
155 OwnedTcpReadStream { stream: read_split },
156 OwnedTcpWriteStream {
157 stream: write_split,
158 },
159 ))
160 }
161
162 pub fn split(&self) -> (TcpReadStream<'_>, TcpWriteStream<'_>) {
164 (
165 TcpReadStream { stream: self },
166 TcpWriteStream { stream: self },
167 )
168 }
169
170 impl_receive!();
171 impl_write!();
172
173 pub async fn deactivate(self) -> Result<(), Error> {
176 self.inner.deactivate().await?;
177 Ok(())
178 }
179}
180
181crate::embedded_io_macros::impl_error_trait!(TcpStream, Error, <>);
182crate::embedded_io_macros::impl_read_trait!(TcpStream, <>);
183crate::embedded_io_macros::impl_write_trait!(TcpStream, <>);
184
185pub struct TcpReadStream<'a> {
187 stream: &'a TcpStream,
188}
189
190impl TcpReadStream<'_> {
191 fn socket(&self) -> &Socket {
192 &self.stream.inner
193 }
194
195 impl_receive!();
196}
197
198crate::embedded_io_macros::impl_error_trait!(TcpReadStream<'a>, Error, <'a>);
199crate::embedded_io_macros::impl_read_trait!(TcpReadStream<'a>, <'a>);
200
201pub struct TcpWriteStream<'a> {
203 stream: &'a TcpStream,
204}
205
206impl TcpWriteStream<'_> {
207 fn socket(&self) -> &Socket {
208 &self.stream.inner
209 }
210
211 impl_write!();
212}
213
214crate::embedded_io_macros::impl_error_trait!(TcpWriteStream<'a>, Error, <'a>);
215crate::embedded_io_macros::impl_write_trait!(TcpWriteStream<'a>, <'a>);
216
217pub struct OwnedTcpReadStream {
219 stream: SplitSocketHandle,
220}
221
222impl OwnedTcpReadStream {
223 fn socket(&self) -> &Socket {
224 &self.stream
225 }
226
227 impl_receive!();
228
229 pub async fn deactivate(self) -> Result<(), Error> {
232 self.stream.deactivate().await?;
233 Ok(())
234 }
235}
236
237crate::embedded_io_macros::impl_error_trait!(OwnedTcpReadStream, Error, <>);
238crate::embedded_io_macros::impl_read_trait!(OwnedTcpReadStream, <>);
239
240pub struct OwnedTcpWriteStream {
242 stream: SplitSocketHandle,
243}
244
245impl OwnedTcpWriteStream {
246 fn socket(&self) -> &Socket {
247 &self.stream
248 }
249
250 impl_write!();
251
252 pub async fn deactivate(self) -> Result<(), Error> {
255 self.stream.deactivate().await?;
256 Ok(())
257 }
258}
259
260crate::embedded_io_macros::impl_error_trait!(OwnedTcpWriteStream, Error, <>);
261crate::embedded_io_macros::impl_write_trait!(OwnedTcpWriteStream, <>);