1use std::cell::Cell;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10
11use crate::chunk::CodeChunk;
12
/// Boxed sink for human-readable report lines.
///
/// When one is stored in `Profiler::Active::on_progress`, `Profiler::report`
/// hands each message to it instead of printing the message to stderr.
type ProgressCallback = Box<dyn Fn(&str) + Send + Sync>;
22
/// Snapshot of embed-phase progress passed to the embed-tick callback on
/// every `Profiler::embed_tick` call.
#[derive(Debug, Clone)]
pub struct EmbedProgress {
    /// The `done` count supplied to `embed_tick`.
    pub done: usize,
    /// Total chunk count currently recorded for the phase
    /// (set by `embed_begin` / `embed_begin_update_total`).
    pub total: usize,
    /// Chunks per second since the previous tick callback; equals
    /// `overall_rate` when no measurable time has passed since that tick.
    pub window_rate: f64,
    /// `done` divided by the seconds elapsed since the phase started
    /// (`0.0` when no time has elapsed).
    pub overall_rate: f64,
    /// Share of recorded time spent waiting on locks, as a fraction of
    /// `lock_wait + inference`. Despite the name this is NOT multiplied by
    /// 100; it is `0.0` when no timing has been recorded.
    pub lock_wait_pct: f64,
    /// `1.0 - lock_wait_pct` (so `1.0` when no timing has been recorded).
    pub inference_pct: f64,
}
39
/// Boxed callback that receives an [`EmbedProgress`] snapshot each time
/// `Profiler::embed_tick` runs on an active profiler.
type EmbedTickCallback = Box<dyn Fn(&EmbedProgress) + Send + Sync>;
42
/// Boxed callback that receives each non-empty slice of embedding vectors
/// passed to `Profiler::embedding_batch`.
type EmbeddingBatchCallback = Box<dyn Fn(&[Vec<f32>]) + Send + Sync>;
45
/// Boxed callback that receives each non-empty slice of chunks passed to
/// `Profiler::chunk_batch`.
type ChunkBatchCallback = Box<dyn Fn(&[CodeChunk]) + Send + Sync>;
48
/// Pipeline profiler that either collects timings and emits report lines
/// (`Active`) or does nothing (`Noop`).
///
/// Every reporting method matches on the variant first, so a `Noop` profiler
/// emits no output and invokes no callbacks.
#[expect(
    clippy::large_enum_variant,
    reason = "Active is the common case; Noop is only for --profile=false"
)]
pub enum Profiler {
    /// Enabled profiler carrying timing state and optional callbacks.
    #[expect(
        private_interfaces,
        reason = "EmbedState is intentionally opaque; callers cannot name it"
    )]
    Active {
        /// Instant the profiler was constructed; the `[…s]` prefix of every
        /// report line is measured from here.
        start: Instant,
        /// Minimum time between periodic embed progress lines.
        interval: Duration,
        /// Embed-phase bookkeeping, shared across threads behind a mutex.
        embed: Mutex<EmbedState>,
        /// Chunk counts indexed by rayon thread index; calls made outside a
        /// rayon pool are attributed to index 0.
        chunk_counts: Mutex<Vec<usize>>,
        /// Sink for report lines; when `None`, lines are printed to stderr.
        on_progress: Option<ProgressCallback>,
        /// Invoked with a progress snapshot on every `embed_tick` call.
        on_embed_tick: Option<EmbedTickCallback>,
        /// Invoked with each non-empty batch given to `embedding_batch`.
        on_embedding_batch: Option<EmbeddingBatchCallback>,
        /// Invoked with each non-empty batch given to `chunk_batch`.
        on_chunk_batch: Option<ChunkBatchCallback>,
    },
    /// Disabled profiler: reports nothing and runs no callbacks.
    Noop,
}
80
/// Mutable bookkeeping for the embed phase.
///
/// Two independent "window" baselines are kept: one for the periodic text
/// report (`last_report` / `chunks_at_last_report`) and one for the per-tick
/// callback (`last_tick` / `chunks_at_last_tick`).
pub(crate) struct EmbedState {
    /// When the current embed phase started; basis for the overall rate.
    phase_start: Instant,
    /// When the last periodic report line was emitted.
    last_report: Instant,
    /// `done` count recorded at the last periodic report.
    chunks_at_last_report: usize,
    /// When the tick callback was last invoked.
    last_tick: Instant,
    /// `done` count recorded at the last tick callback.
    chunks_at_last_tick: usize,
    /// Accumulated time reported via `embed_lock_wait`.
    total_lock_wait: Duration,
    /// Accumulated time reported via `embed_inference`.
    total_inference: Duration,
    /// Total number of chunks expected in the phase.
    total_chunks: usize,
}

impl EmbedState {
    /// Creates a zeroed state whose three timestamps all equal the moment of
    /// construction.
    fn new() -> Self {
        // One clock read shared by every timestamp so they compare equal.
        let created = Instant::now();
        let (phase_start, last_report, last_tick) = (created, created, created);
        Self {
            phase_start,
            last_report,
            last_tick,
            chunks_at_last_report: 0,
            chunks_at_last_tick: 0,
            total_chunks: 0,
            total_lock_wait: Duration::ZERO,
            total_inference: Duration::ZERO,
        }
    }
}
109
110impl Profiler {
111 #[must_use]
113 pub fn new(enabled: bool, interval: Duration) -> Self {
114 if enabled {
115 Self::Active {
116 start: Instant::now(),
117 interval,
118 embed: Mutex::new(EmbedState::new()),
119 chunk_counts: Mutex::new(Vec::new()),
120 on_progress: None,
121 on_embed_tick: None,
122 on_embedding_batch: None,
123 on_chunk_batch: None,
124 }
125 } else {
126 Self::Noop
127 }
128 }
129
130 #[must_use]
134 pub fn with_callback(interval: Duration, cb: impl Fn(&str) + Send + Sync + 'static) -> Self {
135 Self::Active {
136 start: Instant::now(),
137 interval,
138 embed: Mutex::new(EmbedState::new()),
139 chunk_counts: Mutex::new(Vec::new()),
140 on_progress: Some(Box::new(cb)),
141 on_embed_tick: None,
142 on_embedding_batch: None,
143 on_chunk_batch: None,
144 }
145 }
146
147 #[must_use]
152 pub fn with_embed_tick(mut self, cb: impl Fn(&EmbedProgress) + Send + Sync + 'static) -> Self {
153 if let Self::Active {
154 ref mut on_embed_tick,
155 ..
156 } = self
157 {
158 *on_embed_tick = Some(Box::new(cb));
159 }
160 self
161 }
162
163 #[must_use]
165 pub fn with_embedding_batch(
166 mut self,
167 cb: impl Fn(&[Vec<f32>]) + Send + Sync + 'static,
168 ) -> Self {
169 if let Self::Active {
170 ref mut on_embedding_batch,
171 ..
172 } = self
173 {
174 *on_embedding_batch = Some(Box::new(cb));
175 }
176 self
177 }
178
179 #[must_use]
181 pub fn with_chunk_batch(mut self, cb: impl Fn(&[CodeChunk]) + Send + Sync + 'static) -> Self {
182 if let Self::Active {
183 ref mut on_chunk_batch,
184 ..
185 } = self
186 {
187 *on_chunk_batch = Some(Box::new(cb));
188 }
189 self
190 }
191
192 pub fn embedding_batch(&self, embeddings: &[Vec<f32>]) {
194 if embeddings.is_empty() {
195 return;
196 }
197 if let Self::Active {
198 on_embedding_batch, ..
199 } = self
200 && let Some(cb) = on_embedding_batch
201 {
202 cb(embeddings);
203 }
204 }
205
206 pub fn chunk_batch(&self, chunks: &[CodeChunk]) {
208 if chunks.is_empty() {
209 return;
210 }
211 if let Self::Active { on_chunk_batch, .. } = self
212 && let Some(cb) = on_chunk_batch
213 {
214 cb(chunks);
215 }
216 }
217
218 #[must_use]
220 pub fn noop() -> Self {
221 Self::Noop
222 }
223
224 fn report(&self, msg: &str) {
226 if let Self::Active { on_progress, .. } = self {
227 if let Some(cb) = on_progress {
228 cb(msg);
229 } else {
230 eprintln!("{msg}");
231 }
232 }
233 }
234
235 pub fn header(&self, version: &str, model_repo: &str, threads: usize, cores: usize) {
237 if let Self::Active { .. } = self {
238 self.report(&format!(
239 "[profile] ripvec {version} | {cores}-core | rayon: {threads} threads | model: {model_repo}",
240 ));
241 }
242 }
243
244 #[must_use]
246 pub fn phase(&self, name: &'static str) -> PhaseGuard<'_> {
247 if let Self::Active { start, .. } = self {
248 self.report(&format!(
249 "[{:.3}s] {name}: starting",
250 start.elapsed().as_secs_f64(),
251 ));
252 }
253 PhaseGuard {
254 profiler: self,
255 name,
256 start: Instant::now(),
257 detail: Cell::new(None),
258 }
259 }
260
261 pub fn chunk_thread_report(&self, n: usize) {
263 if let Self::Active { chunk_counts, .. } = self
264 && let Ok(mut counts) = chunk_counts.lock()
265 {
266 let idx = rayon::current_thread_index().unwrap_or(0);
267 if counts.len() <= idx {
268 counts.resize(idx + 1, 0);
269 }
270 counts[idx] += n;
271 }
272 }
273
274 pub fn chunk_summary(&self, total_chunks: usize, total_files: usize, elapsed: Duration) {
276 if let Self::Active {
277 start,
278 chunk_counts,
279 ..
280 } = self
281 {
282 let wall = start.elapsed();
283 if let Ok(counts) = chunk_counts.lock() {
284 let active = counts.iter().filter(|&&c| c > 0).count();
285 let pool_size = rayon::current_num_threads();
286 if active > 0 {
287 let min = counts
288 .iter()
289 .filter(|&&c| c > 0)
290 .min()
291 .copied()
292 .unwrap_or(0);
293 let max = counts.iter().max().copied().unwrap_or(0);
294 self.report(&format!(
295 "[{:.1}s] chunk: {} chunks from {} files in {:.0?} ({} threads, {} active, skew: {}-{} chunks/thread)",
296 wall.as_secs_f64(),
297 total_chunks,
298 total_files,
299 elapsed,
300 pool_size,
301 active,
302 min,
303 max,
304 ));
305 } else {
306 self.report(&format!(
307 "[{:.1}s] chunk: {} chunks from {} files in {:.0?} ({} threads)",
308 wall.as_secs_f64(),
309 total_chunks,
310 total_files,
311 elapsed,
312 pool_size,
313 ));
314 }
315 }
316 }
317 }
318
319 pub fn embed_begin(&self, total: usize) {
321 if let Self::Active { embed, .. } = self
322 && let Ok(mut state) = embed.lock()
323 {
324 let now = Instant::now();
325 state.phase_start = now;
326 state.last_report = now;
327 state.chunks_at_last_report = 0;
328 state.total_lock_wait = Duration::ZERO;
329 state.total_inference = Duration::ZERO;
330 state.total_chunks = total;
331 }
332 }
333
334 pub fn embed_begin_update_total(&self, total: usize) {
340 if let Self::Active { embed, .. } = self
341 && let Ok(mut state) = embed.lock()
342 && total > state.total_chunks
343 {
344 state.total_chunks = total;
345 }
346 }
347
348 #[expect(
350 clippy::cast_precision_loss,
351 reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
352 )]
353 pub fn embed_tick(&self, done: usize) {
354 if let Self::Active {
355 start,
356 interval,
357 embed,
358 on_embed_tick,
359 ..
360 } = self
361 {
362 let Ok(mut state) = embed.lock() else {
363 return;
364 };
365 let now = Instant::now();
366 let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
367 let overall_rate = if overall_elapsed > 0.0 {
368 done as f64 / overall_elapsed
369 } else {
370 0.0
371 };
372 let total_timing = state.total_lock_wait + state.total_inference;
373 let lock_pct = if total_timing.as_nanos() > 0 {
374 state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64
375 } else {
376 0.0
377 };
378
379 if let Some(cb) = on_embed_tick {
382 let tick_elapsed = now.duration_since(state.last_tick).as_secs_f64();
383 let tick_chunks = done - state.chunks_at_last_tick;
384 let batch_rate = if tick_elapsed > 0.0 {
385 tick_chunks as f64 / tick_elapsed
386 } else {
387 overall_rate
388 };
389 state.last_tick = now;
390 state.chunks_at_last_tick = done;
391
392 cb(&EmbedProgress {
393 done,
394 total: state.total_chunks,
395 window_rate: batch_rate,
396 overall_rate,
397 lock_wait_pct: lock_pct,
398 inference_pct: 1.0 - lock_pct,
399 });
400 }
401
402 if now.duration_since(state.last_report) >= *interval {
404 let wall = start.elapsed();
405 let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
406 let report_chunks = done - state.chunks_at_last_report;
407 let window_rate = if report_elapsed > 0.0 {
408 report_chunks as f64 / report_elapsed
409 } else {
410 0.0
411 };
412 self.report(&format!(
413 "[{:.1}s] embed: {}/{} (last {:.0}s: {:.1}/s, overall: {:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
414 wall.as_secs_f64(),
415 done,
416 state.total_chunks,
417 report_elapsed,
418 window_rate,
419 overall_rate,
420 lock_pct * 100.0,
421 (1.0 - lock_pct) * 100.0,
422 ));
423 state.last_report = now;
424 state.chunks_at_last_report = done;
425 }
426 }
427 }
428
429 #[expect(
434 clippy::cast_precision_loss,
435 reason = "display-only: sub-1% precision loss acceptable for MB/rate"
436 )]
437 pub fn embed_tick_bytes(&self, done_chunks: usize, bytes_processed: u64, total_bytes: u64) {
438 if let Self::Active {
439 start,
440 interval,
441 embed,
442 ..
443 } = self
444 {
445 let Ok(mut state) = embed.lock() else {
446 return;
447 };
448 let now = Instant::now();
449 let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
450 let overall_rate = if overall_elapsed > 0.0 {
451 done_chunks as f64 / overall_elapsed
452 } else {
453 0.0
454 };
455
456 if now.duration_since(state.last_report) >= *interval {
457 let wall = start.elapsed();
458 let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
459 let report_chunks = done_chunks - state.chunks_at_last_report;
460 let window_rate = if report_elapsed > 0.0 {
461 report_chunks as f64 / report_elapsed
462 } else {
463 0.0
464 };
465 let mb_done = bytes_processed as f64 / (1024.0 * 1024.0);
466 let mb_total = total_bytes as f64 / (1024.0 * 1024.0);
467 self.report(&format!(
468 "[{:.1}s] embed: {:.1}/{:.1} MB (last {:.0}s: {:.1}/s, overall: {:.1}/s)",
469 wall.as_secs_f64(),
470 mb_done,
471 mb_total,
472 report_elapsed,
473 window_rate,
474 overall_rate,
475 ));
476 state.last_report = now;
477 state.chunks_at_last_report = done_chunks;
478 }
479 }
480 }
481
482 pub fn embed_lock_wait(&self, duration: Duration) {
484 if let Self::Active { embed, .. } = self
485 && let Ok(mut state) = embed.lock()
486 {
487 state.total_lock_wait += duration;
488 }
489 }
490
491 pub fn embed_inference(&self, duration: Duration) {
493 if let Self::Active { embed, .. } = self
494 && let Ok(mut state) = embed.lock()
495 {
496 state.total_inference += duration;
497 }
498 }
499
500 #[expect(
502 clippy::cast_precision_loss,
503 reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
504 )]
505 pub fn embed_done(&self) {
506 if let Self::Active { start, embed, .. } = self
507 && let Ok(state) = embed.lock()
508 {
509 let wall = start.elapsed();
510 if state.total_chunks == 0 {
511 self.report(&format!(
512 "[{:.1}s] embed: skipped (0 chunks)",
513 wall.as_secs_f64()
514 ));
515 return;
516 }
517 let elapsed = Instant::now().duration_since(state.phase_start);
518 let rate = if elapsed.as_secs_f64() > 0.0 {
519 state.total_chunks as f64 / elapsed.as_secs_f64()
520 } else {
521 0.0
522 };
523 let total_timing = state.total_lock_wait + state.total_inference;
524 let lock_pct = if total_timing.as_nanos() > 0 {
525 state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64 * 100.0
526 } else {
527 0.0
528 };
529 self.report(&format!(
530 "[{:.1}s] embed: {}/{} done in {:.1}s ({:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
531 wall.as_secs_f64(),
532 state.total_chunks,
533 state.total_chunks,
534 elapsed.as_secs_f64(),
535 rate,
536 lock_pct,
537 100.0 - lock_pct,
538 ));
539 }
540 }
541
542 pub fn finish(&self) {
544 if let Self::Active { start, .. } = self {
545 let elapsed = start.elapsed().as_secs_f64();
546 self.report(&format!("[{elapsed:.1}s] total: {elapsed:.1}s"));
547 }
548 }
549}
550
/// Guard returned by `Profiler::phase`; when dropped it reports how long the
/// phase took (only if the profiler is active).
pub struct PhaseGuard<'a> {
    /// Profiler that receives the completion line.
    profiler: &'a Profiler,
    /// Phase name shown in the report lines.
    name: &'static str,
    /// Instant the guard was created; the phase duration is measured from here.
    start: Instant,
    /// Optional extra text for the completion line, settable through `&self`
    /// via `set_detail` and consumed on drop.
    detail: Cell<Option<String>>,
}
558
559impl PhaseGuard<'_> {
560 pub fn set_detail(&self, detail: String) {
562 self.detail.set(Some(detail));
563 }
564}
565
566impl Drop for PhaseGuard<'_> {
567 fn drop(&mut self) {
568 if let Profiler::Active { start, .. } = self.profiler {
569 let elapsed = self.start.elapsed();
570 let wall = start.elapsed();
571 let msg = if let Some(detail) = self.detail.take() {
572 format!(
573 "[{:.3}s] {}: {} in {:.1?}",
574 wall.as_secs_f64(),
575 self.name,
576 detail,
577 elapsed,
578 )
579 } else {
580 format!(
581 "[{:.3}s] {}: {:.1?}",
582 wall.as_secs_f64(),
583 self.name,
584 elapsed,
585 )
586 };
587 self.profiler.report(&msg);
588 }
589 }
590}
591
#[cfg(test)]
mod tests {
    use super::*;
    use crate::chunk::CodeChunk;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Shared buffer collecting every report line a profiler emits.
    type Captured = Arc<Mutex<Vec<String>>>;

    /// Builds an active profiler whose report lines are captured in memory,
    /// so tests can assert on them instead of only printing to stderr.
    fn capturing_profiler(interval: Duration) -> (Profiler, Captured) {
        let lines = Captured::default();
        let sink = Arc::clone(&lines);
        let profiler = Profiler::with_callback(interval, move |msg| {
            sink.lock().unwrap().push(msg.to_string());
        });
        (profiler, lines)
    }

    #[test]
    fn profiler_is_sync() {
        fn assert_sync<T: Sync>() {}
        assert_sync::<Profiler>();
    }

    /// Smoke test: every method must be callable on a disabled profiler
    /// without panicking.
    #[test]
    fn noop_profiler_all_methods_are_safe() {
        let p = Profiler::noop();
        p.header("0.1.0", "test/repo", 4, 8);
        {
            let g = p.phase("test_phase");
            g.set_detail("some detail".to_string());
        }
        p.chunk_thread_report(10);
        p.chunk_summary(100, 50, Duration::from_millis(89));
        p.embed_begin(100);
        p.embed_tick(1);
        p.embed_lock_wait(Duration::from_millis(1));
        p.embed_inference(Duration::from_millis(10));
        p.embed_done();
        p.finish();
    }

    #[test]
    fn active_profiler_phase_guard_formats_correctly() {
        let (p, lines) = capturing_profiler(Duration::from_secs(10));
        {
            let g = p.phase("test_phase");
            std::thread::sleep(Duration::from_millis(5));
            g.set_detail("42 items".to_string());
        }
        let lines = lines.lock().unwrap();
        // One line when the phase starts, one when the guard is dropped.
        assert_eq!(lines.len(), 2, "unexpected lines: {lines:?}");
        assert!(lines[0].ends_with("test_phase: starting"), "{}", lines[0]);
        assert!(lines[1].contains("test_phase: 42 items in "), "{}", lines[1]);
    }

    #[test]
    fn embed_tick_respects_interval() {
        // The interval is far longer than the test, so no tick may report.
        let (p, lines) = capturing_profiler(Duration::from_secs(100));
        p.embed_begin(10);
        for i in 1..=10 {
            p.embed_tick(i);
        }
        p.embed_done();

        let lines = lines.lock().unwrap();
        assert_eq!(lines.len(), 1, "unexpected lines: {lines:?}");
        assert!(lines[0].contains("embed: 10/10 done in "), "{}", lines[0]);
    }

    #[test]
    fn chunk_summary_with_zero_files() {
        let (p, lines) = capturing_profiler(Duration::from_secs(10));
        p.chunk_summary(0, 0, Duration::from_millis(0));

        let lines = lines.lock().unwrap();
        assert_eq!(lines.len(), 1, "unexpected lines: {lines:?}");
        assert!(
            lines[0].contains("chunk: 0 chunks from 0 files in "),
            "{}",
            lines[0]
        );
        // No thread reported chunks, so the skew section must be absent.
        assert!(!lines[0].contains("skew"), "{}", lines[0]);
    }

    #[test]
    fn embed_done_with_zero_chunks() {
        let (p, lines) = capturing_profiler(Duration::from_secs(10));
        p.embed_begin(0);
        p.embed_done();

        let lines = lines.lock().unwrap();
        assert_eq!(lines.len(), 1, "unexpected lines: {lines:?}");
        assert!(
            lines[0].ends_with("embed: skipped (0 chunks)"),
            "{}",
            lines[0]
        );
    }

    #[test]
    fn embedding_batch_callback_receives_embeddings() {
        let seen = Arc::new(Mutex::new(0usize));
        let seen_for_cb = Arc::clone(&seen);
        let p = Profiler::with_callback(Duration::from_secs(10), |_| {}).with_embedding_batch(
            move |batch| {
                *seen_for_cb.lock().unwrap() += batch.len();
            },
        );

        p.embedding_batch(&[vec![1.0, 0.0], vec![0.0, 1.0]]);

        assert_eq!(*seen.lock().unwrap(), 2);
    }

    #[test]
    fn chunk_batch_callback_receives_chunks() {
        let seen = Arc::new(Mutex::new(Vec::<String>::new()));
        let seen_for_cb = Arc::clone(&seen);
        let p = Profiler::with_callback(Duration::from_secs(10), |_| {}).with_chunk_batch(
            move |chunks| {
                seen_for_cb
                    .lock()
                    .unwrap()
                    .extend(chunks.iter().map(|chunk| chunk.kind.clone()));
            },
        );
        let chunks = vec![CodeChunk {
            file_path: "ontology/schema.owl".to_string(),
            name: "owl:Class".to_string(),
            kind: "element".to_string(),
            start_line: 1,
            end_line: 1,
            content: String::new(),
            enriched_content: String::new(),
        }];

        p.chunk_batch(&chunks);

        assert_eq!(*seen.lock().unwrap(), vec!["element".to_string()]);
    }
}