Skip to main content

ripvec_core/
profile.rs

//! Pipeline profiling instrumentation.
//!
//! The [`Profiler`] enum provides real-time timing output for each pipeline
//! phase. When disabled ([`Profiler::Noop`]), all methods are no-ops with
//! zero overhead at release optimization levels.
6
7use std::cell::Cell;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10
11use crate::chunk::CodeChunk;
12
// NOTE: the `Profiler` enum defined further down is the pipeline profiler
// that prints phase timing to stderr; use `Profiler::new` to create an active
// or noop profiler based on whether `--profile` was passed. That description
// belongs to the enum, not to the callback alias documented here.

/// Callback for live progress updates (e.g. driving a progress bar).
///
/// Called with a formatted status string whenever the profiler would
/// normally print to stderr. Set via [`Profiler::with_callback`].
type ProgressCallback = Box<dyn Fn(&str) + Send + Sync>;
22
/// Snapshot of embed-phase progress, passed to the tick callback.
///
/// A fresh value is built for every `Profiler::embed_tick` call when a tick
/// callback is installed. Rates are in chunks per second; the two `*_pct`
/// fields are fractions in `0.0..=1.0` (not percentages) that sum to `1.0`.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbedProgress {
    /// Chunks completed so far.
    pub done: usize,
    /// Total chunks to embed.
    pub total: usize,
    /// Chunks/s since the previous tick (falls back to the overall rate when
    /// no measurable time has elapsed between ticks).
    pub window_rate: f64,
    /// Chunks/s since embed phase started.
    pub overall_rate: f64,
    /// Fraction of the measured lock-wait + inference time that was spent
    /// waiting for the model lock (0.0–1.0).
    pub lock_wait_pct: f64,
    /// Fraction of the measured lock-wait + inference time that was spent in
    /// inference (0.0–1.0); always `1.0 - lock_wait_pct`.
    pub inference_pct: f64,
}
39
/// Callback for per-batch embed progress.
///
/// Receives a borrowed [`EmbedProgress`] snapshot. Boxed with `Send + Sync`
/// bounds so it can be stored inside [`Profiler`].
type EmbedTickCallback = Box<dyn Fn(&EmbedProgress) + Send + Sync>;
42
/// Callback for observing completed embedding vectors.
///
/// Receives the batch as a slice with one `Vec<f32>` per embedding.
type EmbeddingBatchCallback = Box<dyn Fn(&[Vec<f32>]) + Send + Sync>;
45
/// Callback for observing source chunks as they are discovered.
///
/// Receives the batch as a borrowed slice of [`CodeChunk`] values.
type ChunkBatchCallback = Box<dyn Fn(&[CodeChunk]) + Send + Sync>;
48
/// Pipeline profiler that reports phase timing.
///
/// Use [`Profiler::new`] to create an active or noop profiler based on
/// whether `--profile` was passed. An active profiler prints its messages to
/// stderr unless a progress callback was installed via
/// [`Profiler::with_callback`], in which case the callback receives them.
#[expect(
    clippy::large_enum_variant,
    reason = "Active is the common case; Noop is only for --profile=false"
)]
pub enum Profiler {
    /// Actively collects timing and prints to stderr.
    #[expect(
        private_interfaces,
        reason = "EmbedState is intentionally opaque; callers cannot name it"
    )]
    Active {
        /// Wall-clock start of the entire run.
        start: Instant,
        /// Reporting interval for embed progress.
        interval: Duration,
        /// Mutable embed-phase state (sequential access only, Mutex for Sync).
        embed: Mutex<EmbedState>,
        /// Per-rayon-thread chunk counts (parallel access during chunk phase).
        chunk_counts: Mutex<Vec<usize>>,
        /// Optional callback for live progress updates.
        on_progress: Option<ProgressCallback>,
        /// Optional callback for per-chunk embed progress (drives progress bars).
        on_embed_tick: Option<EmbedTickCallback>,
        /// Optional callback for completed embedding vectors.
        on_embedding_batch: Option<EmbeddingBatchCallback>,
        /// Optional callback for chunks discovered by the chunking stage.
        on_chunk_batch: Option<ChunkBatchCallback>,
    },
    /// No-op profiler. All methods are empty.
    Noop,
}
80
/// Mutable bookkeeping for the embed phase, kept behind the mutex in
/// [`Profiler::Active`].
pub(crate) struct EmbedState {
    /// Start of the embed phase; baseline for the overall chunks/s rate.
    phase_start: Instant,
    /// Time of the last throttled text report (or the phase start if none
    /// has been emitted yet).
    last_report: Instant,
    /// Cumulative chunk count at the last throttled text report.
    chunks_at_last_report: usize,
    /// Timestamp of the last `embed_tick` call (for per-batch window rate).
    last_tick: Instant,
    /// Chunks completed at last tick (for per-batch window rate).
    chunks_at_last_tick: usize,
    /// Accumulated time passed to `Profiler::embed_lock_wait`.
    total_lock_wait: Duration,
    /// Accumulated time passed to `Profiler::embed_inference`.
    total_inference: Duration,
    /// Total number of chunks expected in the embed phase.
    total_chunks: usize,
}
93
94impl EmbedState {
95    fn new() -> Self {
96        let now = Instant::now();
97        Self {
98            phase_start: now,
99            last_report: now,
100            chunks_at_last_report: 0,
101            last_tick: now,
102            chunks_at_last_tick: 0,
103            total_lock_wait: Duration::ZERO,
104            total_inference: Duration::ZERO,
105            total_chunks: 0,
106        }
107    }
108}
109
110impl Profiler {
111    /// Create a new profiler. If `enabled` is false, returns `Noop`.
112    #[must_use]
113    pub fn new(enabled: bool, interval: Duration) -> Self {
114        if enabled {
115            Self::Active {
116                start: Instant::now(),
117                interval,
118                embed: Mutex::new(EmbedState::new()),
119                chunk_counts: Mutex::new(Vec::new()),
120                on_progress: None,
121                on_embed_tick: None,
122                on_embedding_batch: None,
123                on_chunk_batch: None,
124            }
125        } else {
126            Self::Noop
127        }
128    }
129
130    /// Create an active profiler that drives a progress callback instead of
131    /// printing to stderr. The callback receives formatted status messages
132    /// at each pipeline phase transition and embed progress tick.
133    #[must_use]
134    pub fn with_callback(interval: Duration, cb: impl Fn(&str) + Send + Sync + 'static) -> Self {
135        Self::Active {
136            start: Instant::now(),
137            interval,
138            embed: Mutex::new(EmbedState::new()),
139            chunk_counts: Mutex::new(Vec::new()),
140            on_progress: Some(Box::new(cb)),
141            on_embed_tick: None,
142            on_embedding_batch: None,
143            on_chunk_batch: None,
144        }
145    }
146
147    /// Set a callback that fires on every embed chunk completion with `(done, total)`.
148    ///
149    /// Unlike the throttled `on_progress` callback, this fires for every chunk
150    /// so progress bars can update smoothly (indicatif handles display rate).
151    #[must_use]
152    pub fn with_embed_tick(mut self, cb: impl Fn(&EmbedProgress) + Send + Sync + 'static) -> Self {
153        if let Self::Active {
154            ref mut on_embed_tick,
155            ..
156        } = self
157        {
158            *on_embed_tick = Some(Box::new(cb));
159        }
160        self
161    }
162
163    /// Set a callback that receives each completed embedding batch.
164    #[must_use]
165    pub fn with_embedding_batch(
166        mut self,
167        cb: impl Fn(&[Vec<f32>]) + Send + Sync + 'static,
168    ) -> Self {
169        if let Self::Active {
170            ref mut on_embedding_batch,
171            ..
172        } = self
173        {
174            *on_embedding_batch = Some(Box::new(cb));
175        }
176        self
177    }
178
179    /// Set a callback that receives chunks as source files are parsed.
180    #[must_use]
181    pub fn with_chunk_batch(mut self, cb: impl Fn(&[CodeChunk]) + Send + Sync + 'static) -> Self {
182        if let Self::Active {
183            ref mut on_chunk_batch,
184            ..
185        } = self
186        {
187            *on_chunk_batch = Some(Box::new(cb));
188        }
189        self
190    }
191
192    /// Notify observers that a batch of embeddings is available.
193    pub fn embedding_batch(&self, embeddings: &[Vec<f32>]) {
194        if embeddings.is_empty() {
195            return;
196        }
197        if let Self::Active {
198            on_embedding_batch, ..
199        } = self
200            && let Some(cb) = on_embedding_batch
201        {
202            cb(embeddings);
203        }
204    }
205
206    /// Notify observers that a batch of source chunks is available.
207    pub fn chunk_batch(&self, chunks: &[CodeChunk]) {
208        if chunks.is_empty() {
209            return;
210        }
211        if let Self::Active { on_chunk_batch, .. } = self
212            && let Some(cb) = on_chunk_batch
213        {
214            cb(chunks);
215        }
216    }
217
218    /// Create a no-op profiler.
219    #[must_use]
220    pub fn noop() -> Self {
221        Self::Noop
222    }
223
224    /// Send a progress message: calls the callback if set, otherwise prints to stderr.
225    fn report(&self, msg: &str) {
226        if let Self::Active { on_progress, .. } = self {
227            if let Some(cb) = on_progress {
228                cb(msg);
229            } else {
230                eprintln!("{msg}");
231            }
232        }
233    }
234
235    /// Print the system info header line.
236    pub fn header(&self, version: &str, model_repo: &str, threads: usize, cores: usize) {
237        if let Self::Active { .. } = self {
238            self.report(&format!(
239                "[profile] ripvec {version} | {cores}-core | rayon: {threads} threads | model: {model_repo}",
240            ));
241        }
242    }
243
244    /// Start timing a named phase. Returns a guard that prints on drop.
245    #[must_use]
246    pub fn phase(&self, name: &'static str) -> PhaseGuard<'_> {
247        if let Self::Active { start, .. } = self {
248            self.report(&format!(
249                "[{:.3}s] {name}: starting",
250                start.elapsed().as_secs_f64(),
251            ));
252        }
253        PhaseGuard {
254            profiler: self,
255            name,
256            start: Instant::now(),
257            detail: Cell::new(None),
258        }
259    }
260
261    /// Record that a rayon thread produced `n` chunks during the chunk phase.
262    pub fn chunk_thread_report(&self, n: usize) {
263        if let Self::Active { chunk_counts, .. } = self
264            && let Ok(mut counts) = chunk_counts.lock()
265        {
266            let idx = rayon::current_thread_index().unwrap_or(0);
267            if counts.len() <= idx {
268                counts.resize(idx + 1, 0);
269            }
270            counts[idx] += n;
271        }
272    }
273
274    /// Print the chunk phase summary with thread utilization stats.
275    pub fn chunk_summary(&self, total_chunks: usize, total_files: usize, elapsed: Duration) {
276        if let Self::Active {
277            start,
278            chunk_counts,
279            ..
280        } = self
281        {
282            let wall = start.elapsed();
283            if let Ok(counts) = chunk_counts.lock() {
284                let active = counts.iter().filter(|&&c| c > 0).count();
285                let pool_size = rayon::current_num_threads();
286                if active > 0 {
287                    let min = counts
288                        .iter()
289                        .filter(|&&c| c > 0)
290                        .min()
291                        .copied()
292                        .unwrap_or(0);
293                    let max = counts.iter().max().copied().unwrap_or(0);
294                    self.report(&format!(
295                        "[{:.1}s]  chunk: {} chunks from {} files in {:.0?} ({} threads, {} active, skew: {}-{} chunks/thread)",
296                        wall.as_secs_f64(),
297                        total_chunks,
298                        total_files,
299                        elapsed,
300                        pool_size,
301                        active,
302                        min,
303                        max,
304                    ));
305                } else {
306                    self.report(&format!(
307                        "[{:.1}s]  chunk: {} chunks from {} files in {:.0?} ({} threads)",
308                        wall.as_secs_f64(),
309                        total_chunks,
310                        total_files,
311                        elapsed,
312                        pool_size,
313                    ));
314                }
315            }
316        }
317    }
318
319    /// Begin the embed phase. Call before the embedding loop.
320    pub fn embed_begin(&self, total: usize) {
321        if let Self::Active { embed, .. } = self
322            && let Ok(mut state) = embed.lock()
323        {
324            let now = Instant::now();
325            state.phase_start = now;
326            state.last_report = now;
327            state.chunks_at_last_report = 0;
328            state.total_lock_wait = Duration::ZERO;
329            state.total_inference = Duration::ZERO;
330            state.total_chunks = total;
331        }
332    }
333
334    /// Update the total chunk count for the embed phase.
335    ///
336    /// Used by the streaming pipeline where the total isn't known at
337    /// [`Self::embed_begin`] time. Only updates if the new total is larger
338    /// than the current one (monotonic).
339    pub fn embed_begin_update_total(&self, total: usize) {
340        if let Self::Active { embed, .. } = self
341            && let Ok(mut state) = embed.lock()
342            && total > state.total_chunks
343        {
344            state.total_chunks = total;
345        }
346    }
347
348    /// Called after each chunk is embedded. Prints periodic progress.
349    #[expect(
350        clippy::cast_precision_loss,
351        reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
352    )]
353    pub fn embed_tick(&self, done: usize) {
354        if let Self::Active {
355            start,
356            interval,
357            embed,
358            on_embed_tick,
359            ..
360        } = self
361        {
362            let Ok(mut state) = embed.lock() else {
363                return;
364            };
365            let now = Instant::now();
366            let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
367            let overall_rate = if overall_elapsed > 0.0 {
368                done as f64 / overall_elapsed
369            } else {
370                0.0
371            };
372            let total_timing = state.total_lock_wait + state.total_inference;
373            let lock_pct = if total_timing.as_nanos() > 0 {
374                state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64
375            } else {
376                0.0
377            };
378
379            // Per-batch callback: compute a fresh per-batch window rate so the
380            // sparkline gets a new data point on every batch, not every interval.
381            if let Some(cb) = on_embed_tick {
382                let tick_elapsed = now.duration_since(state.last_tick).as_secs_f64();
383                let tick_chunks = done - state.chunks_at_last_tick;
384                let batch_rate = if tick_elapsed > 0.0 {
385                    tick_chunks as f64 / tick_elapsed
386                } else {
387                    overall_rate
388                };
389                state.last_tick = now;
390                state.chunks_at_last_tick = done;
391
392                cb(&EmbedProgress {
393                    done,
394                    total: state.total_chunks,
395                    window_rate: batch_rate,
396                    overall_rate,
397                    lock_wait_pct: lock_pct,
398                    inference_pct: 1.0 - lock_pct,
399                });
400            }
401
402            // Throttled text report for --profile / spinner modes.
403            if now.duration_since(state.last_report) >= *interval {
404                let wall = start.elapsed();
405                let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
406                let report_chunks = done - state.chunks_at_last_report;
407                let window_rate = if report_elapsed > 0.0 {
408                    report_chunks as f64 / report_elapsed
409                } else {
410                    0.0
411                };
412                self.report(&format!(
413                    "[{:.1}s]  embed: {}/{} (last {:.0}s: {:.1}/s, overall: {:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
414                    wall.as_secs_f64(),
415                    done,
416                    state.total_chunks,
417                    report_elapsed,
418                    window_rate,
419                    overall_rate,
420                    lock_pct * 100.0,
421                    (1.0 - lock_pct) * 100.0,
422                ));
423                state.last_report = now;
424                state.chunks_at_last_report = done;
425            }
426        }
427    }
428
429    /// Byte-based progress for streaming mode.
430    ///
431    /// Shows `processed_bytes/total_bytes` as MB with chunk rate. The total is
432    /// known from the walk phase (file sizes), so the denominator is stable.
433    #[expect(
434        clippy::cast_precision_loss,
435        reason = "display-only: sub-1% precision loss acceptable for MB/rate"
436    )]
437    pub fn embed_tick_bytes(&self, done_chunks: usize, bytes_processed: u64, total_bytes: u64) {
438        if let Self::Active {
439            start,
440            interval,
441            embed,
442            ..
443        } = self
444        {
445            let Ok(mut state) = embed.lock() else {
446                return;
447            };
448            let now = Instant::now();
449            let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
450            let overall_rate = if overall_elapsed > 0.0 {
451                done_chunks as f64 / overall_elapsed
452            } else {
453                0.0
454            };
455
456            if now.duration_since(state.last_report) >= *interval {
457                let wall = start.elapsed();
458                let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
459                let report_chunks = done_chunks - state.chunks_at_last_report;
460                let window_rate = if report_elapsed > 0.0 {
461                    report_chunks as f64 / report_elapsed
462                } else {
463                    0.0
464                };
465                let mb_done = bytes_processed as f64 / (1024.0 * 1024.0);
466                let mb_total = total_bytes as f64 / (1024.0 * 1024.0);
467                self.report(&format!(
468                    "[{:.1}s]  embed: {:.1}/{:.1} MB (last {:.0}s: {:.1}/s, overall: {:.1}/s)",
469                    wall.as_secs_f64(),
470                    mb_done,
471                    mb_total,
472                    report_elapsed,
473                    window_rate,
474                    overall_rate,
475                ));
476                state.last_report = now;
477                state.chunks_at_last_report = done_chunks;
478            }
479        }
480    }
481
482    /// Accumulate time spent waiting for the model mutex lock.
483    pub fn embed_lock_wait(&self, duration: Duration) {
484        if let Self::Active { embed, .. } = self
485            && let Ok(mut state) = embed.lock()
486        {
487            state.total_lock_wait += duration;
488        }
489    }
490
491    /// Accumulate time spent in ONNX inference.
492    pub fn embed_inference(&self, duration: Duration) {
493        if let Self::Active { embed, .. } = self
494            && let Ok(mut state) = embed.lock()
495        {
496            state.total_inference += duration;
497        }
498    }
499
500    /// Print the final embed phase summary.
501    #[expect(
502        clippy::cast_precision_loss,
503        reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
504    )]
505    pub fn embed_done(&self) {
506        if let Self::Active { start, embed, .. } = self
507            && let Ok(state) = embed.lock()
508        {
509            let wall = start.elapsed();
510            if state.total_chunks == 0 {
511                self.report(&format!(
512                    "[{:.1}s]  embed: skipped (0 chunks)",
513                    wall.as_secs_f64()
514                ));
515                return;
516            }
517            let elapsed = Instant::now().duration_since(state.phase_start);
518            let rate = if elapsed.as_secs_f64() > 0.0 {
519                state.total_chunks as f64 / elapsed.as_secs_f64()
520            } else {
521                0.0
522            };
523            let total_timing = state.total_lock_wait + state.total_inference;
524            let lock_pct = if total_timing.as_nanos() > 0 {
525                state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64 * 100.0
526            } else {
527                0.0
528            };
529            self.report(&format!(
530                "[{:.1}s]  embed: {}/{} done in {:.1}s ({:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
531                wall.as_secs_f64(),
532                state.total_chunks,
533                state.total_chunks,
534                elapsed.as_secs_f64(),
535                rate,
536                lock_pct,
537                100.0 - lock_pct,
538            ));
539        }
540    }
541
542    /// Print the total wall-clock time.
543    pub fn finish(&self) {
544        if let Self::Active { start, .. } = self {
545            let elapsed = start.elapsed().as_secs_f64();
546            self.report(&format!("[{elapsed:.1}s]  total: {elapsed:.1}s"));
547        }
548    }
549}
550
/// Guard returned by [`Profiler::phase`]. Prints elapsed time on drop.
pub struct PhaseGuard<'a> {
    /// Profiler that receives the completion message when the guard drops.
    profiler: &'a Profiler,
    /// Phase name shown in the completion message.
    name: &'static str,
    /// When the guard was created; the phase duration is measured from here.
    start: Instant,
    /// Optional detail text set via `set_detail`; taken out when the guard drops.
    detail: Cell<Option<String>>,
}
558
559impl PhaseGuard<'_> {
560    /// Attach a detail string printed alongside the phase timing.
561    pub fn set_detail(&self, detail: String) {
562        self.detail.set(Some(detail));
563    }
564}
565
566impl Drop for PhaseGuard<'_> {
567    fn drop(&mut self) {
568        if let Profiler::Active { start, .. } = self.profiler {
569            let elapsed = self.start.elapsed();
570            let wall = start.elapsed();
571            let msg = if let Some(detail) = self.detail.take() {
572                format!(
573                    "[{:.3}s] {}: {} in {:.1?}",
574                    wall.as_secs_f64(),
575                    self.name,
576                    detail,
577                    elapsed,
578                )
579            } else {
580                format!(
581                    "[{:.3}s] {}: {:.1?}",
582                    wall.as_secs_f64(),
583                    self.name,
584                    elapsed,
585                )
586            };
587            self.profiler.report(&msg);
588        }
589    }
590}
591
#[cfg(test)]
mod tests {
    use super::*;
    use crate::chunk::CodeChunk;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Build an active profiler whose progress messages are captured in
    /// memory instead of being printed, so tests can assert on them.
    fn capturing_profiler(interval: Duration) -> (Profiler, Arc<Mutex<Vec<String>>>) {
        let messages = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&messages);
        let profiler = Profiler::with_callback(interval, move |msg| {
            sink.lock().unwrap().push(msg.to_string());
        });
        (profiler, messages)
    }

    #[test]
    fn profiler_is_sync() {
        fn assert_sync<T: Sync>() {}
        assert_sync::<Profiler>();
    }

    #[test]
    fn noop_profiler_all_methods_are_safe() {
        let p = Profiler::noop();
        p.header("0.1.0", "test/repo", 4, 8);
        {
            let g = p.phase("test_phase");
            g.set_detail("some detail".to_string());
        }
        p.chunk_thread_report(10);
        p.chunk_summary(100, 50, Duration::from_millis(89));
        p.embed_begin(100);
        p.embed_begin_update_total(200);
        p.embed_tick(1);
        p.embed_tick_bytes(1, 1024, 2048);
        p.embed_lock_wait(Duration::from_millis(1));
        p.embed_inference(Duration::from_millis(10));
        p.embed_done();
        p.finish();
    }

    #[test]
    fn stderr_profiler_does_not_panic() {
        // Exercises the stderr path (no callback installed); output is visual.
        let p = Profiler::new(true, Duration::from_secs(10));
        {
            let g = p.phase("test_phase");
            g.set_detail("42 items".to_string());
        }
        p.finish();
    }

    #[test]
    fn active_profiler_phase_guard_formats_correctly() {
        let (p, messages) = capturing_profiler(Duration::from_secs(10));
        {
            let g = p.phase("test_phase");
            g.set_detail("42 items".to_string());
        }
        let messages = messages.lock().unwrap();
        assert_eq!(messages.len(), 2);
        assert!(messages[0].ends_with("test_phase: starting"));
        assert!(messages[1].contains("test_phase: 42 items in "));
    }

    #[test]
    fn embed_tick_respects_interval() {
        // Very long interval: no periodic report may fire during the loop.
        let (p, messages) = capturing_profiler(Duration::from_secs(100));
        p.embed_begin(10);
        for i in 1..=10 {
            p.embed_tick(i);
        }
        p.embed_done();

        let messages = messages.lock().unwrap();
        assert_eq!(messages.len(), 1, "only the final summary is expected");
        assert!(messages[0].contains("embed: 10/10 done in "));
    }

    #[test]
    fn embed_tick_callback_receives_progress() {
        let seen = Arc::new(Mutex::new(Vec::<(usize, usize)>::new()));
        let seen_for_cb = Arc::clone(&seen);
        let p = Profiler::with_callback(Duration::from_secs(100), |_| {}).with_embed_tick(
            move |progress| {
                seen_for_cb
                    .lock()
                    .unwrap()
                    .push((progress.done, progress.total));
            },
        );

        p.embed_begin(3);
        for done in 1..=3 {
            p.embed_tick(done);
        }

        assert_eq!(*seen.lock().unwrap(), vec![(1, 3), (2, 3), (3, 3)]);
    }

    #[test]
    fn chunk_summary_with_zero_files() {
        let (p, messages) = capturing_profiler(Duration::from_secs(10));
        p.chunk_summary(0, 0, Duration::from_millis(0));

        // Should not panic or divide by zero, and should use the short form.
        let messages = messages.lock().unwrap();
        assert_eq!(messages.len(), 1);
        assert!(messages[0].contains("chunk: 0 chunks from 0 files"));
        assert!(!messages[0].contains("skew"));
    }

    #[test]
    fn embed_done_with_zero_chunks() {
        let (p, messages) = capturing_profiler(Duration::from_secs(10));
        p.embed_begin(0);
        p.embed_done();

        // Should report the "skipped" line, not divide by zero.
        let messages = messages.lock().unwrap();
        assert_eq!(messages.len(), 1);
        assert!(messages[0].contains("embed: skipped (0 chunks)"));
    }

    #[test]
    fn embedding_batch_callback_receives_embeddings() {
        let seen = Arc::new(Mutex::new(0usize));
        let seen_for_cb = Arc::clone(&seen);
        let p = Profiler::with_callback(Duration::from_secs(10), |_| {}).with_embedding_batch(
            move |batch| {
                *seen_for_cb.lock().unwrap() += batch.len();
            },
        );

        p.embedding_batch(&[vec![1.0, 0.0], vec![0.0, 1.0]]);
        // Empty batches must not reach the callback.
        p.embedding_batch(&[]);

        assert_eq!(*seen.lock().unwrap(), 2);
    }

    #[test]
    fn chunk_batch_callback_receives_chunks() {
        let seen = Arc::new(Mutex::new(Vec::<String>::new()));
        let seen_for_cb = Arc::clone(&seen);
        let p = Profiler::with_callback(Duration::from_secs(10), |_| {}).with_chunk_batch(
            move |chunks| {
                seen_for_cb
                    .lock()
                    .unwrap()
                    .extend(chunks.iter().map(|chunk| chunk.kind.clone()));
            },
        );
        let chunks = vec![CodeChunk {
            file_path: "ontology/schema.owl".to_string(),
            name: "owl:Class".to_string(),
            kind: "element".to_string(),
            start_line: 1,
            end_line: 1,
            content: String::new(),
            enriched_content: String::new(),
        }];

        p.chunk_batch(&chunks);

        assert_eq!(*seen.lock().unwrap(), vec!["element".to_string()]);
    }
}