Skip to main content

ripvec_core/
profile.rs

1//! Pipeline profiling instrumentation.
2//!
//! The [`Profiler`] enum provides real-time timing output for each pipeline
//! phase. When disabled ([`Profiler::Noop`]), no timing is collected and
//! nothing is reported, so the cost at release optimization levels is negligible.
6
7use std::cell::Cell;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10
11/// Pipeline profiler that prints phase timing to stderr.
12///
13/// Use [`Profiler::new`] to create an active or noop profiler based on
14/// whether `--profile` was passed.
15/// Callback for live progress updates (e.g. driving a progress bar).
16///
17/// Called with a formatted status string whenever the profiler would
18/// normally print to stderr. Set via [`Profiler::with_callback`].
19type ProgressCallback = Box<dyn Fn(&str) + Send + Sync>;
20
21/// Snapshot of embed-phase progress, passed to the tick callback.
22#[derive(Debug, Clone)]
23pub struct EmbedProgress {
24    /// Chunks completed so far.
25    pub done: usize,
26    /// Total chunks to embed.
27    pub total: usize,
28    /// Chunks/s over the most recent reporting window.
29    pub window_rate: f64,
30    /// Chunks/s since embed phase started.
31    pub overall_rate: f64,
32    /// Fraction of time spent waiting for the model lock (0.0–1.0).
33    pub lock_wait_pct: f64,
34    /// Fraction of time spent in inference (0.0–1.0).
35    pub inference_pct: f64,
36}
37
38/// Callback for per-batch embed progress.
39type EmbedTickCallback = Box<dyn Fn(&EmbedProgress) + Send + Sync>;
40
41#[expect(
42    clippy::large_enum_variant,
43    reason = "Active is the common case; Noop is only for --profile=false"
44)]
45pub enum Profiler {
46    /// Actively collects timing and prints to stderr.
47    #[expect(
48        private_interfaces,
49        reason = "EmbedState is intentionally opaque; callers cannot name it"
50    )]
51    Active {
52        /// Wall-clock start of the entire run.
53        start: Instant,
54        /// Reporting interval for embed progress.
55        interval: Duration,
56        /// Mutable embed-phase state (sequential access only, Mutex for Sync).
57        embed: Mutex<EmbedState>,
58        /// Per-rayon-thread chunk counts (parallel access during chunk phase).
59        chunk_counts: Mutex<Vec<usize>>,
60        /// Optional callback for live progress updates.
61        on_progress: Option<ProgressCallback>,
62        /// Optional callback for per-chunk embed progress (drives progress bars).
63        on_embed_tick: Option<EmbedTickCallback>,
64    },
65    /// No-op profiler. All methods are empty.
66    Noop,
67}
68
/// Mutable bookkeeping for the embed phase, stored behind the `embed` mutex
/// of [`Profiler::Active`].
pub(crate) struct EmbedState {
    /// When the current embed phase started.
    phase_start: Instant,
    /// When the last throttled text report was emitted.
    last_report: Instant,
    /// Chunks completed as of `last_report`.
    chunks_at_last_report: usize,
    /// Timestamp of the last `embed_tick` call (for per-batch window rate).
    last_tick: Instant,
    /// Chunks completed at last tick (for per-batch window rate).
    chunks_at_last_tick: usize,
    /// Accumulated time spent waiting for the model lock.
    total_lock_wait: Duration,
    /// Accumulated time spent in inference.
    total_inference: Duration,
    /// Total number of chunks expected in this phase.
    total_chunks: usize,
}

impl EmbedState {
    /// Fresh state: every timestamp is "now" and every counter is zero.
    fn new() -> Self {
        let t0 = Instant::now();
        Self {
            total_chunks: 0,
            total_inference: Duration::ZERO,
            total_lock_wait: Duration::ZERO,
            chunks_at_last_tick: 0,
            chunks_at_last_report: 0,
            phase_start: t0,
            last_report: t0,
            last_tick: t0,
        }
    }
}
97
98impl Profiler {
99    /// Create a new profiler. If `enabled` is false, returns `Noop`.
100    #[must_use]
101    pub fn new(enabled: bool, interval: Duration) -> Self {
102        if enabled {
103            Self::Active {
104                start: Instant::now(),
105                interval,
106                embed: Mutex::new(EmbedState::new()),
107                chunk_counts: Mutex::new(Vec::new()),
108                on_progress: None,
109                on_embed_tick: None,
110            }
111        } else {
112            Self::Noop
113        }
114    }
115
116    /// Create an active profiler that drives a progress callback instead of
117    /// printing to stderr. The callback receives formatted status messages
118    /// at each pipeline phase transition and embed progress tick.
119    #[must_use]
120    pub fn with_callback(interval: Duration, cb: impl Fn(&str) + Send + Sync + 'static) -> Self {
121        Self::Active {
122            start: Instant::now(),
123            interval,
124            embed: Mutex::new(EmbedState::new()),
125            chunk_counts: Mutex::new(Vec::new()),
126            on_progress: Some(Box::new(cb)),
127            on_embed_tick: None,
128        }
129    }
130
131    /// Set a callback that fires on every embed chunk completion with `(done, total)`.
132    ///
133    /// Unlike the throttled `on_progress` callback, this fires for every chunk
134    /// so progress bars can update smoothly (indicatif handles display rate).
135    #[must_use]
136    pub fn with_embed_tick(mut self, cb: impl Fn(&EmbedProgress) + Send + Sync + 'static) -> Self {
137        if let Self::Active {
138            ref mut on_embed_tick,
139            ..
140        } = self
141        {
142            *on_embed_tick = Some(Box::new(cb));
143        }
144        self
145    }
146
147    /// Create a no-op profiler.
148    #[must_use]
149    pub fn noop() -> Self {
150        Self::Noop
151    }
152
153    /// Send a progress message: calls the callback if set, otherwise prints to stderr.
154    fn report(&self, msg: &str) {
155        if let Self::Active { on_progress, .. } = self {
156            if let Some(cb) = on_progress {
157                cb(msg);
158            } else {
159                eprintln!("{msg}");
160            }
161        }
162    }
163
164    /// Print the system info header line.
165    pub fn header(&self, version: &str, model_repo: &str, threads: usize, cores: usize) {
166        if let Self::Active { .. } = self {
167            self.report(&format!(
168                "[profile] ripvec {version} | {cores}-core | rayon: {threads} threads | model: {model_repo}",
169            ));
170        }
171    }
172
173    /// Start timing a named phase. Returns a guard that prints on drop.
174    #[must_use]
175    pub fn phase(&self, name: &'static str) -> PhaseGuard<'_> {
176        if let Self::Active { start, .. } = self {
177            self.report(&format!(
178                "[{:.3}s] {name}: starting",
179                start.elapsed().as_secs_f64(),
180            ));
181        }
182        PhaseGuard {
183            profiler: self,
184            name,
185            start: Instant::now(),
186            detail: Cell::new(None),
187        }
188    }
189
190    /// Record that a rayon thread produced `n` chunks during the chunk phase.
191    pub fn chunk_thread_report(&self, n: usize) {
192        if let Self::Active { chunk_counts, .. } = self
193            && let Ok(mut counts) = chunk_counts.lock()
194        {
195            let idx = rayon::current_thread_index().unwrap_or(0);
196            if counts.len() <= idx {
197                counts.resize(idx + 1, 0);
198            }
199            counts[idx] += n;
200        }
201    }
202
203    /// Print the chunk phase summary with thread utilization stats.
204    pub fn chunk_summary(&self, total_chunks: usize, total_files: usize, elapsed: Duration) {
205        if let Self::Active {
206            start,
207            chunk_counts,
208            ..
209        } = self
210        {
211            let wall = start.elapsed();
212            if let Ok(counts) = chunk_counts.lock() {
213                let active = counts.iter().filter(|&&c| c > 0).count();
214                let pool_size = rayon::current_num_threads();
215                if active > 0 {
216                    let min = counts
217                        .iter()
218                        .filter(|&&c| c > 0)
219                        .min()
220                        .copied()
221                        .unwrap_or(0);
222                    let max = counts.iter().max().copied().unwrap_or(0);
223                    self.report(&format!(
224                        "[{:.1}s]  chunk: {} chunks from {} files in {:.0?} ({} threads, {} active, skew: {}-{} chunks/thread)",
225                        wall.as_secs_f64(),
226                        total_chunks,
227                        total_files,
228                        elapsed,
229                        pool_size,
230                        active,
231                        min,
232                        max,
233                    ));
234                } else {
235                    self.report(&format!(
236                        "[{:.1}s]  chunk: {} chunks from {} files in {:.0?} ({} threads)",
237                        wall.as_secs_f64(),
238                        total_chunks,
239                        total_files,
240                        elapsed,
241                        pool_size,
242                    ));
243                }
244            }
245        }
246    }
247
248    /// Begin the embed phase. Call before the embedding loop.
249    pub fn embed_begin(&self, total: usize) {
250        if let Self::Active { embed, .. } = self
251            && let Ok(mut state) = embed.lock()
252        {
253            let now = Instant::now();
254            state.phase_start = now;
255            state.last_report = now;
256            state.chunks_at_last_report = 0;
257            state.total_lock_wait = Duration::ZERO;
258            state.total_inference = Duration::ZERO;
259            state.total_chunks = total;
260        }
261    }
262
263    /// Update the total chunk count for the embed phase.
264    ///
265    /// Used by the streaming pipeline where the total isn't known at
266    /// [`Self::embed_begin`] time. Only updates if the new total is larger
267    /// than the current one (monotonic).
268    pub fn embed_begin_update_total(&self, total: usize) {
269        if let Self::Active { embed, .. } = self
270            && let Ok(mut state) = embed.lock()
271            && total > state.total_chunks
272        {
273            state.total_chunks = total;
274        }
275    }
276
277    /// Called after each chunk is embedded. Prints periodic progress.
278    #[expect(
279        clippy::cast_precision_loss,
280        reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
281    )]
282    pub fn embed_tick(&self, done: usize) {
283        if let Self::Active {
284            start,
285            interval,
286            embed,
287            on_embed_tick,
288            ..
289        } = self
290        {
291            let Ok(mut state) = embed.lock() else {
292                return;
293            };
294            let now = Instant::now();
295            let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
296            let overall_rate = if overall_elapsed > 0.0 {
297                done as f64 / overall_elapsed
298            } else {
299                0.0
300            };
301            let total_timing = state.total_lock_wait + state.total_inference;
302            let lock_pct = if total_timing.as_nanos() > 0 {
303                state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64
304            } else {
305                0.0
306            };
307
308            // Per-batch callback: compute a fresh per-batch window rate so the
309            // sparkline gets a new data point on every batch, not every interval.
310            if let Some(cb) = on_embed_tick {
311                let tick_elapsed = now.duration_since(state.last_tick).as_secs_f64();
312                let tick_chunks = done - state.chunks_at_last_tick;
313                let batch_rate = if tick_elapsed > 0.0 {
314                    tick_chunks as f64 / tick_elapsed
315                } else {
316                    overall_rate
317                };
318                state.last_tick = now;
319                state.chunks_at_last_tick = done;
320
321                cb(&EmbedProgress {
322                    done,
323                    total: state.total_chunks,
324                    window_rate: batch_rate,
325                    overall_rate,
326                    lock_wait_pct: lock_pct,
327                    inference_pct: 1.0 - lock_pct,
328                });
329            }
330
331            // Throttled text report for --profile / spinner modes.
332            if now.duration_since(state.last_report) >= *interval {
333                let wall = start.elapsed();
334                let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
335                let report_chunks = done - state.chunks_at_last_report;
336                let window_rate = if report_elapsed > 0.0 {
337                    report_chunks as f64 / report_elapsed
338                } else {
339                    0.0
340                };
341                self.report(&format!(
342                    "[{:.1}s]  embed: {}/{} (last {:.0}s: {:.1}/s, overall: {:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
343                    wall.as_secs_f64(),
344                    done,
345                    state.total_chunks,
346                    report_elapsed,
347                    window_rate,
348                    overall_rate,
349                    lock_pct * 100.0,
350                    (1.0 - lock_pct) * 100.0,
351                ));
352                state.last_report = now;
353                state.chunks_at_last_report = done;
354            }
355        }
356    }
357
358    /// Byte-based progress for streaming mode.
359    ///
360    /// Shows `processed_bytes/total_bytes` as MB with chunk rate. The total is
361    /// known from the walk phase (file sizes), so the denominator is stable.
362    #[expect(
363        clippy::cast_precision_loss,
364        reason = "display-only: sub-1% precision loss acceptable for MB/rate"
365    )]
366    pub fn embed_tick_bytes(&self, done_chunks: usize, bytes_processed: u64, total_bytes: u64) {
367        if let Self::Active {
368            start,
369            interval,
370            embed,
371            ..
372        } = self
373        {
374            let Ok(mut state) = embed.lock() else {
375                return;
376            };
377            let now = Instant::now();
378            let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
379            let overall_rate = if overall_elapsed > 0.0 {
380                done_chunks as f64 / overall_elapsed
381            } else {
382                0.0
383            };
384
385            if now.duration_since(state.last_report) >= *interval {
386                let wall = start.elapsed();
387                let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
388                let report_chunks = done_chunks - state.chunks_at_last_report;
389                let window_rate = if report_elapsed > 0.0 {
390                    report_chunks as f64 / report_elapsed
391                } else {
392                    0.0
393                };
394                let mb_done = bytes_processed as f64 / (1024.0 * 1024.0);
395                let mb_total = total_bytes as f64 / (1024.0 * 1024.0);
396                self.report(&format!(
397                    "[{:.1}s]  embed: {:.1}/{:.1} MB (last {:.0}s: {:.1}/s, overall: {:.1}/s)",
398                    wall.as_secs_f64(),
399                    mb_done,
400                    mb_total,
401                    report_elapsed,
402                    window_rate,
403                    overall_rate,
404                ));
405                state.last_report = now;
406                state.chunks_at_last_report = done_chunks;
407            }
408        }
409    }
410
411    /// Accumulate time spent waiting for the model mutex lock.
412    pub fn embed_lock_wait(&self, duration: Duration) {
413        if let Self::Active { embed, .. } = self
414            && let Ok(mut state) = embed.lock()
415        {
416            state.total_lock_wait += duration;
417        }
418    }
419
420    /// Accumulate time spent in ONNX inference.
421    pub fn embed_inference(&self, duration: Duration) {
422        if let Self::Active { embed, .. } = self
423            && let Ok(mut state) = embed.lock()
424        {
425            state.total_inference += duration;
426        }
427    }
428
429    /// Print the final embed phase summary.
430    #[expect(
431        clippy::cast_precision_loss,
432        reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
433    )]
434    pub fn embed_done(&self) {
435        if let Self::Active { start, embed, .. } = self
436            && let Ok(state) = embed.lock()
437        {
438            let wall = start.elapsed();
439            if state.total_chunks == 0 {
440                self.report(&format!(
441                    "[{:.1}s]  embed: skipped (0 chunks)",
442                    wall.as_secs_f64()
443                ));
444                return;
445            }
446            let elapsed = Instant::now().duration_since(state.phase_start);
447            let rate = if elapsed.as_secs_f64() > 0.0 {
448                state.total_chunks as f64 / elapsed.as_secs_f64()
449            } else {
450                0.0
451            };
452            let total_timing = state.total_lock_wait + state.total_inference;
453            let lock_pct = if total_timing.as_nanos() > 0 {
454                state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64 * 100.0
455            } else {
456                0.0
457            };
458            self.report(&format!(
459                "[{:.1}s]  embed: {}/{} done in {:.1}s ({:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
460                wall.as_secs_f64(),
461                state.total_chunks,
462                state.total_chunks,
463                elapsed.as_secs_f64(),
464                rate,
465                lock_pct,
466                100.0 - lock_pct,
467            ));
468        }
469    }
470
471    /// Print the total wall-clock time.
472    pub fn finish(&self) {
473        if let Self::Active { start, .. } = self {
474            let elapsed = start.elapsed().as_secs_f64();
475            self.report(&format!("[{elapsed:.1}s]  total: {elapsed:.1}s"));
476        }
477    }
478}
479
480/// Guard returned by [`Profiler::phase`]. Prints elapsed time on drop.
481pub struct PhaseGuard<'a> {
482    profiler: &'a Profiler,
483    name: &'static str,
484    start: Instant,
485    detail: Cell<Option<String>>,
486}
487
488impl PhaseGuard<'_> {
489    /// Attach a detail string printed alongside the phase timing.
490    pub fn set_detail(&self, detail: String) {
491        self.detail.set(Some(detail));
492    }
493}
494
495impl Drop for PhaseGuard<'_> {
496    fn drop(&mut self) {
497        if let Profiler::Active { start, .. } = self.profiler {
498            let elapsed = self.start.elapsed();
499            let wall = start.elapsed();
500            let msg = if let Some(detail) = self.detail.take() {
501                format!(
502                    "[{:.3}s] {}: {} in {:.1?}",
503                    wall.as_secs_f64(),
504                    self.name,
505                    detail,
506                    elapsed,
507                )
508            } else {
509                format!(
510                    "[{:.3}s] {}: {:.1?}",
511                    wall.as_secs_f64(),
512                    self.name,
513                    elapsed,
514                )
515            };
516            self.profiler.report(&msg);
517        }
518    }
519}
520
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Build an active profiler whose report lines are captured in memory
    /// instead of going to stderr, so tests can assert on the exact text.
    fn capturing_profiler(interval: Duration) -> (Profiler, Arc<Mutex<Vec<String>>>) {
        let log = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&log);
        let profiler = Profiler::with_callback(interval, move |msg| {
            sink.lock().expect("log mutex poisoned").push(msg.to_owned());
        });
        (profiler, log)
    }

    #[test]
    fn profiler_is_sync() {
        fn assert_sync<T: Sync>() {}
        assert_sync::<Profiler>();
    }

    #[test]
    fn noop_profiler_all_methods_are_safe() {
        let p = Profiler::noop();
        p.header("0.1.0", "test/repo", 4, 8);
        {
            let g = p.phase("test_phase");
            g.set_detail("some detail".to_string());
        }
        p.chunk_thread_report(10);
        p.chunk_summary(100, 50, Duration::from_millis(89));
        p.embed_begin(100);
        p.embed_tick(1);
        p.embed_tick_bytes(1, 1024, 2048);
        p.embed_lock_wait(Duration::from_millis(1));
        p.embed_inference(Duration::from_millis(10));
        p.embed_done();
        p.finish();
    }

    #[test]
    fn active_profiler_phase_guard_formats_correctly() {
        let (p, log) = capturing_profiler(Duration::from_secs(10));
        {
            let g = p.phase("test_phase");
            g.set_detail("42 items".to_string());
        }
        let msgs = log.lock().expect("log mutex poisoned");
        assert_eq!(msgs.len(), 2);
        assert!(msgs[0].ends_with("test_phase: starting"));
        assert!(msgs[1].contains("test_phase: 42 items in "));
    }

    #[test]
    fn embed_tick_respects_interval() {
        let p = Profiler::new(true, Duration::from_secs(100)); // very long interval
        p.embed_begin(10);
        for i in 1..=10 {
            p.embed_tick(i);
            // With a 100s interval, no periodic output should fire
        }
        p.embed_done();
    }

    #[test]
    fn embed_tick_callback_fires_every_tick() {
        let seen = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&seen);
        let p = Profiler::new(true, Duration::from_secs(100)).with_embed_tick(move |prog| {
            sink.lock()
                .expect("tick mutex poisoned")
                .push((prog.done, prog.total));
        });
        p.embed_begin(3);
        for i in 1..=3 {
            p.embed_tick(i);
        }
        assert_eq!(
            *seen.lock().expect("tick mutex poisoned"),
            vec![(1, 3), (2, 3), (3, 3)]
        );
    }

    #[test]
    fn noop_ignores_embed_tick_callback() {
        let fired = Arc::new(Mutex::new(false));
        let flag = Arc::clone(&fired);
        let p = Profiler::noop().with_embed_tick(move |_| {
            *flag.lock().expect("flag mutex poisoned") = true;
        });
        p.embed_begin(1);
        p.embed_tick(1);
        assert!(!*fired.lock().expect("flag mutex poisoned"));
    }

    #[test]
    fn chunk_summary_with_zero_files() {
        let p = Profiler::new(true, Duration::from_secs(10));
        p.chunk_summary(0, 0, Duration::from_millis(0));
        // Should not panic or divide by zero
    }

    #[test]
    fn embed_done_with_zero_chunks() {
        let (p, log) = capturing_profiler(Duration::from_secs(10));
        p.embed_begin(0);
        p.embed_done();
        // Should report a "skipped" line, not divide by zero
        let msgs = log.lock().expect("log mutex poisoned");
        assert_eq!(msgs.len(), 1);
        assert!(msgs[0].ends_with("embed: skipped (0 chunks)"));
    }
}