//! Prometheus-backed metrics collector (`blueprint_qos/metrics/prometheus/collector.rs`).

use std::collections::HashMap;
use std::sync::Arc;

use blueprint_core::debug;
use prometheus::{Gauge, HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry};
use tokio::sync::RwLock;

use crate::metrics::types::{BlueprintStatus, MetricsConfig, SystemMetrics};
7
8pub struct PrometheusCollector {
10 registry: Arc<Registry>,
11
12 cpu_usage: Gauge,
14 memory_usage: IntGauge,
15 total_memory: IntGauge,
16 disk_usage: IntGauge,
17 total_disk: IntGauge,
18 network_rx_bytes: IntCounter,
19 network_tx_bytes: IntCounter,
20
21 job_executions: IntCounterVec,
23 job_execution_time: HistogramVec,
24 job_errors: IntCounterVec,
25
26 uptime: IntGauge,
28 last_heartbeat: IntGauge,
29 status_code: IntGauge,
30
31 #[allow(dead_code)]
33 config: MetricsConfig,
34
35 custom_metrics: Arc<RwLock<std::collections::HashMap<String, String>>>,
37}
38
39impl PrometheusCollector {
40 pub fn new(config: MetricsConfig, registry: Arc<Registry>) -> Result<Self, prometheus::Error> {
45 let cpu_usage = Gauge::new("blueprint_cpu_usage", "CPU usage percentage")?;
47 let memory_usage = IntGauge::new("blueprint_memory_usage", "Memory usage in bytes")?;
48 let total_memory =
49 IntGauge::new("blueprint_total_memory", "Total memory available in bytes")?;
50 let disk_usage = IntGauge::new("blueprint_disk_usage", "Disk usage in bytes")?;
51 let total_disk = IntGauge::new("blueprint_total_disk", "Total disk space in bytes")?;
52 let network_rx_bytes =
53 IntCounter::new("blueprint_network_rx_bytes", "Network received bytes")?;
54 let network_tx_bytes =
55 IntCounter::new("blueprint_network_tx_bytes", "Network transmitted bytes")?;
56
57 let job_executions = IntCounterVec::new(
59 prometheus::opts!("blueprint_job_executions", "Number of job executions"),
60 &["job_id", "service_id", "blueprint_id"],
61 )?;
62 let job_execution_time = HistogramVec::new(
63 prometheus::histogram_opts!(
64 "blueprint_job_execution_time",
65 "Job execution time in seconds"
66 ),
67 &["job_id", "service_id", "blueprint_id"],
68 )?;
69 let job_errors = IntCounterVec::new(
70 prometheus::opts!("blueprint_job_errors", "Number of job errors"),
71 &["job_id", "service_id", "blueprint_id", "error_type"],
72 )?;
73
74 let uptime = IntGauge::new("blueprint_uptime", "Uptime in seconds")?;
76 let last_heartbeat = IntGauge::new(
77 "blueprint_last_heartbeat",
78 "Last heartbeat time as Unix timestamp",
79 )?;
80 let status_code = IntGauge::new("blueprint_status_code", "Status code")?;
81
82 registry.register(Box::new(cpu_usage.clone()))?;
84 registry.register(Box::new(memory_usage.clone()))?;
85 registry.register(Box::new(total_memory.clone()))?;
86 registry.register(Box::new(disk_usage.clone()))?;
87 registry.register(Box::new(total_disk.clone()))?;
88 registry.register(Box::new(network_rx_bytes.clone()))?;
89 registry.register(Box::new(network_tx_bytes.clone()))?;
90 registry.register(Box::new(job_executions.clone()))?;
91 registry.register(Box::new(job_execution_time.clone()))?;
92 registry.register(Box::new(job_errors.clone()))?;
93 registry.register(Box::new(uptime.clone()))?;
94 registry.register(Box::new(last_heartbeat.clone()))?;
95 registry.register(Box::new(status_code.clone()))?;
96
97 Ok(Self {
98 registry,
99 cpu_usage,
100 memory_usage,
101 total_memory,
102 disk_usage,
103 total_disk,
104 network_rx_bytes,
105 network_tx_bytes,
106 job_executions,
107 job_execution_time,
108 job_errors,
109 uptime,
110 last_heartbeat,
111 status_code,
112 config,
113 custom_metrics: Arc::new(RwLock::new(std::collections::HashMap::new())),
114 })
115 }
116
117 #[must_use]
119 pub fn registry(&self) -> Arc<Registry> {
120 self.registry.clone()
121 }
122
123 pub fn update_system_metrics(&self, metrics: &SystemMetrics) {
125 self.cpu_usage.set(f64::from(metrics.cpu_usage));
126 self.memory_usage
127 .set(metrics.memory_usage.try_into().unwrap_or(i64::MAX));
128 self.total_memory
129 .set(metrics.total_memory.try_into().unwrap_or(i64::MAX));
130 self.disk_usage
131 .set(metrics.disk_usage.try_into().unwrap_or(i64::MAX));
132 self.total_disk
133 .set(metrics.total_disk.try_into().unwrap_or(i64::MAX));
134
135 self.network_rx_bytes.inc_by(metrics.network_rx_bytes);
136 self.network_tx_bytes.inc_by(metrics.network_tx_bytes);
137
138 debug!("Updated system metrics in Prometheus");
139 }
140
141 pub fn update_blueprint_status(&self, status: &BlueprintStatus) {
143 self.uptime
144 .set(status.uptime.try_into().unwrap_or(i64::MAX));
145 if let Some(last_heartbeat) = status.last_heartbeat {
146 self.last_heartbeat
147 .set(last_heartbeat.try_into().unwrap_or(i64::MAX));
148 }
149 self.status_code.set(i64::from(status.status_code));
150
151 debug!("Updated blueprint status in Prometheus");
152 }
153
154 pub fn record_job_execution(
156 &self,
157 job_id: u64,
158 service_id: u64,
159 blueprint_id: u64,
160 execution_time: f64,
161 ) {
162 let job_id_str = job_id.to_string();
163 let service_id_str = service_id.to_string();
164 let blueprint_id_str = blueprint_id.to_string();
165 let labels = [
166 job_id_str.as_str(),
167 service_id_str.as_str(),
168 blueprint_id_str.as_str(),
169 ];
170
171 self.job_executions.with_label_values(&labels).inc();
172 self.job_execution_time
173 .with_label_values(&labels)
174 .observe(execution_time);
175
176 debug!(
177 job_id = job_id,
178 service_id = service_id,
179 blueprint_id = blueprint_id,
180 execution_time = execution_time,
181 "Recorded job execution in Prometheus"
182 );
183 }
184
185 pub fn record_job_error(
187 &self,
188 job_id: u64,
189 service_id: u64,
190 blueprint_id: u64,
191 error_type: &str,
192 ) {
193 let job_id_str = job_id.to_string();
194 let service_id_str = service_id.to_string();
195 let blueprint_id_str = blueprint_id.to_string();
196 let labels = [
197 job_id_str.as_str(),
198 service_id_str.as_str(),
199 blueprint_id_str.as_str(),
200 error_type,
201 ];
202
203 self.job_errors.with_label_values(&labels).inc();
204
205 debug!(
206 job_id = job_id,
207 service_id = service_id,
208 blueprint_id = blueprint_id,
209 error_type = error_type,
210 "Recorded job error in Prometheus"
211 );
212 }
213
214 pub async fn add_custom_metric(&self, key: String, value: String) {
216 let mut custom_metrics = self.custom_metrics.write().await;
217 custom_metrics.insert(key.clone(), value.clone());
218 debug!(key = key, value = value, "Added custom metric");
219 }
220
221 pub async fn get_custom_metrics(&self) -> std::collections::HashMap<String, String> {
223 let custom_metrics = self.custom_metrics.read().await;
224 custom_metrics.clone()
225 }
226}