1use std::path::Path;
10use std::thread;
11use std::time::Duration;
12
13use ito_domain::audit::event::AuditEvent;
14use ito_domain::backend::{BackendError, BackendEventIngestClient, EventBatch};
15
16use crate::backend_client::{idempotency_key, is_retriable_status};
17use crate::errors::{CoreError, CoreResult};
18
/// Batch size used by `ForwarderConfig::default()`.
const DEFAULT_BATCH_SIZE: usize = 100;

/// File name of the forwarding checkpoint; it is stored inside the `.state`
/// directory under the Ito path (see `checkpoint_path`).
const CHECKPOINT_FILE: &str = "event-forward-offset";
24
/// Summary of a single `forward_events` run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForwardResult {
    /// Sum of the `accepted` counts reported by the backend for the batches
    /// submitted in this run.
    pub forwarded: usize,
    /// Sum of the `duplicates` counts reported by the backend for the batches
    /// submitted in this run.
    pub duplicates: usize,
    /// Number of batches whose submission failed. Forwarding stops at the
    /// first failed batch.
    pub failed_batches: usize,
    /// Number of events read from the local audit log.
    pub total_local: usize,
    /// Event offset after this run: the starting checkpoint advanced by the
    /// size of every successfully submitted batch.
    pub new_offset: usize,
}
39
/// Tuning knobs for `forward_events`.
#[derive(Debug, Clone)]
pub struct ForwarderConfig {
    /// Maximum number of events per submitted batch. `forward_events` treats
    /// a value of 0 as 1.
    pub batch_size: usize,
    /// Number of retries allowed after the first failed attempt of a batch.
    /// Only retriable errors are retried.
    pub max_retries: u32,
    /// Delay before the first retry; it doubles for each further retry.
    pub retry_base_delay: Duration,
}
50
51impl Default for ForwarderConfig {
52 fn default() -> Self {
53 Self {
54 batch_size: DEFAULT_BATCH_SIZE,
55 max_retries: 3,
56 retry_base_delay: Duration::from_millis(500),
57 }
58 }
59}
60
61pub fn forward_events(
70 ingest_client: &dyn BackendEventIngestClient,
71 ito_path: &Path,
72 config: &ForwarderConfig,
73) -> CoreResult<ForwardResult> {
74 let all_events = read_all_events(ito_path);
75 let total_local = all_events.len();
76
77 let mut current_offset = read_checkpoint(ito_path);
78 if current_offset > total_local {
79 current_offset = total_local;
80 }
81
82 if current_offset >= total_local {
83 return Ok(ForwardResult {
84 forwarded: 0,
85 duplicates: 0,
86 failed_batches: 0,
87 total_local,
88 new_offset: current_offset,
89 });
90 }
91
92 let batch_size = config.batch_size.max(1);
93 let new_events = &all_events[current_offset..];
94 let mut forwarded = 0usize;
95 let mut duplicates = 0usize;
96 let mut failed_batches = 0usize;
97 let mut offset = current_offset;
98
99 for chunk in new_events.chunks(batch_size) {
100 let batch = EventBatch {
101 events: chunk.to_vec(),
102 idempotency_key: idempotency_key("event-forward"),
103 };
104
105 match submit_with_retry(ingest_client, &batch, config) {
106 Ok(result) => {
107 forwarded += result.accepted;
108 duplicates += result.duplicates;
109 offset += chunk.len();
110 if let Err(e) = write_checkpoint(ito_path, offset) {
112 tracing::warn!("failed to write forwarding checkpoint: {e}");
113 }
114 }
115 Err(e) => {
116 tracing::warn!("event forwarding batch failed: {e}");
117 failed_batches += 1;
118 break;
120 }
121 }
122 }
123
124 Ok(ForwardResult {
125 forwarded,
126 duplicates,
127 failed_batches,
128 total_local,
129 new_offset: offset,
130 })
131}
132
133fn submit_with_retry(
135 client: &dyn BackendEventIngestClient,
136 batch: &EventBatch,
137 config: &ForwarderConfig,
138) -> CoreResult<ito_domain::backend::EventIngestResult> {
139 let mut attempts = 0u32;
140 loop {
141 match client.ingest(batch) {
142 Ok(result) => return Ok(result),
143 Err(err) => {
144 attempts += 1;
145 if !is_retriable_backend_error(&err) || attempts > config.max_retries {
146 return Err(backend_ingest_error_to_core(err));
147 }
148 let delay = config.retry_base_delay * 2u32.saturating_pow(attempts - 1);
150 thread::sleep(delay);
151 }
152 }
153 }
154}
155
156fn is_retriable_backend_error(err: &BackendError) -> bool {
158 match err {
159 BackendError::Unavailable(_) => true,
160 BackendError::Other(msg) => {
161 if let Some(code_str) = msg.strip_prefix("HTTP ")
163 && let Ok(code) = code_str
164 .chars()
165 .take_while(|c| c.is_ascii_digit())
166 .collect::<String>()
167 .parse::<u16>()
168 {
169 return is_retriable_status(code);
170 }
171 false
172 }
173 BackendError::Unauthorized(_) => false,
174 BackendError::NotFound(_) => false,
175 BackendError::LeaseConflict(_) => false,
176 BackendError::RevisionConflict(_) => false,
177 }
178}
179
180fn backend_ingest_error_to_core(err: BackendError) -> CoreError {
182 match err {
183 BackendError::Unavailable(msg) => CoreError::process(format!(
184 "Backend unavailable during event forwarding: {msg}"
185 )),
186 BackendError::Unauthorized(msg) => CoreError::validation(format!(
187 "Backend auth failed during event forwarding: {msg}"
188 )),
189 BackendError::NotFound(msg) => {
190 CoreError::not_found(format!("Backend ingest endpoint not found: {msg}"))
191 }
192 BackendError::Other(msg) => {
193 CoreError::process(format!("Backend error during event forwarding: {msg}"))
194 }
195 BackendError::LeaseConflict(c) => CoreError::process(format!(
196 "Unexpected lease conflict during event forwarding: {}",
197 c.change_id
198 )),
199 BackendError::RevisionConflict(c) => CoreError::process(format!(
200 "Unexpected revision conflict during event forwarding: {}",
201 c.change_id
202 )),
203 }
204}
205
206fn checkpoint_path(ito_path: &Path) -> std::path::PathBuf {
210 ito_path.join(".state").join(CHECKPOINT_FILE)
211}
212
213fn read_checkpoint(ito_path: &Path) -> usize {
217 let path = checkpoint_path(ito_path);
218 let Ok(content) = std::fs::read_to_string(&path) else {
219 return 0;
220 };
221 content.trim().parse::<usize>().unwrap_or(0)
222}
223
224fn write_checkpoint(ito_path: &Path, offset: usize) -> CoreResult<()> {
226 let path = checkpoint_path(ito_path);
227 if let Some(parent) = path.parent() {
228 std::fs::create_dir_all(parent)
229 .map_err(|e| CoreError::io("creating checkpoint directory", e))?;
230 }
231 std::fs::write(&path, offset.to_string())
232 .map_err(|e| CoreError::io("writing forwarding checkpoint", e))
233}
234
/// Loads the local audit events for `ito_path` by delegating to
/// `crate::audit::read_audit_events`.
fn read_all_events(ito_path: &Path) -> Vec<AuditEvent> {
    crate::audit::read_audit_events(ito_path)
}
244
// Unit tests for event forwarding. They run against a temporary directory and
// a scripted in-memory ingest client, so no real backend is contacted.
#[cfg(test)]
mod tests {
    use super::*;
    use ito_domain::audit::event::{EventContext, SCHEMA_VERSION};
    use ito_domain::backend::{BackendError, EventIngestResult};
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Builds a fixed "task create" audit event whose `entity_id` is `id`.
    fn make_event(id: &str) -> AuditEvent {
        AuditEvent {
            v: SCHEMA_VERSION,
            ts: "2026-02-28T10:00:00.000Z".to_string(),
            entity: "task".to_string(),
            entity_id: id.to_string(),
            scope: Some("test-change".to_string()),
            op: "create".to_string(),
            from: None,
            to: Some("pending".to_string()),
            actor: "cli".to_string(),
            by: "@test".to_string(),
            meta: None,
            ctx: EventContext {
                session_id: "test-sid".to_string(),
                harness_session_id: None,
                branch: None,
                worktree: None,
                commit: None,
            },
        }
    }

    /// Ingest client test double: counts calls and replays scripted results.
    struct FakeIngestClient {
        /// Number of `ingest` calls made so far.
        call_count: AtomicUsize,
        /// Scripted results; the call with index `i` returns `results[i]` when it exists.
        results: Mutex<Vec<Result<EventIngestResult, BackendError>>>,
    }

    impl FakeIngestClient {
        /// Client with an empty script: every call accepts the whole batch.
        fn always_ok() -> Self {
            Self {
                call_count: AtomicUsize::new(0),
                results: Mutex::new(Vec::new()),
            }
        }

        /// Client that replays `results` in call order, then accepts every later batch.
        fn with_results(results: Vec<Result<EventIngestResult, BackendError>>) -> Self {
            Self {
                call_count: AtomicUsize::new(0),
                results: Mutex::new(results),
            }
        }

        /// Number of `ingest` calls observed so far.
        fn calls(&self) -> usize {
            self.call_count.load(Ordering::SeqCst)
        }
    }

    impl BackendEventIngestClient for FakeIngestClient {
        fn ingest(&self, batch: &EventBatch) -> Result<EventIngestResult, BackendError> {
            // The call index selects the scripted result, if one exists.
            let idx = self.call_count.fetch_add(1, Ordering::SeqCst);
            let results = self.results.lock().unwrap();
            if idx < results.len() {
                return results[idx].clone();
            }
            // Past the end of the script: accept every event, report no duplicates.
            Ok(EventIngestResult {
                accepted: batch.events.len(),
                duplicates: 0,
            })
        }
    }

    /// Appends `events` to the audit log under `ito_path` using `FsAuditWriter`.
    fn write_events_to_log(ito_path: &Path, events: &[AuditEvent]) {
        let writer = crate::audit::FsAuditWriter::new(ito_path);
        for event in events {
            crate::audit::AuditWriter::append(&writer, event).unwrap();
        }
    }

    // With no local events, nothing is forwarded and the client is never called.
    #[test]
    fn forward_no_events_returns_zero() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");
        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig::default();

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 0);
        assert_eq!(result.total_local, 0);
        assert_eq!(result.new_offset, 0);
        assert_eq!(result.failed_batches, 0);
        assert_eq!(client.calls(), 0);
    }

    // Five events with a batch size of 10 go out in a single call.
    #[test]
    fn forward_sends_all_new_events() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..5).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig {
            batch_size: 10,
            ..ForwarderConfig::default()
        };

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 5);
        assert_eq!(result.total_local, 5);
        assert_eq!(result.new_offset, 5);
        assert_eq!(result.failed_batches, 0);
        assert_eq!(client.calls(), 1);
    }

    // A checkpoint of 3 out of 5 events means only the last 2 are forwarded.
    #[test]
    fn forward_respects_checkpoint() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..5).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        write_checkpoint(&ito_path, 3).unwrap();

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig::default();

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 2);
        assert_eq!(result.new_offset, 5);
    }

    // A checkpoint equal to the log length makes the run a no-op.
    #[test]
    fn forward_skips_when_fully_forwarded() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..3).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);
        write_checkpoint(&ito_path, 3).unwrap();

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig::default();

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 0);
        assert_eq!(result.new_offset, 3);
        assert_eq!(client.calls(), 0);
    }

    // Seven events with a batch size of 3 need three calls (3 + 3 + 1).
    #[test]
    fn forward_batches_correctly() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..7).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig {
            batch_size: 3,
            ..ForwarderConfig::default()
        };

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 7);
        assert_eq!(client.calls(), 3);
    }

    // The first batch succeeds and the second fails with a non-retriable error:
    // forwarding stops and the offset stays at the end of the first batch.
    #[test]
    fn forward_stops_on_permanent_failure() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..6).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        let client = FakeIngestClient::with_results(vec![
            Ok(EventIngestResult {
                accepted: 3,
                duplicates: 0,
            }),
            Err(BackendError::Unauthorized("bad token".to_string())),
        ]);
        let config = ForwarderConfig {
            batch_size: 3,
            max_retries: 0,
            retry_base_delay: Duration::from_millis(1),
        };

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 3);
        assert_eq!(result.failed_batches, 1);
        assert_eq!(result.new_offset, 3);
    }

    // An `Unavailable` error is retried; the second attempt succeeds.
    #[test]
    fn forward_retries_transient_failure() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..2).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        let client = FakeIngestClient::with_results(vec![
            Err(BackendError::Unavailable("timeout".to_string())),
            Ok(EventIngestResult {
                accepted: 2,
                duplicates: 0,
            }),
        ]);
        let config = ForwarderConfig {
            batch_size: 10,
            max_retries: 3,
            retry_base_delay: Duration::from_millis(1),
        };

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 2);
        assert_eq!(result.failed_batches, 0);
        assert_eq!(client.calls(), 2);
    }

    // Accepted and duplicate counts come straight from the backend's response.
    #[test]
    fn forward_reports_duplicates() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..3).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        let client = FakeIngestClient::with_results(vec![Ok(EventIngestResult {
            accepted: 1,
            duplicates: 2,
        })]);
        let config = ForwarderConfig::default();

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 1);
        assert_eq!(result.duplicates, 2);
    }

    // Written checkpoints are read back, and a later write replaces the earlier one.
    #[test]
    fn checkpoint_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        assert_eq!(read_checkpoint(&ito_path), 0);

        write_checkpoint(&ito_path, 42).unwrap();
        assert_eq!(read_checkpoint(&ito_path), 42);

        write_checkpoint(&ito_path, 100).unwrap();
        assert_eq!(read_checkpoint(&ito_path), 100);
    }

    // A missing checkpoint file reads as offset 0.
    #[test]
    fn checkpoint_missing_returns_zero() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");
        assert_eq!(read_checkpoint(&ito_path), 0);
    }

    // `Unavailable` is retriable; `Unauthorized` and `NotFound` are not.
    #[test]
    fn is_retriable_backend_error_checks() {
        assert!(is_retriable_backend_error(&BackendError::Unavailable(
            "timeout".to_string()
        )));
        assert!(!is_retriable_backend_error(&BackendError::Unauthorized(
            "bad".to_string()
        )));
        assert!(!is_retriable_backend_error(&BackendError::NotFound(
            "nope".to_string()
        )));
    }

    // After forwarding 4 events in batches of 2, the checkpoint on disk is 4.
    #[test]
    fn forward_persists_checkpoint_per_batch() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..4).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig {
            batch_size: 2,
            ..ForwarderConfig::default()
        };

        forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(read_checkpoint(&ito_path), 4);
    }

    // A cloned `ForwardResult` compares equal to the original.
    #[test]
    fn forward_result_equality() {
        let a = ForwardResult {
            forwarded: 5,
            duplicates: 0,
            failed_batches: 0,
            total_local: 5,
            new_offset: 5,
        };
        let b = a.clone();
        assert_eq!(a, b);
    }
}