Skip to main content

ito_core/
event_forwarder.rs

1//! Client-side forwarding of locally produced audit events to the backend.
2//!
3//! The forwarder reads new events from the local JSONL audit log, batches
4//! them, and submits each batch to the backend event ingest endpoint with
5//! an idempotency key so retries are safe. A checkpoint file under
6//! `.ito/.state/` tracks the last forwarded line offset to avoid
7//! re-sending the entire log on each invocation.
8
9use std::path::Path;
10use std::thread;
11use std::time::Duration;
12
13use ito_domain::audit::event::AuditEvent;
14use ito_domain::backend::{BackendError, BackendEventIngestClient, EventBatch};
15
16use crate::backend_client::{idempotency_key, is_retriable_status};
17use crate::errors::{CoreError, CoreResult};
18
/// Upper bound on how many events go into a single ingest request.
const DEFAULT_BATCH_SIZE: usize = 100;

/// Name of the offset checkpoint file, kept under `.ito/.state/`.
const CHECKPOINT_FILE: &str = "event-forward-offset";

/// Summary of a single forwarding run, surfaced in CLI diagnostics.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ForwardResult {
    /// Events the backend accepted during this run.
    pub forwarded: usize,
    /// Events the backend reported as already ingested.
    pub duplicates: usize,
    /// Batches that could not be delivered (permanent error, or a transient
    /// error that outlasted the retry budget).
    pub failed_batches: usize,
    /// Number of events currently present in the local audit log.
    pub total_local: usize,
    /// Checkpoint offset at the end of the run: how many leading events of
    /// the local log have been forwarded so far.
    pub new_offset: usize,
}

/// Tuning knobs for the event forwarder.
#[derive(Clone, Debug)]
pub struct ForwarderConfig {
    /// Largest number of events sent per ingest request.
    pub batch_size: usize,
    /// How many extra attempts a batch gets after a transient failure.
    pub max_retries: u32,
    /// Backoff before the first retry; doubled for each subsequent retry.
    pub retry_base_delay: Duration,
}

impl Default for ForwarderConfig {
    /// Batches of [`DEFAULT_BATCH_SIZE`] events, up to 3 retries, and a
    /// 500 ms initial backoff.
    fn default() -> Self {
        let retry_base_delay = Duration::from_millis(500);
        ForwarderConfig {
            batch_size: DEFAULT_BATCH_SIZE,
            max_retries: 3,
            retry_base_delay,
        }
    }
}
60
61/// Forward new local audit events to the backend.
62///
63/// Reads the audit log, skips events already forwarded (tracked by offset),
64/// batches new events, and submits each batch with an idempotency key.
65/// Updates the checkpoint file on success.
66///
67/// This function is designed to be called best-effort from the CLI after
68/// command completion in backend mode.
69pub fn forward_events(
70    ingest_client: &dyn BackendEventIngestClient,
71    ito_path: &Path,
72    config: &ForwarderConfig,
73) -> CoreResult<ForwardResult> {
74    let all_events = read_all_events(ito_path);
75    let total_local = all_events.len();
76
77    let mut current_offset = read_checkpoint(ito_path);
78    if current_offset > total_local {
79        current_offset = total_local;
80    }
81
82    if current_offset >= total_local {
83        return Ok(ForwardResult {
84            forwarded: 0,
85            duplicates: 0,
86            failed_batches: 0,
87            total_local,
88            new_offset: current_offset,
89        });
90    }
91
92    let batch_size = config.batch_size.max(1);
93    let new_events = &all_events[current_offset..];
94    let mut forwarded = 0usize;
95    let mut duplicates = 0usize;
96    let mut failed_batches = 0usize;
97    let mut offset = current_offset;
98
99    for chunk in new_events.chunks(batch_size) {
100        let batch = EventBatch {
101            events: chunk.to_vec(),
102            idempotency_key: idempotency_key("event-forward"),
103        };
104
105        match submit_with_retry(ingest_client, &batch, config) {
106            Ok(result) => {
107                forwarded += result.accepted;
108                duplicates += result.duplicates;
109                offset += chunk.len();
110                // Persist offset after each successful batch
111                if let Err(e) = write_checkpoint(ito_path, offset) {
112                    tracing::warn!("failed to write forwarding checkpoint: {e}");
113                }
114            }
115            Err(e) => {
116                tracing::warn!("event forwarding batch failed: {e}");
117                failed_batches += 1;
118                // Stop forwarding on failure to preserve ordering
119                break;
120            }
121        }
122    }
123
124    Ok(ForwardResult {
125        forwarded,
126        duplicates,
127        failed_batches,
128        total_local,
129        new_offset: offset,
130    })
131}
132
133/// Submit a batch with bounded retries on transient failures.
134fn submit_with_retry(
135    client: &dyn BackendEventIngestClient,
136    batch: &EventBatch,
137    config: &ForwarderConfig,
138) -> CoreResult<ito_domain::backend::EventIngestResult> {
139    let mut attempts = 0u32;
140    loop {
141        match client.ingest(batch) {
142            Ok(result) => return Ok(result),
143            Err(err) => {
144                attempts += 1;
145                if !is_retriable_backend_error(&err) || attempts > config.max_retries {
146                    return Err(backend_ingest_error_to_core(err));
147                }
148                // Exponential backoff
149                let delay = config.retry_base_delay * 2u32.saturating_pow(attempts - 1);
150                thread::sleep(delay);
151            }
152        }
153    }
154}
155
156/// Check if a backend error is transient and worth retrying.
157fn is_retriable_backend_error(err: &BackendError) -> bool {
158    match err {
159        BackendError::Unavailable(_) => true,
160        BackendError::Other(msg) => {
161            // Check for HTTP status codes embedded in the message
162            if let Some(code_str) = msg.strip_prefix("HTTP ")
163                && let Ok(code) = code_str
164                    .chars()
165                    .take_while(|c| c.is_ascii_digit())
166                    .collect::<String>()
167                    .parse::<u16>()
168            {
169                return is_retriable_status(code);
170            }
171            false
172        }
173        BackendError::Unauthorized(_) => false,
174        BackendError::NotFound(_) => false,
175        BackendError::LeaseConflict(_) => false,
176        BackendError::RevisionConflict(_) => false,
177    }
178}
179
180/// Convert a backend ingest error to a `CoreError`.
181fn backend_ingest_error_to_core(err: BackendError) -> CoreError {
182    match err {
183        BackendError::Unavailable(msg) => CoreError::process(format!(
184            "Backend unavailable during event forwarding: {msg}"
185        )),
186        BackendError::Unauthorized(msg) => CoreError::validation(format!(
187            "Backend auth failed during event forwarding: {msg}"
188        )),
189        BackendError::NotFound(msg) => {
190            CoreError::not_found(format!("Backend ingest endpoint not found: {msg}"))
191        }
192        BackendError::Other(msg) => {
193            CoreError::process(format!("Backend error during event forwarding: {msg}"))
194        }
195        BackendError::LeaseConflict(c) => CoreError::process(format!(
196            "Unexpected lease conflict during event forwarding: {}",
197            c.change_id
198        )),
199        BackendError::RevisionConflict(c) => CoreError::process(format!(
200            "Unexpected revision conflict during event forwarding: {}",
201            c.change_id
202        )),
203    }
204}
205
206// ── Checkpoint persistence ─────────────────────────────────────────
207
208/// Path to the forwarding checkpoint file.
209fn checkpoint_path(ito_path: &Path) -> std::path::PathBuf {
210    ito_path.join(".state").join(CHECKPOINT_FILE)
211}
212
213/// Read the current forwarding offset from the checkpoint file.
214///
215/// Returns 0 if the file does not exist or is malformed.
216fn read_checkpoint(ito_path: &Path) -> usize {
217    let path = checkpoint_path(ito_path);
218    let Ok(content) = std::fs::read_to_string(&path) else {
219        return 0;
220    };
221    content.trim().parse::<usize>().unwrap_or(0)
222}
223
224/// Write the forwarding offset to the checkpoint file.
225fn write_checkpoint(ito_path: &Path, offset: usize) -> CoreResult<()> {
226    let path = checkpoint_path(ito_path);
227    if let Some(parent) = path.parent() {
228        std::fs::create_dir_all(parent)
229            .map_err(|e| CoreError::io("creating checkpoint directory", e))?;
230    }
231    std::fs::write(&path, offset.to_string())
232        .map_err(|e| CoreError::io("writing forwarding checkpoint", e))
233}
234
235// ── Event reading ──────────────────────────────────────────────────
236
237/// Read all audit events from the routed local audit store.
238fn read_all_events(ito_path: &Path) -> Vec<AuditEvent> {
239    crate::audit::default_audit_store(ito_path).read_all()
240}
241
#[cfg(test)]
mod tests {
    use super::*;
    use ito_domain::audit::event::{EventContext, SCHEMA_VERSION};
    use ito_domain::backend::{BackendError, EventIngestResult};
    use std::path::Path;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Run `git <args>` inside `repo`, failing the test with the captured
    /// stdout/stderr if the command exits unsuccessfully.
    ///
    /// Clears `GIT_DIR`/`GIT_WORK_TREE` so inherited environment variables
    /// cannot point git at a different repository.
    fn run_git(repo: &Path, args: &[&str]) {
        let output = std::process::Command::new("git")
            .args(args)
            .current_dir(repo)
            .env_remove("GIT_DIR")
            .env_remove("GIT_WORK_TREE")
            .output()
            .expect("git should run");
        assert!(
            output.status.success(),
            "git command failed: git {}\nstdout:\n{}\nstderr:\n{}",
            args.join(" "),
            String::from_utf8_lossy(&output.stdout),
            String::from_utf8_lossy(&output.stderr)
        );
    }

    /// Initialise a git repository in `repo` with one commit and a test
    /// identity, with commit signing disabled.
    fn init_git_repo(repo: &Path) {
        run_git(repo, &["init"]);
        run_git(repo, &["config", "user.email", "test@example.com"]);
        run_git(repo, &["config", "user.name", "Test User"]);
        run_git(repo, &["config", "commit.gpgsign", "false"]);
        std::fs::write(repo.join("README.md"), "hi\n").expect("write readme");
        run_git(repo, &["add", "README.md"]);
        run_git(repo, &["commit", "-m", "initial"]);
    }

    /// Build a task-creation audit event with entity id `id`.
    fn make_event(id: &str) -> AuditEvent {
        AuditEvent {
            v: SCHEMA_VERSION,
            ts: "2026-02-28T10:00:00.000Z".to_string(),
            entity: "task".to_string(),
            entity_id: id.to_string(),
            scope: Some("test-change".to_string()),
            op: "create".to_string(),
            from: None,
            to: Some("pending".to_string()),
            actor: "cli".to_string(),
            by: "@test".to_string(),
            meta: None,
            ctx: EventContext {
                session_id: "test-sid".to_string(),
                harness_session_id: None,
                branch: None,
                worktree: None,
                commit: None,
            },
        }
    }

    /// A fake ingest client that records calls and returns configurable results.
    ///
    /// The i-th call returns `results[i]`. Calls past the end of `results`
    /// accept the whole batch.
    struct FakeIngestClient {
        call_count: AtomicUsize,
        results: Mutex<Vec<Result<EventIngestResult, BackendError>>>,
    }

    impl FakeIngestClient {
        fn always_ok() -> Self {
            Self {
                call_count: AtomicUsize::new(0),
                results: Mutex::new(Vec::new()),
            }
        }

        fn with_results(results: Vec<Result<EventIngestResult, BackendError>>) -> Self {
            Self {
                call_count: AtomicUsize::new(0),
                results: Mutex::new(results),
            }
        }

        fn calls(&self) -> usize {
            self.call_count.load(Ordering::SeqCst)
        }
    }

    impl BackendEventIngestClient for FakeIngestClient {
        fn ingest(&self, batch: &EventBatch) -> Result<EventIngestResult, BackendError> {
            let idx = self.call_count.fetch_add(1, Ordering::SeqCst);
            let results = self.results.lock().unwrap();
            if idx < results.len() {
                return results[idx].clone();
            }
            // Default: accept all
            Ok(EventIngestResult {
                accepted: batch.events.len(),
                duplicates: 0,
            })
        }
    }

    /// Append `events` to the default local audit store for `ito_path`.
    fn write_events_to_log(ito_path: &Path, events: &[AuditEvent]) {
        let writer = crate::audit::default_audit_store(ito_path);
        for event in events {
            crate::audit::AuditWriter::append(writer.as_ref(), event).unwrap();
        }
    }

    #[test]
    fn forward_reads_events_from_routed_local_store() {
        let tmp = tempfile::tempdir().unwrap();
        init_git_repo(tmp.path());
        let ito_path = tmp.path().join(".ito");
        std::fs::create_dir_all(&ito_path).unwrap();

        let store = crate::audit::default_audit_store(&ito_path);
        crate::audit::AuditWriter::append(store.as_ref(), &make_event("1.1")).unwrap();

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig::default();
        let result = forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(result.forwarded, 1);
        assert_eq!(result.total_local, 1);
        assert_eq!(client.calls(), 1);
    }

    #[test]
    fn forward_no_events_returns_zero() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");
        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig::default();

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 0);
        assert_eq!(result.total_local, 0);
        assert_eq!(result.new_offset, 0);
        assert_eq!(result.failed_batches, 0);
        assert_eq!(client.calls(), 0);
    }

    #[test]
    fn forward_sends_all_new_events() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..5).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig {
            batch_size: 10,
            ..ForwarderConfig::default()
        };

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 5);
        assert_eq!(result.total_local, 5);
        assert_eq!(result.new_offset, 5);
        assert_eq!(result.failed_batches, 0);
        assert_eq!(client.calls(), 1); // All in one batch
    }

    #[test]
    fn forward_respects_checkpoint() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..5).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        // Pre-set checkpoint at 3 (already forwarded 3 events)
        write_checkpoint(&ito_path, 3).unwrap();

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig::default();

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 2); // Only events 3 and 4
        assert_eq!(result.new_offset, 5);
    }

    #[test]
    fn forward_skips_when_fully_forwarded() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..3).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);
        write_checkpoint(&ito_path, 3).unwrap();

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig::default();

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 0);
        assert_eq!(result.new_offset, 3);
        assert_eq!(client.calls(), 0);
    }

    #[test]
    fn forward_batches_correctly() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..7).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig {
            batch_size: 3,
            ..ForwarderConfig::default()
        };

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 7);
        assert_eq!(client.calls(), 3); // 3 + 3 + 1
    }

    #[test]
    fn forward_stops_on_permanent_failure() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..6).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        let client = FakeIngestClient::with_results(vec![
            Ok(EventIngestResult {
                accepted: 3,
                duplicates: 0,
            }),
            Err(BackendError::Unauthorized("bad token".to_string())),
        ]);
        let config = ForwarderConfig {
            batch_size: 3,
            max_retries: 0,
            retry_base_delay: Duration::from_millis(1),
        };

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 3); // First batch succeeded
        assert_eq!(result.failed_batches, 1);
        assert_eq!(result.new_offset, 3); // Checkpoint at first batch end
    }

    #[test]
    fn forward_retries_transient_failure() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..2).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        let client = FakeIngestClient::with_results(vec![
            Err(BackendError::Unavailable("timeout".to_string())),
            Ok(EventIngestResult {
                accepted: 2,
                duplicates: 0,
            }),
        ]);
        let config = ForwarderConfig {
            batch_size: 10,
            max_retries: 3,
            retry_base_delay: Duration::from_millis(1),
        };

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 2);
        assert_eq!(result.failed_batches, 0);
        assert_eq!(client.calls(), 2); // 1 retry + 1 success
    }

    #[test]
    fn forward_gives_up_after_max_retries() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..2).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        let client = FakeIngestClient::with_results(vec![
            Err(BackendError::Unavailable("down".to_string())),
            Err(BackendError::Unavailable("down".to_string())),
            Err(BackendError::Unavailable("down".to_string())),
        ]);
        let config = ForwarderConfig {
            batch_size: 10,
            max_retries: 2,
            retry_base_delay: Duration::from_millis(1),
        };

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 0);
        assert_eq!(result.failed_batches, 1);
        assert_eq!(result.new_offset, 0);
        assert_eq!(client.calls(), 3); // initial attempt + 2 retries
        // Nothing succeeded, so no checkpoint should have been persisted.
        assert_eq!(read_checkpoint(&ito_path), 0);
    }

    #[test]
    fn forward_reports_duplicates() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..3).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        let client = FakeIngestClient::with_results(vec![Ok(EventIngestResult {
            accepted: 1,
            duplicates: 2,
        })]);
        let config = ForwarderConfig::default();

        let result = forward_events(&client, &ito_path, &config).unwrap();
        assert_eq!(result.forwarded, 1);
        assert_eq!(result.duplicates, 2);
    }

    #[test]
    fn checkpoint_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        assert_eq!(read_checkpoint(&ito_path), 0);

        write_checkpoint(&ito_path, 42).unwrap();
        assert_eq!(read_checkpoint(&ito_path), 42);

        write_checkpoint(&ito_path, 100).unwrap();
        assert_eq!(read_checkpoint(&ito_path), 100);
    }

    #[test]
    fn checkpoint_missing_returns_zero() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");
        assert_eq!(read_checkpoint(&ito_path), 0);
    }

    #[test]
    fn checkpoint_malformed_returns_zero() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");
        let path = checkpoint_path(&ito_path);
        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
        std::fs::write(&path, "not-a-number").unwrap();
        assert_eq!(read_checkpoint(&ito_path), 0);
    }

    #[test]
    fn checkpoint_write_leaves_only_checkpoint_file() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        write_checkpoint(&ito_path, 7).unwrap();
        write_checkpoint(&ito_path, 8).unwrap();

        let entries: Vec<std::ffi::OsString> = std::fs::read_dir(ito_path.join(".state"))
            .unwrap()
            .map(|entry| entry.unwrap().file_name())
            .collect();
        assert_eq!(entries, vec![std::ffi::OsString::from(CHECKPOINT_FILE)]);
        assert_eq!(read_checkpoint(&ito_path), 8);
    }

    #[test]
    fn is_retriable_backend_error_checks() {
        assert!(is_retriable_backend_error(&BackendError::Unavailable(
            "timeout".to_string()
        )));
        assert!(!is_retriable_backend_error(&BackendError::Unauthorized(
            "bad".to_string()
        )));
        assert!(!is_retriable_backend_error(&BackendError::NotFound(
            "nope".to_string()
        )));
        // `Other` without an "HTTP <digits>" prefix is never retried.
        assert!(!is_retriable_backend_error(&BackendError::Other(
            "boom".to_string()
        )));
        assert!(!is_retriable_backend_error(&BackendError::Other(
            "HTTP abc".to_string()
        )));
    }

    #[test]
    fn forward_persists_checkpoint_per_batch() {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");

        let events: Vec<AuditEvent> = (0..4).map(|i| make_event(&format!("1.{i}"))).collect();
        write_events_to_log(&ito_path, &events);

        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig {
            batch_size: 2,
            ..ForwarderConfig::default()
        };

        forward_events(&client, &ito_path, &config).unwrap();
        // Checkpoint should be at 4 after all batches
        assert_eq!(read_checkpoint(&ito_path), 4);
    }

    #[test]
    fn forward_result_equality() {
        let a = ForwardResult {
            forwarded: 5,
            duplicates: 0,
            failed_batches: 0,
            total_local: 5,
            new_offset: 5,
        };
        let b = a.clone();
        assert_eq!(a, b);
    }
}