rusticity_core/
cw.rs

1use anyhow::Result;
2use aws_sdk_cloudwatchlogs::Client;
3use chrono::DateTime;
4
5use crate::config::AwsConfig;
6use crate::types::{LogEvent, LogGroup, LogStream};
7
8pub struct CloudWatchClient {
9    client: Client,
10    config: AwsConfig,
11}
12
13impl CloudWatchClient {
14    pub async fn new(config: AwsConfig) -> Result<Self> {
15        let client = config.cloudwatch_logs_client().await;
16        Ok(Self { client, config })
17    }
18
19    pub fn config(&self) -> &AwsConfig {
20        &self.config
21    }
22
23    pub fn dummy(config: AwsConfig) -> Self {
24        let aws_config = aws_config::SdkConfig::builder()
25            .behavior_version(aws_config::BehaviorVersion::latest())
26            .region(aws_config::Region::new(config.region.clone()))
27            .build();
28        let client = Client::new(&aws_config);
29        Self { client, config }
30    }
31
32    pub async fn list_log_groups(&self) -> Result<Vec<LogGroup>> {
33        let resp = self.client.describe_log_groups().send().await?;
34
35        let groups = resp
36            .log_groups()
37            .iter()
38            .map(|g| LogGroup {
39                name: g.log_group_name().unwrap_or("").to_string(),
40                creation_time: g
41                    .creation_time()
42                    .map(|t| DateTime::from_timestamp_millis(t).unwrap_or_default()),
43                stored_bytes: g.stored_bytes(),
44                retention_days: g.retention_in_days(),
45                log_class: g.log_group_class().map(|c| c.as_str().to_string()),
46                arn: g.log_group_arn().map(|a| a.to_string()),
47            })
48            .collect();
49
50        Ok(groups)
51    }
52
53    pub async fn list_log_streams(&self, log_group: &str) -> Result<Vec<LogStream>> {
54        let mut streams = Vec::new();
55        let mut next_token: Option<String> = None;
56
57        loop {
58            let mut request = self
59                .client
60                .describe_log_streams()
61                .log_group_name(log_group)
62                .order_by(aws_sdk_cloudwatchlogs::types::OrderBy::LastEventTime)
63                .descending(true);
64
65            if let Some(token) = next_token {
66                request = request.next_token(token);
67            }
68
69            let resp = request.send().await?;
70
71            streams.extend(resp.log_streams().iter().map(|s| {
72                LogStream {
73                    name: s.log_stream_name().unwrap_or("").to_string(),
74                    creation_time: s
75                        .creation_time()
76                        .map(|t| DateTime::from_timestamp_millis(t).unwrap_or_default()),
77                    last_event_time: s
78                        .last_event_timestamp()
79                        .map(|t| DateTime::from_timestamp_millis(t).unwrap_or_default()),
80                }
81            }));
82
83            if streams.len() >= 100 {
84                break;
85            }
86
87            next_token = resp.next_token().map(|s| s.to_string());
88            if next_token.is_none() {
89                break;
90            }
91        }
92
93        Ok(streams)
94    }
95
96    pub async fn get_log_events(
97        &self,
98        log_group: &str,
99        log_stream: &str,
100        backward_token: Option<String>,
101        start_time: Option<i64>,
102        end_time: Option<i64>,
103    ) -> Result<(Vec<LogEvent>, bool, Option<String>)> {
104        let prev_token = backward_token.clone();
105        let mut request = self
106            .client
107            .get_log_events()
108            .log_group_name(log_group)
109            .log_stream_name(log_stream)
110            .set_start_from_head(Some(false))
111            .set_limit(Some(25));
112
113        if let Some(start) = start_time {
114            request = request.set_start_time(Some(start));
115        }
116
117        if let Some(end) = end_time {
118            request = request.set_end_time(Some(end));
119        }
120
121        if let Some(token) = backward_token {
122            request = request.set_next_token(Some(token));
123        }
124
125        let resp = request.send().await?;
126
127        let mut events: Vec<LogEvent> = resp
128            .events()
129            .iter()
130            .map(|e| LogEvent {
131                timestamp: DateTime::from_timestamp_millis(e.timestamp().unwrap_or(0))
132                    .unwrap_or_default(),
133                message: e.message().unwrap_or("").to_string(),
134            })
135            .collect();
136
137        events.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
138
139        let next_backward_token = resp.next_backward_token().map(|s| s.to_string());
140        let has_more = next_backward_token.is_some()
141            && next_backward_token != prev_token
142            && !events.is_empty();
143
144        Ok((events, has_more, next_backward_token))
145    }
146
147    pub async fn start_query(
148        &self,
149        log_group_names: Vec<String>,
150        query_string: String,
151        start_time: i64,
152        end_time: i64,
153    ) -> Result<String> {
154        let resp = self
155            .client
156            .start_query()
157            .set_log_group_names(Some(log_group_names))
158            .query_string(query_string)
159            .start_time(start_time / 1000)
160            .end_time(end_time / 1000)
161            .send()
162            .await?;
163
164        Ok(resp.query_id().unwrap_or("").to_string())
165    }
166
167    pub async fn get_query_results(
168        &self,
169        query_id: &str,
170    ) -> Result<(String, Vec<Vec<(String, String)>>)> {
171        let resp = self
172            .client
173            .get_query_results()
174            .query_id(query_id)
175            .send()
176            .await?;
177
178        let status = resp
179            .status()
180            .map(|s| s.as_str())
181            .unwrap_or("Unknown")
182            .to_string();
183
184        let results: Vec<Vec<(String, String)>> = resp
185            .results()
186            .iter()
187            .map(|result_row| {
188                result_row
189                    .iter()
190                    .map(|field| {
191                        (
192                            field.field().unwrap_or("").to_string(),
193                            field.value().unwrap_or("").to_string(),
194                        )
195                    })
196                    .collect()
197            })
198            .collect();
199
200        Ok((status, results))
201    }
202}