maelstrom_base/
stats.rs

1//! Contains data-structures for maintaining historical statistics of jobs
2
3use crate::{
4    ring_buffer::{self, RingBuffer},
5    ClientId, WorkerId,
6};
7use enum_map::EnumMap;
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, fmt, time::Duration};
10
/// The amount of time between broker statistic samples (500 milliseconds).
pub const BROKER_STATISTICS_INTERVAL: Duration = Duration::from_millis(500);
13
/// The states a job can be counted under in the statistics of this module.
///
/// Declaration order is significant: the derived `PartialOrd`/`Ord`
/// implementations and the derived `strum::EnumIter` iteration both follow
/// the order in which the variants are listed here.
#[derive(
    Copy,
    Clone,
    Debug,
    PartialEq,
    Eq,
    enum_map::Enum,
    strum::EnumIter,
    Serialize,
    Deserialize,
    Hash,
    PartialOrd,
    Ord,
)]
pub enum JobState {
    /// Displayed as "waiting for artifacts".
    WaitingForArtifacts,
    /// Displayed as "pending".
    Pending,
    /// Displayed as "running".
    Running,
    /// Displayed as "complete".
    Complete,
}
34
35impl fmt::Display for JobState {
36    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37        match self {
38            Self::WaitingForArtifacts => write!(f, "waiting for artifacts"),
39            Self::Pending => write!(f, "pending"),
40            Self::Running => write!(f, "running"),
41            Self::Complete => write!(f, "complete"),
42        }
43    }
44}
45
46impl JobState {
47    pub fn iter() -> impl DoubleEndedIterator<Item = Self> {
48        <Self as strum::IntoEnumIterator>::iter()
49    }
50}
51
#[test]
fn job_state_iter() {
    // Variants in the order they are expected to be yielded.
    let expected = [
        JobState::WaitingForArtifacts,
        JobState::Pending,
        JobState::Running,
        JobState::Complete,
    ];

    // Forward iteration yields the expected order.
    let forward: Vec<_> = JobState::iter().collect();
    assert_eq!(forward, expected);

    // Reverse iteration yields exactly the mirrored order.
    let backward: Vec<_> = JobState::iter().rev().collect();
    let expected_reversed: Vec<_> = expected.iter().rev().copied().collect();
    assert_eq!(backward, expected_reversed);
}
66
/// For a single client, the number of jobs in each [`JobState`].
pub type JobStateCounts = EnumMap<JobState, u64>;
69
/// Single point-in-time snapshot of the job counts of every client.
/// TODO: This should contain a timestamp
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct JobStatisticsSample {
    /// Maps each client to its per-[`JobState`] job counts.
    pub client_to_stats: HashMap<ClientId, JobStateCounts>,
}
76
/// The number of data-points to save before they are deleted.
/// This is the capacity of the ring buffer inside [`JobStatisticsTimeSeries`].
pub const CAPACITY: usize = 1024;
79
/// Time-series of job statistics.
/// It is implemented with a ring buffer holding up to [`CAPACITY`] samples.
/// The entries are ordered by time.
#[derive(Serialize, Default, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct JobStatisticsTimeSeries {
    /// Backing storage for the samples.
    entries: RingBuffer<JobStatisticsSample, CAPACITY>,
}
86
87impl FromIterator<JobStatisticsSample> for JobStatisticsTimeSeries {
88    fn from_iter<T>(iter: T) -> Self
89    where
90        T: IntoIterator<Item = JobStatisticsSample>,
91    {
92        let mut s = Self::default();
93        for e in iter {
94            s.entries.push(e);
95        }
96        s
97    }
98}
99
100impl JobStatisticsTimeSeries {
101    pub fn new() -> Self {
102        Self::default()
103    }
104
105    pub fn insert(&mut self, entry: JobStatisticsSample) {
106        self.entries.push(entry);
107    }
108
109    pub fn iter(&self) -> ring_buffer::Iter<'_, JobStatisticsSample, CAPACITY> {
110        self.entries.iter()
111    }
112
113    pub fn capacity(&self) -> usize {
114        CAPACITY
115    }
116
117    pub fn len(&self) -> usize {
118        self.entries.len()
119    }
120
121    pub fn is_empty(&self) -> bool {
122        self.entries.is_empty()
123    }
124}
125
/// Statistics kept for a single worker.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct WorkerStatistics {
    /// Slot count of the worker.
    /// NOTE(review): presumably the number of jobs the worker can run
    /// concurrently; confirm against where this value is populated.
    pub slots: usize,
}
130
/// Useful information for a client to display about the broker's state.
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
pub struct BrokerStatistics {
    /// Per-worker statistics, keyed by worker id.
    pub worker_statistics: HashMap<WorkerId, WorkerStatistics>,
    /// Time series of job-count samples.
    pub job_statistics: JobStatisticsTimeSeries,
}