1use std::collections::HashSet;
8use std::path::Path;
9use std::time::Duration;
10
11use ito_domain::audit::event::AuditEvent;
12
13use super::store::{AuditEventStore, audit_storage_location_key, default_audit_store};
14use super::worktree::discover_worktrees;
15
/// Settings that control how audit events are streamed.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Delay between polls. Not read in this file; presumably consumed by the
    /// caller's polling loop (confirm against the caller).
    pub poll_interval: Duration,
    /// When `true`, `read_initial_events` also reads the audit stores of
    /// discovered non-main worktrees in addition to the main store.
    pub all_worktrees: bool,
    /// Maximum number of trailing events emitted *per source* by
    /// `read_initial_events`.
    pub last: usize,
}

impl Default for StreamConfig {
    /// Returns a configuration with a 500 ms poll interval, the main store
    /// only, and the last 10 events shown initially.
    fn default() -> Self {
        let poll_interval = Duration::from_millis(500);
        Self {
            poll_interval,
            all_worktrees: false,
            last: 10,
        }
    }
}
36
37pub struct StreamSource {
39 pub label: String,
41 store: Box<dyn AuditEventStore>,
43 offset: usize,
49}
50
51#[derive(Debug)]
53pub struct StreamEvent {
54 pub event: AuditEvent,
56 pub source: String,
58}
59
60pub fn read_initial_events(
65 ito_path: &Path,
66 config: &StreamConfig,
67) -> (Vec<StreamEvent>, Vec<StreamSource>) {
68 let mut sources = Vec::new();
69 let mut events = Vec::new();
70 let mut seen_locations = HashSet::new();
71
72 let main_store = default_audit_store(ito_path);
74 let main_key = audit_storage_location_key(&main_store.location());
75 let main_events = main_store.read_all();
76 let start = main_events.len().saturating_sub(config.last);
77 for event in &main_events[start..] {
78 events.push(StreamEvent {
79 event: event.clone(),
80 source: "main".to_string(),
81 });
82 }
83 sources.push(StreamSource {
84 label: "main".to_string(),
85 store: main_store,
86 offset: main_events.len(),
87 });
88 seen_locations.insert(main_key);
89
90 if config.all_worktrees {
92 let worktrees = discover_worktrees(ito_path);
93 for wt in &worktrees {
94 if wt.is_main {
95 continue; }
97 let wt_ito_path = wt.path.join(".ito");
98 if !wt_ito_path.exists() {
99 continue;
100 }
101
102 let wt_store = default_audit_store(&wt_ito_path);
103 let wt_key = audit_storage_location_key(&wt_store.location());
104 if !seen_locations.insert(wt_key) {
105 continue;
106 }
107
108 let wt_events = wt_store.read_all();
109 let label = wt
110 .branch
111 .clone()
112 .unwrap_or_else(|| wt.path.display().to_string());
113 let start = wt_events.len().saturating_sub(config.last);
114 for event in &wt_events[start..] {
115 events.push(StreamEvent {
116 event: event.clone(),
117 source: label.clone(),
118 });
119 }
120 sources.push(StreamSource {
121 label,
122 store: wt_store,
123 offset: wt_events.len(),
124 });
125 }
126 }
127
128 (events, sources)
129}
130
131pub fn poll_new_events(sources: &mut [StreamSource]) -> Vec<StreamEvent> {
139 let mut new_events = Vec::new();
140
141 for source in sources.iter_mut() {
142 let current_events = source.store.read_all();
143 if current_events.len() <= source.offset {
144 continue;
145 }
146
147 for event in ¤t_events[source.offset..] {
148 new_events.push(StreamEvent {
149 event: event.clone(),
150 source: source.label.clone(),
151 });
152 }
153
154 source.offset = current_events.len();
155 }
156
157 new_events
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use ito_domain::audit::event::{EventContext, SCHEMA_VERSION};
    use std::path::Path;

    /// Runs `git <args>` with `repo` as the working directory and panics,
    /// printing the captured stdout/stderr, if the command does not succeed.
    fn run_git(repo: &Path, args: &[&str]) {
        let mut cmd = std::process::Command::new("git");
        cmd.args(args).current_dir(repo);
        // Strip inherited GIT_DIR / GIT_WORK_TREE from the child environment
        // (presumably so an enclosing git invocation cannot redirect these
        // commands away from `repo` -- confirm).
        for var in ["GIT_DIR", "GIT_WORK_TREE"] {
            cmd.env_remove(var);
        }

        let output = cmd.output().expect("git should run");
        assert!(
            output.status.success(),
            "git command failed: git {}\nstdout:\n{}\nstderr:\n{}",
            args.join(" "),
            String::from_utf8_lossy(&output.stdout),
            String::from_utf8_lossy(&output.stderr)
        );
    }

    /// Initialises a git repository in `repo` with a test identity, commit
    /// signing disabled, and a single commit containing `README.md`.
    fn init_git_repo(repo: &Path) {
        run_git(repo, &["init"]);

        let settings = [
            ("user.email", "test@example.com"),
            ("user.name", "Test User"),
            ("commit.gpgsign", "false"),
        ];
        for (key, value) in settings {
            run_git(repo, &["config", key, value]);
        }

        std::fs::write(repo.join("README.md"), "hi\n").expect("write readme");
        run_git(repo, &["add", "README.md"]);
        run_git(repo, &["commit", "-m", "initial"]);
    }

    /// Builds a fixed "task created" audit event whose only varying field is
    /// `entity_id`.
    fn test_event(entity_id: &str) -> AuditEvent {
        let ctx = EventContext {
            session_id: String::from("test-sid"),
            harness_session_id: None,
            branch: None,
            worktree: None,
            commit: None,
        };

        AuditEvent {
            v: SCHEMA_VERSION,
            ts: String::from("2026-02-08T14:30:00.000Z"),
            entity: String::from("task"),
            entity_id: entity_id.to_owned(),
            scope: Some(String::from("test-change")),
            op: String::from("create"),
            from: None,
            to: Some(String::from("pending")),
            actor: String::from("cli"),
            by: String::from("@test"),
            meta: None,
            ctx,
        }
    }

    /// Creates a temporary directory and returns it with the path of a `.ito`
    /// entry inside it (the `.ito` path itself is not created). The returned
    /// `TempDir` must be kept alive for as long as the path is used.
    fn scratch_ito_dir() -> (tempfile::TempDir, std::path::PathBuf) {
        let tmp = tempfile::tempdir().expect("tempdir");
        let ito_path = tmp.path().join(".ito");
        (tmp, ito_path)
    }

    #[test]
    fn read_initial_events_returns_last_n() {
        let (_tmp, ito_path) = scratch_ito_dir();

        let writer = crate::audit::default_audit_store(&ito_path);
        for i in 0..20 {
            let event = test_event(&format!("1.{i}"));
            writer.append(&event).expect("append");
        }

        let config = StreamConfig {
            last: 5,
            all_worktrees: false,
            ..Default::default()
        };

        let (events, sources) = read_initial_events(&ito_path, &config);
        assert_eq!(events.len(), 5);
        assert_eq!(sources.len(), 1);
        // The offset covers every stored event, not just the five returned.
        assert_eq!(sources[0].offset, 20);
        assert_eq!(events[0].event.entity_id, "1.15");
    }

    #[test]
    fn poll_detects_new_events() {
        let (_tmp, ito_path) = scratch_ito_dir();

        let writer = crate::audit::default_audit_store(&ito_path);
        writer.append(&test_event("1.1")).expect("append");

        let (_initial, mut sources) = read_initial_events(&ito_path, &StreamConfig::default());

        // Appended after the initial read, so only these should be reported.
        for id in ["1.2", "1.3"] {
            writer.append(&test_event(id)).expect("append");
        }

        let new = poll_new_events(&mut sources);
        let ids: Vec<&str> = new.iter().map(|e| e.event.entity_id.as_str()).collect();
        assert_eq!(ids, ["1.2", "1.3"]);
    }

    #[test]
    fn poll_returns_empty_when_no_new_events() {
        let (_tmp, ito_path) = scratch_ito_dir();

        let writer = crate::audit::default_audit_store(&ito_path);
        writer.append(&test_event("1.1")).expect("append");

        let (_initial, mut sources) = read_initial_events(&ito_path, &StreamConfig::default());

        let new = poll_new_events(&mut sources);
        assert!(new.is_empty());
    }

    #[test]
    fn poll_detects_new_events_from_routed_store() {
        let (tmp, ito_path) = scratch_ito_dir();
        init_git_repo(tmp.path());
        std::fs::create_dir_all(&ito_path).expect("create ito dir");

        let store = crate::audit::default_audit_store(&ito_path);
        store.append(&test_event("1.1")).expect("append");

        let (_initial, mut sources) = read_initial_events(&ito_path, &StreamConfig::default());

        for id in ["1.2", "1.3"] {
            store.append(&test_event(id)).expect("append");
        }

        let new = poll_new_events(&mut sources);
        let ids: Vec<&str> = new.iter().map(|e| e.event.entity_id.as_str()).collect();
        assert_eq!(ids, ["1.2", "1.3"]);
    }

    #[test]
    fn default_config_has_sensible_values() {
        let StreamConfig {
            poll_interval,
            all_worktrees,
            last,
        } = StreamConfig::default();

        assert_eq!(poll_interval, Duration::from_millis(500));
        assert!(!all_worktrees);
        assert_eq!(last, 10);
    }
}