// blueprint_qos/unified_service.rs

1use crate::{
2    QoSConfig,
3    error::{self as qos_error, Result},
4    heartbeat::{HeartbeatConsumer, HeartbeatService},
5    logging::grafana::{
6        CreateDataSourceRequest, Dashboard, GrafanaClient, LokiJsonData, PrometheusJsonData,
7    },
8    logging::loki::init_loki_logging,
9    metrics::{
10        opentelemetry::OpenTelemetryConfig, provider::EnhancedMetricsProvider,
11        service::MetricsService,
12    },
13    servers::{
14        ServerManager, grafana::GrafanaServer, loki::LokiServer, prometheus::PrometheusServer,
15    },
16};
17use blueprint_core::{error, info};
18use std::sync::Arc;
19use tokio::sync::{Mutex, oneshot};
20
21/// Unified Quality of Service (`QoS`) service that integrates heartbeat monitoring, metrics collection,
22/// logging, and dashboard visualization into a single cohesive system.
23///
24/// `QoSService` orchestrates multiple components:
25/// - Heartbeat service for liveness monitoring and reporting
26/// - Metrics collection and exposure via Prometheus
27/// - Log aggregation via Loki
28/// - Dashboard visualization via Grafana
29/// - Server management for the above components
30///
31/// It can be configured to automatically manage server instances or connect to externally managed services.
32pub struct QoSService<C: HeartbeatConsumer + Send + Sync + 'static> {
33    heartbeat_service: Option<Arc<HeartbeatService<C>>>,
34    metrics_service: Option<Arc<MetricsService>>,
35    grafana_client: Option<Arc<GrafanaClient>>,
36    dashboard_url: Option<String>,
37    grafana_server: Option<Arc<GrafanaServer>>,
38    loki_server: Option<Arc<LokiServer>>,
39    prometheus_server: Option<Arc<PrometheusServer>>,
40    completion_tx: Arc<Mutex<Option<oneshot::Sender<Result<()>>>>>,
41    completion_rx: Mutex<Option<tokio::sync::oneshot::Receiver<Result<()>>>>,
42}
43
44impl<C: HeartbeatConsumer + Send + Sync + 'static> QoSService<C> {
45    /// Returns a reference to the heartbeat service if configured.
46    ///
47    /// The `HeartbeatService` is responsible for sending periodic liveness updates
48    /// to the blockchain or other monitoring systems.
49    pub fn heartbeat_service(&self) -> Option<&Arc<HeartbeatService<C>>> {
50        self.heartbeat_service.as_ref()
51    }
52
53    /// Sets the completion sender for this `QoS` service.
54    pub async fn set_completion_sender(&self, sender: oneshot::Sender<Result<()>>) {
55        let mut guard = self.completion_tx.lock().await;
56        if guard.is_some() {
57            error!("Completion sender already set for QoSService, overwriting.");
58        }
59        *guard = Some(sender);
60    }
61
62    /// Common initialization logic for `QoSService`.
63    async fn initialize(
64        config: QoSConfig,
65        heartbeat_consumer: Arc<C>,
66        ws_rpc_endpoint: String,
67        keystore_uri: String,
68        otel_config: Option<OpenTelemetryConfig>,
69    ) -> Result<Self> {
70        let heartbeat_service = config.heartbeat.clone().map(|hc| {
71            let ws_rpc = ws_rpc_endpoint.clone();
72            Arc::new(HeartbeatService::new(
73                hc.clone(),
74                heartbeat_consumer.clone(),
75                ws_rpc,
76                keystore_uri.clone(),
77                hc.service_id,
78                hc.blueprint_id,
79            ))
80        });
81
82        let metrics_service = match (config.metrics.clone(), otel_config) {
83            (Some(mc), Some(oc)) => Some(Arc::new(MetricsService::with_otel_config(mc, &oc)?)),
84            (Some(mc), None) => Some(Arc::new(MetricsService::new(mc)?)),
85            (None, _) => None,
86        };
87
88        if let Some(ms) = &metrics_service {
89            info!("Metrics service is Some, attempting to start collection.");
90            ms.provider().clone().start_collection().await?;
91        }
92
93        if let Some(loki_config) = &config.loki {
94            if let Err(e) = init_loki_logging(loki_config.clone()) {
95                error!("Failed to initialize Loki logging: {}", e);
96            } else {
97                info!("Initialized Loki logging");
98            }
99        }
100
101        let bind_ip = config.docker_bind_ip.clone();
102        let (grafana_server, loki_server, prometheus_server) = if config.manage_servers {
103            let grafana = config
104                .grafana_server
105                .as_ref()
106                .map(|c| GrafanaServer::new(c.clone()))
107                .transpose()?;
108
109            let loki = config
110                .loki_server
111                .as_ref()
112                .map(|c| LokiServer::new(c.clone()))
113                .transpose()?;
114
115            let prometheus = config.prometheus_server.as_ref().map(|c| {
116                PrometheusServer::new(
117                    c.clone(),
118                    Some(
119                        metrics_service
120                            .as_ref()
121                            .ok_or_else(|| qos_error::Error::Generic("Metrics service is required for Prometheus but not configured".to_string()))?
122                            .provider()
123                            .shared_registry()
124                            .clone(),
125                    ),
126                    metrics_service.as_ref().ok_or_else(|| qos_error::Error::Generic("Metrics service is required for Prometheus but not configured".to_string()))?.provider().clone(),
127                )
128            }).transpose()?;
129
130            if let Some(s) = &grafana {
131                info!("Starting Grafana server...");
132                s.start(config.docker_network.as_deref(), bind_ip.clone())
133                    .await
134                    .map_err(|e| {
135                        error!("Failed to start Grafana server: {}", e);
136                        e
137                    })?;
138                info!("Grafana server started successfully: {}", s.url());
139            }
140
141            if let Some(s) = &loki {
142                info!("Starting Loki server...");
143                s.start(config.docker_network.as_deref(), bind_ip.clone())
144                    .await
145                    .map_err(|e| {
146                        error!("Failed to start Loki server: {}", e);
147                        e
148                    })?;
149                info!("Loki server started successfully: {}", s.url());
150            }
151
152            if let Some(s) = &prometheus {
153                info!("Starting Prometheus server...");
154                if let Err(e) = s
155                    .start(config.docker_network.as_deref(), bind_ip.clone())
156                    .await
157                {
158                    error!("Failed to start critical Prometheus server: {}", e);
159                    return Err(e);
160                }
161                info!("Prometheus server started successfully: {}", s.url());
162            }
163
164            (
165                grafana.map(Arc::new),
166                loki.map(Arc::new),
167                prometheus.map(Arc::new),
168            )
169        } else {
170            (None, None, None)
171        };
172
173        let grafana_client = match &grafana_server {
174            Some(server) => {
175                let mut client_config = server.client_config();
176                client_config.loki_config = config.loki.clone();
177                Some(Arc::new(GrafanaClient::new(client_config)))
178            }
179            None => config.grafana.as_ref().map(|user_config| {
180                let mut client_config = user_config.clone();
181                client_config.loki_config = config.loki.clone();
182                Arc::new(GrafanaClient::new(client_config))
183            }),
184        };
185
186        let (tx, rx): (oneshot::Sender<Result<()>>, oneshot::Receiver<Result<()>>) =
187            oneshot::channel();
188        Ok(Self {
189            heartbeat_service,
190            metrics_service,
191            grafana_client,
192            dashboard_url: None,
193            grafana_server,
194            loki_server,
195            prometheus_server,
196            completion_tx: Arc::new(Mutex::new(Some(tx))),
197            completion_rx: Mutex::new(Some(rx)),
198        })
199    }
200
201    /// # Errors
202    ///
203    /// Returns an error if initialization of any underlying service fails.
204    pub async fn new(
205        config: QoSConfig,
206        heartbeat_consumer: Arc<C>,
207        ws_rpc_endpoint: String,
208        keystore_uri: String,
209    ) -> Result<Self> {
210        Self::initialize(
211            config,
212            heartbeat_consumer,
213            ws_rpc_endpoint,
214            keystore_uri,
215            None,
216        )
217        .await
218    }
219
220    /// # Errors
221    ///
222    /// Returns an error if initialization of any underlying service fails.
223    pub async fn with_otel_config(
224        config: QoSConfig,
225        heartbeat_consumer: Arc<C>,
226        ws_rpc_endpoint: String,
227        keystore_uri: String,
228        otel_config: OpenTelemetryConfig,
229    ) -> Result<Self> {
230        Self::initialize(
231            config,
232            heartbeat_consumer,
233            ws_rpc_endpoint,
234            keystore_uri,
235            Some(otel_config),
236        )
237        .await
238    }
239
240    pub fn debug_server_status(&self) {
241        info!("--- QoS Server Status ---");
242        if self.grafana_server.is_some() {
243            info!("Grafana Server: Configured (instance present)");
244        } else {
245            info!("Grafana Server: Not configured");
246        }
247        if self.loki_server.is_some() {
248            info!("Loki Server: Configured (instance present)");
249        } else {
250            info!("Loki Server: Not configured");
251        }
252        if self.prometheus_server.is_some() {
253            info!("Prometheus Server: Configured (instance present)");
254        } else {
255            info!("Prometheus Server: Not configured");
256        }
257        if self.grafana_client.is_some() {
258            info!("Grafana Client: Configured (instance present)");
259        } else {
260            info!("Grafana Client: Not configured");
261        }
262        if self
263            .metrics_service
264            .as_ref()
265            .map(|ms| ms.provider())
266            .is_some()
267        {
268            info!("Metrics Service: Configured (instance present)");
269        } else {
270            info!("Metrics Service: Not configured");
271        }
272        info!("-------------------------");
273    }
274
275    /// Returns a reference to the Grafana API client if configured.
276    ///
277    /// The Grafana client can be used to programmatically create or update dashboards,
278    /// manage data sources, and configure alerts.
279    pub fn grafana_client(&self) -> Option<Arc<GrafanaClient>> {
280        self.grafana_client.clone()
281    }
282
283    /// Returns the URL of the Grafana server if running.
284    ///
285    /// This URL can be used to access the Grafana web interface for viewing dashboards
286    /// and visualizations.
287    pub fn grafana_server_url(&self) -> Option<String> {
288        self.grafana_server.as_ref().map(|server| server.url())
289    }
290
291    /// Returns the URL of the Loki server if configured and running.
292    ///
293    /// This URL can be used to configure log shipping or to query logs directly
294    /// via the Loki API.
295    #[must_use]
296    pub fn loki_server_url(&self) -> Option<String> {
297        self.loki_server.as_ref().map(|s| s.url())
298    }
299
300    /// Returns a reference to the metrics provider if configured.
301    ///
302    /// The `EnhancedMetricsProvider` collects and aggregates system and application metrics
303    /// that can be exposed via Prometheus or queried programmatically.
304    pub fn provider(&self) -> Option<Arc<EnhancedMetricsProvider>> {
305        self.metrics_service.as_ref().map(|s| s.provider())
306    }
307
308    /// Creates a Grafana dashboard for visualizing metrics from the blueprint.
309    ///
310    /// This method:
311    /// 1. Creates or updates required data sources (Prometheus and Loki)
312    /// 2. Creates a dashboard with panels for system metrics, job execution statistics,
313    ///    and log visualization
314    /// 3. Sets up appropriate refresh intervals and time ranges
315    ///
316    /// # Parameters
317    /// * `blueprint_name` - The name of the blueprint to use in dashboard titles
318    ///
319    /// # Errors
320    /// Returns an error if:
321    /// - Creating or updating data sources fails
322    /// - Dashboard creation fails
323    /// - The Grafana client is not configured
324    pub async fn create_dashboard(&mut self, blueprint_name: &str) -> Result<()> {
325        let client = self.grafana_client.as_ref().ok_or(qos_error::Error::Other(
326            "Grafana client not configured".to_string(),
327        ))?;
328
329        let loki_ds = CreateDataSourceRequest {
330            name: "Loki".to_string(),
331            ds_type: "loki".to_string(),
332            uid: Some("loki-blueprint".to_string()),
333            url: self
334                .loki_server
335                .as_ref()
336                .map_or_else(|| "http://loki:3100".to_string(), |s| s.url()),
337            access: "proxy".to_string(),
338            is_default: Some(false),
339            json_data: Some(serde_json::to_value(LokiJsonData {
340                max_lines: Some(1000),
341            })?),
342        };
343        client.create_or_update_datasource(loki_ds).await?;
344
345        let prometheus_url = self
346            .grafana_client
347            .as_ref()
348            .and_then(|gc| gc.prometheus_datasource_url())
349            .cloned()
350            .or_else(|| self.prometheus_server.as_ref().map(|s| s.url()))
351            .ok_or_else(|| {
352                qos_error::Error::Other(
353                    "Prometheus datasource URL is not configured and no managed server is available."
354                        .to_string(),
355                )
356            })?;
357
358        let prometheus_ds = CreateDataSourceRequest {
359            name: "Prometheus".to_string(),
360            ds_type: "prometheus".to_string(),
361            uid: Some("prometheus_blueprint_default".to_string()),
362            url: prometheus_url,
363            access: "proxy".to_string(),
364            is_default: Some(true),
365            json_data: Some(serde_json::to_value(PrometheusJsonData {
366                http_method: "GET".to_string(),
367                timeout: 30,
368            })?),
369        };
370        let created_prometheus_ds = client.create_or_update_datasource(prometheus_ds).await?;
371        info!(
372            "Successfully provisioned Prometheus datasource '{}' with UID '{}'. Checking health...",
373            created_prometheus_ds.name, created_prometheus_ds.datasource.uid
374        );
375        match client
376            .check_datasource_health(&created_prometheus_ds.datasource.uid)
377            .await
378        {
379            Ok(health) if health.status.to_lowercase() == "ok" => {
380                info!(
381                    "Prometheus datasource '{}' (UID: {}) is healthy: {}",
382                    created_prometheus_ds.name,
383                    created_prometheus_ds.datasource.uid,
384                    health.message
385                );
386            }
387            Ok(health) => {
388                error!(
389                    "Prometheus datasource '{}' (UID: {}) is not healthy: Status: {}, Message: {}",
390                    created_prometheus_ds.name,
391                    created_prometheus_ds.datasource.uid,
392                    health.status,
393                    health.message
394                );
395                return Err(qos_error::Error::GrafanaApi(format!(
396                    "Datasource {} (UID: {}) reported unhealthy: {}",
397                    created_prometheus_ds.name,
398                    created_prometheus_ds.datasource.uid,
399                    health.message
400                )));
401            }
402            Err(e) => {
403                error!(
404                    "Failed to check health for Prometheus datasource '{}' (UID: {}): {}",
405                    created_prometheus_ds.name, created_prometheus_ds.datasource.uid, e
406                );
407                return Err(e);
408            }
409        }
410
411        const DASHBOARD_TEMPLATE: &str = include_str!("../config/grafana_dashboard.json");
412        let mut dashboard: Dashboard = serde_json::from_str(DASHBOARD_TEMPLATE)?;
413        dashboard.title = format!("{} Dashboard", blueprint_name);
414
415        let dashboard_url = client
416            .create_dashboard(dashboard, None, "Provisioning Blueprint Dashboard")
417            .await?;
418        self.dashboard_url = Some(dashboard_url);
419
420        Ok(())
421    }
422
423    /// Records metrics about a job execution for monitoring and visualization.
424    ///
425    /// This method tracks job execution frequency and performance metrics, which are
426    /// exposed via Prometheus and can be visualized in Grafana dashboards.
427    ///
428    /// # Parameters
429    /// * `job_id` - Unique identifier of the executed job
430    /// * `execution_time` - Time taken to execute the job in seconds
431    /// * `service_id` - ID of the service that executed the job
432    /// * `blueprint_id` - ID of the blueprint that contains the job
433    pub fn record_job_execution(
434        &self,
435        job_id: u64,
436        execution_time: f64,
437        service_id: u64,
438        blueprint_id: u64,
439    ) {
440        if let Some(service) = self.metrics_service.as_ref() {
441            service.provider().record_job_execution(
442                job_id,
443                execution_time,
444                service_id,
445                blueprint_id,
446            );
447        }
448    }
449
450    /// Records metrics about job execution errors for monitoring and alerting.
451    ///
452    /// This method tracks job failures by type, which can be used for alerting
453    /// and diagnostic purposes in Grafana dashboards.
454    ///
455    /// # Parameters
456    /// * `job_id` - Unique identifier of the job that encountered an error
457    /// * `error_type` - Classification or description of the error that occurred
458    pub fn record_job_error(&self, job_id: u64, error_type: &str) {
459        if let Some(service) = self.metrics_service.as_ref() {
460            service.provider().record_job_error(job_id, error_type);
461        }
462    }
463
464    /// Waits for the `QoS` service to complete its operation.
465    ///
466    /// This method blocks until a completion signal is received, which typically
467    /// happens when the service is being shut down gracefully. It's useful for
468    /// coordinating shutdown of the `QoS` service with the rest of the application.
469    ///
470    /// # Errors
471    /// Returns an error if the completion signal receiver is dropped prematurely,
472    /// indicating an unexpected termination of the service.
473    pub async fn wait_for_completion(&self) -> Result<()> {
474        let rx_option = {
475            let mut guard = self.completion_rx.lock().await;
476            guard.take()
477        };
478
479        if let Some(rx) = rx_option {
480            match rx.await {
481                Ok(inner_result) => inner_result,
482                Err(_recv_error) => Err(qos_error::Error::Other(
483                    "Completion signal receiver dropped before completion".to_string(),
484                )),
485            }
486        } else {
487            Err(qos_error::Error::Other(
488                "wait_for_completion can only be called once".to_string(),
489            ))
490        }
491    }
492
493    /// Initiates a graceful shutdown of the `QoS` service and all managed components.
494    ///
495    /// This method stops all server instances (Grafana, Prometheus, Loki) if they
496    /// were started by this service, and signals completion to any waiting tasks.
497    ///
498    /// # Errors
499    /// This function is designed to return errors from shutdown operations,
500    /// though the current implementation always returns Ok(()).
501    pub fn shutdown(&self) -> Result<()> {
502        info!("QoSService shutting down...");
503        info!("QoSService shutdown complete.");
504        Ok(())
505    }
506}
507
508impl<C: HeartbeatConsumer + Send + Sync + 'static> Drop for QoSService<C> {
509    fn drop(&mut self) {
510        let flush_result = self.metrics_service.as_ref().map_or(Ok(()), |ms| {
511            ms.provider().force_flush_otel_metrics().map_err(|e| {
512                error!("Failed to flush OpenTelemetry metrics on drop: {}", e);
513                qos_error::Error::Metrics(format!("OpenTelemetry flush failed on drop: {}", e))
514            })
515        });
516
517        match self.completion_tx.try_lock() {
518            Ok(mut guard) => {
519                if let Some(tx) = guard.take() {
520                    if tx.send(flush_result).is_err() {
521                        info!(
522                            "Attempted to send completion signal on drop, but receiver was already gone."
523                        );
524                    }
525                }
526            }
527            Err(_) => {
528                error!(
529                    "Failed to acquire lock for completion_tx during drop (lock was contended). Signal not sent."
530                );
531            }
532        }
533    }
534}