1use async_trait::async_trait;
2use http::StatusCode;
3use serde_json::{json, Value};
4
5use std::sync::Arc;
6
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{LogsSnapshot, SharedLogsState, LOGS_SNAPSHOT_SCHEMA_VERSION};
14
15mod anomaly;
16mod deliveries;
17mod destinations;
18mod exports;
19mod filters;
20mod groups;
21mod misc;
22mod policies;
23mod queries;
24mod streams;
25mod tags;
26
/// Reports whether `action` is one of the operations this service treats as
/// read-only.
///
/// The comparison is an exact, case-sensitive match against a fixed list of
/// action names; anything not in the list (including unknown actions) is
/// reported as not read-only.
fn is_read_only_action(action: &str) -> bool {
    const READ_ONLY_ACTIONS: &[&str] = &[
        "DescribeLogGroups",
        "DescribeLogStreams",
        "GetLogEvents",
        "FilterLogEvents",
        "ListTagsLogGroup",
        "ListTagsForResource",
        "DescribeSubscriptionFilters",
        "DescribeMetricFilters",
        "DescribeResourcePolicies",
        "DescribeDestinations",
        "GetQueryResults",
        "DescribeQueries",
        "DescribeExportTasks",
        "GetDeliveryDestination",
        "DescribeDeliveryDestinations",
        "GetDeliveryDestinationPolicy",
        "GetDeliverySource",
        "DescribeDeliverySources",
        "GetDelivery",
        "DescribeDeliveries",
        "DescribeQueryDefinitions",
        "DescribeAccountPolicies",
        "GetDataProtectionPolicy",
        "DescribeIndexPolicies",
        "DescribeFieldIndexes",
        "GetTransformer",
        "TestTransformer",
        "GetLogAnomalyDetector",
        "ListLogAnomalyDetectors",
        "GetLogGroupFields",
        "TestMetricFilter",
        "GetLogRecord",
        "ListAnomalies",
        "DescribeImportTasks",
        "DescribeImportTaskBatches",
        "GetIntegration",
        "ListIntegrations",
        "GetLookupTable",
        "DescribeLookupTables",
        "GetScheduledQuery",
        "GetScheduledQueryHistory",
        "ListScheduledQueries",
        "StartLiveTail",
        "ListLogGroups",
        "ListLogGroupsForQuery",
        "ListAggregateLogGroupSummaries",
        "GetLogObject",
        "GetLogFields",
        "ListSourcesForS3TableIntegration",
        "DescribeConfigurationTemplates",
        "GetExportedData",
    ];

    READ_ONLY_ACTIONS.contains(&action)
}
85
/// Logs service implementation backing the `"logs"` [`AwsService`].
pub struct LogsService {
    /// Shared logs state; read (and cloned) when a snapshot is written.
    state: SharedLogsState,
    /// Delivery bus handle supplied at construction time.
    delivery_bus: Arc<DeliveryBus>,
    /// Optional snapshot backend; `None` until `with_snapshot_store` is
    /// called, in which case `save_snapshot` is a no-op.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of `save_snapshot`, so snapshot
    /// writes from this service do not overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
92
93impl LogsService {
94 pub fn new(state: SharedLogsState, delivery_bus: Arc<DeliveryBus>) -> Self {
95 Self {
96 state,
97 delivery_bus,
98 snapshot_store: None,
99 snapshot_lock: Arc::new(AsyncMutex::new(())),
100 }
101 }
102
103 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
104 self.snapshot_store = Some(store);
105 self
106 }
107
108 async fn save_snapshot(&self) {
112 let Some(store) = self.snapshot_store.clone() else {
113 return;
114 };
115 let _guard = self.snapshot_lock.lock().await;
116 let snapshot = LogsSnapshot {
117 schema_version: LOGS_SNAPSHOT_SCHEMA_VERSION,
118 accounts: Some(self.state.read().clone()),
119 state: None,
120 };
121 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
122 let bytes = serde_json::to_vec(&snapshot)
123 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
124 store.save(&bytes)
125 })
126 .await;
127 match join {
128 Ok(Ok(())) => {}
129 Ok(Err(err)) => tracing::error!(%err, "failed to write logs snapshot"),
130 Err(err) => tracing::error!(%err, "logs snapshot task panicked"),
131 }
132 }
133}
134
135#[async_trait]
136impl AwsService for LogsService {
137 fn service_name(&self) -> &str {
138 "logs"
139 }
140
141 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
142 let mutates = !is_read_only_action(req.action.as_str());
143 let result = match req.action.as_str() {
144 "CreateLogGroup" => self.create_log_group(&req),
145 "DeleteLogGroup" => self.delete_log_group(&req),
146 "DescribeLogGroups" => self.describe_log_groups(&req),
147 "CreateLogStream" => self.create_log_stream(&req),
148 "DeleteLogStream" => self.delete_log_stream(&req),
149 "DescribeLogStreams" => self.describe_log_streams(&req),
150 "PutLogEvents" => self.put_log_events(&req),
151 "GetLogEvents" => self.get_log_events(&req),
152 "FilterLogEvents" => self.filter_log_events(&req),
153 "TagLogGroup" => self.tag_log_group(&req),
154 "UntagLogGroup" => self.untag_log_group(&req),
155 "ListTagsLogGroup" => self.list_tags_log_group(&req),
156 "TagResource" => self.tag_resource(&req),
157 "UntagResource" => self.untag_resource(&req),
158 "ListTagsForResource" => self.list_tags_for_resource(&req),
159 "PutRetentionPolicy" => self.put_retention_policy(&req),
160 "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
161 "PutSubscriptionFilter" => self.put_subscription_filter(&req),
162 "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
163 "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
164 "PutMetricFilter" => self.put_metric_filter(&req),
165 "DescribeMetricFilters" => self.describe_metric_filters(&req),
166 "DeleteMetricFilter" => self.delete_metric_filter(&req),
167 "PutResourcePolicy" => self.put_resource_policy(&req),
168 "DescribeResourcePolicies" => self.describe_resource_policies(&req),
169 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
170 "PutDestination" => self.put_destination(&req),
171 "DescribeDestinations" => self.describe_destinations(&req),
172 "DeleteDestination" => self.delete_destination(&req),
173 "PutDestinationPolicy" => self.put_destination_policy(&req),
174 "StartQuery" => self.start_query(&req),
175 "GetQueryResults" => self.get_query_results(&req),
176 "DescribeQueries" => self.describe_queries(&req),
177 "CreateExportTask" => self.create_export_task(&req),
178 "DescribeExportTasks" => self.describe_export_tasks(&req),
179 "CancelExportTask" => self.cancel_export_task(&req),
180 "PutDeliveryDestination" => self.put_delivery_destination(&req),
181 "GetDeliveryDestination" => self.get_delivery_destination(&req),
182 "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
183 "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
184 "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
185 "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
186 "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
187 "PutDeliverySource" => self.put_delivery_source(&req),
188 "GetDeliverySource" => self.get_delivery_source(&req),
189 "DescribeDeliverySources" => self.describe_delivery_sources(&req),
190 "DeleteDeliverySource" => self.delete_delivery_source(&req),
191 "CreateDelivery" => self.create_delivery(&req),
192 "GetDelivery" => self.get_delivery(&req),
193 "DescribeDeliveries" => self.describe_deliveries(&req),
194 "DeleteDelivery" => self.delete_delivery(&req),
195 "AssociateKmsKey" => self.associate_kms_key(&req),
196 "DisassociateKmsKey" => self.disassociate_kms_key(&req),
197 "PutQueryDefinition" => self.put_query_definition(&req),
198 "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
199 "DeleteQueryDefinition" => self.delete_query_definition(&req),
200 "PutAccountPolicy" => self.put_account_policy(&req),
201 "DescribeAccountPolicies" => self.describe_account_policies(&req),
202 "DeleteAccountPolicy" => self.delete_account_policy(&req),
203 "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
204 "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
205 "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
206 "PutIndexPolicy" => self.put_index_policy(&req),
207 "DescribeIndexPolicies" => self.describe_index_policies(&req),
208 "DeleteIndexPolicy" => self.delete_index_policy(&req),
209 "DescribeFieldIndexes" => self.describe_field_indexes(&req),
210 "PutTransformer" => self.put_transformer(&req),
211 "GetTransformer" => self.get_transformer(&req),
212 "DeleteTransformer" => self.delete_transformer(&req),
213 "TestTransformer" => self.test_transformer(&req),
214 "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
215 "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
216 "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
217 "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
218 "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
219 "GetLogGroupFields" => self.get_log_group_fields(&req),
220 "TestMetricFilter" => self.test_metric_filter(&req),
221 "StopQuery" => self.stop_query(&req),
222 "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
223 "GetLogRecord" => self.get_log_record(&req),
224 "ListAnomalies" => self.list_anomalies(&req),
225 "UpdateAnomaly" => self.update_anomaly(&req),
226 "CreateImportTask" => self.create_import_task(&req),
227 "DescribeImportTasks" => self.describe_import_tasks(&req),
228 "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
229 "CancelImportTask" => self.cancel_import_task(&req),
230 "PutIntegration" => self.put_integration(&req),
231 "GetIntegration" => self.get_integration(&req),
232 "DeleteIntegration" => self.delete_integration(&req),
233 "ListIntegrations" => self.list_integrations(&req),
234 "CreateLookupTable" => self.create_lookup_table(&req),
235 "GetLookupTable" => self.get_lookup_table(&req),
236 "DescribeLookupTables" => self.describe_lookup_tables(&req),
237 "DeleteLookupTable" => self.delete_lookup_table(&req),
238 "UpdateLookupTable" => self.update_lookup_table(&req),
239 "CreateScheduledQuery" => self.create_scheduled_query(&req),
240 "GetScheduledQuery" => self.get_scheduled_query(&req),
241 "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
242 "ListScheduledQueries" => self.list_scheduled_queries(&req),
243 "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
244 "UpdateScheduledQuery" => self.update_scheduled_query(&req),
245 "StartLiveTail" => self.start_live_tail(&req),
246 "ListLogGroups" => self.list_log_groups(&req),
247 "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
248 "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
249 "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
250 "GetLogObject" => self.get_log_object(&req),
251 "GetLogFields" => self.get_log_fields(&req),
252 "AssociateSourceToS3TableIntegration" => {
253 self.associate_source_to_s3_table_integration(&req)
254 }
255 "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
256 "DisassociateSourceFromS3TableIntegration" => {
257 self.disassociate_source_from_s3_table_integration(&req)
258 }
259 "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
260 "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
261 "GetExportedData" => self.get_exported_data(&req),
263 _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
264 };
265 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
266 self.save_snapshot().await;
267 }
268 result
269 }
270
271 fn supported_actions(&self) -> &[&str] {
272 SUPPORTED_ACTIONS
273 }
274}
275
/// Action names returned by `LogsService::supported_actions`.
///
/// NOTE(review): `handle` also dispatches `"GetExportedData"`, which is not
/// listed here — confirm whether that omission is intentional.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateLogGroup",
    "DeleteLogGroup",
    "DescribeLogGroups",
    "CreateLogStream",
    "DeleteLogStream",
    "DescribeLogStreams",
    "PutLogEvents",
    "GetLogEvents",
    "FilterLogEvents",
    "TagLogGroup",
    "UntagLogGroup",
    "ListTagsLogGroup",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "PutRetentionPolicy",
    "DeleteRetentionPolicy",
    "PutSubscriptionFilter",
    "DescribeSubscriptionFilters",
    "DeleteSubscriptionFilter",
    "PutMetricFilter",
    "DescribeMetricFilters",
    "DeleteMetricFilter",
    "PutResourcePolicy",
    "DescribeResourcePolicies",
    "DeleteResourcePolicy",
    "PutDestination",
    "DescribeDestinations",
    "DeleteDestination",
    "PutDestinationPolicy",
    "StartQuery",
    "GetQueryResults",
    "DescribeQueries",
    "CreateExportTask",
    "DescribeExportTasks",
    "CancelExportTask",
    "PutDeliveryDestination",
    "GetDeliveryDestination",
    "DescribeDeliveryDestinations",
    "DeleteDeliveryDestination",
    "PutDeliveryDestinationPolicy",
    "GetDeliveryDestinationPolicy",
    "DeleteDeliveryDestinationPolicy",
    "PutDeliverySource",
    "GetDeliverySource",
    "DescribeDeliverySources",
    "DeleteDeliverySource",
    "CreateDelivery",
    "GetDelivery",
    "DescribeDeliveries",
    "DeleteDelivery",
    "AssociateKmsKey",
    "DisassociateKmsKey",
    "PutQueryDefinition",
    "DescribeQueryDefinitions",
    "DeleteQueryDefinition",
    "PutAccountPolicy",
    "DescribeAccountPolicies",
    "DeleteAccountPolicy",
    "PutDataProtectionPolicy",
    "GetDataProtectionPolicy",
    "DeleteDataProtectionPolicy",
    "PutIndexPolicy",
    "DescribeIndexPolicies",
    "DeleteIndexPolicy",
    "DescribeFieldIndexes",
    "PutTransformer",
    "GetTransformer",
    "DeleteTransformer",
    "TestTransformer",
    "CreateLogAnomalyDetector",
    "GetLogAnomalyDetector",
    "DeleteLogAnomalyDetector",
    "ListLogAnomalyDetectors",
    "UpdateLogAnomalyDetector",
    "GetLogGroupFields",
    "TestMetricFilter",
    "StopQuery",
    "PutLogGroupDeletionProtection",
    "GetLogRecord",
    "ListAnomalies",
    "UpdateAnomaly",
    "CreateImportTask",
    "DescribeImportTasks",
    "DescribeImportTaskBatches",
    "CancelImportTask",
    "PutIntegration",
    "GetIntegration",
    "DeleteIntegration",
    "ListIntegrations",
    "CreateLookupTable",
    "GetLookupTable",
    "DescribeLookupTables",
    "DeleteLookupTable",
    "UpdateLookupTable",
    "CreateScheduledQuery",
    "GetScheduledQuery",
    "GetScheduledQueryHistory",
    "ListScheduledQueries",
    "DeleteScheduledQuery",
    "UpdateScheduledQuery",
    "StartLiveTail",
    "ListLogGroups",
    "ListLogGroupsForQuery",
    "ListAggregateLogGroupSummaries",
    "PutBearerTokenAuthentication",
    "GetLogObject",
    "GetLogFields",
    "AssociateSourceToS3TableIntegration",
    "ListSourcesForS3TableIntegration",
    "DisassociateSourceFromS3TableIntegration",
    "UpdateDeliveryConfiguration",
    "DescribeConfigurationTemplates",
];
391
392fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
393 body[field].as_str().ok_or_else(|| {
394 AwsServiceError::aws_error(
395 StatusCode::BAD_REQUEST,
396 "InvalidParameterException",
397 format!("{field} is required"),
398 )
399 })
400}
401
402fn dd_config_json(config: &std::collections::BTreeMap<String, String>) -> Value {
405 let mut m: serde_json::Map<String, Value> =
406 config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
407 m.entry("destinationResourceArn".to_string())
408 .or_insert_with(|| json!(""));
409 Value::Object(m)
410}
411
/// Produces a 38-digit, zero-padded decimal token derived from the current
/// time in nanoseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the elapsed duration
/// falls back to zero.
fn generate_sequence_token() -> String {
    const TOKEN_DIGITS: u32 = 38;

    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    // Keep the value within the fixed token width.
    let bounded = since_epoch.as_nanos() % 10u128.pow(TOKEN_DIGITS);

    format!("{bounded:0width$}", width = TOKEN_DIGITS as usize)
}
421
422fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
423 AwsServiceError::aws_error(
424 StatusCode::BAD_REQUEST,
425 "InvalidParameterException",
426 format!(
427 "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
428 ),
429 )
430}
431
432fn resolve_log_group_name(
435 log_group_name: Option<&str>,
436 resource_identifier: Option<&str>,
437) -> Result<String, AwsServiceError> {
438 if let Some(identifier) = resource_identifier {
439 if identifier.starts_with("arn:") {
440 extract_log_group_from_arn(identifier).ok_or_else(|| {
441 AwsServiceError::aws_error(
442 StatusCode::BAD_REQUEST,
443 "InvalidParameterException",
444 format!("Invalid ARN: {identifier}"),
445 )
446 })
447 } else {
448 Ok(identifier.to_string())
449 }
450 } else if let Some(name) = log_group_name {
451 Ok(name.to_string())
452 } else {
453 Err(AwsServiceError::aws_error(
454 StatusCode::BAD_REQUEST,
455 "InvalidParameterException",
456 "Either logGroupName or resourceIdentifier is required",
457 ))
458 }
459}
460
/// Extracts the log group name from a colon-separated ARN.
///
/// The ARN is split into at most seven `:`-separated fields. The sixth field
/// must be `log-group`; the seventh (everything after it) is returned with a
/// single trailing `:*` removed if present. Returns `None` when there are
/// fewer than seven fields or the sixth is not `log-group`.
pub(crate) fn extract_log_group_from_arn(arn: &str) -> Option<String> {
    let mut fields = arn.splitn(7, ':');

    // `nth(5)` consumes the first five fields and yields the sixth.
    let resource_type = fields.nth(5)?;
    let remainder = fields.next()?;

    if resource_type != "log-group" {
        return None;
    }

    let name = match remainder.strip_suffix(":*") {
        Some(trimmed) => trimmed,
        None => remainder,
    };
    Some(name.to_owned())
}
472
473fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
481 let pattern = pattern.trim();
482
483 if pattern.is_empty() {
485 return true;
486 }
487
488 if pattern.starts_with('{') && pattern.ends_with('}') {
490 return matches_json_filter_pattern(pattern, message);
491 }
492
493 if pattern.starts_with('[') {
495 return false;
496 }
497
498 if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
500 let inner = &pattern[1..pattern.len() - 1];
501 let unescaped = inner.replace("\\\"", "\"");
503 return message.contains(&unescaped);
504 }
505
506 let terms = parse_filter_terms(pattern);
508 terms.iter().all(|term| message.contains(term.as_str()))
509}
510
/// Splits a filter pattern into its individual terms.
///
/// Terms are separated by whitespace. A term that begins with `"` runs until
/// the next unescaped `"` (or the end of the pattern) and may contain
/// whitespace; inside it a backslash makes the following character literal.
/// Unquoted terms run until the next whitespace character.
fn parse_filter_terms(pattern: &str) -> Vec<String> {
    let mut terms = Vec::new();
    let mut rest = pattern.chars().peekable();

    loop {
        // Discard separators before the next term.
        while rest.next_if(|c| c.is_whitespace()).is_some() {}

        let Some(&first) = rest.peek() else {
            break;
        };

        if first == '"' {
            // Drop the opening quote, then read up to the closing one.
            rest.next();
            let mut quoted = String::new();
            while let Some(c) = rest.next() {
                match c {
                    '"' => break,
                    // Take the escaped character verbatim; a trailing
                    // backslash contributes nothing.
                    '\\' => quoted.extend(rest.next()),
                    other => quoted.push(other),
                }
            }
            terms.push(quoted);
        } else {
            // `first` is non-whitespace, so this term is never empty.
            let mut bare = String::new();
            while let Some(c) = rest.next_if(|c| !c.is_whitespace()) {
                bare.push(c);
            }
            terms.push(bare);
        }
    }

    terms
}
557
558fn matches_json_filter_pattern(pattern: &str, message: &str) -> bool {
560 let inner = pattern
562 .strip_prefix('{')
563 .and_then(|s| s.strip_suffix('}'))
564 .unwrap_or("")
565 .trim();
566
567 if inner.is_empty() {
568 return true;
569 }
570
571 let msg_json: serde_json::Value = match serde_json::from_str(message) {
573 Ok(v) => v,
574 Err(_) => return false, };
576
577 let conditions: Vec<&str> = inner.split("&&").collect();
581
582 for condition in conditions {
583 let condition = condition.trim();
584 if !matches_single_json_condition(condition, &msg_json) {
585 return false;
586 }
587 }
588
589 true
590}
591
592fn matches_single_json_condition(condition: &str, json: &serde_json::Value) -> bool {
593 let condition = condition.trim();
595
596 let ops = ["!=", ">=", "<=", "=", ">", "<"];
598 let mut found_op = None;
599 let mut op_pos = 0;
600 let mut op_len = 0;
601
602 for op in &ops {
603 if let Some(pos) = condition.find(op) {
604 let before = &condition[..pos];
606 let quote_count = before.chars().filter(|&c| c == '"').count();
607 if quote_count % 2 == 0 {
608 found_op = Some(*op);
609 op_pos = pos;
610 op_len = op.len();
611 break;
612 }
613 }
614 }
615
616 let (op, field_part, value_part) = match found_op {
617 Some(op) => (
618 op,
619 condition[..op_pos].trim(),
620 condition[op_pos + op_len..].trim(),
621 ),
622 None => {
623 if let Some(path) = condition.strip_prefix("$.") {
626 return resolve_json_path_simple(json, path).is_some();
627 }
628 return true;
629 }
630 };
631
632 let path = match field_part.strip_prefix("$.") {
634 Some(p) => p,
635 None => return false, };
637
638 let actual_value = match resolve_json_path_simple(json, path) {
639 Some(v) => v,
640 None => return op == "!=", };
642
643 let expected_str = if value_part.starts_with('"') && value_part.ends_with('"') {
645 let s = &value_part[1..value_part.len() - 1];
647 match op {
648 "=" => actual_value.as_str() == Some(s),
649 "!=" => actual_value.as_str() != Some(s),
650 _ => false,
651 }
652 } else if let Ok(expected_num) = value_part.parse::<f64>() {
653 let actual_num = actual_value.as_f64();
655 match (op, actual_num) {
656 ("=", Some(n)) => (n - expected_num).abs() < f64::EPSILON,
657 ("!=", Some(n)) => (n - expected_num).abs() >= f64::EPSILON,
658 (">", Some(n)) => n > expected_num,
659 ("<", Some(n)) => n < expected_num,
660 (">=", Some(n)) => n >= expected_num,
661 ("<=", Some(n)) => n <= expected_num,
662 _ => false,
663 }
664 } else if value_part == "true" || value_part == "false" {
665 let expected_bool = value_part == "true";
666 match op {
667 "=" => actual_value.as_bool() == Some(expected_bool),
668 "!=" => actual_value.as_bool() != Some(expected_bool),
669 _ => false,
670 }
671 } else {
672 false };
674
675 expected_str
676}
677
678fn resolve_json_path_simple<'a>(
680 json: &'a serde_json::Value,
681 path: &str,
682) -> Option<&'a serde_json::Value> {
683 let mut current = json;
684 for part in path.split('.') {
685 current = current.get(part)?;
686 }
687 if current.is_null() {
688 None
689 } else {
690 Some(current)
691 }
692}
693
/// Shared fixtures for this crate's tests, plus a few fail-closed checks for
/// the filter-pattern matchers.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;
    use bytes::Bytes;
    use fakecloud_core::delivery::DeliveryBus;
    use http::{HeaderMap, Method};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Builds a `LogsService` over fresh state, without a snapshot store.
    pub fn make_service() -> LogsService {
        let accounts =
            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", "");
        let state = Arc::new(parking_lot::RwLock::new(accounts));
        LogsService::new(state, Arc::new(DeliveryBus::new()))
    }

    /// Builds a POST request for `action` whose body is `body` serialized as
    /// JSON.
    pub fn make_request(
        action: &str,
        body: serde_json::Value,
    ) -> fakecloud_core::service::AwsRequest {
        let payload = serde_json::to_vec(&body).unwrap();
        fakecloud_core::service::AwsRequest {
            service: "logs".to_string(),
            action: action.to_string(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "test-request-id".to_string(),
            headers: HeaderMap::new(),
            query_params: HashMap::new(),
            body: Bytes::from(payload),
            body_stream: parking_lot::Mutex::new(None),
            path_segments: vec![],
            raw_path: "/".to_string(),
            raw_query: String::new(),
            method: Method::POST,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Creates log group `name`, panicking on failure.
    pub fn create_group(svc: &LogsService, name: &str) {
        let body = serde_json::json!({ "logGroupName": name });
        svc.create_log_group(&make_request("CreateLogGroup", body))
            .unwrap();
    }

    /// Creates log stream `stream` in `group`, panicking on failure.
    pub fn create_stream(svc: &LogsService, group: &str, stream: &str) {
        let body = serde_json::json!({ "logGroupName": group, "logStreamName": stream });
        svc.create_log_stream(&make_request("CreateLogStream", body))
            .unwrap();
    }

    /// Puts `messages` into `group`/`stream`, timestamped from the current
    /// time in milliseconds and increasing by one per message.
    pub fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
        let now = chrono::Utc::now().timestamp_millis();
        put_events_at(svc, group, stream, messages, now);
    }

    /// Puts `messages` into `group`/`stream`, timestamped from `timestamp`
    /// (milliseconds) and increasing by one per message.
    pub fn put_events_at(
        svc: &LogsService,
        group: &str,
        stream: &str,
        messages: &[&str],
        timestamp: i64,
    ) {
        let mut events: Vec<serde_json::Value> = Vec::with_capacity(messages.len());
        for (offset, msg) in messages.iter().enumerate() {
            events.push(serde_json::json!({
                "timestamp": timestamp + offset as i64,
                "message": msg,
            }));
        }

        let body = serde_json::json!({
            "logGroupName": group,
            "logStreamName": stream,
            "logEvents": events,
        });
        svc.put_log_events(&make_request("PutLogEvents", body))
            .unwrap();
    }

    /// Sets the retention policy of `group` to `days`, panicking on failure.
    pub fn put_retention(svc: &LogsService, group: &str, days: i32) {
        let body = serde_json::json!({ "logGroupName": group, "retentionInDays": days });
        svc.put_retention_policy(&make_request("PutRetentionPolicy", body))
            .unwrap();
    }

    #[test]
    fn array_filter_pattern_does_not_match() {
        let matched = matches_filter_pattern("[w1, w2, w3]", "some log message");
        assert!(
            !matched,
            "array-style filter pattern must not match (fail closed)"
        );
    }

    #[test]
    fn unrecognized_json_filter_path_does_not_match() {
        let message = serde_json::json!({"level": "ERROR"});
        let matched = matches_single_json_condition("level = \"ERROR\"", &message);
        assert!(
            !matched,
            "filter condition without $. prefix must not match (fail closed)"
        );
    }

    #[test]
    fn unknown_value_format_does_not_match() {
        let message = serde_json::json!({"level": "ERROR"});
        let matched = matches_single_json_condition("$.level = ERROR", &message);
        assert!(
            !matched,
            "unquoted non-numeric non-boolean value must not match (fail closed)"
        );
    }
}