Skip to main content

fakecloud_logs/service/
mod.rs

1use async_trait::async_trait;
2use http::StatusCode;
3use serde_json::{json, Value};
4
5use std::sync::Arc;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9
10use crate::state::SharedLogsState;
11
12mod anomaly;
13mod deliveries;
14mod destinations;
15mod exports;
16mod filters;
17mod groups;
18mod misc;
19mod policies;
20mod queries;
21mod streams;
22mod tags;
23
24pub struct LogsService {
25    state: SharedLogsState,
26    delivery_bus: Arc<DeliveryBus>,
27}
28
29impl LogsService {
30    pub fn new(state: SharedLogsState, delivery_bus: Arc<DeliveryBus>) -> Self {
31        Self {
32            state,
33            delivery_bus,
34        }
35    }
36}
37
38#[async_trait]
39impl AwsService for LogsService {
40    fn service_name(&self) -> &str {
41        "logs"
42    }
43
44    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
45        match req.action.as_str() {
46            "CreateLogGroup" => self.create_log_group(&req),
47            "DeleteLogGroup" => self.delete_log_group(&req),
48            "DescribeLogGroups" => self.describe_log_groups(&req),
49            "CreateLogStream" => self.create_log_stream(&req),
50            "DeleteLogStream" => self.delete_log_stream(&req),
51            "DescribeLogStreams" => self.describe_log_streams(&req),
52            "PutLogEvents" => self.put_log_events(&req),
53            "GetLogEvents" => self.get_log_events(&req),
54            "FilterLogEvents" => self.filter_log_events(&req),
55            "TagLogGroup" => self.tag_log_group(&req),
56            "UntagLogGroup" => self.untag_log_group(&req),
57            "ListTagsLogGroup" => self.list_tags_log_group(&req),
58            "TagResource" => self.tag_resource(&req),
59            "UntagResource" => self.untag_resource(&req),
60            "ListTagsForResource" => self.list_tags_for_resource(&req),
61            "PutRetentionPolicy" => self.put_retention_policy(&req),
62            "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
63            "PutSubscriptionFilter" => self.put_subscription_filter(&req),
64            "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
65            "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
66            "PutMetricFilter" => self.put_metric_filter(&req),
67            "DescribeMetricFilters" => self.describe_metric_filters(&req),
68            "DeleteMetricFilter" => self.delete_metric_filter(&req),
69            "PutResourcePolicy" => self.put_resource_policy(&req),
70            "DescribeResourcePolicies" => self.describe_resource_policies(&req),
71            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
72            "PutDestination" => self.put_destination(&req),
73            "DescribeDestinations" => self.describe_destinations(&req),
74            "DeleteDestination" => self.delete_destination(&req),
75            "PutDestinationPolicy" => self.put_destination_policy(&req),
76            "StartQuery" => self.start_query(&req),
77            "GetQueryResults" => self.get_query_results(&req),
78            "DescribeQueries" => self.describe_queries(&req),
79            "CreateExportTask" => self.create_export_task(&req),
80            "DescribeExportTasks" => self.describe_export_tasks(&req),
81            "CancelExportTask" => self.cancel_export_task(&req),
82            "PutDeliveryDestination" => self.put_delivery_destination(&req),
83            "GetDeliveryDestination" => self.get_delivery_destination(&req),
84            "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
85            "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
86            "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
87            "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
88            "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
89            "PutDeliverySource" => self.put_delivery_source(&req),
90            "GetDeliverySource" => self.get_delivery_source(&req),
91            "DescribeDeliverySources" => self.describe_delivery_sources(&req),
92            "DeleteDeliverySource" => self.delete_delivery_source(&req),
93            "CreateDelivery" => self.create_delivery(&req),
94            "GetDelivery" => self.get_delivery(&req),
95            "DescribeDeliveries" => self.describe_deliveries(&req),
96            "DeleteDelivery" => self.delete_delivery(&req),
97            "AssociateKmsKey" => self.associate_kms_key(&req),
98            "DisassociateKmsKey" => self.disassociate_kms_key(&req),
99            "PutQueryDefinition" => self.put_query_definition(&req),
100            "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
101            "DeleteQueryDefinition" => self.delete_query_definition(&req),
102            "PutAccountPolicy" => self.put_account_policy(&req),
103            "DescribeAccountPolicies" => self.describe_account_policies(&req),
104            "DeleteAccountPolicy" => self.delete_account_policy(&req),
105            "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
106            "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
107            "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
108            "PutIndexPolicy" => self.put_index_policy(&req),
109            "DescribeIndexPolicies" => self.describe_index_policies(&req),
110            "DeleteIndexPolicy" => self.delete_index_policy(&req),
111            "DescribeFieldIndexes" => self.describe_field_indexes(&req),
112            "PutTransformer" => self.put_transformer(&req),
113            "GetTransformer" => self.get_transformer(&req),
114            "DeleteTransformer" => self.delete_transformer(&req),
115            "TestTransformer" => self.test_transformer(&req),
116            "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
117            "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
118            "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
119            "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
120            "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
121            "GetLogGroupFields" => self.get_log_group_fields(&req),
122            "TestMetricFilter" => self.test_metric_filter(&req),
123            "StopQuery" => self.stop_query(&req),
124            "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
125            "GetLogRecord" => self.get_log_record(&req),
126            "ListAnomalies" => self.list_anomalies(&req),
127            "UpdateAnomaly" => self.update_anomaly(&req),
128            "CreateImportTask" => self.create_import_task(&req),
129            "DescribeImportTasks" => self.describe_import_tasks(&req),
130            "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
131            "CancelImportTask" => self.cancel_import_task(&req),
132            "PutIntegration" => self.put_integration(&req),
133            "GetIntegration" => self.get_integration(&req),
134            "DeleteIntegration" => self.delete_integration(&req),
135            "ListIntegrations" => self.list_integrations(&req),
136            "CreateLookupTable" => self.create_lookup_table(&req),
137            "GetLookupTable" => self.get_lookup_table(&req),
138            "DescribeLookupTables" => self.describe_lookup_tables(&req),
139            "DeleteLookupTable" => self.delete_lookup_table(&req),
140            "UpdateLookupTable" => self.update_lookup_table(&req),
141            "CreateScheduledQuery" => self.create_scheduled_query(&req),
142            "GetScheduledQuery" => self.get_scheduled_query(&req),
143            "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
144            "ListScheduledQueries" => self.list_scheduled_queries(&req),
145            "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
146            "UpdateScheduledQuery" => self.update_scheduled_query(&req),
147            "StartLiveTail" => self.start_live_tail(&req),
148            "ListLogGroups" => self.list_log_groups(&req),
149            "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
150            "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
151            "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
152            "GetLogObject" => self.get_log_object(&req),
153            "GetLogFields" => self.get_log_fields(&req),
154            "AssociateSourceToS3TableIntegration" => {
155                self.associate_source_to_s3_table_integration(&req)
156            }
157            "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
158            "DisassociateSourceFromS3TableIntegration" => {
159                self.disassociate_source_from_s3_table_integration(&req)
160            }
161            "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
162            "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
163            // Internal action for testing export storage
164            "GetExportedData" => self.get_exported_data(&req),
165            _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
166        }
167    }
168
169    fn supported_actions(&self) -> &[&str] {
170        SUPPORTED_ACTIONS
171    }
172}
173
174const SUPPORTED_ACTIONS: &[&str] = &[
175    "CreateLogGroup",
176    "DeleteLogGroup",
177    "DescribeLogGroups",
178    "CreateLogStream",
179    "DeleteLogStream",
180    "DescribeLogStreams",
181    "PutLogEvents",
182    "GetLogEvents",
183    "FilterLogEvents",
184    "TagLogGroup",
185    "UntagLogGroup",
186    "ListTagsLogGroup",
187    "TagResource",
188    "UntagResource",
189    "ListTagsForResource",
190    "PutRetentionPolicy",
191    "DeleteRetentionPolicy",
192    "PutSubscriptionFilter",
193    "DescribeSubscriptionFilters",
194    "DeleteSubscriptionFilter",
195    "PutMetricFilter",
196    "DescribeMetricFilters",
197    "DeleteMetricFilter",
198    "PutResourcePolicy",
199    "DescribeResourcePolicies",
200    "DeleteResourcePolicy",
201    "PutDestination",
202    "DescribeDestinations",
203    "DeleteDestination",
204    "PutDestinationPolicy",
205    "StartQuery",
206    "GetQueryResults",
207    "DescribeQueries",
208    "CreateExportTask",
209    "DescribeExportTasks",
210    "CancelExportTask",
211    "PutDeliveryDestination",
212    "GetDeliveryDestination",
213    "DescribeDeliveryDestinations",
214    "DeleteDeliveryDestination",
215    "PutDeliveryDestinationPolicy",
216    "GetDeliveryDestinationPolicy",
217    "DeleteDeliveryDestinationPolicy",
218    "PutDeliverySource",
219    "GetDeliverySource",
220    "DescribeDeliverySources",
221    "DeleteDeliverySource",
222    "CreateDelivery",
223    "GetDelivery",
224    "DescribeDeliveries",
225    "DeleteDelivery",
226    "AssociateKmsKey",
227    "DisassociateKmsKey",
228    "PutQueryDefinition",
229    "DescribeQueryDefinitions",
230    "DeleteQueryDefinition",
231    "PutAccountPolicy",
232    "DescribeAccountPolicies",
233    "DeleteAccountPolicy",
234    "PutDataProtectionPolicy",
235    "GetDataProtectionPolicy",
236    "DeleteDataProtectionPolicy",
237    "PutIndexPolicy",
238    "DescribeIndexPolicies",
239    "DeleteIndexPolicy",
240    "DescribeFieldIndexes",
241    "PutTransformer",
242    "GetTransformer",
243    "DeleteTransformer",
244    "TestTransformer",
245    "CreateLogAnomalyDetector",
246    "GetLogAnomalyDetector",
247    "DeleteLogAnomalyDetector",
248    "ListLogAnomalyDetectors",
249    "UpdateLogAnomalyDetector",
250    "GetLogGroupFields",
251    "TestMetricFilter",
252    "StopQuery",
253    "PutLogGroupDeletionProtection",
254    "GetLogRecord",
255    "ListAnomalies",
256    "UpdateAnomaly",
257    "CreateImportTask",
258    "DescribeImportTasks",
259    "DescribeImportTaskBatches",
260    "CancelImportTask",
261    "PutIntegration",
262    "GetIntegration",
263    "DeleteIntegration",
264    "ListIntegrations",
265    "CreateLookupTable",
266    "GetLookupTable",
267    "DescribeLookupTables",
268    "DeleteLookupTable",
269    "UpdateLookupTable",
270    "CreateScheduledQuery",
271    "GetScheduledQuery",
272    "GetScheduledQueryHistory",
273    "ListScheduledQueries",
274    "DeleteScheduledQuery",
275    "UpdateScheduledQuery",
276    "StartLiveTail",
277    "ListLogGroups",
278    "ListLogGroupsForQuery",
279    "ListAggregateLogGroupSummaries",
280    "PutBearerTokenAuthentication",
281    "GetLogObject",
282    "GetLogFields",
283    "AssociateSourceToS3TableIntegration",
284    "ListSourcesForS3TableIntegration",
285    "DisassociateSourceFromS3TableIntegration",
286    "UpdateDeliveryConfiguration",
287    "DescribeConfigurationTemplates",
288];
289
290fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
291    body[field].as_str().ok_or_else(|| {
292        AwsServiceError::aws_error(
293            StatusCode::BAD_REQUEST,
294            "InvalidParameterException",
295            format!("{field} is required"),
296        )
297    })
298}
299
300/// Build a delivery destination configuration JSON object, ensuring
301/// `destinationResourceArn` is always present as a string (Smithy requirement).
302fn dd_config_json(config: &std::collections::HashMap<String, String>) -> Value {
303    let mut m: serde_json::Map<String, Value> =
304        config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
305    m.entry("destinationResourceArn".to_string())
306        .or_insert_with(|| json!(""));
307    Value::Object(m)
308}
309
310fn generate_sequence_token() -> String {
311    use std::time::{SystemTime, UNIX_EPOCH};
312    let nanos = SystemTime::now()
313        .duration_since(UNIX_EPOCH)
314        .unwrap_or_default()
315        .as_nanos();
316    // u128 max is ~3.4e38, so we limit to 38 digits to avoid overflow
317    format!("{:038}", nanos % 10u128.pow(38))
318}
319
320fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
321    AwsServiceError::aws_error(
322        StatusCode::BAD_REQUEST,
323        "InvalidParameterException",
324        format!(
325            "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
326        ),
327    )
328}
329
330/// Resolve log group name from either logGroupName or resourceIdentifier.
331/// resourceIdentifier can be a log group name or an ARN.
332fn resolve_log_group_name(
333    log_group_name: Option<&str>,
334    resource_identifier: Option<&str>,
335) -> Result<String, AwsServiceError> {
336    if let Some(identifier) = resource_identifier {
337        if identifier.starts_with("arn:") {
338            extract_log_group_from_arn(identifier).ok_or_else(|| {
339                AwsServiceError::aws_error(
340                    StatusCode::BAD_REQUEST,
341                    "InvalidParameterException",
342                    format!("Invalid ARN: {identifier}"),
343                )
344            })
345        } else {
346            Ok(identifier.to_string())
347        }
348    } else if let Some(name) = log_group_name {
349        Ok(name.to_string())
350    } else {
351        Err(AwsServiceError::aws_error(
352            StatusCode::BAD_REQUEST,
353            "InvalidParameterException",
354            "Either logGroupName or resourceIdentifier is required",
355        ))
356    }
357}
358
359/// Extract log group name from ARN like "arn:aws:logs:region:account:log-group:name:*"
360fn extract_log_group_from_arn(arn: &str) -> Option<String> {
361    // arn:aws:logs:region:account:log-group:name:*
362    let parts: Vec<&str> = arn.splitn(7, ':').collect();
363    if parts.len() >= 7 && parts[5] == "log-group" {
364        let name = parts[6].strip_suffix(":*").unwrap_or(parts[6]);
365        Some(name.to_string())
366    } else {
367        None
368    }
369}
370
371/// CloudWatch Logs filter pattern matching.
372///
373/// Rules:
374/// - Empty pattern or patterns starting with `{` (JSON patterns) match everything
375/// - Quoted string `"foo bar"` matches the exact substring
376/// - Multiple unquoted words `foo bar` means ALL words must appear anywhere in the message
377/// - Single unquoted word `foo` is a simple substring match
378fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
379    let pattern = pattern.trim();
380
381    // Empty pattern matches everything
382    if pattern.is_empty() {
383        return true;
384    }
385
386    // JSON/metric filter patterns: { $.field = "value" }
387    if pattern.starts_with('{') && pattern.ends_with('}') {
388        return matches_json_filter_pattern(pattern, message);
389    }
390
391    // Array-style metric filter patterns - not implemented, fail closed
392    if pattern.starts_with('[') {
393        return false;
394    }
395
396    // Quoted pattern: exact substring match (handles escaped inner quotes)
397    if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
398        let inner = &pattern[1..pattern.len() - 1];
399        // Unescape inner quotes: \"  ->  "
400        let unescaped = inner.replace("\\\"", "\"");
401        return message.contains(&unescaped);
402    }
403
404    // Multiple words: all must be present (AND semantics)
405    let terms = parse_filter_terms(pattern);
406    terms.iter().all(|term| message.contains(term.as_str()))
407}
408
409/// Parse filter pattern terms, respecting quoted strings as single terms.
410fn parse_filter_terms(pattern: &str) -> Vec<String> {
411    let mut terms = Vec::new();
412    let mut chars = pattern.chars().peekable();
413
414    while chars.peek().is_some() {
415        // Skip whitespace
416        while chars.peek().is_some_and(|c| c.is_whitespace()) {
417            chars.next();
418        }
419
420        if chars.peek().is_none() {
421            break;
422        }
423
424        if chars.peek() == Some(&'"') {
425            // Quoted term
426            chars.next(); // consume opening quote
427            let mut term = String::new();
428            loop {
429                match chars.next() {
430                    Some('\\') => {
431                        if let Some(c) = chars.next() {
432                            term.push(c);
433                        }
434                    }
435                    Some('"') => break,
436                    Some(c) => term.push(c),
437                    None => break,
438                }
439            }
440            terms.push(term);
441        } else {
442            // Unquoted term
443            let mut term = String::new();
444            while chars.peek().is_some_and(|c| !c.is_whitespace()) {
445                term.push(chars.next().unwrap());
446            }
447            if !term.is_empty() {
448                terms.push(term);
449            }
450        }
451    }
452
453    terms
454}
455
456/// Match a JSON filter pattern like `{ $.level = "ERROR" }` against a message.
457fn matches_json_filter_pattern(pattern: &str, message: &str) -> bool {
458    // Strip the outer braces
459    let inner = pattern
460        .strip_prefix('{')
461        .and_then(|s| s.strip_suffix('}'))
462        .unwrap_or("")
463        .trim();
464
465    if inner.is_empty() {
466        return true;
467    }
468
469    // Parse the message as JSON
470    let msg_json: serde_json::Value = match serde_json::from_str(message) {
471        Ok(v) => v,
472        Err(_) => return false, // Non-JSON message cannot match JSON filter
473    };
474
475    // Support: $.field = "value", $.field != "value", $.field = number,
476    //          $.field > number, $.field < number, $.field >= number, $.field <= number
477    // Also support && for multiple conditions
478    let conditions: Vec<&str> = inner.split("&&").collect();
479
480    for condition in conditions {
481        let condition = condition.trim();
482        if !matches_single_json_condition(condition, &msg_json) {
483            return false;
484        }
485    }
486
487    true
488}
489
490fn matches_single_json_condition(condition: &str, json: &serde_json::Value) -> bool {
491    // Try to parse: $.field op value
492    let condition = condition.trim();
493
494    // Find the operator
495    let ops = ["!=", ">=", "<=", "=", ">", "<"];
496    let mut found_op = None;
497    let mut op_pos = 0;
498    let mut op_len = 0;
499
500    for op in &ops {
501        if let Some(pos) = condition.find(op) {
502            // Make sure we're not inside a quoted string
503            let before = &condition[..pos];
504            let quote_count = before.chars().filter(|&c| c == '"').count();
505            if quote_count % 2 == 0 {
506                found_op = Some(*op);
507                op_pos = pos;
508                op_len = op.len();
509                break;
510            }
511        }
512    }
513
514    let (op, field_part, value_part) = match found_op {
515        Some(op) => (
516            op,
517            condition[..op_pos].trim(),
518            condition[op_pos + op_len..].trim(),
519        ),
520        None => {
521            // No operator: just check if the field exists
522            // Pattern like `{ $.field }` means field exists
523            if let Some(path) = condition.strip_prefix("$.") {
524                return resolve_json_path_simple(json, path).is_some();
525            }
526            return true;
527        }
528    };
529
530    // Extract JSON path from field_part (must start with $.)
531    let path = match field_part.strip_prefix("$.") {
532        Some(p) => p,
533        None => return false, // Don't understand this pattern, fail closed
534    };
535
536    let actual_value = match resolve_json_path_simple(json, path) {
537        Some(v) => v,
538        None => return op == "!=", // field doesn't exist: only != matches
539    };
540
541    // Parse the expected value
542    let expected_str = if value_part.starts_with('"') && value_part.ends_with('"') {
543        // String comparison
544        let s = &value_part[1..value_part.len() - 1];
545        match op {
546            "=" => actual_value.as_str() == Some(s),
547            "!=" => actual_value.as_str() != Some(s),
548            _ => false,
549        }
550    } else if let Ok(expected_num) = value_part.parse::<f64>() {
551        // Numeric comparison
552        let actual_num = actual_value.as_f64();
553        match (op, actual_num) {
554            ("=", Some(n)) => (n - expected_num).abs() < f64::EPSILON,
555            ("!=", Some(n)) => (n - expected_num).abs() >= f64::EPSILON,
556            (">", Some(n)) => n > expected_num,
557            ("<", Some(n)) => n < expected_num,
558            (">=", Some(n)) => n >= expected_num,
559            ("<=", Some(n)) => n <= expected_num,
560            _ => false,
561        }
562    } else if value_part == "true" || value_part == "false" {
563        let expected_bool = value_part == "true";
564        match op {
565            "=" => actual_value.as_bool() == Some(expected_bool),
566            "!=" => actual_value.as_bool() != Some(expected_bool),
567            _ => false,
568        }
569    } else {
570        false // Unknown value format, fail closed
571    };
572
573    expected_str
574}
575
576/// Resolve a simple dot-separated JSON path (e.g., "level" or "nested.field").
577fn resolve_json_path_simple<'a>(
578    json: &'a serde_json::Value,
579    path: &str,
580) -> Option<&'a serde_json::Value> {
581    let mut current = json;
582    for part in path.split('.') {
583        current = current.get(part)?;
584    }
585    if current.is_null() {
586        None
587    } else {
588        Some(current)
589    }
590}
591
592#[cfg(test)]
593pub(crate) mod test_helpers {
594    use super::*;
595    use crate::state::LogsState;
596    use bytes::Bytes;
597    use fakecloud_core::delivery::DeliveryBus;
598    use http::{HeaderMap, Method};
599    use std::collections::HashMap;
600    use std::sync::Arc;
601
602    pub fn make_service() -> LogsService {
603        let state = Arc::new(parking_lot::RwLock::new(LogsState::new(
604            "123456789012",
605            "us-east-1",
606        )));
607        let delivery_bus = Arc::new(DeliveryBus::new());
608        LogsService::new(state, delivery_bus)
609    }
610
611    pub fn make_request(
612        action: &str,
613        body: serde_json::Value,
614    ) -> fakecloud_core::service::AwsRequest {
615        fakecloud_core::service::AwsRequest {
616            service: "logs".to_string(),
617            action: action.to_string(),
618            region: "us-east-1".to_string(),
619            account_id: "123456789012".to_string(),
620            request_id: "test-request-id".to_string(),
621            headers: HeaderMap::new(),
622            query_params: HashMap::new(),
623            body: Bytes::from(serde_json::to_vec(&body).unwrap()),
624            path_segments: vec![],
625            raw_path: "/".to_string(),
626            raw_query: String::new(),
627            method: Method::POST,
628            is_query_protocol: false,
629            access_key_id: None,
630            principal: None,
631        }
632    }
633
634    pub fn create_group(svc: &LogsService, name: &str) {
635        let req = make_request(
636            "CreateLogGroup",
637            serde_json::json!({ "logGroupName": name }),
638        );
639        svc.create_log_group(&req).unwrap();
640    }
641
642    pub fn create_stream(svc: &LogsService, group: &str, stream: &str) {
643        let req = make_request(
644            "CreateLogStream",
645            serde_json::json!({ "logGroupName": group, "logStreamName": stream }),
646        );
647        svc.create_log_stream(&req).unwrap();
648    }
649
650    pub fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
651        let now = chrono::Utc::now().timestamp_millis();
652        let events: Vec<serde_json::Value> = messages
653            .iter()
654            .enumerate()
655            .map(|(i, msg)| serde_json::json!({ "timestamp": now + i as i64, "message": msg }))
656            .collect();
657        let req = make_request(
658            "PutLogEvents",
659            serde_json::json!({
660                "logGroupName": group,
661                "logStreamName": stream,
662                "logEvents": events,
663            }),
664        );
665        svc.put_log_events(&req).unwrap();
666    }
667
668    #[test]
669    fn array_filter_pattern_does_not_match() {
670        assert!(
671            !matches_filter_pattern("[w1, w2, w3]", "some log message"),
672            "array-style filter pattern must not match (fail closed)"
673        );
674    }
675
676    #[test]
677    fn unrecognized_json_filter_path_does_not_match() {
678        // A JSON filter condition where the field part doesn't start with $.
679        // should fail closed instead of matching everything.
680        assert!(
681            !matches_single_json_condition(
682                "level = \"ERROR\"",
683                &serde_json::json!({"level": "ERROR"}),
684            ),
685            "filter condition without $. prefix must not match (fail closed)"
686        );
687    }
688
689    #[test]
690    fn unknown_value_format_does_not_match() {
691        // A value that is not a string, number, or boolean should fail closed.
692        assert!(
693            !matches_single_json_condition(
694                "$.level = ERROR",
695                &serde_json::json!({"level": "ERROR"}),
696            ),
697            "unquoted non-numeric non-boolean value must not match (fail closed)"
698        );
699    }
700}