1use std::collections::VecDeque;
4use std::fmt;
5use std::sync::Mutex;
6use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7
/// Upper bound on how many task durations `LocalStats` keeps in its recent-durations
/// history; once reached, the oldest entry is dropped for each new one.
const ROLLING_WINDOW_TASK_DURATIONS: usize = 100;
/// Upper bound on how many task outcomes `LocalStats` keeps in its recent-outcomes
/// history. Despite the name, the history holds successes as well as failures.
const ROLLING_WINDOW_FAILURES: usize = 50;
12
/// Errors reported by a node or by the code that selects nodes for task submission.
///
/// The [`fmt::Display`] output is a fixed, human-readable sentence per variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeError {
    /// The node's task queue is full.
    QueueFull,
    /// The node's task queue is closed.
    QueueClosed,
    /// The node is shutting down.
    NodeShuttingDown,
    /// The node failed to send a system signal.
    SignalSendError,
    /// No nodes are available for task submission.
    NoNodesAvailable,
    /// Every sampled node is at full pressure.
    SystemMaxedOut,
}

impl fmt::Display for NodeError {
    /// Writes the variant's message verbatim; width, fill and precision flags on the
    /// formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Look the message up first so there is a single write call at the end.
        let message = match self {
            Self::QueueFull => "Node task queue is full",
            Self::QueueClosed => "Node task queue is closed",
            Self::NodeShuttingDown => "Node is shutting down",
            Self::SignalSendError => "Node failed to send a system signal",
            Self::NoNodesAvailable => "No VibeNodes available for task submission",
            Self::SystemMaxedOut => "System is maxed out; all sampled nodes are at full pressure",
        };
        f.write_str(message)
    }
}

// No source or backtrace to expose, so the trait's default methods are sufficient.
impl std::error::Error for NodeError {}
49
50#[derive(Debug)]
55pub struct LocalStats {
56 tasks_submitted: AtomicUsize,
58 tasks_processed: AtomicUsize,
60 tasks_succeeded: AtomicUsize,
62 tasks_failed: AtomicUsize,
64 total_processing_time_micros: AtomicU64,
66 recent_task_durations_micros: Mutex<VecDeque<u64>>,
68 recent_task_outcomes: Mutex<VecDeque<u8>>,
70}
71
72impl Default for LocalStats {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
78impl LocalStats {
79 pub fn new() -> Self {
81 LocalStats {
82 tasks_submitted: AtomicUsize::new(0),
83 tasks_processed: AtomicUsize::new(0),
84 tasks_succeeded: AtomicUsize::new(0),
85 tasks_failed: AtomicUsize::new(0),
86 total_processing_time_micros: AtomicU64::new(0),
87 recent_task_durations_micros: Mutex::new(VecDeque::with_capacity(
88 ROLLING_WINDOW_TASK_DURATIONS,
89 )),
90 recent_task_outcomes: Mutex::new(VecDeque::with_capacity(ROLLING_WINDOW_FAILURES)),
91 }
92 }
93
94 pub fn task_submitted(&self) {
96 self.tasks_submitted.fetch_add(1, Ordering::Relaxed);
97 }
98
99 pub fn record_task_outcome(&self, duration: u64, success: bool) {
101 self.total_processing_time_micros
102 .fetch_add(duration, Ordering::Relaxed);
103
104 if success {
105 self.tasks_succeeded.fetch_add(1, Ordering::Relaxed);
106 } else {
107 self.tasks_failed.fetch_add(1, Ordering::Relaxed);
108 }
109
110 {
112 let mut durations_guard = self
113 .recent_task_durations_micros
114 .lock()
115 .expect("Mutex poisoned: recent_task_durations_micros");
116 if durations_guard.len() == ROLLING_WINDOW_TASK_DURATIONS && !durations_guard.is_empty()
117 {
118 durations_guard.pop_front();
119 }
120 if ROLLING_WINDOW_TASK_DURATIONS > 0 {
121 durations_guard.push_back(duration);
122 }
123 }
124
125 {
127 let mut outcomes_guard = self
128 .recent_task_outcomes
129 .lock()
130 .expect("Mutex poisoned: recent_task_outcomes");
131 if outcomes_guard.len() == ROLLING_WINDOW_FAILURES && !outcomes_guard.is_empty() {
132 outcomes_guard.pop_front();
133 }
134 if ROLLING_WINDOW_FAILURES > 0 {
135 outcomes_guard.push_back(if success { 0 } else { 1 });
136 }
137 }
138 }
139}