1use crate::{
4 ring_buffer::{self, RingBuffer},
5 ClientId, WorkerId,
6};
7use enum_map::EnumMap;
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, fmt, time::Duration};
10
/// Fixed 500 ms interval associated with broker statistics.
///
/// NOTE(review): presumably the period at which the broker samples or publishes its
/// statistics. The code that consumes this constant is outside this section; confirm there.
pub const BROKER_STATISTICS_INTERVAL: Duration = Duration::from_millis(500);
13
/// The states a job can be counted under in this module's statistics.
///
/// The declaration order of the variants is significant and must not be shuffled casually:
/// it defines the derived `PartialOrd`/`Ord` ordering and the order in which
/// [`JobState::iter`] yields the variants. The variants are also the keys of
/// [`JobStateCounts`].
///
/// NOTE(review): the variants read like successive stages of a job's lifecycle
/// (waiting for artifacts, then pending, then running, then complete); confirm against the
/// scheduler code, which is not visible from here.
#[derive(
    Copy,
    Clone,
    Debug,
    PartialEq,
    Eq,
    enum_map::Enum,
    strum::EnumIter,
    Serialize,
    Deserialize,
    Hash,
    PartialOrd,
    Ord,
)]
pub enum JobState {
    WaitingForArtifacts,
    Pending,
    Running,
    Complete,
}
34
35impl fmt::Display for JobState {
36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37 match self {
38 Self::WaitingForArtifacts => write!(f, "waiting for artifacts"),
39 Self::Pending => write!(f, "pending"),
40 Self::Running => write!(f, "running"),
41 Self::Complete => write!(f, "complete"),
42 }
43 }
44}
45
impl JobState {
    /// Returns an iterator over every variant that can be walked from either end.
    ///
    /// This wraps the `strum::IntoEnumIterator` implementation generated by the
    /// `strum::EnumIter` derive on the enum, so callers do not need that trait in scope.
    pub fn iter() -> impl DoubleEndedIterator<Item = Self> {
        // The fully qualified path is required here: a bare `Self::iter()` would resolve to
        // this inherent function rather than the trait method, and recurse forever.
        <Self as strum::IntoEnumIterator>::iter()
    }
}
51
// Pins the order in which `JobState::iter` yields the variants, both forwards and backwards.
#[test]
fn job_state_iter() {
    let forward = [
        JobState::WaitingForArtifacts,
        JobState::Pending,
        JobState::Running,
        JobState::Complete,
    ];
    let backward: Vec<JobState> = forward.iter().rev().copied().collect();

    assert_eq!(JobState::iter().collect::<Vec<_>>(), forward);
    assert_eq!(JobState::iter().rev().collect::<Vec<_>>(), backward);
}
66
/// A `u64` counter for each [`JobState`] variant.
///
/// NOTE(review): presumably each counter is the number of jobs in that state; confirm with
/// the code that fills these maps.
pub type JobStateCounts = EnumMap<JobState, u64>;
69
/// One sample of job statistics: a [`JobStateCounts`] for each client.
///
/// NOTE(review): presumably one sample is taken per [`BROKER_STATISTICS_INTERVAL`]; the
/// sampling code is not visible from here, so confirm.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct JobStatisticsSample {
    /// Per-state counters, keyed by the client they belong to.
    pub client_to_stats: HashMap<ClientId, JobStateCounts>,
}
76
/// Capacity parameter of the ring buffer inside [`JobStatisticsTimeSeries`]; also the value
/// reported by [`JobStatisticsTimeSeries::capacity`].
pub const CAPACITY: usize = 1024;
79
/// A series of [`JobStatisticsSample`]s kept in a [`RingBuffer`] parameterized by
/// [`CAPACITY`].
///
/// NOTE(review): presumably the oldest sample is discarded once the buffer is full; that
/// behavior lives in `RingBuffer::push`, which is not visible from here, so confirm.
#[derive(Serialize, Default, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct JobStatisticsTimeSeries {
    /// Backing storage for the samples.
    entries: RingBuffer<JobStatisticsSample, CAPACITY>,
}
86
87impl FromIterator<JobStatisticsSample> for JobStatisticsTimeSeries {
88 fn from_iter<T>(iter: T) -> Self
89 where
90 T: IntoIterator<Item = JobStatisticsSample>,
91 {
92 let mut s = Self::default();
93 for e in iter {
94 s.entries.push(e);
95 }
96 s
97 }
98}
99
impl JobStatisticsTimeSeries {
    /// Creates a time series in its default state (same as `Self::default()`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Pushes `entry` onto the underlying ring buffer.
    ///
    /// NOTE(review): what happens when the buffer already holds [`CAPACITY`] samples is
    /// decided by `RingBuffer::push` (presumably the oldest sample is overwritten); confirm.
    pub fn insert(&mut self, entry: JobStatisticsSample) {
        self.entries.push(entry);
    }

    /// Returns the underlying ring buffer's iterator over the stored samples.
    ///
    /// NOTE(review): iteration order is whatever `RingBuffer::iter` provides (presumably
    /// oldest to newest); confirm.
    pub fn iter(&self) -> ring_buffer::Iter<'_, JobStatisticsSample, CAPACITY> {
        self.entries.iter()
    }

    /// Returns [`CAPACITY`]; the value does not depend on the contents of `self`.
    pub fn capacity(&self) -> usize {
        CAPACITY
    }

    /// Returns the length reported by the underlying ring buffer.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns whether the underlying ring buffer reports itself as empty.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
125
/// Statistics recorded for a single worker.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct WorkerStatistics {
    /// Slot count for the worker.
    ///
    /// NOTE(review): presumably the number of jobs the worker can run concurrently; confirm
    /// with the worker/broker code.
    pub slots: usize,
}
130
/// Aggregate statistics for the broker: per-worker statistics plus the job statistics
/// time series.
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
pub struct BrokerStatistics {
    /// Statistics for each worker, keyed by [`WorkerId`].
    pub worker_statistics: HashMap<WorkerId, WorkerStatistics>,
    /// Series of job statistics samples.
    pub job_statistics: JobStatisticsTimeSeries,
}