use std::collections::BTreeMap;
use std::io::{self, IsTerminal};
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration as StdDuration;
use anyhow::{Context, Result, anyhow};
use clap::{ArgAction, Parser, Subcommand};
use crossterm::event::{self, Event, KeyEventKind};
use crossterm::execute;
use crossterm::terminal::{
EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode,
};
use ratatui::Terminal;
use ratatui::backend::CrosstermBackend;
use serde::Serialize;
use tokio::sync::mpsc;
use tokio::time::{Duration, MissedTickBehavior, interval};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use tracing_subscriber::Layer;
use tracing_subscriber::fmt::writer::BoxMakeWriter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, fmt};
use cargowatch_core::{AppConfig, AppPaths, SessionEvent, load_config};
use cargowatch_detector::{DetectionService, RustProcessDetector};
use cargowatch_runner::{ManagedRunRequest, ManagedSessionHandle, spawn_managed_session};
use cargowatch_store::SessionStore;
use cargowatch_ui::{Dashboard, UiAction};
#[derive(Debug, Parser)]
#[command(
name = "cargowatch",
about = "Rust-first build monitoring with full-fidelity managed runs and summary-only detection.",
after_help = "Managed mode launches the build and captures logs, diagnostics, and artifacts. Detect mode only reports best-effort summaries for unrelated external Rust toolchain processes."
)]
struct Cli {
#[arg(long, global = true)]
config: Option<PathBuf>,
#[arg(long, global = true)]
database_path: Option<PathBuf>,
#[arg(short, long, global = true, action = ArgAction::Count)]
verbose: u8,
#[command(subcommand)]
command: Command,
}
#[derive(Debug, Subcommand)]
enum Command {
Tui,
Run {
#[arg(last = true, required = true)]
command: Vec<String>,
},
Detect,
History {
#[arg(long, default_value_t = 12)]
limit: usize,
},
Doctor {
#[arg(long)]
json: bool,
},
CleanHistory,
}
struct AppContext {
paths: AppPaths,
config: AppConfig,
store: SessionStore,
}
#[derive(Debug, Serialize)]
struct DoctorReport {
config_file: String,
data_dir: String,
log_dir: String,
database_path: String,
stdout_is_terminal: bool,
stderr_is_terminal: bool,
term: Option<String>,
colorterm: Option<String>,
detection_poll_interval_ms: u64,
capture_raw_log_storage: bool,
managed_vs_detected: &'static str,
}
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
let mut paths = AppPaths::discover(cli.database_path.clone())?;
if let Some(config_path) = &cli.config {
paths.config_file = config_path.clone();
}
paths.ensure_exists()?;
let mut config = load_config(&paths)?;
if let Some(database_path) = cli.database_path.clone() {
config.database_path = database_path;
}
let _tracing = init_tracing(&paths, cli.verbose)?;
let store = SessionStore::connect(&config.database_path).await?;
let cleaned = store.cleanup_old_sessions(config.retention_days).await?;
if cleaned > 0 {
info!(rows = cleaned, "cleaned old sessions");
}
let context = AppContext {
paths,
config,
store,
};
match cli.command {
Command::Tui => run_tui(context, None).await,
Command::Run { command } => run_tui(context, Some(command)).await,
Command::Detect => run_detect(context).await,
Command::History { limit } => run_history(context, limit).await,
Command::Doctor { json } => run_doctor(context, json).await,
Command::CleanHistory => run_clean_history(context).await,
}
}
async fn run_tui(context: AppContext, initial_command: Option<Vec<String>>) -> Result<()> {
let (event_tx, mut event_rx) = mpsc::unbounded_channel::<SessionEvent>();
let detector_cancel = CancellationToken::new();
let detector_task = tokio::spawn(
DetectionService::new(event_tx.clone(), context.config.detection_poll_interval_ms)
.run(detector_cancel.clone()),
);
let mut dashboard = Dashboard::new(context.config.clone());
let mut managed_handles = BTreeMap::<String, ManagedSessionHandle>::new();
let mut persisted_log_counts = BTreeMap::<String, usize>::new();
dashboard.set_history(
context
.store
.recent_sessions(context.config.history_limit)
.await?,
);
if let Some(command) = initial_command {
let handle = start_managed_command(&context, command, &event_tx, &mut dashboard).await?;
managed_handles.insert(handle.session_id().to_string(), handle);
}
enable_raw_mode().context("failed to enable raw mode")?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen).context("failed to enter alternate screen")?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend).context("failed to create terminal")?;
terminal.clear().context("failed to clear terminal")?;
let key_reader_cancel = CancellationToken::new();
let mut key_rx = spawn_key_reader(key_reader_cancel.clone());
let loop_result = async {
let mut redraw = interval(Duration::from_millis(100));
redraw.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = redraw.tick() => {
terminal.draw(|frame| {
dashboard.render(frame);
}).context("failed to draw dashboard")?;
}
Some(event) = event_rx.recv() => {
persist_event(&context, &event, &mut persisted_log_counts).await?;
dashboard.apply_event(&event, context.config.max_log_lines_in_memory);
if matches!(event, SessionEvent::SessionStarted(_) | SessionEvent::SessionFinished(_) | SessionEvent::ProcessDetected(_) | SessionEvent::ProcessGone { .. }) {
dashboard.set_history(context.store.recent_sessions(context.config.history_limit).await?);
}
}
Some(key) = key_rx.recv() => {
if let Some(action) = dashboard.handle_key(key) {
match action {
UiAction::Quit => break Ok(()),
UiAction::StartManagedCommand(command) => {
let parsed = shell_words::split(&command)
.map_err(|error| anyhow!("failed to parse command: {error}"))?;
let handle =
start_managed_command(&context, parsed, &event_tx, &mut dashboard)
.await?;
managed_handles.insert(handle.session_id().to_string(), handle);
}
UiAction::CancelSession(session_id) => {
if let Some(handle) = managed_handles.get(&session_id) {
handle.cancel();
}
}
UiAction::LoadSession(session_id) => {
if let Some(session) = context
.store
.load_session(&session_id, context.config.max_log_lines_in_memory)
.await?
{
dashboard.insert_history_session(session);
}
}
}
}
}
}
}
}
.await;
key_reader_cancel.cancel();
disable_raw_mode().ok();
execute!(terminal.backend_mut(), LeaveAlternateScreen).ok();
terminal.show_cursor().ok();
detector_cancel.cancel();
let _ = detector_task.await;
for handle in managed_handles.into_values() {
handle.cancel();
let _ = handle.wait().await;
}
loop_result
}
async fn start_managed_command(
context: &AppContext,
command: Vec<String>,
event_tx: &mpsc::UnboundedSender<SessionEvent>,
dashboard: &mut Dashboard,
) -> Result<ManagedSessionHandle> {
let cwd = std::env::current_dir().context("failed to determine current directory")?;
let workspace_root = infer_workspace_root(&cwd);
let request = ManagedRunRequest::new(command, cwd, workspace_root);
let handle = spawn_managed_session(request, event_tx.clone())?;
dashboard.set_history(
context
.store
.recent_sessions(context.config.history_limit)
.await?,
);
Ok(handle)
}
async fn persist_event(
context: &AppContext,
event: &SessionEvent,
persisted_log_counts: &mut BTreeMap<String, usize>,
) -> Result<()> {
match event {
SessionEvent::SessionStarted(info) => context.store.insert_session_start(info).await?,
SessionEvent::OutputLine { session_id, entry } => {
let count = persisted_log_counts.entry(session_id.clone()).or_default();
if *count < context.config.max_persisted_log_lines {
let mut entry = entry.clone();
if !context.config.capture_raw_log_storage {
entry.raw = None;
}
context.store.insert_log_line(session_id, &entry).await?;
*count += 1;
}
}
SessionEvent::Diagnostic {
session_id,
diagnostic,
} => {
context
.store
.insert_diagnostic(session_id, diagnostic)
.await?;
}
SessionEvent::ArtifactBuilt {
session_id,
artifact,
} => {
context.store.insert_artifact(session_id, artifact).await?;
}
SessionEvent::SessionFinished(finish) => {
context.store.finish_session(finish).await?;
}
SessionEvent::ProcessDetected(process) | SessionEvent::ProcessUpdated(process) => {
context.store.upsert_detected_process(process).await?;
}
SessionEvent::ProcessGone {
session_id,
observed_at,
..
} => {
context
.store
.mark_process_gone(session_id, *observed_at)
.await?;
}
}
Ok(())
}
async fn run_detect(context: AppContext) -> Result<()> {
let mut detector = RustProcessDetector::default();
let processes = detector.scan();
for process in &processes {
context.store.upsert_detected_process(process).await?;
}
if processes.is_empty() {
println!("No active Rust build processes detected.");
println!(
"Managed mode only captures logs, diagnostics, and artifacts for commands it launches."
);
return Ok(());
}
println!(
"{:<8} {:<14} {:<9} {:<8} {:<24} COMMAND",
"PID", "CLASS", "ELAPSED", "MODE", "WORKSPACE"
);
for process in processes {
println!(
"{:<8} {:<14} {:<9} {:<8} {:<24} {}",
process.pid,
process.classification.label(),
format_elapsed(process.elapsed_ms),
"detect",
process
.workspace_root
.as_ref()
.map(|path| shorten_path(path, 24))
.unwrap_or_else(|| "-".to_string()),
process.command.join(" ")
);
}
println!();
println!(
"Detection is summary-only. CargoWatch does not claim live log attachment for unrelated external processes."
);
Ok(())
}
async fn run_history(context: AppContext, limit: usize) -> Result<()> {
let sessions = context.store.recent_sessions(limit).await?;
if sessions.is_empty() {
println!("No session history found yet.");
return Ok(());
}
println!(
"{:<26} {:<9} {:<10} {:<9} {:<7} COMMAND",
"SESSION ID", "MODE", "STATUS", "DURATION", "ERRORS"
);
for session in sessions {
println!(
"{:<26} {:<9} {:<10} {:<9} {:<7} {}",
truncate_id(&session.info.session_id),
match session.info.mode {
cargowatch_core::SessionMode::Managed => "managed",
cargowatch_core::SessionMode::Detected => "detected",
},
session.status_label(),
session
.duration_ms
.map(format_elapsed)
.unwrap_or_else(|| "-".to_string()),
session.summary.errors,
session.command_line()
);
}
Ok(())
}
async fn run_doctor(context: AppContext, json: bool) -> Result<()> {
let report = DoctorReport {
config_file: context.paths.config_file.display().to_string(),
data_dir: context.paths.data_dir.display().to_string(),
log_dir: context.paths.log_dir.display().to_string(),
database_path: context.config.database_path.display().to_string(),
stdout_is_terminal: io::stdout().is_terminal(),
stderr_is_terminal: io::stderr().is_terminal(),
term: std::env::var("TERM").ok(),
colorterm: std::env::var("COLORTERM").ok(),
detection_poll_interval_ms: context.config.detection_poll_interval_ms,
capture_raw_log_storage: context.config.capture_raw_log_storage,
managed_vs_detected: "Managed mode captures logs, diagnostics, and artifacts for commands CargoWatch launches. Detect mode stores best-effort summaries for unrelated external Rust processes.",
};
if json {
println!("{}", serde_json::to_string_pretty(&report)?);
} else {
println!("CargoWatch doctor");
println!(" config file: {}", report.config_file);
println!(" data dir: {}", report.data_dir);
println!(" log dir: {}", report.log_dir);
println!(" database: {}", report.database_path);
println!(" stdout tty: {}", report.stdout_is_terminal);
println!(" stderr tty: {}", report.stderr_is_terminal);
println!(" TERM: {}", report.term.as_deref().unwrap_or("-"));
println!(
" COLORTERM: {}",
report.colorterm.as_deref().unwrap_or("-")
);
println!(" poll every: {} ms", report.detection_poll_interval_ms);
println!(" raw log db: {}", report.capture_raw_log_storage);
println!(" note: {}", report.managed_vs_detected);
}
Ok(())
}
async fn run_clean_history(context: AppContext) -> Result<()> {
let removed = context
.store
.cleanup_old_sessions(context.config.retention_days)
.await?;
println!("Removed {removed} stale session rows.");
Ok(())
}
fn init_tracing(
paths: &AppPaths,
verbose: u8,
) -> Result<tracing_appender::non_blocking::WorkerGuard> {
let log_file = tracing_appender::rolling::daily(&paths.log_dir, "cargowatch.log");
let (file_writer, guard) = tracing_appender::non_blocking(log_file);
let env_filter = match verbose {
0 => EnvFilter::new("info"),
1 => EnvFilter::new("debug"),
_ => EnvFilter::new("trace"),
};
let file_layer = fmt::layer()
.with_ansi(false)
.with_writer(BoxMakeWriter::new(file_writer))
.with_target(true);
let stderr_layer = fmt::layer()
.with_writer(io::stderr)
.with_target(false)
.with_filter(env_filter.clone());
tracing_subscriber::registry()
.with(env_filter)
.with(file_layer)
.with(stderr_layer)
.init();
Ok(guard)
}
fn spawn_key_reader(
cancellation: CancellationToken,
) -> mpsc::UnboundedReceiver<crossterm::event::KeyEvent> {
let (tx, rx) = mpsc::unbounded_channel();
thread::spawn(move || {
while !cancellation.is_cancelled() {
match event::poll(StdDuration::from_millis(100)) {
Ok(true) => match event::read() {
Ok(Event::Key(key)) if key.kind == KeyEventKind::Press => {
if tx.send(key).is_err() {
break;
}
}
Ok(_) => {}
Err(error) => {
warn!(%error, "keyboard reader failed");
break;
}
},
Ok(false) => {}
Err(error) => {
warn!(%error, "keyboard polling failed");
break;
}
}
}
});
rx
}
fn infer_workspace_root(start: &Path) -> Option<PathBuf> {
for ancestor in start.ancestors() {
if ancestor.join("Cargo.toml").exists() {
return Some(ancestor.to_path_buf());
}
}
None
}
fn truncate_id(session_id: &str) -> String {
session_id.chars().take(26).collect()
}
fn format_elapsed(duration_ms: i64) -> String {
let duration_ms = duration_ms.max(0);
if duration_ms >= 60_000 {
format!("{}m{}s", duration_ms / 60_000, (duration_ms / 1_000) % 60)
} else {
format!("{}s", duration_ms / 1_000)
}
}
fn shorten_path(path: &Path, limit: usize) -> String {
let text = path.display().to_string();
if text.len() <= limit {
text
} else {
format!("…{}", &text[text.len().saturating_sub(limit - 1)..])
}
}