1use std::collections::VecDeque;
4use std::time::Duration;
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use tokio::sync::watch;
9
10#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
12#[serde(tag = "type", rename_all = "snake_case")]
13pub enum IndexEvent {
14 RunStarted {
15 total_files: usize,
16 namespace: String,
17 source_dir: String,
18 parallelism: usize,
19 started_at: DateTime<Utc>,
20 },
21 FileStarted {
22 file_index: usize,
23 path: String,
24 size_bytes: u64,
25 },
26 FileIndexed {
27 file_index: usize,
28 path: String,
29 chunks_indexed: usize,
30 content_hash: String,
31 duration_ms: u64,
32 #[serde(skip_serializing_if = "Option::is_none")]
33 embedder_ms: Option<u64>,
34 #[serde(skip_serializing_if = "Option::is_none")]
35 tokens_estimated: Option<usize>,
36 },
37 FileSkipped {
38 file_index: usize,
39 path: String,
40 reason: String,
41 #[serde(skip_serializing_if = "Option::is_none")]
42 content_hash: Option<String>,
43 },
44 FileFailed {
45 file_index: usize,
46 path: String,
47 error: String,
48 },
49 StatsTick {
50 processed: usize,
51 indexed: usize,
52 skipped: usize,
53 failed: usize,
54 total: usize,
55 files_per_sec: f64,
56 eta_secs: Option<f64>,
57 total_chunks: usize,
58 in_flight: usize,
59 },
60 RunCompleted {
61 processed: usize,
62 indexed: usize,
63 skipped: usize,
64 failed: usize,
65 total_chunks: usize,
66 elapsed: Duration,
67 stopped_early: bool,
68 },
69 RunFailed {
70 error: String,
71 processed_before_failure: usize,
72 },
73 Paused,
74 Resumed,
75 ParallelismChanged {
76 previous: usize,
77 current: usize,
78 },
79 StopRequested,
80 Warning {
81 code: String,
82 message: String,
83 },
84}
85
86pub trait IndexEventSink: Send + Sync {
88 fn on_event(&self, event: &IndexEvent);
89}
90
/// Presumably the maximum number of entries kept in
/// `IndexTelemetrySnapshot::recent_warnings`.
///
/// NOTE(review): nothing in this module enforces the cap; whoever pushes
/// warnings is expected to evict old entries once this limit is reached —
/// confirm at the call site.
pub const MAX_RECENT_WARNINGS: usize = 20;
93
94#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
96pub struct IndexTelemetrySnapshot {
97 pub namespace: String,
98 pub source_dir: String,
99 pub started_at: Option<DateTime<Utc>>,
100 pub total: usize,
101 pub processed: usize,
102 pub indexed: usize,
103 pub skipped: usize,
104 pub failed: usize,
105 pub total_chunks: usize,
106 pub current_file: Option<String>,
107 pub in_flight: usize,
108 pub parallelism: usize,
109 pub paused: bool,
110 pub stopping: bool,
111 pub files_per_sec: f64,
112 pub eta_secs: Option<f64>,
113 pub elapsed: Duration,
114 pub avg_embedder_ms: Option<f64>,
115 pub total_tokens_estimated: usize,
116 pub complete: bool,
117 pub stopped_early: bool,
118 pub fatal_error: Option<String>,
119 pub recent_warnings: VecDeque<WarningEntry>,
120}
121
122impl Default for IndexTelemetrySnapshot {
123 fn default() -> Self {
124 Self {
125 namespace: String::new(),
126 source_dir: String::new(),
127 started_at: None,
128 total: 0,
129 processed: 0,
130 indexed: 0,
131 skipped: 0,
132 failed: 0,
133 total_chunks: 0,
134 current_file: None,
135 in_flight: 0,
136 parallelism: 1,
137 paused: false,
138 stopping: false,
139 files_per_sec: 0.0,
140 eta_secs: None,
141 elapsed: Duration::ZERO,
142 avg_embedder_ms: None,
143 total_tokens_estimated: 0,
144 complete: false,
145 stopped_early: false,
146 fatal_error: None,
147 recent_warnings: VecDeque::new(),
148 }
149 }
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
154pub struct WarningEntry {
155 pub code: String,
156 pub message: String,
157 pub at: DateTime<Utc>,
158}
159
160pub type SharedIndexTelemetry = watch::Sender<IndexTelemetrySnapshot>;
165
166pub fn new_index_telemetry() -> (
168 SharedIndexTelemetry,
169 watch::Receiver<IndexTelemetrySnapshot>,
170) {
171 watch::channel(IndexTelemetrySnapshot::default())
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
176#[serde(tag = "type", rename_all = "snake_case")]
177pub enum IndexControl {
178 Pause,
179 Resume,
180 SetParallelism(usize),
181 Stop,
182}
183
/// Buffer size intended for the channel that carries [`IndexControl`]
/// commands.
///
/// NOTE(review): the channel itself is created outside this section —
/// presumably a bounded mpsc; confirm.
pub const INDEX_CONTROL_CHANNEL_CAPACITY: usize = 16;
186
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `event` to JSON and back, asserting the roundtrip is lossless.
    /// Returns the JSON so callers can also inspect the wire format.
    fn assert_roundtrip(event: &IndexEvent) -> String {
        let json = serde_json::to_string(event).expect("serialize event");
        let decoded: IndexEvent = serde_json::from_str(&json).expect("deserialize event");
        assert_eq!(&decoded, event, "roundtrip mismatch for {}", json);
        json
    }

    #[test]
    fn index_event_serde_roundtrip_representative_variants() {
        let events = vec![
            IndexEvent::RunStarted {
                total_files: 12,
                namespace: "kb:test".to_string(),
                source_dir: "/tmp/input".to_string(),
                parallelism: 4,
                started_at: Utc::now(),
            },
            IndexEvent::FileStarted {
                file_index: 2,
                path: "notes.md".to_string(),
                size_bytes: 512,
            },
            IndexEvent::FileIndexed {
                file_index: 2,
                path: "notes.md".to_string(),
                chunks_indexed: 7,
                content_hash: "abc123".to_string(),
                duration_ms: 231,
                embedder_ms: Some(187),
                tokens_estimated: Some(128),
            },
            IndexEvent::StatsTick {
                processed: 8,
                indexed: 6,
                skipped: 1,
                failed: 1,
                total: 12,
                files_per_sec: 1.5,
                eta_secs: Some(2.6),
                total_chunks: 18,
                in_flight: 2,
            },
            IndexEvent::RunCompleted {
                processed: 12,
                indexed: 9,
                skipped: 2,
                failed: 1,
                total_chunks: 28,
                elapsed: Duration::from_secs(12),
                stopped_early: false,
            },
        ];

        for event in &events {
            assert_roundtrip(event);
        }
    }

    /// Covers the variants and `None` paths the test above does not reach:
    /// unit variants under internal tagging, skipped optional fields, and a
    /// `null` ETA.
    #[test]
    fn index_event_serde_roundtrip_remaining_variants() {
        let events = vec![
            IndexEvent::FileSkipped {
                file_index: 3,
                path: "dup.md".to_string(),
                reason: "unchanged".to_string(),
                content_hash: None,
            },
            IndexEvent::FileSkipped {
                file_index: 3,
                path: "dup.md".to_string(),
                reason: "unchanged".to_string(),
                content_hash: Some("def456".to_string()),
            },
            IndexEvent::FileFailed {
                file_index: 4,
                path: "broken.pdf".to_string(),
                error: "parse error".to_string(),
            },
            IndexEvent::StatsTick {
                processed: 0,
                indexed: 0,
                skipped: 0,
                failed: 0,
                total: 3,
                files_per_sec: 0.0,
                eta_secs: None,
                total_chunks: 0,
                in_flight: 1,
            },
            IndexEvent::RunCompleted {
                processed: 3,
                indexed: 1,
                skipped: 1,
                failed: 1,
                total_chunks: 4,
                elapsed: Duration::from_millis(1500),
                stopped_early: true,
            },
            IndexEvent::RunFailed {
                error: "embedder unavailable".to_string(),
                processed_before_failure: 5,
            },
            IndexEvent::Paused,
            IndexEvent::Resumed,
            IndexEvent::ParallelismChanged {
                previous: 4,
                current: 2,
            },
            IndexEvent::StopRequested,
            IndexEvent::Warning {
                code: "slow_embedder".to_string(),
                message: "embedder latency above threshold".to_string(),
            },
        ];

        for event in &events {
            assert_roundtrip(event);
        }
    }

    /// Pins the wire format: the `"type"` tag and omission of `None` optionals.
    #[test]
    fn index_event_wire_format_tags_and_omits_absent_optionals() {
        let json = assert_roundtrip(&IndexEvent::FileIndexed {
            file_index: 0,
            path: "a.md".to_string(),
            chunks_indexed: 1,
            content_hash: "h".to_string(),
            duration_ms: 10,
            embedder_ms: None,
            tokens_estimated: None,
        });
        let value: serde_json::Value = serde_json::from_str(&json).expect("parse json");
        assert_eq!(value["type"], "file_indexed");
        assert!(value.get("embedder_ms").is_none());
        assert!(value.get("tokens_estimated").is_none());

        assert_eq!(assert_roundtrip(&IndexEvent::Paused), r#"{"type":"paused"}"#);
        assert_eq!(
            assert_roundtrip(&IndexEvent::StopRequested),
            r#"{"type":"stop_requested"}"#
        );
    }
}