use crate::commands::index::resolve_sources;
use crate::config::{default_db_path, default_state_path, DEFAULT_LAUNCH_AGENT_LABEL};
use crate::indexer::{index_sources_with_progress, scan_sources_for_pending, SourceScanReport};
use crate::output::{format_bytes, now_timestamp, progress_line};
use crate::store::Store;
use anyhow::{anyhow, Context, Result};
use clap::Args;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::ffi::OsString;
use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command as ProcessCommand;
use std::thread;
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Args)]
pub struct WatchArgs {
#[arg(long, help = "SQLite index path")]
pub db: Option<PathBuf>,
#[arg(long, help = "Watch state JSON path")]
pub state: Option<PathBuf>,
#[arg(long = "source", help = "Session archive root to scan; repeatable")]
pub sources: Vec<PathBuf>,
#[arg(long, default_value_t = 10, help = "Seconds between watch scans")]
pub interval: u64,
#[arg(
long,
default_value_t = 3,
help = "Seconds a file must be quiet before indexing"
)]
pub quiet_for: u64,
#[arg(long, help = "Run one scan and exit")]
pub once: bool,
#[arg(long, help = "Write a macOS LaunchAgent plist for background indexing")]
pub install_launch_agent: bool,
#[arg(long, help = "Bootstrap and verify the LaunchAgent after writing it")]
pub start_launch_agent: bool,
#[arg(
long,
default_value = DEFAULT_LAUNCH_AGENT_LABEL,
help = "LaunchAgent label"
)]
pub agent_label: String,
#[arg(long, help = "LaunchAgent plist output path")]
pub agent_path: Option<PathBuf>,
}
#[derive(Debug, Clone, Args)]
pub struct StatusArgs {
#[arg(long, help = "SQLite index path")]
pub db: Option<PathBuf>,
#[arg(long, help = "Watch state JSON path")]
pub state: Option<PathBuf>,
#[arg(long = "source", help = "Session archive root to scan; repeatable")]
pub sources: Vec<PathBuf>,
#[arg(
long,
default_value_t = 3,
help = "Seconds a file must be quiet before indexing"
)]
pub quiet_for: u64,
#[arg(long, help = "Emit machine-readable JSON")]
pub json: bool,
#[arg(
long,
default_value = DEFAULT_LAUNCH_AGENT_LABEL,
help = "LaunchAgent label"
)]
pub agent_label: String,
#[arg(long, help = "LaunchAgent plist path")]
pub agent_path: Option<PathBuf>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub(crate) struct WatchState {
last_run_at: Option<String>,
last_indexed_at: Option<String>,
last_error: Option<String>,
last_indexed_sessions: usize,
last_indexed_events: usize,
last_files_seen: usize,
last_files_total: usize,
pending_files: usize,
}
#[derive(Debug, Clone)]
pub(crate) struct WatchStatusReport {
pub db_path: PathBuf,
pub db_exists: bool,
pub state_path: PathBuf,
pub sources: Vec<PathBuf>,
pub scan: SourceScanReport,
pub state: WatchState,
pub last_indexed_at: Option<String>,
pub freshness: FreshnessVerdict,
pub launch_agent: LaunchAgentStatus,
}
#[derive(Debug, Clone)]
pub(crate) struct FreshnessVerdict {
pub state: &'static str,
pub message: String,
}
#[derive(Debug, Clone)]
pub(crate) struct LaunchAgentStatus {
pub label: String,
pub path: PathBuf,
pub supported: bool,
pub installed: bool,
pub running: Option<bool>,
}
pub fn run_watch(args: WatchArgs) -> Result<()> {
let db_path = args.db.unwrap_or(default_db_path()?);
let state_path = args.state.unwrap_or(default_state_path()?);
let sources = resolve_sources(args.sources)?;
let interval = Duration::from_secs(args.interval);
let quiet_for = Duration::from_secs(args.quiet_for);
if args.install_launch_agent {
if !launch_agent_supported() {
return Err(anyhow!(
"watch --install-launch-agent is only supported on macOS"
));
}
let agent_path = args
.agent_path
.unwrap_or(default_launch_agent_path(&args.agent_label)?);
install_launch_agent_plist(
&agent_path,
&args.agent_label,
&db_path,
&state_path,
&sources,
interval,
quiet_for,
)?;
println!("installed launch agent: {}", agent_path.display());
if args.start_launch_agent {
start_launch_agent(&args.agent_label, &agent_path)?;
println!("started launch agent: {}", args.agent_label);
}
println!(
"next: launchctl bootstrap gui/$(id -u) {}",
agent_path.display()
);
return Ok(());
}
loop {
match run_watch_iteration(&db_path, &state_path, &sources, quiet_for) {
Ok(()) => {
if args.once {
return Ok(());
}
}
Err(error) => {
let mut state = read_watch_state(&state_path).unwrap_or_default();
state.last_run_at = Some(now_timestamp());
state.last_error = Some(format!("{error:#}"));
let _ = write_watch_state(&state_path, &state);
if args.once {
return Err(error);
}
eprintln!("watch error: {error:#}");
}
}
thread::sleep(interval);
}
}
fn run_watch_iteration(
db_path: &Path,
state_path: &Path,
sources: &[PathBuf],
quiet_for: Duration,
) -> Result<()> {
let mut state = read_watch_state(state_path).unwrap_or_default();
let (scan, last_indexed_at) = scan_status(db_path, sources, quiet_for)?;
state.last_run_at = Some(now_timestamp());
state.pending_files = scan.pending_files;
state.last_error = None;
if scan.pending_files == 0 {
state.last_indexed_at = last_indexed_at.or(state.last_indexed_at);
write_watch_state(state_path, &state)?;
println!("watch idle: no pending files");
return Ok(());
}
if scan.waiting_files > 0 {
state.last_indexed_at = last_indexed_at.or(state.last_indexed_at);
write_watch_state(state_path, &state)?;
println!(
"watch waiting: {} pending files, {} still within quiet window",
scan.pending_files, scan.waiting_files
);
return Ok(());
}
let store = Store::open(db_path)?;
let started = Instant::now();
let report = index_sources_with_progress(&store, sources, |report| {
eprintln!("{}", progress_line(report, started.elapsed()));
})?;
let remaining = scan_sources_for_pending(Some(&store), sources, quiet_for)?;
state.last_run_at = Some(now_timestamp());
state.last_indexed_at = store
.last_indexed_at()?
.or_else(|| state.last_run_at.clone());
state.last_indexed_sessions = report.sessions_indexed;
state.last_indexed_events = report.events_indexed;
state.last_files_seen = report.files_seen;
state.last_files_total = report.files_total;
state.pending_files = remaining.pending_files;
state.last_error = None;
write_watch_state(state_path, &state)?;
println!(
"watch indexed {} session files, {} events from {}/{} files; {} pending files remain",
report.sessions_indexed,
report.events_indexed,
report.files_seen,
report.files_total,
remaining.pending_files
);
Ok(())
}
pub fn run_status(args: StatusArgs) -> Result<()> {
let db_path = args.db.unwrap_or(default_db_path()?);
let state_path = args.state.unwrap_or(default_state_path()?);
let sources = resolve_sources(args.sources)?;
let quiet_for = Duration::from_secs(args.quiet_for);
let agent_path = args
.agent_path
.unwrap_or(default_launch_agent_path(&args.agent_label)?);
let report = build_status_report(
db_path,
state_path,
sources,
quiet_for,
args.agent_label,
agent_path,
)?;
if args.json {
println!("{}", serde_json::to_string_pretty(&status_json(&report))?);
return Ok(());
}
let db_status = if report.db_exists {
"exists"
} else {
"missing"
};
println!("database: {} ({db_status})", report.db_path.display());
println!("state: {}", report.state_path.display());
println!(
"freshness: {} ({})",
report.freshness.state, report.freshness.message
);
println!(
"pending: {} files, {} stable, {} waiting, {}",
report.scan.pending_files,
report.scan.stable_pending_files,
report.scan.waiting_files,
format_bytes(report.scan.pending_bytes)
);
println!(
"last_indexed_at: {}",
report
.last_indexed_at
.clone()
.unwrap_or_else(|| "never".to_owned())
);
println!(
"last_error: {}",
report
.state
.last_error
.clone()
.unwrap_or_else(|| "none".to_owned())
);
if report.launch_agent.supported {
println!(
"launch_agent: {} (installed: {}, running: {})",
report.launch_agent.label,
report.launch_agent.installed,
report
.launch_agent
.running
.map(|running| running.to_string())
.unwrap_or_else(|| "unknown".to_owned())
);
} else {
println!(
"launch_agent: unsupported on this platform (label: {}, path: {})",
report.launch_agent.label,
report.launch_agent.path.display()
);
}
for source in report.sources {
let status = if source.exists() { "exists" } else { "missing" };
println!("source: {} ({status})", source.display());
}
Ok(())
}
pub(crate) fn build_status_report(
db_path: PathBuf,
state_path: PathBuf,
sources: Vec<PathBuf>,
quiet_for: Duration,
agent_label: String,
agent_path: PathBuf,
) -> Result<WatchStatusReport> {
let state = read_watch_state(&state_path).unwrap_or_default();
let (scan, last_indexed_at) = scan_status(&db_path, &sources, quiet_for)?;
let last_indexed_at = last_indexed_at.or(state.last_indexed_at.clone());
let db_exists = db_path.exists();
let launch_agent = launch_agent_status(agent_label, agent_path);
let freshness = freshness_verdict(db_exists, &scan, &state);
Ok(WatchStatusReport {
db_path,
db_exists,
state_path,
sources,
scan,
state,
last_indexed_at,
freshness,
launch_agent,
})
}
pub(crate) fn status_json(report: &WatchStatusReport) -> Value {
json!({
"db_path": report.db_path,
"db_exists": report.db_exists,
"state_path": report.state_path,
"sources": report.sources,
"files_total": report.scan.files_total,
"pending_files": report.scan.pending_files,
"pending_bytes": report.scan.pending_bytes,
"stable_pending_files": report.scan.stable_pending_files,
"waiting_files": report.scan.waiting_files,
"missing_sources": report.scan.missing_sources,
"freshness": report.freshness.state,
"freshness_message": report.freshness.message,
"last_run_at": report.state.last_run_at,
"last_indexed_at": report.last_indexed_at,
"last_error": report.state.last_error,
"last_indexed_sessions": report.state.last_indexed_sessions,
"last_indexed_events": report.state.last_indexed_events,
"launch_agent": {
"label": report.launch_agent.label,
"path": report.launch_agent.path,
"supported": report.launch_agent.supported,
"installed": report.launch_agent.installed,
"running": report.launch_agent.running,
},
})
}
fn freshness_verdict(
db_exists: bool,
scan: &SourceScanReport,
state: &WatchState,
) -> FreshnessVerdict {
if let Some(error) = &state.last_error {
return FreshnessVerdict {
state: "stale",
message: format!("watcher last failed: {error}"),
};
}
if scan.pending_files > 0 && state.last_run_at.is_none() {
return FreshnessVerdict {
state: "watcher-not-running",
message: format!(
"{} pending stable files and no watcher state",
scan.stable_pending_files
),
};
}
if scan.stable_pending_files > 0 || (!db_exists && scan.pending_files > 0) {
return FreshnessVerdict {
state: "stale",
message: format!(
"{} stable files are ready to index",
scan.stable_pending_files
),
};
}
if scan.waiting_files > 0 {
return FreshnessVerdict {
state: "pending-live-writes",
message: format!(
"{} files are still within the quiet window",
scan.waiting_files
),
};
}
if !scan.missing_sources.is_empty() {
return FreshnessVerdict {
state: "stale",
message: format!(
"{} configured sources are missing",
scan.missing_sources.len()
),
};
}
FreshnessVerdict {
state: "fresh",
message: "index is current".to_owned(),
}
}
fn launch_agent_status(label: String, path: PathBuf) -> LaunchAgentStatus {
let supported = launch_agent_supported();
let installed = supported && path.exists();
let running = if installed {
Some(is_launch_agent_running(&label))
} else {
None
};
LaunchAgentStatus {
label,
path,
supported,
installed,
running,
}
}
fn is_launch_agent_running(label: &str) -> bool {
let Ok(domain) = launch_agent_domain() else {
return false;
};
ProcessCommand::new(launchctl_executable())
.args(["print", &format!("{domain}/{label}")])
.output()
.map(|output| output.status.success())
.unwrap_or(false)
}
fn start_launch_agent(label: &str, path: &Path) -> Result<()> {
if !launch_agent_supported() {
return Err(anyhow!("launchctl integration is only supported on macOS"));
}
let domain = launch_agent_domain()?;
let bootstrap = ProcessCommand::new(launchctl_executable())
.args(["bootstrap", &domain])
.arg(path)
.output()
.context("run launchctl bootstrap")?;
if !bootstrap.status.success() && !launchctl_already_loaded(&bootstrap) {
return Err(anyhow!(
"launchctl bootstrap failed: {}{}",
String::from_utf8_lossy(&bootstrap.stdout),
String::from_utf8_lossy(&bootstrap.stderr)
));
}
if !bootstrap.status.success() {
let kickstart = ProcessCommand::new(launchctl_executable())
.args(["kickstart", "-k", &format!("{domain}/{label}")])
.output()
.context("run launchctl kickstart")?;
if !kickstart.status.success() {
return Err(anyhow!(
"launchctl kickstart failed: {}{}",
String::from_utf8_lossy(&kickstart.stdout),
String::from_utf8_lossy(&kickstart.stderr)
));
}
}
let print = ProcessCommand::new(launchctl_executable())
.args(["print", &format!("{domain}/{label}")])
.output()
.context("run launchctl print")?;
if !print.status.success() {
return Err(anyhow!(
"launchctl print failed after bootstrap: {}{}",
String::from_utf8_lossy(&print.stdout),
String::from_utf8_lossy(&print.stderr)
));
}
Ok(())
}
fn launchctl_already_loaded(output: &std::process::Output) -> bool {
let text = format!(
"{}{}",
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr)
)
.to_ascii_lowercase();
text.contains("already") && (text.contains("loaded") || text.contains("bootstrapped"))
}
fn launch_agent_domain() -> Result<String> {
if let Ok(uid) = std::env::var("CODEX_RECALL_UID") {
return Ok(format!("gui/{uid}"));
}
if let Ok(uid) = std::env::var("UID") {
return Ok(format!("gui/{uid}"));
}
let output = ProcessCommand::new("id")
.arg("-u")
.output()
.context("run id -u")?;
if !output.status.success() {
return Err(anyhow!("id -u failed"));
}
Ok(format!(
"gui/{}",
String::from_utf8_lossy(&output.stdout).trim()
))
}
fn launchctl_executable() -> OsString {
std::env::var_os("CODEX_RECALL_LAUNCHCTL").unwrap_or_else(|| OsString::from("launchctl"))
}
fn launch_agent_supported() -> bool {
cfg!(target_os = "macos")
}
fn scan_status(
db_path: &Path,
sources: &[PathBuf],
quiet_for: Duration,
) -> Result<(SourceScanReport, Option<String>)> {
if !db_path.exists() {
return Ok((scan_sources_for_pending(None, sources, quiet_for)?, None));
}
let store = Store::open_readonly(db_path)?;
let last_indexed_at = store.last_indexed_at()?;
let scan = scan_sources_for_pending(Some(&store), sources, quiet_for)?;
Ok((scan, last_indexed_at))
}
fn read_watch_state(path: &Path) -> Result<WatchState> {
let bytes = match fs::read(path) {
Ok(bytes) => bytes,
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
return Ok(WatchState::default());
}
Err(error) => return Err(error).with_context(|| format!("read {}", path.display())),
};
serde_json::from_slice(&bytes).with_context(|| format!("parse {}", path.display()))
}
fn write_watch_state(path: &Path, state: &WatchState) -> Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).with_context(|| format!("create {}", parent.display()))?;
}
let bytes = serde_json::to_vec_pretty(state)?;
fs::write(path, bytes).with_context(|| format!("write {}", path.display()))?;
Ok(())
}
fn default_launch_agent_path(label: &str) -> Result<PathBuf> {
let home = std::env::var_os("HOME")
.map(PathBuf::from)
.ok_or_else(|| anyhow!("HOME is not set"))?;
Ok(home
.join("Library")
.join("LaunchAgents")
.join(format!("{label}.plist")))
}
fn install_launch_agent_plist(
path: &Path,
label: &str,
db_path: &Path,
state_path: &Path,
sources: &[PathBuf],
interval: Duration,
quiet_for: Duration,
) -> Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).with_context(|| format!("create {}", parent.display()))?;
}
let executable = std::env::current_exe().context("resolve current executable")?;
let stdout_path = state_path.with_extension("out.log");
let stderr_path = state_path.with_extension("err.log");
let mut args = vec![
executable.display().to_string(),
"watch".to_owned(),
"--db".to_owned(),
db_path.display().to_string(),
"--state".to_owned(),
state_path.display().to_string(),
"--interval".to_owned(),
interval.as_secs().to_string(),
"--quiet-for".to_owned(),
quiet_for.as_secs().to_string(),
];
for source in sources {
args.push("--source".to_owned());
args.push(source.display().to_string());
}
let program_arguments = args
.iter()
.map(|arg| format!(" <string>{}</string>", xml_escape(arg)))
.collect::<Vec<_>>()
.join("\n");
let plist = format!(
r#"<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>{}</string>
<key>ProgramArguments</key>
<array>
{}
</array>
<key>RunAtLoad</key>
<true/>
<key>KeepAlive</key>
<true/>
<key>StandardOutPath</key>
<string>{}</string>
<key>StandardErrorPath</key>
<string>{}</string>
</dict>
</plist>
"#,
xml_escape(label),
program_arguments,
xml_escape(&stdout_path.display().to_string()),
xml_escape(&stderr_path.display().to_string())
);
fs::write(path, plist).with_context(|| format!("write {}", path.display()))?;
Ok(())
}
fn xml_escape(value: &str) -> String {
value
.replace('&', "&")
.replace('<', "<")
.replace('>', ">")
.replace('"', """)
.replace('\'', "'")
}