Skip to main content

fakecloud_logs/
state.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use parking_lot::RwLock;
5
6pub type SharedLogsState = Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<LogsState>>>;
7
8impl fakecloud_core::multi_account::AccountState for LogsState {
9    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
10        Self::new(account_id, region)
11    }
12}
13
14/// JSON object keys must be strings, so serialize
15/// `HashMap<(String,String), AccountPolicy>` as a list of
16/// `[policy_name, policy_type, policy]` tuples.
17mod account_policy_map_serde {
18    use super::AccountPolicy;
19    use serde::{Deserialize, Deserializer, Serialize, Serializer};
20    use std::collections::HashMap;
21
22    pub fn serialize<S: Serializer>(
23        map: &HashMap<(String, String), AccountPolicy>,
24        s: S,
25    ) -> Result<S::Ok, S::Error> {
26        let entries: Vec<(&String, &String, &AccountPolicy)> = map
27            .iter()
28            .map(|((name, kind), p)| (name, kind, p))
29            .collect();
30        entries.serialize(s)
31    }
32
33    pub fn deserialize<'de, D: Deserializer<'de>>(
34        d: D,
35    ) -> Result<HashMap<(String, String), AccountPolicy>, D::Error> {
36        let entries: Vec<(String, String, AccountPolicy)> = Vec::deserialize(d)?;
37        Ok(entries
38            .into_iter()
39            .map(|(name, kind, p)| ((name, kind), p))
40            .collect())
41    }
42}
43
44#[derive(Clone, serde::Serialize, serde::Deserialize)]
45pub struct LogsState {
46    pub account_id: String,
47    pub region: String,
48    pub log_groups: HashMap<String, LogGroup>,
49    pub metric_filters: Vec<MetricFilter>,
50    pub resource_policies: HashMap<String, ResourcePolicy>,
51    pub destinations: HashMap<String, Destination>,
52    pub queries: HashMap<String, QueryInfo>,
53    pub export_tasks: Vec<ExportTask>,
54    pub delivery_destinations: HashMap<String, DeliveryDestination>,
55    pub delivery_sources: HashMap<String, DeliverySource>,
56    pub deliveries: HashMap<String, Delivery>,
57    pub query_definitions: HashMap<String, QueryDefinition>,
58    /// Account policies keyed by (policy_name, policy_type)
59    #[serde(with = "account_policy_map_serde")]
60    pub account_policies: HashMap<(String, String), AccountPolicy>,
61    /// Anomaly detectors keyed by detector ARN
62    pub anomaly_detectors: HashMap<String, AnomalyDetector>,
63    /// Import tasks keyed by import ID
64    pub import_tasks: HashMap<String, ImportTask>,
65    /// Integrations keyed by integration name
66    pub integrations: HashMap<String, Integration>,
67    /// Lookup tables keyed by ARN
68    pub lookup_tables: HashMap<String, LookupTable>,
69    /// Scheduled queries keyed by identifier (ARN)
70    pub scheduled_queries: HashMap<String, ScheduledQuery>,
71    /// S3 table integration sources keyed by integration ARN -> list of source identifiers
72    pub s3_table_sources: HashMap<String, Vec<String>>,
73    /// Bearer token authentication flag per log group
74    pub bearer_token_auth: HashMap<String, bool>,
75    /// Internal export storage: keyed by "bucket/prefix/..." path, value is exported data.
76    /// Used by CreateExportTask and delivery pipeline when direct S3 access is unavailable.
77    pub export_storage: HashMap<String, Vec<u8>>,
78}
79
80impl LogsState {
81    pub fn new(account_id: &str, region: &str) -> Self {
82        Self {
83            account_id: account_id.to_string(),
84            region: region.to_string(),
85            log_groups: HashMap::new(),
86            metric_filters: Vec::new(),
87            resource_policies: HashMap::new(),
88            destinations: HashMap::new(),
89            queries: HashMap::new(),
90            export_tasks: Vec::new(),
91            delivery_destinations: HashMap::new(),
92            delivery_sources: HashMap::new(),
93            deliveries: HashMap::new(),
94            query_definitions: HashMap::new(),
95            account_policies: HashMap::new(),
96            anomaly_detectors: HashMap::new(),
97            import_tasks: HashMap::new(),
98            integrations: HashMap::new(),
99            lookup_tables: HashMap::new(),
100            scheduled_queries: HashMap::new(),
101            s3_table_sources: HashMap::new(),
102            bearer_token_auth: HashMap::new(),
103            export_storage: HashMap::new(),
104        }
105    }
106
107    pub fn reset(&mut self) {
108        self.log_groups.clear();
109        self.metric_filters.clear();
110        self.resource_policies.clear();
111        self.destinations.clear();
112        self.queries.clear();
113        self.export_tasks.clear();
114        self.delivery_destinations.clear();
115        self.delivery_sources.clear();
116        self.deliveries.clear();
117        self.query_definitions.clear();
118        self.account_policies.clear();
119        self.anomaly_detectors.clear();
120        self.import_tasks.clear();
121        self.integrations.clear();
122        self.lookup_tables.clear();
123        self.scheduled_queries.clear();
124        self.s3_table_sources.clear();
125        self.bearer_token_auth.clear();
126        self.export_storage.clear();
127    }
128}
129
130#[derive(Clone, serde::Serialize, serde::Deserialize)]
131pub struct LogGroup {
132    pub name: String,
133    pub arn: String,
134    pub creation_time: i64,
135    pub retention_in_days: Option<i32>,
136    pub kms_key_id: Option<String>,
137    pub tags: HashMap<String, String>,
138    pub log_streams: HashMap<String, LogStream>,
139    pub stored_bytes: i64,
140    pub subscription_filters: Vec<SubscriptionFilter>,
141    pub data_protection_policy: Option<DataProtectionPolicy>,
142    pub index_policies: Vec<IndexPolicy>,
143    pub transformer: Option<Transformer>,
144    pub deletion_protection: bool,
145    /// `STANDARD` (default), `INFREQUENT_ACCESS`, or `DELIVERY`. Set at
146    /// creation time via `CreateLogGroup`'s `logGroupClass` parameter.
147    /// Tracked here so `DescribeLogGroups` round-trips it correctly.
148    pub log_group_class: Option<String>,
149}
150
151#[derive(Clone, serde::Serialize, serde::Deserialize)]
152pub struct LogStream {
153    pub name: String,
154    pub arn: String,
155    pub creation_time: i64,
156    pub first_event_timestamp: Option<i64>,
157    pub last_event_timestamp: Option<i64>,
158    pub last_ingestion_time: Option<i64>,
159    pub upload_sequence_token: String,
160    pub events: Vec<LogEvent>,
161}
162
163#[derive(Clone, serde::Serialize, serde::Deserialize)]
164pub struct LogEvent {
165    pub timestamp: i64,
166    pub message: String,
167    pub ingestion_time: i64,
168}
169
170#[derive(Clone, serde::Serialize, serde::Deserialize)]
171pub struct SubscriptionFilter {
172    pub filter_name: String,
173    pub log_group_name: String,
174    pub filter_pattern: String,
175    pub destination_arn: String,
176    pub role_arn: Option<String>,
177    pub distribution: String,
178    pub creation_time: i64,
179}
180
181#[derive(Clone, serde::Serialize, serde::Deserialize)]
182pub struct MetricFilter {
183    pub filter_name: String,
184    pub filter_pattern: String,
185    pub log_group_name: String,
186    pub metric_transformations: Vec<MetricTransformation>,
187    pub creation_time: i64,
188}
189
190#[derive(Clone, serde::Serialize, serde::Deserialize)]
191pub struct MetricTransformation {
192    pub metric_name: String,
193    pub metric_namespace: String,
194    pub metric_value: String,
195    pub default_value: Option<f64>,
196}
197
198#[derive(Clone, serde::Serialize, serde::Deserialize)]
199pub struct ResourcePolicy {
200    pub policy_name: String,
201    pub policy_document: String,
202    pub last_updated_time: i64,
203}
204
205#[derive(Clone, serde::Serialize, serde::Deserialize)]
206pub struct Destination {
207    pub destination_name: String,
208    pub target_arn: String,
209    pub role_arn: String,
210    pub arn: String,
211    pub access_policy: Option<String>,
212    pub creation_time: i64,
213    pub tags: HashMap<String, String>,
214}
215
216#[derive(Clone, serde::Serialize, serde::Deserialize)]
217pub struct QueryInfo {
218    pub query_id: String,
219    pub log_group_name: String,
220    /// Every log group / identifier referenced by this query, used by
221    /// `ListLogGroupsForQuery`. Always includes `log_group_name` plus any
222    /// names from `logGroupNames` / identifiers from `logGroupIdentifiers`
223    /// passed at start time.
224    #[serde(default)]
225    pub log_group_identifiers: Vec<String>,
226    pub query_string: String,
227    pub start_time: i64,
228    pub end_time: i64,
229    pub status: String,
230    pub create_time: i64,
231}
232
233#[derive(Clone, serde::Serialize, serde::Deserialize)]
234pub struct ExportTask {
235    pub task_id: String,
236    pub task_name: Option<String>,
237    pub log_group_name: String,
238    pub log_stream_name_prefix: Option<String>,
239    pub from_time: i64,
240    pub to_time: i64,
241    pub destination: String,
242    pub destination_prefix: String,
243    pub status_code: String,
244    pub status_message: String,
245}
246
247#[derive(Clone, serde::Serialize, serde::Deserialize)]
248pub struct DeliveryDestination {
249    pub name: String,
250    pub arn: String,
251    pub output_format: Option<String>,
252    pub delivery_destination_configuration: HashMap<String, String>,
253    pub tags: HashMap<String, String>,
254    pub delivery_destination_policy: Option<String>,
255}
256
257#[derive(Clone, serde::Serialize, serde::Deserialize)]
258pub struct DeliverySource {
259    pub name: String,
260    pub arn: String,
261    pub resource_arns: Vec<String>,
262    pub service: String,
263    pub log_type: String,
264    pub tags: HashMap<String, String>,
265}
266
267#[derive(Clone, serde::Serialize, serde::Deserialize)]
268pub struct Delivery {
269    pub id: String,
270    pub delivery_source_name: String,
271    pub delivery_destination_arn: String,
272    pub delivery_destination_type: String,
273    pub arn: String,
274    pub tags: HashMap<String, String>,
275}
276
277#[derive(Clone, serde::Serialize, serde::Deserialize)]
278pub struct QueryDefinition {
279    pub query_definition_id: String,
280    pub name: String,
281    pub query_string: String,
282    pub log_group_names: Vec<String>,
283    pub last_modified: i64,
284}
285
286#[derive(Clone, serde::Serialize, serde::Deserialize)]
287pub struct AccountPolicy {
288    pub policy_name: String,
289    pub policy_type: String,
290    pub policy_document: String,
291    pub scope: Option<String>,
292    pub selection_criteria: Option<String>,
293    pub account_id: String,
294    pub last_updated_time: i64,
295}
296
297#[derive(Clone, serde::Serialize, serde::Deserialize)]
298pub struct DataProtectionPolicy {
299    pub policy_document: String,
300    pub last_updated_time: i64,
301}
302
303#[derive(Clone, serde::Serialize, serde::Deserialize)]
304pub struct IndexPolicy {
305    pub policy_name: String,
306    pub policy_document: String,
307    pub last_updated_time: i64,
308}
309
310#[derive(Clone, serde::Serialize, serde::Deserialize)]
311pub struct Transformer {
312    pub transformer_config: serde_json::Value,
313    pub creation_time: i64,
314    pub last_modified_time: i64,
315}
316
317#[derive(Clone, serde::Serialize, serde::Deserialize)]
318pub struct AnomalyDetector {
319    pub detector_name: String,
320    pub arn: String,
321    pub log_group_arn_list: Vec<String>,
322    pub evaluation_frequency: Option<String>,
323    pub filter_pattern: Option<String>,
324    pub anomaly_visibility_time: Option<i64>,
325    pub creation_time: i64,
326    pub last_modified_time: i64,
327    pub enabled: bool,
328}
329
330#[derive(Clone, serde::Serialize, serde::Deserialize)]
331pub struct ImportTask {
332    pub import_id: String,
333    pub import_source_arn: String,
334    pub import_role_arn: String,
335    pub log_group_name: Option<String>,
336    pub status: String,
337    pub creation_time: i64,
338}
339
340#[derive(Clone, serde::Serialize, serde::Deserialize)]
341pub struct Integration {
342    pub integration_name: String,
343    pub integration_type: String,
344    pub resource_config: serde_json::Value,
345    pub status: String,
346    pub creation_time: i64,
347}
348
349#[derive(Clone, serde::Serialize, serde::Deserialize)]
350pub struct LookupTable {
351    pub lookup_table_name: String,
352    pub arn: String,
353    pub table_body: String,
354    pub creation_time: i64,
355    pub last_modified_time: i64,
356}
357
358#[derive(Clone, serde::Serialize, serde::Deserialize)]
359pub struct ScheduledQuery {
360    pub name: String,
361    pub arn: String,
362    pub query_string: String,
363    pub query_language: String,
364    pub schedule_expression: String,
365    pub execution_role_arn: String,
366    pub status: String,
367    pub creation_time: i64,
368    pub last_modified_time: i64,
369}
370
371/// On-disk snapshot envelope for CloudWatch Logs state. Versioned so
372/// format changes fail loudly on upgrade.
373#[derive(Clone, serde::Serialize, serde::Deserialize)]
374pub struct LogsSnapshot {
375    pub schema_version: u32,
376    #[serde(default)]
377    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<LogsState>>,
378    #[serde(default)]
379    pub state: Option<LogsState>,
380}
381
382pub const LOGS_SNAPSHOT_SCHEMA_VERSION: u32 = 2;
383
384#[cfg(test)]
385mod tests {
386    use super::*;
387
388    #[test]
389    fn new_initializes_empty() {
390        let state = LogsState::new("123456789012", "us-east-1");
391        assert_eq!(state.account_id, "123456789012");
392        assert_eq!(state.region, "us-east-1");
393        assert!(state.log_groups.is_empty());
394        assert!(state.queries.is_empty());
395    }
396
397    #[test]
398    fn reset_clears_state() {
399        let mut state = LogsState::new("123456789012", "us-east-1");
400        state.bearer_token_auth.insert("g".to_string(), true);
401        state.reset();
402        assert!(state.bearer_token_auth.is_empty());
403    }
404}