Skip to main content

rusticity_core/
cw.rs

1use anyhow::Result;
2use aws_sdk_cloudwatchlogs::Client;
3use chrono::DateTime;
4
5use crate::config::AwsConfig;
6use crate::types::{LogEvent, LogGroup, LogStream};
7
8pub struct CloudWatchClient {
9    client: Client,
10    config: AwsConfig,
11}
12
13impl CloudWatchClient {
14    pub async fn new(config: AwsConfig) -> Result<Self> {
15        let client = config.cloudwatch_logs_client().await;
16        Ok(Self { client, config })
17    }
18
19    pub fn config(&self) -> &AwsConfig {
20        &self.config
21    }
22
23    pub fn dummy(config: AwsConfig) -> Self {
24        let aws_config = aws_config::SdkConfig::builder()
25            .behavior_version(aws_config::BehaviorVersion::latest())
26            .region(aws_config::Region::new(config.region.clone()))
27            .build();
28        let client = Client::new(&aws_config);
29        Self { client, config }
30    }
31
32    pub async fn list_log_groups(&self) -> Result<Vec<LogGroup>> {
33        let resp = self.client.describe_log_groups().send().await?;
34
35        let groups = resp
36            .log_groups()
37            .iter()
38            .map(|g| LogGroup {
39                name: g.log_group_name().unwrap_or("").to_string(),
40                creation_time: g
41                    .creation_time()
42                    .map(|t| DateTime::from_timestamp_millis(t).unwrap_or_default()),
43                stored_bytes: g.stored_bytes(),
44                retention_days: g.retention_in_days(),
45                log_class: g.log_group_class().map(|c| c.as_str().to_string()),
46                arn: g.arn().map(|a| a.to_string()),
47                log_group_arn: g.log_group_arn().map(|a| a.to_string()),
48                deletion_protection_enabled: g.deletion_protection_enabled(),
49            })
50            .collect();
51
52        Ok(groups)
53    }
54
55    pub async fn list_log_streams(&self, log_group: &str) -> Result<Vec<LogStream>> {
56        let mut streams = Vec::new();
57        let mut next_token: Option<String> = None;
58
59        loop {
60            let mut request = self
61                .client
62                .describe_log_streams()
63                .log_group_name(log_group)
64                .order_by(aws_sdk_cloudwatchlogs::types::OrderBy::LastEventTime)
65                .descending(true);
66
67            if let Some(token) = next_token {
68                request = request.next_token(token);
69            }
70
71            let resp = request.send().await?;
72
73            streams.extend(resp.log_streams().iter().map(|s| {
74                LogStream {
75                    name: s.log_stream_name().unwrap_or("").to_string(),
76                    creation_time: s
77                        .creation_time()
78                        .map(|t| DateTime::from_timestamp_millis(t).unwrap_or_default()),
79                    last_event_time: s
80                        .last_event_timestamp()
81                        .map(|t| DateTime::from_timestamp_millis(t).unwrap_or_default()),
82                }
83            }));
84
85            if streams.len() >= 100 {
86                break;
87            }
88
89            next_token = resp.next_token().map(|s| s.to_string());
90            if next_token.is_none() {
91                break;
92            }
93        }
94
95        Ok(streams)
96    }
97
98    pub async fn get_log_events(
99        &self,
100        log_group: &str,
101        log_stream: &str,
102        backward_token: Option<String>,
103        start_time: Option<i64>,
104        end_time: Option<i64>,
105    ) -> Result<(Vec<LogEvent>, bool, Option<String>)> {
106        let prev_token = backward_token.clone();
107        let mut request = self
108            .client
109            .get_log_events()
110            .log_group_name(log_group)
111            .log_stream_name(log_stream)
112            .set_start_from_head(Some(false))
113            .set_limit(Some(25));
114
115        if let Some(start) = start_time {
116            request = request.set_start_time(Some(start));
117        }
118
119        if let Some(end) = end_time {
120            request = request.set_end_time(Some(end));
121        }
122
123        if let Some(token) = backward_token {
124            request = request.set_next_token(Some(token));
125        }
126
127        let resp = request.send().await?;
128
129        let mut events: Vec<LogEvent> = resp
130            .events()
131            .iter()
132            .map(|e| LogEvent {
133                timestamp: DateTime::from_timestamp_millis(e.timestamp().unwrap_or(0))
134                    .unwrap_or_default(),
135                message: e.message().unwrap_or("").to_string(),
136            })
137            .collect();
138
139        events.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
140
141        let next_backward_token = resp.next_backward_token().map(|s| s.to_string());
142        let has_more = next_backward_token.is_some()
143            && next_backward_token != prev_token
144            && !events.is_empty();
145
146        Ok((events, has_more, next_backward_token))
147    }
148
149    pub async fn start_query(
150        &self,
151        log_group_names: Vec<String>,
152        query_string: String,
153        start_time: i64,
154        end_time: i64,
155    ) -> Result<String> {
156        let resp = self
157            .client
158            .start_query()
159            .set_log_group_names(Some(log_group_names))
160            .query_string(query_string)
161            .start_time(start_time / 1000)
162            .end_time(end_time / 1000)
163            .send()
164            .await?;
165
166        Ok(resp.query_id().unwrap_or("").to_string())
167    }
168
169    pub async fn get_query_results(
170        &self,
171        query_id: &str,
172    ) -> Result<(String, Vec<Vec<(String, String)>>)> {
173        let resp = self
174            .client
175            .get_query_results()
176            .query_id(query_id)
177            .send()
178            .await?;
179
180        let status = resp
181            .status()
182            .map(|s| s.as_str())
183            .unwrap_or("Unknown")
184            .to_string();
185
186        let results: Vec<Vec<(String, String)>> = resp
187            .results()
188            .iter()
189            .map(|result_row| {
190                result_row
191                    .iter()
192                    .map(|field| {
193                        (
194                            field.field().unwrap_or("").to_string(),
195                            field.value().unwrap_or("").to_string(),
196                        )
197                    })
198                    .collect()
199            })
200            .collect();
201
202        Ok((status, results))
203    }
204
205    pub async fn list_tags_for_log_group(
206        &self,
207        log_group_arn: &str,
208    ) -> Result<Vec<(String, String)>> {
209        let resp = self
210            .client
211            .list_tags_for_resource()
212            .resource_arn(log_group_arn)
213            .send()
214            .await?;
215
216        let tags = if let Some(tag_map) = resp.tags() {
217            tag_map
218                .iter()
219                .map(|(k, v)| (k.clone(), v.clone()))
220                .collect()
221        } else {
222            Vec::new()
223        };
224
225        Ok(tags)
226    }
227}