// dcd/deployer/docker_manager/mod.rs

1mod error;
2mod installer;
3mod types;
4mod validator;
5
6use crate::deployer::types::ComposeExec;
7use crate::executor::{CommandExecutor, CommandResult, FileTransfer, OutputError};
8use async_trait::async_trait;
9pub use error::DockerError;
10use installer::DockerInstaller;
11use serde::{Deserialize, Serialize};
12use std::path::{Path, PathBuf};
13use types::{DockerResult, DockerVersion, LinuxDistro};
14use validator::DockerValidator;
15
16// --- New Types for Health Check ---
17
18/// Represents details of a service that is not healthy.
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] // Added Serialize/Deserialize for potential future use
20pub struct UnhealthyService {
21    pub name: String,
22    pub state: String,  // e.g., "running", "exited"
23    pub health: String, // e.g., "unhealthy", "starting", "" (if no healthcheck but not running)
24    pub exit_code: i32,
25    pub status: String, // Full status string like "Exited (1)" or "Up (unhealthy)"
26}
27
28/// Represents the overall health status of the deployment.
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub enum HealthCheckResult {
31    /// All services are running and healthy (or have no health check configured).
32    Healthy,
33    /// Contains services that are running but still in the 'starting' health state.
34    /// All other services are Healthy.
35    Starting(Vec<UnhealthyService>),
36    /// Contains services that are definitively unhealthy (e.g., exited, health='unhealthy', dead).
37    /// May also contain 'starting' services, but the presence of one terminal failure triggers this.
38    Failed(Vec<UnhealthyService>),
39    /// No services were found (e.g., compose file defines no services or `ps` returned empty).
40    NoServices,
41}
42
43// --- End of New Types ---
44
45#[async_trait]
46pub trait DockerManager: Send {
47    /// Check if Docker is installed and install if not
48    async fn ensure_docker_installed(&mut self) -> DockerResult<()>;
49
50    /// Check if Docker Compose is installed and install if not
51    async fn ensure_docker_compose_installed(&mut self) -> DockerResult<()>;
52
53    /// Get Docker version information
54    async fn get_docker_version(&mut self) -> DockerResult<DockerVersion>;
55
56    /// Verify docker-compose.yml exists in working directory
57    async fn verify_compose_file(&mut self) -> DockerResult<()>;
58
59    /// Get status of all services
60    async fn get_services_status(&mut self) -> DockerResult<ComposeStatus>;
61
62    /// Start services using docker-compose up -d
63    async fn compose_up(&mut self) -> DockerResult<()>;
64
65    /// Upload docker-compose.yml file
66    async fn upload_compose_file(
67        &mut self,
68        local_path: &Path, // Changed from generic P to &Path
69    ) -> DockerResult<()>;
70
71    /// Check the health status of all services.
72    /// Returns detailed information about unhealthy services if any.
73    async fn verify_services_healthy(&mut self) -> DockerResult<HealthCheckResult>;
74
75    /// Check if there are any running services
76    async fn has_running_services(&mut self) -> DockerResult<bool>;
77
78    /// Stop services using docker-compose down with optional volume and image removal
79    async fn compose_down(&mut self, remove_volumes: bool, remove_images: bool)
80        -> DockerResult<()>;
81
82    /// Remove a specific volume
83    async fn remove_volume(&mut self, volume_name: &str) -> DockerResult<()>;
84
85    /// Remove unused images to save disk space
86    async fn prune_images(&mut self) -> DockerResult<()>;
87}
88
89#[derive(Debug, Serialize, Deserialize)]
90pub struct Publisher {
91    #[serde(rename = "URL")]
92    pub url: String,
93    #[serde(rename = "TargetPort")]
94    pub target_port: u16,
95    #[serde(rename = "PublishedPort")]
96    pub published_port: u16,
97    #[serde(rename = "Protocol")]
98    pub protocol: String,
99}
100
101#[derive(Debug, Serialize, Deserialize)]
102pub struct ServiceStatus {
103    #[serde(rename = "Command")]
104    pub command: String,
105    #[serde(rename = "CreatedAt")]
106    pub created_at: String,
107    #[serde(rename = "ExitCode")]
108    pub exit_code: i32,
109    #[serde(rename = "Health")]
110    pub health: String,
111    #[serde(rename = "ID")]
112    pub id: String,
113    #[serde(rename = "Image")]
114    pub image: String,
115    #[serde(rename = "Labels")]
116    pub labels: String, // Could be converted to HashMap if needed
117    #[serde(rename = "LocalVolumes")]
118    pub local_volumes: String,
119    #[serde(rename = "Mounts")]
120    pub mounts: String,
121    #[serde(rename = "Name")]
122    pub name: String,
123    #[serde(rename = "Names")]
124    pub names: String,
125    #[serde(rename = "Networks")]
126    pub networks: String,
127    #[serde(rename = "Ports")]
128    pub ports: String,
129    #[serde(rename = "Project")]
130    pub project: String,
131    #[serde(rename = "Publishers")]
132    pub publishers: Vec<Publisher>,
133    #[serde(rename = "RunningFor")]
134    pub running_for: String,
135    #[serde(rename = "Service")]
136    pub service: String,
137    #[serde(rename = "Size")]
138    pub size: String,
139    #[serde(rename = "State")]
140    pub state: String,
141    #[serde(rename = "Status")]
142    pub status: String,
143}
144
145pub trait DockerExec: CommandExecutor + FileTransfer {}
146
147impl<T: CommandExecutor + FileTransfer> DockerExec for T {}
148
149#[derive(Debug, Serialize, Deserialize)]
150pub struct ComposeStatus {
151    #[serde(flatten)]
152    pub services: Vec<ServiceStatus>,
153}
154
155impl ServiceStatus {
156    pub fn is_running(&self) -> bool {
157        self.state == "running"
158    }
159
160    /// A service is considered healthy if it's running AND
161    /// (it has no health check OR its health check reports "healthy").
162    pub fn is_healthy(&self) -> bool {
163        self.is_running() && (self.health.is_empty() || self.health == "healthy")
164    }
165
166    pub fn get_ports(&self) -> Vec<(u16, u16)> {
167        self.publishers
168            .iter()
169            .map(|p| (p.published_port, p.target_port))
170            .collect()
171    }
172}
173
174impl ComposeStatus {
175    pub fn new(services: Vec<ServiceStatus>) -> Self {
176        Self { services }
177    }
178
179    pub fn all_running(&self) -> bool {
180        self.services.iter().all(|s| s.is_running())
181    }
182
183    pub fn all_healthy(&self) -> bool {
184        self.services.iter().all(|s| s.is_healthy())
185    }
186
187    pub fn get_service(&self, name: &str) -> Option<&ServiceStatus> {
188        self.services.iter().find(|s| s.service == name)
189    }
190}
191
192pub struct SshDockerManager<'a> {
193    executor: &'a mut (dyn ComposeExec + Send),
194    distro: LinuxDistro,
195    working_directory: PathBuf,
196    /// List of compose files (relative to working directory or absolute paths)
197    compose_files: Vec<PathBuf>,
198    /// List of env files
199    env_files: Vec<PathBuf>,
200}
201
202impl<'a> SshDockerManager<'a> {
203    /// Create a new Docker manager for SSH, specifying compose and env files to use.
204    pub async fn new(
205        executor: &'a mut (dyn ComposeExec + Send),
206        working_directory: PathBuf,
207        compose_files: Vec<PathBuf>,
208        env_files: Vec<PathBuf>,
209    ) -> DockerResult<Self> {
210        let mut validator = DockerValidator::new(executor);
211        let distro = validator.detect_distro().await?;
212
213        let mut manager = Self {
214            executor,
215            distro,
216            working_directory,
217            compose_files,
218            env_files,
219        };
220
221        // Verify working directory exists
222        manager.verify_working_directory().await?;
223
224        Ok(manager)
225    }
226
227    async fn verify_working_directory(&mut self) -> DockerResult<()> {
228        let cmd = format!(
229            "test -d {} && echo 'exists'",
230            self.working_directory.display()
231        );
232        let result = self.executor.execute_command(&cmd).await?;
233        if !result.is_success() {
234            return Err(DockerError::WorkingDirError(
235                "Working directory does not exist".into(),
236            ));
237        }
238        Ok(())
239    }
240
241    async fn execute_compose_command(&mut self, cmd: &str) -> DockerResult<CommandResult> {
242        let full_cmd = format!("cd {} && {}", self.working_directory.display(), cmd);
243        self.executor
244            .execute_command(&full_cmd)
245            .await
246            .map_err(DockerError::from)
247    }
248
249    /// Build a docker-compose command string with configured compose files and env files.
250    fn format_docker_compose_command(&self, subcommand: &str) -> String {
251        let mut cmd = String::from("docker-compose");
252        for cf in &self.compose_files {
253            cmd.push_str(" -f ");
254            cmd.push_str(&cf.to_string_lossy());
255        }
256        for ef in &self.env_files {
257            cmd.push_str(" --env-file ");
258            cmd.push_str(&ef.to_string_lossy());
259        }
260        cmd.push(' ');
261        cmd.push_str(subcommand);
262        cmd
263    }
264}
265
266#[async_trait]
267impl DockerManager for SshDockerManager<'_> {
268    #[inline]
269    async fn ensure_docker_installed(&mut self) -> DockerResult<()> {
270        let mut validator = DockerValidator::new(self.executor);
271        if !validator.is_docker_installed().await? {
272            let mut installer = DockerInstaller::new(self.executor);
273            installer.install_docker(&self.distro).await?;
274        }
275        Ok(())
276    }
277
278    #[inline]
279    async fn ensure_docker_compose_installed(&mut self) -> DockerResult<()> {
280        let mut validator = DockerValidator::new(self.executor);
281        if !validator.is_docker_compose_installed().await? {
282            let mut installer = DockerInstaller::new(self.executor);
283            installer.install_docker_compose().await?;
284        }
285        Ok(())
286    }
287
288    #[inline]
289    async fn get_docker_version(&mut self) -> DockerResult<DockerVersion> {
290        let mut validator = DockerValidator::new(self.executor);
291        validator.get_docker_version().await
292    }
293
294    async fn get_services_status(&mut self) -> DockerResult<ComposeStatus> {
295        // Use configured compose and env files when listing services
296        let cmd = self.format_docker_compose_command("ps --format json");
297        let result = self.execute_compose_command(&cmd).await?;
298
299        if !result.is_success() {
300            return Err(DockerError::CommandError {
301                cmd: "docker-compose ps".to_string(),
302                message: result.output.to_stderr_string()?,
303            });
304        }
305
306        // Parse the JSON output into ServiceStatus structs.
307        // Some versions emit a JSON array, others emit one JSON object per line.
308        let stdout = result.output.to_stdout_string()?;
309        let services: Vec<ServiceStatus> = if stdout.trim_start().starts_with('[') {
310            // entire array as JSON
311            serde_json::from_str(&stdout)
312                .map_err(|e| DockerError::Output(OutputError::JsonError(e)))?
313        } else {
314            // line-delimited JSON objects
315            let mut v = Vec::new();
316            for line in stdout.lines() {
317                let line = line.trim();
318                if line.is_empty() {
319                    continue;
320                }
321                let svc: ServiceStatus = serde_json::from_str(line)
322                    .map_err(|e| DockerError::Output(OutputError::JsonError(e)))?;
323                v.push(svc);
324            }
325            v
326        };
327        Ok(ComposeStatus { services })
328    }
329
330    async fn compose_up(&mut self) -> DockerResult<()> {
331        // First prune unused images to save disk space
332        self.prune_images().await?;
333
334        // Pull latest images and start services with configured compose and env files
335        let pull_cmd = self.format_docker_compose_command("pull");
336        let up_cmd = self.format_docker_compose_command("up -d --remove-orphans");
337        let commands = [pull_cmd, up_cmd];
338
339        for cmd in &commands {
340            tracing::info!("Executing compose command: '{}'", cmd);
341            let result = self.execute_compose_command(cmd).await?;
342
343            if !result.is_success() {
344                let error_msg = result.output.to_stderr_string()?;
345                tracing::error!("Compose command failed: '{}'. Error: {}", cmd, error_msg);
346                return Err(DockerError::CommandError {
347                    cmd: cmd.to_string(),
348                    message: error_msg,
349                });
350            }
351        }
352        Ok(())
353    }
354
355    async fn verify_services_healthy(&mut self) -> DockerResult<HealthCheckResult> {
356        let status = self.get_services_status().await?;
357
358        if status.services.is_empty() {
359            tracing::warn!("No services found when checking health status.");
360            return Ok(HealthCheckResult::NoServices);
361        }
362
363        let mut starting_services = Vec::new();
364        let mut failed_services = Vec::new();
365
366        for s in status.services.iter() {
367            // Base definition of "healthy" - running and (no healthcheck or health='healthy')
368            let is_technically_healthy =
369                s.is_running() && (s.health.is_empty() || s.health == "healthy");
370
371            if !is_technically_healthy {
372                let unhealthy_detail = UnhealthyService {
373                    name: s.service.clone(),
374                    state: s.state.clone(),
375                    health: s.health.clone(),
376                    exit_code: s.exit_code,
377                    status: s.status.clone(),
378                };
379
380                // Categorize: Is it just starting or actually failed?
381                if s.is_running() && s.health == "starting" {
382                    // It's running but health is 'starting' -> Potential recovery
383                    starting_services.push(unhealthy_detail);
384                } else {
385                    // It's exited, restarting, dead, or health='unhealthy' -> Definitive failure
386                    failed_services.push(unhealthy_detail);
387                }
388            }
389        }
390
391        if !failed_services.is_empty() {
392            // If any service has definitively failed, report Failed overall.
393            // Include starting services in the report for completeness.
394            tracing::warn!(
395                "Found definitively failed services: {:?}",
396                failed_services.iter().map(|s| &s.name).collect::<Vec<_>>()
397            );
398            failed_services.extend(starting_services); // Combine lists
399            Ok(HealthCheckResult::Failed(failed_services))
400        } else if !starting_services.is_empty() {
401            // No failed services, but some are still starting.
402            tracing::info!(
403                "Found services still starting: {:?}",
404                starting_services
405                    .iter()
406                    .map(|s| &s.name)
407                    .collect::<Vec<_>>()
408            );
409            Ok(HealthCheckResult::Starting(starting_services))
410        } else {
411            // All services are technically healthy.
412            Ok(HealthCheckResult::Healthy)
413        }
414    }
415
416    async fn verify_compose_file(&mut self) -> DockerResult<()> {
417        let compose_path = self.working_directory.join("docker-compose.yml");
418        let cmd = format!("test -f {}", compose_path.display());
419
420        let result = self
421            .executor
422            .execute_command(&cmd)
423            .await
424            .map_err(DockerError::from)?;
425
426        if !result.is_success() {
427            return Err(DockerError::ComposeError(
428                "docker-compose.yml not found".to_string(),
429            ));
430        }
431        Ok(())
432    }
433
434    async fn upload_compose_file(&mut self, local_path: &Path) -> DockerResult<()> {
435        let remote_path = self.working_directory.join("docker-compose.yml");
436        self.executor
437            .upload_file(local_path, remote_path.as_ref())
438            .await
439            .map_err(DockerError::from)?;
440        self.verify_compose_file().await
441    }
442
443    async fn has_running_services(&mut self) -> DockerResult<bool> {
444        let status = self.get_services_status().await?;
445        Ok(status.services.iter().any(|s| s.is_running()))
446    }
447
448    async fn compose_down(
449        &mut self,
450        remove_volumes: bool,
451        remove_images: bool,
452    ) -> DockerResult<()> {
453        // Stop services and remove containers/networks with configured compose and env files
454        let mut cmd = self.format_docker_compose_command("down");
455        if remove_volumes {
456            cmd.push_str(" -v");
457        }
458        if remove_images {
459            cmd.push_str(" --rmi all");
460        }
461        let result = self.execute_compose_command(&cmd).await?;
462        if !result.is_success() {
463            return Err(DockerError::CommandError {
464                cmd: cmd.to_string(),
465                message: result
466                    .output
467                    .to_stderr_string()
468                    .map_err(DockerError::from)?,
469            });
470        }
471        Ok(())
472    }
473
474    async fn remove_volume(&mut self, volume_name: &str) -> DockerResult<()> {
475        let cmd = format!("docker volume rm {}", volume_name);
476        let result = self.executor.execute_command(&cmd).await?;
477
478        // If the volume doesn't exist, that's fine
479        if !result.is_success() && !result.output.to_stderr_string()?.contains("No such volume") {
480            return Err(DockerError::CommandError {
481                cmd,
482                message: result.output.to_stderr_string()?,
483            });
484        }
485        Ok(())
486    }
487
488    async fn prune_images(&mut self) -> DockerResult<()> {
489        tracing::info!("Pruning unused images for current project to save disk space...");
490
491        // Get the project name from the working directory (used by docker-compose for labeling)
492        let project_name = self
493            .working_directory
494            .file_name()
495            .map(|name| name.to_string_lossy().into_owned())
496            .unwrap_or_else(|| "default".to_string());
497
498        // First, try to get current project images to understand what we're working with
499        let images_cmd = self.format_docker_compose_command("images --format json");
500        if let Ok(images_result) = self.execute_compose_command(&images_cmd).await {
501            if images_result.is_success() {
502                let stdout = images_result.output.to_stdout_string().unwrap_or_default();
503                if !stdout.trim().is_empty() {
504                    tracing::debug!("Current project images: {}", stdout.trim());
505                }
506            }
507        }
508
509        // Prune images with compose project label to target only this project's unused images
510        let cmd = format!(
511            "docker image prune -f --filter label=com.docker.compose.project={}",
512            project_name
513        );
514
515        let result = self.executor.execute_command(&cmd).await?;
516
517        if !result.is_success() {
518            let error_msg = result.output.to_stderr_string()?;
519            tracing::warn!("Project image pruning failed: {}", error_msg);
520            return Err(DockerError::CommandError {
521                cmd: cmd.to_string(),
522                message: error_msg,
523            });
524        }
525
526        let stdout = result.output.to_stdout_string()?;
527        if !stdout.trim().is_empty() {
528            tracing::info!("Project image pruning result: {}", stdout.trim());
529        } else {
530            tracing::info!("No unused project images found to prune.");
531        }
532        Ok(())
533    }
534}