1use bytes::{Buf, Bytes};
2use camino::{Utf8Path, Utf8PathBuf};
3use thiserror::Error;
4use tokio::io::{AsyncRead, AsyncWrite};
5#[cfg(unix)]
6use tokio_serial::SerialPort;
7use tokio_serial::SerialPortBuilderExt;
8use tokio_serial::SerialStream;
9use tokio_stream::{StreamExt, StreamMap};
10use tokio_util::io::ReaderStream;
11use tracing::{error, info, warn};
12
13use std::collections::HashMap;
14use std::fs;
15use std::io::ErrorKind;
16use std::pin::Pin;
17use std::task::Poll::Ready;
18
19#[cfg(unix)]
20use std::os::unix;
21
22#[derive(Error, Debug)]
23pub enum Error {
24 #[error("could not create link to pty")]
25 Link(#[source] std::io::Error),
26
27 #[error("serial error")]
28 Serial(#[source] tokio_serial::Error),
29
30 #[error("stream closed")]
31 Closed,
32
33 #[error("read error")]
34 Read(#[source] std::io::Error),
35
36 #[error("write error")]
37 Write(#[source] std::io::Error),
38}
39
40pub struct PtyLink {
41 _subordinate: SerialStream,
46 link: Utf8PathBuf,
47}
48
49pub type Result<T> = std::result::Result<T, Error>;
50
51#[cfg(unix)]
52pub fn create_virtual_serial_port<P>(path: P) -> Result<(SerialStream, PtyLink)>
53where
54 P: AsRef<Utf8Path>,
55{
56 let (manager, subordinate) = SerialStream::pair().map_err(Error::Serial)?;
57 let link = PtyLink::new(subordinate, path)?;
58
59 Ok((manager, link))
60}
61
62pub fn open_physical_serial_port<P>(path: P, baud_rate: u32) -> Result<SerialStream>
63where
64 P: AsRef<Utf8Path>,
65{
66 tokio_serial::new(path.as_ref().as_str(), baud_rate)
67 .open_native_async()
68 .map_err(Error::Serial)
69}
70
71#[cfg(unix)]
72impl PtyLink {
73 fn new<P: AsRef<Utf8Path>>(subordinate: SerialStream, path: P) -> Result<Self> {
74 let link = path.as_ref().to_path_buf();
75 unix::fs::symlink(subordinate.name().unwrap(), link.as_std_path()).map_err(Error::Link)?;
76
77 Ok(PtyLink {
78 _subordinate: subordinate,
79 link,
80 })
81 }
82
83 pub fn link(&self) -> &Utf8Path {
84 self.link.as_path()
85 }
86
87 pub fn id(&self) -> &str {
88 self.link.as_str()
89 }
90}
91
92impl Drop for PtyLink {
93 fn drop(&mut self) {
94 if fs::remove_file(&self.link).is_err() {
95 eprintln!("error: could not delete {}", self.link);
96 }
97 }
98}
99
100#[tracing::instrument(skip_all)]
101pub async fn transfer<R, W>(
102 mut sources: StreamMap<String, ReaderStream<R>>,
103 mut sinks: HashMap<String, W>,
104 routes: HashMap<String, Vec<String>>,
105) -> Result<()>
106where
107 R: AsyncRead + Unpin,
108 W: AsyncWrite + Unpin,
109{
110 while let Some((src_id, result)) = sources.next().await {
111 if let Some(dst_ids) = routes.get(&src_id) {
112 let bytes = result.map_err(Error::Read)?;
113 info!(?src_id, ?dst_ids, ?bytes, "read");
114 for dst_id in dst_ids {
115 if let Some(dst) = sinks.get_mut(dst_id) {
116 let mut buf = bytes.clone();
117 if let Err(e) = write_non_blocking(dst, &mut buf).await {
118 if let Error::Write(io_err) = &e {
119 if io_err.kind() == ErrorKind::WouldBlock {
120 warn!(?dst_id, ?bytes, "discarded");
121 } else {
122 error!(?dst_id, ?e, "write error");
123 }
124 }
125 } else {
126 info!(?dst_id, ?bytes, "wrote");
127 }
128 }
129 }
130 }
131 }
132
133 Ok(())
134}
135
136async fn write_non_blocking<W: AsyncWrite + Unpin>(dst: &mut W, buf: &mut Bytes) -> Result<()> {
137 let waker = futures::task::noop_waker();
138 let mut cx = futures::task::Context::from_waker(&waker);
139
140 let pinned_dst = Pin::new(dst);
141 match pinned_dst.poll_write(&mut cx, buf) {
142 Ready(Ok(bytes_written)) => {
143 buf.advance(bytes_written);
144 Ok(())
145 }
146 Ready(Err(e)) => Err(Error::Write(e)),
147 _ => Err(Error::Write(std::io::Error::new(
148 ErrorKind::WouldBlock,
149 "Would block",
150 ))),
151 }
152}