Skip to main content

aws_lite_rs/api/
logs.rs

1//! Amazon CloudWatch Logs API client.
2//!
3//! Thin wrapper over generated ops. All URL construction and HTTP methods
4//! are in `ops::logs::LogsOps`. This layer adds:
5//! - Ergonomic method signatures
6//! - Pagination stream helpers
7
8use crate::{
9    AwsHttpClient, Result,
10    ops::logs::LogsOps,
11    types::logs::{
12        DeleteLogStreamRequest, DescribeLogGroupsRequest, DescribeLogGroupsResponse,
13        DescribeLogStreamsRequest, DescribeLogStreamsResponse, DescribeMetricFiltersRequest,
14        DescribeMetricFiltersResponse, ListTagsForResourceRequest, ListTagsForResourceResponse,
15        LogGroup, MetricFilter, PutRetentionPolicyRequest,
16    },
17};
18
19/// Client for the Amazon CloudWatch Logs API
20pub struct LogsClient<'a> {
21    ops: LogsOps<'a>,
22}
23
24impl<'a> LogsClient<'a> {
25    /// Create a new Amazon CloudWatch Logs API client
26    pub(crate) fn new(client: &'a AwsHttpClient) -> Self {
27        Self {
28            ops: LogsOps::new(client),
29        }
30    }
31
32    /// Lists the specified log groups.
33    pub async fn describe_log_groups(
34        &self,
35        body: &DescribeLogGroupsRequest,
36    ) -> Result<DescribeLogGroupsResponse> {
37        self.ops.describe_log_groups(body).await
38    }
39
40    /// Stream all log groups, automatically handling pagination.
41    pub fn describe_log_groups_stream(
42        &self,
43    ) -> impl futures_core::Stream<Item = Result<LogGroup>> + '_ {
44        async_stream::try_stream! {
45            let mut next_token: Option<String> = None;
46            loop {
47                let request = DescribeLogGroupsRequest {
48                    next_token: next_token.clone(),
49                    ..Default::default()
50                };
51                let response = self.ops.describe_log_groups(&request).await?;
52                for group in response.log_groups {
53                    yield group;
54                }
55                match response.next_token {
56                    Some(token) if !token.is_empty() => next_token = Some(token),
57                    _ => break,
58                }
59            }
60        }
61    }
62
63    /// Displays the tags associated with a CloudWatch Logs resource.
64    pub async fn list_tags_for_resource(
65        &self,
66        body: &ListTagsForResourceRequest,
67    ) -> Result<ListTagsForResourceResponse> {
68        self.ops.list_tags_for_resource(body).await
69    }
70
71    /// Lists the log streams for the specified log group.
72    pub async fn describe_log_streams(
73        &self,
74        body: &DescribeLogStreamsRequest,
75    ) -> Result<DescribeLogStreamsResponse> {
76        self.ops.describe_log_streams(body).await
77    }
78
79    /// Sets the retention of the specified log group.
80    pub async fn put_retention_policy(
81        &self,
82        log_group_name: &str,
83        retention_in_days: i32,
84    ) -> Result<()> {
85        let body = PutRetentionPolicyRequest {
86            log_group_name: log_group_name.into(),
87            retention_in_days,
88        };
89        self.ops.put_retention_policy(&body).await
90    }
91
92    /// Deletes the specified log stream.
93    pub async fn delete_log_stream(
94        &self,
95        log_group_name: &str,
96        log_stream_name: &str,
97    ) -> Result<()> {
98        let body = DeleteLogStreamRequest {
99            log_group_name: log_group_name.into(),
100            log_stream_name: log_stream_name.into(),
101        };
102        self.ops.delete_log_stream(&body).await
103    }
104
105    // ── Metric Filters ─────────────────────────────────────────────────
106
107    /// Return the first page of metric filters.
108    ///
109    /// Optionally filter by `log_group_name` or `filter_name_prefix`.
110    /// Pass `next_token` from a previous response to paginate.
111    ///
112    /// CIS 5.1–5.15: check whether a metric filter with the expected pattern
113    /// exists before checking the corresponding CloudWatch alarm.
114    pub async fn describe_metric_filters(
115        &self,
116        log_group_name: Option<&str>,
117        filter_name_prefix: Option<&str>,
118        next_token: Option<&str>,
119    ) -> Result<DescribeMetricFiltersResponse> {
120        let body = DescribeMetricFiltersRequest {
121            log_group_name: log_group_name.map(str::to_string),
122            filter_name_prefix: filter_name_prefix.map(str::to_string),
123            next_token: next_token.map(str::to_string),
124            ..Default::default()
125        };
126        self.ops.describe_metric_filters(&body).await
127    }
128
129    /// Return all metric filters, following pagination.
130    ///
131    /// Optionally scope to a specific `log_group_name`.
132    ///
133    /// CIS 5.1–5.15: collect all metric filters, then search for the
134    /// expected `filterPattern` values for each alarm check.
135    pub async fn list_all_metric_filters(
136        &self,
137        log_group_name: Option<&str>,
138    ) -> Result<Vec<MetricFilter>> {
139        let mut all = Vec::new();
140        let mut next_token: Option<String> = None;
141        loop {
142            let body = DescribeMetricFiltersRequest {
143                log_group_name: log_group_name.map(str::to_string),
144                next_token: next_token.clone(),
145                ..Default::default()
146            };
147            let resp = self.ops.describe_metric_filters(&body).await?;
148            all.extend(resp.metric_filters);
149            match resp.next_token {
150                Some(t) if !t.is_empty() => next_token = Some(t),
151                _ => break,
152            }
153        }
154        Ok(all)
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mock_client::MockClient;
    use tokio_stream::StreamExt;

    /// Drain the paginated log-group stream, panicking on the first error item.
    async fn collect_log_groups(logs: &LogsClient<'_>) -> Vec<LogGroup> {
        logs.describe_log_groups_stream()
            .map(|item| item.unwrap())
            .collect()
            .await
    }

    #[tokio::test]
    async fn describe_log_groups_stream_paginates() {
        // The first page advertises a continuation token; the second omits it.
        let first_page = serde_json::json!({
            "logGroups": [{"logGroupName": "/aws/lambda/page1"}],
            "nextToken": "token-abc"
        });
        let second_page = serde_json::json!({
            "logGroups": [{"logGroupName": "/aws/lambda/page2"}]
        });

        // Single expectation with two sequential responses
        let mut mock = MockClient::new();
        mock.expect_post("/")
            .returning_json_sequence(vec![first_page, second_page])
            .times(2);

        let client = AwsHttpClient::from_mock(mock);
        let logs = client.logs();
        let groups = collect_log_groups(&logs).await;

        assert_eq!(groups.len(), 2);
        let names: Vec<Option<&str>> = groups
            .iter()
            .map(|group| group.log_group_name.as_deref())
            .collect();
        assert_eq!(names[0], Some("/aws/lambda/page1"));
        assert_eq!(names[1], Some("/aws/lambda/page2"));
    }

    #[tokio::test]
    async fn describe_log_groups_stream_single_page() {
        // One response without a token: the stream must stop after it.
        let only_page = serde_json::json!({
            "logGroups": [
                {"logGroupName": "/aws/lambda/one"},
                {"logGroupName": "/aws/lambda/two"}
            ]
        });

        let mut mock = MockClient::new();
        mock.expect_post("/").returning_json(only_page);

        let client = AwsHttpClient::from_mock(mock);
        let logs = client.logs();
        let groups = collect_log_groups(&logs).await;

        assert_eq!(groups.len(), 2);
    }

    #[tokio::test]
    async fn describe_log_groups_stream_empty() {
        // An empty page yields no items.
        let empty_page = serde_json::json!({
            "logGroups": []
        });

        let mut mock = MockClient::new();
        mock.expect_post("/").returning_json(empty_page);

        let client = AwsHttpClient::from_mock(mock);
        let logs = client.logs();
        let groups = collect_log_groups(&logs).await;

        assert_eq!(groups.len(), 0);
    }

    #[tokio::test]
    async fn describe_log_streams_returns_parsed_response() {
        let payload = serde_json::json!({
            "logStreams": [
                {
                    "logStreamName": "stream-1",
                    "creationTime": 1700000000000_i64,
                    "arn": "arn:aws:logs:us-east-1:123:log-group:/test:log-stream:stream-1"
                }
            ]
        });

        let mut mock = MockClient::new();
        mock.expect_post("/").returning_json(payload);

        let client = AwsHttpClient::from_mock(mock);
        let request = DescribeLogStreamsRequest {
            log_group_name: Some("/test".into()),
            ..Default::default()
        };
        let response = client.logs().describe_log_streams(&request).await.unwrap();

        assert_eq!(response.log_streams.len(), 1);
        let stream = &response.log_streams[0];
        assert_eq!(stream.log_stream_name.as_deref(), Some("stream-1"));
        assert!(stream.creation_time.is_some());
    }

    #[tokio::test]
    async fn put_retention_policy_succeeds() {
        // The mocked operation answers with a JSON `null` body.
        let empty_body = serde_json::json!(null);
        let mut mock = MockClient::new();
        mock.expect_post("/").returning_json(empty_body);

        let client = AwsHttpClient::from_mock(mock);
        let outcome = client.logs().put_retention_policy("/test-group", 7).await;
        outcome.unwrap();
    }

    #[tokio::test]
    async fn delete_log_stream_succeeds() {
        // The mocked operation answers with a JSON `null` body.
        let empty_body = serde_json::json!(null);
        let mut mock = MockClient::new();
        mock.expect_post("/").returning_json(empty_body);

        let client = AwsHttpClient::from_mock(mock);
        let outcome = client
            .logs()
            .delete_log_stream("/test-group", "test-stream")
            .await;
        outcome.unwrap();
    }

    // ── Metric Filters ─────────────────────────────────────────────────

    #[tokio::test]
    async fn describe_metric_filters_returns_filters() {
        let payload = serde_json::json!({
            "metricFilters": [
                {
                    "filterName": "UnauthorizedApiCalls",
                    "filterPattern": "{ ($.errorCode = \"*UnauthorizedOperation\") || ($.errorCode = \"AccessDenied*\") }",
                    "logGroupName": "CloudTrail/DefaultLogGroup",
                    "creationTime": 1700000000000_i64,
                    "metricTransformations": [
                        {
                            "metricName": "UnauthorizedApiCallCount",
                            "metricNamespace": "CISBenchmark",
                            "metricValue": "1"
                        }
                    ]
                }
            ]
        });

        let mut mock = MockClient::new();
        mock.expect_post("/").returning_json(payload);

        let client = AwsHttpClient::from_mock(mock);
        let response = client
            .logs()
            .describe_metric_filters(Some("CloudTrail/DefaultLogGroup"), None, None)
            .await
            .unwrap();

        assert_eq!(response.metric_filters.len(), 1);
        let filter = &response.metric_filters[0];
        assert_eq!(filter.filter_name.as_deref(), Some("UnauthorizedApiCalls"));
        assert!(matches!(
            filter.filter_pattern.as_deref(),
            Some(pattern) if pattern.contains("UnauthorizedOperation")
        ));
        assert_eq!(
            filter.log_group_name.as_deref(),
            Some("CloudTrail/DefaultLogGroup")
        );

        assert_eq!(filter.metric_transformations.len(), 1);
        let transformation = &filter.metric_transformations[0];
        assert_eq!(transformation.metric_name, "UnauthorizedApiCallCount");
        assert_eq!(transformation.metric_namespace, "CISBenchmark");
    }

    #[tokio::test]
    async fn describe_metric_filters_handles_empty() {
        let payload = serde_json::json!({"metricFilters": []});
        let mut mock = MockClient::new();
        mock.expect_post("/").returning_json(payload);

        let client = AwsHttpClient::from_mock(mock);
        let response = client
            .logs()
            .describe_metric_filters(None, None, None)
            .await
            .unwrap();

        assert!(response.metric_filters.is_empty());
    }

    #[tokio::test]
    async fn list_all_metric_filters_paginates() {
        // The first page carries a continuation token; the second ends the listing.
        let first_page = serde_json::json!({
            "metricFilters": [{
                "filterName": "Filter1",
                "filterPattern": "{ $.errorCode = \"*\" }",
                "logGroupName": "/aws/cloudtrail",
                "metricTransformations": [{"metricName": "M1", "metricNamespace": "NS", "metricValue": "1"}]
            }],
            "nextToken": "page2"
        });
        let second_page = serde_json::json!({
            "metricFilters": [{
                "filterName": "Filter2",
                "filterPattern": "{ $.eventName = \"ConsoleLogin\" }",
                "logGroupName": "/aws/cloudtrail",
                "metricTransformations": [{"metricName": "M2", "metricNamespace": "NS", "metricValue": "1"}]
            }]
        });

        let mut mock = MockClient::new();
        mock.expect_post("/")
            .returning_json_sequence(vec![first_page, second_page])
            .times(2);

        let client = AwsHttpClient::from_mock(mock);
        let filters = client
            .logs()
            .list_all_metric_filters(Some("/aws/cloudtrail"))
            .await
            .unwrap();

        assert_eq!(filters.len(), 2);
        let names: Vec<Option<&str>> = filters
            .iter()
            .map(|filter| filter.filter_name.as_deref())
            .collect();
        assert_eq!(names[0], Some("Filter1"));
        assert_eq!(names[1], Some("Filter2"));
    }
}