Skip to main content

awsim_lambda/
state.rs

1use awsim_core::{Body, BodyStore, Snapshottable};
2use dashmap::DashMap;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::sync::{Arc, OnceLock};
6
7/// Lambda state — per account and region.
8#[derive(Debug, Default)]
9pub struct LambdaState {
10    pub functions: DashMap<String, LambdaFunction>,
11    pub event_source_mappings: DashMap<String, EventSourceMapping>,
12    pub layers: DashMap<String, Vec<LayerVersion>>,
13    /// function_name → FunctionUrlConfig
14    pub url_configs: DashMap<String, FunctionUrlConfig>,
15    /// function_name[:qualifier] → EventInvokeConfig
16    pub event_invoke_configs: DashMap<String, EventInvokeConfig>,
17    pub body_store: OnceLock<Arc<BodyStore>>,
18}
19
20impl LambdaState {
21    pub fn body_store(&self) -> Option<&Arc<BodyStore>> {
22        self.body_store.get()
23    }
24
25    pub fn set_body_store(&self, store: Arc<BodyStore>) {
26        let _ = self.body_store.set(store);
27    }
28}
29
30impl Snapshottable for LambdaState {
31    type Snapshot = LambdaRegionSnapshot;
32
33    fn to_snapshot(&self, account_id: &str, region: &str) -> Self::Snapshot {
34        let functions = self
35            .functions
36            .iter()
37            .map(|entry| {
38                let f = entry.value();
39                FunctionSnapshot {
40                    account_id: account_id.to_string(),
41                    region: region.to_string(),
42                    name: f.name.clone(),
43                    arn: f.arn.clone(),
44                    runtime: f.runtime.clone(),
45                    role: f.role.clone(),
46                    handler: f.handler.clone(),
47                    description: f.description.clone(),
48                    timeout: f.timeout,
49                    memory_size: f.memory_size,
50                    code_sha256: f.code_sha256.clone(),
51                    code_size: f.code_size,
52                    environment: f.environment.clone(),
53                    version: f.version.clone(),
54                    versions: f
55                        .versions
56                        .iter()
57                        .map(|v| FunctionVersionSnapshot {
58                            version: v.version.clone(),
59                            description: v.description.clone(),
60                            code_sha256: v.code_sha256.clone(),
61                            code_size: v.code_size,
62                            last_modified: v.last_modified.clone(),
63                        })
64                        .collect(),
65                    aliases: f
66                        .aliases
67                        .iter()
68                        .map(|(k, a)| {
69                            (
70                                k.clone(),
71                                AliasSnapshot {
72                                    name: a.name.clone(),
73                                    arn: a.arn.clone(),
74                                    function_version: a.function_version.clone(),
75                                    description: a.description.clone(),
76                                },
77                            )
78                        })
79                        .collect(),
80                    last_modified: f.last_modified.clone(),
81                    state: f.state.clone(),
82                    policy_statements: f.policy_statements.clone(),
83                    tags: f.tags.clone(),
84                }
85            })
86            .collect();
87
88        LambdaRegionSnapshot {
89            account_id: account_id.to_string(),
90            region: region.to_string(),
91            functions,
92        }
93    }
94
95    fn from_snapshot(snapshot: Self::Snapshot) -> (String, String, Self) {
96        let state = LambdaState::default();
97        for fs in snapshot.functions {
98            let versions: Vec<FunctionVersion> = fs
99                .versions
100                .into_iter()
101                .map(|v| FunctionVersion {
102                    version: v.version,
103                    description: v.description,
104                    code_sha256: v.code_sha256,
105                    code_size: v.code_size,
106                    code: None,
107                    last_modified: v.last_modified,
108                })
109                .collect();
110
111            let aliases: HashMap<String, Alias> = fs
112                .aliases
113                .into_iter()
114                .map(|(k, a)| {
115                    (
116                        k,
117                        Alias {
118                            name: a.name,
119                            arn: a.arn,
120                            function_version: a.function_version,
121                            description: a.description,
122                        },
123                    )
124                })
125                .collect();
126
127            let func = LambdaFunction {
128                name: fs.name.clone(),
129                arn: fs.arn,
130                runtime: fs.runtime,
131                role: fs.role,
132                handler: fs.handler,
133                description: fs.description,
134                timeout: fs.timeout,
135                memory_size: fs.memory_size,
136                code_sha256: fs.code_sha256,
137                code_size: fs.code_size,
138                code: None,
139                environment: fs.environment,
140                version: fs.version,
141                versions,
142                aliases,
143                last_modified: fs.last_modified,
144                state: fs.state,
145                invocations: Vec::new(),
146                policy_statements: fs.policy_statements,
147                tags: fs.tags,
148                reserved_concurrent_executions: None,
149                provisioned_concurrency: HashMap::new(),
150            };
151            state.functions.insert(fs.name, func);
152        }
153        (snapshot.account_id, snapshot.region, state)
154    }
155}
156
157#[derive(Debug, Clone, Default)]
158pub struct EventInvokeConfig {
159    pub function_arn: String,
160    pub maximum_retry_attempts: Option<i32>,
161    pub maximum_event_age_in_seconds: Option<i32>,
162    pub destination_on_success: Option<String>,
163    pub destination_on_failure: Option<String>,
164    pub last_modified: f64,
165}
166
167#[derive(Debug, Clone)]
168pub struct LambdaFunction {
169    pub name: String,
170    pub arn: String,
171    pub runtime: Option<String>,
172    pub role: String,
173    pub handler: Option<String>,
174    pub description: String,
175    pub timeout: u32,
176    pub memory_size: u32,
177    pub code_sha256: String,
178    pub code_size: u64,
179    pub code: Option<Body>,
180    pub environment: HashMap<String, String>,
181    /// Always "$LATEST" for the live function.
182    pub version: String,
183    pub versions: Vec<FunctionVersion>,
184    pub aliases: HashMap<String, Alias>,
185    pub last_modified: String,
186    /// "Active", "Pending", "Failed", etc.
187    pub state: String,
188    /// Invocation records for debugging / admin console.
189    pub invocations: Vec<InvocationRecord>,
190    /// Resource-based policy statements (for AddPermission / RemovePermission).
191    pub policy_statements: HashMap<String, serde_json::Value>,
192    /// Tags attached to this function.
193    pub tags: HashMap<String, String>,
194    /// Reserved concurrent executions ceiling per PutFunctionConcurrency.
195    /// `None` means unreserved — the function shares the account pool.
196    pub reserved_concurrent_executions: Option<u32>,
197    /// Provisioned concurrency configurations keyed by qualifier (alias name
198    /// or function version). Each entry tracks the requested capacity along
199    /// with a simulated state machine that flips IN_PROGRESS -> READY.
200    pub provisioned_concurrency: HashMap<String, ProvisionedConcurrencyConfig>,
201}
202
203/// Provisioned concurrency configuration for a single (function, qualifier)
204/// pair. Real Lambda transitions IN_PROGRESS -> READY asynchronously; we
205/// flip immediately because the emulator never has provisioning latency.
206#[derive(Debug, Clone)]
207pub struct ProvisionedConcurrencyConfig {
208    pub qualifier: String,
209    pub requested_provisioned_concurrent_executions: u32,
210    pub allocated_provisioned_concurrent_executions: u32,
211    pub available_provisioned_concurrent_executions: u32,
212    pub status: String, // IN_PROGRESS | READY | FAILED
213    pub status_reason: Option<String>,
214    pub last_modified: String,
215}
216
217/// A function URL configuration.
218#[derive(Debug, Clone)]
219pub struct FunctionUrlConfig {
220    /// Kept for potential admin console use.
221    #[allow(dead_code)]
222    pub function_name: String,
223    pub function_arn: String,
224    pub function_url: String,
225    pub auth_type: String,
226    pub cors: Option<serde_json::Value>,
227    pub creation_time: String,
228    pub last_modified_time: String,
229}
230
231#[derive(Debug, Clone)]
232pub struct FunctionVersion {
233    pub version: String,
234    pub description: String,
235    pub code_sha256: String,
236    pub code_size: u64,
237    pub code: Option<Body>,
238    pub last_modified: String,
239}
240
241#[derive(Debug, Clone)]
242pub struct Alias {
243    pub name: String,
244    pub arn: String,
245    pub function_version: String,
246    pub description: String,
247}
248
249/// Stored for debugging and the admin console — fields read externally.
250#[allow(dead_code)]
251#[derive(Debug, Clone)]
252pub struct InvocationRecord {
253    pub invocation_id: String,
254    pub invocation_type: String,
255    pub payload: serde_json::Value,
256    pub response: serde_json::Value,
257    pub status_code: u16,
258    pub timestamp: String,
259}
260
261#[derive(Debug, Clone)]
262pub struct EventSourceMapping {
263    pub uuid: String,
264    pub event_source_arn: String,
265    pub function_arn: String,
266    pub batch_size: u32,
267    /// Stored for potential future use / admin console.
268    #[allow(dead_code)]
269    pub enabled: bool,
270    pub state: String,
271    pub last_modified: String,
272    /// TRIM_HORIZON | LATEST | AT_TIMESTAMP — only meaningful for Kinesis/DDB streams.
273    pub starting_position: Option<String>,
274    pub starting_position_timestamp: Option<f64>,
275    pub maximum_batching_window_in_seconds: u32,
276    pub maximum_record_age_in_seconds: Option<i32>,
277    pub bisect_batch_on_function_error: bool,
278    pub maximum_retry_attempts: Option<i32>,
279    pub parallelization_factor: Option<u32>,
280    pub tumbling_window_in_seconds: Option<u32>,
281    /// Raw FilterCriteria JSON: { "Filters": [{ "Pattern": "..." }, ...] }.
282    pub filter_criteria: Option<serde_json::Value>,
283    /// DestinationConfig.OnFailure.Destination ARN — receives failed batches.
284    pub destination_on_failure: Option<String>,
285    pub function_response_types: Vec<String>,
286    /// Last poll result, surfaced via Get/List for diagnostics.
287    /// "OK", "PROBLEM: <message>", or "No records processed".
288    pub last_processing_result: String,
289    /// Per-shard iterator state for Kinesis/DDB-stream pollers so we don't
290    /// re-deliver records on every tick. Keyed by shard id.
291    pub shard_iterators: HashMap<String, String>,
292}
293
294#[derive(Debug, Serialize, Deserialize)]
295pub struct LambdaStateSnapshot {
296    pub functions: Vec<FunctionSnapshot>,
297}
298
299#[derive(Debug, Serialize, Deserialize)]
300pub struct LambdaRegionSnapshot {
301    pub account_id: String,
302    pub region: String,
303    pub functions: Vec<FunctionSnapshot>,
304}
305
306#[derive(Debug, Serialize, Deserialize)]
307pub struct FunctionSnapshot {
308    pub account_id: String,
309    pub region: String,
310    pub name: String,
311    pub arn: String,
312    pub runtime: Option<String>,
313    pub role: String,
314    pub handler: Option<String>,
315    pub description: String,
316    pub timeout: u32,
317    pub memory_size: u32,
318    pub code_sha256: String,
319    pub code_size: u64,
320    pub environment: HashMap<String, String>,
321    pub version: String,
322    pub versions: Vec<FunctionVersionSnapshot>,
323    pub aliases: HashMap<String, AliasSnapshot>,
324    pub last_modified: String,
325    pub state: String,
326    #[serde(default)]
327    pub policy_statements: HashMap<String, serde_json::Value>,
328    #[serde(default)]
329    pub tags: HashMap<String, String>,
330}
331
332#[derive(Debug, Serialize, Deserialize)]
333pub struct FunctionVersionSnapshot {
334    pub version: String,
335    pub description: String,
336    pub code_sha256: String,
337    pub code_size: u64,
338    pub last_modified: String,
339}
340
341#[derive(Debug, Serialize, Deserialize)]
342pub struct AliasSnapshot {
343    pub name: String,
344    pub arn: String,
345    pub function_version: String,
346    pub description: String,
347}
348
349#[derive(Debug, Clone)]
350pub struct LayerVersion {
351    /// Layer name kept for reference / admin console.
352    #[allow(dead_code)]
353    pub layer_name: String,
354    pub layer_arn: String,
355    pub version_arn: String,
356    pub version: u64,
357    pub description: String,
358    pub compatible_runtimes: Vec<String>,
359    pub code_sha256: String,
360    pub code_size: u64,
361    /// Raw zip bytes stored for future execution support.
362    #[allow(dead_code)]
363    pub code_data: Option<Vec<u8>>,
364    pub created_date: String,
365}