Skip to main content

ito_core/audit/
stream.rs

1//! Poll-based audit event streaming with multi-worktree support.
2//!
3//! Provides a simple polling mechanism that checks routed audit storage for
4//! new events at a configurable interval. Supports monitoring events across
5//! multiple git worktrees without assuming a tracked worktree JSONL file.
6
7use std::collections::HashSet;
8use std::path::Path;
9use std::time::Duration;
10
11use ito_domain::audit::event::AuditEvent;
12
13use super::store::{AuditEventStore, audit_storage_location_key, default_audit_store};
14use super::worktree::discover_worktrees;
15
16/// Configuration for the event stream.
17#[derive(Debug, Clone)]
18pub struct StreamConfig {
19    /// Poll interval (default: 500ms).
20    pub poll_interval: Duration,
21    /// Monitor all worktrees, not just the current one.
22    pub all_worktrees: bool,
23    /// Number of initial events to emit on startup.
24    pub last: usize,
25}
26
27impl Default for StreamConfig {
28    fn default() -> Self {
29        Self {
30            poll_interval: Duration::from_millis(500),
31            all_worktrees: false,
32            last: 10,
33        }
34    }
35}
36
37/// A single stream source: either the main project or a worktree.
38pub struct StreamSource {
39    /// Label for this source (e.g., "main" or worktree branch name).
40    pub label: String,
41    /// Routed audit store retained across polls.
42    store: Box<dyn AuditEventStore>,
43    /// Number of events previously seen.
44    ///
45    /// This offset assumes the store is append-only between polls. If the
46    /// underlying log is truncated or rotated, a shorter read is treated as a
47    /// reset and the next append-only growth resumes from the new length.
48    offset: usize,
49}
50
51/// A streamed event with its source label.
52#[derive(Debug)]
53pub struct StreamEvent {
54    /// The audit event.
55    pub event: AuditEvent,
56    /// Label of the source (e.g., "main" or branch name).
57    pub source: String,
58}
59
60/// Read the initial batch of events for streaming (the last N events).
61///
62/// Returns events from the main project log and, if `all_worktrees` is true,
63/// from all discovered worktrees.
64pub fn read_initial_events(
65    ito_path: &Path,
66    config: &StreamConfig,
67) -> (Vec<StreamEvent>, Vec<StreamSource>) {
68    let mut sources = Vec::new();
69    let mut events = Vec::new();
70    let mut seen_locations = HashSet::new();
71
72    // Main project source
73    let main_store = default_audit_store(ito_path);
74    let main_key = audit_storage_location_key(&main_store.location());
75    let main_events = main_store.read_all();
76    let start = main_events.len().saturating_sub(config.last);
77    for event in &main_events[start..] {
78        events.push(StreamEvent {
79            event: event.clone(),
80            source: "main".to_string(),
81        });
82    }
83    sources.push(StreamSource {
84        label: "main".to_string(),
85        store: main_store,
86        offset: main_events.len(),
87    });
88    seen_locations.insert(main_key);
89
90    // Worktree sources
91    if config.all_worktrees {
92        let worktrees = discover_worktrees(ito_path);
93        for wt in &worktrees {
94            if wt.is_main {
95                continue; // Already handled above
96            }
97            let wt_ito_path = wt.path.join(".ito");
98            if !wt_ito_path.exists() {
99                continue;
100            }
101
102            let wt_store = default_audit_store(&wt_ito_path);
103            let wt_key = audit_storage_location_key(&wt_store.location());
104            if !seen_locations.insert(wt_key) {
105                continue;
106            }
107
108            let wt_events = wt_store.read_all();
109            let label = wt
110                .branch
111                .clone()
112                .unwrap_or_else(|| wt.path.display().to_string());
113            let start = wt_events.len().saturating_sub(config.last);
114            for event in &wt_events[start..] {
115                events.push(StreamEvent {
116                    event: event.clone(),
117                    source: label.clone(),
118                });
119            }
120            sources.push(StreamSource {
121                label,
122                store: wt_store,
123                offset: wt_events.len(),
124            });
125        }
126    }
127
128    (events, sources)
129}
130
131/// Poll all sources for new events since the last check.
132///
133/// Updates the offsets in each source so subsequent polls only return new events.
134///
135/// Offsets are based on event counts, so a store that shrinks between polls is
136/// treated as having reset; the shorter snapshot advances no offset until new
137/// events extend the store again.
138pub fn poll_new_events(sources: &mut [StreamSource]) -> Vec<StreamEvent> {
139    let mut new_events = Vec::new();
140
141    for source in sources.iter_mut() {
142        let current_events = source.store.read_all();
143        if current_events.len() <= source.offset {
144            continue;
145        }
146
147        for event in &current_events[source.offset..] {
148            new_events.push(StreamEvent {
149                event: event.clone(),
150                source: source.label.clone(),
151            });
152        }
153
154        source.offset = current_events.len();
155    }
156
157    new_events
158}
159
160#[cfg(test)]
161mod tests {
162    use super::*;
163    use ito_domain::audit::event::{EventContext, SCHEMA_VERSION};
164    use std::path::Path;
165
166    fn run_git(repo: &Path, args: &[&str]) {
167        let output = std::process::Command::new("git")
168            .args(args)
169            .current_dir(repo)
170            .env_remove("GIT_DIR")
171            .env_remove("GIT_WORK_TREE")
172            .output()
173            .expect("git should run");
174        assert!(
175            output.status.success(),
176            "git command failed: git {}\nstdout:\n{}\nstderr:\n{}",
177            args.join(" "),
178            String::from_utf8_lossy(&output.stdout),
179            String::from_utf8_lossy(&output.stderr)
180        );
181    }
182
183    fn init_git_repo(repo: &Path) {
184        run_git(repo, &["init"]);
185        run_git(repo, &["config", "user.email", "test@example.com"]);
186        run_git(repo, &["config", "user.name", "Test User"]);
187        run_git(repo, &["config", "commit.gpgsign", "false"]);
188        std::fs::write(repo.join("README.md"), "hi\n").expect("write readme");
189        run_git(repo, &["add", "README.md"]);
190        run_git(repo, &["commit", "-m", "initial"]);
191    }
192
193    fn test_event(entity_id: &str) -> AuditEvent {
194        AuditEvent {
195            v: SCHEMA_VERSION,
196            ts: "2026-02-08T14:30:00.000Z".to_string(),
197            entity: "task".to_string(),
198            entity_id: entity_id.to_string(),
199            scope: Some("test-change".to_string()),
200            op: "create".to_string(),
201            from: None,
202            to: Some("pending".to_string()),
203            actor: "cli".to_string(),
204            by: "@test".to_string(),
205            meta: None,
206            ctx: EventContext {
207                session_id: "test-sid".to_string(),
208                harness_session_id: None,
209                branch: None,
210                worktree: None,
211                commit: None,
212            },
213        }
214    }
215
216    #[test]
217    fn read_initial_events_returns_last_n() {
218        let tmp = tempfile::tempdir().expect("tempdir");
219        let ito_path = tmp.path().join(".ito");
220
221        let writer = crate::audit::default_audit_store(&ito_path);
222        for i in 0..20 {
223            writer
224                .append(&test_event(&format!("1.{i}")))
225                .expect("append");
226        }
227
228        let config = StreamConfig {
229            last: 5,
230            all_worktrees: false,
231            ..Default::default()
232        };
233
234        let (events, sources) = read_initial_events(&ito_path, &config);
235        assert_eq!(events.len(), 5);
236        assert_eq!(sources.len(), 1);
237        assert_eq!(sources[0].offset, 20);
238        assert_eq!(events[0].event.entity_id, "1.15");
239    }
240
241    #[test]
242    fn poll_detects_new_events() {
243        let tmp = tempfile::tempdir().expect("tempdir");
244        let ito_path = tmp.path().join(".ito");
245
246        let writer = crate::audit::default_audit_store(&ito_path);
247        writer.append(&test_event("1.1")).expect("append");
248
249        let config = StreamConfig::default();
250        let (_initial, mut sources) = read_initial_events(&ito_path, &config);
251
252        // Write more events
253        writer.append(&test_event("1.2")).expect("append");
254        writer.append(&test_event("1.3")).expect("append");
255
256        let new = poll_new_events(&mut sources);
257        assert_eq!(new.len(), 2);
258        assert_eq!(new[0].event.entity_id, "1.2");
259        assert_eq!(new[1].event.entity_id, "1.3");
260    }
261
262    #[test]
263    fn poll_returns_empty_when_no_new_events() {
264        let tmp = tempfile::tempdir().expect("tempdir");
265        let ito_path = tmp.path().join(".ito");
266
267        let writer = crate::audit::default_audit_store(&ito_path);
268        writer.append(&test_event("1.1")).expect("append");
269
270        let config = StreamConfig::default();
271        let (_initial, mut sources) = read_initial_events(&ito_path, &config);
272
273        let new = poll_new_events(&mut sources);
274        assert!(new.is_empty());
275    }
276
277    #[test]
278    fn poll_detects_new_events_from_routed_store() {
279        let tmp = tempfile::tempdir().expect("tempdir");
280        init_git_repo(tmp.path());
281        let ito_path = tmp.path().join(".ito");
282        std::fs::create_dir_all(&ito_path).expect("create ito dir");
283
284        let store = crate::audit::default_audit_store(&ito_path);
285        store.append(&test_event("1.1")).expect("append");
286
287        let config = StreamConfig::default();
288        let (_initial, mut sources) = read_initial_events(&ito_path, &config);
289
290        store.append(&test_event("1.2")).expect("append");
291        store.append(&test_event("1.3")).expect("append");
292
293        let new = poll_new_events(&mut sources);
294        assert_eq!(new.len(), 2);
295        assert_eq!(new[0].event.entity_id, "1.2");
296        assert_eq!(new[1].event.entity_id, "1.3");
297    }
298
299    #[test]
300    fn default_config_has_sensible_values() {
301        let config = StreamConfig::default();
302        assert_eq!(config.poll_interval, Duration::from_millis(500));
303        assert!(!config.all_worktrees);
304        assert_eq!(config.last, 10);
305    }
306}