1use std::path::Path;
10use std::thread;
11use std::time::Duration;
12
13use ito_domain::audit::event::AuditEvent;
14use ito_domain::backend::{BackendError, BackendEventIngestClient, EventBatch};
15
16use crate::backend_client::{idempotency_key, is_retriable_status};
17use crate::errors::{CoreError, CoreResult};
18
/// Number of events per batch used by `ForwarderConfig::default()`.
const DEFAULT_BATCH_SIZE: usize = 100;
21
/// File name of the forwarding checkpoint; `checkpoint_path` places it under
/// the `.state` directory inside the ito path.
const CHECKPOINT_FILE: &str = "event-forward-offset";
24
/// Summary of a single `forward_events` run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForwardResult {
    /// Sum of the `accepted` counts reported by the backend for successful batches.
    pub forwarded: usize,
    /// Sum of the `duplicates` counts reported by the backend for successful batches.
    pub duplicates: usize,
    /// Number of batches that failed. Forwarding stops at the first failed
    /// batch, so this is currently either 0 or 1.
    pub failed_batches: usize,
    /// Number of events read from the local audit store.
    pub total_local: usize,
    /// Checkpoint offset after the run: the starting offset plus the number
    /// of local events covered by successfully submitted batches.
    pub new_offset: usize,
}
39
/// Tuning knobs for `forward_events`.
#[derive(Debug, Clone)]
pub struct ForwarderConfig {
    /// Maximum number of events per submitted batch. A value of 0 is treated as 1.
    pub batch_size: usize,
    /// Maximum number of retries after the initial attempt for a retriable
    /// failure (so a batch is attempted at most `max_retries + 1` times).
    pub max_retries: u32,
    /// Delay before the first retry; the delay doubles for each subsequent retry.
    pub retry_base_delay: Duration,
}
50
51impl Default for ForwarderConfig {
52 fn default() -> Self {
53 Self {
54 batch_size: DEFAULT_BATCH_SIZE,
55 max_retries: 3,
56 retry_base_delay: Duration::from_millis(500),
57 }
58 }
59}
60
61pub fn forward_events(
70 ingest_client: &dyn BackendEventIngestClient,
71 ito_path: &Path,
72 config: &ForwarderConfig,
73) -> CoreResult<ForwardResult> {
74 let all_events = read_all_events(ito_path);
75 let total_local = all_events.len();
76
77 let mut current_offset = read_checkpoint(ito_path);
78 if current_offset > total_local {
79 current_offset = total_local;
80 }
81
82 if current_offset >= total_local {
83 return Ok(ForwardResult {
84 forwarded: 0,
85 duplicates: 0,
86 failed_batches: 0,
87 total_local,
88 new_offset: current_offset,
89 });
90 }
91
92 let batch_size = config.batch_size.max(1);
93 let new_events = &all_events[current_offset..];
94 let mut forwarded = 0usize;
95 let mut duplicates = 0usize;
96 let mut failed_batches = 0usize;
97 let mut offset = current_offset;
98
99 for chunk in new_events.chunks(batch_size) {
100 let batch = EventBatch {
101 events: chunk.to_vec(),
102 idempotency_key: idempotency_key("event-forward"),
103 };
104
105 match submit_with_retry(ingest_client, &batch, config) {
106 Ok(result) => {
107 forwarded += result.accepted;
108 duplicates += result.duplicates;
109 offset += chunk.len();
110 if let Err(e) = write_checkpoint(ito_path, offset) {
112 tracing::warn!("failed to write forwarding checkpoint: {e}");
113 }
114 }
115 Err(e) => {
116 tracing::warn!("event forwarding batch failed: {e}");
117 failed_batches += 1;
118 break;
120 }
121 }
122 }
123
124 Ok(ForwardResult {
125 forwarded,
126 duplicates,
127 failed_batches,
128 total_local,
129 new_offset: offset,
130 })
131}
132
133fn submit_with_retry(
135 client: &dyn BackendEventIngestClient,
136 batch: &EventBatch,
137 config: &ForwarderConfig,
138) -> CoreResult<ito_domain::backend::EventIngestResult> {
139 let mut attempts = 0u32;
140 loop {
141 match client.ingest(batch) {
142 Ok(result) => return Ok(result),
143 Err(err) => {
144 attempts += 1;
145 if !is_retriable_backend_error(&err) || attempts > config.max_retries {
146 return Err(backend_ingest_error_to_core(err));
147 }
148 let delay = config.retry_base_delay * 2u32.saturating_pow(attempts - 1);
150 thread::sleep(delay);
151 }
152 }
153 }
154}
155
156fn is_retriable_backend_error(err: &BackendError) -> bool {
158 match err {
159 BackendError::Unavailable(_) => true,
160 BackendError::Other(msg) => {
161 if let Some(code_str) = msg.strip_prefix("HTTP ")
163 && let Ok(code) = code_str
164 .chars()
165 .take_while(|c| c.is_ascii_digit())
166 .collect::<String>()
167 .parse::<u16>()
168 {
169 return is_retriable_status(code);
170 }
171 false
172 }
173 BackendError::Unauthorized(_) => false,
174 BackendError::NotFound(_) => false,
175 BackendError::LeaseConflict(_) => false,
176 BackendError::RevisionConflict(_) => false,
177 }
178}
179
180fn backend_ingest_error_to_core(err: BackendError) -> CoreError {
182 match err {
183 BackendError::Unavailable(msg) => CoreError::process(format!(
184 "Backend unavailable during event forwarding: {msg}"
185 )),
186 BackendError::Unauthorized(msg) => CoreError::validation(format!(
187 "Backend auth failed during event forwarding: {msg}"
188 )),
189 BackendError::NotFound(msg) => {
190 CoreError::not_found(format!("Backend ingest endpoint not found: {msg}"))
191 }
192 BackendError::Other(msg) => {
193 CoreError::process(format!("Backend error during event forwarding: {msg}"))
194 }
195 BackendError::LeaseConflict(c) => CoreError::process(format!(
196 "Unexpected lease conflict during event forwarding: {}",
197 c.change_id
198 )),
199 BackendError::RevisionConflict(c) => CoreError::process(format!(
200 "Unexpected revision conflict during event forwarding: {}",
201 c.change_id
202 )),
203 }
204}
205
206fn checkpoint_path(ito_path: &Path) -> std::path::PathBuf {
210 ito_path.join(".state").join(CHECKPOINT_FILE)
211}
212
213fn read_checkpoint(ito_path: &Path) -> usize {
217 let path = checkpoint_path(ito_path);
218 let Ok(content) = std::fs::read_to_string(&path) else {
219 return 0;
220 };
221 content.trim().parse::<usize>().unwrap_or(0)
222}
223
224fn write_checkpoint(ito_path: &Path, offset: usize) -> CoreResult<()> {
226 let path = checkpoint_path(ito_path);
227 if let Some(parent) = path.parent() {
228 std::fs::create_dir_all(parent)
229 .map_err(|e| CoreError::io("creating checkpoint directory", e))?;
230 }
231 std::fs::write(&path, offset.to_string())
232 .map_err(|e| CoreError::io("writing forwarding checkpoint", e))
233}
234
235fn read_all_events(ito_path: &Path) -> Vec<AuditEvent> {
239 crate::audit::default_audit_store(ito_path).read_all()
240}
241
#[cfg(test)]
mod tests {
    use super::*;
    use ito_domain::audit::event::{EventContext, SCHEMA_VERSION};
    use ito_domain::backend::{BackendError, EventIngestResult};
    use std::path::Path;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Runs `git <args>` inside `repo` and panics with the captured output if
    /// the command fails.
    fn run_git(repo: &Path, args: &[&str]) {
        let mut command = std::process::Command::new("git");
        command
            .args(args)
            .current_dir(repo)
            .env_remove("GIT_DIR")
            .env_remove("GIT_WORK_TREE");
        let output = command.output().expect("git should run");

        let stdout = String::from_utf8_lossy(&output.stdout);
        let stderr = String::from_utf8_lossy(&output.stderr);
        assert!(
            output.status.success(),
            "git command failed: git {}\nstdout:\n{}\nstderr:\n{}",
            args.join(" "),
            stdout,
            stderr
        );
    }

    /// Initialises a git repository in `repo` with a test identity and one commit.
    fn init_git_repo(repo: &Path) {
        run_git(repo, &["init"]);
        let settings = [
            ("user.email", "test@example.com"),
            ("user.name", "Test User"),
            ("commit.gpgsign", "false"),
        ];
        for (key, value) in settings {
            run_git(repo, &["config", key, value]);
        }
        std::fs::write(repo.join("README.md"), "hi\n").expect("write readme");
        run_git(repo, &["add", "README.md"]);
        run_git(repo, &["commit", "-m", "initial"]);
    }

    /// Builds a "task created" audit event whose entity id is `id`.
    fn make_event(id: &str) -> AuditEvent {
        let ctx = EventContext {
            session_id: "test-sid".to_string(),
            harness_session_id: None,
            branch: None,
            worktree: None,
            commit: None,
        };
        AuditEvent {
            v: SCHEMA_VERSION,
            ts: "2026-02-28T10:00:00.000Z".to_string(),
            entity: "task".to_string(),
            entity_id: id.to_string(),
            scope: Some("test-change".to_string()),
            op: "create".to_string(),
            from: None,
            to: Some("pending".to_string()),
            actor: "cli".to_string(),
            by: "@test".to_string(),
            meta: None,
            ctx,
        }
    }

    /// Ingest client double: replays scripted results by call index, then
    /// accepts every event of any further batch.
    struct FakeIngestClient {
        call_count: AtomicUsize,
        results: Mutex<Vec<Result<EventIngestResult, BackendError>>>,
    }

    impl FakeIngestClient {
        /// A client with no scripted results, so every call succeeds.
        fn always_ok() -> Self {
            Self::with_results(Vec::new())
        }

        /// A client that returns `results[i]` for the i-th call.
        fn with_results(results: Vec<Result<EventIngestResult, BackendError>>) -> Self {
            Self {
                call_count: AtomicUsize::new(0),
                results: Mutex::new(results),
            }
        }

        /// Number of `ingest` calls made so far.
        fn calls(&self) -> usize {
            self.call_count.load(Ordering::SeqCst)
        }
    }

    impl BackendEventIngestClient for FakeIngestClient {
        fn ingest(&self, batch: &EventBatch) -> Result<EventIngestResult, BackendError> {
            let call_index = self.call_count.fetch_add(1, Ordering::SeqCst);
            let scripted = self.results.lock().unwrap();
            match scripted.get(call_index) {
                Some(outcome) => outcome.clone(),
                // Past the end of the script: accept the whole batch.
                None => Ok(EventIngestResult {
                    accepted: batch.events.len(),
                    duplicates: 0,
                }),
            }
        }
    }

    /// Appends `events` to the default audit store for `ito_path`.
    fn write_events_to_log(ito_path: &Path, events: &[AuditEvent]) {
        let writer = crate::audit::default_audit_store(ito_path);
        events.iter().for_each(|event| {
            crate::audit::AuditWriter::append(writer.as_ref(), event).unwrap();
        });
    }

    /// Creates a temp dir and returns it with the `.ito` path inside it.
    /// The returned guard must stay alive for as long as the path is used.
    fn temp_ito() -> (tempfile::TempDir, std::path::PathBuf) {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");
        (tmp, ito_path)
    }

    /// Writes `count` events with entity ids `1.0`, `1.1`, ... to the local log.
    fn seed_events(ito_path: &Path, count: usize) {
        let events: Vec<AuditEvent> = (0..count)
            .map(|i| make_event(&format!("1.{i}")))
            .collect();
        write_events_to_log(ito_path, &events);
    }

    /// Default configuration with only the batch size overridden.
    fn config_with_batch_size(batch_size: usize) -> ForwarderConfig {
        ForwarderConfig {
            batch_size,
            ..ForwarderConfig::default()
        }
    }

    /// An event appended through the default store inside a git repo is forwarded.
    #[test]
    fn forward_reads_events_from_routed_local_store() {
        let (tmp, ito_path) = temp_ito();
        init_git_repo(tmp.path());
        std::fs::create_dir_all(&ito_path).unwrap();

        write_events_to_log(&ito_path, &[make_event("1.1")]);

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig::default();
        let result = forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(result.forwarded, 1);
        assert_eq!(result.total_local, 1);
        assert_eq!(client.calls(), 1);
    }

    /// With no local events the backend is never called and all counters are zero.
    #[test]
    fn forward_no_events_returns_zero() {
        let (_tmp, ito_path) = temp_ito();
        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig::default();

        let result = forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(result.forwarded, 0);
        assert_eq!(result.total_local, 0);
        assert_eq!(result.new_offset, 0);
        assert_eq!(result.failed_batches, 0);
        assert_eq!(client.calls(), 0);
    }

    /// Five events with a batch size of ten go out in a single batch.
    #[test]
    fn forward_sends_all_new_events() {
        let (_tmp, ito_path) = temp_ito();
        seed_events(&ito_path, 5);

        let client = FakeIngestClient::always_ok();
        let config = config_with_batch_size(10);

        let result = forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(result.forwarded, 5);
        assert_eq!(result.total_local, 5);
        assert_eq!(result.new_offset, 5);
        assert_eq!(result.failed_batches, 0);
        assert_eq!(client.calls(), 1);
    }

    /// Only events after the stored checkpoint are forwarded.
    #[test]
    fn forward_respects_checkpoint() {
        let (_tmp, ito_path) = temp_ito();
        seed_events(&ito_path, 5);
        write_checkpoint(&ito_path, 3).unwrap();

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig::default();

        let result = forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(result.forwarded, 2);
        assert_eq!(result.new_offset, 5);
    }

    /// A checkpoint equal to the log length means nothing is sent.
    #[test]
    fn forward_skips_when_fully_forwarded() {
        let (_tmp, ito_path) = temp_ito();
        seed_events(&ito_path, 3);
        write_checkpoint(&ito_path, 3).unwrap();

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig::default();

        let result = forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(result.forwarded, 0);
        assert_eq!(result.new_offset, 3);
        assert_eq!(client.calls(), 0);
    }

    /// Seven events with a batch size of three need three ingest calls.
    #[test]
    fn forward_batches_correctly() {
        let (_tmp, ito_path) = temp_ito();
        seed_events(&ito_path, 7);

        let client = FakeIngestClient::always_ok();
        let config = config_with_batch_size(3);

        let result = forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(result.forwarded, 7);
        assert_eq!(client.calls(), 3);
    }

    /// A non-retriable error on the second batch stops forwarding after the first.
    #[test]
    fn forward_stops_on_permanent_failure() {
        let (_tmp, ito_path) = temp_ito();
        seed_events(&ito_path, 6);

        let scripted = vec![
            Ok(EventIngestResult {
                accepted: 3,
                duplicates: 0,
            }),
            Err(BackendError::Unauthorized("bad token".to_string())),
        ];
        let client = FakeIngestClient::with_results(scripted);
        let config = ForwarderConfig {
            batch_size: 3,
            max_retries: 0,
            retry_base_delay: Duration::from_millis(1),
        };

        let result = forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(result.forwarded, 3);
        assert_eq!(result.failed_batches, 1);
        assert_eq!(result.new_offset, 3);
    }

    /// An `Unavailable` error is retried and the second attempt succeeds.
    #[test]
    fn forward_retries_transient_failure() {
        let (_tmp, ito_path) = temp_ito();
        seed_events(&ito_path, 2);

        let scripted = vec![
            Err(BackendError::Unavailable("timeout".to_string())),
            Ok(EventIngestResult {
                accepted: 2,
                duplicates: 0,
            }),
        ];
        let client = FakeIngestClient::with_results(scripted);
        let config = ForwarderConfig {
            batch_size: 10,
            max_retries: 3,
            retry_base_delay: Duration::from_millis(1),
        };

        let result = forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(result.forwarded, 2);
        assert_eq!(result.failed_batches, 0);
        assert_eq!(client.calls(), 2);
    }

    /// Accepted and duplicate counts reported by the backend are passed through.
    #[test]
    fn forward_reports_duplicates() {
        let (_tmp, ito_path) = temp_ito();
        seed_events(&ito_path, 3);

        let scripted = vec![Ok(EventIngestResult {
            accepted: 1,
            duplicates: 2,
        })];
        let client = FakeIngestClient::with_results(scripted);
        let config = ForwarderConfig::default();

        let result = forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(result.forwarded, 1);
        assert_eq!(result.duplicates, 2);
    }

    /// Written checkpoints are read back, and a later write replaces an earlier one.
    #[test]
    fn checkpoint_roundtrip() {
        let (_tmp, ito_path) = temp_ito();

        assert_eq!(read_checkpoint(&ito_path), 0);

        write_checkpoint(&ito_path, 42).unwrap();
        assert_eq!(read_checkpoint(&ito_path), 42);

        write_checkpoint(&ito_path, 100).unwrap();
        assert_eq!(read_checkpoint(&ito_path), 100);
    }

    /// A missing checkpoint file reads as offset 0.
    #[test]
    fn checkpoint_missing_returns_zero() {
        let (_tmp, ito_path) = temp_ito();
        assert_eq!(read_checkpoint(&ito_path), 0);
    }

    /// `Unavailable` is retriable; `Unauthorized` and `NotFound` are not.
    #[test]
    fn is_retriable_backend_error_checks() {
        let unavailable = BackendError::Unavailable("timeout".to_string());
        assert!(is_retriable_backend_error(&unavailable));

        let unauthorized = BackendError::Unauthorized("bad".to_string());
        assert!(!is_retriable_backend_error(&unauthorized));

        let not_found = BackendError::NotFound("nope".to_string());
        assert!(!is_retriable_backend_error(&not_found));
    }

    /// After two successful batches the stored checkpoint covers all events.
    #[test]
    fn forward_persists_checkpoint_per_batch() {
        let (_tmp, ito_path) = temp_ito();
        seed_events(&ito_path, 4);

        let client = FakeIngestClient::always_ok();
        let config = config_with_batch_size(2);

        forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(read_checkpoint(&ito_path), 4);
    }

    /// A cloned `ForwardResult` compares equal to the original.
    #[test]
    fn forward_result_equality() {
        let a = ForwardResult {
            forwarded: 5,
            duplicates: 0,
            failed_batches: 0,
            total_local: 5,
            new_offset: 5,
        };
        let b = a.clone();
        assert_eq!(a, b);
    }
}