1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures::future::Fuse;
7use futures_util::FutureExt;
8use tokio::sync::oneshot::{self, Receiver, Sender, error::RecvError};
9
10use crate::models::Deployment;
11
12use super::CreateDeploymentError;
13
/// Receiving half of the deployment-creation progress channels.
///
/// Holds one fused oneshot receiver per creation step plus one for the final
/// result. The type also implements `Future`, resolving to the final
/// deployment result.
pub struct CreateDeploymentProgress {
    /// Receives the outcome of the image-pull step.
    pub pull_image_finished: Fuse<Receiver<CreateDeploymentStepOutcome>>,
    /// Receives the outcome of the container-creation step.
    pub create_container_finished: Fuse<Receiver<CreateDeploymentStepOutcome>>,
    /// Receives the outcome of the container-start step.
    pub start_container_finished: Fuse<Receiver<CreateDeploymentStepOutcome>>,
    /// Receives the outcome of the wait-for-healthy-deployment step.
    pub wait_for_healthy_deployment_finished: Fuse<Receiver<CreateDeploymentStepOutcome>>,
    /// Receives the final result of the whole deployment creation.
    pub deployment: Fuse<Receiver<Result<Deployment, CreateDeploymentError>>>,
}
21
22impl CreateDeploymentProgress {
23 fn await_receiver<T>(
25 receiver: &mut Fuse<Receiver<T>>,
26 ) -> impl std::future::Future<Output = Result<T, RecvError>> {
27 Pin::new(receiver).into_future()
28 }
29
30 pub async fn wait_for_pull_image_outcome(
31 &mut self,
32 ) -> Result<CreateDeploymentStepOutcome, RecvError> {
33 Self::await_receiver(&mut self.pull_image_finished).await
34 }
35
36 pub async fn wait_for_create_container_outcome(
37 &mut self,
38 ) -> Result<CreateDeploymentStepOutcome, RecvError> {
39 Self::await_receiver(&mut self.create_container_finished).await
40 }
41
42 pub async fn wait_for_start_container_outcome(
43 &mut self,
44 ) -> Result<CreateDeploymentStepOutcome, RecvError> {
45 Self::await_receiver(&mut self.start_container_finished).await
46 }
47
48 pub async fn wait_for_wait_for_healthy_deployment_outcome(
49 &mut self,
50 ) -> Result<CreateDeploymentStepOutcome, RecvError> {
51 Self::await_receiver(&mut self.wait_for_healthy_deployment_finished).await
52 }
53
54 pub async fn wait_for_deployment_outcome(
55 &mut self,
56 ) -> Result<Deployment, CreateDeploymentError> {
57 self.await
59 }
60}
61
62impl std::future::Future for CreateDeploymentProgress {
63 type Output = Result<Deployment, CreateDeploymentError>;
64
65 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66 match Pin::new(&mut self.deployment).poll(cx) {
67 Poll::Ready(Ok(result)) => Poll::Ready(result),
68 Poll::Ready(Err(error)) => {
69 Poll::Ready(Err(CreateDeploymentError::ReceiveDeployment(error)))
70 }
71 Poll::Pending => Poll::Pending,
72 }
73 }
74}
75
/// Sending half of the deployment-creation progress channels.
///
/// Each per-step sender is wrapped in an `Option` and is taken out when its
/// outcome is sent, so every step can be reported at most once.
pub struct CreateDeploymentProgressSender {
    /// Sender for the image-pull outcome; `None` once an outcome was sent.
    pub pull_image_finished: Option<Sender<CreateDeploymentStepOutcome>>,
    /// Sender for the container-creation outcome; `None` once an outcome was sent.
    pub create_container_finished: Option<Sender<CreateDeploymentStepOutcome>>,
    /// Sender for the container-start outcome; `None` once an outcome was sent.
    pub start_container_finished: Option<Sender<CreateDeploymentStepOutcome>>,
    /// Sender for the wait-for-healthy-deployment outcome; `None` once an outcome was sent.
    pub wait_for_healthy_deployment_finished: Option<Sender<CreateDeploymentStepOutcome>>,
    /// Sender for the final result; consumed by `finalize_deployment`.
    pub deployment: Sender<Result<Deployment, CreateDeploymentError>>,
}
83
84impl CreateDeploymentProgressSender {
85 async fn send_outcome(
88 sender: &mut Option<Sender<CreateDeploymentStepOutcome>>,
89 outcome: CreateDeploymentStepOutcome,
90 ) -> bool {
91 if let Some(sender) = sender.take() {
92 if sender.send(outcome).is_ok() {
94 return true;
95 }
96 }
97
98 false
99 }
100
101 pub async fn set_pull_image_finished(&mut self, outcome: CreateDeploymentStepOutcome) {
102 Self::send_outcome(&mut self.pull_image_finished, outcome).await;
103 }
104
105 pub async fn set_create_container_finished(&mut self, outcome: CreateDeploymentStepOutcome) {
106 Self::send_outcome(&mut self.create_container_finished, outcome).await;
107 }
108
109 pub async fn set_start_container_finished(&mut self, outcome: CreateDeploymentStepOutcome) {
110 Self::send_outcome(&mut self.start_container_finished, outcome).await;
111 }
112
113 pub async fn set_wait_for_healthy_deployment_finished(
114 &mut self,
115 outcome: CreateDeploymentStepOutcome,
116 ) {
117 Self::send_outcome(&mut self.wait_for_healthy_deployment_finished, outcome).await;
118 }
119
120 pub async fn finalize_deployment(mut self, result: Result<Deployment, CreateDeploymentError>) {
127 let mut outcome = if result.is_err() {
131 CreateDeploymentStepOutcome::Failure
132 } else {
133 CreateDeploymentStepOutcome::Skipped
134 };
135
136 let send_failure_or_skipped =
138 async |outcome: &mut CreateDeploymentStepOutcome,
139 sender: &mut Option<Sender<CreateDeploymentStepOutcome>>| {
140 if Self::send_outcome(sender, *outcome).await {
141 *outcome = CreateDeploymentStepOutcome::Skipped;
144 }
145 };
146
147 send_failure_or_skipped(&mut outcome, &mut self.pull_image_finished).await;
149 send_failure_or_skipped(&mut outcome, &mut self.create_container_finished).await;
150 send_failure_or_skipped(&mut outcome, &mut self.start_container_finished).await;
151 send_failure_or_skipped(&mut outcome, &mut self.wait_for_healthy_deployment_finished).await;
152
153 _ = self.deployment.send(result);
155 }
156}
157
/// Outcome reported for a single step of the deployment-creation process.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CreateDeploymentStepOutcome {
    /// Only reported explicitly through the `set_*_finished` methods;
    /// `finalize_deployment` never emits it. Presumably means the step
    /// completed successfully — confirm against the callers.
    Success,
    /// Sent by `finalize_deployment` to steps that were never reported, except
    /// for the one step that receives `Failure` when the deployment failed.
    Skipped,
    /// Sent by `finalize_deployment` to the first unreported step when the
    /// final deployment result is an error.
    Failure,
}
164
165pub fn create_progress_pairs() -> (CreateDeploymentProgressSender, CreateDeploymentProgress) {
166 let (pull_image_finished, pull_image_finished_receiver) = oneshot::channel();
167 let (create_container_finished, create_container_finished_receiver) = oneshot::channel();
168 let (start_container_finished, start_container_finished_receiver) = oneshot::channel();
169 let (wait_for_healthy_deployment_finished, wait_for_healthy_deployment_finished_receiver) =
170 oneshot::channel();
171 let (deployment, deployment_receiver) = oneshot::channel();
172
173 (
174 CreateDeploymentProgressSender {
175 pull_image_finished: Some(pull_image_finished),
176 create_container_finished: Some(create_container_finished),
177 start_container_finished: Some(start_container_finished),
178 wait_for_healthy_deployment_finished: Some(wait_for_healthy_deployment_finished),
179 deployment,
180 },
181 CreateDeploymentProgress {
182 pull_image_finished: pull_image_finished_receiver.fuse(),
183 create_container_finished: create_container_finished_receiver.fuse(),
184 start_container_finished: start_container_finished_receiver.fuse(),
185 wait_for_healthy_deployment_finished: wait_for_healthy_deployment_finished_receiver
186 .fuse(),
187 deployment: deployment_receiver.fuse(),
188 },
189 )
190}
191
// Unit tests for the progress sender/receiver pair: per-step reporting,
// finalization semantics, the `Future` implementation, and behavior when one
// side of a channel is gone.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{MongodbType, State};
    use semver::Version;

    // Builds a fixed `Deployment` value used as the success payload in tests.
    fn create_test_deployment() -> Deployment {
        Deployment {
            container_id: "test_container_id".to_string(),
            name: Some("test-deployment".to_string()),
            state: State::Running,
            port_bindings: None,
            mongodb_type: MongodbType::Community,
            mongodb_version: Version::new(8, 0, 0),
            creation_source: None,
            local_seed_location: None,
            mongodb_initdb_database: None,
            mongodb_initdb_root_password_file: None,
            mongodb_initdb_root_password: None,
            mongodb_initdb_root_username_file: None,
            mongodb_initdb_root_username: None,
            mongodb_load_sample_data: None,
            voyage_api_key: None,
            mongot_log_file: None,
            runner_log_file: None,
            do_not_track: false,
            telemetry_base_url: None,
        }
    }

    // Produces a `ReceiveDeployment` error by awaiting a oneshot receiver
    // whose sender was dropped, which yields the wrapped receive error.
    async fn create_test_error() -> CreateDeploymentError {
        let (sender, receiver) = oneshot::channel::<Result<Deployment, CreateDeploymentError>>();
        drop(sender);
        CreateDeploymentError::ReceiveDeployment(receiver.await.unwrap_err())
    }

    // A fresh pair has all step senders present and the channels connected.
    #[tokio::test]
    async fn test_create_progress_pairs() {
        let (mut sender, mut progress) = create_progress_pairs();

        assert!(sender.pull_image_finished.is_some());
        assert!(sender.create_container_finished.is_some());
        assert!(sender.start_container_finished.is_some());
        assert!(sender.wait_for_healthy_deployment_finished.is_some());

        sender
            .set_pull_image_finished(CreateDeploymentStepOutcome::Success)
            .await;
        let outcome = progress.wait_for_pull_image_outcome().await.unwrap();
        assert_eq!(outcome, CreateDeploymentStepOutcome::Success);
    }

    // Every outcome variant is delivered unchanged for the pull-image step.
    // A new pair is created per variant because each step can be sent once.
    #[tokio::test]
    async fn test_wait_for_pull_image_outcome() {
        let (mut sender, mut progress) = create_progress_pairs();

        sender
            .set_pull_image_finished(CreateDeploymentStepOutcome::Success)
            .await;
        assert_eq!(
            progress.wait_for_pull_image_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Success
        );

        let (mut sender, mut progress) = create_progress_pairs();
        sender
            .set_pull_image_finished(CreateDeploymentStepOutcome::Failure)
            .await;
        assert_eq!(
            progress.wait_for_pull_image_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Failure
        );

        let (mut sender, mut progress) = create_progress_pairs();
        sender
            .set_pull_image_finished(CreateDeploymentStepOutcome::Skipped)
            .await;
        assert_eq!(
            progress.wait_for_pull_image_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Skipped
        );
    }

    // The create-container outcome reaches its receiver.
    #[tokio::test]
    async fn test_wait_for_create_container_outcome() {
        let (mut sender, mut progress) = create_progress_pairs();

        sender
            .set_create_container_finished(CreateDeploymentStepOutcome::Success)
            .await;
        assert_eq!(
            progress.wait_for_create_container_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Success
        );
    }

    // The start-container outcome reaches its receiver.
    #[tokio::test]
    async fn test_wait_for_start_container_outcome() {
        let (mut sender, mut progress) = create_progress_pairs();

        sender
            .set_start_container_finished(CreateDeploymentStepOutcome::Success)
            .await;
        assert_eq!(
            progress.wait_for_start_container_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Success
        );
    }

    // The wait-for-healthy-deployment outcome reaches its receiver.
    #[tokio::test]
    async fn test_wait_for_wait_for_healthy_deployment_outcome() {
        let (mut sender, mut progress) = create_progress_pairs();

        sender
            .set_wait_for_healthy_deployment_finished(CreateDeploymentStepOutcome::Success)
            .await;
        assert_eq!(
            progress
                .wait_for_wait_for_healthy_deployment_outcome()
                .await
                .unwrap(),
            CreateDeploymentStepOutcome::Success
        );
    }

    // A successful final result is returned by `wait_for_deployment_outcome`.
    #[tokio::test]
    async fn test_wait_for_deployment_outcome_success() {
        let (sender, mut progress) = create_progress_pairs();

        let deployment = create_test_deployment();
        sender.finalize_deployment(Ok(deployment.clone())).await;

        let result = progress.wait_for_deployment_outcome().await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), deployment);
    }

    // A failed final result is returned by `wait_for_deployment_outcome`.
    #[tokio::test]
    async fn test_wait_for_deployment_outcome_error() {
        let (sender, mut progress) = create_progress_pairs();

        let error = create_test_error().await;
        sender.finalize_deployment(Err(error)).await;

        let result = progress.wait_for_deployment_outcome().await;
        assert!(result.is_err());
        match result.unwrap_err() {
            CreateDeploymentError::ReceiveDeployment(_) => {}
            _ => panic!("Expected ReceiveDeployment error"),
        }
    }

    // Awaiting the progress value by value yields the successful result.
    #[tokio::test]
    async fn test_future_implementation_success() {
        let (sender, progress) = create_progress_pairs();

        let deployment = create_test_deployment();
        sender.finalize_deployment(Ok(deployment.clone())).await;

        let result = progress.await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), deployment);
    }

    // Awaiting the progress value by value yields the error result.
    #[tokio::test]
    async fn test_future_implementation_error() {
        let (sender, progress) = create_progress_pairs();

        let error = create_test_error().await;
        sender.finalize_deployment(Err(error)).await;

        let result = progress.await;
        assert!(result.is_err());
        match result.unwrap_err() {
            CreateDeploymentError::ReceiveDeployment(_) => {}
            _ => panic!("Expected ReceiveDeployment error"),
        }
    }

    // The first pull-image outcome is delivered; a second call for the same
    // step must complete without panicking (nothing is asserted about it).
    #[tokio::test]
    async fn test_set_pull_image_finished() {
        let (mut sender, mut progress) = create_progress_pairs();

        sender
            .set_pull_image_finished(CreateDeploymentStepOutcome::Success)
            .await;
        assert_eq!(
            progress.wait_for_pull_image_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Success
        );

        sender
            .set_pull_image_finished(CreateDeploymentStepOutcome::Failure)
            .await;
    }

    // Setting the create-container outcome delivers it to the receiver.
    #[tokio::test]
    async fn test_set_create_container_finished() {
        let (mut sender, mut progress) = create_progress_pairs();

        sender
            .set_create_container_finished(CreateDeploymentStepOutcome::Success)
            .await;
        assert_eq!(
            progress.wait_for_create_container_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Success
        );
    }

    // Setting the start-container outcome delivers it to the receiver.
    #[tokio::test]
    async fn test_set_start_container_finished() {
        let (mut sender, mut progress) = create_progress_pairs();

        sender
            .set_start_container_finished(CreateDeploymentStepOutcome::Success)
            .await;
        assert_eq!(
            progress.wait_for_start_container_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Success
        );
    }

    // Setting the wait-for-healthy-deployment outcome delivers it to the receiver.
    #[tokio::test]
    async fn test_set_wait_for_healthy_deployment_finished() {
        let (mut sender, mut progress) = create_progress_pairs();

        sender
            .set_wait_for_healthy_deployment_finished(CreateDeploymentStepOutcome::Success)
            .await;
        assert_eq!(
            progress
                .wait_for_wait_for_healthy_deployment_outcome()
                .await
                .unwrap(),
            CreateDeploymentStepOutcome::Success
        );
    }

    // Finalizing with success after every step was reported still delivers
    // the final deployment.
    #[tokio::test]
    async fn test_finalize_deployment_success_all_steps_completed() {
        let (mut sender, mut progress) = create_progress_pairs();

        sender
            .set_pull_image_finished(CreateDeploymentStepOutcome::Success)
            .await;
        sender
            .set_create_container_finished(CreateDeploymentStepOutcome::Success)
            .await;
        sender
            .set_start_container_finished(CreateDeploymentStepOutcome::Success)
            .await;
        sender
            .set_wait_for_healthy_deployment_finished(CreateDeploymentStepOutcome::Success)
            .await;

        let deployment = create_test_deployment();
        sender.finalize_deployment(Ok(deployment.clone())).await;

        let result = progress.wait_for_deployment_outcome().await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), deployment);
    }

    // Finalizing with success marks every unreported step as `Skipped`.
    #[tokio::test]
    async fn test_finalize_deployment_success_some_steps_skipped() {
        let (mut sender, mut progress) = create_progress_pairs();

        sender
            .set_pull_image_finished(CreateDeploymentStepOutcome::Success)
            .await;
        sender
            .set_create_container_finished(CreateDeploymentStepOutcome::Success)
            .await;
        let deployment = create_test_deployment();
        sender.finalize_deployment(Ok(deployment.clone())).await;

        assert_eq!(
            progress.wait_for_start_container_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Skipped
        );
        assert_eq!(
            progress
                .wait_for_wait_for_healthy_deployment_outcome()
                .await
                .unwrap(),
            CreateDeploymentStepOutcome::Skipped
        );

        let result = progress.wait_for_deployment_outcome().await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), deployment);
    }

    // Finalizing with an error when nothing was reported: the first step is
    // marked `Failure` and all following steps `Skipped`.
    #[tokio::test]
    async fn test_finalize_deployment_error_all_steps_uncompleted() {
        let (sender, mut progress) = create_progress_pairs();

        let error = create_test_error().await;
        sender.finalize_deployment(Err(error)).await;

        assert_eq!(
            progress.wait_for_pull_image_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Failure
        );
        assert_eq!(
            progress.wait_for_create_container_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Skipped
        );
        assert_eq!(
            progress.wait_for_start_container_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Skipped
        );
        assert_eq!(
            progress
                .wait_for_wait_for_healthy_deployment_outcome()
                .await
                .unwrap(),
            CreateDeploymentStepOutcome::Skipped
        );

        let result = progress.wait_for_deployment_outcome().await;
        assert!(result.is_err());
        match result.unwrap_err() {
            CreateDeploymentError::ReceiveDeployment(_) => {}
            _ => panic!("Expected ReceiveDeployment error"),
        }
    }

    // Finalizing with an error after two reported steps: reported steps keep
    // their outcome, the first unreported step is `Failure`, the rest `Skipped`.
    #[tokio::test]
    async fn test_finalize_deployment_error_some_steps_completed() {
        let (mut sender, mut progress) = create_progress_pairs();

        sender
            .set_pull_image_finished(CreateDeploymentStepOutcome::Success)
            .await;
        sender
            .set_create_container_finished(CreateDeploymentStepOutcome::Success)
            .await;
        let error = create_test_error().await;
        sender.finalize_deployment(Err(error)).await;

        assert_eq!(
            progress.wait_for_pull_image_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Success
        );
        assert_eq!(
            progress.wait_for_create_container_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Success
        );

        assert_eq!(
            progress.wait_for_start_container_outcome().await.unwrap(),
            CreateDeploymentStepOutcome::Failure
        );

        assert_eq!(
            progress
                .wait_for_wait_for_healthy_deployment_outcome()
                .await
                .unwrap(),
            CreateDeploymentStepOutcome::Skipped
        );

        let result = progress.wait_for_deployment_outcome().await;
        assert!(result.is_err());
    }

    // Reporting the same step twice must not panic; the second call finds the
    // sender already taken.
    #[tokio::test]
    async fn test_send_outcome_when_sender_absent() {
        let (mut sender, _progress) = create_progress_pairs();

        sender
            .set_pull_image_finished(CreateDeploymentStepOutcome::Success)
            .await;

        sender
            .set_pull_image_finished(CreateDeploymentStepOutcome::Failure)
            .await;
    }

    // Reporting a step after the receiving side was dropped must not panic.
    #[tokio::test]
    async fn test_send_outcome_when_receiver_dropped() {
        let (mut sender, progress) = create_progress_pairs();

        drop(progress);

        sender
            .set_pull_image_finished(CreateDeploymentStepOutcome::Success)
            .await;
    }

    // Finalizing after the receiving side was dropped must not panic.
    #[tokio::test]
    async fn test_finalize_deployment_when_receiver_dropped() {
        let (sender, progress) = create_progress_pairs();

        drop(progress);

        let deployment = create_test_deployment();
        sender.finalize_deployment(Ok(deployment)).await;
    }

    // The derived `Debug` output contains the variant name.
    #[test]
    fn test_create_deployment_step_outcome_debug() {
        let outcome = CreateDeploymentStepOutcome::Success;
        let debug_str = format!("{:?}", outcome);
        assert!(debug_str.contains("Success"));
    }

    // The outcome type is `Copy`: the original stays usable after being
    // assigned to another binding.
    #[test]
    fn test_create_deployment_step_outcome_clone() {
        let outcome = CreateDeploymentStepOutcome::Success;
        let cloned = outcome;
        assert_eq!(outcome, cloned);
    }

    // Equality holds between identical variants and fails between distinct ones.
    #[test]
    fn test_create_deployment_step_outcome_partial_eq() {
        assert_eq!(
            CreateDeploymentStepOutcome::Success,
            CreateDeploymentStepOutcome::Success
        );
        assert_eq!(
            CreateDeploymentStepOutcome::Skipped,
            CreateDeploymentStepOutcome::Skipped
        );
        assert_eq!(
            CreateDeploymentStepOutcome::Failure,
            CreateDeploymentStepOutcome::Failure
        );
        assert_ne!(
            CreateDeploymentStepOutcome::Success,
            CreateDeploymentStepOutcome::Failure
        );
        assert_ne!(
            CreateDeploymentStepOutcome::Success,
            CreateDeploymentStepOutcome::Skipped
        );
        assert_ne!(
            CreateDeploymentStepOutcome::Skipped,
            CreateDeploymentStepOutcome::Failure
        );
    }

    // The waiter is started before the outcome exists: a spawned task sends
    // it after a short delay, and the waiter must still receive it.
    #[tokio::test]
    async fn test_await_receiver_pending() {
        let (mut sender, mut progress) = create_progress_pairs();

        tokio::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            sender
                .set_pull_image_finished(CreateDeploymentStepOutcome::Success)
                .await;
        });

        let outcome = progress.wait_for_pull_image_outcome().await;
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), CreateDeploymentStepOutcome::Success);
    }
}