use anyhow::{Context, Result};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use super::config::SerialConfig;
pub enum SerialEvent {
Data(Vec<u8>, Instant),
Error(String),
Disconnected,
}
pub struct SerialConnection {
write_tx: Option<Sender<Vec<u8>>>,
stop_flag: Arc<AtomicBool>,
reader_handle: Option<thread::JoinHandle<()>>,
writer_handle: Option<thread::JoinHandle<()>>,
port_name: String,
tx_count: Arc<std::sync::atomic::AtomicU64>,
pub rx_bytes: u64,
}
impl SerialConnection {
pub fn open(
port_name: &str,
config: &SerialConfig,
tx: Sender<SerialEvent>,
) -> Result<Self> {
let port = serialport::new(port_name, config.baud_rate)
.data_bits(config.data_bits)
.parity(config.parity)
.stop_bits(config.stop_bits)
.flow_control(config.flow_control)
.timeout(Duration::from_millis(100))
.open()
.with_context(|| format!("Failed to open serial port: {}", port_name))?;
let stop_flag = Arc::new(AtomicBool::new(false));
let tx_count = Arc::new(std::sync::atomic::AtomicU64::new(0));
let reader_port = port.try_clone()
.context("Failed to clone serial port for reader thread")?;
let reader_stop = stop_flag.clone();
let reader_handle = thread::Builder::new()
.name(format!("serial-reader-{}", port_name))
.spawn(move || {
Self::reader_loop(reader_port, tx, reader_stop);
})
.context("Failed to spawn serial reader thread")?;
let (write_tx, write_rx) = mpsc::channel::<Vec<u8>>();
let writer_stop = stop_flag.clone();
let writer_tx_count = tx_count.clone();
let writer_handle = thread::Builder::new()
.name(format!("serial-writer-{}", port_name))
.spawn(move || {
Self::writer_loop(port, write_rx, writer_stop, writer_tx_count);
})
.context("Failed to spawn serial writer thread")?;
Ok(Self {
write_tx: Some(write_tx),
stop_flag,
reader_handle: Some(reader_handle),
writer_handle: Some(writer_handle),
port_name: port_name.to_string(),
tx_count,
rx_bytes: 0,
})
}
fn reader_loop(
mut port: Box<dyn serialport::SerialPort>,
tx: Sender<SerialEvent>,
stop: Arc<AtomicBool>,
) {
let mut buf = [0u8; 1024];
loop {
if stop.load(Ordering::Relaxed) {
break;
}
match port.read(&mut buf) {
Ok(n) if n > 0 => {
if tx.send(SerialEvent::Data(buf[..n].to_vec(), Instant::now())).is_err() {
break; }
}
Ok(_) => {} Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => {
}
Err(ref e) if e.kind() == std::io::ErrorKind::BrokenPipe
|| e.kind() == std::io::ErrorKind::PermissionDenied =>
{
let _ = tx.send(SerialEvent::Disconnected);
break;
}
Err(e) => {
let _ = tx.send(SerialEvent::Error(e.to_string()));
break;
}
}
}
}
fn writer_loop(
mut port: Box<dyn serialport::SerialPort>,
rx: mpsc::Receiver<Vec<u8>>,
stop: Arc<AtomicBool>,
tx_count: Arc<std::sync::atomic::AtomicU64>,
) {
use std::io::Write;
while !stop.load(Ordering::Relaxed) {
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(data) => {
match port.write_all(&data) {
Ok(_) => {
let _ = port.flush();
tx_count.fetch_add(data.len() as u64, Ordering::Relaxed);
}
Err(_) => break,
}
}
Err(mpsc::RecvTimeoutError::Timeout) => continue,
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}
}
pub fn write(&self, data: &[u8]) -> Result<usize> {
let len = data.len();
if let Some(tx) = &self.write_tx {
tx.send(data.to_vec())
.map_err(|_| anyhow::anyhow!("Writer thread disconnected"))?;
}
Ok(len)
}
pub fn tx_bytes(&self) -> u64 {
self.tx_count.load(Ordering::Relaxed)
}
pub fn port_name(&self) -> &str {
&self.port_name
}
pub fn close(mut self) {
self.stop_flag.store(true, Ordering::Relaxed);
self.write_tx.take(); if let Some(handle) = self.reader_handle.take() {
let _ = handle.join();
}
if let Some(handle) = self.writer_handle.take() {
let _ = handle.join();
}
}
}
impl Drop for SerialConnection {
fn drop(&mut self) {
self.stop_flag.store(true, Ordering::Relaxed);
}
}