adaptive_pipeline/infrastructure/services/
progress_indicator.rs

1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8// Infrastructure module - contains future features not yet fully utilized
9#![allow(dead_code, unused_imports, unused_variables)]
10//! # Progress Indicator Service
11//!
12//! This module provides a real-time progress indicator for user feedback during
13//! pipeline processing operations. It offers immediate visual feedback to users
14//! about processing progress, separate from logging and metrics systems.
15//!
16//! ## Overview
17//!
18//! The progress indicator service provides:
19//!
20//! - **Real-Time Updates**: Live progress updates as chunks are processed
21//! - **User-Focused Feedback**: Immediate visual feedback for end users
22//! - **Terminal Integration**: Direct terminal output with in-place updates
23//! - **Thread Safety**: Concurrent-safe for multi-threaded processing
24//! - **Performance Metrics**: Throughput and timing information
25//!
26//! ## Architecture
27//!
28//! The progress indicator follows these design principles:
29//!
30//! ```text
31//! ┌─────────────────────────────────────────────────────────────────┐
32//! │                    Progress Indicator System                     │
33//! │                                                                     │
34//! │  ┌─────────────────────────────────────────────────────────┐    │
35//! │  │                 Real-Time Display                       │    │
36//! │  │  - Terminal output with in-place updates                │    │
37//! │  │  - Chunk progress tracking                              │    │
38//! │  │  - Throughput calculations                              │    │
39//! │  └─────────────────────────────────────────────────────────┘    │
40//! │                                                                     │
41//! │  ┌─────────────────────────────────────────────────────────┐    │
42//! │  │                Thread-Safe Tracking                    │    │
43//! │  │  - Atomic counters for concurrent access               │    │
44//! │  │  - Mutex coordination for terminal output              │    │
45//! │  │  - Lock-free progress updates                           │    │
46//! │  └─────────────────────────────────────────────────────────┘    │
47//! │                                                                     │
48//! │  ┌─────────────────────────────────────────────────────────┐    │
49//! │  │              Performance Monitoring                   │    │
50//! │  │  - Throughput calculation (MB/s)                       │    │
51//! │  │  - Duration tracking                                    │    │
52//! │  │  - Completion statistics                               │    │
53//! │  └─────────────────────────────────────────────────────────┘    │
54//! └─────────────────────────────────────────────────────────────────┘
55//! ```
56//!
57//! ## Design Principles
58//!
59//! ### User-Focused Design
60//!
61//! The progress indicator is designed specifically for end-user feedback:
62//!
63//! - **Immediate Feedback**: Updates appear as soon as chunks are processed
64//! - **Clear Format**: Easy-to-read progress format with chunk IDs and counts
65//! - **Non-Intrusive**: Doesn't interfere with normal application logging
66//! - **Terminal Integration**: Works seamlessly with terminal-based
67//!   applications
68//!
69//! ### Separation of Concerns
70//!
71//! Progress indication is separate from other monitoring systems:
72//!
73//! - **Not Logging**: Writes directly to terminal, bypassing logging systems
74//! - **Not Metrics**: Focused on user feedback, not system monitoring
75//! - **Real-Time**: Updates immediately, not batched or aggregated
76//! - **Ephemeral**: Progress display is temporary and contextual
77//!
78//! ### Performance Considerations
79//!
80//! - **Minimal Overhead**: Lightweight implementation to avoid performance
81//!   impact
82//! - **Atomic Operations**: Lock-free progress updates using atomic counters
83//! - **Coordinated Output**: Mutex only for terminal output coordination
84//! - **Efficient Updates**: In-place terminal updates without scrolling
85//!
86//! ## Output Format
87//!
88//! ### Progress Display
89//!
90//! The progress indicator shows real-time chunk processing status:
91//!
92//! ```text
93//! Wrote Id: 000097/Completed: 002000
94//! ```
95//!
96//! - **Wrote Id**: Last chunk ID that was processed
97//! - **Completed**: Total number of chunks completed
98//! - **Format**: Zero-padded for consistent alignment
99//!
100//! ### Completion Summary
101//!
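//! Upon completion, `show_completion` redraws the progress line one final
//! time and terminates it with a newline. A statistics summary in the style
//! shown below is not currently printed by this module (the byte count,
//! throughput, and duration arguments are accepted but unused):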
103//!
104//! ```text
105//! ✅ Processing completed successfully!
106//! 📄 Processed: 1.25 GB
107//! ⏱️ Duration: 2.34 seconds
108//! 🚀 Throughput: 534.2 MB/s
109//! 📊 Chunks: 2000 total
110//! ```
111//!
112//! ## Usage Examples
113//!
114//! ### Basic Progress Tracking
115
116//!
117//! ### Concurrent Processing
118//!
119//!
120//! ### Integration with File Processing
121
122//!
123//! ## Thread Safety
124//!
125//! The progress indicator is designed for concurrent use:
126//!
127//! ### Atomic Counters
128//!
129//! - **Completed Chunks**: Atomic counter for lock-free updates
130//! - **Last Chunk ID**: Atomic storage for the most recent chunk ID
131//! - **Performance**: No contention on progress updates
132//!
133//! ### Terminal Coordination
134//!
135//! - **Output Mutex**: Coordinates terminal output to prevent garbled display
136//! - **Minimal Locking**: Mutex only held during actual terminal writes
//! - **Throttled**: Most progress updates never touch the terminal; only the
//!   throttled redraws wait for the output mutex
138//!
139//! ## Performance Impact
140//!
141//! The progress indicator is designed to have minimal performance impact:
142//!
143//! - **Atomic Operations**: Lock-free progress updates
144//! - **Minimal Allocations**: Reuses buffers and avoids unnecessary allocations
145//! - **Efficient Terminal I/O**: Direct terminal writes without buffering
146//! - **Optional**: Can be disabled in production environments if needed
147//!
148//! ## Integration with Other Systems
149//!
150//! ### Logging System
151//!
152//! Progress indication is separate from logging:
153//!
154//! - **No Log Interference**: Doesn't interfere with structured logging
155//! - **Direct Terminal**: Writes directly to terminal, not through log handlers
156//! - **Complementary**: Works alongside logging for different purposes
157//!
158//! ### Metrics System
159//!
160//! Progress indication complements metrics collection:
161//!
162//! - **Different Purpose**: User feedback vs. system monitoring
163//! - **Real-Time**: Immediate updates vs. aggregated metrics
164//! - **Ephemeral**: Temporary display vs. persistent metrics storage
165//!
166//! ## Error Handling
167//!
168//! The progress indicator handles errors gracefully:
169//!
170//! - **Terminal Errors**: Gracefully handles terminal I/O errors
171//! - **Non-Fatal**: Progress indicator failures don't affect processing
172//! - **Fallback**: Can fall back to silent operation if terminal is unavailable
173//! - **Recovery**: Automatically recovers from transient terminal issues
174
175use std::io::{self, Write};
176use std::sync::atomic::{AtomicU64, Ordering};
177use std::sync::Arc;
178use tokio::sync::Mutex;
179use tokio::time::{Duration, Instant};
180
181/// Real-time progress indicator for user feedback during pipeline processing.
182///
183/// This provides immediate visual feedback to users about processing progress,
184/// separate from logging and metrics systems. It writes directly to the
185/// terminal with real-time updates on the same line.
186///
187/// # Design Principles
188/// - **User-Focused**: Provides immediate visual feedback for end users
189/// - **Non-Logging**: Writes directly to terminal, not through logging system
190/// - **Real-Time**: Updates as chunks are processed, not batched
191/// - **Concurrent-Safe**: Thread-safe for concurrent chunk processing
192/// - **Minimal Overhead**: Lightweight to avoid impacting performance
193///
194/// # Example Output
195/// ```text
196/// Wrote Id: 000097/Completed: 002000
197/// ```
198///
199/// # Usage
200pub struct ProgressIndicatorService {
201    /// Total number of chunks expected
202    total_chunks: u64,
203
204    /// Number of chunks completed (atomic for thread safety)
205    completed_chunks: Arc<AtomicU64>,
206
207    /// Last chunk ID written (for display)
208    last_chunk_id: Arc<AtomicU64>,
209
210    /// Mutex for terminal output coordination
211    terminal_mutex: Arc<Mutex<()>>,
212
213    /// Start time for duration calculation
214    start_time: Instant,
215
216    /// Last update time (to avoid too frequent updates)
217    last_update: Arc<Mutex<Instant>>,
218}
219
220impl ProgressIndicatorService {
221    /// Creates a new progress indicator.
222    ///
223    /// # Arguments
224    /// * `total_chunks` - Total number of chunks expected to be processed
225    ///
226    /// # Returns
227    /// * `Self` - New progress indicator instance
228    pub fn new(total_chunks: u64) -> Self {
229        // Show initial progress with blank line before
230        println!();
231        print!("\rWrote Id: 000000/Completed: {:06}", total_chunks);
232        io::stdout().flush().unwrap_or(());
233
234        Self {
235            total_chunks,
236            completed_chunks: Arc::new(AtomicU64::new(0)),
237            last_chunk_id: Arc::new(AtomicU64::new(0)),
238            terminal_mutex: Arc::new(Mutex::new(())),
239            start_time: Instant::now(),
240            last_update: Arc::new(Mutex::new(Instant::now())),
241        }
242    }
243
244    /// Updates progress when a chunk has been successfully written.
245    ///
246    /// This method is thread-safe and can be called concurrently from
247    /// multiple chunk processing tasks.
248    ///
249    /// # Arguments
250    /// * `chunk_id` - ID of the chunk that was just written
251    ///
252    /// # Performance
253    /// Updates are throttled to avoid excessive terminal I/O during
254    /// high-throughput processing.
255    pub async fn update_progress(&self, chunk_id: u64) {
256        // Update counters atomically
257        let completed = self.completed_chunks.fetch_add(1, Ordering::Relaxed) + 1;
258        self.last_chunk_id.store(chunk_id, Ordering::Relaxed);
259
260        // Throttle updates to avoid excessive terminal I/O
261        // Only update every 100ms or every 10 chunks, whichever comes first
262        let should_update = {
263            let mut last_update = self.last_update.lock().await;
264            let now = Instant::now();
265            let time_since_update = now.duration_since(*last_update);
266
267            if time_since_update >= Duration::from_millis(100) || completed.is_multiple_of(10) {
268                *last_update = now;
269                true
270            } else {
271                false
272            }
273        };
274
275        if should_update {
276            self.update_display(chunk_id, completed).await;
277        }
278    }
279
280    /// Updates the terminal display with current progress.
281    ///
282    /// This method coordinates terminal access to ensure clean output
283    /// even with concurrent chunk processing.
284    async fn update_display(&self, chunk_id: u64, completed: u64) {
285        let _terminal_lock = self.terminal_mutex.lock().await;
286
287        // Clear the current line and write new progress
288        print!("\rWrote Id: {:06}/Completed: {:06}", chunk_id, completed);
289        io::stdout().flush().unwrap_or(());
290    }
291
292    /// Shows the final completion summary.
293    ///
294    /// This replaces the progress indicator with a comprehensive summary
295    /// of the processing results.
296    ///
297    /// # Arguments
298    /// * `bytes_processed` - Total bytes processed
299    /// * `throughput_mb_s` - Processing throughput in MB/s
300    /// * `total_duration` - Total time taken for processing
301    pub async fn show_completion(&self, _bytes_processed: u64, _throughput_mb_s: f64, _total_duration: Duration) {
302        let _terminal_lock = self.terminal_mutex.lock().await;
303
304        // Clear the progress line and show final progress with correct total
305        let final_completed = self.completed_chunks.load(Ordering::Relaxed);
306        print!(
307            "\rWrote Id: {:06}/Completed: {:06}\n",
308            self.last_chunk_id.load(Ordering::Relaxed),
309            final_completed
310        );
311
312        io::stdout().flush().unwrap_or(());
313    }
314
315    /// Shows an error summary if processing fails.
316    ///
317    /// # Arguments
318    /// * `error_message` - Description of what went wrong
319    pub async fn show_error_summary(&self, error_message: &str) {
320        let _terminal_lock = self.terminal_mutex.lock().await;
321
322        // Clear the progress line and show final progress
323        let final_completed = self.completed_chunks.load(Ordering::Relaxed);
324        println!(
325            "\rWrote Id: {:06}/Completed: {:06}",
326            self.last_chunk_id.load(Ordering::Relaxed),
327            final_completed
328        );
329
330        // Show error summary with 6-digit precision
331        println!("\n✗ Processing Failed!");
332        println!("  Chunks Completed: {:06}", final_completed);
333        println!("  Total Expected:   {:06}", self.total_chunks);
334        println!("  Error:            {}", error_message);
335        println!();
336        io::stdout().flush().unwrap_or(());
337    }
338
339    /// Gets the current progress as a percentage.
340    ///
341    /// # Returns
342    /// * `f64` - Progress percentage (0.0 to 100.0)
343    pub fn progress_percentage(&self) -> f64 {
344        let completed = self.completed_chunks.load(Ordering::Relaxed);
345        if self.total_chunks > 0 {
346            ((completed as f64) / (self.total_chunks as f64)) * 100.0
347        } else {
348            0.0
349        }
350    }
351}
352
353// Explicitly implement Send and Sync for ProgressIndicatorService
354// All fields are Send + Sync (Arc<AtomicU64>, Arc<Mutex<T>>, u64, Instant)
355unsafe impl Send for ProgressIndicatorService {}
356unsafe impl Sync for ProgressIndicatorService {}
357
/// Formats bytes in human-readable format.
///
/// Uses binary multiples (1024) with the units B, KB, MB, GB and TB. Values
/// below 1024 are printed as whole bytes; larger values are printed with one
/// decimal place. Values beyond the TB range stay in TB.
///
/// # Arguments
/// * `bytes` - Number of bytes to format
///
/// # Returns
/// * `String` - Human-readable byte format (e.g., "1.5 MB")
#[allow(dead_code)]
fn format_bytes(bytes: u64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
    const THRESHOLD: f64 = 1024.0;

    // Whole bytes are printed exactly, without a fractional part.
    if bytes < 1024 {
        return format!("{} {}", bytes, UNITS[0]);
    }

    let mut size = bytes as f64;
    let mut unit_index = 0;

    while size >= THRESHOLD && unit_index < UNITS.len() - 1 {
        size /= THRESHOLD;
        unit_index += 1;
    }

    let mut text = format!("{:.1}", size);

    // A value just under the next unit (e.g. 1023.999 KB) rounds up to
    // "1024.0" when printed with one decimal. Promote it to "1.0" of the
    // next unit so the output never reads "1024.0 KB".
    if text == "1024.0" && unit_index < UNITS.len() - 1 {
        unit_index += 1;
        text = String::from("1.0");
    }

    format!("{} {}", text, UNITS[unit_index])
}
388
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration};

    /// A new indicator stores the expected total and starts with zero
    /// completed chunks.
    #[tokio::test]
    async fn test_progress_indicator_creation() {
        let indicator = ProgressIndicatorService::new(100);

        let completed = indicator.completed_chunks.load(Ordering::Relaxed);
        assert_eq!(indicator.total_chunks, 100);
        assert_eq!(completed, 0);
    }

    /// Each update increments the completed count and records the chunk ID.
    #[tokio::test]
    async fn test_chunk_update() {
        let indicator = ProgressIndicatorService::new(100);

        for chunk_id in 1..=2 {
            indicator.update_progress(chunk_id).await;
        }

        let completed = indicator.completed_chunks.load(Ordering::Relaxed);
        let last_id = indicator.last_chunk_id.load(Ordering::Relaxed);
        assert_eq!(completed, 2);
        assert_eq!(last_id, 2);
    }

    /// The percentage is zero before any update and reflects
    /// completed/total afterwards.
    #[tokio::test]
    async fn test_progress_percentage() {
        let indicator = ProgressIndicatorService::new(100);
        assert_eq!(indicator.progress_percentage(), 0.0);

        for chunk_id in 1..=2 {
            indicator.update_progress(chunk_id).await;
        }

        assert_eq!(indicator.progress_percentage(), 2.0);
    }

    /// Tests byte formatting for human-readable display.
    ///
    /// # Test Coverage
    ///
    /// - Zero bytes display as "0 B"
    /// - Values below 1 KB display as whole bytes
    /// - KB, MB and GB values display with one decimal place
    #[test]
    fn test_format_bytes() {
        let cases: [(u64, &str); 6] = [
            (0, "0 B"),
            (512, "512 B"),
            (1024, "1.0 KB"),
            (1536, "1.5 KB"),
            (1048576, "1.0 MB"),
            (1073741824, "1.0 GB"),
        ];

        for (bytes, expected) in cases {
            assert_eq!(format_bytes(bytes), expected);
        }
    }
}