use std::collections::VecDeque;
use std::time::Duration;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
/// Progress and lifecycle events for an indexing run; consumed by
/// [`IndexEventSink::on_event`].
///
/// Wire format: internally tagged. The variant name, converted to
/// `snake_case`, is written under a `"type"` key next to the variant's own
/// fields, e.g. `{"type":"stop_requested"}` or
/// `{"type":"file_failed","file_index":3,"path":"a.md","error":"..."}`.
/// serde writes fields in declaration order, so reordering fields changes
/// the emitted JSON text.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum IndexEvent {
/// Start of a run, carrying the run's configuration and start timestamp.
RunStarted {
total_files: usize,
namespace: String,
source_dir: String,
parallelism: usize,
started_at: DateTime<Utc>,
},
/// Per-file event: work on `path` (identified by `file_index`) began.
FileStarted {
file_index: usize,
path: String,
size_bytes: u64,
},
/// Per-file outcome: `path` produced `chunks_indexed` chunks.
FileIndexed {
file_index: usize,
path: String,
chunks_indexed: usize,
content_hash: String,
duration_ms: u64,
// Optional metrics are left out of the JSON entirely when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
embedder_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
tokens_estimated: Option<usize>,
},
/// Per-file outcome: `path` was skipped for the given `reason`.
FileSkipped {
file_index: usize,
path: String,
reason: String,
// Left out of the JSON entirely when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
content_hash: Option<String>,
},
/// Per-file outcome: processing `path` ended with `error`.
FileFailed {
file_index: usize,
path: String,
error: String,
},
/// Aggregate run counters (presumably emitted periodically — confirm
/// with the producer).
StatsTick {
processed: usize,
indexed: usize,
skipped: usize,
failed: usize,
total: usize,
files_per_sec: f64,
// Unlike the optional fields of `FileIndexed`/`FileSkipped`, this has no
// `skip_serializing_if`, so `None` is written as an explicit `null`.
eta_secs: Option<f64>,
total_chunks: usize,
in_flight: usize,
},
/// End-of-run summary with final counters.
RunCompleted {
processed: usize,
indexed: usize,
skipped: usize,
failed: usize,
total_chunks: usize,
// NOTE(review): serde's built-in `Duration` encoding is an object
// (`{"secs":..,"nanos":..}`), not a number of seconds — confirm that
// consumers of this JSON expect that shape.
elapsed: Duration,
stopped_early: bool,
},
/// The run aborted with `error` after `processed_before_failure` files.
RunFailed {
error: String,
processed_before_failure: usize,
},
/// Processing was paused.
Paused,
/// Processing resumed.
Resumed,
/// Parallelism changed from `previous` to `current`.
ParallelismChanged {
previous: usize,
current: usize,
},
/// A stop was requested.
StopRequested,
/// A warning identified by `code`, with a descriptive `message`.
Warning {
code: String,
message: String,
},
}
/// Consumer of [`IndexEvent`]s.
///
/// Implementors must be both `Send` and `Sync`, so one sink can be shared
/// between threads (for example behind an `Arc`). The `where Self: ...`
/// clause declares those bounds as supertraits.
pub trait IndexEventSink
where
    Self: Send + Sync,
{
    /// Receives a single event by reference. Implementations that need to
    /// keep the event beyond this call can clone it (`IndexEvent: Clone`).
    fn on_event(&self, event: &IndexEvent);
}
/// Intended upper bound on the length of
/// `IndexTelemetrySnapshot::recent_warnings`.
///
/// NOTE(review): nothing in this module enforces the cap; presumably the code
/// that pushes warnings trims the deque to this size — confirm at call sites.
pub const MAX_RECENT_WARNINGS: usize = 20;
/// Point-in-time view of an indexing run, carried by the `watch` channel
/// returned from `new_index_telemetry`.
///
/// Its `Default` value is an idle snapshot: empty strings, zero counters,
/// `None` options, `false` flags, an empty warning deque, and
/// `parallelism = 1`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IndexTelemetrySnapshot {
pub namespace: String,
pub source_dir: String,
// `None` in the default snapshot.
pub started_at: Option<DateTime<Utc>>,
pub total: usize,
pub processed: usize,
pub indexed: usize,
pub skipped: usize,
pub failed: usize,
pub total_chunks: usize,
// Presumably the path currently being worked on, if any — confirm with the updater.
pub current_file: Option<String>,
pub in_flight: usize,
pub parallelism: usize,
pub paused: bool,
pub stopping: bool,
pub files_per_sec: f64,
pub eta_secs: Option<f64>,
// NOTE(review): serialized with serde's default `Duration` encoding (`secs`/`nanos` object).
pub elapsed: Duration,
pub avg_embedder_ms: Option<f64>,
pub total_tokens_estimated: usize,
pub complete: bool,
pub stopped_early: bool,
pub fatal_error: Option<String>,
// Presumably capped at `MAX_RECENT_WARNINGS` entries; this type does not enforce the cap.
pub recent_warnings: VecDeque<WarningEntry>,
}
impl Default for IndexTelemetrySnapshot {
fn default() -> Self {
Self {
namespace: String::new(),
source_dir: String::new(),
started_at: None,
total: 0,
processed: 0,
indexed: 0,
skipped: 0,
failed: 0,
total_chunks: 0,
current_file: None,
in_flight: 0,
parallelism: 1,
paused: false,
stopping: false,
files_per_sec: 0.0,
eta_secs: None,
elapsed: Duration::ZERO,
avg_embedder_ms: None,
total_tokens_estimated: 0,
complete: false,
stopped_early: false,
fatal_error: None,
recent_warnings: VecDeque::new(),
}
}
}
/// One element of `IndexTelemetrySnapshot::recent_warnings`: a warning
/// `code` and `message` (the same pair carried by `IndexEvent::Warning`)
/// plus a UTC timestamp `at`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WarningEntry {
pub code: String,
pub message: String,
pub at: DateTime<Utc>,
}
/// Sending half of the telemetry channel created by [`new_index_telemetry`].
pub type SharedIndexTelemetry = watch::Sender<IndexTelemetrySnapshot>;

/// Creates a telemetry channel seeded with `IndexTelemetrySnapshot::default()`.
///
/// Returns `(sender, receiver)`. A tokio `watch` channel retains only the
/// latest value, so receivers observe the most recent snapshot rather than
/// every intermediate update.
pub fn new_index_telemetry() -> (
    SharedIndexTelemetry,
    watch::Receiver<IndexTelemetrySnapshot>,
) {
    let seed = IndexTelemetrySnapshot::default();
    let (publisher, observer) = watch::channel(seed);
    (publisher, observer)
}
/// Control message for an indexing run.
///
/// Wire format: adjacently tagged. The `snake_case` variant name is written
/// under `"type"` and any payload under `"value"`:
///
/// ```text
/// {"type":"pause"}
/// {"type":"set_parallelism","value":4}
/// ```
///
/// Unit variants carry no `"value"` key, so their encoding is the same as a
/// plain `tag = "type"` representation would produce. The `content` key is
/// required because serde's internally tagged form cannot represent a
/// newtype variant wrapping a primitive: with `tag = "type"` alone,
/// serializing `SetParallelism(n)` fails at runtime and the variant can
/// never be deserialized.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", content = "value", rename_all = "snake_case")]
pub enum IndexControl {
    /// Wire form: `{"type":"pause"}`.
    Pause,
    /// Wire form: `{"type":"resume"}`.
    Resume,
    /// Requested parallelism. Wire form: `{"type":"set_parallelism","value":<n>}`.
    SetParallelism(usize),
    /// Wire form: `{"type":"stop"}`.
    Stop,
}
/// Buffer size intended for the channel that carries `IndexControl`
/// messages.
///
/// NOTE(review): no channel is constructed in this module; confirm at the
/// call site which channel uses this capacity.
pub const INDEX_CONTROL_CHANNEL_CAPACITY: usize = 16;
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `value` into a JSON tree, panicking if serialization fails.
    fn to_json<T: Serialize>(value: &T) -> serde_json::Value {
        serde_json::to_value(value).expect("serialize to JSON value")
    }

    /// At least one instance of every `IndexEvent` variant. Variants with
    /// optional fields appear in both their `Some` and `None` forms.
    fn one_of_each_event() -> Vec<IndexEvent> {
        vec![
            IndexEvent::RunStarted {
                total_files: 12,
                namespace: "kb:test".to_string(),
                source_dir: "/tmp/input".to_string(),
                parallelism: 4,
                started_at: Utc::now(),
            },
            IndexEvent::FileStarted {
                file_index: 2,
                path: "notes.md".to_string(),
                size_bytes: 512,
            },
            IndexEvent::FileIndexed {
                file_index: 2,
                path: "notes.md".to_string(),
                chunks_indexed: 7,
                content_hash: "abc123".to_string(),
                duration_ms: 231,
                embedder_ms: Some(187),
                tokens_estimated: Some(128),
            },
            IndexEvent::FileIndexed {
                file_index: 3,
                path: "empty.md".to_string(),
                chunks_indexed: 0,
                content_hash: "def456".to_string(),
                duration_ms: 4,
                embedder_ms: None,
                tokens_estimated: None,
            },
            IndexEvent::FileSkipped {
                file_index: 4,
                path: "same.md".to_string(),
                reason: "unchanged".to_string(),
                content_hash: Some("abc123".to_string()),
            },
            IndexEvent::FileSkipped {
                file_index: 5,
                path: "image.png".to_string(),
                reason: "unsupported".to_string(),
                content_hash: None,
            },
            IndexEvent::FileFailed {
                file_index: 6,
                path: "broken.md".to_string(),
                error: "invalid utf-8".to_string(),
            },
            IndexEvent::StatsTick {
                processed: 8,
                indexed: 6,
                skipped: 1,
                failed: 1,
                total: 12,
                files_per_sec: 1.5,
                eta_secs: Some(2.6),
                total_chunks: 18,
                in_flight: 2,
            },
            IndexEvent::StatsTick {
                processed: 0,
                indexed: 0,
                skipped: 0,
                failed: 0,
                total: 12,
                files_per_sec: 0.0,
                eta_secs: None,
                total_chunks: 0,
                in_flight: 0,
            },
            IndexEvent::RunCompleted {
                processed: 12,
                indexed: 9,
                skipped: 2,
                failed: 1,
                total_chunks: 28,
                elapsed: Duration::from_millis(12_345),
                stopped_early: false,
            },
            IndexEvent::RunFailed {
                error: "store unavailable".to_string(),
                processed_before_failure: 3,
            },
            IndexEvent::Paused,
            IndexEvent::Resumed,
            IndexEvent::ParallelismChanged {
                previous: 2,
                current: 4,
            },
            IndexEvent::StopRequested,
            IndexEvent::Warning {
                code: "slow_embedder".to_string(),
                message: "embedder latency above threshold".to_string(),
            },
        ]
    }

    #[test]
    fn index_event_serde_roundtrip_representative_variants() {
        for event in one_of_each_event() {
            let json = serde_json::to_string(&event).expect("serialize event");
            let roundtrip: IndexEvent = serde_json::from_str(&json).expect("deserialize event");
            assert_eq!(roundtrip, event, "roundtrip mismatch for {}", json);
        }
    }

    #[test]
    fn index_event_type_tag_is_snake_case() {
        assert_eq!(
            to_json(&IndexEvent::StopRequested),
            serde_json::json!({ "type": "stop_requested" })
        );
        assert_eq!(
            to_json(&IndexEvent::ParallelismChanged {
                previous: 2,
                current: 4,
            }),
            serde_json::json!({ "type": "parallelism_changed", "previous": 2, "current": 4 })
        );
    }

    #[test]
    fn index_event_optional_field_encoding() {
        let indexed = to_json(&IndexEvent::FileIndexed {
            file_index: 0,
            path: "a.md".to_string(),
            chunks_indexed: 1,
            content_hash: "h".to_string(),
            duration_ms: 1,
            embedder_ms: None,
            tokens_estimated: None,
        });
        let indexed = indexed
            .as_object()
            .expect("FileIndexed serializes to a JSON object");
        assert!(!indexed.contains_key("embedder_ms"));
        assert!(!indexed.contains_key("tokens_estimated"));

        let skipped = to_json(&IndexEvent::FileSkipped {
            file_index: 0,
            path: "a.md".to_string(),
            reason: "unchanged".to_string(),
            content_hash: None,
        });
        assert!(skipped.get("content_hash").is_none());

        // `eta_secs` has no `skip_serializing_if`, so `None` becomes an explicit null.
        let tick = to_json(&IndexEvent::StatsTick {
            processed: 0,
            indexed: 0,
            skipped: 0,
            failed: 0,
            total: 0,
            files_per_sec: 0.0,
            eta_secs: None,
            total_chunks: 0,
            in_flight: 0,
        });
        assert_eq!(tick.get("eta_secs"), Some(&serde_json::Value::Null));
    }

    #[test]
    fn index_control_unit_variants_serialize_as_bare_tag() {
        let cases = vec![
            (IndexControl::Pause, "pause"),
            (IndexControl::Resume, "resume"),
            (IndexControl::Stop, "stop"),
        ];
        for (control, tag) in cases {
            let json = to_json(&control);
            assert_eq!(json, serde_json::json!({ "type": tag }));
            let back: IndexControl =
                serde_json::from_value(json).expect("deserialize control");
            assert_eq!(back, control);
        }
    }
}