commonware_deployer/ec2/
aws.rs

1//! AWS EC2 SDK function wrappers
2
3use super::{MEMLEAK_PORT, METRICS_PORT, SYSTEM_PORT};
4use crate::ec2::{
5    utils::{exact_cidr, DEPLOYER_MAX_PORT, DEPLOYER_MIN_PORT, DEPLOYER_PROTOCOL, RETRY_INTERVAL},
6    PortConfig,
7};
8use aws_config::BehaviorVersion;
9pub use aws_config::Region;
10pub use aws_sdk_ec2::types::{InstanceType, IpPermission, IpRange, UserIdGroupPair, VolumeType};
11use aws_sdk_ec2::{
12    error::BuildError,
13    primitives::Blob,
14    types::{
15        BlockDeviceMapping, EbsBlockDevice, Filter, InstanceStateName, ResourceType, SecurityGroup,
16        SummaryStatus, Tag, TagSpecification, VpcPeeringConnectionStateReasonCode,
17    },
18    Client as Ec2Client, Error as Ec2Error,
19};
20use std::{
21    collections::{HashMap, HashSet},
22    time::Duration,
23};
24use tokio::time::sleep;
25
26/// Creates an EC2 client for the specified AWS region
27pub async fn create_ec2_client(region: Region) -> Ec2Client {
28    let retry = aws_config::retry::RetryConfig::adaptive()
29        .with_max_attempts(10)
30        .with_initial_backoff(Duration::from_millis(500))
31        .with_max_backoff(Duration::from_secs(30));
32    let config = aws_config::defaults(BehaviorVersion::v2025_01_17())
33        .region(region)
34        .retry_config(retry)
35        .load()
36        .await;
37    Ec2Client::new(&config)
38}
39
40/// Imports an SSH public key into the specified region
41pub async fn import_key_pair(
42    client: &Ec2Client,
43    key_name: &str,
44    public_key: &str,
45) -> Result<(), Ec2Error> {
46    let blob = Blob::new(public_key.as_bytes());
47    client
48        .import_key_pair()
49        .key_name(key_name)
50        .public_key_material(blob)
51        .send()
52        .await?;
53    Ok(())
54}
55
56/// Deletes an SSH key pair from the specified region
57pub async fn delete_key_pair(client: &Ec2Client, key_name: &str) -> Result<(), Ec2Error> {
58    client.delete_key_pair().key_name(key_name).send().await?;
59    Ok(())
60}
61
62/// Finds the latest Ubuntu 24.04 ARM64 AMI in the region
63pub async fn find_latest_ami(client: &Ec2Client) -> Result<String, Ec2Error> {
64    let resp = client
65        .describe_images()
66        .filters(
67            Filter::builder()
68                .name("name")
69                .values("ubuntu/images/hvm-ssd-gp3/ubuntu-noble-24.04-arm64-server-*")
70                .build(),
71        )
72        .filters(
73            Filter::builder()
74                .name("root-device-type")
75                .values("ebs")
76                .build(),
77        )
78        .owners("099720109477") // Canonical's AWS account ID
79        .send()
80        .await?;
81    let mut images = resp.images.unwrap_or_default();
82    if images.is_empty() {
83        return Err(Ec2Error::from(BuildError::other(
84            "No matching AMI found".to_string(),
85        )));
86    }
87    images.sort_by(|a, b| b.creation_date().cmp(&a.creation_date()));
88    let latest_ami = images[0].image_id().unwrap();
89    Ok(latest_ami.to_string())
90}
91
92/// Creates a VPC with the specified CIDR block and tag
93pub async fn create_vpc(
94    client: &Ec2Client,
95    cidr_block: &str,
96    tag: &str,
97) -> Result<String, Ec2Error> {
98    let resp = client
99        .create_vpc()
100        .cidr_block(cidr_block)
101        .tag_specifications(
102            TagSpecification::builder()
103                .resource_type(ResourceType::Vpc)
104                .tags(Tag::builder().key("deployer").value(tag).build())
105                .build(),
106        )
107        .send()
108        .await?;
109    Ok(resp.vpc.unwrap().vpc_id.unwrap())
110}
111
112/// Creates an Internet Gateway and attaches it to the specified VPC
113pub async fn create_and_attach_igw(
114    client: &Ec2Client,
115    vpc_id: &str,
116    tag: &str,
117) -> Result<String, Ec2Error> {
118    let igw_resp = client
119        .create_internet_gateway()
120        .tag_specifications(
121            TagSpecification::builder()
122                .resource_type(ResourceType::InternetGateway)
123                .tags(Tag::builder().key("deployer").value(tag).build())
124                .build(),
125        )
126        .send()
127        .await?;
128    let igw_id = igw_resp
129        .internet_gateway
130        .unwrap()
131        .internet_gateway_id
132        .unwrap();
133    client
134        .attach_internet_gateway()
135        .internet_gateway_id(&igw_id)
136        .vpc_id(vpc_id)
137        .send()
138        .await?;
139    Ok(igw_id)
140}
141
142/// Creates a route table for the VPC and sets up a default route to the Internet Gateway
143pub async fn create_route_table(
144    client: &Ec2Client,
145    vpc_id: &str,
146    igw_id: &str,
147    tag: &str,
148) -> Result<String, Ec2Error> {
149    let rt_resp = client
150        .create_route_table()
151        .vpc_id(vpc_id)
152        .tag_specifications(
153            TagSpecification::builder()
154                .resource_type(ResourceType::RouteTable)
155                .tags(Tag::builder().key("deployer").value(tag).build())
156                .build(),
157        )
158        .send()
159        .await?;
160    let rt_id = rt_resp.route_table.unwrap().route_table_id.unwrap();
161    client
162        .create_route()
163        .route_table_id(&rt_id)
164        .destination_cidr_block("0.0.0.0/0")
165        .gateway_id(igw_id)
166        .send()
167        .await?;
168    Ok(rt_id)
169}
170
171/// Creates a subnet within the VPC and associates it with the route table
172pub async fn create_subnet(
173    client: &Ec2Client,
174    vpc_id: &str,
175    route_table_id: &str,
176    subnet_cidr: &str,
177    availability_zone: &str,
178    tag: &str,
179) -> Result<String, Ec2Error> {
180    let subnet_resp = client
181        .create_subnet()
182        .vpc_id(vpc_id)
183        .cidr_block(subnet_cidr)
184        .availability_zone(availability_zone)
185        .tag_specifications(
186            TagSpecification::builder()
187                .resource_type(ResourceType::Subnet)
188                .tags(Tag::builder().key("deployer").value(tag).build())
189                .build(),
190        )
191        .send()
192        .await?;
193    let subnet_id = subnet_resp.subnet.unwrap().subnet_id.unwrap();
194    client
195        .associate_route_table()
196        .route_table_id(route_table_id)
197        .subnet_id(&subnet_id)
198        .send()
199        .await?;
200    Ok(subnet_id)
201}
202
203/// Creates a security group for the monitoring instance with access from the deployer IP
204pub async fn create_security_group_monitoring(
205    client: &Ec2Client,
206    vpc_id: &str,
207    deployer_ip: &str,
208    tag: &str,
209) -> Result<String, Ec2Error> {
210    let sg_resp = client
211        .create_security_group()
212        .group_name(tag)
213        .description("Security group for monitoring instance")
214        .vpc_id(vpc_id)
215        .tag_specifications(
216            TagSpecification::builder()
217                .resource_type(ResourceType::SecurityGroup)
218                .tags(Tag::builder().key("deployer").value(tag).build())
219                .build(),
220        )
221        .send()
222        .await?;
223    let sg_id = sg_resp.group_id.unwrap();
224    client
225        .authorize_security_group_ingress()
226        .group_id(&sg_id)
227        .ip_permissions(
228            IpPermission::builder()
229                .ip_protocol(DEPLOYER_PROTOCOL)
230                .from_port(DEPLOYER_MIN_PORT)
231                .to_port(DEPLOYER_MAX_PORT)
232                .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
233                .build(),
234        )
235        .send()
236        .await?;
237    Ok(sg_id)
238}
239
240/// Creates a security group for binary instances with access from deployer, monitoring, and custom ports
241pub async fn create_security_group_binary(
242    client: &Ec2Client,
243    vpc_id: &str,
244    deployer_ip: &str,
245    monitoring_ip: &str,
246    tag: &str,
247    ports: &[PortConfig],
248) -> Result<String, Ec2Error> {
249    let sg_resp = client
250        .create_security_group()
251        .group_name(format!("{tag}-binary"))
252        .description("Security group for binary instances")
253        .vpc_id(vpc_id)
254        .tag_specifications(
255            TagSpecification::builder()
256                .resource_type(ResourceType::SecurityGroup)
257                .tags(Tag::builder().key("deployer").value(tag).build())
258                .build(),
259        )
260        .send()
261        .await?;
262    let sg_id = sg_resp.group_id.unwrap();
263    let mut builder = client
264        .authorize_security_group_ingress()
265        .group_id(&sg_id)
266        .ip_permissions(
267            IpPermission::builder()
268                .ip_protocol(DEPLOYER_PROTOCOL)
269                .from_port(DEPLOYER_MIN_PORT)
270                .to_port(DEPLOYER_MAX_PORT)
271                .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
272                .build(),
273        )
274        .ip_permissions(
275            IpPermission::builder()
276                .ip_protocol("tcp")
277                .from_port(METRICS_PORT as i32)
278                .to_port(METRICS_PORT as i32)
279                .ip_ranges(
280                    IpRange::builder()
281                        .cidr_ip(exact_cidr(monitoring_ip))
282                        .build(),
283                )
284                .build(),
285        )
286        .ip_permissions(
287            IpPermission::builder()
288                .ip_protocol("tcp")
289                .from_port(SYSTEM_PORT as i32)
290                .to_port(SYSTEM_PORT as i32)
291                .ip_ranges(
292                    IpRange::builder()
293                        .cidr_ip(exact_cidr(monitoring_ip))
294                        .build(),
295                )
296                .build(),
297        )
298        .ip_permissions(
299            IpPermission::builder()
300                .ip_protocol("tcp")
301                .from_port(MEMLEAK_PORT as i32)
302                .to_port(MEMLEAK_PORT as i32)
303                .ip_ranges(
304                    IpRange::builder()
305                        .cidr_ip(exact_cidr(monitoring_ip))
306                        .build(),
307                )
308                .build(),
309        );
310    for port in ports {
311        builder = builder.ip_permissions(
312            IpPermission::builder()
313                .ip_protocol(&port.protocol)
314                .from_port(port.port as i32)
315                .to_port(port.port as i32)
316                .ip_ranges(IpRange::builder().cidr_ip(&port.cidr).build())
317                .build(),
318        );
319    }
320    builder.send().await?;
321    Ok(sg_id)
322}
323
324/// Launches EC2 instances with specified configurations
325#[allow(clippy::too_many_arguments)]
326pub async fn launch_instances(
327    client: &Ec2Client,
328    ami_id: &str,
329    instance_type: InstanceType,
330    storage_size: i32,
331    storage_class: VolumeType,
332    key_name: &str,
333    subnet_id: &str,
334    sg_id: &str,
335    count: i32,
336    name: &str,
337    tag: &str,
338) -> Result<Vec<String>, Ec2Error> {
339    let resp = client
340        .run_instances()
341        .image_id(ami_id)
342        .instance_type(instance_type)
343        .key_name(key_name)
344        .min_count(count)
345        .max_count(count)
346        .network_interfaces(
347            aws_sdk_ec2::types::InstanceNetworkInterfaceSpecification::builder()
348                .associate_public_ip_address(true)
349                .device_index(0)
350                .subnet_id(subnet_id)
351                .groups(sg_id)
352                .build(),
353        )
354        .tag_specifications(
355            TagSpecification::builder()
356                .resource_type(ResourceType::Instance)
357                .set_tags(Some(vec![
358                    Tag::builder().key("deployer").value(tag).build(),
359                    Tag::builder().key("name").value(name).build(),
360                ]))
361                .build(),
362        )
363        .block_device_mappings(
364            BlockDeviceMapping::builder()
365                .device_name("/dev/sda1")
366                .ebs(
367                    EbsBlockDevice::builder()
368                        .volume_size(storage_size)
369                        .volume_type(storage_class)
370                        .delete_on_termination(true)
371                        .build(),
372                )
373                .build(),
374        )
375        .send()
376        .await?;
377    Ok(resp
378        .instances
379        .unwrap()
380        .into_iter()
381        .map(|i| i.instance_id.unwrap())
382        .collect())
383}
384
385/// Waits for instances to reach the "running" state and returns their public IPs
386pub async fn wait_for_instances_running(
387    client: &Ec2Client,
388    instance_ids: &[String],
389) -> Result<Vec<String>, Ec2Error> {
390    loop {
391        // Ask for instance details
392        let Ok(resp) = client
393            .describe_instances()
394            .set_instance_ids(Some(instance_ids.to_vec()))
395            .send()
396            .await
397        else {
398            sleep(RETRY_INTERVAL).await;
399            continue;
400        };
401
402        // Confirm all are running
403        let reservations = resp.reservations.unwrap();
404        let instances = reservations[0].instances.as_ref().unwrap();
405        if !instances.iter().all(|i| {
406            i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
407        }) {
408            sleep(RETRY_INTERVAL).await;
409            continue;
410        }
411        return Ok(instances
412            .iter()
413            .map(|i| i.public_ip_address.as_ref().unwrap().clone())
414            .collect());
415    }
416}
417
418pub async fn wait_for_instances_ready(
419    client: &Ec2Client,
420    instance_ids: &[String],
421) -> Result<(), Ec2Error> {
422    loop {
423        // Ask for instance status
424        let Ok(resp) = client
425            .describe_instance_status()
426            .set_instance_ids(Some(instance_ids.to_vec()))
427            .include_all_instances(true) // Include instances regardless of state
428            .send()
429            .await
430        else {
431            sleep(RETRY_INTERVAL).await;
432            continue;
433        };
434
435        // Confirm all are ready
436        let statuses = resp.instance_statuses.unwrap_or_default();
437        let all_ready = statuses.iter().all(|s| {
438            s.instance_state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
439                && s.system_status.as_ref().unwrap().status.as_ref().unwrap() == &SummaryStatus::Ok
440                && s.instance_status.as_ref().unwrap().status.as_ref().unwrap()
441                    == &SummaryStatus::Ok
442        });
443        if !all_ready {
444            sleep(RETRY_INTERVAL).await;
445            continue;
446        }
447        return Ok(());
448    }
449}
450
451/// Retrieves the private IP address of an instance
452pub async fn get_private_ip(client: &Ec2Client, instance_id: &str) -> Result<String, Ec2Error> {
453    let resp = client
454        .describe_instances()
455        .instance_ids(instance_id)
456        .send()
457        .await?;
458    let reservations = resp.reservations.unwrap();
459    let instance = &reservations[0].instances.as_ref().unwrap()[0];
460    Ok(instance.private_ip_address.as_ref().unwrap().clone())
461}
462
463/// Creates a VPC peering connection between two VPCs
464pub async fn create_vpc_peering_connection(
465    client: &Ec2Client,
466    requester_vpc_id: &str,
467    peer_vpc_id: &str,
468    peer_region: &str,
469    tag: &str,
470) -> Result<String, Ec2Error> {
471    let resp = client
472        .create_vpc_peering_connection()
473        .vpc_id(requester_vpc_id)
474        .peer_vpc_id(peer_vpc_id)
475        .peer_region(peer_region)
476        .tag_specifications(
477            TagSpecification::builder()
478                .resource_type(ResourceType::VpcPeeringConnection)
479                .tags(Tag::builder().key("deployer").value(tag).build())
480                .build(),
481        )
482        .send()
483        .await?;
484    Ok(resp
485        .vpc_peering_connection
486        .unwrap()
487        .vpc_peering_connection_id
488        .unwrap())
489}
490
491/// Waits for a VPC peering connection to reach the "pending-acceptance" state
492pub async fn wait_for_vpc_peering_connection(
493    client: &Ec2Client,
494    peer_id: &str,
495) -> Result<(), Ec2Error> {
496    loop {
497        if let Ok(resp) = client
498            .describe_vpc_peering_connections()
499            .vpc_peering_connection_ids(peer_id)
500            .send()
501            .await
502        {
503            if let Some(connections) = resp.vpc_peering_connections {
504                if let Some(connection) = connections.first() {
505                    if connection.status.as_ref().unwrap().code
506                        == Some(VpcPeeringConnectionStateReasonCode::PendingAcceptance)
507                    {
508                        return Ok(());
509                    }
510                }
511            }
512        }
513        sleep(Duration::from_secs(2)).await;
514    }
515}
516
517/// Accepts a VPC peering connection in the peer region
518pub async fn accept_vpc_peering_connection(
519    client: &Ec2Client,
520    peer_id: &str,
521) -> Result<(), Ec2Error> {
522    client
523        .accept_vpc_peering_connection()
524        .vpc_peering_connection_id(peer_id)
525        .send()
526        .await?;
527    Ok(())
528}
529
530/// Adds a route to a route table for VPC peering
531pub async fn add_route(
532    client: &Ec2Client,
533    route_table_id: &str,
534    destination_cidr: &str,
535    peer_id: &str,
536) -> Result<(), Ec2Error> {
537    client
538        .create_route()
539        .route_table_id(route_table_id)
540        .destination_cidr_block(destination_cidr)
541        .vpc_peering_connection_id(peer_id)
542        .send()
543        .await?;
544    Ok(())
545}
546
547/// Finds VPC peering connections by deployer tag
548pub async fn find_vpc_peering_by_tag(
549    client: &Ec2Client,
550    tag: &str,
551) -> Result<Vec<String>, Ec2Error> {
552    let resp = client
553        .describe_vpc_peering_connections()
554        .filters(Filter::builder().name("tag:deployer").values(tag).build())
555        .send()
556        .await?;
557    Ok(resp
558        .vpc_peering_connections
559        .unwrap_or_default()
560        .into_iter()
561        .map(|p| p.vpc_peering_connection_id.unwrap())
562        .collect())
563}
564
565/// Deletes a VPC peering connection
566pub async fn delete_vpc_peering(client: &Ec2Client, peering_id: &str) -> Result<(), Ec2Error> {
567    client
568        .delete_vpc_peering_connection()
569        .vpc_peering_connection_id(peering_id)
570        .send()
571        .await?;
572    Ok(())
573}
574
575/// Waits for a VPC peering connection to be deleted
576pub async fn wait_for_vpc_peering_deletion(
577    ec2_client: &Ec2Client,
578    peer_id: &str,
579) -> Result<(), Ec2Error> {
580    loop {
581        let resp = ec2_client
582            .describe_vpc_peering_connections()
583            .vpc_peering_connection_ids(peer_id)
584            .send()
585            .await?;
586        if let Some(connections) = resp.vpc_peering_connections {
587            if let Some(connection) = connections.first() {
588                if connection.status.as_ref().unwrap().code
589                    == Some(VpcPeeringConnectionStateReasonCode::Deleted)
590                {
591                    return Ok(());
592                }
593            } else {
594                return Ok(());
595            }
596        } else {
597            return Ok(());
598        }
599        sleep(RETRY_INTERVAL).await;
600    }
601}
602
603/// Finds instances by deployer tag
604pub async fn find_instances_by_tag(
605    ec2_client: &Ec2Client,
606    tag: &str,
607) -> Result<Vec<String>, Ec2Error> {
608    let resp = ec2_client
609        .describe_instances()
610        .filters(Filter::builder().name("tag:deployer").values(tag).build())
611        .send()
612        .await?;
613    Ok(resp
614        .reservations
615        .unwrap_or_default()
616        .into_iter()
617        .flat_map(|r| r.instances.unwrap_or_default())
618        .map(|i| i.instance_id.unwrap())
619        .collect())
620}
621
622/// Terminates specified instances
623pub async fn terminate_instances(
624    ec2_client: &Ec2Client,
625    instance_ids: &[String],
626) -> Result<(), Ec2Error> {
627    if instance_ids.is_empty() {
628        return Ok(());
629    }
630    ec2_client
631        .terminate_instances()
632        .set_instance_ids(Some(instance_ids.to_vec()))
633        .send()
634        .await?;
635    Ok(())
636}
637
638/// Waits for instances to be terminated
639pub async fn wait_for_instances_terminated(
640    ec2_client: &Ec2Client,
641    instance_ids: &[String],
642) -> Result<(), Ec2Error> {
643    loop {
644        let resp = ec2_client
645            .describe_instances()
646            .set_instance_ids(Some(instance_ids.to_vec()))
647            .send()
648            .await?;
649        let instances = resp
650            .reservations
651            .unwrap_or_default()
652            .into_iter()
653            .flat_map(|r| r.instances.unwrap_or_default())
654            .collect::<Vec<_>>();
655        if instances.iter().all(|i| {
656            i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Terminated
657        }) {
658            return Ok(());
659        }
660        sleep(RETRY_INTERVAL).await;
661    }
662}
663
664/// Finds security groups by deployer tag
665pub async fn find_security_groups_by_tag(
666    ec2_client: &Ec2Client,
667    tag: &str,
668) -> Result<Vec<SecurityGroup>, Ec2Error> {
669    let resp = ec2_client
670        .describe_security_groups()
671        .filters(Filter::builder().name("tag:deployer").values(tag).build())
672        .send()
673        .await?;
674    Ok(resp
675        .security_groups
676        .unwrap_or_default()
677        .into_iter()
678        .collect())
679}
680
681/// Deletes a security group
682pub async fn delete_security_group(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
683    ec2_client
684        .delete_security_group()
685        .group_id(sg_id)
686        .send()
687        .await?;
688    Ok(())
689}
690
691/// Finds route tables by deployer tag
692pub async fn find_route_tables_by_tag(
693    ec2_client: &Ec2Client,
694    tag: &str,
695) -> Result<Vec<String>, Ec2Error> {
696    let resp = ec2_client
697        .describe_route_tables()
698        .filters(Filter::builder().name("tag:deployer").values(tag).build())
699        .send()
700        .await?;
701    Ok(resp
702        .route_tables
703        .unwrap_or_default()
704        .into_iter()
705        .map(|rt| rt.route_table_id.unwrap())
706        .collect())
707}
708
709/// Deletes a route table
710pub async fn delete_route_table(ec2_client: &Ec2Client, rt_id: &str) -> Result<(), Ec2Error> {
711    ec2_client
712        .delete_route_table()
713        .route_table_id(rt_id)
714        .send()
715        .await?;
716    Ok(())
717}
718
719/// Finds Internet Gateways by deployer tag
720pub async fn find_igws_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
721    let resp = ec2_client
722        .describe_internet_gateways()
723        .filters(Filter::builder().name("tag:deployer").values(tag).build())
724        .send()
725        .await?;
726    Ok(resp
727        .internet_gateways
728        .unwrap_or_default()
729        .into_iter()
730        .map(|igw| igw.internet_gateway_id.unwrap())
731        .collect())
732}
733
734/// Finds the VPC ID attached to an Internet Gateway
735pub async fn find_vpc_by_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<String, Ec2Error> {
736    let resp = ec2_client
737        .describe_internet_gateways()
738        .internet_gateway_ids(igw_id)
739        .send()
740        .await?;
741    Ok(resp.internet_gateways.unwrap()[0]
742        .attachments
743        .as_ref()
744        .unwrap()[0]
745        .vpc_id
746        .as_ref()
747        .unwrap()
748        .clone())
749}
750
751/// Detaches an Internet Gateway from a VPC
752pub async fn detach_igw(
753    ec2_client: &Ec2Client,
754    igw_id: &str,
755    vpc_id: &str,
756) -> Result<(), Ec2Error> {
757    ec2_client
758        .detach_internet_gateway()
759        .internet_gateway_id(igw_id)
760        .vpc_id(vpc_id)
761        .send()
762        .await?;
763    Ok(())
764}
765
766/// Deletes an Internet Gateway
767pub async fn delete_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<(), Ec2Error> {
768    ec2_client
769        .delete_internet_gateway()
770        .internet_gateway_id(igw_id)
771        .send()
772        .await?;
773    Ok(())
774}
775
776/// Finds subnets by deployer tag
777pub async fn find_subnets_by_tag(
778    ec2_client: &Ec2Client,
779    tag: &str,
780) -> Result<Vec<String>, Ec2Error> {
781    let resp = ec2_client
782        .describe_subnets()
783        .filters(Filter::builder().name("tag:deployer").values(tag).build())
784        .send()
785        .await?;
786    Ok(resp
787        .subnets
788        .unwrap_or_default()
789        .into_iter()
790        .map(|subnet| subnet.subnet_id.unwrap())
791        .collect())
792}
793
794/// Deletes a subnet
795pub async fn delete_subnet(ec2_client: &Ec2Client, subnet_id: &str) -> Result<(), Ec2Error> {
796    ec2_client
797        .delete_subnet()
798        .subnet_id(subnet_id)
799        .send()
800        .await?;
801    Ok(())
802}
803
804/// Finds VPCs by deployer tag
805pub async fn find_vpcs_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
806    let resp = ec2_client
807        .describe_vpcs()
808        .filters(Filter::builder().name("tag:deployer").values(tag).build())
809        .send()
810        .await?;
811    Ok(resp
812        .vpcs
813        .unwrap_or_default()
814        .into_iter()
815        .map(|vpc| vpc.vpc_id.unwrap())
816        .collect())
817}
818
819/// Deletes a VPC
820pub async fn delete_vpc(ec2_client: &Ec2Client, vpc_id: &str) -> Result<(), Ec2Error> {
821    ec2_client.delete_vpc().vpc_id(vpc_id).send().await?;
822    Ok(())
823}
824
825/// Enforces that all instance types are ARM64-based
826pub async fn assert_arm64_support(
827    client: &Ec2Client,
828    instance_types: &[String],
829) -> Result<(), Ec2Error> {
830    let mut next_token: Option<String> = None;
831    let mut supported_instance_types = HashSet::new();
832
833    // Loop through all pages of results
834    loop {
835        // Get the next page of instance types
836        let mut request = client.describe_instance_types().filters(
837            Filter::builder()
838                .name("processor-info.supported-architecture")
839                .values("arm64")
840                .build(),
841        );
842        if let Some(token) = next_token {
843            request = request.next_token(token);
844        }
845        let response = request.send().await?;
846
847        // Collect instance types from this page
848        for instance_type in response.instance_types.unwrap_or_default() {
849            if let Some(it) = instance_type.instance_type {
850                supported_instance_types.insert(it.to_string());
851            }
852        }
853
854        // Check if there's another page
855        next_token = response.next_token;
856        if next_token.is_none() {
857            break;
858        }
859    }
860
861    // Validate all requested instance types
862    for instance_type in instance_types {
863        if !supported_instance_types.contains(instance_type) {
864            return Err(Ec2Error::from(BuildError::other(format!(
865                "instance type {instance_type} not ARM64-based"
866            ))));
867        }
868    }
869    Ok(())
870}
871
872/// Finds the availability zone that supports all required instance types
873pub async fn find_availability_zone(
874    client: &Ec2Client,
875    instance_types: &[String],
876) -> Result<String, Ec2Error> {
877    // Retrieve all instance type offerings for availability zones in the region
878    let offerings = client
879        .describe_instance_type_offerings()
880        .location_type("availability-zone".into())
881        .filters(
882            Filter::builder()
883                .name("instance-type")
884                .set_values(Some(instance_types.to_vec()))
885                .build(),
886        )
887        .send()
888        .await?
889        .instance_type_offerings
890        .unwrap_or_default();
891
892    // Build a map from availability zone to the set of supported instance types
893    let mut az_to_instance_types: HashMap<String, HashSet<String>> = HashMap::new();
894    for offering in offerings {
895        if let (Some(location), Some(instance_type)) = (
896            offering.location,
897            offering.instance_type.map(|it| it.to_string()), // Convert enum to String if necessary
898        ) {
899            az_to_instance_types
900                .entry(location)
901                .or_default()
902                .insert(instance_type);
903        }
904    }
905
906    // Convert the required instance types to a HashSet for efficient subset checking
907    let required_instance_types: HashSet<String> = instance_types.iter().cloned().collect();
908
909    // Find an availability zone that supports all required instance types
910    for (az, supported_types) in az_to_instance_types {
911        if required_instance_types.is_subset(&supported_types) {
912            return Ok(az); // Return the first matching availability zone
913        }
914    }
915
916    // If no availability zone supports all instance types, return an error
917    Err(Ec2Error::from(BuildError::other(format!(
918        "no availability zone supports all required instance types: {instance_types:?}"
919    ))))
920}
921
922/// Waits until all network interfaces associated with a security group are deleted
923pub async fn wait_for_enis_deleted(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
924    loop {
925        let resp = ec2_client
926            .describe_network_interfaces()
927            .filters(Filter::builder().name("group-id").values(sg_id).build())
928            .send()
929            .await?;
930        if resp.network_interfaces.unwrap_or_default().is_empty() {
931            return Ok(());
932        }
933        sleep(RETRY_INTERVAL).await;
934    }
935}