use crate::docker_service::{DockerServiceError, DockerServiceResult};
use bollard::Docker;
use bollard::models::HealthStatusEnum;
use bollard::query_parameters::{InspectContainerOptions, ListContainersOptions};
use client_core::constants::timeout;
use client_core::container::DockerManager;
use rust_i18n::t;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::time::Duration;
use std::{collections::HashSet, sync::Arc};
use tracing::{debug, error, info, warn};
/// Docker Compose `restart:` policy of a service, as understood by this checker.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
pub enum RestartPolicy {
    /// Compose value `no` (also parsed from `false`).
    No,
    /// Compose value `always`.
    Always,
    /// Compose value `unless-stopped`.
    UnlessStopped,
    /// Compose value `on-failure` without a retry count.
    OnFailure,
    /// Compose value `on-failure:N`, carrying `N`.
    OnFailureWithRetries(u32),
}
impl FromStr for RestartPolicy {
    type Err = anyhow::Error;

    /// Parses a compose `restart:` value via [`RestartPolicy::parse`], turning an
    /// unrecognised value into an `anyhow` error that quotes the input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match Self::parse(s) {
            Some(policy) => Ok(policy),
            None => Err(anyhow::anyhow!("Invalid restart policy: {}", s)),
        }
    }
}
impl std::fmt::Display for RestartPolicy {
    /// Writes the policy using the compose spelling (`no`, `always`,
    /// `unless-stopped`, `on-failure`, `on-failure:N`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::No => f.write_str("no"),
            Self::Always => f.write_str("always"),
            Self::UnlessStopped => f.write_str("unless-stopped"),
            Self::OnFailure => f.write_str("on-failure"),
            Self::OnFailureWithRetries(retries) => write!(f, "on-failure:{retries}"),
        }
    }
}
impl RestartPolicy {
    /// Parses a compose `restart:` value.
    ///
    /// Matching is case-insensitive and ignores surrounding whitespace. Accepted
    /// values are `no` / `false`, `always`, `unless-stopped`, `on-failure` and
    /// `on-failure:N`. An `on-failure:` suffix that is not a valid `u32`
    /// degrades to [`RestartPolicy::OnFailure`]. Anything else yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        let normalized = s.trim().to_lowercase();
        match normalized.as_str() {
            "no" | "false" => Some(Self::No),
            "always" => Some(Self::Always),
            "unless-stopped" => Some(Self::UnlessStopped),
            "on-failure" => Some(Self::OnFailure),
            // `strip_prefix` replaces manual byte slicing (`s[11..]`) of the prefix.
            other => other.strip_prefix("on-failure:").map(|retries| {
                retries
                    .parse::<u32>()
                    .map_or(Self::OnFailure, Self::OnFailureWithRetries)
            }),
        }
    }

    /// Returns the compose spelling of the policy.
    ///
    /// Delegates to the `Display` impl so there is a single source of truth
    /// for the strings. The name is kept for compatibility even though the
    /// method returns an owned `String`.
    pub fn as_str(&self) -> String {
        self.to_string()
    }

    /// `true` only for [`RestartPolicy::No`].
    pub fn is_oneshot(&self) -> bool {
        matches!(self, Self::No)
    }

    /// `true` for every policy except [`RestartPolicy::No`].
    pub fn should_keep_running(&self) -> bool {
        matches!(
            self,
            Self::Always | Self::UnlessStopped | Self::OnFailure | Self::OnFailureWithRetries(_)
        )
    }

    /// Localised, human-readable name looked up through `rust_i18n`.
    ///
    /// All `OnFailureWithRetries` values share one key; the retry count is not
    /// interpolated.
    pub fn display_name(&self) -> String {
        match self {
            Self::No => t!("restart_policy.no"),
            Self::Always => t!("restart_policy.always"),
            Self::UnlessStopped => t!("restart_policy.unless_stopped"),
            Self::OnFailure => t!("restart_policy.on_failure"),
            Self::OnFailureWithRetries(_) => t!("restart_policy.on_failure_n"),
        }
        .to_string()
    }
}
/// The `com.docker.compose.*` labels read from a container.
///
/// Each field is `None` when the corresponding label is absent.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ComposeLabels {
    /// `com.docker.compose.project`
    pub project: Option<String>,
    /// `com.docker.compose.service`
    pub service: Option<String>,
    /// `com.docker.compose.container-number`, kept as the raw label text.
    pub container_number: Option<String>,
    /// `com.docker.compose.oneoff`, compared case-insensitively against `true`.
    pub oneoff: Option<bool>,
    /// `com.docker.compose.project.config_files`, kept as the raw label text.
    pub config_files: Option<String>,
    /// `com.docker.compose.project.working_dir`
    pub working_dir: Option<String>,
}
/// Health-check view of a single container's lifecycle state.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
pub enum ContainerStatus {
    /// The container is running.
    Running,
    /// The container is not running and is not considered finished.
    Stopped,
    /// The container is restarting or has only been created.
    Starting,
    /// A one-shot container that is treated as having finished its work.
    Completed,
    /// State could not be classified.
    Unknown,
}
impl ContainerStatus {
    /// Classifies a container from Docker's `running` flag and its status text.
    ///
    /// Rules:
    /// - A running container is [`ContainerStatus::Running`].
    /// - Status text containing "exited" (case-insensitive) is
    ///   [`ContainerStatus::Completed`] only for one-shot services whose text
    ///   contains `(0)`. Any other exit is [`ContainerStatus::Stopped`].
    /// - Text containing "restarting" or "created" is [`ContainerStatus::Starting`].
    /// - Anything else is [`ContainerStatus::Unknown`].
    pub fn from_ducker_status(running: bool, status: &str, is_oneshot: bool) -> Self {
        if running {
            return ContainerStatus::Running;
        }
        let lowered = status.to_lowercase();
        if lowered.contains("exited") {
            let exited_with_zero = status.contains("(0)");
            return if is_oneshot && exited_with_zero {
                ContainerStatus::Completed
            } else {
                ContainerStatus::Stopped
            };
        }
        let is_coming_up = lowered.contains("restarting") || lowered.contains("created");
        if is_coming_up {
            ContainerStatus::Starting
        } else {
            ContainerStatus::Unknown
        }
    }

    /// Localised label for this status, looked up through `rust_i18n`.
    pub fn display_name(&self) -> String {
        let label = match self {
            ContainerStatus::Running => t!("container_status.running"),
            ContainerStatus::Stopped => t!("container_status.stopped"),
            ContainerStatus::Starting => t!("container_status.starting"),
            ContainerStatus::Completed => t!("container_status.completed"),
            ContainerStatus::Unknown => t!("container_status.unknown"),
        };
        label.to_string()
    }

    /// `true` only for [`ContainerStatus::Running`].
    pub fn is_running(&self) -> bool {
        *self == ContainerStatus::Running
    }

    /// `true` for [`ContainerStatus::Running`] and [`ContainerStatus::Completed`].
    pub fn is_healthy(&self) -> bool {
        matches!(self, ContainerStatus::Completed | ContainerStatus::Running)
    }

    /// `true` only for [`ContainerStatus::Starting`].
    pub fn is_transitioning(&self) -> bool {
        *self == ContainerStatus::Starting
    }

    /// `true` for [`ContainerStatus::Stopped`] and [`ContainerStatus::Unknown`].
    pub fn is_failed(&self) -> bool {
        matches!(self, ContainerStatus::Unknown | ContainerStatus::Stopped)
    }
}
/// One entry of a [`HealthReport`]: a compose service and its observed state.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ContainerInfo {
    /// Display name. `HealthChecker::health_check` stores the compose service
    /// name here, not the container name.
    pub name: String,
    /// Classified lifecycle state.
    pub status: ContainerStatus,
    /// Image reference, or a localised "not started" placeholder for services
    /// without a container.
    pub image: String,
    /// Port descriptions as reported by the container manager.
    pub ports: Vec<String>,
    /// Not populated by `HealthChecker` (always `None` there).
    pub uptime: Option<String>,
    /// Docker healthcheck status; `None` when unavailable.
    pub health: Option<HealthStatusEnum>,
    /// One-shot flag captured when the entry was built.
    pub is_oneshot: bool,
    /// Parsed restart policy; `None` when unset or unrecognised.
    pub restart: Option<RestartPolicy>,
}
impl ContainerInfo {
pub fn is_oneshot(&self) -> bool {
match &self.restart {
Some(policy) => policy.is_oneshot(),
None => {
false
}
}
}
pub fn is_persistent_service(&self) -> bool {
match &self.restart {
Some(policy) => policy.should_keep_running(),
None => {
true
}
}
}
pub fn get_restart_display(&self) -> String {
match &self.restart {
Some(policy) => policy.as_str(),
None => t!("restart_policy.unknown").to_string(),
}
}
}
/// Aggregate status of all compose services, as computed by `HealthReport::finalize`.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
pub enum ServiceStatus {
    /// Every service is satisfied (healthy or one-shot).
    AllRunning,
    /// Some, but not all, services are running.
    PartiallyRunning,
    /// No container is running.
    AllStopped,
    /// At least one container is still starting.
    Starting,
    /// Status could not be determined.
    Unknown,
    /// The report contains no containers.
    NoContainer,
}
impl ServiceStatus {
pub fn display_name(&self) -> String {
match self {
ServiceStatus::AllRunning => t!("service_status.all_running").to_string(),
ServiceStatus::PartiallyRunning => t!("service_status.partially_running").to_string(),
ServiceStatus::AllStopped => t!("service_status.all_stopped").to_string(),
ServiceStatus::Starting => t!("service_status.starting").to_string(),
ServiceStatus::Unknown => t!("service_status.unknown").to_string(),
ServiceStatus::NoContainer => t!("service_status.no_container").to_string(),
}
}
pub fn is_healthy(&self) -> bool {
matches!(self, ServiceStatus::AllRunning)
}
}
/// Result of one health-check pass over the compose services.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct HealthReport {
    /// One entry per compose service.
    pub containers: Vec<ContainerInfo>,
    // Serialized counter fields. The `get_*_count` accessors do not read these;
    // they always recompute from `containers`.
    running_count: usize,
    one_shot_count: usize,
    total_count: usize,
    /// When the report was created (`Default` stamps `chrono::Utc::now()`).
    pub check_time: chrono::DateTime<chrono::Utc>,
    /// Free-form error messages collected during the check.
    pub errors: Vec<String>,
}
impl HealthReport {
    /// Whether `container` needs no further waiting.
    ///
    /// A container qualifies if it is one-shot or Docker reports its
    /// healthcheck as `HEALTHY`. Testing both conditions per container means a
    /// one-shot container with a passing healthcheck is counted once, not twice.
    fn is_settled(container: &ContainerInfo) -> bool {
        container.is_oneshot() || container.health == Some(HealthStatusEnum::HEALTHY)
    }

    /// Appends a container and updates the serialized counters.
    ///
    /// Keeps `running_count`, `one_shot_count` and `total_count` in step with
    /// `containers`. Pushing directly onto the public `containers` vector
    /// bypasses these counters; the `get_*_count` accessors always recompute
    /// from `containers` and are unaffected.
    pub fn add_container(&mut self, container: ContainerInfo) {
        if container.status.is_running() {
            self.running_count += 1;
        }
        if container.is_oneshot() {
            self.one_shot_count += 1;
        }
        self.containers.push(container);
        self.total_count = self.containers.len();
    }

    /// Records a free-form error message.
    pub fn add_error(&mut self, error: String) {
        self.errors.push(error);
    }

    /// Computes the aggregate [`ServiceStatus`].
    ///
    /// Checks are applied in this order:
    /// - No containers → `NoContainer`.
    /// - Every container is one-shot or `HEALTHY` → `AllRunning`.
    /// - No container is running → `AllStopped`.
    /// - Any container is still starting → `Starting`.
    /// - Otherwise → `PartiallyRunning`.
    pub fn finalize(&self) -> ServiceStatus {
        if self.containers.is_empty() {
            return ServiceStatus::NoContainer;
        }
        // Per-container check instead of `healthy + one_shot == total`, which
        // double-counted one-shot containers that also report HEALTHY.
        if self.containers.iter().all(Self::is_settled) {
            return ServiceStatus::AllRunning;
        }
        if self.get_running_count() == 0 {
            return ServiceStatus::AllStopped;
        }
        if self.containers.iter().any(|c| c.status.is_transitioning()) {
            ServiceStatus::Starting
        } else {
            ServiceStatus::PartiallyRunning
        }
    }

    /// Containers whose status is [`ContainerStatus::Running`].
    pub fn get_running_containers(&self) -> Vec<&ContainerInfo> {
        self.containers
            .iter()
            .filter(|c| matches!(c.status, ContainerStatus::Running))
            .collect()
    }

    /// Containers whose status is [`ContainerStatus::Completed`].
    pub fn get_completed_containers(&self) -> Vec<&ContainerInfo> {
        self.containers
            .iter()
            .filter(|c| matches!(c.status, ContainerStatus::Completed))
            .collect()
    }

    /// Containers whose status is `Stopped` or `Unknown`.
    pub fn get_failed_containers(&self) -> Vec<&ContainerInfo> {
        self.containers
            .iter()
            .filter(|c| c.status.is_failed())
            .collect()
    }

    /// Number of containers whose status is `Running`.
    pub fn get_running_count(&self) -> usize {
        self.containers
            .iter()
            .filter(|c| c.status.is_running())
            .count()
    }

    /// Number of containers in the report.
    pub fn get_total_count(&self) -> usize {
        self.containers.len()
    }

    /// Containers whose status is `Starting`.
    pub fn get_starting_containers(&self) -> Vec<&ContainerInfo> {
        self.containers
            .iter()
            .filter(|c| c.status.is_transitioning())
            .collect()
    }

    /// Number of containers whose restart policy marks them one-shot.
    pub fn get_one_shot_count(&self) -> usize {
        self.containers.iter().filter(|c| c.is_oneshot()).count()
    }

    /// Number of containers whose Docker healthcheck reports `HEALTHY`.
    pub fn get_healthy_count(&self) -> usize {
        self.containers
            .iter()
            .filter_map(|c| c.health)
            .filter(|&c| c == HealthStatusEnum::HEALTHY)
            .count()
    }

    /// Names of the containers returned by [`Self::get_failed_containers`].
    pub fn get_failed_container_names(&self) -> Vec<String> {
        self.get_failed_containers()
            .iter()
            .map(|c| c.name.clone())
            .collect()
    }

    /// One-line human-readable summary of the report counts.
    pub fn get_status_summary(&self) -> String {
        let failed_containers = self.get_failed_container_names();
        let starting_containers: Vec<String> = self
            .get_starting_containers()
            .iter()
            .map(|c| c.name.clone())
            .collect();
        let mut summary = format!(
            "📊 [Healthy: {}/{}] ✅ Running: {} | ✔️ One-shot (init): {} | ❌ Failed: {} | ⏳ Starting: {}",
            self.get_healthy_count(),
            self.get_total_count(),
            self.get_running_count(),
            self.get_one_shot_count(),
            failed_containers.len(),
            starting_containers.len()
        );
        if !failed_containers.is_empty() {
            summary.push_str(&format!(
                " | Failed containers: {}",
                failed_containers.join(", ")
            ));
        }
        if !starting_containers.is_empty() {
            summary.push_str(&format!(" | Starting: {}", starting_containers.join(", ")));
        }
        summary
    }

    /// Whether every non-one-shot container reports `HEALTHY`.
    ///
    /// Requires at least one such container. Only non-one-shot containers are
    /// considered, so a one-shot container with its own passing healthcheck
    /// can no longer make the comparison fail by being counted twice.
    pub fn is_all_healthy(&self) -> bool {
        let (persistent, persistent_healthy) = self
            .containers
            .iter()
            .filter(|c| !c.is_oneshot())
            .fold((0usize, 0usize), |(total, healthy), c| {
                let is_healthy = c.health == Some(HealthStatusEnum::HEALTHY);
                (total + 1, healthy + usize::from(is_healthy))
            });
        persistent_healthy > 0 && persistent_healthy == persistent
    }

    /// Containers whose status is `Running` or `Completed`.
    pub fn healthy_containers(&self) -> Vec<&ContainerInfo> {
        self.containers
            .iter()
            .filter(|c| c.status.is_healthy())
            .collect()
    }

    /// Number of containers in the report.
    pub fn total_containers(&self) -> usize {
        self.containers.len()
    }

    /// Alias for [`Self::get_failed_containers`].
    pub fn failed_containers(&self) -> Vec<&ContainerInfo> {
        self.get_failed_containers()
    }
}
impl Default for HealthReport {
    /// An empty report stamped with the current UTC time.
    fn default() -> Self {
        let check_time = chrono::Utc::now();
        Self {
            check_time,
            containers: Vec::new(),
            errors: Vec::new(),
            total_count: 0,
            running_count: 0,
            one_shot_count: 0,
        }
    }
}
/// Inspects Docker containers and reports on the compose services known to a
/// [`DockerManager`].
pub struct HealthChecker {
    /// Source of compose metadata: project name, compose file and service list.
    docker_manager: Arc<DockerManager>,
}
impl HealthChecker {
pub fn new(docker_manager: Arc<DockerManager>) -> Self {
Self { docker_manager }
}
async fn get_restart_policy(&self, service_name: &str) -> Option<RestartPolicy> {
if let Ok(service_config) = self.docker_manager.parse_service_config(service_name).await
&& let Some(restart_str) = service_config.restart
{
return RestartPolicy::parse(&restart_str);
}
None
}
pub async fn health_check(&self) -> DockerServiceResult<HealthReport> {
info!("🏥 Starting health check...");
let compose_project_name = self.docker_manager.get_compose_project_name();
let compose_file_path = self
.docker_manager
.get_compose_file()
.to_string_lossy()
.to_string();
info!("📋 Docker Compose Project Info:");
info!(" - Project name: {name}", name = compose_project_name);
info!(" - Config file: {path}", path = compose_file_path);
let mut report = HealthReport::default();
let compose_services = self
.docker_manager
.get_compose_service_names()
.await
.unwrap_or_else(|e| {
error!(
"Failed to get compose services: {error}",
error = e.to_string()
);
HashSet::new()
});
if compose_services.is_empty() {
warn!("⚠️ No services defined in compose file");
return Ok(report);
}
info!(
"🔍 Services defined in compose file: {services}",
services = format!("{:?}", compose_services)
);
let all_containers = self
.docker_manager
.get_all_containers_status()
.await
.unwrap_or_else(|e| {
error!(
"Failed to get container status: {error}",
error = e.to_string()
);
Vec::new()
});
info!(
"📊 Found {count} containers in system",
count = all_containers.len()
);
let mut found_services = HashSet::new();
let mut added_containers = HashSet::new();
for service in &all_containers {
if let Some(service_name) = self.get_container_service_name(&service.name).await {
if self
.is_container_from_compose_project(
&service.name,
&compose_project_name,
&compose_file_path,
)
.await
{
if compose_services.contains(&service_name) {
info!(
"✅ Matched compose service: {container} -> {service}",
container = service.name,
service = service_name
);
if added_containers.contains(&service_name) {
warn!(
"⚠️ Skipping duplicate compose service: {service} (container: {container})",
service = service_name,
container = service.name
);
continue;
}
found_services.insert(service_name.clone());
added_containers.insert(service_name.clone());
let is_oneshot = self.is_oneshot_service(&service_name).await;
let restart_policy = self.get_restart_policy(&service_name).await;
let status = self.determine_container_status(service, is_oneshot);
let health = self.get_container_health_status(&service.name).await;
let container = ContainerInfo {
name: service_name.clone(), status,
image: service.image.clone(),
ports: service.ports.clone(),
uptime: None,
health,
is_oneshot,
restart: restart_policy,
};
debug!(
"📦 Added container: {} (status: {:?}, oneshot: {})",
container.name, container.status, is_oneshot
);
report.add_container(container);
} else {
warn!(
"⏭️ Skipping non-project container: {container} (service: {service})",
container = service.name,
service = service_name
);
}
} else {
debug!(
"⏭️ Skipping other project container: {container} (project: other)",
container = service.name
);
}
} else {
debug!(
"⏭️ Skipping non-compose container: {container} (no label info)",
container = service.name
);
}
}
info!(
"📊 Round 1 complete: added {count} containers",
count = added_containers.len()
);
for service_name in &compose_services {
if !found_services.contains(service_name) {
if added_containers.contains(service_name) {
warn!(
"⚠️ Skipping duplicate stopped service: {service}",
service = service_name
);
continue;
}
let is_oneshot = self.is_oneshot_service(service_name).await;
let restart_policy = self.get_restart_policy(service_name).await;
let status = if is_oneshot {
ContainerStatus::Completed
} else {
ContainerStatus::Stopped
};
let container = ContainerInfo {
name: service_name.clone(),
status,
image: t!("health_check.not_started_label").to_string(),
ports: Vec::new(),
uptime: None,
health: None,
is_oneshot,
restart: restart_policy,
};
info!(
"📦 Adding stopped service: {name} (status: {status}, oneshot: {oneshot})",
name = container.name,
status = format!("{:?}", container.status),
oneshot = is_oneshot
);
report.add_container(container);
added_containers.insert(service_name.clone());
}
}
info!(
"📊 Final stats: compose services={compose}, added containers={containers}",
compose = compose_services.len(),
containers = added_containers.len()
);
let summary = format!(
"{}: {}/{}",
t!("health_check.complete"),
report.get_healthy_count(),
report.get_total_count()
);
info!("🎯 {summary}", summary = summary);
Ok(report)
}
fn determine_container_status(
&self,
service: &client_core::container::ServiceInfo,
is_oneshot: bool,
) -> ContainerStatus {
match service.status {
client_core::container::ServiceStatus::Running => ContainerStatus::Running,
client_core::container::ServiceStatus::Stopped => {
if is_oneshot {
ContainerStatus::Completed
} else {
ContainerStatus::Stopped
}
}
client_core::container::ServiceStatus::Unknown => ContainerStatus::Unknown,
client_core::container::ServiceStatus::Created => ContainerStatus::Unknown,
client_core::container::ServiceStatus::Restarting => ContainerStatus::Starting,
}
}
async fn is_oneshot_service(&self, service_name: &str) -> bool {
if let Ok(service_config) = self.docker_manager.parse_service_config(service_name).await
&& let Some(restart_policy) = service_config.restart
{
if restart_policy == "no" || restart_policy == "false" {
info!(
"Service {service} restart policy: {policy} (oneshot)",
service = service_name,
policy = restart_policy
);
return true;
}
if restart_policy == "always"
|| restart_policy == "unless-stopped"
|| restart_policy == "on-failure"
{
info!(
"Service {service} restart policy: {policy} (persistent)",
service = service_name,
policy = restart_policy
);
return false;
}
}
false
}
async fn get_container_labels(&self, container_name: &str) -> Option<ComposeLabels> {
match Docker::connect_with_socket_defaults() {
Ok(docker) => {
match docker
.list_containers(Some(ListContainersOptions {
all: true,
..Default::default()
}))
.await
{
Ok(containers) => {
for container in containers {
if let Some(names) = &container.names {
let container_matches = names.iter().any(|name| {
let clean_name = name.strip_prefix('/').unwrap_or(name);
clean_name == container_name
});
if container_matches {
if let Some(labels) = &container.labels {
return Some(ComposeLabels {
project: labels
.get("com.docker.compose.project")
.cloned(),
service: labels
.get("com.docker.compose.service")
.cloned(),
container_number: labels
.get("com.docker.compose.container-number")
.cloned(),
oneoff: labels
.get("com.docker.compose.oneoff")
.and_then(|v| v.parse::<bool>().ok())
.or_else(|| {
labels
.get("com.docker.compose.oneoff")
.map(|v| v.to_lowercase() == "true")
}),
config_files: labels
.get("com.docker.compose.project.config_files")
.cloned(),
working_dir: labels
.get("com.docker.compose.project.working_dir")
.cloned(),
});
}
return None; }
}
}
None }
Err(e) => {
warn!(
"Bollard failed to get container list: {error}",
error = e.to_string()
);
None
}
}
}
Err(e) => {
warn!(
"Bollard failed to connect to Docker: {error}",
error = e.to_string()
);
None
}
}
}
async fn is_container_from_compose_project(
&self,
container_name: &str,
project_name: &str,
compose_file_path: &str,
) -> bool {
if let Some(labels) = self.get_container_labels(container_name).await {
if let Some(label_project) = &labels.project {
if label_project != project_name {
info!(
"⚠️ Container {container} project mismatch: {label} vs {expected}",
container = container_name,
label = label_project,
expected = project_name
);
return false;
}
} else {
info!(
"⚠️ Container {container} missing project label",
container = container_name
);
return false;
}
if let Some(label_config_files) = &labels.config_files {
let compose_file_absolute =
match std::path::Path::new(compose_file_path).canonicalize() {
Ok(abs_path) => abs_path.to_string_lossy().to_string(),
Err(_) => {
let current_dir = std::env::current_dir().unwrap_or_default();
let full_path = current_dir.join(compose_file_path);
full_path.to_string_lossy().to_string()
}
};
debug!(
"🔍 Path comparison: container label path={}, local absolute path={}",
label_config_files, compose_file_absolute
);
#[cfg(windows)]
fn normalize_win_path(path: &str) -> &str {
if path.starts_with(r"\\?\") {
&path[4..]
} else {
path
}
}
#[cfg(windows)]
let matched = normalize_win_path(label_config_files)
.eq_ignore_ascii_case(normalize_win_path(&compose_file_absolute));
#[cfg(not(windows))]
let matched = label_config_files == &compose_file_absolute;
if matched {
debug!(
"✅ Container {container} config path matched",
container = container_name
);
return true;
} else {
debug!(
"❌ Container {container} config path mismatch: {label} vs {expected}",
container = container_name,
label = label_config_files,
expected = compose_file_absolute
);
return false;
}
}
info!(
"⚠️ Container {container} missing config path, but project name matched",
container = container_name
);
true
} else {
info!(
"⚠️ Container {container} cannot get compose labels",
container = container_name
);
false
}
}
async fn get_container_service_name(&self, container_name: &str) -> Option<String> {
self.get_container_labels(container_name)
.await
.and_then(|labels| labels.service)
}
async fn get_container_health_status(&self, container_name: &str) -> Option<HealthStatusEnum> {
match Docker::connect_with_socket_defaults() {
Ok(docker) => {
match docker
.inspect_container(container_name, None::<InspectContainerOptions>)
.await
{
Ok(container_info) => container_info
.state
.and_then(|state| state.health.and_then(|health| health.status)),
Err(e) => {
warn!(
"Cannot get health status for container {container}: {error}",
container = container_name,
error = e.to_string()
);
None
}
}
}
Err(e) => {
warn!(
"Cannot connect to Docker for health check: {error}",
error = e.to_string()
);
None
}
}
}
pub async fn wait_for_services_ready(
&self,
check_interval: Duration,
) -> DockerServiceResult<HealthReport> {
use std::time::Instant;
let timeout = Duration::from_secs(timeout::HEALTH_CHECK_TIMEOUT);
let start_time = Instant::now();
info!(
"⏳ Starting service startup check, timeout: {timeout}s",
timeout = timeout.as_secs()
);
loop {
let elapsed = start_time.elapsed();
if elapsed >= timeout {
error!(
"⏰ Health check timeout! elapsed: {elapsed}s",
elapsed = elapsed.as_secs()
);
return Err(DockerServiceError::Timeout {
operation: t!("health_check.wait_operation").to_string(),
timeout_seconds: timeout.as_secs(),
});
}
let report = self.health_check().await?;
if report.is_all_healthy() {
info!(
"🎉 All services started! elapsed: {elapsed}s",
elapsed = elapsed.as_secs()
);
return Ok(report);
} else {
info!(
"⏳ Services starting... elapsed: {elapsed}s",
elapsed = elapsed.as_secs()
);
let failed_containers = report.failed_containers();
if !failed_containers.is_empty() {
let failed_names: Vec<&str> =
failed_containers.iter().map(|c| c.name.as_str()).collect();
info!(
"⚠️ Not started containers: {names}",
names = format!("{:?}", failed_names)
);
}
}
tokio::time::sleep(check_interval).await;
}
}
pub async fn get_status_summary(&self) -> DockerServiceResult<String> {
let report = self.health_check().await?;
let mut summary = format!(
"{}: {} ({}/{})",
t!("health_check.service_status"),
t!("health_check.healthy"),
report.healthy_containers().len(),
report.total_containers()
);
if !report.errors.is_empty() {
summary.push_str(&format!(
"\n{}: {}",
t!("health_check.errors"),
report.errors.join(", ")
));
}
let failed_containers = report.failed_containers();
if !failed_containers.is_empty() {
let failed_names: Vec<&str> =
failed_containers.iter().map(|c| c.name.as_str()).collect();
summary.push_str(&format!(
"\n{}: {:?}",
t!("health_check.failed_containers"),
failed_names
));
}
Ok(summary)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a non-one-shot fixture with no Docker health information.
    fn fixture(
        name: &str,
        status: ContainerStatus,
        image: &str,
        ports: Vec<String>,
        restart: RestartPolicy,
    ) -> ContainerInfo {
        ContainerInfo {
            name: name.to_string(),
            status,
            image: image.to_string(),
            ports,
            uptime: None,
            health: None,
            is_oneshot: false,
            restart: Some(restart),
        }
    }

    #[test]
    fn test_health_report() {
        let mut report = HealthReport::default();
        report.add_container(fixture(
            "service1",
            ContainerStatus::Running,
            "test:latest",
            vec!["8080:8080".to_string()],
            RestartPolicy::UnlessStopped,
        ));
        report.add_container(fixture(
            "service2",
            ContainerStatus::Starting,
            "test2:latest",
            Vec::new(),
            RestartPolicy::Always,
        ));

        assert_eq!(report.finalize(), ServiceStatus::Starting);
        assert_eq!(report.get_running_count(), 1);
        assert_eq!(report.get_total_count(), 2);
    }
}