use bollard::query_parameters::InspectContainerOptions;
use tokio::time;
use crate::{
client::Client,
docker::{DockerError, DockerInspectContainer},
models::{ContainerHealthStatus, WatchOptions},
};
#[derive(Debug, thiserror::Error)]
pub enum WatchDeploymentError {
#[error("Failed to inspect container: {0}")]
ContainerInspect(#[from] DockerError),
#[error("Timeout while waiting for container {deployment_name} to become healthy")]
Timeout { deployment_name: String },
#[error("Deployment {deployment_name} is not healthy [status: {status}]")]
UnhealthyDeployment {
deployment_name: String,
status: ContainerHealthStatus,
},
}
impl<D: DockerInspectContainer> Client<D> {
pub async fn wait_for_healthy_deployment(
&self,
deployment_name: &str,
options: WatchOptions,
) -> Result<(), WatchDeploymentError> {
let timeout_duration = options
.timeout_duration
.unwrap_or(time::Duration::from_secs(60) * 10);
time::timeout(
timeout_duration,
self.wait_for_healthy_deployment_inner(deployment_name, options),
)
.await
.map_err(|_| WatchDeploymentError::Timeout {
deployment_name: deployment_name.to_string(),
})?
}
async fn wait_for_healthy_deployment_inner(
&self,
deployment_name: &str,
options: WatchOptions,
) -> Result<(), WatchDeploymentError> {
loop {
let mut status: ContainerHealthStatus = self
.docker
.inspect_container(deployment_name, None::<InspectContainerOptions>)
.await
.map_err(WatchDeploymentError::ContainerInspect)?
.state
.and_then(|s| s.health)
.and_then(|h| h.status)
.map(ContainerHealthStatus::from)
.ok_or_else(|| WatchDeploymentError::UnhealthyDeployment {
deployment_name: deployment_name.to_string(),
status: ContainerHealthStatus::None,
})?;
if options.allow_unhealthy_initial_state && status == ContainerHealthStatus::Unhealthy {
status = ContainerHealthStatus::Starting;
}
match status {
ContainerHealthStatus::Healthy => return Ok(()),
ContainerHealthStatus::Starting => {
time::sleep(std::time::Duration::from_secs(1)).await;
}
ContainerHealthStatus::None
| ContainerHealthStatus::Empty
| ContainerHealthStatus::Unhealthy => {
return Err(WatchDeploymentError::UnhealthyDeployment {
deployment_name: deployment_name.to_string(),
status,
});
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::docker::DockerError;
use bollard::models::{
ContainerConfig, ContainerInspectResponse, ContainerState, ContainerStateStatusEnum,
HealthStatusEnum,
};
use maplit::hashmap;
use mockall::mock;
use pretty_assertions::assert_eq;
mock! {
Docker {}
impl DockerInspectContainer for Docker {
async fn inspect_container(
&self,
container_id: &str,
options: Option<InspectContainerOptions>,
) -> Result<ContainerInspectResponse, DockerError>;
}
}
fn create_test_container_inspect_response() -> ContainerInspectResponse {
ContainerInspectResponse {
id: Some("test_container_id".to_string()),
name: Some("/test-deployment".to_string()),
config: Some(ContainerConfig {
labels: Some(hashmap! {
"mongodb-atlas-local".to_string() => "container".to_string(),
"version".to_string() => "8.0.0".to_string(),
"mongodb-type".to_string() => "community".to_string(),
}),
env: Some(vec!["TOOL=ATLASCLI".to_string()]),
..Default::default()
}),
state: Some(ContainerState {
status: Some(ContainerStateStatusEnum::RUNNING),
health: Some(bollard::models::Health {
status: Some(HealthStatusEnum::HEALTHY),
..Default::default()
}),
..Default::default()
}),
..Default::default()
}
}
fn create_test_container_inspect_response_unhealthy() -> ContainerInspectResponse {
ContainerInspectResponse {
id: Some("test_container_id".to_string()),
name: Some("/test-deployment".to_string()),
config: Some(ContainerConfig {
labels: Some(hashmap! {
"mongodb-atlas-local".to_string() => "container".to_string(),
"version".to_string() => "8.0.0".to_string(),
"mongodb-type".to_string() => "community".to_string(),
}),
env: Some(vec!["TOOL=ATLASCLI".to_string()]),
..Default::default()
}),
state: Some(ContainerState {
health: Some(bollard::models::Health {
status: Some(HealthStatusEnum::UNHEALTHY),
..Default::default()
}),
..Default::default()
}),
..Default::default()
}
}
fn create_test_container_inspect_response_starting() -> ContainerInspectResponse {
ContainerInspectResponse {
id: Some("test_container_id".to_string()),
name: Some("/test-deployment".to_string()),
config: Some(ContainerConfig {
labels: Some(hashmap! {
"mongodb-atlas-local".to_string() => "container".to_string(),
"version".to_string() => "8.0.0".to_string(),
"mongodb-type".to_string() => "community".to_string(),
}),
env: Some(vec!["TOOL=ATLASCLI".to_string()]),
..Default::default()
}),
state: Some(ContainerState {
health: Some(bollard::models::Health {
status: Some(HealthStatusEnum::STARTING),
..Default::default()
}),
..Default::default()
}),
..Default::default()
}
}
fn create_test_container_inspect_response_no_state() -> ContainerInspectResponse {
ContainerInspectResponse {
id: Some("test_container_id".to_string()),
name: Some("/test-deployment".to_string()),
config: Some(ContainerConfig {
labels: Some(hashmap! {
"mongodb-atlas-local".to_string() => "container".to_string(),
"version".to_string() => "8.0.0".to_string(),
"mongodb-type".to_string() => "community".to_string(),
}),
env: Some(vec!["TOOL=ATLASCLI".to_string()]),
..Default::default()
}),
state: None,
..Default::default()
}
}
fn create_test_container_inspect_response_no_health() -> ContainerInspectResponse {
ContainerInspectResponse {
id: Some("test_container_id".to_string()),
name: Some("/test-deployment".to_string()),
config: Some(ContainerConfig {
labels: Some(hashmap! {
"mongodb-atlas-local".to_string() => "container".to_string(),
"version".to_string() => "8.0.0".to_string(),
"mongodb-type".to_string() => "community".to_string(),
}),
env: Some(vec!["TOOL=ATLASCLI".to_string()]),
..Default::default()
}),
state: Some(ContainerState {
health: None,
..Default::default()
}),
..Default::default()
}
}
#[tokio::test]
async fn test_wait_for_healthy_deployment() {
let mut mock_docker = MockDocker::new();
let options = WatchOptions::builder().build();
mock_docker
.expect_inspect_container()
.with(
mockall::predicate::eq("test-deployment"),
mockall::predicate::eq(None::<InspectContainerOptions>),
)
.times(1)
.returning(|_, _| Ok(create_test_container_inspect_response()));
let client = Client::new(mock_docker);
let result = client
.wait_for_healthy_deployment("test-deployment", options)
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_wait_for_healthy_deployment_unhealthy() {
let mut mock_docker = MockDocker::new();
let options = WatchOptions::builder().build();
mock_docker
.expect_inspect_container()
.with(
mockall::predicate::eq("test-deployment"),
mockall::predicate::eq(None::<InspectContainerOptions>),
)
.times(1)
.returning(|_, _| Ok(create_test_container_inspect_response_unhealthy()));
let client = Client::new(mock_docker);
let result = client
.wait_for_healthy_deployment("test-deployment", options)
.await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
WatchDeploymentError::UnhealthyDeployment { .. }
));
}
#[tokio::test]
async fn test_wait_for_healthy_deployment_retries() {
let mut mock_docker = MockDocker::new();
let options = WatchOptions::builder().build();
mock_docker
.expect_inspect_container()
.with(
mockall::predicate::eq("test-deployment"),
mockall::predicate::eq(None::<InspectContainerOptions>),
)
.times(1)
.returning(|_, _| Ok(create_test_container_inspect_response_starting()));
mock_docker
.expect_inspect_container()
.with(
mockall::predicate::eq("test-deployment"),
mockall::predicate::eq(None::<InspectContainerOptions>),
)
.times(1)
.returning(|_, _| Ok(create_test_container_inspect_response()));
let client = Client::new(mock_docker);
let result = client
.wait_for_healthy_deployment("test-deployment", options)
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_wait_for_healthy_deployment_timeout() {
let mut mock_docker = MockDocker::new();
let options = WatchOptions::builder()
.timeout_duration(time::Duration::from_millis(100))
.build();
mock_docker
.expect_inspect_container()
.with(
mockall::predicate::eq("test-deployment"),
mockall::predicate::eq(None::<InspectContainerOptions>),
)
.returning(|_, _| Ok(create_test_container_inspect_response_starting()));
let client = Client::new(mock_docker);
let result = client
.wait_for_healthy_deployment("test-deployment", options)
.await;
assert!(result.is_err());
match result.unwrap_err() {
WatchDeploymentError::Timeout { deployment_name } => {
assert_eq!(deployment_name, "test-deployment");
}
_ => panic!("Expected Timeout error"),
}
}
#[tokio::test]
async fn test_wait_for_healthy_deployment_no_state() {
let mut mock_docker = MockDocker::new();
let options = WatchOptions::builder().build();
mock_docker
.expect_inspect_container()
.with(
mockall::predicate::eq("test-deployment"),
mockall::predicate::eq(None::<InspectContainerOptions>),
)
.times(1)
.returning(|_, _| Ok(create_test_container_inspect_response_no_state()));
let client = Client::new(mock_docker);
let result = client
.wait_for_healthy_deployment("test-deployment", options)
.await;
assert!(result.is_err());
match result.unwrap_err() {
WatchDeploymentError::UnhealthyDeployment {
deployment_name,
status,
} => {
assert_eq!(deployment_name, "test-deployment");
assert_eq!(status, ContainerHealthStatus::None);
}
_ => panic!("Expected UnhealthyDeployment error"),
}
}
#[tokio::test]
async fn test_wait_for_healthy_deployment_no_health() {
let mut mock_docker = MockDocker::new();
let options = WatchOptions::builder().build();
mock_docker
.expect_inspect_container()
.with(
mockall::predicate::eq("test-deployment"),
mockall::predicate::eq(None::<InspectContainerOptions>),
)
.times(1)
.returning(|_, _| Ok(create_test_container_inspect_response_no_health()));
let client = Client::new(mock_docker);
let result = client
.wait_for_healthy_deployment("test-deployment", options)
.await;
assert!(result.is_err());
match result.unwrap_err() {
WatchDeploymentError::UnhealthyDeployment {
deployment_name,
status,
} => {
assert_eq!(deployment_name, "test-deployment");
assert_eq!(status, ContainerHealthStatus::None);
}
_ => panic!("Expected UnhealthyDeployment error"),
}
}
#[tokio::test]
async fn test_wait_for_healthy_deployment_container_inspect_error() {
let mut mock_docker = MockDocker::new();
let options = WatchOptions::builder().build();
mock_docker
.expect_inspect_container()
.with(
mockall::predicate::eq("test-deployment"),
mockall::predicate::eq(None::<InspectContainerOptions>),
)
.times(1)
.returning(|_, _| Err(DockerError::NotFound));
let client = Client::new(mock_docker);
let result = client
.wait_for_healthy_deployment("test-deployment", options)
.await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
WatchDeploymentError::ContainerInspect(_)
));
}
}