// ripvec_core/profile.rs
1//! Pipeline profiling instrumentation.
2//!
3//! The [`Profiler`] enum provides real-time timing output for each pipeline
4//! phase. When disabled ([`Profiler::Noop`]), all methods are no-ops with
5//! zero overhead at release optimization levels.
6
7use std::cell::Cell;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10
/// Callback for live progress updates (e.g. driving a progress bar).
///
/// Called with a formatted status string whenever the profiler would
/// normally print to stderr. Set via [`Profiler::with_callback`].
type ProgressCallback = Box<dyn Fn(&str) + Send + Sync>;
20
21/// Snapshot of embed-phase progress, passed to the tick callback.
/// Snapshot of embed-phase progress, passed to the tick callback.
///
/// `lock_wait_pct` and `inference_pct` are complementary fractions of the
/// measured timing (they sum to 1.0 when any timing has been recorded).
#[derive(Debug, Clone)]
pub struct EmbedProgress {
    /// Chunks completed so far.
    pub done: usize,
    /// Total chunks to embed.
    pub total: usize,
    /// Chunks/s over the most recent reporting window.
    pub window_rate: f64,
    /// Chunks/s since embed phase started.
    pub overall_rate: f64,
    /// Fraction of time spent waiting for the model lock (0.0–1.0).
    pub lock_wait_pct: f64,
    /// Fraction of time spent in inference (0.0–1.0).
    pub inference_pct: f64,
}

/// Callback for per-batch embed progress.
///
/// Fires on every `embed_tick` call (unthrottled), unlike the
/// interval-throttled text reporting.
type EmbedTickCallback = Box<dyn Fn(&EmbedProgress) + Send + Sync>;
40
41#[expect(
42    clippy::large_enum_variant,
43    reason = "Active is the common case; Noop is only for --profile=false"
44)]
45pub enum Profiler {
46    /// Actively collects timing and prints to stderr.
47    #[expect(
48        private_interfaces,
49        reason = "EmbedState is intentionally opaque; callers cannot name it"
50    )]
51    Active {
52        /// Wall-clock start of the entire run.
53        start: Instant,
54        /// Reporting interval for embed progress.
55        interval: Duration,
56        /// Mutable embed-phase state (sequential access only, Mutex for Sync).
57        embed: Mutex<EmbedState>,
58        /// Per-rayon-thread chunk counts (parallel access during chunk phase).
59        chunk_counts: Mutex<Vec<usize>>,
60        /// Optional callback for live progress updates.
61        on_progress: Option<ProgressCallback>,
62        /// Optional callback for per-chunk embed progress (drives progress bars).
63        on_embed_tick: Option<EmbedTickCallback>,
64    },
65    /// No-op profiler. All methods are empty.
66    Noop,
67}
68
/// Mutable timing state for the embed phase, kept behind the `Mutex` in
/// [`Profiler::Active`].
pub(crate) struct EmbedState {
    // Instant the embed phase began (reset by `embed_begin`).
    phase_start: Instant,
    // Timestamp of the last throttled text report.
    last_report: Instant,
    // Chunks completed at the last throttled report (window-rate baseline).
    chunks_at_last_report: usize,
    /// Timestamp of the last `embed_tick` call (for per-batch window rate).
    last_tick: Instant,
    /// Chunks completed at last tick (for per-batch window rate).
    chunks_at_last_tick: usize,
    // Accumulated time spent waiting for the model lock.
    total_lock_wait: Duration,
    // Accumulated time spent in inference.
    total_inference: Duration,
    // Total chunks expected; denominator in progress output.
    total_chunks: usize,
}
81
82impl EmbedState {
83    fn new() -> Self {
84        let now = Instant::now();
85        Self {
86            phase_start: now,
87            last_report: now,
88            chunks_at_last_report: 0,
89            last_tick: now,
90            chunks_at_last_tick: 0,
91            total_lock_wait: Duration::ZERO,
92            total_inference: Duration::ZERO,
93            total_chunks: 0,
94        }
95    }
96}
97
98impl Profiler {
99    /// Create a new profiler. If `enabled` is false, returns `Noop`.
100    #[must_use]
101    pub fn new(enabled: bool, interval: Duration) -> Self {
102        if enabled {
103            Self::Active {
104                start: Instant::now(),
105                interval,
106                embed: Mutex::new(EmbedState::new()),
107                chunk_counts: Mutex::new(Vec::new()),
108                on_progress: None,
109                on_embed_tick: None,
110            }
111        } else {
112            Self::Noop
113        }
114    }
115
116    /// Create an active profiler that drives a progress callback instead of
117    /// printing to stderr. The callback receives formatted status messages
118    /// at each pipeline phase transition and embed progress tick.
119    #[must_use]
120    pub fn with_callback(interval: Duration, cb: impl Fn(&str) + Send + Sync + 'static) -> Self {
121        Self::Active {
122            start: Instant::now(),
123            interval,
124            embed: Mutex::new(EmbedState::new()),
125            chunk_counts: Mutex::new(Vec::new()),
126            on_progress: Some(Box::new(cb)),
127            on_embed_tick: None,
128        }
129    }
130
131    /// Set a callback that fires on every embed chunk completion with `(done, total)`.
132    ///
133    /// Unlike the throttled `on_progress` callback, this fires for every chunk
134    /// so progress bars can update smoothly (indicatif handles display rate).
135    #[must_use]
136    pub fn with_embed_tick(mut self, cb: impl Fn(&EmbedProgress) + Send + Sync + 'static) -> Self {
137        if let Self::Active {
138            ref mut on_embed_tick,
139            ..
140        } = self
141        {
142            *on_embed_tick = Some(Box::new(cb));
143        }
144        self
145    }
146
147    /// Create a no-op profiler.
148    #[must_use]
149    pub fn noop() -> Self {
150        Self::Noop
151    }
152
153    /// Send a progress message: calls the callback if set, otherwise prints to stderr.
154    fn report(&self, msg: &str) {
155        if let Self::Active { on_progress, .. } = self {
156            if let Some(cb) = on_progress {
157                cb(msg);
158            } else {
159                eprintln!("{msg}");
160            }
161        }
162    }
163
164    /// Print the system info header line.
165    pub fn header(&self, version: &str, model_repo: &str, threads: usize, cores: usize) {
166        if let Self::Active { .. } = self {
167            self.report(&format!(
168                "[profile] ripvec {version} | {cores}-core | rayon: {threads} threads | model: {model_repo}",
169            ));
170        }
171    }
172
173    /// Start timing a named phase. Returns a guard that prints on drop.
174    #[must_use]
175    pub fn phase(&self, name: &'static str) -> PhaseGuard<'_> {
176        PhaseGuard {
177            profiler: self,
178            name,
179            start: Instant::now(),
180            detail: Cell::new(None),
181        }
182    }
183
184    /// Record that a rayon thread produced `n` chunks during the chunk phase.
185    pub fn chunk_thread_report(&self, n: usize) {
186        if let Self::Active { chunk_counts, .. } = self
187            && let Ok(mut counts) = chunk_counts.lock()
188        {
189            let idx = rayon::current_thread_index().unwrap_or(0);
190            if counts.len() <= idx {
191                counts.resize(idx + 1, 0);
192            }
193            counts[idx] += n;
194        }
195    }
196
197    /// Print the chunk phase summary with thread utilization stats.
198    pub fn chunk_summary(&self, total_chunks: usize, total_files: usize, elapsed: Duration) {
199        if let Self::Active {
200            start,
201            chunk_counts,
202            ..
203        } = self
204        {
205            let wall = start.elapsed();
206            if let Ok(counts) = chunk_counts.lock() {
207                let active = counts.iter().filter(|&&c| c > 0).count();
208                let pool_size = rayon::current_num_threads();
209                if active > 0 {
210                    let min = counts
211                        .iter()
212                        .filter(|&&c| c > 0)
213                        .min()
214                        .copied()
215                        .unwrap_or(0);
216                    let max = counts.iter().max().copied().unwrap_or(0);
217                    self.report(&format!(
218                        "[{:.1}s]  chunk: {} chunks from {} files in {:.0?} ({} threads, {} active, skew: {}-{} chunks/thread)",
219                        wall.as_secs_f64(),
220                        total_chunks,
221                        total_files,
222                        elapsed,
223                        pool_size,
224                        active,
225                        min,
226                        max,
227                    ));
228                } else {
229                    self.report(&format!(
230                        "[{:.1}s]  chunk: {} chunks from {} files in {:.0?} ({} threads)",
231                        wall.as_secs_f64(),
232                        total_chunks,
233                        total_files,
234                        elapsed,
235                        pool_size,
236                    ));
237                }
238            }
239        }
240    }
241
242    /// Begin the embed phase. Call before the embedding loop.
243    pub fn embed_begin(&self, total: usize) {
244        if let Self::Active { embed, .. } = self
245            && let Ok(mut state) = embed.lock()
246        {
247            let now = Instant::now();
248            state.phase_start = now;
249            state.last_report = now;
250            state.chunks_at_last_report = 0;
251            state.total_lock_wait = Duration::ZERO;
252            state.total_inference = Duration::ZERO;
253            state.total_chunks = total;
254        }
255    }
256
257    /// Update the total chunk count for the embed phase.
258    ///
259    /// Used by the streaming pipeline where the total isn't known at
260    /// [`embed_begin`] time. Only updates if the new total is larger
261    /// than the current one (monotonic).
262    pub fn embed_begin_update_total(&self, total: usize) {
263        if let Self::Active { embed, .. } = self
264            && let Ok(mut state) = embed.lock()
265            && total > state.total_chunks
266        {
267            state.total_chunks = total;
268        }
269    }
270
271    /// Called after each chunk is embedded. Prints periodic progress.
272    #[expect(
273        clippy::cast_precision_loss,
274        reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
275    )]
276    pub fn embed_tick(&self, done: usize) {
277        if let Self::Active {
278            start,
279            interval,
280            embed,
281            on_embed_tick,
282            ..
283        } = self
284        {
285            let Ok(mut state) = embed.lock() else {
286                return;
287            };
288            let now = Instant::now();
289            let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
290            let overall_rate = if overall_elapsed > 0.0 {
291                done as f64 / overall_elapsed
292            } else {
293                0.0
294            };
295            let total_timing = state.total_lock_wait + state.total_inference;
296            let lock_pct = if total_timing.as_nanos() > 0 {
297                state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64
298            } else {
299                0.0
300            };
301
302            // Per-batch callback: compute a fresh per-batch window rate so the
303            // sparkline gets a new data point on every batch, not every interval.
304            if let Some(cb) = on_embed_tick {
305                let tick_elapsed = now.duration_since(state.last_tick).as_secs_f64();
306                let tick_chunks = done - state.chunks_at_last_tick;
307                let batch_rate = if tick_elapsed > 0.0 {
308                    tick_chunks as f64 / tick_elapsed
309                } else {
310                    overall_rate
311                };
312                state.last_tick = now;
313                state.chunks_at_last_tick = done;
314
315                cb(&EmbedProgress {
316                    done,
317                    total: state.total_chunks,
318                    window_rate: batch_rate,
319                    overall_rate,
320                    lock_wait_pct: lock_pct,
321                    inference_pct: 1.0 - lock_pct,
322                });
323            }
324
325            // Throttled text report for --profile / spinner modes.
326            if now.duration_since(state.last_report) >= *interval {
327                let wall = start.elapsed();
328                let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
329                let report_chunks = done - state.chunks_at_last_report;
330                let window_rate = if report_elapsed > 0.0 {
331                    report_chunks as f64 / report_elapsed
332                } else {
333                    0.0
334                };
335                self.report(&format!(
336                    "[{:.1}s]  embed: {}/{} (last {:.0}s: {:.1}/s, overall: {:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
337                    wall.as_secs_f64(),
338                    done,
339                    state.total_chunks,
340                    report_elapsed,
341                    window_rate,
342                    overall_rate,
343                    lock_pct * 100.0,
344                    (1.0 - lock_pct) * 100.0,
345                ));
346                state.last_report = now;
347                state.chunks_at_last_report = done;
348            }
349        }
350    }
351
352    /// Byte-based progress for streaming mode.
353    ///
354    /// Shows `processed_bytes/total_bytes` as MB with chunk rate. The total is
355    /// known from the walk phase (file sizes), so the denominator is stable.
356    #[expect(
357        clippy::cast_precision_loss,
358        reason = "display-only: sub-1% precision loss acceptable for MB/rate"
359    )]
360    pub fn embed_tick_bytes(
361        &self,
362        done_chunks: usize,
363        bytes_processed: u64,
364        total_bytes: u64,
365    ) {
366        if let Self::Active {
367            start,
368            interval,
369            embed,
370            ..
371        } = self
372        {
373            let Ok(mut state) = embed.lock() else {
374                return;
375            };
376            let now = Instant::now();
377            let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
378            let overall_rate = if overall_elapsed > 0.0 {
379                done_chunks as f64 / overall_elapsed
380            } else {
381                0.0
382            };
383
384            if now.duration_since(state.last_report) >= *interval {
385                let wall = start.elapsed();
386                let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
387                let report_chunks = done_chunks - state.chunks_at_last_report;
388                let window_rate = if report_elapsed > 0.0 {
389                    report_chunks as f64 / report_elapsed
390                } else {
391                    0.0
392                };
393                let mb_done = bytes_processed as f64 / (1024.0 * 1024.0);
394                let mb_total = total_bytes as f64 / (1024.0 * 1024.0);
395                self.report(&format!(
396                    "[{:.1}s]  embed: {:.1}/{:.1} MB (last {:.0}s: {:.1}/s, overall: {:.1}/s)",
397                    wall.as_secs_f64(),
398                    mb_done,
399                    mb_total,
400                    report_elapsed,
401                    window_rate,
402                    overall_rate,
403                ));
404                state.last_report = now;
405                state.chunks_at_last_report = done_chunks;
406            }
407        }
408    }
409
410    /// Accumulate time spent waiting for the model mutex lock.
411    pub fn embed_lock_wait(&self, duration: Duration) {
412        if let Self::Active { embed, .. } = self
413            && let Ok(mut state) = embed.lock()
414        {
415            state.total_lock_wait += duration;
416        }
417    }
418
419    /// Accumulate time spent in ONNX inference.
420    pub fn embed_inference(&self, duration: Duration) {
421        if let Self::Active { embed, .. } = self
422            && let Ok(mut state) = embed.lock()
423        {
424            state.total_inference += duration;
425        }
426    }
427
428    /// Print the final embed phase summary.
429    #[expect(
430        clippy::cast_precision_loss,
431        reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
432    )]
433    pub fn embed_done(&self) {
434        if let Self::Active { start, embed, .. } = self
435            && let Ok(state) = embed.lock()
436        {
437            let wall = start.elapsed();
438            if state.total_chunks == 0 {
439                self.report(&format!(
440                    "[{:.1}s]  embed: skipped (0 chunks)",
441                    wall.as_secs_f64()
442                ));
443                return;
444            }
445            let elapsed = Instant::now().duration_since(state.phase_start);
446            let rate = if elapsed.as_secs_f64() > 0.0 {
447                state.total_chunks as f64 / elapsed.as_secs_f64()
448            } else {
449                0.0
450            };
451            let total_timing = state.total_lock_wait + state.total_inference;
452            let lock_pct = if total_timing.as_nanos() > 0 {
453                state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64 * 100.0
454            } else {
455                0.0
456            };
457            self.report(&format!(
458                "[{:.1}s]  embed: {}/{} done in {:.1}s ({:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
459                wall.as_secs_f64(),
460                state.total_chunks,
461                state.total_chunks,
462                elapsed.as_secs_f64(),
463                rate,
464                lock_pct,
465                100.0 - lock_pct,
466            ));
467        }
468    }
469
470    /// Print the total wall-clock time.
471    pub fn finish(&self) {
472        if let Self::Active { start, .. } = self {
473            let elapsed = start.elapsed().as_secs_f64();
474            self.report(&format!("[{elapsed:.1}s]  total: {elapsed:.1}s"));
475        }
476    }
477}
478
479/// Guard returned by [`Profiler::phase`]. Prints elapsed time on drop.
480pub struct PhaseGuard<'a> {
481    profiler: &'a Profiler,
482    name: &'static str,
483    start: Instant,
484    detail: Cell<Option<String>>,
485}
486
487impl PhaseGuard<'_> {
488    /// Attach a detail string printed alongside the phase timing.
489    pub fn set_detail(&self, detail: String) {
490        self.detail.set(Some(detail));
491    }
492}
493
494impl Drop for PhaseGuard<'_> {
495    fn drop(&mut self) {
496        if let Profiler::Active { start, .. } = self.profiler {
497            let elapsed = self.start.elapsed();
498            let wall = start.elapsed();
499            let msg = if let Some(detail) = self.detail.take() {
500                format!(
501                    "[{:.3}s] {}: {} in {:.1?}",
502                    wall.as_secs_f64(),
503                    self.name,
504                    detail,
505                    elapsed,
506                )
507            } else {
508                format!(
509                    "[{:.3}s] {}: {:.1?}",
510                    wall.as_secs_f64(),
511                    self.name,
512                    elapsed,
513                )
514            };
515            self.profiler.report(&msg);
516        }
517    }
518}
519
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[test]
    fn profiler_is_sync() {
        // Compile-time check: Profiler must be shareable across threads.
        fn require_sync<T: Sync>() {}
        require_sync::<Profiler>();
    }

    #[test]
    fn noop_profiler_all_methods_are_safe() {
        let profiler = Profiler::noop();
        profiler.header("0.1.0", "test/repo", 4, 8);
        {
            let guard = profiler.phase("test_phase");
            guard.set_detail("some detail".to_string());
        }
        profiler.chunk_thread_report(10);
        profiler.chunk_summary(100, 50, Duration::from_millis(89));
        profiler.embed_begin(100);
        profiler.embed_tick(1);
        profiler.embed_lock_wait(Duration::from_millis(1));
        profiler.embed_inference(Duration::from_millis(10));
        profiler.embed_done();
        profiler.finish();
    }

    #[test]
    fn active_profiler_phase_guard_formats_correctly() {
        let profiler = Profiler::new(true, Duration::from_secs(10));
        {
            let guard = profiler.phase("test_phase");
            std::thread::sleep(Duration::from_millis(5));
            guard.set_detail("42 items".to_string());
        }
        // Just verify it doesn't panic — stderr output is visual
    }

    #[test]
    fn embed_tick_respects_interval() {
        // With a 100s interval, no periodic output should fire.
        let profiler = Profiler::new(true, Duration::from_secs(100));
        profiler.embed_begin(10);
        (1..=10).for_each(|done| profiler.embed_tick(done));
        profiler.embed_done();
    }

    #[test]
    fn chunk_summary_with_zero_files() {
        // Should not panic or divide by zero
        let profiler = Profiler::new(true, Duration::from_secs(10));
        profiler.chunk_summary(0, 0, Duration::from_millis(0));
    }

    #[test]
    fn embed_done_with_zero_chunks() {
        // Should print "skipped" line, not divide by zero
        let profiler = Profiler::new(true, Duration::from_secs(10));
        profiler.embed_begin(0);
        profiler.embed_done();
    }
}
585}