1use anyhow::Result;
4use chrono::{DateTime, Utc};
5use crossbeam_channel::{Receiver, Sender};
6use indicatif::{ProgressBar, ProgressStyle};
7use serde::{Deserialize, Serialize};
8use std::sync::{
9 atomic::{AtomicBool, AtomicU64, Ordering},
10 Arc,
11};
12use std::time::{Duration, Instant};
13use tokio::sync::broadcast;
14use uuid::Uuid;
15
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
18pub struct ProgressUpdate {
19 pub operation_id: Uuid,
20 pub operation_name: String,
21 pub current: u64,
22 pub total: Option<u64>,
23 pub message: Option<String>,
24 pub timestamp: DateTime<Utc>,
25 pub status: ProgressStatus,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
30pub enum ProgressStatus {
31 Started,
32 InProgress,
33 Completed,
34 Failed,
35 Cancelled,
36}
37
38#[derive(Debug)]
40pub struct ProgressTracker {
41 operation_id: Uuid,
42 operation_name: String,
43 current: Arc<AtomicU64>,
44 total: Option<u64>,
45 is_cancelled: Arc<AtomicBool>,
46 progress_bar: Option<ProgressBar>,
47 sender: Sender<ProgressUpdate>,
48 start_time: Instant,
49}
50
51impl ProgressTracker {
52 #[must_use]
57 pub fn new(
58 operation_name: &str,
59 total: Option<u64>,
60 sender: Sender<ProgressUpdate>,
61 show_progress_bar: bool,
62 ) -> Self {
63 let operation_id = Uuid::new_v4();
64 let current = Arc::new(AtomicU64::new(0));
65 let is_cancelled = Arc::new(AtomicBool::new(false));
66
67 let progress_bar = if show_progress_bar {
68 let pb = if let Some(total) = total {
69 ProgressBar::new(total)
70 } else {
71 ProgressBar::new_spinner()
72 };
73
74 let style = if total.is_some() {
75 ProgressStyle::default_bar()
76 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}")
77 .unwrap()
78 .progress_chars("#>-")
79 } else {
80 ProgressStyle::default_spinner()
81 .template("{spinner:.green} [{elapsed_precise}] {msg}")
82 .unwrap()
83 };
84
85 pb.set_style(style);
86 Some(pb)
87 } else {
88 None
89 };
90
91 let tracker = Self {
92 operation_id,
93 operation_name: operation_name.to_string(),
94 current,
95 total,
96 is_cancelled,
97 progress_bar,
98 sender,
99 start_time: Instant::now(),
100 };
101
102 tracker.send_update(ProgressStatus::Started, None);
104
105 tracker
106 }
107
108 pub fn inc(&self, amount: u64) {
110 if self.is_cancelled.load(Ordering::Relaxed) {
111 return;
112 }
113
114 let _new_current = self.current.fetch_add(amount, Ordering::Relaxed) + amount;
115
116 if let Some(pb) = &self.progress_bar {
117 pb.inc(amount);
118 }
119
120 self.send_update(ProgressStatus::InProgress, None);
121 }
122
123 pub fn set_current(&self, current: u64) {
125 if self.is_cancelled.load(Ordering::Relaxed) {
126 return;
127 }
128
129 self.current.store(current, Ordering::Relaxed);
130
131 if let Some(pb) = &self.progress_bar {
132 pb.set_position(current);
133 }
134
135 self.send_update(ProgressStatus::InProgress, None);
136 }
137
138 pub fn set_message(&self, message: String) {
140 if self.is_cancelled.load(Ordering::Relaxed) {
141 return;
142 }
143
144 if let Some(pb) = &self.progress_bar {
145 pb.set_message(message.clone());
146 }
147
148 self.send_update(ProgressStatus::InProgress, Some(message));
149 }
150
151 pub fn complete(&self) {
153 if self.is_cancelled.load(Ordering::Relaxed) {
154 return;
155 }
156
157 if let Some(pb) = &self.progress_bar {
158 pb.finish_with_message("Completed");
159 }
160
161 self.send_update(ProgressStatus::Completed, None);
162 }
163
164 pub fn fail(&self, error_message: String) {
166 if self.is_cancelled.load(Ordering::Relaxed) {
167 return;
168 }
169
170 self.is_cancelled.store(true, Ordering::Relaxed);
171
172 if let Some(pb) = &self.progress_bar {
173 pb.finish();
174 }
175
176 self.send_update(ProgressStatus::Failed, Some(error_message));
177 }
178
179 pub fn cancel(&self) {
181 self.is_cancelled.store(true, Ordering::Relaxed);
182
183 if let Some(pb) = &self.progress_bar {
184 pb.finish_with_message("Cancelled");
185 }
186
187 self.send_update(ProgressStatus::Cancelled, None);
188 }
189
190 #[must_use]
192 pub fn is_cancelled(&self) -> bool {
193 self.is_cancelled.load(Ordering::Relaxed)
194 }
195
196 #[must_use]
198 pub fn current(&self) -> u64 {
199 self.current.load(Ordering::Relaxed)
200 }
201
202 #[must_use]
204 pub fn total(&self) -> Option<u64> {
205 self.total
206 }
207
208 #[must_use]
210 pub fn operation_id(&self) -> Uuid {
211 self.operation_id
212 }
213
214 #[must_use]
216 pub fn operation_name(&self) -> &str {
217 &self.operation_name
218 }
219
220 #[must_use]
222 pub fn elapsed(&self) -> Duration {
223 self.start_time.elapsed()
224 }
225
226 fn send_update(&self, status: ProgressStatus, message: Option<String>) {
228 let update = ProgressUpdate {
229 operation_id: self.operation_id,
230 operation_name: self.operation_name.clone(),
231 current: self.current.load(Ordering::Relaxed),
232 total: self.total,
233 message,
234 timestamp: Utc::now(),
235 status,
236 };
237
238 let _ = self.sender.try_send(update);
239 }
240}
241
242#[derive(Clone, Debug)]
244pub struct ProgressManager {
245 sender: Sender<ProgressUpdate>,
246 receiver: Receiver<ProgressUpdate>,
247 broadcast_sender: broadcast::Sender<ProgressUpdate>,
248}
249
250impl ProgressManager {
251 #[must_use]
253 pub fn new() -> Self {
254 let (sender, receiver) = crossbeam_channel::unbounded();
255 let (broadcast_sender, _) = broadcast::channel(1000);
256
257 Self {
258 sender,
259 receiver,
260 broadcast_sender,
261 }
262 }
263
264 #[must_use]
266 pub fn create_tracker(
267 &self,
268 operation_name: &str,
269 total: Option<u64>,
270 show_progress_bar: bool,
271 ) -> ProgressTracker {
272 ProgressTracker::new(
273 operation_name,
274 total,
275 self.sender.clone(),
276 show_progress_bar,
277 )
278 }
279
280 #[must_use]
282 pub fn subscribe(&self) -> broadcast::Receiver<ProgressUpdate> {
283 self.broadcast_sender.subscribe()
284 }
285
286 pub fn run(&self) -> Result<()> {
291 while let Ok(update) = self.receiver.recv() {
292 let _ = self.broadcast_sender.send(update);
294 }
295 Ok(())
296 }
297
298 #[must_use]
300 pub fn sender(&self) -> Sender<ProgressUpdate> {
301 self.sender.clone()
302 }
303}
304
305impl Default for ProgressManager {
306 fn default() -> Self {
307 Self::new()
308 }
309}
310
311pub trait TrackableOperation {
313 fn execute_with_progress(&self, tracker: &ProgressTracker) -> Result<()>;
318}
319
320#[macro_export]
322macro_rules! trackable_operation {
323 ($name:expr, $total:expr, $operation:block) => {{
324 let progress_manager = ProgressManager::new();
325 let tracker = progress_manager.create_tracker($name, $total, true);
326
327 let manager = progress_manager.clone();
329 tokio::spawn(async move {
330 let _ = manager.run();
331 });
332
333 let result = $operation;
334
335 if result.is_ok() {
336 tracker.complete();
337 } else {
338 tracker.fail(format!("{:?}", result.as_ref().unwrap_err()));
339 }
340
341 result
342 }};
343}
344
345#[cfg(test)]
346mod tests {
347 use super::*;
348 use std::time::Duration as StdDuration;
349
350 #[test]
351 fn test_progress_tracker_creation() {
352 let (sender, _receiver) = crossbeam_channel::unbounded();
353 let tracker = ProgressTracker::new("test_operation", Some(100), sender, false);
354
355 assert_eq!(tracker.operation_name(), "test_operation");
356 assert_eq!(tracker.total(), Some(100));
357 assert_eq!(tracker.current(), 0);
358 assert!(!tracker.is_cancelled());
359 }
360
361 #[test]
362 fn test_progress_tracker_increment() {
363 let (sender, _receiver) = crossbeam_channel::unbounded();
364 let tracker = ProgressTracker::new("test_operation", Some(100), sender, false);
365
366 tracker.inc(10);
367 assert_eq!(tracker.current(), 10);
368
369 tracker.inc(5);
370 assert_eq!(tracker.current(), 15);
371 }
372
373 #[test]
374 fn test_progress_tracker_set_current() {
375 let (sender, _receiver) = crossbeam_channel::unbounded();
376 let tracker = ProgressTracker::new("test_operation", Some(100), sender, false);
377
378 tracker.set_current(50);
379 assert_eq!(tracker.current(), 50);
380 }
381
382 #[test]
383 fn test_progress_tracker_cancellation() {
384 let (sender, _receiver) = crossbeam_channel::unbounded();
385 let tracker = ProgressTracker::new("test_operation", Some(100), sender, false);
386
387 assert!(!tracker.is_cancelled());
388 tracker.cancel();
389 assert!(tracker.is_cancelled());
390 }
391
392 #[test]
393 fn test_progress_manager() {
394 let manager = ProgressManager::new();
395 let tracker = manager.create_tracker("test_operation", Some(100), false);
396
397 assert_eq!(tracker.operation_name(), "test_operation");
398 assert_eq!(tracker.total(), Some(100));
399 }
400
401 #[tokio::test]
402 #[ignore = "This test is flaky due to async timing issues"]
403 async fn test_progress_manager_subscription() {
404 let manager = ProgressManager::new();
405 let mut subscriber = manager.subscribe();
406
407 let tracker = manager.create_tracker("test_operation", Some(100), false);
408
409 let manager_clone = manager.clone();
411 let manager_handle = tokio::spawn(async move {
412 let _ = manager_clone.run();
413 });
414
415 tokio::time::sleep(StdDuration::from_millis(10)).await;
417
418 tracker.inc(10);
420
421 tokio::time::sleep(StdDuration::from_millis(10)).await;
423
424 let _update_result =
426 tokio::time::timeout(StdDuration::from_millis(50), subscriber.recv()).await;
427
428 manager_handle.abort();
430
431 }
435
436 #[test]
437 fn test_trackable_operation_macro() {
438 let manager = ProgressManager::new();
440 let tracker = manager.create_tracker("test", Some(10), false);
441
442 tracker.inc(5);
444 assert_eq!(tracker.current(), 5);
445 tracker.complete();
446 }
447
448 #[test]
449 fn test_progress_tracker_edge_cases() {
450 let manager = ProgressManager::new();
451 let tracker = manager.create_tracker("edge_case_test", Some(100), false);
452
453 tracker.inc(0);
455 assert_eq!(tracker.current(), 0);
456
457 tracker.inc(1000);
459 assert_eq!(tracker.current(), 1000);
460
461 tracker.set_current(50);
463 assert_eq!(tracker.current(), 50);
464
465 tracker.set_current(0);
466 assert_eq!(tracker.current(), 0);
467
468 tracker.set_current(100);
469 assert_eq!(tracker.current(), 100);
470 }
471
472 #[test]
473 fn test_progress_tracker_without_total() {
474 let manager = ProgressManager::new();
475 let tracker = manager.create_tracker("no_total_test", None, false);
476
477 tracker.inc(10);
479 assert_eq!(tracker.current(), 10);
480 assert_eq!(tracker.total(), None);
481
482 tracker.set_current(50);
483 assert_eq!(tracker.current(), 50);
484
485 tracker.complete();
486 }
487
488 #[test]
489 fn test_progress_tracker_failure() {
490 let manager = ProgressManager::new();
491 let tracker = manager.create_tracker("failure_test", Some(100), false);
492
493 tracker.fail("Test failure message".to_string());
495 assert!(tracker.is_cancelled());
497 }
498
499 #[test]
500 fn test_progress_tracker_elapsed_time() {
501 let manager = ProgressManager::new();
502 let tracker = manager.create_tracker("elapsed_test", Some(100), false);
503
504 let elapsed = tracker.elapsed();
506 std::thread::sleep(std::time::Duration::from_millis(10));
510 let elapsed_after = tracker.elapsed();
511 assert!(elapsed_after >= elapsed);
512 }
513
514 #[test]
515 fn test_progress_tracker_operation_info() {
516 let manager = ProgressManager::new();
517 let tracker = manager.create_tracker("info_test", Some(100), false);
518
519 assert_eq!(tracker.operation_id(), tracker.operation_id());
521 assert_eq!(tracker.operation_name(), "info_test");
522 assert_eq!(tracker.total(), Some(100));
523 }
524
525 #[test]
526 fn test_progress_manager_multiple_trackers() {
527 let manager = ProgressManager::new();
528
529 let tracker1 = manager.create_tracker("operation1", Some(100), false);
531 let tracker2 = manager.create_tracker("operation2", Some(200), false);
532 let tracker3 = manager.create_tracker("operation3", None, false);
533
534 assert_ne!(tracker1.operation_id(), tracker2.operation_id());
536 assert_ne!(tracker1.operation_id(), tracker3.operation_id());
537 assert_ne!(tracker2.operation_id(), tracker3.operation_id());
538
539 tracker1.inc(10);
541 tracker2.inc(20);
542 tracker3.inc(30);
543
544 assert_eq!(tracker1.current(), 10);
545 assert_eq!(tracker2.current(), 20);
546 assert_eq!(tracker3.current(), 30);
547 }
548
549 #[test]
550 fn test_progress_tracker_completion() {
551 let manager = ProgressManager::new();
552 let tracker = manager.create_tracker("completion_test", Some(100), false);
553
554 tracker.set_current(100);
556 tracker.complete();
557
558 assert_eq!(tracker.current(), 100);
560 assert_eq!(tracker.total(), Some(100));
561 }
562
563 #[test]
564 fn test_progress_tracker_large_values() {
565 let manager = ProgressManager::new();
566 let tracker = manager.create_tracker("large_values_test", Some(u64::MAX), false);
567
568 tracker.set_current(u64::MAX / 2);
570 assert_eq!(tracker.current(), u64::MAX / 2);
571
572 tracker.inc(1000);
573 assert_eq!(tracker.current(), u64::MAX / 2 + 1000);
574 }
575
576 #[test]
577 fn test_progress_tracker_negative_operations() {
578 let manager = ProgressManager::new();
579 let tracker = manager.create_tracker("negative_test", Some(100), false);
580
581 tracker.inc(50);
583 assert_eq!(tracker.current(), 50);
584
585 tracker.set_current(25);
587 assert_eq!(tracker.current(), 25);
588 }
589
590 #[test]
591 fn test_progress_manager_sender_access() {
592 let manager = ProgressManager::new();
593 let _sender = manager.sender();
594
595 }
598
599 #[test]
600 fn test_progress_tracker_debug_formatting() {
601 let manager = ProgressManager::new();
602 let tracker = manager.create_tracker("debug_test", Some(100), false);
603
604 let debug_str = format!("{tracker:?}");
606 assert!(debug_str.contains("debug_test"));
607 assert!(debug_str.contains("ProgressTracker"));
608 }
609
610 #[test]
611 fn test_progress_manager_debug_formatting() {
612 let manager = ProgressManager::new();
613
614 let debug_str = format!("{manager:?}");
616 assert!(debug_str.contains("ProgressManager"));
617 }
618
619 #[test]
620 fn test_progress_update_creation() {
621 let update = ProgressUpdate {
622 operation_id: Uuid::new_v4(),
623 operation_name: "test_operation".to_string(),
624 current: 50,
625 total: Some(100),
626 message: Some("Test message".to_string()),
627 timestamp: Utc::now(),
628 status: ProgressStatus::InProgress,
629 };
630
631 assert_eq!(update.operation_name, "test_operation");
632 assert_eq!(update.current, 50);
633 assert_eq!(update.total, Some(100));
634 assert_eq!(update.message, Some("Test message".to_string()));
635 }
636
637 #[test]
638 fn test_progress_update_serialization() {
639 let update = ProgressUpdate {
640 operation_id: Uuid::new_v4(),
641 operation_name: "serialization_test".to_string(),
642 current: 75,
643 total: Some(150),
644 message: Some("Serialization test".to_string()),
645 timestamp: Utc::now(),
646 status: ProgressStatus::InProgress,
647 };
648
649 let json = serde_json::to_string(&update).unwrap();
651 let deserialized: ProgressUpdate = serde_json::from_str(&json).unwrap();
652
653 assert_eq!(update.operation_id, deserialized.operation_id);
654 assert_eq!(update.operation_name, deserialized.operation_name);
655 assert_eq!(update.current, deserialized.current);
656 assert_eq!(update.total, deserialized.total);
657 assert_eq!(update.message, deserialized.message);
658 }
659
660 #[test]
661 fn test_progress_update_edge_cases() {
662 let update_none = ProgressUpdate {
664 operation_id: Uuid::new_v4(),
665 operation_name: String::new(),
666 current: 0,
667 total: None,
668 message: None,
669 timestamp: Utc::now(),
670 status: ProgressStatus::Started,
671 };
672
673 assert_eq!(update_none.operation_name, "");
674 assert_eq!(update_none.current, 0);
675 assert_eq!(update_none.total, None);
676 assert_eq!(update_none.message, None);
677
678 let update_max = ProgressUpdate {
680 operation_id: Uuid::new_v4(),
681 operation_name: "A".repeat(1000),
682 current: u64::MAX,
683 total: Some(u64::MAX),
684 message: Some("B".repeat(1000)),
685 timestamp: Utc::now(),
686 status: ProgressStatus::Completed,
687 };
688
689 assert_eq!(update_max.current, u64::MAX);
690 assert_eq!(update_max.total, Some(u64::MAX));
691 }
692}