1use std::collections::VecDeque;
4use std::sync::{Arc, Mutex as StdMutex};
5use std::time::Instant;
6
7use chrono::Utc;
8
9use super::contracts::{
10 IndexEvent, IndexEventSink, IndexTelemetrySnapshot, MAX_RECENT_WARNINGS, SharedIndexTelemetry,
11 WarningEntry,
12};
13
14const RATE_WINDOW_SIZE: usize = 50;
15
16#[derive(Debug, Clone)]
17struct CompletionSample {
18 completed_at: Instant,
19 embedder_ms: Option<u64>,
20}
21
22pub struct TuiTelemetrySink {
24 sender: Arc<SharedIndexTelemetry>,
25 completion_window: StdMutex<VecDeque<CompletionSample>>,
26 run_started_at: StdMutex<Option<Instant>>,
27}
28
29impl TuiTelemetrySink {
30 pub fn new(sender: Arc<SharedIndexTelemetry>) -> Self {
31 Self {
32 sender,
33 completion_window: StdMutex::new(VecDeque::with_capacity(RATE_WINDOW_SIZE)),
34 run_started_at: StdMutex::new(None),
35 }
36 }
37
38 fn completion_window_rate(window: &VecDeque<CompletionSample>) -> f64 {
39 if window.len() < 2 {
40 return 0.0;
41 }
42
43 let oldest = window.front().map(|entry| entry.completed_at);
44 let newest = window.back().map(|entry| entry.completed_at);
45 match (oldest, newest) {
46 (Some(oldest), Some(newest)) => {
47 let seconds = newest.duration_since(oldest).as_secs_f64();
48 if seconds <= f64::EPSILON {
49 0.0
50 } else {
51 (window.len() - 1) as f64 / seconds
52 }
53 }
54 _ => 0.0,
55 }
56 }
57
58 fn average_embedder_ms(window: &VecDeque<CompletionSample>) -> Option<f64> {
59 let mut total = 0_u64;
60 let mut count = 0_u64;
61
62 for sample in window {
63 if let Some(embedder_ms) = sample.embedder_ms {
64 total += embedder_ms;
65 count += 1;
66 }
67 }
68
69 if count == 0 {
70 None
71 } else {
72 Some(total as f64 / count as f64)
73 }
74 }
75
76 fn touch_elapsed(&self, snapshot: &mut IndexTelemetrySnapshot) {
77 let run_started_at = self
78 .run_started_at
79 .lock()
80 .unwrap_or_else(|poisoned| poisoned.into_inner());
81 if let Some(started_at) = *run_started_at {
82 snapshot.elapsed = started_at.elapsed();
83 }
84 }
85
86 fn recompute_rates(&self, snapshot: &mut IndexTelemetrySnapshot) {
87 let window = self
88 .completion_window
89 .lock()
90 .unwrap_or_else(|poisoned| poisoned.into_inner());
91 let rate = Self::completion_window_rate(&window);
92 snapshot.files_per_sec = rate;
93 snapshot.avg_embedder_ms = Self::average_embedder_ms(&window);
94 if rate > f64::EPSILON {
95 let remaining = snapshot.total.saturating_sub(snapshot.processed);
96 snapshot.eta_secs = Some(remaining as f64 / rate);
97 } else {
98 snapshot.eta_secs = None;
99 }
100 }
101
102 fn push_completion(&self, embedder_ms: Option<u64>) {
103 let mut window = self
104 .completion_window
105 .lock()
106 .unwrap_or_else(|poisoned| poisoned.into_inner());
107 if window.len() == RATE_WINDOW_SIZE {
108 window.pop_front();
109 }
110 window.push_back(CompletionSample {
111 completed_at: Instant::now(),
112 embedder_ms,
113 });
114 }
115
116 fn send_snapshot(&self, snapshot: IndexTelemetrySnapshot) {
117 let _ = self.sender.send(snapshot);
118 }
119}
120
121impl IndexEventSink for TuiTelemetrySink {
122 fn on_event(&self, event: &IndexEvent) {
123 let mut snapshot = self.sender.borrow().clone();
124
125 match event {
126 IndexEvent::RunStarted {
127 total_files,
128 namespace,
129 source_dir,
130 parallelism,
131 started_at,
132 } => {
133 snapshot = IndexTelemetrySnapshot::default();
134 snapshot.total = *total_files;
135 snapshot.namespace = namespace.clone();
136 snapshot.source_dir = source_dir.clone();
137 snapshot.parallelism = *parallelism;
138 snapshot.started_at = Some(*started_at);
139 *self
140 .run_started_at
141 .lock()
142 .unwrap_or_else(|poisoned| poisoned.into_inner()) = Some(Instant::now());
143 self.completion_window
144 .lock()
145 .unwrap_or_else(|poisoned| poisoned.into_inner())
146 .clear();
147 }
148 IndexEvent::FileStarted { path, .. } => {
149 snapshot.current_file = Some(path.clone());
150 snapshot.in_flight += 1;
151 self.touch_elapsed(&mut snapshot);
152 }
153 IndexEvent::FileIndexed {
154 chunks_indexed,
155 embedder_ms,
156 tokens_estimated,
157 ..
158 } => {
159 snapshot.processed += 1;
160 snapshot.indexed += 1;
161 snapshot.total_chunks += chunks_indexed;
162 snapshot.in_flight = snapshot.in_flight.saturating_sub(1);
163 snapshot.stopping = false;
164 if let Some(tokens_estimated) = tokens_estimated {
165 snapshot.total_tokens_estimated += tokens_estimated;
166 }
167 if snapshot.in_flight == 0 {
168 snapshot.current_file = None;
169 }
170 self.push_completion(*embedder_ms);
171 self.touch_elapsed(&mut snapshot);
172 self.recompute_rates(&mut snapshot);
173 }
174 IndexEvent::FileSkipped { .. } => {
175 snapshot.processed += 1;
176 snapshot.skipped += 1;
177 snapshot.in_flight = snapshot.in_flight.saturating_sub(1);
178 snapshot.stopping = false;
179 if snapshot.in_flight == 0 {
180 snapshot.current_file = None;
181 }
182 self.push_completion(None);
183 self.touch_elapsed(&mut snapshot);
184 self.recompute_rates(&mut snapshot);
185 }
186 IndexEvent::FileFailed { .. } => {
187 snapshot.processed += 1;
188 snapshot.failed += 1;
189 snapshot.in_flight = snapshot.in_flight.saturating_sub(1);
190 snapshot.stopping = false;
191 if snapshot.in_flight == 0 {
192 snapshot.current_file = None;
193 }
194 self.push_completion(None);
195 self.touch_elapsed(&mut snapshot);
196 self.recompute_rates(&mut snapshot);
197 }
198 IndexEvent::StatsTick {
199 processed,
200 indexed,
201 skipped,
202 failed,
203 total,
204 total_chunks,
205 in_flight,
206 ..
207 } => {
208 snapshot.processed = *processed;
209 snapshot.indexed = *indexed;
210 snapshot.skipped = *skipped;
211 snapshot.failed = *failed;
212 snapshot.total = *total;
213 snapshot.total_chunks = *total_chunks;
214 snapshot.in_flight = *in_flight;
215 self.touch_elapsed(&mut snapshot);
216 self.recompute_rates(&mut snapshot);
217 }
218 IndexEvent::RunCompleted {
219 processed,
220 indexed,
221 skipped,
222 failed,
223 total_chunks,
224 elapsed,
225 stopped_early,
226 } => {
227 snapshot.processed = *processed;
228 snapshot.indexed = *indexed;
229 snapshot.skipped = *skipped;
230 snapshot.failed = *failed;
231 snapshot.total_chunks = *total_chunks;
232 snapshot.elapsed = *elapsed;
233 snapshot.in_flight = 0;
234 snapshot.current_file = None;
235 snapshot.complete = true;
236 snapshot.paused = false;
237 snapshot.stopping = false;
238 snapshot.stopped_early = *stopped_early;
239 self.recompute_rates(&mut snapshot);
240 }
241 IndexEvent::RunFailed {
242 error,
243 processed_before_failure,
244 } => {
245 snapshot.processed = *processed_before_failure;
246 snapshot.complete = true;
247 snapshot.fatal_error = Some(error.clone());
248 snapshot.current_file = None;
249 snapshot.in_flight = 0;
250 snapshot.paused = false;
251 snapshot.stopping = false;
252 self.touch_elapsed(&mut snapshot);
253 }
254 IndexEvent::Paused => {
255 snapshot.paused = true;
256 self.touch_elapsed(&mut snapshot);
257 }
258 IndexEvent::Resumed => {
259 snapshot.paused = false;
260 self.touch_elapsed(&mut snapshot);
261 }
262 IndexEvent::ParallelismChanged { current, .. } => {
263 snapshot.parallelism = *current;
264 self.touch_elapsed(&mut snapshot);
265 }
266 IndexEvent::StopRequested => {
267 snapshot.stopping = true;
268 snapshot.paused = false;
269 self.touch_elapsed(&mut snapshot);
270 }
271 IndexEvent::Warning { code, message } => {
272 if snapshot.recent_warnings.len() == MAX_RECENT_WARNINGS {
273 snapshot.recent_warnings.pop_front();
274 }
275 snapshot.recent_warnings.push_back(WarningEntry {
276 code: code.clone(),
277 message: message.clone(),
278 at: Utc::now(),
279 });
280 self.touch_elapsed(&mut snapshot);
281 }
282 }
283
284 self.send_snapshot(snapshot);
285 }
286}
287
288pub struct TracingSink;
290
291impl IndexEventSink for TracingSink {
292 fn on_event(&self, event: &IndexEvent) {
293 match event {
294 IndexEvent::RunStarted {
295 total_files,
296 namespace,
297 source_dir,
298 parallelism,
299 ..
300 } => tracing::info!(
301 total_files,
302 namespace,
303 source_dir,
304 parallelism,
305 "indexing run started"
306 ),
307 IndexEvent::FileStarted {
308 file_index, path, ..
309 } => tracing::debug!(file_index, path, "file indexing started"),
310 IndexEvent::FileIndexed {
311 file_index,
312 path,
313 chunks_indexed,
314 duration_ms,
315 ..
316 } => tracing::info!(
317 file_index,
318 path,
319 chunks_indexed,
320 duration_ms,
321 "file indexed"
322 ),
323 IndexEvent::FileSkipped {
324 file_index,
325 path,
326 reason,
327 ..
328 } => tracing::debug!(file_index, path, reason, "file skipped"),
329 IndexEvent::FileFailed {
330 file_index,
331 path,
332 error,
333 } => tracing::warn!(file_index, path, error, "file failed"),
334 IndexEvent::StatsTick {
335 processed,
336 total,
337 files_per_sec,
338 in_flight,
339 ..
340 } => tracing::debug!(
341 processed,
342 total,
343 files_per_sec,
344 in_flight,
345 "index stats tick"
346 ),
347 IndexEvent::RunCompleted {
348 processed,
349 indexed,
350 skipped,
351 failed,
352 total_chunks,
353 stopped_early,
354 elapsed,
355 } => tracing::info!(
356 processed,
357 indexed,
358 skipped,
359 failed,
360 total_chunks,
361 stopped_early,
362 elapsed_secs = elapsed.as_secs_f64(),
363 "indexing run completed"
364 ),
365 IndexEvent::RunFailed { error, .. } => {
366 tracing::error!(error, "indexing run failed");
367 }
368 IndexEvent::Paused => tracing::info!("indexing paused"),
369 IndexEvent::Resumed => tracing::info!("indexing resumed"),
370 IndexEvent::ParallelismChanged { previous, current } => {
371 tracing::info!(previous, current, "indexing parallelism changed");
372 }
373 IndexEvent::StopRequested => tracing::info!("indexing stop requested"),
374 IndexEvent::Warning { code, message } => {
375 tracing::warn!(code, message, "indexing warning");
376 }
377 }
378 }
379}
380
381pub struct FanOut {
383 sinks: Vec<Arc<dyn IndexEventSink>>,
384}
385
386impl FanOut {
387 pub fn new(sinks: Vec<Arc<dyn IndexEventSink>>) -> Self {
388 Self { sinks }
389 }
390}
391
392impl IndexEventSink for FanOut {
393 fn on_event(&self, event: &IndexEvent) {
394 for sink in &self.sinks {
395 sink.on_event(event);
396 }
397 }
398}
399
400#[cfg(test)]
401mod tests {
402 use super::*;
403 use crate::tui::indexer::contracts::{IndexEvent, new_index_telemetry};
404
405 #[test]
406 fn tui_telemetry_sink_tracks_completion_state() {
407 let (sender, receiver) = new_index_telemetry();
408 let sink = TuiTelemetrySink::new(Arc::new(sender));
409
410 sink.on_event(&IndexEvent::RunStarted {
411 total_files: 4,
412 namespace: "kb:test".to_string(),
413 source_dir: "/tmp/docs".to_string(),
414 parallelism: 2,
415 started_at: Utc::now(),
416 });
417 sink.on_event(&IndexEvent::FileStarted {
418 file_index: 0,
419 path: "a.md".to_string(),
420 size_bytes: 10,
421 });
422 sink.on_event(&IndexEvent::FileIndexed {
423 file_index: 0,
424 path: "a.md".to_string(),
425 chunks_indexed: 3,
426 content_hash: "aaa".to_string(),
427 duration_ms: 10,
428 embedder_ms: Some(7),
429 tokens_estimated: Some(20),
430 });
431 sink.on_event(&IndexEvent::FileStarted {
432 file_index: 1,
433 path: "b.md".to_string(),
434 size_bytes: 11,
435 });
436 sink.on_event(&IndexEvent::FileFailed {
437 file_index: 1,
438 path: "b.md".to_string(),
439 error: "boom".to_string(),
440 });
441 sink.on_event(&IndexEvent::RunCompleted {
442 processed: 2,
443 indexed: 1,
444 skipped: 0,
445 failed: 1,
446 total_chunks: 3,
447 elapsed: std::time::Duration::from_secs(1),
448 stopped_early: false,
449 });
450
451 let snapshot = receiver.borrow().clone();
452 assert_eq!(snapshot.processed, 2);
453 assert_eq!(snapshot.failed, 1);
454 assert_eq!(snapshot.indexed, 1);
455 assert_eq!(snapshot.total, 4);
456 assert_eq!(snapshot.total_chunks, 3);
457 assert!(snapshot.complete);
458 assert!(!snapshot.stopped_early);
459 }
460}