1use anyhow::Result;
2use aws_sdk_cloudwatchlogs::Client;
3use chrono::DateTime;
4
5use crate::config::AwsConfig;
6use crate::types::{LogEvent, LogGroup, LogStream};
7
8pub struct CloudWatchClient {
9 client: Client,
10 config: AwsConfig,
11}
12
13impl CloudWatchClient {
14 pub async fn new(config: AwsConfig) -> Result<Self> {
15 let client = config.cloudwatch_logs_client().await;
16 Ok(Self { client, config })
17 }
18
19 pub fn config(&self) -> &AwsConfig {
20 &self.config
21 }
22
23 pub fn dummy(config: AwsConfig) -> Self {
24 let aws_config = aws_config::SdkConfig::builder()
25 .behavior_version(aws_config::BehaviorVersion::latest())
26 .region(aws_config::Region::new(config.region.clone()))
27 .build();
28 let client = Client::new(&aws_config);
29 Self { client, config }
30 }
31
32 pub async fn list_log_groups(&self) -> Result<Vec<LogGroup>> {
33 let resp = self.client.describe_log_groups().send().await?;
34
35 let groups = resp
36 .log_groups()
37 .iter()
38 .map(|g| LogGroup {
39 name: g.log_group_name().unwrap_or("").to_string(),
40 creation_time: g
41 .creation_time()
42 .map(|t| DateTime::from_timestamp_millis(t).unwrap_or_default()),
43 stored_bytes: g.stored_bytes(),
44 retention_days: g.retention_in_days(),
45 log_class: g.log_group_class().map(|c| c.as_str().to_string()),
46 arn: g.arn().map(|a| a.to_string()),
47 log_group_arn: g.log_group_arn().map(|a| a.to_string()),
48 deletion_protection_enabled: g.deletion_protection_enabled(),
49 })
50 .collect();
51
52 Ok(groups)
53 }
54
55 pub async fn list_log_streams(&self, log_group: &str) -> Result<Vec<LogStream>> {
56 let mut streams = Vec::new();
57 let mut next_token: Option<String> = None;
58
59 loop {
60 let mut request = self
61 .client
62 .describe_log_streams()
63 .log_group_name(log_group)
64 .order_by(aws_sdk_cloudwatchlogs::types::OrderBy::LastEventTime)
65 .descending(true);
66
67 if let Some(token) = next_token {
68 request = request.next_token(token);
69 }
70
71 let resp = request.send().await?;
72
73 streams.extend(resp.log_streams().iter().map(|s| {
74 LogStream {
75 name: s.log_stream_name().unwrap_or("").to_string(),
76 creation_time: s
77 .creation_time()
78 .map(|t| DateTime::from_timestamp_millis(t).unwrap_or_default()),
79 last_event_time: s
80 .last_event_timestamp()
81 .map(|t| DateTime::from_timestamp_millis(t).unwrap_or_default()),
82 }
83 }));
84
85 if streams.len() >= 100 {
86 break;
87 }
88
89 next_token = resp.next_token().map(|s| s.to_string());
90 if next_token.is_none() {
91 break;
92 }
93 }
94
95 Ok(streams)
96 }
97
98 pub async fn get_log_events(
99 &self,
100 log_group: &str,
101 log_stream: &str,
102 backward_token: Option<String>,
103 start_time: Option<i64>,
104 end_time: Option<i64>,
105 ) -> Result<(Vec<LogEvent>, bool, Option<String>)> {
106 let prev_token = backward_token.clone();
107 let mut request = self
108 .client
109 .get_log_events()
110 .log_group_name(log_group)
111 .log_stream_name(log_stream)
112 .set_start_from_head(Some(false))
113 .set_limit(Some(25));
114
115 if let Some(start) = start_time {
116 request = request.set_start_time(Some(start));
117 }
118
119 if let Some(end) = end_time {
120 request = request.set_end_time(Some(end));
121 }
122
123 if let Some(token) = backward_token {
124 request = request.set_next_token(Some(token));
125 }
126
127 let resp = request.send().await?;
128
129 let mut events: Vec<LogEvent> = resp
130 .events()
131 .iter()
132 .map(|e| LogEvent {
133 timestamp: DateTime::from_timestamp_millis(e.timestamp().unwrap_or(0))
134 .unwrap_or_default(),
135 message: e.message().unwrap_or("").to_string(),
136 })
137 .collect();
138
139 events.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
140
141 let next_backward_token = resp.next_backward_token().map(|s| s.to_string());
142 let has_more = next_backward_token.is_some()
143 && next_backward_token != prev_token
144 && !events.is_empty();
145
146 Ok((events, has_more, next_backward_token))
147 }
148
149 pub async fn start_query(
150 &self,
151 log_group_names: Vec<String>,
152 query_string: String,
153 start_time: i64,
154 end_time: i64,
155 ) -> Result<String> {
156 let resp = self
157 .client
158 .start_query()
159 .set_log_group_names(Some(log_group_names))
160 .query_string(query_string)
161 .start_time(start_time / 1000)
162 .end_time(end_time / 1000)
163 .send()
164 .await?;
165
166 Ok(resp.query_id().unwrap_or("").to_string())
167 }
168
169 pub async fn get_query_results(
170 &self,
171 query_id: &str,
172 ) -> Result<(String, Vec<Vec<(String, String)>>)> {
173 let resp = self
174 .client
175 .get_query_results()
176 .query_id(query_id)
177 .send()
178 .await?;
179
180 let status = resp
181 .status()
182 .map(|s| s.as_str())
183 .unwrap_or("Unknown")
184 .to_string();
185
186 let results: Vec<Vec<(String, String)>> = resp
187 .results()
188 .iter()
189 .map(|result_row| {
190 result_row
191 .iter()
192 .map(|field| {
193 (
194 field.field().unwrap_or("").to_string(),
195 field.value().unwrap_or("").to_string(),
196 )
197 })
198 .collect()
199 })
200 .collect();
201
202 Ok((status, results))
203 }
204
205 pub async fn list_tags_for_log_group(
206 &self,
207 log_group_arn: &str,
208 ) -> Result<Vec<(String, String)>> {
209 let resp = self
210 .client
211 .list_tags_for_resource()
212 .resource_arn(log_group_arn)
213 .send()
214 .await?;
215
216 let tags = if let Some(tag_map) = resp.tags() {
217 tag_map
218 .iter()
219 .map(|(k, v)| (k.clone(), v.clone()))
220 .collect()
221 } else {
222 Vec::new()
223 };
224
225 Ok(tags)
226 }
227}