backblaze_b2_client/tasks/
shared.rs1use std::{
2 fmt::Display,
3 sync::{
4 atomic::{AtomicU64, Ordering},
5 Arc,
6 },
7 time::{Duration, Instant},
8};
9
10use tokio::io::{AsyncRead, AsyncSeek};
11
12use crate::util::{write_lock_arc::WriteLockArc, RollingTimeSeries, SizeUnit};
13
14pub trait AsyncFileReader: AsyncRead + AsyncSeek + Unpin + Send + Sync {}
15impl<T: AsyncRead + AsyncSeek + Unpin + Send + Sync> AsyncFileReader for T {}
16
17#[derive(Debug, Clone)]
18pub struct CurrentFileNetworkStats {
19 pub bps: SizeUnit,
21 pub eta: Duration,
23 pub percentage: f64,
25 pub done: SizeUnit,
27 pub total: SizeUnit,
29 pub elapsed: Duration,
31}
32
33impl CurrentFileNetworkStats {}
34
35impl Display for CurrentFileNetworkStats {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 match f.precision() {
38 Some(precision) => f.write_fmt(format_args!(
39 "Speed: {:.precision$}PS | ETA: {:.precision$?} | Progress: {:.precision$}/{:.precision$} ({:.precision$}%) | Elapsed: {:.precision$?}",
40 self.bps, self.eta, self.done, self.total, self.percentage * 100.0, self.elapsed, precision = precision
41 )),
42 None => f.write_fmt(format_args!(
43 "Speed: {}PS | ETA: {:?} | Progress: {}/{}({}) | Elapsed: {:?}",
44 self.bps, self.eta, self.done, self.total, self.percentage, self.elapsed
45 )),
46 }
47 }
48}
49
50#[derive(Debug, Clone, PartialEq)]
51pub enum FileStatus {
52 Pending,
53 Working,
54 Finished,
55 Retrying,
56 Aborted,
57}
58
59#[derive(Debug)]
60pub struct FileNetworkStats {
61 pub(super) done: Arc<AtomicU64>,
62 pub(super) speed_buffer: WriteLockArc<RollingTimeSeries<u64, 5000>>,
63 pub(super) total: f64,
64 pub(super) start_time: WriteLockArc<Instant>,
65}
66
67impl FileNetworkStats {
68 pub(super) fn new(total: f64) -> Self {
69 Self {
70 total,
71 done: Arc::new(AtomicU64::new(0)),
72 speed_buffer: WriteLockArc::new(RollingTimeSeries::new(Duration::from_secs(10))),
73 start_time: WriteLockArc::new(Instant::now()),
74 }
75 }
76
77 pub fn bytes_per_second(&self) -> f64 {
79 self.inner_bytes_per_second()
80 }
81
82 pub fn estimated_time(&self) -> f64 {
84 let done = self.done.load(Ordering::Relaxed) as f64;
85
86 self.inner_estimated_time(done)
87 }
88
89 pub fn percentage(&self) -> f64 {
91 let done = self.done.load(Ordering::Relaxed) as f64;
92
93 done / self.total
94 }
95
96 pub fn current_stats(&self) -> CurrentFileNetworkStats {
98 let done = self.done.load(Ordering::Relaxed) as f64;
99
100 CurrentFileNetworkStats {
101 bps: self.inner_bytes_per_second().into(),
102 eta: Duration::from_secs_f64(self.inner_estimated_time(done).max(0.0)),
103 percentage: done / self.total,
104 done: done.into(),
105 total: self.total.into(),
106 elapsed: self.start_time.elapsed(),
107 }
108 }
109
110 pub(super) async fn add_done_bytes(&self, bytes: u64) {
111 self.done.fetch_add(bytes, Ordering::Relaxed);
112 let mut buffer = self.speed_buffer.lock_write().await;
113 buffer.add_value(bytes);
114 }
115
116 fn inner_bytes_per_second(&self) -> f64 {
117 let dps = self.speed_buffer.get_valid_points();
118 let mut total = 0.0;
119 let oldest_time = dps
120 .iter()
121 .map(|dp| {
122 total += dp.data as f64;
123 dp.time.elapsed()
124 })
125 .max();
126
127 match oldest_time {
128 Some(dur) => total / dur.as_secs_f64(),
129 None => 0.0,
130 }
131 }
132
133 fn inner_estimated_time(&self, done: f64) -> f64 {
134 let mut bytes_per_sec = self.inner_bytes_per_second();
135
136 if bytes_per_sec == 0.0 {
137 bytes_per_sec = 1.0;
138 }
139
140 (self.total - done) / bytes_per_sec
141 }
142}