1use crate::config::MergedConfig;
2use crate::git_store::{CommitInfo, GitStore};
3use crate::manifest::Manifest;
4use crate::permissions::{check_permission, Overrides, PermissionResult, Violation};
5use crate::session;
6use crate::trace::pipeline::apply_trace_hooks;
7use crate::types::{Action, Actor, DocType, FileChange, LogEntry};
8use anyhow::Result;
9use chrono::Utc;
10use git2::Oid;
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Mutex};
13
14pub use crate::runtime::session::AgentState;
15
16#[derive(Debug, Clone)]
19pub enum UiEvent {
20 NewCommit(LogEntry),
21 Violation(String),
22}
23
24fn is_pid_alive(pid: u32) -> bool {
25 #[cfg(unix)]
26 {
27 let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
29 result == 0
30 }
31 #[cfg(not(unix))]
32 {
33 let _ = pid;
34 false
35 }
36}
37
38pub struct ChangeProcessor {
41 pub git: GitStore,
42 pub manifest: Arc<Mutex<Manifest>>,
43 pub agent_state: AgentState,
44 pub ui_tx: Option<tokio::sync::mpsc::Sender<UiEvent>>,
45 session_id: String,
47 last_seen_oid: Oid,
49}
50
51impl ChangeProcessor {
52 pub fn new(
53 git: GitStore,
54 manifest: Arc<Mutex<Manifest>>,
55 _config: MergedConfig,
56 agent_state: AgentState,
57 ui_tx: Option<tokio::sync::mpsc::Sender<UiEvent>>,
58 ) -> Self {
59 let session_id =
60 session::session_id_for_store(&git.workdir).unwrap_or_else(session::new_session_id);
61 let last_seen_oid = git.head_oid().unwrap_or_else(|_| Oid::zero());
62 Self {
63 git,
64 manifest,
65 agent_state,
66 ui_tx,
67 session_id,
68 last_seen_oid,
69 }
70 }
71
72 fn refresh_session_id(&mut self) {
73 if let Some(sid) = session::session_id_for_store(&self.git.workdir) {
74 self.session_id = sid;
75 }
76 }
77
78 fn reload_manifest_from_disk(&self) -> Result<()> {
79 let loaded = Manifest::load(&self.git.workdir)?;
80 *self.manifest.lock().unwrap() = loaded;
81 Ok(())
82 }
83
84 fn poll_external_commits(&mut self) -> Result<()> {
85 let current_head = self.git.head_oid()?;
86 if current_head == self.last_seen_oid {
87 return Ok(());
88 }
89
90 let new_commits = self.git.commits_since(self.last_seen_oid)?;
91 if !new_commits.is_empty() {
92 let has_non_violation = new_commits
95 .iter()
96 .any(|e| !matches!(e.action, Action::Violation));
97 if has_non_violation {
98 if let Err(e) = self.reload_manifest_from_disk() {
99 tracing::warn!("Failed to reload manifest after external commit: {}", e);
100 }
101 }
102 if let Some(tx) = &self.ui_tx {
103 for entry in new_commits {
104 let _ = tx.try_send(UiEvent::NewCommit(entry));
105 }
106 }
107 }
108
109 self.last_seen_oid = current_head;
110 Ok(())
111 }
112
113 pub fn run_poll_cycle(&mut self) -> Result<()> {
114 self.refresh_session_id();
115
116 if let Err(e) = self.reload_manifest_from_disk() {
121 tracing::warn!("Failed to reload manifest at poll cycle start: {e}");
122 }
123
124 let store_root = self.git.workdir.clone();
125 let changes = self.git.detect_changes()?;
126 let mut own_commit_oid: Option<Oid> = None;
127
128 if changes.is_empty() {
129 return self.poll_external_commits();
130 }
131
132 let actor = self.agent_state.current_actor(&store_root);
133 let overrides = Overrides::load(&store_root).unwrap_or_default();
134
135 let mut allowed: Vec<(PathBuf, Action, DocType)> = Vec::new();
136 let mut violations: Vec<Violation> = Vec::new();
137
138 let mut manifest = self.manifest.lock().unwrap();
139
140 for change in &changes {
141 let path = change.path().clone();
142 let action = change.action();
143
144 let doc_type = manifest
146 .find_by_path(&path)
147 .map(|d| d.doc_type.clone())
148 .unwrap_or(DocType::Scratch);
149
150 let perm = check_permission(&doc_type, &actor, &overrides, Some(&path));
151
152 match perm {
153 PermissionResult::Allowed => {
154 if let FileChange::Renamed { from, to } = change {
164 if manifest.is_tracked(from) {
165 let _ = manifest.update_path(from, to);
166 }
167 }
168 allowed.push((path, action, doc_type));
169 }
170 PermissionResult::Denied { reason } => {
171 if let Err(e) = self.git.revert_file(&path) {
173 tracing::warn!("Failed to revert {}: {}", path.display(), e);
174 }
175
176 let full = store_root.join(&path);
178 if let Ok(content) = std::fs::read_to_string(&full) {
179 let _ = self.git.save_rejected(&path, &content);
180 }
181
182 violations.push(Violation {
183 timestamp: Utc::now(),
184 doc_path: path.clone(),
185 actor: actor.clone(),
186 agent_name: actor.agent_name().map(String::from),
187 attempted_action: action,
188 reason: reason.clone(),
189 });
190
191 if let Some(tx) = &self.ui_tx {
193 let msg = format!(
194 "Permission denied: {} tried to modify {} ({})",
195 actor,
196 path.display(),
197 reason
198 );
199 let _ = tx.try_send(UiEvent::Violation(msg));
200 }
201 }
202 PermissionResult::RequiresConfirmation { .. } => {
203 allowed.push((path, action, doc_type));
205 }
206 }
207 }
208
209 for v in &violations {
211 let info = CommitInfo {
212 action: Action::Violation,
213 files: vec![(
214 v.doc_path.clone(),
215 v.attempted_action.clone(),
216 manifest
217 .find_by_path(&v.doc_path)
218 .map(|d| d.doc_type.clone())
219 .unwrap_or(DocType::Scratch),
220 )],
221 actor: Actor::System,
222 summary: format!(
223 "violation: {} attempted {} on {} — {}",
224 v.actor,
225 v.attempted_action,
226 v.doc_path.display(),
227 v.reason
228 ),
229 agent_name: v.agent_name.clone(),
230 session_id: None,
231 };
232 if let Err(e) = self.git.commit(&info) {
233 tracing::error!(
234 "Failed to commit violation record for {:?}: {:#}",
235 v.doc_path,
236 e
237 );
238 }
239 }
240
241 if !allowed.is_empty() {
243 if let Err(e) = crate::runtime::require_synthesis_backend(Some(&store_root)) {
248 tracing::warn!(
249 "Skipping poll commit of {} file(s) — synthesis backend unavailable: {e}",
250 allowed.len()
251 );
252 drop(manifest);
253 return self.poll_external_commits();
254 }
255 let info = CommitInfo {
256 action: allowed[0].1.clone(),
257 files: allowed.clone(),
258 actor: actor.clone(),
259 summary: format!("{} {} file(s)", allowed[0].1, allowed.len()),
260 agent_name: actor.agent_name().map(String::from),
261 session_id: Some(self.session_id.clone()),
262 };
263 match self.git.commit(&info) {
264 Ok(oid) => {
265 own_commit_oid = Some(oid);
266 let _ = manifest.save(&store_root);
268 let entry = LogEntry {
269 commit_id: crate::types::CommitId(oid.to_string()),
270 timestamp: Utc::now(),
271 action: info.action,
272 actor: actor.clone(),
273 agent_name: actor.agent_name().map(String::from),
274 files: info.files.clone(),
275 summary: info.summary,
276 };
277 if let Some(tx) = &self.ui_tx {
278 let _ = tx.try_send(UiEvent::NewCommit(entry));
279 }
280
281 if let Err(e) = apply_trace_hooks(
282 &store_root,
283 &self.git,
284 &manifest,
285 &actor,
286 Some(&self.session_id),
287 &allowed,
288 "poll",
289 ) {
290 tracing::warn!("post-write trace hooks failed: {}", e);
291 }
292 }
293 Err(e) => {
294 tracing::warn!("Poll batch commit failed: {}", e);
297 drop(manifest);
298 return self.poll_external_commits();
299 }
300 }
301 }
302
303 drop(manifest);
304
305 if own_commit_oid.is_some() {
308 if let Ok(head) = self.git.head_oid() {
309 self.last_seen_oid = head;
310 }
311 }
312 self.poll_external_commits()
313 }
314}
315
316pub struct InstanceLock {
319 path: PathBuf,
320}
321
322impl InstanceLock {
323 pub fn acquire(store_root: &Path) -> Result<Self> {
324 let path = store_root
325 .join(".agent-trace")
326 .join("locks")
327 .join("instance.lock");
328 std::fs::create_dir_all(path.parent().unwrap())?;
329
330 if path.exists() {
332 if let Ok(content) = std::fs::read_to_string(&path) {
333 if let Ok(pid) = content.trim().parse::<u32>() {
334 if is_pid_alive(pid) {
335 anyhow::bail!(
336 "Another agent-trace instance is running (PID {pid}). \
337 Opening in read-only mode."
338 );
339 }
340 }
341 }
342 }
343
344 let pid = std::process::id();
345 std::fs::write(&path, pid.to_string())?;
346 Ok(Self { path })
347 }
348}
349
350impl Drop for InstanceLock {
351 fn drop(&mut self) {
352 let _ = std::fs::remove_file(&self.path);
353 }
354}
355
356#[cfg(test)]
357mod tests {
358 use super::*;
359 use crate::config::{GlobalConfig, MergedConfig, PollingConfig, StoreConfig, StoreInfo};
360 use crate::session::AgentState;
361 use tempfile::TempDir;
362
363 fn setup(tmp: &TempDir) -> (GitStore, Arc<Mutex<Manifest>>, MergedConfig) {
364 let root = tmp.path();
365 std::fs::create_dir_all(root.join(".agent-trace").join("locks")).unwrap();
366 let git = GitStore::init(root).unwrap();
367 let info = StoreInfo::new("test".into());
368 let manifest = Manifest::create_empty(info.clone(), root).unwrap();
369 let global = GlobalConfig::default();
370 let store_cfg = StoreConfig {
371 store: info,
372 llm: None,
373 synthesis: None,
374 polling: PollingConfig::default(),
375 };
376 store_cfg.save(root).unwrap();
377 let config = MergedConfig::merge(global, store_cfg);
378 (git, Arc::new(Mutex::new(manifest)), config)
379 }
380
381 #[test]
382 fn test_poll_no_changes() {
383 let tmp = TempDir::new().unwrap();
384 let (git, manifest, config) = setup(&tmp);
385 let agent = AgentState::new(None);
386 let mut proc = ChangeProcessor::new(git, manifest, config, agent, None);
387 proc.run_poll_cycle().unwrap(); }
389
390 #[test]
391 fn test_poll_new_file_committed_not_registered() {
392 let tmp = TempDir::new().unwrap();
393 let (git, manifest, config) = setup(&tmp);
394
395 std::fs::write(tmp.path().join("task.py"), "print('work')\n").unwrap();
397
398 let agent = AgentState::new(None);
399 let mut proc = ChangeProcessor::new(git, manifest.clone(), config, agent, None);
400 proc.run_poll_cycle().unwrap();
401
402 {
404 let m = manifest.lock().unwrap();
405 assert!(
406 !m.is_tracked(&PathBuf::from("task.py")),
407 "poll must not auto-register source files in the manifest"
408 );
409 }
410
411 let store = GitStore::open(tmp.path()).unwrap();
413 let log = store.log(10).unwrap();
414 assert!(
415 log.iter().any(|e| e
416 .files
417 .iter()
418 .any(|(p, _, _)| p == &PathBuf::from("task.py"))),
419 "poll should still commit the new file to git"
420 );
421 }
422
423 #[test]
424 fn test_agent_state_no_lock_no_flag() {
425 let tmp = TempDir::new().unwrap();
426 let state = AgentState::new(None);
427 assert_eq!(state.current_actor(tmp.path()), Actor::User);
428 }
429
430 #[test]
431 fn test_agent_state_cli_flag() {
432 let tmp = TempDir::new().unwrap();
433 let state = AgentState::new(Some("aider".into()));
434 assert_eq!(
435 state.current_actor(tmp.path()),
436 Actor::Agent {
437 name: "aider".into()
438 }
439 );
440 }
441
442 #[test]
443 fn test_agent_state_connect_lock_file() {
444 let tmp = TempDir::new().unwrap();
445 let root = tmp.path();
446 std::fs::create_dir_all(root.join(".agent-trace/locks")).unwrap();
447 std::fs::write(
449 root.join(".agent-trace/locks/agent-lock.toml"),
450 "[agent]\nname=\"my-agent\"\nsession_id=\"s1\"\ntransport=\"cli\"\nstarted_at=\"2026-01-01T00:00:00Z\"\nlast_heartbeat=\"2099-01-01T00:00:00Z\"\n",
451 ).unwrap();
452 let state = AgentState::new(None);
453 assert_eq!(
454 state.current_actor(root),
455 Actor::Agent {
456 name: "my-agent".into()
457 }
458 );
459 }
460
461 #[test]
462 fn test_agent_state_cli_flag_takes_priority_over_no_lock() {
463 let tmp = TempDir::new().unwrap();
464 let state = AgentState::new(Some("cli-agent".into()));
466 assert_eq!(
467 state.current_actor(tmp.path()),
468 Actor::Agent {
469 name: "cli-agent".into()
470 }
471 );
472 }
473
474 #[test]
475 fn test_instance_lock_created_and_removed() {
476 let tmp = TempDir::new().unwrap();
477 let root = tmp.path();
478 std::fs::create_dir_all(root.join(".agent-trace").join("locks")).unwrap();
479 let lock_path = root
480 .join(".agent-trace")
481 .join("locks")
482 .join("instance.lock");
483
484 {
485 let _lock = InstanceLock::acquire(root).unwrap();
486 assert!(lock_path.exists());
487 }
488 assert!(!lock_path.exists());
489 }
490
491 #[test]
492 fn test_poll_agent_denied_context_reverted() {
493 let tmp = TempDir::new().unwrap();
494 let (git, manifest, config) = setup(&tmp);
495
496 std::fs::write(tmp.path().join("context.md"), "# Context").unwrap();
498 {
499 let info = CommitInfo {
500 action: Action::Create,
501 files: vec![(
502 PathBuf::from("context.md"),
503 Action::Create,
504 DocType::Context,
505 )],
506 actor: Actor::System,
507 summary: "create context".into(),
508 agent_name: None,
509 session_id: None,
510 };
511 git.commit(&info).unwrap();
512 }
513 {
514 let mut m = manifest.lock().unwrap();
515 m.register(&PathBuf::from("context.md"), DocType::Context, "")
516 .unwrap();
517 m.save(tmp.path()).unwrap();
520 }
521
522 std::fs::write(tmp.path().join("context.md"), "# Agent tampered").unwrap();
524
525 let agent = AgentState::new(Some("claude-code".into()));
526 let mut proc = ChangeProcessor::new(git, manifest.clone(), config, agent, None);
527 proc.run_poll_cycle().unwrap();
528
529 let content = std::fs::read_to_string(tmp.path().join("context.md")).unwrap();
531 assert_eq!(content, "# Context");
532 }
533
534 #[test]
535 fn test_head_poll_detects_external_commit() {
536 let tmp = TempDir::new().unwrap();
537 let (git, manifest, config) = setup(&tmp);
538 let (tx, mut rx) = tokio::sync::mpsc::channel(10);
539
540 let agent = AgentState::new(None);
541 let mut proc = ChangeProcessor::new(git, manifest.clone(), config, agent, Some(tx));
542
543 std::fs::write(tmp.path().join("mcp.md"), "# MCP write").unwrap();
545 {
546 let mut m = Manifest::load(tmp.path()).unwrap();
547 m.register(&PathBuf::from("mcp.md"), DocType::Plan, "claude")
548 .unwrap();
549 m.save(tmp.path()).unwrap();
550 }
551 let store = GitStore::open(tmp.path()).unwrap();
552 let info = CommitInfo {
553 action: Action::Create,
554 files: vec![(PathBuf::from("mcp.md"), Action::Create, DocType::Plan)],
555 actor: Actor::Agent {
556 name: "claude".into(),
557 },
558 summary: "mcp write: mcp.md".into(),
559 agent_name: Some("claude".into()),
560 session_id: Some("sess-mcp".into()),
561 };
562 store.commit(&info).unwrap();
563
564 assert!(
566 !manifest
567 .lock()
568 .unwrap()
569 .is_tracked(&PathBuf::from("mcp.md")),
570 "pre-poll manifest should not track mcp.md"
571 );
572
573 proc.run_poll_cycle().unwrap();
574
575 let event = rx
576 .try_recv()
577 .expect("expected NewCommit for external commit");
578 match event {
579 UiEvent::NewCommit(entry) => {
580 assert_eq!(entry.agent_name.as_deref(), Some("claude"));
581 assert!(entry.summary.contains("mcp"));
582 }
583 other => panic!("expected NewCommit, got {other:?}"),
584 }
585
586 assert!(
587 manifest
588 .lock()
589 .unwrap()
590 .is_tracked(&PathBuf::from("mcp.md")),
591 "manifest should reload from disk after external commit"
592 );
593 }
594
595 #[test]
596 fn test_poll_own_commit_not_duplicated_via_head_poll() {
597 let tmp = TempDir::new().unwrap();
598 let (git, manifest, config) = setup(&tmp);
599 let (tx, mut rx) = tokio::sync::mpsc::channel(10);
600
601 std::fs::write(tmp.path().join("notes.md"), "# notes").unwrap();
602
603 let agent = AgentState::new(None);
604 let mut proc = ChangeProcessor::new(git, manifest.clone(), config, agent, Some(tx));
605 proc.run_poll_cycle().unwrap();
606
607 let first = rx.try_recv().expect("poll commit should emit once");
608 assert!(matches!(first, UiEvent::NewCommit(_)));
609 assert!(
610 rx.try_recv().is_err(),
611 "HEAD poll should not duplicate poll-loop commit"
612 );
613 }
614
615 #[test]
616 fn test_session_id_reread_from_lock() {
617 let tmp = TempDir::new().unwrap();
618 let (git, manifest, config) = setup(&tmp);
619
620 std::fs::write(
621 tmp.path().join(".agent-trace/locks/agent-lock.toml"),
622 "[agent]\nname=\"my-agent\"\nsession_id=\"lock-sid-42\"\ntransport=\"mcp\"\nstarted_at=\"2099-01-01T00:00:00Z\"\nlast_heartbeat=\"2099-01-01T00:00:00Z\"\n",
623 )
624 .unwrap();
625
626 std::fs::write(tmp.path().join("scratch.md"), "# scratch").unwrap();
627
628 let agent = AgentState::new(None);
629 let mut proc = ChangeProcessor::new(git, manifest.clone(), config, agent, None);
630 proc.run_poll_cycle().unwrap();
631
632 let store = GitStore::open(tmp.path()).unwrap();
633 let all = store.log(10).unwrap();
634 let scratch_entry = all
635 .iter()
636 .find(|e| {
637 e.files
638 .iter()
639 .any(|(p, _, _)| p == &PathBuf::from("scratch.md"))
640 })
641 .expect("poll commit for scratch.md expected");
642 let commit_id = scratch_entry.commit_id.0.clone();
643 let repo_path = tmp.path().join(".agent-trace/repo");
644 let repo = git2::Repository::open(&repo_path).unwrap();
645 let commit = repo
646 .find_commit(git2::Oid::from_str(&commit_id).expect("valid commit oid"))
647 .unwrap();
648 let msg = commit.message().unwrap_or("");
649 assert!(
650 msg.contains("lock-sid-42"),
651 "commit should use session_id from lock file, got:\n{msg}"
652 );
653 }
654
655 #[test]
656 fn test_head_poll_no_changes_still_runs() {
657 let tmp = TempDir::new().unwrap();
658 let (git, manifest, config) = setup(&tmp);
659 let agent = AgentState::new(None);
660 let mut proc = ChangeProcessor::new(git, manifest, config, agent, None);
661 proc.run_poll_cycle().unwrap(); }
663}