// blueprint_qos/metrics/prometheus/collector.rs

use blueprint_core::debug;
use prometheus::{
    core::Collector, Gauge, HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
};
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::metrics::types::{BlueprintStatus, MetricsConfig, SystemMetrics};

8/// Prometheus metrics collector
9pub struct PrometheusCollector {
10    registry: Arc<Registry>,
11
12    // System metrics
13    cpu_usage: Gauge,
14    memory_usage: IntGauge,
15    total_memory: IntGauge,
16    disk_usage: IntGauge,
17    total_disk: IntGauge,
18    network_rx_bytes: IntCounter,
19    network_tx_bytes: IntCounter,
20
21    // Blueprint metrics
22    job_executions: IntCounterVec,
23    job_execution_time: HistogramVec,
24    job_errors: IntCounterVec,
25
26    // Status metrics
27    uptime: IntGauge,
28    last_heartbeat: IntGauge,
29    status_code: IntGauge,
30
31    // Configuration
32    #[allow(dead_code)]
33    config: MetricsConfig,
34
35    // Custom metrics storage
36    custom_metrics: Arc<RwLock<std::collections::HashMap<String, String>>>,
37}
38
39impl PrometheusCollector {
40    /// Create a new Prometheus metrics collector
41    ///
42    /// # Errors
43    /// Returns an error if any of the Prometheus metrics cannot be registered
44    pub fn new(config: MetricsConfig, registry: Arc<Registry>) -> Result<Self, prometheus::Error> {
45        // System metrics
46        let cpu_usage = Gauge::new("blueprint_cpu_usage", "CPU usage percentage")?;
47        let memory_usage = IntGauge::new("blueprint_memory_usage", "Memory usage in bytes")?;
48        let total_memory =
49            IntGauge::new("blueprint_total_memory", "Total memory available in bytes")?;
50        let disk_usage = IntGauge::new("blueprint_disk_usage", "Disk usage in bytes")?;
51        let total_disk = IntGauge::new("blueprint_total_disk", "Total disk space in bytes")?;
52        let network_rx_bytes =
53            IntCounter::new("blueprint_network_rx_bytes", "Network received bytes")?;
54        let network_tx_bytes =
55            IntCounter::new("blueprint_network_tx_bytes", "Network transmitted bytes")?;
56
57        // Blueprint metrics
58        let job_executions = IntCounterVec::new(
59            prometheus::opts!("blueprint_job_executions", "Number of job executions"),
60            &["job_id", "service_id", "blueprint_id"],
61        )?;
62        let job_execution_time = HistogramVec::new(
63            prometheus::histogram_opts!(
64                "blueprint_job_execution_time",
65                "Job execution time in seconds"
66            ),
67            &["job_id", "service_id", "blueprint_id"],
68        )?;
69        let job_errors = IntCounterVec::new(
70            prometheus::opts!("blueprint_job_errors", "Number of job errors"),
71            &["job_id", "service_id", "blueprint_id", "error_type"],
72        )?;
73
74        // Status metrics
75        let uptime = IntGauge::new("blueprint_uptime", "Uptime in seconds")?;
76        let last_heartbeat = IntGauge::new(
77            "blueprint_last_heartbeat",
78            "Last heartbeat time as Unix timestamp",
79        )?;
80        let status_code = IntGauge::new("blueprint_status_code", "Status code")?;
81
82        // Register metrics with the provided registry
83        registry.register(Box::new(cpu_usage.clone()))?;
84        registry.register(Box::new(memory_usage.clone()))?;
85        registry.register(Box::new(total_memory.clone()))?;
86        registry.register(Box::new(disk_usage.clone()))?;
87        registry.register(Box::new(total_disk.clone()))?;
88        registry.register(Box::new(network_rx_bytes.clone()))?;
89        registry.register(Box::new(network_tx_bytes.clone()))?;
90        registry.register(Box::new(job_executions.clone()))?;
91        registry.register(Box::new(job_execution_time.clone()))?;
92        registry.register(Box::new(job_errors.clone()))?;
93        registry.register(Box::new(uptime.clone()))?;
94        registry.register(Box::new(last_heartbeat.clone()))?;
95        registry.register(Box::new(status_code.clone()))?;
96
97        Ok(Self {
98            registry,
99            cpu_usage,
100            memory_usage,
101            total_memory,
102            disk_usage,
103            total_disk,
104            network_rx_bytes,
105            network_tx_bytes,
106            job_executions,
107            job_execution_time,
108            job_errors,
109            uptime,
110            last_heartbeat,
111            status_code,
112            config,
113            custom_metrics: Arc::new(RwLock::new(std::collections::HashMap::new())),
114        })
115    }
116
117    /// Get the Prometheus registry
118    #[must_use]
119    pub fn registry(&self) -> Arc<Registry> {
120        self.registry.clone()
121    }
122
123    /// Update system metrics
124    pub fn update_system_metrics(&self, metrics: &SystemMetrics) {
125        self.cpu_usage.set(f64::from(metrics.cpu_usage));
126        self.memory_usage
127            .set(metrics.memory_usage.try_into().unwrap_or(i64::MAX));
128        self.total_memory
129            .set(metrics.total_memory.try_into().unwrap_or(i64::MAX));
130        self.disk_usage
131            .set(metrics.disk_usage.try_into().unwrap_or(i64::MAX));
132        self.total_disk
133            .set(metrics.total_disk.try_into().unwrap_or(i64::MAX));
134
135        self.network_rx_bytes.inc_by(metrics.network_rx_bytes);
136        self.network_tx_bytes.inc_by(metrics.network_tx_bytes);
137
138        debug!("Updated system metrics in Prometheus");
139    }
140
141    /// Update blueprint status
142    pub fn update_blueprint_status(&self, status: &BlueprintStatus) {
143        self.uptime
144            .set(status.uptime.try_into().unwrap_or(i64::MAX));
145        if let Some(last_heartbeat) = status.last_heartbeat {
146            self.last_heartbeat
147                .set(last_heartbeat.try_into().unwrap_or(i64::MAX));
148        }
149        self.status_code.set(i64::from(status.status_code));
150
151        debug!("Updated blueprint status in Prometheus");
152    }
153
154    /// Record job execution
155    pub fn record_job_execution(
156        &self,
157        job_id: u64,
158        service_id: u64,
159        blueprint_id: u64,
160        execution_time: f64,
161    ) {
162        let job_id_str = job_id.to_string();
163        let service_id_str = service_id.to_string();
164        let blueprint_id_str = blueprint_id.to_string();
165        let labels = [
166            job_id_str.as_str(),
167            service_id_str.as_str(),
168            blueprint_id_str.as_str(),
169        ];
170
171        self.job_executions.with_label_values(&labels).inc();
172        self.job_execution_time
173            .with_label_values(&labels)
174            .observe(execution_time);
175
176        debug!(
177            job_id = job_id,
178            service_id = service_id,
179            blueprint_id = blueprint_id,
180            execution_time = execution_time,
181            "Recorded job execution in Prometheus"
182        );
183    }
184
185    /// Record job error
186    pub fn record_job_error(
187        &self,
188        job_id: u64,
189        service_id: u64,
190        blueprint_id: u64,
191        error_type: &str,
192    ) {
193        let job_id_str = job_id.to_string();
194        let service_id_str = service_id.to_string();
195        let blueprint_id_str = blueprint_id.to_string();
196        let labels = [
197            job_id_str.as_str(),
198            service_id_str.as_str(),
199            blueprint_id_str.as_str(),
200            error_type,
201        ];
202
203        self.job_errors.with_label_values(&labels).inc();
204
205        debug!(
206            job_id = job_id,
207            service_id = service_id,
208            blueprint_id = blueprint_id,
209            error_type = error_type,
210            "Recorded job error in Prometheus"
211        );
212    }
213
214    /// Add custom metric
215    pub async fn add_custom_metric(&self, key: String, value: String) {
216        let mut custom_metrics = self.custom_metrics.write().await;
217        custom_metrics.insert(key.clone(), value.clone());
218        debug!(key = key, value = value, "Added custom metric");
219    }
220
221    /// Get custom metrics
222    pub async fn get_custom_metrics(&self) -> std::collections::HashMap<String, String> {
223        let custom_metrics = self.custom_metrics.read().await;
224        custom_metrics.clone()
225    }
226}