blueprint_qos/servers/
prometheus.rs

1use blueprint_core::{error, info, warn};
2use std::collections::HashMap;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5use tokio::time::sleep;
6
7use crate::error::Result;
8use crate::metrics::EnhancedMetricsProvider;
9use crate::metrics::prometheus::server::PrometheusServer as PrometheusMetricsServer;
10use crate::servers::ServerManager;
11use crate::servers::common::DockerManager;
12
13/// Configuration settings for a Prometheus metrics server.
14///
15/// This struct defines all the parameters needed to set up and run a Prometheus server,
16/// either as a Docker container or as an embedded server within the application.
17#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
18pub struct PrometheusServerConfig {
19    /// The port to bind the Prometheus server to
20    pub port: u16,
21
22    /// The host to bind the Prometheus server to
23    pub host: String,
24
25    /// Whether to use Docker for the Prometheus server
26    pub use_docker: bool,
27
28    /// The Docker image to use for the Prometheus server
29    pub docker_image: String,
30
31    /// The Docker container name to use for the Prometheus server
32    pub docker_container_name: String,
33
34    /// The path to the Prometheus configuration file
35    pub config_path: Option<String>,
36
37    /// The path to mount as the Prometheus data directory
38    pub data_path: Option<String>,
39}
40
// Defaults used by `PrometheusServerConfig::default` and the Docker health check.

/// Default port the Prometheus server is exposed on.
const DEFAULT_PROMETHEUS_PORT: u16 = 9090;
/// Default bind host for the embedded server (the IPv4 wildcard address).
const DEFAULT_PROMETHEUS_HOST: &str = "0.0.0.0";
/// Default Docker image for Docker mode.
const DEFAULT_PROMETHEUS_DOCKER_IMAGE: &str = "prom/prometheus:latest";
/// Default Docker container name for Docker mode.
const DEFAULT_PROMETHEUS_CONTAINER_NAME: &str = "blueprint-prometheus";
/// Seconds `start` waits for a newly created container to pass its health check (two minutes).
const PROMETHEUS_DOCKER_HEALTH_TIMEOUT_SECS: u64 = 2 * 60;
47
48impl Default for PrometheusServerConfig {
49    fn default() -> Self {
50        Self {
51            port: DEFAULT_PROMETHEUS_PORT,
52            host: DEFAULT_PROMETHEUS_HOST.to_string(),
53            use_docker: false,
54            docker_image: DEFAULT_PROMETHEUS_DOCKER_IMAGE.to_string(),
55            docker_container_name: DEFAULT_PROMETHEUS_CONTAINER_NAME.to_string(),
56            config_path: None,
57            data_path: None,
58        }
59    }
60}
61
62/// Manager for a Prometheus metrics server instance.
63///
64/// This struct handles the lifecycle of a Prometheus server, which can either run
65/// as a Docker container or as an embedded server within the application process.
66/// It provides methods for starting, stopping, and monitoring the server.
67#[derive(Clone)]
68pub struct PrometheusServer {
69    /// The configuration for the Prometheus server
70    config: PrometheusServerConfig,
71
72    /// The Docker manager for the Prometheus server (if using Docker)
73    docker_manager: DockerManager,
74
75    /// The container ID for the Prometheus server (if using Docker)
76    container_id: Arc<Mutex<Option<String>>>,
77
78    /// The embedded Prometheus server (if not using Docker)
79    embedded_server: Arc<Mutex<Option<PrometheusMetricsServer>>>,
80
81    /// The metrics registry provided by `EnhancedMetricsProvider` (if not using Docker)
82    metrics_registry: Option<Arc<prometheus::Registry>>,
83
84    /// The enhanced metrics provider, used to force flush OTEL metrics on scrape
85    enhanced_metrics_provider: Arc<EnhancedMetricsProvider>,
86}
87
88impl PrometheusServer {
89    /// Creates a new Prometheus server manager with the provided configuration and metrics registry.
90    ///
91    /// This constructor prepares the Prometheus server infrastructure but does not start the server.
92    /// The actual server (Docker container or embedded) is started when `start()` is called.
93    ///
94    /// # Parameters
95    /// * `config` - The configuration settings for the Prometheus server
96    /// * `metrics_registry` - Optional Prometheus registry for the embedded server mode
97    /// * `enhanced_metrics_provider` - Provider that generates metrics data
98    ///
99    /// # Errors
100    /// Returns an error if the Docker manager connection fails to initialize
101    pub fn new(
102        config: PrometheusServerConfig,
103        metrics_registry: Option<Arc<prometheus::Registry>>,
104        enhanced_metrics_provider: Arc<EnhancedMetricsProvider>,
105    ) -> Result<Self> {
106        Ok(Self {
107            config,
108            docker_manager: DockerManager::new()
109                .map_err(|e| crate::error::Error::DockerConnection(e.to_string()))?,
110            container_id: Arc::new(Mutex::new(None)),
111            embedded_server: Arc::new(Mutex::new(None)),
112            metrics_registry,
113            enhanced_metrics_provider,
114        })
115    }
116
117    /// Creates and configures a new Docker container for the Prometheus server.
118    ///
119    /// Sets up a new container with appropriate port mappings, volume mounts, and configuration
120    /// based on the server settings. This does not start the container, only creates it.
121    ///
122    /// # Parameters
123    /// * `network` - Optional Docker network to attach the container to
124    ///
125    /// # Errors
126    /// Returns an error if the Docker API fails to create the container
127    ///
128    /// # Panics
129    /// Panics if mutex locks cannot be acquired
130    pub async fn create_docker_container(&self) -> Result<()> {
131        let env_vars = HashMap::new();
132
133        let mut ports = HashMap::new();
134        ports.insert(self.config.port.to_string(), self.config.port.to_string());
135
136        let mut volumes = HashMap::new();
137        if let Some(config_path) = &self.config.config_path {
138            volumes.insert(
139                config_path.clone(),
140                "/etc/prometheus/prometheus.yml".to_string(),
141            );
142        }
143
144        if let Some(data_path) = &self.config.data_path {
145            volumes.insert(data_path.clone(), "/prometheus".to_string());
146        }
147
148        let new_container_id_result = self
149            .docker_manager
150            .run_container(
151                &self.config.docker_image,
152                &self.config.docker_container_name,
153                env_vars,
154                ports,
155                volumes,
156                None,
157                None,
158                None,
159                None,
160            )
161            .await;
162
163        match new_container_id_result {
164            Ok(id) => {
165                info!(
166                    "PrometheusServer::start: docker_manager.run_container succeeded. Raw new_container_id: '{}'",
167                    id
168                );
169                if id.trim().is_empty() {
170                    error!(
171                        "PrometheusServer::start: docker_manager.run_container returned an EMPTY string for container ID."
172                    );
173                    return Err(crate::error::Error::Other(
174                        "Docker manager returned empty container ID for Prometheus".to_string(),
175                    ));
176                }
177                let mut id_guard = self.container_id.lock().unwrap();
178                *id_guard = Some(id.clone());
179                info!(
180                    "PrometheusServer::start: Stored new container_id into self.container_id. Current self.container_id: {:?}",
181                    *id_guard
182                );
183            }
184            Err(e) => {
185                error!(
186                    "PrometheusServer::start: docker_manager.run_container FAILED: {}",
187                    e
188                );
189                return Err(e);
190            }
191        }
192
193        Ok(())
194    }
195
196    /// Returns a reference to the metrics registry used by the embedded Prometheus server.
197    ///
198    /// This registry contains all the metrics that are exposed by the embedded Prometheus server.
199    /// Returns None if no registry was provided during initialization.
200    #[must_use]
201    pub fn registry(&self) -> Option<Arc<prometheus::Registry>> {
202        self.metrics_registry.clone()
203    }
204
205    /// Returns the Docker container ID if the server is running in Docker mode.
206    ///
207    /// This identifier can be used to reference the specific Prometheus container for
208    /// management operations via Docker API. The container ID is only available when the
209    /// server is configured to run in Docker mode and after the container has been created.
210    ///
211    /// # Returns
212    /// * `Some(String)` - Container ID if the server is using Docker and a container has been created
213    /// * `None` - If the server is not using Docker, the container hasn't been created yet,
214    ///   or if there was an error acquiring the lock on the container ID
215    #[must_use]
216    pub fn container_id(&self) -> Option<String> {
217        self.container_id.lock().map(|id| id.clone()).ok()?
218    }
219
220    /// Checks if this server is configured to use Docker.
221    ///
222    /// The Prometheus server can operate in two modes:
223    /// 1. Docker mode: Runs Prometheus in a separate Docker container, providing isolation
224    ///    and easier management but requiring Docker to be available
225    /// 2. Embedded mode: Runs a Prometheus-compatible metrics HTTP server directly within
226    ///    the application process, with no external dependencies
227    ///
228    /// # Returns
229    /// * `true` - If the server is configured to use Docker
230    /// * `false` - If the server is using the embedded mode
231    #[must_use]
232    pub fn is_docker_based(&self) -> bool {
233        self.config.use_docker
234    }
235}
236
237impl ServerManager for PrometheusServer {
238    /// Starts the Prometheus server in either Docker or embedded mode.
239    ///
240    /// * **`Docker`** – Creates (or reuses) a container, mounts configuration/data volumes,
241    ///   optionally connects it to a network, and waits for the container health-check.
242    /// * **`Embedded`** – Binds an in-process HTTP server to `host:port` and serves the
243    ///   `/metrics` endpoint backed by the supplied registry.
244    ///
245    /// The call blocks until the server is ready.
246    ///
247    /// # Parameters
248    /// * `network` – Optional Docker network name (`Docker` mode only).
249    ///
250    /// # Errors
251    /// Returns an error if the container or server fails to start, health-check fails, the
252    /// port is already in use, or required configuration is missing.
253    ///
254    ///
255    /// * `network` - Optional Docker network name to connect the container to. Only used in `Docker` mode.
256    ///   When provided, the container will be connected to this network to allow service discovery
257    ///   between services (e.g., Prometheus to scrape metrics from other containers).
258    ///
259    /// # Errors
260    /// Returns an error if:
261    /// * Docker container creation or startup fails (permission issues, Docker daemon not running, image not found)
262    /// * Port binding fails for the embedded server (port already in use or insufficient permissions)
263    /// * Server startup times out or fails for any other reason
264    /// * Configuration file generation fails or specified paths are invalid (Docker mode only)
265    /// * Health check fails for Docker container
266    /// * No metrics registry is provided for embedded server mode
267    async fn start(&self, network: Option<&str>, bind_ip: Option<String>) -> Result<()> {
268        let mut current_container_id_val: Option<String> = None;
269        if self.config.use_docker {
270            info!(
271                "PrometheusServer::start: Attempting to run Docker container '{}' with name '{}' on network {:?}. Ports: {:?}, Config mount: {:?} -> /etc/prometheus/prometheus.yml",
272                self.config.docker_image,
273                self.config.docker_container_name,
274                network,
275                self.config.port,
276                self.config.config_path
277            );
278            let mut perform_health_check = true;
279
280            let initial_id_check = self.container_id.lock().unwrap().clone();
281
282            if let Some(existing_id) = initial_id_check {
283                info!(
284                    "PrometheusServer::start: Found existing container_id: {}. Checking if it's running.",
285                    existing_id
286                );
287                let is_running = self
288                    .docker_manager
289                    .is_container_running(&existing_id)
290                    .await?;
291                if is_running {
292                    info!(
293                        "PrometheusServer::start: Container {} is already running.",
294                        existing_id
295                    );
296                    current_container_id_val = Some(existing_id);
297                    perform_health_check = false;
298                } else {
299                    warn!(
300                        "PrometheusServer::start: Container {} was found but is not running. Attempting to remove and recreate.",
301                        existing_id
302                    );
303                    if let Err(e) = self
304                        .docker_manager
305                        .stop_and_remove_container(&existing_id, &self.config.docker_container_name)
306                        .await
307                    {
308                        warn!(
309                            "PrometheusServer::start: Failed to remove non-running container {}: {}. Proceeding to create a new one.",
310                            existing_id, e
311                        );
312                    }
313                    // Reset container ID to indicate we need to create a new one
314                    current_container_id_val = None;
315                    *self.container_id.lock().unwrap() = None;
316                }
317            }
318
319            if current_container_id_val.is_none() {
320                info!(
321                    "PrometheusServer::start: No existing container_id found or old one was not running. Creating new container."
322                );
323                let mut ports = std::collections::HashMap::new();
324                ports.insert("9090/tcp".to_string(), self.config.port.to_string());
325
326                let mut volumes = std::collections::HashMap::new();
327                if let Some(config_host_path) = &self.config.config_path {
328                    volumes.insert(
329                        config_host_path.clone(),
330                        "/etc/prometheus/prometheus.yml".to_string(),
331                    );
332                }
333                if let Some(data_host_path) = &self.config.data_path {
334                    volumes.insert(data_host_path.clone(), "/prometheus".to_string());
335                }
336
337                let extra_hosts = vec!["host.docker.internal:host-gateway".to_string()];
338
339                let new_id_result = self
340                    .docker_manager
341                    .run_container(
342                        &self.config.docker_image,
343                        &self.config.docker_container_name,
344                        std::collections::HashMap::new(), // env_vars
345                        ports,
346                        volumes,
347                        None, // cmd
348                        Some(extra_hosts),
349                        None, // health_check_cmd
350                        bind_ip,
351                    )
352                    .await;
353
354                match new_id_result {
355                    Ok(id) => {
356                        if id.trim().is_empty() {
357                            error!(
358                                "PrometheusServer::start: Docker manager returned an EMPTY string for container ID."
359                            );
360                            return Err(crate::error::Error::Other(
361                                "Docker manager returned empty container ID for Prometheus"
362                                    .to_string(),
363                            ));
364                        }
365                        info!(
366                            "PrometheusServer::start: Successfully created container with ID: {}",
367                            id
368                        );
369                        current_container_id_val = Some(id.clone());
370                    }
371                    Err(e) => {
372                        error!(
373                            "PrometheusServer::start: Failed to run Prometheus container: {}",
374                            e
375                        );
376                        return Err(e);
377                    }
378                }
379            }
380
381            let final_id_for_connection_and_health_check =
382                current_container_id_val.clone().ok_or_else(|| {
383                    crate::error::Error::Other(
384                        "Prometheus container ID unexpectedly None after creation/check logic"
385                            .to_string(),
386                    )
387                })?;
388
389            if let Some(net) = network {
390                info!(
391                    "Connecting Prometheus container {} ({}) to network {}",
392                    &self.config.docker_container_name,
393                    final_id_for_connection_and_health_check,
394                    net
395                );
396                self.docker_manager
397                    .connect_to_network(&final_id_for_connection_and_health_check, net)
398                    .await?;
399            }
400
401            if perform_health_check {
402                info!(
403                    "Performing health check for Prometheus container {} ({})",
404                    &self.config.docker_container_name, final_id_for_connection_and_health_check
405                );
406                if self
407                    .docker_manager
408                    .wait_for_container_health(
409                        &final_id_for_connection_and_health_check,
410                        PROMETHEUS_DOCKER_HEALTH_TIMEOUT_SECS,
411                    )
412                    .await
413                    .is_err()
414                {
415                    let err_msg = format!(
416                        "Prometheus Docker container {} ({}) did not become healthy.",
417                        self.config.docker_container_name, final_id_for_connection_and_health_check
418                    );
419                    error!("{}", err_msg);
420                    return Err(crate::error::Error::Other(format!(
421                        "Prometheus container ({}) health check failed: {}",
422                        final_id_for_connection_and_health_check, err_msg
423                    )));
424                }
425                info!(
426                    "Prometheus Docker container {} ({}) is healthy.",
427                    &self.config.docker_container_name, final_id_for_connection_and_health_check
428                );
429            } else {
430                info!(
431                    "Skipping health check for already running Prometheus container {} ({})",
432                    &self.config.docker_container_name, final_id_for_connection_and_health_check
433                );
434            }
435        } else {
436            {
437                let guard = self.embedded_server.lock().unwrap();
438                if guard.is_some() {
439                    info!(
440                        "Embedded Prometheus server on {}:{} already initialized.",
441                        self.config.host, self.config.port
442                    );
443                    return Ok(());
444                }
445            }
446
447            let registry_arc_clone;
448            let bind_address_for_new_server;
449
450            if let Some(registry) = &self.metrics_registry {
451                registry_arc_clone = registry.clone();
452                bind_address_for_new_server = format!("{}:{}", self.config.host, self.config.port);
453                info!(
454                    "Attempting to start embedded Prometheus server on {} using provided registry",
455                    bind_address_for_new_server
456                );
457            } else {
458                return Err(crate::error::Error::Other(
459                    "Metrics registry not provided for embedded Prometheus server".to_string(),
460                ));
461            }
462
463            match std::net::TcpListener::bind(&bind_address_for_new_server) {
464                Ok(listener) => {
465                    drop(listener);
466                    info!(
467                        "Port {} is free, proceeding to start embedded Prometheus server.",
468                        bind_address_for_new_server
469                    );
470                }
471                Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
472                    error!(
473                        "Failed to bind embedded Prometheus server to {}: Address already in use.",
474                        bind_address_for_new_server
475                    );
476                    return Err(crate::error::Error::Other(format!(
477                        "Address {} already in use for embedded Prometheus server",
478                        bind_address_for_new_server
479                    )));
480                }
481                Err(e) => {
482                    error!(
483                        "Failed to perform pre-bind check for embedded Prometheus server on {}: {}",
484                        bind_address_for_new_server, e
485                    );
486                    return Err(crate::error::Error::Other(format!(
487                        "Failed pre-bind check for {}: {}",
488                        bind_address_for_new_server, e
489                    )));
490                }
491            }
492
493            let mut server_instance = PrometheusMetricsServer::new(
494                registry_arc_clone,
495                self.enhanced_metrics_provider.clone(),
496                bind_address_for_new_server.clone(),
497            );
498
499            server_instance.start().await?;
500
501            {
502                let mut guard = self.embedded_server.lock().unwrap();
503                *guard = Some(server_instance);
504            }
505
506            info!(
507                "Successfully started embedded Prometheus server on {}",
508                bind_address_for_new_server
509            );
510        }
511
512        Ok(())
513    }
514
515    /// Stops the running Prometheus server and performs necessary cleanup.
516    ///
517    /// This method safely terminates the Prometheus server instance based on its operational mode:
518    ///
519    /// ## Docker Mode
520    /// When running as a Docker container:
521    /// * Retrieves the container ID from internal state
522    /// * Stops the Docker container using the Docker API
523    /// * Removes the container to free up resources
524    /// * Clears the internal container ID reference
525    /// * Handles cases where container is already stopped gracefully
526    ///
527    /// ## Embedded Mode
528    /// When running as an embedded server:
529    /// * Acquires lock on the server instance
530    /// * Triggers a graceful shutdown of the HTTP server
531    /// * Waits for in-flight requests to complete
532    /// * Releases resources associated with the server
533    ///
534    /// The method is idempotent and safe to call multiple times, even if the server
535    /// is not running.
536    ///
537    /// # Errors
538    /// Returns an error if:
539    /// * Docker API encounters errors when stopping or removing the container
540    /// * Communication with Docker daemon fails
541    /// * Container removal fails due to permission issues
542    async fn stop(&self) -> Result<()> {
543        if self.config.use_docker {
544            let container_id = {
545                let id = self.container_id.lock().unwrap();
546                match id.as_ref() {
547                    Some(id) => id.clone(),
548                    None => {
549                        info!("Prometheus Docker container is not running, nothing to stop.");
550                        return Ok(());
551                    }
552                }
553            };
554
555            info!(
556                "Stopping Prometheus server in Docker container: {}",
557                &self.config.docker_container_name
558            );
559            self.docker_manager
560                .stop_and_remove_container(&container_id, &self.config.docker_container_name)
561                .await?;
562
563            let mut id = self.container_id.lock().unwrap();
564            *id = None;
565
566            info!("Prometheus Docker container stopped successfully.");
567        } else {
568            let mut server_guard = self.embedded_server.lock().unwrap();
569            if let Some(server) = server_guard.as_mut() {
570                server.stop();
571                info!("Stopped embedded Prometheus server");
572            }
573        }
574
575        Ok(())
576    }
577
578    /// Returns the fully qualified URL where the Prometheus server can be accessed.
579    ///
580    /// Constructs a URL using the configured host and port values in the format:
581    /// `http://{host}:{port}`
582    ///
583    /// This URL can be used to:
584    /// * Access the Prometheus web UI for manual query and visualization
585    /// * Configure Grafana datasources programmatically
586    /// * Set up health checks or monitoring of the Prometheus instance
587    /// * Access the Prometheus HTTP API for programmatic queries
588    ///
589    /// The URL is valid regardless of whether the server is running in Docker mode
590    /// or embedded mode, as both modes expose the same interface on the configured
591    /// host:port combination.
592    fn url(&self) -> String {
593        format!(
594            "http://{}:{}",
595            if self.config.use_docker {
596                "localhost"
597            } else {
598                &self.config.host
599            },
600            self.config.port
601        )
602    }
603
604    /// Checks if the Prometheus server is currently running.
605    ///
606    /// For Docker-based servers, checks the container status.
607    /// For embedded servers, checks if the server instance exists.
608    ///
609    /// # Errors
610    /// Returns an error if checking the server status fails or if
611    /// the Docker API reports an error.
612    async fn is_running(&self) -> Result<bool> {
613        if self.config.use_docker {
614            let container_id = {
615                let id = self.container_id.lock().unwrap();
616                match id.as_ref() {
617                    Some(id) => id.clone(),
618                    None => return Ok(false),
619                }
620            };
621
622            return self
623                .docker_manager
624                .is_container_running(&container_id)
625                .await;
626        }
627
628        let server = self.embedded_server.lock().unwrap();
629        Ok(server.is_some())
630    }
631
632    /// Waits until the Prometheus server is ready to accept connections.
633    ///
634    /// Periodically checks the server status until it's ready or until the timeout expires.
635    /// For Docker-based servers, performs HTTP health checks.
636    /// For embedded servers, verifies the server is bound and responding.
637    ///
638    /// # Parameters
639    /// * `timeout_secs` - Maximum time to wait in seconds
640    ///
641    /// # Errors
642    /// Returns an error if the server fails to become ready within the timeout period
643    /// or if health checks fail.
644    async fn wait_until_ready(&self, timeout_secs: u64) -> Result<()> {
645        if self.config.use_docker {
646            let container_id = {
647                let id = self.container_id.lock().unwrap();
648                id.as_ref().map(String::clone).ok_or_else(|| {
649                    crate::error::Error::Generic("Prometheus container not running".to_string())
650                })?
651            };
652
653            info!("Waiting for Prometheus container to be healthy...");
654            if let Err(e) = self
655                .docker_manager
656                .wait_for_container_health(&container_id, timeout_secs)
657                .await
658            {
659                warn!(
660                    "Prometheus container health check failed: {}. Proceeding with API check.",
661                    e
662                );
663            } else {
664                info!("Prometheus container health check passed.");
665            }
666
667            info!("Waiting for Prometheus API to be responsive...");
668            let client = reqwest::Client::new();
669            let url = format!("{}/-/ready", self.url());
670            let start_time = tokio::time::Instant::now();
671            let timeout = Duration::from_secs(timeout_secs);
672
673            loop {
674                if start_time.elapsed() > timeout {
675                    return Err(crate::error::Error::Generic(format!(
676                        "Prometheus API did not become responsive within {} seconds.",
677                        timeout_secs
678                    )));
679                }
680
681                match client.get(&url).send().await {
682                    Ok(response) if response.status().is_success() => {
683                        info!("Prometheus API is responsive.");
684                        return Ok(());
685                    }
686                    _ => {}
687                }
688
689                tokio::time::sleep(Duration::from_secs(1)).await;
690            }
691        } else {
692            let start_time = std::time::Instant::now();
693            let timeout = Duration::from_secs(timeout_secs);
694
695            while start_time.elapsed() < timeout {
696                if self.is_running().await? {
697                    info!("Embedded Prometheus server is running.");
698                    return Ok(());
699                }
700                sleep(Duration::from_millis(500)).await;
701            }
702
703            Err(crate::error::Error::Generic(format!(
704                "Embedded Prometheus server did not become ready within {} seconds",
705                timeout_secs
706            )))
707        }
708    }
709}
710
711impl Drop for PrometheusServer {
712    fn drop(&mut self) {
713        let mut server_guard = self.embedded_server.lock().unwrap();
714        if let Some(server) = server_guard.as_mut() {
715            server.stop();
716        }
717
718        let final_container_id_check = self.container_id.lock().unwrap();
719        info!(
720            "PrometheusServer::start: For health check, read self.container_id as: {:?}",
721            *final_container_id_check
722        );
723        let final_container_id = final_container_id_check.clone();
724        if let Some(id) = final_container_id {
725            info!(
726                "Note: Docker container {} will not be automatically stopped on drop",
727                id
728            );
729        }
730    }
731}