Skip to main content

sqlitegraph/
progress.rs

1//! Progress tracking for long-running operations.
2//!
3//! This module provides callback-based progress reporting for algorithms
4//! that may take significant time to execute (PageRank, betweenness centrality, etc.).
5//!
6//! # Key Types
7//!
8//! - [`ProgressCallback`] - Trait for progress reporting
9//! - [`NoProgress`] - Zero-overhead no-op implementation (default)
10//! - [`ConsoleProgress`] - CLI-friendly stderr output
11//! - [`ProgressState`] - Throttled wrapper to avoid spam
12//!
13//! # Usage Patterns
14//!
15//! ## Zero-Overhead Progress (Default)
16//!
17//! Use [`NoProgress`] when you don't need progress reporting:
18//!
19//! ```rust,ignore
20//! use sqlitegraph::{algo::pagerank, progress::NoProgress};
21//!
22//! let results = pagerank(&graph)?;
23//! // No progress output, zero overhead
24//! ```
25//!
26//! ## Console Progress for CLI
27//!
28//! Use [`ConsoleProgress`] for CLI applications:
29//!
30//! ```rust,ignore
31//! use sqlitegraph::{algo::pagerank_with_progress, progress::ConsoleProgress};
32//!
33//! let progress = ConsoleProgress::new();
34//! let results = pagerank_with_progress(&graph, progress)?;
35//! // Output to stderr:
36//! // PageRank iteration 1/100...
37//! // PageRank iteration 2/100...
38//! // ...
39//! ```
40//!
41//! Note: Progress is written to **stderr** to avoid interfering with
42//! data output on stdout.
43//!
44//! # ProgressCallback Trait
45//!
46//! The [`ProgressCallback`] trait defines the interface for progress reporting:
47//!
48//! ## Thread Safety
49//!
50//! All implementations must be `Send + Sync` for thread-safe use:
51//!
52//! ```rust,ignore
53//! use std::sync::Arc;
54//! use sqlitegraph::progress::ConsoleProgress;
55//!
56//! let progress = Arc::new(ConsoleProgress::new());
57//! // Safe to share across threads
58//! ```
59//!
60//! ## Callback Methods
61//!
62//! - **`on_progress(current, total, message)`**: Called repeatedly during operation
63//! - **`on_complete()`**: Called exactly once on success
64//! - **`on_error(error)`**: Called exactly once on failure
65//!
66//! # Implementations
67//!
68//! ## NoProgress
69//!
70//! Zero-overhead no-op implementation:
71//!
72//! - **Cost**: Zero (all methods are `#[inline]` no-ops)
73//! - **Use case**: Library code, batch processing, tests
74//! - **Output**: None
75//!
76//! ```rust,ignore
77//! let progress = NoProgress;
78//! progress.on_progress(50, Some(100), "Processing"); // Does nothing
79//! ```
80//!
81//! ## ConsoleProgress
82//!
83//! CLI-friendly stderr output:
84//!
85//! - **Cost**: Minimal (formatted write to stderr)
86//! - **Use case**: Interactive CLI applications
87//! - **Output**: `Message [current/total]` or `Message: current`
88//!
89//! ```rust,ignore
90//! let console = ConsoleProgress::new();
91//! console.on_progress(5, Some(10), "Processing");
92//! // Output: Processing [5/10]
93//!
94//! console.on_progress(5, None, "Processing");
95//! // Output: Processing: 5
96//! ```
97//!
98//! ## ProgressState
99//!
100//! Throttled wrapper to avoid spam:
101//!
102//! - **Cost**: Minimal (time-checked throttling)
103//! - **Use case**: High-frequency progress updates
104//! - **Behavior**: Only calls underlying callback every N milliseconds
105//!
106//! ```rust,ignore
107//! use std::time::Duration;
108//! use sqlitegraph::progress::ProgressState;
109//!
110//! let base = ConsoleProgress::new();
111//! let throttled = ProgressState::new(base, Duration::from_millis(100));
112//!
113//! // Only outputs every 100ms, even if called more frequently
114//! for i in 0..1000 {
115//!     throttled.on_progress(i, Some(1000), "Processing");
116//! }
117//! ```
118//!
119//! # Progress Throttling
120//!
121//! High-frequency progress updates can cause performance issues and output spam.
122//! [`ProgressState`] addresses this with **time-based throttling**:
123//!
124//! ## Throttling Behavior
125//!
126//! - **Minimum interval**: Configurable (default 100ms)
127//! - **First call**: Always executes
128//! - **Subsequent calls**: Only if `now - last_call >= min_interval`
129//! - **Completion**: Always calls `on_complete()` (not throttled)
130//! - **Errors**: Always calls `on_error()` (not throttled)
131//!
132//! ## Why Throttle?
133//!
134//! - **Performance**: Avoid excessive I/O from rapid updates
135//! - **UX**: Prevent unreadable rapid-fire output
136//! - **LLM-friendly**: Provide summarized progress for AI consumption
137//!
138//! # Using with Algorithms
139//!
140//! Progress-tracking variants are available for long-running algorithms:
141//!
142//! ```rust,ignore
143//! use sqlitegraph::{
144//!     algo::{pagerank_with_progress, louvain_communities_with_progress},
145//!     progress::ConsoleProgress
146//! };
147//!
148//! let progress = ConsoleProgress::new();
149//!
150//! // PageRank with progress
151//! let rankings = pagerank_with_progress(&graph, progress.clone())?;
152//!
153//! // Louvain with progress
154//! let communities = louvain_communities_with_progress(&graph, progress)?;
155//! ```
156//!
157//! Available `_with_progress` variants:
158//! - [`pagerank_with_progress`](crate::algo::pagerank_with_progress)
159//! - [`betweenness_centrality_with_progress`](crate::algo::betweenness_centrality_with_progress)
160//! - [`louvain_communities_with_progress`](crate::algo::louvain_communities_with_progress)
161//!
162//! # Custom Implementations
163//!
164//! Implement [`ProgressCallback`] for custom behavior:
165//!
166//! ```rust,ignore
167//! use sqlitegraph::progress::ProgressCallback;
168//!
169//! struct CustomProgress {
170//!     start_time: std::time::Instant,
171//! }
172//!
173//! impl ProgressCallback for CustomProgress {
174//!     fn on_progress(&self, current: usize, total: Option<usize>, message: &str) {
175//!         let elapsed = self.start_time.elapsed().as_secs_f64();
176//!         match total {
177//!             Some(total) => {
178//!                 let percent = (current as f64 / total as f64) * 100.0;
179//!                 println!("{}: {:.1}% ({:.2}s elapsed)", message, percent, elapsed);
180//!             }
181//!             None => {
182//!                 println!("{}: {} ({:.2}s elapsed)", message, current, elapsed);
183//!             }
184//!         }
185//!     }
186//!
187//!     fn on_complete(&self) {
188//!         let elapsed = self.start_time.elapsed().as_secs_f64();
189//!         println!("Complete in {:.2}s", elapsed);
190//!     }
191//!
192//!     fn on_error(&self, error: &dyn std::error::Error) {
193//!         eprintln!("Error: {}", error);
194//!     }
195//! }
196//! ```
197
198use std::sync::Mutex;
199use std::time::{Duration, Instant};
200
201/// Callback trait for progress reporting on long-running operations.
202///
203/// This trait allows algorithms to report progress updates during execution,
204/// enabling user feedback and LLM visibility into operation status.
205///
206/// # Thread Safety
207///
208/// All methods are thread-safe (require `Send + Sync`), allowing progress
209/// callbacks to be shared across threads if needed.
210///
211/// # Example
212///
213/// ```rust
214/// use sqlitegraph::progress::ProgressCallback;
215///
216/// struct MyCallback {
217///     // Your fields here
218/// }
219///
220/// impl ProgressCallback for MyCallback {
221///     fn on_progress(&self, current: usize, total: Option<usize>, message: &str) {
222///         // Handle progress update
223///     }
224///
225///     fn on_complete(&self) {
226///         // Handle completion
227///     }
228///
229///     fn on_error(&self, error: &dyn std::error::Error) {
230///         // Handle error
231///     }
232/// }
233/// ```
234pub trait ProgressCallback: Send + Sync {
235    /// Called when progress is made.
236    ///
237    /// # Parameters
238    /// - `current`: Current step or item being processed
239    /// - `total`: Total number of steps (if known), `None` for indeterminate operations
240    /// - `message`: Human-readable progress message
241    ///
242    /// # Example
243    ///
244    /// ```rust
245    /// # use sqlitegraph::progress::ProgressCallback;
246    /// # struct MyCallback;
247    /// # impl ProgressCallback for MyCallback {
248    /// fn on_progress(&self, current: usize, total: Option<usize>, message: &str) {
249    ///     match total {
250    ///         Some(total) => println!("{}: {}/{}", message, current, total),
251    ///         None => println!("{}: {}", message, current),
252    ///     }
253    /// }
254    /// # fn on_complete(&self) {}
255    /// # fn on_error(&self, _: &dyn std::error::Error) {}
256    /// # }
257    /// ```
258    fn on_progress(&self, current: usize, total: Option<usize>, message: &str);
259
260    /// Called when the operation completes successfully.
261    ///
262    /// This is called exactly once if no errors occur.
263    fn on_complete(&self);
264
265    /// Called when the operation encounters an error.
266    ///
267    /// # Parameters
268    /// - `error`: The error that caused the operation to fail
269    ///
270    /// This is called exactly once if an error occurs, and `on_complete` will not be called.
271    fn on_error(&self, error: &dyn std::error::Error);
272}
273
274/// No-op progress callback (default implementation).
275///
276/// This implementation does nothing, allowing progress-based APIs
277/// to have zero overhead when progress reporting is not needed.
278///
279/// # Example
280///
281/// ```rust
282/// use sqlitegraph::progress::{NoProgress, ProgressCallback};
283///
284/// let progress = NoProgress;
285/// progress.on_progress(5, Some(10), "Processing..."); // Does nothing
286/// progress.on_complete(); // Does nothing
287/// ```
288#[derive(Debug, Clone, Copy, Default)]
289pub struct NoProgress;
290
291impl ProgressCallback for NoProgress {
292    #[inline]
293    fn on_progress(&self, _current: usize, _total: Option<usize>, _message: &str) {
294        // No-op
295    }
296
297    #[inline]
298    fn on_complete(&self) {
299        // No-op
300    }
301
302    #[inline]
303    fn on_error(&self, _error: &dyn std::error::Error) {
304        // No-op
305    }
306}
307
308/// Console progress reporter for CLI use.
309///
310/// Prints progress updates to stderr, making it suitable for CLI applications
311/// where stdout may be used for data output.
312///
313/// # Example
314///
315/// ```rust
316/// use sqlitegraph::progress::{ConsoleProgress, ProgressCallback};
317///
318/// let console = ConsoleProgress::new();
319/// console.on_progress(5, Some(10), "Processing");
320/// // Output to stderr: Processing [5/10]
321/// ```
322#[derive(Debug)]
323pub struct ConsoleProgress {
324    // Not strictly needed for Mutex, but provides future flexibility
325    // for potential shared state across threads
326    _private: (),
327}
328
329impl ConsoleProgress {
330    /// Creates a new console progress reporter.
331    ///
332    /// # Example
333    ///
334    /// ```rust
335    /// use sqlitegraph::progress::ConsoleProgress;
336    ///
337    /// let console = ConsoleProgress::new();
338    /// ```
339    #[inline]
340    pub fn new() -> Self {
341        Self { _private: () }
342    }
343}
344
345impl Default for ConsoleProgress {
346    #[inline]
347    fn default() -> Self {
348        Self::new()
349    }
350}
351
352impl ProgressCallback for ConsoleProgress {
353    fn on_progress(&self, current: usize, total: Option<usize>, message: &str) {
354        match total {
355            Some(total) => eprintln!("{} [{}/{}]", message, current, total),
356            None => eprintln!("{} [{}]", message, current),
357        }
358    }
359
360    fn on_complete(&self) {
361        eprintln!("Complete");
362    }
363
364    fn on_error(&self, error: &dyn std::error::Error) {
365        eprintln!("Error: {}", error);
366    }
367}
368
369/// Helper wrapper for throttling progress callback frequency.
370///
371/// Some operations progress very quickly (e.g., processing thousands of items),
372/// and reporting progress on every item would overwhelm the callback and impact
373/// performance. This wrapper enforces a minimum time between updates.
374///
375/// # Example
376///
377/// ```rust
378/// use sqlitegraph::progress::{ProgressCallback, ProgressState, NoProgress};
379/// use std::time::Duration;
380///
381/// let inner = NoProgress;
382/// let mut progress = ProgressState::new(&inner, Duration::from_millis(100));
383///
384/// // Only reports if at least 100ms has passed since last report
385/// progress.update(5, Some(10), "Processing");
386/// progress.update(6, Some(10), "Processing"); // May be skipped
387/// ```
388#[derive(Debug)]
389pub struct ProgressState<'a, F>
390where
391    F: ProgressCallback + ?Sized,
392{
393    callback: &'a F,
394    interval: Duration,
395    last_update: Mutex<Instant>,
396}
397
398impl<'a, F> ProgressState<'a, F>
399where
400    F: ProgressCallback + ?Sized,
401{
402    /// Creates a new progress state wrapper.
403    ///
404    /// # Parameters
405    /// - `callback`: The underlying progress callback to wrap
406    /// - `interval`: Minimum time between progress updates
407    ///
408    /// # Example
409    ///
410    /// ```rust
411    /// use sqlitegraph::progress::{ProgressCallback, ProgressState, NoProgress};
412    /// use std::time::Duration;
413    ///
414    /// let callback = NoProgress;
415    /// let progress = ProgressState::new(&callback, Duration::from_millis(100));
416    /// ```
417    #[inline]
418    pub fn new(callback: &'a F, interval: Duration) -> Self {
419        Self {
420            callback,
421            interval,
422            last_update: Mutex::new(Instant::now() - interval), // Allow immediate first update
423        }
424    }
425
426    /// Updates progress, but only if the minimum interval has elapsed.
427    ///
428    /// # Parameters
429    /// - `current`: Current step or item being processed
430    /// - `total`: Total number of steps (if known)
431    /// - `message`: Human-readable progress message
432    ///
433    /// # Example
434    ///
435    /// ```rust
436    /// # use sqlitegraph::progress::{ProgressCallback, ProgressState, NoProgress};
437    /// # use std::time::Duration;
438    /// # let callback = NoProgress;
439    /// # let mut progress = ProgressState::new(&callback, Duration::from_millis(100));
440    /// // Returns immediately if interval hasn't elapsed
441    /// progress.update(50, Some(100), "Processing");
442    /// ```
443    pub fn update(&mut self, current: usize, total: Option<usize>, message: &str) {
444        let mut last_update = match self.last_update.lock() {
445            Ok(guard) => guard,
446            Err(_) => return, // Mutex poisoned - skip update
447        };
448
449        let now = Instant::now();
450
451        if now.duration_since(*last_update) >= self.interval {
452            self.callback.on_progress(current, total, message);
453            *last_update = now;
454        }
455    }
456
457    /// Forces an immediate progress update, bypassing the throttling logic.
458    ///
459    /// Use this for important milestones (e.g., completion) that should
460    /// always be reported regardless of timing.
461    ///
462    /// # Example
463    ///
464    /// ```rust
465    /// # use sqlitegraph::progress::{ProgressCallback, ProgressState, NoProgress};
466    /// # use std::time::Duration;
467    /// # let callback = NoProgress;
468    /// # let mut progress = ProgressState::new(&callback, Duration::from_secs(10));
469    /// // Always report the final update, even if interval hasn't elapsed
470    /// progress.force_update(100, Some(100), "Complete");
471    /// ```
472    #[inline]
473    pub fn force_update(&mut self, current: usize, total: Option<usize>, message: &str) {
474        self.callback.on_progress(current, total, message);
475        if let Ok(mut last_update) = self.last_update.lock() {
476            *last_update = Instant::now();
477        }
478    }
479
480    /// Returns the configured update interval.
481    ///
482    /// # Example
483    ///
484    /// ```rust
485    /// # use sqlitegraph::progress::{ProgressState, NoProgress};
486    /// # use std::time::Duration;
487    /// # let callback = NoProgress;
488    /// # let progress = ProgressState::new(&callback, Duration::from_millis(100));
489    /// let interval = progress.update_interval();
490    /// assert_eq!(interval, Duration::from_millis(100));
491    /// ```
492    #[inline]
493    pub fn update_interval(&self) -> Duration {
494        self.interval
495    }
496}
497
498#[cfg(test)]
499mod tests {
500    use super::*;
501    use std::sync::atomic::{AtomicUsize, Ordering};
502
503    #[derive(Debug)]
504    struct TestCallback {
505        progress_count: AtomicUsize,
506        complete_count: AtomicUsize,
507        error_count: AtomicUsize,
508    }
509
510    impl TestCallback {
511        fn new() -> Self {
512            Self {
513                progress_count: AtomicUsize::new(0),
514                complete_count: AtomicUsize::new(0),
515                error_count: AtomicUsize::new(0),
516            }
517        }
518
519        fn progress_count(&self) -> usize {
520            self.progress_count.load(Ordering::SeqCst)
521        }
522
523        fn complete_count(&self) -> usize {
524            self.complete_count.load(Ordering::SeqCst)
525        }
526
527        fn error_count(&self) -> usize {
528            self.error_count.load(Ordering::SeqCst)
529        }
530    }
531
532    impl ProgressCallback for TestCallback {
533        fn on_progress(&self, _current: usize, _total: Option<usize>, _message: &str) {
534            self.progress_count.fetch_add(1, Ordering::SeqCst);
535        }
536
537        fn on_complete(&self) {
538            self.complete_count.fetch_add(1, Ordering::SeqCst);
539        }
540
541        fn on_error(&self, _error: &dyn std::error::Error) {
542            self.error_count.fetch_add(1, Ordering::SeqCst);
543        }
544    }
545
546    #[test]
547    fn test_no_progress_is_no_op() {
548        let progress = NoProgress;
549        // Should not panic or do anything
550        progress.on_progress(5, Some(10), "Test");
551        progress.on_complete();
552        progress.on_error(&std::io::Error::other("test"));
553    }
554
555    #[test]
556    fn test_callback_invocation() {
557        let callback = TestCallback::new();
558
559        assert_eq!(callback.progress_count(), 0);
560        assert_eq!(callback.complete_count(), 0);
561        assert_eq!(callback.error_count(), 0);
562
563        callback.on_progress(1, Some(10), "Test 1");
564        callback.on_progress(2, Some(10), "Test 2");
565        callback.on_complete();
566
567        assert_eq!(callback.progress_count(), 2);
568        assert_eq!(callback.complete_count(), 1);
569        assert_eq!(callback.error_count(), 0);
570    }
571
572    #[test]
573    fn test_error_invocation() {
574        let callback = TestCallback::new();
575
576        let error = std::io::Error::other("test error");
577        callback.on_error(&error);
578
579        assert_eq!(callback.progress_count(), 0);
580        assert_eq!(callback.complete_count(), 0);
581        assert_eq!(callback.error_count(), 1);
582    }
583
584    #[test]
585    fn test_progress_state_throttling() {
586        let callback = TestCallback::new();
587        let interval = Duration::from_millis(50);
588        let mut progress = ProgressState::new(&callback, interval);
589
590        // First update should always succeed (last_update is initialized in the past)
591        progress.update(1, Some(10), "Test 1");
592        assert_eq!(callback.progress_count(), 1);
593
594        // Immediate second update should be throttled
595        progress.update(2, Some(10), "Test 2");
596        assert_eq!(callback.progress_count(), 1); // Still 1
597
598        // Force update should bypass throttling
599        progress.force_update(3, Some(10), "Test 3");
600        assert_eq!(callback.progress_count(), 2); // Now 2
601
602        // Wait for interval to elapse
603        std::thread::sleep(interval);
604
605        // Next update should succeed
606        progress.update(4, Some(10), "Test 4");
607        assert_eq!(callback.progress_count(), 3); // Now 3
608    }
609
610    #[test]
611    fn test_progress_state_update_interval() {
612        let callback = NoProgress;
613        let interval = Duration::from_millis(100);
614        let progress = ProgressState::new(&callback, interval);
615
616        assert_eq!(progress.update_interval(), interval);
617    }
618
619    #[test]
620    fn test_console_progress_default() {
621        let console = ConsoleProgress::default();
622        // Just verify it compiles and doesn't panic
623        console.on_progress(5, Some(10), "Test");
624        console.on_complete();
625    }
626}