1use crate::{
9 AwsHttpClient, Result,
10 ops::logs::LogsOps,
11 types::logs::{
12 DeleteLogStreamRequest, DescribeLogGroupsRequest, DescribeLogGroupsResponse,
13 DescribeLogStreamsRequest, DescribeLogStreamsResponse, DescribeMetricFiltersRequest,
14 DescribeMetricFiltersResponse, ListTagsForResourceRequest, ListTagsForResourceResponse,
15 LogGroup, MetricFilter, PutRetentionPolicyRequest,
16 },
17};
18
19pub struct LogsClient<'a> {
21 ops: LogsOps<'a>,
22}
23
24impl<'a> LogsClient<'a> {
25 pub(crate) fn new(client: &'a AwsHttpClient) -> Self {
27 Self {
28 ops: LogsOps::new(client),
29 }
30 }
31
32 pub async fn describe_log_groups(
34 &self,
35 body: &DescribeLogGroupsRequest,
36 ) -> Result<DescribeLogGroupsResponse> {
37 self.ops.describe_log_groups(body).await
38 }
39
40 pub fn describe_log_groups_stream(
42 &self,
43 ) -> impl futures_core::Stream<Item = Result<LogGroup>> + '_ {
44 async_stream::try_stream! {
45 let mut next_token: Option<String> = None;
46 loop {
47 let request = DescribeLogGroupsRequest {
48 next_token: next_token.clone(),
49 ..Default::default()
50 };
51 let response = self.ops.describe_log_groups(&request).await?;
52 for group in response.log_groups {
53 yield group;
54 }
55 match response.next_token {
56 Some(token) if !token.is_empty() => next_token = Some(token),
57 _ => break,
58 }
59 }
60 }
61 }
62
63 pub async fn list_tags_for_resource(
65 &self,
66 body: &ListTagsForResourceRequest,
67 ) -> Result<ListTagsForResourceResponse> {
68 self.ops.list_tags_for_resource(body).await
69 }
70
71 pub async fn describe_log_streams(
73 &self,
74 body: &DescribeLogStreamsRequest,
75 ) -> Result<DescribeLogStreamsResponse> {
76 self.ops.describe_log_streams(body).await
77 }
78
79 pub async fn put_retention_policy(
81 &self,
82 log_group_name: &str,
83 retention_in_days: i32,
84 ) -> Result<()> {
85 let body = PutRetentionPolicyRequest {
86 log_group_name: log_group_name.into(),
87 retention_in_days,
88 };
89 self.ops.put_retention_policy(&body).await
90 }
91
92 pub async fn delete_log_stream(
94 &self,
95 log_group_name: &str,
96 log_stream_name: &str,
97 ) -> Result<()> {
98 let body = DeleteLogStreamRequest {
99 log_group_name: log_group_name.into(),
100 log_stream_name: log_stream_name.into(),
101 };
102 self.ops.delete_log_stream(&body).await
103 }
104
105 pub async fn describe_metric_filters(
115 &self,
116 log_group_name: Option<&str>,
117 filter_name_prefix: Option<&str>,
118 next_token: Option<&str>,
119 ) -> Result<DescribeMetricFiltersResponse> {
120 let body = DescribeMetricFiltersRequest {
121 log_group_name: log_group_name.map(str::to_string),
122 filter_name_prefix: filter_name_prefix.map(str::to_string),
123 next_token: next_token.map(str::to_string),
124 ..Default::default()
125 };
126 self.ops.describe_metric_filters(&body).await
127 }
128
129 pub async fn list_all_metric_filters(
136 &self,
137 log_group_name: Option<&str>,
138 ) -> Result<Vec<MetricFilter>> {
139 let mut all = Vec::new();
140 let mut next_token: Option<String> = None;
141 loop {
142 let body = DescribeMetricFiltersRequest {
143 log_group_name: log_group_name.map(str::to_string),
144 next_token: next_token.clone(),
145 ..Default::default()
146 };
147 let resp = self.ops.describe_metric_filters(&body).await?;
148 all.extend(resp.metric_filters);
149 match resp.next_token {
150 Some(t) if !t.is_empty() => next_token = Some(t),
151 _ => break,
152 }
153 }
154 Ok(all)
155 }
156}
157
158#[cfg(test)]
159mod tests {
160 use super::*;
161 use crate::mock_client::MockClient;
162 use tokio_stream::StreamExt;
163
164 #[tokio::test]
165 async fn describe_log_groups_stream_paginates() {
166 let mut mock = MockClient::new();
167
168 mock.expect_post("/")
170 .returning_json_sequence(vec![
171 serde_json::json!({
172 "logGroups": [{"logGroupName": "/aws/lambda/page1"}],
173 "nextToken": "token-abc"
174 }),
175 serde_json::json!({
176 "logGroups": [{"logGroupName": "/aws/lambda/page2"}]
177 }),
178 ])
179 .times(2);
180
181 let client = AwsHttpClient::from_mock(mock);
182 let logs = client.logs();
183
184 let groups: Vec<LogGroup> = logs
185 .describe_log_groups_stream()
186 .map(|r| r.unwrap())
187 .collect()
188 .await;
189
190 assert_eq!(groups.len(), 2);
191 assert_eq!(
192 groups[0].log_group_name,
193 Some("/aws/lambda/page1".to_string())
194 );
195 assert_eq!(
196 groups[1].log_group_name,
197 Some("/aws/lambda/page2".to_string())
198 );
199 }
200
201 #[tokio::test]
202 async fn describe_log_groups_stream_single_page() {
203 let mut mock = MockClient::new();
204
205 mock.expect_post("/").returning_json(serde_json::json!({
206 "logGroups": [
207 {"logGroupName": "/aws/lambda/one"},
208 {"logGroupName": "/aws/lambda/two"}
209 ]
210 }));
211
212 let client = AwsHttpClient::from_mock(mock);
213 let logs = client.logs();
214
215 let groups: Vec<LogGroup> = logs
216 .describe_log_groups_stream()
217 .map(|r| r.unwrap())
218 .collect()
219 .await;
220
221 assert_eq!(groups.len(), 2);
222 }
223
224 #[tokio::test]
225 async fn describe_log_groups_stream_empty() {
226 let mut mock = MockClient::new();
227
228 mock.expect_post("/").returning_json(serde_json::json!({
229 "logGroups": []
230 }));
231
232 let client = AwsHttpClient::from_mock(mock);
233 let logs = client.logs();
234
235 let groups: Vec<LogGroup> = logs
236 .describe_log_groups_stream()
237 .map(|r| r.unwrap())
238 .collect()
239 .await;
240
241 assert_eq!(groups.len(), 0);
242 }
243
244 #[tokio::test]
245 async fn describe_log_streams_returns_parsed_response() {
246 let mut mock = MockClient::new();
247 mock.expect_post("/").returning_json(serde_json::json!({
248 "logStreams": [
249 {
250 "logStreamName": "stream-1",
251 "creationTime": 1700000000000_i64,
252 "arn": "arn:aws:logs:us-east-1:123:log-group:/test:log-stream:stream-1"
253 }
254 ]
255 }));
256
257 let client = AwsHttpClient::from_mock(mock);
258 let result = client
259 .logs()
260 .describe_log_streams(&DescribeLogStreamsRequest {
261 log_group_name: Some("/test".into()),
262 ..Default::default()
263 })
264 .await
265 .unwrap();
266 assert_eq!(result.log_streams.len(), 1);
267 assert_eq!(
268 result.log_streams[0].log_stream_name.as_deref(),
269 Some("stream-1")
270 );
271 assert!(result.log_streams[0].creation_time.is_some());
272 }
273
274 #[tokio::test]
275 async fn put_retention_policy_succeeds() {
276 let mut mock = MockClient::new();
277 mock.expect_post("/")
278 .returning_json(serde_json::json!(null));
279
280 let client = AwsHttpClient::from_mock(mock);
281 client
282 .logs()
283 .put_retention_policy("/test-group", 7)
284 .await
285 .unwrap();
286 }
287
288 #[tokio::test]
289 async fn delete_log_stream_succeeds() {
290 let mut mock = MockClient::new();
291 mock.expect_post("/")
292 .returning_json(serde_json::json!(null));
293
294 let client = AwsHttpClient::from_mock(mock);
295 client
296 .logs()
297 .delete_log_stream("/test-group", "test-stream")
298 .await
299 .unwrap();
300 }
301
302 #[tokio::test]
305 async fn describe_metric_filters_returns_filters() {
306 let mut mock = MockClient::new();
307 mock.expect_post("/").returning_json(serde_json::json!({
308 "metricFilters": [
309 {
310 "filterName": "UnauthorizedApiCalls",
311 "filterPattern": "{ ($.errorCode = \"*UnauthorizedOperation\") || ($.errorCode = \"AccessDenied*\") }",
312 "logGroupName": "CloudTrail/DefaultLogGroup",
313 "creationTime": 1700000000000_i64,
314 "metricTransformations": [
315 {
316 "metricName": "UnauthorizedApiCallCount",
317 "metricNamespace": "CISBenchmark",
318 "metricValue": "1"
319 }
320 ]
321 }
322 ]
323 }));
324
325 let client = AwsHttpClient::from_mock(mock);
326 let resp = client
327 .logs()
328 .describe_metric_filters(Some("CloudTrail/DefaultLogGroup"), None, None)
329 .await
330 .unwrap();
331
332 assert_eq!(resp.metric_filters.len(), 1);
333 let f = &resp.metric_filters[0];
334 assert_eq!(f.filter_name.as_deref(), Some("UnauthorizedApiCalls"));
335 assert!(
336 f.filter_pattern
337 .as_deref()
338 .unwrap_or("")
339 .contains("UnauthorizedOperation")
340 );
341 assert_eq!(
342 f.log_group_name.as_deref(),
343 Some("CloudTrail/DefaultLogGroup")
344 );
345 assert_eq!(f.metric_transformations.len(), 1);
346 assert_eq!(
347 f.metric_transformations[0].metric_name,
348 "UnauthorizedApiCallCount"
349 );
350 assert_eq!(f.metric_transformations[0].metric_namespace, "CISBenchmark");
351 }
352
353 #[tokio::test]
354 async fn describe_metric_filters_handles_empty() {
355 let mut mock = MockClient::new();
356 mock.expect_post("/")
357 .returning_json(serde_json::json!({"metricFilters": []}));
358
359 let client = AwsHttpClient::from_mock(mock);
360 let resp = client
361 .logs()
362 .describe_metric_filters(None, None, None)
363 .await
364 .unwrap();
365 assert!(resp.metric_filters.is_empty());
366 }
367
368 #[tokio::test]
369 async fn list_all_metric_filters_paginates() {
370 let mut mock = MockClient::new();
371 mock.expect_post("/")
372 .returning_json_sequence(vec![
373 serde_json::json!({
374 "metricFilters": [{
375 "filterName": "Filter1",
376 "filterPattern": "{ $.errorCode = \"*\" }",
377 "logGroupName": "/aws/cloudtrail",
378 "metricTransformations": [{"metricName": "M1", "metricNamespace": "NS", "metricValue": "1"}]
379 }],
380 "nextToken": "page2"
381 }),
382 serde_json::json!({
383 "metricFilters": [{
384 "filterName": "Filter2",
385 "filterPattern": "{ $.eventName = \"ConsoleLogin\" }",
386 "logGroupName": "/aws/cloudtrail",
387 "metricTransformations": [{"metricName": "M2", "metricNamespace": "NS", "metricValue": "1"}]
388 }]
389 }),
390 ])
391 .times(2);
392
393 let client = AwsHttpClient::from_mock(mock);
394 let filters = client
395 .logs()
396 .list_all_metric_filters(Some("/aws/cloudtrail"))
397 .await
398 .unwrap();
399
400 assert_eq!(filters.len(), 2);
401 assert_eq!(filters[0].filter_name.as_deref(), Some("Filter1"));
402 assert_eq!(filters[1].filter_name.as_deref(), Some("Filter2"));
403 }
404}