1use std::{
20 sync::{
21 Arc,
22 atomic::{AtomicBool, AtomicU64, Ordering},
23 },
24 time::Duration,
25};
26
27use compact_str::CompactString;
28use parking_lot::RwLock;
29use web_time::Instant;
30
31#[derive(Clone)]
39pub struct Progress {
40 pub(crate) kind: ProgressType,
42
43 pub(crate) start: Option<Instant>,
45
46 pub(crate) cold: Arc<RwLock<Cold>>,
48
49 pub(crate) item: Arc<RwLock<CompactString>>,
51
52 pub(crate) position: Arc<AtomicU64>,
54 pub(crate) total: Arc<AtomicU64>,
55 pub(crate) finished: Arc<AtomicBool>,
56}
57
58pub struct Cold {
60 pub(crate) name: CompactString,
61 pub(crate) stopped: Option<Instant>,
62 pub(crate) error: Option<CompactString>,
63}
64
65#[repr(u8)]
67#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
68#[cfg_attr(
69 feature = "rkyv",
70 derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
71)]
72#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
73#[cfg_attr(feature = "rkyv", rkyv(derive(Debug, Eq, PartialEq)))]
74pub enum ProgressType {
75 #[default]
77 Spinner,
78 Bar,
80}
81
82impl Progress {
83 pub fn new(kind: ProgressType, name: impl Into<CompactString>, total: impl Into<u64>) -> Self {
91 Self {
92 kind,
93 start: None,
94 cold: Arc::new(RwLock::new(Cold {
95 name: name.into(),
96 stopped: None,
97 error: None,
98 })),
99 item: Arc::new(RwLock::new(CompactString::default())),
100 position: Arc::new(AtomicU64::new(0)),
101 total: Arc::new(AtomicU64::new(total.into())),
102 finished: Arc::new(AtomicBool::new(false)),
103 }
104 }
105
106 #[must_use]
108 pub fn new_pb(name: impl Into<CompactString>, total: impl Into<u64>) -> Self {
109 Self::new(ProgressType::Bar, name, total)
110 }
111
112 #[must_use]
114 pub fn new_spinner(name: impl Into<CompactString>) -> Self {
115 Self::new(ProgressType::Spinner, name, 0u64)
116 }
117
118 #[must_use]
124 pub fn get_name(&self) -> CompactString {
125 self.cold.read().name.clone()
126 }
127
128 pub fn set_name(&self, name: impl Into<CompactString>) {
130 self.cold.write().name = name.into();
131 }
132
133 #[must_use]
135 pub fn get_item(&self) -> CompactString {
136 self.item.read().clone()
137 }
138
139 pub fn set_item(&self, item: impl Into<CompactString>) {
141 *self.item.write() = item.into();
142 }
143
144 #[must_use]
146 pub fn get_error(&self) -> Option<CompactString> {
147 self.cold.read().error.clone()
148 }
149
150 pub fn set_error(&self, error: Option<impl Into<CompactString>>) {
152 let error = error.map(Into::into);
153 self.cold.write().error = error;
154 }
155
156 pub fn inc(&self, amount: impl Into<u64>) {
164 self.position.fetch_add(amount.into(), Ordering::Relaxed);
165 }
166
167 #[must_use]
169 pub fn get_pos(&self) -> u64 {
170 self.position.load(Ordering::Relaxed)
171 }
172
173 pub fn set_pos(&self, pos: u64) {
175 self.position.store(pos, Ordering::Relaxed);
176 }
177
178 #[must_use]
180 pub fn get_total(&self) -> u64 {
181 self.total.load(Ordering::Relaxed)
182 }
183
184 pub fn set_total(&self, total: u64) {
186 self.total.store(total, Ordering::Relaxed);
187 }
188
189 #[must_use]
191 pub fn is_finished(&self) -> bool {
192 self.finished.load(Ordering::Acquire)
194 }
195
196 pub fn set_finished(&self, finished: bool) {
201 self.finished.store(finished, Ordering::Release);
202 }
203
204 #[must_use]
213 pub fn get_elapsed(&self) -> Option<Duration> {
214 let start = self.start?;
215 let cold = self.cold.read();
216
217 Some(
218 cold.stopped
219 .map_or_else(|| start.elapsed(), |stopped| stopped.duration_since(start)),
220 )
221 }
222
223 #[allow(clippy::cast_precision_loss)]
227 #[must_use]
228 pub fn get_percent(&self) -> f64 {
229 let pos = self.get_pos() as f64;
230 let total = self.get_total() as f64;
231
232 if total == 0.0 {
233 0.0
234 } else {
235 (pos / total) * 100.0
236 }
237 }
238
239 pub fn finish(&self) {
245 if self.start.is_some() {
246 self.cold.write().stopped.replace(Instant::now());
247 }
248 self.set_finished(true);
249 }
250
251 pub fn finish_with_item(&self, item: impl Into<CompactString>) {
253 self.set_item(item);
254 self.finish(); }
256
257 pub fn finish_with_error(&self, error: impl Into<CompactString>) {
259 self.set_error(Some(error));
260 self.finish();
261 }
262
263 #[must_use]
271 pub fn atomic_pos(&self) -> Arc<AtomicU64> {
272 self.position.clone()
273 }
274
275 #[must_use]
277 pub fn atomic_total(&self) -> Arc<AtomicU64> {
278 self.total.clone()
279 }
280
281 #[must_use]
285 pub fn snapshot(&self) -> ProgressSnapshot {
286 self.into()
287 }
288}
289
290#[derive(Clone, Debug, Default, Eq, PartialEq)]
295#[cfg_attr(
296 feature = "rkyv",
297 derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
298)]
299#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
300#[cfg_attr(feature = "rkyv", rkyv(derive(Debug, Eq, PartialEq)))]
301pub struct ProgressSnapshot {
302 kind: ProgressType,
303
304 name: CompactString,
305 item: CompactString,
306
307 elapsed: Option<Duration>,
308
309 position: u64,
310 total: u64,
311
312 finished: bool,
313
314 error: Option<CompactString>,
315}
316
317impl From<&Progress> for ProgressSnapshot {
318 fn from(progress: &Progress) -> Self {
319 let cold = progress.cold.read();
321 let name = cold.name.clone();
322 let error = cold.error.clone();
323 drop(cold);
324
325 Self {
326 kind: progress.kind,
327 name,
328 item: progress.item.read().clone(),
329 elapsed: progress.get_elapsed(),
330 position: progress.position.load(Ordering::Relaxed),
331 total: progress.total.load(Ordering::Relaxed),
332 finished: progress.finished.load(Ordering::Relaxed),
333 error,
334 }
335 }
336}
337
338impl ProgressSnapshot {
339 #[must_use]
341 pub const fn kind(&self) -> ProgressType {
342 self.kind
343 }
344
345 #[must_use]
347 pub fn name(&self) -> &str {
348 &self.name
349 }
350 #[must_use]
352 pub fn item(&self) -> &str {
353 &self.item
354 }
355
356 #[must_use]
358 pub const fn elapsed(&self) -> Option<Duration> {
359 self.elapsed
360 }
361
362 #[must_use]
364 pub const fn position(&self) -> u64 {
365 self.position
366 }
367 #[must_use]
369 pub const fn total(&self) -> u64 {
370 self.total
371 }
372
373 #[must_use]
375 pub const fn finished(&self) -> bool {
376 self.finished
377 }
378
379 #[must_use]
381 pub fn error(&self) -> Option<&str> {
382 self.error.as_deref()
383 }
384
385 #[allow(clippy::cast_precision_loss)]
393 #[must_use]
394 pub fn eta(&self) -> Option<Duration> {
395 if self.position == 0 || self.total == 0 || self.finished {
396 return None;
397 }
398
399 let elapsed = self.elapsed?;
400 let secs = elapsed.as_secs_f64();
401
402 if secs <= 1e-6 {
404 return None;
405 }
406
407 let rate = self.position as f64 / secs;
408 if rate <= 0.0 {
409 return None;
410 }
411
412 let remaining_items = self.total.saturating_sub(self.position);
413 let remaining_secs = remaining_items as f64 / rate;
414
415 Some(Duration::from_secs_f64(remaining_secs))
416 }
417
418 #[allow(clippy::cast_precision_loss)]
420 #[must_use]
421 pub fn throughput(&self) -> f64 {
422 if let Some(elapsed) = self.elapsed {
423 let secs = elapsed.as_secs_f64();
424 if secs > 0.0 {
425 return self.position as f64 / secs;
426 }
427 }
428 0.0
429 }
430
431 #[allow(clippy::cast_precision_loss)]
435 #[must_use]
436 pub fn throughput_since(&self, prev: &Self) -> f64 {
437 let pos_diff = self.position.saturating_sub(prev.position) as f64;
438
439 let time_diff = match (self.elapsed, prev.elapsed) {
440 (Some(curr), Some(old)) => curr.as_secs_f64() - old.as_secs_f64(),
441 _ => 0.0,
442 };
443
444 if time_diff > 0.0 {
445 pos_diff / time_diff
446 } else {
447 0.0
448 }
449 }
450}
451
452#[cfg(test)]
453mod tests {
454 use std::thread;
455
456 use super::Progress;
457
458 #[test]
461 #[allow(clippy::float_cmp)]
462 fn test_basic_lifecycle() {
463 let p = Progress::new_pb("test_job", 100u64);
464
465 assert_eq!(p.get_pos(), 0);
466 assert!(!p.is_finished());
467 assert_eq!(p.get_percent(), 0.0);
468
469 p.inc(50u64);
470 assert_eq!(p.get_pos(), 50);
471 assert_eq!(p.get_percent(), 50.0);
472
473 p.finish();
474 assert!(p.is_finished());
475
476 assert!(p.get_elapsed().is_none());
478 }
479
480 #[test]
483 fn test_concurrency_atomics() {
484 let p = Progress::new_spinner("concurrent_job");
485 let mut handles = vec![];
486
487 for _ in 0..10 {
489 let p_ref = p.clone();
490 handles.push(thread::spawn(move || {
491 for _ in 0..100 {
492 p_ref.inc(1u64);
493 }
494 }));
495 }
496
497 for h in handles {
498 h.join().unwrap();
499 }
500
501 assert_eq!(p.get_pos(), 1000, "Atomic updates should be lossless");
502 }
503
504 #[test]
507 fn test_snapshot_metadata() {
508 let p = Progress::new_pb("initial_name", 100u64);
509
510 p.set_name("updated_name");
512 p.set_item("file_a.txt");
513 p.set_error(Some("disk_full"));
514
515 let snap = p.snapshot();
516
517 assert_eq!(snap.name, "updated_name");
518 assert_eq!(snap.item, "file_a.txt");
519 assert_eq!(snap.error, Some("disk_full".into()));
520 }
521
522 #[allow(clippy::float_cmp)]
525 #[test]
526 fn test_math_safety() {
527 let p = Progress::new_pb("math_test", 100u64);
528 let snap = p.snapshot();
529
530 assert_eq!(snap.throughput(), 0.0);
532 assert!(snap.eta().is_none());
533
534 let p_zero = Progress::new_pb("zero_total", 0u64);
537 assert_eq!(p_zero.get_percent(), 0.0);
538 }
539}