1use std::cell::Cell;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10
/// Sink for human-readable report lines; replaces the default stderr output when installed.
type ProgressCallback = Box<dyn Fn(&str) + Sync + Send>;
20
/// Structured snapshot of embedding progress, as delivered to an `on_embed_tick` callback.
///
/// NOTE: despite their `_pct` suffix, `lock_wait_pct` and `inference_pct` are *fractions* in
/// `0.0..=1.0` (multiply by 100 for a percentage), as populated by `Profiler::embed_tick`.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbedProgress {
    /// Chunks embedded so far in the current embedding phase.
    pub done: usize,
    /// Expected number of chunks in the phase (may grow via `embed_begin_update_total`).
    pub total: usize,
    /// Chunks per second since the previous tick; falls back to `overall_rate` when no time
    /// has elapsed since that tick.
    pub window_rate: f64,
    /// Chunks per second since the embedding phase began.
    pub overall_rate: f64,
    /// Fraction (`0.0..=1.0`) of the recorded worker time (lock wait + inference) that was
    /// spent waiting on the lock; `0.0` when no timings have been recorded yet.
    pub lock_wait_pct: f64,
    /// Fraction (`0.0..=1.0`) of the recorded worker time spent in inference; always
    /// `1.0 - lock_wait_pct`, so it reads `1.0` before any timings are recorded.
    pub inference_pct: f64,
}
37
38type EmbedTickCallback = Box<dyn Fn(&EmbedProgress) + Send + Sync>;
40
41#[expect(
42 clippy::large_enum_variant,
43 reason = "Active is the common case; Noop is only for --profile=false"
44)]
45pub enum Profiler {
46 #[expect(
48 private_interfaces,
49 reason = "EmbedState is intentionally opaque; callers cannot name it"
50 )]
51 Active {
52 start: Instant,
54 interval: Duration,
56 embed: Mutex<EmbedState>,
58 chunk_counts: Mutex<Vec<usize>>,
60 on_progress: Option<ProgressCallback>,
62 on_embed_tick: Option<EmbedTickCallback>,
64 },
65 Noop,
67}
68
/// Mutable bookkeeping for the embedding phase, kept behind the profiler's `embed` mutex.
pub(crate) struct EmbedState {
    /// When the current embedding phase began.
    phase_start: Instant,
    /// When the most recent periodic report line was emitted.
    last_report: Instant,
    /// The `done` count at the most recent periodic report line.
    chunks_at_last_report: usize,
    /// When the `on_embed_tick` callback last fired.
    last_tick: Instant,
    /// The `done` count passed at the last `on_embed_tick` callback.
    chunks_at_last_tick: usize,
    /// Sum of all durations passed to `embed_lock_wait`.
    total_lock_wait: Duration,
    /// Sum of all durations passed to `embed_inference`.
    total_inference: Duration,
    /// Expected number of chunks in the phase.
    total_chunks: usize,
}
81
82impl EmbedState {
83 fn new() -> Self {
84 let now = Instant::now();
85 Self {
86 phase_start: now,
87 last_report: now,
88 chunks_at_last_report: 0,
89 last_tick: now,
90 chunks_at_last_tick: 0,
91 total_lock_wait: Duration::ZERO,
92 total_inference: Duration::ZERO,
93 total_chunks: 0,
94 }
95 }
96}
97
98impl Profiler {
99 #[must_use]
101 pub fn new(enabled: bool, interval: Duration) -> Self {
102 if enabled {
103 Self::Active {
104 start: Instant::now(),
105 interval,
106 embed: Mutex::new(EmbedState::new()),
107 chunk_counts: Mutex::new(Vec::new()),
108 on_progress: None,
109 on_embed_tick: None,
110 }
111 } else {
112 Self::Noop
113 }
114 }
115
116 #[must_use]
120 pub fn with_callback(interval: Duration, cb: impl Fn(&str) + Send + Sync + 'static) -> Self {
121 Self::Active {
122 start: Instant::now(),
123 interval,
124 embed: Mutex::new(EmbedState::new()),
125 chunk_counts: Mutex::new(Vec::new()),
126 on_progress: Some(Box::new(cb)),
127 on_embed_tick: None,
128 }
129 }
130
131 #[must_use]
136 pub fn with_embed_tick(mut self, cb: impl Fn(&EmbedProgress) + Send + Sync + 'static) -> Self {
137 if let Self::Active {
138 ref mut on_embed_tick,
139 ..
140 } = self
141 {
142 *on_embed_tick = Some(Box::new(cb));
143 }
144 self
145 }
146
147 #[must_use]
149 pub fn noop() -> Self {
150 Self::Noop
151 }
152
153 fn report(&self, msg: &str) {
155 if let Self::Active { on_progress, .. } = self {
156 if let Some(cb) = on_progress {
157 cb(msg);
158 } else {
159 eprintln!("{msg}");
160 }
161 }
162 }
163
164 pub fn header(&self, version: &str, model_repo: &str, threads: usize, cores: usize) {
166 if let Self::Active { .. } = self {
167 self.report(&format!(
168 "[profile] ripvec {version} | {cores}-core | rayon: {threads} threads | model: {model_repo}",
169 ));
170 }
171 }
172
173 #[must_use]
175 pub fn phase(&self, name: &'static str) -> PhaseGuard<'_> {
176 PhaseGuard {
177 profiler: self,
178 name,
179 start: Instant::now(),
180 detail: Cell::new(None),
181 }
182 }
183
184 pub fn chunk_thread_report(&self, n: usize) {
186 if let Self::Active { chunk_counts, .. } = self
187 && let Ok(mut counts) = chunk_counts.lock()
188 {
189 let idx = rayon::current_thread_index().unwrap_or(0);
190 if counts.len() <= idx {
191 counts.resize(idx + 1, 0);
192 }
193 counts[idx] += n;
194 }
195 }
196
197 pub fn chunk_summary(&self, total_chunks: usize, total_files: usize, elapsed: Duration) {
199 if let Self::Active {
200 start,
201 chunk_counts,
202 ..
203 } = self
204 {
205 let wall = start.elapsed();
206 if let Ok(counts) = chunk_counts.lock() {
207 let active = counts.iter().filter(|&&c| c > 0).count();
208 let pool_size = rayon::current_num_threads();
209 if active > 0 {
210 let min = counts
211 .iter()
212 .filter(|&&c| c > 0)
213 .min()
214 .copied()
215 .unwrap_or(0);
216 let max = counts.iter().max().copied().unwrap_or(0);
217 self.report(&format!(
218 "[{:.1}s] chunk: {} chunks from {} files in {:.0?} ({} threads, {} active, skew: {}-{} chunks/thread)",
219 wall.as_secs_f64(),
220 total_chunks,
221 total_files,
222 elapsed,
223 pool_size,
224 active,
225 min,
226 max,
227 ));
228 } else {
229 self.report(&format!(
230 "[{:.1}s] chunk: {} chunks from {} files in {:.0?} ({} threads)",
231 wall.as_secs_f64(),
232 total_chunks,
233 total_files,
234 elapsed,
235 pool_size,
236 ));
237 }
238 }
239 }
240 }
241
242 pub fn embed_begin(&self, total: usize) {
244 if let Self::Active { embed, .. } = self
245 && let Ok(mut state) = embed.lock()
246 {
247 let now = Instant::now();
248 state.phase_start = now;
249 state.last_report = now;
250 state.chunks_at_last_report = 0;
251 state.total_lock_wait = Duration::ZERO;
252 state.total_inference = Duration::ZERO;
253 state.total_chunks = total;
254 }
255 }
256
257 pub fn embed_begin_update_total(&self, total: usize) {
263 if let Self::Active { embed, .. } = self
264 && let Ok(mut state) = embed.lock()
265 && total > state.total_chunks
266 {
267 state.total_chunks = total;
268 }
269 }
270
271 #[expect(
273 clippy::cast_precision_loss,
274 reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
275 )]
276 pub fn embed_tick(&self, done: usize) {
277 if let Self::Active {
278 start,
279 interval,
280 embed,
281 on_embed_tick,
282 ..
283 } = self
284 {
285 let Ok(mut state) = embed.lock() else {
286 return;
287 };
288 let now = Instant::now();
289 let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
290 let overall_rate = if overall_elapsed > 0.0 {
291 done as f64 / overall_elapsed
292 } else {
293 0.0
294 };
295 let total_timing = state.total_lock_wait + state.total_inference;
296 let lock_pct = if total_timing.as_nanos() > 0 {
297 state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64
298 } else {
299 0.0
300 };
301
302 if let Some(cb) = on_embed_tick {
305 let tick_elapsed = now.duration_since(state.last_tick).as_secs_f64();
306 let tick_chunks = done - state.chunks_at_last_tick;
307 let batch_rate = if tick_elapsed > 0.0 {
308 tick_chunks as f64 / tick_elapsed
309 } else {
310 overall_rate
311 };
312 state.last_tick = now;
313 state.chunks_at_last_tick = done;
314
315 cb(&EmbedProgress {
316 done,
317 total: state.total_chunks,
318 window_rate: batch_rate,
319 overall_rate,
320 lock_wait_pct: lock_pct,
321 inference_pct: 1.0 - lock_pct,
322 });
323 }
324
325 if now.duration_since(state.last_report) >= *interval {
327 let wall = start.elapsed();
328 let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
329 let report_chunks = done - state.chunks_at_last_report;
330 let window_rate = if report_elapsed > 0.0 {
331 report_chunks as f64 / report_elapsed
332 } else {
333 0.0
334 };
335 self.report(&format!(
336 "[{:.1}s] embed: {}/{} (last {:.0}s: {:.1}/s, overall: {:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
337 wall.as_secs_f64(),
338 done,
339 state.total_chunks,
340 report_elapsed,
341 window_rate,
342 overall_rate,
343 lock_pct * 100.0,
344 (1.0 - lock_pct) * 100.0,
345 ));
346 state.last_report = now;
347 state.chunks_at_last_report = done;
348 }
349 }
350 }
351
352 #[expect(
357 clippy::cast_precision_loss,
358 reason = "display-only: sub-1% precision loss acceptable for MB/rate"
359 )]
360 pub fn embed_tick_bytes(&self, done_chunks: usize, bytes_processed: u64, total_bytes: u64) {
361 if let Self::Active {
362 start,
363 interval,
364 embed,
365 ..
366 } = self
367 {
368 let Ok(mut state) = embed.lock() else {
369 return;
370 };
371 let now = Instant::now();
372 let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
373 let overall_rate = if overall_elapsed > 0.0 {
374 done_chunks as f64 / overall_elapsed
375 } else {
376 0.0
377 };
378
379 if now.duration_since(state.last_report) >= *interval {
380 let wall = start.elapsed();
381 let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
382 let report_chunks = done_chunks - state.chunks_at_last_report;
383 let window_rate = if report_elapsed > 0.0 {
384 report_chunks as f64 / report_elapsed
385 } else {
386 0.0
387 };
388 let mb_done = bytes_processed as f64 / (1024.0 * 1024.0);
389 let mb_total = total_bytes as f64 / (1024.0 * 1024.0);
390 self.report(&format!(
391 "[{:.1}s] embed: {:.1}/{:.1} MB (last {:.0}s: {:.1}/s, overall: {:.1}/s)",
392 wall.as_secs_f64(),
393 mb_done,
394 mb_total,
395 report_elapsed,
396 window_rate,
397 overall_rate,
398 ));
399 state.last_report = now;
400 state.chunks_at_last_report = done_chunks;
401 }
402 }
403 }
404
405 pub fn embed_lock_wait(&self, duration: Duration) {
407 if let Self::Active { embed, .. } = self
408 && let Ok(mut state) = embed.lock()
409 {
410 state.total_lock_wait += duration;
411 }
412 }
413
414 pub fn embed_inference(&self, duration: Duration) {
416 if let Self::Active { embed, .. } = self
417 && let Ok(mut state) = embed.lock()
418 {
419 state.total_inference += duration;
420 }
421 }
422
423 #[expect(
425 clippy::cast_precision_loss,
426 reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
427 )]
428 pub fn embed_done(&self) {
429 if let Self::Active { start, embed, .. } = self
430 && let Ok(state) = embed.lock()
431 {
432 let wall = start.elapsed();
433 if state.total_chunks == 0 {
434 self.report(&format!(
435 "[{:.1}s] embed: skipped (0 chunks)",
436 wall.as_secs_f64()
437 ));
438 return;
439 }
440 let elapsed = Instant::now().duration_since(state.phase_start);
441 let rate = if elapsed.as_secs_f64() > 0.0 {
442 state.total_chunks as f64 / elapsed.as_secs_f64()
443 } else {
444 0.0
445 };
446 let total_timing = state.total_lock_wait + state.total_inference;
447 let lock_pct = if total_timing.as_nanos() > 0 {
448 state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64 * 100.0
449 } else {
450 0.0
451 };
452 self.report(&format!(
453 "[{:.1}s] embed: {}/{} done in {:.1}s ({:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
454 wall.as_secs_f64(),
455 state.total_chunks,
456 state.total_chunks,
457 elapsed.as_secs_f64(),
458 rate,
459 lock_pct,
460 100.0 - lock_pct,
461 ));
462 }
463 }
464
465 pub fn finish(&self) {
467 if let Self::Active { start, .. } = self {
468 let elapsed = start.elapsed().as_secs_f64();
469 self.report(&format!("[{elapsed:.1}s] total: {elapsed:.1}s"));
470 }
471 }
472}
473
474pub struct PhaseGuard<'a> {
476 profiler: &'a Profiler,
477 name: &'static str,
478 start: Instant,
479 detail: Cell<Option<String>>,
480}
481
482impl PhaseGuard<'_> {
483 pub fn set_detail(&self, detail: String) {
485 self.detail.set(Some(detail));
486 }
487}
488
489impl Drop for PhaseGuard<'_> {
490 fn drop(&mut self) {
491 if let Profiler::Active { start, .. } = self.profiler {
492 let elapsed = self.start.elapsed();
493 let wall = start.elapsed();
494 let msg = if let Some(detail) = self.detail.take() {
495 format!(
496 "[{:.3}s] {}: {} in {:.1?}",
497 wall.as_secs_f64(),
498 self.name,
499 detail,
500 elapsed,
501 )
502 } else {
503 format!(
504 "[{:.3}s] {}: {:.1?}",
505 wall.as_secs_f64(),
506 self.name,
507 elapsed,
508 )
509 };
510 self.profiler.report(&msg);
511 }
512 }
513}
514
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Shared buffer that collects report lines emitted by a capturing profiler.
    type Captured = Arc<Mutex<Vec<String>>>;

    /// Builds an active profiler whose report lines are captured instead of printed.
    fn capturing(interval: Duration) -> (Profiler, Captured) {
        let lines: Captured = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&lines);
        let profiler = Profiler::with_callback(interval, move |msg: &str| {
            sink.lock().expect("capture lock poisoned").push(msg.to_owned());
        });
        (profiler, lines)
    }

    /// Snapshot of every line captured so far.
    fn lines_of(lines: &Captured) -> Vec<String> {
        lines.lock().expect("capture lock poisoned").clone()
    }

    #[test]
    fn profiler_is_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<Profiler>();
    }

    #[test]
    fn noop_profiler_all_methods_are_safe() {
        let p = Profiler::noop()
            .with_embed_tick(|_: &EmbedProgress| panic!("noop profiler must not invoke callbacks"));
        p.header("0.1.0", "test/repo", 4, 8);
        {
            let g = p.phase("test_phase");
            g.set_detail("some detail".to_string());
        }
        p.chunk_thread_report(10);
        p.chunk_summary(100, 50, Duration::from_millis(89));
        p.embed_begin(100);
        p.embed_begin_update_total(200);
        p.embed_tick(1);
        p.embed_tick_bytes(1, 10, 100);
        p.embed_lock_wait(Duration::from_millis(1));
        p.embed_inference(Duration::from_millis(10));
        p.embed_done();
        p.finish();
    }

    #[test]
    fn header_reports_run_configuration() {
        let (p, lines) = capturing(Duration::from_secs(10));
        p.header("0.1.0", "test/repo", 4, 8);
        assert_eq!(
            lines_of(&lines),
            ["[profile] ripvec 0.1.0 | 8-core | rayon: 4 threads | model: test/repo"]
        );
    }

    #[test]
    fn active_profiler_phase_guard_formats_correctly() {
        let (p, lines) = capturing(Duration::from_secs(10));
        {
            let g = p.phase("test_phase");
            g.set_detail("42 items".to_string());
        }
        {
            let _g = p.phase("bare_phase");
        }
        let captured = lines_of(&lines);
        assert_eq!(captured.len(), 2, "{captured:?}");
        assert!(captured[0].starts_with('['), "{}", captured[0]);
        assert!(
            captured[0].contains("s] test_phase: 42 items in "),
            "{}",
            captured[0]
        );
        assert!(captured[1].contains("s] bare_phase: "), "{}", captured[1]);
        assert!(!captured[1].contains(" in "), "{}", captured[1]);
    }

    #[test]
    fn embed_tick_respects_interval() {
        let (p, lines) = capturing(Duration::from_secs(100));
        p.embed_begin(10);
        for i in 1..=10 {
            p.embed_tick(i);
        }
        p.embed_done();
        let captured = lines_of(&lines);
        // The 100s interval never elapses, so only the final summary is reported.
        assert_eq!(captured.len(), 1, "{captured:?}");
        assert!(captured[0].contains("embed: 10/10 done in "), "{}", captured[0]);
    }

    #[test]
    fn embed_tick_reports_every_call_with_zero_interval() {
        let (p, lines) = capturing(Duration::ZERO);
        p.embed_begin(3);
        for i in 1..=3 {
            p.embed_tick(i);
        }
        let captured = lines_of(&lines);
        assert_eq!(captured.len(), 3, "{captured:?}");
        assert!(captured[2].contains("embed: 3/3 (last "), "{}", captured[2]);
    }

    #[test]
    fn embed_tick_callback_sees_every_tick() {
        let seen: Arc<Mutex<Vec<EmbedProgress>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&seen);
        let p = Profiler::new(true, Duration::from_secs(100)).with_embed_tick(
            move |progress: &EmbedProgress| {
                sink.lock().expect("tick lock poisoned").push(progress.clone());
            },
        );
        p.embed_begin(4);
        p.embed_lock_wait(Duration::from_millis(1));
        p.embed_inference(Duration::from_millis(3));
        for i in 1..=4 {
            p.embed_tick(i);
        }
        let ticks = seen.lock().expect("tick lock poisoned");
        let done: Vec<usize> = ticks.iter().map(|t| t.done).collect();
        assert_eq!(done, [1, 2, 3, 4]);
        assert!(ticks.iter().all(|t| t.total == 4));
        // 1ms of lock wait out of 4ms recorded: delivered as a fraction, not a percentage.
        assert!((ticks[0].lock_wait_pct - 0.25).abs() < 1e-9);
        assert!((ticks[0].inference_pct - 0.75).abs() < 1e-9);
    }

    #[test]
    fn embed_begin_update_total_only_grows() {
        let (p, lines) = capturing(Duration::from_secs(100));
        p.embed_begin(5);
        p.embed_begin_update_total(3);
        p.embed_begin_update_total(8);
        p.embed_done();
        let captured = lines_of(&lines);
        assert_eq!(captured.len(), 1, "{captured:?}");
        assert!(captured[0].contains("embed: 8/8 done in "), "{}", captured[0]);
    }

    #[test]
    fn embed_tick_bytes_reports_megabytes() {
        let (p, lines) = capturing(Duration::ZERO);
        p.embed_begin(4);
        p.embed_tick_bytes(2, 1024 * 1024, 4 * 1024 * 1024);
        let captured = lines_of(&lines);
        assert_eq!(captured.len(), 1, "{captured:?}");
        assert!(
            captured[0].contains("embed: 1.0/4.0 MB (last "),
            "{}",
            captured[0]
        );
    }

    #[test]
    fn chunk_summary_with_zero_files() {
        let (p, lines) = capturing(Duration::from_secs(10));
        p.chunk_summary(0, 0, Duration::from_millis(0));
        let captured = lines_of(&lines);
        assert_eq!(captured.len(), 1, "{captured:?}");
        assert!(
            captured[0].contains("chunk: 0 chunks from 0 files in "),
            "{}",
            captured[0]
        );
        // No worker reported chunks, so the skew section is omitted.
        assert!(!captured[0].contains("skew"), "{}", captured[0]);
    }

    #[test]
    fn chunk_summary_reports_thread_skew() {
        let (p, lines) = capturing(Duration::from_secs(10));
        // The test thread is not a rayon worker, so both reports land in slot 0.
        p.chunk_thread_report(5);
        p.chunk_thread_report(2);
        p.chunk_summary(7, 3, Duration::from_millis(1));
        let captured = lines_of(&lines);
        assert_eq!(captured.len(), 1, "{captured:?}");
        assert!(
            captured[0].contains("chunk: 7 chunks from 3 files in "),
            "{}",
            captured[0]
        );
        assert!(
            captured[0].contains("1 active, skew: 7-7 chunks/thread)"),
            "{}",
            captured[0]
        );
    }

    #[test]
    fn embed_done_with_zero_chunks() {
        let (p, lines) = capturing(Duration::from_secs(10));
        p.embed_begin(0);
        p.embed_done();
        let captured = lines_of(&lines);
        assert_eq!(captured.len(), 1, "{captured:?}");
        assert!(
            captured[0].ends_with("embed: skipped (0 chunks)"),
            "{}",
            captured[0]
        );
    }

    #[test]
    fn finish_reports_total() {
        let (p, lines) = capturing(Duration::from_secs(10));
        p.finish();
        let captured = lines_of(&lines);
        assert_eq!(captured.len(), 1, "{captured:?}");
        assert!(captured[0].contains("] total: "), "{}", captured[0]);
    }
}