use std::{
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
mpsc::{self, RecvTimeoutError},
},
time::Duration,
};
use anyhow::{Context, Result, anyhow};
use chrono::{DateTime, Duration as ChronoDuration, SecondsFormat, Utc};
use notify::{Config as NotifyConfig, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use objects::{object::ChangeId, store::ObjectStore};
use oplog::{OpEntry, OpLog, OpLogBackend, OpRecord};
use repo::Repository;
use serde::Serialize;
use super::{advice::RecoveryAdvice, command_runtime_contract};
use crate::cli::{
Cli, JsonOutputMode, WatchArgs, json_output_mode_for_kind,
style::{accent, change_id as style_change_id, confidence as style_confidence, dim, warn},
};
const DEFAULT_POLL_INTERVAL_MS: u64 = 200;
const INTENT_DISPLAY_WIDTH: usize = 50;
const MAX_TAIL_WINDOW: usize = 100_000;
fn valid_filter_kinds() -> Vec<&'static str> {
let mut kinds = OpRecord::verbs(true);
kinds.push("merge");
kinds
}
pub async fn cmd_watch(cli: &Cli, args: WatchArgs) -> Result<()> {
let repo = cli.open_repo().context("opening repository for watch")?;
let heddle_dir = repo.heddle_dir().to_path_buf();
let oplog_path = oplog_file_path(&heddle_dir);
if !oplog_path.parent().is_some_and(Path::is_dir) {
let path = oplog_path
.parent()
.map(Path::display)
.map_or_else(|| "<unknown>".to_string(), |display| display.to_string());
return Err(anyhow!(RecoveryAdvice::invalid_usage(
"watch_oplog_missing",
format!("oplog directory missing at {path}; run `heddle init` first"),
"Run `heddle init` in this repository before watching oplog events.",
"heddle init",
)));
}
let json_mode = json_mode(cli, &args);
let filter = parse_filter(args.filter.as_deref())?;
let since_cutoff = match args.since.as_deref() {
Some(spec) => Some(parse_since(spec)?),
None => None,
};
let renderer = Renderer {
json: json_mode,
filter,
};
let mut watermark: u64 = 0;
if let Some(cutoff) = since_cutoff {
let entries = drain_pending(&heddle_dir, watermark, &repo, Some(cutoff), MAX_TAIL_WINDOW)?;
for emitted in &entries {
renderer.emit(emitted);
}
if let Some(last) = entries.iter().map(|e| e.entry.id).max() {
watermark = last;
}
}
if watermark == 0 {
let log = OpLog::new_unattributed(&heddle_dir);
if let Some(last) = log.last().context("reading oplog head")? {
watermark = last.id;
}
}
let stop_flag = install_sigint_handler();
let poll_interval =
Duration::from_millis(args.poll_interval_ms.unwrap_or(DEFAULT_POLL_INTERVAL_MS));
tail_loop(
&heddle_dir,
&oplog_path,
&repo,
&renderer,
watermark,
stop_flag,
poll_interval,
args.max_iterations,
)?;
Ok(())
}
fn oplog_file_path(heddle_dir: &Path) -> PathBuf {
heddle_dir.join("oplog").join("oplog.bin")
}
fn json_mode(cli: &Cli, _args: &WatchArgs) -> bool {
let contract =
command_runtime_contract("watch").expect("watch command contract should be registered");
matches!(
json_output_mode_for_kind(cli, None, contract.json_kind),
JsonOutputMode::Jsonl
)
}
fn parse_filter(spec: Option<&str>) -> Result<Option<Vec<String>>> {
let Some(raw) = spec else {
return Ok(None);
};
let kinds: Vec<String> = raw
.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.map(str::to_string)
.collect();
if kinds.is_empty() {
return Ok(None);
}
let valid = valid_filter_kinds();
for kind in &kinds {
if !valid.contains(&kind.as_str()) {
return Err(anyhow!(RecoveryAdvice::invalid_usage(
"watch_filter_invalid",
format!(
"unknown event kind in --filter: {kind:?} (valid: {})",
valid.join(", ")
),
"Use one of the valid watch event kinds, or omit `--filter`.",
"heddle watch --filter snapshot",
)));
}
}
Ok(Some(kinds))
}
fn parse_since(spec: &str) -> Result<DateTime<Utc>> {
let trimmed = spec.trim();
if trimmed.is_empty() {
return Err(anyhow!(RecoveryAdvice::invalid_usage(
"watch_since_empty",
"--since cannot be empty",
"Use a duration like `30s`, `5m`, `1h`, or `2d`, or omit `--since`.",
"heddle watch --since 5m",
)));
}
let (num_part, unit) = trimmed.split_at(
trimmed
.find(|c: char| !c.is_ascii_digit())
.unwrap_or(trimmed.len()),
);
let n: i64 = num_part
.parse()
.with_context(|| format!("--since: expected leading digits in {spec:?}"))?;
let delta = match unit {
"s" | "" => ChronoDuration::seconds(n),
"m" => ChronoDuration::minutes(n),
"h" => ChronoDuration::hours(n),
"d" => ChronoDuration::days(n),
other => {
return Err(anyhow!(
"--since: unknown unit {other:?} (use s, m, h, or d)"
));
}
};
Ok(Utc::now() - delta)
}
fn install_sigint_handler() -> Arc<AtomicBool> {
let stop = Arc::new(AtomicBool::new(false));
let stop_for_handler = Arc::clone(&stop);
drop(tokio::spawn(async move {
if tokio::signal::ctrl_c().await.is_ok() {
stop_for_handler.store(true, Ordering::SeqCst);
}
}));
stop
}
#[allow(clippy::too_many_arguments)]
fn tail_loop(
heddle_dir: &Path,
oplog_path: &Path,
repo: &Repository,
renderer: &Renderer,
mut watermark: u64,
stop_flag: Arc<AtomicBool>,
poll_interval: Duration,
max_iterations: Option<usize>,
) -> Result<()> {
let (tx, rx) = mpsc::channel();
let watch_target = if oplog_path.exists() {
oplog_path.to_path_buf()
} else {
oplog_path
.parent()
.map(Path::to_path_buf)
.unwrap_or_else(|| heddle_dir.to_path_buf())
};
let mut watcher: RecommendedWatcher = RecommendedWatcher::new(
move |event| {
let _ = tx.send(event);
},
NotifyConfig::default(),
)
.context("constructing notify watcher")?;
watcher
.watch(&watch_target, RecursiveMode::NonRecursive)
.with_context(|| format!("watching {}", watch_target.display()))?;
let mut iterations = 0usize;
loop {
if stop_flag.load(Ordering::SeqCst) {
break;
}
match rx.recv_timeout(poll_interval) {
Ok(Ok(event)) => {
if !is_relevant_event(&event.kind) {
continue;
}
while let Ok(_extra) = rx.try_recv() {}
if drain_and_emit_pending(heddle_dir, repo, renderer, &mut watermark)? {
iterations += 1;
if max_iterations.is_some_and(|limit| iterations >= limit) {
break;
}
}
}
Ok(Err(_err)) => {
}
Err(RecvTimeoutError::Timeout) => {
if drain_and_emit_pending(heddle_dir, repo, renderer, &mut watermark)? {
iterations += 1;
if max_iterations.is_some_and(|limit| iterations >= limit) {
break;
}
}
}
Err(RecvTimeoutError::Disconnected) => break,
}
}
Ok(())
}
fn drain_and_emit_pending(
heddle_dir: &Path,
repo: &Repository,
renderer: &Renderer,
watermark: &mut u64,
) -> Result<bool> {
let entries = drain_pending(heddle_dir, *watermark, repo, None, MAX_TAIL_WINDOW)?;
for emitted in &entries {
renderer.emit(emitted);
}
if let Some(last) = entries.iter().map(|e| e.entry.id).max() {
*watermark = last;
}
Ok(!entries.is_empty())
}
fn is_relevant_event(kind: &EventKind) -> bool {
matches!(
kind,
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
)
}
fn drain_pending(
heddle_dir: &Path,
watermark: u64,
repo: &Repository,
since_cutoff: Option<DateTime<Utc>>,
window: usize,
) -> Result<Vec<EmittedEntry>> {
let log = OpLog::new_unattributed(heddle_dir);
let mut recent = log.recent(window).context("reading recent oplog entries")?;
recent.reverse();
let mut out = Vec::new();
for entry in recent {
if entry.id <= watermark {
continue;
}
if let Some(cutoff) = since_cutoff
&& entry.timestamp < cutoff
{
continue;
}
out.push(annotate_entry(entry, repo));
}
Ok(out)
}
fn annotate_entry(entry: OpEntry, repo: &Repository) -> EmittedEntry {
let kind = kind_for(&entry.operation);
let thread = thread_for(&entry.operation, &kind);
let change = primary_change_id(&entry.operation);
let (intent, confidence, actor) = match &change {
Some(id) => state_lookup(repo, id),
None => (None, None, None),
};
EmittedEntry {
entry,
kind,
thread,
change_id: change,
intent,
confidence,
actor,
}
}
fn state_lookup(
repo: &Repository,
change_id: &ChangeId,
) -> (Option<String>, Option<f32>, Option<ActorInfo>) {
let Ok(Some(state)) = repo.store().get_state(change_id) else {
return (None, None, None);
};
let actor = state.attribution.agent.as_ref().map(|agent| ActorInfo {
provider: agent.provider.clone(),
model: agent.model.clone(),
});
(state.intent.clone(), state.confidence, actor)
}
fn kind_for(op: &OpRecord) -> String {
op.verb().to_string()
}
fn thread_for(op: &OpRecord, _kind: &str) -> Option<String> {
match op {
OpRecord::Snapshot { thread, .. } => thread.clone(),
OpRecord::ThreadCreate { name, .. } => Some(name.clone()),
OpRecord::ThreadDelete { name, .. } => Some(name.clone()),
OpRecord::ThreadUpdate { name, .. } => Some(name.clone()),
OpRecord::MarkerCreate { name, .. } => Some(name.clone()),
OpRecord::MarkerDelete { name, .. } => Some(name.clone()),
OpRecord::Checkpoint { thread, .. } => thread.clone(),
OpRecord::EphemeralThreadCollapse { thread, .. } => Some(thread.clone()),
OpRecord::FastForward { target_thread, .. } => Some(target_thread.clone()),
OpRecord::GitCheckpoint { branch, .. } => Some(branch.clone()),
OpRecord::RemoteThreadUpdate { thread, .. }
| OpRecord::RemoteThreadDelete { thread, .. } => Some(thread.clone()),
OpRecord::Goto { .. }
| OpRecord::Fork { .. }
| OpRecord::Collapse { .. }
| OpRecord::TransactionAbort { .. }
| OpRecord::TransactionCommit { .. }
| OpRecord::ConflictResolved { .. }
| OpRecord::Redact { .. }
| OpRecord::UndoRecoveryUpdate { .. }
| OpRecord::StateVisibilitySet { .. }
| OpRecord::StateVisibilityPromote { .. }
| OpRecord::Purge { .. } => None,
}
}
fn primary_change_id(op: &OpRecord) -> Option<ChangeId> {
match op {
OpRecord::Snapshot { new_state, .. } => Some(*new_state),
OpRecord::Goto { target, .. } => Some(*target),
OpRecord::ThreadCreate { state, .. } => Some(*state),
OpRecord::ThreadDelete { state, .. } => Some(*state),
OpRecord::ThreadUpdate { new_state, .. } => Some(*new_state),
OpRecord::Fork { new_state, .. } => Some(*new_state),
OpRecord::Collapse { result, .. } => Some(*result),
OpRecord::MarkerCreate { state, .. } => Some(*state),
OpRecord::MarkerDelete { state, .. } => Some(*state),
OpRecord::Checkpoint { state, .. } => Some(*state),
OpRecord::GitCheckpoint { state, .. } => Some(*state),
OpRecord::EphemeralThreadCollapse { final_state, .. } => Some(*final_state),
OpRecord::Redact { state, .. } => Some(*state),
OpRecord::StateVisibilitySet { state, .. }
| OpRecord::StateVisibilityPromote { state, .. } => Some(*state),
OpRecord::RemoteThreadUpdate { state, .. } | OpRecord::RemoteThreadDelete { state, .. } => {
Some(*state)
}
OpRecord::UndoRecoveryUpdate { state } => Some(*state),
OpRecord::TransactionAbort { .. }
| OpRecord::TransactionCommit { .. }
| OpRecord::ConflictResolved { .. }
| OpRecord::Purge { .. }
| OpRecord::FastForward { .. } => None,
}
}
#[derive(Clone, Debug)]
struct EmittedEntry {
entry: OpEntry,
kind: String,
thread: Option<String>,
change_id: Option<ChangeId>,
intent: Option<String>,
confidence: Option<f32>,
actor: Option<ActorInfo>,
}
#[derive(Clone, Debug, Serialize)]
struct ActorInfo {
provider: String,
model: String,
}
#[derive(Serialize)]
struct WatchLineJson<'a> {
ts: String,
thread: Option<&'a str>,
kind: &'a str,
change_id: Option<String>,
intent: Option<&'a str>,
confidence: Option<f32>,
actor: Option<&'a ActorInfo>,
id: u64,
}
struct Renderer {
json: bool,
filter: Option<Vec<String>>,
}
impl Renderer {
fn emit(&self, entry: &EmittedEntry) {
if !self.passes_filter(entry) {
return;
}
let line = if self.json {
self.render_json(entry)
} else {
self.render_text(entry)
};
println!("{line}");
}
fn passes_filter(&self, entry: &EmittedEntry) -> bool {
let Some(allowed) = &self.filter else {
return true;
};
let kind = entry.kind.as_str();
allowed
.iter()
.any(|k| k == kind || (k == "merge" && kind == "thread_update"))
}
fn render_json(&self, e: &EmittedEntry) -> String {
let view = WatchLineJson {
ts: e.entry.timestamp.to_rfc3339_opts(SecondsFormat::Secs, true),
thread: e.thread.as_deref(),
kind: e.kind.as_str(),
change_id: e.change_id.as_ref().map(|id| id.to_string_full()),
intent: e.intent.as_deref(),
confidence: e.confidence,
actor: e.actor.as_ref(),
id: e.entry.id,
};
serde_json::to_string(&view).unwrap_or_else(|err| {
format!("{{\"error\":\"json render failed: {err}\"}}")
})
}
fn render_text(&self, e: &EmittedEntry) -> String {
let ts = e.entry.timestamp.format("%H:%M:%S");
let thread = truncate(e.thread.as_deref().unwrap_or("-"), 28);
let kind_plain = e.kind.as_str();
let kind_styled = style_kind(kind_plain);
let kind_pad = pad_right(kind_plain, 15);
let kind_field = kind_styled + &" ".repeat(kind_pad.len() - kind_plain.len());
let change = e
.change_id
.map(|id| id.short())
.unwrap_or_else(|| "-".to_string());
let change_field = pad_right(&style_change_id(&change), 15 + ansi_overhead(&change));
let intent = truncate(e.intent.as_deref().unwrap_or(""), INTENT_DISPLAY_WIDTH);
let intent_padded = pad_right(&intent, INTENT_DISPLAY_WIDTH);
let confidence_field = match e.confidence {
Some(c) => style_confidence(Some(c), &format!("conf={c:.2}")),
None => String::new(),
};
format!(
"{ts} {thread:<28} {kind_field} {change_field} {intent_padded} {confidence_field}",
ts = dim(&ts.to_string()),
)
.trim_end()
.to_string()
}
}
fn style_kind(kind: &str) -> String {
match kind {
"snapshot" | "thread_update" => accent(kind),
"thread_create" | "marker_create" | "fork" | "collapse" | "goto" => dim(kind),
"thread_delete" | "marker_delete" => warn(kind),
_ => kind.to_string(),
}
}
fn truncate(s: &str, max: usize) -> String {
if s.chars().count() <= max {
return s.to_string();
}
if max == 0 {
return String::new();
}
let mut out: String = s.chars().take(max - 1).collect();
out.push('…');
out
}
fn pad_right(s: &str, width: usize) -> String {
let visible = visible_width(s);
if visible >= width {
return s.to_string();
}
format!("{s}{}", " ".repeat(width - visible))
}
fn visible_width(s: &str) -> usize {
let mut count = 0usize;
let mut in_escape = false;
for ch in s.chars() {
if in_escape {
if ch == 'm' {
in_escape = false;
}
continue;
}
if ch == '\x1b' {
in_escape = true;
continue;
}
count += 1;
}
count
}
fn ansi_overhead(plain: &str) -> usize {
let styled = style_change_id(plain);
styled.len() - plain.len()
}
#[cfg(test)]
mod tests {
use chrono::TimeZone;
use objects::object::ChangeId;
use oplog::{OpEntry, OpRecord};
use tempfile::TempDir;
use super::*;
fn make_entry(id: u64, op: OpRecord) -> OpEntry {
OpEntry {
id,
timestamp: Utc.with_ymd_and_hms(2026, 5, 2, 22, 43, 8).unwrap(),
operation: op,
undone: false,
batch_id: id,
batch_index: 0,
scope: None,
actor: std::sync::Arc::new(objects::object::Principal::new("Test", "test@example.com")),
operation_id: None,
}
}
fn write_entries(heddle_dir: &Path, entries: Vec<OpRecord>) -> Vec<u64> {
let log = OpLog::new_unattributed(heddle_dir);
log.init().expect("init oplog");
log.record_batch(entries).expect("record batch")
}
fn synthetic_repo() -> (TempDir, PathBuf) {
let tmp = TempDir::new().unwrap();
let heddle = tmp.path().join(".heddle");
std::fs::create_dir_all(&heddle).unwrap();
let log = OpLog::new_unattributed(&heddle);
log.init().unwrap();
let path = heddle.clone();
(tmp, path)
}
#[test]
fn parse_since_accepts_common_units() {
let now = Utc::now();
let cases = [("30s", 30), ("5m", 5 * 60), ("2h", 2 * 60 * 60)];
for (spec, secs) in cases {
let cutoff = parse_since(spec).unwrap();
let delta = (now - cutoff).num_seconds();
assert!((delta - secs).abs() <= 2, "spec {spec}: delta={delta}");
}
}
#[test]
fn parse_since_rejects_unknown_unit() {
assert!(parse_since("5x").is_err());
assert!(parse_since("").is_err());
}
#[test]
fn parse_filter_validates_kinds() {
assert!(parse_filter(None).unwrap().is_none());
assert!(parse_filter(Some("")).unwrap().is_none());
let parsed = parse_filter(Some("snapshot,merge")).unwrap().unwrap();
assert_eq!(parsed, vec!["snapshot", "merge"]);
assert!(parse_filter(Some("not_a_real_kind")).is_err());
}
#[test]
fn filter_accepts_newer_emitted_kinds() {
for kind in [
"remote_thread_update",
"remote_thread_delete",
"transaction_commit",
"redact",
"purge",
"git_checkpoint",
"undo_recovery_update",
] {
assert!(
parse_filter(Some(kind)).is_ok(),
"filter kind {kind:?} must be accepted (it is a real emitted kind)"
);
}
}
#[test]
fn drain_pending_emits_only_above_watermark() {
let (_tmp, heddle_dir) = synthetic_repo();
let cid_a = ChangeId::generate();
let cid_b = ChangeId::generate();
let ids = write_entries(
&heddle_dir,
vec![
OpRecord::Snapshot {
new_state: cid_a,
prev_head: None,
head: None,
thread: Some("modulo-race".into()),
},
OpRecord::ThreadCreate {
name: "approach-anthropic".into(),
state: cid_b,
manager_snapshot: None,
},
],
);
assert_eq!(ids.len(), 2);
let log = OpLog::new_unattributed(&heddle_dir);
let entries = log.recent(usize::MAX).unwrap();
let seen_ids: Vec<u64> = entries.iter().map(|e| e.id).collect();
assert_eq!(seen_ids, vec![ids[1], ids[0]]);
let above_first: Vec<_> = entries.iter().rev().filter(|e| e.id > ids[0]).collect();
assert_eq!(above_first.len(), 1);
assert_eq!(above_first[0].id, ids[1]);
}
#[test]
fn renderer_filter_passes_only_matching_kinds() {
let renderer = Renderer {
json: true,
filter: Some(vec!["snapshot".into()]),
};
let snap_id = ChangeId::generate();
let snap = EmittedEntry {
entry: make_entry(
1,
OpRecord::Snapshot {
new_state: snap_id,
prev_head: None,
head: Some(snap_id),
thread: None,
},
),
kind: "snapshot".into(),
thread: None,
change_id: None,
intent: None,
confidence: None,
actor: None,
};
let create = EmittedEntry {
entry: make_entry(
2,
OpRecord::ThreadCreate {
name: "x".into(),
state: ChangeId::generate(),
manager_snapshot: None,
},
),
kind: "thread_create".into(),
thread: Some("x".into()),
change_id: None,
intent: None,
confidence: None,
actor: None,
};
assert!(renderer.passes_filter(&snap));
assert!(!renderer.passes_filter(&create));
}
#[test]
fn render_json_round_trips() {
let renderer = Renderer {
json: true,
filter: None,
};
let cid = ChangeId::generate();
let entry = EmittedEntry {
entry: make_entry(
7,
OpRecord::Snapshot {
new_state: cid,
prev_head: None,
head: None,
thread: Some("modulo-race/approach-anthropic".into()),
},
),
kind: "snapshot".into(),
thread: Some("modulo-race/approach-anthropic".into()),
change_id: Some(cid),
intent: Some("feat(modulo): error-returning impl".into()),
confidence: Some(0.92),
actor: Some(ActorInfo {
provider: "anthropic".into(),
model: "claude-sonnet-4-5".into(),
}),
};
let line = renderer.render_json(&entry);
let value: serde_json::Value = serde_json::from_str(&line).unwrap();
assert_eq!(value["kind"], "snapshot");
assert_eq!(value["thread"], "modulo-race/approach-anthropic");
assert_eq!(value["confidence"], 0.92);
assert_eq!(value["actor"]["provider"], "anthropic");
assert_eq!(value["id"], 7);
assert!(value["change_id"].is_string());
assert!(value["ts"].as_str().unwrap().ends_with('Z'));
}
#[test]
fn truncate_handles_ascii_and_unicode() {
assert_eq!(truncate("short", 10), "short");
assert_eq!(truncate("aaaaaaaaaa", 5), "aaaa…");
assert_eq!(truncate("résumé café", 6), "résum…");
}
#[test]
fn render_text_columns_preserve_widths() {
let renderer = Renderer {
json: false,
filter: None,
};
let cid = ChangeId::generate();
let entry = EmittedEntry {
entry: make_entry(
1,
OpRecord::Snapshot {
new_state: cid,
prev_head: None,
head: None,
thread: Some("modulo-race/approach-anthropic".into()),
},
),
kind: "snapshot".into(),
thread: Some("modulo-race/approach-anthropic".into()),
change_id: Some(cid),
intent: Some("feat(modulo): error-returning impl".into()),
confidence: Some(0.92),
actor: None,
};
let line = renderer.render_text(&entry);
assert!(line.contains("22:43:08"), "missing timestamp: {line}");
assert!(line.contains("snapshot"), "missing kind: {line}");
assert!(line.contains("conf=0.92"), "missing conf: {line}");
let visible = visible_width(&line);
assert!(visible >= 80, "line too short: {visible}");
}
#[test]
fn primary_change_id_covers_all_variants() {
let cid = ChangeId::generate();
for op in [
OpRecord::Snapshot {
new_state: cid,
prev_head: None,
head: Some(cid),
thread: None,
},
OpRecord::Goto {
target: cid,
prev_head: None,
head: cid,
},
OpRecord::ThreadCreate {
name: "x".into(),
state: cid,
manager_snapshot: None,
},
OpRecord::ThreadDelete {
name: "x".into(),
state: cid,
},
OpRecord::ThreadUpdate {
name: "x".into(),
old_state: cid,
new_state: cid,
manager_snapshots: None,
},
OpRecord::Fork {
from: cid,
new_state: cid,
thread: None,
head: None,
},
OpRecord::Collapse {
sources: vec![cid],
result: cid,
thread: None,
pre_thread_state: None,
},
OpRecord::MarkerCreate {
name: "m".into(),
state: cid,
},
OpRecord::MarkerDelete {
name: "m".into(),
state: cid,
},
] {
assert!(primary_change_id(&op).is_some());
}
}
}