Skip to main content

fakecloud_logs/
state.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use parking_lot::RwLock;
5
6pub type SharedLogsState = Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<LogsState>>>;
7
8impl fakecloud_core::multi_account::AccountState for LogsState {
9    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
10        Self::new(account_id, region)
11    }
12}
13
14/// JSON object keys must be strings, so serialize
15/// `HashMap<(String,String), AccountPolicy>` as a list of
16/// `[policy_name, policy_type, policy]` tuples.
17mod account_policy_map_serde {
18    use super::AccountPolicy;
19    use serde::{Deserialize, Deserializer, Serialize, Serializer};
20    use std::collections::HashMap;
21
22    pub fn serialize<S: Serializer>(
23        map: &HashMap<(String, String), AccountPolicy>,
24        s: S,
25    ) -> Result<S::Ok, S::Error> {
26        let entries: Vec<(&String, &String, &AccountPolicy)> = map
27            .iter()
28            .map(|((name, kind), p)| (name, kind, p))
29            .collect();
30        entries.serialize(s)
31    }
32
33    pub fn deserialize<'de, D: Deserializer<'de>>(
34        d: D,
35    ) -> Result<HashMap<(String, String), AccountPolicy>, D::Error> {
36        let entries: Vec<(String, String, AccountPolicy)> = Vec::deserialize(d)?;
37        Ok(entries
38            .into_iter()
39            .map(|(name, kind, p)| ((name, kind), p))
40            .collect())
41    }
42}
43
44#[derive(Clone, serde::Serialize, serde::Deserialize)]
45pub struct LogsState {
46    pub account_id: String,
47    pub region: String,
48    pub log_groups: HashMap<String, LogGroup>,
49    pub metric_filters: Vec<MetricFilter>,
50    pub resource_policies: HashMap<String, ResourcePolicy>,
51    pub destinations: HashMap<String, Destination>,
52    pub queries: HashMap<String, QueryInfo>,
53    pub export_tasks: Vec<ExportTask>,
54    pub delivery_destinations: HashMap<String, DeliveryDestination>,
55    pub delivery_sources: HashMap<String, DeliverySource>,
56    pub deliveries: HashMap<String, Delivery>,
57    pub query_definitions: HashMap<String, QueryDefinition>,
58    /// Account policies keyed by (policy_name, policy_type)
59    #[serde(with = "account_policy_map_serde")]
60    pub account_policies: HashMap<(String, String), AccountPolicy>,
61    /// Anomaly detectors keyed by detector ARN
62    pub anomaly_detectors: HashMap<String, AnomalyDetector>,
63    /// Import tasks keyed by import ID
64    pub import_tasks: HashMap<String, ImportTask>,
65    /// Integrations keyed by integration name
66    pub integrations: HashMap<String, Integration>,
67    /// Lookup tables keyed by ARN
68    pub lookup_tables: HashMap<String, LookupTable>,
69    /// Scheduled queries keyed by identifier (ARN)
70    pub scheduled_queries: HashMap<String, ScheduledQuery>,
71    /// S3 table integration sources keyed by integration ARN -> list of source identifiers
72    pub s3_table_sources: HashMap<String, Vec<String>>,
73    /// Bearer token authentication flag per log group
74    pub bearer_token_auth: HashMap<String, bool>,
75    /// Internal export storage: keyed by "bucket/prefix/..." path, value is exported data.
76    /// Used by CreateExportTask and delivery pipeline when direct S3 access is unavailable.
77    pub export_storage: HashMap<String, Vec<u8>>,
78}
79
80impl LogsState {
81    pub fn new(account_id: &str, region: &str) -> Self {
82        Self {
83            account_id: account_id.to_string(),
84            region: region.to_string(),
85            log_groups: HashMap::new(),
86            metric_filters: Vec::new(),
87            resource_policies: HashMap::new(),
88            destinations: HashMap::new(),
89            queries: HashMap::new(),
90            export_tasks: Vec::new(),
91            delivery_destinations: HashMap::new(),
92            delivery_sources: HashMap::new(),
93            deliveries: HashMap::new(),
94            query_definitions: HashMap::new(),
95            account_policies: HashMap::new(),
96            anomaly_detectors: HashMap::new(),
97            import_tasks: HashMap::new(),
98            integrations: HashMap::new(),
99            lookup_tables: HashMap::new(),
100            scheduled_queries: HashMap::new(),
101            s3_table_sources: HashMap::new(),
102            bearer_token_auth: HashMap::new(),
103            export_storage: HashMap::new(),
104        }
105    }
106
107    pub fn reset(&mut self) {
108        self.log_groups.clear();
109        self.metric_filters.clear();
110        self.resource_policies.clear();
111        self.destinations.clear();
112        self.queries.clear();
113        self.export_tasks.clear();
114        self.delivery_destinations.clear();
115        self.delivery_sources.clear();
116        self.deliveries.clear();
117        self.query_definitions.clear();
118        self.account_policies.clear();
119        self.anomaly_detectors.clear();
120        self.import_tasks.clear();
121        self.integrations.clear();
122        self.lookup_tables.clear();
123        self.scheduled_queries.clear();
124        self.s3_table_sources.clear();
125        self.bearer_token_auth.clear();
126        self.export_storage.clear();
127    }
128}
129
130#[derive(Clone, serde::Serialize, serde::Deserialize)]
131pub struct LogGroup {
132    pub name: String,
133    pub arn: String,
134    pub creation_time: i64,
135    pub retention_in_days: Option<i32>,
136    pub kms_key_id: Option<String>,
137    pub tags: HashMap<String, String>,
138    pub log_streams: HashMap<String, LogStream>,
139    pub stored_bytes: i64,
140    pub subscription_filters: Vec<SubscriptionFilter>,
141    pub data_protection_policy: Option<DataProtectionPolicy>,
142    pub index_policies: Vec<IndexPolicy>,
143    pub transformer: Option<Transformer>,
144    pub deletion_protection: bool,
145    /// `STANDARD` (default), `INFREQUENT_ACCESS`, or `DELIVERY`. Set at
146    /// creation time via `CreateLogGroup`'s `logGroupClass` parameter.
147    /// Tracked here so `DescribeLogGroups` round-trips it correctly.
148    pub log_group_class: Option<String>,
149}
150
151#[derive(Clone, serde::Serialize, serde::Deserialize)]
152pub struct LogStream {
153    pub name: String,
154    pub arn: String,
155    pub creation_time: i64,
156    pub first_event_timestamp: Option<i64>,
157    pub last_event_timestamp: Option<i64>,
158    pub last_ingestion_time: Option<i64>,
159    pub upload_sequence_token: String,
160    pub events: Vec<LogEvent>,
161}
162
163#[derive(Clone, serde::Serialize, serde::Deserialize)]
164pub struct LogEvent {
165    pub timestamp: i64,
166    pub message: String,
167    pub ingestion_time: i64,
168}
169
170#[derive(Clone, serde::Serialize, serde::Deserialize)]
171pub struct SubscriptionFilter {
172    pub filter_name: String,
173    pub log_group_name: String,
174    pub filter_pattern: String,
175    pub destination_arn: String,
176    pub role_arn: Option<String>,
177    pub distribution: String,
178    pub creation_time: i64,
179}
180
181#[derive(Clone, serde::Serialize, serde::Deserialize)]
182pub struct MetricFilter {
183    pub filter_name: String,
184    pub filter_pattern: String,
185    pub log_group_name: String,
186    pub metric_transformations: Vec<MetricTransformation>,
187    pub creation_time: i64,
188}
189
190#[derive(Clone, serde::Serialize, serde::Deserialize)]
191pub struct MetricTransformation {
192    pub metric_name: String,
193    pub metric_namespace: String,
194    pub metric_value: String,
195    pub default_value: Option<f64>,
196}
197
198#[derive(Clone, serde::Serialize, serde::Deserialize)]
199pub struct ResourcePolicy {
200    pub policy_name: String,
201    pub policy_document: String,
202    pub last_updated_time: i64,
203}
204
205#[derive(Clone, serde::Serialize, serde::Deserialize)]
206pub struct Destination {
207    pub destination_name: String,
208    pub target_arn: String,
209    pub role_arn: String,
210    pub arn: String,
211    pub access_policy: Option<String>,
212    pub creation_time: i64,
213    pub tags: HashMap<String, String>,
214}
215
216#[derive(Clone, serde::Serialize, serde::Deserialize)]
217pub struct QueryInfo {
218    pub query_id: String,
219    pub log_group_name: String,
220    pub query_string: String,
221    pub start_time: i64,
222    pub end_time: i64,
223    pub status: String,
224    pub create_time: i64,
225}
226
227#[derive(Clone, serde::Serialize, serde::Deserialize)]
228pub struct ExportTask {
229    pub task_id: String,
230    pub task_name: Option<String>,
231    pub log_group_name: String,
232    pub log_stream_name_prefix: Option<String>,
233    pub from_time: i64,
234    pub to_time: i64,
235    pub destination: String,
236    pub destination_prefix: String,
237    pub status_code: String,
238    pub status_message: String,
239}
240
241#[derive(Clone, serde::Serialize, serde::Deserialize)]
242pub struct DeliveryDestination {
243    pub name: String,
244    pub arn: String,
245    pub output_format: Option<String>,
246    pub delivery_destination_configuration: HashMap<String, String>,
247    pub tags: HashMap<String, String>,
248    pub delivery_destination_policy: Option<String>,
249}
250
251#[derive(Clone, serde::Serialize, serde::Deserialize)]
252pub struct DeliverySource {
253    pub name: String,
254    pub arn: String,
255    pub resource_arns: Vec<String>,
256    pub service: String,
257    pub log_type: String,
258    pub tags: HashMap<String, String>,
259}
260
261#[derive(Clone, serde::Serialize, serde::Deserialize)]
262pub struct Delivery {
263    pub id: String,
264    pub delivery_source_name: String,
265    pub delivery_destination_arn: String,
266    pub delivery_destination_type: String,
267    pub arn: String,
268    pub tags: HashMap<String, String>,
269}
270
271#[derive(Clone, serde::Serialize, serde::Deserialize)]
272pub struct QueryDefinition {
273    pub query_definition_id: String,
274    pub name: String,
275    pub query_string: String,
276    pub log_group_names: Vec<String>,
277    pub last_modified: i64,
278}
279
280#[derive(Clone, serde::Serialize, serde::Deserialize)]
281pub struct AccountPolicy {
282    pub policy_name: String,
283    pub policy_type: String,
284    pub policy_document: String,
285    pub scope: Option<String>,
286    pub selection_criteria: Option<String>,
287    pub account_id: String,
288    pub last_updated_time: i64,
289}
290
291#[derive(Clone, serde::Serialize, serde::Deserialize)]
292pub struct DataProtectionPolicy {
293    pub policy_document: String,
294    pub last_updated_time: i64,
295}
296
297#[derive(Clone, serde::Serialize, serde::Deserialize)]
298pub struct IndexPolicy {
299    pub policy_name: String,
300    pub policy_document: String,
301    pub last_updated_time: i64,
302}
303
304#[derive(Clone, serde::Serialize, serde::Deserialize)]
305pub struct Transformer {
306    pub transformer_config: serde_json::Value,
307    pub creation_time: i64,
308    pub last_modified_time: i64,
309}
310
311#[derive(Clone, serde::Serialize, serde::Deserialize)]
312pub struct AnomalyDetector {
313    pub detector_name: String,
314    pub arn: String,
315    pub log_group_arn_list: Vec<String>,
316    pub evaluation_frequency: Option<String>,
317    pub filter_pattern: Option<String>,
318    pub anomaly_visibility_time: Option<i64>,
319    pub creation_time: i64,
320    pub last_modified_time: i64,
321    pub enabled: bool,
322}
323
324#[derive(Clone, serde::Serialize, serde::Deserialize)]
325pub struct ImportTask {
326    pub import_id: String,
327    pub import_source_arn: String,
328    pub import_role_arn: String,
329    pub log_group_name: Option<String>,
330    pub status: String,
331    pub creation_time: i64,
332}
333
334#[derive(Clone, serde::Serialize, serde::Deserialize)]
335pub struct Integration {
336    pub integration_name: String,
337    pub integration_type: String,
338    pub resource_config: serde_json::Value,
339    pub status: String,
340    pub creation_time: i64,
341}
342
343#[derive(Clone, serde::Serialize, serde::Deserialize)]
344pub struct LookupTable {
345    pub lookup_table_name: String,
346    pub arn: String,
347    pub table_body: String,
348    pub creation_time: i64,
349    pub last_modified_time: i64,
350}
351
352#[derive(Clone, serde::Serialize, serde::Deserialize)]
353pub struct ScheduledQuery {
354    pub name: String,
355    pub arn: String,
356    pub query_string: String,
357    pub query_language: String,
358    pub schedule_expression: String,
359    pub execution_role_arn: String,
360    pub status: String,
361    pub creation_time: i64,
362    pub last_modified_time: i64,
363}
364
365/// On-disk snapshot envelope for CloudWatch Logs state. Versioned so
366/// format changes fail loudly on upgrade.
367#[derive(Clone, serde::Serialize, serde::Deserialize)]
368pub struct LogsSnapshot {
369    pub schema_version: u32,
370    #[serde(default)]
371    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<LogsState>>,
372    #[serde(default)]
373    pub state: Option<LogsState>,
374}
375
376pub const LOGS_SNAPSHOT_SCHEMA_VERSION: u32 = 2;
377
378#[cfg(test)]
379mod tests {
380    use super::*;
381
382    #[test]
383    fn new_initializes_empty() {
384        let state = LogsState::new("123456789012", "us-east-1");
385        assert_eq!(state.account_id, "123456789012");
386        assert_eq!(state.region, "us-east-1");
387        assert!(state.log_groups.is_empty());
388        assert!(state.queries.is_empty());
389    }
390
391    #[test]
392    fn reset_clears_state() {
393        let mut state = LogsState::new("123456789012", "us-east-1");
394        state.bearer_token_auth.insert("g".to_string(), true);
395        state.reset();
396        assert!(state.bearer_token_auth.is_empty());
397    }
398}