Skip to main content

rustack_logs_core/
provider.rs

1//! CloudWatch Logs provider implementing all Phase 0 through Phase 3 operations.
2//!
3//! The provider uses `DashMap` for concurrent access to log groups, keeping
4//! the design simple without an actor model.
5
6use std::collections::HashMap;
7
8use dashmap::{DashMap, mapref::entry::Entry};
9use rustack_logs_model::{
10    error::{LogsError, LogsErrorCode},
11    input::{
12        AssociateKmsKeyInput, CreateLogGroupInput, CreateLogStreamInput, DeleteDestinationInput,
13        DeleteLogGroupInput, DeleteLogStreamInput, DeleteMetricFilterInput,
14        DeleteQueryDefinitionInput, DeleteResourcePolicyInput, DeleteRetentionPolicyInput,
15        DeleteSubscriptionFilterInput, DescribeDestinationsInput, DescribeLogGroupsInput,
16        DescribeLogStreamsInput, DescribeMetricFiltersInput, DescribeQueriesInput,
17        DescribeQueryDefinitionsInput, DescribeResourcePoliciesInput,
18        DescribeSubscriptionFiltersInput, DisassociateKmsKeyInput, FilterLogEventsInput,
19        GetLogEventsInput, GetQueryResultsInput, ListTagsForResourceInput, ListTagsLogGroupInput,
20        PutDestinationInput, PutDestinationPolicyInput, PutLogEventsInput, PutMetricFilterInput,
21        PutQueryDefinitionInput, PutResourcePolicyInput, PutRetentionPolicyInput,
22        PutSubscriptionFilterInput, StartQueryInput, StopQueryInput, TagLogGroupInput,
23        TagResourceInput, TestMetricFilterInput, UntagLogGroupInput, UntagResourceInput,
24    },
25    output::{
26        DescribeDestinationsResponse, DescribeLogGroupsResponse, DescribeLogStreamsResponse,
27        DescribeMetricFiltersResponse, DescribeQueriesResponse, DescribeQueryDefinitionsResponse,
28        DescribeResourcePoliciesResponse, DescribeSubscriptionFiltersResponse,
29        FilterLogEventsResponse, GetLogEventsResponse, GetQueryResultsResponse,
30        ListTagsForResourceResponse, ListTagsLogGroupResponse, PutDestinationResponse,
31        PutLogEventsResponse, PutQueryDefinitionResponse, PutResourcePolicyResponse,
32        StartQueryResponse, StopQueryResponse, TestMetricFilterResponse,
33    },
34    types::{
35        Destination, FilteredLogEvent, LogGroup, LogStream, MetricFilter, MetricFilterMatchRecord,
36        OutputLogEvent, QueryDefinition, QueryStatistics, QueryStatus, ResourcePolicy,
37        SearchedLogStream, SubscriptionFilter,
38    },
39};
40
41use crate::config::LogsConfig;
42
43/// Maximum number of events per `PutLogEvents` call.
44const MAX_PUT_LOG_EVENTS: usize = 10_000;
45
46/// Maximum batch size in bytes for `PutLogEvents`.
47const MAX_BATCH_SIZE_BYTES: usize = 1_048_576;
48
49/// Maximum age for log events (14 days in milliseconds).
50const MAX_EVENT_AGE_MS: i64 = 14 * 24 * 60 * 60 * 1000;
51
52/// Maximum future tolerance for log events (2 hours in milliseconds).
53const MAX_FUTURE_MS: i64 = 2 * 60 * 60 * 1000;
54
55/// Default page size for list/describe operations.
56const DEFAULT_PAGE_SIZE: usize = 50;
57
58/// Resolve the page size from an optional limit, clamping to `DEFAULT_PAGE_SIZE`.
59fn resolve_page_size(limit: Option<i32>) -> usize {
60    limit.map_or(DEFAULT_PAGE_SIZE, |l| {
61        usize::try_from(l.max(0))
62            .unwrap_or(DEFAULT_PAGE_SIZE)
63            .min(DEFAULT_PAGE_SIZE)
64    })
65}
66
67// ---------------------------------------------------------------------------
68// Internal state types
69// ---------------------------------------------------------------------------
70
71#[derive(Debug)]
72struct LogGroupRecord {
73    name: String,
74    arn: String,
75    creation_time: i64,
76    retention_in_days: Option<i32>,
77    kms_key_id: Option<String>,
78    tags: HashMap<String, String>,
79    streams: HashMap<String, LogStreamRecord>,
80    metric_filters: HashMap<String, MetricFilterRecord>,
81    subscription_filters: HashMap<String, SubscriptionFilterRecord>,
82    stored_bytes: i64,
83}
84
85#[derive(Debug)]
86struct LogStreamRecord {
87    name: String,
88    arn: String,
89    creation_time: i64,
90    first_event_timestamp: Option<i64>,
91    last_event_timestamp: Option<i64>,
92    last_ingestion_time: Option<i64>,
93    upload_sequence_token: String,
94    events: Vec<StoredLogEvent>,
95}
96
97#[derive(Debug)]
98struct StoredLogEvent {
99    timestamp: i64,
100    message: String,
101    ingestion_time: i64,
102}
103
104#[derive(Debug)]
105struct MetricFilterRecord {
106    name: String,
107    filter_pattern: String,
108    log_group_name: String,
109    metric_transformations: serde_json::Value,
110    creation_time: i64,
111}
112
113#[derive(Debug)]
114struct SubscriptionFilterRecord {
115    name: String,
116    filter_pattern: String,
117    log_group_name: String,
118    destination_arn: String,
119    role_arn: Option<String>,
120    distribution: Option<String>,
121    creation_time: i64,
122}
123
124#[derive(Debug)]
125struct ResourcePolicyRecord {
126    name: String,
127    policy_document: String,
128    last_updated_time: i64,
129}
130
131#[derive(Debug)]
132struct DestinationRecord {
133    name: String,
134    target_arn: String,
135    role_arn: String,
136    access_policy: Option<String>,
137    arn: String,
138    creation_time: i64,
139}
140
141#[derive(Debug)]
142struct QueryDefinitionRecord {
143    query_definition_id: String,
144    name: String,
145    query_string: String,
146    log_group_names: Vec<String>,
147    last_modified: i64,
148}
149
150// ---------------------------------------------------------------------------
151// Provider
152// ---------------------------------------------------------------------------
153
154/// CloudWatch Logs provider with `DashMap`-based in-memory storage.
155#[derive(Debug)]
156pub struct RustackLogs {
157    config: LogsConfig,
158    groups: DashMap<String, LogGroupRecord>,
159    resource_policies: DashMap<String, ResourcePolicyRecord>,
160    destinations: DashMap<String, DestinationRecord>,
161    query_definitions: DashMap<String, QueryDefinitionRecord>,
162}
163
164impl RustackLogs {
165    /// Create a new provider with the given configuration.
166    #[must_use]
167    pub fn new(config: LogsConfig) -> Self {
168        Self {
169            config,
170            groups: DashMap::new(),
171            resource_policies: DashMap::new(),
172            destinations: DashMap::new(),
173            query_definitions: DashMap::new(),
174        }
175    }
176
177    fn log_group_arn(&self, name: &str) -> String {
178        format!(
179            "arn:aws:logs:{}:{}:log-group:{}",
180            self.config.default_region, self.config.account_id, name,
181        )
182    }
183
184    fn log_stream_arn(&self, group_name: &str, stream_name: &str) -> String {
185        format!(
186            "arn:aws:logs:{}:{}:log-group:{}:log-stream:{}",
187            self.config.default_region, self.config.account_id, group_name, stream_name,
188        )
189    }
190
191    fn destination_arn(&self, name: &str) -> String {
192        format!(
193            "arn:aws:logs:{}:{}:destination:{}",
194            self.config.default_region, self.config.account_id, name,
195        )
196    }
197
198    fn now_millis() -> i64 {
199        chrono::Utc::now().timestamp_millis()
200    }
201
202    fn validate_log_group_name(name: &str) -> Result<(), LogsError> {
203        if name.is_empty() || name.len() > 512 {
204            return Err(LogsError::with_message(
205                LogsErrorCode::InvalidParameterException,
206                format!(
207                    "Log group name must be between 1 and 512 characters, got {}",
208                    name.len()
209                ),
210            ));
211        }
212        // Pattern: [\.\-_/#A-Za-z0-9]+
213        if !name
214            .chars()
215            .all(|c| c.is_ascii_alphanumeric() || "._-/#".contains(c))
216        {
217            return Err(LogsError::with_message(
218                LogsErrorCode::InvalidParameterException,
219                format!("Log group name does not match pattern [.\\-_/#A-Za-z0-9]+: {name}"),
220            ));
221        }
222        Ok(())
223    }
224
225    fn validate_log_stream_name(name: &str) -> Result<(), LogsError> {
226        if name.is_empty() || name.len() > 512 {
227            return Err(LogsError::with_message(
228                LogsErrorCode::InvalidParameterException,
229                format!(
230                    "Log stream name must be between 1 and 512 characters, got {}",
231                    name.len()
232                ),
233            ));
234        }
235        if name.contains(':') || name.contains('*') {
236            return Err(LogsError::with_message(
237                LogsErrorCode::InvalidParameterException,
238                format!("Log stream name must not contain ':' or '*': {name}"),
239            ));
240        }
241        Ok(())
242    }
243
244    // -----------------------------------------------------------------------
245    // Phase 0: Log Group Management
246    // -----------------------------------------------------------------------
247
248    pub fn handle_create_log_group(
249        &self,
250        input: &CreateLogGroupInput,
251    ) -> Result<serde_json::Value, LogsError> {
252        Self::validate_log_group_name(&input.log_group_name)?;
253
254        match self.groups.entry(input.log_group_name.clone()) {
255            Entry::Occupied(_) => Err(LogsError::with_message(
256                LogsErrorCode::ResourceAlreadyExistsException,
257                format!(
258                    "The specified log group already exists: {}",
259                    input.log_group_name
260                ),
261            )),
262            Entry::Vacant(entry) => {
263                let arn = self.log_group_arn(&input.log_group_name);
264                entry.insert(LogGroupRecord {
265                    name: input.log_group_name.clone(),
266                    arn,
267                    creation_time: Self::now_millis(),
268                    retention_in_days: None,
269                    kms_key_id: input.kms_key_id.clone(),
270                    tags: input.tags.clone(),
271                    streams: HashMap::new(),
272                    metric_filters: HashMap::new(),
273                    subscription_filters: HashMap::new(),
274                    stored_bytes: 0,
275                });
276                Ok(serde_json::json!({}))
277            }
278        }
279    }
280
281    pub fn handle_delete_log_group(
282        &self,
283        input: &DeleteLogGroupInput,
284    ) -> Result<serde_json::Value, LogsError> {
285        self.groups.remove(&input.log_group_name).ok_or_else(|| {
286            LogsError::with_message(
287                LogsErrorCode::ResourceNotFoundException,
288                format!(
289                    "The specified log group does not exist: {}",
290                    input.log_group_name
291                ),
292            )
293        })?;
294        Ok(serde_json::json!({}))
295    }
296
297    pub fn handle_describe_log_groups(
298        &self,
299        input: &DescribeLogGroupsInput,
300    ) -> Result<DescribeLogGroupsResponse, LogsError> {
301        let page_size = resolve_page_size(input.limit);
302        let mut groups: Vec<LogGroup> = Vec::new();
303
304        for entry in &self.groups {
305            let g = entry.value();
306
307            // Filter by prefix
308            if let Some(ref prefix) = input.log_group_name_prefix {
309                if !g.name.starts_with(prefix.as_str()) {
310                    continue;
311                }
312            }
313
314            // Filter by pattern (simple substring match)
315            if let Some(ref pattern) = input.log_group_name_pattern {
316                if !g.name.contains(pattern.as_str()) {
317                    continue;
318                }
319            }
320
321            groups.push(LogGroup {
322                log_group_name: Some(g.name.clone()),
323                log_group_arn: Some(format!("{}:*", g.arn)),
324                arn: Some(format!("{}:*", g.arn)),
325                creation_time: Some(g.creation_time),
326                retention_in_days: g.retention_in_days,
327                kms_key_id: g.kms_key_id.clone(),
328                metric_filter_count: Some(
329                    i32::try_from(g.metric_filters.len()).unwrap_or(i32::MAX),
330                ),
331                stored_bytes: Some(g.stored_bytes),
332                ..LogGroup::default()
333            });
334        }
335
336        groups.sort_by(|a, b| a.log_group_name.cmp(&b.log_group_name));
337
338        // Name-based cursor pagination (stable across concurrent modifications).
339        if let Some(ref cursor) = input.next_token {
340            groups.retain(|g| {
341                g.log_group_name
342                    .as_ref()
343                    .is_some_and(|n| n.as_str() > cursor.as_str())
344            });
345        }
346
347        let has_more = groups.len() > page_size;
348        groups.truncate(page_size);
349        let next_token = if has_more {
350            groups.last().and_then(|g| g.log_group_name.clone())
351        } else {
352            None
353        };
354
355        Ok(DescribeLogGroupsResponse {
356            log_groups: groups,
357            next_token,
358        })
359    }
360
361    // -----------------------------------------------------------------------
362    // Phase 0: Log Stream Management
363    // -----------------------------------------------------------------------
364
365    pub fn handle_create_log_stream(
366        &self,
367        input: &CreateLogStreamInput,
368    ) -> Result<serde_json::Value, LogsError> {
369        Self::validate_log_stream_name(&input.log_stream_name)?;
370
371        let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
372            LogsError::with_message(
373                LogsErrorCode::ResourceNotFoundException,
374                format!(
375                    "The specified log group does not exist: {}",
376                    input.log_group_name
377                ),
378            )
379        })?;
380
381        if group.streams.contains_key(&input.log_stream_name) {
382            return Err(LogsError::with_message(
383                LogsErrorCode::ResourceAlreadyExistsException,
384                format!(
385                    "The specified log stream already exists: {}",
386                    input.log_stream_name,
387                ),
388            ));
389        }
390
391        let arn = self.log_stream_arn(&input.log_group_name, &input.log_stream_name);
392        group.streams.insert(
393            input.log_stream_name.clone(),
394            LogStreamRecord {
395                name: input.log_stream_name.clone(),
396                arn,
397                creation_time: Self::now_millis(),
398                first_event_timestamp: None,
399                last_event_timestamp: None,
400                last_ingestion_time: None,
401                upload_sequence_token: uuid::Uuid::new_v4().to_string(),
402                events: Vec::new(),
403            },
404        );
405
406        Ok(serde_json::json!({}))
407    }
408
409    pub fn handle_delete_log_stream(
410        &self,
411        input: &DeleteLogStreamInput,
412    ) -> Result<serde_json::Value, LogsError> {
413        let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
414            LogsError::with_message(
415                LogsErrorCode::ResourceNotFoundException,
416                format!(
417                    "The specified log group does not exist: {}",
418                    input.log_group_name
419                ),
420            )
421        })?;
422
423        group
424            .streams
425            .remove(&input.log_stream_name)
426            .ok_or_else(|| {
427                LogsError::with_message(
428                    LogsErrorCode::ResourceNotFoundException,
429                    format!(
430                        "The specified log stream does not exist: {}",
431                        input.log_stream_name,
432                    ),
433                )
434            })?;
435
436        Ok(serde_json::json!({}))
437    }
438
439    pub fn handle_describe_log_streams(
440        &self,
441        input: &DescribeLogStreamsInput,
442    ) -> Result<DescribeLogStreamsResponse, LogsError> {
443        let log_group_name = input
444            .log_group_name
445            .as_deref()
446            .or(input.log_group_identifier.as_deref())
447            .ok_or_else(|| {
448                LogsError::with_message(
449                    LogsErrorCode::InvalidParameterException,
450                    "Either logGroupName or logGroupIdentifier must be specified",
451                )
452            })?;
453
454        let group = self.groups.get(log_group_name).ok_or_else(|| {
455            LogsError::with_message(
456                LogsErrorCode::ResourceNotFoundException,
457                format!("The specified log group does not exist: {log_group_name}"),
458            )
459        })?;
460
461        let page_size = resolve_page_size(input.limit);
462        let mut streams: Vec<LogStream> = group
463            .streams
464            .values()
465            .filter(|s| {
466                if let Some(ref prefix) = input.log_stream_name_prefix {
467                    s.name.starts_with(prefix.as_str())
468                } else {
469                    true
470                }
471            })
472            .map(|s| LogStream {
473                log_stream_name: Some(s.name.clone()),
474                arn: Some(s.arn.clone()),
475                creation_time: Some(s.creation_time),
476                first_event_timestamp: s.first_event_timestamp,
477                last_event_timestamp: s.last_event_timestamp,
478                last_ingestion_time: s.last_ingestion_time,
479                upload_sequence_token: Some(s.upload_sequence_token.clone()),
480                stored_bytes: Some(
481                    i64::try_from(s.events.iter().map(|e| e.message.len()).sum::<usize>())
482                        .unwrap_or(0),
483                ),
484            })
485            .collect();
486
487        // Sort by order
488        let descending = input.descending.unwrap_or(false);
489        match input.order_by {
490            Some(rustack_logs_model::types::OrderBy::LastEventTime) => {
491                streams.sort_by(|a, b| {
492                    let ta = a.last_event_timestamp.unwrap_or(0);
493                    let tb = b.last_event_timestamp.unwrap_or(0);
494                    if descending { tb.cmp(&ta) } else { ta.cmp(&tb) }
495                });
496            }
497            _ => {
498                streams.sort_by(|a, b| {
499                    let cmp = a.log_stream_name.cmp(&b.log_stream_name);
500                    if descending { cmp.reverse() } else { cmp }
501                });
502            }
503        }
504
505        // Name-based cursor pagination.
506        if let Some(ref cursor) = input.next_token {
507            streams.retain(|s| {
508                s.log_stream_name
509                    .as_ref()
510                    .is_some_and(|n| n.as_str() > cursor.as_str())
511            });
512        }
513
514        let has_more = streams.len() > page_size;
515        streams.truncate(page_size);
516        let next_token = if has_more {
517            streams.last().and_then(|s| s.log_stream_name.clone())
518        } else {
519            None
520        };
521
522        Ok(DescribeLogStreamsResponse {
523            log_streams: streams,
524            next_token,
525        })
526    }
527
528    // -----------------------------------------------------------------------
529    // Phase 0: Log Events
530    // -----------------------------------------------------------------------
531
532    pub fn handle_put_log_events(
533        &self,
534        input: &PutLogEventsInput,
535    ) -> Result<PutLogEventsResponse, LogsError> {
536        if input.log_events.len() > MAX_PUT_LOG_EVENTS {
537            return Err(LogsError::with_message(
538                LogsErrorCode::InvalidParameterException,
539                format!(
540                    "Log events in a single PutLogEvents request cannot exceed \
541                     {MAX_PUT_LOG_EVENTS}"
542                ),
543            ));
544        }
545
546        // Check batch size
547        let total_size: usize = input
548            .log_events
549            .iter()
550            .map(|e| e.message.len() + 26) // 26 bytes overhead per event
551            .sum();
552        if total_size > MAX_BATCH_SIZE_BYTES {
553            return Err(LogsError::with_message(
554                LogsErrorCode::InvalidParameterException,
555                "The batch of log events in a single PutLogEvents request cannot exceed 1 MB",
556            ));
557        }
558
559        let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
560            LogsError::with_message(
561                LogsErrorCode::ResourceNotFoundException,
562                format!(
563                    "The specified log group does not exist: {}",
564                    input.log_group_name
565                ),
566            )
567        })?;
568
569        let stream = group
570            .streams
571            .get_mut(&input.log_stream_name)
572            .ok_or_else(|| {
573                LogsError::with_message(
574                    LogsErrorCode::ResourceNotFoundException,
575                    format!(
576                        "The specified log stream does not exist: {}",
577                        input.log_stream_name,
578                    ),
579                )
580            })?;
581
582        let now = Self::now_millis();
583
584        // Validate timestamps
585        for event in &input.log_events {
586            if now - event.timestamp > MAX_EVENT_AGE_MS {
587                return Err(LogsError::with_message(
588                    LogsErrorCode::InvalidParameterException,
589                    "Log event timestamp is too old (more than 14 days ago)",
590                ));
591            }
592            if event.timestamp - now > MAX_FUTURE_MS {
593                return Err(LogsError::with_message(
594                    LogsErrorCode::InvalidParameterException,
595                    "Log event timestamp is too far in the future (more than 2 hours)",
596                ));
597            }
598        }
599
600        // Sort events by timestamp and append
601        let mut sorted_events: Vec<&rustack_logs_model::types::InputLogEvent> =
602            input.log_events.iter().collect();
603        sorted_events.sort_by_key(|e| e.timestamp);
604
605        let ingestion_time = now;
606        for event in &sorted_events {
607            stream.events.push(StoredLogEvent {
608                timestamp: event.timestamp,
609                message: event.message.clone(),
610                ingestion_time,
611            });
612        }
613
614        // Re-sort to maintain time order across multiple PutLogEvents calls.
615        stream.events.sort_by_key(|e| e.timestamp);
616
617        // Update stream metadata
618        if let Some(first) = sorted_events.first() {
619            if stream.first_event_timestamp.is_none()
620                || first.timestamp < stream.first_event_timestamp.unwrap_or(i64::MAX)
621            {
622                stream.first_event_timestamp = Some(first.timestamp);
623            }
624        }
625        if let Some(last) = sorted_events.last() {
626            stream.last_event_timestamp = Some(last.timestamp);
627        }
628        stream.last_ingestion_time = Some(ingestion_time);
629
630        // Update sequence token
631        let new_token = uuid::Uuid::new_v4().to_string();
632        stream.upload_sequence_token.clone_from(&new_token);
633
634        // Update stored bytes
635        let added_bytes: i64 = sorted_events
636            .iter()
637            .map(|e| i64::try_from(e.message.len()).unwrap_or(0))
638            .sum();
639        group.stored_bytes += added_bytes;
640
641        Ok(PutLogEventsResponse {
642            next_sequence_token: Some(new_token),
643            rejected_log_events_info: None,
644            rejected_entity_info: None,
645        })
646    }
647
648    pub fn handle_get_log_events(
649        &self,
650        input: &GetLogEventsInput,
651    ) -> Result<GetLogEventsResponse, LogsError> {
652        let log_group_name = input
653            .log_group_name
654            .as_deref()
655            .or(input.log_group_identifier.as_deref())
656            .ok_or_else(|| {
657                LogsError::with_message(
658                    LogsErrorCode::InvalidParameterException,
659                    "Either logGroupName or logGroupIdentifier must be specified",
660                )
661            })?;
662
663        let group = self.groups.get(log_group_name).ok_or_else(|| {
664            LogsError::with_message(
665                LogsErrorCode::ResourceNotFoundException,
666                format!("The specified log group does not exist: {log_group_name}"),
667            )
668        })?;
669
670        let stream = group.streams.get(&input.log_stream_name).ok_or_else(|| {
671            LogsError::with_message(
672                LogsErrorCode::ResourceNotFoundException,
673                format!(
674                    "The specified log stream does not exist: {}",
675                    input.log_stream_name,
676                ),
677            )
678        })?;
679
680        let limit = input
681            .limit
682            .map_or(10_000, |l| usize::try_from(l.max(1)).unwrap_or(10_000));
683
684        let start_time = input.start_time.unwrap_or(0);
685        let end_time = input.end_time.unwrap_or(i64::MAX);
686        let start_from_head = input.start_from_head.unwrap_or(false);
687
688        let filtered: Vec<(usize, &StoredLogEvent)> = stream
689            .events
690            .iter()
691            .enumerate()
692            .filter(|(_, e)| e.timestamp >= start_time && e.timestamp < end_time)
693            .collect();
694
695        // Decode cursor from next_token if provided (format: "f/{index}" or "b/{index}")
696        let cursor = input.next_token.as_ref().and_then(|t| {
697            let parts: Vec<&str> = t.splitn(2, '/').collect();
698            if parts.len() == 2 {
699                parts[1].parse::<usize>().ok().map(|idx| (parts[0], idx))
700            } else {
701                None
702            }
703        });
704
705        let (events, forward_idx, backward_idx) =
706            if start_from_head || cursor.as_ref().is_some_and(|(dir, _)| *dir == "f") {
707                let start_idx = cursor.map_or(0, |(_, idx)| idx);
708                let page: Vec<OutputLogEvent> = filtered
709                    .iter()
710                    .filter(|(orig_idx, _)| *orig_idx >= start_idx)
711                    .take(limit)
712                    .map(|(_, e)| OutputLogEvent {
713                        timestamp: Some(e.timestamp),
714                        message: Some(e.message.clone()),
715                        ingestion_time: Some(e.ingestion_time),
716                    })
717                    .collect();
718                let fwd = filtered
719                    .iter()
720                    .filter(|(orig_idx, _)| *orig_idx >= start_idx)
721                    .nth(limit)
722                    .map(|(idx, _)| *idx);
723                let bwd = if start_idx > 0 { Some(start_idx) } else { None };
724                (page, fwd, bwd)
725            } else {
726                let end_idx = cursor.map_or(filtered.len(), |(_, idx)| {
727                    filtered
728                        .iter()
729                        .position(|(orig, _)| *orig >= idx)
730                        .unwrap_or(filtered.len())
731                });
732                let start_pos = end_idx.saturating_sub(limit);
733                let page: Vec<OutputLogEvent> = filtered[start_pos..end_idx]
734                    .iter()
735                    .map(|(_, e)| OutputLogEvent {
736                        timestamp: Some(e.timestamp),
737                        message: Some(e.message.clone()),
738                        ingestion_time: Some(e.ingestion_time),
739                    })
740                    .collect();
741                let fwd = filtered.get(end_idx).map(|(idx, _)| *idx);
742                let bwd = if start_pos > 0 {
743                    Some(filtered[start_pos].0)
744                } else {
745                    None
746                };
747                (page, fwd, bwd)
748            };
749
750        let forward_token = forward_idx.map(|idx| format!("f/{idx}"));
751        let backward_token = backward_idx.map(|idx| format!("b/{idx}"));
752
753        Ok(GetLogEventsResponse {
754            events,
755            next_forward_token: forward_token,
756            next_backward_token: backward_token,
757        })
758    }
759
760    pub fn handle_filter_log_events(
761        &self,
762        input: &FilterLogEventsInput,
763    ) -> Result<FilterLogEventsResponse, LogsError> {
764        let log_group_name = input
765            .log_group_name
766            .as_deref()
767            .or(input.log_group_identifier.as_deref())
768            .ok_or_else(|| {
769                LogsError::with_message(
770                    LogsErrorCode::InvalidParameterException,
771                    "Either logGroupName or logGroupIdentifier must be specified",
772                )
773            })?;
774
775        let group = self.groups.get(log_group_name).ok_or_else(|| {
776            LogsError::with_message(
777                LogsErrorCode::ResourceNotFoundException,
778                format!("The specified log group does not exist: {log_group_name}"),
779            )
780        })?;
781
782        let start_time = input.start_time.unwrap_or(0);
783        let end_time = input.end_time.unwrap_or(i64::MAX);
784        let limit = resolve_page_size(input.limit);
785        let filter_pattern = input.filter_pattern.as_deref().unwrap_or("");
786
787        let mut events: Vec<FilteredLogEvent> = Vec::new();
788        let mut searched_streams: Vec<SearchedLogStream> = Vec::new();
789
790        // Collect ALL matching events from all streams (no per-stream break).
791        for (stream_name, stream) in &group.streams {
792            // Filter by specific stream names or prefix
793            if !input.log_stream_names.is_empty() && !input.log_stream_names.contains(stream_name) {
794                continue;
795            }
796            if let Some(ref prefix) = input.log_stream_name_prefix {
797                if !stream_name.starts_with(prefix.as_str()) {
798                    continue;
799                }
800            }
801
802            for event in &stream.events {
803                // end_time is exclusive per AWS docs
804                if event.timestamp < start_time || event.timestamp >= end_time {
805                    continue;
806                }
807                // Empty pattern matches all
808                if !filter_pattern.is_empty() && !event.message.contains(filter_pattern) {
809                    continue;
810                }
811
812                events.push(FilteredLogEvent {
813                    log_stream_name: Some(stream_name.clone()),
814                    timestamp: Some(event.timestamp),
815                    message: Some(event.message.clone()),
816                    ingestion_time: Some(event.ingestion_time),
817                    event_id: Some(uuid::Uuid::new_v4().to_string()),
818                });
819            }
820
821            searched_streams.push(SearchedLogStream {
822                log_stream_name: Some(stream_name.clone()),
823                searched_completely: Some(true),
824            });
825        }
826
827        // Sort globally by timestamp, then truncate to limit.
828        events.sort_by_key(|e| e.timestamp);
829        let has_more = events.len() > limit;
830        events.truncate(limit);
831
832        // If we had to truncate, mark streams whose events were excluded as not fully searched.
833        if has_more {
834            for ss in &mut searched_streams {
835                ss.searched_completely = Some(false);
836            }
837        }
838
839        Ok(FilterLogEventsResponse {
840            events,
841            searched_log_streams: searched_streams,
842            next_token: None,
843        })
844    }
845
846    // -----------------------------------------------------------------------
847    // Phase 1: Retention Policy
848    // -----------------------------------------------------------------------
849
850    pub fn handle_put_retention_policy(
851        &self,
852        input: &PutRetentionPolicyInput,
853    ) -> Result<serde_json::Value, LogsError> {
854        let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
855            LogsError::with_message(
856                LogsErrorCode::ResourceNotFoundException,
857                format!(
858                    "The specified log group does not exist: {}",
859                    input.log_group_name
860                ),
861            )
862        })?;
863        group.retention_in_days = Some(input.retention_in_days);
864        Ok(serde_json::json!({}))
865    }
866
867    pub fn handle_delete_retention_policy(
868        &self,
869        input: &DeleteRetentionPolicyInput,
870    ) -> Result<serde_json::Value, LogsError> {
871        let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
872            LogsError::with_message(
873                LogsErrorCode::ResourceNotFoundException,
874                format!(
875                    "The specified log group does not exist: {}",
876                    input.log_group_name
877                ),
878            )
879        })?;
880        group.retention_in_days = None;
881        Ok(serde_json::json!({}))
882    }
883
884    // -----------------------------------------------------------------------
885    // Phase 1: Tagging (legacy log group API)
886    // -----------------------------------------------------------------------
887
888    pub fn handle_tag_log_group(
889        &self,
890        input: &TagLogGroupInput,
891    ) -> Result<serde_json::Value, LogsError> {
892        let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
893            LogsError::with_message(
894                LogsErrorCode::ResourceNotFoundException,
895                format!(
896                    "The specified log group does not exist: {}",
897                    input.log_group_name
898                ),
899            )
900        })?;
901        group.tags.extend(input.tags.clone());
902        Ok(serde_json::json!({}))
903    }
904
905    pub fn handle_untag_log_group(
906        &self,
907        input: &UntagLogGroupInput,
908    ) -> Result<serde_json::Value, LogsError> {
909        let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
910            LogsError::with_message(
911                LogsErrorCode::ResourceNotFoundException,
912                format!(
913                    "The specified log group does not exist: {}",
914                    input.log_group_name
915                ),
916            )
917        })?;
918        for key in &input.tags {
919            group.tags.remove(key);
920        }
921        Ok(serde_json::json!({}))
922    }
923
924    pub fn handle_list_tags_log_group(
925        &self,
926        input: &ListTagsLogGroupInput,
927    ) -> Result<ListTagsLogGroupResponse, LogsError> {
928        let group = self.groups.get(&input.log_group_name).ok_or_else(|| {
929            LogsError::with_message(
930                LogsErrorCode::ResourceNotFoundException,
931                format!(
932                    "The specified log group does not exist: {}",
933                    input.log_group_name
934                ),
935            )
936        })?;
937        Ok(ListTagsLogGroupResponse {
938            tags: group.tags.clone(),
939        })
940    }
941
942    // -----------------------------------------------------------------------
943    // Phase 1: Tagging (ARN-based API)
944    // -----------------------------------------------------------------------
945
946    pub fn handle_tag_resource(
947        &self,
948        input: &TagResourceInput,
949    ) -> Result<serde_json::Value, LogsError> {
950        let group_name = Self::resolve_log_group_name_from_arn(&input.resource_arn)?;
951        let mut group = self.groups.get_mut(&group_name).ok_or_else(|| {
952            LogsError::with_message(
953                LogsErrorCode::ResourceNotFoundException,
954                format!(
955                    "The specified resource does not exist: {}",
956                    input.resource_arn
957                ),
958            )
959        })?;
960        group.tags.extend(input.tags.clone());
961        Ok(serde_json::json!({}))
962    }
963
964    pub fn handle_untag_resource(
965        &self,
966        input: &UntagResourceInput,
967    ) -> Result<serde_json::Value, LogsError> {
968        let group_name = Self::resolve_log_group_name_from_arn(&input.resource_arn)?;
969        let mut group = self.groups.get_mut(&group_name).ok_or_else(|| {
970            LogsError::with_message(
971                LogsErrorCode::ResourceNotFoundException,
972                format!(
973                    "The specified resource does not exist: {}",
974                    input.resource_arn
975                ),
976            )
977        })?;
978        for key in &input.tag_keys {
979            group.tags.remove(key);
980        }
981        Ok(serde_json::json!({}))
982    }
983
984    pub fn handle_list_tags_for_resource(
985        &self,
986        input: &ListTagsForResourceInput,
987    ) -> Result<ListTagsForResourceResponse, LogsError> {
988        let group_name = Self::resolve_log_group_name_from_arn(&input.resource_arn)?;
989        let group = self.groups.get(&group_name).ok_or_else(|| {
990            LogsError::with_message(
991                LogsErrorCode::ResourceNotFoundException,
992                format!(
993                    "The specified resource does not exist: {}",
994                    input.resource_arn
995                ),
996            )
997        })?;
998        Ok(ListTagsForResourceResponse {
999            tags: group.tags.clone(),
1000        })
1001    }
1002
1003    fn resolve_log_group_name_from_arn(arn: &str) -> Result<String, LogsError> {
1004        // ARN format: arn:aws:logs:{region}:{account}:log-group:{name}:*
1005        // or:         arn:aws:logs:{region}:{account}:log-group:{name}
1006        let parts: Vec<&str> = arn.split(':').collect();
1007        if parts.len() < 7 || parts[2] != "logs" || parts[5] != "log-group" {
1008            return Err(LogsError::with_message(
1009                LogsErrorCode::InvalidParameterException,
1010                format!("Invalid ARN: {arn}"),
1011            ));
1012        }
1013        let name = parts[6];
1014        // Remove trailing :* if present
1015        Ok(name.to_owned())
1016    }
1017
1018    // -----------------------------------------------------------------------
1019    // Phase 1: Resource Policies
1020    // -----------------------------------------------------------------------
1021
1022    pub fn handle_put_resource_policy(
1023        &self,
1024        input: &PutResourcePolicyInput,
1025    ) -> Result<PutResourcePolicyResponse, LogsError> {
1026        let policy_name = input.policy_name.clone().unwrap_or_default();
1027        let policy_document = input.policy_document.clone().unwrap_or_default();
1028        let now = Self::now_millis();
1029
1030        self.resource_policies.insert(
1031            policy_name.clone(),
1032            ResourcePolicyRecord {
1033                name: policy_name.clone(),
1034                policy_document: policy_document.clone(),
1035                last_updated_time: now,
1036            },
1037        );
1038
1039        Ok(PutResourcePolicyResponse {
1040            resource_policy: Some(ResourcePolicy {
1041                policy_name: Some(policy_name),
1042                policy_document: Some(policy_document),
1043                last_updated_time: Some(now),
1044                ..ResourcePolicy::default()
1045            }),
1046            revision_id: None,
1047        })
1048    }
1049
1050    pub fn handle_delete_resource_policy(
1051        &self,
1052        input: &DeleteResourcePolicyInput,
1053    ) -> Result<serde_json::Value, LogsError> {
1054        let policy_name = input.policy_name.as_deref().unwrap_or("");
1055        self.resource_policies.remove(policy_name).ok_or_else(|| {
1056            LogsError::with_message(
1057                LogsErrorCode::ResourceNotFoundException,
1058                format!("The specified resource policy does not exist: {policy_name}"),
1059            )
1060        })?;
1061        Ok(serde_json::json!({}))
1062    }
1063
1064    pub fn handle_describe_resource_policies(
1065        &self,
1066        _input: &DescribeResourcePoliciesInput,
1067    ) -> Result<DescribeResourcePoliciesResponse, LogsError> {
1068        let policies: Vec<ResourcePolicy> = self
1069            .resource_policies
1070            .iter()
1071            .map(|entry| {
1072                let rp = entry.value();
1073                ResourcePolicy {
1074                    policy_name: Some(rp.name.clone()),
1075                    policy_document: Some(rp.policy_document.clone()),
1076                    last_updated_time: Some(rp.last_updated_time),
1077                    ..ResourcePolicy::default()
1078                }
1079            })
1080            .collect();
1081
1082        Ok(DescribeResourcePoliciesResponse {
1083            resource_policies: policies,
1084            next_token: None,
1085        })
1086    }
1087
1088    // -----------------------------------------------------------------------
1089    // Phase 2: Metric Filters
1090    // -----------------------------------------------------------------------
1091
1092    pub fn handle_put_metric_filter(
1093        &self,
1094        input: &PutMetricFilterInput,
1095    ) -> Result<serde_json::Value, LogsError> {
1096        let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
1097            LogsError::with_message(
1098                LogsErrorCode::ResourceNotFoundException,
1099                format!(
1100                    "The specified log group does not exist: {}",
1101                    input.log_group_name
1102                ),
1103            )
1104        })?;
1105
1106        let transformations = serde_json::to_value(&input.metric_transformations)
1107            .unwrap_or_else(|_| serde_json::json!([]));
1108
1109        group.metric_filters.insert(
1110            input.filter_name.clone(),
1111            MetricFilterRecord {
1112                name: input.filter_name.clone(),
1113                filter_pattern: input.filter_pattern.clone(),
1114                log_group_name: input.log_group_name.clone(),
1115                metric_transformations: transformations,
1116                creation_time: Self::now_millis(),
1117            },
1118        );
1119
1120        Ok(serde_json::json!({}))
1121    }
1122
1123    pub fn handle_delete_metric_filter(
1124        &self,
1125        input: &DeleteMetricFilterInput,
1126    ) -> Result<serde_json::Value, LogsError> {
1127        let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
1128            LogsError::with_message(
1129                LogsErrorCode::ResourceNotFoundException,
1130                format!(
1131                    "The specified log group does not exist: {}",
1132                    input.log_group_name
1133                ),
1134            )
1135        })?;
1136
1137        group
1138            .metric_filters
1139            .remove(&input.filter_name)
1140            .ok_or_else(|| {
1141                LogsError::with_message(
1142                    LogsErrorCode::ResourceNotFoundException,
1143                    format!(
1144                        "The specified metric filter does not exist: {}",
1145                        input.filter_name
1146                    ),
1147                )
1148            })?;
1149
1150        Ok(serde_json::json!({}))
1151    }
1152
1153    pub fn handle_describe_metric_filters(
1154        &self,
1155        input: &DescribeMetricFiltersInput,
1156    ) -> Result<DescribeMetricFiltersResponse, LogsError> {
1157        let mut filters: Vec<MetricFilter> = Vec::new();
1158
1159        if let Some(ref group_name) = input.log_group_name {
1160            let group = self.groups.get(group_name).ok_or_else(|| {
1161                LogsError::with_message(
1162                    LogsErrorCode::ResourceNotFoundException,
1163                    format!("The specified log group does not exist: {group_name}"),
1164                )
1165            })?;
1166
1167            for mf in group.metric_filters.values() {
1168                if let Some(ref prefix) = input.filter_name_prefix {
1169                    if !mf.name.starts_with(prefix.as_str()) {
1170                        continue;
1171                    }
1172                }
1173                let transformations: Vec<rustack_logs_model::types::MetricTransformation> =
1174                    serde_json::from_value(mf.metric_transformations.clone()).unwrap_or_default();
1175                filters.push(MetricFilter {
1176                    filter_name: Some(mf.name.clone()),
1177                    filter_pattern: Some(mf.filter_pattern.clone()),
1178                    log_group_name: Some(mf.log_group_name.clone()),
1179                    metric_transformations: transformations,
1180                    creation_time: Some(mf.creation_time),
1181                    ..MetricFilter::default()
1182                });
1183            }
1184        }
1185
1186        Ok(DescribeMetricFiltersResponse {
1187            metric_filters: filters,
1188            next_token: None,
1189        })
1190    }
1191
1192    pub fn handle_test_metric_filter(
1193        &self,
1194        input: &TestMetricFilterInput,
1195    ) -> Result<TestMetricFilterResponse, LogsError> {
1196        // Simple implementation: if pattern is empty, match all; otherwise substring match
1197        let matches: Vec<MetricFilterMatchRecord> = input
1198            .log_event_messages
1199            .iter()
1200            .enumerate()
1201            .filter(|(_, msg)| {
1202                input.filter_pattern.is_empty() || msg.contains(&input.filter_pattern)
1203            })
1204            .map(|(i, msg)| MetricFilterMatchRecord {
1205                event_number: Some(i64::try_from(i + 1).unwrap_or(0)),
1206                event_message: Some(msg.clone()),
1207                extracted_values: HashMap::new(),
1208            })
1209            .collect();
1210
1211        Ok(TestMetricFilterResponse { matches })
1212    }
1213
1214    // -----------------------------------------------------------------------
1215    // Phase 2: Subscription Filters
1216    // -----------------------------------------------------------------------
1217
1218    pub fn handle_put_subscription_filter(
1219        &self,
1220        input: &PutSubscriptionFilterInput,
1221    ) -> Result<serde_json::Value, LogsError> {
1222        let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
1223            LogsError::with_message(
1224                LogsErrorCode::ResourceNotFoundException,
1225                format!(
1226                    "The specified log group does not exist: {}",
1227                    input.log_group_name
1228                ),
1229            )
1230        })?;
1231
1232        group.subscription_filters.insert(
1233            input.filter_name.clone(),
1234            SubscriptionFilterRecord {
1235                name: input.filter_name.clone(),
1236                filter_pattern: input.filter_pattern.clone(),
1237                log_group_name: input.log_group_name.clone(),
1238                destination_arn: input.destination_arn.clone(),
1239                role_arn: input.role_arn.clone(),
1240                distribution: input.distribution.as_ref().map(|d| d.as_str().to_owned()),
1241                creation_time: Self::now_millis(),
1242            },
1243        );
1244
1245        Ok(serde_json::json!({}))
1246    }
1247
1248    pub fn handle_delete_subscription_filter(
1249        &self,
1250        input: &DeleteSubscriptionFilterInput,
1251    ) -> Result<serde_json::Value, LogsError> {
1252        let mut group = self.groups.get_mut(&input.log_group_name).ok_or_else(|| {
1253            LogsError::with_message(
1254                LogsErrorCode::ResourceNotFoundException,
1255                format!(
1256                    "The specified log group does not exist: {}",
1257                    input.log_group_name
1258                ),
1259            )
1260        })?;
1261
1262        group
1263            .subscription_filters
1264            .remove(&input.filter_name)
1265            .ok_or_else(|| {
1266                LogsError::with_message(
1267                    LogsErrorCode::ResourceNotFoundException,
1268                    format!(
1269                        "The specified subscription filter does not exist: {}",
1270                        input.filter_name,
1271                    ),
1272                )
1273            })?;
1274
1275        Ok(serde_json::json!({}))
1276    }
1277
1278    pub fn handle_describe_subscription_filters(
1279        &self,
1280        input: &DescribeSubscriptionFiltersInput,
1281    ) -> Result<DescribeSubscriptionFiltersResponse, LogsError> {
1282        let group = self.groups.get(&input.log_group_name).ok_or_else(|| {
1283            LogsError::with_message(
1284                LogsErrorCode::ResourceNotFoundException,
1285                format!(
1286                    "The specified log group does not exist: {}",
1287                    input.log_group_name,
1288                ),
1289            )
1290        })?;
1291
1292        let filters: Vec<SubscriptionFilter> = group
1293            .subscription_filters
1294            .values()
1295            .filter(|sf| {
1296                if let Some(ref prefix) = input.filter_name_prefix {
1297                    sf.name.starts_with(prefix.as_str())
1298                } else {
1299                    true
1300                }
1301            })
1302            .map(|sf| SubscriptionFilter {
1303                filter_name: Some(sf.name.clone()),
1304                filter_pattern: Some(sf.filter_pattern.clone()),
1305                log_group_name: Some(sf.log_group_name.clone()),
1306                destination_arn: Some(sf.destination_arn.clone()),
1307                role_arn: sf.role_arn.clone(),
1308                distribution: sf
1309                    .distribution
1310                    .as_ref()
1311                    .map(|d| rustack_logs_model::types::Distribution::from(d.as_str())),
1312                creation_time: Some(sf.creation_time),
1313                ..SubscriptionFilter::default()
1314            })
1315            .collect();
1316
1317        Ok(DescribeSubscriptionFiltersResponse {
1318            subscription_filters: filters,
1319            next_token: None,
1320        })
1321    }
1322
1323    // -----------------------------------------------------------------------
1324    // Phase 3: Destinations
1325    // -----------------------------------------------------------------------
1326
1327    pub fn handle_put_destination(
1328        &self,
1329        input: &PutDestinationInput,
1330    ) -> Result<PutDestinationResponse, LogsError> {
1331        let arn = self.destination_arn(&input.destination_name);
1332        let now = Self::now_millis();
1333
1334        self.destinations.insert(
1335            input.destination_name.clone(),
1336            DestinationRecord {
1337                name: input.destination_name.clone(),
1338                target_arn: input.target_arn.clone(),
1339                role_arn: input.role_arn.clone(),
1340                access_policy: None,
1341                arn: arn.clone(),
1342                creation_time: now,
1343            },
1344        );
1345
1346        Ok(PutDestinationResponse {
1347            destination: Some(Destination {
1348                destination_name: Some(input.destination_name.clone()),
1349                target_arn: Some(input.target_arn.clone()),
1350                role_arn: Some(input.role_arn.clone()),
1351                arn: Some(arn),
1352                creation_time: Some(now),
1353                access_policy: None,
1354            }),
1355        })
1356    }
1357
1358    pub fn handle_put_destination_policy(
1359        &self,
1360        input: &PutDestinationPolicyInput,
1361    ) -> Result<serde_json::Value, LogsError> {
1362        let mut dest = self
1363            .destinations
1364            .get_mut(&input.destination_name)
1365            .ok_or_else(|| {
1366                LogsError::with_message(
1367                    LogsErrorCode::ResourceNotFoundException,
1368                    format!(
1369                        "The specified destination does not exist: {}",
1370                        input.destination_name,
1371                    ),
1372                )
1373            })?;
1374
1375        dest.access_policy = Some(input.access_policy.clone());
1376        Ok(serde_json::json!({}))
1377    }
1378
1379    pub fn handle_delete_destination(
1380        &self,
1381        input: &DeleteDestinationInput,
1382    ) -> Result<serde_json::Value, LogsError> {
1383        self.destinations
1384            .remove(&input.destination_name)
1385            .ok_or_else(|| {
1386                LogsError::with_message(
1387                    LogsErrorCode::ResourceNotFoundException,
1388                    format!(
1389                        "The specified destination does not exist: {}",
1390                        input.destination_name,
1391                    ),
1392                )
1393            })?;
1394        Ok(serde_json::json!({}))
1395    }
1396
1397    pub fn handle_describe_destinations(
1398        &self,
1399        input: &DescribeDestinationsInput,
1400    ) -> Result<DescribeDestinationsResponse, LogsError> {
1401        let destinations: Vec<Destination> = self
1402            .destinations
1403            .iter()
1404            .filter(|entry| {
1405                if let Some(ref prefix) = input.destination_name_prefix {
1406                    entry.value().name.starts_with(prefix.as_str())
1407                } else {
1408                    true
1409                }
1410            })
1411            .map(|entry| {
1412                let d = entry.value();
1413                Destination {
1414                    destination_name: Some(d.name.clone()),
1415                    target_arn: Some(d.target_arn.clone()),
1416                    role_arn: Some(d.role_arn.clone()),
1417                    access_policy: d.access_policy.clone(),
1418                    arn: Some(d.arn.clone()),
1419                    creation_time: Some(d.creation_time),
1420                }
1421            })
1422            .collect();
1423
1424        Ok(DescribeDestinationsResponse {
1425            destinations,
1426            next_token: None,
1427        })
1428    }
1429
1430    // -----------------------------------------------------------------------
1431    // Phase 3: Query operations (stubs)
1432    // -----------------------------------------------------------------------
1433
1434    pub fn handle_start_query(
1435        &self,
1436        _input: &StartQueryInput,
1437    ) -> Result<StartQueryResponse, LogsError> {
1438        let query_id = uuid::Uuid::new_v4().to_string();
1439        Ok(StartQueryResponse {
1440            query_id: Some(query_id),
1441        })
1442    }
1443
1444    pub fn handle_stop_query(
1445        &self,
1446        _input: &StopQueryInput,
1447    ) -> Result<StopQueryResponse, LogsError> {
1448        Ok(StopQueryResponse {
1449            success: Some(true),
1450        })
1451    }
1452
1453    pub fn handle_get_query_results(
1454        &self,
1455        _input: &GetQueryResultsInput,
1456    ) -> Result<GetQueryResultsResponse, LogsError> {
1457        Ok(GetQueryResultsResponse {
1458            results: Vec::new(),
1459            statistics: Some(QueryStatistics {
1460                records_matched: Some(0.0),
1461                records_scanned: Some(0.0),
1462                bytes_scanned: Some(0.0),
1463                ..QueryStatistics::default()
1464            }),
1465            status: Some(QueryStatus::Complete),
1466            ..GetQueryResultsResponse::default()
1467        })
1468    }
1469
1470    pub fn handle_describe_queries(
1471        &self,
1472        _input: &DescribeQueriesInput,
1473    ) -> Result<DescribeQueriesResponse, LogsError> {
1474        Ok(DescribeQueriesResponse {
1475            queries: Vec::new(),
1476            next_token: None,
1477        })
1478    }
1479
1480    // -----------------------------------------------------------------------
1481    // Phase 3: Query Definitions
1482    // -----------------------------------------------------------------------
1483
1484    pub fn handle_put_query_definition(
1485        &self,
1486        input: &PutQueryDefinitionInput,
1487    ) -> Result<PutQueryDefinitionResponse, LogsError> {
1488        let id = input
1489            .query_definition_id
1490            .clone()
1491            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1492
1493        self.query_definitions.insert(
1494            id.clone(),
1495            QueryDefinitionRecord {
1496                query_definition_id: id.clone(),
1497                name: input.name.clone(),
1498                query_string: input.query_string.clone(),
1499                log_group_names: input.log_group_names.clone(),
1500                last_modified: Self::now_millis(),
1501            },
1502        );
1503
1504        Ok(PutQueryDefinitionResponse {
1505            query_definition_id: Some(id),
1506        })
1507    }
1508
1509    pub fn handle_delete_query_definition(
1510        &self,
1511        input: &DeleteQueryDefinitionInput,
1512    ) -> Result<rustack_logs_model::output::DeleteQueryDefinitionResponse, LogsError> {
1513        let removed = self
1514            .query_definitions
1515            .remove(&input.query_definition_id)
1516            .is_some();
1517        Ok(rustack_logs_model::output::DeleteQueryDefinitionResponse {
1518            success: Some(removed),
1519        })
1520    }
1521
1522    pub fn handle_describe_query_definitions(
1523        &self,
1524        input: &DescribeQueryDefinitionsInput,
1525    ) -> Result<DescribeQueryDefinitionsResponse, LogsError> {
1526        let defs: Vec<QueryDefinition> = self
1527            .query_definitions
1528            .iter()
1529            .filter(|entry| {
1530                if let Some(ref prefix) = input.query_definition_name_prefix {
1531                    entry.value().name.starts_with(prefix.as_str())
1532                } else {
1533                    true
1534                }
1535            })
1536            .map(|entry| {
1537                let qd = entry.value();
1538                QueryDefinition {
1539                    query_definition_id: Some(qd.query_definition_id.clone()),
1540                    name: Some(qd.name.clone()),
1541                    query_string: Some(qd.query_string.clone()),
1542                    log_group_names: qd.log_group_names.clone(),
1543                    last_modified: Some(qd.last_modified),
1544                    ..QueryDefinition::default()
1545                }
1546            })
1547            .collect();
1548
1549        Ok(DescribeQueryDefinitionsResponse {
1550            query_definitions: defs,
1551            next_token: None,
1552        })
1553    }
1554
1555    // -----------------------------------------------------------------------
1556    // Phase 3: KMS Key Association
1557    // -----------------------------------------------------------------------
1558
1559    pub fn handle_associate_kms_key(
1560        &self,
1561        input: &AssociateKmsKeyInput,
1562    ) -> Result<serde_json::Value, LogsError> {
1563        if let Some(ref group_name) = input.log_group_name {
1564            let mut group = self.groups.get_mut(group_name).ok_or_else(|| {
1565                LogsError::with_message(
1566                    LogsErrorCode::ResourceNotFoundException,
1567                    format!("The specified log group does not exist: {group_name}"),
1568                )
1569            })?;
1570            group.kms_key_id = Some(input.kms_key_id.clone());
1571        }
1572        Ok(serde_json::json!({}))
1573    }
1574
1575    pub fn handle_disassociate_kms_key(
1576        &self,
1577        input: &DisassociateKmsKeyInput,
1578    ) -> Result<serde_json::Value, LogsError> {
1579        if let Some(ref group_name) = input.log_group_name {
1580            let mut group = self.groups.get_mut(group_name).ok_or_else(|| {
1581                LogsError::with_message(
1582                    LogsErrorCode::ResourceNotFoundException,
1583                    format!("The specified log group does not exist: {group_name}"),
1584                )
1585            })?;
1586            group.kms_key_id = None;
1587        }
1588        Ok(serde_json::json!({}))
1589    }
1590}