1use async_trait::async_trait;
2use http::StatusCode;
3use serde_json::{json, Value};
4
5use std::sync::Arc;
6
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{LogsSnapshot, SharedLogsState, LOGS_SNAPSHOT_SCHEMA_VERSION};
14
15mod anomaly;
16mod deliveries;
17mod destinations;
18mod exports;
19mod filters;
20mod groups;
21mod misc;
22mod policies;
23mod queries;
24mod streams;
25mod tags;
26
/// Reports whether `action` is one of the operations classified as
/// non-mutating.
///
/// `handle` only persists a snapshot after actions for which this returns
/// `false`. The comparison is exact and case-sensitive, so any name that is
/// not listed (including unknown actions) is treated as mutating.
fn is_read_only_action(action: &str) -> bool {
    const READ_ONLY_ACTIONS: &[&str] = &[
        "DescribeLogGroups",
        "DescribeLogStreams",
        "GetLogEvents",
        "FilterLogEvents",
        "ListTagsLogGroup",
        "ListTagsForResource",
        "DescribeSubscriptionFilters",
        "DescribeMetricFilters",
        "DescribeResourcePolicies",
        "DescribeDestinations",
        "GetQueryResults",
        "DescribeQueries",
        "DescribeExportTasks",
        "GetDeliveryDestination",
        "DescribeDeliveryDestinations",
        "GetDeliveryDestinationPolicy",
        "GetDeliverySource",
        "DescribeDeliverySources",
        "GetDelivery",
        "DescribeDeliveries",
        "DescribeQueryDefinitions",
        "DescribeAccountPolicies",
        "GetDataProtectionPolicy",
        "DescribeIndexPolicies",
        "DescribeFieldIndexes",
        "GetTransformer",
        "TestTransformer",
        "GetLogAnomalyDetector",
        "ListLogAnomalyDetectors",
        "GetLogGroupFields",
        "TestMetricFilter",
        "GetLogRecord",
        "ListAnomalies",
        "DescribeImportTasks",
        "DescribeImportTaskBatches",
        "GetIntegration",
        "ListIntegrations",
        "GetLookupTable",
        "DescribeLookupTables",
        "GetScheduledQuery",
        "GetScheduledQueryHistory",
        "ListScheduledQueries",
        "StartLiveTail",
        "ListLogGroups",
        "ListLogGroupsForQuery",
        "ListAggregateLogGroupSummaries",
        "GetLogObject",
        "GetLogFields",
        "ListSourcesForS3TableIntegration",
        "DescribeConfigurationTemplates",
        "GetExportedData",
    ];

    READ_ONLY_ACTIONS.contains(&action)
}
85
86pub struct LogsService {
87 state: SharedLogsState,
88 delivery_bus: Arc<DeliveryBus>,
89 snapshot_store: Option<Arc<dyn SnapshotStore>>,
90 snapshot_lock: Arc<AsyncMutex<()>>,
91}
92
93impl LogsService {
94 pub fn new(state: SharedLogsState, delivery_bus: Arc<DeliveryBus>) -> Self {
95 Self {
96 state,
97 delivery_bus,
98 snapshot_store: None,
99 snapshot_lock: Arc::new(AsyncMutex::new(())),
100 }
101 }
102
103 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
104 self.snapshot_store = Some(store);
105 self
106 }
107
108 async fn save_snapshot(&self) {
112 save_logs_snapshot(
113 &self.state,
114 self.snapshot_store.clone(),
115 &self.snapshot_lock,
116 )
117 .await;
118 }
119
120 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
125 let store = self.snapshot_store.clone()?;
126 let state = self.state.clone();
127 let lock = self.snapshot_lock.clone();
128 Some(Arc::new(move || {
129 let state = state.clone();
130 let store = store.clone();
131 let lock = lock.clone();
132 Box::pin(async move {
133 save_logs_snapshot(&state, Some(store), &lock).await;
134 })
135 }))
136 }
137}
138
139pub async fn save_logs_snapshot(
145 state: &SharedLogsState,
146 store: Option<Arc<dyn SnapshotStore>>,
147 lock: &AsyncMutex<()>,
148) {
149 let Some(store) = store else {
150 return;
151 };
152 let _guard = lock.lock().await;
153 let snapshot = LogsSnapshot {
154 schema_version: LOGS_SNAPSHOT_SCHEMA_VERSION,
155 accounts: Some(state.read().clone()),
156 state: None,
157 };
158 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
159 let bytes = serde_json::to_vec(&snapshot)
160 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
161 store.save(&bytes)
162 })
163 .await;
164 match join {
165 Ok(Ok(())) => {}
166 Ok(Err(err)) => tracing::error!(%err, "failed to write logs snapshot"),
167 Err(err) => tracing::error!(%err, "logs snapshot task panicked"),
168 }
169}
170
171#[async_trait]
172impl AwsService for LogsService {
173 fn service_name(&self) -> &str {
174 "logs"
175 }
176
177 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
178 let mutates = !is_read_only_action(req.action.as_str());
179 let result = match req.action.as_str() {
180 "CreateLogGroup" => self.create_log_group(&req),
181 "DeleteLogGroup" => self.delete_log_group(&req),
182 "DescribeLogGroups" => self.describe_log_groups(&req),
183 "CreateLogStream" => self.create_log_stream(&req),
184 "DeleteLogStream" => self.delete_log_stream(&req),
185 "DescribeLogStreams" => self.describe_log_streams(&req),
186 "PutLogEvents" => self.put_log_events(&req),
187 "GetLogEvents" => self.get_log_events(&req),
188 "FilterLogEvents" => self.filter_log_events(&req),
189 "TagLogGroup" => self.tag_log_group(&req),
190 "UntagLogGroup" => self.untag_log_group(&req),
191 "ListTagsLogGroup" => self.list_tags_log_group(&req),
192 "TagResource" => self.tag_resource(&req),
193 "UntagResource" => self.untag_resource(&req),
194 "ListTagsForResource" => self.list_tags_for_resource(&req),
195 "PutRetentionPolicy" => self.put_retention_policy(&req),
196 "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
197 "PutSubscriptionFilter" => self.put_subscription_filter(&req),
198 "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
199 "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
200 "PutMetricFilter" => self.put_metric_filter(&req),
201 "DescribeMetricFilters" => self.describe_metric_filters(&req),
202 "DeleteMetricFilter" => self.delete_metric_filter(&req),
203 "PutResourcePolicy" => self.put_resource_policy(&req),
204 "DescribeResourcePolicies" => self.describe_resource_policies(&req),
205 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
206 "PutDestination" => self.put_destination(&req),
207 "DescribeDestinations" => self.describe_destinations(&req),
208 "DeleteDestination" => self.delete_destination(&req),
209 "PutDestinationPolicy" => self.put_destination_policy(&req),
210 "StartQuery" => self.start_query(&req),
211 "GetQueryResults" => self.get_query_results(&req),
212 "DescribeQueries" => self.describe_queries(&req),
213 "CreateExportTask" => self.create_export_task(&req),
214 "DescribeExportTasks" => self.describe_export_tasks(&req),
215 "CancelExportTask" => self.cancel_export_task(&req),
216 "PutDeliveryDestination" => self.put_delivery_destination(&req),
217 "GetDeliveryDestination" => self.get_delivery_destination(&req),
218 "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
219 "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
220 "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
221 "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
222 "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
223 "PutDeliverySource" => self.put_delivery_source(&req),
224 "GetDeliverySource" => self.get_delivery_source(&req),
225 "DescribeDeliverySources" => self.describe_delivery_sources(&req),
226 "DeleteDeliverySource" => self.delete_delivery_source(&req),
227 "CreateDelivery" => self.create_delivery(&req),
228 "GetDelivery" => self.get_delivery(&req),
229 "DescribeDeliveries" => self.describe_deliveries(&req),
230 "DeleteDelivery" => self.delete_delivery(&req),
231 "AssociateKmsKey" => self.associate_kms_key(&req),
232 "DisassociateKmsKey" => self.disassociate_kms_key(&req),
233 "PutQueryDefinition" => self.put_query_definition(&req),
234 "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
235 "DeleteQueryDefinition" => self.delete_query_definition(&req),
236 "PutAccountPolicy" => self.put_account_policy(&req),
237 "DescribeAccountPolicies" => self.describe_account_policies(&req),
238 "DeleteAccountPolicy" => self.delete_account_policy(&req),
239 "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
240 "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
241 "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
242 "PutIndexPolicy" => self.put_index_policy(&req),
243 "DescribeIndexPolicies" => self.describe_index_policies(&req),
244 "DeleteIndexPolicy" => self.delete_index_policy(&req),
245 "DescribeFieldIndexes" => self.describe_field_indexes(&req),
246 "PutTransformer" => self.put_transformer(&req),
247 "GetTransformer" => self.get_transformer(&req),
248 "DeleteTransformer" => self.delete_transformer(&req),
249 "TestTransformer" => self.test_transformer(&req),
250 "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
251 "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
252 "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
253 "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
254 "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
255 "GetLogGroupFields" => self.get_log_group_fields(&req),
256 "TestMetricFilter" => self.test_metric_filter(&req),
257 "StopQuery" => self.stop_query(&req),
258 "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
259 "GetLogRecord" => self.get_log_record(&req),
260 "ListAnomalies" => self.list_anomalies(&req),
261 "UpdateAnomaly" => self.update_anomaly(&req),
262 "CreateImportTask" => self.create_import_task(&req),
263 "DescribeImportTasks" => self.describe_import_tasks(&req),
264 "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
265 "CancelImportTask" => self.cancel_import_task(&req),
266 "PutIntegration" => self.put_integration(&req),
267 "GetIntegration" => self.get_integration(&req),
268 "DeleteIntegration" => self.delete_integration(&req),
269 "ListIntegrations" => self.list_integrations(&req),
270 "CreateLookupTable" => self.create_lookup_table(&req),
271 "GetLookupTable" => self.get_lookup_table(&req),
272 "DescribeLookupTables" => self.describe_lookup_tables(&req),
273 "DeleteLookupTable" => self.delete_lookup_table(&req),
274 "UpdateLookupTable" => self.update_lookup_table(&req),
275 "CreateScheduledQuery" => self.create_scheduled_query(&req),
276 "GetScheduledQuery" => self.get_scheduled_query(&req),
277 "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
278 "ListScheduledQueries" => self.list_scheduled_queries(&req),
279 "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
280 "UpdateScheduledQuery" => self.update_scheduled_query(&req),
281 "StartLiveTail" => self.start_live_tail(&req),
282 "ListLogGroups" => self.list_log_groups(&req),
283 "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
284 "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
285 "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
286 "GetLogObject" => self.get_log_object(&req),
287 "GetLogFields" => self.get_log_fields(&req),
288 "AssociateSourceToS3TableIntegration" => {
289 self.associate_source_to_s3_table_integration(&req)
290 }
291 "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
292 "DisassociateSourceFromS3TableIntegration" => {
293 self.disassociate_source_from_s3_table_integration(&req)
294 }
295 "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
296 "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
297 "GetExportedData" => self.get_exported_data(&req),
299 _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
300 };
301 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
302 self.save_snapshot().await;
303 }
304 result
305 }
306
307 fn supported_actions(&self) -> &[&str] {
308 SUPPORTED_ACTIONS
309 }
310}
311
/// Action names returned by `supported_actions`, in the same order as the
/// dispatch arms in `handle`. Keep the two in sync when adding an action.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateLogGroup",
    "DeleteLogGroup",
    "DescribeLogGroups",
    "CreateLogStream",
    "DeleteLogStream",
    "DescribeLogStreams",
    "PutLogEvents",
    "GetLogEvents",
    "FilterLogEvents",
    "TagLogGroup",
    "UntagLogGroup",
    "ListTagsLogGroup",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "PutRetentionPolicy",
    "DeleteRetentionPolicy",
    "PutSubscriptionFilter",
    "DescribeSubscriptionFilters",
    "DeleteSubscriptionFilter",
    "PutMetricFilter",
    "DescribeMetricFilters",
    "DeleteMetricFilter",
    "PutResourcePolicy",
    "DescribeResourcePolicies",
    "DeleteResourcePolicy",
    "PutDestination",
    "DescribeDestinations",
    "DeleteDestination",
    "PutDestinationPolicy",
    "StartQuery",
    "GetQueryResults",
    "DescribeQueries",
    "CreateExportTask",
    "DescribeExportTasks",
    "CancelExportTask",
    "PutDeliveryDestination",
    "GetDeliveryDestination",
    "DescribeDeliveryDestinations",
    "DeleteDeliveryDestination",
    "PutDeliveryDestinationPolicy",
    "GetDeliveryDestinationPolicy",
    "DeleteDeliveryDestinationPolicy",
    "PutDeliverySource",
    "GetDeliverySource",
    "DescribeDeliverySources",
    "DeleteDeliverySource",
    "CreateDelivery",
    "GetDelivery",
    "DescribeDeliveries",
    "DeleteDelivery",
    "AssociateKmsKey",
    "DisassociateKmsKey",
    "PutQueryDefinition",
    "DescribeQueryDefinitions",
    "DeleteQueryDefinition",
    "PutAccountPolicy",
    "DescribeAccountPolicies",
    "DeleteAccountPolicy",
    "PutDataProtectionPolicy",
    "GetDataProtectionPolicy",
    "DeleteDataProtectionPolicy",
    "PutIndexPolicy",
    "DescribeIndexPolicies",
    "DeleteIndexPolicy",
    "DescribeFieldIndexes",
    "PutTransformer",
    "GetTransformer",
    "DeleteTransformer",
    "TestTransformer",
    "CreateLogAnomalyDetector",
    "GetLogAnomalyDetector",
    "DeleteLogAnomalyDetector",
    "ListLogAnomalyDetectors",
    "UpdateLogAnomalyDetector",
    "GetLogGroupFields",
    "TestMetricFilter",
    "StopQuery",
    "PutLogGroupDeletionProtection",
    "GetLogRecord",
    "ListAnomalies",
    "UpdateAnomaly",
    "CreateImportTask",
    "DescribeImportTasks",
    "DescribeImportTaskBatches",
    "CancelImportTask",
    "PutIntegration",
    "GetIntegration",
    "DeleteIntegration",
    "ListIntegrations",
    "CreateLookupTable",
    "GetLookupTable",
    "DescribeLookupTables",
    "DeleteLookupTable",
    "UpdateLookupTable",
    "CreateScheduledQuery",
    "GetScheduledQuery",
    "GetScheduledQueryHistory",
    "ListScheduledQueries",
    "DeleteScheduledQuery",
    "UpdateScheduledQuery",
    "StartLiveTail",
    "ListLogGroups",
    "ListLogGroupsForQuery",
    "ListAggregateLogGroupSummaries",
    "PutBearerTokenAuthentication",
    "GetLogObject",
    "GetLogFields",
    "AssociateSourceToS3TableIntegration",
    "ListSourcesForS3TableIntegration",
    "DisassociateSourceFromS3TableIntegration",
    "UpdateDeliveryConfiguration",
    "DescribeConfigurationTemplates",
    // NOTE(review): `handle` dispatches this action but it was missing from
    // this list; added so the advertised set matches the dispatch table.
    // Confirm the omission was not intentional.
    "GetExportedData",
];
427
428fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
429 body[field].as_str().ok_or_else(|| {
430 AwsServiceError::aws_error(
431 StatusCode::BAD_REQUEST,
432 "InvalidParameterException",
433 format!("{field} is required"),
434 )
435 })
436}
437
438fn dd_config_json(config: &std::collections::BTreeMap<String, String>) -> Value {
441 let mut m: serde_json::Map<String, Value> =
442 config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
443 m.entry("destinationResourceArn".to_string())
444 .or_insert_with(|| json!(""));
445 Value::Object(m)
446}
447
/// Produces a 38-character, zero-padded decimal token derived from the
/// current wall-clock time in nanoseconds since the Unix epoch.
///
/// A clock reading earlier than the epoch is treated as zero, which yields a
/// token made entirely of `0` characters.
fn generate_sequence_token() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    // Reduce modulo 10^38 so the value always fits in 38 digits.
    const MODULUS: u128 = 10u128.pow(38);

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let digits = since_epoch.as_nanos() % MODULUS;
    format!("{digits:038}")
}
457
458fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
459 AwsServiceError::aws_error(
460 StatusCode::BAD_REQUEST,
461 "InvalidParameterException",
462 format!(
463 "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
464 ),
465 )
466}
467
468fn resolve_log_group_name(
471 log_group_name: Option<&str>,
472 resource_identifier: Option<&str>,
473) -> Result<String, AwsServiceError> {
474 if let Some(identifier) = resource_identifier {
475 if identifier.starts_with("arn:") {
476 extract_log_group_from_arn(identifier).ok_or_else(|| {
477 AwsServiceError::aws_error(
478 StatusCode::BAD_REQUEST,
479 "InvalidParameterException",
480 format!("Invalid ARN: {identifier}"),
481 )
482 })
483 } else {
484 Ok(identifier.to_string())
485 }
486 } else if let Some(name) = log_group_name {
487 Ok(name.to_string())
488 } else {
489 Err(AwsServiceError::aws_error(
490 StatusCode::BAD_REQUEST,
491 "InvalidParameterException",
492 "Either logGroupName or resourceIdentifier is required",
493 ))
494 }
495}
496
/// Extracts the log group name from a colon-separated ARN string.
///
/// The string is split into at most seven `:`-separated segments. The sixth
/// segment must be `log-group`; the seventh (everything after it) is the
/// name, with a single trailing `:*` removed if present. Returns `None`
/// when there are fewer than seven segments or the sixth is not `log-group`.
pub(crate) fn extract_log_group_from_arn(arn: &str) -> Option<String> {
    let mut segments = arn.splitn(7, ':');
    let resource_type = segments.nth(5)?;
    let remainder = segments.next()?;

    if resource_type != "log-group" {
        return None;
    }

    let name = remainder.strip_suffix(":*").unwrap_or(remainder);
    Some(name.to_string())
}
508
509fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
517 let pattern = pattern.trim();
518
519 if pattern.is_empty() {
521 return true;
522 }
523
524 if pattern.starts_with('{') && pattern.ends_with('}') {
526 return matches_json_filter_pattern(pattern, message);
527 }
528
529 if pattern.starts_with('[') {
531 return false;
532 }
533
534 if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
536 let inner = &pattern[1..pattern.len() - 1];
537 let unescaped = inner.replace("\\\"", "\"");
539 return message.contains(&unescaped);
540 }
541
542 let terms = parse_filter_terms(pattern);
544 terms.iter().all(|term| message.contains(term.as_str()))
545}
546
/// Splits a filter pattern into its individual search terms.
///
/// Terms are separated by whitespace. A term that begins with `"` runs to
/// the next unescaped `"` (or to the end of input) and may contain
/// whitespace; inside it a backslash makes the following character literal.
/// A quoted term is always emitted, even when empty. Any other term is the
/// maximal run of non-whitespace characters.
fn parse_filter_terms(pattern: &str) -> Vec<String> {
    let mut terms = Vec::new();
    let mut rest = pattern.chars().peekable();

    while let Some(&first) = rest.peek() {
        if first.is_whitespace() {
            rest.next();
            continue;
        }

        if first == '"' {
            // Drop the opening quote, then read up to the closing one.
            rest.next();
            let mut phrase = String::new();
            while let Some(ch) = rest.next() {
                match ch {
                    '"' => break,
                    // A trailing backslash has nothing to escape and is dropped.
                    '\\' => phrase.extend(rest.next()),
                    other => phrase.push(other),
                }
            }
            terms.push(phrase);
        } else {
            let mut word = String::new();
            while let Some(ch) = rest.next_if(|c| !c.is_whitespace()) {
                word.push(ch);
            }
            terms.push(word);
        }
    }

    terms
}
593
594fn matches_json_filter_pattern(pattern: &str, message: &str) -> bool {
596 let inner = pattern
598 .strip_prefix('{')
599 .and_then(|s| s.strip_suffix('}'))
600 .unwrap_or("")
601 .trim();
602
603 if inner.is_empty() {
604 return true;
605 }
606
607 let msg_json: serde_json::Value = match serde_json::from_str(message) {
609 Ok(v) => v,
610 Err(_) => return false, };
612
613 let conditions: Vec<&str> = inner.split("&&").collect();
617
618 for condition in conditions {
619 let condition = condition.trim();
620 if !matches_single_json_condition(condition, &msg_json) {
621 return false;
622 }
623 }
624
625 true
626}
627
628fn matches_single_json_condition(condition: &str, json: &serde_json::Value) -> bool {
629 let condition = condition.trim();
631
632 let ops = ["!=", ">=", "<=", "=", ">", "<"];
634 let mut found_op = None;
635 let mut op_pos = 0;
636 let mut op_len = 0;
637
638 for op in &ops {
639 if let Some(pos) = condition.find(op) {
640 let before = &condition[..pos];
642 let quote_count = before.chars().filter(|&c| c == '"').count();
643 if quote_count % 2 == 0 {
644 found_op = Some(*op);
645 op_pos = pos;
646 op_len = op.len();
647 break;
648 }
649 }
650 }
651
652 let (op, field_part, value_part) = match found_op {
653 Some(op) => (
654 op,
655 condition[..op_pos].trim(),
656 condition[op_pos + op_len..].trim(),
657 ),
658 None => {
659 if let Some(path) = condition.strip_prefix("$.") {
662 return resolve_json_path_simple(json, path).is_some();
663 }
664 return true;
665 }
666 };
667
668 let path = match field_part.strip_prefix("$.") {
670 Some(p) => p,
671 None => return false, };
673
674 let actual_value = match resolve_json_path_simple(json, path) {
675 Some(v) => v,
676 None => return op == "!=", };
678
679 let expected_str = if value_part.starts_with('"') && value_part.ends_with('"') {
681 let s = &value_part[1..value_part.len() - 1];
683 match op {
684 "=" => actual_value.as_str() == Some(s),
685 "!=" => actual_value.as_str() != Some(s),
686 _ => false,
687 }
688 } else if let Ok(expected_num) = value_part.parse::<f64>() {
689 let actual_num = actual_value.as_f64();
691 match (op, actual_num) {
692 ("=", Some(n)) => (n - expected_num).abs() < f64::EPSILON,
693 ("!=", Some(n)) => (n - expected_num).abs() >= f64::EPSILON,
694 (">", Some(n)) => n > expected_num,
695 ("<", Some(n)) => n < expected_num,
696 (">=", Some(n)) => n >= expected_num,
697 ("<=", Some(n)) => n <= expected_num,
698 _ => false,
699 }
700 } else if value_part == "true" || value_part == "false" {
701 let expected_bool = value_part == "true";
702 match op {
703 "=" => actual_value.as_bool() == Some(expected_bool),
704 "!=" => actual_value.as_bool() != Some(expected_bool),
705 _ => false,
706 }
707 } else {
708 false };
710
711 expected_str
712}
713
714fn resolve_json_path_simple<'a>(
716 json: &'a serde_json::Value,
717 path: &str,
718) -> Option<&'a serde_json::Value> {
719 let mut current = json;
720 for part in path.split('.') {
721 current = current.get(part)?;
722 }
723 if current.is_null() {
724 None
725 } else {
726 Some(current)
727 }
728}
729
/// Fixtures shared by this crate's unit tests, plus a few tests for the
/// filter-pattern helpers and the snapshot hook defined in this file.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;
    use bytes::Bytes;
    use fakecloud_core::delivery::DeliveryBus;
    use http::{HeaderMap, Method};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Builds a `LogsService` over fresh state with no snapshot store.
    pub fn make_service() -> LogsService {
        let accounts =
            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", "");
        let state = Arc::new(parking_lot::RwLock::new(accounts));
        LogsService::new(state, Arc::new(DeliveryBus::new()))
    }

    /// Builds a POST request for `action` whose body is `body` encoded as JSON.
    pub fn make_request(
        action: &str,
        body: serde_json::Value,
    ) -> fakecloud_core::service::AwsRequest {
        let payload = serde_json::to_vec(&body).unwrap();
        fakecloud_core::service::AwsRequest {
            service: "logs".to_string(),
            action: action.to_string(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "test-request-id".to_string(),
            headers: HeaderMap::new(),
            query_params: HashMap::new(),
            body: Bytes::from(payload),
            body_stream: parking_lot::Mutex::new(None),
            path_segments: vec![],
            raw_path: "/".to_string(),
            raw_query: String::new(),
            method: Method::POST,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Creates log group `name`, panicking if the call fails.
    pub fn create_group(svc: &LogsService, name: &str) {
        let body = serde_json::json!({ "logGroupName": name });
        svc.create_log_group(&make_request("CreateLogGroup", body))
            .unwrap();
    }

    /// Creates log stream `stream` in `group`, panicking if the call fails.
    pub fn create_stream(svc: &LogsService, group: &str, stream: &str) {
        let body = serde_json::json!({ "logGroupName": group, "logStreamName": stream });
        svc.create_log_stream(&make_request("CreateLogStream", body))
            .unwrap();
    }

    /// Puts `messages` into the stream, timestamped from the current time
    /// with one extra millisecond per message.
    pub fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
        let now = chrono::Utc::now().timestamp_millis();
        put_events_at(svc, group, stream, messages, now);
    }

    /// Puts `messages` into the stream, timestamped from `timestamp` with
    /// one extra millisecond per message.
    pub fn put_events_at(
        svc: &LogsService,
        group: &str,
        stream: &str,
        messages: &[&str],
        timestamp: i64,
    ) {
        let mut events: Vec<serde_json::Value> = Vec::with_capacity(messages.len());
        for (offset, msg) in messages.iter().enumerate() {
            events.push(
                serde_json::json!({ "timestamp": timestamp + offset as i64, "message": msg }),
            );
        }
        let body = serde_json::json!({
            "logGroupName": group,
            "logStreamName": stream,
            "logEvents": events,
        });
        svc.put_log_events(&make_request("PutLogEvents", body))
            .unwrap();
    }

    /// Sets the retention of `group` to `days`, panicking if the call fails.
    pub fn put_retention(svc: &LogsService, group: &str, days: i32) {
        let body = serde_json::json!({ "logGroupName": group, "retentionInDays": days });
        svc.put_retention_policy(&make_request("PutRetentionPolicy", body))
            .unwrap();
    }

    #[test]
    fn array_filter_pattern_does_not_match() {
        let matched = matches_filter_pattern("[w1, w2, w3]", "some log message");
        assert!(
            !matched,
            "array-style filter pattern must not match (fail closed)"
        );
    }

    #[test]
    fn unrecognized_json_filter_path_does_not_match() {
        let document = serde_json::json!({"level": "ERROR"});
        assert!(
            !matches_single_json_condition("level = \"ERROR\"", &document),
            "filter condition without $. prefix must not match (fail closed)"
        );
    }

    #[test]
    fn unknown_value_format_does_not_match() {
        let document = serde_json::json!({"level": "ERROR"});
        assert!(
            !matches_single_json_condition("$.level = ERROR", &document),
            "unquoted non-numeric non-boolean value must not match (fail closed)"
        );
    }

    #[test]
    fn snapshot_hook_is_none_without_store() {
        assert!(make_service().snapshot_hook().is_none());
    }

    /// Only checks that the hook exists and that awaiting it completes.
    #[tokio::test]
    async fn snapshot_hook_fires_with_store() {
        let store: Arc<dyn fakecloud_persistence::SnapshotStore> =
            Arc::new(fakecloud_persistence::MemorySnapshotStore::new());
        let svc = make_service().with_snapshot_store(store);
        let hook = svc
            .snapshot_hook()
            .expect("hook present when a store is set");
        hook().await;
    }
}