blueprint_qos/servers/prometheus.rs
1use blueprint_core::{error, info, warn};
2use std::collections::HashMap;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5use tokio::time::sleep;
6
7use crate::error::Result;
8use crate::metrics::EnhancedMetricsProvider;
9use crate::metrics::prometheus::server::PrometheusServer as PrometheusMetricsServer;
10use crate::servers::ServerManager;
11use crate::servers::common::DockerManager;
12
13/// Configuration settings for a Prometheus metrics server.
14///
15/// This struct defines all the parameters needed to set up and run a Prometheus server,
16/// either as a Docker container or as an embedded server within the application.
17#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
18pub struct PrometheusServerConfig {
19 /// The port to bind the Prometheus server to
20 pub port: u16,
21
22 /// The host to bind the Prometheus server to
23 pub host: String,
24
25 /// Whether to use Docker for the Prometheus server
26 pub use_docker: bool,
27
28 /// The Docker image to use for the Prometheus server
29 pub docker_image: String,
30
31 /// The Docker container name to use for the Prometheus server
32 pub docker_container_name: String,
33
34 /// The path to the Prometheus configuration file
35 pub config_path: Option<String>,
36
37 /// The path to mount as the Prometheus data directory
38 pub data_path: Option<String>,
39}
40
// Values used by `PrometheusServerConfig::default()` and by the Docker start-up path.

/// Default port for the Prometheus server.
const DEFAULT_PROMETHEUS_PORT: u16 = 9090;
/// Default bind host for the Prometheus server.
const DEFAULT_PROMETHEUS_HOST: &str = "0.0.0.0";
/// Default Docker image used to run Prometheus.
const DEFAULT_PROMETHEUS_DOCKER_IMAGE: &str = "prom/prometheus:latest";
/// Default name of the Prometheus Docker container.
const DEFAULT_PROMETHEUS_CONTAINER_NAME: &str = "blueprint-prometheus";
/// Seconds to wait for a freshly created Prometheus container to report healthy.
const PROMETHEUS_DOCKER_HEALTH_TIMEOUT_SECS: u64 = 120;
47
48impl Default for PrometheusServerConfig {
49 fn default() -> Self {
50 Self {
51 port: DEFAULT_PROMETHEUS_PORT,
52 host: DEFAULT_PROMETHEUS_HOST.to_string(),
53 use_docker: false,
54 docker_image: DEFAULT_PROMETHEUS_DOCKER_IMAGE.to_string(),
55 docker_container_name: DEFAULT_PROMETHEUS_CONTAINER_NAME.to_string(),
56 config_path: None,
57 data_path: None,
58 }
59 }
60}
61
62/// Manager for a Prometheus metrics server instance.
63///
64/// This struct handles the lifecycle of a Prometheus server, which can either run
65/// as a Docker container or as an embedded server within the application process.
66/// It provides methods for starting, stopping, and monitoring the server.
67#[derive(Clone)]
68pub struct PrometheusServer {
69 /// The configuration for the Prometheus server
70 config: PrometheusServerConfig,
71
72 /// The Docker manager for the Prometheus server (if using Docker)
73 docker_manager: DockerManager,
74
75 /// The container ID for the Prometheus server (if using Docker)
76 container_id: Arc<Mutex<Option<String>>>,
77
78 /// The embedded Prometheus server (if not using Docker)
79 embedded_server: Arc<Mutex<Option<PrometheusMetricsServer>>>,
80
81 /// The metrics registry provided by `EnhancedMetricsProvider` (if not using Docker)
82 metrics_registry: Option<Arc<prometheus::Registry>>,
83
84 /// The enhanced metrics provider, used to force flush OTEL metrics on scrape
85 enhanced_metrics_provider: Arc<EnhancedMetricsProvider>,
86}
87
88impl PrometheusServer {
89 /// Creates a new Prometheus server manager with the provided configuration and metrics registry.
90 ///
91 /// This constructor prepares the Prometheus server infrastructure but does not start the server.
92 /// The actual server (Docker container or embedded) is started when `start()` is called.
93 ///
94 /// # Parameters
95 /// * `config` - The configuration settings for the Prometheus server
96 /// * `metrics_registry` - Optional Prometheus registry for the embedded server mode
97 /// * `enhanced_metrics_provider` - Provider that generates metrics data
98 ///
99 /// # Errors
100 /// Returns an error if the Docker manager connection fails to initialize
101 pub fn new(
102 config: PrometheusServerConfig,
103 metrics_registry: Option<Arc<prometheus::Registry>>,
104 enhanced_metrics_provider: Arc<EnhancedMetricsProvider>,
105 ) -> Result<Self> {
106 Ok(Self {
107 config,
108 docker_manager: DockerManager::new()
109 .map_err(|e| crate::error::Error::DockerConnection(e.to_string()))?,
110 container_id: Arc::new(Mutex::new(None)),
111 embedded_server: Arc::new(Mutex::new(None)),
112 metrics_registry,
113 enhanced_metrics_provider,
114 })
115 }
116
117 /// Creates and configures a new Docker container for the Prometheus server.
118 ///
119 /// Sets up a new container with appropriate port mappings, volume mounts, and configuration
120 /// based on the server settings. This does not start the container, only creates it.
121 ///
122 /// # Parameters
123 /// * `network` - Optional Docker network to attach the container to
124 ///
125 /// # Errors
126 /// Returns an error if the Docker API fails to create the container
127 ///
128 /// # Panics
129 /// Panics if mutex locks cannot be acquired
130 pub async fn create_docker_container(&self) -> Result<()> {
131 let env_vars = HashMap::new();
132
133 let mut ports = HashMap::new();
134 ports.insert(self.config.port.to_string(), self.config.port.to_string());
135
136 let mut volumes = HashMap::new();
137 if let Some(config_path) = &self.config.config_path {
138 volumes.insert(
139 config_path.clone(),
140 "/etc/prometheus/prometheus.yml".to_string(),
141 );
142 }
143
144 if let Some(data_path) = &self.config.data_path {
145 volumes.insert(data_path.clone(), "/prometheus".to_string());
146 }
147
148 let new_container_id_result = self
149 .docker_manager
150 .run_container(
151 &self.config.docker_image,
152 &self.config.docker_container_name,
153 env_vars,
154 ports,
155 volumes,
156 None,
157 None,
158 None,
159 None,
160 )
161 .await;
162
163 match new_container_id_result {
164 Ok(id) => {
165 info!(
166 "PrometheusServer::start: docker_manager.run_container succeeded. Raw new_container_id: '{}'",
167 id
168 );
169 if id.trim().is_empty() {
170 error!(
171 "PrometheusServer::start: docker_manager.run_container returned an EMPTY string for container ID."
172 );
173 return Err(crate::error::Error::Other(
174 "Docker manager returned empty container ID for Prometheus".to_string(),
175 ));
176 }
177 let mut id_guard = self.container_id.lock().unwrap();
178 *id_guard = Some(id.clone());
179 info!(
180 "PrometheusServer::start: Stored new container_id into self.container_id. Current self.container_id: {:?}",
181 *id_guard
182 );
183 }
184 Err(e) => {
185 error!(
186 "PrometheusServer::start: docker_manager.run_container FAILED: {}",
187 e
188 );
189 return Err(e);
190 }
191 }
192
193 Ok(())
194 }
195
196 /// Returns a reference to the metrics registry used by the embedded Prometheus server.
197 ///
198 /// This registry contains all the metrics that are exposed by the embedded Prometheus server.
199 /// Returns None if no registry was provided during initialization.
200 #[must_use]
201 pub fn registry(&self) -> Option<Arc<prometheus::Registry>> {
202 self.metrics_registry.clone()
203 }
204
205 /// Returns the Docker container ID if the server is running in Docker mode.
206 ///
207 /// This identifier can be used to reference the specific Prometheus container for
208 /// management operations via Docker API. The container ID is only available when the
209 /// server is configured to run in Docker mode and after the container has been created.
210 ///
211 /// # Returns
212 /// * `Some(String)` - Container ID if the server is using Docker and a container has been created
213 /// * `None` - If the server is not using Docker, the container hasn't been created yet,
214 /// or if there was an error acquiring the lock on the container ID
215 #[must_use]
216 pub fn container_id(&self) -> Option<String> {
217 self.container_id.lock().map(|id| id.clone()).ok()?
218 }
219
220 /// Checks if this server is configured to use Docker.
221 ///
222 /// The Prometheus server can operate in two modes:
223 /// 1. Docker mode: Runs Prometheus in a separate Docker container, providing isolation
224 /// and easier management but requiring Docker to be available
225 /// 2. Embedded mode: Runs a Prometheus-compatible metrics HTTP server directly within
226 /// the application process, with no external dependencies
227 ///
228 /// # Returns
229 /// * `true` - If the server is configured to use Docker
230 /// * `false` - If the server is using the embedded mode
231 #[must_use]
232 pub fn is_docker_based(&self) -> bool {
233 self.config.use_docker
234 }
235}
236
237impl ServerManager for PrometheusServer {
238 /// Starts the Prometheus server in either Docker or embedded mode.
239 ///
240 /// * **`Docker`** – Creates (or reuses) a container, mounts configuration/data volumes,
241 /// optionally connects it to a network, and waits for the container health-check.
242 /// * **`Embedded`** – Binds an in-process HTTP server to `host:port` and serves the
243 /// `/metrics` endpoint backed by the supplied registry.
244 ///
245 /// The call blocks until the server is ready.
246 ///
247 /// # Parameters
248 /// * `network` – Optional Docker network name (`Docker` mode only).
249 ///
250 /// # Errors
251 /// Returns an error if the container or server fails to start, health-check fails, the
252 /// port is already in use, or required configuration is missing.
253 ///
254 ///
255 /// * `network` - Optional Docker network name to connect the container to. Only used in `Docker` mode.
256 /// When provided, the container will be connected to this network to allow service discovery
257 /// between services (e.g., Prometheus to scrape metrics from other containers).
258 ///
259 /// # Errors
260 /// Returns an error if:
261 /// * Docker container creation or startup fails (permission issues, Docker daemon not running, image not found)
262 /// * Port binding fails for the embedded server (port already in use or insufficient permissions)
263 /// * Server startup times out or fails for any other reason
264 /// * Configuration file generation fails or specified paths are invalid (Docker mode only)
265 /// * Health check fails for Docker container
266 /// * No metrics registry is provided for embedded server mode
267 async fn start(&self, network: Option<&str>, bind_ip: Option<String>) -> Result<()> {
268 let mut current_container_id_val: Option<String> = None;
269 if self.config.use_docker {
270 info!(
271 "PrometheusServer::start: Attempting to run Docker container '{}' with name '{}' on network {:?}. Ports: {:?}, Config mount: {:?} -> /etc/prometheus/prometheus.yml",
272 self.config.docker_image,
273 self.config.docker_container_name,
274 network,
275 self.config.port,
276 self.config.config_path
277 );
278 let mut perform_health_check = true;
279
280 let initial_id_check = self.container_id.lock().unwrap().clone();
281
282 if let Some(existing_id) = initial_id_check {
283 info!(
284 "PrometheusServer::start: Found existing container_id: {}. Checking if it's running.",
285 existing_id
286 );
287 let is_running = self
288 .docker_manager
289 .is_container_running(&existing_id)
290 .await?;
291 if is_running {
292 info!(
293 "PrometheusServer::start: Container {} is already running.",
294 existing_id
295 );
296 current_container_id_val = Some(existing_id);
297 perform_health_check = false;
298 } else {
299 warn!(
300 "PrometheusServer::start: Container {} was found but is not running. Attempting to remove and recreate.",
301 existing_id
302 );
303 if let Err(e) = self
304 .docker_manager
305 .stop_and_remove_container(&existing_id, &self.config.docker_container_name)
306 .await
307 {
308 warn!(
309 "PrometheusServer::start: Failed to remove non-running container {}: {}. Proceeding to create a new one.",
310 existing_id, e
311 );
312 }
313 // Reset container ID to indicate we need to create a new one
314 current_container_id_val = None;
315 *self.container_id.lock().unwrap() = None;
316 }
317 }
318
319 if current_container_id_val.is_none() {
320 info!(
321 "PrometheusServer::start: No existing container_id found or old one was not running. Creating new container."
322 );
323 let mut ports = std::collections::HashMap::new();
324 ports.insert("9090/tcp".to_string(), self.config.port.to_string());
325
326 let mut volumes = std::collections::HashMap::new();
327 if let Some(config_host_path) = &self.config.config_path {
328 volumes.insert(
329 config_host_path.clone(),
330 "/etc/prometheus/prometheus.yml".to_string(),
331 );
332 }
333 if let Some(data_host_path) = &self.config.data_path {
334 volumes.insert(data_host_path.clone(), "/prometheus".to_string());
335 }
336
337 let extra_hosts = vec!["host.docker.internal:host-gateway".to_string()];
338
339 let new_id_result = self
340 .docker_manager
341 .run_container(
342 &self.config.docker_image,
343 &self.config.docker_container_name,
344 std::collections::HashMap::new(), // env_vars
345 ports,
346 volumes,
347 None, // cmd
348 Some(extra_hosts),
349 None, // health_check_cmd
350 bind_ip,
351 )
352 .await;
353
354 match new_id_result {
355 Ok(id) => {
356 if id.trim().is_empty() {
357 error!(
358 "PrometheusServer::start: Docker manager returned an EMPTY string for container ID."
359 );
360 return Err(crate::error::Error::Other(
361 "Docker manager returned empty container ID for Prometheus"
362 .to_string(),
363 ));
364 }
365 info!(
366 "PrometheusServer::start: Successfully created container with ID: {}",
367 id
368 );
369 current_container_id_val = Some(id.clone());
370 }
371 Err(e) => {
372 error!(
373 "PrometheusServer::start: Failed to run Prometheus container: {}",
374 e
375 );
376 return Err(e);
377 }
378 }
379 }
380
381 let final_id_for_connection_and_health_check =
382 current_container_id_val.clone().ok_or_else(|| {
383 crate::error::Error::Other(
384 "Prometheus container ID unexpectedly None after creation/check logic"
385 .to_string(),
386 )
387 })?;
388
389 if let Some(net) = network {
390 info!(
391 "Connecting Prometheus container {} ({}) to network {}",
392 &self.config.docker_container_name,
393 final_id_for_connection_and_health_check,
394 net
395 );
396 self.docker_manager
397 .connect_to_network(&final_id_for_connection_and_health_check, net)
398 .await?;
399 }
400
401 if perform_health_check {
402 info!(
403 "Performing health check for Prometheus container {} ({})",
404 &self.config.docker_container_name, final_id_for_connection_and_health_check
405 );
406 if self
407 .docker_manager
408 .wait_for_container_health(
409 &final_id_for_connection_and_health_check,
410 PROMETHEUS_DOCKER_HEALTH_TIMEOUT_SECS,
411 )
412 .await
413 .is_err()
414 {
415 let err_msg = format!(
416 "Prometheus Docker container {} ({}) did not become healthy.",
417 self.config.docker_container_name, final_id_for_connection_and_health_check
418 );
419 error!("{}", err_msg);
420 return Err(crate::error::Error::Other(format!(
421 "Prometheus container ({}) health check failed: {}",
422 final_id_for_connection_and_health_check, err_msg
423 )));
424 }
425 info!(
426 "Prometheus Docker container {} ({}) is healthy.",
427 &self.config.docker_container_name, final_id_for_connection_and_health_check
428 );
429 } else {
430 info!(
431 "Skipping health check for already running Prometheus container {} ({})",
432 &self.config.docker_container_name, final_id_for_connection_and_health_check
433 );
434 }
435 } else {
436 {
437 let guard = self.embedded_server.lock().unwrap();
438 if guard.is_some() {
439 info!(
440 "Embedded Prometheus server on {}:{} already initialized.",
441 self.config.host, self.config.port
442 );
443 return Ok(());
444 }
445 }
446
447 let registry_arc_clone;
448 let bind_address_for_new_server;
449
450 if let Some(registry) = &self.metrics_registry {
451 registry_arc_clone = registry.clone();
452 bind_address_for_new_server = format!("{}:{}", self.config.host, self.config.port);
453 info!(
454 "Attempting to start embedded Prometheus server on {} using provided registry",
455 bind_address_for_new_server
456 );
457 } else {
458 return Err(crate::error::Error::Other(
459 "Metrics registry not provided for embedded Prometheus server".to_string(),
460 ));
461 }
462
463 match std::net::TcpListener::bind(&bind_address_for_new_server) {
464 Ok(listener) => {
465 drop(listener);
466 info!(
467 "Port {} is free, proceeding to start embedded Prometheus server.",
468 bind_address_for_new_server
469 );
470 }
471 Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
472 error!(
473 "Failed to bind embedded Prometheus server to {}: Address already in use.",
474 bind_address_for_new_server
475 );
476 return Err(crate::error::Error::Other(format!(
477 "Address {} already in use for embedded Prometheus server",
478 bind_address_for_new_server
479 )));
480 }
481 Err(e) => {
482 error!(
483 "Failed to perform pre-bind check for embedded Prometheus server on {}: {}",
484 bind_address_for_new_server, e
485 );
486 return Err(crate::error::Error::Other(format!(
487 "Failed pre-bind check for {}: {}",
488 bind_address_for_new_server, e
489 )));
490 }
491 }
492
493 let mut server_instance = PrometheusMetricsServer::new(
494 registry_arc_clone,
495 self.enhanced_metrics_provider.clone(),
496 bind_address_for_new_server.clone(),
497 );
498
499 server_instance.start().await?;
500
501 {
502 let mut guard = self.embedded_server.lock().unwrap();
503 *guard = Some(server_instance);
504 }
505
506 info!(
507 "Successfully started embedded Prometheus server on {}",
508 bind_address_for_new_server
509 );
510 }
511
512 Ok(())
513 }
514
515 /// Stops the running Prometheus server and performs necessary cleanup.
516 ///
517 /// This method safely terminates the Prometheus server instance based on its operational mode:
518 ///
519 /// ## Docker Mode
520 /// When running as a Docker container:
521 /// * Retrieves the container ID from internal state
522 /// * Stops the Docker container using the Docker API
523 /// * Removes the container to free up resources
524 /// * Clears the internal container ID reference
525 /// * Handles cases where container is already stopped gracefully
526 ///
527 /// ## Embedded Mode
528 /// When running as an embedded server:
529 /// * Acquires lock on the server instance
530 /// * Triggers a graceful shutdown of the HTTP server
531 /// * Waits for in-flight requests to complete
532 /// * Releases resources associated with the server
533 ///
534 /// The method is idempotent and safe to call multiple times, even if the server
535 /// is not running.
536 ///
537 /// # Errors
538 /// Returns an error if:
539 /// * Docker API encounters errors when stopping or removing the container
540 /// * Communication with Docker daemon fails
541 /// * Container removal fails due to permission issues
542 async fn stop(&self) -> Result<()> {
543 if self.config.use_docker {
544 let container_id = {
545 let id = self.container_id.lock().unwrap();
546 match id.as_ref() {
547 Some(id) => id.clone(),
548 None => {
549 info!("Prometheus Docker container is not running, nothing to stop.");
550 return Ok(());
551 }
552 }
553 };
554
555 info!(
556 "Stopping Prometheus server in Docker container: {}",
557 &self.config.docker_container_name
558 );
559 self.docker_manager
560 .stop_and_remove_container(&container_id, &self.config.docker_container_name)
561 .await?;
562
563 let mut id = self.container_id.lock().unwrap();
564 *id = None;
565
566 info!("Prometheus Docker container stopped successfully.");
567 } else {
568 let mut server_guard = self.embedded_server.lock().unwrap();
569 if let Some(server) = server_guard.as_mut() {
570 server.stop();
571 info!("Stopped embedded Prometheus server");
572 }
573 }
574
575 Ok(())
576 }
577
578 /// Returns the fully qualified URL where the Prometheus server can be accessed.
579 ///
580 /// Constructs a URL using the configured host and port values in the format:
581 /// `http://{host}:{port}`
582 ///
583 /// This URL can be used to:
584 /// * Access the Prometheus web UI for manual query and visualization
585 /// * Configure Grafana datasources programmatically
586 /// * Set up health checks or monitoring of the Prometheus instance
587 /// * Access the Prometheus HTTP API for programmatic queries
588 ///
589 /// The URL is valid regardless of whether the server is running in Docker mode
590 /// or embedded mode, as both modes expose the same interface on the configured
591 /// host:port combination.
592 fn url(&self) -> String {
593 format!(
594 "http://{}:{}",
595 if self.config.use_docker {
596 "localhost"
597 } else {
598 &self.config.host
599 },
600 self.config.port
601 )
602 }
603
604 /// Checks if the Prometheus server is currently running.
605 ///
606 /// For Docker-based servers, checks the container status.
607 /// For embedded servers, checks if the server instance exists.
608 ///
609 /// # Errors
610 /// Returns an error if checking the server status fails or if
611 /// the Docker API reports an error.
612 async fn is_running(&self) -> Result<bool> {
613 if self.config.use_docker {
614 let container_id = {
615 let id = self.container_id.lock().unwrap();
616 match id.as_ref() {
617 Some(id) => id.clone(),
618 None => return Ok(false),
619 }
620 };
621
622 return self
623 .docker_manager
624 .is_container_running(&container_id)
625 .await;
626 }
627
628 let server = self.embedded_server.lock().unwrap();
629 Ok(server.is_some())
630 }
631
632 /// Waits until the Prometheus server is ready to accept connections.
633 ///
634 /// Periodically checks the server status until it's ready or until the timeout expires.
635 /// For Docker-based servers, performs HTTP health checks.
636 /// For embedded servers, verifies the server is bound and responding.
637 ///
638 /// # Parameters
639 /// * `timeout_secs` - Maximum time to wait in seconds
640 ///
641 /// # Errors
642 /// Returns an error if the server fails to become ready within the timeout period
643 /// or if health checks fail.
644 async fn wait_until_ready(&self, timeout_secs: u64) -> Result<()> {
645 if self.config.use_docker {
646 let container_id = {
647 let id = self.container_id.lock().unwrap();
648 id.as_ref().map(String::clone).ok_or_else(|| {
649 crate::error::Error::Generic("Prometheus container not running".to_string())
650 })?
651 };
652
653 info!("Waiting for Prometheus container to be healthy...");
654 if let Err(e) = self
655 .docker_manager
656 .wait_for_container_health(&container_id, timeout_secs)
657 .await
658 {
659 warn!(
660 "Prometheus container health check failed: {}. Proceeding with API check.",
661 e
662 );
663 } else {
664 info!("Prometheus container health check passed.");
665 }
666
667 info!("Waiting for Prometheus API to be responsive...");
668 let client = reqwest::Client::new();
669 let url = format!("{}/-/ready", self.url());
670 let start_time = tokio::time::Instant::now();
671 let timeout = Duration::from_secs(timeout_secs);
672
673 loop {
674 if start_time.elapsed() > timeout {
675 return Err(crate::error::Error::Generic(format!(
676 "Prometheus API did not become responsive within {} seconds.",
677 timeout_secs
678 )));
679 }
680
681 match client.get(&url).send().await {
682 Ok(response) if response.status().is_success() => {
683 info!("Prometheus API is responsive.");
684 return Ok(());
685 }
686 _ => {}
687 }
688
689 tokio::time::sleep(Duration::from_secs(1)).await;
690 }
691 } else {
692 let start_time = std::time::Instant::now();
693 let timeout = Duration::from_secs(timeout_secs);
694
695 while start_time.elapsed() < timeout {
696 if self.is_running().await? {
697 info!("Embedded Prometheus server is running.");
698 return Ok(());
699 }
700 sleep(Duration::from_millis(500)).await;
701 }
702
703 Err(crate::error::Error::Generic(format!(
704 "Embedded Prometheus server did not become ready within {} seconds",
705 timeout_secs
706 )))
707 }
708 }
709}
710
711impl Drop for PrometheusServer {
712 fn drop(&mut self) {
713 let mut server_guard = self.embedded_server.lock().unwrap();
714 if let Some(server) = server_guard.as_mut() {
715 server.stop();
716 }
717
718 let final_container_id_check = self.container_id.lock().unwrap();
719 info!(
720 "PrometheusServer::start: For health check, read self.container_id as: {:?}",
721 *final_container_id_check
722 );
723 let final_container_id = final_container_id_check.clone();
724 if let Some(id) = final_container_id {
725 info!(
726 "Note: Docker container {} will not be automatically stopped on drop",
727 id
728 );
729 }
730 }
731}