Skip to main content

agent_trace/runtime/
poll_lock.rs

//! Cross-process poll leader election.
//!
//! `InstanceLock` (see [`crate::runtime::change_processor`]) guards single-TUI
//! ownership but is only meaningful for interactive sessions. The poll loop,
//! however, must be a *single writer across processes* — an MCP server and a
//! `agent-trace open` TUI started against the same store would otherwise both
//! run a poll loop and duplicate commits / `summary_events.jsonl` entries.
//!
//! `PollLock` provides that election with an OS-level advisory lock (`flock` on
//! Unix) over `.agent-trace/locks/poll.lock`. The first acquirer becomes the
//! poll leader; subsequent acquirers get `None` and fall back to HEAD-only
//! updates. On Unix the lock is released when the [`PollLock`] is dropped (the
//! file descriptor closes), so a crashed leader does not leave a stale lock.
//! Other platforms fall back to a best-effort PID sentinel stored in the same
//! file, which cannot detect a crashed leader.
14
15use anyhow::{Context, Result};
16use std::fs::{File, OpenOptions};
17use std::path::{Path, PathBuf};
18
/// Advisory, cross-process lock that elects a single poll-loop writer.
///
/// Holding a `PollLock` means this process is the poll leader. Dropping it
/// clears the leader PID recorded in the lock file (best-effort) and then
/// closes the file, which on Unix releases the `flock`.
#[derive(Debug)]
pub struct PollLock {
    /// Kept open for the lifetime of the lock; closing the fd releases `flock`.
    _file: File,
    /// Location of the lock file, retained for diagnostics.
    path: PathBuf,
}

impl PollLock {
    /// Path of the lock file backing this lock.
    pub fn path(&self) -> &Path {
        &self.path
    }
}

impl Drop for PollLock {
    fn drop(&mut self) {
        // The lock is still held here (the file closes only after this runs),
        // so no other process can be writing its PID concurrently. Clearing the
        // content keeps the diagnostic PID accurate and, on platforms without
        // `flock`, removes the PID sentinel so a later acquirer can take over.
        // Best-effort: a failed truncate must not panic during drop.
        let _ = self._file.set_len(0);
    }
}
26
27impl PollLock {
28    /// Try to become the poll leader without blocking.
29    ///
30    /// Returns `Ok(Some(lock))` if this process acquired the poll lock, or
31    /// `Ok(None)` if another process already holds it.
32    pub fn try_acquire(store_root: &Path) -> Result<Option<Self>> {
33        let dir = store_root.join(".agent-trace").join("locks");
34        std::fs::create_dir_all(&dir).context("create .agent-trace/locks directory")?;
35        let path = dir.join("poll.lock");
36
37        let file = OpenOptions::new()
38            .create(true)
39            .read(true)
40            .write(true)
41            .truncate(false)
42            .open(&path)
43            .with_context(|| format!("open poll lock at {}", path.display()))?;
44
45        #[cfg(unix)]
46        {
47            use std::os::unix::io::AsRawFd;
48            let fd = file.as_raw_fd();
49            // Non-blocking exclusive advisory lock.
50            let rc = unsafe { libc::flock(fd, libc::LOCK_EX | libc::LOCK_NB) };
51            if rc != 0 {
52                let err = std::io::Error::last_os_error();
53                match err.raw_os_error() {
54                    Some(code) if code == libc::EWOULDBLOCK || code == libc::EAGAIN => {
55                        return Ok(None);
56                    }
57                    _ => {
58                        return Err(anyhow::anyhow!(
59                            "failed to acquire poll lock {}: {err}",
60                            path.display()
61                        ));
62                    }
63                }
64            }
65            // Record the leader PID for diagnostics (best-effort).
66            use std::io::{Seek, SeekFrom, Write};
67            let mut f = &file;
68            let _ = f.set_len(0);
69            let _ = f.seek(SeekFrom::Start(0));
70            let _ = write!(f, "{}", std::process::id());
71        }
72
73        #[cfg(not(unix))]
74        {
75            // No advisory flock available; fall back to an exclusive-create
76            // sentinel with stale-PID detection (best effort).
77            if !try_acquire_sentinel(&path)? {
78                return Ok(None);
79            }
80        }
81
82        Ok(Some(Self { _file: file, path }))
83    }
84}
85
/// Best-effort poll-leader election for platforms without `flock`.
///
/// Returns `Ok(false)` if `path` already contains a PID, which is treated as a
/// live leader. Otherwise it writes this process's PID into `path` and returns
/// `Ok(true)`.
///
/// This is not atomic: two processes can both observe "no leader" and both
/// write their PID. A PID left behind by a crashed leader is never reclaimed
/// here.
///
/// # Errors
///
/// Fails if the sentinel cannot be created or written.
#[cfg(not(unix))]
fn try_acquire_sentinel(path: &Path) -> Result<bool> {
    use std::io::Write;

    // A missing, unreadable, or non-PID sentinel (e.g. an empty lock file)
    // means no leader is recorded. One read replaces the old exists()+read pair.
    let has_leader = std::fs::read_to_string(path)
        .map(|content| content.trim().parse::<u32>().is_ok())
        .unwrap_or(false);
    if has_leader {
        return Ok(false);
    }

    let mut sentinel = std::fs::File::create(path)
        .with_context(|| format!("create poll sentinel at {}", path.display()))?;
    write!(sentinel, "{}", std::process::id())
        .with_context(|| format!("write poll sentinel at {}", path.display()))?;
    Ok(true)
}
101
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Location of the poll lock file for a store rooted at `root`.
    fn lock_file(root: &Path) -> PathBuf {
        root.join(".agent-trace").join("locks").join("poll.lock")
    }

    /// V-D1: a second acquire returns `None` while the first lock is held.
    #[test]
    fn second_acquire_blocked_while_held() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();
        let first = PollLock::try_acquire(root).unwrap();
        assert!(first.is_some(), "first acquire should become poll leader");

        let second = PollLock::try_acquire(root).unwrap();
        assert!(
            second.is_none(),
            "second acquire must fail while first lock is held"
        );

        drop(first);
        drop(second);
    }

    /// V-D2: after the first lock is dropped, a new acquire succeeds.
    #[test]
    fn acquire_succeeds_after_release() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();

        {
            let first = PollLock::try_acquire(root).unwrap();
            assert!(first.is_some());
        } // dropped here → lock released

        let again = PollLock::try_acquire(root).unwrap();
        assert!(
            again.is_some(),
            "acquire should succeed after the previous lock was released"
        );
    }

    /// The leader records its PID in the lock file, and a losing contender
    /// (which opens the file without truncation) leaves that PID intact.
    #[test]
    fn leader_pid_recorded_and_preserved_by_contender() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();
        let pid = std::process::id().to_string();

        let leader = PollLock::try_acquire(root)
            .unwrap()
            .expect("first acquire should become poll leader");
        let recorded = std::fs::read_to_string(lock_file(root)).unwrap();
        assert_eq!(recorded.trim(), pid, "leader PID should be recorded");

        assert!(PollLock::try_acquire(root).unwrap().is_none());
        let after_contender = std::fs::read_to_string(lock_file(root)).unwrap();
        assert_eq!(
            after_contender.trim(),
            pid,
            "a losing contender must not clobber the leader's PID"
        );

        drop(leader);
    }
}