rustybit_lib/stats/
mod.rs

1use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
2use std::time::Duration;
3
4use tokio::sync::oneshot;
5use tokio::time::Instant;
6
7use self::buffer::CircularBuffer;
8
9mod buffer;
10
11pub static DOWNLOADED_BYTES: AtomicUsize = AtomicUsize::new(0);
12pub static DOWNLOADED_PIECES: AtomicUsize = AtomicUsize::new(0);
13pub static NUMBER_OF_PEERS: AtomicU8 = AtomicU8::new(0);
14
15struct Snapshot {
16    time: Instant,
17    downloaded_bytes: usize,
18    bytes_left: usize,
19    kibps: f64,
20    number_of_peers: u8,
21}
22
23impl Snapshot {
24    fn new(time: Instant, downloaded: usize, left: usize, download_speed: f64, number_of_peers: u8) -> Self {
25        Snapshot {
26            time,
27            downloaded_bytes: downloaded,
28            bytes_left: left,
29            kibps: download_speed,
30            number_of_peers,
31        }
32    }
33}
34
35pub struct Stats {
36    snapshots: CircularBuffer<Snapshot>,
37    total_length: usize,
38}
39
40impl Stats {
41    pub fn new(downloaded: usize, left: usize, total_length: usize) -> Self {
42        let mut snapshots = CircularBuffer::new(15);
43        snapshots.push_back(Snapshot::new(Instant::now(), downloaded, left, 0., 0));
44
45        Stats {
46            snapshots,
47            total_length,
48        }
49    }
50
51    pub async fn collect_stats(&mut self, mut cancellation: oneshot::Receiver<()>) {
52        tracing::debug!("starting the stats collector");
53
54        let mut stats_collecting_interval = tokio::time::interval(Duration::from_secs(1));
55        loop {
56            tokio::select! {
57                snapshot_time = stats_collecting_interval.tick() => {
58                    self.snapshot(snapshot_time);
59                    self.print_stats();
60                }
61                _ = &mut cancellation => {
62                    // We don't care about the result, we exit either way
63                    break;
64                }
65            }
66        }
67
68        tracing::debug!("shutting down the stats collector");
69    }
70
71    fn print_stats(&self) {
72        let last_snapshot = self.snapshots.get_last().expect("not a single snapshot?");
73
74        let download_completion_percent = {
75            let downloaded_kb = (last_snapshot.downloaded_bytes / 1000) as f64;
76            let total_kb = (self.total_length / 1000) as f64;
77            if total_kb == 0. {
78                100.
79            } else {
80                (downloaded_kb / total_kb) * 100.
81            }
82        };
83
84        let (download_speed, measurement_unit, eta_seconds) = {
85            let average_kibps =
86                self.snapshots.iter().map(|snapshot| snapshot.kibps).sum::<f64>() / self.snapshots.len() as f64;
87
88            let eta_seconds = if average_kibps == 0. {
89                f64::INFINITY
90            } else {
91                last_snapshot.bytes_left as f64 / (average_kibps * 1024.)
92            };
93
94            if average_kibps >= 512. {
95                // Convert to mibps
96                (average_kibps / 1024., "MiB", eta_seconds)
97            } else {
98                (average_kibps, "KiB", eta_seconds)
99            }
100        };
101
102        tracing::info!(
103            "ETA: {:.2} s - {:.2}% - ↓{:.1} {}/s - peers: {}",
104            eta_seconds,
105            download_completion_percent,
106            download_speed,
107            measurement_unit,
108            last_snapshot.number_of_peers
109        );
110    }
111
112    fn snapshot(&mut self, snapshot_time: Instant) {
113        let downloaded_bytes = DOWNLOADED_BYTES.load(Ordering::Relaxed);
114        let number_of_peers = NUMBER_OF_PEERS.load(Ordering::Relaxed);
115
116        // SAFETY: we always have at least one snapshot in the buffer, as we add the first one
117        // ourselves
118        let prev_snapshot = self.snapshots.get_last().expect("bug: not a single snapshot?");
119
120        let downloaded_since_last = downloaded_bytes - prev_snapshot.downloaded_bytes;
121        let bytes_left = prev_snapshot.bytes_left - downloaded_since_last;
122        let time_since_last_snapshot = prev_snapshot.time.elapsed().as_secs_f64();
123        let kibps = {
124            let downloaded_kibs = (downloaded_since_last / 1024) as f64;
125            downloaded_kibs / time_since_last_snapshot
126        };
127
128        self.snapshots.push_back(Snapshot::new(
129            snapshot_time,
130            downloaded_bytes,
131            bytes_left,
132            kibps,
133            number_of_peers,
134        ));
135    }
136}