1use super::{METRICS_PORT, SYSTEM_PORT};
4use crate::aws::{
5 utils::{exact_cidr, DEPLOYER_MAX_PORT, DEPLOYER_MIN_PORT, DEPLOYER_PROTOCOL, RETRY_INTERVAL},
6 PortConfig,
7};
8use aws_config::BehaviorVersion;
9pub use aws_config::Region;
10use aws_sdk_ec2::{
11 error::BuildError,
12 primitives::Blob,
13 types::{
14 BlockDeviceMapping, EbsBlockDevice, Filter, InstanceStateName, ResourceType, SecurityGroup,
15 SummaryStatus, Tag, TagSpecification, VpcPeeringConnectionStateReasonCode,
16 },
17 Error as Ec2Error,
18};
19pub use aws_sdk_ec2::{
20 types::{InstanceType, IpPermission, IpRange, UserIdGroupPair, VolumeType},
21 Client as Ec2Client,
22};
23use std::{
24 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
25 time::Duration,
26};
27use tokio::time::sleep;
28use tracing::debug;
29
30pub async fn create_client(region: Region) -> Ec2Client {
32 let retry = aws_config::retry::RetryConfig::adaptive()
33 .with_max_attempts(u32::MAX)
34 .with_initial_backoff(Duration::from_millis(500))
35 .with_max_backoff(Duration::from_secs(30))
36 .with_reconnect_mode(aws_sdk_ec2::config::retry::ReconnectMode::ReconnectOnTransientError);
37 let config = aws_config::defaults(BehaviorVersion::v2026_01_12())
38 .region(region)
39 .retry_config(retry)
40 .load()
41 .await;
42 Ec2Client::new(&config)
43}
44
45pub async fn import_key_pair(
47 client: &Ec2Client,
48 key_name: &str,
49 public_key: &str,
50) -> Result<(), Ec2Error> {
51 let blob = Blob::new(public_key.as_bytes());
52 client
53 .import_key_pair()
54 .key_name(key_name)
55 .public_key_material(blob)
56 .send()
57 .await?;
58 Ok(())
59}
60
61pub async fn delete_key_pair(client: &Ec2Client, key_name: &str) -> Result<(), Ec2Error> {
63 client.delete_key_pair().key_name(key_name).send().await?;
64 Ok(())
65}
66
67pub(crate) async fn detect_architecture(
69 client: &Ec2Client,
70 instance_type: &str,
71) -> Result<super::Architecture, Ec2Error> {
72 let response = client
73 .describe_instance_types()
74 .instance_types(InstanceType::try_parse(instance_type).expect("invalid instance type"))
75 .send()
76 .await?;
77
78 let instance_info = response
79 .instance_types
80 .and_then(|types| types.into_iter().next())
81 .ok_or_else(|| {
82 Ec2Error::from(BuildError::other(format!(
83 "instance type {instance_type} not found"
84 )))
85 })?;
86
87 let architectures = instance_info
88 .processor_info
89 .and_then(|p| p.supported_architectures)
90 .unwrap_or_default();
91
92 if architectures.iter().any(|a| a.as_ref() == "arm64") {
95 Ok(super::Architecture::Arm64)
96 } else if architectures.iter().any(|a| a.as_ref() == "x86_64") {
97 Ok(super::Architecture::X86_64)
98 } else {
99 Err(Ec2Error::from(BuildError::other(format!(
100 "instance type {instance_type} has no supported architecture"
101 ))))
102 }
103}
104
105pub(crate) async fn find_latest_ami(
107 client: &Ec2Client,
108 architecture: super::Architecture,
109) -> Result<String, Ec2Error> {
110 let arch = architecture.as_str();
111 let resp = client
112 .describe_images()
113 .filters(
114 Filter::builder()
115 .name("name")
116 .values(format!(
117 "ubuntu/images/hvm-ssd-gp3/ubuntu-noble-24.04-{arch}-server-*"
118 ))
119 .build(),
120 )
121 .filters(
122 Filter::builder()
123 .name("root-device-type")
124 .values("ebs")
125 .build(),
126 )
127 .owners("099720109477") .send()
129 .await?;
130 let mut images = resp.images.unwrap_or_default();
131 if images.is_empty() {
132 return Err(Ec2Error::from(BuildError::other(
133 "No matching AMI found".to_string(),
134 )));
135 }
136 images.sort_by(|a, b| b.creation_date().cmp(&a.creation_date()));
137 let latest_ami = images[0].image_id().unwrap();
138 Ok(latest_ami.to_string())
139}
140
141pub async fn create_vpc(
143 client: &Ec2Client,
144 cidr_block: &str,
145 tag: &str,
146) -> Result<String, Ec2Error> {
147 let resp = client
148 .create_vpc()
149 .cidr_block(cidr_block)
150 .tag_specifications(
151 TagSpecification::builder()
152 .resource_type(ResourceType::Vpc)
153 .tags(Tag::builder().key("deployer").value(tag).build())
154 .build(),
155 )
156 .send()
157 .await?;
158 Ok(resp.vpc.unwrap().vpc_id.unwrap())
159}
160
161pub async fn create_and_attach_igw(
163 client: &Ec2Client,
164 vpc_id: &str,
165 tag: &str,
166) -> Result<String, Ec2Error> {
167 let igw_resp = client
168 .create_internet_gateway()
169 .tag_specifications(
170 TagSpecification::builder()
171 .resource_type(ResourceType::InternetGateway)
172 .tags(Tag::builder().key("deployer").value(tag).build())
173 .build(),
174 )
175 .send()
176 .await?;
177 let igw_id = igw_resp
178 .internet_gateway
179 .unwrap()
180 .internet_gateway_id
181 .unwrap();
182 client
183 .attach_internet_gateway()
184 .internet_gateway_id(&igw_id)
185 .vpc_id(vpc_id)
186 .send()
187 .await?;
188 Ok(igw_id)
189}
190
191pub async fn create_route_table(
193 client: &Ec2Client,
194 vpc_id: &str,
195 igw_id: &str,
196 tag: &str,
197) -> Result<String, Ec2Error> {
198 let rt_resp = client
199 .create_route_table()
200 .vpc_id(vpc_id)
201 .tag_specifications(
202 TagSpecification::builder()
203 .resource_type(ResourceType::RouteTable)
204 .tags(Tag::builder().key("deployer").value(tag).build())
205 .build(),
206 )
207 .send()
208 .await?;
209 let rt_id = rt_resp.route_table.unwrap().route_table_id.unwrap();
210 client
211 .create_route()
212 .route_table_id(&rt_id)
213 .destination_cidr_block("0.0.0.0/0")
214 .gateway_id(igw_id)
215 .send()
216 .await?;
217 Ok(rt_id)
218}
219
220pub async fn create_subnet(
222 client: &Ec2Client,
223 vpc_id: &str,
224 route_table_id: &str,
225 subnet_cidr: &str,
226 availability_zone: &str,
227 tag: &str,
228) -> Result<String, Ec2Error> {
229 let subnet_resp = client
230 .create_subnet()
231 .vpc_id(vpc_id)
232 .cidr_block(subnet_cidr)
233 .availability_zone(availability_zone)
234 .tag_specifications(
235 TagSpecification::builder()
236 .resource_type(ResourceType::Subnet)
237 .tags(Tag::builder().key("deployer").value(tag).build())
238 .build(),
239 )
240 .send()
241 .await?;
242 let subnet_id = subnet_resp.subnet.unwrap().subnet_id.unwrap();
243 client
244 .associate_route_table()
245 .route_table_id(route_table_id)
246 .subnet_id(&subnet_id)
247 .send()
248 .await?;
249 Ok(subnet_id)
250}
251
252pub async fn create_security_group_monitoring(
254 client: &Ec2Client,
255 vpc_id: &str,
256 deployer_ip: &str,
257 tag: &str,
258) -> Result<String, Ec2Error> {
259 let sg_resp = client
260 .create_security_group()
261 .group_name(tag)
262 .description("Security group for monitoring instance")
263 .vpc_id(vpc_id)
264 .tag_specifications(
265 TagSpecification::builder()
266 .resource_type(ResourceType::SecurityGroup)
267 .tags(Tag::builder().key("deployer").value(tag).build())
268 .build(),
269 )
270 .send()
271 .await?;
272 let sg_id = sg_resp.group_id.unwrap();
273 client
274 .authorize_security_group_ingress()
275 .group_id(&sg_id)
276 .ip_permissions(
277 IpPermission::builder()
278 .ip_protocol(DEPLOYER_PROTOCOL)
279 .from_port(DEPLOYER_MIN_PORT)
280 .to_port(DEPLOYER_MAX_PORT)
281 .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
282 .build(),
283 )
284 .send()
285 .await?;
286 Ok(sg_id)
287}
288
289pub async fn create_security_group_binary(
292 client: &Ec2Client,
293 vpc_id: &str,
294 deployer_ip: &str,
295 tag: &str,
296 ports: &[PortConfig],
297) -> Result<String, Ec2Error> {
298 let sg_resp = client
299 .create_security_group()
300 .group_name(format!("{tag}-binary"))
301 .description("Security group for binary instances")
302 .vpc_id(vpc_id)
303 .tag_specifications(
304 TagSpecification::builder()
305 .resource_type(ResourceType::SecurityGroup)
306 .tags(Tag::builder().key("deployer").value(tag).build())
307 .build(),
308 )
309 .send()
310 .await?;
311 let sg_id = sg_resp.group_id.unwrap();
312 let mut builder = client
313 .authorize_security_group_ingress()
314 .group_id(&sg_id)
315 .ip_permissions(
316 IpPermission::builder()
317 .ip_protocol(DEPLOYER_PROTOCOL)
318 .from_port(DEPLOYER_MIN_PORT)
319 .to_port(DEPLOYER_MAX_PORT)
320 .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
321 .build(),
322 );
323 for port in ports {
324 builder = builder.ip_permissions(
325 IpPermission::builder()
326 .ip_protocol(&port.protocol)
327 .from_port(port.port as i32)
328 .to_port(port.port as i32)
329 .ip_ranges(IpRange::builder().cidr_ip(&port.cidr).build())
330 .build(),
331 );
332 }
333 builder.send().await?;
334 Ok(sg_id)
335}
336
337pub async fn add_monitoring_ingress(
339 client: &Ec2Client,
340 sg_id: &str,
341 monitoring_ip: &str,
342) -> Result<(), Ec2Error> {
343 client
344 .authorize_security_group_ingress()
345 .group_id(sg_id)
346 .ip_permissions(
347 IpPermission::builder()
348 .ip_protocol("tcp")
349 .from_port(METRICS_PORT as i32)
350 .to_port(METRICS_PORT as i32)
351 .ip_ranges(
352 IpRange::builder()
353 .cidr_ip(exact_cidr(monitoring_ip))
354 .build(),
355 )
356 .build(),
357 )
358 .ip_permissions(
359 IpPermission::builder()
360 .ip_protocol("tcp")
361 .from_port(SYSTEM_PORT as i32)
362 .to_port(SYSTEM_PORT as i32)
363 .ip_ranges(
364 IpRange::builder()
365 .cidr_ip(exact_cidr(monitoring_ip))
366 .build(),
367 )
368 .build(),
369 )
370 .send()
371 .await?;
372 Ok(())
373}
374
375#[allow(clippy::too_many_arguments)]
377async fn try_launch_instances(
378 client: &Ec2Client,
379 ami_id: &str,
380 instance_type: InstanceType,
381 storage_size: i32,
382 storage_class: VolumeType,
383 key_name: &str,
384 subnet_id: &str,
385 sg_id: &str,
386 count: i32,
387 name: &str,
388 tag: &str,
389) -> Result<Vec<String>, Ec2Error> {
390 let resp = client
391 .run_instances()
392 .image_id(ami_id)
393 .instance_type(instance_type)
394 .key_name(key_name)
395 .min_count(count)
396 .max_count(count)
397 .network_interfaces(
398 aws_sdk_ec2::types::InstanceNetworkInterfaceSpecification::builder()
399 .associate_public_ip_address(true)
400 .device_index(0)
401 .subnet_id(subnet_id)
402 .groups(sg_id)
403 .build(),
404 )
405 .tag_specifications(
406 TagSpecification::builder()
407 .resource_type(ResourceType::Instance)
408 .set_tags(Some(vec![
409 Tag::builder().key("deployer").value(tag).build(),
410 Tag::builder().key("name").value(name).build(),
411 ]))
412 .build(),
413 )
414 .block_device_mappings(
415 BlockDeviceMapping::builder()
416 .device_name("/dev/sda1")
417 .ebs(
418 EbsBlockDevice::builder()
419 .volume_size(storage_size)
420 .volume_type(storage_class)
421 .delete_on_termination(true)
422 .build(),
423 )
424 .build(),
425 )
426 .send()
427 .await?;
428 Ok(resp
429 .instances
430 .unwrap()
431 .into_iter()
432 .map(|i| i.instance_id.unwrap())
433 .collect())
434}
435
436fn is_subnet_fallback_error(e: &Ec2Error) -> bool {
438 let error_str = e.to_string();
439 error_str.contains("InsufficientInstanceCapacity")
440 || error_str.contains("InsufficientFreeAddressesInSubnet")
441}
442
443fn is_fatal_ec2_error(e: &Ec2Error) -> bool {
445 let error_str = e.to_string();
446 error_str.contains("VcpuLimitExceeded")
447 || error_str.contains("InstanceLimitExceeded")
448 || error_str.contains("MaxSpotInstanceCountExceeded")
449 || error_str.contains("VolumeLimitExceeded")
450 || error_str.contains("InvalidParameterValue")
451 || error_str.contains("InvalidAMIID")
452 || error_str.contains("InvalidSubnetID")
453 || error_str.contains("InvalidGroup")
454 || error_str.contains("InvalidKeyPair")
455}
456
457#[allow(clippy::too_many_arguments)]
461pub async fn launch_instances(
462 client: &Ec2Client,
463 ami_id: &str,
464 instance_type: InstanceType,
465 storage_size: i32,
466 storage_class: VolumeType,
467 key_name: &str,
468 subnets: &[(String, String)],
469 az_support: &BTreeMap<String, BTreeSet<String>>,
470 start_idx: usize,
471 sg_id: &str,
472 count: i32,
473 name: &str,
474 tag: &str,
475) -> Result<(Vec<String>, String), super::Error> {
476 let instance_type_str = instance_type.to_string();
478 let eligible: Vec<(&str, &str)> = subnets
479 .iter()
480 .filter(|(az, _)| {
481 az_support
482 .get(az)
483 .is_some_and(|types| types.contains(&instance_type_str))
484 })
485 .map(|(az, subnet_id)| (az.as_str(), subnet_id.as_str()))
486 .collect();
487 if eligible.is_empty() {
488 return Err(super::Error::UnsupportedInstanceType(instance_type_str));
489 }
490
491 let len = eligible.len();
493 let mut last_error = None;
494 for i in 0..len {
495 let (az, subnet_id) = eligible[(start_idx + i) % len];
496 let mut attempt = 0u32;
497 loop {
498 match try_launch_instances(
499 client,
500 ami_id,
501 instance_type.clone(),
502 storage_size,
503 storage_class.clone(),
504 key_name,
505 subnet_id,
506 sg_id,
507 count,
508 name,
509 tag,
510 )
511 .await
512 {
513 Ok(ids) => return Ok((ids, az.to_string())),
514 Err(e) => {
515 if is_fatal_ec2_error(&e) {
516 return Err(super::Error::AwsEc2(e));
517 }
518 if is_subnet_fallback_error(&e) {
519 debug!(
521 name = name,
522 subnets_remaining = len - i - 1,
523 error = %e,
524 "capacity error, trying next subnet"
525 );
526 last_error = Some(e);
527 break;
528 }
529 debug!(
530 name = name,
531 attempt = attempt + 1,
532 error = %e,
533 "launch_instances failed, retrying"
534 );
535 attempt = attempt.saturating_add(1);
536 let backoff = Duration::from_millis(500 * (1 << attempt.min(10)));
537 sleep(backoff).await;
538 }
539 }
540 }
541 }
542
543 Err(last_error.map_or(super::Error::NoSubnetsAvailable, super::Error::AwsEc2))
544}
545
546pub async fn wait_for_instances_running(
549 client: &Ec2Client,
550 instance_ids: &[String],
551) -> Result<Vec<String>, Ec2Error> {
552 let mut discovered_ips: HashMap<String, String> = HashMap::new();
554 let mut pending_ids: HashSet<String> = instance_ids.iter().cloned().collect();
555 let mut attempt = 0u32;
556 loop {
557 let query_ids: Vec<String> = pending_ids.iter().cloned().collect();
559 let resp = match client
560 .describe_instances()
561 .set_instance_ids(Some(query_ids))
562 .send()
563 .await
564 {
565 Ok(resp) => {
566 attempt = 0;
567 resp
568 }
569 Err(e) => {
570 attempt = attempt.saturating_add(1);
571 debug!(
572 pending = pending_ids.len(),
573 attempt = attempt,
574 error = %e,
575 "describe_instances failed, retrying"
576 );
577 sleep(RETRY_INTERVAL).await;
578 continue;
579 }
580 };
581
582 for reservation in resp.reservations.unwrap_or_default() {
584 for instance in reservation.instances.unwrap_or_default() {
585 let id = match instance.instance_id {
586 Some(id) => id,
587 None => continue,
588 };
589 let is_running = instance.state.as_ref().and_then(|s| s.name.as_ref())
590 == Some(&InstanceStateName::Running);
591 if is_running {
592 if let Some(ip) = instance.public_ip_address {
593 discovered_ips.insert(id.clone(), ip);
594 pending_ids.remove(&id);
595 }
596 }
597 }
598 }
599
600 if pending_ids.is_empty() {
602 return Ok(instance_ids
603 .iter()
604 .map(|id| discovered_ips.remove(id).unwrap())
605 .collect());
606 }
607
608 sleep(RETRY_INTERVAL).await;
610 }
611}
612
613pub async fn wait_for_instances_ready(
614 client: &Ec2Client,
615 instance_ids: &[String],
616) -> Result<(), Ec2Error> {
617 loop {
618 let Ok(resp) = client
620 .describe_instance_status()
621 .set_instance_ids(Some(instance_ids.to_vec()))
622 .include_all_instances(true) .send()
624 .await
625 else {
626 sleep(RETRY_INTERVAL).await;
627 continue;
628 };
629
630 let statuses = resp.instance_statuses.unwrap_or_default();
632 let all_ready = statuses.iter().all(|s| {
633 s.instance_state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Running
634 && s.system_status.as_ref().unwrap().status.as_ref().unwrap() == &SummaryStatus::Ok
635 && s.instance_status.as_ref().unwrap().status.as_ref().unwrap()
636 == &SummaryStatus::Ok
637 });
638 if !all_ready {
639 sleep(RETRY_INTERVAL).await;
640 continue;
641 }
642 return Ok(());
643 }
644}
645
646pub async fn get_private_ip(client: &Ec2Client, instance_id: &str) -> Result<String, Ec2Error> {
648 let resp = client
649 .describe_instances()
650 .instance_ids(instance_id)
651 .send()
652 .await?;
653 let reservations = resp.reservations.unwrap();
654 let instance = &reservations[0].instances.as_ref().unwrap()[0];
655 Ok(instance.private_ip_address.as_ref().unwrap().clone())
656}
657
658pub async fn create_vpc_peering_connection(
660 client: &Ec2Client,
661 requester_vpc_id: &str,
662 peer_vpc_id: &str,
663 peer_region: &str,
664 tag: &str,
665) -> Result<String, Ec2Error> {
666 let resp = client
667 .create_vpc_peering_connection()
668 .vpc_id(requester_vpc_id)
669 .peer_vpc_id(peer_vpc_id)
670 .peer_region(peer_region)
671 .tag_specifications(
672 TagSpecification::builder()
673 .resource_type(ResourceType::VpcPeeringConnection)
674 .tags(Tag::builder().key("deployer").value(tag).build())
675 .build(),
676 )
677 .send()
678 .await?;
679 Ok(resp
680 .vpc_peering_connection
681 .unwrap()
682 .vpc_peering_connection_id
683 .unwrap())
684}
685
686pub async fn wait_for_vpc_peering_connection(
688 client: &Ec2Client,
689 peer_id: &str,
690) -> Result<(), Ec2Error> {
691 loop {
692 if let Ok(resp) = client
693 .describe_vpc_peering_connections()
694 .vpc_peering_connection_ids(peer_id)
695 .send()
696 .await
697 {
698 if let Some(connections) = resp.vpc_peering_connections {
699 if let Some(connection) = connections.first() {
700 if connection.status.as_ref().unwrap().code
701 == Some(VpcPeeringConnectionStateReasonCode::PendingAcceptance)
702 {
703 return Ok(());
704 }
705 }
706 }
707 }
708 sleep(Duration::from_secs(2)).await;
709 }
710}
711
712pub async fn accept_vpc_peering_connection(
714 client: &Ec2Client,
715 peer_id: &str,
716) -> Result<(), Ec2Error> {
717 client
718 .accept_vpc_peering_connection()
719 .vpc_peering_connection_id(peer_id)
720 .send()
721 .await?;
722 Ok(())
723}
724
725pub async fn add_route(
727 client: &Ec2Client,
728 route_table_id: &str,
729 destination_cidr: &str,
730 peer_id: &str,
731) -> Result<(), Ec2Error> {
732 client
733 .create_route()
734 .route_table_id(route_table_id)
735 .destination_cidr_block(destination_cidr)
736 .vpc_peering_connection_id(peer_id)
737 .send()
738 .await?;
739 Ok(())
740}
741
742pub async fn find_vpc_peering_by_tag(
744 client: &Ec2Client,
745 tag: &str,
746) -> Result<Vec<String>, Ec2Error> {
747 let resp = client
748 .describe_vpc_peering_connections()
749 .filters(Filter::builder().name("tag:deployer").values(tag).build())
750 .send()
751 .await?;
752 Ok(resp
753 .vpc_peering_connections
754 .unwrap_or_default()
755 .into_iter()
756 .map(|p| p.vpc_peering_connection_id.unwrap())
757 .collect())
758}
759
760pub async fn delete_vpc_peering(client: &Ec2Client, peering_id: &str) -> Result<(), Ec2Error> {
762 client
763 .delete_vpc_peering_connection()
764 .vpc_peering_connection_id(peering_id)
765 .send()
766 .await?;
767 Ok(())
768}
769
770pub async fn wait_for_vpc_peering_deletion(
772 ec2_client: &Ec2Client,
773 peer_id: &str,
774) -> Result<(), Ec2Error> {
775 loop {
776 let resp = ec2_client
777 .describe_vpc_peering_connections()
778 .vpc_peering_connection_ids(peer_id)
779 .send()
780 .await?;
781 if let Some(connections) = resp.vpc_peering_connections {
782 if let Some(connection) = connections.first() {
783 if connection.status.as_ref().unwrap().code
784 == Some(VpcPeeringConnectionStateReasonCode::Deleted)
785 {
786 return Ok(());
787 }
788 } else {
789 return Ok(());
790 }
791 } else {
792 return Ok(());
793 }
794 sleep(RETRY_INTERVAL).await;
795 }
796}
797
798pub async fn find_instances_by_tag(
800 ec2_client: &Ec2Client,
801 tag: &str,
802) -> Result<Vec<String>, Ec2Error> {
803 let resp = ec2_client
804 .describe_instances()
805 .filters(Filter::builder().name("tag:deployer").values(tag).build())
806 .send()
807 .await?;
808 Ok(resp
809 .reservations
810 .unwrap_or_default()
811 .into_iter()
812 .flat_map(|r| r.instances.unwrap_or_default())
813 .map(|i| i.instance_id.unwrap())
814 .collect())
815}
816
817pub async fn terminate_instances(
819 ec2_client: &Ec2Client,
820 instance_ids: &[String],
821) -> Result<(), Ec2Error> {
822 if instance_ids.is_empty() {
823 return Ok(());
824 }
825 ec2_client
826 .terminate_instances()
827 .set_instance_ids(Some(instance_ids.to_vec()))
828 .send()
829 .await?;
830 Ok(())
831}
832
833pub async fn wait_for_instances_terminated(
835 ec2_client: &Ec2Client,
836 instance_ids: &[String],
837) -> Result<(), Ec2Error> {
838 loop {
839 let resp = ec2_client
840 .describe_instances()
841 .set_instance_ids(Some(instance_ids.to_vec()))
842 .send()
843 .await?;
844 let instances = resp
845 .reservations
846 .unwrap_or_default()
847 .into_iter()
848 .flat_map(|r| r.instances.unwrap_or_default())
849 .collect::<Vec<_>>();
850 if instances.iter().all(|i| {
851 i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Terminated
852 }) {
853 return Ok(());
854 }
855 sleep(RETRY_INTERVAL).await;
856 }
857}
858
859pub async fn find_security_groups_by_tag(
861 ec2_client: &Ec2Client,
862 tag: &str,
863) -> Result<Vec<SecurityGroup>, Ec2Error> {
864 let resp = ec2_client
865 .describe_security_groups()
866 .filters(Filter::builder().name("tag:deployer").values(tag).build())
867 .send()
868 .await?;
869 Ok(resp
870 .security_groups
871 .unwrap_or_default()
872 .into_iter()
873 .collect())
874}
875
876pub async fn delete_security_group(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
878 ec2_client
879 .delete_security_group()
880 .group_id(sg_id)
881 .send()
882 .await?;
883 Ok(())
884}
885
886pub async fn find_route_tables_by_tag(
888 ec2_client: &Ec2Client,
889 tag: &str,
890) -> Result<Vec<String>, Ec2Error> {
891 let resp = ec2_client
892 .describe_route_tables()
893 .filters(Filter::builder().name("tag:deployer").values(tag).build())
894 .send()
895 .await?;
896 Ok(resp
897 .route_tables
898 .unwrap_or_default()
899 .into_iter()
900 .map(|rt| rt.route_table_id.unwrap())
901 .collect())
902}
903
904pub async fn delete_route_table(ec2_client: &Ec2Client, rt_id: &str) -> Result<(), Ec2Error> {
906 ec2_client
907 .delete_route_table()
908 .route_table_id(rt_id)
909 .send()
910 .await?;
911 Ok(())
912}
913
914pub async fn find_igws_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
916 let resp = ec2_client
917 .describe_internet_gateways()
918 .filters(Filter::builder().name("tag:deployer").values(tag).build())
919 .send()
920 .await?;
921 Ok(resp
922 .internet_gateways
923 .unwrap_or_default()
924 .into_iter()
925 .map(|igw| igw.internet_gateway_id.unwrap())
926 .collect())
927}
928
929pub async fn find_vpc_by_igw(
931 ec2_client: &Ec2Client,
932 igw_id: &str,
933) -> Result<Option<String>, Ec2Error> {
934 let resp = ec2_client
935 .describe_internet_gateways()
936 .internet_gateway_ids(igw_id)
937 .send()
938 .await?;
939 Ok(resp
940 .internet_gateways
941 .and_then(|gws| gws.into_iter().next())
942 .and_then(|gw| gw.attachments)
943 .and_then(|attachments| attachments.into_iter().next())
944 .and_then(|attachment| attachment.vpc_id))
945}
946
947pub async fn get_enabled_regions(ec2_client: &Ec2Client) -> Result<HashSet<String>, Ec2Error> {
949 let resp = ec2_client
950 .describe_regions()
951 .all_regions(true)
952 .filters(
953 Filter::builder()
954 .name("opt-in-status")
955 .values("opt-in-not-required")
956 .values("opted-in")
957 .build(),
958 )
959 .send()
960 .await?;
961 Ok(resp
962 .regions
963 .unwrap_or_default()
964 .into_iter()
965 .filter_map(|r| r.region_name)
966 .collect())
967}
968
969pub async fn detach_igw(
971 ec2_client: &Ec2Client,
972 igw_id: &str,
973 vpc_id: &str,
974) -> Result<(), Ec2Error> {
975 ec2_client
976 .detach_internet_gateway()
977 .internet_gateway_id(igw_id)
978 .vpc_id(vpc_id)
979 .send()
980 .await?;
981 Ok(())
982}
983
984pub async fn delete_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<(), Ec2Error> {
986 ec2_client
987 .delete_internet_gateway()
988 .internet_gateway_id(igw_id)
989 .send()
990 .await?;
991 Ok(())
992}
993
994pub async fn find_subnets_by_tag(
996 ec2_client: &Ec2Client,
997 tag: &str,
998) -> Result<Vec<String>, Ec2Error> {
999 let resp = ec2_client
1000 .describe_subnets()
1001 .filters(Filter::builder().name("tag:deployer").values(tag).build())
1002 .send()
1003 .await?;
1004 Ok(resp
1005 .subnets
1006 .unwrap_or_default()
1007 .into_iter()
1008 .map(|subnet| subnet.subnet_id.unwrap())
1009 .collect())
1010}
1011
1012pub async fn delete_subnet(ec2_client: &Ec2Client, subnet_id: &str) -> Result<(), Ec2Error> {
1014 ec2_client
1015 .delete_subnet()
1016 .subnet_id(subnet_id)
1017 .send()
1018 .await?;
1019 Ok(())
1020}
1021
1022pub async fn find_vpcs_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
1024 let resp = ec2_client
1025 .describe_vpcs()
1026 .filters(Filter::builder().name("tag:deployer").values(tag).build())
1027 .send()
1028 .await?;
1029 Ok(resp
1030 .vpcs
1031 .unwrap_or_default()
1032 .into_iter()
1033 .map(|vpc| vpc.vpc_id.unwrap())
1034 .collect())
1035}
1036
1037pub async fn delete_vpc(ec2_client: &Ec2Client, vpc_id: &str) -> Result<(), Ec2Error> {
1039 ec2_client.delete_vpc().vpc_id(vpc_id).send().await?;
1040 Ok(())
1041}
1042
1043pub async fn find_az_instance_support(
1045 client: &Ec2Client,
1046 instance_types: &[String],
1047) -> Result<BTreeMap<String, BTreeSet<String>>, Ec2Error> {
1048 let offerings = client
1049 .describe_instance_type_offerings()
1050 .location_type("availability-zone".into())
1051 .filters(
1052 Filter::builder()
1053 .name("instance-type")
1054 .set_values(Some(instance_types.to_vec()))
1055 .build(),
1056 )
1057 .send()
1058 .await?
1059 .instance_type_offerings
1060 .unwrap_or_default();
1061
1062 let mut az_to_instance_types: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
1064 for offering in offerings {
1065 if let (Some(location), Some(instance_type)) = (
1066 offering.location,
1067 offering.instance_type.map(|it| it.to_string()),
1068 ) {
1069 az_to_instance_types
1070 .entry(location)
1071 .or_default()
1072 .insert(instance_type);
1073 }
1074 }
1075 if az_to_instance_types.is_empty() {
1076 return Err(Ec2Error::from(BuildError::other(format!(
1077 "no availability zone supports any of: {instance_types:?}"
1078 ))));
1079 }
1080
1081 Ok(az_to_instance_types)
1082}
1083
1084pub async fn wait_for_enis_deleted(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
1086 loop {
1087 let resp = ec2_client
1088 .describe_network_interfaces()
1089 .filters(Filter::builder().name("group-id").values(sg_id).build())
1090 .send()
1091 .await?;
1092 if resp.network_interfaces.unwrap_or_default().is_empty() {
1093 return Ok(());
1094 }
1095 sleep(RETRY_INTERVAL).await;
1096 }
1097}