blueprint_qos/metrics/provider/
default.rs1use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
3use tokio::sync::RwLock; use crate::metrics::types::{
6 BlueprintMetrics, BlueprintStatus, MetricsConfig, MetricsProvider, SystemMetrics,
7};
8
9pub struct DefaultMetricsProvider {
11 system_metrics: Arc<tokio::sync::RwLock<Vec<SystemMetrics>>>,
12 blueprint_metrics: Arc<tokio::sync::RwLock<Vec<BlueprintMetrics>>>,
13 blueprint_status: Arc<tokio::sync::RwLock<BlueprintStatus>>,
14 custom_metrics: Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>>,
15 config: MetricsConfig,
16 start_time: Instant,
17}
18
19impl DefaultMetricsProvider {
20 #[must_use]
22 pub fn new(config: MetricsConfig) -> Self {
23 Self {
24 system_metrics: Arc::new(RwLock::new(Vec::new())),
25 blueprint_metrics: Arc::new(RwLock::new(Vec::new())),
26 blueprint_status: Arc::new(RwLock::new(BlueprintStatus::default())),
27 custom_metrics: Arc::new(RwLock::new(std::collections::HashMap::new())),
28 config,
29 start_time: Instant::now(),
30 }
31 }
32
33 #[must_use]
35 pub fn system_metrics_arc_clone(&self) -> Arc<tokio::sync::RwLock<Vec<SystemMetrics>>> {
36 self.system_metrics.clone()
37 }
38
39 #[must_use]
41 pub fn blueprint_metrics_arc_clone(&self) -> Arc<tokio::sync::RwLock<Vec<BlueprintMetrics>>> {
42 self.blueprint_metrics.clone()
43 }
44
45 #[must_use]
47 pub fn blueprint_status_arc_clone(&self) -> Arc<tokio::sync::RwLock<BlueprintStatus>> {
48 self.blueprint_status.clone()
49 }
50
51 #[must_use]
53 pub fn custom_metrics_arc_clone(
54 &self,
55 ) -> Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>> {
56 self.custom_metrics.clone()
57 }
58
59 fn collect_system_metrics() -> SystemMetrics {
61 use sysinfo::{Disks, Networks, System};
62
63 let mut sys = System::new_all();
64 sys.refresh_all();
65
66 let cpu_usage = sys.global_cpu_usage();
67
68 let memory_usage = sys.used_memory();
69 let total_memory = sys.total_memory();
70
71 let mut disk_usage: u64 = 0;
72 let mut total_disk: u64 = 0;
73 for disk in Disks::new_with_refreshed_list().list() {
74 total_disk += disk.total_space();
75 disk_usage += disk.total_space() - disk.available_space();
76 }
77
78 let mut network_rx_bytes: u64 = 0;
79 let mut network_tx_bytes: u64 = 0;
80 for data in Networks::new_with_refreshed_list().list().values() {
81 network_rx_bytes += data.received();
82 network_tx_bytes += data.transmitted();
83 }
84
85 let timestamp = SystemTime::now()
86 .duration_since(UNIX_EPOCH)
87 .unwrap_or_default()
88 .as_secs();
89
90 SystemMetrics {
91 cpu_usage,
92 memory_usage,
93 total_memory,
94 disk_usage,
95 total_disk,
96 network_rx_bytes,
97 network_tx_bytes,
98 timestamp,
99 }
100 }
101}
102
103impl MetricsProvider for DefaultMetricsProvider {
104 async fn get_system_metrics(&self) -> SystemMetrics {
106 if let Ok(metrics_guard) = self.system_metrics.try_read() {
107 metrics_guard.last().cloned().unwrap_or_default()
108 } else {
109 eprintln!(
110 "ERROR: System metrics lock poisoned or unavailable in get_system_metrics. Returning default."
111 );
112 SystemMetrics::default()
113 }
114 }
115 async fn get_blueprint_metrics(&self) -> BlueprintMetrics {
117 if let Ok(metrics_guard) = self.blueprint_metrics.try_read() {
118 metrics_guard.last().cloned().unwrap_or_default()
119 } else {
120 eprintln!(
121 "ERROR: Blueprint metrics lock poisoned or unavailable in get_blueprint_metrics. Returning default."
122 );
123 BlueprintMetrics::default()
124 }
125 }
126 async fn get_blueprint_status(&self) -> BlueprintStatus {
128 if let Ok(status_guard) = self.blueprint_status.try_read() {
129 status_guard.clone()
130 } else {
131 eprintln!(
132 "ERROR: Blueprint status lock poisoned or unavailable in get_blueprint_status. Returning default."
133 );
134 BlueprintStatus::default()
135 }
136 }
137 async fn get_system_metrics_history(&self) -> Vec<SystemMetrics> {
139 if let Ok(metrics_guard) = self.system_metrics.try_read() {
140 metrics_guard.clone()
141 } else {
142 eprintln!(
143 "ERROR: System metrics lock poisoned or unavailable in get_system_metrics_history. Returning empty history."
144 );
145 Vec::new()
146 }
147 }
148 async fn get_blueprint_metrics_history(&self) -> Vec<BlueprintMetrics> {
150 if let Ok(metrics_guard) = self.blueprint_metrics.try_read() {
151 metrics_guard.clone()
152 } else {
153 eprintln!(
154 "ERROR: Blueprint metrics lock poisoned or unavailable in get_blueprint_metrics_history. Returning empty history."
155 );
156 Vec::new()
157 }
158 }
159 async fn add_custom_metric(&self, key: String, value: String) {
161 if let Ok(mut custom_guard) = self.custom_metrics.try_write() {
162 custom_guard.insert(key, value);
163 } else {
164 eprintln!(
165 "ERROR: Custom metrics lock poisoned or unavailable in add_custom_metric. Metric not added."
166 );
167 }
168 }
169 async fn set_blueprint_status(&self, status_code: u32, status_message: Option<String>) {
171 if let Ok(mut status_guard) = self.blueprint_status.try_write() {
172 status_guard.status_code = status_code;
173 status_guard.status_message = status_message;
174 } else {
175 eprintln!(
176 "ERROR: Blueprint status lock poisoned or unavailable in set_blueprint_status. Status not set."
177 );
178 }
179 }
180 async fn update_last_heartbeat(&self, timestamp: u64) {
182 if let Ok(mut status_guard) = self.blueprint_status.try_write() {
183 status_guard.last_heartbeat = Some(timestamp);
184 } else {
185 eprintln!(
186 "ERROR: Blueprint status lock poisoned or unavailable in update_last_heartbeat. Heartbeat not updated."
187 );
188 }
189 }
190
191 async fn start_collection(&self) -> Result<(), crate::error::Error> {
197 let system_metrics = self.system_metrics.clone();
198 let blueprint_metrics = self.blueprint_metrics.clone();
199 let blueprint_status = self.blueprint_status.clone();
200 let custom_metrics = self.custom_metrics.clone();
201 let start_time = self.start_time;
202 let config = self.config.clone();
203
204 tokio::spawn(async move {
205 let mut interval =
206 tokio::time::interval(Duration::from_secs(config.collection_interval_secs));
207 loop {
208 interval.tick().await;
209 let sys_metrics_data = DefaultMetricsProvider::collect_system_metrics();
210 if let Ok(mut metrics) = system_metrics.try_write() {
211 metrics.push(sys_metrics_data);
212 if metrics.len() > config.max_history {
213 metrics.remove(0);
214 }
215 } else {
216 println!(
217 "COLLECTION_LOOP_ERROR: Failed to acquire system_metrics write lock (try_write); skipping system metrics update for this cycle."
218 );
219 }
220
221 let mut bp_metrics_data = BlueprintMetrics::default();
222 if let Ok(custom) = custom_metrics.try_read() {
223 bp_metrics_data.custom_metrics = custom.clone();
224 } else {
225 println!(
226 "COLLECTION_LOOP_ERROR: Failed to acquire custom_metrics read lock (try_read); skipping custom metrics update for this cycle."
227 );
228 }
229
230 if let Ok(mut metrics) = blueprint_metrics.try_write() {
231 metrics.push(bp_metrics_data);
232 if metrics.len() > config.max_history {
233 metrics.remove(0);
234 }
235 } else {
236 println!(
237 "COLLECTION_LOOP_ERROR: Failed to acquire blueprint_metrics write lock (try_write); skipping blueprint metrics update for this cycle."
238 );
239 }
240
241 if let Ok(mut status) = blueprint_status.try_write() {
242 status.uptime = Instant::now().duration_since(start_time).as_secs();
243 status.timestamp = SystemTime::now()
244 .duration_since(UNIX_EPOCH)
245 .unwrap_or_default()
246 .as_secs();
247 } else {
248 println!(
249 "COLLECTION_LOOP_ERROR: Failed to acquire blueprint_status write lock (try_write); skipping blueprint status update for this cycle."
250 );
251 }
252 println!("COLLECTION_LOOP_INFO: Metrics collection cycle finished.");
253 }
254 });
255 Ok(())
256 }
257}