1use async_trait::async_trait;
2use chrono::Utc;
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
7use fakecloud_core::validation::*;
8
9use crate::state::{
10 AccountPolicy, AnomalyDetector, DataProtectionPolicy, Delivery, DeliveryDestination,
11 DeliverySource, Destination, ExportTask, ImportTask, IndexPolicy, Integration, LogEvent,
12 LogGroup, LogStream, LookupTable, MetricFilter, MetricTransformation, QueryDefinition,
13 QueryInfo, ResourcePolicy, ScheduledQuery, SharedLogsState, SubscriptionFilter, Transformer,
14};
15
/// Request handler for the `logs` service (see the `AwsService` impl for the
/// action names it dispatches on).
pub struct LogsService {
    /// Shared logs state handle; handlers access it through `read()` / `write()`.
    state: SharedLogsState,
}
19
20impl LogsService {
21 pub fn new(state: SharedLogsState) -> Self {
22 Self { state }
23 }
24}
25
/// `AwsService` implementation: routes each request to the handler method
/// matching its action name.
#[async_trait]
impl AwsService for LogsService {
    /// Returns the service identifier, `"logs"`.
    fn service_name(&self) -> &str {
        "logs"
    }

    /// Dispatches on `req.action` to the corresponding handler method.
    ///
    /// # Errors
    ///
    /// Returns whatever error the selected handler produces, or
    /// `AwsServiceError::action_not_implemented("logs", ..)` when the action
    /// name matches none of the arms below.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        match req.action.as_str() {
            "CreateLogGroup" => self.create_log_group(&req),
            "DeleteLogGroup" => self.delete_log_group(&req),
            "DescribeLogGroups" => self.describe_log_groups(&req),
            "CreateLogStream" => self.create_log_stream(&req),
            "DeleteLogStream" => self.delete_log_stream(&req),
            "DescribeLogStreams" => self.describe_log_streams(&req),
            "PutLogEvents" => self.put_log_events(&req),
            "GetLogEvents" => self.get_log_events(&req),
            "FilterLogEvents" => self.filter_log_events(&req),
            "TagLogGroup" => self.tag_log_group(&req),
            "UntagLogGroup" => self.untag_log_group(&req),
            "ListTagsLogGroup" => self.list_tags_log_group(&req),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsForResource" => self.list_tags_for_resource(&req),
            "PutRetentionPolicy" => self.put_retention_policy(&req),
            "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
            "PutSubscriptionFilter" => self.put_subscription_filter(&req),
            "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
            "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
            "PutMetricFilter" => self.put_metric_filter(&req),
            "DescribeMetricFilters" => self.describe_metric_filters(&req),
            "DeleteMetricFilter" => self.delete_metric_filter(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "DescribeResourcePolicies" => self.describe_resource_policies(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            "PutDestination" => self.put_destination(&req),
            "DescribeDestinations" => self.describe_destinations(&req),
            "DeleteDestination" => self.delete_destination(&req),
            "PutDestinationPolicy" => self.put_destination_policy(&req),
            "StartQuery" => self.start_query(&req),
            "GetQueryResults" => self.get_query_results(&req),
            "DescribeQueries" => self.describe_queries(&req),
            "CreateExportTask" => self.create_export_task(&req),
            "DescribeExportTasks" => self.describe_export_tasks(&req),
            "CancelExportTask" => self.cancel_export_task(&req),
            "PutDeliveryDestination" => self.put_delivery_destination(&req),
            "GetDeliveryDestination" => self.get_delivery_destination(&req),
            "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
            "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
            "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
            "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
            "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
            "PutDeliverySource" => self.put_delivery_source(&req),
            "GetDeliverySource" => self.get_delivery_source(&req),
            "DescribeDeliverySources" => self.describe_delivery_sources(&req),
            "DeleteDeliverySource" => self.delete_delivery_source(&req),
            "CreateDelivery" => self.create_delivery(&req),
            "GetDelivery" => self.get_delivery(&req),
            "DescribeDeliveries" => self.describe_deliveries(&req),
            "DeleteDelivery" => self.delete_delivery(&req),
            "AssociateKmsKey" => self.associate_kms_key(&req),
            "DisassociateKmsKey" => self.disassociate_kms_key(&req),
            "PutQueryDefinition" => self.put_query_definition(&req),
            "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
            "DeleteQueryDefinition" => self.delete_query_definition(&req),
            "PutAccountPolicy" => self.put_account_policy(&req),
            "DescribeAccountPolicies" => self.describe_account_policies(&req),
            "DeleteAccountPolicy" => self.delete_account_policy(&req),
            "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
            "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
            "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
            "PutIndexPolicy" => self.put_index_policy(&req),
            "DescribeIndexPolicies" => self.describe_index_policies(&req),
            "DeleteIndexPolicy" => self.delete_index_policy(&req),
            "DescribeFieldIndexes" => self.describe_field_indexes(&req),
            "PutTransformer" => self.put_transformer(&req),
            "GetTransformer" => self.get_transformer(&req),
            "DeleteTransformer" => self.delete_transformer(&req),
            "TestTransformer" => self.test_transformer(&req),
            "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
            "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
            "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
            "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
            "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
            "GetLogGroupFields" => self.get_log_group_fields(&req),
            "TestMetricFilter" => self.test_metric_filter(&req),
            "StopQuery" => self.stop_query(&req),
            "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
            "GetLogRecord" => self.get_log_record(&req),
            "ListAnomalies" => self.list_anomalies(&req),
            "UpdateAnomaly" => self.update_anomaly(&req),
            "CreateImportTask" => self.create_import_task(&req),
            "DescribeImportTasks" => self.describe_import_tasks(&req),
            "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
            "CancelImportTask" => self.cancel_import_task(&req),
            "PutIntegration" => self.put_integration(&req),
            "GetIntegration" => self.get_integration(&req),
            "DeleteIntegration" => self.delete_integration(&req),
            "ListIntegrations" => self.list_integrations(&req),
            "CreateLookupTable" => self.create_lookup_table(&req),
            "GetLookupTable" => self.get_lookup_table(&req),
            "DescribeLookupTables" => self.describe_lookup_tables(&req),
            "DeleteLookupTable" => self.delete_lookup_table(&req),
            "UpdateLookupTable" => self.update_lookup_table(&req),
            "CreateScheduledQuery" => self.create_scheduled_query(&req),
            "GetScheduledQuery" => self.get_scheduled_query(&req),
            "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
            "ListScheduledQueries" => self.list_scheduled_queries(&req),
            "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
            "UpdateScheduledQuery" => self.update_scheduled_query(&req),
            "StartLiveTail" => self.start_live_tail(&req),
            "ListLogGroups" => self.list_log_groups(&req),
            "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
            "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
            "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
            "GetLogObject" => self.get_log_object(&req),
            "GetLogFields" => self.get_log_fields(&req),
            "AssociateSourceToS3TableIntegration" => {
                self.associate_source_to_s3_table_integration(&req)
            }
            "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
            "DisassociateSourceFromS3TableIntegration" => {
                self.disassociate_source_from_s3_table_integration(&req)
            }
            "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
            "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
            // Anything else is reported as not implemented for this service.
            _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
        }
    }

    /// Returns the action names this service advertises.
    ///
    /// The list mirrors the match arms in `handle`, in the same order; keep
    /// the two in sync when adding or removing an action.
    fn supported_actions(&self) -> &[&str] {
        &[
            "CreateLogGroup",
            "DeleteLogGroup",
            "DescribeLogGroups",
            "CreateLogStream",
            "DeleteLogStream",
            "DescribeLogStreams",
            "PutLogEvents",
            "GetLogEvents",
            "FilterLogEvents",
            "TagLogGroup",
            "UntagLogGroup",
            "ListTagsLogGroup",
            "TagResource",
            "UntagResource",
            "ListTagsForResource",
            "PutRetentionPolicy",
            "DeleteRetentionPolicy",
            "PutSubscriptionFilter",
            "DescribeSubscriptionFilters",
            "DeleteSubscriptionFilter",
            "PutMetricFilter",
            "DescribeMetricFilters",
            "DeleteMetricFilter",
            "PutResourcePolicy",
            "DescribeResourcePolicies",
            "DeleteResourcePolicy",
            "PutDestination",
            "DescribeDestinations",
            "DeleteDestination",
            "PutDestinationPolicy",
            "StartQuery",
            "GetQueryResults",
            "DescribeQueries",
            "CreateExportTask",
            "DescribeExportTasks",
            "CancelExportTask",
            "PutDeliveryDestination",
            "GetDeliveryDestination",
            "DescribeDeliveryDestinations",
            "DeleteDeliveryDestination",
            "PutDeliveryDestinationPolicy",
            "GetDeliveryDestinationPolicy",
            "DeleteDeliveryDestinationPolicy",
            "PutDeliverySource",
            "GetDeliverySource",
            "DescribeDeliverySources",
            "DeleteDeliverySource",
            "CreateDelivery",
            "GetDelivery",
            "DescribeDeliveries",
            "DeleteDelivery",
            "AssociateKmsKey",
            "DisassociateKmsKey",
            "PutQueryDefinition",
            "DescribeQueryDefinitions",
            "DeleteQueryDefinition",
            "PutAccountPolicy",
            "DescribeAccountPolicies",
            "DeleteAccountPolicy",
            "PutDataProtectionPolicy",
            "GetDataProtectionPolicy",
            "DeleteDataProtectionPolicy",
            "PutIndexPolicy",
            "DescribeIndexPolicies",
            "DeleteIndexPolicy",
            "DescribeFieldIndexes",
            "PutTransformer",
            "GetTransformer",
            "DeleteTransformer",
            "TestTransformer",
            "CreateLogAnomalyDetector",
            "GetLogAnomalyDetector",
            "DeleteLogAnomalyDetector",
            "ListLogAnomalyDetectors",
            "UpdateLogAnomalyDetector",
            "GetLogGroupFields",
            "TestMetricFilter",
            "StopQuery",
            "PutLogGroupDeletionProtection",
            "GetLogRecord",
            "ListAnomalies",
            "UpdateAnomaly",
            "CreateImportTask",
            "DescribeImportTasks",
            "DescribeImportTaskBatches",
            "CancelImportTask",
            "PutIntegration",
            "GetIntegration",
            "DeleteIntegration",
            "ListIntegrations",
            "CreateLookupTable",
            "GetLookupTable",
            "DescribeLookupTables",
            "DeleteLookupTable",
            "UpdateLookupTable",
            "CreateScheduledQuery",
            "GetScheduledQuery",
            "GetScheduledQueryHistory",
            "ListScheduledQueries",
            "DeleteScheduledQuery",
            "UpdateScheduledQuery",
            "StartLiveTail",
            "ListLogGroups",
            "ListLogGroupsForQuery",
            "ListAggregateLogGroupSummaries",
            "PutBearerTokenAuthentication",
            "GetLogObject",
            "GetLogFields",
            "AssociateSourceToS3TableIntegration",
            "ListSourcesForS3TableIntegration",
            "DisassociateSourceFromS3TableIntegration",
            "UpdateDeliveryConfiguration",
            "DescribeConfigurationTemplates",
        ]
    }
}
273
274fn body_json(req: &AwsRequest) -> Value {
275 serde_json::from_slice(&req.body).unwrap_or(Value::Null)
276}
277
278fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
279 body[field].as_str().ok_or_else(|| {
280 AwsServiceError::aws_error(
281 StatusCode::BAD_REQUEST,
282 "InvalidParameterException",
283 format!("{field} is required"),
284 )
285 })
286}
287
288fn dd_config_json(config: &std::collections::HashMap<String, String>) -> Value {
291 let mut m: serde_json::Map<String, Value> =
292 config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
293 m.entry("destinationResourceArn".to_string())
294 .or_insert_with(|| json!(""));
295 Value::Object(m)
296}
297
/// Produces a sequence token: the current time in nanoseconds since the Unix
/// epoch, rendered as a zero-padded 38-digit decimal string.
///
/// A clock set before the epoch is treated as a zero duration.
fn generate_sequence_token() -> String {
    // Reducing modulo 10^38 keeps the value at no more than 38 digits.
    const MODULUS: u128 = 10u128.pow(38);

    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    let token_value = since_epoch.as_nanos() % MODULUS;
    format!("{token_value:038}")
}
307
308fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
309 AwsServiceError::aws_error(
310 StatusCode::BAD_REQUEST,
311 "InvalidParameterException",
312 format!(
313 "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
314 ),
315 )
316}
317
318impl LogsService {
319 fn create_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
322 let body = body_json(req);
323 let name = body["logGroupName"]
324 .as_str()
325 .ok_or_else(|| {
326 AwsServiceError::aws_error(
327 StatusCode::BAD_REQUEST,
328 "InvalidParameterException",
329 "logGroupName is required",
330 )
331 })?
332 .to_string();
333
334 validate_string_length("logGroupName", &name, 1, 512)?;
335 validate_optional_string_length("kmsKeyId", body["kmsKeyId"].as_str(), 1, 256)?;
336
337 let mut state = self.state.write();
338 if state.log_groups.contains_key(&name) {
339 return Err(AwsServiceError::aws_error(
340 StatusCode::BAD_REQUEST,
341 "ResourceAlreadyExistsException",
342 format!("The specified log group already exists: {name}"),
343 ));
344 }
345
346 let arn = format!(
347 "arn:aws:logs:{}:{}:log-group:{}:*",
348 state.region, state.account_id, name
349 );
350 let now = Utc::now().timestamp_millis();
351
352 let tags = body["tags"]
353 .as_object()
354 .map(|m| {
355 m.iter()
356 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
357 .collect()
358 })
359 .unwrap_or_default();
360
361 let kms_key_id = body["kmsKeyId"].as_str().map(|s| s.to_string());
362
363 state.log_groups.insert(
364 name.clone(),
365 LogGroup {
366 name,
367 arn,
368 creation_time: now,
369 retention_in_days: None,
370 kms_key_id,
371 tags,
372 log_streams: std::collections::HashMap::new(),
373 stored_bytes: 0,
374 subscription_filters: Vec::new(),
375 data_protection_policy: None,
376 index_policies: Vec::new(),
377 transformer: None,
378 deletion_protection: false,
379 },
380 );
381
382 Ok(AwsResponse::json(StatusCode::OK, "{}"))
383 }
384
385 fn delete_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
386 let body = body_json(req);
387 let name = body["logGroupName"].as_str().ok_or_else(|| {
388 AwsServiceError::aws_error(
389 StatusCode::BAD_REQUEST,
390 "InvalidParameterException",
391 "logGroupName is required",
392 )
393 })?;
394
395 validate_string_length("logGroupName", name, 1, 512)?;
396
397 let mut state = self.state.write();
398 if let Some(group) = state.log_groups.get(name) {
400 if group.deletion_protection {
401 return Err(AwsServiceError::aws_error(
402 StatusCode::BAD_REQUEST,
403 "OperationAbortedException",
404 format!("Log group {name} has deletion protection enabled"),
405 ));
406 }
407 }
408 if state.log_groups.remove(name).is_none() {
409 return Err(AwsServiceError::aws_error(
410 StatusCode::BAD_REQUEST,
411 "ResourceNotFoundException",
412 format!("The specified log group does not exist: {name}"),
413 ));
414 }
415
416 Ok(AwsResponse::json(StatusCode::OK, "{}"))
417 }
418
419 fn describe_log_groups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
420 let body = body_json(req);
421 let prefix = body["logGroupNamePrefix"].as_str().unwrap_or("");
422 let pattern = body["logGroupNamePattern"].as_str().unwrap_or("");
423 let limit = body["limit"].as_i64().unwrap_or(50) as usize;
424 let next_token = body["nextToken"].as_str();
425
426 validate_optional_string_length(
427 "logGroupNamePrefix",
428 body["logGroupNamePrefix"].as_str(),
429 1,
430 512,
431 )?;
432 validate_optional_string_length(
433 "logGroupNamePattern",
434 body["logGroupNamePattern"].as_str(),
435 0,
436 512,
437 )?;
438 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
439 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
440 validate_optional_enum_value(
441 "logGroupClass",
442 &body["logGroupClass"],
443 &["STANDARD", "INFREQUENT_ACCESS", "DELIVERY"],
444 )?;
445
446 let state = self.state.read();
447 let mut groups: Vec<&LogGroup> = state
448 .log_groups
449 .values()
450 .filter(|g| {
451 (prefix.is_empty() || g.name.starts_with(prefix))
452 && (pattern.is_empty() || g.name.contains(pattern))
453 })
454 .collect();
455 groups.sort_by(|a, b| a.name.cmp(&b.name));
456
457 let start_idx = if let Some(token) = next_token {
459 groups
460 .iter()
461 .position(|g| g.name.as_str() > token)
462 .unwrap_or(groups.len())
463 } else {
464 0
465 };
466
467 let page = &groups[start_idx..];
468 let has_more = page.len() > limit;
469 let page = if has_more { &page[..limit] } else { page };
470
471 let log_groups: Vec<Value> = page
472 .iter()
473 .map(|g| {
474 let log_group_arn = g.arn.trim_end_matches(":*").to_string();
475 let mut obj = json!({
476 "logGroupName": g.name,
477 "arn": g.arn,
478 "logGroupArn": log_group_arn,
479 "creationTime": g.creation_time,
480 "storedBytes": g.stored_bytes,
481 "metricFilterCount": 0,
482 });
483 if let Some(days) = g.retention_in_days {
484 obj["retentionInDays"] = json!(days);
485 }
486 if let Some(ref kms) = g.kms_key_id {
487 obj["kmsKeyId"] = json!(kms);
488 }
489 obj
490 })
491 .collect();
492
493 let mut result = json!({ "logGroups": log_groups });
494 if has_more {
495 if let Some(last) = page.last() {
496 result["nextToken"] = json!(last.name);
497 }
498 }
499
500 Ok(AwsResponse::json(
501 StatusCode::OK,
502 serde_json::to_string(&result).unwrap(),
503 ))
504 }
505
506 fn create_log_stream(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
509 let body = body_json(req);
510 let group_name = body["logGroupName"].as_str().ok_or_else(|| {
511 AwsServiceError::aws_error(
512 StatusCode::BAD_REQUEST,
513 "InvalidParameterException",
514 "logGroupName is required",
515 )
516 })?;
517 let stream_name = body["logStreamName"]
518 .as_str()
519 .ok_or_else(|| {
520 AwsServiceError::aws_error(
521 StatusCode::BAD_REQUEST,
522 "InvalidParameterException",
523 "logStreamName is required",
524 )
525 })?
526 .to_string();
527
528 validate_string_length("logGroupName", group_name, 1, 512)?;
529 validate_string_length("logStreamName", &stream_name, 1, 512)?;
530
531 let mut state = self.state.write();
532 let region = state.region.clone();
533 let account_id = state.account_id.clone();
534
535 let group = state.log_groups.get_mut(group_name).ok_or_else(|| {
536 AwsServiceError::aws_error(
537 StatusCode::BAD_REQUEST,
538 "ResourceNotFoundException",
539 format!("The specified log group does not exist: {group_name}"),
540 )
541 })?;
542
543 if group.log_streams.contains_key(&stream_name) {
544 return Err(AwsServiceError::aws_error(
545 StatusCode::BAD_REQUEST,
546 "ResourceAlreadyExistsException",
547 format!("The specified log stream already exists: {stream_name}"),
548 ));
549 }
550
551 let arn = format!(
552 "arn:aws:logs:{region}:{account_id}:log-group:{group_name}:log-stream:{stream_name}",
553 );
554 let now = Utc::now().timestamp_millis();
555
556 group.log_streams.insert(
557 stream_name.clone(),
558 LogStream {
559 name: stream_name,
560 arn,
561 creation_time: now,
562 first_event_timestamp: None,
563 last_event_timestamp: None,
564 last_ingestion_time: None,
565 upload_sequence_token: generate_sequence_token(),
566 events: Vec::new(),
567 },
568 );
569
570 Ok(AwsResponse::json(StatusCode::OK, "{}"))
571 }
572
573 fn delete_log_stream(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
574 let body = body_json(req);
575 let group_name = body["logGroupName"].as_str().ok_or_else(|| {
576 AwsServiceError::aws_error(
577 StatusCode::BAD_REQUEST,
578 "InvalidParameterException",
579 "logGroupName is required",
580 )
581 })?;
582 let stream_name = body["logStreamName"].as_str().ok_or_else(|| {
583 AwsServiceError::aws_error(
584 StatusCode::BAD_REQUEST,
585 "InvalidParameterException",
586 "logStreamName is required",
587 )
588 })?;
589
590 validate_string_length("logGroupName", group_name, 1, 512)?;
591 validate_string_length("logStreamName", stream_name, 1, 512)?;
592
593 let mut state = self.state.write();
594 let group = state.log_groups.get_mut(group_name).ok_or_else(|| {
595 AwsServiceError::aws_error(
596 StatusCode::BAD_REQUEST,
597 "ResourceNotFoundException",
598 format!("The specified log group does not exist: {group_name}"),
599 )
600 })?;
601
602 if group.log_streams.remove(stream_name).is_none() {
603 return Err(AwsServiceError::aws_error(
604 StatusCode::BAD_REQUEST,
605 "ResourceNotFoundException",
606 format!("The specified log stream does not exist: {stream_name}"),
607 ));
608 }
609
610 Ok(AwsResponse::json(StatusCode::OK, "{}"))
611 }
612
613 fn describe_log_streams(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
614 let body = body_json(req);
615
616 let group_name = if let Some(name) = body["logGroupName"].as_str() {
618 name.to_string()
619 } else if let Some(identifier) = body["logGroupIdentifier"].as_str() {
620 if identifier.ends_with(":*") {
622 return Err(AwsServiceError::aws_error(
623 StatusCode::BAD_REQUEST,
624 "InvalidParameterException",
625 format!(
626 "1 validation error detected: Value '{}' at 'logGroupIdentifier' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\w#+=/:,.@-]*",
627 identifier
628 ),
629 ));
630 }
631 if identifier.starts_with("arn:aws:logs:") {
633 extract_log_group_from_arn(identifier).unwrap_or_else(|| identifier.to_string())
634 } else {
635 identifier.to_string()
636 }
637 } else {
638 return Err(AwsServiceError::aws_error(
639 StatusCode::BAD_REQUEST,
640 "InvalidParameterException",
641 "logGroupName is required",
642 ));
643 };
644
645 let prefix = body["logStreamNamePrefix"].as_str().unwrap_or("");
646 let limit = body["limit"].as_i64().unwrap_or(50) as usize;
647 let order_by = body["orderBy"].as_str().unwrap_or("LogStreamName");
648 let next_token = body["nextToken"].as_str();
649
650 validate_optional_string_length("logGroupName", body["logGroupName"].as_str(), 1, 512)?;
651 validate_optional_string_length(
652 "logGroupIdentifier",
653 body["logGroupIdentifier"].as_str(),
654 1,
655 2048,
656 )?;
657 validate_optional_string_length(
658 "logStreamNamePrefix",
659 body["logStreamNamePrefix"].as_str(),
660 1,
661 512,
662 )?;
663
664 if limit > 50 {
666 return Err(validation_error(
667 "limit",
668 &limit.to_string(),
669 "Member must have value less than or equal to 50",
670 ));
671 }
672
673 if order_by != "LogStreamName" && order_by != "LastEventTime" {
675 return Err(validation_error(
676 "orderBy",
677 order_by,
678 "Member must satisfy enum value set: [LogStreamName, LastEventTime]",
679 ));
680 }
681
682 if order_by == "LastEventTime" && !prefix.is_empty() {
684 return Err(AwsServiceError::aws_error(
685 StatusCode::BAD_REQUEST,
686 "InvalidParameterException",
687 "Cannot order by LastEventTime with a logStreamNamePrefix.",
688 ));
689 }
690
691 let state = self.state.read();
692 let group = state.log_groups.get(group_name.as_str()).ok_or_else(|| {
693 AwsServiceError::aws_error(
694 StatusCode::BAD_REQUEST,
695 "ResourceNotFoundException",
696 format!("The specified log group does not exist: {group_name}"),
697 )
698 })?;
699
700 let mut streams: Vec<&LogStream> = group
701 .log_streams
702 .values()
703 .filter(|s| prefix.is_empty() || s.name.starts_with(prefix))
704 .collect();
705 streams.sort_by(|a, b| a.name.cmp(&b.name));
706
707 let start_idx = if let Some(token) = next_token {
709 if let Some((_group, last_stream)) = token.split_once('@') {
710 streams
711 .iter()
712 .position(|s| s.name.as_str() > last_stream)
713 .unwrap_or(streams.len())
714 } else {
715 streams.len() }
717 } else {
718 0
719 };
720
721 let page = &streams[start_idx..];
722 let has_more = page.len() > limit;
723 let page = if has_more { &page[..limit] } else { page };
724
725 let log_streams: Vec<Value> = page
726 .iter()
727 .map(|s| {
728 let mut obj = json!({
729 "logStreamName": s.name,
730 "arn": s.arn,
731 "creationTime": s.creation_time,
732 "uploadSequenceToken": s.upload_sequence_token,
733 });
734 if let Some(ts) = s.first_event_timestamp {
735 obj["firstEventTimestamp"] = json!(ts);
736 }
737 if let Some(ts) = s.last_event_timestamp {
738 obj["lastEventTimestamp"] = json!(ts);
739 }
740 if let Some(ts) = s.last_ingestion_time {
741 obj["lastIngestionTime"] = json!(ts);
742 }
743 obj
744 })
745 .collect();
746
747 let mut result = json!({ "logStreams": log_streams });
748 if has_more {
749 if let Some(last) = page.last() {
750 result["nextToken"] = json!(format!("{}@{}", group_name, last.name));
751 }
752 }
753
754 Ok(AwsResponse::json(
755 StatusCode::OK,
756 serde_json::to_string(&result).unwrap(),
757 ))
758 }
759
760 fn put_log_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
763 let body = body_json(req);
764 let group_name = body["logGroupName"].as_str().ok_or_else(|| {
765 AwsServiceError::aws_error(
766 StatusCode::BAD_REQUEST,
767 "InvalidParameterException",
768 "logGroupName is required",
769 )
770 })?;
771 let stream_name = body["logStreamName"].as_str().ok_or_else(|| {
772 AwsServiceError::aws_error(
773 StatusCode::BAD_REQUEST,
774 "InvalidParameterException",
775 "logStreamName is required",
776 )
777 })?;
778
779 validate_string_length("logGroupName", group_name, 1, 512)?;
780 validate_string_length("logStreamName", stream_name, 1, 512)?;
781
782 let log_events = body["logEvents"].as_array().ok_or_else(|| {
783 AwsServiceError::aws_error(
784 StatusCode::BAD_REQUEST,
785 "InvalidParameterException",
786 "logEvents is required",
787 )
788 })?;
789
790 let now = Utc::now().timestamp_millis();
791
792 let timestamps: Vec<i64> = log_events
794 .iter()
795 .map(|e| e["timestamp"].as_i64().unwrap_or(now))
796 .collect();
797 for i in 1..timestamps.len() {
798 if timestamps[i] < timestamps[i - 1] {
799 return Err(AwsServiceError::aws_error(
800 StatusCode::BAD_REQUEST,
801 "InvalidParameterException",
802 "Log events in a single PutLogEvents request must be in chronological order.",
803 ));
804 }
805 }
806
807 let fourteen_days_ms = 14 * 24 * 60 * 60 * 1000i64;
809 let two_hours_ms = 2 * 60 * 60 * 1000i64;
810 let mut too_old_end_index: Option<usize> = None;
811 let mut too_new_start_index: Option<usize> = None;
812
813 for (i, ts) in timestamps.iter().enumerate() {
814 if now.saturating_sub(*ts) > fourteen_days_ms {
815 too_old_end_index = Some(i);
816 } else if ts.saturating_sub(now) > two_hours_ms && too_new_start_index.is_none() {
817 too_new_start_index = Some(i);
818 }
819 }
820
821 let mut new_events: Vec<LogEvent> = Vec::new();
823 let mut rejected_info = json!({});
824 let mut has_rejected = false;
825
826 for (i, e) in log_events.iter().enumerate() {
827 let ts = e["timestamp"].as_i64().unwrap_or(now);
828 let is_too_old = too_old_end_index.is_some() && i <= too_old_end_index.unwrap();
829 let is_too_new = too_new_start_index.is_some() && i >= too_new_start_index.unwrap();
830
831 if is_too_old || is_too_new {
832 continue;
833 }
834
835 new_events.push(LogEvent {
836 timestamp: ts,
837 message: e["message"].as_str().unwrap_or("").to_string(),
838 ingestion_time: now,
839 });
840 }
841
842 if let Some(idx) = too_old_end_index {
843 rejected_info["tooOldLogEventEndIndex"] = json!(idx);
844 has_rejected = true;
845 }
846 if let Some(idx) = too_new_start_index {
847 rejected_info["tooNewLogEventStartIndex"] = json!(idx);
848 has_rejected = true;
849 }
850
851 let mut state = self.state.write();
852 let group = state.log_groups.get_mut(group_name).ok_or_else(|| {
853 AwsServiceError::aws_error(
854 StatusCode::BAD_REQUEST,
855 "ResourceNotFoundException",
856 format!("The specified log group does not exist: {group_name}"),
857 )
858 })?;
859
860 let stream = group.log_streams.get_mut(stream_name).ok_or_else(|| {
861 AwsServiceError::aws_error(
862 StatusCode::BAD_REQUEST,
863 "ResourceNotFoundException",
864 "The specified log stream does not exist.",
865 )
866 })?;
867
868 for event in &new_events {
870 if stream.first_event_timestamp.is_none()
871 || Some(event.timestamp) < stream.first_event_timestamp
872 {
873 stream.first_event_timestamp = Some(event.timestamp);
874 }
875 if stream.last_event_timestamp.is_none()
876 || Some(event.timestamp) > stream.last_event_timestamp
877 {
878 stream.last_event_timestamp = Some(event.timestamp);
879 }
880 group.stored_bytes += event.message.len() as i64 + 26;
881 }
882 stream.last_ingestion_time = Some(now);
883
884 stream.upload_sequence_token = generate_sequence_token();
886
887 stream.events.append(&mut new_events);
888 stream.events.sort_by_key(|e| e.timestamp);
889
890 let mut response = json!({
891 "nextSequenceToken": stream.upload_sequence_token,
892 });
893 if has_rejected {
894 response["rejectedLogEventsInfo"] = rejected_info;
895 }
896
897 Ok(AwsResponse::json(
898 StatusCode::OK,
899 serde_json::to_string(&response).unwrap(),
900 ))
901 }
902
903 fn get_log_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
904 let body = body_json(req);
905
906 let group_name = if let Some(name) = body["logGroupName"].as_str() {
908 name.to_string()
909 } else if let Some(identifier) = body["logGroupIdentifier"].as_str() {
910 if identifier.ends_with(":*") {
911 return Err(AwsServiceError::aws_error(
912 StatusCode::BAD_REQUEST,
913 "InvalidParameterException",
914 format!(
915 "1 validation error detected: Value '{}' at 'logGroupIdentifier' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\w#+=/:,.@-]*",
916 identifier
917 ),
918 ));
919 }
920 if identifier.starts_with("arn:aws:logs:") {
921 extract_log_group_from_arn(identifier).unwrap_or_else(|| identifier.to_string())
922 } else {
923 identifier.to_string()
924 }
925 } else {
926 return Err(AwsServiceError::aws_error(
927 StatusCode::BAD_REQUEST,
928 "InvalidParameterException",
929 "logGroupName is required",
930 ));
931 };
932
933 let stream_name = body["logStreamName"].as_str().ok_or_else(|| {
934 AwsServiceError::aws_error(
935 StatusCode::BAD_REQUEST,
936 "InvalidParameterException",
937 "logStreamName is required",
938 )
939 })?;
940
941 validate_optional_string_length("logGroupName", body["logGroupName"].as_str(), 1, 512)?;
942 validate_optional_string_length(
943 "logGroupIdentifier",
944 body["logGroupIdentifier"].as_str(),
945 1,
946 2048,
947 )?;
948 validate_string_length("logStreamName", stream_name, 1, 512)?;
949
950 let start_time = body["startTime"].as_i64();
951 let end_time = body["endTime"].as_i64();
952 let limit = body["limit"].as_i64().unwrap_or(10000) as usize;
953 let start_from_head = body["startFromHead"].as_bool().unwrap_or(false);
954 let next_token = body["nextToken"].as_str();
955
956 if limit > 10000 {
958 return Err(validation_error(
959 "limit",
960 &limit.to_string(),
961 "Member must have value less than or equal to 10000",
962 ));
963 }
964
965 if let Some(token) = next_token {
967 if !token.starts_with("f/") && !token.starts_with("b/") {
968 return Err(AwsServiceError::aws_error(
969 StatusCode::BAD_REQUEST,
970 "InvalidParameterException",
971 "The specified nextToken is invalid.",
972 ));
973 }
974 let num_part = &token[2..];
975 if num_part.len() != 56 || num_part.parse::<u128>().is_err() {
976 return Err(AwsServiceError::aws_error(
977 StatusCode::BAD_REQUEST,
978 "InvalidParameterException",
979 "The specified nextToken is invalid.",
980 ));
981 }
982 }
983
984 let state = self.state.read();
985 let group = state.log_groups.get(group_name.as_str()).ok_or_else(|| {
986 AwsServiceError::aws_error(
987 StatusCode::BAD_REQUEST,
988 "ResourceNotFoundException",
989 format!("The specified log group does not exist: {group_name}"),
990 )
991 })?;
992
993 let stream = group.log_streams.get(stream_name).ok_or_else(|| {
994 AwsServiceError::aws_error(
995 StatusCode::BAD_REQUEST,
996 "ResourceNotFoundException",
997 format!("The specified log stream does not exist: {stream_name}"),
998 )
999 })?;
1000
1001 let all_events: Vec<&LogEvent> = stream
1003 .events
1004 .iter()
1005 .filter(|e| {
1006 if let Some(start) = start_time {
1007 if e.timestamp < start {
1008 return false;
1009 }
1010 }
1011 if let Some(end) = end_time {
1012 if e.timestamp >= end {
1013 return false;
1014 }
1015 }
1016 true
1017 })
1018 .collect();
1019
1020 let total = all_events.len();
1021
1022 let (start_idx, is_forward) = if let Some(token) = next_token {
1024 let is_forward = token.starts_with("f/");
1025 let idx: usize = token[2..].parse().unwrap_or(0);
1026 if is_forward {
1027 (idx + 1, true)
1029 } else {
1030 (idx, false)
1032 }
1033 } else {
1034 (0, start_from_head)
1035 };
1036
1037 let events_slice: Vec<&LogEvent>;
1038 let next_forward_idx: usize;
1039 let next_backward_idx: usize;
1040
1041 if is_forward || start_from_head && next_token.is_none() {
1042 let end_idx = std::cmp::min(start_idx + limit, total);
1044 if start_idx >= total {
1045 events_slice = Vec::new();
1046 let last_idx = if total > 0 { total - 1 } else { 0 };
1047 next_forward_idx = last_idx;
1048 next_backward_idx = last_idx;
1049 } else {
1050 events_slice = all_events[start_idx..end_idx].to_vec();
1051 next_forward_idx = end_idx - 1;
1052 next_backward_idx = start_idx;
1053 }
1054 } else {
1055 if next_token.is_some() {
1057 let begin = start_idx.saturating_sub(limit);
1059 let end_idx = start_idx;
1060 if begin >= total || end_idx > total || begin >= end_idx {
1061 events_slice = Vec::new();
1062 next_forward_idx = start_idx;
1063 next_backward_idx = start_idx;
1064 } else {
1065 events_slice = all_events[begin..end_idx].to_vec();
1066 next_forward_idx = end_idx - 1;
1067 next_backward_idx = begin;
1068 }
1069 } else {
1070 let begin = total.saturating_sub(limit);
1072 events_slice = all_events[begin..].to_vec();
1073 next_forward_idx = if total > 0 { total - 1 } else { 0 };
1074 next_backward_idx = begin;
1075 }
1076 }
1077
1078 let events_json: Vec<Value> = events_slice
1079 .iter()
1080 .map(|e| {
1081 json!({
1082 "timestamp": e.timestamp,
1083 "message": e.message,
1084 "ingestionTime": e.ingestion_time,
1085 })
1086 })
1087 .collect();
1088
1089 let forward_token = format!("f/{:056}", next_forward_idx);
1090 let backward_token = format!("b/{:056}", next_backward_idx);
1091
1092 Ok(AwsResponse::json(
1093 StatusCode::OK,
1094 serde_json::to_string(&json!({
1095 "events": events_json,
1096 "nextForwardToken": forward_token,
1097 "nextBackwardToken": backward_token,
1098 }))
1099 .unwrap(),
1100 ))
1101 }
1102
1103 fn filter_log_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1104 let body = body_json(req);
1105 let log_group_identifier = body["logGroupIdentifier"].as_str();
1106 let log_group_name = body["logGroupName"].as_str();
1107 let filter_pattern = body["filterPattern"].as_str().unwrap_or("");
1108 let start_time = body["startTime"].as_i64();
1109 let end_time = body["endTime"].as_i64();
1110 let limit = body["limit"].as_i64().unwrap_or(10000) as usize;
1111 let next_token = body["nextToken"].as_str();
1112 let stream_names: Vec<&str> = body["logStreamNames"]
1113 .as_array()
1114 .map(|a| a.iter().filter_map(|v| v.as_str()).collect())
1115 .unwrap_or_default();
1116 let stream_name_prefix = body["logStreamNamePrefix"].as_str().unwrap_or("");
1117
1118 if let Some(name) = log_group_name {
1119 validate_string_length("logGroupName", name, 1, 512)?;
1120 }
1121 validate_optional_string_length("logGroupIdentifier", log_group_identifier, 1, 2048)?;
1122 validate_optional_string_length(
1123 "logStreamNamePrefix",
1124 body["logStreamNamePrefix"].as_str(),
1125 1,
1126 512,
1127 )?;
1128 validate_optional_string_length("filterPattern", Some(filter_pattern), 0, 1024)?;
1129
1130 let resolved_group_name = if let Some(identifier) = log_group_identifier {
1133 if identifier.starts_with("arn:") {
1134 extract_log_group_from_arn(identifier).ok_or_else(|| {
1135 AwsServiceError::aws_error(
1136 StatusCode::BAD_REQUEST,
1137 "InvalidParameterException",
1138 format!("Invalid ARN: {identifier}"),
1139 )
1140 })?
1141 } else {
1142 identifier.to_string()
1143 }
1144 } else if let Some(name) = log_group_name {
1145 name.to_string()
1146 } else {
1147 return Err(AwsServiceError::aws_error(
1148 StatusCode::BAD_REQUEST,
1149 "InvalidParameterException",
1150 "Either logGroupName or logGroupIdentifier is required",
1151 ));
1152 };
1153
1154 if limit > 10000 {
1156 return Err(validation_error(
1157 "limit",
1158 &limit.to_string(),
1159 "Member must have value less than or equal to 10000",
1160 ));
1161 }
1162
1163 let state = self.state.read();
1164 let group = state
1165 .log_groups
1166 .get(resolved_group_name.as_str())
1167 .ok_or_else(|| {
1168 AwsServiceError::aws_error(
1169 StatusCode::BAD_REQUEST,
1170 "ResourceNotFoundException",
1171 format!("The specified log group does not exist: {resolved_group_name}"),
1172 )
1173 })?;
1174
1175 let mut filtered_events: Vec<Value> = Vec::new();
1176
1177 let streams: Vec<(&String, &LogStream)> = if !stream_names.is_empty() {
1178 group
1179 .log_streams
1180 .iter()
1181 .filter(|(name, _)| stream_names.contains(&name.as_str()))
1182 .collect()
1183 } else if !stream_name_prefix.is_empty() {
1184 group
1185 .log_streams
1186 .iter()
1187 .filter(|(name, _)| name.starts_with(stream_name_prefix))
1188 .collect()
1189 } else {
1190 group.log_streams.iter().collect()
1191 };
1192
1193 for (_, stream) in streams {
1194 for event in &stream.events {
1195 if let Some(start) = start_time {
1196 if event.timestamp < start {
1197 continue;
1198 }
1199 }
1200 if let Some(end) = end_time {
1201 if event.timestamp >= end {
1202 continue;
1203 }
1204 }
1205 if !filter_pattern.is_empty()
1207 && !matches_filter_pattern(filter_pattern, &event.message)
1208 {
1209 continue;
1210 }
1211
1212 let event_id = format!("{}-{}", stream.name, event.timestamp);
1213
1214 filtered_events.push(json!({
1215 "logStreamName": stream.name,
1216 "timestamp": event.timestamp,
1217 "message": event.message,
1218 "ingestionTime": event.ingestion_time,
1219 "eventId": event_id,
1220 }));
1221 }
1222 }
1223
1224 filtered_events.sort_by_key(|e| e["timestamp"].as_i64().unwrap_or(0));
1225
1226 let start_idx = if let Some(token) = next_token {
1229 let parts: Vec<&str> = token.splitn(3, '@').collect();
1230 if parts.len() == 3 {
1231 let after_event_id = parts[2];
1232 filtered_events
1234 .iter()
1235 .position(|e| e["eventId"].as_str().unwrap_or("") == after_event_id)
1236 .map(|pos| pos + 1)
1237 .unwrap_or(filtered_events.len())
1238 } else {
1239 filtered_events.len() }
1241 } else {
1242 0
1243 };
1244
1245 let remaining = &filtered_events[start_idx..];
1246 let has_more = remaining.len() > limit;
1247 let page: Vec<Value> = if has_more {
1248 remaining[..limit].to_vec()
1249 } else {
1250 remaining.to_vec()
1251 };
1252
1253 let mut result = json!({
1254 "events": page,
1255 "searchedLogStreams": [],
1256 });
1257
1258 if has_more {
1259 if let Some(last) = page.last() {
1260 let event_id = last["eventId"].as_str().unwrap_or("");
1261 result["nextToken"] = json!(format!(
1262 "{}@{}@{}",
1263 resolved_group_name,
1264 last["logStreamName"].as_str().unwrap_or(""),
1265 event_id
1266 ));
1267 }
1268 }
1269
1270 Ok(AwsResponse::json(
1271 StatusCode::OK,
1272 serde_json::to_string(&result).unwrap(),
1273 ))
1274 }
1275
1276 fn tag_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1279 let body = body_json(req);
1280 let name = body["logGroupName"].as_str().ok_or_else(|| {
1281 AwsServiceError::aws_error(
1282 StatusCode::BAD_REQUEST,
1283 "InvalidParameterException",
1284 "logGroupName is required",
1285 )
1286 })?;
1287
1288 validate_string_length("logGroupName", name, 1, 512)?;
1289
1290 let tags = body["tags"].as_object().ok_or_else(|| {
1291 AwsServiceError::aws_error(
1292 StatusCode::BAD_REQUEST,
1293 "InvalidParameterException",
1294 "tags is required",
1295 )
1296 })?;
1297
1298 let mut state = self.state.write();
1299 let group = state.log_groups.get_mut(name).ok_or_else(|| {
1300 AwsServiceError::aws_error(
1301 StatusCode::BAD_REQUEST,
1302 "ResourceNotFoundException",
1303 format!("The specified log group does not exist: {name}"),
1304 )
1305 })?;
1306
1307 for (k, v) in tags {
1308 group
1309 .tags
1310 .insert(k.clone(), v.as_str().unwrap_or("").to_string());
1311 }
1312
1313 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1314 }
1315
1316 fn untag_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1317 let body = body_json(req);
1318 let name = body["logGroupName"].as_str().ok_or_else(|| {
1319 AwsServiceError::aws_error(
1320 StatusCode::BAD_REQUEST,
1321 "InvalidParameterException",
1322 "logGroupName is required",
1323 )
1324 })?;
1325
1326 validate_string_length("logGroupName", name, 1, 512)?;
1327
1328 let keys = body["tags"].as_array().ok_or_else(|| {
1329 AwsServiceError::aws_error(
1330 StatusCode::BAD_REQUEST,
1331 "InvalidParameterException",
1332 "tags is required",
1333 )
1334 })?;
1335
1336 let mut state = self.state.write();
1337 let group = state.log_groups.get_mut(name).ok_or_else(|| {
1338 AwsServiceError::aws_error(
1339 StatusCode::BAD_REQUEST,
1340 "ResourceNotFoundException",
1341 format!("The specified log group does not exist: {name}"),
1342 )
1343 })?;
1344
1345 for key in keys {
1346 if let Some(k) = key.as_str() {
1347 group.tags.remove(k);
1348 }
1349 }
1350
1351 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1352 }
1353
1354 fn list_tags_log_group(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1355 let body = body_json(req);
1356 let name = body["logGroupName"].as_str().ok_or_else(|| {
1357 AwsServiceError::aws_error(
1358 StatusCode::BAD_REQUEST,
1359 "InvalidParameterException",
1360 "logGroupName is required",
1361 )
1362 })?;
1363
1364 validate_string_length("logGroupName", name, 1, 512)?;
1365
1366 let state = self.state.read();
1367 let group = state.log_groups.get(name).ok_or_else(|| {
1368 AwsServiceError::aws_error(
1369 StatusCode::BAD_REQUEST,
1370 "ResourceNotFoundException",
1371 format!("The specified log group does not exist: {name}"),
1372 )
1373 })?;
1374
1375 Ok(AwsResponse::json(
1376 StatusCode::OK,
1377 serde_json::to_string(&json!({ "tags": group.tags })).unwrap(),
1378 ))
1379 }
1380
1381 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1384 let body = body_json(req);
1385 let arn = body["resourceArn"].as_str().ok_or_else(|| {
1386 AwsServiceError::aws_error(
1387 StatusCode::BAD_REQUEST,
1388 "InvalidParameterException",
1389 "resourceArn is required",
1390 )
1391 })?;
1392
1393 validate_string_length("resourceArn", arn, 1, 1011)?;
1394
1395 let tags = body["tags"].as_object().ok_or_else(|| {
1396 AwsServiceError::aws_error(
1397 StatusCode::BAD_REQUEST,
1398 "InvalidParameterException",
1399 "tags is required",
1400 )
1401 })?;
1402
1403 let new_tags: std::collections::HashMap<String, String> = tags
1404 .iter()
1405 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
1406 .collect();
1407
1408 let mut state = self.state.write();
1409
1410 if let Some(group) = state
1412 .log_groups
1413 .values_mut()
1414 .find(|g| g.arn == arn || g.arn.trim_end_matches(":*") == arn)
1415 {
1416 for (k, v) in new_tags {
1417 group.tags.insert(k, v);
1418 }
1419 return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1420 }
1421
1422 if let Some(dest) = state.destinations.values_mut().find(|d| d.arn == arn) {
1424 for (k, v) in new_tags {
1425 dest.tags.insert(k, v);
1426 }
1427 return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1428 }
1429
1430 Err(AwsServiceError::aws_error(
1431 StatusCode::BAD_REQUEST,
1432 "ResourceNotFoundException",
1433 format!("The specified resource does not exist: {arn}"),
1434 ))
1435 }
1436
1437 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1438 let body = body_json(req);
1439 let arn = body["resourceArn"].as_str().ok_or_else(|| {
1440 AwsServiceError::aws_error(
1441 StatusCode::BAD_REQUEST,
1442 "InvalidParameterException",
1443 "resourceArn is required",
1444 )
1445 })?;
1446
1447 validate_string_length("resourceArn", arn, 1, 1011)?;
1448
1449 let tag_keys = body["tagKeys"].as_array().ok_or_else(|| {
1450 AwsServiceError::aws_error(
1451 StatusCode::BAD_REQUEST,
1452 "InvalidParameterException",
1453 "tagKeys is required",
1454 )
1455 })?;
1456
1457 let keys: Vec<String> = tag_keys
1458 .iter()
1459 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1460 .collect();
1461
1462 let mut state = self.state.write();
1463
1464 if let Some(group) = state
1466 .log_groups
1467 .values_mut()
1468 .find(|g| g.arn == arn || g.arn.trim_end_matches(":*") == arn)
1469 {
1470 for k in &keys {
1471 group.tags.remove(k);
1472 }
1473 return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1474 }
1475
1476 if let Some(dest) = state.destinations.values_mut().find(|d| d.arn == arn) {
1478 for k in &keys {
1479 dest.tags.remove(k);
1480 }
1481 return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1482 }
1483
1484 Err(AwsServiceError::aws_error(
1485 StatusCode::BAD_REQUEST,
1486 "ResourceNotFoundException",
1487 format!("The specified resource does not exist: {arn}"),
1488 ))
1489 }
1490
1491 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1492 let body = body_json(req);
1493 let arn = body["resourceArn"].as_str().ok_or_else(|| {
1494 AwsServiceError::aws_error(
1495 StatusCode::BAD_REQUEST,
1496 "InvalidParameterException",
1497 "resourceArn is required",
1498 )
1499 })?;
1500
1501 validate_string_length("resourceArn", arn, 1, 1011)?;
1502
1503 let state = self.state.read();
1504
1505 if let Some(group) = state
1507 .log_groups
1508 .values()
1509 .find(|g| g.arn == arn || g.arn.trim_end_matches(":*") == arn)
1510 {
1511 return Ok(AwsResponse::json(
1512 StatusCode::OK,
1513 serde_json::to_string(&json!({ "tags": group.tags })).unwrap(),
1514 ));
1515 }
1516
1517 if let Some(dest) = state.destinations.values().find(|d| d.arn == arn) {
1519 return Ok(AwsResponse::json(
1520 StatusCode::OK,
1521 serde_json::to_string(&json!({ "tags": dest.tags })).unwrap(),
1522 ));
1523 }
1524
1525 Err(AwsServiceError::aws_error(
1526 StatusCode::BAD_REQUEST,
1527 "ResourceNotFoundException",
1528 format!("The specified resource does not exist: {arn}"),
1529 ))
1530 }
1531
1532 fn put_retention_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1535 let body = body_json(req);
1536 let name = body["logGroupName"].as_str().ok_or_else(|| {
1537 AwsServiceError::aws_error(
1538 StatusCode::BAD_REQUEST,
1539 "InvalidParameterException",
1540 "logGroupName is required",
1541 )
1542 })?;
1543
1544 validate_string_length("logGroupName", name, 1, 512)?;
1545
1546 let days = body["retentionInDays"].as_i64().ok_or_else(|| {
1547 AwsServiceError::aws_error(
1548 StatusCode::BAD_REQUEST,
1549 "InvalidParameterException",
1550 "retentionInDays is required",
1551 )
1552 })?;
1553
1554 let mut state = self.state.write();
1555 let group = state.log_groups.get_mut(name).ok_or_else(|| {
1556 AwsServiceError::aws_error(
1557 StatusCode::BAD_REQUEST,
1558 "ResourceNotFoundException",
1559 format!("The specified log group does not exist: {name}"),
1560 )
1561 })?;
1562
1563 group.retention_in_days = Some(days as i32);
1564
1565 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1566 }
1567
1568 fn delete_retention_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1569 let body = body_json(req);
1570 let name = body["logGroupName"].as_str().ok_or_else(|| {
1571 AwsServiceError::aws_error(
1572 StatusCode::BAD_REQUEST,
1573 "InvalidParameterException",
1574 "logGroupName is required",
1575 )
1576 })?;
1577
1578 validate_string_length("logGroupName", name, 1, 512)?;
1579
1580 let mut state = self.state.write();
1581 let group = state.log_groups.get_mut(name).ok_or_else(|| {
1582 AwsServiceError::aws_error(
1583 StatusCode::BAD_REQUEST,
1584 "ResourceNotFoundException",
1585 format!("The specified log group does not exist: {name}"),
1586 )
1587 })?;
1588
1589 group.retention_in_days = None;
1590
1591 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1592 }
1593
1594 fn put_subscription_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1597 let body = body_json(req);
1598 let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1599 AwsServiceError::aws_error(
1600 StatusCode::BAD_REQUEST,
1601 "InvalidParameterException",
1602 "logGroupName is required",
1603 )
1604 })?;
1605 let filter_name = body["filterName"]
1606 .as_str()
1607 .ok_or_else(|| {
1608 AwsServiceError::aws_error(
1609 StatusCode::BAD_REQUEST,
1610 "InvalidParameterException",
1611 "filterName is required",
1612 )
1613 })?
1614 .to_string();
1615 let filter_pattern = body["filterPattern"].as_str().unwrap_or("").to_string();
1616 let destination_arn = body["destinationArn"]
1617 .as_str()
1618 .ok_or_else(|| {
1619 AwsServiceError::aws_error(
1620 StatusCode::BAD_REQUEST,
1621 "InvalidParameterException",
1622 "destinationArn is required",
1623 )
1624 })?
1625 .to_string();
1626 let role_arn = body["roleArn"].as_str().map(|s| s.to_string());
1627 let distribution = body["distribution"]
1628 .as_str()
1629 .unwrap_or("ByLogStream")
1630 .to_string();
1631
1632 validate_string_length("logGroupName", log_group_name, 1, 512)?;
1633 validate_string_length("filterName", &filter_name, 1, 512)?;
1634 validate_optional_string_length("filterPattern", Some(&filter_pattern), 0, 1024)?;
1635
1636 let mut state = self.state.write();
1637 let group = state.log_groups.get_mut(log_group_name).ok_or_else(|| {
1638 AwsServiceError::aws_error(
1639 StatusCode::BAD_REQUEST,
1640 "ResourceNotFoundException",
1641 "The specified log group does not exist.",
1642 )
1643 })?;
1644
1645 if let Some(existing) = group
1647 .subscription_filters
1648 .iter_mut()
1649 .find(|f| f.filter_name == filter_name)
1650 {
1651 existing.filter_pattern = filter_pattern;
1652 existing.destination_arn = destination_arn;
1653 existing.role_arn = role_arn;
1654 existing.distribution = distribution;
1655 return Ok(AwsResponse::json(StatusCode::OK, "{}"));
1656 }
1657
1658 if group.subscription_filters.len() >= 2 {
1660 return Err(AwsServiceError::aws_error(
1661 StatusCode::BAD_REQUEST,
1662 "LimitExceededException",
1663 "Resource limit exceeded.",
1664 ));
1665 }
1666
1667 let now = Utc::now().timestamp_millis();
1668 group.subscription_filters.push(SubscriptionFilter {
1669 filter_name,
1670 log_group_name: log_group_name.to_string(),
1671 filter_pattern,
1672 destination_arn,
1673 role_arn,
1674 distribution,
1675 creation_time: now,
1676 });
1677
1678 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1679 }
1680
1681 fn describe_subscription_filters(
1682 &self,
1683 req: &AwsRequest,
1684 ) -> Result<AwsResponse, AwsServiceError> {
1685 let body = body_json(req);
1686 let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1687 AwsServiceError::aws_error(
1688 StatusCode::BAD_REQUEST,
1689 "InvalidParameterException",
1690 "logGroupName is required",
1691 )
1692 })?;
1693
1694 validate_string_length("logGroupName", log_group_name, 1, 512)?;
1695 validate_optional_string_length(
1696 "filterNamePrefix",
1697 body["filterNamePrefix"].as_str(),
1698 1,
1699 512,
1700 )?;
1701
1702 let state = self.state.read();
1703 let group = state.log_groups.get(log_group_name).ok_or_else(|| {
1704 AwsServiceError::aws_error(
1705 StatusCode::BAD_REQUEST,
1706 "ResourceNotFoundException",
1707 "The specified log group does not exist.",
1708 )
1709 })?;
1710
1711 let filters: Vec<Value> = group
1712 .subscription_filters
1713 .iter()
1714 .map(|f| {
1715 let mut obj = json!({
1716 "filterName": f.filter_name,
1717 "logGroupName": f.log_group_name,
1718 "filterPattern": f.filter_pattern,
1719 "destinationArn": f.destination_arn,
1720 "distribution": f.distribution,
1721 "creationTime": f.creation_time,
1722 });
1723 if let Some(ref arn) = f.role_arn {
1724 obj["roleArn"] = json!(arn);
1725 }
1726 obj
1727 })
1728 .collect();
1729
1730 Ok(AwsResponse::json(
1731 StatusCode::OK,
1732 serde_json::to_string(&json!({ "subscriptionFilters": filters })).unwrap(),
1733 ))
1734 }
1735
1736 fn delete_subscription_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1737 let body = body_json(req);
1738 let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1739 AwsServiceError::aws_error(
1740 StatusCode::BAD_REQUEST,
1741 "InvalidParameterException",
1742 "logGroupName is required",
1743 )
1744 })?;
1745 let filter_name = body["filterName"].as_str().ok_or_else(|| {
1746 AwsServiceError::aws_error(
1747 StatusCode::BAD_REQUEST,
1748 "InvalidParameterException",
1749 "filterName is required",
1750 )
1751 })?;
1752
1753 validate_string_length("logGroupName", log_group_name, 1, 512)?;
1754 validate_string_length("filterName", filter_name, 1, 512)?;
1755
1756 let mut state = self.state.write();
1757 let group = state.log_groups.get_mut(log_group_name).ok_or_else(|| {
1758 AwsServiceError::aws_error(
1759 StatusCode::BAD_REQUEST,
1760 "ResourceNotFoundException",
1761 "The specified log group does not exist.",
1762 )
1763 })?;
1764
1765 let idx = group
1766 .subscription_filters
1767 .iter()
1768 .position(|f| f.filter_name == filter_name)
1769 .ok_or_else(|| {
1770 AwsServiceError::aws_error(
1771 StatusCode::BAD_REQUEST,
1772 "ResourceNotFoundException",
1773 "The specified subscription filter does not exist.",
1774 )
1775 })?;
1776
1777 group.subscription_filters.remove(idx);
1778
1779 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1780 }
1781
1782 fn put_metric_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1785 let body = body_json(req);
1786 let filter_name = body["filterName"]
1787 .as_str()
1788 .ok_or_else(|| {
1789 AwsServiceError::aws_error(
1790 StatusCode::BAD_REQUEST,
1791 "InvalidParameterException",
1792 "filterName is required",
1793 )
1794 })?
1795 .to_string();
1796 validate_required("filterPattern", &body["filterPattern"])?;
1797 let filter_pattern = body["filterPattern"].as_str().unwrap_or("").to_string();
1798 let log_group_name = body["logGroupName"]
1799 .as_str()
1800 .ok_or_else(|| {
1801 AwsServiceError::aws_error(
1802 StatusCode::BAD_REQUEST,
1803 "InvalidParameterException",
1804 "logGroupName is required",
1805 )
1806 })?
1807 .to_string();
1808
1809 validate_string_length("filterName", &filter_name, 1, 512)?;
1810 validate_string_length("logGroupName", &log_group_name, 1, 512)?;
1811 validate_optional_string_length("filterPattern", Some(&filter_pattern), 0, 1024)?;
1812 validate_optional_string_length(
1813 "fieldSelectionCriteria",
1814 body["fieldSelectionCriteria"].as_str(),
1815 0,
1816 2000,
1817 )?;
1818
1819 let transformations_json = body["metricTransformations"].as_array().ok_or_else(|| {
1820 AwsServiceError::aws_error(
1821 StatusCode::BAD_REQUEST,
1822 "InvalidParameterException",
1823 "metricTransformations is required",
1824 )
1825 })?;
1826
1827 if transformations_json.len() > 1 {
1829 return Err(validation_error(
1830 "metricTransformations",
1831 &format!("{}", transformations_json.len()),
1832 "Member must have length less than or equal to 1",
1833 ));
1834 }
1835
1836 let transformations: Vec<MetricTransformation> = transformations_json
1837 .iter()
1838 .map(|t| MetricTransformation {
1839 metric_name: t["metricName"].as_str().unwrap_or("").to_string(),
1840 metric_namespace: t["metricNamespace"].as_str().unwrap_or("").to_string(),
1841 metric_value: t["metricValue"].as_str().unwrap_or("").to_string(),
1842 default_value: t["defaultValue"].as_f64(),
1843 })
1844 .collect();
1845
1846 let now = Utc::now().timestamp_millis();
1847
1848 let mut state = self.state.write();
1849
1850 if let Some(existing) = state
1852 .metric_filters
1853 .iter_mut()
1854 .find(|f| f.filter_name == filter_name && f.log_group_name == log_group_name)
1855 {
1856 existing.filter_pattern = filter_pattern;
1857 existing.metric_transformations = transformations;
1858 } else {
1859 state.metric_filters.push(MetricFilter {
1860 filter_name,
1861 filter_pattern,
1862 log_group_name,
1863 metric_transformations: transformations,
1864 creation_time: now,
1865 });
1866 }
1867
1868 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1869 }
1870
1871 fn describe_metric_filters(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1872 let body = body_json(req);
1873 let filter_name_prefix = body["filterNamePrefix"].as_str();
1874 let log_group_name = body["logGroupName"].as_str();
1875 let metric_name = body["metricName"].as_str();
1876 let metric_namespace = body["metricNamespace"].as_str();
1877
1878 validate_optional_string_length("filterNamePrefix", filter_name_prefix, 1, 512)?;
1879 validate_optional_string_length("logGroupName", log_group_name, 1, 512)?;
1880 validate_optional_string_length("metricName", metric_name, 0, 255)?;
1881 validate_optional_string_length("metricNamespace", metric_namespace, 0, 255)?;
1882 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
1883 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
1884
1885 let state = self.state.read();
1886 let filters: Vec<Value> = state
1887 .metric_filters
1888 .iter()
1889 .filter(|f| {
1890 if let Some(prefix) = filter_name_prefix {
1891 if !f.filter_name.starts_with(prefix) {
1892 return false;
1893 }
1894 }
1895 if let Some(lg) = log_group_name {
1896 if f.log_group_name != lg {
1897 return false;
1898 }
1899 }
1900 if let Some(mn) = metric_name {
1901 if !f.metric_transformations.iter().any(|t| t.metric_name == mn) {
1902 return false;
1903 }
1904 }
1905 if let Some(ns) = metric_namespace {
1906 if !f
1907 .metric_transformations
1908 .iter()
1909 .any(|t| t.metric_namespace == ns)
1910 {
1911 return false;
1912 }
1913 }
1914 true
1915 })
1916 .map(|f| {
1917 let transformations: Vec<Value> = f
1918 .metric_transformations
1919 .iter()
1920 .map(|t| {
1921 let mut obj = json!({
1922 "metricName": t.metric_name,
1923 "metricNamespace": t.metric_namespace,
1924 "metricValue": t.metric_value,
1925 });
1926 if let Some(dv) = t.default_value {
1927 obj["defaultValue"] = json!(dv);
1928 }
1929 obj
1930 })
1931 .collect();
1932
1933 json!({
1934 "filterName": f.filter_name,
1935 "filterPattern": f.filter_pattern,
1936 "logGroupName": f.log_group_name,
1937 "metricTransformations": transformations,
1938 "creationTime": f.creation_time,
1939 })
1940 })
1941 .collect();
1942
1943 Ok(AwsResponse::json(
1944 StatusCode::OK,
1945 serde_json::to_string(&json!({ "metricFilters": filters })).unwrap(),
1946 ))
1947 }
1948
1949 fn delete_metric_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1950 let body = body_json(req);
1951 let filter_name = body["filterName"].as_str().ok_or_else(|| {
1952 AwsServiceError::aws_error(
1953 StatusCode::BAD_REQUEST,
1954 "InvalidParameterException",
1955 "filterName is required",
1956 )
1957 })?;
1958 let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
1959 AwsServiceError::aws_error(
1960 StatusCode::BAD_REQUEST,
1961 "InvalidParameterException",
1962 "logGroupName is required",
1963 )
1964 })?;
1965
1966 validate_string_length("filterName", filter_name, 1, 512)?;
1967 validate_string_length("logGroupName", log_group_name, 1, 512)?;
1968
1969 let mut state = self.state.write();
1970 let idx = state
1971 .metric_filters
1972 .iter()
1973 .position(|f| f.filter_name == filter_name && f.log_group_name == log_group_name);
1974
1975 if let Some(i) = idx {
1976 state.metric_filters.remove(i);
1977 }
1978
1979 Ok(AwsResponse::json(StatusCode::OK, "{}"))
1980 }
1981
1982 fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1985 let body = body_json(req);
1986 let policy_name = body["policyName"]
1987 .as_str()
1988 .ok_or_else(|| {
1989 AwsServiceError::aws_error(
1990 StatusCode::BAD_REQUEST,
1991 "InvalidParameterException",
1992 "policyName is required",
1993 )
1994 })?
1995 .to_string();
1996 let policy_document = body["policyDocument"]
1997 .as_str()
1998 .ok_or_else(|| {
1999 AwsServiceError::aws_error(
2000 StatusCode::BAD_REQUEST,
2001 "InvalidParameterException",
2002 "policyDocument is required",
2003 )
2004 })?
2005 .to_string();
2006
2007 let now = Utc::now().timestamp_millis();
2008
2009 let mut state = self.state.write();
2010
2011 if !state.resource_policies.contains_key(&policy_name)
2013 && state.resource_policies.len() >= 10
2014 {
2015 return Err(AwsServiceError::aws_error(
2016 StatusCode::BAD_REQUEST,
2017 "LimitExceededException",
2018 "Resource limit exceeded.",
2019 ));
2020 }
2021
2022 let policy = ResourcePolicy {
2023 policy_name: policy_name.clone(),
2024 policy_document: policy_document.clone(),
2025 last_updated_time: now,
2026 };
2027
2028 state.resource_policies.insert(policy_name.clone(), policy);
2029
2030 Ok(AwsResponse::json(
2031 StatusCode::OK,
2032 serde_json::to_string(&json!({
2033 "resourcePolicy": {
2034 "policyName": policy_name,
2035 "policyDocument": policy_document,
2036 "lastUpdatedTime": now,
2037 }
2038 }))
2039 .unwrap(),
2040 ))
2041 }
2042
2043 fn describe_resource_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2044 let body = body_json(req);
2045 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2046 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2047 validate_optional_enum_value(
2048 "policyScope",
2049 &body["policyScope"],
2050 &["ACCOUNT", "RESOURCE"],
2051 )?;
2052 let state = self.state.read();
2053
2054 let mut policies: Vec<Value> = state
2055 .resource_policies
2056 .values()
2057 .map(|p| {
2058 json!({
2059 "policyName": p.policy_name,
2060 "policyDocument": p.policy_document,
2061 "lastUpdatedTime": p.last_updated_time,
2062 })
2063 })
2064 .collect();
2065 policies.sort_by(|a, b| {
2066 a["policyName"]
2067 .as_str()
2068 .unwrap()
2069 .cmp(b["policyName"].as_str().unwrap())
2070 });
2071
2072 Ok(AwsResponse::json(
2073 StatusCode::OK,
2074 serde_json::to_string(&json!({ "resourcePolicies": policies })).unwrap(),
2075 ))
2076 }
2077
2078 fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2079 let body = body_json(req);
2080 let policy_name = body["policyName"].as_str().ok_or_else(|| {
2081 AwsServiceError::aws_error(
2082 StatusCode::BAD_REQUEST,
2083 "InvalidParameterException",
2084 "policyName is required",
2085 )
2086 })?;
2087
2088 let mut state = self.state.write();
2089 if state.resource_policies.remove(policy_name).is_none() {
2090 return Err(AwsServiceError::aws_error(
2091 StatusCode::BAD_REQUEST,
2092 "ResourceNotFoundException",
2093 format!("Policy with name [{policy_name}] does not exist"),
2094 ));
2095 }
2096
2097 Ok(AwsResponse::json(StatusCode::OK, "{}"))
2098 }
2099
2100 fn put_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2103 let body = body_json(req);
2104 let destination_name = body["destinationName"]
2105 .as_str()
2106 .ok_or_else(|| {
2107 AwsServiceError::aws_error(
2108 StatusCode::BAD_REQUEST,
2109 "InvalidParameterException",
2110 "destinationName is required",
2111 )
2112 })?
2113 .to_string();
2114 let target_arn = body["targetArn"]
2115 .as_str()
2116 .ok_or_else(|| {
2117 AwsServiceError::aws_error(
2118 StatusCode::BAD_REQUEST,
2119 "InvalidParameterException",
2120 "targetArn is required",
2121 )
2122 })?
2123 .to_string();
2124 let role_arn = body["roleArn"]
2125 .as_str()
2126 .ok_or_else(|| {
2127 AwsServiceError::aws_error(
2128 StatusCode::BAD_REQUEST,
2129 "InvalidParameterException",
2130 "roleArn is required",
2131 )
2132 })?
2133 .to_string();
2134
2135 validate_string_length("destinationName", &destination_name, 1, 512)?;
2136 validate_string_length("targetArn", &target_arn, 1, 2048)?;
2137 validate_string_length("roleArn", &role_arn, 1, 2048)?;
2138
2139 let tags: std::collections::HashMap<String, String> = body["tags"]
2140 .as_object()
2141 .map(|m| {
2142 m.iter()
2143 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
2144 .collect()
2145 })
2146 .unwrap_or_default();
2147
2148 let mut state = self.state.write();
2149 let arn = format!(
2150 "arn:aws:logs:{}:{}:destination:{}",
2151 state.region, state.account_id, destination_name
2152 );
2153 let now = Utc::now().timestamp_millis();
2154
2155 let access_policy = state
2157 .destinations
2158 .get(&destination_name)
2159 .and_then(|d| d.access_policy.clone());
2160
2161 let dest = Destination {
2162 destination_name: destination_name.clone(),
2163 target_arn: target_arn.clone(),
2164 role_arn: role_arn.clone(),
2165 arn: arn.clone(),
2166 access_policy,
2167 creation_time: now,
2168 tags: tags.clone(),
2169 };
2170
2171 state.destinations.insert(destination_name.clone(), dest);
2172
2173 let dest_json = json!({
2174 "destinationName": destination_name,
2175 "targetArn": target_arn,
2176 "roleArn": role_arn,
2177 "arn": arn,
2178 "creationTime": now,
2179 });
2180
2181 Ok(AwsResponse::json(
2182 StatusCode::OK,
2183 serde_json::to_string(&json!({ "destination": dest_json })).unwrap(),
2184 ))
2185 }
2186
2187 fn describe_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2188 let body = body_json(req);
2189 let prefix = body["DestinationNamePrefix"].as_str().unwrap_or("");
2190
2191 validate_optional_string_length(
2192 "DestinationNamePrefix",
2193 body["DestinationNamePrefix"].as_str(),
2194 1,
2195 512,
2196 )?;
2197 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2198 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2199
2200 let state = self.state.read();
2201 let destinations: Vec<Value> = state
2202 .destinations
2203 .values()
2204 .filter(|d| prefix.is_empty() || d.destination_name.starts_with(prefix))
2205 .map(|d| {
2206 let mut obj = json!({
2207 "destinationName": d.destination_name,
2208 "targetArn": d.target_arn,
2209 "roleArn": d.role_arn,
2210 "arn": d.arn,
2211 "creationTime": d.creation_time,
2212 });
2213 if let Some(ref policy) = d.access_policy {
2214 obj["accessPolicy"] = json!(policy);
2215 }
2216 obj
2217 })
2218 .collect();
2219
2220 Ok(AwsResponse::json(
2221 StatusCode::OK,
2222 serde_json::to_string(&json!({ "destinations": destinations })).unwrap(),
2223 ))
2224 }
2225
2226 fn delete_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2227 let body = body_json(req);
2228 let name = body["destinationName"].as_str().ok_or_else(|| {
2229 AwsServiceError::aws_error(
2230 StatusCode::BAD_REQUEST,
2231 "InvalidParameterException",
2232 "destinationName is required",
2233 )
2234 })?;
2235
2236 validate_string_length("destinationName", name, 1, 512)?;
2237
2238 let mut state = self.state.write();
2239 if state.destinations.remove(name).is_none() {
2240 return Err(AwsServiceError::aws_error(
2241 StatusCode::BAD_REQUEST,
2242 "ResourceNotFoundException",
2243 format!("The specified destination does not exist: {name}"),
2244 ));
2245 }
2246
2247 Ok(AwsResponse::json(StatusCode::OK, "{}"))
2248 }
2249
2250 fn put_destination_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2251 let body = body_json(req);
2252 let name = body["destinationName"].as_str().ok_or_else(|| {
2253 AwsServiceError::aws_error(
2254 StatusCode::BAD_REQUEST,
2255 "InvalidParameterException",
2256 "destinationName is required",
2257 )
2258 })?;
2259
2260 validate_string_length("destinationName", name, 1, 512)?;
2261
2262 let policy = body["accessPolicy"].as_str().ok_or_else(|| {
2263 AwsServiceError::aws_error(
2264 StatusCode::BAD_REQUEST,
2265 "InvalidParameterException",
2266 "accessPolicy is required",
2267 )
2268 })?;
2269
2270 validate_string_length("accessPolicy", policy, 1, 5120)?;
2271
2272 let mut state = self.state.write();
2273 let dest = state.destinations.get_mut(name).ok_or_else(|| {
2274 AwsServiceError::aws_error(
2275 StatusCode::BAD_REQUEST,
2276 "ResourceNotFoundException",
2277 format!("The specified destination does not exist: {name}"),
2278 )
2279 })?;
2280
2281 dest.access_policy = Some(policy.to_string());
2282
2283 Ok(AwsResponse::json(StatusCode::OK, "{}"))
2284 }
2285
2286 fn start_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2289 let body = body_json(req);
2290 let log_group_name = body["logGroupName"].as_str().ok_or_else(|| {
2291 AwsServiceError::aws_error(
2292 StatusCode::BAD_REQUEST,
2293 "InvalidParameterException",
2294 "logGroupName is required",
2295 )
2296 })?;
2297 let start_time = body["startTime"].as_i64().unwrap_or(0);
2298 let end_time = body["endTime"].as_i64().unwrap_or(0);
2299 let query_string = body["queryString"].as_str().unwrap_or("").to_string();
2300
2301 validate_string_length("logGroupName", log_group_name, 1, 512)?;
2302 validate_optional_string_length("queryString", Some(&query_string), 0, 10000)?;
2303
2304 let mut state = self.state.write();
2305
2306 if !state.log_groups.contains_key(log_group_name) {
2308 return Err(AwsServiceError::aws_error(
2309 StatusCode::BAD_REQUEST,
2310 "ResourceNotFoundException",
2311 "The specified log group does not exist.",
2312 ));
2313 }
2314
2315 let query_id = uuid::Uuid::new_v4().to_string();
2316 let now = Utc::now().timestamp_millis();
2317
2318 state.queries.insert(
2319 query_id.clone(),
2320 QueryInfo {
2321 query_id: query_id.clone(),
2322 log_group_name: log_group_name.to_string(),
2323 query_string,
2324 start_time,
2325 end_time,
2326 status: "Complete".to_string(),
2327 create_time: now,
2328 },
2329 );
2330
2331 Ok(AwsResponse::json(
2332 StatusCode::OK,
2333 serde_json::to_string(&json!({ "queryId": query_id })).unwrap(),
2334 ))
2335 }
2336
2337 fn get_query_results(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2338 let body = body_json(req);
2339 let query_id = body["queryId"].as_str().ok_or_else(|| {
2340 AwsServiceError::aws_error(
2341 StatusCode::BAD_REQUEST,
2342 "InvalidParameterException",
2343 "queryId is required",
2344 )
2345 })?;
2346
2347 validate_string_length("queryId", query_id, 1, 256)?;
2348
2349 let state = self.state.read();
2350 let query = state.queries.get(query_id).ok_or_else(|| {
2351 AwsServiceError::aws_error(
2352 StatusCode::BAD_REQUEST,
2353 "ResourceNotFoundException",
2354 "The specified query does not exist.",
2355 )
2356 })?;
2357
2358 let mut results: Vec<Value> = Vec::new();
2360 if let Some(group) = state.log_groups.get(&query.log_group_name) {
2361 for stream in group.log_streams.values() {
2362 for event in &stream.events {
2363 let event_time_secs = event.timestamp / 1000;
2365 if event_time_secs >= query.start_time && event_time_secs < query.end_time {
2366 results.push(json!([
2367 {"field": "@message", "value": event.message},
2368 {"field": "@ptr", "value": format!("{}/{}", stream.name, event.timestamp)},
2369 ]));
2370 }
2371 }
2372 }
2373 }
2374
2375 results.sort_by(|a, b| {
2377 let a_msg = a[0]["value"].as_str().unwrap_or("");
2378 let b_msg = b[0]["value"].as_str().unwrap_or("");
2379 a_msg.cmp(b_msg)
2380 });
2381
2382 Ok(AwsResponse::json(
2383 StatusCode::OK,
2384 serde_json::to_string(&json!({
2385 "status": query.status,
2386 "results": results,
2387 "statistics": {
2388 "recordsMatched": results.len() as f64,
2389 "recordsScanned": results.len() as f64,
2390 "bytesScanned": 0.0,
2391 },
2392 }))
2393 .unwrap(),
2394 ))
2395 }
2396
2397 fn describe_queries(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2398 let body = body_json(req);
2399 let log_group_name = body["logGroupName"].as_str();
2400 let status_filter = body["status"].as_str();
2401
2402 validate_optional_string_length("logGroupName", log_group_name, 1, 512)?;
2403 validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 1000)?;
2404 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2405 validate_optional_enum_value(
2406 "status",
2407 &body["status"],
2408 &[
2409 "Scheduled",
2410 "Running",
2411 "Complete",
2412 "Failed",
2413 "Cancelled",
2414 "Timeout",
2415 "Unknown",
2416 ],
2417 )?;
2418 validate_optional_enum_value(
2419 "queryLanguage",
2420 &body["queryLanguage"],
2421 &["CWLI", "SQL", "PPL"],
2422 )?;
2423
2424 let state = self.state.read();
2425 let queries: Vec<Value> = state
2426 .queries
2427 .values()
2428 .filter(|q| {
2429 if let Some(lg) = log_group_name {
2430 if q.log_group_name != lg {
2431 return false;
2432 }
2433 }
2434 if let Some(status) = status_filter {
2435 if q.status != status {
2436 return false;
2437 }
2438 }
2439 true
2440 })
2441 .map(|q| {
2442 json!({
2443 "queryId": q.query_id,
2444 "queryString": q.query_string,
2445 "status": q.status,
2446 "createTime": q.create_time,
2447 "logGroupName": q.log_group_name,
2448 })
2449 })
2450 .collect();
2451
2452 Ok(AwsResponse::json(
2453 StatusCode::OK,
2454 serde_json::to_string(&json!({ "queries": queries })).unwrap(),
2455 ))
2456 }
2457
2458 fn create_export_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2461 let body = body_json(req);
2462 let log_group_name = body["logGroupName"]
2463 .as_str()
2464 .ok_or_else(|| {
2465 AwsServiceError::aws_error(
2466 StatusCode::BAD_REQUEST,
2467 "InvalidParameterException",
2468 "logGroupName is required",
2469 )
2470 })?
2471 .to_string();
2472 let from_time = body["from"].as_i64().unwrap_or(0);
2473 let to_time = body["to"].as_i64().unwrap_or(0);
2474 let destination = body["destination"]
2475 .as_str()
2476 .ok_or_else(|| {
2477 AwsServiceError::aws_error(
2478 StatusCode::BAD_REQUEST,
2479 "InvalidParameterException",
2480 "destination is required",
2481 )
2482 })?
2483 .to_string();
2484 let destination_prefix = body["destinationPrefix"]
2485 .as_str()
2486 .unwrap_or("exportedlogs")
2487 .to_string();
2488
2489 validate_string_length("logGroupName", &log_group_name, 1, 512)?;
2490 validate_optional_string_length("taskName", body["taskName"].as_str(), 1, 512)?;
2491 validate_optional_string_length(
2492 "logStreamNamePrefix",
2493 body["logStreamNamePrefix"].as_str(),
2494 1,
2495 512,
2496 )?;
2497 validate_string_length("destination", &destination, 1, 512)?;
2498
2499 let state = self.state.read();
2500 if !state.log_groups.contains_key(&log_group_name) {
2501 return Err(AwsServiceError::aws_error(
2502 StatusCode::BAD_REQUEST,
2503 "ResourceNotFoundException",
2504 "The specified log group does not exist.",
2505 ));
2506 }
2507 drop(state);
2508
2509 let task_name = body["taskName"].as_str().map(|s| s.to_string());
2510 let log_stream_name_prefix = body["logStreamNamePrefix"].as_str().map(|s| s.to_string());
2511
2512 let task_id = uuid::Uuid::new_v4().to_string();
2513 let (status_code, status_message) = if from_time < to_time {
2514 (
2515 "COMPLETED".to_string(),
2516 "Completed successfully".to_string(),
2517 )
2518 } else {
2519 ("active".to_string(), "Task is active".to_string())
2520 };
2521
2522 let mut state = self.state.write();
2523 state.export_tasks.push(ExportTask {
2524 task_id: task_id.clone(),
2525 task_name,
2526 log_group_name,
2527 log_stream_name_prefix,
2528 from_time,
2529 to_time,
2530 destination,
2531 destination_prefix,
2532 status_code,
2533 status_message,
2534 });
2535
2536 Ok(AwsResponse::json(
2537 StatusCode::OK,
2538 serde_json::to_string(&json!({ "taskId": task_id })).unwrap(),
2539 ))
2540 }
2541
2542 fn describe_export_tasks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2543 let body = body_json(req);
2544 let task_id_filter = body["taskId"].as_str();
2545
2546 validate_optional_string_length("taskId", task_id_filter, 1, 512)?;
2547 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2548 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2549 validate_optional_enum_value(
2550 "statusCode",
2551 &body["statusCode"],
2552 &[
2553 "CANCELLED",
2554 "COMPLETED",
2555 "FAILED",
2556 "PENDING",
2557 "PENDING_CANCEL",
2558 "RUNNING",
2559 ],
2560 )?;
2561
2562 let state = self.state.read();
2563
2564 if let Some(task_id) = task_id_filter {
2565 let task = state.export_tasks.iter().find(|t| t.task_id == task_id);
2566 if task.is_none() {
2567 return Err(AwsServiceError::aws_error(
2568 StatusCode::BAD_REQUEST,
2569 "ResourceNotFoundException",
2570 "The specified export task does not exist.",
2571 ));
2572 }
2573 }
2574
2575 let tasks: Vec<Value> = state
2576 .export_tasks
2577 .iter()
2578 .filter(|t| {
2579 if let Some(tid) = task_id_filter {
2580 t.task_id == tid
2581 } else {
2582 true
2583 }
2584 })
2585 .map(|t| {
2586 let mut obj = json!({
2587 "taskId": t.task_id,
2588 "logGroupName": t.log_group_name,
2589 "from": t.from_time,
2590 "to": t.to_time,
2591 "destination": t.destination,
2592 "destinationPrefix": t.destination_prefix,
2593 "status": {
2594 "code": t.status_code,
2595 "message": t.status_message,
2596 },
2597 });
2598 if let Some(ref name) = t.task_name {
2599 obj["taskName"] = json!(name);
2600 }
2601 if let Some(ref prefix) = t.log_stream_name_prefix {
2602 obj["logStreamNamePrefix"] = json!(prefix);
2603 }
2604 obj
2605 })
2606 .collect();
2607
2608 Ok(AwsResponse::json(
2609 StatusCode::OK,
2610 serde_json::to_string(&json!({ "exportTasks": tasks })).unwrap(),
2611 ))
2612 }
2613
2614 fn cancel_export_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2615 let body = body_json(req);
2616 let task_id = body["taskId"].as_str().ok_or_else(|| {
2617 AwsServiceError::aws_error(
2618 StatusCode::BAD_REQUEST,
2619 "InvalidParameterException",
2620 "taskId is required",
2621 )
2622 })?;
2623
2624 validate_string_length("taskId", task_id, 1, 512)?;
2625
2626 let mut state = self.state.write();
2627 let task = state
2628 .export_tasks
2629 .iter_mut()
2630 .find(|t| t.task_id == task_id)
2631 .ok_or_else(|| {
2632 AwsServiceError::aws_error(
2633 StatusCode::BAD_REQUEST,
2634 "ResourceNotFoundException",
2635 "The specified export task does not exist.",
2636 )
2637 })?;
2638
2639 task.status_code = "CANCELLED".to_string();
2640 task.status_message = "Task was cancelled".to_string();
2641
2642 Ok(AwsResponse::json(StatusCode::OK, "{}"))
2643 }
2644
2645 fn put_delivery_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2648 let body = body_json(req);
2649 let name = body["name"]
2650 .as_str()
2651 .ok_or_else(|| {
2652 AwsServiceError::aws_error(
2653 StatusCode::BAD_REQUEST,
2654 "InvalidParameterException",
2655 "name is required",
2656 )
2657 })?
2658 .to_string();
2659
2660 validate_string_length("name", &name, 1, 60)?;
2661
2662 validate_optional_enum_value(
2663 "deliveryDestinationType",
2664 &body["deliveryDestinationType"],
2665 &["S3", "CWL", "FH", "XRAY"],
2666 )?;
2667
2668 let output_format = body["outputFormat"].as_str().map(|s| s.to_string());
2669
2670 if let Some(ref fmt) = output_format {
2672 let valid = ["json", "plain", "w3c", "raw", "parquet"];
2673 if !valid.contains(&fmt.as_str()) {
2674 return Err(AwsServiceError::aws_error(
2675 StatusCode::BAD_REQUEST,
2676 "ValidationException",
2677 format!("1 validation error detected: Value '{fmt}' at 'outputFormat' failed to satisfy constraint: Member must satisfy enum value set: [json, plain, w3c, raw, parquet]"),
2678 ));
2679 }
2680 }
2681
2682 let config: std::collections::HashMap<String, String> = body
2683 ["deliveryDestinationConfiguration"]
2684 .as_object()
2685 .map(|m| {
2686 m.iter()
2687 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
2688 .collect()
2689 })
2690 .unwrap_or_default();
2691
2692 let tags: std::collections::HashMap<String, String> = body["tags"]
2693 .as_object()
2694 .map(|m| {
2695 m.iter()
2696 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
2697 .collect()
2698 })
2699 .unwrap_or_default();
2700
2701 let mut state = self.state.write();
2702
2703 if let Some(existing) = state.delivery_destinations.get(&name) {
2705 if let Some(ref new_fmt) = output_format {
2706 if let Some(ref existing_fmt) = existing.output_format {
2707 if new_fmt != existing_fmt {
2708 return Err(AwsServiceError::aws_error(
2709 StatusCode::BAD_REQUEST,
2710 "ValidationException",
2711 "Cannot update outputFormat for an existing delivery destination.",
2712 ));
2713 }
2714 }
2715 }
2716 }
2717
2718 let arn = format!(
2719 "arn:aws:logs:{}:{}:delivery-destination:{}",
2720 state.region, state.account_id, name
2721 );
2722
2723 let existing_policy = state
2724 .delivery_destinations
2725 .get(&name)
2726 .and_then(|d| d.delivery_destination_policy.clone());
2727
2728 let dd = DeliveryDestination {
2729 name: name.clone(),
2730 arn: arn.clone(),
2731 output_format: output_format.clone(),
2732 delivery_destination_configuration: config.clone(),
2733 tags: tags.clone(),
2734 delivery_destination_policy: existing_policy,
2735 };
2736
2737 state.delivery_destinations.insert(name.clone(), dd);
2738
2739 let config_resp = {
2742 let mut c: serde_json::Map<String, Value> =
2743 config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
2744 c.entry("destinationResourceArn".to_string())
2745 .or_insert_with(|| json!(""));
2746 Value::Object(c)
2747 };
2748
2749 let mut resp = json!({
2750 "deliveryDestination": {
2751 "name": name,
2752 "arn": arn,
2753 "deliveryDestinationConfiguration": config_resp,
2754 }
2755 });
2756 if let Some(ref fmt) = output_format {
2757 resp["deliveryDestination"]["outputFormat"] = json!(fmt);
2758 }
2759 if !tags.is_empty() {
2760 resp["deliveryDestination"]["tags"] = json!(tags);
2761 }
2762
2763 Ok(AwsResponse::json(
2764 StatusCode::OK,
2765 serde_json::to_string(&resp).unwrap(),
2766 ))
2767 }
2768
2769 fn get_delivery_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2770 let body = body_json(req);
2771 let name = body["name"].as_str().ok_or_else(|| {
2772 AwsServiceError::aws_error(
2773 StatusCode::BAD_REQUEST,
2774 "InvalidParameterException",
2775 "name is required",
2776 )
2777 })?;
2778
2779 validate_string_length("name", name, 1, 60)?;
2780
2781 let state = self.state.read();
2782 let dd = state.delivery_destinations.get(name).ok_or_else(|| {
2783 AwsServiceError::aws_error(
2784 StatusCode::BAD_REQUEST,
2785 "ResourceNotFoundException",
2786 format!("Delivery destination '{name}' does not exist."),
2787 )
2788 })?;
2789
2790 let mut obj = json!({
2791 "name": dd.name,
2792 "arn": dd.arn,
2793 "deliveryDestinationConfiguration": dd_config_json(&dd.delivery_destination_configuration),
2794 });
2795 if let Some(ref fmt) = dd.output_format {
2796 obj["outputFormat"] = json!(fmt);
2797 }
2798
2799 Ok(AwsResponse::json(
2800 StatusCode::OK,
2801 serde_json::to_string(&json!({ "deliveryDestination": obj })).unwrap(),
2802 ))
2803 }
2804
2805 fn describe_delivery_destinations(
2806 &self,
2807 req: &AwsRequest,
2808 ) -> Result<AwsResponse, AwsServiceError> {
2809 let body = body_json(req);
2810 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
2811 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
2812
2813 let state = self.state.read();
2814 let dds: Vec<Value> = state
2815 .delivery_destinations
2816 .values()
2817 .map(|dd| {
2818 let mut obj = json!({
2819 "name": dd.name,
2820 "arn": dd.arn,
2821 "deliveryDestinationConfiguration": dd_config_json(&dd.delivery_destination_configuration),
2822 });
2823 if let Some(ref fmt) = dd.output_format {
2824 obj["outputFormat"] = json!(fmt);
2825 }
2826 obj
2827 })
2828 .collect();
2829
2830 Ok(AwsResponse::json(
2831 StatusCode::OK,
2832 serde_json::to_string(&json!({ "deliveryDestinations": dds })).unwrap(),
2833 ))
2834 }
2835
2836 fn delete_delivery_destination(
2837 &self,
2838 req: &AwsRequest,
2839 ) -> Result<AwsResponse, AwsServiceError> {
2840 let body = body_json(req);
2841 let name = body["name"].as_str().ok_or_else(|| {
2842 AwsServiceError::aws_error(
2843 StatusCode::BAD_REQUEST,
2844 "InvalidParameterException",
2845 "name is required",
2846 )
2847 })?;
2848
2849 validate_string_length("name", name, 1, 60)?;
2850
2851 let mut state = self.state.write();
2852 if state.delivery_destinations.remove(name).is_none() {
2853 return Err(AwsServiceError::aws_error(
2854 StatusCode::BAD_REQUEST,
2855 "ResourceNotFoundException",
2856 format!("Delivery destination '{name}' does not exist."),
2857 ));
2858 }
2859
2860 Ok(AwsResponse::json(StatusCode::OK, "{}"))
2861 }
2862
2863 fn put_delivery_destination_policy(
2864 &self,
2865 req: &AwsRequest,
2866 ) -> Result<AwsResponse, AwsServiceError> {
2867 let body = body_json(req);
2868 let name = body["deliveryDestinationName"].as_str().ok_or_else(|| {
2869 AwsServiceError::aws_error(
2870 StatusCode::BAD_REQUEST,
2871 "InvalidParameterException",
2872 "deliveryDestinationName is required",
2873 )
2874 })?;
2875 let policy = body["deliveryDestinationPolicy"]
2876 .as_str()
2877 .ok_or_else(|| {
2878 AwsServiceError::aws_error(
2879 StatusCode::BAD_REQUEST,
2880 "InvalidParameterException",
2881 "deliveryDestinationPolicy is required",
2882 )
2883 })?
2884 .to_string();
2885
2886 validate_string_length("deliveryDestinationName", name, 1, 60)?;
2887 validate_string_length("deliveryDestinationPolicy", &policy, 1, 51200)?;
2888
2889 let mut state = self.state.write();
2890 let dd = state.delivery_destinations.get_mut(name).ok_or_else(|| {
2891 AwsServiceError::aws_error(
2892 StatusCode::BAD_REQUEST,
2893 "ResourceNotFoundException",
2894 format!("Delivery destination '{name}' does not exist."),
2895 )
2896 })?;
2897
2898 dd.delivery_destination_policy = Some(policy.clone());
2899
2900 Ok(AwsResponse::json(
2901 StatusCode::OK,
2902 serde_json::to_string(&json!({
2903 "policy": {
2904 "deliveryDestinationPolicy": policy,
2905 }
2906 }))
2907 .unwrap(),
2908 ))
2909 }
2910
2911 fn get_delivery_destination_policy(
2912 &self,
2913 req: &AwsRequest,
2914 ) -> Result<AwsResponse, AwsServiceError> {
2915 let body = body_json(req);
2916 let name = body["deliveryDestinationName"].as_str().ok_or_else(|| {
2917 AwsServiceError::aws_error(
2918 StatusCode::BAD_REQUEST,
2919 "InvalidParameterException",
2920 "deliveryDestinationName is required",
2921 )
2922 })?;
2923
2924 validate_string_length("deliveryDestinationName", name, 1, 60)?;
2925
2926 let state = self.state.read();
2927 let dd = state.delivery_destinations.get(name).ok_or_else(|| {
2928 AwsServiceError::aws_error(
2929 StatusCode::BAD_REQUEST,
2930 "ResourceNotFoundException",
2931 format!("Delivery destination '{name}' does not exist."),
2932 )
2933 })?;
2934
2935 let policy_json = if let Some(ref policy) = dd.delivery_destination_policy {
2936 json!({
2937 "deliveryDestinationPolicy": policy,
2938 })
2939 } else {
2940 json!({})
2941 };
2942
2943 Ok(AwsResponse::json(
2944 StatusCode::OK,
2945 serde_json::to_string(&json!({
2946 "policy": policy_json,
2947 }))
2948 .unwrap(),
2949 ))
2950 }
2951
2952 fn delete_delivery_destination_policy(
2953 &self,
2954 req: &AwsRequest,
2955 ) -> Result<AwsResponse, AwsServiceError> {
2956 let body = body_json(req);
2957 let name = body["deliveryDestinationName"].as_str().ok_or_else(|| {
2958 AwsServiceError::aws_error(
2959 StatusCode::BAD_REQUEST,
2960 "InvalidParameterException",
2961 "deliveryDestinationName is required",
2962 )
2963 })?;
2964
2965 validate_string_length("deliveryDestinationName", name, 1, 60)?;
2966
2967 let mut state = self.state.write();
2968 let dd = state.delivery_destinations.get_mut(name).ok_or_else(|| {
2969 AwsServiceError::aws_error(
2970 StatusCode::BAD_REQUEST,
2971 "ResourceNotFoundException",
2972 format!("Delivery destination '{name}' does not exist."),
2973 )
2974 })?;
2975
2976 dd.delivery_destination_policy = None;
2977
2978 Ok(AwsResponse::json(StatusCode::OK, "{}"))
2979 }
2980
2981 fn put_delivery_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2984 let body = body_json(req);
2985 let name = body["name"]
2986 .as_str()
2987 .ok_or_else(|| {
2988 AwsServiceError::aws_error(
2989 StatusCode::BAD_REQUEST,
2990 "InvalidParameterException",
2991 "name is required",
2992 )
2993 })?
2994 .to_string();
2995 let resource_arn = body["resourceArn"]
2996 .as_str()
2997 .ok_or_else(|| {
2998 AwsServiceError::aws_error(
2999 StatusCode::BAD_REQUEST,
3000 "InvalidParameterException",
3001 "resourceArn is required",
3002 )
3003 })?
3004 .to_string();
3005 let log_type = body["logType"]
3006 .as_str()
3007 .ok_or_else(|| {
3008 AwsServiceError::aws_error(
3009 StatusCode::BAD_REQUEST,
3010 "InvalidParameterException",
3011 "logType is required",
3012 )
3013 })?
3014 .to_string();
3015
3016 validate_string_length("name", &name, 1, 60)?;
3017 validate_string_length("logType", &log_type, 1, 255)?;
3018
3019 let tags: std::collections::HashMap<String, String> = body["tags"]
3020 .as_object()
3021 .map(|m| {
3022 m.iter()
3023 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
3024 .collect()
3025 })
3026 .unwrap_or_default();
3027
3028 let service = resource_arn
3030 .split(':')
3031 .nth(2)
3032 .unwrap_or("unknown")
3033 .to_string();
3034
3035 if !resource_arn.starts_with("arn:aws:") {
3037 return Err(AwsServiceError::aws_error(
3038 StatusCode::BAD_REQUEST,
3039 "ValidationException",
3040 format!("Invalid resource ARN: {resource_arn}"),
3041 ));
3042 }
3043
3044 if service == "s3" {
3046 return Err(AwsServiceError::aws_error(
3047 StatusCode::BAD_REQUEST,
3048 "ResourceNotFoundException",
3049 format!("The resource ARN '{resource_arn}' is not a valid delivery source."),
3050 ));
3051 }
3052
3053 let valid_log_types: &[&str] = match service.as_str() {
3055 "cloudfront" => &["ACCESS_LOGS"],
3056 _ => &["ACCESS_LOGS", "APPLICATION_LOGS", "FW_LOGS"],
3057 };
3058 if !valid_log_types.contains(&log_type.as_str()) {
3059 return Err(AwsServiceError::aws_error(
3060 StatusCode::BAD_REQUEST,
3061 "ValidationException",
3062 format!("Log type '{log_type}' is not valid for this resource."),
3063 ));
3064 }
3065
3066 let mut state = self.state.write();
3067
3068 if let Some(existing) = state.delivery_sources.get(&name) {
3070 if !existing.resource_arns.is_empty() && existing.resource_arns[0] != resource_arn {
3071 return Err(AwsServiceError::aws_error(
3072 StatusCode::BAD_REQUEST,
3073 "ConflictException",
3074 "Cannot update delivery source with a different resourceArn.",
3075 ));
3076 }
3077 }
3078
3079 let arn = format!(
3080 "arn:aws:logs:{}:{}:delivery-source:{}",
3081 state.region, state.account_id, name
3082 );
3083
3084 let ds = DeliverySource {
3085 name: name.clone(),
3086 arn: arn.clone(),
3087 resource_arns: vec![resource_arn],
3088 service: service.clone(),
3089 log_type: log_type.clone(),
3090 tags: tags.clone(),
3091 };
3092
3093 state.delivery_sources.insert(name.clone(), ds);
3094
3095 let state_ref = state.delivery_sources.get(&name).unwrap();
3096
3097 Ok(AwsResponse::json(
3098 StatusCode::OK,
3099 serde_json::to_string(&json!({
3100 "deliverySource": {
3101 "name": state_ref.name,
3102 "arn": state_ref.arn,
3103 "resourceArns": state_ref.resource_arns,
3104 "service": state_ref.service,
3105 "logType": state_ref.log_type,
3106 "tags": state_ref.tags,
3107 }
3108 }))
3109 .unwrap(),
3110 ))
3111 }
3112
3113 fn get_delivery_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3114 let body = body_json(req);
3115 let name = body["name"].as_str().ok_or_else(|| {
3116 AwsServiceError::aws_error(
3117 StatusCode::BAD_REQUEST,
3118 "InvalidParameterException",
3119 "name is required",
3120 )
3121 })?;
3122
3123 validate_string_length("name", name, 1, 60)?;
3124
3125 let state = self.state.read();
3126 let ds = state.delivery_sources.get(name).ok_or_else(|| {
3127 AwsServiceError::aws_error(
3128 StatusCode::BAD_REQUEST,
3129 "ResourceNotFoundException",
3130 format!("Delivery source '{name}' does not exist."),
3131 )
3132 })?;
3133
3134 Ok(AwsResponse::json(
3135 StatusCode::OK,
3136 serde_json::to_string(&json!({
3137 "deliverySource": {
3138 "name": ds.name,
3139 "arn": ds.arn,
3140 "resourceArns": ds.resource_arns,
3141 "service": ds.service,
3142 "logType": ds.log_type,
3143 "tags": ds.tags,
3144 }
3145 }))
3146 .unwrap(),
3147 ))
3148 }
3149
3150 fn describe_delivery_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3151 let body = body_json(req);
3152 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
3153 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
3154
3155 let state = self.state.read();
3156 let sources: Vec<Value> = state
3157 .delivery_sources
3158 .values()
3159 .map(|ds| {
3160 json!({
3161 "name": ds.name,
3162 "arn": ds.arn,
3163 "resourceArns": ds.resource_arns,
3164 "service": ds.service,
3165 "logType": ds.log_type,
3166 "tags": ds.tags,
3167 })
3168 })
3169 .collect();
3170
3171 Ok(AwsResponse::json(
3172 StatusCode::OK,
3173 serde_json::to_string(&json!({ "deliverySources": sources })).unwrap(),
3174 ))
3175 }
3176
3177 fn delete_delivery_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3178 let body = body_json(req);
3179 let name = body["name"].as_str().ok_or_else(|| {
3180 AwsServiceError::aws_error(
3181 StatusCode::BAD_REQUEST,
3182 "InvalidParameterException",
3183 "name is required",
3184 )
3185 })?;
3186
3187 validate_string_length("name", name, 1, 60)?;
3188
3189 let mut state = self.state.write();
3190 if state.delivery_sources.remove(name).is_none() {
3191 return Err(AwsServiceError::aws_error(
3192 StatusCode::BAD_REQUEST,
3193 "ResourceNotFoundException",
3194 format!("Delivery source '{name}' does not exist."),
3195 ));
3196 }
3197
3198 Ok(AwsResponse::json(StatusCode::OK, "{}"))
3199 }
3200
3201 fn create_delivery(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3204 let body = body_json(req);
3205 let delivery_source_name = body["deliverySourceName"]
3206 .as_str()
3207 .ok_or_else(|| {
3208 AwsServiceError::aws_error(
3209 StatusCode::BAD_REQUEST,
3210 "InvalidParameterException",
3211 "deliverySourceName is required",
3212 )
3213 })?
3214 .to_string();
3215 let delivery_destination_arn = body["deliveryDestinationArn"]
3216 .as_str()
3217 .ok_or_else(|| {
3218 AwsServiceError::aws_error(
3219 StatusCode::BAD_REQUEST,
3220 "InvalidParameterException",
3221 "deliveryDestinationArn is required",
3222 )
3223 })?
3224 .to_string();
3225
3226 validate_string_length("deliverySourceName", &delivery_source_name, 1, 60)?;
3227 validate_optional_string_length("fieldDelimiter", body["fieldDelimiter"].as_str(), 0, 5)?;
3228
3229 let tags: std::collections::HashMap<String, String> = body["tags"]
3230 .as_object()
3231 .map(|m| {
3232 m.iter()
3233 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string()))
3234 .collect()
3235 })
3236 .unwrap_or_default();
3237
3238 let record_fields: Vec<String> = body["recordFields"]
3239 .as_array()
3240 .map(|a| {
3241 a.iter()
3242 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3243 .collect()
3244 })
3245 .unwrap_or_default();
3246 let field_delimiter = body["fieldDelimiter"].as_str().map(|s| s.to_string());
3247 let s3_delivery_config = body["s3DeliveryConfiguration"].clone();
3248
3249 let mut state = self.state.write();
3250
3251 if !state.delivery_sources.contains_key(&delivery_source_name) {
3253 return Err(AwsServiceError::aws_error(
3254 StatusCode::BAD_REQUEST,
3255 "ResourceNotFoundException",
3256 format!("Delivery source '{}' does not exist.", delivery_source_name),
3257 ));
3258 }
3259
3260 let dest_exists = state
3262 .delivery_destinations
3263 .values()
3264 .any(|dd| dd.arn == delivery_destination_arn);
3265 if !dest_exists {
3266 return Err(AwsServiceError::aws_error(
3267 StatusCode::BAD_REQUEST,
3268 "ResourceNotFoundException",
3269 format!(
3270 "Delivery destination '{}' does not exist.",
3271 delivery_destination_arn
3272 ),
3273 ));
3274 }
3275
3276 let already_exists = state.deliveries.values().any(|d| {
3278 d.delivery_source_name == delivery_source_name
3279 && d.delivery_destination_arn == delivery_destination_arn
3280 });
3281 if already_exists {
3282 return Err(AwsServiceError::aws_error(
3283 StatusCode::BAD_REQUEST,
3284 "ConflictException",
3285 "A delivery already exists for this source and destination.",
3286 ));
3287 }
3288
3289 let dest_type = if delivery_destination_arn.contains(":s3:") {
3291 "S3"
3292 } else if delivery_destination_arn.contains(":firehose:") {
3293 "FH"
3294 } else {
3295 "CWL"
3296 };
3297
3298 let delivery_id = uuid::Uuid::new_v4().to_string();
3299 let arn = format!(
3300 "arn:aws:logs:{}:{}:delivery:{}",
3301 state.region, state.account_id, delivery_id
3302 );
3303
3304 let delivery = Delivery {
3305 id: delivery_id.clone(),
3306 delivery_source_name: delivery_source_name.clone(),
3307 delivery_destination_arn: delivery_destination_arn.clone(),
3308 delivery_destination_type: dest_type.to_string(),
3309 arn: arn.clone(),
3310 tags: tags.clone(),
3311 };
3312
3313 state.deliveries.insert(delivery_id.clone(), delivery);
3314
3315 let mut delivery_json = json!({
3316 "id": delivery_id,
3317 "deliverySourceName": delivery_source_name,
3318 "deliveryDestinationArn": delivery_destination_arn,
3319 "deliveryDestinationType": dest_type,
3320 "arn": arn,
3321 "tags": tags,
3322 });
3323 if !record_fields.is_empty() {
3324 delivery_json["recordFields"] = json!(record_fields);
3325 }
3326 if let Some(ref delim) = field_delimiter {
3327 delivery_json["fieldDelimiter"] = json!(delim);
3328 }
3329 if !s3_delivery_config.is_null() {
3330 delivery_json["s3DeliveryConfiguration"] = s3_delivery_config;
3331 }
3332
3333 Ok(AwsResponse::json(
3334 StatusCode::OK,
3335 serde_json::to_string(&json!({
3336 "delivery": delivery_json,
3337 }))
3338 .unwrap(),
3339 ))
3340 }
3341
3342 fn get_delivery(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3343 let body = body_json(req);
3344 let delivery_id = body["id"].as_str().ok_or_else(|| {
3345 AwsServiceError::aws_error(
3346 StatusCode::BAD_REQUEST,
3347 "InvalidParameterException",
3348 "id is required",
3349 )
3350 })?;
3351
3352 validate_string_length("id", delivery_id, 1, 64)?;
3353
3354 let state = self.state.read();
3355 let d = state.deliveries.get(delivery_id).ok_or_else(|| {
3356 AwsServiceError::aws_error(
3357 StatusCode::BAD_REQUEST,
3358 "ResourceNotFoundException",
3359 format!("Delivery '{delivery_id}' does not exist."),
3360 )
3361 })?;
3362
3363 Ok(AwsResponse::json(
3364 StatusCode::OK,
3365 serde_json::to_string(&json!({
3366 "delivery": {
3367 "id": d.id,
3368 "deliverySourceName": d.delivery_source_name,
3369 "deliveryDestinationArn": d.delivery_destination_arn,
3370 "deliveryDestinationType": d.delivery_destination_type,
3371 "arn": d.arn,
3372 "tags": d.tags,
3373 }
3374 }))
3375 .unwrap(),
3376 ))
3377 }
3378
3379 fn describe_deliveries(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3380 let body = body_json(req);
3381 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
3382 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
3383
3384 let state = self.state.read();
3385 let deliveries: Vec<Value> = state
3386 .deliveries
3387 .values()
3388 .map(|d| {
3389 json!({
3390 "id": d.id,
3391 "deliverySourceName": d.delivery_source_name,
3392 "deliveryDestinationArn": d.delivery_destination_arn,
3393 "deliveryDestinationType": d.delivery_destination_type,
3394 "arn": d.arn,
3395 "tags": d.tags,
3396 })
3397 })
3398 .collect();
3399
3400 Ok(AwsResponse::json(
3401 StatusCode::OK,
3402 serde_json::to_string(&json!({ "deliveries": deliveries })).unwrap(),
3403 ))
3404 }
3405
3406 fn delete_delivery(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3407 let body = body_json(req);
3408 let delivery_id = body["id"].as_str().ok_or_else(|| {
3409 AwsServiceError::aws_error(
3410 StatusCode::BAD_REQUEST,
3411 "InvalidParameterException",
3412 "id is required",
3413 )
3414 })?;
3415
3416 validate_string_length("id", delivery_id, 1, 64)?;
3417
3418 let mut state = self.state.write();
3419 if state.deliveries.remove(delivery_id).is_none() {
3420 return Err(AwsServiceError::aws_error(
3421 StatusCode::BAD_REQUEST,
3422 "ResourceNotFoundException",
3423 format!("Delivery '{delivery_id}' does not exist."),
3424 ));
3425 }
3426
3427 Ok(AwsResponse::json(StatusCode::OK, "{}"))
3428 }
3429
3430 fn associate_kms_key(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3433 let body = body_json(req);
3434 let log_group_name = body["logGroupName"].as_str();
3435 let resource_identifier = body["resourceIdentifier"].as_str();
3436 let kms_key_id = body["kmsKeyId"]
3437 .as_str()
3438 .ok_or_else(|| {
3439 AwsServiceError::aws_error(
3440 StatusCode::BAD_REQUEST,
3441 "InvalidParameterException",
3442 "kmsKeyId is required",
3443 )
3444 })?
3445 .to_string();
3446
3447 if let Some(name) = log_group_name {
3448 validate_string_length("logGroupName", name, 1, 512)?;
3449 }
3450 validate_string_length("kmsKeyId", &kms_key_id, 1, 256)?;
3451 validate_optional_string_length("resourceIdentifier", resource_identifier, 1, 2048)?;
3452
3453 let resolved_name = resolve_log_group_name(log_group_name, resource_identifier)?;
3454
3455 let mut state = self.state.write();
3456 let group = state
3457 .log_groups
3458 .get_mut(resolved_name.as_str())
3459 .ok_or_else(|| {
3460 AwsServiceError::aws_error(
3461 StatusCode::BAD_REQUEST,
3462 "ResourceNotFoundException",
3463 format!("The specified log group does not exist: {resolved_name}"),
3464 )
3465 })?;
3466
3467 group.kms_key_id = Some(kms_key_id);
3468
3469 Ok(AwsResponse::json(StatusCode::OK, "{}"))
3470 }
3471
3472 fn disassociate_kms_key(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3473 let body = body_json(req);
3474 let log_group_name = body["logGroupName"].as_str();
3475 let resource_identifier = body["resourceIdentifier"].as_str();
3476
3477 if let Some(name) = log_group_name {
3478 validate_string_length("logGroupName", name, 1, 512)?;
3479 }
3480 validate_optional_string_length("resourceIdentifier", resource_identifier, 1, 2048)?;
3481
3482 let resolved_name = resolve_log_group_name(log_group_name, resource_identifier)?;
3483
3484 let mut state = self.state.write();
3485 let group = state
3486 .log_groups
3487 .get_mut(resolved_name.as_str())
3488 .ok_or_else(|| {
3489 AwsServiceError::aws_error(
3490 StatusCode::BAD_REQUEST,
3491 "ResourceNotFoundException",
3492 format!("The specified log group does not exist: {resolved_name}"),
3493 )
3494 })?;
3495
3496 group.kms_key_id = None;
3497
3498 Ok(AwsResponse::json(StatusCode::OK, "{}"))
3499 }
3500
3501 fn put_query_definition(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3504 let body = body_json(req);
3505 let name = body["name"]
3506 .as_str()
3507 .ok_or_else(|| {
3508 AwsServiceError::aws_error(
3509 StatusCode::BAD_REQUEST,
3510 "InvalidParameterException",
3511 "name is required",
3512 )
3513 })?
3514 .to_string();
3515 let query_string = body["queryString"]
3516 .as_str()
3517 .ok_or_else(|| {
3518 AwsServiceError::aws_error(
3519 StatusCode::BAD_REQUEST,
3520 "InvalidParameterException",
3521 "queryString is required",
3522 )
3523 })?
3524 .to_string();
3525 let log_group_names: Vec<String> = body["logGroupNames"]
3526 .as_array()
3527 .map(|a| {
3528 a.iter()
3529 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3530 .collect()
3531 })
3532 .unwrap_or_default();
3533
3534 let query_definition_id = body["queryDefinitionId"]
3535 .as_str()
3536 .map(|s| s.to_string())
3537 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
3538
3539 validate_string_length("name", &name, 1, 255)?;
3540 validate_string_length("queryString", &query_string, 1, 10000)?;
3541 validate_optional_string_length(
3542 "queryDefinitionId",
3543 body["queryDefinitionId"].as_str(),
3544 1,
3545 256,
3546 )?;
3547 validate_optional_string_length("clientToken", body["clientToken"].as_str(), 36, 128)?;
3548 validate_optional_enum_value(
3549 "queryLanguage",
3550 &body["queryLanguage"],
3551 &["CWLI", "SQL", "PPL"],
3552 )?;
3553
3554 let now = Utc::now().timestamp_millis();
3555
3556 let mut state = self.state.write();
3557 state.query_definitions.insert(
3558 query_definition_id.clone(),
3559 QueryDefinition {
3560 query_definition_id: query_definition_id.clone(),
3561 name,
3562 query_string,
3563 log_group_names,
3564 last_modified: now,
3565 },
3566 );
3567
3568 Ok(AwsResponse::json(
3569 StatusCode::OK,
3570 serde_json::to_string(&json!({
3571 "queryDefinitionId": query_definition_id,
3572 }))
3573 .unwrap(),
3574 ))
3575 }
3576
3577 fn describe_query_definitions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3578 let body = body_json(req);
3579 let name_prefix = body["queryDefinitionNamePrefix"].as_str().unwrap_or("");
3580 validate_optional_string_length(
3581 "queryDefinitionNamePrefix",
3582 body["queryDefinitionNamePrefix"].as_str(),
3583 1,
3584 255,
3585 )?;
3586 validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 1000)?;
3587 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 2048)?;
3588 validate_optional_enum_value(
3589 "queryLanguage",
3590 &body["queryLanguage"],
3591 &["CWLI", "SQL", "PPL"],
3592 )?;
3593
3594 let state = self.state.read();
3595 let defs: Vec<Value> = state
3596 .query_definitions
3597 .values()
3598 .filter(|qd| name_prefix.is_empty() || qd.name.starts_with(name_prefix))
3599 .map(|qd| {
3600 json!({
3601 "queryDefinitionId": qd.query_definition_id,
3602 "name": qd.name,
3603 "queryString": qd.query_string,
3604 "logGroupNames": qd.log_group_names,
3605 "lastModified": qd.last_modified,
3606 })
3607 })
3608 .collect();
3609
3610 Ok(AwsResponse::json(
3611 StatusCode::OK,
3612 serde_json::to_string(&json!({ "queryDefinitions": defs })).unwrap(),
3613 ))
3614 }
3615
3616 fn delete_query_definition(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3617 let body = body_json(req);
3618 let qd_id = body["queryDefinitionId"].as_str().ok_or_else(|| {
3619 AwsServiceError::aws_error(
3620 StatusCode::BAD_REQUEST,
3621 "InvalidParameterException",
3622 "queryDefinitionId is required",
3623 )
3624 })?;
3625
3626 validate_string_length("queryDefinitionId", qd_id, 1, 256)?;
3627
3628 let mut state = self.state.write();
3629 let success = state.query_definitions.remove(qd_id).is_some();
3630
3631 Ok(AwsResponse::json(
3632 StatusCode::OK,
3633 serde_json::to_string(&json!({ "success": success })).unwrap(),
3634 ))
3635 }
3636
3637 fn put_account_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3640 let body = body_json(req);
3641 validate_optional_enum_value(
3642 "policyType",
3643 &body["policyType"],
3644 &[
3645 "DATA_PROTECTION_POLICY",
3646 "SUBSCRIPTION_FILTER_POLICY",
3647 "FIELD_INDEX_POLICY",
3648 "TRANSFORMER_POLICY",
3649 "METRIC_EXTRACTION_POLICY",
3650 ],
3651 )?;
3652 validate_optional_enum_value("scope", &body["scope"], &["ALL"])?;
3653 let policy_name = body["policyName"].as_str().ok_or_else(|| {
3654 AwsServiceError::aws_error(
3655 StatusCode::BAD_REQUEST,
3656 "InvalidParameterException",
3657 "policyName is required",
3658 )
3659 })?;
3660 let policy_type = body["policyType"].as_str().ok_or_else(|| {
3661 AwsServiceError::aws_error(
3662 StatusCode::BAD_REQUEST,
3663 "InvalidParameterException",
3664 "policyType is required",
3665 )
3666 })?;
3667 let policy_document = body["policyDocument"].as_str().ok_or_else(|| {
3668 AwsServiceError::aws_error(
3669 StatusCode::BAD_REQUEST,
3670 "InvalidParameterException",
3671 "policyDocument is required",
3672 )
3673 })?;
3674
3675 let now = Utc::now().timestamp_millis();
3676 let mut state = self.state.write();
3677 let account_id = state.account_id.clone();
3678 let scope = body["scope"].as_str().map(|s| s.to_string());
3679 let selection_criteria = body["selectionCriteria"].as_str().map(|s| s.to_string());
3680
3681 let policy = AccountPolicy {
3682 policy_name: policy_name.to_string(),
3683 policy_type: policy_type.to_string(),
3684 policy_document: policy_document.to_string(),
3685 scope: scope.clone(),
3686 selection_criteria: selection_criteria.clone(),
3687 account_id: account_id.clone(),
3688 last_updated_time: now,
3689 };
3690
3691 let key = (policy_name.to_string(), policy_type.to_string());
3692 state.account_policies.insert(key, policy);
3693
3694 let mut result = json!({
3695 "accountPolicy": {
3696 "policyName": policy_name,
3697 "policyType": policy_type,
3698 "policyDocument": policy_document,
3699 "accountId": account_id,
3700 "lastUpdatedTime": now,
3701 }
3702 });
3703 if let Some(s) = scope {
3704 result["accountPolicy"]["scope"] = json!(s);
3705 }
3706 if let Some(s) = selection_criteria {
3707 result["accountPolicy"]["selectionCriteria"] = json!(s);
3708 }
3709
3710 Ok(AwsResponse::json(
3711 StatusCode::OK,
3712 serde_json::to_string(&result).unwrap(),
3713 ))
3714 }
3715
3716 fn describe_account_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3717 let body = body_json(req);
3718 validate_optional_enum_value(
3719 "policyType",
3720 &body["policyType"],
3721 &[
3722 "DATA_PROTECTION_POLICY",
3723 "SUBSCRIPTION_FILTER_POLICY",
3724 "FIELD_INDEX_POLICY",
3725 "TRANSFORMER_POLICY",
3726 "METRIC_EXTRACTION_POLICY",
3727 ],
3728 )?;
3729 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
3730 let policy_type = body["policyType"].as_str().ok_or_else(|| {
3731 AwsServiceError::aws_error(
3732 StatusCode::BAD_REQUEST,
3733 "InvalidParameterException",
3734 "policyType is required",
3735 )
3736 })?;
3737 let policy_name = body["policyName"].as_str();
3738
3739 let state = self.state.read();
3740 let policies: Vec<Value> = state
3741 .account_policies
3742 .values()
3743 .filter(|p| {
3744 p.policy_type == policy_type && policy_name.is_none_or(|n| p.policy_name == n)
3745 })
3746 .map(|p| {
3747 let mut obj = json!({
3748 "policyName": p.policy_name,
3749 "policyType": p.policy_type,
3750 "policyDocument": p.policy_document,
3751 "accountId": p.account_id,
3752 "lastUpdatedTime": p.last_updated_time,
3753 });
3754 if let Some(ref s) = p.scope {
3755 obj["scope"] = json!(s);
3756 }
3757 if let Some(ref s) = p.selection_criteria {
3758 obj["selectionCriteria"] = json!(s);
3759 }
3760 obj
3761 })
3762 .collect();
3763
3764 Ok(AwsResponse::json(
3765 StatusCode::OK,
3766 serde_json::to_string(&json!({ "accountPolicies": policies })).unwrap(),
3767 ))
3768 }
3769
3770 fn delete_account_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3771 let body = body_json(req);
3772 let policy_name = body["policyName"].as_str().ok_or_else(|| {
3773 AwsServiceError::aws_error(
3774 StatusCode::BAD_REQUEST,
3775 "InvalidParameterException",
3776 "policyName is required",
3777 )
3778 })?;
3779 let policy_type = body["policyType"].as_str().ok_or_else(|| {
3780 AwsServiceError::aws_error(
3781 StatusCode::BAD_REQUEST,
3782 "InvalidParameterException",
3783 "policyType is required",
3784 )
3785 })?;
3786
3787 let key = (policy_name.to_string(), policy_type.to_string());
3788 let mut state = self.state.write();
3789 if state.account_policies.remove(&key).is_none() {
3790 return Err(AwsServiceError::aws_error(
3791 StatusCode::BAD_REQUEST,
3792 "ResourceNotFoundException",
3793 format!("Account policy {policy_name} of type {policy_type} not found"),
3794 ));
3795 }
3796
3797 Ok(AwsResponse::json(StatusCode::OK, "{}"))
3798 }
3799
3800 fn put_data_protection_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3803 let body = body_json(req);
3804 let log_group_id = body["logGroupIdentifier"]
3805 .as_str()
3806 .ok_or_else(|| {
3807 AwsServiceError::aws_error(
3808 StatusCode::BAD_REQUEST,
3809 "InvalidParameterException",
3810 "logGroupIdentifier is required",
3811 )
3812 })?
3813 .to_string();
3814 let policy_document = body["policyDocument"]
3815 .as_str()
3816 .ok_or_else(|| {
3817 AwsServiceError::aws_error(
3818 StatusCode::BAD_REQUEST,
3819 "InvalidParameterException",
3820 "policyDocument is required",
3821 )
3822 })?
3823 .to_string();
3824
3825 let group_name = if log_group_id.starts_with("arn:") {
3826 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3827 AwsServiceError::aws_error(
3828 StatusCode::BAD_REQUEST,
3829 "InvalidParameterException",
3830 format!("Invalid ARN: {log_group_id}"),
3831 )
3832 })?
3833 } else {
3834 log_group_id.clone()
3835 };
3836
3837 let now = Utc::now().timestamp_millis();
3838 let mut state = self.state.write();
3839 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
3840 AwsServiceError::aws_error(
3841 StatusCode::BAD_REQUEST,
3842 "ResourceNotFoundException",
3843 format!("The specified log group does not exist: {group_name}"),
3844 )
3845 })?;
3846 let log_group_id_resp = group.arn.clone();
3847
3848 group.data_protection_policy = Some(DataProtectionPolicy {
3849 policy_document: policy_document.clone(),
3850 last_updated_time: now,
3851 });
3852
3853 Ok(AwsResponse::json(
3854 StatusCode::OK,
3855 serde_json::to_string(&json!({
3856 "logGroupIdentifier": log_group_id_resp,
3857 "policyDocument": policy_document,
3858 "lastUpdatedTime": now,
3859 }))
3860 .unwrap(),
3861 ))
3862 }
3863
3864 fn get_data_protection_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3865 let body = body_json(req);
3866 let log_group_id = body["logGroupIdentifier"]
3867 .as_str()
3868 .ok_or_else(|| {
3869 AwsServiceError::aws_error(
3870 StatusCode::BAD_REQUEST,
3871 "InvalidParameterException",
3872 "logGroupIdentifier is required",
3873 )
3874 })?
3875 .to_string();
3876
3877 let group_name = if log_group_id.starts_with("arn:") {
3878 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3879 AwsServiceError::aws_error(
3880 StatusCode::BAD_REQUEST,
3881 "InvalidParameterException",
3882 format!("Invalid ARN: {log_group_id}"),
3883 )
3884 })?
3885 } else {
3886 log_group_id.clone()
3887 };
3888
3889 let state = self.state.read();
3890 let group = state.log_groups.get(&group_name).ok_or_else(|| {
3891 AwsServiceError::aws_error(
3892 StatusCode::BAD_REQUEST,
3893 "ResourceNotFoundException",
3894 format!("The specified log group does not exist: {group_name}"),
3895 )
3896 })?;
3897
3898 let mut result = json!({
3899 "logGroupIdentifier": group.arn,
3900 });
3901 if let Some(ref dp) = group.data_protection_policy {
3902 result["policyDocument"] = json!(dp.policy_document);
3903 result["lastUpdatedTime"] = json!(dp.last_updated_time);
3904 }
3905
3906 Ok(AwsResponse::json(
3907 StatusCode::OK,
3908 serde_json::to_string(&result).unwrap(),
3909 ))
3910 }
3911
3912 fn delete_data_protection_policy(
3913 &self,
3914 req: &AwsRequest,
3915 ) -> Result<AwsResponse, AwsServiceError> {
3916 let body = body_json(req);
3917 let log_group_id = body["logGroupIdentifier"]
3918 .as_str()
3919 .ok_or_else(|| {
3920 AwsServiceError::aws_error(
3921 StatusCode::BAD_REQUEST,
3922 "InvalidParameterException",
3923 "logGroupIdentifier is required",
3924 )
3925 })?
3926 .to_string();
3927
3928 let group_name = if log_group_id.starts_with("arn:") {
3929 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3930 AwsServiceError::aws_error(
3931 StatusCode::BAD_REQUEST,
3932 "InvalidParameterException",
3933 format!("Invalid ARN: {log_group_id}"),
3934 )
3935 })?
3936 } else {
3937 log_group_id
3938 };
3939
3940 let mut state = self.state.write();
3941 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
3942 AwsServiceError::aws_error(
3943 StatusCode::BAD_REQUEST,
3944 "ResourceNotFoundException",
3945 format!("The specified log group does not exist: {group_name}"),
3946 )
3947 })?;
3948
3949 if group.data_protection_policy.is_none() {
3950 return Err(AwsServiceError::aws_error(
3951 StatusCode::BAD_REQUEST,
3952 "ResourceNotFoundException",
3953 "No data protection policy found for this log group",
3954 ));
3955 }
3956
3957 group.data_protection_policy = None;
3958 Ok(AwsResponse::json(StatusCode::OK, "{}"))
3959 }
3960
3961 fn put_index_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3964 let body = body_json(req);
3965 let log_group_id = body["logGroupIdentifier"]
3966 .as_str()
3967 .ok_or_else(|| {
3968 AwsServiceError::aws_error(
3969 StatusCode::BAD_REQUEST,
3970 "InvalidParameterException",
3971 "logGroupIdentifier is required",
3972 )
3973 })?
3974 .to_string();
3975 let policy_document = body["policyDocument"]
3976 .as_str()
3977 .ok_or_else(|| {
3978 AwsServiceError::aws_error(
3979 StatusCode::BAD_REQUEST,
3980 "InvalidParameterException",
3981 "policyDocument is required",
3982 )
3983 })?
3984 .to_string();
3985
3986 let group_name = if log_group_id.starts_with("arn:") {
3987 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
3988 AwsServiceError::aws_error(
3989 StatusCode::BAD_REQUEST,
3990 "InvalidParameterException",
3991 format!("Invalid ARN: {log_group_id}"),
3992 )
3993 })?
3994 } else {
3995 log_group_id.clone()
3996 };
3997
3998 let policy_name = body["policyName"].as_str().unwrap_or("default").to_string();
3999
4000 let now = Utc::now().timestamp_millis();
4001 let mut state = self.state.write();
4002 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4003 AwsServiceError::aws_error(
4004 StatusCode::BAD_REQUEST,
4005 "ResourceNotFoundException",
4006 format!("The specified log group does not exist: {group_name}"),
4007 )
4008 })?;
4009
4010 if let Some(existing) = group
4012 .index_policies
4013 .iter_mut()
4014 .find(|p| p.policy_name == policy_name)
4015 {
4016 existing.policy_document = policy_document.clone();
4017 existing.last_updated_time = now;
4018 } else {
4019 group.index_policies.push(IndexPolicy {
4020 policy_name: policy_name.clone(),
4021 policy_document: policy_document.clone(),
4022 last_updated_time: now,
4023 });
4024 }
4025
4026 let result = json!({
4027 "indexPolicy": {
4028 "policyName": policy_name,
4029 "policyDocument": policy_document,
4030 "logGroupIdentifier": group.arn,
4031 "lastUpdateTime": now,
4032 }
4033 });
4034
4035 Ok(AwsResponse::json(
4036 StatusCode::OK,
4037 serde_json::to_string(&result).unwrap(),
4038 ))
4039 }
4040
4041 fn describe_index_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4042 let body = body_json(req);
4043 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
4044 let log_group_ids = body["logGroupIdentifiers"].as_array().ok_or_else(|| {
4045 AwsServiceError::aws_error(
4046 StatusCode::BAD_REQUEST,
4047 "InvalidParameterException",
4048 "logGroupIdentifiers is required",
4049 )
4050 })?;
4051
4052 let state = self.state.read();
4053 let mut policies = Vec::new();
4054
4055 for id_val in log_group_ids {
4056 let id = id_val.as_str().unwrap_or("");
4057 let group_name = if id.starts_with("arn:") {
4058 extract_log_group_from_arn(id).unwrap_or_default()
4059 } else {
4060 id.to_string()
4061 };
4062 if let Some(group) = state.log_groups.get(&group_name) {
4063 for p in &group.index_policies {
4064 policies.push(json!({
4065 "policyName": p.policy_name,
4066 "policyDocument": p.policy_document,
4067 "logGroupIdentifier": group.arn,
4068 "lastUpdateTime": p.last_updated_time,
4069 }));
4070 }
4071 }
4072 }
4073
4074 Ok(AwsResponse::json(
4075 StatusCode::OK,
4076 serde_json::to_string(&json!({ "indexPolicies": policies })).unwrap(),
4077 ))
4078 }
4079
4080 fn delete_index_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4081 let body = body_json(req);
4082 let log_group_id = body["logGroupIdentifier"]
4083 .as_str()
4084 .ok_or_else(|| {
4085 AwsServiceError::aws_error(
4086 StatusCode::BAD_REQUEST,
4087 "InvalidParameterException",
4088 "logGroupIdentifier is required",
4089 )
4090 })?
4091 .to_string();
4092
4093 let group_name = if log_group_id.starts_with("arn:") {
4094 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4095 AwsServiceError::aws_error(
4096 StatusCode::BAD_REQUEST,
4097 "InvalidParameterException",
4098 format!("Invalid ARN: {log_group_id}"),
4099 )
4100 })?
4101 } else {
4102 log_group_id
4103 };
4104
4105 let mut state = self.state.write();
4106 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4107 AwsServiceError::aws_error(
4108 StatusCode::BAD_REQUEST,
4109 "ResourceNotFoundException",
4110 format!("The specified log group does not exist: {group_name}"),
4111 )
4112 })?;
4113
4114 if group.index_policies.is_empty() {
4115 return Err(AwsServiceError::aws_error(
4116 StatusCode::BAD_REQUEST,
4117 "ResourceNotFoundException",
4118 "No index policy found for this log group",
4119 ));
4120 }
4121
4122 group.index_policies.clear();
4123 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4124 }
4125
4126 fn describe_field_indexes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4127 let body = body_json(req);
4128 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
4129 let _log_group_ids = body["logGroupIdentifiers"].as_array().ok_or_else(|| {
4131 AwsServiceError::aws_error(
4132 StatusCode::BAD_REQUEST,
4133 "InvalidParameterException",
4134 "logGroupIdentifiers is required",
4135 )
4136 })?;
4137
4138 Ok(AwsResponse::json(
4140 StatusCode::OK,
4141 serde_json::to_string(&json!({ "fieldIndexes": [] })).unwrap(),
4142 ))
4143 }
4144
4145 fn put_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4148 let body = body_json(req);
4149 let log_group_id = body["logGroupIdentifier"]
4150 .as_str()
4151 .ok_or_else(|| {
4152 AwsServiceError::aws_error(
4153 StatusCode::BAD_REQUEST,
4154 "InvalidParameterException",
4155 "logGroupIdentifier is required",
4156 )
4157 })?
4158 .to_string();
4159 let transformer_config = body.get("transformerConfig").cloned().ok_or_else(|| {
4160 AwsServiceError::aws_error(
4161 StatusCode::BAD_REQUEST,
4162 "InvalidParameterException",
4163 "transformerConfig is required",
4164 )
4165 })?;
4166
4167 let group_name = if log_group_id.starts_with("arn:") {
4168 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4169 AwsServiceError::aws_error(
4170 StatusCode::BAD_REQUEST,
4171 "InvalidParameterException",
4172 format!("Invalid ARN: {log_group_id}"),
4173 )
4174 })?
4175 } else {
4176 log_group_id.clone()
4177 };
4178
4179 let now = Utc::now().timestamp_millis();
4180 let mut state = self.state.write();
4181 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4182 AwsServiceError::aws_error(
4183 StatusCode::BAD_REQUEST,
4184 "ResourceNotFoundException",
4185 format!("The specified log group does not exist: {group_name}"),
4186 )
4187 })?;
4188
4189 group.transformer = Some(Transformer {
4190 transformer_config,
4191 creation_time: now,
4192 last_modified_time: now,
4193 });
4194
4195 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4196 }
4197
4198 fn get_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4199 let body = body_json(req);
4200 let log_group_id = body["logGroupIdentifier"]
4201 .as_str()
4202 .ok_or_else(|| {
4203 AwsServiceError::aws_error(
4204 StatusCode::BAD_REQUEST,
4205 "InvalidParameterException",
4206 "logGroupIdentifier is required",
4207 )
4208 })?
4209 .to_string();
4210
4211 let group_name = if log_group_id.starts_with("arn:") {
4212 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4213 AwsServiceError::aws_error(
4214 StatusCode::BAD_REQUEST,
4215 "InvalidParameterException",
4216 format!("Invalid ARN: {log_group_id}"),
4217 )
4218 })?
4219 } else {
4220 log_group_id.clone()
4221 };
4222
4223 let state = self.state.read();
4224 let group = state.log_groups.get(&group_name).ok_or_else(|| {
4225 AwsServiceError::aws_error(
4226 StatusCode::BAD_REQUEST,
4227 "ResourceNotFoundException",
4228 format!("The specified log group does not exist: {group_name}"),
4229 )
4230 })?;
4231
4232 let mut result = json!({
4233 "logGroupIdentifier": group.arn,
4234 });
4235 if let Some(ref t) = group.transformer {
4236 result["transformerConfig"] = t.transformer_config.clone();
4237 result["creationTime"] = json!(t.creation_time);
4238 result["lastModifiedTime"] = json!(t.last_modified_time);
4239 }
4240
4241 Ok(AwsResponse::json(
4242 StatusCode::OK,
4243 serde_json::to_string(&result).unwrap(),
4244 ))
4245 }
4246
4247 fn delete_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4248 let body = body_json(req);
4249 let log_group_id = body["logGroupIdentifier"]
4250 .as_str()
4251 .ok_or_else(|| {
4252 AwsServiceError::aws_error(
4253 StatusCode::BAD_REQUEST,
4254 "InvalidParameterException",
4255 "logGroupIdentifier is required",
4256 )
4257 })?
4258 .to_string();
4259
4260 let group_name = if log_group_id.starts_with("arn:") {
4261 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4262 AwsServiceError::aws_error(
4263 StatusCode::BAD_REQUEST,
4264 "InvalidParameterException",
4265 format!("Invalid ARN: {log_group_id}"),
4266 )
4267 })?
4268 } else {
4269 log_group_id
4270 };
4271
4272 let mut state = self.state.write();
4273 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4274 AwsServiceError::aws_error(
4275 StatusCode::BAD_REQUEST,
4276 "ResourceNotFoundException",
4277 format!("The specified log group does not exist: {group_name}"),
4278 )
4279 })?;
4280
4281 group.transformer = None;
4282 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4283 }
4284
4285 fn test_transformer(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4286 let body = body_json(req);
4287 let _transformer_config = body.get("transformerConfig").ok_or_else(|| {
4288 AwsServiceError::aws_error(
4289 StatusCode::BAD_REQUEST,
4290 "InvalidParameterException",
4291 "transformerConfig is required",
4292 )
4293 })?;
4294 let log_event_messages = body["logEventMessages"].as_array().ok_or_else(|| {
4295 AwsServiceError::aws_error(
4296 StatusCode::BAD_REQUEST,
4297 "InvalidParameterException",
4298 "logEventMessages is required",
4299 )
4300 })?;
4301
4302 let transformed: Vec<Value> = log_event_messages
4304 .iter()
4305 .map(|msg| {
4306 json!({
4307 "eventMessage": msg,
4308 "transformedEventMessage": msg,
4309 })
4310 })
4311 .collect();
4312
4313 Ok(AwsResponse::json(
4314 StatusCode::OK,
4315 serde_json::to_string(&json!({
4316 "transformedLogs": transformed,
4317 }))
4318 .unwrap(),
4319 ))
4320 }
4321
4322 fn create_log_anomaly_detector(
4325 &self,
4326 req: &AwsRequest,
4327 ) -> Result<AwsResponse, AwsServiceError> {
4328 let body = body_json(req);
4329 validate_optional_string_length("detectorName", body["detectorName"].as_str(), 1, 2048)?;
4330 validate_optional_enum_value(
4331 "evaluationFrequency",
4332 &body["evaluationFrequency"],
4333 &[
4334 "ONE_MIN",
4335 "FIVE_MIN",
4336 "TEN_MIN",
4337 "FIFTEEN_MIN",
4338 "THIRTY_MIN",
4339 "ONE_HOUR",
4340 ],
4341 )?;
4342 validate_optional_string_length("filterPattern", body["filterPattern"].as_str(), 0, 1024)?;
4343 validate_optional_string_length("kmsKeyId", body["kmsKeyId"].as_str(), 0, 256)?;
4344 validate_optional_range_i64(
4345 "anomalyVisibilityTime",
4346 body["anomalyVisibilityTime"].as_i64(),
4347 7,
4348 90,
4349 )?;
4350
4351 let log_group_arn_list = body["logGroupArnList"]
4352 .as_array()
4353 .ok_or_else(|| {
4354 AwsServiceError::aws_error(
4355 StatusCode::BAD_REQUEST,
4356 "InvalidParameterException",
4357 "logGroupArnList is required",
4358 )
4359 })?
4360 .iter()
4361 .filter_map(|v| v.as_str().map(|s| s.to_string()))
4362 .collect::<Vec<_>>();
4363
4364 let detector_name = body["detectorName"].as_str().unwrap_or("").to_string();
4365 let evaluation_frequency = body["evaluationFrequency"].as_str().map(|s| s.to_string());
4366 let filter_pattern = body["filterPattern"].as_str().map(|s| s.to_string());
4367 let anomaly_visibility_time = body["anomalyVisibilityTime"].as_i64();
4368
4369 let now = Utc::now().timestamp_millis();
4370 let mut state = self.state.write();
4371 let detector_id = uuid::Uuid::new_v4().to_string();
4372 let arn = format!(
4373 "arn:aws:logs:{}:{}:anomaly-detector:{}",
4374 state.region, state.account_id, detector_id
4375 );
4376
4377 let detector = AnomalyDetector {
4378 detector_name: detector_name.clone(),
4379 arn: arn.clone(),
4380 log_group_arn_list,
4381 evaluation_frequency,
4382 filter_pattern,
4383 anomaly_visibility_time,
4384 creation_time: now,
4385 last_modified_time: now,
4386 enabled: true,
4387 };
4388
4389 state.anomaly_detectors.insert(arn.clone(), detector);
4390
4391 Ok(AwsResponse::json(
4392 StatusCode::OK,
4393 serde_json::to_string(&json!({ "anomalyDetectorArn": arn })).unwrap(),
4394 ))
4395 }
4396
4397 fn get_log_anomaly_detector(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4398 let body = body_json(req);
4399 let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
4400 AwsServiceError::aws_error(
4401 StatusCode::BAD_REQUEST,
4402 "InvalidParameterException",
4403 "anomalyDetectorArn is required",
4404 )
4405 })?;
4406
4407 let state = self.state.read();
4408 let detector = state.anomaly_detectors.get(arn).ok_or_else(|| {
4409 AwsServiceError::aws_error(
4410 StatusCode::BAD_REQUEST,
4411 "ResourceNotFoundException",
4412 format!("Anomaly detector not found: {arn}"),
4413 )
4414 })?;
4415
4416 let mut result = json!({
4417 "anomalyDetectorArn": detector.arn,
4418 "detectorName": detector.detector_name,
4419 "logGroupArnList": detector.log_group_arn_list,
4420 "creationTimeStamp": detector.creation_time,
4421 "lastModifiedTimeStamp": detector.last_modified_time,
4422 "anomalyDetectorStatus": if detector.enabled { "TRAINING" } else { "PAUSED" },
4423 });
4424 if let Some(ref f) = detector.evaluation_frequency {
4425 result["evaluationFrequency"] = json!(f);
4426 }
4427 if let Some(ref f) = detector.filter_pattern {
4428 result["filterPattern"] = json!(f);
4429 }
4430 if let Some(t) = detector.anomaly_visibility_time {
4431 result["anomalyVisibilityTime"] = json!(t);
4432 }
4433
4434 Ok(AwsResponse::json(
4435 StatusCode::OK,
4436 serde_json::to_string(&result).unwrap(),
4437 ))
4438 }
4439
4440 fn delete_log_anomaly_detector(
4441 &self,
4442 req: &AwsRequest,
4443 ) -> Result<AwsResponse, AwsServiceError> {
4444 let body = body_json(req);
4445 let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
4446 AwsServiceError::aws_error(
4447 StatusCode::BAD_REQUEST,
4448 "InvalidParameterException",
4449 "anomalyDetectorArn is required",
4450 )
4451 })?;
4452
4453 let mut state = self.state.write();
4454 if state.anomaly_detectors.remove(arn).is_none() {
4455 return Err(AwsServiceError::aws_error(
4456 StatusCode::BAD_REQUEST,
4457 "ResourceNotFoundException",
4458 format!("Anomaly detector not found: {arn}"),
4459 ));
4460 }
4461
4462 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4463 }
4464
4465 fn list_log_anomaly_detectors(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4466 let body = body_json(req);
4467 validate_optional_string_length(
4468 "filterLogGroupArn",
4469 body["filterLogGroupArn"].as_str(),
4470 1,
4471 2048,
4472 )?;
4473 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
4474 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
4475 let filter_log_group_arn = body["filterLogGroupArn"].as_str();
4476 let _limit = body["limit"].as_i64().unwrap_or(50);
4477
4478 let state = self.state.read();
4479 let detectors: Vec<Value> = state
4480 .anomaly_detectors
4481 .values()
4482 .filter(|d| {
4483 filter_log_group_arn.is_none_or(|arn| d.log_group_arn_list.iter().any(|a| a == arn))
4484 })
4485 .map(|d| {
4486 let mut obj = json!({
4487 "anomalyDetectorArn": d.arn,
4488 "detectorName": d.detector_name,
4489 "logGroupArnList": d.log_group_arn_list,
4490 "creationTimeStamp": d.creation_time,
4491 "lastModifiedTimeStamp": d.last_modified_time,
4492 "anomalyDetectorStatus": if d.enabled { "TRAINING" } else { "PAUSED" },
4493 });
4494 if let Some(ref f) = d.evaluation_frequency {
4495 obj["evaluationFrequency"] = json!(f);
4496 }
4497 if let Some(ref f) = d.filter_pattern {
4498 obj["filterPattern"] = json!(f);
4499 }
4500 if let Some(t) = d.anomaly_visibility_time {
4501 obj["anomalyVisibilityTime"] = json!(t);
4502 }
4503 obj
4504 })
4505 .collect();
4506
4507 Ok(AwsResponse::json(
4508 StatusCode::OK,
4509 serde_json::to_string(&json!({ "anomalyDetectors": detectors })).unwrap(),
4510 ))
4511 }
4512
4513 fn update_log_anomaly_detector(
4514 &self,
4515 req: &AwsRequest,
4516 ) -> Result<AwsResponse, AwsServiceError> {
4517 let body = body_json(req);
4518 let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
4519 AwsServiceError::aws_error(
4520 StatusCode::BAD_REQUEST,
4521 "InvalidParameterException",
4522 "anomalyDetectorArn is required",
4523 )
4524 })?;
4525 let enabled = body["enabled"].as_bool().unwrap_or(true);
4526
4527 let mut state = self.state.write();
4528 let detector = state.anomaly_detectors.get_mut(arn).ok_or_else(|| {
4529 AwsServiceError::aws_error(
4530 StatusCode::BAD_REQUEST,
4531 "ResourceNotFoundException",
4532 format!("Anomaly detector not found: {arn}"),
4533 )
4534 })?;
4535
4536 detector.enabled = enabled;
4537 if let Some(f) = body["evaluationFrequency"].as_str() {
4538 detector.evaluation_frequency = Some(f.to_string());
4539 }
4540 if let Some(f) = body["filterPattern"].as_str() {
4541 detector.filter_pattern = Some(f.to_string());
4542 }
4543 if let Some(t) = body["anomalyVisibilityTime"].as_i64() {
4544 detector.anomaly_visibility_time = Some(t);
4545 }
4546 detector.last_modified_time = Utc::now().timestamp_millis();
4547
4548 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4549 }
4550
4551 fn get_log_group_fields(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4554 let body = body_json(req);
4555 let log_group_id = body["logGroupName"]
4556 .as_str()
4557 .or_else(|| body["logGroupIdentifier"].as_str())
4558 .ok_or_else(|| {
4559 AwsServiceError::aws_error(
4560 StatusCode::BAD_REQUEST,
4561 "InvalidParameterException",
4562 "logGroupName or logGroupIdentifier is required",
4563 )
4564 })?;
4565
4566 let group_name = if log_group_id.starts_with("arn:") {
4567 extract_log_group_from_arn(log_group_id).ok_or_else(|| {
4568 AwsServiceError::aws_error(
4569 StatusCode::BAD_REQUEST,
4570 "InvalidParameterException",
4571 format!("Invalid ARN: {log_group_id}"),
4572 )
4573 })?
4574 } else {
4575 log_group_id.to_string()
4576 };
4577
4578 let state = self.state.read();
4579 if !state.log_groups.contains_key(&group_name) {
4580 return Err(AwsServiceError::aws_error(
4581 StatusCode::BAD_REQUEST,
4582 "ResourceNotFoundException",
4583 format!("The specified log group does not exist: {group_name}"),
4584 ));
4585 }
4586
4587 let fields = json!([
4589 { "fieldName": "@timestamp", "percent": 100 },
4590 { "fieldName": "@message", "percent": 100 },
4591 ]);
4592
4593 Ok(AwsResponse::json(
4594 StatusCode::OK,
4595 serde_json::to_string(&json!({ "logGroupFields": fields })).unwrap(),
4596 ))
4597 }
4598
4599 fn test_metric_filter(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4600 let body = body_json(req);
4601 let filter_pattern = body["filterPattern"].as_str().ok_or_else(|| {
4602 AwsServiceError::aws_error(
4603 StatusCode::BAD_REQUEST,
4604 "InvalidParameterException",
4605 "filterPattern is required",
4606 )
4607 })?;
4608 validate_string_length("filterPattern", filter_pattern, 0, 1024)?;
4609 let log_event_messages = body["logEventMessages"].as_array().ok_or_else(|| {
4610 AwsServiceError::aws_error(
4611 StatusCode::BAD_REQUEST,
4612 "InvalidParameterException",
4613 "logEventMessages is required",
4614 )
4615 })?;
4616
4617 let matches: Vec<Value> = log_event_messages
4618 .iter()
4619 .enumerate()
4620 .filter(|(_, msg)| {
4621 let msg_str = msg.as_str().unwrap_or("");
4622 matches_filter_pattern(filter_pattern, msg_str)
4623 })
4624 .map(|(i, msg)| {
4625 json!({
4626 "eventNumber": i + 1,
4627 "eventMessage": msg,
4628 "extractedValues": {},
4629 })
4630 })
4631 .collect();
4632
4633 Ok(AwsResponse::json(
4634 StatusCode::OK,
4635 serde_json::to_string(&json!({ "matches": matches })).unwrap(),
4636 ))
4637 }
4638
4639 fn stop_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4640 let body = body_json(req);
4641 let query_id = body["queryId"].as_str().ok_or_else(|| {
4642 AwsServiceError::aws_error(
4643 StatusCode::BAD_REQUEST,
4644 "InvalidParameterException",
4645 "queryId is required",
4646 )
4647 })?;
4648
4649 let mut state = self.state.write();
4650 let query = state.queries.get_mut(query_id).ok_or_else(|| {
4651 AwsServiceError::aws_error(
4652 StatusCode::BAD_REQUEST,
4653 "InvalidParameterException",
4654 format!("Query {query_id} is not in a cancellable state"),
4655 )
4656 })?;
4657
4658 let was_running = query.status == "Running" || query.status == "Scheduled";
4659 if was_running {
4660 query.status = "Cancelled".to_string();
4661 }
4662
4663 Ok(AwsResponse::json(
4664 StatusCode::OK,
4665 serde_json::to_string(&json!({ "success": was_running })).unwrap(),
4666 ))
4667 }
4668
4669 fn put_log_group_deletion_protection(
4670 &self,
4671 req: &AwsRequest,
4672 ) -> Result<AwsResponse, AwsServiceError> {
4673 let body = body_json(req);
4674 let log_group_id = body["logGroupIdentifier"]
4675 .as_str()
4676 .ok_or_else(|| {
4677 AwsServiceError::aws_error(
4678 StatusCode::BAD_REQUEST,
4679 "InvalidParameterException",
4680 "logGroupIdentifier is required",
4681 )
4682 })?
4683 .to_string();
4684 let deletion_protection = body["deletionProtectionEnabled"].as_bool().unwrap_or(true);
4685
4686 let group_name = if log_group_id.starts_with("arn:") {
4687 extract_log_group_from_arn(&log_group_id).ok_or_else(|| {
4688 AwsServiceError::aws_error(
4689 StatusCode::BAD_REQUEST,
4690 "InvalidParameterException",
4691 format!("Invalid ARN: {log_group_id}"),
4692 )
4693 })?
4694 } else {
4695 log_group_id
4696 };
4697
4698 let mut state = self.state.write();
4699 let group = state.log_groups.get_mut(&group_name).ok_or_else(|| {
4700 AwsServiceError::aws_error(
4701 StatusCode::BAD_REQUEST,
4702 "ResourceNotFoundException",
4703 format!("The specified log group does not exist: {group_name}"),
4704 )
4705 })?;
4706
4707 group.deletion_protection = deletion_protection;
4708 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4709 }
4710
4711 fn get_log_record(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4712 let body = body_json(req);
4713 let _log_record_pointer = body["logRecordPointer"].as_str().ok_or_else(|| {
4714 AwsServiceError::aws_error(
4715 StatusCode::BAD_REQUEST,
4716 "InvalidParameterException",
4717 "logRecordPointer is required",
4718 )
4719 })?;
4720
4721 Ok(AwsResponse::json(
4723 StatusCode::OK,
4724 serde_json::to_string(&json!({ "logRecord": {} })).unwrap(),
4725 ))
4726 }
4727
4728 fn list_anomalies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4729 let body = body_json(req);
4730 validate_optional_string_length(
4731 "anomalyDetectorArn",
4732 body["anomalyDetectorArn"].as_str(),
4733 1,
4734 2048,
4735 )?;
4736 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
4737 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
4738 validate_optional_enum_value(
4739 "suppressionState",
4740 &body["suppressionState"],
4741 &["SUPPRESSED", "UNSUPPRESSED"],
4742 )?;
4743 Ok(AwsResponse::json(
4745 StatusCode::OK,
4746 serde_json::to_string(&json!({ "anomalies": [] })).unwrap(),
4747 ))
4748 }
4749
4750 fn update_anomaly(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4751 let body = body_json(req);
4752 validate_required("anomalyDetectorArn", &body["anomalyDetectorArn"])?;
4753 validate_optional_string_length(
4754 "anomalyDetectorArn",
4755 body["anomalyDetectorArn"].as_str(),
4756 1,
4757 2048,
4758 )?;
4759 validate_optional_string_length("anomalyId", body["anomalyId"].as_str(), 36, 36)?;
4760 validate_optional_string_length("patternId", body["patternId"].as_str(), 32, 32)?;
4761 validate_optional_enum_value(
4762 "suppressionType",
4763 &body["suppressionType"],
4764 &["LIMITED", "INFINITE"],
4765 )?;
4766 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4768 }
4769
4770 fn create_import_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4773 let body = body_json(req);
4774 let import_source_arn = require_str(&body, "importSourceArn")?;
4775 let import_role_arn = require_str(&body, "importRoleArn")?;
4776 validate_string_length("importRoleArn", import_role_arn, 1, 2048)?;
4777 let log_group_name = body["logGroupName"].as_str().map(|s| s.to_string());
4778
4779 let import_id = uuid::Uuid::new_v4().to_string();
4780 let now = Utc::now().timestamp_millis();
4781
4782 let task = ImportTask {
4783 import_id: import_id.clone(),
4784 import_source_arn: import_source_arn.to_string(),
4785 import_role_arn: import_role_arn.to_string(),
4786 log_group_name,
4787 status: "RUNNING".to_string(),
4788 creation_time: now,
4789 };
4790
4791 let mut state = self.state.write();
4792 state.import_tasks.insert(import_id.clone(), task);
4793
4794 Ok(AwsResponse::json(
4795 StatusCode::OK,
4796 serde_json::to_string(&json!({ "importId": import_id })).unwrap(),
4797 ))
4798 }
4799
4800 fn describe_import_tasks(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4801 let body = body_json(req);
4802 validate_optional_string_length("importId", body["importId"].as_str(), 1, 256)?;
4803 validate_optional_enum_value(
4804 "importStatus",
4805 &body["importStatus"],
4806 &["IN_PROGRESS", "CANCELLED", "COMPLETED", "FAILED"],
4807 )?;
4808 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
4809 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
4810
4811 let state = self.state.read();
4812 let tasks: Vec<Value> = state
4813 .import_tasks
4814 .values()
4815 .map(|t| {
4816 json!({
4817 "importId": t.import_id,
4818 "importSourceArn": t.import_source_arn,
4819 "importStatus": t.status,
4820 "creationTime": t.creation_time,
4821 })
4822 })
4823 .collect();
4824 Ok(AwsResponse::json(
4825 StatusCode::OK,
4826 serde_json::to_string(&json!({ "imports": tasks })).unwrap(),
4827 ))
4828 }
4829
4830 fn describe_import_task_batches(
4831 &self,
4832 req: &AwsRequest,
4833 ) -> Result<AwsResponse, AwsServiceError> {
4834 let body = body_json(req);
4835 let import_id = require_str(&body, "importId")?;
4836 validate_string_length("importId", import_id, 1, 256)?;
4837 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
4838 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
4839 Ok(AwsResponse::json(
4841 StatusCode::OK,
4842 serde_json::to_string(&json!({ "importBatches": [] })).unwrap(),
4843 ))
4844 }
4845
4846 fn cancel_import_task(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4847 let body = body_json(req);
4848 let import_id = require_str(&body, "importId")?;
4849
4850 let mut state = self.state.write();
4851 match state.import_tasks.get_mut(import_id) {
4852 Some(task) => {
4853 task.status = "CANCELLED".to_string();
4854 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4855 }
4856 None => Err(AwsServiceError::aws_error(
4857 StatusCode::NOT_FOUND,
4858 "ResourceNotFoundException",
4859 format!("Import task not found: {import_id}"),
4860 )),
4861 }
4862 }
4863
4864 fn put_integration(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4867 let body = body_json(req);
4868 validate_required("resourceConfig", &body["resourceConfig"])?;
4869 let integration_name = require_str(&body, "integrationName")?;
4870 validate_string_length("integrationName", integration_name, 1, 50)?;
4871 let integration_type = require_str(&body, "integrationType")?;
4872 validate_enum("integrationType", integration_type, &["OPENSEARCH"])?;
4873 let resource_config = body["resourceConfig"].clone();
4874
4875 let now = Utc::now().timestamp_millis();
4876 let integration = Integration {
4877 integration_name: integration_name.to_string(),
4878 integration_type: integration_type.to_string(),
4879 resource_config,
4880 status: "ACTIVE".to_string(),
4881 creation_time: now,
4882 };
4883
4884 let mut state = self.state.write();
4885 state
4886 .integrations
4887 .insert(integration_name.to_string(), integration);
4888
4889 Ok(AwsResponse::json(
4890 StatusCode::OK,
4891 serde_json::to_string(&json!({
4892 "integrationName": integration_name,
4893 "integrationStatus": "ACTIVE"
4894 }))
4895 .unwrap(),
4896 ))
4897 }
4898
4899 fn get_integration(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4900 let body = body_json(req);
4901 let integration_name = require_str(&body, "integrationName")?;
4902
4903 let state = self.state.read();
4904 match state.integrations.get(integration_name) {
4905 Some(i) => Ok(AwsResponse::json(
4906 StatusCode::OK,
4907 serde_json::to_string(&json!({
4908 "integrationName": i.integration_name,
4909 "integrationType": i.integration_type,
4910 "integrationStatus": i.status,
4911 }))
4912 .unwrap(),
4913 )),
4914 None => Err(AwsServiceError::aws_error(
4915 StatusCode::NOT_FOUND,
4916 "ResourceNotFoundException",
4917 format!("Integration not found: {integration_name}"),
4918 )),
4919 }
4920 }
4921
4922 fn delete_integration(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4923 let body = body_json(req);
4924 let integration_name = require_str(&body, "integrationName")?;
4925 validate_string_length("integrationName", integration_name, 1, 50)?;
4926
4927 let mut state = self.state.write();
4928 state.integrations.remove(integration_name);
4929 Ok(AwsResponse::json(StatusCode::OK, "{}"))
4930 }
4931
4932 fn list_integrations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4933 let body = body_json(req);
4934 validate_optional_string_length(
4935 "integrationNamePrefix",
4936 body["integrationNamePrefix"].as_str(),
4937 1,
4938 50,
4939 )?;
4940 validate_optional_enum_value("integrationType", &body["integrationType"], &["OPENSEARCH"])?;
4941 validate_optional_enum_value(
4942 "integrationStatus",
4943 &body["integrationStatus"],
4944 &["PROVISIONING", "ACTIVE", "FAILED"],
4945 )?;
4946
4947 let state = self.state.read();
4948 let integrations: Vec<Value> = state
4949 .integrations
4950 .values()
4951 .map(|i| {
4952 json!({
4953 "integrationName": i.integration_name,
4954 "integrationType": i.integration_type,
4955 "integrationStatus": i.status,
4956 })
4957 })
4958 .collect();
4959 Ok(AwsResponse::json(
4960 StatusCode::OK,
4961 serde_json::to_string(&json!({ "integrationSummaries": integrations })).unwrap(),
4962 ))
4963 }
4964
4965 fn create_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
4968 let body = body_json(req);
4969 let lookup_table_name = require_str(&body, "lookupTableName")?;
4970 validate_string_length("lookupTableName", lookup_table_name, 1, 256)?;
4971 let table_body = require_str(&body, "tableBody")?;
4972 validate_string_length("tableBody", table_body, 1, 10485760)?;
4973 validate_optional_string_length("description", body["description"].as_str(), 0, 1024)?;
4974 validate_optional_string_length("kmsKeyId", body["kmsKeyId"].as_str(), 0, 256)?;
4975
4976 let state_r = self.state.read();
4977 let account_id = state_r.account_id.clone();
4978 let region = state_r.region.clone();
4979 drop(state_r);
4980
4981 let arn = format!("arn:aws:logs:{region}:{account_id}:lookup-table:{lookup_table_name}");
4982 let now = Utc::now().timestamp_millis();
4983
4984 let table = LookupTable {
4985 lookup_table_name: lookup_table_name.to_string(),
4986 arn: arn.clone(),
4987 table_body: table_body.to_string(),
4988 creation_time: now,
4989 last_modified_time: now,
4990 };
4991
4992 let mut state = self.state.write();
4993 state.lookup_tables.insert(arn.clone(), table);
4994
4995 Ok(AwsResponse::json(
4996 StatusCode::OK,
4997 serde_json::to_string(&json!({ "lookupTableArn": arn })).unwrap(),
4998 ))
4999 }
5000
5001 fn get_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5002 let body = body_json(req);
5003 let lookup_table_arn = require_str(&body, "lookupTableArn")?;
5004
5005 let state = self.state.read();
5006 match state.lookup_tables.get(lookup_table_arn) {
5007 Some(t) => Ok(AwsResponse::json(
5008 StatusCode::OK,
5009 serde_json::to_string(&json!({
5010 "lookupTableName": t.lookup_table_name,
5011 "lookupTableArn": t.arn,
5012 "tableBody": t.table_body,
5013 "creationTime": t.creation_time,
5014 "lastModifiedTime": t.last_modified_time,
5015 }))
5016 .unwrap(),
5017 )),
5018 None => Err(AwsServiceError::aws_error(
5019 StatusCode::NOT_FOUND,
5020 "ResourceNotFoundException",
5021 format!("Lookup table not found: {lookup_table_arn}"),
5022 )),
5023 }
5024 }
5025
5026 fn describe_lookup_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5027 let body = body_json(req);
5028 validate_optional_string_length(
5029 "lookupTableNamePrefix",
5030 body["lookupTableNamePrefix"].as_str(),
5031 1,
5032 256,
5033 )?;
5034 validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 100)?;
5035 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5036
5037 let state = self.state.read();
5038 let tables: Vec<Value> = state
5039 .lookup_tables
5040 .values()
5041 .map(|t| {
5042 json!({
5043 "lookupTableName": t.lookup_table_name,
5044 "lookupTableArn": t.arn,
5045 })
5046 })
5047 .collect();
5048 Ok(AwsResponse::json(
5049 StatusCode::OK,
5050 serde_json::to_string(&json!({ "lookupTables": tables })).unwrap(),
5051 ))
5052 }
5053
5054 fn delete_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5055 let body = body_json(req);
5056 let lookup_table_arn = require_str(&body, "lookupTableArn")?;
5057
5058 let mut state = self.state.write();
5059 state.lookup_tables.remove(lookup_table_arn);
5060 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5061 }
5062
5063 fn update_lookup_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5064 let body = body_json(req);
5065 let lookup_table_arn = require_str(&body, "lookupTableArn")?;
5066 let table_body = require_str(&body, "tableBody")?;
5067
5068 let mut state = self.state.write();
5069 match state.lookup_tables.get_mut(lookup_table_arn) {
5070 Some(t) => {
5071 t.table_body = table_body.to_string();
5072 t.last_modified_time = Utc::now().timestamp_millis();
5073 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5074 }
5075 None => Err(AwsServiceError::aws_error(
5076 StatusCode::NOT_FOUND,
5077 "ResourceNotFoundException",
5078 format!("Lookup table not found: {lookup_table_arn}"),
5079 )),
5080 }
5081 }
5082
5083 fn create_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5086 let body = body_json(req);
5087 let name = require_str(&body, "name")?;
5088 validate_string_length("name", name, 1, 255)?;
5089 validate_optional_string_length("description", body["description"].as_str(), 0, 1024)?;
5090 let query_string = require_str(&body, "queryString")?;
5091 validate_string_length("queryString", query_string, 0, 10000)?;
5092 let query_language = require_str(&body, "queryLanguage")?;
5093 validate_enum("queryLanguage", query_language, &["CWLI", "SQL", "PPL"])?;
5094 let schedule_expression = require_str(&body, "scheduleExpression")?;
5095 validate_string_length("scheduleExpression", schedule_expression, 0, 256)?;
5096 let execution_role_arn = require_str(&body, "executionRoleArn")?;
5097 validate_string_length("executionRoleArn", execution_role_arn, 1, 2048)?;
5098 validate_optional_string_length("timezone", body["timezone"].as_str(), 1, 2048)?;
5099 validate_optional_range_i64(
5100 "scheduleStartTime",
5101 body["scheduleStartTime"].as_i64(),
5102 0,
5103 i64::MAX,
5104 )?;
5105 validate_optional_range_i64(
5106 "scheduleEndTime",
5107 body["scheduleEndTime"].as_i64(),
5108 0,
5109 i64::MAX,
5110 )?;
5111 validate_optional_enum_value("state", &body["state"], &["ENABLED", "DISABLED"])?;
5112
5113 let state_r = self.state.read();
5114 let account_id = state_r.account_id.clone();
5115 let region = state_r.region.clone();
5116 drop(state_r);
5117
5118 let arn = format!("arn:aws:logs:{region}:{account_id}:scheduled-query:{name}");
5119 let now = Utc::now().timestamp_millis();
5120
5121 let sq = ScheduledQuery {
5122 name: name.to_string(),
5123 arn: arn.clone(),
5124 query_string: query_string.to_string(),
5125 query_language: query_language.to_string(),
5126 schedule_expression: schedule_expression.to_string(),
5127 execution_role_arn: execution_role_arn.to_string(),
5128 status: "ACTIVE".to_string(),
5129 creation_time: now,
5130 last_modified_time: now,
5131 };
5132
5133 let mut state = self.state.write();
5134 state.scheduled_queries.insert(arn.clone(), sq);
5135
5136 Ok(AwsResponse::json(
5137 StatusCode::OK,
5138 serde_json::to_string(&json!({ "scheduledQueryArn": arn })).unwrap(),
5139 ))
5140 }
5141
5142 fn get_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5143 let body = body_json(req);
5144 let identifier = require_str(&body, "identifier")?;
5145
5146 let state = self.state.read();
5147 match state.scheduled_queries.get(identifier) {
5148 Some(sq) => Ok(AwsResponse::json(
5149 StatusCode::OK,
5150 serde_json::to_string(&json!({
5151 "scheduledQueryArn": sq.arn,
5152 "name": sq.name,
5153 "queryString": sq.query_string,
5154 "queryLanguage": sq.query_language,
5155 "scheduleExpression": sq.schedule_expression,
5156 "executionRoleArn": sq.execution_role_arn,
5157 }))
5158 .unwrap(),
5159 )),
5160 None => Err(AwsServiceError::aws_error(
5161 StatusCode::NOT_FOUND,
5162 "ResourceNotFoundException",
5163 format!("Scheduled query not found: {identifier}"),
5164 )),
5165 }
5166 }
5167
5168 fn get_scheduled_query_history(
5169 &self,
5170 req: &AwsRequest,
5171 ) -> Result<AwsResponse, AwsServiceError> {
5172 let body = body_json(req);
5173 let _identifier = require_str(&body, "identifier")?;
5174 validate_required("startTime", &body["startTime"])?;
5175 validate_required("endTime", &body["endTime"])?;
5176 validate_optional_range_i64("startTime", body["startTime"].as_i64(), 0, i64::MAX)?;
5177 validate_optional_range_i64("endTime", body["endTime"].as_i64(), 0, i64::MAX)?;
5178 validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 1000)?;
5179 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5180 Ok(AwsResponse::json(
5182 StatusCode::OK,
5183 serde_json::to_string(&json!({ "triggerHistory": [] })).unwrap(),
5184 ))
5185 }
5186
5187 fn list_scheduled_queries(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5188 let body = body_json(req);
5189 validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 1000)?;
5190 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5191 validate_optional_enum_value("state", &body["state"], &["ENABLED", "DISABLED"])?;
5192
5193 let state = self.state.read();
5194 let queries: Vec<Value> = state
5195 .scheduled_queries
5196 .values()
5197 .map(|sq| {
5198 json!({
5199 "name": sq.name,
5200 "scheduledQueryArn": sq.arn,
5201 })
5202 })
5203 .collect();
5204 Ok(AwsResponse::json(
5205 StatusCode::OK,
5206 serde_json::to_string(&json!({ "scheduledQueries": queries })).unwrap(),
5207 ))
5208 }
5209
5210 fn delete_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5211 let body = body_json(req);
5212 let identifier = require_str(&body, "identifier")?;
5213
5214 let mut state = self.state.write();
5215 state.scheduled_queries.remove(identifier);
5216 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5217 }
5218
5219 fn update_scheduled_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5220 let body = body_json(req);
5221 let identifier = require_str(&body, "identifier")?;
5222 let query_string = require_str(&body, "queryString")?;
5223 let query_language = require_str(&body, "queryLanguage")?;
5224 let schedule_expression = require_str(&body, "scheduleExpression")?;
5225 let execution_role_arn = require_str(&body, "executionRoleArn")?;
5226
5227 let mut state = self.state.write();
5228 match state.scheduled_queries.get_mut(identifier) {
5229 Some(sq) => {
5230 sq.query_string = query_string.to_string();
5231 sq.query_language = query_language.to_string();
5232 sq.schedule_expression = schedule_expression.to_string();
5233 sq.execution_role_arn = execution_role_arn.to_string();
5234 sq.last_modified_time = Utc::now().timestamp_millis();
5235 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5236 }
5237 None => Err(AwsServiceError::aws_error(
5238 StatusCode::NOT_FOUND,
5239 "ResourceNotFoundException",
5240 format!("Scheduled query not found: {identifier}"),
5241 )),
5242 }
5243 }
5244
5245 fn start_live_tail(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5248 let body = body_json(req);
5249 validate_required("logGroupIdentifiers", &body["logGroupIdentifiers"])?;
5250 validate_optional_string_length(
5251 "logEventFilterPattern",
5252 body["logEventFilterPattern"].as_str(),
5253 0,
5254 1024,
5255 )?;
5256 let session_id = uuid::Uuid::new_v4().to_string();
5257 Ok(AwsResponse::json(
5258 StatusCode::OK,
5259 serde_json::to_string(&json!({
5260 "responseStream": {
5261 "sessionStart": {
5262 "sessionId": session_id,
5263 "logGroupIdentifiers": [],
5264 }
5265 }
5266 }))
5267 .unwrap(),
5268 ))
5269 }
5270
5271 fn list_log_groups_for_query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5272 let body = body_json(req);
5273 let query_id = require_str(&body, "queryId")?;
5274 validate_string_length("queryId", query_id, 1, 256)?;
5275 validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 50, 500)?;
5276 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5277 Ok(AwsResponse::json(
5279 StatusCode::OK,
5280 serde_json::to_string(&json!({ "logGroupIdentifiers": [] })).unwrap(),
5281 ))
5282 }
5283
5284 fn list_aggregate_log_group_summaries(
5285 &self,
5286 req: &AwsRequest,
5287 ) -> Result<AwsResponse, AwsServiceError> {
5288 let body = body_json(req);
5289 validate_required("groupBy", &body["groupBy"])?;
5290 validate_optional_enum_value(
5291 "groupBy",
5292 &body["groupBy"],
5293 &[
5294 "DATA_SOURCE_NAME_TYPE_AND_FORMAT",
5295 "DATA_SOURCE_NAME_AND_TYPE",
5296 ],
5297 )?;
5298 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
5299 validate_optional_enum_value(
5300 "logGroupClass",
5301 &body["logGroupClass"],
5302 &["STANDARD", "INFREQUENT_ACCESS", "DELIVERY"],
5303 )?;
5304 validate_optional_string_length(
5305 "logGroupNamePattern",
5306 body["logGroupNamePattern"].as_str(),
5307 3,
5308 129,
5309 )?;
5310 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5311 Ok(AwsResponse::json(
5313 StatusCode::OK,
5314 serde_json::to_string(&json!({ "aggregateLogGroupSummaries": [] })).unwrap(),
5315 ))
5316 }
5317
5318 fn list_log_groups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5319 let body = body_json(req);
5320 let prefix = body["logGroupNamePrefix"].as_str().unwrap_or("");
5321 let pattern = body["logGroupNamePattern"].as_str().unwrap_or("");
5322 let limit = body["limit"].as_i64().unwrap_or(50) as usize;
5323 let next_token = body["nextToken"].as_str();
5324
5325 validate_optional_string_length(
5326 "logGroupNamePrefix",
5327 body["logGroupNamePrefix"].as_str(),
5328 1,
5329 512,
5330 )?;
5331 validate_optional_string_length(
5332 "logGroupNamePattern",
5333 body["logGroupNamePattern"].as_str(),
5334 3,
5335 129,
5336 )?;
5337 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 1000)?;
5338 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5339 validate_optional_enum_value(
5340 "logGroupClass",
5341 &body["logGroupClass"],
5342 &["STANDARD", "INFREQUENT_ACCESS", "DELIVERY"],
5343 )?;
5344
5345 let state = self.state.read();
5346 let mut groups: Vec<&LogGroup> = state
5347 .log_groups
5348 .values()
5349 .filter(|g| {
5350 (prefix.is_empty() || g.name.starts_with(prefix))
5351 && (pattern.is_empty() || g.name.contains(pattern))
5352 })
5353 .collect();
5354 groups.sort_by(|a, b| a.name.cmp(&b.name));
5355
5356 let start_idx = if let Some(token) = next_token {
5357 groups
5358 .iter()
5359 .position(|g| g.name.as_str() > token)
5360 .unwrap_or(groups.len())
5361 } else {
5362 0
5363 };
5364
5365 let page = &groups[start_idx..];
5366 let has_more = page.len() > limit;
5367 let page = if has_more { &page[..limit] } else { page };
5368
5369 let log_groups: Vec<Value> = page
5371 .iter()
5372 .map(|g| {
5373 let log_group_arn = g.arn.trim_end_matches(":*").to_string();
5374 json!({
5375 "logGroupName": g.name,
5376 "logGroupArn": log_group_arn,
5377 "logGroupClass": "STANDARD",
5378 })
5379 })
5380 .collect();
5381
5382 let mut result = json!({ "logGroups": log_groups });
5383 if has_more {
5384 if let Some(last) = page.last() {
5385 result["nextToken"] = json!(last.name);
5386 }
5387 }
5388
5389 Ok(AwsResponse::json(
5390 StatusCode::OK,
5391 serde_json::to_string(&result).unwrap(),
5392 ))
5393 }
5394
5395 fn put_bearer_token_authentication(
5396 &self,
5397 req: &AwsRequest,
5398 ) -> Result<AwsResponse, AwsServiceError> {
5399 let body = body_json(req);
5400 validate_required(
5401 "bearerTokenAuthenticationEnabled",
5402 &body["bearerTokenAuthenticationEnabled"],
5403 )?;
5404 let log_group_identifier = require_str(&body, "logGroupIdentifier")?;
5405 validate_string_length("logGroupIdentifier", log_group_identifier, 1, 2048)?;
5406 let enabled = body["bearerTokenAuthenticationEnabled"]
5407 .as_bool()
5408 .unwrap_or(false);
5409
5410 let mut state = self.state.write();
5411 state
5412 .bearer_token_auth
5413 .insert(log_group_identifier.to_string(), enabled);
5414 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5415 }
5416
5417 fn get_log_object(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5418 let body = body_json(req);
5419 validate_required("logObjectPointer", &body["logObjectPointer"])?;
5420 validate_optional_string_length(
5421 "logObjectPointer",
5422 body["logObjectPointer"].as_str(),
5423 1,
5424 512,
5425 )?;
5426 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5428 }
5429
5430 fn get_log_fields(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
5431 let body = body_json(req);
5432 validate_required("dataSourceName", &body["dataSourceName"])?;
5433 validate_required("dataSourceType", &body["dataSourceType"])?;
5434 Ok(AwsResponse::json(
5436 StatusCode::OK,
5437 serde_json::to_string(&json!({ "logFields": [] })).unwrap(),
5438 ))
5439 }
5440
5441 fn associate_source_to_s3_table_integration(
5442 &self,
5443 req: &AwsRequest,
5444 ) -> Result<AwsResponse, AwsServiceError> {
5445 let body = body_json(req);
5446 validate_required("dataSource", &body["dataSource"])?;
5447 let integration_arn = require_str(&body, "integrationArn")?;
5448 let data_source = body["dataSource"].clone();
5449 let source_id = data_source
5450 .as_object()
5451 .and_then(|o| o.get("resourceArn"))
5452 .and_then(|v| v.as_str())
5453 .unwrap_or("unknown")
5454 .to_string();
5455
5456 let mut state = self.state.write();
5457 state
5458 .s3_table_sources
5459 .entry(integration_arn.to_string())
5460 .or_default()
5461 .push(source_id);
5462 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5463 }
5464
5465 fn list_sources_for_s3_table_integration(
5466 &self,
5467 req: &AwsRequest,
5468 ) -> Result<AwsResponse, AwsServiceError> {
5469 let body = body_json(req);
5470 let integration_arn = require_str(&body, "integrationArn")?;
5471 validate_optional_range_i64("maxResults", body["maxResults"].as_i64(), 1, 100)?;
5472 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5473
5474 let state = self.state.read();
5475 let sources: Vec<Value> = state
5476 .s3_table_sources
5477 .get(integration_arn)
5478 .map(|sources| {
5479 sources
5480 .iter()
5481 .map(|s| {
5482 json!({
5483 "identifier": s,
5484 "status": "ACTIVE",
5485 })
5486 })
5487 .collect()
5488 })
5489 .unwrap_or_default();
5490 Ok(AwsResponse::json(
5491 StatusCode::OK,
5492 serde_json::to_string(&json!({ "sources": sources })).unwrap(),
5493 ))
5494 }
5495
5496 fn disassociate_source_from_s3_table_integration(
5497 &self,
5498 req: &AwsRequest,
5499 ) -> Result<AwsResponse, AwsServiceError> {
5500 let body = body_json(req);
5501 let identifier = require_str(&body, "identifier")?;
5502 validate_string_length("identifier", identifier, 1, 2048)?;
5503 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5505 }
5506
5507 fn update_delivery_configuration(
5508 &self,
5509 req: &AwsRequest,
5510 ) -> Result<AwsResponse, AwsServiceError> {
5511 let body = body_json(req);
5512 let id = require_str(&body, "id")?;
5513
5514 let state = self.state.read();
5515 if !state.deliveries.contains_key(id) {
5516 return Err(AwsServiceError::aws_error(
5517 StatusCode::NOT_FOUND,
5518 "ResourceNotFoundException",
5519 format!("Delivery not found: {id}"),
5520 ));
5521 }
5522 drop(state);
5523
5524 Ok(AwsResponse::json(StatusCode::OK, "{}"))
5526 }
5527
5528 fn describe_configuration_templates(
5529 &self,
5530 req: &AwsRequest,
5531 ) -> Result<AwsResponse, AwsServiceError> {
5532 let body = body_json(req);
5533 validate_optional_string_length("service", body["service"].as_str(), 1, 255)?;
5534 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
5535 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
5536 Ok(AwsResponse::json(
5538 StatusCode::OK,
5539 serde_json::to_string(&json!({ "configurationTemplates": [] })).unwrap(),
5540 ))
5541 }
5542}
5543
5544fn resolve_log_group_name(
5547 log_group_name: Option<&str>,
5548 resource_identifier: Option<&str>,
5549) -> Result<String, AwsServiceError> {
5550 if let Some(identifier) = resource_identifier {
5551 if identifier.starts_with("arn:") {
5552 extract_log_group_from_arn(identifier).ok_or_else(|| {
5553 AwsServiceError::aws_error(
5554 StatusCode::BAD_REQUEST,
5555 "InvalidParameterException",
5556 format!("Invalid ARN: {identifier}"),
5557 )
5558 })
5559 } else {
5560 Ok(identifier.to_string())
5561 }
5562 } else if let Some(name) = log_group_name {
5563 Ok(name.to_string())
5564 } else {
5565 Err(AwsServiceError::aws_error(
5566 StatusCode::BAD_REQUEST,
5567 "InvalidParameterException",
5568 "Either logGroupName or resourceIdentifier is required",
5569 ))
5570 }
5571}
5572
/// Pulls the log group name out of an ARN shaped like
/// `arn:<partition>:logs:<region>:<account>:log-group:<name>[:*]`.
///
/// Returns `None` unless the sixth colon-separated field is `log-group` and a
/// seventh field exists. Everything after the sixth colon is treated as the
/// name; a single trailing `:*` is removed.
fn extract_log_group_from_arn(arn: &str) -> Option<String> {
    // At most seven fields: the last one keeps any further colons intact.
    let mut fields = arn.splitn(7, ':');

    let resource_type = fields.nth(5)?;
    if resource_type != "log-group" {
        return None;
    }

    let tail = fields.next()?;
    let name = match tail.strip_suffix(":*") {
        Some(stripped) => stripped,
        None => tail,
    };
    Some(name.to_owned())
}
5584
/// Reports whether `message` satisfies a CloudWatch Logs filter `pattern`.
///
/// Only a subset of the filter syntax is evaluated:
///
/// * An empty (or whitespace-only) pattern matches every message.
/// * Patterns starting with `{` or `[` (the JSON and space-delimited
///   syntaxes) are not evaluated and match every message.
/// * Any other pattern is a whitespace-separated list of terms, and every
///   term must occur in `message` as a substring. A term wrapped in double
///   quotes is a phrase and may itself contain whitespace, so
///   `"connection reset" ERROR` requires both the phrase and the word.
///
/// A `"` without a closing partner is treated as an ordinary character of an
/// unquoted term.
fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
    /// Splits the first term off `rest`, returning `(term, remainder)`.
    /// `rest` must be non-empty and must not start with whitespace.
    fn next_term(rest: &str) -> (&str, &str) {
        if let Some(after_quote) = rest.strip_prefix('"') {
            if let Some(close) = after_quote.find('"') {
                // Quoted phrase: everything up to the closing quote.
                return (&after_quote[..close], &after_quote[close + 1..]);
            }
        }
        let end = rest.find(char::is_whitespace).unwrap_or(rest.len());
        rest.split_at(end)
    }

    let pattern = pattern.trim();
    if pattern.is_empty() || pattern.starts_with('{') || pattern.starts_with('[') {
        return true;
    }

    let mut rest = pattern;
    while !rest.is_empty() {
        let (term, tail) = next_term(rest);
        if !message.contains(term) {
            return false;
        }
        rest = tail.trim_start();
    }
    true
}
5615
5616#[cfg(test)]
5617mod tests {
5618 use super::*;
5619 use crate::state::LogsState;
5620 use bytes::Bytes;
5621 use http::{HeaderMap, Method};
5622 use std::collections::HashMap;
5623 use std::sync::Arc;
5624
5625 fn make_service() -> LogsService {
5626 let state = Arc::new(parking_lot::RwLock::new(LogsState::new(
5627 "123456789012",
5628 "us-east-1",
5629 )));
5630 LogsService::new(state)
5631 }
5632
5633 fn make_request(action: &str, body: Value) -> AwsRequest {
5634 AwsRequest {
5635 service: "logs".to_string(),
5636 action: action.to_string(),
5637 region: "us-east-1".to_string(),
5638 account_id: "123456789012".to_string(),
5639 request_id: "test-request-id".to_string(),
5640 headers: HeaderMap::new(),
5641 query_params: HashMap::new(),
5642 body: Bytes::from(serde_json::to_vec(&body).unwrap()),
5643 path_segments: vec![],
5644 raw_path: "/".to_string(),
5645 method: Method::POST,
5646 is_query_protocol: false,
5647 access_key_id: None,
5648 }
5649 }
5650
5651 fn create_group(svc: &LogsService, name: &str) {
5652 let req = make_request("CreateLogGroup", json!({ "logGroupName": name }));
5653 svc.create_log_group(&req).unwrap();
5654 }
5655
5656 fn create_stream(svc: &LogsService, group: &str, stream: &str) {
5657 let req = make_request(
5658 "CreateLogStream",
5659 json!({ "logGroupName": group, "logStreamName": stream }),
5660 );
5661 svc.create_log_stream(&req).unwrap();
5662 }
5663
5664 fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
5665 let now = chrono::Utc::now().timestamp_millis();
5666 let events: Vec<Value> = messages
5667 .iter()
5668 .enumerate()
5669 .map(|(i, msg)| json!({ "timestamp": now + i as i64, "message": msg }))
5670 .collect();
5671 let req = make_request(
5672 "PutLogEvents",
5673 json!({
5674 "logGroupName": group,
5675 "logStreamName": stream,
5676 "logEvents": events,
5677 }),
5678 );
5679 svc.put_log_events(&req).unwrap();
5680 }
5681
/// `logGroupNamePattern` keeps only the groups whose name contains it.
#[test]
fn describe_log_groups_pattern_filters_by_substring() {
    let service = make_service();
    for group in ["/app/web", "/app/api", "/system/metrics"] {
        create_group(&service, group);
    }

    let request = make_request("DescribeLogGroups", json!({ "logGroupNamePattern": "app" }));
    let response = service.describe_log_groups(&request).unwrap();
    let parsed: Value = serde_json::from_slice(&response.body).unwrap();

    let mut matched: Vec<&str> = Vec::new();
    for entry in parsed["logGroups"].as_array().unwrap() {
        matched.push(entry["logGroupName"].as_str().unwrap());
    }

    assert_eq!(matched.len(), 2);
    assert!(matched.contains(&"/app/web"));
    assert!(matched.contains(&"/app/api"));
}
5704
/// Without a pattern, `DescribeLogGroups` returns every group.
#[test]
fn describe_log_groups_pattern_empty_returns_all() {
    let service = make_service();
    for group in ["/app/web", "/system/metrics"] {
        create_group(&service, group);
    }

    let request = make_request("DescribeLogGroups", json!({}));
    let parsed: Value =
        serde_json::from_slice(&service.describe_log_groups(&request).unwrap().body).unwrap();

    let groups = parsed["logGroups"].as_array().unwrap();
    assert_eq!(groups.len(), 2);
}
5716
/// `logGroupIdentifier` may carry a plain log group name.
#[test]
fn filter_log_events_uses_log_group_identifier_as_name() {
    let (group, stream) = ("my-group", "stream-1");
    let service = make_service();
    create_group(&service, group);
    create_stream(&service, group, stream);
    put_events(&service, group, stream, &["hello"]);

    let request = make_request(
        "FilterLogEvents",
        json!({ "logGroupIdentifier": "my-group" }),
    );
    let parsed: Value =
        serde_json::from_slice(&service.filter_log_events(&request).unwrap().body).unwrap();

    let events = parsed["events"].as_array().unwrap();
    assert_eq!(events.len(), 1);
}
5734
/// `logGroupIdentifier` may carry a log group ARN (with the `:*` suffix).
#[test]
fn filter_log_events_uses_log_group_identifier_as_arn() {
    let (group, stream) = ("my-group", "stream-1");
    let service = make_service();
    create_group(&service, group);
    create_stream(&service, group, stream);
    put_events(&service, group, stream, &["hello"]);

    let payload =
        json!({ "logGroupIdentifier": "arn:aws:logs:us-east-1:123456789012:log-group:my-group:*" });
    let request = make_request("FilterLogEvents", payload);
    let parsed: Value =
        serde_json::from_slice(&service.filter_log_events(&request).unwrap().body).unwrap();

    let events = parsed["events"].as_array().unwrap();
    assert_eq!(events.len(), 1);
}
5750
/// `FilterLogEvents` with neither a group name nor an identifier is rejected.
#[test]
fn filter_log_events_errors_without_group_name_or_identifier() {
    let service = make_service();
    let request = make_request("FilterLogEvents", json!({}));

    let outcome = service.filter_log_events(&request);
    assert!(outcome.is_err());
}
5757
/// `logStreamNamePrefix` restricts results to streams whose name starts with it.
#[test]
fn filter_log_events_filters_by_stream_name_prefix() {
    let service = make_service();
    create_group(&service, "grp");

    let streams = ["web-1", "web-2", "api-1"];
    for stream in streams {
        create_stream(&service, "grp", stream);
    }
    for (stream, message) in streams.into_iter().zip(["a", "b", "c"]) {
        put_events(&service, "grp", stream, &[message]);
    }

    let request = make_request(
        "FilterLogEvents",
        json!({ "logGroupName": "grp", "logStreamNamePrefix": "web" }),
    );
    let response = service.filter_log_events(&request).unwrap();
    let parsed: Value = serde_json::from_slice(&response.body).unwrap();

    let events = parsed["events"].as_array().unwrap();
    assert_eq!(events.len(), 2);
    for event in events {
        let stream_name = event["logStreamName"].as_str().unwrap();
        assert!(stream_name.starts_with("web"));
    }
}
5783
/// `taskName` and `logStreamNamePrefix` given at creation are echoed back by
/// `DescribeExportTasks`.
#[test]
fn create_export_task_stores_task_name_and_stream_prefix() {
    let service = make_service();
    create_group(&service, "grp");

    let create = make_request(
        "CreateExportTask",
        json!({
            "logGroupName": "grp",
            "from": 0,
            "to": 1000,
            "destination": "my-bucket",
            "taskName": "my-export",
            "logStreamNamePrefix": "web-",
        }),
    );
    let created: Value =
        serde_json::from_slice(&service.create_export_task(&create).unwrap().body).unwrap();
    let task_id = created["taskId"].as_str().unwrap();

    let describe = make_request("DescribeExportTasks", json!({ "taskId": task_id }));
    let described: Value =
        serde_json::from_slice(&service.describe_export_tasks(&describe).unwrap().body).unwrap();

    let task = &described["exportTasks"][0];
    assert_eq!(task["taskName"].as_str().unwrap(), "my-export");
    assert_eq!(task["logStreamNamePrefix"].as_str().unwrap(), "web-");
}
5813
/// When `taskName` and `logStreamNamePrefix` are not supplied, the described
/// task has them absent or null.
#[test]
fn create_export_task_omits_optional_fields_when_not_provided() {
    let service = make_service();
    create_group(&service, "grp");

    let create = make_request(
        "CreateExportTask",
        json!({
            "logGroupName": "grp",
            "from": 0,
            "to": 1000,
            "destination": "my-bucket",
        }),
    );
    let created: Value =
        serde_json::from_slice(&service.create_export_task(&create).unwrap().body).unwrap();
    let task_id = created["taskId"].as_str().unwrap();

    let describe = make_request("DescribeExportTasks", json!({ "taskId": task_id }));
    let described: Value =
        serde_json::from_slice(&service.describe_export_tasks(&describe).unwrap().body).unwrap();

    let task = &described["exportTasks"][0];
    for optional in ["taskName", "logStreamNamePrefix"] {
        assert!(task.get(optional).map_or(true, Value::is_null));
    }
}
5839
/// `AssociateKmsKey` accepts a log group ARN in `resourceIdentifier` and
/// stores the key on that group.
#[test]
fn associate_kms_key_via_resource_identifier_arn() {
    let key_arn = "arn:aws:kms:us-east-1:123456789012:key/abc-123";
    let service = make_service();
    create_group(&service, "grp");

    let request = make_request(
        "AssociateKmsKey",
        json!({
            "resourceIdentifier": "arn:aws:logs:us-east-1:123456789012:log-group:grp:*",
            "kmsKeyId": key_arn,
        }),
    );
    service.associate_kms_key(&request).unwrap();

    let state = service.state.read();
    let stored = state.log_groups["grp"].kms_key_id.as_deref();
    assert_eq!(stored, Some(key_arn));
}
5862
/// `DisassociateKmsKey` accepts a plain group name in `resourceIdentifier`
/// and clears a previously associated key.
#[test]
fn disassociate_kms_key_via_resource_identifier_name() {
    let service = make_service();
    create_group(&service, "grp");

    // Attach a key first so there is something to remove.
    let associate = make_request(
        "AssociateKmsKey",
        json!({ "logGroupName": "grp", "kmsKeyId": "some-key" }),
    );
    service.associate_kms_key(&associate).unwrap();

    let disassociate = make_request("DisassociateKmsKey", json!({ "resourceIdentifier": "grp" }));
    service.disassociate_kms_key(&disassociate).unwrap();

    let state = service.state.read();
    let stored = &state.log_groups["grp"].kms_key_id;
    assert!(stored.is_none());
}
5882
/// `queryDefinitionNamePrefix` keeps only definitions whose name starts with it.
#[test]
fn describe_query_definitions_filters_by_name_prefix() {
    let service = make_service();

    for definition_name in ["error-queries-1", "error-queries-2", "latency-queries-1"] {
        let put = make_request(
            "PutQueryDefinition",
            json!({
                "name": definition_name,
                "queryString": "fields @timestamp | limit 20",
            }),
        );
        service.put_query_definition(&put).unwrap();
    }

    let describe = make_request(
        "DescribeQueryDefinitions",
        json!({ "queryDefinitionNamePrefix": "error" }),
    );
    let response = service.describe_query_definitions(&describe).unwrap();
    let parsed: Value = serde_json::from_slice(&response.body).unwrap();

    let definitions = parsed["queryDefinitions"].as_array().unwrap();
    assert_eq!(definitions.len(), 2);
    for definition in definitions {
        let name = definition["name"].as_str().unwrap();
        assert!(name.starts_with("error"));
    }
}
5913
/// Without a prefix, `DescribeQueryDefinitions` returns every definition.
#[test]
fn describe_query_definitions_no_prefix_returns_all() {
    let service = make_service();

    for definition_name in ["a", "b", "c"] {
        let payload = json!({ "name": definition_name, "queryString": "fields @timestamp" });
        let put = make_request("PutQueryDefinition", payload);
        service.put_query_definition(&put).unwrap();
    }

    let describe = make_request("DescribeQueryDefinitions", json!({}));
    let parsed: Value =
        serde_json::from_slice(&service.describe_query_definitions(&describe).unwrap().body)
            .unwrap();

    let definitions = parsed["queryDefinitions"].as_array().unwrap();
    assert_eq!(definitions.len(), 3);
}
5931
/// An empty `deliveryDestinationConfiguration` is echoed back with
/// `destinationResourceArn` set to the empty string.
#[test]
fn put_delivery_destination_includes_empty_destination_resource_arn() {
    let service = make_service();
    let payload = json!({
        "name": "my-dest",
        "deliveryDestinationConfiguration": {}
    });
    let request = make_request("PutDeliveryDestination", payload);

    let response = service.put_delivery_destination(&request).unwrap();
    let parsed: Value = serde_json::from_slice(&response.body).unwrap();

    let resource_arn = parsed["deliveryDestination"]["deliveryDestinationConfiguration"]
        ["destinationResourceArn"]
        .as_str()
        .unwrap();
    assert_eq!(
        resource_arn, "",
        "destinationResourceArn should be an empty string when not set"
    );
}
5954
/// A supplied `destinationResourceArn` is echoed back unchanged.
#[test]
fn put_delivery_destination_includes_destination_resource_arn_when_set() {
    let service = make_service();
    let payload = json!({
        "name": "my-dest",
        "deliveryDestinationConfiguration": {
            "destinationResourceArn": "arn:aws:s3:::my-bucket"
        }
    });
    let request = make_request("PutDeliveryDestination", payload);

    let response = service.put_delivery_destination(&request).unwrap();
    let parsed: Value = serde_json::from_slice(&response.body).unwrap();

    let resource_arn = parsed["deliveryDestination"]["deliveryDestinationConfiguration"]
        ["destinationResourceArn"]
        .as_str()
        .unwrap();
    assert_eq!(resource_arn, "arn:aws:s3:::my-bucket");
}
5975
/// A trailing `:*` on the ARN is not part of the extracted name.
#[test]
fn extract_log_group_from_arn_strips_wildcard_suffix() {
    let extracted =
        extract_log_group_from_arn("arn:aws:logs:us-east-1:123456789012:log-group:my-group:*");
    assert_eq!(extracted.as_deref(), Some("my-group"));
}
5984
/// An ARN without the `:*` suffix yields the name as-is.
#[test]
fn extract_log_group_from_arn_without_wildcard() {
    let extracted =
        extract_log_group_from_arn("arn:aws:logs:us-east-1:123456789012:log-group:my-group");
    assert_eq!(extracted.as_deref(), Some("my-group"));
}
5993
/// A string that is not a log-group ARN yields `None`.
#[test]
fn extract_log_group_from_arn_invalid() {
    let extracted = extract_log_group_from_arn("not-an-arn");
    assert!(extracted.is_none());
}
5998
/// Puts an account policy, sees it listed, deletes it, and sees the list empty.
#[test]
fn account_policy_lifecycle() {
    let service = make_service();
    let list_policies = || -> Value {
        let request = make_request(
            "DescribeAccountPolicies",
            json!({ "policyType": "DATA_PROTECTION_POLICY" }),
        );
        let response = service.describe_account_policies(&request).unwrap();
        serde_json::from_slice(&response.body).unwrap()
    };

    let put = make_request(
        "PutAccountPolicy",
        json!({
            "policyName": "test-policy",
            "policyType": "DATA_PROTECTION_POLICY",
            "policyDocument": "{\"Name\":\"test\"}",
        }),
    );
    let created: Value =
        serde_json::from_slice(&service.put_account_policy(&put).unwrap().body).unwrap();
    assert_eq!(created["accountPolicy"]["policyName"], "test-policy");

    let listed = list_policies();
    assert_eq!(listed["accountPolicies"].as_array().unwrap().len(), 1);

    let delete = make_request(
        "DeleteAccountPolicy",
        json!({
            "policyName": "test-policy",
            "policyType": "DATA_PROTECTION_POLICY",
        }),
    );
    service.delete_account_policy(&delete).unwrap();

    let listed_after = list_policies();
    assert!(listed_after["accountPolicies"].as_array().unwrap().is_empty());
}
6042
/// Puts a data protection policy on a group, reads it back, deletes it, and
/// checks that a later read has no `policyDocument`.
#[test]
fn data_protection_policy_lifecycle() {
    let service = make_service();
    create_group(&service, "dp-group");

    let fetch_policy = || -> Value {
        let request = make_request(
            "GetDataProtectionPolicy",
            json!({ "logGroupIdentifier": "dp-group" }),
        );
        let response = service.get_data_protection_policy(&request).unwrap();
        serde_json::from_slice(&response.body).unwrap()
    };

    let put = make_request(
        "PutDataProtectionPolicy",
        json!({
            "logGroupIdentifier": "dp-group",
            "policyDocument": "{\"Name\":\"dp\"}",
        }),
    );
    service.put_data_protection_policy(&put).unwrap();

    let stored = fetch_policy();
    assert_eq!(stored["policyDocument"], "{\"Name\":\"dp\"}");

    let delete = make_request(
        "DeleteDataProtectionPolicy",
        json!({ "logGroupIdentifier": "dp-group" }),
    );
    service.delete_data_protection_policy(&delete).unwrap();

    let after_delete = fetch_policy();
    assert!(after_delete.get("policyDocument").is_none());
}
6081
/// Puts an index policy, sees it described, and deletes it without error.
#[test]
fn index_policy_lifecycle() {
    let service = make_service();
    create_group(&service, "idx-group");

    let put = make_request(
        "PutIndexPolicy",
        json!({
            "logGroupIdentifier": "idx-group",
            "policyDocument": "{\"Fields\":[\"field1\"]}",
        }),
    );
    service.put_index_policy(&put).unwrap();

    let describe = make_request(
        "DescribeIndexPolicies",
        json!({ "logGroupIdentifiers": ["idx-group"] }),
    );
    let described: Value =
        serde_json::from_slice(&service.describe_index_policies(&describe).unwrap().body)
            .unwrap();
    let policies = described["indexPolicies"].as_array().unwrap();
    assert_eq!(policies.len(), 1);

    let delete = make_request(
        "DeleteIndexPolicy",
        json!({
            "logGroupIdentifier": "idx-group",
        }),
    );
    service.delete_index_policy(&delete).unwrap();
}
6114
/// Puts a transformer on a group, reads its config back as an array, and
/// deletes it without error.
#[test]
fn transformer_lifecycle() {
    let service = make_service();
    create_group(&service, "tx-group");

    let put = make_request(
        "PutTransformer",
        json!({
            "logGroupIdentifier": "tx-group",
            "transformerConfig": [{"addKeys":{"entries":[{"key":"new","value":"val"}]}}],
        }),
    );
    service.put_transformer(&put).unwrap();

    let get = make_request(
        "GetTransformer",
        json!({ "logGroupIdentifier": "tx-group" }),
    );
    let fetched: Value =
        serde_json::from_slice(&service.get_transformer(&get).unwrap().body).unwrap();
    assert!(fetched["transformerConfig"].is_array());

    let delete = make_request(
        "DeleteTransformer",
        json!({ "logGroupIdentifier": "tx-group" }),
    );
    service.delete_transformer(&delete).unwrap();
}
6145
/// `TestTransformer` returns one transformed log per input message.
#[test]
fn test_transformer_returns_transformed_events() {
    let service = make_service();

    let payload = json!({
        "transformerConfig": [{"addKeys":{"entries":[]}}],
        "logEventMessages": ["hello", "world"],
    });
    let request = make_request("TestTransformer", payload);

    let response = service.test_transformer(&request).unwrap();
    let parsed: Value = serde_json::from_slice(&response.body).unwrap();

    let transformed = parsed["transformedLogs"].as_array().unwrap();
    assert_eq!(transformed.len(), 2);
}
6161
/// Creates an anomaly detector, then gets, lists, updates, and deletes it.
#[test]
fn anomaly_detector_lifecycle() {
    let service = make_service();

    let create = make_request(
        "CreateLogAnomalyDetector",
        json!({
            "logGroupArnList": ["arn:aws:logs:us-east-1:123456789012:log-group:test:*"],
            "detectorName": "my-detector",
        }),
    );
    let created: Value =
        serde_json::from_slice(&service.create_log_anomaly_detector(&create).unwrap().body)
            .unwrap();
    let detector_arn = created["anomalyDetectorArn"].as_str().unwrap().to_string();

    let get = make_request(
        "GetLogAnomalyDetector",
        json!({ "anomalyDetectorArn": &detector_arn }),
    );
    let fetched: Value =
        serde_json::from_slice(&service.get_log_anomaly_detector(&get).unwrap().body).unwrap();
    assert_eq!(fetched["detectorName"], "my-detector");

    let list = make_request("ListLogAnomalyDetectors", json!({}));
    let listed: Value =
        serde_json::from_slice(&service.list_log_anomaly_detectors(&list).unwrap().body).unwrap();
    assert_eq!(listed["anomalyDetectors"].as_array().unwrap().len(), 1);

    let update = make_request(
        "UpdateLogAnomalyDetector",
        json!({ "anomalyDetectorArn": &detector_arn, "enabled": false }),
    );
    service.update_log_anomaly_detector(&update).unwrap();

    let delete = make_request(
        "DeleteLogAnomalyDetector",
        json!({ "anomalyDetectorArn": &detector_arn }),
    );
    service.delete_log_anomaly_detector(&delete).unwrap();
}
6204
/// `GetLogGroupFields` answers with two field entries for an existing group.
#[test]
fn get_log_group_fields_returns_stub() {
    let service = make_service();
    create_group(&service, "fields-group");

    let payload = json!({ "logGroupName": "fields-group" });
    let request = make_request("GetLogGroupFields", payload);
    let parsed: Value =
        serde_json::from_slice(&service.get_log_group_fields(&request).unwrap().body).unwrap();

    let fields = parsed["logGroupFields"].as_array().unwrap();
    assert_eq!(fields.len(), 2);
}
6220
/// `TestMetricFilter` reports only the messages that contain the pattern term.
#[test]
fn test_metric_filter_matches() {
    let service = make_service();

    let payload = json!({
        "filterPattern": "ERROR",
        "logEventMessages": ["ERROR: oops", "INFO: ok", "ERROR: again"],
    });
    let request = make_request("TestMetricFilter", payload);

    let response = service.test_metric_filter(&request).unwrap();
    let parsed: Value = serde_json::from_slice(&response.body).unwrap();

    let matches = parsed["matches"].as_array().unwrap();
    assert_eq!(matches.len(), 2);
}
6236
/// Stopping a query whose status is `Running` succeeds and leaves it `Cancelled`.
#[test]
fn stop_query_marks_as_cancelled() {
    let service = make_service();
    create_group(&service, "sq-group");

    let start = make_request(
        "StartQuery",
        json!({
            "logGroupName": "sq-group",
            "startTime": 0,
            "endTime": 9999999999i64,
            "queryString": "fields @timestamp",
        }),
    );
    let started: Value =
        serde_json::from_slice(&service.start_query(&start).unwrap().body).unwrap();
    let query_id = started["queryId"].as_str().unwrap().to_string();

    // Force the status to "Running" before stopping; the write guard is
    // released at the end of this statement.
    service
        .state
        .write()
        .queries
        .get_mut(&query_id)
        .unwrap()
        .status = "Running".to_string();

    let stop = make_request("StopQuery", json!({ "queryId": &query_id }));
    let stopped: Value =
        serde_json::from_slice(&service.stop_query(&stop).unwrap().body).unwrap();
    assert_eq!(stopped["success"], true);

    let state = service.state.read();
    assert_eq!(state.queries[&query_id].status, "Cancelled");
}
6269
/// Enabling deletion protection sets the flag on the stored log group.
#[test]
fn put_log_group_deletion_protection() {
    let service = make_service();
    create_group(&service, "prot-group");

    let payload = json!({
        "logGroupIdentifier": "prot-group",
        "deletionProtectionEnabled": true,
    });
    let request = make_request("PutLogGroupDeletionProtection", payload);
    service.put_log_group_deletion_protection(&request).unwrap();

    let state = service.state.read();
    let group = &state.log_groups["prot-group"];
    assert!(group.deletion_protection);
}
6287
/// `GetLogRecord` answers with a `logRecord` object.
#[test]
fn get_log_record_returns_empty_stub() {
    let service = make_service();

    let payload = json!({ "logRecordPointer": "some-pointer" });
    let request = make_request("GetLogRecord", payload);
    let parsed: Value =
        serde_json::from_slice(&service.get_log_record(&request).unwrap().body).unwrap();

    assert!(parsed["logRecord"].is_object());
}
6300
/// `ListAnomalies` on fresh state returns an empty `anomalies` array.
#[test]
fn list_anomalies_returns_empty() {
    let service = make_service();

    let request = make_request("ListAnomalies", json!({}));
    let parsed: Value =
        serde_json::from_slice(&service.list_anomalies(&request).unwrap().body).unwrap();

    let anomalies = parsed["anomalies"].as_array().unwrap();
    assert!(anomalies.is_empty());
}
6310
/// `UpdateAnomaly` with only a detector ARN succeeds.
#[test]
fn update_anomaly_noop() {
    let service = make_service();
    let payload =
        json!({"anomalyDetectorArn": "arn:aws:logs:us-east-1:123456789012:anomaly-detector:test"});
    let request = make_request("UpdateAnomaly", payload);

    let outcome = service.update_anomaly(&request);
    outcome.unwrap();
}
6320
/// Creates an import task, lists it, reads its (empty) batches, cancels it,
/// and checks that the listed status becomes `CANCELLED`.
#[test]
fn import_task_lifecycle() {
    let service = make_service();
    let list_imports = || -> Value {
        let request = make_request("DescribeImportTasks", json!({}));
        let response = service.describe_import_tasks(&request).unwrap();
        serde_json::from_slice(&response.body).unwrap()
    };

    let create = make_request(
        "CreateImportTask",
        json!({
            "importSourceArn": "arn:aws:s3:::my-bucket/logs",
            "importRoleArn": "arn:aws:iam::123456789012:role/import-role"
        }),
    );
    let created: Value =
        serde_json::from_slice(&service.create_import_task(&create).unwrap().body).unwrap();
    let import_id = created["importId"].as_str().unwrap().to_string();

    let listed = list_imports();
    assert_eq!(listed["imports"].as_array().unwrap().len(), 1);

    let batches_request = make_request(
        "DescribeImportTaskBatches",
        json!({ "importId": import_id }),
    );
    let batches: Value = serde_json::from_slice(
        &service
            .describe_import_task_batches(&batches_request)
            .unwrap()
            .body,
    )
    .unwrap();
    assert!(batches["importBatches"].as_array().unwrap().is_empty());

    let cancel = make_request("CancelImportTask", json!({ "importId": import_id }));
    service.cancel_import_task(&cancel).unwrap();

    let listed_after = list_imports();
    let status = listed_after["imports"][0]["importStatus"].as_str().unwrap();
    assert_eq!(status, "CANCELLED");
}
6362
/// Puts an integration, gets and lists it, deletes it, and sees the list empty.
#[test]
fn integration_lifecycle() {
    let service = make_service();
    let list_integrations = || -> Value {
        let request = make_request("ListIntegrations", json!({}));
        let response = service.list_integrations(&request).unwrap();
        serde_json::from_slice(&response.body).unwrap()
    };

    let put = make_request(
        "PutIntegration",
        json!({
            "integrationName": "test-int",
            "integrationType": "OPENSEARCH",
            "resourceConfig": { "openSearchResourceConfig": {} }
        }),
    );
    service.put_integration(&put).unwrap();

    let get = make_request("GetIntegration", json!({ "integrationName": "test-int" }));
    let fetched: Value =
        serde_json::from_slice(&service.get_integration(&get).unwrap().body).unwrap();
    assert_eq!(fetched["integrationName"].as_str().unwrap(), "test-int");

    let listed = list_integrations();
    assert_eq!(listed["integrationSummaries"].as_array().unwrap().len(), 1);

    let delete = make_request(
        "DeleteIntegration",
        json!({ "integrationName": "test-int" }),
    );
    service.delete_integration(&delete).unwrap();

    let listed_after = list_integrations();
    assert!(listed_after["integrationSummaries"]
        .as_array()
        .unwrap()
        .is_empty());
}
6400
/// Creates a lookup table, gets and lists it, updates its body, deletes it,
/// and sees the list empty.
#[test]
fn lookup_table_lifecycle() {
    let service = make_service();
    let list_tables = || -> Value {
        let request = make_request("DescribeLookupTables", json!({}));
        let response = service.describe_lookup_tables(&request).unwrap();
        serde_json::from_slice(&response.body).unwrap()
    };

    let create = make_request(
        "CreateLookupTable",
        json!({
            "lookupTableName": "test-table",
            "tableBody": "key,value\na,b"
        }),
    );
    let created: Value =
        serde_json::from_slice(&service.create_lookup_table(&create).unwrap().body).unwrap();
    let table_arn = created["lookupTableArn"].as_str().unwrap().to_string();

    let get = make_request("GetLookupTable", json!({ "lookupTableArn": table_arn }));
    let fetched: Value =
        serde_json::from_slice(&service.get_lookup_table(&get).unwrap().body).unwrap();
    assert_eq!(fetched["lookupTableName"].as_str().unwrap(), "test-table");

    let listed = list_tables();
    assert_eq!(listed["lookupTables"].as_array().unwrap().len(), 1);

    let update = make_request(
        "UpdateLookupTable",
        json!({ "lookupTableArn": table_arn, "tableBody": "key,value\nc,d" }),
    );
    service.update_lookup_table(&update).unwrap();

    let delete = make_request("DeleteLookupTable", json!({ "lookupTableArn": table_arn }));
    service.delete_lookup_table(&delete).unwrap();

    let listed_after = list_tables();
    assert!(listed_after["lookupTables"].as_array().unwrap().is_empty());
}
6442
6443 #[test]
6446 fn scheduled_query_lifecycle() {
6447 let svc = make_service();
6448
6449 let req = make_request(
6450 "CreateScheduledQuery",
6451 json!({
6452 "name": "test-sq",
6453 "queryString": "fields @timestamp | limit 10",
6454 "queryLanguage": "CWLI",
6455 "scheduleExpression": "rate(1 hour)",
6456 "executionRoleArn": "arn:aws:iam::123456789012:role/exec"
6457 }),
6458 );
6459 let resp = svc.create_scheduled_query(&req).unwrap();
6460 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6461 let arn = body["scheduledQueryArn"].as_str().unwrap().to_string();
6462
6463 let req = make_request("GetScheduledQuery", json!({ "identifier": arn }));
6464 let resp = svc.get_scheduled_query(&req).unwrap();
6465 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6466 assert_eq!(body["name"].as_str().unwrap(), "test-sq");
6467
6468 let req = make_request(
6469 "GetScheduledQueryHistory",
6470 json!({ "identifier": arn, "startTime": 0_i64, "endTime": 9999999999_i64 }),
6471 );
6472 let resp = svc.get_scheduled_query_history(&req).unwrap();
6473 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6474 assert!(body["triggerHistory"].as_array().unwrap().is_empty());
6475
6476 let req = make_request("ListScheduledQueries", json!({}));
6477 let resp = svc.list_scheduled_queries(&req).unwrap();
6478 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6479 assert_eq!(body["scheduledQueries"].as_array().unwrap().len(), 1);
6480
6481 let req = make_request(
6482 "UpdateScheduledQuery",
6483 json!({
6484 "identifier": arn,
6485 "queryString": "fields @message | limit 5",
6486 "queryLanguage": "CWLI",
6487 "scheduleExpression": "rate(2 hours)",
6488 "executionRoleArn": "arn:aws:iam::123456789012:role/exec"
6489 }),
6490 );
6491 svc.update_scheduled_query(&req).unwrap();
6492
6493 let req = make_request("DeleteScheduledQuery", json!({ "identifier": arn }));
6494 svc.delete_scheduled_query(&req).unwrap();
6495
6496 let req = make_request("ListScheduledQueries", json!({}));
6497 let resp = svc.list_scheduled_queries(&req).unwrap();
6498 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6499 assert!(body["scheduledQueries"].as_array().unwrap().is_empty());
6500 }
6501
6502 #[test]
6505 fn start_live_tail_returns_session() {
6506 let svc = make_service();
6507 let req = make_request(
6508 "StartLiveTail",
6509 json!({ "logGroupIdentifiers": ["/test/group"] }),
6510 );
6511 let resp = svc.start_live_tail(&req).unwrap();
6512 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6513 assert!(body["responseStream"]["sessionStart"]["sessionId"]
6514 .as_str()
6515 .is_some());
6516 }
6517
6518 #[test]
6519 fn list_log_groups_delegates_to_describe() {
6520 let svc = make_service();
6521 create_group(&svc, "/test/list");
6522 let req = make_request("DescribeLogGroups", json!({}));
6523 let resp = svc.describe_log_groups(&req).unwrap();
6524 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6525 assert_eq!(body["logGroups"].as_array().unwrap().len(), 1);
6526 }
6527
6528 #[test]
6529 fn list_log_groups_for_query_returns_empty() {
6530 let svc = make_service();
6531 let req = make_request(
6532 "ListLogGroupsForQuery",
6533 json!({ "queryId": "some-query-id" }),
6534 );
6535 let resp = svc.list_log_groups_for_query(&req).unwrap();
6536 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6537 assert!(body["logGroupIdentifiers"].as_array().unwrap().is_empty());
6538 }
6539
6540 #[test]
6541 fn list_aggregate_log_group_summaries_returns_empty() {
6542 let svc = make_service();
6543 let req = make_request(
6544 "ListAggregateLogGroupSummaries",
6545 json!({ "groupBy": "DATA_SOURCE_NAME_AND_TYPE" }),
6546 );
6547 let resp = svc.list_aggregate_log_group_summaries(&req).unwrap();
6548 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6549 assert!(body["aggregateLogGroupSummaries"]
6550 .as_array()
6551 .unwrap()
6552 .is_empty());
6553 }
6554
6555 #[test]
6556 fn put_bearer_token_authentication_stores_flag() {
6557 let svc = make_service();
6558 create_group(&svc, "/test/bearer");
6559 let req = make_request(
6560 "PutBearerTokenAuthentication",
6561 json!({
6562 "logGroupIdentifier": "/test/bearer",
6563 "bearerTokenAuthenticationEnabled": true
6564 }),
6565 );
6566 svc.put_bearer_token_authentication(&req).unwrap();
6567 }
6568
6569 #[test]
6570 fn get_log_object_returns_stub() {
6571 let svc = make_service();
6572 let req = make_request(
6573 "GetLogObject",
6574 json!({ "logObjectPointer": "some-pointer" }),
6575 );
6576 let resp = svc.get_log_object(&req).unwrap();
6577 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6578 assert!(body.is_object());
6579 }
6580
6581 #[test]
6582 fn get_log_fields_returns_stub() {
6583 let svc = make_service();
6584 let req = make_request(
6585 "GetLogFields",
6586 json!({ "dataSourceName": "test", "dataSourceType": "CW_LOG" }),
6587 );
6588 let resp = svc.get_log_fields(&req).unwrap();
6589 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6590 assert!(body["logFields"].as_array().unwrap().is_empty());
6591 }
6592
6593 #[test]
6594 fn s3_table_integration_stubs() {
6595 let svc = make_service();
6596
6597 let req = make_request(
6598 "AssociateSourceToS3TableIntegration",
6599 json!({
6600 "integrationArn": "arn:aws:logs:us-east-1:123456789012:integration:test",
6601 "dataSource": { "resourceArn": "arn:aws:logs:us-east-1:123456789012:log-group:test" }
6602 }),
6603 );
6604 svc.associate_source_to_s3_table_integration(&req).unwrap();
6605
6606 let req = make_request(
6607 "ListSourcesForS3TableIntegration",
6608 json!({
6609 "integrationArn": "arn:aws:logs:us-east-1:123456789012:integration:test"
6610 }),
6611 );
6612 let resp = svc.list_sources_for_s3_table_integration(&req).unwrap();
6613 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6614 assert_eq!(body["sources"].as_array().unwrap().len(), 1);
6615
6616 let req = make_request(
6617 "DisassociateSourceFromS3TableIntegration",
6618 json!({ "identifier": "arn:aws:logs:us-east-1:123456789012:integration:test" }),
6619 );
6620 svc.disassociate_source_from_s3_table_integration(&req)
6621 .unwrap();
6622 }
6623
6624 #[test]
6625 fn update_delivery_configuration_noop() {
6626 let svc = make_service();
6627 let req = make_request(
6629 "PutDeliverySource",
6630 json!({
6631 "name": "test-ds",
6632 "resourceArn": "arn:aws:logs:us-east-1:123456789012:log-group:dummy",
6633 "logType": "APPLICATION_LOGS"
6634 }),
6635 );
6636 svc.put_delivery_source(&req).unwrap();
6637
6638 let req = make_request(
6639 "PutDeliveryDestination",
6640 json!({
6641 "name": "test-dd",
6642 "deliveryDestinationConfiguration": {
6643 "destinationResourceArn": "arn:aws:s3:::test-bucket"
6644 }
6645 }),
6646 );
6647 svc.put_delivery_destination(&req).unwrap();
6648
6649 let req = make_request(
6650 "CreateDelivery",
6651 json!({
6652 "deliverySourceName": "test-ds",
6653 "deliveryDestinationArn": "arn:aws:logs:us-east-1:123456789012:delivery-destination:test-dd"
6654 }),
6655 );
6656 let resp = svc.create_delivery(&req).unwrap();
6657 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6658 let delivery_id = body["delivery"]["id"].as_str().unwrap().to_string();
6659
6660 let req = make_request("UpdateDeliveryConfiguration", json!({ "id": delivery_id }));
6661 svc.update_delivery_configuration(&req).unwrap();
6662 }
6663
6664 #[test]
6665 fn describe_configuration_templates_returns_empty() {
6666 let svc = make_service();
6667 let req = make_request("DescribeConfigurationTemplates", json!({}));
6668 let resp = svc.describe_configuration_templates(&req).unwrap();
6669 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6670 assert!(body["configurationTemplates"]
6671 .as_array()
6672 .unwrap()
6673 .is_empty());
6674 }
6675}