1use async_trait::async_trait;
2use chrono::Utc;
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
7use fakecloud_core::validation::*;
8
9use crate::state::{
10 AccountPolicy, AnomalyDetector, DataProtectionPolicy, Delivery, DeliveryDestination,
11 DeliverySource, Destination, ExportTask, ImportTask, IndexPolicy, Integration, LogEvent,
12 LogGroup, LogStream, LookupTable, MetricFilter, MetricTransformation, QueryDefinition,
13 QueryInfo, ResourcePolicy, ScheduledQuery, SharedLogsState, SubscriptionFilter, Transformer,
14};
15
/// Request handler for the `logs` service, backed by a [`SharedLogsState`].
pub struct LogsService {
    /// Shared state handle; handlers acquire it via `read()` / `write()`.
    state: SharedLogsState,
}
19
20impl LogsService {
21 pub fn new(state: SharedLogsState) -> Self {
22 Self { state }
23 }
24}
25
/// `AwsService` implementation: identifies the service as `logs` and routes
/// each request to the handler method corresponding to its action name.
#[async_trait]
impl AwsService for LogsService {
    /// Returns the service identifier, `"logs"`.
    fn service_name(&self) -> &str {
        "logs"
    }

    /// Dispatches `req` to the handler matching `req.action`.
    ///
    /// Every handler is a plain synchronous method on `LogsService` that
    /// borrows the request and returns the response (or error) directly.
    ///
    /// # Errors
    ///
    /// Returns `AwsServiceError::action_not_implemented("logs", ..)` when the
    /// action name matches none of the arms; otherwise propagates whatever
    /// error the selected handler returns.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        match req.action.as_str() {
            "CreateLogGroup" => self.create_log_group(&req),
            "DeleteLogGroup" => self.delete_log_group(&req),
            "DescribeLogGroups" => self.describe_log_groups(&req),
            "CreateLogStream" => self.create_log_stream(&req),
            "DeleteLogStream" => self.delete_log_stream(&req),
            "DescribeLogStreams" => self.describe_log_streams(&req),
            "PutLogEvents" => self.put_log_events(&req),
            "GetLogEvents" => self.get_log_events(&req),
            "FilterLogEvents" => self.filter_log_events(&req),
            "TagLogGroup" => self.tag_log_group(&req),
            "UntagLogGroup" => self.untag_log_group(&req),
            "ListTagsLogGroup" => self.list_tags_log_group(&req),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsForResource" => self.list_tags_for_resource(&req),
            "PutRetentionPolicy" => self.put_retention_policy(&req),
            "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
            "PutSubscriptionFilter" => self.put_subscription_filter(&req),
            "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
            "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
            "PutMetricFilter" => self.put_metric_filter(&req),
            "DescribeMetricFilters" => self.describe_metric_filters(&req),
            "DeleteMetricFilter" => self.delete_metric_filter(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "DescribeResourcePolicies" => self.describe_resource_policies(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            "PutDestination" => self.put_destination(&req),
            "DescribeDestinations" => self.describe_destinations(&req),
            "DeleteDestination" => self.delete_destination(&req),
            "PutDestinationPolicy" => self.put_destination_policy(&req),
            "StartQuery" => self.start_query(&req),
            "GetQueryResults" => self.get_query_results(&req),
            "DescribeQueries" => self.describe_queries(&req),
            "CreateExportTask" => self.create_export_task(&req),
            "DescribeExportTasks" => self.describe_export_tasks(&req),
            "CancelExportTask" => self.cancel_export_task(&req),
            "PutDeliveryDestination" => self.put_delivery_destination(&req),
            "GetDeliveryDestination" => self.get_delivery_destination(&req),
            "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
            "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
            "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
            "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
            "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
            "PutDeliverySource" => self.put_delivery_source(&req),
            "GetDeliverySource" => self.get_delivery_source(&req),
            "DescribeDeliverySources" => self.describe_delivery_sources(&req),
            "DeleteDeliverySource" => self.delete_delivery_source(&req),
            "CreateDelivery" => self.create_delivery(&req),
            "GetDelivery" => self.get_delivery(&req),
            "DescribeDeliveries" => self.describe_deliveries(&req),
            "DeleteDelivery" => self.delete_delivery(&req),
            "AssociateKmsKey" => self.associate_kms_key(&req),
            "DisassociateKmsKey" => self.disassociate_kms_key(&req),
            "PutQueryDefinition" => self.put_query_definition(&req),
            "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
            "DeleteQueryDefinition" => self.delete_query_definition(&req),
            "PutAccountPolicy" => self.put_account_policy(&req),
            "DescribeAccountPolicies" => self.describe_account_policies(&req),
            "DeleteAccountPolicy" => self.delete_account_policy(&req),
            "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
            "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
            "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
            "PutIndexPolicy" => self.put_index_policy(&req),
            "DescribeIndexPolicies" => self.describe_index_policies(&req),
            "DeleteIndexPolicy" => self.delete_index_policy(&req),
            "DescribeFieldIndexes" => self.describe_field_indexes(&req),
            "PutTransformer" => self.put_transformer(&req),
            "GetTransformer" => self.get_transformer(&req),
            "DeleteTransformer" => self.delete_transformer(&req),
            "TestTransformer" => self.test_transformer(&req),
            "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
            "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
            "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
            "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
            "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
            "GetLogGroupFields" => self.get_log_group_fields(&req),
            "TestMetricFilter" => self.test_metric_filter(&req),
            "StopQuery" => self.stop_query(&req),
            "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
            "GetLogRecord" => self.get_log_record(&req),
            "ListAnomalies" => self.list_anomalies(&req),
            "UpdateAnomaly" => self.update_anomaly(&req),
            "CreateImportTask" => self.create_import_task(&req),
            "DescribeImportTasks" => self.describe_import_tasks(&req),
            "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
            "CancelImportTask" => self.cancel_import_task(&req),
            "PutIntegration" => self.put_integration(&req),
            "GetIntegration" => self.get_integration(&req),
            "DeleteIntegration" => self.delete_integration(&req),
            "ListIntegrations" => self.list_integrations(&req),
            "CreateLookupTable" => self.create_lookup_table(&req),
            "GetLookupTable" => self.get_lookup_table(&req),
            "DescribeLookupTables" => self.describe_lookup_tables(&req),
            "DeleteLookupTable" => self.delete_lookup_table(&req),
            "UpdateLookupTable" => self.update_lookup_table(&req),
            "CreateScheduledQuery" => self.create_scheduled_query(&req),
            "GetScheduledQuery" => self.get_scheduled_query(&req),
            "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
            "ListScheduledQueries" => self.list_scheduled_queries(&req),
            "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
            "UpdateScheduledQuery" => self.update_scheduled_query(&req),
            "StartLiveTail" => self.start_live_tail(&req),
            // `ListLogGroups` is served by the same handler as `DescribeLogGroups`.
            "ListLogGroups" => self.describe_log_groups(&req),
            "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
            "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
            "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
            "GetLogObject" => self.get_log_object(&req),
            "GetLogFields" => self.get_log_fields(&req),
            "AssociateSourceToS3TableIntegration" => {
                self.associate_source_to_s3_table_integration(&req)
            }
            "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
            "DisassociateSourceFromS3TableIntegration" => {
                self.disassociate_source_from_s3_table_integration(&req)
            }
            "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
            "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
            // Anything else is reported as not implemented for this service.
            _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
        }
    }

    /// Returns the action names this service advertises as supported.
    ///
    /// NOTE(review): this list currently contains the same names, in the same
    /// order, as the match arms in `handle`; the two are maintained by hand,
    /// so an action added to one must be added to the other.
    fn supported_actions(&self) -> &[&str] {
        &[
            "CreateLogGroup",
            "DeleteLogGroup",
            "DescribeLogGroups",
            "CreateLogStream",
            "DeleteLogStream",
            "DescribeLogStreams",
            "PutLogEvents",
            "GetLogEvents",
            "FilterLogEvents",
            "TagLogGroup",
            "UntagLogGroup",
            "ListTagsLogGroup",
            "TagResource",
            "UntagResource",
            "ListTagsForResource",
            "PutRetentionPolicy",
            "DeleteRetentionPolicy",
            "PutSubscriptionFilter",
            "DescribeSubscriptionFilters",
            "DeleteSubscriptionFilter",
            "PutMetricFilter",
            "DescribeMetricFilters",
            "DeleteMetricFilter",
            "PutResourcePolicy",
            "DescribeResourcePolicies",
            "DeleteResourcePolicy",
            "PutDestination",
            "DescribeDestinations",
            "DeleteDestination",
            "PutDestinationPolicy",
            "StartQuery",
            "GetQueryResults",
            "DescribeQueries",
            "CreateExportTask",
            "DescribeExportTasks",
            "CancelExportTask",
            "PutDeliveryDestination",
            "GetDeliveryDestination",
            "DescribeDeliveryDestinations",
            "DeleteDeliveryDestination",
            "PutDeliveryDestinationPolicy",
            "GetDeliveryDestinationPolicy",
            "DeleteDeliveryDestinationPolicy",
            "PutDeliverySource",
            "GetDeliverySource",
            "DescribeDeliverySources",
            "DeleteDeliverySource",
            "CreateDelivery",
            "GetDelivery",
            "DescribeDeliveries",
            "DeleteDelivery",
            "AssociateKmsKey",
            "DisassociateKmsKey",
            "PutQueryDefinition",
            "DescribeQueryDefinitions",
            "DeleteQueryDefinition",
            "PutAccountPolicy",
            "DescribeAccountPolicies",
            "DeleteAccountPolicy",
            "PutDataProtectionPolicy",
            "GetDataProtectionPolicy",
            "DeleteDataProtectionPolicy",
            "PutIndexPolicy",
            "DescribeIndexPolicies",
            "DeleteIndexPolicy",
            "DescribeFieldIndexes",
            "PutTransformer",
            "GetTransformer",
            "DeleteTransformer",
            "TestTransformer",
            "CreateLogAnomalyDetector",
            "GetLogAnomalyDetector",
            "DeleteLogAnomalyDetector",
            "ListLogAnomalyDetectors",
            "UpdateLogAnomalyDetector",
            "GetLogGroupFields",
            "TestMetricFilter",
            "StopQuery",
            "PutLogGroupDeletionProtection",
            "GetLogRecord",
            "ListAnomalies",
            "UpdateAnomaly",
            "CreateImportTask",
            "DescribeImportTasks",
            "DescribeImportTaskBatches",
            "CancelImportTask",
            "PutIntegration",
            "GetIntegration",
            "DeleteIntegration",
            "ListIntegrations",
            "CreateLookupTable",
            "GetLookupTable",
            "DescribeLookupTables",
            "DeleteLookupTable",
            "UpdateLookupTable",
            "CreateScheduledQuery",
            "GetScheduledQuery",
            "GetScheduledQueryHistory",
            "ListScheduledQueries",
            "DeleteScheduledQuery",
            "UpdateScheduledQuery",
            "StartLiveTail",
            "ListLogGroups",
            "ListLogGroupsForQuery",
            "ListAggregateLogGroupSummaries",
            "PutBearerTokenAuthentication",
            "GetLogObject",
            "GetLogFields",
            "AssociateSourceToS3TableIntegration",
            "ListSourcesForS3TableIntegration",
            "DisassociateSourceFromS3TableIntegration",
            "UpdateDeliveryConfiguration",
            "DescribeConfigurationTemplates",
        ]
    }
}
273
274fn body_json(req: &AwsRequest) -> Value {
275 serde_json::from_slice(&req.body).unwrap_or(Value::Null)
276}
277
278fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
279 body[field].as_str().ok_or_else(|| {
280 AwsServiceError::aws_error(
281 StatusCode::BAD_REQUEST,
282 "InvalidParameterException",
283 format!("{field} is required"),
284 )
285 })
286}
287
288fn dd_config_json(config: &std::collections::HashMap<String, String>) -> Value {
291 let mut m: serde_json::Map<String, Value> =
292 config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
293 m.entry("destinationResourceArn".to_string())
294 .or_insert_with(|| json!(""));
295 Value::Object(m)
296}
297
/// Produces an upload sequence token: the current Unix time in nanoseconds,
/// rendered as a zero-padded, 38-digit decimal string.
fn generate_sequence_token() -> String {
    // If the system clock reads earlier than the Unix epoch, fall back to zero.
    let elapsed_nanos: u128 = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|since_epoch| since_epoch.as_nanos())
        .unwrap_or(0);
    // Reducing modulo 10^38 guarantees the value never needs more than 38 digits.
    let modulus = 10u128.pow(38);
    let digits = elapsed_nanos % modulus;
    format!("{:038}", digits)
}
307
308fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
309 AwsServiceError::aws_error(
310 StatusCode::BAD_REQUEST,
311 "InvalidParameterException",
312 format!(
313 "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
314 ),
315 )
316}
317
318impl LogsService {
319 fn create_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
322 let body = body_json(req);
323 let name = body["logGroupName"]
324 .as_str()
325 .ok_or_else(|| {
326 AwsServiceError::aws_error(
327 StatusCode::BAD_REQUEST,
328 "InvalidParameterException",
329 "logGroupName is required",
330 )
331 })?
332 .to_string();
333
334 validate_string_length("logGroupName", &name, 1, 512)?;
335 validate_optional_string_length("kmsKeyId", body["kmsKeyId"].as_str(), 1, 256)?;
336
337 let mut state = self.state.write();
338 if state.log_groups.contains_key(&name) {
339 return Err(AwsServiceError::aws_error(
340 StatusCode::BAD_REQUEST,
341 "ResourceAlreadyExistsException",
342 format!("The specified log group already exists: {name}"),
343 ));
344 }
345
346 let arn = format!(
347 "arn:aws:logs:{}:{}:log-group:{}:*",
348 state.region, state.account_id, name
349 );
350 let now = Utc::now().timestamp_millis();
351
352 let tags = body["tags"]
353 .as_object()
354 .map(|m| {
355 m.iter()
356 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
357 .collect()
358 })
359 .unwrap_or_default();
360
361 let kms_key_id = body["kmsKeyId"].as_str().map(|s| s.to_string());
362
363 state.log_groups.insert(
364 name.clone(),
365 LogGroup {
366 name,
367 arn,
368 creation_time: now,
369 retention_in_days: None,
370 kms_key_id,
371 tags,
372 log_streams: std::collections::HashMap::new(),
373 stored_bytes: 0,
374 subscription_filters: Vec::new(),
375 data_protection_policy: None,
376 index_policies: Vec::new(),
377 transformer: None,
378 deletion_protection: false,
379 },
380 );
381
382 Ok(AwsResponse::json(StatusCode::OK, "{}"))
383 }
384
385 fn delete_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
386 let body = body_json(req);
387 let name = body["logGroupName"].as_str().ok_or_else(|| {
388 AwsServiceError::aws_error(
389 StatusCode::BAD_REQUEST,
390 "InvalidParameterException",
391 "logGroupName is required",
392 )
393 })?;
394
395 validate_string_length("logGroupName", name, 1, 512)?;
396
397 let mut state = self.state.write();
398 if let Some(group) = state.log_groups.get(name) {
400 if group.deletion_protection {
401 return Err(AwsServiceError::aws_error(
402 StatusCode::BAD_REQUEST,
403 "OperationAbortedException",
404 format!("Log group {name} has deletion protection enabled"),
405 ));
406 }
407 }
408 if state.log_groups.remove(name).is_none() {
409 return Err(AwsServiceError::aws_error(
410 StatusCode::BAD_REQUEST,
411 "ResourceNotFoundException",
412 format!("The specified log group does not exist: {name}"),
413 ));
414 }
415
416 Ok(AwsResponse::json(StatusCode::OK, "{}"))
417 }
418
419 fn describe_log_groups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
420 let body = body_json(req);
421 let prefix = body["logGroupNamePrefix"].as_str().unwrap_or("");
422 let pattern = body["logGroupNamePattern"].as_str().unwrap_or("");
423 let limit = body["limit"].as_i64().unwrap_or(50) as usize;
424 let next_token = body["nextToken"].as_str();
425
426 validate_optional_string_length(
427 "logGroupNamePrefix",
428 body["logGroupNamePrefix"].as_str(),
429 1,
430 512,
431 )?;
432 validate_optional_string_length(
433 "logGroupNamePattern",
434 body["logGroupNamePattern"].as_str(),
435 0,
436 512,
437 )?;
438 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
439 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
440 validate_optional_enum_value(
441 "logGroupClass",
442 &body["logGroupClass"],
443 &["STANDARD", "INFREQUENT_ACCESS", "DELIVERY"],
444 )?;
445
446 let state = self.state.read();
447 let mut groups: Vec<&LogGroup> = state
448 .log_groups
449 .values()
450 .filter(|g| {
451 (prefix.is_empty() || g.name.starts_with(prefix))
452 && (pattern.is_empty() || g.name.contains(pattern))
453 })
454 .collect();
455 groups.sort_by(|a, b| a.name.cmp(&b.name));
456
457 let start_idx = if let Some(token) = next_token {
459 groups
460 .iter()
461 .position(|g| g.name.as_str() > token)
462 .unwrap_or(groups.len())
463 } else {
464 0
465 };
466
467 let page = &groups[start_idx..];
468 let has_more = page.len() > limit;
469 let page = if has_more { &page[..limit] } else { page };
470
471 let log_groups: Vec<Value> = page
472 .iter()
473 .map(|g| {
474 let log_group_arn = g.arn.trim_end_matches(":*").to_string();
475 let mut obj = json!({
476 "logGroupName": g.name,
477 "arn": g.arn,
478 "logGroupArn": log_group_arn,
479 "creationTime": g.creation_time,
480 "storedBytes": g.stored_bytes,
481 "metricFilterCount": 0,
482 });
483 if let Some(days) = g.retention_in_days {
484 obj["retentionInDays"] = json!(days);
485 }
486 if let Some(ref kms) = g.kms_key_id {
487 obj["kmsKeyId"] = json!(kms);
488 }
489 obj
490 })
491 .collect();
492
493 let mut result = json!({ "logGroups": log_groups });
494 if has_more {
495 if let Some(last) = page.last() {
496 result["nextToken"] = json!(last.name);
497 }
498 }
499
500 Ok(AwsResponse::json(
501 StatusCode::OK,
502 serde_json::to_string(&result).unwrap(),
503 ))
504 }
505
506 fn create_log_stream(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
509 let body = body_json(req);
510 let group_name = body["logGroupName"].as_str().ok_or_else(|| {
511 AwsServiceError::aws_error(
512 StatusCode::BAD_REQUEST,
513 "InvalidParameterException",
514 "logGroupName is required",
515 )
516 })?;
517 let stream_name = body["logStreamName"]
518 .as_str()
519 .ok_or_else(|| {
520 AwsServiceError::aws_error(
521 StatusCode::BAD_REQUEST,
522 "InvalidParameterException",
523 "logStreamName is required",
524 )
525 })?
526 .to_string();
527
528 validate_string_length("logGroupName", group_name, 1, 512)?;
529 validate_string_length("logStreamName", &stream_name, 1, 512)?;
530
531 let mut state = self.state.write();
532 let region = state.region.clone();
533 let account_id = state.account_id.clone();
534
535 let group = state.log_groups.get_mut(group_name).ok_or_else(|| {
536 AwsServiceError::aws_error(
537 StatusCode::BAD_REQUEST,
538 "ResourceNotFoundException",
539 format!("The specified log group does not exist: {group_name}"),
540 )
541 })?;
542
543 if group.log_streams.contains_key(&stream_name) {
544 return Err(AwsServiceError::aws_error(
545 StatusCode::BAD_REQUEST,
546 "ResourceAlreadyExistsException",
547 format!("The specified log stream already exists: {stream_name}"),
548 ));
549 }
550
551 let arn = format!(
552 "arn:aws:logs:{region}:{account_id}:log-group:{group_name}:log-stream:{stream_name}",
553 );
554 let now = Utc::now().timestamp_millis();
555
556 group.log_streams.insert(
557 stream_name.clone(),
558 LogStream {
559 name: stream_name,
560 arn,
561 creation_time: now,
562 first_event_timestamp: None,
563 last_event_timestamp: None,
564 last_ingestion_time: None,
565 upload_sequence_token: generate_sequence_token(),
566 events: Vec::new(),
567 },
568 );
569
570 Ok(AwsResponse::json(StatusCode::OK, "{}"))
571 }
572
573 fn delete_log_stream(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
574 let body = body_json(req);
575 let group_name = body["logGroupName"].as_str().ok_or_else(|| {
576 AwsServiceError::aws_error(
577 StatusCode::BAD_REQUEST,
578 "InvalidParameterException",
579 "logGroupName is required",
580 )
581 })?;
582 let stream_name = body["logStreamName"].as_str().ok_or_else(|| {
583 AwsServiceError::aws_error(
584 StatusCode::BAD_REQUEST,
585 "InvalidParameterException",
586 "logStreamName is required",
587 )
588 })?;
589
590 validate_string_length("logGroupName", group_name, 1, 512)?;
591 validate_string_length("logStreamName", stream_name, 1, 512)?;
592
593 let mut state = self.state.write();
594 let group = state.log_groups.get_mut(group_name).ok_or_else(|| {
595 AwsServiceError::aws_error(
596 StatusCode::BAD_REQUEST,
597 "ResourceNotFoundException",
598 format!("The specified log group does not exist: {group_name}"),
599 )
600 })?;
601
602 if group.log_streams.remove(stream_name).is_none() {
603 return Err(AwsServiceError::aws_error(
604 StatusCode::BAD_REQUEST,
605 "ResourceNotFoundException",
606 format!("The specified log stream does not exist: {stream_name}"),
607 ));
608 }
609
610 Ok(AwsResponse::json(StatusCode::OK, "{}"))
611 }
612
613 fn describe_log_streams(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
614 let body = body_json(req);
615
616 let group_name = if let Some(name) = body["logGroupName"].as_str() {
618 name.to_string()
619 } else if let Some(identifier) = body["logGroupIdentifier"].as_str() {
620 if identifier.ends_with(":*") {
622 return Err(AwsServiceError::aws_error(
623 StatusCode::BAD_REQUEST,
624 "InvalidParameterException",
625 format!(
626 "1 validation error detected: Value '{}' at 'logGroupIdentifier' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\w#+=/:,.@-]*",
627 identifier
628 ),
629 ));
630 }
631 if identifier.starts_with("arn:aws:logs:") {
633 extract_log_group_from_arn(identifier).unwrap_or_else(|| identifier.to_string())
634 } else {
635 identifier.to_string()
636 }
637 } else {
638 return Err(AwsServiceError::aws_error(
639 StatusCode::BAD_REQUEST,
640 "InvalidParameterException",
641 "logGroupName is required",
642 ));
643 };
644
645 let prefix = body["logStreamNamePrefix"].as_str().unwrap_or("");
646 let limit = body["limit"].as_i64().unwrap_or(50) as usize;
647 let order_by = body["orderBy"].as_str().unwrap_or("LogStreamName");
648 let next_token = body["nextToken"].as_str();
649
650 validate_optional_string_length("logGroupName", body["logGroupName"].as_str(), 1, 512)?;
651 validate_optional_string_length(
652 "logGroupIdentifier",
653 body["logGroupIdentifier"].as_str(),
654 1,
655 2048,
656 )?;
657 validate_optional_string_length(
658 "logStreamNamePrefix",
659 body["logStreamNamePrefix"].as_str(),
660 1,
661 512,
662 )?;
663
664 if limit > 50 {
666 return Err(validation_error(
667 "limit",
668 &limit.to_string(),
669 "Member must have value less than or equal to 50",
670 ));
671 }
672
673 if order_by != "LogStreamName" && order_by != "LastEventTime" {
675 return Err(validation_error(
676 "orderBy",
677 order_by,
678 "Member must satisfy enum value set: [LogStreamName, LastEventTime]",
679 ));
680 }
681
682 if order_by == "LastEventTime" && !prefix.is_empty() {
684 return Err(AwsServiceError::aws_error(
685 StatusCode::BAD_REQUEST,
686 "InvalidParameterException",
687 "Cannot order by LastEventTime with a logStreamNamePrefix.",
688 ));
689 }
690
691 let state = self.state.read();
692 let group = state.log_groups.get(group_name.as_str()).ok_or_else(|| {
693 AwsServiceError::aws_error(
694 StatusCode::BAD_REQUEST,
695 "ResourceNotFoundException",
696 format!("The specified log group does not exist: {group_name}"),
697 )
698 })?;
699
700 let mut streams: Vec<&LogStream> = group
701 .log_streams
702 .values()
703 .filter(|s| prefix.is_empty() || s.name.starts_with(prefix))
704 .collect();
705 streams.sort_by(|a, b| a.name.cmp(&b.name));
706
707 let start_idx = if let Some(token) = next_token {
709 if let Some((_group, last_stream)) = token.split_once('@') {
710 streams
711 .iter()
712 .position(|s| s.name.as_str() > last_stream)
713 .unwrap_or(streams.len())
714 } else {
715 streams.len() }
717 } else {
718 0
719 };
720
721 let page = &streams[start_idx..];
722 let has_more = page.len() > limit;
723 let page = if has_more { &page[..limit] } else { page };
724
725 let log_streams: Vec<Value> = page
726 .iter()
727 .map(|s| {
728 let mut obj = json!({
729 "logStreamName": s.name,
730 "arn": s.arn,
731 "creationTime": s.creation_time,
732 "uploadSequenceToken": s.upload_sequence_token,
733 });
734 if let Some(ts) = s.first_event_timestamp {
735 obj["firstEventTimestamp"] = json!(ts);
736 }
737 if let Some(ts) = s.last_event_timestamp {
738 obj["lastEventTimestamp"] = json!(ts);
739 }
740 if let Some(ts) = s.last_ingestion_time {
741 obj["lastIngestionTime"] = json!(ts);
742 }
743 obj
744 })
745 .collect();
746
747 let mut result = json!({ "logStreams": log_streams });
748 if has_more {
749 if let Some(last) = page.last() {
750 result["nextToken"] = json!(format!("{}@{}", group_name, last.name));
751 }
752 }
753
754 Ok(AwsResponse::json(
755 StatusCode::OK,
756 serde_json::to_string(&result).unwrap(),
757 ))
758 }
759
760 fn put_log_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
763 let body = body_json(req);
764 let group_name = body["logGroupName"].as_str().ok_or_else(|| {
765 AwsServiceError::aws_error(
766 StatusCode::BAD_REQUEST,
767 "InvalidParameterException",
768 "logGroupName is required",
769 )
770 })?;
771 let stream_name = body["logStreamName"].as_str().ok_or_else(|| {
772 AwsServiceError::aws_error(
773 StatusCode::BAD_REQUEST,
774 "InvalidParameterException",
775 "logStreamName is required",
776 )
777 })?;
778
779 validate_string_length("logGroupName", group_name, 1, 512)?;
780 validate_string_length("logStreamName", stream_name, 1, 512)?;
781
782 let log_events = body["logEvents"].as_array().ok_or_else(|| {
783 AwsServiceError::aws_error(
784 StatusCode::BAD_REQUEST,
785 "InvalidParameterException",
786 "logEvents is required",
787 )
788 })?;
789
790 let now = Utc::now().timestamp_millis();
791
792 let timestamps: Vec<i64> = log_events
794 .iter()
795 .map(|e| e["timestamp"].as_i64().unwrap_or(now))
796 .collect();
797 for i in 1..timestamps.len() {
798 if timestamps[i] < timestamps[i - 1] {
799 return Err(AwsServiceError::aws_error(
800 StatusCode::BAD_REQUEST,
801 "InvalidParameterException",
802 "Log events in a single PutLogEvents request must be in chronological order.",
803 ));
804 }
805 }
806
807 let fourteen_days_ms = 14 * 24 * 60 * 60 * 1000i64;
809 let two_hours_ms = 2 * 60 * 60 * 1000i64;
810 let mut too_old_end_index: Option<usize> = None;
811 let mut too_new_start_index: Option<usize> = None;
812
813 for (i, ts) in timestamps.iter().enumerate() {
814 if now.saturating_sub(*ts) > fourteen_days_ms {
815 too_old_end_index = Some(i);
816 } else if ts.saturating_sub(now) > two_hours_ms && too_new_start_index.is_none() {
817 too_new_start_index = Some(i);
818 }
819 }
820
821 let mut new_events: Vec<LogEvent> = Vec::new();
823 let mut rejected_info = json!({});
824 let mut has_rejected = false;
825
826 for (i, e) in log_events.iter().enumerate() {
827 let ts = e["timestamp"].as_i64().unwrap_or(now);
828 let is_too_old = too_old_end_index.is_some() && i <= too_old_end_index.unwrap();
829 let is_too_new = too_new_start_index.is_some() && i >= too_new_start_index.unwrap();
830
831 if is_too_old || is_too_new {
832 continue;
833 }
834
835 new_events.push(LogEvent {
836 timestamp: ts,
837 message: e["message"].as_str().unwrap_or("").to_string(),
838 ingestion_time: now,
839 });
840 }
841
842 if let Some(idx) = too_old_end_index {
843 rejected_info["tooOldLogEventEndIndex"] = json!(idx);
844 has_rejected = true;
845 }
846 if let Some(idx) = too_new_start_index {
847 rejected_info["tooNewLogEventStartIndex"] = json!(idx);
848 has_rejected = true;
849 }
850
851 let mut state = self.state.write();
852 let group = state.log_groups.get_mut(group_name).ok_or_else(|| {
853 AwsServiceError::aws_error(
854 StatusCode::BAD_REQUEST,
855 "ResourceNotFoundException",
856 format!("The specified log group does not exist: {group_name}"),
857 )
858 })?;
859
860 let stream = group.log_streams.get_mut(stream_name).ok_or_else(|| {
861 AwsServiceError::aws_error(
862 StatusCode::BAD_REQUEST,
863 "ResourceNotFoundException",
864 "The specified log stream does not exist.",
865 )
866 })?;
867
868 for event in &new_events {
870 if stream.first_event_timestamp.is_none()
871 || Some(event.timestamp) < stream.first_event_timestamp
872 {
873 stream.first_event_timestamp = Some(event.timestamp);
874 }
875 if stream.last_event_timestamp.is_none()
876 || Some(event.timestamp) > stream.last_event_timestamp
877 {
878 stream.last_event_timestamp = Some(event.timestamp);
879 }
880 group.stored_bytes += event.message.len() as i64 + 26;
881 }
882 stream.last_ingestion_time = Some(now);
883
884 stream.upload_sequence_token = generate_sequence_token();
886
887 stream.events.append(&mut new_events);
888 stream.events.sort_by_key(|e| e.timestamp);
889
890 let mut response = json!({
891 "nextSequenceToken": stream.upload_sequence_token,
892 });
893 if has_rejected {
894 response["rejectedLogEventsInfo"] = rejected_info;
895 }
896
897 Ok(AwsResponse::json(
898 StatusCode::OK,
899 serde_json::to_string(&response).unwrap(),
900 ))
901 }
902
903 fn get_log_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
904 let body = body_json(req);
905
906 let group_name = if let Some(name) = body["logGroupName"].as_str() {
908 name.to_string()
909 } else if let Some(identifier) = body["logGroupIdentifier"].as_str() {
910 if identifier.ends_with(":*") {
911 return Err(AwsServiceError::aws_error(
912 StatusCode::BAD_REQUEST,
913 "InvalidParameterException",
914 format!(
915 "1 validation error detected: Value '{}' at 'logGroupIdentifier' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\w#+=/:,.@-]*",
916 identifier
917 ),
918 ));
919 }
920 if identifier.starts_with("arn:aws:logs:") {
921 extract_log_group_from_arn(identifier).unwrap_or_else(|| identifier.to_string())
922 } else {
923 identifier.to_string()
924 }
925 } else {
926 return Err(AwsServiceError::aws_error(
927 StatusCode::BAD_REQUEST,
928 "InvalidParameterException",
929 "logGroupName is required",
930 ));
931 };
932
933 let stream_name = body["logStreamName"].as_str().ok_or_else(|| {
934 AwsServiceError::aws_error(
935 StatusCode::BAD_REQUEST,
936 "InvalidParameterException",
937 "logStreamName is required",
938 )
939 })?;
940
941 validate_optional_string_length("logGroupName", body["logGroupName"].as_str(), 1, 512)?;
942 validate_optional_string_length(
943 "logGroupIdentifier",
944 body["logGroupIdentifier"].as_str(),
945 1,
946 2048,
947 )?;
948 validate_string_length("logStreamName", stream_name, 1, 512)?;
949
950 let start_time = body["startTime"].as_i64();
951 let end_time = body["endTime"].as_i64();
952 let limit = body["limit"].as_i64().unwrap_or(10000) as usize;
953 let start_from_head = body["startFromHead"].as_bool().unwrap_or(false);
954 let next_token = body["nextToken"].as_str();
955
956 if limit > 10000 {
958 return Err(validation_error(
959 "limit",
960 &limit.to_string(),
961 "Member must have value less than or equal to 10000",
962 ));
963 }
964
965 if let Some(token) = next_token {
967 if !token.starts_with("f/") && !token.starts_with("b/") {
968 return Err(AwsServiceError::aws_error(
969 StatusCode::BAD_REQUEST,
970 "InvalidParameterException",
971 "The specified nextToken is invalid.",
972 ));
973 }
974 let num_part = &token[2..];
975 if num_part.len() != 56 || num_part.parse::<u128>().is_err() {
976 return Err(AwsServiceError::aws_error(
977 StatusCode::BAD_REQUEST,
978 "InvalidParameterException",
979 "The specified nextToken is invalid.",
980 ));
981 }
982 }
983
984 let state = self.state.read();
985 let group = state.log_groups.get(group_name.as_str()).ok_or_else(|| {
986 AwsServiceError::aws_error(
987 StatusCode::BAD_REQUEST,
988 "ResourceNotFoundException",
989 format!("The specified log group does not exist: {group_name}"),
990 )
991 })?;
992
993 let stream = group.log_streams.get(stream_name).ok_or_else(|| {
994 AwsServiceError::aws_error(
995 StatusCode::BAD_REQUEST,
996 "ResourceNotFoundException",
997 format!("The specified log stream does not exist: {stream_name}"),
998 )
999 })?;
1000
1001 let all_events: Vec<&LogEvent> = stream
1003 .events
1004 .iter()
1005 .filter(|e| {
1006 if let Some(start) = start_time {
1007 if e.timestamp < start {
1008 return false;
1009 }
1010 }
1011 if let Some(end) = end_time {
1012 if e.timestamp >= end {
1013 return false;
1014 }
1015 }
1016 true
1017 })
1018 .collect();
1019
1020 let total = all_events.len();
1021
1022 let (start_idx, is_forward) = if let Some(token) = next_token {
1024 let is_forward = token.starts_with("f/");
1025 let idx: usize = token[2..].parse().unwrap_or(0);
1026 if is_forward {
1027 (idx + 1, true)
1029 } else {
1030 (idx, false)
1032 }
1033 } else {
1034 (0, start_from_head)
1035 };
1036
1037 let events_slice: Vec<&LogEvent>;
1038 let next_forward_idx: usize;
1039 let next_backward_idx: usize;
1040
1041 if is_forward || start_from_head && next_token.is_none() {
1042 let end_idx = std::cmp::min(start_idx + limit, total);
1044 if start_idx >= total {
1045 events_slice = Vec::new();
1046 let last_idx = if total > 0 { total - 1 } else { 0 };
1047 next_forward_idx = last_idx;
1048 next_backward_idx = last_idx;
1049 } else {
1050 events_slice = all_events[start_idx..end_idx].to_vec();
1051 next_forward_idx = end_idx - 1;
1052 next_backward_idx = start_idx;
1053 }
1054 } else {
1055 if next_token.is_some() {
1057 let begin = start_idx.saturating_sub(limit);
1059 let end_idx = start_idx;
1060 if begin >= total || end_idx > total || begin >= end_idx {
1061 events_slice = Vec::new();
1062 next_forward_idx = start_idx;
1063 next_backward_idx = start_idx;
1064 } else {
1065 events_slice = all_events[begin..end_idx].to_vec();
1066 next_forward_idx = end_idx - 1;
1067 next_backward_idx = begin;
1068 }
1069 } else {
1070 let begin = total.saturating_sub(limit);
1072 events_slice = all_events[begin..].to_vec();
1073 next_forward_idx = if total > 0 { total - 1 } else { 0 };
1074 next_backward_idx = begin;
1075 }
1076 }
1077
1078 let events_json: Vec<Value> = events_slice
1079 .iter()
1080 .map(|e| {
1081 json!({
1082 "timestamp": e.timestamp,
1083 "message": e.message,
1084 "ingestionTime": e.ingestion_time,
1085 })
1086 })
1087 .collect();
1088
1089 let forward_token = format!("f/{:056}", next_forward_idx);
1090 let backward_token = format!("b/{:056}", next_backward_idx);
1091
1092 Ok(AwsResponse::json(
1093 StatusCode::OK,
1094 serde_json::to_string(&json!({
1095 "events": events_json,
1096 "nextForwardToken": forward_token,
1097 "nextBackwardToken": backward_token,
1098 }))
1099 .unwrap(),
1100 ))
1101 }
1102
1103 fn filter_log_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1104 let body = body_json(req);
1105 let log_group_identifier = body["logGroupIdentifier"].as_str();
1106 let log_group_name = body["logGroupName"].as_str();
1107 let filter_pattern = body["filterPattern"].as_str().unwrap_or("");
1108 let start_time = body["startTime"].as_i64();
1109 let end_time = body["endTime"].as_i64();
1110 let limit = body["limit"].as_i64().unwrap_or(10000) as usize;
1111 let next_token = body["nextToken"].as_str();
1112 let stream_names: Vec<&str> = body["logStreamNames"]
1113 .as_array()
1114 .map(|a| a.iter().filter_map(|v| v.as_str()).collect())
1115 .unwrap_or_default();
1116 let stream_name_prefix = body["logStreamNamePrefix"].as_str().unwrap_or("");
1117
1118 if let Some(name) = log_group_name {
1119 validate_string_length("logGroupName", name, 1, 512)?;
1120 }
1121 validate_optional_string_length("logGroupIdentifier", log_group_identifier, 1, 2048)?;
1122 validate_optional_string_length(
1123 "logStreamNamePrefix",
1124 body["logStreamNamePrefix"].as_str(),
1125 1,
1126 512,
1127 )?;
1128 validate_optional_string_length("filterPattern", Some(filter_pattern), 0, 1024)?;
1129
1130 let resolved_group_name = if let Some(identifier) = log_group_identifier {
1133 if identifier.starts_with("arn:") {
1134 extract_log_group_from_arn(identifier).ok_or_else(|| {
1135 AwsServiceError::aws_error(
1136 StatusCode::BAD_REQUEST,
1137 "InvalidParameterException",
1138 format!("Invalid ARN: {identifier}"),
1139 )
1140 })?
1141 } else {
1142 identifier.to_string()
1143 }
1144 } else if let Some(name) = log_group_name {
1145 name.to_string()
1146 } else {
1147 return Err(AwsServiceError::aws_error(
1148 StatusCode::BAD_REQUEST,
1149 "InvalidParameterException",
1150 "Either logGroupName or logGroupIdentifier is required",
1151 ));
1152 };
1153
1154 if limit > 10000 {
1156 return Err(validation_error(
1157 "limit",
1158 &limit.to_string(),
1159 "Member must have value less than or equal to 10000",
1160 ));
1161 }
1162
1163 let state = self.state.read();
1164 let group = state
1165 .log_groups
1166 .get(resolved_group_name.as_str())
1167 .ok_or_else(|| {
1168 AwsServiceError::aws_error(
1169 StatusCode::BAD_REQUEST,
1170 "ResourceNotFoundException",
1171 format!("The specified log group does not exist: {resolved_group_name}"),
1172 )
1173 })?;
1174
1175 let mut filtered_events: Vec<Value> = Vec::new();
1176
1177 let streams: Vec<(&String, &LogStream)> = if !stream_names.is_empty() {
1178 group
1179 .log_streams
1180 .iter()
1181 .filter(|(name, _)| stream_names.contains(&name.as_str()))
1182 .collect()
1183 } else if !stream_name_prefix.is_empty() {
1184 group
1185 .log_streams
1186 .iter()
1187 .filter(|(name, _)| name.starts_with(stream_name_prefix))
1188 .collect()
1189 } else {
1190 group.log_streams.iter().collect()
1191 };
1192
1193 for (_, stream) in streams {
1194 for event in &stream.events {
1195 if let Some(start) = start_time {
1196 if event.timestamp < start {
1197 continue;
1198 }
1199 }
1200 if let Some(end) = end_time {
1201 if event.timestamp >= end {
1202 continue;
1203 }
1204 }
1205 if !filter_pattern.is_empty()
1207 && !matches_filter_pattern(filter_pattern, &event.message)
1208 {
1209 continue;
1210 }
1211
1212 let event_id = format!("{}-{}", stream.name, event.timestamp);
1213
1214 filtered_events.push(json!({
1215 "logStreamName": stream.name,
1216 "timestamp": event.timestamp,
1217 "message": event.message,
1218 "ingestionTime": event.ingestion_time,
1219 "eventId": event_id,
1220 }));
1221 }
1222 }
1223
1224 filtered_events.sort_by_key(|e| e["timestamp"].as_i64().unwrap_or(0));
1225
1226 let start_idx = if let Some(token) = next_token {
1229 let parts: Vec<&str> = token.splitn(3, '@').collect();
1230 if parts.len() == 3 {
1231 let after_event_id = parts[2];
1232 filtered_events
1234 .iter()
1235 .position(|e| e["eventId"].as_str().unwrap_or("") == after_event_id)
1236 .map(|pos| pos + 1)
1237 .unwrap_or(filtered_events.len())
1238 } else {
1239 filtered_events.len() }
1241 } else {
1242 0
1243 };
1244
1245 let remaining = &filtered_events[start_idx..];
1246 let has_more = remaining.len() > limit;
1247 let page: Vec<Value> = if has_more {
1248 remaining[..limit].to_vec()
1249 } else {
1250 remaining.to_vec()
1251 };
1252
1253 let mut result = json!({
1254 "events": page,
1255 "searchedLogStreams": [],
1256 });
1257
1258 if has_more {
1259 if let Some(last) = page.last() {
1260 let event_id = last["eventId"].as_str().unwrap_or("");
1261 result["nextToken"] = json!(format!(
1262 "{}@{}@{}",
1263 resolved_group_name,
1264 last["logStreamName"].as_str().unwrap_or(""),
1265 event_id
1266 ));
1267 }
1268 }
1269
1270 Ok(AwsResponse::json(
1271 StatusCode::OK,
1272 serde_json::to_string(&result).unwrap(),
1273 ))
1274 }
1275
1276 fn tag_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1279 let body = body_json(req);
1280 let name = body["logGroupName"].as_str().ok_or_else(|| {
1281 AwsServiceError::aws_error(
1282 StatusCode::BAD_REQUEST,
1283 "InvalidParameterException",
1284 "logGroupName is required",
1285 )
1286 })?;
1287
1288 validate_string_length("logGroupName", name, 1, 512)?;
1289
1290 let tags = body["tags"].as_object().ok_or_else(|| {
1291 AwsServiceError::aws_error(
1292 StatusCode::BAD_REQUEST,
1293 "InvalidParameterException",
1294 "tags is required",
1295 )
1296 })?;
1297
1298 let mut state = self.state.write();
1299 let group = state.log_groups.get_mut(name).ok_or_else(|| {
1300 AwsServiceError::aws_error(
1301 StatusCode::BAD_REQUEST,
1302 "ResourceNotFoundException",
1303 format!("The specified log group does not exist: {name}"),
1304 )
1305 })?;
1306
1307 for (k, v) in tags {
1308 group
1309 .tags
1310 .insert(k.clone(), v.as_str().unwrap_or("").to_string());
1311 }
1312
1313 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1314 }
1315
1316 fn untag_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1317 let body = body_json(req);
1318 let name = body["logGroupName"].as_str().ok_or_else(|| {
1319 AwsServiceError::aws_error(
1320 StatusCode::BAD_REQUEST,
1321 "InvalidParameterException",
1322 "logGroupName is required",
1323 )
1324 })?;
1325
1326 validate_string_length("logGroupName", name, 1, 512)?;
1327
1328 let keys = body["tags"].as_array().ok_or_else(|| {
1329 AwsServiceError::aws_error(
1330 StatusCode::BAD_REQUEST,
1331 "InvalidParameterException",
1332 "tags is required",
1333 )
1334 })?;
1335
1336 let mut state = self.state.write();
1337 let group = state.log_groups.get_mut(name).ok_or_else(|| {
1338 AwsServiceError::aws_error(
1339 StatusCode::BAD_REQUEST,
1340 "ResourceNotFoundException",
1341 format!("The specified log group does not exist: {name}"),
1342 )
1343 })?;
1344
1345 for key in keys {
1346 if let Some(k) = key.as_str() {
1347 group.tags.remove(k);
1348 }
1349 }
1350
1351 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1352 }
1353
1354 fn list_tags_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1355 let body = body_json(req);
1356 let name = body["logGroupName"].as_str().ok_or_else(|| {
1357 AwsServiceError::aws_error(
1358 StatusCode::BAD_REQUEST,
1359 "InvalidParameterException",
1360 "logGroupName is required",
1361 )
1362 })?;
1363
1364 validate_string_length("logGroupName", name, 1, 512)?;
1365
1366 let state = self.state.read();
1367 let group = state.log_groups.get(name).ok_or_else(|| {
1368 AwsServiceError::aws_error(
1369 StatusCode::BAD_REQUEST,
1370 "ResourceNotFoundException",
1371 format!("The specified log group does not exist: {name}"),
1372 )
1373 })?;
1374
1375 Ok(AwsResponse::json(
1376 StatusCode::OK,
1377 serde_json::to_string(&json!({ "tags": group.tags })).unwrap(),
1378 ))
1379 }
1380
1381 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1384 let body = body_json(req);
1385 let arn = body["resourceArn"].as_str().ok_or_else(|| {
1386 AwsServiceError::aws_error(
1387 StatusCode::BAD_REQUEST,
1388 "InvalidParameterException",
1389 "resourceArn is required",
1390 )
1391 })?;
1392
1393 validate_string_length("resourceArn", arn, 1, 1011)?;
1394
1395 let tags = body["tags"].as_object().ok_or_else(|| {
1396 AwsServiceError::aws_error(
1397 StatusCode::BAD_REQUEST,
1398 "InvalidParameterException",
1399 "tags is required",
1400 )
1401 })?;
1402
1403 let new_tags: std::collections::HashMap<String, String> = tags
1404 .iter()
1405 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
1406 .collect();
1407
1408 let mut state = self.state.write();
1409
1410 if let Some(group) = state
1412 .log_groups
1413 .values_mut()
1414 .find(|g| g.arn == arn || g.arn.trim_end_matches(":*") == arn)
1415 {
1416 for (k, v) in new_tags {
1417 group.tags.insert(k, v);
1418 }
1419 return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1420 }
1421
1422 if let Some(dest) = state.destinations.values_mut().find(|d| d.arn == arn) {
1424 for (k, v) in new_tags {
1425 dest.tags.insert(k, v);
1426 }
1427 return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1428 }
1429
1430 Err(AwsServiceError::aws_error(
1431 StatusCode::BAD_REQUEST,
1432 "ResourceNotFoundException",
1433 format!("The specified resource does not exist: {arn}"),
1434 ))
1435 }
1436
1437 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1438 let body = body_json(req);
1439 let arn = body["resourceArn"].as_str().ok_or_else(|| {
1440 AwsServiceError::aws_error(
1441 StatusCode::BAD_REQUEST,
1442 "InvalidParameterException",
1443 "resourceArn is required",
1444 )
1445 })?;
1446
1447 validate_string_length("resourceArn", arn, 1, 1011)?;
1448
1449 let tag_keys = body["tagKeys"].as_array().ok_or_else(|| {
1450 AwsServiceError::aws_error(
1451 StatusCode::BAD_REQUEST,
1452 "InvalidParameterException",
1453 "tagKeys is required",
1454 )
1455 })?;
1456
1457 let keys: Vec<String> = tag_keys
1458 .iter()
1459 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1460 .collect();
1461
1462 let mut state = self.state.write();
1463
1464 if let Some(group) = state
1466 .log_groups
1467 .values_mut()
1468 .find(|g| g.arn == arn || g.arn.trim_end_matches(":*") == arn)
1469 {
1470 for k in &keys {
1471 group.tags.remove(k);
1472 }
1473 return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1474 }
1475
1476 if let Some(dest) = state.destinations.values_mut().find(|d| d.arn == arn) {
1478 for k in &keys {
1479 dest.tags.remove(k);
1480 }
1481 return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1482 }
1483
1484 Err(AwsServiceError::aws_error(
1485 StatusCode::BAD_REQUEST,
1486 "ResourceNotFoundException",
1487 format!("The specified resource does not exist: {arn}"),
1488 ))
1489 }
1490
1491 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1492 let body = body_json(req);
1493 let arn = body["resourceArn"].as_str().ok_or_else(|| {
1494 AwsServiceError::aws_error(
1495 StatusCode::BAD_REQUEST,
1496 "InvalidParameterException",
1497 "resourceArn is required",
1498 )
1499 })?;
1500
1501 validate_string_length("resourceArn", arn, 1, 1011)?;
1502
1503 let state = self.state.read();
1504
1505 if let Some(group) = state
1507 .log_groups
1508 .values()
1509 .find(|g| g.arn == arn || g.arn.trim_end_matches(":*") == arn)
1510 {
1511 return Ok(AwsResponse::json(
1512 StatusCode::OK,
1513 serde_json::to_string(&json!({ "tags": group.tags })).unwrap(),
1514 ));
1515 }
1516
1517 if let Some(dest) = state.destinations.values().find(|d| d.arn == arn) {
1519 return Ok(AwsResponse::json(
1520 StatusCode::OK,
1521 serde_json::to_string(&json!({ "tags": dest.tags })).unwrap(),
1522 ));
1523 }
1524
1525 Err(AwsServiceError::aws_error(
1526 StatusCode::BAD_REQUEST,
1527 "ResourceNotFoundException",
1528 format!("The specified resource does not exist: {arn}"),
1529 ))
1530 }
1531
1532 fn put_retention_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1535 let body = body_json(req);
1536 let name = body["logGroupName"].as_str().ok_or_else(|| {
1537 AwsServiceError::aws_error(
1538 StatusCode::BAD_REQUEST,
1539 "InvalidParameterException",
1540 "logGroupName is required",
1541 )
1542 })?;
1543
1544 validate_string_length("logGroupName", name, 1, 512)?;
1545
1546 let days = body["retentionInDays"].as_i64().ok_or_else(|| {
1547 AwsServiceError::aws_error(
1548 StatusCode::BAD_REQUEST,
1549 "InvalidParameterException",
1550 "retentionInDays is required",
1551 )
1552 })?;
1553
1554 let mut state = self.state.write();
1555 let group = state.log_groups.get_mut(name).ok_or_else(|| {
1556 AwsServiceError::aws_error(
1557 StatusCode::BAD_REQUEST,
1558 "ResourceNotFoundException",
1559 format!("The specified log group does not exist: {name}"),
1560 )
1561 })?;
1562
1563 group.retention_in_days = Some(days as i32);
1564
1565 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1566 }
1567
1568 fn delete_retention_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1569 let body = body_json(req);
1570 let name = body["logGroupName"].as_str().ok_or_else(|| {
1571 AwsServiceError::aws_error(
1572 StatusCode::BAD_REQUEST,
1573 "InvalidParameterException",
1574 "logGroupName is required",
1575 )
1576 })?;
1577
1578 validate_string_length("logGroupName", name, 1, 512)?;
1579
1580 let mut state = self.state.write();
1581 let group = state.log_groups.get_mut(name).ok_or_else(|| {
1582 AwsServiceError::aws_error(
1583 StatusCode::BAD_REQUEST,
1584 "ResourceNotFoundException",
1585 format!("The specified log group does not exist: {name}"),
1586 )
1587 })?;
1588
1589 group.retention_in_days = None;
1590
1591 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1592 }
1593
1594 fn put_subscription_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1597 let body = body_json(req);
1598 let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1599 AwsServiceError::aws_error(
1600 StatusCode::BAD_REQUEST,
1601 "InvalidParameterException",
1602 "logGroupName is required",
1603 )
1604 })?;
1605 let filter_name = body["filterName"]
1606 .as_str()
1607 .ok_or_else(|| {
1608 AwsServiceError::aws_error(
1609 StatusCode::BAD_REQUEST,
1610 "InvalidParameterException",
1611 "filterName is required",
1612 )
1613 })?
1614 .to_string();
1615 let filter_pattern = body["filterPattern"].as_str().unwrap_or("").to_string();
1616 let destination_arn = body["destinationArn"]
1617 .as_str()
1618 .ok_or_else(|| {
1619 AwsServiceError::aws_error(
1620 StatusCode::BAD_REQUEST,
1621 "InvalidParameterException",
1622 "destinationArn is required",
1623 )
1624 })?
1625 .to_string();
1626 let role_arn = body["roleArn"].as_str().map(|s| s.to_string());
1627 let distribution = body["distribution"]
1628 .as_str()
1629 .unwrap_or("ByLogStream")
1630 .to_string();
1631
1632 validate_string_length("logGroupName", log_group_name, 1, 512)?;
1633 validate_string_length("filterName", &filter_name, 1, 512)?;
1634 validate_optional_string_length("filterPattern", Some(&filter_pattern), 0, 1024)?;
1635
1636 let mut state = self.state.write();
1637 let group = state.log_groups.get_mut(log_group_name).ok_or_else(|| {
1638 AwsServiceError::aws_error(
1639 StatusCode::BAD_REQUEST,
1640 "ResourceNotFoundException",
1641 "The specified log group does not exist.",
1642 )
1643 })?;
1644
1645 if let Some(existing) = group
1647 .subscription_filters
1648 .iter_mut()
1649 .find(|f| f.filter_name == filter_name)
1650 {
1651 existing.filter_pattern = filter_pattern;
1652 existing.destination_arn = destination_arn;
1653 existing.role_arn = role_arn;
1654 existing.distribution = distribution;
1655 return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1656 }
1657
1658 if group.subscription_filters.len() >= 2 {
1660 return Err(AwsServiceError::aws_error(
1661 StatusCode::BAD_REQUEST,
1662 "LimitExceededException",
1663 "Resource limit exceeded.",
1664 ));
1665 }
1666
1667 let now = Utc::now().timestamp_millis();
1668 group.subscription_filters.push(SubscriptionFilter {
1669 filter_name,
1670 log_group_name: log_group_name.to_string(),
1671 filter_pattern,
1672 destination_arn,
1673 role_arn,
1674 distribution,
1675 creation_time: now,
1676 });
1677
1678 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1679 }
1680
1681 fn describe_subscription_filters(
1682 &self,
1683 req: &AwsRequest,
1684 ) -> Result<AwsResponse, AwsServiceError> {
1685 let body = body_json(req);
1686 let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1687 AwsServiceError::aws_error(
1688 StatusCode::BAD_REQUEST,
1689 "InvalidParameterException",
1690 "logGroupName is required",
1691 )
1692 })?;
1693
1694 validate_string_length("logGroupName", log_group_name, 1, 512)?;
1695 validate_optional_string_length(
1696 "filterNamePrefix",
1697 body["filterNamePrefix"].as_str(),
1698 1,
1699 512,
1700 )?;
1701
1702 let state = self.state.read();
1703 let group = state.log_groups.get(log_group_name).ok_or_else(|| {
1704 AwsServiceError::aws_error(
1705 StatusCode::BAD_REQUEST,
1706 "ResourceNotFoundException",
1707 "The specified log group does not exist.",
1708 )
1709 })?;
1710
1711 let filters: Vec<Value> = group
1712 .subscription_filters
1713 .iter()
1714 .map(|f| {
1715 let mut obj = json!({
1716 "filterName": f.filter_name,
1717 "logGroupName": f.log_group_name,
1718 "filterPattern": f.filter_pattern,
1719 "destinationArn": f.destination_arn,
1720 "distribution": f.distribution,
1721 "creationTime": f.creation_time,
1722 });
1723 if let Some(ref arn) = f.role_arn {
1724 obj["roleArn"] = json!(arn);
1725 }
1726 obj
1727 })
1728 .collect();
1729
1730 Ok(AwsResponse::json(
1731 StatusCode::OK,
1732 serde_json::to_string(&json!({ "subscriptionFilters": filters })).unwrap(),
1733 ))
1734 }
1735
1736 fn delete_subscription_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1737 let body = body_json(req);
1738 let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1739 AwsServiceError::aws_error(
1740 StatusCode::BAD_REQUEST,
1741 "InvalidParameterException",
1742 "logGroupName is required",
1743 )
1744 })?;
1745 let filter_name = body["filterName"].as_str().ok_or_else(|| {
1746 AwsServiceError::aws_error(
1747 StatusCode::BAD_REQUEST,
1748 "InvalidParameterException",
1749 "filterName is required",
1750 )
1751 })?;
1752
1753 validate_string_length("logGroupName", log_group_name, 1, 512)?;
1754 validate_string_length("filterName", filter_name, 1, 512)?;
1755
1756 let mut state = self.state.write();
1757 let group = state.log_groups.get_mut(log_group_name).ok_or_else(|| {
1758 AwsServiceError::aws_error(
1759 StatusCode::BAD_REQUEST,
1760 "ResourceNotFoundException",
1761 "The specified log group does not exist.",
1762 )
1763 })?;
1764
1765 let idx = group
1766 .subscription_filters
1767 .iter()
1768 .position(|f| f.filter_name == filter_name)
1769 .ok_or_else(|| {
1770 AwsServiceError::aws_error(
1771 StatusCode::BAD_REQUEST,
1772 "ResourceNotFoundException",
1773 "The specified subscription filter does not exist.",
1774 )
1775 })?;
1776
1777 group.subscription_filters.remove(idx);
1778
1779 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1780 }
1781
1782 fn put_metric_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1785 let body = body_json(req);
1786 let filter_name = body["filterName"]
1787 .as_str()
1788 .ok_or_else(|| {
1789 AwsServiceError::aws_error(
1790 StatusCode::BAD_REQUEST,
1791 "InvalidParameterException",
1792 "filterName is required",
1793 )
1794 })?
1795 .to_string();
1796 validate_required("filterPattern", &body["filterPattern"])?;
1797 let filter_pattern = body["filterPattern"].as_str().unwrap_or("").to_string();
1798 let log_group_name = body["logGroupName"]
1799 .as_str()
1800 .ok_or_else(|| {
1801 AwsServiceError::aws_error(
1802 StatusCode::BAD_REQUEST,
1803 "InvalidParameterException",
1804 "logGroupName is required",
1805 )
1806 })?
1807 .to_string();
1808
1809 validate_string_length("filterName", &filter_name, 1, 512)?;
1810 validate_string_length("logGroupName", &log_group_name, 1, 512)?;
1811 validate_optional_string_length("filterPattern", Some(&filter_pattern), 0, 1024)?;
1812 validate_optional_string_length(
1813 "fieldSelectionCriteria",
1814 body["fieldSelectionCriteria"].as_str(),
1815 0,
1816 2000,
1817 )?;
1818
1819 let transformations_json = body["metricTransformations"].as_array().ok_or_else(|| {
1820 AwsServiceError::aws_error(
1821 StatusCode::BAD_REQUEST,
1822 "InvalidParameterException",
1823 "metricTransformations is required",
1824 )
1825 })?;
1826
1827 if transformations_json.len() > 1 {
1829 return Err(validation_error(
1830 "metricTransformations",
1831 &format!("{}", transformations_json.len()),
1832 "Member must have length less than or equal to 1",
1833 ));
1834 }
1835
1836 let transformations: Vec<MetricTransformation> = transformations_json
1837 .iter()
1838 .map(|t| MetricTransformation {
1839 metric_name: t["metricName"].as_str().unwrap_or("").to_string(),
1840 metric_namespace: t["metricNamespace"].as_str().unwrap_or("").to_string(),
1841 metric_value: t["metricValue"].as_str().unwrap_or("").to_string(),
1842 default_value: t["defaultValue"].as_f64(),
1843 })
1844 .collect();
1845
1846 let now = Utc::now().timestamp_millis();
1847
1848 let mut state = self.state.write();
1849
1850 if let Some(existing) = state
1852 .metric_filters
1853 .iter_mut()
1854 .find(|f| f.filter_name == filter_name && f.log_group_name == log_group_name)
1855 {
1856 existing.filter_pattern = filter_pattern;
1857 existing.metric_transformations = transformations;
1858 } else {
1859 state.metric_filters.push(MetricFilter {
1860 filter_name,
1861 filter_pattern,
1862 log_group_name,
1863 metric_transformations: transformations,
1864 creation_time: now,
1865 });
1866 }
1867
1868 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1869 }
1870
1871 fn describe_metric_filters(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1872 let body = body_json(req);
1873 let filter_name_prefix = body["filterNamePrefix"].as_str();
1874 let log_group_name = body["logGroupName"].as_str();
1875 let metric_name = body["metricName"].as_str();
1876 let metric_namespace = body["metricNamespace"].as_str();
1877
1878 validate_optional_string_length("filterNamePrefix", filter_name_prefix, 1, 512)?;
1879 validate_optional_string_length("logGroupName", log_group_name, 1, 512)?;
1880 validate_optional_string_length("metricName", metric_name, 0, 255)?;
1881 validate_optional_string_length("metricNamespace", metric_namespace, 0, 255)?;
1882 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
1883 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
1884
1885 let state = self.state.read();
1886 let filters: Vec<Value> = state
1887 .metric_filters
1888 .iter()
1889 .filter(|f| {
1890 if let Some(prefix) = filter_name_prefix {
1891 if !f.filter_name.starts_with(prefix) {
1892 return false;
1893 }
1894 }
1895 if let Some(lg) = log_group_name {
1896 if f.log_group_name != lg {
1897 return false;
1898 }
1899 }
1900 if let Some(mn) = metric_name {
1901 if !f.metric_transformations.iter().any(|t| t.metric_name == mn) {
1902 return false;
1903 }
1904 }
1905 if let Some(ns) = metric_namespace {
1906 if !f
1907 .metric_transformations
1908 .iter()
1909 .any(|t| t.metric_namespace == ns)
1910 {
1911 return false;
1912 }
1913 }
1914 true
1915 })
1916 .map(|f| {
1917 let transformations: Vec<Value> = f
1918 .metric_transformations
1919 .iter()
1920 .map(|t| {
1921 let mut obj = json!({
1922 "metricName": t.metric_name,
1923 "metricNamespace": t.metric_namespace,
1924 "metricValue": t.metric_value,
1925 });
1926 if let Some(dv) = t.default_value {
1927 obj["defaultValue"] = json!(dv);
1928 }
1929 obj
1930 })
1931 .collect();
1932
1933 json!({
1934 "filterName": f.filter_name,
1935 "filterPattern": f.filter_pattern,
1936 "logGroupName": f.log_group_name,
1937 "metricTransformations": transformations,
1938 "creationTime": f.creation_time,
1939 })
1940 })
1941 .collect();
1942
1943 Ok(AwsResponse::json(
1944 StatusCode::OK,
1945 serde_json::to_string(&json!({ "metricFilters": filters })).unwrap(),
1946 ))
1947 }
1948
1949 fn delete_metric_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1950 let body = body_json(req);
1951 let filter_name = body["filterName"].as_str().ok_or_else(|| {
1952 AwsServiceError::aws_error(
1953 StatusCode::BAD_REQUEST,
1954 "InvalidParameterException",
1955 "filterName is required",
1956 )
1957 })?;
1958 let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1959 AwsServiceError::aws_error(
1960 StatusCode::BAD_REQUEST,
1961 "InvalidParameterException",
1962 "logGroupName is required",
1963 )
1964 })?;
1965
1966 validate_string_length("filterName", filter_name, 1, 512)?;
1967 validate_string_length("logGroupName", log_group_name, 1, 512)?;
1968
1969 let mut state = self.state.write();
1970 let idx = state
1971 .metric_filters
1972 .iter()
1973 .position(|f| f.filter_name == filter_name && f.log_group_name == log_group_name);
1974
1975 if let Some(i) = idx {
1976 state.metric_filters.remove(i);
1977 }
1978
1979 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1980 }
1981
1982 fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1985 let body = body_json(req);
1986 let policy_name = body["policyName"]
1987 .as_str()
1988 .ok_or_else(|| {
1989 AwsServiceError::aws_error(
1990 StatusCode::BAD_REQUEST,
1991 "InvalidParameterException",
1992 "policyName is required",
1993 )
1994 })?
1995 .to_string();
1996 let policy_document = body["policyDocument"]
1997 .as_str()
1998 .ok_or_else(|| {
1999 AwsServiceError::aws_error(
2000 StatusCode::BAD_REQUEST,
2001 "InvalidParameterException",
2002 "policyDocument is required",
2003 )
2004 })?
2005 .to_string();
2006
2007 let now = Utc::now().timestamp_millis();
2008
2009 let mut state = self.state.write();
2010
2011 if !state.resource_policies.contains_key(&policy_name)
2013 && state.resource_policies.len() >= 10
2014 {
2015 return Err(AwsServiceError::aws_error(
2016 StatusCode::BAD_REQUEST,
2017 "LimitExceededException",
2018 "Resource limit exceeded.",
2019 ));
2020 }
2021
2022 let policy = ResourcePolicy {
2023 policy_name: policy_name.clone(),
2024 policy_document: policy_document.clone(),
2025 last_updated_time: now,
2026 };
2027
2028 state.resource_policies.insert(policy_name.clone(), policy);
2029
2030 Ok(AwsResponse::json(
2031 StatusCode::OK,
2032 serde_json::to_string(&json!({
2033 "resourcePolicy": {
2034 "policyName": policy_name,
2035 "policyDocument": policy_document,
2036 "lastUpdatedTime": now,
2037 }
2038 }))
2039 .unwrap(),
2040 ))
2041 }
2042
2043 fn describe_resource_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2044 let body = body_json(req);
2045 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2046 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2047 validate_optional_enum_value(
2048 "policyScope",
2049 &body["policyScope"],
2050 &["ACCOUNT", "RESOURCE"],
2051 )?;
2052 let state = self.state.read();
2053
2054 let mut policies: Vec<Value> = state
2055 .resource_policies
2056 .values()
2057 .map(|p| {
2058 json!({
2059 "policyName": p.policy_name,
2060 "policyDocument": p.policy_document,
2061 "lastUpdatedTime": p.last_updated_time,
2062 })
2063 })
2064 .collect();
2065 policies.sort_by(|a, b| {
2066 a["policyName"]
2067 .as_str()
2068 .unwrap()
2069 .cmp(b["policyName"].as_str().unwrap())
2070 });
2071
2072 Ok(AwsResponse::json(
2073 StatusCode::OK,
2074 serde_json::to_string(&json!({ "resourcePolicies": policies })).unwrap(),
2075 ))
2076 }
2077
2078 fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2079 let body = body_json(req);
2080 let policy_name = body["policyName"].as_str().ok_or_else(|| {
2081 AwsServiceError::aws_error(
2082 StatusCode::BAD_REQUEST,
2083 "InvalidParameterException",
2084 "policyName is required",
2085 )
2086 })?;
2087
2088 let mut state = self.state.write();
2089 if state.resource_policies.remove(policy_name).is_none() {
2090 return Err(AwsServiceError::aws_error(
2091 StatusCode::BAD_REQUEST,
2092 "ResourceNotFoundException",
2093 format!("Policy with name [{policy_name}] does not exist"),
2094 ));
2095 }
2096
2097 Ok(AwsResponse::json(StatusCode::OK, "{}"))
2098 }
2099
2100 fn put_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2103 let body = body_json(req);
2104 let destination_name = body["destinationName"]
2105 .as_str()
2106 .ok_or_else(|| {
2107 AwsServiceError::aws_error(
2108 StatusCode::BAD_REQUEST,
2109 "InvalidParameterException",
2110 "destinationName is required",
2111 )
2112 })?
2113 .to_string();
2114 let target_arn = body["targetArn"]
2115 .as_str()
2116 .ok_or_else(|| {
2117 AwsServiceError::aws_error(
2118 StatusCode::BAD_REQUEST,
2119 "InvalidParameterException",
2120 "targetArn is required",
2121 )
2122 })?
2123 .to_string();
2124 let role_arn = body["roleArn"]
2125 .as_str()
2126 .ok_or_else(|| {
2127 AwsServiceError::aws_error(
2128 StatusCode::BAD_REQUEST,
2129 "InvalidParameterException",
2130 "roleArn is required",
2131 )
2132 })?
2133 .to_string();
2134
2135 validate_string_length("destinationName", &destination_name, 1, 512)?;
2136 validate_string_length("targetArn", &target_arn, 1, 2048)?;
2137 validate_string_length("roleArn", &role_arn, 1, 2048)?;
2138
2139 let tags: std::collections::HashMap<String, String> = body["tags"]
2140 .as_object()
2141 .map(|m| {
2142 m.iter()
2143 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
2144 .collect()
2145 })
2146 .unwrap_or_default();
2147
2148 let mut state = self.state.write();
2149 let arn = format!(
2150 "arn:aws:logs:{}:{}:destination:{}",
2151 state.region, state.account_id, destination_name
2152 );
2153 let now = Utc::now().timestamp_millis();
2154
2155 let access_policy = state
2157 .destinations
2158 .get(&destination_name)
2159 .and_then(|d| d.access_policy.clone());
2160
2161 let dest = Destination {
2162 destination_name: destination_name.clone(),
2163 target_arn: target_arn.clone(),
2164 role_arn: role_arn.clone(),
2165 arn: arn.clone(),
2166 access_policy,
2167 creation_time: now,
2168 tags: tags.clone(),
2169 };
2170
2171 state.destinations.insert(destination_name.clone(), dest);
2172
2173 let dest_json = json!({
2174 "destinationName": destination_name,
2175 "targetArn": target_arn,
2176 "roleArn": role_arn,
2177 "arn": arn,
2178 "creationTime": now,
2179 });
2180
2181 Ok(AwsResponse::json(
2182 StatusCode::OK,
2183 serde_json::to_string(&json!({ "destination": dest_json })).unwrap(),
2184 ))
2185 }
2186
2187 fn describe_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2188 let body = body_json(req);
2189 let prefix = body["DestinationNamePrefix"].as_str().unwrap_or("");
2190
2191 validate_optional_string_length(
2192 "DestinationNamePrefix",
2193 body["DestinationNamePrefix"].as_str(),
2194 1,
2195 512,
2196 )?;
2197 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2198 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2199
2200 let state = self.state.read();
2201 let destinations: Vec<Value> = state
2202 .destinations
2203 .values()
2204 .filter(|d| prefix.is_empty() || d.destination_name.starts_with(prefix))
2205 .map(|d| {
2206 let mut obj = json!({
2207 "destinationName": d.destination_name,
2208 "targetArn": d.target_arn,
2209 "roleArn": d.role_arn,
2210 "arn": d.arn,
2211 "creationTime": d.creation_time,
2212 });
2213 if let Some(ref policy) = d.access_policy {
2214 obj["accessPolicy"] = json!(policy);
2215 }
2216 obj
2217 })
2218 .collect();
2219
2220 Ok(AwsResponse::json(
2221 StatusCode::OK,
2222 serde_json::to_string(&json!({ "destinations": destinations })).unwrap(),
2223 ))
2224 }
2225
2226 fn delete_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2227 let body = body_json(req);
2228 let name = body["destinationName"].as_str().ok_or_else(|| {
2229 AwsServiceError::aws_error(
2230 StatusCode::BAD_REQUEST,
2231 "InvalidParameterException",
2232 "destinationName is required",
2233 )
2234 })?;
2235
2236 validate_string_length("destinationName", name, 1, 512)?;
2237
2238 let mut state = self.state.write();
2239 if state.destinations.remove(name).is_none() {
2240 return Err(AwsServiceError::aws_error(
2241 StatusCode::BAD_REQUEST,
2242 "ResourceNotFoundException",
2243 format!("The specified destination does not exist: {name}"),
2244 ));
2245 }
2246
2247 Ok(AwsResponse::json(StatusCode::OK, "{}"))
2248 }
2249
2250 fn put_destination_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2251 let body = body_json(req);
2252 let name = body["destinationName"].as_str().ok_or_else(|| {
2253 AwsServiceError::aws_error(
2254 StatusCode::BAD_REQUEST,
2255 "InvalidParameterException",
2256 "destinationName is required",
2257 )
2258 })?;
2259
2260 validate_string_length("destinationName", name, 1, 512)?;
2261
2262 let policy = body["accessPolicy"].as_str().ok_or_else(|| {
2263 AwsServiceError::aws_error(
2264 StatusCode::BAD_REQUEST,
2265 "InvalidParameterException",
2266 "accessPolicy is required",
2267 )
2268 })?;
2269
2270 validate_string_length("accessPolicy", policy, 1, 5120)?;
2271
2272 let mut state = self.state.write();
2273 let dest = state.destinations.get_mut(name).ok_or_else(|| {
2274 AwsServiceError::aws_error(
2275 StatusCode::BAD_REQUEST,
2276 "ResourceNotFoundException",
2277 format!("The specified destination does not exist: {name}"),
2278 )
2279 })?;
2280
2281 dest.access_policy = Some(policy.to_string());
2282
2283 Ok(AwsResponse::json(StatusCode::OK, "{}"))
2284 }
2285
2286 fn start_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2289 let body = body_json(req);
2290 let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
2291 AwsServiceError::aws_error(
2292 StatusCode::BAD_REQUEST,
2293 "InvalidParameterException",
2294 "logGroupName is required",
2295 )
2296 })?;
2297 let start_time = body["startTime"].as_i64().unwrap_or(0);
2298 let end_time = body["endTime"].as_i64().unwrap_or(0);
2299 let query_string = body["queryString"].as_str().unwrap_or("").to_string();
2300
2301 validate_string_length("logGroupName", log_group_name, 1, 512)?;
2302 validate_optional_string_length("queryString", Some(&query_string), 0, 10000)?;
2303
2304 let mut state = self.state.write();
2305
2306 if !state.log_groups.contains_key(log_group_name) {
2308 return Err(AwsServiceError::aws_error(
2309 StatusCode::BAD_REQUEST,
2310 "ResourceNotFoundException",
2311 "The specified log group does not exist.",
2312 ));
2313 }
2314
2315 let query_id = uuid::Uuid::new_v4().to_string();
2316 let now = Utc::now().timestamp_millis();
2317
2318 state.queries.insert(
2319 query_id.clone(),
2320 QueryInfo {
2321 query_id: query_id.clone(),
2322 log_group_name: log_group_name.to_string(),
2323 query_string,
2324 start_time,
2325 end_time,
2326 status: "Complete".to_string(),
2327 create_time: now,
2328 },
2329 );
2330
2331 Ok(AwsResponse::json(
2332 StatusCode::OK,
2333 serde_json::to_string(&json!({ "queryId": query_id })).unwrap(),
2334 ))
2335 }
2336
2337 fn get_query_results(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2338 let body = body_json(req);
2339 let query_id = body["queryId"].as_str().ok_or_else(|| {
2340 AwsServiceError::aws_error(
2341 StatusCode::BAD_REQUEST,
2342 "InvalidParameterException",
2343 "queryId is required",
2344 )
2345 })?;
2346
2347 validate_string_length("queryId", query_id, 1, 256)?;
2348
2349 let state = self.state.read();
2350 let query = state.queries.get(query_id).ok_or_else(|| {
2351 AwsServiceError::aws_error(
2352 StatusCode::BAD_REQUEST,
2353 "ResourceNotFoundException",
2354 "The specified query does not exist.",
2355 )
2356 })?;
2357
2358 let mut results: Vec<Value> = Vec::new();
2360 if let Some(group) = state.log_groups.get(&query.log_group_name) {
2361 for stream in group.log_streams.values() {
2362 for event in &stream.events {
2363 let event_time_secs = event.timestamp / 1000;
2365 if event_time_secs >= query.start_time && event_time_secs < query.end_time {
2366 results.push(json!([
2367 {"field": "@message", "value": event.message},
2368 {"field": "@ptr", "value": format!("{}/{}", stream.name, event.timestamp)},
2369 ]));
2370 }
2371 }
2372 }
2373 }
2374
2375 results.sort_by(|a, b| {
2377 let a_msg = a[0]["value"].as_str().unwrap_or("");
2378 let b_msg = b[0]["value"].as_str().unwrap_or("");
2379 a_msg.cmp(b_msg)
2380 });
2381
2382 Ok(AwsResponse::json(
2383 StatusCode::OK,
2384 serde_json::to_string(&json!({
2385 "status": query.status,
2386 "results": results,
2387 "statistics": {
2388 "recordsMatched": results.len() as f64,
2389 "recordsScanned": results.len() as f64,
2390 "bytesScanned": 0.0,
2391 },
2392 }))
2393 .unwrap(),
2394 ))
2395 }
2396
2397 fn describe_queries(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2398 let body = body_json(req);
2399 let log_group_name = body["logGroupName"].as_str();
2400 let status_filter = body["status"].as_str();
2401
2402 validate_optional_string_length("logGroupName", log_group_name, 1, 512)?;
2403 validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 1000)?;
2404 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2405 validate_optional_enum_value(
2406 "status",
2407 &body["status"],
2408 &[
2409 "Scheduled",
2410 "Running",
2411 "Complete",
2412 "Failed",
2413 "Cancelled",
2414 "Timeout",
2415 "Unknown",
2416 ],
2417 )?;
2418 validate_optional_enum_value(
2419 "queryLanguage",
2420 &body["queryLanguage"],
2421 &["CWLI", "SQL", "PPL"],
2422 )?;
2423
2424 let state = self.state.read();
2425 let queries: Vec<Value> = state
2426 .queries
2427 .values()
2428 .filter(|q| {
2429 if let Some(lg) = log_group_name {
2430 if q.log_group_name != lg {
2431 return false;
2432 }
2433 }
2434 if let Some(status) = status_filter {
2435 if q.status != status {
2436 return false;
2437 }
2438 }
2439 true
2440 })
2441 .map(|q| {
2442 json!({
2443 "queryId": q.query_id,
2444 "queryString": q.query_string,
2445 "status": q.status,
2446 "createTime": q.create_time,
2447 "logGroupName": q.log_group_name,
2448 })
2449 })
2450 .collect();
2451
2452 Ok(AwsResponse::json(
2453 StatusCode::OK,
2454 serde_json::to_string(&json!({ "queries": queries })).unwrap(),
2455 ))
2456 }
2457
2458 fn create_export_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2461 let body = body_json(req);
2462 let log_group_name = body["logGroupName"]
2463 .as_str()
2464 .ok_or_else(|| {
2465 AwsServiceError::aws_error(
2466 StatusCode::BAD_REQUEST,
2467 "InvalidParameterException",
2468 "logGroupName is required",
2469 )
2470 })?
2471 .to_string();
2472 let from_time = body["from"].as_i64().unwrap_or(0);
2473 let to_time = body["to"].as_i64().unwrap_or(0);
2474 let destination = body["destination"]
2475 .as_str()
2476 .ok_or_else(|| {
2477 AwsServiceError::aws_error(
2478 StatusCode::BAD_REQUEST,
2479 "InvalidParameterException",
2480 "destination is required",
2481 )
2482 })?
2483 .to_string();
2484 let destination_prefix = body["destinationPrefix"]
2485 .as_str()
2486 .unwrap_or("exportedlogs")
2487 .to_string();
2488
2489 validate_string_length("logGroupName", &log_group_name, 1, 512)?;
2490 validate_optional_string_length("taskName", body["taskName"].as_str(), 1, 512)?;
2491 validate_optional_string_length(
2492 "logStreamNamePrefix",
2493 body["logStreamNamePrefix"].as_str(),
2494 1,
2495 512,
2496 )?;
2497 validate_string_length("destination", &destination, 1, 512)?;
2498
2499 let state = self.state.read();
2500 if !state.log_groups.contains_key(&log_group_name) {
2501 return Err(AwsServiceError::aws_error(
2502 StatusCode::BAD_REQUEST,
2503 "ResourceNotFoundException",
2504 "The specified log group does not exist.",
2505 ));
2506 }
2507 drop(state);
2508
2509 let task_name = body["taskName"].as_str().map(|s| s.to_string());
2510 let log_stream_name_prefix = body["logStreamNamePrefix"].as_str().map(|s| s.to_string());
2511
2512 let task_id = uuid::Uuid::new_v4().to_string();
2513 let (status_code, status_message) = if from_time < to_time {
2514 (
2515 "COMPLETED".to_string(),
2516 "Completed successfully".to_string(),
2517 )
2518 } else {
2519 ("active".to_string(), "Task is active".to_string())
2520 };
2521
2522 let mut state = self.state.write();
2523 state.export_tasks.push(ExportTask {
2524 task_id: task_id.clone(),
2525 task_name,
2526 log_group_name,
2527 log_stream_name_prefix,
2528 from_time,
2529 to_time,
2530 destination,
2531 destination_prefix,
2532 status_code,
2533 status_message,
2534 });
2535
2536 Ok(AwsResponse::json(
2537 StatusCode::OK,
2538 serde_json::to_string(&json!({ "taskId": task_id })).unwrap(),
2539 ))
2540 }
2541
2542 fn describe_export_tasks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2543 let body = body_json(req);
2544 let task_id_filter = body["taskId"].as_str();
2545
2546 validate_optional_string_length("taskId", task_id_filter, 1, 512)?;
2547 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2548 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2549 validate_optional_enum_value(
2550 "statusCode",
2551 &body["statusCode"],
2552 &[
2553 "CANCELLED",
2554 "COMPLETED",
2555 "FAILED",
2556 "PENDING",
2557 "PENDING_CANCEL",
2558 "RUNNING",
2559 ],
2560 )?;
2561
2562 let state = self.state.read();
2563
2564 if let Some(task_id) = task_id_filter {
2565 let task = state.export_tasks.iter().find(|t| t.task_id == task_id);
2566 if task.is_none() {
2567 return Err(AwsServiceError::aws_error(
2568 StatusCode::BAD_REQUEST,
2569 "ResourceNotFoundException",
2570 "The specified export task does not exist.",
2571 ));
2572 }
2573 }
2574
2575 let tasks: Vec<Value> = state
2576 .export_tasks
2577 .iter()
2578 .filter(|t| {
2579 if let Some(tid) = task_id_filter {
2580 t.task_id == tid
2581 } else {
2582 true
2583 }
2584 })
2585 .map(|t| {
2586 let mut obj = json!({
2587 "taskId": t.task_id,
2588 "logGroupName": t.log_group_name,
2589 "from": t.from_time,
2590 "to": t.to_time,
2591 "destination": t.destination,
2592 "destinationPrefix": t.destination_prefix,
2593 "status": {
2594 "code": t.status_code,
2595 "message": t.status_message,
2596 },
2597 });
2598 if let Some(ref name) = t.task_name {
2599 obj["taskName"] = json!(name);
2600 }
2601 if let Some(ref prefix) = t.log_stream_name_prefix {
2602 obj["logStreamNamePrefix"] = json!(prefix);
2603 }
2604 obj
2605 })
2606 .collect();
2607
2608 Ok(AwsResponse::json(
2609 StatusCode::OK,
2610 serde_json::to_string(&json!({ "exportTasks": tasks })).unwrap(),
2611 ))
2612 }
2613
2614 fn cancel_export_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2615 let body = body_json(req);
2616 let task_id = body["taskId"].as_str().ok_or_else(|| {
2617 AwsServiceError::aws_error(
2618 StatusCode::BAD_REQUEST,
2619 "InvalidParameterException",
2620 "taskId is required",
2621 )
2622 })?;
2623
2624 validate_string_length("taskId", task_id, 1, 512)?;
2625
2626 let mut state = self.state.write();
2627 let task = state
2628 .export_tasks
2629 .iter_mut()
2630 .find(|t| t.task_id == task_id)
2631 .ok_or_else(|| {
2632 AwsServiceError::aws_error(
2633 StatusCode::BAD_REQUEST,
2634 "ResourceNotFoundException",
2635 "The specified export task does not exist.",
2636 )
2637 })?;
2638
2639 task.status_code = "CANCELLED".to_string();
2640 task.status_message = "Task was cancelled".to_string();
2641
2642 Ok(AwsResponse::json(StatusCode::OK, "{}"))
2643 }
2644
2645 fn put_delivery_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2648 let body = body_json(req);
2649 let name = body["name"]
2650 .as_str()
2651 .ok_or_else(|| {
2652 AwsServiceError::aws_error(
2653 StatusCode::BAD_REQUEST,
2654 "InvalidParameterException",
2655 "name is required",
2656 )
2657 })?
2658 .to_string();
2659
2660 validate_string_length("name", &name, 1, 60)?;
2661
2662 validate_optional_enum_value(
2663 "deliveryDestinationType",
2664 &body["deliveryDestinationType"],
2665 &["S3", "CWL", "FH", "XRAY"],
2666 )?;
2667
2668 let output_format = body["outputFormat"].as_str().map(|s| s.to_string());
2669
2670 if let Some(ref fmt) = output_format {
2672 let valid = ["json", "plain", "w3c", "raw", "parquet"];
2673 if !valid.contains(&fmt.as_str()) {
2674 return Err(AwsServiceError::aws_error(
2675 StatusCode::BAD_REQUEST,
2676 "ValidationException",
2677 format!("1 validation error detected: Value '{fmt}' at 'outputFormat' failed to satisfy constraint: Member must satisfy enum value set: [json, plain, w3c, raw, parquet]"),
2678 ));
2679 }
2680 }
2681
2682 let config: std::collections::HashMap<String, String> = body
2683 ["deliveryDestinationConfiguration"]
2684 .as_object()
2685 .map(|m| {
2686 m.iter()
2687 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
2688 .collect()
2689 })
2690 .unwrap_or_default();
2691
2692 let tags: std::collections::HashMap<String, String> = body["tags"]
2693 .as_object()
2694 .map(|m| {
2695 m.iter()
2696 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
2697 .collect()
2698 })
2699 .unwrap_or_default();
2700
2701 let mut state = self.state.write();
2702
2703 if let Some(existing) = state.delivery_destinations.get(&name) {
2705 if let Some(ref new_fmt) = output_format {
2706 if let Some(ref existing_fmt) = existing.output_format {
2707 if new_fmt != existing_fmt {
2708 return Err(AwsServiceError::aws_error(
2709 StatusCode::BAD_REQUEST,
2710 "ValidationException",
2711 "Cannot update outputFormat for an existing delivery destination.",
2712 ));
2713 }
2714 }
2715 }
2716 }
2717
2718 let arn = format!(
2719 "arn:aws:logs:{}:{}:delivery-destination:{}",
2720 state.region, state.account_id, name
2721 );
2722
2723 let existing_policy = state
2724 .delivery_destinations
2725 .get(&name)
2726 .and_then(|d| d.delivery_destination_policy.clone());
2727
2728 let dd = DeliveryDestination {
2729 name: name.clone(),
2730 arn: arn.clone(),
2731 output_format: output_format.clone(),
2732 delivery_destination_configuration: config.clone(),
2733 tags: tags.clone(),
2734 delivery_destination_policy: existing_policy,
2735 };
2736
2737 state.delivery_destinations.insert(name.clone(), dd);
2738
2739 let config_resp = {
2742 let mut c: serde_json::Map<String, Value> =
2743 config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
2744 c.entry("destinationResourceArn".to_string())
2745 .or_insert_with(|| json!(""));
2746 Value::Object(c)
2747 };
2748
2749 let mut resp = json!({
2750 "deliveryDestination": {
2751 "name": name,
2752 "arn": arn,
2753 "deliveryDestinationConfiguration": config_resp,
2754 }
2755 });
2756 if let Some(ref fmt) = output_format {
2757 resp["deliveryDestination"]["outputFormat"] = json!(fmt);
2758 }
2759 if !tags.is_empty() {
2760 resp["deliveryDestination"]["tags"] = json!(tags);
2761 }
2762
2763 Ok(AwsResponse::json(
2764 StatusCode::OK,
2765 serde_json::to_string(&resp).unwrap(),
2766 ))
2767 }
2768
2769 fn get_delivery_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2770 let body = body_json(req);
2771 let name = body["name"].as_str().ok_or_else(|| {
2772 AwsServiceError::aws_error(
2773 StatusCode::BAD_REQUEST,
2774 "InvalidParameterException",
2775 "name is required",
2776 )
2777 })?;
2778
2779 validate_string_length("name", name, 1, 60)?;
2780
2781 let state = self.state.read();
2782 let dd = state.delivery_destinations.get(name).ok_or_else(|| {
2783 AwsServiceError::aws_error(
2784 StatusCode::BAD_REQUEST,
2785 "ResourceNotFoundException",
2786 format!("Delivery destination '{name}' does not exist."),
2787 )
2788 })?;
2789
2790 let mut obj = json!({
2791 "name": dd.name,
2792 "arn": dd.arn,
2793 "deliveryDestinationConfiguration": dd_config_json(&dd.delivery_destination_configuration),
2794 });
2795 if let Some(ref fmt) = dd.output_format {
2796 obj["outputFormat"] = json!(fmt);
2797 }
2798
2799 Ok(AwsResponse::json(
2800 StatusCode::OK,
2801 serde_json::to_string(&json!({ "deliveryDestination": obj })).unwrap(),
2802 ))
2803 }
2804
2805 fn describe_delivery_destinations(
2806 &self,
2807 req: &AwsRequest,
2808 ) -> Result<AwsResponse, AwsServiceError> {
2809 let body = body_json(req);
2810 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2811 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2812
2813 let state = self.state.read();
2814 let dds: Vec<Value> = state
2815 .delivery_destinations
2816 .values()
2817 .map(|dd| {
2818 let mut obj = json!({
2819 "name": dd.name,
2820 "arn": dd.arn,
2821 "deliveryDestinationConfiguration": dd_config_json(&dd.delivery_destination_configuration),
2822 });
2823 if let Some(ref fmt) = dd.output_format {
2824 obj["outputFormat"] = json!(fmt);
2825 }
2826 obj
2827 })
2828 .collect();
2829
2830 Ok(AwsResponse::json(
2831 StatusCode::OK,
2832 serde_json::to_string(&json!({ "deliveryDestinations": dds })).unwrap(),
2833 ))
2834 }
2835
2836 fn delete_delivery_destination(
2837 &self,
2838 req: &AwsRequest,
2839 ) -> Result<AwsResponse, AwsServiceError> {
2840 let body = body_json(req);
2841 let name = body["name"].as_str().ok_or_else(|| {
2842 AwsServiceError::aws_error(
2843 StatusCode::BAD_REQUEST,
2844 "InvalidParameterException",
2845 "name is required",
2846 )
2847 })?;
2848
2849 validate_string_length("name", name, 1, 60)?;
2850
2851 let mut state = self.state.write();
2852 if state.delivery_destinations.remove(name).is_none() {
2853 return Err(AwsServiceError::aws_error(
2854 StatusCode::BAD_REQUEST,
2855 "ResourceNotFoundException",
2856 format!("Delivery destination '{name}' does not exist."),
2857 ));
2858 }
2859
2860 Ok(AwsResponse::json(StatusCode::OK, "{}"))
2861 }
2862
2863 fn put_delivery_destination_policy(
2864 &self,
2865 req: &AwsRequest,
2866 ) -> Result<AwsResponse, AwsServiceError> {
2867 let body = body_json(req);
2868 let name = body["deliveryDestinationName"].as_str().ok_or_else(|| {
2869 AwsServiceError::aws_error(
2870 StatusCode::BAD_REQUEST,
2871 "InvalidParameterException",
2872 "deliveryDestinationName is required",
2873 )
2874 })?;
2875 let policy = body["deliveryDestinationPolicy"]
2876 .as_str()
2877 .ok_or_else(|| {
2878 AwsServiceError::aws_error(
2879 StatusCode::BAD_REQUEST,
2880 "InvalidParameterException",
2881 "deliveryDestinationPolicy is required",
2882 )
2883 })?
2884 .to_string();
2885
2886 validate_string_length("deliveryDestinationName", name, 1, 60)?;
2887 validate_string_length("deliveryDestinationPolicy", &policy, 1, 51200)?;
2888
2889 let mut state = self.state.write();
2890 let dd = state.delivery_destinations.get_mut(name).ok_or_else(|| {
2891 AwsServiceError::aws_error(
2892 StatusCode::BAD_REQUEST,
2893 "ResourceNotFoundException",
2894 format!("Delivery destination '{name}' does not exist."),
2895 )
2896 })?;
2897
2898 dd.delivery_destination_policy = Some(policy.clone());
2899
2900 Ok(AwsResponse::json(
2901 StatusCode::OK,
2902 serde_json::to_string(&json!({
2903 "policy": {
2904 "deliveryDestinationPolicy": policy,
2905 }
2906 }))
2907 .unwrap(),
2908 ))
2909 }
2910
2911 fn get_delivery_destination_policy(
2912 &self,
2913 req: &AwsRequest,
2914 ) -> Result<AwsResponse, AwsServiceError> {
2915 let body = body_json(req);
2916 let name = body["deliveryDestinationName"].as_str().ok_or_else(|| {
2917 AwsServiceError::aws_error(
2918 StatusCode::BAD_REQUEST,
2919 "InvalidParameterException",
2920 "deliveryDestinationName is required",
2921 )
2922 })?;
2923
2924 validate_string_length("deliveryDestinationName", name, 1, 60)?;
2925
2926 let state = self.state.read();
2927 let dd = state.delivery_destinations.get(name).ok_or_else(|| {
2928 AwsServiceError::aws_error(
2929 StatusCode::BAD_REQUEST,
2930 "ResourceNotFoundException",
2931 format!("Delivery destination '{name}' does not exist."),
2932 )
2933 })?;
2934
2935 let policy_json = if let Some(ref policy) = dd.delivery_destination_policy {
2936 json!({
2937 "deliveryDestinationPolicy": policy,
2938 })
2939 } else {
2940 json!({})
2941 };
2942
2943 Ok(AwsResponse::json(
2944 StatusCode::OK,
2945 serde_json::to_string(&json!({
2946 "policy": policy_json,
2947 }))
2948 .unwrap(),
2949 ))
2950 }
2951
2952 fn delete_delivery_destination_policy(
2953 &self,
2954 req: &AwsRequest,
2955 ) -> Result<AwsResponse, AwsServiceError> {
2956 let body = body_json(req);
2957 let name = body["deliveryDestinationName"].as_str().ok_or_else(|| {
2958 AwsServiceError::aws_error(
2959 StatusCode::BAD_REQUEST,
2960 "InvalidParameterException",
2961 "deliveryDestinationName is required",
2962 )
2963 })?;
2964
2965 validate_string_length("deliveryDestinationName", name, 1, 60)?;
2966
2967 let mut state = self.state.write();
2968 let dd = state.delivery_destinations.get_mut(name).ok_or_else(|| {
2969 AwsServiceError::aws_error(
2970 StatusCode::BAD_REQUEST,
2971 "ResourceNotFoundException",
2972 format!("Delivery destination '{name}' does not exist."),
2973 )
2974 })?;
2975
2976 dd.delivery_destination_policy = None;
2977
2978 Ok(AwsResponse::json(StatusCode::OK, "{}"))
2979 }
2980
2981 fn put_delivery_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2984 let body = body_json(req);
2985 let name = body["name"]
2986 .as_str()
2987 .ok_or_else(|| {
2988 AwsServiceError::aws_error(
2989 StatusCode::BAD_REQUEST,
2990 "InvalidParameterException",
2991 "name is required",
2992 )
2993 })?
2994 .to_string();
2995 let resource_arn = body["resourceArn"]
2996 .as_str()
2997 .ok_or_else(|| {
2998 AwsServiceError::aws_error(
2999 StatusCode::BAD_REQUEST,
3000 "InvalidParameterException",
3001 "resourceArn is required",
3002 )
3003 })?
3004 .to_string();
3005 let log_type = body["logType"]
3006 .as_str()
3007 .ok_or_else(|| {
3008 AwsServiceError::aws_error(
3009 StatusCode::BAD_REQUEST,
3010 "InvalidParameterException",
3011 "logType is required",
3012 )
3013 })?
3014 .to_string();
3015
3016 validate_string_length("name", &name, 1, 60)?;
3017 validate_string_length("logType", &log_type, 1, 255)?;
3018
3019 let tags: std::collections::HashMap<String, String> = body["tags"]
3020 .as_object()
3021 .map(|m| {
3022 m.iter()
3023 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
3024 .collect()
3025 })
3026 .unwrap_or_default();
3027
3028 let service = resource_arn
3030 .split(':')
3031 .nth(2)
3032 .unwrap_or("unknown")
3033 .to_string();
3034
3035 if !resource_arn.starts_with("arn:aws:") {
3037 return Err(AwsServiceError::aws_error(
3038 StatusCode::BAD_REQUEST,
3039 "ValidationException",
3040 format!("Invalid resource ARN: {resource_arn}"),
3041 ));
3042 }
3043
3044 if service == "s3" {
3046 return Err(AwsServiceError::aws_error(
3047 StatusCode::BAD_REQUEST,
3048 "ResourceNotFoundException",
3049 format!("The resource ARN '{resource_arn}' is not a valid delivery source."),
3050 ));
3051 }
3052
3053 let valid_log_types: &[&str] = match service.as_str() {
3055 "cloudfront" => &["ACCESS_LOGS"],
3056 _ => &["ACCESS_LOGS", "APPLICATION_LOGS", "FW_LOGS"],
3057 };
3058 if !valid_log_types.contains(&log_type.as_str()) {
3059 return Err(AwsServiceError::aws_error(
3060 StatusCode::BAD_REQUEST,
3061 "ValidationException",
3062 format!("Log type '{log_type}' is not valid for this resource."),
3063 ));
3064 }
3065
3066 let mut state = self.state.write();
3067
3068 if let Some(existing) = state.delivery_sources.get(&name) {
3070 if !existing.resource_arns.is_empty() && existing.resource_arns[0] != resource_arn {
3071 return Err(AwsServiceError::aws_error(
3072 StatusCode::BAD_REQUEST,
3073 "ConflictException",
3074 "Cannot update delivery source with a different resourceArn.",
3075 ));
3076 }
3077 }
3078
3079 let arn = format!(
3080 "arn:aws:logs:{}:{}:delivery-source:{}",
3081 state.region, state.account_id, name
3082 );
3083
3084 let ds = DeliverySource {
3085 name: name.clone(),
3086 arn: arn.clone(),
3087 resource_arns: vec![resource_arn],
3088 service: service.clone(),
3089 log_type: log_type.clone(),
3090 tags: tags.clone(),
3091 };
3092
3093 state.delivery_sources.insert(name.clone(), ds);
3094
3095 let state_ref = state.delivery_sources.get(&name).unwrap();
3096
3097 Ok(AwsResponse::json(
3098 StatusCode::OK,
3099 serde_json::to_string(&json!({
3100 "deliverySource": {
3101 "name": state_ref.name,
3102 "arn": state_ref.arn,
3103 "resourceArns": state_ref.resource_arns,
3104 "service": state_ref.service,
3105 "logType": state_ref.log_type,
3106 "tags": state_ref.tags,
3107 }
3108 }))
3109 .unwrap(),
3110 ))
3111 }
3112
3113 fn get_delivery_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3114 let body = body_json(req);
3115 let name = body["name"].as_str().ok_or_else(|| {
3116 AwsServiceError::aws_error(
3117 StatusCode::BAD_REQUEST,
3118 "InvalidParameterException",
3119 "name is required",
3120 )
3121 })?;
3122
3123 validate_string_length("name", name, 1, 60)?;
3124
3125 let state = self.state.read();
3126 let ds = state.delivery_sources.get(name).ok_or_else(|| {
3127 AwsServiceError::aws_error(
3128 StatusCode::BAD_REQUEST,
3129 "ResourceNotFoundException",
3130 format!("Delivery source '{name}' does not exist."),
3131 )
3132 })?;
3133
3134 Ok(AwsResponse::json(
3135 StatusCode::OK,
3136 serde_json::to_string(&json!({
3137 "deliverySource": {
3138 "name": ds.name,
3139 "arn": ds.arn,
3140 "resourceArns": ds.resource_arns,
3141 "service": ds.service,
3142 "logType": ds.log_type,
3143 "tags": ds.tags,
3144 }
3145 }))
3146 .unwrap(),
3147 ))
3148 }
3149
3150 fn describe_delivery_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3151 let body = body_json(req);
3152 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
3153 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
3154
3155 let state = self.state.read();
3156 let sources: Vec<Value> = state
3157 .delivery_sources
3158 .values()
3159 .map(|ds| {
3160 json!({
3161 "name": ds.name,
3162 "arn": ds.arn,
3163 "resourceArns": ds.resource_arns,
3164 "service": ds.service,
3165 "logType": ds.log_type,
3166 "tags": ds.tags,
3167 })
3168 })
3169 .collect();
3170
3171 Ok(AwsResponse::json(
3172 StatusCode::OK,
3173 serde_json::to_string(&json!({ "deliverySources": sources })).unwrap(),
3174 ))
3175 }
3176
3177 fn delete_delivery_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3178 let body = body_json(req);
3179 let name = body["name"].as_str().ok_or_else(|| {
3180 AwsServiceError::aws_error(
3181 StatusCode::BAD_REQUEST,
3182 "InvalidParameterException",
3183 "name is required",
3184 )
3185 })?;
3186
3187 validate_string_length("name", name, 1, 60)?;
3188
3189 let mut state = self.state.write();
3190 if state.delivery_sources.remove(name).is_none() {
3191 return Err(AwsServiceError::aws_error(
3192 StatusCode::BAD_REQUEST,
3193 "ResourceNotFoundException",
3194 format!("Delivery source '{name}' does not exist."),
3195 ));
3196 }
3197
3198 Ok(AwsResponse::json(StatusCode::OK, "{}"))
3199 }
3200
3201 fn create_delivery(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3204 let body = body_json(req);
3205 let delivery_source_name = body["deliverySourceName"]
3206 .as_str()
3207 .ok_or_else(|| {
3208 AwsServiceError::aws_error(
3209 StatusCode::BAD_REQUEST,
3210 "InvalidParameterException",
3211 "deliverySourceName is required",
3212 )
3213 })?
3214 .to_string();
3215 let delivery_destination_arn = body["deliveryDestinationArn"]
3216 .as_str()
3217 .ok_or_else(|| {
3218 AwsServiceError::aws_error(
3219 StatusCode::BAD_REQUEST,
3220 "InvalidParameterException",
3221 "deliveryDestinationArn is required",
3222 )
3223 })?
3224 .to_string();
3225
3226 validate_string_length("deliverySourceName", &delivery_source_name, 1, 60)?;
3227 validate_optional_string_length("fieldDelimiter", body["fieldDelimiter"].as_str(), 0, 5)?;
3228
3229 let tags: std::collections::HashMap<String, String> = body["tags"]
3230 .as_object()
3231 .map(|m| {
3232 m.iter()
3233 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
3234 .collect()
3235 })
3236 .unwrap_or_default();
3237
3238 let record_fields: Vec<String> = body["recordFields"]
3239 .as_array()
3240 .map(|a| {
3241 a.iter()
3242 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3243 .collect()
3244 })
3245 .unwrap_or_default();
3246 let field_delimiter = body["fieldDelimiter"].as_str().map(|s| s.to_string());
3247 let s3_delivery_config = body["s3DeliveryConfiguration"].clone();
3248
3249 let mut state = self.state.write();
3250
3251 if !state.delivery_sources.contains_key(&delivery_source_name) {
3253 return Err(AwsServiceError::aws_error(
3254 StatusCode::BAD_REQUEST,
3255 "ResourceNotFoundException",
3256 format!("Delivery source '{}' does not exist.", delivery_source_name),
3257 ));
3258 }
3259
3260 let dest_exists = state
3262 .delivery_destinations
3263 .values()
3264 .any(|dd| dd.arn == delivery_destination_arn);
3265 if !dest_exists {
3266 return Err(AwsServiceError::aws_error(
3267 StatusCode::BAD_REQUEST,
3268 "ResourceNotFoundException",
3269 format!(
3270 "Delivery destination '{}' does not exist.",
3271 delivery_destination_arn
3272 ),
3273 ));
3274 }
3275
3276 let already_exists = state.deliveries.values().any(|d| {
3278 d.delivery_source_name == delivery_source_name
3279 && d.delivery_destination_arn == delivery_destination_arn
3280 });
3281 if already_exists {
3282 return Err(AwsServiceError::aws_error(
3283 StatusCode::BAD_REQUEST,
3284 "ConflictException",
3285 "A delivery already exists for this source and destination.",
3286 ));
3287 }
3288
3289 let dest_type = if delivery_destination_arn.contains(":s3:") {
3291 "S3"
3292 } else if delivery_destination_arn.contains(":firehose:") {
3293 "FH"
3294 } else {
3295 "CWL"
3296 };
3297
3298 let delivery_id = uuid::Uuid::new_v4().to_string();
3299 let arn = format!(
3300 "arn:aws:logs:{}:{}:delivery:{}",
3301 state.region, state.account_id, delivery_id
3302 );
3303
3304 let delivery = Delivery {
3305 id: delivery_id.clone(),
3306 delivery_source_name: delivery_source_name.clone(),
3307 delivery_destination_arn: delivery_destination_arn.clone(),
3308 delivery_destination_type: dest_type.to_string(),
3309 arn: arn.clone(),
3310 tags: tags.clone(),
3311 };
3312
3313 state.deliveries.insert(delivery_id.clone(), delivery);
3314
3315 let mut delivery_json = json!({
3316 "id": delivery_id,
3317 "deliverySourceName": delivery_source_name,
3318 "deliveryDestinationArn": delivery_destination_arn,
3319 "deliveryDestinationType": dest_type,
3320 "arn": arn,
3321 "tags": tags,
3322 });
3323 if !record_fields.is_empty() {
3324 delivery_json["recordFields"] = json!(record_fields);
3325 }
3326 if let Some(ref delim) = field_delimiter {
3327 delivery_json["fieldDelimiter"] = json!(delim);
3328 }
3329 if !s3_delivery_config.is_null() {
3330 delivery_json["s3DeliveryConfiguration"] = s3_delivery_config;
3331 }
3332
3333 Ok(AwsResponse::json(
3334 StatusCode::OK,
3335 serde_json::to_string(&json!({
3336 "delivery": delivery_json,
3337 }))
3338 .unwrap(),
3339 ))
3340 }
3341
3342 fn get_delivery(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3343 let body = body_json(req);
3344 let delivery_id = body["id"].as_str().ok_or_else(|| {
3345 AwsServiceError::aws_error(
3346 StatusCode::BAD_REQUEST,
3347 "InvalidParameterException",
3348 "id is required",
3349 )
3350 })?;
3351
3352 validate_string_length("id", delivery_id, 1, 64)?;
3353
3354 let state = self.state.read();
3355 let d = state.deliveries.get(delivery_id).ok_or_else(|| {
3356 AwsServiceError::aws_error(
3357 StatusCode::BAD_REQUEST,
3358 "ResourceNotFoundException",
3359 format!("Delivery '{delivery_id}' does not exist."),
3360 )
3361 })?;
3362
3363 Ok(AwsResponse::json(
3364 StatusCode::OK,
3365 serde_json::to_string(&json!({
3366 "delivery": {
3367 "id": d.id,
3368 "deliverySourceName": d.delivery_source_name,
3369 "deliveryDestinationArn": d.delivery_destination_arn,
3370 "deliveryDestinationType": d.delivery_destination_type,
3371 "arn": d.arn,
3372 "tags": d.tags,
3373 }
3374 }))
3375 .unwrap(),
3376 ))
3377 }
3378
3379 fn describe_deliveries(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3380 let body = body_json(req);
3381 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
3382 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
3383
3384 let state = self.state.read();
3385 let deliveries: Vec<Value> = state
3386 .deliveries
3387 .values()
3388 .map(|d| {
3389 json!({
3390 "id": d.id,
3391 "deliverySourceName": d.delivery_source_name,
3392 "deliveryDestinationArn": d.delivery_destination_arn,
3393 "deliveryDestinationType": d.delivery_destination_type,
3394 "arn": d.arn,
3395 "tags": d.tags,
3396 })
3397 })
3398 .collect();
3399
3400 Ok(AwsResponse::json(
3401 StatusCode::OK,
3402 serde_json::to_string(&json!({ "deliveries": deliveries })).unwrap(),
3403 ))
3404 }
3405
3406 fn delete_delivery(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3407 let body = body_json(req);
3408 let delivery_id = body["id"].as_str().ok_or_else(|| {
3409 AwsServiceError::aws_error(
3410 StatusCode::BAD_REQUEST,
3411 "InvalidParameterException",
3412 "id is required",
3413 )
3414 })?;
3415
3416 validate_string_length("id", delivery_id, 1, 64)?;
3417
3418 let mut state = self.state.write();
3419 if state.deliveries.remove(delivery_id).is_none() {
3420 return Err(AwsServiceError::aws_error(
3421 StatusCode::BAD_REQUEST,
3422 "ResourceNotFoundException",
3423 format!("Delivery '{delivery_id}' does not exist."),
3424 ));
3425 }
3426
3427 Ok(AwsResponse::json(StatusCode::OK, "{}"))
3428 }
3429
3430 fn associate_kms_key(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3433 let body = body_json(req);
3434 let log_group_name = body["logGroupName"].as_str();
3435 let resource_identifier = body["resourceIdentifier"].as_str();
3436 let kms_key_id = body["kmsKeyId"]
3437 .as_str()
3438 .ok_or_else(|| {
3439 AwsServiceError::aws_error(
3440 StatusCode::BAD_REQUEST,
3441 "InvalidParameterException",
3442 "kmsKeyId is required",
3443 )
3444 })?
3445 .to_string();
3446
3447 if let Some(name) = log_group_name {
3448 validate_string_length("logGroupName", name, 1, 512)?;
3449 }
3450 validate_string_length("kmsKeyId", &kms_key_id, 1, 256)?;
3451 validate_optional_string_length("resourceIdentifier", resource_identifier, 1, 2048)?;
3452
3453 let resolved_name = resolve_log_group_name(log_group_name, resource_identifier)?;
3454
3455 let mut state = self.state.write();
3456 let group = state
3457 .log_groups
3458 .get_mut(resolved_name.as_str())
3459 .ok_or_else(|| {
3460 AwsServiceError::aws_error(
3461 StatusCode::BAD_REQUEST,
3462 "ResourceNotFoundException",
3463 format!("The specified log group does not exist: {resolved_name}"),
3464 )
3465 })?;
3466
3467 group.kms_key_id = Some(kms_key_id);
3468
3469 Ok(AwsResponse::json(StatusCode::OK, "{}"))
3470 }
3471
3472 fn disassociate_kms_key(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3473 let body = body_json(req);
3474 let log_group_name = body["logGroupName"].as_str();
3475 let resource_identifier = body["resourceIdentifier"].as_str();
3476
3477 if let Some(name) = log_group_name {
3478 validate_string_length("logGroupName", name, 1, 512)?;
3479 }
3480 validate_optional_string_length("resourceIdentifier", resource_identifier, 1, 2048)?;
3481
3482 let resolved_name = resolve_log_group_name(log_group_name, resource_identifier)?;
3483
3484 let mut state = self.state.write();
3485 let group = state
3486 .log_groups
3487 .get_mut(resolved_name.as_str())
3488 .ok_or_else(|| {
3489 AwsServiceError::aws_error(
3490 StatusCode::BAD_REQUEST,
3491 "ResourceNotFoundException",
3492 format!("The specified log group does not exist: {resolved_name}"),
3493 )
3494 })?;
3495
3496 group.kms_key_id = None;
3497
3498 Ok(AwsResponse::json(StatusCode::OK, "{}"))
3499 }
3500
3501 fn put_query_definition(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3504 let body = body_json(req);
3505 let name = body["name"]
3506 .as_str()
3507 .ok_or_else(|| {
3508 AwsServiceError::aws_error(
3509 StatusCode::BAD_REQUEST,
3510 "InvalidParameterException",
3511 "name is required",
3512 )
3513 })?
3514 .to_string();
3515 let query_string = body["queryString"]
3516 .as_str()
3517 .ok_or_else(|| {
3518 AwsServiceError::aws_error(
3519 StatusCode::BAD_REQUEST,
3520 "InvalidParameterException",
3521 "queryString is required",
3522 )
3523 })?
3524 .to_string();
3525 let log_group_names: Vec<String> = body["logGroupNames"]
3526 .as_array()
3527 .map(|a| {
3528 a.iter()
3529 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3530 .collect()
3531 })
3532 .unwrap_or_default();
3533
3534 let query_definition_id = body["queryDefinitionId"]
3535 .as_str()
3536 .map(|s| s.to_string())
3537 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
3538
3539 validate_string_length("name", &name, 1, 255)?;
3540 validate_string_length("queryString", &query_string, 1, 10000)?;
3541 validate_optional_string_length(
3542 "queryDefinitionId",
3543 body["queryDefinitionId"].as_str(),
3544 1,
3545 256,
3546 )?;
3547 validate_optional_string_length("clientToken", body["clientToken"].as_str(), 36, 128)?;
3548 validate_optional_enum_value(
3549 "queryLanguage",
3550 &body["queryLanguage"],
3551 &["CWLI", "SQL", "PPL"],
3552 )?;
3553
3554 let now = Utc::now().timestamp_millis();
3555
3556 let mut state = self.state.write();
3557 state.query_definitions.insert(
3558 query_definition_id.clone(),
3559 QueryDefinition {
3560 query_definition_id: query_definition_id.clone(),
3561 name,
3562 query_string,
3563 log_group_names,
3564 last_modified: now,
3565 },
3566 );
3567
3568 Ok(AwsResponse::json(
3569 StatusCode::OK,
3570 serde_json::to_string(&json!({
3571 "queryDefinitionId": query_definition_id,
3572 }))
3573 .unwrap(),
3574 ))
3575 }
3576
3577 fn describe_query_definitions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3578 let body = body_json(req);
3579 let name_prefix = body["queryDefinitionNamePrefix"].as_str().unwrap_or("");
3580 validate_optional_string_length(
3581 "queryDefinitionNamePrefix",
3582 body["queryDefinitionNamePrefix"].as_str(),
3583 1,
3584 255,
3585 )?;
3586 validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 1000)?;
3587 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
3588 validate_optional_enum_value(
3589 "queryLanguage",
3590 &body["queryLanguage"],
3591 &["CWLI", "SQL", "PPL"],
3592 )?;
3593
3594 let state = self.state.read();
3595 let defs: Vec<Value> = state
3596 .query_definitions
3597 .values()
3598 .filter(|qd| name_prefix.is_empty() || qd.name.starts_with(name_prefix))
3599 .map(|qd| {
3600 json!({
3601 "queryDefinitionId": qd.query_definition_id,
3602 "name": qd.name,
3603 "queryString": qd.query_string,
3604 "logGroupNames": qd.log_group_names,
3605 "lastModified": qd.last_modified,
3606 })
3607 })
3608 .collect();
3609
3610 Ok(AwsResponse::json(
3611 StatusCode::OK,
3612 serde_json::to_string(&json!({ "queryDefinitions": defs })).unwrap(),
3613 ))
3614 }
3615
3616 fn delete_query_definition(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3617 let body = body_json(req);
3618 let qd_id = body["queryDefinitionId"].as_str().ok_or_else(|| {
3619 AwsServiceError::aws_error(
3620 StatusCode::BAD_REQUEST,
3621 "InvalidParameterException",
3622 "queryDefinitionId is required",
3623 )
3624 })?;
3625
3626 validate_string_length("queryDefinitionId", qd_id, 1, 256)?;
3627
3628 let mut state = self.state.write();
3629 let success = state.query_definitions.remove(qd_id).is_some();
3630
3631 Ok(AwsResponse::json(
3632 StatusCode::OK,
3633 serde_json::to_string(&json!({ "success": success })).unwrap(),
3634 ))
3635 }
3636
3637 fn put_account_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3640 let body = body_json(req);
3641 let policy_name = body["policyName"].as_str().ok_or_else(|| {
3642 AwsServiceError::aws_error(
3643 StatusCode::BAD_REQUEST,
3644 "InvalidParameterException",
3645 "policyName is required",
3646 )
3647 })?;
3648 let policy_type = body["policyType"].as_str().ok_or_else(|| {
3649 AwsServiceError::aws_error(
3650 StatusCode::BAD_REQUEST,
3651 "InvalidParameterException",
3652 "policyType is required",
3653 )
3654 })?;
3655 let policy_document = body["policyDocument"].as_str().ok_or_else(|| {
3656 AwsServiceError::aws_error(
3657 StatusCode::BAD_REQUEST,
3658 "InvalidParameterException",
3659 "policyDocument is required",
3660 )
3661 })?;
3662
3663 let now = Utc::now().timestamp_millis();
3664 let mut state = self.state.write();
3665 let account_id = state.account_id.clone();
3666 let scope = body["scope"].as_str().map(|s| s.to_string());
3667 let selection_criteria = body["selectionCriteria"].as_str().map(|s| s.to_string());
3668
3669 let policy = AccountPolicy {
3670 policy_name: policy_name.to_string(),
3671 policy_type: policy_type.to_string(),
3672 policy_document: policy_document.to_string(),
3673 scope: scope.clone(),
3674 selection_criteria: selection_criteria.clone(),
3675 account_id: account_id.clone(),
3676 last_updated_time: now,
3677 };
3678
3679 let key = (policy_name.to_string(), policy_type.to_string());
3680 state.account_policies.insert(key, policy);
3681
3682 let mut result = json!({
3683 "accountPolicy": {
3684 "policyName": policy_name,
3685 "policyType": policy_type,
3686 "policyDocument": policy_document,
3687 "accountId": account_id,
3688 "lastUpdatedTime": now,
3689 }
3690 });
3691 if let Some(s) = scope {
3692 result["accountPolicy"]["scope"] = json!(s);
3693 }
3694 if let Some(s) = selection_criteria {
3695 result["accountPolicy"]["selectionCriteria"] = json!(s);
3696 }
3697
3698 Ok(AwsResponse::json(
3699 StatusCode::OK,
3700 serde_json::to_string(&result).unwrap(),
3701 ))
3702 }
3703
3704 fn describe_account_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3705 let body = body_json(req);
3706 let policy_type = body["policyType"].as_str().ok_or_else(|| {
3707 AwsServiceError::aws_error(
3708 StatusCode::BAD_REQUEST,
3709 "InvalidParameterException",
3710 "policyType is required",
3711 )
3712 })?;
3713 let policy_name = body["policyName"].as_str();
3714
3715 let state = self.state.read();
3716 let policies: Vec<Value> = state
3717 .account_policies
3718 .values()
3719 .filter(|p| {
3720 p.policy_type == policy_type && policy_name.is_none_or(|n| p.policy_name == n)
3721 })
3722 .map(|p| {
3723 let mut obj = json!({
3724 "policyName": p.policy_name,
3725 "policyType": p.policy_type,
3726 "policyDocument": p.policy_document,
3727 "accountId": p.account_id,
3728 "lastUpdatedTime": p.last_updated_time,
3729 });
3730 if let Some(ref s) = p.scope {
3731 obj["scope"] = json!(s);
3732 }
3733 if let Some(ref s) = p.selection_criteria {
3734 obj["selectionCriteria"] = json!(s);
3735 }
3736 obj
3737 })
3738 .collect();
3739
3740 Ok(AwsResponse::json(
3741 StatusCode::OK,
3742 serde_json::to_string(&json!({ "accountPolicies": policies })).unwrap(),
3743 ))
3744 }
3745
3746 fn delete_account_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3747 let body = body_json(req);
3748 let policy_name = body["policyName"].as_str().ok_or_else(|| {
3749 AwsServiceError::aws_error(
3750 StatusCode::BAD_REQUEST,
3751 "InvalidParameterException",
3752 "policyName is required",
3753 )
3754 })?;
3755 let policy_type = body["policyType"].as_str().ok_or_else(|| {
3756 AwsServiceError::aws_error(
3757 StatusCode::BAD_REQUEST,
3758 "InvalidParameterException",
3759 "policyType is required",
3760 )
3761 })?;
3762
3763 let key = (policy_name.to_string(), policy_type.to_string());
3764 let mut state = self.state.write();
3765 if state.account_policies.remove(&key).is_none() {
3766 return Err(AwsServiceError::aws_error(
3767 StatusCode::BAD_REQUEST,
3768 "ResourceNotFoundException",
3769 format!("Account policy {policy_name} of type {policy_type} not found"),
3770 ));
3771 }
3772
3773 Ok(AwsResponse::json(StatusCode::OK, "{}"))
3774 }
3775
3776 fn put_data_protection_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3779 let body = body_json(req);
3780 let log_group_id = body["logGroupIdentifier"]
3781 .as_str()
3782 .ok_or_else(|| {
3783 AwsServiceError::aws_error(
3784 StatusCode::BAD_REQUEST,
3785 "InvalidParameterException",
3786 "logGroupIdentifier is required",
3787 )
3788 })?
3789 .to_string();
3790 let policy_document = body["policyDocument"]
3791 .as_str()
3792 .ok_or_else(|| {
3793 AwsServiceError::aws_error(
3794 StatusCode::BAD_REQUEST,
3795 "InvalidParameterException",
3796 "policyDocument is required",
3797 )
3798 })?
3799 .to_string();
3800
3801 let group_name = if log_group_id.starts_with("arn:") {
3802 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3803 AwsServiceError::aws_error(
3804 StatusCode::BAD_REQUEST,
3805 "InvalidParameterException",
3806 format!("Invalid ARN: {log_group_id}"),
3807 )
3808 })?
3809 } else {
3810 log_group_id.clone()
3811 };
3812
3813 let now = Utc::now().timestamp_millis();
3814 let mut state = self.state.write();
3815 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
3816 AwsServiceError::aws_error(
3817 StatusCode::BAD_REQUEST,
3818 "ResourceNotFoundException",
3819 format!("The specified log group does not exist: {group_name}"),
3820 )
3821 })?;
3822 let log_group_id_resp = group.arn.clone();
3823
3824 group.data_protection_policy = Some(DataProtectionPolicy {
3825 policy_document: policy_document.clone(),
3826 last_updated_time: now,
3827 });
3828
3829 Ok(AwsResponse::json(
3830 StatusCode::OK,
3831 serde_json::to_string(&json!({
3832 "logGroupIdentifier": log_group_id_resp,
3833 "policyDocument": policy_document,
3834 "lastUpdatedTime": now,
3835 }))
3836 .unwrap(),
3837 ))
3838 }
3839
3840 fn get_data_protection_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3841 let body = body_json(req);
3842 let log_group_id = body["logGroupIdentifier"]
3843 .as_str()
3844 .ok_or_else(|| {
3845 AwsServiceError::aws_error(
3846 StatusCode::BAD_REQUEST,
3847 "InvalidParameterException",
3848 "logGroupIdentifier is required",
3849 )
3850 })?
3851 .to_string();
3852
3853 let group_name = if log_group_id.starts_with("arn:") {
3854 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3855 AwsServiceError::aws_error(
3856 StatusCode::BAD_REQUEST,
3857 "InvalidParameterException",
3858 format!("Invalid ARN: {log_group_id}"),
3859 )
3860 })?
3861 } else {
3862 log_group_id.clone()
3863 };
3864
3865 let state = self.state.read();
3866 let group = state.log_groups.get(&group_name).ok_or_else(|| {
3867 AwsServiceError::aws_error(
3868 StatusCode::BAD_REQUEST,
3869 "ResourceNotFoundException",
3870 format!("The specified log group does not exist: {group_name}"),
3871 )
3872 })?;
3873
3874 let mut result = json!({
3875 "logGroupIdentifier": group.arn,
3876 });
3877 if let Some(ref dp) = group.data_protection_policy {
3878 result["policyDocument"] = json!(dp.policy_document);
3879 result["lastUpdatedTime"] = json!(dp.last_updated_time);
3880 }
3881
3882 Ok(AwsResponse::json(
3883 StatusCode::OK,
3884 serde_json::to_string(&result).unwrap(),
3885 ))
3886 }
3887
3888 fn delete_data_protection_policy(
3889 &self,
3890 req: &AwsRequest,
3891 ) -> Result<AwsResponse, AwsServiceError> {
3892 let body = body_json(req);
3893 let log_group_id = body["logGroupIdentifier"]
3894 .as_str()
3895 .ok_or_else(|| {
3896 AwsServiceError::aws_error(
3897 StatusCode::BAD_REQUEST,
3898 "InvalidParameterException",
3899 "logGroupIdentifier is required",
3900 )
3901 })?
3902 .to_string();
3903
3904 let group_name = if log_group_id.starts_with("arn:") {
3905 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3906 AwsServiceError::aws_error(
3907 StatusCode::BAD_REQUEST,
3908 "InvalidParameterException",
3909 format!("Invalid ARN: {log_group_id}"),
3910 )
3911 })?
3912 } else {
3913 log_group_id
3914 };
3915
3916 let mut state = self.state.write();
3917 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
3918 AwsServiceError::aws_error(
3919 StatusCode::BAD_REQUEST,
3920 "ResourceNotFoundException",
3921 format!("The specified log group does not exist: {group_name}"),
3922 )
3923 })?;
3924
3925 if group.data_protection_policy.is_none() {
3926 return Err(AwsServiceError::aws_error(
3927 StatusCode::BAD_REQUEST,
3928 "ResourceNotFoundException",
3929 "No data protection policy found for this log group",
3930 ));
3931 }
3932
3933 group.data_protection_policy = None;
3934 Ok(AwsResponse::json(StatusCode::OK, "{}"))
3935 }
3936
3937 fn put_index_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3940 let body = body_json(req);
3941 let log_group_id = body["logGroupIdentifier"]
3942 .as_str()
3943 .ok_or_else(|| {
3944 AwsServiceError::aws_error(
3945 StatusCode::BAD_REQUEST,
3946 "InvalidParameterException",
3947 "logGroupIdentifier is required",
3948 )
3949 })?
3950 .to_string();
3951 let policy_document = body["policyDocument"]
3952 .as_str()
3953 .ok_or_else(|| {
3954 AwsServiceError::aws_error(
3955 StatusCode::BAD_REQUEST,
3956 "InvalidParameterException",
3957 "policyDocument is required",
3958 )
3959 })?
3960 .to_string();
3961
3962 let group_name = if log_group_id.starts_with("arn:") {
3963 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3964 AwsServiceError::aws_error(
3965 StatusCode::BAD_REQUEST,
3966 "InvalidParameterException",
3967 format!("Invalid ARN: {log_group_id}"),
3968 )
3969 })?
3970 } else {
3971 log_group_id.clone()
3972 };
3973
3974 let policy_name = body["policyName"].as_str().unwrap_or("default").to_string();
3975
3976 let now = Utc::now().timestamp_millis();
3977 let mut state = self.state.write();
3978 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
3979 AwsServiceError::aws_error(
3980 StatusCode::BAD_REQUEST,
3981 "ResourceNotFoundException",
3982 format!("The specified log group does not exist: {group_name}"),
3983 )
3984 })?;
3985
3986 if let Some(existing) = group
3988 .index_policies
3989 .iter_mut()
3990 .find(|p| p.policy_name == policy_name)
3991 {
3992 existing.policy_document = policy_document.clone();
3993 existing.last_updated_time = now;
3994 } else {
3995 group.index_policies.push(IndexPolicy {
3996 policy_name: policy_name.clone(),
3997 policy_document: policy_document.clone(),
3998 last_updated_time: now,
3999 });
4000 }
4001
4002 let result = json!({
4003 "indexPolicy": {
4004 "policyName": policy_name,
4005 "policyDocument": policy_document,
4006 "logGroupIdentifier": group.arn,
4007 "lastUpdateTime": now,
4008 }
4009 });
4010
4011 Ok(AwsResponse::json(
4012 StatusCode::OK,
4013 serde_json::to_string(&result).unwrap(),
4014 ))
4015 }
4016
4017 fn describe_index_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4018 let body = body_json(req);
4019 let log_group_ids = body["logGroupIdentifiers"].as_array().ok_or_else(|| {
4020 AwsServiceError::aws_error(
4021 StatusCode::BAD_REQUEST,
4022 "InvalidParameterException",
4023 "logGroupIdentifiers is required",
4024 )
4025 })?;
4026
4027 let state = self.state.read();
4028 let mut policies = Vec::new();
4029
4030 for id_val in log_group_ids {
4031 let id = id_val.as_str().unwrap_or("");
4032 let group_name = if id.starts_with("arn:") {
4033 extract_log_group_from_arn(id).unwrap_or_default()
4034 } else {
4035 id.to_string()
4036 };
4037 if let Some(group) = state.log_groups.get(&group_name) {
4038 for p in &group.index_policies {
4039 policies.push(json!({
4040 "policyName": p.policy_name,
4041 "policyDocument": p.policy_document,
4042 "logGroupIdentifier": group.arn,
4043 "lastUpdateTime": p.last_updated_time,
4044 }));
4045 }
4046 }
4047 }
4048
4049 Ok(AwsResponse::json(
4050 StatusCode::OK,
4051 serde_json::to_string(&json!({ "indexPolicies": policies })).unwrap(),
4052 ))
4053 }
4054
4055 fn delete_index_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4056 let body = body_json(req);
4057 let log_group_id = body["logGroupIdentifier"]
4058 .as_str()
4059 .ok_or_else(|| {
4060 AwsServiceError::aws_error(
4061 StatusCode::BAD_REQUEST,
4062 "InvalidParameterException",
4063 "logGroupIdentifier is required",
4064 )
4065 })?
4066 .to_string();
4067
4068 let group_name = if log_group_id.starts_with("arn:") {
4069 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4070 AwsServiceError::aws_error(
4071 StatusCode::BAD_REQUEST,
4072 "InvalidParameterException",
4073 format!("Invalid ARN: {log_group_id}"),
4074 )
4075 })?
4076 } else {
4077 log_group_id
4078 };
4079
4080 let mut state = self.state.write();
4081 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4082 AwsServiceError::aws_error(
4083 StatusCode::BAD_REQUEST,
4084 "ResourceNotFoundException",
4085 format!("The specified log group does not exist: {group_name}"),
4086 )
4087 })?;
4088
4089 if group.index_policies.is_empty() {
4090 return Err(AwsServiceError::aws_error(
4091 StatusCode::BAD_REQUEST,
4092 "ResourceNotFoundException",
4093 "No index policy found for this log group",
4094 ));
4095 }
4096
4097 group.index_policies.clear();
4098 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4099 }
4100
4101 fn describe_field_indexes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4102 let body = body_json(req);
4103 let _log_group_ids = body["logGroupIdentifiers"].as_array().ok_or_else(|| {
4105 AwsServiceError::aws_error(
4106 StatusCode::BAD_REQUEST,
4107 "InvalidParameterException",
4108 "logGroupIdentifiers is required",
4109 )
4110 })?;
4111
4112 Ok(AwsResponse::json(
4114 StatusCode::OK,
4115 serde_json::to_string(&json!({ "fieldIndexes": [] })).unwrap(),
4116 ))
4117 }
4118
4119 fn put_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4122 let body = body_json(req);
4123 let log_group_id = body["logGroupIdentifier"]
4124 .as_str()
4125 .ok_or_else(|| {
4126 AwsServiceError::aws_error(
4127 StatusCode::BAD_REQUEST,
4128 "InvalidParameterException",
4129 "logGroupIdentifier is required",
4130 )
4131 })?
4132 .to_string();
4133 let transformer_config = body.get("transformerConfig").cloned().ok_or_else(|| {
4134 AwsServiceError::aws_error(
4135 StatusCode::BAD_REQUEST,
4136 "InvalidParameterException",
4137 "transformerConfig is required",
4138 )
4139 })?;
4140
4141 let group_name = if log_group_id.starts_with("arn:") {
4142 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4143 AwsServiceError::aws_error(
4144 StatusCode::BAD_REQUEST,
4145 "InvalidParameterException",
4146 format!("Invalid ARN: {log_group_id}"),
4147 )
4148 })?
4149 } else {
4150 log_group_id.clone()
4151 };
4152
4153 let now = Utc::now().timestamp_millis();
4154 let mut state = self.state.write();
4155 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4156 AwsServiceError::aws_error(
4157 StatusCode::BAD_REQUEST,
4158 "ResourceNotFoundException",
4159 format!("The specified log group does not exist: {group_name}"),
4160 )
4161 })?;
4162
4163 group.transformer = Some(Transformer {
4164 transformer_config,
4165 creation_time: now,
4166 last_modified_time: now,
4167 });
4168
4169 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4170 }
4171
4172 fn get_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4173 let body = body_json(req);
4174 let log_group_id = body["logGroupIdentifier"]
4175 .as_str()
4176 .ok_or_else(|| {
4177 AwsServiceError::aws_error(
4178 StatusCode::BAD_REQUEST,
4179 "InvalidParameterException",
4180 "logGroupIdentifier is required",
4181 )
4182 })?
4183 .to_string();
4184
4185 let group_name = if log_group_id.starts_with("arn:") {
4186 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4187 AwsServiceError::aws_error(
4188 StatusCode::BAD_REQUEST,
4189 "InvalidParameterException",
4190 format!("Invalid ARN: {log_group_id}"),
4191 )
4192 })?
4193 } else {
4194 log_group_id.clone()
4195 };
4196
4197 let state = self.state.read();
4198 let group = state.log_groups.get(&group_name).ok_or_else(|| {
4199 AwsServiceError::aws_error(
4200 StatusCode::BAD_REQUEST,
4201 "ResourceNotFoundException",
4202 format!("The specified log group does not exist: {group_name}"),
4203 )
4204 })?;
4205
4206 let mut result = json!({
4207 "logGroupIdentifier": group.arn,
4208 });
4209 if let Some(ref t) = group.transformer {
4210 result["transformerConfig"] = t.transformer_config.clone();
4211 result["creationTime"] = json!(t.creation_time);
4212 result["lastModifiedTime"] = json!(t.last_modified_time);
4213 }
4214
4215 Ok(AwsResponse::json(
4216 StatusCode::OK,
4217 serde_json::to_string(&result).unwrap(),
4218 ))
4219 }
4220
4221 fn delete_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4222 let body = body_json(req);
4223 let log_group_id = body["logGroupIdentifier"]
4224 .as_str()
4225 .ok_or_else(|| {
4226 AwsServiceError::aws_error(
4227 StatusCode::BAD_REQUEST,
4228 "InvalidParameterException",
4229 "logGroupIdentifier is required",
4230 )
4231 })?
4232 .to_string();
4233
4234 let group_name = if log_group_id.starts_with("arn:") {
4235 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4236 AwsServiceError::aws_error(
4237 StatusCode::BAD_REQUEST,
4238 "InvalidParameterException",
4239 format!("Invalid ARN: {log_group_id}"),
4240 )
4241 })?
4242 } else {
4243 log_group_id
4244 };
4245
4246 let mut state = self.state.write();
4247 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4248 AwsServiceError::aws_error(
4249 StatusCode::BAD_REQUEST,
4250 "ResourceNotFoundException",
4251 format!("The specified log group does not exist: {group_name}"),
4252 )
4253 })?;
4254
4255 group.transformer = None;
4256 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4257 }
4258
4259 fn test_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4260 let body = body_json(req);
4261 let _transformer_config = body.get("transformerConfig").ok_or_else(|| {
4262 AwsServiceError::aws_error(
4263 StatusCode::BAD_REQUEST,
4264 "InvalidParameterException",
4265 "transformerConfig is required",
4266 )
4267 })?;
4268 let log_event_messages = body["logEventMessages"].as_array().ok_or_else(|| {
4269 AwsServiceError::aws_error(
4270 StatusCode::BAD_REQUEST,
4271 "InvalidParameterException",
4272 "logEventMessages is required",
4273 )
4274 })?;
4275
4276 let transformed: Vec<Value> = log_event_messages
4278 .iter()
4279 .map(|msg| {
4280 json!({
4281 "eventMessage": msg,
4282 "transformedEventMessage": msg,
4283 })
4284 })
4285 .collect();
4286
4287 Ok(AwsResponse::json(
4288 StatusCode::OK,
4289 serde_json::to_string(&json!({
4290 "transformedLogs": transformed,
4291 }))
4292 .unwrap(),
4293 ))
4294 }
4295
4296 fn create_log_anomaly_detector(
4299 &self,
4300 req: &AwsRequest,
4301 ) -> Result<AwsResponse, AwsServiceError> {
4302 let body = body_json(req);
4303 let log_group_arn_list = body["logGroupArnList"]
4304 .as_array()
4305 .ok_or_else(|| {
4306 AwsServiceError::aws_error(
4307 StatusCode::BAD_REQUEST,
4308 "InvalidParameterException",
4309 "logGroupArnList is required",
4310 )
4311 })?
4312 .iter()
4313 .filter_map(|v| v.as_str().map(|s| s.to_string()))
4314 .collect::<Vec<_>>();
4315
4316 let detector_name = body["detectorName"].as_str().unwrap_or("").to_string();
4317 let evaluation_frequency = body["evaluationFrequency"].as_str().map(|s| s.to_string());
4318 let filter_pattern = body["filterPattern"].as_str().map(|s| s.to_string());
4319 let anomaly_visibility_time = body["anomalyVisibilityTime"].as_i64();
4320
4321 let now = Utc::now().timestamp_millis();
4322 let mut state = self.state.write();
4323 let detector_id = uuid::Uuid::new_v4().to_string();
4324 let arn = format!(
4325 "arn:aws:logs:{}:{}:anomaly-detector:{}",
4326 state.region, state.account_id, detector_id
4327 );
4328
4329 let detector = AnomalyDetector {
4330 detector_name: detector_name.clone(),
4331 arn: arn.clone(),
4332 log_group_arn_list,
4333 evaluation_frequency,
4334 filter_pattern,
4335 anomaly_visibility_time,
4336 creation_time: now,
4337 last_modified_time: now,
4338 enabled: true,
4339 };
4340
4341 state.anomaly_detectors.insert(arn.clone(), detector);
4342
4343 Ok(AwsResponse::json(
4344 StatusCode::OK,
4345 serde_json::to_string(&json!({ "anomalyDetectorArn": arn })).unwrap(),
4346 ))
4347 }
4348
4349 fn get_log_anomaly_detector(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4350 let body = body_json(req);
4351 let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
4352 AwsServiceError::aws_error(
4353 StatusCode::BAD_REQUEST,
4354 "InvalidParameterException",
4355 "anomalyDetectorArn is required",
4356 )
4357 })?;
4358
4359 let state = self.state.read();
4360 let detector = state.anomaly_detectors.get(arn).ok_or_else(|| {
4361 AwsServiceError::aws_error(
4362 StatusCode::BAD_REQUEST,
4363 "ResourceNotFoundException",
4364 format!("Anomaly detector not found: {arn}"),
4365 )
4366 })?;
4367
4368 let mut result = json!({
4369 "anomalyDetectorArn": detector.arn,
4370 "detectorName": detector.detector_name,
4371 "logGroupArnList": detector.log_group_arn_list,
4372 "creationTimeStamp": detector.creation_time,
4373 "lastModifiedTimeStamp": detector.last_modified_time,
4374 "anomalyDetectorStatus": if detector.enabled { "TRAINING" } else { "PAUSED" },
4375 });
4376 if let Some(ref f) = detector.evaluation_frequency {
4377 result["evaluationFrequency"] = json!(f);
4378 }
4379 if let Some(ref f) = detector.filter_pattern {
4380 result["filterPattern"] = json!(f);
4381 }
4382 if let Some(t) = detector.anomaly_visibility_time {
4383 result["anomalyVisibilityTime"] = json!(t);
4384 }
4385
4386 Ok(AwsResponse::json(
4387 StatusCode::OK,
4388 serde_json::to_string(&result).unwrap(),
4389 ))
4390 }
4391
4392 fn delete_log_anomaly_detector(
4393 &self,
4394 req: &AwsRequest,
4395 ) -> Result<AwsResponse, AwsServiceError> {
4396 let body = body_json(req);
4397 let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
4398 AwsServiceError::aws_error(
4399 StatusCode::BAD_REQUEST,
4400 "InvalidParameterException",
4401 "anomalyDetectorArn is required",
4402 )
4403 })?;
4404
4405 let mut state = self.state.write();
4406 if state.anomaly_detectors.remove(arn).is_none() {
4407 return Err(AwsServiceError::aws_error(
4408 StatusCode::BAD_REQUEST,
4409 "ResourceNotFoundException",
4410 format!("Anomaly detector not found: {arn}"),
4411 ));
4412 }
4413
4414 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4415 }
4416
4417 fn list_log_anomaly_detectors(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4418 let body = body_json(req);
4419 let filter_log_group_arn = body["filterLogGroupArn"].as_str();
4420 let _limit = body["limit"].as_i64().unwrap_or(50);
4421
4422 let state = self.state.read();
4423 let detectors: Vec<Value> = state
4424 .anomaly_detectors
4425 .values()
4426 .filter(|d| {
4427 filter_log_group_arn.is_none_or(|arn| d.log_group_arn_list.iter().any(|a| a == arn))
4428 })
4429 .map(|d| {
4430 let mut obj = json!({
4431 "anomalyDetectorArn": d.arn,
4432 "detectorName": d.detector_name,
4433 "logGroupArnList": d.log_group_arn_list,
4434 "creationTimeStamp": d.creation_time,
4435 "lastModifiedTimeStamp": d.last_modified_time,
4436 "anomalyDetectorStatus": if d.enabled { "TRAINING" } else { "PAUSED" },
4437 });
4438 if let Some(ref f) = d.evaluation_frequency {
4439 obj["evaluationFrequency"] = json!(f);
4440 }
4441 if let Some(ref f) = d.filter_pattern {
4442 obj["filterPattern"] = json!(f);
4443 }
4444 if let Some(t) = d.anomaly_visibility_time {
4445 obj["anomalyVisibilityTime"] = json!(t);
4446 }
4447 obj
4448 })
4449 .collect();
4450
4451 Ok(AwsResponse::json(
4452 StatusCode::OK,
4453 serde_json::to_string(&json!({ "anomalyDetectors": detectors })).unwrap(),
4454 ))
4455 }
4456
4457 fn update_log_anomaly_detector(
4458 &self,
4459 req: &AwsRequest,
4460 ) -> Result<AwsResponse, AwsServiceError> {
4461 let body = body_json(req);
4462 let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
4463 AwsServiceError::aws_error(
4464 StatusCode::BAD_REQUEST,
4465 "InvalidParameterException",
4466 "anomalyDetectorArn is required",
4467 )
4468 })?;
4469 let enabled = body["enabled"].as_bool().unwrap_or(true);
4470
4471 let mut state = self.state.write();
4472 let detector = state.anomaly_detectors.get_mut(arn).ok_or_else(|| {
4473 AwsServiceError::aws_error(
4474 StatusCode::BAD_REQUEST,
4475 "ResourceNotFoundException",
4476 format!("Anomaly detector not found: {arn}"),
4477 )
4478 })?;
4479
4480 detector.enabled = enabled;
4481 if let Some(f) = body["evaluationFrequency"].as_str() {
4482 detector.evaluation_frequency = Some(f.to_string());
4483 }
4484 if let Some(f) = body["filterPattern"].as_str() {
4485 detector.filter_pattern = Some(f.to_string());
4486 }
4487 if let Some(t) = body["anomalyVisibilityTime"].as_i64() {
4488 detector.anomaly_visibility_time = Some(t);
4489 }
4490 detector.last_modified_time = Utc::now().timestamp_millis();
4491
4492 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4493 }
4494
4495 fn get_log_group_fields(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4498 let body = body_json(req);
4499 let log_group_id = body["logGroupName"]
4500 .as_str()
4501 .or_else(|| body["logGroupIdentifier"].as_str())
4502 .ok_or_else(|| {
4503 AwsServiceError::aws_error(
4504 StatusCode::BAD_REQUEST,
4505 "InvalidParameterException",
4506 "logGroupName or logGroupIdentifier is required",
4507 )
4508 })?;
4509
4510 let group_name = if log_group_id.starts_with("arn:") {
4511 extract_log_group_from_arn(log_group_id).ok_or_else(|| {
4512 AwsServiceError::aws_error(
4513 StatusCode::BAD_REQUEST,
4514 "InvalidParameterException",
4515 format!("Invalid ARN: {log_group_id}"),
4516 )
4517 })?
4518 } else {
4519 log_group_id.to_string()
4520 };
4521
4522 let state = self.state.read();
4523 if !state.log_groups.contains_key(&group_name) {
4524 return Err(AwsServiceError::aws_error(
4525 StatusCode::BAD_REQUEST,
4526 "ResourceNotFoundException",
4527 format!("The specified log group does not exist: {group_name}"),
4528 ));
4529 }
4530
4531 let fields = json!([
4533 { "fieldName": "@timestamp", "percent": 100 },
4534 { "fieldName": "@message", "percent": 100 },
4535 ]);
4536
4537 Ok(AwsResponse::json(
4538 StatusCode::OK,
4539 serde_json::to_string(&json!({ "logGroupFields": fields })).unwrap(),
4540 ))
4541 }
4542
4543 fn test_metric_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4544 let body = body_json(req);
4545 let filter_pattern = body["filterPattern"].as_str().ok_or_else(|| {
4546 AwsServiceError::aws_error(
4547 StatusCode::BAD_REQUEST,
4548 "InvalidParameterException",
4549 "filterPattern is required",
4550 )
4551 })?;
4552 let log_event_messages = body["logEventMessages"].as_array().ok_or_else(|| {
4553 AwsServiceError::aws_error(
4554 StatusCode::BAD_REQUEST,
4555 "InvalidParameterException",
4556 "logEventMessages is required",
4557 )
4558 })?;
4559
4560 let matches: Vec<Value> = log_event_messages
4561 .iter()
4562 .enumerate()
4563 .filter(|(_, msg)| {
4564 let msg_str = msg.as_str().unwrap_or("");
4565 matches_filter_pattern(filter_pattern, msg_str)
4566 })
4567 .map(|(i, msg)| {
4568 json!({
4569 "eventNumber": i + 1,
4570 "eventMessage": msg,
4571 "extractedValues": {},
4572 })
4573 })
4574 .collect();
4575
4576 Ok(AwsResponse::json(
4577 StatusCode::OK,
4578 serde_json::to_string(
4579 &json!({ "matches": matches, "testResults": !matches.is_empty() }),
4580 )
4581 .unwrap(),
4582 ))
4583 }
4584
4585 fn stop_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4586 let body = body_json(req);
4587 let query_id = body["queryId"].as_str().ok_or_else(|| {
4588 AwsServiceError::aws_error(
4589 StatusCode::BAD_REQUEST,
4590 "InvalidParameterException",
4591 "queryId is required",
4592 )
4593 })?;
4594
4595 let mut state = self.state.write();
4596 let query = state.queries.get_mut(query_id).ok_or_else(|| {
4597 AwsServiceError::aws_error(
4598 StatusCode::BAD_REQUEST,
4599 "InvalidParameterException",
4600 format!("Query {query_id} is not in a cancellable state"),
4601 )
4602 })?;
4603
4604 let was_running = query.status == "Running" || query.status == "Scheduled";
4605 if was_running {
4606 query.status = "Cancelled".to_string();
4607 }
4608
4609 Ok(AwsResponse::json(
4610 StatusCode::OK,
4611 serde_json::to_string(&json!({ "success": was_running })).unwrap(),
4612 ))
4613 }
4614
4615 fn put_log_group_deletion_protection(
4616 &self,
4617 req: &AwsRequest,
4618 ) -> Result<AwsResponse, AwsServiceError> {
4619 let body = body_json(req);
4620 let log_group_id = body["logGroupIdentifier"]
4621 .as_str()
4622 .ok_or_else(|| {
4623 AwsServiceError::aws_error(
4624 StatusCode::BAD_REQUEST,
4625 "InvalidParameterException",
4626 "logGroupIdentifier is required",
4627 )
4628 })?
4629 .to_string();
4630 let deletion_protection = body["deletionProtectionEnabled"].as_bool().unwrap_or(true);
4631
4632 let group_name = if log_group_id.starts_with("arn:") {
4633 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4634 AwsServiceError::aws_error(
4635 StatusCode::BAD_REQUEST,
4636 "InvalidParameterException",
4637 format!("Invalid ARN: {log_group_id}"),
4638 )
4639 })?
4640 } else {
4641 log_group_id
4642 };
4643
4644 let mut state = self.state.write();
4645 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4646 AwsServiceError::aws_error(
4647 StatusCode::BAD_REQUEST,
4648 "ResourceNotFoundException",
4649 format!("The specified log group does not exist: {group_name}"),
4650 )
4651 })?;
4652
4653 group.deletion_protection = deletion_protection;
4654 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4655 }
4656
4657 fn get_log_record(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4658 let body = body_json(req);
4659 let _log_record_pointer = body["logRecordPointer"].as_str().ok_or_else(|| {
4660 AwsServiceError::aws_error(
4661 StatusCode::BAD_REQUEST,
4662 "InvalidParameterException",
4663 "logRecordPointer is required",
4664 )
4665 })?;
4666
4667 Ok(AwsResponse::json(
4669 StatusCode::OK,
4670 serde_json::to_string(&json!({ "logRecord": {} })).unwrap(),
4671 ))
4672 }
4673
4674 fn list_anomalies(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4675 Ok(AwsResponse::json(
4677 StatusCode::OK,
4678 serde_json::to_string(&json!({ "anomalies": [] })).unwrap(),
4679 ))
4680 }
4681
4682 fn update_anomaly(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4683 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4685 }
4686
4687 fn create_import_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4690 let body = body_json(req);
4691 let import_source_arn = require_str(&body, "importSourceArn")?;
4692 let import_role_arn = require_str(&body, "importRoleArn")?;
4693 let log_group_name = body["logGroupName"].as_str().map(|s| s.to_string());
4694
4695 let import_id = uuid::Uuid::new_v4().to_string();
4696 let now = Utc::now().timestamp_millis();
4697
4698 let task = ImportTask {
4699 import_id: import_id.clone(),
4700 import_source_arn: import_source_arn.to_string(),
4701 import_role_arn: import_role_arn.to_string(),
4702 log_group_name,
4703 status: "RUNNING".to_string(),
4704 creation_time: now,
4705 };
4706
4707 let mut state = self.state.write();
4708 state.import_tasks.insert(import_id.clone(), task);
4709
4710 Ok(AwsResponse::json(
4711 StatusCode::OK,
4712 serde_json::to_string(&json!({ "importId": import_id })).unwrap(),
4713 ))
4714 }
4715
4716 fn describe_import_tasks(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4717 let state = self.state.read();
4718 let tasks: Vec<Value> = state
4719 .import_tasks
4720 .values()
4721 .map(|t| {
4722 json!({
4723 "importId": t.import_id,
4724 "importSourceArn": t.import_source_arn,
4725 "importStatus": t.status,
4726 "creationTime": t.creation_time,
4727 })
4728 })
4729 .collect();
4730 Ok(AwsResponse::json(
4731 StatusCode::OK,
4732 serde_json::to_string(&json!({ "imports": tasks })).unwrap(),
4733 ))
4734 }
4735
4736 fn describe_import_task_batches(
4737 &self,
4738 req: &AwsRequest,
4739 ) -> Result<AwsResponse, AwsServiceError> {
4740 let body = body_json(req);
4741 let _import_id = require_str(&body, "importId")?;
4742 Ok(AwsResponse::json(
4744 StatusCode::OK,
4745 serde_json::to_string(&json!({ "importBatches": [] })).unwrap(),
4746 ))
4747 }
4748
4749 fn cancel_import_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4750 let body = body_json(req);
4751 let import_id = require_str(&body, "importId")?;
4752
4753 let mut state = self.state.write();
4754 match state.import_tasks.get_mut(import_id) {
4755 Some(task) => {
4756 task.status = "CANCELLED".to_string();
4757 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4758 }
4759 None => Err(AwsServiceError::aws_error(
4760 StatusCode::NOT_FOUND,
4761 "ResourceNotFoundException",
4762 format!("Import task not found: {import_id}"),
4763 )),
4764 }
4765 }
4766
4767 fn put_integration(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4770 let body = body_json(req);
4771 let integration_name = require_str(&body, "integrationName")?;
4772 let integration_type = require_str(&body, "integrationType")?;
4773 let resource_config = body["resourceConfig"].clone();
4774
4775 let now = Utc::now().timestamp_millis();
4776 let integration = Integration {
4777 integration_name: integration_name.to_string(),
4778 integration_type: integration_type.to_string(),
4779 resource_config,
4780 status: "ACTIVE".to_string(),
4781 creation_time: now,
4782 };
4783
4784 let mut state = self.state.write();
4785 state
4786 .integrations
4787 .insert(integration_name.to_string(), integration);
4788
4789 Ok(AwsResponse::json(
4790 StatusCode::OK,
4791 serde_json::to_string(&json!({
4792 "integrationName": integration_name,
4793 "integrationStatus": "ACTIVE"
4794 }))
4795 .unwrap(),
4796 ))
4797 }
4798
4799 fn get_integration(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4800 let body = body_json(req);
4801 let integration_name = require_str(&body, "integrationName")?;
4802
4803 let state = self.state.read();
4804 match state.integrations.get(integration_name) {
4805 Some(i) => Ok(AwsResponse::json(
4806 StatusCode::OK,
4807 serde_json::to_string(&json!({
4808 "integrationName": i.integration_name,
4809 "integrationType": i.integration_type,
4810 "integrationStatus": i.status,
4811 }))
4812 .unwrap(),
4813 )),
4814 None => Err(AwsServiceError::aws_error(
4815 StatusCode::NOT_FOUND,
4816 "ResourceNotFoundException",
4817 format!("Integration not found: {integration_name}"),
4818 )),
4819 }
4820 }
4821
4822 fn delete_integration(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4823 let body = body_json(req);
4824 let integration_name = require_str(&body, "integrationName")?;
4825
4826 let mut state = self.state.write();
4827 state.integrations.remove(integration_name);
4828 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4829 }
4830
4831 fn list_integrations(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4832 let state = self.state.read();
4833 let integrations: Vec<Value> = state
4834 .integrations
4835 .values()
4836 .map(|i| {
4837 json!({
4838 "integrationName": i.integration_name,
4839 "integrationType": i.integration_type,
4840 "integrationStatus": i.status,
4841 })
4842 })
4843 .collect();
4844 Ok(AwsResponse::json(
4845 StatusCode::OK,
4846 serde_json::to_string(&json!({ "integrationSummaries": integrations })).unwrap(),
4847 ))
4848 }
4849
4850 fn create_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4853 let body = body_json(req);
4854 let lookup_table_name = require_str(&body, "lookupTableName")?;
4855 let table_body = require_str(&body, "tableBody")?;
4856
4857 let state_r = self.state.read();
4858 let account_id = state_r.account_id.clone();
4859 let region = state_r.region.clone();
4860 drop(state_r);
4861
4862 let arn = format!("arn:aws:logs:{region}:{account_id}:lookup-table:{lookup_table_name}");
4863 let now = Utc::now().timestamp_millis();
4864
4865 let table = LookupTable {
4866 lookup_table_name: lookup_table_name.to_string(),
4867 arn: arn.clone(),
4868 table_body: table_body.to_string(),
4869 creation_time: now,
4870 last_modified_time: now,
4871 };
4872
4873 let mut state = self.state.write();
4874 state.lookup_tables.insert(arn.clone(), table);
4875
4876 Ok(AwsResponse::json(
4877 StatusCode::OK,
4878 serde_json::to_string(&json!({ "lookupTableArn": arn })).unwrap(),
4879 ))
4880 }
4881
4882 fn get_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4883 let body = body_json(req);
4884 let lookup_table_arn = require_str(&body, "lookupTableArn")?;
4885
4886 let state = self.state.read();
4887 match state.lookup_tables.get(lookup_table_arn) {
4888 Some(t) => Ok(AwsResponse::json(
4889 StatusCode::OK,
4890 serde_json::to_string(&json!({
4891 "lookupTableName": t.lookup_table_name,
4892 "lookupTableArn": t.arn,
4893 "tableBody": t.table_body,
4894 "creationTime": t.creation_time,
4895 "lastModifiedTime": t.last_modified_time,
4896 }))
4897 .unwrap(),
4898 )),
4899 None => Err(AwsServiceError::aws_error(
4900 StatusCode::NOT_FOUND,
4901 "ResourceNotFoundException",
4902 format!("Lookup table not found: {lookup_table_arn}"),
4903 )),
4904 }
4905 }
4906
4907 fn describe_lookup_tables(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4908 let state = self.state.read();
4909 let tables: Vec<Value> = state
4910 .lookup_tables
4911 .values()
4912 .map(|t| {
4913 json!({
4914 "lookupTableName": t.lookup_table_name,
4915 "lookupTableArn": t.arn,
4916 "creationTime": t.creation_time,
4917 "lastModifiedTime": t.last_modified_time,
4918 })
4919 })
4920 .collect();
4921 Ok(AwsResponse::json(
4922 StatusCode::OK,
4923 serde_json::to_string(&json!({ "lookupTables": tables })).unwrap(),
4924 ))
4925 }
4926
4927 fn delete_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4928 let body = body_json(req);
4929 let lookup_table_arn = require_str(&body, "lookupTableArn")?;
4930
4931 let mut state = self.state.write();
4932 state.lookup_tables.remove(lookup_table_arn);
4933 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4934 }
4935
4936 fn update_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4937 let body = body_json(req);
4938 let lookup_table_arn = require_str(&body, "lookupTableArn")?;
4939 let table_body = require_str(&body, "tableBody")?;
4940
4941 let mut state = self.state.write();
4942 match state.lookup_tables.get_mut(lookup_table_arn) {
4943 Some(t) => {
4944 t.table_body = table_body.to_string();
4945 t.last_modified_time = Utc::now().timestamp_millis();
4946 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4947 }
4948 None => Err(AwsServiceError::aws_error(
4949 StatusCode::NOT_FOUND,
4950 "ResourceNotFoundException",
4951 format!("Lookup table not found: {lookup_table_arn}"),
4952 )),
4953 }
4954 }
4955
4956 fn create_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4959 let body = body_json(req);
4960 let name = require_str(&body, "name")?;
4961 let query_string = require_str(&body, "queryString")?;
4962 let query_language = require_str(&body, "queryLanguage")?;
4963 let schedule_expression = require_str(&body, "scheduleExpression")?;
4964 let execution_role_arn = require_str(&body, "executionRoleArn")?;
4965
4966 let state_r = self.state.read();
4967 let account_id = state_r.account_id.clone();
4968 let region = state_r.region.clone();
4969 drop(state_r);
4970
4971 let arn = format!("arn:aws:logs:{region}:{account_id}:scheduled-query:{name}");
4972 let now = Utc::now().timestamp_millis();
4973
4974 let sq = ScheduledQuery {
4975 name: name.to_string(),
4976 arn: arn.clone(),
4977 query_string: query_string.to_string(),
4978 query_language: query_language.to_string(),
4979 schedule_expression: schedule_expression.to_string(),
4980 execution_role_arn: execution_role_arn.to_string(),
4981 status: "ACTIVE".to_string(),
4982 creation_time: now,
4983 last_modified_time: now,
4984 };
4985
4986 let mut state = self.state.write();
4987 state.scheduled_queries.insert(arn.clone(), sq);
4988
4989 Ok(AwsResponse::json(
4990 StatusCode::OK,
4991 serde_json::to_string(&json!({ "scheduledQueryArn": arn })).unwrap(),
4992 ))
4993 }
4994
4995 fn get_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4996 let body = body_json(req);
4997 let identifier = require_str(&body, "identifier")?;
4998
4999 let state = self.state.read();
5000 match state.scheduled_queries.get(identifier) {
5001 Some(sq) => Ok(AwsResponse::json(
5002 StatusCode::OK,
5003 serde_json::to_string(&json!({
5004 "scheduledQueryArn": sq.arn,
5005 "name": sq.name,
5006 "queryString": sq.query_string,
5007 "queryLanguage": sq.query_language,
5008 "scheduleExpression": sq.schedule_expression,
5009 "executionRoleArn": sq.execution_role_arn,
5010 }))
5011 .unwrap(),
5012 )),
5013 None => Err(AwsServiceError::aws_error(
5014 StatusCode::NOT_FOUND,
5015 "ResourceNotFoundException",
5016 format!("Scheduled query not found: {identifier}"),
5017 )),
5018 }
5019 }
5020
5021 fn get_scheduled_query_history(
5022 &self,
5023 req: &AwsRequest,
5024 ) -> Result<AwsResponse, AwsServiceError> {
5025 let body = body_json(req);
5026 let _identifier = require_str(&body, "identifier")?;
5027 Ok(AwsResponse::json(
5029 StatusCode::OK,
5030 serde_json::to_string(&json!({ "triggerHistory": [] })).unwrap(),
5031 ))
5032 }
5033
5034 fn list_scheduled_queries(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5035 let state = self.state.read();
5036 let queries: Vec<Value> = state
5037 .scheduled_queries
5038 .values()
5039 .map(|sq| {
5040 json!({
5041 "name": sq.name,
5042 "scheduledQueryArn": sq.arn,
5043 })
5044 })
5045 .collect();
5046 Ok(AwsResponse::json(
5047 StatusCode::OK,
5048 serde_json::to_string(&json!({ "scheduledQueries": queries })).unwrap(),
5049 ))
5050 }
5051
5052 fn delete_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5053 let body = body_json(req);
5054 let identifier = require_str(&body, "identifier")?;
5055
5056 let mut state = self.state.write();
5057 state.scheduled_queries.remove(identifier);
5058 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5059 }
5060
5061 fn update_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5062 let body = body_json(req);
5063 let identifier = require_str(&body, "identifier")?;
5064 let query_string = require_str(&body, "queryString")?;
5065 let query_language = require_str(&body, "queryLanguage")?;
5066 let schedule_expression = require_str(&body, "scheduleExpression")?;
5067 let execution_role_arn = require_str(&body, "executionRoleArn")?;
5068
5069 let mut state = self.state.write();
5070 match state.scheduled_queries.get_mut(identifier) {
5071 Some(sq) => {
5072 sq.query_string = query_string.to_string();
5073 sq.query_language = query_language.to_string();
5074 sq.schedule_expression = schedule_expression.to_string();
5075 sq.execution_role_arn = execution_role_arn.to_string();
5076 sq.last_modified_time = Utc::now().timestamp_millis();
5077 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5078 }
5079 None => Err(AwsServiceError::aws_error(
5080 StatusCode::NOT_FOUND,
5081 "ResourceNotFoundException",
5082 format!("Scheduled query not found: {identifier}"),
5083 )),
5084 }
5085 }
5086
5087 fn start_live_tail(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5090 let session_id = uuid::Uuid::new_v4().to_string();
5091 Ok(AwsResponse::json(
5092 StatusCode::OK,
5093 serde_json::to_string(&json!({
5094 "responseStream": {
5095 "sessionStart": {
5096 "sessionId": session_id,
5097 "logGroupIdentifiers": [],
5098 }
5099 }
5100 }))
5101 .unwrap(),
5102 ))
5103 }
5104
5105 fn list_log_groups_for_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5106 let body = body_json(req);
5107 let _query_id = require_str(&body, "queryId")?;
5108 Ok(AwsResponse::json(
5110 StatusCode::OK,
5111 serde_json::to_string(&json!({ "logGroupIdentifiers": [] })).unwrap(),
5112 ))
5113 }
5114
5115 fn list_aggregate_log_group_summaries(
5116 &self,
5117 _req: &AwsRequest,
5118 ) -> Result<AwsResponse, AwsServiceError> {
5119 Ok(AwsResponse::json(
5121 StatusCode::OK,
5122 serde_json::to_string(&json!({ "aggregateLogGroupSummaries": [] })).unwrap(),
5123 ))
5124 }
5125
5126 fn put_bearer_token_authentication(
5127 &self,
5128 req: &AwsRequest,
5129 ) -> Result<AwsResponse, AwsServiceError> {
5130 let body = body_json(req);
5131 let log_group_identifier = require_str(&body, "logGroupIdentifier")?;
5132 let enabled = body["bearerTokenAuthenticationEnabled"]
5133 .as_bool()
5134 .unwrap_or(false);
5135
5136 let mut state = self.state.write();
5137 state
5138 .bearer_token_auth
5139 .insert(log_group_identifier.to_string(), enabled);
5140 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5141 }
5142
5143 fn get_log_object(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5144 Ok(AwsResponse::json(
5146 StatusCode::OK,
5147 serde_json::to_string(&json!({ "logObject": {} })).unwrap(),
5148 ))
5149 }
5150
5151 fn get_log_fields(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5152 Ok(AwsResponse::json(
5154 StatusCode::OK,
5155 serde_json::to_string(&json!({ "logFields": [] })).unwrap(),
5156 ))
5157 }
5158
5159 fn associate_source_to_s3_table_integration(
5160 &self,
5161 req: &AwsRequest,
5162 ) -> Result<AwsResponse, AwsServiceError> {
5163 let body = body_json(req);
5164 let integration_arn = require_str(&body, "integrationArn")?;
5165 let data_source = body["dataSource"].clone();
5166 let source_id = data_source
5167 .as_object()
5168 .and_then(|o| o.get("resourceArn"))
5169 .and_then(|v| v.as_str())
5170 .unwrap_or("unknown")
5171 .to_string();
5172
5173 let mut state = self.state.write();
5174 state
5175 .s3_table_sources
5176 .entry(integration_arn.to_string())
5177 .or_default()
5178 .push(source_id);
5179 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5180 }
5181
5182 fn list_sources_for_s3_table_integration(
5183 &self,
5184 req: &AwsRequest,
5185 ) -> Result<AwsResponse, AwsServiceError> {
5186 let body = body_json(req);
5187 let integration_arn = require_str(&body, "integrationArn")?;
5188
5189 let state = self.state.read();
5190 let sources: Vec<Value> = state
5191 .s3_table_sources
5192 .get(integration_arn)
5193 .map(|sources| {
5194 sources
5195 .iter()
5196 .map(|s| json!({ "resourceArn": s }))
5197 .collect()
5198 })
5199 .unwrap_or_default();
5200 Ok(AwsResponse::json(
5201 StatusCode::OK,
5202 serde_json::to_string(&json!({ "dataSources": sources })).unwrap(),
5203 ))
5204 }
5205
5206 fn disassociate_source_from_s3_table_integration(
5207 &self,
5208 req: &AwsRequest,
5209 ) -> Result<AwsResponse, AwsServiceError> {
5210 let body = body_json(req);
5211 let _identifier = require_str(&body, "identifier")?;
5212 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5214 }
5215
5216 fn update_delivery_configuration(
5217 &self,
5218 req: &AwsRequest,
5219 ) -> Result<AwsResponse, AwsServiceError> {
5220 let body = body_json(req);
5221 let id = require_str(&body, "id")?;
5222
5223 let state = self.state.read();
5224 if !state.deliveries.contains_key(id) {
5225 return Err(AwsServiceError::aws_error(
5226 StatusCode::NOT_FOUND,
5227 "ResourceNotFoundException",
5228 format!("Delivery not found: {id}"),
5229 ));
5230 }
5231 drop(state);
5232
5233 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5235 }
5236
5237 fn describe_configuration_templates(
5238 &self,
5239 _req: &AwsRequest,
5240 ) -> Result<AwsResponse, AwsServiceError> {
5241 Ok(AwsResponse::json(
5243 StatusCode::OK,
5244 serde_json::to_string(&json!({ "configurationTemplates": [] })).unwrap(),
5245 ))
5246 }
5247}
5248
5249fn resolve_log_group_name(
5252 log_group_name: Option<&str>,
5253 resource_identifier: Option<&str>,
5254) -> Result<String, AwsServiceError> {
5255 if let Some(identifier) = resource_identifier {
5256 if identifier.starts_with("arn:") {
5257 extract_log_group_from_arn(identifier).ok_or_else(|| {
5258 AwsServiceError::aws_error(
5259 StatusCode::BAD_REQUEST,
5260 "InvalidParameterException",
5261 format!("Invalid ARN: {identifier}"),
5262 )
5263 })
5264 } else {
5265 Ok(identifier.to_string())
5266 }
5267 } else if let Some(name) = log_group_name {
5268 Ok(name.to_string())
5269 } else {
5270 Err(AwsServiceError::aws_error(
5271 StatusCode::BAD_REQUEST,
5272 "InvalidParameterException",
5273 "Either logGroupName or resourceIdentifier is required",
5274 ))
5275 }
5276}
5277
/// Extracts the log group name from a log-group ARN.
///
/// The ARN is split on `:` into at most seven segments. The sixth segment
/// must be exactly `log-group`; the seventh (everything after it, including
/// any further colons) is the name, with one trailing `:*` removed if
/// present. Returns `None` for fewer than seven segments or a different
/// resource type.
fn extract_log_group_from_arn(arn: &str) -> Option<String> {
    let mut segments = arn.splitn(7, ':');
    // `nth(5)` skips partition, service, region and account and lands on the resource type.
    let resource_type = segments.nth(5)?;
    let resource = segments.next()?;
    if resource_type != "log-group" {
        return None;
    }

    let name = match resource.strip_suffix(":*") {
        Some(trimmed) => trimmed,
        None => resource,
    };
    Some(name.to_string())
}
5289
/// Simplified filter-pattern matcher for unstructured log messages.
///
/// Rules, applied to the trimmed `pattern`:
/// - an empty pattern matches every message;
/// - a pattern starting with `{` or `[` (JSON or space-delimited syntax) is
///   not evaluated and matches every message;
/// - otherwise the pattern is split into terms, where a term is either a bare
///   whitespace-delimited word or a double-quoted phrase (which may contain
///   spaces), and the message matches when it contains every term as a
///   case-sensitive substring.
///
/// Quoted phrases may be mixed with bare words (`ERROR "disk full"`) and may
/// appear more than once (`"foo bar" "baz"`).
fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
    let pattern = pattern.trim();

    if pattern.is_empty() {
        return true;
    }

    // Structured patterns are out of scope for this matcher; accept everything.
    if pattern.starts_with('{') || pattern.starts_with('[') {
        return true;
    }

    split_filter_terms(pattern)
        .into_iter()
        .all(|term| message.contains(term))
}

/// Splits a filter pattern into bare words and the contents of double-quoted phrases.
///
/// A `"` with no closing quote is not treated as a phrase delimiter; it stays
/// part of an ordinary whitespace-delimited word.
fn split_filter_terms(pattern: &str) -> Vec<&str> {
    let mut terms = Vec::new();
    let mut rest = pattern.trim_start();

    while !rest.is_empty() {
        if let Some(after_quote) = rest.strip_prefix('"') {
            if let Some(close) = after_quote.find('"') {
                terms.push(&after_quote[..close]);
                // `"` is one byte, so `close + 1` is a valid char boundary.
                rest = after_quote[close + 1..].trim_start();
                continue;
            }
        }

        // `rest` starts with a non-whitespace char here, so `end` is at least 1
        // and the loop always makes progress.
        let end = rest.find(char::is_whitespace).unwrap_or(rest.len());
        terms.push(&rest[..end]);
        rest = rest[end..].trim_start();
    }

    terms
}
5320
5321#[cfg(test)]
5322mod tests {
5323 use super::*;
5324 use crate::state::LogsState;
5325 use bytes::Bytes;
5326 use http::{HeaderMap, Method};
5327 use std::collections::HashMap;
5328 use std::sync::Arc;
5329
5330 fn make_service() -> LogsService {
5331 let state = Arc::new(parking_lot::RwLock::new(LogsState::new(
5332 "123456789012",
5333 "us-east-1",
5334 )));
5335 LogsService::new(state)
5336 }
5337
5338 fn make_request(action: &str, body: Value) -> AwsRequest {
5339 AwsRequest {
5340 service: "logs".to_string(),
5341 action: action.to_string(),
5342 region: "us-east-1".to_string(),
5343 account_id: "123456789012".to_string(),
5344 request_id: "test-request-id".to_string(),
5345 headers: HeaderMap::new(),
5346 query_params: HashMap::new(),
5347 body: Bytes::from(serde_json::to_vec(&body).unwrap()),
5348 path_segments: vec![],
5349 raw_path: "/".to_string(),
5350 method: Method::POST,
5351 is_query_protocol: false,
5352 access_key_id: None,
5353 }
5354 }
5355
5356 fn create_group(svc: &LogsService, name: &str) {
5357 let req = make_request("CreateLogGroup", json!({ "logGroupName": name }));
5358 svc.create_log_group(&req).unwrap();
5359 }
5360
5361 fn create_stream(svc: &LogsService, group: &str, stream: &str) {
5362 let req = make_request(
5363 "CreateLogStream",
5364 json!({ "logGroupName": group, "logStreamName": stream }),
5365 );
5366 svc.create_log_stream(&req).unwrap();
5367 }
5368
5369 fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
5370 let now = chrono::Utc::now().timestamp_millis();
5371 let events: Vec<Value> = messages
5372 .iter()
5373 .enumerate()
5374 .map(|(i, msg)| json!({ "timestamp": now + i as i64, "message": msg }))
5375 .collect();
5376 let req = make_request(
5377 "PutLogEvents",
5378 json!({
5379 "logGroupName": group,
5380 "logStreamName": stream,
5381 "logEvents": events,
5382 }),
5383 );
5384 svc.put_log_events(&req).unwrap();
5385 }
5386
5387 #[test]
5390 fn describe_log_groups_pattern_filters_by_substring() {
5391 let svc = make_service();
5392 create_group(&svc, "/app/web");
5393 create_group(&svc, "/app/api");
5394 create_group(&svc, "/system/metrics");
5395
5396 let req = make_request("DescribeLogGroups", json!({ "logGroupNamePattern": "app" }));
5397 let resp = svc.describe_log_groups(&req).unwrap();
5398 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5399 let names: Vec<&str> = body["logGroups"]
5400 .as_array()
5401 .unwrap()
5402 .iter()
5403 .map(|g| g["logGroupName"].as_str().unwrap())
5404 .collect();
5405 assert_eq!(names.len(), 2);
5406 assert!(names.contains(&"/app/web"));
5407 assert!(names.contains(&"/app/api"));
5408 }
5409
5410 #[test]
5411 fn describe_log_groups_pattern_empty_returns_all() {
5412 let svc = make_service();
5413 create_group(&svc, "/app/web");
5414 create_group(&svc, "/system/metrics");
5415
5416 let req = make_request("DescribeLogGroups", json!({}));
5417 let resp = svc.describe_log_groups(&req).unwrap();
5418 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5419 assert_eq!(body["logGroups"].as_array().unwrap().len(), 2);
5420 }
5421
/// `FilterLogEvents` must accept a plain group name supplied through
/// `logGroupIdentifier` and return the one event stored in that group.
#[test]
fn filter_log_events_uses_log_group_identifier_as_name() {
    let svc = make_service();
    create_group(&svc, "my-group");
    create_stream(&svc, "my-group", "stream-1");
    put_events(&svc, "my-group", "stream-1", &["hello"]);

    let payload = json!({ "logGroupIdentifier": "my-group" });
    let response = svc
        .filter_log_events(&make_request("FilterLogEvents", payload))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&response.body).unwrap();
    let event_count = parsed["events"].as_array().unwrap().len();
    assert_eq!(event_count, 1);
}
5439
/// `FilterLogEvents` must accept a log-group ARN (with the `:*` suffix)
/// supplied through `logGroupIdentifier` and return the one stored event.
#[test]
fn filter_log_events_uses_log_group_identifier_as_arn() {
    let svc = make_service();
    create_group(&svc, "my-group");
    create_stream(&svc, "my-group", "stream-1");
    put_events(&svc, "my-group", "stream-1", &["hello"]);

    let payload = json!({
        "logGroupIdentifier": "arn:aws:logs:us-east-1:123456789012:log-group:my-group:*"
    });
    let response = svc
        .filter_log_events(&make_request("FilterLogEvents", payload))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&response.body).unwrap();
    let event_count = parsed["events"].as_array().unwrap().len();
    assert_eq!(event_count, 1);
}
5455
/// `FilterLogEvents` with an empty request body (no group name and no
/// identifier) must be rejected with an error.
#[test]
fn filter_log_events_errors_without_group_name_or_identifier() {
    let svc = make_service();
    let outcome = svc.filter_log_events(&make_request("FilterLogEvents", json!({})));
    assert!(outcome.is_err());
}
5462
/// `FilterLogEvents` with `logStreamNamePrefix` "web" must return only the
/// events written to streams whose names start with that prefix.
#[test]
fn filter_log_events_filters_by_stream_name_prefix() {
    let svc = make_service();
    create_group(&svc, "grp");

    let fixtures = [("web-1", "a"), ("web-2", "b"), ("api-1", "c")];
    // Create every stream first, then write one event per stream.
    for (stream, _) in fixtures.iter().copied() {
        create_stream(&svc, "grp", stream);
    }
    for (stream, message) in fixtures.iter().copied() {
        put_events(&svc, "grp", stream, &[message]);
    }

    let payload = json!({ "logGroupName": "grp", "logStreamNamePrefix": "web" });
    let response = svc
        .filter_log_events(&make_request("FilterLogEvents", payload))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    let returned = parsed["events"].as_array().unwrap();

    assert_eq!(returned.len(), 2);
    assert!(returned
        .iter()
        .all(|event| event["logStreamName"].as_str().unwrap().starts_with("web")));
}
5488
/// A task created by `CreateExportTask` with `taskName` and
/// `logStreamNamePrefix` must report both values back from
/// `DescribeExportTasks`.
#[test]
fn create_export_task_stores_task_name_and_stream_prefix() {
    let svc = make_service();
    create_group(&svc, "grp");

    let create_payload = json!({
        "logGroupName": "grp",
        "from": 0,
        "to": 1000,
        "destination": "my-bucket",
        "taskName": "my-export",
        "logStreamNamePrefix": "web-",
    });
    let created = svc
        .create_export_task(&make_request("CreateExportTask", create_payload))
        .unwrap();
    let created_body = serde_json::from_slice::<Value>(&created.body).unwrap();
    let task_id = created_body["taskId"].as_str().unwrap();

    // Look the task up again by the id returned from creation.
    let described = svc
        .describe_export_tasks(&make_request(
            "DescribeExportTasks",
            json!({ "taskId": task_id }),
        ))
        .unwrap();
    let described_body = serde_json::from_slice::<Value>(&described.body).unwrap();
    let first_task = &described_body["exportTasks"][0];

    assert_eq!(first_task["taskName"].as_str().unwrap(), "my-export");
    assert_eq!(first_task["logStreamNamePrefix"].as_str().unwrap(), "web-");
}
5518
/// When `CreateExportTask` is called without `taskName` or
/// `logStreamNamePrefix`, the described task must have those keys either
/// absent or JSON null.
#[test]
fn create_export_task_omits_optional_fields_when_not_provided() {
    let svc = make_service();
    create_group(&svc, "grp");

    let create_payload = json!({
        "logGroupName": "grp",
        "from": 0,
        "to": 1000,
        "destination": "my-bucket",
    });
    let created = svc
        .create_export_task(&make_request("CreateExportTask", create_payload))
        .unwrap();
    let created_body = serde_json::from_slice::<Value>(&created.body).unwrap();
    let task_id = created_body["taskId"].as_str().unwrap();

    let described = svc
        .describe_export_tasks(&make_request(
            "DescribeExportTasks",
            json!({ "taskId": task_id }),
        ))
        .unwrap();
    let described_body = serde_json::from_slice::<Value>(&described.body).unwrap();
    let first_task = &described_body["exportTasks"][0];

    // A missing key and an explicit null are both acceptable.
    assert!(first_task.get("taskName").map_or(true, Value::is_null));
    assert!(first_task
        .get("logStreamNamePrefix")
        .map_or(true, Value::is_null));
}
5544
/// `AssociateKmsKey` addressed through a log-group ARN in
/// `resourceIdentifier` must store the key id on the matching group.
#[test]
fn associate_kms_key_via_resource_identifier_arn() {
    let svc = make_service();
    create_group(&svc, "grp");

    let kms_key = "arn:aws:kms:us-east-1:123456789012:key/abc-123";
    let payload = json!({
        "resourceIdentifier": "arn:aws:logs:us-east-1:123456789012:log-group:grp:*",
        "kmsKeyId": kms_key,
    });
    svc.associate_kms_key(&make_request("AssociateKmsKey", payload))
        .unwrap();

    // Inspect the shared state directly to confirm the key was recorded.
    let guard = svc.state.read();
    let stored_key = guard.log_groups["grp"].kms_key_id.as_deref();
    assert_eq!(stored_key, Some(kms_key));
}
5567
/// `DisassociateKmsKey` addressed through a plain group name in
/// `resourceIdentifier` must clear a previously associated key.
#[test]
fn disassociate_kms_key_via_resource_identifier_name() {
    let svc = make_service();
    create_group(&svc, "grp");

    // Attach a key first so there is something to remove.
    let attach = make_request(
        "AssociateKmsKey",
        json!({ "logGroupName": "grp", "kmsKeyId": "some-key" }),
    );
    svc.associate_kms_key(&attach).unwrap();

    let detach = make_request("DisassociateKmsKey", json!({ "resourceIdentifier": "grp" }));
    svc.disassociate_kms_key(&detach).unwrap();

    let guard = svc.state.read();
    let stored_key = &guard.log_groups["grp"].kms_key_id;
    assert!(stored_key.is_none());
}
5587
/// `DescribeQueryDefinitions` with `queryDefinitionNamePrefix` "error" must
/// return only the two definitions whose names start with that prefix.
#[test]
fn describe_query_definitions_filters_by_name_prefix() {
    let svc = make_service();

    let names = ["error-queries-1", "error-queries-2", "latency-queries-1"];
    for name in names.iter() {
        let payload = json!({
            "name": name,
            "queryString": "fields @timestamp | limit 20",
        });
        svc.put_query_definition(&make_request("PutQueryDefinition", payload))
            .unwrap();
    }

    let response = svc
        .describe_query_definitions(&make_request(
            "DescribeQueryDefinitions",
            json!({ "queryDefinitionNamePrefix": "error" }),
        ))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    let definitions = parsed["queryDefinitions"].as_array().unwrap();

    assert_eq!(definitions.len(), 2);
    assert!(definitions
        .iter()
        .all(|def| def["name"].as_str().unwrap().starts_with("error")));
}
5618
/// `DescribeQueryDefinitions` with an empty request body must return every
/// stored definition.
#[test]
fn describe_query_definitions_no_prefix_returns_all() {
    let svc = make_service();

    for name in ["a", "b", "c"].iter() {
        let payload = json!({ "name": name, "queryString": "fields @timestamp" });
        svc.put_query_definition(&make_request("PutQueryDefinition", payload))
            .unwrap();
    }

    let response = svc
        .describe_query_definitions(&make_request("DescribeQueryDefinitions", json!({})))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    let definition_count = parsed["queryDefinitions"].as_array().unwrap().len();
    assert_eq!(definition_count, 3);
}
5636
/// When `PutDeliveryDestination` receives an empty configuration object,
/// the response must still carry `destinationResourceArn` as an empty string.
#[test]
fn put_delivery_destination_includes_empty_destination_resource_arn() {
    let svc = make_service();
    let payload = json!({
        "name": "my-dest",
        "deliveryDestinationConfiguration": {}
    });
    let response = svc
        .put_delivery_destination(&make_request("PutDeliveryDestination", payload))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();

    let returned_arn = parsed["deliveryDestination"]["deliveryDestinationConfiguration"]
        ["destinationResourceArn"]
        .as_str()
        .unwrap();
    assert_eq!(
        returned_arn, "",
        "destinationResourceArn should be an empty string when not set"
    );
}
5659
/// When `PutDeliveryDestination` is given a `destinationResourceArn`, the
/// response must return that same ARN.
#[test]
fn put_delivery_destination_includes_destination_resource_arn_when_set() {
    let svc = make_service();
    let payload = json!({
        "name": "my-dest",
        "deliveryDestinationConfiguration": {
            "destinationResourceArn": "arn:aws:s3:::my-bucket"
        }
    });
    let response = svc
        .put_delivery_destination(&make_request("PutDeliveryDestination", payload))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();

    let returned_arn = parsed["deliveryDestination"]["deliveryDestinationConfiguration"]
        ["destinationResourceArn"]
        .as_str()
        .unwrap();
    assert_eq!(returned_arn, "arn:aws:s3:::my-bucket");
}
5680
/// A log-group ARN ending in `:*` must yield the bare group name.
#[test]
fn extract_log_group_from_arn_strips_wildcard_suffix() {
    let extracted =
        extract_log_group_from_arn("arn:aws:logs:us-east-1:123456789012:log-group:my-group:*");
    assert_eq!(extracted.as_deref(), Some("my-group"));
}
5689
/// A log-group ARN without the `:*` suffix must yield the group name too.
#[test]
fn extract_log_group_from_arn_without_wildcard() {
    let extracted =
        extract_log_group_from_arn("arn:aws:logs:us-east-1:123456789012:log-group:my-group");
    assert_eq!(extracted.as_deref(), Some("my-group"));
}
5698
/// The input "not-an-arn" must produce `None`.
#[test]
fn extract_log_group_from_arn_invalid() {
    let extracted = extract_log_group_from_arn("not-an-arn");
    assert!(extracted.is_none());
}
5703
/// Exercises put, describe and delete for account policies: the policy is
/// listed once after creation and the listing is empty after deletion.
#[test]
fn account_policy_lifecycle() {
    let svc = make_service();

    // Number of policies DescribeAccountPolicies reports for the policy type.
    let count_policies = || {
        let req = make_request(
            "DescribeAccountPolicies",
            json!({ "policyType": "DATA_PROTECTION_POLICY" }),
        );
        let resp = svc.describe_account_policies(&req).unwrap();
        let listing: Value = serde_json::from_slice(&resp.body).unwrap();
        listing["accountPolicies"].as_array().unwrap().len()
    };

    let put_payload = json!({
        "policyName": "test-policy",
        "policyType": "DATA_PROTECTION_POLICY",
        "policyDocument": "{\"Name\":\"test\"}",
    });
    let put_resp = svc
        .put_account_policy(&make_request("PutAccountPolicy", put_payload))
        .unwrap();
    let put_body = serde_json::from_slice::<Value>(&put_resp.body).unwrap();
    assert_eq!(put_body["accountPolicy"]["policyName"], "test-policy");

    assert_eq!(count_policies(), 1);

    let delete_payload = json!({
        "policyName": "test-policy",
        "policyType": "DATA_PROTECTION_POLICY",
    });
    svc.delete_account_policy(&make_request("DeleteAccountPolicy", delete_payload))
        .unwrap();

    assert_eq!(count_policies(), 0);
}
5747
/// Exercises put, get and delete for a log group's data protection policy:
/// the stored document is returned by get, and after deletion the get
/// response no longer contains a `policyDocument` key.
#[test]
fn data_protection_policy_lifecycle() {
    let svc = make_service();
    create_group(&svc, "dp-group");

    // Fetches the parsed GetDataProtectionPolicy response for the group.
    let fetch_policy = || {
        let req = make_request(
            "GetDataProtectionPolicy",
            json!({ "logGroupIdentifier": "dp-group" }),
        );
        let resp = svc.get_data_protection_policy(&req).unwrap();
        serde_json::from_slice::<Value>(&resp.body).unwrap()
    };

    let put_payload = json!({
        "logGroupIdentifier": "dp-group",
        "policyDocument": "{\"Name\":\"dp\"}",
    });
    svc.put_data_protection_policy(&make_request("PutDataProtectionPolicy", put_payload))
        .unwrap();

    let stored = fetch_policy();
    assert_eq!(stored["policyDocument"], "{\"Name\":\"dp\"}");

    let delete_req = make_request(
        "DeleteDataProtectionPolicy",
        json!({ "logGroupIdentifier": "dp-group" }),
    );
    svc.delete_data_protection_policy(&delete_req).unwrap();

    let after_delete = fetch_policy();
    assert_eq!(after_delete.get("policyDocument"), None);
}
5786
/// Exercises put, describe and delete for an index policy. Only the
/// describe step is asserted on (one policy listed); the delete call is
/// merely required to succeed.
#[test]
fn index_policy_lifecycle() {
    let svc = make_service();
    create_group(&svc, "idx-group");

    let put_payload = json!({
        "logGroupIdentifier": "idx-group",
        "policyDocument": "{\"Fields\":[\"field1\"]}",
    });
    svc.put_index_policy(&make_request("PutIndexPolicy", put_payload))
        .unwrap();

    let describe_payload = json!({ "logGroupIdentifiers": ["idx-group"] });
    let described = svc
        .describe_index_policies(&make_request("DescribeIndexPolicies", describe_payload))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&described.body).unwrap();
    let policy_count = parsed["indexPolicies"].as_array().unwrap().len();
    assert_eq!(policy_count, 1);

    let delete_payload = json!({
        "logGroupIdentifier": "idx-group",
    });
    svc.delete_index_policy(&make_request("DeleteIndexPolicy", delete_payload))
        .unwrap();
}
5819
/// Exercises put, get and delete for a log group's transformer. The get
/// response must expose `transformerConfig` as a JSON array; the delete
/// call is only required to succeed.
#[test]
fn transformer_lifecycle() {
    let svc = make_service();
    create_group(&svc, "tx-group");

    let put_payload = json!({
        "logGroupIdentifier": "tx-group",
        "transformerConfig": [{"addKeys":{"entries":[{"key":"new","value":"val"}]}}],
    });
    svc.put_transformer(&make_request("PutTransformer", put_payload))
        .unwrap();

    let fetched = svc
        .get_transformer(&make_request(
            "GetTransformer",
            json!({ "logGroupIdentifier": "tx-group" }),
        ))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&fetched.body).unwrap();
    assert!(parsed["transformerConfig"].as_array().is_some());

    let delete_req = make_request(
        "DeleteTransformer",
        json!({ "logGroupIdentifier": "tx-group" }),
    );
    svc.delete_transformer(&delete_req).unwrap();
}
5850
/// `TestTransformer` given two input messages must return two entries in
/// `transformedLogs`.
#[test]
fn test_transformer_returns_transformed_events() {
    let svc = make_service();

    let payload = json!({
        "transformerConfig": [{"addKeys":{"entries":[]}}],
        "logEventMessages": ["hello", "world"],
    });
    let response = svc
        .test_transformer(&make_request("TestTransformer", payload))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    let transformed_count = parsed["transformedLogs"].as_array().unwrap().len();
    assert_eq!(transformed_count, 2);
}
5866
/// Exercises create, get, list, update and delete for a log anomaly
/// detector, addressing it by the ARN returned from creation.
#[test]
fn anomaly_detector_lifecycle() {
    let svc = make_service();

    let create_payload = json!({
        "logGroupArnList": ["arn:aws:logs:us-east-1:123456789012:log-group:test:*"],
        "detectorName": "my-detector",
    });
    let created = svc
        .create_log_anomaly_detector(&make_request("CreateLogAnomalyDetector", create_payload))
        .unwrap();
    let created_body = serde_json::from_slice::<Value>(&created.body).unwrap();
    let detector_arn = created_body["anomalyDetectorArn"]
        .as_str()
        .unwrap()
        .to_string();

    let fetched = svc
        .get_log_anomaly_detector(&make_request(
            "GetLogAnomalyDetector",
            json!({ "anomalyDetectorArn": &detector_arn }),
        ))
        .unwrap();
    let fetched_body = serde_json::from_slice::<Value>(&fetched.body).unwrap();
    assert_eq!(fetched_body["detectorName"], "my-detector");

    let listed = svc
        .list_log_anomaly_detectors(&make_request("ListLogAnomalyDetectors", json!({})))
        .unwrap();
    let listed_body = serde_json::from_slice::<Value>(&listed.body).unwrap();
    let detector_count = listed_body["anomalyDetectors"].as_array().unwrap().len();
    assert_eq!(detector_count, 1);

    // Update and delete are only required to succeed.
    let update_req = make_request(
        "UpdateLogAnomalyDetector",
        json!({ "anomalyDetectorArn": &detector_arn, "enabled": false }),
    );
    svc.update_log_anomaly_detector(&update_req).unwrap();

    let delete_req = make_request(
        "DeleteLogAnomalyDetector",
        json!({ "anomalyDetectorArn": &detector_arn }),
    );
    svc.delete_log_anomaly_detector(&delete_req).unwrap();
}
5909
/// `GetLogGroupFields` for an existing group must return two entries in
/// `logGroupFields`.
#[test]
fn get_log_group_fields_returns_stub() {
    let svc = make_service();
    create_group(&svc, "fields-group");

    let payload = json!({ "logGroupName": "fields-group" });
    let response = svc
        .get_log_group_fields(&make_request("GetLogGroupFields", payload))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    let field_count = parsed["logGroupFields"].as_array().unwrap().len();
    assert_eq!(field_count, 2);
}
5925
/// `TestMetricFilter` with the pattern "ERROR" over three messages, two of
/// which contain "ERROR", must report two matches.
#[test]
fn test_metric_filter_matches() {
    let svc = make_service();

    let payload = json!({
        "filterPattern": "ERROR",
        "logEventMessages": ["ERROR: oops", "INFO: ok", "ERROR: again"],
    });
    let response = svc
        .test_metric_filter(&make_request("TestMetricFilter", payload))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    let match_count = parsed["matches"].as_array().unwrap().len();
    assert_eq!(match_count, 2);
}
5941
/// `StopQuery` on a query whose status is "Running" must report success and
/// leave the stored query with status "Cancelled".
#[test]
fn stop_query_marks_as_cancelled() {
    let svc = make_service();
    create_group(&svc, "sq-group");

    let start_payload = json!({
        "logGroupName": "sq-group",
        "startTime": 0,
        "endTime": 9999999999i64,
        "queryString": "fields @timestamp",
    });
    let started = svc
        .start_query(&make_request("StartQuery", start_payload))
        .unwrap();
    let started_body = serde_json::from_slice::<Value>(&started.body).unwrap();
    let query_id = started_body["queryId"].as_str().unwrap().to_string();

    // Force the status to "Running"; the write guard is a temporary that is
    // released at the end of this statement, before StopQuery is issued.
    svc.state.write().queries.get_mut(&query_id).unwrap().status = String::from("Running");

    let stopped = svc
        .stop_query(&make_request("StopQuery", json!({ "queryId": &query_id })))
        .unwrap();
    let stopped_body = serde_json::from_slice::<Value>(&stopped.body).unwrap();
    assert_eq!(stopped_body["success"], true);

    let guard = svc.state.read();
    let final_status = &guard.queries[&query_id].status;
    assert_eq!(final_status, "Cancelled");
}
5974
/// `PutLogGroupDeletionProtection` with `deletionProtectionEnabled: true`
/// must set the `deletion_protection` flag on the stored group.
#[test]
fn put_log_group_deletion_protection() {
    let svc = make_service();
    create_group(&svc, "prot-group");

    let payload = json!({
        "logGroupIdentifier": "prot-group",
        "deletionProtectionEnabled": true,
    });
    svc.put_log_group_deletion_protection(&make_request(
        "PutLogGroupDeletionProtection",
        payload,
    ))
    .unwrap();

    let guard = svc.state.read();
    let protected = guard.log_groups["prot-group"].deletion_protection;
    assert!(protected);
}
5992
/// `GetLogRecord` must respond with a `logRecord` value that is a JSON object.
#[test]
fn get_log_record_returns_empty_stub() {
    let svc = make_service();

    let payload = json!({ "logRecordPointer": "some-pointer" });
    let response = svc
        .get_log_record(&make_request("GetLogRecord", payload))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    assert!(parsed["logRecord"].as_object().is_some());
}
6005
/// `ListAnomalies` on a fresh service must return an empty `anomalies` array.
#[test]
fn list_anomalies_returns_empty() {
    let svc = make_service();

    let response = svc
        .list_anomalies(&make_request("ListAnomalies", json!({})))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    let anomaly_count = parsed["anomalies"].as_array().unwrap().len();
    assert_eq!(anomaly_count, 0);
}
6015
/// `UpdateAnomaly` with an empty request body must succeed; the response
/// itself is not inspected.
#[test]
fn update_anomaly_noop() {
    let svc = make_service();
    let outcome = svc.update_anomaly(&make_request("UpdateAnomaly", json!({})));
    outcome.unwrap();
}
6022
/// Exercises create, describe, describe-batches and cancel for an import
/// task; after cancellation the task's `importStatus` must be "CANCELLED".
#[test]
fn import_task_lifecycle() {
    let svc = make_service();

    // Fetches the parsed DescribeImportTasks response.
    let describe_imports = || {
        let req = make_request("DescribeImportTasks", json!({}));
        let resp = svc.describe_import_tasks(&req).unwrap();
        serde_json::from_slice::<Value>(&resp.body).unwrap()
    };

    let create_payload = json!({
        "importSourceArn": "arn:aws:s3:::my-bucket/logs",
        "importRoleArn": "arn:aws:iam::123456789012:role/import-role"
    });
    let created = svc
        .create_import_task(&make_request("CreateImportTask", create_payload))
        .unwrap();
    let created_body = serde_json::from_slice::<Value>(&created.body).unwrap();
    let import_id = created_body["importId"].as_str().unwrap().to_string();

    let listing = describe_imports();
    assert_eq!(listing["imports"].as_array().unwrap().len(), 1);

    let batches = svc
        .describe_import_task_batches(&make_request(
            "DescribeImportTaskBatches",
            json!({ "importId": import_id }),
        ))
        .unwrap();
    let batches_body = serde_json::from_slice::<Value>(&batches.body).unwrap();
    assert!(batches_body["importBatches"].as_array().unwrap().is_empty());

    let cancel_req = make_request("CancelImportTask", json!({ "importId": import_id }));
    svc.cancel_import_task(&cancel_req).unwrap();

    let after_cancel = describe_imports();
    let status = after_cancel["imports"][0]["importStatus"].as_str().unwrap();
    assert_eq!(status, "CANCELLED");
}
6064
/// Exercises put, get, list and delete for an integration: it is listed
/// once after creation and the listing is empty after deletion.
#[test]
fn integration_lifecycle() {
    let svc = make_service();

    // Number of entries ListIntegrations reports in `integrationSummaries`.
    let summary_count = || {
        let req = make_request("ListIntegrations", json!({}));
        let resp = svc.list_integrations(&req).unwrap();
        let listing: Value = serde_json::from_slice(&resp.body).unwrap();
        listing["integrationSummaries"].as_array().unwrap().len()
    };

    let put_payload = json!({
        "integrationName": "test-int",
        "integrationType": "OPENSEARCH",
        "resourceConfig": { "openSearchResourceConfig": {} }
    });
    svc.put_integration(&make_request("PutIntegration", put_payload))
        .unwrap();

    let fetched = svc
        .get_integration(&make_request(
            "GetIntegration",
            json!({ "integrationName": "test-int" }),
        ))
        .unwrap();
    let fetched_body = serde_json::from_slice::<Value>(&fetched.body).unwrap();
    assert_eq!(fetched_body["integrationName"].as_str().unwrap(), "test-int");

    assert_eq!(summary_count(), 1);

    let delete_req = make_request(
        "DeleteIntegration",
        json!({ "integrationName": "test-int" }),
    );
    svc.delete_integration(&delete_req).unwrap();

    assert_eq!(summary_count(), 0);
}
6102
/// Exercises create, get, describe, update and delete for a lookup table,
/// addressing it by the ARN returned from creation. The table is listed
/// once after creation and the listing is empty after deletion.
#[test]
fn lookup_table_lifecycle() {
    let svc = make_service();

    // Number of entries DescribeLookupTables reports in `lookupTables`.
    let table_count = || {
        let req = make_request("DescribeLookupTables", json!({}));
        let resp = svc.describe_lookup_tables(&req).unwrap();
        let listing: Value = serde_json::from_slice(&resp.body).unwrap();
        listing["lookupTables"].as_array().unwrap().len()
    };

    let create_payload = json!({
        "lookupTableName": "test-table",
        "tableBody": "key,value\na,b"
    });
    let created = svc
        .create_lookup_table(&make_request("CreateLookupTable", create_payload))
        .unwrap();
    let created_body = serde_json::from_slice::<Value>(&created.body).unwrap();
    let table_arn = created_body["lookupTableArn"].as_str().unwrap().to_string();

    let fetched = svc
        .get_lookup_table(&make_request(
            "GetLookupTable",
            json!({ "lookupTableArn": table_arn }),
        ))
        .unwrap();
    let fetched_body = serde_json::from_slice::<Value>(&fetched.body).unwrap();
    assert_eq!(
        fetched_body["lookupTableName"].as_str().unwrap(),
        "test-table"
    );

    assert_eq!(table_count(), 1);

    let update_req = make_request(
        "UpdateLookupTable",
        json!({ "lookupTableArn": table_arn, "tableBody": "key,value\nc,d" }),
    );
    svc.update_lookup_table(&update_req).unwrap();

    let delete_req = make_request("DeleteLookupTable", json!({ "lookupTableArn": table_arn }));
    svc.delete_lookup_table(&delete_req).unwrap();

    assert_eq!(table_count(), 0);
}
6144
/// Exercises create, get, history, list, update and delete for a scheduled
/// query, addressing it by the ARN returned from creation. The history is
/// expected to be empty, the query is listed once after creation, and the
/// listing is empty after deletion.
#[test]
fn scheduled_query_lifecycle() {
    let svc = make_service();

    // Number of entries ListScheduledQueries reports in `scheduledQueries`.
    let query_count = || {
        let req = make_request("ListScheduledQueries", json!({}));
        let resp = svc.list_scheduled_queries(&req).unwrap();
        let listing: Value = serde_json::from_slice(&resp.body).unwrap();
        listing["scheduledQueries"].as_array().unwrap().len()
    };

    let create_payload = json!({
        "name": "test-sq",
        "queryString": "fields @timestamp | limit 10",
        "queryLanguage": "CWLI",
        "scheduleExpression": "rate(1 hour)",
        "executionRoleArn": "arn:aws:iam::123456789012:role/exec"
    });
    let created = svc
        .create_scheduled_query(&make_request("CreateScheduledQuery", create_payload))
        .unwrap();
    let created_body = serde_json::from_slice::<Value>(&created.body).unwrap();
    let query_arn = created_body["scheduledQueryArn"]
        .as_str()
        .unwrap()
        .to_string();

    let fetched = svc
        .get_scheduled_query(&make_request(
            "GetScheduledQuery",
            json!({ "identifier": query_arn }),
        ))
        .unwrap();
    let fetched_body = serde_json::from_slice::<Value>(&fetched.body).unwrap();
    assert_eq!(fetched_body["name"].as_str().unwrap(), "test-sq");

    let history_payload =
        json!({ "identifier": query_arn, "startTime": 0_i64, "endTime": 9999999999_i64 });
    let history = svc
        .get_scheduled_query_history(&make_request("GetScheduledQueryHistory", history_payload))
        .unwrap();
    let history_body = serde_json::from_slice::<Value>(&history.body).unwrap();
    assert!(history_body["triggerHistory"].as_array().unwrap().is_empty());

    assert_eq!(query_count(), 1);

    let update_payload = json!({
        "identifier": query_arn,
        "queryString": "fields @message | limit 5",
        "queryLanguage": "CWLI",
        "scheduleExpression": "rate(2 hours)",
        "executionRoleArn": "arn:aws:iam::123456789012:role/exec"
    });
    svc.update_scheduled_query(&make_request("UpdateScheduledQuery", update_payload))
        .unwrap();

    let delete_req = make_request("DeleteScheduledQuery", json!({ "identifier": query_arn }));
    svc.delete_scheduled_query(&delete_req).unwrap();

    assert_eq!(query_count(), 0);
}
6203
/// `StartLiveTail` must respond with a string `sessionId` nested under
/// `responseStream.sessionStart`.
#[test]
fn start_live_tail_returns_session() {
    let svc = make_service();
    let payload = json!({ "logGroupIdentifiers": ["/test/group"] });
    let response = svc
        .start_live_tail(&make_request("StartLiveTail", payload))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    let session_id = &parsed["responseStream"]["sessionStart"]["sessionId"];
    assert!(session_id.is_string());
}
6219
/// After creating one group, `describe_log_groups` must list exactly one
/// entry.
///
/// NOTE(review): despite its name, this test calls `describe_log_groups`
/// with a `DescribeLogGroups` request rather than a list operation —
/// confirm this is the intended coverage.
#[test]
fn list_log_groups_delegates_to_describe() {
    let svc = make_service();
    create_group(&svc, "/test/list");
    let response = svc
        .describe_log_groups(&make_request("DescribeLogGroups", json!({})))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    let group_count = parsed["logGroups"].as_array().unwrap().len();
    assert_eq!(group_count, 1);
}
6229
/// `ListLogGroupsForQuery` for the query id used here must return an empty
/// `logGroupIdentifiers` array.
#[test]
fn list_log_groups_for_query_returns_empty() {
    let svc = make_service();
    let payload = json!({ "queryId": "some-query-id" });
    let response = svc
        .list_log_groups_for_query(&make_request("ListLogGroupsForQuery", payload))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    let identifier_count = parsed["logGroupIdentifiers"].as_array().unwrap().len();
    assert_eq!(identifier_count, 0);
}
6241
/// `ListAggregateLogGroupSummaries` on a fresh service must return an empty
/// `aggregateLogGroupSummaries` array.
#[test]
fn list_aggregate_log_group_summaries_returns_empty() {
    let svc = make_service();
    let payload = json!({ "groupBy": "ACCOUNT" });
    let response = svc
        .list_aggregate_log_group_summaries(&make_request(
            "ListAggregateLogGroupSummaries",
            payload,
        ))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    let summary_count = parsed["aggregateLogGroupSummaries"]
        .as_array()
        .unwrap()
        .len();
    assert_eq!(summary_count, 0);
}
6256
/// `PutBearerTokenAuthentication` on an existing group must succeed.
///
/// Only the success of the call is checked; the test does not read back
/// any stored flag.
#[test]
fn put_bearer_token_authentication_stores_flag() {
    let svc = make_service();
    create_group(&svc, "/test/bearer");
    let payload = json!({
        "logGroupIdentifier": "/test/bearer",
        "bearerTokenAuthenticationEnabled": true
    });
    let outcome =
        svc.put_bearer_token_authentication(&make_request("PutBearerTokenAuthentication", payload));
    outcome.unwrap();
}
6270
/// `GetLogObject` must respond with a `logObject` value that is a JSON object.
#[test]
fn get_log_object_returns_stub() {
    let svc = make_service();
    let payload = json!({ "logObjectPointer": "some-pointer" });
    let response = svc
        .get_log_object(&make_request("GetLogObject", payload))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    assert!(parsed["logObject"].as_object().is_some());
}
6282
/// `GetLogFields` must respond with an empty `logFields` array.
#[test]
fn get_log_fields_returns_stub() {
    let svc = make_service();
    let payload = json!({ "dataSourceName": "test", "dataSourceType": "CW_LOG" });
    let response = svc
        .get_log_fields(&make_request("GetLogFields", payload))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    let field_count = parsed["logFields"].as_array().unwrap().len();
    assert_eq!(field_count, 0);
}
6294
/// Associates a source with an S3 table integration, checks that the
/// integration then lists one data source, and finally issues a
/// disassociate call that is only required to succeed.
#[test]
fn s3_table_integration_stubs() {
    let svc = make_service();

    let associate_payload = json!({
        "integrationArn": "arn:aws:logs:us-east-1:123456789012:integration:test",
        "dataSource": { "resourceArn": "arn:aws:logs:us-east-1:123456789012:log-group:test" }
    });
    svc.associate_source_to_s3_table_integration(&make_request(
        "AssociateSourceToS3TableIntegration",
        associate_payload,
    ))
    .unwrap();

    let list_payload = json!({
        "integrationArn": "arn:aws:logs:us-east-1:123456789012:integration:test"
    });
    let listed = svc
        .list_sources_for_s3_table_integration(&make_request(
            "ListSourcesForS3TableIntegration",
            list_payload,
        ))
        .unwrap();
    let listed_body = serde_json::from_slice::<Value>(&listed.body).unwrap();
    let source_count = listed_body["dataSources"].as_array().unwrap().len();
    assert_eq!(source_count, 1);

    let disassociate_req = make_request(
        "DisassociateSourceFromS3TableIntegration",
        json!({ "identifier": "arn:aws:logs:us-east-1:123456789012:integration:test" }),
    );
    svc.disassociate_source_from_s3_table_integration(&disassociate_req)
        .unwrap();
}
6325
/// Creates a delivery source, a delivery destination and a delivery, then
/// checks that `UpdateDeliveryConfiguration` succeeds for the id returned
/// by `CreateDelivery`.
#[test]
fn update_delivery_configuration_noop() {
    let svc = make_service();

    let source_payload = json!({
        "name": "test-ds",
        "resourceArn": "arn:aws:logs:us-east-1:123456789012:log-group:dummy",
        "logType": "APPLICATION_LOGS"
    });
    svc.put_delivery_source(&make_request("PutDeliverySource", source_payload))
        .unwrap();

    let destination_payload = json!({
        "name": "test-dd",
        "deliveryDestinationConfiguration": {
            "destinationResourceArn": "arn:aws:s3:::test-bucket"
        }
    });
    svc.put_delivery_destination(&make_request("PutDeliveryDestination", destination_payload))
        .unwrap();

    let delivery_payload = json!({
        "deliverySourceName": "test-ds",
        "deliveryDestinationArn": "arn:aws:logs:us-east-1:123456789012:delivery-destination:test-dd"
    });
    let created = svc
        .create_delivery(&make_request("CreateDelivery", delivery_payload))
        .unwrap();
    let created_body = serde_json::from_slice::<Value>(&created.body).unwrap();
    let delivery_id = created_body["delivery"]["id"].as_str().unwrap().to_string();

    let update_req = make_request("UpdateDeliveryConfiguration", json!({ "id": delivery_id }));
    svc.update_delivery_configuration(&update_req).unwrap();
}
6365
/// `DescribeConfigurationTemplates` must respond with an empty
/// `configurationTemplates` array.
#[test]
fn describe_configuration_templates_returns_empty() {
    let svc = make_service();
    let response = svc
        .describe_configuration_templates(&make_request(
            "DescribeConfigurationTemplates",
            json!({}),
        ))
        .unwrap();
    let parsed = serde_json::from_slice::<Value>(&response.body).unwrap();
    let template_count = parsed["configurationTemplates"].as_array().unwrap().len();
    assert_eq!(template_count, 0);
}
6377}