1use std::path::{Path, PathBuf};
8use std::time::Duration;
9
10use ito_domain::audit::event::AuditEvent;
11
12use super::reader::read_audit_events;
13use super::worktree::{discover_worktrees, worktree_audit_log_path};
14use super::writer::audit_log_path;
15
/// Settings that control how audit events are streamed.
#[derive(Clone, Debug)]
pub struct StreamConfig {
    /// Polling period. The functions in this file do not read it themselves;
    /// presumably the caller sleeps this long between `poll_new_events` calls.
    pub poll_interval: Duration,
    /// When `true`, `read_initial_events` also registers every non-main
    /// worktree reported by worktree discovery.
    pub all_worktrees: bool,
    /// Upper bound on how many trailing events `read_initial_events` returns
    /// for each source.
    pub last: usize,
}
26
27impl Default for StreamConfig {
28 fn default() -> Self {
29 Self {
30 poll_interval: Duration::from_millis(500),
31 all_worktrees: false,
32 last: 10,
33 }
34 }
35}
36
/// One audit log that is being followed, together with a read cursor.
#[derive(Debug)]
pub struct StreamSource {
    /// Name stamped onto every event from this log: `"main"` for the main
    /// log, otherwise the worktree's branch name or, failing that, its path.
    pub label: String,
    /// Path of the log file that `poll_new_events` re-reads on each poll.
    log_path: PathBuf,
    /// Read cursor. `read_initial_events` seeds it with the number of events
    /// it read; `poll_new_events` compares it against the file's line count
    /// and moves it forward as lines are consumed.
    offset: usize,
}
47
48#[derive(Debug)]
50pub struct StreamEvent {
51 pub event: AuditEvent,
53 pub source: String,
55}
56
57pub fn read_initial_events(
62 ito_path: &Path,
63 config: &StreamConfig,
64) -> (Vec<StreamEvent>, Vec<StreamSource>) {
65 let mut sources = Vec::new();
66 let mut events = Vec::new();
67
68 let main_events = read_audit_events(ito_path);
70 let main_log = audit_log_path(ito_path);
71 let start = main_events.len().saturating_sub(config.last);
72 for event in &main_events[start..] {
73 events.push(StreamEvent {
74 event: event.clone(),
75 source: "main".to_string(),
76 });
77 }
78 sources.push(StreamSource {
79 label: "main".to_string(),
80 log_path: main_log,
81 offset: main_events.len(),
82 });
83
84 if config.all_worktrees {
86 let worktrees = discover_worktrees(ito_path);
87 for wt in &worktrees {
88 if wt.is_main {
89 continue; }
91 let wt_ito_path = wt.path.join(".ito");
92 let wt_log = worktree_audit_log_path(wt);
93 let wt_events = read_audit_events(&wt_ito_path);
94 let label = wt
95 .branch
96 .clone()
97 .unwrap_or_else(|| wt.path.display().to_string());
98 let start = wt_events.len().saturating_sub(config.last);
99 for event in &wt_events[start..] {
100 events.push(StreamEvent {
101 event: event.clone(),
102 source: label.clone(),
103 });
104 }
105 sources.push(StreamSource {
106 label,
107 log_path: wt_log,
108 offset: wt_events.len(),
109 });
110 }
111 }
112
113 (events, sources)
114}
115
116pub fn poll_new_events(sources: &mut [StreamSource]) -> Vec<StreamEvent> {
120 let mut new_events = Vec::new();
121
122 for source in sources.iter_mut() {
123 let Ok(contents) = std::fs::read_to_string(&source.log_path) else {
124 continue;
125 };
126
127 let lines: Vec<&str> = contents.lines().collect();
128 if lines.len() <= source.offset {
129 continue;
130 }
131
132 for line in &lines[source.offset..] {
133 let line = line.trim();
134 if line.is_empty() {
135 continue;
136 }
137 if let Ok(event) = serde_json::from_str::<AuditEvent>(line) {
138 new_events.push(StreamEvent {
139 event,
140 source: source.label.clone(),
141 });
142 }
143 }
144
145 source.offset = lines.len();
146 }
147
148 new_events
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use ito_domain::audit::event::{EventContext, SCHEMA_VERSION};
    use ito_domain::audit::writer::AuditWriter;

    /// Builds a fixed "task created" event; only `entity_id` varies.
    fn test_event(entity_id: &str) -> AuditEvent {
        let ctx = EventContext {
            session_id: "test-sid".to_string(),
            harness_session_id: None,
            branch: None,
            worktree: None,
            commit: None,
        };
        AuditEvent {
            v: SCHEMA_VERSION,
            ts: "2026-02-08T14:30:00.000Z".to_string(),
            entity: "task".to_string(),
            entity_id: entity_id.to_string(),
            scope: Some("test-change".to_string()),
            op: "create".to_string(),
            from: None,
            to: Some("pending".to_string()),
            actor: "cli".to_string(),
            by: "@test".to_string(),
            meta: None,
            ctx,
        }
    }

    /// Creates a temporary directory and the `.ito` path inside it. The
    /// returned guard must stay alive for as long as the path is used.
    fn scratch_ito() -> (tempfile::TempDir, PathBuf) {
        let tmp = tempfile::tempdir().expect("tempdir");
        let ito_path = tmp.path().join(".ito");
        (tmp, ito_path)
    }

    #[test]
    fn read_initial_events_returns_last_n() {
        let (_tmp, ito_path) = scratch_ito();

        let writer = crate::audit::writer::FsAuditWriter::new(&ito_path);
        (0..20).for_each(|i| {
            writer
                .append(&test_event(&format!("1.{i}")))
                .expect("append");
        });

        let config = StreamConfig {
            all_worktrees: false,
            last: 5,
            ..StreamConfig::default()
        };

        let (events, sources) = read_initial_events(&ito_path, &config);

        // Only the tail is returned, but the cursor covers the whole log.
        assert_eq!(events.len(), 5);
        assert_eq!(sources.len(), 1);
        assert_eq!(sources[0].offset, 20);
        assert_eq!(events[0].event.entity_id, "1.15");
    }

    #[test]
    fn poll_detects_new_events() {
        let (_tmp, ito_path) = scratch_ito();

        let writer = crate::audit::writer::FsAuditWriter::new(&ito_path);
        writer.append(&test_event("1.1")).expect("append");

        let (_initial, mut sources) =
            read_initial_events(&ito_path, &StreamConfig::default());

        // Two appends after the initial read must both be reported, in order.
        for id in ["1.2", "1.3"] {
            writer.append(&test_event(id)).expect("append");
        }

        let fresh = poll_new_events(&mut sources);
        assert_eq!(fresh.len(), 2);
        assert_eq!(fresh[0].event.entity_id, "1.2");
        assert_eq!(fresh[1].event.entity_id, "1.3");
    }

    #[test]
    fn poll_returns_empty_when_no_new_events() {
        let (_tmp, ito_path) = scratch_ito();

        let writer = crate::audit::writer::FsAuditWriter::new(&ito_path);
        writer.append(&test_event("1.1")).expect("append");

        let (_initial, mut sources) =
            read_initial_events(&ito_path, &StreamConfig::default());

        let fresh = poll_new_events(&mut sources);
        assert!(fresh.is_empty());
    }

    #[test]
    fn default_config_has_sensible_values() {
        let StreamConfig {
            poll_interval,
            all_worktrees,
            last,
        } = StreamConfig::default();

        assert_eq!(poll_interval, Duration::from_millis(500));
        assert!(!all_worktrees);
        assert_eq!(last, 10);
    }
}