1use super::tags::Tags;
2use crate::CleanupResources;
3use crate::ec2_instance::{Ec2Instance, NetworkInterface};
4use crate::ec2_instance_definition::{Ec2InstanceDefinition, InstanceOs};
5use crate::{AwsBuilder, IngressRestriction};
6use anyhow::anyhow;
7use aws_config::meta::region::RegionProviderChain;
8use aws_config::retry::ProvideErrorKind;
9use aws_config::{BehaviorVersion, SdkConfig};
10use aws_sdk_ec2::config::Region;
11use aws_sdk_ec2::types::PlacementStrategy;
12use aws_sdk_ec2::types::{
13 BlockDeviceMapping, EbsBlockDevice, Filter, InstanceNetworkInterfaceSpecification, KeyType,
14 Placement, ResourceType, Subnet, VolumeType,
15};
16use base64::Engine;
17use futures::StreamExt;
18use futures::stream::FuturesUnordered;
19use ssh_key::PrivateKey;
20use ssh_key::rand_core::OsRng;
21use std::fmt::Write;
22use std::future::Future;
23use std::pin::Pin;
24use std::time::{Duration, Instant};
25use uuid::Uuid;
26
27async fn config() -> SdkConfig {
28 let region_provider = RegionProviderChain::first_try(Region::new("us-east-1"));
29 aws_config::defaults(BehaviorVersion::latest())
30 .region(region_provider)
31 .load()
32 .await
33}
34
35pub struct Aws {
37 client: aws_sdk_ec2::Client,
38 az_name: String,
39 keyname: String,
40 client_private_key: String,
41 host_public_key: String,
42 host_public_key_bytes: Vec<u8>,
43 host_private_key: String,
44 security_group_id: String,
45 placement_group_name: String,
46 subnet_id: String,
47 subnet_map_public_ip_on_launch: bool,
48 use_public_addresses: bool,
49 tags: Tags,
50}
51
52impl Aws {
53 pub(crate) async fn new(builder: AwsBuilder) -> Aws {
54 let config = config().await;
55 let user_name = super::iam::user_name(&config).await;
56 let keyname = format!("aws-throwaway-{user_name}-{}", Uuid::new_v4());
57 let security_group_name = format!("aws-throwaway-{user_name}-{}", Uuid::new_v4());
58 let placement_group_name = format!("aws-throwaway-{user_name}-{}", Uuid::new_v4());
59 let az_name = builder.az_name.unwrap_or_else(|| "us-east-1c".into());
60 let client = aws_sdk_ec2::Client::new(&config);
61
62 let tags = Tags {
63 user_name: user_name.clone(),
64 cleanup: builder.cleanup,
65 };
66
67 Aws::cleanup_resources_inner(&client, &tags).await;
69
70 let (client_private_key, security_group_id, _, subnet) = tokio::join!(
71 Aws::create_key_pair(&client, &tags, &keyname),
72 Aws::create_security_group(
73 &client,
74 &tags,
75 &security_group_name,
76 &builder.vpc_id,
77 builder.security_group_id,
78 &builder.expose_ports_to_internet,
79 builder.ingress_restriction,
80 ),
81 Aws::create_placement_group(
82 &client,
83 &tags,
84 &placement_group_name,
85 builder.placement_strategy
86 ),
87 Aws::get_subnet(&client, builder.subnet_id, az_name.clone())
88 );
89
90 let subnet_id = subnet.subnet_id.unwrap();
91 let subnet_map_public_ip_on_launch = subnet.map_public_ip_on_launch.unwrap();
92
93 let key = PrivateKey::random(&mut OsRng {}, ssh_key::Algorithm::Ed25519).unwrap();
94 let host_public_key_bytes = key.public_key().to_bytes().unwrap();
95 let host_public_key = key.public_key().to_openssh().unwrap();
96 let host_private_key = key.to_openssh(ssh_key::LineEnding::LF).unwrap().to_string();
97
98 let use_public_addresses = builder.use_public_addresses;
99
100 Aws {
101 use_public_addresses,
102 client,
103 az_name,
104 keyname,
105 client_private_key,
106 host_public_key_bytes,
107 host_public_key,
108 host_private_key,
109 security_group_id,
110 placement_group_name,
111 subnet_id,
112 subnet_map_public_ip_on_launch,
113 tags,
114 }
115 }
116
117 pub fn builder(cleanup: CleanupResources) -> AwsBuilder {
122 AwsBuilder::new(cleanup)
123 }
124
125 async fn create_key_pair(client: &aws_sdk_ec2::Client, tags: &Tags, name: &str) -> String {
126 let keypair = client
127 .create_key_pair()
128 .key_name(name)
129 .key_type(KeyType::Ed25519)
130 .tag_specifications(tags.create_tags(ResourceType::KeyPair, "aws-throwaway"))
131 .send()
132 .await
133 .map_err(|e| e.into_service_error())
134 .unwrap();
135 let client_private_key = keypair.key_material().unwrap().to_string();
136 tracing::info!("client_private_key:\n{}", client_private_key);
137 client_private_key
138 }
139
140 async fn create_security_group(
141 client: &aws_sdk_ec2::Client,
142 tags: &Tags,
143 name: &str,
144 vpc_id: &Option<String>,
145 security_group_id: Option<String>,
146 ports: &[u16],
147 ingress_restriction: IngressRestriction,
148 ) -> String {
149 match security_group_id {
150 Some(id) => id,
151 None => {
152 let (security_group, cidr_ip) = tokio::join!(
153 client
154 .create_security_group()
155 .group_name(name)
156 .set_vpc_id(vpc_id.clone())
157 .description("aws-throwaway security group")
158 .tag_specifications(
159 tags.create_tags(ResourceType::SecurityGroup, "aws-throwaway"),
160 )
161 .send(),
162 ingress_restriction.cidr_ip()
163 );
164 let security_group_id = security_group
165 .map_err(|e| e.into_service_error())
166 .unwrap()
167 .group_id
168 .unwrap();
169 tracing::info!("created security group");
170
171 let mut futures =
172 FuturesUnordered::<Pin<Box<dyn Future<Output = ()> + Send>>>::new();
173 futures.push(Box::pin(Aws::create_ingress_rule_internal(
174 client, tags, name,
175 )));
176
177 if !ports.contains(&22) {
179 futures.push(Box::pin(Aws::create_ingress_rule_for_port(
180 client, tags, name, &cidr_ip, 22,
181 )));
182 }
183 for port in ports {
184 futures.push(Box::pin(Aws::create_ingress_rule_for_port(
185 client, tags, name, &cidr_ip, *port,
186 )));
187 }
188 while futures.next().await.is_some() {}
189 security_group_id
190 }
191 }
192 }
193
194 async fn create_ingress_rule_internal(
195 client: &aws_sdk_ec2::Client,
196 tags: &Tags,
197 group_name: &str,
198 ) {
199 assert!(
200 client
201 .authorize_security_group_ingress()
202 .group_name(group_name)
203 .source_security_group_name(group_name)
204 .tag_specifications(
205 tags.create_tags(ResourceType::SecurityGroupRule, "within aws-throwaway SG")
206 )
207 .send()
208 .await
209 .map_err(|e| e.into_service_error())
210 .unwrap()
211 .r#return()
212 .unwrap()
213 );
214 tracing::info!("created security group rule - internal");
215 }
216
217 async fn create_ingress_rule_for_port(
218 client: &aws_sdk_ec2::Client,
219 tags: &Tags,
220 group_name: &str,
221 cidr_ip: &str,
222 port: u16,
223 ) {
224 let port = port.to_string();
225 assert!(
226 client
227 .authorize_security_group_ingress()
228 .group_name(group_name)
229 .ip_protocol("tcp")
230 .from_port(22)
231 .to_port(22)
232 .cidr_ip(cidr_ip)
233 .tag_specifications(
234 tags.create_tags(ResourceType::SecurityGroupRule, &format!("port {port}"))
235 )
236 .send()
237 .await
238 .map_err(|e| e.into_service_error())
239 .unwrap()
240 .r#return()
241 .unwrap()
242 );
243 tracing::info!("created security group rule - port {port}");
244 }
245
246 async fn create_placement_group(
247 client: &aws_sdk_ec2::Client,
248 tags: &Tags,
249 name: &str,
250 strategy: PlacementStrategy,
251 ) {
252 client
253 .create_placement_group()
254 .group_name(name)
255 .strategy(strategy)
256 .tag_specifications(tags.create_tags(ResourceType::PlacementGroup, "aws-throwaway"))
257 .send()
258 .await
259 .map_err(|e| e.into_service_error())
260 .unwrap();
261 tracing::info!("created placement group");
262 }
263
264 async fn get_subnet(
265 client: &aws_sdk_ec2::Client,
266 subnet_id: Option<String>,
267 az_name: String,
268 ) -> Subnet {
269 match &subnet_id {
270 Some(subnet_id) => client.describe_subnets().filters(
271 Filter::builder()
272 .name("subnet-id")
273 .values(subnet_id)
274 .build(),
275 ),
276 None => client
277 .describe_subnets()
278 .filters(
279 Filter::builder()
280 .name("default-for-az")
281 .values("true")
282 .build(),
283 )
284 .filters(
285 Filter::builder()
286 .name("availability-zone")
287 .values(&az_name)
288 .build(),
289 ),
290 }
291 .send()
292 .await
293 .map_err(|e| e.into_service_error())
294 .unwrap()
295 .subnets
296 .unwrap()
297 .pop()
298 .unwrap_or_else(|| match subnet_id {
299 Some(subnet) => panic!("Subnet {subnet} could not be found"),
300 None => panic!("No default subnet configured for {az_name}"),
301 })
302 }
303
304 pub async fn cleanup_resources(&self) {
307 Self::cleanup_resources_inner(&self.client, &self.tags).await
308 }
309
310 pub async fn cleanup_resources_static(cleanup: CleanupResources) {
312 let config = config().await;
313 let user_name = super::iam::user_name(&config).await;
314 let client = aws_sdk_ec2::Client::new(&config);
315 let tags = Tags { user_name, cleanup };
316 Aws::cleanup_resources_inner(&client, &tags).await;
317 }
318
319 async fn get_all_throwaway_tags(
320 client: &aws_sdk_ec2::Client,
321 tags: &Tags,
322 resource_type: &str,
323 ) -> Vec<String> {
324 let (user_tags, app_tags) = tokio::join!(
325 tags.fetch_user_tags(client, resource_type),
326 tags.fetch_app_tags(client, resource_type),
327 );
328
329 let mut ids_of_user = vec![];
330 for tag in user_tags.tags() {
331 if let Some(id) = tag.resource_id() {
332 ids_of_user.push(id.to_owned());
333 }
334 }
335
336 if let Some(app_tags) = app_tags {
337 let mut ids_of_user_and_app = vec![];
338 for app_tag in app_tags.tags() {
339 if let Some(id) = app_tag.resource_id() {
340 let id = id.to_owned();
341 if ids_of_user.contains(&id) {
342 ids_of_user_and_app.push(id);
343 }
344 }
345 }
346 ids_of_user_and_app
347 } else {
348 ids_of_user
349 }
350 }
351
352 async fn cleanup_resources_inner(client: &aws_sdk_ec2::Client, tags: &Tags) {
353 for id in Self::get_all_throwaway_tags(client, tags, "elastic-ip").await {
355 client
356 .release_address()
357 .allocation_id(&id)
358 .send()
359 .await
360 .map_err(|e| {
361 anyhow::anyhow!(e.into_service_error())
362 .context(format!("Failed to release elastic ip {id:?}"))
363 })
364 .unwrap();
365 tracing::info!("elastic ip {id:?} was succesfully deleted");
366 }
367
368 tracing::info!("Terminating instances");
369 let instance_ids = Self::get_all_throwaway_tags(client, tags, "instance").await;
370 Self::terminate_instances(client, instance_ids).await;
371
372 tokio::join!(
373 Aws::delete_security_groups(client, tags),
374 Aws::delete_placement_groups(client, tags),
375 Aws::delete_keypairs(client, tags),
376 );
377 }
378
379 pub(crate) async fn terminate_instances(
380 client: &aws_sdk_ec2::Client,
381 instance_ids: Vec<String>,
382 ) {
383 if !instance_ids.is_empty() {
384 for result in client
385 .terminate_instances()
386 .set_instance_ids(Some(instance_ids))
387 .send()
388 .await
389 .map_err(|e| e.into_service_error())
390 .unwrap()
391 .terminating_instances()
392 {
393 tracing::info!(
394 "Instance {:?} {:?} -> {:?}",
395 result.instance_id.as_ref().unwrap(),
396 result.previous_state().unwrap().name().unwrap(),
397 result.current_state().unwrap().name().unwrap()
398 );
399 }
400 }
401 }
402
403 pub(crate) async fn terminate_instance(&self, instance: Ec2Instance) {
404 Self::terminate_instances(&self.client, vec![instance.aws_id]).await
405 }
406
407 async fn delete_security_groups(client: &aws_sdk_ec2::Client, tags: &Tags) {
408 for id in Self::get_all_throwaway_tags(client, tags, "security-group").await {
409 if let Err(err) = client.delete_security_group().group_id(&id).send().await {
410 tracing::info!(
411 "security group {id:?} could not be deleted, this will get cleaned up eventually on a future aws-throwaway cleanup: {:?}",
412 err.into_service_error().meta().message()
413 )
414 } else {
415 tracing::info!("security group {id:?} was succesfully deleted")
416 }
417 }
418 }
419
420 async fn delete_placement_groups(client: &aws_sdk_ec2::Client, tags: &Tags) {
421 let placement_group_ids =
422 Self::get_all_throwaway_tags(client, tags, "placement-group").await;
423 if !placement_group_ids.is_empty() {
424 let placement_groups = client
426 .describe_placement_groups()
427 .set_group_ids(Some(placement_group_ids))
428 .send()
429 .await
430 .map_err(|e| e.into_service_error())
431 .unwrap();
432 for placement_group in placement_groups.placement_groups() {
433 let name = placement_group.group_name().unwrap();
434 if let Err(err) = client
435 .delete_placement_group()
436 .group_name(name)
437 .send()
438 .await
439 {
440 tracing::info!(
441 "placement group {name:?} could not be deleted, this will get cleaned up eventually on a future aws-throwaway cleanup: {:?}",
442 err.into_service_error().meta().message()
443 )
444 } else {
445 tracing::info!("placement group {name:?} was succesfully deleted")
446 }
447 }
448 }
449 }
450
451 async fn delete_keypairs(client: &aws_sdk_ec2::Client, tags: &Tags) {
452 for id in Self::get_all_throwaway_tags(client, tags, "key-pair").await {
453 if let Err(err) = client.delete_key_pair().key_pair_id(&id).send().await {
454 let err = err.into_service_error();
455 if err.code() == Some("UnauthorizedOperation") {
456 tracing::error!("{:?}", anyhow!(err).context(format!(
457 "Did not have permissions to delete keypair {id:?}, skipping all other keypairs since they will also fail."
458 )));
459 return;
460 } else {
461 panic!("Failed to delete keypair {id:?}: {err:?}")
462 }
463 } else {
464 tracing::info!("keypair {id:?} was succesfully deleted");
465 }
466 }
467 }
468
469 pub async fn create_ec2_instance(&self, definition: Ec2InstanceDefinition) -> Ec2Instance {
471 let elastic_ip = if self.use_public_addresses && definition.network_interface_count > 1 {
473 Some(
474 self.client
475 .allocate_address()
476 .tag_specifications(
477 self.tags
478 .create_tags(ResourceType::ElasticIp, "aws-throwaway"),
479 )
480 .send()
481 .await
482 .map_err(|e| e.into_service_error())
483 .unwrap(),
484 )
485 } else {
486 None
487 };
488
489 let security_group_ids = if definition.network_interface_count == 1 {
491 Some(vec![self.security_group_id.clone()])
492 } else {
493 None
494 };
495
496 let mut bring_down_secondary_interfaces = String::new();
498 for i in 1..definition.network_interface_count {
499 writeln!(
500 bring_down_secondary_interfaces,
501 "sudo ip link set dev ens{} down",
502 5 + i
503 )
504 .unwrap();
505 }
506
507 let ubuntu_version = match definition.os {
508 InstanceOs::Ubuntu20_04 => "20.04",
509 InstanceOs::Ubuntu22_04 => "22.04",
510 };
511 let image_id = definition.ami.unwrap_or_else(|| format!(
512 "resolve:ssm:/aws/service/canonical/ubuntu/server/{}/stable/current/{}/hvm/ebs-gp2/ami-id",
513 ubuntu_version,
514 super::cpu_arch::get_arch_of_instance_type(definition.instance_type.clone()).get_ubuntu_arch_identifier()
515 ));
516 let result = self
517 .client
518 .run_instances()
519 .instance_type(definition.instance_type)
520 .set_placement(Some(
521 Placement::builder()
522 .group_name(&self.placement_group_name)
523 .availability_zone(&self.az_name)
524 .build(),
525 ))
526 .set_subnet_id(if definition.network_interface_count == 1 {
527 Some(self.subnet_id.to_owned())
528 } else {
529 None
530 })
531 .min_count(1)
532 .max_count(1)
533 .block_device_mappings(
534 BlockDeviceMapping::builder()
535 .device_name("/dev/sda1")
536 .ebs(
537 EbsBlockDevice::builder()
538 .delete_on_termination(true)
539 .volume_size(definition.volume_size_gb as i32)
540 .volume_type(VolumeType::Gp2)
541 .encrypted(true)
542 .build(),
543 )
544 .build(),
545 )
546 .set_security_group_ids(security_group_ids)
547 .set_network_interfaces(if definition.network_interface_count == 1 {
548 None
549 } else {
550 Some(
551 (0..definition.network_interface_count)
552 .map(|i| {
553 InstanceNetworkInterfaceSpecification::builder()
554 .delete_on_termination(true)
555 .device_index(i as i32)
556 .groups(&self.security_group_id)
557 .associate_public_ip_address(false)
559 .subnet_id(&self.subnet_id)
560 .description(i.to_string())
561 .build()
562 })
563 .collect(),
564 )
565 })
566 .key_name(&self.keyname)
567 .user_data(base64::engine::general_purpose::STANDARD.encode(format!(
568 r#"#!/bin/bash
569{bring_down_secondary_interfaces}
570sudo systemctl stop ssh
571echo "{}" > /etc/ssh/ssh_host_ed25519_key.pub
572echo "{}" > /etc/ssh/ssh_host_ed25519_key
573
574echo "ClientAliveInterval 30" >> /etc/ssh/sshd_config
575sudo systemctl start ssh
576"#,
577 self.host_public_key, self.host_private_key
578 )))
579 .tag_specifications(
580 self.tags
581 .create_tags(ResourceType::Instance, "aws-throwaway"),
582 )
583 .image_id(image_id)
584 .send()
585 .await
586 .map_err(|e| e.into_service_error())
587 .unwrap();
588
589 let instance = result.instances().first().unwrap();
590 let primary_network_interface_id = instance
591 .network_interfaces
592 .as_ref()
593 .unwrap()
594 .iter()
595 .find(|x| x.attachment.as_ref().unwrap().device_index.unwrap() == 0)
596 .unwrap()
597 .network_interface_id
598 .as_ref()
599 .unwrap();
600
601 let network_interfaces = instance
602 .network_interfaces
603 .as_ref()
604 .unwrap()
605 .iter()
606 .map(|x| NetworkInterface {
607 device_index: x.attachment.as_ref().unwrap().device_index.unwrap(),
608 private_ipv4: x.private_ip_address.as_ref().unwrap().parse().unwrap(),
609 })
610 .collect();
611
612 if let Some(elastic_ip) = &elastic_ip {
613 let start = Instant::now();
614 loop {
615 match self
616 .client
617 .associate_address()
618 .allocation_id(elastic_ip.allocation_id.as_ref().unwrap())
619 .network_interface_id(primary_network_interface_id)
620 .send()
621 .await
622 {
623 Ok(_) => {
624 break;
625 }
626 Err(err) => {
627 if start.elapsed() > Duration::from_secs(120) {
630 panic!(
631 "Received error while associating address after 120s retrying: {}",
632 err.into_service_error()
633 );
634 } else {
635 tokio::time::sleep(Duration::from_secs(2)).await;
636 }
637 }
638 }
639 }
640 }
641
642 let mut public_ip = elastic_ip.map(|x| x.public_ip.unwrap().parse().unwrap());
643 let mut private_ip = None;
644
645 let public_ip_expected = self.use_public_addresses || self.subnet_map_public_ip_on_launch;
646
647 if public_ip_expected {
648 tracing::info!("Waiting for instance private ip and public ip to be assigned");
649 } else {
650 tracing::info!("Waiting for instance private ip to be assigned");
651 }
652 while (public_ip_expected && public_ip.is_none()) || private_ip.is_none() {
653 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
656
657 let instance = self
658 .client
659 .describe_instances()
660 .instance_ids(instance.instance_id().unwrap())
661 .send()
662 .await
663 .map_err(|e| e.into_service_error());
664 match instance {
665 Ok(instance) => {
666 for reservation in instance.reservations() {
667 for instance in reservation.instances() {
668 if public_ip.is_none() {
669 public_ip =
670 instance.public_ip_address().map(|x| x.parse().unwrap());
671 }
672 private_ip = instance.private_ip_address().map(|x| x.parse().unwrap());
673 }
674 }
675 }
676 Err(err) => {
677 if err.code() != Some("InvalidInstanceID.NotFound") {
680 panic!("Failed to describe instance {err:?}");
681 }
682 }
683 }
684 }
685 let aws_id = instance.instance_id().unwrap().to_owned();
686
687 let private_ip = private_ip.unwrap();
688 let connect_ip = if self.use_public_addresses {
689 public_ip.unwrap()
690 } else {
691 private_ip
692 };
693 tracing::info!("created EC2 instance at public:{public_ip:?} private:{private_ip}");
694
695 Ec2Instance::new(
696 aws_id,
697 connect_ip,
698 public_ip,
699 private_ip,
700 self.host_public_key_bytes.clone(),
701 self.host_public_key.clone(),
702 &self.client_private_key,
703 network_interfaces,
704 )
705 .await
706 }
707}