1use std::cell::Cell;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10
/// Sink for human-readable profiler report lines; shareable across threads.
type ProgressCallback = Box<dyn Fn(&str) + Sync + Send>;
20
/// Snapshot of embed-phase progress handed to the `with_embed_tick` callback on every tick.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbedProgress {
    /// Number of chunks embedded so far, as passed to `embed_tick`.
    pub done: usize,
    /// Expected number of chunks in the current embed phase.
    pub total: usize,
    /// Chunks per second since the previous tick (falls back to `overall_rate` when no time
    /// has elapsed between ticks).
    pub window_rate: f64,
    /// Chunks per second since the embed phase began.
    pub overall_rate: f64,
    /// Share of recorded embed time spent in lock wait, as a FRACTION in `0.0..=1.0`
    /// (not a 0-100 percentage, despite the name). `0.0` when no timings were recorded.
    pub lock_wait_pct: f64,
    /// Share of recorded embed time spent in inference; always `1.0 - lock_wait_pct`.
    pub inference_pct: f64,
}
37
38type EmbedTickCallback = Box<dyn Fn(&EmbedProgress) + Send + Sync>;
40
41#[expect(
42 clippy::large_enum_variant,
43 reason = "Active is the common case; Noop is only for --profile=false"
44)]
45pub enum Profiler {
46 #[expect(
48 private_interfaces,
49 reason = "EmbedState is intentionally opaque; callers cannot name it"
50 )]
51 Active {
52 start: Instant,
54 interval: Duration,
56 embed: Mutex<EmbedState>,
58 chunk_counts: Mutex<Vec<usize>>,
60 on_progress: Option<ProgressCallback>,
62 on_embed_tick: Option<EmbedTickCallback>,
64 },
65 Noop,
67}
68
/// Embed-phase bookkeeping behind the profiler's `embed` mutex.
///
/// Two independent progress windows are tracked: one for the throttled report lines
/// (`last_report` / `chunks_at_last_report`) and one for the per-call tick callback
/// (`last_tick` / `chunks_at_last_tick`).
pub(crate) struct EmbedState {
    /// Instant the current embed phase began.
    phase_start: Instant,
    /// Expected number of chunks in the current phase.
    total_chunks: usize,
    /// Accumulated time reported through `embed_lock_wait`.
    total_lock_wait: Duration,
    /// Accumulated time reported through `embed_inference`.
    total_inference: Duration,
    /// When the last throttled progress line was emitted.
    last_report: Instant,
    /// `done` count at the last throttled progress line.
    chunks_at_last_report: usize,
    /// When the tick callback last fired.
    last_tick: Instant,
    /// `done` count at the last tick callback.
    chunks_at_last_tick: usize,
}
81
82impl EmbedState {
83 fn new() -> Self {
84 let now = Instant::now();
85 Self {
86 phase_start: now,
87 last_report: now,
88 chunks_at_last_report: 0,
89 last_tick: now,
90 chunks_at_last_tick: 0,
91 total_lock_wait: Duration::ZERO,
92 total_inference: Duration::ZERO,
93 total_chunks: 0,
94 }
95 }
96}
97
98impl Profiler {
99 #[must_use]
101 pub fn new(enabled: bool, interval: Duration) -> Self {
102 if enabled {
103 Self::Active {
104 start: Instant::now(),
105 interval,
106 embed: Mutex::new(EmbedState::new()),
107 chunk_counts: Mutex::new(Vec::new()),
108 on_progress: None,
109 on_embed_tick: None,
110 }
111 } else {
112 Self::Noop
113 }
114 }
115
116 #[must_use]
120 pub fn with_callback(interval: Duration, cb: impl Fn(&str) + Send + Sync + 'static) -> Self {
121 Self::Active {
122 start: Instant::now(),
123 interval,
124 embed: Mutex::new(EmbedState::new()),
125 chunk_counts: Mutex::new(Vec::new()),
126 on_progress: Some(Box::new(cb)),
127 on_embed_tick: None,
128 }
129 }
130
131 #[must_use]
136 pub fn with_embed_tick(mut self, cb: impl Fn(&EmbedProgress) + Send + Sync + 'static) -> Self {
137 if let Self::Active {
138 ref mut on_embed_tick,
139 ..
140 } = self
141 {
142 *on_embed_tick = Some(Box::new(cb));
143 }
144 self
145 }
146
147 #[must_use]
149 pub fn noop() -> Self {
150 Self::Noop
151 }
152
153 fn report(&self, msg: &str) {
155 if let Self::Active { on_progress, .. } = self {
156 if let Some(cb) = on_progress {
157 cb(msg);
158 } else {
159 eprintln!("{msg}");
160 }
161 }
162 }
163
164 pub fn header(&self, version: &str, model_repo: &str, threads: usize, cores: usize) {
166 if let Self::Active { .. } = self {
167 self.report(&format!(
168 "[profile] ripvec {version} | {cores}-core | rayon: {threads} threads | model: {model_repo}",
169 ));
170 }
171 }
172
173 #[must_use]
175 pub fn phase(&self, name: &'static str) -> PhaseGuard<'_> {
176 if let Self::Active { start, .. } = self {
177 self.report(&format!(
178 "[{:.3}s] {name}: starting",
179 start.elapsed().as_secs_f64(),
180 ));
181 }
182 PhaseGuard {
183 profiler: self,
184 name,
185 start: Instant::now(),
186 detail: Cell::new(None),
187 }
188 }
189
190 pub fn chunk_thread_report(&self, n: usize) {
192 if let Self::Active { chunk_counts, .. } = self
193 && let Ok(mut counts) = chunk_counts.lock()
194 {
195 let idx = rayon::current_thread_index().unwrap_or(0);
196 if counts.len() <= idx {
197 counts.resize(idx + 1, 0);
198 }
199 counts[idx] += n;
200 }
201 }
202
203 pub fn chunk_summary(&self, total_chunks: usize, total_files: usize, elapsed: Duration) {
205 if let Self::Active {
206 start,
207 chunk_counts,
208 ..
209 } = self
210 {
211 let wall = start.elapsed();
212 if let Ok(counts) = chunk_counts.lock() {
213 let active = counts.iter().filter(|&&c| c > 0).count();
214 let pool_size = rayon::current_num_threads();
215 if active > 0 {
216 let min = counts
217 .iter()
218 .filter(|&&c| c > 0)
219 .min()
220 .copied()
221 .unwrap_or(0);
222 let max = counts.iter().max().copied().unwrap_or(0);
223 self.report(&format!(
224 "[{:.1}s] chunk: {} chunks from {} files in {:.0?} ({} threads, {} active, skew: {}-{} chunks/thread)",
225 wall.as_secs_f64(),
226 total_chunks,
227 total_files,
228 elapsed,
229 pool_size,
230 active,
231 min,
232 max,
233 ));
234 } else {
235 self.report(&format!(
236 "[{:.1}s] chunk: {} chunks from {} files in {:.0?} ({} threads)",
237 wall.as_secs_f64(),
238 total_chunks,
239 total_files,
240 elapsed,
241 pool_size,
242 ));
243 }
244 }
245 }
246 }
247
248 pub fn embed_begin(&self, total: usize) {
250 if let Self::Active { embed, .. } = self
251 && let Ok(mut state) = embed.lock()
252 {
253 let now = Instant::now();
254 state.phase_start = now;
255 state.last_report = now;
256 state.chunks_at_last_report = 0;
257 state.total_lock_wait = Duration::ZERO;
258 state.total_inference = Duration::ZERO;
259 state.total_chunks = total;
260 }
261 }
262
263 pub fn embed_begin_update_total(&self, total: usize) {
269 if let Self::Active { embed, .. } = self
270 && let Ok(mut state) = embed.lock()
271 && total > state.total_chunks
272 {
273 state.total_chunks = total;
274 }
275 }
276
277 #[expect(
279 clippy::cast_precision_loss,
280 reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
281 )]
282 pub fn embed_tick(&self, done: usize) {
283 if let Self::Active {
284 start,
285 interval,
286 embed,
287 on_embed_tick,
288 ..
289 } = self
290 {
291 let Ok(mut state) = embed.lock() else {
292 return;
293 };
294 let now = Instant::now();
295 let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
296 let overall_rate = if overall_elapsed > 0.0 {
297 done as f64 / overall_elapsed
298 } else {
299 0.0
300 };
301 let total_timing = state.total_lock_wait + state.total_inference;
302 let lock_pct = if total_timing.as_nanos() > 0 {
303 state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64
304 } else {
305 0.0
306 };
307
308 if let Some(cb) = on_embed_tick {
311 let tick_elapsed = now.duration_since(state.last_tick).as_secs_f64();
312 let tick_chunks = done - state.chunks_at_last_tick;
313 let batch_rate = if tick_elapsed > 0.0 {
314 tick_chunks as f64 / tick_elapsed
315 } else {
316 overall_rate
317 };
318 state.last_tick = now;
319 state.chunks_at_last_tick = done;
320
321 cb(&EmbedProgress {
322 done,
323 total: state.total_chunks,
324 window_rate: batch_rate,
325 overall_rate,
326 lock_wait_pct: lock_pct,
327 inference_pct: 1.0 - lock_pct,
328 });
329 }
330
331 if now.duration_since(state.last_report) >= *interval {
333 let wall = start.elapsed();
334 let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
335 let report_chunks = done - state.chunks_at_last_report;
336 let window_rate = if report_elapsed > 0.0 {
337 report_chunks as f64 / report_elapsed
338 } else {
339 0.0
340 };
341 self.report(&format!(
342 "[{:.1}s] embed: {}/{} (last {:.0}s: {:.1}/s, overall: {:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
343 wall.as_secs_f64(),
344 done,
345 state.total_chunks,
346 report_elapsed,
347 window_rate,
348 overall_rate,
349 lock_pct * 100.0,
350 (1.0 - lock_pct) * 100.0,
351 ));
352 state.last_report = now;
353 state.chunks_at_last_report = done;
354 }
355 }
356 }
357
358 #[expect(
363 clippy::cast_precision_loss,
364 reason = "display-only: sub-1% precision loss acceptable for MB/rate"
365 )]
366 pub fn embed_tick_bytes(&self, done_chunks: usize, bytes_processed: u64, total_bytes: u64) {
367 if let Self::Active {
368 start,
369 interval,
370 embed,
371 ..
372 } = self
373 {
374 let Ok(mut state) = embed.lock() else {
375 return;
376 };
377 let now = Instant::now();
378 let overall_elapsed = now.duration_since(state.phase_start).as_secs_f64();
379 let overall_rate = if overall_elapsed > 0.0 {
380 done_chunks as f64 / overall_elapsed
381 } else {
382 0.0
383 };
384
385 if now.duration_since(state.last_report) >= *interval {
386 let wall = start.elapsed();
387 let report_elapsed = now.duration_since(state.last_report).as_secs_f64();
388 let report_chunks = done_chunks - state.chunks_at_last_report;
389 let window_rate = if report_elapsed > 0.0 {
390 report_chunks as f64 / report_elapsed
391 } else {
392 0.0
393 };
394 let mb_done = bytes_processed as f64 / (1024.0 * 1024.0);
395 let mb_total = total_bytes as f64 / (1024.0 * 1024.0);
396 self.report(&format!(
397 "[{:.1}s] embed: {:.1}/{:.1} MB (last {:.0}s: {:.1}/s, overall: {:.1}/s)",
398 wall.as_secs_f64(),
399 mb_done,
400 mb_total,
401 report_elapsed,
402 window_rate,
403 overall_rate,
404 ));
405 state.last_report = now;
406 state.chunks_at_last_report = done_chunks;
407 }
408 }
409 }
410
411 pub fn embed_lock_wait(&self, duration: Duration) {
413 if let Self::Active { embed, .. } = self
414 && let Ok(mut state) = embed.lock()
415 {
416 state.total_lock_wait += duration;
417 }
418 }
419
420 pub fn embed_inference(&self, duration: Duration) {
422 if let Self::Active { embed, .. } = self
423 && let Ok(mut state) = embed.lock()
424 {
425 state.total_inference += duration;
426 }
427 }
428
429 #[expect(
431 clippy::cast_precision_loss,
432 reason = "display-only rate/percentage calculations; sub-1% precision loss is acceptable"
433 )]
434 pub fn embed_done(&self) {
435 if let Self::Active { start, embed, .. } = self
436 && let Ok(state) = embed.lock()
437 {
438 let wall = start.elapsed();
439 if state.total_chunks == 0 {
440 self.report(&format!(
441 "[{:.1}s] embed: skipped (0 chunks)",
442 wall.as_secs_f64()
443 ));
444 return;
445 }
446 let elapsed = Instant::now().duration_since(state.phase_start);
447 let rate = if elapsed.as_secs_f64() > 0.0 {
448 state.total_chunks as f64 / elapsed.as_secs_f64()
449 } else {
450 0.0
451 };
452 let total_timing = state.total_lock_wait + state.total_inference;
453 let lock_pct = if total_timing.as_nanos() > 0 {
454 state.total_lock_wait.as_nanos() as f64 / total_timing.as_nanos() as f64 * 100.0
455 } else {
456 0.0
457 };
458 self.report(&format!(
459 "[{:.1}s] embed: {}/{} done in {:.1}s ({:.1}/s) lock_wait: {:.0}% inference: {:.0}%",
460 wall.as_secs_f64(),
461 state.total_chunks,
462 state.total_chunks,
463 elapsed.as_secs_f64(),
464 rate,
465 lock_pct,
466 100.0 - lock_pct,
467 ));
468 }
469 }
470
471 pub fn finish(&self) {
473 if let Self::Active { start, .. } = self {
474 let elapsed = start.elapsed().as_secs_f64();
475 self.report(&format!("[{elapsed:.1}s] total: {elapsed:.1}s"));
476 }
477 }
478}
479
480pub struct PhaseGuard<'a> {
482 profiler: &'a Profiler,
483 name: &'static str,
484 start: Instant,
485 detail: Cell<Option<String>>,
486}
487
488impl PhaseGuard<'_> {
489 pub fn set_detail(&self, detail: String) {
491 self.detail.set(Some(detail));
492 }
493}
494
495impl Drop for PhaseGuard<'_> {
496 fn drop(&mut self) {
497 if let Profiler::Active { start, .. } = self.profiler {
498 let elapsed = self.start.elapsed();
499 let wall = start.elapsed();
500 let msg = if let Some(detail) = self.detail.take() {
501 format!(
502 "[{:.3}s] {}: {} in {:.1?}",
503 wall.as_secs_f64(),
504 self.name,
505 detail,
506 elapsed,
507 )
508 } else {
509 format!(
510 "[{:.3}s] {}: {:.1?}",
511 wall.as_secs_f64(),
512 self.name,
513 elapsed,
514 )
515 };
516 self.profiler.report(&msg);
517 }
518 }
519}
520
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Shared buffer collecting every line an active profiler reports.
    type Captured = Arc<Mutex<Vec<String>>>;

    /// Builds an active profiler whose report lines are captured instead of printed.
    fn capturing(interval: Duration) -> (Profiler, Captured) {
        let lines: Captured = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&lines);
        let profiler = Profiler::with_callback(interval, move |msg| {
            if let Ok(mut buf) = sink.lock() {
                buf.push(msg.to_owned());
            }
        });
        (profiler, lines)
    }

    /// Returns a copy of the lines captured so far.
    fn snapshot(lines: &Captured) -> Vec<String> {
        lines.lock().map(|buf| buf.to_vec()).unwrap_or_default()
    }

    #[test]
    fn profiler_is_sync() {
        fn assert_sync<T: Sync>() {}
        assert_sync::<Profiler>();
    }

    #[test]
    fn noop_profiler_all_methods_are_safe() {
        let p = Profiler::noop();
        p.header("0.1.0", "test/repo", 4, 8);
        {
            let g = p.phase("test_phase");
            g.set_detail("some detail".to_string());
        }
        p.chunk_thread_report(10);
        p.chunk_summary(100, 50, Duration::from_millis(89));
        p.embed_begin(100);
        p.embed_begin_update_total(200);
        p.embed_tick(1);
        p.embed_tick_bytes(1, 1024, 4096);
        p.embed_lock_wait(Duration::from_millis(1));
        p.embed_inference(Duration::from_millis(10));
        p.embed_done();
        p.finish();
    }

    #[test]
    fn noop_profiler_ignores_embed_tick_callback() {
        let hits = Arc::new(Mutex::new(0_usize));
        let sink = Arc::clone(&hits);
        let p = Profiler::noop().with_embed_tick(move |_| {
            if let Ok(mut n) = sink.lock() {
                *n += 1;
            }
        });
        p.embed_begin(3);
        p.embed_tick(1);
        assert_eq!(hits.lock().map(|n| *n).unwrap_or(usize::MAX), 0);
    }

    #[test]
    fn header_reports_configuration() {
        let (p, lines) = capturing(Duration::from_secs(10));
        p.header("0.1.0", "test/repo", 4, 8);
        assert_eq!(
            snapshot(&lines),
            vec!["[profile] ripvec 0.1.0 | 8-core | rayon: 4 threads | model: test/repo".to_owned()]
        );
    }

    #[test]
    fn active_profiler_phase_guard_formats_correctly() {
        let (p, lines) = capturing(Duration::from_secs(10));
        {
            let g = p.phase("test_phase");
            g.set_detail("42 items".to_string());
        }
        let got = snapshot(&lines);
        assert_eq!(got.len(), 2, "{got:?}");
        assert!(got[0].starts_with('['), "{got:?}");
        assert!(got[0].ends_with("s] test_phase: starting"), "{got:?}");
        assert!(got[1].contains("s] test_phase: 42 items in "), "{got:?}");
    }

    #[test]
    fn phase_guard_without_detail_reports_only_duration() {
        let (p, lines) = capturing(Duration::from_secs(10));
        drop(p.phase("plain"));
        let got = snapshot(&lines);
        assert_eq!(got.len(), 2, "{got:?}");
        assert!(got[1].contains("s] plain: "), "{got:?}");
        assert!(!got[1].contains(" in "), "{got:?}");
    }

    #[test]
    fn embed_tick_respects_interval() {
        let (p, lines) = capturing(Duration::from_secs(100));
        p.embed_begin(10);
        for i in 1..=10 {
            p.embed_tick(i);
        }
        p.embed_done();
        let got = snapshot(&lines);
        // The 100s interval never elapses, so only the completion line is emitted.
        assert_eq!(got.len(), 1, "{got:?}");
        assert!(got[0].contains("embed: 10/10 done in "), "{got:?}");
    }

    #[test]
    fn embed_tick_reports_every_tick_with_zero_interval() {
        let (p, lines) = capturing(Duration::ZERO);
        p.embed_begin(2);
        p.embed_tick(1);
        p.embed_tick(2);
        let got = snapshot(&lines);
        assert_eq!(got.len(), 2, "{got:?}");
        assert!(got[0].contains("embed: 1/2 (last "), "{got:?}");
        assert!(got[1].contains("embed: 2/2 (last "), "{got:?}");
        assert!(got[1].contains("lock_wait: 0% inference: 100%"), "{got:?}");
    }

    #[test]
    fn embed_tick_callback_receives_progress() {
        let seen: Arc<Mutex<Vec<EmbedProgress>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&seen);
        let p = Profiler::new(true, Duration::from_secs(100)).with_embed_tick(move |prog| {
            if let Ok(mut buf) = sink.lock() {
                buf.push(prog.clone());
            }
        });
        p.embed_begin(10);
        p.embed_lock_wait(Duration::from_millis(1));
        p.embed_inference(Duration::from_millis(3));
        p.embed_tick(1);
        p.embed_tick(3);
        let got = seen.lock().map(|buf| buf.to_vec()).unwrap_or_default();
        assert_eq!(got.len(), 2, "{got:?}");
        assert_eq!((got[1].done, got[1].total), (3, 10));
        assert!((got[1].lock_wait_pct - 0.25).abs() < 1e-9, "{got:?}");
        assert!((got[1].inference_pct - 0.75).abs() < 1e-9, "{got:?}");
    }

    #[test]
    fn embed_tick_bytes_reports_megabytes() {
        let (p, lines) = capturing(Duration::ZERO);
        p.embed_begin(4);
        p.embed_tick_bytes(2, 1024 * 1024, 4 * 1024 * 1024);
        let got = snapshot(&lines);
        assert_eq!(got.len(), 1, "{got:?}");
        assert!(got[0].contains("embed: 1.0/4.0 MB (last "), "{got:?}");
    }

    #[test]
    fn embed_begin_update_total_only_grows() {
        let (p, lines) = capturing(Duration::from_secs(100));
        p.embed_begin(5);
        p.embed_begin_update_total(3);
        p.embed_begin_update_total(8);
        p.embed_done();
        let got = snapshot(&lines);
        assert_eq!(got.len(), 1, "{got:?}");
        assert!(got[0].contains("embed: 8/8 done in "), "{got:?}");
    }

    #[test]
    fn chunk_summary_with_zero_files() {
        let (p, lines) = capturing(Duration::from_secs(10));
        p.chunk_summary(0, 0, Duration::from_millis(0));
        let got = snapshot(&lines);
        assert_eq!(got.len(), 1, "{got:?}");
        assert!(got[0].contains("chunk: 0 chunks from 0 files in "), "{got:?}");
        assert!(!got[0].contains("active"), "{got:?}");
    }

    #[test]
    fn chunk_summary_reports_thread_skew() {
        let (p, lines) = capturing(Duration::from_secs(10));
        // Outside a rayon pool every report is attributed to slot 0.
        p.chunk_thread_report(3);
        p.chunk_thread_report(4);
        p.chunk_summary(7, 2, Duration::from_millis(5));
        let got = snapshot(&lines);
        assert_eq!(got.len(), 1, "{got:?}");
        assert!(got[0].contains("chunk: 7 chunks from 2 files in "), "{got:?}");
        assert!(got[0].contains("1 active, skew: 7-7 chunks/thread)"), "{got:?}");
    }

    #[test]
    fn embed_done_with_zero_chunks() {
        let (p, lines) = capturing(Duration::from_secs(10));
        p.embed_begin(0);
        p.embed_done();
        let got = snapshot(&lines);
        assert_eq!(got.len(), 1, "{got:?}");
        assert!(got[0].ends_with("s] embed: skipped (0 chunks)"), "{got:?}");
    }

    #[test]
    fn finish_reports_total() {
        let (p, lines) = capturing(Duration::from_secs(10));
        p.finish();
        let got = snapshot(&lines);
        assert_eq!(got.len(), 1, "{got:?}");
        assert!(got[0].contains("] total: "), "{got:?}");
    }
}