Skip to main content

rust_memex/tui/indexer/
sinks.rs

1//! Concrete index event sinks.
2
3use std::collections::VecDeque;
4use std::sync::{Arc, Mutex as StdMutex};
5use std::time::Instant;
6
7use chrono::Utc;
8
9use super::contracts::{
10    IndexEvent, IndexEventSink, IndexTelemetrySnapshot, MAX_RECENT_WARNINGS, SharedIndexTelemetry,
11    WarningEntry,
12};
13
/// How many of the most recent file completions are kept for the throughput, ETA and
/// average-embedder-latency estimates; older samples are evicted first.
const RATE_WINDOW_SIZE: usize = 50;

/// One finished file as recorded in the rolling completion window.
#[derive(Debug, Clone)]
struct CompletionSample {
    /// Monotonic timestamp taken when the completion was recorded.
    completed_at: Instant,
    /// Embedding time reported for the file; skipped and failed files record `None`.
    embedder_ms: Option<u64>,
}
21
22/// Updates the latest dashboard snapshot for the TUI.
23pub struct TuiTelemetrySink {
24    sender: Arc<SharedIndexTelemetry>,
25    completion_window: StdMutex<VecDeque<CompletionSample>>,
26    run_started_at: StdMutex<Option<Instant>>,
27}
28
29impl TuiTelemetrySink {
30    pub fn new(sender: Arc<SharedIndexTelemetry>) -> Self {
31        Self {
32            sender,
33            completion_window: StdMutex::new(VecDeque::with_capacity(RATE_WINDOW_SIZE)),
34            run_started_at: StdMutex::new(None),
35        }
36    }
37
38    fn completion_window_rate(window: &VecDeque<CompletionSample>) -> f64 {
39        if window.len() < 2 {
40            return 0.0;
41        }
42
43        let oldest = window.front().map(|entry| entry.completed_at);
44        let newest = window.back().map(|entry| entry.completed_at);
45        match (oldest, newest) {
46            (Some(oldest), Some(newest)) => {
47                let seconds = newest.duration_since(oldest).as_secs_f64();
48                if seconds <= f64::EPSILON {
49                    0.0
50                } else {
51                    (window.len() - 1) as f64 / seconds
52                }
53            }
54            _ => 0.0,
55        }
56    }
57
58    fn average_embedder_ms(window: &VecDeque<CompletionSample>) -> Option<f64> {
59        let mut total = 0_u64;
60        let mut count = 0_u64;
61
62        for sample in window {
63            if let Some(embedder_ms) = sample.embedder_ms {
64                total += embedder_ms;
65                count += 1;
66            }
67        }
68
69        if count == 0 {
70            None
71        } else {
72            Some(total as f64 / count as f64)
73        }
74    }
75
76    fn touch_elapsed(&self, snapshot: &mut IndexTelemetrySnapshot) {
77        let run_started_at = self
78            .run_started_at
79            .lock()
80            .unwrap_or_else(|poisoned| poisoned.into_inner());
81        if let Some(started_at) = *run_started_at {
82            snapshot.elapsed = started_at.elapsed();
83        }
84    }
85
86    fn recompute_rates(&self, snapshot: &mut IndexTelemetrySnapshot) {
87        let window = self
88            .completion_window
89            .lock()
90            .unwrap_or_else(|poisoned| poisoned.into_inner());
91        let rate = Self::completion_window_rate(&window);
92        snapshot.files_per_sec = rate;
93        snapshot.avg_embedder_ms = Self::average_embedder_ms(&window);
94        if rate > f64::EPSILON {
95            let remaining = snapshot.total.saturating_sub(snapshot.processed);
96            snapshot.eta_secs = Some(remaining as f64 / rate);
97        } else {
98            snapshot.eta_secs = None;
99        }
100    }
101
102    fn push_completion(&self, embedder_ms: Option<u64>) {
103        let mut window = self
104            .completion_window
105            .lock()
106            .unwrap_or_else(|poisoned| poisoned.into_inner());
107        if window.len() == RATE_WINDOW_SIZE {
108            window.pop_front();
109        }
110        window.push_back(CompletionSample {
111            completed_at: Instant::now(),
112            embedder_ms,
113        });
114    }
115
116    fn send_snapshot(&self, snapshot: IndexTelemetrySnapshot) {
117        let _ = self.sender.send(snapshot);
118    }
119}
120
121impl IndexEventSink for TuiTelemetrySink {
122    fn on_event(&self, event: &IndexEvent) {
123        let mut snapshot = self.sender.borrow().clone();
124
125        match event {
126            IndexEvent::RunStarted {
127                total_files,
128                namespace,
129                source_dir,
130                parallelism,
131                started_at,
132            } => {
133                snapshot = IndexTelemetrySnapshot::default();
134                snapshot.total = *total_files;
135                snapshot.namespace = namespace.clone();
136                snapshot.source_dir = source_dir.clone();
137                snapshot.parallelism = *parallelism;
138                snapshot.started_at = Some(*started_at);
139                *self
140                    .run_started_at
141                    .lock()
142                    .unwrap_or_else(|poisoned| poisoned.into_inner()) = Some(Instant::now());
143                self.completion_window
144                    .lock()
145                    .unwrap_or_else(|poisoned| poisoned.into_inner())
146                    .clear();
147            }
148            IndexEvent::FileStarted { path, .. } => {
149                snapshot.current_file = Some(path.clone());
150                snapshot.in_flight += 1;
151                self.touch_elapsed(&mut snapshot);
152            }
153            IndexEvent::FileIndexed {
154                chunks_indexed,
155                embedder_ms,
156                tokens_estimated,
157                ..
158            } => {
159                snapshot.processed += 1;
160                snapshot.indexed += 1;
161                snapshot.total_chunks += chunks_indexed;
162                snapshot.in_flight = snapshot.in_flight.saturating_sub(1);
163                snapshot.stopping = false;
164                if let Some(tokens_estimated) = tokens_estimated {
165                    snapshot.total_tokens_estimated += tokens_estimated;
166                }
167                if snapshot.in_flight == 0 {
168                    snapshot.current_file = None;
169                }
170                self.push_completion(*embedder_ms);
171                self.touch_elapsed(&mut snapshot);
172                self.recompute_rates(&mut snapshot);
173            }
174            IndexEvent::FileSkipped { .. } => {
175                snapshot.processed += 1;
176                snapshot.skipped += 1;
177                snapshot.in_flight = snapshot.in_flight.saturating_sub(1);
178                snapshot.stopping = false;
179                if snapshot.in_flight == 0 {
180                    snapshot.current_file = None;
181                }
182                self.push_completion(None);
183                self.touch_elapsed(&mut snapshot);
184                self.recompute_rates(&mut snapshot);
185            }
186            IndexEvent::FileFailed { .. } => {
187                snapshot.processed += 1;
188                snapshot.failed += 1;
189                snapshot.in_flight = snapshot.in_flight.saturating_sub(1);
190                snapshot.stopping = false;
191                if snapshot.in_flight == 0 {
192                    snapshot.current_file = None;
193                }
194                self.push_completion(None);
195                self.touch_elapsed(&mut snapshot);
196                self.recompute_rates(&mut snapshot);
197            }
198            IndexEvent::StatsTick {
199                processed,
200                indexed,
201                skipped,
202                failed,
203                total,
204                total_chunks,
205                in_flight,
206                ..
207            } => {
208                snapshot.processed = *processed;
209                snapshot.indexed = *indexed;
210                snapshot.skipped = *skipped;
211                snapshot.failed = *failed;
212                snapshot.total = *total;
213                snapshot.total_chunks = *total_chunks;
214                snapshot.in_flight = *in_flight;
215                self.touch_elapsed(&mut snapshot);
216                self.recompute_rates(&mut snapshot);
217            }
218            IndexEvent::RunCompleted {
219                processed,
220                indexed,
221                skipped,
222                failed,
223                total_chunks,
224                elapsed,
225                stopped_early,
226            } => {
227                snapshot.processed = *processed;
228                snapshot.indexed = *indexed;
229                snapshot.skipped = *skipped;
230                snapshot.failed = *failed;
231                snapshot.total_chunks = *total_chunks;
232                snapshot.elapsed = *elapsed;
233                snapshot.in_flight = 0;
234                snapshot.current_file = None;
235                snapshot.complete = true;
236                snapshot.paused = false;
237                snapshot.stopping = false;
238                snapshot.stopped_early = *stopped_early;
239                self.recompute_rates(&mut snapshot);
240            }
241            IndexEvent::RunFailed {
242                error,
243                processed_before_failure,
244            } => {
245                snapshot.processed = *processed_before_failure;
246                snapshot.complete = true;
247                snapshot.fatal_error = Some(error.clone());
248                snapshot.current_file = None;
249                snapshot.in_flight = 0;
250                snapshot.paused = false;
251                snapshot.stopping = false;
252                self.touch_elapsed(&mut snapshot);
253            }
254            IndexEvent::Paused => {
255                snapshot.paused = true;
256                self.touch_elapsed(&mut snapshot);
257            }
258            IndexEvent::Resumed => {
259                snapshot.paused = false;
260                self.touch_elapsed(&mut snapshot);
261            }
262            IndexEvent::ParallelismChanged { current, .. } => {
263                snapshot.parallelism = *current;
264                self.touch_elapsed(&mut snapshot);
265            }
266            IndexEvent::StopRequested => {
267                snapshot.stopping = true;
268                snapshot.paused = false;
269                self.touch_elapsed(&mut snapshot);
270            }
271            IndexEvent::Warning { code, message } => {
272                if snapshot.recent_warnings.len() == MAX_RECENT_WARNINGS {
273                    snapshot.recent_warnings.pop_front();
274                }
275                snapshot.recent_warnings.push_back(WarningEntry {
276                    code: code.clone(),
277                    message: message.clone(),
278                    at: Utc::now(),
279                });
280                self.touch_elapsed(&mut snapshot);
281            }
282        }
283
284        self.send_snapshot(snapshot);
285    }
286}
287
/// Event sink that logs every indexing event through the `tracing` crate.
///
/// The sink holds no state, so it is freely copyable and default-constructible.
#[derive(Debug, Default, Clone, Copy)]
pub struct TracingSink;
290
291impl IndexEventSink for TracingSink {
292    fn on_event(&self, event: &IndexEvent) {
293        match event {
294            IndexEvent::RunStarted {
295                total_files,
296                namespace,
297                source_dir,
298                parallelism,
299                ..
300            } => tracing::info!(
301                total_files,
302                namespace,
303                source_dir,
304                parallelism,
305                "indexing run started"
306            ),
307            IndexEvent::FileStarted {
308                file_index, path, ..
309            } => tracing::debug!(file_index, path, "file indexing started"),
310            IndexEvent::FileIndexed {
311                file_index,
312                path,
313                chunks_indexed,
314                duration_ms,
315                ..
316            } => tracing::info!(
317                file_index,
318                path,
319                chunks_indexed,
320                duration_ms,
321                "file indexed"
322            ),
323            IndexEvent::FileSkipped {
324                file_index,
325                path,
326                reason,
327                ..
328            } => tracing::debug!(file_index, path, reason, "file skipped"),
329            IndexEvent::FileFailed {
330                file_index,
331                path,
332                error,
333            } => tracing::warn!(file_index, path, error, "file failed"),
334            IndexEvent::StatsTick {
335                processed,
336                total,
337                files_per_sec,
338                in_flight,
339                ..
340            } => tracing::debug!(
341                processed,
342                total,
343                files_per_sec,
344                in_flight,
345                "index stats tick"
346            ),
347            IndexEvent::RunCompleted {
348                processed,
349                indexed,
350                skipped,
351                failed,
352                total_chunks,
353                stopped_early,
354                elapsed,
355            } => tracing::info!(
356                processed,
357                indexed,
358                skipped,
359                failed,
360                total_chunks,
361                stopped_early,
362                elapsed_secs = elapsed.as_secs_f64(),
363                "indexing run completed"
364            ),
365            IndexEvent::RunFailed { error, .. } => {
366                tracing::error!(error, "indexing run failed");
367            }
368            IndexEvent::Paused => tracing::info!("indexing paused"),
369            IndexEvent::Resumed => tracing::info!("indexing resumed"),
370            IndexEvent::ParallelismChanged { previous, current } => {
371                tracing::info!(previous, current, "indexing parallelism changed");
372            }
373            IndexEvent::StopRequested => tracing::info!("indexing stop requested"),
374            IndexEvent::Warning { code, message } => {
375                tracing::warn!(code, message, "indexing warning");
376            }
377        }
378    }
379}
380
381/// Forwards every event to every child sink.
382pub struct FanOut {
383    sinks: Vec<Arc<dyn IndexEventSink>>,
384}
385
386impl FanOut {
387    pub fn new(sinks: Vec<Arc<dyn IndexEventSink>>) -> Self {
388        Self { sinks }
389    }
390}
391
392impl IndexEventSink for FanOut {
393    fn on_event(&self, event: &IndexEvent) {
394        for sink in &self.sinks {
395            sink.on_event(event);
396        }
397    }
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tui::indexer::contracts::{IndexEvent, new_index_telemetry};

    #[test]
    fn tui_telemetry_sink_tracks_completion_state() {
        let (sender, receiver) = new_index_telemetry();
        let sink = TuiTelemetrySink::new(Arc::new(sender));

        sink.on_event(&IndexEvent::RunStarted {
            total_files: 4,
            namespace: "kb:test".to_string(),
            source_dir: "/tmp/docs".to_string(),
            parallelism: 2,
            started_at: Utc::now(),
        });
        sink.on_event(&IndexEvent::FileStarted {
            file_index: 0,
            path: "a.md".to_string(),
            size_bytes: 10,
        });
        sink.on_event(&IndexEvent::FileIndexed {
            file_index: 0,
            path: "a.md".to_string(),
            chunks_indexed: 3,
            content_hash: "aaa".to_string(),
            duration_ms: 10,
            embedder_ms: Some(7),
            tokens_estimated: Some(20),
        });
        sink.on_event(&IndexEvent::FileStarted {
            file_index: 1,
            path: "b.md".to_string(),
            size_bytes: 11,
        });
        sink.on_event(&IndexEvent::FileFailed {
            file_index: 1,
            path: "b.md".to_string(),
            error: "boom".to_string(),
        });
        sink.on_event(&IndexEvent::RunCompleted {
            processed: 2,
            indexed: 1,
            skipped: 0,
            failed: 1,
            total_chunks: 3,
            elapsed: std::time::Duration::from_secs(1),
            stopped_early: false,
        });

        let snapshot = receiver.borrow().clone();
        assert_eq!(snapshot.processed, 2);
        assert_eq!(snapshot.failed, 1);
        assert_eq!(snapshot.indexed, 1);
        assert_eq!(snapshot.total, 4);
        assert_eq!(snapshot.total_chunks, 3);
        assert!(snapshot.complete);
        assert!(!snapshot.stopped_early);
    }

    /// Pause, resume, parallelism and stop events are reflected in the published snapshot.
    #[test]
    fn tui_telemetry_sink_tracks_control_events() {
        let (sender, receiver) = new_index_telemetry();
        let sink = TuiTelemetrySink::new(Arc::new(sender));

        sink.on_event(&IndexEvent::RunStarted {
            total_files: 2,
            namespace: "kb:test".to_string(),
            source_dir: "/tmp/docs".to_string(),
            parallelism: 2,
            started_at: Utc::now(),
        });

        sink.on_event(&IndexEvent::Paused);
        assert!(receiver.borrow().paused);

        sink.on_event(&IndexEvent::Resumed);
        assert!(!receiver.borrow().paused);

        sink.on_event(&IndexEvent::ParallelismChanged {
            previous: 2,
            current: 4,
        });
        assert_eq!(receiver.borrow().parallelism, 4);

        sink.on_event(&IndexEvent::Paused);
        sink.on_event(&IndexEvent::StopRequested);
        let snapshot = receiver.borrow().clone();
        assert!(snapshot.stopping);
        // A stop request clears the paused flag.
        assert!(!snapshot.paused);
    }

    /// The recent-warnings list never grows beyond `MAX_RECENT_WARNINGS`.
    #[test]
    fn tui_telemetry_sink_caps_recent_warnings() {
        let (sender, receiver) = new_index_telemetry();
        let sink = TuiTelemetrySink::new(Arc::new(sender));

        for _ in 0..MAX_RECENT_WARNINGS + 3 {
            sink.on_event(&IndexEvent::Warning {
                code: "test_warning".into(),
                message: "something odd".into(),
            });
        }

        let snapshot = receiver.borrow().clone();
        assert_eq!(snapshot.recent_warnings.len(), MAX_RECENT_WARNINGS);
    }

    /// Every child sink sees every event forwarded through a `FanOut`.
    #[test]
    fn fan_out_forwards_each_event_to_every_sink() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        struct CountingSink(AtomicUsize);

        impl IndexEventSink for CountingSink {
            fn on_event(&self, _event: &IndexEvent) {
                self.0.fetch_add(1, Ordering::SeqCst);
            }
        }

        let first = Arc::new(CountingSink(AtomicUsize::new(0)));
        let second = Arc::new(CountingSink(AtomicUsize::new(0)));
        let fan_out = FanOut::new(vec![
            Arc::clone(&first) as Arc<dyn IndexEventSink>,
            Arc::clone(&second) as Arc<dyn IndexEventSink>,
        ]);

        fan_out.on_event(&IndexEvent::Paused);
        fan_out.on_event(&IndexEvent::Resumed);

        assert_eq!(first.0.load(Ordering::SeqCst), 2);
        assert_eq!(second.0.load(Ordering::SeqCst), 2);
    }
}