aws_throwaway/backend/sdk/
aws.rs

1use super::tags::Tags;
2use crate::CleanupResources;
3use crate::ec2_instance::{Ec2Instance, NetworkInterface};
4use crate::ec2_instance_definition::{Ec2InstanceDefinition, InstanceOs};
5use crate::{AwsBuilder, IngressRestriction};
6use anyhow::anyhow;
7use aws_config::meta::region::RegionProviderChain;
8use aws_config::retry::ProvideErrorKind;
9use aws_config::{BehaviorVersion, SdkConfig};
10use aws_sdk_ec2::config::Region;
11use aws_sdk_ec2::types::PlacementStrategy;
12use aws_sdk_ec2::types::{
13    BlockDeviceMapping, EbsBlockDevice, Filter, InstanceNetworkInterfaceSpecification, KeyType,
14    Placement, ResourceType, Subnet, VolumeType,
15};
16use base64::Engine;
17use futures::StreamExt;
18use futures::stream::FuturesUnordered;
19use ssh_key::PrivateKey;
20use ssh_key::rand_core::OsRng;
21use std::fmt::Write;
22use std::future::Future;
23use std::pin::Pin;
24use std::time::{Duration, Instant};
25use uuid::Uuid;
26
27async fn config() -> SdkConfig {
28    let region_provider = RegionProviderChain::first_try(Region::new("us-east-1"));
29    aws_config::defaults(BehaviorVersion::latest())
30        .region(region_provider)
31        .load()
32        .await
33}
34
/// Construct this type to create and cleanup aws resources.
///
/// All fields are populated once in [`Aws::new`] and reused by every instance created afterwards.
pub struct Aws {
    /// EC2 client used for every request issued by this type.
    client: aws_sdk_ec2::Client,
    /// Availability zone instances are placed in.
    az_name: String,
    /// Name of the EC2 key pair created in `new` and attached to launched instances.
    keyname: String,
    /// Key material returned by EC2 when the key pair was created.
    client_private_key: String,
    /// OpenSSH encoding of the generated host public key, written to instances via user data.
    host_public_key: String,
    /// Byte encoding of the same host public key.
    host_public_key_bytes: Vec<u8>,
    /// OpenSSH encoding of the generated host private key, written to instances via user data.
    host_private_key: String,
    /// Security group applied to instances, either created in `new` or supplied through the builder.
    security_group_id: String,
    /// Name of the placement group created in `new`.
    placement_group_name: String,
    /// Subnet instances are launched into.
    subnet_id: String,
    /// The subnet's `map_public_ip_on_launch` setting as reported by EC2.
    subnet_map_public_ip_on_launch: bool,
    /// When true, instances are connected to via their public address.
    use_public_addresses: bool,
    /// Tags attached to created resources and used to find them again during cleanup.
    tags: Tags,
}
51
52impl Aws {
53    pub(crate) async fn new(builder: AwsBuilder) -> Aws {
54        let config = config().await;
55        let user_name = super::iam::user_name(&config).await;
56        let keyname = format!("aws-throwaway-{user_name}-{}", Uuid::new_v4());
57        let security_group_name = format!("aws-throwaway-{user_name}-{}", Uuid::new_v4());
58        let placement_group_name = format!("aws-throwaway-{user_name}-{}", Uuid::new_v4());
59        let az_name = builder.az_name.unwrap_or_else(|| "us-east-1c".into());
60        let client = aws_sdk_ec2::Client::new(&config);
61
62        let tags = Tags {
63            user_name: user_name.clone(),
64            cleanup: builder.cleanup,
65        };
66
67        // Cleanup any resources that were previously failed to cleanup
68        Aws::cleanup_resources_inner(&client, &tags).await;
69
70        let (client_private_key, security_group_id, _, subnet) = tokio::join!(
71            Aws::create_key_pair(&client, &tags, &keyname),
72            Aws::create_security_group(
73                &client,
74                &tags,
75                &security_group_name,
76                &builder.vpc_id,
77                builder.security_group_id,
78                &builder.expose_ports_to_internet,
79                builder.ingress_restriction,
80            ),
81            Aws::create_placement_group(
82                &client,
83                &tags,
84                &placement_group_name,
85                builder.placement_strategy
86            ),
87            Aws::get_subnet(&client, builder.subnet_id, az_name.clone())
88        );
89
90        let subnet_id = subnet.subnet_id.unwrap();
91        let subnet_map_public_ip_on_launch = subnet.map_public_ip_on_launch.unwrap();
92
93        let key = PrivateKey::random(&mut OsRng {}, ssh_key::Algorithm::Ed25519).unwrap();
94        let host_public_key_bytes = key.public_key().to_bytes().unwrap();
95        let host_public_key = key.public_key().to_openssh().unwrap();
96        let host_private_key = key.to_openssh(ssh_key::LineEnding::LF).unwrap().to_string();
97
98        let use_public_addresses = builder.use_public_addresses;
99
100        Aws {
101            use_public_addresses,
102            client,
103            az_name,
104            keyname,
105            client_private_key,
106            host_public_key_bytes,
107            host_public_key,
108            host_private_key,
109            security_group_id,
110            placement_group_name,
111            subnet_id,
112            subnet_map_public_ip_on_launch,
113            tags,
114        }
115    }
116
    /// Returns an [`AwsBuilder`] that will build a new [`Aws`].
    ///
    /// Before the [`Aws`] is built, all preexisting resources matching the specified
    /// [`CleanupResources`] approach are destroyed.
    /// The same [`CleanupResources`] is later used by the [`Aws::cleanup_resources`] method.
    pub fn builder(cleanup: CleanupResources) -> AwsBuilder {
        AwsBuilder::new(cleanup)
    }
124
125    async fn create_key_pair(client: &aws_sdk_ec2::Client, tags: &Tags, name: &str) -> String {
126        let keypair = client
127            .create_key_pair()
128            .key_name(name)
129            .key_type(KeyType::Ed25519)
130            .tag_specifications(tags.create_tags(ResourceType::KeyPair, "aws-throwaway"))
131            .send()
132            .await
133            .map_err(|e| e.into_service_error())
134            .unwrap();
135        let client_private_key = keypair.key_material().unwrap().to_string();
136        tracing::info!("client_private_key:\n{}", client_private_key);
137        client_private_key
138    }
139
140    async fn create_security_group(
141        client: &aws_sdk_ec2::Client,
142        tags: &Tags,
143        name: &str,
144        vpc_id: &Option<String>,
145        security_group_id: Option<String>,
146        ports: &[u16],
147        ingress_restriction: IngressRestriction,
148    ) -> String {
149        match security_group_id {
150            Some(id) => id,
151            None => {
152                let (security_group, cidr_ip) = tokio::join!(
153                    client
154                        .create_security_group()
155                        .group_name(name)
156                        .set_vpc_id(vpc_id.clone())
157                        .description("aws-throwaway security group")
158                        .tag_specifications(
159                            tags.create_tags(ResourceType::SecurityGroup, "aws-throwaway"),
160                        )
161                        .send(),
162                    ingress_restriction.cidr_ip()
163                );
164                let security_group_id = security_group
165                    .map_err(|e| e.into_service_error())
166                    .unwrap()
167                    .group_id
168                    .unwrap();
169                tracing::info!("created security group");
170
171                let mut futures =
172                    FuturesUnordered::<Pin<Box<dyn Future<Output = ()> + Send>>>::new();
173                futures.push(Box::pin(Aws::create_ingress_rule_internal(
174                    client, tags, name,
175                )));
176
177                // SSH
178                if !ports.contains(&22) {
179                    futures.push(Box::pin(Aws::create_ingress_rule_for_port(
180                        client, tags, name, &cidr_ip, 22,
181                    )));
182                }
183                for port in ports {
184                    futures.push(Box::pin(Aws::create_ingress_rule_for_port(
185                        client, tags, name, &cidr_ip, *port,
186                    )));
187                }
188                while futures.next().await.is_some() {}
189                security_group_id
190            }
191        }
192    }
193
    /// Adds an ingress rule to `group_name` whose source is the same security group.
    ///
    /// # Panics
    /// Panics if the request fails, or if the response's `return` flag is missing or false.
    async fn create_ingress_rule_internal(
        client: &aws_sdk_ec2::Client,
        tags: &Tags,
        group_name: &str,
    ) {
        assert!(
            client
                .authorize_security_group_ingress()
                .group_name(group_name)
                // the group is both the target and the permitted source
                .source_security_group_name(group_name)
                .tag_specifications(
                    tags.create_tags(ResourceType::SecurityGroupRule, "within aws-throwaway SG")
                )
                .send()
                .await
                .map_err(|e| e.into_service_error())
                .unwrap()
                .r#return()
                .unwrap()
        );
        tracing::info!("created security group rule - internal");
    }
216
217    async fn create_ingress_rule_for_port(
218        client: &aws_sdk_ec2::Client,
219        tags: &Tags,
220        group_name: &str,
221        cidr_ip: &str,
222        port: u16,
223    ) {
224        let port = port.to_string();
225        assert!(
226            client
227                .authorize_security_group_ingress()
228                .group_name(group_name)
229                .ip_protocol("tcp")
230                .from_port(22)
231                .to_port(22)
232                .cidr_ip(cidr_ip)
233                .tag_specifications(
234                    tags.create_tags(ResourceType::SecurityGroupRule, &format!("port {port}"))
235                )
236                .send()
237                .await
238                .map_err(|e| e.into_service_error())
239                .unwrap()
240                .r#return()
241                .unwrap()
242        );
243        tracing::info!("created security group rule - port {port}");
244    }
245
246    async fn create_placement_group(
247        client: &aws_sdk_ec2::Client,
248        tags: &Tags,
249        name: &str,
250        strategy: PlacementStrategy,
251    ) {
252        client
253            .create_placement_group()
254            .group_name(name)
255            .strategy(strategy)
256            .tag_specifications(tags.create_tags(ResourceType::PlacementGroup, "aws-throwaway"))
257            .send()
258            .await
259            .map_err(|e| e.into_service_error())
260            .unwrap();
261        tracing::info!("created placement group");
262    }
263
264    async fn get_subnet(
265        client: &aws_sdk_ec2::Client,
266        subnet_id: Option<String>,
267        az_name: String,
268    ) -> Subnet {
269        match &subnet_id {
270            Some(subnet_id) => client.describe_subnets().filters(
271                Filter::builder()
272                    .name("subnet-id")
273                    .values(subnet_id)
274                    .build(),
275            ),
276            None => client
277                .describe_subnets()
278                .filters(
279                    Filter::builder()
280                        .name("default-for-az")
281                        .values("true")
282                        .build(),
283                )
284                .filters(
285                    Filter::builder()
286                        .name("availability-zone")
287                        .values(&az_name)
288                        .build(),
289                ),
290        }
291        .send()
292        .await
293        .map_err(|e| e.into_service_error())
294        .unwrap()
295        .subnets
296        .unwrap()
297        .pop()
298        .unwrap_or_else(|| match subnet_id {
299            Some(subnet) => panic!("Subnet {subnet} could not be found"),
300            None => panic!("No default subnet configured for {az_name}"),
301        })
302    }
303
    /// Call before dropping [`Aws`].
    ///
    /// Destroys resources using the [`CleanupResources`] approach that was passed to
    /// [`Aws::builder`], via this instance's own client and tags.
    pub async fn cleanup_resources(&self) {
        Self::cleanup_resources_inner(&self.client, &self.tags).await
    }
309
310    /// Call to cleanup without constructing an [`Aws`]
311    pub async fn cleanup_resources_static(cleanup: CleanupResources) {
312        let config = config().await;
313        let user_name = super::iam::user_name(&config).await;
314        let client = aws_sdk_ec2::Client::new(&config);
315        let tags = Tags { user_name, cleanup };
316        Aws::cleanup_resources_inner(&client, &tags).await;
317    }
318
319    async fn get_all_throwaway_tags(
320        client: &aws_sdk_ec2::Client,
321        tags: &Tags,
322        resource_type: &str,
323    ) -> Vec<String> {
324        let (user_tags, app_tags) = tokio::join!(
325            tags.fetch_user_tags(client, resource_type),
326            tags.fetch_app_tags(client, resource_type),
327        );
328
329        let mut ids_of_user = vec![];
330        for tag in user_tags.tags() {
331            if let Some(id) = tag.resource_id() {
332                ids_of_user.push(id.to_owned());
333            }
334        }
335
336        if let Some(app_tags) = app_tags {
337            let mut ids_of_user_and_app = vec![];
338            for app_tag in app_tags.tags() {
339                if let Some(id) = app_tag.resource_id() {
340                    let id = id.to_owned();
341                    if ids_of_user.contains(&id) {
342                        ids_of_user_and_app.push(id);
343                    }
344                }
345            }
346            ids_of_user_and_app
347        } else {
348            ids_of_user
349        }
350    }
351
352    async fn cleanup_resources_inner(client: &aws_sdk_ec2::Client, tags: &Tags) {
353        // release elastic ips
354        for id in Self::get_all_throwaway_tags(client, tags, "elastic-ip").await {
355            client
356                .release_address()
357                .allocation_id(&id)
358                .send()
359                .await
360                .map_err(|e| {
361                    anyhow::anyhow!(e.into_service_error())
362                        .context(format!("Failed to release elastic ip {id:?}"))
363                })
364                .unwrap();
365            tracing::info!("elastic ip {id:?} was succesfully deleted");
366        }
367
368        tracing::info!("Terminating instances");
369        let instance_ids = Self::get_all_throwaway_tags(client, tags, "instance").await;
370        Self::terminate_instances(client, instance_ids).await;
371
372        tokio::join!(
373            Aws::delete_security_groups(client, tags),
374            Aws::delete_placement_groups(client, tags),
375            Aws::delete_keypairs(client, tags),
376        );
377    }
378
379    pub(crate) async fn terminate_instances(
380        client: &aws_sdk_ec2::Client,
381        instance_ids: Vec<String>,
382    ) {
383        if !instance_ids.is_empty() {
384            for result in client
385                .terminate_instances()
386                .set_instance_ids(Some(instance_ids))
387                .send()
388                .await
389                .map_err(|e| e.into_service_error())
390                .unwrap()
391                .terminating_instances()
392            {
393                tracing::info!(
394                    "Instance {:?} {:?} -> {:?}",
395                    result.instance_id.as_ref().unwrap(),
396                    result.previous_state().unwrap().name().unwrap(),
397                    result.current_state().unwrap().name().unwrap()
398                );
399            }
400        }
401    }
402
    /// Terminates a single instance, consuming its [`Ec2Instance`] handle.
    ///
    /// Delegates to [`Aws::terminate_instances`] with the instance's AWS id.
    pub(crate) async fn terminate_instance(&self, instance: Ec2Instance) {
        Self::terminate_instances(&self.client, vec![instance.aws_id]).await
    }
406
407    async fn delete_security_groups(client: &aws_sdk_ec2::Client, tags: &Tags) {
408        for id in Self::get_all_throwaway_tags(client, tags, "security-group").await {
409            if let Err(err) = client.delete_security_group().group_id(&id).send().await {
410                tracing::info!(
411                    "security group {id:?} could not be deleted, this will get cleaned up eventually on a future aws-throwaway cleanup: {:?}",
412                    err.into_service_error().meta().message()
413                )
414            } else {
415                tracing::info!("security group {id:?} was succesfully deleted")
416            }
417        }
418    }
419
420    async fn delete_placement_groups(client: &aws_sdk_ec2::Client, tags: &Tags) {
421        let placement_group_ids =
422            Self::get_all_throwaway_tags(client, tags, "placement-group").await;
423        if !placement_group_ids.is_empty() {
424            // placement groups can not be deleted by id so we must look up their names
425            let placement_groups = client
426                .describe_placement_groups()
427                .set_group_ids(Some(placement_group_ids))
428                .send()
429                .await
430                .map_err(|e| e.into_service_error())
431                .unwrap();
432            for placement_group in placement_groups.placement_groups() {
433                let name = placement_group.group_name().unwrap();
434                if let Err(err) = client
435                    .delete_placement_group()
436                    .group_name(name)
437                    .send()
438                    .await
439                {
440                    tracing::info!(
441                        "placement group {name:?} could not be deleted, this will get cleaned up eventually on a future aws-throwaway cleanup: {:?}",
442                        err.into_service_error().meta().message()
443                    )
444                } else {
445                    tracing::info!("placement group {name:?} was succesfully deleted")
446                }
447            }
448        }
449    }
450
451    async fn delete_keypairs(client: &aws_sdk_ec2::Client, tags: &Tags) {
452        for id in Self::get_all_throwaway_tags(client, tags, "key-pair").await {
453            if let Err(err) = client.delete_key_pair().key_pair_id(&id).send().await {
454                let err = err.into_service_error();
455                if err.code() == Some("UnauthorizedOperation") {
456                    tracing::error!("{:?}", anyhow!(err).context(format!(
457                        "Did not have permissions to delete keypair {id:?}, skipping all other keypairs since they will also fail."
458                    )));
459                    return;
460                } else {
461                    panic!("Failed to delete keypair {id:?}: {err:?}")
462                }
463            } else {
464                tracing::info!("keypair {id:?} was succesfully deleted");
465            }
466        }
467    }
468
    /// Creates a new EC2 instance as defined by [`Ec2InstanceDefinition`]
    ///
    /// The instance is launched into this [`Aws`]'s placement group, availability zone and
    /// subnet, with an encrypted gp2 root volume of the requested size. Its user data script
    /// installs the shared SSH host key and brings down any secondary network interfaces.
    /// When more than one network interface is requested and public addresses are in use,
    /// an elastic IP is allocated and associated with the primary interface.
    ///
    /// The method only returns once the instance reports a private ip (and a public ip,
    /// when one is expected), then hands the details to [`Ec2Instance::new`].
    ///
    /// # Panics
    /// Panics if an AWS request fails (other than the retried cases noted inline) or an
    /// expected response field is missing.
    pub async fn create_ec2_instance(&self, definition: Ec2InstanceDefinition) -> Ec2Instance {
        // elastic IP's are a limited resource so only create it if we truly need it.
        let elastic_ip = if self.use_public_addresses && definition.network_interface_count > 1 {
            Some(
                self.client
                    .allocate_address()
                    .tag_specifications(
                        self.tags
                            .create_tags(ResourceType::ElasticIp, "aws-throwaway"),
                    )
                    .send()
                    .await
                    .map_err(|e| e.into_service_error())
                    .unwrap(),
            )
        } else {
            None
        };

        // if we specify a list of network interfaces we cannot specify an instance level security group
        let security_group_ids = if definition.network_interface_count == 1 {
            Some(vec![self.security_group_id.clone()])
        } else {
            None
        };

        // Secondary interfaces should not be used until they are configured.
        // One `ip link ... down` line is generated per secondary interface, for the
        // device names ens6, ens7, ...
        let mut bring_down_secondary_interfaces = String::new();
        for i in 1..definition.network_interface_count {
            writeln!(
                bring_down_secondary_interfaces,
                "sudo ip link set dev ens{} down",
                5 + i
            )
            .unwrap();
        }

        let ubuntu_version = match definition.os {
            InstanceOs::Ubuntu20_04 => "20.04",
            InstanceOs::Ubuntu22_04 => "22.04",
        };
        // Unless an AMI was given explicitly, use an SSM parameter path built from the
        // ubuntu version and the cpu architecture of the instance type.
        let image_id = definition.ami.unwrap_or_else(|| format!(
            "resolve:ssm:/aws/service/canonical/ubuntu/server/{}/stable/current/{}/hvm/ebs-gp2/ami-id",
            ubuntu_version,
            super::cpu_arch::get_arch_of_instance_type(definition.instance_type.clone()).get_ubuntu_arch_identifier()
        ));
        let result = self
            .client
            .run_instances()
            .instance_type(definition.instance_type)
            .set_placement(Some(
                Placement::builder()
                    .group_name(&self.placement_group_name)
                    .availability_zone(&self.az_name)
                    .build(),
            ))
            // with multiple interfaces the subnet is set per interface below instead
            .set_subnet_id(if definition.network_interface_count == 1 {
                Some(self.subnet_id.to_owned())
            } else {
                None
            })
            .min_count(1)
            .max_count(1)
            .block_device_mappings(
                BlockDeviceMapping::builder()
                    .device_name("/dev/sda1")
                    .ebs(
                        EbsBlockDevice::builder()
                            .delete_on_termination(true)
                            .volume_size(definition.volume_size_gb as i32)
                            .volume_type(VolumeType::Gp2)
                            .encrypted(true)
                            .build(),
                    )
                    .build(),
            )
            .set_security_group_ids(security_group_ids)
            .set_network_interfaces(if definition.network_interface_count == 1 {
                None
            } else {
                Some(
                    (0..definition.network_interface_count)
                        .map(|i| {
                            InstanceNetworkInterfaceSpecification::builder()
                                .delete_on_termination(true)
                                .device_index(i as i32)
                                .groups(&self.security_group_id)
                                // must be false when launching with multiple network interfaces
                                .associate_public_ip_address(false)
                                .subnet_id(&self.subnet_id)
                                .description(i.to_string())
                                .build()
                        })
                        .collect(),
                )
            })
            .key_name(&self.keyname)
            // The user data script stops ssh, installs the shared host key pair, enables
            // client keep-alives and then starts ssh again.
            .user_data(base64::engine::general_purpose::STANDARD.encode(format!(
                r#"#!/bin/bash
{bring_down_secondary_interfaces}
sudo systemctl stop ssh
echo "{}" > /etc/ssh/ssh_host_ed25519_key.pub
echo "{}" > /etc/ssh/ssh_host_ed25519_key

echo "ClientAliveInterval 30" >> /etc/ssh/sshd_config
sudo systemctl start ssh
"#,
                self.host_public_key, self.host_private_key
            )))
            .tag_specifications(
                self.tags
                    .create_tags(ResourceType::Instance, "aws-throwaway"),
            )
            .image_id(image_id)
            .send()
            .await
            .map_err(|e| e.into_service_error())
            .unwrap();

        let instance = result.instances().first().unwrap();
        // The primary interface is the one attached at device index 0.
        let primary_network_interface_id = instance
            .network_interfaces
            .as_ref()
            .unwrap()
            .iter()
            .find(|x| x.attachment.as_ref().unwrap().device_index.unwrap() == 0)
            .unwrap()
            .network_interface_id
            .as_ref()
            .unwrap();

        let network_interfaces = instance
            .network_interfaces
            .as_ref()
            .unwrap()
            .iter()
            .map(|x| NetworkInterface {
                device_index: x.attachment.as_ref().unwrap().device_index.unwrap(),
                private_ipv4: x.private_ip_address.as_ref().unwrap().parse().unwrap(),
            })
            .collect();

        // Associate the elastic ip with the primary interface, retrying every 2s for up to 120s.
        if let Some(elastic_ip) = &elastic_ip {
            let start = Instant::now();
            loop {
                match self
                    .client
                    .associate_address()
                    .allocation_id(elastic_ip.allocation_id.as_ref().unwrap())
                    .network_interface_id(primary_network_interface_id)
                    .send()
                    .await
                {
                    Ok(_) => {
                        break;
                    }
                    Err(err) => {
                        // It is expected to receive the following error if we attempt too early:
                        // `The pending-instance-running instance to which 'eni-***' is attached is not in a valid state for this operation`
                        if start.elapsed() > Duration::from_secs(120) {
                            panic!(
                                "Received error while associating address after 120s retrying: {}",
                                err.into_service_error()
                            );
                        } else {
                            tokio::time::sleep(Duration::from_secs(2)).await;
                        }
                    }
                }
            }
        }

        // An elastic ip, when present, is already the instance's public ip.
        let mut public_ip = elastic_ip.map(|x| x.public_ip.unwrap().parse().unwrap());
        let mut private_ip = None;

        let public_ip_expected = self.use_public_addresses || self.subnet_map_public_ip_on_launch;

        if public_ip_expected {
            tracing::info!("Waiting for instance private ip and public ip to be assigned");
        } else {
            tracing::info!("Waiting for instance private ip to be assigned");
        }
        // NOTE(review): this loop has no timeout. With multiple network interfaces,
        // `use_public_addresses == false` and a subnet that maps public ips on launch, a public
        // ip is expected but the interfaces were launched with
        // `associate_public_ip_address(false)` - presumably this could wait forever; confirm.
        while (public_ip_expected && public_ip.is_none()) || private_ip.is_none() {
            // There is no way the instance will be ready in 1 second,
            // so sleep before trying and then after all future attempts
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;

            let instance = self
                .client
                .describe_instances()
                .instance_ids(instance.instance_id().unwrap())
                .send()
                .await
                .map_err(|e| e.into_service_error());
            match instance {
                Ok(instance) => {
                    for reservation in instance.reservations() {
                        for instance in reservation.instances() {
                            // never overwrite a public ip we already know (e.g. the elastic ip)
                            if public_ip.is_none() {
                                public_ip =
                                    instance.public_ip_address().map(|x| x.parse().unwrap());
                            }
                            private_ip = instance.private_ip_address().map(|x| x.parse().unwrap());
                        }
                    }
                }
                Err(err) => {
                    // InvalidInstanceID.NotFound can occur when we query too soon after creating the instance,
                    // so we need to retry when we hit that
                    if err.code() != Some("InvalidInstanceID.NotFound") {
                        panic!("Failed to describe instance {err:?}");
                    }
                }
            }
        }
        let aws_id = instance.instance_id().unwrap().to_owned();

        let private_ip = private_ip.unwrap();
        // The address used to connect depends on whether public addresses are in use.
        let connect_ip = if self.use_public_addresses {
            public_ip.unwrap()
        } else {
            private_ip
        };
        tracing::info!("created EC2 instance at public:{public_ip:?} private:{private_ip}");

        Ec2Instance::new(
            aws_id,
            connect_ip,
            public_ip,
            private_ip,
            self.host_public_key_bytes.clone(),
            self.host_public_key.clone(),
            &self.client_private_key,
            network_interfaces,
        )
        .await
    }
707}