backblaze_b2_client/tasks/
shared.rs

1use std::{
2    fmt::Display,
3    sync::{
4        atomic::{AtomicU64, Ordering},
5        Arc,
6    },
7    time::{Duration, Instant},
8};
9
10use tokio::io::{AsyncRead, AsyncSeek};
11
12use crate::util::{write_lock_arc::WriteLockArc, RollingTimeSeries, SizeUnit};
13
14pub trait AsyncFileReader: AsyncRead + AsyncSeek + Unpin + Send + Sync {}
15impl<T: AsyncRead + AsyncSeek + Unpin + Send + Sync> AsyncFileReader for T {}
16
17#[derive(Debug, Clone)]
18pub struct CurrentFileNetworkStats {
19    /// Bytes per seconds
20    pub bps: SizeUnit,
21    /// Estimated finished time in seconds
22    pub eta: Duration,
23    /// Completion Percentage
24    pub percentage: f64,
25    /// Uploaded bytes so far
26    pub done: SizeUnit,
27    /// Total bytes to upload
28    pub total: SizeUnit,
29    /// Elapsed time
30    pub elapsed: Duration,
31}
32
33impl CurrentFileNetworkStats {}
34
35impl Display for CurrentFileNetworkStats {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        match f.precision() {
38            Some(precision) =>  f.write_fmt(format_args!(
39                "Speed: {:.precision$}PS | ETA: {:.precision$?} | Progress: {:.precision$}/{:.precision$} ({:.precision$}%) | Elapsed: {:.precision$?}",
40                self.bps, self.eta, self.done, self.total, self.percentage * 100.0, self.elapsed, precision = precision
41            )),
42            None =>  f.write_fmt(format_args!(
43                "Speed: {}PS | ETA: {:?} | Progress: {}/{}({}) | Elapsed: {:?}",
44                self.bps, self.eta, self.done, self.total, self.percentage, self.elapsed
45            )),
46        }
47    }
48}
49
/// Status marker for a file being processed.
///
/// All variants are fieldless, so equality is total and the type derives `Eq`
/// in addition to `PartialEq`.
///
/// NOTE(review): the code that assigns these states is not in this file; the
/// per-variant notes below are inferred from the variant names and should be
/// confirmed against the callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FileStatus {
    /// Presumably: queued, work has not started yet.
    Pending,
    /// Presumably: the transfer is currently in progress.
    Working,
    /// Presumably: the transfer completed.
    Finished,
    /// Presumably: a previous attempt failed and is being retried.
    Retrying,
    /// Presumably: the transfer was cancelled or given up on.
    Aborted,
}
58
59#[derive(Debug)]
60pub struct FileNetworkStats {
61    pub(super) done: Arc<AtomicU64>,
62    pub(super) speed_buffer: WriteLockArc<RollingTimeSeries<u64, 5000>>,
63    pub(super) total: f64,
64    pub(super) start_time: WriteLockArc<Instant>,
65}
66
67impl FileNetworkStats {
68    pub(super) fn new(total: f64) -> Self {
69        Self {
70            total,
71            done: Arc::new(AtomicU64::new(0)),
72            speed_buffer: WriteLockArc::new(RollingTimeSeries::new(Duration::from_secs(10))),
73            start_time: WriteLockArc::new(Instant::now()),
74        }
75    }
76
77    /// Returns estimated download/upload speed in bytes per second
78    pub fn bytes_per_second(&self) -> f64 {
79        self.inner_bytes_per_second()
80    }
81
82    /// Returns estimated finish time in seconds
83    pub fn estimated_time(&self) -> f64 {
84        let done = self.done.load(Ordering::Relaxed) as f64;
85
86        self.inner_estimated_time(done)
87    }
88
89    /// Returns current percentage
90    pub fn percentage(&self) -> f64 {
91        let done = self.done.load(Ordering::Relaxed) as f64;
92
93        done / self.total
94    }
95
96    /// Returns file stats at this point of time
97    pub fn current_stats(&self) -> CurrentFileNetworkStats {
98        let done = self.done.load(Ordering::Relaxed) as f64;
99
100        CurrentFileNetworkStats {
101            bps: self.inner_bytes_per_second().into(),
102            eta: Duration::from_secs_f64(self.inner_estimated_time(done).max(0.0)),
103            percentage: done / self.total,
104            done: done.into(),
105            total: self.total.into(),
106            elapsed: self.start_time.elapsed(),
107        }
108    }
109
110    pub(super) async fn add_done_bytes(&self, bytes: u64) {
111        self.done.fetch_add(bytes, Ordering::Relaxed);
112        let mut buffer = self.speed_buffer.lock_write().await;
113        buffer.add_value(bytes);
114    }
115
116    fn inner_bytes_per_second(&self) -> f64 {
117        let dps = self.speed_buffer.get_valid_points();
118        let mut total = 0.0;
119        let oldest_time = dps
120            .iter()
121            .map(|dp| {
122                total += dp.data as f64;
123                dp.time.elapsed()
124            })
125            .max();
126
127        match oldest_time {
128            Some(dur) => total / dur.as_secs_f64(),
129            None => 0.0,
130        }
131    }
132
133    fn inner_estimated_time(&self, done: f64) -> f64 {
134        let mut bytes_per_sec = self.inner_bytes_per_second();
135
136        if bytes_per_sec == 0.0 {
137            bytes_per_sec = 1.0;
138        }
139
140        (self.total - done) / bytes_per_sec
141    }
142}