//! `awsim_lambda/state.rs` — Lambda emulator state and its snapshot types.

use awsim_core::{Body, BodyStore, Snapshottable};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

7/// Lambda state — per account and region.
8#[derive(Debug, Default)]
9pub struct LambdaState {
10    pub functions: DashMap<String, LambdaFunction>,
11    pub event_source_mappings: DashMap<String, EventSourceMapping>,
12    pub layers: DashMap<String, Vec<LayerVersion>>,
13    /// function_name → FunctionUrlConfig
14    pub url_configs: DashMap<String, FunctionUrlConfig>,
15    /// function_name[:qualifier] → EventInvokeConfig
16    pub event_invoke_configs: DashMap<String, EventInvokeConfig>,
17    pub body_store: OnceLock<Arc<BodyStore>>,
18}
19
20impl LambdaState {
21    pub fn body_store(&self) -> Option<&Arc<BodyStore>> {
22        self.body_store.get()
23    }
24
25    pub fn set_body_store(&self, store: Arc<BodyStore>) {
26        let _ = self.body_store.set(store);
27    }
28}
29
30impl Snapshottable for LambdaState {
31    type Snapshot = LambdaRegionSnapshot;
32
33    fn to_snapshot(&self, account_id: &str, region: &str) -> Self::Snapshot {
34        let functions = self
35            .functions
36            .iter()
37            .map(|entry| {
38                let f = entry.value();
39                FunctionSnapshot {
40                    account_id: account_id.to_string(),
41                    region: region.to_string(),
42                    name: f.name.clone(),
43                    arn: f.arn.clone(),
44                    runtime: f.runtime.clone(),
45                    role: f.role.clone(),
46                    handler: f.handler.clone(),
47                    description: f.description.clone(),
48                    timeout: f.timeout,
49                    memory_size: f.memory_size,
50                    code_sha256: f.code_sha256.clone(),
51                    code_size: f.code_size,
52                    environment: f.environment.clone(),
53                    version: f.version.clone(),
54                    versions: f
55                        .versions
56                        .iter()
57                        .map(|v| FunctionVersionSnapshot {
58                            version: v.version.clone(),
59                            description: v.description.clone(),
60                            code_sha256: v.code_sha256.clone(),
61                            code_size: v.code_size,
62                            last_modified: v.last_modified.clone(),
63                        })
64                        .collect(),
65                    aliases: f
66                        .aliases
67                        .iter()
68                        .map(|(k, a)| {
69                            (
70                                k.clone(),
71                                AliasSnapshot {
72                                    name: a.name.clone(),
73                                    arn: a.arn.clone(),
74                                    function_version: a.function_version.clone(),
75                                    description: a.description.clone(),
76                                    routing_config: a.routing_config.clone(),
77                                },
78                            )
79                        })
80                        .collect(),
81                    last_modified: f.last_modified.clone(),
82                    state: f.state.clone(),
83                    policy_statements: f.policy_statements.clone(),
84                    tags: f.tags.clone(),
85                    architectures: f.architectures.clone(),
86                    ephemeral_storage_size: f.ephemeral_storage_size,
87                    package_type: f.package_type.clone(),
88                    layers: f.layers.clone(),
89                    vpc_config: f.vpc_config.clone(),
90                    dead_letter_config: f.dead_letter_config.clone(),
91                    tracing_config: f.tracing_config.clone(),
92                    kms_key_arn: f.kms_key_arn.clone(),
93                    file_system_configs: f.file_system_configs.clone(),
94                    logging_config: f.logging_config.clone(),
95                    snap_start: f.snap_start.clone(),
96                    image_config: f.image_config.clone(),
97                }
98            })
99            .collect();
100
101        LambdaRegionSnapshot {
102            account_id: account_id.to_string(),
103            region: region.to_string(),
104            functions,
105        }
106    }
107
108    fn from_snapshot(snapshot: Self::Snapshot) -> (String, String, Self) {
109        let state = LambdaState::default();
110        for fs in snapshot.functions {
111            let versions: Vec<FunctionVersion> = fs
112                .versions
113                .into_iter()
114                .map(|v| FunctionVersion {
115                    version: v.version,
116                    description: v.description,
117                    code_sha256: v.code_sha256,
118                    code_size: v.code_size,
119                    code: None,
120                    last_modified: v.last_modified,
121                })
122                .collect();
123
124            let aliases: HashMap<String, Alias> = fs
125                .aliases
126                .into_iter()
127                .map(|(k, a)| {
128                    (
129                        k,
130                        Alias {
131                            name: a.name,
132                            arn: a.arn,
133                            function_version: a.function_version,
134                            description: a.description,
135                            routing_config: a.routing_config,
136                        },
137                    )
138                })
139                .collect();
140
141            let func = LambdaFunction {
142                name: fs.name.clone(),
143                arn: fs.arn,
144                runtime: fs.runtime,
145                role: fs.role,
146                handler: fs.handler,
147                description: fs.description,
148                timeout: fs.timeout,
149                memory_size: fs.memory_size,
150                code_sha256: fs.code_sha256,
151                code_size: fs.code_size,
152                code: None,
153                environment: fs.environment,
154                version: fs.version,
155                versions,
156                aliases,
157                last_modified: fs.last_modified,
158                state: fs.state,
159                invocations: Vec::new(),
160                policy_statements: fs.policy_statements,
161                tags: fs.tags,
162                reserved_concurrent_executions: None,
163                provisioned_concurrency: HashMap::new(),
164                architectures: fs.architectures,
165                ephemeral_storage_size: fs.ephemeral_storage_size,
166                package_type: fs.package_type,
167                layers: fs.layers,
168                vpc_config: fs.vpc_config,
169                dead_letter_config: fs.dead_letter_config,
170                tracing_config: fs.tracing_config,
171                kms_key_arn: fs.kms_key_arn,
172                file_system_configs: fs.file_system_configs,
173                logging_config: fs.logging_config,
174                snap_start: fs.snap_start,
175                image_config: fs.image_config,
176            };
177            state.functions.insert(fs.name, func);
178        }
179        (snapshot.account_id, snapshot.region, state)
180    }
181}
182
/// Asynchronous-invocation settings stored per function (optionally per
/// qualifier) in `LambdaState::event_invoke_configs`.
#[derive(Debug, Clone, Default)]
pub struct EventInvokeConfig {
    /// ARN of the function this configuration applies to.
    pub function_arn: String,
    /// Retry limit; `None` when not configured.
    pub maximum_retry_attempts: Option<i32>,
    /// Maximum event age in seconds; `None` when not configured.
    pub maximum_event_age_in_seconds: Option<i32>,
    /// Destination for successful invocations, if any.
    pub destination_on_success: Option<String>,
    /// Destination for failed invocations, if any.
    pub destination_on_failure: Option<String>,
    /// Last-modified time as a number.
    /// NOTE(review): presumably epoch seconds — confirm against the serializer.
    pub last_modified: f64,
}

193#[derive(Debug, Clone)]
194pub struct LambdaFunction {
195    pub name: String,
196    pub arn: String,
197    pub runtime: Option<String>,
198    pub role: String,
199    pub handler: Option<String>,
200    pub description: String,
201    pub timeout: u32,
202    pub memory_size: u32,
203    pub code_sha256: String,
204    pub code_size: u64,
205    pub code: Option<Body>,
206    pub environment: HashMap<String, String>,
207    /// Always "$LATEST" for the live function.
208    pub version: String,
209    pub versions: Vec<FunctionVersion>,
210    pub aliases: HashMap<String, Alias>,
211    pub last_modified: String,
212    /// "Active", "Pending", "Failed", etc.
213    pub state: String,
214    /// Invocation records for debugging / admin console.
215    pub invocations: Vec<InvocationRecord>,
216    /// Resource-based policy statements (for AddPermission / RemovePermission).
217    pub policy_statements: HashMap<String, serde_json::Value>,
218    /// Tags attached to this function.
219    pub tags: HashMap<String, String>,
220    /// Reserved concurrent executions ceiling per PutFunctionConcurrency.
221    /// `None` means unreserved — the function shares the account pool.
222    pub reserved_concurrent_executions: Option<u32>,
223    /// Provisioned concurrency configurations keyed by qualifier (alias name
224    /// or function version). Each entry tracks the requested capacity along
225    /// with a simulated state machine that flips IN_PROGRESS -> READY.
226    pub provisioned_concurrency: HashMap<String, ProvisionedConcurrencyConfig>,
227    /// CPU architecture set: `["x86_64"]` or `["arm64"]`. Defaults to
228    /// `["x86_64"]` per AWS.
229    pub architectures: Vec<String>,
230    /// `/tmp` size in MiB. Defaults to 512; AWS allows 512..=10240.
231    pub ephemeral_storage_size: u32,
232    /// "Zip" or "Image". Defaults to "Zip".
233    pub package_type: String,
234    /// Optional layer-version ARNs attached to the function.
235    pub layers: Vec<String>,
236    /// VpcConfig as supplied by the caller plus the synthesized VpcId field.
237    pub vpc_config: Option<serde_json::Value>,
238    /// DeadLetterConfig (`{ TargetArn }`).
239    pub dead_letter_config: Option<serde_json::Value>,
240    /// TracingConfig (`{ Mode }`). Defaults to `{ Mode: "PassThrough" }`.
241    pub tracing_config: Option<serde_json::Value>,
242    /// KMS key ARN used to encrypt environment variables at rest.
243    pub kms_key_arn: Option<String>,
244    /// EFS mounts: array of `{ Arn, LocalMountPath }`.
245    pub file_system_configs: Option<serde_json::Value>,
246    /// LoggingConfig (`{ LogFormat, ApplicationLogLevel, SystemLogLevel, LogGroup }`).
247    pub logging_config: Option<serde_json::Value>,
248    /// SnapStart configuration. Stored as supplied; the serializer adds the
249    /// computed `OptimizationStatus` field.
250    pub snap_start: Option<serde_json::Value>,
251    /// ImageConfig for container-image functions.
252    pub image_config: Option<serde_json::Value>,
253}
254
/// Provisioned concurrency configuration for a single (function, qualifier)
/// pair. Real Lambda transitions IN_PROGRESS -> READY asynchronously; we
/// flip immediately because the emulator never has provisioning latency.
#[derive(Debug, Clone)]
pub struct ProvisionedConcurrencyConfig {
    /// Alias name or function version this configuration is attached to.
    pub qualifier: String,
    /// Capacity the caller asked for.
    pub requested_provisioned_concurrent_executions: u32,
    /// Capacity reported as allocated.
    pub allocated_provisioned_concurrent_executions: u32,
    /// Capacity reported as available.
    pub available_provisioned_concurrent_executions: u32,
    /// One of `IN_PROGRESS`, `READY` or `FAILED`.
    pub status: String,
    /// Optional explanation accompanying `status`.
    pub status_reason: Option<String>,
    /// Last-modified timestamp string.
    pub last_modified: String,
}

269/// A function URL configuration.
270#[derive(Debug, Clone)]
271pub struct FunctionUrlConfig {
272    /// Kept for potential admin console use.
273    #[allow(dead_code)]
274    pub function_name: String,
275    pub function_arn: String,
276    pub function_url: String,
277    pub auth_type: String,
278    pub cors: Option<serde_json::Value>,
279    pub creation_time: String,
280    pub last_modified_time: String,
281}
282
283#[derive(Debug, Clone)]
284pub struct FunctionVersion {
285    pub version: String,
286    pub description: String,
287    pub code_sha256: String,
288    pub code_size: u64,
289    pub code: Option<Body>,
290    pub last_modified: String,
291}
292
/// A named pointer to a function version, with optional weighted routing.
#[derive(Debug, Clone)]
pub struct Alias {
    /// Alias name.
    pub name: String,
    /// Alias ARN.
    pub arn: String,
    /// Primary version the alias points at.
    pub function_version: String,
    /// Free-form description.
    pub description: String,
    /// Traffic-shifting weights: `version → fraction in [0, 1]`. When set,
    /// invocations through the alias split between `function_version` and
    /// the listed versions per their weights. Must total ≤ 1; the
    /// implicit remainder is routed to `function_version`.
    pub routing_config: HashMap<String, f64>,
}

306/// Stored for debugging and the admin console — fields read externally.
307#[allow(dead_code)]
308#[derive(Debug, Clone)]
309pub struct InvocationRecord {
310    pub invocation_id: String,
311    pub invocation_type: String,
312    pub payload: serde_json::Value,
313    pub response: serde_json::Value,
314    pub status_code: u16,
315    pub timestamp: String,
316}
317
318#[derive(Debug, Clone)]
319pub struct EventSourceMapping {
320    pub uuid: String,
321    pub event_source_arn: String,
322    pub function_arn: String,
323    pub batch_size: u32,
324    /// Stored for potential future use / admin console.
325    #[allow(dead_code)]
326    pub enabled: bool,
327    pub state: String,
328    pub last_modified: String,
329    /// TRIM_HORIZON | LATEST | AT_TIMESTAMP — only meaningful for Kinesis/DDB streams.
330    pub starting_position: Option<String>,
331    pub starting_position_timestamp: Option<f64>,
332    pub maximum_batching_window_in_seconds: u32,
333    pub maximum_record_age_in_seconds: Option<i32>,
334    pub bisect_batch_on_function_error: bool,
335    pub maximum_retry_attempts: Option<i32>,
336    pub parallelization_factor: Option<u32>,
337    pub tumbling_window_in_seconds: Option<u32>,
338    /// Raw FilterCriteria JSON: { "Filters": [{ "Pattern": "..." }, ...] }.
339    pub filter_criteria: Option<serde_json::Value>,
340    /// DestinationConfig.OnFailure.Destination ARN — receives failed batches.
341    pub destination_on_failure: Option<String>,
342    pub function_response_types: Vec<String>,
343    /// Last poll result, surfaced via Get/List for diagnostics.
344    /// "OK", "PROBLEM: <message>", or "No records processed".
345    pub last_processing_result: String,
346    /// Per-shard iterator state for Kinesis/DDB-stream pollers so we don't
347    /// re-deliver records on every tick. Keyed by shard id.
348    pub shard_iterators: HashMap<String, String>,
349    /// Tags attached via `TagResource` against the ESM ARN.
350    pub tags: HashMap<String, String>,
351}
352
353#[derive(Debug, Serialize, Deserialize)]
354pub struct LambdaStateSnapshot {
355    pub functions: Vec<FunctionSnapshot>,
356}
357
358#[derive(Debug, Serialize, Deserialize)]
359pub struct LambdaRegionSnapshot {
360    pub account_id: String,
361    pub region: String,
362    pub functions: Vec<FunctionSnapshot>,
363}
364
365#[derive(Debug, Serialize, Deserialize)]
366pub struct FunctionSnapshot {
367    pub account_id: String,
368    pub region: String,
369    pub name: String,
370    pub arn: String,
371    pub runtime: Option<String>,
372    pub role: String,
373    pub handler: Option<String>,
374    pub description: String,
375    pub timeout: u32,
376    pub memory_size: u32,
377    pub code_sha256: String,
378    pub code_size: u64,
379    pub environment: HashMap<String, String>,
380    pub version: String,
381    pub versions: Vec<FunctionVersionSnapshot>,
382    pub aliases: HashMap<String, AliasSnapshot>,
383    pub last_modified: String,
384    pub state: String,
385    #[serde(default)]
386    pub policy_statements: HashMap<String, serde_json::Value>,
387    #[serde(default)]
388    pub tags: HashMap<String, String>,
389    #[serde(default = "default_architectures")]
390    pub architectures: Vec<String>,
391    #[serde(default = "default_ephemeral_storage_size")]
392    pub ephemeral_storage_size: u32,
393    #[serde(default = "default_package_type")]
394    pub package_type: String,
395    #[serde(default)]
396    pub layers: Vec<String>,
397    #[serde(default)]
398    pub vpc_config: Option<serde_json::Value>,
399    #[serde(default)]
400    pub dead_letter_config: Option<serde_json::Value>,
401    #[serde(default)]
402    pub tracing_config: Option<serde_json::Value>,
403    #[serde(default)]
404    pub kms_key_arn: Option<String>,
405    #[serde(default)]
406    pub file_system_configs: Option<serde_json::Value>,
407    #[serde(default)]
408    pub logging_config: Option<serde_json::Value>,
409    #[serde(default)]
410    pub snap_start: Option<serde_json::Value>,
411    #[serde(default)]
412    pub image_config: Option<serde_json::Value>,
413}
414
/// Serde default for `FunctionSnapshot::architectures`: a single `"x86_64"` entry.
fn default_architectures() -> Vec<String> {
    vec!["x86_64".to_string()]
}

/// Serde default for `FunctionSnapshot::ephemeral_storage_size`: 512.
fn default_ephemeral_storage_size() -> u32 {
    512
}

/// Serde default for `FunctionSnapshot::package_type`: `"Zip"`.
fn default_package_type() -> String {
    "Zip".to_string()
}

427#[derive(Debug, Serialize, Deserialize)]
428pub struct FunctionVersionSnapshot {
429    pub version: String,
430    pub description: String,
431    pub code_sha256: String,
432    pub code_size: u64,
433    pub last_modified: String,
434}
435
436#[derive(Debug, Serialize, Deserialize)]
437pub struct AliasSnapshot {
438    pub name: String,
439    pub arn: String,
440    pub function_version: String,
441    pub description: String,
442    #[serde(default)]
443    pub routing_config: HashMap<String, f64>,
444}
445
/// One published version of a Lambda layer.
#[derive(Debug, Clone)]
pub struct LayerVersion {
    /// Name of the layer this version belongs to.
    pub layer_name: String,
    /// ARN of the layer.
    pub layer_arn: String,
    /// ARN of this specific layer version.
    pub version_arn: String,
    /// Version number.
    pub version: u64,
    /// Free-form description.
    pub description: String,
    /// Runtimes this layer version is declared compatible with.
    pub compatible_runtimes: Vec<String>,
    /// SHA-256 of the layer content.
    pub code_sha256: String,
    /// Size of the layer content.
    pub code_size: u64,
    /// Raw zip bytes stored for future execution support.
    #[allow(dead_code)]
    pub code_data: Option<Vec<u8>>,
    /// Creation timestamp string.
    pub created_date: String,
    /// Tags attached via `TagResource` against the layer-version ARN.
    pub tags: HashMap<String, String>,
}