// sqry_core/graph/unified/build/progress.rs
use std::panic;
38use std::sync::Mutex;
39use std::sync::atomic::{AtomicUsize, Ordering};
40use std::time::{Duration, Instant};
41
42use crate::progress::{IndexProgress, SharedReporter};
43
44fn safe_report(reporter: &SharedReporter, event: IndexProgress) {
49 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
50 reporter.report(event);
51 }));
52
53 if let Err(e) = result {
54 let msg = if let Some(s) = e.downcast_ref::<&str>() {
56 (*s).to_string()
57 } else if let Some(s) = e.downcast_ref::<String>() {
58 s.clone()
59 } else {
60 "unknown panic".to_string()
61 };
62 log::warn!("Progress reporter panicked (ignored): {msg}");
63 }
64}
65
/// Minimum time that must pass between two emitted `GraphPhaseProgress` events.
///
/// Progress updates arriving sooner than this after the previous emission are
/// dropped by the tracker's throttle. At 17 ms this caps the stream at roughly
/// 59 progress events per second.
const MIN_UPDATE_INTERVAL: Duration = Duration::from_millis(17);
68
69pub struct GraphBuildProgressTracker {
74 reporter: SharedReporter,
76
77 phase_state: Mutex<PhaseState>,
79
80 items_processed: AtomicUsize,
82
83 total_items: AtomicUsize,
85}
86
/// Mutable bookkeeping for the phase that is currently running.
struct PhaseState {
    /// Numeric identifier of the phase, as passed by the caller.
    phase_number: u8,

    /// Human-readable phase name, as passed by the caller.
    phase_name: &'static str,

    /// Moment the phase was started; used to compute the phase duration.
    phase_start: Instant,

    /// Moment the most recent progress event was emitted (or the phase was
    /// started); used to throttle progress events.
    last_update: Instant,
}
101
102impl Default for PhaseState {
103 fn default() -> Self {
104 Self {
105 phase_number: 0,
106 phase_name: "",
107 phase_start: Instant::now(),
108 last_update: Instant::now(),
109 }
110 }
111}
112
113impl GraphBuildProgressTracker {
114 #[must_use]
116 pub fn new(reporter: SharedReporter) -> Self {
117 Self {
118 reporter,
119 phase_state: Mutex::new(PhaseState::default()),
120 items_processed: AtomicUsize::new(0),
121 total_items: AtomicUsize::new(0),
122 }
123 }
124
125 pub fn start_phase(&self, phase_number: u8, phase_name: &'static str, total_items: usize) {
137 self.items_processed.store(0, Ordering::SeqCst);
139 self.total_items.store(total_items, Ordering::SeqCst);
140
141 {
143 let mut state = self.phase_state.lock().unwrap();
144 state.phase_number = phase_number;
145 state.phase_name = phase_name;
146 state.phase_start = Instant::now();
147 state.last_update = Instant::now();
148 }
149
150 safe_report(
152 &self.reporter,
153 IndexProgress::GraphPhaseStarted {
154 phase_number,
155 phase_name,
156 total_items,
157 },
158 );
159 }
160
161 pub fn increment_progress(&self) {
166 self.add_progress(1);
167 }
168
169 pub fn add_progress(&self, count: usize) {
173 let new_count = self.items_processed.fetch_add(count, Ordering::SeqCst) + count;
174 self.maybe_emit_progress(new_count);
175 }
176
177 fn maybe_emit_progress(&self, items_processed: usize) {
179 let total = self.total_items.load(Ordering::SeqCst);
180
181 let emit_info = {
184 let Ok(mut state) = self.phase_state.try_lock() else {
185 return;
187 };
188
189 let now = Instant::now();
190 if now.duration_since(state.last_update) >= MIN_UPDATE_INTERVAL {
191 state.last_update = now;
192 Some(state.phase_number)
193 } else {
194 None
195 }
196 };
197
198 if let Some(phase_number) = emit_info {
199 safe_report(
200 &self.reporter,
201 IndexProgress::GraphPhaseProgress {
202 phase_number,
203 items_processed,
204 total_items: total,
205 },
206 );
207 }
208 }
209
210 pub fn complete_phase(&self) {
216 let (phase_number, phase_name, phase_duration) = {
217 let state = self.phase_state.lock().unwrap();
218 (
219 state.phase_number,
220 state.phase_name,
221 state.phase_start.elapsed(),
222 )
223 };
224
225 safe_report(
226 &self.reporter,
227 IndexProgress::GraphPhaseCompleted {
228 phase_number,
229 phase_name,
230 phase_duration,
231 },
232 );
233 }
234
235 pub fn start_saving(&self, component_name: &'static str) {
237 safe_report(
238 &self.reporter,
239 IndexProgress::SavingStarted { component_name },
240 );
241 }
242
243 pub fn complete_saving(&self, component_name: &'static str, save_duration: Duration) {
245 safe_report(
246 &self.reporter,
247 IndexProgress::SavingCompleted {
248 component_name,
249 save_duration,
250 },
251 );
252 }
253
254 #[cfg(test)]
256 pub fn current_progress(&self) -> usize {
257 self.items_processed.load(Ordering::SeqCst)
258 }
259}
260
#[cfg(test)]
mod tests {
    use super::*;
    use crate::progress::no_op_reporter;
    use std::sync::Arc;

    /// Reporter that records every event it receives so tests can inspect them.
    struct EventCapture {
        events: Mutex<Vec<IndexProgress>>,
    }

    impl EventCapture {
        /// Creates an empty recorder already wrapped in an `Arc`.
        fn new() -> Arc<Self> {
            let recorder = Self {
                events: Mutex::new(Vec::new()),
            };
            Arc::new(recorder)
        }

        /// Returns a copy of everything recorded so far.
        fn events(&self) -> Vec<IndexProgress> {
            let recorded = self.events.lock().unwrap();
            recorded.clone()
        }

        /// Returns how many events have been recorded so far.
        fn event_count(&self) -> usize {
            let recorded = self.events.lock().unwrap();
            recorded.len()
        }
    }

    impl crate::progress::ProgressReporter for EventCapture {
        fn report(&self, event: IndexProgress) {
            let mut recorded = self.events.lock().unwrap();
            recorded.push(event);
        }
    }

    /// Reporter whose `report` always panics, used to exercise panic isolation.
    struct PanickingReporter;

    impl crate::progress::ProgressReporter for PanickingReporter {
        fn report(&self, _event: IndexProgress) {
            panic!("Intentional test panic from PanickingReporter");
        }
    }

    /// Builds a tracker wired to a fresh `EventCapture` and returns both.
    fn capturing_tracker() -> (Arc<EventCapture>, GraphBuildProgressTracker) {
        let capture = EventCapture::new();
        let tracker = GraphBuildProgressTracker::new(capture.clone());
        (capture, tracker)
    }

    #[test]
    fn test_phase_lifecycle() {
        let (sink, progress) = capturing_tracker();

        progress.start_phase(1, "Test phase", 100);
        progress.complete_phase();

        // Exactly one "started" followed by one "completed" event.
        let recorded = sink.events();
        assert_eq!(recorded.len(), 2);

        let started_ok = matches!(
            recorded[0],
            IndexProgress::GraphPhaseStarted {
                phase_number: 1,
                phase_name: "Test phase",
                total_items: 100
            }
        );
        assert!(started_ok);

        let completed_ok = matches!(
            recorded[1],
            IndexProgress::GraphPhaseCompleted {
                phase_number: 1,
                phase_name: "Test phase",
                ..
            }
        );
        assert!(completed_ok);
    }

    #[test]
    fn test_progress_increment() {
        let (sink, progress) = capturing_tracker();

        progress.start_phase(2, "Increment test", 10);

        progress.increment_progress();
        assert_eq!(progress.current_progress(), 1);

        progress.complete_phase();

        // At least the "started" and "completed" events must be present;
        // the progress event itself may have been throttled away.
        assert!(sink.event_count() >= 2);
    }

    #[test]
    fn test_saving_events() {
        let (sink, progress) = capturing_tracker();

        progress.start_saving("symbols");
        progress.complete_saving("symbols", Duration::from_millis(100));

        let recorded = sink.events();
        assert_eq!(recorded.len(), 2);

        let started_ok = matches!(
            recorded[0],
            IndexProgress::SavingStarted {
                component_name: "symbols"
            }
        );
        assert!(started_ok);

        let completed_ok = matches!(
            recorded[1],
            IndexProgress::SavingCompleted {
                component_name: "symbols",
                ..
            }
        );
        assert!(completed_ok);
    }

    #[test]
    fn test_no_op_reporter_no_panic() {
        let progress = GraphBuildProgressTracker::new(no_op_reporter());

        progress.start_phase(1, "No-op test", 1000);
        (0..1000).for_each(|_| progress.increment_progress());
        progress.complete_phase();
    }

    #[test]
    fn test_throttling_limits_updates() {
        let (sink, progress) = capturing_tracker();

        progress.start_phase(3, "Throttle test", 10000);
        (0..1000).for_each(|_| progress.increment_progress());
        progress.complete_phase();

        // Only the throttled progress events are of interest here.
        let recorded = sink.events();
        let progress_events = recorded
            .iter()
            .filter(|event| matches!(event, IndexProgress::GraphPhaseProgress { .. }))
            .count();

        assert!(
            progress_events < 100,
            "Expected throttling to limit updates"
        );
    }

    #[test]
    fn test_safe_report_catches_panics() {
        let reporter: SharedReporter = Arc::new(PanickingReporter);

        // Passing means the panic raised inside `report` did not escape.
        let event = IndexProgress::SavingStarted {
            component_name: "test",
        };
        safe_report(&reporter, event);
    }

    #[test]
    fn test_tracker_with_panicking_reporter_continues() {
        let progress = GraphBuildProgressTracker::new(Arc::new(PanickingReporter));

        // Every call below reaches the panicking reporter at least potentially;
        // none of them may abort the test.
        progress.start_phase(1, "Panic test", 100);
        progress.increment_progress();
        progress.add_progress(5);
        progress.complete_phase();
        progress.start_saving("test");
        progress.complete_saving("test", Duration::from_millis(10));

        // Counting keeps working even though reporting fails.
        assert_eq!(progress.current_progress(), 6);
    }
}