Skip to main content

fakecloud_lambda/
service.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::{Method, StatusCode};
7use serde_json::{json, Value};
8use sha2::{Digest, Sha256};
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_persistence::SnapshotStore;
13
14use crate::runtime::ContainerRuntime;
15use crate::state::{
16    EventSourceMapping, LambdaFunction, LambdaSnapshot, LambdaState, SharedLambdaState,
17    LAMBDA_SNAPSHOT_SCHEMA_VERSION,
18};
19
20/// Lambda actions whose URL `resource_name` slot is a `FunctionName`
21/// (and therefore accepts ARN / partial ARN / `name:qualifier` forms).
22/// Layer / event-source-mapping / code-signing-config actions key off
23/// other resource identifiers and are excluded.
24pub(crate) fn action_takes_function_name(action: &str) -> bool {
25    matches!(
26        action,
27        "GetFunction"
28            | "DeleteFunction"
29            | "Invoke"
30            | "InvokeAsync"
31            | "InvokeWithResponseStream"
32            | "PublishVersion"
33            | "ListVersionsByFunction"
34            | "AddPermission"
35            | "RemovePermission"
36            | "GetPolicy"
37            | "GetFunctionConfiguration"
38            | "UpdateFunctionConfiguration"
39            | "UpdateFunctionCode"
40            | "GetFunctionConcurrency"
41            | "PutFunctionConcurrency"
42            | "DeleteFunctionConcurrency"
43            | "PutProvisionedConcurrencyConfig"
44            | "GetProvisionedConcurrencyConfig"
45            | "DeleteProvisionedConcurrencyConfig"
46            | "ListProvisionedConcurrencyConfigs"
47            | "PutFunctionEventInvokeConfig"
48            | "UpdateFunctionEventInvokeConfig"
49            | "GetFunctionEventInvokeConfig"
50            | "DeleteFunctionEventInvokeConfig"
51            | "ListFunctionEventInvokeConfigs"
52            | "CreateFunctionUrlConfig"
53            | "UpdateFunctionUrlConfig"
54            | "GetFunctionUrlConfig"
55            | "DeleteFunctionUrlConfig"
56            | "ListFunctionUrlConfigs"
57            | "PutFunctionCodeSigningConfig"
58            | "GetFunctionCodeSigningConfig"
59            | "DeleteFunctionCodeSigningConfig"
60            | "GetFunctionScalingConfig"
61            | "PutFunctionRecursionConfig"
62            | "GetFunctionRecursionConfig"
63            | "CreateAlias"
64            | "GetAlias"
65            | "ListAliases"
66            | "UpdateAlias"
67            | "DeleteAlias"
68            | "PutRuntimeManagementConfig"
69            | "GetRuntimeManagementConfig"
70    )
71}
72
73/// Strip an ARN, partial ARN, or trailing `:qualifier` from a Lambda
74/// `FunctionName` input down to the bare function name used as the
75/// state map key. AWS Lambda accepts four forms in URL path slots and
76/// API params:
77///
78///   - `MyFunction`
79///   - `MyFunction:Qualifier`
80///   - `123456789012:function:MyFunction[:Qualifier]`           (partial ARN)
81///   - `arn:aws:lambda:REGION:ACCOUNT:function:MyFunction[:Qualifier]`
82///
83/// Inputs that don't match any of those structures are returned
84/// unchanged. The qualifier (version or alias) is dropped because most
85/// callers look up the function by name and resolve qualifier
86/// separately.
87pub(crate) fn normalize_function_name(input: &str) -> String {
88    if input.is_empty() {
89        return String::new();
90    }
91
92    // SDKs URL-encode `:` in path segments, so `arn:aws:lambda:...`
93    // arrives as `arn%3Aaws%3Alambda%3A...`. Decode first; legitimate
94    // function names contain no percent-encoded characters, so this is
95    // safe for the bare-name path too.
96    let decoded = percent_encoding::percent_decode_str(input)
97        .decode_utf8_lossy()
98        .into_owned();
99    let input = decoded.as_str();
100
101    // Full ARN: arn:aws:lambda:REGION:ACCOUNT:function:NAME[:QUALIFIER]
102    if let Some(rest) = input.strip_prefix("arn:aws:lambda:") {
103        let parts: Vec<&str> = rest.splitn(5, ':').collect();
104        // parts: [region, account, "function", name, qualifier?]
105        if parts.len() >= 4 && parts[2] == "function" && !parts[3].is_empty() {
106            return parts[3].to_string();
107        }
108        return input.to_string();
109    }
110
111    // Partial ARN: ACCOUNT:function:NAME[:QUALIFIER]
112    let parts: Vec<&str> = input.splitn(4, ':').collect();
113    if parts.len() >= 3 && parts[1] == "function" && parts[0].chars().all(|c| c.is_ascii_digit()) {
114        if !parts[2].is_empty() {
115            return parts[2].to_string();
116        }
117        return input.to_string();
118    }
119
120    // Bare name with qualifier: NAME:QUALIFIER. Only apply when the
121    // input contains exactly one colon and the name part is a valid
122    // Lambda function-name token, so malformed ARNs (e.g. wrong service
123    // or wrong format) fall through unchanged rather than getting their
124    // first colon-segment returned.
125    if input.matches(':').count() == 1 {
126        if let Some((name, _qualifier)) = input.split_once(':') {
127            if !name.is_empty() && name.chars().all(is_function_name_char) {
128                return name.to_string();
129            }
130        }
131    }
132
133    input.to_string()
134}
135
136fn is_function_name_char(c: char) -> bool {
137    c.is_ascii_alphanumeric() || c == '-' || c == '_'
138}
139
140/// AWS bounds `EphemeralStorage.Size` to `[512, 10240]` MiB. Anything
141/// outside that range is rejected at the API edge with
142/// `InvalidParameterValueException`, matching the real Lambda control
143/// plane. Returns the validated size unchanged on success.
144pub(crate) fn validate_ephemeral_storage(size: i64) -> Result<i64, AwsServiceError> {
145    if !(512..=10240).contains(&size) {
146        return Err(AwsServiceError::aws_error(
147            StatusCode::BAD_REQUEST,
148            "InvalidParameterValueException",
149            format!(
150                "Value {size} at 'ephemeralStorage.size' failed to satisfy constraint: \
151                 Member must satisfy constraint: [Member must have value less than or equal to 10240, \
152                 Member must have value greater than or equal to 512]"
153            ),
154        ));
155    }
156    Ok(size)
157}
158
159/// All fields of a `CreateFunction` request, already parsed and
160/// defaulted. The code zip (if any) is eagerly base64-decoded so the
161/// caller can hash it without doing the decode again.
162struct CreateFunctionInput {
163    function_name: String,
164    runtime: String,
165    role: String,
166    handler: String,
167    description: String,
168    timeout: i64,
169    memory_size: i64,
170    package_type: String,
171    tags: BTreeMap<String, String>,
172    environment: BTreeMap<String, String>,
173    architectures: Vec<String>,
174    code_zip: Option<Vec<u8>>,
175    code_fallback: Vec<u8>,
176    image_uri: Option<String>,
177    layer_arns: Vec<String>,
178    tracing_mode: Option<String>,
179    kms_key_arn: Option<String>,
180    ephemeral_storage_size: Option<i64>,
181    vpc_config: Option<serde_json::Value>,
182    snap_start: Option<serde_json::Value>,
183    dead_letter_config_arn: Option<String>,
184    file_system_configs: Vec<serde_json::Value>,
185    logging_config: Option<serde_json::Value>,
186    image_config: Option<serde_json::Value>,
187}
188
189impl CreateFunctionInput {
190    fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
191        let function_name = body["FunctionName"]
192            .as_str()
193            .ok_or_else(|| {
194                AwsServiceError::aws_error(
195                    StatusCode::BAD_REQUEST,
196                    "InvalidParameterValueException",
197                    "FunctionName is required",
198                )
199            })?
200            .to_string();
201
202        let tags: BTreeMap<String, String> = body["Tags"]
203            .as_object()
204            .map(|m| {
205                m.iter()
206                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
207                    .collect()
208            })
209            .unwrap_or_default();
210
211        let environment: BTreeMap<String, String> = body["Environment"]["Variables"]
212            .as_object()
213            .map(|m| {
214                m.iter()
215                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
216                    .collect()
217            })
218            .unwrap_or_default();
219
220        let architectures = body["Architectures"]
221            .as_array()
222            .map(|a| {
223                a.iter()
224                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
225                    .collect()
226            })
227            .unwrap_or_else(|| vec!["x86_64".to_string()]);
228
229        let code_zip: Option<Vec<u8>> = match body["Code"]["ZipFile"].as_str() {
230            Some(b64) => Some(
231                base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
232                    |_| {
233                        AwsServiceError::aws_error(
234                            StatusCode::BAD_REQUEST,
235                            "InvalidParameterValueException",
236                            "Could not decode Code.ZipFile: invalid base64",
237                        )
238                    },
239                )?,
240            ),
241            None => None,
242        };
243
244        let code_fallback = serde_json::to_vec(&body["Code"]).unwrap_or_default();
245
246        let package_type = body["PackageType"].as_str().unwrap_or("Zip").to_string();
247        // ImageUri belongs to `PackageType=Image` functions. Silently
248        // dropping it on `Zip` functions avoids GetFunction returning
249        // ECR code metadata for a Zip-based function (AWS ignores the
250        // field entirely in that case too).
251        let image_uri = if package_type == "Image" {
252            body["Code"]["ImageUri"].as_str().map(String::from)
253        } else {
254            None
255        };
256
257        // PackageType=Image requires Code.ImageUri; PackageType=Zip requires
258        // code content. Reject inconsistent shapes with AWS's error code so
259        // SDK-level validation tests see matching behaviour.
260        if package_type == "Image" && image_uri.is_none() {
261            return Err(AwsServiceError::aws_error(
262                StatusCode::BAD_REQUEST,
263                "InvalidParameterValueException",
264                "Code.ImageUri is required when PackageType is Image",
265            ));
266        }
267
268        let layer_arns: Vec<String> = body["Layers"]
269            .as_array()
270            .map(|arr| {
271                arr.iter()
272                    .filter_map(|v| v.as_str().map(String::from))
273                    .collect()
274            })
275            .unwrap_or_default();
276
277        let tracing_mode = body["TracingConfig"]["Mode"].as_str().map(String::from);
278        let kms_key_arn = body["KMSKeyArn"].as_str().map(String::from);
279        let ephemeral_storage_size = match body["EphemeralStorage"]["Size"].as_i64() {
280            Some(size) => Some(validate_ephemeral_storage(size)?),
281            None => None,
282        };
283        let vpc_config = body["VpcConfig"]
284            .is_object()
285            .then(|| body["VpcConfig"].clone());
286        let snap_start = body["SnapStart"]
287            .is_object()
288            .then(|| body["SnapStart"].clone());
289        let dead_letter_config_arn = body["DeadLetterConfig"]["TargetArn"]
290            .as_str()
291            .map(String::from);
292        let file_system_configs = body["FileSystemConfigs"]
293            .as_array()
294            .cloned()
295            .unwrap_or_default();
296        let logging_config = body["LoggingConfig"]
297            .is_object()
298            .then(|| body["LoggingConfig"].clone());
299        let image_config = body["ImageConfig"]
300            .is_object()
301            .then(|| body["ImageConfig"].clone());
302
303        Ok(Self {
304            function_name,
305            runtime: body["Runtime"].as_str().unwrap_or("python3.12").to_string(),
306            role: body["Role"].as_str().unwrap_or("").to_string(),
307            handler: body["Handler"]
308                .as_str()
309                .unwrap_or("index.handler")
310                .to_string(),
311            description: body["Description"].as_str().unwrap_or("").to_string(),
312            timeout: body["Timeout"].as_i64().unwrap_or(3),
313            memory_size: body["MemorySize"].as_i64().unwrap_or(128),
314            package_type,
315            tags,
316            environment,
317            architectures,
318            code_zip,
319            code_fallback,
320            image_uri,
321            layer_arns,
322            tracing_mode,
323            kms_key_arn,
324            ephemeral_storage_size,
325            vpc_config,
326            snap_start,
327            dead_letter_config_arn,
328            file_system_configs,
329            logging_config,
330            image_config,
331        })
332    }
333}
334
335/// AWS Lambda's InvocationType: synchronous, async (event), or dry-run.
336#[derive(Debug, Clone, Copy, PartialEq, Eq)]
337pub enum InvocationType {
338    RequestResponse,
339    Event,
340    DryRun,
341}
342
343impl InvocationType {
344    pub fn from_header(value: Option<&str>) -> Self {
345        match value {
346            Some("Event") => Self::Event,
347            Some("DryRun") => Self::DryRun,
348            _ => Self::RequestResponse,
349        }
350    }
351}
352
353/// Route an async-invoke result to the configured OnSuccess / OnFailure
354/// destination. Destination is matched by ARN scheme: SQS, SNS, EventBridge,
355/// or another Lambda. Mirrors the AWS Lambda destinations record schema.
356fn route_to_destination(
357    bus: Arc<fakecloud_core::delivery::DeliveryBus>,
358    function_arn: &str,
359    request_payload: &[u8],
360    result: &Result<Vec<u8>, String>,
361    destination_config: Option<&serde_json::Value>,
362) {
363    let Some(cfg) = destination_config else {
364        return;
365    };
366    let (key, condition, response_value): (&str, &str, serde_json::Value) = match result {
367        Ok(bytes) => (
368            "OnSuccess",
369            "Success",
370            serde_json::from_slice(bytes).unwrap_or(serde_json::Value::Null),
371        ),
372        Err(err) => (
373            "OnFailure",
374            "RetriesExhausted",
375            serde_json::json!({ "errorMessage": err }),
376        ),
377    };
378    let Some(dest) = cfg
379        .get(key)
380        .and_then(|v| v.get("Destination"))
381        .and_then(|v| v.as_str())
382    else {
383        return;
384    };
385    let request_payload_v: serde_json::Value =
386        serde_json::from_slice(request_payload).unwrap_or(serde_json::Value::Null);
387    let record = serde_json::json!({
388        "version": "1.0",
389        "timestamp": chrono::Utc::now().to_rfc3339(),
390        "requestContext": {
391            "requestId": uuid::Uuid::new_v4().to_string(),
392            "functionArn": format!("{function_arn}:$LATEST"),
393            "condition": condition,
394            "approximateInvokeCount": 1,
395        },
396        "requestPayload": request_payload_v,
397        "responseContext": {
398            "statusCode": 200,
399            "executedVersion": "$LATEST",
400        },
401        "responsePayload": response_value,
402    });
403    let body = record.to_string();
404    if dest.contains(":sqs:") {
405        bus.send_to_sqs(dest, &body, &std::collections::HashMap::new());
406    } else if dest.contains(":sns:") {
407        bus.publish_to_sns(dest, &body, None);
408    } else if dest.contains(":lambda:") {
409        let dest = dest.to_string();
410        let payload = body.clone();
411        tokio::spawn(async move {
412            let _ = bus.invoke_lambda(&dest, &payload).await;
413        });
414    } else if dest.contains(":events:") || dest.contains(":eventbridge:") {
415        let detail_type = if result.is_ok() {
416            "Lambda Function Invocation Result - Success"
417        } else {
418            "Lambda Function Invocation Result - Failure"
419        };
420        bus.put_event_to_eventbridge("lambda", detail_type, &body, "default");
421    }
422}
423
424/// Decrements the per-function in-flight counter on drop. Lives as
425/// long as the invocation it gates — for synchronous invokes that's
426/// the function call's stack frame; for `Event` invokes the guard is
427/// moved into the spawned task so the counter drops only when the
428/// async work finishes.
429pub(crate) struct ConcurrencyGuard {
430    pub(crate) map: Arc<parking_lot::RwLock<BTreeMap<String, i64>>>,
431    pub(crate) key: String,
432}
433
434impl Drop for ConcurrencyGuard {
435    fn drop(&mut self) {
436        let mut m = self.map.write();
437        let n = m.get(&self.key).copied().unwrap_or(0);
438        if n <= 1 {
439            m.remove(&self.key);
440        } else {
441            m.insert(self.key.clone(), n - 1);
442        }
443    }
444}
445
446/// Map an Invoke `Qualifier` (alias name, numeric version, or
447/// `$LATEST`) to a concrete numeric version string. Aliases with a
448/// `RoutingConfig.AdditionalVersionWeights` table do a weighted pick
449/// across the alias's primary `function_version` plus the additional
450/// True when `prev` is byte-equivalent to `live` for every field
451/// that `PublishVersion` would otherwise capture into a new snapshot.
452/// Used to short-circuit a no-op publish (AWS-style idempotency:
453/// re-publishing without any change returns the previous version
454/// unchanged). The comparison spans code identity (sha + size),
455/// configuration (runtime/handler/role/timeout/memory/env/layers/...)
456/// and every advanced field round-tripped through
457/// `function_config_json`. The caller is responsible for resolving
458/// the `effective_description` (caller-supplied override wins over
459/// the live `$LATEST` description, matching real PublishVersion
460/// semantics).
461fn function_config_unchanged_for_publish(
462    prev: &LambdaFunction,
463    live: &LambdaFunction,
464    effective_description: &str,
465) -> bool {
466    prev.code_sha256 == live.code_sha256
467        && prev.code_size == live.code_size
468        && prev.image_uri == live.image_uri
469        && prev.package_type == live.package_type
470        && prev.runtime == live.runtime
471        && prev.role == live.role
472        && prev.handler == live.handler
473        && prev.description == effective_description
474        && prev.timeout == live.timeout
475        && prev.memory_size == live.memory_size
476        && prev.environment == live.environment
477        && prev.architectures == live.architectures
478        && prev.layers.len() == live.layers.len()
479        && prev
480            .layers
481            .iter()
482            .zip(live.layers.iter())
483            .all(|(a, b)| a.arn == b.arn && a.code_size == b.code_size)
484        && prev.tracing_mode == live.tracing_mode
485        && prev.kms_key_arn == live.kms_key_arn
486        && prev.ephemeral_storage_size == live.ephemeral_storage_size
487        && prev.vpc_config == live.vpc_config
488        && prev.dead_letter_config_arn == live.dead_letter_config_arn
489        && prev.file_system_configs == live.file_system_configs
490        && prev.logging_config == live.logging_config
491        && prev.image_config == live.image_config
492        && prev.signing_profile_version_arn == live.signing_profile_version_arn
493        && prev.signing_job_arn == live.signing_job_arn
494        && prev.runtime_version_config == live.runtime_version_config
495        && snap_start_apply_on_eq(prev.snap_start.as_ref(), live.snap_start.as_ref())
496}
497
498/// Compare two `SnapStart` configs by `ApplyOn` only — that's the
499/// caller-supplied knob. `OptimizationStatus` is server-side state
500/// that PublishVersion mutates on snapshots (flipping to "On" when
501/// ApplyOn=PublishedVersions) while $LATEST stays "Off", so a deep
502/// equality check here would never match on a SnapStart-enabled
503/// function and PublishVersion would never be idempotent. Treating
504/// `None` and `{ApplyOn:"None"}` as equivalent matches AWS, which
505/// emits the latter when the field is unset.
506fn snap_start_apply_on_eq(prev: Option<&Value>, live: Option<&Value>) -> bool {
507    let prev_apply = prev
508        .and_then(|v| v.get("ApplyOn"))
509        .and_then(|v| v.as_str())
510        .unwrap_or("None");
511    let live_apply = live
512        .and_then(|v| v.get("ApplyOn"))
513        .and_then(|v| v.as_str())
514        .unwrap_or("None");
515    prev_apply == live_apply
516}
517
518/// versions in the weight map. Returns `None` for `$LATEST` /
519/// unqualified invokes (caller uses the live `$LATEST` config).
520pub(crate) fn resolve_qualifier_to_version(
521    state: &LambdaState,
522    function_name: &str,
523    qualifier: Option<&str>,
524) -> Option<String> {
525    let q = qualifier?;
526    if q == "$LATEST" {
527        return None;
528    }
529    if q.chars().all(|c| c.is_ascii_digit()) {
530        return Some(q.to_string());
531    }
532    let alias_key = format!("{function_name}:{q}");
533    let alias = state.aliases.get(&alias_key)?;
534    let primary = alias.function_version.clone();
535    let routing = alias
536        .routing_config
537        .as_ref()
538        .and_then(|rc| rc.get("AdditionalVersionWeights"))
539        .and_then(|m| m.as_object());
540    let Some(weights) = routing else {
541        return Some(primary);
542    };
543    // Sum of additional weights ∈ [0,1]; primary gets 1 - sum. Pick
544    // uniformly in [0,1) and walk the cumulative weight axis.
545    let mut additional: Vec<(String, f64)> = Vec::with_capacity(weights.len());
546    let mut sum: f64 = 0.0;
547    for (ver, w) in weights {
548        let weight = w.as_f64().unwrap_or(0.0).clamp(0.0, 1.0);
549        sum += weight;
550        additional.push((ver.clone(), weight));
551    }
552    let primary_weight = (1.0 - sum).max(0.0);
553    let pick: f64 = {
554        // Mix a thread-local LCG state with wall-clock nanos so
555        // back-to-back calls within a single process tick still
556        // produce distinct picks. Invoke routing only needs fairness
557        // over many invokes, not crypto randomness.
558        use std::cell::Cell;
559        thread_local! {
560            static RNG: Cell<u64> = const { Cell::new(0x9E37_79B9_7F4A_7C15) };
561        }
562        let now_nanos = std::time::SystemTime::now()
563            .duration_since(std::time::UNIX_EPOCH)
564            .map(|d| d.as_nanos() as u64)
565            .unwrap_or(0);
566        RNG.with(|cell| {
567            let mut s = cell.get() ^ now_nanos;
568            // splitmix64 step
569            s = s.wrapping_add(0x9E37_79B9_7F4A_7C15);
570            let mut z = s;
571            z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
572            z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
573            z ^= z >> 31;
574            cell.set(s);
575            (z >> 11) as f64 / ((1u64 << 53) as f64)
576        })
577    };
578    let mut acc = primary_weight;
579    if pick < acc {
580        return Some(primary);
581    }
582    for (ver, w) in &additional {
583        acc += w;
584        if pick < acc {
585            return Some(ver.clone());
586        }
587    }
588    Some(primary)
589}
590
591pub struct LambdaService {
592    pub(crate) state: SharedLambdaState,
593    pub(crate) runtime: Option<Arc<ContainerRuntime>>,
594    snapshot_store: Option<Arc<dyn SnapshotStore>>,
595    snapshot_lock: Arc<AsyncMutex<()>>,
596    pub(crate) delivery_bus: Option<Arc<fakecloud_core::delivery::DeliveryBus>>,
597    pub(crate) role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
598    pub(crate) s3_delivery: Option<Arc<dyn fakecloud_core::delivery::S3Delivery>>,
599    /// Per-account-per-function in-flight invocation count, used to
600    /// gate `Invoke` against `PutFunctionConcurrency`'s
601    /// `ReservedConcurrentExecutions` ceiling. Keyed by
602    /// `{account_id}:{function_name}`. Live counter — incremented at
603    /// invoke entry, decremented when the invocation completes (or
604    /// when the spawned async task finishes for `Event` invokes).
605    pub(crate) inflight_invocations: Arc<parking_lot::RwLock<BTreeMap<String, i64>>>,
606}
607
608impl LambdaService {
609    pub fn new(state: SharedLambdaState) -> Self {
610        Self {
611            state,
612            runtime: None,
613            snapshot_store: None,
614            snapshot_lock: Arc::new(AsyncMutex::new(())),
615            delivery_bus: None,
616            role_trust_validator: None,
617            s3_delivery: None,
618            inflight_invocations: Arc::new(parking_lot::RwLock::new(BTreeMap::new())),
619        }
620    }
621
622    pub fn with_s3_delivery(mut self, s3: Arc<dyn fakecloud_core::delivery::S3Delivery>) -> Self {
623        self.s3_delivery = Some(s3);
624        self
625    }
626
627    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
628        self.runtime = Some(runtime);
629        self
630    }
631
632    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
633        self.snapshot_store = Some(store);
634        self
635    }
636
637    pub fn with_delivery_bus(mut self, bus: Arc<fakecloud_core::delivery::DeliveryBus>) -> Self {
638        self.delivery_bus = Some(bus);
639        self
640    }
641
642    pub fn with_role_trust_validator(
643        mut self,
644        validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
645    ) -> Self {
646        self.role_trust_validator = Some(validator);
647        self
648    }
649
650    async fn save_snapshot(&self) {
651        let Some(store) = self.snapshot_store.clone() else {
652            return;
653        };
654        let _guard = self.snapshot_lock.lock().await;
655        let snapshot = LambdaSnapshot {
656            schema_version: LAMBDA_SNAPSHOT_SCHEMA_VERSION,
657            accounts: Some(self.state.read().clone()),
658            state: None,
659        };
660        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
661            let bytes = serde_json::to_vec(&snapshot)
662                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
663            store.save(&bytes)
664        })
665        .await;
666        match join {
667            Ok(Ok(())) => {}
668            Ok(Err(err)) => tracing::error!(%err, "failed to write lambda snapshot"),
669            Err(err) => tracing::error!(%err, "lambda snapshot task panicked"),
670        }
671    }
672
673    /// Determine the action from the HTTP method and path segments.
674    /// Lambda uses REST-style routing:
675    ///   POST   /2015-03-31/functions                         -> CreateFunction
676    ///   GET    /2015-03-31/functions                         -> ListFunctions
677    ///   GET    /2015-03-31/functions/{name}                  -> GetFunction
678    ///   DELETE /2015-03-31/functions/{name}                  -> DeleteFunction
679    ///   POST   /2015-03-31/functions/{name}/invocations      -> Invoke
680    ///   POST   /2015-03-31/functions/{name}/versions         -> PublishVersion
681    ///   POST   /2015-03-31/event-source-mappings             -> CreateEventSourceMapping
682    ///   GET    /2015-03-31/event-source-mappings             -> ListEventSourceMappings
683    ///   GET    /2015-03-31/event-source-mappings/{uuid}      -> GetEventSourceMapping
684    ///   DELETE /2015-03-31/event-source-mappings/{uuid}      -> DeleteEventSourceMapping
685    fn resolve_action(req: &AwsRequest) -> Option<(&'static str, Option<String>)> {
686        let segs = &req.path_segments;
687        if segs.is_empty() {
688            return None;
689        }
690        // The Lambda data API uses many date prefixes (one per
691        // operation family). Recognise any well-formed YYYY-MM-DD
692        // prefix and route based on the path structure that follows.
693        let prefix = segs[0].as_str();
694
695        // Account settings + InvokeAsync — any prefix.
696        if segs.get(1).map(|s| s.as_str()) == Some("account-settings") && req.method == Method::GET
697        {
698            return Some(("GetAccountSettings", None));
699        }
700        if segs.get(1).map(|s| s.as_str()) == Some("functions")
701            && segs.get(3).map(|s| s.as_str()) == Some("invoke-async")
702            && req.method == Method::POST
703        {
704            return Some(("InvokeAsync", segs.get(2).map(|s| s.to_string())));
705        }
706        if segs.get(1).map(|s| s.as_str()) == Some("functions")
707            && segs.get(3).map(|s| s.as_str()) == Some("response-streaming-invocations")
708            && req.method == Method::POST
709        {
710            return Some((
711                "InvokeWithResponseStream",
712                segs.get(2).map(|s| s.to_string()),
713            ));
714        }
715
716        // Concurrency (reserved + provisioned) — any prefix.
717        if segs.get(1).map(|s| s.as_str()) == Some("functions")
718            && segs.get(3).map(|s| s.as_str()) == Some("concurrency")
719        {
720            let res = segs.get(2).map(|s| s.to_string());
721            return match req.method {
722                Method::PUT => Some(("PutFunctionConcurrency", res)),
723                Method::GET => Some(("GetFunctionConcurrency", res)),
724                Method::DELETE => Some(("DeleteFunctionConcurrency", res)),
725                _ => None,
726            };
727        }
728
729        // Provisioned concurrency at any prefix.
730        if segs.get(1).map(|s| s.as_str()) == Some("functions")
731            && segs.get(3).map(|s| s.as_str()) == Some("provisioned-concurrency")
732        {
733            let res = segs.get(2).map(|s| s.to_string());
734            return match req.method {
735                Method::PUT => Some(("PutProvisionedConcurrencyConfig", res)),
736                Method::GET => Some(("GetProvisionedConcurrencyConfig", res)),
737                Method::DELETE => Some(("DeleteProvisionedConcurrencyConfig", res)),
738                _ => None,
739            };
740        }
741        if segs.get(1).map(|s| s.as_str()) == Some("functions")
742            && segs.get(3).map(|s| s.as_str()) == Some("provisioned-concurrency-configs")
743            && req.method == Method::GET
744        {
745            return Some((
746                "ListProvisionedConcurrencyConfigs",
747                segs.get(2).map(|s| s.to_string()),
748            ));
749        }
750
751        // Event invoke config — any prefix.
752        if segs.get(1).map(|s| s.as_str()) == Some("functions")
753            && segs.get(3).map(|s| s.as_str()) == Some("event-invoke-config")
754        {
755            let res = segs.get(2).map(|s| s.to_string());
756            return match req.method {
757                Method::POST => Some(("PutFunctionEventInvokeConfig", res)),
758                Method::PUT => Some(("UpdateFunctionEventInvokeConfig", res)),
759                Method::GET => Some(("GetFunctionEventInvokeConfig", res)),
760                Method::DELETE => Some(("DeleteFunctionEventInvokeConfig", res)),
761                _ => None,
762            };
763        }
764        if segs.get(1).map(|s| s.as_str()) == Some("functions")
765            && (segs.get(3).map(|s| s.as_str()) == Some("event-invoke-config-list")
766                || (segs.get(3).map(|s| s.as_str()) == Some("event-invoke-config")
767                    && segs.get(4).map(|s| s.as_str()) == Some("list")))
768            && req.method == Method::GET
769        {
770            return Some((
771                "ListFunctionEventInvokeConfigs",
772                segs.get(2).map(|s| s.to_string()),
773            ));
774        }
775
776        // Recursion config — any prefix.
777        if segs.get(1).map(|s| s.as_str()) == Some("functions")
778            && segs.get(3).map(|s| s.as_str()) == Some("recursion-config")
779        {
780            let res = segs.get(2).map(|s| s.to_string());
781            return match req.method {
782                Method::PUT => Some(("PutFunctionRecursionConfig", res)),
783                Method::GET => Some(("GetFunctionRecursionConfig", res)),
784                _ => None,
785            };
786        }
787
788        // Runtime management config — any prefix.
789        if segs.get(1).map(|s| s.as_str()) == Some("functions")
790            && segs.get(3).map(|s| s.as_str()) == Some("runtime-management-config")
791        {
792            let res = segs.get(2).map(|s| s.to_string());
793            return match req.method {
794                Method::PUT => Some(("PutRuntimeManagementConfig", res)),
795                Method::GET => Some(("GetRuntimeManagementConfig", res)),
796                _ => None,
797            };
798        }
799
800        // Code signing config (function and global) — any prefix.
801        if segs.get(1).map(|s| s.as_str()) == Some("functions")
802            && segs.get(3).map(|s| s.as_str()) == Some("code-signing-config")
803        {
804            let res = segs.get(2).map(|s| s.to_string());
805            return match req.method {
806                Method::PUT => Some(("PutFunctionCodeSigningConfig", res)),
807                Method::GET => Some(("GetFunctionCodeSigningConfig", res)),
808                Method::DELETE => Some(("DeleteFunctionCodeSigningConfig", res)),
809                _ => None,
810            };
811        }
812        if segs.get(1).map(|s| s.as_str()) == Some("code-signing-configs") {
813            let res = segs.get(2).map(|s| s.to_string());
814            return match (
815                req.method.clone(),
816                segs.len(),
817                segs.get(3).map(|s| s.as_str()),
818            ) {
819                (Method::POST, 2, _) => Some(("CreateCodeSigningConfig", None)),
820                (Method::GET, 2, _) => Some(("ListCodeSigningConfigs", None)),
821                (Method::GET, 3, _) => Some(("GetCodeSigningConfig", res)),
822                (Method::PUT, 3, _) => Some(("UpdateCodeSigningConfig", res)),
823                (Method::DELETE, 3, _) => Some(("DeleteCodeSigningConfig", res)),
824                (Method::GET, 4, Some("functions")) => {
825                    Some(("ListFunctionsByCodeSigningConfig", res))
826                }
827                _ => None,
828            };
829        }
830
831        // Tags resource ARN at any prefix.
832        if segs.get(1).map(|s| s.as_str()) == Some("tags") && segs.len() >= 3 {
833            let res = segs[2..].join("/");
834            return match req.method {
835                Method::POST => Some(("TagResource", Some(res))),
836                Method::DELETE => Some(("UntagResource", Some(res))),
837                Method::GET => Some(("ListTags", Some(res))),
838                _ => None,
839            };
840        }
841
842        // Function URL config + scaling config (any prefix).
843        if segs.get(1).map(|s| s.as_str()) == Some("functions")
844            && segs.get(3).map(|s| s.as_str()) == Some("url")
845        {
846            let res = segs.get(2).map(|s| s.to_string());
847            return match req.method {
848                Method::POST => Some(("CreateFunctionUrlConfig", res)),
849                Method::GET => Some(("GetFunctionUrlConfig", res)),
850                Method::PUT => Some(("UpdateFunctionUrlConfig", res)),
851                Method::DELETE => Some(("DeleteFunctionUrlConfig", res)),
852                _ => None,
853            };
854        }
855        if segs.get(1).map(|s| s.as_str()) == Some("function-urls") && req.method == Method::GET {
856            return Some(("ListFunctionUrlConfigs", None));
857        }
858        if segs.get(1).map(|s| s.as_str()) == Some("functions")
859            && segs.get(3).map(|s| s.as_str()) == Some("urls")
860            && req.method == Method::GET
861        {
862            return Some(("ListFunctionUrlConfigs", segs.get(2).map(|s| s.to_string())));
863        }
864        if segs.get(1).map(|s| s.as_str()) == Some("event-source-mappings")
865            && segs.get(3).map(|s| s.as_str()) == Some("scaling-config")
866        {
867            let res = segs.get(2).map(|s| s.to_string());
868            return match req.method {
869                Method::PUT => Some(("PutFunctionScalingConfig", res)),
870                Method::GET => Some(("GetFunctionScalingConfig", res)),
871                _ => None,
872            };
873        }
874
875        // NOTE: concurrency, event-invoke-config, recursion-config, and
876        // code-signing-configs routes are all handled by the prefix-agnostic
877        // blocks above. The previously-present date-specific blocks were
878        // dead code.
879
880        // /2018-10-31/layers
881        if prefix == "2018-10-31" && segs.get(1).map(|s| s.as_str()) == Some("layers") {
882            let layer = segs.get(2).map(|s| s.to_string());
883            let third = segs.get(3).map(|s| s.as_str());
884            let version = segs.get(4).map(|s| s.to_string());
885            return match (&req.method, segs.len(), third, version.is_some()) {
886                (&Method::GET, 2, _, _) => Some(("ListLayers", None)),
887                (&Method::POST, 4, Some("versions"), false) => Some(("PublishLayerVersion", layer)),
888                (&Method::GET, 4, Some("versions"), false) => Some(("ListLayerVersions", layer)),
889                (&Method::GET, 5, Some("versions"), true) => Some(("GetLayerVersion", version)),
890                (&Method::DELETE, 5, Some("versions"), true) => {
891                    Some(("DeleteLayerVersion", version))
892                }
893                (&Method::GET, 6, Some("versions"), true)
894                    if segs.get(5).map(|s| s.as_str()) == Some("policy") =>
895                {
896                    Some(("GetLayerVersionPolicy", version))
897                }
898                (&Method::POST, 6, Some("versions"), true)
899                    if segs.get(5).map(|s| s.as_str()) == Some("policy") =>
900                {
901                    Some(("AddLayerVersionPermission", version))
902                }
903                (&Method::DELETE, 7, Some("versions"), true)
904                    if segs.get(5).map(|s| s.as_str()) == Some("policy") =>
905                {
906                    Some(("RemoveLayerVersionPermission", version))
907                }
908                _ => None,
909            };
910        }
911
912        // /2018-10-31/layers-by-arn
913        if prefix == "2018-10-31"
914            && segs.get(1).map(|s| s.as_str()) == Some("layers-by-arn")
915            && req.method == Method::GET
916        {
917            return Some(("GetLayerVersionByArn", None));
918        }
919
920        // NOTE: 2021-10-31/functions/{name}/url and ListFunctionUrlConfigs
921        // are handled by the prefix-agnostic blocks above.
922
923        if prefix != "2015-03-31" {
924            return None;
925        }
926
927        let collection = segs.get(1).map(|s| s.as_str());
928        let resource = segs.get(2).map(|s| s.to_string());
929        let third = segs.get(3).map(|s| s.as_str());
930        let fourth = segs.get(4).map(|s| s.as_str());
931
932        let action = match (&req.method, segs.len(), collection, third) {
933            (&Method::POST, 2, Some("functions"), _) => "CreateFunction",
934            (&Method::GET, 2, Some("functions"), _) => "ListFunctions",
935            (&Method::GET, 3, Some("functions"), _) => "GetFunction",
936            (&Method::DELETE, 3, Some("functions"), _) => "DeleteFunction",
937            (&Method::POST, 4, Some("functions"), Some("invocations")) => "Invoke",
938            (&Method::POST, 4, Some("functions"), Some("invoke-async")) => "InvokeAsync",
939            (&Method::POST, 4, Some("functions"), Some("response-streaming-invocations")) => {
940                "InvokeWithResponseStream"
941            }
942            (&Method::POST, 4, Some("functions"), Some("versions")) => "PublishVersion",
943            (&Method::GET, 4, Some("functions"), Some("versions")) => "ListVersionsByFunction",
944            (&Method::POST, 4, Some("functions"), Some("policy")) => "AddPermission",
945            (&Method::GET, 4, Some("functions"), Some("policy")) => "GetPolicy",
946            (&Method::DELETE, 5, Some("functions"), Some("policy")) => "RemovePermission",
947            (&Method::POST, 4, Some("functions"), Some("aliases")) => "CreateAlias",
948            (&Method::GET, 4, Some("functions"), Some("aliases")) => "ListAliases",
949            (&Method::GET, 5, Some("functions"), Some("aliases")) => "GetAlias",
950            (&Method::PUT, 5, Some("functions"), Some("aliases")) => "UpdateAlias",
951            (&Method::DELETE, 5, Some("functions"), Some("aliases")) => "DeleteAlias",
952            (&Method::GET, 4, Some("functions"), Some("configuration")) => {
953                "GetFunctionConfiguration"
954            }
955            (&Method::PUT, 4, Some("functions"), Some("configuration")) => {
956                "UpdateFunctionConfiguration"
957            }
958            (&Method::PUT, 4, Some("functions"), Some("code")) => "UpdateFunctionCode",
959            (&Method::PUT, 4, Some("functions"), Some("concurrency")) => "PutFunctionConcurrency",
960            (&Method::GET, 4, Some("functions"), Some("concurrency")) => "GetFunctionConcurrency",
961            (&Method::DELETE, 4, Some("functions"), Some("concurrency")) => {
962                "DeleteFunctionConcurrency"
963            }
964            (&Method::PUT, 4, Some("functions"), Some("provisioned-concurrency")) => {
965                "PutProvisionedConcurrencyConfig"
966            }
967            (&Method::GET, 4, Some("functions"), Some("provisioned-concurrency")) => {
968                "GetProvisionedConcurrencyConfig"
969            }
970            (&Method::DELETE, 4, Some("functions"), Some("provisioned-concurrency")) => {
971                "DeleteProvisionedConcurrencyConfig"
972            }
973            (&Method::GET, 4, Some("functions"), Some("provisioned-concurrency-configs")) => {
974                "ListProvisionedConcurrencyConfigs"
975            }
976            (&Method::PUT, 4, Some("functions"), Some("event-invoke-config")) => {
977                "UpdateFunctionEventInvokeConfig"
978            }
979            (&Method::POST, 4, Some("functions"), Some("event-invoke-config")) => {
980                "PutFunctionEventInvokeConfig"
981            }
982            (&Method::GET, 4, Some("functions"), Some("event-invoke-config")) => {
983                "GetFunctionEventInvokeConfig"
984            }
985            (&Method::DELETE, 4, Some("functions"), Some("event-invoke-config")) => {
986                "DeleteFunctionEventInvokeConfig"
987            }
988            (&Method::GET, 4, Some("functions"), Some("event-invoke-config-list")) => {
989                "ListFunctionEventInvokeConfigs"
990            }
991            (&Method::PUT, 4, Some("functions"), Some("code-signing-config")) => {
992                "PutFunctionCodeSigningConfig"
993            }
994            (&Method::GET, 4, Some("functions"), Some("code-signing-config")) => {
995                "GetFunctionCodeSigningConfig"
996            }
997            (&Method::DELETE, 4, Some("functions"), Some("code-signing-config")) => {
998                "DeleteFunctionCodeSigningConfig"
999            }
1000            (&Method::PUT, 4, Some("functions"), Some("runtime-management-config")) => {
1001                "PutRuntimeManagementConfig"
1002            }
1003            (&Method::GET, 4, Some("functions"), Some("runtime-management-config")) => {
1004                "GetRuntimeManagementConfig"
1005            }
1006            (&Method::PUT, 4, Some("functions"), Some("scaling-config")) => {
1007                "PutFunctionScalingConfig"
1008            }
1009            (&Method::GET, 4, Some("functions"), Some("scaling-config")) => {
1010                "GetFunctionScalingConfig"
1011            }
1012            (&Method::PUT, 4, Some("functions"), Some("recursion-config")) => {
1013                "PutFunctionRecursionConfig"
1014            }
1015            (&Method::GET, 4, Some("functions"), Some("recursion-config")) => {
1016                "GetFunctionRecursionConfig"
1017            }
1018            (&Method::POST, 2, Some("event-source-mappings"), _) => "CreateEventSourceMapping",
1019            (&Method::GET, 2, Some("event-source-mappings"), _) => "ListEventSourceMappings",
1020            (&Method::GET, 3, Some("event-source-mappings"), _) => "GetEventSourceMapping",
1021            (&Method::PUT, 3, Some("event-source-mappings"), _) => "UpdateEventSourceMapping",
1022            (&Method::DELETE, 3, Some("event-source-mappings"), _) => "DeleteEventSourceMapping",
1023            (&Method::POST, 3, Some("tags"), _) => "TagResource",
1024            (&Method::DELETE, 3, Some("tags"), _) => "UntagResource",
1025            (&Method::GET, 3, Some("tags"), _) => "ListTags",
1026            _ => return None,
1027        };
1028        let _ = fourth;
1029
1030        Some((action, resource))
1031    }
1032
1033    fn create_function(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1034        let body: Value = serde_json::from_slice(&req.body).unwrap_or_default();
1035        let input = CreateFunctionInput::from_body(&body)?;
1036
1037        // PassRole trust-policy check: the supplied execution role must
1038        // have a trust policy that allows lambda.amazonaws.com to call
1039        // sts:AssumeRole. Real AWS rejects with InvalidParameterValueException
1040        // when the trust policy doesn't include the service principal.
1041        if let Some(ref validator) = self.role_trust_validator {
1042            if let Err(err) =
1043                validator.validate(&req.account_id, &input.role, "lambda.amazonaws.com")
1044            {
1045                return Err(AwsServiceError::aws_error(
1046                    StatusCode::BAD_REQUEST,
1047                    "InvalidParameterValueException",
1048                    err.to_string(),
1049                ));
1050            }
1051        }
1052
1053        let mut accounts = self.state.write();
1054        // Pre-resolve layer attachments before re-borrowing accounts mutably.
1055        // Layer ARNs may live in sibling accounts.
1056        let layer_attachments =
1057            crate::extras::resolve_layer_attachments(&accounts, input.layer_arns.clone());
1058        let state = accounts.get_or_create(&req.account_id);
1059
1060        if state.functions.contains_key(&input.function_name) {
1061            return Err(AwsServiceError::aws_error(
1062                StatusCode::CONFLICT,
1063                "ResourceConflictException",
1064                format!("Function already exist: {}", input.function_name),
1065            ));
1066        }
1067
1068        // Hash the actual ZIP bytes when available, falling back to the
1069        // raw Code JSON so image-based functions still get a stable id.
1070        let code_bytes = input.code_zip.as_deref().unwrap_or(&input.code_fallback);
1071        let mut hasher = Sha256::new();
1072        hasher.update(code_bytes);
1073        let hash = hasher.finalize();
1074        let code_sha256 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
1075        let code_size = code_bytes.len() as i64;
1076
1077        let function_arn = format!(
1078            "arn:aws:lambda:{}:{}:function:{}",
1079            state.region, state.account_id, input.function_name
1080        );
1081        let now = Utc::now();
1082
1083        let func = LambdaFunction {
1084            function_name: input.function_name.clone(),
1085            function_arn,
1086            runtime: input.runtime,
1087            role: input.role,
1088            handler: input.handler,
1089            description: input.description,
1090            timeout: input.timeout,
1091            memory_size: input.memory_size,
1092            code_sha256,
1093            code_size,
1094            version: "$LATEST".to_string(),
1095            last_modified: now,
1096            tags: input.tags,
1097            environment: input.environment,
1098            architectures: input.architectures,
1099            package_type: input.package_type,
1100            code_zip: input.code_zip,
1101            image_uri: input.image_uri,
1102            policy: None,
1103            layers: layer_attachments,
1104            revision_id: uuid::Uuid::new_v4().to_string(),
1105            tracing_mode: input.tracing_mode,
1106            kms_key_arn: input.kms_key_arn,
1107            ephemeral_storage_size: input.ephemeral_storage_size,
1108            vpc_config: input.vpc_config,
1109            snap_start: input.snap_start,
1110            dead_letter_config_arn: input.dead_letter_config_arn,
1111            file_system_configs: input.file_system_configs,
1112            logging_config: input.logging_config,
1113            image_config: input.image_config,
1114            signing_profile_version_arn: None,
1115            signing_job_arn: None,
1116            runtime_version_config: None,
1117            master_arn: None,
1118            state_reason: None,
1119            state_reason_code: None,
1120            last_update_status_reason: None,
1121            last_update_status_reason_code: None,
1122        };
1123
1124        let response = self.function_config_json(&func);
1125
1126        state.functions.insert(input.function_name, func);
1127
1128        Ok(AwsResponse::json(StatusCode::CREATED, response.to_string()))
1129    }
1130
1131    fn get_function(
1132        &self,
1133        function_name: &str,
1134        account_id: &str,
1135        region: &str,
1136        qualifier: Option<&str>,
1137    ) -> Result<AwsResponse, AwsServiceError> {
1138        let accounts = self.state.read();
1139        let empty = LambdaState::new(account_id, region);
1140        let state = accounts.get(account_id).unwrap_or(&empty);
1141        let live = state.functions.get(function_name).ok_or_else(|| {
1142            AwsServiceError::aws_error(
1143                StatusCode::NOT_FOUND,
1144                "ResourceNotFoundException",
1145                format!(
1146                    "Function not found: arn:aws:lambda:{}:{}:function:{}",
1147                    state.region, state.account_id, function_name
1148                ),
1149            )
1150        })?;
1151
1152        // Resolve the qualifier to either $LATEST (live config) or a
1153        // numbered immutable snapshot. Aliases route through
1154        // `resolve_qualifier_to_version` so weighted aliases still pick
1155        // between the underlying numbered versions.
1156        let resolved_version = resolve_qualifier_to_version(state, function_name, qualifier);
1157        let (func, version_label) = match resolved_version {
1158            None => (live, "$LATEST".to_string()),
1159            Some(v) => {
1160                let snap = state
1161                    .function_version_snapshots
1162                    .get(function_name)
1163                    .and_then(|m| m.get(&v))
1164                    .ok_or_else(|| {
1165                        AwsServiceError::aws_error(
1166                            StatusCode::NOT_FOUND,
1167                            "ResourceNotFoundException",
1168                            format!(
1169                                "Function not found: arn:aws:lambda:{}:{}:function:{}:{v}",
1170                                state.region, state.account_id, function_name
1171                            ),
1172                        )
1173                    })?;
1174                (snap, v)
1175            }
1176        };
1177
1178        let mut config = self.function_config_json(func);
1179        config["Version"] = json!(version_label);
1180        if version_label != "$LATEST" {
1181            config["FunctionArn"] = json!(format!("{}:{version_label}", live.function_arn));
1182            config["MasterArn"] = json!(live.function_arn);
1183        }
1184        let code = if let Some(ref uri) = func.image_uri {
1185            json!({
1186                "ImageUri": uri,
1187                "ResolvedImageUri": uri,
1188                "RepositoryType": "ECR",
1189            })
1190        } else {
1191            // fakecloud doesn't expose a presigned-download path yet; we
1192            // synthesize a `_fakecloud/`-namespaced URL so it's obvious to
1193            // operators that the URL isn't a real S3 link. (Real
1194            // download support is tracked under follow-up GG batch.)
1195            let region = func.function_arn.split(':').nth(3).unwrap_or("us-east-1");
1196            json!({
1197                "Location": format!(
1198                    "https://prod-{region}-starport-layer-bucket.s3.{region}.amazonaws.com/_fakecloud/{account}/{name}/code.zip",
1199                    account = state.account_id,
1200                    name = function_name,
1201                ),
1202                "RepositoryType": "S3",
1203            })
1204        };
1205        let response = json!({
1206            "Code": code,
1207            "Configuration": config,
1208            "Tags": live.tags,
1209        });
1210
1211        Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
1212    }
1213
1214    fn delete_function(
1215        &self,
1216        function_name: &str,
1217        account_id: &str,
1218        qualifier: Option<&str>,
1219    ) -> Result<AwsResponse, AwsServiceError> {
1220        let mut accounts = self.state.write();
1221        let state = accounts.get_or_create(account_id);
1222        let region = state.region.clone();
1223        let account_id_owned = state.account_id.clone();
1224
1225        // Qualifier=N targets a single immutable version snapshot; the
1226        // live $LATEST function and other versions stay put. AWS only
1227        // accepts numeric qualifiers here — alias targets are deleted
1228        // via DeleteAlias, and `$LATEST` is rejected as
1229        // InvalidParameterValueException.
1230        if let Some(q) = qualifier {
1231            if q == "$LATEST" {
1232                return Err(AwsServiceError::aws_error(
1233                    StatusCode::BAD_REQUEST,
1234                    "InvalidParameterValueException",
1235                    "$LATEST version cannot be deleted without deleting the function.",
1236                ));
1237            }
1238            if !q.chars().all(|c| c.is_ascii_digit()) {
1239                return Err(AwsServiceError::aws_error(
1240                    StatusCode::BAD_REQUEST,
1241                    "InvalidParameterValueException",
1242                    format!(
1243                        "Value '{q}' at 'qualifier' failed to satisfy constraint: Member must satisfy regular expression pattern: (|[a-zA-Z0-9$_-]+)"
1244                    ),
1245                ));
1246            }
1247            // Live function must exist or AWS 404s before checking the version.
1248            if !state.functions.contains_key(function_name) {
1249                return Err(AwsServiceError::aws_error(
1250                    StatusCode::NOT_FOUND,
1251                    "ResourceNotFoundException",
1252                    format!(
1253                        "Function not found: arn:aws:lambda:{region}:{account_id_owned}:function:{function_name}:{q}"
1254                    ),
1255                ));
1256            }
1257            let snap_existed = state
1258                .function_version_snapshots
1259                .get_mut(function_name)
1260                .map(|m| m.remove(q).is_some())
1261                .unwrap_or(false);
1262            if !snap_existed {
1263                return Err(AwsServiceError::aws_error(
1264                    StatusCode::NOT_FOUND,
1265                    "ResourceNotFoundException",
1266                    format!(
1267                        "Function not found: arn:aws:lambda:{region}:{account_id_owned}:function:{function_name}:{q}"
1268                    ),
1269                ));
1270            }
1271            // Drop the version from the ordered list too so
1272            // ListVersionsByFunction reflects the deletion.
1273            if let Some(list) = state.function_versions.get_mut(function_name) {
1274                list.retain(|v| v != q);
1275            }
1276            return Ok(AwsResponse::json(StatusCode::NO_CONTENT, ""));
1277        }
1278
1279        if state.functions.remove(function_name).is_none() {
1280            return Err(AwsServiceError::aws_error(
1281                StatusCode::NOT_FOUND,
1282                "ResourceNotFoundException",
1283                format!(
1284                    "Function not found: arn:aws:lambda:{region}:{account_id_owned}:function:{function_name}"
1285                ),
1286            ));
1287        }
1288        // Drop all numbered versions + their snapshots so the function
1289        // is gone end-to-end (AWS deletes everything when no Qualifier
1290        // is supplied).
1291        state.function_versions.remove(function_name);
1292        state.function_version_snapshots.remove(function_name);
1293        // Aliases on this function disappear too.
1294        let prefix = format!("{function_name}:");
1295        state.aliases.retain(|k, _| !k.starts_with(&prefix));
1296
1297        // Clean up any running container for this function
1298        if let Some(ref runtime) = self.runtime {
1299            let rt = runtime.clone();
1300            let name = function_name.to_string();
1301            tokio::spawn(async move { rt.stop_container(&name).await });
1302        }
1303
1304        Ok(AwsResponse::json(StatusCode::NO_CONTENT, ""))
1305    }
1306
1307    fn list_functions(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1308        let accounts = self.state.read();
1309        let empty = LambdaState::new(account_id, "");
1310        let state = accounts.get(account_id).unwrap_or(&empty);
1311        let functions: Vec<Value> = state
1312            .functions
1313            .values()
1314            .map(|f| self.function_config_json(f))
1315            .collect();
1316
1317        let response = json!({
1318            "Functions": functions,
1319        });
1320
1321        Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
1322    }
1323
1324    async fn invoke(
1325        &self,
1326        function_name: &str,
1327        payload: &[u8],
1328        account_id: &str,
1329        invocation_type: InvocationType,
1330        qualifier: Option<&str>,
1331    ) -> Result<AwsResponse, AwsServiceError> {
1332        // Resolve qualifier (alias / numeric version / $LATEST) to a
1333        // concrete version string. Aliases with a
1334        // `RoutingConfig.AdditionalVersionWeights` map do a weighted
1335        // pick across `function_version` + the additional versions,
1336        // mirroring AWS canary routing. The resolved version is
1337        // surfaced via `x-amz-executed-version` so callers can verify
1338        // which version actually ran. We always invoke against the
1339        // live `$LATEST` function record for now — version-snapshot
1340        // execution is wired separately once
1341        // `function_version_snapshots` lands.
1342        let resolved_version: Option<String> = {
1343            let accounts = self.state.read();
1344            let empty = LambdaState::new(account_id, "");
1345            let state = accounts.get(account_id).unwrap_or(&empty);
1346            resolve_qualifier_to_version(state, function_name, qualifier)
1347        };
1348        let executed_version = resolved_version
1349            .clone()
1350            .unwrap_or_else(|| "$LATEST".to_string());
1351        let (func, layer_zips) = {
1352            let accounts = self.state.read();
1353            let empty = LambdaState::new(account_id, "");
1354            let state = accounts.get(account_id).unwrap_or(&empty);
1355            // Resolve numbered versions to the immutable snapshot stored
1356            // by PublishVersion so an alias pinned to v1 runs the v1 code
1357            // even after $LATEST is mutated. Falls back to $LATEST when
1358            // the snapshot is missing (legacy state) so we never 404 a
1359            // routable invoke.
1360            let func = match resolved_version.as_deref() {
1361                Some(v) => state
1362                    .function_version_snapshots
1363                    .get(function_name)
1364                    .and_then(|m| m.get(v))
1365                    .cloned()
1366                    .or_else(|| state.functions.get(function_name).cloned()),
1367                None => state.functions.get(function_name).cloned(),
1368            }
1369            .ok_or_else(|| {
1370                AwsServiceError::aws_error(
1371                    StatusCode::NOT_FOUND,
1372                    "ResourceNotFoundException",
1373                    format!(
1374                        "Function not found: arn:aws:lambda:{}:{}:function:{}",
1375                        state.region, state.account_id, function_name
1376                    ),
1377                )
1378            })?;
1379            // Resolve attached layer ARNs to ZIP bytes under the same read
1380            // lock. Layers may live in sibling accounts (cross-account
1381            // attach is legal in AWS); fall back to no bytes for unknown
1382            // ARNs and warn — invoke proceeds without that layer.
1383            let mut layer_zips: Vec<Vec<u8>> = Vec::with_capacity(func.layers.len());
1384            for attached in &func.layers {
1385                let bytes = crate::extras::parse_layer_version_arn(&attached.arn).and_then(
1386                    |(acct, name, ver)| {
1387                        accounts
1388                            .get(&acct)
1389                            .and_then(|s| s.layers.get(&name))
1390                            .and_then(|l| l.versions.iter().find(|v| v.version == ver))
1391                            .and_then(|v| v.code_zip.clone())
1392                    },
1393                );
1394                match bytes {
1395                    Some(b) => layer_zips.push(b),
1396                    None => tracing::warn!(
1397                        function = %function_name,
1398                        layer_arn = %attached.arn,
1399                        "attached layer not resolvable; skipping /opt mount for this layer"
1400                    ),
1401                }
1402            }
1403            (func, layer_zips)
1404        };
1405
1406        // Reserved-concurrency gate runs before code-zip / DryRun
1407        // checks: AWS returns 429 even for functions without a code
1408        // package as long as the cap is already hit, since throttling
1409        // is request-rate, not deployment-state.
1410        let concurrency_key = format!("{account_id}:{function_name}");
1411        let _concurrency_guard = {
1412            let cap = {
1413                let accounts = self.state.read();
1414                accounts
1415                    .get(account_id)
1416                    .and_then(|s| s.function_concurrency.get(function_name).copied())
1417            };
1418            let mut map = self.inflight_invocations.write();
1419            let current = map.get(&concurrency_key).copied().unwrap_or(0);
1420            if let Some(limit) = cap {
1421                if current >= limit {
1422                    // AWS returns the throttle Reason in the body so SDKs
1423                    // can branch on `ReservedFunctionConcurrentInvocationLimitExceeded`
1424                    // versus account-pool limits.
1425                    return Err(AwsServiceError::aws_error_with_fields(
1426                        StatusCode::TOO_MANY_REQUESTS,
1427                        "TooManyRequestsException",
1428                        "Rate Exceeded.",
1429                        vec![(
1430                            "Reason".to_string(),
1431                            "ReservedFunctionConcurrentInvocationLimitExceeded".to_string(),
1432                        )],
1433                    ));
1434                }
1435            }
1436            map.insert(concurrency_key.clone(), current + 1);
1437            ConcurrencyGuard {
1438                map: self.inflight_invocations.clone(),
1439                key: concurrency_key.clone(),
1440            }
1441        };
1442
1443        if func.code_zip.is_none() {
1444            return Err(AwsServiceError::aws_error(
1445                StatusCode::BAD_REQUEST,
1446                "InvalidParameterValueException",
1447                "Function has no deployment package",
1448            ));
1449        }
1450
1451        let invoke_start = std::time::Instant::now();
1452        let dry_run_response = if matches!(invocation_type, InvocationType::DryRun) {
1453            let mut resp = AwsResponse::json(StatusCode::NO_CONTENT, "");
1454            if let Ok(v) = http::header::HeaderValue::from_str(&executed_version) {
1455                resp.headers.insert(
1456                    http::header::HeaderName::from_static("x-amz-executed-version"),
1457                    v,
1458                );
1459            }
1460            Some(resp)
1461        } else {
1462            None
1463        };
1464
1465        let runtime_for_invoke = if dry_run_response.is_some() {
1466            None
1467        } else {
1468            self.runtime.clone()
1469        };
1470
1471        let result: Result<AwsResponse, AwsServiceError> = if let Some(resp) = dry_run_response {
1472            Ok(resp)
1473        } else if let Some(runtime) = runtime_for_invoke {
1474            match invocation_type {
1475                InvocationType::Event => {
1476                    // Fire-and-forget. AWS returns 202 with no body. Move
1477                    // the concurrency guard into the spawned task so the
1478                    // counter only drops once the async invocation
1479                    // actually finishes.
1480                    let runtime = runtime.clone();
1481                    let func_clone = func.clone();
1482                    let payload_vec = payload.to_vec();
1483                    let bus = self.delivery_bus.clone();
1484                    let destination_config = self.lookup_destination_config(&func, account_id);
1485                    let function_arn = func.function_arn.clone();
1486                    let layer_zips_async = layer_zips.clone();
1487                    let async_guard = _concurrency_guard;
1488                    tokio::spawn(async move {
1489                        let _g = async_guard;
1490                        let result = match runtime
1491                            .invoke(&func_clone, &payload_vec, &layer_zips_async)
1492                            .await
1493                        {
1494                            Ok(bytes) => {
1495                                // Lambda runtime returns 200 even on uncaught
1496                                // function errors; the body has errorMessage /
1497                                // errorType. Treat that as failure for routing.
1498                                let parsed: Option<serde_json::Value> =
1499                                    serde_json::from_slice(&bytes).ok();
1500                                let is_error = parsed
1501                                    .as_ref()
1502                                    .and_then(|v| v.as_object())
1503                                    .map(|m| {
1504                                        m.contains_key("errorMessage")
1505                                            || m.contains_key("errorType")
1506                                    })
1507                                    .unwrap_or(false);
1508                                if is_error {
1509                                    let msg = parsed
1510                                        .as_ref()
1511                                        .and_then(|v| v.get("errorMessage"))
1512                                        .and_then(|v| v.as_str())
1513                                        .unwrap_or("function error")
1514                                        .to_string();
1515                                    Err(msg)
1516                                } else {
1517                                    Ok(bytes)
1518                                }
1519                            }
1520                            Err(e) => Err(e.to_string()),
1521                        };
1522                        if let Some(bus) = bus {
1523                            route_to_destination(
1524                                bus,
1525                                &function_arn,
1526                                &payload_vec,
1527                                &result,
1528                                destination_config.as_ref(),
1529                            );
1530                        }
1531                    });
1532                    let mut resp = AwsResponse::json(StatusCode::ACCEPTED, "");
1533                    if let Ok(v) = http::header::HeaderValue::from_str(&executed_version) {
1534                        resp.headers.insert(
1535                            http::header::HeaderName::from_static("x-amz-executed-version"),
1536                            v,
1537                        );
1538                    }
1539                    Ok(resp)
1540                }
1541                InvocationType::RequestResponse | InvocationType::DryRun => {
1542                    match runtime.invoke(&func, payload, &layer_zips).await {
1543                        Ok(response_bytes) => {
1544                            let mut resp = AwsResponse::json(StatusCode::OK, response_bytes);
1545                            if let Ok(v) = http::header::HeaderValue::from_str(&executed_version) {
1546                                resp.headers.insert(
1547                                    http::header::HeaderName::from_static("x-amz-executed-version"),
1548                                    v,
1549                                );
1550                            }
1551                            Ok(resp)
1552                        }
1553                        Err(e) => {
1554                            tracing::error!(function = %function_name, error = %e, "Lambda invocation failed");
1555                            Err(AwsServiceError::aws_error(
1556                                StatusCode::INTERNAL_SERVER_ERROR,
1557                                "ServiceException",
1558                                format!("Lambda execution failed: {e}"),
1559                            ))
1560                        }
1561                    }
1562                }
1563            }
1564        } else {
1565            Err(AwsServiceError::aws_error(
1566                StatusCode::INTERNAL_SERVER_ERROR,
1567                "ServiceException",
1568                "Docker/Podman is required for Lambda execution but is not available",
1569            ))
1570        };
1571
1572        // Publish standard AWS/Lambda metrics: Invocations always, Errors
1573        // when the runtime returned a failure, Duration in milliseconds.
1574        // Mirrors what real Lambda emits and lets fakecloud users wire
1575        // CloudWatch alarms on Lambda errors / latency in tests.
1576        if let Some(bus) = &self.delivery_bus {
1577            let dims: std::collections::BTreeMap<String, String> =
1578                [("FunctionName".to_string(), function_name.to_string())]
1579                    .into_iter()
1580                    .collect();
1581            let now_ms = chrono::Utc::now().timestamp_millis();
1582            let region = {
1583                let accounts = self.state.read();
1584                let empty = LambdaState::new(account_id, "");
1585                accounts
1586                    .get(account_id)
1587                    .map(|s| s.region.clone())
1588                    .unwrap_or_else(|| empty.region)
1589            };
1590            bus.put_cloudwatch_metric(
1591                account_id,
1592                &region,
1593                "AWS/Lambda",
1594                "Invocations",
1595                1.0,
1596                Some("Count"),
1597                dims.clone(),
1598                now_ms,
1599            );
1600            bus.put_cloudwatch_metric(
1601                account_id,
1602                &region,
1603                "AWS/Lambda",
1604                "Duration",
1605                invoke_start.elapsed().as_millis() as f64,
1606                Some("Milliseconds"),
1607                dims.clone(),
1608                now_ms,
1609            );
1610            if result.is_err() {
1611                bus.put_cloudwatch_metric(
1612                    account_id,
1613                    &region,
1614                    "AWS/Lambda",
1615                    "Errors",
1616                    1.0,
1617                    Some("Count"),
1618                    dims,
1619                    now_ms,
1620                );
1621            }
1622        }
1623
1624        result
1625    }
1626
1627    /// Pull EventInvokeConfig.DestinationConfig for the function. The
1628    /// stored key is `<function_name>:<qualifier>`; treat unqualified
1629    /// invokes as the empty qualifier (matches `parse_qualifier` in
1630    /// `extras.rs` when no `Qualifier` is supplied).
1631    fn lookup_destination_config(
1632        &self,
1633        func: &crate::state::LambdaFunction,
1634        account_id: &str,
1635    ) -> Option<serde_json::Value> {
1636        let accounts = self.state.read();
1637        let state = accounts.get(account_id)?;
1638        let key = format!("{}:$LATEST", func.function_name);
1639        state
1640            .event_invoke_configs
1641            .get(&key)
1642            .map(|cfg| cfg.destination_config.clone())
1643            .filter(|v| !v.is_null() && !v.as_object().map(|o| o.is_empty()).unwrap_or(false))
1644    }
1645
1646    pub(crate) fn publish_version(
1647        &self,
1648        function_name: &str,
1649        account_id: &str,
1650        req: &AwsRequest,
1651    ) -> Result<AwsResponse, AwsServiceError> {
1652        // Optional preconditions from the body. Both compare the supplied
1653        // value against the live `$LATEST` state; mismatch yields 412
1654        // PreconditionFailedException, matching AWS optimistic-concurrency.
1655        let body: Value = serde_json::from_slice(&req.body).unwrap_or_default();
1656        let supplied_revision = body["RevisionId"].as_str().map(String::from);
1657        let supplied_sha = body["CodeSha256"].as_str().map(String::from);
1658        let description_override = body["Description"].as_str().map(String::from);
1659
1660        let mut accounts = self.state.write();
1661        let state = accounts.get_or_create(account_id);
1662        let func = state.functions.get(function_name).ok_or_else(|| {
1663            AwsServiceError::aws_error(
1664                StatusCode::NOT_FOUND,
1665                "ResourceNotFoundException",
1666                format!(
1667                    "Function not found: arn:aws:lambda:{}:{}:function:{}",
1668                    state.region, state.account_id, function_name
1669                ),
1670            )
1671        })?;
1672
1673        if let Some(ref rev) = supplied_revision {
1674            if rev != &func.revision_id {
1675                return Err(AwsServiceError::aws_error(
1676                    StatusCode::PRECONDITION_FAILED,
1677                    "PreconditionFailedException",
1678                    "The RevisionId provided does not match the latest RevisionId for the Lambda function. Call the GetFunction or the GetAlias API to retrieve the latest RevisionId for your resource.",
1679                ));
1680            }
1681        }
1682        if let Some(ref sha) = supplied_sha {
1683            if sha != &func.code_sha256 {
1684                return Err(AwsServiceError::aws_error(
1685                    StatusCode::PRECONDITION_FAILED,
1686                    "PreconditionFailedException",
1687                    "CodeSha256 does not match the SHA-256 of the function's deployment package.",
1688                ));
1689            }
1690        }
1691
1692        // Pick the next version number per function, monotonic per
1693        // function arn, never reused. AWS uses sequential decimal
1694        // strings starting at 1.
1695        let existing = state
1696            .function_versions
1697            .get(function_name)
1698            .cloned()
1699            .unwrap_or_default();
1700        let latest_version = existing.iter().filter_map(|v| v.parse::<u64>().ok()).max();
1701
1702        // PublishVersion is idempotent on AWS: if `$LATEST` hasn't
1703        // changed since the most recent published version, return that
1704        // existing snapshot instead of bumping the counter. We compare
1705        // every field that PublishVersion would otherwise carry into
1706        // the new snapshot — code identity, description, runtime,
1707        // handler, role, env, memory/timeout, layers, image config,
1708        // VPC, EFS, logging, tracing, kms, ephemeral storage — so a
1709        // config-only change (e.g. UpdateFunctionConfiguration bumping
1710        // memory) still produces a fresh version. This keeps deploy
1711        // pipelines that re-publish on every CI run from leaking a
1712        // fresh numbered version per build when the underlying
1713        // artifact + config are identical.
1714        if let Some(latest_num) = latest_version {
1715            let latest_str = latest_num.to_string();
1716            if let Some(prev_snap) = state
1717                .function_version_snapshots
1718                .get(function_name)
1719                .and_then(|m| m.get(&latest_str))
1720                .cloned()
1721            {
1722                let effective_desc = description_override
1723                    .clone()
1724                    .unwrap_or_else(|| func.description.clone());
1725                if function_config_unchanged_for_publish(&prev_snap, func, &effective_desc) {
1726                    let mut config = self.function_config_json(&prev_snap);
1727                    config["Version"] = json!(latest_str);
1728                    config["FunctionArn"] = json!(format!("{}:{latest_str}", func.function_arn));
1729                    config["MasterArn"] = json!(func.function_arn);
1730                    return Ok(AwsResponse::json(StatusCode::CREATED, config.to_string()));
1731                }
1732            }
1733        }
1734
1735        let next: u64 = latest_version.unwrap_or(0) + 1;
1736        let next_str = next.to_string();
1737
1738        // Snapshot the function config + code for the new immutable version.
1739        let mut snapshot = func.clone();
1740        snapshot.version = next_str.clone();
1741        snapshot.master_arn = Some(func.function_arn.clone());
1742        if let Some(desc) = description_override {
1743            snapshot.description = desc;
1744        }
1745        // Each numbered version gets its own RevisionId, decoupled from $LATEST.
1746        snapshot.revision_id = uuid::Uuid::new_v4().to_string();
1747
1748        // SnapStart optimization completes asynchronously on real Lambda
1749        // when ApplyOn=PublishedVersions. fakecloud has no actual
1750        // optimization step, so we flip OptimizationStatus to "On"
1751        // eagerly on the published-version snapshot so clients that
1752        // wait on this transition see the steady state immediately.
1753        if let Some(snap) = snapshot.snap_start.as_mut() {
1754            if snap.get("ApplyOn").and_then(|v| v.as_str()) == Some("PublishedVersions") {
1755                snap["OptimizationStatus"] = json!("On");
1756            }
1757        }
1758
1759        // Append to numbered list and store the snapshot.
1760        state
1761            .function_versions
1762            .entry(function_name.to_string())
1763            .or_default()
1764            .push(next_str.clone());
1765        state
1766            .function_version_snapshots
1767            .entry(function_name.to_string())
1768            .or_default()
1769            .insert(next_str.clone(), snapshot.clone());
1770
1771        let mut config = self.function_config_json(&snapshot);
1772        config["Version"] = json!(next_str);
1773        config["FunctionArn"] = json!(format!("{}:{next_str}", func.function_arn));
1774        config["MasterArn"] = json!(func.function_arn);
1775
1776        Ok(AwsResponse::json(StatusCode::CREATED, config.to_string()))
1777    }
1778
1779    pub(crate) fn function_config_json(&self, func: &LambdaFunction) -> Value {
1780        // AWS always emits Environment with at least an empty Variables map.
1781        let env_vars = if func.environment.is_empty() {
1782            json!({ "Variables": {} })
1783        } else {
1784            json!({ "Variables": func.environment })
1785        };
1786
1787        let tracing_mode = func.tracing_mode.as_deref().unwrap_or("PassThrough");
1788        let ephemeral_size = func.ephemeral_storage_size.unwrap_or(512);
1789
1790        let mut config = json!({
1791            "FunctionName": func.function_name,
1792            "FunctionArn": func.function_arn,
1793            "Runtime": func.runtime,
1794            "Role": func.role,
1795            "Handler": func.handler,
1796            "Description": func.description,
1797            "Timeout": func.timeout,
1798            "MemorySize": func.memory_size,
1799            "CodeSha256": func.code_sha256,
1800            "CodeSize": func.code_size,
1801            "Version": func.version,
1802            "LastModified": func.last_modified.format("%Y-%m-%dT%H:%M:%S%.3f+0000").to_string(),
1803            "PackageType": func.package_type,
1804            "Architectures": func.architectures,
1805            "Environment": env_vars,
1806            "State": "Active",
1807            "LastUpdateStatus": "Successful",
1808            "TracingConfig": { "Mode": tracing_mode },
1809            "RevisionId": func.revision_id,
1810            "EphemeralStorage": { "Size": ephemeral_size },
1811            "SnapStart": func.snap_start.clone().unwrap_or_else(|| json!({
1812                "ApplyOn": "None",
1813                "OptimizationStatus": "Off",
1814            })),
1815        });
1816        if let Some(ref kms) = func.kms_key_arn {
1817            config["KMSKeyArn"] = json!(kms);
1818        }
1819        if let Some(ref vpc) = func.vpc_config {
1820            config["VpcConfig"] = vpc.clone();
1821        }
1822        if let Some(ref dlq) = func.dead_letter_config_arn {
1823            config["DeadLetterConfig"] = json!({ "TargetArn": dlq });
1824        }
1825        if !func.file_system_configs.is_empty() {
1826            config["FileSystemConfigs"] = json!(func.file_system_configs);
1827        }
1828        if let Some(ref lg) = func.logging_config {
1829            config["LoggingConfig"] = lg.clone();
1830        }
1831        if let Some(ref ic) = func.image_config {
1832            config["ImageConfigResponse"] = json!({ "ImageConfig": ic });
1833        }
1834        if let Some(ref s) = func.signing_profile_version_arn {
1835            config["SigningProfileVersionArn"] = json!(s);
1836        }
1837        if let Some(ref s) = func.signing_job_arn {
1838            config["SigningJobArn"] = json!(s);
1839        }
1840        if let Some(ref rv) = func.runtime_version_config {
1841            config["RuntimeVersionConfig"] = rv.clone();
1842        }
1843        if let Some(ref m) = func.master_arn {
1844            config["MasterArn"] = json!(m);
1845        }
1846        if let Some(ref uri) = func.image_uri {
1847            config["Code"] = json!({
1848                "ImageUri": uri,
1849                "ResolvedImageUri": uri,
1850            });
1851        }
1852        if !func.layers.is_empty() {
1853            config["Layers"] = json!(func
1854                .layers
1855                .iter()
1856                .map(|l| json!({"Arn": l.arn, "CodeSize": l.code_size}))
1857                .collect::<Vec<_>>());
1858        }
1859        if let Some(ref r) = func.state_reason {
1860            config["StateReason"] = json!(r);
1861        }
1862        if let Some(ref c) = func.state_reason_code {
1863            config["StateReasonCode"] = json!(c);
1864        }
1865        if let Some(ref r) = func.last_update_status_reason {
1866            config["LastUpdateStatusReason"] = json!(r);
1867        }
1868        if let Some(ref c) = func.last_update_status_reason_code {
1869            config["LastUpdateStatusReasonCode"] = json!(c);
1870        }
1871        config
1872    }
1873}
1874
1875#[async_trait]
1876impl AwsService for LambdaService {
1877    fn service_name(&self) -> &str {
1878        "lambda"
1879    }
1880
1881    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1882        let (action, resource_name) = Self::resolve_action(&req).ok_or_else(|| {
1883            AwsServiceError::aws_error(
1884                StatusCode::NOT_FOUND,
1885                "UnknownOperationException",
1886                format!("Unknown operation: {} {}", req.method, req.raw_path),
1887            )
1888        })?;
1889
1890        // Normalize FunctionName-bearing resource slots: AWS Lambda accepts
1891        // bare name, name:qualifier, partial ARN, and full ARN in any URL
1892        // slot that names a function. Layer / event-source-mapping resource
1893        // names go through different routes and are left as-is.
1894        let resource_name = if action_takes_function_name(action) {
1895            resource_name.map(|s| normalize_function_name(&s))
1896        } else {
1897            resource_name
1898        };
1899
1900        let mutates = matches!(
1901            action,
1902            "CreateFunction"
1903                | "DeleteFunction"
1904                | "PublishVersion"
1905                | "AddPermission"
1906                | "RemovePermission"
1907                | "CreateEventSourceMapping"
1908                | "DeleteEventSourceMapping"
1909                | "UpdateEventSourceMapping"
1910                | "UpdateFunctionCode"
1911                | "UpdateFunctionConfiguration"
1912                | "CreateAlias"
1913                | "DeleteAlias"
1914                | "UpdateAlias"
1915                | "PublishLayerVersion"
1916                | "DeleteLayerVersion"
1917                | "AddLayerVersionPermission"
1918                | "RemoveLayerVersionPermission"
1919                | "CreateFunctionUrlConfig"
1920                | "DeleteFunctionUrlConfig"
1921                | "UpdateFunctionUrlConfig"
1922                | "PutFunctionConcurrency"
1923                | "DeleteFunctionConcurrency"
1924                | "PutProvisionedConcurrencyConfig"
1925                | "DeleteProvisionedConcurrencyConfig"
1926                | "CreateCodeSigningConfig"
1927                | "UpdateCodeSigningConfig"
1928                | "DeleteCodeSigningConfig"
1929                | "PutFunctionCodeSigningConfig"
1930                | "DeleteFunctionCodeSigningConfig"
1931                | "PutFunctionEventInvokeConfig"
1932                | "UpdateFunctionEventInvokeConfig"
1933                | "DeleteFunctionEventInvokeConfig"
1934                | "PutRuntimeManagementConfig"
1935                | "PutFunctionScalingConfig"
1936                | "PutFunctionRecursionConfig"
1937                | "TagResource"
1938                | "UntagResource"
1939                | "InvokeAsync"
1940                | "InvokeWithResponseStream"
1941        );
1942
1943        let aid = &req.account_id;
1944        let result = match action {
1945            "CreateFunction" => self.create_function(&req),
1946            "ListFunctions" => self.list_functions(aid),
1947            "GetFunction" => self.get_function(
1948                resource_name.as_deref().unwrap_or(""),
1949                aid,
1950                req.region.as_str(),
1951                req.query_params.get("Qualifier").map(String::as_str),
1952            ),
1953            "DeleteFunction" => self.delete_function(
1954                resource_name.as_deref().unwrap_or(""),
1955                aid,
1956                req.query_params.get("Qualifier").map(String::as_str),
1957            ),
1958            "Invoke" => {
1959                let invocation_type = InvocationType::from_header(
1960                    req.headers
1961                        .get("x-amz-invocation-type")
1962                        .and_then(|v| v.to_str().ok()),
1963                );
1964                let qualifier = req.query_params.get("Qualifier").map(String::as_str);
1965                self.invoke(
1966                    resource_name.as_deref().unwrap_or(""),
1967                    &req.body,
1968                    aid,
1969                    invocation_type,
1970                    qualifier,
1971                )
1972                .await
1973            }
1974            "InvokeAsync" => {
1975                self.invoke(
1976                    resource_name.as_deref().unwrap_or(""),
1977                    &req.body,
1978                    aid,
1979                    InvocationType::Event,
1980                    None,
1981                )
1982                .await
1983            }
1984            "PublishVersion" => {
1985                self.publish_version(resource_name.as_deref().unwrap_or(""), aid, &req)
1986            }
1987            "AddPermission" => self.add_permission(resource_name.as_deref().unwrap_or(""), &req),
1988            "GetPolicy" => self.get_policy(
1989                resource_name.as_deref().unwrap_or(""),
1990                aid,
1991                req.query_params.get("Qualifier").map(String::as_str),
1992            ),
1993            "RemovePermission" => {
1994                // Path: /2015-03-31/functions/{name}/policy/{sid}
1995                let sid = req.path_segments.get(4).cloned().unwrap_or_default();
1996                self.remove_permission(
1997                    resource_name.as_deref().unwrap_or(""),
1998                    &sid,
1999                    aid,
2000                    req.query_params.get("Qualifier").map(String::as_str),
2001                )
2002            }
2003            "CreateEventSourceMapping" => self.create_event_source_mapping(&req),
2004            "ListEventSourceMappings" => self.list_event_source_mappings(aid),
2005            "GetEventSourceMapping" => {
2006                self.get_event_source_mapping(resource_name.as_deref().unwrap_or(""), aid)
2007            }
2008            "DeleteEventSourceMapping" => {
2009                self.delete_event_source_mapping(resource_name.as_deref().unwrap_or(""), aid)
2010            }
2011            other => {
2012                self.handle_extra(other, resource_name.as_deref(), &req)
2013                    .await
2014            }
2015        };
2016        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
2017            self.save_snapshot().await;
2018        }
2019        result
2020    }
2021
2022    fn supported_actions(&self) -> &[&str] {
2023        &[
2024            "CreateFunction",
2025            "GetFunction",
2026            "DeleteFunction",
2027            "ListFunctions",
2028            "Invoke",
2029            "InvokeAsync",
2030            "InvokeWithResponseStream",
2031            "PublishVersion",
2032            "ListVersionsByFunction",
2033            "AddPermission",
2034            "RemovePermission",
2035            "GetPolicy",
2036            "CreateEventSourceMapping",
2037            "ListEventSourceMappings",
2038            "GetEventSourceMapping",
2039            "UpdateEventSourceMapping",
2040            "DeleteEventSourceMapping",
2041            "GetFunctionConfiguration",
2042            "UpdateFunctionConfiguration",
2043            "UpdateFunctionCode",
2044            "GetAccountSettings",
2045            "CreateAlias",
2046            "GetAlias",
2047            "ListAliases",
2048            "UpdateAlias",
2049            "DeleteAlias",
2050            "PublishLayerVersion",
2051            "GetLayerVersion",
2052            "GetLayerVersionByArn",
2053            "DeleteLayerVersion",
2054            "ListLayerVersions",
2055            "ListLayers",
2056            "GetLayerVersionPolicy",
2057            "AddLayerVersionPermission",
2058            "RemoveLayerVersionPermission",
2059            "CreateFunctionUrlConfig",
2060            "GetFunctionUrlConfig",
2061            "UpdateFunctionUrlConfig",
2062            "DeleteFunctionUrlConfig",
2063            "ListFunctionUrlConfigs",
2064            "PutFunctionConcurrency",
2065            "GetFunctionConcurrency",
2066            "DeleteFunctionConcurrency",
2067            "PutProvisionedConcurrencyConfig",
2068            "GetProvisionedConcurrencyConfig",
2069            "DeleteProvisionedConcurrencyConfig",
2070            "ListProvisionedConcurrencyConfigs",
2071            "CreateCodeSigningConfig",
2072            "GetCodeSigningConfig",
2073            "UpdateCodeSigningConfig",
2074            "DeleteCodeSigningConfig",
2075            "ListCodeSigningConfigs",
2076            "PutFunctionCodeSigningConfig",
2077            "GetFunctionCodeSigningConfig",
2078            "DeleteFunctionCodeSigningConfig",
2079            "ListFunctionsByCodeSigningConfig",
2080            "PutFunctionEventInvokeConfig",
2081            "GetFunctionEventInvokeConfig",
2082            "UpdateFunctionEventInvokeConfig",
2083            "DeleteFunctionEventInvokeConfig",
2084            "ListFunctionEventInvokeConfigs",
2085            "PutRuntimeManagementConfig",
2086            "GetRuntimeManagementConfig",
2087            "PutFunctionScalingConfig",
2088            "GetFunctionScalingConfig",
2089            "PutFunctionRecursionConfig",
2090            "GetFunctionRecursionConfig",
2091            "TagResource",
2092            "UntagResource",
2093            "ListTags",
2094        ]
2095    }
2096
2097    fn iam_enforceable(&self) -> bool {
2098        true
2099    }
2100
2101    /// Lambda resources are function ARNs. Function-scoped ops
2102    /// resolve the target ARN from the path; list ops target `*`
2103    /// (the whole service), matching how AWS models them.
2104    fn iam_action_for(&self, request: &AwsRequest) -> Option<fakecloud_core::auth::IamAction> {
2105        // REST-JSON services don't have `request.action` populated at
2106        // dispatch time — it's derived from method+path inside
2107        // `handle()`. Reuse the same resolver so the two can never
2108        // drift.
2109        let (action_str, resource_name) = Self::resolve_action(request)?;
2110        let action: &'static str = match action_str {
2111            "CreateFunction" => "CreateFunction",
2112            "ListFunctions" => "ListFunctions",
2113            "GetFunction" => "GetFunction",
2114            "DeleteFunction" => "DeleteFunction",
2115            "Invoke" => "InvokeFunction",
2116            "InvokeWithResponseStream" => "InvokeFunctionWithResponseStream",
2117            "PublishVersion" => "PublishVersion",
2118            "AddPermission" => "AddPermission",
2119            "RemovePermission" => "RemovePermission",
2120            "GetPolicy" => "GetPolicy",
2121            "CreateEventSourceMapping" => "CreateEventSourceMapping",
2122            "ListEventSourceMappings" => "ListEventSourceMappings",
2123            "GetEventSourceMapping" => "GetEventSourceMapping",
2124            "DeleteEventSourceMapping" => "DeleteEventSourceMapping",
2125            _ => return None,
2126        };
2127        let accounts = self.state.read();
2128        let empty = LambdaState::new(&request.account_id, &request.region);
2129        let state = accounts.get(&request.account_id).unwrap_or(&empty);
2130        let resource = match action {
2131            "GetFunction"
2132            | "DeleteFunction"
2133            | "InvokeFunction"
2134            | "InvokeFunctionWithResponseStream"
2135            | "PublishVersion"
2136            | "AddPermission"
2137            | "RemovePermission"
2138            | "GetPolicy" => {
2139                let name = resource_name.unwrap_or_default();
2140                if name.is_empty() {
2141                    "*".to_string()
2142                } else {
2143                    format!(
2144                        "arn:aws:lambda:{}:{}:function:{}",
2145                        state.region, state.account_id, name
2146                    )
2147                }
2148            }
2149            "CreateFunction" => {
2150                // Best-effort: parse the FunctionName from the body so
2151                // CreateFunction can be resource-scoped against the
2152                // to-be-created ARN. Falls back to `*` when the body
2153                // isn't JSON yet (e.g. soft-mode observability).
2154                serde_json::from_slice::<Value>(&request.body)
2155                    .ok()
2156                    .and_then(|v| {
2157                        v.get("FunctionName").and_then(|f| f.as_str()).map(|n| {
2158                            format!(
2159                                "arn:aws:lambda:{}:{}:function:{}",
2160                                state.region, state.account_id, n
2161                            )
2162                        })
2163                    })
2164                    .unwrap_or_else(|| "*".to_string())
2165            }
2166            _ => "*".to_string(),
2167        };
2168        Some(fakecloud_core::auth::IamAction {
2169            service: "lambda",
2170            action,
2171            resource,
2172        })
2173    }
2174
2175    fn iam_condition_keys_for(
2176        &self,
2177        request: &AwsRequest,
2178        action: &fakecloud_core::auth::IamAction,
2179    ) -> std::collections::BTreeMap<String, Vec<String>> {
2180        let mut out = std::collections::BTreeMap::new();
2181        if action.action == "AddPermission" {
2182            if action.resource != "*" {
2183                out.insert(
2184                    "lambda:functionarn".to_string(),
2185                    vec![action.resource.clone()],
2186                );
2187            }
2188            if let Ok(body) = serde_json::from_slice::<Value>(&request.body) {
2189                if let Some(principal) = body.get("Principal").and_then(|p| p.as_str()) {
2190                    out.insert("lambda:principal".to_string(), vec![principal.to_string()]);
2191                }
2192            }
2193        }
2194        out
2195    }
2196}
2197
2198#[path = "service_event_sources.rs"]
2199mod service_event_sources;
2200#[path = "service_permissions.rs"]
2201mod service_permissions;
2202
2203#[cfg(test)]
2204#[path = "service_tests.rs"]
2205mod tests;