Skip to main content

agent_trace/runtime/
activity_monitor.rs

1use crate::config::MergedConfig;
2use crate::git_store::GitStore;
3use crate::manifest::Manifest;
4use crate::runtime::{AgentState, ChangeProcessor, PollLock, UiEvent};
5use anyhow::Result;
6use std::path::Path;
7use std::sync::{Arc, Mutex};
8use std::time::Duration;
9use tokio::sync::mpsc::Sender;
10
11/// Background filesystem activity monitor.
12///
13/// Poll-loop ownership is elected across processes with a [`PollLock`]. Only the
14/// process that acquires the lock runs the poll thread (the *poll leader*);
15/// other processes still construct an `ActivityMonitor` but with
16/// `poll_leader == false` and no thread, relying on HEAD-only updates so that
17/// commits / activity events are never duplicated.
18pub struct ActivityMonitor {
19    poll_leader: bool,
20    _processor: Option<Arc<Mutex<ChangeProcessor>>>,
21    /// Held for the lifetime of the monitor to keep poll leadership.
22    _poll_lock: Option<PollLock>,
23}
24
25impl ActivityMonitor {
26    /// Start the poll loop if this process can become the poll leader.
27    ///
28    /// Always returns a monitor: when the poll lock is already held by another
29    /// process the returned monitor has no poll thread (`is_poll_leader()` is
30    /// `false`).
31    pub fn try_start(
32        store_root: &Path,
33        config: MergedConfig,
34        manifest: Arc<Mutex<Manifest>>,
35        agent_state: AgentState,
36        ui_tx: Option<Sender<UiEvent>>,
37    ) -> Result<Self> {
38        if !config.polling.enabled {
39            tracing::info!("Polling disabled in store config; activity monitor not started.");
40            return Ok(Self {
41                poll_leader: false,
42                _processor: None,
43                _poll_lock: None,
44            });
45        }
46
47        let poll_lock = match PollLock::try_acquire(store_root)? {
48            Some(lock) => lock,
49            None => {
50                tracing::info!(
51                    "Poll leader already active in another process; \
52                     running without a poll thread (HEAD-only updates)."
53                );
54                return Ok(Self {
55                    poll_leader: false,
56                    _processor: None,
57                    _poll_lock: None,
58                });
59            }
60        };
61
62        let git = GitStore::open(store_root)?;
63        let processor = ChangeProcessor::new(git, manifest, config.clone(), agent_state, ui_tx);
64        let processor = Arc::new(Mutex::new(processor));
65        let poll_interval_ms = config.polling.interval_ms;
66        let processor_clone = processor.clone();
67
68        std::thread::spawn(move || loop {
69            std::thread::sleep(Duration::from_millis(poll_interval_ms));
70            if let Ok(mut p) = processor_clone.lock() {
71                if let Err(e) = p.run_poll_cycle() {
72                    tracing::warn!("Poll cycle error: {e}");
73                }
74            }
75        });
76
77        Ok(Self {
78            poll_leader: true,
79            _processor: Some(processor),
80            _poll_lock: Some(poll_lock),
81        })
82    }
83
84    /// Whether this monitor owns the cross-process poll loop.
85    pub fn is_poll_leader(&self) -> bool {
86        self.poll_leader
87    }
88}
89
90#[cfg(test)]
91mod tests {
92    use super::*;
93    use crate::config::{GlobalConfig, PollingConfig, StoreConfig, StoreInfo};
94    use tempfile::TempDir;
95
96    #[test]
97    fn activity_monitor_becomes_poll_leader_when_lock_free() {
98        let tmp = TempDir::new().unwrap();
99        let root = tmp.path();
100        std::fs::create_dir_all(root.join(".agent-trace/locks")).unwrap();
101        let git = GitStore::init(root).unwrap();
102        let _ = git;
103        let info = StoreInfo::new("test".into());
104        let manifest = Manifest::create_empty(info.clone(), root).unwrap();
105        let store_cfg = StoreConfig {
106            store: info,
107            llm: None,
108            synthesis: None,
109            polling: PollingConfig {
110                interval_ms: 100,
111                ..PollingConfig::default()
112            },
113        };
114        store_cfg.save(root).unwrap();
115        let global = GlobalConfig::default();
116        let config = MergedConfig::merge(global, store_cfg);
117        let manifest = Arc::new(Mutex::new(manifest));
118        let monitor =
119            ActivityMonitor::try_start(root, config, manifest, AgentState::new(None), None)
120                .unwrap();
121        assert!(monitor.is_poll_leader());
122    }
123
124    #[test]
125    fn second_monitor_is_not_poll_leader() {
126        let tmp = TempDir::new().unwrap();
127        let root = tmp.path();
128        std::fs::create_dir_all(root.join(".agent-trace/locks")).unwrap();
129        let git = GitStore::init(root).unwrap();
130        let _ = git;
131        let info = StoreInfo::new("test".into());
132        let manifest = Manifest::create_empty(info.clone(), root).unwrap();
133        let store_cfg = StoreConfig {
134            store: info,
135            llm: None,
136            synthesis: None,
137            polling: PollingConfig {
138                interval_ms: 100,
139                ..PollingConfig::default()
140            },
141        };
142        store_cfg.save(root).unwrap();
143        let global = GlobalConfig::default();
144        let config = MergedConfig::merge(global, store_cfg);
145        let manifest = Arc::new(Mutex::new(manifest));
146
147        let first = ActivityMonitor::try_start(
148            root,
149            config.clone(),
150            manifest.clone(),
151            AgentState::new(None),
152            None,
153        )
154        .unwrap();
155        assert!(first.is_poll_leader(), "first monitor should lead the poll");
156
157        let second =
158            ActivityMonitor::try_start(root, config, manifest, AgentState::new(None), None)
159                .unwrap();
160        assert!(
161            !second.is_poll_leader(),
162            "second monitor must not run a duplicate poll loop"
163        );
164    }
165}