Skip to main content

fakecloud_logs/service/
mod.rs

1use async_trait::async_trait;
2use http::StatusCode;
3use serde_json::{json, Value};
4
5use std::sync::Arc;
6
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{LogsSnapshot, SharedLogsState, LOGS_SNAPSHOT_SCHEMA_VERSION};
14
15mod anomaly;
16mod deliveries;
17mod destinations;
18mod exports;
19mod filters;
20mod groups;
21mod misc;
22mod policies;
23mod queries;
24mod streams;
25mod tags;
26
27/// CloudWatch Logs actions that do NOT mutate state. Everything else
28/// triggers a snapshot save on HTTP 2xx.
29fn is_read_only_action(action: &str) -> bool {
30    matches!(
31        action,
32        "DescribeLogGroups"
33            | "DescribeLogStreams"
34            | "GetLogEvents"
35            | "FilterLogEvents"
36            | "ListTagsLogGroup"
37            | "ListTagsForResource"
38            | "DescribeSubscriptionFilters"
39            | "DescribeMetricFilters"
40            | "DescribeResourcePolicies"
41            | "DescribeDestinations"
42            | "GetQueryResults"
43            | "DescribeQueries"
44            | "DescribeExportTasks"
45            | "GetDeliveryDestination"
46            | "DescribeDeliveryDestinations"
47            | "GetDeliveryDestinationPolicy"
48            | "GetDeliverySource"
49            | "DescribeDeliverySources"
50            | "GetDelivery"
51            | "DescribeDeliveries"
52            | "DescribeQueryDefinitions"
53            | "DescribeAccountPolicies"
54            | "GetDataProtectionPolicy"
55            | "DescribeIndexPolicies"
56            | "DescribeFieldIndexes"
57            | "GetTransformer"
58            | "TestTransformer"
59            | "GetLogAnomalyDetector"
60            | "ListLogAnomalyDetectors"
61            | "GetLogGroupFields"
62            | "TestMetricFilter"
63            | "GetLogRecord"
64            | "ListAnomalies"
65            | "DescribeImportTasks"
66            | "DescribeImportTaskBatches"
67            | "GetIntegration"
68            | "ListIntegrations"
69            | "GetLookupTable"
70            | "DescribeLookupTables"
71            | "GetScheduledQuery"
72            | "GetScheduledQueryHistory"
73            | "ListScheduledQueries"
74            | "StartLiveTail"
75            | "ListLogGroups"
76            | "ListLogGroupsForQuery"
77            | "ListAggregateLogGroupSummaries"
78            | "GetLogObject"
79            | "GetLogFields"
80            | "ListSourcesForS3TableIntegration"
81            | "DescribeConfigurationTemplates"
82            | "GetExportedData"
83    )
84}
85
86pub struct LogsService {
87    state: SharedLogsState,
88    delivery_bus: Arc<DeliveryBus>,
89    snapshot_store: Option<Arc<dyn SnapshotStore>>,
90    snapshot_lock: Arc<AsyncMutex<()>>,
91}
92
93impl LogsService {
94    pub fn new(state: SharedLogsState, delivery_bus: Arc<DeliveryBus>) -> Self {
95        Self {
96            state,
97            delivery_bus,
98            snapshot_store: None,
99            snapshot_lock: Arc::new(AsyncMutex::new(())),
100        }
101    }
102
103    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
104        self.snapshot_store = Some(store);
105        self
106    }
107
108    /// Persist current state as a snapshot. Held across the
109    /// clone-serialize-write sequence to prevent stale-last writes,
110    /// with serde + file I/O offloaded to the blocking pool.
111    async fn save_snapshot(&self) {
112        let Some(store) = self.snapshot_store.clone() else {
113            return;
114        };
115        let _guard = self.snapshot_lock.lock().await;
116        let snapshot = LogsSnapshot {
117            schema_version: LOGS_SNAPSHOT_SCHEMA_VERSION,
118            accounts: Some(self.state.read().clone()),
119            state: None,
120        };
121        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
122            let bytes = serde_json::to_vec(&snapshot)
123                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
124            store.save(&bytes)
125        })
126        .await;
127        match join {
128            Ok(Ok(())) => {}
129            Ok(Err(err)) => tracing::error!(%err, "failed to write logs snapshot"),
130            Err(err) => tracing::error!(%err, "logs snapshot task panicked"),
131        }
132    }
133}
134
135#[async_trait]
136impl AwsService for LogsService {
137    fn service_name(&self) -> &str {
138        "logs"
139    }
140
141    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
142        let mutates = !is_read_only_action(req.action.as_str());
143        let result = match req.action.as_str() {
144            "CreateLogGroup" => self.create_log_group(&req),
145            "DeleteLogGroup" => self.delete_log_group(&req),
146            "DescribeLogGroups" => self.describe_log_groups(&req),
147            "CreateLogStream" => self.create_log_stream(&req),
148            "DeleteLogStream" => self.delete_log_stream(&req),
149            "DescribeLogStreams" => self.describe_log_streams(&req),
150            "PutLogEvents" => self.put_log_events(&req),
151            "GetLogEvents" => self.get_log_events(&req),
152            "FilterLogEvents" => self.filter_log_events(&req),
153            "TagLogGroup" => self.tag_log_group(&req),
154            "UntagLogGroup" => self.untag_log_group(&req),
155            "ListTagsLogGroup" => self.list_tags_log_group(&req),
156            "TagResource" => self.tag_resource(&req),
157            "UntagResource" => self.untag_resource(&req),
158            "ListTagsForResource" => self.list_tags_for_resource(&req),
159            "PutRetentionPolicy" => self.put_retention_policy(&req),
160            "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
161            "PutSubscriptionFilter" => self.put_subscription_filter(&req),
162            "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
163            "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
164            "PutMetricFilter" => self.put_metric_filter(&req),
165            "DescribeMetricFilters" => self.describe_metric_filters(&req),
166            "DeleteMetricFilter" => self.delete_metric_filter(&req),
167            "PutResourcePolicy" => self.put_resource_policy(&req),
168            "DescribeResourcePolicies" => self.describe_resource_policies(&req),
169            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
170            "PutDestination" => self.put_destination(&req),
171            "DescribeDestinations" => self.describe_destinations(&req),
172            "DeleteDestination" => self.delete_destination(&req),
173            "PutDestinationPolicy" => self.put_destination_policy(&req),
174            "StartQuery" => self.start_query(&req),
175            "GetQueryResults" => self.get_query_results(&req),
176            "DescribeQueries" => self.describe_queries(&req),
177            "CreateExportTask" => self.create_export_task(&req),
178            "DescribeExportTasks" => self.describe_export_tasks(&req),
179            "CancelExportTask" => self.cancel_export_task(&req),
180            "PutDeliveryDestination" => self.put_delivery_destination(&req),
181            "GetDeliveryDestination" => self.get_delivery_destination(&req),
182            "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
183            "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
184            "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
185            "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
186            "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
187            "PutDeliverySource" => self.put_delivery_source(&req),
188            "GetDeliverySource" => self.get_delivery_source(&req),
189            "DescribeDeliverySources" => self.describe_delivery_sources(&req),
190            "DeleteDeliverySource" => self.delete_delivery_source(&req),
191            "CreateDelivery" => self.create_delivery(&req),
192            "GetDelivery" => self.get_delivery(&req),
193            "DescribeDeliveries" => self.describe_deliveries(&req),
194            "DeleteDelivery" => self.delete_delivery(&req),
195            "AssociateKmsKey" => self.associate_kms_key(&req),
196            "DisassociateKmsKey" => self.disassociate_kms_key(&req),
197            "PutQueryDefinition" => self.put_query_definition(&req),
198            "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
199            "DeleteQueryDefinition" => self.delete_query_definition(&req),
200            "PutAccountPolicy" => self.put_account_policy(&req),
201            "DescribeAccountPolicies" => self.describe_account_policies(&req),
202            "DeleteAccountPolicy" => self.delete_account_policy(&req),
203            "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
204            "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
205            "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
206            "PutIndexPolicy" => self.put_index_policy(&req),
207            "DescribeIndexPolicies" => self.describe_index_policies(&req),
208            "DeleteIndexPolicy" => self.delete_index_policy(&req),
209            "DescribeFieldIndexes" => self.describe_field_indexes(&req),
210            "PutTransformer" => self.put_transformer(&req),
211            "GetTransformer" => self.get_transformer(&req),
212            "DeleteTransformer" => self.delete_transformer(&req),
213            "TestTransformer" => self.test_transformer(&req),
214            "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
215            "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
216            "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
217            "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
218            "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
219            "GetLogGroupFields" => self.get_log_group_fields(&req),
220            "TestMetricFilter" => self.test_metric_filter(&req),
221            "StopQuery" => self.stop_query(&req),
222            "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
223            "GetLogRecord" => self.get_log_record(&req),
224            "ListAnomalies" => self.list_anomalies(&req),
225            "UpdateAnomaly" => self.update_anomaly(&req),
226            "CreateImportTask" => self.create_import_task(&req),
227            "DescribeImportTasks" => self.describe_import_tasks(&req),
228            "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
229            "CancelImportTask" => self.cancel_import_task(&req),
230            "PutIntegration" => self.put_integration(&req),
231            "GetIntegration" => self.get_integration(&req),
232            "DeleteIntegration" => self.delete_integration(&req),
233            "ListIntegrations" => self.list_integrations(&req),
234            "CreateLookupTable" => self.create_lookup_table(&req),
235            "GetLookupTable" => self.get_lookup_table(&req),
236            "DescribeLookupTables" => self.describe_lookup_tables(&req),
237            "DeleteLookupTable" => self.delete_lookup_table(&req),
238            "UpdateLookupTable" => self.update_lookup_table(&req),
239            "CreateScheduledQuery" => self.create_scheduled_query(&req),
240            "GetScheduledQuery" => self.get_scheduled_query(&req),
241            "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
242            "ListScheduledQueries" => self.list_scheduled_queries(&req),
243            "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
244            "UpdateScheduledQuery" => self.update_scheduled_query(&req),
245            "StartLiveTail" => self.start_live_tail(&req),
246            "ListLogGroups" => self.list_log_groups(&req),
247            "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
248            "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
249            "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
250            "GetLogObject" => self.get_log_object(&req),
251            "GetLogFields" => self.get_log_fields(&req),
252            "AssociateSourceToS3TableIntegration" => {
253                self.associate_source_to_s3_table_integration(&req)
254            }
255            "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
256            "DisassociateSourceFromS3TableIntegration" => {
257                self.disassociate_source_from_s3_table_integration(&req)
258            }
259            "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
260            "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
261            // Internal action for testing export storage
262            "GetExportedData" => self.get_exported_data(&req),
263            _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
264        };
265        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
266            self.save_snapshot().await;
267        }
268        result
269    }
270
271    fn supported_actions(&self) -> &[&str] {
272        SUPPORTED_ACTIONS
273    }
274}
275
276const SUPPORTED_ACTIONS: &[&str] = &[
277    "CreateLogGroup",
278    "DeleteLogGroup",
279    "DescribeLogGroups",
280    "CreateLogStream",
281    "DeleteLogStream",
282    "DescribeLogStreams",
283    "PutLogEvents",
284    "GetLogEvents",
285    "FilterLogEvents",
286    "TagLogGroup",
287    "UntagLogGroup",
288    "ListTagsLogGroup",
289    "TagResource",
290    "UntagResource",
291    "ListTagsForResource",
292    "PutRetentionPolicy",
293    "DeleteRetentionPolicy",
294    "PutSubscriptionFilter",
295    "DescribeSubscriptionFilters",
296    "DeleteSubscriptionFilter",
297    "PutMetricFilter",
298    "DescribeMetricFilters",
299    "DeleteMetricFilter",
300    "PutResourcePolicy",
301    "DescribeResourcePolicies",
302    "DeleteResourcePolicy",
303    "PutDestination",
304    "DescribeDestinations",
305    "DeleteDestination",
306    "PutDestinationPolicy",
307    "StartQuery",
308    "GetQueryResults",
309    "DescribeQueries",
310    "CreateExportTask",
311    "DescribeExportTasks",
312    "CancelExportTask",
313    "PutDeliveryDestination",
314    "GetDeliveryDestination",
315    "DescribeDeliveryDestinations",
316    "DeleteDeliveryDestination",
317    "PutDeliveryDestinationPolicy",
318    "GetDeliveryDestinationPolicy",
319    "DeleteDeliveryDestinationPolicy",
320    "PutDeliverySource",
321    "GetDeliverySource",
322    "DescribeDeliverySources",
323    "DeleteDeliverySource",
324    "CreateDelivery",
325    "GetDelivery",
326    "DescribeDeliveries",
327    "DeleteDelivery",
328    "AssociateKmsKey",
329    "DisassociateKmsKey",
330    "PutQueryDefinition",
331    "DescribeQueryDefinitions",
332    "DeleteQueryDefinition",
333    "PutAccountPolicy",
334    "DescribeAccountPolicies",
335    "DeleteAccountPolicy",
336    "PutDataProtectionPolicy",
337    "GetDataProtectionPolicy",
338    "DeleteDataProtectionPolicy",
339    "PutIndexPolicy",
340    "DescribeIndexPolicies",
341    "DeleteIndexPolicy",
342    "DescribeFieldIndexes",
343    "PutTransformer",
344    "GetTransformer",
345    "DeleteTransformer",
346    "TestTransformer",
347    "CreateLogAnomalyDetector",
348    "GetLogAnomalyDetector",
349    "DeleteLogAnomalyDetector",
350    "ListLogAnomalyDetectors",
351    "UpdateLogAnomalyDetector",
352    "GetLogGroupFields",
353    "TestMetricFilter",
354    "StopQuery",
355    "PutLogGroupDeletionProtection",
356    "GetLogRecord",
357    "ListAnomalies",
358    "UpdateAnomaly",
359    "CreateImportTask",
360    "DescribeImportTasks",
361    "DescribeImportTaskBatches",
362    "CancelImportTask",
363    "PutIntegration",
364    "GetIntegration",
365    "DeleteIntegration",
366    "ListIntegrations",
367    "CreateLookupTable",
368    "GetLookupTable",
369    "DescribeLookupTables",
370    "DeleteLookupTable",
371    "UpdateLookupTable",
372    "CreateScheduledQuery",
373    "GetScheduledQuery",
374    "GetScheduledQueryHistory",
375    "ListScheduledQueries",
376    "DeleteScheduledQuery",
377    "UpdateScheduledQuery",
378    "StartLiveTail",
379    "ListLogGroups",
380    "ListLogGroupsForQuery",
381    "ListAggregateLogGroupSummaries",
382    "PutBearerTokenAuthentication",
383    "GetLogObject",
384    "GetLogFields",
385    "AssociateSourceToS3TableIntegration",
386    "ListSourcesForS3TableIntegration",
387    "DisassociateSourceFromS3TableIntegration",
388    "UpdateDeliveryConfiguration",
389    "DescribeConfigurationTemplates",
390];
391
392fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
393    body[field].as_str().ok_or_else(|| {
394        AwsServiceError::aws_error(
395            StatusCode::BAD_REQUEST,
396            "InvalidParameterException",
397            format!("{field} is required"),
398        )
399    })
400}
401
402/// Build a delivery destination configuration JSON object, ensuring
403/// `destinationResourceArn` is always present as a string (Smithy requirement).
404fn dd_config_json(config: &std::collections::BTreeMap<String, String>) -> Value {
405    let mut m: serde_json::Map<String, Value> =
406        config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
407    m.entry("destinationResourceArn".to_string())
408        .or_insert_with(|| json!(""));
409    Value::Object(m)
410}
411
412fn generate_sequence_token() -> String {
413    use std::time::{SystemTime, UNIX_EPOCH};
414    let nanos = SystemTime::now()
415        .duration_since(UNIX_EPOCH)
416        .unwrap_or_default()
417        .as_nanos();
418    // u128 max is ~3.4e38, so we limit to 38 digits to avoid overflow
419    format!("{:038}", nanos % 10u128.pow(38))
420}
421
422fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
423    AwsServiceError::aws_error(
424        StatusCode::BAD_REQUEST,
425        "InvalidParameterException",
426        format!(
427            "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
428        ),
429    )
430}
431
432/// Resolve log group name from either logGroupName or resourceIdentifier.
433/// resourceIdentifier can be a log group name or an ARN.
434fn resolve_log_group_name(
435    log_group_name: Option<&str>,
436    resource_identifier: Option<&str>,
437) -> Result<String, AwsServiceError> {
438    if let Some(identifier) = resource_identifier {
439        if identifier.starts_with("arn:") {
440            extract_log_group_from_arn(identifier).ok_or_else(|| {
441                AwsServiceError::aws_error(
442                    StatusCode::BAD_REQUEST,
443                    "InvalidParameterException",
444                    format!("Invalid ARN: {identifier}"),
445                )
446            })
447        } else {
448            Ok(identifier.to_string())
449        }
450    } else if let Some(name) = log_group_name {
451        Ok(name.to_string())
452    } else {
453        Err(AwsServiceError::aws_error(
454            StatusCode::BAD_REQUEST,
455            "InvalidParameterException",
456            "Either logGroupName or resourceIdentifier is required",
457        ))
458    }
459}
460
461/// Extract log group name from ARN like "arn:aws:logs:region:account:log-group:name:*"
462pub(crate) fn extract_log_group_from_arn(arn: &str) -> Option<String> {
463    // arn:aws:logs:region:account:log-group:name:*
464    let parts: Vec<&str> = arn.splitn(7, ':').collect();
465    if parts.len() >= 7 && parts[5] == "log-group" {
466        let name = parts[6].strip_suffix(":*").unwrap_or(parts[6]);
467        Some(name.to_string())
468    } else {
469        None
470    }
471}
472
473/// CloudWatch Logs filter pattern matching.
474///
475/// Rules:
476/// - Empty pattern or patterns starting with `{` (JSON patterns) match everything
477/// - Quoted string `"foo bar"` matches the exact substring
478/// - Multiple unquoted words `foo bar` means ALL words must appear anywhere in the message
479/// - Single unquoted word `foo` is a simple substring match
480fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
481    let pattern = pattern.trim();
482
483    // Empty pattern matches everything
484    if pattern.is_empty() {
485        return true;
486    }
487
488    // JSON/metric filter patterns: { $.field = "value" }
489    if pattern.starts_with('{') && pattern.ends_with('}') {
490        return matches_json_filter_pattern(pattern, message);
491    }
492
493    // Array-style metric filter patterns - not implemented, fail closed
494    if pattern.starts_with('[') {
495        return false;
496    }
497
498    // Quoted pattern: exact substring match (handles escaped inner quotes)
499    if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
500        let inner = &pattern[1..pattern.len() - 1];
501        // Unescape inner quotes: \"  ->  "
502        let unescaped = inner.replace("\\\"", "\"");
503        return message.contains(&unescaped);
504    }
505
506    // Multiple words: all must be present (AND semantics)
507    let terms = parse_filter_terms(pattern);
508    terms.iter().all(|term| message.contains(term.as_str()))
509}
510
511/// Parse filter pattern terms, respecting quoted strings as single terms.
512fn parse_filter_terms(pattern: &str) -> Vec<String> {
513    let mut terms = Vec::new();
514    let mut chars = pattern.chars().peekable();
515
516    while chars.peek().is_some() {
517        // Skip whitespace
518        while chars.peek().is_some_and(|c| c.is_whitespace()) {
519            chars.next();
520        }
521
522        if chars.peek().is_none() {
523            break;
524        }
525
526        if chars.peek() == Some(&'"') {
527            // Quoted term
528            chars.next(); // consume opening quote
529            let mut term = String::new();
530            loop {
531                match chars.next() {
532                    Some('\\') => {
533                        if let Some(c) = chars.next() {
534                            term.push(c);
535                        }
536                    }
537                    Some('"') => break,
538                    Some(c) => term.push(c),
539                    None => break,
540                }
541            }
542            terms.push(term);
543        } else {
544            // Unquoted term
545            let mut term = String::new();
546            while chars.peek().is_some_and(|c| !c.is_whitespace()) {
547                term.push(chars.next().unwrap());
548            }
549            if !term.is_empty() {
550                terms.push(term);
551            }
552        }
553    }
554
555    terms
556}
557
558/// Match a JSON filter pattern like `{ $.level = "ERROR" }` against a message.
559fn matches_json_filter_pattern(pattern: &str, message: &str) -> bool {
560    // Strip the outer braces
561    let inner = pattern
562        .strip_prefix('{')
563        .and_then(|s| s.strip_suffix('}'))
564        .unwrap_or("")
565        .trim();
566
567    if inner.is_empty() {
568        return true;
569    }
570
571    // Parse the message as JSON
572    let msg_json: serde_json::Value = match serde_json::from_str(message) {
573        Ok(v) => v,
574        Err(_) => return false, // Non-JSON message cannot match JSON filter
575    };
576
577    // Support: $.field = "value", $.field != "value", $.field = number,
578    //          $.field > number, $.field < number, $.field >= number, $.field <= number
579    // Also support && for multiple conditions
580    let conditions: Vec<&str> = inner.split("&&").collect();
581
582    for condition in conditions {
583        let condition = condition.trim();
584        if !matches_single_json_condition(condition, &msg_json) {
585            return false;
586        }
587    }
588
589    true
590}
591
592fn matches_single_json_condition(condition: &str, json: &serde_json::Value) -> bool {
593    // Try to parse: $.field op value
594    let condition = condition.trim();
595
596    // Find the operator
597    let ops = ["!=", ">=", "<=", "=", ">", "<"];
598    let mut found_op = None;
599    let mut op_pos = 0;
600    let mut op_len = 0;
601
602    for op in &ops {
603        if let Some(pos) = condition.find(op) {
604            // Make sure we're not inside a quoted string
605            let before = &condition[..pos];
606            let quote_count = before.chars().filter(|&c| c == '"').count();
607            if quote_count % 2 == 0 {
608                found_op = Some(*op);
609                op_pos = pos;
610                op_len = op.len();
611                break;
612            }
613        }
614    }
615
616    let (op, field_part, value_part) = match found_op {
617        Some(op) => (
618            op,
619            condition[..op_pos].trim(),
620            condition[op_pos + op_len..].trim(),
621        ),
622        None => {
623            // No operator: just check if the field exists
624            // Pattern like `{ $.field }` means field exists
625            if let Some(path) = condition.strip_prefix("$.") {
626                return resolve_json_path_simple(json, path).is_some();
627            }
628            return true;
629        }
630    };
631
632    // Extract JSON path from field_part (must start with $.)
633    let path = match field_part.strip_prefix("$.") {
634        Some(p) => p,
635        None => return false, // Don't understand this pattern, fail closed
636    };
637
638    let actual_value = match resolve_json_path_simple(json, path) {
639        Some(v) => v,
640        None => return op == "!=", // field doesn't exist: only != matches
641    };
642
643    // Parse the expected value
644    let expected_str = if value_part.starts_with('"') && value_part.ends_with('"') {
645        // String comparison
646        let s = &value_part[1..value_part.len() - 1];
647        match op {
648            "=" => actual_value.as_str() == Some(s),
649            "!=" => actual_value.as_str() != Some(s),
650            _ => false,
651        }
652    } else if let Ok(expected_num) = value_part.parse::<f64>() {
653        // Numeric comparison
654        let actual_num = actual_value.as_f64();
655        match (op, actual_num) {
656            ("=", Some(n)) => (n - expected_num).abs() < f64::EPSILON,
657            ("!=", Some(n)) => (n - expected_num).abs() >= f64::EPSILON,
658            (">", Some(n)) => n > expected_num,
659            ("<", Some(n)) => n < expected_num,
660            (">=", Some(n)) => n >= expected_num,
661            ("<=", Some(n)) => n <= expected_num,
662            _ => false,
663        }
664    } else if value_part == "true" || value_part == "false" {
665        let expected_bool = value_part == "true";
666        match op {
667            "=" => actual_value.as_bool() == Some(expected_bool),
668            "!=" => actual_value.as_bool() != Some(expected_bool),
669            _ => false,
670        }
671    } else {
672        false // Unknown value format, fail closed
673    };
674
675    expected_str
676}
677
678/// Resolve a simple dot-separated JSON path (e.g., "level" or "nested.field").
679fn resolve_json_path_simple<'a>(
680    json: &'a serde_json::Value,
681    path: &str,
682) -> Option<&'a serde_json::Value> {
683    let mut current = json;
684    for part in path.split('.') {
685        current = current.get(part)?;
686    }
687    if current.is_null() {
688        None
689    } else {
690        Some(current)
691    }
692}
693
694#[cfg(test)]
695pub(crate) mod test_helpers {
696    use super::*;
697    use bytes::Bytes;
698    use fakecloud_core::delivery::DeliveryBus;
699    use http::{HeaderMap, Method};
700    use std::collections::HashMap;
701    use std::sync::Arc;
702
703    pub fn make_service() -> LogsService {
704        let state = Arc::new(parking_lot::RwLock::new(
705            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
706        ));
707        let delivery_bus = Arc::new(DeliveryBus::new());
708        LogsService::new(state, delivery_bus)
709    }
710
711    pub fn make_request(
712        action: &str,
713        body: serde_json::Value,
714    ) -> fakecloud_core::service::AwsRequest {
715        fakecloud_core::service::AwsRequest {
716            service: "logs".to_string(),
717            action: action.to_string(),
718            region: "us-east-1".to_string(),
719            account_id: "123456789012".to_string(),
720            request_id: "test-request-id".to_string(),
721            headers: HeaderMap::new(),
722            query_params: HashMap::new(),
723            body: Bytes::from(serde_json::to_vec(&body).unwrap()),
724            body_stream: parking_lot::Mutex::new(None),
725            path_segments: vec![],
726            raw_path: "/".to_string(),
727            raw_query: String::new(),
728            method: Method::POST,
729            is_query_protocol: false,
730            access_key_id: None,
731            principal: None,
732        }
733    }
734
735    pub fn create_group(svc: &LogsService, name: &str) {
736        let req = make_request(
737            "CreateLogGroup",
738            serde_json::json!({ "logGroupName": name }),
739        );
740        svc.create_log_group(&req).unwrap();
741    }
742
743    pub fn create_stream(svc: &LogsService, group: &str, stream: &str) {
744        let req = make_request(
745            "CreateLogStream",
746            serde_json::json!({ "logGroupName": group, "logStreamName": stream }),
747        );
748        svc.create_log_stream(&req).unwrap();
749    }
750
751    pub fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
752        let now = chrono::Utc::now().timestamp_millis();
753        let events: Vec<serde_json::Value> = messages
754            .iter()
755            .enumerate()
756            .map(|(i, msg)| serde_json::json!({ "timestamp": now + i as i64, "message": msg }))
757            .collect();
758        let req = make_request(
759            "PutLogEvents",
760            serde_json::json!({
761                "logGroupName": group,
762                "logStreamName": stream,
763                "logEvents": events,
764            }),
765        );
766        svc.put_log_events(&req).unwrap();
767    }
768
769    pub fn put_events_at(
770        svc: &LogsService,
771        group: &str,
772        stream: &str,
773        messages: &[&str],
774        timestamp: i64,
775    ) {
776        let events: Vec<serde_json::Value> = messages
777            .iter()
778            .enumerate()
779            .map(
780                |(i, msg)| serde_json::json!({ "timestamp": timestamp + i as i64, "message": msg }),
781            )
782            .collect();
783        let req = make_request(
784            "PutLogEvents",
785            serde_json::json!({
786                "logGroupName": group,
787                "logStreamName": stream,
788                "logEvents": events,
789            }),
790        );
791        svc.put_log_events(&req).unwrap();
792    }
793
794    pub fn put_retention(svc: &LogsService, group: &str, days: i32) {
795        let req = make_request(
796            "PutRetentionPolicy",
797            serde_json::json!({ "logGroupName": group, "retentionInDays": days }),
798        );
799        svc.put_retention_policy(&req).unwrap();
800    }
801
802    #[test]
803    fn array_filter_pattern_does_not_match() {
804        assert!(
805            !matches_filter_pattern("[w1, w2, w3]", "some log message"),
806            "array-style filter pattern must not match (fail closed)"
807        );
808    }
809
810    #[test]
811    fn unrecognized_json_filter_path_does_not_match() {
812        // A JSON filter condition where the field part doesn't start with $.
813        // should fail closed instead of matching everything.
814        assert!(
815            !matches_single_json_condition(
816                "level = \"ERROR\"",
817                &serde_json::json!({"level": "ERROR"}),
818            ),
819            "filter condition without $. prefix must not match (fail closed)"
820        );
821    }
822
823    #[test]
824    fn unknown_value_format_does_not_match() {
825        // A value that is not a string, number, or boolean should fail closed.
826        assert!(
827            !matches_single_json_condition(
828                "$.level = ERROR",
829                &serde_json::json!({"level": "ERROR"}),
830            ),
831            "unquoted non-numeric non-boolean value must not match (fail closed)"
832        );
833    }
834}