use anyhow::Result;
use bytes::Bytes;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use crate::iroh::{IrohClientBuilder, IrohConnection};
/// Asynchronous client for a remote endpoint reached through an iroh connection.
///
/// Holds the send and receive halves of one stream together with the
/// connection that stream was opened on. Each half sits behind its own async
/// mutex inside an `Arc`, so it can be handed to spawned tasks.
pub struct Client {
/// Sending half of the stream; used by `write` when datagrams are disabled.
send: Arc<Mutex<iroh::endpoint::SendStream>>,
/// Receiving half of the stream; every read goes through it.
recv: Arc<Mutex<iroh::endpoint::RecvStream>>,
/// Connection the stream was opened on; used to send datagrams.
conn: Arc<IrohConnection>,
/// When `true`, `write` sends each payload as a datagram instead of on the stream.
use_datagrams: bool,
}
impl Client {
pub async fn connect(server_id: &str) -> Result<Self> {
Self::connect_with_options(server_id, false).await
}
pub async fn connect_with_options(server_id: &str, use_datagrams: bool) -> Result<Self> {
tracing::debug!(
"Connecting to server: {} (datagrams={})",
server_id,
use_datagrams
);
let conn = IrohClientBuilder::new()
.connect_str(server_id)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to server: {}", e))?;
tracing::debug!("Connected to server, opening stream...");
let stream = conn
.open_stream()
.await
.map_err(|e| anyhow::anyhow!("Failed to open stream: {}", e))?;
tracing::debug!("Stream opened successfully");
let (send, recv) = stream.split();
Ok(Self {
send: Arc::new(Mutex::new(send)),
recv: Arc::new(Mutex::new(recv)),
conn: Arc::new(conn),
use_datagrams,
})
}
pub async fn write(&self, data: &[u8]) -> Result<()> {
if self.use_datagrams {
self.conn.send_datagram(Bytes::copy_from_slice(data))?;
return Ok(());
}
let mut send = self.send.lock().await;
send.write_all(data).await?;
drop(send);
tokio::task::yield_now().await;
Ok(())
}
pub async fn write_str(&self, data: &str) -> Result<()> {
self.write(data.as_bytes()).await
}
pub async fn read(&self, buf: &mut [u8]) -> Result<Option<usize>> {
let mut recv = self.recv.lock().await;
Ok(recv.read(buf).await?)
}
pub async fn read_string(&self) -> Result<Option<String>> {
let mut buf = vec![0u8; 4096];
if let Some(n) = self.read(&mut buf).await? {
return Ok(Some(String::from_utf8_lossy(&buf[..n]).to_string()));
}
Ok(None)
}
pub async fn run_interactive(&self) -> Result<()> {
use std::io::{Read, Write};
let recv = self.recv.clone();
let send = self.send.clone();
tokio::spawn(async move {
let mut buf = vec![0u8; 1024];
loop {
let n = {
let mut r = recv.lock().await;
match r.read(&mut buf).await {
Ok(Some(n)) if n > 0 => {
tracing::debug!("Received {} bytes from network", n);
n
}
Ok(Some(_)) => {
tracing::debug!("Received 0 bytes (EOF)");
break;
}
Ok(None) => {
tracing::debug!("Stream closed");
break;
}
Err(e) => {
tracing::debug!("Network read error: {}", e);
break;
}
}
};
let _ = std::io::stdout().write_all(&buf[..n]);
let _ = std::io::stdout().flush();
}
});
loop {
let result = tokio::task::spawn_blocking(|| {
let mut buf = [0u8; 256];
match std::io::stdin().read(&mut buf) {
Ok(n) if n > 0 => Some(buf[..n].to_vec()),
_ => None,
}
})
.await?;
match result {
Some(data) => {
tracing::debug!(
"Sending {} bytes to network: {:?}",
data.len(),
String::from_utf8_lossy(&data)
);
let mut s = send.lock().await;
if let Err(e) = s.write_all(&data).await {
tracing::debug!("Network write error: {}", e);
break;
}
drop(s);
tokio::task::yield_now().await;
tracing::debug!("Sent successfully");
}
None => break,
}
}
Ok(())
}
}
/// Network transport used to reach the remote serial port.
#[derive(Clone)]
pub enum Transport {
    /// Connect through iroh.
    Iroh {
        /// ALPN handed to the iroh client builder; with `None` the builder's
        /// `alpn` setter is not called at all.
        alpn: Option<Vec<u8>>,
    },
    /// Connect through a Moq relay.
    Moq {
        /// Relay address passed to the Moq stream constructor.
        relay: String,
        /// Optional token.
        ///
        /// NOTE(review): presumably an auth token; `SerialPortBuilder::open`
        /// currently discards it.
        token: Option<String>,
        /// Selects the `connect_to_insecure` constructor instead of `connect_to`.
        insecure: bool,
    },
}

impl Default for Transport {
    /// Iroh with no ALPN set.
    fn default() -> Self {
        Self::Iroh { alpn: None }
    }
}
/// Collects the settings for a `RemoteSerialPort` before it is opened.
pub struct SerialPortBuilder {
/// Remote identifier handed to the transport's connect call by `open`.
port_name: String,
/// Read timeout copied into the opened port; `new` starts it at one second.
timeout: Duration,
/// Transport the connection is made over; `new` starts with the default.
transport: Transport,
/// Whether Iroh writes go out as datagrams instead of on the stream.
use_datagrams: bool,
}
impl SerialPortBuilder {
pub fn new(port: &str) -> Self {
Self {
port_name: port.to_string(),
timeout: Duration::from_secs(1),
transport: Transport::default(),
use_datagrams: false,
}
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn with_iroh(mut self) -> Self {
self.transport = Transport::Iroh { alpn: None };
self
}
pub fn alpn(mut self, alpn: &[u8]) -> Self {
if let Transport::Iroh { alpn: ref mut a } = self.transport {
*a = Some(alpn.to_vec());
}
self
}
pub fn with_moq(mut self, relay: &str) -> Self {
self.transport = Transport::Moq {
relay: relay.to_string(),
token: None,
insecure: false,
};
self
}
pub fn insecure(mut self) -> Self {
if let Transport::Moq {
insecure: ref mut i,
..
} = self.transport
{
*i = true;
}
self
}
pub fn token(mut self, token: &str) -> Self {
if let Transport::Moq {
token: ref mut t, ..
} = self.transport
{
*t = Some(token.to_string());
}
self
}
pub fn use_datagrams(mut self, enabled: bool) -> Self {
self.use_datagrams = enabled;
self
}
pub fn open(self) -> Result<RemoteSerialPort> {
let runtime = tokio::runtime::Runtime::new()?;
let use_datagrams = self.use_datagrams;
let client = match self.transport {
Transport::Iroh { alpn } => runtime.block_on(async {
let mut builder = IrohClientBuilder::new();
if let Some(alpn) = alpn {
builder = builder.alpn(&alpn);
}
let conn = builder.connect_str(&self.port_name).await?;
let stream = conn.open_stream().await?;
let conn = Arc::new(conn);
Ok::<_, anyhow::Error>(ClientInner::Iroh {
stream: Arc::new(Mutex::new(stream)),
conn,
use_datagrams,
})
})?,
Transport::Moq {
relay,
token: _,
insecure,
} => runtime.block_on(async {
let stream = if insecure {
crate::moq::MoqStream::connect_to_insecure(&relay, &self.port_name).await?
} else {
crate::moq::MoqStream::connect_to(&relay, &self.port_name).await?
};
Ok::<_, anyhow::Error>(ClientInner::Moq {
stream: Arc::new(Mutex::new(stream)),
})
})?,
};
Ok(RemoteSerialPort {
client,
runtime,
port_name: self.port_name,
timeout: self.timeout,
buffer: Vec::new(),
})
}
}
/// Transport-specific connection state held by a `RemoteSerialPort`.
enum ClientInner {
/// State for a connection made through iroh.
Iroh {
/// Stream used for reads and, when datagrams are off, for writes.
stream: Arc<Mutex<crate::iroh::IrohStream>>,
/// Connection the stream was opened on; used to send datagrams.
conn: Arc<IrohConnection>,
/// When `true`, writes are sent as datagrams on `conn` instead of on `stream`.
use_datagrams: bool,
},
/// State for a connection made through a Moq relay.
Moq {
/// Stream used for both reads and writes.
stream: Arc<Mutex<crate::moq::MoqStream>>,
},
}
/// Starts a [`SerialPortBuilder`] for `port`.
///
/// Free-function shorthand for `SerialPortBuilder::new(port)`.
pub fn new(port: &str) -> SerialPortBuilder {
    SerialPortBuilder::new(port)
}
/// Blocking, serial-port-style handle to a remote endpoint.
///
/// Owns a private Tokio runtime and drives every asynchronous transport call
/// with `block_on`, which is what makes its methods synchronous.
// Fields drop in declaration order, so `client` is released before `runtime`.
pub struct RemoteSerialPort {
/// Transport state used for the actual I/O.
client: ClientInner,
/// Runtime on which the transport futures are driven.
runtime: tokio::runtime::Runtime,
/// Identifier the port was opened with; reported by `name`.
port_name: String,
/// Upper bound on how long a single network read may wait.
timeout: Duration,
/// Bytes already received but not yet handed to the caller, e.g. what
/// `read_until` received past the delimiter.
buffer: Vec<u8>,
}
impl RemoteSerialPort {
    /// Opens `port` with the default builder settings.
    ///
    /// # Errors
    ///
    /// Propagates any failure from [`SerialPortBuilder::open`].
    pub fn open(port: &str) -> Result<Self> {
        new(port).open()
    }

    /// Identifier the port was opened with; always `Some`.
    pub fn name(&self) -> Option<String> {
        Some(self.port_name.clone())
    }

    /// Current read timeout.
    pub fn timeout(&self) -> Duration {
        self.timeout
    }

    /// Replaces the read timeout; never fails.
    pub fn set_timeout(&mut self, timeout: Duration) -> Result<()> {
        self.timeout = timeout;
        Ok(())
    }

    /// Number of bytes held in the local read-ahead buffer.
    ///
    /// Data still in flight on the network is not counted. The count
    /// saturates at `u32::MAX` instead of wrapping.
    pub fn bytes_to_read(&self) -> Result<u32> {
        Ok(u32::try_from(self.buffer.len()).unwrap_or(u32::MAX))
    }

    /// Always `0`: this type keeps no local output queue.
    pub fn bytes_to_write(&self) -> Result<u32> {
        Ok(0)
    }

    /// Discards the locally buffered input.
    pub fn clear_input(&mut self) -> Result<()> {
        self.buffer.clear();
        Ok(())
    }

    /// No-op, because there is no local output buffer.
    pub fn clear_output(&mut self) -> Result<()> {
        Ok(())
    }

    /// Discards the locally buffered input; there is no output buffer to clear.
    pub fn clear_all(&mut self) -> Result<()> {
        self.buffer.clear();
        Ok(())
    }

    /// Sends `data` over the active transport and reports `data.len()`.
    ///
    /// On Iroh the payload goes out as one datagram when datagrams are
    /// enabled, otherwise through the stream. On Moq a copy of the payload is
    /// handed to the stream.
    ///
    /// NOTE(review): the reported length is always `data.len()`; whether the
    /// Iroh stream `write` can accept fewer bytes, and whether the Moq `write`
    /// result needs checking, depends on those stream types — confirm.
    ///
    /// # Errors
    ///
    /// Propagates the datagram send error or the Iroh stream write error.
    pub fn write_bytes(&mut self, data: &[u8]) -> Result<usize> {
        self.runtime.block_on(async {
            match &self.client {
                ClientInner::Iroh {
                    stream,
                    conn,
                    use_datagrams,
                } => {
                    if *use_datagrams {
                        conn.send_datagram(Bytes::copy_from_slice(data))?;
                    } else {
                        let mut s = stream.lock().await;
                        s.write(data).await?;
                    }
                }
                ClientInner::Moq { stream } => {
                    let mut s = stream.lock().await;
                    s.write(data.to_vec());
                }
            }
            Ok::<_, anyhow::Error>(())
        })?;
        Ok(data.len())
    }

    /// Reads up to `buf.len()` bytes and returns how many were stored.
    ///
    /// Locally buffered bytes are served first, without touching the network.
    /// Otherwise one transport read is attempted, bounded by the configured
    /// timeout. A return value of `0` means the timeout elapsed or the
    /// transport reported that no more data is available.
    ///
    /// When a Moq message is larger than `buf`, the part that does not fit is
    /// kept in the local buffer and returned by later reads.
    ///
    /// # Errors
    ///
    /// Propagates the transport read error.
    pub fn read_bytes(&mut self, buf: &mut [u8]) -> Result<usize> {
        if !self.buffer.is_empty() {
            let take = buf.len().min(self.buffer.len());
            buf[..take].copy_from_slice(&self.buffer[..take]);
            self.buffer.drain(..take);
            return Ok(take);
        }

        let timeout = self.timeout;
        // Tail of an oversized Moq message. Collected in a local because
        // `self` is borrowed by the future; moved into `self.buffer` below.
        let mut overflow: Vec<u8> = Vec::new();
        let result = self.runtime.block_on(async {
            tokio::time::timeout(timeout, async {
                match &self.client {
                    ClientInner::Iroh { stream, .. } => {
                        let mut s = stream.lock().await;
                        s.read(buf).await
                    }
                    ClientInner::Moq { stream } => {
                        let mut s = stream.lock().await;
                        if let Some(data) = s.read().await? {
                            let n = std::cmp::min(data.len(), buf.len());
                            buf[..n].copy_from_slice(&data[..n]);
                            overflow.extend_from_slice(&data[n..]);
                            Ok(Some(n))
                        } else {
                            Ok(None)
                        }
                    }
                }
            })
            .await
        });
        self.buffer.append(&mut overflow);

        match result {
            Ok(Ok(Some(n))) => Ok(n),
            Ok(Ok(None)) => Ok(0),
            Ok(Err(e)) => Err(e),
            // Timed out: reported as "no data" rather than as an error.
            Err(_) => Ok(0),
        }
    }

    /// Reads until `byte` has been seen and returns everything up to and
    /// including it.
    ///
    /// Bytes received after the delimiter stay buffered for the next read. If
    /// a read returns `0` (timeout or end of data) first, the bytes gathered
    /// so far are returned without a trailing delimiter.
    ///
    /// # Errors
    ///
    /// Propagates the transport read error; bytes gathered before the error
    /// are dropped.
    pub fn read_until(&mut self, byte: u8) -> Result<Vec<u8>> {
        // Fast path: the delimiter is already in the local buffer.
        if let Some(pos) = self.buffer.iter().position(|&b| b == byte) {
            return Ok(self.buffer.drain(..=pos).collect());
        }

        let mut result = std::mem::take(&mut self.buffer);
        let mut temp = [0u8; 256];
        loop {
            let n = self.read_bytes(&mut temp)?;
            if n == 0 {
                break;
            }
            let chunk = &temp[..n];
            if let Some(pos) = chunk.iter().position(|&b| b == byte) {
                result.extend_from_slice(&chunk[..=pos]);
                // `read_bytes` may have left newer bytes in the buffer, so the
                // remainder of this chunk has to go in front of them.
                let mut pending = chunk[pos + 1..].to_vec();
                pending.append(&mut self.buffer);
                self.buffer = pending;
                break;
            }
            result.extend_from_slice(chunk);
        }
        Ok(result)
    }

    /// Reads through the next `\n` and decodes the bytes lossily as UTF-8.
    ///
    /// The newline, when one was received, is part of the returned string.
    ///
    /// # Errors
    ///
    /// Propagates the transport read error.
    pub fn read_line(&mut self) -> Result<String> {
        let bytes = self.read_until(b'\n')?;
        Ok(String::from_utf8_lossy(&bytes).into_owned())
    }
}
/// `std::io::Read` adapter over `read_bytes`.
///
/// `read_bytes` also returns `0` when its timeout elapses, so a zero-length
/// read here does not necessarily mean end of stream.
impl std::io::Read for RemoteSerialPort {
    /// Delegates to `read_bytes`, wrapping any failure in an `io::Error`.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        match self.read_bytes(buf) {
            Ok(count) => Ok(count),
            Err(err) => Err(std::io::Error::other(err)),
        }
    }
}
/// `std::io::Write` adapter over `write_bytes`.
impl std::io::Write for RemoteSerialPort {
    /// Delegates to `write_bytes`, wrapping any failure in an `io::Error`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        match self.write_bytes(buf) {
            Ok(written) => Ok(written),
            Err(err) => Err(std::io::Error::other(err)),
        }
    }

    /// No-op: nothing is buffered locally on the write side.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
/// `serialport::SerialPort` facade over the remote connection.
///
/// Line settings and modem-control signals have no counterpart here: getters
/// report fixed values and setters accept any value without doing anything.
#[cfg(feature = "serial")]
impl serialport::SerialPort for RemoteSerialPort {
    /// Identifier the port was opened with; always `Some`.
    fn name(&self) -> Option<String> {
        Some(self.port_name.clone())
    }

    /// Fixed placeholder of 1,000,000 baud.
    fn baud_rate(&self) -> serialport::Result<u32> {
        Ok(1_000_000)
    }

    /// Always eight data bits.
    fn data_bits(&self) -> serialport::Result<serialport::DataBits> {
        Ok(serialport::DataBits::Eight)
    }

    /// Always no flow control.
    fn flow_control(&self) -> serialport::Result<serialport::FlowControl> {
        Ok(serialport::FlowControl::None)
    }

    /// Always no parity.
    fn parity(&self) -> serialport::Result<serialport::Parity> {
        Ok(serialport::Parity::None)
    }

    /// Always one stop bit.
    fn stop_bits(&self) -> serialport::Result<serialport::StopBits> {
        Ok(serialport::StopBits::One)
    }

    /// Current read timeout.
    fn timeout(&self) -> Duration {
        self.timeout
    }

    /// Accepted and ignored.
    fn set_baud_rate(&mut self, _: u32) -> serialport::Result<()> {
        Ok(())
    }

    /// Accepted and ignored.
    fn set_data_bits(&mut self, _: serialport::DataBits) -> serialport::Result<()> {
        Ok(())
    }

    /// Accepted and ignored.
    fn set_flow_control(&mut self, _: serialport::FlowControl) -> serialport::Result<()> {
        Ok(())
    }

    /// Accepted and ignored.
    fn set_parity(&mut self, _: serialport::Parity) -> serialport::Result<()> {
        Ok(())
    }

    /// Accepted and ignored.
    fn set_stop_bits(&mut self, _: serialport::StopBits) -> serialport::Result<()> {
        Ok(())
    }

    /// Replaces the read timeout; never fails.
    fn set_timeout(&mut self, timeout: Duration) -> serialport::Result<()> {
        self.timeout = timeout;
        Ok(())
    }

    /// Accepted and ignored.
    fn write_request_to_send(&mut self, _: bool) -> serialport::Result<()> {
        Ok(())
    }

    /// Accepted and ignored.
    fn write_data_terminal_ready(&mut self, _: bool) -> serialport::Result<()> {
        Ok(())
    }

    /// Always reports the signal as asserted.
    fn read_clear_to_send(&mut self) -> serialport::Result<bool> {
        Ok(true)
    }

    /// Always reports the signal as asserted.
    fn read_data_set_ready(&mut self) -> serialport::Result<bool> {
        Ok(true)
    }

    /// Always reports the signal as not asserted.
    fn read_ring_indicator(&mut self) -> serialport::Result<bool> {
        Ok(false)
    }

    /// Always reports the signal as asserted.
    fn read_carrier_detect(&mut self) -> serialport::Result<bool> {
        Ok(true)
    }

    /// Number of bytes in the local read-ahead buffer, saturating at
    /// `u32::MAX` instead of wrapping.
    fn bytes_to_read(&self) -> serialport::Result<u32> {
        Ok(u32::try_from(self.buffer.len()).unwrap_or(u32::MAX))
    }

    /// Always `0`: there is no local output queue.
    fn bytes_to_write(&self) -> serialport::Result<u32> {
        Ok(0)
    }

    /// Does nothing.
    ///
    /// The trait hands out `&self` here and the read-ahead buffer is a plain
    /// `Vec`, so it cannot be emptied through this method; use the inherent
    /// `clear_input` / `clear_all` instead.
    fn clear(&self, _: serialport::ClearBuffer) -> serialport::Result<()> {
        Ok(())
    }

    /// Always fails with `Unsupported`; a remote port cannot be cloned.
    fn try_clone(&self) -> serialport::Result<Box<dyn serialport::SerialPort>> {
        Err(serialport::Error::new(
            serialport::ErrorKind::Io(std::io::ErrorKind::Unsupported),
            "Clone not supported for remote serial ports",
        ))
    }

    /// Accepted and ignored.
    fn set_break(&self) -> serialport::Result<()> {
        Ok(())
    }

    /// Accepted and ignored.
    fn clear_break(&self) -> serialport::Result<()> {
        Ok(())
    }
}