1use std::collections::HashMap;
7
8use dashmap::{DashMap, mapref::entry::Entry};
9use rustack_logs_model::{
10 error::{LogsError, LogsErrorCode},
11 input::{
12 AssociateKmsKeyInput, CreateLogGroupInput, CreateLogStreamInput, DeleteDestinationInput,
13 DeleteLogGroupInput, DeleteLogStreamInput, DeleteMetricFilterInput,
14 DeleteQueryDefinitionInput, DeleteResourcePolicyInput, DeleteRetentionPolicyInput,
15 DeleteSubscriptionFilterInput, DescribeDestinationsInput, DescribeLogGroupsInput,
16 DescribeLogStreamsInput, DescribeMetricFiltersInput, DescribeQueriesInput,
17 DescribeQueryDefinitionsInput, DescribeResourcePoliciesInput,
18 DescribeSubscriptionFiltersInput, DisassociateKmsKeyInput, FilterLogEventsInput,
19 GetLogEventsInput, GetQueryResultsInput, ListTagsForResourceInput, ListTagsLogGroupInput,
20 PutDestinationInput, PutDestinationPolicyInput, PutLogEventsInput, PutMetricFilterInput,
21 PutQueryDefinitionInput, PutResourcePolicyInput, PutRetentionPolicyInput,
22 PutSubscriptionFilterInput, StartQueryInput, StopQueryInput, TagLogGroupInput,
23 TagResourceInput, TestMetricFilterInput, UntagLogGroupInput, UntagResourceInput,
24 },
25 output::{
26 DescribeDestinationsResponse, DescribeLogGroupsResponse, DescribeLogStreamsResponse,
27 DescribeMetricFiltersResponse, DescribeQueriesResponse, DescribeQueryDefinitionsResponse,
28 DescribeResourcePoliciesResponse, DescribeSubscriptionFiltersResponse,
29 FilterLogEventsResponse, GetLogEventsResponse, GetQueryResultsResponse,
30 ListTagsForResourceResponse, ListTagsLogGroupResponse, PutDestinationResponse,
31 PutLogEventsResponse, PutQueryDefinitionResponse, PutResourcePolicyResponse,
32 StartQueryResponse, StopQueryResponse, TestMetricFilterResponse,
33 },
34 types::{
35 Destination, FilteredLogEvent, LogGroup, LogStream, MetricFilter, MetricFilterMatchRecord,
36 OutputLogEvent, QueryDefinition, QueryStatistics, QueryStatus, ResourcePolicy,
37 SearchedLogStream, SubscriptionFilter,
38 },
39};
40
41use crate::config::LogsConfig;
42
/// Maximum number of events accepted in a single `PutLogEvents` request.
const MAX_PUT_LOG_EVENTS: usize = 10_000;

/// Maximum size in bytes (1 MiB) of a single `PutLogEvents` batch; each event
/// is counted as its message length plus 26 bytes of overhead.
const MAX_BATCH_SIZE_BYTES: usize = 1_048_576;

/// Oldest accepted event age relative to "now", in milliseconds (14 days).
const MAX_EVENT_AGE_MS: i64 = 14 * 24 * 60 * 60 * 1000;

/// Furthest an event timestamp may lie ahead of "now", in milliseconds (2 hours).
const MAX_FUTURE_MS: i64 = 2 * 60 * 60 * 1000;
54
/// Default and maximum number of items returned per page by the paginated
/// handlers.
const DEFAULT_PAGE_SIZE: usize = 50;

/// Turns a caller-supplied `limit` into an effective page size.
///
/// * `None` yields [`DEFAULT_PAGE_SIZE`].
/// * Values larger than [`DEFAULT_PAGE_SIZE`] are capped to it.
/// * Zero and negative values are raised to `1`: a page size of `0` would
///   produce an empty page without a continuation token, silently hiding
///   every result from the caller.
fn resolve_page_size(limit: Option<i32>) -> usize {
    limit.map_or(DEFAULT_PAGE_SIZE, |requested| {
        // A negative `i32` fails the conversion and falls back to the minimum.
        usize::try_from(requested).map_or(1, |n| n.clamp(1, DEFAULT_PAGE_SIZE))
    })
}
66
/// In-memory state of a single log group, including everything nested
/// under it (streams, metric filters and subscription filters).
#[derive(Debug)]
struct LogGroupRecord {
    /// Log group name; also the key under which the record is stored.
    name: String,
    /// Log group ARN without a trailing `:*`.
    arn: String,
    /// Creation time in milliseconds since the Unix epoch.
    creation_time: i64,
    /// Retention period in days; `None` when no retention policy is set.
    retention_in_days: Option<i32>,
    /// KMS key identifier supplied at creation time, if any.
    kms_key_id: Option<String>,
    /// Tags attached to the log group.
    tags: HashMap<String, String>,
    /// Log streams of this group, keyed by stream name.
    streams: HashMap<String, LogStreamRecord>,
    /// Metric filters of this group, keyed by filter name.
    metric_filters: HashMap<String, MetricFilterRecord>,
    /// Subscription filters of this group, keyed by filter name.
    subscription_filters: HashMap<String, SubscriptionFilterRecord>,
    /// Running total of ingested message bytes, grown by `PutLogEvents`.
    stored_bytes: i64,
}
84
/// In-memory state of a single log stream and the events it holds.
#[derive(Debug)]
struct LogStreamRecord {
    /// Log stream name; also its key inside the owning group.
    name: String,
    /// Log stream ARN.
    arn: String,
    /// Creation time in milliseconds since the Unix epoch.
    creation_time: i64,
    /// Timestamp of the earliest stored event; `None` until events arrive.
    first_event_timestamp: Option<i64>,
    /// Timestamp of the latest stored event; `None` until events arrive.
    last_event_timestamp: Option<i64>,
    /// Ingestion time of the most recent `PutLogEvents` call, if any.
    last_ingestion_time: Option<i64>,
    /// Opaque sequence token: a fresh UUID at creation and after every
    /// `PutLogEvents` call.
    upload_sequence_token: String,
    /// Stored events; `PutLogEvents` re-sorts them by timestamp after each
    /// ingest.
    events: Vec<StoredLogEvent>,
}
96
/// A single ingested log event as kept in a stream.
#[derive(Debug)]
struct StoredLogEvent {
    /// Client-supplied event timestamp (milliseconds since the Unix epoch).
    timestamp: i64,
    /// Raw log message.
    message: String,
    /// Server-side time at which the event was ingested (epoch milliseconds).
    ingestion_time: i64,
}
103
/// Stored definition of a metric filter attached to a log group.
#[derive(Debug)]
struct MetricFilterRecord {
    /// Filter name; also its key inside the owning group.
    name: String,
    /// Filter pattern exactly as supplied by the caller.
    filter_pattern: String,
    /// Name of the log group the filter belongs to.
    log_group_name: String,
    /// Metric transformations kept as the JSON value they serialized to.
    metric_transformations: serde_json::Value,
    /// Time of the most recent put, in milliseconds since the Unix epoch.
    creation_time: i64,
}
112
/// Stored definition of a subscription filter attached to a log group.
#[derive(Debug)]
struct SubscriptionFilterRecord {
    /// Filter name; also its key inside the owning group.
    name: String,
    /// Filter pattern exactly as supplied by the caller.
    filter_pattern: String,
    /// Name of the log group the filter belongs to.
    log_group_name: String,
    /// ARN of the destination that matching events are meant for.
    destination_arn: String,
    /// Optional role ARN supplied with the filter.
    role_arn: Option<String>,
    /// String form of the optional distribution setting.
    distribution: Option<String>,
    /// Time of the most recent put, in milliseconds since the Unix epoch.
    creation_time: i64,
}
123
/// Stored resource policy.
#[derive(Debug)]
struct ResourcePolicyRecord {
    /// Policy name; also the key under which the record is stored.
    name: String,
    /// Policy document exactly as supplied by the caller.
    policy_document: String,
    /// Time of the most recent put, in milliseconds since the Unix epoch.
    last_updated_time: i64,
}
130
/// Stored destination.
// NOTE(review): the handlers that populate this record are not visible in
// this part of the file; the field notes below follow the field names and
// types only and should be confirmed against them.
#[derive(Debug)]
struct DestinationRecord {
    /// Destination name.
    name: String,
    /// ARN of the target resource.
    target_arn: String,
    /// ARN of the role associated with the destination.
    role_arn: String,
    /// Access policy document, if one has been set.
    access_policy: Option<String>,
    /// ARN of the destination itself.
    arn: String,
    /// Creation time, presumably epoch milliseconds like the other records.
    creation_time: i64,
}
140
/// Stored query definition.
// NOTE(review): the handlers that populate this record are not visible in
// this part of the file; the field notes below follow the field names and
// types only and should be confirmed against them.
#[derive(Debug)]
struct QueryDefinitionRecord {
    /// Identifier of the query definition.
    query_definition_id: String,
    /// Human-readable name of the query definition.
    name: String,
    /// Query text.
    query_string: String,
    /// Log groups the definition is associated with.
    log_group_names: Vec<String>,
    /// Last modification time, presumably epoch milliseconds.
    last_modified: i64,
}
149
/// In-memory logs service state plus the handlers operating on it.
///
/// All state lives in concurrent `DashMap`s, so handlers take `&self`.
#[derive(Debug)]
pub struct RustackLogs {
    /// Service configuration; supplies the region and account id used in ARNs.
    config: LogsConfig,
    /// Log groups keyed by log group name.
    groups: DashMap<String, LogGroupRecord>,
    /// Resource policies keyed by policy name.
    resource_policies: DashMap<String, ResourcePolicyRecord>,
    /// Destinations (presumably keyed by destination name — confirm against
    /// the destination handlers).
    destinations: DashMap<String, DestinationRecord>,
    /// Query definitions (presumably keyed by query definition id — confirm
    /// against the query definition handlers).
    query_definitions: DashMap<String, QueryDefinitionRecord>,
}
163
164impl RustackLogs {
165 #[must_use]
167 pub fn new(config: LogsConfig) -> Self {
168 Self {
169 config,
170 groups: DashMap::new(),
171 resource_policies: DashMap::new(),
172 destinations: DashMap::new(),
173 query_definitions: DashMap::new(),
174 }
175 }
176
177 fn log_group_arn(&self, name: &str) -> String {
178 format!(
179 "arn:aws:logs:{}:{}:log-group:{}",
180 self.config.default_region, self.config.account_id, name,
181 )
182 }
183
184 fn log_stream_arn(&self, group_name: &str, stream_name: &str) -> String {
185 format!(
186 "arn:aws:logs:{}:{}:log-group:{}:log-stream:{}",
187 self.config.default_region, self.config.account_id, group_name, stream_name,
188 )
189 }
190
191 fn destination_arn(&self, name: &str) -> String {
192 format!(
193 "arn:aws:logs:{}:{}:destination:{}",
194 self.config.default_region, self.config.account_id, name,
195 )
196 }
197
198 fn now_millis() -> i64 {
199 chrono::Utc::now().timestamp_millis()
200 }
201
202 fn validate_log_group_name(name: &str) -> Result<(), LogsError> {
203 if name.is_empty() || name.len() > 512 {
204 return Err(LogsError::with_message(
205 LogsErrorCode::InvalidParameterException,
206 format!(
207 "Log group name must be between 1 and 512 characters, got {}",
208 name.len()
209 ),
210 ));
211 }
212 if !name
214 .chars()
215 .all(|c| c.is_ascii_alphanumeric() || "._-/#".contains(c))
216 {
217 return Err(LogsError::with_message(
218 LogsErrorCode::InvalidParameterException,
219 format!("Log group name does not match pattern [.\\-_/#A-Za-z0-9]+: {name}"),
220 ));
221 }
222 Ok(())
223 }
224
225 fn validate_log_stream_name(name: &str) -> Result<(), LogsError> {
226 if name.is_empty() || name.len() > 512 {
227 return Err(LogsError::with_message(
228 LogsErrorCode::InvalidParameterException,
229 format!(
230 "Log stream name must be between 1 and 512 characters, got {}",
231 name.len()
232 ),
233 ));
234 }
235 if name.contains(':') || name.contains('*') {
236 return Err(LogsError::with_message(
237 LogsErrorCode::InvalidParameterException,
238 format!("Log stream name must not contain ':' or '*': {name}"),
239 ));
240 }
241 Ok(())
242 }
243
244 pub fn handle_create_log_group(
249 &self,
250 input: &CreateLogGroupInput,
251 ) -> Result<serde_json::Value, LogsError> {
252 Self::validate_log_group_name(&input.log_group_name)?;
253
254 match self.groups.entry(input.log_group_name.clone()) {
255 Entry::Occupied(_) => Err(LogsError::with_message(
256 LogsErrorCode::ResourceAlreadyExistsException,
257 format!(
258 "The specified log group already exists: {}",
259 input.log_group_name
260 ),
261 )),
262 Entry::Vacant(entry) => {
263 let arn = self.log_group_arn(&input.log_group_name);
264 entry.insert(LogGroupRecord {
265 name: input.log_group_name.clone(),
266 arn,
267 creation_time: Self::now_millis(),
268 retention_in_days: None,
269 kms_key_id: input.kms_key_id.clone(),
270 tags: input.tags.clone(),
271 streams: HashMap::new(),
272 metric_filters: HashMap::new(),
273 subscription_filters: HashMap::new(),
274 stored_bytes: 0,
275 });
276 Ok(serde_json::json!({}))
277 }
278 }
279 }
280
281 pub fn handle_delete_log_group(
282 &self,
283 input: &DeleteLogGroupInput,
284 ) -> Result<serde_json::Value, LogsError> {
285 self.groups.remove(&input.log_group_name).ok_or_else(|| {
286 LogsError::with_message(
287 LogsErrorCode::ResourceNotFoundException,
288 format!(
289 "The specified log group does not exist: {}",
290 input.log_group_name
291 ),
292 )
293 })?;
294 Ok(serde_json::json!({}))
295 }
296
297 pub fn handle_describe_log_groups(
298 &self,
299 input: &DescribeLogGroupsInput,
300 ) -> Result<DescribeLogGroupsResponse, LogsError> {
301 let page_size = resolve_page_size(input.limit);
302 let mut groups: Vec<LogGroup> = Vec::new();
303
304 for entry in &self.groups {
305 let g = entry.value();
306
307 if let Some(ref prefix) = input.log_group_name_prefix {
309 if !g.name.starts_with(prefix.as_str()) {
310 continue;
311 }
312 }
313
314 if let Some(ref pattern) = input.log_group_name_pattern {
316 if !g.name.contains(pattern.as_str()) {
317 continue;
318 }
319 }
320
321 groups.push(LogGroup {
322 log_group_name: Some(g.name.clone()),
323 log_group_arn: Some(format!("{}:*", g.arn)),
324 arn: Some(format!("{}:*", g.arn)),
325 creation_time: Some(g.creation_time),
326 retention_in_days: g.retention_in_days,
327 kms_key_id: g.kms_key_id.clone(),
328 metric_filter_count: Some(
329 i32::try_from(g.metric_filters.len()).unwrap_or(i32::MAX),
330 ),
331 stored_bytes: Some(g.stored_bytes),
332 ..LogGroup::default()
333 });
334 }
335
336 groups.sort_by(|a, b| a.log_group_name.cmp(&b.log_group_name));
337
338 if let Some(ref cursor) = input.next_token {
340 groups.retain(|g| {
341 g.log_group_name
342 .as_ref()
343 .is_some_and(|n| n.as_str() > cursor.as_str())
344 });
345 }
346
347 let has_more = groups.len() > page_size;
348 groups.truncate(page_size);
349 let next_token = if has_more {
350 groups.last().and_then(|g| g.log_group_name.clone())
351 } else {
352 None
353 };
354
355 Ok(DescribeLogGroupsResponse {
356 log_groups: groups,
357 next_token,
358 })
359 }
360
361 pub fn handle_create_log_stream(
366 &self,
367 input: &CreateLogStreamInput,
368 ) -> Result<serde_json::Value, LogsError> {
369 Self::validate_log_stream_name(&input.log_stream_name)?;
370
371 let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
372 LogsError::with_message(
373 LogsErrorCode::ResourceNotFoundException,
374 format!(
375 "The specified log group does not exist: {}",
376 input.log_group_name
377 ),
378 )
379 })?;
380
381 if group.streams.contains_key(&input.log_stream_name) {
382 return Err(LogsError::with_message(
383 LogsErrorCode::ResourceAlreadyExistsException,
384 format!(
385 "The specified log stream already exists: {}",
386 input.log_stream_name,
387 ),
388 ));
389 }
390
391 let arn = self.log_stream_arn(&input.log_group_name, &input.log_stream_name);
392 group.streams.insert(
393 input.log_stream_name.clone(),
394 LogStreamRecord {
395 name: input.log_stream_name.clone(),
396 arn,
397 creation_time: Self::now_millis(),
398 first_event_timestamp: None,
399 last_event_timestamp: None,
400 last_ingestion_time: None,
401 upload_sequence_token: uuid::Uuid::new_v4().to_string(),
402 events: Vec::new(),
403 },
404 );
405
406 Ok(serde_json::json!({}))
407 }
408
409 pub fn handle_delete_log_stream(
410 &self,
411 input: &DeleteLogStreamInput,
412 ) -> Result<serde_json::Value, LogsError> {
413 let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
414 LogsError::with_message(
415 LogsErrorCode::ResourceNotFoundException,
416 format!(
417 "The specified log group does not exist: {}",
418 input.log_group_name
419 ),
420 )
421 })?;
422
423 group
424 .streams
425 .remove(&input.log_stream_name)
426 .ok_or_else(|| {
427 LogsError::with_message(
428 LogsErrorCode::ResourceNotFoundException,
429 format!(
430 "The specified log stream does not exist: {}",
431 input.log_stream_name,
432 ),
433 )
434 })?;
435
436 Ok(serde_json::json!({}))
437 }
438
439 pub fn handle_describe_log_streams(
440 &self,
441 input: &DescribeLogStreamsInput,
442 ) -> Result<DescribeLogStreamsResponse, LogsError> {
443 let log_group_name = input
444 .log_group_name
445 .as_deref()
446 .or(input.log_group_identifier.as_deref())
447 .ok_or_else(|| {
448 LogsError::with_message(
449 LogsErrorCode::InvalidParameterException,
450 "Either logGroupName or logGroupIdentifier must be specified",
451 )
452 })?;
453
454 let group = self.groups.get(log_group_name).ok_or_else(|| {
455 LogsError::with_message(
456 LogsErrorCode::ResourceNotFoundException,
457 format!("The specified log group does not exist: {log_group_name}"),
458 )
459 })?;
460
461 let page_size = resolve_page_size(input.limit);
462 let mut streams: Vec<LogStream> = group
463 .streams
464 .values()
465 .filter(|s| {
466 if let Some(ref prefix) = input.log_stream_name_prefix {
467 s.name.starts_with(prefix.as_str())
468 } else {
469 true
470 }
471 })
472 .map(|s| LogStream {
473 log_stream_name: Some(s.name.clone()),
474 arn: Some(s.arn.clone()),
475 creation_time: Some(s.creation_time),
476 first_event_timestamp: s.first_event_timestamp,
477 last_event_timestamp: s.last_event_timestamp,
478 last_ingestion_time: s.last_ingestion_time,
479 upload_sequence_token: Some(s.upload_sequence_token.clone()),
480 stored_bytes: Some(
481 i64::try_from(s.events.iter().map(|e| e.message.len()).sum::<usize>())
482 .unwrap_or(0),
483 ),
484 })
485 .collect();
486
487 let descending = input.descending.unwrap_or(false);
489 match input.order_by {
490 Some(rustack_logs_model::types::OrderBy::LastEventTime) => {
491 streams.sort_by(|a, b| {
492 let ta = a.last_event_timestamp.unwrap_or(0);
493 let tb = b.last_event_timestamp.unwrap_or(0);
494 if descending { tb.cmp(&ta) } else { ta.cmp(&tb) }
495 });
496 }
497 _ => {
498 streams.sort_by(|a, b| {
499 let cmp = a.log_stream_name.cmp(&b.log_stream_name);
500 if descending { cmp.reverse() } else { cmp }
501 });
502 }
503 }
504
505 if let Some(ref cursor) = input.next_token {
507 streams.retain(|s| {
508 s.log_stream_name
509 .as_ref()
510 .is_some_and(|n| n.as_str() > cursor.as_str())
511 });
512 }
513
514 let has_more = streams.len() > page_size;
515 streams.truncate(page_size);
516 let next_token = if has_more {
517 streams.last().and_then(|s| s.log_stream_name.clone())
518 } else {
519 None
520 };
521
522 Ok(DescribeLogStreamsResponse {
523 log_streams: streams,
524 next_token,
525 })
526 }
527
528 pub fn handle_put_log_events(
533 &self,
534 input: &PutLogEventsInput,
535 ) -> Result<PutLogEventsResponse, LogsError> {
536 if input.log_events.len() > MAX_PUT_LOG_EVENTS {
537 return Err(LogsError::with_message(
538 LogsErrorCode::InvalidParameterException,
539 format!(
540 "Log events in a single PutLogEvents request cannot exceed \
541 {MAX_PUT_LOG_EVENTS}"
542 ),
543 ));
544 }
545
546 let total_size: usize = input
548 .log_events
549 .iter()
550 .map(|e| e.message.len() + 26) .sum();
552 if total_size > MAX_BATCH_SIZE_BYTES {
553 return Err(LogsError::with_message(
554 LogsErrorCode::InvalidParameterException,
555 "The batch of log events in a single PutLogEvents request cannot exceed 1 MB",
556 ));
557 }
558
559 let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
560 LogsError::with_message(
561 LogsErrorCode::ResourceNotFoundException,
562 format!(
563 "The specified log group does not exist: {}",
564 input.log_group_name
565 ),
566 )
567 })?;
568
569 let stream = group
570 .streams
571 .get_mut(&input.log_stream_name)
572 .ok_or_else(|| {
573 LogsError::with_message(
574 LogsErrorCode::ResourceNotFoundException,
575 format!(
576 "The specified log stream does not exist: {}",
577 input.log_stream_name,
578 ),
579 )
580 })?;
581
582 let now = Self::now_millis();
583
584 for event in &input.log_events {
586 if now - event.timestamp > MAX_EVENT_AGE_MS {
587 return Err(LogsError::with_message(
588 LogsErrorCode::InvalidParameterException,
589 "Log event timestamp is too old (more than 14 days ago)",
590 ));
591 }
592 if event.timestamp - now > MAX_FUTURE_MS {
593 return Err(LogsError::with_message(
594 LogsErrorCode::InvalidParameterException,
595 "Log event timestamp is too far in the future (more than 2 hours)",
596 ));
597 }
598 }
599
600 let mut sorted_events: Vec<&rustack_logs_model::types::InputLogEvent> =
602 input.log_events.iter().collect();
603 sorted_events.sort_by_key(|e| e.timestamp);
604
605 let ingestion_time = now;
606 for event in &sorted_events {
607 stream.events.push(StoredLogEvent {
608 timestamp: event.timestamp,
609 message: event.message.clone(),
610 ingestion_time,
611 });
612 }
613
614 stream.events.sort_by_key(|e| e.timestamp);
616
617 if let Some(first) = sorted_events.first() {
619 if stream.first_event_timestamp.is_none()
620 || first.timestamp < stream.first_event_timestamp.unwrap_or(i64::MAX)
621 {
622 stream.first_event_timestamp = Some(first.timestamp);
623 }
624 }
625 if let Some(last) = sorted_events.last() {
626 stream.last_event_timestamp = Some(last.timestamp);
627 }
628 stream.last_ingestion_time = Some(ingestion_time);
629
630 let new_token = uuid::Uuid::new_v4().to_string();
632 stream.upload_sequence_token.clone_from(&new_token);
633
634 let added_bytes: i64 = sorted_events
636 .iter()
637 .map(|e| i64::try_from(e.message.len()).unwrap_or(0))
638 .sum();
639 group.stored_bytes += added_bytes;
640
641 Ok(PutLogEventsResponse {
642 next_sequence_token: Some(new_token),
643 rejected_log_events_info: None,
644 rejected_entity_info: None,
645 })
646 }
647
648 pub fn handle_get_log_events(
649 &self,
650 input: &GetLogEventsInput,
651 ) -> Result<GetLogEventsResponse, LogsError> {
652 let log_group_name = input
653 .log_group_name
654 .as_deref()
655 .or(input.log_group_identifier.as_deref())
656 .ok_or_else(|| {
657 LogsError::with_message(
658 LogsErrorCode::InvalidParameterException,
659 "Either logGroupName or logGroupIdentifier must be specified",
660 )
661 })?;
662
663 let group = self.groups.get(log_group_name).ok_or_else(|| {
664 LogsError::with_message(
665 LogsErrorCode::ResourceNotFoundException,
666 format!("The specified log group does not exist: {log_group_name}"),
667 )
668 })?;
669
670 let stream = group.streams.get(&input.log_stream_name).ok_or_else(|| {
671 LogsError::with_message(
672 LogsErrorCode::ResourceNotFoundException,
673 format!(
674 "The specified log stream does not exist: {}",
675 input.log_stream_name,
676 ),
677 )
678 })?;
679
680 let limit = input
681 .limit
682 .map_or(10_000, |l| usize::try_from(l.max(1)).unwrap_or(10_000));
683
684 let start_time = input.start_time.unwrap_or(0);
685 let end_time = input.end_time.unwrap_or(i64::MAX);
686 let start_from_head = input.start_from_head.unwrap_or(false);
687
688 let filtered: Vec<(usize, &StoredLogEvent)> = stream
689 .events
690 .iter()
691 .enumerate()
692 .filter(|(_, e)| e.timestamp >= start_time && e.timestamp < end_time)
693 .collect();
694
695 let cursor = input.next_token.as_ref().and_then(|t| {
697 let parts: Vec<&str> = t.splitn(2, '/').collect();
698 if parts.len() == 2 {
699 parts[1].parse::<usize>().ok().map(|idx| (parts[0], idx))
700 } else {
701 None
702 }
703 });
704
705 let (events, forward_idx, backward_idx) =
706 if start_from_head || cursor.as_ref().is_some_and(|(dir, _)| *dir == "f") {
707 let start_idx = cursor.map_or(0, |(_, idx)| idx);
708 let page: Vec<OutputLogEvent> = filtered
709 .iter()
710 .filter(|(orig_idx, _)| *orig_idx >= start_idx)
711 .take(limit)
712 .map(|(_, e)| OutputLogEvent {
713 timestamp: Some(e.timestamp),
714 message: Some(e.message.clone()),
715 ingestion_time: Some(e.ingestion_time),
716 })
717 .collect();
718 let fwd = filtered
719 .iter()
720 .filter(|(orig_idx, _)| *orig_idx >= start_idx)
721 .nth(limit)
722 .map(|(idx, _)| *idx);
723 let bwd = if start_idx > 0 { Some(start_idx) } else { None };
724 (page, fwd, bwd)
725 } else {
726 let end_idx = cursor.map_or(filtered.len(), |(_, idx)| {
727 filtered
728 .iter()
729 .position(|(orig, _)| *orig >= idx)
730 .unwrap_or(filtered.len())
731 });
732 let start_pos = end_idx.saturating_sub(limit);
733 let page: Vec<OutputLogEvent> = filtered[start_pos..end_idx]
734 .iter()
735 .map(|(_, e)| OutputLogEvent {
736 timestamp: Some(e.timestamp),
737 message: Some(e.message.clone()),
738 ingestion_time: Some(e.ingestion_time),
739 })
740 .collect();
741 let fwd = filtered.get(end_idx).map(|(idx, _)| *idx);
742 let bwd = if start_pos > 0 {
743 Some(filtered[start_pos].0)
744 } else {
745 None
746 };
747 (page, fwd, bwd)
748 };
749
750 let forward_token = forward_idx.map(|idx| format!("f/{idx}"));
751 let backward_token = backward_idx.map(|idx| format!("b/{idx}"));
752
753 Ok(GetLogEventsResponse {
754 events,
755 next_forward_token: forward_token,
756 next_backward_token: backward_token,
757 })
758 }
759
760 pub fn handle_filter_log_events(
761 &self,
762 input: &FilterLogEventsInput,
763 ) -> Result<FilterLogEventsResponse, LogsError> {
764 let log_group_name = input
765 .log_group_name
766 .as_deref()
767 .or(input.log_group_identifier.as_deref())
768 .ok_or_else(|| {
769 LogsError::with_message(
770 LogsErrorCode::InvalidParameterException,
771 "Either logGroupName or logGroupIdentifier must be specified",
772 )
773 })?;
774
775 let group = self.groups.get(log_group_name).ok_or_else(|| {
776 LogsError::with_message(
777 LogsErrorCode::ResourceNotFoundException,
778 format!("The specified log group does not exist: {log_group_name}"),
779 )
780 })?;
781
782 let start_time = input.start_time.unwrap_or(0);
783 let end_time = input.end_time.unwrap_or(i64::MAX);
784 let limit = resolve_page_size(input.limit);
785 let filter_pattern = input.filter_pattern.as_deref().unwrap_or("");
786
787 let mut events: Vec<FilteredLogEvent> = Vec::new();
788 let mut searched_streams: Vec<SearchedLogStream> = Vec::new();
789
790 for (stream_name, stream) in &group.streams {
792 if !input.log_stream_names.is_empty() && !input.log_stream_names.contains(stream_name) {
794 continue;
795 }
796 if let Some(ref prefix) = input.log_stream_name_prefix {
797 if !stream_name.starts_with(prefix.as_str()) {
798 continue;
799 }
800 }
801
802 for event in &stream.events {
803 if event.timestamp < start_time || event.timestamp >= end_time {
805 continue;
806 }
807 if !filter_pattern.is_empty() && !event.message.contains(filter_pattern) {
809 continue;
810 }
811
812 events.push(FilteredLogEvent {
813 log_stream_name: Some(stream_name.clone()),
814 timestamp: Some(event.timestamp),
815 message: Some(event.message.clone()),
816 ingestion_time: Some(event.ingestion_time),
817 event_id: Some(uuid::Uuid::new_v4().to_string()),
818 });
819 }
820
821 searched_streams.push(SearchedLogStream {
822 log_stream_name: Some(stream_name.clone()),
823 searched_completely: Some(true),
824 });
825 }
826
827 events.sort_by_key(|e| e.timestamp);
829 let has_more = events.len() > limit;
830 events.truncate(limit);
831
832 if has_more {
834 for ss in &mut searched_streams {
835 ss.searched_completely = Some(false);
836 }
837 }
838
839 Ok(FilterLogEventsResponse {
840 events,
841 searched_log_streams: searched_streams,
842 next_token: None,
843 })
844 }
845
846 pub fn handle_put_retention_policy(
851 &self,
852 input: &PutRetentionPolicyInput,
853 ) -> Result<serde_json::Value, LogsError> {
854 let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
855 LogsError::with_message(
856 LogsErrorCode::ResourceNotFoundException,
857 format!(
858 "The specified log group does not exist: {}",
859 input.log_group_name
860 ),
861 )
862 })?;
863 group.retention_in_days = Some(input.retention_in_days);
864 Ok(serde_json::json!({}))
865 }
866
867 pub fn handle_delete_retention_policy(
868 &self,
869 input: &DeleteRetentionPolicyInput,
870 ) -> Result<serde_json::Value, LogsError> {
871 let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
872 LogsError::with_message(
873 LogsErrorCode::ResourceNotFoundException,
874 format!(
875 "The specified log group does not exist: {}",
876 input.log_group_name
877 ),
878 )
879 })?;
880 group.retention_in_days = None;
881 Ok(serde_json::json!({}))
882 }
883
884 pub fn handle_tag_log_group(
889 &self,
890 input: &TagLogGroupInput,
891 ) -> Result<serde_json::Value, LogsError> {
892 let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
893 LogsError::with_message(
894 LogsErrorCode::ResourceNotFoundException,
895 format!(
896 "The specified log group does not exist: {}",
897 input.log_group_name
898 ),
899 )
900 })?;
901 group.tags.extend(input.tags.clone());
902 Ok(serde_json::json!({}))
903 }
904
905 pub fn handle_untag_log_group(
906 &self,
907 input: &UntagLogGroupInput,
908 ) -> Result<serde_json::Value, LogsError> {
909 let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
910 LogsError::with_message(
911 LogsErrorCode::ResourceNotFoundException,
912 format!(
913 "The specified log group does not exist: {}",
914 input.log_group_name
915 ),
916 )
917 })?;
918 for key in &input.tags {
919 group.tags.remove(key);
920 }
921 Ok(serde_json::json!({}))
922 }
923
924 pub fn handle_list_tags_log_group(
925 &self,
926 input: &ListTagsLogGroupInput,
927 ) -> Result<ListTagsLogGroupResponse, LogsError> {
928 let group = self.groups.get(&input.log_group_name).ok_or_else(|| {
929 LogsError::with_message(
930 LogsErrorCode::ResourceNotFoundException,
931 format!(
932 "The specified log group does not exist: {}",
933 input.log_group_name
934 ),
935 )
936 })?;
937 Ok(ListTagsLogGroupResponse {
938 tags: group.tags.clone(),
939 })
940 }
941
942 pub fn handle_tag_resource(
947 &self,
948 input: &TagResourceInput,
949 ) -> Result<serde_json::Value, LogsError> {
950 let group_name = Self::resolve_log_group_name_from_arn(&input.resource_arn)?;
951 let mut group = self.groups.get_mut(&group_name).ok_or_else(|| {
952 LogsError::with_message(
953 LogsErrorCode::ResourceNotFoundException,
954 format!(
955 "The specified resource does not exist: {}",
956 input.resource_arn
957 ),
958 )
959 })?;
960 group.tags.extend(input.tags.clone());
961 Ok(serde_json::json!({}))
962 }
963
964 pub fn handle_untag_resource(
965 &self,
966 input: &UntagResourceInput,
967 ) -> Result<serde_json::Value, LogsError> {
968 let group_name = Self::resolve_log_group_name_from_arn(&input.resource_arn)?;
969 let mut group = self.groups.get_mut(&group_name).ok_or_else(|| {
970 LogsError::with_message(
971 LogsErrorCode::ResourceNotFoundException,
972 format!(
973 "The specified resource does not exist: {}",
974 input.resource_arn
975 ),
976 )
977 })?;
978 for key in &input.tag_keys {
979 group.tags.remove(key);
980 }
981 Ok(serde_json::json!({}))
982 }
983
984 pub fn handle_list_tags_for_resource(
985 &self,
986 input: &ListTagsForResourceInput,
987 ) -> Result<ListTagsForResourceResponse, LogsError> {
988 let group_name = Self::resolve_log_group_name_from_arn(&input.resource_arn)?;
989 let group = self.groups.get(&group_name).ok_or_else(|| {
990 LogsError::with_message(
991 LogsErrorCode::ResourceNotFoundException,
992 format!(
993 "The specified resource does not exist: {}",
994 input.resource_arn
995 ),
996 )
997 })?;
998 Ok(ListTagsForResourceResponse {
999 tags: group.tags.clone(),
1000 })
1001 }
1002
1003 fn resolve_log_group_name_from_arn(arn: &str) -> Result<String, LogsError> {
1004 let parts: Vec<&str> = arn.split(':').collect();
1007 if parts.len() < 7 || parts[2] != "logs" || parts[5] != "log-group" {
1008 return Err(LogsError::with_message(
1009 LogsErrorCode::InvalidParameterException,
1010 format!("Invalid ARN: {arn}"),
1011 ));
1012 }
1013 let name = parts[6];
1014 Ok(name.to_owned())
1016 }
1017
1018 pub fn handle_put_resource_policy(
1023 &self,
1024 input: &PutResourcePolicyInput,
1025 ) -> Result<PutResourcePolicyResponse, LogsError> {
1026 let policy_name = input.policy_name.clone().unwrap_or_default();
1027 let policy_document = input.policy_document.clone().unwrap_or_default();
1028 let now = Self::now_millis();
1029
1030 self.resource_policies.insert(
1031 policy_name.clone(),
1032 ResourcePolicyRecord {
1033 name: policy_name.clone(),
1034 policy_document: policy_document.clone(),
1035 last_updated_time: now,
1036 },
1037 );
1038
1039 Ok(PutResourcePolicyResponse {
1040 resource_policy: Some(ResourcePolicy {
1041 policy_name: Some(policy_name),
1042 policy_document: Some(policy_document),
1043 last_updated_time: Some(now),
1044 ..ResourcePolicy::default()
1045 }),
1046 revision_id: None,
1047 })
1048 }
1049
1050 pub fn handle_delete_resource_policy(
1051 &self,
1052 input: &DeleteResourcePolicyInput,
1053 ) -> Result<serde_json::Value, LogsError> {
1054 let policy_name = input.policy_name.as_deref().unwrap_or("");
1055 self.resource_policies.remove(policy_name).ok_or_else(|| {
1056 LogsError::with_message(
1057 LogsErrorCode::ResourceNotFoundException,
1058 format!("The specified resource policy does not exist: {policy_name}"),
1059 )
1060 })?;
1061 Ok(serde_json::json!({}))
1062 }
1063
1064 pub fn handle_describe_resource_policies(
1065 &self,
1066 _input: &DescribeResourcePoliciesInput,
1067 ) -> Result<DescribeResourcePoliciesResponse, LogsError> {
1068 let policies: Vec<ResourcePolicy> = self
1069 .resource_policies
1070 .iter()
1071 .map(|entry| {
1072 let rp = entry.value();
1073 ResourcePolicy {
1074 policy_name: Some(rp.name.clone()),
1075 policy_document: Some(rp.policy_document.clone()),
1076 last_updated_time: Some(rp.last_updated_time),
1077 ..ResourcePolicy::default()
1078 }
1079 })
1080 .collect();
1081
1082 Ok(DescribeResourcePoliciesResponse {
1083 resource_policies: policies,
1084 next_token: None,
1085 })
1086 }
1087
1088 pub fn handle_put_metric_filter(
1093 &self,
1094 input: &PutMetricFilterInput,
1095 ) -> Result<serde_json::Value, LogsError> {
1096 let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
1097 LogsError::with_message(
1098 LogsErrorCode::ResourceNotFoundException,
1099 format!(
1100 "The specified log group does not exist: {}",
1101 input.log_group_name
1102 ),
1103 )
1104 })?;
1105
1106 let transformations = serde_json::to_value(&input.metric_transformations)
1107 .unwrap_or_else(|_| serde_json::json!([]));
1108
1109 group.metric_filters.insert(
1110 input.filter_name.clone(),
1111 MetricFilterRecord {
1112 name: input.filter_name.clone(),
1113 filter_pattern: input.filter_pattern.clone(),
1114 log_group_name: input.log_group_name.clone(),
1115 metric_transformations: transformations,
1116 creation_time: Self::now_millis(),
1117 },
1118 );
1119
1120 Ok(serde_json::json!({}))
1121 }
1122
1123 pub fn handle_delete_metric_filter(
1124 &self,
1125 input: &DeleteMetricFilterInput,
1126 ) -> Result<serde_json::Value, LogsError> {
1127 let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
1128 LogsError::with_message(
1129 LogsErrorCode::ResourceNotFoundException,
1130 format!(
1131 "The specified log group does not exist: {}",
1132 input.log_group_name
1133 ),
1134 )
1135 })?;
1136
1137 group
1138 .metric_filters
1139 .remove(&input.filter_name)
1140 .ok_or_else(|| {
1141 LogsError::with_message(
1142 LogsErrorCode::ResourceNotFoundException,
1143 format!(
1144 "The specified metric filter does not exist: {}",
1145 input.filter_name
1146 ),
1147 )
1148 })?;
1149
1150 Ok(serde_json::json!({}))
1151 }
1152
1153 pub fn handle_describe_metric_filters(
1154 &self,
1155 input: &DescribeMetricFiltersInput,
1156 ) -> Result<DescribeMetricFiltersResponse, LogsError> {
1157 let mut filters: Vec<MetricFilter> = Vec::new();
1158
1159 if let Some(ref group_name) = input.log_group_name {
1160 let group = self.groups.get(group_name).ok_or_else(|| {
1161 LogsError::with_message(
1162 LogsErrorCode::ResourceNotFoundException,
1163 format!("The specified log group does not exist: {group_name}"),
1164 )
1165 })?;
1166
1167 for mf in group.metric_filters.values() {
1168 if let Some(ref prefix) = input.filter_name_prefix {
1169 if !mf.name.starts_with(prefix.as_str()) {
1170 continue;
1171 }
1172 }
1173 let transformations: Vec<rustack_logs_model::types::MetricTransformation> =
1174 serde_json::from_value(mf.metric_transformations.clone()).unwrap_or_default();
1175 filters.push(MetricFilter {
1176 filter_name: Some(mf.name.clone()),
1177 filter_pattern: Some(mf.filter_pattern.clone()),
1178 log_group_name: Some(mf.log_group_name.clone()),
1179 metric_transformations: transformations,
1180 creation_time: Some(mf.creation_time),
1181 ..MetricFilter::default()
1182 });
1183 }
1184 }
1185
1186 Ok(DescribeMetricFiltersResponse {
1187 metric_filters: filters,
1188 next_token: None,
1189 })
1190 }
1191
1192 pub fn handle_test_metric_filter(
1193 &self,
1194 input: &TestMetricFilterInput,
1195 ) -> Result<TestMetricFilterResponse, LogsError> {
1196 let matches: Vec<MetricFilterMatchRecord> = input
1198 .log_event_messages
1199 .iter()
1200 .enumerate()
1201 .filter(|(_, msg)| {
1202 input.filter_pattern.is_empty() || msg.contains(&input.filter_pattern)
1203 })
1204 .map(|(i, msg)| MetricFilterMatchRecord {
1205 event_number: Some(i64::try_from(i + 1).unwrap_or(0)),
1206 event_message: Some(msg.clone()),
1207 extracted_values: HashMap::new(),
1208 })
1209 .collect();
1210
1211 Ok(TestMetricFilterResponse { matches })
1212 }
1213
1214 pub fn handle_put_subscription_filter(
1219 &self,
1220 input: &PutSubscriptionFilterInput,
1221 ) -> Result<serde_json::Value, LogsError> {
1222 let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
1223 LogsError::with_message(
1224 LogsErrorCode::ResourceNotFoundException,
1225 format!(
1226 "The specified log group does not exist: {}",
1227 input.log_group_name
1228 ),
1229 )
1230 })?;
1231
1232 group.subscription_filters.insert(
1233 input.filter_name.clone(),
1234 SubscriptionFilterRecord {
1235 name: input.filter_name.clone(),
1236 filter_pattern: input.filter_pattern.clone(),
1237 log_group_name: input.log_group_name.clone(),
1238 destination_arn: input.destination_arn.clone(),
1239 role_arn: input.role_arn.clone(),
1240 distribution: input.distribution.as_ref().map(|d| d.as_str().to_owned()),
1241 creation_time: Self::now_millis(),
1242 },
1243 );
1244
1245 Ok(serde_json::json!({}))
1246 }
1247
1248 pub fn handle_delete_subscription_filter(
1249 &self,
1250 input: &DeleteSubscriptionFilterInput,
1251 ) -> Result<serde_json::Value, LogsError> {
1252 let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
1253 LogsError::with_message(
1254 LogsErrorCode::ResourceNotFoundException,
1255 format!(
1256 "The specified log group does not exist: {}",
1257 input.log_group_name
1258 ),
1259 )
1260 })?;
1261
1262 group
1263 .subscription_filters
1264 .remove(&input.filter_name)
1265 .ok_or_else(|| {
1266 LogsError::with_message(
1267 LogsErrorCode::ResourceNotFoundException,
1268 format!(
1269 "The specified subscription filter does not exist: {}",
1270 input.filter_name,
1271 ),
1272 )
1273 })?;
1274
1275 Ok(serde_json::json!({}))
1276 }
1277
1278 pub fn handle_describe_subscription_filters(
1279 &self,
1280 input: &DescribeSubscriptionFiltersInput,
1281 ) -> Result<DescribeSubscriptionFiltersResponse, LogsError> {
1282 let group = self.groups.get(&input.log_group_name).ok_or_else(|| {
1283 LogsError::with_message(
1284 LogsErrorCode::ResourceNotFoundException,
1285 format!(
1286 "The specified log group does not exist: {}",
1287 input.log_group_name,
1288 ),
1289 )
1290 })?;
1291
1292 let filters: Vec<SubscriptionFilter> = group
1293 .subscription_filters
1294 .values()
1295 .filter(|sf| {
1296 if let Some(ref prefix) = input.filter_name_prefix {
1297 sf.name.starts_with(prefix.as_str())
1298 } else {
1299 true
1300 }
1301 })
1302 .map(|sf| SubscriptionFilter {
1303 filter_name: Some(sf.name.clone()),
1304 filter_pattern: Some(sf.filter_pattern.clone()),
1305 log_group_name: Some(sf.log_group_name.clone()),
1306 destination_arn: Some(sf.destination_arn.clone()),
1307 role_arn: sf.role_arn.clone(),
1308 distribution: sf
1309 .distribution
1310 .as_ref()
1311 .map(|d| rustack_logs_model::types::Distribution::from(d.as_str())),
1312 creation_time: Some(sf.creation_time),
1313 ..SubscriptionFilter::default()
1314 })
1315 .collect();
1316
1317 Ok(DescribeSubscriptionFiltersResponse {
1318 subscription_filters: filters,
1319 next_token: None,
1320 })
1321 }
1322
1323 pub fn handle_put_destination(
1328 &self,
1329 input: &PutDestinationInput,
1330 ) -> Result<PutDestinationResponse, LogsError> {
1331 let arn = self.destination_arn(&input.destination_name);
1332 let now = Self::now_millis();
1333
1334 self.destinations.insert(
1335 input.destination_name.clone(),
1336 DestinationRecord {
1337 name: input.destination_name.clone(),
1338 target_arn: input.target_arn.clone(),
1339 role_arn: input.role_arn.clone(),
1340 access_policy: None,
1341 arn: arn.clone(),
1342 creation_time: now,
1343 },
1344 );
1345
1346 Ok(PutDestinationResponse {
1347 destination: Some(Destination {
1348 destination_name: Some(input.destination_name.clone()),
1349 target_arn: Some(input.target_arn.clone()),
1350 role_arn: Some(input.role_arn.clone()),
1351 arn: Some(arn),
1352 creation_time: Some(now),
1353 access_policy: None,
1354 }),
1355 })
1356 }
1357
1358 pub fn handle_put_destination_policy(
1359 &self,
1360 input: &PutDestinationPolicyInput,
1361 ) -> Result<serde_json::Value, LogsError> {
1362 let mut dest = self
1363 .destinations
1364 .get_mut(&input.destination_name)
1365 .ok_or_else(|| {
1366 LogsError::with_message(
1367 LogsErrorCode::ResourceNotFoundException,
1368 format!(
1369 "The specified destination does not exist: {}",
1370 input.destination_name,
1371 ),
1372 )
1373 })?;
1374
1375 dest.access_policy = Some(input.access_policy.clone());
1376 Ok(serde_json::json!({}))
1377 }
1378
1379 pub fn handle_delete_destination(
1380 &self,
1381 input: &DeleteDestinationInput,
1382 ) -> Result<serde_json::Value, LogsError> {
1383 self.destinations
1384 .remove(&input.destination_name)
1385 .ok_or_else(|| {
1386 LogsError::with_message(
1387 LogsErrorCode::ResourceNotFoundException,
1388 format!(
1389 "The specified destination does not exist: {}",
1390 input.destination_name,
1391 ),
1392 )
1393 })?;
1394 Ok(serde_json::json!({}))
1395 }
1396
1397 pub fn handle_describe_destinations(
1398 &self,
1399 input: &DescribeDestinationsInput,
1400 ) -> Result<DescribeDestinationsResponse, LogsError> {
1401 let destinations: Vec<Destination> = self
1402 .destinations
1403 .iter()
1404 .filter(|entry| {
1405 if let Some(ref prefix) = input.destination_name_prefix {
1406 entry.value().name.starts_with(prefix.as_str())
1407 } else {
1408 true
1409 }
1410 })
1411 .map(|entry| {
1412 let d = entry.value();
1413 Destination {
1414 destination_name: Some(d.name.clone()),
1415 target_arn: Some(d.target_arn.clone()),
1416 role_arn: Some(d.role_arn.clone()),
1417 access_policy: d.access_policy.clone(),
1418 arn: Some(d.arn.clone()),
1419 creation_time: Some(d.creation_time),
1420 }
1421 })
1422 .collect();
1423
1424 Ok(DescribeDestinationsResponse {
1425 destinations,
1426 next_token: None,
1427 })
1428 }
1429
1430 pub fn handle_start_query(
1435 &self,
1436 _input: &StartQueryInput,
1437 ) -> Result<StartQueryResponse, LogsError> {
1438 let query_id = uuid::Uuid::new_v4().to_string();
1439 Ok(StartQueryResponse {
1440 query_id: Some(query_id),
1441 })
1442 }
1443
1444 pub fn handle_stop_query(
1445 &self,
1446 _input: &StopQueryInput,
1447 ) -> Result<StopQueryResponse, LogsError> {
1448 Ok(StopQueryResponse {
1449 success: Some(true),
1450 })
1451 }
1452
1453 pub fn handle_get_query_results(
1454 &self,
1455 _input: &GetQueryResultsInput,
1456 ) -> Result<GetQueryResultsResponse, LogsError> {
1457 Ok(GetQueryResultsResponse {
1458 results: Vec::new(),
1459 statistics: Some(QueryStatistics {
1460 records_matched: Some(0.0),
1461 records_scanned: Some(0.0),
1462 bytes_scanned: Some(0.0),
1463 ..QueryStatistics::default()
1464 }),
1465 status: Some(QueryStatus::Complete),
1466 ..GetQueryResultsResponse::default()
1467 })
1468 }
1469
1470 pub fn handle_describe_queries(
1471 &self,
1472 _input: &DescribeQueriesInput,
1473 ) -> Result<DescribeQueriesResponse, LogsError> {
1474 Ok(DescribeQueriesResponse {
1475 queries: Vec::new(),
1476 next_token: None,
1477 })
1478 }
1479
1480 pub fn handle_put_query_definition(
1485 &self,
1486 input: &PutQueryDefinitionInput,
1487 ) -> Result<PutQueryDefinitionResponse, LogsError> {
1488 let id = input
1489 .query_definition_id
1490 .clone()
1491 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1492
1493 self.query_definitions.insert(
1494 id.clone(),
1495 QueryDefinitionRecord {
1496 query_definition_id: id.clone(),
1497 name: input.name.clone(),
1498 query_string: input.query_string.clone(),
1499 log_group_names: input.log_group_names.clone(),
1500 last_modified: Self::now_millis(),
1501 },
1502 );
1503
1504 Ok(PutQueryDefinitionResponse {
1505 query_definition_id: Some(id),
1506 })
1507 }
1508
1509 pub fn handle_delete_query_definition(
1510 &self,
1511 input: &DeleteQueryDefinitionInput,
1512 ) -> Result<rustack_logs_model::output::DeleteQueryDefinitionResponse, LogsError> {
1513 let removed = self
1514 .query_definitions
1515 .remove(&input.query_definition_id)
1516 .is_some();
1517 Ok(rustack_logs_model::output::DeleteQueryDefinitionResponse {
1518 success: Some(removed),
1519 })
1520 }
1521
1522 pub fn handle_describe_query_definitions(
1523 &self,
1524 input: &DescribeQueryDefinitionsInput,
1525 ) -> Result<DescribeQueryDefinitionsResponse, LogsError> {
1526 let defs: Vec<QueryDefinition> = self
1527 .query_definitions
1528 .iter()
1529 .filter(|entry| {
1530 if let Some(ref prefix) = input.query_definition_name_prefix {
1531 entry.value().name.starts_with(prefix.as_str())
1532 } else {
1533 true
1534 }
1535 })
1536 .map(|entry| {
1537 let qd = entry.value();
1538 QueryDefinition {
1539 query_definition_id: Some(qd.query_definition_id.clone()),
1540 name: Some(qd.name.clone()),
1541 query_string: Some(qd.query_string.clone()),
1542 log_group_names: qd.log_group_names.clone(),
1543 last_modified: Some(qd.last_modified),
1544 ..QueryDefinition::default()
1545 }
1546 })
1547 .collect();
1548
1549 Ok(DescribeQueryDefinitionsResponse {
1550 query_definitions: defs,
1551 next_token: None,
1552 })
1553 }
1554
1555 pub fn handle_associate_kms_key(
1560 &self,
1561 input: &AssociateKmsKeyInput,
1562 ) -> Result<serde_json::Value, LogsError> {
1563 if let Some(ref group_name) = input.log_group_name {
1564 let mut group = self.groups.get_mut(group_name).ok_or_else(|| {
1565 LogsError::with_message(
1566 LogsErrorCode::ResourceNotFoundException,
1567 format!("The specified log group does not exist: {group_name}"),
1568 )
1569 })?;
1570 group.kms_key_id = Some(input.kms_key_id.clone());
1571 }
1572 Ok(serde_json::json!({}))
1573 }
1574
1575 pub fn handle_disassociate_kms_key(
1576 &self,
1577 input: &DisassociateKmsKeyInput,
1578 ) -> Result<serde_json::Value, LogsError> {
1579 if let Some(ref group_name) = input.log_group_name {
1580 let mut group = self.groups.get_mut(group_name).ok_or_else(|| {
1581 LogsError::with_message(
1582 LogsErrorCode::ResourceNotFoundException,
1583 format!("The specified log group does not exist: {group_name}"),
1584 )
1585 })?;
1586 group.kms_key_id = None;
1587 }
1588 Ok(serde_json::json!({}))
1589 }
1590}