cloud_scanner_cli/
aws_cloud_provider.rs

1//! A module that returns inventory of AWS resources.
2use std::time::Instant;
3
4use crate::cloud_provider::Inventoriable;
5use crate::get_version;
6use crate::usage_location::*;
7
8use anyhow::{Context, Error, Result};
9use aws_sdk_cloudwatch::operation::get_metric_statistics::GetMetricStatisticsOutput;
10use aws_sdk_cloudwatch::types::{Dimension, StandardUnit, Statistic};
11use aws_sdk_ec2::config::Region;
12use aws_sdk_ec2::types::{Instance, InstanceStateName, Volume};
13use chrono::{TimeDelta, Utc};
14
15use crate::model::{
16    CloudProvider, CloudResource, CloudResourceTag, ExecutionStatistics, InstanceState,
17    InstanceUsage, Inventory, InventoryMetadata, ResourceDetails, StorageAttachment, StorageUsage,
18};
19use async_trait::async_trait;
20use aws_types::SdkConfig;
21
22///  A service that returns inventory of AWS resources.
23#[derive(Clone, Debug)]
24pub struct AwsCloudProvider {
25    aws_region: String,
26    ec2_client: aws_sdk_ec2::Client,
27    cloudwatch_client: aws_sdk_cloudwatch::Client,
28}
29
30impl AwsCloudProvider {
31    /// Creates a service that returns inventory of AWS resources.
32    ///
33    /// Initializes it with a specific region and configures the SDK's that will query your account to perform the inventory of resources.
34    pub async fn new(aws_region: &str) -> Result<Self> {
35        let shared_config = Self::load_aws_config(aws_region).await?;
36        let retained_region = Self::get_configured_region_or_exit_if_unsupported(&shared_config);
37
38        Ok(AwsCloudProvider {
39            aws_region: retained_region,
40            ec2_client: aws_sdk_ec2::Client::new(&shared_config),
41            cloudwatch_client: aws_sdk_cloudwatch::Client::new(&shared_config),
42        })
43    }
44
45    /// Initialize an AWS SDK config using credentials from the environment and a region passed as argument.
46    ///
47    /// - If region is empty, uses the default region from environment.
48    /// - ⚠  If the region is invalid, it does **not** return error.
49    async fn load_aws_config(aws_region: &str) -> Result<SdkConfig> {
50        if aws_region.is_empty() {
51            // Use default region (from environment, if any)
52            let sdk_config = aws_config::load_from_env().await;
53
54            match sdk_config.region() {
55                None => {
56                    warn!("Tried to initialize AWS client without a region.");
57                }
58                Some(region) => {
59                    warn!(
60                        "Initialized AWS client from the default region picked up from environment [{}]", region
61                    );
62                }
63            }
64            Ok(sdk_config)
65        } else {
66            // Use the region passed in argument
67            let sdk_config = aws_config::from_env()
68                .region(Region::new(aws_region.to_string()))
69                .load()
70                .await;
71            info!("Initialized AWS client with with region [{}]", aws_region);
72            Ok(sdk_config)
73        }
74    }
75
76    /// Util function that panics with error message if the region cannot be set or is not supported by cloud-scanner
77    fn get_configured_region_or_exit_if_unsupported(sdk_config: &SdkConfig) -> String {
78        if let Some(retained_region) = sdk_config.region() {
79            // retained_region.
80            let tmp_reg = UsageLocation::try_from(retained_region.as_ref());
81            if tmp_reg.is_err() {
82                error!(
83                    "Cannot initialize AWS client for region ({}). Exiting.",
84                    retained_region
85                );
86                panic!();
87            }
88            retained_region.to_string().to_owned()
89        } else {
90            error!("Unable to configure AWS client region. You should consider setting a AWS_DEFAULT_REGION as environment variable or pass region as a CLI parameter.... Exiting...");
91            panic!();
92        }
93    }
94
95    /// Convert AWS tags into Cloud Scanner tags
96    fn cloud_resource_tags_from_aws_tags(
97        aws_tags: &[aws_sdk_ec2::types::Tag],
98    ) -> Vec<CloudResourceTag> {
99        let mut cs_tags: Vec<CloudResourceTag> = Vec::new();
100        for nt in aws_tags.iter() {
101            let k = nt.key.to_owned().unwrap();
102            let v = nt.value.to_owned();
103            cs_tags.push(CloudResourceTag { key: k, value: v });
104        }
105        cs_tags
106    }
107
108    /// Perform inventory of all aws instances of the region
109    async fn get_instances_with_usage_data(&self, tags: &[String]) -> Result<Vec<CloudResource>> {
110        let instances: Vec<Instance> = self
111            .clone()
112            .list_instances(tags)
113            .await
114            .context("Cannot list instances")?;
115        let location = UsageLocation::try_from(self.aws_region.as_str())?;
116
117        // Just to display statistics
118        let cpu_info_timer = Instant::now();
119
120        let mut inventory: Vec<CloudResource> = Vec::new();
121        for instance in instances {
122            let instance_id = instance.instance_id().unwrap().to_string();
123            let cpuload: f64 = self
124                .clone()
125                .get_average_cpu(&instance_id)
126                .await
127                .context("Cannot get CPU load of instance")?;
128
129            let usage: InstanceUsage = InstanceUsage {
130                average_cpu_load: cpuload,
131                state: Self::aws_state_to_generic(instance.clone()),
132            };
133
134            let cloud_resource_tags = Self::cloud_resource_tags_from_aws_tags(instance.tags());
135
136            info!(
137                "Total time spend querying CPU load of instances: {:?}",
138                cpu_info_timer.elapsed()
139            );
140
141            let inst = CloudResource {
142                provider: CloudProvider::AWS,
143                id: instance_id,
144                location: location.clone(),
145                resource_details: ResourceDetails::Instance {
146                    instance_type: instance.instance_type().unwrap().as_str().to_owned(),
147                    usage: Some(usage),
148                },
149
150                tags: cloud_resource_tags,
151            };
152
153            if inst.has_matching_tags(tags) {
154                debug!("Resource matched on tags: {:?}", inst.id);
155                inventory.push(inst);
156            } else {
157                debug!("Filtered instance (tags do not match: {:?}", inst);
158            }
159            //if cs matches the tags passed in param keep it (push it, otherwise skip it)
160        }
161
162        Ok(inventory)
163    }
164
165    /// We consider that an instance is running unless explicitly stopped or terminated
166    fn aws_state_to_generic(instance: Instance) -> InstanceState {
167        if let Some(state) = instance.state {
168            if let Some(state_name) = state.name {
169                match state_name {
170                    InstanceStateName::Stopped => InstanceState::Stopped,
171                    InstanceStateName::Terminated => InstanceState::Stopped,
172                    _ => InstanceState::Running,
173                }
174            } else {
175                InstanceState::Running
176            }
177        } else {
178            InstanceState::Running
179        }
180    }
181
182    /// List all ec2 instances of the current account / region
183    ///
184    /// ⚠  Filtering instance on tags during query is not yet implemented. All instances (running or stopped) are returned.
185    async fn list_instances(self, _tags: &[String]) -> Result<Vec<Instance>> {
186        let client = &self.ec2_client;
187        let mut instances: Vec<Instance> = Vec::new();
188        // Filter: AND on name, OR on values
189        //let filters :std::vec::Vec<aws_sdk_ec2::model::Filter>;
190
191        let resp = client
192            .describe_instances()
193            //set_filters() // Use filters for tags
194            .send()
195            .await?;
196
197        for reservation in resp.reservations() {
198            for instance in reservation.instances() {
199                instances.push(instance.clone());
200            }
201        }
202        Ok(instances)
203    }
204
205    /// Returns average CPU load of a given instance.
206    ///
207    async fn get_average_cpu(self, instance_id: &str) -> Result<f64> {
208        let res = self
209            .get_average_cpu_usage_of_last_10_minutes(instance_id)
210            .await
211            .with_context(|| {
212                format!(
213                    "Cannot retrieve average CPU load of instance: {}",
214                    instance_id
215                )
216            })?;
217        if let Some(points) = res.datapoints {
218            if !points.is_empty() {
219                debug!("Averaging cpu load data point: {:#?}", points);
220                let mut sum: f64 = 0.0;
221                for x in &points {
222                    sum += x.average().unwrap();
223                }
224                let avg = sum / points.len() as f64;
225                return Ok(avg);
226            }
227        }
228        warn!(
229            "Unable to get CPU load of  instance {}, it is likely stopped, using 0 as load",
230            instance_id
231        );
232        Ok(0 as f64)
233    }
234
235    /// Returns the instance CPU utilization usage on the last 10 minutes
236    async fn get_average_cpu_usage_of_last_10_minutes(
237        self,
238        instance_id: &str,
239    ) -> Result<GetMetricStatisticsOutput, Error> {
240        // We want statistics about the last 10 minutes using  5min  sample
241        let measure_duration: chrono::TimeDelta =
242            TimeDelta::try_minutes(10).context("Unsupported duration")?;
243        let sample_period_seconds = 300; // 5*60 (the default granularity of cloudwatch standard CPU metrics)
244        let now: chrono::DateTime<Utc> = Utc::now();
245        let start_time: chrono::DateTime<Utc> = now - measure_duration;
246
247        let cpu_metric_name = String::from("CPUUtilization");
248        let ec2_namespace = "AWS/EC2";
249
250        let dimensions = vec![Dimension::builder()
251            .name("InstanceId")
252            .value(instance_id)
253            .build()];
254
255        let end_time_aws: aws_sdk_cloudwatch::primitives::DateTime =
256            aws_sdk_cloudwatch::primitives::DateTime::from_secs(now.timestamp());
257        let start_time_aws: aws_sdk_cloudwatch::primitives::DateTime =
258            aws_sdk_cloudwatch::primitives::DateTime::from_secs(start_time.timestamp());
259
260        let resp: GetMetricStatisticsOutput = self
261            .cloudwatch_client
262            .get_metric_statistics()
263            .end_time(end_time_aws)
264            .metric_name(cpu_metric_name)
265            .namespace(ec2_namespace)
266            .period(sample_period_seconds)
267            .set_dimensions(Some(dimensions))
268            .start_time(start_time_aws)
269            .statistics(Statistic::Average)
270            .unit(StandardUnit::Percent)
271            .send()
272            .await
273            .context("Trying to get cloudwatch statistics")?;
274
275        Ok(resp)
276    }
277
278    /// List all Volumes of current account.
279    ///
280    /// ⚠  Filtering on tags is not yet implemented.
281    async fn list_volumes(self, tags: &[String]) -> Result<Vec<Volume>> {
282        warn!(
283            "Warning: filtering volumes on tags is not implemented {:?}",
284            tags
285        );
286        let client = &self.ec2_client;
287        let mut volumes: Vec<Volume> = Vec::new();
288        // Filter: AND on name, OR on values
289        //let filters :std::vec::Vec<aws_sdk_ec2::model::Filter>;
290        let resp = client
291            .describe_volumes()
292            //set_filters() // Use filters for tags
293            .send()
294            .await?;
295        for v in resp.volumes() {
296            volumes.push(v.clone());
297        }
298        Ok(volumes)
299    }
300
301    /// Perform inventory of all aws volumes of the region
302    async fn get_volumes_with_usage_data(&self, tags: &[String]) -> Result<Vec<CloudResource>> {
303        let location = UsageLocation::try_from(self.aws_region.as_str())?;
304        let volumes = self.clone().list_volumes(tags).await.unwrap();
305        let mut resources: Vec<CloudResource> = Vec::new();
306
307        for volume in volumes {
308            let volume_id = volume.volume_id().unwrap();
309
310            let usage: StorageUsage = StorageUsage {
311                size_gb: volume.size().unwrap(),
312            };
313
314            let volume_type: String = volume.volume_type().unwrap().as_str().to_string();
315            let mut attached_instances: Option<Vec<StorageAttachment>> = None;
316
317            if let Some(all_volume_attachments) = volume.attachments.clone() {
318                for single_attachment in all_volume_attachments {
319                    let mut attachment_list: Vec<StorageAttachment> = Vec::new();
320
321                    if let Some(instance_id) = single_attachment.instance_id {
322                        attachment_list.push(StorageAttachment { instance_id });
323                    }
324                    attached_instances = Some(attachment_list);
325                }
326            }
327
328            let disk = CloudResource {
329                provider: CloudProvider::AWS,
330                id: volume_id.into(),
331                location: location.clone(),
332                resource_details: ResourceDetails::BlockStorage {
333                    storage_type: volume_type,
334                    usage: Some(usage),
335                    attached_instances,
336                },
337                tags: Self::cloud_resource_tags_from_aws_tags(volume.tags()),
338            };
339            resources.push(disk);
340        }
341
342        Ok(resources)
343    }
344}
345
346#[async_trait]
347impl Inventoriable for AwsCloudProvider {
348    /// List resources whose tags match passed tags
349    async fn list_resources(
350        &self,
351        tags: &[String],
352        include_block_storage: bool,
353    ) -> Result<Inventory> {
354        let start = Instant::now();
355
356        let mut resources: Vec<CloudResource> = Vec::new();
357
358        let mut instances = self.clone().get_instances_with_usage_data(tags).await?;
359        resources.append(&mut instances);
360        if include_block_storage {
361            let mut volumes = self.clone().get_volumes_with_usage_data(tags).await?;
362            resources.append(&mut volumes);
363        }
364        let stats = ExecutionStatistics {
365            inventory_duration: start.elapsed(),
366            impact_estimation_duration: std::time::Duration::from_millis(0),
367            total_duration: start.elapsed(),
368        };
369        warn!("{:?}", stats);
370
371        let metadata = InventoryMetadata {
372            inventory_date: Some(Utc::now()),
373            description: Some(String::from("About this inventory")),
374            cloud_scanner_version: Some(get_version()),
375            execution_statistics: Some(stats),
376        };
377
378        let inventory = Inventory {
379            metadata,
380            resources,
381        };
382        Ok(inventory)
383    }
384}
385
#[cfg(test)]
mod tests {
    //! Tests of the AWS provider.
    //!
    //! Tests marked `#[ignore]` create a provider for `eu-west-1` and assert on specific
    //! instance ids and resource counts, so they depend on the content of a real account.

    use super::*;
    use crate::model::vec_to_map;

    /// Id of an instance that has to be running for the CPU related tests to pass.
    static RUNNING_INSTANCE_ID: &str = "i-03c8f84a6318a8186";

    /// The inventory lists the expected instances, with their tags.
    #[tokio::test]
    #[ignore]
    async fn inventory_should_return_correct_number_of_instances() {
        let provider = AwsCloudProvider::new("eu-west-1").await.unwrap();
        let no_tags: Vec<String> = vec![];
        let resources: Vec<CloudResource> = provider
            .get_instances_with_usage_data(&no_tags)
            .await
            .context("Failed to list")
            .unwrap();
        assert_eq!(4, resources.len());

        let first_instance = resources.first().unwrap();
        assert_eq!(3, first_instance.tags.len(), "Wrong number of tags");
        let tags_by_key = vec_to_map(first_instance.tags.clone());
        let name_tag = tags_by_key.get("Name").unwrap();
        assert_eq!(
            Some("test-boapi".to_string()),
            name_tag.to_owned(),
            "Wrong tag value"
        );
    }

    /// Loading the SDK config does not validate the region name.
    #[tokio::test]
    async fn test_create_sdk_config_works_with_wrong_region() {
        let valid_region = "eu-west-3";
        let valid_config = AwsCloudProvider::load_aws_config(valid_region)
            .await
            .unwrap();
        assert_eq!(valid_region, valid_config.region().unwrap().to_string());

        let wrong_region = "impossible-region";
        let wrong_config = AwsCloudProvider::load_aws_config(wrong_region)
            .await
            .unwrap();
        assert_eq!(wrong_region, wrong_config.region().unwrap().to_string());
    }

    /// A running instance yields 1 or 2 datapoints.
    #[tokio::test]
    #[ignore]
    async fn get_cpu_usage_metrics_of_running_instance_should_return_right_number_of_data_points() {
        let provider = AwsCloudProvider::new("eu-west-1").await.unwrap();
        let statistics = provider
            .get_average_cpu_usage_of_last_10_minutes(RUNNING_INSTANCE_ID)
            .await
            .unwrap();
        let datapoints = statistics.datapoints.unwrap();
        let count = datapoints.len();
        assert!(
            0 < count && count < 3,
            "Strange number of datapoint returned for instance {}, is it really up ?. I was expecting 1 or 2  but got {} .\n {:#?}",
            RUNNING_INSTANCE_ID,
            datapoints.len(),
            datapoints
        );
    }

    /// A shutdown instance yields no datapoint.
    #[tokio::test]
    #[ignore]
    async fn test_get_instance_usage_metrics_of_shutdown_instance() {
        let provider = AwsCloudProvider::new("eu-west-1").await.unwrap();
        let shutdown_instance_id = "i-03e0b3b1246001382";
        let statistics = provider
            .get_average_cpu_usage_of_last_10_minutes(shutdown_instance_id)
            .await
            .unwrap();
        let datapoints = statistics.datapoints.unwrap();
        assert_eq!(0, datapoints.len(), "Wrong number of datapoint returned");
    }

    /// An unknown instance id yields no datapoint (and no error).
    #[tokio::test]
    #[ignore]
    async fn test_get_instance_usage_metrics_of_non_existing_instance() {
        let provider = AwsCloudProvider::new("eu-west-1").await.unwrap();
        let unknown_instance_id = "IDONOTEXISTS";
        let statistics = provider
            .get_average_cpu_usage_of_last_10_minutes(unknown_instance_id)
            .await
            .unwrap();
        let datapoints = statistics.datapoints.unwrap();
        assert_eq!(0, datapoints.len());
    }

    /// The average CPU load of a running instance is strictly between 0 and 100.
    #[tokio::test]
    #[ignore]
    async fn test_average_cpu_load_of_running_instance_is_not_zero() {
        // This instance  needs to be running for the test to pass
        let provider = AwsCloudProvider::new("eu-west-1").await.unwrap();

        let avg_cpu_load = provider
            .get_average_cpu(RUNNING_INSTANCE_ID)
            .await
            .unwrap();
        assert_ne!(
            0.0_f64, avg_cpu_load,
            "CPU load of instance {} is zero, is it really running ?",
            RUNNING_INSTANCE_ID
        );
        println!("{:#?}", avg_cpu_load);
        assert!(0.0_f64 < avg_cpu_load);
        assert!(100.0_f64 > avg_cpu_load);
    }

    /// The average CPU load of an unknown instance is reported as zero.
    #[tokio::test]
    #[ignore]
    async fn test_average_cpu_load_of_non_existing_instance_is_zero() {
        let unknown_instance_id = "IDONOTEXISTS";
        let provider = AwsCloudProvider::new("eu-west-1").await.unwrap();
        let load = provider
            .get_average_cpu(unknown_instance_id)
            .await
            .unwrap();
        assert_eq!(0.0_f64, load);
    }

    /// The average CPU load of a shutdown instance is reported as zero.
    #[tokio::test]
    #[ignore]
    async fn test_average_cpu_load_of_shutdown_instance_is_zero() {
        let provider = AwsCloudProvider::new("eu-west-1").await.unwrap();
        let shutdown_instance_id = "i-03e0b3b1246001382";
        let load = provider
            .get_average_cpu(shutdown_instance_id)
            .await
            .unwrap();
        assert_eq!(0.0_f64, load);
    }

    /// Listing volumes returns the expected number of volumes.
    #[tokio::test]
    #[ignore]
    async fn returns_the_right_number_of_volumes() {
        let provider = AwsCloudProvider::new("eu-west-1").await.unwrap();
        let no_tags: Vec<String> = vec![];
        let volumes = provider.list_volumes(&no_tags).await.unwrap();
        assert_eq!(6, volumes.len());
    }
}