agent_trace/runtime/
activity_monitor.rs1use crate::config::MergedConfig;
2use crate::git_store::GitStore;
3use crate::manifest::Manifest;
4use crate::runtime::{AgentState, ChangeProcessor, PollLock, UiEvent};
5use anyhow::Result;
6use std::path::Path;
7use std::sync::{Arc, Mutex};
8use std::time::Duration;
9use tokio::sync::mpsc::Sender;
10
11pub struct ActivityMonitor {
19 poll_leader: bool,
20 _processor: Option<Arc<Mutex<ChangeProcessor>>>,
21 _poll_lock: Option<PollLock>,
23}
24
25impl ActivityMonitor {
26 pub fn try_start(
32 store_root: &Path,
33 config: MergedConfig,
34 manifest: Arc<Mutex<Manifest>>,
35 agent_state: AgentState,
36 ui_tx: Option<Sender<UiEvent>>,
37 ) -> Result<Self> {
38 if !config.polling.enabled {
39 tracing::info!("Polling disabled in store config; activity monitor not started.");
40 return Ok(Self {
41 poll_leader: false,
42 _processor: None,
43 _poll_lock: None,
44 });
45 }
46
47 let poll_lock = match PollLock::try_acquire(store_root)? {
48 Some(lock) => lock,
49 None => {
50 tracing::info!(
51 "Poll leader already active in another process; \
52 running without a poll thread (HEAD-only updates)."
53 );
54 return Ok(Self {
55 poll_leader: false,
56 _processor: None,
57 _poll_lock: None,
58 });
59 }
60 };
61
62 let git = GitStore::open(store_root)?;
63 let processor = ChangeProcessor::new(git, manifest, config.clone(), agent_state, ui_tx);
64 let processor = Arc::new(Mutex::new(processor));
65 let poll_interval_ms = config.polling.interval_ms;
66 let processor_clone = processor.clone();
67
68 std::thread::spawn(move || loop {
69 std::thread::sleep(Duration::from_millis(poll_interval_ms));
70 if let Ok(mut p) = processor_clone.lock() {
71 if let Err(e) = p.run_poll_cycle() {
72 tracing::warn!("Poll cycle error: {e}");
73 }
74 }
75 });
76
77 Ok(Self {
78 poll_leader: true,
79 _processor: Some(processor),
80 _poll_lock: Some(poll_lock),
81 })
82 }
83
84 pub fn is_poll_leader(&self) -> bool {
86 self.poll_leader
87 }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{GlobalConfig, PollingConfig, StoreConfig, StoreInfo};
    use tempfile::TempDir;

    /// On-disk store plus the in-memory values `try_start` needs.
    ///
    /// Fields are declared so that they drop in the order manifest, config,
    /// git handle, temp dir; the directory is therefore removed last.
    struct StoreFixture {
        manifest: Arc<Mutex<Manifest>>,
        config: MergedConfig,
        /// Kept alive for the whole test, never read.
        _git: GitStore,
        tmp: TempDir,
    }

    impl StoreFixture {
        /// Creates a temp dir with the locks directory, an initialised git
        /// store, an empty manifest, and a saved store config that polls
        /// every 100 ms.
        fn new() -> Self {
            let tmp = TempDir::new().unwrap();
            std::fs::create_dir_all(tmp.path().join(".agent-trace/locks")).unwrap();
            let git = GitStore::init(tmp.path()).unwrap();

            let info = StoreInfo::new("test".into());
            let manifest = Manifest::create_empty(info.clone(), tmp.path()).unwrap();

            let polling = PollingConfig {
                interval_ms: 100,
                ..PollingConfig::default()
            };
            let store_cfg = StoreConfig {
                store: info,
                llm: None,
                synthesis: None,
                polling,
            };
            store_cfg.save(tmp.path()).unwrap();

            let config = MergedConfig::merge(GlobalConfig::default(), store_cfg);

            Self {
                manifest: Arc::new(Mutex::new(manifest)),
                config,
                _git: git,
                tmp,
            }
        }
    }

    #[test]
    fn activity_monitor_becomes_poll_leader_when_lock_free() {
        let fx = StoreFixture::new();

        let started = ActivityMonitor::try_start(
            fx.tmp.path(),
            fx.config,
            fx.manifest,
            AgentState::new(None),
            None,
        );

        assert!(started.unwrap().is_poll_leader());
    }

    #[test]
    fn second_monitor_is_not_poll_leader() {
        let fx = StoreFixture::new();

        // The first monitor stays alive so its poll lock is still held when
        // the second one starts.
        let leader = ActivityMonitor::try_start(
            fx.tmp.path(),
            fx.config.clone(),
            fx.manifest.clone(),
            AgentState::new(None),
            None,
        )
        .unwrap();
        assert!(leader.is_poll_leader(), "first monitor should lead the poll");

        let follower = ActivityMonitor::try_start(
            fx.tmp.path(),
            fx.config,
            fx.manifest,
            AgentState::new(None),
            None,
        )
        .unwrap();
        assert!(
            !follower.is_poll_leader(),
            "second monitor must not run a duplicate poll loop"
        );
    }
}