Skip to main content

ito_core/
event_forwarder.rs

//! Client-side forwarding of locally produced audit events to the backend.
//!
//! The forwarder reads the local JSONL audit log, batches the events that
//! have not been forwarded yet, and submits each batch to the backend event
//! ingest endpoint with an idempotency key so retries are safe. A checkpoint
//! file under `.ito/.state/` records how many parsed events have already
//! been forwarded, to avoid re-sending the entire log on each invocation.
8
9use std::path::Path;
10use std::thread;
11use std::time::Duration;
12
13use ito_domain::audit::event::AuditEvent;
14use ito_domain::backend::{BackendError, BackendEventIngestClient, EventBatch};
15
16use crate::backend_client::{idempotency_key, is_retriable_status};
17use crate::errors::{CoreError, CoreResult};
18
/// Batch size used by `ForwarderConfig::default()`: the largest number of
/// events placed in a single ingest request.
const DEFAULT_BATCH_SIZE: usize = 100;

/// File name of the forwarding checkpoint, stored inside the `.state`
/// directory of the ito path.
const CHECKPOINT_FILE: &str = "event-forward-offset";
24
/// Summary of one forwarding run, intended for CLI diagnostics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForwardResult {
    /// Sum of the `accepted` counts reported by the backend for every
    /// batch submitted successfully during this run.
    pub forwarded: usize,
    /// Sum of the `duplicates` counts reported by the backend, i.e. events
    /// it had already seen.
    pub duplicates: usize,
    /// Batches that could not be delivered (after retries). Forwarding
    /// stops at the first such batch.
    pub failed_batches: usize,
    /// Number of events read from the local audit log.
    pub total_local: usize,
    /// Checkpoint offset at the end of the run: how many local events are
    /// considered forwarded so far.
    pub new_offset: usize,
}
39
/// Tunable knobs for a forwarding run.
#[derive(Debug, Clone)]
pub struct ForwarderConfig {
    /// Upper bound on the number of events placed in one batch. A value of
    /// zero is treated as one by `forward_events`.
    pub batch_size: usize,
    /// How many times a batch is re-submitted after a transient failure
    /// before it is counted as failed.
    pub max_retries: u32,
    /// Delay before the first retry; each further retry doubles it.
    pub retry_base_delay: Duration,
}
50
51impl Default for ForwarderConfig {
52    fn default() -> Self {
53        Self {
54            batch_size: DEFAULT_BATCH_SIZE,
55            max_retries: 3,
56            retry_base_delay: Duration::from_millis(500),
57        }
58    }
59}
60
61/// Forward new local audit events to the backend.
62///
63/// Reads the audit log, skips events already forwarded (tracked by offset),
64/// batches new events, and submits each batch with an idempotency key.
65/// Updates the checkpoint file on success.
66///
67/// This function is designed to be called best-effort from the CLI after
68/// command completion in backend mode.
69pub fn forward_events(
70    ingest_client: &dyn BackendEventIngestClient,
71    ito_path: &Path,
72    config: &ForwarderConfig,
73) -> CoreResult<ForwardResult> {
74    let all_events = read_all_events(ito_path);
75    let total_local = all_events.len();
76
77    let mut current_offset = read_checkpoint(ito_path);
78    if current_offset > total_local {
79        current_offset = total_local;
80    }
81
82    if current_offset >= total_local {
83        return Ok(ForwardResult {
84            forwarded: 0,
85            duplicates: 0,
86            failed_batches: 0,
87            total_local,
88            new_offset: current_offset,
89        });
90    }
91
92    let batch_size = config.batch_size.max(1);
93    let new_events = &all_events[current_offset..];
94    let mut forwarded = 0usize;
95    let mut duplicates = 0usize;
96    let mut failed_batches = 0usize;
97    let mut offset = current_offset;
98
99    for chunk in new_events.chunks(batch_size) {
100        let batch = EventBatch {
101            events: chunk.to_vec(),
102            idempotency_key: idempotency_key("event-forward"),
103        };
104
105        match submit_with_retry(ingest_client, &batch, config) {
106            Ok(result) => {
107                forwarded += result.accepted;
108                duplicates += result.duplicates;
109                offset += chunk.len();
110                // Persist offset after each successful batch
111                if let Err(e) = write_checkpoint(ito_path, offset) {
112                    tracing::warn!("failed to write forwarding checkpoint: {e}");
113                }
114            }
115            Err(e) => {
116                tracing::warn!("event forwarding batch failed: {e}");
117                failed_batches += 1;
118                // Stop forwarding on failure to preserve ordering
119                break;
120            }
121        }
122    }
123
124    Ok(ForwardResult {
125        forwarded,
126        duplicates,
127        failed_batches,
128        total_local,
129        new_offset: offset,
130    })
131}
132
133/// Submit a batch with bounded retries on transient failures.
134fn submit_with_retry(
135    client: &dyn BackendEventIngestClient,
136    batch: &EventBatch,
137    config: &ForwarderConfig,
138) -> CoreResult<ito_domain::backend::EventIngestResult> {
139    let mut attempts = 0u32;
140    loop {
141        match client.ingest(batch) {
142            Ok(result) => return Ok(result),
143            Err(err) => {
144                attempts += 1;
145                if !is_retriable_backend_error(&err) || attempts > config.max_retries {
146                    return Err(backend_ingest_error_to_core(err));
147                }
148                // Exponential backoff
149                let delay = config.retry_base_delay * 2u32.saturating_pow(attempts - 1);
150                thread::sleep(delay);
151            }
152        }
153    }
154}
155
156/// Check if a backend error is transient and worth retrying.
157fn is_retriable_backend_error(err: &BackendError) -> bool {
158    match err {
159        BackendError::Unavailable(_) => true,
160        BackendError::Other(msg) => {
161            // Check for HTTP status codes embedded in the message
162            if let Some(code_str) = msg.strip_prefix("HTTP ")
163                && let Ok(code) = code_str
164                    .chars()
165                    .take_while(|c| c.is_ascii_digit())
166                    .collect::<String>()
167                    .parse::<u16>()
168            {
169                return is_retriable_status(code);
170            }
171            false
172        }
173        BackendError::Unauthorized(_) => false,
174        BackendError::NotFound(_) => false,
175        BackendError::LeaseConflict(_) => false,
176        BackendError::RevisionConflict(_) => false,
177    }
178}
179
180/// Convert a backend ingest error to a `CoreError`.
181fn backend_ingest_error_to_core(err: BackendError) -> CoreError {
182    match err {
183        BackendError::Unavailable(msg) => CoreError::process(format!(
184            "Backend unavailable during event forwarding: {msg}"
185        )),
186        BackendError::Unauthorized(msg) => CoreError::validation(format!(
187            "Backend auth failed during event forwarding: {msg}"
188        )),
189        BackendError::NotFound(msg) => {
190            CoreError::not_found(format!("Backend ingest endpoint not found: {msg}"))
191        }
192        BackendError::Other(msg) => {
193            CoreError::process(format!("Backend error during event forwarding: {msg}"))
194        }
195        BackendError::LeaseConflict(c) => CoreError::process(format!(
196            "Unexpected lease conflict during event forwarding: {}",
197            c.change_id
198        )),
199        BackendError::RevisionConflict(c) => CoreError::process(format!(
200            "Unexpected revision conflict during event forwarding: {}",
201            c.change_id
202        )),
203    }
204}
205
206// ── Checkpoint persistence ─────────────────────────────────────────
207
208/// Path to the forwarding checkpoint file.
209fn checkpoint_path(ito_path: &Path) -> std::path::PathBuf {
210    ito_path.join(".state").join(CHECKPOINT_FILE)
211}
212
213/// Read the current forwarding offset from the checkpoint file.
214///
215/// Returns 0 if the file does not exist or is malformed.
216fn read_checkpoint(ito_path: &Path) -> usize {
217    let path = checkpoint_path(ito_path);
218    let Ok(content) = std::fs::read_to_string(&path) else {
219        return 0;
220    };
221    content.trim().parse::<usize>().unwrap_or(0)
222}
223
224/// Write the forwarding offset to the checkpoint file.
225fn write_checkpoint(ito_path: &Path, offset: usize) -> CoreResult<()> {
226    let path = checkpoint_path(ito_path);
227    if let Some(parent) = path.parent() {
228        std::fs::create_dir_all(parent)
229            .map_err(|e| CoreError::io("creating checkpoint directory", e))?;
230    }
231    std::fs::write(&path, offset.to_string())
232        .map_err(|e| CoreError::io("writing forwarding checkpoint", e))
233}
234
235// ── Event reading ──────────────────────────────────────────────────
236
237/// Read all audit events from the local JSONL log.
238///
239/// Returns an empty vec if the log does not exist. Malformed lines are
240/// skipped (same behavior as `audit::reader::read_audit_events`).
241fn read_all_events(ito_path: &Path) -> Vec<AuditEvent> {
242    crate::audit::read_audit_events(ito_path)
243}
244
#[cfg(test)]
mod tests {
    use super::*;
    use ito_domain::audit::event::{EventContext, SCHEMA_VERSION};
    use ito_domain::backend::{BackendError, EventIngestResult};
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Build a minimal task-creation event; only `entity_id` varies.
    fn make_event(id: &str) -> AuditEvent {
        let ctx = EventContext {
            session_id: "test-sid".to_string(),
            harness_session_id: None,
            branch: None,
            worktree: None,
            commit: None,
        };
        AuditEvent {
            v: SCHEMA_VERSION,
            ts: "2026-02-28T10:00:00.000Z".to_string(),
            entity: "task".to_string(),
            entity_id: id.to_string(),
            scope: Some("test-change".to_string()),
            op: "create".to_string(),
            from: None,
            to: Some("pending".to_string()),
            actor: "cli".to_string(),
            by: "@test".to_string(),
            meta: None,
            ctx,
        }
    }

    /// `count` events with entity ids `1.0`, `1.1`, ...
    fn numbered_events(count: usize) -> Vec<AuditEvent> {
        (0..count).map(|n| make_event(&format!("1.{n}"))).collect()
    }

    /// Fresh temp directory plus the `.ito` path inside it. The returned
    /// `TempDir` must be kept alive for as long as the path is used.
    fn scratch_ito() -> (tempfile::TempDir, std::path::PathBuf) {
        let tmp = tempfile::tempdir().unwrap();
        let ito_path = tmp.path().join(".ito");
        (tmp, ito_path)
    }

    /// Like `scratch_ito`, with `count` events already appended to the log.
    fn seeded_ito(count: usize) -> (tempfile::TempDir, std::path::PathBuf) {
        let (tmp, ito_path) = scratch_ito();
        write_events_to_log(&ito_path, &numbered_events(count));
        (tmp, ito_path)
    }

    /// Fake ingest client: counts calls and replays scripted results in
    /// call order, accepting everything once the script runs out.
    struct FakeIngestClient {
        call_count: AtomicUsize,
        results: Mutex<Vec<Result<EventIngestResult, BackendError>>>,
    }

    impl FakeIngestClient {
        /// Client with an empty script: every call accepts the whole batch.
        fn always_ok() -> Self {
            Self::with_results(Vec::new())
        }

        /// Client whose n-th call returns `results[n]`.
        fn with_results(results: Vec<Result<EventIngestResult, BackendError>>) -> Self {
            Self {
                call_count: AtomicUsize::new(0),
                results: Mutex::new(results),
            }
        }

        /// Number of `ingest` calls made so far.
        fn calls(&self) -> usize {
            self.call_count.load(Ordering::SeqCst)
        }
    }

    impl BackendEventIngestClient for FakeIngestClient {
        fn ingest(&self, batch: &EventBatch) -> Result<EventIngestResult, BackendError> {
            let call_index = self.call_count.fetch_add(1, Ordering::SeqCst);
            let scripted = self.results.lock().unwrap();
            match scripted.get(call_index) {
                Some(outcome) => outcome.clone(),
                // Script exhausted: accept the whole batch.
                None => Ok(EventIngestResult {
                    accepted: batch.events.len(),
                    duplicates: 0,
                }),
            }
        }
    }

    /// Append `events` to the audit log under `ito_path`.
    fn write_events_to_log(ito_path: &Path, events: &[AuditEvent]) {
        let writer = crate::audit::FsAuditWriter::new(ito_path);
        events.iter().for_each(|event| {
            crate::audit::AuditWriter::append(&writer, event).unwrap();
        });
    }

    #[test]
    fn forward_no_events_returns_zero() {
        let (_tmp, ito_path) = scratch_ito();
        let client = FakeIngestClient::always_ok();

        let outcome = forward_events(&client, &ito_path, &ForwarderConfig::default()).unwrap();

        assert_eq!(outcome.forwarded, 0);
        assert_eq!(outcome.total_local, 0);
        assert_eq!(outcome.new_offset, 0);
        assert_eq!(outcome.failed_batches, 0);
        assert_eq!(client.calls(), 0);
    }

    #[test]
    fn forward_sends_all_new_events() {
        let (_tmp, ito_path) = seeded_ito(5);
        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig {
            batch_size: 10,
            ..ForwarderConfig::default()
        };

        let outcome = forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(outcome.forwarded, 5);
        assert_eq!(outcome.total_local, 5);
        assert_eq!(outcome.new_offset, 5);
        assert_eq!(outcome.failed_batches, 0);
        // Five events fit into a single batch of ten.
        assert_eq!(client.calls(), 1);
    }

    #[test]
    fn forward_respects_checkpoint() {
        let (_tmp, ito_path) = seeded_ito(5);
        // Pretend the first three events were forwarded by an earlier run.
        write_checkpoint(&ito_path, 3).unwrap();
        let client = FakeIngestClient::always_ok();

        let outcome = forward_events(&client, &ito_path, &ForwarderConfig::default()).unwrap();

        // Only the events at indices 3 and 4 remain.
        assert_eq!(outcome.forwarded, 2);
        assert_eq!(outcome.new_offset, 5);
    }

    #[test]
    fn forward_skips_when_fully_forwarded() {
        let (_tmp, ito_path) = seeded_ito(3);
        write_checkpoint(&ito_path, 3).unwrap();
        let client = FakeIngestClient::always_ok();

        let outcome = forward_events(&client, &ito_path, &ForwarderConfig::default()).unwrap();

        assert_eq!(outcome.forwarded, 0);
        assert_eq!(outcome.new_offset, 3);
        assert_eq!(client.calls(), 0);
    }

    #[test]
    fn forward_batches_correctly() {
        let (_tmp, ito_path) = seeded_ito(7);
        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig {
            batch_size: 3,
            ..ForwarderConfig::default()
        };

        let outcome = forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(outcome.forwarded, 7);
        // Seven events in batches of three: 3 + 3 + 1.
        assert_eq!(client.calls(), 3);
    }

    #[test]
    fn forward_stops_on_permanent_failure() {
        let (_tmp, ito_path) = seeded_ito(6);
        let script = vec![
            Ok(EventIngestResult {
                accepted: 3,
                duplicates: 0,
            }),
            Err(BackendError::Unauthorized("bad token".to_string())),
        ];
        let client = FakeIngestClient::with_results(script);
        let config = ForwarderConfig {
            batch_size: 3,
            max_retries: 0,
            retry_base_delay: Duration::from_millis(1),
        };

        let outcome = forward_events(&client, &ito_path, &config).unwrap();

        // The first batch went through; the second failed permanently.
        assert_eq!(outcome.forwarded, 3);
        assert_eq!(outcome.failed_batches, 1);
        // The offset stays at the end of the first batch.
        assert_eq!(outcome.new_offset, 3);
    }

    #[test]
    fn forward_retries_transient_failure() {
        let (_tmp, ito_path) = seeded_ito(2);
        let script = vec![
            Err(BackendError::Unavailable("timeout".to_string())),
            Ok(EventIngestResult {
                accepted: 2,
                duplicates: 0,
            }),
        ];
        let client = FakeIngestClient::with_results(script);
        let config = ForwarderConfig {
            batch_size: 10,
            max_retries: 3,
            retry_base_delay: Duration::from_millis(1),
        };

        let outcome = forward_events(&client, &ito_path, &config).unwrap();

        assert_eq!(outcome.forwarded, 2);
        assert_eq!(outcome.failed_batches, 0);
        // One failed attempt followed by one successful retry.
        assert_eq!(client.calls(), 2);
    }

    #[test]
    fn forward_reports_duplicates() {
        let (_tmp, ito_path) = seeded_ito(3);
        let script = vec![Ok(EventIngestResult {
            accepted: 1,
            duplicates: 2,
        })];
        let client = FakeIngestClient::with_results(script);

        let outcome = forward_events(&client, &ito_path, &ForwarderConfig::default()).unwrap();

        assert_eq!(outcome.forwarded, 1);
        assert_eq!(outcome.duplicates, 2);
    }

    #[test]
    fn checkpoint_roundtrip() {
        let (_tmp, ito_path) = scratch_ito();

        assert_eq!(read_checkpoint(&ito_path), 0);

        for offset in [42usize, 100] {
            write_checkpoint(&ito_path, offset).unwrap();
            assert_eq!(read_checkpoint(&ito_path), offset);
        }
    }

    #[test]
    fn checkpoint_missing_returns_zero() {
        let (_tmp, ito_path) = scratch_ito();
        assert_eq!(read_checkpoint(&ito_path), 0);
    }

    #[test]
    fn is_retriable_backend_error_checks() {
        let transient = BackendError::Unavailable("timeout".to_string());
        let unauthorized = BackendError::Unauthorized("bad".to_string());
        let missing = BackendError::NotFound("nope".to_string());

        assert!(is_retriable_backend_error(&transient));
        assert!(!is_retriable_backend_error(&unauthorized));
        assert!(!is_retriable_backend_error(&missing));
    }

    #[test]
    fn forward_persists_checkpoint_per_batch() {
        let (_tmp, ito_path) = seeded_ito(4);
        let client = FakeIngestClient::always_ok();
        let config = ForwarderConfig {
            batch_size: 2,
            ..ForwarderConfig::default()
        };

        forward_events(&client, &ito_path, &config).unwrap();

        // Two batches of two: the stored checkpoint ends at 4.
        assert_eq!(read_checkpoint(&ito_path), 4);
    }

    #[test]
    fn forward_result_equality() {
        let original = ForwardResult {
            forwarded: 5,
            duplicates: 0,
            failed_batches: 0,
            total_local: 5,
            new_offset: 5,
        };
        let copy = original.clone();
        assert_eq!(original, copy);
    }
}