nrf_modem/
tcp_stream.rs

1use crate::{
2    error::Error,
3    socket::{Socket, SocketFamily, SocketProtocol, SocketType, SplitSocketHandle},
4    CancellationToken, LteLink,
5};
6use core::net::SocketAddr;
7
8/// A TCP stream that is connected to another endpoint
9pub struct TcpStream {
10    inner: Socket,
11}
12
13macro_rules! impl_receive {
14    () => {
15        /// Try fill the given buffer with the data that has been received. The written part of the
16        /// buffer is returned.
17        pub async fn receive<'buf>(&self, buf: &'buf mut [u8]) -> Result<&'buf mut [u8], Error> {
18            self.receive_with_cancellation(buf, &Default::default())
19                .await
20        }
21
22        /// Try fill the given buffer with the data that has been received. The written part of the
23        /// buffer is returned.
24        pub async fn receive_with_cancellation<'buf>(
25            &self,
26            buf: &'buf mut [u8],
27            token: &CancellationToken,
28        ) -> Result<&'buf mut [u8], Error> {
29            let max_receive_len = 1024.min(buf.len());
30            let received_bytes = self
31                .socket()
32                .receive(&mut buf[..max_receive_len], token)
33                .await?;
34            Ok(&mut buf[..received_bytes])
35        }
36
37        /// Fill the entire buffer with data that has been received. This will wait as long as necessary to fill up the
38        /// buffer.
39        ///
40        /// If there's an error while receiving, then the error is returned as well as the part of the buffer that was
41        /// partially filled with received data.
42        pub async fn receive_exact<'buf>(
43            &self,
44            buf: &'buf mut [u8],
45        ) -> Result<(), (Error, &'buf mut [u8])> {
46            self.receive_exact_with_cancellation(buf, &Default::default())
47                .await
48        }
49
50        /// Fill the entire buffer with data that has been received. This will wait as long as necessary to fill up the
51        /// buffer.
52        ///
53        /// If there's an error while receiving, then the error is returned as well as the part of the buffer that was
54        /// partially filled with received data.
55        pub async fn receive_exact_with_cancellation<'buf>(
56            &self,
57            buf: &'buf mut [u8],
58            token: &CancellationToken,
59        ) -> Result<(), (Error, &'buf mut [u8])> {
60            let mut received_bytes = 0;
61
62            while received_bytes < buf.len() {
63                match self
64                    .receive_with_cancellation(&mut buf[received_bytes..], token)
65                    .await
66                {
67                    Ok(received_data) => received_bytes += received_data.len(),
68                    Err(e) => return Err((e.into(), &mut buf[..received_bytes])),
69                }
70            }
71
72            Ok(())
73        }
74    };
75}
76
77macro_rules! impl_write {
78    () => {
79        /// Write the entire buffer to the stream
80        pub async fn write(&self, buf: &[u8]) -> Result<(), Error> {
81            self.write_with_cancellation(buf, &Default::default()).await
82        }
83
84        /// Write the entire buffer to the stream
85        pub async fn write_with_cancellation(
86            &self,
87            buf: &[u8],
88            token: &CancellationToken,
89        ) -> Result<(), Error> {
90            let mut written_bytes = 0;
91
92            while written_bytes < buf.len() {
93                // We can't write very huge chunks because then the socket can't process it all at once
94                let max_write_len = 1024.min(buf.len() - written_bytes);
95                written_bytes += self
96                    .socket()
97                    .write(&buf[written_bytes..][..max_write_len], token)
98                    .await?;
99            }
100
101            Ok(())
102        }
103    };
104}
105
106impl TcpStream {
107    /// Connect a TCP stream to the given address
108    pub async fn connect(addr: SocketAddr) -> Result<Self, Error> {
109        Self::connect_with_cancellation(addr, &Default::default()).await
110    }
111
112    /// Connect a TCP stream to the given address
113    pub async fn connect_with_cancellation(
114        addr: SocketAddr,
115        token: &CancellationToken,
116    ) -> Result<Self, Error> {
117        let lte_link = LteLink::new().await?;
118
119        token.as_result()?;
120
121        let family = match addr {
122            SocketAddr::V4(_) => SocketFamily::Ipv4,
123            SocketAddr::V6(_) => SocketFamily::Ipv6,
124        };
125
126        let socket = Socket::create(family, SocketType::Stream, SocketProtocol::Tcp).await?;
127
128        match unsafe { socket.connect(addr, token).await } {
129            Ok(_) => {
130                lte_link.deactivate().await?;
131                Ok(TcpStream { inner: socket })
132            }
133            Err(e) => {
134                lte_link.deactivate().await?;
135                socket.deactivate().await?;
136                Err(e)
137            }
138        }
139    }
140
141    /// Get the raw underlying file descriptor for when you need to interact with the nrf libraries directly
142    pub fn as_raw_fd(&self) -> i32 {
143        self.inner.as_raw_fd()
144    }
145
146    fn socket(&self) -> &Socket {
147        &self.inner
148    }
149
150    /// Split the stream into an owned read and write half
151    pub async fn split_owned(self) -> Result<(OwnedTcpReadStream, OwnedTcpWriteStream), Error> {
152        let (read_split, write_split) = self.inner.split().await?;
153
154        Ok((
155            OwnedTcpReadStream { stream: read_split },
156            OwnedTcpWriteStream {
157                stream: write_split,
158            },
159        ))
160    }
161
162    /// Split the stream into a borrowed read and write half
163    pub fn split(&self) -> (TcpReadStream<'_>, TcpWriteStream<'_>) {
164        (
165            TcpReadStream { stream: self },
166            TcpWriteStream { stream: self },
167        )
168    }
169
170    impl_receive!();
171    impl_write!();
172
173    /// Deactivates the socket and the LTE link.
174    /// A normal drop will do the same thing, but blocking.
175    pub async fn deactivate(self) -> Result<(), Error> {
176        self.inner.deactivate().await?;
177        Ok(())
178    }
179}
180
181crate::embedded_io_macros::impl_error_trait!(TcpStream, Error, <>);
182crate::embedded_io_macros::impl_read_trait!(TcpStream, <>);
183crate::embedded_io_macros::impl_write_trait!(TcpStream, <>);
184
185/// A borrowed read half of a TCP stream
186pub struct TcpReadStream<'a> {
187    stream: &'a TcpStream,
188}
189
190impl TcpReadStream<'_> {
191    fn socket(&self) -> &Socket {
192        &self.stream.inner
193    }
194
195    impl_receive!();
196}
197
198crate::embedded_io_macros::impl_error_trait!(TcpReadStream<'a>, Error, <'a>);
199crate::embedded_io_macros::impl_read_trait!(TcpReadStream<'a>, <'a>);
200
201/// A borrowed write half of a TCP stream
202pub struct TcpWriteStream<'a> {
203    stream: &'a TcpStream,
204}
205
206impl TcpWriteStream<'_> {
207    fn socket(&self) -> &Socket {
208        &self.stream.inner
209    }
210
211    impl_write!();
212}
213
214crate::embedded_io_macros::impl_error_trait!(TcpWriteStream<'a>, Error, <'a>);
215crate::embedded_io_macros::impl_write_trait!(TcpWriteStream<'a>, <'a>);
216
217/// An owned read half of a TCP stream
218pub struct OwnedTcpReadStream {
219    stream: SplitSocketHandle,
220}
221
222impl OwnedTcpReadStream {
223    fn socket(&self) -> &Socket {
224        &self.stream
225    }
226
227    impl_receive!();
228
229    /// Deactivates the socket and the LTE link.
230    /// A normal drop will do the same thing, but blocking.
231    pub async fn deactivate(self) -> Result<(), Error> {
232        self.stream.deactivate().await?;
233        Ok(())
234    }
235}
236
237crate::embedded_io_macros::impl_error_trait!(OwnedTcpReadStream, Error, <>);
238crate::embedded_io_macros::impl_read_trait!(OwnedTcpReadStream, <>);
239
240/// An owned write half of a TCP stream
241pub struct OwnedTcpWriteStream {
242    stream: SplitSocketHandle,
243}
244
245impl OwnedTcpWriteStream {
246    fn socket(&self) -> &Socket {
247        &self.stream
248    }
249
250    impl_write!();
251
252    /// Deactivates the socket and the LTE link.
253    /// A normal drop will do the same thing, but blocking.
254    pub async fn deactivate(self) -> Result<(), Error> {
255        self.stream.deactivate().await?;
256        Ok(())
257    }
258}
259
260crate::embedded_io_macros::impl_error_trait!(OwnedTcpWriteStream, Error, <>);
261crate::embedded_io_macros::impl_write_trait!(OwnedTcpWriteStream, <>);