#[cfg(feature = "hardware")]
mod inner {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use futures::FutureExt;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, WriteHalf};
use tokio::sync::Mutex;
use tokio_serial::SerialPortBuilderExt;
use tracing::{debug, error, info, warn};
use crate::bus::{InboundMessage, MessageBus, OutboundMessage};
use crate::channels::types::{BaseChannelConfig, Channel};
use crate::config::SerialChannelConfig;
use crate::error::{Result, ZeptoError};
use crate::peripherals::validate_serial_path;
/// One JSON line received from the serial device.
///
/// The read loop spawned by `SerialChannel::start` deserializes every
/// non-empty, trimmed line into this struct.
#[derive(Debug, Deserialize)]
struct SerialInbound {
/// Value of the JSON `"type"` key; the read loop only forwards `"message"`.
#[serde(rename = "type")]
msg_type: String,
/// Message body. There is no serde default, so a line without this key
/// fails to deserialize.
text: String,
/// Sender identifier. Defaults to an empty string when the key is absent;
/// the read loop then substitutes `"serial-device"`.
#[serde(default)]
sender: String,
}
/// One JSON line written to the serial device by `SerialChannel::send`.
#[derive(Debug, Serialize)]
struct SerialOutbound {
/// Emitted under the JSON `"type"` key; `send` always sets it to `"response"`.
#[serde(rename = "type")]
msg_type: String,
/// Outgoing text, filled from `OutboundMessage::content` in `send`.
text: String,
}
/// Channel that exchanges newline-delimited JSON messages over a serial port.
///
/// Inbound lines are read by a background task spawned in `start` and
/// published on the message bus; outbound messages are written by `send`.
pub struct SerialChannel {
/// Serial settings: port path, baud rate and allowlist options.
config: SerialChannelConfig,
/// Name / allowlist settings derived from `config` in `new`; backs `is_allowed`.
base_config: BaseChannelConfig,
/// Bus that receives the inbound messages published by the read loop.
bus: Arc<MessageBus>,
/// Set to `true` by `start`; cleared by `stop` and when the read task exits.
running: Arc<AtomicBool>,
/// Write half of the opened port; `Some` after `start`, reset to `None` by `stop`.
writer: Option<Arc<Mutex<WriteHalf<tokio_serial::SerialStream>>>>,
/// One-shot sender used by `stop` to signal the read task to exit.
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
impl SerialChannel {
    /// Creates a stopped serial channel bound to `bus`.
    ///
    /// The allowlist settings in `config` are copied into a
    /// `BaseChannelConfig` named `"serial"`. The port is not opened here;
    /// the writer and shutdown handle stay empty until `start` is called.
    pub fn new(config: SerialChannelConfig, bus: Arc<MessageBus>) -> Self {
        Self {
            // Built first so the borrow of `config` ends before it is moved below.
            base_config: BaseChannelConfig {
                name: String::from("serial"),
                allowlist: config.allow_from.clone(),
                deny_by_default: config.deny_by_default,
            },
            config,
            bus,
            running: Arc::new(AtomicBool::new(false)),
            writer: None,
            shutdown_tx: None,
        }
    }
}
#[async_trait]
impl Channel for SerialChannel {
fn name(&self) -> &str {
"serial"
}
async fn start(&mut self) -> Result<()> {
validate_serial_path(&self.config.port).map_err(ZeptoError::Config)?;
let stream = tokio_serial::new(&self.config.port, self.config.baud_rate)
.open_native_async()
.map_err(|e| {
ZeptoError::Config(format!(
"Failed to open serial port {}: {}",
self.config.port, e
))
})?;
let (read_half, write_half) = tokio::io::split(stream);
let writer = Arc::new(Mutex::new(write_half));
self.writer = Some(writer.clone());
self.running.store(true, Ordering::SeqCst);
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
self.shutdown_tx = Some(shutdown_tx);
let bus = self.bus.clone();
let running = self.running.clone();
let channel_name = "serial".to_string();
let allow_from = self.config.allow_from.clone();
let deny_by_default = self.config.deny_by_default;
tokio::spawn(async move {
let task_result = std::panic::AssertUnwindSafe(async move {
let mut shutdown_rx = shutdown_rx;
let mut reader = BufReader::new(read_half);
loop {
let mut buf = String::new();
let read_result = tokio::select! {
result = reader.read_line(&mut buf) => result,
_ = &mut shutdown_rx => {
info!("Serial channel shutdown signal received");
break;
}
};
match read_result {
Ok(0) => {
info!("Serial channel: port closed (EOF)");
break;
}
Ok(_) => {}
Err(e) => {
error!("Serial read error: {}", e);
break;
}
}
let trimmed = buf.trim();
if trimmed.is_empty() {
continue;
}
let inbound: SerialInbound = match serde_json::from_str(trimmed) {
Ok(v) => v,
Err(e) => {
warn!(
"Serial: failed to parse inbound JSON: {} — {:?}",
e, trimmed
);
continue;
}
};
if inbound.msg_type != "message" {
debug!("Serial: ignoring non-message type '{}'", inbound.msg_type);
continue;
}
let sender = if inbound.sender.is_empty() {
"serial-device".to_string()
} else {
inbound.sender.clone()
};
let allowed = if allow_from.is_empty() {
!deny_by_default
} else {
allow_from.contains(&sender)
};
if !allowed {
warn!("Serial: message from '{}' denied by allowlist", sender);
continue;
}
let msg = InboundMessage::new(
&channel_name,
&sender,
&channel_name,
&inbound.text,
);
if let Err(e) = bus.publish_inbound(msg).await {
error!("Serial: failed to publish inbound message: {}", e);
}
}
})
.catch_unwind()
.await;
if task_result.is_err() {
error!("Serial read loop task panicked");
}
running.store(false, Ordering::SeqCst);
info!("Serial channel stopped");
});
Ok(())
}
async fn stop(&mut self) -> Result<()> {
self.running.store(false, Ordering::SeqCst);
if let Some(tx) = self.shutdown_tx.take() {
if tx.send(()).is_err() {
warn!("Serial shutdown receiver already dropped");
}
}
self.writer = None;
Ok(())
}
async fn send(&self, msg: OutboundMessage) -> Result<()> {
let writer = match &self.writer {
Some(w) => w.clone(),
None => return Err(ZeptoError::Config("Serial channel not started".to_string())),
};
let outbound = SerialOutbound {
msg_type: "response".to_string(),
text: msg.content,
};
let mut line = serde_json::to_string(&outbound)
.map_err(|e| ZeptoError::Tool(format!("Serial serialize error: {e}")))?;
line.push('\n');
let mut guard = writer.lock().await;
guard
.write_all(line.as_bytes())
.await
.map_err(|e| ZeptoError::Tool(format!("Serial write error: {e}")))?;
guard
.flush()
.await
.map_err(|e| ZeptoError::Tool(format!("Serial flush error: {e}")))?;
Ok(())
}
fn is_running(&self) -> bool {
self.running.load(Ordering::SeqCst)
}
fn is_allowed(&self, user_id: &str) -> bool {
self.base_config.is_allowed(user_id)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bus::MessageBus;
    use crate::config::SerialChannelConfig;

    /// Port path used by every test; none of the tests call `start`.
    const TEST_PORT: &str = "/dev/ttyUSB0";

    /// Wraps `config` in a channel backed by a fresh message bus.
    fn make_channel(config: SerialChannelConfig) -> SerialChannel {
        SerialChannel::new(config, Arc::new(MessageBus::new()))
    }

    /// Default configuration with only the port path filled in.
    fn port_only_config() -> SerialChannelConfig {
        SerialChannelConfig {
            port: TEST_PORT.to_string(),
            ..Default::default()
        }
    }

    #[test]
    fn test_serial_channel_name() {
        let channel = make_channel(port_only_config());
        assert_eq!(channel.name(), "serial");
    }

    #[test]
    fn test_serial_channel_not_running_initially() {
        let channel = make_channel(port_only_config());
        assert!(!channel.is_running());
    }

    #[test]
    fn test_serial_channel_allowlist() {
        let channel = make_channel(SerialChannelConfig {
            allow_from: vec!["esp32-0".to_string()],
            ..port_only_config()
        });
        assert!(channel.is_allowed("esp32-0"));
        assert!(!channel.is_allowed("esp32-1"));
    }

    #[test]
    fn test_serial_channel_deny_by_default() {
        let channel = make_channel(SerialChannelConfig {
            allow_from: Vec::new(),
            deny_by_default: true,
            ..port_only_config()
        });
        assert!(!channel.is_allowed("anyone"));
    }

    #[test]
    fn test_serial_outbound_serialization() {
        let json = serde_json::to_string(&SerialOutbound {
            msg_type: "response".to_string(),
            text: "Hi!".to_string(),
        })
        .unwrap();
        // Both key/value pairs must appear verbatim in the encoded line.
        for fragment in &["\"type\":\"response\"", "\"text\":\"Hi!\""] {
            assert!(json.contains(*fragment));
        }
    }

    #[test]
    fn test_serial_inbound_deserialization() {
        let raw = r#"{"type":"message","text":"Hello","sender":"esp32-0"}"#;
        let SerialInbound {
            msg_type,
            text,
            sender,
        } = serde_json::from_str(raw).unwrap();
        assert_eq!(msg_type, "message");
        assert_eq!(text, "Hello");
        assert_eq!(sender, "esp32-0");
    }

    #[test]
    fn test_serial_channel_running_flag_is_atomic() {
        let channel = make_channel(port_only_config());
        assert!(!channel.is_running());
        // Flip the flag on and back off; `is_running` must track each store.
        for &state in &[true, false] {
            channel.running.store(state, Ordering::SeqCst);
            assert_eq!(channel.is_running(), state);
        }
    }
}
}
#[cfg(feature = "hardware")]
pub use inner::SerialChannel;