smelter_worker/
stats.rs

1//! Worker statistics collection
2
3use std::fmt::Display;
4
5use serde::{Deserialize, Serialize};
6use sysinfo::System;
7
/// OS-level statistics about the task
///
/// Serialization is derived when the `worker-side` feature is enabled and
/// deserialization when the `spawner-side` feature is enabled.
#[cfg_attr(feature = "worker-side", derive(Serialize))]
#[cfg_attr(feature = "spawner-side", derive(Deserialize))]
#[derive(Clone, Debug)]
pub struct Statistics {
    /// The average one-minute load average throughout the task
    pub avg_load_average: f64,
    /// The maximum one-minute load average throughout the task
    pub max_load_average: f64,
    /// The maximum amount of RAM used, in bytes.
    pub max_ram_used: u64,
    /// The system uptime, in seconds, when the worker started running
    pub uptime_at_start: u64,
    /// The system uptime, in seconds, when the worker finished running
    pub uptime_at_end: u64,
}

impl Display for Statistics {
    /// Formats the statistics as a single human-readable line, for example
    /// `load: avg 0.52, max 1.10, max_ram: 512MiB, start_at: 1h2m3s, took: 45s`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        write!(
            f,
            "load: avg {:.2}, max {:.2}, max_ram: {}MiB, start_at: ",
            self.avg_load_average,
            self.max_load_average,
            (self.max_ram_used as f64 / BYTES_PER_MIB).round(),
        )?;

        Self::write_uptime_duration(f, self.uptime_at_start)?;
        write!(f, ", took: ")?;

        // The fields are public (and may come from deserialized data), so the end
        // uptime is not guaranteed to be >= the start uptime. A plain subtraction
        // would panic in debug builds and wrap to a huge value in release builds;
        // saturating keeps formatting infallible and reports `0s` instead.
        let elapsed = self.uptime_at_end.saturating_sub(self.uptime_at_start);
        Self::write_uptime_duration(f, elapsed)
    }
}

impl Statistics {
    /// Writes a duration given in seconds as a compact `XdXhXmXs` string.
    ///
    /// Leading units that are zero are omitted (`45s`, `2m5s`, `1h0m3s`), but once
    /// a larger unit is shown every smaller unit is printed as well.
    fn write_uptime_duration(f: &mut std::fmt::Formatter<'_>, uptime: u64) -> std::fmt::Result {
        const SECS_PER_MINUTE: u64 = 60;
        const SECS_PER_HOUR: u64 = 3600;
        const SECS_PER_DAY: u64 = 86400;

        let days = uptime / SECS_PER_DAY;
        let hours = (uptime % SECS_PER_DAY) / SECS_PER_HOUR;
        let minutes = (uptime % SECS_PER_HOUR) / SECS_PER_MINUTE;
        let seconds = uptime % SECS_PER_MINUTE;

        match (days, hours, minutes) {
            (0, 0, 0) => write!(f, "{}s", seconds),
            (0, 0, _) => write!(f, "{}m{}s", minutes, seconds),
            (0, _, _) => write!(f, "{}h{}m{}s", hours, minutes, seconds),
            (_, _, _) => write!(f, "{}d{}h{}m{}s", days, hours, minutes, seconds),
        }
    }
}
56
57/// Runs a task that periodically collects statistics
58pub struct StatisticsTracker {
59    close_tx: tokio::sync::oneshot::Sender<()>,
60    task_handle: tokio::task::JoinHandle<Statistics>,
61}
62
63impl StatisticsTracker {
64    /// Close the collection task and return the collected [Statistics]
65    pub async fn finish(self) -> Option<Statistics> {
66        drop(self.close_tx);
67        self.task_handle.await.ok()
68    }
69}
70
71/// Start a task to track system statistics. The task can be shut down by dropping the
72/// returned [Receiver], and the [JoinHandle] will then return a [Statistics] object.
73pub fn track_system_stats() -> StatisticsTracker {
74    let (close_tx, mut close_rx) = tokio::sync::oneshot::channel();
75    let task_handle = tokio::task::spawn(async move {
76        let mut system = System::new();
77
78        let mut stats = Statistics {
79            avg_load_average: 0.0,
80            max_load_average: 0.0,
81            max_ram_used: 0,
82            uptime_at_start: System::uptime(),
83            uptime_at_end: 0,
84        };
85
86        let mut total_load_average = 0.0;
87        let mut num_load_averages = 0;
88
89        let mut check_interval = tokio::time::interval(std::time::Duration::from_secs(10));
90
91        loop {
92            tokio::select! {
93                _ = check_interval.tick() => {
94                    let load_avg = System::load_average();
95                    total_load_average += load_avg.one;
96                    num_load_averages += 1;
97                    if load_avg.one > stats.max_load_average {
98                        stats.max_load_average = load_avg.one;
99                    }
100
101                    system.refresh_memory_specifics(sysinfo::MemoryRefreshKind::new().with_ram());
102                    let used_ram = system.used_memory();
103                    if used_ram > stats.max_ram_used {
104                        stats.max_ram_used = used_ram;
105                    }
106                }
107                _ = &mut close_rx => {
108                    break;
109                }
110            }
111        }
112
113        if num_load_averages > 0 {
114            stats.avg_load_average = total_load_average / (num_load_averages as f64);
115        }
116        stats.uptime_at_end = System::uptime();
117        stats
118    });
119
120    StatisticsTracker {
121        close_tx,
122        task_handle,
123    }
124}