aws_sdk_manager/
cloudwatch.rs

1use std::{
2    collections::HashMap,
3    fs::{self, File},
4    io::{self, Error, ErrorKind, Write},
5    path::Path,
6    string::String,
7    sync::Arc,
8    thread, time,
9};
10
11use crate::errors::{Error::API, Result};
12use aws_sdk_cloudwatch::{
13    model::MetricDatum, types::SdkError as MetricsSdkError, Client as MetricsClient,
14};
15use aws_sdk_cloudwatchlogs::{
16    error::{
17        CreateLogGroupError, CreateLogGroupErrorKind, DeleteLogGroupError, DeleteLogGroupErrorKind,
18    },
19    types::SdkError as LogsSdkError,
20    Client as LogsClient,
21};
22use aws_types::SdkConfig as AwsSdkConfig;
23use log::{info, warn};
24use serde::{Deserialize, Serialize};
25
26/// Implements AWS CloudWatch manager.
27#[derive(Debug, Clone)]
28pub struct Manager {
29    #[allow(dead_code)]
30    shared_config: AwsSdkConfig,
31    metrics_cli: MetricsClient,
32    logs_cli: LogsClient,
33}
34
35impl Manager {
36    pub fn new(shared_config: &AwsSdkConfig) -> Self {
37        let cloned = shared_config.clone();
38        let metrics_cli = MetricsClient::new(shared_config);
39        let logs_cli = LogsClient::new(shared_config);
40        Self {
41            shared_config: cloned,
42            metrics_cli,
43            logs_cli,
44        }
45    }
46
47    /// Posts CloudWatch metrics.
48    ///
49    /// ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html
50    /// ref. https://docs.rs/aws-sdk-cloudwatch/latest/aws_sdk_cloudwatch/struct.Client.html#method.put_metric_data
51    ///
52    /// "If a single piece of data must be accessible from more than one task
53    /// concurrently, then it must be shared using synchronization primitives such as Arc."
54    /// ref. https://tokio.rs/tokio/tutorial/spawning
55    pub async fn put_metric_data(
56        &self,
57        namespace: Arc<String>,
58        data: Arc<Vec<MetricDatum>>,
59    ) -> Result<()> {
60        let n = data.len();
61        info!("posting CloudWatch {} metrics in '{}'", n, namespace);
62        if n <= 20 {
63            let ret = self
64                .metrics_cli
65                .put_metric_data()
66                .namespace(namespace.clone().to_string())
67                .set_metric_data(Some(data.to_vec()))
68                .send()
69                .await;
70            match ret {
71                Ok(_) => {
72                    info!("successfully post metrics");
73                }
74                Err(e) => {
75                    return Err(API {
76                        message: format!("failed put_metric_data {:?}", e),
77                        is_retryable: is_metrics_error_retryable(&e),
78                    });
79                }
80            };
81        } else {
82            warn!("put_metric_data limit is 20, got {}; batching by 20...", n);
83            for batch in data.chunks(20) {
84                let batch_n = batch.len();
85                let ret = self
86                    .metrics_cli
87                    .put_metric_data()
88                    .namespace(namespace.to_string())
89                    .set_metric_data(Some(batch.to_vec()))
90                    .send()
91                    .await;
92                match ret {
93                    Ok(_) => {
94                        info!("successfully post {} metrics in batch", batch_n);
95                    }
96                    Err(e) => {
97                        return Err(API {
98                            message: format!("failed put_metric_data {:?}", e),
99                            is_retryable: is_metrics_error_retryable(&e),
100                        });
101                    }
102                }
103                thread::sleep(time::Duration::from_secs(1));
104            }
105        }
106
107        Ok(())
108    }
109
110    /// Creates a CloudWatch log group.
111    /// ref. https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-logs-loggroup.html
112    pub async fn create_log_group(&self, log_group_name: &str) -> Result<()> {
113        info!("creating CloudWatch log group '{}'", log_group_name);
114        let ret = self
115            .logs_cli
116            .create_log_group()
117            .log_group_name(log_group_name)
118            .send()
119            .await;
120        let already_created = match ret {
121            Ok(_) => false,
122            Err(e) => {
123                if !is_logs_error_create_log_group_already_exists(&e) {
124                    return Err(API {
125                        message: format!("failed create_log_group {:?}", e),
126                        is_retryable: is_logs_error_retryable(&e),
127                    });
128                }
129                warn!("log_group already exists ({})", e);
130                true
131            }
132        };
133        if !already_created {
134            info!("created CloudWatch log group");
135        }
136        Ok(())
137    }
138
139    /// Deletes a CloudWatch log group.
140    /// ref. https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-logs-loggroup.html
141    pub async fn delete_log_group(&self, log_group_name: &str) -> Result<()> {
142        info!("deleting CloudWatch log group '{}'", log_group_name);
143        let ret = self
144            .logs_cli
145            .delete_log_group()
146            .log_group_name(log_group_name)
147            .send()
148            .await;
149        let deleted = match ret {
150            Ok(_) => true,
151            Err(e) => {
152                let mut ignore_err: bool = false;
153                if is_logs_error_delete_log_group_does_not_exist(&e) {
154                    warn!(
155                        "delete_log_group failed; '{}' does not exist ({}",
156                        log_group_name, e
157                    );
158                    ignore_err = true
159                }
160                if !ignore_err {
161                    return Err(API {
162                        message: format!("failed delete_log_group {:?}", e),
163                        is_retryable: is_logs_error_retryable(&e),
164                    });
165                }
166                false
167            }
168        };
169        if deleted {
170            info!("deleted CloudWatch log group");
171        };
172        Ok(())
173    }
174}
175
176#[inline]
177pub fn is_metrics_error_retryable<E>(e: &MetricsSdkError<E>) -> bool {
178    match e {
179        MetricsSdkError::TimeoutError(_) | MetricsSdkError::ResponseError { .. } => true,
180        MetricsSdkError::DispatchFailure(e) => e.is_timeout() || e.is_io(),
181        _ => false,
182    }
183}
184
185#[inline]
186pub fn is_logs_error_retryable<E>(e: &LogsSdkError<E>) -> bool {
187    match e {
188        LogsSdkError::TimeoutError(_) | LogsSdkError::ResponseError { .. } => true,
189        LogsSdkError::DispatchFailure(e) => e.is_timeout() || e.is_io(),
190        _ => false,
191    }
192}
193
194#[inline]
195fn is_logs_error_create_log_group_already_exists(e: &LogsSdkError<CreateLogGroupError>) -> bool {
196    match e {
197        LogsSdkError::ServiceError { err, .. } => {
198            matches!(
199                err.kind,
200                CreateLogGroupErrorKind::ResourceAlreadyExistsException(_)
201            )
202        }
203        _ => false,
204    }
205}
206
207#[inline]
208fn is_logs_error_delete_log_group_does_not_exist(e: &LogsSdkError<DeleteLogGroupError>) -> bool {
209    match e {
210        LogsSdkError::ServiceError { err, .. } => {
211            matches!(
212                err.kind,
213                DeleteLogGroupErrorKind::ResourceNotFoundException(_)
214            )
215        }
216        _ => false,
217    }
218}
219
220pub async fn spawn_put_metric_data(
221    cw_manager: Manager,
222    namespace: &str,
223    data: Vec<MetricDatum>,
224) -> Result<()> {
225    let cw_manager_arc = Arc::new(cw_manager);
226    let namespace_arc = Arc::new(namespace.to_string());
227    tokio::spawn(async move {
228        cw_manager_arc
229            .put_metric_data(namespace_arc, Arc::new(data))
230            .await
231    })
232    .await
233    .expect("failed spawn await")
234}
235
/// Default location of the CloudWatch agent configuration file.
/// ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
pub const DEFAULT_CONFIG_FILE_PATH: &str = "/opt/aws/amazon-cloudwatch-agent/bin/config.json";

/// Value used for `Agent::metrics_collection_interval` by `Agent::default`.
// NOTE(review): presumably expressed in seconds — confirm against the agent documentation.
pub const DEFAULT_METRICS_COLLECTION_INTERVAL: u32 = 60;
/// Value used for `Agent::logfile` by `Agent::default`.
pub const DEFAULT_LOGFILE: &str =
    "/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log";
242
/// Represents CloudWatch configuration.
///
/// Every section is optional; a section that is `None` is left out of the
/// serialized output entirely.
///
/// ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
/// ref. https://serde.rs/container-attrs.html
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct Config {
    /// The `agent` section.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent: Option<Agent>,
    /// The `logs` section.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub logs: Option<Logs>,
    /// The `metrics` section.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metrics: Option<Metrics>,
}
256
257#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
258#[serde(rename_all = "snake_case")]
259pub struct Agent {
260    pub metrics_collection_interval: u32,
261    #[serde(skip_serializing_if = "Option::is_none")]
262    pub region: Option<String>,
263    pub logfile: String,
264    #[serde(skip_serializing_if = "Option::is_none")]
265    pub debug: Option<bool>,
266}
267
268impl Default for Agent {
269    fn default() -> Self {
270        Self {
271            metrics_collection_interval: DEFAULT_METRICS_COLLECTION_INTERVAL,
272            region: None,
273            logfile: String::from(DEFAULT_LOGFILE),
274            debug: Some(false),
275        }
276    }
277}
278
279#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
280#[serde(rename_all = "snake_case")]
281pub struct Logs {
282    #[serde(skip_serializing_if = "Option::is_none")]
283    pub logs_collected: Option<LogsCollected>,
284    #[serde(skip_serializing_if = "Option::is_none")]
285    pub force_flush_interval: Option<u32>,
286}
287
288#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
289#[serde(rename_all = "snake_case")]
290pub struct LogsCollected {
291    #[serde(skip_serializing_if = "Option::is_none")]
292    pub files: Option<Files>,
293}
294
295#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
296#[serde(rename_all = "snake_case")]
297pub struct Files {
298    #[serde(skip_serializing_if = "Option::is_none")]
299    pub collect_list: Option<Vec<Collect>>,
300}
301
302/// ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
303#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
304#[serde(rename_all = "snake_case")]
305pub struct Collect {
306    /// Specifies what to use as the log group name in CloudWatch Logs.
307    pub log_group_name: String,
308    pub log_stream_name: String,
309    /// Specifies the path of the log file to upload to CloudWatch Logs.
310    pub file_path: String,
311    #[serde(skip_serializing_if = "Option::is_none")]
312    pub timestamp_format: Option<String>,
313    /// The valid values are UTC and Local.
314    #[serde(skip_serializing_if = "Option::is_none")]
315    pub timezone: Option<String>,
316    #[serde(skip_serializing_if = "Option::is_none")]
317    pub auto_removal: Option<bool>,
318    #[serde(skip_serializing_if = "Option::is_none")]
319    pub retention_in_days: Option<u16>,
320}
321
322impl Default for Collect {
323    fn default() -> Self {
324        Self {
325            log_group_name: String::from(""),
326            log_stream_name: String::from(""),
327            file_path: String::from(""),
328            timestamp_format: None,
329            timezone: None,
330            auto_removal: None,
331            retention_in_days: None,
332        }
333    }
334}
335
336/// ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
337#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
338#[serde(rename_all = "snake_case")]
339pub struct Metrics {
340    pub namespace: String,
341    pub metrics_collected: MetricsCollected,
342    #[serde(skip_serializing_if = "Option::is_none")]
343    pub append_dimensions: Option<HashMap<String, String>>,
344    /// Specifies the dimensions that collected metrics are to be aggregated on.
345    /// ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
346    #[serde(skip_serializing_if = "Option::is_none")]
347    pub aggregation_dimensions: Option<Vec<Vec<String>>>,
348    pub force_flush_interval: u32,
349}
350
351impl Default for Metrics {
352    fn default() -> Self {
353        // ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
354        let mut m = HashMap::new();
355        m.insert("InstanceId".to_string(), "${aws:InstanceId}".to_string());
356        m.insert(
357            "InstanceType".to_string(),
358            "${aws:InstanceType}".to_string(),
359        );
360        m.insert(
361            "AutoScalingGroupName".to_string(),
362            "${aws:AutoScalingGroupName}".to_string(),
363        );
364        Self {
365            namespace: String::new(),
366            metrics_collected: MetricsCollected::default(),
367            append_dimensions: Some(m),
368            aggregation_dimensions: Some(vec![
369                vec!["AutoScalingGroupName".to_string()],
370                vec!["InstanceId".to_string(), "InstanceType".to_string()],
371            ]),
372            force_flush_interval: 30,
373        }
374    }
375}
376
377/// ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
378#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
379#[serde(rename_all = "snake_case")]
380pub struct MetricsCollected {
381    #[serde(skip_serializing_if = "Option::is_none")]
382    pub cpu: Option<Cpu>,
383    #[serde(skip_serializing_if = "Option::is_none")]
384    pub mem: Option<Mem>,
385    #[serde(skip_serializing_if = "Option::is_none")]
386    pub disk: Option<Disk>,
387    #[serde(skip_serializing_if = "Option::is_none")]
388    pub diskio: Option<DiskIo>,
389    #[serde(skip_serializing_if = "Option::is_none")]
390    pub net: Option<Net>,
391    #[serde(skip_serializing_if = "Option::is_none")]
392    pub netstat: Option<Netstat>,
393}
394
395impl Default for MetricsCollected {
396    fn default() -> Self {
397        Self {
398            cpu: Some(Cpu::default()),
399            mem: Some(Mem::default()),
400            disk: Some(Disk::default()),
401            diskio: Some(DiskIo::default()),
402            net: Some(Net::default()),
403            netstat: Some(Netstat::default()),
404        }
405    }
406}
407
408/// ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
409#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
410#[serde(rename_all = "snake_case")]
411pub struct Cpu {
412    pub resources: Vec<String>,
413    pub measurement: Vec<String>,
414    pub metrics_collection_interval: u32,
415}
416
417impl Default for Cpu {
418    fn default() -> Self {
419        Self {
420            resources: vec!["*".to_string()],
421            measurement: vec![
422                "usage_active".to_string(), // cpu_usage_* metrics is Percent
423                "usage_system".to_string(), // cpu_usage_* metrics is Percent
424            ],
425            metrics_collection_interval: 60,
426        }
427    }
428}
429
430/// ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
431#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
432#[serde(rename_all = "snake_case")]
433pub struct Mem {
434    pub measurement: Vec<String>,
435    pub metrics_collection_interval: u32,
436}
437
438impl Default for Mem {
439    fn default() -> Self {
440        Self {
441            measurement: vec!["mem_used".to_string(), "mem_total".to_string()],
442            metrics_collection_interval: 60,
443        }
444    }
445}
446
447/// ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
448#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
449#[serde(rename_all = "snake_case")]
450pub struct Disk {
451    pub resources: Vec<String>,
452    pub measurement: Vec<String>,
453    #[serde(skip_serializing_if = "Option::is_none")]
454    pub ignore_file_system_types: Option<Vec<String>>,
455    pub metrics_collection_interval: u32,
456}
457
458impl Default for Disk {
459    fn default() -> Self {
460        Self {
461            resources: vec!["/".to_string()],
462            measurement: vec![
463                "used".to_string(),
464                "total".to_string(),
465                "inodes_used".to_string(),
466                "inodes_total".to_string(),
467            ],
468            ignore_file_system_types: Some(vec!["sysfs".to_string(), "devtmpfs".to_string()]),
469            metrics_collection_interval: 60,
470        }
471    }
472}
473
474impl Disk {
475    pub fn new(resources: Vec<String>) -> Self {
476        Self {
477            resources,
478            ..Default::default()
479        }
480    }
481}
482
483/// ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
484#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
485#[serde(rename_all = "snake_case")]
486pub struct DiskIo {
487    pub resources: Vec<String>,
488    pub measurement: Vec<String>,
489    pub metrics_collection_interval: u32,
490}
491
492impl Default for DiskIo {
493    fn default() -> Self {
494        Self {
495            // "nvme0n1" for boot volume (AWS)
496            // "nvme0n1p1" for boot volume (AWS)
497            // "nvme1n1" for mounted EBS (AWS)
498            // (run "lsblk" to find out which devices)
499            resources: vec!["nvme1n1".to_string()],
500            measurement: vec![
501                "reads".to_string(),
502                "writes".to_string(),
503                "read_time".to_string(),
504                "write_time".to_string(),
505            ],
506            metrics_collection_interval: 60,
507        }
508    }
509}
510
511/// ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
512#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
513#[serde(rename_all = "snake_case")]
514pub struct Net {
515    pub resources: Vec<String>,
516    pub measurement: Vec<String>,
517    pub metrics_collection_interval: u32,
518}
519
520impl Default for Net {
521    fn default() -> Self {
522        Self {
523            resources: vec!["*".to_string()],
524            measurement: vec![
525                "bytes_sent".to_string(),
526                "bytes_recv".to_string(),
527                "packets_sent".to_string(),
528                "packets_recv".to_string(),
529            ],
530            metrics_collection_interval: 60,
531        }
532    }
533}
534
535/// ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
536#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
537#[serde(rename_all = "snake_case")]
538pub struct Netstat {
539    pub measurement: Vec<String>,
540    pub metrics_collection_interval: u32,
541}
542
543impl Default for Netstat {
544    fn default() -> Self {
545        Self {
546            measurement: vec!["tcp_listen".to_string(), "tcp_established".to_string()],
547            metrics_collection_interval: 60,
548        }
549    }
550}
551
552impl Default for Config {
553    fn default() -> Self {
554        Self::default()
555    }
556}
557
558impl Config {
559    pub fn new() -> Self {
560        Self {
561            agent: None,
562            logs: None,
563            metrics: None,
564        }
565    }
566
567    pub fn default() -> Self {
568        let mut config = Self::new();
569        config.agent = Some(Agent::default());
570        config.metrics = Some(Metrics::default());
571        config
572    }
573
574    /// Converts to string.
575    pub fn encode_json(&self) -> io::Result<String> {
576        match serde_json::to_string(&self) {
577            Ok(s) => Ok(s),
578            Err(e) => {
579                return Err(Error::new(
580                    ErrorKind::Other,
581                    format!("failed to serialize Config to YAML {}", e),
582                ));
583            }
584        }
585    }
586
587    /// Saves the current configuration to disk
588    /// and overwrites the file.
589    pub fn sync(&self, file_path: &str) -> io::Result<()> {
590        info!("syncing CloudWatch config to '{}'", file_path);
591        let path = Path::new(file_path);
592        let parent_dir = path.parent().unwrap();
593        fs::create_dir_all(parent_dir)?;
594
595        let ret = serde_json::to_vec(self);
596        let d = match ret {
597            Ok(d) => d,
598            Err(e) => {
599                return Err(Error::new(
600                    ErrorKind::Other,
601                    format!("failed to serialize Config to YAML {}", e),
602                ));
603            }
604        };
605        let mut f = File::create(file_path)?;
606        f.write_all(&d)?;
607
608        Ok(())
609    }
610
611    pub fn load(file_path: &str) -> io::Result<Self> {
612        info!("loading config from {}", file_path);
613
614        if !Path::new(file_path).exists() {
615            return Err(Error::new(
616                ErrorKind::NotFound,
617                format!("file {} does not exists", file_path),
618            ));
619        }
620
621        let f = File::open(&file_path).map_err(|e| {
622            return Error::new(
623                ErrorKind::Other,
624                format!("failed to open {} ({})", file_path, e),
625            );
626        })?;
627        serde_json::from_reader(f).map_err(|e| {
628            return Error::new(ErrorKind::InvalidInput, format!("invalid JSON: {}", e));
629        })
630    }
631
632    /// Validates the configuration.
633    pub fn validate(&self) -> io::Result<()> {
634        info!("validating the CloudWatch configuration");
635
636        Ok(())
637    }
638}
639
/// Encodes the default configuration, writes it to a temporary file, and
/// checks that loading the file back yields an equal configuration.
#[test]
fn test_config() {
    use std::fs;
    let _ = env_logger::builder().is_test(true).try_init();

    let config = Config::default();
    let ret = config.encode_json();
    assert!(ret.is_ok());
    let s = ret.unwrap();
    info!("config: {}", s);

    let p = random_manager::tmp_path(10, Some(".json")).unwrap();
    let ret = config.sync(&p);
    assert!(ret.is_ok());

    // Read the file back before removing it so the temporary file is cleaned
    // up even when the round-trip comparison below fails.
    let loaded = Config::load(&p);
    fs::remove_file(p).unwrap();
    assert_eq!(loaded.unwrap(), config);
}