use anyhow::Result;
use chrono::{DateTime, Utc};
use clap::{Parser, Subcommand, ValueEnum};
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::{mpsc, RwLock};
use tokio::time;
mod alerts;
mod config;
mod daemon;
mod demo;
mod events;
mod history;
mod interactive;
mod analysis;
mod monitor;
mod platform;
mod report;
mod ui;
// Top-level command-line interface for `sandspy`, parsed by clap.
//
// Every option below is declared `global = true`. When no subcommand is
// supplied, `main` falls back to the interactive mode.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macro turns
// `///` doc comments into `--help` text, which would change program output.
#[derive(Parser, Debug)]
#[command(name = "sandspy", version, about, long_about = None)]
struct Cli {
// Subcommand to run; `None` selects interactive mode in `main`.
#[command(subcommand)]
command: Option<Commands>,
// `-d` / `--dashboard`: `main` discards tracing output and the watch/attach
// handlers run `ui::run_dashboard` instead of the live printer.
#[arg(short, long, global = true)]
dashboard: bool,
// `-v` / `--verbosity`: one of the `Verbosity` values, defaulting to `low`.
#[arg(short, long, value_enum, default_value_t = Verbosity::Low, global = true)]
verbosity: Verbosity,
// `-o` / `--output`: copied into `GlobalOptions`; within this file it is only
// written to the log.
#[arg(short, long, global = true)]
output: Option<PathBuf>,
// `--profile`: forwarded to `analysis::profiler::match_profile` by the
// watch/attach handlers.
#[arg(long, global = true)]
profile: Option<String>,
// `--no-color`: makes `main` turn off `colored` output.
#[arg(long, global = true)]
no_color: bool,
// `--json`: copied into `GlobalOptions`; within this file it is only logged.
#[arg(long, global = true)]
json: bool,
// `--poll-interval`: overrides `poll_interval_ms` and `net_poll_interval_ms`
// in the loaded config, clamped to 50..=10_000 by `main`.
#[arg(long, global = true)]
poll_interval: Option<u64>,
// `--max-events`: overrides `monitoring.max_events`, capped at 200_000 by `main`.
#[arg(long, global = true)]
max_events: Option<usize>,
// `--exclude-path`: repeatable (`ArgAction::Append`); stored in
// `GlobalOptions::exclude_paths` but not read anywhere in this file.
#[arg(long, global = true, action = clap::ArgAction::Append)]
exclude_path: Vec<String>,
}
// Subcommands understood by the CLI; routed by `handle_command`.
//
// Plain `//` comments are used deliberately: clap's derive macro would turn
// `///` doc comments into `--help` text and thereby change program output.
#[derive(Subcommand, Debug)]
enum Commands {
// Monitor an agent by name or command line. `command` is an optional
// positional; when omitted, `handle_watch` picks the first agent returned by
// `monitor::process::scan_for_agents`.
Watch {
command: Option<String>,
},
// Attach to an already running process. The clap constraints below require
// exactly one of `--pid` / `--name`.
Attach {
#[arg(long, required_unless_present = "name", conflicts_with = "name")]
pid: Option<u32>,
#[arg(long, required_unless_present = "pid", conflicts_with = "pid")]
name: Option<String>,
},
// Run the demo; `scan` and `seed` are passed unchanged to `demo::run`.
Demo {
#[arg(long)]
scan: bool,
#[arg(long)]
seed: Option<u64>,
},
// Render a stored session. `--session` is optional for clap, but
// `handle_report` returns an error when it is missing. `--format` defaults to
// markdown.
Report {
#[arg(long)]
session: Option<String>,
#[arg(long, value_enum, default_value_t = ReportFormat::Markdown)]
format: ReportFormat,
},
// Inspect stored sessions. With `--session` the session is shown, or deleted
// when `--delete` is also given. Without it, sessions are listed using the
// remaining options as a filter (`--since` / `--until` are parsed as RFC 3339
// by `handle_history`).
History {
#[arg(long)]
session: Option<String>,
#[arg(long)]
agent: Option<String>,
#[arg(long)]
since: Option<String>,
#[arg(long)]
until: Option<String>,
#[arg(long)]
min_risk: Option<u32>,
#[arg(long)]
delete: bool,
},
// Daemon control; see `DaemonAction`.
Daemon {
#[command(subcommand)]
action: DaemonAction,
},
// Profile inspection; see `ProfileAction`.
Profiles {
#[command(subcommand)]
action: ProfileAction,
},
}
// Actions of the `daemon` subcommand. `handle_daemon` maps each variant to the
// function of the same name in the `daemon` module (`start`, `stop`, `status`,
// `watch`). `//` comments are used so clap's `--help` output is unchanged.
#[derive(Subcommand, Debug)]
enum DaemonAction {
Start,
Stop,
Status,
Watch,
}
// Actions of the `profiles` subcommand, executed by `handle_profiles`.
// `//` comments are used so clap's `--help` output is unchanged.
#[derive(Subcommand, Debug)]
enum ProfileAction {
// Print the id and agent name of every loaded profile.
List,
// Print the details of the profile selected by the positional `name`.
Show { name: String },
}
// Values accepted by `-v` / `--verbosity`. The watch and attach handlers map
// them to the numeric levels 0 (`Low`) through 3 (`All`) before starting the
// live printer. `//` comments are used so clap's `--help` output is unchanged.
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
enum Verbosity {
Low,
Medium,
High,
All,
}
// Output formats of the `report` subcommand (see `handle_report`).
// `//` comments are used so clap's `--help` output is unchanged.
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
enum ReportFormat {
// Printed through `report::print_markdown_summary`.
Markdown,
// Pretty-printed JSON on stdout.
Json,
// Written to an `.html` file in the current working directory.
Html,
}
/// Owned copy of the global CLI flags, built in `main` from [`Cli`] and handed
/// to every subcommand handler.
#[derive(Debug, Clone)]
struct GlobalOptions {
/// Run the dashboard UI instead of the live printer (watch/attach).
dashboard: bool,
/// Requested verbosity; mapped to a level 0–3 by the watch/attach handlers.
verbosity: Verbosity,
/// Value of `--output`; only logged within this file.
output: Option<PathBuf>,
/// Explicit profile name passed to `analysis::profiler::match_profile`.
profile: Option<String>,
/// Whether `--no-color` was given.
no_color: bool,
/// Value of `--json`; only logged within this file.
json: bool,
/// Poll-interval override; `main` clamps it to 50..=10_000 before applying it.
poll_interval: Option<u64>,
/// Event-count override; `main` caps it at 200_000 before applying it.
max_events: Option<usize>,
/// Values of `--exclude-path`; not read in this file, hence the lint allowance.
#[allow(dead_code)]
exclude_paths: Vec<String>,
}
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
if cli.dashboard {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::ERROR)
.with_writer(std::io::sink)
.init();
} else {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive(tracing::Level::INFO.into()),
)
.init();
}
let global = GlobalOptions {
dashboard: cli.dashboard,
verbosity: cli.verbosity,
output: cli.output,
profile: cli.profile,
no_color: cli.no_color,
json: cli.json,
poll_interval: cli.poll_interval,
max_events: cli.max_events,
exclude_paths: cli.exclude_path,
};
let mut cfg = config::load_config();
if let Some(pi) = global.poll_interval {
cfg.monitoring.poll_interval_ms = pi.clamp(50, 10_000);
cfg.monitoring.net_poll_interval_ms = pi.clamp(50, 10_000);
}
if let Some(me) = global.max_events {
cfg.monitoring.max_events = me.min(200_000);
}
let config = std::sync::Arc::new(cfg);
if global.no_color || std::env::var("NO_COLOR").is_ok() {
colored::control::set_override(false);
}
match cli.command {
None => handle_interactive().await?,
Some(command) => handle_command(command, global, config.clone()).await?,
}
Ok(())
}
/// Starts the interactive mode used when no subcommand is given and returns
/// whatever `interactive::run` resolves to.
async fn handle_interactive() -> Result<()> {
    let session = interactive::run();
    session.await
}
/// Routes a parsed subcommand to its dedicated handler.
///
/// `global` is handed to every handler; `config` is only needed by the watch
/// and attach handlers.
async fn handle_command(
    command: Commands,
    global: GlobalOptions,
    config: std::sync::Arc<config::Config>,
) -> Result<()> {
    match command {
        Commands::Watch { command: target } => handle_watch(target, global, config).await,
        Commands::Attach { pid, name } => handle_attach(pid, name, global, config).await,
        Commands::Demo { scan, seed } => handle_demo(scan, seed, global).await,
        Commands::Report { session, format } => handle_report(session, format, global).await,
        Commands::History {
            session,
            agent,
            since,
            until,
            min_risk,
            delete,
        } => {
            let listing =
                handle_history(session, agent, since, until, min_risk, delete, global);
            listing.await
        }
        Commands::Daemon { action } => handle_daemon(action, global).await,
        Commands::Profiles { action } => handle_profiles(action, global).await,
    }
}
/// Runs the `watch` subcommand.
///
/// Resolves the agent to watch (the given `command`, or the first agent found
/// by `monitor::process::scan_for_agents`), starts the process, filesystem,
/// network, command, environment and clipboard monitors, consumes their events
/// through either the dashboard or the live printer, and finally persists the
/// session and prints a summary.
///
/// # Errors
/// Fails when no agent can be detected, when profiles cannot be loaded, when
/// the UI or a monitor task fails, or when the session cannot be persisted.
async fn handle_watch(
    command: Option<String>,
    global: GlobalOptions,
    config: std::sync::Arc<config::Config>,
) -> Result<()> {
    // `--no-color` or a set `NO_COLOR` variable disables ANSI styling.
    let plain_output = global.no_color || std::env::var("NO_COLOR").is_ok();

    let watch_command = match command {
        Some(cmd) => cmd,
        None => {
            let agents = monitor::process::scan_for_agents();
            if agents.is_empty() {
                anyhow::bail!("No running AI agents detected. Please specify an agent name (e.g. `sandspy watch Code`)");
            }
            let agent = &agents[0];
            let banner = format!(
                "[!] Detected {} running on PID {}. Attaching...",
                agent.name, agent.pid
            );
            // The banner uses raw escape codes, so the `colored` override set
            // in `main` does not affect it; honour the no-color request here.
            if plain_output {
                println!("{banner}");
            } else {
                println!("\x1b[32m{banner}\x1b[0m");
            }
            agent.name.clone()
        }
    };

    let session_start = SystemTime::now();
    let loaded_profiles = analysis::profiler::load_profiles()?;
    // Only the first whitespace-separated token identifies the program.
    let command_token = watch_command.split_whitespace().next();
    let matched_profile = analysis::profiler::match_profile(
        &loaded_profiles,
        global.profile.as_deref(),
        command_token,
    );
    tracing::info!(
        command = %watch_command,
        profile = ?matched_profile.map(|profile| profile.id.clone()),
        dashboard = global.dashboard,
        verbosity = ?global.verbosity,
        output = ?global.output,
        json = global.json,
        "watch mode"
    );

    let (tx, mut rx) = events::create_event_bus();
    let pids = monitor::process::create_pid_set();
    let process_state = Arc::new(RwLock::new(WatchProcessState::default()));

    // Each monitor task owns its own sender and PID-set handle.
    let tx_for_process = tx.clone();
    let tx_for_filesystem = tx.clone();
    let tx_for_network = tx.clone();
    let tx_for_command = tx.clone();
    let tx_for_environment = tx.clone();
    let tx_for_clipboard = tx.clone();
    let pids_for_process = pids.clone();
    let pids_for_filesystem = pids.clone();
    let pids_for_network = pids.clone();
    let pids_for_command = pids.clone();
    let pids_for_environment = pids.clone();
    let watch_command_clone = watch_command.clone();

    let process_handle = tokio::spawn(async move {
        let command_name = watch_command_clone
            .split_whitespace()
            .next()
            .unwrap_or(&watch_command_clone);
        // Nothing with that name is running yet: launch the command ourselves.
        let is_spawn = monitor::process::find_all_pids_by_name(command_name).is_empty();
        if is_spawn {
            tracing::info!(command = %watch_command_clone, "spawning and monitoring");
            let _ = monitor::process::spawn_and_monitor(
                &watch_command_clone,
                tx_for_process.clone(),
                pids_for_process.clone(),
            )
            .await;
        }
        // Re-attach whenever a process with the watched name (re)appears,
        // polling every two seconds in between.
        loop {
            let all_pids = monitor::process::find_all_pids_by_name(command_name);
            if !all_pids.is_empty() {
                monitor::process::seed_pid_set(&pids_for_process, &all_pids).await;
                tracing::info!(
                    pids = ?all_pids,
                    name = command_name,
                    "attaching to existing process(es)"
                );
                let event = crate::events::Event::new(crate::events::EventKind::Alert {
                    message: format!("Agent {} attached (PID {})", command_name, all_pids[0]),
                    severity: crate::events::RiskLevel::Low,
                });
                let _ = tx_for_process.send(event).await;
                let _ = monitor::process::attach_and_monitor(
                    all_pids[0],
                    tx_for_process.clone(),
                    pids_for_process.clone(),
                )
                .await;
                tracing::info!("Agent {} exited. Waiting for respawn...", command_name);
                let event = crate::events::Event::new(crate::events::EventKind::Alert {
                    message: format!("Agent {} exited. Waiting for respawn...", command_name),
                    severity: crate::events::RiskLevel::Medium,
                });
                let _ = tx_for_process.send(event).await;
            }
            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        }
        // Never reached; it only pins the task's output type to `Result<()>`.
        #[allow(unreachable_code)]
        Ok::<(), anyhow::Error>(())
    });
    let filesystem_handle = tokio::spawn(async move {
        monitor::filesystem::run(
            tx_for_filesystem,
            pids_for_filesystem,
            config.monitoring.max_scan_bytes,
            config.monitoring.debounce_ms,
        )
        .await
    });
    let network_handle =
        tokio::spawn(async move { monitor::network::run(tx_for_network, pids_for_network).await });
    let command_handle =
        tokio::spawn(async move { monitor::command::run(tx_for_command, pids_for_command).await });
    let environment_handle = tokio::spawn(async move {
        monitor::environment::run(tx_for_environment, pids_for_environment).await
    });
    let clipboard_handle =
        tokio::spawn(async move { monitor::clipboard::run(tx_for_clipboard).await });

    let agent_label = watch_command.clone();
    let verbosity_level: u8 = match global.verbosity {
        Verbosity::Low => 0,
        Verbosity::Medium => 1,
        Verbosity::High => 2,
        Verbosity::All => 3,
    };
    let session_start_utc = chrono::Utc::now();
    // NOTE(review): within this file `process_state` is only written by
    // `print_watch_events`, which this function never calls, so this hint
    // appears to always be `None` — confirm.
    let agent_pid_hint = {
        let guard = process_state.read().await;
        guard.seen.iter().min().copied()
    };

    let live_stats = if global.dashboard {
        let stats = ui::run_dashboard(rx, agent_label.clone(), agent_pid_hint, plain_output).await?;
        // The dashboard returned: stop every monitor.
        process_handle.abort();
        filesystem_handle.abort();
        network_handle.abort();
        command_handle.abort();
        environment_handle.abort();
        clipboard_handle.abort();
        drop(tx);
        stats
    } else {
        let printer_handle =
            tokio::spawn(
                async move { ui::live::run(&mut rx, &agent_label, verbosity_level).await },
            );
        // NOTE(review): the process task above loops without a `break`, so this
        // await seemingly only returns if that task fails — confirm whether the
        // shutdown sequence below is ever reached in live mode.
        process_handle.await??;
        filesystem_handle.abort();
        let _ = filesystem_handle.await;
        network_handle.abort();
        let _ = network_handle.await;
        command_handle.abort();
        let _ = command_handle.await;
        environment_handle.abort();
        let _ = environment_handle.await;
        clipboard_handle.abort();
        let _ = clipboard_handle.await;
        let seen_pids = {
            let guard = process_state.read().await;
            guard.seen.clone()
        };
        monitor::memory::run(tx.clone(), session_start, &seen_pids).await?;
        // Short pause before the last sender is dropped (presumably to let the
        // printer drain queued events — confirm).
        time::sleep(Duration::from_millis(250)).await;
        drop(tx);
        printer_handle.await?
    };

    let duration_secs = (chrono::Utc::now() - session_start_utc)
        .num_seconds()
        .max(0) as u64;
    let agent_pid = agent_pid_hint;
    let metadata = history::SessionMetadata {
        agent_name: watch_command.clone(),
        pid: agent_pid,
        duration: duration_secs,
        risk_score: live_stats.risk_score,
        event_count: live_stats.event_count,
        timestamp: session_start_utc,
    };
    let session_id = history::persist_session(&metadata, &live_stats.events)?;
    let summary_data = ui::summary::SessionData {
        agent_name: watch_command.clone(),
        agent_pid,
        start: session_start_utc,
        end: chrono::Utc::now(),
        events: live_stats.events.clone(),
        risk_score: live_stats.risk_score,
    };
    ui::summary::print_summary(&summary_data);
    println!("  session saved: {}", session_id);
    // Desktop notification is best-effort; a failure must not fail the session.
    let _ = alerts::notify(
        "sandspy",
        &format!("session complete — risk {}", live_stats.risk_score),
    );
    Ok(())
}
/// Runs the `attach` subcommand.
///
/// Resolves the target PID (given directly, or looked up by agent name),
/// starts the monitors for it, consumes events through the dashboard or the
/// live printer, and persists the session when monitoring ends.
///
/// # Errors
/// Fails when no running agent matches `name`, when profiles cannot be loaded,
/// when the UI or the process monitor fails, or when the session cannot be
/// persisted.
async fn handle_attach(
    pid: Option<u32>,
    name: Option<String>,
    global: GlobalOptions,
    config: std::sync::Arc<config::Config>,
) -> Result<()> {
    tracing::info!(
        pid = ?pid,
        name = ?name,
        dashboard = global.dashboard,
        verbosity = ?global.verbosity,
        output = ?global.output,
        profile = ?global.profile,
        json = global.json,
        "attach mode"
    );

    let target_pid = match (pid, name.as_deref()) {
        (Some(id), None) => id,
        (None, Some(agent_name)) => {
            let normalized = normalize_process_name(agent_name);
            let found = monitor::process::scan_for_agents()
                .into_iter()
                .find(|agent| normalize_process_name(&agent.name) == normalized);
            match found {
                Some(agent) => agent.pid,
                None => {
                    anyhow::bail!("no running agent found with name: {agent_name}");
                }
            }
        }
        _ => unreachable!("clap enforces exactly one attach target"),
    };

    // Record the start of the session BEFORE any monitor runs. Taking these
    // timestamps after monitoring has finished would persist a duration of
    // zero and an end-of-session timestamp.
    let session_start = std::time::SystemTime::now();
    let session_start_utc = chrono::Utc::now();

    let (tx, mut rx) = events::create_event_bus();
    let pids = monitor::process::create_pid_set();
    let loaded_profiles = analysis::profiler::load_profiles()?;
    // Name of the target process; empty when the PID cannot be found.
    let process_name = {
        let mut sys = sysinfo::System::new_all();
        sys.refresh_processes(sysinfo::ProcessesToUpdate::All, true);
        sys.process(sysinfo::Pid::from_u32(target_pid))
            .map(|p| p.name().to_string_lossy().to_string())
            .unwrap_or_default()
    };
    let matched_profile = analysis::profiler::match_profile(
        &loaded_profiles,
        global.profile.as_deref(),
        Some(&process_name),
    );
    tracing::info!(profile = ?matched_profile.map(|p| p.id.clone()), "attach mode using profile");
    monitor::process::seed_pid_set(&pids, &[target_pid]).await;

    // Each monitor task owns its own sender and PID-set handle.
    let tx_for_process = tx.clone();
    let pids_for_process = pids.clone();
    let process_handle = tokio::spawn(async move {
        monitor::process::attach_and_monitor(target_pid, tx_for_process, pids_for_process).await
    });
    let tx_for_filesystem = tx.clone();
    let pids_for_filesystem = pids.clone();
    let filesystem_handle = tokio::spawn(async move {
        monitor::filesystem::run(
            tx_for_filesystem,
            pids_for_filesystem,
            config.monitoring.max_scan_bytes,
            config.monitoring.debounce_ms,
        )
        .await
    });
    let tx_for_network = tx.clone();
    let pids_for_network = pids.clone();
    let network_handle =
        tokio::spawn(async move { monitor::network::run(tx_for_network, pids_for_network).await });
    let tx_for_command = tx.clone();
    let pids_for_command = pids.clone();
    let command_handle =
        tokio::spawn(async move { monitor::command::run(tx_for_command, pids_for_command).await });
    let tx_for_environment = tx.clone();
    let pids_for_environment = pids.clone();
    let environment_handle = tokio::spawn(async move {
        monitor::environment::run(tx_for_environment, pids_for_environment).await
    });
    let tx_for_clipboard = tx.clone();
    let clipboard_handle =
        tokio::spawn(async move { monitor::clipboard::run(tx_for_clipboard).await });

    let agent_label = process_name.clone();
    let verbosity_level: u8 = match global.verbosity {
        Verbosity::Low => 0,
        Verbosity::Medium => 1,
        Verbosity::High => 2,
        Verbosity::All => 3,
    };

    let live_stats = if global.dashboard {
        let stats = ui::run_dashboard(
            rx,
            agent_label.clone(),
            Some(target_pid),
            global.no_color || std::env::var("NO_COLOR").is_ok(),
        )
        .await?;
        // The dashboard returned: stop every monitor.
        process_handle.abort();
        filesystem_handle.abort();
        network_handle.abort();
        command_handle.abort();
        environment_handle.abort();
        clipboard_handle.abort();
        drop(tx);
        stats
    } else {
        let tx_for_mem = tx.clone();
        let agent_for_printer = agent_label.clone();
        let printer_handle = tokio::spawn(async move {
            ui::live::run(&mut rx, &agent_for_printer, verbosity_level).await
        });
        // Wait for the attached process monitor to finish, then stop the rest.
        process_handle.await??;
        filesystem_handle.abort();
        let _ = filesystem_handle.await;
        network_handle.abort();
        let _ = network_handle.await;
        command_handle.abort();
        let _ = command_handle.await;
        environment_handle.abort();
        let _ = environment_handle.await;
        clipboard_handle.abort();
        let _ = clipboard_handle.await;
        // Pass the real session start, mirroring `handle_watch`.
        monitor::memory::run(tx_for_mem, session_start, &HashSet::new()).await?;
        drop(tx);
        printer_handle.await?
    };

    let duration_secs = chrono::Utc::now()
        .signed_duration_since(session_start_utc)
        .num_seconds()
        .max(0) as u64;
    let metadata = history::SessionMetadata {
        agent_name: agent_label,
        pid: Some(target_pid),
        duration: duration_secs,
        risk_score: live_stats.risk_score,
        event_count: live_stats.event_count,
        timestamp: session_start_utc,
    };
    let session_id = history::persist_session(&metadata, &live_stats.events)?;
    println!("  session saved: {}", session_id);
    Ok(())
}
async fn handle_demo(scan: bool, seed: Option<u64>, global: GlobalOptions) -> Result<()> {
tracing::info!(
scan = scan,
seed = ?seed,
dashboard = global.dashboard,
verbosity = ?global.verbosity,
json = global.json,
"demo mode"
);
demo::run(scan, seed, global.dashboard).await
}
/// Runs the `report` subcommand: loads a stored session and renders it as
/// JSON (stdout), a markdown summary (stdout) or an HTML file in the current
/// working directory.
///
/// # Errors
/// Fails when `--session` is missing, when the session cannot be loaded, when
/// JSON serialisation fails, or when the HTML file cannot be written.
async fn handle_report(
    session: Option<String>,
    format: ReportFormat,
    global: GlobalOptions,
) -> Result<()> {
    tracing::info!(
        session = ?session,
        format = ?format,
        dashboard = global.dashboard,
        verbosity = ?global.verbosity,
        output = ?global.output,
        profile = ?global.profile,
        json = global.json,
        "report mode"
    );
    let session_id = session.ok_or_else(|| anyhow::anyhow!("--session is required for report"))?;
    let (metadata, events) = report::load_session(&session_id)?;
    match format {
        ReportFormat::Json => {
            let output = report::build_json_report(metadata, events);
            println!("{}", serde_json::to_string_pretty(&output)?);
        }
        ReportFormat::Markdown => {
            report::print_markdown_summary(metadata, events);
        }
        ReportFormat::Html => {
            let output = report::html::build_html_report(&metadata, &events);
            // The agent name may be a full command line (e.g. `node ./agent.js`);
            // embedding it verbatim would inject path separators into the file
            // name and make the write fail.
            let agent = report_file_component(&metadata.agent_name.to_string());
            let file_name = format!(
                "sandspy_report_{}_{}.html",
                agent,
                metadata.timestamp.format("%Y%m%d_%H%M%S")
            );
            let path = std::env::current_dir()?.join(file_name);
            std::fs::write(&path, output)?;
            println!("HTML Report saved to: {}", path.display());
        }
    }
    Ok(())
}

/// Makes `raw` safe to embed in a single file name by replacing path
/// separators, characters that are reserved in file names on common
/// platforms, and control characters with `_`. All other characters are kept,
/// so ordinary agent names produce the same file name as before.
fn report_file_component(raw: &str) -> String {
    raw.chars()
        .map(|ch| {
            let reserved = matches!(ch, '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|');
            if reserved || ch.is_control() {
                '_'
            } else {
                ch
            }
        })
        .collect()
}
async fn handle_history(
session: Option<String>,
agent: Option<String>,
since: Option<String>,
until: Option<String>,
min_risk: Option<u32>,
delete: bool,
_global: GlobalOptions,
) -> Result<()> {
if delete {
if let Some(session_id) = session {
return history::delete(&session_id).await;
} else {
anyhow::bail!("--session is required with --delete");
}
}
match session {
Some(session_id) => history::show(&session_id).await,
None => {
let filter = history::ListFilter {
agent,
since: since.as_ref().and_then(|s| {
DateTime::parse_from_rfc3339(s)
.ok()
.map(|dt| dt.with_timezone(&Utc))
}),
until: until.as_ref().and_then(|s| {
DateTime::parse_from_rfc3339(s)
.ok()
.map(|dt| dt.with_timezone(&Utc))
}),
min_risk,
};
history::list(filter).await
}
}
}
async fn handle_daemon(action: DaemonAction, _global: GlobalOptions) -> Result<()> {
match action {
DaemonAction::Start => daemon::start().await,
DaemonAction::Stop => daemon::stop().await,
DaemonAction::Status => daemon::status().await,
DaemonAction::Watch => daemon::watch().await,
}
}
/// Runs the `profiles` subcommand: lists every loaded profile, or prints the
/// details of the one selected by name. The global options are not used.
///
/// Returns an error when profiles cannot be loaded or the requested profile
/// does not exist.
async fn handle_profiles(action: ProfileAction, _global: GlobalOptions) -> Result<()> {
    let available = analysis::profiler::load_profiles()?;
    match action {
        ProfileAction::List => {
            for entry in &available {
                println!("{} ({})", entry.id, entry.agent.name);
            }
        }
        ProfileAction::Show { name } => {
            let lookup = analysis::profiler::match_profile(&available, Some(&name), None);
            let selected = match lookup {
                Some(found) => found,
                None => return Err(anyhow::anyhow!("profile not found: {}", name)),
            };
            let agent = &selected.agent;
            let process_names = agent.process_names.join(", ");
            let allowed_domains = selected.expected.network.allowed_domains.join(", ");
            println!("id: {}", selected.id);
            println!("name: {}", agent.name);
            println!("description: {}", agent.description);
            println!("process_names: {}", process_names);
            println!("allowed_domains: {}", allowed_domains);
        }
    }
    Ok(())
}
/// PID bookkeeping shared between the watch handler and the event printer.
///
/// Within this file the sets are only updated by `print_watch_events`.
#[derive(Debug, Default)]
#[allow(dead_code)]
struct WatchProcessState {
/// Every PID for which a `ProcessSpawn` event has been observed.
seen: HashSet<u32>,
/// PIDs that were spawned and have not yet produced a `ProcessExit` event.
active: HashSet<u32>,
}
/// Drains `rx`, printing one line per recognised event together with the
/// running risk score.
///
/// For every event the function first records process spawns/exits in
/// `process_state`, then feeds the event to a fresh `RiskScorer`, prints the
/// event, and finally forwards any alerts produced by the scorer through `tx`
/// (when a sender was supplied). Event kinds without a dedicated line are
/// scored but not printed. Returns once the channel is closed.
#[allow(dead_code)]
async fn print_watch_events(
    rx: &mut mpsc::Receiver<events::Event>,
    tx: Option<&mpsc::Sender<events::Event>>,
    process_state: Arc<RwLock<WatchProcessState>>,
) {
    let mut scorer = analysis::risk::RiskScorer::new();
    while let Some(event) = rx.recv().await {
        // Keep the write guard inside its own scope so it is released before
        // scoring and printing.
        {
            let mut tracked = process_state.write().await;
            if let events::EventKind::ProcessSpawn { pid, .. } = &event.kind {
                tracked.seen.insert(*pid);
                tracked.active.insert(*pid);
            } else if let events::EventKind::ProcessExit { pid, .. } = &event.kind {
                tracked.active.remove(pid);
            }
        }

        let (score, follow_ups) = scorer.process_with_alerts(&event);

        // Render the line first; kinds without a rendering yield `None`.
        let rendered = match &event.kind {
            events::EventKind::ProcessSpawn {
                pid,
                name,
                cmdline,
                parent_pid,
            } => Some(format!(
                "{:?} ProcessSpawn pid={} parent={} name={} cmdline={} score={}",
                event.timestamp, pid, parent_pid, name, cmdline, score
            )),
            events::EventKind::ProcessExit { pid, exit_code } => Some(format!(
                "{:?} ProcessExit pid={} exit_code={:?} score={}",
                event.timestamp, pid, exit_code, score
            )),
            events::EventKind::FileRead {
                path,
                sensitive,
                category,
            } => Some(format!(
                "{:?} FileRead path={} sensitive={} category={:?} score={}",
                event.timestamp,
                path.display(),
                sensitive,
                category,
                score
            )),
            events::EventKind::FileWrite { path, diff_summary } => Some(format!(
                "{:?} FileWrite path={} diff_summary={:?} score={}",
                event.timestamp,
                path.display(),
                diff_summary,
                score
            )),
            events::EventKind::FileDelete { path } => Some(format!(
                "{:?} FileDelete path={} score={}",
                event.timestamp,
                path.display(),
                score
            )),
            events::EventKind::NetworkConnection {
                remote_addr,
                remote_port,
                domain,
                category,
                bytes_sent,
                bytes_recv,
            } => Some(format!(
                "{:?} NetworkConnection remote={}:{} domain={:?} category={:?} sent={} recv={} score={}",
                event.timestamp,
                remote_addr,
                remote_port,
                domain,
                category,
                bytes_sent,
                bytes_recv,
                score
            )),
            events::EventKind::ShellCommand {
                command,
                working_dir,
                risk,
            } => Some(format!(
                "{:?} ShellCommand command={} cwd={} risk={:?} score={}",
                event.timestamp,
                command,
                working_dir.display(),
                risk,
                score
            )),
            events::EventKind::EnvVarRead { name, sensitive } => Some(format!(
                "{:?} EnvVarRead name={} sensitive={} score={}",
                event.timestamp, name, sensitive, score
            )),
            events::EventKind::ClipboardRead {
                content_type,
                contains_secret,
            } => Some(format!(
                "{:?} ClipboardRead type={} contains_secret={} score={}",
                event.timestamp, content_type, contains_secret, score
            )),
            events::EventKind::Alert { message, severity } => Some(format!(
                "{:?} ALERT severity={:?} message={} score={}",
                event.timestamp, severity, message, score
            )),
            _ => None,
        };
        if let Some(line) = rendered {
            println!("{}", line);
        }

        // Forward scorer-generated alerts; send failures are ignored.
        if let Some(sender) = tx {
            for alert_event in follow_ups {
                let _ = sender.send(alert_event).await;
            }
        }
    }
}
/// Canonical form of a process name for comparisons: lower-cased, with one
/// trailing `.exe` removed if present.
fn normalize_process_name(name: &str) -> String {
    const EXE_SUFFIX: &str = ".exe";

    let mut normalized = name.to_lowercase();
    if normalized.ends_with(EXE_SUFFIX) {
        // The suffix is ASCII, so the cut always lands on a char boundary.
        normalized.truncate(normalized.len() - EXE_SUFFIX.len());
    }
    normalized
}
#[cfg(test)]
mod tests {
    use super::*;
    use clap::CommandFactory;

    /// Reports whether clap accepts `argv` (the first element is the binary name).
    fn parses(argv: &[&str]) -> bool {
        Cli::try_parse_from(argv.iter().copied()).is_ok()
    }

    /// Every documented subcommand must be registered on the root command.
    #[test]
    fn cli_has_required_subcommands() {
        let root = Cli::command();
        let registered: Vec<&str> = root.get_subcommands().map(|sub| sub.get_name()).collect();
        for expected in [
            "watch", "attach", "demo", "report", "history", "daemon", "profiles",
        ] {
            assert!(registered.contains(&expected));
        }
    }

    /// `attach` needs `--pid` or `--name`, and rejects both together.
    #[test]
    fn attach_requires_exactly_one_target() {
        assert!(!parses(&["sandspy", "attach"]));
        assert!(!parses(&[
            "sandspy", "attach", "--pid", "42", "--name", "cursor"
        ]));
        assert!(parses(&["sandspy", "attach", "--pid", "42"]));
        assert!(parses(&["sandspy", "attach", "--name", "cursor"]));
    }

    /// `-v` only accepts the `Verbosity` value names.
    #[test]
    fn verbosity_restricts_allowed_values() {
        assert!(parses(&["sandspy", "watch", "codex", "-v", "medium"]));
        assert!(!parses(&["sandspy", "watch", "codex", "-v", "verbose"]));
    }

    /// `--format` only accepts the `ReportFormat` value names.
    #[test]
    fn report_format_restricts_allowed_values() {
        assert!(parses(&["sandspy", "report", "--format", "json"]));
        assert!(!parses(&["sandspy", "report", "--format", "yaml"]));
    }
}