Skip to main content

atlas_local/client/
watch_deployment.rs

1use bollard::query_parameters::InspectContainerOptions;
2use bollard::secret::HealthStatusEnum;
3use tokio::time;
4
5use crate::{client::Client, docker::DockerInspectContainer, models::WatchOptions};
6
7#[derive(Debug, thiserror::Error)]
8pub enum WatchDeploymentError {
9    #[error("Failed to inspect container: {0}")]
10    ContainerInspect(#[from] bollard::errors::Error),
11    #[error("Timeout while waiting for container {deployment_name} to become healthy")]
12    Timeout { deployment_name: String },
13    #[error("Deployment {deployment_name} is not healthy [status: {status}]")]
14    UnhealthyDeployment {
15        deployment_name: String,
16        status: HealthStatusEnum,
17    },
18}
19
20impl<D: DockerInspectContainer> Client<D> {
21    /// Waits for a deployment to become healthy.
22    ///
23    /// This method polls the container's health status until it becomes healthy,
24    /// or until the timeout specified in the options is reached.
25    ///
26    /// # Arguments
27    ///
28    /// * `cluster_name` - The name or ID of the container to watch
29    /// * `options` - Configuration options including timeout duration
30    ///
31    /// # Returns
32    ///
33    /// Returns `Ok(())` when the container becomes healthy, or an error if:
34    /// - The container inspection fails
35    /// - The container becomes unhealthy
36    /// - The timeout is reached
37    ///
38    /// # Examples
39    ///
40    /// ```
41    /// use atlas_local::models::WatchOptions;
42    /// use std::time::Duration;
43    ///
44    /// # async fn example(client: atlas_local::Client) -> Result<(), Box<dyn std::error::Error>> {
45    /// let options = WatchOptions::builder()
46    ///     .timeout_duration(Duration::from_secs(300))
47    ///     .build();
48    ///
49    /// client.wait_for_healthy_deployment("my-deployment", options).await?;
50    /// # Ok(())
51    /// # }
52    /// ```
53    pub async fn wait_for_healthy_deployment(
54        &self,
55        deployment_name: &str,
56        options: WatchOptions,
57    ) -> Result<(), WatchDeploymentError> {
58        let timeout_duration = options
59            .timeout_duration
60            .unwrap_or(time::Duration::from_secs(60) * 10);
61        time::timeout(
62            timeout_duration,
63            self.wait_for_healthy_deployment_inner(deployment_name, options),
64        )
65        .await
66        .map_err(|_| WatchDeploymentError::Timeout {
67            deployment_name: deployment_name.to_string(),
68        })?
69    }
70
71    async fn wait_for_healthy_deployment_inner(
72        &self,
73        deployment_name: &str,
74        options: WatchOptions,
75    ) -> Result<(), WatchDeploymentError> {
76        // Loop until the container is healthy
77        loop {
78            let mut status = self
79                .docker
80                .inspect_container(deployment_name, None::<InspectContainerOptions>)
81                .await
82                .map_err(WatchDeploymentError::ContainerInspect)?
83                .state
84                .and_then(|s| s.health)
85                .and_then(|h| h.status)
86                .ok_or_else(|| WatchDeploymentError::UnhealthyDeployment {
87                    deployment_name: deployment_name.to_string(),
88                    status: HealthStatusEnum::NONE,
89                })?;
90
91            // If allow_unhealthy_initial_state is set then we handle it as a starting state
92            if options.allow_unhealthy_initial_state && status == HealthStatusEnum::UNHEALTHY {
93                status = HealthStatusEnum::STARTING;
94            }
95
96            match status {
97                HealthStatusEnum::HEALTHY => return Ok(()),
98                HealthStatusEnum::STARTING => {
99                    time::sleep(std::time::Duration::from_secs(1)).await;
100                }
101                HealthStatusEnum::NONE | HealthStatusEnum::EMPTY | HealthStatusEnum::UNHEALTHY => {
102                    return Err(WatchDeploymentError::UnhealthyDeployment {
103                        deployment_name: deployment_name.to_string(),
104                        status,
105                    });
106                }
107            }
108        }
109    }
110}
111
#[cfg(test)]
mod tests {
    use super::*;
    use bollard::{
        errors::Error as BollardError,
        secret::{
            ContainerConfig, ContainerInspectResponse, ContainerState, ContainerStateStatusEnum,
            Health, HealthStatusEnum,
        },
    };
    use maplit::hashmap;
    use mockall::{mock, predicate::eq};
    use pretty_assertions::assert_eq;

    /// Container name every test asks the client to watch.
    const DEPLOYMENT_NAME: &str = "test-deployment";

    mock! {
        Docker {}

        impl DockerInspectContainer for Docker {
            async fn inspect_container(
                &self,
                container_id: &str,
                options: Option<InspectContainerOptions>,
            ) -> Result<ContainerInspectResponse, BollardError>;
        }
    }

    /// Builds the inspect response shared by all fixtures; only `state` varies.
    fn inspect_response_with_state(state: Option<ContainerState>) -> ContainerInspectResponse {
        ContainerInspectResponse {
            id: Some("test_container_id".to_string()),
            name: Some("/test-deployment".to_string()),
            config: Some(ContainerConfig {
                labels: Some(hashmap! {
                    "mongodb-atlas-local".to_string() => "container".to_string(),
                    "version".to_string() => "8.0.0".to_string(),
                    "mongodb-type".to_string() => "community".to_string(),
                }),
                env: Some(vec!["TOOL=ATLASCLI".to_string()]),
                ..Default::default()
            }),
            state,
            ..Default::default()
        }
    }

    /// Builds a container state whose health section reports `status`.
    fn state_with_health(status: HealthStatusEnum) -> ContainerState {
        ContainerState {
            health: Some(Health {
                status: Some(status),
                ..Default::default()
            }),
            ..Default::default()
        }
    }

    /// Running container that reports a healthy status.
    fn create_test_container_inspect_response() -> ContainerInspectResponse {
        inspect_response_with_state(Some(ContainerState {
            status: Some(ContainerStateStatusEnum::RUNNING),
            ..state_with_health(HealthStatusEnum::HEALTHY)
        }))
    }

    /// Container that reports an unhealthy status.
    fn create_test_container_inspect_response_unhealthy() -> ContainerInspectResponse {
        inspect_response_with_state(Some(state_with_health(HealthStatusEnum::UNHEALTHY)))
    }

    /// Container that reports a starting status.
    fn create_test_container_inspect_response_starting() -> ContainerInspectResponse {
        inspect_response_with_state(Some(state_with_health(HealthStatusEnum::STARTING)))
    }

    /// Container whose inspect response carries no state at all.
    fn create_test_container_inspect_response_no_state() -> ContainerInspectResponse {
        inspect_response_with_state(None)
    }

    /// Container whose state carries no health section.
    fn create_test_container_inspect_response_no_health() -> ContainerInspectResponse {
        inspect_response_with_state(Some(ContainerState {
            health: None,
            ..Default::default()
        }))
    }

    /// Registers an `inspect_container` expectation for [`DEPLOYMENT_NAME`].
    ///
    /// `times` pins the exact number of calls; `None` leaves the call count
    /// unconstrained. Expectations are consumed in registration order.
    fn expect_inspect<F>(mock_docker: &mut MockDocker, times: Option<usize>, mut response: F)
    where
        F: FnMut() -> Result<ContainerInspectResponse, BollardError> + Send + 'static,
    {
        let expectation = mock_docker
            .expect_inspect_container()
            .with(eq(DEPLOYMENT_NAME), eq(None::<InspectContainerOptions>))
            .returning(move |_, _| response());

        if let Some(count) = times {
            expectation.times(count);
        }
    }

    /// Runs the method under test against `mock_docker` for [`DEPLOYMENT_NAME`].
    async fn watch(
        mock_docker: MockDocker,
        options: WatchOptions,
    ) -> Result<(), WatchDeploymentError> {
        Client::new(mock_docker)
            .wait_for_healthy_deployment(DEPLOYMENT_NAME, options)
            .await
    }

    /// Asserts that `result` is an `UnhealthyDeployment` error carrying `expected_status`.
    fn assert_unhealthy_with_status(
        result: Result<(), WatchDeploymentError>,
        expected_status: HealthStatusEnum,
    ) {
        match result {
            Err(WatchDeploymentError::UnhealthyDeployment {
                deployment_name,
                status,
            }) => {
                assert_eq!(deployment_name, DEPLOYMENT_NAME);
                assert_eq!(status, expected_status);
            }
            other => panic!("Expected UnhealthyDeployment error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment() {
        // Arrange
        let mut mock_docker = MockDocker::new();
        expect_inspect(&mut mock_docker, Some(1), || {
            Ok(create_test_container_inspect_response())
        });

        // Act
        let result = watch(mock_docker, WatchOptions::builder().build()).await;

        // Assert
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment_unhealthy() {
        // Arrange
        let mut mock_docker = MockDocker::new();
        expect_inspect(&mut mock_docker, Some(1), || {
            Ok(create_test_container_inspect_response_unhealthy())
        });

        // Act
        let result = watch(mock_docker, WatchOptions::builder().build()).await;

        // Assert
        assert!(matches!(
            result,
            Err(WatchDeploymentError::UnhealthyDeployment { .. })
        ));
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment_retries() {
        // Arrange: the first poll sees a starting container, the second a healthy one.
        let mut mock_docker = MockDocker::new();
        expect_inspect(&mut mock_docker, Some(1), || {
            Ok(create_test_container_inspect_response_starting())
        });
        expect_inspect(&mut mock_docker, Some(1), || {
            Ok(create_test_container_inspect_response())
        });

        // Act
        let result = watch(mock_docker, WatchOptions::builder().build()).await;

        // Assert
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment_timeout() {
        // Arrange
        let mut mock_docker = MockDocker::new();
        let options = WatchOptions::builder()
            .timeout_duration(time::Duration::from_millis(100))
            .build();

        // Always report STARTING so the wait can only end through the timeout.
        expect_inspect(&mut mock_docker, None, || {
            Ok(create_test_container_inspect_response_starting())
        });

        // Act
        let result = watch(mock_docker, options).await;

        // Assert
        match result {
            Err(WatchDeploymentError::Timeout { deployment_name }) => {
                assert_eq!(deployment_name, DEPLOYMENT_NAME);
            }
            other => panic!("Expected Timeout error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment_no_state() {
        // Arrange
        let mut mock_docker = MockDocker::new();
        expect_inspect(&mut mock_docker, Some(1), || {
            Ok(create_test_container_inspect_response_no_state())
        });

        // Act
        let result = watch(mock_docker, WatchOptions::builder().build()).await;

        // Assert
        assert_unhealthy_with_status(result, HealthStatusEnum::NONE);
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment_no_health() {
        // Arrange
        let mut mock_docker = MockDocker::new();
        expect_inspect(&mut mock_docker, Some(1), || {
            Ok(create_test_container_inspect_response_no_health())
        });

        // Act
        let result = watch(mock_docker, WatchOptions::builder().build()).await;

        // Assert
        assert_unhealthy_with_status(result, HealthStatusEnum::NONE);
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment_container_inspect_error() {
        // Arrange
        let mut mock_docker = MockDocker::new();
        expect_inspect(&mut mock_docker, Some(1), || {
            Err(BollardError::DockerResponseServerError {
                status_code: 404,
                message: "No such container".to_string(),
            })
        });

        // Act
        let result = watch(mock_docker, WatchOptions::builder().build()).await;

        // Assert
        assert!(matches!(
            result,
            Err(WatchDeploymentError::ContainerInspect(_))
        ));
    }
}