1use blueprint_core::{debug, error, info, warn};
2use bollard::{
3 Docker,
4 container::{
5 Config, CreateContainerOptions, ListContainersOptions, LogsOptions, RemoveContainerOptions,
6 StartContainerOptions, StopContainerOptions,
7 },
8 image::CreateImageOptions,
9 models::{HealthConfig, HealthStatusEnum, HostConfig, PortBinding},
10 network::{ConnectNetworkOptions, CreateNetworkOptions, InspectNetworkOptions},
11};
12use futures::StreamExt;
13use std::{collections::HashMap, default::Default};
14use tokio::time::{Duration, Instant, sleep};
15
/// Seconds to sleep between successive container inspections while polling for
/// readiness in [`DockerManager::wait_for_container_health`].
const DEFAULT_CONTAINER_HEALTH_POLL_INTERVAL_SECS: u64 = 1;
17
18use crate::error::{Error, Result};
19
20#[derive(Clone)]
22pub struct DockerManager {
23 docker: Docker,
25}
26
27impl Default for DockerManager {
28 fn default() -> Self {
29 Self::new().expect("Failed to create Docker manager")
30 }
31}
32
33impl DockerManager {
34 fn validate_volume_path_component(path_str: &str) -> Result<()> {
36 if path_str.contains("..") {
37 error!("Invalid volume path component due to '..': {}", path_str);
38 return Err(Error::DockerOperation(format!(
39 "Volume path component cannot contain '..': {}",
40 path_str
41 )));
42 }
43 Ok(())
44 }
45
46 pub fn new() -> Result<Self> {
52 let docker = Docker::connect_with_local_defaults()
53 .map_err(|e| Error::DockerConnection(e.to_string()))?;
54 Ok(Self { docker })
55 }
56
57 pub async fn ensure_image(&self, image: &str) -> Result<()> {
63 info!("Ensuring image exists: {}", image);
64 if self.docker.inspect_image(image).await.is_ok() {
65 info!("Image '{}' already exists locally.", image);
66 return Ok(());
67 }
68
69 info!("Pulling image '{}'...", image);
70 let options = Some(CreateImageOptions {
71 from_image: image,
72 ..Default::default()
73 });
74
75 let mut stream = self.docker.create_image(options, None, None);
76 while let Some(pull_result) = stream.next().await {
77 if let Err(e) = pull_result {
78 let err_msg = format!("Failed to pull image '{}': {}", image, e);
79 error!("{}", err_msg);
80 return Err(Error::DockerOperation(err_msg));
81 }
82 }
83 info!("Image pull complete for: {}", image);
84 Ok(())
85 }
86
87 #[allow(clippy::too_many_arguments)]
93 pub async fn run_container(
94 &self,
95 image: &str,
96 name: &str,
97 env_vars: HashMap<String, String>,
98 ports: HashMap<String, String>,
99 volumes: HashMap<String, String>,
100 cmd: Option<Vec<String>>,
101 extra_hosts: Option<Vec<String>>,
102 health_check_cmd: Option<Vec<String>>,
103 bind_ip: Option<String>,
104 ) -> Result<String> {
105 if let Err(e) = self.ensure_image(image).await {
106 warn!("Failed to ensure image exists, but proceeding: {}", e);
107 }
108
109 self.cleanup_container_by_name(name).await?;
110
111 let env: Vec<String> = env_vars
112 .into_iter()
113 .map(|(k, v)| format!("{}={}", k, v))
114 .collect();
115
116 let final_bind_ip = bind_ip.unwrap_or_else(|| "127.0.0.1".to_string());
117 let port_bindings = ports
118 .into_iter()
119 .map(|(container_port, host_port)| {
120 (
121 container_port.to_string(),
122 Some(vec![PortBinding {
123 host_ip: Some(final_bind_ip.clone()),
124 host_port: Some(host_port),
125 }]),
126 )
127 })
128 .collect();
129
130 let binds = if volumes.is_empty() {
131 None
132 } else {
133 Some(
134 volumes
135 .into_iter()
136 .map(|(host_path, container_path)| {
137 Self::validate_volume_path_component(&host_path)?;
138 Self::validate_volume_path_component(&container_path)?;
139 Ok(format!("{}:{}", host_path, container_path))
140 })
141 .collect::<Result<Vec<String>>>()?,
142 )
143 };
144
145 let host_config = HostConfig {
146 port_bindings: Some(port_bindings),
147 binds,
148 extra_hosts,
149 ..Default::default()
150 };
151
152 let health_config = health_check_cmd.map(|cmd| HealthConfig {
153 test: Some(cmd),
154 interval: Some(5_000_000_000),
155 timeout: Some(5_000_000_000),
156 retries: Some(3),
157 start_period: Some(5_000_000_000),
158 start_interval: Some(1_000_000_000),
159 });
160
161 let cmd_slices: Option<Vec<&str>> =
162 cmd.as_ref().map(|v| v.iter().map(AsRef::as_ref).collect());
163
164 let config = Config {
165 image: Some(image),
166 env: Some(env.iter().map(AsRef::as_ref).collect()),
167 host_config: Some(host_config),
168 healthcheck: health_config,
169 cmd: cmd_slices,
170 ..Default::default()
171 };
172
173 info!("Creating container '{}' from image '{}'", name, image);
174 let options = Some(CreateContainerOptions {
175 name: name.to_string(),
176 platform: None,
177 });
178
179 let container_id = match self.docker.create_container(options, config).await {
180 Ok(response) => response.id,
181 Err(e) => {
182 let err_msg = format!("Failed to create container '{}': {}", name, e);
183 error!("[DOCKER ERROR] {}", err_msg);
184 return Err(crate::error::Error::DockerOperation(err_msg));
185 }
186 };
187
188 info!("Starting container '{}' (ID: {})", name, &container_id);
189 if let Err(e) = self
190 .docker
191 .start_container(&container_id, None::<StartContainerOptions<String>>)
192 .await
193 {
194 let err_msg = format!("Failed to start container '{}': {}", name, e);
195 error!("[DOCKER ERROR] {}", err_msg);
196 return Err(crate::error::Error::DockerOperation(err_msg));
197 }
198
199 info!(
200 "Successfully started container '{}' (ID: {})",
201 name, &container_id
202 );
203 Ok(container_id)
204 }
205
206 async fn cleanup_container_by_name(&self, name: &str) -> Result<()> {
212 let list_options = ListContainersOptions::<String>::default();
213
214 let containers = self
215 .docker
216 .list_containers(Some(list_options))
217 .await
218 .map_err(|e| Error::DockerOperation(e.to_string()))?;
219
220 for container_summary in containers {
221 if container_summary
222 .names
223 .as_ref()
224 .is_some_and(|names| names.contains(&format!("/{}", name)))
225 {
226 info!(
227 "Found existing container '{}', stopping and removing.",
228 name
229 );
230 if let Some(container_id) = container_summary.id.as_deref() {
231 self.stop_and_remove_container(container_id, name).await?;
232 }
233 break;
234 }
235 }
236 Ok(())
237 }
238
239 pub async fn stop_and_remove_container(
245 &self,
246 container_id: &str,
247 container_name: &str,
248 ) -> Result<()> {
249 info!("Stopping container '{}' ({})", container_name, container_id);
250 if let Err(e) = self
251 .docker
252 .stop_container(container_id, None::<StopContainerOptions>)
253 .await
254 {
255 warn!(
256 "Could not stop container '{}' (may already be stopped): {}. Proceeding with removal.",
257 container_name, e
258 );
259 }
260
261 info!("Removing container '{}' ({})", container_name, container_id);
262 self.docker
263 .remove_container(
264 container_id,
265 Some(RemoveContainerOptions {
266 force: true,
267 ..Default::default()
268 }),
269 )
270 .await
271 .map_err(|e| {
272 Error::DockerOperation(format!(
273 "Failed to remove container '{}' ({}): {}",
274 container_name, container_id, e
275 ))
276 })?;
277 Ok(())
278 }
279
280 pub async fn create_network(&self, network_name: &str) -> Result<()> {
286 match self
287 .docker
288 .inspect_network(network_name, None::<InspectNetworkOptions<String>>)
289 .await
290 {
291 Ok(_) => {
292 info!("Network '{}' already exists.", network_name);
293 Ok(())
294 }
295 Err(bollard::errors::Error::DockerResponseServerError {
296 status_code: 404, ..
297 }) => {
298 info!("Creating Docker network: '{}'", network_name);
299 let options = CreateNetworkOptions {
300 name: network_name,
301 ..Default::default()
302 };
303 self.docker
304 .create_network(options)
305 .await
306 .map_err(|e| Error::DockerOperation(e.to_string()))?;
307 info!("Successfully created Docker network: '{}'", network_name);
308 Ok(())
309 }
310 Err(e) => {
311 let err_msg = format!("Failed to inspect Docker network '{}': {}", network_name, e);
312 error!("{}", err_msg);
313 Err(Error::DockerOperation(err_msg))
314 }
315 }
316 }
317
318 pub async fn connect_to_network(&self, container_name: &str, network_name: &str) -> Result<()> {
324 info!(
325 "Connecting container '{}' to network '{}'",
326 container_name, network_name
327 );
328 let options = ConnectNetworkOptions {
329 container: container_name,
330 ..Default::default()
331 };
332 self.docker
333 .connect_network(network_name, options)
334 .await
335 .map_err(|e| Error::DockerOperation(e.to_string()))
336 }
337
338 pub async fn is_container_running(&self, container_id: &str) -> Result<bool> {
344 let container = self
345 .docker
346 .inspect_container(container_id, None)
347 .await
348 .map_err(|e| Error::DockerOperation(e.to_string()))?;
349
350 Ok(container.state.is_some_and(|s| s.running.unwrap_or(false)))
351 }
352
353 pub async fn dump_container_logs(&self, container_id: &str) {
359 let options = Some(LogsOptions {
360 stdout: true,
361 stderr: true,
362 tail: "all".to_string(),
363 ..Default::default()
364 });
365 error!("Dumping logs for container {}:", container_id);
366 let mut logs_stream = self.docker.logs(container_id, options);
367 while let Some(log_result) = logs_stream.next().await {
368 match log_result {
369 Ok(log_output) => {
370 error!("[CONTAINER LOG]: {}", log_output);
371 }
372 Err(e) => {
373 error!("Error reading container logs: {}", e);
374 }
375 }
376 }
377 }
378
379 pub async fn wait_for_container_health(
396 &self,
397 container_id: &str,
398 timeout_secs: u64,
399 ) -> Result<()> {
400 let timeout = Duration::from_secs(timeout_secs);
401 let start = Instant::now();
402
403 while start.elapsed() < timeout {
404 let inspect_result = self.docker.inspect_container(container_id, None).await;
405
406 match inspect_result {
407 Ok(container) => {
408 if let Some(state) = &container.state {
409 if let Some(health) = &state.health {
410 if let Some(status) = &health.status {
411 match status {
412 HealthStatusEnum::HEALTHY => {
413 info!("Container {} is healthy.", container_id);
414 return Ok(());
415 }
416 HealthStatusEnum::UNHEALTHY => {
417 error!(
418 "Container {} reported unhealthy status.",
419 container_id.chars().take(12).collect::<String>()
420 );
421 self.dump_container_logs(container_id).await;
422 return Err(Error::DockerOperation(format!(
423 "Container {} reported unhealthy status.",
424 container_id
425 )));
426 }
427 _ => {
428 debug!(
429 "Container {} health status: {:?}",
430 container_id, status
431 );
432 }
433 }
434 }
435 } else if state.running.unwrap_or(false) {
436 info!(
437 "Container {} is running (no health check configured).",
438 container_id
439 );
440 return Ok(());
441 }
442 }
443 }
444 Err(e) => {
445 warn!(
446 "Error inspecting container {}: {}. Retrying...",
447 container_id, e
448 );
449 }
450 }
451
452 sleep(Duration::from_secs(
453 DEFAULT_CONTAINER_HEALTH_POLL_INTERVAL_SECS,
454 ))
455 .await;
456 }
457
458 warn!(
459 "Container {} did not become ready within {} seconds.",
460 container_id.chars().take(12).collect::<String>(),
461 timeout_secs
462 );
463 self.dump_container_logs(container_id).await;
464 Err(Error::DockerOperation(format!(
465 "Container {} did not become ready within {} seconds.",
466 container_id, timeout_secs
467 )))
468 }
469}