kioto_serial/
posix.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use futures::{future::FutureExt, stream::StreamExt};
8use pin_project_lite::pin_project;
9use serialport::SerialPort;
10use tokio::io::{AsyncWriteExt, ReadBuf};
11
// Statically proves that a `NeverOk` value can never be observed: the
// expression is bound with an explicit `NeverOk` type and then matched with
// no arms, which only type-checks because `NeverOk` has no variants. The
// expansion diverges, so it can stand in for a value of any type.
macro_rules! assert_never {
    ($never:expr) => {{
        let never: NeverOk = $never;
        match never {}
    }};
}
19
/// Builder to open a serial port.
///
/// Create this by calling [new]. Open the port by calling
/// [SerialPortBuilderExt::open_native_async].
///
/// The builder is plain data, so it can be cloned and printed for
/// diagnostics before the port is opened.
#[derive(Debug, Clone)]
pub struct SerialPortBuilder {
    /// Device path handed to `serialport::new` when the port is opened.
    path: String,
    /// Baud rate handed to `serialport::new` when the port is opened.
    baud_rate: u32,
    /// Capacity of the in-memory duplex buffer placed between the
    /// [SerialStream] and its worker threads.
    max_buf_size: usize,
}
29
30/// Create a [SerialPortBuilder] from a device path and a baud rate.
31pub fn new<'a>(path: impl Into<std::borrow::Cow<'a, str>>, baud_rate: u32) -> SerialPortBuilder {
32    SerialPortBuilder {
33        path: path.into().into_owned(),
34        baud_rate,
35        max_buf_size: 1024,
36    }
37}
38
39impl SerialPortBuilder {
40    /// Set the maximum buffer size in the internal buffer.
41    pub fn max_buf_size(self, max_buf_size: usize) -> Self {
42        Self {
43            max_buf_size,
44            ..self
45        }
46    }
47}
48
/// Provides a convenience function for maximum compatibility with `tokio-serial`.
///
/// The trait exists so that a builder can be turned into a [SerialStream]
/// through a method call named like the one `tokio-serial` offers.
pub trait SerialPortBuilderExt {
    /// Open a serial port and return it as a [SerialStream].
    ///
    /// Consumes the builder.
    ///
    /// # Errors
    ///
    /// Returns a [std::io::Error] when the port cannot be opened.
    fn open_native_async(self) -> std::io::Result<SerialStream>;
}
54
55impl SerialPortBuilderExt for SerialPortBuilder {
56    fn open_native_async(self) -> std::io::Result<SerialStream> {
57        let port = serialport::new(self.path, self.baud_rate).open()?;
58        open(port, self.max_buf_size)
59    }
60}
61
pin_project! {
    /// An asynchronous implementation of a serial port.
    ///
    /// Implements both [tokio::io::AsyncRead] and [tokio::io::AsyncWrite].
    ///
    /// This could be wrapped with
    /// [`tokio_util::codec::Framed`](https://docs.rs/tokio-util/0.7.11/tokio_util/codec/struct.Framed.html),
    /// for example.
    ///
    /// Data is exchanged with the serial port through an in-memory duplex
    /// stream; each direction additionally carries a future that yields the
    /// error with which the corresponding worker finished.
    pub struct SerialStream {
        // Resolves with the error that ended the read worker. Its `Ok`
        // type is `NeverOk`, so it can only ever resolve to an error.
        #[pin]
        read_err: Pin<Box<dyn Future<Output = Result<NeverOk, Error>> + Send>>,
        // Resolves with the error that ended the write worker.
        #[pin]
        write_err: Pin<Box<dyn Future<Output = Result<NeverOk, Error>> + Send>>,
        // Half of the duplex stream that `poll_read` pulls bytes from.
        #[pin]
        reader_duplex: tokio::io::ReadHalf<tokio::io::DuplexStream>,
        // Half of the duplex stream that `poll_write`, `poll_flush` and
        // `poll_shutdown` are forwarded to.
        #[pin]
        writer_duplex: tokio::io::WriteHalf<tokio::io::DuplexStream>,
    }
}
81
82// ----------- implementation details below here -----------
83
84impl tokio::io::AsyncRead for SerialStream {
85    fn poll_read(
86        self: Pin<&mut Self>,
87        cx: &mut Context<'_>,
88        buf: &mut ReadBuf<'_>,
89    ) -> Poll<Result<(), std::io::Error>> {
90        let this = self.project();
91        match this.read_err.poll(cx) {
92            Poll::Pending => this.reader_duplex.poll_read(cx, buf),
93            Poll::Ready(res) => Poll::Ready(to_std_io(res)),
94        }
95    }
96}
97
98impl tokio::io::AsyncWrite for SerialStream {
99    fn poll_write(
100        self: Pin<&mut Self>,
101        cx: &mut Context<'_>,
102        buf: &[u8],
103    ) -> Poll<Result<usize, std::io::Error>> {
104        let this = self.project();
105        match this.write_err.poll(cx) {
106            Poll::Pending => this.writer_duplex.poll_write(cx, buf),
107            Poll::Ready(res) => Poll::Ready(to_std_io(res)),
108        }
109    }
110    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
111        let this = self.project();
112        match this.write_err.poll(cx) {
113            Poll::Pending => this.writer_duplex.poll_flush(cx),
114            Poll::Ready(res) => Poll::Ready(to_std_io(res)),
115        }
116    }
117    fn poll_shutdown(
118        self: Pin<&mut Self>,
119        cx: &mut Context<'_>,
120    ) -> Poll<Result<(), std::io::Error>> {
121        let this = self.project();
122        match this.write_err.poll(cx) {
123            Poll::Pending => this.writer_duplex.poll_shutdown(cx),
124            Poll::Ready(res) => Poll::Ready(to_std_io(res)),
125        }
126    }
127}
128
/// A zero-sized type which is never created to indicate that Ok(_) never
/// happens.
///
/// The enum has no variants, so no value of it can be constructed; a
/// `Result<NeverOk, E>` can therefore only ever hold an `Err`.
#[derive(Debug)]
enum NeverOk {}
133
134#[derive(thiserror::Error, Debug)]
135enum Error {
136    #[error("IO error {0}")]
137    Io(#[from] std::io::Error),
138    #[error("sending thread paniced {0}")]
139    OneshotRecv(tokio::sync::oneshot::error::RecvError),
140    #[error("sending channel closed")]
141    SenderClosed,
142}
143
144/// Read loop, launched on own thread. Returns only on error.
145fn reader(
146    mut port: Box<dyn SerialPort>,
147    mut tx: tokio::io::WriteHalf<tokio::io::DuplexStream>,
148) -> Result<NeverOk, Error> {
149    let mut buffer = vec![0u8; 1024];
150    loop {
151        let sz = port.read(&mut buffer)?;
152        futures::executor::block_on(tx.write_all(&buffer[..sz]))?;
153    }
154}
155
156/// Write loop, launched on own thread. Returns only on error.
157fn writer(
158    mut port: Box<dyn SerialPort>,
159    rx: tokio::io::ReadHalf<tokio::io::DuplexStream>,
160) -> Result<NeverOk, Error> {
161    let mut rx = tokio_util::io::ReaderStream::new(rx);
162    while let Some(buf) = futures::executor::block_on(rx.next()) {
163        let buf = buf?;
164        port.write_all(&buf[..])?
165    }
166    Err(Error::SenderClosed)
167}
168
169/// Opens a serial port and returns a [SerialStream] to read and write to
170/// it.
171///
172/// Reading and writing to the serial port is handled by two newly spawned
173/// threads.
174fn open(
175    mut port: Box<dyn serialport::SerialPort>,
176    max_buf_size: usize,
177) -> std::io::Result<SerialStream> {
178    // Convert port to blocking (more-or-less). Actually a 100 year timeout.
179    // See https://github.com/serialport/serialport-rs/pull/185 for full blocking.
180    port.set_timeout(std::time::Duration::from_secs(60 * 60 * 24 * 365 * 100))?;
181
182    let write_port = port.try_clone()?;
183
184    let (for_rw_threads, duplex) = tokio::io::duplex(max_buf_size);
185    let (read_half, write_half) = tokio::io::split(for_rw_threads);
186    let (read_thread_result_tx, read_thread_result_rx) = tokio::sync::oneshot::channel();
187    std::thread::spawn(move || {
188        read_thread_result_tx
189            .send(reader(port, write_half))
190            .unwrap();
191    });
192    let (write_thread_result_tx, write_thread_result_rx) = tokio::sync::oneshot::channel();
193    std::thread::spawn(move || {
194        write_thread_result_tx
195            .send(writer(write_port, read_half))
196            .unwrap();
197    });
198
199    let (reader_duplex, writer_duplex) = tokio::io::split(duplex);
200    Ok(SerialStream {
201        read_err: Box::pin(read_thread_result_rx.map(flatten)),
202        write_err: Box::pin(write_thread_result_rx.map(flatten)),
203        reader_duplex,
204        writer_duplex,
205    })
206}
207
208/// convert our Result type to Result from std::io
209fn to_std_io<T>(res: Result<NeverOk, Error>) -> std::io::Result<T> {
210    match res {
211        Ok(never) => assert_never!(never),
212        Err(e) => match e {
213            Error::Io(e) => Err(e),
214            other => Err(std::io::Error::new(
215                std::io::ErrorKind::Other,
216                format!("{other}"),
217            )),
218        },
219    }
220}
221
222/// flatten Result<Result<_>> to Result<_>
223fn flatten(
224    full: Result<Result<NeverOk, Error>, tokio::sync::oneshot::error::RecvError>,
225) -> Result<NeverOk, Error> {
226    match full {
227        Ok(res) => res,
228        Err(e) => Err(Error::OneshotRecv(e)),
229    }
230}