Skip to main content

fakecloud_logs/service/
mod.rs

1use async_trait::async_trait;
2use http::StatusCode;
3use serde_json::{json, Value};
4
5use std::sync::Arc;
6
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{LogsSnapshot, SharedLogsState, LOGS_SNAPSHOT_SCHEMA_VERSION};
14
15mod anomaly;
16mod deliveries;
17mod destinations;
18mod exports;
19mod filters;
20mod groups;
21mod misc;
22mod policies;
23mod queries;
24mod streams;
25mod tags;
26
27/// CloudWatch Logs actions that do NOT mutate state. Everything else
28/// triggers a snapshot save on HTTP 2xx.
29fn is_read_only_action(action: &str) -> bool {
30    matches!(
31        action,
32        "DescribeLogGroups"
33            | "DescribeLogStreams"
34            | "GetLogEvents"
35            | "FilterLogEvents"
36            | "ListTagsLogGroup"
37            | "ListTagsForResource"
38            | "DescribeSubscriptionFilters"
39            | "DescribeMetricFilters"
40            | "DescribeResourcePolicies"
41            | "DescribeDestinations"
42            | "GetQueryResults"
43            | "DescribeQueries"
44            | "DescribeExportTasks"
45            | "GetDeliveryDestination"
46            | "DescribeDeliveryDestinations"
47            | "GetDeliveryDestinationPolicy"
48            | "GetDeliverySource"
49            | "DescribeDeliverySources"
50            | "GetDelivery"
51            | "DescribeDeliveries"
52            | "DescribeQueryDefinitions"
53            | "DescribeAccountPolicies"
54            | "GetDataProtectionPolicy"
55            | "DescribeIndexPolicies"
56            | "DescribeFieldIndexes"
57            | "GetTransformer"
58            | "TestTransformer"
59            | "GetLogAnomalyDetector"
60            | "ListLogAnomalyDetectors"
61            | "GetLogGroupFields"
62            | "TestMetricFilter"
63            | "GetLogRecord"
64            | "ListAnomalies"
65            | "DescribeImportTasks"
66            | "DescribeImportTaskBatches"
67            | "GetIntegration"
68            | "ListIntegrations"
69            | "GetLookupTable"
70            | "DescribeLookupTables"
71            | "GetScheduledQuery"
72            | "GetScheduledQueryHistory"
73            | "ListScheduledQueries"
74            | "StartLiveTail"
75            | "ListLogGroups"
76            | "ListLogGroupsForQuery"
77            | "ListAggregateLogGroupSummaries"
78            | "GetLogObject"
79            | "GetLogFields"
80            | "ListSourcesForS3TableIntegration"
81            | "DescribeConfigurationTemplates"
82            | "GetExportedData"
83    )
84}
85
86pub struct LogsService {
87    state: SharedLogsState,
88    delivery_bus: Arc<DeliveryBus>,
89    snapshot_store: Option<Arc<dyn SnapshotStore>>,
90    snapshot_lock: Arc<AsyncMutex<()>>,
91}
92
93impl LogsService {
94    pub fn new(state: SharedLogsState, delivery_bus: Arc<DeliveryBus>) -> Self {
95        Self {
96            state,
97            delivery_bus,
98            snapshot_store: None,
99            snapshot_lock: Arc::new(AsyncMutex::new(())),
100        }
101    }
102
103    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
104        self.snapshot_store = Some(store);
105        self
106    }
107
108    /// Persist current state as a snapshot. Held across the
109    /// clone-serialize-write sequence to prevent stale-last writes,
110    /// with serde + file I/O offloaded to the blocking pool.
111    async fn save_snapshot(&self) {
112        save_logs_snapshot(
113            &self.state,
114            self.snapshot_store.clone(),
115            &self.snapshot_lock,
116        )
117        .await;
118    }
119
120    /// Build a hook that persists the current Logs state when invoked, or `None`
121    /// in memory mode (no snapshot store). The CloudFormation provisioner
122    /// mutates `state` directly and uses this to write a CFN-provisioned
123    /// resource through to disk, the same way a direct mutating API call would.
124    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
125        let store = self.snapshot_store.clone()?;
126        let state = self.state.clone();
127        let lock = self.snapshot_lock.clone();
128        Some(Arc::new(move || {
129            let state = state.clone();
130            let store = store.clone();
131            let lock = lock.clone();
132            Box::pin(async move {
133                save_logs_snapshot(&state, Some(store), &lock).await;
134            })
135        }))
136    }
137}
138
139/// Persist the current Logs state as a snapshot. Offloads the serde + blocking
140/// file write to the Tokio blocking pool. Noop when `store` is `None` (memory
141/// mode). Shared by `LogsService::save_snapshot` and the CloudFormation
142/// provisioner's post-provision persist hook so both route through the same
143/// serialize-and-write path.
144pub async fn save_logs_snapshot(
145    state: &SharedLogsState,
146    store: Option<Arc<dyn SnapshotStore>>,
147    lock: &AsyncMutex<()>,
148) {
149    let Some(store) = store else {
150        return;
151    };
152    let _guard = lock.lock().await;
153    let snapshot = LogsSnapshot {
154        schema_version: LOGS_SNAPSHOT_SCHEMA_VERSION,
155        accounts: Some(state.read().clone()),
156        state: None,
157    };
158    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
159        let bytes = serde_json::to_vec(&snapshot)
160            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
161        store.save(&bytes)
162    })
163    .await;
164    match join {
165        Ok(Ok(())) => {}
166        Ok(Err(err)) => tracing::error!(%err, "failed to write logs snapshot"),
167        Err(err) => tracing::error!(%err, "logs snapshot task panicked"),
168    }
169}
170
171#[async_trait]
172impl AwsService for LogsService {
173    fn service_name(&self) -> &str {
174        "logs"
175    }
176
177    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
178        let mutates = !is_read_only_action(req.action.as_str());
179        let result = match req.action.as_str() {
180            "CreateLogGroup" => self.create_log_group(&req),
181            "DeleteLogGroup" => self.delete_log_group(&req),
182            "DescribeLogGroups" => self.describe_log_groups(&req),
183            "CreateLogStream" => self.create_log_stream(&req),
184            "DeleteLogStream" => self.delete_log_stream(&req),
185            "DescribeLogStreams" => self.describe_log_streams(&req),
186            "PutLogEvents" => self.put_log_events(&req),
187            "GetLogEvents" => self.get_log_events(&req),
188            "FilterLogEvents" => self.filter_log_events(&req),
189            "TagLogGroup" => self.tag_log_group(&req),
190            "UntagLogGroup" => self.untag_log_group(&req),
191            "ListTagsLogGroup" => self.list_tags_log_group(&req),
192            "TagResource" => self.tag_resource(&req),
193            "UntagResource" => self.untag_resource(&req),
194            "ListTagsForResource" => self.list_tags_for_resource(&req),
195            "PutRetentionPolicy" => self.put_retention_policy(&req),
196            "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
197            "PutSubscriptionFilter" => self.put_subscription_filter(&req),
198            "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
199            "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
200            "PutMetricFilter" => self.put_metric_filter(&req),
201            "DescribeMetricFilters" => self.describe_metric_filters(&req),
202            "DeleteMetricFilter" => self.delete_metric_filter(&req),
203            "PutResourcePolicy" => self.put_resource_policy(&req),
204            "DescribeResourcePolicies" => self.describe_resource_policies(&req),
205            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
206            "PutDestination" => self.put_destination(&req),
207            "DescribeDestinations" => self.describe_destinations(&req),
208            "DeleteDestination" => self.delete_destination(&req),
209            "PutDestinationPolicy" => self.put_destination_policy(&req),
210            "StartQuery" => self.start_query(&req),
211            "GetQueryResults" => self.get_query_results(&req),
212            "DescribeQueries" => self.describe_queries(&req),
213            "CreateExportTask" => self.create_export_task(&req),
214            "DescribeExportTasks" => self.describe_export_tasks(&req),
215            "CancelExportTask" => self.cancel_export_task(&req),
216            "PutDeliveryDestination" => self.put_delivery_destination(&req),
217            "GetDeliveryDestination" => self.get_delivery_destination(&req),
218            "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
219            "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
220            "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
221            "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
222            "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
223            "PutDeliverySource" => self.put_delivery_source(&req),
224            "GetDeliverySource" => self.get_delivery_source(&req),
225            "DescribeDeliverySources" => self.describe_delivery_sources(&req),
226            "DeleteDeliverySource" => self.delete_delivery_source(&req),
227            "CreateDelivery" => self.create_delivery(&req),
228            "GetDelivery" => self.get_delivery(&req),
229            "DescribeDeliveries" => self.describe_deliveries(&req),
230            "DeleteDelivery" => self.delete_delivery(&req),
231            "AssociateKmsKey" => self.associate_kms_key(&req),
232            "DisassociateKmsKey" => self.disassociate_kms_key(&req),
233            "PutQueryDefinition" => self.put_query_definition(&req),
234            "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
235            "DeleteQueryDefinition" => self.delete_query_definition(&req),
236            "PutAccountPolicy" => self.put_account_policy(&req),
237            "DescribeAccountPolicies" => self.describe_account_policies(&req),
238            "DeleteAccountPolicy" => self.delete_account_policy(&req),
239            "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
240            "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
241            "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
242            "PutIndexPolicy" => self.put_index_policy(&req),
243            "DescribeIndexPolicies" => self.describe_index_policies(&req),
244            "DeleteIndexPolicy" => self.delete_index_policy(&req),
245            "DescribeFieldIndexes" => self.describe_field_indexes(&req),
246            "PutTransformer" => self.put_transformer(&req),
247            "GetTransformer" => self.get_transformer(&req),
248            "DeleteTransformer" => self.delete_transformer(&req),
249            "TestTransformer" => self.test_transformer(&req),
250            "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
251            "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
252            "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
253            "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
254            "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
255            "GetLogGroupFields" => self.get_log_group_fields(&req),
256            "TestMetricFilter" => self.test_metric_filter(&req),
257            "StopQuery" => self.stop_query(&req),
258            "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
259            "GetLogRecord" => self.get_log_record(&req),
260            "ListAnomalies" => self.list_anomalies(&req),
261            "UpdateAnomaly" => self.update_anomaly(&req),
262            "CreateImportTask" => self.create_import_task(&req),
263            "DescribeImportTasks" => self.describe_import_tasks(&req),
264            "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
265            "CancelImportTask" => self.cancel_import_task(&req),
266            "PutIntegration" => self.put_integration(&req),
267            "GetIntegration" => self.get_integration(&req),
268            "DeleteIntegration" => self.delete_integration(&req),
269            "ListIntegrations" => self.list_integrations(&req),
270            "CreateLookupTable" => self.create_lookup_table(&req),
271            "GetLookupTable" => self.get_lookup_table(&req),
272            "DescribeLookupTables" => self.describe_lookup_tables(&req),
273            "DeleteLookupTable" => self.delete_lookup_table(&req),
274            "UpdateLookupTable" => self.update_lookup_table(&req),
275            "CreateScheduledQuery" => self.create_scheduled_query(&req),
276            "GetScheduledQuery" => self.get_scheduled_query(&req),
277            "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
278            "ListScheduledQueries" => self.list_scheduled_queries(&req),
279            "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
280            "UpdateScheduledQuery" => self.update_scheduled_query(&req),
281            "StartLiveTail" => self.start_live_tail(&req),
282            "ListLogGroups" => self.list_log_groups(&req),
283            "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
284            "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
285            "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
286            "GetLogObject" => self.get_log_object(&req),
287            "GetLogFields" => self.get_log_fields(&req),
288            "AssociateSourceToS3TableIntegration" => {
289                self.associate_source_to_s3_table_integration(&req)
290            }
291            "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
292            "DisassociateSourceFromS3TableIntegration" => {
293                self.disassociate_source_from_s3_table_integration(&req)
294            }
295            "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
296            "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
297            // Internal action for testing export storage
298            "GetExportedData" => self.get_exported_data(&req),
299            _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
300        };
301        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
302            self.save_snapshot().await;
303        }
304        result
305    }
306
307    fn supported_actions(&self) -> &[&str] {
308        SUPPORTED_ACTIONS
309    }
310}
311
312const SUPPORTED_ACTIONS: &[&str] = &[
313    "CreateLogGroup",
314    "DeleteLogGroup",
315    "DescribeLogGroups",
316    "CreateLogStream",
317    "DeleteLogStream",
318    "DescribeLogStreams",
319    "PutLogEvents",
320    "GetLogEvents",
321    "FilterLogEvents",
322    "TagLogGroup",
323    "UntagLogGroup",
324    "ListTagsLogGroup",
325    "TagResource",
326    "UntagResource",
327    "ListTagsForResource",
328    "PutRetentionPolicy",
329    "DeleteRetentionPolicy",
330    "PutSubscriptionFilter",
331    "DescribeSubscriptionFilters",
332    "DeleteSubscriptionFilter",
333    "PutMetricFilter",
334    "DescribeMetricFilters",
335    "DeleteMetricFilter",
336    "PutResourcePolicy",
337    "DescribeResourcePolicies",
338    "DeleteResourcePolicy",
339    "PutDestination",
340    "DescribeDestinations",
341    "DeleteDestination",
342    "PutDestinationPolicy",
343    "StartQuery",
344    "GetQueryResults",
345    "DescribeQueries",
346    "CreateExportTask",
347    "DescribeExportTasks",
348    "CancelExportTask",
349    "PutDeliveryDestination",
350    "GetDeliveryDestination",
351    "DescribeDeliveryDestinations",
352    "DeleteDeliveryDestination",
353    "PutDeliveryDestinationPolicy",
354    "GetDeliveryDestinationPolicy",
355    "DeleteDeliveryDestinationPolicy",
356    "PutDeliverySource",
357    "GetDeliverySource",
358    "DescribeDeliverySources",
359    "DeleteDeliverySource",
360    "CreateDelivery",
361    "GetDelivery",
362    "DescribeDeliveries",
363    "DeleteDelivery",
364    "AssociateKmsKey",
365    "DisassociateKmsKey",
366    "PutQueryDefinition",
367    "DescribeQueryDefinitions",
368    "DeleteQueryDefinition",
369    "PutAccountPolicy",
370    "DescribeAccountPolicies",
371    "DeleteAccountPolicy",
372    "PutDataProtectionPolicy",
373    "GetDataProtectionPolicy",
374    "DeleteDataProtectionPolicy",
375    "PutIndexPolicy",
376    "DescribeIndexPolicies",
377    "DeleteIndexPolicy",
378    "DescribeFieldIndexes",
379    "PutTransformer",
380    "GetTransformer",
381    "DeleteTransformer",
382    "TestTransformer",
383    "CreateLogAnomalyDetector",
384    "GetLogAnomalyDetector",
385    "DeleteLogAnomalyDetector",
386    "ListLogAnomalyDetectors",
387    "UpdateLogAnomalyDetector",
388    "GetLogGroupFields",
389    "TestMetricFilter",
390    "StopQuery",
391    "PutLogGroupDeletionProtection",
392    "GetLogRecord",
393    "ListAnomalies",
394    "UpdateAnomaly",
395    "CreateImportTask",
396    "DescribeImportTasks",
397    "DescribeImportTaskBatches",
398    "CancelImportTask",
399    "PutIntegration",
400    "GetIntegration",
401    "DeleteIntegration",
402    "ListIntegrations",
403    "CreateLookupTable",
404    "GetLookupTable",
405    "DescribeLookupTables",
406    "DeleteLookupTable",
407    "UpdateLookupTable",
408    "CreateScheduledQuery",
409    "GetScheduledQuery",
410    "GetScheduledQueryHistory",
411    "ListScheduledQueries",
412    "DeleteScheduledQuery",
413    "UpdateScheduledQuery",
414    "StartLiveTail",
415    "ListLogGroups",
416    "ListLogGroupsForQuery",
417    "ListAggregateLogGroupSummaries",
418    "PutBearerTokenAuthentication",
419    "GetLogObject",
420    "GetLogFields",
421    "AssociateSourceToS3TableIntegration",
422    "ListSourcesForS3TableIntegration",
423    "DisassociateSourceFromS3TableIntegration",
424    "UpdateDeliveryConfiguration",
425    "DescribeConfigurationTemplates",
426];
427
428fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
429    body[field].as_str().ok_or_else(|| {
430        AwsServiceError::aws_error(
431            StatusCode::BAD_REQUEST,
432            "InvalidParameterException",
433            format!("{field} is required"),
434        )
435    })
436}
437
438/// Build a delivery destination configuration JSON object, ensuring
439/// `destinationResourceArn` is always present as a string (Smithy requirement).
440fn dd_config_json(config: &std::collections::BTreeMap<String, String>) -> Value {
441    let mut m: serde_json::Map<String, Value> =
442        config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
443    m.entry("destinationResourceArn".to_string())
444        .or_insert_with(|| json!(""));
445    Value::Object(m)
446}
447
448/// Infer a delivery destination's type from its destination resource ARN's
449/// service. Defaults to `CWL` (the most common case) when the ARN is absent or
450/// unrecognised.
451pub fn infer_delivery_destination_type(destination_arn: Option<&String>) -> String {
452    let arn = destination_arn.map(String::as_str).unwrap_or("");
453    let service = arn.split(':').nth(2).unwrap_or("");
454    match service {
455        "s3" => "S3",
456        "firehose" => "FH",
457        "xray" => "XRAY",
458        _ => "CWL",
459    }
460    .to_string()
461}
462
463fn generate_sequence_token() -> String {
464    use std::time::{SystemTime, UNIX_EPOCH};
465    let nanos = SystemTime::now()
466        .duration_since(UNIX_EPOCH)
467        .unwrap_or_default()
468        .as_nanos();
469    // u128 max is ~3.4e38, so we limit to 38 digits to avoid overflow
470    format!("{:038}", nanos % 10u128.pow(38))
471}
472
473fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
474    AwsServiceError::aws_error(
475        StatusCode::BAD_REQUEST,
476        "InvalidParameterException",
477        format!(
478            "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
479        ),
480    )
481}
482
483/// Resolve log group name from either logGroupName or resourceIdentifier.
484/// resourceIdentifier can be a log group name or an ARN.
485fn resolve_log_group_name(
486    log_group_name: Option<&str>,
487    resource_identifier: Option<&str>,
488) -> Result<String, AwsServiceError> {
489    if let Some(identifier) = resource_identifier {
490        if identifier.starts_with("arn:") {
491            extract_log_group_from_arn(identifier).ok_or_else(|| {
492                AwsServiceError::aws_error(
493                    StatusCode::BAD_REQUEST,
494                    "InvalidParameterException",
495                    format!("Invalid ARN: {identifier}"),
496                )
497            })
498        } else {
499            Ok(identifier.to_string())
500        }
501    } else if let Some(name) = log_group_name {
502        Ok(name.to_string())
503    } else {
504        Err(AwsServiceError::aws_error(
505            StatusCode::BAD_REQUEST,
506            "InvalidParameterException",
507            "Either logGroupName or resourceIdentifier is required",
508        ))
509    }
510}
511
512/// Extract log group name from ARN like "arn:aws:logs:region:account:log-group:name:*"
513pub(crate) fn extract_log_group_from_arn(arn: &str) -> Option<String> {
514    // arn:aws:logs:region:account:log-group:name:*
515    let parts: Vec<&str> = arn.splitn(7, ':').collect();
516    if parts.len() >= 7 && parts[5] == "log-group" {
517        let name = parts[6].strip_suffix(":*").unwrap_or(parts[6]);
518        Some(name.to_string())
519    } else {
520        None
521    }
522}
523
524/// CloudWatch Logs filter pattern matching.
525///
526/// Rules:
527/// - Empty pattern or patterns starting with `{` (JSON patterns) match everything
528/// - Quoted string `"foo bar"` matches the exact substring
529/// - Multiple unquoted words `foo bar` means ALL words must appear anywhere in the message
530/// - Single unquoted word `foo` is a simple substring match
531fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
532    let pattern = pattern.trim();
533
534    // Empty pattern matches everything
535    if pattern.is_empty() {
536        return true;
537    }
538
539    // JSON `{ ... }` (incl. `||`) and array `[ ... ]` patterns use the full
540    // filter-pattern engine, so FilterLogEvents matches them the same way
541    // metric-filter ingest does instead of failing closed.
542    if (pattern.starts_with('{') && pattern.ends_with('}')) || pattern.starts_with('[') {
543        return crate::filter_pattern::matches(pattern, message);
544    }
545
546    // Quoted pattern: exact substring match (handles escaped inner quotes)
547    if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
548        let inner = &pattern[1..pattern.len() - 1];
549        // Unescape inner quotes: \"  ->  "
550        let unescaped = inner.replace("\\\"", "\"");
551        return message.contains(&unescaped);
552    }
553
554    // Multiple words: all must be present (AND semantics)
555    let terms = parse_filter_terms(pattern);
556    terms.iter().all(|term| message.contains(term.as_str()))
557}
558
559/// Parse filter pattern terms, respecting quoted strings as single terms.
560fn parse_filter_terms(pattern: &str) -> Vec<String> {
561    let mut terms = Vec::new();
562    let mut chars = pattern.chars().peekable();
563
564    while chars.peek().is_some() {
565        // Skip whitespace
566        while chars.peek().is_some_and(|c| c.is_whitespace()) {
567            chars.next();
568        }
569
570        if chars.peek().is_none() {
571            break;
572        }
573
574        if chars.peek() == Some(&'"') {
575            // Quoted term
576            chars.next(); // consume opening quote
577            let mut term = String::new();
578            loop {
579                match chars.next() {
580                    Some('\\') => {
581                        if let Some(c) = chars.next() {
582                            term.push(c);
583                        }
584                    }
585                    Some('"') => break,
586                    Some(c) => term.push(c),
587                    None => break,
588                }
589            }
590            terms.push(term);
591        } else {
592            // Unquoted term
593            let mut term = String::new();
594            while chars.peek().is_some_and(|c| !c.is_whitespace()) {
595                term.push(chars.next().unwrap());
596            }
597            if !term.is_empty() {
598                terms.push(term);
599            }
600        }
601    }
602
603    terms
604}
605
606#[cfg(test)]
607pub(crate) mod test_helpers {
608    use super::*;
609    use bytes::Bytes;
610    use fakecloud_core::delivery::DeliveryBus;
611    use http::{HeaderMap, Method};
612    use std::collections::HashMap;
613    use std::sync::Arc;
614
615    pub fn make_service() -> LogsService {
616        let state = Arc::new(parking_lot::RwLock::new(
617            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
618        ));
619        let delivery_bus = Arc::new(DeliveryBus::new());
620        LogsService::new(state, delivery_bus)
621    }
622
623    pub fn make_request(
624        action: &str,
625        body: serde_json::Value,
626    ) -> fakecloud_core::service::AwsRequest {
627        fakecloud_core::service::AwsRequest {
628            service: "logs".to_string(),
629            action: action.to_string(),
630            region: "us-east-1".to_string(),
631            account_id: "123456789012".to_string(),
632            request_id: "test-request-id".to_string(),
633            headers: HeaderMap::new(),
634            query_params: HashMap::new(),
635            body: Bytes::from(serde_json::to_vec(&body).unwrap()),
636            body_stream: parking_lot::Mutex::new(None),
637            path_segments: vec![],
638            raw_path: "/".to_string(),
639            raw_query: String::new(),
640            method: Method::POST,
641            is_query_protocol: false,
642            access_key_id: None,
643            principal: None,
644        }
645    }
646
647    pub fn create_group(svc: &LogsService, name: &str) {
648        let req = make_request(
649            "CreateLogGroup",
650            serde_json::json!({ "logGroupName": name }),
651        );
652        svc.create_log_group(&req).unwrap();
653    }
654
655    pub fn create_stream(svc: &LogsService, group: &str, stream: &str) {
656        let req = make_request(
657            "CreateLogStream",
658            serde_json::json!({ "logGroupName": group, "logStreamName": stream }),
659        );
660        svc.create_log_stream(&req).unwrap();
661    }
662
663    pub fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
664        let now = chrono::Utc::now().timestamp_millis();
665        let events: Vec<serde_json::Value> = messages
666            .iter()
667            .enumerate()
668            .map(|(i, msg)| serde_json::json!({ "timestamp": now + i as i64, "message": msg }))
669            .collect();
670        let req = make_request(
671            "PutLogEvents",
672            serde_json::json!({
673                "logGroupName": group,
674                "logStreamName": stream,
675                "logEvents": events,
676            }),
677        );
678        svc.put_log_events(&req).unwrap();
679    }
680
681    pub fn put_events_at(
682        svc: &LogsService,
683        group: &str,
684        stream: &str,
685        messages: &[&str],
686        timestamp: i64,
687    ) {
688        let events: Vec<serde_json::Value> = messages
689            .iter()
690            .enumerate()
691            .map(
692                |(i, msg)| serde_json::json!({ "timestamp": timestamp + i as i64, "message": msg }),
693            )
694            .collect();
695        let req = make_request(
696            "PutLogEvents",
697            serde_json::json!({
698                "logGroupName": group,
699                "logStreamName": stream,
700                "logEvents": events,
701            }),
702        );
703        svc.put_log_events(&req).unwrap();
704    }
705
706    pub fn put_retention(svc: &LogsService, group: &str, days: i32) {
707        let req = make_request(
708            "PutRetentionPolicy",
709            serde_json::json!({ "logGroupName": group, "retentionInDays": days }),
710        );
711        svc.put_retention_policy(&req).unwrap();
712    }
713
714    // bug-audit 2026-06-27, T1.14: FilterLogEvents now evaluates array `[...]`
715    // patterns through the full engine (positional token match) instead of
716    // failing closed.
717    #[test]
718    fn array_filter_pattern_matches_positionally() {
719        // Bare-name fields match any token in their slot.
720        assert!(matches_filter_pattern("[w1, w2, w3]", "some log message"));
721        // A literal-equality field that doesn't match the token fails.
722        assert!(!matches_filter_pattern(
723            "[w1=ERROR, w2, w3]",
724            "INFO log message"
725        ));
726        // Wrong arity (4 tokens vs 3 fields) doesn't match.
727        assert!(!matches_filter_pattern("[w1, w2, w3]", "a b c d"));
728    }
729
730    // JSON patterns with `||` (OR) now match via the full engine.
731    #[test]
732    fn json_filter_pattern_supports_or() {
733        let msg = r#"{"level":"ERROR","code":500}"#;
734        assert!(matches_filter_pattern(
735            "{ $.level = \"WARN\" || $.code = 500 }",
736            msg
737        ));
738        assert!(!matches_filter_pattern(
739            "{ $.level = \"WARN\" || $.code = 200 }",
740            msg
741        ));
742    }
743
744    /// No snapshot store (memory mode) -> no persist hook for the CFN provisioner.
745    #[test]
746    fn snapshot_hook_is_none_without_store() {
747        let svc = make_service();
748        assert!(svc.snapshot_hook().is_none());
749    }
750
751    /// With a store, the hook is present and invoking it runs the whole-state
752    /// persist path the CloudFormation provisioner uses after mutating logs
753    /// state directly.
754    #[tokio::test]
755    async fn snapshot_hook_fires_with_store() {
756        let store: Arc<dyn fakecloud_persistence::SnapshotStore> =
757            Arc::new(fakecloud_persistence::MemorySnapshotStore::new());
758        let svc = make_service().with_snapshot_store(store);
759        let hook = svc
760            .snapshot_hook()
761            .expect("hook present when a store is set");
762        // Must not panic; exercises the closure and the snapshot save path.
763        hook().await;
764    }
765}