Skip to main content

donglora_client/
transport.rs

1//! Async transport abstractions.
2//!
3//! The [`Transport`] marker trait unifies tokio-based byte streams so the
4//! [`Session`](crate::session::Session) can drive any of them. Three
5//! concrete transports are provided:
6//!
7//! - [`SerialTransport`] — `tokio-serial` USB CDC-ACM.
8//! - [`UnixSocketTransport`] — mux daemon over `tokio::net::UnixStream`.
9//! - [`TcpTransport`] — mux daemon over `tokio::net::TcpStream`.
10//!
11//! Most callers use [`AnyTransport`] so the concrete variant is decided
12//! at `connect()` time without leaking type parameters into user code.
13
14use std::io;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use std::time::Duration;
18
19use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
20use tokio::net::TcpStream;
21#[cfg(unix)]
22use tokio::net::UnixStream;
23use tokio_serial::{SerialPortBuilderExt, SerialStream};
24
25use crate::errors::{ClientError, ClientResult};
26
27/// Marker for any async byte-stream we can carry DongLoRa Protocol frames over.
28///
29/// This is intentionally a blanket trait so the [`Session`](crate::session::Session)
30/// can accept `SerialStream`, `UnixStream`, `TcpStream`, or the
31/// type-erased [`AnyTransport`] uniformly.
32pub trait Transport: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
33impl<T: AsyncRead + AsyncWrite + Unpin + Send + 'static> Transport for T {}
34
35// ── Serial transport ───────────────────────────────────────────────
36
37/// Direct USB serial port connection (tokio-serial).
38pub struct SerialTransport {
39    inner: SerialStream,
40}
41
42impl SerialTransport {
43    /// Open a serial port. Baud rate is irrelevant for USB CDC-ACM but
44    /// `tokio-serial` still demands a value; 115200 is conventional.
45    pub fn open(path: &str) -> ClientResult<Self> {
46        let inner = tokio_serial::new(path, 115_200)
47            .open_native_async()
48            .map_err(|e| ClientError::Other(format!("failed to open serial port {path}: {e}")))?;
49        Ok(Self { inner })
50    }
51}
52
53impl AsyncRead for SerialTransport {
54    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
55        Pin::new(&mut self.inner).poll_read(cx, buf)
56    }
57}
58
59impl AsyncWrite for SerialTransport {
60    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
61        Pin::new(&mut self.inner).poll_write(cx, buf)
62    }
63
64    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
65        Pin::new(&mut self.inner).poll_flush(cx)
66    }
67
68    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
69        Pin::new(&mut self.inner).poll_shutdown(cx)
70    }
71}
72
73// ── Unix socket transport ──────────────────────────────────────────
74
75/// Mux daemon connection over a Unix domain socket.
76#[cfg(unix)]
77pub struct UnixSocketTransport {
78    inner: UnixStream,
79}
80
81#[cfg(unix)]
82impl UnixSocketTransport {
83    /// Connect to the mux daemon at `path`.
84    pub async fn connect(path: &str) -> ClientResult<Self> {
85        let inner = UnixStream::connect(path)
86            .await
87            .map_err(|e| ClientError::Other(format!("failed to connect to mux socket {path}: {e}")))?;
88        Ok(Self { inner })
89    }
90}
91
92#[cfg(unix)]
93impl AsyncRead for UnixSocketTransport {
94    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
95        Pin::new(&mut self.inner).poll_read(cx, buf)
96    }
97}
98
99#[cfg(unix)]
100impl AsyncWrite for UnixSocketTransport {
101    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
102        Pin::new(&mut self.inner).poll_write(cx, buf)
103    }
104
105    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
106        Pin::new(&mut self.inner).poll_flush(cx)
107    }
108
109    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
110        Pin::new(&mut self.inner).poll_shutdown(cx)
111    }
112}
113
114// ── TCP transport ──────────────────────────────────────────────────
115
116/// Mux daemon connection over TCP. Uses `TCP_NODELAY` because DongLoRa Protocol frames
117/// are small and latency-sensitive.
118pub struct TcpTransport {
119    inner: TcpStream,
120}
121
122impl TcpTransport {
123    /// Connect to `host:port`. A failed connect attempt returns an error
124    /// immediately; no internal retries (callers decide policy).
125    pub async fn connect(host: &str, port: u16, timeout: Duration) -> ClientResult<Self> {
126        let addr = format!("{host}:{port}");
127        let fut = TcpStream::connect(&addr);
128        let stream = tokio::time::timeout(timeout, fut)
129            .await
130            .map_err(|_| ClientError::Timeout { what: "tcp connect" })?
131            .map_err(|e| ClientError::Other(format!("failed to connect to {addr}: {e}")))?;
132        stream.set_nodelay(true).map_err(|e| ClientError::Other(format!("failed to set TCP_NODELAY: {e}")))?;
133        Ok(Self { inner: stream })
134    }
135}
136
137impl AsyncRead for TcpTransport {
138    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
139        Pin::new(&mut self.inner).poll_read(cx, buf)
140    }
141}
142
143impl AsyncWrite for TcpTransport {
144    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
145        Pin::new(&mut self.inner).poll_write(cx, buf)
146    }
147
148    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
149        Pin::new(&mut self.inner).poll_flush(cx)
150    }
151
152    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
153        Pin::new(&mut self.inner).poll_shutdown(cx)
154    }
155}
156
157// ── AnyTransport ───────────────────────────────────────────────────
158
159/// Type-erased transport returned by [`crate::connect`]. One of the three
160/// concrete variants depending on which transport the auto-discovery chain
161/// succeeded on.
162pub enum AnyTransport {
163    Serial(SerialTransport),
164    #[cfg(unix)]
165    Unix(UnixSocketTransport),
166    Tcp(TcpTransport),
167}
168
169impl AsyncRead for AnyTransport {
170    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
171        match self.get_mut() {
172            Self::Serial(t) => Pin::new(t).poll_read(cx, buf),
173            #[cfg(unix)]
174            Self::Unix(t) => Pin::new(t).poll_read(cx, buf),
175            Self::Tcp(t) => Pin::new(t).poll_read(cx, buf),
176        }
177    }
178}
179
180impl AsyncWrite for AnyTransport {
181    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
182        match self.get_mut() {
183            Self::Serial(t) => Pin::new(t).poll_write(cx, buf),
184            #[cfg(unix)]
185            Self::Unix(t) => Pin::new(t).poll_write(cx, buf),
186            Self::Tcp(t) => Pin::new(t).poll_write(cx, buf),
187        }
188    }
189
190    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
191        match self.get_mut() {
192            Self::Serial(t) => Pin::new(t).poll_flush(cx),
193            #[cfg(unix)]
194            Self::Unix(t) => Pin::new(t).poll_flush(cx),
195            Self::Tcp(t) => Pin::new(t).poll_flush(cx),
196        }
197    }
198
199    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
200        match self.get_mut() {
201            Self::Serial(t) => Pin::new(t).poll_shutdown(cx),
202            #[cfg(unix)]
203            Self::Unix(t) => Pin::new(t).poll_shutdown(cx),
204            Self::Tcp(t) => Pin::new(t).poll_shutdown(cx),
205        }
206    }
207}