rustybit_lib/stats/
mod.rs1use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
2use std::time::Duration;
3
4use tokio::sync::oneshot;
5use tokio::time::Instant;
6
7use self::buffer::CircularBuffer;
8
9mod buffer;
10
11pub static DOWNLOADED_BYTES: AtomicUsize = AtomicUsize::new(0);
12pub static DOWNLOADED_PIECES: AtomicUsize = AtomicUsize::new(0);
13pub static NUMBER_OF_PEERS: AtomicU8 = AtomicU8::new(0);
14
15struct Snapshot {
16 time: Instant,
17 downloaded_bytes: usize,
18 bytes_left: usize,
19 kibps: f64,
20 number_of_peers: u8,
21}
22
23impl Snapshot {
24 fn new(time: Instant, downloaded: usize, left: usize, download_speed: f64, number_of_peers: u8) -> Self {
25 Snapshot {
26 time,
27 downloaded_bytes: downloaded,
28 bytes_left: left,
29 kibps: download_speed,
30 number_of_peers,
31 }
32 }
33}
34
35pub struct Stats {
36 snapshots: CircularBuffer<Snapshot>,
37 total_length: usize,
38}
39
40impl Stats {
41 pub fn new(downloaded: usize, left: usize, total_length: usize) -> Self {
42 let mut snapshots = CircularBuffer::new(15);
43 snapshots.push_back(Snapshot::new(Instant::now(), downloaded, left, 0., 0));
44
45 Stats {
46 snapshots,
47 total_length,
48 }
49 }
50
51 pub async fn collect_stats(&mut self, mut cancellation: oneshot::Receiver<()>) {
52 tracing::debug!("starting the stats collector");
53
54 let mut stats_collecting_interval = tokio::time::interval(Duration::from_secs(1));
55 loop {
56 tokio::select! {
57 snapshot_time = stats_collecting_interval.tick() => {
58 self.snapshot(snapshot_time);
59 self.print_stats();
60 }
61 _ = &mut cancellation => {
62 break;
64 }
65 }
66 }
67
68 tracing::debug!("shutting down the stats collector");
69 }
70
71 fn print_stats(&self) {
72 let last_snapshot = self.snapshots.get_last().expect("not a single snapshot?");
73
74 let download_completion_percent = {
75 let downloaded_kb = (last_snapshot.downloaded_bytes / 1000) as f64;
76 let total_kb = (self.total_length / 1000) as f64;
77 if total_kb == 0. {
78 100.
79 } else {
80 (downloaded_kb / total_kb) * 100.
81 }
82 };
83
84 let (download_speed, measurement_unit, eta_seconds) = {
85 let average_kibps =
86 self.snapshots.iter().map(|snapshot| snapshot.kibps).sum::<f64>() / self.snapshots.len() as f64;
87
88 let eta_seconds = if average_kibps == 0. {
89 f64::INFINITY
90 } else {
91 last_snapshot.bytes_left as f64 / (average_kibps * 1024.)
92 };
93
94 if average_kibps >= 512. {
95 (average_kibps / 1024., "MiB", eta_seconds)
97 } else {
98 (average_kibps, "KiB", eta_seconds)
99 }
100 };
101
102 tracing::info!(
103 "ETA: {:.2} s - {:.2}% - ↓{:.1} {}/s - peers: {}",
104 eta_seconds,
105 download_completion_percent,
106 download_speed,
107 measurement_unit,
108 last_snapshot.number_of_peers
109 );
110 }
111
112 fn snapshot(&mut self, snapshot_time: Instant) {
113 let downloaded_bytes = DOWNLOADED_BYTES.load(Ordering::Relaxed);
114 let number_of_peers = NUMBER_OF_PEERS.load(Ordering::Relaxed);
115
116 let prev_snapshot = self.snapshots.get_last().expect("bug: not a single snapshot?");
119
120 let downloaded_since_last = downloaded_bytes - prev_snapshot.downloaded_bytes;
121 let bytes_left = prev_snapshot.bytes_left - downloaded_since_last;
122 let time_since_last_snapshot = prev_snapshot.time.elapsed().as_secs_f64();
123 let kibps = {
124 let downloaded_kibs = (downloaded_since_last / 1024) as f64;
125 downloaded_kibs / time_since_last_snapshot
126 };
127
128 self.snapshots.push_back(Snapshot::new(
129 snapshot_time,
130 downloaded_bytes,
131 bytes_left,
132 kibps,
133 number_of_peers,
134 ));
135 }
136}