1use super::{MEMLEAK_PORT, METRICS_PORT, SYSTEM_PORT};
4use crate::ec2::{
5 utils::{exact_cidr, DEPLOYER_MAX_PORT, DEPLOYER_MIN_PORT, DEPLOYER_PROTOCOL, RETRY_INTERVAL},
6 PortConfig,
7};
8use aws_config::BehaviorVersion;
9pub use aws_config::Region;
10pub use aws_sdk_ec2::types::{InstanceType, IpPermission, IpRange, UserIdGroupPair, VolumeType};
11use aws_sdk_ec2::{
12 error::BuildError,
13 primitives::Blob,
14 types::{
15 BlockDeviceMapping, EbsBlockDevice, Filter, InstanceStateName, ResourceType, SecurityGroup,
16 SummaryStatus, Tag, TagSpecification, VpcPeeringConnectionStateReasonCode,
17 },
18 Client as Ec2Client, Error as Ec2Error,
19};
20use std::{
21 collections::{HashMap, HashSet},
22 time::Duration,
23};
24use tokio::time::sleep;
25
26pub async fn create_ec2_client(region: Region) -> Ec2Client {
28 let retry = aws_config::retry::RetryConfig::adaptive()
29 .with_max_attempts(10)
30 .with_initial_backoff(Duration::from_millis(500))
31 .with_max_backoff(Duration::from_secs(30));
32 let config = aws_config::defaults(BehaviorVersion::v2025_01_17())
33 .region(region)
34 .retry_config(retry)
35 .load()
36 .await;
37 Ec2Client::new(&config)
38}
39
40pub async fn import_key_pair(
42 client: &Ec2Client,
43 key_name: &str,
44 public_key: &str,
45) -> Result<(), Ec2Error> {
46 let blob = Blob::new(public_key.as_bytes());
47 client
48 .import_key_pair()
49 .key_name(key_name)
50 .public_key_material(blob)
51 .send()
52 .await?;
53 Ok(())
54}
55
56pub async fn delete_key_pair(client: &Ec2Client, key_name: &str) -> Result<(), Ec2Error> {
58 client.delete_key_pair().key_name(key_name).send().await?;
59 Ok(())
60}
61
62pub async fn find_latest_ami(client: &Ec2Client) -> Result<String, Ec2Error> {
64 let resp = client
65 .describe_images()
66 .filters(
67 Filter::builder()
68 .name("name")
69 .values("ubuntu/images/hvm-ssd-gp3/ubuntu-noble-24.04-arm64-server-*")
70 .build(),
71 )
72 .filters(
73 Filter::builder()
74 .name("root-device-type")
75 .values("ebs")
76 .build(),
77 )
78 .owners("099720109477") .send()
80 .await?;
81 let mut images = resp.images.unwrap_or_default();
82 if images.is_empty() {
83 return Err(Ec2Error::from(BuildError::other(
84 "No matching AMI found".to_string(),
85 )));
86 }
87 images.sort_by(|a, b| b.creation_date().cmp(&a.creation_date()));
88 let latest_ami = images[0].image_id().unwrap();
89 Ok(latest_ami.to_string())
90}
91
92pub async fn create_vpc(
94 client: &Ec2Client,
95 cidr_block: &str,
96 tag: &str,
97) -> Result<String, Ec2Error> {
98 let resp = client
99 .create_vpc()
100 .cidr_block(cidr_block)
101 .tag_specifications(
102 TagSpecification::builder()
103 .resource_type(ResourceType::Vpc)
104 .tags(Tag::builder().key("deployer").value(tag).build())
105 .build(),
106 )
107 .send()
108 .await?;
109 Ok(resp.vpc.unwrap().vpc_id.unwrap())
110}
111
112pub async fn create_and_attach_igw(
114 client: &Ec2Client,
115 vpc_id: &str,
116 tag: &str,
117) -> Result<String, Ec2Error> {
118 let igw_resp = client
119 .create_internet_gateway()
120 .tag_specifications(
121 TagSpecification::builder()
122 .resource_type(ResourceType::InternetGateway)
123 .tags(Tag::builder().key("deployer").value(tag).build())
124 .build(),
125 )
126 .send()
127 .await?;
128 let igw_id = igw_resp
129 .internet_gateway
130 .unwrap()
131 .internet_gateway_id
132 .unwrap();
133 client
134 .attach_internet_gateway()
135 .internet_gateway_id(&igw_id)
136 .vpc_id(vpc_id)
137 .send()
138 .await?;
139 Ok(igw_id)
140}
141
142pub async fn create_route_table(
144 client: &Ec2Client,
145 vpc_id: &str,
146 igw_id: &str,
147 tag: &str,
148) -> Result<String, Ec2Error> {
149 let rt_resp = client
150 .create_route_table()
151 .vpc_id(vpc_id)
152 .tag_specifications(
153 TagSpecification::builder()
154 .resource_type(ResourceType::RouteTable)
155 .tags(Tag::builder().key("deployer").value(tag).build())
156 .build(),
157 )
158 .send()
159 .await?;
160 let rt_id = rt_resp.route_table.unwrap().route_table_id.unwrap();
161 client
162 .create_route()
163 .route_table_id(&rt_id)
164 .destination_cidr_block("0.0.0.0/0")
165 .gateway_id(igw_id)
166 .send()
167 .await?;
168 Ok(rt_id)
169}
170
171pub async fn create_subnet(
173 client: &Ec2Client,
174 vpc_id: &str,
175 route_table_id: &str,
176 subnet_cidr: &str,
177 availability_zone: &str,
178 tag: &str,
179) -> Result<String, Ec2Error> {
180 let subnet_resp = client
181 .create_subnet()
182 .vpc_id(vpc_id)
183 .cidr_block(subnet_cidr)
184 .availability_zone(availability_zone)
185 .tag_specifications(
186 TagSpecification::builder()
187 .resource_type(ResourceType::Subnet)
188 .tags(Tag::builder().key("deployer").value(tag).build())
189 .build(),
190 )
191 .send()
192 .await?;
193 let subnet_id = subnet_resp.subnet.unwrap().subnet_id.unwrap();
194 client
195 .associate_route_table()
196 .route_table_id(route_table_id)
197 .subnet_id(&subnet_id)
198 .send()
199 .await?;
200 Ok(subnet_id)
201}
202
203pub async fn create_security_group_monitoring(
205 client: &Ec2Client,
206 vpc_id: &str,
207 deployer_ip: &str,
208 tag: &str,
209) -> Result<String, Ec2Error> {
210 let sg_resp = client
211 .create_security_group()
212 .group_name(tag)
213 .description("Security group for monitoring instance")
214 .vpc_id(vpc_id)
215 .tag_specifications(
216 TagSpecification::builder()
217 .resource_type(ResourceType::SecurityGroup)
218 .tags(Tag::builder().key("deployer").value(tag).build())
219 .build(),
220 )
221 .send()
222 .await?;
223 let sg_id = sg_resp.group_id.unwrap();
224 client
225 .authorize_security_group_ingress()
226 .group_id(&sg_id)
227 .ip_permissions(
228 IpPermission::builder()
229 .ip_protocol(DEPLOYER_PROTOCOL)
230 .from_port(DEPLOYER_MIN_PORT)
231 .to_port(DEPLOYER_MAX_PORT)
232 .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
233 .build(),
234 )
235 .send()
236 .await?;
237 Ok(sg_id)
238}
239
240pub async fn create_security_group_binary(
242 client: &Ec2Client,
243 vpc_id: &str,
244 deployer_ip: &str,
245 monitoring_ip: &str,
246 tag: &str,
247 ports: &[PortConfig],
248) -> Result<String, Ec2Error> {
249 let sg_resp = client
250 .create_security_group()
251 .group_name(format!("{tag}-binary"))
252 .description("Security group for binary instances")
253 .vpc_id(vpc_id)
254 .tag_specifications(
255 TagSpecification::builder()
256 .resource_type(ResourceType::SecurityGroup)
257 .tags(Tag::builder().key("deployer").value(tag).build())
258 .build(),
259 )
260 .send()
261 .await?;
262 let sg_id = sg_resp.group_id.unwrap();
263 let mut builder = client
264 .authorize_security_group_ingress()
265 .group_id(&sg_id)
266 .ip_permissions(
267 IpPermission::builder()
268 .ip_protocol(DEPLOYER_PROTOCOL)
269 .from_port(DEPLOYER_MIN_PORT)
270 .to_port(DEPLOYER_MAX_PORT)
271 .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
272 .build(),
273 )
274 .ip_permissions(
275 IpPermission::builder()
276 .ip_protocol("tcp")
277 .from_port(METRICS_PORT as i32)
278 .to_port(METRICS_PORT as i32)
279 .ip_ranges(
280 IpRange::builder()
281 .cidr_ip(exact_cidr(monitoring_ip))
282 .build(),
283 )
284 .build(),
285 )
286 .ip_permissions(
287 IpPermission::builder()
288 .ip_protocol("tcp")
289 .from_port(SYSTEM_PORT as i32)
290 .to_port(SYSTEM_PORT as i32)
291 .ip_ranges(
292 IpRange::builder()
293 .cidr_ip(exact_cidr(monitoring_ip))
294 .build(),
295 )
296 .build(),
297 )
298 .ip_permissions(
299 IpPermission::builder()
300 .ip_protocol("tcp")
301 .from_port(MEMLEAK_PORT as i32)
302 .to_port(MEMLEAK_PORT as i32)
303 .ip_ranges(
304 IpRange::builder()
305 .cidr_ip(exact_cidr(monitoring_ip))
306 .build(),
307 )
308 .build(),
309 );
310 for port in ports {
311 builder = builder.ip_permissions(
312 IpPermission::builder()
313 .ip_protocol(&port.protocol)
314 .from_port(port.port as i32)
315 .to_port(port.port as i32)
316 .ip_ranges(IpRange::builder().cidr_ip(&port.cidr).build())
317 .build(),
318 );
319 }
320 builder.send().await?;
321 Ok(sg_id)
322}
323
324#[allow(clippy::too_many_arguments)]
326pub async fn launch_instances(
327 client: &Ec2Client,
328 ami_id: &str,
329 instance_type: InstanceType,
330 storage_size: i32,
331 storage_class: VolumeType,
332 key_name: &str,
333 subnet_id: &str,
334 sg_id: &str,
335 count: i32,
336 name: &str,
337 tag: &str,
338) -> Result<Vec<String>, Ec2Error> {
339 let resp = client
340 .run_instances()
341 .image_id(ami_id)
342 .instance_type(instance_type)
343 .key_name(key_name)
344 .min_count(count)
345 .max_count(count)
346 .network_interfaces(
347 aws_sdk_ec2::types::InstanceNetworkInterfaceSpecification::builder()
348 .associate_public_ip_address(true)
349 .device_index(0)
350 .subnet_id(subnet_id)
351 .groups(sg_id)
352 .build(),
353 )
354 .tag_specifications(
355 TagSpecification::builder()
356 .resource_type(ResourceType::Instance)
357 .set_tags(Some(vec![
358 Tag::builder().key("deployer").value(tag).build(),
359 Tag::builder().key("name").value(name).build(),
360 ]))
361 .build(),
362 )
363 .block_device_mappings(
364 BlockDeviceMapping::builder()
365 .device_name("/dev/sda1")
366 .ebs(
367 EbsBlockDevice::builder()
368 .volume_size(storage_size)
369 .volume_type(storage_class)
370 .delete_on_termination(true)
371 .build(),
372 )
373 .build(),
374 )
375 .send()
376 .await?;
377 Ok(resp
378 .instances
379 .unwrap()
380 .into_iter()
381 .map(|i| i.instance_id.unwrap())
382 .collect())
383}
384
385pub async fn wait_for_instances_running(
387 client: &Ec2Client,
388 instance_ids: &[String],
389) -> Result<Vec<String>, Ec2Error> {
390 loop {
391 let Ok(resp) = client
393 .describe_instances()
394 .set_instance_ids(Some(instance_ids.to_vec()))
395 .send()
396 .await
397 else {
398 sleep(RETRY_INTERVAL).await;
399 continue;
400 };
401
402 let reservations = resp.reservations.unwrap();
404 let instances = reservations[0].instances.as_ref().unwrap();
405 if !instances.iter().all(|i| {
406 i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
407 }) {
408 sleep(RETRY_INTERVAL).await;
409 continue;
410 }
411 return Ok(instances
412 .iter()
413 .map(|i| i.public_ip_address.as_ref().unwrap().clone())
414 .collect());
415 }
416}
417
418pub async fn wait_for_instances_ready(
419 client: &Ec2Client,
420 instance_ids: &[String],
421) -> Result<(), Ec2Error> {
422 loop {
423 let Ok(resp) = client
425 .describe_instance_status()
426 .set_instance_ids(Some(instance_ids.to_vec()))
427 .include_all_instances(true) .send()
429 .await
430 else {
431 sleep(RETRY_INTERVAL).await;
432 continue;
433 };
434
435 let statuses = resp.instance_statuses.unwrap_or_default();
437 let all_ready = statuses.iter().all(|s| {
438 s.instance_state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
439 && s.system_status.as_ref().unwrap().status.as_ref().unwrap() == &SummaryStatus::Ok
440 && s.instance_status.as_ref().unwrap().status.as_ref().unwrap()
441 == &SummaryStatus::Ok
442 });
443 if !all_ready {
444 sleep(RETRY_INTERVAL).await;
445 continue;
446 }
447 return Ok(());
448 }
449}
450
451pub async fn get_private_ip(client: &Ec2Client, instance_id: &str) -> Result<String, Ec2Error> {
453 let resp = client
454 .describe_instances()
455 .instance_ids(instance_id)
456 .send()
457 .await?;
458 let reservations = resp.reservations.unwrap();
459 let instance = &reservations[0].instances.as_ref().unwrap()[0];
460 Ok(instance.private_ip_address.as_ref().unwrap().clone())
461}
462
463pub async fn create_vpc_peering_connection(
465 client: &Ec2Client,
466 requester_vpc_id: &str,
467 peer_vpc_id: &str,
468 peer_region: &str,
469 tag: &str,
470) -> Result<String, Ec2Error> {
471 let resp = client
472 .create_vpc_peering_connection()
473 .vpc_id(requester_vpc_id)
474 .peer_vpc_id(peer_vpc_id)
475 .peer_region(peer_region)
476 .tag_specifications(
477 TagSpecification::builder()
478 .resource_type(ResourceType::VpcPeeringConnection)
479 .tags(Tag::builder().key("deployer").value(tag).build())
480 .build(),
481 )
482 .send()
483 .await?;
484 Ok(resp
485 .vpc_peering_connection
486 .unwrap()
487 .vpc_peering_connection_id
488 .unwrap())
489}
490
491pub async fn wait_for_vpc_peering_connection(
493 client: &Ec2Client,
494 peer_id: &str,
495) -> Result<(), Ec2Error> {
496 loop {
497 if let Ok(resp) = client
498 .describe_vpc_peering_connections()
499 .vpc_peering_connection_ids(peer_id)
500 .send()
501 .await
502 {
503 if let Some(connections) = resp.vpc_peering_connections {
504 if let Some(connection) = connections.first() {
505 if connection.status.as_ref().unwrap().code
506 == Some(VpcPeeringConnectionStateReasonCode::PendingAcceptance)
507 {
508 return Ok(());
509 }
510 }
511 }
512 }
513 sleep(Duration::from_secs(2)).await;
514 }
515}
516
517pub async fn accept_vpc_peering_connection(
519 client: &Ec2Client,
520 peer_id: &str,
521) -> Result<(), Ec2Error> {
522 client
523 .accept_vpc_peering_connection()
524 .vpc_peering_connection_id(peer_id)
525 .send()
526 .await?;
527 Ok(())
528}
529
530pub async fn add_route(
532 client: &Ec2Client,
533 route_table_id: &str,
534 destination_cidr: &str,
535 peer_id: &str,
536) -> Result<(), Ec2Error> {
537 client
538 .create_route()
539 .route_table_id(route_table_id)
540 .destination_cidr_block(destination_cidr)
541 .vpc_peering_connection_id(peer_id)
542 .send()
543 .await?;
544 Ok(())
545}
546
547pub async fn find_vpc_peering_by_tag(
549 client: &Ec2Client,
550 tag: &str,
551) -> Result<Vec<String>, Ec2Error> {
552 let resp = client
553 .describe_vpc_peering_connections()
554 .filters(Filter::builder().name("tag:deployer").values(tag).build())
555 .send()
556 .await?;
557 Ok(resp
558 .vpc_peering_connections
559 .unwrap_or_default()
560 .into_iter()
561 .map(|p| p.vpc_peering_connection_id.unwrap())
562 .collect())
563}
564
565pub async fn delete_vpc_peering(client: &Ec2Client, peering_id: &str) -> Result<(), Ec2Error> {
567 client
568 .delete_vpc_peering_connection()
569 .vpc_peering_connection_id(peering_id)
570 .send()
571 .await?;
572 Ok(())
573}
574
575pub async fn wait_for_vpc_peering_deletion(
577 ec2_client: &Ec2Client,
578 peer_id: &str,
579) -> Result<(), Ec2Error> {
580 loop {
581 let resp = ec2_client
582 .describe_vpc_peering_connections()
583 .vpc_peering_connection_ids(peer_id)
584 .send()
585 .await?;
586 if let Some(connections) = resp.vpc_peering_connections {
587 if let Some(connection) = connections.first() {
588 if connection.status.as_ref().unwrap().code
589 == Some(VpcPeeringConnectionStateReasonCode::Deleted)
590 {
591 return Ok(());
592 }
593 } else {
594 return Ok(());
595 }
596 } else {
597 return Ok(());
598 }
599 sleep(RETRY_INTERVAL).await;
600 }
601}
602
603pub async fn find_instances_by_tag(
605 ec2_client: &Ec2Client,
606 tag: &str,
607) -> Result<Vec<String>, Ec2Error> {
608 let resp = ec2_client
609 .describe_instances()
610 .filters(Filter::builder().name("tag:deployer").values(tag).build())
611 .send()
612 .await?;
613 Ok(resp
614 .reservations
615 .unwrap_or_default()
616 .into_iter()
617 .flat_map(|r| r.instances.unwrap_or_default())
618 .map(|i| i.instance_id.unwrap())
619 .collect())
620}
621
622pub async fn terminate_instances(
624 ec2_client: &Ec2Client,
625 instance_ids: &[String],
626) -> Result<(), Ec2Error> {
627 if instance_ids.is_empty() {
628 return Ok(());
629 }
630 ec2_client
631 .terminate_instances()
632 .set_instance_ids(Some(instance_ids.to_vec()))
633 .send()
634 .await?;
635 Ok(())
636}
637
638pub async fn wait_for_instances_terminated(
640 ec2_client: &Ec2Client,
641 instance_ids: &[String],
642) -> Result<(), Ec2Error> {
643 loop {
644 let resp = ec2_client
645 .describe_instances()
646 .set_instance_ids(Some(instance_ids.to_vec()))
647 .send()
648 .await?;
649 let instances = resp
650 .reservations
651 .unwrap_or_default()
652 .into_iter()
653 .flat_map(|r| r.instances.unwrap_or_default())
654 .collect::<Vec<_>>();
655 if instances.iter().all(|i| {
656 i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Terminated
657 }) {
658 return Ok(());
659 }
660 sleep(RETRY_INTERVAL).await;
661 }
662}
663
664pub async fn find_security_groups_by_tag(
666 ec2_client: &Ec2Client,
667 tag: &str,
668) -> Result<Vec<SecurityGroup>, Ec2Error> {
669 let resp = ec2_client
670 .describe_security_groups()
671 .filters(Filter::builder().name("tag:deployer").values(tag).build())
672 .send()
673 .await?;
674 Ok(resp
675 .security_groups
676 .unwrap_or_default()
677 .into_iter()
678 .collect())
679}
680
681pub async fn delete_security_group(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
683 ec2_client
684 .delete_security_group()
685 .group_id(sg_id)
686 .send()
687 .await?;
688 Ok(())
689}
690
691pub async fn find_route_tables_by_tag(
693 ec2_client: &Ec2Client,
694 tag: &str,
695) -> Result<Vec<String>, Ec2Error> {
696 let resp = ec2_client
697 .describe_route_tables()
698 .filters(Filter::builder().name("tag:deployer").values(tag).build())
699 .send()
700 .await?;
701 Ok(resp
702 .route_tables
703 .unwrap_or_default()
704 .into_iter()
705 .map(|rt| rt.route_table_id.unwrap())
706 .collect())
707}
708
709pub async fn delete_route_table(ec2_client: &Ec2Client, rt_id: &str) -> Result<(), Ec2Error> {
711 ec2_client
712 .delete_route_table()
713 .route_table_id(rt_id)
714 .send()
715 .await?;
716 Ok(())
717}
718
719pub async fn find_igws_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
721 let resp = ec2_client
722 .describe_internet_gateways()
723 .filters(Filter::builder().name("tag:deployer").values(tag).build())
724 .send()
725 .await?;
726 Ok(resp
727 .internet_gateways
728 .unwrap_or_default()
729 .into_iter()
730 .map(|igw| igw.internet_gateway_id.unwrap())
731 .collect())
732}
733
734pub async fn find_vpc_by_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<String, Ec2Error> {
736 let resp = ec2_client
737 .describe_internet_gateways()
738 .internet_gateway_ids(igw_id)
739 .send()
740 .await?;
741 Ok(resp.internet_gateways.unwrap()[0]
742 .attachments
743 .as_ref()
744 .unwrap()[0]
745 .vpc_id
746 .as_ref()
747 .unwrap()
748 .clone())
749}
750
751pub async fn detach_igw(
753 ec2_client: &Ec2Client,
754 igw_id: &str,
755 vpc_id: &str,
756) -> Result<(), Ec2Error> {
757 ec2_client
758 .detach_internet_gateway()
759 .internet_gateway_id(igw_id)
760 .vpc_id(vpc_id)
761 .send()
762 .await?;
763 Ok(())
764}
765
766pub async fn delete_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<(), Ec2Error> {
768 ec2_client
769 .delete_internet_gateway()
770 .internet_gateway_id(igw_id)
771 .send()
772 .await?;
773 Ok(())
774}
775
776pub async fn find_subnets_by_tag(
778 ec2_client: &Ec2Client,
779 tag: &str,
780) -> Result<Vec<String>, Ec2Error> {
781 let resp = ec2_client
782 .describe_subnets()
783 .filters(Filter::builder().name("tag:deployer").values(tag).build())
784 .send()
785 .await?;
786 Ok(resp
787 .subnets
788 .unwrap_or_default()
789 .into_iter()
790 .map(|subnet| subnet.subnet_id.unwrap())
791 .collect())
792}
793
794pub async fn delete_subnet(ec2_client: &Ec2Client, subnet_id: &str) -> Result<(), Ec2Error> {
796 ec2_client
797 .delete_subnet()
798 .subnet_id(subnet_id)
799 .send()
800 .await?;
801 Ok(())
802}
803
804pub async fn find_vpcs_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
806 let resp = ec2_client
807 .describe_vpcs()
808 .filters(Filter::builder().name("tag:deployer").values(tag).build())
809 .send()
810 .await?;
811 Ok(resp
812 .vpcs
813 .unwrap_or_default()
814 .into_iter()
815 .map(|vpc| vpc.vpc_id.unwrap())
816 .collect())
817}
818
819pub async fn delete_vpc(ec2_client: &Ec2Client, vpc_id: &str) -> Result<(), Ec2Error> {
821 ec2_client.delete_vpc().vpc_id(vpc_id).send().await?;
822 Ok(())
823}
824
825pub async fn assert_arm64_support(
827 client: &Ec2Client,
828 instance_types: &[String],
829) -> Result<(), Ec2Error> {
830 let mut next_token: Option<String> = None;
831 let mut supported_instance_types = HashSet::new();
832
833 loop {
835 let mut request = client.describe_instance_types().filters(
837 Filter::builder()
838 .name("processor-info.supported-architecture")
839 .values("arm64")
840 .build(),
841 );
842 if let Some(token) = next_token {
843 request = request.next_token(token);
844 }
845 let response = request.send().await?;
846
847 for instance_type in response.instance_types.unwrap_or_default() {
849 if let Some(it) = instance_type.instance_type {
850 supported_instance_types.insert(it.to_string());
851 }
852 }
853
854 next_token = response.next_token;
856 if next_token.is_none() {
857 break;
858 }
859 }
860
861 for instance_type in instance_types {
863 if !supported_instance_types.contains(instance_type) {
864 return Err(Ec2Error::from(BuildError::other(format!(
865 "instance type {instance_type} not ARM64-based"
866 ))));
867 }
868 }
869 Ok(())
870}
871
872pub async fn find_availability_zone(
874 client: &Ec2Client,
875 instance_types: &[String],
876) -> Result<String, Ec2Error> {
877 let offerings = client
879 .describe_instance_type_offerings()
880 .location_type("availability-zone".into())
881 .filters(
882 Filter::builder()
883 .name("instance-type")
884 .set_values(Some(instance_types.to_vec()))
885 .build(),
886 )
887 .send()
888 .await?
889 .instance_type_offerings
890 .unwrap_or_default();
891
892 let mut az_to_instance_types: HashMap<String, HashSet<String>> = HashMap::new();
894 for offering in offerings {
895 if let (Some(location), Some(instance_type)) = (
896 offering.location,
897 offering.instance_type.map(|it| it.to_string()), ) {
899 az_to_instance_types
900 .entry(location)
901 .or_default()
902 .insert(instance_type);
903 }
904 }
905
906 let required_instance_types: HashSet<String> = instance_types.iter().cloned().collect();
908
909 for (az, supported_types) in az_to_instance_types {
911 if required_instance_types.is_subset(&supported_types) {
912 return Ok(az); }
914 }
915
916 Err(Ec2Error::from(BuildError::other(format!(
918 "no availability zone supports all required instance types: {instance_types:?}"
919 ))))
920}
921
922pub async fn wait_for_enis_deleted(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
924 loop {
925 let resp = ec2_client
926 .describe_network_interfaces()
927 .filters(Filter::builder().name("group-id").values(sg_id).build())
928 .send()
929 .await?;
930 if resp.network_interfaces.unwrap_or_default().is_empty() {
931 return Ok(());
932 }
933 sleep(RETRY_INTERVAL).await;
934 }
935}