1use async_trait::async_trait;
2use http::StatusCode;
3use serde_json::{json, Value};
4
5use std::sync::Arc;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9
10use crate::state::SharedLogsState;
11
12mod anomaly;
13mod deliveries;
14mod destinations;
15mod exports;
16mod filters;
17mod groups;
18mod misc;
19mod policies;
20mod queries;
21mod streams;
22mod tags;
23
24pub struct LogsService {
25 state: SharedLogsState,
26 delivery_bus: Arc<DeliveryBus>,
27}
28
29impl LogsService {
30 pub fn new(state: SharedLogsState, delivery_bus: Arc<DeliveryBus>) -> Self {
31 Self {
32 state,
33 delivery_bus,
34 }
35 }
36}
37
38#[async_trait]
39impl AwsService for LogsService {
40 fn service_name(&self) -> &str {
41 "logs"
42 }
43
44 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
45 match req.action.as_str() {
46 "CreateLogGroup" => self.create_log_group(&req),
47 "DeleteLogGroup" => self.delete_log_group(&req),
48 "DescribeLogGroups" => self.describe_log_groups(&req),
49 "CreateLogStream" => self.create_log_stream(&req),
50 "DeleteLogStream" => self.delete_log_stream(&req),
51 "DescribeLogStreams" => self.describe_log_streams(&req),
52 "PutLogEvents" => self.put_log_events(&req),
53 "GetLogEvents" => self.get_log_events(&req),
54 "FilterLogEvents" => self.filter_log_events(&req),
55 "TagLogGroup" => self.tag_log_group(&req),
56 "UntagLogGroup" => self.untag_log_group(&req),
57 "ListTagsLogGroup" => self.list_tags_log_group(&req),
58 "TagResource" => self.tag_resource(&req),
59 "UntagResource" => self.untag_resource(&req),
60 "ListTagsForResource" => self.list_tags_for_resource(&req),
61 "PutRetentionPolicy" => self.put_retention_policy(&req),
62 "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
63 "PutSubscriptionFilter" => self.put_subscription_filter(&req),
64 "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
65 "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
66 "PutMetricFilter" => self.put_metric_filter(&req),
67 "DescribeMetricFilters" => self.describe_metric_filters(&req),
68 "DeleteMetricFilter" => self.delete_metric_filter(&req),
69 "PutResourcePolicy" => self.put_resource_policy(&req),
70 "DescribeResourcePolicies" => self.describe_resource_policies(&req),
71 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
72 "PutDestination" => self.put_destination(&req),
73 "DescribeDestinations" => self.describe_destinations(&req),
74 "DeleteDestination" => self.delete_destination(&req),
75 "PutDestinationPolicy" => self.put_destination_policy(&req),
76 "StartQuery" => self.start_query(&req),
77 "GetQueryResults" => self.get_query_results(&req),
78 "DescribeQueries" => self.describe_queries(&req),
79 "CreateExportTask" => self.create_export_task(&req),
80 "DescribeExportTasks" => self.describe_export_tasks(&req),
81 "CancelExportTask" => self.cancel_export_task(&req),
82 "PutDeliveryDestination" => self.put_delivery_destination(&req),
83 "GetDeliveryDestination" => self.get_delivery_destination(&req),
84 "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
85 "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
86 "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
87 "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
88 "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
89 "PutDeliverySource" => self.put_delivery_source(&req),
90 "GetDeliverySource" => self.get_delivery_source(&req),
91 "DescribeDeliverySources" => self.describe_delivery_sources(&req),
92 "DeleteDeliverySource" => self.delete_delivery_source(&req),
93 "CreateDelivery" => self.create_delivery(&req),
94 "GetDelivery" => self.get_delivery(&req),
95 "DescribeDeliveries" => self.describe_deliveries(&req),
96 "DeleteDelivery" => self.delete_delivery(&req),
97 "AssociateKmsKey" => self.associate_kms_key(&req),
98 "DisassociateKmsKey" => self.disassociate_kms_key(&req),
99 "PutQueryDefinition" => self.put_query_definition(&req),
100 "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
101 "DeleteQueryDefinition" => self.delete_query_definition(&req),
102 "PutAccountPolicy" => self.put_account_policy(&req),
103 "DescribeAccountPolicies" => self.describe_account_policies(&req),
104 "DeleteAccountPolicy" => self.delete_account_policy(&req),
105 "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
106 "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
107 "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
108 "PutIndexPolicy" => self.put_index_policy(&req),
109 "DescribeIndexPolicies" => self.describe_index_policies(&req),
110 "DeleteIndexPolicy" => self.delete_index_policy(&req),
111 "DescribeFieldIndexes" => self.describe_field_indexes(&req),
112 "PutTransformer" => self.put_transformer(&req),
113 "GetTransformer" => self.get_transformer(&req),
114 "DeleteTransformer" => self.delete_transformer(&req),
115 "TestTransformer" => self.test_transformer(&req),
116 "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
117 "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
118 "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
119 "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
120 "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
121 "GetLogGroupFields" => self.get_log_group_fields(&req),
122 "TestMetricFilter" => self.test_metric_filter(&req),
123 "StopQuery" => self.stop_query(&req),
124 "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
125 "GetLogRecord" => self.get_log_record(&req),
126 "ListAnomalies" => self.list_anomalies(&req),
127 "UpdateAnomaly" => self.update_anomaly(&req),
128 "CreateImportTask" => self.create_import_task(&req),
129 "DescribeImportTasks" => self.describe_import_tasks(&req),
130 "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
131 "CancelImportTask" => self.cancel_import_task(&req),
132 "PutIntegration" => self.put_integration(&req),
133 "GetIntegration" => self.get_integration(&req),
134 "DeleteIntegration" => self.delete_integration(&req),
135 "ListIntegrations" => self.list_integrations(&req),
136 "CreateLookupTable" => self.create_lookup_table(&req),
137 "GetLookupTable" => self.get_lookup_table(&req),
138 "DescribeLookupTables" => self.describe_lookup_tables(&req),
139 "DeleteLookupTable" => self.delete_lookup_table(&req),
140 "UpdateLookupTable" => self.update_lookup_table(&req),
141 "CreateScheduledQuery" => self.create_scheduled_query(&req),
142 "GetScheduledQuery" => self.get_scheduled_query(&req),
143 "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
144 "ListScheduledQueries" => self.list_scheduled_queries(&req),
145 "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
146 "UpdateScheduledQuery" => self.update_scheduled_query(&req),
147 "StartLiveTail" => self.start_live_tail(&req),
148 "ListLogGroups" => self.list_log_groups(&req),
149 "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
150 "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
151 "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
152 "GetLogObject" => self.get_log_object(&req),
153 "GetLogFields" => self.get_log_fields(&req),
154 "AssociateSourceToS3TableIntegration" => {
155 self.associate_source_to_s3_table_integration(&req)
156 }
157 "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
158 "DisassociateSourceFromS3TableIntegration" => {
159 self.disassociate_source_from_s3_table_integration(&req)
160 }
161 "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
162 "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
163 "GetExportedData" => self.get_exported_data(&req),
165 _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
166 }
167 }
168
169 fn supported_actions(&self) -> &[&str] {
170 &[
171 "CreateLogGroup",
172 "DeleteLogGroup",
173 "DescribeLogGroups",
174 "CreateLogStream",
175 "DeleteLogStream",
176 "DescribeLogStreams",
177 "PutLogEvents",
178 "GetLogEvents",
179 "FilterLogEvents",
180 "TagLogGroup",
181 "UntagLogGroup",
182 "ListTagsLogGroup",
183 "TagResource",
184 "UntagResource",
185 "ListTagsForResource",
186 "PutRetentionPolicy",
187 "DeleteRetentionPolicy",
188 "PutSubscriptionFilter",
189 "DescribeSubscriptionFilters",
190 "DeleteSubscriptionFilter",
191 "PutMetricFilter",
192 "DescribeMetricFilters",
193 "DeleteMetricFilter",
194 "PutResourcePolicy",
195 "DescribeResourcePolicies",
196 "DeleteResourcePolicy",
197 "PutDestination",
198 "DescribeDestinations",
199 "DeleteDestination",
200 "PutDestinationPolicy",
201 "StartQuery",
202 "GetQueryResults",
203 "DescribeQueries",
204 "CreateExportTask",
205 "DescribeExportTasks",
206 "CancelExportTask",
207 "PutDeliveryDestination",
208 "GetDeliveryDestination",
209 "DescribeDeliveryDestinations",
210 "DeleteDeliveryDestination",
211 "PutDeliveryDestinationPolicy",
212 "GetDeliveryDestinationPolicy",
213 "DeleteDeliveryDestinationPolicy",
214 "PutDeliverySource",
215 "GetDeliverySource",
216 "DescribeDeliverySources",
217 "DeleteDeliverySource",
218 "CreateDelivery",
219 "GetDelivery",
220 "DescribeDeliveries",
221 "DeleteDelivery",
222 "AssociateKmsKey",
223 "DisassociateKmsKey",
224 "PutQueryDefinition",
225 "DescribeQueryDefinitions",
226 "DeleteQueryDefinition",
227 "PutAccountPolicy",
228 "DescribeAccountPolicies",
229 "DeleteAccountPolicy",
230 "PutDataProtectionPolicy",
231 "GetDataProtectionPolicy",
232 "DeleteDataProtectionPolicy",
233 "PutIndexPolicy",
234 "DescribeIndexPolicies",
235 "DeleteIndexPolicy",
236 "DescribeFieldIndexes",
237 "PutTransformer",
238 "GetTransformer",
239 "DeleteTransformer",
240 "TestTransformer",
241 "CreateLogAnomalyDetector",
242 "GetLogAnomalyDetector",
243 "DeleteLogAnomalyDetector",
244 "ListLogAnomalyDetectors",
245 "UpdateLogAnomalyDetector",
246 "GetLogGroupFields",
247 "TestMetricFilter",
248 "StopQuery",
249 "PutLogGroupDeletionProtection",
250 "GetLogRecord",
251 "ListAnomalies",
252 "UpdateAnomaly",
253 "CreateImportTask",
254 "DescribeImportTasks",
255 "DescribeImportTaskBatches",
256 "CancelImportTask",
257 "PutIntegration",
258 "GetIntegration",
259 "DeleteIntegration",
260 "ListIntegrations",
261 "CreateLookupTable",
262 "GetLookupTable",
263 "DescribeLookupTables",
264 "DeleteLookupTable",
265 "UpdateLookupTable",
266 "CreateScheduledQuery",
267 "GetScheduledQuery",
268 "GetScheduledQueryHistory",
269 "ListScheduledQueries",
270 "DeleteScheduledQuery",
271 "UpdateScheduledQuery",
272 "StartLiveTail",
273 "ListLogGroups",
274 "ListLogGroupsForQuery",
275 "ListAggregateLogGroupSummaries",
276 "PutBearerTokenAuthentication",
277 "GetLogObject",
278 "GetLogFields",
279 "AssociateSourceToS3TableIntegration",
280 "ListSourcesForS3TableIntegration",
281 "DisassociateSourceFromS3TableIntegration",
282 "UpdateDeliveryConfiguration",
283 "DescribeConfigurationTemplates",
284 ]
285 }
286}
287
288fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
289 body[field].as_str().ok_or_else(|| {
290 AwsServiceError::aws_error(
291 StatusCode::BAD_REQUEST,
292 "InvalidParameterException",
293 format!("{field} is required"),
294 )
295 })
296}
297
298fn dd_config_json(config: &std::collections::HashMap<String, String>) -> Value {
301 let mut m: serde_json::Map<String, Value> =
302 config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
303 m.entry("destinationResourceArn".to_string())
304 .or_insert_with(|| json!(""));
305 Value::Object(m)
306}
307
308fn generate_sequence_token() -> String {
309 use std::time::{SystemTime, UNIX_EPOCH};
310 let nanos = SystemTime::now()
311 .duration_since(UNIX_EPOCH)
312 .unwrap_or_default()
313 .as_nanos();
314 format!("{:038}", nanos % 10u128.pow(38))
316}
317
318fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
319 AwsServiceError::aws_error(
320 StatusCode::BAD_REQUEST,
321 "InvalidParameterException",
322 format!(
323 "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
324 ),
325 )
326}
327
328fn resolve_log_group_name(
331 log_group_name: Option<&str>,
332 resource_identifier: Option<&str>,
333) -> Result<String, AwsServiceError> {
334 if let Some(identifier) = resource_identifier {
335 if identifier.starts_with("arn:") {
336 extract_log_group_from_arn(identifier).ok_or_else(|| {
337 AwsServiceError::aws_error(
338 StatusCode::BAD_REQUEST,
339 "InvalidParameterException",
340 format!("Invalid ARN: {identifier}"),
341 )
342 })
343 } else {
344 Ok(identifier.to_string())
345 }
346 } else if let Some(name) = log_group_name {
347 Ok(name.to_string())
348 } else {
349 Err(AwsServiceError::aws_error(
350 StatusCode::BAD_REQUEST,
351 "InvalidParameterException",
352 "Either logGroupName or resourceIdentifier is required",
353 ))
354 }
355}
356
357fn extract_log_group_from_arn(arn: &str) -> Option<String> {
359 let parts: Vec<&str> = arn.splitn(7, ':').collect();
361 if parts.len() >= 7 && parts[5] == "log-group" {
362 let name = parts[6].strip_suffix(":*").unwrap_or(parts[6]);
363 Some(name.to_string())
364 } else {
365 None
366 }
367}
368
369fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
377 let pattern = pattern.trim();
378
379 if pattern.is_empty() {
381 return true;
382 }
383
384 if pattern.starts_with('{') && pattern.ends_with('}') {
386 return matches_json_filter_pattern(pattern, message);
387 }
388
389 if pattern.starts_with('[') {
391 return true;
392 }
393
394 if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
396 let inner = &pattern[1..pattern.len() - 1];
397 let unescaped = inner.replace("\\\"", "\"");
399 return message.contains(&unescaped);
400 }
401
402 let terms = parse_filter_terms(pattern);
404 terms.iter().all(|term| message.contains(term.as_str()))
405}
406
407fn parse_filter_terms(pattern: &str) -> Vec<String> {
409 let mut terms = Vec::new();
410 let mut chars = pattern.chars().peekable();
411
412 while chars.peek().is_some() {
413 while chars.peek().is_some_and(|c| c.is_whitespace()) {
415 chars.next();
416 }
417
418 if chars.peek().is_none() {
419 break;
420 }
421
422 if chars.peek() == Some(&'"') {
423 chars.next(); let mut term = String::new();
426 loop {
427 match chars.next() {
428 Some('\\') => {
429 if let Some(c) = chars.next() {
430 term.push(c);
431 }
432 }
433 Some('"') => break,
434 Some(c) => term.push(c),
435 None => break,
436 }
437 }
438 terms.push(term);
439 } else {
440 let mut term = String::new();
442 while chars.peek().is_some_and(|c| !c.is_whitespace()) {
443 term.push(chars.next().unwrap());
444 }
445 if !term.is_empty() {
446 terms.push(term);
447 }
448 }
449 }
450
451 terms
452}
453
454fn matches_json_filter_pattern(pattern: &str, message: &str) -> bool {
456 let inner = pattern
458 .strip_prefix('{')
459 .and_then(|s| s.strip_suffix('}'))
460 .unwrap_or("")
461 .trim();
462
463 if inner.is_empty() {
464 return true;
465 }
466
467 let msg_json: serde_json::Value = match serde_json::from_str(message) {
469 Ok(v) => v,
470 Err(_) => return false, };
472
473 let conditions: Vec<&str> = inner.split("&&").collect();
477
478 for condition in conditions {
479 let condition = condition.trim();
480 if !matches_single_json_condition(condition, &msg_json) {
481 return false;
482 }
483 }
484
485 true
486}
487
488fn matches_single_json_condition(condition: &str, json: &serde_json::Value) -> bool {
489 let condition = condition.trim();
491
492 let ops = ["!=", ">=", "<=", "=", ">", "<"];
494 let mut found_op = None;
495 let mut op_pos = 0;
496 let mut op_len = 0;
497
498 for op in &ops {
499 if let Some(pos) = condition.find(op) {
500 let before = &condition[..pos];
502 let quote_count = before.chars().filter(|&c| c == '"').count();
503 if quote_count % 2 == 0 {
504 found_op = Some(*op);
505 op_pos = pos;
506 op_len = op.len();
507 break;
508 }
509 }
510 }
511
512 let (op, field_part, value_part) = match found_op {
513 Some(op) => (
514 op,
515 condition[..op_pos].trim(),
516 condition[op_pos + op_len..].trim(),
517 ),
518 None => {
519 if let Some(path) = condition.strip_prefix("$.") {
522 return resolve_json_path_simple(json, path).is_some();
523 }
524 return true;
525 }
526 };
527
528 let path = match field_part.strip_prefix("$.") {
530 Some(p) => p,
531 None => return true, };
533
534 let actual_value = match resolve_json_path_simple(json, path) {
535 Some(v) => v,
536 None => return op == "!=", };
538
539 let expected_str = if value_part.starts_with('"') && value_part.ends_with('"') {
541 let s = &value_part[1..value_part.len() - 1];
543 match op {
544 "=" => actual_value.as_str() == Some(s),
545 "!=" => actual_value.as_str() != Some(s),
546 _ => false,
547 }
548 } else if let Ok(expected_num) = value_part.parse::<f64>() {
549 let actual_num = actual_value.as_f64();
551 match (op, actual_num) {
552 ("=", Some(n)) => (n - expected_num).abs() < f64::EPSILON,
553 ("!=", Some(n)) => (n - expected_num).abs() >= f64::EPSILON,
554 (">", Some(n)) => n > expected_num,
555 ("<", Some(n)) => n < expected_num,
556 (">=", Some(n)) => n >= expected_num,
557 ("<=", Some(n)) => n <= expected_num,
558 _ => false,
559 }
560 } else if value_part == "true" || value_part == "false" {
561 let expected_bool = value_part == "true";
562 match op {
563 "=" => actual_value.as_bool() == Some(expected_bool),
564 "!=" => actual_value.as_bool() != Some(expected_bool),
565 _ => false,
566 }
567 } else {
568 true };
570
571 expected_str
572}
573
574fn resolve_json_path_simple<'a>(
576 json: &'a serde_json::Value,
577 path: &str,
578) -> Option<&'a serde_json::Value> {
579 let mut current = json;
580 for part in path.split('.') {
581 current = current.get(part)?;
582 }
583 if current.is_null() {
584 None
585 } else {
586 Some(current)
587 }
588}
589
590#[cfg(test)]
591pub(crate) mod test_helpers {
592 use super::*;
593 use crate::state::LogsState;
594 use bytes::Bytes;
595 use fakecloud_core::delivery::DeliveryBus;
596 use http::{HeaderMap, Method};
597 use std::collections::HashMap;
598 use std::sync::Arc;
599
600 pub fn make_service() -> LogsService {
601 let state = Arc::new(parking_lot::RwLock::new(LogsState::new(
602 "123456789012",
603 "us-east-1",
604 )));
605 let delivery_bus = Arc::new(DeliveryBus::new());
606 LogsService::new(state, delivery_bus)
607 }
608
609 pub fn make_request(
610 action: &str,
611 body: serde_json::Value,
612 ) -> fakecloud_core::service::AwsRequest {
613 fakecloud_core::service::AwsRequest {
614 service: "logs".to_string(),
615 action: action.to_string(),
616 region: "us-east-1".to_string(),
617 account_id: "123456789012".to_string(),
618 request_id: "test-request-id".to_string(),
619 headers: HeaderMap::new(),
620 query_params: HashMap::new(),
621 body: Bytes::from(serde_json::to_vec(&body).unwrap()),
622 path_segments: vec![],
623 raw_path: "/".to_string(),
624 raw_query: String::new(),
625 method: Method::POST,
626 is_query_protocol: false,
627 access_key_id: None,
628 }
629 }
630
631 pub fn create_group(svc: &LogsService, name: &str) {
632 let req = make_request(
633 "CreateLogGroup",
634 serde_json::json!({ "logGroupName": name }),
635 );
636 svc.create_log_group(&req).unwrap();
637 }
638
639 pub fn create_stream(svc: &LogsService, group: &str, stream: &str) {
640 let req = make_request(
641 "CreateLogStream",
642 serde_json::json!({ "logGroupName": group, "logStreamName": stream }),
643 );
644 svc.create_log_stream(&req).unwrap();
645 }
646
647 pub fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
648 let now = chrono::Utc::now().timestamp_millis();
649 let events: Vec<serde_json::Value> = messages
650 .iter()
651 .enumerate()
652 .map(|(i, msg)| serde_json::json!({ "timestamp": now + i as i64, "message": msg }))
653 .collect();
654 let req = make_request(
655 "PutLogEvents",
656 serde_json::json!({
657 "logGroupName": group,
658 "logStreamName": stream,
659 "logEvents": events,
660 }),
661 );
662 svc.put_log_events(&req).unwrap();
663 }
664}