cloud_meta/cloud/
amazon.rs

1use std::collections::HashMap;
2use std::io::BufRead;
3use std::time::Duration;
4use http::uri::{Uri, Parts};
5use hyper::{Body, Method, Request};
6use hyper::body::{to_bytes, Bytes};
7use hyper::client::{Client, HttpConnector};
8use serde::{Deserialize, Serialize};
9use serde_json::{Map, Value};
10use crate::Error;
11
12pub struct Amazon {
13    client:   Client<HttpConnector, Body>,
14    endpoint: Parts,
15    version:  String,
16}
17
18#[derive(Debug, Deserialize, Serialize)]
19#[serde(rename_all = "camelCase")]
20pub struct Instance {
21    pub instance_id:   String,
22    pub image_id:      String,
23    pub architecture:  String,
24    pub instance_type: String,
25    pub region:        String,
26
27    #[serde(flatten)]
28    pub extra: HashMap<String, Value>,
29}
30
31impl Amazon {
32    pub fn new(client: Client<HttpConnector, Body>) -> Self {
33        let endpoint = "http://169.254.169.254";
34        let version  = "2021-07-15";
35        Self {
36            client:   client,
37            endpoint: Uri::from_static(endpoint).into_parts(),
38            version:  version.to_owned(),
39        }
40    }
41
42    pub async fn instance(&self, token: Option<&[u8]>) -> Result<Instance, Error> {
43        let path  = "dynamic/instance-identity/document";
44        let bytes = self.get(path, token).await?;
45        Ok(serde_json::from_slice(&bytes)?)
46    }
47
48    #[async_recursion::async_recursion]
49    pub async fn scan(
50        &self,
51        path:  &str,
52        token: Option<&'async_recursion [u8]>
53    ) -> Result<Value, Error> {
54        let bytes = self.get(path, token).await?;
55
56        if !path.ends_with('/') {
57            let value = String::from_utf8(bytes.to_vec())?;
58            return Ok(Value::String(value));
59        }
60
61        let mut map = Map::new();
62        for line in bytes.lines() {
63            let name  = line?;
64            let path  = format!("{}{}", path, name);
65            let value = self.scan(&path, token).await;
66            map.insert(name, value.unwrap_or(Value::Null));
67        }
68        Ok(Value::Object(map))
69    }
70
71    pub async fn get(&self, path: &str, token: Option<&[u8]>) -> Result<Bytes, Error> {
72        let version = &self.version;
73        let path    = format!("/{version}/{path}");
74
75        let mut request = self.request(Method::GET, &path)?;
76
77        if let Some(token) = token {
78            let header = "X-aws-ec2-metadata-token";
79            let value  = token.try_into()?;
80            request.headers_mut().insert(header, value);
81        }
82
83        let response = self.client.request(request).await?;
84        if !response.status().is_success() {
85            return Err(response.status().into());
86        }
87
88        Ok(to_bytes(response).await?)
89    }
90
91    pub async fn token(&self, ttl: Duration) -> Result<Vec<u8>, Error> {
92        let mut request = self.request(Method::PUT, "/latest/api/token")?;
93
94        let header = "X-aws-ec2-metadata-token-ttl-seconds";
95        let value  = ttl.as_secs().to_string().into_bytes().try_into()?;
96        request.headers_mut().insert(header, value);
97
98        let response = self.client.request(request).await?;
99        if !response.status().is_success() {
100            return Err(response.status().into());
101        }
102
103        Ok(to_bytes(response).await?.to_vec())
104    }
105
106    fn request(&self, method: Method, path: &str) -> Result<Request<Body>, Error> {
107        let mut endpoint = Parts::default();
108        endpoint.scheme         = self.endpoint.scheme.clone();
109        endpoint.authority      = self.endpoint.authority.clone();
110        endpoint.path_and_query = Some(path.try_into()?);
111
112        let mut request = Request::new(Body::empty());
113        *request.method_mut() = method;
114        *request.uri_mut() = endpoint.try_into()?;
115
116        Ok(request)
117    }
118}