dcd/deployer/docker_manager/
mod.rs1mod error;
2mod installer;
3mod types;
4mod validator;
5
6use crate::deployer::types::ComposeExec;
7use crate::executor::{CommandExecutor, CommandResult, FileTransfer, OutputError};
8use async_trait::async_trait;
9pub use error::DockerError;
10use installer::DockerInstaller;
11use serde::{Deserialize, Serialize};
12use std::path::{Path, PathBuf};
13use types::{DockerResult, DockerVersion, LinuxDistro};
14use validator::DockerValidator;
15
16#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct UnhealthyService {
21 pub name: String,
22 pub state: String, pub health: String, pub exit_code: i32,
25 pub status: String, }
27
28#[derive(Debug, Clone, PartialEq, Eq)]
30pub enum HealthCheckResult {
31 Healthy,
33 Starting(Vec<UnhealthyService>),
36 Failed(Vec<UnhealthyService>),
39 NoServices,
41}
42
43#[async_trait]
46pub trait DockerManager: Send {
47 async fn ensure_docker_installed(&mut self) -> DockerResult<()>;
49
50 async fn ensure_docker_compose_installed(&mut self) -> DockerResult<()>;
52
53 async fn get_docker_version(&mut self) -> DockerResult<DockerVersion>;
55
56 async fn verify_compose_file(&mut self) -> DockerResult<()>;
58
59 async fn get_services_status(&mut self) -> DockerResult<ComposeStatus>;
61
62 async fn compose_up(&mut self) -> DockerResult<()>;
64
65 async fn upload_compose_file(
67 &mut self,
68 local_path: &Path, ) -> DockerResult<()>;
70
71 async fn verify_services_healthy(&mut self) -> DockerResult<HealthCheckResult>;
74
75 async fn has_running_services(&mut self) -> DockerResult<bool>;
77
78 async fn compose_down(&mut self, remove_volumes: bool, remove_images: bool)
80 -> DockerResult<()>;
81
82 async fn remove_volume(&mut self, volume_name: &str) -> DockerResult<()>;
84
85 async fn prune_images(&mut self) -> DockerResult<()>;
87}
88
89#[derive(Debug, Serialize, Deserialize)]
90pub struct Publisher {
91 #[serde(rename = "URL")]
92 pub url: String,
93 #[serde(rename = "TargetPort")]
94 pub target_port: u16,
95 #[serde(rename = "PublishedPort")]
96 pub published_port: u16,
97 #[serde(rename = "Protocol")]
98 pub protocol: String,
99}
100
101#[derive(Debug, Serialize, Deserialize)]
102pub struct ServiceStatus {
103 #[serde(rename = "Command")]
104 pub command: String,
105 #[serde(rename = "CreatedAt")]
106 pub created_at: String,
107 #[serde(rename = "ExitCode")]
108 pub exit_code: i32,
109 #[serde(rename = "Health")]
110 pub health: String,
111 #[serde(rename = "ID")]
112 pub id: String,
113 #[serde(rename = "Image")]
114 pub image: String,
115 #[serde(rename = "Labels")]
116 pub labels: String, #[serde(rename = "LocalVolumes")]
118 pub local_volumes: String,
119 #[serde(rename = "Mounts")]
120 pub mounts: String,
121 #[serde(rename = "Name")]
122 pub name: String,
123 #[serde(rename = "Names")]
124 pub names: String,
125 #[serde(rename = "Networks")]
126 pub networks: String,
127 #[serde(rename = "Ports")]
128 pub ports: String,
129 #[serde(rename = "Project")]
130 pub project: String,
131 #[serde(rename = "Publishers")]
132 pub publishers: Vec<Publisher>,
133 #[serde(rename = "RunningFor")]
134 pub running_for: String,
135 #[serde(rename = "Service")]
136 pub service: String,
137 #[serde(rename = "Size")]
138 pub size: String,
139 #[serde(rename = "State")]
140 pub state: String,
141 #[serde(rename = "Status")]
142 pub status: String,
143}
144
145pub trait DockerExec: CommandExecutor + FileTransfer {}
146
147impl<T: CommandExecutor + FileTransfer> DockerExec for T {}
148
149#[derive(Debug, Serialize, Deserialize)]
150pub struct ComposeStatus {
151 #[serde(flatten)]
152 pub services: Vec<ServiceStatus>,
153}
154
155impl ServiceStatus {
156 pub fn is_running(&self) -> bool {
157 self.state == "running"
158 }
159
160 pub fn is_healthy(&self) -> bool {
163 self.is_running() && (self.health.is_empty() || self.health == "healthy")
164 }
165
166 pub fn get_ports(&self) -> Vec<(u16, u16)> {
167 self.publishers
168 .iter()
169 .map(|p| (p.published_port, p.target_port))
170 .collect()
171 }
172}
173
174impl ComposeStatus {
175 pub fn new(services: Vec<ServiceStatus>) -> Self {
176 Self { services }
177 }
178
179 pub fn all_running(&self) -> bool {
180 self.services.iter().all(|s| s.is_running())
181 }
182
183 pub fn all_healthy(&self) -> bool {
184 self.services.iter().all(|s| s.is_healthy())
185 }
186
187 pub fn get_service(&self, name: &str) -> Option<&ServiceStatus> {
188 self.services.iter().find(|s| s.service == name)
189 }
190}
191
192pub struct SshDockerManager<'a> {
193 executor: &'a mut (dyn ComposeExec + Send),
194 distro: LinuxDistro,
195 working_directory: PathBuf,
196 compose_files: Vec<PathBuf>,
198 env_files: Vec<PathBuf>,
200}
201
202impl<'a> SshDockerManager<'a> {
203 pub async fn new(
205 executor: &'a mut (dyn ComposeExec + Send),
206 working_directory: PathBuf,
207 compose_files: Vec<PathBuf>,
208 env_files: Vec<PathBuf>,
209 ) -> DockerResult<Self> {
210 let mut validator = DockerValidator::new(executor);
211 let distro = validator.detect_distro().await?;
212
213 let mut manager = Self {
214 executor,
215 distro,
216 working_directory,
217 compose_files,
218 env_files,
219 };
220
221 manager.verify_working_directory().await?;
223
224 Ok(manager)
225 }
226
227 async fn verify_working_directory(&mut self) -> DockerResult<()> {
228 let cmd = format!(
229 "test -d {} && echo 'exists'",
230 self.working_directory.display()
231 );
232 let result = self.executor.execute_command(&cmd).await?;
233 if !result.is_success() {
234 return Err(DockerError::WorkingDirError(
235 "Working directory does not exist".into(),
236 ));
237 }
238 Ok(())
239 }
240
241 async fn execute_compose_command(&mut self, cmd: &str) -> DockerResult<CommandResult> {
242 let full_cmd = format!("cd {} && {}", self.working_directory.display(), cmd);
243 self.executor
244 .execute_command(&full_cmd)
245 .await
246 .map_err(DockerError::from)
247 }
248
249 fn format_docker_compose_command(&self, subcommand: &str) -> String {
251 let mut cmd = String::from("docker-compose");
252 for cf in &self.compose_files {
253 cmd.push_str(" -f ");
254 cmd.push_str(&cf.to_string_lossy());
255 }
256 for ef in &self.env_files {
257 cmd.push_str(" --env-file ");
258 cmd.push_str(&ef.to_string_lossy());
259 }
260 cmd.push(' ');
261 cmd.push_str(subcommand);
262 cmd
263 }
264}
265
266#[async_trait]
267impl DockerManager for SshDockerManager<'_> {
268 #[inline]
269 async fn ensure_docker_installed(&mut self) -> DockerResult<()> {
270 let mut validator = DockerValidator::new(self.executor);
271 if !validator.is_docker_installed().await? {
272 let mut installer = DockerInstaller::new(self.executor);
273 installer.install_docker(&self.distro).await?;
274 }
275 Ok(())
276 }
277
278 #[inline]
279 async fn ensure_docker_compose_installed(&mut self) -> DockerResult<()> {
280 let mut validator = DockerValidator::new(self.executor);
281 if !validator.is_docker_compose_installed().await? {
282 let mut installer = DockerInstaller::new(self.executor);
283 installer.install_docker_compose().await?;
284 }
285 Ok(())
286 }
287
288 #[inline]
289 async fn get_docker_version(&mut self) -> DockerResult<DockerVersion> {
290 let mut validator = DockerValidator::new(self.executor);
291 validator.get_docker_version().await
292 }
293
294 async fn get_services_status(&mut self) -> DockerResult<ComposeStatus> {
295 let cmd = self.format_docker_compose_command("ps --format json");
297 let result = self.execute_compose_command(&cmd).await?;
298
299 if !result.is_success() {
300 return Err(DockerError::CommandError {
301 cmd: "docker-compose ps".to_string(),
302 message: result.output.to_stderr_string()?,
303 });
304 }
305
306 let stdout = result.output.to_stdout_string()?;
309 let services: Vec<ServiceStatus> = if stdout.trim_start().starts_with('[') {
310 serde_json::from_str(&stdout)
312 .map_err(|e| DockerError::Output(OutputError::JsonError(e)))?
313 } else {
314 let mut v = Vec::new();
316 for line in stdout.lines() {
317 let line = line.trim();
318 if line.is_empty() {
319 continue;
320 }
321 let svc: ServiceStatus = serde_json::from_str(line)
322 .map_err(|e| DockerError::Output(OutputError::JsonError(e)))?;
323 v.push(svc);
324 }
325 v
326 };
327 Ok(ComposeStatus { services })
328 }
329
330 async fn compose_up(&mut self) -> DockerResult<()> {
331 self.prune_images().await?;
333
334 let pull_cmd = self.format_docker_compose_command("pull");
336 let up_cmd = self.format_docker_compose_command("up -d --remove-orphans");
337 let commands = [pull_cmd, up_cmd];
338
339 for cmd in &commands {
340 tracing::info!("Executing compose command: '{}'", cmd);
341 let result = self.execute_compose_command(cmd).await?;
342
343 if !result.is_success() {
344 let error_msg = result.output.to_stderr_string()?;
345 tracing::error!("Compose command failed: '{}'. Error: {}", cmd, error_msg);
346 return Err(DockerError::CommandError {
347 cmd: cmd.to_string(),
348 message: error_msg,
349 });
350 }
351 }
352 Ok(())
353 }
354
355 async fn verify_services_healthy(&mut self) -> DockerResult<HealthCheckResult> {
356 let status = self.get_services_status().await?;
357
358 if status.services.is_empty() {
359 tracing::warn!("No services found when checking health status.");
360 return Ok(HealthCheckResult::NoServices);
361 }
362
363 let mut starting_services = Vec::new();
364 let mut failed_services = Vec::new();
365
366 for s in status.services.iter() {
367 let is_technically_healthy =
369 s.is_running() && (s.health.is_empty() || s.health == "healthy");
370
371 if !is_technically_healthy {
372 let unhealthy_detail = UnhealthyService {
373 name: s.service.clone(),
374 state: s.state.clone(),
375 health: s.health.clone(),
376 exit_code: s.exit_code,
377 status: s.status.clone(),
378 };
379
380 if s.is_running() && s.health == "starting" {
382 starting_services.push(unhealthy_detail);
384 } else {
385 failed_services.push(unhealthy_detail);
387 }
388 }
389 }
390
391 if !failed_services.is_empty() {
392 tracing::warn!(
395 "Found definitively failed services: {:?}",
396 failed_services.iter().map(|s| &s.name).collect::<Vec<_>>()
397 );
398 failed_services.extend(starting_services); Ok(HealthCheckResult::Failed(failed_services))
400 } else if !starting_services.is_empty() {
401 tracing::info!(
403 "Found services still starting: {:?}",
404 starting_services
405 .iter()
406 .map(|s| &s.name)
407 .collect::<Vec<_>>()
408 );
409 Ok(HealthCheckResult::Starting(starting_services))
410 } else {
411 Ok(HealthCheckResult::Healthy)
413 }
414 }
415
416 async fn verify_compose_file(&mut self) -> DockerResult<()> {
417 let compose_path = self.working_directory.join("docker-compose.yml");
418 let cmd = format!("test -f {}", compose_path.display());
419
420 let result = self
421 .executor
422 .execute_command(&cmd)
423 .await
424 .map_err(DockerError::from)?;
425
426 if !result.is_success() {
427 return Err(DockerError::ComposeError(
428 "docker-compose.yml not found".to_string(),
429 ));
430 }
431 Ok(())
432 }
433
434 async fn upload_compose_file(&mut self, local_path: &Path) -> DockerResult<()> {
435 let remote_path = self.working_directory.join("docker-compose.yml");
436 self.executor
437 .upload_file(local_path, remote_path.as_ref())
438 .await
439 .map_err(DockerError::from)?;
440 self.verify_compose_file().await
441 }
442
443 async fn has_running_services(&mut self) -> DockerResult<bool> {
444 let status = self.get_services_status().await?;
445 Ok(status.services.iter().any(|s| s.is_running()))
446 }
447
448 async fn compose_down(
449 &mut self,
450 remove_volumes: bool,
451 remove_images: bool,
452 ) -> DockerResult<()> {
453 let mut cmd = self.format_docker_compose_command("down");
455 if remove_volumes {
456 cmd.push_str(" -v");
457 }
458 if remove_images {
459 cmd.push_str(" --rmi all");
460 }
461 let result = self.execute_compose_command(&cmd).await?;
462 if !result.is_success() {
463 return Err(DockerError::CommandError {
464 cmd: cmd.to_string(),
465 message: result
466 .output
467 .to_stderr_string()
468 .map_err(DockerError::from)?,
469 });
470 }
471 Ok(())
472 }
473
474 async fn remove_volume(&mut self, volume_name: &str) -> DockerResult<()> {
475 let cmd = format!("docker volume rm {}", volume_name);
476 let result = self.executor.execute_command(&cmd).await?;
477
478 if !result.is_success() && !result.output.to_stderr_string()?.contains("No such volume") {
480 return Err(DockerError::CommandError {
481 cmd,
482 message: result.output.to_stderr_string()?,
483 });
484 }
485 Ok(())
486 }
487
488 async fn prune_images(&mut self) -> DockerResult<()> {
489 tracing::info!("Pruning unused images for current project to save disk space...");
490
491 let project_name = self
493 .working_directory
494 .file_name()
495 .map(|name| name.to_string_lossy().into_owned())
496 .unwrap_or_else(|| "default".to_string());
497
498 let images_cmd = self.format_docker_compose_command("images --format json");
500 if let Ok(images_result) = self.execute_compose_command(&images_cmd).await {
501 if images_result.is_success() {
502 let stdout = images_result.output.to_stdout_string().unwrap_or_default();
503 if !stdout.trim().is_empty() {
504 tracing::debug!("Current project images: {}", stdout.trim());
505 }
506 }
507 }
508
509 let cmd = format!(
511 "docker image prune -f --filter label=com.docker.compose.project={}",
512 project_name
513 );
514
515 let result = self.executor.execute_command(&cmd).await?;
516
517 if !result.is_success() {
518 let error_msg = result.output.to_stderr_string()?;
519 tracing::warn!("Project image pruning failed: {}", error_msg);
520 return Err(DockerError::CommandError {
521 cmd: cmd.to_string(),
522 message: error_msg,
523 });
524 }
525
526 let stdout = result.output.to_stdout_string()?;
527 if !stdout.trim().is_empty() {
528 tracing::info!("Project image pruning result: {}", stdout.trim());
529 } else {
530 tracing::info!("No unused project images found to prune.");
531 }
532 Ok(())
533 }
534}