1use crate::aws::{
4 deployer_directory,
5 ec2::{self, *},
6 s3::{
7 self, delete_prefix, get_bucket_name, is_no_such_bucket_error, Region, DEPLOYMENTS_PREFIX,
8 },
9 Config, Error, Metadata, DESTROYED_FILE_NAME, LOGS_PORT, METADATA_FILE_NAME, MONITORING_REGION,
10 PROFILES_PORT, TRACES_PORT,
11};
12use futures::future::try_join_all;
13use std::{
14 collections::{HashMap, HashSet},
15 fs::File,
16 path::PathBuf,
17};
18use tracing::{info, warn};
19
20pub async fn destroy(config: Option<&PathBuf>, tag: Option<&str>) -> Result<(), Error> {
22 let (tag, all_regions) = if let Some(config_path) = config {
24 let config: Config = {
25 let config_file = File::open(config_path)?;
26 serde_yaml::from_reader(config_file)?
27 };
28 let mut regions = HashSet::new();
29 regions.insert(MONITORING_REGION.to_string());
30 for instance in &config.instances {
31 regions.insert(instance.region.clone());
32 }
33 (config.tag, regions)
34 } else if let Some(tag) = tag {
35 let tag_directory = deployer_directory(Some(tag));
36 let metadata_path = tag_directory.join(METADATA_FILE_NAME);
37 if !metadata_path.exists() {
38 return Err(Error::MetadataNotFound(tag.to_string()));
39 }
40 let metadata: Metadata = {
41 let file = File::open(&metadata_path)?;
42 serde_yaml::from_reader(file)?
43 };
44 (tag.to_string(), metadata.regions.into_iter().collect())
45 } else {
46 return Err(Error::MissingTagOrConfig);
47 };
48 info!(tag = tag.as_str(), "loaded configuration");
49
50 let tag_directory = deployer_directory(Some(&tag));
52 if !tag_directory.exists() {
53 return Err(Error::DeploymentDoesNotExist(tag.clone()));
54 }
55
56 let destroyed_file = tag_directory.join(DESTROYED_FILE_NAME);
58 if destroyed_file.exists() {
59 warn!("infrastructure already destroyed");
60 return Ok(());
61 }
62
63 let bucket_name = get_bucket_name();
65 info!(
66 bucket = bucket_name.as_str(),
67 "cleaning up S3 deployment data"
68 );
69 let s3_client = s3::create_client(Region::new(MONITORING_REGION)).await;
70 let deployment_prefix = format!("{}/{}/", DEPLOYMENTS_PREFIX, &tag);
71 match delete_prefix(&s3_client, &bucket_name, &deployment_prefix).await {
72 Ok(()) => {
73 info!(
74 bucket = bucket_name.as_str(),
75 prefix = deployment_prefix.as_str(),
76 "deleted S3 deployment data"
77 );
78 }
79 Err(e) => {
80 if is_no_such_bucket_error(&e) {
81 info!(
82 bucket = bucket_name.as_str(),
83 "bucket does not exist, skipping S3 cleanup"
84 );
85 } else {
86 warn!(bucket = bucket_name.as_str(), %e, "failed to delete S3 deployment data, continuing with destroy");
87 }
88 }
89 }
90
91 info!(regions=?all_regions, "removing resources");
93 let jobs = all_regions.iter().map(|region| {
94 let region = region.clone();
95 let tag = tag.clone();
96 async move {
97 let ec2_client = ec2::create_client(Region::new(region.clone())).await;
98 info!(region = region.as_str(), "created EC2 client");
99
100 let instance_ids = find_instances_by_tag(&ec2_client, &tag).await?;
101 if !instance_ids.is_empty() {
102 terminate_instances(&ec2_client, &instance_ids).await?;
103 wait_for_instances_terminated(&ec2_client, &instance_ids).await?;
104 info!(
105 region = region.as_str(),
106 ?instance_ids,
107 "terminated instances"
108 );
109 }
110
111 let security_groups = find_security_groups_by_tag(&ec2_client, &tag).await?;
113 let has_monitoring_sg = security_groups
114 .iter()
115 .any(|sg| sg.group_name() == Some(&tag));
116 let has_binary_sg = security_groups
117 .iter()
118 .any(|sg| sg.group_name() == Some(&format!("{tag}-binary")));
119 if region == MONITORING_REGION && has_monitoring_sg && has_binary_sg {
120 let monitoring_sg = security_groups
122 .iter()
123 .find(|sg| sg.group_name() == Some(&tag))
124 .expect("Monitoring security group not found")
125 .group_id()
126 .unwrap();
127
128 let binary_sg = security_groups
130 .iter()
131 .find(|sg| sg.group_name() == Some(&format!("{tag}-binary")))
132 .expect("Regular security group not found")
133 .group_id()
134 .unwrap();
135
136 let logging_permission = IpPermission::builder()
138 .ip_protocol("tcp")
139 .from_port(LOGS_PORT as i32)
140 .to_port(LOGS_PORT as i32)
141 .user_id_group_pairs(UserIdGroupPair::builder().group_id(binary_sg).build())
142 .build();
143 if let Err(e) = ec2_client
144 .revoke_security_group_ingress()
145 .group_id(monitoring_sg)
146 .ip_permissions(logging_permission)
147 .send()
148 .await
149 {
150 warn!(%e, "failed to revoke logs ingress rule between monitoring and binary security groups");
151 } else {
152 info!(
153 monitoring_sg,
154 binary_sg,
155 "revoked logs ingress rule between monitoring and binary security groups"
156 );
157 }
158
159 let profiling_permission = IpPermission::builder()
161 .ip_protocol("tcp")
162 .from_port(PROFILES_PORT as i32)
163 .to_port(PROFILES_PORT as i32)
164 .user_id_group_pairs(UserIdGroupPair::builder().group_id(binary_sg).build())
165 .build();
166 if let Err(e) = ec2_client
167 .revoke_security_group_ingress()
168 .group_id(monitoring_sg)
169 .ip_permissions(profiling_permission)
170 .send()
171 .await
172 {
173 warn!(%e, "failed to revoke profiles ingress rule between monitoring and binary security groups");
174 } else {
175 info!(
176 monitoring_sg,
177 binary_sg,
178 "revoked profiles ingress rule between monitoring and binary security groups"
179 );
180 }
181
182 let tracing_permission = IpPermission::builder()
184 .ip_protocol("tcp")
185 .from_port(TRACES_PORT as i32)
186 .to_port(TRACES_PORT as i32)
187 .user_id_group_pairs(UserIdGroupPair::builder().group_id(binary_sg).build())
188 .build();
189 if let Err(e) = ec2_client
190 .revoke_security_group_ingress()
191 .group_id(monitoring_sg)
192 .ip_permissions(tracing_permission)
193 .send()
194 .await
195 {
196 warn!(%e, "failed to revoke traces ingress rule between monitoring and binary security groups");
197 } else {
198 info!(
199 monitoring_sg,
200 binary_sg,
201 "revoked traces ingress rule between monitoring and binary security groups"
202 );
203 }
204 }
205
206 let sgs = find_security_groups_by_tag(&ec2_client, &tag).await?;
208 for sg in sgs {
209 let sg_id = sg.group_id().unwrap();
210 wait_for_enis_deleted(&ec2_client, sg_id).await?;
211 info!(
212 region = region.as_str(),
213 sg_id, "ENIs deleted from security group"
214 );
215 delete_security_group(&ec2_client, sg_id).await?;
216 info!(region = region.as_str(), sg_id, "deleted security group");
217 }
218
219 let subnet_ids = find_subnets_by_tag(&ec2_client, &tag).await?;
220 for subnet_id in subnet_ids {
221 delete_subnet(&ec2_client, &subnet_id).await?;
222 info!(region = region.as_str(), subnet_id, "deleted subnet");
223 }
224
225 let route_table_ids = find_route_tables_by_tag(&ec2_client, &tag).await?;
226 for rt_id in route_table_ids {
227 delete_route_table(&ec2_client, &rt_id).await?;
228 info!(region = region.as_str(), rt_id, "deleted route table");
229 }
230
231 let peering_ids = find_vpc_peering_by_tag(&ec2_client, &tag).await?;
232 for peering_id in peering_ids {
233 delete_vpc_peering(&ec2_client, &peering_id).await?;
234 wait_for_vpc_peering_deletion(&ec2_client, &peering_id).await?;
235 info!(
236 region = region.as_str(),
237 peering_id, "deleted VPC peering connection"
238 );
239 }
240
241 let igw_ids = find_igws_by_tag(&ec2_client, &tag).await?;
242 for igw_id in igw_ids {
243 if let Some(vpc_id) = find_vpc_by_igw(&ec2_client, &igw_id).await? {
244 detach_igw(&ec2_client, &igw_id, &vpc_id).await?;
245 info!(
246 region = region.as_str(),
247 igw_id, vpc_id, "detached internet gateway"
248 );
249 }
250 delete_igw(&ec2_client, &igw_id).await?;
251 info!(region = region.as_str(), igw_id, "deleted internet gateway");
252 }
253
254 let key_name = format!("deployer-{tag}");
255 delete_key_pair(&ec2_client, &key_name).await?;
256 info!(region = region.as_str(), key_name, "deleted key pair");
257 Ok::<_, Error>((region, ec2_client))
258 }
259 });
260 let ec2_clients: HashMap<String, Ec2Client> = try_join_all(jobs).await?.into_iter().collect();
261
262 let vpc_jobs = ec2_clients.into_iter().map(|(region, ec2_client)| {
264 let tag = tag.clone();
265 async move {
266 let vpc_ids = find_vpcs_by_tag(&ec2_client, &tag).await?;
267 for vpc_id in vpc_ids {
268 delete_vpc(&ec2_client, &vpc_id).await?;
269 info!(region = region.as_str(), vpc_id, "deleted VPC");
270 }
271 Ok::<(), Error>(())
272 }
273 });
274 try_join_all(vpc_jobs).await?;
275 info!(regions = ?all_regions, "resources removed");
276
277 File::create(destroyed_file)?;
279
280 info!(tag = tag.as_str(), "destruction complete");
282 Ok(())
283}