use super::{METRICS_PORT, SYSTEM_PORT};
use crate::aws::{
utils::{exact_cidr, DEPLOYER_MAX_PORT, DEPLOYER_MIN_PORT, DEPLOYER_PROTOCOL, RETRY_INTERVAL},
PortConfig,
};
use aws_config::BehaviorVersion;
pub use aws_config::Region;
use aws_sdk_ec2::{
error::BuildError,
primitives::Blob,
types::{
BlockDeviceMapping, EbsBlockDevice, Filter, InstanceStateName, ResourceType, SecurityGroup,
SummaryStatus, Tag, TagSpecification, VpcPeeringConnectionStateReasonCode,
},
Error as Ec2Error,
};
pub use aws_sdk_ec2::{
types::{InstanceType, IpPermission, IpRange, UserIdGroupPair, VolumeType},
Client as Ec2Client,
};
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
time::Duration,
};
use tokio::time::sleep;
use tracing::debug;
/// Builds an EC2 client for `region` with adaptive, effectively-unbounded retries.
///
/// Retries use the SDK's adaptive mode (client-side rate limiting) with
/// exponential backoff starting at 500ms and capped at 30s, reconnecting on
/// transient transport errors.
pub async fn create_client(region: Region) -> Ec2Client {
    let retry = aws_config::retry::RetryConfig::adaptive()
        .with_max_attempts(u32::MAX)
        .with_initial_backoff(Duration::from_millis(500))
        .with_max_backoff(Duration::from_secs(30))
        .with_reconnect_mode(aws_sdk_ec2::config::retry::ReconnectMode::ReconnectOnTransientError);
    // `v2026_01_12` is not a published `BehaviorVersion` constructor; use the
    // latest available defaults so this compiles against current SDK releases.
    let config = aws_config::defaults(BehaviorVersion::latest())
        .region(region)
        .retry_config(retry)
        .load()
        .await;
    Ec2Client::new(&config)
}
/// Registers an SSH public key with EC2 under `key_name`.
pub async fn import_key_pair(
    client: &Ec2Client,
    key_name: &str,
    public_key: &str,
) -> Result<(), Ec2Error> {
    let material = Blob::new(public_key.as_bytes());
    let request = client
        .import_key_pair()
        .key_name(key_name)
        .public_key_material(material);
    request.send().await?;
    Ok(())
}
/// Deletes the EC2 key pair registered under `key_name`.
pub async fn delete_key_pair(client: &Ec2Client, key_name: &str) -> Result<(), Ec2Error> {
    let request = client.delete_key_pair().key_name(key_name);
    request.send().await?;
    Ok(())
}
/// Determines whether `instance_type` is ARM64 or x86_64.
///
/// Queries `DescribeInstanceTypes` and inspects the supported architectures.
/// ARM64 wins when an instance type advertises both.
///
/// # Errors
/// Returns an error when the instance type string is invalid, unknown to EC2,
/// or reports no supported architecture.
pub(crate) async fn detect_architecture(
    client: &Ec2Client,
    instance_type: &str,
) -> Result<super::Architecture, Ec2Error> {
    // Surface a bad instance-type string as an error instead of panicking.
    let parsed = InstanceType::try_parse(instance_type).map_err(|_| {
        Ec2Error::from(BuildError::other(format!(
            "invalid instance type {instance_type}"
        )))
    })?;
    let response = client
        .describe_instance_types()
        .instance_types(parsed)
        .send()
        .await?;
    let instance_info = response
        .instance_types
        .and_then(|types| types.into_iter().next())
        .ok_or_else(|| {
            Ec2Error::from(BuildError::other(format!(
                "instance type {instance_type} not found"
            )))
        })?;
    let architectures = instance_info
        .processor_info
        .and_then(|p| p.supported_architectures)
        .unwrap_or_default();
    if architectures.iter().any(|a| a.as_ref() == "arm64") {
        Ok(super::Architecture::Arm64)
    } else if architectures.iter().any(|a| a.as_ref() == "x86_64") {
        Ok(super::Architecture::X86_64)
    } else {
        Err(Ec2Error::from(BuildError::other(format!(
            "instance type {instance_type} has no supported architecture"
        ))))
    }
}
/// Finds the newest Ubuntu 24.04 (Noble) EBS-backed AMI for `architecture`.
///
/// # Errors
/// Returns an error when no matching image exists (or the newest image has no
/// id, which would previously have panicked).
pub(crate) async fn find_latest_ami(
    client: &Ec2Client,
    architecture: super::Architecture,
) -> Result<String, Ec2Error> {
    let arch = architecture.as_str();
    let resp = client
        .describe_images()
        .filters(
            Filter::builder()
                .name("name")
                .values(format!(
                    "ubuntu/images/hvm-ssd-gp3/ubuntu-noble-24.04-{arch}-server-*"
                ))
                .build(),
        )
        .filters(
            Filter::builder()
                .name("root-device-type")
                .values("ebs")
                .build(),
        )
        // Canonical's AWS account id (publisher of official Ubuntu AMIs).
        .owners("099720109477")
        .send()
        .await?;
    let mut images = resp.images.unwrap_or_default();
    // Newest first: AMI creation dates are ISO-8601 strings, so lexicographic
    // order matches chronological order.
    images.sort_by(|a, b| b.creation_date().cmp(&a.creation_date()));
    images
        .first()
        .and_then(|image| image.image_id())
        .map(str::to_string)
        .ok_or_else(|| {
            Ec2Error::from(BuildError::other("No matching AMI found".to_string()))
        })
}
/// Creates a VPC with `cidr_block`, tagged `deployer=<tag>`, and returns its id.
pub async fn create_vpc(
    client: &Ec2Client,
    cidr_block: &str,
    tag: &str,
) -> Result<String, Ec2Error> {
    let tag_spec = TagSpecification::builder()
        .resource_type(ResourceType::Vpc)
        .tags(Tag::builder().key("deployer").value(tag).build())
        .build();
    let response = client
        .create_vpc()
        .cidr_block(cidr_block)
        .tag_specifications(tag_spec)
        .send()
        .await?;
    // A successful CreateVpc response always carries the new VPC and its id.
    Ok(response.vpc.unwrap().vpc_id.unwrap())
}
/// Creates an internet gateway tagged `deployer=<tag>`, attaches it to
/// `vpc_id`, and returns the gateway id.
pub async fn create_and_attach_igw(
    client: &Ec2Client,
    vpc_id: &str,
    tag: &str,
) -> Result<String, Ec2Error> {
    let tag_spec = TagSpecification::builder()
        .resource_type(ResourceType::InternetGateway)
        .tags(Tag::builder().key("deployer").value(tag).build())
        .build();
    let created = client
        .create_internet_gateway()
        .tag_specifications(tag_spec)
        .send()
        .await?;
    // A successful CreateInternetGateway response always carries the id.
    let igw_id = created
        .internet_gateway
        .unwrap()
        .internet_gateway_id
        .unwrap();
    client
        .attach_internet_gateway()
        .internet_gateway_id(&igw_id)
        .vpc_id(vpc_id)
        .send()
        .await?;
    Ok(igw_id)
}
/// Creates a route table in `vpc_id` with a default route (0.0.0.0/0) through
/// `igw_id`, tagged `deployer=<tag>`; returns the route table id.
pub async fn create_route_table(
    client: &Ec2Client,
    vpc_id: &str,
    igw_id: &str,
    tag: &str,
) -> Result<String, Ec2Error> {
    let tag_spec = TagSpecification::builder()
        .resource_type(ResourceType::RouteTable)
        .tags(Tag::builder().key("deployer").value(tag).build())
        .build();
    let created = client
        .create_route_table()
        .vpc_id(vpc_id)
        .tag_specifications(tag_spec)
        .send()
        .await?;
    // A successful CreateRouteTable response always carries the id.
    let rt_id = created.route_table.unwrap().route_table_id.unwrap();
    // Route all egress traffic through the internet gateway.
    client
        .create_route()
        .route_table_id(&rt_id)
        .destination_cidr_block("0.0.0.0/0")
        .gateway_id(igw_id)
        .send()
        .await?;
    Ok(rt_id)
}
/// Creates a subnet in `vpc_id`/`availability_zone` with `subnet_cidr`,
/// associates it with `route_table_id`, and returns the subnet id.
pub async fn create_subnet(
    client: &Ec2Client,
    vpc_id: &str,
    route_table_id: &str,
    subnet_cidr: &str,
    availability_zone: &str,
    tag: &str,
) -> Result<String, Ec2Error> {
    let tag_spec = TagSpecification::builder()
        .resource_type(ResourceType::Subnet)
        .tags(Tag::builder().key("deployer").value(tag).build())
        .build();
    let created = client
        .create_subnet()
        .vpc_id(vpc_id)
        .cidr_block(subnet_cidr)
        .availability_zone(availability_zone)
        .tag_specifications(tag_spec)
        .send()
        .await?;
    // A successful CreateSubnet response always carries the id.
    let subnet_id = created.subnet.unwrap().subnet_id.unwrap();
    client
        .associate_route_table()
        .route_table_id(route_table_id)
        .subnet_id(&subnet_id)
        .send()
        .await?;
    Ok(subnet_id)
}
/// Creates the monitoring-instance security group in `vpc_id` and authorizes
/// ingress from the deployer's IP on the deployer port range; returns the
/// group id.
pub async fn create_security_group_monitoring(
    client: &Ec2Client,
    vpc_id: &str,
    deployer_ip: &str,
    tag: &str,
) -> Result<String, Ec2Error> {
    let tag_spec = TagSpecification::builder()
        .resource_type(ResourceType::SecurityGroup)
        .tags(Tag::builder().key("deployer").value(tag).build())
        .build();
    let created = client
        .create_security_group()
        .group_name(tag)
        .description("Security group for monitoring instance")
        .vpc_id(vpc_id)
        .tag_specifications(tag_spec)
        .send()
        .await?;
    let sg_id = created.group_id.unwrap();
    // Only the deployer host may reach the management port range.
    let deployer_rule = IpPermission::builder()
        .ip_protocol(DEPLOYER_PROTOCOL)
        .from_port(DEPLOYER_MIN_PORT)
        .to_port(DEPLOYER_MAX_PORT)
        .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
        .build();
    client
        .authorize_security_group_ingress()
        .group_id(&sg_id)
        .ip_permissions(deployer_rule)
        .send()
        .await?;
    Ok(sg_id)
}
/// Creates the security group for binary instances in `vpc_id`, authorizing
/// ingress from the deployer's IP plus each user-configured port rule;
/// returns the group id.
pub async fn create_security_group_binary(
    client: &Ec2Client,
    vpc_id: &str,
    deployer_ip: &str,
    tag: &str,
    ports: &[PortConfig],
) -> Result<String, Ec2Error> {
    let tag_spec = TagSpecification::builder()
        .resource_type(ResourceType::SecurityGroup)
        .tags(Tag::builder().key("deployer").value(tag).build())
        .build();
    let created = client
        .create_security_group()
        .group_name(format!("{tag}-binary"))
        .description("Security group for binary instances")
        .vpc_id(vpc_id)
        .tag_specifications(tag_spec)
        .send()
        .await?;
    let sg_id = created.group_id.unwrap();
    // Deployer management rule first, then each configured application port.
    let mut permissions = vec![IpPermission::builder()
        .ip_protocol(DEPLOYER_PROTOCOL)
        .from_port(DEPLOYER_MIN_PORT)
        .to_port(DEPLOYER_MAX_PORT)
        .ip_ranges(IpRange::builder().cidr_ip(exact_cidr(deployer_ip)).build())
        .build()];
    for port in ports {
        permissions.push(
            IpPermission::builder()
                .ip_protocol(&port.protocol)
                .from_port(port.port as i32)
                .to_port(port.port as i32)
                .ip_ranges(IpRange::builder().cidr_ip(&port.cidr).build())
                .build(),
        );
    }
    client
        .authorize_security_group_ingress()
        .group_id(&sg_id)
        .set_ip_permissions(Some(permissions))
        .send()
        .await?;
    Ok(sg_id)
}
/// Authorizes the monitoring host (`monitoring_ip`) to scrape the metrics and
/// system ports on security group `sg_id`.
pub async fn add_monitoring_ingress(
    client: &Ec2Client,
    sg_id: &str,
    monitoring_ip: &str,
) -> Result<(), Ec2Error> {
    // One TCP rule per scrape port, each limited to the monitoring host only.
    let mut permissions = Vec::with_capacity(2);
    for port in [METRICS_PORT, SYSTEM_PORT] {
        permissions.push(
            IpPermission::builder()
                .ip_protocol("tcp")
                .from_port(port as i32)
                .to_port(port as i32)
                .ip_ranges(
                    IpRange::builder()
                        .cidr_ip(exact_cidr(monitoring_ip))
                        .build(),
                )
                .build(),
        );
    }
    client
        .authorize_security_group_ingress()
        .group_id(sg_id)
        .set_ip_permissions(Some(permissions))
        .send()
        .await?;
    Ok(())
}
/// Issues a single `RunInstances` request for `count` instances of
/// `instance_type` in `subnet_id`, returning the new instance ids.
#[allow(clippy::too_many_arguments)]
async fn try_launch_instances(
    client: &Ec2Client,
    ami_id: &str,
    instance_type: InstanceType,
    storage_size: i32,
    storage_class: VolumeType,
    key_name: &str,
    subnet_id: &str,
    sg_id: &str,
    count: i32,
    name: &str,
    tag: &str,
) -> Result<Vec<String>, Ec2Error> {
    // Primary NIC: public IP, chosen subnet, single security group.
    let nic = aws_sdk_ec2::types::InstanceNetworkInterfaceSpecification::builder()
        .associate_public_ip_address(true)
        .device_index(0)
        .subnet_id(subnet_id)
        .groups(sg_id)
        .build();
    let tag_spec = TagSpecification::builder()
        .resource_type(ResourceType::Instance)
        .set_tags(Some(vec![
            Tag::builder().key("deployer").value(tag).build(),
            Tag::builder().key("name").value(name).build(),
        ]))
        .build();
    // Root EBS volume sized/typed per caller, destroyed with the instance.
    let root_volume = BlockDeviceMapping::builder()
        .device_name("/dev/sda1")
        .ebs(
            EbsBlockDevice::builder()
                .volume_size(storage_size)
                .volume_type(storage_class)
                .delete_on_termination(true)
                .build(),
        )
        .build();
    let resp = client
        .run_instances()
        .image_id(ami_id)
        .instance_type(instance_type)
        .key_name(key_name)
        .min_count(count)
        .max_count(count)
        .network_interfaces(nic)
        .tag_specifications(tag_spec)
        .block_device_mappings(root_volume)
        .send()
        .await?;
    // A successful RunInstances response always lists the launched instances.
    let mut ids = Vec::with_capacity(count as usize);
    for instance in resp.instances.unwrap() {
        ids.push(instance.instance_id.unwrap());
    }
    Ok(ids)
}
/// True when the launch failure indicates the subnet/AZ is out of capacity or
/// free addresses, i.e. retrying in a different subnet may succeed.
fn is_subnet_fallback_error(e: &Ec2Error) -> bool {
    const FALLBACK_CODES: [&str; 2] = [
        "InsufficientInstanceCapacity",
        "InsufficientFreeAddressesInSubnet",
    ];
    let rendered = e.to_string();
    FALLBACK_CODES.iter().any(|code| rendered.contains(code))
}
/// True when the launch failure is non-retryable: account limits exhausted or
/// an invalid request parameter, so retrying anywhere would also fail.
fn is_fatal_ec2_error(e: &Ec2Error) -> bool {
    const FATAL_CODES: [&str; 9] = [
        "VcpuLimitExceeded",
        "InstanceLimitExceeded",
        "MaxSpotInstanceCountExceeded",
        "VolumeLimitExceeded",
        "InvalidParameterValue",
        "InvalidAMIID",
        "InvalidSubnetID",
        "InvalidGroup",
        "InvalidKeyPair",
    ];
    let rendered = e.to_string();
    FATAL_CODES.iter().any(|code| rendered.contains(code))
}
/// Launches `count` instances, rotating through subnets that support
/// `instance_type` (per `az_support`), starting at `start_idx`.
///
/// Per subnet: retries transient errors with exponential backoff, moves to the
/// next subnet on capacity errors, and aborts immediately on fatal errors
/// (limits / invalid parameters). Returns the instance ids plus the AZ that
/// accepted the launch.
///
/// # Errors
/// - `UnsupportedInstanceType` when no subnet's AZ supports the type.
/// - `AwsEc2` on a fatal error, or the last capacity error when every subnet
///   was exhausted.
/// - `NoSubnetsAvailable` when every subnet failed without a recorded error
///   (not expected in practice, since a capacity error is recorded per skip).
#[allow(clippy::too_many_arguments)]
pub async fn launch_instances(
    client: &Ec2Client,
    ami_id: &str,
    instance_type: InstanceType,
    storage_size: i32,
    storage_class: VolumeType,
    key_name: &str,
    subnets: &[(String, String)],
    az_support: &BTreeMap<String, BTreeSet<String>>,
    start_idx: usize,
    sg_id: &str,
    count: i32,
    name: &str,
    tag: &str,
) -> Result<(Vec<String>, String), super::Error> {
    let instance_type_str = instance_type.to_string();
    // Keep only (az, subnet) pairs whose AZ offers this instance type.
    let eligible: Vec<(&str, &str)> = subnets
        .iter()
        .filter(|(az, _)| {
            az_support
                .get(az)
                .is_some_and(|types| types.contains(&instance_type_str))
        })
        .map(|(az, subnet_id)| (az.as_str(), subnet_id.as_str()))
        .collect();
    if eligible.is_empty() {
        return Err(super::Error::UnsupportedInstanceType(instance_type_str));
    }
    let len = eligible.len();
    let mut last_error = None;
    // Round-robin from start_idx so concurrent callers spread across subnets.
    for i in 0..len {
        let (az, subnet_id) = eligible[(start_idx + i) % len];
        let mut attempt = 0u32;
        loop {
            match try_launch_instances(
                client,
                ami_id,
                instance_type.clone(),
                storage_size,
                storage_class.clone(),
                key_name,
                subnet_id,
                sg_id,
                count,
                name,
                tag,
            )
            .await
            {
                Ok(ids) => return Ok((ids, az.to_string())),
                Err(e) => {
                    // Non-retryable (limits / bad parameters): give up entirely.
                    if is_fatal_ec2_error(&e) {
                        return Err(super::Error::AwsEc2(e));
                    }
                    // Capacity problem in this subnet: record it and try the next.
                    if is_subnet_fallback_error(&e) {
                        debug!(
                            name = name,
                            subnets_remaining = len - i - 1,
                            error = %e,
                            "capacity error, trying next subnet"
                        );
                        last_error = Some(e);
                        break;
                    }
                    // Anything else is treated as transient: retry in place with
                    // exponential backoff (500ms doubling, shift capped at 2^10).
                    debug!(
                        name = name,
                        attempt = attempt + 1,
                        error = %e,
                        "launch_instances failed, retrying"
                    );
                    attempt = attempt.saturating_add(1);
                    let backoff = Duration::from_millis(500 * (1 << attempt.min(10)));
                    sleep(backoff).await;
                }
            }
        }
    }
    // Every eligible subnet was tried; surface the last capacity error if any.
    Err(last_error.map_or(super::Error::NoSubnetsAvailable, super::Error::AwsEc2))
}
/// Polls until every instance in `instance_ids` is running with a public IP,
/// returning the public IPs in the same order as `instance_ids`.
///
/// `DescribeInstances` failures are retried indefinitely (logged at debug).
/// NOTE(review): an instance that never reaches `running` (e.g. terminates
/// during boot) makes this loop forever — callers are assumed to time out
/// externally; confirm.
pub async fn wait_for_instances_running(
    client: &Ec2Client,
    instance_ids: &[String],
) -> Result<Vec<String>, Ec2Error> {
    // instance id -> public IP, filled in as instances come up.
    let mut discovered_ips: HashMap<String, String> = HashMap::new();
    // Ids still awaiting a running state + public IP.
    let mut pending_ids: HashSet<String> = instance_ids.iter().cloned().collect();
    let mut attempt = 0u32;
    loop {
        // Only query instances still pending; resolved ones are done.
        let query_ids: Vec<String> = pending_ids.iter().cloned().collect();
        let resp = match client
            .describe_instances()
            .set_instance_ids(Some(query_ids))
            .send()
            .await
        {
            Ok(resp) => {
                attempt = 0;
                resp
            }
            Err(e) => {
                // Transient API failure: log and retry after a fixed interval.
                attempt = attempt.saturating_add(1);
                debug!(
                    pending = pending_ids.len(),
                    attempt = attempt,
                    error = %e,
                    "describe_instances failed, retrying"
                );
                sleep(RETRY_INTERVAL).await;
                continue;
            }
        };
        for reservation in resp.reservations.unwrap_or_default() {
            for instance in reservation.instances.unwrap_or_default() {
                let id = match instance.instance_id {
                    Some(id) => id,
                    None => continue,
                };
                let is_running = instance.state.as_ref().and_then(|s| s.name.as_ref())
                    == Some(&InstanceStateName::Running);
                // Require BOTH running state and an assigned public IP before
                // considering the instance resolved.
                if is_running {
                    if let Some(ip) = instance.public_ip_address {
                        discovered_ips.insert(id.clone(), ip);
                        pending_ids.remove(&id);
                    }
                }
            }
        }
        if pending_ids.is_empty() {
            // All resolved: emit IPs in the caller's original id order.
            return Ok(instance_ids
                .iter()
                .map(|id| discovered_ips.remove(id).unwrap())
                .collect());
        }
        sleep(RETRY_INTERVAL).await;
    }
}
/// Polls until every instance in `instance_ids` is running AND both its
/// system and instance status checks report Ok.
///
/// `DescribeInstanceStatus` failures are retried indefinitely. This function
/// never returns an error in practice; the `Result` is kept for interface
/// stability.
pub async fn wait_for_instances_ready(
    client: &Ec2Client,
    instance_ids: &[String],
) -> Result<(), Ec2Error> {
    loop {
        let Ok(resp) = client
            .describe_instance_status()
            .set_instance_ids(Some(instance_ids.to_vec()))
            // Include non-running instances so pending ones are visible too.
            .include_all_instances(true)
            .send()
            .await
        else {
            sleep(RETRY_INTERVAL).await;
            continue;
        };
        let statuses = resp.instance_statuses.unwrap_or_default();
        // Require a status entry for EVERY instance: with an empty or partial
        // list (eventual consistency right after launch), `all` would be
        // vacuously true and we would declare readiness prematurely.
        let all_ready = statuses.len() == instance_ids.len()
            && statuses.iter().all(|s| {
                // Missing sub-fields count as not-ready instead of panicking.
                let running = s.instance_state.as_ref().and_then(|st| st.name.as_ref())
                    == Some(&InstanceStateName::Running);
                let system_ok = s.system_status.as_ref().and_then(|st| st.status.as_ref())
                    == Some(&SummaryStatus::Ok);
                let instance_ok = s.instance_status.as_ref().and_then(|st| st.status.as_ref())
                    == Some(&SummaryStatus::Ok);
                running && system_ok && instance_ok
            });
        if !all_ready {
            sleep(RETRY_INTERVAL).await;
            continue;
        }
        return Ok(());
    }
}
pub async fn get_private_ip(client: &Ec2Client, instance_id: &str) -> Result<String, Ec2Error> {
let resp = client
.describe_instances()
.instance_ids(instance_id)
.send()
.await?;
let reservations = resp.reservations.unwrap();
let instance = &reservations[0].instances.as_ref().unwrap()[0];
Ok(instance.private_ip_address.as_ref().unwrap().clone())
}
/// Requests a VPC peering connection from `requester_vpc_id` to `peer_vpc_id`
/// in `peer_region`, tagged `deployer=<tag>`; returns the peering id.
pub async fn create_vpc_peering_connection(
    client: &Ec2Client,
    requester_vpc_id: &str,
    peer_vpc_id: &str,
    peer_region: &str,
    tag: &str,
) -> Result<String, Ec2Error> {
    let tag_spec = TagSpecification::builder()
        .resource_type(ResourceType::VpcPeeringConnection)
        .tags(Tag::builder().key("deployer").value(tag).build())
        .build();
    let created = client
        .create_vpc_peering_connection()
        .vpc_id(requester_vpc_id)
        .peer_vpc_id(peer_vpc_id)
        .peer_region(peer_region)
        .tag_specifications(tag_spec)
        .send()
        .await?;
    // A successful request always carries the new peering connection id.
    let peering = created.vpc_peering_connection.unwrap();
    Ok(peering.vpc_peering_connection_id.unwrap())
}
/// Polls until peering connection `peer_id` reaches `pending-acceptance`,
/// i.e. is visible and ready for the peer side to accept.
///
/// Describe failures are swallowed and retried every 2 seconds. This function
/// never returns an error in practice; the `Result` is kept for interface
/// stability.
/// NOTE(review): only `pending-acceptance` terminates the loop — if the
/// connection can transition straight to `active` (e.g. auto-accept), this
/// would spin; confirm against the provisioning flow.
pub async fn wait_for_vpc_peering_connection(
    client: &Ec2Client,
    peer_id: &str,
) -> Result<(), Ec2Error> {
    loop {
        if let Ok(resp) = client
            .describe_vpc_peering_connections()
            .vpc_peering_connection_ids(peer_id)
            .send()
            .await
        {
            let connections = resp.vpc_peering_connections.unwrap_or_default();
            // Treat a transiently-missing status as "not yet pending" instead
            // of panicking on unwrap.
            let pending = connections
                .first()
                .and_then(|c| c.status.as_ref())
                .and_then(|s| s.code.as_ref())
                == Some(&VpcPeeringConnectionStateReasonCode::PendingAcceptance);
            if pending {
                return Ok(());
            }
        }
        sleep(Duration::from_secs(2)).await;
    }
}
/// Accepts the pending VPC peering connection `peer_id` on the peer side.
pub async fn accept_vpc_peering_connection(
    client: &Ec2Client,
    peer_id: &str,
) -> Result<(), Ec2Error> {
    let request = client
        .accept_vpc_peering_connection()
        .vpc_peering_connection_id(peer_id);
    request.send().await?;
    Ok(())
}
/// Adds a route in `route_table_id` sending `destination_cidr` traffic over
/// the peering connection `peer_id`.
pub async fn add_route(
    client: &Ec2Client,
    route_table_id: &str,
    destination_cidr: &str,
    peer_id: &str,
) -> Result<(), Ec2Error> {
    let request = client
        .create_route()
        .route_table_id(route_table_id)
        .destination_cidr_block(destination_cidr)
        .vpc_peering_connection_id(peer_id);
    request.send().await?;
    Ok(())
}
/// Lists ids of VPC peering connections tagged `deployer=<tag>`.
pub async fn find_vpc_peering_by_tag(
    client: &Ec2Client,
    tag: &str,
) -> Result<Vec<String>, Ec2Error> {
    let filter = Filter::builder().name("tag:deployer").values(tag).build();
    let resp = client
        .describe_vpc_peering_connections()
        .filters(filter)
        .send()
        .await?;
    let mut ids = Vec::new();
    for peering in resp.vpc_peering_connections.unwrap_or_default() {
        // Id is always present on described peering connections.
        ids.push(peering.vpc_peering_connection_id.unwrap());
    }
    Ok(ids)
}
/// Deletes the VPC peering connection `peering_id`.
pub async fn delete_vpc_peering(client: &Ec2Client, peering_id: &str) -> Result<(), Ec2Error> {
    let request = client
        .delete_vpc_peering_connection()
        .vpc_peering_connection_id(peering_id);
    request.send().await?;
    Ok(())
}
/// Polls until peering connection `peer_id` is reported `deleted` or no
/// longer appears in the describe response.
///
/// # Errors
/// Propagates `DescribeVpcPeeringConnections` failures.
pub async fn wait_for_vpc_peering_deletion(
    ec2_client: &Ec2Client,
    peer_id: &str,
) -> Result<(), Ec2Error> {
    loop {
        let resp = ec2_client
            .describe_vpc_peering_connections()
            .vpc_peering_connection_ids(peer_id)
            .send()
            .await?;
        let connections = resp.vpc_peering_connections.unwrap_or_default();
        // An absent connection counts as deleted. For a present one, read the
        // status defensively instead of unwrapping (it can be transiently
        // missing), treating a missing status as "not deleted yet".
        let deleted = match connections.first() {
            None => true,
            Some(connection) => {
                connection.status.as_ref().and_then(|s| s.code.as_ref())
                    == Some(&VpcPeeringConnectionStateReasonCode::Deleted)
            }
        };
        if deleted {
            return Ok(());
        }
        sleep(RETRY_INTERVAL).await;
    }
}
/// Lists ids of instances tagged `deployer=<tag>` (in any state).
pub async fn find_instances_by_tag(
    ec2_client: &Ec2Client,
    tag: &str,
) -> Result<Vec<String>, Ec2Error> {
    let filter = Filter::builder().name("tag:deployer").values(tag).build();
    let resp = ec2_client
        .describe_instances()
        .filters(filter)
        .send()
        .await?;
    let mut ids = Vec::new();
    for reservation in resp.reservations.unwrap_or_default() {
        for instance in reservation.instances.unwrap_or_default() {
            // Instance id is always present on described instances.
            ids.push(instance.instance_id.unwrap());
        }
    }
    Ok(ids)
}
/// Terminates every instance in `instance_ids`; a no-op for an empty slice
/// (EC2 rejects a TerminateInstances call with no ids).
pub async fn terminate_instances(
    ec2_client: &Ec2Client,
    instance_ids: &[String],
) -> Result<(), Ec2Error> {
    if !instance_ids.is_empty() {
        ec2_client
            .terminate_instances()
            .set_instance_ids(Some(instance_ids.to_vec()))
            .send()
            .await?;
    }
    Ok(())
}
/// Polls until every instance in `instance_ids` reports the `terminated`
/// state.
///
/// NOTE(review): when the describe response contains no instances (e.g. after
/// ids age out post-termination), `all` is vacuously true and this returns Ok
/// — presumably intended as "already gone = terminated"; confirm. Unlike the
/// other wait loops here, describe failures are propagated via `?` rather
/// than retried.
///
/// # Errors
/// Propagates `DescribeInstances` failures.
pub async fn wait_for_instances_terminated(
    ec2_client: &Ec2Client,
    instance_ids: &[String],
) -> Result<(), Ec2Error> {
    loop {
        let resp = ec2_client
            .describe_instances()
            .set_instance_ids(Some(instance_ids.to_vec()))
            .send()
            .await?;
        // Flatten reservations into one list of instances.
        let instances = resp
            .reservations
            .unwrap_or_default()
            .into_iter()
            .flat_map(|r| r.instances.unwrap_or_default())
            .collect::<Vec<_>>();
        if instances.iter().all(|i| {
            i.state.as_ref().unwrap().name.as_ref().unwrap() == &InstanceStateName::Terminated
        }) {
            return Ok(());
        }
        sleep(RETRY_INTERVAL).await;
    }
}
/// Lists security groups tagged `deployer=<tag>`.
///
/// # Errors
/// Propagates `DescribeSecurityGroups` failures.
pub async fn find_security_groups_by_tag(
    ec2_client: &Ec2Client,
    tag: &str,
) -> Result<Vec<SecurityGroup>, Ec2Error> {
    let resp = ec2_client
        .describe_security_groups()
        .filters(Filter::builder().name("tag:deployer").values(tag).build())
        .send()
        .await?;
    // The response already holds a Vec; `.into_iter().collect()` was a no-op.
    Ok(resp.security_groups.unwrap_or_default())
}
/// Deletes security group `sg_id`.
pub async fn delete_security_group(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
    let request = ec2_client.delete_security_group().group_id(sg_id);
    request.send().await?;
    Ok(())
}
/// Lists ids of route tables tagged `deployer=<tag>`.
pub async fn find_route_tables_by_tag(
    ec2_client: &Ec2Client,
    tag: &str,
) -> Result<Vec<String>, Ec2Error> {
    let filter = Filter::builder().name("tag:deployer").values(tag).build();
    let resp = ec2_client
        .describe_route_tables()
        .filters(filter)
        .send()
        .await?;
    let mut ids = Vec::new();
    for table in resp.route_tables.unwrap_or_default() {
        // Route table id is always present on described tables.
        ids.push(table.route_table_id.unwrap());
    }
    Ok(ids)
}
/// Deletes route table `rt_id`.
pub async fn delete_route_table(ec2_client: &Ec2Client, rt_id: &str) -> Result<(), Ec2Error> {
    let request = ec2_client.delete_route_table().route_table_id(rt_id);
    request.send().await?;
    Ok(())
}
/// Lists ids of internet gateways tagged `deployer=<tag>`.
pub async fn find_igws_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
    let filter = Filter::builder().name("tag:deployer").values(tag).build();
    let resp = ec2_client
        .describe_internet_gateways()
        .filters(filter)
        .send()
        .await?;
    let mut ids = Vec::new();
    for gateway in resp.internet_gateways.unwrap_or_default() {
        // Gateway id is always present on described gateways.
        ids.push(gateway.internet_gateway_id.unwrap());
    }
    Ok(ids)
}
/// Returns the id of the VPC that internet gateway `igw_id` is attached to,
/// or `None` when the gateway is unknown or detached.
pub async fn find_vpc_by_igw(
    ec2_client: &Ec2Client,
    igw_id: &str,
) -> Result<Option<String>, Ec2Error> {
    let resp = ec2_client
        .describe_internet_gateways()
        .internet_gateway_ids(igw_id)
        .send()
        .await?;
    let gateways = resp.internet_gateways.unwrap_or_default();
    let Some(gateway) = gateways.into_iter().next() else {
        return Ok(None);
    };
    let Some(attachments) = gateway.attachments else {
        return Ok(None);
    };
    // An IGW has at most one attachment; take the first.
    Ok(attachments.into_iter().next().and_then(|a| a.vpc_id))
}
/// Returns the names of all regions enabled for this account (opted-in or
/// not requiring opt-in).
pub async fn get_enabled_regions(ec2_client: &Ec2Client) -> Result<HashSet<String>, Ec2Error> {
    // Multiple values on one filter act as an OR over opt-in statuses.
    let filter = Filter::builder()
        .name("opt-in-status")
        .values("opt-in-not-required")
        .values("opted-in")
        .build();
    let resp = ec2_client
        .describe_regions()
        .all_regions(true)
        .filters(filter)
        .send()
        .await?;
    let mut names = HashSet::new();
    for region in resp.regions.unwrap_or_default() {
        if let Some(name) = region.region_name {
            names.insert(name);
        }
    }
    Ok(names)
}
/// Detaches internet gateway `igw_id` from VPC `vpc_id`.
pub async fn detach_igw(
    ec2_client: &Ec2Client,
    igw_id: &str,
    vpc_id: &str,
) -> Result<(), Ec2Error> {
    let request = ec2_client
        .detach_internet_gateway()
        .internet_gateway_id(igw_id)
        .vpc_id(vpc_id);
    request.send().await?;
    Ok(())
}
/// Deletes internet gateway `igw_id` (must already be detached).
pub async fn delete_igw(ec2_client: &Ec2Client, igw_id: &str) -> Result<(), Ec2Error> {
    let request = ec2_client
        .delete_internet_gateway()
        .internet_gateway_id(igw_id);
    request.send().await?;
    Ok(())
}
/// Lists ids of subnets tagged `deployer=<tag>`.
pub async fn find_subnets_by_tag(
    ec2_client: &Ec2Client,
    tag: &str,
) -> Result<Vec<String>, Ec2Error> {
    let filter = Filter::builder().name("tag:deployer").values(tag).build();
    let resp = ec2_client
        .describe_subnets()
        .filters(filter)
        .send()
        .await?;
    let mut ids = Vec::new();
    for subnet in resp.subnets.unwrap_or_default() {
        // Subnet id is always present on described subnets.
        ids.push(subnet.subnet_id.unwrap());
    }
    Ok(ids)
}
/// Deletes subnet `subnet_id`.
pub async fn delete_subnet(ec2_client: &Ec2Client, subnet_id: &str) -> Result<(), Ec2Error> {
    let request = ec2_client.delete_subnet().subnet_id(subnet_id);
    request.send().await?;
    Ok(())
}
/// Lists ids of VPCs tagged `deployer=<tag>`.
pub async fn find_vpcs_by_tag(ec2_client: &Ec2Client, tag: &str) -> Result<Vec<String>, Ec2Error> {
    let filter = Filter::builder().name("tag:deployer").values(tag).build();
    let resp = ec2_client.describe_vpcs().filters(filter).send().await?;
    let mut ids = Vec::new();
    for vpc in resp.vpcs.unwrap_or_default() {
        // VPC id is always present on described VPCs.
        ids.push(vpc.vpc_id.unwrap());
    }
    Ok(ids)
}
/// Deletes VPC `vpc_id` (its dependent resources must already be gone).
pub async fn delete_vpc(ec2_client: &Ec2Client, vpc_id: &str) -> Result<(), Ec2Error> {
    let request = ec2_client.delete_vpc().vpc_id(vpc_id);
    request.send().await?;
    Ok(())
}
/// Maps each availability zone to the subset of `instance_types` it offers,
/// via `DescribeInstanceTypeOfferings`.
///
/// # Errors
/// Returns an error when no AZ offers any of the requested types, or when the
/// API call fails.
pub async fn find_az_instance_support(
    client: &Ec2Client,
    instance_types: &[String],
) -> Result<BTreeMap<String, BTreeSet<String>>, Ec2Error> {
    let type_filter = Filter::builder()
        .name("instance-type")
        .set_values(Some(instance_types.to_vec()))
        .build();
    let resp = client
        .describe_instance_type_offerings()
        .location_type("availability-zone".into())
        .filters(type_filter)
        .send()
        .await?;
    let mut support: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
    for offering in resp.instance_type_offerings.unwrap_or_default() {
        // Skip offerings missing either field rather than failing outright.
        let (Some(az), Some(instance_type)) = (offering.location, offering.instance_type) else {
            continue;
        };
        support.entry(az).or_default().insert(instance_type.to_string());
    }
    if support.is_empty() {
        return Err(Ec2Error::from(BuildError::other(format!(
            "no availability zone supports any of: {instance_types:?}"
        ))));
    }
    Ok(support)
}
/// Polls until no network interfaces reference security group `sg_id`
/// (they linger briefly after instance termination and block SG deletion).
///
/// # Errors
/// Propagates `DescribeNetworkInterfaces` failures.
pub async fn wait_for_enis_deleted(ec2_client: &Ec2Client, sg_id: &str) -> Result<(), Ec2Error> {
    loop {
        let filter = Filter::builder().name("group-id").values(sg_id).build();
        let resp = ec2_client
            .describe_network_interfaces()
            .filters(filter)
            .send()
            .await?;
        let remaining = resp.network_interfaces.unwrap_or_default();
        if remaining.is_empty() {
            return Ok(());
        }
        sleep(RETRY_INTERVAL).await;
    }
}