Skip to main content

ripvec_core/
profile.rs

1//! Pipeline profiling instrumentation.
2//!
3//! The [`Profiler`] enum provides real-time timing output for each pipeline
4//! phase. When disabled ([`Profiler::Noop`]), all methods are no-ops with
5//! zero overhead at release optimization levels.
6
7use std::cell::Cell;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10
11/// Pipeline profiler that prints phase timing to stderr.
12///
13/// Use [`Profiler::new`] to create an active or noop profiler based on
14/// whether `--profile` was passed.
15/// Callback for live progress updates (e.g. driving a progress bar).
16///
17/// Called with a formatted status string whenever the profiler would
18/// normally print to stderr. Set via [`Profiler::with_callback`].
19type ProgressCallback = Box<dyn Fn(&str) + Send + Sync>;
20
21/// Snapshot of embed-phase progress, passed to the tick callback.
22#[derive(Debug, Clone)]
23pub struct EmbedProgress {
24    /// Chunks completed so far.
25    pub done: usize,
26    /// Total chunks to embed.
27    pub total: usize,
28    /// Chunks/s over the most recent reporting window.
29    pub window_rate: f64,
30    /// Chunks/s since embed phase started.
31    pub overall_rate: f64,
32    /// Fraction of time spent waiting for the model lock (0.0–1.0).
33    pub lock_wait_pct: f64,
34    /// Fraction of time spent in inference (0.0–1.0).
35    pub inference_pct: f64,
36}
37
38/// Callback for per-batch embed progress.
39type EmbedTickCallback = Box<dyn Fn(&EmbedProgress) + Send + Sync>;
40
41/// Callback for observing completed embedding vectors.
42type EmbeddingBatchCallback = Box<dyn Fn(&[Vec<f32>]) + Send + Sync>;
43
44#[expect(
45    clippy::large_enum_variant,
46    reason = "Active is the common case; Noop is only for --profile=false"
47)]
48pub enum Profiler {
49    /// Actively collects timing and prints to stderr.
50    #[expect(
51        private_interfaces,
52        reason = "EmbedState is intentionally opaque; callers cannot name it"
53    )]
54    Active {
55        /// Wall-clock start of the entire run.
56        start: Instant,
57        /// Reporting interval for embed progress.
58        interval: Duration,
59        /// Mutable embed-phase state (sequential access only, Mutex for Sync).
60        embed: Mutex<EmbedState>,
61        /// Per-rayon-thread chunk counts (parallel access during chunk phase).
62        chunk_counts: Mutex<Vec<usize>>,
63        /// Optional callback for live progress updates.
64        on_progress: Option<ProgressCallback>,
65        /// Optional callback for per-chunk embed progress (drives progress bars).
66        on_embed_tick: Option<EmbedTickCallback>,
67        /// Optional callback for completed embedding vectors.
68        on_embedding_batch: Option<EmbeddingBatchCallback>,
69    },
70    /// No-op profiler. All methods are empty.
71    Noop,
72}
73
74pub(crate) struct EmbedState {
75    phase_start: Instant,
76    last_report: Instant,
77    chunks_at_last_report: usize,
78    /// Timestamp of the last `embed_tick` call (for per-batch window rate).
79    last_tick: Instant,
80    /// Chunks completed at last tick (for per-batch window rate).
81    chunks_at_last_tick: usize,
82    total_lock_wait: Duration,
83    total_inference: Duration,
84    total_chunks: usize,
85}
86
87impl EmbedState {
88    fn new() -> Self {
89        let now = Instant::now();
90        Self {
91            phase_start: now,
92            last_report: now,
93            chunks_at_last_report: 0,
94            last_tick: now,
95            chunks_at_last_tick: 0,
96            total_lock_wait: Duration::ZERO,
97            total_inference: Duration::ZERO,
98            total_chunks: 0,
99        }
100    }
101}
102
103impl Profiler {
104    /// Create a new profiler. If `enabled` is false, returns `Noop`.
105    #[must_use]
106    pub fn new(enabled: bool, interval: Duration) -> Self {
107        if enabled {
108            Self::Active {
109                start: Instant::now(),
110                interval,
111                embed: Mutex::new(EmbedState::new()),
112                chunk_counts: Mutex::new(Vec::new()),
113                on_progress: None,
114                on_embed_tick: None,
115                on_embedding_batch: None,
116            }
117        } else {
118            Self::Noop
119        }
120    }
121
122    /// Create an active profiler that drives a progress callback instead of
123    /// printing to stderr. The callback receives formatted status messages
124    /// at each pipeline phase transition and embed progress tick.
125    #[must_use]
126    pub fn with_callback(interval: Duration, cb: impl Fn(&str) + Send + Sync + 'static) -> Self {
127        Self::Active {
128            start: Instant::now(),
129            interval,
130            embed: Mutex::new(EmbedState::new()),
131            chunk_counts: Mutex::new(Vec::new()),
132            on_progress: Some(Box::new(cb)),
133            on_embed_tick: None,
134            on_embedding_batch: None,
135        }
136    }
137
138    /// Set a callback that fires on every embed chunk completion with `(done, total)`.
139    ///
140    /// Unlike the throttled `on_progress` callback, this fires for every chunk
141    /// so progress bars can update smoothly (indicatif handles display rate).
142    #[must_use]
143    pub fn with_embed_tick(mut self, cb: impl Fn(&EmbedProgress) + Send + Sync + 'static) -> Self {
144        if let Self::Active {
145            ref mut on_embed_tick,
146            ..
147        } = self
148        {
149            *on_embed_tick = Some(Box::new(cb));
150        }
151        self
152    }
153
154    /// Set a callback that receives each completed embedding batch.
155    #[must_use]
156    pub fn with_embedding_batch(
157        mut self,
158        cb: impl Fn(&[Vec<f32>]) + Send + Sync + 'static,
159    ) -> Self {
160        if let Self::Active {
161            ref mut on_embedding_batch,
162            ..
163        } = self
164        {
165            *on_embedding_batch = Some(Box::new(cb));
166        }
167        self
168    }
169
170    /// Notify observers that a batch of embeddings is available.
171    pub fn embedding_batch(&self, embeddings: &[Vec<f32>]) {
172        if embeddings.is_empty() {
173            return;
174        }
175        if let Self::Active {
176            on_embedding_batch, ..
177        } = self
178            && let Some(cb) = on_embedding_batch
179        {
180            cb(embeddings);
181        }
182    }
183
184    /// Create a no-op profiler.
185    #[must_use]
186    pub fn noop() -> Self {
187        Self::Noop
188    }
189
190    /// Send a progress message: calls the callback if set, otherwise prints to stderr.
191    fn report(&self, msg: &str) {
192        if let Self::Active { on_progress, .. } = self {
193            if let Some(cb) = on_progress {
194                cb(msg);
195            } else {
196                eprintln!("{msg}");
197            }
198        }
199    }
200
201    /// Print the system info header line.
202    pub fn header(&self, version: &str, model_repo: &str, threads: usize, cores: usize) {
203        if let Self::Active { .. } = self {
204            self.report(&format!(
205                "[profile] ripvec {version} | {cores}-core | rayon: {threads} threads | model: {model_repo}",
206            ));
207        }
208    }
209
210    /// Start timing a named phase. Returns a guard that prints on drop.
211    #[must_use]
212    pub fn phase(&self, name: &'static str) -> PhaseGuard<'_> {
213        if let Self::Active { start, .. } = self {
214            self.report(&format!(
215                "[{:.3}s] {name}: starting",
216                start.elapsed().as_secs_f64(),
217            ));
218        }
219        PhaseGuard {
220            profiler: self,
221            name,
222            start: Instant::now(),
223            detail: Cell::new(None),
224        }
225    }
226
227    /// Record that a rayon thread produced `n` chunks during the chunk phase.
228    pub fn chunk_thread_report(&self, n: usize) {
229        if let Self::Active { chunk_counts, .. } = self
230            && let Ok(mut counts) = chunk_counts.lock()
231        {
232            let idx = rayon::current_thread_index().unwrap_or(0);
233            if counts.len() <= idx {
234                counts.resize(idx + 1, 0);
235            }
236            counts[idx] += n;
237        }
238    }
239
240    /// Print the chunk phase summary with thread utilization stats.
241    pub fn chunk_summary(&self, total_chunks: usize, total_files: usize, elapsed: Duration) {
242        if let Self::Active {
243            start,
244            chunk_counts,
245            ..
246        } = self
247        {
248            let wall = start.elapsed();
249            if let Ok(counts) = chunk_counts.lock() {
250                let active = counts.iter().filter(|&&c| c > 0).count();
251                let pool_size = rayon::current_num_threads();
252                if active > 0 {
253                    let min = counts
254                        .iter()
255                        .filter(|&&c| c > 0)
256                        .min()
257                        .copied()
258                        .unwrap_or(0);
259                    let max = counts.iter().max().copied().unwrap_or(0);
260                    self.report(&format!(
261                        "[{:.1}s]  chunk: {} chunks from {} files in {:.0?} ({} threads, {} active, skew: {}-{} chunks/thread)",
262                        wall.as_secs_f64(),
263                        total_chunks,
264                        total_files,
265                        elapsed,
266                        pool_size,
267                        active,
268                        min,
269                        max,
270                    ));
271                } else {
272                    self.report(&format!(
273                        "[{:.1}s]  chunk: {} chunks from {} files in {:.0?} ({} threads)",
274                        wall.as_secs_f64(),
275                        total_chunks,
276                        total_files,
277                        elapsed,
278                        pool_size,
279                    ));
280                }
281            }
282        }
283    }
284
285    /// Begin the embed phase. Call before the embedding loop.
286    pub fn embed_begin(&self, total: usize) {
287        if let Self::Active { embed, .. } = self
288            && let Ok(mut state) = embed.lock()
289        {
290            let now = Instant::now();
291            state.phase_start = now;
292            state.last_report = now;
293            state.chunks_at_last_report = 0;
294            state.total_lock_wait = Duration::ZERO;
295            state.total_inference = Duration::ZERO;
296            state.total_chunks = total;
297        }
298    }
299
300    /// Update the total chunk count for the embed phase.
301    ///
302    /// Used by the streaming pipeline where the total isn't known at
303    /// [`Self::embed_begin`] time. Only updates if the new total is larger
304    /// than the current one (monotonic).
305    pub fn embed_begin_update_total(&self, total: usize) {
306        if let Self::Active { embed, .. } = self
307            && let Ok(mut state) = embed.lock()
308            && total > state.total_chunks
309        {
310            state.total_chunks = total;
311        }
312    }
313
314    /// Called after each chunk is embedded. Prints periodic progress.
315    #[expect(
316        clippy::cast_precision_loss,
317        reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
318    )]
319    pub fn embed_tick(&self, done: usize) {
320        if let Self::Active {
321            start,
322            interval,
323            embed,
324            on_embed_tick,
325            ..
326        } = self
327        {
328            let Ok(mut state) = embed.lock() else {
329                return;
330            };
331            let now = Instant::now();
332            let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
333            let overall_rate = if overall_elapsed > 0.0 {
334                done as f64 / overall_elapsed
335            } else {
336                0.0
337            };
338            let total_timing = state.total_lock_wait + state.total_inference;
339            let lock_pct = if total_timing.as_nanos() > 0 {
340                state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64
341            } else {
342                0.0
343            };
344
345            // Per-batch callback: compute a fresh per-batch window rate so the
346            // sparkline gets a new data point on every batch, not every interval.
347            if let Some(cb) = on_embed_tick {
348                let tick_elapsed = now.duration_since(state.last_tick).as_secs_f64();
349                let tick_chunks = done - state.chunks_at_last_tick;
350                let batch_rate = if tick_elapsed > 0.0 {
351                    tick_chunks as f64 / tick_elapsed
352                } else {
353                    overall_rate
354                };
355                state.last_tick = now;
356                state.chunks_at_last_tick = done;
357
358                cb(&EmbedProgress {
359                    done,
360                    total: state.total_chunks,
361                    window_rate: batch_rate,
362                    overall_rate,
363                    lock_wait_pct: lock_pct,
364                    inference_pct: 1.0 - lock_pct,
365                });
366            }
367
368            // Throttled text report for --profile / spinner modes.
369            if now.duration_since(state.last_report) >= *interval {
370                let wall = start.elapsed();
371                let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
372                let report_chunks = done - state.chunks_at_last_report;
373                let window_rate = if report_elapsed > 0.0 {
374                    report_chunks as f64 / report_elapsed
375                } else {
376                    0.0
377                };
378                self.report(&format!(
379                    "[{:.1}s]  embed: {}/{} (last {:.0}s: {:.1}/s, overall: {:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
380                    wall.as_secs_f64(),
381                    done,
382                    state.total_chunks,
383                    report_elapsed,
384                    window_rate,
385                    overall_rate,
386                    lock_pct * 100.0,
387                    (1.0 - lock_pct) * 100.0,
388                ));
389                state.last_report = now;
390                state.chunks_at_last_report = done;
391            }
392        }
393    }
394
395    /// Byte-based progress for streaming mode.
396    ///
397    /// Shows `processed_bytes/total_bytes` as MB with chunk rate. The total is
398    /// known from the walk phase (file sizes), so the denominator is stable.
399    #[expect(
400        clippy::cast_precision_loss,
401        reason = "display-only: sub-1% precision loss acceptable for MB/rate"
402    )]
403    pub fn embed_tick_bytes(&self, done_chunks: usize, bytes_processed: u64, total_bytes: u64) {
404        if let Self::Active {
405            start,
406            interval,
407            embed,
408            ..
409        } = self
410        {
411            let Ok(mut state) = embed.lock() else {
412                return;
413            };
414            let now = Instant::now();
415            let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
416            let overall_rate = if overall_elapsed > 0.0 {
417                done_chunks as f64 / overall_elapsed
418            } else {
419                0.0
420            };
421
422            if now.duration_since(state.last_report) >= *interval {
423                let wall = start.elapsed();
424                let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
425                let report_chunks = done_chunks - state.chunks_at_last_report;
426                let window_rate = if report_elapsed > 0.0 {
427                    report_chunks as f64 / report_elapsed
428                } else {
429                    0.0
430                };
431                let mb_done = bytes_processed as f64 / (1024.0 * 1024.0);
432                let mb_total = total_bytes as f64 / (1024.0 * 1024.0);
433                self.report(&format!(
434                    "[{:.1}s]  embed: {:.1}/{:.1} MB (last {:.0}s: {:.1}/s, overall: {:.1}/s)",
435                    wall.as_secs_f64(),
436                    mb_done,
437                    mb_total,
438                    report_elapsed,
439                    window_rate,
440                    overall_rate,
441                ));
442                state.last_report = now;
443                state.chunks_at_last_report = done_chunks;
444            }
445        }
446    }
447
448    /// Accumulate time spent waiting for the model mutex lock.
449    pub fn embed_lock_wait(&self, duration: Duration) {
450        if let Self::Active { embed, .. } = self
451            && let Ok(mut state) = embed.lock()
452        {
453            state.total_lock_wait += duration;
454        }
455    }
456
457    /// Accumulate time spent in ONNX inference.
458    pub fn embed_inference(&self, duration: Duration) {
459        if let Self::Active { embed, .. } = self
460            && let Ok(mut state) = embed.lock()
461        {
462            state.total_inference += duration;
463        }
464    }
465
466    /// Print the final embed phase summary.
467    #[expect(
468        clippy::cast_precision_loss,
469        reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
470    )]
471    pub fn embed_done(&self) {
472        if let Self::Active { start, embed, .. } = self
473            && let Ok(state) = embed.lock()
474        {
475            let wall = start.elapsed();
476            if state.total_chunks == 0 {
477                self.report(&format!(
478                    "[{:.1}s]  embed: skipped (0 chunks)",
479                    wall.as_secs_f64()
480                ));
481                return;
482            }
483            let elapsed = Instant::now().duration_since(state.phase_start);
484            let rate = if elapsed.as_secs_f64() > 0.0 {
485                state.total_chunks as f64 / elapsed.as_secs_f64()
486            } else {
487                0.0
488            };
489            let total_timing = state.total_lock_wait + state.total_inference;
490            let lock_pct = if total_timing.as_nanos() > 0 {
491                state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64 * 100.0
492            } else {
493                0.0
494            };
495            self.report(&format!(
496                "[{:.1}s]  embed: {}/{} done in {:.1}s ({:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
497                wall.as_secs_f64(),
498                state.total_chunks,
499                state.total_chunks,
500                elapsed.as_secs_f64(),
501                rate,
502                lock_pct,
503                100.0 - lock_pct,
504            ));
505        }
506    }
507
508    /// Print the total wall-clock time.
509    pub fn finish(&self) {
510        if let Self::Active { start, .. } = self {
511            let elapsed = start.elapsed().as_secs_f64();
512            self.report(&format!("[{elapsed:.1}s]  total: {elapsed:.1}s"));
513        }
514    }
515}
516
517/// Guard returned by [`Profiler::phase`]. Prints elapsed time on drop.
518pub struct PhaseGuard<'a> {
519    profiler: &'a Profiler,
520    name: &'static str,
521    start: Instant,
522    detail: Cell<Option<String>>,
523}
524
525impl PhaseGuard<'_> {
526    /// Attach a detail string printed alongside the phase timing.
527    pub fn set_detail(&self, detail: String) {
528        self.detail.set(Some(detail));
529    }
530}
531
532impl Drop for PhaseGuard<'_> {
533    fn drop(&mut self) {
534        if let Profiler::Active { start, .. } = self.profiler {
535            let elapsed = self.start.elapsed();
536            let wall = start.elapsed();
537            let msg = if let Some(detail) = self.detail.take() {
538                format!(
539                    "[{:.3}s] {}: {} in {:.1?}",
540                    wall.as_secs_f64(),
541                    self.name,
542                    detail,
543                    elapsed,
544                )
545            } else {
546                format!(
547                    "[{:.3}s] {}: {:.1?}",
548                    wall.as_secs_f64(),
549                    self.name,
550                    elapsed,
551                )
552            };
553            self.profiler.report(&msg);
554        }
555    }
556}
557
558#[cfg(test)]
559mod tests {
560    use super::*;
561    use std::time::Duration;
562
563    #[test]
564    fn profiler_is_sync() {
565        fn assert_sync<T: Sync>() {}
566        assert_sync::<Profiler>();
567    }
568
569    #[test]
570    fn noop_profiler_all_methods_are_safe() {
571        let p = Profiler::noop();
572        p.header("0.1.0", "test/repo", 4, 8);
573        {
574            let g = p.phase("test_phase");
575            g.set_detail("some detail".to_string());
576        }
577        p.chunk_thread_report(10);
578        p.chunk_summary(100, 50, Duration::from_millis(89));
579        p.embed_begin(100);
580        p.embed_tick(1);
581        p.embed_lock_wait(Duration::from_millis(1));
582        p.embed_inference(Duration::from_millis(10));
583        p.embed_done();
584        p.finish();
585    }
586
587    #[test]
588    fn active_profiler_phase_guard_formats_correctly() {
589        let p = Profiler::new(true, Duration::from_secs(10));
590        {
591            let g = p.phase("test_phase");
592            std::thread::sleep(Duration::from_millis(5));
593            g.set_detail("42 items".to_string());
594        }
595        // Just verify it doesn't panic — stderr output is visual
596    }
597
598    #[test]
599    fn embed_tick_respects_interval() {
600        let p = Profiler::new(true, Duration::from_secs(100)); // very long interval
601        p.embed_begin(10);
602        for i in 1..=10 {
603            p.embed_tick(i);
604            // With a 100s interval, no periodic output should fire
605        }
606        p.embed_done();
607    }
608
609    #[test]
610    fn chunk_summary_with_zero_files() {
611        let p = Profiler::new(true, Duration::from_secs(10));
612        p.chunk_summary(0, 0, Duration::from_millis(0));
613        // Should not panic or divide by zero
614    }
615
616    #[test]
617    fn embed_done_with_zero_chunks() {
618        let p = Profiler::new(true, Duration::from_secs(10));
619        p.embed_begin(0);
620        p.embed_done();
621        // Should print "skipped" line, not divide by zero
622    }
623
624    #[test]
625    fn embedding_batch_callback_receives_embeddings() {
626        let seen = std::sync::Arc::new(std::sync::Mutex::new(0usize));
627        let seen_for_cb = seen.clone();
628        let p = Profiler::with_callback(Duration::from_secs(10), |_| {}).with_embedding_batch(
629            move |batch| {
630                *seen_for_cb.lock().unwrap() += batch.len();
631            },
632        );
633
634        p.embedding_batch(&[vec![1.0, 0.0], vec![0.0, 1.0]]);
635
636        assert_eq!(*seen.lock().unwrap(), 2);
637    }
638}