donglora_client/
transport.rs1use std::io;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use std::time::Duration;
18
19use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
20use tokio::net::TcpStream;
21#[cfg(unix)]
22use tokio::net::UnixStream;
23use tokio_serial::{SerialPortBuilderExt, SerialStream};
24
25use crate::errors::{ClientError, ClientResult};
26
27pub trait Transport: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
33impl<T: AsyncRead + AsyncWrite + Unpin + Send + 'static> Transport for T {}
34
35pub struct SerialTransport {
39 inner: SerialStream,
40}
41
42impl SerialTransport {
43 pub fn open(path: &str) -> ClientResult<Self> {
46 let inner = tokio_serial::new(path, 115_200)
47 .open_native_async()
48 .map_err(|e| ClientError::Other(format!("failed to open serial port {path}: {e}")))?;
49 Ok(Self { inner })
50 }
51}
52
53impl AsyncRead for SerialTransport {
54 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
55 Pin::new(&mut self.inner).poll_read(cx, buf)
56 }
57}
58
59impl AsyncWrite for SerialTransport {
60 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
61 Pin::new(&mut self.inner).poll_write(cx, buf)
62 }
63
64 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
65 Pin::new(&mut self.inner).poll_flush(cx)
66 }
67
68 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
69 Pin::new(&mut self.inner).poll_shutdown(cx)
70 }
71}
72
73#[cfg(unix)]
77pub struct UnixSocketTransport {
78 inner: UnixStream,
79}
80
81#[cfg(unix)]
82impl UnixSocketTransport {
83 pub async fn connect(path: &str) -> ClientResult<Self> {
85 let inner = UnixStream::connect(path)
86 .await
87 .map_err(|e| ClientError::Other(format!("failed to connect to mux socket {path}: {e}")))?;
88 Ok(Self { inner })
89 }
90}
91
92#[cfg(unix)]
93impl AsyncRead for UnixSocketTransport {
94 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
95 Pin::new(&mut self.inner).poll_read(cx, buf)
96 }
97}
98
99#[cfg(unix)]
100impl AsyncWrite for UnixSocketTransport {
101 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
102 Pin::new(&mut self.inner).poll_write(cx, buf)
103 }
104
105 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
106 Pin::new(&mut self.inner).poll_flush(cx)
107 }
108
109 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
110 Pin::new(&mut self.inner).poll_shutdown(cx)
111 }
112}
113
114pub struct TcpTransport {
119 inner: TcpStream,
120}
121
122impl TcpTransport {
123 pub async fn connect(host: &str, port: u16, timeout: Duration) -> ClientResult<Self> {
126 let addr = format!("{host}:{port}");
127 let fut = TcpStream::connect(&addr);
128 let stream = tokio::time::timeout(timeout, fut)
129 .await
130 .map_err(|_| ClientError::Timeout { what: "tcp connect" })?
131 .map_err(|e| ClientError::Other(format!("failed to connect to {addr}: {e}")))?;
132 stream.set_nodelay(true).map_err(|e| ClientError::Other(format!("failed to set TCP_NODELAY: {e}")))?;
133 Ok(Self { inner: stream })
134 }
135}
136
137impl AsyncRead for TcpTransport {
138 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
139 Pin::new(&mut self.inner).poll_read(cx, buf)
140 }
141}
142
143impl AsyncWrite for TcpTransport {
144 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
145 Pin::new(&mut self.inner).poll_write(cx, buf)
146 }
147
148 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
149 Pin::new(&mut self.inner).poll_flush(cx)
150 }
151
152 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
153 Pin::new(&mut self.inner).poll_shutdown(cx)
154 }
155}
156
157pub enum AnyTransport {
163 Serial(SerialTransport),
164 #[cfg(unix)]
165 Unix(UnixSocketTransport),
166 Tcp(TcpTransport),
167}
168
169impl AsyncRead for AnyTransport {
170 fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
171 match self.get_mut() {
172 Self::Serial(t) => Pin::new(t).poll_read(cx, buf),
173 #[cfg(unix)]
174 Self::Unix(t) => Pin::new(t).poll_read(cx, buf),
175 Self::Tcp(t) => Pin::new(t).poll_read(cx, buf),
176 }
177 }
178}
179
180impl AsyncWrite for AnyTransport {
181 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
182 match self.get_mut() {
183 Self::Serial(t) => Pin::new(t).poll_write(cx, buf),
184 #[cfg(unix)]
185 Self::Unix(t) => Pin::new(t).poll_write(cx, buf),
186 Self::Tcp(t) => Pin::new(t).poll_write(cx, buf),
187 }
188 }
189
190 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
191 match self.get_mut() {
192 Self::Serial(t) => Pin::new(t).poll_flush(cx),
193 #[cfg(unix)]
194 Self::Unix(t) => Pin::new(t).poll_flush(cx),
195 Self::Tcp(t) => Pin::new(t).poll_flush(cx),
196 }
197 }
198
199 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
200 match self.get_mut() {
201 Self::Serial(t) => Pin::new(t).poll_shutdown(cx),
202 #[cfg(unix)]
203 Self::Unix(t) => Pin::new(t).poll_shutdown(cx),
204 Self::Tcp(t) => Pin::new(t).poll_shutdown(cx),
205 }
206 }
207}