commonware_deployer/ec2/
aws.rs

1//! AWS EC2 SDK function wrappers
2
3use super::{METRICS_PORT, SYSTEM_PORT};
4use crate::ec2::{
5    utils::{exact_cidr, DEPLOYER_MAX_PORT, DEPLOYER_MIN_PORT, DEPLOYER_PROTOCOL, RETRY_INTERVAL},
6    PortConfig,
7};
8use aws_config::BehaviorVersion;
9pub use aws_config::Region;
10pub use aws_sdk_ec2::types::{InstanceType, IpPermission, IpRange, UserIdGroupPair, VolumeType};
11use aws_sdk_ec2::{
12    error::BuildError,
13    primitives::Blob,
14    types::{
15        BlockDeviceMapping, EbsBlockDevice, Filter, InstanceStateName, ResourceType, SecurityGroup,
16        SummaryStatus, Tag, TagSpecification, VpcPeeringConnectionStateReasonCode,
17    },
18    Client as Ec2Client, Error as Ec2Error,
19};
20use std::{
21    collections::{HashMap, HashSet},
22    time::Duration,
23};
24use tokio::time::sleep;
25
26/// Creates an EC2 client for the specified AWS region
27pub async fn create_ec2_client(region: Region) -> Ec2Client {
28    let retry = aws_config::retry::RetryConfig::adaptive()
29        .with_max_attempts(10)
30        .with_initial_backoff(Duration::from_millis(500))
31        .with_max_backoff(Duration::from_secs(30));
32    let config = aws_config::defaults(BehaviorVersion::v2025_08_07())
33        .region(region)
34        .retry_config(retry)
35        .load()
36        .await;
37    Ec2Client::new(&config)
38}
39
40/// Imports an SSH public key into the specified region
41pub async fn import_key_pair(
42    client: &Ec2Client,
43    key_name: &str,
44    public_key: &str,
45) -> Result<(), Ec2Error> {
46    let blob = Blob::new(public_key.as_bytes());
47    client
48        .import_key_pair()
49        .key_name(key_name)
50        .public_key_material(blob)
51        .send()
52        .await?;
53    Ok(())
54}
55
56/// Deletes an SSH key pair from the specified region
57pub async fn delete_key_pair(client: &Ec2Client, key_name: &str) -> Result<(), Ec2Error> {
58    client.delete_key_pair().key_name(key_name).send().await?;
59    Ok(())
60}
61
62/// Detects the architecture of an instance type using the AWS API
63pub(crate) async fn detect_architecture(
64    client: &Ec2Client,
65    instance_type: &str,
66) -> Result<super::Architecture, Ec2Error> {
67    let response = client
68        .describe_instance_types()
69        .instance_types(InstanceType::try_parse(instance_type).expect("invalid instance type"))
70        .send()
71        .await?;
72
73    let instance_info = response
74        .instance_types
75        .and_then(|types| types.into_iter().next())
76        .ok_or_else(|| {
77            Ec2Error::from(BuildError::other(format!(
78                "instance type {instance_type} not found"
79            )))
80        })?;
81
82    let architectures = instance_info
83        .processor_info
84        .and_then(|p| p.supported_architectures)
85        .unwrap_or_default();
86
87    // EC2 instance types only support one architecture (e.g., t4g.* = arm64, t3.* = x86_64),
88    // so the check order here doesn't matter in practice.
89    if architectures.iter().any(|a| a.as_ref() == "arm64") {
90        Ok(super::Architecture::Arm64)
91    } else if architectures.iter().any(|a| a.as_ref() == "x86_64") {
92        Ok(super::Architecture::X86_64)
93    } else {
94        Err(Ec2Error::from(BuildError::other(format!(
95            "instance type {instance_type} has no supported architecture"
96        ))))
97    }
98}
99
100/// Finds the latest Ubuntu 24.04 AMI for the given architecture in the region
101pub(crate) async fn find_latest_ami(
102    client: &Ec2Client,
103    architecture: super::Architecture,
104) -> Result<String, Ec2Error> {
105    let arch = architecture.as_str();
106    let resp = client
107        .describe_images()
108        .filters(
109            Filter::builder()
110                .name("name")
111                .values(format!(
112                    "ubuntu/images/hvm-ssd-gp3/ubuntu-noble-24.04-{arch}-server-*"
113                ))
114                .build(),
115        )
116        .filters(
117            Filter::builder()
118                .name("root-device-type")
119                .values("ebs")
120                .build(),
121        )
122        .owners("099720109477") // Canonical's AWS account ID
123        .send()
124        .await?;
125    let mut images = resp.images.unwrap_or_default();
126    if images.is_empty() {
127        return Err(Ec2Error::from(BuildError::other(
128            "No matching AMI found".to_string(),
129        )));
130    }
131    images.sort_by(|a, b| b.creation_date().cmp(&a.creation_date()));
132    let latest_ami = images[0].image_id().unwrap();
133    Ok(latest_ami.to_string())
134}
135
136/// Creates a VPC with the specified CIDR block and tag
137pub async fn create_vpc(
138    client: &Ec2Client,
139    cidr_block: &str,
140    tag: &str,
141) -> Result<String, Ec2Error> {
142    let resp = client
143        .create_vpc()
144        .cidr_block(cidr_block)
145        .tag_specifications(
146            TagSpecification::builder()
147                .resource_type(ResourceType::Vpc)
148                .tags(Tag::builder().key("deployer").value(tag).build())
149                .build(),
150        )
151        .send()
152        .await?;
153    Ok(resp.vpc.unwrap().vpc_id.unwrap())
154}
155
156/// Creates an Internet Gateway and attaches it to the specified VPC
157pub async fn create_and_attach_igw(
158    client: &Ec2Client,
159    vpc_id: &str,
160    tag: &str,
161) -> Result<String, Ec2Error> {
162    let igw_resp = client
163        .create_internet_gateway()
164        .tag_specifications(
165            TagSpecification::builder()
166                .resource_type(ResourceType::InternetGateway)
167                .tags(Tag::builder().key("deployer").value(tag).build())
168                .build(),
169        )
170        .send()
171        .await?;
172    let igw_id = igw_resp
173        .internet_gateway
174        .unwrap()
175        .internet_gateway_id
176        .unwrap();
177    client
178        .attach_internet_gateway()
179        .internet_gateway_id(&igw_id)
180        .vpc_id(vpc_id)
181        .send()
182        .await?;
183    Ok(igw_id)
184}
185
186/// Creates a route table for the VPC and sets up a default route to the Internet Gateway
187pub async fn create_route_table(
188    client: &Ec2Client,
189    vpc_id: &str,
190    igw_id: &str,
191    tag: &str,
192) -> Result<String, Ec2Error> {
193    let rt_resp = client
194        .create_route_table()
195        .vpc_id(vpc_id)
196        .tag_specifications(
197            TagSpecification::builder()
198                .resource_type(ResourceType::RouteTable)
199                .tags(Tag::builder().key("deployer").value(tag).build())
200                .build(),
201        )
202        .send()
203        .await?;
204    let rt_id = rt_resp.route_table.unwrap().route_table_id.unwrap();
205    client
206        .create_route()
207        .route_table_id(&rt_id)
208        .destination_cidr_block("0.0.0.0/0")
209        .gateway_id(igw_id)
210        .send()
211        .await?;
212    Ok(rt_id)
213}
214
215/// Creates a subnet within the VPC and associates it with the route table
216pub async fn create_subnet(
217    client: &Ec2Client,
218    vpc_id: &str,
219    route_table_id: &str,
220    subnet_cidr: &str,
221    availability_zone: &str,
222    tag: &str,
223) -> Result<String, Ec2Error> {
224    let subnet_resp = client
225        .create_subnet()
226        .vpc_id(vpc_id)
227        .cidr_block(subnet_cidr)
228        .availability_zone(availability_zone)
229        .tag_specifications(
230            TagSpecification::builder()
231                .resource_type(ResourceType::Subnet)
232                .tags(Tag::builder().key("deployer").value(tag).build())
233                .build(),
234        )
235        .send()
236        .await?;
237    let subnet_id = subnet_resp.subnet.unwrap().subnet_id.unwrap();
238    client
239        .associate_route_table()
240        .route_table_id(route_table_id)
241        .subnet_id(&subnet_id)
242        .send()
243        .await?;
244    Ok(subnet_id)
245}
246
247/// Creates a security group for the monitoring instance with access from the deployer IP
248pub async fn create_security_group_monitoring(
249    client: &Ec2Client,
250    vpc_id: &str,
251    deployer_ip: &str,
252    tag: &str,
253) -> Result<String, Ec2Error> {
254    let sg_resp = client
255        .create_security_group()
256        .group_name(tag)
257        .description("Security group for monitoring instance")
258        .vpc_id(vpc_id)
259        .tag_specifications(
260            TagSpecification::builder()
261                .resource_type(ResourceType::SecurityGroup)
262                .tags(Tag::builder().key("deployer").value(tag).build())
263                .build(),
264        )
265        .send()
266        .await?;
267    let sg_id = sg_resp.group_id.unwrap();
268    client
269        .authorize_security_group_ingress()
270        .group_id(&sg_id)
271        .ip_permissions(
272            IpPermission::builder()
273                .ip_protocol(DEPLOYER_PROTOCOL)
274                .from_port(DEPLOYER_MIN_PORT)
275                .to_port(DEPLOYER_MAX_PORT)
276                .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
277                .build(),
278        )
279        .send()
280        .await?;
281    Ok(sg_id)
282}
283
284/// Creates a security group for binary instances with access from deployer and custom ports
285/// Note: monitoring IP rules are added separately via `add_monitoring_ingress` after monitoring instance launches
286pub async fn create_security_group_binary(
287    client: &Ec2Client,
288    vpc_id: &str,
289    deployer_ip: &str,
290    tag: &str,
291    ports: &[PortConfig],
292) -> Result<String, Ec2Error> {
293    let sg_resp = client
294        .create_security_group()
295        .group_name(format!("{tag}-binary"))
296        .description("Security group for binary instances")
297        .vpc_id(vpc_id)
298        .tag_specifications(
299            TagSpecification::builder()
300                .resource_type(ResourceType::SecurityGroup)
301                .tags(Tag::builder().key("deployer").value(tag).build())
302                .build(),
303        )
304        .send()
305        .await?;
306    let sg_id = sg_resp.group_id.unwrap();
307    let mut builder = client
308        .authorize_security_group_ingress()
309        .group_id(&sg_id)
310        .ip_permissions(
311            IpPermission::builder()
312                .ip_protocol(DEPLOYER_PROTOCOL)
313                .from_port(DEPLOYER_MIN_PORT)
314                .to_port(DEPLOYER_MAX_PORT)
315                .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
316                .build(),
317        );
318    for port in ports {
319        builder = builder.ip_permissions(
320            IpPermission::builder()
321                .ip_protocol(&port.protocol)
322                .from_port(port.port as i32)
323                .to_port(port.port as i32)
324                .ip_ranges(IpRange::builder().cidr_ip(&port.cidr).build())
325                .build(),
326        );
327    }
328    builder.send().await?;
329    Ok(sg_id)
330}
331
332/// Adds monitoring IP ingress rules to a binary security group for Prometheus scraping
333pub async fn add_monitoring_ingress(
334    client: &Ec2Client,
335    sg_id: &str,
336    monitoring_ip: &str,
337) -> Result<(), Ec2Error> {
338    client
339        .authorize_security_group_ingress()
340        .group_id(sg_id)
341        .ip_permissions(
342            IpPermission::builder()
343                .ip_protocol("tcp")
344                .from_port(METRICS_PORT as i32)
345                .to_port(METRICS_PORT as i32)
346                .ip_ranges(
347                    IpRange::builder()
348                        .cidr_ip(exact_cidr(monitoring_ip))
349                        .build(),
350                )
351                .build(),
352        )
353        .ip_permissions(
354            IpPermission::builder()
355                .ip_protocol("tcp")
356                .from_port(SYSTEM_PORT as i32)
357                .to_port(SYSTEM_PORT as i32)
358                .ip_ranges(
359                    IpRange::builder()
360                        .cidr_ip(exact_cidr(monitoring_ip))
361                        .build(),
362                )
363                .build(),
364        )
365        .send()
366        .await?;
367    Ok(())
368}
369
370/// Launches EC2 instances with specified configurations
371#[allow(clippy::too_many_arguments)]
372pub async fn launch_instances(
373    client: &Ec2Client,
374    ami_id: &str,
375    instance_type: InstanceType,
376    storage_size: i32,
377    storage_class: VolumeType,
378    key_name: &str,
379    subnet_id: &str,
380    sg_id: &str,
381    count: i32,
382    name: &str,
383    tag: &str,
384) -> Result<Vec<String>, Ec2Error> {
385    let resp = client
386        .run_instances()
387        .image_id(ami_id)
388        .instance_type(instance_type)
389        .key_name(key_name)
390        .min_count(count)
391        .max_count(count)
392        .network_interfaces(
393            aws_sdk_ec2::types::InstanceNetworkInterfaceSpecification::builder()
394                .associate_public_ip_address(true)
395                .device_index(0)
396                .subnet_id(subnet_id)
397                .groups(sg_id)
398                .build(),
399        )
400        .tag_specifications(
401            TagSpecification::builder()
402                .resource_type(ResourceType::Instance)
403                .set_tags(Some(vec![
404                    Tag::builder().key("deployer").value(tag).build(),
405                    Tag::builder().key("name").value(name).build(),
406                ]))
407                .build(),
408        )
409        .block_device_mappings(
410            BlockDeviceMapping::builder()
411                .device_name("/dev/sda1")
412                .ebs(
413                    EbsBlockDevice::builder()
414                        .volume_size(storage_size)
415                        .volume_type(storage_class)
416                        .delete_on_termination(true)
417                        .build(),
418                )
419                .build(),
420        )
421        .send()
422        .await?;
423    Ok(resp
424        .instances
425        .unwrap()
426        .into_iter()
427        .map(|i| i.instance_id.unwrap())
428        .collect())
429}
430
431/// Waits for instances to reach the "running" state and returns their public IPs
432pub async fn wait_for_instances_running(
433    client: &Ec2Client,
434    instance_ids: &[String],
435) -> Result<Vec<String>, Ec2Error> {
436    loop {
437        // Ask for instance details
438        let Ok(resp) = client
439            .describe_instances()
440            .set_instance_ids(Some(instance_ids.to_vec()))
441            .send()
442            .await
443        else {
444            sleep(RETRY_INTERVAL).await;
445            continue;
446        };
447
448        // Confirm all are running
449        let reservations = resp.reservations.unwrap();
450        let instances = reservations[0].instances.as_ref().unwrap();
451        if !instances.iter().all(|i| {
452            i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
453        }) {
454            sleep(RETRY_INTERVAL).await;
455            continue;
456        }
457        return Ok(instances
458            .iter()
459            .map(|i| i.public_ip_address.as_ref().unwrap().clone())
460            .collect());
461    }
462}
463
464pub async fn wait_for_instances_ready(
465    client: &Ec2Client,
466    instance_ids: &[String],
467) -> Result<(), Ec2Error> {
468    loop {
469        // Ask for instance status
470        let Ok(resp) = client
471            .describe_instance_status()
472            .set_instance_ids(Some(instance_ids.to_vec()))
473            .include_all_instances(true) // Include instances regardless of state
474            .send()
475            .await
476        else {
477            sleep(RETRY_INTERVAL).await;
478            continue;
479        };
480
481        // Confirm all are ready
482        let statuses = resp.instance_statuses.unwrap_or_default();
483        let all_ready = statuses.iter().all(|s| {
484            s.instance_state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
485                && s.system_status.as_ref().unwrap().status.as_ref().unwrap() == &SummaryStatus::Ok
486                && s.instance_status.as_ref().unwrap().status.as_ref().unwrap()
487                    == &SummaryStatus::Ok
488        });
489        if !all_ready {
490            sleep(RETRY_INTERVAL).await;
491            continue;
492        }
493        return Ok(());
494    }
495}
496
497/// Retrieves the private IP address of an instance
498pub async fn get_private_ip(client: &Ec2Client, instance_id: &str) -> Result<String, Ec2Error> {
499    let resp = client
500        .describe_instances()
501        .instance_ids(instance_id)
502        .send()
503        .await?;
504    let reservations = resp.reservations.unwrap();
505    let instance = &reservations[0].instances.as_ref().unwrap()[0];
506    Ok(instance.private_ip_address.as_ref().unwrap().clone())
507}
508
509/// Creates a VPC peering connection between two VPCs
510pub async fn create_vpc_peering_connection(
511    client: &Ec2Client,
512    requester_vpc_id: &str,
513    peer_vpc_id: &str,
514    peer_region: &str,
515    tag: &str,
516) -> Result<String, Ec2Error> {
517    let resp = client
518        .create_vpc_peering_connection()
519        .vpc_id(requester_vpc_id)
520        .peer_vpc_id(peer_vpc_id)
521        .peer_region(peer_region)
522        .tag_specifications(
523            TagSpecification::builder()
524                .resource_type(ResourceType::VpcPeeringConnection)
525                .tags(Tag::builder().key("deployer").value(tag).build())
526                .build(),
527        )
528        .send()
529        .await?;
530    Ok(resp
531        .vpc_peering_connection
532        .unwrap()
533        .vpc_peering_connection_id
534        .unwrap())
535}
536
537/// Waits for a VPC peering connection to reach the "pending-acceptance" state
538pub async fn wait_for_vpc_peering_connection(
539    client: &Ec2Client,
540    peer_id: &str,
541) -> Result<(), Ec2Error> {
542    loop {
543        if let Ok(resp) = client
544            .describe_vpc_peering_connections()
545            .vpc_peering_connection_ids(peer_id)
546            .send()
547            .await
548        {
549            if let Some(connections) = resp.vpc_peering_connections {
550                if let Some(connection) = connections.first() {
551                    if connection.status.as_ref().unwrap().code
552                        == Some(VpcPeeringConnectionStateReasonCode::PendingAcceptance)
553                    {
554                        return Ok(());
555                    }
556                }
557            }
558        }
559        sleep(Duration::from_secs(2)).await;
560    }
561}
562
563/// Accepts a VPC peering connection in the peer region
564pub async fn accept_vpc_peering_connection(
565    client: &Ec2Client,
566    peer_id: &str,
567) -> Result<(), Ec2Error> {
568    client
569        .accept_vpc_peering_connection()
570        .vpc_peering_connection_id(peer_id)
571        .send()
572        .await?;
573    Ok(())
574}
575
576/// Adds a route to a route table for VPC peering
577pub async fn add_route(
578    client: &Ec2Client,
579    route_table_id: &str,
580    destination_cidr: &str,
581    peer_id: &str,
582) -> Result<(), Ec2Error> {
583    client
584        .create_route()
585        .route_table_id(route_table_id)
586        .destination_cidr_block(destination_cidr)
587        .vpc_peering_connection_id(peer_id)
588        .send()
589        .await?;
590    Ok(())
591}
592
593/// Finds VPC peering connections by deployer tag
594pub async fn find_vpc_peering_by_tag(
595    client: &Ec2Client,
596    tag: &str,
597) -> Result<Vec<String>, Ec2Error> {
598    let resp = client
599        .describe_vpc_peering_connections()
600        .filters(Filter::builder().name("tag:deployer").values(tag).build())
601        .send()
602        .await?;
603    Ok(resp
604        .vpc_peering_connections
605        .unwrap_or_default()
606        .into_iter()
607        .map(|p| p.vpc_peering_connection_id.unwrap())
608        .collect())
609}
610
611/// Deletes a VPC peering connection
612pub async fn delete_vpc_peering(client: &Ec2Client, peering_id: &str) -> Result<(), Ec2Error> {
613    client
614        .delete_vpc_peering_connection()
615        .vpc_peering_connection_id(peering_id)
616        .send()
617        .await?;
618    Ok(())
619}
620
621/// Waits for a VPC peering connection to be deleted
622pub async fn wait_for_vpc_peering_deletion(
623    ec2_client: &Ec2Client,
624    peer_id: &str,
625) -> Result<(), Ec2Error> {
626    loop {
627        let resp = ec2_client
628            .describe_vpc_peering_connections()
629            .vpc_peering_connection_ids(peer_id)
630            .send()
631            .await?;
632        if let Some(connections) = resp.vpc_peering_connections {
633            if let Some(connection) = connections.first() {
634                if connection.status.as_ref().unwrap().code
635                    == Some(VpcPeeringConnectionStateReasonCode::Deleted)
636                {
637                    return Ok(());
638                }
639            } else {
640                return Ok(());
641            }
642        } else {
643            return Ok(());
644        }
645        sleep(RETRY_INTERVAL).await;
646    }
647}
648
649/// Finds instances by deployer tag
650pub async fn find_instances_by_tag(
651    ec2_client: &Ec2Client,
652    tag: &str,
653) -> Result<Vec<String>, Ec2Error> {
654    let resp = ec2_client
655        .describe_instances()
656        .filters(Filter::builder().name("tag:deployer").values(tag).build())
657        .send()
658        .await?;
659    Ok(resp
660        .reservations
661        .unwrap_or_default()
662        .into_iter()
663        .flat_map(|r| r.instances.unwrap_or_default())
664        .map(|i| i.instance_id.unwrap())
665        .collect())
666}
667
668/// Terminates specified instances
669pub async fn terminate_instances(
670    ec2_client: &Ec2Client,
671    instance_ids: &[String],
672) -> Result<(), Ec2Error> {
673    if instance_ids.is_empty() {
674        return Ok(());
675    }
676    ec2_client
677        .terminate_instances()
678        .set_instance_ids(Some(instance_ids.to_vec()))
679        .send()
680        .await?;
681    Ok(())
682}
683
684/// Waits for instances to be terminated
685pub async fn wait_for_instances_terminated(
686    ec2_client: &Ec2Client,
687    instance_ids: &[String],
688) -> Result<(), Ec2Error> {
689    loop {
690        let resp = ec2_client
691            .describe_instances()
692            .set_instance_ids(Some(instance_ids.to_vec()))
693            .send()
694            .await?;
695        let instances = resp
696            .reservations
697            .unwrap_or_default()
698            .into_iter()
699            .flat_map(|r| r.instances.unwrap_or_default())
700            .collect::<Vec<_>>();
701        if instances.iter().all(|i| {
702            i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Terminated
703        }) {
704            return Ok(());
705        }
706        sleep(RETRY_INTERVAL).await;
707    }
708}
709
710/// Finds security groups by deployer tag
711pub async fn find_security_groups_by_tag(
712    ec2_client: &Ec2Client,
713    tag: &str,
714) -> Result<Vec<SecurityGroup>, Ec2Error> {
715    let resp = ec2_client
716        .describe_security_groups()
717        .filters(Filter::builder().name("tag:deployer").values(tag).build())
718        .send()
719        .await?;
720    Ok(resp
721        .security_groups
722        .unwrap_or_default()
723        .into_iter()
724        .collect())
725}
726
727/// Deletes a security group
728pub async fn delete_security_group(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
729    ec2_client
730        .delete_security_group()
731        .group_id(sg_id)
732        .send()
733        .await?;
734    Ok(())
735}
736
737/// Finds route tables by deployer tag
738pub async fn find_route_tables_by_tag(
739    ec2_client: &Ec2Client,
740    tag: &str,
741) -> Result<Vec<String>, Ec2Error> {
742    let resp = ec2_client
743        .describe_route_tables()
744        .filters(Filter::builder().name("tag:deployer").values(tag).build())
745        .send()
746        .await?;
747    Ok(resp
748        .route_tables
749        .unwrap_or_default()
750        .into_iter()
751        .map(|rt| rt.route_table_id.unwrap())
752        .collect())
753}
754
755/// Deletes a route table
756pub async fn delete_route_table(ec2_client: &Ec2Client, rt_id: &str) -> Result<(), Ec2Error> {
757    ec2_client
758        .delete_route_table()
759        .route_table_id(rt_id)
760        .send()
761        .await?;
762    Ok(())
763}
764
765/// Finds Internet Gateways by deployer tag
766pub async fn find_igws_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
767    let resp = ec2_client
768        .describe_internet_gateways()
769        .filters(Filter::builder().name("tag:deployer").values(tag).build())
770        .send()
771        .await?;
772    Ok(resp
773        .internet_gateways
774        .unwrap_or_default()
775        .into_iter()
776        .map(|igw| igw.internet_gateway_id.unwrap())
777        .collect())
778}
779
780/// Finds the VPC ID attached to an Internet Gateway
781pub async fn find_vpc_by_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<String, Ec2Error> {
782    let resp = ec2_client
783        .describe_internet_gateways()
784        .internet_gateway_ids(igw_id)
785        .send()
786        .await?;
787    Ok(resp.internet_gateways.unwrap()[0]
788        .attachments
789        .as_ref()
790        .unwrap()[0]
791        .vpc_id
792        .as_ref()
793        .unwrap()
794        .clone())
795}
796
797/// Detaches an Internet Gateway from a VPC
798pub async fn detach_igw(
799    ec2_client: &Ec2Client,
800    igw_id: &str,
801    vpc_id: &str,
802) -> Result<(), Ec2Error> {
803    ec2_client
804        .detach_internet_gateway()
805        .internet_gateway_id(igw_id)
806        .vpc_id(vpc_id)
807        .send()
808        .await?;
809    Ok(())
810}
811
812/// Deletes an Internet Gateway
813pub async fn delete_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<(), Ec2Error> {
814    ec2_client
815        .delete_internet_gateway()
816        .internet_gateway_id(igw_id)
817        .send()
818        .await?;
819    Ok(())
820}
821
822/// Finds subnets by deployer tag
823pub async fn find_subnets_by_tag(
824    ec2_client: &Ec2Client,
825    tag: &str,
826) -> Result<Vec<String>, Ec2Error> {
827    let resp = ec2_client
828        .describe_subnets()
829        .filters(Filter::builder().name("tag:deployer").values(tag).build())
830        .send()
831        .await?;
832    Ok(resp
833        .subnets
834        .unwrap_or_default()
835        .into_iter()
836        .map(|subnet| subnet.subnet_id.unwrap())
837        .collect())
838}
839
840/// Deletes a subnet
841pub async fn delete_subnet(ec2_client: &Ec2Client, subnet_id: &str) -> Result<(), Ec2Error> {
842    ec2_client
843        .delete_subnet()
844        .subnet_id(subnet_id)
845        .send()
846        .await?;
847    Ok(())
848}
849
850/// Finds VPCs by deployer tag
851pub async fn find_vpcs_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
852    let resp = ec2_client
853        .describe_vpcs()
854        .filters(Filter::builder().name("tag:deployer").values(tag).build())
855        .send()
856        .await?;
857    Ok(resp
858        .vpcs
859        .unwrap_or_default()
860        .into_iter()
861        .map(|vpc| vpc.vpc_id.unwrap())
862        .collect())
863}
864
865/// Deletes a VPC
866pub async fn delete_vpc(ec2_client: &Ec2Client, vpc_id: &str) -> Result<(), Ec2Error> {
867    ec2_client.delete_vpc().vpc_id(vpc_id).send().await?;
868    Ok(())
869}
870
871/// Finds the availability zone that supports all required instance types
872pub async fn find_availability_zone(
873    client: &Ec2Client,
874    instance_types: &[String],
875) -> Result<String, Ec2Error> {
876    // Retrieve all instance type offerings for availability zones in the region
877    let offerings = client
878        .describe_instance_type_offerings()
879        .location_type("availability-zone".into())
880        .filters(
881            Filter::builder()
882                .name("instance-type")
883                .set_values(Some(instance_types.to_vec()))
884                .build(),
885        )
886        .send()
887        .await?
888        .instance_type_offerings
889        .unwrap_or_default();
890
891    // Build a map from availability zone to the set of supported instance types
892    let mut az_to_instance_types: HashMap<String, HashSet<String>> = HashMap::new();
893    for offering in offerings {
894        if let (Some(location), Some(instance_type)) = (
895            offering.location,
896            offering.instance_type.map(|it| it.to_string()), // Convert enum to String if necessary
897        ) {
898            az_to_instance_types
899                .entry(location)
900                .or_default()
901                .insert(instance_type);
902        }
903    }
904
905    // Convert the required instance types to a HashSet for efficient subset checking
906    let required_instance_types: HashSet<String> = instance_types.iter().cloned().collect();
907
908    // Find an availability zone that supports all required instance types
909    for (az, supported_types) in az_to_instance_types {
910        if required_instance_types.is_subset(&supported_types) {
911            return Ok(az); // Return the first matching availability zone
912        }
913    }
914
915    // If no availability zone supports all instance types, return an error
916    Err(Ec2Error::from(BuildError::other(format!(
917        "no availability zone supports all required instance types: {instance_types:?}"
918    ))))
919}
920
921/// Waits until all network interfaces associated with a security group are deleted
922pub async fn wait_for_enis_deleted(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
923    loop {
924        let resp = ec2_client
925            .describe_network_interfaces()
926            .filters(Filter::builder().name("group-id").values(sg_id).build())
927            .send()
928            .await?;
929        if resp.network_interfaces.unwrap_or_default().is_empty() {
930            return Ok(());
931        }
932        sleep(RETRY_INTERVAL).await;
933    }
934}