use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::fs::OpenOptions;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::Mutex;
use tokio::sync::{broadcast, mpsc, watch};
use tokio::time::{Duration, sleep};
use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialPortType};
use crate::models::{LogMode, OutputMode};
const DEFAULT_BAUD_RATE: u32 = 115_200;
const ESP_USB_VIDS: &[u16] = &[
0x10C4, 0x1A86, 0x303A, ];
pub fn detect_esp_port() -> Result<String, String> {
if let Ok(port) = std::env::var("CSI_SERIAL_PORT") {
tracing::info!("Using CSI_SERIAL_PORT override: {port}");
return Ok(port);
}
let ports = tokio_serial::available_ports()
.map_err(|e| format!("Failed to enumerate serial ports: {e}"))?;
for port in &ports {
if let SerialPortType::UsbPort(ref info) = port.port_type {
let name_ok = port.port_name.contains("usbserial")
|| port.port_name.contains("usbmodem")
|| port.port_name.contains("ttyUSB")
|| port.port_name.contains("ttyACM");
let vid_ok = ESP_USB_VIDS.contains(&info.vid);
if name_ok || vid_ok {
let product = info
.product
.as_deref()
.map(|p| format!(", {p}"))
.unwrap_or_default();
tracing::info!(
"Auto-detected ESP port: {} (VID:{:04X} PID:{:04X}{product})",
port.port_name,
info.vid,
info.pid,
);
return Ok(port.port_name.clone());
}
}
}
for port in &ports {
if matches!(port.port_type, SerialPortType::UsbPort(_)) {
tracing::warn!(
"No known ESP port found — using first USB port: {}",
port.port_name
);
return Ok(port.port_name.clone());
}
}
let names: Vec<&str> = ports.iter().map(|p| p.port_name.as_str()).collect();
Err(format!(
"No USB serial ports detected. Available ports: [{}]",
names.join(", ")
))
}
pub async fn run_serial_task(
initial_port_path: String,
mut cmd_rx: mpsc::Receiver<String>,
csi_tx: broadcast::Sender<Vec<u8>>,
mut log_mode_rx: watch::Receiver<LogMode>,
mut output_mode_rx: watch::Receiver<OutputMode>,
mut session_file_rx: watch::Receiver<Option<String>>,
serial_connected: Arc<AtomicBool>,
collection_running: Arc<AtomicBool>,
shared_port_path: Arc<Mutex<String>>,
) {
let baud = std::env::var("CSI_BAUD_RATE")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_BAUD_RATE);
let mut port_path = initial_port_path;
const RECONNECT_DELAY: Duration = Duration::from_millis(800);
loop {
{
let mut lock = shared_port_path.lock().await;
*lock = port_path.clone();
}
let mut stream = match tokio_serial::new(&port_path, baud).open_native_async() {
Ok(s) => s,
Err(e) => {
serial_connected.store(false, Ordering::SeqCst);
collection_running.store(false, Ordering::SeqCst);
tracing::warn!("Failed to open serial port {port_path}: {e}. Retrying...");
sleep(RECONNECT_DELAY).await;
if let Ok(new_path) = detect_esp_port() {
port_path = new_path;
}
continue;
}
};
#[cfg(unix)]
{
let _ = stream.set_exclusive(false);
}
let _ = stream.write_data_terminal_ready(false);
if let Err(e) = stream.write_request_to_send(true) {
tracing::warn!("Failed to assert RTS on {port_path}: {e}");
} else {
sleep(Duration::from_millis(100)).await;
if let Err(e) = stream.write_request_to_send(false) {
tracing::warn!("Failed to deassert RTS on {port_path}: {e}");
} else {
tracing::info!("ESP32 reset on connect via RTS ({port_path})");
}
}
serial_connected.store(true, Ordering::SeqCst);
tracing::info!("Opened serial port {port_path} @ {baud} baud");
let exit = run_serial_connection(
&port_path,
stream,
&mut cmd_rx,
&csi_tx,
&mut log_mode_rx,
&mut output_mode_rx,
&mut session_file_rx,
)
.await;
serial_connected.store(false, Ordering::SeqCst);
collection_running.store(false, Ordering::SeqCst);
match exit {
ConnectionExit::Disconnected => {
tracing::warn!("ESP32 disconnected; waiting for reconnect...");
sleep(RECONNECT_DELAY).await;
if let Ok(new_path) = detect_esp_port() {
port_path = new_path;
}
}
ConnectionExit::CommandChannelClosed => {
tracing::info!("Command channel closed — shutting down serial task");
break;
}
}
}
}
enum ConnectionExit {
Disconnected,
CommandChannelClosed,
}
async fn run_serial_connection(
port_path: &str,
stream: tokio_serial::SerialStream,
cmd_rx: &mut mpsc::Receiver<String>,
csi_tx: &broadcast::Sender<Vec<u8>>,
log_mode_rx: &mut watch::Receiver<LogMode>,
output_mode_rx: &mut watch::Receiver<OutputMode>,
session_file_rx: &mut watch::Receiver<Option<String>>,
) -> ConnectionExit {
let (reader, mut writer) = tokio::io::split(stream);
let mut reader = BufReader::new(reader);
let mut buf = Vec::new();
let mut current_mode = output_mode_rx.borrow().clone();
let mut current_session_path = session_file_rx.borrow().clone();
let mut current_log_mode = log_mode_rx.borrow().clone();
let mut drop_next_serialized_chunk = matches!(current_log_mode, LogMode::Serialized);
let mut dump_file: Option<tokio::fs::File> = None;
sync_dump_file(¤t_mode, ¤t_session_path, &mut dump_file).await;
loop {
let mode_changed = output_mode_rx.has_changed().unwrap_or(false);
let session_changed = session_file_rx.has_changed().unwrap_or(false);
let log_mode_changed = log_mode_rx.has_changed().unwrap_or(false);
if mode_changed {
current_mode = output_mode_rx.borrow_and_update().clone();
}
if session_changed {
match session_file_rx.borrow_and_update().clone() {
Some(path) => current_session_path = Some(path),
None => {
dump_file = None;
current_session_path = None;
tracing::info!("Session ended — dump file closed");
}
}
}
if log_mode_changed {
current_log_mode = log_mode_rx.borrow_and_update().clone();
buf.clear();
if matches!(current_log_mode, LogMode::Serialized) {
drop_next_serialized_chunk = true;
}
}
if mode_changed || session_changed {
sync_dump_file(¤t_mode, ¤t_session_path, &mut dump_file).await;
}
let is_text_mode = matches!(current_log_mode, LogMode::Text);
let is_array_list_mode = matches!(current_log_mode, LogMode::ArrayList);
let delimiter = if matches!(current_log_mode, LogMode::Serialized) {
b'\0'
} else {
b'\n'
};
tokio::select! {
result = reader.read_until(delimiter, &mut buf) => {
match result {
Ok(0) => {
tracing::warn!("Serial port {port_path} closed (EOF)");
return ConnectionExit::Disconnected;
}
Ok(_) => {
if matches!(current_log_mode, LogMode::Serialized) && drop_next_serialized_chunk {
drop_next_serialized_chunk = false;
buf.clear();
continue;
}
if is_text_mode {
let text = String::from_utf8_lossy(&buf);
if !text.contains("csi raw data:") && buf.len() < 65536 {
continue;
}
if let Some(start) = find_subsequence(&buf, b"mac:") {
if start > 0 {
buf.drain(..start);
}
} else {
buf.clear();
continue;
}
buf.retain(|b| {
*b == b'\n' || *b == b'\r' || *b == b'\t' || (*b >= 0x20 && *b <= 0x7E)
});
} else if is_array_list_mode {
while matches!(buf.last(), Some(b'\n' | b'\r')) {
buf.pop();
}
if buf.first() != Some(&b'[') || buf.last() != Some(&b']') {
buf.clear();
continue;
}
}
if buf.last() == Some(&delimiter) {
buf.pop();
}
if is_text_mode && buf.last() == Some(&b'\r') {
buf.pop();
}
if !matches!(current_log_mode, LogMode::Serialized)
&& !buf.is_empty()
&& buf.last() != Some(&b'\n')
{
buf.push(b'\n');
}
if !buf.is_empty() {
if matches!(current_mode, OutputMode::Dump | OutputMode::Both) {
if let Some(ref mut file) = dump_file {
if matches!(current_log_mode, LogMode::Serialized) {
let len = buf.len() as u32;
if let Err(e) = file.write_all(&len.to_le_bytes()).await {
tracing::error!("Dump write error (len): {e}");
} else if let Err(e) = file.write_all(&buf).await {
tracing::error!("Dump write error (data): {e}");
}
} else if let Err(e) = file.write_all(&buf).await {
tracing::error!("Dump write error (text): {e}");
}
}
}
if matches!(current_mode, OutputMode::Stream | OutputMode::Both) {
let _ = csi_tx.send(buf.clone());
}
}
buf.clear();
}
Err(e) => {
tracing::error!("Serial read error on {port_path}: {e}");
return ConnectionExit::Disconnected;
}
}
}
cmd = cmd_rx.recv() => {
match cmd {
Some(cmd) => {
tracing::debug!("→ ESP32: {cmd}");
if matches!(current_log_mode, LogMode::Serialized) {
drop_next_serialized_chunk = true;
}
let line = format!("{cmd}\r\n");
if let Err(e) = writer.write_all(line.as_bytes()).await {
tracing::error!("Serial write error: {e}");
return ConnectionExit::Disconnected;
}
}
None => {
return ConnectionExit::CommandChannelClosed;
}
}
}
}
}
}
fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
if needle.is_empty() || haystack.len() < needle.len() {
return None;
}
haystack
.windows(needle.len())
.position(|window| window == needle)
}
async fn sync_dump_file(
mode: &OutputMode,
session_path: &Option<String>,
dump_file: &mut Option<tokio::fs::File>,
) {
match mode {
OutputMode::Dump | OutputMode::Both => {
if dump_file.is_none() {
if let Some(path) = session_path {
match OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await
{
Ok(f) => {
tracing::info!("Opened dump file: {path}");
*dump_file = Some(f);
}
Err(e) => {
tracing::error!("Failed to open dump file {path}: {e}");
}
}
}
}
}
OutputMode::Stream => {
if dump_file.take().is_some() {
tracing::info!("Switched to stream mode — dump file closed");
}
}
}
}