aws_sdk_manager/
ec2.rs

1use std::{
2    fs::File,
3    io::prelude::*,
4    path::Path,
5    sync::Arc,
6    thread,
7    time::{Duration, Instant},
8};
9
10use crate::{
11    errors::{
12        Error::{Other, API},
13        Result,
14    },
15    utils::rfc3339,
16};
17use aws_sdk_ec2::{
18    error::DeleteKeyPairError,
19    model::{
20        Filter, Instance, InstanceState, InstanceStateName, Tag, Volume, VolumeAttachmentState,
21    },
22    types::SdkError,
23    Client,
24};
25use aws_types::SdkConfig as AwsSdkConfig;
26use chrono::{DateTime, NaiveDateTime, Utc};
27use hyper::{Body, Method, Request};
28use log::{info, warn};
29use serde::{Deserialize, Serialize};
30
31/// Implements AWS EC2 manager.
32#[derive(Debug, Clone)]
33pub struct Manager {
34    #[allow(dead_code)]
35    shared_config: AwsSdkConfig,
36    cli: Client,
37}
38
39impl Manager {
40    pub fn new(shared_config: &AwsSdkConfig) -> Self {
41        let cloned = shared_config.clone();
42        let cli = Client::new(shared_config);
43        Self {
44            shared_config: cloned,
45            cli,
46        }
47    }
48
49    /// Creates an AWS EC2 key-pair and saves the private key to disk.
50    /// It overwrites "key_path" file with the newly created key.
51    pub async fn create_key_pair(&self, key_name: &str, key_path: &str) -> Result<()> {
52        let path = Path::new(key_path);
53        if path.exists() {
54            return Err(Other {
55                message: format!("key path {} already exists", key_path),
56                is_retryable: false,
57            });
58        }
59
60        info!("creating EC2 key-pair '{}'", key_name);
61        let ret = self.cli.create_key_pair().key_name(key_name).send().await;
62        let resp = match ret {
63            Ok(v) => v,
64            Err(e) => {
65                return Err(API {
66                    message: format!("failed create_key_pair {:?}", e),
67                    is_retryable: is_error_retryable(&e),
68                });
69            }
70        };
71
72        info!("saving EC2 key-pair '{}' to '{}'", key_name, key_path);
73        let key_material = resp.key_material().unwrap();
74
75        let mut f = match File::create(&path) {
76            Ok(f) => f,
77            Err(e) => {
78                return Err(Other {
79                    message: format!("failed to create file {:?}", e),
80                    is_retryable: false,
81                });
82            }
83        };
84        match f.write_all(key_material.as_bytes()) {
85            Ok(_) => {}
86            Err(e) => {
87                return Err(Other {
88                    message: format!("failed to write file {:?}", e),
89                    is_retryable: false,
90                });
91            }
92        }
93
94        Ok(())
95    }
96
97    /// Deletes the AWS EC2 key-pair.
98    pub async fn delete_key_pair(&self, key_name: &str) -> Result<()> {
99        info!("deleting EC2 key-pair '{}'", key_name);
100        let ret = self.cli.delete_key_pair().key_name(key_name).send().await;
101        match ret {
102            Ok(_) => {}
103            Err(e) => {
104                if !is_error_delete_key_pair_does_not_exist(&e) {
105                    return Err(API {
106                        message: format!("failed delete_key_pair {:?}", e),
107                        is_retryable: is_error_retryable(&e),
108                    });
109                }
110                warn!("key already deleted ({})", e);
111            }
112        };
113
114        Ok(())
115    }
116
117    /// Describes all attached volumes by instance Id and device.
118    /// If "instance_id" is empty, it fetches from the local EC2 instance's metadata service.
119    /// The region used for API call is inherited from the EC2 client SDK.
120    ///
121    /// e.g.,
122    /// aws ec2 describe-volumes \
123    /// --region ${AWS::Region} \
124    /// --filters \
125    ///   Name=attachment.instance-id,Values=$INSTANCE_ID \
126    ///   Name=attachment.device,Values=/dev/xvdb \
127    /// --query Volumes[].Attachments[].State \
128    /// --output text
129    ///
130    /// ref. https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVolumes.html
131    /// ref. https://github.com/ava-labs/avalanche-ops/blob/fcbac87a219a8d3d6d3c38a1663fe1dafe78e04e/bin/avalancheup-aws/cfn-templates/asg_amd64_ubuntu.yaml#L397-L409
132    ///
133    pub async fn describe_attached_volumes(
134        &self,
135        instance_id: Option<String>,
136        device_path: Option<String>,
137    ) -> Result<Vec<Volume>> {
138        let inst_id = if let Some(inst_id) = instance_id {
139            inst_id
140        } else {
141            fetch_instance_id().await?
142        };
143        info!(
144            "describing volumes via instance Id {} and device {:?}",
145            inst_id, device_path
146        );
147
148        let mut filters = vec![Filter::builder()
149            .set_name(Some(String::from("attachment.instance-id")))
150            .set_values(Some(vec![inst_id.clone()]))
151            .build()];
152        if let Some(device) = device_path {
153            filters.push(
154                Filter::builder()
155                    .set_name(Some(String::from("attachment.device")))
156                    .set_values(Some(vec![device]))
157                    .build(),
158            )
159        }
160
161        let resp = match self
162            .cli
163            .describe_volumes()
164            .set_filters(Some(filters))
165            .send()
166            .await
167        {
168            Ok(r) => r,
169            Err(e) => {
170                return Err(API {
171                    message: format!("failed describe_volumes {:?}", e),
172                    is_retryable: is_error_retryable(&e),
173                });
174            }
175        };
176
177        let volumes = if let Some(vols) = resp.volumes {
178            vols
179        } else {
180            Vec::new()
181        };
182        info!(
183            "described {} volumes for instance {}",
184            volumes.len(),
185            inst_id
186        );
187
188        Ok(volumes)
189    }
190
191    /// Fetches the EBS volume by its attachment state.
192    /// If "instance_id" is empty, it fetches from the local EC2 instance's metadata service.
193    pub async fn get_volume(
194        &self,
195        instance_id: Option<String>,
196        device_path: &str,
197    ) -> Result<Volume> {
198        let inst_id = if let Some(inst_id) = instance_id {
199            inst_id
200        } else {
201            fetch_instance_id().await?
202        };
203
204        info!(
205            "fetchingt EBS volume for '{}' on '{}'",
206            inst_id, device_path
207        );
208
209        let volumes = self
210            .describe_attached_volumes(Some(inst_id), Some(device_path.to_string()))
211            .await?;
212        if volumes.is_empty() {
213            return Err(API {
214                message: "no volume found".to_string(),
215                is_retryable: false,
216            });
217        }
218        if volumes.len() != 1 {
219            return Err(API {
220                message: format!("unexpected volume devices found {}", volumes.len()),
221                is_retryable: false,
222            });
223        }
224        let volume = volumes[0].clone();
225
226        return Ok(volume);
227    }
228
229    /// Polls EBS volume attachment state.
230    /// If "instance_id" is empty, it fetches from the local EC2 instance's metadata service.
231    pub async fn poll_volume_attachment_state(
232        &self,
233        instance_id: Option<String>,
234        device_path: &str,
235        desired_attachment_state: VolumeAttachmentState,
236        timeout: Duration,
237        interval: Duration,
238    ) -> Result<Volume> {
239        let inst_id = if let Some(inst_id) = instance_id {
240            inst_id
241        } else {
242            fetch_instance_id().await?
243        };
244
245        info!(
246            "polling volume attachment state '{}' '{}' with desired state {:?} for timeout {:?} and interval {:?}",
247            inst_id, device_path, desired_attachment_state, timeout, interval,
248        );
249
250        let start = Instant::now();
251        let mut cnt: u128 = 0;
252        loop {
253            let elapsed = start.elapsed();
254            if elapsed.gt(&timeout) {
255                break;
256            }
257
258            let itv = {
259                if cnt == 0 {
260                    // first poll with no wait
261                    Duration::from_secs(1)
262                } else {
263                    interval
264                }
265            };
266            thread::sleep(itv);
267
268            let volume = self.get_volume(Some(inst_id.clone()), device_path).await?;
269            if volume.attachments().is_none() {
270                warn!("no attachment found");
271                continue;
272            }
273            let attachments = volume.attachments().unwrap();
274            if attachments.is_empty() {
275                warn!("no attachment found");
276                continue;
277            }
278            if attachments.len() != 1 {
279                warn!("unexpected attachment found {}", attachments.len());
280                continue;
281            }
282            let current_state = attachments[0].state().unwrap();
283            info!("poll (current {:?}, elapsed {:?})", current_state, elapsed);
284
285            if current_state.eq(&desired_attachment_state) {
286                return Ok(volume);
287            }
288
289            cnt += 1;
290        }
291
292        return Err(Other {
293            message: format!("failed to poll volume state for '{}' in time", inst_id),
294            is_retryable: true,
295        });
296    }
297
298    /// Fetches all tags for the specified instance.
299    ///
300    /// "If a single piece of data must be accessible from more than one task
301    /// concurrently, then it must be shared using synchronization primitives such as Arc."
302    /// ref. https://tokio.rs/tokio/tutorial/spawning
303    pub async fn fetch_tags(&self, instance_id: Arc<String>) -> Result<Vec<Tag>> {
304        info!("fetching tags for '{}'", instance_id);
305        let ret = self
306            .cli
307            .describe_instances()
308            .instance_ids(instance_id.to_string())
309            .send()
310            .await;
311        let resp = match ret {
312            Ok(r) => r,
313            Err(e) => {
314                return Err(API {
315                    message: format!("failed describe_instances {:?}", e),
316                    is_retryable: is_error_retryable(&e),
317                });
318            }
319        };
320
321        let reservations = match resp.reservations {
322            Some(rvs) => rvs,
323            None => {
324                return Err(API {
325                    message: String::from("empty reservation from describe_instances response"),
326                    is_retryable: false,
327                });
328            }
329        };
330        if reservations.len() != 1 {
331            return Err(API {
332                message: format!(
333                    "expected only 1 reservation from describe_instances response but got {}",
334                    reservations.len()
335                ),
336                is_retryable: false,
337            });
338        }
339
340        let rvs = reservations.get(0).unwrap();
341        let instances = rvs.instances.to_owned().unwrap();
342        if instances.len() != 1 {
343            return Err(API {
344                message: format!(
345                    "expected only 1 instance from describe_instances response but got {}",
346                    instances.len()
347                ),
348                is_retryable: false,
349            });
350        }
351
352        let instance = instances.get(0).unwrap();
353        let tags = match instance.tags.to_owned() {
354            Some(ss) => ss,
355            None => {
356                return Err(API {
357                    message: String::from("empty tags from describe_instances response"),
358                    is_retryable: false,
359                });
360            }
361        };
362        info!("fetched {} tags for '{}'", tags.len(), instance_id);
363
364        Ok(tags)
365    }
366
367    /// Lists instances by the Auto Scaling Groups name.
368    pub async fn list_asg(&self, asg_name: &str) -> Result<Vec<Droplet>> {
369        let filter = Filter::builder()
370            .set_name(Some(String::from("tag:aws:autoscaling:groupName")))
371            .set_values(Some(vec![String::from(asg_name)]))
372            .build();
373        let resp = match self
374            .cli
375            .describe_instances()
376            .set_filters(Some(vec![filter]))
377            .send()
378            .await
379        {
380            Ok(r) => r,
381            Err(e) => {
382                return Err(API {
383                    message: format!("failed describe_instances {:?}", e),
384                    is_retryable: is_error_retryable(&e),
385                });
386            }
387        };
388
389        let reservations = match resp.reservations {
390            Some(rvs) => rvs,
391            None => {
392                warn!("empty reservation from describe_instances response");
393                return Ok(vec![]);
394            }
395        };
396
397        let mut droplets: Vec<Droplet> = Vec::new();
398        for rsv in reservations.iter() {
399            let instances = rsv.instances().unwrap();
400            for instance in instances {
401                let instance_id = instance.instance_id().unwrap();
402                info!("instance {}", instance_id);
403                droplets.push(Droplet::new(instance));
404            }
405        }
406
407        Ok(droplets)
408    }
409}
410
411/// Represents the underlying EC2 instance.
412#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
413#[serde(rename_all = "snake_case")]
414pub struct Droplet {
415    pub instance_id: String,
416    /// Represents the data format in RFC3339.
417    /// ref. https://serde.rs/custom-date-format.html
418    #[serde(with = "rfc3339::serde_format")]
419    pub launched_at_utc: DateTime<Utc>,
420    pub instance_state_code: i32,
421    pub instance_state_name: String,
422    pub availability_zone: String,
423    pub public_hostname: String,
424    pub public_ipv4: String,
425}
426
427impl Droplet {
428    pub fn new(inst: &Instance) -> Self {
429        let instance_id = match inst.instance_id.to_owned() {
430            Some(v) => v,
431            None => String::new(),
432        };
433        let launch_time = inst.launch_time().unwrap();
434        let native_dt = NaiveDateTime::from_timestamp(launch_time.secs(), 0);
435        let launched_at_utc = DateTime::<Utc>::from_utc(native_dt, Utc);
436
437        let instance_state = match inst.state.to_owned() {
438            Some(v) => v,
439            None => InstanceState::builder().build(),
440        };
441        let instance_state_code = instance_state.code.unwrap_or(0);
442        let instance_state_name = instance_state
443            .name
444            .unwrap_or_else(|| InstanceStateName::Unknown(String::from("unknown")));
445        let instance_state_name = instance_state_name.as_str().to_string();
446
447        let availability_zone = match inst.placement.to_owned() {
448            Some(v) => match v.availability_zone {
449                Some(v2) => v2,
450                None => String::new(),
451            },
452            None => String::new(),
453        };
454
455        let public_hostname = inst
456            .public_dns_name
457            .to_owned()
458            .unwrap_or_else(|| String::from(""));
459        let public_ipv4 = inst
460            .public_ip_address
461            .to_owned()
462            .unwrap_or_else(|| String::from(""));
463
464        Self {
465            instance_id,
466            launched_at_utc,
467            instance_state_code,
468            instance_state_name,
469            availability_zone,
470            public_hostname,
471            public_ipv4,
472        }
473    }
474}
475
476#[inline]
477pub fn is_error_retryable<E>(e: &SdkError<E>) -> bool {
478    match e {
479        SdkError::TimeoutError(_) | SdkError::ResponseError { .. } => true,
480        SdkError::DispatchFailure(e) => e.is_timeout() || e.is_io(),
481        _ => false,
482    }
483}
484
485/// EC2 does not return any error for non-existing key deletes, just in case...
486#[inline]
487fn is_error_delete_key_pair_does_not_exist(e: &SdkError<DeleteKeyPairError>) -> bool {
488    match e {
489        SdkError::ServiceError { err, .. } => {
490            let msg = format!("{:?}", err);
491            msg.contains("does not exist")
492        }
493        _ => false,
494    }
495}
496
497/// Fetches the instance ID on the host EC2 machine.
498pub async fn fetch_instance_id() -> Result<String> {
499    fetch_metadata("instance-id").await
500}
501
502/// Fetches the public hostname of the host EC2 machine.
503pub async fn fetch_public_hostname() -> Result<String> {
504    fetch_metadata("public-hostname").await
505}
506
507/// Fetches the public IPv4 address of the host EC2 machine.
508pub async fn fetch_public_ipv4() -> Result<String> {
509    fetch_metadata("public-ipv4").await
510}
511
512/// Fetches the availability of the host EC2 machine.
513pub async fn fetch_availability_zone() -> Result<String> {
514    fetch_metadata("placement/availability-zone").await
515}
516
517/// Fetches the region of the host EC2 machine.
518/// TODO: fix this...
519pub async fn fetch_region() -> Result<String> {
520    let mut az = fetch_availability_zone().await?;
521    az.truncate(az.len() - 1);
522    Ok(az)
523}
524
525/// Fetches instance metadata service v2 with the "path".
526/// ref. https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html
527/// ref. https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
528/// e.g., curl -H "X-aws-ec2-metadata-token: $TOKEN" -v http://169.254.169.254/latest/meta-data/public-ipv4
529async fn fetch_metadata(path: &str) -> Result<String> {
530    info!("fetching meta-data/{}", path);
531
532    let uri = format!("http://169.254.169.254/latest/meta-data/{}", path);
533    let token = fetch_token().await?;
534    let req = match Request::builder()
535        .method(Method::GET)
536        .uri(uri)
537        .header("X-aws-ec2-metadata-token", token)
538        .body(Body::empty())
539    {
540        Ok(r) => r,
541        Err(e) => {
542            return Err(API {
543                message: format!("failed to build GET meta-data/{} {:?}", path, e),
544                is_retryable: false,
545            });
546        }
547    };
548
549    let ret = http_manager::read_bytes(req, Duration::from_secs(5), false, true).await;
550    let rs = match ret {
551        Ok(bytes) => {
552            let s = match String::from_utf8(bytes.to_vec()) {
553                Ok(text) => text,
554                Err(e) => {
555                    return Err(API {
556                        message: format!(
557                            "GET meta-data/{} returned unexpected bytes {:?} ({})",
558                            path, bytes, e
559                        ),
560                        is_retryable: false,
561                    });
562                }
563            };
564            s
565        }
566        Err(e) => {
567            return Err(API {
568                message: format!("failed GET meta-data/{} {:?}", path, e),
569                is_retryable: false,
570            })
571        }
572    };
573    Ok(rs)
574}
575
/// URI of the endpoint that issues session tokens for the instance metadata
/// service v2.
/// ref. https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
/// e.g., curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600"
const IMDS_V2_SESSION_TOKEN_URI: &str = "http://169.254.169.254/latest/api/token";
580
581/// Fetches the IMDS v2 token.
582async fn fetch_token() -> Result<String> {
583    info!("fetching IMDS v2 token");
584
585    let req = match Request::builder()
586        .method(Method::PUT)
587        .uri(IMDS_V2_SESSION_TOKEN_URI)
588        .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
589        .body(Body::empty())
590    {
591        Ok(r) => r,
592        Err(e) => {
593            return Err(API {
594                message: format!("failed to build PUT api/token {:?}", e),
595                is_retryable: false,
596            });
597        }
598    };
599
600    let ret = http_manager::read_bytes(req, Duration::from_secs(5), false, true).await;
601    let token = match ret {
602        Ok(bytes) => {
603            let s = match String::from_utf8(bytes.to_vec()) {
604                Ok(text) => text,
605                Err(e) => {
606                    return Err(API {
607                        message: format!(
608                            "PUT api/token returned unexpected bytes {:?} ({})",
609                            bytes, e
610                        ),
611                        is_retryable: false,
612                    });
613                }
614            };
615            s
616        }
617        Err(e) => {
618            return Err(API {
619                message: format!("failed PUT api/token {:?}", e),
620                is_retryable: false,
621            })
622        }
623    };
624    Ok(token)
625}