sqlitegraph/progress.rs
1//! Progress tracking for long-running operations.
2//!
3//! This module provides callback-based progress reporting for algorithms
4//! that may take significant time to execute (PageRank, betweenness centrality, etc.).
5//!
6//! # Key Types
7//!
8//! - [`ProgressCallback`] - Trait for progress reporting
9//! - [`NoProgress`] - Zero-overhead no-op implementation (default)
10//! - [`ConsoleProgress`] - CLI-friendly stderr output
11//! - [`ProgressState`] - Throttled wrapper to avoid spam
12//!
13//! # Usage Patterns
14//!
15//! ## Zero-Overhead Progress (Default)
16//!
17//! Use [`NoProgress`] when you don't need progress reporting:
18//!
19//! ```rust,ignore
20//! use sqlitegraph::{algo::pagerank, progress::NoProgress};
21//!
22//! let results = pagerank(&graph)?;
23//! // No progress output, zero overhead
24//! ```
25//!
26//! ## Console Progress for CLI
27//!
28//! Use [`ConsoleProgress`] for CLI applications:
29//!
30//! ```rust,ignore
31//! use sqlitegraph::{algo::pagerank_with_progress, progress::ConsoleProgress};
32//!
33//! let progress = ConsoleProgress::new();
34//! let results = pagerank_with_progress(&graph, progress)?;
35//! // Output to stderr:
36//! // PageRank iteration 1/100...
37//! // PageRank iteration 2/100...
38//! // ...
39//! ```
40//!
41//! Note: Progress is written to **stderr** to avoid interfering with
42//! data output on stdout.
43//!
44//! # ProgressCallback Trait
45//!
46//! The [`ProgressCallback`] trait defines the interface for progress reporting:
47//!
48//! ## Thread Safety
49//!
50//! All implementations must be `Send + Sync` for thread-safe use:
51//!
52//! ```rust,ignore
53//! use std::sync::Arc;
54//! use sqlitegraph::progress::ConsoleProgress;
55//!
56//! let progress = Arc::new(ConsoleProgress::new());
57//! // Safe to share across threads
58//! ```
59//!
60//! ## Callback Methods
61//!
62//! - **`on_progress(current, total, message)`**: Called repeatedly during operation
63//! - **`on_complete()`**: Called exactly once on success
64//! - **`on_error(error)`**: Called exactly once on failure
65//!
66//! # Implementations
67//!
68//! ## NoProgress
69//!
70//! Zero-overhead no-op implementation:
71//!
72//! - **Cost**: Zero (all methods are `#[inline]` no-ops)
73//! - **Use case**: Library code, batch processing, tests
74//! - **Output**: None
75//!
76//! ```rust,ignore
77//! let progress = NoProgress;
78//! progress.on_progress(50, Some(100), "Processing"); // Does nothing
79//! ```
80//!
81//! ## ConsoleProgress
82//!
83//! CLI-friendly stderr output:
84//!
85//! - **Cost**: Minimal (formatted write to stderr)
86//! - **Use case**: Interactive CLI applications
//! - **Output**: `Message [current/total]` when the total is known, otherwise `Message [current]`
//!
//! ```rust,ignore
//! let console = ConsoleProgress::new();
//! console.on_progress(5, Some(10), "Processing");
//! // Output: Processing [5/10]
//!
//! console.on_progress(5, None, "Processing");
//! // Output: Processing [5]
96//! ```
97//!
98//! ## ProgressState
99//!
100//! Throttled wrapper to avoid spam:
101//!
102//! - **Cost**: Minimal (time-checked throttling)
103//! - **Use case**: High-frequency progress updates
//! - **Behavior**: Forwards to the wrapped callback at most once per configured interval
//!
//! ```rust,ignore
//! use std::time::Duration;
//! use sqlitegraph::progress::{ConsoleProgress, ProgressState};
//!
//! let base = ConsoleProgress::new();
//! let mut throttled = ProgressState::new(&base, Duration::from_millis(100));
//!
//! // Forwards at most one update per 100ms, even if called more frequently
//! for i in 0..1000 {
//!     throttled.update(i, Some(1000), "Processing");
//! }
//! ```
118//!
119//! # Progress Throttling
120//!
121//! High-frequency progress updates can cause performance issues and output spam.
122//! [`ProgressState`] addresses this with **time-based throttling**:
123//!
124//! ## Throttling Behavior
125//!
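//! - **Minimum interval**: Chosen by the caller when constructing the wrapper (`ProgressState::new(&callback, interval)`); there is no built-in default
//! - **First call**: Always forwarded
//! - **Subsequent calls**: Forwarded only if at least `interval` has elapsed since the last forwarded update
//! - **Forced updates**: `force_update()` always forwards, bypassing the interval (use it for final or milestone updates)
//! - **Completion / errors**: Never throttled; report them with `on_complete()` / `on_error()` on the underlying callback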
131//!
132//! ## Why Throttle?
133//!
134//! - **Performance**: Avoid excessive I/O from rapid updates
135//! - **UX**: Prevent unreadable rapid-fire output
136//! - **LLM-friendly**: Provide summarized progress for AI consumption
137//!
138//! # Using with Algorithms
139//!
140//! Progress-tracking variants are available for long-running algorithms:
141//!
142//! ```rust,ignore
//! use sqlitegraph::{
//!     algo::{pagerank_with_progress, louvain_communities_with_progress},
//!     progress::ConsoleProgress,
//! };
//!
//! // PageRank with progress
//! let rankings = pagerank_with_progress(&graph, ConsoleProgress::new())?;
//!
//! // Louvain with progress
//! let communities = louvain_communities_with_progress(&graph, ConsoleProgress::new())?;
155//! ```
156//!
157//! Available `_with_progress` variants:
158//! - [`pagerank_with_progress`](crate::algo::pagerank_with_progress)
159//! - [`betweenness_centrality_with_progress`](crate::algo::betweenness_centrality_with_progress)
160//! - [`louvain_communities_with_progress`](crate::algo::louvain_communities_with_progress)
161//!
162//! # Custom Implementations
163//!
//! Implement [`ProgressCallback`] for custom behavior. Like [`ConsoleProgress`],
//! this example writes to stderr so stdout stays free for data output:
//!
//! ```rust,ignore
//! use sqlitegraph::progress::ProgressCallback;
//!
//! struct CustomProgress {
//!     start_time: std::time::Instant,
//! }
//!
//! impl ProgressCallback for CustomProgress {
//!     fn on_progress(&self, current: usize, total: Option<usize>, message: &str) {
//!         let elapsed = self.start_time.elapsed().as_secs_f64();
//!         match total {
//!             // Guard against a zero total to avoid printing NaN/inf percentages
//!             Some(total) if total > 0 => {
//!                 let percent = (current as f64 / total as f64) * 100.0;
//!                 eprintln!("{}: {:.1}% ({:.2}s elapsed)", message, percent, elapsed);
//!             }
//!             _ => {
//!                 eprintln!("{}: {} ({:.2}s elapsed)", message, current, elapsed);
//!             }
//!         }
//!     }
//!
//!     fn on_complete(&self) {
//!         let elapsed = self.start_time.elapsed().as_secs_f64();
//!         eprintln!("Complete in {:.2}s", elapsed);
//!     }
//!
//!     fn on_error(&self, error: &dyn std::error::Error) {
//!         eprintln!("Error: {}", error);
//!     }
//! }
//! ```
197
198use std::sync::Mutex;
199use std::time::{Duration, Instant};
200
/// Receives status reports from long-running operations.
///
/// An operation calls [`on_progress`](Self::on_progress) any number of times
/// while it runs, then finishes with exactly one of
/// [`on_complete`](Self::on_complete) (success) or [`on_error`](Self::on_error)
/// (failure). The implementor decides what to do with those events: print,
/// log, update a UI, or nothing at all.
///
/// # Thread Safety
///
/// `Send + Sync` are supertraits, so every implementation may be shared
/// between threads. All methods take `&self`. An implementation that needs
/// mutable state must use interior mutability (atomics, `Mutex`, ...).
///
/// # Example
///
/// ```rust
/// use sqlitegraph::progress::ProgressCallback;
///
/// struct MyCallback {
///     // Your fields here
/// }
///
/// impl ProgressCallback for MyCallback {
///     fn on_progress(&self, _current: usize, _total: Option<usize>, _message: &str) {
///         // React to a progress report
///     }
///
///     fn on_complete(&self) {
///         // React to successful completion
///     }
///
///     fn on_error(&self, _error: &dyn std::error::Error) {
///         // React to the failure
///     }
/// }
/// ```
pub trait ProgressCallback: Send + Sync {
    /// Reports that the operation has advanced.
    ///
    /// # Parameters
    /// - `current`: The step or item currently being processed
    /// - `total`: The total number of steps when known; `None` for operations
    ///   whose length cannot be determined up front
    /// - `message`: A human-readable description of the current work
    ///
    /// # Example
    ///
    /// ```rust
    /// # use sqlitegraph::progress::ProgressCallback;
    /// # struct MyCallback;
    /// # impl ProgressCallback for MyCallback {
    /// fn on_progress(&self, current: usize, total: Option<usize>, message: &str) {
    ///     if let Some(total) = total {
    ///         println!("{}: {}/{}", message, current, total);
    ///     } else {
    ///         println!("{}: {}", message, current);
    ///     }
    /// }
    /// # fn on_complete(&self) {}
    /// # fn on_error(&self, _: &dyn std::error::Error) {}
    /// # }
    /// ```
    fn on_progress(&self, current: usize, total: Option<usize>, message: &str);

    /// Signals that the operation finished successfully.
    ///
    /// Invoked exactly once when no error occurred.
    fn on_complete(&self);

    /// Signals that the operation failed.
    ///
    /// # Parameters
    /// - `error`: The error that ended the operation
    ///
    /// Invoked exactly once on failure; `on_complete` is not invoked in that case.
    fn on_error(&self, error: &dyn std::error::Error);
}
273
274/// No-op progress callback (default implementation).
275///
276/// This implementation does nothing, allowing progress-based APIs
277/// to have zero overhead when progress reporting is not needed.
278///
279/// # Example
280///
281/// ```rust
282/// use sqlitegraph::progress::NoProgress;
283///
284/// let progress = NoProgress;
285/// progress.on_progress(5, Some(10), "Processing..."); // Does nothing
286/// progress.on_complete(); // Does nothing
287/// ```
288#[derive(Debug, Clone, Copy, Default)]
289pub struct NoProgress;
290
291impl ProgressCallback for NoProgress {
292 #[inline]
293 fn on_progress(&self, _current: usize, _total: Option<usize>, _message: &str) {
294 // No-op
295 }
296
297 #[inline]
298 fn on_complete(&self) {
299 // No-op
300 }
301
302 #[inline]
303 fn on_error(&self, _error: &dyn std::error::Error) {
304 // No-op
305 }
306}
307
308/// Console progress reporter for CLI use.
309///
310/// Prints progress updates to stderr, making it suitable for CLI applications
311/// where stdout may be used for data output.
312///
313/// # Example
314///
315/// ```rust
316/// use sqlitegraph::progress::ConsoleProgress;
317///
318/// let console = ConsoleProgress::new();
319/// console.on_progress(5, Some(10), "Processing");
320/// // Output to stderr: Processing [5/10]
321/// ```
322#[derive(Debug)]
323pub struct ConsoleProgress {
324 // Not strictly needed for Mutex, but provides future flexibility
325 // for potential shared state across threads
326 _private: (),
327}
328
329impl ConsoleProgress {
330 /// Creates a new console progress reporter.
331 ///
332 /// # Example
333 ///
334 /// ```rust
335 /// use sqlitegraph::progress::ConsoleProgress;
336 ///
337 /// let console = ConsoleProgress::new();
338 /// ```
339 #[inline]
340 pub fn new() -> Self {
341 Self { _private: () }
342 }
343}
344
345impl Default for ConsoleProgress {
346 #[inline]
347 fn default() -> Self {
348 Self::new()
349 }
350}
351
352impl ProgressCallback for ConsoleProgress {
353 fn on_progress(&self, current: usize, total: Option<usize>, message: &str) {
354 match total {
355 Some(total) => eprintln!("{} [{}/{}]", message, current, total),
356 None => eprintln!("{} [{}]", message, current),
357 }
358 }
359
360 fn on_complete(&self) {
361 eprintln!("Complete");
362 }
363
364 fn on_error(&self, error: &dyn std::error::Error) {
365 eprintln!("Error: {}", error);
366 }
367}
368
369/// Helper wrapper for throttling progress callback frequency.
370///
371/// Some operations progress very quickly (e.g., processing thousands of items),
372/// and reporting progress on every item would overwhelm the callback and impact
373/// performance. This wrapper enforces a minimum time between updates.
374///
375/// # Example
376///
377/// ```rust
378/// use sqlitegraph::progress::{ProgressCallback, ProgressState, NoProgress};
379/// use std::time::Duration;
380///
381/// let inner = NoProgress;
382/// let mut progress = ProgressState::new(&inner, Duration::from_millis(100));
383///
384/// // Only reports if at least 100ms has passed since last report
385/// progress.update(5, Some(10), "Processing")?;
386/// progress.update(6, Some(10), "Processing")?; // May be skipped
387/// # Ok::<(), Box<dyn std::error::Error>>(())
388/// ```
389#[derive(Debug)]
390pub struct ProgressState<'a, F>
391where
392 F: ProgressCallback + ?Sized,
393{
394 callback: &'a F,
395 interval: Duration,
396 last_update: Mutex<Instant>,
397}
398
399impl<'a, F> ProgressState<'a, F>
400where
401 F: ProgressCallback + ?Sized,
402{
403 /// Creates a new progress state wrapper.
404 ///
405 /// # Parameters
406 /// - `callback`: The underlying progress callback to wrap
407 /// - `interval`: Minimum time between progress updates
408 ///
409 /// # Example
410 ///
411 /// ```rust
412 /// use sqlitegraph::progress::{ProgressCallback, ProgressState, NoProgress};
413 /// use std::time::Duration;
414 ///
415 /// let callback = NoProgress;
416 /// let progress = ProgressState::new(&callback, Duration::from_millis(100));
417 /// ```
418 #[inline]
419 pub fn new(callback: &'a F, interval: Duration) -> Self {
420 Self {
421 callback,
422 interval,
423 last_update: Mutex::new(Instant::now() - interval), // Allow immediate first update
424 }
425 }
426
427 /// Updates progress, but only if the minimum interval has elapsed.
428 ///
429 /// # Parameters
430 /// - `current`: Current step or item being processed
431 /// - `total`: Total number of steps (if known)
432 /// - `message`: Human-readable progress message
433 ///
434 /// # Example
435 ///
436 /// ```rust
437 /// # use sqlitegraph::progress::{ProgressCallback, ProgressState, NoProgress};
438 /// # use std::time::Duration;
439 /// # let callback = NoProgress;
440 /// # let mut progress = ProgressState::new(&callback, Duration::from_millis(100));
441 /// // Returns immediately if interval hasn't elapsed
442 /// progress.update(50, Some(100), "Processing");
443 /// ```
444 pub fn update(&mut self, current: usize, total: Option<usize>, message: &str) {
445 let mut last_update = match self.last_update.lock() {
446 Ok(guard) => guard,
447 Err(_) => return, // Mutex poisoned - skip update
448 };
449
450 let now = Instant::now();
451
452 if now.duration_since(*last_update) >= self.interval {
453 self.callback.on_progress(current, total, message);
454 *last_update = now;
455 }
456 }
457
458 /// Forces an immediate progress update, bypassing the throttling logic.
459 ///
460 /// Use this for important milestones (e.g., completion) that should
461 /// always be reported regardless of timing.
462 ///
463 /// # Example
464 ///
465 /// ```rust
466 /// # use sqlitegraph::progress::{ProgressCallback, ProgressState, NoProgress};
467 /// # use std::time::Duration;
468 /// # let callback = NoProgress;
469 /// # let mut progress = ProgressState::new(&callback, Duration::from_secs(10));
470 /// // Always report the final update, even if interval hasn't elapsed
471 /// progress.force_update(100, Some(100), "Complete");
472 /// ```
473 #[inline]
474 pub fn force_update(&mut self, current: usize, total: Option<usize>, message: &str) {
475 self.callback.on_progress(current, total, message);
476 if let Ok(mut last_update) = self.last_update.lock() {
477 *last_update = Instant::now();
478 }
479 }
480
481 /// Returns the configured update interval.
482 ///
483 /// # Example
484 ///
485 /// ```rust
486 /// # use sqlitegraph::progress::{ProgressState, NoProgress};
487 /// # use std::time::Duration;
488 /// # let callback = NoProgress;
489 /// # let progress = ProgressState::new(&callback, Duration::from_millis(100));
490 /// let interval = progress.update_interval();
491 /// assert_eq!(interval, Duration::from_millis(100));
492 /// ```
493 #[inline]
494 pub fn update_interval(&self) -> Duration {
495 self.interval
496 }
497}
498
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test double that counts how often each callback method is invoked.
    #[derive(Debug)]
    struct TestCallback {
        progress_count: AtomicUsize,
        complete_count: AtomicUsize,
        error_count: AtomicUsize,
    }

    impl TestCallback {
        fn new() -> Self {
            Self {
                progress_count: AtomicUsize::new(0),
                complete_count: AtomicUsize::new(0),
                error_count: AtomicUsize::new(0),
            }
        }

        fn progress_count(&self) -> usize {
            self.progress_count.load(Ordering::SeqCst)
        }

        fn complete_count(&self) -> usize {
            self.complete_count.load(Ordering::SeqCst)
        }

        fn error_count(&self) -> usize {
            self.error_count.load(Ordering::SeqCst)
        }
    }

    impl ProgressCallback for TestCallback {
        fn on_progress(&self, _current: usize, _total: Option<usize>, _message: &str) {
            self.progress_count.fetch_add(1, Ordering::SeqCst);
        }

        fn on_complete(&self) {
            self.complete_count.fetch_add(1, Ordering::SeqCst);
        }

        fn on_error(&self, _error: &dyn std::error::Error) {
            self.error_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn test_no_progress_is_no_op() {
        let progress = NoProgress;
        // Should not panic or do anything
        progress.on_progress(5, Some(10), "Test");
        progress.on_complete();
        progress.on_error(&std::io::Error::new(std::io::ErrorKind::Other, "test"));
    }

    #[test]
    fn test_callback_invocation() {
        let callback = TestCallback::new();

        assert_eq!(callback.progress_count(), 0);
        assert_eq!(callback.complete_count(), 0);
        assert_eq!(callback.error_count(), 0);

        callback.on_progress(1, Some(10), "Test 1");
        callback.on_progress(2, Some(10), "Test 2");
        callback.on_complete();

        assert_eq!(callback.progress_count(), 2);
        assert_eq!(callback.complete_count(), 1);
        assert_eq!(callback.error_count(), 0);
    }

    #[test]
    fn test_error_invocation() {
        let callback = TestCallback::new();

        let error = std::io::Error::new(std::io::ErrorKind::Other, "test error");
        callback.on_error(&error);

        assert_eq!(callback.progress_count(), 0);
        assert_eq!(callback.complete_count(), 0);
        assert_eq!(callback.error_count(), 1);
    }

    #[test]
    fn test_progress_state_throttling() {
        // A generous interval keeps the "throttled" assertions deterministic even
        // if the test thread is descheduled between calls on a loaded machine.
        let callback = TestCallback::new();
        let mut progress = ProgressState::new(&callback, Duration::from_secs(10));

        // First update is always forwarded
        progress.update(1, Some(10), "Test 1");
        assert_eq!(callback.progress_count(), 1);

        // Immediate second update is throttled
        progress.update(2, Some(10), "Test 2");
        assert_eq!(callback.progress_count(), 1);

        // Force update bypasses throttling
        progress.force_update(3, Some(10), "Test 3");
        assert_eq!(callback.progress_count(), 2);

        // force_update restarts the window, so a plain update right after is throttled
        progress.update(4, Some(10), "Test 4");
        assert_eq!(callback.progress_count(), 2);
    }

    #[test]
    fn test_progress_state_update_after_interval() {
        let callback = TestCallback::new();
        let interval = Duration::from_millis(20);
        let mut progress = ProgressState::new(&callback, interval);

        progress.update(1, Some(10), "Test 1");
        assert_eq!(callback.progress_count(), 1);

        // Wait on the same monotonic clock the wrapper uses, so at least
        // `interval` has provably elapsed since the forwarded update above.
        let wait_start = Instant::now();
        while wait_start.elapsed() < interval {
            std::thread::sleep(interval);
        }

        progress.update(2, Some(10), "Test 2");
        assert_eq!(callback.progress_count(), 2);
    }

    #[test]
    fn test_progress_state_update_interval() {
        let callback = NoProgress;
        let interval = Duration::from_millis(100);
        let progress = ProgressState::new(&callback, interval);

        assert_eq!(progress.update_interval(), interval);
    }

    #[test]
    fn test_console_progress_default() {
        let console = ConsoleProgress::default();
        // Smoke test: every method, including both progress formats, must not panic
        console.on_progress(5, Some(10), "Test");
        console.on_progress(5, None, "Test");
        console.on_complete();
        console.on_error(&std::io::Error::new(std::io::ErrorKind::Other, "test"));
    }
}