use anyhow::{bail, Context, Result};
use std::collections::{HashMap, HashSet};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::time::{Duration, Instant};
use notify::{EventKind, RecursiveMode, Watcher};
use crate::{config::Config, frontmatter, sessions, stream, run};
/// Path of the daemon's PID file. It is relative, so it resolves against the
/// process's current working directory.
const PID_FILE: &str = ".agent-doc/watch.pid";
/// Number of seconds the event loop tolerates having nothing to watch or stream
/// before it shuts itself down.
const IDLE_TIMEOUT_SECS: u64 = 60;
/// Tuning parameters for the watch daemon's event loop.
pub struct WatchConfig {
    /// Quiet period, in milliseconds, that must elapse after a file change before
    /// the change is processed (reactive paths bypass this delay).
    pub debounce_ms: u64,
    /// Maximum number of consecutive self-triggered cycles allowed for one file
    /// before further changes are skipped.
    pub max_cycles: u32,
}
/// Per-file bookkeeping the event loop uses to detect and stop feedback loops.
struct FileState {
    /// Instant at which the most recent successful run for this file completed.
    last_run: Option<Instant>,
    /// Count of consecutive changes that were attributed to the agent's own runs.
    cycle_count: u32,
    /// Content hash recorded for the most recent change, when the file was readable.
    last_hash: Option<u64>,
}

impl FileState {
    /// Builds a blank state: no run recorded, no hash recorded, counter at zero.
    fn new() -> Self {
        let (last_run, last_hash) = (None, None);
        Self {
            cycle_count: 0,
            last_run,
            last_hash,
        }
    }
}
/// Live state for one document that receives captured tmux pane output.
struct StreamState {
    /// Identifier of the tmux pane that is captured and liveness-checked.
    pane: String,
    /// Full text of the previous capture; the next capture is diffed against it.
    last_capture: String,
    /// Target name handed to `stream::flush_to_document` when new output is flushed.
    target: String,
    /// Maximum number of trailing lines flushed from a single capture.
    max_lines: usize,
}
/// One session document discovered in the registry, together with how the daemon
/// should treat it.
struct WatchEntry {
    /// Path of the document (canonicalized when canonicalization succeeds).
    path: PathBuf,
    /// tmux pane recorded for the session in the registry.
    pane: String,
    /// Whether the document is file-watched or fed from pane captures.
    mode: DocMode,
    /// Stream target; empty for file-watched documents.
    target: String,
    /// Per-flush line cap for streamed documents.
    max_lines: usize,
    /// When true, the document is also file-watched and processed without debounce.
    reactive: bool,
}
/// How the daemon services a discovered document.
#[derive(PartialEq, Debug)]
enum DocMode {
    /// The document is watched on disk and a run is triggered when it changes.
    FileWatch,
    /// Output of the session's tmux pane is captured and flushed into the document.
    StreamCapture,
}
/// Hashes the file at `path`, ignoring boundary marker lines.
///
/// Returns `None` when the file cannot be read as UTF-8 text.
fn hash_content(path: &Path) -> Option<u64> {
    let raw = std::fs::read_to_string(path).ok()?;
    let mut state = DefaultHasher::new();
    // Boundary markers are stripped first so they do not affect the hash.
    strip_boundaries_for_hash(&raw).hash(&mut state);
    Some(state.finish())
}
/// Keeps at most the last `max_lines` lines of `content`.
///
/// When `content` already fits, it is returned unchanged (including any trailing
/// newline). Otherwise the surviving lines are re-joined with `\n`.
fn limit_lines(content: &str, max_lines: usize) -> String {
    let total = content.lines().count();
    match total.checked_sub(max_lines) {
        // `None`: fewer lines than the cap; `Some(0)`: exactly at the cap.
        None | Some(0) => content.to_string(),
        Some(excess) => content.lines().skip(excess).collect::<Vec<_>>().join("\n"),
    }
}
/// Returns `content` with every boundary marker line removed.
///
/// A marker line is one whose trimmed text starts with `<!-- agent:boundary:` and
/// ends with ` -->`. Remaining lines are joined with `\n`, so a trailing newline
/// in the input is not preserved.
fn strip_boundaries_for_hash(content: &str) -> String {
    let mut kept = String::with_capacity(content.len());
    let mut wrote_any = false;
    for line in content.lines() {
        let trimmed = line.trim();
        let is_marker = trimmed.starts_with("<!-- agent:boundary:") && trimmed.ends_with(" -->");
        if is_marker {
            continue;
        }
        if wrote_any {
            kept.push('\n');
        }
        kept.push_str(line);
        wrote_any = true;
    }
    kept
}
/// Reports whether a process with the given PID exists, judged by the presence of
/// its `/proc/<pid>` entry.
///
/// NOTE(review): this relies on a procfs mount; on systems without `/proc` it
/// always returns `false`.
fn pid_alive(pid: u32) -> bool {
    let proc_entry = Path::new("/proc").join(pid.to_string());
    proc_entry.exists()
}
/// Reads the PID stored in the PID file.
///
/// Returns `None` when the file is missing, unreadable, or does not contain a
/// valid `u32` after trimming whitespace.
fn read_pid() -> Option<u32> {
    std::fs::read_to_string(PID_FILE)
        .ok()
        .and_then(|text| text.trim().parse::<u32>().ok())
}
/// Records the current process ID in the PID file, creating the parent directory
/// first if needed.
///
/// # Errors
/// Propagates any I/O error from creating the directory or writing the file.
fn write_pid() -> Result<()> {
    let target = Path::new(PID_FILE);
    let pid_text = std::process::id().to_string();
    if let Some(dir) = target.parent() {
        std::fs::create_dir_all(dir)?;
    }
    std::fs::write(target, pid_text)?;
    Ok(())
}
/// Deletes the PID file on a best-effort basis; any error (such as the file not
/// existing) is deliberately ignored.
fn remove_pid() {
    std::fs::remove_file(PID_FILE).ok();
}
pub fn is_running() -> bool {
read_pid().is_some_and(pid_alive)
}
/// Makes sure a watch daemon is running, spawning a detached one when none is.
///
/// Returns `Ok(false)` when the PID file already names a live process, and
/// `Ok(true)` when this call spawned a new `watch` process. `Ok(true)` is returned
/// even if the daemon has not written its PID file by the end of the short
/// start-up poll, so callers should not treat it as proof the daemon is healthy.
///
/// # Errors
/// Fails when `crate::snapshot::find_project_root` yields nothing for the current
/// directory, when the current executable path cannot be resolved, or when the
/// child process cannot be spawned.
pub fn ensure_running() -> Result<bool> {
    if is_running() {
        return Ok(false);
    }
    let cwd = std::env::current_dir().unwrap_or_default();
    let project_root = crate::snapshot::find_project_root(&cwd)
        .context("could not find .agent-doc/ directory — not in an agent-doc project")?;
    let exe = std::env::current_exe().context("failed to resolve agent-doc binary path")?;
    // The `Child` handle is dropped as soon as `spawn` returns, which closes the
    // parent's end of any piped stream. A piped stderr would therefore become a
    // broken pipe, and the daemon's first `eprintln!` would panic on the failed
    // write. Nothing ever reads the daemon's stderr here, so send it to null.
    std::process::Command::new(exe)
        .arg("watch")
        .current_dir(&project_root)
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .spawn()
        .context("failed to spawn watch daemon")?;
    // Give the daemon up to ~500ms (10 polls of 50ms) to publish its PID file.
    for _ in 0..10 {
        std::thread::sleep(Duration::from_millis(50));
        if is_running() {
            return Ok(true);
        }
    }
    // The daemon was spawned but has not announced itself yet; still report that
    // a start was attempted, as before.
    Ok(true)
}
/// Runs the watch daemon in the foreground until its event loop exits.
///
/// Steps: move to the project root if the current directory is below it, refuse
/// to start when the PID file names a live process (a stale PID file is removed),
/// write this process's PID, run the event loop, then remove the PID file.
///
/// # Errors
/// Fails if changing directory fails, if another daemon is already running, if
/// the PID file cannot be written, or if the event loop returns an error.
pub fn start(config: &Config, watch_config: WatchConfig) -> Result<()> {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    let cwd = std::env::current_dir().unwrap_or_default();
    match find_project_root(&cwd) {
        Some(root) if root != cwd => {
            std::env::set_current_dir(&root)
                .with_context(|| format!("failed to cd to project root {}", root.display()))?;
            eprintln!("Resolved project root: {}", root.display());
        }
        _ => {}
    }

    match read_pid() {
        Some(pid) if pid_alive(pid) => bail!("watch daemon already running (PID {})", pid),
        // The recorded process is gone: clear the stale file before claiming it.
        Some(_) => remove_pid(),
        None => {}
    }
    write_pid()?;
    eprintln!("Watch daemon started (PID {})", std::process::id());

    // Shared flag the event loop polls; the handler clears it to request shutdown.
    let running = Arc::new(AtomicBool::new(true));
    let stop_flag = Arc::clone(&running);
    ctrlc_handler(move || stop_flag.store(false, Ordering::SeqCst));

    let outcome = run_event_loop(config, &watch_config, &running);
    remove_pid();
    eprintln!("Watch daemon stopped.");
    outcome
}
/// Spawns a background thread that calls `signal_wait` and then invokes `f`.
///
/// NOTE(review): `signal_wait` as written in this file loops forever, so `f` is
/// never actually invoked; no OS signal handler is installed here.
fn ctrlc_handler<F>(f: F)
where
    F: Fn() + Send + 'static,
{
    let waiter = move || {
        signal_wait();
        f();
    };
    // The join handle is intentionally discarded; the thread runs detached.
    std::thread::spawn(waiter);
}
/// Blocks the calling thread indefinitely by sleeping in one-hour intervals.
///
/// NOTE(review): despite the name, this does not wait for any signal and never
/// returns.
fn signal_wait() {
    let nap = Duration::from_secs(3600);
    loop {
        std::thread::sleep(nap);
    }
}
/// Core loop of the watch daemon.
///
/// Sets up a `notify` watcher, registers every entry returned by
/// `discover_entries`, and then loops until one of the following happens:
/// `running` becomes false, the PID file disappears, nothing has been active for
/// `IDLE_TIMEOUT_SECS`, or the watcher's event channel disconnects.
///
/// Each iteration: (1) checks the PID file and idle timer, (2) rescans the
/// registry if more than 10 seconds have passed, (3) polls every stream pane and
/// flushes new output, (4) waits up to 500ms for one filesystem event, and
/// (5) runs every pending file whose debounce period has elapsed.
///
/// Returns `Err` only if the watcher cannot be created or the initial
/// `discover_entries` call fails; per-file failures are logged and skipped.
fn run_event_loop(
config: &Config,
watch_config: &WatchConfig,
running: &std::sync::atomic::AtomicBool,
) -> Result<()> {
let debounce = Duration::from_millis(watch_config.debounce_ms);
let idle_timeout = Duration::from_secs(IDLE_TIMEOUT_SECS);
// The watcher callback forwards successful events into this channel; watcher
// errors are dropped.
let (tx, rx) = mpsc::channel();
let mut watcher = notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
if let Ok(event) = res {
let _ = tx.send(event);
}
})
.context("failed to create file watcher")?;
let entries = discover_entries()?;
// watched_files: paths registered with the watcher.
// reactive_paths: watched paths that are processed with zero debounce.
// stream_states: documents fed from tmux pane captures, keyed by path.
let mut watched_files: Vec<PathBuf> = Vec::new();
let mut reactive_paths: HashSet<PathBuf> = HashSet::new();
let mut stream_states: HashMap<PathBuf, StreamState> = HashMap::new();
// Initial registration. A failed `watch` call is only a warning.
for entry in &entries {
match entry.mode {
DocMode::FileWatch => {
if let Err(e) = watcher.watch(&entry.path, RecursiveMode::NonRecursive) {
eprintln!("Warning: could not watch {}: {}", entry.path.display(), e);
} else {
watched_files.push(entry.path.clone());
}
}
DocMode::StreamCapture => {
stream_states.insert(
entry.path.clone(),
StreamState {
pane: entry.pane.clone(),
last_capture: String::new(),
target: entry.target.clone(),
max_lines: entry.max_lines,
},
);
// Reactive stream documents are additionally file-watched.
if entry.reactive {
if let Err(e) = watcher.watch(&entry.path, RecursiveMode::NonRecursive) {
eprintln!("Warning: could not watch {}: {}", entry.path.display(), e);
} else {
watched_files.push(entry.path.clone());
reactive_paths.insert(entry.path.clone());
}
}
}
}
}
let file_count = watched_files.len();
let stream_count = stream_states.len();
if file_count == 0 && stream_count == 0 {
eprintln!("No session files found. Watching for new sessions...");
} else {
eprintln!(
"Watching {} file(s), {} stream(s)",
file_count, stream_count
);
}
// states: per-file loop-prevention bookkeeping.
// pending: time of the latest event for each changed path, awaiting debounce.
let mut states: HashMap<PathBuf, FileState> = HashMap::new();
let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
let mut last_rescan = Instant::now();
let mut idle_since: Option<Instant> = None;
let tmux = sessions::Tmux::default_server();
while running.load(std::sync::atomic::Ordering::Relaxed) {
// `stop()` requests shutdown by deleting the PID file, so its absence ends
// the loop.
if !Path::new(PID_FILE).exists() {
eprintln!("PID file removed — shutting down.");
break;
}
// Idle tracking: the timer starts the first time nothing is active and is
// cleared as soon as something is.
let has_active = !watched_files.is_empty() || !stream_states.is_empty();
if has_active {
idle_since = None;
} else {
let idle_start = *idle_since.get_or_insert_with(Instant::now);
if Instant::now().duration_since(idle_start) >= idle_timeout {
eprintln!("No active sessions for {}s — shutting down.", IDLE_TIMEOUT_SECS);
break;
}
}
// Periodic rescan: pick up entries that appeared since the last scan and
// drop streams whose pane has died. A discovery error is treated as "no
// entries".
if last_rescan.elapsed() > Duration::from_secs(10) {
let new_entries = discover_entries().unwrap_or_default();
for entry in &new_entries {
match entry.mode {
DocMode::FileWatch => {
if !watched_files.contains(&entry.path) {
if let Err(e) =
watcher.watch(&entry.path, RecursiveMode::NonRecursive)
{
eprintln!(
"Warning: could not watch {}: {}",
entry.path.display(),
e
);
} else {
eprintln!("Now watching {}", entry.path.display());
watched_files.push(entry.path.clone());
}
}
}
DocMode::StreamCapture => {
if !stream_states.contains_key(&entry.path) {
eprintln!("Now streaming {}", entry.path.display());
stream_states.insert(
entry.path.clone(),
StreamState {
pane: entry.pane.clone(),
last_capture: String::new(),
target: entry.target.clone(),
max_lines: entry.max_lines,
},
);
}
if entry.reactive && !reactive_paths.contains(&entry.path) {
if !watched_files.contains(&entry.path) {
if let Err(e) =
watcher.watch(&entry.path, RecursiveMode::NonRecursive)
{
eprintln!(
"Warning: could not watch {}: {}",
entry.path.display(),
e
);
} else {
eprintln!(
"Now watching {} (reactive)",
entry.path.display()
);
watched_files.push(entry.path.clone());
}
}
// NOTE(review): unlike the initial registration above, the path
// is marked reactive even when the `watch` call failed, so the
// watch is not retried on later rescans.
reactive_paths.insert(entry.path.clone());
}
}
}
}
// NOTE(review): a dead stream is removed from `stream_states` only. Its
// path stays in `watched_files` / `reactive_paths`, so `has_active` stays
// true. If discovery still returns the entry, the next rescan re-inserts
// it above and removes it here again.
let dead_streams: Vec<PathBuf> = stream_states
.iter()
.filter(|(_, ss)| !tmux.pane_alive(&ss.pane))
.map(|(p, _)| p.clone())
.collect();
for path in dead_streams {
eprintln!("Stream pane dead for {} — removing", path.display());
stream_states.remove(&path);
}
last_rescan = Instant::now();
}
// Stream polling: capture each pane, flush whatever differs from the previous
// capture (capped to `max_lines`), and remember the new capture even when the
// flush failed.
for (path, ss) in &mut stream_states {
match sessions::capture_pane(&tmux, &ss.pane) {
Ok(captured) => {
if captured != ss.last_capture {
let new_content = extract_new_lines(&ss.last_capture, &captured);
let limited = limit_lines(&new_content, ss.max_lines);
if !limited.is_empty() {
match stream::flush_to_document(path, &limited, &ss.target, "") {
Ok(()) => {
eprint!(".");
}
Err(e) => {
eprintln!(
"[watch-stream] flush error for {}: {}",
path.display(),
e
);
}
}
}
ss.last_capture = captured;
}
}
Err(e) => {
eprintln!(
"[watch-stream] capture error for {}: {}",
path.display(),
e
);
}
}
}
// Wait up to 500ms for a single filesystem event. The timeout also paces the
// loop. Only modify/create events on watched paths are queued, keyed by
// canonical path.
match rx.recv_timeout(Duration::from_millis(500)) {
Ok(event) => {
if matches!(
event.kind,
EventKind::Modify(_) | EventKind::Create(_)
) {
for path in event.paths {
let canonical = path.canonicalize().unwrap_or(path);
if watched_files.iter().any(|w| {
w.canonicalize().unwrap_or_else(|_| w.clone()) == canonical
}) {
pending.insert(canonical, Instant::now());
}
}
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
// Collect the pending paths whose debounce period has elapsed; reactive paths
// use a zero debounce and are therefore ready immediately.
let now = Instant::now();
let ready: Vec<PathBuf> = pending
.iter()
.filter(|(path, when)| {
let effective_debounce = if reactive_paths.contains(*path) {
Duration::ZERO
} else {
debounce
};
now.duration_since(**when) >= effective_debounce
})
.map(|(path, _)| path.clone())
.collect();
for path in ready {
pending.remove(&path);
let state = states.entry(path.clone()).or_insert_with(FileState::new);
// Heuristic: a change arriving within three debounce periods of the last
// successful run is treated as caused by that run.
let is_agent_change = state
.last_run
.is_some_and(|t| now.duration_since(t) < debounce * 3);
if is_agent_change {
state.cycle_count += 1;
if state.cycle_count > watch_config.max_cycles {
eprintln!(
"Max cycles ({}) reached for {} — skipping",
watch_config.max_cycles,
path.display()
);
continue;
}
// Same hash as the previous change means the content has stopped
// changing, so the cycle is considered converged.
let current_hash = hash_content(&path);
if current_hash.is_some() && current_hash == state.last_hash {
eprintln!("Converged for {} — skipping", path.display());
state.cycle_count = 0;
continue;
}
state.last_hash = current_hash;
} else {
// Not attributed to a recent run: reset the loop counters.
state.cycle_count = 0;
state.last_hash = hash_content(&path);
}
let file_str = path.to_string_lossy().to_string();
// NOTE(review): a busy file is skipped and, because it was already removed
// from `pending`, is not retried until another event arrives for it.
if agent_doc::debounce::is_busy(&file_str) {
eprintln!("[watch] skipping {} — busy (active operation in progress)", path.display());
continue;
}
eprintln!("Change detected: {}", path.display());
match run::run(&path, false, None, None, false, false, config) {
Ok(()) => {
// Only successful runs update `last_run`.
state.last_run = Some(Instant::now());
eprintln!("Submit complete: {}", path.display());
}
Err(e) => {
eprintln!("Submit failed for {}: {}", path.display(), e);
}
}
}
}
Ok(())
}
/// Builds the list of documents the daemon should service from the session
/// registry.
///
/// Registry entries whose file does not exist are skipped. Each remaining file is
/// classified from its frontmatter: when the resolved mode reports `is_crdt()`,
/// the entry becomes a reactive `StreamCapture` (target defaults to `"console"`,
/// line cap to 50); otherwise, or when the file cannot be read or parsed, it is
/// a plain `FileWatch` entry.
///
/// # Errors
/// Fails only when the session registry cannot be loaded.
fn discover_entries() -> Result<Vec<WatchEntry>> {
    let registry = sessions::load()?;
    let mut found = Vec::new();
    for session in registry.values() {
        let raw_path = PathBuf::from(&session.file);
        if !raw_path.exists() {
            continue;
        }
        let canonical = raw_path.canonicalize().unwrap_or(raw_path);

        // Start from the file-watch defaults and upgrade to stream capture only
        // when the frontmatter can be read, parsed, and resolves to a CRDT mode.
        let mut settings = (DocMode::FileWatch, String::new(), false, 50);
        if let Ok(text) = std::fs::read_to_string(&canonical) {
            if let Ok((fm, _)) = frontmatter::parse(&text) {
                if fm.resolve_mode().is_crdt() {
                    let stream_cfg = fm.stream_config.as_ref();
                    let target = stream_cfg
                        .and_then(|sc| sc.target.clone())
                        .unwrap_or_else(|| "console".to_string());
                    let max_lines = stream_cfg.and_then(|sc| sc.max_lines).unwrap_or(50);
                    settings = (DocMode::StreamCapture, target, true, max_lines);
                }
            }
        }
        let (mode, target, reactive, max_lines) = settings;

        found.push(WatchEntry {
            path: canonical,
            pane: session.pane.clone(),
            mode,
            target,
            max_lines,
            reactive,
        });
    }
    Ok(found)
}
/// Test-only helper: the paths of all discovered entries, in discovery order.
#[cfg(test)]
fn discover_files() -> Result<Vec<PathBuf>> {
    let entries = discover_entries()?;
    let paths = entries.into_iter().map(|entry| entry.path).collect();
    Ok(paths)
}
/// Returns the lines of `new` that follow the longest line-wise prefix it shares
/// with `old`, joined with `\n`.
///
/// The result is empty when `new` has no lines beyond the shared prefix.
fn extract_new_lines(old: &str, new: &str) -> String {
    let mut previous = old.lines();
    let mut fresh: Vec<&str> = Vec::new();
    let mut diverged = false;
    for line in new.lines() {
        // While still inside the shared prefix, consume the matching old line.
        // Once the texts diverge (or `old` runs out) every further line is kept.
        if !diverged && previous.next() == Some(line) {
            continue;
        }
        diverged = true;
        fresh.push(line);
    }
    fresh.join("\n")
}
/// Asks a running watch daemon to stop by removing its PID file, or cleans up a
/// stale PID file left by a dead process.
///
/// The daemon's event loop exits when it notices the PID file is gone. This
/// function always returns `Ok(())`.
pub fn stop() -> Result<()> {
    let Some(pid) = read_pid() else {
        eprintln!("No watch daemon running.");
        return Ok(());
    };
    // Check liveness before removing the file; the file is removed either way.
    let was_alive = pid_alive(pid);
    remove_pid();
    if was_alive {
        eprintln!("Signaled watch daemon (PID {}) to stop.", pid);
    } else {
        eprintln!(
            "Watch daemon (PID {}) was not running. Cleaned up PID file.",
            pid
        );
    }
    Ok(())
}
/// Prints to stdout whether the watch daemon is running, based on the PID file
/// and a liveness check of the recorded PID. Always returns `Ok(())`.
pub fn status() -> Result<()> {
    match read_pid().map(|pid| (pid, pid_alive(pid))) {
        Some((pid, true)) => println!("Watch daemon running (PID {})", pid),
        Some((pid, false)) => println!("Watch daemon not running (stale PID file: {})", pid),
        None => println!("Watch daemon not running."),
    }
    Ok(())
}
/// Walks upward from `path` and returns the nearest directory that contains a
/// `.agent-doc` directory.
///
/// When `path` is a file, the search starts at its parent. Returns `None` when no
/// ancestor qualifies.
fn find_project_root(path: &Path) -> Option<PathBuf> {
    let start = if path.is_file() { path.parent()? } else { path };
    // `ancestors` yields `start` itself first, then each successive parent.
    start
        .ancestors()
        .find(|dir| dir.join(".agent-doc").is_dir())
        .map(Path::to_path_buf)
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Serializes tests that change the process-wide working directory, since the
    /// test harness runs tests on multiple threads.
    static CWD_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// RAII guard that switches the working directory for the duration of a test
    /// and restores the previous one on drop.
    ///
    /// `std::env::set_current_dir` returns a plain `Result`, not a guard, so
    /// binding its result does not undo the change. This type provides the
    /// scoping the tests rely on and fails loudly if the switch does not happen.
    struct CwdGuard {
        previous: Option<PathBuf>,
        _lock: std::sync::MutexGuard<'static, ()>,
    }

    impl CwdGuard {
        /// Takes the cwd lock and changes into `dir`, panicking if that fails.
        fn enter(dir: &Path) -> Self {
            // A poisoned lock only means another cwd test panicked; the lock
            // itself is still usable.
            let lock = CWD_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let previous = std::env::current_dir().ok();
            std::env::set_current_dir(dir).expect("failed to enter test directory");
            Self {
                previous,
                _lock: lock,
            }
        }
    }

    impl Drop for CwdGuard {
        fn drop(&mut self) {
            // Best-effort restore; runs before the lock field is released.
            if let Some(previous) = self.previous.take() {
                let _ = std::env::set_current_dir(previous);
            }
        }
    }

    /// Writing, reading, and removing the PID file round-trips this process's PID.
    #[test]
    fn pid_file_roundtrip() {
        // `dir` is declared before the guard so the cwd is restored before the
        // temporary directory is deleted.
        let dir = TempDir::new().unwrap();
        let _cwd = CwdGuard::enter(dir.path());
        std::fs::create_dir_all(".agent-doc").unwrap();
        write_pid().unwrap();
        let pid = read_pid().unwrap();
        assert_eq!(pid, std::process::id());
        remove_pid();
        assert!(read_pid().is_none());
    }

    /// The current process is reported as alive (requires a `/proc` filesystem).
    #[test]
    fn pid_alive_self() {
        assert!(pid_alive(std::process::id()));
    }

    /// A PID at `u32::MAX` is reported as not alive.
    #[test]
    fn pid_alive_nonexistent() {
        assert!(!pid_alive(4_294_967_295));
    }

    /// Discovery in a directory without a registry yields no files.
    #[test]
    fn discover_empty_registry() {
        let dir = TempDir::new().unwrap();
        let _cwd = CwdGuard::enter(dir.path());
        let files = discover_files().unwrap();
        assert!(files.is_empty());
    }

    /// Hashing the same file twice yields the same value.
    #[test]
    fn hash_deterministic() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.md");
        std::fs::write(&path, "hello world").unwrap();
        let h1 = hash_content(&path).unwrap();
        let h2 = hash_content(&path).unwrap();
        assert_eq!(h1, h2);
    }

    /// Different file contents hash to different values.
    #[test]
    fn hash_changes_with_content() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.md");
        std::fs::write(&path, "version 1").unwrap();
        let h1 = hash_content(&path).unwrap();
        std::fs::write(&path, "version 2").unwrap();
        let h2 = hash_content(&path).unwrap();
        assert_ne!(h1, h2);
    }

    /// The cycle counter starts at zero and can be incremented and reset.
    #[test]
    fn loop_prevention_counter() {
        let mut state = FileState::new();
        assert_eq!(state.cycle_count, 0);
        state.cycle_count += 1;
        assert_eq!(state.cycle_count, 1);
        state.cycle_count = 0;
        assert_eq!(state.cycle_count, 0);
    }

    /// A stored hash can be read back from the state.
    #[test]
    fn convergence_detection() {
        let mut state = FileState::new();
        state.last_hash = Some(42);
        assert_eq!(state.last_hash, Some(42));
    }

    #[test]
    fn extract_new_lines_appended() {
        let old = "line 1\nline 2\nline 3";
        let new = "line 1\nline 2\nline 3\nline 4\nline 5";
        let result = extract_new_lines(old, new);
        assert_eq!(result, "line 4\nline 5");
    }

    #[test]
    fn extract_new_lines_modified() {
        let old = "line 1\nline 2\nline 3";
        let new = "line 1\nchanged\nline 3\nline 4";
        let result = extract_new_lines(old, new);
        assert_eq!(result, "changed\nline 3\nline 4");
    }

    #[test]
    fn extract_new_lines_identical() {
        let old = "line 1\nline 2";
        let new = "line 1\nline 2";
        let result = extract_new_lines(old, new);
        assert_eq!(result, "");
    }

    #[test]
    fn extract_new_lines_empty_old() {
        let old = "";
        let new = "line 1\nline 2";
        let result = extract_new_lines(old, new);
        assert_eq!(result, "line 1\nline 2");
    }

    #[test]
    fn extract_new_lines_empty_new() {
        let old = "line 1\nline 2";
        let new = "";
        let result = extract_new_lines(old, new);
        assert_eq!(result, "");
    }

    /// Successive captures only surface the lines added since the previous one.
    #[test]
    fn stream_state_tracks_capture() {
        let mut ss = StreamState {
            pane: "%42".to_string(),
            last_capture: String::new(),
            target: "console".to_string(),
            max_lines: 50,
        };
        let capture = "claude output line 1\nclaude output line 2".to_string();
        let new_content = extract_new_lines(&ss.last_capture, &capture);
        assert_eq!(new_content, "claude output line 1\nclaude output line 2");
        ss.last_capture = capture;
        let capture2 =
            "claude output line 1\nclaude output line 2\nclaude output line 3".to_string();
        let new_content2 = extract_new_lines(&ss.last_capture, &capture2);
        assert_eq!(new_content2, "claude output line 3");
        ss.last_capture = capture2;
    }

    #[test]
    fn doc_mode_eq() {
        assert_eq!(DocMode::FileWatch, DocMode::FileWatch);
        assert_eq!(DocMode::StreamCapture, DocMode::StreamCapture);
        assert_ne!(DocMode::FileWatch, DocMode::StreamCapture);
    }

    /// Exercises the real filesystem watcher; ignored by default because it
    /// depends on OS notification timing.
    #[test]
    #[ignore]
    fn watcher_detects_change() {
        use std::time::Duration;
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.md");
        std::fs::write(&path, "initial").unwrap();
        let (tx, rx) = mpsc::channel();
        let mut watcher = notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
            if let Ok(event) = res {
                let _ = tx.send(event);
            }
        })
        .unwrap();
        watcher
            .watch(&path, RecursiveMode::NonRecursive)
            .unwrap();
        std::thread::sleep(Duration::from_millis(100));
        std::fs::write(&path, "modified").unwrap();
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(!event.paths.is_empty());
    }
}