blueprint_qos/servers/common.rs

1use blueprint_core::{debug, error, info, warn};
2use bollard::{
3    Docker,
4    container::{
5        Config, CreateContainerOptions, ListContainersOptions, LogsOptions, RemoveContainerOptions,
6        StartContainerOptions, StopContainerOptions,
7    },
8    image::CreateImageOptions,
9    models::{HealthConfig, HealthStatusEnum, HostConfig, PortBinding},
10    network::{ConnectNetworkOptions, CreateNetworkOptions, InspectNetworkOptions},
11};
12use futures::StreamExt;
13use std::{collections::HashMap, default::Default};
14use tokio::time::{Duration, Instant, sleep};
15
/// Seconds to wait between successive container inspections while
/// `DockerManager::wait_for_container_health` polls for readiness.
const DEFAULT_CONTAINER_HEALTH_POLL_INTERVAL_SECS: u64 = 1;
17
18use crate::error::{Error, Result};
19
20/// Docker container manager
21#[derive(Clone)]
22pub struct DockerManager {
23    /// Docker client
24    docker: Docker,
25}
26
27impl Default for DockerManager {
28    fn default() -> Self {
29        Self::new().expect("Failed to create Docker manager")
30    }
31}
32
33impl DockerManager {
34    // Helper function to validate a single path component for volume mounts
35    fn validate_volume_path_component(path_str: &str) -> Result<()> {
36        if path_str.contains("..") {
37            error!("Invalid volume path component due to '..': {}", path_str);
38            return Err(Error::DockerOperation(format!(
39                "Volume path component cannot contain '..': {}",
40                path_str
41            )));
42        }
43        Ok(())
44    }
45
46    /// Create a new Docker manager
47    ///
48    /// # Errors
49    ///
50    /// Returns an error if the Docker client cannot be created
51    pub fn new() -> Result<Self> {
52        let docker = Docker::connect_with_local_defaults()
53            .map_err(|e| Error::DockerConnection(e.to_string()))?;
54        Ok(Self { docker })
55    }
56
57    /// Pull an image if it doesn't exist
58    ///
59    /// # Errors
60    ///
61    /// Returns an error if the image cannot be pulled
62    pub async fn ensure_image(&self, image: &str) -> Result<()> {
63        info!("Ensuring image exists: {}", image);
64        if self.docker.inspect_image(image).await.is_ok() {
65            info!("Image '{}' already exists locally.", image);
66            return Ok(());
67        }
68
69        info!("Pulling image '{}'...", image);
70        let options = Some(CreateImageOptions {
71            from_image: image,
72            ..Default::default()
73        });
74
75        let mut stream = self.docker.create_image(options, None, None);
76        while let Some(pull_result) = stream.next().await {
77            if let Err(e) = pull_result {
78                let err_msg = format!("Failed to pull image '{}': {}", image, e);
79                error!("{}", err_msg);
80                return Err(Error::DockerOperation(err_msg));
81            }
82        }
83        info!("Image pull complete for: {}", image);
84        Ok(())
85    }
86
87    /// Create and start a container
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if the container cannot be created or started
92    #[allow(clippy::too_many_arguments)]
93    pub async fn run_container(
94        &self,
95        image: &str,
96        name: &str,
97        env_vars: HashMap<String, String>,
98        ports: HashMap<String, String>,
99        volumes: HashMap<String, String>,
100        cmd: Option<Vec<String>>,
101        extra_hosts: Option<Vec<String>>,
102        health_check_cmd: Option<Vec<String>>,
103        bind_ip: Option<String>,
104    ) -> Result<String> {
105        if let Err(e) = self.ensure_image(image).await {
106            warn!("Failed to ensure image exists, but proceeding: {}", e);
107        }
108
109        self.cleanup_container_by_name(name).await?;
110
111        let env: Vec<String> = env_vars
112            .into_iter()
113            .map(|(k, v)| format!("{}={}", k, v))
114            .collect();
115
116        let final_bind_ip = bind_ip.unwrap_or_else(|| "127.0.0.1".to_string());
117        let port_bindings = ports
118            .into_iter()
119            .map(|(container_port, host_port)| {
120                (
121                    container_port.to_string(),
122                    Some(vec![PortBinding {
123                        host_ip: Some(final_bind_ip.clone()),
124                        host_port: Some(host_port),
125                    }]),
126                )
127            })
128            .collect();
129
130        let binds = if volumes.is_empty() {
131            None
132        } else {
133            Some(
134                volumes
135                    .into_iter()
136                    .map(|(host_path, container_path)| {
137                        Self::validate_volume_path_component(&host_path)?;
138                        Self::validate_volume_path_component(&container_path)?;
139                        Ok(format!("{}:{}", host_path, container_path))
140                    })
141                    .collect::<Result<Vec<String>>>()?,
142            )
143        };
144
145        let host_config = HostConfig {
146            port_bindings: Some(port_bindings),
147            binds,
148            extra_hosts,
149            ..Default::default()
150        };
151
152        let health_config = health_check_cmd.map(|cmd| HealthConfig {
153            test: Some(cmd),
154            interval: Some(5_000_000_000),
155            timeout: Some(5_000_000_000),
156            retries: Some(3),
157            start_period: Some(5_000_000_000),
158            start_interval: Some(1_000_000_000),
159        });
160
161        let cmd_slices: Option<Vec<&str>> =
162            cmd.as_ref().map(|v| v.iter().map(AsRef::as_ref).collect());
163
164        let config = Config {
165            image: Some(image),
166            env: Some(env.iter().map(AsRef::as_ref).collect()),
167            host_config: Some(host_config),
168            healthcheck: health_config,
169            cmd: cmd_slices,
170            ..Default::default()
171        };
172
173        info!("Creating container '{}' from image '{}'", name, image);
174        let options = Some(CreateContainerOptions {
175            name: name.to_string(),
176            platform: None,
177        });
178
179        let container_id = match self.docker.create_container(options, config).await {
180            Ok(response) => response.id,
181            Err(e) => {
182                let err_msg = format!("Failed to create container '{}': {}", name, e);
183                error!("[DOCKER ERROR] {}", err_msg);
184                return Err(crate::error::Error::DockerOperation(err_msg));
185            }
186        };
187
188        info!("Starting container '{}' (ID: {})", name, &container_id);
189        if let Err(e) = self
190            .docker
191            .start_container(&container_id, None::<StartContainerOptions<String>>)
192            .await
193        {
194            let err_msg = format!("Failed to start container '{}': {}", name, e);
195            error!("[DOCKER ERROR] {}", err_msg);
196            return Err(crate::error::Error::DockerOperation(err_msg));
197        }
198
199        info!(
200            "Successfully started container '{}' (ID: {})",
201            name, &container_id
202        );
203        Ok(container_id)
204    }
205
206    /// Cleans up a Docker container by name
207    ///
208    /// # Errors
209    ///
210    /// Returns an error if the container cannot be cleaned up
211    async fn cleanup_container_by_name(&self, name: &str) -> Result<()> {
212        let list_options = ListContainersOptions::<String>::default();
213
214        let containers = self
215            .docker
216            .list_containers(Some(list_options))
217            .await
218            .map_err(|e| Error::DockerOperation(e.to_string()))?;
219
220        for container_summary in containers {
221            if container_summary
222                .names
223                .as_ref()
224                .is_some_and(|names| names.contains(&format!("/{}", name)))
225            {
226                info!(
227                    "Found existing container '{}', stopping and removing.",
228                    name
229                );
230                if let Some(container_id) = container_summary.id.as_deref() {
231                    self.stop_and_remove_container(container_id, name).await?;
232                }
233                break;
234            }
235        }
236        Ok(())
237    }
238
239    /// Stops and removes a Docker container
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if the container cannot be stopped or removed
244    pub async fn stop_and_remove_container(
245        &self,
246        container_id: &str,
247        container_name: &str,
248    ) -> Result<()> {
249        info!("Stopping container '{}' ({})", container_name, container_id);
250        if let Err(e) = self
251            .docker
252            .stop_container(container_id, None::<StopContainerOptions>)
253            .await
254        {
255            warn!(
256                "Could not stop container '{}' (may already be stopped): {}. Proceeding with removal.",
257                container_name, e
258            );
259        }
260
261        info!("Removing container '{}' ({})", container_name, container_id);
262        self.docker
263            .remove_container(
264                container_id,
265                Some(RemoveContainerOptions {
266                    force: true,
267                    ..Default::default()
268                }),
269            )
270            .await
271            .map_err(|e| {
272                Error::DockerOperation(format!(
273                    "Failed to remove container '{}' ({}): {}",
274                    container_name, container_id, e
275                ))
276            })?;
277        Ok(())
278    }
279
280    /// Creates a Docker network
281    ///
282    /// # Errors
283    ///
284    /// Returns an error if the network cannot be created
285    pub async fn create_network(&self, network_name: &str) -> Result<()> {
286        match self
287            .docker
288            .inspect_network(network_name, None::<InspectNetworkOptions<String>>)
289            .await
290        {
291            Ok(_) => {
292                info!("Network '{}' already exists.", network_name);
293                Ok(())
294            }
295            Err(bollard::errors::Error::DockerResponseServerError {
296                status_code: 404, ..
297            }) => {
298                info!("Creating Docker network: '{}'", network_name);
299                let options = CreateNetworkOptions {
300                    name: network_name,
301                    ..Default::default()
302                };
303                self.docker
304                    .create_network(options)
305                    .await
306                    .map_err(|e| Error::DockerOperation(e.to_string()))?;
307                info!("Successfully created Docker network: '{}'", network_name);
308                Ok(())
309            }
310            Err(e) => {
311                let err_msg = format!("Failed to inspect Docker network '{}': {}", network_name, e);
312                error!("{}", err_msg);
313                Err(Error::DockerOperation(err_msg))
314            }
315        }
316    }
317
318    /// Connects a container to a Docker network
319    ///
320    /// # Errors
321    ///
322    /// Returns an error if the container cannot be connected to the network
323    pub async fn connect_to_network(&self, container_name: &str, network_name: &str) -> Result<()> {
324        info!(
325            "Connecting container '{}' to network '{}'",
326            container_name, network_name
327        );
328        let options = ConnectNetworkOptions {
329            container: container_name,
330            ..Default::default()
331        };
332        self.docker
333            .connect_network(network_name, options)
334            .await
335            .map_err(|e| Error::DockerOperation(e.to_string()))
336    }
337
338    /// Checks if the container is running
339    ///
340    /// # Errors
341    ///
342    /// Returns an error if the container cannot be inspected
343    pub async fn is_container_running(&self, container_id: &str) -> Result<bool> {
344        let container = self
345            .docker
346            .inspect_container(container_id, None)
347            .await
348            .map_err(|e| Error::DockerOperation(e.to_string()))?;
349
350        Ok(container.state.is_some_and(|s| s.running.unwrap_or(false)))
351    }
352
353    /// Wait for the container to become healthy
354    ///
355    /// # Errors
356    ///
357    /// Returns an error if the container is not healthy within the timeout period
358    pub async fn dump_container_logs(&self, container_id: &str) {
359        let options = Some(LogsOptions {
360            stdout: true,
361            stderr: true,
362            tail: "all".to_string(),
363            ..Default::default()
364        });
365        error!("Dumping logs for container {}:", container_id);
366        let mut logs_stream = self.docker.logs(container_id, options);
367        while let Some(log_result) = logs_stream.next().await {
368            match log_result {
369                Ok(log_output) => {
370                    error!("[CONTAINER LOG]: {}", log_output);
371                }
372                Err(e) => {
373                    error!("Error reading container logs: {}", e);
374                }
375            }
376        }
377    }
378
379    /// Waits for a specified Docker container to report a healthy status.
380    ///
381    /// This function polls the container's health status periodically until it becomes
382    /// healthy, an unhealthy status is reported, or the timeout is reached.
383    /// If the container reports unhealthy, its logs will be dumped.
384    ///
385    /// # Arguments
386    /// * `container_id` - The ID of the container to monitor.
387    /// * `timeout_secs` - The maximum time in seconds to wait for the container to become healthy.
388    ///
389    /// # Errors
390    /// This function will return an error if:
391    /// * Inspecting the container fails (e.g., due to Docker API issues).
392    /// * The container reports an `UNHEALTHY` status.
393    /// * The timeout is reached before the container becomes healthy.
394    /// * The container is not running and has no health check configured.
395    pub async fn wait_for_container_health(
396        &self,
397        container_id: &str,
398        timeout_secs: u64,
399    ) -> Result<()> {
400        let timeout = Duration::from_secs(timeout_secs);
401        let start = Instant::now();
402
403        while start.elapsed() < timeout {
404            let inspect_result = self.docker.inspect_container(container_id, None).await;
405
406            match inspect_result {
407                Ok(container) => {
408                    if let Some(state) = &container.state {
409                        if let Some(health) = &state.health {
410                            if let Some(status) = &health.status {
411                                match status {
412                                    HealthStatusEnum::HEALTHY => {
413                                        info!("Container {} is healthy.", container_id);
414                                        return Ok(());
415                                    }
416                                    HealthStatusEnum::UNHEALTHY => {
417                                        error!(
418                                            "Container {} reported unhealthy status.",
419                                            container_id.chars().take(12).collect::<String>()
420                                        );
421                                        self.dump_container_logs(container_id).await;
422                                        return Err(Error::DockerOperation(format!(
423                                            "Container {} reported unhealthy status.",
424                                            container_id
425                                        )));
426                                    }
427                                    _ => {
428                                        debug!(
429                                            "Container {} health status: {:?}",
430                                            container_id, status
431                                        );
432                                    }
433                                }
434                            }
435                        } else if state.running.unwrap_or(false) {
436                            info!(
437                                "Container {} is running (no health check configured).",
438                                container_id
439                            );
440                            return Ok(());
441                        }
442                    }
443                }
444                Err(e) => {
445                    warn!(
446                        "Error inspecting container {}: {}. Retrying...",
447                        container_id, e
448                    );
449                }
450            }
451
452            sleep(Duration::from_secs(
453                DEFAULT_CONTAINER_HEALTH_POLL_INTERVAL_SECS,
454            ))
455            .await;
456        }
457
458        warn!(
459            "Container {} did not become ready within {} seconds.",
460            container_id.chars().take(12).collect::<String>(),
461            timeout_secs
462        );
463        self.dump_container_logs(container_id).await;
464        Err(Error::DockerOperation(format!(
465            "Container {} did not become ready within {} seconds.",
466            container_id, timeout_secs
467        )))
468    }
469}