1use super::{METRICS_PORT, SYSTEM_PORT};
4use crate::ec2::{
5 utils::{exact_cidr, DEPLOYER_MAX_PORT, DEPLOYER_MIN_PORT, DEPLOYER_PROTOCOL, RETRY_INTERVAL},
6 PortConfig,
7};
8use aws_config::BehaviorVersion;
9pub use aws_config::Region;
10pub use aws_sdk_ec2::types::{InstanceType, IpPermission, IpRange, UserIdGroupPair, VolumeType};
11use aws_sdk_ec2::{
12 error::BuildError,
13 primitives::Blob,
14 types::{
15 BlockDeviceMapping, EbsBlockDevice, Filter, InstanceStateName, ResourceType, SecurityGroup,
16 SummaryStatus, Tag, TagSpecification, VpcPeeringConnectionStateReasonCode,
17 },
18 Client as Ec2Client, Error as Ec2Error,
19};
20use std::{
21 collections::{HashMap, HashSet},
22 time::Duration,
23};
24use tokio::time::sleep;
25
26pub async fn create_ec2_client(region: Region) -> Ec2Client {
28 let retry = aws_config::retry::RetryConfig::adaptive()
29 .with_max_attempts(10)
30 .with_initial_backoff(Duration::from_millis(500))
31 .with_max_backoff(Duration::from_secs(30));
32 let config = aws_config::defaults(BehaviorVersion::v2025_08_07())
33 .region(region)
34 .retry_config(retry)
35 .load()
36 .await;
37 Ec2Client::new(&config)
38}
39
40pub async fn import_key_pair(
42 client: &Ec2Client,
43 key_name: &str,
44 public_key: &str,
45) -> Result<(), Ec2Error> {
46 let blob = Blob::new(public_key.as_bytes());
47 client
48 .import_key_pair()
49 .key_name(key_name)
50 .public_key_material(blob)
51 .send()
52 .await?;
53 Ok(())
54}
55
56pub async fn delete_key_pair(client: &Ec2Client, key_name: &str) -> Result<(), Ec2Error> {
58 client.delete_key_pair().key_name(key_name).send().await?;
59 Ok(())
60}
61
62pub(crate) async fn detect_architecture(
64 client: &Ec2Client,
65 instance_type: &str,
66) -> Result<super::Architecture, Ec2Error> {
67 let response = client
68 .describe_instance_types()
69 .instance_types(InstanceType::try_parse(instance_type).expect("invalid instance type"))
70 .send()
71 .await?;
72
73 let instance_info = response
74 .instance_types
75 .and_then(|types| types.into_iter().next())
76 .ok_or_else(|| {
77 Ec2Error::from(BuildError::other(format!(
78 "instance type {instance_type} not found"
79 )))
80 })?;
81
82 let architectures = instance_info
83 .processor_info
84 .and_then(|p| p.supported_architectures)
85 .unwrap_or_default();
86
87 if architectures.iter().any(|a| a.as_ref() == "arm64") {
90 Ok(super::Architecture::Arm64)
91 } else if architectures.iter().any(|a| a.as_ref() == "x86_64") {
92 Ok(super::Architecture::X86_64)
93 } else {
94 Err(Ec2Error::from(BuildError::other(format!(
95 "instance type {instance_type} has no supported architecture"
96 ))))
97 }
98}
99
100pub(crate) async fn find_latest_ami(
102 client: &Ec2Client,
103 architecture: super::Architecture,
104) -> Result<String, Ec2Error> {
105 let arch = architecture.as_str();
106 let resp = client
107 .describe_images()
108 .filters(
109 Filter::builder()
110 .name("name")
111 .values(format!(
112 "ubuntu/images/hvm-ssd-gp3/ubuntu-noble-24.04-{arch}-server-*"
113 ))
114 .build(),
115 )
116 .filters(
117 Filter::builder()
118 .name("root-device-type")
119 .values("ebs")
120 .build(),
121 )
122 .owners("099720109477") .send()
124 .await?;
125 let mut images = resp.images.unwrap_or_default();
126 if images.is_empty() {
127 return Err(Ec2Error::from(BuildError::other(
128 "No matching AMI found".to_string(),
129 )));
130 }
131 images.sort_by(|a, b| b.creation_date().cmp(&a.creation_date()));
132 let latest_ami = images[0].image_id().unwrap();
133 Ok(latest_ami.to_string())
134}
135
136pub async fn create_vpc(
138 client: &Ec2Client,
139 cidr_block: &str,
140 tag: &str,
141) -> Result<String, Ec2Error> {
142 let resp = client
143 .create_vpc()
144 .cidr_block(cidr_block)
145 .tag_specifications(
146 TagSpecification::builder()
147 .resource_type(ResourceType::Vpc)
148 .tags(Tag::builder().key("deployer").value(tag).build())
149 .build(),
150 )
151 .send()
152 .await?;
153 Ok(resp.vpc.unwrap().vpc_id.unwrap())
154}
155
156pub async fn create_and_attach_igw(
158 client: &Ec2Client,
159 vpc_id: &str,
160 tag: &str,
161) -> Result<String, Ec2Error> {
162 let igw_resp = client
163 .create_internet_gateway()
164 .tag_specifications(
165 TagSpecification::builder()
166 .resource_type(ResourceType::InternetGateway)
167 .tags(Tag::builder().key("deployer").value(tag).build())
168 .build(),
169 )
170 .send()
171 .await?;
172 let igw_id = igw_resp
173 .internet_gateway
174 .unwrap()
175 .internet_gateway_id
176 .unwrap();
177 client
178 .attach_internet_gateway()
179 .internet_gateway_id(&igw_id)
180 .vpc_id(vpc_id)
181 .send()
182 .await?;
183 Ok(igw_id)
184}
185
186pub async fn create_route_table(
188 client: &Ec2Client,
189 vpc_id: &str,
190 igw_id: &str,
191 tag: &str,
192) -> Result<String, Ec2Error> {
193 let rt_resp = client
194 .create_route_table()
195 .vpc_id(vpc_id)
196 .tag_specifications(
197 TagSpecification::builder()
198 .resource_type(ResourceType::RouteTable)
199 .tags(Tag::builder().key("deployer").value(tag).build())
200 .build(),
201 )
202 .send()
203 .await?;
204 let rt_id = rt_resp.route_table.unwrap().route_table_id.unwrap();
205 client
206 .create_route()
207 .route_table_id(&rt_id)
208 .destination_cidr_block("0.0.0.0/0")
209 .gateway_id(igw_id)
210 .send()
211 .await?;
212 Ok(rt_id)
213}
214
215pub async fn create_subnet(
217 client: &Ec2Client,
218 vpc_id: &str,
219 route_table_id: &str,
220 subnet_cidr: &str,
221 availability_zone: &str,
222 tag: &str,
223) -> Result<String, Ec2Error> {
224 let subnet_resp = client
225 .create_subnet()
226 .vpc_id(vpc_id)
227 .cidr_block(subnet_cidr)
228 .availability_zone(availability_zone)
229 .tag_specifications(
230 TagSpecification::builder()
231 .resource_type(ResourceType::Subnet)
232 .tags(Tag::builder().key("deployer").value(tag).build())
233 .build(),
234 )
235 .send()
236 .await?;
237 let subnet_id = subnet_resp.subnet.unwrap().subnet_id.unwrap();
238 client
239 .associate_route_table()
240 .route_table_id(route_table_id)
241 .subnet_id(&subnet_id)
242 .send()
243 .await?;
244 Ok(subnet_id)
245}
246
247pub async fn create_security_group_monitoring(
249 client: &Ec2Client,
250 vpc_id: &str,
251 deployer_ip: &str,
252 tag: &str,
253) -> Result<String, Ec2Error> {
254 let sg_resp = client
255 .create_security_group()
256 .group_name(tag)
257 .description("Security group for monitoring instance")
258 .vpc_id(vpc_id)
259 .tag_specifications(
260 TagSpecification::builder()
261 .resource_type(ResourceType::SecurityGroup)
262 .tags(Tag::builder().key("deployer").value(tag).build())
263 .build(),
264 )
265 .send()
266 .await?;
267 let sg_id = sg_resp.group_id.unwrap();
268 client
269 .authorize_security_group_ingress()
270 .group_id(&sg_id)
271 .ip_permissions(
272 IpPermission::builder()
273 .ip_protocol(DEPLOYER_PROTOCOL)
274 .from_port(DEPLOYER_MIN_PORT)
275 .to_port(DEPLOYER_MAX_PORT)
276 .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
277 .build(),
278 )
279 .send()
280 .await?;
281 Ok(sg_id)
282}
283
284pub async fn create_security_group_binary(
287 client: &Ec2Client,
288 vpc_id: &str,
289 deployer_ip: &str,
290 tag: &str,
291 ports: &[PortConfig],
292) -> Result<String, Ec2Error> {
293 let sg_resp = client
294 .create_security_group()
295 .group_name(format!("{tag}-binary"))
296 .description("Security group for binary instances")
297 .vpc_id(vpc_id)
298 .tag_specifications(
299 TagSpecification::builder()
300 .resource_type(ResourceType::SecurityGroup)
301 .tags(Tag::builder().key("deployer").value(tag).build())
302 .build(),
303 )
304 .send()
305 .await?;
306 let sg_id = sg_resp.group_id.unwrap();
307 let mut builder = client
308 .authorize_security_group_ingress()
309 .group_id(&sg_id)
310 .ip_permissions(
311 IpPermission::builder()
312 .ip_protocol(DEPLOYER_PROTOCOL)
313 .from_port(DEPLOYER_MIN_PORT)
314 .to_port(DEPLOYER_MAX_PORT)
315 .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
316 .build(),
317 );
318 for port in ports {
319 builder = builder.ip_permissions(
320 IpPermission::builder()
321 .ip_protocol(&port.protocol)
322 .from_port(port.port as i32)
323 .to_port(port.port as i32)
324 .ip_ranges(IpRange::builder().cidr_ip(&port.cidr).build())
325 .build(),
326 );
327 }
328 builder.send().await?;
329 Ok(sg_id)
330}
331
332pub async fn add_monitoring_ingress(
334 client: &Ec2Client,
335 sg_id: &str,
336 monitoring_ip: &str,
337) -> Result<(), Ec2Error> {
338 client
339 .authorize_security_group_ingress()
340 .group_id(sg_id)
341 .ip_permissions(
342 IpPermission::builder()
343 .ip_protocol("tcp")
344 .from_port(METRICS_PORT as i32)
345 .to_port(METRICS_PORT as i32)
346 .ip_ranges(
347 IpRange::builder()
348 .cidr_ip(exact_cidr(monitoring_ip))
349 .build(),
350 )
351 .build(),
352 )
353 .ip_permissions(
354 IpPermission::builder()
355 .ip_protocol("tcp")
356 .from_port(SYSTEM_PORT as i32)
357 .to_port(SYSTEM_PORT as i32)
358 .ip_ranges(
359 IpRange::builder()
360 .cidr_ip(exact_cidr(monitoring_ip))
361 .build(),
362 )
363 .build(),
364 )
365 .send()
366 .await?;
367 Ok(())
368}
369
370#[allow(clippy::too_many_arguments)]
372pub async fn launch_instances(
373 client: &Ec2Client,
374 ami_id: &str,
375 instance_type: InstanceType,
376 storage_size: i32,
377 storage_class: VolumeType,
378 key_name: &str,
379 subnet_id: &str,
380 sg_id: &str,
381 count: i32,
382 name: &str,
383 tag: &str,
384) -> Result<Vec<String>, Ec2Error> {
385 let resp = client
386 .run_instances()
387 .image_id(ami_id)
388 .instance_type(instance_type)
389 .key_name(key_name)
390 .min_count(count)
391 .max_count(count)
392 .network_interfaces(
393 aws_sdk_ec2::types::InstanceNetworkInterfaceSpecification::builder()
394 .associate_public_ip_address(true)
395 .device_index(0)
396 .subnet_id(subnet_id)
397 .groups(sg_id)
398 .build(),
399 )
400 .tag_specifications(
401 TagSpecification::builder()
402 .resource_type(ResourceType::Instance)
403 .set_tags(Some(vec![
404 Tag::builder().key("deployer").value(tag).build(),
405 Tag::builder().key("name").value(name).build(),
406 ]))
407 .build(),
408 )
409 .block_device_mappings(
410 BlockDeviceMapping::builder()
411 .device_name("/dev/sda1")
412 .ebs(
413 EbsBlockDevice::builder()
414 .volume_size(storage_size)
415 .volume_type(storage_class)
416 .delete_on_termination(true)
417 .build(),
418 )
419 .build(),
420 )
421 .send()
422 .await?;
423 Ok(resp
424 .instances
425 .unwrap()
426 .into_iter()
427 .map(|i| i.instance_id.unwrap())
428 .collect())
429}
430
431pub async fn wait_for_instances_running(
433 client: &Ec2Client,
434 instance_ids: &[String],
435) -> Result<Vec<String>, Ec2Error> {
436 loop {
437 let Ok(resp) = client
439 .describe_instances()
440 .set_instance_ids(Some(instance_ids.to_vec()))
441 .send()
442 .await
443 else {
444 sleep(RETRY_INTERVAL).await;
445 continue;
446 };
447
448 let reservations = resp.reservations.unwrap();
450 let instances = reservations[0].instances.as_ref().unwrap();
451 if !instances.iter().all(|i| {
452 i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
453 }) {
454 sleep(RETRY_INTERVAL).await;
455 continue;
456 }
457 return Ok(instances
458 .iter()
459 .map(|i| i.public_ip_address.as_ref().unwrap().clone())
460 .collect());
461 }
462}
463
464pub async fn wait_for_instances_ready(
465 client: &Ec2Client,
466 instance_ids: &[String],
467) -> Result<(), Ec2Error> {
468 loop {
469 let Ok(resp) = client
471 .describe_instance_status()
472 .set_instance_ids(Some(instance_ids.to_vec()))
473 .include_all_instances(true) .send()
475 .await
476 else {
477 sleep(RETRY_INTERVAL).await;
478 continue;
479 };
480
481 let statuses = resp.instance_statuses.unwrap_or_default();
483 let all_ready = statuses.iter().all(|s| {
484 s.instance_state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
485 && s.system_status.as_ref().unwrap().status.as_ref().unwrap() == &SummaryStatus::Ok
486 && s.instance_status.as_ref().unwrap().status.as_ref().unwrap()
487 == &SummaryStatus::Ok
488 });
489 if !all_ready {
490 sleep(RETRY_INTERVAL).await;
491 continue;
492 }
493 return Ok(());
494 }
495}
496
497pub async fn get_private_ip(client: &Ec2Client, instance_id: &str) -> Result<String, Ec2Error> {
499 let resp = client
500 .describe_instances()
501 .instance_ids(instance_id)
502 .send()
503 .await?;
504 let reservations = resp.reservations.unwrap();
505 let instance = &reservations[0].instances.as_ref().unwrap()[0];
506 Ok(instance.private_ip_address.as_ref().unwrap().clone())
507}
508
509pub async fn create_vpc_peering_connection(
511 client: &Ec2Client,
512 requester_vpc_id: &str,
513 peer_vpc_id: &str,
514 peer_region: &str,
515 tag: &str,
516) -> Result<String, Ec2Error> {
517 let resp = client
518 .create_vpc_peering_connection()
519 .vpc_id(requester_vpc_id)
520 .peer_vpc_id(peer_vpc_id)
521 .peer_region(peer_region)
522 .tag_specifications(
523 TagSpecification::builder()
524 .resource_type(ResourceType::VpcPeeringConnection)
525 .tags(Tag::builder().key("deployer").value(tag).build())
526 .build(),
527 )
528 .send()
529 .await?;
530 Ok(resp
531 .vpc_peering_connection
532 .unwrap()
533 .vpc_peering_connection_id
534 .unwrap())
535}
536
537pub async fn wait_for_vpc_peering_connection(
539 client: &Ec2Client,
540 peer_id: &str,
541) -> Result<(), Ec2Error> {
542 loop {
543 if let Ok(resp) = client
544 .describe_vpc_peering_connections()
545 .vpc_peering_connection_ids(peer_id)
546 .send()
547 .await
548 {
549 if let Some(connections) = resp.vpc_peering_connections {
550 if let Some(connection) = connections.first() {
551 if connection.status.as_ref().unwrap().code
552 == Some(VpcPeeringConnectionStateReasonCode::PendingAcceptance)
553 {
554 return Ok(());
555 }
556 }
557 }
558 }
559 sleep(Duration::from_secs(2)).await;
560 }
561}
562
563pub async fn accept_vpc_peering_connection(
565 client: &Ec2Client,
566 peer_id: &str,
567) -> Result<(), Ec2Error> {
568 client
569 .accept_vpc_peering_connection()
570 .vpc_peering_connection_id(peer_id)
571 .send()
572 .await?;
573 Ok(())
574}
575
576pub async fn add_route(
578 client: &Ec2Client,
579 route_table_id: &str,
580 destination_cidr: &str,
581 peer_id: &str,
582) -> Result<(), Ec2Error> {
583 client
584 .create_route()
585 .route_table_id(route_table_id)
586 .destination_cidr_block(destination_cidr)
587 .vpc_peering_connection_id(peer_id)
588 .send()
589 .await?;
590 Ok(())
591}
592
593pub async fn find_vpc_peering_by_tag(
595 client: &Ec2Client,
596 tag: &str,
597) -> Result<Vec<String>, Ec2Error> {
598 let resp = client
599 .describe_vpc_peering_connections()
600 .filters(Filter::builder().name("tag:deployer").values(tag).build())
601 .send()
602 .await?;
603 Ok(resp
604 .vpc_peering_connections
605 .unwrap_or_default()
606 .into_iter()
607 .map(|p| p.vpc_peering_connection_id.unwrap())
608 .collect())
609}
610
611pub async fn delete_vpc_peering(client: &Ec2Client, peering_id: &str) -> Result<(), Ec2Error> {
613 client
614 .delete_vpc_peering_connection()
615 .vpc_peering_connection_id(peering_id)
616 .send()
617 .await?;
618 Ok(())
619}
620
621pub async fn wait_for_vpc_peering_deletion(
623 ec2_client: &Ec2Client,
624 peer_id: &str,
625) -> Result<(), Ec2Error> {
626 loop {
627 let resp = ec2_client
628 .describe_vpc_peering_connections()
629 .vpc_peering_connection_ids(peer_id)
630 .send()
631 .await?;
632 if let Some(connections) = resp.vpc_peering_connections {
633 if let Some(connection) = connections.first() {
634 if connection.status.as_ref().unwrap().code
635 == Some(VpcPeeringConnectionStateReasonCode::Deleted)
636 {
637 return Ok(());
638 }
639 } else {
640 return Ok(());
641 }
642 } else {
643 return Ok(());
644 }
645 sleep(RETRY_INTERVAL).await;
646 }
647}
648
649pub async fn find_instances_by_tag(
651 ec2_client: &Ec2Client,
652 tag: &str,
653) -> Result<Vec<String>, Ec2Error> {
654 let resp = ec2_client
655 .describe_instances()
656 .filters(Filter::builder().name("tag:deployer").values(tag).build())
657 .send()
658 .await?;
659 Ok(resp
660 .reservations
661 .unwrap_or_default()
662 .into_iter()
663 .flat_map(|r| r.instances.unwrap_or_default())
664 .map(|i| i.instance_id.unwrap())
665 .collect())
666}
667
668pub async fn terminate_instances(
670 ec2_client: &Ec2Client,
671 instance_ids: &[String],
672) -> Result<(), Ec2Error> {
673 if instance_ids.is_empty() {
674 return Ok(());
675 }
676 ec2_client
677 .terminate_instances()
678 .set_instance_ids(Some(instance_ids.to_vec()))
679 .send()
680 .await?;
681 Ok(())
682}
683
684pub async fn wait_for_instances_terminated(
686 ec2_client: &Ec2Client,
687 instance_ids: &[String],
688) -> Result<(), Ec2Error> {
689 loop {
690 let resp = ec2_client
691 .describe_instances()
692 .set_instance_ids(Some(instance_ids.to_vec()))
693 .send()
694 .await?;
695 let instances = resp
696 .reservations
697 .unwrap_or_default()
698 .into_iter()
699 .flat_map(|r| r.instances.unwrap_or_default())
700 .collect::<Vec<_>>();
701 if instances.iter().all(|i| {
702 i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Terminated
703 }) {
704 return Ok(());
705 }
706 sleep(RETRY_INTERVAL).await;
707 }
708}
709
710pub async fn find_security_groups_by_tag(
712 ec2_client: &Ec2Client,
713 tag: &str,
714) -> Result<Vec<SecurityGroup>, Ec2Error> {
715 let resp = ec2_client
716 .describe_security_groups()
717 .filters(Filter::builder().name("tag:deployer").values(tag).build())
718 .send()
719 .await?;
720 Ok(resp
721 .security_groups
722 .unwrap_or_default()
723 .into_iter()
724 .collect())
725}
726
727pub async fn delete_security_group(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
729 ec2_client
730 .delete_security_group()
731 .group_id(sg_id)
732 .send()
733 .await?;
734 Ok(())
735}
736
737pub async fn find_route_tables_by_tag(
739 ec2_client: &Ec2Client,
740 tag: &str,
741) -> Result<Vec<String>, Ec2Error> {
742 let resp = ec2_client
743 .describe_route_tables()
744 .filters(Filter::builder().name("tag:deployer").values(tag).build())
745 .send()
746 .await?;
747 Ok(resp
748 .route_tables
749 .unwrap_or_default()
750 .into_iter()
751 .map(|rt| rt.route_table_id.unwrap())
752 .collect())
753}
754
755pub async fn delete_route_table(ec2_client: &Ec2Client, rt_id: &str) -> Result<(), Ec2Error> {
757 ec2_client
758 .delete_route_table()
759 .route_table_id(rt_id)
760 .send()
761 .await?;
762 Ok(())
763}
764
765pub async fn find_igws_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
767 let resp = ec2_client
768 .describe_internet_gateways()
769 .filters(Filter::builder().name("tag:deployer").values(tag).build())
770 .send()
771 .await?;
772 Ok(resp
773 .internet_gateways
774 .unwrap_or_default()
775 .into_iter()
776 .map(|igw| igw.internet_gateway_id.unwrap())
777 .collect())
778}
779
780pub async fn find_vpc_by_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<String, Ec2Error> {
782 let resp = ec2_client
783 .describe_internet_gateways()
784 .internet_gateway_ids(igw_id)
785 .send()
786 .await?;
787 Ok(resp.internet_gateways.unwrap()[0]
788 .attachments
789 .as_ref()
790 .unwrap()[0]
791 .vpc_id
792 .as_ref()
793 .unwrap()
794 .clone())
795}
796
797pub async fn detach_igw(
799 ec2_client: &Ec2Client,
800 igw_id: &str,
801 vpc_id: &str,
802) -> Result<(), Ec2Error> {
803 ec2_client
804 .detach_internet_gateway()
805 .internet_gateway_id(igw_id)
806 .vpc_id(vpc_id)
807 .send()
808 .await?;
809 Ok(())
810}
811
812pub async fn delete_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<(), Ec2Error> {
814 ec2_client
815 .delete_internet_gateway()
816 .internet_gateway_id(igw_id)
817 .send()
818 .await?;
819 Ok(())
820}
821
822pub async fn find_subnets_by_tag(
824 ec2_client: &Ec2Client,
825 tag: &str,
826) -> Result<Vec<String>, Ec2Error> {
827 let resp = ec2_client
828 .describe_subnets()
829 .filters(Filter::builder().name("tag:deployer").values(tag).build())
830 .send()
831 .await?;
832 Ok(resp
833 .subnets
834 .unwrap_or_default()
835 .into_iter()
836 .map(|subnet| subnet.subnet_id.unwrap())
837 .collect())
838}
839
840pub async fn delete_subnet(ec2_client: &Ec2Client, subnet_id: &str) -> Result<(), Ec2Error> {
842 ec2_client
843 .delete_subnet()
844 .subnet_id(subnet_id)
845 .send()
846 .await?;
847 Ok(())
848}
849
850pub async fn find_vpcs_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
852 let resp = ec2_client
853 .describe_vpcs()
854 .filters(Filter::builder().name("tag:deployer").values(tag).build())
855 .send()
856 .await?;
857 Ok(resp
858 .vpcs
859 .unwrap_or_default()
860 .into_iter()
861 .map(|vpc| vpc.vpc_id.unwrap())
862 .collect())
863}
864
865pub async fn delete_vpc(ec2_client: &Ec2Client, vpc_id: &str) -> Result<(), Ec2Error> {
867 ec2_client.delete_vpc().vpc_id(vpc_id).send().await?;
868 Ok(())
869}
870
871pub async fn find_availability_zone(
873 client: &Ec2Client,
874 instance_types: &[String],
875) -> Result<String, Ec2Error> {
876 let offerings = client
878 .describe_instance_type_offerings()
879 .location_type("availability-zone".into())
880 .filters(
881 Filter::builder()
882 .name("instance-type")
883 .set_values(Some(instance_types.to_vec()))
884 .build(),
885 )
886 .send()
887 .await?
888 .instance_type_offerings
889 .unwrap_or_default();
890
891 let mut az_to_instance_types: HashMap<String, HashSet<String>> = HashMap::new();
893 for offering in offerings {
894 if let (Some(location), Some(instance_type)) = (
895 offering.location,
896 offering.instance_type.map(|it| it.to_string()), ) {
898 az_to_instance_types
899 .entry(location)
900 .or_default()
901 .insert(instance_type);
902 }
903 }
904
905 let required_instance_types: HashSet<String> = instance_types.iter().cloned().collect();
907
908 for (az, supported_types) in az_to_instance_types {
910 if required_instance_types.is_subset(&supported_types) {
911 return Ok(az); }
913 }
914
915 Err(Ec2Error::from(BuildError::other(format!(
917 "no availability zone supports all required instance types: {instance_types:?}"
918 ))))
919}
920
921pub async fn wait_for_enis_deleted(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
923 loop {
924 let resp = ec2_client
925 .describe_network_interfaces()
926 .filters(Filter::builder().name("group-id").values(sg_id).build())
927 .send()
928 .await?;
929 if resp.network_interfaces.unwrap_or_default().is_empty() {
930 return Ok(());
931 }
932 sleep(RETRY_INTERVAL).await;
933 }
934}