Skip to main content

atlas_local/client/
watch_deployment.rs

1use bollard::query_parameters::InspectContainerOptions;
2use tokio::time;
3
4use crate::{
5    client::Client,
6    docker::{DockerError, DockerInspectContainer},
7    models::{ContainerHealthStatus, WatchOptions},
8};
9
10#[derive(Debug, thiserror::Error)]
11pub enum WatchDeploymentError {
12    #[error("Failed to inspect container: {0}")]
13    ContainerInspect(#[from] DockerError),
14    #[error("Timeout while waiting for container {deployment_name} to become healthy")]
15    Timeout { deployment_name: String },
16    #[error("Deployment {deployment_name} is not healthy [status: {status}]")]
17    UnhealthyDeployment {
18        deployment_name: String,
19        status: ContainerHealthStatus,
20    },
21}
22
23impl<D: DockerInspectContainer> Client<D> {
24    /// Waits for a deployment to become healthy.
25    ///
26    /// This method polls the container's health status until it becomes healthy,
27    /// or until the timeout specified in the options is reached.
28    ///
29    /// # Arguments
30    ///
31    /// * `cluster_name` - The name or ID of the container to watch
32    /// * `options` - Configuration options including timeout duration
33    ///
34    /// # Returns
35    ///
36    /// Returns `Ok(())` when the container becomes healthy, or an error if:
37    /// - The container inspection fails
38    /// - The container becomes unhealthy
39    /// - The timeout is reached
40    ///
41    /// # Examples
42    ///
43    /// ```
44    /// use atlas_local::models::WatchOptions;
45    /// use std::time::Duration;
46    ///
47    /// # async fn example(client: atlas_local::Client) -> Result<(), Box<dyn std::error::Error>> {
48    /// let options = WatchOptions::builder()
49    ///     .timeout_duration(Duration::from_secs(300))
50    ///     .build();
51    ///
52    /// client.wait_for_healthy_deployment("my-deployment", options).await?;
53    /// # Ok(())
54    /// # }
55    /// ```
56    pub async fn wait_for_healthy_deployment(
57        &self,
58        deployment_name: &str,
59        options: WatchOptions,
60    ) -> Result<(), WatchDeploymentError> {
61        let timeout_duration = options
62            .timeout_duration
63            .unwrap_or(time::Duration::from_secs(60) * 10);
64        time::timeout(
65            timeout_duration,
66            self.wait_for_healthy_deployment_inner(deployment_name, options),
67        )
68        .await
69        .map_err(|_| WatchDeploymentError::Timeout {
70            deployment_name: deployment_name.to_string(),
71        })?
72    }
73
74    async fn wait_for_healthy_deployment_inner(
75        &self,
76        deployment_name: &str,
77        options: WatchOptions,
78    ) -> Result<(), WatchDeploymentError> {
79        // Loop until the container is healthy
80        loop {
81            let mut status: ContainerHealthStatus = self
82                .docker
83                .inspect_container(deployment_name, None::<InspectContainerOptions>)
84                .await
85                .map_err(WatchDeploymentError::ContainerInspect)?
86                .state
87                .and_then(|s| s.health)
88                .and_then(|h| h.status)
89                .map(ContainerHealthStatus::from)
90                .ok_or_else(|| WatchDeploymentError::UnhealthyDeployment {
91                    deployment_name: deployment_name.to_string(),
92                    status: ContainerHealthStatus::None,
93                })?;
94
95            // If allow_unhealthy_initial_state is set then we handle it as a starting state
96            if options.allow_unhealthy_initial_state && status == ContainerHealthStatus::Unhealthy {
97                status = ContainerHealthStatus::Starting;
98            }
99
100            match status {
101                ContainerHealthStatus::Healthy => return Ok(()),
102                ContainerHealthStatus::Starting => {
103                    time::sleep(std::time::Duration::from_secs(1)).await;
104                }
105                ContainerHealthStatus::None
106                | ContainerHealthStatus::Empty
107                | ContainerHealthStatus::Unhealthy => {
108                    return Err(WatchDeploymentError::UnhealthyDeployment {
109                        deployment_name: deployment_name.to_string(),
110                        status,
111                    });
112                }
113            }
114        }
115    }
116}
117
#[cfg(test)]
mod tests {
    use super::*;
    use crate::docker::DockerError;
    use bollard::models::{
        ContainerConfig, ContainerInspectResponse, ContainerState, ContainerStateStatusEnum,
        HealthStatusEnum,
    };
    use maplit::hashmap;
    use mockall::mock;
    use pretty_assertions::assert_eq;

    mock! {
        Docker {}

        impl DockerInspectContainer for Docker {
            async fn inspect_container(
                &self,
                container_id: &str,
                options: Option<InspectContainerOptions>,
            ) -> Result<ContainerInspectResponse, DockerError>;
        }
    }

    /// Name under which every test watches its deployment.
    const DEPLOYMENT_NAME: &str = "test-deployment";

    /// What the mocked `inspect_container` hands back.
    type InspectResult = Result<ContainerInspectResponse, DockerError>;

    /// Builds an inspect response whose only varying part is the container `state`.
    fn inspect_response_with_state(state: Option<ContainerState>) -> ContainerInspectResponse {
        ContainerInspectResponse {
            id: Some("test_container_id".to_string()),
            name: Some("/test-deployment".to_string()),
            config: Some(ContainerConfig {
                labels: Some(hashmap! {
                    "mongodb-atlas-local".to_string() => "container".to_string(),
                    "version".to_string() => "8.0.0".to_string(),
                    "mongodb-type".to_string() => "community".to_string(),
                }),
                env: Some(vec!["TOOL=ATLASCLI".to_string()]),
                ..Default::default()
            }),
            state,
            ..Default::default()
        }
    }

    /// Builds a container state that reports the given health status.
    fn state_with_health(status: HealthStatusEnum) -> ContainerState {
        ContainerState {
            health: Some(bollard::models::Health {
                status: Some(status),
                ..Default::default()
            }),
            ..Default::default()
        }
    }

    /// Running container that reports a healthy status.
    fn create_test_container_inspect_response() -> ContainerInspectResponse {
        inspect_response_with_state(Some(ContainerState {
            status: Some(ContainerStateStatusEnum::RUNNING),
            ..state_with_health(HealthStatusEnum::HEALTHY)
        }))
    }

    /// Container that reports an unhealthy status.
    fn create_test_container_inspect_response_unhealthy() -> ContainerInspectResponse {
        inspect_response_with_state(Some(state_with_health(HealthStatusEnum::UNHEALTHY)))
    }

    /// Container whose health check is still starting.
    fn create_test_container_inspect_response_starting() -> ContainerInspectResponse {
        inspect_response_with_state(Some(state_with_health(HealthStatusEnum::STARTING)))
    }

    /// Inspect response without any container state.
    fn create_test_container_inspect_response_no_state() -> ContainerInspectResponse {
        inspect_response_with_state(None)
    }

    /// Inspect response with a state but no health section.
    fn create_test_container_inspect_response_no_health() -> ContainerInspectResponse {
        inspect_response_with_state(Some(ContainerState {
            health: None,
            ..Default::default()
        }))
    }

    /// Registers exactly one expected `inspect_container` call for the test
    /// deployment, answered with `response`.
    fn expect_inspect_once(mock_docker: &mut MockDocker, response: fn() -> InspectResult) {
        mock_docker
            .expect_inspect_container()
            .with(
                mockall::predicate::eq(DEPLOYMENT_NAME),
                mockall::predicate::eq(None::<InspectContainerOptions>),
            )
            .times(1)
            .returning(move |_, _| response());
    }

    /// Runs the watch against the mocked Docker client.
    async fn watch(
        mock_docker: MockDocker,
        options: WatchOptions,
    ) -> Result<(), WatchDeploymentError> {
        Client::new(mock_docker)
            .wait_for_healthy_deployment(DEPLOYMENT_NAME, options)
            .await
    }

    /// Asserts that `result` is an `UnhealthyDeployment` error for the test
    /// deployment carrying the `expected` status.
    fn assert_unhealthy(result: Result<(), WatchDeploymentError>, expected: ContainerHealthStatus) {
        match result {
            Err(WatchDeploymentError::UnhealthyDeployment {
                deployment_name,
                status,
            }) => {
                assert_eq!(deployment_name, DEPLOYMENT_NAME);
                assert_eq!(status, expected);
            }
            other => panic!("Expected UnhealthyDeployment error, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment() {
        // Arrange
        let mut mock_docker = MockDocker::new();
        expect_inspect_once(&mut mock_docker, || {
            Ok(create_test_container_inspect_response())
        });

        // Act
        let result = watch(mock_docker, WatchOptions::builder().build()).await;

        // Assert
        assert!(result.is_ok(), "unexpected error: {:?}", result);
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment_unhealthy() {
        // Arrange
        let mut mock_docker = MockDocker::new();
        expect_inspect_once(&mut mock_docker, || {
            Ok(create_test_container_inspect_response_unhealthy())
        });

        // Act
        let result = watch(mock_docker, WatchOptions::builder().build()).await;

        // Assert
        assert!(matches!(
            result,
            Err(WatchDeploymentError::UnhealthyDeployment { .. })
        ));
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment_retries() {
        // Arrange: first poll sees STARTING, second poll sees HEALTHY.
        let mut mock_docker = MockDocker::new();
        expect_inspect_once(&mut mock_docker, || {
            Ok(create_test_container_inspect_response_starting())
        });
        expect_inspect_once(&mut mock_docker, || {
            Ok(create_test_container_inspect_response())
        });

        // Act
        let result = watch(mock_docker, WatchOptions::builder().build()).await;

        // Assert
        assert!(result.is_ok(), "unexpected error: {:?}", result);
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment_allow_unhealthy_initial_state() {
        // Arrange: first poll sees UNHEALTHY, which the option tolerates; the
        // second poll sees HEALTHY.
        let mut mock_docker = MockDocker::new();
        let mut options = WatchOptions::builder().build();
        options.allow_unhealthy_initial_state = true;

        expect_inspect_once(&mut mock_docker, || {
            Ok(create_test_container_inspect_response_unhealthy())
        });
        expect_inspect_once(&mut mock_docker, || {
            Ok(create_test_container_inspect_response())
        });

        // Act
        let result = watch(mock_docker, options).await;

        // Assert
        assert!(result.is_ok(), "unexpected error: {:?}", result);
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment_timeout() {
        // Arrange
        let mut mock_docker = MockDocker::new();
        let options = WatchOptions::builder()
            .timeout_duration(time::Duration::from_millis(100))
            .build();

        // Mock inspect_container to always return STARTING status, which will cause timeout.
        // No call-count limit: the number of polls before the deadline is not fixed.
        mock_docker
            .expect_inspect_container()
            .with(
                mockall::predicate::eq(DEPLOYMENT_NAME),
                mockall::predicate::eq(None::<InspectContainerOptions>),
            )
            .returning(|_, _| Ok(create_test_container_inspect_response_starting()));

        // Act
        let result = watch(mock_docker, options).await;

        // Assert
        match result {
            Err(WatchDeploymentError::Timeout { deployment_name }) => {
                assert_eq!(deployment_name, DEPLOYMENT_NAME);
            }
            other => panic!("Expected Timeout error, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment_no_state() {
        // Arrange
        let mut mock_docker = MockDocker::new();
        expect_inspect_once(&mut mock_docker, || {
            Ok(create_test_container_inspect_response_no_state())
        });

        // Act
        let result = watch(mock_docker, WatchOptions::builder().build()).await;

        // Assert
        assert_unhealthy(result, ContainerHealthStatus::None);
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment_no_health() {
        // Arrange
        let mut mock_docker = MockDocker::new();
        expect_inspect_once(&mut mock_docker, || {
            Ok(create_test_container_inspect_response_no_health())
        });

        // Act
        let result = watch(mock_docker, WatchOptions::builder().build()).await;

        // Assert
        assert_unhealthy(result, ContainerHealthStatus::None);
    }

    #[tokio::test]
    async fn test_wait_for_healthy_deployment_container_inspect_error() {
        // Arrange
        let mut mock_docker = MockDocker::new();
        expect_inspect_once(&mut mock_docker, || Err(DockerError::NotFound));

        // Act
        let result = watch(mock_docker, WatchOptions::builder().build()).await;

        // Assert
        assert!(matches!(
            result,
            Err(WatchDeploymentError::ContainerInspect(_))
        ));
    }
}