1use async_trait::async_trait;
2use http::StatusCode;
3use serde_json::{json, Value};
4
5use std::sync::Arc;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9
10use crate::state::SharedLogsState;
11
12mod anomaly;
13mod deliveries;
14mod destinations;
15mod exports;
16mod filters;
17mod groups;
18mod misc;
19mod policies;
20mod queries;
21mod streams;
22mod tags;
23
24pub struct LogsService {
25 state: SharedLogsState,
26 delivery_bus: Arc<DeliveryBus>,
27}
28
29impl LogsService {
30 pub fn new(state: SharedLogsState, delivery_bus: Arc<DeliveryBus>) -> Self {
31 Self {
32 state,
33 delivery_bus,
34 }
35 }
36}
37
38#[async_trait]
39impl AwsService for LogsService {
40 fn service_name(&self) -> &str {
41 "logs"
42 }
43
44 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
45 match req.action.as_str() {
46 "CreateLogGroup" => self.create_log_group(&req),
47 "DeleteLogGroup" => self.delete_log_group(&req),
48 "DescribeLogGroups" => self.describe_log_groups(&req),
49 "CreateLogStream" => self.create_log_stream(&req),
50 "DeleteLogStream" => self.delete_log_stream(&req),
51 "DescribeLogStreams" => self.describe_log_streams(&req),
52 "PutLogEvents" => self.put_log_events(&req),
53 "GetLogEvents" => self.get_log_events(&req),
54 "FilterLogEvents" => self.filter_log_events(&req),
55 "TagLogGroup" => self.tag_log_group(&req),
56 "UntagLogGroup" => self.untag_log_group(&req),
57 "ListTagsLogGroup" => self.list_tags_log_group(&req),
58 "TagResource" => self.tag_resource(&req),
59 "UntagResource" => self.untag_resource(&req),
60 "ListTagsForResource" => self.list_tags_for_resource(&req),
61 "PutRetentionPolicy" => self.put_retention_policy(&req),
62 "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
63 "PutSubscriptionFilter" => self.put_subscription_filter(&req),
64 "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
65 "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
66 "PutMetricFilter" => self.put_metric_filter(&req),
67 "DescribeMetricFilters" => self.describe_metric_filters(&req),
68 "DeleteMetricFilter" => self.delete_metric_filter(&req),
69 "PutResourcePolicy" => self.put_resource_policy(&req),
70 "DescribeResourcePolicies" => self.describe_resource_policies(&req),
71 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
72 "PutDestination" => self.put_destination(&req),
73 "DescribeDestinations" => self.describe_destinations(&req),
74 "DeleteDestination" => self.delete_destination(&req),
75 "PutDestinationPolicy" => self.put_destination_policy(&req),
76 "StartQuery" => self.start_query(&req),
77 "GetQueryResults" => self.get_query_results(&req),
78 "DescribeQueries" => self.describe_queries(&req),
79 "CreateExportTask" => self.create_export_task(&req),
80 "DescribeExportTasks" => self.describe_export_tasks(&req),
81 "CancelExportTask" => self.cancel_export_task(&req),
82 "PutDeliveryDestination" => self.put_delivery_destination(&req),
83 "GetDeliveryDestination" => self.get_delivery_destination(&req),
84 "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
85 "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
86 "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
87 "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
88 "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
89 "PutDeliverySource" => self.put_delivery_source(&req),
90 "GetDeliverySource" => self.get_delivery_source(&req),
91 "DescribeDeliverySources" => self.describe_delivery_sources(&req),
92 "DeleteDeliverySource" => self.delete_delivery_source(&req),
93 "CreateDelivery" => self.create_delivery(&req),
94 "GetDelivery" => self.get_delivery(&req),
95 "DescribeDeliveries" => self.describe_deliveries(&req),
96 "DeleteDelivery" => self.delete_delivery(&req),
97 "AssociateKmsKey" => self.associate_kms_key(&req),
98 "DisassociateKmsKey" => self.disassociate_kms_key(&req),
99 "PutQueryDefinition" => self.put_query_definition(&req),
100 "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
101 "DeleteQueryDefinition" => self.delete_query_definition(&req),
102 "PutAccountPolicy" => self.put_account_policy(&req),
103 "DescribeAccountPolicies" => self.describe_account_policies(&req),
104 "DeleteAccountPolicy" => self.delete_account_policy(&req),
105 "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
106 "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
107 "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
108 "PutIndexPolicy" => self.put_index_policy(&req),
109 "DescribeIndexPolicies" => self.describe_index_policies(&req),
110 "DeleteIndexPolicy" => self.delete_index_policy(&req),
111 "DescribeFieldIndexes" => self.describe_field_indexes(&req),
112 "PutTransformer" => self.put_transformer(&req),
113 "GetTransformer" => self.get_transformer(&req),
114 "DeleteTransformer" => self.delete_transformer(&req),
115 "TestTransformer" => self.test_transformer(&req),
116 "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
117 "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
118 "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
119 "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
120 "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
121 "GetLogGroupFields" => self.get_log_group_fields(&req),
122 "TestMetricFilter" => self.test_metric_filter(&req),
123 "StopQuery" => self.stop_query(&req),
124 "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
125 "GetLogRecord" => self.get_log_record(&req),
126 "ListAnomalies" => self.list_anomalies(&req),
127 "UpdateAnomaly" => self.update_anomaly(&req),
128 "CreateImportTask" => self.create_import_task(&req),
129 "DescribeImportTasks" => self.describe_import_tasks(&req),
130 "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
131 "CancelImportTask" => self.cancel_import_task(&req),
132 "PutIntegration" => self.put_integration(&req),
133 "GetIntegration" => self.get_integration(&req),
134 "DeleteIntegration" => self.delete_integration(&req),
135 "ListIntegrations" => self.list_integrations(&req),
136 "CreateLookupTable" => self.create_lookup_table(&req),
137 "GetLookupTable" => self.get_lookup_table(&req),
138 "DescribeLookupTables" => self.describe_lookup_tables(&req),
139 "DeleteLookupTable" => self.delete_lookup_table(&req),
140 "UpdateLookupTable" => self.update_lookup_table(&req),
141 "CreateScheduledQuery" => self.create_scheduled_query(&req),
142 "GetScheduledQuery" => self.get_scheduled_query(&req),
143 "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
144 "ListScheduledQueries" => self.list_scheduled_queries(&req),
145 "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
146 "UpdateScheduledQuery" => self.update_scheduled_query(&req),
147 "StartLiveTail" => self.start_live_tail(&req),
148 "ListLogGroups" => self.list_log_groups(&req),
149 "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
150 "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
151 "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
152 "GetLogObject" => self.get_log_object(&req),
153 "GetLogFields" => self.get_log_fields(&req),
154 "AssociateSourceToS3TableIntegration" => {
155 self.associate_source_to_s3_table_integration(&req)
156 }
157 "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
158 "DisassociateSourceFromS3TableIntegration" => {
159 self.disassociate_source_from_s3_table_integration(&req)
160 }
161 "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
162 "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
163 "GetExportedData" => self.get_exported_data(&req),
165 _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
166 }
167 }
168
169 fn supported_actions(&self) -> &[&str] {
170 SUPPORTED_ACTIONS
171 }
172}
173
174const SUPPORTED_ACTIONS: &[&str] = &[
175 "CreateLogGroup",
176 "DeleteLogGroup",
177 "DescribeLogGroups",
178 "CreateLogStream",
179 "DeleteLogStream",
180 "DescribeLogStreams",
181 "PutLogEvents",
182 "GetLogEvents",
183 "FilterLogEvents",
184 "TagLogGroup",
185 "UntagLogGroup",
186 "ListTagsLogGroup",
187 "TagResource",
188 "UntagResource",
189 "ListTagsForResource",
190 "PutRetentionPolicy",
191 "DeleteRetentionPolicy",
192 "PutSubscriptionFilter",
193 "DescribeSubscriptionFilters",
194 "DeleteSubscriptionFilter",
195 "PutMetricFilter",
196 "DescribeMetricFilters",
197 "DeleteMetricFilter",
198 "PutResourcePolicy",
199 "DescribeResourcePolicies",
200 "DeleteResourcePolicy",
201 "PutDestination",
202 "DescribeDestinations",
203 "DeleteDestination",
204 "PutDestinationPolicy",
205 "StartQuery",
206 "GetQueryResults",
207 "DescribeQueries",
208 "CreateExportTask",
209 "DescribeExportTasks",
210 "CancelExportTask",
211 "PutDeliveryDestination",
212 "GetDeliveryDestination",
213 "DescribeDeliveryDestinations",
214 "DeleteDeliveryDestination",
215 "PutDeliveryDestinationPolicy",
216 "GetDeliveryDestinationPolicy",
217 "DeleteDeliveryDestinationPolicy",
218 "PutDeliverySource",
219 "GetDeliverySource",
220 "DescribeDeliverySources",
221 "DeleteDeliverySource",
222 "CreateDelivery",
223 "GetDelivery",
224 "DescribeDeliveries",
225 "DeleteDelivery",
226 "AssociateKmsKey",
227 "DisassociateKmsKey",
228 "PutQueryDefinition",
229 "DescribeQueryDefinitions",
230 "DeleteQueryDefinition",
231 "PutAccountPolicy",
232 "DescribeAccountPolicies",
233 "DeleteAccountPolicy",
234 "PutDataProtectionPolicy",
235 "GetDataProtectionPolicy",
236 "DeleteDataProtectionPolicy",
237 "PutIndexPolicy",
238 "DescribeIndexPolicies",
239 "DeleteIndexPolicy",
240 "DescribeFieldIndexes",
241 "PutTransformer",
242 "GetTransformer",
243 "DeleteTransformer",
244 "TestTransformer",
245 "CreateLogAnomalyDetector",
246 "GetLogAnomalyDetector",
247 "DeleteLogAnomalyDetector",
248 "ListLogAnomalyDetectors",
249 "UpdateLogAnomalyDetector",
250 "GetLogGroupFields",
251 "TestMetricFilter",
252 "StopQuery",
253 "PutLogGroupDeletionProtection",
254 "GetLogRecord",
255 "ListAnomalies",
256 "UpdateAnomaly",
257 "CreateImportTask",
258 "DescribeImportTasks",
259 "DescribeImportTaskBatches",
260 "CancelImportTask",
261 "PutIntegration",
262 "GetIntegration",
263 "DeleteIntegration",
264 "ListIntegrations",
265 "CreateLookupTable",
266 "GetLookupTable",
267 "DescribeLookupTables",
268 "DeleteLookupTable",
269 "UpdateLookupTable",
270 "CreateScheduledQuery",
271 "GetScheduledQuery",
272 "GetScheduledQueryHistory",
273 "ListScheduledQueries",
274 "DeleteScheduledQuery",
275 "UpdateScheduledQuery",
276 "StartLiveTail",
277 "ListLogGroups",
278 "ListLogGroupsForQuery",
279 "ListAggregateLogGroupSummaries",
280 "PutBearerTokenAuthentication",
281 "GetLogObject",
282 "GetLogFields",
283 "AssociateSourceToS3TableIntegration",
284 "ListSourcesForS3TableIntegration",
285 "DisassociateSourceFromS3TableIntegration",
286 "UpdateDeliveryConfiguration",
287 "DescribeConfigurationTemplates",
288];
289
290fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
291 body[field].as_str().ok_or_else(|| {
292 AwsServiceError::aws_error(
293 StatusCode::BAD_REQUEST,
294 "InvalidParameterException",
295 format!("{field} is required"),
296 )
297 })
298}
299
300fn dd_config_json(config: &std::collections::HashMap<String, String>) -> Value {
303 let mut m: serde_json::Map<String, Value> =
304 config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
305 m.entry("destinationResourceArn".to_string())
306 .or_insert_with(|| json!(""));
307 Value::Object(m)
308}
309
310fn generate_sequence_token() -> String {
311 use std::time::{SystemTime, UNIX_EPOCH};
312 let nanos = SystemTime::now()
313 .duration_since(UNIX_EPOCH)
314 .unwrap_or_default()
315 .as_nanos();
316 format!("{:038}", nanos % 10u128.pow(38))
318}
319
320fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
321 AwsServiceError::aws_error(
322 StatusCode::BAD_REQUEST,
323 "InvalidParameterException",
324 format!(
325 "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
326 ),
327 )
328}
329
330fn resolve_log_group_name(
333 log_group_name: Option<&str>,
334 resource_identifier: Option<&str>,
335) -> Result<String, AwsServiceError> {
336 if let Some(identifier) = resource_identifier {
337 if identifier.starts_with("arn:") {
338 extract_log_group_from_arn(identifier).ok_or_else(|| {
339 AwsServiceError::aws_error(
340 StatusCode::BAD_REQUEST,
341 "InvalidParameterException",
342 format!("Invalid ARN: {identifier}"),
343 )
344 })
345 } else {
346 Ok(identifier.to_string())
347 }
348 } else if let Some(name) = log_group_name {
349 Ok(name.to_string())
350 } else {
351 Err(AwsServiceError::aws_error(
352 StatusCode::BAD_REQUEST,
353 "InvalidParameterException",
354 "Either logGroupName or resourceIdentifier is required",
355 ))
356 }
357}
358
359fn extract_log_group_from_arn(arn: &str) -> Option<String> {
361 let parts: Vec<&str> = arn.splitn(7, ':').collect();
363 if parts.len() >= 7 && parts[5] == "log-group" {
364 let name = parts[6].strip_suffix(":*").unwrap_or(parts[6]);
365 Some(name.to_string())
366 } else {
367 None
368 }
369}
370
371fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
379 let pattern = pattern.trim();
380
381 if pattern.is_empty() {
383 return true;
384 }
385
386 if pattern.starts_with('{') && pattern.ends_with('}') {
388 return matches_json_filter_pattern(pattern, message);
389 }
390
391 if pattern.starts_with('[') {
393 return false;
394 }
395
396 if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
398 let inner = &pattern[1..pattern.len() - 1];
399 let unescaped = inner.replace("\\\"", "\"");
401 return message.contains(&unescaped);
402 }
403
404 let terms = parse_filter_terms(pattern);
406 terms.iter().all(|term| message.contains(term.as_str()))
407}
408
409fn parse_filter_terms(pattern: &str) -> Vec<String> {
411 let mut terms = Vec::new();
412 let mut chars = pattern.chars().peekable();
413
414 while chars.peek().is_some() {
415 while chars.peek().is_some_and(|c| c.is_whitespace()) {
417 chars.next();
418 }
419
420 if chars.peek().is_none() {
421 break;
422 }
423
424 if chars.peek() == Some(&'"') {
425 chars.next(); let mut term = String::new();
428 loop {
429 match chars.next() {
430 Some('\\') => {
431 if let Some(c) = chars.next() {
432 term.push(c);
433 }
434 }
435 Some('"') => break,
436 Some(c) => term.push(c),
437 None => break,
438 }
439 }
440 terms.push(term);
441 } else {
442 let mut term = String::new();
444 while chars.peek().is_some_and(|c| !c.is_whitespace()) {
445 term.push(chars.next().unwrap());
446 }
447 if !term.is_empty() {
448 terms.push(term);
449 }
450 }
451 }
452
453 terms
454}
455
456fn matches_json_filter_pattern(pattern: &str, message: &str) -> bool {
458 let inner = pattern
460 .strip_prefix('{')
461 .and_then(|s| s.strip_suffix('}'))
462 .unwrap_or("")
463 .trim();
464
465 if inner.is_empty() {
466 return true;
467 }
468
469 let msg_json: serde_json::Value = match serde_json::from_str(message) {
471 Ok(v) => v,
472 Err(_) => return false, };
474
475 let conditions: Vec<&str> = inner.split("&&").collect();
479
480 for condition in conditions {
481 let condition = condition.trim();
482 if !matches_single_json_condition(condition, &msg_json) {
483 return false;
484 }
485 }
486
487 true
488}
489
490fn matches_single_json_condition(condition: &str, json: &serde_json::Value) -> bool {
491 let condition = condition.trim();
493
494 let ops = ["!=", ">=", "<=", "=", ">", "<"];
496 let mut found_op = None;
497 let mut op_pos = 0;
498 let mut op_len = 0;
499
500 for op in &ops {
501 if let Some(pos) = condition.find(op) {
502 let before = &condition[..pos];
504 let quote_count = before.chars().filter(|&c| c == '"').count();
505 if quote_count % 2 == 0 {
506 found_op = Some(*op);
507 op_pos = pos;
508 op_len = op.len();
509 break;
510 }
511 }
512 }
513
514 let (op, field_part, value_part) = match found_op {
515 Some(op) => (
516 op,
517 condition[..op_pos].trim(),
518 condition[op_pos + op_len..].trim(),
519 ),
520 None => {
521 if let Some(path) = condition.strip_prefix("$.") {
524 return resolve_json_path_simple(json, path).is_some();
525 }
526 return true;
527 }
528 };
529
530 let path = match field_part.strip_prefix("$.") {
532 Some(p) => p,
533 None => return false, };
535
536 let actual_value = match resolve_json_path_simple(json, path) {
537 Some(v) => v,
538 None => return op == "!=", };
540
541 let expected_str = if value_part.starts_with('"') && value_part.ends_with('"') {
543 let s = &value_part[1..value_part.len() - 1];
545 match op {
546 "=" => actual_value.as_str() == Some(s),
547 "!=" => actual_value.as_str() != Some(s),
548 _ => false,
549 }
550 } else if let Ok(expected_num) = value_part.parse::<f64>() {
551 let actual_num = actual_value.as_f64();
553 match (op, actual_num) {
554 ("=", Some(n)) => (n - expected_num).abs() < f64::EPSILON,
555 ("!=", Some(n)) => (n - expected_num).abs() >= f64::EPSILON,
556 (">", Some(n)) => n > expected_num,
557 ("<", Some(n)) => n < expected_num,
558 (">=", Some(n)) => n >= expected_num,
559 ("<=", Some(n)) => n <= expected_num,
560 _ => false,
561 }
562 } else if value_part == "true" || value_part == "false" {
563 let expected_bool = value_part == "true";
564 match op {
565 "=" => actual_value.as_bool() == Some(expected_bool),
566 "!=" => actual_value.as_bool() != Some(expected_bool),
567 _ => false,
568 }
569 } else {
570 false };
572
573 expected_str
574}
575
576fn resolve_json_path_simple<'a>(
578 json: &'a serde_json::Value,
579 path: &str,
580) -> Option<&'a serde_json::Value> {
581 let mut current = json;
582 for part in path.split('.') {
583 current = current.get(part)?;
584 }
585 if current.is_null() {
586 None
587 } else {
588 Some(current)
589 }
590}
591
592#[cfg(test)]
593pub(crate) mod test_helpers {
594 use super::*;
595 use crate::state::LogsState;
596 use bytes::Bytes;
597 use fakecloud_core::delivery::DeliveryBus;
598 use http::{HeaderMap, Method};
599 use std::collections::HashMap;
600 use std::sync::Arc;
601
602 pub fn make_service() -> LogsService {
603 let state = Arc::new(parking_lot::RwLock::new(LogsState::new(
604 "123456789012",
605 "us-east-1",
606 )));
607 let delivery_bus = Arc::new(DeliveryBus::new());
608 LogsService::new(state, delivery_bus)
609 }
610
611 pub fn make_request(
612 action: &str,
613 body: serde_json::Value,
614 ) -> fakecloud_core::service::AwsRequest {
615 fakecloud_core::service::AwsRequest {
616 service: "logs".to_string(),
617 action: action.to_string(),
618 region: "us-east-1".to_string(),
619 account_id: "123456789012".to_string(),
620 request_id: "test-request-id".to_string(),
621 headers: HeaderMap::new(),
622 query_params: HashMap::new(),
623 body: Bytes::from(serde_json::to_vec(&body).unwrap()),
624 path_segments: vec![],
625 raw_path: "/".to_string(),
626 raw_query: String::new(),
627 method: Method::POST,
628 is_query_protocol: false,
629 access_key_id: None,
630 principal: None,
631 }
632 }
633
634 pub fn create_group(svc: &LogsService, name: &str) {
635 let req = make_request(
636 "CreateLogGroup",
637 serde_json::json!({ "logGroupName": name }),
638 );
639 svc.create_log_group(&req).unwrap();
640 }
641
642 pub fn create_stream(svc: &LogsService, group: &str, stream: &str) {
643 let req = make_request(
644 "CreateLogStream",
645 serde_json::json!({ "logGroupName": group, "logStreamName": stream }),
646 );
647 svc.create_log_stream(&req).unwrap();
648 }
649
650 pub fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
651 let now = chrono::Utc::now().timestamp_millis();
652 let events: Vec<serde_json::Value> = messages
653 .iter()
654 .enumerate()
655 .map(|(i, msg)| serde_json::json!({ "timestamp": now + i as i64, "message": msg }))
656 .collect();
657 let req = make_request(
658 "PutLogEvents",
659 serde_json::json!({
660 "logGroupName": group,
661 "logStreamName": stream,
662 "logEvents": events,
663 }),
664 );
665 svc.put_log_events(&req).unwrap();
666 }
667
668 #[test]
669 fn array_filter_pattern_does_not_match() {
670 assert!(
671 !matches_filter_pattern("[w1, w2, w3]", "some log message"),
672 "array-style filter pattern must not match (fail closed)"
673 );
674 }
675
676 #[test]
677 fn unrecognized_json_filter_path_does_not_match() {
678 assert!(
681 !matches_single_json_condition(
682 "level = \"ERROR\"",
683 &serde_json::json!({"level": "ERROR"}),
684 ),
685 "filter condition without $. prefix must not match (fail closed)"
686 );
687 }
688
689 #[test]
690 fn unknown_value_format_does_not_match() {
691 assert!(
693 !matches_single_json_condition(
694 "$.level = ERROR",
695 &serde_json::json!({"level": "ERROR"}),
696 ),
697 "unquoted non-numeric non-boolean value must not match (fail closed)"
698 );
699 }
700}