use crate::{
error::Error,
socket::{Socket, SocketFamily, SocketProtocol, SocketType, SplitSocketHandle},
CancellationToken, LteLink,
};
use core::net::SocketAddr;
/// A TCP stream that wraps a single [`Socket`].
///
/// Instances are created with [`TcpStream::connect`] or
/// [`TcpStream::connect_with_cancellation`]. The stream can be used directly,
/// or divided into a read half and a write half with [`TcpStream::split`]
/// (borrowing) or [`TcpStream::split_owned`] (consuming).
pub struct TcpStream {
    /// The socket every operation of this stream is forwarded to.
    inner: Socket,
}
/// Generates the `receive*` family of inherent methods.
///
/// The macro must be invoked inside an `impl` block of a type that provides a
/// `fn socket(&self) -> &Socket` method; all generated methods read through it.
macro_rules! impl_receive {
    () => {
        /// Receives data into `buf` and returns the part of `buf` that was filled.
        ///
        /// Same as `receive_with_cancellation` with a default-constructed
        /// [`CancellationToken`].
        pub async fn receive<'buf>(&self, buf: &'buf mut [u8]) -> Result<&'buf mut [u8], Error> {
            let token = CancellationToken::default();
            self.receive_with_cancellation(buf, &token).await
        }

        /// Receives data into `buf` and returns the part of `buf` that was filled.
        ///
        /// A single call hands at most 1024 bytes of `buf` to the socket, so the
        /// returned slice is never longer than that even if `buf` is larger.
        /// `token` is forwarded to the socket's receive call.
        ///
        /// # Errors
        ///
        /// Propagates any error returned by the socket's receive call.
        pub async fn receive_with_cancellation<'buf>(
            &self,
            buf: &'buf mut [u8],
            token: &CancellationToken,
        ) -> Result<&'buf mut [u8], Error> {
            // Upper bound on the number of bytes requested from the socket per call.
            const MAX_RECEIVE_LEN: usize = 1024;

            let window_len = buf.len().min(MAX_RECEIVE_LEN);
            let window = &mut buf[..window_len];
            let filled = self
                .socket()
                .receive_with_cancellation(window, token)
                .await?;

            Ok(&mut buf[..filled])
        }

        /// Receives until `buf` is completely filled.
        ///
        /// Same as `receive_exact_with_cancellation` with a default-constructed
        /// [`CancellationToken`].
        pub async fn receive_exact<'buf>(
            &self,
            buf: &'buf mut [u8],
        ) -> Result<(), (Error, &'buf mut [u8])> {
            let token = CancellationToken::default();
            self.receive_exact_with_cancellation(buf, &token).await
        }

        /// Receives until `buf` is completely filled, using as many receive
        /// calls as necessary.
        ///
        /// An empty `buf` returns `Ok(())` without touching the socket.
        ///
        /// # Errors
        ///
        /// When a receive call fails, the error is returned together with the
        /// prefix of `buf` that had already been filled by earlier calls.
        pub async fn receive_exact_with_cancellation<'buf>(
            &self,
            buf: &'buf mut [u8],
            token: &CancellationToken,
        ) -> Result<(), (Error, &'buf mut [u8])> {
            let wanted = buf.len();
            let mut filled = 0;

            // NOTE(review): progress relies on every successful receive returning
            // at least one byte; confirm the socket reports a closed connection
            // as an error rather than as an empty read.
            while filled < wanted {
                let attempt = self
                    .receive_with_cancellation(&mut buf[filled..], token)
                    .await;

                match attempt {
                    Ok(chunk) => filled += chunk.len(),
                    Err(e) => return Err((e, &mut buf[..filled])),
                }
            }

            Ok(())
        }
    };
}
/// Generates the `write*` family of inherent methods.
///
/// The macro must be invoked inside an `impl` block of a type that provides a
/// `fn socket(&self) -> &Socket` method; all generated methods write through it.
macro_rules! impl_write {
    () => {
        /// Writes all of `buf` to the socket.
        ///
        /// Same as `write_with_cancellation` with a default-constructed
        /// [`CancellationToken`].
        pub async fn write(&self, buf: &[u8]) -> Result<(), Error> {
            let token = CancellationToken::default();
            self.write_with_cancellation(buf, &token).await
        }

        /// Writes all of `buf` to the socket, in pieces of at most 1024 bytes.
        ///
        /// Each socket write reports how many bytes it accepted; the next piece
        /// starts right after those bytes. `token` is forwarded to every socket
        /// write. An empty `buf` returns `Ok(())` without touching the socket.
        ///
        /// # Errors
        ///
        /// Propagates the first error returned by a socket write. Data written
        /// by earlier pieces is not rolled back.
        pub async fn write_with_cancellation(
            &self,
            buf: &[u8],
            token: &CancellationToken,
        ) -> Result<(), Error> {
            // Upper bound on the number of bytes handed to the socket per call.
            const MAX_WRITE_LEN: usize = 1024;

            let total = buf.len();
            let mut sent = 0;

            while sent < total {
                // `sent < total` holds here, so the piece is never empty.
                let piece_end = total.min(sent + MAX_WRITE_LEN);
                sent += self
                    .socket()
                    .write_with_cancellation(&buf[sent..piece_end], token)
                    .await?;
            }

            Ok(())
        }
    };
}
impl TcpStream {
pub async fn connect(addr: SocketAddr) -> Result<Self, Error> {
Self::connect_with_cancellation(addr, &Default::default()).await
}
pub async fn connect_with_cancellation(
addr: SocketAddr,
token: &CancellationToken,
) -> Result<Self, Error> {
let lte_link = LteLink::new().await?;
token.as_result()?;
let family = match addr {
SocketAddr::V4(_) => SocketFamily::Ipv4,
SocketAddr::V6(_) => SocketFamily::Ipv6,
};
let socket = Socket::create(family, SocketType::Stream, SocketProtocol::Tcp).await?;
match unsafe { socket.connect_with_cancellation(addr, token).await } {
Ok(_) => {
lte_link.deactivate().await?;
Ok(TcpStream { inner: socket })
}
Err(e) => {
lte_link.deactivate().await?;
socket.deactivate().await?;
Err(e)
}
}
}
pub fn as_raw_fd(&self) -> i32 {
self.inner.as_raw_fd()
}
fn socket(&self) -> &Socket {
&self.inner
}
pub async fn split_owned(self) -> Result<(OwnedTcpReadStream, OwnedTcpWriteStream), Error> {
let (read_split, write_split) = self.inner.split().await?;
Ok((
OwnedTcpReadStream { stream: read_split },
OwnedTcpWriteStream {
stream: write_split,
},
))
}
pub fn split(&self) -> (TcpReadStream<'_>, TcpWriteStream<'_>) {
(
TcpReadStream { stream: self },
TcpWriteStream { stream: self },
)
}
impl_receive!();
impl_write!();
pub async fn deactivate(self) -> Result<(), Error> {
self.inner.deactivate().await?;
Ok(())
}
}
// Macro-generated trait implementations for `TcpStream`.
// NOTE(review): presumably the `embedded_io` error/read/write traits, going by
// the macro names — confirm in `crate::embedded_io_macros`.
crate::embedded_io_macros::impl_error_trait!(TcpStream, Error, <>);
crate::embedded_io_macros::impl_read_trait!(TcpStream, <>);
crate::embedded_io_macros::impl_write_trait!(TcpStream, <>);
/// The read half of a [`TcpStream`], borrowing the stream it came from.
///
/// Created by [`TcpStream::split`].
pub struct TcpReadStream<'a> {
    /// The stream this half reads from.
    stream: &'a TcpStream,
}
impl TcpReadStream<'_> {
    /// Returns the socket of the borrowed parent stream; used by the
    /// macro-generated receive methods.
    fn socket(&self) -> &Socket {
        self.stream.socket()
    }

    impl_receive!();
}
// Macro-generated trait implementations for `TcpReadStream` (read side only).
// NOTE(review): presumably the `embedded_io` error/read traits, going by the
// macro names — confirm in `crate::embedded_io_macros`.
crate::embedded_io_macros::impl_error_trait!(TcpReadStream<'a>, Error, <'a>);
crate::embedded_io_macros::impl_read_trait!(TcpReadStream<'a>, <'a>);
/// The write half of a [`TcpStream`], borrowing the stream it came from.
///
/// Created by [`TcpStream::split`].
pub struct TcpWriteStream<'a> {
    /// The stream this half writes to.
    stream: &'a TcpStream,
}
impl TcpWriteStream<'_> {
    /// Returns the socket of the borrowed parent stream; used by the
    /// macro-generated write methods.
    fn socket(&self) -> &Socket {
        self.stream.socket()
    }

    impl_write!();
}
// Macro-generated trait implementations for `TcpWriteStream` (write side only).
// NOTE(review): presumably the `embedded_io` error/write traits, going by the
// macro names — confirm in `crate::embedded_io_macros`.
crate::embedded_io_macros::impl_error_trait!(TcpWriteStream<'a>, Error, <'a>);
crate::embedded_io_macros::impl_write_trait!(TcpWriteStream<'a>, <'a>);
/// The owned read half of a TCP stream.
///
/// Created by [`TcpStream::split_owned`]; it holds its own
/// [`SplitSocketHandle`] instead of borrowing the original stream.
pub struct OwnedTcpReadStream {
    /// Handle to the split socket this half reads from.
    stream: SplitSocketHandle,
}
impl OwnedTcpReadStream {
fn socket(&self) -> &Socket {
&self.stream
}
impl_receive!();
pub async fn deactivate(self) -> Result<(), Error> {
self.stream.deactivate().await?;
Ok(())
}
}
// Macro-generated trait implementations for `OwnedTcpReadStream` (read side only).
// NOTE(review): presumably the `embedded_io` error/read traits, going by the
// macro names — confirm in `crate::embedded_io_macros`.
crate::embedded_io_macros::impl_error_trait!(OwnedTcpReadStream, Error, <>);
crate::embedded_io_macros::impl_read_trait!(OwnedTcpReadStream, <>);
/// The owned write half of a TCP stream.
///
/// Created by [`TcpStream::split_owned`]; it holds its own
/// [`SplitSocketHandle`] instead of borrowing the original stream.
pub struct OwnedTcpWriteStream {
    /// Handle to the split socket this half writes to.
    stream: SplitSocketHandle,
}
impl OwnedTcpWriteStream {
fn socket(&self) -> &Socket {
&self.stream
}
impl_write!();
pub async fn deactivate(self) -> Result<(), Error> {
self.stream.deactivate().await?;
Ok(())
}
}
// Macro-generated trait implementations for `OwnedTcpWriteStream` (write side only).
// NOTE(review): presumably the `embedded_io` error/write traits, going by the
// macro names — confirm in `crate::embedded_io_macros`.
crate::embedded_io_macros::impl_error_trait!(OwnedTcpWriteStream, Error, <>);
crate::embedded_io_macros::impl_write_trait!(OwnedTcpWriteStream, <>);