Skip to main content

sqry_core/graph/unified/build/
progress.rs

1//! Progress reporting for graph build phases.
2//!
3//! This module provides utilities for reporting progress during the 4-pass
4//! graph build pipeline with thread-safe aggregation and time-based throttling.
5//!
6//! # Thread Safety
7//!
8//! Graph build passes may run in parallel. This module uses atomic counters
9//! for thread-safe progress aggregation and time-based throttling to avoid
10//! overwhelming the progress reporter (max 60 updates/second).
11//!
12//! # Panic Safety
13//!
14//! Reporter calls are wrapped in `catch_unwind` to ensure that progress
15//! reporting failures never abort the build. Panics are logged and ignored.
16//!
17//! # Example
18//!
19//! ```rust,ignore
20//! use sqry_core::graph::unified::build::progress::GraphBuildProgressTracker;
21//! use sqry_core::progress::no_op_reporter;
22//!
23//! let tracker = GraphBuildProgressTracker::new(no_op_reporter());
24//!
25//! // Start phase 1
26//! tracker.start_phase(1, "AST extraction", 1000);
27//!
28//! // Report progress (throttled)
29//! for i in 0..1000 {
30//!     tracker.increment_progress();
31//! }
32//!
33//! // Complete phase
34//! tracker.complete_phase();
35//! ```
36
37use std::panic;
38use std::sync::Mutex;
39use std::sync::atomic::{AtomicUsize, Ordering};
40use std::time::{Duration, Instant};
41
42use crate::progress::{IndexProgress, SharedReporter};
43
44/// Safely report a progress event, catching any panics.
45///
46/// Progress reporting should never abort a build. If the reporter panics,
47/// the error is logged and the build continues.
48fn safe_report(reporter: &SharedReporter, event: IndexProgress) {
49    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
50        reporter.report(event);
51    }));
52
53    if let Err(e) = result {
54        // Log the panic but don't propagate it
55        let msg = if let Some(s) = e.downcast_ref::<&str>() {
56            (*s).to_string()
57        } else if let Some(s) = e.downcast_ref::<String>() {
58            s.clone()
59        } else {
60            "unknown panic".to_string()
61        };
62        log::warn!("Progress reporter panicked (ignored): {msg}");
63    }
64}
65
/// Minimum interval between two emitted progress updates.
///
/// One frame at 60 Hz lasts about 16.67 ms; the value is rounded up to a
/// whole 17 ms, which caps emission at roughly 58.8 updates per second and
/// therefore never exceeds 60 Hz.
const MIN_UPDATE_INTERVAL: Duration = Duration::from_millis(17);
68
69/// Progress tracker for graph build phases with thread-safe aggregation.
70///
71/// Tracks progress within a single phase and provides time-based throttling
72/// to limit update frequency to 60 Hz or less.
73pub struct GraphBuildProgressTracker {
74    /// Shared progress reporter
75    reporter: SharedReporter,
76
77    /// Current phase state (protected by mutex for phase transitions)
78    phase_state: Mutex<PhaseState>,
79
80    /// Atomic counter for thread-safe progress updates
81    items_processed: AtomicUsize,
82
83    /// Total items in current phase (set at phase start)
84    total_items: AtomicUsize,
85}
86
/// Bookkeeping for the phase that is currently running.
struct PhaseState {
    /// Number of the active phase (1-4); 0 means no phase is active.
    phase_number: u8,

    /// Display name of the active phase.
    phase_name: &'static str,

    /// Moment the active phase began.
    phase_start: Instant,

    /// Moment the most recent progress update was emitted.
    last_update: Instant,
}

impl Default for PhaseState {
    /// Build the idle state: phase 0, empty name, both timestamps set to the
    /// same "now".
    fn default() -> Self {
        // Read the clock once so both timestamps describe the same instant
        // instead of differing by the gap between two separate clock reads.
        let now = Instant::now();
        Self {
            phase_number: 0,
            phase_name: "",
            phase_start: now,
            last_update: now,
        }
    }
}
112
113impl GraphBuildProgressTracker {
114    /// Create a new progress tracker with the given reporter.
115    #[must_use]
116    pub fn new(reporter: SharedReporter) -> Self {
117        Self {
118            reporter,
119            phase_state: Mutex::new(PhaseState::default()),
120            items_processed: AtomicUsize::new(0),
121            total_items: AtomicUsize::new(0),
122        }
123    }
124
125    /// Start a new graph build phase.
126    ///
127    /// # Arguments
128    ///
129    /// * `phase_number` - Phase number (1-4)
130    /// * `phase_name` - Human-readable phase name
131    /// * `total_items` - Total items to process in this phase
132    ///
133    /// # Panics
134    ///
135    /// Panics if the phase state mutex is poisoned.
136    pub fn start_phase(&self, phase_number: u8, phase_name: &'static str, total_items: usize) {
137        // Reset counters
138        self.items_processed.store(0, Ordering::SeqCst);
139        self.total_items.store(total_items, Ordering::SeqCst);
140
141        // Update phase state
142        {
143            let mut state = self.phase_state.lock().unwrap();
144            state.phase_number = phase_number;
145            state.phase_name = phase_name;
146            state.phase_start = Instant::now();
147            state.last_update = Instant::now();
148        }
149
150        // Report phase start (panic-safe)
151        safe_report(
152            &self.reporter,
153            IndexProgress::GraphPhaseStarted {
154                phase_number,
155                phase_name,
156                total_items,
157            },
158        );
159    }
160
161    /// Increment progress counter by one (thread-safe).
162    ///
163    /// Emits a progress update if enough time has passed since the last update
164    /// (time-based throttling at 60 Hz max).
165    pub fn increment_progress(&self) {
166        self.add_progress(1);
167    }
168
169    /// Add to progress counter (thread-safe).
170    ///
171    /// Emits a progress update if enough time has passed since the last update.
172    pub fn add_progress(&self, count: usize) {
173        let new_count = self.items_processed.fetch_add(count, Ordering::SeqCst) + count;
174        self.maybe_emit_progress(new_count);
175    }
176
177    /// Check if we should emit a progress update (time-based throttling).
178    fn maybe_emit_progress(&self, items_processed: usize) {
179        let total = self.total_items.load(Ordering::SeqCst);
180
181        // Try to acquire lock without blocking (non-contended fast path)
182        // Capture phase_number while holding the lock to avoid drift
183        let emit_info = {
184            let Ok(mut state) = self.phase_state.try_lock() else {
185                // Another thread is updating, skip this update
186                return;
187            };
188
189            let now = Instant::now();
190            if now.duration_since(state.last_update) >= MIN_UPDATE_INTERVAL {
191                state.last_update = now;
192                Some(state.phase_number)
193            } else {
194                None
195            }
196        };
197
198        if let Some(phase_number) = emit_info {
199            safe_report(
200                &self.reporter,
201                IndexProgress::GraphPhaseProgress {
202                    phase_number,
203                    items_processed,
204                    total_items: total,
205                },
206            );
207        }
208    }
209
210    /// Complete the current phase and report duration.
211    ///
212    /// # Panics
213    ///
214    /// Panics if the phase state mutex is poisoned.
215    pub fn complete_phase(&self) {
216        let (phase_number, phase_name, phase_duration) = {
217            let state = self.phase_state.lock().unwrap();
218            (
219                state.phase_number,
220                state.phase_name,
221                state.phase_start.elapsed(),
222            )
223        };
224
225        safe_report(
226            &self.reporter,
227            IndexProgress::GraphPhaseCompleted {
228                phase_number,
229                phase_name,
230                phase_duration,
231            },
232        );
233    }
234
235    /// Report that index saving has started for a component.
236    pub fn start_saving(&self, component_name: &'static str) {
237        safe_report(
238            &self.reporter,
239            IndexProgress::SavingStarted { component_name },
240        );
241    }
242
243    /// Report that index saving has completed for a component.
244    pub fn complete_saving(&self, component_name: &'static str, save_duration: Duration) {
245        safe_report(
246            &self.reporter,
247            IndexProgress::SavingCompleted {
248                component_name,
249                save_duration,
250            },
251        );
252    }
253
254    /// Get the current progress count (for testing).
255    #[cfg(test)]
256    pub fn current_progress(&self) -> usize {
257        self.items_processed.load(Ordering::SeqCst)
258    }
259}
260
#[cfg(test)]
mod tests {
    use super::*;
    use crate::progress::no_op_reporter;
    use std::sync::Arc;

    /// Reporter that records every event it receives, in order.
    struct EventCapture {
        events: Mutex<Vec<IndexProgress>>,
    }

    impl EventCapture {
        fn new() -> Arc<Self> {
            Arc::new(Self {
                events: Mutex::new(Vec::new()),
            })
        }

        /// Snapshot of all events captured so far.
        fn events(&self) -> Vec<IndexProgress> {
            self.events.lock().unwrap().clone()
        }

        /// Number of events captured so far.
        fn event_count(&self) -> usize {
            self.events.lock().unwrap().len()
        }
    }

    impl crate::progress::ProgressReporter for EventCapture {
        fn report(&self, event: IndexProgress) {
            self.events.lock().unwrap().push(event);
        }
    }

    #[test]
    fn test_phase_lifecycle() {
        let capture = EventCapture::new();
        let tracker = GraphBuildProgressTracker::new(capture.clone());

        tracker.start_phase(1, "Test phase", 100);
        tracker.complete_phase();

        let events = capture.events();
        assert_eq!(events.len(), 2);
        assert!(matches!(
            events[0],
            IndexProgress::GraphPhaseStarted {
                phase_number: 1,
                phase_name: "Test phase",
                total_items: 100
            }
        ));
        assert!(matches!(
            events[1],
            IndexProgress::GraphPhaseCompleted {
                phase_number: 1,
                phase_name: "Test phase",
                ..
            }
        ));
    }

    #[test]
    fn test_progress_increment() {
        let capture = EventCapture::new();
        let tracker = GraphBuildProgressTracker::new(capture.clone());

        tracker.start_phase(2, "Increment test", 10);

        // `start_phase` has just reset the throttle timestamp, so this
        // increment is normally suppressed; it only emits if the thread was
        // stalled for at least `MIN_UPDATE_INTERVAL`. The counter must
        // advance either way.
        tracker.increment_progress();
        assert_eq!(tracker.current_progress(), 1);

        tracker.complete_phase();

        // Start + complete are always present; at most one progress event
        // can sit between them.
        let events = capture.events();
        assert!(capture.event_count() >= 2);
        assert!(capture.event_count() <= 3);
        assert!(matches!(
            events.first(),
            Some(IndexProgress::GraphPhaseStarted {
                phase_number: 2,
                ..
            })
        ));
        assert!(matches!(
            events.last(),
            Some(IndexProgress::GraphPhaseCompleted {
                phase_number: 2,
                ..
            })
        ));
    }

    #[test]
    fn test_saving_events() {
        let capture = EventCapture::new();
        let tracker = GraphBuildProgressTracker::new(capture.clone());

        tracker.start_saving("symbols");
        tracker.complete_saving("symbols", Duration::from_millis(100));

        let events = capture.events();
        assert_eq!(events.len(), 2);
        assert!(matches!(
            events[0],
            IndexProgress::SavingStarted {
                component_name: "symbols"
            }
        ));
        assert!(matches!(
            events[1],
            IndexProgress::SavingCompleted {
                component_name: "symbols",
                ..
            }
        ));
    }

    #[test]
    fn test_no_op_reporter_no_panic() {
        let tracker = GraphBuildProgressTracker::new(no_op_reporter());

        tracker.start_phase(1, "No-op test", 1000);
        for _ in 0..1000 {
            tracker.increment_progress();
        }
        tracker.complete_phase();
        // Should complete without panic
    }

    #[test]
    fn test_throttling_limits_updates() {
        let capture = EventCapture::new();
        let tracker = GraphBuildProgressTracker::new(capture.clone());

        tracker.start_phase(3, "Throttle test", 10000);

        // Rapid updates should be throttled
        for _ in 0..1000 {
            tracker.increment_progress();
        }

        // Throttling suppresses events, never counts.
        assert_eq!(tracker.current_progress(), 1000);

        tracker.complete_phase();

        // With 17ms throttle interval, 1000 rapid updates should result in
        // far fewer than 1000 progress events (likely just start + 1-2 + complete)
        let progress_events = capture
            .events()
            .iter()
            .filter(|e| matches!(e, IndexProgress::GraphPhaseProgress { .. }))
            .count();

        // Should have significantly fewer progress events than increments
        assert!(
            progress_events < 100,
            "Expected throttling to limit updates"
        );
    }

    /// A reporter that panics on every report call.
    struct PanickingReporter;

    impl crate::progress::ProgressReporter for PanickingReporter {
        fn report(&self, _event: IndexProgress) {
            panic!("Intentional test panic from PanickingReporter");
        }
    }

    #[test]
    fn test_safe_report_catches_panics() {
        // Create a reporter that panics
        let reporter: SharedReporter = Arc::new(PanickingReporter);

        // safe_report should catch the panic and not propagate it
        // This should not panic the test
        safe_report(
            &reporter,
            IndexProgress::SavingStarted {
                component_name: "test",
            },
        );

        // If we got here, the panic was successfully caught
    }

    #[test]
    fn test_tracker_with_panicking_reporter_continues() {
        // Create tracker with a panicking reporter
        let tracker = GraphBuildProgressTracker::new(Arc::new(PanickingReporter));

        // All operations should complete without propagating the panic
        tracker.start_phase(1, "Panic test", 100);
        tracker.increment_progress();
        tracker.add_progress(5);
        tracker.complete_phase();
        tracker.start_saving("test");
        tracker.complete_saving("test", Duration::from_millis(10));

        // If we got here, all panics were caught and the build continued
        assert_eq!(tracker.current_progress(), 6);
    }
}