Skip to main content

aicx_progress_contracts/
lib.rs

1//! Rich event contracts for aicx pipeline progress.
2//!
3//! Vibecrafted. with AI Agents by VetCoders (c)2024-2026 LibraxisAI
4//!
5//! This crate is the canonical source of truth for the indexing/embedding
6//! progress event stream emitted by aicx pipelines. It is intentionally a
7//! pure-types crate (no async, no I/O, no UI deps) so that:
8//!
9//! - Producers (the `aicx index` scheduler, embedders, parsers) can emit
10//!   into a single sink trait without dragging UI deps.
11//! - Consumers (FanOut, IndicatifSink, future TUI, JSON-line exporter,
12//!   future SSE bridge) can subscribe to the same canonical stream.
13//! - The on-wire representation (serde JSON) is stable across crates and
14//!   binaries — agents and IPC peers can decode without linking the
15//!   producer crate.
16//!
17//! The shape is salvaged from rust-memex's `tui::indexer::contracts`
18//! (`IndexEvent` + `IndexTelemetrySnapshot` + `IndexEventSink`) and
19//! generalized to aicx's "item" vocabulary (entries / chunks / embeddings
20//! depending on phase) rather than the file-only assumption rust-memex
21//! makes.
22
23use std::collections::VecDeque;
24use std::time::{Duration, Instant};
25
26use chrono::{DateTime, Utc};
27use serde::{Deserialize, Serialize};
28
/// Events emitted by the aicx indexing / embedding pipeline.
///
/// Items in aicx are intentionally generic: depending on the pipeline phase
/// an "item" may be a source entry (file/document), a chunk, or an
/// embedding batch. The event consumer interprets `label` as the
/// human-readable identifier for that item.
///
/// On the wire each event is an internally tagged JSON object: the variant
/// name in snake_case is stored under a `"type"` key next to the variant's
/// own fields (e.g. `{"type":"item_started","item_index":2,...}`). `Option`
/// fields marked `skip_serializing_if` are omitted entirely when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum IndexEvent {
    /// A run began. Folding this into an [`IndexTelemetrySnapshot`] resets
    /// all per-run state before recording the fields below.
    RunStarted {
        /// Number of items in the run (becomes the snapshot's `total`).
        total_items: usize,
        /// Copied verbatim into the snapshot's `namespace`.
        namespace: String,
        /// Copied verbatim into the snapshot's `source_label`.
        source_label: String,
        /// Parallelism at run start (becomes the snapshot's `parallelism`).
        parallelism: usize,
        /// Run start time, in UTC.
        started_at: DateTime<Utc>,
    },
    /// Processing of one item began.
    ItemStarted {
        /// Producer-assigned index identifying the item within the run.
        item_index: usize,
        /// Human-readable identifier of the item.
        label: String,
        /// Item size in bytes, when known; omitted from JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        size_bytes: Option<u64>,
    },
    /// An item was indexed successfully.
    ItemIndexed {
        /// Producer-assigned index identifying the item within the run.
        item_index: usize,
        /// Human-readable identifier of the item.
        label: String,
        /// Chunks produced for this item; summed into the snapshot's
        /// `total_chunks`.
        chunks_indexed: usize,
        /// Time spent on this item in milliseconds, as measured by the
        /// producer (presumably wall-clock — confirm with the emitter).
        duration_ms: u64,
        /// Embedder time for this item in milliseconds, when measured;
        /// folded into the snapshot's `avg_embedder_ms`.
        #[serde(skip_serializing_if = "Option::is_none")]
        embedder_ms: Option<u64>,
        /// Producer's token estimate for this item; summed into the
        /// snapshot's `total_tokens_estimated`.
        #[serde(skip_serializing_if = "Option::is_none")]
        tokens_estimated: Option<usize>,
        /// Opaque content hash, if the producer computed one.
        #[serde(skip_serializing_if = "Option::is_none")]
        content_hash: Option<String>,
    },
    /// An item was processed but deliberately not indexed.
    ItemSkipped {
        /// Producer-assigned index identifying the item within the run.
        item_index: usize,
        /// Human-readable identifier of the item.
        label: String,
        /// Free-form explanation (the tests use e.g. `"unsupported"`, `"dup"`).
        reason: String,
        /// Opaque content hash, if the producer computed one.
        #[serde(skip_serializing_if = "Option::is_none")]
        content_hash: Option<String>,
    },
    /// Processing of a single item failed. Folding this does not mark the
    /// run complete (contrast [`IndexEvent::RunFailed`]).
    ItemFailed {
        /// Producer-assigned index identifying the item within the run.
        item_index: usize,
        /// Human-readable identifier of the item.
        label: String,
        /// Error description for this item.
        error: String,
    },
    /// Periodic aggregate emitted by the producer. Its counters are
    /// authoritative: folding a tick overwrites the snapshot's
    /// incrementally maintained counters. Rate and ETA are expected to be
    /// measured by the producer (e.g. with [`RollingRate`]).
    StatsTick {
        processed: usize,
        indexed: usize,
        skipped: usize,
        failed: usize,
        total: usize,
        items_per_sec: f64,
        /// Estimated seconds remaining; omitted from JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        eta_secs: Option<f64>,
        total_chunks: usize,
        in_flight: usize,
    },
    /// The run finished. Carries final counters that overwrite the
    /// snapshot's values.
    RunCompleted {
        processed: usize,
        indexed: usize,
        skipped: usize,
        failed: usize,
        total_chunks: usize,
        /// Total run duration as reported by the producer.
        elapsed: Duration,
        /// Reported by the producer; presumably true when the run ended
        /// because of a stop request rather than exhausting its items.
        stopped_early: bool,
    },
    /// A fatal error aborted the run.
    RunFailed {
        /// Description of the fatal error.
        error: String,
        /// Replaces the snapshot's `processed` count.
        processed_before_failure: usize,
    },
    /// The producer paused processing.
    Paused,
    /// Processing resumed after a `Paused`.
    Resumed,
    /// A stop was requested; the run has not finished yet.
    StopRequested,
    /// Parallelism changed from `previous` to `current`.
    ParallelismChanged {
        previous: usize,
        current: usize,
    },
    /// Non-fatal diagnostic. `code` is a short identifier (the tests use
    /// `"embedder_slow"`), `message` is human-readable text. The event
    /// carries no timestamp of its own.
    Warning {
        code: String,
        message: String,
    },
}
112
113/// Event sinks must stay synchronous and infallible.
114///
115/// Producers call `on_event` from the hot path; sinks that need async
116/// (network, file I/O) must internally buffer or fan-out to a worker
117/// rather than blocking the producer.
118pub trait EventSink: Send + Sync {
119    fn on_event(&self, event: &IndexEvent);
120}
121
/// Maximum number of recent warnings retained in the rolling snapshot.
///
/// `IndexTelemetrySnapshot::apply` discards the oldest entries of
/// `recent_warnings` so that no more than this many are kept.
pub const MAX_RECENT_WARNINGS: usize = 20;
124
/// Warning displayed in the dashboard / surfaced through the snapshot.
///
/// In this crate, entries are built by `IndexTelemetrySnapshot::apply` when
/// it folds an [`IndexEvent::Warning`]. That event carries no timestamp, so
/// `at` records when the warning was folded, not when it was emitted.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WarningEntry {
    /// Short identifier copied from the warning event.
    pub code: String,
    /// Human-readable text copied from the warning event.
    pub message: String,
    /// UTC wall-clock time at which the warning was recorded.
    pub at: DateTime<Utc>,
}
132
/// Pull-friendly indexing telemetry snapshot.
///
/// The snapshot is the folded-up state of the event stream — anything a
/// dashboard, status command, or external observer needs to render the
/// current run without replaying history. Producers fold events into
/// this struct via [`IndexTelemetrySnapshot::apply`] and publish the
/// updated snapshot however they wish (watch channel, broadcast, JSON
/// file, etc.).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IndexTelemetrySnapshot {
    /// Namespace from the latest `RunStarted`.
    pub namespace: String,
    /// Source label from the latest `RunStarted`.
    pub source_label: String,
    /// Start time of the current run; `None` until a `RunStarted` is folded.
    pub started_at: Option<DateTime<Utc>>,
    /// Item count from `RunStarted`, overwritten by `StatsTick`.
    pub total: usize,
    /// Items finished in any way (indexed + skipped + failed).
    pub processed: usize,
    /// Items indexed successfully.
    pub indexed: usize,
    /// Items skipped.
    pub skipped: usize,
    /// Items that failed individually.
    pub failed: usize,
    /// Sum of `chunks_indexed` over indexed items (or the producer's value
    /// from `StatsTick` / `RunCompleted`).
    pub total_chunks: usize,
    /// Label of the most recently started or finished item.
    pub current_item: Option<String>,
    /// Items started but not yet finished; zeroed by terminal events.
    pub in_flight: usize,
    /// Current parallelism (from `RunStarted` / `ParallelismChanged`).
    pub parallelism: usize,
    /// Set by `Paused`, cleared by `Resumed`.
    pub paused: bool,
    /// Set by `StopRequested`.
    pub stopping: bool,
    /// Producer-measured rate, taken from the latest `StatsTick`.
    pub items_per_sec: f64,
    /// Producer-measured ETA in seconds, taken from the latest `StatsTick`.
    pub eta_secs: Option<f64>,
    /// Total run duration; only set by `RunCompleted`.
    pub elapsed: Duration,
    /// Smoothed embedder latency in milliseconds over `ItemIndexed`
    /// samples that carried `embedder_ms` (see `apply` for the weighting).
    pub avg_embedder_ms: Option<f64>,
    /// Sum of `tokens_estimated` over indexed items.
    pub total_tokens_estimated: usize,
    /// True once `RunCompleted` or `RunFailed` has been folded.
    pub complete: bool,
    /// Copied from `RunCompleted`.
    pub stopped_early: bool,
    /// Error from `RunFailed`, if the run aborted.
    pub fatal_error: Option<String>,
    /// Most recent warnings, oldest first, capped at [`MAX_RECENT_WARNINGS`].
    pub recent_warnings: VecDeque<WarningEntry>,
}
167
168impl Default for IndexTelemetrySnapshot {
169    fn default() -> Self {
170        Self {
171            namespace: String::new(),
172            source_label: String::new(),
173            started_at: None,
174            total: 0,
175            processed: 0,
176            indexed: 0,
177            skipped: 0,
178            failed: 0,
179            total_chunks: 0,
180            current_item: None,
181            in_flight: 0,
182            parallelism: 1,
183            paused: false,
184            stopping: false,
185            items_per_sec: 0.0,
186            eta_secs: None,
187            elapsed: Duration::ZERO,
188            avg_embedder_ms: None,
189            total_tokens_estimated: 0,
190            complete: false,
191            stopped_early: false,
192            fatal_error: None,
193            recent_warnings: VecDeque::new(),
194        }
195    }
196}
197
198impl IndexTelemetrySnapshot {
199    /// Fold an event into the running snapshot.
200    ///
201    /// Counters are advanced based on event variant. `items_per_sec` and
202    /// `eta_secs` are sourced from `StatsTick` events — the producer is
203    /// responsible for measuring rate (see [`RollingRate`]) and emitting
204    /// ticks; this method does not infer rate from `ItemIndexed` alone.
205    ///
206    /// Warnings are pushed onto `recent_warnings` with a hard cap of
207    /// [`MAX_RECENT_WARNINGS`] — oldest entries are dropped from the
208    /// front when capacity is exceeded.
209    pub fn apply(&mut self, event: &IndexEvent) {
210        match event {
211            IndexEvent::RunStarted {
212                total_items,
213                namespace,
214                source_label,
215                parallelism,
216                started_at,
217            } => {
218                self.namespace = namespace.clone();
219                self.source_label = source_label.clone();
220                self.total = *total_items;
221                self.parallelism = *parallelism;
222                self.started_at = Some(*started_at);
223                self.complete = false;
224                self.stopped_early = false;
225                self.fatal_error = None;
226                self.processed = 0;
227                self.indexed = 0;
228                self.skipped = 0;
229                self.failed = 0;
230                self.total_chunks = 0;
231                self.in_flight = 0;
232                self.paused = false;
233                self.stopping = false;
234                self.items_per_sec = 0.0;
235                self.eta_secs = None;
236                self.elapsed = Duration::ZERO;
237                self.avg_embedder_ms = None;
238                self.total_tokens_estimated = 0;
239                self.current_item = None;
240                self.recent_warnings.clear();
241            }
242            IndexEvent::ItemStarted { label, .. } => {
243                self.in_flight = self.in_flight.saturating_add(1);
244                self.current_item = Some(label.clone());
245            }
246            IndexEvent::ItemIndexed {
247                label,
248                chunks_indexed,
249                embedder_ms,
250                tokens_estimated,
251                ..
252            } => {
253                self.processed = self.processed.saturating_add(1);
254                self.indexed = self.indexed.saturating_add(1);
255                self.total_chunks = self.total_chunks.saturating_add(*chunks_indexed);
256                self.in_flight = self.in_flight.saturating_sub(1);
257                self.current_item = Some(label.clone());
258                if let Some(tokens) = tokens_estimated {
259                    self.total_tokens_estimated =
260                        self.total_tokens_estimated.saturating_add(*tokens);
261                }
262                if let Some(ms) = embedder_ms {
263                    // Running average: weight new sample equally with prior
264                    // running mean (we don't track sample count separately
265                    // because the snapshot is a coarse dashboard — high
266                    // fidelity is the producer's job if needed).
267                    let sample = *ms as f64;
268                    self.avg_embedder_ms = Some(match self.avg_embedder_ms {
269                        Some(prev) => (prev + sample) / 2.0,
270                        None => sample,
271                    });
272                }
273            }
274            IndexEvent::ItemSkipped { label, .. } => {
275                self.processed = self.processed.saturating_add(1);
276                self.skipped = self.skipped.saturating_add(1);
277                self.in_flight = self.in_flight.saturating_sub(1);
278                self.current_item = Some(label.clone());
279            }
280            IndexEvent::ItemFailed { label, .. } => {
281                self.processed = self.processed.saturating_add(1);
282                self.failed = self.failed.saturating_add(1);
283                self.in_flight = self.in_flight.saturating_sub(1);
284                self.current_item = Some(label.clone());
285            }
286            IndexEvent::StatsTick {
287                processed,
288                indexed,
289                skipped,
290                failed,
291                total,
292                items_per_sec,
293                eta_secs,
294                total_chunks,
295                in_flight,
296            } => {
297                self.processed = *processed;
298                self.indexed = *indexed;
299                self.skipped = *skipped;
300                self.failed = *failed;
301                self.total = *total;
302                self.items_per_sec = *items_per_sec;
303                self.eta_secs = *eta_secs;
304                self.total_chunks = *total_chunks;
305                self.in_flight = *in_flight;
306            }
307            IndexEvent::RunCompleted {
308                processed,
309                indexed,
310                skipped,
311                failed,
312                total_chunks,
313                elapsed,
314                stopped_early,
315            } => {
316                self.processed = *processed;
317                self.indexed = *indexed;
318                self.skipped = *skipped;
319                self.failed = *failed;
320                self.total_chunks = *total_chunks;
321                self.elapsed = *elapsed;
322                self.stopped_early = *stopped_early;
323                self.complete = true;
324                self.in_flight = 0;
325                self.stopping = false;
326            }
327            IndexEvent::RunFailed {
328                error,
329                processed_before_failure,
330            } => {
331                self.fatal_error = Some(error.clone());
332                self.processed = *processed_before_failure;
333                self.complete = true;
334                self.in_flight = 0;
335            }
336            IndexEvent::Paused => {
337                self.paused = true;
338            }
339            IndexEvent::Resumed => {
340                self.paused = false;
341            }
342            IndexEvent::StopRequested => {
343                self.stopping = true;
344            }
345            IndexEvent::ParallelismChanged { current, .. } => {
346                self.parallelism = *current;
347            }
348            IndexEvent::Warning { code, message } => {
349                if self.recent_warnings.len() >= MAX_RECENT_WARNINGS {
350                    self.recent_warnings.pop_front();
351                }
352                self.recent_warnings.push_back(WarningEntry {
353                    code: code.clone(),
354                    message: message.clone(),
355                    at: Utc::now(),
356                });
357            }
358        }
359    }
360}
361
/// Rolling-window rate tracker.
///
/// The producer records completion counts (typically 1 per `ItemIndexed`
/// / `ItemSkipped` / `ItemFailed`) and queries [`Self::rate_per_sec`]
/// or [`Self::eta_secs`] when emitting a [`IndexEvent::StatsTick`]. The
/// window is purely time-based: samples older than `window_size` are
/// evicted on every [`Self::record`] call and ignored by the query methods,
/// so a stalled producer's rate decays to zero instead of being propped up
/// by stale samples.
#[derive(Debug, Clone)]
pub struct RollingRate {
    /// Timestamped completion counts, oldest first (appended by `record`).
    window: VecDeque<(Instant, usize)>,
    /// How far back from "now" a sample still counts.
    window_size: Duration,
}

impl RollingRate {
    /// Create a new tracker with the given rolling window.
    pub fn new(window_size: Duration) -> Self {
        Self {
            window: VecDeque::new(),
            window_size,
        }
    }

    /// Record `count` completions at the current instant.
    ///
    /// Samples that have fallen out of the window are evicted at the same
    /// time, so the deque only holds samples from the last `window_size`.
    pub fn record(&mut self, count: usize) {
        let now = Instant::now();
        self.window.push_back((now, count));
        self.evict(now);
    }

    /// Current rate in items/sec over the rolling window.
    ///
    /// Sums the counts of every sample no older than `window_size` and
    /// divides by the time elapsed since the oldest of them. Returns 0.0
    /// when fewer than two samples fall inside the window (a single sample
    /// says nothing about spacing) or when they span less than 1ms (avoids
    /// division-by-near-zero blow-ups).
    pub fn rate_per_sec(&self) -> f64 {
        let now = Instant::now();
        let cutoff = self.cutoff(now);
        // `record` only evicts when called; filter here too so stale
        // samples from a stalled producer don't keep the rate alive.
        let mut live = self
            .window
            .iter()
            .filter(|&&(at, _)| !Self::is_stale(at, cutoff));

        let (oldest, first_count) = match live.next() {
            Some(&sample) => sample,
            None => return 0.0,
        };
        let (later_samples, later_total) = live.fold((0usize, 0usize), |(n, sum), &(_, count)| {
            (n + 1, sum.saturating_add(count))
        });
        if later_samples == 0 {
            return 0.0;
        }

        let secs = now.saturating_duration_since(oldest).as_secs_f64();
        if secs < 0.001 {
            return 0.0;
        }
        first_count.saturating_add(later_total) as f64 / secs
    }

    /// ETA in seconds for `remaining` items at the current rate.
    ///
    /// Returns `Some(0.0)` when `remaining` is zero (nothing to wait for)
    /// and `None` when the rate is zero (cannot extrapolate).
    pub fn eta_secs(&self, remaining: usize) -> Option<f64> {
        if remaining == 0 {
            return Some(0.0);
        }
        let rate = self.rate_per_sec();
        if rate <= 0.0 {
            None
        } else {
            Some(remaining as f64 / rate)
        }
    }

    /// Oldest instant still inside the window as of `now`, or `None` when
    /// `now - window_size` is not representable (then nothing is stale).
    fn cutoff(&self, now: Instant) -> Option<Instant> {
        now.checked_sub(self.window_size)
    }

    /// Whether a sample taken at `at` lies before `cutoff`.
    fn is_stale(at: Instant, cutoff: Option<Instant>) -> bool {
        matches!(cutoff, Some(c) if at < c)
    }

    /// Drop samples from the front that have fallen out of the window.
    fn evict(&mut self, now: Instant) {
        let cutoff = self.cutoff(now);
        while let Some(&(at, _)) = self.window.front() {
            if !Self::is_stale(at, cutoff) {
                break;
            }
            self.window.pop_front();
        }
    }
}
439
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;

    /// One instance of every `IndexEvent` variant, in a plausible run order
    /// (ending with `RunCompleted` followed by `RunFailed`).
    fn sample_events() -> Vec<IndexEvent> {
        vec![
            IndexEvent::RunStarted {
                total_items: 12,
                namespace: "kb:test".to_string(),
                source_label: "/tmp/input".to_string(),
                parallelism: 4,
                started_at: Utc::now(),
            },
            IndexEvent::ItemStarted {
                item_index: 2,
                label: "notes.md".to_string(),
                size_bytes: Some(512),
            },
            IndexEvent::ItemIndexed {
                item_index: 2,
                label: "notes.md".to_string(),
                chunks_indexed: 7,
                duration_ms: 231,
                embedder_ms: Some(187),
                tokens_estimated: Some(128),
                content_hash: Some("abc123".to_string()),
            },
            IndexEvent::ItemSkipped {
                item_index: 3,
                label: "binary.bin".to_string(),
                reason: "unsupported".to_string(),
                content_hash: None,
            },
            IndexEvent::ItemFailed {
                item_index: 4,
                label: "broken.md".to_string(),
                error: "parse error".to_string(),
            },
            IndexEvent::StatsTick {
                processed: 8,
                indexed: 6,
                skipped: 1,
                failed: 1,
                total: 12,
                items_per_sec: 1.5,
                eta_secs: Some(2.6),
                total_chunks: 18,
                in_flight: 2,
            },
            IndexEvent::Paused,
            IndexEvent::Resumed,
            IndexEvent::StopRequested,
            IndexEvent::ParallelismChanged {
                previous: 4,
                current: 8,
            },
            IndexEvent::Warning {
                code: "embedder_slow".to_string(),
                message: "embedder over 5s".to_string(),
            },
            IndexEvent::RunCompleted {
                processed: 12,
                indexed: 9,
                skipped: 2,
                failed: 1,
                total_chunks: 28,
                elapsed: Duration::from_secs(12),
                stopped_early: false,
            },
            IndexEvent::RunFailed {
                error: "ollama oom".to_string(),
                processed_before_failure: 5,
            },
        ]
    }

    #[test]
    fn index_event_serde_roundtrip_all_variants() {
        for event in sample_events() {
            let json = serde_json::to_string(&event).expect("serialize");
            let roundtrip: IndexEvent = serde_json::from_str(&json).expect("deserialize");
            assert_eq!(roundtrip, event, "roundtrip mismatch for {:?}", event);
        }
    }

    #[test]
    fn snapshot_apply_increments_counters() {
        let mut snap = IndexTelemetrySnapshot::default();
        let started_at = Utc::now();
        snap.apply(&IndexEvent::RunStarted {
            total_items: 10,
            namespace: "kb:a".into(),
            source_label: "src".into(),
            parallelism: 2,
            started_at,
        });
        assert_eq!(snap.total, 10);
        assert_eq!(snap.parallelism, 2);
        assert_eq!(snap.namespace, "kb:a");
        assert_eq!(snap.source_label, "src");
        assert!(snap.started_at.is_some());

        snap.apply(&IndexEvent::ItemStarted {
            item_index: 0,
            label: "a.md".into(),
            size_bytes: Some(10),
        });
        assert_eq!(snap.in_flight, 1);
        assert_eq!(snap.current_item.as_deref(), Some("a.md"));

        snap.apply(&IndexEvent::ItemIndexed {
            item_index: 0,
            label: "a.md".into(),
            chunks_indexed: 3,
            duration_ms: 100,
            embedder_ms: Some(60),
            tokens_estimated: Some(50),
            content_hash: None,
        });
        assert_eq!(snap.processed, 1);
        assert_eq!(snap.indexed, 1);
        assert_eq!(snap.total_chunks, 3);
        assert_eq!(snap.in_flight, 0);
        assert_eq!(snap.total_tokens_estimated, 50);
        assert!(snap.avg_embedder_ms.is_some());

        snap.apply(&IndexEvent::ItemSkipped {
            item_index: 1,
            label: "b.md".into(),
            reason: "dup".into(),
            content_hash: None,
        });
        assert_eq!(snap.skipped, 1);
        assert_eq!(snap.processed, 2);

        snap.apply(&IndexEvent::ItemFailed {
            item_index: 2,
            label: "c.md".into(),
            error: "boom".into(),
        });
        assert_eq!(snap.failed, 1);
        assert_eq!(snap.processed, 3);

        snap.apply(&IndexEvent::StatsTick {
            processed: 3,
            indexed: 1,
            skipped: 1,
            failed: 1,
            total: 10,
            items_per_sec: 2.0,
            eta_secs: Some(3.5),
            total_chunks: 3,
            in_flight: 0,
        });
        assert_eq!(snap.items_per_sec, 2.0);
        assert_eq!(snap.eta_secs, Some(3.5));

        snap.apply(&IndexEvent::Paused);
        assert!(snap.paused);
        snap.apply(&IndexEvent::Resumed);
        assert!(!snap.paused);
        snap.apply(&IndexEvent::StopRequested);
        assert!(snap.stopping);
        snap.apply(&IndexEvent::ParallelismChanged {
            previous: 2,
            current: 4,
        });
        assert_eq!(snap.parallelism, 4);
    }

    #[test]
    fn warning_fold_respects_cap() {
        let mut snap = IndexTelemetrySnapshot::default();
        for i in 0..25 {
            snap.apply(&IndexEvent::Warning {
                code: format!("c{i}"),
                message: format!("m{i}"),
            });
        }
        assert_eq!(snap.recent_warnings.len(), MAX_RECENT_WARNINGS);
        // Oldest 5 should have been evicted; the front should be c5.
        assert_eq!(snap.recent_warnings.front().unwrap().code, "c5");
        assert_eq!(snap.recent_warnings.back().unwrap().code, "c24");
    }

    #[test]
    fn run_completed_sets_complete_and_elapsed() {
        let mut snap = IndexTelemetrySnapshot::default();
        snap.apply(&IndexEvent::RunCompleted {
            processed: 5,
            indexed: 4,
            skipped: 1,
            failed: 0,
            total_chunks: 12,
            elapsed: Duration::from_secs(7),
            stopped_early: false,
        });
        assert!(snap.complete);
        assert_eq!(snap.elapsed, Duration::from_secs(7));
        assert_eq!(snap.in_flight, 0);
    }

    #[test]
    fn run_failed_sets_fatal_error() {
        let mut snap = IndexTelemetrySnapshot::default();
        snap.apply(&IndexEvent::RunFailed {
            error: "ollama died".into(),
            processed_before_failure: 3,
        });
        assert_eq!(snap.fatal_error.as_deref(), Some("ollama died"));
        assert!(snap.complete);
        assert_eq!(snap.processed, 3);
    }

    #[test]
    fn run_started_resets_previous_run_state() {
        let mut snap = IndexTelemetrySnapshot::default();
        for event in sample_events() {
            snap.apply(&event);
        }
        // `sample_events` ends with `RunFailed`, so the snapshot is terminal
        // and carries counters, a warning and a fatal error from that run.
        assert!(snap.complete);
        assert!(snap.fatal_error.is_some());
        assert!(!snap.recent_warnings.is_empty());
        assert!(snap.avg_embedder_ms.is_some());

        snap.apply(&IndexEvent::RunStarted {
            total_items: 3,
            namespace: "kb:next".into(),
            source_label: "next".into(),
            parallelism: 1,
            started_at: Utc::now(),
        });
        assert_eq!(snap.namespace, "kb:next");
        assert_eq!(snap.source_label, "next");
        assert_eq!(snap.total, 3);
        assert_eq!(snap.parallelism, 1);
        assert_eq!(snap.processed, 0);
        assert_eq!(snap.indexed, 0);
        assert_eq!(snap.skipped, 0);
        assert_eq!(snap.failed, 0);
        assert_eq!(snap.total_chunks, 0);
        assert_eq!(snap.in_flight, 0);
        assert_eq!(snap.total_tokens_estimated, 0);
        assert_eq!(snap.avg_embedder_ms, None);
        assert_eq!(snap.current_item, None);
        assert_eq!(snap.eta_secs, None);
        assert_eq!(snap.elapsed, Duration::ZERO);
        assert_eq!(snap.fatal_error, None);
        assert!(!snap.complete);
        assert!(!snap.stopped_early);
        assert!(!snap.paused);
        assert!(!snap.stopping);
        assert!(snap.recent_warnings.is_empty());
    }

    #[test]
    fn snapshot_serde_roundtrip() {
        let mut snap = IndexTelemetrySnapshot::default();
        snap.apply(&IndexEvent::RunStarted {
            total_items: 2,
            namespace: "kb:rt".into(),
            source_label: "rt".into(),
            parallelism: 3,
            started_at: Utc::now(),
        });
        snap.apply(&IndexEvent::Warning {
            code: "w".into(),
            message: "m".into(),
        });
        snap.apply(&IndexEvent::RunCompleted {
            processed: 2,
            indexed: 2,
            skipped: 0,
            failed: 0,
            total_chunks: 4,
            elapsed: Duration::from_millis(1500),
            stopped_early: false,
        });
        let json = serde_json::to_string(&snap).expect("serialize");
        let back: IndexTelemetrySnapshot = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(back, snap);
    }

    #[test]
    fn rolling_rate_records_and_extrapolates() {
        let mut rr = RollingRate::new(Duration::from_secs(2));
        // Empty window → rate 0.0
        assert_eq!(rr.rate_per_sec(), 0.0);

        rr.record(1);
        sleep(Duration::from_millis(50));
        rr.record(1);
        sleep(Duration::from_millis(50));
        rr.record(1);

        let rate = rr.rate_per_sec();
        // ~3 items over ~100ms → ~30 items/sec. Loose bounds for CI jitter.
        assert!(rate > 5.0, "expected meaningful rate, got {rate}");
        assert!(rate < 500.0, "expected sane rate, got {rate}");

        let eta = rr.eta_secs(10).expect("eta with positive rate");
        assert!(eta > 0.0);

        // Zero remaining → 0.0
        assert_eq!(rr.eta_secs(0), Some(0.0));

        // Empty tracker after construction → eta None for nonzero remaining
        let empty = RollingRate::new(Duration::from_secs(1));
        assert!(empty.eta_secs(5).is_none());
    }
}