blueprint_qos/metrics/provider/default.rs

1use std::sync::Arc; // Arc can remain std::sync::Arc as tokio::sync::RwLock is Send + Sync
2use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
3use tokio::sync::RwLock; // Changed from std::sync to tokio::sync
4
5use crate::metrics::types::{
6    BlueprintMetrics, BlueprintStatus, MetricsConfig, MetricsProvider, SystemMetrics,
7};
8
9/// Default metrics provider implementation
10pub struct DefaultMetricsProvider {
11    system_metrics: Arc<tokio::sync::RwLock<Vec<SystemMetrics>>>,
12    blueprint_metrics: Arc<tokio::sync::RwLock<Vec<BlueprintMetrics>>>,
13    blueprint_status: Arc<tokio::sync::RwLock<BlueprintStatus>>,
14    custom_metrics: Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>>,
15    config: MetricsConfig,
16    start_time: Instant,
17}
18
19impl DefaultMetricsProvider {
20    /// Create a new `DefaultMetricsProvider`
21    #[must_use]
22    pub fn new(config: MetricsConfig) -> Self {
23        Self {
24            system_metrics: Arc::new(RwLock::new(Vec::new())),
25            blueprint_metrics: Arc::new(RwLock::new(Vec::new())),
26            blueprint_status: Arc::new(RwLock::new(BlueprintStatus::default())),
27            custom_metrics: Arc::new(RwLock::new(std::collections::HashMap::new())),
28            config,
29            start_time: Instant::now(),
30        }
31    }
32
33    /// Test helper: Clones the internal Arc for `system_metrics`.
34    #[must_use]
35    pub fn system_metrics_arc_clone(&self) -> Arc<tokio::sync::RwLock<Vec<SystemMetrics>>> {
36        self.system_metrics.clone()
37    }
38
39    /// Test helper: Clones the internal Arc for `blueprint_metrics`.
40    #[must_use]
41    pub fn blueprint_metrics_arc_clone(&self) -> Arc<tokio::sync::RwLock<Vec<BlueprintMetrics>>> {
42        self.blueprint_metrics.clone()
43    }
44
45    /// Test helper: Clones the internal Arc for `blueprint_status`.
46    #[must_use]
47    pub fn blueprint_status_arc_clone(&self) -> Arc<tokio::sync::RwLock<BlueprintStatus>> {
48        self.blueprint_status.clone()
49    }
50
51    /// Test helper: Clones the internal Arc for `custom_metrics`.
52    #[must_use]
53    pub fn custom_metrics_arc_clone(
54        &self,
55    ) -> Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>> {
56        self.custom_metrics.clone()
57    }
58
59    /// Collect system metrics
60    fn collect_system_metrics() -> SystemMetrics {
61        use sysinfo::{Disks, Networks, System};
62
63        let mut sys = System::new_all();
64        sys.refresh_all();
65
66        let cpu_usage = sys.global_cpu_usage();
67
68        let memory_usage = sys.used_memory();
69        let total_memory = sys.total_memory();
70
71        let mut disk_usage: u64 = 0;
72        let mut total_disk: u64 = 0;
73        for disk in Disks::new_with_refreshed_list().list() {
74            total_disk += disk.total_space();
75            disk_usage += disk.total_space() - disk.available_space();
76        }
77
78        let mut network_rx_bytes: u64 = 0;
79        let mut network_tx_bytes: u64 = 0;
80        for data in Networks::new_with_refreshed_list().list().values() {
81            network_rx_bytes += data.received();
82            network_tx_bytes += data.transmitted();
83        }
84
85        let timestamp = SystemTime::now()
86            .duration_since(UNIX_EPOCH)
87            .unwrap_or_default()
88            .as_secs();
89
90        SystemMetrics {
91            cpu_usage,
92            memory_usage,
93            total_memory,
94            disk_usage,
95            total_disk,
96            network_rx_bytes,
97            network_tx_bytes,
98            timestamp,
99        }
100    }
101}
102
103impl MetricsProvider for DefaultMetricsProvider {
104    /// Returns the latest collected `SystemMetrics`.
105    async fn get_system_metrics(&self) -> SystemMetrics {
106        if let Ok(metrics_guard) = self.system_metrics.try_read() {
107            metrics_guard.last().cloned().unwrap_or_default()
108        } else {
109            eprintln!(
110                "ERROR: System metrics lock poisoned or unavailable in get_system_metrics. Returning default."
111            );
112            SystemMetrics::default()
113        }
114    }
115    /// Returns the latest collected `BlueprintMetrics`.
116    async fn get_blueprint_metrics(&self) -> BlueprintMetrics {
117        if let Ok(metrics_guard) = self.blueprint_metrics.try_read() {
118            metrics_guard.last().cloned().unwrap_or_default()
119        } else {
120            eprintln!(
121                "ERROR: Blueprint metrics lock poisoned or unavailable in get_blueprint_metrics. Returning default."
122            );
123            BlueprintMetrics::default()
124        }
125    }
126    /// Returns the current `BlueprintStatus`.
127    async fn get_blueprint_status(&self) -> BlueprintStatus {
128        if let Ok(status_guard) = self.blueprint_status.try_read() {
129            status_guard.clone()
130        } else {
131            eprintln!(
132                "ERROR: Blueprint status lock poisoned or unavailable in get_blueprint_status. Returning default."
133            );
134            BlueprintStatus::default()
135        }
136    }
137    /// Returns a history of collected `SystemMetrics`.
138    async fn get_system_metrics_history(&self) -> Vec<SystemMetrics> {
139        if let Ok(metrics_guard) = self.system_metrics.try_read() {
140            metrics_guard.clone()
141        } else {
142            eprintln!(
143                "ERROR: System metrics lock poisoned or unavailable in get_system_metrics_history. Returning empty history."
144            );
145            Vec::new()
146        }
147    }
148    /// Returns a history of collected `BlueprintMetrics`.
149    async fn get_blueprint_metrics_history(&self) -> Vec<BlueprintMetrics> {
150        if let Ok(metrics_guard) = self.blueprint_metrics.try_read() {
151            metrics_guard.clone()
152        } else {
153            eprintln!(
154                "ERROR: Blueprint metrics lock poisoned or unavailable in get_blueprint_metrics_history. Returning empty history."
155            );
156            Vec::new()
157        }
158    }
159    /// Adds a custom key-value metric.
160    async fn add_custom_metric(&self, key: String, value: String) {
161        if let Ok(mut custom_guard) = self.custom_metrics.try_write() {
162            custom_guard.insert(key, value);
163        } else {
164            eprintln!(
165                "ERROR: Custom metrics lock poisoned or unavailable in add_custom_metric. Metric not added."
166            );
167        }
168    }
169    /// Sets the current `BlueprintStatus`.
170    async fn set_blueprint_status(&self, status_code: u32, status_message: Option<String>) {
171        if let Ok(mut status_guard) = self.blueprint_status.try_write() {
172            status_guard.status_code = status_code;
173            status_guard.status_message = status_message;
174        } else {
175            eprintln!(
176                "ERROR: Blueprint status lock poisoned or unavailable in set_blueprint_status. Status not set."
177            );
178        }
179    }
180    /// Updates the last heartbeat timestamp in `BlueprintStatus`.
181    async fn update_last_heartbeat(&self, timestamp: u64) {
182        if let Ok(mut status_guard) = self.blueprint_status.try_write() {
183            status_guard.last_heartbeat = Some(timestamp);
184        } else {
185            eprintln!(
186                "ERROR: Blueprint status lock poisoned or unavailable in update_last_heartbeat. Heartbeat not updated."
187            );
188        }
189    }
190
191    /// Starts the background task for periodic metrics collection.
192    ///
193    /// # Errors
194    ///
195    /// Returns an error if the metrics collection background task cannot be started.
196    async fn start_collection(&self) -> Result<(), crate::error::Error> {
197        let system_metrics = self.system_metrics.clone();
198        let blueprint_metrics = self.blueprint_metrics.clone();
199        let blueprint_status = self.blueprint_status.clone();
200        let custom_metrics = self.custom_metrics.clone();
201        let start_time = self.start_time;
202        let config = self.config.clone();
203
204        tokio::spawn(async move {
205            let mut interval =
206                tokio::time::interval(Duration::from_secs(config.collection_interval_secs));
207            loop {
208                interval.tick().await;
209                let sys_metrics_data = DefaultMetricsProvider::collect_system_metrics();
210                if let Ok(mut metrics) = system_metrics.try_write() {
211                    metrics.push(sys_metrics_data);
212                    if metrics.len() > config.max_history {
213                        metrics.remove(0);
214                    }
215                } else {
216                    println!(
217                        "COLLECTION_LOOP_ERROR: Failed to acquire system_metrics write lock (try_write); skipping system metrics update for this cycle."
218                    );
219                }
220
221                let mut bp_metrics_data = BlueprintMetrics::default();
222                if let Ok(custom) = custom_metrics.try_read() {
223                    bp_metrics_data.custom_metrics = custom.clone();
224                } else {
225                    println!(
226                        "COLLECTION_LOOP_ERROR: Failed to acquire custom_metrics read lock (try_read); skipping custom metrics update for this cycle."
227                    );
228                }
229
230                if let Ok(mut metrics) = blueprint_metrics.try_write() {
231                    metrics.push(bp_metrics_data);
232                    if metrics.len() > config.max_history {
233                        metrics.remove(0);
234                    }
235                } else {
236                    println!(
237                        "COLLECTION_LOOP_ERROR: Failed to acquire blueprint_metrics write lock (try_write); skipping blueprint metrics update for this cycle."
238                    );
239                }
240
241                if let Ok(mut status) = blueprint_status.try_write() {
242                    status.uptime = Instant::now().duration_since(start_time).as_secs();
243                    status.timestamp = SystemTime::now()
244                        .duration_since(UNIX_EPOCH)
245                        .unwrap_or_default()
246                        .as_secs();
247                } else {
248                    println!(
249                        "COLLECTION_LOOP_ERROR: Failed to acquire blueprint_status write lock (try_write); skipping blueprint status update for this cycle."
250                    );
251                }
252                println!("COLLECTION_LOOP_INFO: Metrics collection cycle finished.");
253            }
254        });
255        Ok(())
256    }
257}