1use async_trait::async_trait;
2use http::StatusCode;
3use serde_json::{json, Value};
4
5use std::sync::Arc;
6
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{LogsSnapshot, SharedLogsState, LOGS_SNAPSHOT_SCHEMA_VERSION};
14
15mod anomaly;
16mod deliveries;
17mod destinations;
18mod exports;
19mod filters;
20mod groups;
21mod misc;
22mod policies;
23mod queries;
24mod streams;
25mod tags;
26
/// Reports whether `action` is on the allow-list of operations that only read
/// state.
///
/// The comparison is an exact, case-sensitive string match, so any name that
/// is not listed (including unknown actions) is reported as not read-only.
fn is_read_only_action(action: &str) -> bool {
    const READ_ONLY_ACTIONS: &[&str] = &[
        "DescribeLogGroups",
        "DescribeLogStreams",
        "GetLogEvents",
        "FilterLogEvents",
        "ListTagsLogGroup",
        "ListTagsForResource",
        "DescribeSubscriptionFilters",
        "DescribeMetricFilters",
        "DescribeResourcePolicies",
        "DescribeDestinations",
        "GetQueryResults",
        "DescribeQueries",
        "DescribeExportTasks",
        "GetDeliveryDestination",
        "DescribeDeliveryDestinations",
        "GetDeliveryDestinationPolicy",
        "GetDeliverySource",
        "DescribeDeliverySources",
        "GetDelivery",
        "DescribeDeliveries",
        "DescribeQueryDefinitions",
        "DescribeAccountPolicies",
        "GetDataProtectionPolicy",
        "DescribeIndexPolicies",
        "DescribeFieldIndexes",
        "GetTransformer",
        "TestTransformer",
        "GetLogAnomalyDetector",
        "ListLogAnomalyDetectors",
        "GetLogGroupFields",
        "TestMetricFilter",
        "GetLogRecord",
        "ListAnomalies",
        "DescribeImportTasks",
        "DescribeImportTaskBatches",
        "GetIntegration",
        "ListIntegrations",
        "GetLookupTable",
        "DescribeLookupTables",
        "GetScheduledQuery",
        "GetScheduledQueryHistory",
        "ListScheduledQueries",
        "StartLiveTail",
        "ListLogGroups",
        "ListLogGroupsForQuery",
        "ListAggregateLogGroupSummaries",
        "GetLogObject",
        "GetLogFields",
        "ListSourcesForS3TableIntegration",
        "DescribeConfigurationTemplates",
        "GetExportedData",
    ];

    READ_ONLY_ACTIONS.contains(&action)
}
85
/// Request handler for the `logs` service, with optional snapshot
/// persistence of its state.
pub struct LogsService {
    /// Shared service state; a clone of its contents is written on each snapshot.
    state: SharedLogsState,
    /// Delivery bus supplied at construction.
    // NOTE(review): not referenced in this file — presumably used by the
    // handler submodules; confirm.
    delivery_bus: Arc<DeliveryBus>,
    /// Destination for snapshots; `None` means snapshots are skipped.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held while a snapshot is captured and written.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
92
93impl LogsService {
94 pub fn new(state: SharedLogsState, delivery_bus: Arc<DeliveryBus>) -> Self {
95 Self {
96 state,
97 delivery_bus,
98 snapshot_store: None,
99 snapshot_lock: Arc::new(AsyncMutex::new(())),
100 }
101 }
102
103 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
104 self.snapshot_store = Some(store);
105 self
106 }
107
108 async fn save_snapshot(&self) {
112 save_logs_snapshot(
113 &self.state,
114 self.snapshot_store.clone(),
115 &self.snapshot_lock,
116 )
117 .await;
118 }
119
120 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
125 let store = self.snapshot_store.clone()?;
126 let state = self.state.clone();
127 let lock = self.snapshot_lock.clone();
128 Some(Arc::new(move || {
129 let state = state.clone();
130 let store = store.clone();
131 let lock = lock.clone();
132 Box::pin(async move {
133 save_logs_snapshot(&state, Some(store), &lock).await;
134 })
135 }))
136 }
137}
138
139pub async fn save_logs_snapshot(
145 state: &SharedLogsState,
146 store: Option<Arc<dyn SnapshotStore>>,
147 lock: &AsyncMutex<()>,
148) {
149 let Some(store) = store else {
150 return;
151 };
152 let _guard = lock.lock().await;
153 let snapshot = LogsSnapshot {
154 schema_version: LOGS_SNAPSHOT_SCHEMA_VERSION,
155 accounts: Some(state.read().clone()),
156 state: None,
157 };
158 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
159 let bytes = serde_json::to_vec(&snapshot)
160 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
161 store.save(&bytes)
162 })
163 .await;
164 match join {
165 Ok(Ok(())) => {}
166 Ok(Err(err)) => tracing::error!(%err, "failed to write logs snapshot"),
167 Err(err) => tracing::error!(%err, "logs snapshot task panicked"),
168 }
169}
170
/// `AwsService` implementation: routes each request to the handler method
/// for its action and persists a snapshot after successful mutations.
#[async_trait]
impl AwsService for LogsService {
    /// Name this service is registered under.
    fn service_name(&self) -> &str {
        "logs"
    }

    /// Dispatches `req` to the handler method matching `req.action`.
    ///
    /// Actions with no matching arm return
    /// `AwsServiceError::action_not_implemented`. When the action is not on
    /// the read-only list and the handler returned `Ok` with a success
    /// status, a snapshot is saved before the result is returned.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        // Anything not explicitly classified as read-only is treated as mutating.
        let mutates = !is_read_only_action(req.action.as_str());
        let result = match req.action.as_str() {
            "CreateLogGroup" => self.create_log_group(&req),
            "DeleteLogGroup" => self.delete_log_group(&req),
            "DescribeLogGroups" => self.describe_log_groups(&req),
            "CreateLogStream" => self.create_log_stream(&req),
            "DeleteLogStream" => self.delete_log_stream(&req),
            "DescribeLogStreams" => self.describe_log_streams(&req),
            "PutLogEvents" => self.put_log_events(&req),
            "GetLogEvents" => self.get_log_events(&req),
            "FilterLogEvents" => self.filter_log_events(&req),
            "TagLogGroup" => self.tag_log_group(&req),
            "UntagLogGroup" => self.untag_log_group(&req),
            "ListTagsLogGroup" => self.list_tags_log_group(&req),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsForResource" => self.list_tags_for_resource(&req),
            "PutRetentionPolicy" => self.put_retention_policy(&req),
            "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
            "PutSubscriptionFilter" => self.put_subscription_filter(&req),
            "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
            "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
            "PutMetricFilter" => self.put_metric_filter(&req),
            "DescribeMetricFilters" => self.describe_metric_filters(&req),
            "DeleteMetricFilter" => self.delete_metric_filter(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "DescribeResourcePolicies" => self.describe_resource_policies(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            "PutDestination" => self.put_destination(&req),
            "DescribeDestinations" => self.describe_destinations(&req),
            "DeleteDestination" => self.delete_destination(&req),
            "PutDestinationPolicy" => self.put_destination_policy(&req),
            "StartQuery" => self.start_query(&req),
            "GetQueryResults" => self.get_query_results(&req),
            "DescribeQueries" => self.describe_queries(&req),
            "CreateExportTask" => self.create_export_task(&req),
            "DescribeExportTasks" => self.describe_export_tasks(&req),
            "CancelExportTask" => self.cancel_export_task(&req),
            "PutDeliveryDestination" => self.put_delivery_destination(&req),
            "GetDeliveryDestination" => self.get_delivery_destination(&req),
            "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
            "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
            "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
            "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
            "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
            "PutDeliverySource" => self.put_delivery_source(&req),
            "GetDeliverySource" => self.get_delivery_source(&req),
            "DescribeDeliverySources" => self.describe_delivery_sources(&req),
            "DeleteDeliverySource" => self.delete_delivery_source(&req),
            "CreateDelivery" => self.create_delivery(&req),
            "GetDelivery" => self.get_delivery(&req),
            "DescribeDeliveries" => self.describe_deliveries(&req),
            "DeleteDelivery" => self.delete_delivery(&req),
            "AssociateKmsKey" => self.associate_kms_key(&req),
            "DisassociateKmsKey" => self.disassociate_kms_key(&req),
            "PutQueryDefinition" => self.put_query_definition(&req),
            "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
            "DeleteQueryDefinition" => self.delete_query_definition(&req),
            "PutAccountPolicy" => self.put_account_policy(&req),
            "DescribeAccountPolicies" => self.describe_account_policies(&req),
            "DeleteAccountPolicy" => self.delete_account_policy(&req),
            "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
            "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
            "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
            "PutIndexPolicy" => self.put_index_policy(&req),
            "DescribeIndexPolicies" => self.describe_index_policies(&req),
            "DeleteIndexPolicy" => self.delete_index_policy(&req),
            "DescribeFieldIndexes" => self.describe_field_indexes(&req),
            "PutTransformer" => self.put_transformer(&req),
            "GetTransformer" => self.get_transformer(&req),
            "DeleteTransformer" => self.delete_transformer(&req),
            "TestTransformer" => self.test_transformer(&req),
            "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
            "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
            "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
            "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
            "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
            "GetLogGroupFields" => self.get_log_group_fields(&req),
            "TestMetricFilter" => self.test_metric_filter(&req),
            "StopQuery" => self.stop_query(&req),
            "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
            "GetLogRecord" => self.get_log_record(&req),
            "ListAnomalies" => self.list_anomalies(&req),
            "UpdateAnomaly" => self.update_anomaly(&req),
            "CreateImportTask" => self.create_import_task(&req),
            "DescribeImportTasks" => self.describe_import_tasks(&req),
            "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
            "CancelImportTask" => self.cancel_import_task(&req),
            "PutIntegration" => self.put_integration(&req),
            "GetIntegration" => self.get_integration(&req),
            "DeleteIntegration" => self.delete_integration(&req),
            "ListIntegrations" => self.list_integrations(&req),
            "CreateLookupTable" => self.create_lookup_table(&req),
            "GetLookupTable" => self.get_lookup_table(&req),
            "DescribeLookupTables" => self.describe_lookup_tables(&req),
            "DeleteLookupTable" => self.delete_lookup_table(&req),
            "UpdateLookupTable" => self.update_lookup_table(&req),
            "CreateScheduledQuery" => self.create_scheduled_query(&req),
            "GetScheduledQuery" => self.get_scheduled_query(&req),
            "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
            "ListScheduledQueries" => self.list_scheduled_queries(&req),
            "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
            "UpdateScheduledQuery" => self.update_scheduled_query(&req),
            "StartLiveTail" => self.start_live_tail(&req),
            "ListLogGroups" => self.list_log_groups(&req),
            "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
            "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
            "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
            "GetLogObject" => self.get_log_object(&req),
            "GetLogFields" => self.get_log_fields(&req),
            "AssociateSourceToS3TableIntegration" => {
                self.associate_source_to_s3_table_integration(&req)
            }
            "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
            "DisassociateSourceFromS3TableIntegration" => {
                self.disassociate_source_from_s3_table_integration(&req)
            }
            "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
            "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
            "GetExportedData" => self.get_exported_data(&req),
            // Any other action name is reported as not implemented.
            _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
        };
        // Persist only when a mutating action actually succeeded; failed or
        // read-only requests cannot have changed anything worth saving.
        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
            self.save_snapshot().await;
        }
        result
    }

    /// Action names this service advertises as supported.
    fn supported_actions(&self) -> &[&str] {
        SUPPORTED_ACTIONS
    }
}
311
/// Action names returned by `LogsService::supported_actions`.
///
/// NOTE(review): `handle` also dispatches `"GetExportedData"`, which is not
/// listed here — confirm whether that omission is intentional.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateLogGroup",
    "DeleteLogGroup",
    "DescribeLogGroups",
    "CreateLogStream",
    "DeleteLogStream",
    "DescribeLogStreams",
    "PutLogEvents",
    "GetLogEvents",
    "FilterLogEvents",
    "TagLogGroup",
    "UntagLogGroup",
    "ListTagsLogGroup",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "PutRetentionPolicy",
    "DeleteRetentionPolicy",
    "PutSubscriptionFilter",
    "DescribeSubscriptionFilters",
    "DeleteSubscriptionFilter",
    "PutMetricFilter",
    "DescribeMetricFilters",
    "DeleteMetricFilter",
    "PutResourcePolicy",
    "DescribeResourcePolicies",
    "DeleteResourcePolicy",
    "PutDestination",
    "DescribeDestinations",
    "DeleteDestination",
    "PutDestinationPolicy",
    "StartQuery",
    "GetQueryResults",
    "DescribeQueries",
    "CreateExportTask",
    "DescribeExportTasks",
    "CancelExportTask",
    "PutDeliveryDestination",
    "GetDeliveryDestination",
    "DescribeDeliveryDestinations",
    "DeleteDeliveryDestination",
    "PutDeliveryDestinationPolicy",
    "GetDeliveryDestinationPolicy",
    "DeleteDeliveryDestinationPolicy",
    "PutDeliverySource",
    "GetDeliverySource",
    "DescribeDeliverySources",
    "DeleteDeliverySource",
    "CreateDelivery",
    "GetDelivery",
    "DescribeDeliveries",
    "DeleteDelivery",
    "AssociateKmsKey",
    "DisassociateKmsKey",
    "PutQueryDefinition",
    "DescribeQueryDefinitions",
    "DeleteQueryDefinition",
    "PutAccountPolicy",
    "DescribeAccountPolicies",
    "DeleteAccountPolicy",
    "PutDataProtectionPolicy",
    "GetDataProtectionPolicy",
    "DeleteDataProtectionPolicy",
    "PutIndexPolicy",
    "DescribeIndexPolicies",
    "DeleteIndexPolicy",
    "DescribeFieldIndexes",
    "PutTransformer",
    "GetTransformer",
    "DeleteTransformer",
    "TestTransformer",
    "CreateLogAnomalyDetector",
    "GetLogAnomalyDetector",
    "DeleteLogAnomalyDetector",
    "ListLogAnomalyDetectors",
    "UpdateLogAnomalyDetector",
    "GetLogGroupFields",
    "TestMetricFilter",
    "StopQuery",
    "PutLogGroupDeletionProtection",
    "GetLogRecord",
    "ListAnomalies",
    "UpdateAnomaly",
    "CreateImportTask",
    "DescribeImportTasks",
    "DescribeImportTaskBatches",
    "CancelImportTask",
    "PutIntegration",
    "GetIntegration",
    "DeleteIntegration",
    "ListIntegrations",
    "CreateLookupTable",
    "GetLookupTable",
    "DescribeLookupTables",
    "DeleteLookupTable",
    "UpdateLookupTable",
    "CreateScheduledQuery",
    "GetScheduledQuery",
    "GetScheduledQueryHistory",
    "ListScheduledQueries",
    "DeleteScheduledQuery",
    "UpdateScheduledQuery",
    "StartLiveTail",
    "ListLogGroups",
    "ListLogGroupsForQuery",
    "ListAggregateLogGroupSummaries",
    "PutBearerTokenAuthentication",
    "GetLogObject",
    "GetLogFields",
    "AssociateSourceToS3TableIntegration",
    "ListSourcesForS3TableIntegration",
    "DisassociateSourceFromS3TableIntegration",
    "UpdateDeliveryConfiguration",
    "DescribeConfigurationTemplates",
];
427
428fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
429 body[field].as_str().ok_or_else(|| {
430 AwsServiceError::aws_error(
431 StatusCode::BAD_REQUEST,
432 "InvalidParameterException",
433 format!("{field} is required"),
434 )
435 })
436}
437
438fn dd_config_json(config: &std::collections::BTreeMap<String, String>) -> Value {
441 let mut m: serde_json::Map<String, Value> =
442 config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
443 m.entry("destinationResourceArn".to_string())
444 .or_insert_with(|| json!(""));
445 Value::Object(m)
446}
447
/// Derives a delivery destination type code from the third
/// colon-separated field of `destination_arn`.
///
/// Mapping: `s3` → `"S3"`, `firehose` → `"FH"`, `xray` → `"XRAY"`. Anything
/// else — including a missing ARN or one with fewer than three fields —
/// yields `"CWL"`. The comparison is case-sensitive.
pub fn infer_delivery_destination_type(destination_arn: Option<&String>) -> String {
    let service = destination_arn.and_then(|arn| arn.split(':').nth(2));
    let code = match service {
        Some("s3") => "S3",
        Some("firehose") => "FH",
        Some("xray") => "XRAY",
        _ => "CWL",
    };
    String::from(code)
}
462
/// Produces a 38-digit, zero-padded decimal token from the current
/// wall-clock time in nanoseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the elapsed time is
/// taken as zero.
fn generate_sequence_token() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    // Keeps the value within 38 decimal digits so the padded width is exact.
    const MODULUS: u128 = 10u128.pow(38);

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    format!("{:038}", since_epoch % MODULUS)
}
472
473fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
474 AwsServiceError::aws_error(
475 StatusCode::BAD_REQUEST,
476 "InvalidParameterException",
477 format!(
478 "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
479 ),
480 )
481}
482
483fn resolve_log_group_name(
486 log_group_name: Option<&str>,
487 resource_identifier: Option<&str>,
488) -> Result<String, AwsServiceError> {
489 if let Some(identifier) = resource_identifier {
490 if identifier.starts_with("arn:") {
491 extract_log_group_from_arn(identifier).ok_or_else(|| {
492 AwsServiceError::aws_error(
493 StatusCode::BAD_REQUEST,
494 "InvalidParameterException",
495 format!("Invalid ARN: {identifier}"),
496 )
497 })
498 } else {
499 Ok(identifier.to_string())
500 }
501 } else if let Some(name) = log_group_name {
502 Ok(name.to_string())
503 } else {
504 Err(AwsServiceError::aws_error(
505 StatusCode::BAD_REQUEST,
506 "InvalidParameterException",
507 "Either logGroupName or resourceIdentifier is required",
508 ))
509 }
510}
511
/// Extracts the log group name from an ARN whose sixth colon-separated
/// field is `log-group`.
///
/// Everything after that field is taken as the name (it may itself contain
/// colons), minus one trailing `:*` if present. Returns `None` when the ARN
/// has fewer than seven fields or the sixth field is not `log-group`.
pub(crate) fn extract_log_group_from_arn(arn: &str) -> Option<String> {
    // Limit to seven pieces so the final piece keeps any embedded colons.
    let mut fields = arn.splitn(7, ':');
    let resource_type = fields.nth(5)?;
    let resource = fields.next()?;
    if resource_type != "log-group" {
        return None;
    }
    let name = resource.strip_suffix(":*").unwrap_or(resource);
    Some(name.to_string())
}
523
524fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
532 let pattern = pattern.trim();
533
534 if pattern.is_empty() {
536 return true;
537 }
538
539 if pattern.starts_with('{') && pattern.ends_with('}') {
541 return matches_json_filter_pattern(pattern, message);
542 }
543
544 if pattern.starts_with('[') {
546 return false;
547 }
548
549 if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
551 let inner = &pattern[1..pattern.len() - 1];
552 let unescaped = inner.replace("\\\"", "\"");
554 return message.contains(&unescaped);
555 }
556
557 let terms = parse_filter_terms(pattern);
559 terms.iter().all(|term| message.contains(term.as_str()))
560}
561
/// Splits a filter pattern into its search terms.
///
/// Terms are separated by whitespace. A term that starts with `"` runs to
/// the next unescaped `"` (or to the end of input if it is never closed) and
/// may contain whitespace; inside it a backslash makes the following
/// character literal. Quoted terms are kept even when empty.
fn parse_filter_terms(pattern: &str) -> Vec<String> {
    let mut terms = Vec::new();
    let mut input = pattern.chars().peekable();

    while let Some(&head) = input.peek() {
        if head.is_whitespace() {
            input.next();
            continue;
        }

        if head == '"' {
            input.next(); // consume the opening quote
            let mut quoted = String::new();
            while let Some(ch) = input.next() {
                match ch {
                    '"' => break,
                    // Keep the escaped character; a trailing backslash adds nothing.
                    '\\' => quoted.extend(input.next()),
                    other => quoted.push(other),
                }
            }
            terms.push(quoted);
        } else {
            let mut bare = String::new();
            while let Some(ch) = input.next_if(|c| !c.is_whitespace()) {
                bare.push(ch);
            }
            if !bare.is_empty() {
                terms.push(bare);
            }
        }
    }

    terms
}
608
609fn matches_json_filter_pattern(pattern: &str, message: &str) -> bool {
611 let inner = pattern
613 .strip_prefix('{')
614 .and_then(|s| s.strip_suffix('}'))
615 .unwrap_or("")
616 .trim();
617
618 if inner.is_empty() {
619 return true;
620 }
621
622 let msg_json: serde_json::Value = match serde_json::from_str(message) {
624 Ok(v) => v,
625 Err(_) => return false, };
627
628 let conditions: Vec<&str> = inner.split("&&").collect();
632
633 for condition in conditions {
634 let condition = condition.trim();
635 if !matches_single_json_condition(condition, &msg_json) {
636 return false;
637 }
638 }
639
640 true
641}
642
643fn matches_single_json_condition(condition: &str, json: &serde_json::Value) -> bool {
644 let condition = condition.trim();
646
647 let ops = ["!=", ">=", "<=", "=", ">", "<"];
649 let mut found_op = None;
650 let mut op_pos = 0;
651 let mut op_len = 0;
652
653 for op in &ops {
654 if let Some(pos) = condition.find(op) {
655 let before = &condition[..pos];
657 let quote_count = before.chars().filter(|&c| c == '"').count();
658 if quote_count % 2 == 0 {
659 found_op = Some(*op);
660 op_pos = pos;
661 op_len = op.len();
662 break;
663 }
664 }
665 }
666
667 let (op, field_part, value_part) = match found_op {
668 Some(op) => (
669 op,
670 condition[..op_pos].trim(),
671 condition[op_pos + op_len..].trim(),
672 ),
673 None => {
674 if let Some(path) = condition.strip_prefix("$.") {
677 return resolve_json_path_simple(json, path).is_some();
678 }
679 return true;
680 }
681 };
682
683 let path = match field_part.strip_prefix("$.") {
685 Some(p) => p,
686 None => return false, };
688
689 let actual_value = match resolve_json_path_simple(json, path) {
690 Some(v) => v,
691 None => return op == "!=", };
693
694 let expected_str = if value_part.starts_with('"') && value_part.ends_with('"') {
696 let s = &value_part[1..value_part.len() - 1];
698 match op {
699 "=" => actual_value.as_str() == Some(s),
700 "!=" => actual_value.as_str() != Some(s),
701 _ => false,
702 }
703 } else if let Ok(expected_num) = value_part.parse::<f64>() {
704 let actual_num = actual_value.as_f64();
706 match (op, actual_num) {
707 ("=", Some(n)) => (n - expected_num).abs() < f64::EPSILON,
708 ("!=", Some(n)) => (n - expected_num).abs() >= f64::EPSILON,
709 (">", Some(n)) => n > expected_num,
710 ("<", Some(n)) => n < expected_num,
711 (">=", Some(n)) => n >= expected_num,
712 ("<=", Some(n)) => n <= expected_num,
713 _ => false,
714 }
715 } else if value_part == "true" || value_part == "false" {
716 let expected_bool = value_part == "true";
717 match op {
718 "=" => actual_value.as_bool() == Some(expected_bool),
719 "!=" => actual_value.as_bool() != Some(expected_bool),
720 _ => false,
721 }
722 } else {
723 false };
725
726 expected_str
727}
728
729fn resolve_json_path_simple<'a>(
731 json: &'a serde_json::Value,
732 path: &str,
733) -> Option<&'a serde_json::Value> {
734 let mut current = json;
735 for part in path.split('.') {
736 current = current.get(part)?;
737 }
738 if current.is_null() {
739 None
740 } else {
741 Some(current)
742 }
743}
744
/// Test-only fixtures for building a `LogsService` and issuing requests
/// against it, plus unit tests for filter matching and the snapshot hook.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;
    use bytes::Bytes;
    use fakecloud_core::delivery::DeliveryBus;
    use http::{HeaderMap, Method};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Builds a `LogsService` over fresh state (account `123456789012`,
    /// region `us-east-1`) and a new delivery bus, with no snapshot store.
    pub fn make_service() -> LogsService {
        let state = Arc::new(parking_lot::RwLock::new(
            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
        ));
        let delivery_bus = Arc::new(DeliveryBus::new());
        LogsService::new(state, delivery_bus)
    }

    /// Builds a `POST /` request for the `logs` service with the given
    /// `action` and `body` serialized as the JSON payload. All other fields
    /// are fixed test values.
    pub fn make_request(
        action: &str,
        body: serde_json::Value,
    ) -> fakecloud_core::service::AwsRequest {
        fakecloud_core::service::AwsRequest {
            service: "logs".to_string(),
            action: action.to_string(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "test-request-id".to_string(),
            headers: HeaderMap::new(),
            query_params: HashMap::new(),
            body: Bytes::from(serde_json::to_vec(&body).unwrap()),
            body_stream: parking_lot::Mutex::new(None),
            path_segments: vec![],
            raw_path: "/".to_string(),
            raw_query: String::new(),
            method: Method::POST,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Creates log group `name`; panics if the handler returns an error.
    pub fn create_group(svc: &LogsService, name: &str) {
        let req = make_request(
            "CreateLogGroup",
            serde_json::json!({ "logGroupName": name }),
        );
        svc.create_log_group(&req).unwrap();
    }

    /// Creates log stream `stream` in `group`; panics if the handler
    /// returns an error.
    pub fn create_stream(svc: &LogsService, group: &str, stream: &str) {
        let req = make_request(
            "CreateLogStream",
            serde_json::json!({ "logGroupName": group, "logStreamName": stream }),
        );
        svc.create_log_stream(&req).unwrap();
    }

    /// Puts `messages` into `group`/`stream`, timestamped at the current
    /// time in milliseconds plus each message's index; panics on error.
    pub fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
        let now = chrono::Utc::now().timestamp_millis();
        let events: Vec<serde_json::Value> = messages
            .iter()
            .enumerate()
            .map(|(i, msg)| serde_json::json!({ "timestamp": now + i as i64, "message": msg }))
            .collect();
        let req = make_request(
            "PutLogEvents",
            serde_json::json!({
                "logGroupName": group,
                "logStreamName": stream,
                "logEvents": events,
            }),
        );
        svc.put_log_events(&req).unwrap();
    }

    /// Like `put_events`, but timestamps start at the caller-supplied
    /// `timestamp` (each message gets `timestamp` plus its index).
    pub fn put_events_at(
        svc: &LogsService,
        group: &str,
        stream: &str,
        messages: &[&str],
        timestamp: i64,
    ) {
        let events: Vec<serde_json::Value> = messages
            .iter()
            .enumerate()
            .map(
                |(i, msg)| serde_json::json!({ "timestamp": timestamp + i as i64, "message": msg }),
            )
            .collect();
        let req = make_request(
            "PutLogEvents",
            serde_json::json!({
                "logGroupName": group,
                "logStreamName": stream,
                "logEvents": events,
            }),
        );
        svc.put_log_events(&req).unwrap();
    }

    /// Sets the retention of `group` to `days`; panics if the handler
    /// returns an error.
    pub fn put_retention(svc: &LogsService, group: &str, days: i32) {
        let req = make_request(
            "PutRetentionPolicy",
            serde_json::json!({ "logGroupName": group, "retentionInDays": days }),
        );
        svc.put_retention_policy(&req).unwrap();
    }

    /// Array-style (`[...]`) patterns are unsupported and must never match.
    #[test]
    fn array_filter_pattern_does_not_match() {
        assert!(
            !matches_filter_pattern("[w1, w2, w3]", "some log message"),
            "array-style filter pattern must not match (fail closed)"
        );
    }

    /// A condition whose field lacks the `$.` prefix must evaluate to false.
    #[test]
    fn unrecognized_json_filter_path_does_not_match() {
        assert!(
            !matches_single_json_condition(
                "level = \"ERROR\"",
                &serde_json::json!({"level": "ERROR"}),
            ),
            "filter condition without $. prefix must not match (fail closed)"
        );
    }

    /// A right-hand side that is neither quoted, numeric, nor boolean must
    /// evaluate to false.
    #[test]
    fn unknown_value_format_does_not_match() {
        assert!(
            !matches_single_json_condition(
                "$.level = ERROR",
                &serde_json::json!({"level": "ERROR"}),
            ),
            "unquoted non-numeric non-boolean value must not match (fail closed)"
        );
    }

    /// Without a snapshot store there is no hook to hand out.
    #[test]
    fn snapshot_hook_is_none_without_store() {
        let svc = make_service();
        assert!(svc.snapshot_hook().is_none());
    }

    /// Smoke test: with a store configured the hook exists and awaiting it
    /// completes. The store's contents are not inspected.
    #[tokio::test]
    async fn snapshot_hook_fires_with_store() {
        let store: Arc<dyn fakecloud_persistence::SnapshotStore> =
            Arc::new(fakecloud_persistence::MemorySnapshotStore::new());
        let svc = make_service().with_snapshot_store(store);
        let hook = svc
            .snapshot_hook()
            .expect("hook present when a store is set");
        hook().await;
    }
}