devops_cli/
aws.rs

1use aws_sdk_ec2::types::{Instance, InstanceState, InstanceStateChange};
2use aws_smithy_types_convert::date_time::DateTimeExt;
3use derive_new::new;
4use serde::{Deserialize, Serialize};
5use std::fmt::{Display, Formatter};
6use std::ops::BitAnd;
7use thiserror::Error;
8
9pub struct AwsClient(aws_types::SdkConfig);
10
11impl AwsClient {
12    pub async fn new() -> AwsClient {
13        // We can't set unlimited, so we set a sufficiently high value (12h)
14        let long_enough = chrono::Duration::hours(12)
15            .to_std()
16            .expect(crate::XKCD_EXPECT_MSG);
17
18        let cache = aws_config::identity::LazyCacheBuilder::default()
19            .load_timeout(long_enough)
20            .build();
21
22        let aws_config = aws_config::ConfigLoader::default()
23            .behavior_version(aws_config::BehaviorVersion::v2025_01_17())
24            .identity_cache(cache);
25
26        AwsClient(aws_config.load().await)
27    }
28
29    pub async fn query_instances(
30        &self,
31        filters: Vec<QueryFilter>,
32    ) -> Result<Vec<Ec2Instance>, AwsClientError> {
33        let client = aws_sdk_ec2::client::Client::new(&self.0);
34        let mut operation = client.describe_instances();
35
36        for filter in filters {
37            operation = operation.filters(filter.into());
38        }
39
40        let result = operation.max_results(1000).send().await?;
41
42        let reservations: Vec<aws_sdk_ec2::types::Reservation> = result
43            .reservations
44            .expect("AWS provided instance data did not include reservations, which is expected");
45        let instances: Vec<_> = reservations
46            .into_iter()
47            .flat_map(|r| r.instances.unwrap_or_default())
48            .flat_map(std::convert::TryInto::try_into)
49            .collect();
50
51        Ok(instances)
52    }
53}
54
55#[derive(Deserialize)]
56pub struct AwsConfig {
57    /// The profile to use
58    pub profile: Option<String>,
59}
60
61#[derive(Error, Debug)]
62pub enum AwsClientError {
63    #[error(transparent)]
64    SdkError(Box<dyn std::error::Error + Send + Sync>),
65}
66
67impl<E> From<aws_sdk_ec2::error::SdkError<E>> for AwsClientError
68where
69    E: std::error::Error + Send + Sync + 'static,
70{
71    fn from(e: aws_sdk_ec2::error::SdkError<E>) -> Self {
72        AwsClientError::SdkError(Box::new(e))
73    }
74}
75
76#[derive(Serialize, Deserialize, Clone, Debug)]
77pub struct Ec2Instance {
78    pub id: String,
79    pub state: Ec2InstanceState,
80    pub availability_zone: String,
81    pub private_ip: String,
82    pub private_dns: Option<String>,
83    pub public_ip: Option<String>,
84    pub public_dns: Option<String>,
85    pub launch_time: chrono::DateTime<chrono::Utc>,
86    pub tags: indexmap::IndexMap<String, String>,
87}
88
89impl Ec2Instance {
90    #[must_use]
91    pub fn to_short_string(&self) -> String {
92        let name = self.tags.get("Name").map_or("", |s| s.as_str());
93        format!("{} ({}, {:?})", name, self.id, self.state)
94    }
95}
96
97impl TryFrom<Instance> for Ec2Instance {
98    type Error = ParseError;
99
100    fn try_from(value: Instance) -> Result<Self, Self::Error> {
101        let mut tags: indexmap::IndexMap<_, _> = value
102            .tags
103            .unwrap_or_default()
104            .into_iter()
105            .map(|v| (v.key.unwrap_or_default(), v.value.unwrap_or_default()))
106            .collect();
107        tags.sort_keys();
108
109        let instance = Ec2Instance {
110            id: value.instance_id.ok_or(ParseError("instance_id"))?,
111            state: value.state.ok_or(ParseError("state"))?.into(),
112            availability_zone: value
113                .placement
114                .ok_or(ParseError("placement"))?
115                .availability_zone
116                .ok_or(ParseError("placement.availability_zone"))?,
117            private_ip: value.private_ip_address.ok_or(ParseError("ip_private"))?,
118            private_dns: value.private_dns_name.filter(|s| !s.is_empty()),
119            public_ip: value.public_ip_address.filter(|s| !s.is_empty()),
120            public_dns: value.public_dns_name.filter(|s| !s.is_empty()),
121            launch_time: value
122                .launch_time
123                .and_then(|dt| dt.to_chrono_utc().ok())
124                .ok_or(ParseError("launch_time"))?,
125            tags,
126        };
127        Ok(instance)
128    }
129}
130
131#[derive(Serialize, Deserialize, Default, Clone, Debug)]
132pub enum Ec2InstanceState {
133    #[default]
134    Unknown,
135    Pending,
136    Running,
137    ShuttingDown,
138    Terminated,
139    Stopping,
140    Stopped,
141}
142
143impl From<InstanceState> for Ec2InstanceState {
144    // Lower bits
145    //  0 : pending
146    // 16 : running
147    // 32 : shutting-down
148    // 48 : terminated
149    // 64 : stopping
150    // 80 : stopped
151    fn from(s: InstanceState) -> Self {
152        match s.code.unwrap_or(0).bitand(0xFF) {
153            0 => Ec2InstanceState::Pending,
154            16 => Ec2InstanceState::Running,
155            32 => Ec2InstanceState::ShuttingDown,
156            48 => Ec2InstanceState::Terminated,
157            64 => Ec2InstanceState::Stopping,
158            80 => Ec2InstanceState::Stopped,
159            _ => Ec2InstanceState::Unknown,
160        }
161    }
162}
163
164impl Display for Ec2InstanceState {
165    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
166        let name = match self {
167            Ec2InstanceState::Pending => "pending",
168            Ec2InstanceState::Running => "running",
169            Ec2InstanceState::ShuttingDown => "shutting down",
170            Ec2InstanceState::Terminated => "terminated",
171            Ec2InstanceState::Stopping => "stopping",
172            Ec2InstanceState::Stopped => "stopped",
173            Ec2InstanceState::Unknown => "unknown",
174        };
175        f.write_str(name)
176    }
177}
178
179#[derive(Serialize, Debug)]
180pub struct Ec2InstanceStateChange {
181    pub instance_id: String,
182    pub previous_state: Ec2InstanceState,
183    pub current_state: Ec2InstanceState,
184}
185
186impl From<InstanceStateChange> for Ec2InstanceStateChange {
187    fn from(i: InstanceStateChange) -> Self {
188        Self {
189            instance_id: i.instance_id.unwrap_or_default(),
190            previous_state: i
191                .previous_state
192                .map(std::convert::Into::into)
193                .unwrap_or_default(),
194            current_state: i
195                .current_state
196                .map(std::convert::Into::into)
197                .unwrap_or_default(),
198        }
199    }
200}
201
202#[derive(Error, Debug)]
203#[error("Missing field {0}")]
204pub struct ParseError(&'static str);
205
206#[derive(clap::Args, Debug)]
207pub struct Ec2SelectArgs {
208    /// Raw filters passed to AWS API (`--filter key=value[,value2...]`)
209    ///
210    /// For possible values see the 'Filter.N' section of DescribeInstances:
211    /// https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeInstances.html
212    #[clap(short, long, verbatim_doc_comment)]
213    pub filter: Option<Vec<String>>,
214
215    /// Only 'regular' doc-proc servers
216    ///
217    /// This is a shortcut for `--filter tag:AV_Scan=false --filter tag:StepfileProcessor=false`
218    #[clap(long)]
219    pub docproc: bool,
220
221    /// Only av-scan doc-proc servers
222    ///
223    /// This is a shortcut for `--filter tag:AV_Scan=true`
224    #[clap(long)]
225    pub avscan: bool,
226
227    /// Exclude av-scan doc-proc servers
228    ///
229    /// This is a shortcut for `--filter tag:AV_Scan=false`
230    #[clap(long, conflicts_with = "avscan")]
231    pub no_avscan: bool,
232
233    /// Only stepfile-processor doc-proc servers
234    ///
235    /// This is a shortcut for `--filter tag:StepfileProcessor=true`
236    #[clap(long)]
237    pub stepfile: bool,
238
239    /// Exclude stepfile-processor doc-proc servers
240    ///
241    /// This is a shortcut for `--filter tag:StepfileProcessor=false`
242    #[clap(long, conflicts_with = "stepfile")]
243    pub no_stepfile: bool,
244
245    /// Filter instances
246    ///
247    /// - start with `i-` to filter (starts_with) match on aws instance id
248    /// - numbers will only match on the end of the name, intended for cluster-id filtering
249    /// - anything else will be matched (contains) on the tag `Name`
250    #[clap(verbatim_doc_comment)]
251    pub query: Vec<String>,
252}
253
254impl Ec2SelectArgs {
255    pub fn filter_with_extra_flags(&self) -> Vec<String> {
256        let mut filters = self.filter.as_ref().cloned().unwrap_or_default();
257        if self.docproc {
258            filters.push("tag:AV_Scan=false".to_string());
259            filters.push("tag:StepfileProcessor=false".to_string());
260        }
261        if self.avscan {
262            filters.push("tag:AV_Scan=true".to_string());
263        }
264        if self.no_avscan {
265            filters.push("tag:AV_Scan=false".to_string());
266        }
267        if self.stepfile {
268            filters.push("tag:StepfileProcessor=true".to_string());
269        }
270        if self.no_stepfile {
271            filters.push("tag:StepfileProcessor=false".to_string());
272        }
273
274        filters
275    }
276
277    pub fn has_no_filters(&self) -> bool {
278        self.query.is_empty()
279            && self.filter.as_ref().map(|f| f.is_empty()).unwrap_or(true)
280            && !self.avscan
281            && !self.no_avscan
282            && !self.stepfile
283            && !self.docproc
284            && !self.no_stepfile
285    }
286}
287
288pub async fn list_instances(
289    opts: &Ec2SelectArgs,
290    client: &AwsClient,
291) -> anyhow::Result<Vec<Ec2Instance>> {
292    let filters: Result<Vec<_>, ()> = opts
293        .filter_with_extra_flags()
294        .iter()
295        .map(|f| f.parse())
296        .collect();
297    let filters = filters.map_err(|_| anyhow::anyhow!("Unable to parse filters"))?;
298
299    let instances = client.query_instances(filters).await?;
300    let user_query = Ec2InstanceFilter::new(opts.query.clone());
301    let mut instances: Vec<_> = instances
302        .into_iter()
303        .filter(|i| user_query.filter(i))
304        .collect();
305
306    instances.sort_by_key(|i| std::cmp::Reverse(i.launch_time));
307
308    Ok(instances)
309}
310
311#[derive(Debug, Eq, PartialEq)]
312enum Ec2InstanceFilterKind {
313    /// AWS instance id in the format `i-abcdefghijk`
314    AwsInstanceId(String),
315    /// A numeric identifier, used in clusters (1, 2, 3, ...)
316    InstanceId(u8),
317    /// Free form text to match
318    Text(Vec<String>),
319}
320
321impl std::str::FromStr for Ec2InstanceFilterKind {
322    type Err = ();
323
324    fn from_str(s: &str) -> Result<Self, Self::Err> {
325        if s.is_empty() {
326            return Err(());
327        }
328
329        if s.starts_with("i-") {
330            return Ok(Ec2InstanceFilterKind::AwsInstanceId(s.to_string()));
331        }
332
333        if let Ok(i) = s.parse::<u8>() {
334            return Ok(Ec2InstanceFilterKind::InstanceId(i));
335        }
336
337        Ok(Ec2InstanceFilterKind::Text(
338            s.split(',').map(String::from).collect(),
339        ))
340    }
341}
342
343struct Ec2InstanceFilter(Vec<Ec2InstanceFilterKind>);
344
345impl Ec2InstanceFilter {
346    pub fn new(f: Vec<String>) -> Self {
347        let filters: Vec<Ec2InstanceFilterKind> = f.into_iter().flat_map(|s| s.parse()).collect();
348
349        Self(filters)
350    }
351
352    pub fn filter(&self, i: &Ec2Instance) -> bool {
353        self.0.iter().all(|filter| match filter {
354            Ec2InstanceFilterKind::AwsInstanceId(id) => i.id.starts_with(id),
355            Ec2InstanceFilterKind::InstanceId(id) => i
356                .tags
357                .get("Name")
358                .map(|name| name.ends_with(&format!("{id}")))
359                .unwrap_or_default(),
360            Ec2InstanceFilterKind::Text(query) => i
361                .tags
362                .get("Name")
363                .map(|name| {
364                    query.iter().any(|q| {
365                        if let Some(q) = q.strip_prefix('_') {
366                            !name.contains(q)
367                        } else {
368                            name.contains(q)
369                        }
370                    })
371                })
372                .unwrap_or_default(),
373        })
374    }
375}
376
377#[derive(new, Debug)]
378pub struct QueryFilter {
379    pub key: String,
380    pub values: Vec<String>,
381}
382
383impl std::str::FromStr for QueryFilter {
384    type Err = ();
385
386    fn from_str(value: &str) -> Result<Self, Self::Err> {
387        let (key, values) = value.split_once('=').ok_or(())?;
388
389        let key = key.to_owned();
390        let values: Vec<String> = values.split(',').map(ToOwned::to_owned).collect();
391
392        Ok(QueryFilter { key, values })
393    }
394}
395
396impl From<QueryFilter> for aws_sdk_ec2::types::Filter {
397    fn from(f: QueryFilter) -> Self {
398        aws_sdk_ec2::types::Filter::builder()
399            .name(f.key)
400            .set_values(Some(f.values).filter(|v| !v.is_empty()))
401            .build()
402    }
403}
404
405impl From<QueryFilter> for aws_sdk_autoscaling::types::Filter {
406    fn from(f: QueryFilter) -> Self {
407        aws_sdk_autoscaling::types::Filter::builder()
408            .name(f.key)
409            .set_values(Some(f.values).filter(|v| !v.is_empty()))
410            .build()
411    }
412}