1use std::{
2 collections::HashMap,
3 fs::{self, File},
4 io::{self, Error, ErrorKind, Write},
5 path::Path,
6 string::String,
7 sync::Arc,
8 thread, time,
9};
10
11use crate::errors::{Error::API, Result};
12use aws_sdk_cloudwatch::{
13 model::MetricDatum, types::SdkError as MetricsSdkError, Client as MetricsClient,
14};
15use aws_sdk_cloudwatchlogs::{
16 error::{
17 CreateLogGroupError, CreateLogGroupErrorKind, DeleteLogGroupError, DeleteLogGroupErrorKind,
18 },
19 types::SdkError as LogsSdkError,
20 Client as LogsClient,
21};
22use aws_types::SdkConfig as AwsSdkConfig;
23use log::{info, warn};
24use serde::{Deserialize, Serialize};
25
/// Bundles the CloudWatch metrics and logs API clients built from one
/// shared AWS SDK configuration.
#[derive(Debug, Clone)]
pub struct Manager {
    /// AWS SDK config the clients were built from; currently unused
    /// (hence the allow) but kept for future client construction.
    #[allow(dead_code)]
    shared_config: AwsSdkConfig,
    /// CloudWatch metrics API client.
    metrics_cli: MetricsClient,
    /// CloudWatch Logs API client.
    logs_cli: LogsClient,
}
34
35impl Manager {
36 pub fn new(shared_config: &AwsSdkConfig) -> Self {
37 let cloned = shared_config.clone();
38 let metrics_cli = MetricsClient::new(shared_config);
39 let logs_cli = LogsClient::new(shared_config);
40 Self {
41 shared_config: cloned,
42 metrics_cli,
43 logs_cli,
44 }
45 }
46
47 pub async fn put_metric_data(
56 &self,
57 namespace: Arc<String>,
58 data: Arc<Vec<MetricDatum>>,
59 ) -> Result<()> {
60 let n = data.len();
61 info!("posting CloudWatch {} metrics in '{}'", n, namespace);
62 if n <= 20 {
63 let ret = self
64 .metrics_cli
65 .put_metric_data()
66 .namespace(namespace.clone().to_string())
67 .set_metric_data(Some(data.to_vec()))
68 .send()
69 .await;
70 match ret {
71 Ok(_) => {
72 info!("successfully post metrics");
73 }
74 Err(e) => {
75 return Err(API {
76 message: format!("failed put_metric_data {:?}", e),
77 is_retryable: is_metrics_error_retryable(&e),
78 });
79 }
80 };
81 } else {
82 warn!("put_metric_data limit is 20, got {}; batching by 20...", n);
83 for batch in data.chunks(20) {
84 let batch_n = batch.len();
85 let ret = self
86 .metrics_cli
87 .put_metric_data()
88 .namespace(namespace.to_string())
89 .set_metric_data(Some(batch.to_vec()))
90 .send()
91 .await;
92 match ret {
93 Ok(_) => {
94 info!("successfully post {} metrics in batch", batch_n);
95 }
96 Err(e) => {
97 return Err(API {
98 message: format!("failed put_metric_data {:?}", e),
99 is_retryable: is_metrics_error_retryable(&e),
100 });
101 }
102 }
103 thread::sleep(time::Duration::from_secs(1));
104 }
105 }
106
107 Ok(())
108 }
109
110 pub async fn create_log_group(&self, log_group_name: &str) -> Result<()> {
113 info!("creating CloudWatch log group '{}'", log_group_name);
114 let ret = self
115 .logs_cli
116 .create_log_group()
117 .log_group_name(log_group_name)
118 .send()
119 .await;
120 let already_created = match ret {
121 Ok(_) => false,
122 Err(e) => {
123 if !is_logs_error_create_log_group_already_exists(&e) {
124 return Err(API {
125 message: format!("failed create_log_group {:?}", e),
126 is_retryable: is_logs_error_retryable(&e),
127 });
128 }
129 warn!("log_group already exists ({})", e);
130 true
131 }
132 };
133 if !already_created {
134 info!("created CloudWatch log group");
135 }
136 Ok(())
137 }
138
139 pub async fn delete_log_group(&self, log_group_name: &str) -> Result<()> {
142 info!("deleting CloudWatch log group '{}'", log_group_name);
143 let ret = self
144 .logs_cli
145 .delete_log_group()
146 .log_group_name(log_group_name)
147 .send()
148 .await;
149 let deleted = match ret {
150 Ok(_) => true,
151 Err(e) => {
152 let mut ignore_err: bool = false;
153 if is_logs_error_delete_log_group_does_not_exist(&e) {
154 warn!(
155 "delete_log_group failed; '{}' does not exist ({}",
156 log_group_name, e
157 );
158 ignore_err = true
159 }
160 if !ignore_err {
161 return Err(API {
162 message: format!("failed delete_log_group {:?}", e),
163 is_retryable: is_logs_error_retryable(&e),
164 });
165 }
166 false
167 }
168 };
169 if deleted {
170 info!("deleted CloudWatch log group");
171 };
172 Ok(())
173 }
174}
175
176#[inline]
177pub fn is_metrics_error_retryable<E>(e: &MetricsSdkError<E>) -> bool {
178 match e {
179 MetricsSdkError::TimeoutError(_) | MetricsSdkError::ResponseError { .. } => true,
180 MetricsSdkError::DispatchFailure(e) => e.is_timeout() || e.is_io(),
181 _ => false,
182 }
183}
184
185#[inline]
186pub fn is_logs_error_retryable<E>(e: &LogsSdkError<E>) -> bool {
187 match e {
188 LogsSdkError::TimeoutError(_) | LogsSdkError::ResponseError { .. } => true,
189 LogsSdkError::DispatchFailure(e) => e.is_timeout() || e.is_io(),
190 _ => false,
191 }
192}
193
194#[inline]
195fn is_logs_error_create_log_group_already_exists(e: &LogsSdkError<CreateLogGroupError>) -> bool {
196 match e {
197 LogsSdkError::ServiceError { err, .. } => {
198 matches!(
199 err.kind,
200 CreateLogGroupErrorKind::ResourceAlreadyExistsException(_)
201 )
202 }
203 _ => false,
204 }
205}
206
207#[inline]
208fn is_logs_error_delete_log_group_does_not_exist(e: &LogsSdkError<DeleteLogGroupError>) -> bool {
209 match e {
210 LogsSdkError::ServiceError { err, .. } => {
211 matches!(
212 err.kind,
213 DeleteLogGroupErrorKind::ResourceNotFoundException(_)
214 )
215 }
216 _ => false,
217 }
218}
219
220pub async fn spawn_put_metric_data(
221 cw_manager: Manager,
222 namespace: &str,
223 data: Vec<MetricDatum>,
224) -> Result<()> {
225 let cw_manager_arc = Arc::new(cw_manager);
226 let namespace_arc = Arc::new(namespace.to_string());
227 tokio::spawn(async move {
228 cw_manager_arc
229 .put_metric_data(namespace_arc, Arc::new(data))
230 .await
231 })
232 .await
233 .expect("failed spawn await")
234}
235
/// Default path where the CloudWatch agent reads its JSON configuration.
pub const DEFAULT_CONFIG_FILE_PATH: &str = "/opt/aws/amazon-cloudwatch-agent/bin/config.json";

/// Default metrics collection interval, in seconds.
pub const DEFAULT_METRICS_COLLECTION_INTERVAL: u32 = 60;
/// Default path of the CloudWatch agent's own log file.
pub const DEFAULT_LOGFILE: &str =
    "/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log";
242
/// Top-level CloudWatch agent configuration, serialized to JSON by
/// `encode_json`/`sync`. Unset sections are omitted from the output.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct Config {
    /// Agent-level settings (collection interval, log file, region).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent: Option<Agent>,
    /// "logs" section: which local files to ship to CloudWatch Logs.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub logs: Option<Logs>,
    /// "metrics" section: what to collect and publish.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metrics: Option<Metrics>,
}
256
/// Agent-level settings of the CloudWatch agent configuration.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct Agent {
    /// Interval, in seconds, at which metrics are collected.
    pub metrics_collection_interval: u32,
    /// AWS region override; omitted from JSON when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub region: Option<String>,
    /// Path of the agent's own log file.
    pub logfile: String,
    /// Enables agent debug logging when `Some(true)`; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub debug: Option<bool>,
}
267
268impl Default for Agent {
269 fn default() -> Self {
270 Self {
271 metrics_collection_interval: DEFAULT_METRICS_COLLECTION_INTERVAL,
272 region: None,
273 logfile: String::from(DEFAULT_LOGFILE),
274 debug: Some(false),
275 }
276 }
277}
278
/// "logs" section of the agent configuration.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct Logs {
    /// What to collect from local log files; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub logs_collected: Option<LogsCollected>,
    // NOTE(review): presumably seconds, like the metrics flush interval —
    // confirm against the CloudWatch agent docs.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub force_flush_interval: Option<u32>,
}
287
/// "logs_collected" subsection: file-based log collection.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct LogsCollected {
    /// File collection settings; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub files: Option<Files>,
}
294
/// "files" subsection: the list of log files to collect.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct Files {
    /// Entries describing each file to ship; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub collect_list: Option<Vec<Collect>>,
}
301
/// One "collect_list" entry: a single local log file shipped to a
/// CloudWatch Logs group/stream.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct Collect {
    /// Destination log group name.
    pub log_group_name: String,
    /// Destination log stream name.
    pub log_stream_name: String,
    /// Path of the local log file to collect.
    pub file_path: String,
    /// Timestamp parsing format; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp_format: Option<String>,
    /// Timezone applied to parsed timestamps; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timezone: Option<String>,
    // NOTE(review): presumably deletes rotated local files after upload —
    // confirm against the CloudWatch agent docs.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub auto_removal: Option<bool>,
    /// Log group retention in days; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub retention_in_days: Option<u16>,
}
321
322impl Default for Collect {
323 fn default() -> Self {
324 Self {
325 log_group_name: String::from(""),
326 log_stream_name: String::from(""),
327 file_path: String::from(""),
328 timestamp_format: None,
329 timezone: None,
330 auto_removal: None,
331 retention_in_days: None,
332 }
333 }
334}
335
/// "metrics" section: namespace, collectors, and dimension settings.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct Metrics {
    /// CloudWatch namespace the collected metrics are published under.
    pub namespace: String,
    /// Per-resource collector settings (cpu, mem, disk, ...).
    pub metrics_collected: MetricsCollected,
    /// Dimensions appended to every datum; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub append_dimensions: Option<HashMap<String, String>>,
    /// Dimension sets metrics are rolled up over; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub aggregation_dimensions: Option<Vec<Vec<String>>>,
    // NOTE(review): presumably seconds (default 30) — confirm against
    // the CloudWatch agent docs.
    pub force_flush_interval: u32,
}
350
351impl Default for Metrics {
352 fn default() -> Self {
353 let mut m = HashMap::new();
355 m.insert("InstanceId".to_string(), "${aws:InstanceId}".to_string());
356 m.insert(
357 "InstanceType".to_string(),
358 "${aws:InstanceType}".to_string(),
359 );
360 m.insert(
361 "AutoScalingGroupName".to_string(),
362 "${aws:AutoScalingGroupName}".to_string(),
363 );
364 Self {
365 namespace: String::new(),
366 metrics_collected: MetricsCollected::default(),
367 append_dimensions: Some(m),
368 aggregation_dimensions: Some(vec![
369 vec!["AutoScalingGroupName".to_string()],
370 vec!["InstanceId".to_string(), "InstanceType".to_string()],
371 ]),
372 force_flush_interval: 30,
373 }
374 }
375}
376
/// "metrics_collected" subsection: one optional entry per collector.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct MetricsCollected {
    /// CPU usage collector; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cpu: Option<Cpu>,
    /// Memory collector; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mem: Option<Mem>,
    /// Disk usage collector; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub disk: Option<Disk>,
    /// Disk I/O collector; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub diskio: Option<DiskIo>,
    /// Network interface collector; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub net: Option<Net>,
    /// TCP connection-state collector; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub netstat: Option<Netstat>,
}
394
395impl Default for MetricsCollected {
396 fn default() -> Self {
397 Self {
398 cpu: Some(Cpu::default()),
399 mem: Some(Mem::default()),
400 disk: Some(Disk::default()),
401 diskio: Some(DiskIo::default()),
402 net: Some(Net::default()),
403 netstat: Some(Netstat::default()),
404 }
405 }
406}
407
/// CPU usage collector settings.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct Cpu {
    /// CPUs to sample ("*" for all).
    pub resources: Vec<String>,
    /// Measurement names to collect (e.g. "usage_active").
    pub measurement: Vec<String>,
    /// Sampling interval, in seconds.
    pub metrics_collection_interval: u32,
}
416
417impl Default for Cpu {
418 fn default() -> Self {
419 Self {
420 resources: vec!["*".to_string()],
421 measurement: vec![
422 "usage_active".to_string(), "usage_system".to_string(), ],
425 metrics_collection_interval: 60,
426 }
427 }
428}
429
/// Memory collector settings.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct Mem {
    /// Measurement names to collect (e.g. "mem_used").
    pub measurement: Vec<String>,
    /// Sampling interval, in seconds.
    pub metrics_collection_interval: u32,
}
437
438impl Default for Mem {
439 fn default() -> Self {
440 Self {
441 measurement: vec!["mem_used".to_string(), "mem_total".to_string()],
442 metrics_collection_interval: 60,
443 }
444 }
445}
446
/// Disk usage collector settings.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct Disk {
    /// Mount points to sample (e.g. "/").
    pub resources: Vec<String>,
    /// Measurement names to collect (e.g. "used", "inodes_total").
    pub measurement: Vec<String>,
    /// Filesystem types to skip; omitted when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ignore_file_system_types: Option<Vec<String>>,
    /// Sampling interval, in seconds.
    pub metrics_collection_interval: u32,
}
457
458impl Default for Disk {
459 fn default() -> Self {
460 Self {
461 resources: vec!["/".to_string()],
462 measurement: vec![
463 "used".to_string(),
464 "total".to_string(),
465 "inodes_used".to_string(),
466 "inodes_total".to_string(),
467 ],
468 ignore_file_system_types: Some(vec!["sysfs".to_string(), "devtmpfs".to_string()]),
469 metrics_collection_interval: 60,
470 }
471 }
472}
473
474impl Disk {
475 pub fn new(resources: Vec<String>) -> Self {
476 Self {
477 resources,
478 ..Default::default()
479 }
480 }
481}
482
/// Disk I/O collector settings.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct DiskIo {
    /// Block devices to sample (e.g. "nvme1n1").
    pub resources: Vec<String>,
    /// Measurement names to collect (e.g. "reads", "write_time").
    pub measurement: Vec<String>,
    /// Sampling interval, in seconds.
    pub metrics_collection_interval: u32,
}
491
492impl Default for DiskIo {
493 fn default() -> Self {
494 Self {
495 resources: vec!["nvme1n1".to_string()],
500 measurement: vec![
501 "reads".to_string(),
502 "writes".to_string(),
503 "read_time".to_string(),
504 "write_time".to_string(),
505 ],
506 metrics_collection_interval: 60,
507 }
508 }
509}
510
/// Network interface collector settings.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct Net {
    /// Interfaces to sample ("*" for all).
    pub resources: Vec<String>,
    /// Measurement names to collect (e.g. "bytes_sent").
    pub measurement: Vec<String>,
    /// Sampling interval, in seconds.
    pub metrics_collection_interval: u32,
}
519
520impl Default for Net {
521 fn default() -> Self {
522 Self {
523 resources: vec!["*".to_string()],
524 measurement: vec![
525 "bytes_sent".to_string(),
526 "bytes_recv".to_string(),
527 "packets_sent".to_string(),
528 "packets_recv".to_string(),
529 ],
530 metrics_collection_interval: 60,
531 }
532 }
533}
534
/// TCP connection-state collector settings.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub struct Netstat {
    /// Measurement names to collect (e.g. "tcp_listen").
    pub measurement: Vec<String>,
    /// Sampling interval, in seconds.
    pub metrics_collection_interval: u32,
}
542
543impl Default for Netstat {
544 fn default() -> Self {
545 Self {
546 measurement: vec!["tcp_listen".to_string(), "tcp_established".to_string()],
547 metrics_collection_interval: 60,
548 }
549 }
550}
551
552impl Default for Config {
553 fn default() -> Self {
554 Self::default()
555 }
556}
557
558impl Config {
559 pub fn new() -> Self {
560 Self {
561 agent: None,
562 logs: None,
563 metrics: None,
564 }
565 }
566
567 pub fn default() -> Self {
568 let mut config = Self::new();
569 config.agent = Some(Agent::default());
570 config.metrics = Some(Metrics::default());
571 config
572 }
573
574 pub fn encode_json(&self) -> io::Result<String> {
576 match serde_json::to_string(&self) {
577 Ok(s) => Ok(s),
578 Err(e) => {
579 return Err(Error::new(
580 ErrorKind::Other,
581 format!("failed to serialize Config to YAML {}", e),
582 ));
583 }
584 }
585 }
586
587 pub fn sync(&self, file_path: &str) -> io::Result<()> {
590 info!("syncing CloudWatch config to '{}'", file_path);
591 let path = Path::new(file_path);
592 let parent_dir = path.parent().unwrap();
593 fs::create_dir_all(parent_dir)?;
594
595 let ret = serde_json::to_vec(self);
596 let d = match ret {
597 Ok(d) => d,
598 Err(e) => {
599 return Err(Error::new(
600 ErrorKind::Other,
601 format!("failed to serialize Config to YAML {}", e),
602 ));
603 }
604 };
605 let mut f = File::create(file_path)?;
606 f.write_all(&d)?;
607
608 Ok(())
609 }
610
611 pub fn load(file_path: &str) -> io::Result<Self> {
612 info!("loading config from {}", file_path);
613
614 if !Path::new(file_path).exists() {
615 return Err(Error::new(
616 ErrorKind::NotFound,
617 format!("file {} does not exists", file_path),
618 ));
619 }
620
621 let f = File::open(&file_path).map_err(|e| {
622 return Error::new(
623 ErrorKind::Other,
624 format!("failed to open {} ({})", file_path, e),
625 );
626 })?;
627 serde_json::from_reader(f).map_err(|e| {
628 return Error::new(ErrorKind::InvalidInput, format!("invalid JSON: {}", e));
629 })
630 }
631
632 pub fn validate(&self) -> io::Result<()> {
634 info!("validating the CloudWatch configuration");
635
636 Ok(())
637 }
638}
639
/// Smoke test: the default config serializes to JSON and round-trips
/// through a temporary file on disk.
#[test]
fn test_config() {
    use std::fs;
    let _ = env_logger::builder().is_test(true).try_init();

    // default config must serialize without error
    let config = Config::default();
    let ret = config.encode_json();
    assert!(ret.is_ok());
    let s = ret.unwrap();
    info!("config: {}", s);

    // write the config to a random temp file, then clean up
    let p = random_manager::tmp_path(10, Some(".json")).unwrap();
    let ret = config.sync(&p);
    assert!(ret.is_ok());
    fs::remove_file(p).unwrap();
}