1use async_trait::async_trait;
2use http::StatusCode;
3use serde_json::{json, Value};
4
5use std::sync::Arc;
6
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{LogsSnapshot, SharedLogsState, LOGS_SNAPSHOT_SCHEMA_VERSION};
14
15mod anomaly;
16mod deliveries;
17mod destinations;
18mod exports;
19mod filters;
20mod groups;
21mod misc;
22mod policies;
23mod queries;
24mod streams;
25mod tags;
26
/// Returns `true` when `action` is one of the actions that only reads state.
///
/// `handle` uses this to decide whether a successful request should be
/// followed by a snapshot write: anything not listed here is treated as
/// mutating. The comparison is exact and case-sensitive.
fn is_read_only_action(action: &str) -> bool {
    const READ_ONLY_ACTIONS: &[&str] = &[
        "DescribeLogGroups",
        "DescribeLogStreams",
        "GetLogEvents",
        "FilterLogEvents",
        "ListTagsLogGroup",
        "ListTagsForResource",
        "DescribeSubscriptionFilters",
        "DescribeMetricFilters",
        "DescribeResourcePolicies",
        "DescribeDestinations",
        "GetQueryResults",
        "DescribeQueries",
        "DescribeExportTasks",
        "GetDeliveryDestination",
        "DescribeDeliveryDestinations",
        "GetDeliveryDestinationPolicy",
        "GetDeliverySource",
        "DescribeDeliverySources",
        "GetDelivery",
        "DescribeDeliveries",
        "DescribeQueryDefinitions",
        "DescribeAccountPolicies",
        "GetDataProtectionPolicy",
        "DescribeIndexPolicies",
        "DescribeFieldIndexes",
        "GetTransformer",
        "TestTransformer",
        "GetLogAnomalyDetector",
        "ListLogAnomalyDetectors",
        "GetLogGroupFields",
        "TestMetricFilter",
        "GetLogRecord",
        "ListAnomalies",
        "DescribeImportTasks",
        "DescribeImportTaskBatches",
        "GetIntegration",
        "ListIntegrations",
        "GetLookupTable",
        "DescribeLookupTables",
        "GetScheduledQuery",
        "GetScheduledQueryHistory",
        "ListScheduledQueries",
        "StartLiveTail",
        "ListLogGroups",
        "ListLogGroupsForQuery",
        "ListAggregateLogGroupSummaries",
        "GetLogObject",
        "GetLogFields",
        "ListSourcesForS3TableIntegration",
        "DescribeConfigurationTemplates",
        "GetExportedData",
    ];

    READ_ONLY_ACTIONS.contains(&action)
}
85
/// The `logs` service implementation: dispatches actions by name and,
/// when a snapshot store is configured, persists state after successful
/// mutating requests.
pub struct LogsService {
    /// Shared logs state; its contents are cloned into every snapshot.
    state: SharedLogsState,
    /// Delivery bus supplied at construction time.
    /// NOTE(review): not referenced in this file — presumably used by the
    /// action handlers in the submodules; confirm.
    delivery_bus: Arc<DeliveryBus>,
    /// Optional persistence backend. While this is `None`, snapshot saves
    /// are no-ops and `snapshot_hook` returns `None`.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of a snapshot capture and write,
    /// so snapshot saves going through this service do not overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
92
93impl LogsService {
94 pub fn new(state: SharedLogsState, delivery_bus: Arc<DeliveryBus>) -> Self {
95 Self {
96 state,
97 delivery_bus,
98 snapshot_store: None,
99 snapshot_lock: Arc::new(AsyncMutex::new(())),
100 }
101 }
102
103 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
104 self.snapshot_store = Some(store);
105 self
106 }
107
108 async fn save_snapshot(&self) {
112 save_logs_snapshot(
113 &self.state,
114 self.snapshot_store.clone(),
115 &self.snapshot_lock,
116 )
117 .await;
118 }
119
120 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
125 let store = self.snapshot_store.clone()?;
126 let state = self.state.clone();
127 let lock = self.snapshot_lock.clone();
128 Some(Arc::new(move || {
129 let state = state.clone();
130 let store = store.clone();
131 let lock = lock.clone();
132 Box::pin(async move {
133 save_logs_snapshot(&state, Some(store), &lock).await;
134 })
135 }))
136 }
137}
138
139pub async fn save_logs_snapshot(
145 state: &SharedLogsState,
146 store: Option<Arc<dyn SnapshotStore>>,
147 lock: &AsyncMutex<()>,
148) {
149 let Some(store) = store else {
150 return;
151 };
152 let _guard = lock.lock().await;
153 let snapshot = LogsSnapshot {
154 schema_version: LOGS_SNAPSHOT_SCHEMA_VERSION,
155 accounts: Some(state.read().clone()),
156 state: None,
157 };
158 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
159 let bytes = serde_json::to_vec(&snapshot)
160 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
161 store.save(&bytes)
162 })
163 .await;
164 match join {
165 Ok(Ok(())) => {}
166 Ok(Err(err)) => tracing::error!(%err, "failed to write logs snapshot"),
167 Err(err) => tracing::error!(%err, "logs snapshot task panicked"),
168 }
169}
170
171#[async_trait]
172impl AwsService for LogsService {
173 fn service_name(&self) -> &str {
174 "logs"
175 }
176
177 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
178 let mutates = !is_read_only_action(req.action.as_str());
179 let result = match req.action.as_str() {
180 "CreateLogGroup" => self.create_log_group(&req),
181 "DeleteLogGroup" => self.delete_log_group(&req),
182 "DescribeLogGroups" => self.describe_log_groups(&req),
183 "CreateLogStream" => self.create_log_stream(&req),
184 "DeleteLogStream" => self.delete_log_stream(&req),
185 "DescribeLogStreams" => self.describe_log_streams(&req),
186 "PutLogEvents" => self.put_log_events(&req),
187 "GetLogEvents" => self.get_log_events(&req),
188 "FilterLogEvents" => self.filter_log_events(&req),
189 "TagLogGroup" => self.tag_log_group(&req),
190 "UntagLogGroup" => self.untag_log_group(&req),
191 "ListTagsLogGroup" => self.list_tags_log_group(&req),
192 "TagResource" => self.tag_resource(&req),
193 "UntagResource" => self.untag_resource(&req),
194 "ListTagsForResource" => self.list_tags_for_resource(&req),
195 "PutRetentionPolicy" => self.put_retention_policy(&req),
196 "DeleteRetentionPolicy" => self.delete_retention_policy(&req),
197 "PutSubscriptionFilter" => self.put_subscription_filter(&req),
198 "DescribeSubscriptionFilters" => self.describe_subscription_filters(&req),
199 "DeleteSubscriptionFilter" => self.delete_subscription_filter(&req),
200 "PutMetricFilter" => self.put_metric_filter(&req),
201 "DescribeMetricFilters" => self.describe_metric_filters(&req),
202 "DeleteMetricFilter" => self.delete_metric_filter(&req),
203 "PutResourcePolicy" => self.put_resource_policy(&req),
204 "DescribeResourcePolicies" => self.describe_resource_policies(&req),
205 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
206 "PutDestination" => self.put_destination(&req),
207 "DescribeDestinations" => self.describe_destinations(&req),
208 "DeleteDestination" => self.delete_destination(&req),
209 "PutDestinationPolicy" => self.put_destination_policy(&req),
210 "StartQuery" => self.start_query(&req),
211 "GetQueryResults" => self.get_query_results(&req),
212 "DescribeQueries" => self.describe_queries(&req),
213 "CreateExportTask" => self.create_export_task(&req),
214 "DescribeExportTasks" => self.describe_export_tasks(&req),
215 "CancelExportTask" => self.cancel_export_task(&req),
216 "PutDeliveryDestination" => self.put_delivery_destination(&req),
217 "GetDeliveryDestination" => self.get_delivery_destination(&req),
218 "DescribeDeliveryDestinations" => self.describe_delivery_destinations(&req),
219 "DeleteDeliveryDestination" => self.delete_delivery_destination(&req),
220 "PutDeliveryDestinationPolicy" => self.put_delivery_destination_policy(&req),
221 "GetDeliveryDestinationPolicy" => self.get_delivery_destination_policy(&req),
222 "DeleteDeliveryDestinationPolicy" => self.delete_delivery_destination_policy(&req),
223 "PutDeliverySource" => self.put_delivery_source(&req),
224 "GetDeliverySource" => self.get_delivery_source(&req),
225 "DescribeDeliverySources" => self.describe_delivery_sources(&req),
226 "DeleteDeliverySource" => self.delete_delivery_source(&req),
227 "CreateDelivery" => self.create_delivery(&req),
228 "GetDelivery" => self.get_delivery(&req),
229 "DescribeDeliveries" => self.describe_deliveries(&req),
230 "DeleteDelivery" => self.delete_delivery(&req),
231 "AssociateKmsKey" => self.associate_kms_key(&req),
232 "DisassociateKmsKey" => self.disassociate_kms_key(&req),
233 "PutQueryDefinition" => self.put_query_definition(&req),
234 "DescribeQueryDefinitions" => self.describe_query_definitions(&req),
235 "DeleteQueryDefinition" => self.delete_query_definition(&req),
236 "PutAccountPolicy" => self.put_account_policy(&req),
237 "DescribeAccountPolicies" => self.describe_account_policies(&req),
238 "DeleteAccountPolicy" => self.delete_account_policy(&req),
239 "PutDataProtectionPolicy" => self.put_data_protection_policy(&req),
240 "GetDataProtectionPolicy" => self.get_data_protection_policy(&req),
241 "DeleteDataProtectionPolicy" => self.delete_data_protection_policy(&req),
242 "PutIndexPolicy" => self.put_index_policy(&req),
243 "DescribeIndexPolicies" => self.describe_index_policies(&req),
244 "DeleteIndexPolicy" => self.delete_index_policy(&req),
245 "DescribeFieldIndexes" => self.describe_field_indexes(&req),
246 "PutTransformer" => self.put_transformer(&req),
247 "GetTransformer" => self.get_transformer(&req),
248 "DeleteTransformer" => self.delete_transformer(&req),
249 "TestTransformer" => self.test_transformer(&req),
250 "CreateLogAnomalyDetector" => self.create_log_anomaly_detector(&req),
251 "GetLogAnomalyDetector" => self.get_log_anomaly_detector(&req),
252 "DeleteLogAnomalyDetector" => self.delete_log_anomaly_detector(&req),
253 "ListLogAnomalyDetectors" => self.list_log_anomaly_detectors(&req),
254 "UpdateLogAnomalyDetector" => self.update_log_anomaly_detector(&req),
255 "GetLogGroupFields" => self.get_log_group_fields(&req),
256 "TestMetricFilter" => self.test_metric_filter(&req),
257 "StopQuery" => self.stop_query(&req),
258 "PutLogGroupDeletionProtection" => self.put_log_group_deletion_protection(&req),
259 "GetLogRecord" => self.get_log_record(&req),
260 "ListAnomalies" => self.list_anomalies(&req),
261 "UpdateAnomaly" => self.update_anomaly(&req),
262 "CreateImportTask" => self.create_import_task(&req),
263 "DescribeImportTasks" => self.describe_import_tasks(&req),
264 "DescribeImportTaskBatches" => self.describe_import_task_batches(&req),
265 "CancelImportTask" => self.cancel_import_task(&req),
266 "PutIntegration" => self.put_integration(&req),
267 "GetIntegration" => self.get_integration(&req),
268 "DeleteIntegration" => self.delete_integration(&req),
269 "ListIntegrations" => self.list_integrations(&req),
270 "CreateLookupTable" => self.create_lookup_table(&req),
271 "GetLookupTable" => self.get_lookup_table(&req),
272 "DescribeLookupTables" => self.describe_lookup_tables(&req),
273 "DeleteLookupTable" => self.delete_lookup_table(&req),
274 "UpdateLookupTable" => self.update_lookup_table(&req),
275 "CreateScheduledQuery" => self.create_scheduled_query(&req),
276 "GetScheduledQuery" => self.get_scheduled_query(&req),
277 "GetScheduledQueryHistory" => self.get_scheduled_query_history(&req),
278 "ListScheduledQueries" => self.list_scheduled_queries(&req),
279 "DeleteScheduledQuery" => self.delete_scheduled_query(&req),
280 "UpdateScheduledQuery" => self.update_scheduled_query(&req),
281 "StartLiveTail" => self.start_live_tail(&req),
282 "ListLogGroups" => self.list_log_groups(&req),
283 "ListLogGroupsForQuery" => self.list_log_groups_for_query(&req),
284 "ListAggregateLogGroupSummaries" => self.list_aggregate_log_group_summaries(&req),
285 "PutBearerTokenAuthentication" => self.put_bearer_token_authentication(&req),
286 "GetLogObject" => self.get_log_object(&req),
287 "GetLogFields" => self.get_log_fields(&req),
288 "AssociateSourceToS3TableIntegration" => {
289 self.associate_source_to_s3_table_integration(&req)
290 }
291 "ListSourcesForS3TableIntegration" => self.list_sources_for_s3_table_integration(&req),
292 "DisassociateSourceFromS3TableIntegration" => {
293 self.disassociate_source_from_s3_table_integration(&req)
294 }
295 "UpdateDeliveryConfiguration" => self.update_delivery_configuration(&req),
296 "DescribeConfigurationTemplates" => self.describe_configuration_templates(&req),
297 "GetExportedData" => self.get_exported_data(&req),
299 _ => Err(AwsServiceError::action_not_implemented("logs", &req.action)),
300 };
301 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
302 self.save_snapshot().await;
303 }
304 result
305 }
306
307 fn supported_actions(&self) -> &[&str] {
308 SUPPORTED_ACTIONS
309 }
310}
311
/// Action names returned by `LogsService::supported_actions`.
///
/// NOTE(review): `handle` also dispatches "GetExportedData", which is not
/// listed here — confirm whether that omission is intentional.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateLogGroup",
    "DeleteLogGroup",
    "DescribeLogGroups",
    "CreateLogStream",
    "DeleteLogStream",
    "DescribeLogStreams",
    "PutLogEvents",
    "GetLogEvents",
    "FilterLogEvents",
    "TagLogGroup",
    "UntagLogGroup",
    "ListTagsLogGroup",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "PutRetentionPolicy",
    "DeleteRetentionPolicy",
    "PutSubscriptionFilter",
    "DescribeSubscriptionFilters",
    "DeleteSubscriptionFilter",
    "PutMetricFilter",
    "DescribeMetricFilters",
    "DeleteMetricFilter",
    "PutResourcePolicy",
    "DescribeResourcePolicies",
    "DeleteResourcePolicy",
    "PutDestination",
    "DescribeDestinations",
    "DeleteDestination",
    "PutDestinationPolicy",
    "StartQuery",
    "GetQueryResults",
    "DescribeQueries",
    "CreateExportTask",
    "DescribeExportTasks",
    "CancelExportTask",
    "PutDeliveryDestination",
    "GetDeliveryDestination",
    "DescribeDeliveryDestinations",
    "DeleteDeliveryDestination",
    "PutDeliveryDestinationPolicy",
    "GetDeliveryDestinationPolicy",
    "DeleteDeliveryDestinationPolicy",
    "PutDeliverySource",
    "GetDeliverySource",
    "DescribeDeliverySources",
    "DeleteDeliverySource",
    "CreateDelivery",
    "GetDelivery",
    "DescribeDeliveries",
    "DeleteDelivery",
    "AssociateKmsKey",
    "DisassociateKmsKey",
    "PutQueryDefinition",
    "DescribeQueryDefinitions",
    "DeleteQueryDefinition",
    "PutAccountPolicy",
    "DescribeAccountPolicies",
    "DeleteAccountPolicy",
    "PutDataProtectionPolicy",
    "GetDataProtectionPolicy",
    "DeleteDataProtectionPolicy",
    "PutIndexPolicy",
    "DescribeIndexPolicies",
    "DeleteIndexPolicy",
    "DescribeFieldIndexes",
    "PutTransformer",
    "GetTransformer",
    "DeleteTransformer",
    "TestTransformer",
    "CreateLogAnomalyDetector",
    "GetLogAnomalyDetector",
    "DeleteLogAnomalyDetector",
    "ListLogAnomalyDetectors",
    "UpdateLogAnomalyDetector",
    "GetLogGroupFields",
    "TestMetricFilter",
    "StopQuery",
    "PutLogGroupDeletionProtection",
    "GetLogRecord",
    "ListAnomalies",
    "UpdateAnomaly",
    "CreateImportTask",
    "DescribeImportTasks",
    "DescribeImportTaskBatches",
    "CancelImportTask",
    "PutIntegration",
    "GetIntegration",
    "DeleteIntegration",
    "ListIntegrations",
    "CreateLookupTable",
    "GetLookupTable",
    "DescribeLookupTables",
    "DeleteLookupTable",
    "UpdateLookupTable",
    "CreateScheduledQuery",
    "GetScheduledQuery",
    "GetScheduledQueryHistory",
    "ListScheduledQueries",
    "DeleteScheduledQuery",
    "UpdateScheduledQuery",
    "StartLiveTail",
    "ListLogGroups",
    "ListLogGroupsForQuery",
    "ListAggregateLogGroupSummaries",
    "PutBearerTokenAuthentication",
    "GetLogObject",
    "GetLogFields",
    "AssociateSourceToS3TableIntegration",
    "ListSourcesForS3TableIntegration",
    "DisassociateSourceFromS3TableIntegration",
    "UpdateDeliveryConfiguration",
    "DescribeConfigurationTemplates",
];
427
428fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
429 body[field].as_str().ok_or_else(|| {
430 AwsServiceError::aws_error(
431 StatusCode::BAD_REQUEST,
432 "InvalidParameterException",
433 format!("{field} is required"),
434 )
435 })
436}
437
438fn dd_config_json(config: &std::collections::BTreeMap<String, String>) -> Value {
441 let mut m: serde_json::Map<String, Value> =
442 config.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
443 m.entry("destinationResourceArn".to_string())
444 .or_insert_with(|| json!(""));
445 Value::Object(m)
446}
447
/// Derives the delivery destination type code from a destination ARN.
///
/// The third colon-separated field of the ARN (the service) selects the
/// code: `s3` → `"S3"`, `firehose` → `"FH"`, `xray` → `"XRAY"`. Any other
/// value, a string with fewer than three fields, or `None` yields `"CWL"`.
pub fn infer_delivery_destination_type(destination_arn: Option<&String>) -> String {
    let service = destination_arn
        .and_then(|arn| arn.split(':').nth(2))
        .unwrap_or_default();

    let type_code = match service {
        "s3" => "S3",
        "firehose" => "FH",
        "xray" => "XRAY",
        _ => "CWL",
    };
    String::from(type_code)
}
462
/// Produces a 38-digit, zero-padded decimal token from the current time.
///
/// The value is the number of nanoseconds since the Unix epoch (zero if the
/// system clock reads earlier than the epoch), reduced modulo 10^38 so it
/// always fits the fixed width.
fn generate_sequence_token() -> String {
    const TOKEN_DIGITS: usize = 38;

    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    let modulus = 10u128.pow(TOKEN_DIGITS as u32);
    let value = since_epoch.as_nanos() % modulus;

    format!("{value:0width$}", width = TOKEN_DIGITS)
}
472
473fn validation_error(field: &str, value: &str, constraint: &str) -> AwsServiceError {
474 AwsServiceError::aws_error(
475 StatusCode::BAD_REQUEST,
476 "InvalidParameterException",
477 format!(
478 "1 validation error detected: Value '{value}' at '{field}' failed to satisfy constraint: {constraint}"
479 ),
480 )
481}
482
483fn resolve_log_group_name(
486 log_group_name: Option<&str>,
487 resource_identifier: Option<&str>,
488) -> Result<String, AwsServiceError> {
489 if let Some(identifier) = resource_identifier {
490 if identifier.starts_with("arn:") {
491 extract_log_group_from_arn(identifier).ok_or_else(|| {
492 AwsServiceError::aws_error(
493 StatusCode::BAD_REQUEST,
494 "InvalidParameterException",
495 format!("Invalid ARN: {identifier}"),
496 )
497 })
498 } else {
499 Ok(identifier.to_string())
500 }
501 } else if let Some(name) = log_group_name {
502 Ok(name.to_string())
503 } else {
504 Err(AwsServiceError::aws_error(
505 StatusCode::BAD_REQUEST,
506 "InvalidParameterException",
507 "Either logGroupName or resourceIdentifier is required",
508 ))
509 }
510}
511
/// Extracts the log group name from a log-group ARN.
///
/// The ARN is split on `:` into at most seven fields. The sixth must be
/// `log-group`; the seventh (everything after it, colons included) is the
/// name, with one trailing `:*` removed if present. Returns `None` when
/// there are fewer than seven fields or the resource type is something else.
pub(crate) fn extract_log_group_from_arn(arn: &str) -> Option<String> {
    let mut fields = arn.splitn(7, ':');
    let resource_type = fields.nth(5)?;
    let resource = fields.next()?;

    if resource_type != "log-group" {
        return None;
    }

    let name = resource.strip_suffix(":*").unwrap_or(resource);
    Some(name.to_owned())
}
523
524fn matches_filter_pattern(pattern: &str, message: &str) -> bool {
532 let pattern = pattern.trim();
533
534 if pattern.is_empty() {
536 return true;
537 }
538
539 if (pattern.starts_with('{') && pattern.ends_with('}')) || pattern.starts_with('[') {
543 return crate::filter_pattern::matches(pattern, message);
544 }
545
546 if pattern.starts_with('"') && pattern.ends_with('"') && pattern.len() >= 2 {
548 let inner = &pattern[1..pattern.len() - 1];
549 let unescaped = inner.replace("\\\"", "\"");
551 return message.contains(&unescaped);
552 }
553
554 let terms = parse_filter_terms(pattern);
556 terms.iter().all(|term| message.contains(term.as_str()))
557}
558
/// Splits a plain-text filter pattern into its search terms.
///
/// Terms are separated by whitespace. A term that begins with `"` runs to
/// the next unescaped `"` (or to the end of the input if it is never
/// closed) and may contain whitespace; inside it a backslash makes the
/// following character literal. A quoted term may be empty. Quotes that
/// appear in the middle of an unquoted term are kept as ordinary characters.
fn parse_filter_terms(pattern: &str) -> Vec<String> {
    let mut terms = Vec::new();
    let mut rest = pattern.chars().peekable();

    loop {
        // Drop the whitespace separating terms.
        while rest.next_if(|c| c.is_whitespace()).is_some() {}

        let Some(&first) = rest.peek() else {
            break;
        };

        if first == '"' {
            // Consume the opening quote, then read up to the closing one.
            rest.next();
            let mut quoted = String::new();
            while let Some(c) = rest.next() {
                match c {
                    '"' => break,
                    // A trailing backslash with nothing after it adds nothing.
                    '\\' => quoted.extend(rest.next()),
                    other => quoted.push(other),
                }
            }
            terms.push(quoted);
        } else {
            // `first` is non-whitespace, so this term is never empty.
            let mut bare = String::new();
            while let Some(c) = rest.next_if(|c| !c.is_whitespace()) {
                bare.push(c);
            }
            terms.push(bare);
        }
    }

    terms
}
605
/// Fixtures shared by the logs service tests, plus a few tests of this
/// module's own helpers.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;
    use bytes::Bytes;
    use fakecloud_core::delivery::DeliveryBus;
    use http::{HeaderMap, Method};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Account id used by the fixture service and by every fixture request.
    const TEST_ACCOUNT_ID: &str = "123456789012";
    /// Region used by the fixture service and by every fixture request.
    const TEST_REGION: &str = "us-east-1";

    /// Creates a service with fresh state and no snapshot store.
    pub fn make_service() -> LogsService {
        let accounts =
            fakecloud_core::multi_account::MultiAccountState::new(TEST_ACCOUNT_ID, TEST_REGION, "");
        let state = Arc::new(parking_lot::RwLock::new(accounts));
        LogsService::new(state, Arc::new(DeliveryBus::new()))
    }

    /// Builds a POST request for `action` whose payload is `body` serialized
    /// as JSON.
    pub fn make_request(
        action: &str,
        body: serde_json::Value,
    ) -> fakecloud_core::service::AwsRequest {
        let payload = Bytes::from(serde_json::to_vec(&body).unwrap());
        fakecloud_core::service::AwsRequest {
            service: "logs".to_string(),
            action: action.to_string(),
            region: TEST_REGION.to_string(),
            account_id: TEST_ACCOUNT_ID.to_string(),
            request_id: "test-request-id".to_string(),
            method: Method::POST,
            raw_path: "/".to_string(),
            raw_query: String::new(),
            path_segments: Vec::new(),
            headers: HeaderMap::new(),
            query_params: HashMap::new(),
            body: payload,
            body_stream: parking_lot::Mutex::new(None),
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Creates log group `name`, panicking if the call fails.
    pub fn create_group(svc: &LogsService, name: &str) {
        let body = serde_json::json!({ "logGroupName": name });
        svc.create_log_group(&make_request("CreateLogGroup", body))
            .unwrap();
    }

    /// Creates log stream `stream` in `group`, panicking if the call fails.
    pub fn create_stream(svc: &LogsService, group: &str, stream: &str) {
        let body = serde_json::json!({ "logGroupName": group, "logStreamName": stream });
        svc.create_log_stream(&make_request("CreateLogStream", body))
            .unwrap();
    }

    /// Appends `messages` to the stream, timestamped one millisecond apart
    /// starting at the current time.
    pub fn put_events(svc: &LogsService, group: &str, stream: &str, messages: &[&str]) {
        let now = chrono::Utc::now().timestamp_millis();
        put_events_at(svc, group, stream, messages, now);
    }

    /// Appends `messages` to the stream, timestamped one millisecond apart
    /// starting at `timestamp` (milliseconds).
    pub fn put_events_at(
        svc: &LogsService,
        group: &str,
        stream: &str,
        messages: &[&str],
        timestamp: i64,
    ) {
        let mut events: Vec<serde_json::Value> = Vec::with_capacity(messages.len());
        for (offset, message) in messages.iter().enumerate() {
            events.push(serde_json::json!({
                "timestamp": timestamp + offset as i64,
                "message": message,
            }));
        }
        let body = serde_json::json!({
            "logGroupName": group,
            "logStreamName": stream,
            "logEvents": events,
        });
        svc.put_log_events(&make_request("PutLogEvents", body))
            .unwrap();
    }

    /// Sets the retention period of `group` to `days`, panicking on failure.
    pub fn put_retention(svc: &LogsService, group: &str, days: i32) {
        let body = serde_json::json!({ "logGroupName": group, "retentionInDays": days });
        svc.put_retention_policy(&make_request("PutRetentionPolicy", body))
            .unwrap();
    }

    /// Array patterns match by position: field count and per-field
    /// conditions both have to agree with the message.
    #[test]
    fn array_filter_pattern_matches_positionally() {
        let three_fields = "[w1, w2, w3]";
        assert!(matches_filter_pattern(three_fields, "some log message"));
        assert!(!matches_filter_pattern(three_fields, "a b c d"));
        assert!(!matches_filter_pattern(
            "[w1=ERROR, w2, w3]",
            "INFO log message"
        ));
    }

    /// JSON patterns accept `||` between conditions.
    #[test]
    fn json_filter_pattern_supports_or() {
        let msg = r#"{"level":"ERROR","code":500}"#;
        let second_branch_matches = "{ $.level = \"WARN\" || $.code = 500 }";
        let no_branch_matches = "{ $.level = \"WARN\" || $.code = 200 }";
        assert!(matches_filter_pattern(second_branch_matches, msg));
        assert!(!matches_filter_pattern(no_branch_matches, msg));
    }

    /// Without a snapshot store there is nothing to hook into.
    #[test]
    fn snapshot_hook_is_none_without_store() {
        assert!(make_service().snapshot_hook().is_none());
    }

    /// With a store configured the hook exists and runs to completion.
    #[tokio::test]
    async fn snapshot_hook_fires_with_store() {
        let store: Arc<dyn fakecloud_persistence::SnapshotStore> =
            Arc::new(fakecloud_persistence::MemorySnapshotStore::new());
        let hook = make_service()
            .with_snapshot_store(store)
            .snapshot_hook()
            .expect("hook present when a store is set");
        hook().await;
    }
}