Skip to main content

agent_trace/runtime/
change_processor.rs

1use crate::config::MergedConfig;
2use crate::git_store::{CommitInfo, GitStore};
3use crate::manifest::Manifest;
4use crate::permissions::{check_permission, Overrides, PermissionResult, Violation};
5use crate::session;
6use crate::trace::pipeline::apply_trace_hooks;
7use crate::types::{Action, Actor, DocType, FileChange, LogEntry};
8use anyhow::Result;
9use chrono::Utc;
10use git2::Oid;
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Mutex};
13
14pub use crate::runtime::session::AgentState;
15
16// ── UI Event channel ──────────────────────────────────────────────────────────
17
18#[derive(Debug, Clone)]
19pub enum UiEvent {
20    NewCommit(LogEntry),
21    Violation(String),
22}
23
24fn is_pid_alive(pid: u32) -> bool {
25    #[cfg(unix)]
26    {
27        // kill(pid, 0) returns 0 if process exists.
28        let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
29        result == 0
30    }
31    #[cfg(not(unix))]
32    {
33        let _ = pid;
34        false
35    }
36}
37
38// ── Change Processor ──────────────────────────────────────────────────────────
39
40pub struct ChangeProcessor {
41    pub git: GitStore,
42    pub manifest: Arc<Mutex<Manifest>>,
43    pub agent_state: AgentState,
44    pub ui_tx: Option<tokio::sync::mpsc::Sender<UiEvent>>,
45    /// Session ID used for agent log file naming.
46    session_id: String,
47    /// Last HEAD OID seen by the poll loop (dedup with external commits).
48    last_seen_oid: Oid,
49}
50
51impl ChangeProcessor {
52    pub fn new(
53        git: GitStore,
54        manifest: Arc<Mutex<Manifest>>,
55        _config: MergedConfig,
56        agent_state: AgentState,
57        ui_tx: Option<tokio::sync::mpsc::Sender<UiEvent>>,
58    ) -> Self {
59        let session_id =
60            session::session_id_for_store(&git.workdir).unwrap_or_else(session::new_session_id);
61        let last_seen_oid = git.head_oid().unwrap_or_else(|_| Oid::zero());
62        Self {
63            git,
64            manifest,
65            agent_state,
66            ui_tx,
67            session_id,
68            last_seen_oid,
69        }
70    }
71
72    fn refresh_session_id(&mut self) {
73        if let Some(sid) = session::session_id_for_store(&self.git.workdir) {
74            self.session_id = sid;
75        }
76    }
77
78    fn reload_manifest_from_disk(&self) -> Result<()> {
79        let loaded = Manifest::load(&self.git.workdir)?;
80        *self.manifest.lock().unwrap() = loaded;
81        Ok(())
82    }
83
84    fn poll_external_commits(&mut self) -> Result<()> {
85        let current_head = self.git.head_oid()?;
86        if current_head == self.last_seen_oid {
87            return Ok(());
88        }
89
90        let new_commits = self.git.commits_since(self.last_seen_oid)?;
91        if !new_commits.is_empty() {
92            // Violation-only commits do not touch manifest.toml; reloading would
93            // clobber in-memory registrations with a stale on-disk manifest.
94            let has_non_violation = new_commits
95                .iter()
96                .any(|e| !matches!(e.action, Action::Violation));
97            if has_non_violation {
98                if let Err(e) = self.reload_manifest_from_disk() {
99                    tracing::warn!("Failed to reload manifest after external commit: {}", e);
100                }
101            }
102            if let Some(tx) = &self.ui_tx {
103                for entry in new_commits {
104                    let _ = tx.try_send(UiEvent::NewCommit(entry));
105                }
106            }
107        }
108
109        self.last_seen_oid = current_head;
110        Ok(())
111    }
112
113    pub fn run_poll_cycle(&mut self) -> Result<()> {
114        self.refresh_session_id();
115
116        // Reload the manifest from disk at the start of each cycle. Disk is the
117        // source of truth: out-of-process writers (MCP/CLI) persist the manifest
118        // before committing, so reloading here keeps the poll loop's in-memory
119        // view consistent and avoids drift.
120        if let Err(e) = self.reload_manifest_from_disk() {
121            tracing::warn!("Failed to reload manifest at poll cycle start: {e}");
122        }
123
124        let store_root = self.git.workdir.clone();
125        let changes = self.git.detect_changes()?;
126        let mut own_commit_oid: Option<Oid> = None;
127
128        if changes.is_empty() {
129            return self.poll_external_commits();
130        }
131
132        let actor = self.agent_state.current_actor(&store_root);
133        let overrides = Overrides::load(&store_root).unwrap_or_default();
134
135        let mut allowed: Vec<(PathBuf, Action, DocType)> = Vec::new();
136        let mut violations: Vec<Violation> = Vec::new();
137
138        let mut manifest = self.manifest.lock().unwrap();
139
140        for change in &changes {
141            let path = change.path().clone();
142            let action = change.action();
143
144            // Determine doc type from manifest, default to Scratch for new files.
145            let doc_type = manifest
146                .find_by_path(&path)
147                .map(|d| d.doc_type.clone())
148                .unwrap_or(DocType::Scratch);
149
150            let perm = check_permission(&doc_type, &actor, &overrides, Some(&path));
151
152            match perm {
153                PermissionResult::Allowed => {
154                    // Poll-detected files are committed to git and recorded as
155                    // activity, but NOT auto-registered in the manifest: shell
156                    // edits to source files (e.g. worker.py) must not appear in
157                    // the curated document tree. New files are committed with an
158                    // ephemeral Scratch doc_type used only for the permission
159                    // check above.
160                    //
161                    // Keep manifest path metadata consistent across renames of
162                    // already-tracked documents.
163                    if let FileChange::Renamed { from, to } = change {
164                        if manifest.is_tracked(from) {
165                            let _ = manifest.update_path(from, to);
166                        }
167                    }
168                    allowed.push((path, action, doc_type));
169                }
170                PermissionResult::Denied { reason } => {
171                    // Revert the file.
172                    if let Err(e) = self.git.revert_file(&path) {
173                        tracing::warn!("Failed to revert {}: {}", path.display(), e);
174                    }
175
176                    // Save rejected content.
177                    let full = store_root.join(&path);
178                    if let Ok(content) = std::fs::read_to_string(&full) {
179                        let _ = self.git.save_rejected(&path, &content);
180                    }
181
182                    violations.push(Violation {
183                        timestamp: Utc::now(),
184                        doc_path: path.clone(),
185                        actor: actor.clone(),
186                        agent_name: actor.agent_name().map(String::from),
187                        attempted_action: action,
188                        reason: reason.clone(),
189                    });
190
191                    // Send UI warning.
192                    if let Some(tx) = &self.ui_tx {
193                        let msg = format!(
194                            "Permission denied: {} tried to modify {} ({})",
195                            actor,
196                            path.display(),
197                            reason
198                        );
199                        let _ = tx.try_send(UiEvent::Violation(msg));
200                    }
201                }
202                PermissionResult::RequiresConfirmation { .. } => {
203                    // In headless/poll mode, require confirmation → treat as allowed for user.
204                    allowed.push((path, action, doc_type));
205                }
206            }
207        }
208
209        // Commit violations.
210        for v in &violations {
211            let info = CommitInfo {
212                action: Action::Violation,
213                files: vec![(
214                    v.doc_path.clone(),
215                    v.attempted_action.clone(),
216                    manifest
217                        .find_by_path(&v.doc_path)
218                        .map(|d| d.doc_type.clone())
219                        .unwrap_or(DocType::Scratch),
220                )],
221                actor: Actor::System,
222                summary: format!(
223                    "violation: {} attempted {} on {} — {}",
224                    v.actor,
225                    v.attempted_action,
226                    v.doc_path.display(),
227                    v.reason
228                ),
229                agent_name: v.agent_name.clone(),
230                session_id: None,
231            };
232            if let Err(e) = self.git.commit(&info) {
233                tracing::error!(
234                    "Failed to commit violation record for {:?}: {:#}",
235                    v.doc_path,
236                    e
237                );
238            }
239        }
240
241        // Batch commit allowed changes.
242        if !allowed.is_empty() {
243            // WS-A: gate synthesis before committing. A degraded backend (with
244            // no escape hatch) must not produce commits whose trace hooks would
245            // emit degraded artifacts, so check the gate before the git commit
246            // rather than warning after the fact.
247            if let Err(e) = crate::runtime::require_synthesis_backend(Some(&store_root)) {
248                tracing::warn!(
249                    "Skipping poll commit of {} file(s) — synthesis backend unavailable: {e}",
250                    allowed.len()
251                );
252                drop(manifest);
253                return self.poll_external_commits();
254            }
255            let info = CommitInfo {
256                action: allowed[0].1.clone(),
257                files: allowed.clone(),
258                actor: actor.clone(),
259                summary: format!("{} {} file(s)", allowed[0].1, allowed.len()),
260                agent_name: actor.agent_name().map(String::from),
261                session_id: Some(self.session_id.clone()),
262            };
263            match self.git.commit(&info) {
264                Ok(oid) => {
265                    own_commit_oid = Some(oid);
266                    // Persist the manifest now that git is consistent.
267                    let _ = manifest.save(&store_root);
268                    let entry = LogEntry {
269                        commit_id: crate::types::CommitId(oid.to_string()),
270                        timestamp: Utc::now(),
271                        action: info.action,
272                        actor: actor.clone(),
273                        agent_name: actor.agent_name().map(String::from),
274                        files: info.files.clone(),
275                        summary: info.summary,
276                    };
277                    if let Some(tx) = &self.ui_tx {
278                        let _ = tx.try_send(UiEvent::NewCommit(entry));
279                    }
280
281                    if let Err(e) = apply_trace_hooks(
282                        &store_root,
283                        &self.git,
284                        &manifest,
285                        &actor,
286                        Some(&self.session_id),
287                        &allowed,
288                        "poll",
289                    ) {
290                        tracing::warn!("post-write trace hooks failed: {}", e);
291                    }
292                }
293                Err(e) => {
294                    // No poll-time manifest registrations to roll back; just
295                    // skip persisting and fall through to HEAD polling.
296                    tracing::warn!("Poll batch commit failed: {}", e);
297                    drop(manifest);
298                    return self.poll_external_commits();
299                }
300            }
301        }
302
303        drop(manifest);
304
305        // Sync to HEAD after poll-cycle commits (batch + trace hooks) so HEAD poll
306        // does not re-emit commits from this cycle.
307        if own_commit_oid.is_some() {
308            if let Ok(head) = self.git.head_oid() {
309                self.last_seen_oid = head;
310            }
311        }
312        self.poll_external_commits()
313    }
314}
315
316// ── Instance Locking ─────────────────────────────────────────────────────────
317
318pub struct InstanceLock {
319    path: PathBuf,
320}
321
322impl InstanceLock {
323    pub fn acquire(store_root: &Path) -> Result<Self> {
324        let path = store_root
325            .join(".agent-trace")
326            .join("locks")
327            .join("instance.lock");
328        std::fs::create_dir_all(path.parent().unwrap())?;
329
330        // Check for stale lock.
331        if path.exists() {
332            if let Ok(content) = std::fs::read_to_string(&path) {
333                if let Ok(pid) = content.trim().parse::<u32>() {
334                    if is_pid_alive(pid) {
335                        anyhow::bail!(
336                            "Another agent-trace instance is running (PID {pid}). \
337                             Opening in read-only mode."
338                        );
339                    }
340                }
341            }
342        }
343
344        let pid = std::process::id();
345        std::fs::write(&path, pid.to_string())?;
346        Ok(Self { path })
347    }
348}
349
350impl Drop for InstanceLock {
351    fn drop(&mut self) {
352        let _ = std::fs::remove_file(&self.path);
353    }
354}
355
// Unit tests for the poll loop, HEAD polling, agent attribution and the
// instance lock. Each test works in its own temporary store.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{GlobalConfig, MergedConfig, PollingConfig, StoreConfig, StoreInfo};
    use crate::session::AgentState;
    use tempfile::TempDir;

    // Builds a fresh store under `tmp`. It creates the lock directory, a git
    // store, an empty manifest, and a saved store config (no LLM/synthesis
    // settings, default polling), merged with the default global config.
    fn setup(tmp: &TempDir) -> (GitStore, Arc<Mutex<Manifest>>, MergedConfig) {
        let root = tmp.path();
        std::fs::create_dir_all(root.join(".agent-trace").join("locks")).unwrap();
        let git = GitStore::init(root).unwrap();
        let info = StoreInfo::new("test".into());
        let manifest = Manifest::create_empty(info.clone(), root).unwrap();
        let global = GlobalConfig::default();
        let store_cfg = StoreConfig {
            store: info,
            llm: None,
            synthesis: None,
            polling: PollingConfig::default(),
        };
        store_cfg.save(root).unwrap();
        let config = MergedConfig::merge(global, store_cfg);
        (git, Arc::new(Mutex::new(manifest)), config)
    }

    // A cycle over an untouched store must succeed.
    #[test]
    fn test_poll_no_changes() {
        let tmp = TempDir::new().unwrap();
        let (git, manifest, config) = setup(&tmp);
        let agent = AgentState::new(None);
        let mut proc = ChangeProcessor::new(git, manifest, config, agent, None);
        proc.run_poll_cycle().unwrap(); // should be a no-op
    }

    // New files are committed to git but not registered in the manifest.
    #[test]
    fn test_poll_new_file_committed_not_registered() {
        let tmp = TempDir::new().unwrap();
        let (git, manifest, config) = setup(&tmp);

        // Create a new source file (the kind of file a shell edit would touch).
        std::fs::write(tmp.path().join("task.py"), "print('work')\n").unwrap();

        let agent = AgentState::new(None);
        let mut proc = ChangeProcessor::new(git, manifest.clone(), config, agent, None);
        proc.run_poll_cycle().unwrap();

        // WS-C: the file is NOT auto-registered in the manifest.
        {
            let m = manifest.lock().unwrap();
            assert!(
                !m.is_tracked(&PathBuf::from("task.py")),
                "poll must not auto-register source files in the manifest"
            );
        }

        // ...but it IS committed to git.
        let store = GitStore::open(tmp.path()).unwrap();
        let log = store.log(10).unwrap();
        assert!(
            log.iter().any(|e| e
                .files
                .iter()
                .any(|(p, _, _)| p == &PathBuf::from("task.py"))),
            "poll should still commit the new file to git"
        );
    }

    // Without a lock file or CLI flag, changes are attributed to the user.
    #[test]
    fn test_agent_state_no_lock_no_flag() {
        let tmp = TempDir::new().unwrap();
        let state = AgentState::new(None);
        assert_eq!(state.current_actor(tmp.path()), Actor::User);
    }

    // A CLI-supplied agent name is used as the actor.
    #[test]
    fn test_agent_state_cli_flag() {
        let tmp = TempDir::new().unwrap();
        let state = AgentState::new(Some("aider".into()));
        assert_eq!(
            state.current_actor(tmp.path()),
            Actor::Agent {
                name: "aider".into()
            }
        );
    }

    // An agent lock file (with a far-future heartbeat) names the actor.
    #[test]
    fn test_agent_state_connect_lock_file() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();
        std::fs::create_dir_all(root.join(".agent-trace/locks")).unwrap();
        // Simulate `agent-trace connect my-agent` (no PID field)
        std::fs::write(
            root.join(".agent-trace/locks/agent-lock.toml"),
            "[agent]\nname=\"my-agent\"\nsession_id=\"s1\"\ntransport=\"cli\"\nstarted_at=\"2026-01-01T00:00:00Z\"\nlast_heartbeat=\"2099-01-01T00:00:00Z\"\n",
        ).unwrap();
        let state = AgentState::new(None);
        assert_eq!(
            state.current_actor(root),
            Actor::Agent {
                name: "my-agent".into()
            }
        );
    }

    #[test]
    fn test_agent_state_cli_flag_takes_priority_over_no_lock() {
        let tmp = TempDir::new().unwrap();
        // No lock file, but CLI flag set — should be Agent
        let state = AgentState::new(Some("cli-agent".into()));
        assert_eq!(
            state.current_actor(tmp.path()),
            Actor::Agent {
                name: "cli-agent".into()
            }
        );
    }

    // The instance lock file exists while the guard lives and is removed on drop.
    #[test]
    fn test_instance_lock_created_and_removed() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();
        std::fs::create_dir_all(root.join(".agent-trace").join("locks")).unwrap();
        let lock_path = root
            .join(".agent-trace")
            .join("locks")
            .join("instance.lock");

        {
            let _lock = InstanceLock::acquire(root).unwrap();
            assert!(lock_path.exists());
        }
        assert!(!lock_path.exists());
    }

    // An agent edit to a registered Context document is denied and reverted.
    #[test]
    fn test_poll_agent_denied_context_reverted() {
        let tmp = TempDir::new().unwrap();
        let (git, manifest, config) = setup(&tmp);

        // Create and commit a context.md.
        std::fs::write(tmp.path().join("context.md"), "# Context").unwrap();
        {
            let info = CommitInfo {
                action: Action::Create,
                files: vec![(
                    PathBuf::from("context.md"),
                    Action::Create,
                    DocType::Context,
                )],
                actor: Actor::System,
                summary: "create context".into(),
                agent_name: None,
                session_id: None,
            };
            git.commit(&info).unwrap();
        }
        {
            let mut m = manifest.lock().unwrap();
            m.register(&PathBuf::from("context.md"), DocType::Context, "")
                .unwrap();
            // Persist so the poll cycle's start-of-cycle reload sees it (disk is
            // the source of truth).
            m.save(tmp.path()).unwrap();
        }

        // Agent modifies context.md.
        std::fs::write(tmp.path().join("context.md"), "# Agent tampered").unwrap();

        let agent = AgentState::new(Some("claude-code".into()));
        let mut proc = ChangeProcessor::new(git, manifest.clone(), config, agent, None);
        proc.run_poll_cycle().unwrap();

        // File should be reverted to original.
        let content = std::fs::read_to_string(tmp.path().join("context.md")).unwrap();
        assert_eq!(content, "# Context");
    }

    // A commit made outside the poll loop is forwarded to the UI, and the
    // shared manifest is reloaded from disk.
    #[test]
    fn test_head_poll_detects_external_commit() {
        let tmp = TempDir::new().unwrap();
        let (git, manifest, config) = setup(&tmp);
        let (tx, mut rx) = tokio::sync::mpsc::channel(10);

        let agent = AgentState::new(None);
        let mut proc = ChangeProcessor::new(git, manifest.clone(), config, agent, Some(tx));

        // Simulate MCP write: file + manifest update + commit outside poll loop.
        std::fs::write(tmp.path().join("mcp.md"), "# MCP write").unwrap();
        {
            let mut m = Manifest::load(tmp.path()).unwrap();
            m.register(&PathBuf::from("mcp.md"), DocType::Plan, "claude")
                .unwrap();
            m.save(tmp.path()).unwrap();
        }
        let store = GitStore::open(tmp.path()).unwrap();
        let info = CommitInfo {
            action: Action::Create,
            files: vec![(PathBuf::from("mcp.md"), Action::Create, DocType::Plan)],
            actor: Actor::Agent {
                name: "claude".into(),
            },
            summary: "mcp write: mcp.md".into(),
            agent_name: Some("claude".into()),
            session_id: Some("sess-mcp".into()),
        };
        store.commit(&info).unwrap();

        // In-memory manifest is stale (no mcp.md).
        assert!(
            !manifest
                .lock()
                .unwrap()
                .is_tracked(&PathBuf::from("mcp.md")),
            "pre-poll manifest should not track mcp.md"
        );

        proc.run_poll_cycle().unwrap();

        let event = rx
            .try_recv()
            .expect("expected NewCommit for external commit");
        match event {
            UiEvent::NewCommit(entry) => {
                assert_eq!(entry.agent_name.as_deref(), Some("claude"));
                assert!(entry.summary.contains("mcp"));
            }
            other => panic!("expected NewCommit, got {other:?}"),
        }

        assert!(
            manifest
                .lock()
                .unwrap()
                .is_tracked(&PathBuf::from("mcp.md")),
            "manifest should reload from disk after external commit"
        );
    }

    // The poll loop's own batch commit is reported exactly once.
    #[test]
    fn test_poll_own_commit_not_duplicated_via_head_poll() {
        let tmp = TempDir::new().unwrap();
        let (git, manifest, config) = setup(&tmp);
        let (tx, mut rx) = tokio::sync::mpsc::channel(10);

        std::fs::write(tmp.path().join("notes.md"), "# notes").unwrap();

        let agent = AgentState::new(None);
        let mut proc = ChangeProcessor::new(git, manifest.clone(), config, agent, Some(tx));
        proc.run_poll_cycle().unwrap();

        let first = rx.try_recv().expect("poll commit should emit once");
        assert!(matches!(first, UiEvent::NewCommit(_)));
        assert!(
            rx.try_recv().is_err(),
            "HEAD poll should not duplicate poll-loop commit"
        );
    }

    // The batch commit carries the session ID read from the agent lock file.
    // This relies on the processor refreshing the session ID at cycle start.
    #[test]
    fn test_session_id_reread_from_lock() {
        let tmp = TempDir::new().unwrap();
        let (git, manifest, config) = setup(&tmp);

        std::fs::write(
            tmp.path().join(".agent-trace/locks/agent-lock.toml"),
            "[agent]\nname=\"my-agent\"\nsession_id=\"lock-sid-42\"\ntransport=\"mcp\"\nstarted_at=\"2099-01-01T00:00:00Z\"\nlast_heartbeat=\"2099-01-01T00:00:00Z\"\n",
        )
        .unwrap();

        std::fs::write(tmp.path().join("scratch.md"), "# scratch").unwrap();

        let agent = AgentState::new(None);
        let mut proc = ChangeProcessor::new(git, manifest.clone(), config, agent, None);
        proc.run_poll_cycle().unwrap();

        let store = GitStore::open(tmp.path()).unwrap();
        let all = store.log(10).unwrap();
        let scratch_entry = all
            .iter()
            .find(|e| {
                e.files
                    .iter()
                    .any(|(p, _, _)| p == &PathBuf::from("scratch.md"))
            })
            .expect("poll commit for scratch.md expected");
        let commit_id = scratch_entry.commit_id.0.clone();
        let repo_path = tmp.path().join(".agent-trace/repo");
        let repo = git2::Repository::open(&repo_path).unwrap();
        let commit = repo
            .find_commit(git2::Oid::from_str(&commit_id).expect("valid commit oid"))
            .unwrap();
        let msg = commit.message().unwrap_or("");
        assert!(
            msg.contains("lock-sid-42"),
            "commit should use session_id from lock file, got:\n{msg}"
        );
    }

    // The HEAD-poll path runs even when there are no working-tree changes.
    #[test]
    fn test_head_poll_no_changes_still_runs() {
        let tmp = TempDir::new().unwrap();
        let (git, manifest, config) = setup(&tmp);
        let agent = AgentState::new(None);
        let mut proc = ChangeProcessor::new(git, manifest, config, agent, None);
        proc.run_poll_cycle().unwrap(); // no file changes, should not error
    }
}