// blueprint_qos/metrics/provider/enhanced.rs
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use blueprint_core::error;
use blueprint_core::{debug, info};
use opentelemetry::KeyValue;
use prometheus::Registry;
use tokio::sync::RwLock;

use crate::error::Result;
use crate::metrics::opentelemetry::{OpenTelemetryConfig, OpenTelemetryExporter};
use crate::metrics::prometheus::PrometheusCollector;
use crate::metrics::types::{
    BlueprintMetrics, BlueprintStatus, MetricsConfig, MetricsProvider, SystemMetrics,
};
use crate::servers::ServerManager;
use crate::servers::prometheus::PrometheusServer;

18#[derive(Clone)]
26pub struct EnhancedMetricsProvider {
27 system_metrics: Arc<RwLock<Vec<SystemMetrics>>>,
29
30 blueprint_metrics: Arc<RwLock<Vec<BlueprintMetrics>>>,
32
33 blueprint_status: Arc<RwLock<BlueprintStatus>>,
35
36 custom_metrics: Arc<RwLock<std::collections::HashMap<String, String>>>,
38
39 prometheus_collector: Arc<PrometheusCollector>,
41
42 opentelemetry_exporter: Arc<OpenTelemetryExporter>,
44
45 prometheus_server: Arc<RwLock<Option<PrometheusServer>>>,
47
48 shared_registry: Arc<Registry>,
50
51 otel_job_executions_counter: opentelemetry::metrics::Counter<u64>,
53
54 config: MetricsConfig,
56
57 start_time: Instant,
59}
60
61impl EnhancedMetricsProvider {
62 pub fn new(metrics_config: MetricsConfig, otel_config: &OpenTelemetryConfig) -> Result<Self> {
76 let shared_registry = Arc::new(Registry::new());
77 let prometheus_collector = Arc::new(
78 PrometheusCollector::new(metrics_config.clone(), shared_registry.clone()).map_err(
79 |e| {
80 crate::error::Error::Other(format!(
81 "Failed to create Prometheus collector: {}",
82 e
83 ))
84 },
85 )?,
86 );
87
88 let otel_exporter_instance =
89 OpenTelemetryExporter::new(otel_config, shared_registry.clone())?;
90
91 info!("OpenTelemetryExporter initialized with shared Prometheus registry.");
92
93 let opentelemetry_exporter = Arc::new(otel_exporter_instance);
94 info!(
95 "Created and configured OpenTelemetryExporter in EnhancedMetricsProvider: {:?}",
96 opentelemetry_exporter
97 );
98
99 let otel_job_executions_counter = opentelemetry_exporter
100 .meter()
101 .u64_counter("otel_job_executions")
102 .with_description("Total number of job executions recorded via OTel")
103 .build();
104 info!("Created otel_job_executions_counter in EnhancedMetricsProvider");
105
106 let blueprint_status = BlueprintStatus {
107 service_id: metrics_config.service_id,
108 blueprint_id: metrics_config.blueprint_id,
109 ..BlueprintStatus::default()
110 };
111
112 let provider = Self {
113 system_metrics: Arc::new(RwLock::new(Vec::new())),
114 blueprint_metrics: Arc::new(RwLock::new(Vec::new())),
115 blueprint_status: Arc::new(RwLock::new(blueprint_status)),
116 custom_metrics: Arc::new(RwLock::new(std::collections::HashMap::new())),
117 prometheus_collector,
118 opentelemetry_exporter,
119 prometheus_server: Arc::new(RwLock::new(None)),
120 shared_registry,
121 otel_job_executions_counter,
122 config: metrics_config,
123 start_time: Instant::now(),
124 };
125
126 Ok(provider)
127 }
128
129 pub async fn start_collection(self: Arc<Self>) -> Result<()> {
140 let prometheus_server_config = self.config.prometheus_server.clone().unwrap_or_default();
141
142 let server = PrometheusServer::new(
143 prometheus_server_config,
144 Some(self.shared_registry.clone()),
145 self.clone(),
146 )?;
147 server.start(None, None).await?;
148
149 let mut prometheus_server = self.prometheus_server.write().await;
150 *prometheus_server = Some(server);
151
152 let system_metrics = self.system_metrics.clone();
153 let blueprint_metrics = self.blueprint_metrics.clone();
154 let blueprint_status = self.blueprint_status.clone();
155 let custom_metrics = self.custom_metrics.clone();
156 let prometheus_collector = self.prometheus_collector.clone();
157 let start_time = self.start_time;
158 let config = self.config.clone();
159
160 tokio::spawn(async move {
161 let mut interval =
162 tokio::time::interval(Duration::from_secs(config.collection_interval_secs));
163
164 loop {
165 interval.tick().await;
166
167 let sys_metrics = Self::collect_system_metrics();
168
169 prometheus_collector.update_system_metrics(&sys_metrics);
170
171 let mut metrics = system_metrics.write().await;
172 metrics.push(sys_metrics);
173 if metrics.len() > config.max_history {
174 metrics.remove(0);
175 }
176
177 let mut bp_metrics = BlueprintMetrics::default();
178 let custom = custom_metrics.read().await;
179 bp_metrics.custom_metrics = custom.clone();
180
181 let mut metrics = blueprint_metrics.write().await;
182 metrics.push(bp_metrics);
183 if metrics.len() > config.max_history {
184 metrics.remove(0);
185 }
186
187 let mut status = blueprint_status.write().await;
188 status.uptime = start_time.elapsed().as_secs();
189 status.timestamp = SystemTime::now()
190 .duration_since(UNIX_EPOCH)
191 .unwrap_or_default()
192 .as_secs();
193
194 prometheus_collector.update_blueprint_status(&status);
195
196 debug!("Collected metrics");
197 }
198 });
199
200 info!("Started metrics collection");
201 Ok(())
202 }
203
204 fn collect_system_metrics() -> SystemMetrics {
210 let mut sys = sysinfo::System::new_all();
211 sys.refresh_all();
212
213 let memory_usage = sys.used_memory();
214 let total_memory = sys.total_memory();
215 let cpu_usage = sys.global_cpu_usage();
216
217 let disk_usage = 0;
219 let total_disk = 0;
220 let network_rx_bytes = 0;
221 let network_tx_bytes = 0;
222
223 let timestamp = SystemTime::now()
224 .duration_since(UNIX_EPOCH)
225 .unwrap_or_default()
226 .as_secs();
227
228 SystemMetrics {
229 cpu_usage,
230 memory_usage,
231 total_memory,
232 disk_usage,
233 total_disk,
234 network_rx_bytes,
235 network_tx_bytes,
236 timestamp,
237 }
238 }
239
240 pub fn record_job_execution(
253 &self,
254 job_id: u64,
255 execution_time: f64,
256 service_id: u64,
257 blueprint_id: u64,
258 ) {
259 info!(
260 "Recording job execution (job_id: {}). Incrementing otel_job_executions_counter.",
261 job_id
262 );
263 self.otel_job_executions_counter.add(
264 1,
265 &[
266 KeyValue::new("service_id", service_id.to_string()),
267 KeyValue::new("blueprint_id", blueprint_id.to_string()),
268 ],
269 );
270
271 self.prometheus_collector.record_job_execution(
272 job_id,
273 self.config.service_id,
274 self.config.blueprint_id,
275 execution_time,
276 );
277 }
278
279 pub fn record_job_error(&self, job_id: u64, error_type: &str) {
288 self.prometheus_collector.record_job_error(
289 job_id,
290 self.config.service_id,
291 self.config.blueprint_id,
292 error_type,
293 );
294 }
295
296 #[must_use]
302 pub fn opentelemetry_exporter(&self) -> Arc<OpenTelemetryExporter> {
303 self.opentelemetry_exporter.clone()
304 }
305
306 #[must_use]
311 pub fn prometheus_collector(&self) -> Arc<PrometheusCollector> {
312 self.prometheus_collector.clone()
313 }
314
315 #[must_use]
320 pub fn get_otel_job_executions_counter(&self) -> opentelemetry::metrics::Counter<u64> {
321 self.otel_job_executions_counter.clone()
322 }
323
324 #[must_use]
330 pub fn shared_registry(&self) -> Arc<Registry> {
331 self.shared_registry.clone()
332 }
333
334 pub fn force_flush_otel_metrics(&self) -> crate::error::Result<()> {
343 info!("EnhancedMetricsProvider: Attempting to force flush OpenTelemetry metrics...");
344 match self.opentelemetry_exporter.force_flush() {
345 Ok(()) => {
346 info!("EnhancedMetricsProvider: OpenTelemetry metrics force_flush successful.");
347 Ok(())
348 }
349 Err(err) => {
350 error!(
351 "EnhancedMetricsProvider: OpenTelemetry metrics force_flush failed: {:?}",
352 err
353 );
354 Err(crate::error::Error::Metrics(format!(
355 "OpenTelemetry SDK flush error: {}",
356 err
357 )))
358 }
359 }
360 }
361}
362
363impl MetricsProvider for EnhancedMetricsProvider {
364 async fn get_system_metrics(&self) -> SystemMetrics {
366 self.system_metrics
367 .read()
368 .await
369 .last()
370 .cloned()
371 .unwrap_or_default()
372 }
373
374 async fn get_blueprint_metrics(&self) -> BlueprintMetrics {
376 self.blueprint_metrics
377 .read()
378 .await
379 .last()
380 .cloned()
381 .unwrap_or_default()
382 }
383
384 async fn get_blueprint_status(&self) -> BlueprintStatus {
386 self.blueprint_status.read().await.clone()
387 }
388
389 async fn get_system_metrics_history(&self) -> Vec<SystemMetrics> {
391 self.system_metrics.read().await.clone()
392 }
393
394 async fn get_blueprint_metrics_history(&self) -> Vec<BlueprintMetrics> {
396 self.blueprint_metrics.read().await.clone()
397 }
398
399 async fn add_custom_metric(&self, key: String, value: String) {
405 let pc_key = key.clone();
406 let pc_value = value.clone();
407
408 let mut custom = self.custom_metrics.write().await;
409 custom.insert(key, value);
410
411 self.prometheus_collector
412 .add_custom_metric(pc_key, pc_value)
413 .await;
414 }
415
416 async fn set_blueprint_status(&self, status_code: u32, status_message: Option<String>) {
422 let mut status = self.blueprint_status.write().await;
423 status.status_code = status_code;
424 status.status_message = status_message;
425 self.prometheus_collector.update_blueprint_status(&status);
426 }
427
428 async fn update_last_heartbeat(&self, timestamp: u64) {
433 let mut status = self.blueprint_status.write().await;
434 status.last_heartbeat = Some(timestamp);
435 self.prometheus_collector.update_blueprint_status(&status);
436 }
437
438 async fn start_collection(&self) -> crate::error::Result<()> {
446 let prometheus_server_config = self.config.prometheus_server.clone().unwrap_or_default();
447
448 let server = PrometheusServer::new(
449 prometheus_server_config,
450 Some(self.shared_registry.clone()),
451 Arc::new(self.clone()),
452 )?;
453 server.start(None, None).await?;
454
455 let mut prometheus_server = self.prometheus_server.write().await;
456 *prometheus_server = Some(server);
457
458 let system_metrics = self.system_metrics.clone();
459 let blueprint_metrics = self.blueprint_metrics.clone();
460 let blueprint_status = self.blueprint_status.clone();
461 let custom_metrics = self.custom_metrics.clone();
462 let prometheus_collector = self.prometheus_collector.clone();
463 let start_time = self.start_time;
464 let config = self.config.clone();
465
466 tokio::spawn(async move {
467 let mut interval =
468 tokio::time::interval(Duration::from_secs(config.collection_interval_secs));
469
470 loop {
471 interval.tick().await;
472
473 let sys_metrics = Self::collect_system_metrics();
474
475 prometheus_collector.update_system_metrics(&sys_metrics);
476
477 let mut metrics = system_metrics.write().await;
478 metrics.push(sys_metrics);
479 if metrics.len() > config.max_history {
480 metrics.remove(0);
481 }
482
483 let mut bp_metrics = BlueprintMetrics::default();
484 let custom = custom_metrics.read().await;
485 bp_metrics.custom_metrics = custom.clone();
486
487 let mut metrics = blueprint_metrics.write().await;
488 metrics.push(bp_metrics);
489 if metrics.len() > config.max_history {
490 metrics.remove(0);
491 }
492
493 let mut status = blueprint_status.write().await;
494 status.uptime = start_time.elapsed().as_secs();
495 status.timestamp = SystemTime::now()
496 .duration_since(UNIX_EPOCH)
497 .unwrap_or_default()
498 .as_secs();
499
500 prometheus_collector.update_blueprint_status(&status);
501
502 debug!("Collected metrics");
503 }
504 });
505
506 info!("Started metrics collection");
507 Ok(())
508 }
509}