1use anyhow::Result;
2use aws_sdk_cloudwatchlogs::Client;
3use chrono::DateTime;
4
5use crate::config::AwsConfig;
6use crate::types::{LogEvent, LogGroup, LogStream};
7
8pub struct CloudWatchClient {
9 client: Client,
10 config: AwsConfig,
11}
12
13impl CloudWatchClient {
14 pub async fn new(config: AwsConfig) -> Result<Self> {
15 let client = config.cloudwatch_logs_client().await;
16 Ok(Self { client, config })
17 }
18
19 pub fn config(&self) -> &AwsConfig {
20 &self.config
21 }
22
23 pub fn dummy(config: AwsConfig) -> Self {
24 let aws_config = aws_config::SdkConfig::builder()
25 .behavior_version(aws_config::BehaviorVersion::latest())
26 .region(aws_config::Region::new(config.region.clone()))
27 .build();
28 let client = Client::new(&aws_config);
29 Self { client, config }
30 }
31
32 pub async fn list_log_groups(&self) -> Result<Vec<LogGroup>> {
33 let resp = self.client.describe_log_groups().send().await?;
34
35 let groups = resp
36 .log_groups()
37 .iter()
38 .map(|g| LogGroup {
39 name: g.log_group_name().unwrap_or("").to_string(),
40 creation_time: g
41 .creation_time()
42 .map(|t| DateTime::from_timestamp_millis(t).unwrap_or_default()),
43 stored_bytes: g.stored_bytes(),
44 retention_days: g.retention_in_days(),
45 log_class: g.log_group_class().map(|c| c.as_str().to_string()),
46 arn: g.log_group_arn().map(|a| a.to_string()),
47 })
48 .collect();
49
50 Ok(groups)
51 }
52
53 pub async fn list_log_streams(&self, log_group: &str) -> Result<Vec<LogStream>> {
54 let mut streams = Vec::new();
55 let mut next_token: Option<String> = None;
56
57 loop {
58 let mut request = self
59 .client
60 .describe_log_streams()
61 .log_group_name(log_group)
62 .order_by(aws_sdk_cloudwatchlogs::types::OrderBy::LastEventTime)
63 .descending(true);
64
65 if let Some(token) = next_token {
66 request = request.next_token(token);
67 }
68
69 let resp = request.send().await?;
70
71 streams.extend(resp.log_streams().iter().map(|s| {
72 LogStream {
73 name: s.log_stream_name().unwrap_or("").to_string(),
74 creation_time: s
75 .creation_time()
76 .map(|t| DateTime::from_timestamp_millis(t).unwrap_or_default()),
77 last_event_time: s
78 .last_event_timestamp()
79 .map(|t| DateTime::from_timestamp_millis(t).unwrap_or_default()),
80 }
81 }));
82
83 if streams.len() >= 100 {
84 break;
85 }
86
87 next_token = resp.next_token().map(|s| s.to_string());
88 if next_token.is_none() {
89 break;
90 }
91 }
92
93 Ok(streams)
94 }
95
96 pub async fn get_log_events(
97 &self,
98 log_group: &str,
99 log_stream: &str,
100 backward_token: Option<String>,
101 start_time: Option<i64>,
102 end_time: Option<i64>,
103 ) -> Result<(Vec<LogEvent>, bool, Option<String>)> {
104 let prev_token = backward_token.clone();
105 let mut request = self
106 .client
107 .get_log_events()
108 .log_group_name(log_group)
109 .log_stream_name(log_stream)
110 .set_start_from_head(Some(false))
111 .set_limit(Some(25));
112
113 if let Some(start) = start_time {
114 request = request.set_start_time(Some(start));
115 }
116
117 if let Some(end) = end_time {
118 request = request.set_end_time(Some(end));
119 }
120
121 if let Some(token) = backward_token {
122 request = request.set_next_token(Some(token));
123 }
124
125 let resp = request.send().await?;
126
127 let mut events: Vec<LogEvent> = resp
128 .events()
129 .iter()
130 .map(|e| LogEvent {
131 timestamp: DateTime::from_timestamp_millis(e.timestamp().unwrap_or(0))
132 .unwrap_or_default(),
133 message: e.message().unwrap_or("").to_string(),
134 })
135 .collect();
136
137 events.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
138
139 let next_backward_token = resp.next_backward_token().map(|s| s.to_string());
140 let has_more = next_backward_token.is_some()
141 && next_backward_token != prev_token
142 && !events.is_empty();
143
144 Ok((events, has_more, next_backward_token))
145 }
146
147 pub async fn start_query(
148 &self,
149 log_group_names: Vec<String>,
150 query_string: String,
151 start_time: i64,
152 end_time: i64,
153 ) -> Result<String> {
154 let resp = self
155 .client
156 .start_query()
157 .set_log_group_names(Some(log_group_names))
158 .query_string(query_string)
159 .start_time(start_time / 1000)
160 .end_time(end_time / 1000)
161 .send()
162 .await?;
163
164 Ok(resp.query_id().unwrap_or("").to_string())
165 }
166
167 pub async fn get_query_results(
168 &self,
169 query_id: &str,
170 ) -> Result<(String, Vec<Vec<(String, String)>>)> {
171 let resp = self
172 .client
173 .get_query_results()
174 .query_id(query_id)
175 .send()
176 .await?;
177
178 let status = resp
179 .status()
180 .map(|s| s.as_str())
181 .unwrap_or("Unknown")
182 .to_string();
183
184 let results: Vec<Vec<(String, String)>> = resp
185 .results()
186 .iter()
187 .map(|result_row| {
188 result_row
189 .iter()
190 .map(|field| {
191 (
192 field.field().unwrap_or("").to_string(),
193 field.value().unwrap_or("").to_string(),
194 )
195 })
196 .collect()
197 })
198 .collect();
199
200 Ok((status, results))
201 }
202}