Skip to main content

fakecloud_logs/service/
mod.rs

1use async_trait::async_trait;
2use http::StatusCode;
3use serde_json::{json, Value};
4
5use std::sync::Arc;
6
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{LogsSnapshot, SharedLogsState, LOGS_SNAPSHOT_SCHEMA_VERSION};
14
15mod anomaly;
16mod deliveries;
17mod destinations;
18mod exports;
19mod filters;
20mod groups;
21mod misc;
22mod policies;
23mod queries;
24mod streams;
25mod tags;
26
/// Reports whether `action` is a CloudWatch Logs action that does NOT mutate
/// state. Any action absent from the table below is treated as mutating and
/// triggers a snapshot save on HTTP 2xx.
fn is_read_only_action(action: &str) -> bool {
    // Exact, case-sensitive action names.
    const READ_ONLY_ACTIONS: &[&str] = &[
        "DescribeLogGroups",
        "DescribeLogStreams",
        "GetLogEvents",
        "FilterLogEvents",
        "ListTagsLogGroup",
        "ListTagsForResource",
        "DescribeSubscriptionFilters",
        "DescribeMetricFilters",
        "DescribeResourcePolicies",
        "DescribeDestinations",
        "GetQueryResults",
        "DescribeQueries",
        "DescribeExportTasks",
        "GetDeliveryDestination",
        "DescribeDeliveryDestinations",
        "GetDeliveryDestinationPolicy",
        "GetDeliverySource",
        "DescribeDeliverySources",
        "GetDelivery",
        "DescribeDeliveries",
        "DescribeQueryDefinitions",
        "DescribeAccountPolicies",
        "GetDataProtectionPolicy",
        "DescribeIndexPolicies",
        "DescribeFieldIndexes",
        "GetTransformer",
        "TestTransformer",
        "GetLogAnomalyDetector",
        "ListLogAnomalyDetectors",
        "GetLogGroupFields",
        "TestMetricFilter",
        "GetLogRecord",
        "ListAnomalies",
        "DescribeImportTasks",
        "DescribeImportTaskBatches",
        "GetIntegration",
        "ListIntegrations",
        "GetLookupTable",
        "DescribeLookupTables",
        "GetScheduledQuery",
        "GetScheduledQueryHistory",
        "ListScheduledQueries",
        "StartLiveTail",
        "ListLogGroups",
        "ListLogGroupsForQuery",
        "ListAggregateLogGroupSummaries",
        "GetLogObject",
        "GetLogFields",
        "ListSourcesForS3TableIntegration",
        "DescribeConfigurationTemplates",
        "GetExportedData",
    ];

    READ_ONLY_ACTIONS.contains(&action)
}
85
/// CloudWatch Logs service handler. Dispatches each request to the matching
/// action implementation and, when a snapshot store is attached, persists
/// state after successful mutating calls.
pub struct LogsService {
    /// Shared Logs state; cloned under its read lock whenever a snapshot is
    /// taken.
    state: SharedLogsState,
    /// Delivery bus handle supplied at construction. Not used by the code in
    /// this file — presumably consumed by the action submodules (TODO confirm).
    delivery_bus: Arc<DeliveryBus>,
    /// Snapshot backend. `None` means memory mode: snapshot saves are no-ops.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async lock acquired for the whole of each snapshot save; shared with
    /// the hook returned by `snapshot_hook`.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
92
impl LogsService {
    /// Create a service in memory mode: no snapshot store is attached, so
    /// snapshot saves are no-ops until `with_snapshot_store` is used.
    pub fn new(state: SharedLogsState, delivery_bus: Arc<DeliveryBus>) -> Self {
        Self {
            state,
            delivery_bus,
            snapshot_store: None,
            snapshot_lock: Arc::new(AsyncMutex::new(())),
        }
    }

    /// Builder-style setter: attach the store that snapshots are written to
    /// and return the updated service.
    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
        self.snapshot_store = Some(store);
        self
    }

    /// Persist current state as a snapshot. The snapshot lock is held across
    /// the clone-serialize-write sequence to prevent stale-last writes,
    /// with serde + file I/O offloaded to the blocking pool.
    async fn save_snapshot(&self) {
        save_logs_snapshot(
            &self.state,
            self.snapshot_store.clone(),
            &self.snapshot_lock,
        )
        .await;
    }

    /// Build a hook that persists the current Logs state when invoked, or `None`
    /// in memory mode (no snapshot store). The CloudFormation provisioner
    /// mutates `state` directly and uses this to write a CFN-provisioned
    /// resource through to disk, the same way a direct mutating API call would.
    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
        // `?` short-circuits to `None` when no store is configured.
        let store = self.snapshot_store.clone()?;
        let state = self.state.clone();
        let lock = self.snapshot_lock.clone();
        Some(Arc::new(move || {
            // Re-clone the captured handles on every call so each returned
            // future owns its own copies and the hook stays callable.
            let state = state.clone();
            let store = store.clone();
            let lock = lock.clone();
            Box::pin(async move {
                save_logs_snapshot(&state, Some(store), &lock).await;
            })
        }))
    }
}
138
139/// Persist the current Logs state as a snapshot. Offloads the serde + blocking
140/// file write to the Tokio blocking pool. Noop when `store` is `None` (memory
141/// mode). Shared by `LogsService::save_snapshot` and the CloudFormation
142/// provisioner's post-provision persist hook so both route through the same
143/// serialize-and-write path.
144pub async fn save_logs_snapshot(
145    state: &SharedLogsState,
146    store: Option<Arc<dyn SnapshotStore>>,
147    lock: &AsyncMutex<()>,
148) {
149    let Some(store) = store else {
150        return;
151    };
152    let _guard = lock.lock().await;
153    let snapshot = LogsSnapshot {
154        schema_version: LOGS_SNAPSHOT_SCHEMA_VERSION,
155        accounts: Some(state.read().clone()),
156        state: None,
157    };
158    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
159        let bytes = serde_json::to_vec(&snapshot)
160            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
161        store.save(&bytes)
162    })
163    .await;
164    match join {
165        Ok(Ok(())) => {}
166        Ok(Err(err)) => tracing::error!(%err, "failed to write logs snapshot"),
167        Err(err) => tracing::error!(%err, "logs snapshot task panicked"),
168    }
169}
170
#[async_trait]
impl AwsService for LogsService {
    /// Service identifier; also used in the "not implemented" error below.
    fn service_name(&self) -> &str {
        "logs"
    }

    /// Dispatch `req` to the handler named by `req.action`.
    ///
    /// Unknown actions yield `AwsServiceError::action_not_implemented`. After
    /// dispatch, a snapshot is saved only when the action is not read-only
    /// AND the handler returned `Ok` with a success (2xx) status.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        // Classify up front; anything not explicitly read-only counts as mutating.
        let mutates = !is_read_only_action(req.action.as_str());
        let result = match req.action.as_str() {
            "CreateLogGroup" => self.create_log_group(&req),
            "DeleteLogGroup" => self.delete_log_group(&req),
            "DescribeLogGroups" => self.describe_log_groups(&req),
            "CreateLogStream" => self.create_log_stream(&req),
            "DeleteLogStream" => self.delete_log_stream(&req),
            "DescribeLogStreams" => self.describe_log_streams(&req),
            "PutLogEvents" => self.put_log_events(&req),
            "GetLogEvents" => self.get_log_events(&req),
            "FilterLogEvents" => self.filter_log_events(&req),
            "TagLogGroup" => self.tag_log_group(&req),
            "UntagLogGroup" => self.untag_log_group(&req),
            "ListTagsLogGroup" => self.list_tags_log_group(&req),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsForResource" => self.list_tags_for_resource(&req),
            "PutRetentionPolicy" => self.put_retention_policy(&req),
            "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
            "PutSubscriptionFilter" => self.put_subscription_filter(&req),
            "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
            "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
            "PutMetricFilter" => self.put_metric_filter(&req),
            "DescribeMetricFilters" => self.describe_metric_filters(&req),
            "DeleteMetricFilter" => self.delete_metric_filter(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "DescribeResourcePolicies" => self.describe_resource_policies(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            "PutDestination" => self.put_destination(&req),
            "DescribeDestinations" => self.describe_destinations(&req),
            "DeleteDestination" => self.delete_destination(&req),
            "PutDestinationPolicy" => self.put_destination_policy(&req),
            "StartQuery" => self.start_query(&req),
            "GetQueryResults" => self.get_query_results(&req),
            "DescribeQueries" => self.describe_queries(&req),
            "CreateExportTask" => self.create_export_task(&req),
            "DescribeExportTasks" => self.describe_export_tasks(&req),
            "CancelExportTask" => self.cancel_export_task(&req),
            "PutDeliveryDestination" => self.put_delivery_destination(&req),
            "GetDeliveryDestination" => self.get_delivery_destination(&req),
            "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
            "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
            "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
            "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
            "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
            "PutDeliverySource" => self.put_delivery_source(&req),
            "GetDeliverySource" => self.get_delivery_source(&req),
            "DescribeDeliverySources" => self.describe_delivery_sources(&req),
            "DeleteDeliverySource" => self.delete_delivery_source(&req),
            "CreateDelivery" => self.create_delivery(&req),
            "GetDelivery" => self.get_delivery(&req),
            "DescribeDeliveries" => self.describe_deliveries(&req),
            "DeleteDelivery" => self.delete_delivery(&req),
            "AssociateKmsKey" => self.associate_kms_key(&req),
            "DisassociateKmsKey" => self.disassociate_kms_key(&req),
            "PutQueryDefinition" => self.put_query_definition(&req),
            "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
            "DeleteQueryDefinition" => self.delete_query_definition(&req),
            "PutAccountPolicy" => self.put_account_policy(&req),
            "DescribeAccountPolicies" => self.describe_account_policies(&req),
            "DeleteAccountPolicy" => self.delete_account_policy(&req),
            "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
            "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
            "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
            "PutIndexPolicy" => self.put_index_policy(&req),
            "DescribeIndexPolicies" => self.describe_index_policies(&req),
            "DeleteIndexPolicy" => self.delete_index_policy(&req),
            "DescribeFieldIndexes" => self.describe_field_indexes(&req),
            "PutTransformer" => self.put_transformer(&req),
            "GetTransformer" => self.get_transformer(&req),
            "DeleteTransformer" => self.delete_transformer(&req),
            "TestTransformer" => self.test_transformer(&req),
            "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
            "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
            "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
            "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
            "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
            "GetLogGroupFields" => self.get_log_group_fields(&req),
            "TestMetricFilter" => self.test_metric_filter(&req),
            "StopQuery" => self.stop_query(&req),
            "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
            "GetLogRecord" => self.get_log_record(&req),
            "ListAnomalies" => self.list_anomalies(&req),
            "UpdateAnomaly" => self.update_anomaly(&req),
            "CreateImportTask" => self.create_import_task(&req),
            "DescribeImportTasks" => self.describe_import_tasks(&req),
            "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
            "CancelImportTask" => self.cancel_import_task(&req),
            "PutIntegration" => self.put_integration(&req),
            "GetIntegration" => self.get_integration(&req),
            "DeleteIntegration" => self.delete_integration(&req),
            "ListIntegrations" => self.list_integrations(&req),
            "CreateLookupTable" => self.create_lookup_table(&req),
            "GetLookupTable" => self.get_lookup_table(&req),
            "DescribeLookupTables" => self.describe_lookup_tables(&req),
            "DeleteLookupTable" => self.delete_lookup_table(&req),
            "UpdateLookupTable" => self.update_lookup_table(&req),
            "CreateScheduledQuery" => self.create_scheduled_query(&req),
            "GetScheduledQuery" => self.get_scheduled_query(&req),
            "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
            "ListScheduledQueries" => self.list_scheduled_queries(&req),
            "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
            "UpdateScheduledQuery" => self.update_scheduled_query(&req),
            "StartLiveTail" => self.start_live_tail(&req),
            "ListLogGroups" => self.list_log_groups(&req),
            "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
            "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
            "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
            "GetLogObject" => self.get_log_object(&req),
            "GetLogFields" => self.get_log_fields(&req),
            "AssociateSourceToS3TableIntegration" => {
                self.associate_source_to_s3_table_integration(&req)
            }
            "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
            "DisassociateSourceFromS3TableIntegration" => {
                self.disassociate_source_from_s3_table_integration(&req)
            }
            "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
            "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
            // Internal action for testing export storage
            "GetExportedData" => self.get_exported_data(&req),
            _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
        };
        // Persist only after a successful mutating call; errors and non-2xx
        // responses leave the stored snapshot untouched.
        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
            self.save_snapshot().await;
        }
        result
    }

    /// Action names this service advertises (see `SUPPORTED_ACTIONS`).
    fn supported_actions(&self) -> &[&str] {
        SUPPORTED_ACTIONS
    }
}
311
/// Action names returned by `LogsService::supported_actions`.
///
/// Every entry has a matching arm in `handle`. The internal `GetExportedData`
/// action is dispatched by `handle` but is not listed here.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateLogGroup",
    "DeleteLogGroup",
    "DescribeLogGroups",
    "CreateLogStream",
    "DeleteLogStream",
    "DescribeLogStreams",
    "PutLogEvents",
    "GetLogEvents",
    "FilterLogEvents",
    "TagLogGroup",
    "UntagLogGroup",
    "ListTagsLogGroup",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "PutRetentionPolicy",
    "DeleteRetentionPolicy",
    "PutSubscriptionFilter",
    "DescribeSubscriptionFilters",
    "DeleteSubscriptionFilter",
    "PutMetricFilter",
    "DescribeMetricFilters",
    "DeleteMetricFilter",
    "PutResourcePolicy",
    "DescribeResourcePolicies",
    "DeleteResourcePolicy",
    "PutDestination",
    "DescribeDestinations",
    "DeleteDestination",
    "PutDestinationPolicy",
    "StartQuery",
    "GetQueryResults",
    "DescribeQueries",
    "CreateExportTask",
    "DescribeExportTasks",
    "CancelExportTask",
    "PutDeliveryDestination",
    "GetDeliveryDestination",
    "DescribeDeliveryDestinations",
    "DeleteDeliveryDestination",
    "PutDeliveryDestinationPolicy",
    "GetDeliveryDestinationPolicy",
    "DeleteDeliveryDestinationPolicy",
    "PutDeliverySource",
    "GetDeliverySource",
    "DescribeDeliverySources",
    "DeleteDeliverySource",
    "CreateDelivery",
    "GetDelivery",
    "DescribeDeliveries",
    "DeleteDelivery",
    "AssociateKmsKey",
    "DisassociateKmsKey",
    "PutQueryDefinition",
    "DescribeQueryDefinitions",
    "DeleteQueryDefinition",
    "PutAccountPolicy",
    "DescribeAccountPolicies",
    "DeleteAccountPolicy",
    "PutDataProtectionPolicy",
    "GetDataProtectionPolicy",
    "DeleteDataProtectionPolicy",
    "PutIndexPolicy",
    "DescribeIndexPolicies",
    "DeleteIndexPolicy",
    "DescribeFieldIndexes",
    "PutTransformer",
    "GetTransformer",
    "DeleteTransformer",
    "TestTransformer",
    "CreateLogAnomalyDetector",
    "GetLogAnomalyDetector",
    "DeleteLogAnomalyDetector",
    "ListLogAnomalyDetectors",
    "UpdateLogAnomalyDetector",
    "GetLogGroupFields",
    "TestMetricFilter",
    "StopQuery",
    "PutLogGroupDeletionProtection",
    "GetLogRecord",
    "ListAnomalies",
    "UpdateAnomaly",
    "CreateImportTask",
    "DescribeImportTasks",
    "DescribeImportTaskBatches",
    "CancelImportTask",
    "PutIntegration",
    "GetIntegration",
    "DeleteIntegration",
    "ListIntegrations",
    "CreateLookupTable",
    "GetLookupTable",
    "DescribeLookupTables",
    "DeleteLookupTable",
    "UpdateLookupTable",
    "CreateScheduledQuery",
    "GetScheduledQuery",
    "GetScheduledQueryHistory",
    "ListScheduledQueries",
    "DeleteScheduledQuery",
    "UpdateScheduledQuery",
    "StartLiveTail",
    "ListLogGroups",
    "ListLogGroupsForQuery",
    "ListAggregateLogGroupSummaries",
    "PutBearerTokenAuthentication",
    "GetLogObject",
    "GetLogFields",
    "AssociateSourceToS3TableIntegration",
    "ListSourcesForS3TableIntegration",
    "DisassociateSourceFromS3TableIntegration",
    "UpdateDeliveryConfiguration",
    "DescribeConfigurationTemplates",
];
427
428fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
429    body[field].as_str().ok_or_else(|| {
430        AwsServiceError::aws_error(
431            StatusCode::BAD_REQUEST,
432            "InvalidParameterException",
433            format!("{field} is required"),
434        )
435    })
436}
437
438/// Build a delivery destination configuration JSON object, ensuring
439/// `destinationResourceArn` is always present as a string (Smithy requirement).
440fn dd_config_json(config: &std::collections::BTreeMap<String, String>) -> Value {
441    let mut m: serde_json::Map<String, Value> =
442        config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
443    m.entry("destinationResourceArn".to_string())
444        .or_insert_with(|| json!(""));
445    Value::Object(m)
446}
447
/// Produce a sequence token: the current time in nanoseconds since the Unix
/// epoch, rendered as a zero-padded 38-digit decimal string. A clock reading
/// before the epoch is treated as zero.
fn generate_sequence_token() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    // 10^38 still fits in a u128 (max ~3.4e38), so the modulus cannot overflow.
    const TOKEN_DIGITS: u32 = 38;

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let bounded = since_epoch % 10u128.pow(TOKEN_DIGITS);
    format!("{bounded:038}")
}
457
458fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
459    AwsServiceError::aws_error(
460        StatusCode::BAD_REQUEST,
461        "InvalidParameterException",
462        format!(
463            "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
464        ),
465    )
466}
467
468/// Resolve log group name from either logGroupName or resourceIdentifier.
469/// resourceIdentifier can be a log group name or an ARN.
470fn resolve_log_group_name(
471    log_group_name: Option<&str>,
472    resource_identifier: Option<&str>,
473) -> Result<String, AwsServiceError> {
474    if let Some(identifier) = resource_identifier {
475        if identifier.starts_with("arn:") {
476            extract_log_group_from_arn(identifier).ok_or_else(|| {
477                AwsServiceError::aws_error(
478                    StatusCode::BAD_REQUEST,
479                    "InvalidParameterException",
480                    format!("Invalid ARN: {identifier}"),
481                )
482            })
483        } else {
484            Ok(identifier.to_string())
485        }
486    } else if let Some(name) = log_group_name {
487        Ok(name.to_string())
488    } else {
489        Err(AwsServiceError::aws_error(
490            StatusCode::BAD_REQUEST,
491            "InvalidParameterException",
492            "Either logGroupName or resourceIdentifier is required",
493        ))
494    }
495}
496
/// Extract the log group name from a CloudWatch Logs ARN.
///
/// Accepted shapes (the sixth colon-separated field must be `log-group`):
/// - `arn:aws:logs:region:account:log-group:NAME`
/// - `arn:aws:logs:region:account:log-group:NAME:*`
/// - `arn:aws:logs:region:account:log-group:NAME:log-stream:STREAM`
///
/// Returns `None` when the ARN has fewer than seven fields, is not a
/// `log-group` resource, or carries an empty group name.
pub(crate) fn extract_log_group_from_arn(arn: &str) -> Option<String> {
    // Split into at most 7 fields so the resource part stays in one piece.
    let mut fields = arn.splitn(7, ':');
    if fields.nth(5)? != "log-group" {
        return None;
    }
    let resource = fields.next()?;

    // A log-stream ARN embeds the group name before `:log-stream:`; otherwise
    // drop the optional trailing `:*` wildcard.
    let name = match resource.find(":log-stream:") {
        Some(cut) => &resource[..cut],
        None => resource.strip_suffix(":*").unwrap_or(resource),
    };

    // An empty name can never identify a group; treat the ARN as invalid.
    (!name.is_empty()).then(|| name.to_string())
}
508
509/// CloudWatch Logs filter pattern matching.
510///
511/// Rules:
512/// - Empty pattern or patterns starting with `{` (JSON patterns) match everything
513/// - Quoted string `"foo bar"` matches the exact substring
514/// - Multiple unquoted words `foo bar` means ALL words must appear anywhere in the message
515/// - Single unquoted word `foo` is a simple substring match
516fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
517    let pattern = pattern.trim();
518
519    // Empty pattern matches everything
520    if pattern.is_empty() {
521        return true;
522    }
523
524    // JSON/metric filter patterns: { $.field = "value" }
525    if pattern.starts_with('{') && pattern.ends_with('}') {
526        return matches_json_filter_pattern(pattern, message);
527    }
528
529    // Array-style metric filter patterns - not implemented, fail closed
530    if pattern.starts_with('[') {
531        return false;
532    }
533
534    // Quoted pattern: exact substring match (handles escaped inner quotes)
535    if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
536        let inner = &pattern[1..pattern.len() - 1];
537        // Unescape inner quotes: \"  ->  "
538        let unescaped = inner.replace("\\\"", "\"");
539        return message.contains(&unescaped);
540    }
541
542    // Multiple words: all must be present (AND semantics)
543    let terms = parse_filter_terms(pattern);
544    terms.iter().all(|term| message.contains(term.as_str()))
545}
546
/// Split a filter pattern into its terms.
///
/// Terms are separated by whitespace. A term that begins with `"` runs to the
/// next unescaped `"` (or the end of input); inside it a backslash makes the
/// following character literal. Quoted terms are kept even when empty.
fn parse_filter_terms(pattern: &str) -> Vec<String> {
    let mut out = Vec::new();
    let mut rest = pattern.chars().peekable();

    loop {
        // Drop separators in front of the next term.
        while rest.next_if(|c| c.is_whitespace()).is_some() {}

        let Some(&first) = rest.peek() else {
            break;
        };

        if first == '"' {
            rest.next(); // opening quote
            let mut quoted = String::new();
            while let Some(ch) = rest.next() {
                match ch {
                    '"' => break,
                    // Keep whatever follows the backslash; a trailing
                    // backslash contributes nothing.
                    '\\' => quoted.extend(rest.next()),
                    other => quoted.push(other),
                }
            }
            out.push(quoted);
        } else {
            // Bare word: everything up to the next whitespace. It cannot be
            // empty because `first` is a non-whitespace character.
            let mut word = String::new();
            while let Some(ch) = rest.next_if(|c| !c.is_whitespace()) {
                word.push(ch);
            }
            out.push(word);
        }
    }

    out
}
593
594/// Match a JSON filter pattern like `{ $.level = "ERROR" }` against a message.
595fn matches_json_filter_pattern(pattern: &str, message: &str) -> bool {
596    // Strip the outer braces
597    let inner = pattern
598        .strip_prefix('{')
599        .and_then(|s| s.strip_suffix('}'))
600        .unwrap_or("")
601        .trim();
602
603    if inner.is_empty() {
604        return true;
605    }
606
607    // Parse the message as JSON
608    let msg_json: serde_json::Value = match serde_json::from_str(message) {
609        Ok(v) => v,
610        Err(_) => return false, // Non-JSON message cannot match JSON filter
611    };
612
613    // Support: $.field = "value", $.field != "value", $.field = number,
614    //          $.field > number, $.field < number, $.field >= number, $.field <= number
615    // Also support && for multiple conditions
616    let conditions: Vec<&str> = inner.split("&&").collect();
617
618    for condition in conditions {
619        let condition = condition.trim();
620        if !matches_single_json_condition(condition, &msg_json) {
621            return false;
622        }
623    }
624
625    true
626}
627
628fn matches_single_json_condition(condition: &str, json: &serde_json::Value) -> bool {
629    // Try to parse: $.field op value
630    let condition = condition.trim();
631
632    // Find the operator
633    let ops = ["!=", ">=", "<=", "=", ">", "<"];
634    let mut found_op = None;
635    let mut op_pos = 0;
636    let mut op_len = 0;
637
638    for op in &ops {
639        if let Some(pos) = condition.find(op) {
640            // Make sure we're not inside a quoted string
641            let before = &condition[..pos];
642            let quote_count = before.chars().filter(|&c| c == '"').count();
643            if quote_count % 2 == 0 {
644                found_op = Some(*op);
645                op_pos = pos;
646                op_len = op.len();
647                break;
648            }
649        }
650    }
651
652    let (op, field_part, value_part) = match found_op {
653        Some(op) => (
654            op,
655            condition[..op_pos].trim(),
656            condition[op_pos + op_len..].trim(),
657        ),
658        None => {
659            // No operator: just check if the field exists
660            // Pattern like `{ $.field }` means field exists
661            if let Some(path) = condition.strip_prefix("$.") {
662                return resolve_json_path_simple(json, path).is_some();
663            }
664            return true;
665        }
666    };
667
668    // Extract JSON path from field_part (must start with $.)
669    let path = match field_part.strip_prefix("$.") {
670        Some(p) => p,
671        None => return false, // Don't understand this pattern, fail closed
672    };
673
674    let actual_value = match resolve_json_path_simple(json, path) {
675        Some(v) => v,
676        None => return op == "!=", // field doesn't exist: only != matches
677    };
678
679    // Parse the expected value
680    let expected_str = if value_part.starts_with('"') && value_part.ends_with('"') {
681        // String comparison
682        let s = &value_part[1..value_part.len() - 1];
683        match op {
684            "=" => actual_value.as_str() == Some(s),
685            "!=" => actual_value.as_str() != Some(s),
686            _ => false,
687        }
688    } else if let Ok(expected_num) = value_part.parse::<f64>() {
689        // Numeric comparison
690        let actual_num = actual_value.as_f64();
691        match (op, actual_num) {
692            ("=", Some(n)) => (n - expected_num).abs() < f64::EPSILON,
693            ("!=", Some(n)) => (n - expected_num).abs() >= f64::EPSILON,
694            (">", Some(n)) => n > expected_num,
695            ("<", Some(n)) => n < expected_num,
696            (">=", Some(n)) => n >= expected_num,
697            ("<=", Some(n)) => n <= expected_num,
698            _ => false,
699        }
700    } else if value_part == "true" || value_part == "false" {
701        let expected_bool = value_part == "true";
702        match op {
703            "=" => actual_value.as_bool() == Some(expected_bool),
704            "!=" => actual_value.as_bool() != Some(expected_bool),
705            _ => false,
706        }
707    } else {
708        false // Unknown value format, fail closed
709    };
710
711    expected_str
712}
713
714/// Resolve a simple dot-separated JSON path (e.g., "level" or "nested.field").
715fn resolve_json_path_simple<'a>(
716    json: &'a serde_json::Value,
717    path: &str,
718) -> Option<&'a serde_json::Value> {
719    let mut current = json;
720    for part in path.split('.') {
721        current = current.get(part)?;
722    }
723    if current.is_null() {
724        None
725    } else {
726        Some(current)
727    }
728}
729
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;
    use bytes::Bytes;
    use fakecloud_core::delivery::DeliveryBus;
    use http::{HeaderMap, Method};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Fresh in-memory `LogsService` (no snapshot store) for account
    /// `123456789012` in `us-east-1`.
    pub fn make_service() -> LogsService {
        let accounts =
            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", "");
        let state = Arc::new(parking_lot::RwLock::new(accounts));
        LogsService::new(state, Arc::new(DeliveryBus::new()))
    }

    /// Build a POST request for `action` whose body is `body` serialized as
    /// JSON, addressed to the same account/region as `make_service`.
    pub fn make_request(
        action: &str,
        body: serde_json::Value,
    ) -> fakecloud_core::service::AwsRequest {
        let payload = serde_json::to_vec(&body).unwrap();
        fakecloud_core::service::AwsRequest {
            service: "logs".to_string(),
            action: action.to_string(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "test-request-id".to_string(),
            headers: HeaderMap::new(),
            query_params: HashMap::new(),
            body: Bytes::from(payload),
            body_stream: parking_lot::Mutex::new(None),
            path_segments: vec![],
            raw_path: "/".to_string(),
            raw_query: String::new(),
            method: Method::POST,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Create log group `name`; panics if the call fails.
    pub fn create_group(svc: &LogsService, name: &str) {
        let body = serde_json::json!({ "logGroupName": name });
        svc.create_log_group(&make_request("CreateLogGroup", body))
            .unwrap();
    }

    /// Create log stream `stream` inside `group`; panics if the call fails.
    pub fn create_stream(svc: &LogsService, group: &str, stream: &str) {
        let body = serde_json::json!({ "logGroupName": group, "logStreamName": stream });
        svc.create_log_stream(&make_request("CreateLogStream", body))
            .unwrap();
    }

    /// Put `messages` into `group`/`stream`, timestamped from the current
    /// time in milliseconds and spaced one millisecond apart.
    pub fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
        let now = chrono::Utc::now().timestamp_millis();
        put_events_at(svc, group, stream, messages, now);
    }

    /// Put `messages` into `group`/`stream`; message `i` gets the timestamp
    /// `timestamp + i` (milliseconds).
    pub fn put_events_at(
        svc: &LogsService,
        group: &str,
        stream: &str,
        messages: &[&str],
        timestamp: i64,
    ) {
        let mut events: Vec<serde_json::Value> = Vec::with_capacity(messages.len());
        for (i, msg) in messages.iter().enumerate() {
            events.push(serde_json::json!({ "timestamp": timestamp + i as i64, "message": msg }));
        }
        let body = serde_json::json!({
            "logGroupName": group,
            "logStreamName": stream,
            "logEvents": events,
        });
        svc.put_log_events(&make_request("PutLogEvents", body))
            .unwrap();
    }

    /// Set the retention of `group` to `days`; panics if the call fails.
    pub fn put_retention(svc: &LogsService, group: &str, days: i32) {
        let body = serde_json::json!({ "logGroupName": group, "retentionInDays": days });
        svc.put_retention_policy(&make_request("PutRetentionPolicy", body))
            .unwrap();
    }

    #[test]
    fn array_filter_pattern_does_not_match() {
        let matched = matches_filter_pattern("[w1, w2, w3]", "some log message");
        assert!(
            !matched,
            "array-style filter pattern must not match (fail closed)"
        );
    }

    #[test]
    fn unrecognized_json_filter_path_does_not_match() {
        // The field part lacks the `$.` prefix, so the condition must be
        // rejected rather than treated as matching everything.
        let message = serde_json::json!({"level": "ERROR"});
        let matched = matches_single_json_condition("level = \"ERROR\"", &message);
        assert!(
            !matched,
            "filter condition without $. prefix must not match (fail closed)"
        );
    }

    #[test]
    fn unknown_value_format_does_not_match() {
        // The value is neither quoted, numeric, nor a boolean literal.
        let message = serde_json::json!({"level": "ERROR"});
        let matched = matches_single_json_condition("$.level = ERROR", &message);
        assert!(
            !matched,
            "unquoted non-numeric non-boolean value must not match (fail closed)"
        );
    }

    /// Memory mode (no snapshot store) exposes no persist hook for the CFN
    /// provisioner.
    #[test]
    fn snapshot_hook_is_none_without_store() {
        assert!(make_service().snapshot_hook().is_none());
    }

    /// Once a store is attached the hook exists, and calling it drives the
    /// whole-state persist path the CloudFormation provisioner relies on
    /// after mutating logs state directly.
    #[tokio::test]
    async fn snapshot_hook_fires_with_store() {
        let store: Arc<dyn fakecloud_persistence::SnapshotStore> =
            Arc::new(fakecloud_persistence::MemorySnapshotStore::new());
        let svc = make_service().with_snapshot_store(store);
        let hook = svc
            .snapshot_hook()
            .expect("hook present when a store is set");
        // Completing without a panic is the assertion: this runs the closure
        // and the snapshot save path end to end.
        hook().await;
    }
}