Skip to main content

awsim_cloudwatch_logs/
lib.rs

1mod handler;
2mod operations;
3pub mod sqlite_store;
4mod state;
5
6pub use handler::CloudWatchLogsService;
7pub use sqlite_store::{LogEventRow, SqliteStore};
8
9#[cfg(test)]
10mod tests {
11    use awsim_core::RequestContext;
12    use serde_json::json;
13
14    use super::handler::CloudWatchLogsService;
15    use awsim_core::ServiceHandler;
16
17    fn ctx() -> RequestContext {
18        RequestContext::new("logs", "us-east-1")
19    }
20
21    fn now_ts() -> u64 {
22        super::state::now_millis()
23    }
24
25    fn block_on<F: std::future::Future>(f: F) -> F::Output {
26        use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
27
28        fn noop_clone(_: *const ()) -> RawWaker {
29            noop_raw_waker()
30        }
31        fn noop(_: *const ()) {}
32        fn noop_raw_waker() -> RawWaker {
33            static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
34            RawWaker::new(std::ptr::null(), &VTABLE)
35        }
36        let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
37        let mut cx = Context::from_waker(&waker);
38        let mut fut = std::pin::pin!(f);
39        loop {
40            match fut.as_mut().poll(&mut cx) {
41                Poll::Ready(v) => return v,
42                Poll::Pending => {}
43            }
44        }
45    }
46
47    // -----------------------------------------------------------------------
48    // Log Groups
49    // -----------------------------------------------------------------------
50
51    #[test]
52    fn test_create_log_group_basic() {
53        let svc = CloudWatchLogsService::new();
54        let ctx = ctx();
55        block_on(svc.handle(
56            "CreateLogGroup",
57            json!({ "logGroupName": "/my/app/logs" }),
58            &ctx,
59        ))
60        .unwrap();
61    }
62
63    #[test]
64    fn test_create_log_group_with_tags() {
65        let svc = CloudWatchLogsService::new();
66        let ctx = ctx();
67        block_on(svc.handle(
68            "CreateLogGroup",
69            json!({ "logGroupName": "/tagged/group", "tags": { "env": "test" } }),
70            &ctx,
71        ))
72        .unwrap();
73
74        let tags = block_on(svc.handle(
75            "ListTagsLogGroup",
76            json!({ "logGroupName": "/tagged/group" }),
77            &ctx,
78        ))
79        .unwrap();
80        assert_eq!(tags["tags"]["env"].as_str().unwrap(), "test");
81    }
82
83    #[test]
84    fn test_create_log_group_duplicate() {
85        let svc = CloudWatchLogsService::new();
86        let ctx = ctx();
87        block_on(svc.handle(
88            "CreateLogGroup",
89            json!({ "logGroupName": "/dup/group" }),
90            &ctx,
91        ))
92        .unwrap();
93
94        let err = block_on(svc.handle(
95            "CreateLogGroup",
96            json!({ "logGroupName": "/dup/group" }),
97            &ctx,
98        ))
99        .unwrap_err();
100        assert_eq!(err.code, "ResourceAlreadyExistsException");
101    }
102
103    #[test]
104    fn test_delete_log_group() {
105        let svc = CloudWatchLogsService::new();
106        let ctx = ctx();
107        block_on(svc.handle(
108            "CreateLogGroup",
109            json!({ "logGroupName": "/delete/me" }),
110            &ctx,
111        ))
112        .unwrap();
113
114        block_on(svc.handle(
115            "DeleteLogGroup",
116            json!({ "logGroupName": "/delete/me" }),
117            &ctx,
118        ))
119        .unwrap();
120
121        let result = block_on(svc.handle("DescribeLogGroups", json!({}), &ctx)).unwrap();
122        assert_eq!(result["logGroups"].as_array().unwrap().len(), 0);
123    }
124
125    #[test]
126    fn test_delete_nonexistent_log_group() {
127        let svc = CloudWatchLogsService::new();
128        let ctx = ctx();
129        let err = block_on(svc.handle("DeleteLogGroup", json!({ "logGroupName": "/ghost" }), &ctx))
130            .unwrap_err();
131        assert_eq!(err.code, "ResourceNotFoundException");
132    }
133
134    #[test]
135    fn test_describe_log_groups_empty() {
136        let svc = CloudWatchLogsService::new();
137        let ctx = ctx();
138        let result = block_on(svc.handle("DescribeLogGroups", json!({}), &ctx)).unwrap();
139        assert_eq!(result["logGroups"].as_array().unwrap().len(), 0);
140    }
141
142    #[test]
143    fn test_describe_log_groups_prefix_filter() {
144        let svc = CloudWatchLogsService::new();
145        let ctx = ctx();
146        block_on(svc.handle(
147            "CreateLogGroup",
148            json!({ "logGroupName": "/app/foo" }),
149            &ctx,
150        ))
151        .unwrap();
152        block_on(svc.handle(
153            "CreateLogGroup",
154            json!({ "logGroupName": "/app/bar" }),
155            &ctx,
156        ))
157        .unwrap();
158        block_on(svc.handle(
159            "CreateLogGroup",
160            json!({ "logGroupName": "/other/baz" }),
161            &ctx,
162        ))
163        .unwrap();
164
165        let result = block_on(svc.handle(
166            "DescribeLogGroups",
167            json!({ "logGroupNamePrefix": "/app/" }),
168            &ctx,
169        ))
170        .unwrap();
171        assert_eq!(result["logGroups"].as_array().unwrap().len(), 2);
172    }
173
174    #[test]
175    fn test_put_retention_policy() {
176        let svc = CloudWatchLogsService::new();
177        let ctx = ctx();
178        block_on(svc.handle(
179            "CreateLogGroup",
180            json!({ "logGroupName": "/ret/group" }),
181            &ctx,
182        ))
183        .unwrap();
184
185        block_on(svc.handle(
186            "PutRetentionPolicy",
187            json!({ "logGroupName": "/ret/group", "retentionInDays": 7 }),
188            &ctx,
189        ))
190        .unwrap();
191
192        let result = block_on(svc.handle("DescribeLogGroups", json!({}), &ctx)).unwrap();
193        let group = &result["logGroups"].as_array().unwrap()[0];
194        assert_eq!(group["retentionInDays"].as_u64().unwrap(), 7);
195    }
196
197    #[test]
198    fn test_put_retention_policy_invalid_days() {
199        let svc = CloudWatchLogsService::new();
200        let ctx = ctx();
201        block_on(svc.handle(
202            "CreateLogGroup",
203            json!({ "logGroupName": "/ret2/group" }),
204            &ctx,
205        ))
206        .unwrap();
207
208        let err = block_on(svc.handle(
209            "PutRetentionPolicy",
210            json!({ "logGroupName": "/ret2/group", "retentionInDays": 99 }),
211            &ctx,
212        ))
213        .unwrap_err();
214        assert_eq!(err.code, "InvalidParameterException");
215    }
216
217    #[test]
218    fn test_delete_retention_policy() {
219        let svc = CloudWatchLogsService::new();
220        let ctx = ctx();
221        block_on(svc.handle(
222            "CreateLogGroup",
223            json!({ "logGroupName": "/delret/group" }),
224            &ctx,
225        ))
226        .unwrap();
227        block_on(svc.handle(
228            "PutRetentionPolicy",
229            json!({ "logGroupName": "/delret/group", "retentionInDays": 30 }),
230            &ctx,
231        ))
232        .unwrap();
233
234        block_on(svc.handle(
235            "DeleteRetentionPolicy",
236            json!({ "logGroupName": "/delret/group" }),
237            &ctx,
238        ))
239        .unwrap();
240
241        let result = block_on(svc.handle("DescribeLogGroups", json!({}), &ctx)).unwrap();
242        let group = &result["logGroups"].as_array().unwrap()[0];
243        assert!(group.get("retentionInDays").is_none() || group["retentionInDays"].is_null());
244    }
245
246    #[test]
247    fn test_tag_untag_log_group() {
248        let svc = CloudWatchLogsService::new();
249        let ctx = ctx();
250        block_on(svc.handle(
251            "CreateLogGroup",
252            json!({ "logGroupName": "/tag/group" }),
253            &ctx,
254        ))
255        .unwrap();
256
257        block_on(svc.handle(
258            "TagLogGroup",
259            json!({ "logGroupName": "/tag/group", "tags": { "key1": "val1", "key2": "val2" } }),
260            &ctx,
261        ))
262        .unwrap();
263
264        let tags = block_on(svc.handle(
265            "ListTagsLogGroup",
266            json!({ "logGroupName": "/tag/group" }),
267            &ctx,
268        ))
269        .unwrap();
270        assert_eq!(tags["tags"].as_object().unwrap().len(), 2);
271
272        block_on(svc.handle(
273            "UntagLogGroup",
274            json!({ "logGroupName": "/tag/group", "tags": ["key1"] }),
275            &ctx,
276        ))
277        .unwrap();
278
279        let tags2 = block_on(svc.handle(
280            "ListTagsLogGroup",
281            json!({ "logGroupName": "/tag/group" }),
282            &ctx,
283        ))
284        .unwrap();
285        assert_eq!(tags2["tags"].as_object().unwrap().len(), 1);
286    }
287
288    // -----------------------------------------------------------------------
289    // Log Streams
290    // -----------------------------------------------------------------------
291
292    #[test]
293    fn test_create_log_stream() {
294        let svc = CloudWatchLogsService::new();
295        let ctx = ctx();
296        block_on(svc.handle(
297            "CreateLogGroup",
298            json!({ "logGroupName": "/stream/group" }),
299            &ctx,
300        ))
301        .unwrap();
302
303        block_on(svc.handle(
304            "CreateLogStream",
305            json!({ "logGroupName": "/stream/group", "logStreamName": "stream-1" }),
306            &ctx,
307        ))
308        .unwrap();
309    }
310
311    #[test]
312    fn test_create_log_stream_duplicate() {
313        let svc = CloudWatchLogsService::new();
314        let ctx = ctx();
315        block_on(svc.handle(
316            "CreateLogGroup",
317            json!({ "logGroupName": "/dup/stream/group" }),
318            &ctx,
319        ))
320        .unwrap();
321
322        block_on(svc.handle(
323            "CreateLogStream",
324            json!({ "logGroupName": "/dup/stream/group", "logStreamName": "dup-stream" }),
325            &ctx,
326        ))
327        .unwrap();
328
329        let err = block_on(svc.handle(
330            "CreateLogStream",
331            json!({ "logGroupName": "/dup/stream/group", "logStreamName": "dup-stream" }),
332            &ctx,
333        ))
334        .unwrap_err();
335        assert_eq!(err.code, "ResourceAlreadyExistsException");
336    }
337
338    #[test]
339    fn test_delete_log_stream() {
340        let svc = CloudWatchLogsService::new();
341        let ctx = ctx();
342        block_on(svc.handle(
343            "CreateLogGroup",
344            json!({ "logGroupName": "/del/stream/group" }),
345            &ctx,
346        ))
347        .unwrap();
348        block_on(svc.handle(
349            "CreateLogStream",
350            json!({ "logGroupName": "/del/stream/group", "logStreamName": "del-stream" }),
351            &ctx,
352        ))
353        .unwrap();
354
355        block_on(svc.handle(
356            "DeleteLogStream",
357            json!({ "logGroupName": "/del/stream/group", "logStreamName": "del-stream" }),
358            &ctx,
359        ))
360        .unwrap();
361
362        let result = block_on(svc.handle(
363            "DescribeLogStreams",
364            json!({ "logGroupName": "/del/stream/group" }),
365            &ctx,
366        ))
367        .unwrap();
368        assert_eq!(result["logStreams"].as_array().unwrap().len(), 0);
369    }
370
371    #[test]
372    fn test_describe_log_streams_prefix() {
373        let svc = CloudWatchLogsService::new();
374        let ctx = ctx();
375        block_on(svc.handle(
376            "CreateLogGroup",
377            json!({ "logGroupName": "/desc/streams" }),
378            &ctx,
379        ))
380        .unwrap();
381
382        for name in &["app-stream-1", "app-stream-2", "other-stream"] {
383            block_on(svc.handle(
384                "CreateLogStream",
385                json!({ "logGroupName": "/desc/streams", "logStreamName": name }),
386                &ctx,
387            ))
388            .unwrap();
389        }
390
391        let result = block_on(svc.handle(
392            "DescribeLogStreams",
393            json!({ "logGroupName": "/desc/streams", "logStreamNamePrefix": "app-" }),
394            &ctx,
395        ))
396        .unwrap();
397        assert_eq!(result["logStreams"].as_array().unwrap().len(), 2);
398    }
399
400    // -----------------------------------------------------------------------
401    // Log Events
402    // -----------------------------------------------------------------------
403
404    fn setup_group_and_stream(svc: &CloudWatchLogsService, group: &str, stream: &str) {
405        let ctx = ctx();
406        block_on(svc.handle("CreateLogGroup", json!({ "logGroupName": group }), &ctx)).unwrap();
407        block_on(svc.handle(
408            "CreateLogStream",
409            json!({ "logGroupName": group, "logStreamName": stream }),
410            &ctx,
411        ))
412        .unwrap();
413    }
414
415    #[test]
416    fn test_put_and_get_log_events() {
417        let svc = CloudWatchLogsService::new();
418        let ctx = ctx();
419        setup_group_and_stream(&svc, "/events/group", "events-stream");
420
421        let now = now_ts();
422        let result = block_on(svc.handle(
423            "PutLogEvents",
424            json!({
425                "logGroupName": "/events/group",
426                "logStreamName": "events-stream",
427                "logEvents": [
428                    { "timestamp": now - 2000, "message": "first event" },
429                    { "timestamp": now - 1000, "message": "second event" },
430                ],
431            }),
432            &ctx,
433        ))
434        .unwrap();
435        assert!(result["nextSequenceToken"].as_str().is_some());
436
437        let got = block_on(svc.handle(
438            "GetLogEvents",
439            json!({
440                "logGroupName": "/events/group",
441                "logStreamName": "events-stream",
442                "startFromHead": true,
443            }),
444            &ctx,
445        ))
446        .unwrap();
447        assert_eq!(got["events"].as_array().unwrap().len(), 2);
448        assert_eq!(got["events"][0]["message"].as_str().unwrap(), "first event");
449    }
450
451    #[test]
452    fn test_get_log_events_time_filter() {
453        let svc = CloudWatchLogsService::new();
454        let ctx = ctx();
455        setup_group_and_stream(&svc, "/time/group", "time-stream");
456
457        let now = now_ts();
458        block_on(svc.handle(
459            "PutLogEvents",
460            json!({
461                "logGroupName": "/time/group",
462                "logStreamName": "time-stream",
463                "logEvents": [
464                    { "timestamp": now - 9000, "message": "before" },
465                    { "timestamp": now - 5000, "message": "during" },
466                    { "timestamp": now - 1000, "message": "after" },
467                ],
468            }),
469            &ctx,
470        ))
471        .unwrap();
472
473        let got = block_on(svc.handle(
474            "GetLogEvents",
475            json!({
476                "logGroupName": "/time/group",
477                "logStreamName": "time-stream",
478                "startTime": now - 8000,
479                "endTime": now - 2000,
480                "startFromHead": true,
481            }),
482            &ctx,
483        ))
484        .unwrap();
485        let events = got["events"].as_array().unwrap();
486        assert_eq!(events.len(), 1);
487        assert_eq!(events[0]["message"].as_str().unwrap(), "during");
488    }
489
490    #[test]
491    fn test_filter_log_events_pattern() {
492        let svc = CloudWatchLogsService::new();
493        let ctx = ctx();
494        setup_group_and_stream(&svc, "/filter/group", "filter-stream");
495
496        let now = now_ts();
497        block_on(svc.handle(
498            "PutLogEvents",
499            json!({
500                "logGroupName": "/filter/group",
501                "logStreamName": "filter-stream",
502                "logEvents": [
503                    { "timestamp": now - 3000, "message": "ERROR something failed" },
504                    { "timestamp": now - 2000, "message": "INFO all good" },
505                    { "timestamp": now - 1000, "message": "ERROR another failure" },
506                ],
507            }),
508            &ctx,
509        ))
510        .unwrap();
511
512        let got = block_on(svc.handle(
513            "FilterLogEvents",
514            json!({
515                "logGroupName": "/filter/group",
516                "filterPattern": "ERROR",
517            }),
518            &ctx,
519        ))
520        .unwrap();
521        assert_eq!(got["events"].as_array().unwrap().len(), 2);
522    }
523
524    #[test]
525    fn test_filter_log_events_empty_pattern() {
526        let svc = CloudWatchLogsService::new();
527        let ctx = ctx();
528        setup_group_and_stream(&svc, "/filter2/group", "filter2-stream");
529
530        let now = now_ts();
531        block_on(svc.handle(
532            "PutLogEvents",
533            json!({
534                "logGroupName": "/filter2/group",
535                "logStreamName": "filter2-stream",
536                "logEvents": [
537                    { "timestamp": now - 2000, "message": "msg1" },
538                    { "timestamp": now - 1000, "message": "msg2" },
539                ],
540            }),
541            &ctx,
542        ))
543        .unwrap();
544
545        let got = block_on(svc.handle(
546            "FilterLogEvents",
547            json!({
548                "logGroupName": "/filter2/group",
549            }),
550            &ctx,
551        ))
552        .unwrap();
553        assert_eq!(got["events"].as_array().unwrap().len(), 2);
554    }
555
556    #[test]
557    fn test_put_log_events_missing_group() {
558        let svc = CloudWatchLogsService::new();
559        let ctx = ctx();
560        let now = now_ts();
561        let err = block_on(svc.handle(
562            "PutLogEvents",
563            json!({
564                "logGroupName": "/ghost",
565                "logStreamName": "stream",
566                "logEvents": [{ "timestamp": now, "message": "hi" }],
567            }),
568            &ctx,
569        ))
570        .unwrap_err();
571        assert_eq!(err.code, "ResourceNotFoundException");
572    }
573
574    #[test]
575    fn test_create_log_group_persists_log_group_class() {
576        let svc = CloudWatchLogsService::new();
577        let ctx = ctx();
578        block_on(svc.handle(
579            "CreateLogGroup",
580            json!({ "logGroupName": "/ia/group", "logGroupClass": "INFREQUENT_ACCESS" }),
581            &ctx,
582        ))
583        .unwrap();
584        let desc = block_on(svc.handle(
585            "DescribeLogGroups",
586            json!({ "logGroupNamePrefix": "/ia/" }),
587            &ctx,
588        ))
589        .unwrap();
590        let groups = desc["logGroups"].as_array().unwrap();
591        assert_eq!(groups.len(), 1);
592        assert_eq!(groups[0]["logGroupClass"], "INFREQUENT_ACCESS");
593    }
594
595    #[test]
596    fn test_create_log_group_rejects_invalid_log_group_class() {
597        let svc = CloudWatchLogsService::new();
598        let ctx = ctx();
599        let err = block_on(svc.handle(
600            "CreateLogGroup",
601            json!({ "logGroupName": "/bad/group", "logGroupClass": "ARCHIVE" }),
602            &ctx,
603        ))
604        .unwrap_err();
605        assert_eq!(err.code, "InvalidParameterException");
606    }
607
608    #[test]
609    fn test_put_log_events_rejects_too_old_events() {
610        let svc = CloudWatchLogsService::new();
611        let ctx = ctx();
612        setup_group_and_stream(&svc, "/rej/group", "rej-stream");
613
614        let now = now_ts();
615        let too_old = now - (15 * 24 * 60 * 60 * 1000);
616        let result = block_on(svc.handle(
617            "PutLogEvents",
618            json!({
619                "logGroupName": "/rej/group",
620                "logStreamName": "rej-stream",
621                "logEvents": [
622                    { "timestamp": too_old, "message": "ancient" },
623                    { "timestamp": now - 1000, "message": "fresh" },
624                ],
625            }),
626            &ctx,
627        ))
628        .unwrap();
629        let rej = &result["rejectedLogEventsInfo"];
630        assert_eq!(rej["tooOldLogEventEndIndex"].as_u64(), Some(0));
631    }
632
633    #[test]
634    fn test_put_log_events_rejects_too_new_events() {
635        let svc = CloudWatchLogsService::new();
636        let ctx = ctx();
637        setup_group_and_stream(&svc, "/rej2/group", "rej2-stream");
638
639        let now = now_ts();
640        let too_future = now + (3 * 60 * 60 * 1000);
641        let result = block_on(svc.handle(
642            "PutLogEvents",
643            json!({
644                "logGroupName": "/rej2/group",
645                "logStreamName": "rej2-stream",
646                "logEvents": [
647                    { "timestamp": now - 1000, "message": "fresh" },
648                    { "timestamp": too_future, "message": "from the future" },
649                ],
650            }),
651            &ctx,
652        ))
653        .unwrap();
654        let rej = &result["rejectedLogEventsInfo"];
655        assert_eq!(rej["tooNewLogEventStartIndex"].as_u64(), Some(1));
656    }
657
658    #[test]
659    fn test_unknown_operation() {
660        let svc = CloudWatchLogsService::new();
661        let ctx = ctx();
662        let err = block_on(svc.handle("NonExistentOp", json!({}), &ctx)).unwrap_err();
663        assert_eq!(err.code, "UnknownOperationException");
664    }
665
666    #[test]
667    fn test_log_group_arn_format() {
668        let svc = CloudWatchLogsService::new();
669        let ctx = ctx();
670        block_on(svc.handle(
671            "CreateLogGroup",
672            json!({ "logGroupName": "/arn/check" }),
673            &ctx,
674        ))
675        .unwrap();
676
677        let result = block_on(svc.handle("DescribeLogGroups", json!({}), &ctx)).unwrap();
678        let group = &result["logGroups"].as_array().unwrap()[0];
679        let arn = group["arn"].as_str().unwrap();
680        assert!(
681            arn.starts_with("arn:aws:logs:us-east-1:000000000000:log-group:/arn/check"),
682            "arn={arn}"
683        );
684    }
685}