1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use futures::{future::FutureExt, stream::StreamExt};
8use pin_project_lite::pin_project;
9use serialport::SerialPort;
10use tokio::io::{AsyncWriteExt, ReadBuf};
11
/// Marks a code path that could only be reached by holding a `NeverOk` value.
///
/// The `let` binding makes the compiler check that the argument really is a
/// `NeverOk`; the trailing `unreachable!` makes the expansion diverge, so the
/// macro can be used wherever an expression of any type is expected.
macro_rules! assert_never {
    ($never: expr) => {{
        let _: NeverOk = $never;
        unreachable!("NeverOk was instantiated");
    }};
}
19
/// Settings collected before a serial port is opened.
///
/// Created by the free function `new` and consumed by `open_native_async`.
pub struct SerialPortBuilder {
    /// Device path handed to `serialport::new` when the port is opened.
    path: String,
    /// Baud rate handed to `serialport::new` when the port is opened.
    baud_rate: u32,
    /// Size passed to `tokio::io::duplex` for the in-memory pipe that sits
    /// between the async stream and the blocking I/O threads.
    max_buf_size: usize,
}
29
30pub fn new<'a>(path: impl Into<std::borrow::Cow<'a, str>>, baud_rate: u32) -> SerialPortBuilder {
32 SerialPortBuilder {
33 path: path.into().into_owned(),
34 baud_rate,
35 max_buf_size: 1024,
36 }
37}
38
39impl SerialPortBuilder {
40 pub fn max_buf_size(self, max_buf_size: usize) -> Self {
42 Self {
43 max_buf_size,
44 ..self
45 }
46 }
47}
48
/// Extension trait that turns a builder into an opened async serial stream.
pub trait SerialPortBuilderExt {
    /// Consumes the builder and opens the port as a [`SerialStream`].
    ///
    /// # Errors
    ///
    /// Returns a `std::io::Error` when the port cannot be opened.
    fn open_native_async(self) -> std::io::Result<SerialStream>;
}
54
55impl SerialPortBuilderExt for SerialPortBuilder {
56 fn open_native_async(self) -> std::io::Result<SerialStream> {
57 let port = serialport::new(self.path, self.baud_rate).open()?;
58 open(port, self.max_buf_size)
59 }
60}
61
pin_project! {
    /// Async handle to a serial port whose blocking I/O runs on helper threads.
    ///
    /// Data travels through an in-memory duplex pipe; the two boxed futures
    /// carry the outcome reported by the helper threads.
    pub struct SerialStream {
        // Resolves with the result sent by the thread that runs `reader`.
        #[pin]
        read_err: Pin<Box<dyn Future<Output = Result<NeverOk, Error>> + Send>>,
        // Resolves with the result sent by the thread that runs `writer`.
        #[pin]
        write_err: Pin<Box<dyn Future<Output = Result<NeverOk, Error>> + Send>>,
        // Read half of this side of the duplex pipe; polled by `poll_read`.
        #[pin]
        reader_duplex: tokio::io::ReadHalf<tokio::io::DuplexStream>,
        // Write half of this side of the duplex pipe; polled by the
        // `AsyncWrite` methods.
        #[pin]
        writer_duplex: tokio::io::WriteHalf<tokio::io::DuplexStream>,
    }
}
81
82impl tokio::io::AsyncRead for SerialStream {
85 fn poll_read(
86 self: Pin<&mut Self>,
87 cx: &mut Context<'_>,
88 buf: &mut ReadBuf<'_>,
89 ) -> Poll<Result<(), std::io::Error>> {
90 let this = self.project();
91 match this.read_err.poll(cx) {
92 Poll::Pending => this.reader_duplex.poll_read(cx, buf),
93 Poll::Ready(res) => Poll::Ready(to_std_io(res)),
94 }
95 }
96}
97
98impl tokio::io::AsyncWrite for SerialStream {
99 fn poll_write(
100 self: Pin<&mut Self>,
101 cx: &mut Context<'_>,
102 buf: &[u8],
103 ) -> Poll<Result<usize, std::io::Error>> {
104 let this = self.project();
105 match this.write_err.poll(cx) {
106 Poll::Pending => this.writer_duplex.poll_write(cx, buf),
107 Poll::Ready(res) => Poll::Ready(to_std_io(res)),
108 }
109 }
110 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
111 let this = self.project();
112 match this.write_err.poll(cx) {
113 Poll::Pending => this.writer_duplex.poll_flush(cx),
114 Poll::Ready(res) => Poll::Ready(to_std_io(res)),
115 }
116 }
117 fn poll_shutdown(
118 self: Pin<&mut Self>,
119 cx: &mut Context<'_>,
120 ) -> Poll<Result<(), std::io::Error>> {
121 let this = self.project();
122 match this.write_err.poll(cx) {
123 Poll::Pending => this.writer_duplex.poll_shutdown(cx),
124 Poll::Ready(res) => Poll::Ready(to_std_io(res)),
125 }
126 }
127}
128
/// Type with no variants, used as the `Ok` side of the helper threads'
/// results: because no value of it can exist, those results can only be `Err`.
#[derive(Debug)]
enum NeverOk {}
133
/// Reasons a helper thread stopped; converted to `std::io::Error` by
/// `to_std_io` before reaching callers.
#[derive(thiserror::Error, Debug)]
enum Error {
    /// An I/O operation failed; built automatically from `std::io::Error` by `?`.
    #[error("IO error {0}")]
    Io(#[from] std::io::Error),
    /// The oneshot channel carrying a thread's result failed to deliver one
    /// (produced by `flatten`).
    #[error("sending thread paniced {0}")]
    OneshotRecv(tokio::sync::oneshot::error::RecvError),
    /// The stream feeding the writer thread ended (returned by `writer`).
    #[error("sending channel closed")]
    SenderClosed,
}
143
144fn reader(
146 mut port: Box<dyn SerialPort>,
147 mut tx: tokio::io::WriteHalf<tokio::io::DuplexStream>,
148) -> Result<NeverOk, Error> {
149 let mut buffer = vec![0u8; 1024];
150 loop {
151 let sz = port.read(&mut buffer)?;
152 futures::executor::block_on(tx.write_all(&buffer[..sz]))?;
153 }
154}
155
156fn writer(
158 mut port: Box<dyn SerialPort>,
159 rx: tokio::io::ReadHalf<tokio::io::DuplexStream>,
160) -> Result<NeverOk, Error> {
161 let mut rx = tokio_util::io::ReaderStream::new(rx);
162 while let Some(buf) = futures::executor::block_on(rx.next()) {
163 let buf = buf?;
164 port.write_all(&buf[..])?
165 }
166 Err(Error::SenderClosed)
167}
168
169fn open(
175 mut port: Box<dyn serialport::SerialPort>,
176 max_buf_size: usize,
177) -> std::io::Result<SerialStream> {
178 port.set_timeout(std::time::Duration::from_secs(60 * 60 * 24 * 365 * 100))?;
181
182 let write_port = port.try_clone()?;
183
184 let (for_rw_threads, duplex) = tokio::io::duplex(max_buf_size);
185 let (read_half, write_half) = tokio::io::split(for_rw_threads);
186 let (read_thread_result_tx, read_thread_result_rx) = tokio::sync::oneshot::channel();
187 std::thread::spawn(move || {
188 read_thread_result_tx
189 .send(reader(port, write_half))
190 .unwrap();
191 });
192 let (write_thread_result_tx, write_thread_result_rx) = tokio::sync::oneshot::channel();
193 std::thread::spawn(move || {
194 write_thread_result_tx
195 .send(writer(write_port, read_half))
196 .unwrap();
197 });
198
199 let (reader_duplex, writer_duplex) = tokio::io::split(duplex);
200 Ok(SerialStream {
201 read_err: Box::pin(read_thread_result_rx.map(flatten)),
202 write_err: Box::pin(write_thread_result_rx.map(flatten)),
203 reader_duplex,
204 writer_duplex,
205 })
206}
207
208fn to_std_io<T>(res: Result<NeverOk, Error>) -> std::io::Result<T> {
210 match res {
211 Ok(never) => assert_never!(never),
212 Err(e) => match e {
213 Error::Io(e) => Err(e),
214 other => Err(std::io::Error::new(
215 std::io::ErrorKind::Other,
216 format!("{other}"),
217 )),
218 },
219 }
220}
221
222fn flatten(
224 full: Result<Result<NeverOk, Error>, tokio::sync::oneshot::error::RecvError>,
225) -> Result<NeverOk, Error> {
226 match full {
227 Ok(res) => res,
228 Err(e) => Err(Error::OneshotRecv(e)),
229 }
230}