Skip to main content

fakecloud_logs/service/
mod.rs

1use async_trait::async_trait;
2use http::StatusCode;
3use serde_json::{json, Value};
4
5use std::sync::Arc;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9
10use crate::state::SharedLogsState;
11
12mod anomaly;
13mod deliveries;
14mod destinations;
15mod exports;
16mod filters;
17mod groups;
18mod misc;
19mod policies;
20mod queries;
21mod streams;
22mod tags;
23
24pub struct LogsService {
25    state: SharedLogsState,
26    delivery_bus: Arc<DeliveryBus>,
27}
28
29impl LogsService {
30    pub fn new(state: SharedLogsState, delivery_bus: Arc<DeliveryBus>) -> Self {
31        Self {
32            state,
33            delivery_bus,
34        }
35    }
36}
37
/// `AwsService` implementation: names the service, routes each request to the
/// handler for its action, and advertises the list of supported actions.
#[async_trait]
impl AwsService for LogsService {
    /// Returns the service name, `"logs"`.
    fn service_name(&self) -> &str {
        "logs"
    }

    /// Dispatches `req` to the handler method matching `req.action`.
    ///
    /// Every arm calls a synchronous `&self` method with a borrow of the
    /// request and returns its result unchanged. An action with no arm yields
    /// `AwsServiceError::action_not_implemented("logs", <action>)`.
    ///
    /// NOTE(review): the handler methods are not defined in this file;
    /// presumably they live in the submodules declared above — confirm.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        match req.action.as_str() {
            "CreateLogGroup" => self.create_log_group(&req),
            "DeleteLogGroup" => self.delete_log_group(&req),
            "DescribeLogGroups" => self.describe_log_groups(&req),
            "CreateLogStream" => self.create_log_stream(&req),
            "DeleteLogStream" => self.delete_log_stream(&req),
            "DescribeLogStreams" => self.describe_log_streams(&req),
            "PutLogEvents" => self.put_log_events(&req),
            "GetLogEvents" => self.get_log_events(&req),
            "FilterLogEvents" => self.filter_log_events(&req),
            "TagLogGroup" => self.tag_log_group(&req),
            "UntagLogGroup" => self.untag_log_group(&req),
            "ListTagsLogGroup" => self.list_tags_log_group(&req),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsForResource" => self.list_tags_for_resource(&req),
            "PutRetentionPolicy" => self.put_retention_policy(&req),
            "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
            "PutSubscriptionFilter" => self.put_subscription_filter(&req),
            "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
            "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
            "PutMetricFilter" => self.put_metric_filter(&req),
            "DescribeMetricFilters" => self.describe_metric_filters(&req),
            "DeleteMetricFilter" => self.delete_metric_filter(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "DescribeResourcePolicies" => self.describe_resource_policies(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            "PutDestination" => self.put_destination(&req),
            "DescribeDestinations" => self.describe_destinations(&req),
            "DeleteDestination" => self.delete_destination(&req),
            "PutDestinationPolicy" => self.put_destination_policy(&req),
            "StartQuery" => self.start_query(&req),
            "GetQueryResults" => self.get_query_results(&req),
            "DescribeQueries" => self.describe_queries(&req),
            "CreateExportTask" => self.create_export_task(&req),
            "DescribeExportTasks" => self.describe_export_tasks(&req),
            "CancelExportTask" => self.cancel_export_task(&req),
            "PutDeliveryDestination" => self.put_delivery_destination(&req),
            "GetDeliveryDestination" => self.get_delivery_destination(&req),
            "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
            "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
            "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
            "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
            "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
            "PutDeliverySource" => self.put_delivery_source(&req),
            "GetDeliverySource" => self.get_delivery_source(&req),
            "DescribeDeliverySources" => self.describe_delivery_sources(&req),
            "DeleteDeliverySource" => self.delete_delivery_source(&req),
            "CreateDelivery" => self.create_delivery(&req),
            "GetDelivery" => self.get_delivery(&req),
            "DescribeDeliveries" => self.describe_deliveries(&req),
            "DeleteDelivery" => self.delete_delivery(&req),
            "AssociateKmsKey" => self.associate_kms_key(&req),
            "DisassociateKmsKey" => self.disassociate_kms_key(&req),
            "PutQueryDefinition" => self.put_query_definition(&req),
            "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
            "DeleteQueryDefinition" => self.delete_query_definition(&req),
            "PutAccountPolicy" => self.put_account_policy(&req),
            "DescribeAccountPolicies" => self.describe_account_policies(&req),
            "DeleteAccountPolicy" => self.delete_account_policy(&req),
            "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
            "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
            "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
            "PutIndexPolicy" => self.put_index_policy(&req),
            "DescribeIndexPolicies" => self.describe_index_policies(&req),
            "DeleteIndexPolicy" => self.delete_index_policy(&req),
            "DescribeFieldIndexes" => self.describe_field_indexes(&req),
            "PutTransformer" => self.put_transformer(&req),
            "GetTransformer" => self.get_transformer(&req),
            "DeleteTransformer" => self.delete_transformer(&req),
            "TestTransformer" => self.test_transformer(&req),
            "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
            "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
            "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
            "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
            "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
            "GetLogGroupFields" => self.get_log_group_fields(&req),
            "TestMetricFilter" => self.test_metric_filter(&req),
            "StopQuery" => self.stop_query(&req),
            "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
            "GetLogRecord" => self.get_log_record(&req),
            "ListAnomalies" => self.list_anomalies(&req),
            "UpdateAnomaly" => self.update_anomaly(&req),
            "CreateImportTask" => self.create_import_task(&req),
            "DescribeImportTasks" => self.describe_import_tasks(&req),
            "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
            "CancelImportTask" => self.cancel_import_task(&req),
            "PutIntegration" => self.put_integration(&req),
            "GetIntegration" => self.get_integration(&req),
            "DeleteIntegration" => self.delete_integration(&req),
            "ListIntegrations" => self.list_integrations(&req),
            "CreateLookupTable" => self.create_lookup_table(&req),
            "GetLookupTable" => self.get_lookup_table(&req),
            "DescribeLookupTables" => self.describe_lookup_tables(&req),
            "DeleteLookupTable" => self.delete_lookup_table(&req),
            "UpdateLookupTable" => self.update_lookup_table(&req),
            "CreateScheduledQuery" => self.create_scheduled_query(&req),
            "GetScheduledQuery" => self.get_scheduled_query(&req),
            "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
            "ListScheduledQueries" => self.list_scheduled_queries(&req),
            "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
            "UpdateScheduledQuery" => self.update_scheduled_query(&req),
            "StartLiveTail" => self.start_live_tail(&req),
            "ListLogGroups" => self.list_log_groups(&req),
            "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
            "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
            "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
            "GetLogObject" => self.get_log_object(&req),
            "GetLogFields" => self.get_log_fields(&req),
            "AssociateSourceToS3TableIntegration" => {
                self.associate_source_to_s3_table_integration(&req)
            }
            "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
            "DisassociateSourceFromS3TableIntegration" => {
                self.disassociate_source_from_s3_table_integration(&req)
            }
            "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
            "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
            // Internal action for testing export storage
            // (dispatched here but not listed in `supported_actions` below).
            "GetExportedData" => self.get_exported_data(&req),
            // Anything else is reported as not implemented for the "logs" service.
            _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
        }
    }

    /// Returns the action names this service advertises.
    ///
    /// The list mirrors the arms of `handle`, in the same order, except for
    /// the internal `GetExportedData` action, which is handled but not listed.
    /// The two are maintained by hand, so a new action must be added to both.
    fn supported_actions(&self) -> &[&str] {
        &[
            "CreateLogGroup",
            "DeleteLogGroup",
            "DescribeLogGroups",
            "CreateLogStream",
            "DeleteLogStream",
            "DescribeLogStreams",
            "PutLogEvents",
            "GetLogEvents",
            "FilterLogEvents",
            "TagLogGroup",
            "UntagLogGroup",
            "ListTagsLogGroup",
            "TagResource",
            "UntagResource",
            "ListTagsForResource",
            "PutRetentionPolicy",
            "DeleteRetentionPolicy",
            "PutSubscriptionFilter",
            "DescribeSubscriptionFilters",
            "DeleteSubscriptionFilter",
            "PutMetricFilter",
            "DescribeMetricFilters",
            "DeleteMetricFilter",
            "PutResourcePolicy",
            "DescribeResourcePolicies",
            "DeleteResourcePolicy",
            "PutDestination",
            "DescribeDestinations",
            "DeleteDestination",
            "PutDestinationPolicy",
            "StartQuery",
            "GetQueryResults",
            "DescribeQueries",
            "CreateExportTask",
            "DescribeExportTasks",
            "CancelExportTask",
            "PutDeliveryDestination",
            "GetDeliveryDestination",
            "DescribeDeliveryDestinations",
            "DeleteDeliveryDestination",
            "PutDeliveryDestinationPolicy",
            "GetDeliveryDestinationPolicy",
            "DeleteDeliveryDestinationPolicy",
            "PutDeliverySource",
            "GetDeliverySource",
            "DescribeDeliverySources",
            "DeleteDeliverySource",
            "CreateDelivery",
            "GetDelivery",
            "DescribeDeliveries",
            "DeleteDelivery",
            "AssociateKmsKey",
            "DisassociateKmsKey",
            "PutQueryDefinition",
            "DescribeQueryDefinitions",
            "DeleteQueryDefinition",
            "PutAccountPolicy",
            "DescribeAccountPolicies",
            "DeleteAccountPolicy",
            "PutDataProtectionPolicy",
            "GetDataProtectionPolicy",
            "DeleteDataProtectionPolicy",
            "PutIndexPolicy",
            "DescribeIndexPolicies",
            "DeleteIndexPolicy",
            "DescribeFieldIndexes",
            "PutTransformer",
            "GetTransformer",
            "DeleteTransformer",
            "TestTransformer",
            "CreateLogAnomalyDetector",
            "GetLogAnomalyDetector",
            "DeleteLogAnomalyDetector",
            "ListLogAnomalyDetectors",
            "UpdateLogAnomalyDetector",
            "GetLogGroupFields",
            "TestMetricFilter",
            "StopQuery",
            "PutLogGroupDeletionProtection",
            "GetLogRecord",
            "ListAnomalies",
            "UpdateAnomaly",
            "CreateImportTask",
            "DescribeImportTasks",
            "DescribeImportTaskBatches",
            "CancelImportTask",
            "PutIntegration",
            "GetIntegration",
            "DeleteIntegration",
            "ListIntegrations",
            "CreateLookupTable",
            "GetLookupTable",
            "DescribeLookupTables",
            "DeleteLookupTable",
            "UpdateLookupTable",
            "CreateScheduledQuery",
            "GetScheduledQuery",
            "GetScheduledQueryHistory",
            "ListScheduledQueries",
            "DeleteScheduledQuery",
            "UpdateScheduledQuery",
            "StartLiveTail",
            "ListLogGroups",
            "ListLogGroupsForQuery",
            "ListAggregateLogGroupSummaries",
            "PutBearerTokenAuthentication",
            "GetLogObject",
            "GetLogFields",
            "AssociateSourceToS3TableIntegration",
            "ListSourcesForS3TableIntegration",
            "DisassociateSourceFromS3TableIntegration",
            "UpdateDeliveryConfiguration",
            "DescribeConfigurationTemplates",
        ]
    }
}
287
288fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
289    body[field].as_str().ok_or_else(|| {
290        AwsServiceError::aws_error(
291            StatusCode::BAD_REQUEST,
292            "InvalidParameterException",
293            format!("{field} is required"),
294        )
295    })
296}
297
298/// Build a delivery destination configuration JSON object, ensuring
299/// `destinationResourceArn` is always present as a string (Smithy requirement).
300fn dd_config_json(config: &std::collections::HashMap<String, String>) -> Value {
301    let mut m: serde_json::Map<String, Value> =
302        config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
303    m.entry("destinationResourceArn".to_string())
304        .or_insert_with(|| json!(""));
305    Value::Object(m)
306}
307
/// Produce a sequence token: the current time in nanoseconds since the Unix
/// epoch, rendered as a zero-padded 38-digit decimal string.
///
/// A clock reading before the epoch falls back to a zero duration, giving a
/// token of all zeros.
fn generate_sequence_token() -> String {
    // 10^38 fits in a u128 (max ~3.4e38), so the value never exceeds 38 digits.
    const MODULUS: u128 = 10u128.pow(38);

    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    let token = since_epoch.as_nanos() % MODULUS;
    format!("{token:038}")
}
317
318fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
319    AwsServiceError::aws_error(
320        StatusCode::BAD_REQUEST,
321        "InvalidParameterException",
322        format!(
323            "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
324        ),
325    )
326}
327
328/// Resolve log group name from either logGroupName or resourceIdentifier.
329/// resourceIdentifier can be a log group name or an ARN.
330fn resolve_log_group_name(
331    log_group_name: Option<&str>,
332    resource_identifier: Option<&str>,
333) -> Result<String, AwsServiceError> {
334    if let Some(identifier) = resource_identifier {
335        if identifier.starts_with("arn:") {
336            extract_log_group_from_arn(identifier).ok_or_else(|| {
337                AwsServiceError::aws_error(
338                    StatusCode::BAD_REQUEST,
339                    "InvalidParameterException",
340                    format!("Invalid ARN: {identifier}"),
341                )
342            })
343        } else {
344            Ok(identifier.to_string())
345        }
346    } else if let Some(name) = log_group_name {
347        Ok(name.to_string())
348    } else {
349        Err(AwsServiceError::aws_error(
350            StatusCode::BAD_REQUEST,
351            "InvalidParameterException",
352            "Either logGroupName or resourceIdentifier is required",
353        ))
354    }
355}
356
/// Extract the log group name from an ARN of the form
/// `arn:aws:logs:region:account:log-group:name` (optionally ending in `:*`).
///
/// The ARN is split on `:` into at most seven parts, so everything after the
/// `log-group` segment is treated as the name, minus one trailing `:*`.
///
/// Returns `None` when the ARN has fewer than seven segments, when the sixth
/// segment is not `log-group`, or when the extracted name is empty (e.g.
/// `...:log-group:` or `...:log-group::*`), so that a nameless ARN is reported
/// as invalid instead of resolving to an empty log group name.
fn extract_log_group_from_arn(arn: &str) -> Option<String> {
    // arn:aws:logs:region:account:log-group:name:*
    let mut segments = arn.splitn(7, ':');
    let resource_type = segments.nth(5)?;
    let resource = segments.next()?;
    if resource_type != "log-group" {
        return None;
    }

    let name = resource.strip_suffix(":*").unwrap_or(resource);
    if name.is_empty() {
        None
    } else {
        Some(name.to_string())
    }
}
368
369/// CloudWatch Logs filter pattern matching.
370///
371/// Rules:
372/// - Empty pattern or patterns starting with `{` (JSON patterns) match everything
373/// - Quoted string `"foo bar"` matches the exact substring
374/// - Multiple unquoted words `foo bar` means ALL words must appear anywhere in the message
375/// - Single unquoted word `foo` is a simple substring match
376fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
377    let pattern = pattern.trim();
378
379    // Empty pattern matches everything
380    if pattern.is_empty() {
381        return true;
382    }
383
384    // JSON/metric filter patterns: { $.field = "value" }
385    if pattern.starts_with('{') && pattern.ends_with('}') {
386        return matches_json_filter_pattern(pattern, message);
387    }
388
389    // Array-style metric filter patterns - not implemented, fail closed
390    if pattern.starts_with('[') {
391        return false;
392    }
393
394    // Quoted pattern: exact substring match (handles escaped inner quotes)
395    if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
396        let inner = &pattern[1..pattern.len() - 1];
397        // Unescape inner quotes: \"  ->  "
398        let unescaped = inner.replace("\\\"", "\"");
399        return message.contains(&unescaped);
400    }
401
402    // Multiple words: all must be present (AND semantics)
403    let terms = parse_filter_terms(pattern);
404    terms.iter().all(|term| message.contains(term.as_str()))
405}
406
/// Split a filter pattern into its search terms.
///
/// Terms are separated by whitespace. A term that begins with `"` runs to the
/// next unescaped `"` (or to the end of the input if none follows); inside it,
/// a backslash makes the following character literal. Quoted terms may be
/// empty, unquoted terms never are.
fn parse_filter_terms(pattern: &str) -> Vec<String> {
    let mut collected = Vec::new();
    let mut stream = pattern.chars().peekable();

    while let Some(&lead) = stream.peek() {
        if lead.is_whitespace() {
            // Separator between terms.
            stream.next();
            continue;
        }

        if lead == '"' {
            stream.next(); // opening quote
            let mut quoted = String::new();
            while let Some(ch) = stream.next() {
                match ch {
                    '"' => break,
                    '\\' => {
                        // Keep whatever follows the backslash; a trailing
                        // backslash at end of input is dropped.
                        if let Some(escaped) = stream.next() {
                            quoted.push(escaped);
                        }
                    }
                    other => quoted.push(other),
                }
            }
            collected.push(quoted);
        } else {
            // `lead` is non-whitespace, so this term has at least one char.
            let mut bare = String::new();
            while let Some(ch) = stream.next_if(|c| !c.is_whitespace()) {
                bare.push(ch);
            }
            collected.push(bare);
        }
    }

    collected
}
453
454/// Match a JSON filter pattern like `{ $.level = "ERROR" }` against a message.
455fn matches_json_filter_pattern(pattern: &str, message: &str) -> bool {
456    // Strip the outer braces
457    let inner = pattern
458        .strip_prefix('{')
459        .and_then(|s| s.strip_suffix('}'))
460        .unwrap_or("")
461        .trim();
462
463    if inner.is_empty() {
464        return true;
465    }
466
467    // Parse the message as JSON
468    let msg_json: serde_json::Value = match serde_json::from_str(message) {
469        Ok(v) => v,
470        Err(_) => return false, // Non-JSON message cannot match JSON filter
471    };
472
473    // Support: $.field = "value", $.field != "value", $.field = number,
474    //          $.field > number, $.field < number, $.field >= number, $.field <= number
475    // Also support && for multiple conditions
476    let conditions: Vec<&str> = inner.split("&&").collect();
477
478    for condition in conditions {
479        let condition = condition.trim();
480        if !matches_single_json_condition(condition, &msg_json) {
481            return false;
482        }
483    }
484
485    true
486}
487
488fn matches_single_json_condition(condition: &str, json: &serde_json::Value) -> bool {
489    // Try to parse: $.field op value
490    let condition = condition.trim();
491
492    // Find the operator
493    let ops = ["!=", ">=", "<=", "=", ">", "<"];
494    let mut found_op = None;
495    let mut op_pos = 0;
496    let mut op_len = 0;
497
498    for op in &ops {
499        if let Some(pos) = condition.find(op) {
500            // Make sure we're not inside a quoted string
501            let before = &condition[..pos];
502            let quote_count = before.chars().filter(|&c| c == '"').count();
503            if quote_count % 2 == 0 {
504                found_op = Some(*op);
505                op_pos = pos;
506                op_len = op.len();
507                break;
508            }
509        }
510    }
511
512    let (op, field_part, value_part) = match found_op {
513        Some(op) => (
514            op,
515            condition[..op_pos].trim(),
516            condition[op_pos + op_len..].trim(),
517        ),
518        None => {
519            // No operator: just check if the field exists
520            // Pattern like `{ $.field }` means field exists
521            if let Some(path) = condition.strip_prefix("$.") {
522                return resolve_json_path_simple(json, path).is_some();
523            }
524            return true;
525        }
526    };
527
528    // Extract JSON path from field_part (must start with $.)
529    let path = match field_part.strip_prefix("$.") {
530        Some(p) => p,
531        None => return false, // Don't understand this pattern, fail closed
532    };
533
534    let actual_value = match resolve_json_path_simple(json, path) {
535        Some(v) => v,
536        None => return op == "!=", // field doesn't exist: only != matches
537    };
538
539    // Parse the expected value
540    let expected_str = if value_part.starts_with('"') && value_part.ends_with('"') {
541        // String comparison
542        let s = &value_part[1..value_part.len() - 1];
543        match op {
544            "=" => actual_value.as_str() == Some(s),
545            "!=" => actual_value.as_str() != Some(s),
546            _ => false,
547        }
548    } else if let Ok(expected_num) = value_part.parse::<f64>() {
549        // Numeric comparison
550        let actual_num = actual_value.as_f64();
551        match (op, actual_num) {
552            ("=", Some(n)) => (n - expected_num).abs() < f64::EPSILON,
553            ("!=", Some(n)) => (n - expected_num).abs() >= f64::EPSILON,
554            (">", Some(n)) => n > expected_num,
555            ("<", Some(n)) => n < expected_num,
556            (">=", Some(n)) => n >= expected_num,
557            ("<=", Some(n)) => n <= expected_num,
558            _ => false,
559        }
560    } else if value_part == "true" || value_part == "false" {
561        let expected_bool = value_part == "true";
562        match op {
563            "=" => actual_value.as_bool() == Some(expected_bool),
564            "!=" => actual_value.as_bool() != Some(expected_bool),
565            _ => false,
566        }
567    } else {
568        false // Unknown value format, fail closed
569    };
570
571    expected_str
572}
573
574/// Resolve a simple dot-separated JSON path (e.g., "level" or "nested.field").
575fn resolve_json_path_simple<'a>(
576    json: &'a serde_json::Value,
577    path: &str,
578) -> Option<&'a serde_json::Value> {
579    let mut current = json;
580    for part in path.split('.') {
581        current = current.get(part)?;
582    }
583    if current.is_null() {
584        None
585    } else {
586        Some(current)
587    }
588}
589
/// Fixtures for unit tests in this crate, plus a few tests for the filter
/// pattern helpers defined in the parent module.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;
    use crate::state::LogsState;
    use bytes::Bytes;
    use fakecloud_core::delivery::DeliveryBus;
    use http::{HeaderMap, Method};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Builds a `LogsService` over fresh state for account `123456789012` in
    /// `us-east-1`, wired to a new delivery bus.
    pub fn make_service() -> LogsService {
        let shared_state = Arc::new(parking_lot::RwLock::new(LogsState::new(
            "123456789012",
            "us-east-1",
        )));
        LogsService::new(shared_state, Arc::new(DeliveryBus::new()))
    }

    /// Builds a POST request for `action` whose body is `body` serialized as
    /// JSON; every other field carries a fixed test value.
    pub fn make_request(
        action: &str,
        body: serde_json::Value,
    ) -> fakecloud_core::service::AwsRequest {
        let payload = serde_json::to_vec(&body).unwrap();
        fakecloud_core::service::AwsRequest {
            service: "logs".to_string(),
            action: action.to_string(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "test-request-id".to_string(),
            headers: HeaderMap::new(),
            query_params: HashMap::new(),
            body: Bytes::from(payload),
            path_segments: vec![],
            raw_path: "/".to_string(),
            raw_query: String::new(),
            method: Method::POST,
            is_query_protocol: false,
            access_key_id: None,
        }
    }

    /// Creates log group `name`, panicking if the service rejects it.
    pub fn create_group(svc: &LogsService, name: &str) {
        let body = serde_json::json!({ "logGroupName": name });
        svc.create_log_group(&make_request("CreateLogGroup", body))
            .unwrap();
    }

    /// Creates log stream `stream` in `group`, panicking on failure.
    pub fn create_stream(svc: &LogsService, group: &str, stream: &str) {
        let body = serde_json::json!({ "logGroupName": group, "logStreamName": stream });
        svc.create_log_stream(&make_request("CreateLogStream", body))
            .unwrap();
    }

    /// Puts `messages` into `group`/`stream`, panicking on failure.
    ///
    /// Timestamps start at the current time in milliseconds and increase by
    /// one per message, in the order given.
    pub fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
        let base_ts = chrono::Utc::now().timestamp_millis();
        let mut log_events: Vec<serde_json::Value> = Vec::with_capacity(messages.len());
        for (offset, text) in messages.iter().enumerate() {
            log_events.push(serde_json::json!({
                "timestamp": base_ts + offset as i64,
                "message": text,
            }));
        }
        let body = serde_json::json!({
            "logGroupName": group,
            "logStreamName": stream,
            "logEvents": log_events,
        });
        svc.put_log_events(&make_request("PutLogEvents", body))
            .unwrap();
    }

    #[test]
    fn array_filter_pattern_does_not_match() {
        let matched = matches_filter_pattern("[w1, w2, w3]", "some log message");
        assert!(
            !matched,
            "array-style filter pattern must not match (fail closed)"
        );
    }

    #[test]
    fn unrecognized_json_filter_path_does_not_match() {
        // The field part lacks the `$.` prefix, so the condition must fail
        // closed rather than match everything.
        let document = serde_json::json!({"level": "ERROR"});
        let matched = matches_single_json_condition("level = \"ERROR\"", &document);
        assert!(
            !matched,
            "filter condition without $. prefix must not match (fail closed)"
        );
    }

    #[test]
    fn unknown_value_format_does_not_match() {
        // The value is neither a quoted string, a number, nor a boolean, so
        // the condition must fail closed.
        let document = serde_json::json!({"level": "ERROR"});
        let matched = matches_single_json_condition("$.level = ERROR", &document);
        assert!(
            !matched,
            "unquoted non-numeric non-boolean value must not match (fail closed)"
        );
    }
}