use std::collections::{HashMap, HashSet};
use super::types::{DockerManager, ServiceInfo, ServiceStatus};
use crate::constants::timeout;
use anyhow::Result;
use ducker::docker::{container::DockerContainer, util::new_local_docker_connection};
use tokio::time::{Duration, sleep};
use tracing::{debug, error, info, warn};
impl DockerManager {
pub async fn start_services(&self) -> Result<()> {
info!("🚀 Starting Docker services...");
info!("📁 Step 1: Check and create host mount directories...");
self.ensure_host_volumes_exist().await?;
info!("🎯 Step 2: Run docker-compose up...");
let output = self.run_compose_command(&["up", "-d"]).await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
let stdout = String::from_utf8_lossy(&output.stdout);
let exit_code = output.status.code().unwrap_or(-1);
let error_msg = format!(
"Failed to start services (exit code: {exit_code}):\nstderr: {stderr}\nstdout: {stdout}"
);
error!("❌ Service startup failure details: {}", error_msg);
return Err(anyhow::anyhow!(error_msg));
}
info!("✅ docker-compose up completed successfully");
info!("⏳ Step 3: Wait for services and verify status...");
self.verify_services_started(None).await?;
info!("🎉 All services started!");
Ok(())
}
pub async fn stop_services(&self) -> Result<()> {
let output = self.run_compose_command(&["down"]).await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
let stdout = String::from_utf8_lossy(&output.stdout);
let exit_code = output.status.code().unwrap_or(-1);
let error_msg = format!(
"Failed to stop services (exit code: {exit_code}):\nstderr: {stderr}\nstdout: {stdout}"
);
error!("{}", error_msg);
return Err(anyhow::anyhow!(error_msg));
}
Ok(())
}
pub async fn restart_services(&self) -> Result<()> {
let output = self.run_compose_command(&["restart"]).await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
let stdout = String::from_utf8_lossy(&output.stdout);
let exit_code = output.status.code().unwrap_or(-1);
let error_msg = format!(
"Failed to restart services (exit code: {exit_code}):\nstderr: {stderr}\nstdout: {stdout}"
);
error!("{}", error_msg);
return Err(anyhow::anyhow!(error_msg));
}
self.verify_services_started(None).await?;
Ok(())
}
pub async fn restart_service(&self, service_name: &str) -> Result<()> {
let output = self.run_compose_command(&["stop", service_name]).await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
let stdout = String::from_utf8_lossy(&output.stdout);
let exit_code = output.status.code().unwrap_or(-1);
let error_msg = format!(
"Failed to stop service {service_name} (exit code: {exit_code}):\nstderr: {stderr}\nstdout: {stdout}"
);
error!("{}", error_msg);
return Err(anyhow::anyhow!(error_msg));
}
let output = self.run_compose_command(&["start", service_name]).await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
let stdout = String::from_utf8_lossy(&output.stdout);
let exit_code = output.status.code().unwrap_or(-1);
let error_msg = format!(
"Failed to start service {service_name} (exit code: {exit_code}):\nstderr: {stderr}\nstdout: {stdout}"
);
error!("{}", error_msg);
return Err(anyhow::anyhow!(error_msg));
}
Ok(())
}
pub async fn get_services_status(&self) -> Result<Vec<ServiceInfo>> {
info!("Using ducker library to query container status...");
let compose_services = self.get_compose_service_names().await?;
info!("Services defined in docker-compose.yml: {:?}", compose_services);
let containers = self.get_all_containers_with_ducker().await?;
info!("Discovered {} containers in the system", containers.len());
let mut service_containers: HashMap<String, Vec<ServiceInfo>> = HashMap::new();
let mut compose_services_found = HashSet::new();
for container in containers {
for service_name in &compose_services {
if self.is_service_name_match(&container.names, service_name) {
let service_info =
self.convert_docker_container_to_service_info(container.clone());
let mut normalized_service_info = service_info;
normalized_service_info.name = service_name.clone();
service_containers
.entry(service_name.clone())
.or_default()
.push(normalized_service_info);
compose_services_found.insert(service_name.clone());
break; }
}
}
let mut final_services = Vec::new();
for service_name in &compose_services {
if let Some(containers) = service_containers.get(service_name) {
let best_container = containers
.iter()
.max_by_key(|container| {
match container.status {
ServiceStatus::Running => 2,
ServiceStatus::Stopped => 1,
ServiceStatus::Unknown => 0,
ServiceStatus::Created => 0,
ServiceStatus::Restarting => 0,
}
})
.unwrap();
final_services.push(best_container.clone());
} else {
final_services.push(ServiceInfo {
name: service_name.clone(),
status: ServiceStatus::Stopped,
image: "Not started".to_string(),
ports: Vec::new(),
});
}
}
info!(
"Matched {}/{} compose service containers",
compose_services_found.len(),
compose_services.len()
);
Ok(final_services)
}
pub async fn get_all_containers_status(&self) -> Result<Vec<ServiceInfo>> {
info!("Using ducker library to query all container statuses...");
let containers = self.get_all_containers_with_ducker().await?;
let services = containers
.into_iter()
.map(|container| self.convert_docker_container_to_service_info(container))
.collect();
Ok(services)
}
async fn get_all_containers_with_ducker(&self) -> Result<Vec<DockerContainer>> {
match new_local_docker_connection(crate::constants::docker::DOCKER_SOCKET_PATH, None).await
{
Ok(docker) => match DockerContainer::list(&docker).await {
Ok(containers) => {
info!("ducker fetched {} containers successfully", containers.len());
Ok(containers)
}
Err(e) => {
error!("ducker failed to list containers: {}", e);
Err(anyhow::anyhow!("Failed to list containers: {e}"))
}
},
Err(e) => {
error!("ducker failed to connect to Docker: {}", e);
Err(anyhow::anyhow!("Failed to connect to Docker: {e}"))
}
}
}
fn convert_docker_container_to_service_info(&self, container: DockerContainer) -> ServiceInfo {
let status = if container.running {
ServiceStatus::Running
} else {
match container.status.to_lowercase().as_str() {
s if s.contains("exited") => ServiceStatus::Stopped,
s if s.contains("created") => ServiceStatus::Created,
s if s.contains("restarting") => ServiceStatus::Restarting,
s if s.contains("paused") => ServiceStatus::Stopped,
s if s.contains("dead") => ServiceStatus::Stopped,
s if s.contains("running") => ServiceStatus::Running,
_ => ServiceStatus::Unknown,
}
};
let ports = if container.ports.is_empty() {
Vec::new()
} else {
container
.ports
.split(", ")
.filter(|s| !s.trim().is_empty())
.map(|s| s.trim().to_string())
.collect()
};
ServiceInfo {
name: container.names.clone(),
status,
image: container.image.clone(),
ports,
}
}
pub async fn is_service_running(&self, service_name: &str) -> Result<bool> {
let services = self.get_services_status().await?;
for service in services {
if self.is_service_name_match(&service.name, service_name) {
return Ok(service.status == ServiceStatus::Running);
}
}
Ok(false)
}
fn is_service_name_match(&self, container_name: &str, service_name: &str) -> bool {
let patterns = self.generate_compose_container_patterns(service_name);
let container_lower = container_name.to_lowercase();
for pattern in patterns {
let pattern_lower = pattern.to_lowercase();
if container_lower == pattern_lower {
return true;
}
if container_lower.starts_with(&pattern_lower) {
return true;
}
}
let service_lower = service_name.to_lowercase();
let separators = vec!["_", "-"];
for separator in separators {
let pattern1 = format!("{separator}{service_lower}{separator}");
if container_lower.contains(&pattern1) {
return true;
}
let pattern2 = format!("{separator}{service_lower}");
if container_lower.ends_with(&pattern2) {
return true;
}
let pattern3 = format!("{service_lower}{separator}");
if container_lower.starts_with(&pattern3) {
return true;
}
}
container_lower == service_lower
}
pub async fn get_service_detail(&self, service_name: &str) -> Result<Option<ServiceInfo>> {
let services = self.get_services_status().await?;
for service in services {
if self.is_service_name_match(&service.name, service_name) {
return Ok(Some(service));
}
}
Ok(None)
}
pub async fn check_services_health(&self) -> Result<()> {
let services = self.get_services_status().await?;
if services.is_empty() {
return Err(anyhow::anyhow!("No services found"));
}
let mut unhealthy_services = Vec::new();
for service in services {
if service.status != ServiceStatus::Running {
unhealthy_services.push(service.name);
}
}
if !unhealthy_services.is_empty() {
return Err(anyhow::anyhow!(
"Some services are not running: {}",
unhealthy_services.join(", ")
));
}
Ok(())
}
async fn verify_services_started(&self, custom_timeout: Option<u64>) -> Result<()> {
let max_wait_time =
Duration::from_secs(custom_timeout.unwrap_or(timeout::SERVICE_START_TIMEOUT));
let check_interval = Duration::from_secs(timeout::SERVICE_CHECK_INTERVAL);
let max_attempts = max_wait_time.as_secs() / check_interval.as_secs();
info!(
"🔍 Verifying service startup status (timeout: {}s, interval: {}s)",
max_wait_time.as_secs(),
check_interval.as_secs()
);
for attempt in 1..=max_attempts {
info!("⏳ Service status check {}/{}...", attempt, max_attempts);
match self.get_services_status().await {
Ok(services) => {
if services.is_empty() {
info!("⚠️ No services found; compose file may not define services");
return Ok(()); }
info!("📊 Found {} services; checking states...", services.len());
let mut failed_services = Vec::new();
let mut pending_services = Vec::new();
let mut running_services = Vec::new();
for service in &services {
match service.status {
ServiceStatus::Running => {
running_services.push(service.name.clone());
debug!("Service {} is running normally", service.name);
}
ServiceStatus::Stopped => {
if self
.is_oneshot_service(&service.name)
.await
.unwrap_or(false)
{
debug!("Service {} is a one-shot task and exited normally", service.name);
} else {
failed_services.push(service.name.clone());
}
}
ServiceStatus::Unknown => {
pending_services.push(service.name.clone());
}
ServiceStatus::Created => {
pending_services.push(service.name.clone());
}
ServiceStatus::Restarting => {
pending_services.push(service.name.clone());
}
}
}
if !running_services.is_empty() {
info!("✅ Running services: {}", running_services.join(", "));
}
if !pending_services.is_empty() {
info!("⏳ Pending services: {}", pending_services.join(", "));
}
if !failed_services.is_empty() {
info!("⚠️ Failed services: {}", failed_services.join(", "));
}
if failed_services.is_empty() && pending_services.is_empty() {
info!("🎉 Service startup verification passed!");
tracing::info!("Service startup verification passed");
return Ok(());
}
if !failed_services.is_empty() {
warn!("⚠️ Service startup failed: {}", failed_services.join(", "));
tracing::warn!("Service startup failed: {}", failed_services.join(", "));
}
if !pending_services.is_empty() {
info!("⏳ Continuing to wait for services: {}", pending_services.join(", "));
tracing::debug!("Waiting for services to start: {}", pending_services.join(", "));
}
if attempt == max_attempts {
let mut error_msg = String::new();
if !failed_services.is_empty() {
error_msg.push_str(&format!(
"Failed services: {}",
failed_services.join(", ")
));
}
if !pending_services.is_empty() {
if !error_msg.is_empty() {
error_msg.push_str("; ");
}
error_msg.push_str(&format!(
"Services timed out during startup: {}",
pending_services.join(", ")
));
}
error!("❌ Service startup verification failed: {}", error_msg);
return Err(anyhow::anyhow!("Service startup verification failed: {error_msg}"));
}
}
Err(e) => {
warn!("⚠️ Failed to get service status: {}", e);
if attempt == max_attempts {
error!("❌ Unable to get service status: {}", e);
return Err(anyhow::anyhow!("Unable to get service status: {e}"));
}
}
}
if attempt < max_attempts {
info!("⏳ Waiting {} seconds before the next check...", check_interval.as_secs());
sleep(check_interval).await;
}
}
Ok(())
}
}