vsp_router/
lib.rs

1use bytes::{Buf, Bytes};
2use camino::{Utf8Path, Utf8PathBuf};
3use thiserror::Error;
4use tokio::io::{AsyncRead, AsyncWrite};
5#[cfg(unix)]
6use tokio_serial::SerialPort;
7use tokio_serial::SerialPortBuilderExt;
8use tokio_serial::SerialStream;
9use tokio_stream::{StreamExt, StreamMap};
10use tokio_util::io::ReaderStream;
11use tracing::{error, info, warn};
12
13use std::collections::HashMap;
14use std::fs;
15use std::io::ErrorKind;
16use std::pin::Pin;
17use std::task::Poll::Ready;
18
19#[cfg(unix)]
20use std::os::unix;
21
22#[derive(Error, Debug)]
23pub enum Error {
24    #[error("could not create link to pty")]
25    Link(#[source] std::io::Error),
26
27    #[error("serial error")]
28    Serial(#[source] tokio_serial::Error),
29
30    #[error("stream closed")]
31    Closed,
32
33    #[error("read error")]
34    Read(#[source] std::io::Error),
35
36    #[error("write error")]
37    Write(#[source] std::io::Error),
38}
39
40pub struct PtyLink {
41    // Not used directly but need to keep around to prevent early close of the file descriptor.
42    //
43    // tokio_serial::SerialStream includes a mio_serial::SerialStream which includes a
44    // serialport::TTY which includes a Drop impl that closes the file descriptor.
45    _subordinate: SerialStream,
46    link: Utf8PathBuf,
47}
48
49pub type Result<T> = std::result::Result<T, Error>;
50
51#[cfg(unix)]
52pub fn create_virtual_serial_port<P>(path: P) -> Result<(SerialStream, PtyLink)>
53where
54    P: AsRef<Utf8Path>,
55{
56    let (manager, subordinate) = SerialStream::pair().map_err(Error::Serial)?;
57    let link = PtyLink::new(subordinate, path)?;
58
59    Ok((manager, link))
60}
61
62pub fn open_physical_serial_port<P>(path: P, baud_rate: u32) -> Result<SerialStream>
63where
64    P: AsRef<Utf8Path>,
65{
66    tokio_serial::new(path.as_ref().as_str(), baud_rate)
67        .open_native_async()
68        .map_err(Error::Serial)
69}
70
71#[cfg(unix)]
72impl PtyLink {
73    fn new<P: AsRef<Utf8Path>>(subordinate: SerialStream, path: P) -> Result<Self> {
74        let link = path.as_ref().to_path_buf();
75        unix::fs::symlink(subordinate.name().unwrap(), link.as_std_path()).map_err(Error::Link)?;
76
77        Ok(PtyLink {
78            _subordinate: subordinate,
79            link,
80        })
81    }
82
83    pub fn link(&self) -> &Utf8Path {
84        self.link.as_path()
85    }
86
87    pub fn id(&self) -> &str {
88        self.link.as_str()
89    }
90}
91
92impl Drop for PtyLink {
93    fn drop(&mut self) {
94        if fs::remove_file(&self.link).is_err() {
95            eprintln!("error: could not delete {}", self.link);
96        }
97    }
98}
99
100#[tracing::instrument(skip_all)]
101pub async fn transfer<R, W>(
102    mut sources: StreamMap<String, ReaderStream<R>>,
103    mut sinks: HashMap<String, W>,
104    routes: HashMap<String, Vec<String>>,
105) -> Result<()>
106where
107    R: AsyncRead + Unpin,
108    W: AsyncWrite + Unpin,
109{
110    while let Some((src_id, result)) = sources.next().await {
111        if let Some(dst_ids) = routes.get(&src_id) {
112            let bytes = result.map_err(Error::Read)?;
113            info!(?src_id, ?dst_ids, ?bytes, "read");
114            for dst_id in dst_ids {
115                if let Some(dst) = sinks.get_mut(dst_id) {
116                    let mut buf = bytes.clone();
117                    if let Err(e) = write_non_blocking(dst, &mut buf).await {
118                        if let Error::Write(io_err) = &e {
119                            if io_err.kind() == ErrorKind::WouldBlock {
120                                warn!(?dst_id, ?bytes, "discarded");
121                            } else {
122                                error!(?dst_id, ?e, "write error");
123                            }
124                        }
125                    } else {
126                        info!(?dst_id, ?bytes, "wrote");
127                    }
128                }
129            }
130        }
131    }
132
133    Ok(())
134}
135
136async fn write_non_blocking<W: AsyncWrite + Unpin>(dst: &mut W, buf: &mut Bytes) -> Result<()> {
137    let waker = futures::task::noop_waker();
138    let mut cx = futures::task::Context::from_waker(&waker);
139
140    let pinned_dst = Pin::new(dst);
141    match pinned_dst.poll_write(&mut cx, buf) {
142        Ready(Ok(bytes_written)) => {
143            buf.advance(bytes_written);
144            Ok(())
145        }
146        Ready(Err(e)) => Err(Error::Write(e)),
147        _ => Err(Error::Write(std::io::Error::new(
148            ErrorKind::WouldBlock,
149            "Would block",
150        ))),
151    }
152}