1use crate::{
2 QoSConfig,
3 error::{self as qos_error, Result},
4 heartbeat::{HeartbeatConsumer, HeartbeatService},
5 logging::grafana::{
6 CreateDataSourceRequest, Dashboard, GrafanaClient, LokiJsonData, PrometheusJsonData,
7 },
8 logging::loki::init_loki_logging,
9 metrics::{
10 opentelemetry::OpenTelemetryConfig, provider::EnhancedMetricsProvider,
11 service::MetricsService,
12 },
13 servers::{
14 ServerManager, grafana::GrafanaServer, loki::LokiServer, prometheus::PrometheusServer,
15 },
16};
17use blueprint_core::{error, info};
18use std::sync::Arc;
19use tokio::sync::{Mutex, oneshot};
20
21pub struct QoSService<C: HeartbeatConsumer + Send + Sync + 'static> {
33 heartbeat_service: Option<Arc<HeartbeatService<C>>>,
34 metrics_service: Option<Arc<MetricsService>>,
35 grafana_client: Option<Arc<GrafanaClient>>,
36 dashboard_url: Option<String>,
37 grafana_server: Option<Arc<GrafanaServer>>,
38 loki_server: Option<Arc<LokiServer>>,
39 prometheus_server: Option<Arc<PrometheusServer>>,
40 completion_tx: Arc<Mutex<Option<oneshot::Sender<Result<()>>>>>,
41 completion_rx: Mutex<Option<tokio::sync::oneshot::Receiver<Result<()>>>>,
42}
43
44impl<C: HeartbeatConsumer + Send + Sync + 'static> QoSService<C> {
45 pub fn heartbeat_service(&self) -> Option<&Arc<HeartbeatService<C>>> {
50 self.heartbeat_service.as_ref()
51 }
52
53 pub async fn set_completion_sender(&self, sender: oneshot::Sender<Result<()>>) {
55 let mut guard = self.completion_tx.lock().await;
56 if guard.is_some() {
57 error!("Completion sender already set for QoSService, overwriting.");
58 }
59 *guard = Some(sender);
60 }
61
62 async fn initialize(
64 config: QoSConfig,
65 heartbeat_consumer: Arc<C>,
66 ws_rpc_endpoint: String,
67 keystore_uri: String,
68 otel_config: Option<OpenTelemetryConfig>,
69 ) -> Result<Self> {
70 let heartbeat_service = config.heartbeat.clone().map(|hc| {
71 let ws_rpc = ws_rpc_endpoint.clone();
72 Arc::new(HeartbeatService::new(
73 hc.clone(),
74 heartbeat_consumer.clone(),
75 ws_rpc,
76 keystore_uri.clone(),
77 hc.service_id,
78 hc.blueprint_id,
79 ))
80 });
81
82 let metrics_service = match (config.metrics.clone(), otel_config) {
83 (Some(mc), Some(oc)) => Some(Arc::new(MetricsService::with_otel_config(mc, &oc)?)),
84 (Some(mc), None) => Some(Arc::new(MetricsService::new(mc)?)),
85 (None, _) => None,
86 };
87
88 if let Some(ms) = &metrics_service {
89 info!("Metrics service is Some, attempting to start collection.");
90 ms.provider().clone().start_collection().await?;
91 }
92
93 if let Some(loki_config) = &config.loki {
94 if let Err(e) = init_loki_logging(loki_config.clone()) {
95 error!("Failed to initialize Loki logging: {}", e);
96 } else {
97 info!("Initialized Loki logging");
98 }
99 }
100
101 let bind_ip = config.docker_bind_ip.clone();
102 let (grafana_server, loki_server, prometheus_server) = if config.manage_servers {
103 let grafana = config
104 .grafana_server
105 .as_ref()
106 .map(|c| GrafanaServer::new(c.clone()))
107 .transpose()?;
108
109 let loki = config
110 .loki_server
111 .as_ref()
112 .map(|c| LokiServer::new(c.clone()))
113 .transpose()?;
114
115 let prometheus = config.prometheus_server.as_ref().map(|c| {
116 PrometheusServer::new(
117 c.clone(),
118 Some(
119 metrics_service
120 .as_ref()
121 .ok_or_else(|| qos_error::Error::Generic("Metrics service is required for Prometheus but not configured".to_string()))?
122 .provider()
123 .shared_registry()
124 .clone(),
125 ),
126 metrics_service.as_ref().ok_or_else(|| qos_error::Error::Generic("Metrics service is required for Prometheus but not configured".to_string()))?.provider().clone(),
127 )
128 }).transpose()?;
129
130 if let Some(s) = &grafana {
131 info!("Starting Grafana server...");
132 s.start(config.docker_network.as_deref(), bind_ip.clone())
133 .await
134 .map_err(|e| {
135 error!("Failed to start Grafana server: {}", e);
136 e
137 })?;
138 info!("Grafana server started successfully: {}", s.url());
139 }
140
141 if let Some(s) = &loki {
142 info!("Starting Loki server...");
143 s.start(config.docker_network.as_deref(), bind_ip.clone())
144 .await
145 .map_err(|e| {
146 error!("Failed to start Loki server: {}", e);
147 e
148 })?;
149 info!("Loki server started successfully: {}", s.url());
150 }
151
152 if let Some(s) = &prometheus {
153 info!("Starting Prometheus server...");
154 if let Err(e) = s
155 .start(config.docker_network.as_deref(), bind_ip.clone())
156 .await
157 {
158 error!("Failed to start critical Prometheus server: {}", e);
159 return Err(e);
160 }
161 info!("Prometheus server started successfully: {}", s.url());
162 }
163
164 (
165 grafana.map(Arc::new),
166 loki.map(Arc::new),
167 prometheus.map(Arc::new),
168 )
169 } else {
170 (None, None, None)
171 };
172
173 let grafana_client = match &grafana_server {
174 Some(server) => {
175 let mut client_config = server.client_config();
176 client_config.loki_config = config.loki.clone();
177 Some(Arc::new(GrafanaClient::new(client_config)))
178 }
179 None => config.grafana.as_ref().map(|user_config| {
180 let mut client_config = user_config.clone();
181 client_config.loki_config = config.loki.clone();
182 Arc::new(GrafanaClient::new(client_config))
183 }),
184 };
185
186 let (tx, rx): (oneshot::Sender<Result<()>>, oneshot::Receiver<Result<()>>) =
187 oneshot::channel();
188 Ok(Self {
189 heartbeat_service,
190 metrics_service,
191 grafana_client,
192 dashboard_url: None,
193 grafana_server,
194 loki_server,
195 prometheus_server,
196 completion_tx: Arc::new(Mutex::new(Some(tx))),
197 completion_rx: Mutex::new(Some(rx)),
198 })
199 }
200
201 pub async fn new(
205 config: QoSConfig,
206 heartbeat_consumer: Arc<C>,
207 ws_rpc_endpoint: String,
208 keystore_uri: String,
209 ) -> Result<Self> {
210 Self::initialize(
211 config,
212 heartbeat_consumer,
213 ws_rpc_endpoint,
214 keystore_uri,
215 None,
216 )
217 .await
218 }
219
220 pub async fn with_otel_config(
224 config: QoSConfig,
225 heartbeat_consumer: Arc<C>,
226 ws_rpc_endpoint: String,
227 keystore_uri: String,
228 otel_config: OpenTelemetryConfig,
229 ) -> Result<Self> {
230 Self::initialize(
231 config,
232 heartbeat_consumer,
233 ws_rpc_endpoint,
234 keystore_uri,
235 Some(otel_config),
236 )
237 .await
238 }
239
240 pub fn debug_server_status(&self) {
241 info!("--- QoS Server Status ---");
242 if self.grafana_server.is_some() {
243 info!("Grafana Server: Configured (instance present)");
244 } else {
245 info!("Grafana Server: Not configured");
246 }
247 if self.loki_server.is_some() {
248 info!("Loki Server: Configured (instance present)");
249 } else {
250 info!("Loki Server: Not configured");
251 }
252 if self.prometheus_server.is_some() {
253 info!("Prometheus Server: Configured (instance present)");
254 } else {
255 info!("Prometheus Server: Not configured");
256 }
257 if self.grafana_client.is_some() {
258 info!("Grafana Client: Configured (instance present)");
259 } else {
260 info!("Grafana Client: Not configured");
261 }
262 if self
263 .metrics_service
264 .as_ref()
265 .map(|ms| ms.provider())
266 .is_some()
267 {
268 info!("Metrics Service: Configured (instance present)");
269 } else {
270 info!("Metrics Service: Not configured");
271 }
272 info!("-------------------------");
273 }
274
275 pub fn grafana_client(&self) -> Option<Arc<GrafanaClient>> {
280 self.grafana_client.clone()
281 }
282
283 pub fn grafana_server_url(&self) -> Option<String> {
288 self.grafana_server.as_ref().map(|server| server.url())
289 }
290
291 #[must_use]
296 pub fn loki_server_url(&self) -> Option<String> {
297 self.loki_server.as_ref().map(|s| s.url())
298 }
299
300 pub fn provider(&self) -> Option<Arc<EnhancedMetricsProvider>> {
305 self.metrics_service.as_ref().map(|s| s.provider())
306 }
307
308 pub async fn create_dashboard(&mut self, blueprint_name: &str) -> Result<()> {
325 let client = self.grafana_client.as_ref().ok_or(qos_error::Error::Other(
326 "Grafana client not configured".to_string(),
327 ))?;
328
329 let loki_ds = CreateDataSourceRequest {
330 name: "Loki".to_string(),
331 ds_type: "loki".to_string(),
332 uid: Some("loki-blueprint".to_string()),
333 url: self
334 .loki_server
335 .as_ref()
336 .map_or_else(|| "http://loki:3100".to_string(), |s| s.url()),
337 access: "proxy".to_string(),
338 is_default: Some(false),
339 json_data: Some(serde_json::to_value(LokiJsonData {
340 max_lines: Some(1000),
341 })?),
342 };
343 client.create_or_update_datasource(loki_ds).await?;
344
345 let prometheus_url = self
346 .grafana_client
347 .as_ref()
348 .and_then(|gc| gc.prometheus_datasource_url())
349 .cloned()
350 .or_else(|| self.prometheus_server.as_ref().map(|s| s.url()))
351 .ok_or_else(|| {
352 qos_error::Error::Other(
353 "Prometheus datasource URL is not configured and no managed server is available."
354 .to_string(),
355 )
356 })?;
357
358 let prometheus_ds = CreateDataSourceRequest {
359 name: "Prometheus".to_string(),
360 ds_type: "prometheus".to_string(),
361 uid: Some("prometheus_blueprint_default".to_string()),
362 url: prometheus_url,
363 access: "proxy".to_string(),
364 is_default: Some(true),
365 json_data: Some(serde_json::to_value(PrometheusJsonData {
366 http_method: "GET".to_string(),
367 timeout: 30,
368 })?),
369 };
370 let created_prometheus_ds = client.create_or_update_datasource(prometheus_ds).await?;
371 info!(
372 "Successfully provisioned Prometheus datasource '{}' with UID '{}'. Checking health...",
373 created_prometheus_ds.name, created_prometheus_ds.datasource.uid
374 );
375 match client
376 .check_datasource_health(&created_prometheus_ds.datasource.uid)
377 .await
378 {
379 Ok(health) if health.status.to_lowercase() == "ok" => {
380 info!(
381 "Prometheus datasource '{}' (UID: {}) is healthy: {}",
382 created_prometheus_ds.name,
383 created_prometheus_ds.datasource.uid,
384 health.message
385 );
386 }
387 Ok(health) => {
388 error!(
389 "Prometheus datasource '{}' (UID: {}) is not healthy: Status: {}, Message: {}",
390 created_prometheus_ds.name,
391 created_prometheus_ds.datasource.uid,
392 health.status,
393 health.message
394 );
395 return Err(qos_error::Error::GrafanaApi(format!(
396 "Datasource {} (UID: {}) reported unhealthy: {}",
397 created_prometheus_ds.name,
398 created_prometheus_ds.datasource.uid,
399 health.message
400 )));
401 }
402 Err(e) => {
403 error!(
404 "Failed to check health for Prometheus datasource '{}' (UID: {}): {}",
405 created_prometheus_ds.name, created_prometheus_ds.datasource.uid, e
406 );
407 return Err(e);
408 }
409 }
410
411 const DASHBOARD_TEMPLATE: &str = include_str!("../config/grafana_dashboard.json");
412 let mut dashboard: Dashboard = serde_json::from_str(DASHBOARD_TEMPLATE)?;
413 dashboard.title = format!("{} Dashboard", blueprint_name);
414
415 let dashboard_url = client
416 .create_dashboard(dashboard, None, "Provisioning Blueprint Dashboard")
417 .await?;
418 self.dashboard_url = Some(dashboard_url);
419
420 Ok(())
421 }
422
423 pub fn record_job_execution(
434 &self,
435 job_id: u64,
436 execution_time: f64,
437 service_id: u64,
438 blueprint_id: u64,
439 ) {
440 if let Some(service) = self.metrics_service.as_ref() {
441 service.provider().record_job_execution(
442 job_id,
443 execution_time,
444 service_id,
445 blueprint_id,
446 );
447 }
448 }
449
450 pub fn record_job_error(&self, job_id: u64, error_type: &str) {
459 if let Some(service) = self.metrics_service.as_ref() {
460 service.provider().record_job_error(job_id, error_type);
461 }
462 }
463
464 pub async fn wait_for_completion(&self) -> Result<()> {
474 let rx_option = {
475 let mut guard = self.completion_rx.lock().await;
476 guard.take()
477 };
478
479 if let Some(rx) = rx_option {
480 match rx.await {
481 Ok(inner_result) => inner_result,
482 Err(_recv_error) => Err(qos_error::Error::Other(
483 "Completion signal receiver dropped before completion".to_string(),
484 )),
485 }
486 } else {
487 Err(qos_error::Error::Other(
488 "wait_for_completion can only be called once".to_string(),
489 ))
490 }
491 }
492
493 pub fn shutdown(&self) -> Result<()> {
502 info!("QoSService shutting down...");
503 info!("QoSService shutdown complete.");
504 Ok(())
505 }
506}
507
508impl<C: HeartbeatConsumer + Send + Sync + 'static> Drop for QoSService<C> {
509 fn drop(&mut self) {
510 let flush_result = self.metrics_service.as_ref().map_or(Ok(()), |ms| {
511 ms.provider().force_flush_otel_metrics().map_err(|e| {
512 error!("Failed to flush OpenTelemetry metrics on drop: {}", e);
513 qos_error::Error::Metrics(format!("OpenTelemetry flush failed on drop: {}", e))
514 })
515 });
516
517 match self.completion_tx.try_lock() {
518 Ok(mut guard) => {
519 if let Some(tx) = guard.take() {
520 if tx.send(flush_result).is_err() {
521 info!(
522 "Attempted to send completion signal on drop, but receiver was already gone."
523 );
524 }
525 }
526 }
527 Err(_) => {
528 error!(
529 "Failed to acquire lock for completion_tx during drop (lock was contended). Signal not sent."
530 );
531 }
532 }
533 }
534}