1use std::cell::Cell;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10
/// Boxed, thread-safe sink that receives each formatted profiler report line.
type ProgressCallback = Box<dyn Fn(&str) + Send + Sync>;
20
/// Snapshot of embed-phase progress handed to the embed-tick callback.
#[derive(Debug, Clone)]
pub struct EmbedProgress {
    /// Cumulative number of chunks completed, as passed to `embed_tick`.
    pub done: usize,
    /// Total number of chunks expected for the embed phase.
    pub total: usize,
    /// Chunks per second since the previous tick (falls back to the overall
    /// rate when no measurable time has elapsed since that tick).
    pub window_rate: f64,
    /// Chunks per second since the embed phase began.
    pub overall_rate: f64,
    /// Share of the accumulated lock-wait + inference time spent waiting on
    /// locks, as a fraction in `0.0..=1.0` (not a percentage, despite the name).
    pub lock_wait_pct: f64,
    /// Complement of `lock_wait_pct` (`1.0 - lock_wait_pct`), also a fraction.
    pub inference_pct: f64,
}
37
/// Boxed, thread-safe callback invoked with an [`EmbedProgress`] snapshot on every embed tick.
type EmbedTickCallback = Box<dyn Fn(&EmbedProgress) + Send + Sync>;
40
/// Boxed, thread-safe callback invoked with each non-empty batch of embedding vectors.
type EmbeddingBatchCallback = Box<dyn Fn(&[Vec<f32>]) + Send + Sync>;
43
/// Pipeline profiler: either actively collecting and reporting timings, or a
/// no-op on which every method returns immediately.
#[expect(
    clippy::large_enum_variant,
    reason = "Active is the common case; Noop is only for --profile=false"
)]
pub enum Profiler {
    /// Profiling is enabled; carries all timing state and optional callbacks.
    #[expect(
        private_interfaces,
        reason = "EmbedState is intentionally opaque; callers cannot name it"
    )]
    Active {
        /// Creation time of the profiler; report lines are stamped relative to it.
        start: Instant,
        /// Minimum time between two periodic embed progress lines.
        interval: Duration,
        /// Embed-phase bookkeeping (rates, lock-wait/inference totals).
        embed: Mutex<EmbedState>,
        /// Chunk counts indexed by rayon thread index.
        chunk_counts: Mutex<Vec<usize>>,
        /// Sink for report lines; when `None`, lines are written to stderr.
        on_progress: Option<ProgressCallback>,
        /// Optional callback fired on every `embed_tick` call.
        on_embed_tick: Option<EmbedTickCallback>,
        /// Optional callback fired for every non-empty embedding batch.
        on_embedding_batch: Option<EmbeddingBatchCallback>,
    },
    /// Profiling is disabled.
    Noop,
}
73
/// Mutable bookkeeping for the embed phase; lives behind the profiler's mutex.
pub(crate) struct EmbedState {
    /// When the current embed phase began.
    phase_start: Instant,
    /// When the last periodic report line was emitted.
    last_report: Instant,
    /// Cumulative chunk count at the last periodic report.
    chunks_at_last_report: usize,
    /// When the embed-tick callback last fired.
    last_tick: Instant,
    /// Cumulative chunk count when the embed-tick callback last fired.
    chunks_at_last_tick: usize,
    /// Accumulated time spent waiting on locks.
    total_lock_wait: Duration,
    /// Accumulated time spent in inference.
    total_inference: Duration,
    /// Number of chunks the phase is expected to process.
    total_chunks: usize,
}

impl EmbedState {
    /// Builds a zeroed state in which every timestamp is the same "now".
    fn new() -> Self {
        // A single clock read keeps all three timestamps exactly equal.
        let origin = Instant::now();
        Self {
            total_chunks: 0,
            chunks_at_last_report: 0,
            chunks_at_last_tick: 0,
            total_lock_wait: Duration::ZERO,
            total_inference: Duration::ZERO,
            phase_start: origin,
            last_report: origin,
            last_tick: origin,
        }
    }
}
102
103impl Profiler {
104 #[must_use]
106 pub fn new(enabled: bool, interval: Duration) -> Self {
107 if enabled {
108 Self::Active {
109 start: Instant::now(),
110 interval,
111 embed: Mutex::new(EmbedState::new()),
112 chunk_counts: Mutex::new(Vec::new()),
113 on_progress: None,
114 on_embed_tick: None,
115 on_embedding_batch: None,
116 }
117 } else {
118 Self::Noop
119 }
120 }
121
122 #[must_use]
126 pub fn with_callback(interval: Duration, cb: impl Fn(&str) + Send + Sync + 'static) -> Self {
127 Self::Active {
128 start: Instant::now(),
129 interval,
130 embed: Mutex::new(EmbedState::new()),
131 chunk_counts: Mutex::new(Vec::new()),
132 on_progress: Some(Box::new(cb)),
133 on_embed_tick: None,
134 on_embedding_batch: None,
135 }
136 }
137
138 #[must_use]
143 pub fn with_embed_tick(mut self, cb: impl Fn(&EmbedProgress) + Send + Sync + 'static) -> Self {
144 if let Self::Active {
145 ref mut on_embed_tick,
146 ..
147 } = self
148 {
149 *on_embed_tick = Some(Box::new(cb));
150 }
151 self
152 }
153
154 #[must_use]
156 pub fn with_embedding_batch(
157 mut self,
158 cb: impl Fn(&[Vec<f32>]) + Send + Sync + 'static,
159 ) -> Self {
160 if let Self::Active {
161 ref mut on_embedding_batch,
162 ..
163 } = self
164 {
165 *on_embedding_batch = Some(Box::new(cb));
166 }
167 self
168 }
169
170 pub fn embedding_batch(&self, embeddings: &[Vec<f32>]) {
172 if embeddings.is_empty() {
173 return;
174 }
175 if let Self::Active {
176 on_embedding_batch, ..
177 } = self
178 && let Some(cb) = on_embedding_batch
179 {
180 cb(embeddings);
181 }
182 }
183
/// Returns a disabled profiler ([`Profiler::Noop`]).
#[must_use]
pub fn noop() -> Self {
    Self::Noop
}
189
190 fn report(&self, msg: &str) {
192 if let Self::Active { on_progress, .. } = self {
193 if let Some(cb) = on_progress {
194 cb(msg);
195 } else {
196 eprintln!("{msg}");
197 }
198 }
199 }
200
201 pub fn header(&self, version: &str, model_repo: &str, threads: usize, cores: usize) {
203 if let Self::Active { .. } = self {
204 self.report(&format!(
205 "[profile] ripvec {version} | {cores}-core | rayon: {threads} threads | model: {model_repo}",
206 ));
207 }
208 }
209
210 #[must_use]
212 pub fn phase(&self, name: &'static str) -> PhaseGuard<'_> {
213 if let Self::Active { start, .. } = self {
214 self.report(&format!(
215 "[{:.3}s] {name}: starting",
216 start.elapsed().as_secs_f64(),
217 ));
218 }
219 PhaseGuard {
220 profiler: self,
221 name,
222 start: Instant::now(),
223 detail: Cell::new(None),
224 }
225 }
226
227 pub fn chunk_thread_report(&self, n: usize) {
229 if let Self::Active { chunk_counts, .. } = self
230 && let Ok(mut counts) = chunk_counts.lock()
231 {
232 let idx = rayon::current_thread_index().unwrap_or(0);
233 if counts.len() <= idx {
234 counts.resize(idx + 1, 0);
235 }
236 counts[idx] += n;
237 }
238 }
239
240 pub fn chunk_summary(&self, total_chunks: usize, total_files: usize, elapsed: Duration) {
242 if let Self::Active {
243 start,
244 chunk_counts,
245 ..
246 } = self
247 {
248 let wall = start.elapsed();
249 if let Ok(counts) = chunk_counts.lock() {
250 let active = counts.iter().filter(|&&c| c > 0).count();
251 let pool_size = rayon::current_num_threads();
252 if active > 0 {
253 let min = counts
254 .iter()
255 .filter(|&&c| c > 0)
256 .min()
257 .copied()
258 .unwrap_or(0);
259 let max = counts.iter().max().copied().unwrap_or(0);
260 self.report(&format!(
261 "[{:.1}s] chunk: {} chunks from {} files in {:.0?} ({} threads, {} active, skew: {}-{} chunks/thread)",
262 wall.as_secs_f64(),
263 total_chunks,
264 total_files,
265 elapsed,
266 pool_size,
267 active,
268 min,
269 max,
270 ));
271 } else {
272 self.report(&format!(
273 "[{:.1}s] chunk: {} chunks from {} files in {:.0?} ({} threads)",
274 wall.as_secs_f64(),
275 total_chunks,
276 total_files,
277 elapsed,
278 pool_size,
279 ));
280 }
281 }
282 }
283 }
284
285 pub fn embed_begin(&self, total: usize) {
287 if let Self::Active { embed, .. } = self
288 && let Ok(mut state) = embed.lock()
289 {
290 let now = Instant::now();
291 state.phase_start = now;
292 state.last_report = now;
293 state.chunks_at_last_report = 0;
294 state.total_lock_wait = Duration::ZERO;
295 state.total_inference = Duration::ZERO;
296 state.total_chunks = total;
297 }
298 }
299
300 pub fn embed_begin_update_total(&self, total: usize) {
306 if let Self::Active { embed, .. } = self
307 && let Ok(mut state) = embed.lock()
308 && total > state.total_chunks
309 {
310 state.total_chunks = total;
311 }
312 }
313
314 #[expect(
316 clippy::cast_precision_loss,
317 reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
318 )]
319 pub fn embed_tick(&self, done: usize) {
320 if let Self::Active {
321 start,
322 interval,
323 embed,
324 on_embed_tick,
325 ..
326 } = self
327 {
328 let Ok(mut state) = embed.lock() else {
329 return;
330 };
331 let now = Instant::now();
332 let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
333 let overall_rate = if overall_elapsed > 0.0 {
334 done as f64 / overall_elapsed
335 } else {
336 0.0
337 };
338 let total_timing = state.total_lock_wait + state.total_inference;
339 let lock_pct = if total_timing.as_nanos() > 0 {
340 state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64
341 } else {
342 0.0
343 };
344
345 if let Some(cb) = on_embed_tick {
348 let tick_elapsed = now.duration_since(state.last_tick).as_secs_f64();
349 let tick_chunks = done - state.chunks_at_last_tick;
350 let batch_rate = if tick_elapsed > 0.0 {
351 tick_chunks as f64 / tick_elapsed
352 } else {
353 overall_rate
354 };
355 state.last_tick = now;
356 state.chunks_at_last_tick = done;
357
358 cb(&EmbedProgress {
359 done,
360 total: state.total_chunks,
361 window_rate: batch_rate,
362 overall_rate,
363 lock_wait_pct: lock_pct,
364 inference_pct: 1.0 - lock_pct,
365 });
366 }
367
368 if now.duration_since(state.last_report) >= *interval {
370 let wall = start.elapsed();
371 let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
372 let report_chunks = done - state.chunks_at_last_report;
373 let window_rate = if report_elapsed > 0.0 {
374 report_chunks as f64 / report_elapsed
375 } else {
376 0.0
377 };
378 self.report(&format!(
379 "[{:.1}s] embed: {}/{} (last {:.0}s: {:.1}/s, overall: {:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
380 wall.as_secs_f64(),
381 done,
382 state.total_chunks,
383 report_elapsed,
384 window_rate,
385 overall_rate,
386 lock_pct * 100.0,
387 (1.0 - lock_pct) * 100.0,
388 ));
389 state.last_report = now;
390 state.chunks_at_last_report = done;
391 }
392 }
393 }
394
395 #[expect(
400 clippy::cast_precision_loss,
401 reason = "display-only: sub-1% precision loss acceptable for MB/rate"
402 )]
403 pub fn embed_tick_bytes(&self, done_chunks: usize, bytes_processed: u64, total_bytes: u64) {
404 if let Self::Active {
405 start,
406 interval,
407 embed,
408 ..
409 } = self
410 {
411 let Ok(mut state) = embed.lock() else {
412 return;
413 };
414 let now = Instant::now();
415 let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
416 let overall_rate = if overall_elapsed > 0.0 {
417 done_chunks as f64 / overall_elapsed
418 } else {
419 0.0
420 };
421
422 if now.duration_since(state.last_report) >= *interval {
423 let wall = start.elapsed();
424 let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
425 let report_chunks = done_chunks - state.chunks_at_last_report;
426 let window_rate = if report_elapsed > 0.0 {
427 report_chunks as f64 / report_elapsed
428 } else {
429 0.0
430 };
431 let mb_done = bytes_processed as f64 / (1024.0 * 1024.0);
432 let mb_total = total_bytes as f64 / (1024.0 * 1024.0);
433 self.report(&format!(
434 "[{:.1}s] embed: {:.1}/{:.1} MB (last {:.0}s: {:.1}/s, overall: {:.1}/s)",
435 wall.as_secs_f64(),
436 mb_done,
437 mb_total,
438 report_elapsed,
439 window_rate,
440 overall_rate,
441 ));
442 state.last_report = now;
443 state.chunks_at_last_report = done_chunks;
444 }
445 }
446 }
447
448 pub fn embed_lock_wait(&self, duration: Duration) {
450 if let Self::Active { embed, .. } = self
451 && let Ok(mut state) = embed.lock()
452 {
453 state.total_lock_wait += duration;
454 }
455 }
456
457 pub fn embed_inference(&self, duration: Duration) {
459 if let Self::Active { embed, .. } = self
460 && let Ok(mut state) = embed.lock()
461 {
462 state.total_inference += duration;
463 }
464 }
465
466 #[expect(
468 clippy::cast_precision_loss,
469 reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
470 )]
471 pub fn embed_done(&self) {
472 if let Self::Active { start, embed, .. } = self
473 && let Ok(state) = embed.lock()
474 {
475 let wall = start.elapsed();
476 if state.total_chunks == 0 {
477 self.report(&format!(
478 "[{:.1}s] embed: skipped (0 chunks)",
479 wall.as_secs_f64()
480 ));
481 return;
482 }
483 let elapsed = Instant::now().duration_since(state.phase_start);
484 let rate = if elapsed.as_secs_f64() > 0.0 {
485 state.total_chunks as f64 / elapsed.as_secs_f64()
486 } else {
487 0.0
488 };
489 let total_timing = state.total_lock_wait + state.total_inference;
490 let lock_pct = if total_timing.as_nanos() > 0 {
491 state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64 * 100.0
492 } else {
493 0.0
494 };
495 self.report(&format!(
496 "[{:.1}s] embed: {}/{} done in {:.1}s ({:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
497 wall.as_secs_f64(),
498 state.total_chunks,
499 state.total_chunks,
500 elapsed.as_secs_f64(),
501 rate,
502 lock_pct,
503 100.0 - lock_pct,
504 ));
505 }
506 }
507
508 pub fn finish(&self) {
510 if let Self::Active { start, .. } = self {
511 let elapsed = start.elapsed().as_secs_f64();
512 self.report(&format!("[{elapsed:.1}s] total: {elapsed:.1}s"));
513 }
514 }
515}
516
/// Guard returned by `Profiler::phase`; reports the phase duration when dropped.
pub struct PhaseGuard<'a> {
    /// Profiler that receives the completion line.
    profiler: &'a Profiler,
    /// Phase name used in the report line.
    name: &'static str,
    /// When the phase started (guard creation time).
    start: Instant,
    /// Optional detail text included in the completion line; interior
    /// mutability lets it be set through a shared reference.
    detail: Cell<Option<String>>,
}
524
525impl PhaseGuard<'_> {
526 pub fn set_detail(&self, detail: String) {
528 self.detail.set(Some(detail));
529 }
530}
531
532impl Drop for PhaseGuard<'_> {
533 fn drop(&mut self) {
534 if let Profiler::Active { start, .. } = self.profiler {
535 let elapsed = self.start.elapsed();
536 let wall = start.elapsed();
537 let msg = if let Some(detail) = self.detail.take() {
538 format!(
539 "[{:.3}s] {}: {} in {:.1?}",
540 wall.as_secs_f64(),
541 self.name,
542 detail,
543 elapsed,
544 )
545 } else {
546 format!(
547 "[{:.3}s] {}: {:.1?}",
548 wall.as_secs_f64(),
549 self.name,
550 elapsed,
551 )
552 };
553 self.profiler.report(&msg);
554 }
555 }
556}
557
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// The profiler is shared across worker threads, so it must be `Sync`.
    #[test]
    fn profiler_is_sync() {
        fn assert_sync<T: Sync>() {}
        assert_sync::<Profiler>();
    }

    /// Every public method must be callable on a disabled profiler.
    #[test]
    fn noop_profiler_all_methods_are_safe() {
        let p = Profiler::noop();
        p.header("0.1.0", "test/repo", 4, 8);

        let guard = p.phase("test_phase");
        guard.set_detail("some detail".to_string());
        drop(guard);

        p.chunk_thread_report(10);
        p.chunk_summary(100, 50, Duration::from_millis(89));
        p.embed_begin(100);
        p.embed_tick(1);
        p.embed_lock_wait(Duration::from_millis(1));
        p.embed_inference(Duration::from_millis(10));
        p.embed_done();
        p.finish();
    }

    /// Smoke test: a phase guard with a detail string drops without panicking.
    #[test]
    fn active_profiler_phase_guard_formats_correctly() {
        let p = Profiler::new(true, Duration::from_secs(10));
        let guard = p.phase("test_phase");
        std::thread::sleep(Duration::from_millis(5));
        guard.set_detail("42 items".to_string());
        drop(guard);
    }

    /// Smoke test: ticking well inside a long interval must not panic.
    #[test]
    fn embed_tick_respects_interval() {
        let p = Profiler::new(true, Duration::from_secs(100));
        p.embed_begin(10);
        (1..=10).for_each(|i| p.embed_tick(i));
        p.embed_done();
    }

    /// Smoke test: summarising with no per-thread counts takes the short form.
    #[test]
    fn chunk_summary_with_zero_files() {
        let p = Profiler::new(true, Duration::from_secs(10));
        p.chunk_summary(0, 0, Duration::from_millis(0));
    }

    /// Smoke test: a zero-chunk phase reports "skipped" without dividing by zero.
    #[test]
    fn embed_done_with_zero_chunks() {
        let p = Profiler::new(true, Duration::from_secs(10));
        p.embed_begin(0);
        p.embed_done();
    }

    /// The batch callback must see every embedding vector that was passed in.
    #[test]
    fn embedding_batch_callback_receives_embeddings() {
        use std::sync::{Arc, Mutex};

        let seen = Arc::new(Mutex::new(0usize));
        let counter = Arc::clone(&seen);
        let p = Profiler::with_callback(Duration::from_secs(10), |_| {})
            .with_embedding_batch(move |batch| {
                let mut total = counter.lock().unwrap();
                *total += batch.len();
            });

        p.embedding_batch(&[vec![1.0, 0.0], vec![0.0, 1.0]]);

        assert_eq!(*seen.lock().unwrap(), 2);
    }
}