1use std::collections::VecDeque;
24use std::time::{Duration, Instant};
25
26use chrono::{DateTime, Utc};
27use serde::{Deserialize, Serialize};
28
29#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
36#[serde(tag = "type", rename_all = "snake_case")]
37pub enum IndexEvent {
38 RunStarted {
39 total_items: usize,
40 namespace: String,
41 source_label: String,
42 parallelism: usize,
43 started_at: DateTime<Utc>,
44 },
45 ItemStarted {
46 item_index: usize,
47 label: String,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 size_bytes: Option<u64>,
50 },
51 ItemIndexed {
52 item_index: usize,
53 label: String,
54 chunks_indexed: usize,
55 duration_ms: u64,
56 #[serde(skip_serializing_if = "Option::is_none")]
57 embedder_ms: Option<u64>,
58 #[serde(skip_serializing_if = "Option::is_none")]
59 tokens_estimated: Option<usize>,
60 #[serde(skip_serializing_if = "Option::is_none")]
61 content_hash: Option<String>,
62 },
63 ItemSkipped {
64 item_index: usize,
65 label: String,
66 reason: String,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 content_hash: Option<String>,
69 },
70 ItemFailed {
71 item_index: usize,
72 label: String,
73 error: String,
74 },
75 StatsTick {
76 processed: usize,
77 indexed: usize,
78 skipped: usize,
79 failed: usize,
80 total: usize,
81 items_per_sec: f64,
82 #[serde(skip_serializing_if = "Option::is_none")]
83 eta_secs: Option<f64>,
84 total_chunks: usize,
85 in_flight: usize,
86 },
87 RunCompleted {
88 processed: usize,
89 indexed: usize,
90 skipped: usize,
91 failed: usize,
92 total_chunks: usize,
93 elapsed: Duration,
94 stopped_early: bool,
95 },
96 RunFailed {
97 error: String,
98 processed_before_failure: usize,
99 },
100 Paused,
101 Resumed,
102 StopRequested,
103 ParallelismChanged {
104 previous: usize,
105 current: usize,
106 },
107 Warning {
108 code: String,
109 message: String,
110 },
111}
112
113pub trait EventSink: Send + Sync {
119 fn on_event(&self, event: &IndexEvent);
120}
121
/// Upper bound on the number of warnings a snapshot keeps in
/// `recent_warnings`; the oldest entry is dropped once the bound is reached.
pub const MAX_RECENT_WARNINGS: usize = 20;
124
125#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
127pub struct WarningEntry {
128 pub code: String,
129 pub message: String,
130 pub at: DateTime<Utc>,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
142pub struct IndexTelemetrySnapshot {
143 pub namespace: String,
144 pub source_label: String,
145 pub started_at: Option<DateTime<Utc>>,
146 pub total: usize,
147 pub processed: usize,
148 pub indexed: usize,
149 pub skipped: usize,
150 pub failed: usize,
151 pub total_chunks: usize,
152 pub current_item: Option<String>,
153 pub in_flight: usize,
154 pub parallelism: usize,
155 pub paused: bool,
156 pub stopping: bool,
157 pub items_per_sec: f64,
158 pub eta_secs: Option<f64>,
159 pub elapsed: Duration,
160 pub avg_embedder_ms: Option<f64>,
161 pub total_tokens_estimated: usize,
162 pub complete: bool,
163 pub stopped_early: bool,
164 pub fatal_error: Option<String>,
165 pub recent_warnings: VecDeque<WarningEntry>,
166}
167
168impl Default for IndexTelemetrySnapshot {
169 fn default() -> Self {
170 Self {
171 namespace: String::new(),
172 source_label: String::new(),
173 started_at: None,
174 total: 0,
175 processed: 0,
176 indexed: 0,
177 skipped: 0,
178 failed: 0,
179 total_chunks: 0,
180 current_item: None,
181 in_flight: 0,
182 parallelism: 1,
183 paused: false,
184 stopping: false,
185 items_per_sec: 0.0,
186 eta_secs: None,
187 elapsed: Duration::ZERO,
188 avg_embedder_ms: None,
189 total_tokens_estimated: 0,
190 complete: false,
191 stopped_early: false,
192 fatal_error: None,
193 recent_warnings: VecDeque::new(),
194 }
195 }
196}
197
198impl IndexTelemetrySnapshot {
199 pub fn apply(&mut self, event: &IndexEvent) {
210 match event {
211 IndexEvent::RunStarted {
212 total_items,
213 namespace,
214 source_label,
215 parallelism,
216 started_at,
217 } => {
218 self.namespace = namespace.clone();
219 self.source_label = source_label.clone();
220 self.total = *total_items;
221 self.parallelism = *parallelism;
222 self.started_at = Some(*started_at);
223 self.complete = false;
224 self.stopped_early = false;
225 self.fatal_error = None;
226 self.processed = 0;
227 self.indexed = 0;
228 self.skipped = 0;
229 self.failed = 0;
230 self.total_chunks = 0;
231 self.in_flight = 0;
232 self.paused = false;
233 self.stopping = false;
234 self.items_per_sec = 0.0;
235 self.eta_secs = None;
236 self.elapsed = Duration::ZERO;
237 self.avg_embedder_ms = None;
238 self.total_tokens_estimated = 0;
239 self.current_item = None;
240 self.recent_warnings.clear();
241 }
242 IndexEvent::ItemStarted { label, .. } => {
243 self.in_flight = self.in_flight.saturating_add(1);
244 self.current_item = Some(label.clone());
245 }
246 IndexEvent::ItemIndexed {
247 label,
248 chunks_indexed,
249 embedder_ms,
250 tokens_estimated,
251 ..
252 } => {
253 self.processed = self.processed.saturating_add(1);
254 self.indexed = self.indexed.saturating_add(1);
255 self.total_chunks = self.total_chunks.saturating_add(*chunks_indexed);
256 self.in_flight = self.in_flight.saturating_sub(1);
257 self.current_item = Some(label.clone());
258 if let Some(tokens) = tokens_estimated {
259 self.total_tokens_estimated =
260 self.total_tokens_estimated.saturating_add(*tokens);
261 }
262 if let Some(ms) = embedder_ms {
263 let sample = *ms as f64;
268 self.avg_embedder_ms = Some(match self.avg_embedder_ms {
269 Some(prev) => (prev + sample) / 2.0,
270 None => sample,
271 });
272 }
273 }
274 IndexEvent::ItemSkipped { label, .. } => {
275 self.processed = self.processed.saturating_add(1);
276 self.skipped = self.skipped.saturating_add(1);
277 self.in_flight = self.in_flight.saturating_sub(1);
278 self.current_item = Some(label.clone());
279 }
280 IndexEvent::ItemFailed { label, .. } => {
281 self.processed = self.processed.saturating_add(1);
282 self.failed = self.failed.saturating_add(1);
283 self.in_flight = self.in_flight.saturating_sub(1);
284 self.current_item = Some(label.clone());
285 }
286 IndexEvent::StatsTick {
287 processed,
288 indexed,
289 skipped,
290 failed,
291 total,
292 items_per_sec,
293 eta_secs,
294 total_chunks,
295 in_flight,
296 } => {
297 self.processed = *processed;
298 self.indexed = *indexed;
299 self.skipped = *skipped;
300 self.failed = *failed;
301 self.total = *total;
302 self.items_per_sec = *items_per_sec;
303 self.eta_secs = *eta_secs;
304 self.total_chunks = *total_chunks;
305 self.in_flight = *in_flight;
306 }
307 IndexEvent::RunCompleted {
308 processed,
309 indexed,
310 skipped,
311 failed,
312 total_chunks,
313 elapsed,
314 stopped_early,
315 } => {
316 self.processed = *processed;
317 self.indexed = *indexed;
318 self.skipped = *skipped;
319 self.failed = *failed;
320 self.total_chunks = *total_chunks;
321 self.elapsed = *elapsed;
322 self.stopped_early = *stopped_early;
323 self.complete = true;
324 self.in_flight = 0;
325 self.stopping = false;
326 }
327 IndexEvent::RunFailed {
328 error,
329 processed_before_failure,
330 } => {
331 self.fatal_error = Some(error.clone());
332 self.processed = *processed_before_failure;
333 self.complete = true;
334 self.in_flight = 0;
335 }
336 IndexEvent::Paused => {
337 self.paused = true;
338 }
339 IndexEvent::Resumed => {
340 self.paused = false;
341 }
342 IndexEvent::StopRequested => {
343 self.stopping = true;
344 }
345 IndexEvent::ParallelismChanged { current, .. } => {
346 self.parallelism = *current;
347 }
348 IndexEvent::Warning { code, message } => {
349 if self.recent_warnings.len() >= MAX_RECENT_WARNINGS {
350 self.recent_warnings.pop_front();
351 }
352 self.recent_warnings.push_back(WarningEntry {
353 code: code.clone(),
354 message: message.clone(),
355 at: Utc::now(),
356 });
357 }
358 }
359 }
360}
361
/// Throughput estimator built from timestamped samples.
///
/// Each call to `record` stores `(Instant, count)`; samples older than
/// `window_size` are discarded the next time something is recorded.
#[derive(Debug, Clone)]
pub struct RollingRate {
    /// Samples in arrival order, oldest at the front.
    window: VecDeque<(Instant, usize)>,
    /// Maximum age a sample may reach before `record` evicts it.
    window_size: Duration,
}

impl RollingRate {
    /// Creates an empty estimator that retains samples for `window_size`.
    pub fn new(window_size: Duration) -> Self {
        Self {
            window_size,
            window: VecDeque::new(),
        }
    }

    /// Stores `count` stamped with the current instant, then evicts samples
    /// that have aged out of the window.
    pub fn record(&mut self, count: usize) {
        let stamp = Instant::now();
        self.window.push_back((stamp, count));
        self.evict(stamp);
    }

    /// Returns items per second: the sum of all retained counts divided by
    /// the time elapsed since the oldest retained sample.
    ///
    /// Yields `0.0` when nothing has been recorded or when less than one
    /// millisecond has elapsed, which avoids dividing by a near-zero span.
    /// No eviction happens here, so samples stay in the calculation until the
    /// next `record`.
    pub fn rate_per_sec(&self) -> f64 {
        let first_seen = match self.window.front() {
            Some(&(stamp, _)) => stamp,
            None => return 0.0,
        };
        let now = Instant::now();
        let items: usize = self.window.iter().map(|&(_, n)| n).sum();
        let seconds = now.saturating_duration_since(first_seen).as_secs_f64();
        if seconds < 0.001 {
            0.0
        } else {
            items as f64 / seconds
        }
    }

    /// Estimates the seconds needed to process `remaining` items at the
    /// current rate.
    ///
    /// Returns `Some(0.0)` when nothing remains and `None` when the rate is
    /// not positive, since no estimate is possible then.
    pub fn eta_secs(&self, remaining: usize) -> Option<f64> {
        if remaining == 0 {
            return Some(0.0);
        }
        let rate = self.rate_per_sec();
        if rate <= 0.0 {
            None
        } else {
            Some(remaining as f64 / rate)
        }
    }

    /// Removes front samples strictly older than `now - window_size`.
    ///
    /// Nothing is evicted when that subtraction is not representable as an
    /// `Instant`.
    fn evict(&mut self, now: Instant) {
        if let Some(cutoff) = now.checked_sub(self.window_size) {
            while self
                .window
                .front()
                .map_or(false, |&(seen, _)| seen < cutoff)
            {
                self.window.pop_front();
            }
        }
    }
}
439
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;

    /// One instance of every `IndexEvent` variant, used for the serde roundtrip.
    fn sample_events() -> Vec<IndexEvent> {
        vec![
            IndexEvent::RunStarted {
                total_items: 12,
                namespace: "kb:test".to_string(),
                source_label: "/tmp/input".to_string(),
                parallelism: 4,
                started_at: Utc::now(),
            },
            IndexEvent::ItemStarted {
                item_index: 2,
                label: "notes.md".to_string(),
                size_bytes: Some(512),
            },
            IndexEvent::ItemIndexed {
                item_index: 2,
                label: "notes.md".to_string(),
                chunks_indexed: 7,
                duration_ms: 231,
                embedder_ms: Some(187),
                tokens_estimated: Some(128),
                content_hash: Some("abc123".to_string()),
            },
            IndexEvent::ItemSkipped {
                item_index: 3,
                label: "binary.bin".to_string(),
                reason: "unsupported".to_string(),
                content_hash: None,
            },
            IndexEvent::ItemFailed {
                item_index: 4,
                label: "broken.md".to_string(),
                error: "parse error".to_string(),
            },
            IndexEvent::StatsTick {
                processed: 8,
                indexed: 6,
                skipped: 1,
                failed: 1,
                total: 12,
                items_per_sec: 1.5,
                eta_secs: Some(2.6),
                total_chunks: 18,
                in_flight: 2,
            },
            IndexEvent::Paused,
            IndexEvent::Resumed,
            IndexEvent::StopRequested,
            IndexEvent::ParallelismChanged {
                previous: 4,
                current: 8,
            },
            IndexEvent::Warning {
                code: "embedder_slow".to_string(),
                message: "embedder over 5s".to_string(),
            },
            IndexEvent::RunCompleted {
                processed: 12,
                indexed: 9,
                skipped: 2,
                failed: 1,
                total_chunks: 28,
                elapsed: Duration::from_secs(12),
                stopped_early: false,
            },
            IndexEvent::RunFailed {
                error: "ollama oom".to_string(),
                processed_before_failure: 5,
            },
        ]
    }

    /// Builds an `ItemIndexed` event whose only varying part is `embedder_ms`.
    fn indexed_with_embedder(embedder_ms: Option<u64>) -> IndexEvent {
        IndexEvent::ItemIndexed {
            item_index: 0,
            label: "x.md".into(),
            chunks_indexed: 1,
            duration_ms: 10,
            embedder_ms,
            tokens_estimated: None,
            content_hash: None,
        }
    }

    #[test]
    fn index_event_serde_roundtrip_all_variants() {
        for event in sample_events() {
            let json = serde_json::to_string(&event).expect("serialize");
            let roundtrip: IndexEvent = serde_json::from_str(&json).expect("deserialize");
            assert_eq!(roundtrip, event, "roundtrip mismatch for {:?}", event);
        }
    }

    #[test]
    fn snapshot_apply_increments_counters() {
        let mut snap = IndexTelemetrySnapshot::default();
        let started_at = Utc::now();
        snap.apply(&IndexEvent::RunStarted {
            total_items: 10,
            namespace: "kb:a".into(),
            source_label: "src".into(),
            parallelism: 2,
            started_at,
        });
        assert_eq!(snap.total, 10);
        assert_eq!(snap.parallelism, 2);
        assert_eq!(snap.namespace, "kb:a");
        assert_eq!(snap.source_label, "src");
        // Pin the stored timestamp itself, not merely its presence.
        assert_eq!(snap.started_at, Some(started_at));

        snap.apply(&IndexEvent::ItemStarted {
            item_index: 0,
            label: "a.md".into(),
            size_bytes: Some(10),
        });
        assert_eq!(snap.in_flight, 1);
        assert_eq!(snap.current_item.as_deref(), Some("a.md"));

        snap.apply(&IndexEvent::ItemIndexed {
            item_index: 0,
            label: "a.md".into(),
            chunks_indexed: 3,
            duration_ms: 100,
            embedder_ms: Some(60),
            tokens_estimated: Some(50),
            content_hash: None,
        });
        assert_eq!(snap.processed, 1);
        assert_eq!(snap.indexed, 1);
        assert_eq!(snap.total_chunks, 3);
        assert_eq!(snap.in_flight, 0);
        assert_eq!(snap.total_tokens_estimated, 50);
        // The first sample is taken as-is.
        assert_eq!(snap.avg_embedder_ms, Some(60.0));

        snap.apply(&IndexEvent::ItemSkipped {
            item_index: 1,
            label: "b.md".into(),
            reason: "dup".into(),
            content_hash: None,
        });
        assert_eq!(snap.skipped, 1);
        assert_eq!(snap.processed, 2);

        snap.apply(&IndexEvent::ItemFailed {
            item_index: 2,
            label: "c.md".into(),
            error: "boom".into(),
        });
        assert_eq!(snap.failed, 1);
        assert_eq!(snap.processed, 3);

        snap.apply(&IndexEvent::StatsTick {
            processed: 3,
            indexed: 1,
            skipped: 1,
            failed: 1,
            total: 10,
            items_per_sec: 2.0,
            eta_secs: Some(3.5),
            total_chunks: 3,
            in_flight: 0,
        });
        assert_eq!(snap.items_per_sec, 2.0);
        assert_eq!(snap.eta_secs, Some(3.5));

        snap.apply(&IndexEvent::Paused);
        assert!(snap.paused);
        snap.apply(&IndexEvent::Resumed);
        assert!(!snap.paused);
        snap.apply(&IndexEvent::StopRequested);
        assert!(snap.stopping);
        snap.apply(&IndexEvent::ParallelismChanged {
            previous: 2,
            current: 4,
        });
        assert_eq!(snap.parallelism, 4);
    }

    #[test]
    fn embedder_average_blends_each_new_sample() {
        let mut snap = IndexTelemetrySnapshot::default();
        assert_eq!(snap.avg_embedder_ms, None);

        snap.apply(&indexed_with_embedder(Some(60)));
        assert_eq!(snap.avg_embedder_ms, Some(60.0));

        // Each later sample is averaged with the previous smoothed value.
        snap.apply(&indexed_with_embedder(Some(100)));
        assert_eq!(snap.avg_embedder_ms, Some(80.0));
        snap.apply(&indexed_with_embedder(Some(40)));
        assert_eq!(snap.avg_embedder_ms, Some(60.0));

        // An item without an embedder timing leaves the value untouched.
        snap.apply(&indexed_with_embedder(None));
        assert_eq!(snap.avg_embedder_ms, Some(60.0));
        assert_eq!(snap.indexed, 4);
    }

    #[test]
    fn run_started_resets_previous_run_state() {
        let mut snap = IndexTelemetrySnapshot::default();
        snap.apply(&indexed_with_embedder(Some(25)));
        snap.apply(&IndexEvent::Paused);
        snap.apply(&IndexEvent::Warning {
            code: "w".into(),
            message: "old warning".into(),
        });
        snap.apply(&IndexEvent::RunFailed {
            error: "first run died".into(),
            processed_before_failure: 1,
        });
        assert!(snap.complete);

        let started_at = Utc::now();
        snap.apply(&IndexEvent::RunStarted {
            total_items: 3,
            namespace: "kb:b".into(),
            source_label: "second".into(),
            parallelism: 6,
            started_at,
        });

        let expected = IndexTelemetrySnapshot {
            namespace: "kb:b".into(),
            source_label: "second".into(),
            started_at: Some(started_at),
            total: 3,
            parallelism: 6,
            ..IndexTelemetrySnapshot::default()
        };
        assert_eq!(snap, expected);
    }

    #[test]
    fn warning_fold_respects_cap() {
        let mut snap = IndexTelemetrySnapshot::default();
        for i in 0..25 {
            snap.apply(&IndexEvent::Warning {
                code: format!("c{i}"),
                message: format!("m{i}"),
            });
        }
        assert_eq!(snap.recent_warnings.len(), MAX_RECENT_WARNINGS);
        // The five oldest entries (c0..c4) were evicted.
        assert_eq!(snap.recent_warnings.front().unwrap().code, "c5");
        assert_eq!(snap.recent_warnings.back().unwrap().code, "c24");
    }

    #[test]
    fn run_completed_sets_complete_and_elapsed() {
        let mut snap = IndexTelemetrySnapshot::default();
        snap.apply(&IndexEvent::RunCompleted {
            processed: 5,
            indexed: 4,
            skipped: 1,
            failed: 0,
            total_chunks: 12,
            elapsed: Duration::from_secs(7),
            stopped_early: false,
        });
        assert!(snap.complete);
        assert_eq!(snap.elapsed, Duration::from_secs(7));
        assert_eq!(snap.in_flight, 0);
    }

    #[test]
    fn run_failed_sets_fatal_error() {
        let mut snap = IndexTelemetrySnapshot::default();
        snap.apply(&IndexEvent::RunFailed {
            error: "ollama died".into(),
            processed_before_failure: 3,
        });
        assert_eq!(snap.fatal_error.as_deref(), Some("ollama died"));
        assert!(snap.complete);
        assert_eq!(snap.processed, 3);
    }

    #[test]
    fn rolling_rate_records_and_extrapolates() {
        let mut rr = RollingRate::new(Duration::from_secs(2));
        assert_eq!(rr.rate_per_sec(), 0.0);

        rr.record(1);
        sleep(Duration::from_millis(50));
        rr.record(1);
        sleep(Duration::from_millis(50));
        rr.record(1);

        let rate = rr.rate_per_sec();
        assert!(rate > 5.0, "expected meaningful rate, got {rate}");
        assert!(rate < 500.0, "expected sane rate, got {rate}");

        let eta = rr.eta_secs(10).expect("eta with positive rate");
        assert!(eta > 0.0);

        assert_eq!(rr.eta_secs(0), Some(0.0));

        let empty = RollingRate::new(Duration::from_secs(1));
        assert!(empty.eta_secs(5).is_none());
    }

    #[test]
    fn rolling_rate_evicts_samples_older_than_window() {
        // Backdating a sample keeps this deterministic without sleeping. If
        // the platform clock cannot represent "ten seconds ago" there is
        // nothing to check.
        if let Some(ten_ago) = Instant::now().checked_sub(Duration::from_secs(10)) {
            let mut rr = RollingRate::new(Duration::from_secs(1));
            rr.window.push_back((ten_ago, 7));
            rr.record(3);
            assert_eq!(rr.window.len(), 1);
            assert_eq!(rr.window.front().map(|&(_, count)| count), Some(3));
        }
    }
}