Skip to main content

fakecloud_logs/service/
mod.rs

1use async_trait::async_trait;
2use http::StatusCode;
3use serde_json::{json, Value};
4
5use std::sync::Arc;
6
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{LogsSnapshot, SharedLogsState, LOGS_SNAPSHOT_SCHEMA_VERSION};
14
15mod anomaly;
16mod deliveries;
17mod destinations;
18mod exports;
19mod filters;
20mod groups;
21mod misc;
22mod policies;
23mod queries;
24mod streams;
25mod tags;
26
27/// CloudWatch Logs actions that do NOT mutate state. Everything else
28/// triggers a snapshot save on HTTP 2xx.
29fn is_read_only_action(action: &str) -> bool {
30    matches!(
31        action,
32        "DescribeLogGroups"
33            | "DescribeLogStreams"
34            | "GetLogEvents"
35            | "FilterLogEvents"
36            | "ListTagsLogGroup"
37            | "ListTagsForResource"
38            | "DescribeSubscriptionFilters"
39            | "DescribeMetricFilters"
40            | "DescribeResourcePolicies"
41            | "DescribeDestinations"
42            | "GetQueryResults"
43            | "DescribeQueries"
44            | "DescribeExportTasks"
45            | "GetDeliveryDestination"
46            | "DescribeDeliveryDestinations"
47            | "GetDeliveryDestinationPolicy"
48            | "GetDeliverySource"
49            | "DescribeDeliverySources"
50            | "GetDelivery"
51            | "DescribeDeliveries"
52            | "DescribeQueryDefinitions"
53            | "DescribeAccountPolicies"
54            | "GetDataProtectionPolicy"
55            | "DescribeIndexPolicies"
56            | "DescribeFieldIndexes"
57            | "GetTransformer"
58            | "TestTransformer"
59            | "GetLogAnomalyDetector"
60            | "ListLogAnomalyDetectors"
61            | "GetLogGroupFields"
62            | "TestMetricFilter"
63            | "GetLogRecord"
64            | "ListAnomalies"
65            | "DescribeImportTasks"
66            | "DescribeImportTaskBatches"
67            | "GetIntegration"
68            | "ListIntegrations"
69            | "GetLookupTable"
70            | "DescribeLookupTables"
71            | "GetScheduledQuery"
72            | "GetScheduledQueryHistory"
73            | "ListScheduledQueries"
74            | "StartLiveTail"
75            | "ListLogGroups"
76            | "ListLogGroupsForQuery"
77            | "ListAggregateLogGroupSummaries"
78            | "GetLogObject"
79            | "GetLogFields"
80            | "ListSourcesForS3TableIntegration"
81            | "DescribeConfigurationTemplates"
82            | "GetExportedData"
83    )
84}
85
86pub struct LogsService {
87    state: SharedLogsState,
88    delivery_bus: Arc<DeliveryBus>,
89    snapshot_store: Option<Arc<dyn SnapshotStore>>,
90    snapshot_lock: Arc<AsyncMutex<()>>,
91}
92
93impl LogsService {
94    pub fn new(state: SharedLogsState, delivery_bus: Arc<DeliveryBus>) -> Self {
95        Self {
96            state,
97            delivery_bus,
98            snapshot_store: None,
99            snapshot_lock: Arc::new(AsyncMutex::new(())),
100        }
101    }
102
103    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
104        self.snapshot_store = Some(store);
105        self
106    }
107
108    /// Persist current state as a snapshot. Held across the
109    /// clone-serialize-write sequence to prevent stale-last writes,
110    /// with serde + file I/O offloaded to the blocking pool.
111    async fn save_snapshot(&self) {
112        save_logs_snapshot(
113            &self.state,
114            self.snapshot_store.clone(),
115            &self.snapshot_lock,
116        )
117        .await;
118    }
119
120    /// Build a hook that persists the current Logs state when invoked, or `None`
121    /// in memory mode (no snapshot store). The CloudFormation provisioner
122    /// mutates `state` directly and uses this to write a CFN-provisioned
123    /// resource through to disk, the same way a direct mutating API call would.
124    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
125        let store = self.snapshot_store.clone()?;
126        let state = self.state.clone();
127        let lock = self.snapshot_lock.clone();
128        Some(Arc::new(move || {
129            let state = state.clone();
130            let store = store.clone();
131            let lock = lock.clone();
132            Box::pin(async move {
133                save_logs_snapshot(&state, Some(store), &lock).await;
134            })
135        }))
136    }
137}
138
139/// Persist the current Logs state as a snapshot. Offloads the serde + blocking
140/// file write to the Tokio blocking pool. Noop when `store` is `None` (memory
141/// mode). Shared by `LogsService::save_snapshot` and the CloudFormation
142/// provisioner's post-provision persist hook so both route through the same
143/// serialize-and-write path.
144pub async fn save_logs_snapshot(
145    state: &SharedLogsState,
146    store: Option<Arc<dyn SnapshotStore>>,
147    lock: &AsyncMutex<()>,
148) {
149    let Some(store) = store else {
150        return;
151    };
152    let _guard = lock.lock().await;
153    let snapshot = LogsSnapshot {
154        schema_version: LOGS_SNAPSHOT_SCHEMA_VERSION,
155        accounts: Some(state.read().clone()),
156        state: None,
157    };
158    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
159        let bytes = serde_json::to_vec(&snapshot)
160            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
161        store.save(&bytes)
162    })
163    .await;
164    match join {
165        Ok(Ok(())) => {}
166        Ok(Err(err)) => tracing::error!(%err, "failed to write logs snapshot"),
167        Err(err) => tracing::error!(%err, "logs snapshot task panicked"),
168    }
169}
170
171#[async_trait]
172impl AwsService for LogsService {
173    fn service_name(&self) -> &str {
174        "logs"
175    }
176
177    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
178        let mutates = !is_read_only_action(req.action.as_str());
179        let result = match req.action.as_str() {
180            "CreateLogGroup" => self.create_log_group(&req),
181            "DeleteLogGroup" => self.delete_log_group(&req),
182            "DescribeLogGroups" => self.describe_log_groups(&req),
183            "CreateLogStream" => self.create_log_stream(&req),
184            "DeleteLogStream" => self.delete_log_stream(&req),
185            "DescribeLogStreams" => self.describe_log_streams(&req),
186            "PutLogEvents" => self.put_log_events(&req),
187            "GetLogEvents" => self.get_log_events(&req),
188            "FilterLogEvents" => self.filter_log_events(&req),
189            "TagLogGroup" => self.tag_log_group(&req),
190            "UntagLogGroup" => self.untag_log_group(&req),
191            "ListTagsLogGroup" => self.list_tags_log_group(&req),
192            "TagResource" => self.tag_resource(&req),
193            "UntagResource" => self.untag_resource(&req),
194            "ListTagsForResource" => self.list_tags_for_resource(&req),
195            "PutRetentionPolicy" => self.put_retention_policy(&req),
196            "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
197            "PutSubscriptionFilter" => self.put_subscription_filter(&req),
198            "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
199            "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
200            "PutMetricFilter" => self.put_metric_filter(&req),
201            "DescribeMetricFilters" => self.describe_metric_filters(&req),
202            "DeleteMetricFilter" => self.delete_metric_filter(&req),
203            "PutResourcePolicy" => self.put_resource_policy(&req),
204            "DescribeResourcePolicies" => self.describe_resource_policies(&req),
205            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
206            "PutDestination" => self.put_destination(&req),
207            "DescribeDestinations" => self.describe_destinations(&req),
208            "DeleteDestination" => self.delete_destination(&req),
209            "PutDestinationPolicy" => self.put_destination_policy(&req),
210            "StartQuery" => self.start_query(&req),
211            "GetQueryResults" => self.get_query_results(&req),
212            "DescribeQueries" => self.describe_queries(&req),
213            "CreateExportTask" => self.create_export_task(&req),
214            "DescribeExportTasks" => self.describe_export_tasks(&req),
215            "CancelExportTask" => self.cancel_export_task(&req),
216            "PutDeliveryDestination" => self.put_delivery_destination(&req),
217            "GetDeliveryDestination" => self.get_delivery_destination(&req),
218            "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
219            "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
220            "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
221            "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
222            "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
223            "PutDeliverySource" => self.put_delivery_source(&req),
224            "GetDeliverySource" => self.get_delivery_source(&req),
225            "DescribeDeliverySources" => self.describe_delivery_sources(&req),
226            "DeleteDeliverySource" => self.delete_delivery_source(&req),
227            "CreateDelivery" => self.create_delivery(&req),
228            "GetDelivery" => self.get_delivery(&req),
229            "DescribeDeliveries" => self.describe_deliveries(&req),
230            "DeleteDelivery" => self.delete_delivery(&req),
231            "AssociateKmsKey" => self.associate_kms_key(&req),
232            "DisassociateKmsKey" => self.disassociate_kms_key(&req),
233            "PutQueryDefinition" => self.put_query_definition(&req),
234            "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
235            "DeleteQueryDefinition" => self.delete_query_definition(&req),
236            "PutAccountPolicy" => self.put_account_policy(&req),
237            "DescribeAccountPolicies" => self.describe_account_policies(&req),
238            "DeleteAccountPolicy" => self.delete_account_policy(&req),
239            "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
240            "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
241            "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
242            "PutIndexPolicy" => self.put_index_policy(&req),
243            "DescribeIndexPolicies" => self.describe_index_policies(&req),
244            "DeleteIndexPolicy" => self.delete_index_policy(&req),
245            "DescribeFieldIndexes" => self.describe_field_indexes(&req),
246            "PutTransformer" => self.put_transformer(&req),
247            "GetTransformer" => self.get_transformer(&req),
248            "DeleteTransformer" => self.delete_transformer(&req),
249            "TestTransformer" => self.test_transformer(&req),
250            "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
251            "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
252            "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
253            "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
254            "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
255            "GetLogGroupFields" => self.get_log_group_fields(&req),
256            "TestMetricFilter" => self.test_metric_filter(&req),
257            "StopQuery" => self.stop_query(&req),
258            "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
259            "GetLogRecord" => self.get_log_record(&req),
260            "ListAnomalies" => self.list_anomalies(&req),
261            "UpdateAnomaly" => self.update_anomaly(&req),
262            "CreateImportTask" => self.create_import_task(&req),
263            "DescribeImportTasks" => self.describe_import_tasks(&req),
264            "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
265            "CancelImportTask" => self.cancel_import_task(&req),
266            "PutIntegration" => self.put_integration(&req),
267            "GetIntegration" => self.get_integration(&req),
268            "DeleteIntegration" => self.delete_integration(&req),
269            "ListIntegrations" => self.list_integrations(&req),
270            "CreateLookupTable" => self.create_lookup_table(&req),
271            "GetLookupTable" => self.get_lookup_table(&req),
272            "DescribeLookupTables" => self.describe_lookup_tables(&req),
273            "DeleteLookupTable" => self.delete_lookup_table(&req),
274            "UpdateLookupTable" => self.update_lookup_table(&req),
275            "CreateScheduledQuery" => self.create_scheduled_query(&req),
276            "GetScheduledQuery" => self.get_scheduled_query(&req),
277            "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
278            "ListScheduledQueries" => self.list_scheduled_queries(&req),
279            "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
280            "UpdateScheduledQuery" => self.update_scheduled_query(&req),
281            "StartLiveTail" => self.start_live_tail(&req),
282            "ListLogGroups" => self.list_log_groups(&req),
283            "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
284            "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
285            "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
286            "GetLogObject" => self.get_log_object(&req),
287            "GetLogFields" => self.get_log_fields(&req),
288            "AssociateSourceToS3TableIntegration" => {
289                self.associate_source_to_s3_table_integration(&req)
290            }
291            "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
292            "DisassociateSourceFromS3TableIntegration" => {
293                self.disassociate_source_from_s3_table_integration(&req)
294            }
295            "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
296            "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
297            // Internal action for testing export storage
298            "GetExportedData" => self.get_exported_data(&req),
299            _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
300        };
301        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
302            self.save_snapshot().await;
303        }
304        result
305    }
306
307    fn supported_actions(&self) -> &[&str] {
308        SUPPORTED_ACTIONS
309    }
310}
311
312const SUPPORTED_ACTIONS: &[&str] = &[
313    "CreateLogGroup",
314    "DeleteLogGroup",
315    "DescribeLogGroups",
316    "CreateLogStream",
317    "DeleteLogStream",
318    "DescribeLogStreams",
319    "PutLogEvents",
320    "GetLogEvents",
321    "FilterLogEvents",
322    "TagLogGroup",
323    "UntagLogGroup",
324    "ListTagsLogGroup",
325    "TagResource",
326    "UntagResource",
327    "ListTagsForResource",
328    "PutRetentionPolicy",
329    "DeleteRetentionPolicy",
330    "PutSubscriptionFilter",
331    "DescribeSubscriptionFilters",
332    "DeleteSubscriptionFilter",
333    "PutMetricFilter",
334    "DescribeMetricFilters",
335    "DeleteMetricFilter",
336    "PutResourcePolicy",
337    "DescribeResourcePolicies",
338    "DeleteResourcePolicy",
339    "PutDestination",
340    "DescribeDestinations",
341    "DeleteDestination",
342    "PutDestinationPolicy",
343    "StartQuery",
344    "GetQueryResults",
345    "DescribeQueries",
346    "CreateExportTask",
347    "DescribeExportTasks",
348    "CancelExportTask",
349    "PutDeliveryDestination",
350    "GetDeliveryDestination",
351    "DescribeDeliveryDestinations",
352    "DeleteDeliveryDestination",
353    "PutDeliveryDestinationPolicy",
354    "GetDeliveryDestinationPolicy",
355    "DeleteDeliveryDestinationPolicy",
356    "PutDeliverySource",
357    "GetDeliverySource",
358    "DescribeDeliverySources",
359    "DeleteDeliverySource",
360    "CreateDelivery",
361    "GetDelivery",
362    "DescribeDeliveries",
363    "DeleteDelivery",
364    "AssociateKmsKey",
365    "DisassociateKmsKey",
366    "PutQueryDefinition",
367    "DescribeQueryDefinitions",
368    "DeleteQueryDefinition",
369    "PutAccountPolicy",
370    "DescribeAccountPolicies",
371    "DeleteAccountPolicy",
372    "PutDataProtectionPolicy",
373    "GetDataProtectionPolicy",
374    "DeleteDataProtectionPolicy",
375    "PutIndexPolicy",
376    "DescribeIndexPolicies",
377    "DeleteIndexPolicy",
378    "DescribeFieldIndexes",
379    "PutTransformer",
380    "GetTransformer",
381    "DeleteTransformer",
382    "TestTransformer",
383    "CreateLogAnomalyDetector",
384    "GetLogAnomalyDetector",
385    "DeleteLogAnomalyDetector",
386    "ListLogAnomalyDetectors",
387    "UpdateLogAnomalyDetector",
388    "GetLogGroupFields",
389    "TestMetricFilter",
390    "StopQuery",
391    "PutLogGroupDeletionProtection",
392    "GetLogRecord",
393    "ListAnomalies",
394    "UpdateAnomaly",
395    "CreateImportTask",
396    "DescribeImportTasks",
397    "DescribeImportTaskBatches",
398    "CancelImportTask",
399    "PutIntegration",
400    "GetIntegration",
401    "DeleteIntegration",
402    "ListIntegrations",
403    "CreateLookupTable",
404    "GetLookupTable",
405    "DescribeLookupTables",
406    "DeleteLookupTable",
407    "UpdateLookupTable",
408    "CreateScheduledQuery",
409    "GetScheduledQuery",
410    "GetScheduledQueryHistory",
411    "ListScheduledQueries",
412    "DeleteScheduledQuery",
413    "UpdateScheduledQuery",
414    "StartLiveTail",
415    "ListLogGroups",
416    "ListLogGroupsForQuery",
417    "ListAggregateLogGroupSummaries",
418    "PutBearerTokenAuthentication",
419    "GetLogObject",
420    "GetLogFields",
421    "AssociateSourceToS3TableIntegration",
422    "ListSourcesForS3TableIntegration",
423    "DisassociateSourceFromS3TableIntegration",
424    "UpdateDeliveryConfiguration",
425    "DescribeConfigurationTemplates",
426];
427
428fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
429    body[field].as_str().ok_or_else(|| {
430        AwsServiceError::aws_error(
431            StatusCode::BAD_REQUEST,
432            "InvalidParameterException",
433            format!("{field} is required"),
434        )
435    })
436}
437
438/// Build a delivery destination configuration JSON object, ensuring
439/// `destinationResourceArn` is always present as a string (Smithy requirement).
440fn dd_config_json(config: &std::collections::BTreeMap<String, String>) -> Value {
441    let mut m: serde_json::Map<String, Value> =
442        config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
443    m.entry("destinationResourceArn".to_string())
444        .or_insert_with(|| json!(""));
445    Value::Object(m)
446}
447
448/// Infer a delivery destination's type from its destination resource ARN's
449/// service. Defaults to `CWL` (the most common case) when the ARN is absent or
450/// unrecognised.
451pub fn infer_delivery_destination_type(destination_arn: Option<&String>) -> String {
452    let arn = destination_arn.map(String::as_str).unwrap_or("");
453    let service = arn.split(':').nth(2).unwrap_or("");
454    match service {
455        "s3" => "S3",
456        "firehose" => "FH",
457        "xray" => "XRAY",
458        _ => "CWL",
459    }
460    .to_string()
461}
462
463fn generate_sequence_token() -> String {
464    use std::time::{SystemTime, UNIX_EPOCH};
465    let nanos = SystemTime::now()
466        .duration_since(UNIX_EPOCH)
467        .unwrap_or_default()
468        .as_nanos();
469    // u128 max is ~3.4e38, so we limit to 38 digits to avoid overflow
470    format!("{:038}", nanos % 10u128.pow(38))
471}
472
473fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
474    AwsServiceError::aws_error(
475        StatusCode::BAD_REQUEST,
476        "InvalidParameterException",
477        format!(
478            "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
479        ),
480    )
481}
482
483/// Resolve log group name from either logGroupName or resourceIdentifier.
484/// resourceIdentifier can be a log group name or an ARN.
485fn resolve_log_group_name(
486    log_group_name: Option<&str>,
487    resource_identifier: Option<&str>,
488) -> Result<String, AwsServiceError> {
489    if let Some(identifier) = resource_identifier {
490        if identifier.starts_with("arn:") {
491            extract_log_group_from_arn(identifier).ok_or_else(|| {
492                AwsServiceError::aws_error(
493                    StatusCode::BAD_REQUEST,
494                    "InvalidParameterException",
495                    format!("Invalid ARN: {identifier}"),
496                )
497            })
498        } else {
499            Ok(identifier.to_string())
500        }
501    } else if let Some(name) = log_group_name {
502        Ok(name.to_string())
503    } else {
504        Err(AwsServiceError::aws_error(
505            StatusCode::BAD_REQUEST,
506            "InvalidParameterException",
507            "Either logGroupName or resourceIdentifier is required",
508        ))
509    }
510}
511
512/// Extract log group name from ARN like "arn:aws:logs:region:account:log-group:name:*"
513pub(crate) fn extract_log_group_from_arn(arn: &str) -> Option<String> {
514    // arn:aws:logs:region:account:log-group:name:*
515    let parts: Vec<&str> = arn.splitn(7, ':').collect();
516    if parts.len() >= 7 && parts[5] == "log-group" {
517        let name = parts[6].strip_suffix(":*").unwrap_or(parts[6]);
518        Some(name.to_string())
519    } else {
520        None
521    }
522}
523
524/// CloudWatch Logs filter pattern matching.
525///
526/// Rules:
527/// - Empty pattern or patterns starting with `{` (JSON patterns) match everything
528/// - Quoted string `"foo bar"` matches the exact substring
529/// - Multiple unquoted words `foo bar` means ALL words must appear anywhere in the message
530/// - Single unquoted word `foo` is a simple substring match
531fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
532    let pattern = pattern.trim();
533
534    // Empty pattern matches everything
535    if pattern.is_empty() {
536        return true;
537    }
538
539    // JSON/metric filter patterns: { $.field = "value" }
540    if pattern.starts_with('{') && pattern.ends_with('}') {
541        return matches_json_filter_pattern(pattern, message);
542    }
543
544    // Array-style metric filter patterns - not implemented, fail closed
545    if pattern.starts_with('[') {
546        return false;
547    }
548
549    // Quoted pattern: exact substring match (handles escaped inner quotes)
550    if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
551        let inner = &pattern[1..pattern.len() - 1];
552        // Unescape inner quotes: \"  ->  "
553        let unescaped = inner.replace("\\\"", "\"");
554        return message.contains(&unescaped);
555    }
556
557    // Multiple words: all must be present (AND semantics)
558    let terms = parse_filter_terms(pattern);
559    terms.iter().all(|term| message.contains(term.as_str()))
560}
561
562/// Parse filter pattern terms, respecting quoted strings as single terms.
563fn parse_filter_terms(pattern: &str) -> Vec<String> {
564    let mut terms = Vec::new();
565    let mut chars = pattern.chars().peekable();
566
567    while chars.peek().is_some() {
568        // Skip whitespace
569        while chars.peek().is_some_and(|c| c.is_whitespace()) {
570            chars.next();
571        }
572
573        if chars.peek().is_none() {
574            break;
575        }
576
577        if chars.peek() == Some(&'"') {
578            // Quoted term
579            chars.next(); // consume opening quote
580            let mut term = String::new();
581            loop {
582                match chars.next() {
583                    Some('\\') => {
584                        if let Some(c) = chars.next() {
585                            term.push(c);
586                        }
587                    }
588                    Some('"') => break,
589                    Some(c) => term.push(c),
590                    None => break,
591                }
592            }
593            terms.push(term);
594        } else {
595            // Unquoted term
596            let mut term = String::new();
597            while chars.peek().is_some_and(|c| !c.is_whitespace()) {
598                term.push(chars.next().unwrap());
599            }
600            if !term.is_empty() {
601                terms.push(term);
602            }
603        }
604    }
605
606    terms
607}
608
609/// Match a JSON filter pattern like `{ $.level = "ERROR" }` against a message.
610fn matches_json_filter_pattern(pattern: &str, message: &str) -> bool {
611    // Strip the outer braces
612    let inner = pattern
613        .strip_prefix('{')
614        .and_then(|s| s.strip_suffix('}'))
615        .unwrap_or("")
616        .trim();
617
618    if inner.is_empty() {
619        return true;
620    }
621
622    // Parse the message as JSON
623    let msg_json: serde_json::Value = match serde_json::from_str(message) {
624        Ok(v) => v,
625        Err(_) => return false, // Non-JSON message cannot match JSON filter
626    };
627
628    // Support: $.field = "value", $.field != "value", $.field = number,
629    //          $.field > number, $.field < number, $.field >= number, $.field <= number
630    // Also support && for multiple conditions
631    let conditions: Vec<&str> = inner.split("&&").collect();
632
633    for condition in conditions {
634        let condition = condition.trim();
635        if !matches_single_json_condition(condition, &msg_json) {
636            return false;
637        }
638    }
639
640    true
641}
642
643fn matches_single_json_condition(condition: &str, json: &serde_json::Value) -> bool {
644    // Try to parse: $.field op value
645    let condition = condition.trim();
646
647    // Find the operator
648    let ops = ["!=", ">=", "<=", "=", ">", "<"];
649    let mut found_op = None;
650    let mut op_pos = 0;
651    let mut op_len = 0;
652
653    for op in &ops {
654        if let Some(pos) = condition.find(op) {
655            // Make sure we're not inside a quoted string
656            let before = &condition[..pos];
657            let quote_count = before.chars().filter(|&c| c == '"').count();
658            if quote_count % 2 == 0 {
659                found_op = Some(*op);
660                op_pos = pos;
661                op_len = op.len();
662                break;
663            }
664        }
665    }
666
667    let (op, field_part, value_part) = match found_op {
668        Some(op) => (
669            op,
670            condition[..op_pos].trim(),
671            condition[op_pos + op_len..].trim(),
672        ),
673        None => {
674            // No operator: just check if the field exists
675            // Pattern like `{ $.field }` means field exists
676            if let Some(path) = condition.strip_prefix("$.") {
677                return resolve_json_path_simple(json, path).is_some();
678            }
679            return true;
680        }
681    };
682
683    // Extract JSON path from field_part (must start with $.)
684    let path = match field_part.strip_prefix("$.") {
685        Some(p) => p,
686        None => return false, // Don't understand this pattern, fail closed
687    };
688
689    let actual_value = match resolve_json_path_simple(json, path) {
690        Some(v) => v,
691        None => return op == "!=", // field doesn't exist: only != matches
692    };
693
694    // Parse the expected value
695    let expected_str = if value_part.starts_with('"') && value_part.ends_with('"') {
696        // String comparison
697        let s = &value_part[1..value_part.len() - 1];
698        match op {
699            "=" => actual_value.as_str() == Some(s),
700            "!=" => actual_value.as_str() != Some(s),
701            _ => false,
702        }
703    } else if let Ok(expected_num) = value_part.parse::<f64>() {
704        // Numeric comparison
705        let actual_num = actual_value.as_f64();
706        match (op, actual_num) {
707            ("=", Some(n)) => (n - expected_num).abs() < f64::EPSILON,
708            ("!=", Some(n)) => (n - expected_num).abs() >= f64::EPSILON,
709            (">", Some(n)) => n > expected_num,
710            ("<", Some(n)) => n < expected_num,
711            (">=", Some(n)) => n >= expected_num,
712            ("<=", Some(n)) => n <= expected_num,
713            _ => false,
714        }
715    } else if value_part == "true" || value_part == "false" {
716        let expected_bool = value_part == "true";
717        match op {
718            "=" => actual_value.as_bool() == Some(expected_bool),
719            "!=" => actual_value.as_bool() != Some(expected_bool),
720            _ => false,
721        }
722    } else {
723        false // Unknown value format, fail closed
724    };
725
726    expected_str
727}
728
729/// Resolve a simple dot-separated JSON path (e.g., "level" or "nested.field").
730fn resolve_json_path_simple<'a>(
731    json: &'a serde_json::Value,
732    path: &str,
733) -> Option<&'a serde_json::Value> {
734    let mut current = json;
735    for part in path.split('.') {
736        current = current.get(part)?;
737    }
738    if current.is_null() {
739        None
740    } else {
741        Some(current)
742    }
743}
744
745#[cfg(test)]
746pub(crate) mod test_helpers {
747    use super::*;
748    use bytes::Bytes;
749    use fakecloud_core::delivery::DeliveryBus;
750    use http::{HeaderMap, Method};
751    use std::collections::HashMap;
752    use std::sync::Arc;
753
754    pub fn make_service() -> LogsService {
755        let state = Arc::new(parking_lot::RwLock::new(
756            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
757        ));
758        let delivery_bus = Arc::new(DeliveryBus::new());
759        LogsService::new(state, delivery_bus)
760    }
761
762    pub fn make_request(
763        action: &str,
764        body: serde_json::Value,
765    ) -> fakecloud_core::service::AwsRequest {
766        fakecloud_core::service::AwsRequest {
767            service: "logs".to_string(),
768            action: action.to_string(),
769            region: "us-east-1".to_string(),
770            account_id: "123456789012".to_string(),
771            request_id: "test-request-id".to_string(),
772            headers: HeaderMap::new(),
773            query_params: HashMap::new(),
774            body: Bytes::from(serde_json::to_vec(&body).unwrap()),
775            body_stream: parking_lot::Mutex::new(None),
776            path_segments: vec![],
777            raw_path: "/".to_string(),
778            raw_query: String::new(),
779            method: Method::POST,
780            is_query_protocol: false,
781            access_key_id: None,
782            principal: None,
783        }
784    }
785
786    pub fn create_group(svc: &LogsService, name: &str) {
787        let req = make_request(
788            "CreateLogGroup",
789            serde_json::json!({ "logGroupName": name }),
790        );
791        svc.create_log_group(&req).unwrap();
792    }
793
794    pub fn create_stream(svc: &LogsService, group: &str, stream: &str) {
795        let req = make_request(
796            "CreateLogStream",
797            serde_json::json!({ "logGroupName": group, "logStreamName": stream }),
798        );
799        svc.create_log_stream(&req).unwrap();
800    }
801
802    pub fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
803        let now = chrono::Utc::now().timestamp_millis();
804        let events: Vec<serde_json::Value> = messages
805            .iter()
806            .enumerate()
807            .map(|(i, msg)| serde_json::json!({ "timestamp": now + i as i64, "message": msg }))
808            .collect();
809        let req = make_request(
810            "PutLogEvents",
811            serde_json::json!({
812                "logGroupName": group,
813                "logStreamName": stream,
814                "logEvents": events,
815            }),
816        );
817        svc.put_log_events(&req).unwrap();
818    }
819
820    pub fn put_events_at(
821        svc: &LogsService,
822        group: &str,
823        stream: &str,
824        messages: &[&str],
825        timestamp: i64,
826    ) {
827        let events: Vec<serde_json::Value> = messages
828            .iter()
829            .enumerate()
830            .map(
831                |(i, msg)| serde_json::json!({ "timestamp": timestamp + i as i64, "message": msg }),
832            )
833            .collect();
834        let req = make_request(
835            "PutLogEvents",
836            serde_json::json!({
837                "logGroupName": group,
838                "logStreamName": stream,
839                "logEvents": events,
840            }),
841        );
842        svc.put_log_events(&req).unwrap();
843    }
844
845    pub fn put_retention(svc: &LogsService, group: &str, days: i32) {
846        let req = make_request(
847            "PutRetentionPolicy",
848            serde_json::json!({ "logGroupName": group, "retentionInDays": days }),
849        );
850        svc.put_retention_policy(&req).unwrap();
851    }
852
853    #[test]
854    fn array_filter_pattern_does_not_match() {
855        assert!(
856            !matches_filter_pattern("[w1, w2, w3]", "some log message"),
857            "array-style filter pattern must not match (fail closed)"
858        );
859    }
860
861    #[test]
862    fn unrecognized_json_filter_path_does_not_match() {
863        // A JSON filter condition where the field part doesn't start with $.
864        // should fail closed instead of matching everything.
865        assert!(
866            !matches_single_json_condition(
867                "level = \"ERROR\"",
868                &serde_json::json!({"level": "ERROR"}),
869            ),
870            "filter condition without $. prefix must not match (fail closed)"
871        );
872    }
873
874    #[test]
875    fn unknown_value_format_does_not_match() {
876        // A value that is not a string, number, or boolean should fail closed.
877        assert!(
878            !matches_single_json_condition(
879                "$.level = ERROR",
880                &serde_json::json!({"level": "ERROR"}),
881            ),
882            "unquoted non-numeric non-boolean value must not match (fail closed)"
883        );
884    }
885
886    /// No snapshot store (memory mode) -> no persist hook for the CFN provisioner.
887    #[test]
888    fn snapshot_hook_is_none_without_store() {
889        let svc = make_service();
890        assert!(svc.snapshot_hook().is_none());
891    }
892
893    /// With a store, the hook is present and invoking it runs the whole-state
894    /// persist path the CloudFormation provisioner uses after mutating logs
895    /// state directly.
896    #[tokio::test]
897    async fn snapshot_hook_fires_with_store() {
898        let store: Arc<dyn fakecloud_persistence::SnapshotStore> =
899            Arc::new(fakecloud_persistence::MemorySnapshotStore::new());
900        let svc = make_service().with_snapshot_store(store);
901        let hook = svc
902            .snapshot_hook()
903            .expect("hook present when a store is set");
904        // Must not panic; exercises the closure and the snapshot save path.
905        hook().await;
906    }
907}