Skip to main content

commonware_deployer/aws/
ec2.rs

1//! AWS EC2 SDK function wrappers
2
3use super::{METRICS_PORT, SYSTEM_PORT};
4use crate::aws::{
5    utils::{exact_cidr, DEPLOYER_MAX_PORT, DEPLOYER_MIN_PORT, DEPLOYER_PROTOCOL, RETRY_INTERVAL},
6    PortConfig,
7};
8use aws_config::BehaviorVersion;
9pub use aws_config::Region;
10use aws_sdk_ec2::{
11    error::BuildError,
12    primitives::Blob,
13    types::{
14        BlockDeviceMapping, EbsBlockDevice, Filter, InstanceStateName, ResourceType, SecurityGroup,
15        SummaryStatus, Tag, TagSpecification, VpcPeeringConnectionStateReasonCode,
16    },
17    Error as Ec2Error,
18};
19pub use aws_sdk_ec2::{
20    types::{InstanceType, IpPermission, IpRange, UserIdGroupPair, VolumeType},
21    Client as Ec2Client,
22};
23use std::{
24    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
25    time::Duration,
26};
27use tokio::time::sleep;
28use tracing::debug;
29
30/// Creates an EC2 client for the specified AWS region
31pub async fn create_client(region: Region) -> Ec2Client {
32    let retry = aws_config::retry::RetryConfig::adaptive()
33        .with_max_attempts(u32::MAX)
34        .with_initial_backoff(Duration::from_millis(500))
35        .with_max_backoff(Duration::from_secs(30))
36        .with_reconnect_mode(aws_sdk_ec2::config::retry::ReconnectMode::ReconnectOnTransientError);
37    let config = aws_config::defaults(BehaviorVersion::v2026_01_12())
38        .region(region)
39        .retry_config(retry)
40        .load()
41        .await;
42    Ec2Client::new(&config)
43}
44
45/// Imports an SSH public key into the specified region
46pub async fn import_key_pair(
47    client: &Ec2Client,
48    key_name: &str,
49    public_key: &str,
50) -> Result<(), Ec2Error> {
51    let blob = Blob::new(public_key.as_bytes());
52    client
53        .import_key_pair()
54        .key_name(key_name)
55        .public_key_material(blob)
56        .send()
57        .await?;
58    Ok(())
59}
60
61/// Deletes an SSH key pair from the specified region
62pub async fn delete_key_pair(client: &Ec2Client, key_name: &str) -> Result<(), Ec2Error> {
63    client.delete_key_pair().key_name(key_name).send().await?;
64    Ok(())
65}
66
67/// Detects the architecture of an instance type using the AWS API
68pub(crate) async fn detect_architecture(
69    client: &Ec2Client,
70    instance_type: &str,
71) -> Result<super::Architecture, Ec2Error> {
72    let response = client
73        .describe_instance_types()
74        .instance_types(InstanceType::try_parse(instance_type).expect("invalid instance type"))
75        .send()
76        .await?;
77
78    let instance_info = response
79        .instance_types
80        .and_then(|types| types.into_iter().next())
81        .ok_or_else(|| {
82            Ec2Error::from(BuildError::other(format!(
83                "instance type {instance_type} not found"
84            )))
85        })?;
86
87    let architectures = instance_info
88        .processor_info
89        .and_then(|p| p.supported_architectures)
90        .unwrap_or_default();
91
92    // EC2 instance types only support one architecture (e.g., t4g.* = arm64, t3.* = x86_64),
93    // so the check order here doesn't matter in practice.
94    if architectures.iter().any(|a| a.as_ref() == "arm64") {
95        Ok(super::Architecture::Arm64)
96    } else if architectures.iter().any(|a| a.as_ref() == "x86_64") {
97        Ok(super::Architecture::X86_64)
98    } else {
99        Err(Ec2Error::from(BuildError::other(format!(
100            "instance type {instance_type} has no supported architecture"
101        ))))
102    }
103}
104
105/// Finds the latest Ubuntu 24.04 AMI for the given architecture in the region
106pub(crate) async fn find_latest_ami(
107    client: &Ec2Client,
108    architecture: super::Architecture,
109) -> Result<String, Ec2Error> {
110    let arch = architecture.as_str();
111    let resp = client
112        .describe_images()
113        .filters(
114            Filter::builder()
115                .name("name")
116                .values(format!(
117                    "ubuntu/images/hvm-ssd-gp3/ubuntu-noble-24.04-{arch}-server-*"
118                ))
119                .build(),
120        )
121        .filters(
122            Filter::builder()
123                .name("root-device-type")
124                .values("ebs")
125                .build(),
126        )
127        .owners("099720109477") // Canonical's AWS account ID
128        .send()
129        .await?;
130    let mut images = resp.images.unwrap_or_default();
131    if images.is_empty() {
132        return Err(Ec2Error::from(BuildError::other(
133            "No matching AMI found".to_string(),
134        )));
135    }
136    images.sort_by(|a, b| b.creation_date().cmp(&a.creation_date()));
137    let latest_ami = images[0].image_id().unwrap();
138    Ok(latest_ami.to_string())
139}
140
141/// Creates a VPC with the specified CIDR block and tag
142pub async fn create_vpc(
143    client: &Ec2Client,
144    cidr_block: &str,
145    tag: &str,
146) -> Result<String, Ec2Error> {
147    let resp = client
148        .create_vpc()
149        .cidr_block(cidr_block)
150        .tag_specifications(
151            TagSpecification::builder()
152                .resource_type(ResourceType::Vpc)
153                .tags(Tag::builder().key("deployer").value(tag).build())
154                .build(),
155        )
156        .send()
157        .await?;
158    Ok(resp.vpc.unwrap().vpc_id.unwrap())
159}
160
161/// Creates an Internet Gateway and attaches it to the specified VPC
162pub async fn create_and_attach_igw(
163    client: &Ec2Client,
164    vpc_id: &str,
165    tag: &str,
166) -> Result<String, Ec2Error> {
167    let igw_resp = client
168        .create_internet_gateway()
169        .tag_specifications(
170            TagSpecification::builder()
171                .resource_type(ResourceType::InternetGateway)
172                .tags(Tag::builder().key("deployer").value(tag).build())
173                .build(),
174        )
175        .send()
176        .await?;
177    let igw_id = igw_resp
178        .internet_gateway
179        .unwrap()
180        .internet_gateway_id
181        .unwrap();
182    client
183        .attach_internet_gateway()
184        .internet_gateway_id(&igw_id)
185        .vpc_id(vpc_id)
186        .send()
187        .await?;
188    Ok(igw_id)
189}
190
191/// Creates a route table for the VPC and sets up a default route to the Internet Gateway
192pub async fn create_route_table(
193    client: &Ec2Client,
194    vpc_id: &str,
195    igw_id: &str,
196    tag: &str,
197) -> Result<String, Ec2Error> {
198    let rt_resp = client
199        .create_route_table()
200        .vpc_id(vpc_id)
201        .tag_specifications(
202            TagSpecification::builder()
203                .resource_type(ResourceType::RouteTable)
204                .tags(Tag::builder().key("deployer").value(tag).build())
205                .build(),
206        )
207        .send()
208        .await?;
209    let rt_id = rt_resp.route_table.unwrap().route_table_id.unwrap();
210    client
211        .create_route()
212        .route_table_id(&rt_id)
213        .destination_cidr_block("0.0.0.0/0")
214        .gateway_id(igw_id)
215        .send()
216        .await?;
217    Ok(rt_id)
218}
219
220/// Creates a subnet within the VPC and associates it with the route table
221pub async fn create_subnet(
222    client: &Ec2Client,
223    vpc_id: &str,
224    route_table_id: &str,
225    subnet_cidr: &str,
226    availability_zone: &str,
227    tag: &str,
228) -> Result<String, Ec2Error> {
229    let subnet_resp = client
230        .create_subnet()
231        .vpc_id(vpc_id)
232        .cidr_block(subnet_cidr)
233        .availability_zone(availability_zone)
234        .tag_specifications(
235            TagSpecification::builder()
236                .resource_type(ResourceType::Subnet)
237                .tags(Tag::builder().key("deployer").value(tag).build())
238                .build(),
239        )
240        .send()
241        .await?;
242    let subnet_id = subnet_resp.subnet.unwrap().subnet_id.unwrap();
243    client
244        .associate_route_table()
245        .route_table_id(route_table_id)
246        .subnet_id(&subnet_id)
247        .send()
248        .await?;
249    Ok(subnet_id)
250}
251
252/// Creates a security group for the monitoring instance with access from the deployer IP
253pub async fn create_security_group_monitoring(
254    client: &Ec2Client,
255    vpc_id: &str,
256    deployer_ip: &str,
257    tag: &str,
258) -> Result<String, Ec2Error> {
259    let sg_resp = client
260        .create_security_group()
261        .group_name(tag)
262        .description("Security group for monitoring instance")
263        .vpc_id(vpc_id)
264        .tag_specifications(
265            TagSpecification::builder()
266                .resource_type(ResourceType::SecurityGroup)
267                .tags(Tag::builder().key("deployer").value(tag).build())
268                .build(),
269        )
270        .send()
271        .await?;
272    let sg_id = sg_resp.group_id.unwrap();
273    client
274        .authorize_security_group_ingress()
275        .group_id(&sg_id)
276        .ip_permissions(
277            IpPermission::builder()
278                .ip_protocol(DEPLOYER_PROTOCOL)
279                .from_port(DEPLOYER_MIN_PORT)
280                .to_port(DEPLOYER_MAX_PORT)
281                .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
282                .build(),
283        )
284        .send()
285        .await?;
286    Ok(sg_id)
287}
288
289/// Creates a security group for binary instances with access from deployer and custom ports
290/// Note: monitoring IP rules are added separately via `add_monitoring_ingress` after monitoring instance launches
291pub async fn create_security_group_binary(
292    client: &Ec2Client,
293    vpc_id: &str,
294    deployer_ip: &str,
295    tag: &str,
296    ports: &[PortConfig],
297) -> Result<String, Ec2Error> {
298    let sg_resp = client
299        .create_security_group()
300        .group_name(format!("{tag}-binary"))
301        .description("Security group for binary instances")
302        .vpc_id(vpc_id)
303        .tag_specifications(
304            TagSpecification::builder()
305                .resource_type(ResourceType::SecurityGroup)
306                .tags(Tag::builder().key("deployer").value(tag).build())
307                .build(),
308        )
309        .send()
310        .await?;
311    let sg_id = sg_resp.group_id.unwrap();
312    let mut builder = client
313        .authorize_security_group_ingress()
314        .group_id(&sg_id)
315        .ip_permissions(
316            IpPermission::builder()
317                .ip_protocol(DEPLOYER_PROTOCOL)
318                .from_port(DEPLOYER_MIN_PORT)
319                .to_port(DEPLOYER_MAX_PORT)
320                .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
321                .build(),
322        );
323    for port in ports {
324        builder = builder.ip_permissions(
325            IpPermission::builder()
326                .ip_protocol(&port.protocol)
327                .from_port(port.port as i32)
328                .to_port(port.port as i32)
329                .ip_ranges(IpRange::builder().cidr_ip(&port.cidr).build())
330                .build(),
331        );
332    }
333    builder.send().await?;
334    Ok(sg_id)
335}
336
337/// Adds monitoring IP ingress rules to a binary security group for Prometheus scraping
338pub async fn add_monitoring_ingress(
339    client: &Ec2Client,
340    sg_id: &str,
341    monitoring_ip: &str,
342) -> Result<(), Ec2Error> {
343    client
344        .authorize_security_group_ingress()
345        .group_id(sg_id)
346        .ip_permissions(
347            IpPermission::builder()
348                .ip_protocol("tcp")
349                .from_port(METRICS_PORT as i32)
350                .to_port(METRICS_PORT as i32)
351                .ip_ranges(
352                    IpRange::builder()
353                        .cidr_ip(exact_cidr(monitoring_ip))
354                        .build(),
355                )
356                .build(),
357        )
358        .ip_permissions(
359            IpPermission::builder()
360                .ip_protocol("tcp")
361                .from_port(SYSTEM_PORT as i32)
362                .to_port(SYSTEM_PORT as i32)
363                .ip_ranges(
364                    IpRange::builder()
365                        .cidr_ip(exact_cidr(monitoring_ip))
366                        .build(),
367                )
368                .build(),
369        )
370        .send()
371        .await?;
372    Ok(())
373}
374
375/// Attempts to launch EC2 instances. May fail on transient errors or rate limits.
376#[allow(clippy::too_many_arguments)]
377async fn try_launch_instances(
378    client: &Ec2Client,
379    ami_id: &str,
380    instance_type: InstanceType,
381    storage_size: i32,
382    storage_class: VolumeType,
383    key_name: &str,
384    subnet_id: &str,
385    sg_id: &str,
386    count: i32,
387    name: &str,
388    tag: &str,
389) -> Result<Vec<String>, Ec2Error> {
390    let resp = client
391        .run_instances()
392        .image_id(ami_id)
393        .instance_type(instance_type)
394        .key_name(key_name)
395        .min_count(count)
396        .max_count(count)
397        .network_interfaces(
398            aws_sdk_ec2::types::InstanceNetworkInterfaceSpecification::builder()
399                .associate_public_ip_address(true)
400                .device_index(0)
401                .subnet_id(subnet_id)
402                .groups(sg_id)
403                .build(),
404        )
405        .tag_specifications(
406            TagSpecification::builder()
407                .resource_type(ResourceType::Instance)
408                .set_tags(Some(vec![
409                    Tag::builder().key("deployer").value(tag).build(),
410                    Tag::builder().key("name").value(name).build(),
411                ]))
412                .build(),
413        )
414        .block_device_mappings(
415            BlockDeviceMapping::builder()
416                .device_name("/dev/sda1")
417                .ebs(
418                    EbsBlockDevice::builder()
419                        .volume_size(storage_size)
420                        .volume_type(storage_class)
421                        .delete_on_termination(true)
422                        .build(),
423                )
424                .build(),
425        )
426        .send()
427        .await?;
428    Ok(resp
429        .instances
430        .unwrap()
431        .into_iter()
432        .map(|i| i.instance_id.unwrap())
433        .collect())
434}
435
436/// Checks if an EC2 error can be resolved by trying a different subnet/AZ.
437fn is_subnet_fallback_error(e: &Ec2Error) -> bool {
438    let error_str = e.to_string();
439    error_str.contains("InsufficientInstanceCapacity")
440        || error_str.contains("InsufficientFreeAddressesInSubnet")
441}
442
443/// Checks if an EC2 error is fatal and should not be retried.
444fn is_fatal_ec2_error(e: &Ec2Error) -> bool {
445    let error_str = e.to_string();
446    error_str.contains("VcpuLimitExceeded")
447        || error_str.contains("InstanceLimitExceeded")
448        || error_str.contains("MaxSpotInstanceCountExceeded")
449        || error_str.contains("VolumeLimitExceeded")
450        || error_str.contains("InvalidParameterValue")
451        || error_str.contains("InvalidAMIID")
452        || error_str.contains("InvalidSubnetID")
453        || error_str.contains("InvalidGroup")
454        || error_str.contains("InvalidKeyPair")
455}
456
457/// Launches EC2 instances with specified configurations.
458/// Filters subnets to those supporting the instance type, distributes across them starting at
459/// `start_idx`, and falls back to other subnets on capacity errors.
460#[allow(clippy::too_many_arguments)]
461pub async fn launch_instances(
462    client: &Ec2Client,
463    ami_id: &str,
464    instance_type: InstanceType,
465    storage_size: i32,
466    storage_class: VolumeType,
467    key_name: &str,
468    subnets: &[(String, String)],
469    az_support: &BTreeMap<String, BTreeSet<String>>,
470    start_idx: usize,
471    sg_id: &str,
472    count: i32,
473    name: &str,
474    tag: &str,
475) -> Result<(Vec<String>, String), super::Error> {
476    // Filter to subnets in AZs that support this instance type
477    let instance_type_str = instance_type.to_string();
478    let eligible: Vec<(&str, &str)> = subnets
479        .iter()
480        .filter(|(az, _)| {
481            az_support
482                .get(az)
483                .is_some_and(|types| types.contains(&instance_type_str))
484        })
485        .map(|(az, subnet_id)| (az.as_str(), subnet_id.as_str()))
486        .collect();
487    if eligible.is_empty() {
488        return Err(super::Error::UnsupportedInstanceType(instance_type_str));
489    }
490
491    // Try each subnet starting at start_idx offset (for round-robin distribution across instances)
492    let len = eligible.len();
493    let mut last_error = None;
494    for i in 0..len {
495        let (az, subnet_id) = eligible[(start_idx + i) % len];
496        let mut attempt = 0u32;
497        loop {
498            match try_launch_instances(
499                client,
500                ami_id,
501                instance_type.clone(),
502                storage_size,
503                storage_class.clone(),
504                key_name,
505                subnet_id,
506                sg_id,
507                count,
508                name,
509                tag,
510            )
511            .await
512            {
513                Ok(ids) => return Ok((ids, az.to_string())),
514                Err(e) => {
515                    if is_fatal_ec2_error(&e) {
516                        return Err(super::Error::AwsEc2(e));
517                    }
518                    if is_subnet_fallback_error(&e) {
519                        // Capacity error in this AZ, try next subnet
520                        debug!(
521                            name = name,
522                            subnets_remaining = len - i - 1,
523                            error = %e,
524                            "capacity error, trying next subnet"
525                        );
526                        last_error = Some(e);
527                        break;
528                    }
529                    debug!(
530                        name = name,
531                        attempt = attempt + 1,
532                        error = %e,
533                        "launch_instances failed, retrying"
534                    );
535                    attempt = attempt.saturating_add(1);
536                    let backoff = Duration::from_millis(500 * (1 << attempt.min(10)));
537                    sleep(backoff).await;
538                }
539            }
540        }
541    }
542
543    Err(last_error.map_or(super::Error::NoSubnetsAvailable, super::Error::AwsEc2))
544}
545
546/// Waits for instances to reach the "running" state and returns their public IPs
547/// in the same order as the input instance IDs.
548pub async fn wait_for_instances_running(
549    client: &Ec2Client,
550    instance_ids: &[String],
551) -> Result<Vec<String>, Ec2Error> {
552    // Track discovered IPs to avoid re-polling running instances
553    let mut discovered_ips: HashMap<String, String> = HashMap::new();
554    let mut pending_ids: HashSet<String> = instance_ids.iter().cloned().collect();
555    let mut attempt = 0u32;
556    loop {
557        // Only query instances that haven't been discovered yet
558        let query_ids: Vec<String> = pending_ids.iter().cloned().collect();
559        let resp = match client
560            .describe_instances()
561            .set_instance_ids(Some(query_ids))
562            .send()
563            .await
564        {
565            Ok(resp) => {
566                attempt = 0;
567                resp
568            }
569            Err(e) => {
570                attempt = attempt.saturating_add(1);
571                debug!(
572                    pending = pending_ids.len(),
573                    attempt = attempt,
574                    error = %e,
575                    "describe_instances failed, retrying"
576                );
577                sleep(RETRY_INTERVAL).await;
578                continue;
579            }
580        };
581
582        // Check each instance and record those that are running with IPs
583        for reservation in resp.reservations.unwrap_or_default() {
584            for instance in reservation.instances.unwrap_or_default() {
585                let id = match instance.instance_id {
586                    Some(id) => id,
587                    None => continue,
588                };
589                let is_running = instance.state.as_ref().and_then(|s| s.name.as_ref())
590                    == Some(&InstanceStateName::Running);
591                if is_running {
592                    if let Some(ip) = instance.public_ip_address {
593                        discovered_ips.insert(id.clone(), ip);
594                        pending_ids.remove(&id);
595                    }
596                }
597            }
598        }
599
600        // Return once all instances are discovered
601        if pending_ids.is_empty() {
602            return Ok(instance_ids
603                .iter()
604                .map(|id| discovered_ips.remove(id).unwrap())
605                .collect());
606        }
607
608        // Try again after a delay
609        sleep(RETRY_INTERVAL).await;
610    }
611}
612
613pub async fn wait_for_instances_ready(
614    client: &Ec2Client,
615    instance_ids: &[String],
616) -> Result<(), Ec2Error> {
617    loop {
618        // Ask for instance status
619        let Ok(resp) = client
620            .describe_instance_status()
621            .set_instance_ids(Some(instance_ids.to_vec()))
622            .include_all_instances(true) // Include instances regardless of state
623            .send()
624            .await
625        else {
626            sleep(RETRY_INTERVAL).await;
627            continue;
628        };
629
630        // Confirm all are ready
631        let statuses = resp.instance_statuses.unwrap_or_default();
632        let all_ready = statuses.iter().all(|s| {
633            s.instance_state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
634                && s.system_status.as_ref().unwrap().status.as_ref().unwrap() == &SummaryStatus::Ok
635                && s.instance_status.as_ref().unwrap().status.as_ref().unwrap()
636                    == &SummaryStatus::Ok
637        });
638        if !all_ready {
639            sleep(RETRY_INTERVAL).await;
640            continue;
641        }
642        return Ok(());
643    }
644}
645
646/// Retrieves the private IP address of an instance
647pub async fn get_private_ip(client: &Ec2Client, instance_id: &str) -> Result<String, Ec2Error> {
648    let resp = client
649        .describe_instances()
650        .instance_ids(instance_id)
651        .send()
652        .await?;
653    let reservations = resp.reservations.unwrap();
654    let instance = &reservations[0].instances.as_ref().unwrap()[0];
655    Ok(instance.private_ip_address.as_ref().unwrap().clone())
656}
657
658/// Creates a VPC peering connection between two VPCs
659pub async fn create_vpc_peering_connection(
660    client: &Ec2Client,
661    requester_vpc_id: &str,
662    peer_vpc_id: &str,
663    peer_region: &str,
664    tag: &str,
665) -> Result<String, Ec2Error> {
666    let resp = client
667        .create_vpc_peering_connection()
668        .vpc_id(requester_vpc_id)
669        .peer_vpc_id(peer_vpc_id)
670        .peer_region(peer_region)
671        .tag_specifications(
672            TagSpecification::builder()
673                .resource_type(ResourceType::VpcPeeringConnection)
674                .tags(Tag::builder().key("deployer").value(tag).build())
675                .build(),
676        )
677        .send()
678        .await?;
679    Ok(resp
680        .vpc_peering_connection
681        .unwrap()
682        .vpc_peering_connection_id
683        .unwrap())
684}
685
686/// Waits for a VPC peering connection to reach the "pending-acceptance" state
687pub async fn wait_for_vpc_peering_connection(
688    client: &Ec2Client,
689    peer_id: &str,
690) -> Result<(), Ec2Error> {
691    loop {
692        if let Ok(resp) = client
693            .describe_vpc_peering_connections()
694            .vpc_peering_connection_ids(peer_id)
695            .send()
696            .await
697        {
698            if let Some(connections) = resp.vpc_peering_connections {
699                if let Some(connection) = connections.first() {
700                    if connection.status.as_ref().unwrap().code
701                        == Some(VpcPeeringConnectionStateReasonCode::PendingAcceptance)
702                    {
703                        return Ok(());
704                    }
705                }
706            }
707        }
708        sleep(Duration::from_secs(2)).await;
709    }
710}
711
712/// Accepts a VPC peering connection in the peer region
713pub async fn accept_vpc_peering_connection(
714    client: &Ec2Client,
715    peer_id: &str,
716) -> Result<(), Ec2Error> {
717    client
718        .accept_vpc_peering_connection()
719        .vpc_peering_connection_id(peer_id)
720        .send()
721        .await?;
722    Ok(())
723}
724
725/// Adds a route to a route table for VPC peering
726pub async fn add_route(
727    client: &Ec2Client,
728    route_table_id: &str,
729    destination_cidr: &str,
730    peer_id: &str,
731) -> Result<(), Ec2Error> {
732    client
733        .create_route()
734        .route_table_id(route_table_id)
735        .destination_cidr_block(destination_cidr)
736        .vpc_peering_connection_id(peer_id)
737        .send()
738        .await?;
739    Ok(())
740}
741
742/// Finds VPC peering connections by deployer tag
743pub async fn find_vpc_peering_by_tag(
744    client: &Ec2Client,
745    tag: &str,
746) -> Result<Vec<String>, Ec2Error> {
747    let resp = client
748        .describe_vpc_peering_connections()
749        .filters(Filter::builder().name("tag:deployer").values(tag).build())
750        .send()
751        .await?;
752    Ok(resp
753        .vpc_peering_connections
754        .unwrap_or_default()
755        .into_iter()
756        .map(|p| p.vpc_peering_connection_id.unwrap())
757        .collect())
758}
759
760/// Deletes a VPC peering connection
761pub async fn delete_vpc_peering(client: &Ec2Client, peering_id: &str) -> Result<(), Ec2Error> {
762    client
763        .delete_vpc_peering_connection()
764        .vpc_peering_connection_id(peering_id)
765        .send()
766        .await?;
767    Ok(())
768}
769
770/// Waits for a VPC peering connection to be deleted
771pub async fn wait_for_vpc_peering_deletion(
772    ec2_client: &Ec2Client,
773    peer_id: &str,
774) -> Result<(), Ec2Error> {
775    loop {
776        let resp = ec2_client
777            .describe_vpc_peering_connections()
778            .vpc_peering_connection_ids(peer_id)
779            .send()
780            .await?;
781        if let Some(connections) = resp.vpc_peering_connections {
782            if let Some(connection) = connections.first() {
783                if connection.status.as_ref().unwrap().code
784                    == Some(VpcPeeringConnectionStateReasonCode::Deleted)
785                {
786                    return Ok(());
787                }
788            } else {
789                return Ok(());
790            }
791        } else {
792            return Ok(());
793        }
794        sleep(RETRY_INTERVAL).await;
795    }
796}
797
798/// Finds instances by deployer tag
799pub async fn find_instances_by_tag(
800    ec2_client: &Ec2Client,
801    tag: &str,
802) -> Result<Vec<String>, Ec2Error> {
803    let resp = ec2_client
804        .describe_instances()
805        .filters(Filter::builder().name("tag:deployer").values(tag).build())
806        .send()
807        .await?;
808    Ok(resp
809        .reservations
810        .unwrap_or_default()
811        .into_iter()
812        .flat_map(|r| r.instances.unwrap_or_default())
813        .map(|i| i.instance_id.unwrap())
814        .collect())
815}
816
817/// Terminates specified instances
818pub async fn terminate_instances(
819    ec2_client: &Ec2Client,
820    instance_ids: &[String],
821) -> Result<(), Ec2Error> {
822    if instance_ids.is_empty() {
823        return Ok(());
824    }
825    ec2_client
826        .terminate_instances()
827        .set_instance_ids(Some(instance_ids.to_vec()))
828        .send()
829        .await?;
830    Ok(())
831}
832
833/// Waits for instances to be terminated
834pub async fn wait_for_instances_terminated(
835    ec2_client: &Ec2Client,
836    instance_ids: &[String],
837) -> Result<(), Ec2Error> {
838    loop {
839        let resp = ec2_client
840            .describe_instances()
841            .set_instance_ids(Some(instance_ids.to_vec()))
842            .send()
843            .await?;
844        let instances = resp
845            .reservations
846            .unwrap_or_default()
847            .into_iter()
848            .flat_map(|r| r.instances.unwrap_or_default())
849            .collect::<Vec<_>>();
850        if instances.iter().all(|i| {
851            i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Terminated
852        }) {
853            return Ok(());
854        }
855        sleep(RETRY_INTERVAL).await;
856    }
857}
858
859/// Finds security groups by deployer tag
860pub async fn find_security_groups_by_tag(
861    ec2_client: &Ec2Client,
862    tag: &str,
863) -> Result<Vec<SecurityGroup>, Ec2Error> {
864    let resp = ec2_client
865        .describe_security_groups()
866        .filters(Filter::builder().name("tag:deployer").values(tag).build())
867        .send()
868        .await?;
869    Ok(resp
870        .security_groups
871        .unwrap_or_default()
872        .into_iter()
873        .collect())
874}
875
876/// Deletes a security group
877pub async fn delete_security_group(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
878    ec2_client
879        .delete_security_group()
880        .group_id(sg_id)
881        .send()
882        .await?;
883    Ok(())
884}
885
886/// Finds route tables by deployer tag
887pub async fn find_route_tables_by_tag(
888    ec2_client: &Ec2Client,
889    tag: &str,
890) -> Result<Vec<String>, Ec2Error> {
891    let resp = ec2_client
892        .describe_route_tables()
893        .filters(Filter::builder().name("tag:deployer").values(tag).build())
894        .send()
895        .await?;
896    Ok(resp
897        .route_tables
898        .unwrap_or_default()
899        .into_iter()
900        .map(|rt| rt.route_table_id.unwrap())
901        .collect())
902}
903
904/// Deletes a route table
905pub async fn delete_route_table(ec2_client: &Ec2Client, rt_id: &str) -> Result<(), Ec2Error> {
906    ec2_client
907        .delete_route_table()
908        .route_table_id(rt_id)
909        .send()
910        .await?;
911    Ok(())
912}
913
914/// Finds Internet Gateways by deployer tag
915pub async fn find_igws_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
916    let resp = ec2_client
917        .describe_internet_gateways()
918        .filters(Filter::builder().name("tag:deployer").values(tag).build())
919        .send()
920        .await?;
921    Ok(resp
922        .internet_gateways
923        .unwrap_or_default()
924        .into_iter()
925        .map(|igw| igw.internet_gateway_id.unwrap())
926        .collect())
927}
928
929/// Finds the VPC ID attached to an Internet Gateway, if any
930pub async fn find_vpc_by_igw(
931    ec2_client: &Ec2Client,
932    igw_id: &str,
933) -> Result<Option<String>, Ec2Error> {
934    let resp = ec2_client
935        .describe_internet_gateways()
936        .internet_gateway_ids(igw_id)
937        .send()
938        .await?;
939    Ok(resp
940        .internet_gateways
941        .and_then(|gws| gws.into_iter().next())
942        .and_then(|gw| gw.attachments)
943        .and_then(|attachments| attachments.into_iter().next())
944        .and_then(|attachment| attachment.vpc_id))
945}
946
947/// Returns the set of regions that are enabled for the AWS account
948pub async fn get_enabled_regions(ec2_client: &Ec2Client) -> Result<HashSet<String>, Ec2Error> {
949    let resp = ec2_client
950        .describe_regions()
951        .all_regions(true)
952        .filters(
953            Filter::builder()
954                .name("opt-in-status")
955                .values("opt-in-not-required")
956                .values("opted-in")
957                .build(),
958        )
959        .send()
960        .await?;
961    Ok(resp
962        .regions
963        .unwrap_or_default()
964        .into_iter()
965        .filter_map(|r| r.region_name)
966        .collect())
967}
968
969/// Detaches an Internet Gateway from a VPC
970pub async fn detach_igw(
971    ec2_client: &Ec2Client,
972    igw_id: &str,
973    vpc_id: &str,
974) -> Result<(), Ec2Error> {
975    ec2_client
976        .detach_internet_gateway()
977        .internet_gateway_id(igw_id)
978        .vpc_id(vpc_id)
979        .send()
980        .await?;
981    Ok(())
982}
983
984/// Deletes an Internet Gateway
985pub async fn delete_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<(), Ec2Error> {
986    ec2_client
987        .delete_internet_gateway()
988        .internet_gateway_id(igw_id)
989        .send()
990        .await?;
991    Ok(())
992}
993
994/// Finds subnets by deployer tag
995pub async fn find_subnets_by_tag(
996    ec2_client: &Ec2Client,
997    tag: &str,
998) -> Result<Vec<String>, Ec2Error> {
999    let resp = ec2_client
1000        .describe_subnets()
1001        .filters(Filter::builder().name("tag:deployer").values(tag).build())
1002        .send()
1003        .await?;
1004    Ok(resp
1005        .subnets
1006        .unwrap_or_default()
1007        .into_iter()
1008        .map(|subnet| subnet.subnet_id.unwrap())
1009        .collect())
1010}
1011
1012/// Deletes a subnet
1013pub async fn delete_subnet(ec2_client: &Ec2Client, subnet_id: &str) -> Result<(), Ec2Error> {
1014    ec2_client
1015        .delete_subnet()
1016        .subnet_id(subnet_id)
1017        .send()
1018        .await?;
1019    Ok(())
1020}
1021
1022/// Finds VPCs by deployer tag
1023pub async fn find_vpcs_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
1024    let resp = ec2_client
1025        .describe_vpcs()
1026        .filters(Filter::builder().name("tag:deployer").values(tag).build())
1027        .send()
1028        .await?;
1029    Ok(resp
1030        .vpcs
1031        .unwrap_or_default()
1032        .into_iter()
1033        .map(|vpc| vpc.vpc_id.unwrap())
1034        .collect())
1035}
1036
1037/// Deletes a VPC
1038pub async fn delete_vpc(ec2_client: &Ec2Client, vpc_id: &str) -> Result<(), Ec2Error> {
1039    ec2_client.delete_vpc().vpc_id(vpc_id).send().await?;
1040    Ok(())
1041}
1042
1043/// Returns a map of AZ -> set of supported instance types for the given instance types.
1044pub async fn find_az_instance_support(
1045    client: &Ec2Client,
1046    instance_types: &[String],
1047) -> Result<BTreeMap<String, BTreeSet<String>>, Ec2Error> {
1048    let offerings = client
1049        .describe_instance_type_offerings()
1050        .location_type("availability-zone".into())
1051        .filters(
1052            Filter::builder()
1053                .name("instance-type")
1054                .set_values(Some(instance_types.to_vec()))
1055                .build(),
1056        )
1057        .send()
1058        .await?
1059        .instance_type_offerings
1060        .unwrap_or_default();
1061
1062    // Build map of AZ -> supported instance types
1063    let mut az_to_instance_types: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
1064    for offering in offerings {
1065        if let (Some(location), Some(instance_type)) = (
1066            offering.location,
1067            offering.instance_type.map(|it| it.to_string()),
1068        ) {
1069            az_to_instance_types
1070                .entry(location)
1071                .or_default()
1072                .insert(instance_type);
1073        }
1074    }
1075    if az_to_instance_types.is_empty() {
1076        return Err(Ec2Error::from(BuildError::other(format!(
1077            "no availability zone supports any of: {instance_types:?}"
1078        ))));
1079    }
1080
1081    Ok(az_to_instance_types)
1082}
1083
1084/// Waits until all network interfaces associated with a security group are deleted
1085pub async fn wait_for_enis_deleted(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
1086    loop {
1087        let resp = ec2_client
1088            .describe_network_interfaces()
1089            .filters(Filter::builder().name("group-id").values(sg_id).build())
1090            .send()
1091            .await?;
1092        if resp.network_interfaces.unwrap_or_default().is_empty() {
1093            return Ok(());
1094        }
1095        sleep(RETRY_INTERVAL).await;
1096    }
1097}