blueprint_qos/metrics/provider/enhanced.rs

use blueprint_core::{debug, error, info};
use prometheus::Registry;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;

use crate::error::Result;
use crate::metrics::opentelemetry::{OpenTelemetryConfig, OpenTelemetryExporter};
use crate::metrics::prometheus::PrometheusCollector;
use crate::metrics::types::{
    BlueprintMetrics, BlueprintStatus, MetricsConfig, MetricsProvider, SystemMetrics,
};
use crate::servers::ServerManager;
use crate::servers::prometheus::PrometheusServer;
use opentelemetry::KeyValue;

/// A comprehensive metrics provider that integrates Prometheus and OpenTelemetry systems.
///
/// This provider acts as the central metrics collection and export hub for the `QoS` system,
/// collecting system metrics, application-specific metrics, and custom metrics. It manages
/// metric collection, storage, and export to monitoring systems through Prometheus and
/// OpenTelemetry protocols. The provider supports historical metrics collection and
/// can manage an embedded Prometheus server for metrics exposure.
#[derive(Clone)]
pub struct EnhancedMetricsProvider {
    /// System metrics history, bounded by `MetricsConfig::max_history`
    system_metrics: Arc<RwLock<Vec<SystemMetrics>>>,

    /// Blueprint metrics history, bounded by `MetricsConfig::max_history`
    blueprint_metrics: Arc<RwLock<Vec<BlueprintMetrics>>>,

    /// Blueprint status
    blueprint_status: Arc<RwLock<BlueprintStatus>>,

    /// Custom metrics as key/value pairs
    custom_metrics: Arc<RwLock<std::collections::HashMap<String, String>>>,

    /// Prometheus collector
    prometheus_collector: Arc<PrometheusCollector>,

    /// OpenTelemetry exporter
    opentelemetry_exporter: Arc<OpenTelemetryExporter>,

    /// Embedded Prometheus server, if started
    prometheus_server: Arc<RwLock<Option<PrometheusServer>>>,

    /// Shared Prometheus registry for all metrics
    shared_registry: Arc<Registry>,

    /// OpenTelemetry counter for job executions
    otel_job_executions_counter: opentelemetry::metrics::Counter<u64>,

    /// Configuration
    config: MetricsConfig,

    /// Provider start time, used to compute uptime
    start_time: Instant,
}

impl EnhancedMetricsProvider {
    /// Creates a new enhanced metrics provider with Prometheus and OpenTelemetry support.
    ///
    /// Initializes the metrics collection infrastructure, including the Prometheus collector,
    /// the OpenTelemetry exporter, and the shared registry. Sets up metric collection for both
    /// system-level and application-specific metrics, and prepares the provider for metrics
    /// export through multiple protocols.
    ///
    /// # Parameters
    /// * `metrics_config` - Configuration for metrics collection, retention, and reporting
    /// * `otel_config` - OpenTelemetry-specific configuration settings
    ///
    /// # Errors
    /// Returns an error if the `PrometheusCollector` or `OpenTelemetryExporter` initialization fails.
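    ///
    /// # Examples
    ///
    /// A minimal construction sketch; it assumes both config types implement
    /// `Default`, so substitute your real configuration values:
    ///
    /// ```ignore
    /// let metrics_config = MetricsConfig::default();
    /// let otel_config = OpenTelemetryConfig::default();
    /// let provider = EnhancedMetricsProvider::new(metrics_config, &otel_config)?;
    /// ```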
    pub fn new(metrics_config: MetricsConfig, otel_config: &OpenTelemetryConfig) -> Result<Self> {
        let shared_registry = Arc::new(Registry::new());
        let prometheus_collector = Arc::new(
            PrometheusCollector::new(metrics_config.clone(), shared_registry.clone()).map_err(
                |e| {
                    crate::error::Error::Other(format!(
                        "Failed to create Prometheus collector: {e}"
                    ))
                },
            )?,
        );

        let opentelemetry_exporter = Arc::new(OpenTelemetryExporter::new(
            otel_config,
            shared_registry.clone(),
        )?);
        info!(
            "Created OpenTelemetryExporter with shared Prometheus registry: {:?}",
            opentelemetry_exporter
        );

        let otel_job_executions_counter = opentelemetry_exporter
            .meter()
            .u64_counter("otel_job_executions")
            .with_description("Total number of job executions recorded via OTel")
            .build();
        info!("Created otel_job_executions_counter in EnhancedMetricsProvider");

        let blueprint_status = BlueprintStatus {
            service_id: metrics_config.service_id,
            blueprint_id: metrics_config.blueprint_id,
            ..BlueprintStatus::default()
        };

        Ok(Self {
            system_metrics: Arc::new(RwLock::new(Vec::new())),
            blueprint_metrics: Arc::new(RwLock::new(Vec::new())),
            blueprint_status: Arc::new(RwLock::new(blueprint_status)),
            custom_metrics: Arc::new(RwLock::new(std::collections::HashMap::new())),
            prometheus_collector,
            opentelemetry_exporter,
            prometheus_server: Arc::new(RwLock::new(None)),
            shared_registry,
            otel_job_executions_counter,
            config: metrics_config,
            start_time: Instant::now(),
        })
    }

    /// Starts the metrics collection and reporting process.
    ///
    /// Spawns the background task that periodically gathers system and blueprint metrics,
    /// and starts the embedded Prometheus server (using the configured server settings, or
    /// defaults if none are provided) so metrics can be scraped over HTTP. Call this once,
    /// after creating the provider, to begin the metrics pipeline.
    ///
    /// # Errors
    /// Returns an error if the `PrometheusServer` fails to start or if the background
    /// metrics collection task cannot be created.
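    ///
    /// # Examples
    ///
    /// A sketch of starting the pipeline from an async context, assuming `provider`
    /// was built with [`EnhancedMetricsProvider::new`]:
    ///
    /// ```ignore
    /// let provider = Arc::new(provider);
    /// provider.clone().start_collection().await?;
    /// // Collection continues in the background; `provider` remains usable here.
    /// ```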
    pub async fn start_collection(self: Arc<Self>) -> Result<()> {
        let prometheus_server_config = self.config.prometheus_server.clone().unwrap_or_default();

        let server = PrometheusServer::new(
            prometheus_server_config,
            Some(self.shared_registry.clone()),
            self.clone(),
        )?;
        server.start(None, None).await?;

        let mut prometheus_server = self.prometheus_server.write().await;
        *prometheus_server = Some(server);

        let system_metrics = self.system_metrics.clone();
        let blueprint_metrics = self.blueprint_metrics.clone();
        let blueprint_status = self.blueprint_status.clone();
        let custom_metrics = self.custom_metrics.clone();
        let prometheus_collector = self.prometheus_collector.clone();
        let start_time = self.start_time;
        let config = self.config.clone();

        tokio::spawn(async move {
            let mut interval =
                tokio::time::interval(Duration::from_secs(config.collection_interval_secs));

            loop {
                interval.tick().await;

                let sys_metrics = Self::collect_system_metrics();
                prometheus_collector.update_system_metrics(&sys_metrics);

                // Append to the system metrics history, bounded by `max_history`.
                let mut history = system_metrics.write().await;
                history.push(sys_metrics);
                if history.len() > config.max_history {
                    history.remove(0);
                }
                drop(history);

                // Snapshot the custom metrics into this interval's blueprint metrics.
                let bp_metrics = BlueprintMetrics {
                    custom_metrics: custom_metrics.read().await.clone(),
                    ..BlueprintMetrics::default()
                };

                let mut history = blueprint_metrics.write().await;
                history.push(bp_metrics);
                if history.len() > config.max_history {
                    history.remove(0);
                }
                drop(history);

                // Refresh uptime and the wall-clock timestamp on the status.
                let mut status = blueprint_status.write().await;
                status.uptime = start_time.elapsed().as_secs();
                status.timestamp = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs();
                prometheus_collector.update_blueprint_status(&status);

                debug!("Collected metrics");
            }
        });

        info!("Started metrics collection");
        Ok(())
    }

    /// Collects current system metrics such as CPU and memory usage.
    ///
    /// Gathers real-time metrics from the host via `sysinfo` and packs them into a
    /// structured `SystemMetrics` value. Disk and network statistics are not yet
    /// collected and are reported as zero.
    fn collect_system_metrics() -> SystemMetrics {
        // NOTE: a freshly constructed `System` has little history to diff against,
        // so the reported CPU usage may be inaccurate for short-lived samples.
        let mut sys = sysinfo::System::new_all();
        sys.refresh_all();

        let memory_usage = sys.used_memory();
        let total_memory = sys.total_memory();
        let cpu_usage = sys.global_cpu_usage();

        // TODO: Implement disk and network metrics collection
        let disk_usage = 0;
        let total_disk = 0;
        let network_rx_bytes = 0;
        let network_tx_bytes = 0;

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        SystemMetrics {
            cpu_usage,
            memory_usage,
            total_memory,
            disk_usage,
            total_disk,
            network_rx_bytes,
            network_tx_bytes,
            timestamp,
        }
    }

    /// Records metrics for a successful job execution.
    ///
    /// Updates both Prometheus and OpenTelemetry metrics for a completed job: job counters
    /// are incremented and the execution duration is recorded in histograms. Job metrics are
    /// tagged with service ID, blueprint ID, and job ID to enable detailed filtering and
    /// analysis.
    ///
    /// # Parameters
    /// * `job_id` - Unique identifier for the executed job
    /// * `execution_time` - Duration of the job execution in seconds
    /// * `service_id` - Identifier for the service that executed the job
    /// * `blueprint_id` - Identifier for the blueprint that executed the job
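    ///
    /// # Examples
    ///
    /// A sketch with illustrative IDs and a 1.5-second execution time:
    ///
    /// ```ignore
    /// provider.record_job_execution(42, 1.5, service_id, blueprint_id);
    /// ```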
    pub fn record_job_execution(
        &self,
        job_id: u64,
        execution_time: f64,
        service_id: u64,
        blueprint_id: u64,
    ) {
        info!(
            "Recording job execution (job_id: {}). Incrementing otel_job_executions_counter.",
            job_id
        );
        self.otel_job_executions_counter.add(
            1,
            &[
                KeyValue::new("service_id", service_id.to_string()),
                KeyValue::new("blueprint_id", blueprint_id.to_string()),
            ],
        );

        // Note: the OTel counter is tagged with the caller-supplied IDs, while the
        // Prometheus side uses the provider's configured service/blueprint IDs.
        self.prometheus_collector.record_job_execution(
            job_id,
            self.config.service_id,
            self.config.blueprint_id,
            execution_time,
        );
    }

    /// Records metrics for a failed job execution.
    ///
    /// Updates error counters and metrics when a job fails, categorizing the error by type.
    /// This enables tracking of error rates and common failure modes across jobs.
    ///
    /// # Parameters
    /// * `job_id` - Unique identifier for the failed job
    /// * `error_type` - Classification of the error that occurred
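    ///
    /// # Examples
    ///
    /// A sketch using an illustrative error classification:
    ///
    /// ```ignore
    /// provider.record_job_error(42, "timeout");
    /// ```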
    pub fn record_job_error(&self, job_id: u64, error_type: &str) {
        self.prometheus_collector.record_job_error(
            job_id,
            self.config.service_id,
            self.config.blueprint_id,
            error_type,
        );
    }

    /// Returns a handle to the `OpenTelemetryExporter`.
    ///
    /// Provides access to the underlying `OpenTelemetryExporter` for advanced operations
    /// such as creating custom meters, recorders, or manually pushing metrics to the
    /// OpenTelemetry backend.
    #[must_use]
    pub fn opentelemetry_exporter(&self) -> Arc<OpenTelemetryExporter> {
        self.opentelemetry_exporter.clone()
    }

    /// Returns a handle to the `PrometheusCollector`.
    ///
    /// Provides access to the underlying `PrometheusCollector` for advanced operations
    /// such as registering custom collectors or directly manipulating Prometheus metrics.
    #[must_use]
    pub fn prometheus_collector(&self) -> Arc<PrometheusCollector> {
        self.prometheus_collector.clone()
    }

    /// Returns a clone of the OpenTelemetry job executions counter.
    ///
    /// This counter tracks the total number of job executions recorded through OpenTelemetry.
    /// It can be used to increment execution counts from external components.
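    ///
    /// # Examples
    ///
    /// A sketch of incrementing the counter from outside the provider
    /// (the attribute values are illustrative):
    ///
    /// ```ignore
    /// use opentelemetry::KeyValue;
    ///
    /// let counter = provider.get_otel_job_executions_counter();
    /// counter.add(1, &[KeyValue::new("service_id", "7")]);
    /// ```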
    #[must_use]
    pub fn get_otel_job_executions_counter(&self) -> opentelemetry::metrics::Counter<u64> {
        self.otel_job_executions_counter.clone()
    }

    /// Returns the shared Prometheus registry used for all metrics.
    ///
    /// This registry consolidates all Prometheus metrics from both direct Prometheus collectors
    /// and OpenTelemetry exporters. It is useful for registering additional custom collectors
    /// or exporting all metrics to external systems.
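    ///
    /// # Examples
    ///
    /// A sketch of registering an additional collector on the shared registry
    /// (`my_jobs_total` is a hypothetical metric):
    ///
    /// ```ignore
    /// use prometheus::IntCounter;
    ///
    /// let counter = IntCounter::new("my_jobs_total", "Example custom counter")?;
    /// provider.shared_registry().register(Box::new(counter))?;
    /// ```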
    #[must_use]
    pub fn shared_registry(&self) -> Arc<Registry> {
        self.shared_registry.clone()
    }

    /// Forces a flush of accumulated OpenTelemetry metrics to their destination.
    ///
    /// Triggers an immediate export of all buffered OpenTelemetry metrics rather than
    /// waiting for the normal export interval. This is useful during graceful shutdown
    /// or when immediate metric visibility is required.
    ///
    /// # Errors
    /// Returns an error if the `OpenTelemetryExporter` fails to force flush metrics.
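    ///
    /// # Examples
    ///
    /// A shutdown-path sketch:
    ///
    /// ```ignore
    /// provider.force_flush_otel_metrics()?;
    /// ```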
    pub fn force_flush_otel_metrics(&self) -> crate::error::Result<()> {
        info!("EnhancedMetricsProvider: Attempting to force flush OpenTelemetry metrics...");
        match self.opentelemetry_exporter.force_flush() {
            Ok(()) => {
                info!("EnhancedMetricsProvider: OpenTelemetry metrics force_flush successful.");
                Ok(())
            }
            Err(err) => {
                error!(
                    "EnhancedMetricsProvider: OpenTelemetry metrics force_flush failed: {:?}",
                    err
                );
                Err(crate::error::Error::Metrics(format!(
                    "OpenTelemetry SDK flush error: {err}"
                )))
            }
        }
    }
}

impl MetricsProvider for EnhancedMetricsProvider {
    /// Returns the latest collected `SystemMetrics`.
    async fn get_system_metrics(&self) -> SystemMetrics {
        self.system_metrics
            .read()
            .await
            .last()
            .cloned()
            .unwrap_or_default()
    }

    /// Returns the latest collected `BlueprintMetrics`.
    async fn get_blueprint_metrics(&self) -> BlueprintMetrics {
        self.blueprint_metrics
            .read()
            .await
            .last()
            .cloned()
            .unwrap_or_default()
    }

    /// Returns the current `BlueprintStatus`.
    async fn get_blueprint_status(&self) -> BlueprintStatus {
        self.blueprint_status.read().await.clone()
    }

    /// Returns the historical `SystemMetrics` (up to `max_history` entries).
    async fn get_system_metrics_history(&self) -> Vec<SystemMetrics> {
        self.system_metrics.read().await.clone()
    }

    /// Returns the historical `BlueprintMetrics` (up to `max_history` entries).
    async fn get_blueprint_metrics_history(&self) -> Vec<BlueprintMetrics> {
        self.blueprint_metrics.read().await.clone()
    }

    /// Adds a custom metric to the collection.
    ///
    /// The metric is stored in the provider's custom-metrics map and forwarded to the
    /// Prometheus collector.
    ///
    /// # Parameters
    /// * `key` - The key for the custom metric.
    /// * `value` - The value of the custom metric.
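    ///
    /// # Examples
    ///
    /// A sketch with an illustrative key/value pair:
    ///
    /// ```ignore
    /// provider
    ///     .add_custom_metric("build_sha".to_string(), "abc123".to_string())
    ///     .await;
    /// ```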
    async fn add_custom_metric(&self, key: String, value: String) {
        let pc_key = key.clone();
        let pc_value = value.clone();

        let mut custom = self.custom_metrics.write().await;
        custom.insert(key, value);
        // Release the write lock before awaiting on the Prometheus collector.
        drop(custom);

        self.prometheus_collector
            .add_custom_metric(pc_key, pc_value)
            .await;
    }

    /// Sets the `BlueprintStatus` code and message.
    ///
    /// # Parameters
    /// * `status_code` - The status code to set.
    /// * `status_message` - An optional status message.
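    ///
    /// # Examples
    ///
    /// A sketch marking the blueprint healthy (status-code semantics are
    /// application-defined; the values here are illustrative):
    ///
    /// ```ignore
    /// provider.set_blueprint_status(0, Some("healthy".into())).await;
    /// ```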
    async fn set_blueprint_status(&self, status_code: u32, status_message: Option<String>) {
        let mut status = self.blueprint_status.write().await;
        status.status_code = status_code;
        status.status_message = status_message;
        self.prometheus_collector.update_blueprint_status(&status);
    }

    /// Updates the last heartbeat timestamp in the `BlueprintStatus`.
    ///
    /// # Parameters
    /// * `timestamp` - The Unix timestamp of the last heartbeat.
    async fn update_last_heartbeat(&self, timestamp: u64) {
        let mut status = self.blueprint_status.write().await;
        status.last_heartbeat = Some(timestamp);
        self.prometheus_collector.update_blueprint_status(&status);
    }

    /// Starts the metrics collection process for this provider.
    ///
    /// Spawns background tasks for periodic metric gathering and starts the embedded
    /// Prometheus server for metric exposure. This delegates to
    /// [`EnhancedMetricsProvider::start_collection`].
    ///
    /// # Errors
    /// Returns an error if the collection process or any associated services fail to start.
    async fn start_collection(&self) -> crate::error::Result<()> {
        // Delegate to the inherent `Arc<Self>` implementation so the collection loop
        // exists in one place. All provider state is behind `Arc`s, so the clone
        // observes and mutates the same underlying state as `self`.
        EnhancedMetricsProvider::start_collection(Arc::new(self.clone())).await
    }
}