1use std::cell::Cell;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10
/// Boxed sink for formatted report lines.
///
/// When one is installed on an active profiler, every report line is passed to
/// it instead of being printed to stderr.
type ProgressCallback = Box<dyn Fn(&str) + Send + Sync>;
20
/// Snapshot of embed-phase progress handed to the embed tick callback.
#[derive(Debug, Clone)]
pub struct EmbedProgress {
    /// Number of chunks completed so far, as passed to `Profiler::embed_tick`.
    pub done: usize,
    /// Total number of chunks expected for the embed phase.
    pub total: usize,
    /// Chunks per second since the previous tick callback. Falls back to
    /// `overall_rate` when no measurable time has elapsed since that tick.
    pub window_rate: f64,
    /// Chunks per second since the embed phase started (0.0 if no time has elapsed).
    pub overall_rate: f64,
    /// Share of the recorded lock-wait + inference time spent waiting on locks.
    /// Despite the name this is a fraction in `0.0..=1.0`, not a percentage;
    /// it is 0.0 when no timing has been recorded yet.
    pub lock_wait_pct: f64,
    /// Complement of `lock_wait_pct` (`1.0 - lock_wait_pct`), also a fraction.
    /// It is therefore 1.0 when no timing has been recorded yet.
    pub inference_pct: f64,
}
37
/// Boxed callback that receives an [`EmbedProgress`] snapshot on every
/// `Profiler::embed_tick` call of an active profiler.
type EmbedTickCallback = Box<dyn Fn(&EmbedProgress) + Send + Sync>;
40
/// Pipeline profiler that either collects timings and emits report lines
/// (`Active`) or ignores every call (`Noop`).
#[expect(
    clippy::large_enum_variant,
    reason = "Active is the common case; Noop is only for --profile=false"
)]
pub enum Profiler {
    /// Profiling is enabled: timings are tracked and report lines are emitted.
    #[expect(
        private_interfaces,
        reason = "EmbedState is intentionally opaque; callers cannot name it"
    )]
    Active {
        /// Instant the profiler was constructed; the `[N.Ns]` prefix of each
        /// report line is measured from here.
        start: Instant,
        /// Minimum time between two periodic embed progress report lines.
        interval: Duration,
        /// Mutable bookkeeping for the embed phase.
        embed: Mutex<EmbedState>,
        /// Chunk tallies indexed by rayon thread index.
        chunk_counts: Mutex<Vec<usize>>,
        /// Optional sink for report lines; stderr is used when `None`.
        on_progress: Option<ProgressCallback>,
        /// Optional callback invoked on every embed tick.
        on_embed_tick: Option<EmbedTickCallback>,
    },
    /// Profiling is disabled: reporting methods return without doing anything.
    Noop,
}
68
/// Internal bookkeeping for the embed phase, guarded by the profiler's mutex.
pub(crate) struct EmbedState {
    /// When the embed phase started; basis for the overall rate.
    phase_start: Instant,
    /// When the last periodic report line was emitted.
    last_report: Instant,
    /// Value of `done` at the last periodic report line.
    chunks_at_last_report: usize,
    /// When the embed tick callback was last invoked.
    last_tick: Instant,
    /// Value of `done` at the last embed tick callback invocation.
    chunks_at_last_tick: usize,
    /// Accumulated time reported through `Profiler::embed_lock_wait`.
    total_lock_wait: Duration,
    /// Accumulated time reported through `Profiler::embed_inference`.
    total_inference: Duration,
    /// Expected number of chunks for the phase.
    total_chunks: usize,
}
81
82impl EmbedState {
83 fn new() -> Self {
84 let now = Instant::now();
85 Self {
86 phase_start: now,
87 last_report: now,
88 chunks_at_last_report: 0,
89 last_tick: now,
90 chunks_at_last_tick: 0,
91 total_lock_wait: Duration::ZERO,
92 total_inference: Duration::ZERO,
93 total_chunks: 0,
94 }
95 }
96}
97
98impl Profiler {
99 #[must_use]
101 pub fn new(enabled: bool, interval: Duration) -> Self {
102 if enabled {
103 Self::Active {
104 start: Instant::now(),
105 interval,
106 embed: Mutex::new(EmbedState::new()),
107 chunk_counts: Mutex::new(Vec::new()),
108 on_progress: None,
109 on_embed_tick: None,
110 }
111 } else {
112 Self::Noop
113 }
114 }
115
116 #[must_use]
120 pub fn with_callback(interval: Duration, cb: impl Fn(&str) + Send + Sync + 'static) -> Self {
121 Self::Active {
122 start: Instant::now(),
123 interval,
124 embed: Mutex::new(EmbedState::new()),
125 chunk_counts: Mutex::new(Vec::new()),
126 on_progress: Some(Box::new(cb)),
127 on_embed_tick: None,
128 }
129 }
130
131 #[must_use]
136 pub fn with_embed_tick(mut self, cb: impl Fn(&EmbedProgress) + Send + Sync + 'static) -> Self {
137 if let Self::Active {
138 ref mut on_embed_tick,
139 ..
140 } = self
141 {
142 *on_embed_tick = Some(Box::new(cb));
143 }
144 self
145 }
146
/// Creates a disabled profiler ([`Profiler::Noop`]).
#[must_use]
pub fn noop() -> Self {
    Self::Noop
}
152
153 fn report(&self, msg: &str) {
155 if let Self::Active { on_progress, .. } = self {
156 if let Some(cb) = on_progress {
157 cb(msg);
158 } else {
159 eprintln!("{msg}");
160 }
161 }
162 }
163
164 pub fn header(&self, version: &str, model_repo: &str, threads: usize, cores: usize) {
166 if let Self::Active { .. } = self {
167 self.report(&format!(
168 "[profile] ripvec {version} | {cores}-core | rayon: {threads} threads | model: {model_repo}",
169 ));
170 }
171 }
172
173 #[must_use]
175 pub fn phase(&self, name: &'static str) -> PhaseGuard<'_> {
176 PhaseGuard {
177 profiler: self,
178 name,
179 start: Instant::now(),
180 detail: Cell::new(None),
181 }
182 }
183
184 pub fn chunk_thread_report(&self, n: usize) {
186 if let Self::Active { chunk_counts, .. } = self
187 && let Ok(mut counts) = chunk_counts.lock()
188 {
189 let idx = rayon::current_thread_index().unwrap_or(0);
190 if counts.len() <= idx {
191 counts.resize(idx + 1, 0);
192 }
193 counts[idx] += n;
194 }
195 }
196
197 pub fn chunk_summary(&self, total_chunks: usize, total_files: usize, elapsed: Duration) {
199 if let Self::Active {
200 start,
201 chunk_counts,
202 ..
203 } = self
204 {
205 let wall = start.elapsed();
206 if let Ok(counts) = chunk_counts.lock() {
207 let active = counts.iter().filter(|&&c| c > 0).count();
208 let pool_size = rayon::current_num_threads();
209 if active > 0 {
210 let min = counts
211 .iter()
212 .filter(|&&c| c > 0)
213 .min()
214 .copied()
215 .unwrap_or(0);
216 let max = counts.iter().max().copied().unwrap_or(0);
217 self.report(&format!(
218 "[{:.1}s] chunk: {} chunks from {} files in {:.0?} ({} threads, {} active, skew: {}-{} chunks/thread)",
219 wall.as_secs_f64(),
220 total_chunks,
221 total_files,
222 elapsed,
223 pool_size,
224 active,
225 min,
226 max,
227 ));
228 } else {
229 self.report(&format!(
230 "[{:.1}s] chunk: {} chunks from {} files in {:.0?} ({} threads)",
231 wall.as_secs_f64(),
232 total_chunks,
233 total_files,
234 elapsed,
235 pool_size,
236 ));
237 }
238 }
239 }
240 }
241
242 pub fn embed_begin(&self, total: usize) {
244 if let Self::Active { embed, .. } = self
245 && let Ok(mut state) = embed.lock()
246 {
247 let now = Instant::now();
248 state.phase_start = now;
249 state.last_report = now;
250 state.chunks_at_last_report = 0;
251 state.total_lock_wait = Duration::ZERO;
252 state.total_inference = Duration::ZERO;
253 state.total_chunks = total;
254 }
255 }
256
257 pub fn embed_begin_update_total(&self, total: usize) {
263 if let Self::Active { embed, .. } = self
264 && let Ok(mut state) = embed.lock()
265 && total > state.total_chunks
266 {
267 state.total_chunks = total;
268 }
269 }
270
271 #[expect(
273 clippy::cast_precision_loss,
274 reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
275 )]
276 pub fn embed_tick(&self, done: usize) {
277 if let Self::Active {
278 start,
279 interval,
280 embed,
281 on_embed_tick,
282 ..
283 } = self
284 {
285 let Ok(mut state) = embed.lock() else {
286 return;
287 };
288 let now = Instant::now();
289 let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
290 let overall_rate = if overall_elapsed > 0.0 {
291 done as f64 / overall_elapsed
292 } else {
293 0.0
294 };
295 let total_timing = state.total_lock_wait + state.total_inference;
296 let lock_pct = if total_timing.as_nanos() > 0 {
297 state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64
298 } else {
299 0.0
300 };
301
302 if let Some(cb) = on_embed_tick {
305 let tick_elapsed = now.duration_since(state.last_tick).as_secs_f64();
306 let tick_chunks = done - state.chunks_at_last_tick;
307 let batch_rate = if tick_elapsed > 0.0 {
308 tick_chunks as f64 / tick_elapsed
309 } else {
310 overall_rate
311 };
312 state.last_tick = now;
313 state.chunks_at_last_tick = done;
314
315 cb(&EmbedProgress {
316 done,
317 total: state.total_chunks,
318 window_rate: batch_rate,
319 overall_rate,
320 lock_wait_pct: lock_pct,
321 inference_pct: 1.0 - lock_pct,
322 });
323 }
324
325 if now.duration_since(state.last_report) >= *interval {
327 let wall = start.elapsed();
328 let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
329 let report_chunks = done - state.chunks_at_last_report;
330 let window_rate = if report_elapsed > 0.0 {
331 report_chunks as f64 / report_elapsed
332 } else {
333 0.0
334 };
335 self.report(&format!(
336 "[{:.1}s] embed: {}/{} (last {:.0}s: {:.1}/s, overall: {:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
337 wall.as_secs_f64(),
338 done,
339 state.total_chunks,
340 report_elapsed,
341 window_rate,
342 overall_rate,
343 lock_pct * 100.0,
344 (1.0 - lock_pct) * 100.0,
345 ));
346 state.last_report = now;
347 state.chunks_at_last_report = done;
348 }
349 }
350 }
351
352 #[expect(
357 clippy::cast_precision_loss,
358 reason = "display-only: sub-1% precision loss acceptable for MB/rate"
359 )]
360 pub fn embed_tick_bytes(
361 &self,
362 done_chunks: usize,
363 bytes_processed: u64,
364 total_bytes: u64,
365 ) {
366 if let Self::Active {
367 start,
368 interval,
369 embed,
370 ..
371 } = self
372 {
373 let Ok(mut state) = embed.lock() else {
374 return;
375 };
376 let now = Instant::now();
377 let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
378 let overall_rate = if overall_elapsed > 0.0 {
379 done_chunks as f64 / overall_elapsed
380 } else {
381 0.0
382 };
383
384 if now.duration_since(state.last_report) >= *interval {
385 let wall = start.elapsed();
386 let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
387 let report_chunks = done_chunks - state.chunks_at_last_report;
388 let window_rate = if report_elapsed > 0.0 {
389 report_chunks as f64 / report_elapsed
390 } else {
391 0.0
392 };
393 let mb_done = bytes_processed as f64 / (1024.0 * 1024.0);
394 let mb_total = total_bytes as f64 / (1024.0 * 1024.0);
395 self.report(&format!(
396 "[{:.1}s] embed: {:.1}/{:.1} MB (last {:.0}s: {:.1}/s, overall: {:.1}/s)",
397 wall.as_secs_f64(),
398 mb_done,
399 mb_total,
400 report_elapsed,
401 window_rate,
402 overall_rate,
403 ));
404 state.last_report = now;
405 state.chunks_at_last_report = done_chunks;
406 }
407 }
408 }
409
410 pub fn embed_lock_wait(&self, duration: Duration) {
412 if let Self::Active { embed, .. } = self
413 && let Ok(mut state) = embed.lock()
414 {
415 state.total_lock_wait += duration;
416 }
417 }
418
419 pub fn embed_inference(&self, duration: Duration) {
421 if let Self::Active { embed, .. } = self
422 && let Ok(mut state) = embed.lock()
423 {
424 state.total_inference += duration;
425 }
426 }
427
428 #[expect(
430 clippy::cast_precision_loss,
431 reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
432 )]
433 pub fn embed_done(&self) {
434 if let Self::Active { start, embed, .. } = self
435 && let Ok(state) = embed.lock()
436 {
437 let wall = start.elapsed();
438 if state.total_chunks == 0 {
439 self.report(&format!(
440 "[{:.1}s] embed: skipped (0 chunks)",
441 wall.as_secs_f64()
442 ));
443 return;
444 }
445 let elapsed = Instant::now().duration_since(state.phase_start);
446 let rate = if elapsed.as_secs_f64() > 0.0 {
447 state.total_chunks as f64 / elapsed.as_secs_f64()
448 } else {
449 0.0
450 };
451 let total_timing = state.total_lock_wait + state.total_inference;
452 let lock_pct = if total_timing.as_nanos() > 0 {
453 state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64 * 100.0
454 } else {
455 0.0
456 };
457 self.report(&format!(
458 "[{:.1}s] embed: {}/{} done in {:.1}s ({:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
459 wall.as_secs_f64(),
460 state.total_chunks,
461 state.total_chunks,
462 elapsed.as_secs_f64(),
463 rate,
464 lock_pct,
465 100.0 - lock_pct,
466 ));
467 }
468 }
469
470 pub fn finish(&self) {
472 if let Self::Active { start, .. } = self {
473 let elapsed = start.elapsed().as_secs_f64();
474 self.report(&format!("[{elapsed:.1}s] total: {elapsed:.1}s"));
475 }
476 }
477}
478
/// Guard returned by `Profiler::phase`; reports the phase duration through
/// the profiler when dropped.
pub struct PhaseGuard<'a> {
    /// Profiler that receives the report line on drop.
    profiler: &'a Profiler,
    /// Phase name shown in the report line.
    name: &'static str,
    /// When the phase started.
    start: Instant,
    /// Optional detail text for the report line; a `Cell` so it can be set
    /// through a shared reference.
    detail: Cell<Option<String>>,
}
486
487impl PhaseGuard<'_> {
488 pub fn set_detail(&self, detail: String) {
490 self.detail.set(Some(detail));
491 }
492}
493
494impl Drop for PhaseGuard<'_> {
495 fn drop(&mut self) {
496 if let Profiler::Active { start, .. } = self.profiler {
497 let elapsed = self.start.elapsed();
498 let wall = start.elapsed();
499 let msg = if let Some(detail) = self.detail.take() {
500 format!(
501 "[{:.3}s] {}: {} in {:.1?}",
502 wall.as_secs_f64(),
503 self.name,
504 detail,
505 elapsed,
506 )
507 } else {
508 format!(
509 "[{:.3}s] {}: {:.1?}",
510 wall.as_secs_f64(),
511 self.name,
512 elapsed,
513 )
514 };
515 self.profiler.report(&msg);
516 }
517 }
518}
519
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Shared in-memory buffer that collects every report line.
    type Captured = Arc<Mutex<Vec<String>>>;

    /// Builds an active profiler whose report lines are captured in memory
    /// rather than printed to stderr, so tests can assert on them.
    fn capturing_profiler(interval: Duration) -> (Profiler, Captured) {
        let lines: Captured = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&lines);
        let profiler = Profiler::with_callback(interval, move |msg: &str| {
            sink.lock()
                .expect("capture buffer poisoned")
                .push(msg.to_owned());
        });
        (profiler, lines)
    }

    /// Returns a copy of the lines captured so far.
    fn captured(lines: &Captured) -> Vec<String> {
        lines.lock().expect("capture buffer poisoned").clone()
    }

    #[test]
    fn profiler_is_sync() {
        fn assert_sync<T: Sync>() {}
        assert_sync::<Profiler>();
    }

    #[test]
    fn noop_profiler_all_methods_are_safe() {
        let p = Profiler::noop();
        p.header("0.1.0", "test/repo", 4, 8);
        {
            let g = p.phase("test_phase");
            g.set_detail("some detail".to_string());
        }
        p.chunk_thread_report(10);
        p.chunk_summary(100, 50, Duration::from_millis(89));
        p.embed_begin(100);
        p.embed_begin_update_total(200);
        p.embed_tick(1);
        p.embed_tick_bytes(1, 1024, 4096);
        p.embed_lock_wait(Duration::from_millis(1));
        p.embed_inference(Duration::from_millis(10));
        p.embed_done();
        p.finish();
    }

    #[test]
    fn active_profiler_phase_guard_formats_correctly() {
        let (p, lines) = capturing_profiler(Duration::from_secs(10));
        {
            let g = p.phase("test_phase");
            g.set_detail("42 items".to_string());
        }
        {
            let _g = p.phase("bare_phase");
        }
        let out = captured(&lines);
        assert_eq!(out.len(), 2, "one line per dropped guard: {out:?}");
        assert!(out[0].starts_with('['), "{}", out[0]);
        assert!(out[0].contains("] test_phase: 42 items in "), "{}", out[0]);
        assert!(out[1].contains("] bare_phase: "), "{}", out[1]);
        assert!(!out[1].contains(" in "), "{}", out[1]);
    }

    #[test]
    fn embed_tick_respects_interval() {
        let (p, lines) = capturing_profiler(Duration::from_secs(100));
        p.embed_begin(10);
        for i in 1..=10 {
            p.embed_tick(i);
        }
        // The interval is far longer than the test, so no periodic line yet.
        assert!(captured(&lines).is_empty());

        p.embed_done();
        let out = captured(&lines);
        assert_eq!(out.len(), 1, "only the final summary expected: {out:?}");
        assert!(out[0].contains("embed: 10/10 done in "), "{}", out[0]);
    }

    #[test]
    fn embed_tick_reports_once_interval_elapsed() {
        // A zero interval makes every tick eligible for a periodic line.
        let (p, lines) = capturing_profiler(Duration::ZERO);
        p.embed_begin(4);
        p.embed_tick(2);
        let out = captured(&lines);
        assert_eq!(out.len(), 1, "{out:?}");
        assert!(out[0].contains("embed: 2/4 (last "), "{}", out[0]);
        assert!(
            out[0].ends_with("lock_wait: 0% inference: 100%"),
            "{}",
            out[0]
        );
    }

    #[test]
    fn embed_tick_callback_receives_progress() {
        let seen = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&seen);
        let p = Profiler::with_callback(Duration::from_secs(100), |_: &str| {}).with_embed_tick(
            move |progress: &EmbedProgress| {
                sink.lock()
                    .expect("progress buffer poisoned")
                    .push(progress.clone());
            },
        );
        p.embed_begin(8);
        p.embed_lock_wait(Duration::from_millis(1));
        p.embed_inference(Duration::from_millis(3));
        p.embed_tick(2);
        p.embed_tick(5);

        let snapshots = seen.lock().expect("progress buffer poisoned");
        assert_eq!(snapshots.len(), 2);
        assert_eq!((snapshots[0].done, snapshots[0].total), (2, 8));
        assert_eq!((snapshots[1].done, snapshots[1].total), (5, 8));
        // 1 ms of lock wait out of 4 ms recorded in total.
        assert!((snapshots[1].lock_wait_pct - 0.25).abs() < 1e-9);
        assert!((snapshots[1].inference_pct - 0.75).abs() < 1e-9);
    }

    #[test]
    fn chunk_summary_with_zero_files() {
        let (p, lines) = capturing_profiler(Duration::from_secs(10));
        p.chunk_summary(0, 0, Duration::from_millis(0));
        let out = captured(&lines);
        assert_eq!(out.len(), 1, "{out:?}");
        assert!(
            out[0].contains("chunk: 0 chunks from 0 files in "),
            "{}",
            out[0]
        );
        // No thread reported chunks, so the skew section must be absent.
        assert!(!out[0].contains("skew"), "{}", out[0]);
    }

    #[test]
    fn embed_done_with_zero_chunks() {
        let (p, lines) = capturing_profiler(Duration::from_secs(10));
        p.embed_begin(0);
        p.embed_done();
        let out = captured(&lines);
        assert_eq!(out.len(), 1, "{out:?}");
        assert!(out[0].ends_with("embed: skipped (0 chunks)"), "{}", out[0]);
    }
}