Skip to main content

fakecloud_logs/
service.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
7use fakecloud_core::validation::*;
8
9use crate::state::{
10    AccountPolicy, AnomalyDetector, DataProtectionPolicy, Delivery, DeliveryDestination,
11    DeliverySource, Destination, ExportTask, ImportTask, IndexPolicy, Integration, LogEvent,
12    LogGroup, LogStream, LookupTable, MetricFilter, MetricTransformation, QueryDefinition,
13    QueryInfo, ResourcePolicy, ScheduledQuery, SharedLogsState, SubscriptionFilter, Transformer,
14};
15
16pub struct LogsService {
17    state: SharedLogsState,
18}
19
20impl LogsService {
21    pub fn new(state: SharedLogsState) -> Self {
22        Self { state }
23    }
24}
25
26#[async_trait]
27impl AwsService for LogsService {
28    fn service_name(&self) -> &str {
29        "logs"
30    }
31
32    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
33        match req.action.as_str() {
34            "CreateLogGroup" => self.create_log_group(&req),
35            "DeleteLogGroup" => self.delete_log_group(&req),
36            "DescribeLogGroups" => self.describe_log_groups(&req),
37            "CreateLogStream" => self.create_log_stream(&req),
38            "DeleteLogStream" => self.delete_log_stream(&req),
39            "DescribeLogStreams" => self.describe_log_streams(&req),
40            "PutLogEvents" => self.put_log_events(&req),
41            "GetLogEvents" => self.get_log_events(&req),
42            "FilterLogEvents" => self.filter_log_events(&req),
43            "TagLogGroup" => self.tag_log_group(&req),
44            "UntagLogGroup" => self.untag_log_group(&req),
45            "ListTagsLogGroup" => self.list_tags_log_group(&req),
46            "TagResource" => self.tag_resource(&req),
47            "UntagResource" => self.untag_resource(&req),
48            "ListTagsForResource" => self.list_tags_for_resource(&req),
49            "PutRetentionPolicy" => self.put_retention_policy(&req),
50            "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
51            "PutSubscriptionFilter" => self.put_subscription_filter(&req),
52            "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
53            "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
54            "PutMetricFilter" => self.put_metric_filter(&req),
55            "DescribeMetricFilters" => self.describe_metric_filters(&req),
56            "DeleteMetricFilter" => self.delete_metric_filter(&req),
57            "PutResourcePolicy" => self.put_resource_policy(&req),
58            "DescribeResourcePolicies" => self.describe_resource_policies(&req),
59            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
60            "PutDestination" => self.put_destination(&req),
61            "DescribeDestinations" => self.describe_destinations(&req),
62            "DeleteDestination" => self.delete_destination(&req),
63            "PutDestinationPolicy" => self.put_destination_policy(&req),
64            "StartQuery" => self.start_query(&req),
65            "GetQueryResults" => self.get_query_results(&req),
66            "DescribeQueries" => self.describe_queries(&req),
67            "CreateExportTask" => self.create_export_task(&req),
68            "DescribeExportTasks" => self.describe_export_tasks(&req),
69            "CancelExportTask" => self.cancel_export_task(&req),
70            "PutDeliveryDestination" => self.put_delivery_destination(&req),
71            "GetDeliveryDestination" => self.get_delivery_destination(&req),
72            "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
73            "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
74            "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
75            "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
76            "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
77            "PutDeliverySource" => self.put_delivery_source(&req),
78            "GetDeliverySource" => self.get_delivery_source(&req),
79            "DescribeDeliverySources" => self.describe_delivery_sources(&req),
80            "DeleteDeliverySource" => self.delete_delivery_source(&req),
81            "CreateDelivery" => self.create_delivery(&req),
82            "GetDelivery" => self.get_delivery(&req),
83            "DescribeDeliveries" => self.describe_deliveries(&req),
84            "DeleteDelivery" => self.delete_delivery(&req),
85            "AssociateKmsKey" => self.associate_kms_key(&req),
86            "DisassociateKmsKey" => self.disassociate_kms_key(&req),
87            "PutQueryDefinition" => self.put_query_definition(&req),
88            "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
89            "DeleteQueryDefinition" => self.delete_query_definition(&req),
90            "PutAccountPolicy" => self.put_account_policy(&req),
91            "DescribeAccountPolicies" => self.describe_account_policies(&req),
92            "DeleteAccountPolicy" => self.delete_account_policy(&req),
93            "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
94            "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
95            "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
96            "PutIndexPolicy" => self.put_index_policy(&req),
97            "DescribeIndexPolicies" => self.describe_index_policies(&req),
98            "DeleteIndexPolicy" => self.delete_index_policy(&req),
99            "DescribeFieldIndexes" => self.describe_field_indexes(&req),
100            "PutTransformer" => self.put_transformer(&req),
101            "GetTransformer" => self.get_transformer(&req),
102            "DeleteTransformer" => self.delete_transformer(&req),
103            "TestTransformer" => self.test_transformer(&req),
104            "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
105            "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
106            "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
107            "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
108            "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
109            "GetLogGroupFields" => self.get_log_group_fields(&req),
110            "TestMetricFilter" => self.test_metric_filter(&req),
111            "StopQuery" => self.stop_query(&req),
112            "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
113            "GetLogRecord" => self.get_log_record(&req),
114            "ListAnomalies" => self.list_anomalies(&req),
115            "UpdateAnomaly" => self.update_anomaly(&req),
116            "CreateImportTask" => self.create_import_task(&req),
117            "DescribeImportTasks" => self.describe_import_tasks(&req),
118            "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
119            "CancelImportTask" => self.cancel_import_task(&req),
120            "PutIntegration" => self.put_integration(&req),
121            "GetIntegration" => self.get_integration(&req),
122            "DeleteIntegration" => self.delete_integration(&req),
123            "ListIntegrations" => self.list_integrations(&req),
124            "CreateLookupTable" => self.create_lookup_table(&req),
125            "GetLookupTable" => self.get_lookup_table(&req),
126            "DescribeLookupTables" => self.describe_lookup_tables(&req),
127            "DeleteLookupTable" => self.delete_lookup_table(&req),
128            "UpdateLookupTable" => self.update_lookup_table(&req),
129            "CreateScheduledQuery" => self.create_scheduled_query(&req),
130            "GetScheduledQuery" => self.get_scheduled_query(&req),
131            "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
132            "ListScheduledQueries" => self.list_scheduled_queries(&req),
133            "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
134            "UpdateScheduledQuery" => self.update_scheduled_query(&req),
135            "StartLiveTail" => self.start_live_tail(&req),
136            "ListLogGroups" => self.describe_log_groups(&req),
137            "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
138            "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
139            "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
140            "GetLogObject" => self.get_log_object(&req),
141            "GetLogFields" => self.get_log_fields(&req),
142            "AssociateSourceToS3TableIntegration" => {
143                self.associate_source_to_s3_table_integration(&req)
144            }
145            "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
146            "DisassociateSourceFromS3TableIntegration" => {
147                self.disassociate_source_from_s3_table_integration(&req)
148            }
149            "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
150            "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
151            _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
152        }
153    }
154
155    fn supported_actions(&self) -> &[&str] {
156        &[
157            "CreateLogGroup",
158            "DeleteLogGroup",
159            "DescribeLogGroups",
160            "CreateLogStream",
161            "DeleteLogStream",
162            "DescribeLogStreams",
163            "PutLogEvents",
164            "GetLogEvents",
165            "FilterLogEvents",
166            "TagLogGroup",
167            "UntagLogGroup",
168            "ListTagsLogGroup",
169            "TagResource",
170            "UntagResource",
171            "ListTagsForResource",
172            "PutRetentionPolicy",
173            "DeleteRetentionPolicy",
174            "PutSubscriptionFilter",
175            "DescribeSubscriptionFilters",
176            "DeleteSubscriptionFilter",
177            "PutMetricFilter",
178            "DescribeMetricFilters",
179            "DeleteMetricFilter",
180            "PutResourcePolicy",
181            "DescribeResourcePolicies",
182            "DeleteResourcePolicy",
183            "PutDestination",
184            "DescribeDestinations",
185            "DeleteDestination",
186            "PutDestinationPolicy",
187            "StartQuery",
188            "GetQueryResults",
189            "DescribeQueries",
190            "CreateExportTask",
191            "DescribeExportTasks",
192            "CancelExportTask",
193            "PutDeliveryDestination",
194            "GetDeliveryDestination",
195            "DescribeDeliveryDestinations",
196            "DeleteDeliveryDestination",
197            "PutDeliveryDestinationPolicy",
198            "GetDeliveryDestinationPolicy",
199            "DeleteDeliveryDestinationPolicy",
200            "PutDeliverySource",
201            "GetDeliverySource",
202            "DescribeDeliverySources",
203            "DeleteDeliverySource",
204            "CreateDelivery",
205            "GetDelivery",
206            "DescribeDeliveries",
207            "DeleteDelivery",
208            "AssociateKmsKey",
209            "DisassociateKmsKey",
210            "PutQueryDefinition",
211            "DescribeQueryDefinitions",
212            "DeleteQueryDefinition",
213            "PutAccountPolicy",
214            "DescribeAccountPolicies",
215            "DeleteAccountPolicy",
216            "PutDataProtectionPolicy",
217            "GetDataProtectionPolicy",
218            "DeleteDataProtectionPolicy",
219            "PutIndexPolicy",
220            "DescribeIndexPolicies",
221            "DeleteIndexPolicy",
222            "DescribeFieldIndexes",
223            "PutTransformer",
224            "GetTransformer",
225            "DeleteTransformer",
226            "TestTransformer",
227            "CreateLogAnomalyDetector",
228            "GetLogAnomalyDetector",
229            "DeleteLogAnomalyDetector",
230            "ListLogAnomalyDetectors",
231            "UpdateLogAnomalyDetector",
232            "GetLogGroupFields",
233            "TestMetricFilter",
234            "StopQuery",
235            "PutLogGroupDeletionProtection",
236            "GetLogRecord",
237            "ListAnomalies",
238            "UpdateAnomaly",
239            "CreateImportTask",
240            "DescribeImportTasks",
241            "DescribeImportTaskBatches",
242            "CancelImportTask",
243            "PutIntegration",
244            "GetIntegration",
245            "DeleteIntegration",
246            "ListIntegrations",
247            "CreateLookupTable",
248            "GetLookupTable",
249            "DescribeLookupTables",
250            "DeleteLookupTable",
251            "UpdateLookupTable",
252            "CreateScheduledQuery",
253            "GetScheduledQuery",
254            "GetScheduledQueryHistory",
255            "ListScheduledQueries",
256            "DeleteScheduledQuery",
257            "UpdateScheduledQuery",
258            "StartLiveTail",
259            "ListLogGroups",
260            "ListLogGroupsForQuery",
261            "ListAggregateLogGroupSummaries",
262            "PutBearerTokenAuthentication",
263            "GetLogObject",
264            "GetLogFields",
265            "AssociateSourceToS3TableIntegration",
266            "ListSourcesForS3TableIntegration",
267            "DisassociateSourceFromS3TableIntegration",
268            "UpdateDeliveryConfiguration",
269            "DescribeConfigurationTemplates",
270        ]
271    }
272}
273
274fn body_json(req: &AwsRequest) -> Value {
275    serde_json::from_slice(&req.body).unwrap_or(Value::Null)
276}
277
278fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
279    body[field].as_str().ok_or_else(|| {
280        AwsServiceError::aws_error(
281            StatusCode::BAD_REQUEST,
282            "InvalidParameterException",
283            format!("{field} is required"),
284        )
285    })
286}
287
288/// Build a delivery destination configuration JSON object, ensuring
289/// `destinationResourceArn` is always present as a string (Smithy requirement).
290fn dd_config_json(config: &std::collections::HashMap<String, String>) -> Value {
291    let mut m: serde_json::Map<String, Value> =
292        config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
293    m.entry("destinationResourceArn".to_string())
294        .or_insert_with(|| json!(""));
295    Value::Object(m)
296}
297
298fn generate_sequence_token() -> String {
299    use std::time::{SystemTime, UNIX_EPOCH};
300    let nanos = SystemTime::now()
301        .duration_since(UNIX_EPOCH)
302        .unwrap_or_default()
303        .as_nanos();
304    // u128 max is ~3.4e38, so we limit to 38 digits to avoid overflow
305    format!("{:038}", nanos % 10u128.pow(38))
306}
307
308fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
309    AwsServiceError::aws_error(
310        StatusCode::BAD_REQUEST,
311        "InvalidParameterException",
312        format!(
313            "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
314        ),
315    )
316}
317
318impl LogsService {
319    // ---- Log Groups ----
320
321    fn create_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
322        let body = body_json(req);
323        let name = body["logGroupName"]
324            .as_str()
325            .ok_or_else(|| {
326                AwsServiceError::aws_error(
327                    StatusCode::BAD_REQUEST,
328                    "InvalidParameterException",
329                    "logGroupName is required",
330                )
331            })?
332            .to_string();
333
334        validate_string_length("logGroupName", &name, 1, 512)?;
335        validate_optional_string_length("kmsKeyId", body["kmsKeyId"].as_str(), 1, 256)?;
336
337        let mut state = self.state.write();
338        if state.log_groups.contains_key(&name) {
339            return Err(AwsServiceError::aws_error(
340                StatusCode::BAD_REQUEST,
341                "ResourceAlreadyExistsException",
342                format!("The specified log group already exists: {name}"),
343            ));
344        }
345
346        let arn = format!(
347            "arn:aws:logs:{}:{}:log-group:{}:*",
348            state.region, state.account_id, name
349        );
350        let now = Utc::now().timestamp_millis();
351
352        let tags = body["tags"]
353            .as_object()
354            .map(|m| {
355                m.iter()
356                    .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
357                    .collect()
358            })
359            .unwrap_or_default();
360
361        let kms_key_id = body["kmsKeyId"].as_str().map(|s| s.to_string());
362
363        state.log_groups.insert(
364            name.clone(),
365            LogGroup {
366                name,
367                arn,
368                creation_time: now,
369                retention_in_days: None,
370                kms_key_id,
371                tags,
372                log_streams: std::collections::HashMap::new(),
373                stored_bytes: 0,
374                subscription_filters: Vec::new(),
375                data_protection_policy: None,
376                index_policies: Vec::new(),
377                transformer: None,
378                deletion_protection: false,
379            },
380        );
381
382        Ok(AwsResponse::json(StatusCode::OK, "{}"))
383    }
384
385    fn delete_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
386        let body = body_json(req);
387        let name = body["logGroupName"].as_str().ok_or_else(|| {
388            AwsServiceError::aws_error(
389                StatusCode::BAD_REQUEST,
390                "InvalidParameterException",
391                "logGroupName is required",
392            )
393        })?;
394
395        validate_string_length("logGroupName", name, 1, 512)?;
396
397        let mut state = self.state.write();
398        // Check deletion protection
399        if let Some(group) = state.log_groups.get(name) {
400            if group.deletion_protection {
401                return Err(AwsServiceError::aws_error(
402                    StatusCode::BAD_REQUEST,
403                    "OperationAbortedException",
404                    format!("Log group {name} has deletion protection enabled"),
405                ));
406            }
407        }
408        if state.log_groups.remove(name).is_none() {
409            return Err(AwsServiceError::aws_error(
410                StatusCode::BAD_REQUEST,
411                "ResourceNotFoundException",
412                format!("The specified log group does not exist: {name}"),
413            ));
414        }
415
416        Ok(AwsResponse::json(StatusCode::OK, "{}"))
417    }
418
419    fn describe_log_groups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
420        let body = body_json(req);
421        let prefix = body["logGroupNamePrefix"].as_str().unwrap_or("");
422        let pattern = body["logGroupNamePattern"].as_str().unwrap_or("");
423        let limit = body["limit"].as_i64().unwrap_or(50) as usize;
424        let next_token = body["nextToken"].as_str();
425
426        validate_optional_string_length(
427            "logGroupNamePrefix",
428            body["logGroupNamePrefix"].as_str(),
429            1,
430            512,
431        )?;
432        validate_optional_string_length(
433            "logGroupNamePattern",
434            body["logGroupNamePattern"].as_str(),
435            0,
436            512,
437        )?;
438        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
439        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
440        validate_optional_enum_value(
441            "logGroupClass",
442            &body["logGroupClass"],
443            &["STANDARD", "INFREQUENT_ACCESS", "DELIVERY"],
444        )?;
445
446        let state = self.state.read();
447        let mut groups: Vec<&LogGroup> = state
448            .log_groups
449            .values()
450            .filter(|g| {
451                (prefix.is_empty() || g.name.starts_with(prefix))
452                    && (pattern.is_empty() || g.name.contains(pattern))
453            })
454            .collect();
455        groups.sort_by(|a, b| a.name.cmp(&b.name));
456
457        // Handle pagination
458        let start_idx = if let Some(token) = next_token {
459            groups
460                .iter()
461                .position(|g| g.name.as_str() > token)
462                .unwrap_or(groups.len())
463        } else {
464            0
465        };
466
467        let page = &groups[start_idx..];
468        let has_more = page.len() > limit;
469        let page = if has_more { &page[..limit] } else { page };
470
471        let log_groups: Vec<Value> = page
472            .iter()
473            .map(|g| {
474                let log_group_arn = g.arn.trim_end_matches(":*").to_string();
475                let mut obj = json!({
476                    "logGroupName": g.name,
477                    "arn": g.arn,
478                    "logGroupArn": log_group_arn,
479                    "creationTime": g.creation_time,
480                    "storedBytes": g.stored_bytes,
481                    "metricFilterCount": 0,
482                });
483                if let Some(days) = g.retention_in_days {
484                    obj["retentionInDays"] = json!(days);
485                }
486                if let Some(ref kms) = g.kms_key_id {
487                    obj["kmsKeyId"] = json!(kms);
488                }
489                obj
490            })
491            .collect();
492
493        let mut result = json!({ "logGroups": log_groups });
494        if has_more {
495            if let Some(last) = page.last() {
496                result["nextToken"] = json!(last.name);
497            }
498        }
499
500        Ok(AwsResponse::json(
501            StatusCode::OK,
502            serde_json::to_string(&result).unwrap(),
503        ))
504    }
505
506    // ---- Log Streams ----
507
508    fn create_log_stream(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
509        let body = body_json(req);
510        let group_name = body["logGroupName"].as_str().ok_or_else(|| {
511            AwsServiceError::aws_error(
512                StatusCode::BAD_REQUEST,
513                "InvalidParameterException",
514                "logGroupName is required",
515            )
516        })?;
517        let stream_name = body["logStreamName"]
518            .as_str()
519            .ok_or_else(|| {
520                AwsServiceError::aws_error(
521                    StatusCode::BAD_REQUEST,
522                    "InvalidParameterException",
523                    "logStreamName is required",
524                )
525            })?
526            .to_string();
527
528        validate_string_length("logGroupName", group_name, 1, 512)?;
529        validate_string_length("logStreamName", &stream_name, 1, 512)?;
530
531        let mut state = self.state.write();
532        let region = state.region.clone();
533        let account_id = state.account_id.clone();
534
535        let group = state.log_groups.get_mut(group_name).ok_or_else(|| {
536            AwsServiceError::aws_error(
537                StatusCode::BAD_REQUEST,
538                "ResourceNotFoundException",
539                format!("The specified log group does not exist: {group_name}"),
540            )
541        })?;
542
543        if group.log_streams.contains_key(&stream_name) {
544            return Err(AwsServiceError::aws_error(
545                StatusCode::BAD_REQUEST,
546                "ResourceAlreadyExistsException",
547                format!("The specified log stream already exists: {stream_name}"),
548            ));
549        }
550
551        let arn = format!(
552            "arn:aws:logs:{region}:{account_id}:log-group:{group_name}:log-stream:{stream_name}",
553        );
554        let now = Utc::now().timestamp_millis();
555
556        group.log_streams.insert(
557            stream_name.clone(),
558            LogStream {
559                name: stream_name,
560                arn,
561                creation_time: now,
562                first_event_timestamp: None,
563                last_event_timestamp: None,
564                last_ingestion_time: None,
565                upload_sequence_token: generate_sequence_token(),
566                events: Vec::new(),
567            },
568        );
569
570        Ok(AwsResponse::json(StatusCode::OK, "{}"))
571    }
572
573    fn delete_log_stream(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
574        let body = body_json(req);
575        let group_name = body["logGroupName"].as_str().ok_or_else(|| {
576            AwsServiceError::aws_error(
577                StatusCode::BAD_REQUEST,
578                "InvalidParameterException",
579                "logGroupName is required",
580            )
581        })?;
582        let stream_name = body["logStreamName"].as_str().ok_or_else(|| {
583            AwsServiceError::aws_error(
584                StatusCode::BAD_REQUEST,
585                "InvalidParameterException",
586                "logStreamName is required",
587            )
588        })?;
589
590        validate_string_length("logGroupName", group_name, 1, 512)?;
591        validate_string_length("logStreamName", stream_name, 1, 512)?;
592
593        let mut state = self.state.write();
594        let group = state.log_groups.get_mut(group_name).ok_or_else(|| {
595            AwsServiceError::aws_error(
596                StatusCode::BAD_REQUEST,
597                "ResourceNotFoundException",
598                format!("The specified log group does not exist: {group_name}"),
599            )
600        })?;
601
602        if group.log_streams.remove(stream_name).is_none() {
603            return Err(AwsServiceError::aws_error(
604                StatusCode::BAD_REQUEST,
605                "ResourceNotFoundException",
606                format!("The specified log stream does not exist: {stream_name}"),
607            ));
608        }
609
610        Ok(AwsResponse::json(StatusCode::OK, "{}"))
611    }
612
613    fn describe_log_streams(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
614        let body = body_json(req);
615
616        // Support both logGroupName and logGroupIdentifier
617        let group_name = if let Some(name) = body["logGroupName"].as_str() {
618            name.to_string()
619        } else if let Some(identifier) = body["logGroupIdentifier"].as_str() {
620            // Validate: must not end with :*
621            if identifier.ends_with(":*") {
622                return Err(AwsServiceError::aws_error(
623                    StatusCode::BAD_REQUEST,
624                    "InvalidParameterException",
625                    format!(
626                        "1 validation error detected: Value '{}' at 'logGroupIdentifier' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\w#+=/:,.@-]*",
627                        identifier
628                    ),
629                ));
630            }
631            // If it's an ARN, extract the log group name
632            if identifier.starts_with("arn:aws:logs:") {
633                extract_log_group_from_arn(identifier).unwrap_or_else(|| identifier.to_string())
634            } else {
635                identifier.to_string()
636            }
637        } else {
638            return Err(AwsServiceError::aws_error(
639                StatusCode::BAD_REQUEST,
640                "InvalidParameterException",
641                "logGroupName is required",
642            ));
643        };
644
645        let prefix = body["logStreamNamePrefix"].as_str().unwrap_or("");
646        let limit = body["limit"].as_i64().unwrap_or(50) as usize;
647        let order_by = body["orderBy"].as_str().unwrap_or("LogStreamName");
648        let next_token = body["nextToken"].as_str();
649
650        validate_optional_string_length("logGroupName", body["logGroupName"].as_str(), 1, 512)?;
651        validate_optional_string_length(
652            "logGroupIdentifier",
653            body["logGroupIdentifier"].as_str(),
654            1,
655            2048,
656        )?;
657        validate_optional_string_length(
658            "logStreamNamePrefix",
659            body["logStreamNamePrefix"].as_str(),
660            1,
661            512,
662        )?;
663
664        // Validate limit
665        if limit > 50 {
666            return Err(validation_error(
667                "limit",
668                &limit.to_string(),
669                "Member must have value less than or equal to 50",
670            ));
671        }
672
673        // Validate orderBy
674        if order_by != "LogStreamName" && order_by != "LastEventTime" {
675            return Err(validation_error(
676                "orderBy",
677                order_by,
678                "Member must satisfy enum value set: [LogStreamName, LastEventTime]",
679            ));
680        }
681
682        // Cannot use prefix with LastEventTime ordering
683        if order_by == "LastEventTime" && !prefix.is_empty() {
684            return Err(AwsServiceError::aws_error(
685                StatusCode::BAD_REQUEST,
686                "InvalidParameterException",
687                "Cannot order by LastEventTime with a logStreamNamePrefix.",
688            ));
689        }
690
691        let state = self.state.read();
692        let group = state.log_groups.get(group_name.as_str()).ok_or_else(|| {
693            AwsServiceError::aws_error(
694                StatusCode::BAD_REQUEST,
695                "ResourceNotFoundException",
696                format!("The specified log group does not exist: {group_name}"),
697            )
698        })?;
699
700        let mut streams: Vec<&LogStream> = group
701            .log_streams
702            .values()
703            .filter(|s| prefix.is_empty() || s.name.starts_with(prefix))
704            .collect();
705        streams.sort_by(|a, b| a.name.cmp(&b.name));
706
707        // Handle pagination with token format: logGroupName@lastStreamName
708        let start_idx = if let Some(token) = next_token {
709            if let Some((_group, last_stream)) = token.split_once('@') {
710                streams
711                    .iter()
712                    .position(|s| s.name.as_str() > last_stream)
713                    .unwrap_or(streams.len())
714            } else {
715                streams.len() // invalid token -> empty results
716            }
717        } else {
718            0
719        };
720
721        let page = &streams[start_idx..];
722        let has_more = page.len() > limit;
723        let page = if has_more { &page[..limit] } else { page };
724
725        let log_streams: Vec<Value> = page
726            .iter()
727            .map(|s| {
728                let mut obj = json!({
729                    "logStreamName": s.name,
730                    "arn": s.arn,
731                    "creationTime": s.creation_time,
732                    "uploadSequenceToken": s.upload_sequence_token,
733                });
734                if let Some(ts) = s.first_event_timestamp {
735                    obj["firstEventTimestamp"] = json!(ts);
736                }
737                if let Some(ts) = s.last_event_timestamp {
738                    obj["lastEventTimestamp"] = json!(ts);
739                }
740                if let Some(ts) = s.last_ingestion_time {
741                    obj["lastIngestionTime"] = json!(ts);
742                }
743                obj
744            })
745            .collect();
746
747        let mut result = json!({ "logStreams": log_streams });
748        if has_more {
749            if let Some(last) = page.last() {
750                result["nextToken"] = json!(format!("{}@{}", group_name, last.name));
751            }
752        }
753
754        Ok(AwsResponse::json(
755            StatusCode::OK,
756            serde_json::to_string(&result).unwrap(),
757        ))
758    }
759
760    // ---- Log Events ----
761
762    fn put_log_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
763        let body = body_json(req);
764        let group_name = body["logGroupName"].as_str().ok_or_else(|| {
765            AwsServiceError::aws_error(
766                StatusCode::BAD_REQUEST,
767                "InvalidParameterException",
768                "logGroupName is required",
769            )
770        })?;
771        let stream_name = body["logStreamName"].as_str().ok_or_else(|| {
772            AwsServiceError::aws_error(
773                StatusCode::BAD_REQUEST,
774                "InvalidParameterException",
775                "logStreamName is required",
776            )
777        })?;
778
779        validate_string_length("logGroupName", group_name, 1, 512)?;
780        validate_string_length("logStreamName", stream_name, 1, 512)?;
781
782        let log_events = body["logEvents"].as_array().ok_or_else(|| {
783            AwsServiceError::aws_error(
784                StatusCode::BAD_REQUEST,
785                "InvalidParameterException",
786                "logEvents is required",
787            )
788        })?;
789
790        let now = Utc::now().timestamp_millis();
791
792        // Check chronological order
793        let timestamps: Vec<i64> = log_events
794            .iter()
795            .map(|e| e["timestamp"].as_i64().unwrap_or(now))
796            .collect();
797        for i in 1..timestamps.len() {
798            if timestamps[i] < timestamps[i - 1] {
799                return Err(AwsServiceError::aws_error(
800                    StatusCode::BAD_REQUEST,
801                    "InvalidParameterException",
802                    "Log events in a single PutLogEvents request must be in chronological order.",
803                ));
804            }
805        }
806
807        // Check for too old (14 days) and too new (2 hours) events
808        let fourteen_days_ms = 14 * 24 * 60 * 60 * 1000i64;
809        let two_hours_ms = 2 * 60 * 60 * 1000i64;
810        let mut too_old_end_index: Option<usize> = None;
811        let mut too_new_start_index: Option<usize> = None;
812
813        for (i, ts) in timestamps.iter().enumerate() {
814            if now.saturating_sub(*ts) > fourteen_days_ms {
815                too_old_end_index = Some(i);
816            } else if ts.saturating_sub(now) > two_hours_ms && too_new_start_index.is_none() {
817                too_new_start_index = Some(i);
818            }
819        }
820
821        // Build events list (only accepted ones)
822        let mut new_events: Vec<LogEvent> = Vec::new();
823        let mut rejected_info = json!({});
824        let mut has_rejected = false;
825
826        for (i, e) in log_events.iter().enumerate() {
827            let ts = e["timestamp"].as_i64().unwrap_or(now);
828            let is_too_old = too_old_end_index.is_some() && i <= too_old_end_index.unwrap();
829            let is_too_new = too_new_start_index.is_some() && i >= too_new_start_index.unwrap();
830
831            if is_too_old || is_too_new {
832                continue;
833            }
834
835            new_events.push(LogEvent {
836                timestamp: ts,
837                message: e["message"].as_str().unwrap_or("").to_string(),
838                ingestion_time: now,
839            });
840        }
841
842        if let Some(idx) = too_old_end_index {
843            rejected_info["tooOldLogEventEndIndex"] = json!(idx);
844            has_rejected = true;
845        }
846        if let Some(idx) = too_new_start_index {
847            rejected_info["tooNewLogEventStartIndex"] = json!(idx);
848            has_rejected = true;
849        }
850
851        let mut state = self.state.write();
852        let group = state.log_groups.get_mut(group_name).ok_or_else(|| {
853            AwsServiceError::aws_error(
854                StatusCode::BAD_REQUEST,
855                "ResourceNotFoundException",
856                format!("The specified log group does not exist: {group_name}"),
857            )
858        })?;
859
860        let stream = group.log_streams.get_mut(stream_name).ok_or_else(|| {
861            AwsServiceError::aws_error(
862                StatusCode::BAD_REQUEST,
863                "ResourceNotFoundException",
864                "The specified log stream does not exist.",
865            )
866        })?;
867
868        // Update stream metadata
869        for event in &new_events {
870            if stream.first_event_timestamp.is_none()
871                || Some(event.timestamp) < stream.first_event_timestamp
872            {
873                stream.first_event_timestamp = Some(event.timestamp);
874            }
875            if stream.last_event_timestamp.is_none()
876                || Some(event.timestamp) > stream.last_event_timestamp
877            {
878                stream.last_event_timestamp = Some(event.timestamp);
879            }
880            group.stored_bytes += event.message.len() as i64 + 26;
881        }
882        stream.last_ingestion_time = Some(now);
883
884        // Generate new sequence token
885        stream.upload_sequence_token = generate_sequence_token();
886
887        stream.events.append(&mut new_events);
888        stream.events.sort_by_key(|e| e.timestamp);
889
890        let mut response = json!({
891            "nextSequenceToken": stream.upload_sequence_token,
892        });
893        if has_rejected {
894            response["rejectedLogEventsInfo"] = rejected_info;
895        }
896
897        Ok(AwsResponse::json(
898            StatusCode::OK,
899            serde_json::to_string(&response).unwrap(),
900        ))
901    }
902
903    fn get_log_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
904        let body = body_json(req);
905
906        // Support both logGroupName and logGroupIdentifier
907        let group_name = if let Some(name) = body["logGroupName"].as_str() {
908            name.to_string()
909        } else if let Some(identifier) = body["logGroupIdentifier"].as_str() {
910            if identifier.ends_with(":*") {
911                return Err(AwsServiceError::aws_error(
912                    StatusCode::BAD_REQUEST,
913                    "InvalidParameterException",
914                    format!(
915                        "1 validation error detected: Value '{}' at 'logGroupIdentifier' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\w#+=/:,.@-]*",
916                        identifier
917                    ),
918                ));
919            }
920            if identifier.starts_with("arn:aws:logs:") {
921                extract_log_group_from_arn(identifier).unwrap_or_else(|| identifier.to_string())
922            } else {
923                identifier.to_string()
924            }
925        } else {
926            return Err(AwsServiceError::aws_error(
927                StatusCode::BAD_REQUEST,
928                "InvalidParameterException",
929                "logGroupName is required",
930            ));
931        };
932
933        let stream_name = body["logStreamName"].as_str().ok_or_else(|| {
934            AwsServiceError::aws_error(
935                StatusCode::BAD_REQUEST,
936                "InvalidParameterException",
937                "logStreamName is required",
938            )
939        })?;
940
941        validate_optional_string_length("logGroupName", body["logGroupName"].as_str(), 1, 512)?;
942        validate_optional_string_length(
943            "logGroupIdentifier",
944            body["logGroupIdentifier"].as_str(),
945            1,
946            2048,
947        )?;
948        validate_string_length("logStreamName", stream_name, 1, 512)?;
949
950        let start_time = body["startTime"].as_i64();
951        let end_time = body["endTime"].as_i64();
952        let limit = body["limit"].as_i64().unwrap_or(10000) as usize;
953        let start_from_head = body["startFromHead"].as_bool().unwrap_or(false);
954        let next_token = body["nextToken"].as_str();
955
956        // Validate limit
957        if limit > 10000 {
958            return Err(validation_error(
959                "limit",
960                &limit.to_string(),
961                "Member must have value less than or equal to 10000",
962            ));
963        }
964
965        // Validate nextToken format
966        if let Some(token) = next_token {
967            if !token.starts_with("f/") && !token.starts_with("b/") {
968                return Err(AwsServiceError::aws_error(
969                    StatusCode::BAD_REQUEST,
970                    "InvalidParameterException",
971                    "The specified nextToken is invalid.",
972                ));
973            }
974            let num_part = &token[2..];
975            if num_part.len() != 56 || num_part.parse::<u128>().is_err() {
976                return Err(AwsServiceError::aws_error(
977                    StatusCode::BAD_REQUEST,
978                    "InvalidParameterException",
979                    "The specified nextToken is invalid.",
980                ));
981            }
982        }
983
984        let state = self.state.read();
985        let group = state.log_groups.get(group_name.as_str()).ok_or_else(|| {
986            AwsServiceError::aws_error(
987                StatusCode::BAD_REQUEST,
988                "ResourceNotFoundException",
989                format!("The specified log group does not exist: {group_name}"),
990            )
991        })?;
992
993        let stream = group.log_streams.get(stream_name).ok_or_else(|| {
994            AwsServiceError::aws_error(
995                StatusCode::BAD_REQUEST,
996                "ResourceNotFoundException",
997                format!("The specified log stream does not exist: {stream_name}"),
998            )
999        })?;
1000
1001        // All events are indexed 0..n
1002        let all_events: Vec<&LogEvent> = stream
1003            .events
1004            .iter()
1005            .filter(|e| {
1006                if let Some(start) = start_time {
1007                    if e.timestamp < start {
1008                        return false;
1009                    }
1010                }
1011                if let Some(end) = end_time {
1012                    if e.timestamp >= end {
1013                        return false;
1014                    }
1015                }
1016                true
1017            })
1018            .collect();
1019
1020        let total = all_events.len();
1021
1022        // Determine start position from token
1023        let (start_idx, is_forward) = if let Some(token) = next_token {
1024            let is_forward = token.starts_with("f/");
1025            let idx: usize = token[2..].parse().unwrap_or(0);
1026            if is_forward {
1027                // Forward token: start from idx+1
1028                (idx + 1, true)
1029            } else {
1030                // Backward token: end at idx (exclusive), so start at max(0, idx-limit)
1031                (idx, false)
1032            }
1033        } else {
1034            (0, start_from_head)
1035        };
1036
1037        let events_slice: Vec<&LogEvent>;
1038        let next_forward_idx: usize;
1039        let next_backward_idx: usize;
1040
1041        if is_forward || start_from_head && next_token.is_none() {
1042            // Forward: from start_idx, take limit
1043            let end_idx = std::cmp::min(start_idx + limit, total);
1044            if start_idx >= total {
1045                events_slice = Vec::new();
1046                let last_idx = if total > 0 { total - 1 } else { 0 };
1047                next_forward_idx = last_idx;
1048                next_backward_idx = last_idx;
1049            } else {
1050                events_slice = all_events[start_idx..end_idx].to_vec();
1051                next_forward_idx = end_idx - 1;
1052                next_backward_idx = start_idx;
1053            }
1054        } else {
1055            // Backward (default): from end, take last `limit` events
1056            if next_token.is_some() {
1057                // Backward token: start_idx is the position, go backward `limit` from here
1058                let begin = start_idx.saturating_sub(limit);
1059                let end_idx = start_idx;
1060                if begin >= total || end_idx > total || begin >= end_idx {
1061                    events_slice = Vec::new();
1062                    next_forward_idx = start_idx;
1063                    next_backward_idx = start_idx;
1064                } else {
1065                    events_slice = all_events[begin..end_idx].to_vec();
1066                    next_forward_idx = end_idx - 1;
1067                    next_backward_idx = begin;
1068                }
1069            } else {
1070                // No token, not start_from_head: return last `limit` events
1071                let begin = total.saturating_sub(limit);
1072                events_slice = all_events[begin..].to_vec();
1073                next_forward_idx = if total > 0 { total - 1 } else { 0 };
1074                next_backward_idx = begin;
1075            }
1076        }
1077
1078        let events_json: Vec<Value> = events_slice
1079            .iter()
1080            .map(|e| {
1081                json!({
1082                    "timestamp": e.timestamp,
1083                    "message": e.message,
1084                    "ingestionTime": e.ingestion_time,
1085                })
1086            })
1087            .collect();
1088
1089        let forward_token = format!("f/{:056}", next_forward_idx);
1090        let backward_token = format!("b/{:056}", next_backward_idx);
1091
1092        Ok(AwsResponse::json(
1093            StatusCode::OK,
1094            serde_json::to_string(&json!({
1095                "events": events_json,
1096                "nextForwardToken": forward_token,
1097                "nextBackwardToken": backward_token,
1098            }))
1099            .unwrap(),
1100        ))
1101    }
1102
1103    fn filter_log_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1104        let body = body_json(req);
1105        let log_group_identifier = body["logGroupIdentifier"].as_str();
1106        let log_group_name = body["logGroupName"].as_str();
1107        let filter_pattern = body["filterPattern"].as_str().unwrap_or("");
1108        let start_time = body["startTime"].as_i64();
1109        let end_time = body["endTime"].as_i64();
1110        let limit = body["limit"].as_i64().unwrap_or(10000) as usize;
1111        let next_token = body["nextToken"].as_str();
1112        let stream_names: Vec<&str> = body["logStreamNames"]
1113            .as_array()
1114            .map(|a| a.iter().filter_map(|v| v.as_str()).collect())
1115            .unwrap_or_default();
1116        let stream_name_prefix = body["logStreamNamePrefix"].as_str().unwrap_or("");
1117
1118        if let Some(name) = log_group_name {
1119            validate_string_length("logGroupName", name, 1, 512)?;
1120        }
1121        validate_optional_string_length("logGroupIdentifier", log_group_identifier, 1, 2048)?;
1122        validate_optional_string_length(
1123            "logStreamNamePrefix",
1124            body["logStreamNamePrefix"].as_str(),
1125            1,
1126            512,
1127        )?;
1128        validate_optional_string_length("filterPattern", Some(filter_pattern), 0, 1024)?;
1129
1130        // Resolve the effective log group name: logGroupIdentifier takes precedence,
1131        // and can be either a name or an ARN.
1132        let resolved_group_name = if let Some(identifier) = log_group_identifier {
1133            if identifier.starts_with("arn:") {
1134                extract_log_group_from_arn(identifier).ok_or_else(|| {
1135                    AwsServiceError::aws_error(
1136                        StatusCode::BAD_REQUEST,
1137                        "InvalidParameterException",
1138                        format!("Invalid ARN: {identifier}"),
1139                    )
1140                })?
1141            } else {
1142                identifier.to_string()
1143            }
1144        } else if let Some(name) = log_group_name {
1145            name.to_string()
1146        } else {
1147            return Err(AwsServiceError::aws_error(
1148                StatusCode::BAD_REQUEST,
1149                "InvalidParameterException",
1150                "Either logGroupName or logGroupIdentifier is required",
1151            ));
1152        };
1153
1154        // Validate limit
1155        if limit > 10000 {
1156            return Err(validation_error(
1157                "limit",
1158                &limit.to_string(),
1159                "Member must have value less than or equal to 10000",
1160            ));
1161        }
1162
1163        let state = self.state.read();
1164        let group = state
1165            .log_groups
1166            .get(resolved_group_name.as_str())
1167            .ok_or_else(|| {
1168                AwsServiceError::aws_error(
1169                    StatusCode::BAD_REQUEST,
1170                    "ResourceNotFoundException",
1171                    format!("The specified log group does not exist: {resolved_group_name}"),
1172                )
1173            })?;
1174
1175        let mut filtered_events: Vec<Value> = Vec::new();
1176
1177        let streams: Vec<(&String, &LogStream)> = if !stream_names.is_empty() {
1178            group
1179                .log_streams
1180                .iter()
1181                .filter(|(name, _)| stream_names.contains(&name.as_str()))
1182                .collect()
1183        } else if !stream_name_prefix.is_empty() {
1184            group
1185                .log_streams
1186                .iter()
1187                .filter(|(name, _)| name.starts_with(stream_name_prefix))
1188                .collect()
1189        } else {
1190            group.log_streams.iter().collect()
1191        };
1192
1193        for (_, stream) in streams {
1194            for event in &stream.events {
1195                if let Some(start) = start_time {
1196                    if event.timestamp < start {
1197                        continue;
1198                    }
1199                }
1200                if let Some(end) = end_time {
1201                    if event.timestamp >= end {
1202                        continue;
1203                    }
1204                }
1205                // Filter pattern matching
1206                if !filter_pattern.is_empty()
1207                    && !matches_filter_pattern(filter_pattern, &event.message)
1208                {
1209                    continue;
1210                }
1211
1212                let event_id = format!("{}-{}", stream.name, event.timestamp);
1213
1214                filtered_events.push(json!({
1215                    "logStreamName": stream.name,
1216                    "timestamp": event.timestamp,
1217                    "message": event.message,
1218                    "ingestionTime": event.ingestion_time,
1219                    "eventId": event_id,
1220                }));
1221            }
1222        }
1223
1224        filtered_events.sort_by_key(|e| e["timestamp"].as_i64().unwrap_or(0));
1225
1226        // Handle pagination
1227        // Token format: groupName@streamName@eventId
1228        let start_idx = if let Some(token) = next_token {
1229            let parts: Vec<&str> = token.splitn(3, '@').collect();
1230            if parts.len() == 3 {
1231                let after_event_id = parts[2];
1232                // Find the position after this eventId
1233                filtered_events
1234                    .iter()
1235                    .position(|e| e["eventId"].as_str().unwrap_or("") == after_event_id)
1236                    .map(|pos| pos + 1)
1237                    .unwrap_or(filtered_events.len())
1238            } else {
1239                filtered_events.len() // invalid token -> empty results
1240            }
1241        } else {
1242            0
1243        };
1244
1245        let remaining = &filtered_events[start_idx..];
1246        let has_more = remaining.len() > limit;
1247        let page: Vec<Value> = if has_more {
1248            remaining[..limit].to_vec()
1249        } else {
1250            remaining.to_vec()
1251        };
1252
1253        let mut result = json!({
1254            "events": page,
1255            "searchedLogStreams": [],
1256        });
1257
1258        if has_more {
1259            if let Some(last) = page.last() {
1260                let event_id = last["eventId"].as_str().unwrap_or("");
1261                result["nextToken"] = json!(format!(
1262                    "{}@{}@{}",
1263                    resolved_group_name,
1264                    last["logStreamName"].as_str().unwrap_or(""),
1265                    event_id
1266                ));
1267            }
1268        }
1269
1270        Ok(AwsResponse::json(
1271            StatusCode::OK,
1272            serde_json::to_string(&result).unwrap(),
1273        ))
1274    }
1275
1276    // ---- Tags (legacy) ----
1277
1278    fn tag_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1279        let body = body_json(req);
1280        let name = body["logGroupName"].as_str().ok_or_else(|| {
1281            AwsServiceError::aws_error(
1282                StatusCode::BAD_REQUEST,
1283                "InvalidParameterException",
1284                "logGroupName is required",
1285            )
1286        })?;
1287
1288        validate_string_length("logGroupName", name, 1, 512)?;
1289
1290        let tags = body["tags"].as_object().ok_or_else(|| {
1291            AwsServiceError::aws_error(
1292                StatusCode::BAD_REQUEST,
1293                "InvalidParameterException",
1294                "tags is required",
1295            )
1296        })?;
1297
1298        let mut state = self.state.write();
1299        let group = state.log_groups.get_mut(name).ok_or_else(|| {
1300            AwsServiceError::aws_error(
1301                StatusCode::BAD_REQUEST,
1302                "ResourceNotFoundException",
1303                format!("The specified log group does not exist: {name}"),
1304            )
1305        })?;
1306
1307        for (k, v) in tags {
1308            group
1309                .tags
1310                .insert(k.clone(), v.as_str().unwrap_or("").to_string());
1311        }
1312
1313        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1314    }
1315
1316    fn untag_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1317        let body = body_json(req);
1318        let name = body["logGroupName"].as_str().ok_or_else(|| {
1319            AwsServiceError::aws_error(
1320                StatusCode::BAD_REQUEST,
1321                "InvalidParameterException",
1322                "logGroupName is required",
1323            )
1324        })?;
1325
1326        validate_string_length("logGroupName", name, 1, 512)?;
1327
1328        let keys = body["tags"].as_array().ok_or_else(|| {
1329            AwsServiceError::aws_error(
1330                StatusCode::BAD_REQUEST,
1331                "InvalidParameterException",
1332                "tags is required",
1333            )
1334        })?;
1335
1336        let mut state = self.state.write();
1337        let group = state.log_groups.get_mut(name).ok_or_else(|| {
1338            AwsServiceError::aws_error(
1339                StatusCode::BAD_REQUEST,
1340                "ResourceNotFoundException",
1341                format!("The specified log group does not exist: {name}"),
1342            )
1343        })?;
1344
1345        for key in keys {
1346            if let Some(k) = key.as_str() {
1347                group.tags.remove(k);
1348            }
1349        }
1350
1351        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1352    }
1353
1354    fn list_tags_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1355        let body = body_json(req);
1356        let name = body["logGroupName"].as_str().ok_or_else(|| {
1357            AwsServiceError::aws_error(
1358                StatusCode::BAD_REQUEST,
1359                "InvalidParameterException",
1360                "logGroupName is required",
1361            )
1362        })?;
1363
1364        validate_string_length("logGroupName", name, 1, 512)?;
1365
1366        let state = self.state.read();
1367        let group = state.log_groups.get(name).ok_or_else(|| {
1368            AwsServiceError::aws_error(
1369                StatusCode::BAD_REQUEST,
1370                "ResourceNotFoundException",
1371                format!("The specified log group does not exist: {name}"),
1372            )
1373        })?;
1374
1375        Ok(AwsResponse::json(
1376            StatusCode::OK,
1377            serde_json::to_string(&json!({ "tags": group.tags })).unwrap(),
1378        ))
1379    }
1380
1381    // ---- Tags (new API: TagResource/UntagResource/ListTagsForResource) ----
1382
1383    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1384        let body = body_json(req);
1385        let arn = body["resourceArn"].as_str().ok_or_else(|| {
1386            AwsServiceError::aws_error(
1387                StatusCode::BAD_REQUEST,
1388                "InvalidParameterException",
1389                "resourceArn is required",
1390            )
1391        })?;
1392
1393        validate_string_length("resourceArn", arn, 1, 1011)?;
1394
1395        let tags = body["tags"].as_object().ok_or_else(|| {
1396            AwsServiceError::aws_error(
1397                StatusCode::BAD_REQUEST,
1398                "InvalidParameterException",
1399                "tags is required",
1400            )
1401        })?;
1402
1403        let new_tags: std::collections::HashMap<String, String> = tags
1404            .iter()
1405            .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
1406            .collect();
1407
1408        let mut state = self.state.write();
1409
1410        // Try log group
1411        if let Some(group) = state
1412            .log_groups
1413            .values_mut()
1414            .find(|g| g.arn == arn || g.arn.trim_end_matches(":*") == arn)
1415        {
1416            for (k, v) in new_tags {
1417                group.tags.insert(k, v);
1418            }
1419            return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1420        }
1421
1422        // Try destination
1423        if let Some(dest) = state.destinations.values_mut().find(|d| d.arn == arn) {
1424            for (k, v) in new_tags {
1425                dest.tags.insert(k, v);
1426            }
1427            return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1428        }
1429
1430        Err(AwsServiceError::aws_error(
1431            StatusCode::BAD_REQUEST,
1432            "ResourceNotFoundException",
1433            format!("The specified resource does not exist: {arn}"),
1434        ))
1435    }
1436
1437    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1438        let body = body_json(req);
1439        let arn = body["resourceArn"].as_str().ok_or_else(|| {
1440            AwsServiceError::aws_error(
1441                StatusCode::BAD_REQUEST,
1442                "InvalidParameterException",
1443                "resourceArn is required",
1444            )
1445        })?;
1446
1447        validate_string_length("resourceArn", arn, 1, 1011)?;
1448
1449        let tag_keys = body["tagKeys"].as_array().ok_or_else(|| {
1450            AwsServiceError::aws_error(
1451                StatusCode::BAD_REQUEST,
1452                "InvalidParameterException",
1453                "tagKeys is required",
1454            )
1455        })?;
1456
1457        let keys: Vec<String> = tag_keys
1458            .iter()
1459            .filter_map(|v| v.as_str().map(|s| s.to_string()))
1460            .collect();
1461
1462        let mut state = self.state.write();
1463
1464        // Try log group
1465        if let Some(group) = state
1466            .log_groups
1467            .values_mut()
1468            .find(|g| g.arn == arn || g.arn.trim_end_matches(":*") == arn)
1469        {
1470            for k in &keys {
1471                group.tags.remove(k);
1472            }
1473            return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1474        }
1475
1476        // Try destination
1477        if let Some(dest) = state.destinations.values_mut().find(|d| d.arn == arn) {
1478            for k in &keys {
1479                dest.tags.remove(k);
1480            }
1481            return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1482        }
1483
1484        Err(AwsServiceError::aws_error(
1485            StatusCode::BAD_REQUEST,
1486            "ResourceNotFoundException",
1487            format!("The specified resource does not exist: {arn}"),
1488        ))
1489    }
1490
1491    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1492        let body = body_json(req);
1493        let arn = body["resourceArn"].as_str().ok_or_else(|| {
1494            AwsServiceError::aws_error(
1495                StatusCode::BAD_REQUEST,
1496                "InvalidParameterException",
1497                "resourceArn is required",
1498            )
1499        })?;
1500
1501        validate_string_length("resourceArn", arn, 1, 1011)?;
1502
1503        let state = self.state.read();
1504
1505        // Try log group
1506        if let Some(group) = state
1507            .log_groups
1508            .values()
1509            .find(|g| g.arn == arn || g.arn.trim_end_matches(":*") == arn)
1510        {
1511            return Ok(AwsResponse::json(
1512                StatusCode::OK,
1513                serde_json::to_string(&json!({ "tags": group.tags })).unwrap(),
1514            ));
1515        }
1516
1517        // Try destination
1518        if let Some(dest) = state.destinations.values().find(|d| d.arn == arn) {
1519            return Ok(AwsResponse::json(
1520                StatusCode::OK,
1521                serde_json::to_string(&json!({ "tags": dest.tags })).unwrap(),
1522            ));
1523        }
1524
1525        Err(AwsServiceError::aws_error(
1526            StatusCode::BAD_REQUEST,
1527            "ResourceNotFoundException",
1528            format!("The specified resource does not exist: {arn}"),
1529        ))
1530    }
1531
1532    // ---- Retention Policy ----
1533
1534    fn put_retention_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1535        let body = body_json(req);
1536        let name = body["logGroupName"].as_str().ok_or_else(|| {
1537            AwsServiceError::aws_error(
1538                StatusCode::BAD_REQUEST,
1539                "InvalidParameterException",
1540                "logGroupName is required",
1541            )
1542        })?;
1543
1544        validate_string_length("logGroupName", name, 1, 512)?;
1545
1546        let days = body["retentionInDays"].as_i64().ok_or_else(|| {
1547            AwsServiceError::aws_error(
1548                StatusCode::BAD_REQUEST,
1549                "InvalidParameterException",
1550                "retentionInDays is required",
1551            )
1552        })?;
1553
1554        let mut state = self.state.write();
1555        let group = state.log_groups.get_mut(name).ok_or_else(|| {
1556            AwsServiceError::aws_error(
1557                StatusCode::BAD_REQUEST,
1558                "ResourceNotFoundException",
1559                format!("The specified log group does not exist: {name}"),
1560            )
1561        })?;
1562
1563        group.retention_in_days = Some(days as i32);
1564
1565        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1566    }
1567
1568    fn delete_retention_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1569        let body = body_json(req);
1570        let name = body["logGroupName"].as_str().ok_or_else(|| {
1571            AwsServiceError::aws_error(
1572                StatusCode::BAD_REQUEST,
1573                "InvalidParameterException",
1574                "logGroupName is required",
1575            )
1576        })?;
1577
1578        validate_string_length("logGroupName", name, 1, 512)?;
1579
1580        let mut state = self.state.write();
1581        let group = state.log_groups.get_mut(name).ok_or_else(|| {
1582            AwsServiceError::aws_error(
1583                StatusCode::BAD_REQUEST,
1584                "ResourceNotFoundException",
1585                format!("The specified log group does not exist: {name}"),
1586            )
1587        })?;
1588
1589        group.retention_in_days = None;
1590
1591        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1592    }
1593
1594    // ---- Subscription Filters ----
1595
1596    fn put_subscription_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1597        let body = body_json(req);
1598        let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1599            AwsServiceError::aws_error(
1600                StatusCode::BAD_REQUEST,
1601                "InvalidParameterException",
1602                "logGroupName is required",
1603            )
1604        })?;
1605        let filter_name = body["filterName"]
1606            .as_str()
1607            .ok_or_else(|| {
1608                AwsServiceError::aws_error(
1609                    StatusCode::BAD_REQUEST,
1610                    "InvalidParameterException",
1611                    "filterName is required",
1612                )
1613            })?
1614            .to_string();
1615        let filter_pattern = body["filterPattern"].as_str().unwrap_or("").to_string();
1616        let destination_arn = body["destinationArn"]
1617            .as_str()
1618            .ok_or_else(|| {
1619                AwsServiceError::aws_error(
1620                    StatusCode::BAD_REQUEST,
1621                    "InvalidParameterException",
1622                    "destinationArn is required",
1623                )
1624            })?
1625            .to_string();
1626        let role_arn = body["roleArn"].as_str().map(|s| s.to_string());
1627        let distribution = body["distribution"]
1628            .as_str()
1629            .unwrap_or("ByLogStream")
1630            .to_string();
1631
1632        validate_string_length("logGroupName", log_group_name, 1, 512)?;
1633        validate_string_length("filterName", &filter_name, 1, 512)?;
1634        validate_optional_string_length("filterPattern", Some(&filter_pattern), 0, 1024)?;
1635
1636        let mut state = self.state.write();
1637        let group = state.log_groups.get_mut(log_group_name).ok_or_else(|| {
1638            AwsServiceError::aws_error(
1639                StatusCode::BAD_REQUEST,
1640                "ResourceNotFoundException",
1641                "The specified log group does not exist.",
1642            )
1643        })?;
1644
1645        // Check if updating existing filter
1646        if let Some(existing) = group
1647            .subscription_filters
1648            .iter_mut()
1649            .find(|f| f.filter_name == filter_name)
1650        {
1651            existing.filter_pattern = filter_pattern;
1652            existing.destination_arn = destination_arn;
1653            existing.role_arn = role_arn;
1654            existing.distribution = distribution;
1655            return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1656        }
1657
1658        // Max 2 subscription filters per log group
1659        if group.subscription_filters.len() >= 2 {
1660            return Err(AwsServiceError::aws_error(
1661                StatusCode::BAD_REQUEST,
1662                "LimitExceededException",
1663                "Resource limit exceeded.",
1664            ));
1665        }
1666
1667        let now = Utc::now().timestamp_millis();
1668        group.subscription_filters.push(SubscriptionFilter {
1669            filter_name,
1670            log_group_name: log_group_name.to_string(),
1671            filter_pattern,
1672            destination_arn,
1673            role_arn,
1674            distribution,
1675            creation_time: now,
1676        });
1677
1678        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1679    }
1680
1681    fn describe_subscription_filters(
1682        &self,
1683        req: &AwsRequest,
1684    ) -> Result<AwsResponse, AwsServiceError> {
1685        let body = body_json(req);
1686        let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1687            AwsServiceError::aws_error(
1688                StatusCode::BAD_REQUEST,
1689                "InvalidParameterException",
1690                "logGroupName is required",
1691            )
1692        })?;
1693
1694        validate_string_length("logGroupName", log_group_name, 1, 512)?;
1695        validate_optional_string_length(
1696            "filterNamePrefix",
1697            body["filterNamePrefix"].as_str(),
1698            1,
1699            512,
1700        )?;
1701
1702        let state = self.state.read();
1703        let group = state.log_groups.get(log_group_name).ok_or_else(|| {
1704            AwsServiceError::aws_error(
1705                StatusCode::BAD_REQUEST,
1706                "ResourceNotFoundException",
1707                "The specified log group does not exist.",
1708            )
1709        })?;
1710
1711        let filters: Vec<Value> = group
1712            .subscription_filters
1713            .iter()
1714            .map(|f| {
1715                let mut obj = json!({
1716                    "filterName": f.filter_name,
1717                    "logGroupName": f.log_group_name,
1718                    "filterPattern": f.filter_pattern,
1719                    "destinationArn": f.destination_arn,
1720                    "distribution": f.distribution,
1721                    "creationTime": f.creation_time,
1722                });
1723                if let Some(ref arn) = f.role_arn {
1724                    obj["roleArn"] = json!(arn);
1725                }
1726                obj
1727            })
1728            .collect();
1729
1730        Ok(AwsResponse::json(
1731            StatusCode::OK,
1732            serde_json::to_string(&json!({ "subscriptionFilters": filters })).unwrap(),
1733        ))
1734    }
1735
1736    fn delete_subscription_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1737        let body = body_json(req);
1738        let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1739            AwsServiceError::aws_error(
1740                StatusCode::BAD_REQUEST,
1741                "InvalidParameterException",
1742                "logGroupName is required",
1743            )
1744        })?;
1745        let filter_name = body["filterName"].as_str().ok_or_else(|| {
1746            AwsServiceError::aws_error(
1747                StatusCode::BAD_REQUEST,
1748                "InvalidParameterException",
1749                "filterName is required",
1750            )
1751        })?;
1752
1753        validate_string_length("logGroupName", log_group_name, 1, 512)?;
1754        validate_string_length("filterName", filter_name, 1, 512)?;
1755
1756        let mut state = self.state.write();
1757        let group = state.log_groups.get_mut(log_group_name).ok_or_else(|| {
1758            AwsServiceError::aws_error(
1759                StatusCode::BAD_REQUEST,
1760                "ResourceNotFoundException",
1761                "The specified log group does not exist.",
1762            )
1763        })?;
1764
1765        let idx = group
1766            .subscription_filters
1767            .iter()
1768            .position(|f| f.filter_name == filter_name)
1769            .ok_or_else(|| {
1770                AwsServiceError::aws_error(
1771                    StatusCode::BAD_REQUEST,
1772                    "ResourceNotFoundException",
1773                    "The specified subscription filter does not exist.",
1774                )
1775            })?;
1776
1777        group.subscription_filters.remove(idx);
1778
1779        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1780    }
1781
1782    // ---- Metric Filters ----
1783
1784    fn put_metric_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1785        let body = body_json(req);
1786        let filter_name = body["filterName"]
1787            .as_str()
1788            .ok_or_else(|| {
1789                AwsServiceError::aws_error(
1790                    StatusCode::BAD_REQUEST,
1791                    "InvalidParameterException",
1792                    "filterName is required",
1793                )
1794            })?
1795            .to_string();
1796        validate_required("filterPattern", &body["filterPattern"])?;
1797        let filter_pattern = body["filterPattern"].as_str().unwrap_or("").to_string();
1798        let log_group_name = body["logGroupName"]
1799            .as_str()
1800            .ok_or_else(|| {
1801                AwsServiceError::aws_error(
1802                    StatusCode::BAD_REQUEST,
1803                    "InvalidParameterException",
1804                    "logGroupName is required",
1805                )
1806            })?
1807            .to_string();
1808
1809        validate_string_length("filterName", &filter_name, 1, 512)?;
1810        validate_string_length("logGroupName", &log_group_name, 1, 512)?;
1811        validate_optional_string_length("filterPattern", Some(&filter_pattern), 0, 1024)?;
1812        validate_optional_string_length(
1813            "fieldSelectionCriteria",
1814            body["fieldSelectionCriteria"].as_str(),
1815            0,
1816            2000,
1817        )?;
1818
1819        let transformations_json = body["metricTransformations"].as_array().ok_or_else(|| {
1820            AwsServiceError::aws_error(
1821                StatusCode::BAD_REQUEST,
1822                "InvalidParameterException",
1823                "metricTransformations is required",
1824            )
1825        })?;
1826
1827        // Validate max 1 transformation
1828        if transformations_json.len() > 1 {
1829            return Err(validation_error(
1830                "metricTransformations",
1831                &format!("{}", transformations_json.len()),
1832                "Member must have length less than or equal to 1",
1833            ));
1834        }
1835
1836        let transformations: Vec<MetricTransformation> = transformations_json
1837            .iter()
1838            .map(|t| MetricTransformation {
1839                metric_name: t["metricName"].as_str().unwrap_or("").to_string(),
1840                metric_namespace: t["metricNamespace"].as_str().unwrap_or("").to_string(),
1841                metric_value: t["metricValue"].as_str().unwrap_or("").to_string(),
1842                default_value: t["defaultValue"].as_f64(),
1843            })
1844            .collect();
1845
1846        let now = Utc::now().timestamp_millis();
1847
1848        let mut state = self.state.write();
1849
1850        // Update existing or add new
1851        if let Some(existing) = state
1852            .metric_filters
1853            .iter_mut()
1854            .find(|f| f.filter_name == filter_name && f.log_group_name == log_group_name)
1855        {
1856            existing.filter_pattern = filter_pattern;
1857            existing.metric_transformations = transformations;
1858        } else {
1859            state.metric_filters.push(MetricFilter {
1860                filter_name,
1861                filter_pattern,
1862                log_group_name,
1863                metric_transformations: transformations,
1864                creation_time: now,
1865            });
1866        }
1867
1868        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1869    }
1870
1871    fn describe_metric_filters(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1872        let body = body_json(req);
1873        let filter_name_prefix = body["filterNamePrefix"].as_str();
1874        let log_group_name = body["logGroupName"].as_str();
1875        let metric_name = body["metricName"].as_str();
1876        let metric_namespace = body["metricNamespace"].as_str();
1877
1878        validate_optional_string_length("filterNamePrefix", filter_name_prefix, 1, 512)?;
1879        validate_optional_string_length("logGroupName", log_group_name, 1, 512)?;
1880        validate_optional_string_length("metricName", metric_name, 0, 255)?;
1881        validate_optional_string_length("metricNamespace", metric_namespace, 0, 255)?;
1882        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
1883        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
1884
1885        let state = self.state.read();
1886        let filters: Vec<Value> = state
1887            .metric_filters
1888            .iter()
1889            .filter(|f| {
1890                if let Some(prefix) = filter_name_prefix {
1891                    if !f.filter_name.starts_with(prefix) {
1892                        return false;
1893                    }
1894                }
1895                if let Some(lg) = log_group_name {
1896                    if f.log_group_name != lg {
1897                        return false;
1898                    }
1899                }
1900                if let Some(mn) = metric_name {
1901                    if !f.metric_transformations.iter().any(|t| t.metric_name == mn) {
1902                        return false;
1903                    }
1904                }
1905                if let Some(ns) = metric_namespace {
1906                    if !f
1907                        .metric_transformations
1908                        .iter()
1909                        .any(|t| t.metric_namespace == ns)
1910                    {
1911                        return false;
1912                    }
1913                }
1914                true
1915            })
1916            .map(|f| {
1917                let transformations: Vec<Value> = f
1918                    .metric_transformations
1919                    .iter()
1920                    .map(|t| {
1921                        let mut obj = json!({
1922                            "metricName": t.metric_name,
1923                            "metricNamespace": t.metric_namespace,
1924                            "metricValue": t.metric_value,
1925                        });
1926                        if let Some(dv) = t.default_value {
1927                            obj["defaultValue"] = json!(dv);
1928                        }
1929                        obj
1930                    })
1931                    .collect();
1932
1933                json!({
1934                    "filterName": f.filter_name,
1935                    "filterPattern": f.filter_pattern,
1936                    "logGroupName": f.log_group_name,
1937                    "metricTransformations": transformations,
1938                    "creationTime": f.creation_time,
1939                })
1940            })
1941            .collect();
1942
1943        Ok(AwsResponse::json(
1944            StatusCode::OK,
1945            serde_json::to_string(&json!({ "metricFilters": filters })).unwrap(),
1946        ))
1947    }
1948
1949    fn delete_metric_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1950        let body = body_json(req);
1951        let filter_name = body["filterName"].as_str().ok_or_else(|| {
1952            AwsServiceError::aws_error(
1953                StatusCode::BAD_REQUEST,
1954                "InvalidParameterException",
1955                "filterName is required",
1956            )
1957        })?;
1958        let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1959            AwsServiceError::aws_error(
1960                StatusCode::BAD_REQUEST,
1961                "InvalidParameterException",
1962                "logGroupName is required",
1963            )
1964        })?;
1965
1966        validate_string_length("filterName", filter_name, 1, 512)?;
1967        validate_string_length("logGroupName", log_group_name, 1, 512)?;
1968
1969        let mut state = self.state.write();
1970        let idx = state
1971            .metric_filters
1972            .iter()
1973            .position(|f| f.filter_name == filter_name && f.log_group_name == log_group_name);
1974
1975        if let Some(i) = idx {
1976            state.metric_filters.remove(i);
1977        }
1978
1979        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1980    }
1981
1982    // ---- Resource Policies ----
1983
1984    fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1985        let body = body_json(req);
1986        let policy_name = body["policyName"]
1987            .as_str()
1988            .ok_or_else(|| {
1989                AwsServiceError::aws_error(
1990                    StatusCode::BAD_REQUEST,
1991                    "InvalidParameterException",
1992                    "policyName is required",
1993                )
1994            })?
1995            .to_string();
1996        let policy_document = body["policyDocument"]
1997            .as_str()
1998            .ok_or_else(|| {
1999                AwsServiceError::aws_error(
2000                    StatusCode::BAD_REQUEST,
2001                    "InvalidParameterException",
2002                    "policyDocument is required",
2003                )
2004            })?
2005            .to_string();
2006
2007        let now = Utc::now().timestamp_millis();
2008
2009        let mut state = self.state.write();
2010
2011        // Check limit (10 per region) only if adding new
2012        if !state.resource_policies.contains_key(&policy_name)
2013            && state.resource_policies.len() >= 10
2014        {
2015            return Err(AwsServiceError::aws_error(
2016                StatusCode::BAD_REQUEST,
2017                "LimitExceededException",
2018                "Resource limit exceeded.",
2019            ));
2020        }
2021
2022        let policy = ResourcePolicy {
2023            policy_name: policy_name.clone(),
2024            policy_document: policy_document.clone(),
2025            last_updated_time: now,
2026        };
2027
2028        state.resource_policies.insert(policy_name.clone(), policy);
2029
2030        Ok(AwsResponse::json(
2031            StatusCode::OK,
2032            serde_json::to_string(&json!({
2033                "resourcePolicy": {
2034                    "policyName": policy_name,
2035                    "policyDocument": policy_document,
2036                    "lastUpdatedTime": now,
2037                }
2038            }))
2039            .unwrap(),
2040        ))
2041    }
2042
2043    fn describe_resource_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2044        let body = body_json(req);
2045        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2046        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2047        validate_optional_enum_value(
2048            "policyScope",
2049            &body["policyScope"],
2050            &["ACCOUNT", "RESOURCE"],
2051        )?;
2052        let state = self.state.read();
2053
2054        let mut policies: Vec<Value> = state
2055            .resource_policies
2056            .values()
2057            .map(|p| {
2058                json!({
2059                    "policyName": p.policy_name,
2060                    "policyDocument": p.policy_document,
2061                    "lastUpdatedTime": p.last_updated_time,
2062                })
2063            })
2064            .collect();
2065        policies.sort_by(|a, b| {
2066            a["policyName"]
2067                .as_str()
2068                .unwrap()
2069                .cmp(b["policyName"].as_str().unwrap())
2070        });
2071
2072        Ok(AwsResponse::json(
2073            StatusCode::OK,
2074            serde_json::to_string(&json!({ "resourcePolicies": policies })).unwrap(),
2075        ))
2076    }
2077
2078    fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2079        let body = body_json(req);
2080        let policy_name = body["policyName"].as_str().ok_or_else(|| {
2081            AwsServiceError::aws_error(
2082                StatusCode::BAD_REQUEST,
2083                "InvalidParameterException",
2084                "policyName is required",
2085            )
2086        })?;
2087
2088        let mut state = self.state.write();
2089        if state.resource_policies.remove(policy_name).is_none() {
2090            return Err(AwsServiceError::aws_error(
2091                StatusCode::BAD_REQUEST,
2092                "ResourceNotFoundException",
2093                format!("Policy with name [{policy_name}] does not exist"),
2094            ));
2095        }
2096
2097        Ok(AwsResponse::json(StatusCode::OK, "{}"))
2098    }
2099
2100    // ---- Destinations ----
2101
2102    fn put_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2103        let body = body_json(req);
2104        let destination_name = body["destinationName"]
2105            .as_str()
2106            .ok_or_else(|| {
2107                AwsServiceError::aws_error(
2108                    StatusCode::BAD_REQUEST,
2109                    "InvalidParameterException",
2110                    "destinationName is required",
2111                )
2112            })?
2113            .to_string();
2114        let target_arn = body["targetArn"]
2115            .as_str()
2116            .ok_or_else(|| {
2117                AwsServiceError::aws_error(
2118                    StatusCode::BAD_REQUEST,
2119                    "InvalidParameterException",
2120                    "targetArn is required",
2121                )
2122            })?
2123            .to_string();
2124        let role_arn = body["roleArn"]
2125            .as_str()
2126            .ok_or_else(|| {
2127                AwsServiceError::aws_error(
2128                    StatusCode::BAD_REQUEST,
2129                    "InvalidParameterException",
2130                    "roleArn is required",
2131                )
2132            })?
2133            .to_string();
2134
2135        validate_string_length("destinationName", &destination_name, 1, 512)?;
2136        validate_string_length("targetArn", &target_arn, 1, 2048)?;
2137        validate_string_length("roleArn", &role_arn, 1, 2048)?;
2138
2139        let tags: std::collections::HashMap<String, String> = body["tags"]
2140            .as_object()
2141            .map(|m| {
2142                m.iter()
2143                    .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
2144                    .collect()
2145            })
2146            .unwrap_or_default();
2147
2148        let mut state = self.state.write();
2149        let arn = format!(
2150            "arn:aws:logs:{}:{}:destination:{}",
2151            state.region, state.account_id, destination_name
2152        );
2153        let now = Utc::now().timestamp_millis();
2154
2155        // Update or create
2156        let access_policy = state
2157            .destinations
2158            .get(&destination_name)
2159            .and_then(|d| d.access_policy.clone());
2160
2161        let dest = Destination {
2162            destination_name: destination_name.clone(),
2163            target_arn: target_arn.clone(),
2164            role_arn: role_arn.clone(),
2165            arn: arn.clone(),
2166            access_policy,
2167            creation_time: now,
2168            tags: tags.clone(),
2169        };
2170
2171        state.destinations.insert(destination_name.clone(), dest);
2172
2173        let dest_json = json!({
2174            "destinationName": destination_name,
2175            "targetArn": target_arn,
2176            "roleArn": role_arn,
2177            "arn": arn,
2178            "creationTime": now,
2179        });
2180
2181        Ok(AwsResponse::json(
2182            StatusCode::OK,
2183            serde_json::to_string(&json!({ "destination": dest_json })).unwrap(),
2184        ))
2185    }
2186
2187    fn describe_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2188        let body = body_json(req);
2189        let prefix = body["DestinationNamePrefix"].as_str().unwrap_or("");
2190
2191        validate_optional_string_length(
2192            "DestinationNamePrefix",
2193            body["DestinationNamePrefix"].as_str(),
2194            1,
2195            512,
2196        )?;
2197        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2198        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2199
2200        let state = self.state.read();
2201        let destinations: Vec<Value> = state
2202            .destinations
2203            .values()
2204            .filter(|d| prefix.is_empty() || d.destination_name.starts_with(prefix))
2205            .map(|d| {
2206                let mut obj = json!({
2207                    "destinationName": d.destination_name,
2208                    "targetArn": d.target_arn,
2209                    "roleArn": d.role_arn,
2210                    "arn": d.arn,
2211                    "creationTime": d.creation_time,
2212                });
2213                if let Some(ref policy) = d.access_policy {
2214                    obj["accessPolicy"] = json!(policy);
2215                }
2216                obj
2217            })
2218            .collect();
2219
2220        Ok(AwsResponse::json(
2221            StatusCode::OK,
2222            serde_json::to_string(&json!({ "destinations": destinations })).unwrap(),
2223        ))
2224    }
2225
2226    fn delete_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2227        let body = body_json(req);
2228        let name = body["destinationName"].as_str().ok_or_else(|| {
2229            AwsServiceError::aws_error(
2230                StatusCode::BAD_REQUEST,
2231                "InvalidParameterException",
2232                "destinationName is required",
2233            )
2234        })?;
2235
2236        validate_string_length("destinationName", name, 1, 512)?;
2237
2238        let mut state = self.state.write();
2239        if state.destinations.remove(name).is_none() {
2240            return Err(AwsServiceError::aws_error(
2241                StatusCode::BAD_REQUEST,
2242                "ResourceNotFoundException",
2243                format!("The specified destination does not exist: {name}"),
2244            ));
2245        }
2246
2247        Ok(AwsResponse::json(StatusCode::OK, "{}"))
2248    }
2249
2250    fn put_destination_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2251        let body = body_json(req);
2252        let name = body["destinationName"].as_str().ok_or_else(|| {
2253            AwsServiceError::aws_error(
2254                StatusCode::BAD_REQUEST,
2255                "InvalidParameterException",
2256                "destinationName is required",
2257            )
2258        })?;
2259
2260        validate_string_length("destinationName", name, 1, 512)?;
2261
2262        let policy = body["accessPolicy"].as_str().ok_or_else(|| {
2263            AwsServiceError::aws_error(
2264                StatusCode::BAD_REQUEST,
2265                "InvalidParameterException",
2266                "accessPolicy is required",
2267            )
2268        })?;
2269
2270        validate_string_length("accessPolicy", policy, 1, 5120)?;
2271
2272        let mut state = self.state.write();
2273        let dest = state.destinations.get_mut(name).ok_or_else(|| {
2274            AwsServiceError::aws_error(
2275                StatusCode::BAD_REQUEST,
2276                "ResourceNotFoundException",
2277                format!("The specified destination does not exist: {name}"),
2278            )
2279        })?;
2280
2281        dest.access_policy = Some(policy.to_string());
2282
2283        Ok(AwsResponse::json(StatusCode::OK, "{}"))
2284    }
2285
2286    // ---- Queries ----
2287
2288    fn start_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2289        let body = body_json(req);
2290        let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
2291            AwsServiceError::aws_error(
2292                StatusCode::BAD_REQUEST,
2293                "InvalidParameterException",
2294                "logGroupName is required",
2295            )
2296        })?;
2297        let start_time = body["startTime"].as_i64().unwrap_or(0);
2298        let end_time = body["endTime"].as_i64().unwrap_or(0);
2299        let query_string = body["queryString"].as_str().unwrap_or("").to_string();
2300
2301        validate_string_length("logGroupName", log_group_name, 1, 512)?;
2302        validate_optional_string_length("queryString", Some(&query_string), 0, 10000)?;
2303
2304        let mut state = self.state.write();
2305
2306        // Verify log group exists
2307        if !state.log_groups.contains_key(log_group_name) {
2308            return Err(AwsServiceError::aws_error(
2309                StatusCode::BAD_REQUEST,
2310                "ResourceNotFoundException",
2311                "The specified log group does not exist.",
2312            ));
2313        }
2314
2315        let query_id = uuid::Uuid::new_v4().to_string();
2316        let now = Utc::now().timestamp_millis();
2317
2318        state.queries.insert(
2319            query_id.clone(),
2320            QueryInfo {
2321                query_id: query_id.clone(),
2322                log_group_name: log_group_name.to_string(),
2323                query_string,
2324                start_time,
2325                end_time,
2326                status: "Complete".to_string(),
2327                create_time: now,
2328            },
2329        );
2330
2331        Ok(AwsResponse::json(
2332            StatusCode::OK,
2333            serde_json::to_string(&json!({ "queryId": query_id })).unwrap(),
2334        ))
2335    }
2336
2337    fn get_query_results(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2338        let body = body_json(req);
2339        let query_id = body["queryId"].as_str().ok_or_else(|| {
2340            AwsServiceError::aws_error(
2341                StatusCode::BAD_REQUEST,
2342                "InvalidParameterException",
2343                "queryId is required",
2344            )
2345        })?;
2346
2347        validate_string_length("queryId", query_id, 1, 256)?;
2348
2349        let state = self.state.read();
2350        let query = state.queries.get(query_id).ok_or_else(|| {
2351            AwsServiceError::aws_error(
2352                StatusCode::BAD_REQUEST,
2353                "ResourceNotFoundException",
2354                "The specified query does not exist.",
2355            )
2356        })?;
2357
2358        // Find matching log events
2359        let mut results: Vec<Value> = Vec::new();
2360        if let Some(group) = state.log_groups.get(&query.log_group_name) {
2361            for stream in group.log_streams.values() {
2362                for event in &stream.events {
2363                    // Convert timestamps: query uses seconds, events use milliseconds
2364                    let event_time_secs = event.timestamp / 1000;
2365                    if event_time_secs >= query.start_time && event_time_secs < query.end_time {
2366                        results.push(json!([
2367                            {"field": "@message", "value": event.message},
2368                            {"field": "@ptr", "value": format!("{}/{}", stream.name, event.timestamp)},
2369                        ]));
2370                    }
2371                }
2372            }
2373        }
2374
2375        // Sort by @message value
2376        results.sort_by(|a, b| {
2377            let a_msg = a[0]["value"].as_str().unwrap_or("");
2378            let b_msg = b[0]["value"].as_str().unwrap_or("");
2379            a_msg.cmp(b_msg)
2380        });
2381
2382        Ok(AwsResponse::json(
2383            StatusCode::OK,
2384            serde_json::to_string(&json!({
2385                "status": query.status,
2386                "results": results,
2387                "statistics": {
2388                    "recordsMatched": results.len() as f64,
2389                    "recordsScanned": results.len() as f64,
2390                    "bytesScanned": 0.0,
2391                },
2392            }))
2393            .unwrap(),
2394        ))
2395    }
2396
2397    fn describe_queries(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2398        let body = body_json(req);
2399        let log_group_name = body["logGroupName"].as_str();
2400        let status_filter = body["status"].as_str();
2401
2402        validate_optional_string_length("logGroupName", log_group_name, 1, 512)?;
2403        validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 1000)?;
2404        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2405        validate_optional_enum_value(
2406            "status",
2407            &body["status"],
2408            &[
2409                "Scheduled",
2410                "Running",
2411                "Complete",
2412                "Failed",
2413                "Cancelled",
2414                "Timeout",
2415                "Unknown",
2416            ],
2417        )?;
2418        validate_optional_enum_value(
2419            "queryLanguage",
2420            &body["queryLanguage"],
2421            &["CWLI", "SQL", "PPL"],
2422        )?;
2423
2424        let state = self.state.read();
2425        let queries: Vec<Value> = state
2426            .queries
2427            .values()
2428            .filter(|q| {
2429                if let Some(lg) = log_group_name {
2430                    if q.log_group_name != lg {
2431                        return false;
2432                    }
2433                }
2434                if let Some(status) = status_filter {
2435                    if q.status != status {
2436                        return false;
2437                    }
2438                }
2439                true
2440            })
2441            .map(|q| {
2442                json!({
2443                    "queryId": q.query_id,
2444                    "queryString": q.query_string,
2445                    "status": q.status,
2446                    "createTime": q.create_time,
2447                    "logGroupName": q.log_group_name,
2448                })
2449            })
2450            .collect();
2451
2452        Ok(AwsResponse::json(
2453            StatusCode::OK,
2454            serde_json::to_string(&json!({ "queries": queries })).unwrap(),
2455        ))
2456    }
2457
2458    // ---- Export Tasks ----
2459
2460    fn create_export_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2461        let body = body_json(req);
2462        let log_group_name = body["logGroupName"]
2463            .as_str()
2464            .ok_or_else(|| {
2465                AwsServiceError::aws_error(
2466                    StatusCode::BAD_REQUEST,
2467                    "InvalidParameterException",
2468                    "logGroupName is required",
2469                )
2470            })?
2471            .to_string();
2472        let from_time = body["from"].as_i64().unwrap_or(0);
2473        let to_time = body["to"].as_i64().unwrap_or(0);
2474        let destination = body["destination"]
2475            .as_str()
2476            .ok_or_else(|| {
2477                AwsServiceError::aws_error(
2478                    StatusCode::BAD_REQUEST,
2479                    "InvalidParameterException",
2480                    "destination is required",
2481                )
2482            })?
2483            .to_string();
2484        let destination_prefix = body["destinationPrefix"]
2485            .as_str()
2486            .unwrap_or("exportedlogs")
2487            .to_string();
2488
2489        validate_string_length("logGroupName", &log_group_name, 1, 512)?;
2490        validate_optional_string_length("taskName", body["taskName"].as_str(), 1, 512)?;
2491        validate_optional_string_length(
2492            "logStreamNamePrefix",
2493            body["logStreamNamePrefix"].as_str(),
2494            1,
2495            512,
2496        )?;
2497        validate_string_length("destination", &destination, 1, 512)?;
2498
2499        let state = self.state.read();
2500        if !state.log_groups.contains_key(&log_group_name) {
2501            return Err(AwsServiceError::aws_error(
2502                StatusCode::BAD_REQUEST,
2503                "ResourceNotFoundException",
2504                "The specified log group does not exist.",
2505            ));
2506        }
2507        drop(state);
2508
2509        let task_name = body["taskName"].as_str().map(|s| s.to_string());
2510        let log_stream_name_prefix = body["logStreamNamePrefix"].as_str().map(|s| s.to_string());
2511
2512        let task_id = uuid::Uuid::new_v4().to_string();
2513        let (status_code, status_message) = if from_time < to_time {
2514            (
2515                "COMPLETED".to_string(),
2516                "Completed successfully".to_string(),
2517            )
2518        } else {
2519            ("active".to_string(), "Task is active".to_string())
2520        };
2521
2522        let mut state = self.state.write();
2523        state.export_tasks.push(ExportTask {
2524            task_id: task_id.clone(),
2525            task_name,
2526            log_group_name,
2527            log_stream_name_prefix,
2528            from_time,
2529            to_time,
2530            destination,
2531            destination_prefix,
2532            status_code,
2533            status_message,
2534        });
2535
2536        Ok(AwsResponse::json(
2537            StatusCode::OK,
2538            serde_json::to_string(&json!({ "taskId": task_id })).unwrap(),
2539        ))
2540    }
2541
2542    fn describe_export_tasks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2543        let body = body_json(req);
2544        let task_id_filter = body["taskId"].as_str();
2545
2546        validate_optional_string_length("taskId", task_id_filter, 1, 512)?;
2547        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2548        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2549        validate_optional_enum_value(
2550            "statusCode",
2551            &body["statusCode"],
2552            &[
2553                "CANCELLED",
2554                "COMPLETED",
2555                "FAILED",
2556                "PENDING",
2557                "PENDING_CANCEL",
2558                "RUNNING",
2559            ],
2560        )?;
2561
2562        let state = self.state.read();
2563
2564        if let Some(task_id) = task_id_filter {
2565            let task = state.export_tasks.iter().find(|t| t.task_id == task_id);
2566            if task.is_none() {
2567                return Err(AwsServiceError::aws_error(
2568                    StatusCode::BAD_REQUEST,
2569                    "ResourceNotFoundException",
2570                    "The specified export task does not exist.",
2571                ));
2572            }
2573        }
2574
2575        let tasks: Vec<Value> = state
2576            .export_tasks
2577            .iter()
2578            .filter(|t| {
2579                if let Some(tid) = task_id_filter {
2580                    t.task_id == tid
2581                } else {
2582                    true
2583                }
2584            })
2585            .map(|t| {
2586                let mut obj = json!({
2587                    "taskId": t.task_id,
2588                    "logGroupName": t.log_group_name,
2589                    "from": t.from_time,
2590                    "to": t.to_time,
2591                    "destination": t.destination,
2592                    "destinationPrefix": t.destination_prefix,
2593                    "status": {
2594                        "code": t.status_code,
2595                        "message": t.status_message,
2596                    },
2597                });
2598                if let Some(ref name) = t.task_name {
2599                    obj["taskName"] = json!(name);
2600                }
2601                if let Some(ref prefix) = t.log_stream_name_prefix {
2602                    obj["logStreamNamePrefix"] = json!(prefix);
2603                }
2604                obj
2605            })
2606            .collect();
2607
2608        Ok(AwsResponse::json(
2609            StatusCode::OK,
2610            serde_json::to_string(&json!({ "exportTasks": tasks })).unwrap(),
2611        ))
2612    }
2613
2614    fn cancel_export_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2615        let body = body_json(req);
2616        let task_id = body["taskId"].as_str().ok_or_else(|| {
2617            AwsServiceError::aws_error(
2618                StatusCode::BAD_REQUEST,
2619                "InvalidParameterException",
2620                "taskId is required",
2621            )
2622        })?;
2623
2624        validate_string_length("taskId", task_id, 1, 512)?;
2625
2626        let mut state = self.state.write();
2627        let task = state
2628            .export_tasks
2629            .iter_mut()
2630            .find(|t| t.task_id == task_id)
2631            .ok_or_else(|| {
2632                AwsServiceError::aws_error(
2633                    StatusCode::BAD_REQUEST,
2634                    "ResourceNotFoundException",
2635                    "The specified export task does not exist.",
2636                )
2637            })?;
2638
2639        task.status_code = "CANCELLED".to_string();
2640        task.status_message = "Task was cancelled".to_string();
2641
2642        Ok(AwsResponse::json(StatusCode::OK, "{}"))
2643    }
2644
2645    // ---- Delivery Destinations ----
2646
2647    fn put_delivery_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2648        let body = body_json(req);
2649        let name = body["name"]
2650            .as_str()
2651            .ok_or_else(|| {
2652                AwsServiceError::aws_error(
2653                    StatusCode::BAD_REQUEST,
2654                    "InvalidParameterException",
2655                    "name is required",
2656                )
2657            })?
2658            .to_string();
2659
2660        validate_string_length("name", &name, 1, 60)?;
2661
2662        validate_optional_enum_value(
2663            "deliveryDestinationType",
2664            &body["deliveryDestinationType"],
2665            &["S3", "CWL", "FH", "XRAY"],
2666        )?;
2667
2668        let output_format = body["outputFormat"].as_str().map(|s| s.to_string());
2669
2670        // Validate output format
2671        if let Some(ref fmt) = output_format {
2672            let valid = ["json", "plain", "w3c", "raw", "parquet"];
2673            if !valid.contains(&fmt.as_str()) {
2674                return Err(AwsServiceError::aws_error(
2675                    StatusCode::BAD_REQUEST,
2676                    "ValidationException",
2677                    format!("1 validation error detected: Value '{fmt}' at 'outputFormat' failed to satisfy constraint: Member must satisfy enum value set: [json, plain, w3c, raw, parquet]"),
2678                ));
2679            }
2680        }
2681
2682        let config: std::collections::HashMap<String, String> = body
2683            ["deliveryDestinationConfiguration"]
2684            .as_object()
2685            .map(|m| {
2686                m.iter()
2687                    .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
2688                    .collect()
2689            })
2690            .unwrap_or_default();
2691
2692        let tags: std::collections::HashMap<String, String> = body["tags"]
2693            .as_object()
2694            .map(|m| {
2695                m.iter()
2696                    .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
2697                    .collect()
2698            })
2699            .unwrap_or_default();
2700
2701        let mut state = self.state.write();
2702
2703        // Check if updating - cannot change output format
2704        if let Some(existing) = state.delivery_destinations.get(&name) {
2705            if let Some(ref new_fmt) = output_format {
2706                if let Some(ref existing_fmt) = existing.output_format {
2707                    if new_fmt != existing_fmt {
2708                        return Err(AwsServiceError::aws_error(
2709                            StatusCode::BAD_REQUEST,
2710                            "ValidationException",
2711                            "Cannot update outputFormat for an existing delivery destination.",
2712                        ));
2713                    }
2714                }
2715            }
2716        }
2717
2718        let arn = format!(
2719            "arn:aws:logs:{}:{}:delivery-destination:{}",
2720            state.region, state.account_id, name
2721        );
2722
2723        let existing_policy = state
2724            .delivery_destinations
2725            .get(&name)
2726            .and_then(|d| d.delivery_destination_policy.clone());
2727
2728        let dd = DeliveryDestination {
2729            name: name.clone(),
2730            arn: arn.clone(),
2731            output_format: output_format.clone(),
2732            delivery_destination_configuration: config.clone(),
2733            tags: tags.clone(),
2734            delivery_destination_policy: existing_policy,
2735        };
2736
2737        state.delivery_destinations.insert(name.clone(), dd);
2738
2739        // Build the configuration object for the response, preserving existing fields
2740        // and always including destinationResourceArn (Smithy shape requires string, not null)
2741        let config_resp = {
2742            let mut c: serde_json::Map<String, Value> =
2743                config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
2744            c.entry("destinationResourceArn".to_string())
2745                .or_insert_with(|| json!(""));
2746            Value::Object(c)
2747        };
2748
2749        let mut resp = json!({
2750            "deliveryDestination": {
2751                "name": name,
2752                "arn": arn,
2753                "deliveryDestinationConfiguration": config_resp,
2754            }
2755        });
2756        if let Some(ref fmt) = output_format {
2757            resp["deliveryDestination"]["outputFormat"] = json!(fmt);
2758        }
2759        if !tags.is_empty() {
2760            resp["deliveryDestination"]["tags"] = json!(tags);
2761        }
2762
2763        Ok(AwsResponse::json(
2764            StatusCode::OK,
2765            serde_json::to_string(&resp).unwrap(),
2766        ))
2767    }
2768
2769    fn get_delivery_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2770        let body = body_json(req);
2771        let name = body["name"].as_str().ok_or_else(|| {
2772            AwsServiceError::aws_error(
2773                StatusCode::BAD_REQUEST,
2774                "InvalidParameterException",
2775                "name is required",
2776            )
2777        })?;
2778
2779        validate_string_length("name", name, 1, 60)?;
2780
2781        let state = self.state.read();
2782        let dd = state.delivery_destinations.get(name).ok_or_else(|| {
2783            AwsServiceError::aws_error(
2784                StatusCode::BAD_REQUEST,
2785                "ResourceNotFoundException",
2786                format!("Delivery destination '{name}' does not exist."),
2787            )
2788        })?;
2789
2790        let mut obj = json!({
2791            "name": dd.name,
2792            "arn": dd.arn,
2793            "deliveryDestinationConfiguration": dd_config_json(&dd.delivery_destination_configuration),
2794        });
2795        if let Some(ref fmt) = dd.output_format {
2796            obj["outputFormat"] = json!(fmt);
2797        }
2798
2799        Ok(AwsResponse::json(
2800            StatusCode::OK,
2801            serde_json::to_string(&json!({ "deliveryDestination": obj })).unwrap(),
2802        ))
2803    }
2804
2805    fn describe_delivery_destinations(
2806        &self,
2807        req: &AwsRequest,
2808    ) -> Result<AwsResponse, AwsServiceError> {
2809        let body = body_json(req);
2810        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2811        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2812
2813        let state = self.state.read();
2814        let dds: Vec<Value> = state
2815            .delivery_destinations
2816            .values()
2817            .map(|dd| {
2818                let mut obj = json!({
2819                    "name": dd.name,
2820                    "arn": dd.arn,
2821                    "deliveryDestinationConfiguration": dd_config_json(&dd.delivery_destination_configuration),
2822                });
2823                if let Some(ref fmt) = dd.output_format {
2824                    obj["outputFormat"] = json!(fmt);
2825                }
2826                obj
2827            })
2828            .collect();
2829
2830        Ok(AwsResponse::json(
2831            StatusCode::OK,
2832            serde_json::to_string(&json!({ "deliveryDestinations": dds })).unwrap(),
2833        ))
2834    }
2835
2836    fn delete_delivery_destination(
2837        &self,
2838        req: &AwsRequest,
2839    ) -> Result<AwsResponse, AwsServiceError> {
2840        let body = body_json(req);
2841        let name = body["name"].as_str().ok_or_else(|| {
2842            AwsServiceError::aws_error(
2843                StatusCode::BAD_REQUEST,
2844                "InvalidParameterException",
2845                "name is required",
2846            )
2847        })?;
2848
2849        validate_string_length("name", name, 1, 60)?;
2850
2851        let mut state = self.state.write();
2852        if state.delivery_destinations.remove(name).is_none() {
2853            return Err(AwsServiceError::aws_error(
2854                StatusCode::BAD_REQUEST,
2855                "ResourceNotFoundException",
2856                format!("Delivery destination '{name}' does not exist."),
2857            ));
2858        }
2859
2860        Ok(AwsResponse::json(StatusCode::OK, "{}"))
2861    }
2862
2863    fn put_delivery_destination_policy(
2864        &self,
2865        req: &AwsRequest,
2866    ) -> Result<AwsResponse, AwsServiceError> {
2867        let body = body_json(req);
2868        let name = body["deliveryDestinationName"].as_str().ok_or_else(|| {
2869            AwsServiceError::aws_error(
2870                StatusCode::BAD_REQUEST,
2871                "InvalidParameterException",
2872                "deliveryDestinationName is required",
2873            )
2874        })?;
2875        let policy = body["deliveryDestinationPolicy"]
2876            .as_str()
2877            .ok_or_else(|| {
2878                AwsServiceError::aws_error(
2879                    StatusCode::BAD_REQUEST,
2880                    "InvalidParameterException",
2881                    "deliveryDestinationPolicy is required",
2882                )
2883            })?
2884            .to_string();
2885
2886        validate_string_length("deliveryDestinationName", name, 1, 60)?;
2887        validate_string_length("deliveryDestinationPolicy", &policy, 1, 51200)?;
2888
2889        let mut state = self.state.write();
2890        let dd = state.delivery_destinations.get_mut(name).ok_or_else(|| {
2891            AwsServiceError::aws_error(
2892                StatusCode::BAD_REQUEST,
2893                "ResourceNotFoundException",
2894                format!("Delivery destination '{name}' does not exist."),
2895            )
2896        })?;
2897
2898        dd.delivery_destination_policy = Some(policy.clone());
2899
2900        Ok(AwsResponse::json(
2901            StatusCode::OK,
2902            serde_json::to_string(&json!({
2903                "policy": {
2904                    "deliveryDestinationPolicy": policy,
2905                }
2906            }))
2907            .unwrap(),
2908        ))
2909    }
2910
2911    fn get_delivery_destination_policy(
2912        &self,
2913        req: &AwsRequest,
2914    ) -> Result<AwsResponse, AwsServiceError> {
2915        let body = body_json(req);
2916        let name = body["deliveryDestinationName"].as_str().ok_or_else(|| {
2917            AwsServiceError::aws_error(
2918                StatusCode::BAD_REQUEST,
2919                "InvalidParameterException",
2920                "deliveryDestinationName is required",
2921            )
2922        })?;
2923
2924        validate_string_length("deliveryDestinationName", name, 1, 60)?;
2925
2926        let state = self.state.read();
2927        let dd = state.delivery_destinations.get(name).ok_or_else(|| {
2928            AwsServiceError::aws_error(
2929                StatusCode::BAD_REQUEST,
2930                "ResourceNotFoundException",
2931                format!("Delivery destination '{name}' does not exist."),
2932            )
2933        })?;
2934
2935        let policy_json = if let Some(ref policy) = dd.delivery_destination_policy {
2936            json!({
2937                "deliveryDestinationPolicy": policy,
2938            })
2939        } else {
2940            json!({})
2941        };
2942
2943        Ok(AwsResponse::json(
2944            StatusCode::OK,
2945            serde_json::to_string(&json!({
2946                "policy": policy_json,
2947            }))
2948            .unwrap(),
2949        ))
2950    }
2951
2952    fn delete_delivery_destination_policy(
2953        &self,
2954        req: &AwsRequest,
2955    ) -> Result<AwsResponse, AwsServiceError> {
2956        let body = body_json(req);
2957        let name = body["deliveryDestinationName"].as_str().ok_or_else(|| {
2958            AwsServiceError::aws_error(
2959                StatusCode::BAD_REQUEST,
2960                "InvalidParameterException",
2961                "deliveryDestinationName is required",
2962            )
2963        })?;
2964
2965        validate_string_length("deliveryDestinationName", name, 1, 60)?;
2966
2967        let mut state = self.state.write();
2968        let dd = state.delivery_destinations.get_mut(name).ok_or_else(|| {
2969            AwsServiceError::aws_error(
2970                StatusCode::BAD_REQUEST,
2971                "ResourceNotFoundException",
2972                format!("Delivery destination '{name}' does not exist."),
2973            )
2974        })?;
2975
2976        dd.delivery_destination_policy = None;
2977
2978        Ok(AwsResponse::json(StatusCode::OK, "{}"))
2979    }
2980
2981    // ---- Delivery Sources ----
2982
2983    fn put_delivery_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2984        let body = body_json(req);
2985        let name = body["name"]
2986            .as_str()
2987            .ok_or_else(|| {
2988                AwsServiceError::aws_error(
2989                    StatusCode::BAD_REQUEST,
2990                    "InvalidParameterException",
2991                    "name is required",
2992                )
2993            })?
2994            .to_string();
2995        let resource_arn = body["resourceArn"]
2996            .as_str()
2997            .ok_or_else(|| {
2998                AwsServiceError::aws_error(
2999                    StatusCode::BAD_REQUEST,
3000                    "InvalidParameterException",
3001                    "resourceArn is required",
3002                )
3003            })?
3004            .to_string();
3005        let log_type = body["logType"]
3006            .as_str()
3007            .ok_or_else(|| {
3008                AwsServiceError::aws_error(
3009                    StatusCode::BAD_REQUEST,
3010                    "InvalidParameterException",
3011                    "logType is required",
3012                )
3013            })?
3014            .to_string();
3015
3016        validate_string_length("name", &name, 1, 60)?;
3017        validate_string_length("logType", &log_type, 1, 255)?;
3018
3019        let tags: std::collections::HashMap<String, String> = body["tags"]
3020            .as_object()
3021            .map(|m| {
3022                m.iter()
3023                    .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
3024                    .collect()
3025            })
3026            .unwrap_or_default();
3027
3028        // Extract service from ARN
3029        let service = resource_arn
3030            .split(':')
3031            .nth(2)
3032            .unwrap_or("unknown")
3033            .to_string();
3034
3035        // Validate resource ARN format - must start with arn:aws:
3036        if !resource_arn.starts_with("arn:aws:") {
3037            return Err(AwsServiceError::aws_error(
3038                StatusCode::BAD_REQUEST,
3039                "ValidationException",
3040                format!("Invalid resource ARN: {resource_arn}"),
3041            ));
3042        }
3043
3044        // S3 cannot be a delivery source
3045        if service == "s3" {
3046            return Err(AwsServiceError::aws_error(
3047                StatusCode::BAD_REQUEST,
3048                "ResourceNotFoundException",
3049                format!("The resource ARN '{resource_arn}' is not a valid delivery source."),
3050            ));
3051        }
3052
3053        // Validate log type based on service
3054        let valid_log_types: &[&str] = match service.as_str() {
3055            "cloudfront" => &["ACCESS_LOGS"],
3056            _ => &["ACCESS_LOGS", "APPLICATION_LOGS", "FW_LOGS"],
3057        };
3058        if !valid_log_types.contains(&log_type.as_str()) {
3059            return Err(AwsServiceError::aws_error(
3060                StatusCode::BAD_REQUEST,
3061                "ValidationException",
3062                format!("Log type '{log_type}' is not valid for this resource."),
3063            ));
3064        }
3065
3066        let mut state = self.state.write();
3067
3068        // Cannot update with different resourceArn
3069        if let Some(existing) = state.delivery_sources.get(&name) {
3070            if !existing.resource_arns.is_empty() && existing.resource_arns[0] != resource_arn {
3071                return Err(AwsServiceError::aws_error(
3072                    StatusCode::BAD_REQUEST,
3073                    "ConflictException",
3074                    "Cannot update delivery source with a different resourceArn.",
3075                ));
3076            }
3077        }
3078
3079        let arn = format!(
3080            "arn:aws:logs:{}:{}:delivery-source:{}",
3081            state.region, state.account_id, name
3082        );
3083
3084        let ds = DeliverySource {
3085            name: name.clone(),
3086            arn: arn.clone(),
3087            resource_arns: vec![resource_arn],
3088            service: service.clone(),
3089            log_type: log_type.clone(),
3090            tags: tags.clone(),
3091        };
3092
3093        state.delivery_sources.insert(name.clone(), ds);
3094
3095        let state_ref = state.delivery_sources.get(&name).unwrap();
3096
3097        Ok(AwsResponse::json(
3098            StatusCode::OK,
3099            serde_json::to_string(&json!({
3100                "deliverySource": {
3101                    "name": state_ref.name,
3102                    "arn": state_ref.arn,
3103                    "resourceArns": state_ref.resource_arns,
3104                    "service": state_ref.service,
3105                    "logType": state_ref.log_type,
3106                    "tags": state_ref.tags,
3107                }
3108            }))
3109            .unwrap(),
3110        ))
3111    }
3112
3113    fn get_delivery_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3114        let body = body_json(req);
3115        let name = body["name"].as_str().ok_or_else(|| {
3116            AwsServiceError::aws_error(
3117                StatusCode::BAD_REQUEST,
3118                "InvalidParameterException",
3119                "name is required",
3120            )
3121        })?;
3122
3123        validate_string_length("name", name, 1, 60)?;
3124
3125        let state = self.state.read();
3126        let ds = state.delivery_sources.get(name).ok_or_else(|| {
3127            AwsServiceError::aws_error(
3128                StatusCode::BAD_REQUEST,
3129                "ResourceNotFoundException",
3130                format!("Delivery source '{name}' does not exist."),
3131            )
3132        })?;
3133
3134        Ok(AwsResponse::json(
3135            StatusCode::OK,
3136            serde_json::to_string(&json!({
3137                "deliverySource": {
3138                    "name": ds.name,
3139                    "arn": ds.arn,
3140                    "resourceArns": ds.resource_arns,
3141                    "service": ds.service,
3142                    "logType": ds.log_type,
3143                    "tags": ds.tags,
3144                }
3145            }))
3146            .unwrap(),
3147        ))
3148    }
3149
3150    fn describe_delivery_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3151        let body = body_json(req);
3152        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
3153        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
3154
3155        let state = self.state.read();
3156        let sources: Vec<Value> = state
3157            .delivery_sources
3158            .values()
3159            .map(|ds| {
3160                json!({
3161                    "name": ds.name,
3162                    "arn": ds.arn,
3163                    "resourceArns": ds.resource_arns,
3164                    "service": ds.service,
3165                    "logType": ds.log_type,
3166                    "tags": ds.tags,
3167                })
3168            })
3169            .collect();
3170
3171        Ok(AwsResponse::json(
3172            StatusCode::OK,
3173            serde_json::to_string(&json!({ "deliverySources": sources })).unwrap(),
3174        ))
3175    }
3176
3177    fn delete_delivery_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3178        let body = body_json(req);
3179        let name = body["name"].as_str().ok_or_else(|| {
3180            AwsServiceError::aws_error(
3181                StatusCode::BAD_REQUEST,
3182                "InvalidParameterException",
3183                "name is required",
3184            )
3185        })?;
3186
3187        validate_string_length("name", name, 1, 60)?;
3188
3189        let mut state = self.state.write();
3190        if state.delivery_sources.remove(name).is_none() {
3191            return Err(AwsServiceError::aws_error(
3192                StatusCode::BAD_REQUEST,
3193                "ResourceNotFoundException",
3194                format!("Delivery source '{name}' does not exist."),
3195            ));
3196        }
3197
3198        Ok(AwsResponse::json(StatusCode::OK, "{}"))
3199    }
3200
3201    // ---- Deliveries ----
3202
3203    fn create_delivery(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3204        let body = body_json(req);
3205        let delivery_source_name = body["deliverySourceName"]
3206            .as_str()
3207            .ok_or_else(|| {
3208                AwsServiceError::aws_error(
3209                    StatusCode::BAD_REQUEST,
3210                    "InvalidParameterException",
3211                    "deliverySourceName is required",
3212                )
3213            })?
3214            .to_string();
3215        let delivery_destination_arn = body["deliveryDestinationArn"]
3216            .as_str()
3217            .ok_or_else(|| {
3218                AwsServiceError::aws_error(
3219                    StatusCode::BAD_REQUEST,
3220                    "InvalidParameterException",
3221                    "deliveryDestinationArn is required",
3222                )
3223            })?
3224            .to_string();
3225
3226        validate_string_length("deliverySourceName", &delivery_source_name, 1, 60)?;
3227        validate_optional_string_length("fieldDelimiter", body["fieldDelimiter"].as_str(), 0, 5)?;
3228
3229        let tags: std::collections::HashMap<String, String> = body["tags"]
3230            .as_object()
3231            .map(|m| {
3232                m.iter()
3233                    .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
3234                    .collect()
3235            })
3236            .unwrap_or_default();
3237
3238        let record_fields: Vec<String> = body["recordFields"]
3239            .as_array()
3240            .map(|a| {
3241                a.iter()
3242                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3243                    .collect()
3244            })
3245            .unwrap_or_default();
3246        let field_delimiter = body["fieldDelimiter"].as_str().map(|s| s.to_string());
3247        let s3_delivery_config = body["s3DeliveryConfiguration"].clone();
3248
3249        let mut state = self.state.write();
3250
3251        // Verify source exists
3252        if !state.delivery_sources.contains_key(&delivery_source_name) {
3253            return Err(AwsServiceError::aws_error(
3254                StatusCode::BAD_REQUEST,
3255                "ResourceNotFoundException",
3256                format!("Delivery source '{}' does not exist.", delivery_source_name),
3257            ));
3258        }
3259
3260        // Verify destination exists
3261        let dest_exists = state
3262            .delivery_destinations
3263            .values()
3264            .any(|dd| dd.arn == delivery_destination_arn);
3265        if !dest_exists {
3266            return Err(AwsServiceError::aws_error(
3267                StatusCode::BAD_REQUEST,
3268                "ResourceNotFoundException",
3269                format!(
3270                    "Delivery destination '{}' does not exist.",
3271                    delivery_destination_arn
3272                ),
3273            ));
3274        }
3275
3276        // Check for duplicate delivery (same source + destination)
3277        let already_exists = state.deliveries.values().any(|d| {
3278            d.delivery_source_name == delivery_source_name
3279                && d.delivery_destination_arn == delivery_destination_arn
3280        });
3281        if already_exists {
3282            return Err(AwsServiceError::aws_error(
3283                StatusCode::BAD_REQUEST,
3284                "ConflictException",
3285                "A delivery already exists for this source and destination.",
3286            ));
3287        }
3288
3289        // Determine destination type from ARN
3290        let dest_type = if delivery_destination_arn.contains(":s3:") {
3291            "S3"
3292        } else if delivery_destination_arn.contains(":firehose:") {
3293            "FH"
3294        } else {
3295            "CWL"
3296        };
3297
3298        let delivery_id = uuid::Uuid::new_v4().to_string();
3299        let arn = format!(
3300            "arn:aws:logs:{}:{}:delivery:{}",
3301            state.region, state.account_id, delivery_id
3302        );
3303
3304        let delivery = Delivery {
3305            id: delivery_id.clone(),
3306            delivery_source_name: delivery_source_name.clone(),
3307            delivery_destination_arn: delivery_destination_arn.clone(),
3308            delivery_destination_type: dest_type.to_string(),
3309            arn: arn.clone(),
3310            tags: tags.clone(),
3311        };
3312
3313        state.deliveries.insert(delivery_id.clone(), delivery);
3314
3315        let mut delivery_json = json!({
3316            "id": delivery_id,
3317            "deliverySourceName": delivery_source_name,
3318            "deliveryDestinationArn": delivery_destination_arn,
3319            "deliveryDestinationType": dest_type,
3320            "arn": arn,
3321            "tags": tags,
3322        });
3323        if !record_fields.is_empty() {
3324            delivery_json["recordFields"] = json!(record_fields);
3325        }
3326        if let Some(ref delim) = field_delimiter {
3327            delivery_json["fieldDelimiter"] = json!(delim);
3328        }
3329        if !s3_delivery_config.is_null() {
3330            delivery_json["s3DeliveryConfiguration"] = s3_delivery_config;
3331        }
3332
3333        Ok(AwsResponse::json(
3334            StatusCode::OK,
3335            serde_json::to_string(&json!({
3336                "delivery": delivery_json,
3337            }))
3338            .unwrap(),
3339        ))
3340    }
3341
3342    fn get_delivery(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3343        let body = body_json(req);
3344        let delivery_id = body["id"].as_str().ok_or_else(|| {
3345            AwsServiceError::aws_error(
3346                StatusCode::BAD_REQUEST,
3347                "InvalidParameterException",
3348                "id is required",
3349            )
3350        })?;
3351
3352        validate_string_length("id", delivery_id, 1, 64)?;
3353
3354        let state = self.state.read();
3355        let d = state.deliveries.get(delivery_id).ok_or_else(|| {
3356            AwsServiceError::aws_error(
3357                StatusCode::BAD_REQUEST,
3358                "ResourceNotFoundException",
3359                format!("Delivery '{delivery_id}' does not exist."),
3360            )
3361        })?;
3362
3363        Ok(AwsResponse::json(
3364            StatusCode::OK,
3365            serde_json::to_string(&json!({
3366                "delivery": {
3367                    "id": d.id,
3368                    "deliverySourceName": d.delivery_source_name,
3369                    "deliveryDestinationArn": d.delivery_destination_arn,
3370                    "deliveryDestinationType": d.delivery_destination_type,
3371                    "arn": d.arn,
3372                    "tags": d.tags,
3373                }
3374            }))
3375            .unwrap(),
3376        ))
3377    }
3378
3379    fn describe_deliveries(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3380        let body = body_json(req);
3381        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
3382        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
3383
3384        let state = self.state.read();
3385        let deliveries: Vec<Value> = state
3386            .deliveries
3387            .values()
3388            .map(|d| {
3389                json!({
3390                    "id": d.id,
3391                    "deliverySourceName": d.delivery_source_name,
3392                    "deliveryDestinationArn": d.delivery_destination_arn,
3393                    "deliveryDestinationType": d.delivery_destination_type,
3394                    "arn": d.arn,
3395                    "tags": d.tags,
3396                })
3397            })
3398            .collect();
3399
3400        Ok(AwsResponse::json(
3401            StatusCode::OK,
3402            serde_json::to_string(&json!({ "deliveries": deliveries })).unwrap(),
3403        ))
3404    }
3405
3406    fn delete_delivery(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3407        let body = body_json(req);
3408        let delivery_id = body["id"].as_str().ok_or_else(|| {
3409            AwsServiceError::aws_error(
3410                StatusCode::BAD_REQUEST,
3411                "InvalidParameterException",
3412                "id is required",
3413            )
3414        })?;
3415
3416        validate_string_length("id", delivery_id, 1, 64)?;
3417
3418        let mut state = self.state.write();
3419        if state.deliveries.remove(delivery_id).is_none() {
3420            return Err(AwsServiceError::aws_error(
3421                StatusCode::BAD_REQUEST,
3422                "ResourceNotFoundException",
3423                format!("Delivery '{delivery_id}' does not exist."),
3424            ));
3425        }
3426
3427        Ok(AwsResponse::json(StatusCode::OK, "{}"))
3428    }
3429
3430    // ---- KMS Key ----
3431
3432    fn associate_kms_key(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3433        let body = body_json(req);
3434        let log_group_name = body["logGroupName"].as_str();
3435        let resource_identifier = body["resourceIdentifier"].as_str();
3436        let kms_key_id = body["kmsKeyId"]
3437            .as_str()
3438            .ok_or_else(|| {
3439                AwsServiceError::aws_error(
3440                    StatusCode::BAD_REQUEST,
3441                    "InvalidParameterException",
3442                    "kmsKeyId is required",
3443                )
3444            })?
3445            .to_string();
3446
3447        if let Some(name) = log_group_name {
3448            validate_string_length("logGroupName", name, 1, 512)?;
3449        }
3450        validate_string_length("kmsKeyId", &kms_key_id, 1, 256)?;
3451        validate_optional_string_length("resourceIdentifier", resource_identifier, 1, 2048)?;
3452
3453        let resolved_name = resolve_log_group_name(log_group_name, resource_identifier)?;
3454
3455        let mut state = self.state.write();
3456        let group = state
3457            .log_groups
3458            .get_mut(resolved_name.as_str())
3459            .ok_or_else(|| {
3460                AwsServiceError::aws_error(
3461                    StatusCode::BAD_REQUEST,
3462                    "ResourceNotFoundException",
3463                    format!("The specified log group does not exist: {resolved_name}"),
3464                )
3465            })?;
3466
3467        group.kms_key_id = Some(kms_key_id);
3468
3469        Ok(AwsResponse::json(StatusCode::OK, "{}"))
3470    }
3471
3472    fn disassociate_kms_key(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3473        let body = body_json(req);
3474        let log_group_name = body["logGroupName"].as_str();
3475        let resource_identifier = body["resourceIdentifier"].as_str();
3476
3477        if let Some(name) = log_group_name {
3478            validate_string_length("logGroupName", name, 1, 512)?;
3479        }
3480        validate_optional_string_length("resourceIdentifier", resource_identifier, 1, 2048)?;
3481
3482        let resolved_name = resolve_log_group_name(log_group_name, resource_identifier)?;
3483
3484        let mut state = self.state.write();
3485        let group = state
3486            .log_groups
3487            .get_mut(resolved_name.as_str())
3488            .ok_or_else(|| {
3489                AwsServiceError::aws_error(
3490                    StatusCode::BAD_REQUEST,
3491                    "ResourceNotFoundException",
3492                    format!("The specified log group does not exist: {resolved_name}"),
3493                )
3494            })?;
3495
3496        group.kms_key_id = None;
3497
3498        Ok(AwsResponse::json(StatusCode::OK, "{}"))
3499    }
3500
3501    // ---- Query Definitions ----
3502
3503    fn put_query_definition(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3504        let body = body_json(req);
3505        let name = body["name"]
3506            .as_str()
3507            .ok_or_else(|| {
3508                AwsServiceError::aws_error(
3509                    StatusCode::BAD_REQUEST,
3510                    "InvalidParameterException",
3511                    "name is required",
3512                )
3513            })?
3514            .to_string();
3515        let query_string = body["queryString"]
3516            .as_str()
3517            .ok_or_else(|| {
3518                AwsServiceError::aws_error(
3519                    StatusCode::BAD_REQUEST,
3520                    "InvalidParameterException",
3521                    "queryString is required",
3522                )
3523            })?
3524            .to_string();
3525        let log_group_names: Vec<String> = body["logGroupNames"]
3526            .as_array()
3527            .map(|a| {
3528                a.iter()
3529                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3530                    .collect()
3531            })
3532            .unwrap_or_default();
3533
3534        let query_definition_id = body["queryDefinitionId"]
3535            .as_str()
3536            .map(|s| s.to_string())
3537            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
3538
3539        validate_string_length("name", &name, 1, 255)?;
3540        validate_string_length("queryString", &query_string, 1, 10000)?;
3541        validate_optional_string_length(
3542            "queryDefinitionId",
3543            body["queryDefinitionId"].as_str(),
3544            1,
3545            256,
3546        )?;
3547        validate_optional_string_length("clientToken", body["clientToken"].as_str(), 36, 128)?;
3548        validate_optional_enum_value(
3549            "queryLanguage",
3550            &body["queryLanguage"],
3551            &["CWLI", "SQL", "PPL"],
3552        )?;
3553
3554        let now = Utc::now().timestamp_millis();
3555
3556        let mut state = self.state.write();
3557        state.query_definitions.insert(
3558            query_definition_id.clone(),
3559            QueryDefinition {
3560                query_definition_id: query_definition_id.clone(),
3561                name,
3562                query_string,
3563                log_group_names,
3564                last_modified: now,
3565            },
3566        );
3567
3568        Ok(AwsResponse::json(
3569            StatusCode::OK,
3570            serde_json::to_string(&json!({
3571                "queryDefinitionId": query_definition_id,
3572            }))
3573            .unwrap(),
3574        ))
3575    }
3576
3577    fn describe_query_definitions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3578        let body = body_json(req);
3579        let name_prefix = body["queryDefinitionNamePrefix"].as_str().unwrap_or("");
3580        validate_optional_string_length(
3581            "queryDefinitionNamePrefix",
3582            body["queryDefinitionNamePrefix"].as_str(),
3583            1,
3584            255,
3585        )?;
3586        validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 1000)?;
3587        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
3588        validate_optional_enum_value(
3589            "queryLanguage",
3590            &body["queryLanguage"],
3591            &["CWLI", "SQL", "PPL"],
3592        )?;
3593
3594        let state = self.state.read();
3595        let defs: Vec<Value> = state
3596            .query_definitions
3597            .values()
3598            .filter(|qd| name_prefix.is_empty() || qd.name.starts_with(name_prefix))
3599            .map(|qd| {
3600                json!({
3601                    "queryDefinitionId": qd.query_definition_id,
3602                    "name": qd.name,
3603                    "queryString": qd.query_string,
3604                    "logGroupNames": qd.log_group_names,
3605                    "lastModified": qd.last_modified,
3606                })
3607            })
3608            .collect();
3609
3610        Ok(AwsResponse::json(
3611            StatusCode::OK,
3612            serde_json::to_string(&json!({ "queryDefinitions": defs })).unwrap(),
3613        ))
3614    }
3615
3616    fn delete_query_definition(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3617        let body = body_json(req);
3618        let qd_id = body["queryDefinitionId"].as_str().ok_or_else(|| {
3619            AwsServiceError::aws_error(
3620                StatusCode::BAD_REQUEST,
3621                "InvalidParameterException",
3622                "queryDefinitionId is required",
3623            )
3624        })?;
3625
3626        validate_string_length("queryDefinitionId", qd_id, 1, 256)?;
3627
3628        let mut state = self.state.write();
3629        let success = state.query_definitions.remove(qd_id).is_some();
3630
3631        Ok(AwsResponse::json(
3632            StatusCode::OK,
3633            serde_json::to_string(&json!({ "success": success })).unwrap(),
3634        ))
3635    }
3636
3637    // ---- Account Policies ----
3638
3639    fn put_account_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3640        let body = body_json(req);
3641        let policy_name = body["policyName"].as_str().ok_or_else(|| {
3642            AwsServiceError::aws_error(
3643                StatusCode::BAD_REQUEST,
3644                "InvalidParameterException",
3645                "policyName is required",
3646            )
3647        })?;
3648        let policy_type = body["policyType"].as_str().ok_or_else(|| {
3649            AwsServiceError::aws_error(
3650                StatusCode::BAD_REQUEST,
3651                "InvalidParameterException",
3652                "policyType is required",
3653            )
3654        })?;
3655        let policy_document = body["policyDocument"].as_str().ok_or_else(|| {
3656            AwsServiceError::aws_error(
3657                StatusCode::BAD_REQUEST,
3658                "InvalidParameterException",
3659                "policyDocument is required",
3660            )
3661        })?;
3662
3663        let now = Utc::now().timestamp_millis();
3664        let mut state = self.state.write();
3665        let account_id = state.account_id.clone();
3666        let scope = body["scope"].as_str().map(|s| s.to_string());
3667        let selection_criteria = body["selectionCriteria"].as_str().map(|s| s.to_string());
3668
3669        let policy = AccountPolicy {
3670            policy_name: policy_name.to_string(),
3671            policy_type: policy_type.to_string(),
3672            policy_document: policy_document.to_string(),
3673            scope: scope.clone(),
3674            selection_criteria: selection_criteria.clone(),
3675            account_id: account_id.clone(),
3676            last_updated_time: now,
3677        };
3678
3679        let key = (policy_name.to_string(), policy_type.to_string());
3680        state.account_policies.insert(key, policy);
3681
3682        let mut result = json!({
3683            "accountPolicy": {
3684                "policyName": policy_name,
3685                "policyType": policy_type,
3686                "policyDocument": policy_document,
3687                "accountId": account_id,
3688                "lastUpdatedTime": now,
3689            }
3690        });
3691        if let Some(s) = scope {
3692            result["accountPolicy"]["scope"] = json!(s);
3693        }
3694        if let Some(s) = selection_criteria {
3695            result["accountPolicy"]["selectionCriteria"] = json!(s);
3696        }
3697
3698        Ok(AwsResponse::json(
3699            StatusCode::OK,
3700            serde_json::to_string(&result).unwrap(),
3701        ))
3702    }
3703
3704    fn describe_account_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3705        let body = body_json(req);
3706        let policy_type = body["policyType"].as_str().ok_or_else(|| {
3707            AwsServiceError::aws_error(
3708                StatusCode::BAD_REQUEST,
3709                "InvalidParameterException",
3710                "policyType is required",
3711            )
3712        })?;
3713        let policy_name = body["policyName"].as_str();
3714
3715        let state = self.state.read();
3716        let policies: Vec<Value> = state
3717            .account_policies
3718            .values()
3719            .filter(|p| {
3720                p.policy_type == policy_type && policy_name.is_none_or(|n| p.policy_name == n)
3721            })
3722            .map(|p| {
3723                let mut obj = json!({
3724                    "policyName": p.policy_name,
3725                    "policyType": p.policy_type,
3726                    "policyDocument": p.policy_document,
3727                    "accountId": p.account_id,
3728                    "lastUpdatedTime": p.last_updated_time,
3729                });
3730                if let Some(ref s) = p.scope {
3731                    obj["scope"] = json!(s);
3732                }
3733                if let Some(ref s) = p.selection_criteria {
3734                    obj["selectionCriteria"] = json!(s);
3735                }
3736                obj
3737            })
3738            .collect();
3739
3740        Ok(AwsResponse::json(
3741            StatusCode::OK,
3742            serde_json::to_string(&json!({ "accountPolicies": policies })).unwrap(),
3743        ))
3744    }
3745
3746    fn delete_account_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3747        let body = body_json(req);
3748        let policy_name = body["policyName"].as_str().ok_or_else(|| {
3749            AwsServiceError::aws_error(
3750                StatusCode::BAD_REQUEST,
3751                "InvalidParameterException",
3752                "policyName is required",
3753            )
3754        })?;
3755        let policy_type = body["policyType"].as_str().ok_or_else(|| {
3756            AwsServiceError::aws_error(
3757                StatusCode::BAD_REQUEST,
3758                "InvalidParameterException",
3759                "policyType is required",
3760            )
3761        })?;
3762
3763        let key = (policy_name.to_string(), policy_type.to_string());
3764        let mut state = self.state.write();
3765        if state.account_policies.remove(&key).is_none() {
3766            return Err(AwsServiceError::aws_error(
3767                StatusCode::BAD_REQUEST,
3768                "ResourceNotFoundException",
3769                format!("Account policy {policy_name} of type {policy_type} not found"),
3770            ));
3771        }
3772
3773        Ok(AwsResponse::json(StatusCode::OK, "{}"))
3774    }
3775
3776    // ---- Data Protection Policies ----
3777
3778    fn put_data_protection_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3779        let body = body_json(req);
3780        let log_group_id = body["logGroupIdentifier"]
3781            .as_str()
3782            .ok_or_else(|| {
3783                AwsServiceError::aws_error(
3784                    StatusCode::BAD_REQUEST,
3785                    "InvalidParameterException",
3786                    "logGroupIdentifier is required",
3787                )
3788            })?
3789            .to_string();
3790        let policy_document = body["policyDocument"]
3791            .as_str()
3792            .ok_or_else(|| {
3793                AwsServiceError::aws_error(
3794                    StatusCode::BAD_REQUEST,
3795                    "InvalidParameterException",
3796                    "policyDocument is required",
3797                )
3798            })?
3799            .to_string();
3800
3801        let group_name = if log_group_id.starts_with("arn:") {
3802            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3803                AwsServiceError::aws_error(
3804                    StatusCode::BAD_REQUEST,
3805                    "InvalidParameterException",
3806                    format!("Invalid ARN: {log_group_id}"),
3807                )
3808            })?
3809        } else {
3810            log_group_id.clone()
3811        };
3812
3813        let now = Utc::now().timestamp_millis();
3814        let mut state = self.state.write();
3815        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
3816            AwsServiceError::aws_error(
3817                StatusCode::BAD_REQUEST,
3818                "ResourceNotFoundException",
3819                format!("The specified log group does not exist: {group_name}"),
3820            )
3821        })?;
3822        let log_group_id_resp = group.arn.clone();
3823
3824        group.data_protection_policy = Some(DataProtectionPolicy {
3825            policy_document: policy_document.clone(),
3826            last_updated_time: now,
3827        });
3828
3829        Ok(AwsResponse::json(
3830            StatusCode::OK,
3831            serde_json::to_string(&json!({
3832                "logGroupIdentifier": log_group_id_resp,
3833                "policyDocument": policy_document,
3834                "lastUpdatedTime": now,
3835            }))
3836            .unwrap(),
3837        ))
3838    }
3839
3840    fn get_data_protection_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3841        let body = body_json(req);
3842        let log_group_id = body["logGroupIdentifier"]
3843            .as_str()
3844            .ok_or_else(|| {
3845                AwsServiceError::aws_error(
3846                    StatusCode::BAD_REQUEST,
3847                    "InvalidParameterException",
3848                    "logGroupIdentifier is required",
3849                )
3850            })?
3851            .to_string();
3852
3853        let group_name = if log_group_id.starts_with("arn:") {
3854            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3855                AwsServiceError::aws_error(
3856                    StatusCode::BAD_REQUEST,
3857                    "InvalidParameterException",
3858                    format!("Invalid ARN: {log_group_id}"),
3859                )
3860            })?
3861        } else {
3862            log_group_id.clone()
3863        };
3864
3865        let state = self.state.read();
3866        let group = state.log_groups.get(&group_name).ok_or_else(|| {
3867            AwsServiceError::aws_error(
3868                StatusCode::BAD_REQUEST,
3869                "ResourceNotFoundException",
3870                format!("The specified log group does not exist: {group_name}"),
3871            )
3872        })?;
3873
3874        let mut result = json!({
3875            "logGroupIdentifier": group.arn,
3876        });
3877        if let Some(ref dp) = group.data_protection_policy {
3878            result["policyDocument"] = json!(dp.policy_document);
3879            result["lastUpdatedTime"] = json!(dp.last_updated_time);
3880        }
3881
3882        Ok(AwsResponse::json(
3883            StatusCode::OK,
3884            serde_json::to_string(&result).unwrap(),
3885        ))
3886    }
3887
3888    fn delete_data_protection_policy(
3889        &self,
3890        req: &AwsRequest,
3891    ) -> Result<AwsResponse, AwsServiceError> {
3892        let body = body_json(req);
3893        let log_group_id = body["logGroupIdentifier"]
3894            .as_str()
3895            .ok_or_else(|| {
3896                AwsServiceError::aws_error(
3897                    StatusCode::BAD_REQUEST,
3898                    "InvalidParameterException",
3899                    "logGroupIdentifier is required",
3900                )
3901            })?
3902            .to_string();
3903
3904        let group_name = if log_group_id.starts_with("arn:") {
3905            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3906                AwsServiceError::aws_error(
3907                    StatusCode::BAD_REQUEST,
3908                    "InvalidParameterException",
3909                    format!("Invalid ARN: {log_group_id}"),
3910                )
3911            })?
3912        } else {
3913            log_group_id
3914        };
3915
3916        let mut state = self.state.write();
3917        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
3918            AwsServiceError::aws_error(
3919                StatusCode::BAD_REQUEST,
3920                "ResourceNotFoundException",
3921                format!("The specified log group does not exist: {group_name}"),
3922            )
3923        })?;
3924
3925        if group.data_protection_policy.is_none() {
3926            return Err(AwsServiceError::aws_error(
3927                StatusCode::BAD_REQUEST,
3928                "ResourceNotFoundException",
3929                "No data protection policy found for this log group",
3930            ));
3931        }
3932
3933        group.data_protection_policy = None;
3934        Ok(AwsResponse::json(StatusCode::OK, "{}"))
3935    }
3936
3937    // ---- Index Policies ----
3938
3939    fn put_index_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3940        let body = body_json(req);
3941        let log_group_id = body["logGroupIdentifier"]
3942            .as_str()
3943            .ok_or_else(|| {
3944                AwsServiceError::aws_error(
3945                    StatusCode::BAD_REQUEST,
3946                    "InvalidParameterException",
3947                    "logGroupIdentifier is required",
3948                )
3949            })?
3950            .to_string();
3951        let policy_document = body["policyDocument"]
3952            .as_str()
3953            .ok_or_else(|| {
3954                AwsServiceError::aws_error(
3955                    StatusCode::BAD_REQUEST,
3956                    "InvalidParameterException",
3957                    "policyDocument is required",
3958                )
3959            })?
3960            .to_string();
3961
3962        let group_name = if log_group_id.starts_with("arn:") {
3963            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3964                AwsServiceError::aws_error(
3965                    StatusCode::BAD_REQUEST,
3966                    "InvalidParameterException",
3967                    format!("Invalid ARN: {log_group_id}"),
3968                )
3969            })?
3970        } else {
3971            log_group_id.clone()
3972        };
3973
3974        let policy_name = body["policyName"].as_str().unwrap_or("default").to_string();
3975
3976        let now = Utc::now().timestamp_millis();
3977        let mut state = self.state.write();
3978        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
3979            AwsServiceError::aws_error(
3980                StatusCode::BAD_REQUEST,
3981                "ResourceNotFoundException",
3982                format!("The specified log group does not exist: {group_name}"),
3983            )
3984        })?;
3985
3986        // Replace existing policy with same name, or add new one
3987        if let Some(existing) = group
3988            .index_policies
3989            .iter_mut()
3990            .find(|p| p.policy_name == policy_name)
3991        {
3992            existing.policy_document = policy_document.clone();
3993            existing.last_updated_time = now;
3994        } else {
3995            group.index_policies.push(IndexPolicy {
3996                policy_name: policy_name.clone(),
3997                policy_document: policy_document.clone(),
3998                last_updated_time: now,
3999            });
4000        }
4001
4002        let result = json!({
4003            "indexPolicy": {
4004                "policyName": policy_name,
4005                "policyDocument": policy_document,
4006                "logGroupIdentifier": group.arn,
4007                "lastUpdateTime": now,
4008            }
4009        });
4010
4011        Ok(AwsResponse::json(
4012            StatusCode::OK,
4013            serde_json::to_string(&result).unwrap(),
4014        ))
4015    }
4016
4017    fn describe_index_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4018        let body = body_json(req);
4019        let log_group_ids = body["logGroupIdentifiers"].as_array().ok_or_else(|| {
4020            AwsServiceError::aws_error(
4021                StatusCode::BAD_REQUEST,
4022                "InvalidParameterException",
4023                "logGroupIdentifiers is required",
4024            )
4025        })?;
4026
4027        let state = self.state.read();
4028        let mut policies = Vec::new();
4029
4030        for id_val in log_group_ids {
4031            let id = id_val.as_str().unwrap_or("");
4032            let group_name = if id.starts_with("arn:") {
4033                extract_log_group_from_arn(id).unwrap_or_default()
4034            } else {
4035                id.to_string()
4036            };
4037            if let Some(group) = state.log_groups.get(&group_name) {
4038                for p in &group.index_policies {
4039                    policies.push(json!({
4040                        "policyName": p.policy_name,
4041                        "policyDocument": p.policy_document,
4042                        "logGroupIdentifier": group.arn,
4043                        "lastUpdateTime": p.last_updated_time,
4044                    }));
4045                }
4046            }
4047        }
4048
4049        Ok(AwsResponse::json(
4050            StatusCode::OK,
4051            serde_json::to_string(&json!({ "indexPolicies": policies })).unwrap(),
4052        ))
4053    }
4054
4055    fn delete_index_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4056        let body = body_json(req);
4057        let log_group_id = body["logGroupIdentifier"]
4058            .as_str()
4059            .ok_or_else(|| {
4060                AwsServiceError::aws_error(
4061                    StatusCode::BAD_REQUEST,
4062                    "InvalidParameterException",
4063                    "logGroupIdentifier is required",
4064                )
4065            })?
4066            .to_string();
4067
4068        let group_name = if log_group_id.starts_with("arn:") {
4069            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4070                AwsServiceError::aws_error(
4071                    StatusCode::BAD_REQUEST,
4072                    "InvalidParameterException",
4073                    format!("Invalid ARN: {log_group_id}"),
4074                )
4075            })?
4076        } else {
4077            log_group_id
4078        };
4079
4080        let mut state = self.state.write();
4081        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4082            AwsServiceError::aws_error(
4083                StatusCode::BAD_REQUEST,
4084                "ResourceNotFoundException",
4085                format!("The specified log group does not exist: {group_name}"),
4086            )
4087        })?;
4088
4089        if group.index_policies.is_empty() {
4090            return Err(AwsServiceError::aws_error(
4091                StatusCode::BAD_REQUEST,
4092                "ResourceNotFoundException",
4093                "No index policy found for this log group",
4094            ));
4095        }
4096
4097        group.index_policies.clear();
4098        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4099    }
4100
4101    fn describe_field_indexes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4102        let body = body_json(req);
4103        // Validate that logGroupIdentifiers is provided
4104        let _log_group_ids = body["logGroupIdentifiers"].as_array().ok_or_else(|| {
4105            AwsServiceError::aws_error(
4106                StatusCode::BAD_REQUEST,
4107                "InvalidParameterException",
4108                "logGroupIdentifiers is required",
4109            )
4110        })?;
4111
4112        // Stub: return empty list
4113        Ok(AwsResponse::json(
4114            StatusCode::OK,
4115            serde_json::to_string(&json!({ "fieldIndexes": [] })).unwrap(),
4116        ))
4117    }
4118
4119    // ---- Transformers ----
4120
4121    fn put_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4122        let body = body_json(req);
4123        let log_group_id = body["logGroupIdentifier"]
4124            .as_str()
4125            .ok_or_else(|| {
4126                AwsServiceError::aws_error(
4127                    StatusCode::BAD_REQUEST,
4128                    "InvalidParameterException",
4129                    "logGroupIdentifier is required",
4130                )
4131            })?
4132            .to_string();
4133        let transformer_config = body.get("transformerConfig").cloned().ok_or_else(|| {
4134            AwsServiceError::aws_error(
4135                StatusCode::BAD_REQUEST,
4136                "InvalidParameterException",
4137                "transformerConfig is required",
4138            )
4139        })?;
4140
4141        let group_name = if log_group_id.starts_with("arn:") {
4142            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4143                AwsServiceError::aws_error(
4144                    StatusCode::BAD_REQUEST,
4145                    "InvalidParameterException",
4146                    format!("Invalid ARN: {log_group_id}"),
4147                )
4148            })?
4149        } else {
4150            log_group_id.clone()
4151        };
4152
4153        let now = Utc::now().timestamp_millis();
4154        let mut state = self.state.write();
4155        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4156            AwsServiceError::aws_error(
4157                StatusCode::BAD_REQUEST,
4158                "ResourceNotFoundException",
4159                format!("The specified log group does not exist: {group_name}"),
4160            )
4161        })?;
4162
4163        group.transformer = Some(Transformer {
4164            transformer_config,
4165            creation_time: now,
4166            last_modified_time: now,
4167        });
4168
4169        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4170    }
4171
4172    fn get_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4173        let body = body_json(req);
4174        let log_group_id = body["logGroupIdentifier"]
4175            .as_str()
4176            .ok_or_else(|| {
4177                AwsServiceError::aws_error(
4178                    StatusCode::BAD_REQUEST,
4179                    "InvalidParameterException",
4180                    "logGroupIdentifier is required",
4181                )
4182            })?
4183            .to_string();
4184
4185        let group_name = if log_group_id.starts_with("arn:") {
4186            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4187                AwsServiceError::aws_error(
4188                    StatusCode::BAD_REQUEST,
4189                    "InvalidParameterException",
4190                    format!("Invalid ARN: {log_group_id}"),
4191                )
4192            })?
4193        } else {
4194            log_group_id.clone()
4195        };
4196
4197        let state = self.state.read();
4198        let group = state.log_groups.get(&group_name).ok_or_else(|| {
4199            AwsServiceError::aws_error(
4200                StatusCode::BAD_REQUEST,
4201                "ResourceNotFoundException",
4202                format!("The specified log group does not exist: {group_name}"),
4203            )
4204        })?;
4205
4206        let mut result = json!({
4207            "logGroupIdentifier": group.arn,
4208        });
4209        if let Some(ref t) = group.transformer {
4210            result["transformerConfig"] = t.transformer_config.clone();
4211            result["creationTime"] = json!(t.creation_time);
4212            result["lastModifiedTime"] = json!(t.last_modified_time);
4213        }
4214
4215        Ok(AwsResponse::json(
4216            StatusCode::OK,
4217            serde_json::to_string(&result).unwrap(),
4218        ))
4219    }
4220
4221    fn delete_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4222        let body = body_json(req);
4223        let log_group_id = body["logGroupIdentifier"]
4224            .as_str()
4225            .ok_or_else(|| {
4226                AwsServiceError::aws_error(
4227                    StatusCode::BAD_REQUEST,
4228                    "InvalidParameterException",
4229                    "logGroupIdentifier is required",
4230                )
4231            })?
4232            .to_string();
4233
4234        let group_name = if log_group_id.starts_with("arn:") {
4235            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4236                AwsServiceError::aws_error(
4237                    StatusCode::BAD_REQUEST,
4238                    "InvalidParameterException",
4239                    format!("Invalid ARN: {log_group_id}"),
4240                )
4241            })?
4242        } else {
4243            log_group_id
4244        };
4245
4246        let mut state = self.state.write();
4247        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4248            AwsServiceError::aws_error(
4249                StatusCode::BAD_REQUEST,
4250                "ResourceNotFoundException",
4251                format!("The specified log group does not exist: {group_name}"),
4252            )
4253        })?;
4254
4255        group.transformer = None;
4256        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4257    }
4258
4259    fn test_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4260        let body = body_json(req);
4261        let _transformer_config = body.get("transformerConfig").ok_or_else(|| {
4262            AwsServiceError::aws_error(
4263                StatusCode::BAD_REQUEST,
4264                "InvalidParameterException",
4265                "transformerConfig is required",
4266            )
4267        })?;
4268        let log_event_messages = body["logEventMessages"].as_array().ok_or_else(|| {
4269            AwsServiceError::aws_error(
4270                StatusCode::BAD_REQUEST,
4271                "InvalidParameterException",
4272                "logEventMessages is required",
4273            )
4274        })?;
4275
4276        // Stub: return the input events as transformed output unchanged
4277        let transformed: Vec<Value> = log_event_messages
4278            .iter()
4279            .map(|msg| {
4280                json!({
4281                    "eventMessage": msg,
4282                    "transformedEventMessage": msg,
4283                })
4284            })
4285            .collect();
4286
4287        Ok(AwsResponse::json(
4288            StatusCode::OK,
4289            serde_json::to_string(&json!({
4290                "transformedLogs": transformed,
4291            }))
4292            .unwrap(),
4293        ))
4294    }
4295
4296    // ---- Anomaly Detectors ----
4297
4298    fn create_log_anomaly_detector(
4299        &self,
4300        req: &AwsRequest,
4301    ) -> Result<AwsResponse, AwsServiceError> {
4302        let body = body_json(req);
4303        let log_group_arn_list = body["logGroupArnList"]
4304            .as_array()
4305            .ok_or_else(|| {
4306                AwsServiceError::aws_error(
4307                    StatusCode::BAD_REQUEST,
4308                    "InvalidParameterException",
4309                    "logGroupArnList is required",
4310                )
4311            })?
4312            .iter()
4313            .filter_map(|v| v.as_str().map(|s| s.to_string()))
4314            .collect::<Vec<_>>();
4315
4316        let detector_name = body["detectorName"].as_str().unwrap_or("").to_string();
4317        let evaluation_frequency = body["evaluationFrequency"].as_str().map(|s| s.to_string());
4318        let filter_pattern = body["filterPattern"].as_str().map(|s| s.to_string());
4319        let anomaly_visibility_time = body["anomalyVisibilityTime"].as_i64();
4320
4321        let now = Utc::now().timestamp_millis();
4322        let mut state = self.state.write();
4323        let detector_id = uuid::Uuid::new_v4().to_string();
4324        let arn = format!(
4325            "arn:aws:logs:{}:{}:anomaly-detector:{}",
4326            state.region, state.account_id, detector_id
4327        );
4328
4329        let detector = AnomalyDetector {
4330            detector_name: detector_name.clone(),
4331            arn: arn.clone(),
4332            log_group_arn_list,
4333            evaluation_frequency,
4334            filter_pattern,
4335            anomaly_visibility_time,
4336            creation_time: now,
4337            last_modified_time: now,
4338            enabled: true,
4339        };
4340
4341        state.anomaly_detectors.insert(arn.clone(), detector);
4342
4343        Ok(AwsResponse::json(
4344            StatusCode::OK,
4345            serde_json::to_string(&json!({ "anomalyDetectorArn": arn })).unwrap(),
4346        ))
4347    }
4348
4349    fn get_log_anomaly_detector(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4350        let body = body_json(req);
4351        let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
4352            AwsServiceError::aws_error(
4353                StatusCode::BAD_REQUEST,
4354                "InvalidParameterException",
4355                "anomalyDetectorArn is required",
4356            )
4357        })?;
4358
4359        let state = self.state.read();
4360        let detector = state.anomaly_detectors.get(arn).ok_or_else(|| {
4361            AwsServiceError::aws_error(
4362                StatusCode::BAD_REQUEST,
4363                "ResourceNotFoundException",
4364                format!("Anomaly detector not found: {arn}"),
4365            )
4366        })?;
4367
4368        let mut result = json!({
4369            "anomalyDetectorArn": detector.arn,
4370            "detectorName": detector.detector_name,
4371            "logGroupArnList": detector.log_group_arn_list,
4372            "creationTimeStamp": detector.creation_time,
4373            "lastModifiedTimeStamp": detector.last_modified_time,
4374            "anomalyDetectorStatus": if detector.enabled { "TRAINING" } else { "PAUSED" },
4375        });
4376        if let Some(ref f) = detector.evaluation_frequency {
4377            result["evaluationFrequency"] = json!(f);
4378        }
4379        if let Some(ref f) = detector.filter_pattern {
4380            result["filterPattern"] = json!(f);
4381        }
4382        if let Some(t) = detector.anomaly_visibility_time {
4383            result["anomalyVisibilityTime"] = json!(t);
4384        }
4385
4386        Ok(AwsResponse::json(
4387            StatusCode::OK,
4388            serde_json::to_string(&result).unwrap(),
4389        ))
4390    }
4391
4392    fn delete_log_anomaly_detector(
4393        &self,
4394        req: &AwsRequest,
4395    ) -> Result<AwsResponse, AwsServiceError> {
4396        let body = body_json(req);
4397        let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
4398            AwsServiceError::aws_error(
4399                StatusCode::BAD_REQUEST,
4400                "InvalidParameterException",
4401                "anomalyDetectorArn is required",
4402            )
4403        })?;
4404
4405        let mut state = self.state.write();
4406        if state.anomaly_detectors.remove(arn).is_none() {
4407            return Err(AwsServiceError::aws_error(
4408                StatusCode::BAD_REQUEST,
4409                "ResourceNotFoundException",
4410                format!("Anomaly detector not found: {arn}"),
4411            ));
4412        }
4413
4414        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4415    }
4416
4417    fn list_log_anomaly_detectors(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4418        let body = body_json(req);
4419        let filter_log_group_arn = body["filterLogGroupArn"].as_str();
4420        let _limit = body["limit"].as_i64().unwrap_or(50);
4421
4422        let state = self.state.read();
4423        let detectors: Vec<Value> = state
4424            .anomaly_detectors
4425            .values()
4426            .filter(|d| {
4427                filter_log_group_arn.is_none_or(|arn| d.log_group_arn_list.iter().any(|a| a == arn))
4428            })
4429            .map(|d| {
4430                let mut obj = json!({
4431                    "anomalyDetectorArn": d.arn,
4432                    "detectorName": d.detector_name,
4433                    "logGroupArnList": d.log_group_arn_list,
4434                    "creationTimeStamp": d.creation_time,
4435                    "lastModifiedTimeStamp": d.last_modified_time,
4436                    "anomalyDetectorStatus": if d.enabled { "TRAINING" } else { "PAUSED" },
4437                });
4438                if let Some(ref f) = d.evaluation_frequency {
4439                    obj["evaluationFrequency"] = json!(f);
4440                }
4441                if let Some(ref f) = d.filter_pattern {
4442                    obj["filterPattern"] = json!(f);
4443                }
4444                if let Some(t) = d.anomaly_visibility_time {
4445                    obj["anomalyVisibilityTime"] = json!(t);
4446                }
4447                obj
4448            })
4449            .collect();
4450
4451        Ok(AwsResponse::json(
4452            StatusCode::OK,
4453            serde_json::to_string(&json!({ "anomalyDetectors": detectors })).unwrap(),
4454        ))
4455    }
4456
4457    fn update_log_anomaly_detector(
4458        &self,
4459        req: &AwsRequest,
4460    ) -> Result<AwsResponse, AwsServiceError> {
4461        let body = body_json(req);
4462        let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
4463            AwsServiceError::aws_error(
4464                StatusCode::BAD_REQUEST,
4465                "InvalidParameterException",
4466                "anomalyDetectorArn is required",
4467            )
4468        })?;
4469        let enabled = body["enabled"].as_bool().unwrap_or(true);
4470
4471        let mut state = self.state.write();
4472        let detector = state.anomaly_detectors.get_mut(arn).ok_or_else(|| {
4473            AwsServiceError::aws_error(
4474                StatusCode::BAD_REQUEST,
4475                "ResourceNotFoundException",
4476                format!("Anomaly detector not found: {arn}"),
4477            )
4478        })?;
4479
4480        detector.enabled = enabled;
4481        if let Some(f) = body["evaluationFrequency"].as_str() {
4482            detector.evaluation_frequency = Some(f.to_string());
4483        }
4484        if let Some(f) = body["filterPattern"].as_str() {
4485            detector.filter_pattern = Some(f.to_string());
4486        }
4487        if let Some(t) = body["anomalyVisibilityTime"].as_i64() {
4488            detector.anomaly_visibility_time = Some(t);
4489        }
4490        detector.last_modified_time = Utc::now().timestamp_millis();
4491
4492        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4493    }
4494
4495    // ---- Misc Operations ----
4496
4497    fn get_log_group_fields(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4498        let body = body_json(req);
4499        let log_group_id = body["logGroupName"]
4500            .as_str()
4501            .or_else(|| body["logGroupIdentifier"].as_str())
4502            .ok_or_else(|| {
4503                AwsServiceError::aws_error(
4504                    StatusCode::BAD_REQUEST,
4505                    "InvalidParameterException",
4506                    "logGroupName or logGroupIdentifier is required",
4507                )
4508            })?;
4509
4510        let group_name = if log_group_id.starts_with("arn:") {
4511            extract_log_group_from_arn(log_group_id).ok_or_else(|| {
4512                AwsServiceError::aws_error(
4513                    StatusCode::BAD_REQUEST,
4514                    "InvalidParameterException",
4515                    format!("Invalid ARN: {log_group_id}"),
4516                )
4517            })?
4518        } else {
4519            log_group_id.to_string()
4520        };
4521
4522        let state = self.state.read();
4523        if !state.log_groups.contains_key(&group_name) {
4524            return Err(AwsServiceError::aws_error(
4525                StatusCode::BAD_REQUEST,
4526                "ResourceNotFoundException",
4527                format!("The specified log group does not exist: {group_name}"),
4528            ));
4529        }
4530
4531        // Stub response with common fields
4532        let fields = json!([
4533            { "fieldName": "@timestamp", "percent": 100 },
4534            { "fieldName": "@message", "percent": 100 },
4535        ]);
4536
4537        Ok(AwsResponse::json(
4538            StatusCode::OK,
4539            serde_json::to_string(&json!({ "logGroupFields": fields })).unwrap(),
4540        ))
4541    }
4542
4543    fn test_metric_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4544        let body = body_json(req);
4545        let filter_pattern = body["filterPattern"].as_str().ok_or_else(|| {
4546            AwsServiceError::aws_error(
4547                StatusCode::BAD_REQUEST,
4548                "InvalidParameterException",
4549                "filterPattern is required",
4550            )
4551        })?;
4552        let log_event_messages = body["logEventMessages"].as_array().ok_or_else(|| {
4553            AwsServiceError::aws_error(
4554                StatusCode::BAD_REQUEST,
4555                "InvalidParameterException",
4556                "logEventMessages is required",
4557            )
4558        })?;
4559
4560        let matches: Vec<Value> = log_event_messages
4561            .iter()
4562            .enumerate()
4563            .filter(|(_, msg)| {
4564                let msg_str = msg.as_str().unwrap_or("");
4565                matches_filter_pattern(filter_pattern, msg_str)
4566            })
4567            .map(|(i, msg)| {
4568                json!({
4569                    "eventNumber": i + 1,
4570                    "eventMessage": msg,
4571                    "extractedValues": {},
4572                })
4573            })
4574            .collect();
4575
4576        Ok(AwsResponse::json(
4577            StatusCode::OK,
4578            serde_json::to_string(
4579                &json!({ "matches": matches, "testResults": !matches.is_empty() }),
4580            )
4581            .unwrap(),
4582        ))
4583    }
4584
4585    fn stop_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4586        let body = body_json(req);
4587        let query_id = body["queryId"].as_str().ok_or_else(|| {
4588            AwsServiceError::aws_error(
4589                StatusCode::BAD_REQUEST,
4590                "InvalidParameterException",
4591                "queryId is required",
4592            )
4593        })?;
4594
4595        let mut state = self.state.write();
4596        let query = state.queries.get_mut(query_id).ok_or_else(|| {
4597            AwsServiceError::aws_error(
4598                StatusCode::BAD_REQUEST,
4599                "InvalidParameterException",
4600                format!("Query {query_id} is not in a cancellable state"),
4601            )
4602        })?;
4603
4604        let was_running = query.status == "Running" || query.status == "Scheduled";
4605        if was_running {
4606            query.status = "Cancelled".to_string();
4607        }
4608
4609        Ok(AwsResponse::json(
4610            StatusCode::OK,
4611            serde_json::to_string(&json!({ "success": was_running })).unwrap(),
4612        ))
4613    }
4614
4615    fn put_log_group_deletion_protection(
4616        &self,
4617        req: &AwsRequest,
4618    ) -> Result<AwsResponse, AwsServiceError> {
4619        let body = body_json(req);
4620        let log_group_id = body["logGroupIdentifier"]
4621            .as_str()
4622            .ok_or_else(|| {
4623                AwsServiceError::aws_error(
4624                    StatusCode::BAD_REQUEST,
4625                    "InvalidParameterException",
4626                    "logGroupIdentifier is required",
4627                )
4628            })?
4629            .to_string();
4630        let deletion_protection = body["deletionProtectionEnabled"].as_bool().unwrap_or(true);
4631
4632        let group_name = if log_group_id.starts_with("arn:") {
4633            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4634                AwsServiceError::aws_error(
4635                    StatusCode::BAD_REQUEST,
4636                    "InvalidParameterException",
4637                    format!("Invalid ARN: {log_group_id}"),
4638                )
4639            })?
4640        } else {
4641            log_group_id
4642        };
4643
4644        let mut state = self.state.write();
4645        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4646            AwsServiceError::aws_error(
4647                StatusCode::BAD_REQUEST,
4648                "ResourceNotFoundException",
4649                format!("The specified log group does not exist: {group_name}"),
4650            )
4651        })?;
4652
4653        group.deletion_protection = deletion_protection;
4654        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4655    }
4656
4657    fn get_log_record(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4658        let body = body_json(req);
4659        let _log_record_pointer = body["logRecordPointer"].as_str().ok_or_else(|| {
4660            AwsServiceError::aws_error(
4661                StatusCode::BAD_REQUEST,
4662                "InvalidParameterException",
4663                "logRecordPointer is required",
4664            )
4665        })?;
4666
4667        // Stub: return empty log record
4668        Ok(AwsResponse::json(
4669            StatusCode::OK,
4670            serde_json::to_string(&json!({ "logRecord": {} })).unwrap(),
4671        ))
4672    }
4673
4674    fn list_anomalies(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4675        // Stub: return empty anomalies list
4676        Ok(AwsResponse::json(
4677            StatusCode::OK,
4678            serde_json::to_string(&json!({ "anomalies": [] })).unwrap(),
4679        ))
4680    }
4681
4682    fn update_anomaly(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4683        // No-op stub
4684        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4685    }
4686
4687    // -- Import tasks --
4688
4689    fn create_import_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4690        let body = body_json(req);
4691        let import_source_arn = require_str(&body, "importSourceArn")?;
4692        let import_role_arn = require_str(&body, "importRoleArn")?;
4693        let log_group_name = body["logGroupName"].as_str().map(|s| s.to_string());
4694
4695        let import_id = uuid::Uuid::new_v4().to_string();
4696        let now = Utc::now().timestamp_millis();
4697
4698        let task = ImportTask {
4699            import_id: import_id.clone(),
4700            import_source_arn: import_source_arn.to_string(),
4701            import_role_arn: import_role_arn.to_string(),
4702            log_group_name,
4703            status: "RUNNING".to_string(),
4704            creation_time: now,
4705        };
4706
4707        let mut state = self.state.write();
4708        state.import_tasks.insert(import_id.clone(), task);
4709
4710        Ok(AwsResponse::json(
4711            StatusCode::OK,
4712            serde_json::to_string(&json!({ "importId": import_id })).unwrap(),
4713        ))
4714    }
4715
4716    fn describe_import_tasks(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4717        let state = self.state.read();
4718        let tasks: Vec<Value> = state
4719            .import_tasks
4720            .values()
4721            .map(|t| {
4722                json!({
4723                    "importId": t.import_id,
4724                    "importSourceArn": t.import_source_arn,
4725                    "importStatus": t.status,
4726                    "creationTime": t.creation_time,
4727                })
4728            })
4729            .collect();
4730        Ok(AwsResponse::json(
4731            StatusCode::OK,
4732            serde_json::to_string(&json!({ "imports": tasks })).unwrap(),
4733        ))
4734    }
4735
4736    fn describe_import_task_batches(
4737        &self,
4738        req: &AwsRequest,
4739    ) -> Result<AwsResponse, AwsServiceError> {
4740        let body = body_json(req);
4741        let _import_id = require_str(&body, "importId")?;
4742        // Stub: return empty batches
4743        Ok(AwsResponse::json(
4744            StatusCode::OK,
4745            serde_json::to_string(&json!({ "importBatches": [] })).unwrap(),
4746        ))
4747    }
4748
4749    fn cancel_import_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4750        let body = body_json(req);
4751        let import_id = require_str(&body, "importId")?;
4752
4753        let mut state = self.state.write();
4754        match state.import_tasks.get_mut(import_id) {
4755            Some(task) => {
4756                task.status = "CANCELLED".to_string();
4757                Ok(AwsResponse::json(StatusCode::OK, "{}"))
4758            }
4759            None => Err(AwsServiceError::aws_error(
4760                StatusCode::NOT_FOUND,
4761                "ResourceNotFoundException",
4762                format!("Import task not found: {import_id}"),
4763            )),
4764        }
4765    }
4766
4767    // -- Integrations --
4768
4769    fn put_integration(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4770        let body = body_json(req);
4771        let integration_name = require_str(&body, "integrationName")?;
4772        let integration_type = require_str(&body, "integrationType")?;
4773        let resource_config = body["resourceConfig"].clone();
4774
4775        let now = Utc::now().timestamp_millis();
4776        let integration = Integration {
4777            integration_name: integration_name.to_string(),
4778            integration_type: integration_type.to_string(),
4779            resource_config,
4780            status: "ACTIVE".to_string(),
4781            creation_time: now,
4782        };
4783
4784        let mut state = self.state.write();
4785        state
4786            .integrations
4787            .insert(integration_name.to_string(), integration);
4788
4789        Ok(AwsResponse::json(
4790            StatusCode::OK,
4791            serde_json::to_string(&json!({
4792                "integrationName": integration_name,
4793                "integrationStatus": "ACTIVE"
4794            }))
4795            .unwrap(),
4796        ))
4797    }
4798
4799    fn get_integration(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4800        let body = body_json(req);
4801        let integration_name = require_str(&body, "integrationName")?;
4802
4803        let state = self.state.read();
4804        match state.integrations.get(integration_name) {
4805            Some(i) => Ok(AwsResponse::json(
4806                StatusCode::OK,
4807                serde_json::to_string(&json!({
4808                    "integrationName": i.integration_name,
4809                    "integrationType": i.integration_type,
4810                    "integrationStatus": i.status,
4811                }))
4812                .unwrap(),
4813            )),
4814            None => Err(AwsServiceError::aws_error(
4815                StatusCode::NOT_FOUND,
4816                "ResourceNotFoundException",
4817                format!("Integration not found: {integration_name}"),
4818            )),
4819        }
4820    }
4821
4822    fn delete_integration(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4823        let body = body_json(req);
4824        let integration_name = require_str(&body, "integrationName")?;
4825
4826        let mut state = self.state.write();
4827        state.integrations.remove(integration_name);
4828        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4829    }
4830
4831    fn list_integrations(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4832        let state = self.state.read();
4833        let integrations: Vec<Value> = state
4834            .integrations
4835            .values()
4836            .map(|i| {
4837                json!({
4838                    "integrationName": i.integration_name,
4839                    "integrationType": i.integration_type,
4840                    "integrationStatus": i.status,
4841                })
4842            })
4843            .collect();
4844        Ok(AwsResponse::json(
4845            StatusCode::OK,
4846            serde_json::to_string(&json!({ "integrationSummaries": integrations })).unwrap(),
4847        ))
4848    }
4849
4850    // -- Lookup tables --
4851
4852    fn create_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4853        let body = body_json(req);
4854        let lookup_table_name = require_str(&body, "lookupTableName")?;
4855        let table_body = require_str(&body, "tableBody")?;
4856
4857        let state_r = self.state.read();
4858        let account_id = state_r.account_id.clone();
4859        let region = state_r.region.clone();
4860        drop(state_r);
4861
4862        let arn = format!("arn:aws:logs:{region}:{account_id}:lookup-table:{lookup_table_name}");
4863        let now = Utc::now().timestamp_millis();
4864
4865        let table = LookupTable {
4866            lookup_table_name: lookup_table_name.to_string(),
4867            arn: arn.clone(),
4868            table_body: table_body.to_string(),
4869            creation_time: now,
4870            last_modified_time: now,
4871        };
4872
4873        let mut state = self.state.write();
4874        state.lookup_tables.insert(arn.clone(), table);
4875
4876        Ok(AwsResponse::json(
4877            StatusCode::OK,
4878            serde_json::to_string(&json!({ "lookupTableArn": arn })).unwrap(),
4879        ))
4880    }
4881
4882    fn get_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4883        let body = body_json(req);
4884        let lookup_table_arn = require_str(&body, "lookupTableArn")?;
4885
4886        let state = self.state.read();
4887        match state.lookup_tables.get(lookup_table_arn) {
4888            Some(t) => Ok(AwsResponse::json(
4889                StatusCode::OK,
4890                serde_json::to_string(&json!({
4891                    "lookupTableName": t.lookup_table_name,
4892                    "lookupTableArn": t.arn,
4893                    "tableBody": t.table_body,
4894                    "creationTime": t.creation_time,
4895                    "lastModifiedTime": t.last_modified_time,
4896                }))
4897                .unwrap(),
4898            )),
4899            None => Err(AwsServiceError::aws_error(
4900                StatusCode::NOT_FOUND,
4901                "ResourceNotFoundException",
4902                format!("Lookup table not found: {lookup_table_arn}"),
4903            )),
4904        }
4905    }
4906
4907    fn describe_lookup_tables(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4908        let state = self.state.read();
4909        let tables: Vec<Value> = state
4910            .lookup_tables
4911            .values()
4912            .map(|t| {
4913                json!({
4914                    "lookupTableName": t.lookup_table_name,
4915                    "lookupTableArn": t.arn,
4916                    "creationTime": t.creation_time,
4917                    "lastModifiedTime": t.last_modified_time,
4918                })
4919            })
4920            .collect();
4921        Ok(AwsResponse::json(
4922            StatusCode::OK,
4923            serde_json::to_string(&json!({ "lookupTables": tables })).unwrap(),
4924        ))
4925    }
4926
4927    fn delete_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4928        let body = body_json(req);
4929        let lookup_table_arn = require_str(&body, "lookupTableArn")?;
4930
4931        let mut state = self.state.write();
4932        state.lookup_tables.remove(lookup_table_arn);
4933        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4934    }
4935
4936    fn update_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4937        let body = body_json(req);
4938        let lookup_table_arn = require_str(&body, "lookupTableArn")?;
4939        let table_body = require_str(&body, "tableBody")?;
4940
4941        let mut state = self.state.write();
4942        match state.lookup_tables.get_mut(lookup_table_arn) {
4943            Some(t) => {
4944                t.table_body = table_body.to_string();
4945                t.last_modified_time = Utc::now().timestamp_millis();
4946                Ok(AwsResponse::json(StatusCode::OK, "{}"))
4947            }
4948            None => Err(AwsServiceError::aws_error(
4949                StatusCode::NOT_FOUND,
4950                "ResourceNotFoundException",
4951                format!("Lookup table not found: {lookup_table_arn}"),
4952            )),
4953        }
4954    }
4955
4956    // -- Scheduled queries --
4957
4958    fn create_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4959        let body = body_json(req);
4960        let name = require_str(&body, "name")?;
4961        let query_string = require_str(&body, "queryString")?;
4962        let query_language = require_str(&body, "queryLanguage")?;
4963        let schedule_expression = require_str(&body, "scheduleExpression")?;
4964        let execution_role_arn = require_str(&body, "executionRoleArn")?;
4965
4966        let state_r = self.state.read();
4967        let account_id = state_r.account_id.clone();
4968        let region = state_r.region.clone();
4969        drop(state_r);
4970
4971        let arn = format!("arn:aws:logs:{region}:{account_id}:scheduled-query:{name}");
4972        let now = Utc::now().timestamp_millis();
4973
4974        let sq = ScheduledQuery {
4975            name: name.to_string(),
4976            arn: arn.clone(),
4977            query_string: query_string.to_string(),
4978            query_language: query_language.to_string(),
4979            schedule_expression: schedule_expression.to_string(),
4980            execution_role_arn: execution_role_arn.to_string(),
4981            status: "ACTIVE".to_string(),
4982            creation_time: now,
4983            last_modified_time: now,
4984        };
4985
4986        let mut state = self.state.write();
4987        state.scheduled_queries.insert(arn.clone(), sq);
4988
4989        Ok(AwsResponse::json(
4990            StatusCode::OK,
4991            serde_json::to_string(&json!({ "scheduledQueryArn": arn })).unwrap(),
4992        ))
4993    }
4994
4995    fn get_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4996        let body = body_json(req);
4997        let identifier = require_str(&body, "identifier")?;
4998
4999        let state = self.state.read();
5000        match state.scheduled_queries.get(identifier) {
5001            Some(sq) => Ok(AwsResponse::json(
5002                StatusCode::OK,
5003                serde_json::to_string(&json!({
5004                    "scheduledQueryArn": sq.arn,
5005                    "name": sq.name,
5006                    "queryString": sq.query_string,
5007                    "queryLanguage": sq.query_language,
5008                    "scheduleExpression": sq.schedule_expression,
5009                    "executionRoleArn": sq.execution_role_arn,
5010                }))
5011                .unwrap(),
5012            )),
5013            None => Err(AwsServiceError::aws_error(
5014                StatusCode::NOT_FOUND,
5015                "ResourceNotFoundException",
5016                format!("Scheduled query not found: {identifier}"),
5017            )),
5018        }
5019    }
5020
5021    fn get_scheduled_query_history(
5022        &self,
5023        req: &AwsRequest,
5024    ) -> Result<AwsResponse, AwsServiceError> {
5025        let body = body_json(req);
5026        let _identifier = require_str(&body, "identifier")?;
5027        // Stub: return empty history
5028        Ok(AwsResponse::json(
5029            StatusCode::OK,
5030            serde_json::to_string(&json!({ "triggerHistory": [] })).unwrap(),
5031        ))
5032    }
5033
5034    fn list_scheduled_queries(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5035        let state = self.state.read();
5036        let queries: Vec<Value> = state
5037            .scheduled_queries
5038            .values()
5039            .map(|sq| {
5040                json!({
5041                    "name": sq.name,
5042                    "scheduledQueryArn": sq.arn,
5043                })
5044            })
5045            .collect();
5046        Ok(AwsResponse::json(
5047            StatusCode::OK,
5048            serde_json::to_string(&json!({ "scheduledQueries": queries })).unwrap(),
5049        ))
5050    }
5051
5052    fn delete_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5053        let body = body_json(req);
5054        let identifier = require_str(&body, "identifier")?;
5055
5056        let mut state = self.state.write();
5057        state.scheduled_queries.remove(identifier);
5058        Ok(AwsResponse::json(StatusCode::OK, "{}"))
5059    }
5060
5061    fn update_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5062        let body = body_json(req);
5063        let identifier = require_str(&body, "identifier")?;
5064        let query_string = require_str(&body, "queryString")?;
5065        let query_language = require_str(&body, "queryLanguage")?;
5066        let schedule_expression = require_str(&body, "scheduleExpression")?;
5067        let execution_role_arn = require_str(&body, "executionRoleArn")?;
5068
5069        let mut state = self.state.write();
5070        match state.scheduled_queries.get_mut(identifier) {
5071            Some(sq) => {
5072                sq.query_string = query_string.to_string();
5073                sq.query_language = query_language.to_string();
5074                sq.schedule_expression = schedule_expression.to_string();
5075                sq.execution_role_arn = execution_role_arn.to_string();
5076                sq.last_modified_time = Utc::now().timestamp_millis();
5077                Ok(AwsResponse::json(StatusCode::OK, "{}"))
5078            }
5079            None => Err(AwsServiceError::aws_error(
5080                StatusCode::NOT_FOUND,
5081                "ResourceNotFoundException",
5082                format!("Scheduled query not found: {identifier}"),
5083            )),
5084        }
5085    }
5086
5087    // -- Misc stubs --
5088
5089    fn start_live_tail(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5090        let session_id = uuid::Uuid::new_v4().to_string();
5091        Ok(AwsResponse::json(
5092            StatusCode::OK,
5093            serde_json::to_string(&json!({
5094                "responseStream": {
5095                    "sessionStart": {
5096                        "sessionId": session_id,
5097                        "logGroupIdentifiers": [],
5098                    }
5099                }
5100            }))
5101            .unwrap(),
5102        ))
5103    }
5104
5105    fn list_log_groups_for_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5106        let body = body_json(req);
5107        let _query_id = require_str(&body, "queryId")?;
5108        // Stub: return empty log group names
5109        Ok(AwsResponse::json(
5110            StatusCode::OK,
5111            serde_json::to_string(&json!({ "logGroupIdentifiers": [] })).unwrap(),
5112        ))
5113    }
5114
5115    fn list_aggregate_log_group_summaries(
5116        &self,
5117        _req: &AwsRequest,
5118    ) -> Result<AwsResponse, AwsServiceError> {
5119        // Stub: return empty summaries
5120        Ok(AwsResponse::json(
5121            StatusCode::OK,
5122            serde_json::to_string(&json!({ "aggregateLogGroupSummaries": [] })).unwrap(),
5123        ))
5124    }
5125
5126    fn put_bearer_token_authentication(
5127        &self,
5128        req: &AwsRequest,
5129    ) -> Result<AwsResponse, AwsServiceError> {
5130        let body = body_json(req);
5131        let log_group_identifier = require_str(&body, "logGroupIdentifier")?;
5132        let enabled = body["bearerTokenAuthenticationEnabled"]
5133            .as_bool()
5134            .unwrap_or(false);
5135
5136        let mut state = self.state.write();
5137        state
5138            .bearer_token_auth
5139            .insert(log_group_identifier.to_string(), enabled);
5140        Ok(AwsResponse::json(StatusCode::OK, "{}"))
5141    }
5142
5143    fn get_log_object(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5144        // Stub: return empty log object
5145        Ok(AwsResponse::json(
5146            StatusCode::OK,
5147            serde_json::to_string(&json!({ "logObject": {} })).unwrap(),
5148        ))
5149    }
5150
5151    fn get_log_fields(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5152        // Stub: return empty log fields
5153        Ok(AwsResponse::json(
5154            StatusCode::OK,
5155            serde_json::to_string(&json!({ "logFields": [] })).unwrap(),
5156        ))
5157    }
5158
5159    fn associate_source_to_s3_table_integration(
5160        &self,
5161        req: &AwsRequest,
5162    ) -> Result<AwsResponse, AwsServiceError> {
5163        let body = body_json(req);
5164        let integration_arn = require_str(&body, "integrationArn")?;
5165        let data_source = body["dataSource"].clone();
5166        let source_id = data_source
5167            .as_object()
5168            .and_then(|o| o.get("resourceArn"))
5169            .and_then(|v| v.as_str())
5170            .unwrap_or("unknown")
5171            .to_string();
5172
5173        let mut state = self.state.write();
5174        state
5175            .s3_table_sources
5176            .entry(integration_arn.to_string())
5177            .or_default()
5178            .push(source_id);
5179        Ok(AwsResponse::json(StatusCode::OK, "{}"))
5180    }
5181
5182    fn list_sources_for_s3_table_integration(
5183        &self,
5184        req: &AwsRequest,
5185    ) -> Result<AwsResponse, AwsServiceError> {
5186        let body = body_json(req);
5187        let integration_arn = require_str(&body, "integrationArn")?;
5188
5189        let state = self.state.read();
5190        let sources: Vec<Value> = state
5191            .s3_table_sources
5192            .get(integration_arn)
5193            .map(|sources| {
5194                sources
5195                    .iter()
5196                    .map(|s| json!({ "resourceArn": s }))
5197                    .collect()
5198            })
5199            .unwrap_or_default();
5200        Ok(AwsResponse::json(
5201            StatusCode::OK,
5202            serde_json::to_string(&json!({ "dataSources": sources })).unwrap(),
5203        ))
5204    }
5205
5206    fn disassociate_source_from_s3_table_integration(
5207        &self,
5208        req: &AwsRequest,
5209    ) -> Result<AwsResponse, AwsServiceError> {
5210        let body = body_json(req);
5211        let _identifier = require_str(&body, "identifier")?;
5212        // No-op stub (we don't track detailed enough to remove specific sources)
5213        Ok(AwsResponse::json(StatusCode::OK, "{}"))
5214    }
5215
5216    fn update_delivery_configuration(
5217        &self,
5218        req: &AwsRequest,
5219    ) -> Result<AwsResponse, AwsServiceError> {
5220        let body = body_json(req);
5221        let id = require_str(&body, "id")?;
5222
5223        let state = self.state.read();
5224        if !state.deliveries.contains_key(id) {
5225            return Err(AwsServiceError::aws_error(
5226                StatusCode::NOT_FOUND,
5227                "ResourceNotFoundException",
5228                format!("Delivery not found: {id}"),
5229            ));
5230        }
5231        drop(state);
5232
5233        // No-op: delivery configuration update is accepted but not stored
5234        Ok(AwsResponse::json(StatusCode::OK, "{}"))
5235    }
5236
5237    fn describe_configuration_templates(
5238        &self,
5239        _req: &AwsRequest,
5240    ) -> Result<AwsResponse, AwsServiceError> {
5241        // Stub: return empty configuration templates
5242        Ok(AwsResponse::json(
5243            StatusCode::OK,
5244            serde_json::to_string(&json!({ "configurationTemplates": [] })).unwrap(),
5245        ))
5246    }
5247}
5248
5249/// Resolve log group name from either logGroupName or resourceIdentifier.
5250/// resourceIdentifier can be a log group name or an ARN.
5251fn resolve_log_group_name(
5252    log_group_name: Option<&str>,
5253    resource_identifier: Option<&str>,
5254) -> Result<String, AwsServiceError> {
5255    if let Some(identifier) = resource_identifier {
5256        if identifier.starts_with("arn:") {
5257            extract_log_group_from_arn(identifier).ok_or_else(|| {
5258                AwsServiceError::aws_error(
5259                    StatusCode::BAD_REQUEST,
5260                    "InvalidParameterException",
5261                    format!("Invalid ARN: {identifier}"),
5262                )
5263            })
5264        } else {
5265            Ok(identifier.to_string())
5266        }
5267    } else if let Some(name) = log_group_name {
5268        Ok(name.to_string())
5269    } else {
5270        Err(AwsServiceError::aws_error(
5271            StatusCode::BAD_REQUEST,
5272            "InvalidParameterException",
5273            "Either logGroupName or resourceIdentifier is required",
5274        ))
5275    }
5276}
5277
5278/// Extract log group name from ARN like "arn:aws:logs:region:account:log-group:name:*"
5279fn extract_log_group_from_arn(arn: &str) -> Option<String> {
5280    // arn:aws:logs:region:account:log-group:name:*
5281    let parts: Vec<&str> = arn.splitn(7, ':').collect();
5282    if parts.len() >= 7 && parts[5] == "log-group" {
5283        let name = parts[6].strip_suffix(":*").unwrap_or(parts[6]);
5284        Some(name.to_string())
5285    } else {
5286        None
5287    }
5288}
5289
5290/// CloudWatch Logs filter pattern matching.
5291///
5292/// Rules:
5293/// - Empty pattern or patterns starting with `{` (JSON patterns) match everything
5294/// - Quoted string `"foo bar"` matches the exact substring
5295/// - Multiple unquoted words `foo bar` means ALL words must appear anywhere in the message
5296/// - Single unquoted word `foo` is a simple substring match
5297fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
5298    let pattern = pattern.trim();
5299
5300    // Empty pattern matches everything
5301    if pattern.is_empty() {
5302        return true;
5303    }
5304
5305    // JSON/metric filter patterns (start with { or [) - we don't parse these, match everything
5306    if pattern.starts_with('{') || pattern.starts_with('[') {
5307        return true;
5308    }
5309
5310    // Quoted pattern: exact substring match
5311    if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
5312        let inner = &pattern[1..pattern.len() - 1];
5313        return message.contains(inner);
5314    }
5315
5316    // Multiple words: all must be present
5317    let words: Vec<&str> = pattern.split_whitespace().collect();
5318    words.iter().all(|word| message.contains(word))
5319}
5320
5321#[cfg(test)]
5322mod tests {
5323    use super::*;
5324    use crate::state::LogsState;
5325    use bytes::Bytes;
5326    use http::{HeaderMap, Method};
5327    use std::collections::HashMap;
5328    use std::sync::Arc;
5329
5330    fn make_service() -> LogsService {
5331        let state = Arc::new(parking_lot::RwLock::new(LogsState::new(
5332            "123456789012",
5333            "us-east-1",
5334        )));
5335        LogsService::new(state)
5336    }
5337
5338    fn make_request(action: &str, body: Value) -> AwsRequest {
5339        AwsRequest {
5340            service: "logs".to_string(),
5341            action: action.to_string(),
5342            region: "us-east-1".to_string(),
5343            account_id: "123456789012".to_string(),
5344            request_id: "test-request-id".to_string(),
5345            headers: HeaderMap::new(),
5346            query_params: HashMap::new(),
5347            body: Bytes::from(serde_json::to_vec(&body).unwrap()),
5348            path_segments: vec![],
5349            raw_path: "/".to_string(),
5350            method: Method::POST,
5351            is_query_protocol: false,
5352            access_key_id: None,
5353        }
5354    }
5355
5356    fn create_group(svc: &LogsService, name: &str) {
5357        let req = make_request("CreateLogGroup", json!({ "logGroupName": name }));
5358        svc.create_log_group(&req).unwrap();
5359    }
5360
5361    fn create_stream(svc: &LogsService, group: &str, stream: &str) {
5362        let req = make_request(
5363            "CreateLogStream",
5364            json!({ "logGroupName": group, "logStreamName": stream }),
5365        );
5366        svc.create_log_stream(&req).unwrap();
5367    }
5368
5369    fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
5370        let now = chrono::Utc::now().timestamp_millis();
5371        let events: Vec<Value> = messages
5372            .iter()
5373            .enumerate()
5374            .map(|(i, msg)| json!({ "timestamp": now + i as i64, "message": msg }))
5375            .collect();
5376        let req = make_request(
5377            "PutLogEvents",
5378            json!({
5379                "logGroupName": group,
5380                "logStreamName": stream,
5381                "logEvents": events,
5382            }),
5383        );
5384        svc.put_log_events(&req).unwrap();
5385    }
5386
5387    // ---- describe_log_groups: logGroupNamePattern ----
5388
5389    #[test]
5390    fn describe_log_groups_pattern_filters_by_substring() {
5391        let svc = make_service();
5392        create_group(&svc, "/app/web");
5393        create_group(&svc, "/app/api");
5394        create_group(&svc, "/system/metrics");
5395
5396        let req = make_request("DescribeLogGroups", json!({ "logGroupNamePattern": "app" }));
5397        let resp = svc.describe_log_groups(&req).unwrap();
5398        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5399        let names: Vec<&str> = body["logGroups"]
5400            .as_array()
5401            .unwrap()
5402            .iter()
5403            .map(|g| g["logGroupName"].as_str().unwrap())
5404            .collect();
5405        assert_eq!(names.len(), 2);
5406        assert!(names.contains(&"/app/web"));
5407        assert!(names.contains(&"/app/api"));
5408    }
5409
5410    #[test]
5411    fn describe_log_groups_pattern_empty_returns_all() {
5412        let svc = make_service();
5413        create_group(&svc, "/app/web");
5414        create_group(&svc, "/system/metrics");
5415
5416        let req = make_request("DescribeLogGroups", json!({}));
5417        let resp = svc.describe_log_groups(&req).unwrap();
5418        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5419        assert_eq!(body["logGroups"].as_array().unwrap().len(), 2);
5420    }
5421
5422    // ---- filter_log_events: logGroupIdentifier ----
5423
5424    #[test]
5425    fn filter_log_events_uses_log_group_identifier_as_name() {
5426        let svc = make_service();
5427        create_group(&svc, "my-group");
5428        create_stream(&svc, "my-group", "stream-1");
5429        put_events(&svc, "my-group", "stream-1", &["hello"]);
5430
5431        let req = make_request(
5432            "FilterLogEvents",
5433            json!({ "logGroupIdentifier": "my-group" }),
5434        );
5435        let resp = svc.filter_log_events(&req).unwrap();
5436        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5437        assert_eq!(body["events"].as_array().unwrap().len(), 1);
5438    }
5439
5440    #[test]
5441    fn filter_log_events_uses_log_group_identifier_as_arn() {
5442        let svc = make_service();
5443        create_group(&svc, "my-group");
5444        create_stream(&svc, "my-group", "stream-1");
5445        put_events(&svc, "my-group", "stream-1", &["hello"]);
5446
5447        let req = make_request(
5448            "FilterLogEvents",
5449            json!({ "logGroupIdentifier": "arn:aws:logs:us-east-1:123456789012:log-group:my-group:*" }),
5450        );
5451        let resp = svc.filter_log_events(&req).unwrap();
5452        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5453        assert_eq!(body["events"].as_array().unwrap().len(), 1);
5454    }
5455
5456    #[test]
5457    fn filter_log_events_errors_without_group_name_or_identifier() {
5458        let svc = make_service();
5459        let req = make_request("FilterLogEvents", json!({}));
5460        assert!(svc.filter_log_events(&req).is_err());
5461    }
5462
5463    // ---- filter_log_events: logStreamNamePrefix ----
5464
5465    #[test]
5466    fn filter_log_events_filters_by_stream_name_prefix() {
5467        let svc = make_service();
5468        create_group(&svc, "grp");
5469        create_stream(&svc, "grp", "web-1");
5470        create_stream(&svc, "grp", "web-2");
5471        create_stream(&svc, "grp", "api-1");
5472        put_events(&svc, "grp", "web-1", &["a"]);
5473        put_events(&svc, "grp", "web-2", &["b"]);
5474        put_events(&svc, "grp", "api-1", &["c"]);
5475
5476        let req = make_request(
5477            "FilterLogEvents",
5478            json!({ "logGroupName": "grp", "logStreamNamePrefix": "web" }),
5479        );
5480        let resp = svc.filter_log_events(&req).unwrap();
5481        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5482        let events = body["events"].as_array().unwrap();
5483        assert_eq!(events.len(), 2);
5484        for e in events {
5485            assert!(e["logStreamName"].as_str().unwrap().starts_with("web"));
5486        }
5487    }
5488
5489    // ---- create_export_task: taskName + logStreamNamePrefix stored ----
5490
5491    #[test]
5492    fn create_export_task_stores_task_name_and_stream_prefix() {
5493        let svc = make_service();
5494        create_group(&svc, "grp");
5495
5496        let req = make_request(
5497            "CreateExportTask",
5498            json!({
5499                "logGroupName": "grp",
5500                "from": 0,
5501                "to": 1000,
5502                "destination": "my-bucket",
5503                "taskName": "my-export",
5504                "logStreamNamePrefix": "web-",
5505            }),
5506        );
5507        let resp = svc.create_export_task(&req).unwrap();
5508        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5509        let task_id = body["taskId"].as_str().unwrap();
5510
5511        let req = make_request("DescribeExportTasks", json!({ "taskId": task_id }));
5512        let resp = svc.describe_export_tasks(&req).unwrap();
5513        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5514        let task = &body["exportTasks"][0];
5515        assert_eq!(task["taskName"].as_str().unwrap(), "my-export");
5516        assert_eq!(task["logStreamNamePrefix"].as_str().unwrap(), "web-");
5517    }
5518
5519    #[test]
5520    fn create_export_task_omits_optional_fields_when_not_provided() {
5521        let svc = make_service();
5522        create_group(&svc, "grp");
5523
5524        let req = make_request(
5525            "CreateExportTask",
5526            json!({
5527                "logGroupName": "grp",
5528                "from": 0,
5529                "to": 1000,
5530                "destination": "my-bucket",
5531            }),
5532        );
5533        let resp = svc.create_export_task(&req).unwrap();
5534        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5535        let task_id = body["taskId"].as_str().unwrap();
5536
5537        let req = make_request("DescribeExportTasks", json!({ "taskId": task_id }));
5538        let resp = svc.describe_export_tasks(&req).unwrap();
5539        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5540        let task = &body["exportTasks"][0];
5541        assert!(task.get("taskName").is_none() || task["taskName"].is_null());
5542        assert!(task.get("logStreamNamePrefix").is_none() || task["logStreamNamePrefix"].is_null());
5543    }
5544
5545    // ---- associate_kms_key / disassociate_kms_key: resourceIdentifier ----
5546
5547    #[test]
5548    fn associate_kms_key_via_resource_identifier_arn() {
5549        let svc = make_service();
5550        create_group(&svc, "grp");
5551
5552        let req = make_request(
5553            "AssociateKmsKey",
5554            json!({
5555                "resourceIdentifier": "arn:aws:logs:us-east-1:123456789012:log-group:grp:*",
5556                "kmsKeyId": "arn:aws:kms:us-east-1:123456789012:key/abc-123",
5557            }),
5558        );
5559        svc.associate_kms_key(&req).unwrap();
5560
5561        let state = svc.state.read();
5562        assert_eq!(
5563            state.log_groups["grp"].kms_key_id.as_deref(),
5564            Some("arn:aws:kms:us-east-1:123456789012:key/abc-123")
5565        );
5566    }
5567
5568    #[test]
5569    fn disassociate_kms_key_via_resource_identifier_name() {
5570        let svc = make_service();
5571        create_group(&svc, "grp");
5572
5573        // First associate
5574        let req = make_request(
5575            "AssociateKmsKey",
5576            json!({ "logGroupName": "grp", "kmsKeyId": "some-key" }),
5577        );
5578        svc.associate_kms_key(&req).unwrap();
5579
5580        // Disassociate via resourceIdentifier (plain name)
5581        let req = make_request("DisassociateKmsKey", json!({ "resourceIdentifier": "grp" }));
5582        svc.disassociate_kms_key(&req).unwrap();
5583
5584        let state = svc.state.read();
5585        assert!(state.log_groups["grp"].kms_key_id.is_none());
5586    }
5587
5588    // ---- describe_query_definitions: queryDefinitionNamePrefix ----
5589
5590    #[test]
5591    fn describe_query_definitions_filters_by_name_prefix() {
5592        let svc = make_service();
5593
5594        // Create some query definitions
5595        for name in &["error-queries-1", "error-queries-2", "latency-queries-1"] {
5596            let req = make_request(
5597                "PutQueryDefinition",
5598                json!({
5599                    "name": name,
5600                    "queryString": "fields @timestamp | limit 20",
5601                }),
5602            );
5603            svc.put_query_definition(&req).unwrap();
5604        }
5605
5606        let req = make_request(
5607            "DescribeQueryDefinitions",
5608            json!({ "queryDefinitionNamePrefix": "error" }),
5609        );
5610        let resp = svc.describe_query_definitions(&req).unwrap();
5611        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5612        let defs = body["queryDefinitions"].as_array().unwrap();
5613        assert_eq!(defs.len(), 2);
5614        for d in defs {
5615            assert!(d["name"].as_str().unwrap().starts_with("error"));
5616        }
5617    }
5618
5619    #[test]
5620    fn describe_query_definitions_no_prefix_returns_all() {
5621        let svc = make_service();
5622
5623        for name in &["a", "b", "c"] {
5624            let req = make_request(
5625                "PutQueryDefinition",
5626                json!({ "name": name, "queryString": "fields @timestamp" }),
5627            );
5628            svc.put_query_definition(&req).unwrap();
5629        }
5630
5631        let req = make_request("DescribeQueryDefinitions", json!({}));
5632        let resp = svc.describe_query_definitions(&req).unwrap();
5633        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5634        assert_eq!(body["queryDefinitions"].as_array().unwrap().len(), 3);
5635    }
5636
5637    // ---- extract_log_group_from_arn ----
5638
5639    #[test]
5640    fn put_delivery_destination_includes_empty_destination_resource_arn() {
5641        let svc = make_service();
5642        let req = make_request(
5643            "PutDeliveryDestination",
5644            json!({
5645                "name": "my-dest",
5646                "deliveryDestinationConfiguration": {}
5647            }),
5648        );
5649        let resp = svc.put_delivery_destination(&req).unwrap();
5650        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5651        let config = &body["deliveryDestination"]["deliveryDestinationConfiguration"];
5652        // destinationResourceArn should always be present as a string (Smithy requirement)
5653        assert_eq!(
5654            config["destinationResourceArn"].as_str().unwrap(),
5655            "",
5656            "destinationResourceArn should be an empty string when not set"
5657        );
5658    }
5659
5660    #[test]
5661    fn put_delivery_destination_includes_destination_resource_arn_when_set() {
5662        let svc = make_service();
5663        let req = make_request(
5664            "PutDeliveryDestination",
5665            json!({
5666                "name": "my-dest",
5667                "deliveryDestinationConfiguration": {
5668                    "destinationResourceArn": "arn:aws:s3:::my-bucket"
5669                }
5670            }),
5671        );
5672        let resp = svc.put_delivery_destination(&req).unwrap();
5673        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5674        let config = &body["deliveryDestination"]["deliveryDestinationConfiguration"];
5675        assert_eq!(
5676            config["destinationResourceArn"].as_str().unwrap(),
5677            "arn:aws:s3:::my-bucket"
5678        );
5679    }
5680
5681    #[test]
5682    fn extract_log_group_from_arn_strips_wildcard_suffix() {
5683        let arn = "arn:aws:logs:us-east-1:123456789012:log-group:my-group:*";
5684        assert_eq!(
5685            extract_log_group_from_arn(arn),
5686            Some("my-group".to_string())
5687        );
5688    }
5689
5690    #[test]
5691    fn extract_log_group_from_arn_without_wildcard() {
5692        let arn = "arn:aws:logs:us-east-1:123456789012:log-group:my-group";
5693        assert_eq!(
5694            extract_log_group_from_arn(arn),
5695            Some("my-group".to_string())
5696        );
5697    }
5698
5699    #[test]
5700    fn extract_log_group_from_arn_invalid() {
5701        assert_eq!(extract_log_group_from_arn("not-an-arn"), None);
5702    }
5703
5704    // ---- Account policies ----
5705
5706    #[test]
5707    fn account_policy_lifecycle() {
5708        let svc = make_service();
5709
5710        let req = make_request(
5711            "PutAccountPolicy",
5712            json!({
5713                "policyName": "test-policy",
5714                "policyType": "DATA_PROTECTION_POLICY",
5715                "policyDocument": "{\"Name\":\"test\"}",
5716            }),
5717        );
5718        let resp = svc.put_account_policy(&req).unwrap();
5719        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5720        assert_eq!(body["accountPolicy"]["policyName"], "test-policy");
5721
5722        let req = make_request(
5723            "DescribeAccountPolicies",
5724            json!({ "policyType": "DATA_PROTECTION_POLICY" }),
5725        );
5726        let resp = svc.describe_account_policies(&req).unwrap();
5727        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5728        assert_eq!(body["accountPolicies"].as_array().unwrap().len(), 1);
5729
5730        let req = make_request(
5731            "DeleteAccountPolicy",
5732            json!({
5733                "policyName": "test-policy",
5734                "policyType": "DATA_PROTECTION_POLICY",
5735            }),
5736        );
5737        svc.delete_account_policy(&req).unwrap();
5738
5739        let req = make_request(
5740            "DescribeAccountPolicies",
5741            json!({ "policyType": "DATA_PROTECTION_POLICY" }),
5742        );
5743        let resp = svc.describe_account_policies(&req).unwrap();
5744        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5745        assert!(body["accountPolicies"].as_array().unwrap().is_empty());
5746    }
5747
5748    // ---- Data protection policy ----
5749
5750    #[test]
5751    fn data_protection_policy_lifecycle() {
5752        let svc = make_service();
5753        create_group(&svc, "dp-group");
5754
5755        let req = make_request(
5756            "PutDataProtectionPolicy",
5757            json!({
5758                "logGroupIdentifier": "dp-group",
5759                "policyDocument": "{\"Name\":\"dp\"}",
5760            }),
5761        );
5762        svc.put_data_protection_policy(&req).unwrap();
5763
5764        let req = make_request(
5765            "GetDataProtectionPolicy",
5766            json!({ "logGroupIdentifier": "dp-group" }),
5767        );
5768        let resp = svc.get_data_protection_policy(&req).unwrap();
5769        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5770        assert_eq!(body["policyDocument"], "{\"Name\":\"dp\"}");
5771
5772        let req = make_request(
5773            "DeleteDataProtectionPolicy",
5774            json!({ "logGroupIdentifier": "dp-group" }),
5775        );
5776        svc.delete_data_protection_policy(&req).unwrap();
5777
5778        let req = make_request(
5779            "GetDataProtectionPolicy",
5780            json!({ "logGroupIdentifier": "dp-group" }),
5781        );
5782        let resp = svc.get_data_protection_policy(&req).unwrap();
5783        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5784        assert!(body.get("policyDocument").is_none());
5785    }
5786
5787    // ---- Index policies ----
5788
5789    #[test]
5790    fn index_policy_lifecycle() {
5791        let svc = make_service();
5792        create_group(&svc, "idx-group");
5793
5794        let req = make_request(
5795            "PutIndexPolicy",
5796            json!({
5797                "logGroupIdentifier": "idx-group",
5798                "policyDocument": "{\"Fields\":[\"field1\"]}",
5799            }),
5800        );
5801        svc.put_index_policy(&req).unwrap();
5802
5803        let req = make_request(
5804            "DescribeIndexPolicies",
5805            json!({ "logGroupIdentifiers": ["idx-group"] }),
5806        );
5807        let resp = svc.describe_index_policies(&req).unwrap();
5808        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5809        assert_eq!(body["indexPolicies"].as_array().unwrap().len(), 1);
5810
5811        let req = make_request(
5812            "DeleteIndexPolicy",
5813            json!({
5814                "logGroupIdentifier": "idx-group",
5815            }),
5816        );
5817        svc.delete_index_policy(&req).unwrap();
5818    }
5819
5820    // ---- Transformers ----
5821
5822    #[test]
5823    fn transformer_lifecycle() {
5824        let svc = make_service();
5825        create_group(&svc, "tx-group");
5826
5827        let req = make_request(
5828            "PutTransformer",
5829            json!({
5830                "logGroupIdentifier": "tx-group",
5831                "transformerConfig": [{"addKeys":{"entries":[{"key":"new","value":"val"}]}}],
5832            }),
5833        );
5834        svc.put_transformer(&req).unwrap();
5835
5836        let req = make_request(
5837            "GetTransformer",
5838            json!({ "logGroupIdentifier": "tx-group" }),
5839        );
5840        let resp = svc.get_transformer(&req).unwrap();
5841        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5842        assert!(body["transformerConfig"].is_array());
5843
5844        let req = make_request(
5845            "DeleteTransformer",
5846            json!({ "logGroupIdentifier": "tx-group" }),
5847        );
5848        svc.delete_transformer(&req).unwrap();
5849    }
5850
5851    #[test]
5852    fn test_transformer_returns_transformed_events() {
5853        let svc = make_service();
5854
5855        let req = make_request(
5856            "TestTransformer",
5857            json!({
5858                "transformerConfig": [{"addKeys":{"entries":[]}}],
5859                "logEventMessages": ["hello", "world"],
5860            }),
5861        );
5862        let resp = svc.test_transformer(&req).unwrap();
5863        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5864        assert_eq!(body["transformedLogs"].as_array().unwrap().len(), 2);
5865    }
5866
5867    // ---- Anomaly detectors ----
5868
5869    #[test]
5870    fn anomaly_detector_lifecycle() {
5871        let svc = make_service();
5872
5873        let req = make_request(
5874            "CreateLogAnomalyDetector",
5875            json!({
5876                "logGroupArnList": ["arn:aws:logs:us-east-1:123456789012:log-group:test:*"],
5877                "detectorName": "my-detector",
5878            }),
5879        );
5880        let resp = svc.create_log_anomaly_detector(&req).unwrap();
5881        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5882        let arn = body["anomalyDetectorArn"].as_str().unwrap().to_string();
5883
5884        let req = make_request(
5885            "GetLogAnomalyDetector",
5886            json!({ "anomalyDetectorArn": &arn }),
5887        );
5888        let resp = svc.get_log_anomaly_detector(&req).unwrap();
5889        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5890        assert_eq!(body["detectorName"], "my-detector");
5891
5892        let req = make_request("ListLogAnomalyDetectors", json!({}));
5893        let resp = svc.list_log_anomaly_detectors(&req).unwrap();
5894        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5895        assert_eq!(body["anomalyDetectors"].as_array().unwrap().len(), 1);
5896
5897        let req = make_request(
5898            "UpdateLogAnomalyDetector",
5899            json!({ "anomalyDetectorArn": &arn, "enabled": false }),
5900        );
5901        svc.update_log_anomaly_detector(&req).unwrap();
5902
5903        let req = make_request(
5904            "DeleteLogAnomalyDetector",
5905            json!({ "anomalyDetectorArn": &arn }),
5906        );
5907        svc.delete_log_anomaly_detector(&req).unwrap();
5908    }
5909
5910    // ---- Misc operations ----
5911
5912    #[test]
5913    fn get_log_group_fields_returns_stub() {
5914        let svc = make_service();
5915        create_group(&svc, "fields-group");
5916
5917        let req = make_request(
5918            "GetLogGroupFields",
5919            json!({ "logGroupName": "fields-group" }),
5920        );
5921        let resp = svc.get_log_group_fields(&req).unwrap();
5922        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5923        assert_eq!(body["logGroupFields"].as_array().unwrap().len(), 2);
5924    }
5925
5926    #[test]
5927    fn test_metric_filter_matches() {
5928        let svc = make_service();
5929
5930        let req = make_request(
5931            "TestMetricFilter",
5932            json!({
5933                "filterPattern": "ERROR",
5934                "logEventMessages": ["ERROR: oops", "INFO: ok", "ERROR: again"],
5935            }),
5936        );
5937        let resp = svc.test_metric_filter(&req).unwrap();
5938        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5939        assert_eq!(body["matches"].as_array().unwrap().len(), 2);
5940    }
5941
5942    #[test]
5943    fn stop_query_marks_as_cancelled() {
5944        let svc = make_service();
5945        create_group(&svc, "sq-group");
5946
5947        let req = make_request(
5948            "StartQuery",
5949            json!({
5950                "logGroupName": "sq-group",
5951                "startTime": 0,
5952                "endTime": 9999999999i64,
5953                "queryString": "fields @timestamp",
5954            }),
5955        );
5956        let resp = svc.start_query(&req).unwrap();
5957        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5958        let qid = body["queryId"].as_str().unwrap().to_string();
5959
5960        // Manually set query status to Running so we can test cancellation
5961        {
5962            let mut state = svc.state.write();
5963            state.queries.get_mut(&qid).unwrap().status = "Running".to_string();
5964        }
5965
5966        let req = make_request("StopQuery", json!({ "queryId": &qid }));
5967        let resp = svc.stop_query(&req).unwrap();
5968        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5969        assert_eq!(body["success"], true);
5970
5971        let state = svc.state.read();
5972        assert_eq!(state.queries[&qid].status, "Cancelled");
5973    }
5974
5975    #[test]
5976    fn put_log_group_deletion_protection() {
5977        let svc = make_service();
5978        create_group(&svc, "prot-group");
5979
5980        let req = make_request(
5981            "PutLogGroupDeletionProtection",
5982            json!({
5983                "logGroupIdentifier": "prot-group",
5984                "deletionProtectionEnabled": true,
5985            }),
5986        );
5987        svc.put_log_group_deletion_protection(&req).unwrap();
5988
5989        let state = svc.state.read();
5990        assert!(state.log_groups["prot-group"].deletion_protection);
5991    }
5992
5993    #[test]
5994    fn get_log_record_returns_empty_stub() {
5995        let svc = make_service();
5996
5997        let req = make_request(
5998            "GetLogRecord",
5999            json!({ "logRecordPointer": "some-pointer" }),
6000        );
6001        let resp = svc.get_log_record(&req).unwrap();
6002        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6003        assert!(body["logRecord"].is_object());
6004    }
6005
6006    #[test]
6007    fn list_anomalies_returns_empty() {
6008        let svc = make_service();
6009
6010        let req = make_request("ListAnomalies", json!({}));
6011        let resp = svc.list_anomalies(&req).unwrap();
6012        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6013        assert!(body["anomalies"].as_array().unwrap().is_empty());
6014    }
6015
6016    #[test]
6017    fn update_anomaly_noop() {
6018        let svc = make_service();
6019        let req = make_request("UpdateAnomaly", json!({}));
6020        svc.update_anomaly(&req).unwrap();
6021    }
6022
6023    // -- Import tasks --
6024
6025    #[test]
6026    fn import_task_lifecycle() {
6027        let svc = make_service();
6028
6029        let req = make_request(
6030            "CreateImportTask",
6031            json!({
6032                "importSourceArn": "arn:aws:s3:::my-bucket/logs",
6033                "importRoleArn": "arn:aws:iam::123456789012:role/import-role"
6034            }),
6035        );
6036        let resp = svc.create_import_task(&req).unwrap();
6037        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6038        let import_id = body["importId"].as_str().unwrap().to_string();
6039
6040        let req = make_request("DescribeImportTasks", json!({}));
6041        let resp = svc.describe_import_tasks(&req).unwrap();
6042        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6043        assert_eq!(body["imports"].as_array().unwrap().len(), 1);
6044
6045        let req = make_request(
6046            "DescribeImportTaskBatches",
6047            json!({ "importId": import_id }),
6048        );
6049        let resp = svc.describe_import_task_batches(&req).unwrap();
6050        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6051        assert!(body["importBatches"].as_array().unwrap().is_empty());
6052
6053        let req = make_request("CancelImportTask", json!({ "importId": import_id }));
6054        svc.cancel_import_task(&req).unwrap();
6055
6056        let req = make_request("DescribeImportTasks", json!({}));
6057        let resp = svc.describe_import_tasks(&req).unwrap();
6058        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6059        assert_eq!(
6060            body["imports"][0]["importStatus"].as_str().unwrap(),
6061            "CANCELLED"
6062        );
6063    }
6064
6065    // -- Integrations --
6066
6067    #[test]
6068    fn integration_lifecycle() {
6069        let svc = make_service();
6070
6071        let req = make_request(
6072            "PutIntegration",
6073            json!({
6074                "integrationName": "test-int",
6075                "integrationType": "OPENSEARCH",
6076                "resourceConfig": { "openSearchResourceConfig": {} }
6077            }),
6078        );
6079        svc.put_integration(&req).unwrap();
6080
6081        let req = make_request("GetIntegration", json!({ "integrationName": "test-int" }));
6082        let resp = svc.get_integration(&req).unwrap();
6083        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6084        assert_eq!(body["integrationName"].as_str().unwrap(), "test-int");
6085
6086        let req = make_request("ListIntegrations", json!({}));
6087        let resp = svc.list_integrations(&req).unwrap();
6088        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6089        assert_eq!(body["integrationSummaries"].as_array().unwrap().len(), 1);
6090
6091        let req = make_request(
6092            "DeleteIntegration",
6093            json!({ "integrationName": "test-int" }),
6094        );
6095        svc.delete_integration(&req).unwrap();
6096
6097        let req = make_request("ListIntegrations", json!({}));
6098        let resp = svc.list_integrations(&req).unwrap();
6099        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6100        assert!(body["integrationSummaries"].as_array().unwrap().is_empty());
6101    }
6102
6103    // -- Lookup tables --
6104
6105    #[test]
6106    fn lookup_table_lifecycle() {
6107        let svc = make_service();
6108
6109        let req = make_request(
6110            "CreateLookupTable",
6111            json!({
6112                "lookupTableName": "test-table",
6113                "tableBody": "key,value\na,b"
6114            }),
6115        );
6116        let resp = svc.create_lookup_table(&req).unwrap();
6117        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6118        let arn = body["lookupTableArn"].as_str().unwrap().to_string();
6119
6120        let req = make_request("GetLookupTable", json!({ "lookupTableArn": arn }));
6121        let resp = svc.get_lookup_table(&req).unwrap();
6122        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6123        assert_eq!(body["lookupTableName"].as_str().unwrap(), "test-table");
6124
6125        let req = make_request("DescribeLookupTables", json!({}));
6126        let resp = svc.describe_lookup_tables(&req).unwrap();
6127        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6128        assert_eq!(body["lookupTables"].as_array().unwrap().len(), 1);
6129
6130        let req = make_request(
6131            "UpdateLookupTable",
6132            json!({ "lookupTableArn": arn, "tableBody": "key,value\nc,d" }),
6133        );
6134        svc.update_lookup_table(&req).unwrap();
6135
6136        let req = make_request("DeleteLookupTable", json!({ "lookupTableArn": arn }));
6137        svc.delete_lookup_table(&req).unwrap();
6138
6139        let req = make_request("DescribeLookupTables", json!({}));
6140        let resp = svc.describe_lookup_tables(&req).unwrap();
6141        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6142        assert!(body["lookupTables"].as_array().unwrap().is_empty());
6143    }
6144
6145    // -- Scheduled queries --
6146
6147    #[test]
6148    fn scheduled_query_lifecycle() {
6149        let svc = make_service();
6150
6151        let req = make_request(
6152            "CreateScheduledQuery",
6153            json!({
6154                "name": "test-sq",
6155                "queryString": "fields @timestamp | limit 10",
6156                "queryLanguage": "CWLI",
6157                "scheduleExpression": "rate(1 hour)",
6158                "executionRoleArn": "arn:aws:iam::123456789012:role/exec"
6159            }),
6160        );
6161        let resp = svc.create_scheduled_query(&req).unwrap();
6162        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6163        let arn = body["scheduledQueryArn"].as_str().unwrap().to_string();
6164
6165        let req = make_request("GetScheduledQuery", json!({ "identifier": arn }));
6166        let resp = svc.get_scheduled_query(&req).unwrap();
6167        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6168        assert_eq!(body["name"].as_str().unwrap(), "test-sq");
6169
6170        let req = make_request(
6171            "GetScheduledQueryHistory",
6172            json!({ "identifier": arn, "startTime": 0_i64, "endTime": 9999999999_i64 }),
6173        );
6174        let resp = svc.get_scheduled_query_history(&req).unwrap();
6175        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6176        assert!(body["triggerHistory"].as_array().unwrap().is_empty());
6177
6178        let req = make_request("ListScheduledQueries", json!({}));
6179        let resp = svc.list_scheduled_queries(&req).unwrap();
6180        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6181        assert_eq!(body["scheduledQueries"].as_array().unwrap().len(), 1);
6182
6183        let req = make_request(
6184            "UpdateScheduledQuery",
6185            json!({
6186                "identifier": arn,
6187                "queryString": "fields @message | limit 5",
6188                "queryLanguage": "CWLI",
6189                "scheduleExpression": "rate(2 hours)",
6190                "executionRoleArn": "arn:aws:iam::123456789012:role/exec"
6191            }),
6192        );
6193        svc.update_scheduled_query(&req).unwrap();
6194
6195        let req = make_request("DeleteScheduledQuery", json!({ "identifier": arn }));
6196        svc.delete_scheduled_query(&req).unwrap();
6197
6198        let req = make_request("ListScheduledQueries", json!({}));
6199        let resp = svc.list_scheduled_queries(&req).unwrap();
6200        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6201        assert!(body["scheduledQueries"].as_array().unwrap().is_empty());
6202    }
6203
6204    // -- Misc stubs --
6205
6206    #[test]
6207    fn start_live_tail_returns_session() {
6208        let svc = make_service();
6209        let req = make_request(
6210            "StartLiveTail",
6211            json!({ "logGroupIdentifiers": ["/test/group"] }),
6212        );
6213        let resp = svc.start_live_tail(&req).unwrap();
6214        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6215        assert!(body["responseStream"]["sessionStart"]["sessionId"]
6216            .as_str()
6217            .is_some());
6218    }
6219
6220    #[test]
6221    fn list_log_groups_delegates_to_describe() {
6222        let svc = make_service();
6223        create_group(&svc, "/test/list");
6224        let req = make_request("DescribeLogGroups", json!({}));
6225        let resp = svc.describe_log_groups(&req).unwrap();
6226        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6227        assert_eq!(body["logGroups"].as_array().unwrap().len(), 1);
6228    }
6229
6230    #[test]
6231    fn list_log_groups_for_query_returns_empty() {
6232        let svc = make_service();
6233        let req = make_request(
6234            "ListLogGroupsForQuery",
6235            json!({ "queryId": "some-query-id" }),
6236        );
6237        let resp = svc.list_log_groups_for_query(&req).unwrap();
6238        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6239        assert!(body["logGroupIdentifiers"].as_array().unwrap().is_empty());
6240    }
6241
6242    #[test]
6243    fn list_aggregate_log_group_summaries_returns_empty() {
6244        let svc = make_service();
6245        let req = make_request(
6246            "ListAggregateLogGroupSummaries",
6247            json!({ "groupBy": "ACCOUNT" }),
6248        );
6249        let resp = svc.list_aggregate_log_group_summaries(&req).unwrap();
6250        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6251        assert!(body["aggregateLogGroupSummaries"]
6252            .as_array()
6253            .unwrap()
6254            .is_empty());
6255    }
6256
6257    #[test]
6258    fn put_bearer_token_authentication_stores_flag() {
6259        let svc = make_service();
6260        create_group(&svc, "/test/bearer");
6261        let req = make_request(
6262            "PutBearerTokenAuthentication",
6263            json!({
6264                "logGroupIdentifier": "/test/bearer",
6265                "bearerTokenAuthenticationEnabled": true
6266            }),
6267        );
6268        svc.put_bearer_token_authentication(&req).unwrap();
6269    }
6270
6271    #[test]
6272    fn get_log_object_returns_stub() {
6273        let svc = make_service();
6274        let req = make_request(
6275            "GetLogObject",
6276            json!({ "logObjectPointer": "some-pointer" }),
6277        );
6278        let resp = svc.get_log_object(&req).unwrap();
6279        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6280        assert!(body["logObject"].is_object());
6281    }
6282
6283    #[test]
6284    fn get_log_fields_returns_stub() {
6285        let svc = make_service();
6286        let req = make_request(
6287            "GetLogFields",
6288            json!({ "dataSourceName": "test", "dataSourceType": "CW_LOG" }),
6289        );
6290        let resp = svc.get_log_fields(&req).unwrap();
6291        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6292        assert!(body["logFields"].as_array().unwrap().is_empty());
6293    }
6294
6295    #[test]
6296    fn s3_table_integration_stubs() {
6297        let svc = make_service();
6298
6299        let req = make_request(
6300            "AssociateSourceToS3TableIntegration",
6301            json!({
6302                "integrationArn": "arn:aws:logs:us-east-1:123456789012:integration:test",
6303                "dataSource": { "resourceArn": "arn:aws:logs:us-east-1:123456789012:log-group:test" }
6304            }),
6305        );
6306        svc.associate_source_to_s3_table_integration(&req).unwrap();
6307
6308        let req = make_request(
6309            "ListSourcesForS3TableIntegration",
6310            json!({
6311                "integrationArn": "arn:aws:logs:us-east-1:123456789012:integration:test"
6312            }),
6313        );
6314        let resp = svc.list_sources_for_s3_table_integration(&req).unwrap();
6315        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6316        assert_eq!(body["dataSources"].as_array().unwrap().len(), 1);
6317
6318        let req = make_request(
6319            "DisassociateSourceFromS3TableIntegration",
6320            json!({ "identifier": "arn:aws:logs:us-east-1:123456789012:integration:test" }),
6321        );
6322        svc.disassociate_source_from_s3_table_integration(&req)
6323            .unwrap();
6324    }
6325
6326    #[test]
6327    fn update_delivery_configuration_noop() {
6328        let svc = make_service();
6329        // First create a delivery setup
6330        let req = make_request(
6331            "PutDeliverySource",
6332            json!({
6333                "name": "test-ds",
6334                "resourceArn": "arn:aws:logs:us-east-1:123456789012:log-group:dummy",
6335                "logType": "APPLICATION_LOGS"
6336            }),
6337        );
6338        svc.put_delivery_source(&req).unwrap();
6339
6340        let req = make_request(
6341            "PutDeliveryDestination",
6342            json!({
6343                "name": "test-dd",
6344                "deliveryDestinationConfiguration": {
6345                    "destinationResourceArn": "arn:aws:s3:::test-bucket"
6346                }
6347            }),
6348        );
6349        svc.put_delivery_destination(&req).unwrap();
6350
6351        let req = make_request(
6352            "CreateDelivery",
6353            json!({
6354                "deliverySourceName": "test-ds",
6355                "deliveryDestinationArn": "arn:aws:logs:us-east-1:123456789012:delivery-destination:test-dd"
6356            }),
6357        );
6358        let resp = svc.create_delivery(&req).unwrap();
6359        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6360        let delivery_id = body["delivery"]["id"].as_str().unwrap().to_string();
6361
6362        let req = make_request("UpdateDeliveryConfiguration", json!({ "id": delivery_id }));
6363        svc.update_delivery_configuration(&req).unwrap();
6364    }
6365
6366    #[test]
6367    fn describe_configuration_templates_returns_empty() {
6368        let svc = make_service();
6369        let req = make_request("DescribeConfigurationTemplates", json!({}));
6370        let resp = svc.describe_configuration_templates(&req).unwrap();
6371        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6372        assert!(body["configurationTemplates"]
6373            .as_array()
6374            .unwrap()
6375            .is_empty());
6376    }
6377}