1use std::{
2 fs::File,
3 io::prelude::*,
4 path::Path,
5 sync::Arc,
6 thread,
7 time::{Duration, Instant},
8};
9
10use crate::{
11 errors::{
12 Error::{Other, API},
13 Result,
14 },
15 utils::rfc3339,
16};
17use aws_sdk_ec2::{
18 error::DeleteKeyPairError,
19 model::{
20 Filter, Instance, InstanceState, InstanceStateName, Tag, Volume, VolumeAttachmentState,
21 },
22 types::SdkError,
23 Client,
24};
25use aws_types::SdkConfig as AwsSdkConfig;
26use chrono::{DateTime, NaiveDateTime, Utc};
27use hyper::{Body, Method, Request};
28use log::{info, warn};
29use serde::{Deserialize, Serialize};
30
31#[derive(Debug, Clone)]
33pub struct Manager {
34 #[allow(dead_code)]
35 shared_config: AwsSdkConfig,
36 cli: Client,
37}
38
39impl Manager {
40 pub fn new(shared_config: &AwsSdkConfig) -> Self {
41 let cloned = shared_config.clone();
42 let cli = Client::new(shared_config);
43 Self {
44 shared_config: cloned,
45 cli,
46 }
47 }
48
49 pub async fn create_key_pair(&self, key_name: &str, key_path: &str) -> Result<()> {
52 let path = Path::new(key_path);
53 if path.exists() {
54 return Err(Other {
55 message: format!("key path {} already exists", key_path),
56 is_retryable: false,
57 });
58 }
59
60 info!("creating EC2 key-pair '{}'", key_name);
61 let ret = self.cli.create_key_pair().key_name(key_name).send().await;
62 let resp = match ret {
63 Ok(v) => v,
64 Err(e) => {
65 return Err(API {
66 message: format!("failed create_key_pair {:?}", e),
67 is_retryable: is_error_retryable(&e),
68 });
69 }
70 };
71
72 info!("saving EC2 key-pair '{}' to '{}'", key_name, key_path);
73 let key_material = resp.key_material().unwrap();
74
75 let mut f = match File::create(&path) {
76 Ok(f) => f,
77 Err(e) => {
78 return Err(Other {
79 message: format!("failed to create file {:?}", e),
80 is_retryable: false,
81 });
82 }
83 };
84 match f.write_all(key_material.as_bytes()) {
85 Ok(_) => {}
86 Err(e) => {
87 return Err(Other {
88 message: format!("failed to write file {:?}", e),
89 is_retryable: false,
90 });
91 }
92 }
93
94 Ok(())
95 }
96
97 pub async fn delete_key_pair(&self, key_name: &str) -> Result<()> {
99 info!("deleting EC2 key-pair '{}'", key_name);
100 let ret = self.cli.delete_key_pair().key_name(key_name).send().await;
101 match ret {
102 Ok(_) => {}
103 Err(e) => {
104 if !is_error_delete_key_pair_does_not_exist(&e) {
105 return Err(API {
106 message: format!("failed delete_key_pair {:?}", e),
107 is_retryable: is_error_retryable(&e),
108 });
109 }
110 warn!("key already deleted ({})", e);
111 }
112 };
113
114 Ok(())
115 }
116
117 pub async fn describe_attached_volumes(
134 &self,
135 instance_id: Option<String>,
136 device_path: Option<String>,
137 ) -> Result<Vec<Volume>> {
138 let inst_id = if let Some(inst_id) = instance_id {
139 inst_id
140 } else {
141 fetch_instance_id().await?
142 };
143 info!(
144 "describing volumes via instance Id {} and device {:?}",
145 inst_id, device_path
146 );
147
148 let mut filters = vec![Filter::builder()
149 .set_name(Some(String::from("attachment.instance-id")))
150 .set_values(Some(vec![inst_id.clone()]))
151 .build()];
152 if let Some(device) = device_path {
153 filters.push(
154 Filter::builder()
155 .set_name(Some(String::from("attachment.device")))
156 .set_values(Some(vec![device]))
157 .build(),
158 )
159 }
160
161 let resp = match self
162 .cli
163 .describe_volumes()
164 .set_filters(Some(filters))
165 .send()
166 .await
167 {
168 Ok(r) => r,
169 Err(e) => {
170 return Err(API {
171 message: format!("failed describe_volumes {:?}", e),
172 is_retryable: is_error_retryable(&e),
173 });
174 }
175 };
176
177 let volumes = if let Some(vols) = resp.volumes {
178 vols
179 } else {
180 Vec::new()
181 };
182 info!(
183 "described {} volumes for instance {}",
184 volumes.len(),
185 inst_id
186 );
187
188 Ok(volumes)
189 }
190
191 pub async fn get_volume(
194 &self,
195 instance_id: Option<String>,
196 device_path: &str,
197 ) -> Result<Volume> {
198 let inst_id = if let Some(inst_id) = instance_id {
199 inst_id
200 } else {
201 fetch_instance_id().await?
202 };
203
204 info!(
205 "fetchingt EBS volume for '{}' on '{}'",
206 inst_id, device_path
207 );
208
209 let volumes = self
210 .describe_attached_volumes(Some(inst_id), Some(device_path.to_string()))
211 .await?;
212 if volumes.is_empty() {
213 return Err(API {
214 message: "no volume found".to_string(),
215 is_retryable: false,
216 });
217 }
218 if volumes.len() != 1 {
219 return Err(API {
220 message: format!("unexpected volume devices found {}", volumes.len()),
221 is_retryable: false,
222 });
223 }
224 let volume = volumes[0].clone();
225
226 return Ok(volume);
227 }
228
229 pub async fn poll_volume_attachment_state(
232 &self,
233 instance_id: Option<String>,
234 device_path: &str,
235 desired_attachment_state: VolumeAttachmentState,
236 timeout: Duration,
237 interval: Duration,
238 ) -> Result<Volume> {
239 let inst_id = if let Some(inst_id) = instance_id {
240 inst_id
241 } else {
242 fetch_instance_id().await?
243 };
244
245 info!(
246 "polling volume attachment state '{}' '{}' with desired state {:?} for timeout {:?} and interval {:?}",
247 inst_id, device_path, desired_attachment_state, timeout, interval,
248 );
249
250 let start = Instant::now();
251 let mut cnt: u128 = 0;
252 loop {
253 let elapsed = start.elapsed();
254 if elapsed.gt(&timeout) {
255 break;
256 }
257
258 let itv = {
259 if cnt == 0 {
260 Duration::from_secs(1)
262 } else {
263 interval
264 }
265 };
266 thread::sleep(itv);
267
268 let volume = self.get_volume(Some(inst_id.clone()), device_path).await?;
269 if volume.attachments().is_none() {
270 warn!("no attachment found");
271 continue;
272 }
273 let attachments = volume.attachments().unwrap();
274 if attachments.is_empty() {
275 warn!("no attachment found");
276 continue;
277 }
278 if attachments.len() != 1 {
279 warn!("unexpected attachment found {}", attachments.len());
280 continue;
281 }
282 let current_state = attachments[0].state().unwrap();
283 info!("poll (current {:?}, elapsed {:?})", current_state, elapsed);
284
285 if current_state.eq(&desired_attachment_state) {
286 return Ok(volume);
287 }
288
289 cnt += 1;
290 }
291
292 return Err(Other {
293 message: format!("failed to poll volume state for '{}' in time", inst_id),
294 is_retryable: true,
295 });
296 }
297
298 pub async fn fetch_tags(&self, instance_id: Arc<String>) -> Result<Vec<Tag>> {
304 info!("fetching tags for '{}'", instance_id);
305 let ret = self
306 .cli
307 .describe_instances()
308 .instance_ids(instance_id.to_string())
309 .send()
310 .await;
311 let resp = match ret {
312 Ok(r) => r,
313 Err(e) => {
314 return Err(API {
315 message: format!("failed describe_instances {:?}", e),
316 is_retryable: is_error_retryable(&e),
317 });
318 }
319 };
320
321 let reservations = match resp.reservations {
322 Some(rvs) => rvs,
323 None => {
324 return Err(API {
325 message: String::from("empty reservation from describe_instances response"),
326 is_retryable: false,
327 });
328 }
329 };
330 if reservations.len() != 1 {
331 return Err(API {
332 message: format!(
333 "expected only 1 reservation from describe_instances response but got {}",
334 reservations.len()
335 ),
336 is_retryable: false,
337 });
338 }
339
340 let rvs = reservations.get(0).unwrap();
341 let instances = rvs.instances.to_owned().unwrap();
342 if instances.len() != 1 {
343 return Err(API {
344 message: format!(
345 "expected only 1 instance from describe_instances response but got {}",
346 instances.len()
347 ),
348 is_retryable: false,
349 });
350 }
351
352 let instance = instances.get(0).unwrap();
353 let tags = match instance.tags.to_owned() {
354 Some(ss) => ss,
355 None => {
356 return Err(API {
357 message: String::from("empty tags from describe_instances response"),
358 is_retryable: false,
359 });
360 }
361 };
362 info!("fetched {} tags for '{}'", tags.len(), instance_id);
363
364 Ok(tags)
365 }
366
367 pub async fn list_asg(&self, asg_name: &str) -> Result<Vec<Droplet>> {
369 let filter = Filter::builder()
370 .set_name(Some(String::from("tag:aws:autoscaling:groupName")))
371 .set_values(Some(vec![String::from(asg_name)]))
372 .build();
373 let resp = match self
374 .cli
375 .describe_instances()
376 .set_filters(Some(vec![filter]))
377 .send()
378 .await
379 {
380 Ok(r) => r,
381 Err(e) => {
382 return Err(API {
383 message: format!("failed describe_instances {:?}", e),
384 is_retryable: is_error_retryable(&e),
385 });
386 }
387 };
388
389 let reservations = match resp.reservations {
390 Some(rvs) => rvs,
391 None => {
392 warn!("empty reservation from describe_instances response");
393 return Ok(vec![]);
394 }
395 };
396
397 let mut droplets: Vec<Droplet> = Vec::new();
398 for rsv in reservations.iter() {
399 let instances = rsv.instances().unwrap();
400 for instance in instances {
401 let instance_id = instance.instance_id().unwrap();
402 info!("instance {}", instance_id);
403 droplets.push(Droplet::new(instance));
404 }
405 }
406
407 Ok(droplets)
408 }
409}
410
411#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
413#[serde(rename_all = "snake_case")]
414pub struct Droplet {
415 pub instance_id: String,
416 #[serde(with = "rfc3339::serde_format")]
419 pub launched_at_utc: DateTime<Utc>,
420 pub instance_state_code: i32,
421 pub instance_state_name: String,
422 pub availability_zone: String,
423 pub public_hostname: String,
424 pub public_ipv4: String,
425}
426
427impl Droplet {
428 pub fn new(inst: &Instance) -> Self {
429 let instance_id = match inst.instance_id.to_owned() {
430 Some(v) => v,
431 None => String::new(),
432 };
433 let launch_time = inst.launch_time().unwrap();
434 let native_dt = NaiveDateTime::from_timestamp(launch_time.secs(), 0);
435 let launched_at_utc = DateTime::<Utc>::from_utc(native_dt, Utc);
436
437 let instance_state = match inst.state.to_owned() {
438 Some(v) => v,
439 None => InstanceState::builder().build(),
440 };
441 let instance_state_code = instance_state.code.unwrap_or(0);
442 let instance_state_name = instance_state
443 .name
444 .unwrap_or_else(|| InstanceStateName::Unknown(String::from("unknown")));
445 let instance_state_name = instance_state_name.as_str().to_string();
446
447 let availability_zone = match inst.placement.to_owned() {
448 Some(v) => match v.availability_zone {
449 Some(v2) => v2,
450 None => String::new(),
451 },
452 None => String::new(),
453 };
454
455 let public_hostname = inst
456 .public_dns_name
457 .to_owned()
458 .unwrap_or_else(|| String::from(""));
459 let public_ipv4 = inst
460 .public_ip_address
461 .to_owned()
462 .unwrap_or_else(|| String::from(""));
463
464 Self {
465 instance_id,
466 launched_at_utc,
467 instance_state_code,
468 instance_state_name,
469 availability_zone,
470 public_hostname,
471 public_ipv4,
472 }
473 }
474}
475
476#[inline]
477pub fn is_error_retryable<E>(e: &SdkError<E>) -> bool {
478 match e {
479 SdkError::TimeoutError(_) | SdkError::ResponseError { .. } => true,
480 SdkError::DispatchFailure(e) => e.is_timeout() || e.is_io(),
481 _ => false,
482 }
483}
484
485#[inline]
487fn is_error_delete_key_pair_does_not_exist(e: &SdkError<DeleteKeyPairError>) -> bool {
488 match e {
489 SdkError::ServiceError { err, .. } => {
490 let msg = format!("{:?}", err);
491 msg.contains("does not exist")
492 }
493 _ => false,
494 }
495}
496
497pub async fn fetch_instance_id() -> Result<String> {
499 fetch_metadata("instance-id").await
500}
501
502pub async fn fetch_public_hostname() -> Result<String> {
504 fetch_metadata("public-hostname").await
505}
506
507pub async fn fetch_public_ipv4() -> Result<String> {
509 fetch_metadata("public-ipv4").await
510}
511
512pub async fn fetch_availability_zone() -> Result<String> {
514 fetch_metadata("placement/availability-zone").await
515}
516
517pub async fn fetch_region() -> Result<String> {
520 let mut az = fetch_availability_zone().await?;
521 az.truncate(az.len() - 1);
522 Ok(az)
523}
524
525async fn fetch_metadata(path: &str) -> Result<String> {
530 info!("fetching meta-data/{}", path);
531
532 let uri = format!("http://169.254.169.254/latest/meta-data/{}", path);
533 let token = fetch_token().await?;
534 let req = match Request::builder()
535 .method(Method::GET)
536 .uri(uri)
537 .header("X-aws-ec2-metadata-token", token)
538 .body(Body::empty())
539 {
540 Ok(r) => r,
541 Err(e) => {
542 return Err(API {
543 message: format!("failed to build GET meta-data/{} {:?}", path, e),
544 is_retryable: false,
545 });
546 }
547 };
548
549 let ret = http_manager::read_bytes(req, Duration::from_secs(5), false, true).await;
550 let rs = match ret {
551 Ok(bytes) => {
552 let s = match String::from_utf8(bytes.to_vec()) {
553 Ok(text) => text,
554 Err(e) => {
555 return Err(API {
556 message: format!(
557 "GET meta-data/{} returned unexpected bytes {:?} ({})",
558 path, bytes, e
559 ),
560 is_retryable: false,
561 });
562 }
563 };
564 s
565 }
566 Err(e) => {
567 return Err(API {
568 message: format!("failed GET meta-data/{} {:?}", path, e),
569 is_retryable: false,
570 })
571 }
572 };
573 Ok(rs)
574}
575
576const IMDS_V2_SESSION_TOKEN_URI: &str = "http://169.254.169.254/latest/api/token";
580
581async fn fetch_token() -> Result<String> {
583 info!("fetching IMDS v2 token");
584
585 let req = match Request::builder()
586 .method(Method::PUT)
587 .uri(IMDS_V2_SESSION_TOKEN_URI)
588 .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
589 .body(Body::empty())
590 {
591 Ok(r) => r,
592 Err(e) => {
593 return Err(API {
594 message: format!("failed to build PUT api/token {:?}", e),
595 is_retryable: false,
596 });
597 }
598 };
599
600 let ret = http_manager::read_bytes(req, Duration::from_secs(5), false, true).await;
601 let token = match ret {
602 Ok(bytes) => {
603 let s = match String::from_utf8(bytes.to_vec()) {
604 Ok(text) => text,
605 Err(e) => {
606 return Err(API {
607 message: format!(
608 "PUT api/token returned unexpected bytes {:?} ({})",
609 bytes, e
610 ),
611 is_retryable: false,
612 });
613 }
614 };
615 s
616 }
617 Err(e) => {
618 return Err(API {
619 message: format!("failed PUT api/token {:?}", e),
620 is_retryable: false,
621 })
622 }
623 };
624 Ok(token)
625}