Skip to main content

ripvec_core/
profile.rs

1//! Pipeline profiling instrumentation.
2//!
3//! The [`Profiler`] enum provides real-time timing output for each pipeline
4//! phase. When disabled ([`Profiler::Noop`]), all methods are no-ops with
5//! zero overhead at release optimization levels.
6
7use std::cell::Cell;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10
/// Callback for live progress updates (e.g. driving a progress bar).
///
/// Called with a formatted status string whenever the profiler would
/// normally print to stderr. Set via [`Profiler::with_callback`].
type ProgressCallback = Box<dyn Fn(&str) + Send + Sync>;
20
/// Snapshot of embed-phase progress, passed to the tick callback.
///
/// All fields are plain `usize`/`f64`, so the struct is `Copy` and cheap to
/// pass by value; `PartialEq` is derived so snapshots can be compared in
/// tests and dedup logic.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct EmbedProgress {
    /// Chunks completed so far.
    pub done: usize,
    /// Total chunks to embed.
    pub total: usize,
    /// Chunks/s over the most recent reporting window.
    pub window_rate: f64,
    /// Chunks/s since embed phase started.
    pub overall_rate: f64,
    /// Fraction of time spent waiting for the model lock (0.0–1.0).
    pub lock_wait_pct: f64,
    /// Fraction of time spent in inference (0.0–1.0).
    pub inference_pct: f64,
}
37
/// Callback for per-batch embed progress.
///
/// Receives an [`EmbedProgress`] snapshot on every `embed_tick` call
/// (unthrottled, unlike the interval-gated text reports). Set via
/// [`Profiler::with_embed_tick`].
type EmbedTickCallback = Box<dyn Fn(&EmbedProgress) + Send + Sync>;
40
/// Pipeline profiler that prints phase timing to stderr.
///
/// Use [`Profiler::new`] to create an active or noop profiler based on
/// whether `--profile` was passed. When disabled ([`Profiler::Noop`]),
/// all methods are no-ops.
#[expect(
    clippy::large_enum_variant,
    reason = "Active is the common case; Noop is only for --profile=false"
)]
pub enum Profiler {
    /// Actively collects timing and prints to stderr (or forwards to the
    /// `on_progress` callback when one is set).
    #[expect(
        private_interfaces,
        reason = "EmbedState is intentionally opaque; callers cannot name it"
    )]
    Active {
        /// Wall-clock start of the entire run.
        start: Instant,
        /// Reporting interval for embed progress.
        interval: Duration,
        /// Mutable embed-phase state (sequential access only, Mutex for Sync).
        embed: Mutex<EmbedState>,
        /// Per-rayon-thread chunk counts (parallel access during chunk phase).
        chunk_counts: Mutex<Vec<usize>>,
        /// Optional callback for live progress updates.
        on_progress: Option<ProgressCallback>,
        /// Optional callback for per-chunk embed progress (drives progress bars).
        on_embed_tick: Option<EmbedTickCallback>,
    },
    /// No-op profiler. All methods are empty.
    Noop,
}
68
/// Mutable timing state for the embed phase.
///
/// Tracks two independent windows: the throttled-report window
/// (`last_report` / `chunks_at_last_report`) and the per-batch tick window
/// (`last_tick` / `chunks_at_last_tick`), plus cumulative lock-wait and
/// inference durations.
pub(crate) struct EmbedState {
    /// When the embed phase started.
    phase_start: Instant,
    /// Timestamp of the last throttled progress report.
    last_report: Instant,
    /// Chunks completed at the last throttled report.
    chunks_at_last_report: usize,
    /// Timestamp of the last `embed_tick` call (for per-batch window rate).
    last_tick: Instant,
    /// Chunks completed at last tick (for per-batch window rate).
    chunks_at_last_tick: usize,
    /// Cumulative time spent waiting on the model lock.
    total_lock_wait: Duration,
    /// Cumulative time spent in inference.
    total_inference: Duration,
    /// Total chunks expected in this embed phase.
    total_chunks: usize,
}

impl EmbedState {
    /// Fresh state: all counters zeroed, and every timestamp set from a
    /// single shared "now" so the first window spans from creation.
    fn new() -> Self {
        let created = Instant::now();
        Self {
            phase_start: created,
            last_report: created,
            last_tick: created,
            chunks_at_last_report: 0,
            chunks_at_last_tick: 0,
            total_chunks: 0,
            total_lock_wait: Duration::ZERO,
            total_inference: Duration::ZERO,
        }
    }
}
97
98impl Profiler {
99    /// Create a new profiler. If `enabled` is false, returns `Noop`.
100    #[must_use]
101    pub fn new(enabled: bool, interval: Duration) -> Self {
102        if enabled {
103            Self::Active {
104                start: Instant::now(),
105                interval,
106                embed: Mutex::new(EmbedState::new()),
107                chunk_counts: Mutex::new(Vec::new()),
108                on_progress: None,
109                on_embed_tick: None,
110            }
111        } else {
112            Self::Noop
113        }
114    }
115
116    /// Create an active profiler that drives a progress callback instead of
117    /// printing to stderr. The callback receives formatted status messages
118    /// at each pipeline phase transition and embed progress tick.
119    #[must_use]
120    pub fn with_callback(interval: Duration, cb: impl Fn(&str) + Send + Sync + 'static) -> Self {
121        Self::Active {
122            start: Instant::now(),
123            interval,
124            embed: Mutex::new(EmbedState::new()),
125            chunk_counts: Mutex::new(Vec::new()),
126            on_progress: Some(Box::new(cb)),
127            on_embed_tick: None,
128        }
129    }
130
131    /// Set a callback that fires on every embed chunk completion with `(done, total)`.
132    ///
133    /// Unlike the throttled `on_progress` callback, this fires for every chunk
134    /// so progress bars can update smoothly (indicatif handles display rate).
135    #[must_use]
136    pub fn with_embed_tick(mut self, cb: impl Fn(&EmbedProgress) + Send + Sync + 'static) -> Self {
137        if let Self::Active {
138            ref mut on_embed_tick,
139            ..
140        } = self
141        {
142            *on_embed_tick = Some(Box::new(cb));
143        }
144        self
145    }
146
147    /// Create a no-op profiler.
148    #[must_use]
149    pub fn noop() -> Self {
150        Self::Noop
151    }
152
153    /// Send a progress message: calls the callback if set, otherwise prints to stderr.
154    fn report(&self, msg: &str) {
155        if let Self::Active { on_progress, .. } = self {
156            if let Some(cb) = on_progress {
157                cb(msg);
158            } else {
159                eprintln!("{msg}");
160            }
161        }
162    }
163
164    /// Print the system info header line.
165    pub fn header(&self, version: &str, model_repo: &str, threads: usize, cores: usize) {
166        if let Self::Active { .. } = self {
167            self.report(&format!(
168                "[profile] ripvec {version} | {cores}-core | rayon: {threads} threads | model: {model_repo}",
169            ));
170        }
171    }
172
173    /// Start timing a named phase. Returns a guard that prints on drop.
174    #[must_use]
175    pub fn phase(&self, name: &'static str) -> PhaseGuard<'_> {
176        PhaseGuard {
177            profiler: self,
178            name,
179            start: Instant::now(),
180            detail: Cell::new(None),
181        }
182    }
183
184    /// Record that a rayon thread produced `n` chunks during the chunk phase.
185    pub fn chunk_thread_report(&self, n: usize) {
186        if let Self::Active { chunk_counts, .. } = self
187            && let Ok(mut counts) = chunk_counts.lock()
188        {
189            let idx = rayon::current_thread_index().unwrap_or(0);
190            if counts.len() <= idx {
191                counts.resize(idx + 1, 0);
192            }
193            counts[idx] += n;
194        }
195    }
196
197    /// Print the chunk phase summary with thread utilization stats.
198    pub fn chunk_summary(&self, total_chunks: usize, total_files: usize, elapsed: Duration) {
199        if let Self::Active {
200            start,
201            chunk_counts,
202            ..
203        } = self
204        {
205            let wall = start.elapsed();
206            if let Ok(counts) = chunk_counts.lock() {
207                let active = counts.iter().filter(|&&c| c > 0).count();
208                let pool_size = rayon::current_num_threads();
209                if active > 0 {
210                    let min = counts
211                        .iter()
212                        .filter(|&&c| c > 0)
213                        .min()
214                        .copied()
215                        .unwrap_or(0);
216                    let max = counts.iter().max().copied().unwrap_or(0);
217                    self.report(&format!(
218                        "[{:.1}s]  chunk: {} chunks from {} files in {:.0?} ({} threads, {} active, skew: {}-{} chunks/thread)",
219                        wall.as_secs_f64(),
220                        total_chunks,
221                        total_files,
222                        elapsed,
223                        pool_size,
224                        active,
225                        min,
226                        max,
227                    ));
228                } else {
229                    self.report(&format!(
230                        "[{:.1}s]  chunk: {} chunks from {} files in {:.0?} ({} threads)",
231                        wall.as_secs_f64(),
232                        total_chunks,
233                        total_files,
234                        elapsed,
235                        pool_size,
236                    ));
237                }
238            }
239        }
240    }
241
242    /// Begin the embed phase. Call before the embedding loop.
243    pub fn embed_begin(&self, total: usize) {
244        if let Self::Active { embed, .. } = self
245            && let Ok(mut state) = embed.lock()
246        {
247            let now = Instant::now();
248            state.phase_start = now;
249            state.last_report = now;
250            state.chunks_at_last_report = 0;
251            state.total_lock_wait = Duration::ZERO;
252            state.total_inference = Duration::ZERO;
253            state.total_chunks = total;
254        }
255    }
256
257    /// Update the total chunk count for the embed phase.
258    ///
259    /// Used by the streaming pipeline where the total isn't known at
260    /// [`embed_begin`] time. Only updates if the new total is larger
261    /// than the current one (monotonic).
262    pub fn embed_begin_update_total(&self, total: usize) {
263        if let Self::Active { embed, .. } = self
264            && let Ok(mut state) = embed.lock()
265            && total > state.total_chunks
266        {
267            state.total_chunks = total;
268        }
269    }
270
271    /// Called after each chunk is embedded. Prints periodic progress.
272    #[expect(
273        clippy::cast_precision_loss,
274        reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
275    )]
276    pub fn embed_tick(&self, done: usize) {
277        if let Self::Active {
278            start,
279            interval,
280            embed,
281            on_embed_tick,
282            ..
283        } = self
284        {
285            let Ok(mut state) = embed.lock() else {
286                return;
287            };
288            let now = Instant::now();
289            let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
290            let overall_rate = if overall_elapsed > 0.0 {
291                done as f64 / overall_elapsed
292            } else {
293                0.0
294            };
295            let total_timing = state.total_lock_wait + state.total_inference;
296            let lock_pct = if total_timing.as_nanos() > 0 {
297                state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64
298            } else {
299                0.0
300            };
301
302            // Per-batch callback: compute a fresh per-batch window rate so the
303            // sparkline gets a new data point on every batch, not every interval.
304            if let Some(cb) = on_embed_tick {
305                let tick_elapsed = now.duration_since(state.last_tick).as_secs_f64();
306                let tick_chunks = done - state.chunks_at_last_tick;
307                let batch_rate = if tick_elapsed > 0.0 {
308                    tick_chunks as f64 / tick_elapsed
309                } else {
310                    overall_rate
311                };
312                state.last_tick = now;
313                state.chunks_at_last_tick = done;
314
315                cb(&EmbedProgress {
316                    done,
317                    total: state.total_chunks,
318                    window_rate: batch_rate,
319                    overall_rate,
320                    lock_wait_pct: lock_pct,
321                    inference_pct: 1.0 - lock_pct,
322                });
323            }
324
325            // Throttled text report for --profile / spinner modes.
326            if now.duration_since(state.last_report) >= *interval {
327                let wall = start.elapsed();
328                let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
329                let report_chunks = done - state.chunks_at_last_report;
330                let window_rate = if report_elapsed > 0.0 {
331                    report_chunks as f64 / report_elapsed
332                } else {
333                    0.0
334                };
335                self.report(&format!(
336                    "[{:.1}s]  embed: {}/{} (last {:.0}s: {:.1}/s, overall: {:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
337                    wall.as_secs_f64(),
338                    done,
339                    state.total_chunks,
340                    report_elapsed,
341                    window_rate,
342                    overall_rate,
343                    lock_pct * 100.0,
344                    (1.0 - lock_pct) * 100.0,
345                ));
346                state.last_report = now;
347                state.chunks_at_last_report = done;
348            }
349        }
350    }
351
352    /// Byte-based progress for streaming mode.
353    ///
354    /// Shows `processed_bytes/total_bytes` as MB with chunk rate. The total is
355    /// known from the walk phase (file sizes), so the denominator is stable.
356    #[expect(
357        clippy::cast_precision_loss,
358        reason = "display-only: sub-1% precision loss acceptable for MB/rate"
359    )]
360    pub fn embed_tick_bytes(&self, done_chunks: usize, bytes_processed: u64, total_bytes: u64) {
361        if let Self::Active {
362            start,
363            interval,
364            embed,
365            ..
366        } = self
367        {
368            let Ok(mut state) = embed.lock() else {
369                return;
370            };
371            let now = Instant::now();
372            let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
373            let overall_rate = if overall_elapsed > 0.0 {
374                done_chunks as f64 / overall_elapsed
375            } else {
376                0.0
377            };
378
379            if now.duration_since(state.last_report) >= *interval {
380                let wall = start.elapsed();
381                let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
382                let report_chunks = done_chunks - state.chunks_at_last_report;
383                let window_rate = if report_elapsed > 0.0 {
384                    report_chunks as f64 / report_elapsed
385                } else {
386                    0.0
387                };
388                let mb_done = bytes_processed as f64 / (1024.0 * 1024.0);
389                let mb_total = total_bytes as f64 / (1024.0 * 1024.0);
390                self.report(&format!(
391                    "[{:.1}s]  embed: {:.1}/{:.1} MB (last {:.0}s: {:.1}/s, overall: {:.1}/s)",
392                    wall.as_secs_f64(),
393                    mb_done,
394                    mb_total,
395                    report_elapsed,
396                    window_rate,
397                    overall_rate,
398                ));
399                state.last_report = now;
400                state.chunks_at_last_report = done_chunks;
401            }
402        }
403    }
404
405    /// Accumulate time spent waiting for the model mutex lock.
406    pub fn embed_lock_wait(&self, duration: Duration) {
407        if let Self::Active { embed, .. } = self
408            && let Ok(mut state) = embed.lock()
409        {
410            state.total_lock_wait += duration;
411        }
412    }
413
414    /// Accumulate time spent in ONNX inference.
415    pub fn embed_inference(&self, duration: Duration) {
416        if let Self::Active { embed, .. } = self
417            && let Ok(mut state) = embed.lock()
418        {
419            state.total_inference += duration;
420        }
421    }
422
423    /// Print the final embed phase summary.
424    #[expect(
425        clippy::cast_precision_loss,
426        reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
427    )]
428    pub fn embed_done(&self) {
429        if let Self::Active { start, embed, .. } = self
430            && let Ok(state) = embed.lock()
431        {
432            let wall = start.elapsed();
433            if state.total_chunks == 0 {
434                self.report(&format!(
435                    "[{:.1}s]  embed: skipped (0 chunks)",
436                    wall.as_secs_f64()
437                ));
438                return;
439            }
440            let elapsed = Instant::now().duration_since(state.phase_start);
441            let rate = if elapsed.as_secs_f64() > 0.0 {
442                state.total_chunks as f64 / elapsed.as_secs_f64()
443            } else {
444                0.0
445            };
446            let total_timing = state.total_lock_wait + state.total_inference;
447            let lock_pct = if total_timing.as_nanos() > 0 {
448                state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64 * 100.0
449            } else {
450                0.0
451            };
452            self.report(&format!(
453                "[{:.1}s]  embed: {}/{} done in {:.1}s ({:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
454                wall.as_secs_f64(),
455                state.total_chunks,
456                state.total_chunks,
457                elapsed.as_secs_f64(),
458                rate,
459                lock_pct,
460                100.0 - lock_pct,
461            ));
462        }
463    }
464
465    /// Print the total wall-clock time.
466    pub fn finish(&self) {
467        if let Self::Active { start, .. } = self {
468            let elapsed = start.elapsed().as_secs_f64();
469            self.report(&format!("[{elapsed:.1}s]  total: {elapsed:.1}s"));
470        }
471    }
472}
473
/// Guard returned by [`Profiler::phase`]. Prints elapsed time on drop.
pub struct PhaseGuard<'a> {
    /// Profiler to report through when the guard drops.
    profiler: &'a Profiler,
    /// Static phase name included in the drop message.
    name: &'static str,
    /// When the phase began (guard creation time).
    start: Instant,
    /// Optional detail string; `Cell` lets [`PhaseGuard::set_detail`]
    /// take `&self` while the guard stays otherwise immutable.
    detail: Cell<Option<String>>,
}
481
482impl PhaseGuard<'_> {
483    /// Attach a detail string printed alongside the phase timing.
484    pub fn set_detail(&self, detail: String) {
485        self.detail.set(Some(detail));
486    }
487}
488
489impl Drop for PhaseGuard<'_> {
490    fn drop(&mut self) {
491        if let Profiler::Active { start, .. } = self.profiler {
492            let elapsed = self.start.elapsed();
493            let wall = start.elapsed();
494            let msg = if let Some(detail) = self.detail.take() {
495                format!(
496                    "[{:.3}s] {}: {} in {:.1?}",
497                    wall.as_secs_f64(),
498                    self.name,
499                    detail,
500                    elapsed,
501                )
502            } else {
503                format!(
504                    "[{:.3}s] {}: {:.1?}",
505                    wall.as_secs_f64(),
506                    self.name,
507                    elapsed,
508                )
509            };
510            self.profiler.report(&msg);
511        }
512    }
513}
514
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// `Profiler` is shared across rayon worker threads, so it must be `Sync`.
    #[test]
    fn profiler_is_sync() {
        fn assert_sync<T: Sync>() {}
        assert_sync::<Profiler>();
    }

    /// Every public method must be callable on `Noop` without panicking.
    #[test]
    fn noop_profiler_all_methods_are_safe() {
        let p = Profiler::noop();
        p.header("0.1.0", "test/repo", 4, 8);
        {
            let g = p.phase("test_phase");
            g.set_detail("some detail".to_string());
        }
        p.chunk_thread_report(10);
        p.chunk_summary(100, 50, Duration::from_millis(89));
        p.embed_begin(100);
        p.embed_tick(1);
        p.embed_lock_wait(Duration::from_millis(1));
        p.embed_inference(Duration::from_millis(10));
        p.embed_done();
        p.finish();
    }

    /// Smoke test: an active profiler's phase guard drops without panicking.
    #[test]
    fn active_profiler_phase_guard_formats_correctly() {
        let p = Profiler::new(true, Duration::from_secs(10));
        {
            let g = p.phase("test_phase");
            std::thread::sleep(Duration::from_millis(5));
            g.set_detail("42 items".to_string());
        }
        // Just verify it doesn't panic — stderr output is visual
    }

    /// With a very long interval, `embed_tick` must never emit a report.
    #[test]
    fn embed_tick_respects_interval() {
        let p = Profiler::new(true, Duration::from_secs(100)); // very long interval
        p.embed_begin(10);
        for i in 1..=10 {
            p.embed_tick(i);
            // With a 100s interval, no periodic output should fire
        }
        p.embed_done();
    }

    /// Empty chunk phase: no panic or division by zero in the rate math.
    #[test]
    fn chunk_summary_with_zero_files() {
        let p = Profiler::new(true, Duration::from_secs(10));
        p.chunk_summary(0, 0, Duration::from_millis(0));
        // Should not panic or divide by zero
    }

    /// Zero chunks must take the "skipped" path instead of dividing by zero.
    #[test]
    fn embed_done_with_zero_chunks() {
        let p = Profiler::new(true, Duration::from_secs(10));
        p.embed_begin(0);
        p.embed_done();
        // Should print "skipped" line, not divide by zero
    }
}