1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use crate::errors::{
    Error::{Other, API},
    Result,
};
use aws_sdk_ssm::{model::CommandInvocationStatus, types::SdkError, Client};
use aws_types::SdkConfig as AwsSdkConfig;
use tokio::time::{sleep, Duration, Instant};

/// Implements AWS SSM manager.
#[derive(Debug, Clone)]
pub struct Manager {
    #[allow(dead_code)]
    shared_config: AwsSdkConfig,
    cli: Client,
}

impl Manager {
    pub fn new(shared_config: &AwsSdkConfig) -> Self {
        let cloned = shared_config.clone();
        let cli = Client::new(shared_config);
        Self {
            shared_config: cloned,
            cli,
        }
    }

    pub fn client(&self) -> Client {
        self.cli.clone()
    }

    /// Polls SSM command status.
    /// ref. <https://docs.aws.amazon.com/systems-manager/latest/APIReference/API_GetCommandInvocation.html>
    pub async fn poll_command(
        &self,
        command_id: &str,
        instance_id: &str,
        desired_status: CommandInvocationStatus,
        timeout: Duration,
        interval: Duration,
    ) -> Result<CommandInvocationStatus> {
        log::info!(
            "polling invocation status for command '{command_id}' and instance id '{instance_id}' with desired status {:?} for timeout {:?} and interval {:?}",
            desired_status,
            timeout,
            interval,
        );

        let start = Instant::now();
        let mut cnt: u128 = 0;
        loop {
            let elapsed = start.elapsed();
            if elapsed.gt(&timeout) {
                break;
            }

            let itv = {
                if cnt == 0 {
                    // first poll with no wait
                    Duration::from_secs(1)
                } else {
                    interval
                }
            };
            sleep(itv).await;

            let ret = self
                .cli
                .get_command_invocation()
                .command_id(command_id)
                .instance_id(instance_id)
                .send()
                .await;
            let out = match ret {
                Ok(v) => v,
                Err(e) => {
                    return Err(API {
                        message: format!("failed get_command_invocation {:?}", e),
                        is_retryable: is_error_retryable(&e),
                    });
                }
            };

            let current_status = out.status().unwrap();
            log::info!(
                "poll (current command status {:?}, elapsed {:?})",
                current_status,
                elapsed
            );

            if desired_status.ne(&CommandInvocationStatus::Failed)
                && current_status.eq(&CommandInvocationStatus::Failed)
            {
                return Err(Other {
                    message: String::from("command invocation failed"),
                    is_retryable: false,
                });
            }

            if current_status.eq(&desired_status) {
                return Ok(current_status.clone());
            }

            cnt += 1;
        }

        Err(Other {
            message: format!("failed to get command invocation {} in time", command_id),
            is_retryable: true,
        })
    }
}

#[inline]
pub fn is_error_retryable<E>(e: &SdkError<E>) -> bool {
    match e {
        SdkError::TimeoutError(_) | SdkError::ResponseError { .. } => true,
        SdkError::DispatchFailure(e) => e.is_timeout() || e.is_io(),
        _ => false,
    }
}