Skip to main content

fakecloud_logs/
service.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
7use fakecloud_core::validation::*;
8
9use crate::state::{
10    AccountPolicy, AnomalyDetector, DataProtectionPolicy, Delivery, DeliveryDestination,
11    DeliverySource, Destination, ExportTask, ImportTask, IndexPolicy, Integration, LogEvent,
12    LogGroup, LogStream, LookupTable, MetricFilter, MetricTransformation, QueryDefinition,
13    QueryInfo, ResourcePolicy, ScheduledQuery, SharedLogsState, SubscriptionFilter, Transformer,
14};
15
16pub struct LogsService {
17    state: SharedLogsState,
18}
19
20impl LogsService {
21    pub fn new(state: SharedLogsState) -> Self {
22        Self { state }
23    }
24}
25
26#[async_trait]
27impl AwsService for LogsService {
28    fn service_name(&self) -> &str {
29        "logs"
30    }
31
32    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
33        match req.action.as_str() {
34            "CreateLogGroup" => self.create_log_group(&req),
35            "DeleteLogGroup" => self.delete_log_group(&req),
36            "DescribeLogGroups" => self.describe_log_groups(&req),
37            "CreateLogStream" => self.create_log_stream(&req),
38            "DeleteLogStream" => self.delete_log_stream(&req),
39            "DescribeLogStreams" => self.describe_log_streams(&req),
40            "PutLogEvents" => self.put_log_events(&req),
41            "GetLogEvents" => self.get_log_events(&req),
42            "FilterLogEvents" => self.filter_log_events(&req),
43            "TagLogGroup" => self.tag_log_group(&req),
44            "UntagLogGroup" => self.untag_log_group(&req),
45            "ListTagsLogGroup" => self.list_tags_log_group(&req),
46            "TagResource" => self.tag_resource(&req),
47            "UntagResource" => self.untag_resource(&req),
48            "ListTagsForResource" => self.list_tags_for_resource(&req),
49            "PutRetentionPolicy" => self.put_retention_policy(&req),
50            "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
51            "PutSubscriptionFilter" => self.put_subscription_filter(&req),
52            "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
53            "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
54            "PutMetricFilter" => self.put_metric_filter(&req),
55            "DescribeMetricFilters" => self.describe_metric_filters(&req),
56            "DeleteMetricFilter" => self.delete_metric_filter(&req),
57            "PutResourcePolicy" => self.put_resource_policy(&req),
58            "DescribeResourcePolicies" => self.describe_resource_policies(&req),
59            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
60            "PutDestination" => self.put_destination(&req),
61            "DescribeDestinations" => self.describe_destinations(&req),
62            "DeleteDestination" => self.delete_destination(&req),
63            "PutDestinationPolicy" => self.put_destination_policy(&req),
64            "StartQuery" => self.start_query(&req),
65            "GetQueryResults" => self.get_query_results(&req),
66            "DescribeQueries" => self.describe_queries(&req),
67            "CreateExportTask" => self.create_export_task(&req),
68            "DescribeExportTasks" => self.describe_export_tasks(&req),
69            "CancelExportTask" => self.cancel_export_task(&req),
70            "PutDeliveryDestination" => self.put_delivery_destination(&req),
71            "GetDeliveryDestination" => self.get_delivery_destination(&req),
72            "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
73            "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
74            "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
75            "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
76            "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
77            "PutDeliverySource" => self.put_delivery_source(&req),
78            "GetDeliverySource" => self.get_delivery_source(&req),
79            "DescribeDeliverySources" => self.describe_delivery_sources(&req),
80            "DeleteDeliverySource" => self.delete_delivery_source(&req),
81            "CreateDelivery" => self.create_delivery(&req),
82            "GetDelivery" => self.get_delivery(&req),
83            "DescribeDeliveries" => self.describe_deliveries(&req),
84            "DeleteDelivery" => self.delete_delivery(&req),
85            "AssociateKmsKey" => self.associate_kms_key(&req),
86            "DisassociateKmsKey" => self.disassociate_kms_key(&req),
87            "PutQueryDefinition" => self.put_query_definition(&req),
88            "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
89            "DeleteQueryDefinition" => self.delete_query_definition(&req),
90            "PutAccountPolicy" => self.put_account_policy(&req),
91            "DescribeAccountPolicies" => self.describe_account_policies(&req),
92            "DeleteAccountPolicy" => self.delete_account_policy(&req),
93            "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
94            "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
95            "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
96            "PutIndexPolicy" => self.put_index_policy(&req),
97            "DescribeIndexPolicies" => self.describe_index_policies(&req),
98            "DeleteIndexPolicy" => self.delete_index_policy(&req),
99            "DescribeFieldIndexes" => self.describe_field_indexes(&req),
100            "PutTransformer" => self.put_transformer(&req),
101            "GetTransformer" => self.get_transformer(&req),
102            "DeleteTransformer" => self.delete_transformer(&req),
103            "TestTransformer" => self.test_transformer(&req),
104            "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
105            "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
106            "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
107            "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
108            "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
109            "GetLogGroupFields" => self.get_log_group_fields(&req),
110            "TestMetricFilter" => self.test_metric_filter(&req),
111            "StopQuery" => self.stop_query(&req),
112            "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
113            "GetLogRecord" => self.get_log_record(&req),
114            "ListAnomalies" => self.list_anomalies(&req),
115            "UpdateAnomaly" => self.update_anomaly(&req),
116            "CreateImportTask" => self.create_import_task(&req),
117            "DescribeImportTasks" => self.describe_import_tasks(&req),
118            "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
119            "CancelImportTask" => self.cancel_import_task(&req),
120            "PutIntegration" => self.put_integration(&req),
121            "GetIntegration" => self.get_integration(&req),
122            "DeleteIntegration" => self.delete_integration(&req),
123            "ListIntegrations" => self.list_integrations(&req),
124            "CreateLookupTable" => self.create_lookup_table(&req),
125            "GetLookupTable" => self.get_lookup_table(&req),
126            "DescribeLookupTables" => self.describe_lookup_tables(&req),
127            "DeleteLookupTable" => self.delete_lookup_table(&req),
128            "UpdateLookupTable" => self.update_lookup_table(&req),
129            "CreateScheduledQuery" => self.create_scheduled_query(&req),
130            "GetScheduledQuery" => self.get_scheduled_query(&req),
131            "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
132            "ListScheduledQueries" => self.list_scheduled_queries(&req),
133            "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
134            "UpdateScheduledQuery" => self.update_scheduled_query(&req),
135            "StartLiveTail" => self.start_live_tail(&req),
136            "ListLogGroups" => self.list_log_groups(&req),
137            "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
138            "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
139            "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
140            "GetLogObject" => self.get_log_object(&req),
141            "GetLogFields" => self.get_log_fields(&req),
142            "AssociateSourceToS3TableIntegration" => {
143                self.associate_source_to_s3_table_integration(&req)
144            }
145            "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
146            "DisassociateSourceFromS3TableIntegration" => {
147                self.disassociate_source_from_s3_table_integration(&req)
148            }
149            "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
150            "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
151            _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
152        }
153    }
154
155    fn supported_actions(&self) -> &[&str] {
156        &[
157            "CreateLogGroup",
158            "DeleteLogGroup",
159            "DescribeLogGroups",
160            "CreateLogStream",
161            "DeleteLogStream",
162            "DescribeLogStreams",
163            "PutLogEvents",
164            "GetLogEvents",
165            "FilterLogEvents",
166            "TagLogGroup",
167            "UntagLogGroup",
168            "ListTagsLogGroup",
169            "TagResource",
170            "UntagResource",
171            "ListTagsForResource",
172            "PutRetentionPolicy",
173            "DeleteRetentionPolicy",
174            "PutSubscriptionFilter",
175            "DescribeSubscriptionFilters",
176            "DeleteSubscriptionFilter",
177            "PutMetricFilter",
178            "DescribeMetricFilters",
179            "DeleteMetricFilter",
180            "PutResourcePolicy",
181            "DescribeResourcePolicies",
182            "DeleteResourcePolicy",
183            "PutDestination",
184            "DescribeDestinations",
185            "DeleteDestination",
186            "PutDestinationPolicy",
187            "StartQuery",
188            "GetQueryResults",
189            "DescribeQueries",
190            "CreateExportTask",
191            "DescribeExportTasks",
192            "CancelExportTask",
193            "PutDeliveryDestination",
194            "GetDeliveryDestination",
195            "DescribeDeliveryDestinations",
196            "DeleteDeliveryDestination",
197            "PutDeliveryDestinationPolicy",
198            "GetDeliveryDestinationPolicy",
199            "DeleteDeliveryDestinationPolicy",
200            "PutDeliverySource",
201            "GetDeliverySource",
202            "DescribeDeliverySources",
203            "DeleteDeliverySource",
204            "CreateDelivery",
205            "GetDelivery",
206            "DescribeDeliveries",
207            "DeleteDelivery",
208            "AssociateKmsKey",
209            "DisassociateKmsKey",
210            "PutQueryDefinition",
211            "DescribeQueryDefinitions",
212            "DeleteQueryDefinition",
213            "PutAccountPolicy",
214            "DescribeAccountPolicies",
215            "DeleteAccountPolicy",
216            "PutDataProtectionPolicy",
217            "GetDataProtectionPolicy",
218            "DeleteDataProtectionPolicy",
219            "PutIndexPolicy",
220            "DescribeIndexPolicies",
221            "DeleteIndexPolicy",
222            "DescribeFieldIndexes",
223            "PutTransformer",
224            "GetTransformer",
225            "DeleteTransformer",
226            "TestTransformer",
227            "CreateLogAnomalyDetector",
228            "GetLogAnomalyDetector",
229            "DeleteLogAnomalyDetector",
230            "ListLogAnomalyDetectors",
231            "UpdateLogAnomalyDetector",
232            "GetLogGroupFields",
233            "TestMetricFilter",
234            "StopQuery",
235            "PutLogGroupDeletionProtection",
236            "GetLogRecord",
237            "ListAnomalies",
238            "UpdateAnomaly",
239            "CreateImportTask",
240            "DescribeImportTasks",
241            "DescribeImportTaskBatches",
242            "CancelImportTask",
243            "PutIntegration",
244            "GetIntegration",
245            "DeleteIntegration",
246            "ListIntegrations",
247            "CreateLookupTable",
248            "GetLookupTable",
249            "DescribeLookupTables",
250            "DeleteLookupTable",
251            "UpdateLookupTable",
252            "CreateScheduledQuery",
253            "GetScheduledQuery",
254            "GetScheduledQueryHistory",
255            "ListScheduledQueries",
256            "DeleteScheduledQuery",
257            "UpdateScheduledQuery",
258            "StartLiveTail",
259            "ListLogGroups",
260            "ListLogGroupsForQuery",
261            "ListAggregateLogGroupSummaries",
262            "PutBearerTokenAuthentication",
263            "GetLogObject",
264            "GetLogFields",
265            "AssociateSourceToS3TableIntegration",
266            "ListSourcesForS3TableIntegration",
267            "DisassociateSourceFromS3TableIntegration",
268            "UpdateDeliveryConfiguration",
269            "DescribeConfigurationTemplates",
270        ]
271    }
272}
273
274fn body_json(req: &AwsRequest) -> Value {
275    serde_json::from_slice(&req.body).unwrap_or(Value::Null)
276}
277
278fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
279    body[field].as_str().ok_or_else(|| {
280        AwsServiceError::aws_error(
281            StatusCode::BAD_REQUEST,
282            "InvalidParameterException",
283            format!("{field} is required"),
284        )
285    })
286}
287
288/// Build a delivery destination configuration JSON object, ensuring
289/// `destinationResourceArn` is always present as a string (Smithy requirement).
290fn dd_config_json(config: &std::collections::HashMap<String, String>) -> Value {
291    let mut m: serde_json::Map<String, Value> =
292        config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
293    m.entry("destinationResourceArn".to_string())
294        .or_insert_with(|| json!(""));
295    Value::Object(m)
296}
297
298fn generate_sequence_token() -> String {
299    use std::time::{SystemTime, UNIX_EPOCH};
300    let nanos = SystemTime::now()
301        .duration_since(UNIX_EPOCH)
302        .unwrap_or_default()
303        .as_nanos();
304    // u128 max is ~3.4e38, so we limit to 38 digits to avoid overflow
305    format!("{:038}", nanos % 10u128.pow(38))
306}
307
308fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
309    AwsServiceError::aws_error(
310        StatusCode::BAD_REQUEST,
311        "InvalidParameterException",
312        format!(
313            "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
314        ),
315    )
316}
317
318impl LogsService {
319    // ---- Log Groups ----
320
321    fn create_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
322        let body = body_json(req);
323        let name = body["logGroupName"]
324            .as_str()
325            .ok_or_else(|| {
326                AwsServiceError::aws_error(
327                    StatusCode::BAD_REQUEST,
328                    "InvalidParameterException",
329                    "logGroupName is required",
330                )
331            })?
332            .to_string();
333
334        validate_string_length("logGroupName", &name, 1, 512)?;
335        validate_optional_string_length("kmsKeyId", body["kmsKeyId"].as_str(), 1, 256)?;
336
337        let mut state = self.state.write();
338        if state.log_groups.contains_key(&name) {
339            return Err(AwsServiceError::aws_error(
340                StatusCode::BAD_REQUEST,
341                "ResourceAlreadyExistsException",
342                format!("The specified log group already exists: {name}"),
343            ));
344        }
345
346        let arn = format!(
347            "arn:aws:logs:{}:{}:log-group:{}:*",
348            state.region, state.account_id, name
349        );
350        let now = Utc::now().timestamp_millis();
351
352        let tags = body["tags"]
353            .as_object()
354            .map(|m| {
355                m.iter()
356                    .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
357                    .collect()
358            })
359            .unwrap_or_default();
360
361        let kms_key_id = body["kmsKeyId"].as_str().map(|s| s.to_string());
362
363        state.log_groups.insert(
364            name.clone(),
365            LogGroup {
366                name,
367                arn,
368                creation_time: now,
369                retention_in_days: None,
370                kms_key_id,
371                tags,
372                log_streams: std::collections::HashMap::new(),
373                stored_bytes: 0,
374                subscription_filters: Vec::new(),
375                data_protection_policy: None,
376                index_policies: Vec::new(),
377                transformer: None,
378                deletion_protection: false,
379            },
380        );
381
382        Ok(AwsResponse::json(StatusCode::OK, "{}"))
383    }
384
385    fn delete_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
386        let body = body_json(req);
387        let name = body["logGroupName"].as_str().ok_or_else(|| {
388            AwsServiceError::aws_error(
389                StatusCode::BAD_REQUEST,
390                "InvalidParameterException",
391                "logGroupName is required",
392            )
393        })?;
394
395        validate_string_length("logGroupName", name, 1, 512)?;
396
397        let mut state = self.state.write();
398        // Check deletion protection
399        if let Some(group) = state.log_groups.get(name) {
400            if group.deletion_protection {
401                return Err(AwsServiceError::aws_error(
402                    StatusCode::BAD_REQUEST,
403                    "OperationAbortedException",
404                    format!("Log group {name} has deletion protection enabled"),
405                ));
406            }
407        }
408        if state.log_groups.remove(name).is_none() {
409            return Err(AwsServiceError::aws_error(
410                StatusCode::BAD_REQUEST,
411                "ResourceNotFoundException",
412                format!("The specified log group does not exist: {name}"),
413            ));
414        }
415
416        Ok(AwsResponse::json(StatusCode::OK, "{}"))
417    }
418
419    fn describe_log_groups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
420        let body = body_json(req);
421        let prefix = body["logGroupNamePrefix"].as_str().unwrap_or("");
422        let pattern = body["logGroupNamePattern"].as_str().unwrap_or("");
423        let limit = body["limit"].as_i64().unwrap_or(50) as usize;
424        let next_token = body["nextToken"].as_str();
425
426        validate_optional_string_length(
427            "logGroupNamePrefix",
428            body["logGroupNamePrefix"].as_str(),
429            1,
430            512,
431        )?;
432        validate_optional_string_length(
433            "logGroupNamePattern",
434            body["logGroupNamePattern"].as_str(),
435            0,
436            512,
437        )?;
438        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
439        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
440        validate_optional_enum_value(
441            "logGroupClass",
442            &body["logGroupClass"],
443            &["STANDARD", "INFREQUENT_ACCESS", "DELIVERY"],
444        )?;
445
446        let state = self.state.read();
447        let mut groups: Vec<&LogGroup> = state
448            .log_groups
449            .values()
450            .filter(|g| {
451                (prefix.is_empty() || g.name.starts_with(prefix))
452                    && (pattern.is_empty() || g.name.contains(pattern))
453            })
454            .collect();
455        groups.sort_by(|a, b| a.name.cmp(&b.name));
456
457        // Handle pagination
458        let start_idx = if let Some(token) = next_token {
459            groups
460                .iter()
461                .position(|g| g.name.as_str() > token)
462                .unwrap_or(groups.len())
463        } else {
464            0
465        };
466
467        let page = &groups[start_idx..];
468        let has_more = page.len() > limit;
469        let page = if has_more { &page[..limit] } else { page };
470
471        let log_groups: Vec<Value> = page
472            .iter()
473            .map(|g| {
474                let log_group_arn = g.arn.trim_end_matches(":*").to_string();
475                let mut obj = json!({
476                    "logGroupName": g.name,
477                    "arn": g.arn,
478                    "logGroupArn": log_group_arn,
479                    "creationTime": g.creation_time,
480                    "storedBytes": g.stored_bytes,
481                    "metricFilterCount": 0,
482                });
483                if let Some(days) = g.retention_in_days {
484                    obj["retentionInDays"] = json!(days);
485                }
486                if let Some(ref kms) = g.kms_key_id {
487                    obj["kmsKeyId"] = json!(kms);
488                }
489                obj
490            })
491            .collect();
492
493        let mut result = json!({ "logGroups": log_groups });
494        if has_more {
495            if let Some(last) = page.last() {
496                result["nextToken"] = json!(last.name);
497            }
498        }
499
500        Ok(AwsResponse::json(
501            StatusCode::OK,
502            serde_json::to_string(&result).unwrap(),
503        ))
504    }
505
506    // ---- Log Streams ----
507
508    fn create_log_stream(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
509        let body = body_json(req);
510        let group_name = body["logGroupName"].as_str().ok_or_else(|| {
511            AwsServiceError::aws_error(
512                StatusCode::BAD_REQUEST,
513                "InvalidParameterException",
514                "logGroupName is required",
515            )
516        })?;
517        let stream_name = body["logStreamName"]
518            .as_str()
519            .ok_or_else(|| {
520                AwsServiceError::aws_error(
521                    StatusCode::BAD_REQUEST,
522                    "InvalidParameterException",
523                    "logStreamName is required",
524                )
525            })?
526            .to_string();
527
528        validate_string_length("logGroupName", group_name, 1, 512)?;
529        validate_string_length("logStreamName", &stream_name, 1, 512)?;
530
531        let mut state = self.state.write();
532        let region = state.region.clone();
533        let account_id = state.account_id.clone();
534
535        let group = state.log_groups.get_mut(group_name).ok_or_else(|| {
536            AwsServiceError::aws_error(
537                StatusCode::BAD_REQUEST,
538                "ResourceNotFoundException",
539                format!("The specified log group does not exist: {group_name}"),
540            )
541        })?;
542
543        if group.log_streams.contains_key(&stream_name) {
544            return Err(AwsServiceError::aws_error(
545                StatusCode::BAD_REQUEST,
546                "ResourceAlreadyExistsException",
547                format!("The specified log stream already exists: {stream_name}"),
548            ));
549        }
550
551        let arn = format!(
552            "arn:aws:logs:{region}:{account_id}:log-group:{group_name}:log-stream:{stream_name}",
553        );
554        let now = Utc::now().timestamp_millis();
555
556        group.log_streams.insert(
557            stream_name.clone(),
558            LogStream {
559                name: stream_name,
560                arn,
561                creation_time: now,
562                first_event_timestamp: None,
563                last_event_timestamp: None,
564                last_ingestion_time: None,
565                upload_sequence_token: generate_sequence_token(),
566                events: Vec::new(),
567            },
568        );
569
570        Ok(AwsResponse::json(StatusCode::OK, "{}"))
571    }
572
573    fn delete_log_stream(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
574        let body = body_json(req);
575        let group_name = body["logGroupName"].as_str().ok_or_else(|| {
576            AwsServiceError::aws_error(
577                StatusCode::BAD_REQUEST,
578                "InvalidParameterException",
579                "logGroupName is required",
580            )
581        })?;
582        let stream_name = body["logStreamName"].as_str().ok_or_else(|| {
583            AwsServiceError::aws_error(
584                StatusCode::BAD_REQUEST,
585                "InvalidParameterException",
586                "logStreamName is required",
587            )
588        })?;
589
590        validate_string_length("logGroupName", group_name, 1, 512)?;
591        validate_string_length("logStreamName", stream_name, 1, 512)?;
592
593        let mut state = self.state.write();
594        let group = state.log_groups.get_mut(group_name).ok_or_else(|| {
595            AwsServiceError::aws_error(
596                StatusCode::BAD_REQUEST,
597                "ResourceNotFoundException",
598                format!("The specified log group does not exist: {group_name}"),
599            )
600        })?;
601
602        if group.log_streams.remove(stream_name).is_none() {
603            return Err(AwsServiceError::aws_error(
604                StatusCode::BAD_REQUEST,
605                "ResourceNotFoundException",
606                format!("The specified log stream does not exist: {stream_name}"),
607            ));
608        }
609
610        Ok(AwsResponse::json(StatusCode::OK, "{}"))
611    }
612
613    fn describe_log_streams(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
614        let body = body_json(req);
615
616        // Support both logGroupName and logGroupIdentifier
617        let group_name = if let Some(name) = body["logGroupName"].as_str() {
618            name.to_string()
619        } else if let Some(identifier) = body["logGroupIdentifier"].as_str() {
620            // Validate: must not end with :*
621            if identifier.ends_with(":*") {
622                return Err(AwsServiceError::aws_error(
623                    StatusCode::BAD_REQUEST,
624                    "InvalidParameterException",
625                    format!(
626                        "1 validation error detected: Value '{}' at 'logGroupIdentifier' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\w#+=/:,.@-]*",
627                        identifier
628                    ),
629                ));
630            }
631            // If it's an ARN, extract the log group name
632            if identifier.starts_with("arn:aws:logs:") {
633                extract_log_group_from_arn(identifier).unwrap_or_else(|| identifier.to_string())
634            } else {
635                identifier.to_string()
636            }
637        } else {
638            return Err(AwsServiceError::aws_error(
639                StatusCode::BAD_REQUEST,
640                "InvalidParameterException",
641                "logGroupName is required",
642            ));
643        };
644
645        let prefix = body["logStreamNamePrefix"].as_str().unwrap_or("");
646        let limit = body["limit"].as_i64().unwrap_or(50) as usize;
647        let order_by = body["orderBy"].as_str().unwrap_or("LogStreamName");
648        let next_token = body["nextToken"].as_str();
649
650        validate_optional_string_length("logGroupName", body["logGroupName"].as_str(), 1, 512)?;
651        validate_optional_string_length(
652            "logGroupIdentifier",
653            body["logGroupIdentifier"].as_str(),
654            1,
655            2048,
656        )?;
657        validate_optional_string_length(
658            "logStreamNamePrefix",
659            body["logStreamNamePrefix"].as_str(),
660            1,
661            512,
662        )?;
663
664        // Validate limit
665        if limit > 50 {
666            return Err(validation_error(
667                "limit",
668                &limit.to_string(),
669                "Member must have value less than or equal to 50",
670            ));
671        }
672
673        // Validate orderBy
674        if order_by != "LogStreamName" && order_by != "LastEventTime" {
675            return Err(validation_error(
676                "orderBy",
677                order_by,
678                "Member must satisfy enum value set: [LogStreamName, LastEventTime]",
679            ));
680        }
681
682        // Cannot use prefix with LastEventTime ordering
683        if order_by == "LastEventTime" && !prefix.is_empty() {
684            return Err(AwsServiceError::aws_error(
685                StatusCode::BAD_REQUEST,
686                "InvalidParameterException",
687                "Cannot order by LastEventTime with a logStreamNamePrefix.",
688            ));
689        }
690
691        let state = self.state.read();
692        let group = state.log_groups.get(group_name.as_str()).ok_or_else(|| {
693            AwsServiceError::aws_error(
694                StatusCode::BAD_REQUEST,
695                "ResourceNotFoundException",
696                format!("The specified log group does not exist: {group_name}"),
697            )
698        })?;
699
700        let mut streams: Vec<&LogStream> = group
701            .log_streams
702            .values()
703            .filter(|s| prefix.is_empty() || s.name.starts_with(prefix))
704            .collect();
705        streams.sort_by(|a, b| a.name.cmp(&b.name));
706
707        // Handle pagination with token format: logGroupName@lastStreamName
708        let start_idx = if let Some(token) = next_token {
709            if let Some((_group, last_stream)) = token.split_once('@') {
710                streams
711                    .iter()
712                    .position(|s| s.name.as_str() > last_stream)
713                    .unwrap_or(streams.len())
714            } else {
715                streams.len() // invalid token -> empty results
716            }
717        } else {
718            0
719        };
720
721        let page = &streams[start_idx..];
722        let has_more = page.len() > limit;
723        let page = if has_more { &page[..limit] } else { page };
724
725        let log_streams: Vec<Value> = page
726            .iter()
727            .map(|s| {
728                let mut obj = json!({
729                    "logStreamName": s.name,
730                    "arn": s.arn,
731                    "creationTime": s.creation_time,
732                    "uploadSequenceToken": s.upload_sequence_token,
733                });
734                if let Some(ts) = s.first_event_timestamp {
735                    obj["firstEventTimestamp"] = json!(ts);
736                }
737                if let Some(ts) = s.last_event_timestamp {
738                    obj["lastEventTimestamp"] = json!(ts);
739                }
740                if let Some(ts) = s.last_ingestion_time {
741                    obj["lastIngestionTime"] = json!(ts);
742                }
743                obj
744            })
745            .collect();
746
747        let mut result = json!({ "logStreams": log_streams });
748        if has_more {
749            if let Some(last) = page.last() {
750                result["nextToken"] = json!(format!("{}@{}", group_name, last.name));
751            }
752        }
753
754        Ok(AwsResponse::json(
755            StatusCode::OK,
756            serde_json::to_string(&result).unwrap(),
757        ))
758    }
759
760    // ---- Log Events ----
761
762    fn put_log_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
763        let body = body_json(req);
764        let group_name = body["logGroupName"].as_str().ok_or_else(|| {
765            AwsServiceError::aws_error(
766                StatusCode::BAD_REQUEST,
767                "InvalidParameterException",
768                "logGroupName is required",
769            )
770        })?;
771        let stream_name = body["logStreamName"].as_str().ok_or_else(|| {
772            AwsServiceError::aws_error(
773                StatusCode::BAD_REQUEST,
774                "InvalidParameterException",
775                "logStreamName is required",
776            )
777        })?;
778
779        validate_string_length("logGroupName", group_name, 1, 512)?;
780        validate_string_length("logStreamName", stream_name, 1, 512)?;
781
782        let log_events = body["logEvents"].as_array().ok_or_else(|| {
783            AwsServiceError::aws_error(
784                StatusCode::BAD_REQUEST,
785                "InvalidParameterException",
786                "logEvents is required",
787            )
788        })?;
789
790        let now = Utc::now().timestamp_millis();
791
792        // Check chronological order
793        let timestamps: Vec<i64> = log_events
794            .iter()
795            .map(|e| e["timestamp"].as_i64().unwrap_or(now))
796            .collect();
797        for i in 1..timestamps.len() {
798            if timestamps[i] < timestamps[i - 1] {
799                return Err(AwsServiceError::aws_error(
800                    StatusCode::BAD_REQUEST,
801                    "InvalidParameterException",
802                    "Log events in a single PutLogEvents request must be in chronological order.",
803                ));
804            }
805        }
806
807        // Check for too old (14 days) and too new (2 hours) events
808        let fourteen_days_ms = 14 * 24 * 60 * 60 * 1000i64;
809        let two_hours_ms = 2 * 60 * 60 * 1000i64;
810        let mut too_old_end_index: Option<usize> = None;
811        let mut too_new_start_index: Option<usize> = None;
812
813        for (i, ts) in timestamps.iter().enumerate() {
814            if now.saturating_sub(*ts) > fourteen_days_ms {
815                too_old_end_index = Some(i);
816            } else if ts.saturating_sub(now) > two_hours_ms && too_new_start_index.is_none() {
817                too_new_start_index = Some(i);
818            }
819        }
820
821        // Build events list (only accepted ones)
822        let mut new_events: Vec<LogEvent> = Vec::new();
823        let mut rejected_info = json!({});
824        let mut has_rejected = false;
825
826        for (i, e) in log_events.iter().enumerate() {
827            let ts = e["timestamp"].as_i64().unwrap_or(now);
828            let is_too_old = too_old_end_index.is_some() && i <= too_old_end_index.unwrap();
829            let is_too_new = too_new_start_index.is_some() && i >= too_new_start_index.unwrap();
830
831            if is_too_old || is_too_new {
832                continue;
833            }
834
835            new_events.push(LogEvent {
836                timestamp: ts,
837                message: e["message"].as_str().unwrap_or("").to_string(),
838                ingestion_time: now,
839            });
840        }
841
842        if let Some(idx) = too_old_end_index {
843            rejected_info["tooOldLogEventEndIndex"] = json!(idx);
844            has_rejected = true;
845        }
846        if let Some(idx) = too_new_start_index {
847            rejected_info["tooNewLogEventStartIndex"] = json!(idx);
848            has_rejected = true;
849        }
850
851        let mut state = self.state.write();
852        let group = state.log_groups.get_mut(group_name).ok_or_else(|| {
853            AwsServiceError::aws_error(
854                StatusCode::BAD_REQUEST,
855                "ResourceNotFoundException",
856                format!("The specified log group does not exist: {group_name}"),
857            )
858        })?;
859
860        let stream = group.log_streams.get_mut(stream_name).ok_or_else(|| {
861            AwsServiceError::aws_error(
862                StatusCode::BAD_REQUEST,
863                "ResourceNotFoundException",
864                "The specified log stream does not exist.",
865            )
866        })?;
867
868        // Update stream metadata
869        for event in &new_events {
870            if stream.first_event_timestamp.is_none()
871                || Some(event.timestamp) < stream.first_event_timestamp
872            {
873                stream.first_event_timestamp = Some(event.timestamp);
874            }
875            if stream.last_event_timestamp.is_none()
876                || Some(event.timestamp) > stream.last_event_timestamp
877            {
878                stream.last_event_timestamp = Some(event.timestamp);
879            }
880            group.stored_bytes += event.message.len() as i64 + 26;
881        }
882        stream.last_ingestion_time = Some(now);
883
884        // Generate new sequence token
885        stream.upload_sequence_token = generate_sequence_token();
886
887        stream.events.append(&mut new_events);
888        stream.events.sort_by_key(|e| e.timestamp);
889
890        let mut response = json!({
891            "nextSequenceToken": stream.upload_sequence_token,
892        });
893        if has_rejected {
894            response["rejectedLogEventsInfo"] = rejected_info;
895        }
896
897        Ok(AwsResponse::json(
898            StatusCode::OK,
899            serde_json::to_string(&response).unwrap(),
900        ))
901    }
902
903    fn get_log_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
904        let body = body_json(req);
905
906        // Support both logGroupName and logGroupIdentifier
907        let group_name = if let Some(name) = body["logGroupName"].as_str() {
908            name.to_string()
909        } else if let Some(identifier) = body["logGroupIdentifier"].as_str() {
910            if identifier.ends_with(":*") {
911                return Err(AwsServiceError::aws_error(
912                    StatusCode::BAD_REQUEST,
913                    "InvalidParameterException",
914                    format!(
915                        "1 validation error detected: Value '{}' at 'logGroupIdentifier' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\w#+=/:,.@-]*",
916                        identifier
917                    ),
918                ));
919            }
920            if identifier.starts_with("arn:aws:logs:") {
921                extract_log_group_from_arn(identifier).unwrap_or_else(|| identifier.to_string())
922            } else {
923                identifier.to_string()
924            }
925        } else {
926            return Err(AwsServiceError::aws_error(
927                StatusCode::BAD_REQUEST,
928                "InvalidParameterException",
929                "logGroupName is required",
930            ));
931        };
932
933        let stream_name = body["logStreamName"].as_str().ok_or_else(|| {
934            AwsServiceError::aws_error(
935                StatusCode::BAD_REQUEST,
936                "InvalidParameterException",
937                "logStreamName is required",
938            )
939        })?;
940
941        validate_optional_string_length("logGroupName", body["logGroupName"].as_str(), 1, 512)?;
942        validate_optional_string_length(
943            "logGroupIdentifier",
944            body["logGroupIdentifier"].as_str(),
945            1,
946            2048,
947        )?;
948        validate_string_length("logStreamName", stream_name, 1, 512)?;
949
950        let start_time = body["startTime"].as_i64();
951        let end_time = body["endTime"].as_i64();
952        let limit = body["limit"].as_i64().unwrap_or(10000) as usize;
953        let start_from_head = body["startFromHead"].as_bool().unwrap_or(false);
954        let next_token = body["nextToken"].as_str();
955
956        // Validate limit
957        if limit > 10000 {
958            return Err(validation_error(
959                "limit",
960                &limit.to_string(),
961                "Member must have value less than or equal to 10000",
962            ));
963        }
964
965        // Validate nextToken format
966        if let Some(token) = next_token {
967            if !token.starts_with("f/") && !token.starts_with("b/") {
968                return Err(AwsServiceError::aws_error(
969                    StatusCode::BAD_REQUEST,
970                    "InvalidParameterException",
971                    "The specified nextToken is invalid.",
972                ));
973            }
974            let num_part = &token[2..];
975            if num_part.len() != 56 || num_part.parse::<u128>().is_err() {
976                return Err(AwsServiceError::aws_error(
977                    StatusCode::BAD_REQUEST,
978                    "InvalidParameterException",
979                    "The specified nextToken is invalid.",
980                ));
981            }
982        }
983
984        let state = self.state.read();
985        let group = state.log_groups.get(group_name.as_str()).ok_or_else(|| {
986            AwsServiceError::aws_error(
987                StatusCode::BAD_REQUEST,
988                "ResourceNotFoundException",
989                format!("The specified log group does not exist: {group_name}"),
990            )
991        })?;
992
993        let stream = group.log_streams.get(stream_name).ok_or_else(|| {
994            AwsServiceError::aws_error(
995                StatusCode::BAD_REQUEST,
996                "ResourceNotFoundException",
997                format!("The specified log stream does not exist: {stream_name}"),
998            )
999        })?;
1000
1001        // All events are indexed 0..n
1002        let all_events: Vec<&LogEvent> = stream
1003            .events
1004            .iter()
1005            .filter(|e| {
1006                if let Some(start) = start_time {
1007                    if e.timestamp < start {
1008                        return false;
1009                    }
1010                }
1011                if let Some(end) = end_time {
1012                    if e.timestamp >= end {
1013                        return false;
1014                    }
1015                }
1016                true
1017            })
1018            .collect();
1019
1020        let total = all_events.len();
1021
1022        // Determine start position from token
1023        let (start_idx, is_forward) = if let Some(token) = next_token {
1024            let is_forward = token.starts_with("f/");
1025            let idx: usize = token[2..].parse().unwrap_or(0);
1026            if is_forward {
1027                // Forward token: start from idx+1
1028                (idx + 1, true)
1029            } else {
1030                // Backward token: end at idx (exclusive), so start at max(0, idx-limit)
1031                (idx, false)
1032            }
1033        } else {
1034            (0, start_from_head)
1035        };
1036
1037        let events_slice: Vec<&LogEvent>;
1038        let next_forward_idx: usize;
1039        let next_backward_idx: usize;
1040
1041        if is_forward || start_from_head && next_token.is_none() {
1042            // Forward: from start_idx, take limit
1043            let end_idx = std::cmp::min(start_idx + limit, total);
1044            if start_idx >= total {
1045                events_slice = Vec::new();
1046                let last_idx = if total > 0 { total - 1 } else { 0 };
1047                next_forward_idx = last_idx;
1048                next_backward_idx = last_idx;
1049            } else {
1050                events_slice = all_events[start_idx..end_idx].to_vec();
1051                next_forward_idx = end_idx - 1;
1052                next_backward_idx = start_idx;
1053            }
1054        } else {
1055            // Backward (default): from end, take last `limit` events
1056            if next_token.is_some() {
1057                // Backward token: start_idx is the position, go backward `limit` from here
1058                let begin = start_idx.saturating_sub(limit);
1059                let end_idx = start_idx;
1060                if begin >= total || end_idx > total || begin >= end_idx {
1061                    events_slice = Vec::new();
1062                    next_forward_idx = start_idx;
1063                    next_backward_idx = start_idx;
1064                } else {
1065                    events_slice = all_events[begin..end_idx].to_vec();
1066                    next_forward_idx = end_idx - 1;
1067                    next_backward_idx = begin;
1068                }
1069            } else {
1070                // No token, not start_from_head: return last `limit` events
1071                let begin = total.saturating_sub(limit);
1072                events_slice = all_events[begin..].to_vec();
1073                next_forward_idx = if total > 0 { total - 1 } else { 0 };
1074                next_backward_idx = begin;
1075            }
1076        }
1077
1078        let events_json: Vec<Value> = events_slice
1079            .iter()
1080            .map(|e| {
1081                json!({
1082                    "timestamp": e.timestamp,
1083                    "message": e.message,
1084                    "ingestionTime": e.ingestion_time,
1085                })
1086            })
1087            .collect();
1088
1089        let forward_token = format!("f/{:056}", next_forward_idx);
1090        let backward_token = format!("b/{:056}", next_backward_idx);
1091
1092        Ok(AwsResponse::json(
1093            StatusCode::OK,
1094            serde_json::to_string(&json!({
1095                "events": events_json,
1096                "nextForwardToken": forward_token,
1097                "nextBackwardToken": backward_token,
1098            }))
1099            .unwrap(),
1100        ))
1101    }
1102
1103    fn filter_log_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1104        let body = body_json(req);
1105        let log_group_identifier = body["logGroupIdentifier"].as_str();
1106        let log_group_name = body["logGroupName"].as_str();
1107        let filter_pattern = body["filterPattern"].as_str().unwrap_or("");
1108        let start_time = body["startTime"].as_i64();
1109        let end_time = body["endTime"].as_i64();
1110        let limit = body["limit"].as_i64().unwrap_or(10000) as usize;
1111        let next_token = body["nextToken"].as_str();
1112        let stream_names: Vec<&str> = body["logStreamNames"]
1113            .as_array()
1114            .map(|a| a.iter().filter_map(|v| v.as_str()).collect())
1115            .unwrap_or_default();
1116        let stream_name_prefix = body["logStreamNamePrefix"].as_str().unwrap_or("");
1117
1118        if let Some(name) = log_group_name {
1119            validate_string_length("logGroupName", name, 1, 512)?;
1120        }
1121        validate_optional_string_length("logGroupIdentifier", log_group_identifier, 1, 2048)?;
1122        validate_optional_string_length(
1123            "logStreamNamePrefix",
1124            body["logStreamNamePrefix"].as_str(),
1125            1,
1126            512,
1127        )?;
1128        validate_optional_string_length("filterPattern", Some(filter_pattern), 0, 1024)?;
1129
1130        // Resolve the effective log group name: logGroupIdentifier takes precedence,
1131        // and can be either a name or an ARN.
1132        let resolved_group_name = if let Some(identifier) = log_group_identifier {
1133            if identifier.starts_with("arn:") {
1134                extract_log_group_from_arn(identifier).ok_or_else(|| {
1135                    AwsServiceError::aws_error(
1136                        StatusCode::BAD_REQUEST,
1137                        "InvalidParameterException",
1138                        format!("Invalid ARN: {identifier}"),
1139                    )
1140                })?
1141            } else {
1142                identifier.to_string()
1143            }
1144        } else if let Some(name) = log_group_name {
1145            name.to_string()
1146        } else {
1147            return Err(AwsServiceError::aws_error(
1148                StatusCode::BAD_REQUEST,
1149                "InvalidParameterException",
1150                "Either logGroupName or logGroupIdentifier is required",
1151            ));
1152        };
1153
1154        // Validate limit
1155        if limit > 10000 {
1156            return Err(validation_error(
1157                "limit",
1158                &limit.to_string(),
1159                "Member must have value less than or equal to 10000",
1160            ));
1161        }
1162
1163        let state = self.state.read();
1164        let group = state
1165            .log_groups
1166            .get(resolved_group_name.as_str())
1167            .ok_or_else(|| {
1168                AwsServiceError::aws_error(
1169                    StatusCode::BAD_REQUEST,
1170                    "ResourceNotFoundException",
1171                    format!("The specified log group does not exist: {resolved_group_name}"),
1172                )
1173            })?;
1174
1175        let mut filtered_events: Vec<Value> = Vec::new();
1176
1177        let streams: Vec<(&String, &LogStream)> = if !stream_names.is_empty() {
1178            group
1179                .log_streams
1180                .iter()
1181                .filter(|(name, _)| stream_names.contains(&name.as_str()))
1182                .collect()
1183        } else if !stream_name_prefix.is_empty() {
1184            group
1185                .log_streams
1186                .iter()
1187                .filter(|(name, _)| name.starts_with(stream_name_prefix))
1188                .collect()
1189        } else {
1190            group.log_streams.iter().collect()
1191        };
1192
1193        for (_, stream) in streams {
1194            for event in &stream.events {
1195                if let Some(start) = start_time {
1196                    if event.timestamp < start {
1197                        continue;
1198                    }
1199                }
1200                if let Some(end) = end_time {
1201                    if event.timestamp >= end {
1202                        continue;
1203                    }
1204                }
1205                // Filter pattern matching
1206                if !filter_pattern.is_empty()
1207                    && !matches_filter_pattern(filter_pattern, &event.message)
1208                {
1209                    continue;
1210                }
1211
1212                let event_id = format!("{}-{}", stream.name, event.timestamp);
1213
1214                filtered_events.push(json!({
1215                    "logStreamName": stream.name,
1216                    "timestamp": event.timestamp,
1217                    "message": event.message,
1218                    "ingestionTime": event.ingestion_time,
1219                    "eventId": event_id,
1220                }));
1221            }
1222        }
1223
1224        filtered_events.sort_by_key(|e| e["timestamp"].as_i64().unwrap_or(0));
1225
1226        // Handle pagination
1227        // Token format: groupName@streamName@eventId
1228        let start_idx = if let Some(token) = next_token {
1229            let parts: Vec<&str> = token.splitn(3, '@').collect();
1230            if parts.len() == 3 {
1231                let after_event_id = parts[2];
1232                // Find the position after this eventId
1233                filtered_events
1234                    .iter()
1235                    .position(|e| e["eventId"].as_str().unwrap_or("") == after_event_id)
1236                    .map(|pos| pos + 1)
1237                    .unwrap_or(filtered_events.len())
1238            } else {
1239                filtered_events.len() // invalid token -> empty results
1240            }
1241        } else {
1242            0
1243        };
1244
1245        let remaining = &filtered_events[start_idx..];
1246        let has_more = remaining.len() > limit;
1247        let page: Vec<Value> = if has_more {
1248            remaining[..limit].to_vec()
1249        } else {
1250            remaining.to_vec()
1251        };
1252
1253        let mut result = json!({
1254            "events": page,
1255            "searchedLogStreams": [],
1256        });
1257
1258        if has_more {
1259            if let Some(last) = page.last() {
1260                let event_id = last["eventId"].as_str().unwrap_or("");
1261                result["nextToken"] = json!(format!(
1262                    "{}@{}@{}",
1263                    resolved_group_name,
1264                    last["logStreamName"].as_str().unwrap_or(""),
1265                    event_id
1266                ));
1267            }
1268        }
1269
1270        Ok(AwsResponse::json(
1271            StatusCode::OK,
1272            serde_json::to_string(&result).unwrap(),
1273        ))
1274    }
1275
1276    // ---- Tags (legacy) ----
1277
1278    fn tag_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1279        let body = body_json(req);
1280        let name = body["logGroupName"].as_str().ok_or_else(|| {
1281            AwsServiceError::aws_error(
1282                StatusCode::BAD_REQUEST,
1283                "InvalidParameterException",
1284                "logGroupName is required",
1285            )
1286        })?;
1287
1288        validate_string_length("logGroupName", name, 1, 512)?;
1289
1290        let tags = body["tags"].as_object().ok_or_else(|| {
1291            AwsServiceError::aws_error(
1292                StatusCode::BAD_REQUEST,
1293                "InvalidParameterException",
1294                "tags is required",
1295            )
1296        })?;
1297
1298        let mut state = self.state.write();
1299        let group = state.log_groups.get_mut(name).ok_or_else(|| {
1300            AwsServiceError::aws_error(
1301                StatusCode::BAD_REQUEST,
1302                "ResourceNotFoundException",
1303                format!("The specified log group does not exist: {name}"),
1304            )
1305        })?;
1306
1307        for (k, v) in tags {
1308            group
1309                .tags
1310                .insert(k.clone(), v.as_str().unwrap_or("").to_string());
1311        }
1312
1313        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1314    }
1315
1316    fn untag_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1317        let body = body_json(req);
1318        let name = body["logGroupName"].as_str().ok_or_else(|| {
1319            AwsServiceError::aws_error(
1320                StatusCode::BAD_REQUEST,
1321                "InvalidParameterException",
1322                "logGroupName is required",
1323            )
1324        })?;
1325
1326        validate_string_length("logGroupName", name, 1, 512)?;
1327
1328        let keys = body["tags"].as_array().ok_or_else(|| {
1329            AwsServiceError::aws_error(
1330                StatusCode::BAD_REQUEST,
1331                "InvalidParameterException",
1332                "tags is required",
1333            )
1334        })?;
1335
1336        let mut state = self.state.write();
1337        let group = state.log_groups.get_mut(name).ok_or_else(|| {
1338            AwsServiceError::aws_error(
1339                StatusCode::BAD_REQUEST,
1340                "ResourceNotFoundException",
1341                format!("The specified log group does not exist: {name}"),
1342            )
1343        })?;
1344
1345        for key in keys {
1346            if let Some(k) = key.as_str() {
1347                group.tags.remove(k);
1348            }
1349        }
1350
1351        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1352    }
1353
1354    fn list_tags_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1355        let body = body_json(req);
1356        let name = body["logGroupName"].as_str().ok_or_else(|| {
1357            AwsServiceError::aws_error(
1358                StatusCode::BAD_REQUEST,
1359                "InvalidParameterException",
1360                "logGroupName is required",
1361            )
1362        })?;
1363
1364        validate_string_length("logGroupName", name, 1, 512)?;
1365
1366        let state = self.state.read();
1367        let group = state.log_groups.get(name).ok_or_else(|| {
1368            AwsServiceError::aws_error(
1369                StatusCode::BAD_REQUEST,
1370                "ResourceNotFoundException",
1371                format!("The specified log group does not exist: {name}"),
1372            )
1373        })?;
1374
1375        Ok(AwsResponse::json(
1376            StatusCode::OK,
1377            serde_json::to_string(&json!({ "tags": group.tags })).unwrap(),
1378        ))
1379    }
1380
1381    // ---- Tags (new API: TagResource/UntagResource/ListTagsForResource) ----
1382
1383    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1384        let body = body_json(req);
1385        let arn = body["resourceArn"].as_str().ok_or_else(|| {
1386            AwsServiceError::aws_error(
1387                StatusCode::BAD_REQUEST,
1388                "InvalidParameterException",
1389                "resourceArn is required",
1390            )
1391        })?;
1392
1393        validate_string_length("resourceArn", arn, 1, 1011)?;
1394
1395        let tags = body["tags"].as_object().ok_or_else(|| {
1396            AwsServiceError::aws_error(
1397                StatusCode::BAD_REQUEST,
1398                "InvalidParameterException",
1399                "tags is required",
1400            )
1401        })?;
1402
1403        let new_tags: std::collections::HashMap<String, String> = tags
1404            .iter()
1405            .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
1406            .collect();
1407
1408        let mut state = self.state.write();
1409
1410        // Try log group
1411        if let Some(group) = state
1412            .log_groups
1413            .values_mut()
1414            .find(|g| g.arn == arn || g.arn.trim_end_matches(":*") == arn)
1415        {
1416            for (k, v) in new_tags {
1417                group.tags.insert(k, v);
1418            }
1419            return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1420        }
1421
1422        // Try destination
1423        if let Some(dest) = state.destinations.values_mut().find(|d| d.arn == arn) {
1424            for (k, v) in new_tags {
1425                dest.tags.insert(k, v);
1426            }
1427            return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1428        }
1429
1430        Err(AwsServiceError::aws_error(
1431            StatusCode::BAD_REQUEST,
1432            "ResourceNotFoundException",
1433            format!("The specified resource does not exist: {arn}"),
1434        ))
1435    }
1436
1437    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1438        let body = body_json(req);
1439        let arn = body["resourceArn"].as_str().ok_or_else(|| {
1440            AwsServiceError::aws_error(
1441                StatusCode::BAD_REQUEST,
1442                "InvalidParameterException",
1443                "resourceArn is required",
1444            )
1445        })?;
1446
1447        validate_string_length("resourceArn", arn, 1, 1011)?;
1448
1449        let tag_keys = body["tagKeys"].as_array().ok_or_else(|| {
1450            AwsServiceError::aws_error(
1451                StatusCode::BAD_REQUEST,
1452                "InvalidParameterException",
1453                "tagKeys is required",
1454            )
1455        })?;
1456
1457        let keys: Vec<String> = tag_keys
1458            .iter()
1459            .filter_map(|v| v.as_str().map(|s| s.to_string()))
1460            .collect();
1461
1462        let mut state = self.state.write();
1463
1464        // Try log group
1465        if let Some(group) = state
1466            .log_groups
1467            .values_mut()
1468            .find(|g| g.arn == arn || g.arn.trim_end_matches(":*") == arn)
1469        {
1470            for k in &keys {
1471                group.tags.remove(k);
1472            }
1473            return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1474        }
1475
1476        // Try destination
1477        if let Some(dest) = state.destinations.values_mut().find(|d| d.arn == arn) {
1478            for k in &keys {
1479                dest.tags.remove(k);
1480            }
1481            return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1482        }
1483
1484        Err(AwsServiceError::aws_error(
1485            StatusCode::BAD_REQUEST,
1486            "ResourceNotFoundException",
1487            format!("The specified resource does not exist: {arn}"),
1488        ))
1489    }
1490
1491    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1492        let body = body_json(req);
1493        let arn = body["resourceArn"].as_str().ok_or_else(|| {
1494            AwsServiceError::aws_error(
1495                StatusCode::BAD_REQUEST,
1496                "InvalidParameterException",
1497                "resourceArn is required",
1498            )
1499        })?;
1500
1501        validate_string_length("resourceArn", arn, 1, 1011)?;
1502
1503        let state = self.state.read();
1504
1505        // Try log group
1506        if let Some(group) = state
1507            .log_groups
1508            .values()
1509            .find(|g| g.arn == arn || g.arn.trim_end_matches(":*") == arn)
1510        {
1511            return Ok(AwsResponse::json(
1512                StatusCode::OK,
1513                serde_json::to_string(&json!({ "tags": group.tags })).unwrap(),
1514            ));
1515        }
1516
1517        // Try destination
1518        if let Some(dest) = state.destinations.values().find(|d| d.arn == arn) {
1519            return Ok(AwsResponse::json(
1520                StatusCode::OK,
1521                serde_json::to_string(&json!({ "tags": dest.tags })).unwrap(),
1522            ));
1523        }
1524
1525        Err(AwsServiceError::aws_error(
1526            StatusCode::BAD_REQUEST,
1527            "ResourceNotFoundException",
1528            format!("The specified resource does not exist: {arn}"),
1529        ))
1530    }
1531
1532    // ---- Retention Policy ----
1533
1534    fn put_retention_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1535        let body = body_json(req);
1536        let name = body["logGroupName"].as_str().ok_or_else(|| {
1537            AwsServiceError::aws_error(
1538                StatusCode::BAD_REQUEST,
1539                "InvalidParameterException",
1540                "logGroupName is required",
1541            )
1542        })?;
1543
1544        validate_string_length("logGroupName", name, 1, 512)?;
1545
1546        let days = body["retentionInDays"].as_i64().ok_or_else(|| {
1547            AwsServiceError::aws_error(
1548                StatusCode::BAD_REQUEST,
1549                "InvalidParameterException",
1550                "retentionInDays is required",
1551            )
1552        })?;
1553
1554        let mut state = self.state.write();
1555        let group = state.log_groups.get_mut(name).ok_or_else(|| {
1556            AwsServiceError::aws_error(
1557                StatusCode::BAD_REQUEST,
1558                "ResourceNotFoundException",
1559                format!("The specified log group does not exist: {name}"),
1560            )
1561        })?;
1562
1563        group.retention_in_days = Some(days as i32);
1564
1565        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1566    }
1567
1568    fn delete_retention_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1569        let body = body_json(req);
1570        let name = body["logGroupName"].as_str().ok_or_else(|| {
1571            AwsServiceError::aws_error(
1572                StatusCode::BAD_REQUEST,
1573                "InvalidParameterException",
1574                "logGroupName is required",
1575            )
1576        })?;
1577
1578        validate_string_length("logGroupName", name, 1, 512)?;
1579
1580        let mut state = self.state.write();
1581        let group = state.log_groups.get_mut(name).ok_or_else(|| {
1582            AwsServiceError::aws_error(
1583                StatusCode::BAD_REQUEST,
1584                "ResourceNotFoundException",
1585                format!("The specified log group does not exist: {name}"),
1586            )
1587        })?;
1588
1589        group.retention_in_days = None;
1590
1591        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1592    }
1593
1594    // ---- Subscription Filters ----
1595
1596    fn put_subscription_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1597        let body = body_json(req);
1598        let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1599            AwsServiceError::aws_error(
1600                StatusCode::BAD_REQUEST,
1601                "InvalidParameterException",
1602                "logGroupName is required",
1603            )
1604        })?;
1605        let filter_name = body["filterName"]
1606            .as_str()
1607            .ok_or_else(|| {
1608                AwsServiceError::aws_error(
1609                    StatusCode::BAD_REQUEST,
1610                    "InvalidParameterException",
1611                    "filterName is required",
1612                )
1613            })?
1614            .to_string();
1615        let filter_pattern = body["filterPattern"].as_str().unwrap_or("").to_string();
1616        let destination_arn = body["destinationArn"]
1617            .as_str()
1618            .ok_or_else(|| {
1619                AwsServiceError::aws_error(
1620                    StatusCode::BAD_REQUEST,
1621                    "InvalidParameterException",
1622                    "destinationArn is required",
1623                )
1624            })?
1625            .to_string();
1626        let role_arn = body["roleArn"].as_str().map(|s| s.to_string());
1627        let distribution = body["distribution"]
1628            .as_str()
1629            .unwrap_or("ByLogStream")
1630            .to_string();
1631
1632        validate_string_length("logGroupName", log_group_name, 1, 512)?;
1633        validate_string_length("filterName", &filter_name, 1, 512)?;
1634        validate_optional_string_length("filterPattern", Some(&filter_pattern), 0, 1024)?;
1635
1636        let mut state = self.state.write();
1637        let group = state.log_groups.get_mut(log_group_name).ok_or_else(|| {
1638            AwsServiceError::aws_error(
1639                StatusCode::BAD_REQUEST,
1640                "ResourceNotFoundException",
1641                "The specified log group does not exist.",
1642            )
1643        })?;
1644
1645        // Check if updating existing filter
1646        if let Some(existing) = group
1647            .subscription_filters
1648            .iter_mut()
1649            .find(|f| f.filter_name == filter_name)
1650        {
1651            existing.filter_pattern = filter_pattern;
1652            existing.destination_arn = destination_arn;
1653            existing.role_arn = role_arn;
1654            existing.distribution = distribution;
1655            return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1656        }
1657
1658        // Max 2 subscription filters per log group
1659        if group.subscription_filters.len() >= 2 {
1660            return Err(AwsServiceError::aws_error(
1661                StatusCode::BAD_REQUEST,
1662                "LimitExceededException",
1663                "Resource limit exceeded.",
1664            ));
1665        }
1666
1667        let now = Utc::now().timestamp_millis();
1668        group.subscription_filters.push(SubscriptionFilter {
1669            filter_name,
1670            log_group_name: log_group_name.to_string(),
1671            filter_pattern,
1672            destination_arn,
1673            role_arn,
1674            distribution,
1675            creation_time: now,
1676        });
1677
1678        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1679    }
1680
1681    fn describe_subscription_filters(
1682        &self,
1683        req: &AwsRequest,
1684    ) -> Result<AwsResponse, AwsServiceError> {
1685        let body = body_json(req);
1686        let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1687            AwsServiceError::aws_error(
1688                StatusCode::BAD_REQUEST,
1689                "InvalidParameterException",
1690                "logGroupName is required",
1691            )
1692        })?;
1693
1694        validate_string_length("logGroupName", log_group_name, 1, 512)?;
1695        validate_optional_string_length(
1696            "filterNamePrefix",
1697            body["filterNamePrefix"].as_str(),
1698            1,
1699            512,
1700        )?;
1701
1702        let state = self.state.read();
1703        let group = state.log_groups.get(log_group_name).ok_or_else(|| {
1704            AwsServiceError::aws_error(
1705                StatusCode::BAD_REQUEST,
1706                "ResourceNotFoundException",
1707                "The specified log group does not exist.",
1708            )
1709        })?;
1710
1711        let filters: Vec<Value> = group
1712            .subscription_filters
1713            .iter()
1714            .map(|f| {
1715                let mut obj = json!({
1716                    "filterName": f.filter_name,
1717                    "logGroupName": f.log_group_name,
1718                    "filterPattern": f.filter_pattern,
1719                    "destinationArn": f.destination_arn,
1720                    "distribution": f.distribution,
1721                    "creationTime": f.creation_time,
1722                });
1723                if let Some(ref arn) = f.role_arn {
1724                    obj["roleArn"] = json!(arn);
1725                }
1726                obj
1727            })
1728            .collect();
1729
1730        Ok(AwsResponse::json(
1731            StatusCode::OK,
1732            serde_json::to_string(&json!({ "subscriptionFilters": filters })).unwrap(),
1733        ))
1734    }
1735
1736    fn delete_subscription_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1737        let body = body_json(req);
1738        let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1739            AwsServiceError::aws_error(
1740                StatusCode::BAD_REQUEST,
1741                "InvalidParameterException",
1742                "logGroupName is required",
1743            )
1744        })?;
1745        let filter_name = body["filterName"].as_str().ok_or_else(|| {
1746            AwsServiceError::aws_error(
1747                StatusCode::BAD_REQUEST,
1748                "InvalidParameterException",
1749                "filterName is required",
1750            )
1751        })?;
1752
1753        validate_string_length("logGroupName", log_group_name, 1, 512)?;
1754        validate_string_length("filterName", filter_name, 1, 512)?;
1755
1756        let mut state = self.state.write();
1757        let group = state.log_groups.get_mut(log_group_name).ok_or_else(|| {
1758            AwsServiceError::aws_error(
1759                StatusCode::BAD_REQUEST,
1760                "ResourceNotFoundException",
1761                "The specified log group does not exist.",
1762            )
1763        })?;
1764
1765        let idx = group
1766            .subscription_filters
1767            .iter()
1768            .position(|f| f.filter_name == filter_name)
1769            .ok_or_else(|| {
1770                AwsServiceError::aws_error(
1771                    StatusCode::BAD_REQUEST,
1772                    "ResourceNotFoundException",
1773                    "The specified subscription filter does not exist.",
1774                )
1775            })?;
1776
1777        group.subscription_filters.remove(idx);
1778
1779        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1780    }
1781
1782    // ---- Metric Filters ----
1783
1784    fn put_metric_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1785        let body = body_json(req);
1786        let filter_name = body["filterName"]
1787            .as_str()
1788            .ok_or_else(|| {
1789                AwsServiceError::aws_error(
1790                    StatusCode::BAD_REQUEST,
1791                    "InvalidParameterException",
1792                    "filterName is required",
1793                )
1794            })?
1795            .to_string();
1796        validate_required("filterPattern", &body["filterPattern"])?;
1797        let filter_pattern = body["filterPattern"].as_str().unwrap_or("").to_string();
1798        let log_group_name = body["logGroupName"]
1799            .as_str()
1800            .ok_or_else(|| {
1801                AwsServiceError::aws_error(
1802                    StatusCode::BAD_REQUEST,
1803                    "InvalidParameterException",
1804                    "logGroupName is required",
1805                )
1806            })?
1807            .to_string();
1808
1809        validate_string_length("filterName", &filter_name, 1, 512)?;
1810        validate_string_length("logGroupName", &log_group_name, 1, 512)?;
1811        validate_optional_string_length("filterPattern", Some(&filter_pattern), 0, 1024)?;
1812        validate_optional_string_length(
1813            "fieldSelectionCriteria",
1814            body["fieldSelectionCriteria"].as_str(),
1815            0,
1816            2000,
1817        )?;
1818
1819        let transformations_json = body["metricTransformations"].as_array().ok_or_else(|| {
1820            AwsServiceError::aws_error(
1821                StatusCode::BAD_REQUEST,
1822                "InvalidParameterException",
1823                "metricTransformations is required",
1824            )
1825        })?;
1826
1827        // Validate max 1 transformation
1828        if transformations_json.len() > 1 {
1829            return Err(validation_error(
1830                "metricTransformations",
1831                &format!("{}", transformations_json.len()),
1832                "Member must have length less than or equal to 1",
1833            ));
1834        }
1835
1836        let transformations: Vec<MetricTransformation> = transformations_json
1837            .iter()
1838            .map(|t| MetricTransformation {
1839                metric_name: t["metricName"].as_str().unwrap_or("").to_string(),
1840                metric_namespace: t["metricNamespace"].as_str().unwrap_or("").to_string(),
1841                metric_value: t["metricValue"].as_str().unwrap_or("").to_string(),
1842                default_value: t["defaultValue"].as_f64(),
1843            })
1844            .collect();
1845
1846        let now = Utc::now().timestamp_millis();
1847
1848        let mut state = self.state.write();
1849
1850        // Update existing or add new
1851        if let Some(existing) = state
1852            .metric_filters
1853            .iter_mut()
1854            .find(|f| f.filter_name == filter_name && f.log_group_name == log_group_name)
1855        {
1856            existing.filter_pattern = filter_pattern;
1857            existing.metric_transformations = transformations;
1858        } else {
1859            state.metric_filters.push(MetricFilter {
1860                filter_name,
1861                filter_pattern,
1862                log_group_name,
1863                metric_transformations: transformations,
1864                creation_time: now,
1865            });
1866        }
1867
1868        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1869    }
1870
1871    fn describe_metric_filters(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1872        let body = body_json(req);
1873        let filter_name_prefix = body["filterNamePrefix"].as_str();
1874        let log_group_name = body["logGroupName"].as_str();
1875        let metric_name = body["metricName"].as_str();
1876        let metric_namespace = body["metricNamespace"].as_str();
1877
1878        validate_optional_string_length("filterNamePrefix", filter_name_prefix, 1, 512)?;
1879        validate_optional_string_length("logGroupName", log_group_name, 1, 512)?;
1880        validate_optional_string_length("metricName", metric_name, 0, 255)?;
1881        validate_optional_string_length("metricNamespace", metric_namespace, 0, 255)?;
1882        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
1883        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
1884
1885        let state = self.state.read();
1886        let filters: Vec<Value> = state
1887            .metric_filters
1888            .iter()
1889            .filter(|f| {
1890                if let Some(prefix) = filter_name_prefix {
1891                    if !f.filter_name.starts_with(prefix) {
1892                        return false;
1893                    }
1894                }
1895                if let Some(lg) = log_group_name {
1896                    if f.log_group_name != lg {
1897                        return false;
1898                    }
1899                }
1900                if let Some(mn) = metric_name {
1901                    if !f.metric_transformations.iter().any(|t| t.metric_name == mn) {
1902                        return false;
1903                    }
1904                }
1905                if let Some(ns) = metric_namespace {
1906                    if !f
1907                        .metric_transformations
1908                        .iter()
1909                        .any(|t| t.metric_namespace == ns)
1910                    {
1911                        return false;
1912                    }
1913                }
1914                true
1915            })
1916            .map(|f| {
1917                let transformations: Vec<Value> = f
1918                    .metric_transformations
1919                    .iter()
1920                    .map(|t| {
1921                        let mut obj = json!({
1922                            "metricName": t.metric_name,
1923                            "metricNamespace": t.metric_namespace,
1924                            "metricValue": t.metric_value,
1925                        });
1926                        if let Some(dv) = t.default_value {
1927                            obj["defaultValue"] = json!(dv);
1928                        }
1929                        obj
1930                    })
1931                    .collect();
1932
1933                json!({
1934                    "filterName": f.filter_name,
1935                    "filterPattern": f.filter_pattern,
1936                    "logGroupName": f.log_group_name,
1937                    "metricTransformations": transformations,
1938                    "creationTime": f.creation_time,
1939                })
1940            })
1941            .collect();
1942
1943        Ok(AwsResponse::json(
1944            StatusCode::OK,
1945            serde_json::to_string(&json!({ "metricFilters": filters })).unwrap(),
1946        ))
1947    }
1948
1949    fn delete_metric_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1950        let body = body_json(req);
1951        let filter_name = body["filterName"].as_str().ok_or_else(|| {
1952            AwsServiceError::aws_error(
1953                StatusCode::BAD_REQUEST,
1954                "InvalidParameterException",
1955                "filterName is required",
1956            )
1957        })?;
1958        let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1959            AwsServiceError::aws_error(
1960                StatusCode::BAD_REQUEST,
1961                "InvalidParameterException",
1962                "logGroupName is required",
1963            )
1964        })?;
1965
1966        validate_string_length("filterName", filter_name, 1, 512)?;
1967        validate_string_length("logGroupName", log_group_name, 1, 512)?;
1968
1969        let mut state = self.state.write();
1970        let idx = state
1971            .metric_filters
1972            .iter()
1973            .position(|f| f.filter_name == filter_name && f.log_group_name == log_group_name);
1974
1975        if let Some(i) = idx {
1976            state.metric_filters.remove(i);
1977        }
1978
1979        Ok(AwsResponse::json(StatusCode::OK, "{}"))
1980    }
1981
1982    // ---- Resource Policies ----
1983
1984    fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1985        let body = body_json(req);
1986        let policy_name = body["policyName"]
1987            .as_str()
1988            .ok_or_else(|| {
1989                AwsServiceError::aws_error(
1990                    StatusCode::BAD_REQUEST,
1991                    "InvalidParameterException",
1992                    "policyName is required",
1993                )
1994            })?
1995            .to_string();
1996        let policy_document = body["policyDocument"]
1997            .as_str()
1998            .ok_or_else(|| {
1999                AwsServiceError::aws_error(
2000                    StatusCode::BAD_REQUEST,
2001                    "InvalidParameterException",
2002                    "policyDocument is required",
2003                )
2004            })?
2005            .to_string();
2006
2007        let now = Utc::now().timestamp_millis();
2008
2009        let mut state = self.state.write();
2010
2011        // Check limit (10 per region) only if adding new
2012        if !state.resource_policies.contains_key(&policy_name)
2013            && state.resource_policies.len() >= 10
2014        {
2015            return Err(AwsServiceError::aws_error(
2016                StatusCode::BAD_REQUEST,
2017                "LimitExceededException",
2018                "Resource limit exceeded.",
2019            ));
2020        }
2021
2022        let policy = ResourcePolicy {
2023            policy_name: policy_name.clone(),
2024            policy_document: policy_document.clone(),
2025            last_updated_time: now,
2026        };
2027
2028        state.resource_policies.insert(policy_name.clone(), policy);
2029
2030        Ok(AwsResponse::json(
2031            StatusCode::OK,
2032            serde_json::to_string(&json!({
2033                "resourcePolicy": {
2034                    "policyName": policy_name,
2035                    "policyDocument": policy_document,
2036                    "lastUpdatedTime": now,
2037                }
2038            }))
2039            .unwrap(),
2040        ))
2041    }
2042
2043    fn describe_resource_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2044        let body = body_json(req);
2045        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2046        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2047        validate_optional_enum_value(
2048            "policyScope",
2049            &body["policyScope"],
2050            &["ACCOUNT", "RESOURCE"],
2051        )?;
2052        let state = self.state.read();
2053
2054        let mut policies: Vec<Value> = state
2055            .resource_policies
2056            .values()
2057            .map(|p| {
2058                json!({
2059                    "policyName": p.policy_name,
2060                    "policyDocument": p.policy_document,
2061                    "lastUpdatedTime": p.last_updated_time,
2062                })
2063            })
2064            .collect();
2065        policies.sort_by(|a, b| {
2066            a["policyName"]
2067                .as_str()
2068                .unwrap()
2069                .cmp(b["policyName"].as_str().unwrap())
2070        });
2071
2072        Ok(AwsResponse::json(
2073            StatusCode::OK,
2074            serde_json::to_string(&json!({ "resourcePolicies": policies })).unwrap(),
2075        ))
2076    }
2077
2078    fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2079        let body = body_json(req);
2080        let policy_name = body["policyName"].as_str().ok_or_else(|| {
2081            AwsServiceError::aws_error(
2082                StatusCode::BAD_REQUEST,
2083                "InvalidParameterException",
2084                "policyName is required",
2085            )
2086        })?;
2087
2088        let mut state = self.state.write();
2089        if state.resource_policies.remove(policy_name).is_none() {
2090            return Err(AwsServiceError::aws_error(
2091                StatusCode::BAD_REQUEST,
2092                "ResourceNotFoundException",
2093                format!("Policy with name [{policy_name}] does not exist"),
2094            ));
2095        }
2096
2097        Ok(AwsResponse::json(StatusCode::OK, "{}"))
2098    }
2099
2100    // ---- Destinations ----
2101
2102    fn put_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2103        let body = body_json(req);
2104        let destination_name = body["destinationName"]
2105            .as_str()
2106            .ok_or_else(|| {
2107                AwsServiceError::aws_error(
2108                    StatusCode::BAD_REQUEST,
2109                    "InvalidParameterException",
2110                    "destinationName is required",
2111                )
2112            })?
2113            .to_string();
2114        let target_arn = body["targetArn"]
2115            .as_str()
2116            .ok_or_else(|| {
2117                AwsServiceError::aws_error(
2118                    StatusCode::BAD_REQUEST,
2119                    "InvalidParameterException",
2120                    "targetArn is required",
2121                )
2122            })?
2123            .to_string();
2124        let role_arn = body["roleArn"]
2125            .as_str()
2126            .ok_or_else(|| {
2127                AwsServiceError::aws_error(
2128                    StatusCode::BAD_REQUEST,
2129                    "InvalidParameterException",
2130                    "roleArn is required",
2131                )
2132            })?
2133            .to_string();
2134
2135        validate_string_length("destinationName", &destination_name, 1, 512)?;
2136        validate_string_length("targetArn", &target_arn, 1, 2048)?;
2137        validate_string_length("roleArn", &role_arn, 1, 2048)?;
2138
2139        let tags: std::collections::HashMap<String, String> = body["tags"]
2140            .as_object()
2141            .map(|m| {
2142                m.iter()
2143                    .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
2144                    .collect()
2145            })
2146            .unwrap_or_default();
2147
2148        let mut state = self.state.write();
2149        let arn = format!(
2150            "arn:aws:logs:{}:{}:destination:{}",
2151            state.region, state.account_id, destination_name
2152        );
2153        let now = Utc::now().timestamp_millis();
2154
2155        // Update or create
2156        let access_policy = state
2157            .destinations
2158            .get(&destination_name)
2159            .and_then(|d| d.access_policy.clone());
2160
2161        let dest = Destination {
2162            destination_name: destination_name.clone(),
2163            target_arn: target_arn.clone(),
2164            role_arn: role_arn.clone(),
2165            arn: arn.clone(),
2166            access_policy,
2167            creation_time: now,
2168            tags: tags.clone(),
2169        };
2170
2171        state.destinations.insert(destination_name.clone(), dest);
2172
2173        let dest_json = json!({
2174            "destinationName": destination_name,
2175            "targetArn": target_arn,
2176            "roleArn": role_arn,
2177            "arn": arn,
2178            "creationTime": now,
2179        });
2180
2181        Ok(AwsResponse::json(
2182            StatusCode::OK,
2183            serde_json::to_string(&json!({ "destination": dest_json })).unwrap(),
2184        ))
2185    }
2186
2187    fn describe_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2188        let body = body_json(req);
2189        let prefix = body["DestinationNamePrefix"].as_str().unwrap_or("");
2190
2191        validate_optional_string_length(
2192            "DestinationNamePrefix",
2193            body["DestinationNamePrefix"].as_str(),
2194            1,
2195            512,
2196        )?;
2197        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2198        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2199
2200        let state = self.state.read();
2201        let destinations: Vec<Value> = state
2202            .destinations
2203            .values()
2204            .filter(|d| prefix.is_empty() || d.destination_name.starts_with(prefix))
2205            .map(|d| {
2206                let mut obj = json!({
2207                    "destinationName": d.destination_name,
2208                    "targetArn": d.target_arn,
2209                    "roleArn": d.role_arn,
2210                    "arn": d.arn,
2211                    "creationTime": d.creation_time,
2212                });
2213                if let Some(ref policy) = d.access_policy {
2214                    obj["accessPolicy"] = json!(policy);
2215                }
2216                obj
2217            })
2218            .collect();
2219
2220        Ok(AwsResponse::json(
2221            StatusCode::OK,
2222            serde_json::to_string(&json!({ "destinations": destinations })).unwrap(),
2223        ))
2224    }
2225
2226    fn delete_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2227        let body = body_json(req);
2228        let name = body["destinationName"].as_str().ok_or_else(|| {
2229            AwsServiceError::aws_error(
2230                StatusCode::BAD_REQUEST,
2231                "InvalidParameterException",
2232                "destinationName is required",
2233            )
2234        })?;
2235
2236        validate_string_length("destinationName", name, 1, 512)?;
2237
2238        let mut state = self.state.write();
2239        if state.destinations.remove(name).is_none() {
2240            return Err(AwsServiceError::aws_error(
2241                StatusCode::BAD_REQUEST,
2242                "ResourceNotFoundException",
2243                format!("The specified destination does not exist: {name}"),
2244            ));
2245        }
2246
2247        Ok(AwsResponse::json(StatusCode::OK, "{}"))
2248    }
2249
2250    fn put_destination_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2251        let body = body_json(req);
2252        let name = body["destinationName"].as_str().ok_or_else(|| {
2253            AwsServiceError::aws_error(
2254                StatusCode::BAD_REQUEST,
2255                "InvalidParameterException",
2256                "destinationName is required",
2257            )
2258        })?;
2259
2260        validate_string_length("destinationName", name, 1, 512)?;
2261
2262        let policy = body["accessPolicy"].as_str().ok_or_else(|| {
2263            AwsServiceError::aws_error(
2264                StatusCode::BAD_REQUEST,
2265                "InvalidParameterException",
2266                "accessPolicy is required",
2267            )
2268        })?;
2269
2270        validate_string_length("accessPolicy", policy, 1, 5120)?;
2271
2272        let mut state = self.state.write();
2273        let dest = state.destinations.get_mut(name).ok_or_else(|| {
2274            AwsServiceError::aws_error(
2275                StatusCode::BAD_REQUEST,
2276                "ResourceNotFoundException",
2277                format!("The specified destination does not exist: {name}"),
2278            )
2279        })?;
2280
2281        dest.access_policy = Some(policy.to_string());
2282
2283        Ok(AwsResponse::json(StatusCode::OK, "{}"))
2284    }
2285
2286    // ---- Queries ----
2287
2288    fn start_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2289        let body = body_json(req);
2290        let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
2291            AwsServiceError::aws_error(
2292                StatusCode::BAD_REQUEST,
2293                "InvalidParameterException",
2294                "logGroupName is required",
2295            )
2296        })?;
2297        let start_time = body["startTime"].as_i64().unwrap_or(0);
2298        let end_time = body["endTime"].as_i64().unwrap_or(0);
2299        let query_string = body["queryString"].as_str().unwrap_or("").to_string();
2300
2301        validate_string_length("logGroupName", log_group_name, 1, 512)?;
2302        validate_optional_string_length("queryString", Some(&query_string), 0, 10000)?;
2303
2304        let mut state = self.state.write();
2305
2306        // Verify log group exists
2307        if !state.log_groups.contains_key(log_group_name) {
2308            return Err(AwsServiceError::aws_error(
2309                StatusCode::BAD_REQUEST,
2310                "ResourceNotFoundException",
2311                "The specified log group does not exist.",
2312            ));
2313        }
2314
2315        let query_id = uuid::Uuid::new_v4().to_string();
2316        let now = Utc::now().timestamp_millis();
2317
2318        state.queries.insert(
2319            query_id.clone(),
2320            QueryInfo {
2321                query_id: query_id.clone(),
2322                log_group_name: log_group_name.to_string(),
2323                query_string,
2324                start_time,
2325                end_time,
2326                status: "Complete".to_string(),
2327                create_time: now,
2328            },
2329        );
2330
2331        Ok(AwsResponse::json(
2332            StatusCode::OK,
2333            serde_json::to_string(&json!({ "queryId": query_id })).unwrap(),
2334        ))
2335    }
2336
2337    fn get_query_results(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2338        let body = body_json(req);
2339        let query_id = body["queryId"].as_str().ok_or_else(|| {
2340            AwsServiceError::aws_error(
2341                StatusCode::BAD_REQUEST,
2342                "InvalidParameterException",
2343                "queryId is required",
2344            )
2345        })?;
2346
2347        validate_string_length("queryId", query_id, 1, 256)?;
2348
2349        let state = self.state.read();
2350        let query = state.queries.get(query_id).ok_or_else(|| {
2351            AwsServiceError::aws_error(
2352                StatusCode::BAD_REQUEST,
2353                "ResourceNotFoundException",
2354                "The specified query does not exist.",
2355            )
2356        })?;
2357
2358        // Find matching log events
2359        let mut results: Vec<Value> = Vec::new();
2360        if let Some(group) = state.log_groups.get(&query.log_group_name) {
2361            for stream in group.log_streams.values() {
2362                for event in &stream.events {
2363                    // Convert timestamps: query uses seconds, events use milliseconds
2364                    let event_time_secs = event.timestamp / 1000;
2365                    if event_time_secs >= query.start_time && event_time_secs < query.end_time {
2366                        results.push(json!([
2367                            {"field": "@message", "value": event.message},
2368                            {"field": "@ptr", "value": format!("{}/{}", stream.name, event.timestamp)},
2369                        ]));
2370                    }
2371                }
2372            }
2373        }
2374
2375        // Sort by @message value
2376        results.sort_by(|a, b| {
2377            let a_msg = a[0]["value"].as_str().unwrap_or("");
2378            let b_msg = b[0]["value"].as_str().unwrap_or("");
2379            a_msg.cmp(b_msg)
2380        });
2381
2382        Ok(AwsResponse::json(
2383            StatusCode::OK,
2384            serde_json::to_string(&json!({
2385                "status": query.status,
2386                "results": results,
2387                "statistics": {
2388                    "recordsMatched": results.len() as f64,
2389                    "recordsScanned": results.len() as f64,
2390                    "bytesScanned": 0.0,
2391                },
2392            }))
2393            .unwrap(),
2394        ))
2395    }
2396
2397    fn describe_queries(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2398        let body = body_json(req);
2399        let log_group_name = body["logGroupName"].as_str();
2400        let status_filter = body["status"].as_str();
2401
2402        validate_optional_string_length("logGroupName", log_group_name, 1, 512)?;
2403        validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 1000)?;
2404        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2405        validate_optional_enum_value(
2406            "status",
2407            &body["status"],
2408            &[
2409                "Scheduled",
2410                "Running",
2411                "Complete",
2412                "Failed",
2413                "Cancelled",
2414                "Timeout",
2415                "Unknown",
2416            ],
2417        )?;
2418        validate_optional_enum_value(
2419            "queryLanguage",
2420            &body["queryLanguage"],
2421            &["CWLI", "SQL", "PPL"],
2422        )?;
2423
2424        let state = self.state.read();
2425        let queries: Vec<Value> = state
2426            .queries
2427            .values()
2428            .filter(|q| {
2429                if let Some(lg) = log_group_name {
2430                    if q.log_group_name != lg {
2431                        return false;
2432                    }
2433                }
2434                if let Some(status) = status_filter {
2435                    if q.status != status {
2436                        return false;
2437                    }
2438                }
2439                true
2440            })
2441            .map(|q| {
2442                json!({
2443                    "queryId": q.query_id,
2444                    "queryString": q.query_string,
2445                    "status": q.status,
2446                    "createTime": q.create_time,
2447                    "logGroupName": q.log_group_name,
2448                })
2449            })
2450            .collect();
2451
2452        Ok(AwsResponse::json(
2453            StatusCode::OK,
2454            serde_json::to_string(&json!({ "queries": queries })).unwrap(),
2455        ))
2456    }
2457
2458    // ---- Export Tasks ----
2459
2460    fn create_export_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2461        let body = body_json(req);
2462        let log_group_name = body["logGroupName"]
2463            .as_str()
2464            .ok_or_else(|| {
2465                AwsServiceError::aws_error(
2466                    StatusCode::BAD_REQUEST,
2467                    "InvalidParameterException",
2468                    "logGroupName is required",
2469                )
2470            })?
2471            .to_string();
2472        let from_time = body["from"].as_i64().unwrap_or(0);
2473        let to_time = body["to"].as_i64().unwrap_or(0);
2474        let destination = body["destination"]
2475            .as_str()
2476            .ok_or_else(|| {
2477                AwsServiceError::aws_error(
2478                    StatusCode::BAD_REQUEST,
2479                    "InvalidParameterException",
2480                    "destination is required",
2481                )
2482            })?
2483            .to_string();
2484        let destination_prefix = body["destinationPrefix"]
2485            .as_str()
2486            .unwrap_or("exportedlogs")
2487            .to_string();
2488
2489        validate_string_length("logGroupName", &log_group_name, 1, 512)?;
2490        validate_optional_string_length("taskName", body["taskName"].as_str(), 1, 512)?;
2491        validate_optional_string_length(
2492            "logStreamNamePrefix",
2493            body["logStreamNamePrefix"].as_str(),
2494            1,
2495            512,
2496        )?;
2497        validate_string_length("destination", &destination, 1, 512)?;
2498
2499        let state = self.state.read();
2500        if !state.log_groups.contains_key(&log_group_name) {
2501            return Err(AwsServiceError::aws_error(
2502                StatusCode::BAD_REQUEST,
2503                "ResourceNotFoundException",
2504                "The specified log group does not exist.",
2505            ));
2506        }
2507        drop(state);
2508
2509        let task_name = body["taskName"].as_str().map(|s| s.to_string());
2510        let log_stream_name_prefix = body["logStreamNamePrefix"].as_str().map(|s| s.to_string());
2511
2512        let task_id = uuid::Uuid::new_v4().to_string();
2513        let (status_code, status_message) = if from_time < to_time {
2514            (
2515                "COMPLETED".to_string(),
2516                "Completed successfully".to_string(),
2517            )
2518        } else {
2519            ("active".to_string(), "Task is active".to_string())
2520        };
2521
2522        let mut state = self.state.write();
2523        state.export_tasks.push(ExportTask {
2524            task_id: task_id.clone(),
2525            task_name,
2526            log_group_name,
2527            log_stream_name_prefix,
2528            from_time,
2529            to_time,
2530            destination,
2531            destination_prefix,
2532            status_code,
2533            status_message,
2534        });
2535
2536        Ok(AwsResponse::json(
2537            StatusCode::OK,
2538            serde_json::to_string(&json!({ "taskId": task_id })).unwrap(),
2539        ))
2540    }
2541
2542    fn describe_export_tasks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2543        let body = body_json(req);
2544        let task_id_filter = body["taskId"].as_str();
2545
2546        validate_optional_string_length("taskId", task_id_filter, 1, 512)?;
2547        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2548        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2549        validate_optional_enum_value(
2550            "statusCode",
2551            &body["statusCode"],
2552            &[
2553                "CANCELLED",
2554                "COMPLETED",
2555                "FAILED",
2556                "PENDING",
2557                "PENDING_CANCEL",
2558                "RUNNING",
2559            ],
2560        )?;
2561
2562        let state = self.state.read();
2563
2564        if let Some(task_id) = task_id_filter {
2565            let task = state.export_tasks.iter().find(|t| t.task_id == task_id);
2566            if task.is_none() {
2567                return Err(AwsServiceError::aws_error(
2568                    StatusCode::BAD_REQUEST,
2569                    "ResourceNotFoundException",
2570                    "The specified export task does not exist.",
2571                ));
2572            }
2573        }
2574
2575        let tasks: Vec<Value> = state
2576            .export_tasks
2577            .iter()
2578            .filter(|t| {
2579                if let Some(tid) = task_id_filter {
2580                    t.task_id == tid
2581                } else {
2582                    true
2583                }
2584            })
2585            .map(|t| {
2586                let mut obj = json!({
2587                    "taskId": t.task_id,
2588                    "logGroupName": t.log_group_name,
2589                    "from": t.from_time,
2590                    "to": t.to_time,
2591                    "destination": t.destination,
2592                    "destinationPrefix": t.destination_prefix,
2593                    "status": {
2594                        "code": t.status_code,
2595                        "message": t.status_message,
2596                    },
2597                });
2598                if let Some(ref name) = t.task_name {
2599                    obj["taskName"] = json!(name);
2600                }
2601                if let Some(ref prefix) = t.log_stream_name_prefix {
2602                    obj["logStreamNamePrefix"] = json!(prefix);
2603                }
2604                obj
2605            })
2606            .collect();
2607
2608        Ok(AwsResponse::json(
2609            StatusCode::OK,
2610            serde_json::to_string(&json!({ "exportTasks": tasks })).unwrap(),
2611        ))
2612    }
2613
2614    fn cancel_export_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2615        let body = body_json(req);
2616        let task_id = body["taskId"].as_str().ok_or_else(|| {
2617            AwsServiceError::aws_error(
2618                StatusCode::BAD_REQUEST,
2619                "InvalidParameterException",
2620                "taskId is required",
2621            )
2622        })?;
2623
2624        validate_string_length("taskId", task_id, 1, 512)?;
2625
2626        let mut state = self.state.write();
2627        let task = state
2628            .export_tasks
2629            .iter_mut()
2630            .find(|t| t.task_id == task_id)
2631            .ok_or_else(|| {
2632                AwsServiceError::aws_error(
2633                    StatusCode::BAD_REQUEST,
2634                    "ResourceNotFoundException",
2635                    "The specified export task does not exist.",
2636                )
2637            })?;
2638
2639        task.status_code = "CANCELLED".to_string();
2640        task.status_message = "Task was cancelled".to_string();
2641
2642        Ok(AwsResponse::json(StatusCode::OK, "{}"))
2643    }
2644
2645    // ---- Delivery Destinations ----
2646
2647    fn put_delivery_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2648        let body = body_json(req);
2649        let name = body["name"]
2650            .as_str()
2651            .ok_or_else(|| {
2652                AwsServiceError::aws_error(
2653                    StatusCode::BAD_REQUEST,
2654                    "InvalidParameterException",
2655                    "name is required",
2656                )
2657            })?
2658            .to_string();
2659
2660        validate_string_length("name", &name, 1, 60)?;
2661
2662        validate_optional_enum_value(
2663            "deliveryDestinationType",
2664            &body["deliveryDestinationType"],
2665            &["S3", "CWL", "FH", "XRAY"],
2666        )?;
2667
2668        let output_format = body["outputFormat"].as_str().map(|s| s.to_string());
2669
2670        // Validate output format
2671        if let Some(ref fmt) = output_format {
2672            let valid = ["json", "plain", "w3c", "raw", "parquet"];
2673            if !valid.contains(&fmt.as_str()) {
2674                return Err(AwsServiceError::aws_error(
2675                    StatusCode::BAD_REQUEST,
2676                    "ValidationException",
2677                    format!("1 validation error detected: Value '{fmt}' at 'outputFormat' failed to satisfy constraint: Member must satisfy enum value set: [json, plain, w3c, raw, parquet]"),
2678                ));
2679            }
2680        }
2681
2682        let config: std::collections::HashMap<String, String> = body
2683            ["deliveryDestinationConfiguration"]
2684            .as_object()
2685            .map(|m| {
2686                m.iter()
2687                    .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
2688                    .collect()
2689            })
2690            .unwrap_or_default();
2691
2692        let tags: std::collections::HashMap<String, String> = body["tags"]
2693            .as_object()
2694            .map(|m| {
2695                m.iter()
2696                    .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
2697                    .collect()
2698            })
2699            .unwrap_or_default();
2700
2701        let mut state = self.state.write();
2702
2703        // Check if updating - cannot change output format
2704        if let Some(existing) = state.delivery_destinations.get(&name) {
2705            if let Some(ref new_fmt) = output_format {
2706                if let Some(ref existing_fmt) = existing.output_format {
2707                    if new_fmt != existing_fmt {
2708                        return Err(AwsServiceError::aws_error(
2709                            StatusCode::BAD_REQUEST,
2710                            "ValidationException",
2711                            "Cannot update outputFormat for an existing delivery destination.",
2712                        ));
2713                    }
2714                }
2715            }
2716        }
2717
2718        let arn = format!(
2719            "arn:aws:logs:{}:{}:delivery-destination:{}",
2720            state.region, state.account_id, name
2721        );
2722
2723        let existing_policy = state
2724            .delivery_destinations
2725            .get(&name)
2726            .and_then(|d| d.delivery_destination_policy.clone());
2727
2728        let dd = DeliveryDestination {
2729            name: name.clone(),
2730            arn: arn.clone(),
2731            output_format: output_format.clone(),
2732            delivery_destination_configuration: config.clone(),
2733            tags: tags.clone(),
2734            delivery_destination_policy: existing_policy,
2735        };
2736
2737        state.delivery_destinations.insert(name.clone(), dd);
2738
2739        // Build the configuration object for the response, preserving existing fields
2740        // and always including destinationResourceArn (Smithy shape requires string, not null)
2741        let config_resp = {
2742            let mut c: serde_json::Map<String, Value> =
2743                config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
2744            c.entry("destinationResourceArn".to_string())
2745                .or_insert_with(|| json!(""));
2746            Value::Object(c)
2747        };
2748
2749        let mut resp = json!({
2750            "deliveryDestination": {
2751                "name": name,
2752                "arn": arn,
2753                "deliveryDestinationConfiguration": config_resp,
2754            }
2755        });
2756        if let Some(ref fmt) = output_format {
2757            resp["deliveryDestination"]["outputFormat"] = json!(fmt);
2758        }
2759        if !tags.is_empty() {
2760            resp["deliveryDestination"]["tags"] = json!(tags);
2761        }
2762
2763        Ok(AwsResponse::json(
2764            StatusCode::OK,
2765            serde_json::to_string(&resp).unwrap(),
2766        ))
2767    }
2768
2769    fn get_delivery_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2770        let body = body_json(req);
2771        let name = body["name"].as_str().ok_or_else(|| {
2772            AwsServiceError::aws_error(
2773                StatusCode::BAD_REQUEST,
2774                "InvalidParameterException",
2775                "name is required",
2776            )
2777        })?;
2778
2779        validate_string_length("name", name, 1, 60)?;
2780
2781        let state = self.state.read();
2782        let dd = state.delivery_destinations.get(name).ok_or_else(|| {
2783            AwsServiceError::aws_error(
2784                StatusCode::BAD_REQUEST,
2785                "ResourceNotFoundException",
2786                format!("Delivery destination '{name}' does not exist."),
2787            )
2788        })?;
2789
2790        let mut obj = json!({
2791            "name": dd.name,
2792            "arn": dd.arn,
2793            "deliveryDestinationConfiguration": dd_config_json(&dd.delivery_destination_configuration),
2794        });
2795        if let Some(ref fmt) = dd.output_format {
2796            obj["outputFormat"] = json!(fmt);
2797        }
2798
2799        Ok(AwsResponse::json(
2800            StatusCode::OK,
2801            serde_json::to_string(&json!({ "deliveryDestination": obj })).unwrap(),
2802        ))
2803    }
2804
2805    fn describe_delivery_destinations(
2806        &self,
2807        req: &AwsRequest,
2808    ) -> Result<AwsResponse, AwsServiceError> {
2809        let body = body_json(req);
2810        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2811        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2812
2813        let state = self.state.read();
2814        let dds: Vec<Value> = state
2815            .delivery_destinations
2816            .values()
2817            .map(|dd| {
2818                let mut obj = json!({
2819                    "name": dd.name,
2820                    "arn": dd.arn,
2821                    "deliveryDestinationConfiguration": dd_config_json(&dd.delivery_destination_configuration),
2822                });
2823                if let Some(ref fmt) = dd.output_format {
2824                    obj["outputFormat"] = json!(fmt);
2825                }
2826                obj
2827            })
2828            .collect();
2829
2830        Ok(AwsResponse::json(
2831            StatusCode::OK,
2832            serde_json::to_string(&json!({ "deliveryDestinations": dds })).unwrap(),
2833        ))
2834    }
2835
2836    fn delete_delivery_destination(
2837        &self,
2838        req: &AwsRequest,
2839    ) -> Result<AwsResponse, AwsServiceError> {
2840        let body = body_json(req);
2841        let name = body["name"].as_str().ok_or_else(|| {
2842            AwsServiceError::aws_error(
2843                StatusCode::BAD_REQUEST,
2844                "InvalidParameterException",
2845                "name is required",
2846            )
2847        })?;
2848
2849        validate_string_length("name", name, 1, 60)?;
2850
2851        let mut state = self.state.write();
2852        if state.delivery_destinations.remove(name).is_none() {
2853            return Err(AwsServiceError::aws_error(
2854                StatusCode::BAD_REQUEST,
2855                "ResourceNotFoundException",
2856                format!("Delivery destination '{name}' does not exist."),
2857            ));
2858        }
2859
2860        Ok(AwsResponse::json(StatusCode::OK, "{}"))
2861    }
2862
2863    fn put_delivery_destination_policy(
2864        &self,
2865        req: &AwsRequest,
2866    ) -> Result<AwsResponse, AwsServiceError> {
2867        let body = body_json(req);
2868        let name = body["deliveryDestinationName"].as_str().ok_or_else(|| {
2869            AwsServiceError::aws_error(
2870                StatusCode::BAD_REQUEST,
2871                "InvalidParameterException",
2872                "deliveryDestinationName is required",
2873            )
2874        })?;
2875        let policy = body["deliveryDestinationPolicy"]
2876            .as_str()
2877            .ok_or_else(|| {
2878                AwsServiceError::aws_error(
2879                    StatusCode::BAD_REQUEST,
2880                    "InvalidParameterException",
2881                    "deliveryDestinationPolicy is required",
2882                )
2883            })?
2884            .to_string();
2885
2886        validate_string_length("deliveryDestinationName", name, 1, 60)?;
2887        validate_string_length("deliveryDestinationPolicy", &policy, 1, 51200)?;
2888
2889        let mut state = self.state.write();
2890        let dd = state.delivery_destinations.get_mut(name).ok_or_else(|| {
2891            AwsServiceError::aws_error(
2892                StatusCode::BAD_REQUEST,
2893                "ResourceNotFoundException",
2894                format!("Delivery destination '{name}' does not exist."),
2895            )
2896        })?;
2897
2898        dd.delivery_destination_policy = Some(policy.clone());
2899
2900        Ok(AwsResponse::json(
2901            StatusCode::OK,
2902            serde_json::to_string(&json!({
2903                "policy": {
2904                    "deliveryDestinationPolicy": policy,
2905                }
2906            }))
2907            .unwrap(),
2908        ))
2909    }
2910
2911    fn get_delivery_destination_policy(
2912        &self,
2913        req: &AwsRequest,
2914    ) -> Result<AwsResponse, AwsServiceError> {
2915        let body = body_json(req);
2916        let name = body["deliveryDestinationName"].as_str().ok_or_else(|| {
2917            AwsServiceError::aws_error(
2918                StatusCode::BAD_REQUEST,
2919                "InvalidParameterException",
2920                "deliveryDestinationName is required",
2921            )
2922        })?;
2923
2924        validate_string_length("deliveryDestinationName", name, 1, 60)?;
2925
2926        let state = self.state.read();
2927        let dd = state.delivery_destinations.get(name).ok_or_else(|| {
2928            AwsServiceError::aws_error(
2929                StatusCode::BAD_REQUEST,
2930                "ResourceNotFoundException",
2931                format!("Delivery destination '{name}' does not exist."),
2932            )
2933        })?;
2934
2935        let policy_json = if let Some(ref policy) = dd.delivery_destination_policy {
2936            json!({
2937                "deliveryDestinationPolicy": policy,
2938            })
2939        } else {
2940            json!({})
2941        };
2942
2943        Ok(AwsResponse::json(
2944            StatusCode::OK,
2945            serde_json::to_string(&json!({
2946                "policy": policy_json,
2947            }))
2948            .unwrap(),
2949        ))
2950    }
2951
2952    fn delete_delivery_destination_policy(
2953        &self,
2954        req: &AwsRequest,
2955    ) -> Result<AwsResponse, AwsServiceError> {
2956        let body = body_json(req);
2957        let name = body["deliveryDestinationName"].as_str().ok_or_else(|| {
2958            AwsServiceError::aws_error(
2959                StatusCode::BAD_REQUEST,
2960                "InvalidParameterException",
2961                "deliveryDestinationName is required",
2962            )
2963        })?;
2964
2965        validate_string_length("deliveryDestinationName", name, 1, 60)?;
2966
2967        let mut state = self.state.write();
2968        let dd = state.delivery_destinations.get_mut(name).ok_or_else(|| {
2969            AwsServiceError::aws_error(
2970                StatusCode::BAD_REQUEST,
2971                "ResourceNotFoundException",
2972                format!("Delivery destination '{name}' does not exist."),
2973            )
2974        })?;
2975
2976        dd.delivery_destination_policy = None;
2977
2978        Ok(AwsResponse::json(StatusCode::OK, "{}"))
2979    }
2980
2981    // ---- Delivery Sources ----
2982
2983    fn put_delivery_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2984        let body = body_json(req);
2985        let name = body["name"]
2986            .as_str()
2987            .ok_or_else(|| {
2988                AwsServiceError::aws_error(
2989                    StatusCode::BAD_REQUEST,
2990                    "InvalidParameterException",
2991                    "name is required",
2992                )
2993            })?
2994            .to_string();
2995        let resource_arn = body["resourceArn"]
2996            .as_str()
2997            .ok_or_else(|| {
2998                AwsServiceError::aws_error(
2999                    StatusCode::BAD_REQUEST,
3000                    "InvalidParameterException",
3001                    "resourceArn is required",
3002                )
3003            })?
3004            .to_string();
3005        let log_type = body["logType"]
3006            .as_str()
3007            .ok_or_else(|| {
3008                AwsServiceError::aws_error(
3009                    StatusCode::BAD_REQUEST,
3010                    "InvalidParameterException",
3011                    "logType is required",
3012                )
3013            })?
3014            .to_string();
3015
3016        validate_string_length("name", &name, 1, 60)?;
3017        validate_string_length("logType", &log_type, 1, 255)?;
3018
3019        let tags: std::collections::HashMap<String, String> = body["tags"]
3020            .as_object()
3021            .map(|m| {
3022                m.iter()
3023                    .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
3024                    .collect()
3025            })
3026            .unwrap_or_default();
3027
3028        // Extract service from ARN
3029        let service = resource_arn
3030            .split(':')
3031            .nth(2)
3032            .unwrap_or("unknown")
3033            .to_string();
3034
3035        // Validate resource ARN format - must start with arn:aws:
3036        if !resource_arn.starts_with("arn:aws:") {
3037            return Err(AwsServiceError::aws_error(
3038                StatusCode::BAD_REQUEST,
3039                "ValidationException",
3040                format!("Invalid resource ARN: {resource_arn}"),
3041            ));
3042        }
3043
3044        // S3 cannot be a delivery source
3045        if service == "s3" {
3046            return Err(AwsServiceError::aws_error(
3047                StatusCode::BAD_REQUEST,
3048                "ResourceNotFoundException",
3049                format!("The resource ARN '{resource_arn}' is not a valid delivery source."),
3050            ));
3051        }
3052
3053        // Validate log type based on service
3054        let valid_log_types: &[&str] = match service.as_str() {
3055            "cloudfront" => &["ACCESS_LOGS"],
3056            _ => &["ACCESS_LOGS", "APPLICATION_LOGS", "FW_LOGS"],
3057        };
3058        if !valid_log_types.contains(&log_type.as_str()) {
3059            return Err(AwsServiceError::aws_error(
3060                StatusCode::BAD_REQUEST,
3061                "ValidationException",
3062                format!("Log type '{log_type}' is not valid for this resource."),
3063            ));
3064        }
3065
3066        let mut state = self.state.write();
3067
3068        // Cannot update with different resourceArn
3069        if let Some(existing) = state.delivery_sources.get(&name) {
3070            if !existing.resource_arns.is_empty() && existing.resource_arns[0] != resource_arn {
3071                return Err(AwsServiceError::aws_error(
3072                    StatusCode::BAD_REQUEST,
3073                    "ConflictException",
3074                    "Cannot update delivery source with a different resourceArn.",
3075                ));
3076            }
3077        }
3078
3079        let arn = format!(
3080            "arn:aws:logs:{}:{}:delivery-source:{}",
3081            state.region, state.account_id, name
3082        );
3083
3084        let ds = DeliverySource {
3085            name: name.clone(),
3086            arn: arn.clone(),
3087            resource_arns: vec![resource_arn],
3088            service: service.clone(),
3089            log_type: log_type.clone(),
3090            tags: tags.clone(),
3091        };
3092
3093        state.delivery_sources.insert(name.clone(), ds);
3094
3095        let state_ref = state.delivery_sources.get(&name).unwrap();
3096
3097        Ok(AwsResponse::json(
3098            StatusCode::OK,
3099            serde_json::to_string(&json!({
3100                "deliverySource": {
3101                    "name": state_ref.name,
3102                    "arn": state_ref.arn,
3103                    "resourceArns": state_ref.resource_arns,
3104                    "service": state_ref.service,
3105                    "logType": state_ref.log_type,
3106                    "tags": state_ref.tags,
3107                }
3108            }))
3109            .unwrap(),
3110        ))
3111    }
3112
3113    fn get_delivery_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3114        let body = body_json(req);
3115        let name = body["name"].as_str().ok_or_else(|| {
3116            AwsServiceError::aws_error(
3117                StatusCode::BAD_REQUEST,
3118                "InvalidParameterException",
3119                "name is required",
3120            )
3121        })?;
3122
3123        validate_string_length("name", name, 1, 60)?;
3124
3125        let state = self.state.read();
3126        let ds = state.delivery_sources.get(name).ok_or_else(|| {
3127            AwsServiceError::aws_error(
3128                StatusCode::BAD_REQUEST,
3129                "ResourceNotFoundException",
3130                format!("Delivery source '{name}' does not exist."),
3131            )
3132        })?;
3133
3134        Ok(AwsResponse::json(
3135            StatusCode::OK,
3136            serde_json::to_string(&json!({
3137                "deliverySource": {
3138                    "name": ds.name,
3139                    "arn": ds.arn,
3140                    "resourceArns": ds.resource_arns,
3141                    "service": ds.service,
3142                    "logType": ds.log_type,
3143                    "tags": ds.tags,
3144                }
3145            }))
3146            .unwrap(),
3147        ))
3148    }
3149
3150    fn describe_delivery_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3151        let body = body_json(req);
3152        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
3153        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
3154
3155        let state = self.state.read();
3156        let sources: Vec<Value> = state
3157            .delivery_sources
3158            .values()
3159            .map(|ds| {
3160                json!({
3161                    "name": ds.name,
3162                    "arn": ds.arn,
3163                    "resourceArns": ds.resource_arns,
3164                    "service": ds.service,
3165                    "logType": ds.log_type,
3166                    "tags": ds.tags,
3167                })
3168            })
3169            .collect();
3170
3171        Ok(AwsResponse::json(
3172            StatusCode::OK,
3173            serde_json::to_string(&json!({ "deliverySources": sources })).unwrap(),
3174        ))
3175    }
3176
3177    fn delete_delivery_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3178        let body = body_json(req);
3179        let name = body["name"].as_str().ok_or_else(|| {
3180            AwsServiceError::aws_error(
3181                StatusCode::BAD_REQUEST,
3182                "InvalidParameterException",
3183                "name is required",
3184            )
3185        })?;
3186
3187        validate_string_length("name", name, 1, 60)?;
3188
3189        let mut state = self.state.write();
3190        if state.delivery_sources.remove(name).is_none() {
3191            return Err(AwsServiceError::aws_error(
3192                StatusCode::BAD_REQUEST,
3193                "ResourceNotFoundException",
3194                format!("Delivery source '{name}' does not exist."),
3195            ));
3196        }
3197
3198        Ok(AwsResponse::json(StatusCode::OK, "{}"))
3199    }
3200
3201    // ---- Deliveries ----
3202
3203    fn create_delivery(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3204        let body = body_json(req);
3205        let delivery_source_name = body["deliverySourceName"]
3206            .as_str()
3207            .ok_or_else(|| {
3208                AwsServiceError::aws_error(
3209                    StatusCode::BAD_REQUEST,
3210                    "InvalidParameterException",
3211                    "deliverySourceName is required",
3212                )
3213            })?
3214            .to_string();
3215        let delivery_destination_arn = body["deliveryDestinationArn"]
3216            .as_str()
3217            .ok_or_else(|| {
3218                AwsServiceError::aws_error(
3219                    StatusCode::BAD_REQUEST,
3220                    "InvalidParameterException",
3221                    "deliveryDestinationArn is required",
3222                )
3223            })?
3224            .to_string();
3225
3226        validate_string_length("deliverySourceName", &delivery_source_name, 1, 60)?;
3227        validate_optional_string_length("fieldDelimiter", body["fieldDelimiter"].as_str(), 0, 5)?;
3228
3229        let tags: std::collections::HashMap<String, String> = body["tags"]
3230            .as_object()
3231            .map(|m| {
3232                m.iter()
3233                    .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
3234                    .collect()
3235            })
3236            .unwrap_or_default();
3237
3238        let record_fields: Vec<String> = body["recordFields"]
3239            .as_array()
3240            .map(|a| {
3241                a.iter()
3242                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3243                    .collect()
3244            })
3245            .unwrap_or_default();
3246        let field_delimiter = body["fieldDelimiter"].as_str().map(|s| s.to_string());
3247        let s3_delivery_config = body["s3DeliveryConfiguration"].clone();
3248
3249        let mut state = self.state.write();
3250
3251        // Verify source exists
3252        if !state.delivery_sources.contains_key(&delivery_source_name) {
3253            return Err(AwsServiceError::aws_error(
3254                StatusCode::BAD_REQUEST,
3255                "ResourceNotFoundException",
3256                format!("Delivery source '{}' does not exist.", delivery_source_name),
3257            ));
3258        }
3259
3260        // Verify destination exists
3261        let dest_exists = state
3262            .delivery_destinations
3263            .values()
3264            .any(|dd| dd.arn == delivery_destination_arn);
3265        if !dest_exists {
3266            return Err(AwsServiceError::aws_error(
3267                StatusCode::BAD_REQUEST,
3268                "ResourceNotFoundException",
3269                format!(
3270                    "Delivery destination '{}' does not exist.",
3271                    delivery_destination_arn
3272                ),
3273            ));
3274        }
3275
3276        // Check for duplicate delivery (same source + destination)
3277        let already_exists = state.deliveries.values().any(|d| {
3278            d.delivery_source_name == delivery_source_name
3279                && d.delivery_destination_arn == delivery_destination_arn
3280        });
3281        if already_exists {
3282            return Err(AwsServiceError::aws_error(
3283                StatusCode::BAD_REQUEST,
3284                "ConflictException",
3285                "A delivery already exists for this source and destination.",
3286            ));
3287        }
3288
3289        // Determine destination type from ARN
3290        let dest_type = if delivery_destination_arn.contains(":s3:") {
3291            "S3"
3292        } else if delivery_destination_arn.contains(":firehose:") {
3293            "FH"
3294        } else {
3295            "CWL"
3296        };
3297
3298        let delivery_id = uuid::Uuid::new_v4().to_string();
3299        let arn = format!(
3300            "arn:aws:logs:{}:{}:delivery:{}",
3301            state.region, state.account_id, delivery_id
3302        );
3303
3304        let delivery = Delivery {
3305            id: delivery_id.clone(),
3306            delivery_source_name: delivery_source_name.clone(),
3307            delivery_destination_arn: delivery_destination_arn.clone(),
3308            delivery_destination_type: dest_type.to_string(),
3309            arn: arn.clone(),
3310            tags: tags.clone(),
3311        };
3312
3313        state.deliveries.insert(delivery_id.clone(), delivery);
3314
3315        let mut delivery_json = json!({
3316            "id": delivery_id,
3317            "deliverySourceName": delivery_source_name,
3318            "deliveryDestinationArn": delivery_destination_arn,
3319            "deliveryDestinationType": dest_type,
3320            "arn": arn,
3321            "tags": tags,
3322        });
3323        if !record_fields.is_empty() {
3324            delivery_json["recordFields"] = json!(record_fields);
3325        }
3326        if let Some(ref delim) = field_delimiter {
3327            delivery_json["fieldDelimiter"] = json!(delim);
3328        }
3329        if !s3_delivery_config.is_null() {
3330            delivery_json["s3DeliveryConfiguration"] = s3_delivery_config;
3331        }
3332
3333        Ok(AwsResponse::json(
3334            StatusCode::OK,
3335            serde_json::to_string(&json!({
3336                "delivery": delivery_json,
3337            }))
3338            .unwrap(),
3339        ))
3340    }
3341
3342    fn get_delivery(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3343        let body = body_json(req);
3344        let delivery_id = body["id"].as_str().ok_or_else(|| {
3345            AwsServiceError::aws_error(
3346                StatusCode::BAD_REQUEST,
3347                "InvalidParameterException",
3348                "id is required",
3349            )
3350        })?;
3351
3352        validate_string_length("id", delivery_id, 1, 64)?;
3353
3354        let state = self.state.read();
3355        let d = state.deliveries.get(delivery_id).ok_or_else(|| {
3356            AwsServiceError::aws_error(
3357                StatusCode::BAD_REQUEST,
3358                "ResourceNotFoundException",
3359                format!("Delivery '{delivery_id}' does not exist."),
3360            )
3361        })?;
3362
3363        Ok(AwsResponse::json(
3364            StatusCode::OK,
3365            serde_json::to_string(&json!({
3366                "delivery": {
3367                    "id": d.id,
3368                    "deliverySourceName": d.delivery_source_name,
3369                    "deliveryDestinationArn": d.delivery_destination_arn,
3370                    "deliveryDestinationType": d.delivery_destination_type,
3371                    "arn": d.arn,
3372                    "tags": d.tags,
3373                }
3374            }))
3375            .unwrap(),
3376        ))
3377    }
3378
3379    fn describe_deliveries(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3380        let body = body_json(req);
3381        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
3382        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
3383
3384        let state = self.state.read();
3385        let deliveries: Vec<Value> = state
3386            .deliveries
3387            .values()
3388            .map(|d| {
3389                json!({
3390                    "id": d.id,
3391                    "deliverySourceName": d.delivery_source_name,
3392                    "deliveryDestinationArn": d.delivery_destination_arn,
3393                    "deliveryDestinationType": d.delivery_destination_type,
3394                    "arn": d.arn,
3395                    "tags": d.tags,
3396                })
3397            })
3398            .collect();
3399
3400        Ok(AwsResponse::json(
3401            StatusCode::OK,
3402            serde_json::to_string(&json!({ "deliveries": deliveries })).unwrap(),
3403        ))
3404    }
3405
3406    fn delete_delivery(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3407        let body = body_json(req);
3408        let delivery_id = body["id"].as_str().ok_or_else(|| {
3409            AwsServiceError::aws_error(
3410                StatusCode::BAD_REQUEST,
3411                "InvalidParameterException",
3412                "id is required",
3413            )
3414        })?;
3415
3416        validate_string_length("id", delivery_id, 1, 64)?;
3417
3418        let mut state = self.state.write();
3419        if state.deliveries.remove(delivery_id).is_none() {
3420            return Err(AwsServiceError::aws_error(
3421                StatusCode::BAD_REQUEST,
3422                "ResourceNotFoundException",
3423                format!("Delivery '{delivery_id}' does not exist."),
3424            ));
3425        }
3426
3427        Ok(AwsResponse::json(StatusCode::OK, "{}"))
3428    }
3429
3430    // ---- KMS Key ----
3431
3432    fn associate_kms_key(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3433        let body = body_json(req);
3434        let log_group_name = body["logGroupName"].as_str();
3435        let resource_identifier = body["resourceIdentifier"].as_str();
3436        let kms_key_id = body["kmsKeyId"]
3437            .as_str()
3438            .ok_or_else(|| {
3439                AwsServiceError::aws_error(
3440                    StatusCode::BAD_REQUEST,
3441                    "InvalidParameterException",
3442                    "kmsKeyId is required",
3443                )
3444            })?
3445            .to_string();
3446
3447        if let Some(name) = log_group_name {
3448            validate_string_length("logGroupName", name, 1, 512)?;
3449        }
3450        validate_string_length("kmsKeyId", &kms_key_id, 1, 256)?;
3451        validate_optional_string_length("resourceIdentifier", resource_identifier, 1, 2048)?;
3452
3453        let resolved_name = resolve_log_group_name(log_group_name, resource_identifier)?;
3454
3455        let mut state = self.state.write();
3456        let group = state
3457            .log_groups
3458            .get_mut(resolved_name.as_str())
3459            .ok_or_else(|| {
3460                AwsServiceError::aws_error(
3461                    StatusCode::BAD_REQUEST,
3462                    "ResourceNotFoundException",
3463                    format!("The specified log group does not exist: {resolved_name}"),
3464                )
3465            })?;
3466
3467        group.kms_key_id = Some(kms_key_id);
3468
3469        Ok(AwsResponse::json(StatusCode::OK, "{}"))
3470    }
3471
3472    fn disassociate_kms_key(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3473        let body = body_json(req);
3474        let log_group_name = body["logGroupName"].as_str();
3475        let resource_identifier = body["resourceIdentifier"].as_str();
3476
3477        if let Some(name) = log_group_name {
3478            validate_string_length("logGroupName", name, 1, 512)?;
3479        }
3480        validate_optional_string_length("resourceIdentifier", resource_identifier, 1, 2048)?;
3481
3482        let resolved_name = resolve_log_group_name(log_group_name, resource_identifier)?;
3483
3484        let mut state = self.state.write();
3485        let group = state
3486            .log_groups
3487            .get_mut(resolved_name.as_str())
3488            .ok_or_else(|| {
3489                AwsServiceError::aws_error(
3490                    StatusCode::BAD_REQUEST,
3491                    "ResourceNotFoundException",
3492                    format!("The specified log group does not exist: {resolved_name}"),
3493                )
3494            })?;
3495
3496        group.kms_key_id = None;
3497
3498        Ok(AwsResponse::json(StatusCode::OK, "{}"))
3499    }
3500
3501    // ---- Query Definitions ----
3502
3503    fn put_query_definition(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3504        let body = body_json(req);
3505        let name = body["name"]
3506            .as_str()
3507            .ok_or_else(|| {
3508                AwsServiceError::aws_error(
3509                    StatusCode::BAD_REQUEST,
3510                    "InvalidParameterException",
3511                    "name is required",
3512                )
3513            })?
3514            .to_string();
3515        let query_string = body["queryString"]
3516            .as_str()
3517            .ok_or_else(|| {
3518                AwsServiceError::aws_error(
3519                    StatusCode::BAD_REQUEST,
3520                    "InvalidParameterException",
3521                    "queryString is required",
3522                )
3523            })?
3524            .to_string();
3525        let log_group_names: Vec<String> = body["logGroupNames"]
3526            .as_array()
3527            .map(|a| {
3528                a.iter()
3529                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3530                    .collect()
3531            })
3532            .unwrap_or_default();
3533
3534        let query_definition_id = body["queryDefinitionId"]
3535            .as_str()
3536            .map(|s| s.to_string())
3537            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
3538
3539        validate_string_length("name", &name, 1, 255)?;
3540        validate_string_length("queryString", &query_string, 1, 10000)?;
3541        validate_optional_string_length(
3542            "queryDefinitionId",
3543            body["queryDefinitionId"].as_str(),
3544            1,
3545            256,
3546        )?;
3547        validate_optional_string_length("clientToken", body["clientToken"].as_str(), 36, 128)?;
3548        validate_optional_enum_value(
3549            "queryLanguage",
3550            &body["queryLanguage"],
3551            &["CWLI", "SQL", "PPL"],
3552        )?;
3553
3554        let now = Utc::now().timestamp_millis();
3555
3556        let mut state = self.state.write();
3557        state.query_definitions.insert(
3558            query_definition_id.clone(),
3559            QueryDefinition {
3560                query_definition_id: query_definition_id.clone(),
3561                name,
3562                query_string,
3563                log_group_names,
3564                last_modified: now,
3565            },
3566        );
3567
3568        Ok(AwsResponse::json(
3569            StatusCode::OK,
3570            serde_json::to_string(&json!({
3571                "queryDefinitionId": query_definition_id,
3572            }))
3573            .unwrap(),
3574        ))
3575    }
3576
3577    fn describe_query_definitions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3578        let body = body_json(req);
3579        let name_prefix = body["queryDefinitionNamePrefix"].as_str().unwrap_or("");
3580        validate_optional_string_length(
3581            "queryDefinitionNamePrefix",
3582            body["queryDefinitionNamePrefix"].as_str(),
3583            1,
3584            255,
3585        )?;
3586        validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 1000)?;
3587        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
3588        validate_optional_enum_value(
3589            "queryLanguage",
3590            &body["queryLanguage"],
3591            &["CWLI", "SQL", "PPL"],
3592        )?;
3593
3594        let state = self.state.read();
3595        let defs: Vec<Value> = state
3596            .query_definitions
3597            .values()
3598            .filter(|qd| name_prefix.is_empty() || qd.name.starts_with(name_prefix))
3599            .map(|qd| {
3600                json!({
3601                    "queryDefinitionId": qd.query_definition_id,
3602                    "name": qd.name,
3603                    "queryString": qd.query_string,
3604                    "logGroupNames": qd.log_group_names,
3605                    "lastModified": qd.last_modified,
3606                })
3607            })
3608            .collect();
3609
3610        Ok(AwsResponse::json(
3611            StatusCode::OK,
3612            serde_json::to_string(&json!({ "queryDefinitions": defs })).unwrap(),
3613        ))
3614    }
3615
3616    fn delete_query_definition(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3617        let body = body_json(req);
3618        let qd_id = body["queryDefinitionId"].as_str().ok_or_else(|| {
3619            AwsServiceError::aws_error(
3620                StatusCode::BAD_REQUEST,
3621                "InvalidParameterException",
3622                "queryDefinitionId is required",
3623            )
3624        })?;
3625
3626        validate_string_length("queryDefinitionId", qd_id, 1, 256)?;
3627
3628        let mut state = self.state.write();
3629        let success = state.query_definitions.remove(qd_id).is_some();
3630
3631        Ok(AwsResponse::json(
3632            StatusCode::OK,
3633            serde_json::to_string(&json!({ "success": success })).unwrap(),
3634        ))
3635    }
3636
3637    // ---- Account Policies ----
3638
3639    fn put_account_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3640        let body = body_json(req);
3641        validate_optional_enum_value(
3642            "policyType",
3643            &body["policyType"],
3644            &[
3645                "DATA_PROTECTION_POLICY",
3646                "SUBSCRIPTION_FILTER_POLICY",
3647                "FIELD_INDEX_POLICY",
3648                "TRANSFORMER_POLICY",
3649                "METRIC_EXTRACTION_POLICY",
3650            ],
3651        )?;
3652        validate_optional_enum_value("scope", &body["scope"], &["ALL"])?;
3653        let policy_name = body["policyName"].as_str().ok_or_else(|| {
3654            AwsServiceError::aws_error(
3655                StatusCode::BAD_REQUEST,
3656                "InvalidParameterException",
3657                "policyName is required",
3658            )
3659        })?;
3660        let policy_type = body["policyType"].as_str().ok_or_else(|| {
3661            AwsServiceError::aws_error(
3662                StatusCode::BAD_REQUEST,
3663                "InvalidParameterException",
3664                "policyType is required",
3665            )
3666        })?;
3667        let policy_document = body["policyDocument"].as_str().ok_or_else(|| {
3668            AwsServiceError::aws_error(
3669                StatusCode::BAD_REQUEST,
3670                "InvalidParameterException",
3671                "policyDocument is required",
3672            )
3673        })?;
3674
3675        let now = Utc::now().timestamp_millis();
3676        let mut state = self.state.write();
3677        let account_id = state.account_id.clone();
3678        let scope = body["scope"].as_str().map(|s| s.to_string());
3679        let selection_criteria = body["selectionCriteria"].as_str().map(|s| s.to_string());
3680
3681        let policy = AccountPolicy {
3682            policy_name: policy_name.to_string(),
3683            policy_type: policy_type.to_string(),
3684            policy_document: policy_document.to_string(),
3685            scope: scope.clone(),
3686            selection_criteria: selection_criteria.clone(),
3687            account_id: account_id.clone(),
3688            last_updated_time: now,
3689        };
3690
3691        let key = (policy_name.to_string(), policy_type.to_string());
3692        state.account_policies.insert(key, policy);
3693
3694        let mut result = json!({
3695            "accountPolicy": {
3696                "policyName": policy_name,
3697                "policyType": policy_type,
3698                "policyDocument": policy_document,
3699                "accountId": account_id,
3700                "lastUpdatedTime": now,
3701            }
3702        });
3703        if let Some(s) = scope {
3704            result["accountPolicy"]["scope"] = json!(s);
3705        }
3706        if let Some(s) = selection_criteria {
3707            result["accountPolicy"]["selectionCriteria"] = json!(s);
3708        }
3709
3710        Ok(AwsResponse::json(
3711            StatusCode::OK,
3712            serde_json::to_string(&result).unwrap(),
3713        ))
3714    }
3715
3716    fn describe_account_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3717        let body = body_json(req);
3718        validate_optional_enum_value(
3719            "policyType",
3720            &body["policyType"],
3721            &[
3722                "DATA_PROTECTION_POLICY",
3723                "SUBSCRIPTION_FILTER_POLICY",
3724                "FIELD_INDEX_POLICY",
3725                "TRANSFORMER_POLICY",
3726                "METRIC_EXTRACTION_POLICY",
3727            ],
3728        )?;
3729        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
3730        let policy_type = body["policyType"].as_str().ok_or_else(|| {
3731            AwsServiceError::aws_error(
3732                StatusCode::BAD_REQUEST,
3733                "InvalidParameterException",
3734                "policyType is required",
3735            )
3736        })?;
3737        let policy_name = body["policyName"].as_str();
3738
3739        let state = self.state.read();
3740        let policies: Vec<Value> = state
3741            .account_policies
3742            .values()
3743            .filter(|p| {
3744                p.policy_type == policy_type && policy_name.is_none_or(|n| p.policy_name == n)
3745            })
3746            .map(|p| {
3747                let mut obj = json!({
3748                    "policyName": p.policy_name,
3749                    "policyType": p.policy_type,
3750                    "policyDocument": p.policy_document,
3751                    "accountId": p.account_id,
3752                    "lastUpdatedTime": p.last_updated_time,
3753                });
3754                if let Some(ref s) = p.scope {
3755                    obj["scope"] = json!(s);
3756                }
3757                if let Some(ref s) = p.selection_criteria {
3758                    obj["selectionCriteria"] = json!(s);
3759                }
3760                obj
3761            })
3762            .collect();
3763
3764        Ok(AwsResponse::json(
3765            StatusCode::OK,
3766            serde_json::to_string(&json!({ "accountPolicies": policies })).unwrap(),
3767        ))
3768    }
3769
3770    fn delete_account_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3771        let body = body_json(req);
3772        let policy_name = body["policyName"].as_str().ok_or_else(|| {
3773            AwsServiceError::aws_error(
3774                StatusCode::BAD_REQUEST,
3775                "InvalidParameterException",
3776                "policyName is required",
3777            )
3778        })?;
3779        let policy_type = body["policyType"].as_str().ok_or_else(|| {
3780            AwsServiceError::aws_error(
3781                StatusCode::BAD_REQUEST,
3782                "InvalidParameterException",
3783                "policyType is required",
3784            )
3785        })?;
3786
3787        let key = (policy_name.to_string(), policy_type.to_string());
3788        let mut state = self.state.write();
3789        if state.account_policies.remove(&key).is_none() {
3790            return Err(AwsServiceError::aws_error(
3791                StatusCode::BAD_REQUEST,
3792                "ResourceNotFoundException",
3793                format!("Account policy {policy_name} of type {policy_type} not found"),
3794            ));
3795        }
3796
3797        Ok(AwsResponse::json(StatusCode::OK, "{}"))
3798    }
3799
3800    // ---- Data Protection Policies ----
3801
3802    fn put_data_protection_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3803        let body = body_json(req);
3804        let log_group_id = body["logGroupIdentifier"]
3805            .as_str()
3806            .ok_or_else(|| {
3807                AwsServiceError::aws_error(
3808                    StatusCode::BAD_REQUEST,
3809                    "InvalidParameterException",
3810                    "logGroupIdentifier is required",
3811                )
3812            })?
3813            .to_string();
3814        let policy_document = body["policyDocument"]
3815            .as_str()
3816            .ok_or_else(|| {
3817                AwsServiceError::aws_error(
3818                    StatusCode::BAD_REQUEST,
3819                    "InvalidParameterException",
3820                    "policyDocument is required",
3821                )
3822            })?
3823            .to_string();
3824
3825        let group_name = if log_group_id.starts_with("arn:") {
3826            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3827                AwsServiceError::aws_error(
3828                    StatusCode::BAD_REQUEST,
3829                    "InvalidParameterException",
3830                    format!("Invalid ARN: {log_group_id}"),
3831                )
3832            })?
3833        } else {
3834            log_group_id.clone()
3835        };
3836
3837        let now = Utc::now().timestamp_millis();
3838        let mut state = self.state.write();
3839        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
3840            AwsServiceError::aws_error(
3841                StatusCode::BAD_REQUEST,
3842                "ResourceNotFoundException",
3843                format!("The specified log group does not exist: {group_name}"),
3844            )
3845        })?;
3846        let log_group_id_resp = group.arn.clone();
3847
3848        group.data_protection_policy = Some(DataProtectionPolicy {
3849            policy_document: policy_document.clone(),
3850            last_updated_time: now,
3851        });
3852
3853        Ok(AwsResponse::json(
3854            StatusCode::OK,
3855            serde_json::to_string(&json!({
3856                "logGroupIdentifier": log_group_id_resp,
3857                "policyDocument": policy_document,
3858                "lastUpdatedTime": now,
3859            }))
3860            .unwrap(),
3861        ))
3862    }
3863
3864    fn get_data_protection_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3865        let body = body_json(req);
3866        let log_group_id = body["logGroupIdentifier"]
3867            .as_str()
3868            .ok_or_else(|| {
3869                AwsServiceError::aws_error(
3870                    StatusCode::BAD_REQUEST,
3871                    "InvalidParameterException",
3872                    "logGroupIdentifier is required",
3873                )
3874            })?
3875            .to_string();
3876
3877        let group_name = if log_group_id.starts_with("arn:") {
3878            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3879                AwsServiceError::aws_error(
3880                    StatusCode::BAD_REQUEST,
3881                    "InvalidParameterException",
3882                    format!("Invalid ARN: {log_group_id}"),
3883                )
3884            })?
3885        } else {
3886            log_group_id.clone()
3887        };
3888
3889        let state = self.state.read();
3890        let group = state.log_groups.get(&group_name).ok_or_else(|| {
3891            AwsServiceError::aws_error(
3892                StatusCode::BAD_REQUEST,
3893                "ResourceNotFoundException",
3894                format!("The specified log group does not exist: {group_name}"),
3895            )
3896        })?;
3897
3898        let mut result = json!({
3899            "logGroupIdentifier": group.arn,
3900        });
3901        if let Some(ref dp) = group.data_protection_policy {
3902            result["policyDocument"] = json!(dp.policy_document);
3903            result["lastUpdatedTime"] = json!(dp.last_updated_time);
3904        }
3905
3906        Ok(AwsResponse::json(
3907            StatusCode::OK,
3908            serde_json::to_string(&result).unwrap(),
3909        ))
3910    }
3911
3912    fn delete_data_protection_policy(
3913        &self,
3914        req: &AwsRequest,
3915    ) -> Result<AwsResponse, AwsServiceError> {
3916        let body = body_json(req);
3917        let log_group_id = body["logGroupIdentifier"]
3918            .as_str()
3919            .ok_or_else(|| {
3920                AwsServiceError::aws_error(
3921                    StatusCode::BAD_REQUEST,
3922                    "InvalidParameterException",
3923                    "logGroupIdentifier is required",
3924                )
3925            })?
3926            .to_string();
3927
3928        let group_name = if log_group_id.starts_with("arn:") {
3929            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3930                AwsServiceError::aws_error(
3931                    StatusCode::BAD_REQUEST,
3932                    "InvalidParameterException",
3933                    format!("Invalid ARN: {log_group_id}"),
3934                )
3935            })?
3936        } else {
3937            log_group_id
3938        };
3939
3940        let mut state = self.state.write();
3941        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
3942            AwsServiceError::aws_error(
3943                StatusCode::BAD_REQUEST,
3944                "ResourceNotFoundException",
3945                format!("The specified log group does not exist: {group_name}"),
3946            )
3947        })?;
3948
3949        if group.data_protection_policy.is_none() {
3950            return Err(AwsServiceError::aws_error(
3951                StatusCode::BAD_REQUEST,
3952                "ResourceNotFoundException",
3953                "No data protection policy found for this log group",
3954            ));
3955        }
3956
3957        group.data_protection_policy = None;
3958        Ok(AwsResponse::json(StatusCode::OK, "{}"))
3959    }
3960
3961    // ---- Index Policies ----
3962
3963    fn put_index_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3964        let body = body_json(req);
3965        let log_group_id = body["logGroupIdentifier"]
3966            .as_str()
3967            .ok_or_else(|| {
3968                AwsServiceError::aws_error(
3969                    StatusCode::BAD_REQUEST,
3970                    "InvalidParameterException",
3971                    "logGroupIdentifier is required",
3972                )
3973            })?
3974            .to_string();
3975        let policy_document = body["policyDocument"]
3976            .as_str()
3977            .ok_or_else(|| {
3978                AwsServiceError::aws_error(
3979                    StatusCode::BAD_REQUEST,
3980                    "InvalidParameterException",
3981                    "policyDocument is required",
3982                )
3983            })?
3984            .to_string();
3985
3986        let group_name = if log_group_id.starts_with("arn:") {
3987            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3988                AwsServiceError::aws_error(
3989                    StatusCode::BAD_REQUEST,
3990                    "InvalidParameterException",
3991                    format!("Invalid ARN: {log_group_id}"),
3992                )
3993            })?
3994        } else {
3995            log_group_id.clone()
3996        };
3997
3998        let policy_name = body["policyName"].as_str().unwrap_or("default").to_string();
3999
4000        let now = Utc::now().timestamp_millis();
4001        let mut state = self.state.write();
4002        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4003            AwsServiceError::aws_error(
4004                StatusCode::BAD_REQUEST,
4005                "ResourceNotFoundException",
4006                format!("The specified log group does not exist: {group_name}"),
4007            )
4008        })?;
4009
4010        // Replace existing policy with same name, or add new one
4011        if let Some(existing) = group
4012            .index_policies
4013            .iter_mut()
4014            .find(|p| p.policy_name == policy_name)
4015        {
4016            existing.policy_document = policy_document.clone();
4017            existing.last_updated_time = now;
4018        } else {
4019            group.index_policies.push(IndexPolicy {
4020                policy_name: policy_name.clone(),
4021                policy_document: policy_document.clone(),
4022                last_updated_time: now,
4023            });
4024        }
4025
4026        let result = json!({
4027            "indexPolicy": {
4028                "policyName": policy_name,
4029                "policyDocument": policy_document,
4030                "logGroupIdentifier": group.arn,
4031                "lastUpdateTime": now,
4032            }
4033        });
4034
4035        Ok(AwsResponse::json(
4036            StatusCode::OK,
4037            serde_json::to_string(&result).unwrap(),
4038        ))
4039    }
4040
4041    fn describe_index_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4042        let body = body_json(req);
4043        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
4044        let log_group_ids = body["logGroupIdentifiers"].as_array().ok_or_else(|| {
4045            AwsServiceError::aws_error(
4046                StatusCode::BAD_REQUEST,
4047                "InvalidParameterException",
4048                "logGroupIdentifiers is required",
4049            )
4050        })?;
4051
4052        let state = self.state.read();
4053        let mut policies = Vec::new();
4054
4055        for id_val in log_group_ids {
4056            let id = id_val.as_str().unwrap_or("");
4057            let group_name = if id.starts_with("arn:") {
4058                extract_log_group_from_arn(id).unwrap_or_default()
4059            } else {
4060                id.to_string()
4061            };
4062            if let Some(group) = state.log_groups.get(&group_name) {
4063                for p in &group.index_policies {
4064                    policies.push(json!({
4065                        "policyName": p.policy_name,
4066                        "policyDocument": p.policy_document,
4067                        "logGroupIdentifier": group.arn,
4068                        "lastUpdateTime": p.last_updated_time,
4069                    }));
4070                }
4071            }
4072        }
4073
4074        Ok(AwsResponse::json(
4075            StatusCode::OK,
4076            serde_json::to_string(&json!({ "indexPolicies": policies })).unwrap(),
4077        ))
4078    }
4079
4080    fn delete_index_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4081        let body = body_json(req);
4082        let log_group_id = body["logGroupIdentifier"]
4083            .as_str()
4084            .ok_or_else(|| {
4085                AwsServiceError::aws_error(
4086                    StatusCode::BAD_REQUEST,
4087                    "InvalidParameterException",
4088                    "logGroupIdentifier is required",
4089                )
4090            })?
4091            .to_string();
4092
4093        let group_name = if log_group_id.starts_with("arn:") {
4094            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4095                AwsServiceError::aws_error(
4096                    StatusCode::BAD_REQUEST,
4097                    "InvalidParameterException",
4098                    format!("Invalid ARN: {log_group_id}"),
4099                )
4100            })?
4101        } else {
4102            log_group_id
4103        };
4104
4105        let mut state = self.state.write();
4106        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4107            AwsServiceError::aws_error(
4108                StatusCode::BAD_REQUEST,
4109                "ResourceNotFoundException",
4110                format!("The specified log group does not exist: {group_name}"),
4111            )
4112        })?;
4113
4114        if group.index_policies.is_empty() {
4115            return Err(AwsServiceError::aws_error(
4116                StatusCode::BAD_REQUEST,
4117                "ResourceNotFoundException",
4118                "No index policy found for this log group",
4119            ));
4120        }
4121
4122        group.index_policies.clear();
4123        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4124    }
4125
4126    fn describe_field_indexes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4127        let body = body_json(req);
4128        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
4129        // Validate that logGroupIdentifiers is provided
4130        let _log_group_ids = body["logGroupIdentifiers"].as_array().ok_or_else(|| {
4131            AwsServiceError::aws_error(
4132                StatusCode::BAD_REQUEST,
4133                "InvalidParameterException",
4134                "logGroupIdentifiers is required",
4135            )
4136        })?;
4137
4138        // Stub: return empty list
4139        Ok(AwsResponse::json(
4140            StatusCode::OK,
4141            serde_json::to_string(&json!({ "fieldIndexes": [] })).unwrap(),
4142        ))
4143    }
4144
4145    // ---- Transformers ----
4146
4147    fn put_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4148        let body = body_json(req);
4149        let log_group_id = body["logGroupIdentifier"]
4150            .as_str()
4151            .ok_or_else(|| {
4152                AwsServiceError::aws_error(
4153                    StatusCode::BAD_REQUEST,
4154                    "InvalidParameterException",
4155                    "logGroupIdentifier is required",
4156                )
4157            })?
4158            .to_string();
4159        let transformer_config = body.get("transformerConfig").cloned().ok_or_else(|| {
4160            AwsServiceError::aws_error(
4161                StatusCode::BAD_REQUEST,
4162                "InvalidParameterException",
4163                "transformerConfig is required",
4164            )
4165        })?;
4166
4167        let group_name = if log_group_id.starts_with("arn:") {
4168            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4169                AwsServiceError::aws_error(
4170                    StatusCode::BAD_REQUEST,
4171                    "InvalidParameterException",
4172                    format!("Invalid ARN: {log_group_id}"),
4173                )
4174            })?
4175        } else {
4176            log_group_id.clone()
4177        };
4178
4179        let now = Utc::now().timestamp_millis();
4180        let mut state = self.state.write();
4181        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4182            AwsServiceError::aws_error(
4183                StatusCode::BAD_REQUEST,
4184                "ResourceNotFoundException",
4185                format!("The specified log group does not exist: {group_name}"),
4186            )
4187        })?;
4188
4189        group.transformer = Some(Transformer {
4190            transformer_config,
4191            creation_time: now,
4192            last_modified_time: now,
4193        });
4194
4195        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4196    }
4197
4198    fn get_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4199        let body = body_json(req);
4200        let log_group_id = body["logGroupIdentifier"]
4201            .as_str()
4202            .ok_or_else(|| {
4203                AwsServiceError::aws_error(
4204                    StatusCode::BAD_REQUEST,
4205                    "InvalidParameterException",
4206                    "logGroupIdentifier is required",
4207                )
4208            })?
4209            .to_string();
4210
4211        let group_name = if log_group_id.starts_with("arn:") {
4212            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4213                AwsServiceError::aws_error(
4214                    StatusCode::BAD_REQUEST,
4215                    "InvalidParameterException",
4216                    format!("Invalid ARN: {log_group_id}"),
4217                )
4218            })?
4219        } else {
4220            log_group_id.clone()
4221        };
4222
4223        let state = self.state.read();
4224        let group = state.log_groups.get(&group_name).ok_or_else(|| {
4225            AwsServiceError::aws_error(
4226                StatusCode::BAD_REQUEST,
4227                "ResourceNotFoundException",
4228                format!("The specified log group does not exist: {group_name}"),
4229            )
4230        })?;
4231
4232        let mut result = json!({
4233            "logGroupIdentifier": group.arn,
4234        });
4235        if let Some(ref t) = group.transformer {
4236            result["transformerConfig"] = t.transformer_config.clone();
4237            result["creationTime"] = json!(t.creation_time);
4238            result["lastModifiedTime"] = json!(t.last_modified_time);
4239        }
4240
4241        Ok(AwsResponse::json(
4242            StatusCode::OK,
4243            serde_json::to_string(&result).unwrap(),
4244        ))
4245    }
4246
4247    fn delete_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4248        let body = body_json(req);
4249        let log_group_id = body["logGroupIdentifier"]
4250            .as_str()
4251            .ok_or_else(|| {
4252                AwsServiceError::aws_error(
4253                    StatusCode::BAD_REQUEST,
4254                    "InvalidParameterException",
4255                    "logGroupIdentifier is required",
4256                )
4257            })?
4258            .to_string();
4259
4260        let group_name = if log_group_id.starts_with("arn:") {
4261            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4262                AwsServiceError::aws_error(
4263                    StatusCode::BAD_REQUEST,
4264                    "InvalidParameterException",
4265                    format!("Invalid ARN: {log_group_id}"),
4266                )
4267            })?
4268        } else {
4269            log_group_id
4270        };
4271
4272        let mut state = self.state.write();
4273        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4274            AwsServiceError::aws_error(
4275                StatusCode::BAD_REQUEST,
4276                "ResourceNotFoundException",
4277                format!("The specified log group does not exist: {group_name}"),
4278            )
4279        })?;
4280
4281        group.transformer = None;
4282        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4283    }
4284
4285    fn test_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4286        let body = body_json(req);
4287        let _transformer_config = body.get("transformerConfig").ok_or_else(|| {
4288            AwsServiceError::aws_error(
4289                StatusCode::BAD_REQUEST,
4290                "InvalidParameterException",
4291                "transformerConfig is required",
4292            )
4293        })?;
4294        let log_event_messages = body["logEventMessages"].as_array().ok_or_else(|| {
4295            AwsServiceError::aws_error(
4296                StatusCode::BAD_REQUEST,
4297                "InvalidParameterException",
4298                "logEventMessages is required",
4299            )
4300        })?;
4301
4302        // Stub: return the input events as transformed output unchanged
4303        let transformed: Vec<Value> = log_event_messages
4304            .iter()
4305            .map(|msg| {
4306                json!({
4307                    "eventMessage": msg,
4308                    "transformedEventMessage": msg,
4309                })
4310            })
4311            .collect();
4312
4313        Ok(AwsResponse::json(
4314            StatusCode::OK,
4315            serde_json::to_string(&json!({
4316                "transformedLogs": transformed,
4317            }))
4318            .unwrap(),
4319        ))
4320    }
4321
4322    // ---- Anomaly Detectors ----
4323
4324    fn create_log_anomaly_detector(
4325        &self,
4326        req: &AwsRequest,
4327    ) -> Result<AwsResponse, AwsServiceError> {
4328        let body = body_json(req);
4329        validate_optional_string_length("detectorName", body["detectorName"].as_str(), 1, 2048)?;
4330        validate_optional_enum_value(
4331            "evaluationFrequency",
4332            &body["evaluationFrequency"],
4333            &[
4334                "ONE_MIN",
4335                "FIVE_MIN",
4336                "TEN_MIN",
4337                "FIFTEEN_MIN",
4338                "THIRTY_MIN",
4339                "ONE_HOUR",
4340            ],
4341        )?;
4342        validate_optional_string_length("filterPattern", body["filterPattern"].as_str(), 0, 1024)?;
4343        validate_optional_string_length("kmsKeyId", body["kmsKeyId"].as_str(), 0, 256)?;
4344        validate_optional_range_i64(
4345            "anomalyVisibilityTime",
4346            body["anomalyVisibilityTime"].as_i64(),
4347            7,
4348            90,
4349        )?;
4350
4351        let log_group_arn_list = body["logGroupArnList"]
4352            .as_array()
4353            .ok_or_else(|| {
4354                AwsServiceError::aws_error(
4355                    StatusCode::BAD_REQUEST,
4356                    "InvalidParameterException",
4357                    "logGroupArnList is required",
4358                )
4359            })?
4360            .iter()
4361            .filter_map(|v| v.as_str().map(|s| s.to_string()))
4362            .collect::<Vec<_>>();
4363
4364        let detector_name = body["detectorName"].as_str().unwrap_or("").to_string();
4365        let evaluation_frequency = body["evaluationFrequency"].as_str().map(|s| s.to_string());
4366        let filter_pattern = body["filterPattern"].as_str().map(|s| s.to_string());
4367        let anomaly_visibility_time = body["anomalyVisibilityTime"].as_i64();
4368
4369        let now = Utc::now().timestamp_millis();
4370        let mut state = self.state.write();
4371        let detector_id = uuid::Uuid::new_v4().to_string();
4372        let arn = format!(
4373            "arn:aws:logs:{}:{}:anomaly-detector:{}",
4374            state.region, state.account_id, detector_id
4375        );
4376
4377        let detector = AnomalyDetector {
4378            detector_name: detector_name.clone(),
4379            arn: arn.clone(),
4380            log_group_arn_list,
4381            evaluation_frequency,
4382            filter_pattern,
4383            anomaly_visibility_time,
4384            creation_time: now,
4385            last_modified_time: now,
4386            enabled: true,
4387        };
4388
4389        state.anomaly_detectors.insert(arn.clone(), detector);
4390
4391        Ok(AwsResponse::json(
4392            StatusCode::OK,
4393            serde_json::to_string(&json!({ "anomalyDetectorArn": arn })).unwrap(),
4394        ))
4395    }
4396
4397    fn get_log_anomaly_detector(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4398        let body = body_json(req);
4399        let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
4400            AwsServiceError::aws_error(
4401                StatusCode::BAD_REQUEST,
4402                "InvalidParameterException",
4403                "anomalyDetectorArn is required",
4404            )
4405        })?;
4406
4407        let state = self.state.read();
4408        let detector = state.anomaly_detectors.get(arn).ok_or_else(|| {
4409            AwsServiceError::aws_error(
4410                StatusCode::BAD_REQUEST,
4411                "ResourceNotFoundException",
4412                format!("Anomaly detector not found: {arn}"),
4413            )
4414        })?;
4415
4416        let mut result = json!({
4417            "anomalyDetectorArn": detector.arn,
4418            "detectorName": detector.detector_name,
4419            "logGroupArnList": detector.log_group_arn_list,
4420            "creationTimeStamp": detector.creation_time,
4421            "lastModifiedTimeStamp": detector.last_modified_time,
4422            "anomalyDetectorStatus": if detector.enabled { "TRAINING" } else { "PAUSED" },
4423        });
4424        if let Some(ref f) = detector.evaluation_frequency {
4425            result["evaluationFrequency"] = json!(f);
4426        }
4427        if let Some(ref f) = detector.filter_pattern {
4428            result["filterPattern"] = json!(f);
4429        }
4430        if let Some(t) = detector.anomaly_visibility_time {
4431            result["anomalyVisibilityTime"] = json!(t);
4432        }
4433
4434        Ok(AwsResponse::json(
4435            StatusCode::OK,
4436            serde_json::to_string(&result).unwrap(),
4437        ))
4438    }
4439
4440    fn delete_log_anomaly_detector(
4441        &self,
4442        req: &AwsRequest,
4443    ) -> Result<AwsResponse, AwsServiceError> {
4444        let body = body_json(req);
4445        let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
4446            AwsServiceError::aws_error(
4447                StatusCode::BAD_REQUEST,
4448                "InvalidParameterException",
4449                "anomalyDetectorArn is required",
4450            )
4451        })?;
4452
4453        let mut state = self.state.write();
4454        if state.anomaly_detectors.remove(arn).is_none() {
4455            return Err(AwsServiceError::aws_error(
4456                StatusCode::BAD_REQUEST,
4457                "ResourceNotFoundException",
4458                format!("Anomaly detector not found: {arn}"),
4459            ));
4460        }
4461
4462        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4463    }
4464
4465    fn list_log_anomaly_detectors(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4466        let body = body_json(req);
4467        validate_optional_string_length(
4468            "filterLogGroupArn",
4469            body["filterLogGroupArn"].as_str(),
4470            1,
4471            2048,
4472        )?;
4473        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
4474        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
4475        let filter_log_group_arn = body["filterLogGroupArn"].as_str();
4476        let _limit = body["limit"].as_i64().unwrap_or(50);
4477
4478        let state = self.state.read();
4479        let detectors: Vec<Value> = state
4480            .anomaly_detectors
4481            .values()
4482            .filter(|d| {
4483                filter_log_group_arn.is_none_or(|arn| d.log_group_arn_list.iter().any(|a| a == arn))
4484            })
4485            .map(|d| {
4486                let mut obj = json!({
4487                    "anomalyDetectorArn": d.arn,
4488                    "detectorName": d.detector_name,
4489                    "logGroupArnList": d.log_group_arn_list,
4490                    "creationTimeStamp": d.creation_time,
4491                    "lastModifiedTimeStamp": d.last_modified_time,
4492                    "anomalyDetectorStatus": if d.enabled { "TRAINING" } else { "PAUSED" },
4493                });
4494                if let Some(ref f) = d.evaluation_frequency {
4495                    obj["evaluationFrequency"] = json!(f);
4496                }
4497                if let Some(ref f) = d.filter_pattern {
4498                    obj["filterPattern"] = json!(f);
4499                }
4500                if let Some(t) = d.anomaly_visibility_time {
4501                    obj["anomalyVisibilityTime"] = json!(t);
4502                }
4503                obj
4504            })
4505            .collect();
4506
4507        Ok(AwsResponse::json(
4508            StatusCode::OK,
4509            serde_json::to_string(&json!({ "anomalyDetectors": detectors })).unwrap(),
4510        ))
4511    }
4512
4513    fn update_log_anomaly_detector(
4514        &self,
4515        req: &AwsRequest,
4516    ) -> Result<AwsResponse, AwsServiceError> {
4517        let body = body_json(req);
4518        let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
4519            AwsServiceError::aws_error(
4520                StatusCode::BAD_REQUEST,
4521                "InvalidParameterException",
4522                "anomalyDetectorArn is required",
4523            )
4524        })?;
4525        let enabled = body["enabled"].as_bool().unwrap_or(true);
4526
4527        let mut state = self.state.write();
4528        let detector = state.anomaly_detectors.get_mut(arn).ok_or_else(|| {
4529            AwsServiceError::aws_error(
4530                StatusCode::BAD_REQUEST,
4531                "ResourceNotFoundException",
4532                format!("Anomaly detector not found: {arn}"),
4533            )
4534        })?;
4535
4536        detector.enabled = enabled;
4537        if let Some(f) = body["evaluationFrequency"].as_str() {
4538            detector.evaluation_frequency = Some(f.to_string());
4539        }
4540        if let Some(f) = body["filterPattern"].as_str() {
4541            detector.filter_pattern = Some(f.to_string());
4542        }
4543        if let Some(t) = body["anomalyVisibilityTime"].as_i64() {
4544            detector.anomaly_visibility_time = Some(t);
4545        }
4546        detector.last_modified_time = Utc::now().timestamp_millis();
4547
4548        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4549    }
4550
4551    // ---- Misc Operations ----
4552
4553    fn get_log_group_fields(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4554        let body = body_json(req);
4555        let log_group_id = body["logGroupName"]
4556            .as_str()
4557            .or_else(|| body["logGroupIdentifier"].as_str())
4558            .ok_or_else(|| {
4559                AwsServiceError::aws_error(
4560                    StatusCode::BAD_REQUEST,
4561                    "InvalidParameterException",
4562                    "logGroupName or logGroupIdentifier is required",
4563                )
4564            })?;
4565
4566        let group_name = if log_group_id.starts_with("arn:") {
4567            extract_log_group_from_arn(log_group_id).ok_or_else(|| {
4568                AwsServiceError::aws_error(
4569                    StatusCode::BAD_REQUEST,
4570                    "InvalidParameterException",
4571                    format!("Invalid ARN: {log_group_id}"),
4572                )
4573            })?
4574        } else {
4575            log_group_id.to_string()
4576        };
4577
4578        let state = self.state.read();
4579        if !state.log_groups.contains_key(&group_name) {
4580            return Err(AwsServiceError::aws_error(
4581                StatusCode::BAD_REQUEST,
4582                "ResourceNotFoundException",
4583                format!("The specified log group does not exist: {group_name}"),
4584            ));
4585        }
4586
4587        // Stub response with common fields
4588        let fields = json!([
4589            { "fieldName": "@timestamp", "percent": 100 },
4590            { "fieldName": "@message", "percent": 100 },
4591        ]);
4592
4593        Ok(AwsResponse::json(
4594            StatusCode::OK,
4595            serde_json::to_string(&json!({ "logGroupFields": fields })).unwrap(),
4596        ))
4597    }
4598
4599    fn test_metric_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4600        let body = body_json(req);
4601        let filter_pattern = body["filterPattern"].as_str().ok_or_else(|| {
4602            AwsServiceError::aws_error(
4603                StatusCode::BAD_REQUEST,
4604                "InvalidParameterException",
4605                "filterPattern is required",
4606            )
4607        })?;
4608        validate_string_length("filterPattern", filter_pattern, 0, 1024)?;
4609        let log_event_messages = body["logEventMessages"].as_array().ok_or_else(|| {
4610            AwsServiceError::aws_error(
4611                StatusCode::BAD_REQUEST,
4612                "InvalidParameterException",
4613                "logEventMessages is required",
4614            )
4615        })?;
4616
4617        let matches: Vec<Value> = log_event_messages
4618            .iter()
4619            .enumerate()
4620            .filter(|(_, msg)| {
4621                let msg_str = msg.as_str().unwrap_or("");
4622                matches_filter_pattern(filter_pattern, msg_str)
4623            })
4624            .map(|(i, msg)| {
4625                json!({
4626                    "eventNumber": i + 1,
4627                    "eventMessage": msg,
4628                    "extractedValues": {},
4629                })
4630            })
4631            .collect();
4632
4633        Ok(AwsResponse::json(
4634            StatusCode::OK,
4635            serde_json::to_string(&json!({ "matches": matches })).unwrap(),
4636        ))
4637    }
4638
4639    fn stop_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4640        let body = body_json(req);
4641        let query_id = body["queryId"].as_str().ok_or_else(|| {
4642            AwsServiceError::aws_error(
4643                StatusCode::BAD_REQUEST,
4644                "InvalidParameterException",
4645                "queryId is required",
4646            )
4647        })?;
4648
4649        let mut state = self.state.write();
4650        let query = state.queries.get_mut(query_id).ok_or_else(|| {
4651            AwsServiceError::aws_error(
4652                StatusCode::BAD_REQUEST,
4653                "InvalidParameterException",
4654                format!("Query {query_id} is not in a cancellable state"),
4655            )
4656        })?;
4657
4658        let was_running = query.status == "Running" || query.status == "Scheduled";
4659        if was_running {
4660            query.status = "Cancelled".to_string();
4661        }
4662
4663        Ok(AwsResponse::json(
4664            StatusCode::OK,
4665            serde_json::to_string(&json!({ "success": was_running })).unwrap(),
4666        ))
4667    }
4668
4669    fn put_log_group_deletion_protection(
4670        &self,
4671        req: &AwsRequest,
4672    ) -> Result<AwsResponse, AwsServiceError> {
4673        let body = body_json(req);
4674        let log_group_id = body["logGroupIdentifier"]
4675            .as_str()
4676            .ok_or_else(|| {
4677                AwsServiceError::aws_error(
4678                    StatusCode::BAD_REQUEST,
4679                    "InvalidParameterException",
4680                    "logGroupIdentifier is required",
4681                )
4682            })?
4683            .to_string();
4684        let deletion_protection = body["deletionProtectionEnabled"].as_bool().unwrap_or(true);
4685
4686        let group_name = if log_group_id.starts_with("arn:") {
4687            extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4688                AwsServiceError::aws_error(
4689                    StatusCode::BAD_REQUEST,
4690                    "InvalidParameterException",
4691                    format!("Invalid ARN: {log_group_id}"),
4692                )
4693            })?
4694        } else {
4695            log_group_id
4696        };
4697
4698        let mut state = self.state.write();
4699        let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4700            AwsServiceError::aws_error(
4701                StatusCode::BAD_REQUEST,
4702                "ResourceNotFoundException",
4703                format!("The specified log group does not exist: {group_name}"),
4704            )
4705        })?;
4706
4707        group.deletion_protection = deletion_protection;
4708        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4709    }
4710
4711    fn get_log_record(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4712        let body = body_json(req);
4713        let _log_record_pointer = body["logRecordPointer"].as_str().ok_or_else(|| {
4714            AwsServiceError::aws_error(
4715                StatusCode::BAD_REQUEST,
4716                "InvalidParameterException",
4717                "logRecordPointer is required",
4718            )
4719        })?;
4720
4721        // Stub: return empty log record
4722        Ok(AwsResponse::json(
4723            StatusCode::OK,
4724            serde_json::to_string(&json!({ "logRecord": {} })).unwrap(),
4725        ))
4726    }
4727
4728    fn list_anomalies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4729        let body = body_json(req);
4730        validate_optional_string_length(
4731            "anomalyDetectorArn",
4732            body["anomalyDetectorArn"].as_str(),
4733            1,
4734            2048,
4735        )?;
4736        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
4737        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
4738        validate_optional_enum_value(
4739            "suppressionState",
4740            &body["suppressionState"],
4741            &["SUPPRESSED", "UNSUPPRESSED"],
4742        )?;
4743        // Stub: return empty anomalies list
4744        Ok(AwsResponse::json(
4745            StatusCode::OK,
4746            serde_json::to_string(&json!({ "anomalies": [] })).unwrap(),
4747        ))
4748    }
4749
4750    fn update_anomaly(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4751        let body = body_json(req);
4752        validate_required("anomalyDetectorArn", &body["anomalyDetectorArn"])?;
4753        validate_optional_string_length(
4754            "anomalyDetectorArn",
4755            body["anomalyDetectorArn"].as_str(),
4756            1,
4757            2048,
4758        )?;
4759        validate_optional_string_length("anomalyId", body["anomalyId"].as_str(), 36, 36)?;
4760        validate_optional_string_length("patternId", body["patternId"].as_str(), 32, 32)?;
4761        validate_optional_enum_value(
4762            "suppressionType",
4763            &body["suppressionType"],
4764            &["LIMITED", "INFINITE"],
4765        )?;
4766        // No-op stub
4767        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4768    }
4769
4770    // -- Import tasks --
4771
4772    fn create_import_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4773        let body = body_json(req);
4774        let import_source_arn = require_str(&body, "importSourceArn")?;
4775        let import_role_arn = require_str(&body, "importRoleArn")?;
4776        validate_string_length("importRoleArn", import_role_arn, 1, 2048)?;
4777        let log_group_name = body["logGroupName"].as_str().map(|s| s.to_string());
4778
4779        let import_id = uuid::Uuid::new_v4().to_string();
4780        let now = Utc::now().timestamp_millis();
4781
4782        let task = ImportTask {
4783            import_id: import_id.clone(),
4784            import_source_arn: import_source_arn.to_string(),
4785            import_role_arn: import_role_arn.to_string(),
4786            log_group_name,
4787            status: "RUNNING".to_string(),
4788            creation_time: now,
4789        };
4790
4791        let mut state = self.state.write();
4792        state.import_tasks.insert(import_id.clone(), task);
4793
4794        Ok(AwsResponse::json(
4795            StatusCode::OK,
4796            serde_json::to_string(&json!({ "importId": import_id })).unwrap(),
4797        ))
4798    }
4799
4800    fn describe_import_tasks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4801        let body = body_json(req);
4802        validate_optional_string_length("importId", body["importId"].as_str(), 1, 256)?;
4803        validate_optional_enum_value(
4804            "importStatus",
4805            &body["importStatus"],
4806            &["IN_PROGRESS", "CANCELLED", "COMPLETED", "FAILED"],
4807        )?;
4808        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
4809        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
4810
4811        let state = self.state.read();
4812        let tasks: Vec<Value> = state
4813            .import_tasks
4814            .values()
4815            .map(|t| {
4816                json!({
4817                    "importId": t.import_id,
4818                    "importSourceArn": t.import_source_arn,
4819                    "importStatus": t.status,
4820                    "creationTime": t.creation_time,
4821                })
4822            })
4823            .collect();
4824        Ok(AwsResponse::json(
4825            StatusCode::OK,
4826            serde_json::to_string(&json!({ "imports": tasks })).unwrap(),
4827        ))
4828    }
4829
4830    fn describe_import_task_batches(
4831        &self,
4832        req: &AwsRequest,
4833    ) -> Result<AwsResponse, AwsServiceError> {
4834        let body = body_json(req);
4835        let import_id = require_str(&body, "importId")?;
4836        validate_string_length("importId", import_id, 1, 256)?;
4837        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
4838        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
4839        // Stub: return empty batches
4840        Ok(AwsResponse::json(
4841            StatusCode::OK,
4842            serde_json::to_string(&json!({ "importBatches": [] })).unwrap(),
4843        ))
4844    }
4845
4846    fn cancel_import_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4847        let body = body_json(req);
4848        let import_id = require_str(&body, "importId")?;
4849
4850        let mut state = self.state.write();
4851        match state.import_tasks.get_mut(import_id) {
4852            Some(task) => {
4853                task.status = "CANCELLED".to_string();
4854                Ok(AwsResponse::json(StatusCode::OK, "{}"))
4855            }
4856            None => Err(AwsServiceError::aws_error(
4857                StatusCode::NOT_FOUND,
4858                "ResourceNotFoundException",
4859                format!("Import task not found: {import_id}"),
4860            )),
4861        }
4862    }
4863
4864    // -- Integrations --
4865
4866    fn put_integration(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4867        let body = body_json(req);
4868        validate_required("resourceConfig", &body["resourceConfig"])?;
4869        let integration_name = require_str(&body, "integrationName")?;
4870        validate_string_length("integrationName", integration_name, 1, 50)?;
4871        let integration_type = require_str(&body, "integrationType")?;
4872        validate_enum("integrationType", integration_type, &["OPENSEARCH"])?;
4873        let resource_config = body["resourceConfig"].clone();
4874
4875        let now = Utc::now().timestamp_millis();
4876        let integration = Integration {
4877            integration_name: integration_name.to_string(),
4878            integration_type: integration_type.to_string(),
4879            resource_config,
4880            status: "ACTIVE".to_string(),
4881            creation_time: now,
4882        };
4883
4884        let mut state = self.state.write();
4885        state
4886            .integrations
4887            .insert(integration_name.to_string(), integration);
4888
4889        Ok(AwsResponse::json(
4890            StatusCode::OK,
4891            serde_json::to_string(&json!({
4892                "integrationName": integration_name,
4893                "integrationStatus": "ACTIVE"
4894            }))
4895            .unwrap(),
4896        ))
4897    }
4898
4899    fn get_integration(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4900        let body = body_json(req);
4901        let integration_name = require_str(&body, "integrationName")?;
4902
4903        let state = self.state.read();
4904        match state.integrations.get(integration_name) {
4905            Some(i) => Ok(AwsResponse::json(
4906                StatusCode::OK,
4907                serde_json::to_string(&json!({
4908                    "integrationName": i.integration_name,
4909                    "integrationType": i.integration_type,
4910                    "integrationStatus": i.status,
4911                }))
4912                .unwrap(),
4913            )),
4914            None => Err(AwsServiceError::aws_error(
4915                StatusCode::NOT_FOUND,
4916                "ResourceNotFoundException",
4917                format!("Integration not found: {integration_name}"),
4918            )),
4919        }
4920    }
4921
4922    fn delete_integration(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4923        let body = body_json(req);
4924        let integration_name = require_str(&body, "integrationName")?;
4925        validate_string_length("integrationName", integration_name, 1, 50)?;
4926
4927        let mut state = self.state.write();
4928        state.integrations.remove(integration_name);
4929        Ok(AwsResponse::json(StatusCode::OK, "{}"))
4930    }
4931
4932    fn list_integrations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4933        let body = body_json(req);
4934        validate_optional_string_length(
4935            "integrationNamePrefix",
4936            body["integrationNamePrefix"].as_str(),
4937            1,
4938            50,
4939        )?;
4940        validate_optional_enum_value("integrationType", &body["integrationType"], &["OPENSEARCH"])?;
4941        validate_optional_enum_value(
4942            "integrationStatus",
4943            &body["integrationStatus"],
4944            &["PROVISIONING", "ACTIVE", "FAILED"],
4945        )?;
4946
4947        let state = self.state.read();
4948        let integrations: Vec<Value> = state
4949            .integrations
4950            .values()
4951            .map(|i| {
4952                json!({
4953                    "integrationName": i.integration_name,
4954                    "integrationType": i.integration_type,
4955                    "integrationStatus": i.status,
4956                })
4957            })
4958            .collect();
4959        Ok(AwsResponse::json(
4960            StatusCode::OK,
4961            serde_json::to_string(&json!({ "integrationSummaries": integrations })).unwrap(),
4962        ))
4963    }
4964
4965    // -- Lookup tables --
4966
4967    fn create_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4968        let body = body_json(req);
4969        let lookup_table_name = require_str(&body, "lookupTableName")?;
4970        validate_string_length("lookupTableName", lookup_table_name, 1, 256)?;
4971        let table_body = require_str(&body, "tableBody")?;
4972        validate_string_length("tableBody", table_body, 1, 10485760)?;
4973        validate_optional_string_length("description", body["description"].as_str(), 0, 1024)?;
4974        validate_optional_string_length("kmsKeyId", body["kmsKeyId"].as_str(), 0, 256)?;
4975
4976        let state_r = self.state.read();
4977        let account_id = state_r.account_id.clone();
4978        let region = state_r.region.clone();
4979        drop(state_r);
4980
4981        let arn = format!("arn:aws:logs:{region}:{account_id}:lookup-table:{lookup_table_name}");
4982        let now = Utc::now().timestamp_millis();
4983
4984        let table = LookupTable {
4985            lookup_table_name: lookup_table_name.to_string(),
4986            arn: arn.clone(),
4987            table_body: table_body.to_string(),
4988            creation_time: now,
4989            last_modified_time: now,
4990        };
4991
4992        let mut state = self.state.write();
4993        state.lookup_tables.insert(arn.clone(), table);
4994
4995        Ok(AwsResponse::json(
4996            StatusCode::OK,
4997            serde_json::to_string(&json!({ "lookupTableArn": arn })).unwrap(),
4998        ))
4999    }
5000
5001    fn get_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5002        let body = body_json(req);
5003        let lookup_table_arn = require_str(&body, "lookupTableArn")?;
5004
5005        let state = self.state.read();
5006        match state.lookup_tables.get(lookup_table_arn) {
5007            Some(t) => Ok(AwsResponse::json(
5008                StatusCode::OK,
5009                serde_json::to_string(&json!({
5010                    "lookupTableName": t.lookup_table_name,
5011                    "lookupTableArn": t.arn,
5012                    "tableBody": t.table_body,
5013                    "creationTime": t.creation_time,
5014                    "lastModifiedTime": t.last_modified_time,
5015                }))
5016                .unwrap(),
5017            )),
5018            None => Err(AwsServiceError::aws_error(
5019                StatusCode::NOT_FOUND,
5020                "ResourceNotFoundException",
5021                format!("Lookup table not found: {lookup_table_arn}"),
5022            )),
5023        }
5024    }
5025
5026    fn describe_lookup_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5027        let body = body_json(req);
5028        validate_optional_string_length(
5029            "lookupTableNamePrefix",
5030            body["lookupTableNamePrefix"].as_str(),
5031            1,
5032            256,
5033        )?;
5034        validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 100)?;
5035        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5036
5037        let state = self.state.read();
5038        let tables: Vec<Value> = state
5039            .lookup_tables
5040            .values()
5041            .map(|t| {
5042                json!({
5043                    "lookupTableName": t.lookup_table_name,
5044                    "lookupTableArn": t.arn,
5045                })
5046            })
5047            .collect();
5048        Ok(AwsResponse::json(
5049            StatusCode::OK,
5050            serde_json::to_string(&json!({ "lookupTables": tables })).unwrap(),
5051        ))
5052    }
5053
5054    fn delete_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5055        let body = body_json(req);
5056        let lookup_table_arn = require_str(&body, "lookupTableArn")?;
5057
5058        let mut state = self.state.write();
5059        state.lookup_tables.remove(lookup_table_arn);
5060        Ok(AwsResponse::json(StatusCode::OK, "{}"))
5061    }
5062
5063    fn update_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5064        let body = body_json(req);
5065        let lookup_table_arn = require_str(&body, "lookupTableArn")?;
5066        let table_body = require_str(&body, "tableBody")?;
5067
5068        let mut state = self.state.write();
5069        match state.lookup_tables.get_mut(lookup_table_arn) {
5070            Some(t) => {
5071                t.table_body = table_body.to_string();
5072                t.last_modified_time = Utc::now().timestamp_millis();
5073                Ok(AwsResponse::json(StatusCode::OK, "{}"))
5074            }
5075            None => Err(AwsServiceError::aws_error(
5076                StatusCode::NOT_FOUND,
5077                "ResourceNotFoundException",
5078                format!("Lookup table not found: {lookup_table_arn}"),
5079            )),
5080        }
5081    }
5082
5083    // -- Scheduled queries --
5084
5085    fn create_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5086        let body = body_json(req);
5087        let name = require_str(&body, "name")?;
5088        validate_string_length("name", name, 1, 255)?;
5089        validate_optional_string_length("description", body["description"].as_str(), 0, 1024)?;
5090        let query_string = require_str(&body, "queryString")?;
5091        validate_string_length("queryString", query_string, 0, 10000)?;
5092        let query_language = require_str(&body, "queryLanguage")?;
5093        validate_enum("queryLanguage", query_language, &["CWLI", "SQL", "PPL"])?;
5094        let schedule_expression = require_str(&body, "scheduleExpression")?;
5095        validate_string_length("scheduleExpression", schedule_expression, 0, 256)?;
5096        let execution_role_arn = require_str(&body, "executionRoleArn")?;
5097        validate_string_length("executionRoleArn", execution_role_arn, 1, 2048)?;
5098        validate_optional_string_length("timezone", body["timezone"].as_str(), 1, 2048)?;
5099        validate_optional_range_i64(
5100            "scheduleStartTime",
5101            body["scheduleStartTime"].as_i64(),
5102            0,
5103            i64::MAX,
5104        )?;
5105        validate_optional_range_i64(
5106            "scheduleEndTime",
5107            body["scheduleEndTime"].as_i64(),
5108            0,
5109            i64::MAX,
5110        )?;
5111        validate_optional_enum_value("state", &body["state"], &["ENABLED", "DISABLED"])?;
5112
5113        let state_r = self.state.read();
5114        let account_id = state_r.account_id.clone();
5115        let region = state_r.region.clone();
5116        drop(state_r);
5117
5118        let arn = format!("arn:aws:logs:{region}:{account_id}:scheduled-query:{name}");
5119        let now = Utc::now().timestamp_millis();
5120
5121        let sq = ScheduledQuery {
5122            name: name.to_string(),
5123            arn: arn.clone(),
5124            query_string: query_string.to_string(),
5125            query_language: query_language.to_string(),
5126            schedule_expression: schedule_expression.to_string(),
5127            execution_role_arn: execution_role_arn.to_string(),
5128            status: "ACTIVE".to_string(),
5129            creation_time: now,
5130            last_modified_time: now,
5131        };
5132
5133        let mut state = self.state.write();
5134        state.scheduled_queries.insert(arn.clone(), sq);
5135
5136        Ok(AwsResponse::json(
5137            StatusCode::OK,
5138            serde_json::to_string(&json!({ "scheduledQueryArn": arn })).unwrap(),
5139        ))
5140    }
5141
5142    fn get_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5143        let body = body_json(req);
5144        let identifier = require_str(&body, "identifier")?;
5145
5146        let state = self.state.read();
5147        match state.scheduled_queries.get(identifier) {
5148            Some(sq) => Ok(AwsResponse::json(
5149                StatusCode::OK,
5150                serde_json::to_string(&json!({
5151                    "scheduledQueryArn": sq.arn,
5152                    "name": sq.name,
5153                    "queryString": sq.query_string,
5154                    "queryLanguage": sq.query_language,
5155                    "scheduleExpression": sq.schedule_expression,
5156                    "executionRoleArn": sq.execution_role_arn,
5157                }))
5158                .unwrap(),
5159            )),
5160            None => Err(AwsServiceError::aws_error(
5161                StatusCode::NOT_FOUND,
5162                "ResourceNotFoundException",
5163                format!("Scheduled query not found: {identifier}"),
5164            )),
5165        }
5166    }
5167
5168    fn get_scheduled_query_history(
5169        &self,
5170        req: &AwsRequest,
5171    ) -> Result<AwsResponse, AwsServiceError> {
5172        let body = body_json(req);
5173        let _identifier = require_str(&body, "identifier")?;
5174        validate_required("startTime", &body["startTime"])?;
5175        validate_required("endTime", &body["endTime"])?;
5176        validate_optional_range_i64("startTime", body["startTime"].as_i64(), 0, i64::MAX)?;
5177        validate_optional_range_i64("endTime", body["endTime"].as_i64(), 0, i64::MAX)?;
5178        validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 1000)?;
5179        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5180        // Stub: return empty history
5181        Ok(AwsResponse::json(
5182            StatusCode::OK,
5183            serde_json::to_string(&json!({ "triggerHistory": [] })).unwrap(),
5184        ))
5185    }
5186
5187    fn list_scheduled_queries(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5188        let body = body_json(req);
5189        validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 1000)?;
5190        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5191        validate_optional_enum_value("state", &body["state"], &["ENABLED", "DISABLED"])?;
5192
5193        let state = self.state.read();
5194        let queries: Vec<Value> = state
5195            .scheduled_queries
5196            .values()
5197            .map(|sq| {
5198                json!({
5199                    "name": sq.name,
5200                    "scheduledQueryArn": sq.arn,
5201                })
5202            })
5203            .collect();
5204        Ok(AwsResponse::json(
5205            StatusCode::OK,
5206            serde_json::to_string(&json!({ "scheduledQueries": queries })).unwrap(),
5207        ))
5208    }
5209
5210    fn delete_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5211        let body = body_json(req);
5212        let identifier = require_str(&body, "identifier")?;
5213
5214        let mut state = self.state.write();
5215        state.scheduled_queries.remove(identifier);
5216        Ok(AwsResponse::json(StatusCode::OK, "{}"))
5217    }
5218
5219    fn update_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5220        let body = body_json(req);
5221        let identifier = require_str(&body, "identifier")?;
5222        let query_string = require_str(&body, "queryString")?;
5223        let query_language = require_str(&body, "queryLanguage")?;
5224        let schedule_expression = require_str(&body, "scheduleExpression")?;
5225        let execution_role_arn = require_str(&body, "executionRoleArn")?;
5226
5227        let mut state = self.state.write();
5228        match state.scheduled_queries.get_mut(identifier) {
5229            Some(sq) => {
5230                sq.query_string = query_string.to_string();
5231                sq.query_language = query_language.to_string();
5232                sq.schedule_expression = schedule_expression.to_string();
5233                sq.execution_role_arn = execution_role_arn.to_string();
5234                sq.last_modified_time = Utc::now().timestamp_millis();
5235                Ok(AwsResponse::json(StatusCode::OK, "{}"))
5236            }
5237            None => Err(AwsServiceError::aws_error(
5238                StatusCode::NOT_FOUND,
5239                "ResourceNotFoundException",
5240                format!("Scheduled query not found: {identifier}"),
5241            )),
5242        }
5243    }
5244
5245    // -- Misc stubs --
5246
5247    fn start_live_tail(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5248        let body = body_json(req);
5249        validate_required("logGroupIdentifiers", &body["logGroupIdentifiers"])?;
5250        validate_optional_string_length(
5251            "logEventFilterPattern",
5252            body["logEventFilterPattern"].as_str(),
5253            0,
5254            1024,
5255        )?;
5256        let session_id = uuid::Uuid::new_v4().to_string();
5257        Ok(AwsResponse::json(
5258            StatusCode::OK,
5259            serde_json::to_string(&json!({
5260                "responseStream": {
5261                    "sessionStart": {
5262                        "sessionId": session_id,
5263                        "logGroupIdentifiers": [],
5264                    }
5265                }
5266            }))
5267            .unwrap(),
5268        ))
5269    }
5270
5271    fn list_log_groups_for_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5272        let body = body_json(req);
5273        let query_id = require_str(&body, "queryId")?;
5274        validate_string_length("queryId", query_id, 1, 256)?;
5275        validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 50, 500)?;
5276        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5277        // Stub: return empty log group names
5278        Ok(AwsResponse::json(
5279            StatusCode::OK,
5280            serde_json::to_string(&json!({ "logGroupIdentifiers": [] })).unwrap(),
5281        ))
5282    }
5283
5284    fn list_aggregate_log_group_summaries(
5285        &self,
5286        req: &AwsRequest,
5287    ) -> Result<AwsResponse, AwsServiceError> {
5288        let body = body_json(req);
5289        validate_required("groupBy", &body["groupBy"])?;
5290        validate_optional_enum_value(
5291            "groupBy",
5292            &body["groupBy"],
5293            &[
5294                "DATA_SOURCE_NAME_TYPE_AND_FORMAT",
5295                "DATA_SOURCE_NAME_AND_TYPE",
5296            ],
5297        )?;
5298        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
5299        validate_optional_enum_value(
5300            "logGroupClass",
5301            &body["logGroupClass"],
5302            &["STANDARD", "INFREQUENT_ACCESS", "DELIVERY"],
5303        )?;
5304        validate_optional_string_length(
5305            "logGroupNamePattern",
5306            body["logGroupNamePattern"].as_str(),
5307            3,
5308            129,
5309        )?;
5310        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5311        // Stub: return empty summaries
5312        Ok(AwsResponse::json(
5313            StatusCode::OK,
5314            serde_json::to_string(&json!({ "aggregateLogGroupSummaries": [] })).unwrap(),
5315        ))
5316    }
5317
5318    fn list_log_groups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5319        let body = body_json(req);
5320        let prefix = body["logGroupNamePrefix"].as_str().unwrap_or("");
5321        let pattern = body["logGroupNamePattern"].as_str().unwrap_or("");
5322        let limit = body["limit"].as_i64().unwrap_or(50) as usize;
5323        let next_token = body["nextToken"].as_str();
5324
5325        validate_optional_string_length(
5326            "logGroupNamePrefix",
5327            body["logGroupNamePrefix"].as_str(),
5328            1,
5329            512,
5330        )?;
5331        validate_optional_string_length(
5332            "logGroupNamePattern",
5333            body["logGroupNamePattern"].as_str(),
5334            3,
5335            129,
5336        )?;
5337        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 1000)?;
5338        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5339        validate_optional_enum_value(
5340            "logGroupClass",
5341            &body["logGroupClass"],
5342            &["STANDARD", "INFREQUENT_ACCESS", "DELIVERY"],
5343        )?;
5344
5345        let state = self.state.read();
5346        let mut groups: Vec<&LogGroup> = state
5347            .log_groups
5348            .values()
5349            .filter(|g| {
5350                (prefix.is_empty() || g.name.starts_with(prefix))
5351                    && (pattern.is_empty() || g.name.contains(pattern))
5352            })
5353            .collect();
5354        groups.sort_by(|a, b| a.name.cmp(&b.name));
5355
5356        let start_idx = if let Some(token) = next_token {
5357            groups
5358                .iter()
5359                .position(|g| g.name.as_str() > token)
5360                .unwrap_or(groups.len())
5361        } else {
5362            0
5363        };
5364
5365        let page = &groups[start_idx..];
5366        let has_more = page.len() > limit;
5367        let page = if has_more { &page[..limit] } else { page };
5368
5369        // ListLogGroups returns LogGroupSummary (logGroupName, logGroupArn, logGroupClass only)
5370        let log_groups: Vec<Value> = page
5371            .iter()
5372            .map(|g| {
5373                let log_group_arn = g.arn.trim_end_matches(":*").to_string();
5374                json!({
5375                    "logGroupName": g.name,
5376                    "logGroupArn": log_group_arn,
5377                    "logGroupClass": "STANDARD",
5378                })
5379            })
5380            .collect();
5381
5382        let mut result = json!({ "logGroups": log_groups });
5383        if has_more {
5384            if let Some(last) = page.last() {
5385                result["nextToken"] = json!(last.name);
5386            }
5387        }
5388
5389        Ok(AwsResponse::json(
5390            StatusCode::OK,
5391            serde_json::to_string(&result).unwrap(),
5392        ))
5393    }
5394
5395    fn put_bearer_token_authentication(
5396        &self,
5397        req: &AwsRequest,
5398    ) -> Result<AwsResponse, AwsServiceError> {
5399        let body = body_json(req);
5400        validate_required(
5401            "bearerTokenAuthenticationEnabled",
5402            &body["bearerTokenAuthenticationEnabled"],
5403        )?;
5404        let log_group_identifier = require_str(&body, "logGroupIdentifier")?;
5405        validate_string_length("logGroupIdentifier", log_group_identifier, 1, 2048)?;
5406        let enabled = body["bearerTokenAuthenticationEnabled"]
5407            .as_bool()
5408            .unwrap_or(false);
5409
5410        let mut state = self.state.write();
5411        state
5412            .bearer_token_auth
5413            .insert(log_group_identifier.to_string(), enabled);
5414        Ok(AwsResponse::json(StatusCode::OK, "{}"))
5415    }
5416
5417    fn get_log_object(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5418        let body = body_json(req);
5419        validate_required("logObjectPointer", &body["logObjectPointer"])?;
5420        validate_optional_string_length(
5421            "logObjectPointer",
5422            body["logObjectPointer"].as_str(),
5423            1,
5424            512,
5425        )?;
5426        // Stub: return empty (fieldStream is streaming, represented as empty object)
5427        Ok(AwsResponse::json(StatusCode::OK, "{}"))
5428    }
5429
5430    fn get_log_fields(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5431        let body = body_json(req);
5432        validate_required("dataSourceName", &body["dataSourceName"])?;
5433        validate_required("dataSourceType", &body["dataSourceType"])?;
5434        // Stub: return empty log fields
5435        Ok(AwsResponse::json(
5436            StatusCode::OK,
5437            serde_json::to_string(&json!({ "logFields": [] })).unwrap(),
5438        ))
5439    }
5440
5441    fn associate_source_to_s3_table_integration(
5442        &self,
5443        req: &AwsRequest,
5444    ) -> Result<AwsResponse, AwsServiceError> {
5445        let body = body_json(req);
5446        validate_required("dataSource", &body["dataSource"])?;
5447        let integration_arn = require_str(&body, "integrationArn")?;
5448        let data_source = body["dataSource"].clone();
5449        let source_id = data_source
5450            .as_object()
5451            .and_then(|o| o.get("resourceArn"))
5452            .and_then(|v| v.as_str())
5453            .unwrap_or("unknown")
5454            .to_string();
5455
5456        let mut state = self.state.write();
5457        state
5458            .s3_table_sources
5459            .entry(integration_arn.to_string())
5460            .or_default()
5461            .push(source_id);
5462        Ok(AwsResponse::json(StatusCode::OK, "{}"))
5463    }
5464
5465    fn list_sources_for_s3_table_integration(
5466        &self,
5467        req: &AwsRequest,
5468    ) -> Result<AwsResponse, AwsServiceError> {
5469        let body = body_json(req);
5470        let integration_arn = require_str(&body, "integrationArn")?;
5471        validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 100)?;
5472        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5473
5474        let state = self.state.read();
5475        let sources: Vec<Value> = state
5476            .s3_table_sources
5477            .get(integration_arn)
5478            .map(|sources| {
5479                sources
5480                    .iter()
5481                    .map(|s| {
5482                        json!({
5483                            "identifier": s,
5484                            "status": "ACTIVE",
5485                        })
5486                    })
5487                    .collect()
5488            })
5489            .unwrap_or_default();
5490        Ok(AwsResponse::json(
5491            StatusCode::OK,
5492            serde_json::to_string(&json!({ "sources": sources })).unwrap(),
5493        ))
5494    }
5495
5496    fn disassociate_source_from_s3_table_integration(
5497        &self,
5498        req: &AwsRequest,
5499    ) -> Result<AwsResponse, AwsServiceError> {
5500        let body = body_json(req);
5501        let identifier = require_str(&body, "identifier")?;
5502        validate_string_length("identifier", identifier, 1, 2048)?;
5503        // No-op stub (we don't track detailed enough to remove specific sources)
5504        Ok(AwsResponse::json(StatusCode::OK, "{}"))
5505    }
5506
5507    fn update_delivery_configuration(
5508        &self,
5509        req: &AwsRequest,
5510    ) -> Result<AwsResponse, AwsServiceError> {
5511        let body = body_json(req);
5512        let id = require_str(&body, "id")?;
5513
5514        let state = self.state.read();
5515        if !state.deliveries.contains_key(id) {
5516            return Err(AwsServiceError::aws_error(
5517                StatusCode::NOT_FOUND,
5518                "ResourceNotFoundException",
5519                format!("Delivery not found: {id}"),
5520            ));
5521        }
5522        drop(state);
5523
5524        // No-op: delivery configuration update is accepted but not stored
5525        Ok(AwsResponse::json(StatusCode::OK, "{}"))
5526    }
5527
5528    fn describe_configuration_templates(
5529        &self,
5530        req: &AwsRequest,
5531    ) -> Result<AwsResponse, AwsServiceError> {
5532        let body = body_json(req);
5533        validate_optional_string_length("service", body["service"].as_str(), 1, 255)?;
5534        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5535        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
5536        // Stub: return empty configuration templates
5537        Ok(AwsResponse::json(
5538            StatusCode::OK,
5539            serde_json::to_string(&json!({ "configurationTemplates": [] })).unwrap(),
5540        ))
5541    }
5542}
5543
5544/// Resolve log group name from either logGroupName or resourceIdentifier.
5545/// resourceIdentifier can be a log group name or an ARN.
5546fn resolve_log_group_name(
5547    log_group_name: Option<&str>,
5548    resource_identifier: Option<&str>,
5549) -> Result<String, AwsServiceError> {
5550    if let Some(identifier) = resource_identifier {
5551        if identifier.starts_with("arn:") {
5552            extract_log_group_from_arn(identifier).ok_or_else(|| {
5553                AwsServiceError::aws_error(
5554                    StatusCode::BAD_REQUEST,
5555                    "InvalidParameterException",
5556                    format!("Invalid ARN: {identifier}"),
5557                )
5558            })
5559        } else {
5560            Ok(identifier.to_string())
5561        }
5562    } else if let Some(name) = log_group_name {
5563        Ok(name.to_string())
5564    } else {
5565        Err(AwsServiceError::aws_error(
5566            StatusCode::BAD_REQUEST,
5567            "InvalidParameterException",
5568            "Either logGroupName or resourceIdentifier is required",
5569        ))
5570    }
5571}
5572
5573/// Extract log group name from ARN like "arn:aws:logs:region:account:log-group:name:*"
5574fn extract_log_group_from_arn(arn: &str) -> Option<String> {
5575    // arn:aws:logs:region:account:log-group:name:*
5576    let parts: Vec<&str> = arn.splitn(7, ':').collect();
5577    if parts.len() >= 7 && parts[5] == "log-group" {
5578        let name = parts[6].strip_suffix(":*").unwrap_or(parts[6]);
5579        Some(name.to_string())
5580    } else {
5581        None
5582    }
5583}
5584
5585/// CloudWatch Logs filter pattern matching.
5586///
5587/// Rules:
5588/// - Empty pattern or patterns starting with `{` (JSON patterns) match everything
5589/// - Quoted string `"foo bar"` matches the exact substring
5590/// - Multiple unquoted words `foo bar` means ALL words must appear anywhere in the message
5591/// - Single unquoted word `foo` is a simple substring match
5592fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
5593    let pattern = pattern.trim();
5594
5595    // Empty pattern matches everything
5596    if pattern.is_empty() {
5597        return true;
5598    }
5599
5600    // JSON/metric filter patterns (start with { or [) - we don't parse these, match everything
5601    if pattern.starts_with('{') || pattern.starts_with('[') {
5602        return true;
5603    }
5604
5605    // Quoted pattern: exact substring match
5606    if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
5607        let inner = &pattern[1..pattern.len() - 1];
5608        return message.contains(inner);
5609    }
5610
5611    // Multiple words: all must be present
5612    let words: Vec<&str> = pattern.split_whitespace().collect();
5613    words.iter().all(|word| message.contains(word))
5614}
5615
5616#[cfg(test)]
5617mod tests {
5618    use super::*;
5619    use crate::state::LogsState;
5620    use bytes::Bytes;
5621    use http::{HeaderMap, Method};
5622    use std::collections::HashMap;
5623    use std::sync::Arc;
5624
5625    fn make_service() -> LogsService {
5626        let state = Arc::new(parking_lot::RwLock::new(LogsState::new(
5627            "123456789012",
5628            "us-east-1",
5629        )));
5630        LogsService::new(state)
5631    }
5632
5633    fn make_request(action: &str, body: Value) -> AwsRequest {
5634        AwsRequest {
5635            service: "logs".to_string(),
5636            action: action.to_string(),
5637            region: "us-east-1".to_string(),
5638            account_id: "123456789012".to_string(),
5639            request_id: "test-request-id".to_string(),
5640            headers: HeaderMap::new(),
5641            query_params: HashMap::new(),
5642            body: Bytes::from(serde_json::to_vec(&body).unwrap()),
5643            path_segments: vec![],
5644            raw_path: "/".to_string(),
5645            method: Method::POST,
5646            is_query_protocol: false,
5647            access_key_id: None,
5648        }
5649    }
5650
5651    fn create_group(svc: &LogsService, name: &str) {
5652        let req = make_request("CreateLogGroup", json!({ "logGroupName": name }));
5653        svc.create_log_group(&req).unwrap();
5654    }
5655
5656    fn create_stream(svc: &LogsService, group: &str, stream: &str) {
5657        let req = make_request(
5658            "CreateLogStream",
5659            json!({ "logGroupName": group, "logStreamName": stream }),
5660        );
5661        svc.create_log_stream(&req).unwrap();
5662    }
5663
5664    fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
5665        let now = chrono::Utc::now().timestamp_millis();
5666        let events: Vec<Value> = messages
5667            .iter()
5668            .enumerate()
5669            .map(|(i, msg)| json!({ "timestamp": now + i as i64, "message": msg }))
5670            .collect();
5671        let req = make_request(
5672            "PutLogEvents",
5673            json!({
5674                "logGroupName": group,
5675                "logStreamName": stream,
5676                "logEvents": events,
5677            }),
5678        );
5679        svc.put_log_events(&req).unwrap();
5680    }
5681
5682    // ---- describe_log_groups: logGroupNamePattern ----
5683
5684    #[test]
5685    fn describe_log_groups_pattern_filters_by_substring() {
5686        let svc = make_service();
5687        create_group(&svc, "/app/web");
5688        create_group(&svc, "/app/api");
5689        create_group(&svc, "/system/metrics");
5690
5691        let req = make_request("DescribeLogGroups", json!({ "logGroupNamePattern": "app" }));
5692        let resp = svc.describe_log_groups(&req).unwrap();
5693        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5694        let names: Vec<&str> = body["logGroups"]
5695            .as_array()
5696            .unwrap()
5697            .iter()
5698            .map(|g| g["logGroupName"].as_str().unwrap())
5699            .collect();
5700        assert_eq!(names.len(), 2);
5701        assert!(names.contains(&"/app/web"));
5702        assert!(names.contains(&"/app/api"));
5703    }
5704
5705    #[test]
5706    fn describe_log_groups_pattern_empty_returns_all() {
5707        let svc = make_service();
5708        create_group(&svc, "/app/web");
5709        create_group(&svc, "/system/metrics");
5710
5711        let req = make_request("DescribeLogGroups", json!({}));
5712        let resp = svc.describe_log_groups(&req).unwrap();
5713        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5714        assert_eq!(body["logGroups"].as_array().unwrap().len(), 2);
5715    }
5716
5717    // ---- filter_log_events: logGroupIdentifier ----
5718
5719    #[test]
5720    fn filter_log_events_uses_log_group_identifier_as_name() {
5721        let svc = make_service();
5722        create_group(&svc, "my-group");
5723        create_stream(&svc, "my-group", "stream-1");
5724        put_events(&svc, "my-group", "stream-1", &["hello"]);
5725
5726        let req = make_request(
5727            "FilterLogEvents",
5728            json!({ "logGroupIdentifier": "my-group" }),
5729        );
5730        let resp = svc.filter_log_events(&req).unwrap();
5731        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5732        assert_eq!(body["events"].as_array().unwrap().len(), 1);
5733    }
5734
5735    #[test]
5736    fn filter_log_events_uses_log_group_identifier_as_arn() {
5737        let svc = make_service();
5738        create_group(&svc, "my-group");
5739        create_stream(&svc, "my-group", "stream-1");
5740        put_events(&svc, "my-group", "stream-1", &["hello"]);
5741
5742        let req = make_request(
5743            "FilterLogEvents",
5744            json!({ "logGroupIdentifier": "arn:aws:logs:us-east-1:123456789012:log-group:my-group:*" }),
5745        );
5746        let resp = svc.filter_log_events(&req).unwrap();
5747        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5748        assert_eq!(body["events"].as_array().unwrap().len(), 1);
5749    }
5750
5751    #[test]
5752    fn filter_log_events_errors_without_group_name_or_identifier() {
5753        let svc = make_service();
5754        let req = make_request("FilterLogEvents", json!({}));
5755        assert!(svc.filter_log_events(&req).is_err());
5756    }
5757
5758    // ---- filter_log_events: logStreamNamePrefix ----
5759
5760    #[test]
5761    fn filter_log_events_filters_by_stream_name_prefix() {
5762        let svc = make_service();
5763        create_group(&svc, "grp");
5764        create_stream(&svc, "grp", "web-1");
5765        create_stream(&svc, "grp", "web-2");
5766        create_stream(&svc, "grp", "api-1");
5767        put_events(&svc, "grp", "web-1", &["a"]);
5768        put_events(&svc, "grp", "web-2", &["b"]);
5769        put_events(&svc, "grp", "api-1", &["c"]);
5770
5771        let req = make_request(
5772            "FilterLogEvents",
5773            json!({ "logGroupName": "grp", "logStreamNamePrefix": "web" }),
5774        );
5775        let resp = svc.filter_log_events(&req).unwrap();
5776        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5777        let events = body["events"].as_array().unwrap();
5778        assert_eq!(events.len(), 2);
5779        for e in events {
5780            assert!(e["logStreamName"].as_str().unwrap().starts_with("web"));
5781        }
5782    }
5783
5784    // ---- create_export_task: taskName + logStreamNamePrefix stored ----
5785
5786    #[test]
5787    fn create_export_task_stores_task_name_and_stream_prefix() {
5788        let svc = make_service();
5789        create_group(&svc, "grp");
5790
5791        let req = make_request(
5792            "CreateExportTask",
5793            json!({
5794                "logGroupName": "grp",
5795                "from": 0,
5796                "to": 1000,
5797                "destination": "my-bucket",
5798                "taskName": "my-export",
5799                "logStreamNamePrefix": "web-",
5800            }),
5801        );
5802        let resp = svc.create_export_task(&req).unwrap();
5803        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5804        let task_id = body["taskId"].as_str().unwrap();
5805
5806        let req = make_request("DescribeExportTasks", json!({ "taskId": task_id }));
5807        let resp = svc.describe_export_tasks(&req).unwrap();
5808        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5809        let task = &body["exportTasks"][0];
5810        assert_eq!(task["taskName"].as_str().unwrap(), "my-export");
5811        assert_eq!(task["logStreamNamePrefix"].as_str().unwrap(), "web-");
5812    }
5813
5814    #[test]
5815    fn create_export_task_omits_optional_fields_when_not_provided() {
5816        let svc = make_service();
5817        create_group(&svc, "grp");
5818
5819        let req = make_request(
5820            "CreateExportTask",
5821            json!({
5822                "logGroupName": "grp",
5823                "from": 0,
5824                "to": 1000,
5825                "destination": "my-bucket",
5826            }),
5827        );
5828        let resp = svc.create_export_task(&req).unwrap();
5829        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5830        let task_id = body["taskId"].as_str().unwrap();
5831
5832        let req = make_request("DescribeExportTasks", json!({ "taskId": task_id }));
5833        let resp = svc.describe_export_tasks(&req).unwrap();
5834        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5835        let task = &body["exportTasks"][0];
5836        assert!(task.get("taskName").is_none() || task["taskName"].is_null());
5837        assert!(task.get("logStreamNamePrefix").is_none() || task["logStreamNamePrefix"].is_null());
5838    }
5839
5840    // ---- associate_kms_key / disassociate_kms_key: resourceIdentifier ----
5841
5842    #[test]
5843    fn associate_kms_key_via_resource_identifier_arn() {
5844        let svc = make_service();
5845        create_group(&svc, "grp");
5846
5847        let req = make_request(
5848            "AssociateKmsKey",
5849            json!({
5850                "resourceIdentifier": "arn:aws:logs:us-east-1:123456789012:log-group:grp:*",
5851                "kmsKeyId": "arn:aws:kms:us-east-1:123456789012:key/abc-123",
5852            }),
5853        );
5854        svc.associate_kms_key(&req).unwrap();
5855
5856        let state = svc.state.read();
5857        assert_eq!(
5858            state.log_groups["grp"].kms_key_id.as_deref(),
5859            Some("arn:aws:kms:us-east-1:123456789012:key/abc-123")
5860        );
5861    }
5862
5863    #[test]
5864    fn disassociate_kms_key_via_resource_identifier_name() {
5865        let svc = make_service();
5866        create_group(&svc, "grp");
5867
5868        // First associate
5869        let req = make_request(
5870            "AssociateKmsKey",
5871            json!({ "logGroupName": "grp", "kmsKeyId": "some-key" }),
5872        );
5873        svc.associate_kms_key(&req).unwrap();
5874
5875        // Disassociate via resourceIdentifier (plain name)
5876        let req = make_request("DisassociateKmsKey", json!({ "resourceIdentifier": "grp" }));
5877        svc.disassociate_kms_key(&req).unwrap();
5878
5879        let state = svc.state.read();
5880        assert!(state.log_groups["grp"].kms_key_id.is_none());
5881    }
5882
5883    // ---- describe_query_definitions: queryDefinitionNamePrefix ----
5884
5885    #[test]
5886    fn describe_query_definitions_filters_by_name_prefix() {
5887        let svc = make_service();
5888
5889        // Create some query definitions
5890        for name in &["error-queries-1", "error-queries-2", "latency-queries-1"] {
5891            let req = make_request(
5892                "PutQueryDefinition",
5893                json!({
5894                    "name": name,
5895                    "queryString": "fields @timestamp | limit 20",
5896                }),
5897            );
5898            svc.put_query_definition(&req).unwrap();
5899        }
5900
5901        let req = make_request(
5902            "DescribeQueryDefinitions",
5903            json!({ "queryDefinitionNamePrefix": "error" }),
5904        );
5905        let resp = svc.describe_query_definitions(&req).unwrap();
5906        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5907        let defs = body["queryDefinitions"].as_array().unwrap();
5908        assert_eq!(defs.len(), 2);
5909        for d in defs {
5910            assert!(d["name"].as_str().unwrap().starts_with("error"));
5911        }
5912    }
5913
5914    #[test]
5915    fn describe_query_definitions_no_prefix_returns_all() {
5916        let svc = make_service();
5917
5918        for name in &["a", "b", "c"] {
5919            let req = make_request(
5920                "PutQueryDefinition",
5921                json!({ "name": name, "queryString": "fields @timestamp" }),
5922            );
5923            svc.put_query_definition(&req).unwrap();
5924        }
5925
5926        let req = make_request("DescribeQueryDefinitions", json!({}));
5927        let resp = svc.describe_query_definitions(&req).unwrap();
5928        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5929        assert_eq!(body["queryDefinitions"].as_array().unwrap().len(), 3);
5930    }
5931
5932    // ---- extract_log_group_from_arn ----
5933
5934    #[test]
5935    fn put_delivery_destination_includes_empty_destination_resource_arn() {
5936        let svc = make_service();
5937        let req = make_request(
5938            "PutDeliveryDestination",
5939            json!({
5940                "name": "my-dest",
5941                "deliveryDestinationConfiguration": {}
5942            }),
5943        );
5944        let resp = svc.put_delivery_destination(&req).unwrap();
5945        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5946        let config = &body["deliveryDestination"]["deliveryDestinationConfiguration"];
5947        // destinationResourceArn should always be present as a string (Smithy requirement)
5948        assert_eq!(
5949            config["destinationResourceArn"].as_str().unwrap(),
5950            "",
5951            "destinationResourceArn should be an empty string when not set"
5952        );
5953    }
5954
5955    #[test]
5956    fn put_delivery_destination_includes_destination_resource_arn_when_set() {
5957        let svc = make_service();
5958        let req = make_request(
5959            "PutDeliveryDestination",
5960            json!({
5961                "name": "my-dest",
5962                "deliveryDestinationConfiguration": {
5963                    "destinationResourceArn": "arn:aws:s3:::my-bucket"
5964                }
5965            }),
5966        );
5967        let resp = svc.put_delivery_destination(&req).unwrap();
5968        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5969        let config = &body["deliveryDestination"]["deliveryDestinationConfiguration"];
5970        assert_eq!(
5971            config["destinationResourceArn"].as_str().unwrap(),
5972            "arn:aws:s3:::my-bucket"
5973        );
5974    }
5975
5976    #[test]
5977    fn extract_log_group_from_arn_strips_wildcard_suffix() {
5978        let arn = "arn:aws:logs:us-east-1:123456789012:log-group:my-group:*";
5979        assert_eq!(
5980            extract_log_group_from_arn(arn),
5981            Some("my-group".to_string())
5982        );
5983    }
5984
5985    #[test]
5986    fn extract_log_group_from_arn_without_wildcard() {
5987        let arn = "arn:aws:logs:us-east-1:123456789012:log-group:my-group";
5988        assert_eq!(
5989            extract_log_group_from_arn(arn),
5990            Some("my-group".to_string())
5991        );
5992    }
5993
5994    #[test]
5995    fn extract_log_group_from_arn_invalid() {
5996        assert_eq!(extract_log_group_from_arn("not-an-arn"), None);
5997    }
5998
5999    // ---- Account policies ----
6000
6001    #[test]
6002    fn account_policy_lifecycle() {
6003        let svc = make_service();
6004
6005        let req = make_request(
6006            "PutAccountPolicy",
6007            json!({
6008                "policyName": "test-policy",
6009                "policyType": "DATA_PROTECTION_POLICY",
6010                "policyDocument": "{\"Name\":\"test\"}",
6011            }),
6012        );
6013        let resp = svc.put_account_policy(&req).unwrap();
6014        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6015        assert_eq!(body["accountPolicy"]["policyName"], "test-policy");
6016
6017        let req = make_request(
6018            "DescribeAccountPolicies",
6019            json!({ "policyType": "DATA_PROTECTION_POLICY" }),
6020        );
6021        let resp = svc.describe_account_policies(&req).unwrap();
6022        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6023        assert_eq!(body["accountPolicies"].as_array().unwrap().len(), 1);
6024
6025        let req = make_request(
6026            "DeleteAccountPolicy",
6027            json!({
6028                "policyName": "test-policy",
6029                "policyType": "DATA_PROTECTION_POLICY",
6030            }),
6031        );
6032        svc.delete_account_policy(&req).unwrap();
6033
6034        let req = make_request(
6035            "DescribeAccountPolicies",
6036            json!({ "policyType": "DATA_PROTECTION_POLICY" }),
6037        );
6038        let resp = svc.describe_account_policies(&req).unwrap();
6039        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6040        assert!(body["accountPolicies"].as_array().unwrap().is_empty());
6041    }
6042
6043    // ---- Data protection policy ----
6044
6045    #[test]
6046    fn data_protection_policy_lifecycle() {
6047        let svc = make_service();
6048        create_group(&svc, "dp-group");
6049
6050        let req = make_request(
6051            "PutDataProtectionPolicy",
6052            json!({
6053                "logGroupIdentifier": "dp-group",
6054                "policyDocument": "{\"Name\":\"dp\"}",
6055            }),
6056        );
6057        svc.put_data_protection_policy(&req).unwrap();
6058
6059        let req = make_request(
6060            "GetDataProtectionPolicy",
6061            json!({ "logGroupIdentifier": "dp-group" }),
6062        );
6063        let resp = svc.get_data_protection_policy(&req).unwrap();
6064        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6065        assert_eq!(body["policyDocument"], "{\"Name\":\"dp\"}");
6066
6067        let req = make_request(
6068            "DeleteDataProtectionPolicy",
6069            json!({ "logGroupIdentifier": "dp-group" }),
6070        );
6071        svc.delete_data_protection_policy(&req).unwrap();
6072
6073        let req = make_request(
6074            "GetDataProtectionPolicy",
6075            json!({ "logGroupIdentifier": "dp-group" }),
6076        );
6077        let resp = svc.get_data_protection_policy(&req).unwrap();
6078        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6079        assert!(body.get("policyDocument").is_none());
6080    }
6081
6082    // ---- Index policies ----
6083
6084    #[test]
6085    fn index_policy_lifecycle() {
6086        let svc = make_service();
6087        create_group(&svc, "idx-group");
6088
6089        let req = make_request(
6090            "PutIndexPolicy",
6091            json!({
6092                "logGroupIdentifier": "idx-group",
6093                "policyDocument": "{\"Fields\":[\"field1\"]}",
6094            }),
6095        );
6096        svc.put_index_policy(&req).unwrap();
6097
6098        let req = make_request(
6099            "DescribeIndexPolicies",
6100            json!({ "logGroupIdentifiers": ["idx-group"] }),
6101        );
6102        let resp = svc.describe_index_policies(&req).unwrap();
6103        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6104        assert_eq!(body["indexPolicies"].as_array().unwrap().len(), 1);
6105
6106        let req = make_request(
6107            "DeleteIndexPolicy",
6108            json!({
6109                "logGroupIdentifier": "idx-group",
6110            }),
6111        );
6112        svc.delete_index_policy(&req).unwrap();
6113    }
6114
6115    // ---- Transformers ----
6116
6117    #[test]
6118    fn transformer_lifecycle() {
6119        let svc = make_service();
6120        create_group(&svc, "tx-group");
6121
6122        let req = make_request(
6123            "PutTransformer",
6124            json!({
6125                "logGroupIdentifier": "tx-group",
6126                "transformerConfig": [{"addKeys":{"entries":[{"key":"new","value":"val"}]}}],
6127            }),
6128        );
6129        svc.put_transformer(&req).unwrap();
6130
6131        let req = make_request(
6132            "GetTransformer",
6133            json!({ "logGroupIdentifier": "tx-group" }),
6134        );
6135        let resp = svc.get_transformer(&req).unwrap();
6136        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6137        assert!(body["transformerConfig"].is_array());
6138
6139        let req = make_request(
6140            "DeleteTransformer",
6141            json!({ "logGroupIdentifier": "tx-group" }),
6142        );
6143        svc.delete_transformer(&req).unwrap();
6144    }
6145
6146    #[test]
6147    fn test_transformer_returns_transformed_events() {
6148        let svc = make_service();
6149
6150        let req = make_request(
6151            "TestTransformer",
6152            json!({
6153                "transformerConfig": [{"addKeys":{"entries":[]}}],
6154                "logEventMessages": ["hello", "world"],
6155            }),
6156        );
6157        let resp = svc.test_transformer(&req).unwrap();
6158        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6159        assert_eq!(body["transformedLogs"].as_array().unwrap().len(), 2);
6160    }
6161
6162    // ---- Anomaly detectors ----
6163
6164    #[test]
6165    fn anomaly_detector_lifecycle() {
6166        let svc = make_service();
6167
6168        let req = make_request(
6169            "CreateLogAnomalyDetector",
6170            json!({
6171                "logGroupArnList": ["arn:aws:logs:us-east-1:123456789012:log-group:test:*"],
6172                "detectorName": "my-detector",
6173            }),
6174        );
6175        let resp = svc.create_log_anomaly_detector(&req).unwrap();
6176        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6177        let arn = body["anomalyDetectorArn"].as_str().unwrap().to_string();
6178
6179        let req = make_request(
6180            "GetLogAnomalyDetector",
6181            json!({ "anomalyDetectorArn": &arn }),
6182        );
6183        let resp = svc.get_log_anomaly_detector(&req).unwrap();
6184        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6185        assert_eq!(body["detectorName"], "my-detector");
6186
6187        let req = make_request("ListLogAnomalyDetectors", json!({}));
6188        let resp = svc.list_log_anomaly_detectors(&req).unwrap();
6189        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6190        assert_eq!(body["anomalyDetectors"].as_array().unwrap().len(), 1);
6191
6192        let req = make_request(
6193            "UpdateLogAnomalyDetector",
6194            json!({ "anomalyDetectorArn": &arn, "enabled": false }),
6195        );
6196        svc.update_log_anomaly_detector(&req).unwrap();
6197
6198        let req = make_request(
6199            "DeleteLogAnomalyDetector",
6200            json!({ "anomalyDetectorArn": &arn }),
6201        );
6202        svc.delete_log_anomaly_detector(&req).unwrap();
6203    }
6204
6205    // ---- Misc operations ----
6206
6207    #[test]
6208    fn get_log_group_fields_returns_stub() {
6209        let svc = make_service();
6210        create_group(&svc, "fields-group");
6211
6212        let req = make_request(
6213            "GetLogGroupFields",
6214            json!({ "logGroupName": "fields-group" }),
6215        );
6216        let resp = svc.get_log_group_fields(&req).unwrap();
6217        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6218        assert_eq!(body["logGroupFields"].as_array().unwrap().len(), 2);
6219    }
6220
6221    #[test]
6222    fn test_metric_filter_matches() {
6223        let svc = make_service();
6224
6225        let req = make_request(
6226            "TestMetricFilter",
6227            json!({
6228                "filterPattern": "ERROR",
6229                "logEventMessages": ["ERROR: oops", "INFO: ok", "ERROR: again"],
6230            }),
6231        );
6232        let resp = svc.test_metric_filter(&req).unwrap();
6233        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6234        assert_eq!(body["matches"].as_array().unwrap().len(), 2);
6235    }
6236
6237    #[test]
6238    fn stop_query_marks_as_cancelled() {
6239        let svc = make_service();
6240        create_group(&svc, "sq-group");
6241
6242        let req = make_request(
6243            "StartQuery",
6244            json!({
6245                "logGroupName": "sq-group",
6246                "startTime": 0,
6247                "endTime": 9999999999i64,
6248                "queryString": "fields @timestamp",
6249            }),
6250        );
6251        let resp = svc.start_query(&req).unwrap();
6252        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6253        let qid = body["queryId"].as_str().unwrap().to_string();
6254
6255        // Manually set query status to Running so we can test cancellation
6256        {
6257            let mut state = svc.state.write();
6258            state.queries.get_mut(&qid).unwrap().status = "Running".to_string();
6259        }
6260
6261        let req = make_request("StopQuery", json!({ "queryId": &qid }));
6262        let resp = svc.stop_query(&req).unwrap();
6263        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6264        assert_eq!(body["success"], true);
6265
6266        let state = svc.state.read();
6267        assert_eq!(state.queries[&qid].status, "Cancelled");
6268    }
6269
6270    #[test]
6271    fn put_log_group_deletion_protection() {
6272        let svc = make_service();
6273        create_group(&svc, "prot-group");
6274
6275        let req = make_request(
6276            "PutLogGroupDeletionProtection",
6277            json!({
6278                "logGroupIdentifier": "prot-group",
6279                "deletionProtectionEnabled": true,
6280            }),
6281        );
6282        svc.put_log_group_deletion_protection(&req).unwrap();
6283
6284        let state = svc.state.read();
6285        assert!(state.log_groups["prot-group"].deletion_protection);
6286    }
6287
6288    #[test]
6289    fn get_log_record_returns_empty_stub() {
6290        let svc = make_service();
6291
6292        let req = make_request(
6293            "GetLogRecord",
6294            json!({ "logRecordPointer": "some-pointer" }),
6295        );
6296        let resp = svc.get_log_record(&req).unwrap();
6297        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6298        assert!(body["logRecord"].is_object());
6299    }
6300
6301    #[test]
6302    fn list_anomalies_returns_empty() {
6303        let svc = make_service();
6304
6305        let req = make_request("ListAnomalies", json!({}));
6306        let resp = svc.list_anomalies(&req).unwrap();
6307        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6308        assert!(body["anomalies"].as_array().unwrap().is_empty());
6309    }
6310
6311    #[test]
6312    fn update_anomaly_noop() {
6313        let svc = make_service();
6314        let req = make_request(
6315            "UpdateAnomaly",
6316            json!({"anomalyDetectorArn": "arn:aws:logs:us-east-1:123456789012:anomaly-detector:test"}),
6317        );
6318        svc.update_anomaly(&req).unwrap();
6319    }
6320
6321    // -- Import tasks --
6322
6323    #[test]
6324    fn import_task_lifecycle() {
6325        let svc = make_service();
6326
6327        let req = make_request(
6328            "CreateImportTask",
6329            json!({
6330                "importSourceArn": "arn:aws:s3:::my-bucket/logs",
6331                "importRoleArn": "arn:aws:iam::123456789012:role/import-role"
6332            }),
6333        );
6334        let resp = svc.create_import_task(&req).unwrap();
6335        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6336        let import_id = body["importId"].as_str().unwrap().to_string();
6337
6338        let req = make_request("DescribeImportTasks", json!({}));
6339        let resp = svc.describe_import_tasks(&req).unwrap();
6340        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6341        assert_eq!(body["imports"].as_array().unwrap().len(), 1);
6342
6343        let req = make_request(
6344            "DescribeImportTaskBatches",
6345            json!({ "importId": import_id }),
6346        );
6347        let resp = svc.describe_import_task_batches(&req).unwrap();
6348        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6349        assert!(body["importBatches"].as_array().unwrap().is_empty());
6350
6351        let req = make_request("CancelImportTask", json!({ "importId": import_id }));
6352        svc.cancel_import_task(&req).unwrap();
6353
6354        let req = make_request("DescribeImportTasks", json!({}));
6355        let resp = svc.describe_import_tasks(&req).unwrap();
6356        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6357        assert_eq!(
6358            body["imports"][0]["importStatus"].as_str().unwrap(),
6359            "CANCELLED"
6360        );
6361    }
6362
6363    // -- Integrations --
6364
6365    #[test]
6366    fn integration_lifecycle() {
6367        let svc = make_service();
6368
6369        let req = make_request(
6370            "PutIntegration",
6371            json!({
6372                "integrationName": "test-int",
6373                "integrationType": "OPENSEARCH",
6374                "resourceConfig": { "openSearchResourceConfig": {} }
6375            }),
6376        );
6377        svc.put_integration(&req).unwrap();
6378
6379        let req = make_request("GetIntegration", json!({ "integrationName": "test-int" }));
6380        let resp = svc.get_integration(&req).unwrap();
6381        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6382        assert_eq!(body["integrationName"].as_str().unwrap(), "test-int");
6383
6384        let req = make_request("ListIntegrations", json!({}));
6385        let resp = svc.list_integrations(&req).unwrap();
6386        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6387        assert_eq!(body["integrationSummaries"].as_array().unwrap().len(), 1);
6388
6389        let req = make_request(
6390            "DeleteIntegration",
6391            json!({ "integrationName": "test-int" }),
6392        );
6393        svc.delete_integration(&req).unwrap();
6394
6395        let req = make_request("ListIntegrations", json!({}));
6396        let resp = svc.list_integrations(&req).unwrap();
6397        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6398        assert!(body["integrationSummaries"].as_array().unwrap().is_empty());
6399    }
6400
6401    // -- Lookup tables --
6402
6403    #[test]
6404    fn lookup_table_lifecycle() {
6405        let svc = make_service();
6406
6407        let req = make_request(
6408            "CreateLookupTable",
6409            json!({
6410                "lookupTableName": "test-table",
6411                "tableBody": "key,value\na,b"
6412            }),
6413        );
6414        let resp = svc.create_lookup_table(&req).unwrap();
6415        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6416        let arn = body["lookupTableArn"].as_str().unwrap().to_string();
6417
6418        let req = make_request("GetLookupTable", json!({ "lookupTableArn": arn }));
6419        let resp = svc.get_lookup_table(&req).unwrap();
6420        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6421        assert_eq!(body["lookupTableName"].as_str().unwrap(), "test-table");
6422
6423        let req = make_request("DescribeLookupTables", json!({}));
6424        let resp = svc.describe_lookup_tables(&req).unwrap();
6425        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6426        assert_eq!(body["lookupTables"].as_array().unwrap().len(), 1);
6427
6428        let req = make_request(
6429            "UpdateLookupTable",
6430            json!({ "lookupTableArn": arn, "tableBody": "key,value\nc,d" }),
6431        );
6432        svc.update_lookup_table(&req).unwrap();
6433
6434        let req = make_request("DeleteLookupTable", json!({ "lookupTableArn": arn }));
6435        svc.delete_lookup_table(&req).unwrap();
6436
6437        let req = make_request("DescribeLookupTables", json!({}));
6438        let resp = svc.describe_lookup_tables(&req).unwrap();
6439        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6440        assert!(body["lookupTables"].as_array().unwrap().is_empty());
6441    }
6442
6443    // -- Scheduled queries --
6444
6445    #[test]
6446    fn scheduled_query_lifecycle() {
6447        let svc = make_service();
6448
6449        let req = make_request(
6450            "CreateScheduledQuery",
6451            json!({
6452                "name": "test-sq",
6453                "queryString": "fields @timestamp | limit 10",
6454                "queryLanguage": "CWLI",
6455                "scheduleExpression": "rate(1 hour)",
6456                "executionRoleArn": "arn:aws:iam::123456789012:role/exec"
6457            }),
6458        );
6459        let resp = svc.create_scheduled_query(&req).unwrap();
6460        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6461        let arn = body["scheduledQueryArn"].as_str().unwrap().to_string();
6462
6463        let req = make_request("GetScheduledQuery", json!({ "identifier": arn }));
6464        let resp = svc.get_scheduled_query(&req).unwrap();
6465        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6466        assert_eq!(body["name"].as_str().unwrap(), "test-sq");
6467
6468        let req = make_request(
6469            "GetScheduledQueryHistory",
6470            json!({ "identifier": arn, "startTime": 0_i64, "endTime": 9999999999_i64 }),
6471        );
6472        let resp = svc.get_scheduled_query_history(&req).unwrap();
6473        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6474        assert!(body["triggerHistory"].as_array().unwrap().is_empty());
6475
6476        let req = make_request("ListScheduledQueries", json!({}));
6477        let resp = svc.list_scheduled_queries(&req).unwrap();
6478        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6479        assert_eq!(body["scheduledQueries"].as_array().unwrap().len(), 1);
6480
6481        let req = make_request(
6482            "UpdateScheduledQuery",
6483            json!({
6484                "identifier": arn,
6485                "queryString": "fields @message | limit 5",
6486                "queryLanguage": "CWLI",
6487                "scheduleExpression": "rate(2 hours)",
6488                "executionRoleArn": "arn:aws:iam::123456789012:role/exec"
6489            }),
6490        );
6491        svc.update_scheduled_query(&req).unwrap();
6492
6493        let req = make_request("DeleteScheduledQuery", json!({ "identifier": arn }));
6494        svc.delete_scheduled_query(&req).unwrap();
6495
6496        let req = make_request("ListScheduledQueries", json!({}));
6497        let resp = svc.list_scheduled_queries(&req).unwrap();
6498        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6499        assert!(body["scheduledQueries"].as_array().unwrap().is_empty());
6500    }
6501
6502    // -- Misc stubs --
6503
6504    #[test]
6505    fn start_live_tail_returns_session() {
6506        let svc = make_service();
6507        let req = make_request(
6508            "StartLiveTail",
6509            json!({ "logGroupIdentifiers": ["/test/group"] }),
6510        );
6511        let resp = svc.start_live_tail(&req).unwrap();
6512        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6513        assert!(body["responseStream"]["sessionStart"]["sessionId"]
6514            .as_str()
6515            .is_some());
6516    }
6517
6518    #[test]
6519    fn list_log_groups_delegates_to_describe() {
6520        let svc = make_service();
6521        create_group(&svc, "/test/list");
6522        let req = make_request("DescribeLogGroups", json!({}));
6523        let resp = svc.describe_log_groups(&req).unwrap();
6524        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6525        assert_eq!(body["logGroups"].as_array().unwrap().len(), 1);
6526    }
6527
6528    #[test]
6529    fn list_log_groups_for_query_returns_empty() {
6530        let svc = make_service();
6531        let req = make_request(
6532            "ListLogGroupsForQuery",
6533            json!({ "queryId": "some-query-id" }),
6534        );
6535        let resp = svc.list_log_groups_for_query(&req).unwrap();
6536        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6537        assert!(body["logGroupIdentifiers"].as_array().unwrap().is_empty());
6538    }
6539
6540    #[test]
6541    fn list_aggregate_log_group_summaries_returns_empty() {
6542        let svc = make_service();
6543        let req = make_request(
6544            "ListAggregateLogGroupSummaries",
6545            json!({ "groupBy": "DATA_SOURCE_NAME_AND_TYPE" }),
6546        );
6547        let resp = svc.list_aggregate_log_group_summaries(&req).unwrap();
6548        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6549        assert!(body["aggregateLogGroupSummaries"]
6550            .as_array()
6551            .unwrap()
6552            .is_empty());
6553    }
6554
6555    #[test]
6556    fn put_bearer_token_authentication_stores_flag() {
6557        let svc = make_service();
6558        create_group(&svc, "/test/bearer");
6559        let req = make_request(
6560            "PutBearerTokenAuthentication",
6561            json!({
6562                "logGroupIdentifier": "/test/bearer",
6563                "bearerTokenAuthenticationEnabled": true
6564            }),
6565        );
6566        svc.put_bearer_token_authentication(&req).unwrap();
6567    }
6568
6569    #[test]
6570    fn get_log_object_returns_stub() {
6571        let svc = make_service();
6572        let req = make_request(
6573            "GetLogObject",
6574            json!({ "logObjectPointer": "some-pointer" }),
6575        );
6576        let resp = svc.get_log_object(&req).unwrap();
6577        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6578        assert!(body.is_object());
6579    }
6580
6581    #[test]
6582    fn get_log_fields_returns_stub() {
6583        let svc = make_service();
6584        let req = make_request(
6585            "GetLogFields",
6586            json!({ "dataSourceName": "test", "dataSourceType": "CW_LOG" }),
6587        );
6588        let resp = svc.get_log_fields(&req).unwrap();
6589        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6590        assert!(body["logFields"].as_array().unwrap().is_empty());
6591    }
6592
6593    #[test]
6594    fn s3_table_integration_stubs() {
6595        let svc = make_service();
6596
6597        let req = make_request(
6598            "AssociateSourceToS3TableIntegration",
6599            json!({
6600                "integrationArn": "arn:aws:logs:us-east-1:123456789012:integration:test",
6601                "dataSource": { "resourceArn": "arn:aws:logs:us-east-1:123456789012:log-group:test" }
6602            }),
6603        );
6604        svc.associate_source_to_s3_table_integration(&req).unwrap();
6605
6606        let req = make_request(
6607            "ListSourcesForS3TableIntegration",
6608            json!({
6609                "integrationArn": "arn:aws:logs:us-east-1:123456789012:integration:test"
6610            }),
6611        );
6612        let resp = svc.list_sources_for_s3_table_integration(&req).unwrap();
6613        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6614        assert_eq!(body["sources"].as_array().unwrap().len(), 1);
6615
6616        let req = make_request(
6617            "DisassociateSourceFromS3TableIntegration",
6618            json!({ "identifier": "arn:aws:logs:us-east-1:123456789012:integration:test" }),
6619        );
6620        svc.disassociate_source_from_s3_table_integration(&req)
6621            .unwrap();
6622    }
6623
6624    #[test]
6625    fn update_delivery_configuration_noop() {
6626        let svc = make_service();
6627        // First create a delivery setup
6628        let req = make_request(
6629            "PutDeliverySource",
6630            json!({
6631                "name": "test-ds",
6632                "resourceArn": "arn:aws:logs:us-east-1:123456789012:log-group:dummy",
6633                "logType": "APPLICATION_LOGS"
6634            }),
6635        );
6636        svc.put_delivery_source(&req).unwrap();
6637
6638        let req = make_request(
6639            "PutDeliveryDestination",
6640            json!({
6641                "name": "test-dd",
6642                "deliveryDestinationConfiguration": {
6643                    "destinationResourceArn": "arn:aws:s3:::test-bucket"
6644                }
6645            }),
6646        );
6647        svc.put_delivery_destination(&req).unwrap();
6648
6649        let req = make_request(
6650            "CreateDelivery",
6651            json!({
6652                "deliverySourceName": "test-ds",
6653                "deliveryDestinationArn": "arn:aws:logs:us-east-1:123456789012:delivery-destination:test-dd"
6654            }),
6655        );
6656        let resp = svc.create_delivery(&req).unwrap();
6657        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6658        let delivery_id = body["delivery"]["id"].as_str().unwrap().to_string();
6659
6660        let req = make_request("UpdateDeliveryConfiguration", json!({ "id": delivery_id }));
6661        svc.update_delivery_configuration(&req).unwrap();
6662    }
6663
6664    #[test]
6665    fn describe_configuration_templates_returns_empty() {
6666        let svc = make_service();
6667        let req = make_request("DescribeConfigurationTemplates", json!({}));
6668        let resp = svc.describe_configuration_templates(&req).unwrap();
6669        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6670        assert!(body["configurationTemplates"]
6671            .as_array()
6672            .unwrap()
6673            .is_empty());
6674    }
6675}