commonware_deployer/ec2/
aws.rs

1//! AWS EC2 SDK function wrappers
2
3use super::{METRICS_PORT, SYSTEM_PORT};
4use crate::ec2::{
5    utils::{exact_cidr, DEPLOYER_MAX_PORT, DEPLOYER_MIN_PORT, DEPLOYER_PROTOCOL, RETRY_INTERVAL},
6    PortConfig,
7};
8use aws_config::BehaviorVersion;
9pub use aws_config::Region;
10pub use aws_sdk_ec2::types::{InstanceType, IpPermission, IpRange, UserIdGroupPair, VolumeType};
11use aws_sdk_ec2::{
12    error::BuildError,
13    primitives::Blob,
14    types::{
15        BlockDeviceMapping, EbsBlockDevice, Filter, InstanceStateName, ResourceType, SecurityGroup,
16        SummaryStatus, Tag, TagSpecification, VpcPeeringConnectionStateReasonCode,
17    },
18    Client as Ec2Client, Error as Ec2Error,
19};
20use std::{
21    collections::{HashMap, HashSet},
22    time::Duration,
23};
24use tokio::time::sleep;
25
26/// Creates an EC2 client for the specified AWS region
27pub async fn create_ec2_client(region: Region) -> Ec2Client {
28    let retry = aws_config::retry::RetryConfig::adaptive()
29        .with_max_attempts(10)
30        .with_initial_backoff(Duration::from_millis(500))
31        .with_max_backoff(Duration::from_secs(30));
32    let config = aws_config::defaults(BehaviorVersion::v2025_01_17())
33        .region(region)
34        .retry_config(retry)
35        .load()
36        .await;
37    Ec2Client::new(&config)
38}
39
40/// Imports an SSH public key into the specified region
41pub async fn import_key_pair(
42    client: &Ec2Client,
43    key_name: &str,
44    public_key: &str,
45) -> Result<(), Ec2Error> {
46    let blob = Blob::new(public_key.as_bytes());
47    client
48        .import_key_pair()
49        .key_name(key_name)
50        .public_key_material(blob)
51        .send()
52        .await?;
53    Ok(())
54}
55
56/// Deletes an SSH key pair from the specified region
57pub async fn delete_key_pair(client: &Ec2Client, key_name: &str) -> Result<(), Ec2Error> {
58    client.delete_key_pair().key_name(key_name).send().await?;
59    Ok(())
60}
61
62/// Finds the latest Ubuntu 24.04 ARM64 AMI in the region
63pub async fn find_latest_ami(client: &Ec2Client) -> Result<String, Ec2Error> {
64    let resp = client
65        .describe_images()
66        .filters(
67            Filter::builder()
68                .name("name")
69                .values("ubuntu/images/hvm-ssd-gp3/ubuntu-noble-24.04-arm64-server-*")
70                .build(),
71        )
72        .filters(
73            Filter::builder()
74                .name("root-device-type")
75                .values("ebs")
76                .build(),
77        )
78        .owners("099720109477") // Canonical's AWS account ID
79        .send()
80        .await?;
81    let mut images = resp.images.unwrap_or_default();
82    if images.is_empty() {
83        return Err(Ec2Error::from(BuildError::other(
84            "No matching AMI found".to_string(),
85        )));
86    }
87    images.sort_by(|a, b| b.creation_date().cmp(&a.creation_date()));
88    let latest_ami = images[0].image_id().unwrap();
89    Ok(latest_ami.to_string())
90}
91
92/// Creates a VPC with the specified CIDR block and tag
93pub async fn create_vpc(
94    client: &Ec2Client,
95    cidr_block: &str,
96    tag: &str,
97) -> Result<String, Ec2Error> {
98    let resp = client
99        .create_vpc()
100        .cidr_block(cidr_block)
101        .tag_specifications(
102            TagSpecification::builder()
103                .resource_type(ResourceType::Vpc)
104                .tags(Tag::builder().key("deployer").value(tag).build())
105                .build(),
106        )
107        .send()
108        .await?;
109    Ok(resp.vpc.unwrap().vpc_id.unwrap())
110}
111
112/// Creates an Internet Gateway and attaches it to the specified VPC
113pub async fn create_and_attach_igw(
114    client: &Ec2Client,
115    vpc_id: &str,
116    tag: &str,
117) -> Result<String, Ec2Error> {
118    let igw_resp = client
119        .create_internet_gateway()
120        .tag_specifications(
121            TagSpecification::builder()
122                .resource_type(ResourceType::InternetGateway)
123                .tags(Tag::builder().key("deployer").value(tag).build())
124                .build(),
125        )
126        .send()
127        .await?;
128    let igw_id = igw_resp
129        .internet_gateway
130        .unwrap()
131        .internet_gateway_id
132        .unwrap();
133    client
134        .attach_internet_gateway()
135        .internet_gateway_id(&igw_id)
136        .vpc_id(vpc_id)
137        .send()
138        .await?;
139    Ok(igw_id)
140}
141
142/// Creates a route table for the VPC and sets up a default route to the Internet Gateway
143pub async fn create_route_table(
144    client: &Ec2Client,
145    vpc_id: &str,
146    igw_id: &str,
147    tag: &str,
148) -> Result<String, Ec2Error> {
149    let rt_resp = client
150        .create_route_table()
151        .vpc_id(vpc_id)
152        .tag_specifications(
153            TagSpecification::builder()
154                .resource_type(ResourceType::RouteTable)
155                .tags(Tag::builder().key("deployer").value(tag).build())
156                .build(),
157        )
158        .send()
159        .await?;
160    let rt_id = rt_resp.route_table.unwrap().route_table_id.unwrap();
161    client
162        .create_route()
163        .route_table_id(&rt_id)
164        .destination_cidr_block("0.0.0.0/0")
165        .gateway_id(igw_id)
166        .send()
167        .await?;
168    Ok(rt_id)
169}
170
171/// Creates a subnet within the VPC and associates it with the route table
172pub async fn create_subnet(
173    client: &Ec2Client,
174    vpc_id: &str,
175    route_table_id: &str,
176    subnet_cidr: &str,
177    availability_zone: &str,
178    tag: &str,
179) -> Result<String, Ec2Error> {
180    let subnet_resp = client
181        .create_subnet()
182        .vpc_id(vpc_id)
183        .cidr_block(subnet_cidr)
184        .availability_zone(availability_zone)
185        .tag_specifications(
186            TagSpecification::builder()
187                .resource_type(ResourceType::Subnet)
188                .tags(Tag::builder().key("deployer").value(tag).build())
189                .build(),
190        )
191        .send()
192        .await?;
193    let subnet_id = subnet_resp.subnet.unwrap().subnet_id.unwrap();
194    client
195        .associate_route_table()
196        .route_table_id(route_table_id)
197        .subnet_id(&subnet_id)
198        .send()
199        .await?;
200    Ok(subnet_id)
201}
202
203/// Creates a security group for the monitoring instance with access from the deployer IP
204pub async fn create_security_group_monitoring(
205    client: &Ec2Client,
206    vpc_id: &str,
207    deployer_ip: &str,
208    tag: &str,
209) -> Result<String, Ec2Error> {
210    let sg_resp = client
211        .create_security_group()
212        .group_name(tag)
213        .description("Security group for monitoring instance")
214        .vpc_id(vpc_id)
215        .tag_specifications(
216            TagSpecification::builder()
217                .resource_type(ResourceType::SecurityGroup)
218                .tags(Tag::builder().key("deployer").value(tag).build())
219                .build(),
220        )
221        .send()
222        .await?;
223    let sg_id = sg_resp.group_id.unwrap();
224    client
225        .authorize_security_group_ingress()
226        .group_id(&sg_id)
227        .ip_permissions(
228            IpPermission::builder()
229                .ip_protocol(DEPLOYER_PROTOCOL)
230                .from_port(DEPLOYER_MIN_PORT)
231                .to_port(DEPLOYER_MAX_PORT)
232                .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
233                .build(),
234        )
235        .send()
236        .await?;
237    Ok(sg_id)
238}
239
240/// Creates a security group for binary instances with access from deployer, monitoring, and custom ports
241pub async fn create_security_group_binary(
242    client: &Ec2Client,
243    vpc_id: &str,
244    deployer_ip: &str,
245    monitoring_ip: &str,
246    tag: &str,
247    ports: &[PortConfig],
248) -> Result<String, Ec2Error> {
249    let sg_resp = client
250        .create_security_group()
251        .group_name(format!("{tag}-binary"))
252        .description("Security group for binary instances")
253        .vpc_id(vpc_id)
254        .tag_specifications(
255            TagSpecification::builder()
256                .resource_type(ResourceType::SecurityGroup)
257                .tags(Tag::builder().key("deployer").value(tag).build())
258                .build(),
259        )
260        .send()
261        .await?;
262    let sg_id = sg_resp.group_id.unwrap();
263    let mut builder = client
264        .authorize_security_group_ingress()
265        .group_id(&sg_id)
266        .ip_permissions(
267            IpPermission::builder()
268                .ip_protocol(DEPLOYER_PROTOCOL)
269                .from_port(DEPLOYER_MIN_PORT)
270                .to_port(DEPLOYER_MAX_PORT)
271                .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
272                .build(),
273        )
274        .ip_permissions(
275            IpPermission::builder()
276                .ip_protocol("tcp")
277                .from_port(METRICS_PORT as i32)
278                .to_port(METRICS_PORT as i32)
279                .ip_ranges(
280                    IpRange::builder()
281                        .cidr_ip(exact_cidr(monitoring_ip))
282                        .build(),
283                )
284                .build(),
285        )
286        .ip_permissions(
287            IpPermission::builder()
288                .ip_protocol("tcp")
289                .from_port(SYSTEM_PORT as i32)
290                .to_port(SYSTEM_PORT as i32)
291                .ip_ranges(
292                    IpRange::builder()
293                        .cidr_ip(exact_cidr(monitoring_ip))
294                        .build(),
295                )
296                .build(),
297        );
298    for port in ports {
299        builder = builder.ip_permissions(
300            IpPermission::builder()
301                .ip_protocol(&port.protocol)
302                .from_port(port.port as i32)
303                .to_port(port.port as i32)
304                .ip_ranges(IpRange::builder().cidr_ip(&port.cidr).build())
305                .build(),
306        );
307    }
308    builder.send().await?;
309    Ok(sg_id)
310}
311
312/// Launches EC2 instances with specified configurations
313#[allow(clippy::too_many_arguments)]
314pub async fn launch_instances(
315    client: &Ec2Client,
316    ami_id: &str,
317    instance_type: InstanceType,
318    storage_size: i32,
319    storage_class: VolumeType,
320    key_name: &str,
321    subnet_id: &str,
322    sg_id: &str,
323    count: i32,
324    name: &str,
325    tag: &str,
326) -> Result<Vec<String>, Ec2Error> {
327    let resp = client
328        .run_instances()
329        .image_id(ami_id)
330        .instance_type(instance_type)
331        .key_name(key_name)
332        .min_count(count)
333        .max_count(count)
334        .network_interfaces(
335            aws_sdk_ec2::types::InstanceNetworkInterfaceSpecification::builder()
336                .associate_public_ip_address(true)
337                .device_index(0)
338                .subnet_id(subnet_id)
339                .groups(sg_id)
340                .build(),
341        )
342        .tag_specifications(
343            TagSpecification::builder()
344                .resource_type(ResourceType::Instance)
345                .set_tags(Some(vec![
346                    Tag::builder().key("deployer").value(tag).build(),
347                    Tag::builder().key("name").value(name).build(),
348                ]))
349                .build(),
350        )
351        .block_device_mappings(
352            BlockDeviceMapping::builder()
353                .device_name("/dev/sda1")
354                .ebs(
355                    EbsBlockDevice::builder()
356                        .volume_size(storage_size)
357                        .volume_type(storage_class)
358                        .delete_on_termination(true)
359                        .build(),
360                )
361                .build(),
362        )
363        .send()
364        .await?;
365    Ok(resp
366        .instances
367        .unwrap()
368        .into_iter()
369        .map(|i| i.instance_id.unwrap())
370        .collect())
371}
372
373/// Waits for instances to reach the "running" state and returns their public IPs
374pub async fn wait_for_instances_running(
375    client: &Ec2Client,
376    instance_ids: &[String],
377) -> Result<Vec<String>, Ec2Error> {
378    loop {
379        // Ask for instance details
380        let Ok(resp) = client
381            .describe_instances()
382            .set_instance_ids(Some(instance_ids.to_vec()))
383            .send()
384            .await
385        else {
386            sleep(RETRY_INTERVAL).await;
387            continue;
388        };
389
390        // Confirm all are running
391        let reservations = resp.reservations.unwrap();
392        let instances = reservations[0].instances.as_ref().unwrap();
393        if !instances.iter().all(|i| {
394            i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
395        }) {
396            sleep(RETRY_INTERVAL).await;
397            continue;
398        }
399        return Ok(instances
400            .iter()
401            .map(|i| i.public_ip_address.as_ref().unwrap().clone())
402            .collect());
403    }
404}
405
406pub async fn wait_for_instances_ready(
407    client: &Ec2Client,
408    instance_ids: &[String],
409) -> Result<(), Ec2Error> {
410    loop {
411        // Ask for instance status
412        let Ok(resp) = client
413            .describe_instance_status()
414            .set_instance_ids(Some(instance_ids.to_vec()))
415            .include_all_instances(true) // Include instances regardless of state
416            .send()
417            .await
418        else {
419            sleep(RETRY_INTERVAL).await;
420            continue;
421        };
422
423        // Confirm all are ready
424        let statuses = resp.instance_statuses.unwrap_or_default();
425        let all_ready = statuses.iter().all(|s| {
426            s.instance_state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
427                && s.system_status.as_ref().unwrap().status.as_ref().unwrap() == &SummaryStatus::Ok
428                && s.instance_status.as_ref().unwrap().status.as_ref().unwrap()
429                    == &SummaryStatus::Ok
430        });
431        if !all_ready {
432            sleep(RETRY_INTERVAL).await;
433            continue;
434        }
435        return Ok(());
436    }
437}
438
439/// Retrieves the private IP address of an instance
440pub async fn get_private_ip(client: &Ec2Client, instance_id: &str) -> Result<String, Ec2Error> {
441    let resp = client
442        .describe_instances()
443        .instance_ids(instance_id)
444        .send()
445        .await?;
446    let reservations = resp.reservations.unwrap();
447    let instance = &reservations[0].instances.as_ref().unwrap()[0];
448    Ok(instance.private_ip_address.as_ref().unwrap().clone())
449}
450
451/// Creates a VPC peering connection between two VPCs
452pub async fn create_vpc_peering_connection(
453    client: &Ec2Client,
454    requester_vpc_id: &str,
455    peer_vpc_id: &str,
456    peer_region: &str,
457    tag: &str,
458) -> Result<String, Ec2Error> {
459    let resp = client
460        .create_vpc_peering_connection()
461        .vpc_id(requester_vpc_id)
462        .peer_vpc_id(peer_vpc_id)
463        .peer_region(peer_region)
464        .tag_specifications(
465            TagSpecification::builder()
466                .resource_type(ResourceType::VpcPeeringConnection)
467                .tags(Tag::builder().key("deployer").value(tag).build())
468                .build(),
469        )
470        .send()
471        .await?;
472    Ok(resp
473        .vpc_peering_connection
474        .unwrap()
475        .vpc_peering_connection_id
476        .unwrap())
477}
478
479/// Waits for a VPC peering connection to reach the "pending-acceptance" state
480pub async fn wait_for_vpc_peering_connection(
481    client: &Ec2Client,
482    peer_id: &str,
483) -> Result<(), Ec2Error> {
484    loop {
485        if let Ok(resp) = client
486            .describe_vpc_peering_connections()
487            .vpc_peering_connection_ids(peer_id)
488            .send()
489            .await
490        {
491            if let Some(connections) = resp.vpc_peering_connections {
492                if let Some(connection) = connections.first() {
493                    if connection.status.as_ref().unwrap().code
494                        == Some(VpcPeeringConnectionStateReasonCode::PendingAcceptance)
495                    {
496                        return Ok(());
497                    }
498                }
499            }
500        }
501        sleep(Duration::from_secs(2)).await;
502    }
503}
504
505/// Accepts a VPC peering connection in the peer region
506pub async fn accept_vpc_peering_connection(
507    client: &Ec2Client,
508    peer_id: &str,
509) -> Result<(), Ec2Error> {
510    client
511        .accept_vpc_peering_connection()
512        .vpc_peering_connection_id(peer_id)
513        .send()
514        .await?;
515    Ok(())
516}
517
518/// Adds a route to a route table for VPC peering
519pub async fn add_route(
520    client: &Ec2Client,
521    route_table_id: &str,
522    destination_cidr: &str,
523    peer_id: &str,
524) -> Result<(), Ec2Error> {
525    client
526        .create_route()
527        .route_table_id(route_table_id)
528        .destination_cidr_block(destination_cidr)
529        .vpc_peering_connection_id(peer_id)
530        .send()
531        .await?;
532    Ok(())
533}
534
535/// Finds VPC peering connections by deployer tag
536pub async fn find_vpc_peering_by_tag(
537    client: &Ec2Client,
538    tag: &str,
539) -> Result<Vec<String>, Ec2Error> {
540    let resp = client
541        .describe_vpc_peering_connections()
542        .filters(Filter::builder().name("tag:deployer").values(tag).build())
543        .send()
544        .await?;
545    Ok(resp
546        .vpc_peering_connections
547        .unwrap_or_default()
548        .into_iter()
549        .map(|p| p.vpc_peering_connection_id.unwrap())
550        .collect())
551}
552
553/// Deletes a VPC peering connection
554pub async fn delete_vpc_peering(client: &Ec2Client, peering_id: &str) -> Result<(), Ec2Error> {
555    client
556        .delete_vpc_peering_connection()
557        .vpc_peering_connection_id(peering_id)
558        .send()
559        .await?;
560    Ok(())
561}
562
563/// Waits for a VPC peering connection to be deleted
564pub async fn wait_for_vpc_peering_deletion(
565    ec2_client: &Ec2Client,
566    peer_id: &str,
567) -> Result<(), Ec2Error> {
568    loop {
569        let resp = ec2_client
570            .describe_vpc_peering_connections()
571            .vpc_peering_connection_ids(peer_id)
572            .send()
573            .await?;
574        if let Some(connections) = resp.vpc_peering_connections {
575            if let Some(connection) = connections.first() {
576                if connection.status.as_ref().unwrap().code
577                    == Some(VpcPeeringConnectionStateReasonCode::Deleted)
578                {
579                    return Ok(());
580                }
581            } else {
582                return Ok(());
583            }
584        } else {
585            return Ok(());
586        }
587        sleep(RETRY_INTERVAL).await;
588    }
589}
590
591/// Finds instances by deployer tag
592pub async fn find_instances_by_tag(
593    ec2_client: &Ec2Client,
594    tag: &str,
595) -> Result<Vec<String>, Ec2Error> {
596    let resp = ec2_client
597        .describe_instances()
598        .filters(Filter::builder().name("tag:deployer").values(tag).build())
599        .send()
600        .await?;
601    Ok(resp
602        .reservations
603        .unwrap_or_default()
604        .into_iter()
605        .flat_map(|r| r.instances.unwrap_or_default())
606        .map(|i| i.instance_id.unwrap())
607        .collect())
608}
609
610/// Terminates specified instances
611pub async fn terminate_instances(
612    ec2_client: &Ec2Client,
613    instance_ids: &[String],
614) -> Result<(), Ec2Error> {
615    if instance_ids.is_empty() {
616        return Ok(());
617    }
618    ec2_client
619        .terminate_instances()
620        .set_instance_ids(Some(instance_ids.to_vec()))
621        .send()
622        .await?;
623    Ok(())
624}
625
626/// Waits for instances to be terminated
627pub async fn wait_for_instances_terminated(
628    ec2_client: &Ec2Client,
629    instance_ids: &[String],
630) -> Result<(), Ec2Error> {
631    loop {
632        let resp = ec2_client
633            .describe_instances()
634            .set_instance_ids(Some(instance_ids.to_vec()))
635            .send()
636            .await?;
637        let instances = resp
638            .reservations
639            .unwrap_or_default()
640            .into_iter()
641            .flat_map(|r| r.instances.unwrap_or_default())
642            .collect::<Vec<_>>();
643        if instances.iter().all(|i| {
644            i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Terminated
645        }) {
646            return Ok(());
647        }
648        sleep(RETRY_INTERVAL).await;
649    }
650}
651
652/// Finds security groups by deployer tag
653pub async fn find_security_groups_by_tag(
654    ec2_client: &Ec2Client,
655    tag: &str,
656) -> Result<Vec<SecurityGroup>, Ec2Error> {
657    let resp = ec2_client
658        .describe_security_groups()
659        .filters(Filter::builder().name("tag:deployer").values(tag).build())
660        .send()
661        .await?;
662    Ok(resp
663        .security_groups
664        .unwrap_or_default()
665        .into_iter()
666        .collect())
667}
668
669/// Deletes a security group
670pub async fn delete_security_group(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
671    ec2_client
672        .delete_security_group()
673        .group_id(sg_id)
674        .send()
675        .await?;
676    Ok(())
677}
678
679/// Finds route tables by deployer tag
680pub async fn find_route_tables_by_tag(
681    ec2_client: &Ec2Client,
682    tag: &str,
683) -> Result<Vec<String>, Ec2Error> {
684    let resp = ec2_client
685        .describe_route_tables()
686        .filters(Filter::builder().name("tag:deployer").values(tag).build())
687        .send()
688        .await?;
689    Ok(resp
690        .route_tables
691        .unwrap_or_default()
692        .into_iter()
693        .map(|rt| rt.route_table_id.unwrap())
694        .collect())
695}
696
697/// Deletes a route table
698pub async fn delete_route_table(ec2_client: &Ec2Client, rt_id: &str) -> Result<(), Ec2Error> {
699    ec2_client
700        .delete_route_table()
701        .route_table_id(rt_id)
702        .send()
703        .await?;
704    Ok(())
705}
706
707/// Finds Internet Gateways by deployer tag
708pub async fn find_igws_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
709    let resp = ec2_client
710        .describe_internet_gateways()
711        .filters(Filter::builder().name("tag:deployer").values(tag).build())
712        .send()
713        .await?;
714    Ok(resp
715        .internet_gateways
716        .unwrap_or_default()
717        .into_iter()
718        .map(|igw| igw.internet_gateway_id.unwrap())
719        .collect())
720}
721
722/// Finds the VPC ID attached to an Internet Gateway
723pub async fn find_vpc_by_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<String, Ec2Error> {
724    let resp = ec2_client
725        .describe_internet_gateways()
726        .internet_gateway_ids(igw_id)
727        .send()
728        .await?;
729    Ok(resp.internet_gateways.unwrap()[0]
730        .attachments
731        .as_ref()
732        .unwrap()[0]
733        .vpc_id
734        .as_ref()
735        .unwrap()
736        .clone())
737}
738
739/// Detaches an Internet Gateway from a VPC
740pub async fn detach_igw(
741    ec2_client: &Ec2Client,
742    igw_id: &str,
743    vpc_id: &str,
744) -> Result<(), Ec2Error> {
745    ec2_client
746        .detach_internet_gateway()
747        .internet_gateway_id(igw_id)
748        .vpc_id(vpc_id)
749        .send()
750        .await?;
751    Ok(())
752}
753
754/// Deletes an Internet Gateway
755pub async fn delete_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<(), Ec2Error> {
756    ec2_client
757        .delete_internet_gateway()
758        .internet_gateway_id(igw_id)
759        .send()
760        .await?;
761    Ok(())
762}
763
764/// Finds subnets by deployer tag
765pub async fn find_subnets_by_tag(
766    ec2_client: &Ec2Client,
767    tag: &str,
768) -> Result<Vec<String>, Ec2Error> {
769    let resp = ec2_client
770        .describe_subnets()
771        .filters(Filter::builder().name("tag:deployer").values(tag).build())
772        .send()
773        .await?;
774    Ok(resp
775        .subnets
776        .unwrap_or_default()
777        .into_iter()
778        .map(|subnet| subnet.subnet_id.unwrap())
779        .collect())
780}
781
782/// Deletes a subnet
783pub async fn delete_subnet(ec2_client: &Ec2Client, subnet_id: &str) -> Result<(), Ec2Error> {
784    ec2_client
785        .delete_subnet()
786        .subnet_id(subnet_id)
787        .send()
788        .await?;
789    Ok(())
790}
791
792/// Finds VPCs by deployer tag
793pub async fn find_vpcs_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
794    let resp = ec2_client
795        .describe_vpcs()
796        .filters(Filter::builder().name("tag:deployer").values(tag).build())
797        .send()
798        .await?;
799    Ok(resp
800        .vpcs
801        .unwrap_or_default()
802        .into_iter()
803        .map(|vpc| vpc.vpc_id.unwrap())
804        .collect())
805}
806
807/// Deletes a VPC
808pub async fn delete_vpc(ec2_client: &Ec2Client, vpc_id: &str) -> Result<(), Ec2Error> {
809    ec2_client.delete_vpc().vpc_id(vpc_id).send().await?;
810    Ok(())
811}
812
813/// Enforces that all instance types are ARM64-based
814pub async fn assert_arm64_support(
815    client: &Ec2Client,
816    instance_types: &[String],
817) -> Result<(), Ec2Error> {
818    let mut next_token: Option<String> = None;
819    let mut supported_instance_types = HashSet::new();
820
821    // Loop through all pages of results
822    loop {
823        // Get the next page of instance types
824        let mut request = client.describe_instance_types().filters(
825            Filter::builder()
826                .name("processor-info.supported-architecture")
827                .values("arm64")
828                .build(),
829        );
830        if let Some(token) = next_token {
831            request = request.next_token(token);
832        }
833        let response = request.send().await?;
834
835        // Collect instance types from this page
836        for instance_type in response.instance_types.unwrap_or_default() {
837            if let Some(it) = instance_type.instance_type {
838                supported_instance_types.insert(it.to_string());
839            }
840        }
841
842        // Check if there's another page
843        next_token = response.next_token;
844        if next_token.is_none() {
845            break;
846        }
847    }
848
849    // Validate all requested instance types
850    for instance_type in instance_types {
851        if !supported_instance_types.contains(instance_type) {
852            return Err(Ec2Error::from(BuildError::other(format!(
853                "instance type {instance_type} not ARM64-based"
854            ))));
855        }
856    }
857    Ok(())
858}
859
860/// Finds the availability zone that supports all required instance types
861pub async fn find_availability_zone(
862    client: &Ec2Client,
863    instance_types: &[String],
864) -> Result<String, Ec2Error> {
865    // Retrieve all instance type offerings for availability zones in the region
866    let offerings = client
867        .describe_instance_type_offerings()
868        .location_type("availability-zone".into())
869        .filters(
870            Filter::builder()
871                .name("instance-type")
872                .set_values(Some(instance_types.to_vec()))
873                .build(),
874        )
875        .send()
876        .await?
877        .instance_type_offerings
878        .unwrap_or_default();
879
880    // Build a map from availability zone to the set of supported instance types
881    let mut az_to_instance_types: HashMap<String, HashSet<String>> = HashMap::new();
882    for offering in offerings {
883        if let (Some(location), Some(instance_type)) = (
884            offering.location,
885            offering.instance_type.map(|it| it.to_string()), // Convert enum to String if necessary
886        ) {
887            az_to_instance_types
888                .entry(location)
889                .or_default()
890                .insert(instance_type);
891        }
892    }
893
894    // Convert the required instance types to a HashSet for efficient subset checking
895    let required_instance_types: HashSet<String> = instance_types.iter().cloned().collect();
896
897    // Find an availability zone that supports all required instance types
898    for (az, supported_types) in az_to_instance_types {
899        if required_instance_types.is_subset(&supported_types) {
900            return Ok(az); // Return the first matching availability zone
901        }
902    }
903
904    // If no availability zone supports all instance types, return an error
905    Err(Ec2Error::from(BuildError::other(format!(
906        "no availability zone supports all required instance types: {instance_types:?}"
907    ))))
908}
909
910/// Waits until all network interfaces associated with a security group are deleted
911pub async fn wait_for_enis_deleted(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
912    loop {
913        let resp = ec2_client
914            .describe_network_interfaces()
915            .filters(Filter::builder().name("group-id").values(sg_id).build())
916            .send()
917            .await?;
918        if resp.network_interfaces.unwrap_or_default().is_empty() {
919            return Ok(());
920        }
921        sleep(RETRY_INTERVAL).await;
922    }
923}