Skip to main content

fakecloud_logs/
state.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use parking_lot::RwLock;
5
/// Shared handle to the Logs state of every account: a
/// `MultiAccountState<LogsState>` guarded by a `parking_lot::RwLock` and
/// reference-counted with `Arc` so it can be cloned cheaply between owners.
pub type SharedLogsState = Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<LogsState>>>;
7
8impl fakecloud_core::multi_account::AccountState for LogsState {
9    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
10        Self::new(account_id, region)
11    }
12}
13
14/// JSON object keys must be strings, so serialize
15/// `HashMap<(String,String), AccountPolicy>` as a list of
16/// `[policy_name, policy_type, policy]` tuples.
17mod account_policy_map_serde {
18    use super::AccountPolicy;
19    use serde::{Deserialize, Deserializer, Serialize, Serializer};
20    use std::collections::BTreeMap;
21
22    pub fn serialize<S: Serializer>(
23        map: &BTreeMap<(String, String), AccountPolicy>,
24        s: S,
25    ) -> Result<S::Ok, S::Error> {
26        let entries: Vec<(&String, &String, &AccountPolicy)> = map
27            .iter()
28            .map(|((name, kind), p)| (name, kind, p))
29            .collect();
30        entries.serialize(s)
31    }
32
33    pub fn deserialize<'de, D: Deserializer<'de>>(
34        d: D,
35    ) -> Result<BTreeMap<(String, String), AccountPolicy>, D::Error> {
36        let entries: Vec<(String, String, AccountPolicy)> = Vec::deserialize(d)?;
37        Ok(entries
38            .into_iter()
39            .map(|(name, kind, p)| ((name, kind), p))
40            .collect())
41    }
42}
43
/// All CloudWatch Logs state held for one account/region pair.
///
/// Built empty by [`LogsState::new`]; [`LogsState::reset`] empties every
/// collection while leaving `account_id` and `region` untouched.
///
/// Map keys are documented per field where the file states them. For the
/// maps without a key note, the key is presumably the resource's name or
/// id — TODO confirm against the service handlers.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct LogsState {
    /// Account identifier this state was created for.
    pub account_id: String,
    /// Region this state was created for.
    pub region: String,
    pub log_groups: BTreeMap<String, LogGroup>,
    pub metric_filters: Vec<MetricFilter>,
    pub resource_policies: BTreeMap<String, ResourcePolicy>,
    pub destinations: BTreeMap<String, Destination>,
    pub queries: BTreeMap<String, QueryInfo>,
    pub export_tasks: Vec<ExportTask>,
    pub delivery_destinations: BTreeMap<String, DeliveryDestination>,
    pub delivery_sources: BTreeMap<String, DeliverySource>,
    pub deliveries: BTreeMap<String, Delivery>,
    pub query_definitions: BTreeMap<String, QueryDefinition>,
    /// Account policies keyed by (policy_name, policy_type)
    // Tuple keys cannot be JSON object keys, hence the custom (de)serializer.
    #[serde(with = "account_policy_map_serde")]
    pub account_policies: BTreeMap<(String, String), AccountPolicy>,
    /// Anomaly detectors keyed by detector ARN
    pub anomaly_detectors: BTreeMap<String, AnomalyDetector>,
    /// Import tasks keyed by import ID
    pub import_tasks: BTreeMap<String, ImportTask>,
    /// Integrations keyed by integration name
    pub integrations: BTreeMap<String, Integration>,
    /// Lookup tables keyed by ARN
    pub lookup_tables: BTreeMap<String, LookupTable>,
    /// Scheduled queries keyed by identifier (ARN)
    pub scheduled_queries: BTreeMap<String, ScheduledQuery>,
    /// S3 table integration sources keyed by integration ARN -> list of source identifiers
    pub s3_table_sources: BTreeMap<String, Vec<String>>,
    /// Bearer token authentication flag per log group
    pub bearer_token_auth: BTreeMap<String, bool>,
    /// Internal export storage: keyed by "bucket/prefix/..." path, value is exported data.
    /// Used by CreateExportTask and delivery pipeline when direct S3 access is unavailable.
    pub export_storage: BTreeMap<String, Vec<u8>>,
    /// Detected log anomalies keyed by anomaly id. Populated via the
    /// `/_fakecloud/logs/anomalies/inject` admin endpoint and surfaced
    /// through ListAnomalies / UpdateAnomaly.
    // `serde(default)`: input lacking this field deserializes to an empty map.
    #[serde(default)]
    pub anomalies: BTreeMap<String, LogAnomaly>,
}
84
/// One detected log anomaly, stored in `LogsState::anomalies` under its
/// anomaly id.
///
/// NOTE(review): the units of `first_seen` / `last_seen` and the allowed
/// values of `priority` / `state` are not established in this file —
/// confirm against the handlers that create and update anomalies.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct LogAnomaly {
    pub anomaly_id: String,
    /// ARN of the detector this anomaly is attributed to.
    pub anomaly_detector_arn: String,
    pub log_group_arn_list: Vec<String>,
    pub pattern_id: String,
    pub pattern_string: String,
    pub first_seen: i64,
    pub last_seen: i64,
    pub priority: String,
    pub state: String,
    pub suppressed: bool,
}
98
99impl LogsState {
100    pub fn new(account_id: &str, region: &str) -> Self {
101        Self {
102            account_id: account_id.to_string(),
103            region: region.to_string(),
104            log_groups: BTreeMap::new(),
105            metric_filters: Vec::new(),
106            resource_policies: BTreeMap::new(),
107            destinations: BTreeMap::new(),
108            queries: BTreeMap::new(),
109            export_tasks: Vec::new(),
110            delivery_destinations: BTreeMap::new(),
111            delivery_sources: BTreeMap::new(),
112            deliveries: BTreeMap::new(),
113            query_definitions: BTreeMap::new(),
114            account_policies: BTreeMap::new(),
115            anomaly_detectors: BTreeMap::new(),
116            import_tasks: BTreeMap::new(),
117            integrations: BTreeMap::new(),
118            lookup_tables: BTreeMap::new(),
119            scheduled_queries: BTreeMap::new(),
120            s3_table_sources: BTreeMap::new(),
121            bearer_token_auth: BTreeMap::new(),
122            export_storage: BTreeMap::new(),
123            anomalies: BTreeMap::new(),
124        }
125    }
126
127    pub fn reset(&mut self) {
128        self.log_groups.clear();
129        self.metric_filters.clear();
130        self.resource_policies.clear();
131        self.destinations.clear();
132        self.queries.clear();
133        self.export_tasks.clear();
134        self.delivery_destinations.clear();
135        self.delivery_sources.clear();
136        self.deliveries.clear();
137        self.query_definitions.clear();
138        self.account_policies.clear();
139        self.anomaly_detectors.clear();
140        self.import_tasks.clear();
141        self.integrations.clear();
142        self.lookup_tables.clear();
143        self.scheduled_queries.clear();
144        self.s3_table_sources.clear();
145        self.bearer_token_auth.clear();
146        self.export_storage.clear();
147        self.anomalies.clear();
148    }
149}
150
/// A log group together with everything stored under it: its log streams,
/// subscription filters, and optional per-group policies/transformer.
/// Held in `LogsState::log_groups`.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct LogGroup {
    pub name: String,
    pub arn: String,
    pub creation_time: i64,
    /// `None` when no retention period is recorded for the group.
    pub retention_in_days: Option<i32>,
    pub kms_key_id: Option<String>,
    pub tags: BTreeMap<String, String>,
    /// Streams belonging to this group.
    /// NOTE(review): presumably keyed by stream name — confirm.
    pub log_streams: BTreeMap<String, LogStream>,
    pub stored_bytes: i64,
    pub subscription_filters: Vec<SubscriptionFilter>,
    pub data_protection_policy: Option<DataProtectionPolicy>,
    pub index_policies: Vec<IndexPolicy>,
    pub transformer: Option<Transformer>,
    pub deletion_protection: bool,
    /// `STANDARD` (default), `INFREQUENT_ACCESS`, or `DELIVERY`. Set at
    /// creation time via `CreateLogGroup`'s `logGroupClass` parameter.
    /// Tracked here so `DescribeLogGroups` round-trips it correctly.
    pub log_group_class: Option<String>,
}
171
/// A log stream and the events stored in it. Held in
/// `LogGroup::log_streams`.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct LogStream {
    pub name: String,
    pub arn: String,
    pub creation_time: i64,
    /// `None` until a first-event timestamp has been recorded.
    pub first_event_timestamp: Option<i64>,
    /// `None` until a last-event timestamp has been recorded.
    pub last_event_timestamp: Option<i64>,
    /// `None` until an ingestion time has been recorded.
    pub last_ingestion_time: Option<i64>,
    pub upload_sequence_token: String,
    /// Events stored for this stream.
    pub events: Vec<LogEvent>,
}
183
/// A single stored log event (element of `LogStream::events`).
///
/// NOTE(review): `timestamp` and `ingestion_time` are presumably epoch
/// milliseconds, as in the CloudWatch Logs API — confirm at the write site.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct LogEvent {
    pub timestamp: i64,
    pub message: String,
    pub ingestion_time: i64,
}
190
/// A subscription filter attached to a log group (element of
/// `LogGroup::subscription_filters`).
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct SubscriptionFilter {
    pub filter_name: String,
    /// Name of the log group the filter belongs to.
    pub log_group_name: String,
    pub filter_pattern: String,
    pub destination_arn: String,
    pub role_arn: Option<String>,
    pub distribution: String,
    pub creation_time: i64,
}
201
/// A metric filter (element of `LogsState::metric_filters`). Unlike
/// subscription filters, these are stored at the state level and refer to
/// their log group by name.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct MetricFilter {
    pub filter_name: String,
    pub filter_pattern: String,
    pub log_group_name: String,
    pub metric_transformations: Vec<MetricTransformation>,
    pub creation_time: i64,
}
210
/// One metric transformation of a `MetricFilter` (element of
/// `MetricFilter::metric_transformations`).
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct MetricTransformation {
    pub metric_name: String,
    pub metric_namespace: String,
    /// Kept as a string rather than a number.
    pub metric_value: String,
    pub default_value: Option<f64>,
}
218
/// A resource policy (value type of `LogsState::resource_policies`). The
/// policy document is stored as an opaque string.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct ResourcePolicy {
    pub policy_name: String,
    pub policy_document: String,
    pub last_updated_time: i64,
}
225
/// A destination (value type of `LogsState::destinations`).
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct Destination {
    pub destination_name: String,
    pub target_arn: String,
    pub role_arn: String,
    /// ARN of the destination itself (as opposed to `target_arn`).
    pub arn: String,
    /// `None` when no access policy is recorded for the destination.
    pub access_policy: Option<String>,
    pub creation_time: i64,
    pub tags: BTreeMap<String, String>,
}
236
/// Record of a query (value type of `LogsState::queries`).
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct QueryInfo {
    pub query_id: String,
    pub log_group_name: String,
    /// Every log group / identifier referenced by this query, used by
    /// `ListLogGroupsForQuery`. Always includes `log_group_name` plus any
    /// names from `logGroupNames` / identifiers from `logGroupIdentifiers`
    /// passed at start time.
    // `serde(default)`: input lacking this field deserializes to an empty list.
    #[serde(default)]
    pub log_group_identifiers: Vec<String>,
    pub query_string: String,
    pub start_time: i64,
    pub end_time: i64,
    pub status: String,
    pub create_time: i64,
}
253
/// An export task (element of `LogsState::export_tasks`).
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct ExportTask {
    pub task_id: String,
    pub task_name: Option<String>,
    pub log_group_name: String,
    pub log_stream_name_prefix: Option<String>,
    pub from_time: i64,
    pub to_time: i64,
    pub destination: String,
    pub destination_prefix: String,
    pub status_code: String,
    pub status_message: String,
    // `serde(default)`: input lacking this field deserializes to 0.
    #[serde(default)]
    pub creation_time: i64,
    // `serde(default)`: input lacking this field deserializes to `None`.
    #[serde(default)]
    pub completion_time: Option<i64>,
}
271
/// A delivery destination (value type of
/// `LogsState::delivery_destinations`).
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct DeliveryDestination {
    pub name: String,
    pub arn: String,
    pub output_format: Option<String>,
    /// Destination configuration as flat string key/value pairs.
    pub delivery_destination_configuration: BTreeMap<String, String>,
    pub tags: BTreeMap<String, String>,
    /// `None` when no policy is recorded for the destination.
    pub delivery_destination_policy: Option<String>,
}
281
/// A delivery source (value type of `LogsState::delivery_sources`).
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct DeliverySource {
    pub name: String,
    pub arn: String,
    pub resource_arns: Vec<String>,
    pub service: String,
    pub log_type: String,
    pub tags: BTreeMap<String, String>,
    // `serde(default)`: input lacking this field deserializes to 0.
    #[serde(default)]
    pub created_at: i64,
}
293
/// A delivery linking a delivery source (by name) to a delivery destination
/// (by ARN). Value type of `LogsState::deliveries`.
///
/// The four trailing fields are `#[serde(default)]`: input that lacks them
/// deserializes to `None` / an empty list / `0` respectively.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct Delivery {
    pub id: String,
    pub delivery_source_name: String,
    pub delivery_destination_arn: String,
    pub delivery_destination_type: String,
    pub arn: String,
    pub tags: BTreeMap<String, String>,
    #[serde(default)]
    pub field_delimiter: Option<String>,
    #[serde(default)]
    pub record_fields: Vec<String>,
    /// Stored as raw JSON; its structure is not modelled in this file.
    #[serde(default)]
    pub s3_delivery_configuration: Option<serde_json::Value>,
    #[serde(default)]
    pub created_at: i64,
}
311
/// A saved query definition (value type of `LogsState::query_definitions`).
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct QueryDefinition {
    pub query_definition_id: String,
    pub name: String,
    pub query_string: String,
    pub log_group_names: Vec<String>,
    pub last_modified: i64,
}
320
/// An account-level policy. Stored in `LogsState::account_policies`, which is
/// keyed by `(policy_name, policy_type)`.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct AccountPolicy {
    pub policy_name: String,
    pub policy_type: String,
    /// Policy body, stored as an opaque string.
    pub policy_document: String,
    pub scope: Option<String>,
    pub selection_criteria: Option<String>,
    pub account_id: String,
    pub last_updated_time: i64,
}
331
/// Data protection policy of a log group (see
/// `LogGroup::data_protection_policy`). The document is stored as an opaque
/// string.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct DataProtectionPolicy {
    pub policy_document: String,
    pub last_updated_time: i64,
}
337
/// An index policy of a log group (element of `LogGroup::index_policies`).
/// The document is stored as an opaque string.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct IndexPolicy {
    pub policy_name: String,
    pub policy_document: String,
    pub last_updated_time: i64,
}
344
/// Transformer attached to a log group (see `LogGroup::transformer`).
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct Transformer {
    /// Stored as raw JSON; its structure is not modelled in this file.
    pub transformer_config: serde_json::Value,
    pub creation_time: i64,
    pub last_modified_time: i64,
}
351
/// A log anomaly detector. Stored in `LogsState::anomaly_detectors`, keyed
/// by detector ARN.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct AnomalyDetector {
    pub detector_name: String,
    pub arn: String,
    /// ARNs of the log groups listed for this detector.
    pub log_group_arn_list: Vec<String>,
    pub evaluation_frequency: Option<String>,
    pub filter_pattern: Option<String>,
    pub anomaly_visibility_time: Option<i64>,
    pub creation_time: i64,
    pub last_modified_time: i64,
    pub enabled: bool,
}
364
/// An import task. Stored in `LogsState::import_tasks`, keyed by import ID.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct ImportTask {
    pub import_id: String,
    pub import_source_arn: String,
    pub import_role_arn: String,
    pub log_group_name: Option<String>,
    pub status: String,
    pub creation_time: i64,
}
374
/// An integration. Stored in `LogsState::integrations`, keyed by integration
/// name.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct Integration {
    pub integration_name: String,
    pub integration_type: String,
    /// Stored as raw JSON; its structure is not modelled in this file.
    pub resource_config: serde_json::Value,
    pub status: String,
    pub creation_time: i64,
}
383
/// A lookup table. Stored in `LogsState::lookup_tables`, keyed by ARN.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct LookupTable {
    pub lookup_table_name: String,
    pub arn: String,
    /// Table contents, kept as a single string.
    /// NOTE(review): the expected format is not established here — confirm.
    pub table_body: String,
    pub creation_time: i64,
    pub last_modified_time: i64,
}
392
/// A scheduled query. Stored in `LogsState::scheduled_queries`, keyed by
/// identifier (ARN).
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct ScheduledQuery {
    pub name: String,
    pub arn: String,
    pub query_string: String,
    pub query_language: String,
    pub schedule_expression: String,
    pub execution_role_arn: String,
    pub status: String,
    pub creation_time: i64,
    pub last_modified_time: i64,
}
405
/// On-disk snapshot envelope for CloudWatch Logs state. Versioned so
/// format changes fail loudly on upgrade.
///
/// The payload can be carried in either of two optional slots; both are
/// `#[serde(default)]`, so a slot absent from the input deserializes to
/// `None`.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct LogsSnapshot {
    /// Format version recorded with the snapshot.
    pub schema_version: u32,
    /// Multi-account payload.
    #[serde(default)]
    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<LogsState>>,
    /// Single-`LogsState` payload.
    /// NOTE(review): presumably the older, pre-multi-account layout kept for
    /// loading existing snapshots — confirm against the snapshot loader.
    #[serde(default)]
    pub state: Option<LogsState>,
}
416
/// Schema version constant for Logs snapshots.
/// NOTE(review): presumably the value written to, and checked against,
/// `LogsSnapshot::schema_version` — confirm at the snapshot save/load sites.
pub const LOGS_SNAPSHOT_SCHEMA_VERSION: u32 = 2;
418
#[cfg(test)]
mod tests {
    use super::*;

    // Identifiers shared by the tests below.
    const ACCOUNT: &str = "123456789012";
    const REGION: &str = "us-east-1";

    /// A freshly built state keeps the identifiers it was given and holds no
    /// log groups or queries.
    #[test]
    fn new_initializes_empty() {
        let fresh = LogsState::new(ACCOUNT, REGION);

        assert_eq!(fresh.account_id, ACCOUNT);
        assert_eq!(fresh.region, REGION);
        assert!(fresh.log_groups.is_empty());
        assert!(fresh.queries.is_empty());
    }

    /// `reset` drops entries that were inserted beforehand.
    #[test]
    fn reset_clears_state() {
        let mut populated = LogsState::new(ACCOUNT, REGION);
        populated.bearer_token_auth.insert(String::from("g"), true);

        populated.reset();

        assert!(populated.bearer_token_auth.is_empty());
    }
}