1use std::time::Instant;
3
4use crate::cloud_provider::Inventoriable;
5use crate::get_version;
6use crate::usage_location::*;
7
8use anyhow::{Context, Error, Result};
9use aws_sdk_cloudwatch::operation::get_metric_statistics::GetMetricStatisticsOutput;
10use aws_sdk_cloudwatch::types::{Dimension, StandardUnit, Statistic};
11use aws_sdk_ec2::config::Region;
12use aws_sdk_ec2::types::{Instance, InstanceStateName, Volume};
13use chrono::{TimeDelta, Utc};
14
15use crate::model::{
16 CloudProvider, CloudResource, CloudResourceTag, ExecutionStatistics, InstanceState,
17 InstanceUsage, Inventory, InventoryMetadata, ResourceDetails, StorageAttachment, StorageUsage,
18};
19use async_trait::async_trait;
20use aws_types::SdkConfig;
21
22#[derive(Clone, Debug)]
24pub struct AwsCloudProvider {
25 aws_region: String,
26 ec2_client: aws_sdk_ec2::Client,
27 cloudwatch_client: aws_sdk_cloudwatch::Client,
28}
29
30impl AwsCloudProvider {
31 pub async fn new(aws_region: &str) -> Result<Self> {
35 let shared_config = Self::load_aws_config(aws_region).await?;
36 let retained_region = Self::get_configured_region_or_exit_if_unsupported(&shared_config);
37
38 Ok(AwsCloudProvider {
39 aws_region: retained_region,
40 ec2_client: aws_sdk_ec2::Client::new(&shared_config),
41 cloudwatch_client: aws_sdk_cloudwatch::Client::new(&shared_config),
42 })
43 }
44
45 async fn load_aws_config(aws_region: &str) -> Result<SdkConfig> {
50 if aws_region.is_empty() {
51 let sdk_config = aws_config::load_from_env().await;
53
54 match sdk_config.region() {
55 None => {
56 warn!("Tried to initialize AWS client without a region.");
57 }
58 Some(region) => {
59 warn!(
60 "Initialized AWS client from the default region picked up from environment [{}]", region
61 );
62 }
63 }
64 Ok(sdk_config)
65 } else {
66 let sdk_config = aws_config::from_env()
68 .region(Region::new(aws_region.to_string()))
69 .load()
70 .await;
71 info!("Initialized AWS client with with region [{}]", aws_region);
72 Ok(sdk_config)
73 }
74 }
75
76 fn get_configured_region_or_exit_if_unsupported(sdk_config: &SdkConfig) -> String {
78 if let Some(retained_region) = sdk_config.region() {
79 let tmp_reg = UsageLocation::try_from(retained_region.as_ref());
81 if tmp_reg.is_err() {
82 error!(
83 "Cannot initialize AWS client for region ({}). Exiting.",
84 retained_region
85 );
86 panic!();
87 }
88 retained_region.to_string().to_owned()
89 } else {
90 error!("Unable to configure AWS client region. You should consider setting a AWS_DEFAULT_REGION as environment variable or pass region as a CLI parameter.... Exiting...");
91 panic!();
92 }
93 }
94
95 fn cloud_resource_tags_from_aws_tags(
97 aws_tags: &[aws_sdk_ec2::types::Tag],
98 ) -> Vec<CloudResourceTag> {
99 let mut cs_tags: Vec<CloudResourceTag> = Vec::new();
100 for nt in aws_tags.iter() {
101 let k = nt.key.to_owned().unwrap();
102 let v = nt.value.to_owned();
103 cs_tags.push(CloudResourceTag { key: k, value: v });
104 }
105 cs_tags
106 }
107
108 async fn get_instances_with_usage_data(&self, tags: &[String]) -> Result<Vec<CloudResource>> {
110 let instances: Vec<Instance> = self
111 .clone()
112 .list_instances(tags)
113 .await
114 .context("Cannot list instances")?;
115 let location = UsageLocation::try_from(self.aws_region.as_str())?;
116
117 let cpu_info_timer = Instant::now();
119
120 let mut inventory: Vec<CloudResource> = Vec::new();
121 for instance in instances {
122 let instance_id = instance.instance_id().unwrap().to_string();
123 let cpuload: f64 = self
124 .clone()
125 .get_average_cpu(&instance_id)
126 .await
127 .context("Cannot get CPU load of instance")?;
128
129 let usage: InstanceUsage = InstanceUsage {
130 average_cpu_load: cpuload,
131 state: Self::aws_state_to_generic(instance.clone()),
132 };
133
134 let cloud_resource_tags = Self::cloud_resource_tags_from_aws_tags(instance.tags());
135
136 info!(
137 "Total time spend querying CPU load of instances: {:?}",
138 cpu_info_timer.elapsed()
139 );
140
141 let inst = CloudResource {
142 provider: CloudProvider::AWS,
143 id: instance_id,
144 location: location.clone(),
145 resource_details: ResourceDetails::Instance {
146 instance_type: instance.instance_type().unwrap().as_str().to_owned(),
147 usage: Some(usage),
148 },
149
150 tags: cloud_resource_tags,
151 };
152
153 if inst.has_matching_tags(tags) {
154 debug!("Resource matched on tags: {:?}", inst.id);
155 inventory.push(inst);
156 } else {
157 debug!("Filtered instance (tags do not match: {:?}", inst);
158 }
159 }
161
162 Ok(inventory)
163 }
164
165 fn aws_state_to_generic(instance: Instance) -> InstanceState {
167 if let Some(state) = instance.state {
168 if let Some(state_name) = state.name {
169 match state_name {
170 InstanceStateName::Stopped => InstanceState::Stopped,
171 InstanceStateName::Terminated => InstanceState::Stopped,
172 _ => InstanceState::Running,
173 }
174 } else {
175 InstanceState::Running
176 }
177 } else {
178 InstanceState::Running
179 }
180 }
181
182 async fn list_instances(self, _tags: &[String]) -> Result<Vec<Instance>> {
186 let client = &self.ec2_client;
187 let mut instances: Vec<Instance> = Vec::new();
188 let resp = client
192 .describe_instances()
193 .send()
195 .await?;
196
197 for reservation in resp.reservations() {
198 for instance in reservation.instances() {
199 instances.push(instance.clone());
200 }
201 }
202 Ok(instances)
203 }
204
205 async fn get_average_cpu(self, instance_id: &str) -> Result<f64> {
208 let res = self
209 .get_average_cpu_usage_of_last_10_minutes(instance_id)
210 .await
211 .with_context(|| {
212 format!(
213 "Cannot retrieve average CPU load of instance: {}",
214 instance_id
215 )
216 })?;
217 if let Some(points) = res.datapoints {
218 if !points.is_empty() {
219 debug!("Averaging cpu load data point: {:#?}", points);
220 let mut sum: f64 = 0.0;
221 for x in &points {
222 sum += x.average().unwrap();
223 }
224 let avg = sum / points.len() as f64;
225 return Ok(avg);
226 }
227 }
228 warn!(
229 "Unable to get CPU load of instance {}, it is likely stopped, using 0 as load",
230 instance_id
231 );
232 Ok(0 as f64)
233 }
234
235 async fn get_average_cpu_usage_of_last_10_minutes(
237 self,
238 instance_id: &str,
239 ) -> Result<GetMetricStatisticsOutput, Error> {
240 let measure_duration: chrono::TimeDelta =
242 TimeDelta::try_minutes(10).context("Unsupported duration")?;
243 let sample_period_seconds = 300; let now: chrono::DateTime<Utc> = Utc::now();
245 let start_time: chrono::DateTime<Utc> = now - measure_duration;
246
247 let cpu_metric_name = String::from("CPUUtilization");
248 let ec2_namespace = "AWS/EC2";
249
250 let dimensions = vec![Dimension::builder()
251 .name("InstanceId")
252 .value(instance_id)
253 .build()];
254
255 let end_time_aws: aws_sdk_cloudwatch::primitives::DateTime =
256 aws_sdk_cloudwatch::primitives::DateTime::from_secs(now.timestamp());
257 let start_time_aws: aws_sdk_cloudwatch::primitives::DateTime =
258 aws_sdk_cloudwatch::primitives::DateTime::from_secs(start_time.timestamp());
259
260 let resp: GetMetricStatisticsOutput = self
261 .cloudwatch_client
262 .get_metric_statistics()
263 .end_time(end_time_aws)
264 .metric_name(cpu_metric_name)
265 .namespace(ec2_namespace)
266 .period(sample_period_seconds)
267 .set_dimensions(Some(dimensions))
268 .start_time(start_time_aws)
269 .statistics(Statistic::Average)
270 .unit(StandardUnit::Percent)
271 .send()
272 .await
273 .context("Trying to get cloudwatch statistics")?;
274
275 Ok(resp)
276 }
277
278 async fn list_volumes(self, tags: &[String]) -> Result<Vec<Volume>> {
282 warn!(
283 "Warning: filtering volumes on tags is not implemented {:?}",
284 tags
285 );
286 let client = &self.ec2_client;
287 let mut volumes: Vec<Volume> = Vec::new();
288 let resp = client
291 .describe_volumes()
292 .send()
294 .await?;
295 for v in resp.volumes() {
296 volumes.push(v.clone());
297 }
298 Ok(volumes)
299 }
300
301 async fn get_volumes_with_usage_data(&self, tags: &[String]) -> Result<Vec<CloudResource>> {
303 let location = UsageLocation::try_from(self.aws_region.as_str())?;
304 let volumes = self.clone().list_volumes(tags).await.unwrap();
305 let mut resources: Vec<CloudResource> = Vec::new();
306
307 for volume in volumes {
308 let volume_id = volume.volume_id().unwrap();
309
310 let usage: StorageUsage = StorageUsage {
311 size_gb: volume.size().unwrap(),
312 };
313
314 let volume_type: String = volume.volume_type().unwrap().as_str().to_string();
315 let mut attached_instances: Option<Vec<StorageAttachment>> = None;
316
317 if let Some(all_volume_attachments) = volume.attachments.clone() {
318 for single_attachment in all_volume_attachments {
319 let mut attachment_list: Vec<StorageAttachment> = Vec::new();
320
321 if let Some(instance_id) = single_attachment.instance_id {
322 attachment_list.push(StorageAttachment { instance_id });
323 }
324 attached_instances = Some(attachment_list);
325 }
326 }
327
328 let disk = CloudResource {
329 provider: CloudProvider::AWS,
330 id: volume_id.into(),
331 location: location.clone(),
332 resource_details: ResourceDetails::BlockStorage {
333 storage_type: volume_type,
334 usage: Some(usage),
335 attached_instances,
336 },
337 tags: Self::cloud_resource_tags_from_aws_tags(volume.tags()),
338 };
339 resources.push(disk);
340 }
341
342 Ok(resources)
343 }
344}
345
346#[async_trait]
347impl Inventoriable for AwsCloudProvider {
348 async fn list_resources(
350 &self,
351 tags: &[String],
352 include_block_storage: bool,
353 ) -> Result<Inventory> {
354 let start = Instant::now();
355
356 let mut resources: Vec<CloudResource> = Vec::new();
357
358 let mut instances = self.clone().get_instances_with_usage_data(tags).await?;
359 resources.append(&mut instances);
360 if include_block_storage {
361 let mut volumes = self.clone().get_volumes_with_usage_data(tags).await?;
362 resources.append(&mut volumes);
363 }
364 let stats = ExecutionStatistics {
365 inventory_duration: start.elapsed(),
366 impact_estimation_duration: std::time::Duration::from_millis(0),
367 total_duration: start.elapsed(),
368 };
369 warn!("{:?}", stats);
370
371 let metadata = InventoryMetadata {
372 inventory_date: Some(Utc::now()),
373 description: Some(String::from("About this inventory")),
374 cloud_scanner_version: Some(get_version()),
375 execution_statistics: Some(stats),
376 };
377
378 let inventory = Inventory {
379 metadata,
380 resources,
381 };
382 Ok(inventory)
383 }
384}
385
386#[cfg(test)]
387mod tests {
388 use super::*;
389 use crate::model::vec_to_map;
390
391 static RUNNING_INSTANCE_ID: &str = "i-03c8f84a6318a8186";
392
393 #[tokio::test]
394 #[ignore]
395 async fn inventory_should_return_correct_number_of_instances() {
396 let aws: AwsCloudProvider = AwsCloudProvider::new("eu-west-1").await.unwrap();
397 let filtertags: Vec<String> = Vec::new();
398 let res: Vec<CloudResource> = aws
399 .get_instances_with_usage_data(&filtertags)
400 .await
401 .context("Failed to list")
402 .unwrap();
403 assert_eq!(4, res.len());
404
405 let inst = res.first().unwrap();
406 assert_eq!(3, inst.tags.len(), "Wrong number of tags");
407 let tag_map = vec_to_map(inst.tags.clone());
408 let v = tag_map.get("Name").unwrap();
409 assert_eq!(
410 Some("test-boapi".to_string()),
411 v.to_owned(),
412 "Wrong tag value"
413 );
414 }
415
416 #[tokio::test]
417 async fn test_create_sdk_config_works_with_wrong_region() {
418 let region: &str = "eu-west-3";
419 let config = AwsCloudProvider::load_aws_config(region).await.unwrap();
420 assert_eq!(region, config.region().unwrap().to_string());
421
422 let wrong_region: &str = "impossible-region";
423 let config = AwsCloudProvider::load_aws_config(wrong_region)
424 .await
425 .unwrap();
426 assert_eq!(wrong_region, config.region().unwrap().to_string())
427 }
428
429 #[tokio::test]
430 #[ignore]
431 async fn get_cpu_usage_metrics_of_running_instance_should_return_right_number_of_data_points() {
432 let aws: AwsCloudProvider = AwsCloudProvider::new("eu-west-1").await.unwrap();
433 let res = aws
434 .get_average_cpu_usage_of_last_10_minutes(&RUNNING_INSTANCE_ID)
435 .await
436 .unwrap();
437 let datapoints = res.datapoints.unwrap();
438 assert!(
439 0 < datapoints.len() && datapoints.len() < 3,
440 "Strange number of datapoint returned for instance {}, is it really up ?. I was expecting 1 or 2 but got {} .\n {:#?}",
441 &RUNNING_INSTANCE_ID,
442 datapoints.len(),
443 datapoints
444 )
445 }
446
447 #[tokio::test]
448 #[ignore]
449 async fn test_get_instance_usage_metrics_of_shutdown_instance() {
450 let aws: AwsCloudProvider = AwsCloudProvider::new("eu-west-1").await.unwrap();
451 let instance_id = "i-03e0b3b1246001382";
452 let res = aws
453 .get_average_cpu_usage_of_last_10_minutes(instance_id)
454 .await
455 .unwrap();
456 let datapoints = res.datapoints.unwrap();
457 assert_eq!(0, datapoints.len(), "Wrong number of datapoint returned");
458 }
459
460 #[tokio::test]
461 #[ignore]
462 async fn test_get_instance_usage_metrics_of_non_existing_instance() {
463 let aws: AwsCloudProvider = AwsCloudProvider::new("eu-west-1").await.unwrap();
464 let instance_id = "IDONOTEXISTS";
465 let res = aws
466 .get_average_cpu_usage_of_last_10_minutes(instance_id)
467 .await
468 .unwrap();
469 let datapoints = res.datapoints.unwrap();
470 assert_eq!(0, datapoints.len());
471 }
472
473 #[tokio::test]
474 #[ignore]
475 async fn test_average_cpu_load_of_running_instance_is_not_zero() {
476 let aws: AwsCloudProvider = AwsCloudProvider::new("eu-west-1").await.unwrap();
478
479 let avg_cpu_load = aws.get_average_cpu(&RUNNING_INSTANCE_ID).await.unwrap();
480 assert_ne!(
481 0 as f64, avg_cpu_load,
482 "CPU load of instance {} is zero, is it really running ?",
483 &RUNNING_INSTANCE_ID
484 );
485 println!("{:#?}", avg_cpu_load);
486 assert!((0 as f64) < avg_cpu_load);
487 assert!((100 as f64) > avg_cpu_load);
488 }
489
490 #[tokio::test]
491 #[ignore]
492 async fn test_average_cpu_load_of_non_existing_instance_is_zero() {
493 let instance_id = "IDONOTEXISTS";
494 let aws: AwsCloudProvider = AwsCloudProvider::new("eu-west-1").await.unwrap();
495 let res = aws.get_average_cpu(instance_id).await.unwrap();
496 assert_eq!(0 as f64, res);
497 }
498
499 #[tokio::test]
500 #[ignore]
501 async fn test_average_cpu_load_of_shutdown_instance_is_zero() {
502 let aws: AwsCloudProvider = AwsCloudProvider::new("eu-west-1").await.unwrap();
503 let instance_id = "i-03e0b3b1246001382";
504 let res = aws.get_average_cpu(instance_id).await.unwrap();
505 assert_eq!(0 as f64, res);
506 }
507
508 #[tokio::test]
509 #[ignore]
510 async fn returns_the_right_number_of_volumes() {
511 let aws: AwsCloudProvider = AwsCloudProvider::new("eu-west-1").await.unwrap();
512 let filtertags: Vec<String> = Vec::new();
513 let res = aws.list_volumes(&filtertags).await.unwrap();
514 assert_eq!(6, res.len());
515 }
516}