Skip to main content

ito_core/audit/
stream.rs

1//! Poll-based audit event streaming with multi-worktree support.
2//!
3//! Provides a simple file-watching mechanism that polls the JSONL audit log
4//! for new events at a configurable interval. Supports monitoring events
5//! across multiple git worktrees.
6
7use std::path::{Path, PathBuf};
8use std::time::Duration;
9
10use ito_domain::audit::event::AuditEvent;
11
12use super::reader::read_audit_events;
13use super::worktree::{discover_worktrees, worktree_audit_log_path};
14use super::writer::audit_log_path;
15
/// Settings that control how the audit event stream behaves.
#[derive(Clone, Debug)]
pub struct StreamConfig {
    /// Time between two polls of the audit logs (500ms by default).
    pub poll_interval: Duration,
    /// When `true`, follow every discovered worktree in addition to the main project.
    pub all_worktrees: bool,
    /// How many of the most recent events to emit at startup, per source.
    pub last: usize,
}
26
27impl Default for StreamConfig {
28    fn default() -> Self {
29        Self {
30            poll_interval: Duration::from_millis(500),
31            all_worktrees: false,
32            last: 10,
33        }
34    }
35}
36
/// One audit log followed by the stream: the main project's or a worktree's.
#[derive(Debug)]
pub struct StreamSource {
    /// Name attached to every event from this source: `"main"`, or for a
    /// worktree its branch name (its path when no branch is known).
    pub label: String,
    /// Location of the audit log file that is polled for this source.
    log_path: PathBuf,
    /// Number of log lines already consumed; polling resumes at this line index.
    offset: usize,
}
47
48/// A streamed event with its source label.
49#[derive(Debug)]
50pub struct StreamEvent {
51    /// The audit event.
52    pub event: AuditEvent,
53    /// Label of the source (e.g., "main" or branch name).
54    pub source: String,
55}
56
57/// Read the initial batch of events for streaming (the last N events).
58///
59/// Returns events from the main project log and, if `all_worktrees` is true,
60/// from all discovered worktrees.
61pub fn read_initial_events(
62    ito_path: &Path,
63    config: &StreamConfig,
64) -> (Vec<StreamEvent>, Vec<StreamSource>) {
65    let mut sources = Vec::new();
66    let mut events = Vec::new();
67
68    // Main project source
69    let main_events = read_audit_events(ito_path);
70    let main_log = audit_log_path(ito_path);
71    let start = main_events.len().saturating_sub(config.last);
72    for event in &main_events[start..] {
73        events.push(StreamEvent {
74            event: event.clone(),
75            source: "main".to_string(),
76        });
77    }
78    sources.push(StreamSource {
79        label: "main".to_string(),
80        log_path: main_log,
81        offset: main_events.len(),
82    });
83
84    // Worktree sources
85    if config.all_worktrees {
86        let worktrees = discover_worktrees(ito_path);
87        for wt in &worktrees {
88            if wt.is_main {
89                continue; // Already handled above
90            }
91            let wt_ito_path = wt.path.join(".ito");
92            let wt_log = worktree_audit_log_path(wt);
93            let wt_events = read_audit_events(&wt_ito_path);
94            let label = wt
95                .branch
96                .clone()
97                .unwrap_or_else(|| wt.path.display().to_string());
98            let start = wt_events.len().saturating_sub(config.last);
99            for event in &wt_events[start..] {
100                events.push(StreamEvent {
101                    event: event.clone(),
102                    source: label.clone(),
103                });
104            }
105            sources.push(StreamSource {
106                label,
107                log_path: wt_log,
108                offset: wt_events.len(),
109            });
110        }
111    }
112
113    (events, sources)
114}
115
116/// Poll all sources for new events since the last check.
117///
118/// Updates the offsets in each source so subsequent polls only return new events.
119pub fn poll_new_events(sources: &mut [StreamSource]) -> Vec<StreamEvent> {
120    let mut new_events = Vec::new();
121
122    for source in sources.iter_mut() {
123        let Ok(contents) = std::fs::read_to_string(&source.log_path) else {
124            continue;
125        };
126
127        let lines: Vec<&str> = contents.lines().collect();
128        if lines.len() <= source.offset {
129            continue;
130        }
131
132        for line in &lines[source.offset..] {
133            let line = line.trim();
134            if line.is_empty() {
135                continue;
136            }
137            if let Ok(event) = serde_json::from_str::<AuditEvent>(line) {
138                new_events.push(StreamEvent {
139                    event,
140                    source: source.label.clone(),
141                });
142            }
143        }
144
145        source.offset = lines.len();
146    }
147
148    new_events
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use ito_domain::audit::event::{EventContext, SCHEMA_VERSION};
    use ito_domain::audit::writer::AuditWriter;

    /// Builds a fixed "task created" event; only `entity_id` varies between calls.
    fn test_event(entity_id: &str) -> AuditEvent {
        let ctx = EventContext {
            session_id: "test-sid".to_string(),
            harness_session_id: None,
            branch: None,
            worktree: None,
            commit: None,
        };

        AuditEvent {
            v: SCHEMA_VERSION,
            ts: "2026-02-08T14:30:00.000Z".to_string(),
            entity: "task".to_string(),
            entity_id: entity_id.to_string(),
            scope: Some("test-change".to_string()),
            op: "create".to_string(),
            from: None,
            to: Some("pending".to_string()),
            actor: "cli".to_string(),
            by: "@test".to_string(),
            meta: None,
            ctx,
        }
    }

    /// Creates a scratch directory and the `.ito` path inside it.
    ///
    /// The returned guard must stay alive for as long as the path is in use.
    fn scratch_ito() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().expect("tempdir");
        let ito_path = dir.path().join(".ito");
        (dir, ito_path)
    }

    /// With 20 logged events and `last = 5`, only the five newest are emitted
    /// while the source is positioned after all 20.
    #[test]
    fn read_initial_events_returns_last_n() {
        let (_tmp, ito_path) = scratch_ito();

        let writer = crate::audit::writer::FsAuditWriter::new(&ito_path);
        for n in 0..20 {
            let id = format!("1.{n}");
            writer.append(&test_event(&id)).expect("append");
        }

        let config = StreamConfig {
            all_worktrees: false,
            last: 5,
            ..StreamConfig::default()
        };

        let (events, sources) = read_initial_events(&ito_path, &config);
        assert_eq!(events.len(), 5);
        assert_eq!(sources.len(), 1);
        assert_eq!(sources[0].offset, 20);
        assert_eq!(events[0].event.entity_id, "1.15");
    }

    /// Events appended after the initial read are reported by the next poll, in order.
    #[test]
    fn poll_detects_new_events() {
        let (_tmp, ito_path) = scratch_ito();

        let writer = crate::audit::writer::FsAuditWriter::new(&ito_path);
        writer.append(&test_event("1.1")).expect("append");

        let (_initial, mut sources) = read_initial_events(&ito_path, &StreamConfig::default());

        // Grow the log after the stream has been positioned.
        for id in ["1.2", "1.3"] {
            writer.append(&test_event(id)).expect("append");
        }

        let polled = poll_new_events(&mut sources);
        assert_eq!(polled.len(), 2);
        assert_eq!(polled[0].event.entity_id, "1.2");
        assert_eq!(polled[1].event.entity_id, "1.3");
    }

    /// Polling an unchanged log yields nothing.
    #[test]
    fn poll_returns_empty_when_no_new_events() {
        let (_tmp, ito_path) = scratch_ito();

        let writer = crate::audit::writer::FsAuditWriter::new(&ito_path);
        writer.append(&test_event("1.1")).expect("append");

        let (_initial, mut sources) = read_initial_events(&ito_path, &StreamConfig::default());

        let polled = poll_new_events(&mut sources);
        assert!(polled.is_empty());
    }

    /// Pins the default configuration values.
    #[test]
    fn default_config_has_sensible_values() {
        let StreamConfig {
            poll_interval,
            all_worktrees,
            last,
        } = StreamConfig::default();

        assert_eq!(poll_interval, Duration::from_millis(500));
        assert!(!all_worktrees);
        assert_eq!(last, 10);
    }
}