1mod handler;
2mod operations;
3pub mod sqlite_store;
4mod state;
5
6pub use handler::CloudWatchLogsService;
7pub use sqlite_store::{LogEventRow, SqliteStore};
8
9#[cfg(test)]
10mod tests {
11 use awsim_core::RequestContext;
12 use serde_json::json;
13
14 use super::handler::CloudWatchLogsService;
15 use awsim_core::ServiceHandler;
16
17 fn ctx() -> RequestContext {
18 RequestContext::new("logs", "us-east-1")
19 }
20
21 fn now_ts() -> u64 {
22 super::state::now_millis()
23 }
24
25 fn block_on<F: std::future::Future>(f: F) -> F::Output {
26 use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
27
28 fn noop_clone(_: *const ()) -> RawWaker {
29 noop_raw_waker()
30 }
31 fn noop(_: *const ()) {}
32 fn noop_raw_waker() -> RawWaker {
33 static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
34 RawWaker::new(std::ptr::null(), &VTABLE)
35 }
36 let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
37 let mut cx = Context::from_waker(&waker);
38 let mut fut = std::pin::pin!(f);
39 loop {
40 match fut.as_mut().poll(&mut cx) {
41 Poll::Ready(v) => return v,
42 Poll::Pending => {}
43 }
44 }
45 }
46
47 #[test]
52 fn test_create_log_group_basic() {
53 let svc = CloudWatchLogsService::new();
54 let ctx = ctx();
55 block_on(svc.handle(
56 "CreateLogGroup",
57 json!({ "logGroupName": "/my/app/logs" }),
58 &ctx,
59 ))
60 .unwrap();
61 }
62
63 #[test]
64 fn test_create_log_group_with_tags() {
65 let svc = CloudWatchLogsService::new();
66 let ctx = ctx();
67 block_on(svc.handle(
68 "CreateLogGroup",
69 json!({ "logGroupName": "/tagged/group", "tags": { "env": "test" } }),
70 &ctx,
71 ))
72 .unwrap();
73
74 let tags = block_on(svc.handle(
75 "ListTagsLogGroup",
76 json!({ "logGroupName": "/tagged/group" }),
77 &ctx,
78 ))
79 .unwrap();
80 assert_eq!(tags["tags"]["env"].as_str().unwrap(), "test");
81 }
82
83 #[test]
84 fn test_create_log_group_duplicate() {
85 let svc = CloudWatchLogsService::new();
86 let ctx = ctx();
87 block_on(svc.handle(
88 "CreateLogGroup",
89 json!({ "logGroupName": "/dup/group" }),
90 &ctx,
91 ))
92 .unwrap();
93
94 let err = block_on(svc.handle(
95 "CreateLogGroup",
96 json!({ "logGroupName": "/dup/group" }),
97 &ctx,
98 ))
99 .unwrap_err();
100 assert_eq!(err.code, "ResourceAlreadyExistsException");
101 }
102
103 #[test]
104 fn test_delete_log_group() {
105 let svc = CloudWatchLogsService::new();
106 let ctx = ctx();
107 block_on(svc.handle(
108 "CreateLogGroup",
109 json!({ "logGroupName": "/delete/me" }),
110 &ctx,
111 ))
112 .unwrap();
113
114 block_on(svc.handle(
115 "DeleteLogGroup",
116 json!({ "logGroupName": "/delete/me" }),
117 &ctx,
118 ))
119 .unwrap();
120
121 let result = block_on(svc.handle("DescribeLogGroups", json!({}), &ctx)).unwrap();
122 assert_eq!(result["logGroups"].as_array().unwrap().len(), 0);
123 }
124
125 #[test]
126 fn test_delete_nonexistent_log_group() {
127 let svc = CloudWatchLogsService::new();
128 let ctx = ctx();
129 let err = block_on(svc.handle("DeleteLogGroup", json!({ "logGroupName": "/ghost" }), &ctx))
130 .unwrap_err();
131 assert_eq!(err.code, "ResourceNotFoundException");
132 }
133
134 #[test]
135 fn test_describe_log_groups_empty() {
136 let svc = CloudWatchLogsService::new();
137 let ctx = ctx();
138 let result = block_on(svc.handle("DescribeLogGroups", json!({}), &ctx)).unwrap();
139 assert_eq!(result["logGroups"].as_array().unwrap().len(), 0);
140 }
141
142 #[test]
143 fn test_describe_log_groups_prefix_filter() {
144 let svc = CloudWatchLogsService::new();
145 let ctx = ctx();
146 block_on(svc.handle(
147 "CreateLogGroup",
148 json!({ "logGroupName": "/app/foo" }),
149 &ctx,
150 ))
151 .unwrap();
152 block_on(svc.handle(
153 "CreateLogGroup",
154 json!({ "logGroupName": "/app/bar" }),
155 &ctx,
156 ))
157 .unwrap();
158 block_on(svc.handle(
159 "CreateLogGroup",
160 json!({ "logGroupName": "/other/baz" }),
161 &ctx,
162 ))
163 .unwrap();
164
165 let result = block_on(svc.handle(
166 "DescribeLogGroups",
167 json!({ "logGroupNamePrefix": "/app/" }),
168 &ctx,
169 ))
170 .unwrap();
171 assert_eq!(result["logGroups"].as_array().unwrap().len(), 2);
172 }
173
174 #[test]
175 fn test_put_retention_policy() {
176 let svc = CloudWatchLogsService::new();
177 let ctx = ctx();
178 block_on(svc.handle(
179 "CreateLogGroup",
180 json!({ "logGroupName": "/ret/group" }),
181 &ctx,
182 ))
183 .unwrap();
184
185 block_on(svc.handle(
186 "PutRetentionPolicy",
187 json!({ "logGroupName": "/ret/group", "retentionInDays": 7 }),
188 &ctx,
189 ))
190 .unwrap();
191
192 let result = block_on(svc.handle("DescribeLogGroups", json!({}), &ctx)).unwrap();
193 let group = &result["logGroups"].as_array().unwrap()[0];
194 assert_eq!(group["retentionInDays"].as_u64().unwrap(), 7);
195 }
196
197 #[test]
198 fn test_put_retention_policy_invalid_days() {
199 let svc = CloudWatchLogsService::new();
200 let ctx = ctx();
201 block_on(svc.handle(
202 "CreateLogGroup",
203 json!({ "logGroupName": "/ret2/group" }),
204 &ctx,
205 ))
206 .unwrap();
207
208 let err = block_on(svc.handle(
209 "PutRetentionPolicy",
210 json!({ "logGroupName": "/ret2/group", "retentionInDays": 99 }),
211 &ctx,
212 ))
213 .unwrap_err();
214 assert_eq!(err.code, "InvalidParameterException");
215 }
216
217 #[test]
218 fn test_delete_retention_policy() {
219 let svc = CloudWatchLogsService::new();
220 let ctx = ctx();
221 block_on(svc.handle(
222 "CreateLogGroup",
223 json!({ "logGroupName": "/delret/group" }),
224 &ctx,
225 ))
226 .unwrap();
227 block_on(svc.handle(
228 "PutRetentionPolicy",
229 json!({ "logGroupName": "/delret/group", "retentionInDays": 30 }),
230 &ctx,
231 ))
232 .unwrap();
233
234 block_on(svc.handle(
235 "DeleteRetentionPolicy",
236 json!({ "logGroupName": "/delret/group" }),
237 &ctx,
238 ))
239 .unwrap();
240
241 let result = block_on(svc.handle("DescribeLogGroups", json!({}), &ctx)).unwrap();
242 let group = &result["logGroups"].as_array().unwrap()[0];
243 assert!(group.get("retentionInDays").is_none() || group["retentionInDays"].is_null());
244 }
245
246 #[test]
247 fn test_tag_untag_log_group() {
248 let svc = CloudWatchLogsService::new();
249 let ctx = ctx();
250 block_on(svc.handle(
251 "CreateLogGroup",
252 json!({ "logGroupName": "/tag/group" }),
253 &ctx,
254 ))
255 .unwrap();
256
257 block_on(svc.handle(
258 "TagLogGroup",
259 json!({ "logGroupName": "/tag/group", "tags": { "key1": "val1", "key2": "val2" } }),
260 &ctx,
261 ))
262 .unwrap();
263
264 let tags = block_on(svc.handle(
265 "ListTagsLogGroup",
266 json!({ "logGroupName": "/tag/group" }),
267 &ctx,
268 ))
269 .unwrap();
270 assert_eq!(tags["tags"].as_object().unwrap().len(), 2);
271
272 block_on(svc.handle(
273 "UntagLogGroup",
274 json!({ "logGroupName": "/tag/group", "tags": ["key1"] }),
275 &ctx,
276 ))
277 .unwrap();
278
279 let tags2 = block_on(svc.handle(
280 "ListTagsLogGroup",
281 json!({ "logGroupName": "/tag/group" }),
282 &ctx,
283 ))
284 .unwrap();
285 assert_eq!(tags2["tags"].as_object().unwrap().len(), 1);
286 }
287
288 #[test]
293 fn test_create_log_stream() {
294 let svc = CloudWatchLogsService::new();
295 let ctx = ctx();
296 block_on(svc.handle(
297 "CreateLogGroup",
298 json!({ "logGroupName": "/stream/group" }),
299 &ctx,
300 ))
301 .unwrap();
302
303 block_on(svc.handle(
304 "CreateLogStream",
305 json!({ "logGroupName": "/stream/group", "logStreamName": "stream-1" }),
306 &ctx,
307 ))
308 .unwrap();
309 }
310
311 #[test]
312 fn test_create_log_stream_duplicate() {
313 let svc = CloudWatchLogsService::new();
314 let ctx = ctx();
315 block_on(svc.handle(
316 "CreateLogGroup",
317 json!({ "logGroupName": "/dup/stream/group" }),
318 &ctx,
319 ))
320 .unwrap();
321
322 block_on(svc.handle(
323 "CreateLogStream",
324 json!({ "logGroupName": "/dup/stream/group", "logStreamName": "dup-stream" }),
325 &ctx,
326 ))
327 .unwrap();
328
329 let err = block_on(svc.handle(
330 "CreateLogStream",
331 json!({ "logGroupName": "/dup/stream/group", "logStreamName": "dup-stream" }),
332 &ctx,
333 ))
334 .unwrap_err();
335 assert_eq!(err.code, "ResourceAlreadyExistsException");
336 }
337
338 #[test]
339 fn test_delete_log_stream() {
340 let svc = CloudWatchLogsService::new();
341 let ctx = ctx();
342 block_on(svc.handle(
343 "CreateLogGroup",
344 json!({ "logGroupName": "/del/stream/group" }),
345 &ctx,
346 ))
347 .unwrap();
348 block_on(svc.handle(
349 "CreateLogStream",
350 json!({ "logGroupName": "/del/stream/group", "logStreamName": "del-stream" }),
351 &ctx,
352 ))
353 .unwrap();
354
355 block_on(svc.handle(
356 "DeleteLogStream",
357 json!({ "logGroupName": "/del/stream/group", "logStreamName": "del-stream" }),
358 &ctx,
359 ))
360 .unwrap();
361
362 let result = block_on(svc.handle(
363 "DescribeLogStreams",
364 json!({ "logGroupName": "/del/stream/group" }),
365 &ctx,
366 ))
367 .unwrap();
368 assert_eq!(result["logStreams"].as_array().unwrap().len(), 0);
369 }
370
371 #[test]
372 fn test_describe_log_streams_prefix() {
373 let svc = CloudWatchLogsService::new();
374 let ctx = ctx();
375 block_on(svc.handle(
376 "CreateLogGroup",
377 json!({ "logGroupName": "/desc/streams" }),
378 &ctx,
379 ))
380 .unwrap();
381
382 for name in &["app-stream-1", "app-stream-2", "other-stream"] {
383 block_on(svc.handle(
384 "CreateLogStream",
385 json!({ "logGroupName": "/desc/streams", "logStreamName": name }),
386 &ctx,
387 ))
388 .unwrap();
389 }
390
391 let result = block_on(svc.handle(
392 "DescribeLogStreams",
393 json!({ "logGroupName": "/desc/streams", "logStreamNamePrefix": "app-" }),
394 &ctx,
395 ))
396 .unwrap();
397 assert_eq!(result["logStreams"].as_array().unwrap().len(), 2);
398 }
399
400 fn setup_group_and_stream(svc: &CloudWatchLogsService, group: &str, stream: &str) {
405 let ctx = ctx();
406 block_on(svc.handle("CreateLogGroup", json!({ "logGroupName": group }), &ctx)).unwrap();
407 block_on(svc.handle(
408 "CreateLogStream",
409 json!({ "logGroupName": group, "logStreamName": stream }),
410 &ctx,
411 ))
412 .unwrap();
413 }
414
415 #[test]
416 fn test_put_and_get_log_events() {
417 let svc = CloudWatchLogsService::new();
418 let ctx = ctx();
419 setup_group_and_stream(&svc, "/events/group", "events-stream");
420
421 let now = now_ts();
422 let result = block_on(svc.handle(
423 "PutLogEvents",
424 json!({
425 "logGroupName": "/events/group",
426 "logStreamName": "events-stream",
427 "logEvents": [
428 { "timestamp": now - 2000, "message": "first event" },
429 { "timestamp": now - 1000, "message": "second event" },
430 ],
431 }),
432 &ctx,
433 ))
434 .unwrap();
435 assert!(result["nextSequenceToken"].as_str().is_some());
436
437 let got = block_on(svc.handle(
438 "GetLogEvents",
439 json!({
440 "logGroupName": "/events/group",
441 "logStreamName": "events-stream",
442 "startFromHead": true,
443 }),
444 &ctx,
445 ))
446 .unwrap();
447 assert_eq!(got["events"].as_array().unwrap().len(), 2);
448 assert_eq!(got["events"][0]["message"].as_str().unwrap(), "first event");
449 }
450
451 #[test]
452 fn test_get_log_events_time_filter() {
453 let svc = CloudWatchLogsService::new();
454 let ctx = ctx();
455 setup_group_and_stream(&svc, "/time/group", "time-stream");
456
457 let now = now_ts();
458 block_on(svc.handle(
459 "PutLogEvents",
460 json!({
461 "logGroupName": "/time/group",
462 "logStreamName": "time-stream",
463 "logEvents": [
464 { "timestamp": now - 9000, "message": "before" },
465 { "timestamp": now - 5000, "message": "during" },
466 { "timestamp": now - 1000, "message": "after" },
467 ],
468 }),
469 &ctx,
470 ))
471 .unwrap();
472
473 let got = block_on(svc.handle(
474 "GetLogEvents",
475 json!({
476 "logGroupName": "/time/group",
477 "logStreamName": "time-stream",
478 "startTime": now - 8000,
479 "endTime": now - 2000,
480 "startFromHead": true,
481 }),
482 &ctx,
483 ))
484 .unwrap();
485 let events = got["events"].as_array().unwrap();
486 assert_eq!(events.len(), 1);
487 assert_eq!(events[0]["message"].as_str().unwrap(), "during");
488 }
489
490 #[test]
491 fn test_filter_log_events_pattern() {
492 let svc = CloudWatchLogsService::new();
493 let ctx = ctx();
494 setup_group_and_stream(&svc, "/filter/group", "filter-stream");
495
496 let now = now_ts();
497 block_on(svc.handle(
498 "PutLogEvents",
499 json!({
500 "logGroupName": "/filter/group",
501 "logStreamName": "filter-stream",
502 "logEvents": [
503 { "timestamp": now - 3000, "message": "ERROR something failed" },
504 { "timestamp": now - 2000, "message": "INFO all good" },
505 { "timestamp": now - 1000, "message": "ERROR another failure" },
506 ],
507 }),
508 &ctx,
509 ))
510 .unwrap();
511
512 let got = block_on(svc.handle(
513 "FilterLogEvents",
514 json!({
515 "logGroupName": "/filter/group",
516 "filterPattern": "ERROR",
517 }),
518 &ctx,
519 ))
520 .unwrap();
521 assert_eq!(got["events"].as_array().unwrap().len(), 2);
522 }
523
524 #[test]
525 fn test_filter_log_events_empty_pattern() {
526 let svc = CloudWatchLogsService::new();
527 let ctx = ctx();
528 setup_group_and_stream(&svc, "/filter2/group", "filter2-stream");
529
530 let now = now_ts();
531 block_on(svc.handle(
532 "PutLogEvents",
533 json!({
534 "logGroupName": "/filter2/group",
535 "logStreamName": "filter2-stream",
536 "logEvents": [
537 { "timestamp": now - 2000, "message": "msg1" },
538 { "timestamp": now - 1000, "message": "msg2" },
539 ],
540 }),
541 &ctx,
542 ))
543 .unwrap();
544
545 let got = block_on(svc.handle(
546 "FilterLogEvents",
547 json!({
548 "logGroupName": "/filter2/group",
549 }),
550 &ctx,
551 ))
552 .unwrap();
553 assert_eq!(got["events"].as_array().unwrap().len(), 2);
554 }
555
556 #[test]
557 fn test_put_log_events_missing_group() {
558 let svc = CloudWatchLogsService::new();
559 let ctx = ctx();
560 let now = now_ts();
561 let err = block_on(svc.handle(
562 "PutLogEvents",
563 json!({
564 "logGroupName": "/ghost",
565 "logStreamName": "stream",
566 "logEvents": [{ "timestamp": now, "message": "hi" }],
567 }),
568 &ctx,
569 ))
570 .unwrap_err();
571 assert_eq!(err.code, "ResourceNotFoundException");
572 }
573
574 #[test]
575 fn test_create_log_group_persists_log_group_class() {
576 let svc = CloudWatchLogsService::new();
577 let ctx = ctx();
578 block_on(svc.handle(
579 "CreateLogGroup",
580 json!({ "logGroupName": "/ia/group", "logGroupClass": "INFREQUENT_ACCESS" }),
581 &ctx,
582 ))
583 .unwrap();
584 let desc = block_on(svc.handle(
585 "DescribeLogGroups",
586 json!({ "logGroupNamePrefix": "/ia/" }),
587 &ctx,
588 ))
589 .unwrap();
590 let groups = desc["logGroups"].as_array().unwrap();
591 assert_eq!(groups.len(), 1);
592 assert_eq!(groups[0]["logGroupClass"], "INFREQUENT_ACCESS");
593 }
594
595 #[test]
596 fn test_create_log_group_rejects_invalid_log_group_class() {
597 let svc = CloudWatchLogsService::new();
598 let ctx = ctx();
599 let err = block_on(svc.handle(
600 "CreateLogGroup",
601 json!({ "logGroupName": "/bad/group", "logGroupClass": "ARCHIVE" }),
602 &ctx,
603 ))
604 .unwrap_err();
605 assert_eq!(err.code, "InvalidParameterException");
606 }
607
608 #[test]
609 fn test_put_log_events_rejects_too_old_events() {
610 let svc = CloudWatchLogsService::new();
611 let ctx = ctx();
612 setup_group_and_stream(&svc, "/rej/group", "rej-stream");
613
614 let now = now_ts();
615 let too_old = now - (15 * 24 * 60 * 60 * 1000);
616 let result = block_on(svc.handle(
617 "PutLogEvents",
618 json!({
619 "logGroupName": "/rej/group",
620 "logStreamName": "rej-stream",
621 "logEvents": [
622 { "timestamp": too_old, "message": "ancient" },
623 { "timestamp": now - 1000, "message": "fresh" },
624 ],
625 }),
626 &ctx,
627 ))
628 .unwrap();
629 let rej = &result["rejectedLogEventsInfo"];
630 assert_eq!(rej["tooOldLogEventEndIndex"].as_u64(), Some(0));
631 }
632
633 #[test]
634 fn test_put_log_events_rejects_too_new_events() {
635 let svc = CloudWatchLogsService::new();
636 let ctx = ctx();
637 setup_group_and_stream(&svc, "/rej2/group", "rej2-stream");
638
639 let now = now_ts();
640 let too_future = now + (3 * 60 * 60 * 1000);
641 let result = block_on(svc.handle(
642 "PutLogEvents",
643 json!({
644 "logGroupName": "/rej2/group",
645 "logStreamName": "rej2-stream",
646 "logEvents": [
647 { "timestamp": now - 1000, "message": "fresh" },
648 { "timestamp": too_future, "message": "from the future" },
649 ],
650 }),
651 &ctx,
652 ))
653 .unwrap();
654 let rej = &result["rejectedLogEventsInfo"];
655 assert_eq!(rej["tooNewLogEventStartIndex"].as_u64(), Some(1));
656 }
657
658 #[test]
659 fn test_unknown_operation() {
660 let svc = CloudWatchLogsService::new();
661 let ctx = ctx();
662 let err = block_on(svc.handle("NonExistentOp", json!({}), &ctx)).unwrap_err();
663 assert_eq!(err.code, "UnknownOperationException");
664 }
665
666 #[test]
667 fn test_log_group_arn_format() {
668 let svc = CloudWatchLogsService::new();
669 let ctx = ctx();
670 block_on(svc.handle(
671 "CreateLogGroup",
672 json!({ "logGroupName": "/arn/check" }),
673 &ctx,
674 ))
675 .unwrap();
676
677 let result = block_on(svc.handle("DescribeLogGroups", json!({}), &ctx)).unwrap();
678 let group = &result["logGroups"].as_array().unwrap()[0];
679 let arn = group["arn"].as_str().unwrap();
680 assert!(
681 arn.starts_with("arn:aws:logs:us-east-1:000000000000:log-group:/arn/check"),
682 "arn={arn}"
683 );
684 }
685}