1use super::{METRICS_PORT, SYSTEM_PORT};
4use crate::ec2::{
5 utils::{exact_cidr, DEPLOYER_MAX_PORT, DEPLOYER_MIN_PORT, DEPLOYER_PROTOCOL, RETRY_INTERVAL},
6 PortConfig,
7};
8use aws_config::BehaviorVersion;
9pub use aws_config::Region;
10pub use aws_sdk_ec2::types::{InstanceType, IpPermission, IpRange, UserIdGroupPair, VolumeType};
11use aws_sdk_ec2::{
12 error::BuildError,
13 primitives::Blob,
14 types::{
15 BlockDeviceMapping, EbsBlockDevice, Filter, InstanceStateName, ResourceType, SecurityGroup,
16 SummaryStatus, Tag, TagSpecification, VpcPeeringConnectionStateReasonCode,
17 },
18 Client as Ec2Client, Error as Ec2Error,
19};
20use std::{
21 collections::{HashMap, HashSet},
22 time::Duration,
23};
24use tokio::time::sleep;
25
26pub async fn create_ec2_client(region: Region) -> Ec2Client {
28 let retry = aws_config::retry::RetryConfig::adaptive()
29 .with_max_attempts(10)
30 .with_initial_backoff(Duration::from_millis(500))
31 .with_max_backoff(Duration::from_secs(30));
32 let config = aws_config::defaults(BehaviorVersion::v2025_01_17())
33 .region(region)
34 .retry_config(retry)
35 .load()
36 .await;
37 Ec2Client::new(&config)
38}
39
40pub async fn import_key_pair(
42 client: &Ec2Client,
43 key_name: &str,
44 public_key: &str,
45) -> Result<(), Ec2Error> {
46 let blob = Blob::new(public_key.as_bytes());
47 client
48 .import_key_pair()
49 .key_name(key_name)
50 .public_key_material(blob)
51 .send()
52 .await?;
53 Ok(())
54}
55
56pub async fn delete_key_pair(client: &Ec2Client, key_name: &str) -> Result<(), Ec2Error> {
58 client.delete_key_pair().key_name(key_name).send().await?;
59 Ok(())
60}
61
62pub async fn find_latest_ami(client: &Ec2Client) -> Result<String, Ec2Error> {
64 let resp = client
65 .describe_images()
66 .filters(
67 Filter::builder()
68 .name("name")
69 .values("ubuntu/images/hvm-ssd-gp3/ubuntu-noble-24.04-arm64-server-*")
70 .build(),
71 )
72 .filters(
73 Filter::builder()
74 .name("root-device-type")
75 .values("ebs")
76 .build(),
77 )
78 .owners("099720109477") .send()
80 .await?;
81 let mut images = resp.images.unwrap_or_default();
82 if images.is_empty() {
83 return Err(Ec2Error::from(BuildError::other(
84 "No matching AMI found".to_string(),
85 )));
86 }
87 images.sort_by(|a, b| b.creation_date().cmp(&a.creation_date()));
88 let latest_ami = images[0].image_id().unwrap();
89 Ok(latest_ami.to_string())
90}
91
92pub async fn create_vpc(
94 client: &Ec2Client,
95 cidr_block: &str,
96 tag: &str,
97) -> Result<String, Ec2Error> {
98 let resp = client
99 .create_vpc()
100 .cidr_block(cidr_block)
101 .tag_specifications(
102 TagSpecification::builder()
103 .resource_type(ResourceType::Vpc)
104 .tags(Tag::builder().key("deployer").value(tag).build())
105 .build(),
106 )
107 .send()
108 .await?;
109 Ok(resp.vpc.unwrap().vpc_id.unwrap())
110}
111
112pub async fn create_and_attach_igw(
114 client: &Ec2Client,
115 vpc_id: &str,
116 tag: &str,
117) -> Result<String, Ec2Error> {
118 let igw_resp = client
119 .create_internet_gateway()
120 .tag_specifications(
121 TagSpecification::builder()
122 .resource_type(ResourceType::InternetGateway)
123 .tags(Tag::builder().key("deployer").value(tag).build())
124 .build(),
125 )
126 .send()
127 .await?;
128 let igw_id = igw_resp
129 .internet_gateway
130 .unwrap()
131 .internet_gateway_id
132 .unwrap();
133 client
134 .attach_internet_gateway()
135 .internet_gateway_id(&igw_id)
136 .vpc_id(vpc_id)
137 .send()
138 .await?;
139 Ok(igw_id)
140}
141
142pub async fn create_route_table(
144 client: &Ec2Client,
145 vpc_id: &str,
146 igw_id: &str,
147 tag: &str,
148) -> Result<String, Ec2Error> {
149 let rt_resp = client
150 .create_route_table()
151 .vpc_id(vpc_id)
152 .tag_specifications(
153 TagSpecification::builder()
154 .resource_type(ResourceType::RouteTable)
155 .tags(Tag::builder().key("deployer").value(tag).build())
156 .build(),
157 )
158 .send()
159 .await?;
160 let rt_id = rt_resp.route_table.unwrap().route_table_id.unwrap();
161 client
162 .create_route()
163 .route_table_id(&rt_id)
164 .destination_cidr_block("0.0.0.0/0")
165 .gateway_id(igw_id)
166 .send()
167 .await?;
168 Ok(rt_id)
169}
170
171pub async fn create_subnet(
173 client: &Ec2Client,
174 vpc_id: &str,
175 route_table_id: &str,
176 subnet_cidr: &str,
177 availability_zone: &str,
178 tag: &str,
179) -> Result<String, Ec2Error> {
180 let subnet_resp = client
181 .create_subnet()
182 .vpc_id(vpc_id)
183 .cidr_block(subnet_cidr)
184 .availability_zone(availability_zone)
185 .tag_specifications(
186 TagSpecification::builder()
187 .resource_type(ResourceType::Subnet)
188 .tags(Tag::builder().key("deployer").value(tag).build())
189 .build(),
190 )
191 .send()
192 .await?;
193 let subnet_id = subnet_resp.subnet.unwrap().subnet_id.unwrap();
194 client
195 .associate_route_table()
196 .route_table_id(route_table_id)
197 .subnet_id(&subnet_id)
198 .send()
199 .await?;
200 Ok(subnet_id)
201}
202
203pub async fn create_security_group_monitoring(
205 client: &Ec2Client,
206 vpc_id: &str,
207 deployer_ip: &str,
208 tag: &str,
209) -> Result<String, Ec2Error> {
210 let sg_resp = client
211 .create_security_group()
212 .group_name(tag)
213 .description("Security group for monitoring instance")
214 .vpc_id(vpc_id)
215 .tag_specifications(
216 TagSpecification::builder()
217 .resource_type(ResourceType::SecurityGroup)
218 .tags(Tag::builder().key("deployer").value(tag).build())
219 .build(),
220 )
221 .send()
222 .await?;
223 let sg_id = sg_resp.group_id.unwrap();
224 client
225 .authorize_security_group_ingress()
226 .group_id(&sg_id)
227 .ip_permissions(
228 IpPermission::builder()
229 .ip_protocol(DEPLOYER_PROTOCOL)
230 .from_port(DEPLOYER_MIN_PORT)
231 .to_port(DEPLOYER_MAX_PORT)
232 .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
233 .build(),
234 )
235 .send()
236 .await?;
237 Ok(sg_id)
238}
239
240pub async fn create_security_group_binary(
242 client: &Ec2Client,
243 vpc_id: &str,
244 deployer_ip: &str,
245 monitoring_ip: &str,
246 tag: &str,
247 ports: &[PortConfig],
248) -> Result<String, Ec2Error> {
249 let sg_resp = client
250 .create_security_group()
251 .group_name(format!("{tag}-binary"))
252 .description("Security group for binary instances")
253 .vpc_id(vpc_id)
254 .tag_specifications(
255 TagSpecification::builder()
256 .resource_type(ResourceType::SecurityGroup)
257 .tags(Tag::builder().key("deployer").value(tag).build())
258 .build(),
259 )
260 .send()
261 .await?;
262 let sg_id = sg_resp.group_id.unwrap();
263 let mut builder = client
264 .authorize_security_group_ingress()
265 .group_id(&sg_id)
266 .ip_permissions(
267 IpPermission::builder()
268 .ip_protocol(DEPLOYER_PROTOCOL)
269 .from_port(DEPLOYER_MIN_PORT)
270 .to_port(DEPLOYER_MAX_PORT)
271 .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
272 .build(),
273 )
274 .ip_permissions(
275 IpPermission::builder()
276 .ip_protocol("tcp")
277 .from_port(METRICS_PORT as i32)
278 .to_port(METRICS_PORT as i32)
279 .ip_ranges(
280 IpRange::builder()
281 .cidr_ip(exact_cidr(monitoring_ip))
282 .build(),
283 )
284 .build(),
285 )
286 .ip_permissions(
287 IpPermission::builder()
288 .ip_protocol("tcp")
289 .from_port(SYSTEM_PORT as i32)
290 .to_port(SYSTEM_PORT as i32)
291 .ip_ranges(
292 IpRange::builder()
293 .cidr_ip(exact_cidr(monitoring_ip))
294 .build(),
295 )
296 .build(),
297 );
298 for port in ports {
299 builder = builder.ip_permissions(
300 IpPermission::builder()
301 .ip_protocol(&port.protocol)
302 .from_port(port.port as i32)
303 .to_port(port.port as i32)
304 .ip_ranges(IpRange::builder().cidr_ip(&port.cidr).build())
305 .build(),
306 );
307 }
308 builder.send().await?;
309 Ok(sg_id)
310}
311
312#[allow(clippy::too_many_arguments)]
314pub async fn launch_instances(
315 client: &Ec2Client,
316 ami_id: &str,
317 instance_type: InstanceType,
318 storage_size: i32,
319 storage_class: VolumeType,
320 key_name: &str,
321 subnet_id: &str,
322 sg_id: &str,
323 count: i32,
324 name: &str,
325 tag: &str,
326) -> Result<Vec<String>, Ec2Error> {
327 let resp = client
328 .run_instances()
329 .image_id(ami_id)
330 .instance_type(instance_type)
331 .key_name(key_name)
332 .min_count(count)
333 .max_count(count)
334 .network_interfaces(
335 aws_sdk_ec2::types::InstanceNetworkInterfaceSpecification::builder()
336 .associate_public_ip_address(true)
337 .device_index(0)
338 .subnet_id(subnet_id)
339 .groups(sg_id)
340 .build(),
341 )
342 .tag_specifications(
343 TagSpecification::builder()
344 .resource_type(ResourceType::Instance)
345 .set_tags(Some(vec![
346 Tag::builder().key("deployer").value(tag).build(),
347 Tag::builder().key("name").value(name).build(),
348 ]))
349 .build(),
350 )
351 .block_device_mappings(
352 BlockDeviceMapping::builder()
353 .device_name("/dev/sda1")
354 .ebs(
355 EbsBlockDevice::builder()
356 .volume_size(storage_size)
357 .volume_type(storage_class)
358 .delete_on_termination(true)
359 .build(),
360 )
361 .build(),
362 )
363 .send()
364 .await?;
365 Ok(resp
366 .instances
367 .unwrap()
368 .into_iter()
369 .map(|i| i.instance_id.unwrap())
370 .collect())
371}
372
373pub async fn wait_for_instances_running(
375 client: &Ec2Client,
376 instance_ids: &[String],
377) -> Result<Vec<String>, Ec2Error> {
378 loop {
379 let Ok(resp) = client
381 .describe_instances()
382 .set_instance_ids(Some(instance_ids.to_vec()))
383 .send()
384 .await
385 else {
386 sleep(RETRY_INTERVAL).await;
387 continue;
388 };
389
390 let reservations = resp.reservations.unwrap();
392 let instances = reservations[0].instances.as_ref().unwrap();
393 if !instances.iter().all(|i| {
394 i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
395 }) {
396 sleep(RETRY_INTERVAL).await;
397 continue;
398 }
399 return Ok(instances
400 .iter()
401 .map(|i| i.public_ip_address.as_ref().unwrap().clone())
402 .collect());
403 }
404}
405
406pub async fn wait_for_instances_ready(
407 client: &Ec2Client,
408 instance_ids: &[String],
409) -> Result<(), Ec2Error> {
410 loop {
411 let Ok(resp) = client
413 .describe_instance_status()
414 .set_instance_ids(Some(instance_ids.to_vec()))
415 .include_all_instances(true) .send()
417 .await
418 else {
419 sleep(RETRY_INTERVAL).await;
420 continue;
421 };
422
423 let statuses = resp.instance_statuses.unwrap_or_default();
425 let all_ready = statuses.iter().all(|s| {
426 s.instance_state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
427 && s.system_status.as_ref().unwrap().status.as_ref().unwrap() == &SummaryStatus::Ok
428 && s.instance_status.as_ref().unwrap().status.as_ref().unwrap()
429 == &SummaryStatus::Ok
430 });
431 if !all_ready {
432 sleep(RETRY_INTERVAL).await;
433 continue;
434 }
435 return Ok(());
436 }
437}
438
439pub async fn get_private_ip(client: &Ec2Client, instance_id: &str) -> Result<String, Ec2Error> {
441 let resp = client
442 .describe_instances()
443 .instance_ids(instance_id)
444 .send()
445 .await?;
446 let reservations = resp.reservations.unwrap();
447 let instance = &reservations[0].instances.as_ref().unwrap()[0];
448 Ok(instance.private_ip_address.as_ref().unwrap().clone())
449}
450
451pub async fn create_vpc_peering_connection(
453 client: &Ec2Client,
454 requester_vpc_id: &str,
455 peer_vpc_id: &str,
456 peer_region: &str,
457 tag: &str,
458) -> Result<String, Ec2Error> {
459 let resp = client
460 .create_vpc_peering_connection()
461 .vpc_id(requester_vpc_id)
462 .peer_vpc_id(peer_vpc_id)
463 .peer_region(peer_region)
464 .tag_specifications(
465 TagSpecification::builder()
466 .resource_type(ResourceType::VpcPeeringConnection)
467 .tags(Tag::builder().key("deployer").value(tag).build())
468 .build(),
469 )
470 .send()
471 .await?;
472 Ok(resp
473 .vpc_peering_connection
474 .unwrap()
475 .vpc_peering_connection_id
476 .unwrap())
477}
478
479pub async fn wait_for_vpc_peering_connection(
481 client: &Ec2Client,
482 peer_id: &str,
483) -> Result<(), Ec2Error> {
484 loop {
485 if let Ok(resp) = client
486 .describe_vpc_peering_connections()
487 .vpc_peering_connection_ids(peer_id)
488 .send()
489 .await
490 {
491 if let Some(connections) = resp.vpc_peering_connections {
492 if let Some(connection) = connections.first() {
493 if connection.status.as_ref().unwrap().code
494 == Some(VpcPeeringConnectionStateReasonCode::PendingAcceptance)
495 {
496 return Ok(());
497 }
498 }
499 }
500 }
501 sleep(Duration::from_secs(2)).await;
502 }
503}
504
505pub async fn accept_vpc_peering_connection(
507 client: &Ec2Client,
508 peer_id: &str,
509) -> Result<(), Ec2Error> {
510 client
511 .accept_vpc_peering_connection()
512 .vpc_peering_connection_id(peer_id)
513 .send()
514 .await?;
515 Ok(())
516}
517
518pub async fn add_route(
520 client: &Ec2Client,
521 route_table_id: &str,
522 destination_cidr: &str,
523 peer_id: &str,
524) -> Result<(), Ec2Error> {
525 client
526 .create_route()
527 .route_table_id(route_table_id)
528 .destination_cidr_block(destination_cidr)
529 .vpc_peering_connection_id(peer_id)
530 .send()
531 .await?;
532 Ok(())
533}
534
535pub async fn find_vpc_peering_by_tag(
537 client: &Ec2Client,
538 tag: &str,
539) -> Result<Vec<String>, Ec2Error> {
540 let resp = client
541 .describe_vpc_peering_connections()
542 .filters(Filter::builder().name("tag:deployer").values(tag).build())
543 .send()
544 .await?;
545 Ok(resp
546 .vpc_peering_connections
547 .unwrap_or_default()
548 .into_iter()
549 .map(|p| p.vpc_peering_connection_id.unwrap())
550 .collect())
551}
552
553pub async fn delete_vpc_peering(client: &Ec2Client, peering_id: &str) -> Result<(), Ec2Error> {
555 client
556 .delete_vpc_peering_connection()
557 .vpc_peering_connection_id(peering_id)
558 .send()
559 .await?;
560 Ok(())
561}
562
563pub async fn wait_for_vpc_peering_deletion(
565 ec2_client: &Ec2Client,
566 peer_id: &str,
567) -> Result<(), Ec2Error> {
568 loop {
569 let resp = ec2_client
570 .describe_vpc_peering_connections()
571 .vpc_peering_connection_ids(peer_id)
572 .send()
573 .await?;
574 if let Some(connections) = resp.vpc_peering_connections {
575 if let Some(connection) = connections.first() {
576 if connection.status.as_ref().unwrap().code
577 == Some(VpcPeeringConnectionStateReasonCode::Deleted)
578 {
579 return Ok(());
580 }
581 } else {
582 return Ok(());
583 }
584 } else {
585 return Ok(());
586 }
587 sleep(RETRY_INTERVAL).await;
588 }
589}
590
591pub async fn find_instances_by_tag(
593 ec2_client: &Ec2Client,
594 tag: &str,
595) -> Result<Vec<String>, Ec2Error> {
596 let resp = ec2_client
597 .describe_instances()
598 .filters(Filter::builder().name("tag:deployer").values(tag).build())
599 .send()
600 .await?;
601 Ok(resp
602 .reservations
603 .unwrap_or_default()
604 .into_iter()
605 .flat_map(|r| r.instances.unwrap_or_default())
606 .map(|i| i.instance_id.unwrap())
607 .collect())
608}
609
610pub async fn terminate_instances(
612 ec2_client: &Ec2Client,
613 instance_ids: &[String],
614) -> Result<(), Ec2Error> {
615 if instance_ids.is_empty() {
616 return Ok(());
617 }
618 ec2_client
619 .terminate_instances()
620 .set_instance_ids(Some(instance_ids.to_vec()))
621 .send()
622 .await?;
623 Ok(())
624}
625
626pub async fn wait_for_instances_terminated(
628 ec2_client: &Ec2Client,
629 instance_ids: &[String],
630) -> Result<(), Ec2Error> {
631 loop {
632 let resp = ec2_client
633 .describe_instances()
634 .set_instance_ids(Some(instance_ids.to_vec()))
635 .send()
636 .await?;
637 let instances = resp
638 .reservations
639 .unwrap_or_default()
640 .into_iter()
641 .flat_map(|r| r.instances.unwrap_or_default())
642 .collect::<Vec<_>>();
643 if instances.iter().all(|i| {
644 i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Terminated
645 }) {
646 return Ok(());
647 }
648 sleep(RETRY_INTERVAL).await;
649 }
650}
651
652pub async fn find_security_groups_by_tag(
654 ec2_client: &Ec2Client,
655 tag: &str,
656) -> Result<Vec<SecurityGroup>, Ec2Error> {
657 let resp = ec2_client
658 .describe_security_groups()
659 .filters(Filter::builder().name("tag:deployer").values(tag).build())
660 .send()
661 .await?;
662 Ok(resp
663 .security_groups
664 .unwrap_or_default()
665 .into_iter()
666 .collect())
667}
668
669pub async fn delete_security_group(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
671 ec2_client
672 .delete_security_group()
673 .group_id(sg_id)
674 .send()
675 .await?;
676 Ok(())
677}
678
679pub async fn find_route_tables_by_tag(
681 ec2_client: &Ec2Client,
682 tag: &str,
683) -> Result<Vec<String>, Ec2Error> {
684 let resp = ec2_client
685 .describe_route_tables()
686 .filters(Filter::builder().name("tag:deployer").values(tag).build())
687 .send()
688 .await?;
689 Ok(resp
690 .route_tables
691 .unwrap_or_default()
692 .into_iter()
693 .map(|rt| rt.route_table_id.unwrap())
694 .collect())
695}
696
697pub async fn delete_route_table(ec2_client: &Ec2Client, rt_id: &str) -> Result<(), Ec2Error> {
699 ec2_client
700 .delete_route_table()
701 .route_table_id(rt_id)
702 .send()
703 .await?;
704 Ok(())
705}
706
707pub async fn find_igws_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
709 let resp = ec2_client
710 .describe_internet_gateways()
711 .filters(Filter::builder().name("tag:deployer").values(tag).build())
712 .send()
713 .await?;
714 Ok(resp
715 .internet_gateways
716 .unwrap_or_default()
717 .into_iter()
718 .map(|igw| igw.internet_gateway_id.unwrap())
719 .collect())
720}
721
722pub async fn find_vpc_by_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<String, Ec2Error> {
724 let resp = ec2_client
725 .describe_internet_gateways()
726 .internet_gateway_ids(igw_id)
727 .send()
728 .await?;
729 Ok(resp.internet_gateways.unwrap()[0]
730 .attachments
731 .as_ref()
732 .unwrap()[0]
733 .vpc_id
734 .as_ref()
735 .unwrap()
736 .clone())
737}
738
739pub async fn detach_igw(
741 ec2_client: &Ec2Client,
742 igw_id: &str,
743 vpc_id: &str,
744) -> Result<(), Ec2Error> {
745 ec2_client
746 .detach_internet_gateway()
747 .internet_gateway_id(igw_id)
748 .vpc_id(vpc_id)
749 .send()
750 .await?;
751 Ok(())
752}
753
754pub async fn delete_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<(), Ec2Error> {
756 ec2_client
757 .delete_internet_gateway()
758 .internet_gateway_id(igw_id)
759 .send()
760 .await?;
761 Ok(())
762}
763
764pub async fn find_subnets_by_tag(
766 ec2_client: &Ec2Client,
767 tag: &str,
768) -> Result<Vec<String>, Ec2Error> {
769 let resp = ec2_client
770 .describe_subnets()
771 .filters(Filter::builder().name("tag:deployer").values(tag).build())
772 .send()
773 .await?;
774 Ok(resp
775 .subnets
776 .unwrap_or_default()
777 .into_iter()
778 .map(|subnet| subnet.subnet_id.unwrap())
779 .collect())
780}
781
782pub async fn delete_subnet(ec2_client: &Ec2Client, subnet_id: &str) -> Result<(), Ec2Error> {
784 ec2_client
785 .delete_subnet()
786 .subnet_id(subnet_id)
787 .send()
788 .await?;
789 Ok(())
790}
791
792pub async fn find_vpcs_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
794 let resp = ec2_client
795 .describe_vpcs()
796 .filters(Filter::builder().name("tag:deployer").values(tag).build())
797 .send()
798 .await?;
799 Ok(resp
800 .vpcs
801 .unwrap_or_default()
802 .into_iter()
803 .map(|vpc| vpc.vpc_id.unwrap())
804 .collect())
805}
806
807pub async fn delete_vpc(ec2_client: &Ec2Client, vpc_id: &str) -> Result<(), Ec2Error> {
809 ec2_client.delete_vpc().vpc_id(vpc_id).send().await?;
810 Ok(())
811}
812
813pub async fn assert_arm64_support(
815 client: &Ec2Client,
816 instance_types: &[String],
817) -> Result<(), Ec2Error> {
818 let mut next_token: Option<String> = None;
819 let mut supported_instance_types = HashSet::new();
820
821 loop {
823 let mut request = client.describe_instance_types().filters(
825 Filter::builder()
826 .name("processor-info.supported-architecture")
827 .values("arm64")
828 .build(),
829 );
830 if let Some(token) = next_token {
831 request = request.next_token(token);
832 }
833 let response = request.send().await?;
834
835 for instance_type in response.instance_types.unwrap_or_default() {
837 if let Some(it) = instance_type.instance_type {
838 supported_instance_types.insert(it.to_string());
839 }
840 }
841
842 next_token = response.next_token;
844 if next_token.is_none() {
845 break;
846 }
847 }
848
849 for instance_type in instance_types {
851 if !supported_instance_types.contains(instance_type) {
852 return Err(Ec2Error::from(BuildError::other(format!(
853 "instance type {instance_type} not ARM64-based"
854 ))));
855 }
856 }
857 Ok(())
858}
859
860pub async fn find_availability_zone(
862 client: &Ec2Client,
863 instance_types: &[String],
864) -> Result<String, Ec2Error> {
865 let offerings = client
867 .describe_instance_type_offerings()
868 .location_type("availability-zone".into())
869 .filters(
870 Filter::builder()
871 .name("instance-type")
872 .set_values(Some(instance_types.to_vec()))
873 .build(),
874 )
875 .send()
876 .await?
877 .instance_type_offerings
878 .unwrap_or_default();
879
880 let mut az_to_instance_types: HashMap<String, HashSet<String>> = HashMap::new();
882 for offering in offerings {
883 if let (Some(location), Some(instance_type)) = (
884 offering.location,
885 offering.instance_type.map(|it| it.to_string()), ) {
887 az_to_instance_types
888 .entry(location)
889 .or_default()
890 .insert(instance_type);
891 }
892 }
893
894 let required_instance_types: HashSet<String> = instance_types.iter().cloned().collect();
896
897 for (az, supported_types) in az_to_instance_types {
899 if required_instance_types.is_subset(&supported_types) {
900 return Ok(az); }
902 }
903
904 Err(Ec2Error::from(BuildError::other(format!(
906 "no availability zone supports all required instance types: {instance_types:?}"
907 ))))
908}
909
910pub async fn wait_for_enis_deleted(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
912 loop {
913 let resp = ec2_client
914 .describe_network_interfaces()
915 .filters(Filter::builder().name("group-id").values(sg_id).build())
916 .send()
917 .await?;
918 if resp.network_interfaces.unwrap_or_default().is_empty() {
919 return Ok(());
920 }
921 sleep(RETRY_INTERVAL).await;
922 }
923}