// cloud_meta/cloud/amazon.rs
use std::collections::HashMap;
use std::io::BufRead;
use std::time::Duration;

use http::header::HeaderValue;
use http::uri::{Uri, Parts};
use hyper::{Body, Method, Request};
use hyper::body::{to_bytes, Bytes};
use hyper::client::{Client, HttpConnector};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};

use crate::Error;
11
12pub struct Amazon {
13 client: Client<HttpConnector, Body>,
14 endpoint: Parts,
15 version: String,
16}
17
18#[derive(Debug, Deserialize, Serialize)]
19#[serde(rename_all = "camelCase")]
20pub struct Instance {
21 pub instance_id: String,
22 pub image_id: String,
23 pub architecture: String,
24 pub instance_type: String,
25 pub region: String,
26
27 #[serde(flatten)]
28 pub extra: HashMap<String, Value>,
29}
30
31impl Amazon {
32 pub fn new(client: Client<HttpConnector, Body>) -> Self {
33 let endpoint = "http://169.254.169.254";
34 let version = "2021-07-15";
35 Self {
36 client: client,
37 endpoint: Uri::from_static(endpoint).into_parts(),
38 version: version.to_owned(),
39 }
40 }
41
42 pub async fn instance(&self, token: Option<&[u8]>) -> Result<Instance, Error> {
43 let path = "dynamic/instance-identity/document";
44 let bytes = self.get(path, token).await?;
45 Ok(serde_json::from_slice(&bytes)?)
46 }
47
48 #[async_recursion::async_recursion]
49 pub async fn scan(
50 &self,
51 path: &str,
52 token: Option<&'async_recursion [u8]>
53 ) -> Result<Value, Error> {
54 let bytes = self.get(path, token).await?;
55
56 if !path.ends_with('/') {
57 let value = String::from_utf8(bytes.to_vec())?;
58 return Ok(Value::String(value));
59 }
60
61 let mut map = Map::new();
62 for line in bytes.lines() {
63 let name = line?;
64 let path = format!("{}{}", path, name);
65 let value = self.scan(&path, token).await;
66 map.insert(name, value.unwrap_or(Value::Null));
67 }
68 Ok(Value::Object(map))
69 }
70
71 pub async fn get(&self, path: &str, token: Option<&[u8]>) -> Result<Bytes, Error> {
72 let version = &self.version;
73 let path = format!("/{version}/{path}");
74
75 let mut request = self.request(Method::GET, &path)?;
76
77 if let Some(token) = token {
78 let header = "X-aws-ec2-metadata-token";
79 let value = token.try_into()?;
80 request.headers_mut().insert(header, value);
81 }
82
83 let response = self.client.request(request).await?;
84 if !response.status().is_success() {
85 return Err(response.status().into());
86 }
87
88 Ok(to_bytes(response).await?)
89 }
90
91 pub async fn token(&self, ttl: Duration) -> Result<Vec<u8>, Error> {
92 let mut request = self.request(Method::PUT, "/latest/api/token")?;
93
94 let header = "X-aws-ec2-metadata-token-ttl-seconds";
95 let value = ttl.as_secs().to_string().into_bytes().try_into()?;
96 request.headers_mut().insert(header, value);
97
98 let response = self.client.request(request).await?;
99 if !response.status().is_success() {
100 return Err(response.status().into());
101 }
102
103 Ok(to_bytes(response).await?.to_vec())
104 }
105
106 fn request(&self, method: Method, path: &str) -> Result<Request<Body>, Error> {
107 let mut endpoint = Parts::default();
108 endpoint.scheme = self.endpoint.scheme.clone();
109 endpoint.authority = self.endpoint.authority.clone();
110 endpoint.path_and_query = Some(path.try_into()?);
111
112 let mut request = Request::new(Body::empty());
113 *request.method_mut() = method;
114 *request.uri_mut() = endpoint.try_into()?;
115
116 Ok(request)
117 }
118}