use serialport::{new as serial_new, SerialPort};
use std::collections::VecDeque;
use std::io::{self, BufRead, Write};
use std::process::{Command, Stdio};
use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc, Arc,
};
use std::thread;
use std::time::Duration;
use chrono::Local;
use crate::app_ports::{select_most_likely_port, PortsCmd};
use crate::console_log::{open_log_file, write_to_log};
use crate::line_editor::{LineEditAction, LineEditor};
use crate::native_terminal::{NativeTerminal, TermEvent};
/// Size in bytes of the buffer the reader thread passes to each serial `read` call.
const SERIAL_READ_BUF_SIZE: usize = 4096;
/// Maximum number of completed output lines kept in `Display::line_buffer`;
/// once exceeded, the oldest line is discarded.
const MAX_OUTPUT_LINES: usize = 2000;
/// Where a local-time timestamp is inserted into received serial text.
///
/// Selected from the `--rx-timestamps` option value (`"first"` or `"eol"`).
#[derive(Clone, Debug)]
pub enum RxTimestampMode {
/// Insert `[HH:MM:SS.mmm] ` before the first character of each line that is
/// neither `\r` nor `\n`.
First,
/// Insert ` [HH:MM:SS.mmm]` immediately before each `\n`.
Eol,
}
/// Converts the raw `--rx-timestamps` option value into a [`RxTimestampMode`].
///
/// Returns `None` when the option is absent. An unrecognised value also yields
/// `None`, after a warning naming the valid values has been written to stderr.
/// Matching is exact and case-sensitive.
fn parse_rx_timestamp_mode(rx_timestamps: Option<&str>) -> Option<RxTimestampMode> {
    rx_timestamps.and_then(|requested| {
        if requested == "first" {
            Some(RxTimestampMode::First)
        } else if requested == "eol" {
            Some(RxTimestampMode::Eol)
        } else {
            eprintln!(
                "Warning: unknown --rx-timestamps value '{}', valid values are 'first' or 'eol'",
                requested
            );
            None
        }
    })
}
/// Returns a copy of `data` with local-time timestamps inserted according to
/// `rx_timestamps`.
///
/// * `None` - `data` is returned unchanged and `at_line_start` is not touched.
/// * `Some(First)` - `[HH:MM:SS.mmm] ` is inserted before the first character
///   of every line that is neither `\r` nor `\n`. `at_line_start` carries the
///   "next character begins a line" state from one call to the next, so a line
///   split across several chunks is stamped only once.
/// * `Some(Eol)` - ` [HH:MM:SS.mmm]` is inserted at the end of every line,
///   i.e. before the terminating `\n`. `at_line_start` is not used in this mode.
///
/// For CRLF-terminated lines the end-of-line stamp is placed before the `\r`.
/// Placing it between `\r` and `\n` would make a terminal return to column 0
/// and print the stamp over the beginning of the line it belongs to. This is
/// only possible when `\r` and `\n` arrive in the same `data` chunk; a `\r`
/// already returned by an earlier call cannot be moved.
fn inject_timestamps_for_stream(
    data: &str,
    rx_timestamps: &Option<RxTimestampMode>,
    at_line_start: &mut bool,
) -> String {
    let mode = match rx_timestamps {
        Some(mode) => mode,
        // Nothing to inject: skip building the scratch buffer altogether.
        None => return data.to_string(),
    };

    let mut out = String::with_capacity(data.len() + 20);
    match mode {
        RxTimestampMode::First => {
            for ch in data.chars() {
                if *at_line_start && ch != '\r' && ch != '\n' {
                    out.push_str(&Local::now().format("[%H:%M:%S%.3f] ").to_string());
                    *at_line_start = false;
                }
                if ch == '\n' {
                    *at_line_start = true;
                }
                out.push(ch);
            }
        }
        RxTimestampMode::Eol => {
            for ch in data.chars() {
                if ch == '\n' {
                    let stamp = Local::now().format(" [%H:%M:%S%.3f]").to_string();
                    if out.ends_with('\r') {
                        // Keep the CR directly in front of the LF so the stamp
                        // is printed after the line text, not over it.
                        out.pop();
                        out.push_str(&stamp);
                        out.push('\r');
                    } else {
                        out.push_str(&stamp);
                    }
                }
                out.push(ch);
            }
        }
    }
    out
}
/// Messages sent from the serial reader thread to the consumer of its channel.
enum ReaderEvent {
/// Bytes returned by one successful, non-empty read from the serial port.
Data(Vec<u8>),
/// Human-readable description of a serial read or write failure.
Error(String),
/// The serial port was opened again after a failure.
Reconnected,
}
/// Terminal UI state: an output area for serial text with a one-line
/// prompt/status row at the bottom of the screen.
struct Display {
// Terminal handle used for cursor movement, colours, scroll region and events.
term: NativeTerminal,
// Terminal width as last reported by `term.size()`.
cols: u16,
// Terminal height as last reported by `term.size()`.
rows: u16,
// Tracked cursor column inside the output area (where the next output goes).
output_col: u16,
// Tracked cursor row inside the output area.
output_row: u16,
// Set by `show_error`, cleared by `print_output`; not read anywhere in this file.
is_error: bool,
// Line editor that owns the prompt text, cursor position and key handling.
editor: LineEditor,
// True when the last printed character landed in the final column, so the
// next printable character has to wrap to a new row first.
pending_wrap: bool,
// Completed output lines (control characters removed), oldest first,
// capped at MAX_OUTPUT_LINES; used to repaint after a resize.
line_buffer: VecDeque<String>,
// The output line currently being received (no '\n' seen yet).
current_line: String,
// Optional timestamp injection mode applied to text passed to `print_output`.
rx_timestamps: Option<RxTimestampMode>,
// Timestamp state: true when the next received character starts a new line.
at_line_start: bool,
}
impl Display {
fn new(history_file_path: &str, rx_timestamps: Option<RxTimestampMode>) -> Display {
let term = NativeTerminal::new().expect("Failed to initialize terminal");
let (cols, rows) = term.size();
Display {
term,
cols,
rows,
output_col: 0,
output_row: 0,
is_error: false,
editor: LineEditor::new(history_file_path),
pending_wrap: false,
line_buffer: VecDeque::new(),
current_line: String::new(),
rx_timestamps,
at_line_start: true,
}
}
fn inject_timestamps(&mut self, data: &str) -> String {
let mut out = String::with_capacity(data.len() + 20);
match &self.rx_timestamps {
Some(RxTimestampMode::First) => {
for ch in data.chars() {
if self.at_line_start && ch != '\r' && ch != '\n' {
out.push_str(&Local::now().format("[%H:%M:%S%.3f] ").to_string());
self.at_line_start = false;
}
if ch == '\n' {
self.at_line_start = true;
}
out.push(ch);
}
}
Some(RxTimestampMode::Eol) => {
for ch in data.chars() {
if ch == '\n' {
out.push_str(&Local::now().format(" [%H:%M:%S%.3f]").to_string());
}
out.push(ch);
}
}
None => {
return data.to_string();
}
}
out
}
fn init(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let (cols, rows) = self.term.size();
self.cols = cols;
self.rows = rows;
self.term.clear_screen();
if self.rows > 1 {
self.term.set_scroll_region(0, self.rows - 2);
}
self.term.move_to(0, 0);
self.pending_wrap = false;
Ok(())
}
fn handle_resize(&mut self, cols: u16, rows: u16) {
self.cols = cols;
self.rows = rows;
std::thread::sleep(std::time::Duration::from_millis(50));
let (cols, rows) = self.term.size();
self.cols = cols;
self.rows = rows;
self.term.reset_scroll_region();
self.term.clear_screen();
self.output_row = 0;
self.output_col = 0;
self.pending_wrap = false;
if self.rows > 1 {
self.term.set_scroll_region(0, self.rows - 2);
}
self.term.move_to(0, 0);
self.redraw_from_buffer();
self.draw_prompt();
}
fn print_output(&mut self, data: &str) {
self.is_error = false;
let owned;
let data: &str = if self.rx_timestamps.is_some() {
owned = self.inject_timestamps(data);
&owned
} else {
data
};
self.term.hide_cursor();
for ch in data.chars() {
if ch == '\n' {
self.line_buffer.push_back(std::mem::take(&mut self.current_line));
if self.line_buffer.len() > MAX_OUTPUT_LINES {
self.line_buffer.pop_front();
}
} else if ch != '\r' && !ch.is_control() {
self.current_line.push(ch);
}
}
self.term.reset_scroll_region();
self.term.move_to(0, self.rows - 1);
self.term.clear_line();
if self.rows > 1 {
self.term.set_scroll_region(0, self.rows - 2);
}
self.term.move_to(self.output_col, self.output_row);
let max_output_row = self.rows.saturating_sub(2);
let mut out_buf = String::with_capacity(data.len() + 32);
for ch in data.chars() {
match ch {
'\n' => {
self.pending_wrap = false;
self.output_col = 0;
if self.output_row >= max_output_row {
out_buf.push('\n');
} else {
self.output_row += 1;
out_buf.push('\n');
}
}
'\r' => {
self.pending_wrap = false;
self.output_col = 0;
out_buf.push('\r');
}
c if !c.is_control() => {
if self.pending_wrap {
out_buf.push_str("\r\n");
self.output_col = 0;
if self.output_row < max_output_row {
self.output_row += 1;
}
self.pending_wrap = false;
}
out_buf.push(c);
self.output_col += 1;
if self.output_col >= self.cols {
self.output_col = self.cols - 1;
self.pending_wrap = true;
}
}
_ => {
}
}
}
let mut out = io::stdout();
out.write_all(out_buf.as_bytes()).unwrap();
out.flush().unwrap();
}
fn redraw_from_buffer(&mut self) {
let output_rows = self.rows.saturating_sub(1) as usize; if output_rows == 0 {
return;
}
let cols = self.cols as usize;
if cols == 0 {
return;
}
let mut screen_lines: Vec<&str> = Vec::new();
let mut total_rows_used: usize = 0;
let partial = if !self.current_line.is_empty() {
Some(self.current_line.as_str())
} else {
None
};
let iter_partial = partial.into_iter();
let iter_completed = self.line_buffer.iter().rev().map(|s| s.as_str());
for line in iter_partial.chain(iter_completed) {
let line_rows = if line.is_empty() {
1
} else {
(line.chars().count() + cols - 1) / cols
};
if total_rows_used + line_rows > output_rows {
break;
}
total_rows_used += line_rows;
screen_lines.push(line);
}
screen_lines.reverse();
self.term.move_to(0, 0);
let mut out = io::stdout();
let max_output_row = self.rows.saturating_sub(2);
self.output_row = 0;
self.output_col = 0;
for (i, line) in screen_lines.iter().enumerate() {
write!(out, "{}", line).unwrap();
let char_count = line.chars().count() as u16;
self.output_col = char_count % self.cols;
let rows_used = if char_count == 0 { 0 } else { char_count / self.cols };
self.output_row = self.output_row.saturating_add(rows_used);
if self.output_row > max_output_row {
self.output_row = max_output_row;
}
let is_last = i == screen_lines.len() - 1;
let is_partial = is_last && !self.current_line.is_empty();
if !is_last || !is_partial {
write!(out, "\r\n").unwrap();
self.output_col = 0;
if self.output_row < max_output_row {
self.output_row += 1;
}
}
}
out.flush().unwrap();
}
fn draw_prompt(&mut self) {
self.term.reset_scroll_region();
self.term.move_to(0, self.rows - 1);
self.term.clear_line();
self.term.set_color_yellow();
let buf = self.editor.buffer_str();
self.term.write_str(&format!("> {}", buf));
self.term.reset_color();
if self.rows > 1 {
self.term.set_scroll_region(0, self.rows - 2);
}
let cursor_col = 2 + self.editor.cursor_pos() as u16;
self.term.move_to(cursor_col, self.rows - 1);
self.term.show_cursor();
self.term.flush();
}
fn show_error(&mut self, msg: &str) {
self.term.reset_scroll_region();
self.term.move_to(0, self.rows - 1);
self.term.clear_line();
self.term.set_color_red();
self.term.write_str(&format!("! {}", msg));
self.term.reset_color();
self.term.flush();
if self.rows > 1 {
self.term.set_scroll_region(0, self.rows - 2);
}
self.is_error = true;
}
fn show_info(&mut self, msg: &str) {
self.term.reset_scroll_region();
self.term.move_to(0, self.rows - 1);
self.term.clear_line();
self.term.set_color_green();
self.term.write_str(&format!("> {}", msg));
self.term.reset_color();
self.term.flush();
if self.rows > 1 {
self.term.set_scroll_region(0, self.rows - 2);
}
}
fn handle_key_event(&mut self, key_event: crate::native_terminal::KeyEvent, send_command: &mut dyn FnMut(String)) -> bool {
match self.editor.handle_key(&key_event) {
LineEditAction::Exit => return false,
LineEditAction::Submit(command) => {
send_command(command.clone());
self.print_output(&format!("> {}\r\n", command));
}
LineEditAction::Updated => {
self.draw_prompt();
}
LineEditAction::None => {}
}
true
}
}
/// Opens `port_name` at `baud_rate` with a 50 ms read timeout and DTR
/// asserted on open.
///
/// # Errors
/// Returns the serial library's error, boxed, when the port cannot be opened.
fn open_serial_port(
    port_name: &str,
    baud_rate: u32,
) -> Result<Box<dyn SerialPort>, Box<dyn std::error::Error>> {
    let read_timeout = Duration::from_millis(50);
    let builder = serial_new(port_name, baud_rate)
        .timeout(read_timeout)
        .dtr_on_open(true);
    builder.open().map_err(Into::into)
}
fn spawn_reader_thread(
read_port: Box<dyn SerialPort>,
running: Arc<AtomicBool>,
no_reconnect: bool,
port_name: String,
baud_rate: u32,
write_rx: mpsc::Receiver<Vec<u8>>,
) -> mpsc::Receiver<ReaderEvent> {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut port = read_port;
let mut buf = [0u8; SERIAL_READ_BUF_SIZE];
let mut backoff_ms: u64 = 100;
let current_port_name = port_name;
while running.load(Ordering::SeqCst) {
let mut write_error = false;
loop {
match write_rx.try_recv() {
Ok(data) => {
if port.write_all(&data).is_err() || port.flush().is_err() {
write_error = true;
break;
}
}
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => {
write_error = true;
break;
}
}
}
if write_error {
let _ = tx.send(ReaderEvent::Error("Serial port write error".into()));
if no_reconnect {
break;
}
loop {
if !running.load(Ordering::SeqCst) {
return;
}
thread::sleep(Duration::from_millis(backoff_ms));
match open_serial_port(¤t_port_name, baud_rate) {
Ok(new_port) => {
port = new_port;
let _ = tx.send(ReaderEvent::Reconnected);
backoff_ms = 100;
break;
}
Err(_) => {
backoff_ms = (backoff_ms * 2).min(2000);
}
}
}
continue;
}
match port.read(&mut buf) {
Ok(n) if n > 0 => {
backoff_ms = 100;
if tx.send(ReaderEvent::Data(buf[..n].to_vec())).is_err() {
break;
}
}
Ok(_) => {}
Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {}
Err(_e) => {
let _ = tx.send(ReaderEvent::Error("Serial port read error".into()));
if no_reconnect {
break;
}
loop {
if !running.load(Ordering::SeqCst) {
return;
}
thread::sleep(Duration::from_millis(backoff_ms));
match open_serial_port(¤t_port_name, baud_rate) {
Ok(new_port) => {
port = new_port;
let _ = tx.send(ReaderEvent::Reconnected);
backoff_ms = 100;
break;
}
Err(_) => {
backoff_ms = (backoff_ms * 2).min(2000);
}
}
}
}
}
}
});
rx
}
pub fn start_native(
app_folder: String,
serial_port_name: Option<String>,
baud_rate: u32,
no_reconnect: bool,
log: bool,
log_folder: String,
vid: Option<String>,
rx_timestamps: Option<String>,
history_file_name: String,
agent_stream: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let rx_ts_mode = parse_rx_timestamp_mode(rx_timestamps.as_deref());
let log_file = open_log_file(log, &log_folder)?;
let port_name = if let Some(name) = serial_port_name {
name
} else {
let port_cmd = PortsCmd::new_with_vid(vid);
match select_most_likely_port(&port_cmd, false) {
Some(p) => p.port_name,
None => {
println!("Error: No suitable port found");
std::process::exit(1);
}
}
};
let port = open_serial_port(&port_name, baud_rate)?;
let running = Arc::new(AtomicBool::new(true));
let (write_tx, write_rx) = mpsc::channel();
let serial_rx = spawn_reader_thread(
port,
running.clone(),
no_reconnect,
port_name.clone(),
baud_rate,
write_rx,
);
if agent_stream {
return run_agent_stream(serial_rx, write_tx, running, no_reconnect, log_file, rx_ts_mode);
}
let history_file_path = format!("{}/{}", app_folder, history_file_name);
let mut display = Display::new(&history_file_path, rx_ts_mode);
display.init()?;
display.draw_prompt();
let mut send_command = |command: String| {
let mut data = command.into_bytes();
data.push(b'\n');
let _ = write_tx.send(data);
};
while running.load(Ordering::SeqCst) {
while display.term.poll_event(Duration::ZERO) {
match display.term.read_event() {
Some(TermEvent::Key(ke)) => {
if !display.handle_key_event(ke, &mut send_command) {
running.store(false, Ordering::SeqCst);
break;
}
}
Some(TermEvent::Resize(cols, rows)) => {
display.handle_resize(cols, rows);
}
None => break,
}
}
if !running.load(Ordering::SeqCst) {
break;
}
const MAX_SERIAL_DRAIN: usize = 64;
let mut had_serial_data = false;
let mut drain_error = false;
for _ in 0..MAX_SERIAL_DRAIN {
match serial_rx.try_recv() {
Ok(ReaderEvent::Data(bytes)) => {
let text = String::from_utf8_lossy(&bytes);
display.print_output(&text);
write_to_log(&log_file, &text);
had_serial_data = true;
}
Ok(ReaderEvent::Error(msg)) => {
display.show_error(&msg);
drain_error = true;
break;
}
Ok(ReaderEvent::Reconnected) => {
display.show_info("Reconnected");
thread::sleep(Duration::from_millis(500));
display.draw_prompt();
had_serial_data = false; break;
}
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => {
display.show_error("Serial reader thread disconnected");
running.store(false, Ordering::SeqCst);
drain_error = true;
break;
}
}
}
if had_serial_data && !drain_error {
display.draw_prompt();
}
let _ = display.term.poll_event(Duration::from_millis(5));
}
display.term.cleanup();
println!("Exiting...\r");
Ok(())
}
/// Runs the monitor by delegating to the Windows build, `raft.exe monitor`,
/// and waits for it to finish.
///
/// The arguments are translated to the equivalent command-line flags. Stdin
/// is always inherited. With `agent_stream` the child's stdout and stderr are
/// piped and copied to this process's stdout and stderr by two helper
/// threads; otherwise they are inherited as well.
///
/// # Errors
/// Returns an error when `raft.exe` cannot be started (with installation
/// hints when it was not found), when waiting for it fails, or when it exits
/// unsuccessfully.
pub fn start_non_native(
    app_folder: String,
    port: Option<String>,
    baud: u32,
    no_reconnect: bool,
    log: bool,
    log_folder: String,
    vid: Option<String>,
    rx_timestamps: Option<String>,
    agent_stream: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    const RAFT_EXE_NOT_FOUND: &str =
        "Could not find raft.exe (Windows version of raftcli).\n\n\
         When using WSL, raftcli needs the Windows version (raft.exe) to access USB serial ports.\n\n\
         Please ensure:\n\
         1. raftcli is installed on Windows: cargo install raftcli\n\
         2. raft.exe is in your Windows PATH\n\
         3. You can access Windows executables from WSL (try: raft.exe --version)\n\n\
         Alternative: Use the -n flag to attempt monitoring with native Linux serial ports (requires USBIPD or similar)";

    /// Copies everything from `source` to `sink` on a new thread, flushing the
    /// sink at the end; copy and flush errors are ignored.
    fn forward<R, W>(mut source: R, mut sink: W) -> thread::JoinHandle<()>
    where
        R: io::Read + Send + 'static,
        W: Write + Send + 'static,
    {
        thread::spawn(move || {
            let _ = io::copy(&mut source, &mut sink);
            let _ = sink.flush();
        })
    }

    // Translate the options into the child's command line.
    let mut args: Vec<String> = Vec::new();
    args.push("monitor".to_string());
    args.push(app_folder);
    args.push("-b".to_string());
    args.push(baud.to_string());
    if let Some(port_name) = port {
        args.push("-p".to_string());
        args.push(port_name);
    }
    if let Some(vendor_id) = vid {
        args.push("-v".to_string());
        args.push(vendor_id);
    }
    if no_reconnect {
        args.push("-r".to_string());
    }
    if log {
        args.push("-l".to_string());
        args.push("-g".to_string());
        args.push(log_folder);
    }
    if let Some(mode) = rx_timestamps {
        args.push("--rx-timestamps".to_string());
        args.push(mode);
    }
    if agent_stream {
        args.push("--agent-stream".to_string());
    }

    let mut command = Command::new("raft.exe");
    command.args(args).stdin(Stdio::inherit());
    if agent_stream {
        command.stdout(Stdio::piped()).stderr(Stdio::piped());
    } else {
        command.stdout(Stdio::inherit()).stderr(Stdio::inherit());
    }

    let mut child = match command.spawn() {
        Ok(child) => child,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::NotFound,
                RAFT_EXE_NOT_FOUND,
            )));
        }
        Err(e) => return Err(Box::new(e)),
    };

    // Output is forwarded by helper threads only in agent-stream mode.
    let (stdout_pump, stderr_pump) = if agent_stream {
        (
            child.stdout.take().map(|pipe| forward(pipe, io::stdout())),
            child.stderr.take().map(|pipe| forward(pipe, io::stderr())),
        )
    } else {
        (None, None)
    };

    let status = child.wait()?;
    for pump in stdout_pump.into_iter().chain(stderr_pump) {
        let _ = pump.join();
    }

    if status.success() {
        return Ok(());
    }
    Err(Box::new(std::io::Error::new(
        std::io::ErrorKind::Other,
        format!(
            "Windows raft.exe monitor command failed with exit code: {:?}",
            status.code()
        ),
    )))
}
fn run_agent_stream(
serial_rx: mpsc::Receiver<ReaderEvent>,
write_tx: mpsc::Sender<Vec<u8>>,
running: Arc<AtomicBool>,
no_reconnect: bool,
log_file: crate::console_log::SharedLogFile,
rx_timestamps: Option<RxTimestampMode>,
) -> Result<(), Box<dyn std::error::Error>> {
let stdin_write_tx = write_tx.clone();
thread::spawn(move || {
let stdin = io::stdin();
for line in stdin.lock().lines() {
match line {
Ok(line) => {
let mut data = line.into_bytes();
data.push(b'\n');
if stdin_write_tx.send(data).is_err() {
break;
}
}
Err(_) => break,
}
}
});
let mut at_line_start = true;
let mut stdout = io::stdout();
while running.load(Ordering::SeqCst) {
match serial_rx.recv_timeout(Duration::from_millis(50)) {
Ok(ReaderEvent::Data(bytes)) => {
if rx_timestamps.is_some() {
let text = String::from_utf8_lossy(&bytes);
let text = inject_timestamps_for_stream(&text, &rx_timestamps, &mut at_line_start);
stdout.write_all(text.as_bytes())?;
stdout.flush()?;
write_to_log(&log_file, &text);
} else {
stdout.write_all(&bytes)?;
stdout.flush()?;
let text = String::from_utf8_lossy(&bytes);
write_to_log(&log_file, &text);
}
}
Ok(ReaderEvent::Error(msg)) => {
eprintln!("Serial monitor error: {}", msg);
if no_reconnect {
running.store(false, Ordering::SeqCst);
return Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, msg)));
}
}
Ok(ReaderEvent::Reconnected) => {
eprintln!("Serial monitor reconnected");
}
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => {
running.store(false, Ordering::SeqCst);
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"Serial reader thread disconnected",
)));
}
}
}
Ok(())
}