adaptive_pipeline/infrastructure/services/progress_indicator.rs
1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8// Infrastructure module - contains future features not yet fully utilized
9#![allow(dead_code, unused_imports, unused_variables)]
10//! # Progress Indicator Service
11//!
12//! This module provides a real-time progress indicator for user feedback during
13//! pipeline processing operations. It offers immediate visual feedback to users
14//! about processing progress, separate from logging and metrics systems.
15//!
16//! ## Overview
17//!
18//! The progress indicator service provides:
19//!
20//! - **Real-Time Updates**: Live progress updates as chunks are processed
21//! - **User-Focused Feedback**: Immediate visual feedback for end users
22//! - **Terminal Integration**: Direct terminal output with in-place updates
23//! - **Thread Safety**: Concurrent-safe for multi-threaded processing
24//! - **Performance Metrics**: Throughput and timing information
25//!
26//! ## Architecture
27//!
28//! The progress indicator follows these design principles:
29//!
30//! ```text
31//! ┌─────────────────────────────────────────────────────────────────┐
32//! │ Progress Indicator System │
33//! │ │
34//! │ ┌─────────────────────────────────────────────────────────┐ │
35//! │ │ Real-Time Display │ │
36//! │ │ - Terminal output with in-place updates │ │
37//! │ │ - Chunk progress tracking │ │
38//! │ │ - Throughput calculations │ │
39//! │ └─────────────────────────────────────────────────────────┘ │
40//! │ │
41//! │ ┌─────────────────────────────────────────────────────────┐ │
42//! │ │ Thread-Safe Tracking │ │
43//! │ │ - Atomic counters for concurrent access │ │
44//! │ │ - Mutex coordination for terminal output │ │
45//! │ │ - Lock-free progress updates │ │
46//! │ └─────────────────────────────────────────────────────────┘ │
47//! │ │
48//! │ ┌─────────────────────────────────────────────────────────┐ │
49//! │ │ Performance Monitoring │ │
50//! │ │ - Throughput calculation (MB/s) │ │
51//! │ │ - Duration tracking │ │
52//! │ │ - Completion statistics │ │
53//! │ └─────────────────────────────────────────────────────────┘ │
54//! └─────────────────────────────────────────────────────────────────┘
55//! ```
56//!
57//! ## Design Principles
58//!
59//! ### User-Focused Design
60//!
61//! The progress indicator is designed specifically for end-user feedback:
62//!
63//! - **Immediate Feedback**: Updates appear as soon as chunks are processed
64//! - **Clear Format**: Easy-to-read progress format with chunk IDs and counts
65//! - **Non-Intrusive**: Doesn't interfere with normal application logging
66//! - **Terminal Integration**: Works seamlessly with terminal-based
67//! applications
68//!
69//! ### Separation of Concerns
70//!
71//! Progress indication is separate from other monitoring systems:
72//!
73//! - **Not Logging**: Writes directly to terminal, bypassing logging systems
74//! - **Not Metrics**: Focused on user feedback, not system monitoring
75//! - **Real-Time**: Updates immediately, not batched or aggregated
76//! - **Ephemeral**: Progress display is temporary and contextual
77//!
78//! ### Performance Considerations
79//!
80//! - **Minimal Overhead**: Lightweight implementation to avoid performance
81//! impact
82//! - **Atomic Operations**: Lock-free progress updates using atomic counters
83//! - **Coordinated Output**: Mutex only for terminal output coordination
84//! - **Efficient Updates**: In-place terminal updates without scrolling
85//!
86//! ## Output Format
87//!
88//! ### Progress Display
89//!
90//! The progress indicator shows real-time chunk processing status:
91//!
92//! ```text
93//! Wrote Id: 000097/Completed: 002000
94//! ```
95//!
96//! - **Wrote Id**: Last chunk ID that was processed
97//! - **Completed**: Total number of chunks completed
98//! - **Format**: Zero-padded for consistent alignment
99//!
100//! ### Completion Summary
101//!
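//! The intended completion summary format is shown below. Note that
//! `show_completion` currently prints only the final progress line; the
//! statistics passed to it are accepted but not yet rendered: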
103//!
104//! ```text
105//! ✅ Processing completed successfully!
106//! 📄 Processed: 1.25 GB
107//! ⏱️ Duration: 2.34 seconds
108//! 🚀 Throughput: 534.2 MB/s
109//! 📊 Chunks: 2000 total
110//! ```
111//!
112//! ## Usage Examples
113//!
114//! ### Basic Progress Tracking
115
116//!
117//! ### Concurrent Processing
118//!
119//!
120//! ### Integration with File Processing
121
122//!
123//! ## Thread Safety
124//!
125//! The progress indicator is designed for concurrent use:
126//!
127//! ### Atomic Counters
128//!
129//! - **Completed Chunks**: Atomic counter for lock-free updates
130//! - **Last Chunk ID**: Atomic storage for the most recent chunk ID
131//! - **Performance**: No contention on progress updates
132//!
133//! ### Terminal Coordination
134//!
135//! - **Output Mutex**: Coordinates terminal output to prevent garbled display
136//! - **Minimal Locking**: Mutex only held during actual terminal writes
137//! - **Non-Blocking**: Progress updates don't block on terminal output
138//!
139//! ## Performance Impact
140//!
141//! The progress indicator is designed to have minimal performance impact:
142//!
143//! - **Atomic Operations**: Lock-free progress updates
144//! - **Minimal Allocations**: Reuses buffers and avoids unnecessary allocations
145//! - **Efficient Terminal I/O**: Direct terminal writes without buffering
146//! - **Optional**: Can be disabled in production environments if needed
147//!
148//! ## Integration with Other Systems
149//!
150//! ### Logging System
151//!
152//! Progress indication is separate from logging:
153//!
154//! - **No Log Interference**: Doesn't interfere with structured logging
155//! - **Direct Terminal**: Writes directly to terminal, not through log handlers
156//! - **Complementary**: Works alongside logging for different purposes
157//!
158//! ### Metrics System
159//!
160//! Progress indication complements metrics collection:
161//!
162//! - **Different Purpose**: User feedback vs. system monitoring
163//! - **Real-Time**: Immediate updates vs. aggregated metrics
164//! - **Ephemeral**: Temporary display vs. persistent metrics storage
165//!
166//! ## Error Handling
167//!
168//! The progress indicator handles errors gracefully:
169//!
170//! - **Terminal Errors**: Gracefully handles terminal I/O errors
171//! - **Non-Fatal**: Progress indicator failures don't affect processing
172//! - **Fallback**: Can fall back to silent operation if terminal is unavailable
173//! - **Recovery**: Automatically recovers from transient terminal issues
174
175use std::io::{self, Write};
176use std::sync::atomic::{AtomicU64, Ordering};
177use std::sync::Arc;
178use tokio::sync::Mutex;
179use tokio::time::{Duration, Instant};
180
181/// Real-time progress indicator for user feedback during pipeline processing.
182///
183/// This provides immediate visual feedback to users about processing progress,
184/// separate from logging and metrics systems. It writes directly to the
185/// terminal with real-time updates on the same line.
186///
187/// # Design Principles
188/// - **User-Focused**: Provides immediate visual feedback for end users
189/// - **Non-Logging**: Writes directly to terminal, not through logging system
190/// - **Real-Time**: Updates as chunks are processed, not batched
191/// - **Concurrent-Safe**: Thread-safe for concurrent chunk processing
192/// - **Minimal Overhead**: Lightweight to avoid impacting performance
193///
194/// # Example Output
195/// ```text
196/// Wrote Id: 000097/Completed: 002000
197/// ```
198///
199/// # Usage
200pub struct ProgressIndicatorService {
201 /// Total number of chunks expected
202 total_chunks: u64,
203
204 /// Number of chunks completed (atomic for thread safety)
205 completed_chunks: Arc<AtomicU64>,
206
207 /// Last chunk ID written (for display)
208 last_chunk_id: Arc<AtomicU64>,
209
210 /// Mutex for terminal output coordination
211 terminal_mutex: Arc<Mutex<()>>,
212
213 /// Start time for duration calculation
214 start_time: Instant,
215
216 /// Last update time (to avoid too frequent updates)
217 last_update: Arc<Mutex<Instant>>,
218}
219
220impl ProgressIndicatorService {
221 /// Creates a new progress indicator.
222 ///
223 /// # Arguments
224 /// * `total_chunks` - Total number of chunks expected to be processed
225 ///
226 /// # Returns
227 /// * `Self` - New progress indicator instance
228 pub fn new(total_chunks: u64) -> Self {
229 // Show initial progress with blank line before
230 println!();
231 print!("\rWrote Id: 000000/Completed: {:06}", total_chunks);
232 io::stdout().flush().unwrap_or(());
233
234 Self {
235 total_chunks,
236 completed_chunks: Arc::new(AtomicU64::new(0)),
237 last_chunk_id: Arc::new(AtomicU64::new(0)),
238 terminal_mutex: Arc::new(Mutex::new(())),
239 start_time: Instant::now(),
240 last_update: Arc::new(Mutex::new(Instant::now())),
241 }
242 }
243
244 /// Updates progress when a chunk has been successfully written.
245 ///
246 /// This method is thread-safe and can be called concurrently from
247 /// multiple chunk processing tasks.
248 ///
249 /// # Arguments
250 /// * `chunk_id` - ID of the chunk that was just written
251 ///
252 /// # Performance
253 /// Updates are throttled to avoid excessive terminal I/O during
254 /// high-throughput processing.
255 pub async fn update_progress(&self, chunk_id: u64) {
256 // Update counters atomically
257 let completed = self.completed_chunks.fetch_add(1, Ordering::Relaxed) + 1;
258 self.last_chunk_id.store(chunk_id, Ordering::Relaxed);
259
260 // Throttle updates to avoid excessive terminal I/O
261 // Only update every 100ms or every 10 chunks, whichever comes first
262 let should_update = {
263 let mut last_update = self.last_update.lock().await;
264 let now = Instant::now();
265 let time_since_update = now.duration_since(*last_update);
266
267 if time_since_update >= Duration::from_millis(100) || completed.is_multiple_of(10) {
268 *last_update = now;
269 true
270 } else {
271 false
272 }
273 };
274
275 if should_update {
276 self.update_display(chunk_id, completed).await;
277 }
278 }
279
280 /// Updates the terminal display with current progress.
281 ///
282 /// This method coordinates terminal access to ensure clean output
283 /// even with concurrent chunk processing.
284 async fn update_display(&self, chunk_id: u64, completed: u64) {
285 let _terminal_lock = self.terminal_mutex.lock().await;
286
287 // Clear the current line and write new progress
288 print!("\rWrote Id: {:06}/Completed: {:06}", chunk_id, completed);
289 io::stdout().flush().unwrap_or(());
290 }
291
292 /// Shows the final completion summary.
293 ///
294 /// This replaces the progress indicator with a comprehensive summary
295 /// of the processing results.
296 ///
297 /// # Arguments
298 /// * `bytes_processed` - Total bytes processed
299 /// * `throughput_mb_s` - Processing throughput in MB/s
300 /// * `total_duration` - Total time taken for processing
301 pub async fn show_completion(&self, _bytes_processed: u64, _throughput_mb_s: f64, _total_duration: Duration) {
302 let _terminal_lock = self.terminal_mutex.lock().await;
303
304 // Clear the progress line and show final progress with correct total
305 let final_completed = self.completed_chunks.load(Ordering::Relaxed);
306 print!(
307 "\rWrote Id: {:06}/Completed: {:06}\n",
308 self.last_chunk_id.load(Ordering::Relaxed),
309 final_completed
310 );
311
312 io::stdout().flush().unwrap_or(());
313 }
314
315 /// Shows an error summary if processing fails.
316 ///
317 /// # Arguments
318 /// * `error_message` - Description of what went wrong
319 pub async fn show_error_summary(&self, error_message: &str) {
320 let _terminal_lock = self.terminal_mutex.lock().await;
321
322 // Clear the progress line and show final progress
323 let final_completed = self.completed_chunks.load(Ordering::Relaxed);
324 println!(
325 "\rWrote Id: {:06}/Completed: {:06}",
326 self.last_chunk_id.load(Ordering::Relaxed),
327 final_completed
328 );
329
330 // Show error summary with 6-digit precision
331 println!("\n✗ Processing Failed!");
332 println!(" Chunks Completed: {:06}", final_completed);
333 println!(" Total Expected: {:06}", self.total_chunks);
334 println!(" Error: {}", error_message);
335 println!();
336 io::stdout().flush().unwrap_or(());
337 }
338
339 /// Gets the current progress as a percentage.
340 ///
341 /// # Returns
342 /// * `f64` - Progress percentage (0.0 to 100.0)
343 pub fn progress_percentage(&self) -> f64 {
344 let completed = self.completed_chunks.load(Ordering::Relaxed);
345 if self.total_chunks > 0 {
346 ((completed as f64) / (self.total_chunks as f64)) * 100.0
347 } else {
348 0.0
349 }
350 }
351}
352
353// Explicitly implement Send and Sync for ProgressIndicatorService
354// All fields are Send + Sync (Arc<AtomicU64>, Arc<Mutex<T>>, u64, Instant)
355unsafe impl Send for ProgressIndicatorService {}
356unsafe impl Sync for ProgressIndicatorService {}
357
/// Formats bytes in human-readable format.
///
/// Values below 1024 are printed as whole bytes; larger values are scaled by
/// powers of 1024 and printed with one decimal place, using units up to TB
/// (anything larger stays in TB).
///
/// # Arguments
/// * `bytes` - Number of bytes to format
///
/// # Returns
/// * `String` - Human-readable byte format (e.g., "1.5 MB")
#[allow(dead_code)]
fn format_bytes(bytes: u64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
    const THRESHOLD: f64 = 1024.0;

    // Whole bytes (including zero) are shown exactly, without a decimal.
    if bytes < 1024 {
        return format!("{} {}", bytes, UNITS[0]);
    }

    let mut size = bytes as f64;
    let mut unit_index = 0;

    while size >= THRESHOLD && unit_index < UNITS.len() - 1 {
        size /= THRESHOLD;
        unit_index += 1;
    }

    let rendered = format!("{:.1}", size);

    // A value just under the next unit (e.g. 1023.999 KB) rounds up to
    // "1024.0" at one decimal place; promote it to "1.0" of the next unit so
    // the output never reads "1024.0 KB". The largest unit has nowhere to
    // roll over to and is left as-is.
    if rendered == "1024.0" && unit_index + 1 < UNITS.len() {
        return format!("1.0 {}", UNITS[unit_index + 1]);
    }

    format!("{} {}", rendered, UNITS[unit_index])
}
388
#[cfg(test)]
mod tests {
    use super::*;

    /// A new indicator stores the expected total and starts at zero completed.
    #[tokio::test]
    async fn test_progress_indicator_creation() {
        let progress = ProgressIndicatorService::new(100);
        assert_eq!(progress.total_chunks, 100);
        assert_eq!(progress.completed_chunks.load(Ordering::Relaxed), 0);
    }

    /// Each update increments the completed count and records the chunk ID.
    #[tokio::test]
    async fn test_chunk_update() {
        let progress = ProgressIndicatorService::new(100);

        progress.update_progress(1).await;
        progress.update_progress(2).await;

        assert_eq!(progress.completed_chunks.load(Ordering::Relaxed), 2);
        assert_eq!(progress.last_chunk_id.load(Ordering::Relaxed), 2);
    }

    /// The percentage reflects completed / total.
    #[tokio::test]
    async fn test_progress_percentage() {
        let progress = ProgressIndicatorService::new(100);

        assert!(progress.progress_percentage().abs() < 1e-9);

        progress.update_progress(1).await;
        progress.update_progress(2).await;

        // Compare with a tolerance rather than exact float equality.
        assert!((progress.progress_percentage() - 2.0).abs() < 1e-9);
    }

    /// With zero expected chunks the percentage is reported as 0.0 rather
    /// than dividing by zero.
    #[tokio::test]
    async fn test_progress_percentage_with_zero_total() {
        let progress = ProgressIndicatorService::new(0);
        assert!(progress.progress_percentage().abs() < 1e-9);
    }

    /// The completion and error displays only read the counters.
    #[tokio::test]
    async fn test_final_displays_do_not_change_counters() {
        let progress = ProgressIndicatorService::new(10);
        progress.update_progress(7).await;

        progress.show_completion(1024, 1.0, Duration::from_millis(5)).await;
        progress.show_error_summary("simulated failure").await;

        assert_eq!(progress.completed_chunks.load(Ordering::Relaxed), 1);
        assert_eq!(progress.last_chunk_id.load(Ordering::Relaxed), 7);
    }

    /// Tests byte formatting for human-readable display.
    ///
    /// # Test Coverage
    ///
    /// - Zero byte formatting
    /// - Byte-level formatting (< 1KB)
    /// - Kilobyte, megabyte, gigabyte and terabyte formatting with one
    ///   decimal place
    ///
    /// # Assertions
    ///
    /// - Zero bytes display as "0 B"
    /// - Small values display in whole bytes
    /// - KB / MB / GB / TB values display with 1 decimal place
    #[test]
    fn test_format_bytes() {
        assert_eq!(format_bytes(0), "0 B");
        assert_eq!(format_bytes(512), "512 B");
        assert_eq!(format_bytes(1023), "1023 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1536), "1.5 KB");
        assert_eq!(format_bytes(1048576), "1.0 MB");
        assert_eq!(format_bytes(1073741824), "1.0 GB");
        assert_eq!(format_bytes(1099511627776), "1.0 TB");
    }
}