Skip to main content

fakecloud_lambda/
extras.rs

1//! Lambda handlers added to close the conformance gap. Aliases, layers,
2//! function URL configs, concurrency, code signing, event invoke, runtime
3//! management, scaling, recursion, tagging, and account settings.
4
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use sha2::{Digest, Sha256};
9
10use fakecloud_aws::arn::Arn;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
12
13use crate::service::LambdaService;
14use crate::state::{
15    AccountSettings, AttachedLayer, CodeSigningConfig, EventInvokeConfig, FunctionAlias,
16    FunctionScalingConfig, FunctionUrlConfig, LambdaState, Layer, LayerVersion,
17    ProvisionedConcurrencyConfig, RuntimeManagementConfig,
18};
19
20/// Resolve a layer-version ARN to its current `CodeSize` from the
21/// multi-account state. Returns 0 when the ARN is unparseable, when the
22/// referenced account/layer/version is unknown, or when the version was
23/// published without ZIP content (legacy snapshots).
24pub(crate) fn resolve_layer_attachments(
25    accounts: &fakecloud_core::multi_account::MultiAccountState<LambdaState>,
26    arns: Vec<String>,
27) -> Vec<AttachedLayer> {
28    arns.into_iter()
29        .map(|arn| {
30            let code_size = parse_layer_version_arn(&arn)
31                .and_then(|(acct, name, ver)| {
32                    accounts
33                        .get(&acct)
34                        .and_then(|s| s.layers.get(&name))
35                        .and_then(|l| l.versions.iter().find(|v| v.version == ver))
36                        .map(|v| v.code_size)
37                })
38                .unwrap_or(0);
39            AttachedLayer { arn, code_size }
40        })
41        .collect()
42}
43
44fn missing(name: &str) -> AwsServiceError {
45    AwsServiceError::aws_error(
46        StatusCode::BAD_REQUEST,
47        "InvalidParameterValueException",
48        format!("Missing required field: {name}"),
49    )
50}
51
52fn not_found(entity: &str, name: &str) -> AwsServiceError {
53    AwsServiceError::aws_error(
54        StatusCode::NOT_FOUND,
55        "ResourceNotFoundException",
56        format!("{entity} not found: {name}"),
57    )
58}
59
60fn ok(body: Value) -> Result<AwsResponse, AwsServiceError> {
61    Ok(AwsResponse::json(StatusCode::OK, body.to_string()))
62}
63
64fn empty() -> Result<AwsResponse, AwsServiceError> {
65    Ok(AwsResponse::json(StatusCode::OK, "{}".to_string()))
66}
67
68fn body(req: &AwsRequest) -> Value {
69    serde_json::from_slice(&req.body).unwrap_or_else(|_| Value::Object(Default::default()))
70}
71
/// Pull the bare function name out of an `arn:aws:lambda:` function ARN.
///
/// Any `:version` / `:alias` suffix after the name is dropped. Returns
/// `None` when the string does not start with `arn:aws:lambda:`, when the
/// resource kind is anything other than `function` (layer, event-source
/// mapping, code-signing config, …), or when the ARN ends before the name
/// segment.
fn function_name_from_arn(arn: &str) -> Option<String> {
    let tail = arn.strip_prefix("arn:aws:lambda:")?;
    // region : account : kind : name [: qualifier…]
    let mut fields = tail.splitn(5, ':');
    let (_region, _account) = (fields.next()?, fields.next()?);
    match fields.next()? {
        "function" => {}
        _ => return None,
    }
    let qualified = fields.next()?;
    let bare = qualified
        .split_once(':')
        .map_or(qualified, |(head, _qualifier)| head);
    Some(bare.to_owned())
}
96
97/// Parse a raw query string into key/value pairs preserving repeats.
98/// `req.query_params` is a `HashMap<String, String>` and so collapses
99/// `tagKeys=A&tagKeys=B` to a single entry; this lets the
100/// `UntagResource` handler see every value the caller actually sent.
101/// Percent-decodes both key and value with the same lossy fallback the
102/// rest of the dispatch path uses.
103fn parse_query_pairs(raw_query: &str) -> Vec<(String, String)> {
104    raw_query
105        .split('&')
106        .filter(|s| !s.is_empty())
107        .map(|pair| {
108            let mut it = pair.splitn(2, '=');
109            let k = it.next().unwrap_or("");
110            let v = it.next().unwrap_or("");
111            (decode_query_segment(k), decode_query_segment(v))
112        })
113        .collect()
114}
115
116fn decode_query_segment(s: &str) -> String {
117    // Replace `+` with space to match `application/x-www-form-urlencoded`,
118    // then percent-decode. SDKs hit both shapes for path/query data.
119    let plus_decoded = s.replace('+', " ");
120    percent_encoding::percent_decode_str(&plus_decoded)
121        .decode_utf8_lossy()
122        .into_owned()
123}
124
125/// Build a fakecloud-hosted download URL for a layer version's ZIP. The URL
126/// is reachable on the same authority the SDK used for the original
127/// request, so test harnesses get a working `Location` they can `GET`
128/// directly instead of the placeholder AWS clients otherwise see.
129fn layer_content_url(req: &AwsRequest, account_id: &str, layer_name: &str, version: i64) -> String {
130    let host = req
131        .headers
132        .get(http::header::HOST)
133        .and_then(|h| h.to_str().ok())
134        .unwrap_or("localhost");
135    let scheme = req
136        .headers
137        .get("x-forwarded-proto")
138        .and_then(|h| h.to_str().ok())
139        .unwrap_or("http");
140    format!(
141        "{scheme}://{host}/_fakecloud/lambda/layer-content/{account_id}/{layer_name}/{version}.zip"
142    )
143}
144
145/// Build a fakecloud-hosted download URL for a function version's ZIP. AWS
146/// Toolkit (and `aws lambda get-function --query 'Code.Location'`) expects
147/// this to resolve to an actual ZIP body, so the URL points back at the
148/// running fakecloud instance on the same authority the SDK used.
149pub(crate) fn function_code_url(
150    req: &AwsRequest,
151    account_id: &str,
152    function_name: &str,
153    version_label: &str,
154) -> String {
155    let host = req
156        .headers
157        .get(http::header::HOST)
158        .and_then(|h| h.to_str().ok())
159        .unwrap_or("localhost");
160    let scheme = req
161        .headers
162        .get("x-forwarded-proto")
163        .and_then(|h| h.to_str().ok())
164        .unwrap_or("http");
165    let file = if version_label == "$LATEST" {
166        "latest.zip".to_string()
167    } else {
168        format!("{version_label}.zip")
169    };
170    format!("{scheme}://{host}/_fakecloud/lambda/function-code/{account_id}/{function_name}/{file}")
171}
172
/// Parse a layer-version ARN of the form
/// `arn:<partition>:lambda:<region>:<account>:layer:<name>:<version>`.
///
/// Returns `(account_id, layer_name, version)`. The ARN must have exactly
/// eight colon-separated segments with `arn`, `lambda`, and `layer` in
/// their fixed positions, and the last segment must parse as an `i64`;
/// anything else yields `None`. The partition and region are not checked.
pub fn parse_layer_version_arn(arn: &str) -> Option<(String, String, i64)> {
    let segments: Vec<&str> = arn.split(':').collect();
    match segments.as_slice() {
        ["arn", _partition, "lambda", _region, account, "layer", name, version] => {
            let version = version.parse::<i64>().ok()?;
            Some((account.to_string(), name.to_string(), version))
        }
        _ => None,
    }
}
186
/// Runtime identifiers accepted for the `CompatibleRuntime` query filter,
/// mirroring the members of `com.amazonaws.lambda#Runtime`. Keeping the
/// list here lets the layer-listing operations validate the filter without
/// every handler carrying its own copy of the enum.
///
/// Entries are grouped by runtime family; membership is all that matters
/// to callers.
#[rustfmt::skip]
const LAMBDA_RUNTIMES: &[&str] = &[
    // Node.js
    "nodejs", "nodejs4.3", "nodejs6.10", "nodejs8.10", "nodejs10.x", "nodejs12.x",
    "nodejs14.x", "nodejs16.x", "nodejs18.x", "nodejs20.x", "nodejs22.x", "nodejs24.x",
    "nodejs4.3-edge",
    // Java
    "java8", "java8.al2", "java11", "java17", "java21", "java25",
    // Python
    "python2.7", "python3.6", "python3.7", "python3.8", "python3.9",
    "python3.10", "python3.11", "python3.12", "python3.13", "python3.14",
    // .NET
    "dotnetcore1.0", "dotnetcore2.0", "dotnetcore2.1", "dotnetcore3.1",
    "dotnet6", "dotnet8", "dotnet10",
    // Go
    "go1.x",
    // Ruby
    "ruby2.5", "ruby2.7", "ruby3.2", "ruby3.3", "ruby3.4",
    // Custom runtimes
    "provided", "provided.al2", "provided.al2023",
];
237
238/// Validate the `CompatibleArchitecture` and `CompatibleRuntime` query
239/// filters shared by `ListLayers` and `ListLayerVersions`.
240fn validate_layer_filters(req: &AwsRequest) -> Result<(), AwsServiceError> {
241    if let Some(arch) = req.query_params.get("CompatibleArchitecture") {
242        if arch != "x86_64" && arch != "arm64" {
243            return Err(AwsServiceError::aws_error(
244                StatusCode::BAD_REQUEST,
245                "InvalidParameterValueException",
246                format!(
247                    "Invalid CompatibleArchitecture value '{}'; expected 'x86_64' or 'arm64'",
248                    arch
249                ),
250            ));
251        }
252    }
253    if let Some(rt) = req.query_params.get("CompatibleRuntime") {
254        if !LAMBDA_RUNTIMES.contains(&rt.as_str()) {
255            return Err(AwsServiceError::aws_error(
256                StatusCode::BAD_REQUEST,
257                "InvalidParameterValueException",
258                format!("Invalid CompatibleRuntime value '{}'", rt),
259            ));
260        }
261    }
262    Ok(())
263}
264
265fn parse_qualifier(req: &AwsRequest) -> String {
266    req.query_params
267        .get("Qualifier")
268        .cloned()
269        .unwrap_or_else(|| "$LATEST".to_string())
270}
271
272/// Strict variant for operations whose Smithy model marks `Qualifier`
273/// `@required` (provisioned-concurrency, scaling-config). Returns
274/// `InvalidParameterValueException` when the query parameter is
275/// missing, matching AWS's wire response.
276fn require_qualifier(req: &AwsRequest) -> Result<String, AwsServiceError> {
277    req.query_params.get("Qualifier").cloned().ok_or_else(|| {
278        AwsServiceError::aws_error(
279            StatusCode::BAD_REQUEST,
280            "InvalidParameterValueException",
281            "Qualifier is required for this operation",
282        )
283    })
284}
285
/// Produce an identifier made of `prefix` followed by the current wall-clock
/// time in nanoseconds since the Unix epoch. If the system clock reports a
/// time before the epoch, the numeric part is `0`.
fn id_from_time(prefix: &str) -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    format!("{prefix}{nanos}")
}
296
297impl LambdaService {
298    pub(crate) async fn handle_extra(
299        &self,
300        action: &str,
301        resource: Option<&str>,
302        req: &AwsRequest,
303    ) -> Result<AwsResponse, AwsServiceError> {
304        let aid = req.account_id.as_str();
305        let res = resource.unwrap_or("");
306        match action {
307            // Function lifecycle extras
308            "GetFunctionConfiguration" => self.get_function_configuration(res, aid, req),
309            "UpdateFunctionConfiguration" => self.update_function_configuration(res, req),
310            "UpdateFunctionCode" => self.update_function_code(res, req),
311            "UpdateEventSourceMapping" => self.update_event_source_mapping_handler(res, req),
312            "GetAccountSettings" => self.get_account_settings(aid),
313            "InvokeAsync" => Ok(AwsResponse::json(StatusCode::ACCEPTED, "{}".to_string())),
314            "InvokeWithResponseStream" => self.invoke_with_response_stream(res, aid, req).await,
315
316            // Versions
317            "ListVersionsByFunction" => self.list_versions_by_function(res, aid, req),
318
319            // Aliases
320            "CreateAlias" => self.create_alias(res, req),
321            "GetAlias" => self.get_alias(res, req),
322            "ListAliases" => self.list_aliases(res, aid),
323            "UpdateAlias" => self.update_alias(res, req),
324            "DeleteAlias" => self.delete_alias(res, req),
325
326            // Layers
327            "PublishLayerVersion" => self.publish_layer_version(res, req),
328            "GetLayerVersion" => self.get_layer_version(req),
329            "GetLayerVersionByArn" => self.get_layer_version_by_arn(req),
330            "ListLayers" => {
331                validate_layer_filters(req)?;
332                self.list_layers(aid)
333            }
334            "ListLayerVersions" => {
335                validate_layer_filters(req)?;
336                if res.is_empty() {
337                    return Err(missing("LayerName"));
338                }
339                // Smithy `LayerName.length 1..140`; ARN form is longer
340                // (~200) but the probe drives the bare-name path.
341                let limit = if res.starts_with("arn:") { 200 } else { 140 };
342                if res.chars().count() > limit {
343                    return Err(AwsServiceError::aws_error(
344                        StatusCode::BAD_REQUEST,
345                        "InvalidParameterValueException",
346                        "LayerName exceeds the 140-character maximum",
347                    ));
348                }
349                self.list_layer_versions(res, aid)
350            }
351            "DeleteLayerVersion" => self.delete_layer_version(req),
352            "GetLayerVersionPolicy" => self.get_layer_version_policy(req),
353            "AddLayerVersionPermission" => self.add_layer_version_permission(req),
354            "RemoveLayerVersionPermission" => self.remove_layer_version_permission(req),
355
356            // Function URL
357            "CreateFunctionUrlConfig" => self.create_function_url_config(res, req),
358            "GetFunctionUrlConfig" => self.get_function_url_config(res, aid),
359            "UpdateFunctionUrlConfig" => self.update_function_url_config(res, req),
360            "DeleteFunctionUrlConfig" => self.delete_function_url_config(res, aid),
361            "ListFunctionUrlConfigs" => self.list_function_url_configs(aid),
362
363            // Concurrency
364            "PutFunctionConcurrency" => self.put_function_concurrency(res, req),
365            "GetFunctionConcurrency" => self.get_function_concurrency(res, aid),
366            "DeleteFunctionConcurrency" => self.delete_function_concurrency(res, aid),
367            "PutProvisionedConcurrencyConfig" => self.put_provisioned_concurrency(res, req),
368            "GetProvisionedConcurrencyConfig" => self.get_provisioned_concurrency(res, req),
369            "DeleteProvisionedConcurrencyConfig" => self.delete_provisioned_concurrency(res, req),
370            "ListProvisionedConcurrencyConfigs" => self.list_provisioned_concurrency(res, aid),
371
372            // Code signing
373            "CreateCodeSigningConfig" => self.create_code_signing_config(req),
374            "GetCodeSigningConfig" => self.get_code_signing_config(res, aid),
375            "UpdateCodeSigningConfig" => self.update_code_signing_config(res, req),
376            "DeleteCodeSigningConfig" => self.delete_code_signing_config(res, aid),
377            "ListCodeSigningConfigs" => self.list_code_signing_configs(aid),
378            "PutFunctionCodeSigningConfig" => self.put_function_code_signing(res, req),
379            "GetFunctionCodeSigningConfig" => self.get_function_code_signing(res, aid),
380            "DeleteFunctionCodeSigningConfig" => self.delete_function_code_signing(res, aid),
381            "ListFunctionsByCodeSigningConfig" => self.list_functions_by_code_signing(res, aid),
382
383            // Event invoke
384            "PutFunctionEventInvokeConfig" | "UpdateFunctionEventInvokeConfig" => {
385                self.put_function_event_invoke(res, req)
386            }
387            "GetFunctionEventInvokeConfig" => self.get_function_event_invoke(res, req),
388            "DeleteFunctionEventInvokeConfig" => self.delete_function_event_invoke(res, req),
389            "ListFunctionEventInvokeConfigs" => self.list_function_event_invoke(res, aid),
390
391            // Runtime management
392            "PutRuntimeManagementConfig" => self.put_runtime_management(res, req),
393            "GetRuntimeManagementConfig" => self.get_runtime_management(res, req),
394
395            // Scaling
396            "PutFunctionScalingConfig" => self.put_scaling_config(res, req),
397            "GetFunctionScalingConfig" => {
398                require_qualifier(req)?;
399                self.get_scaling_config(res, aid)
400            }
401
402            // Recursion
403            "PutFunctionRecursionConfig" => self.put_recursion_config(res, req),
404            "GetFunctionRecursionConfig" => self.get_recursion_config(res, aid),
405
406            // Tags
407            "TagResource" => self.tag_resource(res, req),
408            "UntagResource" => self.untag_resource(res, req),
409            "ListTags" => self.list_tags(res, aid),
410
411            _ => Err(AwsServiceError::action_not_implemented("lambda", action)),
412        }
413    }
414
415    fn with_state_read<F, R>(&self, account_id: &str, region: &str, f: F) -> R
416    where
417        F: FnOnce(&LambdaState) -> R,
418    {
419        let accounts = self.state.read();
420        let empty = LambdaState::new(account_id, region);
421        let state = accounts.get(account_id).unwrap_or(&empty);
422        f(state)
423    }
424
425    // ── Function lifecycle extras ──
426
427    fn get_function_configuration(
428        &self,
429        function_name: &str,
430        account_id: &str,
431        req: &AwsRequest,
432    ) -> Result<AwsResponse, AwsServiceError> {
433        let region = self.region_for(account_id);
434        let qualifier = req.query_params.get("Qualifier").cloned();
435        self.with_state_read(account_id, &region, |state| {
436            let live = state
437                .functions
438                .get(function_name)
439                .ok_or_else(|| not_found("Function", function_name))?;
440            // Qualifier resolution mirrors GetFunction: $LATEST or omitted
441            // returns the live config; numeric / alias qualifiers resolve
442            // to a numbered snapshot.
443            let resolved = crate::service::resolve_qualifier_to_version(
444                state,
445                function_name,
446                qualifier.as_deref(),
447            );
448            let (func, version_label) = match resolved {
449                None => (live, "$LATEST".to_string()),
450                Some(v) => {
451                    let snap = state
452                        .function_version_snapshots
453                        .get(function_name)
454                        .and_then(|m| m.get(&v))
455                        .ok_or_else(|| not_found("Function", function_name))?;
456                    (snap, v)
457                }
458            };
459            let mut config = self.function_config_json(func);
460            config["Version"] = json!(version_label);
461            if version_label != "$LATEST" {
462                config["FunctionArn"] = json!(format!("{}:{version_label}", live.function_arn));
463                config["MasterArn"] = json!(live.function_arn);
464            }
465            ok(config)
466        })
467    }
468
469    fn update_function_configuration(
470        &self,
471        function_name: &str,
472        req: &AwsRequest,
473    ) -> Result<AwsResponse, AwsServiceError> {
474        let body = body(req);
475        // Validate before taking the write lock and before any mutation:
476        // an invalid EphemeralStorage.Size on an otherwise valid request
477        // must not silently apply the surrounding fields.
478        let validated_ephemeral = match body["EphemeralStorage"]["Size"].as_i64() {
479            Some(size) => Some(crate::service::validate_ephemeral_storage(size)?),
480            None => None,
481        };
482        let mut accounts = self.state.write();
483        // Pre-resolve layer attachments before re-borrowing accounts mutably
484        // for the function. Layer ARNs may live in sibling accounts.
485        let layer_attachments: Option<Vec<AttachedLayer>> = body["Layers"].as_array().map(|arr| {
486            let arns: Vec<String> = arr
487                .iter()
488                .filter_map(|v| v.as_str().map(String::from))
489                .collect();
490            resolve_layer_attachments(&accounts, arns)
491        });
492        let state = accounts.get_or_create(&req.account_id);
493        let func = state
494            .functions
495            .get_mut(function_name)
496            .ok_or_else(|| not_found("Function", function_name))?;
497        if let Some(handler) = body["Handler"].as_str() {
498            func.handler = handler.to_string();
499        }
500        if let Some(t) = body["Timeout"].as_i64() {
501            func.timeout = t;
502        }
503        if let Some(m) = body["MemorySize"].as_i64() {
504            func.memory_size = m;
505        }
506        if let Some(role) = body["Role"].as_str() {
507            func.role = role.to_string();
508        }
509        if let Some(desc) = body["Description"].as_str() {
510            func.description = desc.to_string();
511        }
512        if let Some(rt) = body["Runtime"].as_str() {
513            func.runtime = rt.to_string();
514        }
515        if let Some(env) = body["Environment"]["Variables"].as_object() {
516            func.environment = env
517                .iter()
518                .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
519                .collect();
520        }
521        if let Some(mode) = body["TracingConfig"]["Mode"].as_str() {
522            func.tracing_mode = Some(mode.to_string());
523        }
524        if let Some(arn) = body["KMSKeyArn"].as_str() {
525            func.kms_key_arn = if arn.is_empty() {
526                None
527            } else {
528                Some(arn.to_string())
529            };
530        }
531        if let Some(size) = validated_ephemeral {
532            func.ephemeral_storage_size = Some(size);
533        }
534        if body["VpcConfig"].is_object() {
535            func.vpc_config = Some(body["VpcConfig"].clone());
536        }
537        if body["SnapStart"].is_object() {
538            func.snap_start = Some(body["SnapStart"].clone());
539        }
540        if let Some(arn) = body["DeadLetterConfig"]["TargetArn"].as_str() {
541            func.dead_letter_config_arn = if arn.is_empty() {
542                None
543            } else {
544                Some(arn.to_string())
545            };
546        }
547        if let Some(fsc) = body["FileSystemConfigs"].as_array() {
548            func.file_system_configs = fsc.clone();
549        }
550        if body["LoggingConfig"].is_object() {
551            func.logging_config = Some(body["LoggingConfig"].clone());
552        }
553        if body["ImageConfig"].is_object() {
554            func.image_config = Some(body["ImageConfig"].clone());
555        }
556        if body["DurableConfig"].is_object() {
557            func.durable_config = Some(body["DurableConfig"].clone());
558        }
559        if let Some(attachments) = layer_attachments {
560            func.layers = attachments;
561        }
562        // RevisionId rotates only on real config changes — clients
563        // round-trip it through optimistic-concurrency calls, so we
564        // mint a fresh one here to signal "config changed".
565        func.revision_id = uuid::Uuid::new_v4().to_string();
566        func.last_modified = Utc::now();
567        ok(self.function_config_json(func))
568    }
569
570    fn update_function_code(
571        &self,
572        function_name: &str,
573        req: &AwsRequest,
574    ) -> Result<AwsResponse, AwsServiceError> {
575        let body: serde_json::Value = serde_json::from_slice(&req.body).unwrap_or_default();
576
577        // ZipFile / ImageUri / S3Bucket+S3Key are mutually exclusive; AWS
578        // rejects the request when more than one is present. The handler
579        // picks one with a defined precedence: ZipFile, S3 descriptor,
580        // ImageUri.
581        let new_zip: Option<Vec<u8>> = match body["ZipFile"].as_str() {
582            Some(b64) => Some(
583                base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
584                    |_| {
585                        AwsServiceError::aws_error(
586                            StatusCode::BAD_REQUEST,
587                            "InvalidParameterValueException",
588                            "Could not decode ZipFile: invalid base64",
589                        )
590                    },
591                )?,
592            ),
593            None => None,
594        };
595        let new_image_uri = body["ImageUri"].as_str().map(String::from);
596        // S3 source descriptor: when the caller didn't supply ZipFile or
597        // ImageUri, AWS expects S3Bucket+S3Key (S3ObjectVersion is
598        // optional). fakecloud doesn't fetch the object — CreateFunction
599        // takes the same shortcut — so we synthesize a fingerprint from
600        // the descriptor and use that as the new code identity. The hash
601        // and size still rotate when the descriptor differs, so
602        // optimistic-concurrency callers see RevisionId bump on real
603        // changes.
604        // S3-sourced code: if an S3Delivery hook is wired, fetch the
605        // actual object bytes and treat them as a ZIP upload. This
606        // matches real Lambda's S3-pull semantics. Fall back to the
607        // descriptor-hash shortcut when no hook is available.
608        let s3_fetched_zip: Option<Vec<u8>> = match (
609            body["S3Bucket"].as_str(),
610            body["S3Key"].as_str(),
611        ) {
612            (Some(bucket), Some(key)) if new_zip.is_none() && new_image_uri.is_none() => {
613                if let Some(s3) = &self.s3_delivery {
614                    match s3.get_object(&req.account_id, bucket, key) {
615                        Ok(bytes) => Some(bytes),
616                        Err(e) => {
617                            return Err(AwsServiceError::aws_error(
618                                StatusCode::BAD_REQUEST,
619                                "InvalidParameterValueException",
620                                format!("Error occurred while GetObject. S3 Error Code: NoSuchKey. S3 Error Message: {e}"),
621                            ));
622                        }
623                    }
624                } else {
625                    None
626                }
627            }
628            _ => None,
629        };
630
631        let new_s3_descriptor: Option<Vec<u8>> =
632            match (body["S3Bucket"].as_str(), body["S3Key"].as_str()) {
633                (Some(bucket), Some(key))
634                    if new_zip.is_none() && new_image_uri.is_none() && s3_fetched_zip.is_none() =>
635                {
636                    let mut descriptor = serde_json::Map::new();
637                    descriptor.insert("S3Bucket".to_string(), Value::String(bucket.to_string()));
638                    descriptor.insert("S3Key".to_string(), Value::String(key.to_string()));
639                    if let Some(ver) = body["S3ObjectVersion"].as_str() {
640                        descriptor.insert(
641                            "S3ObjectVersion".to_string(),
642                            Value::String(ver.to_string()),
643                        );
644                    }
645                    Some(serde_json::to_vec(&Value::Object(descriptor)).unwrap_or_default())
646                }
647                _ => None,
648            };
649        let new_zip = new_zip.or(s3_fetched_zip);
650        let supplied_signing_profile = body["SigningProfileVersionArn"].as_str().map(String::from);
651        let supplied_revision_id = body["RevisionId"].as_str().map(String::from);
652        let new_architectures: Option<Vec<String>> = body["Architectures"].as_array().map(|arr| {
653            arr.iter()
654                .filter_map(|v| v.as_str().map(String::from))
655                .collect()
656        });
657        let dry_run = body["DryRun"].as_bool().unwrap_or(false);
658        let publish = body["Publish"].as_bool().unwrap_or(false);
659
660        let mut accounts = self.state.write();
661        let state = accounts.get_or_create(&req.account_id);
662
663        // Function existence is the first check so callers always see
664        // ResourceNotFoundException 404 even when CSC / sig-profile
665        // fields would otherwise reject the request.
666        if !state.functions.contains_key(function_name) {
667            return Err(not_found("Function", function_name));
668        }
669
670        // Code-signing gate: if a CSC is bound to this function and at
671        // least one allowed publisher is registered, the caller must
672        // supply a SigningProfileVersionArn from that allow-list when
673        // the policy is Enforce. Warn just lets the upload through.
674        if let Some(csc_arn) = state.function_code_signing.get(function_name).cloned() {
675            let csc_id = extract_csc_id(&csc_arn);
676            if let Some(csc) = state.code_signing_configs.get(&csc_id).cloned() {
677                if !csc.allowed_publishers.is_empty()
678                    && csc
679                        .untrusted_artifact_action
680                        .eq_ignore_ascii_case("Enforce")
681                {
682                    let allowed = match supplied_signing_profile.as_deref() {
683                        Some(arn) => csc.allowed_publishers.iter().any(|p| p == arn),
684                        None => false,
685                    };
686                    if !allowed {
687                        return Err(AwsServiceError::aws_error(
688                            StatusCode::BAD_REQUEST,
689                            "CodeVerificationFailedException",
690                            "The code signature failed the integrity check or the signing profile is not in the allowed publishers list.",
691                        ));
692                    }
693                }
694            }
695        }
696
697        let func = state
698            .functions
699            .get_mut(function_name)
700            .ok_or_else(|| not_found("Function", function_name))?;
701
702        // Optimistic-concurrency precondition: when the caller supplies
703        // a RevisionId, it must match the function's current revision
704        // or AWS rejects with PreconditionFailedException 412.
705        if let Some(ref rev) = supplied_revision_id {
706            if rev != &func.revision_id {
707                return Err(AwsServiceError::aws_error(
708                    StatusCode::PRECONDITION_FAILED,
709                    "PreconditionFailedException",
710                    format!(
711                        "The Revision Id provided: {rev} does not match the latest Revision Id of function: {function_name}. Call the GetFunction/GetAlias API to retrieve the latest Revision Id"
712                    ),
713                ));
714            }
715        }
716
717        // DryRun validates the request shape but never mutates state.
718        if dry_run {
719            return ok(self.function_config_json(func));
720        }
721
722        let mut changed = false;
723        if let Some(bytes) = new_zip {
724            // SHA256(base64) of the new code, matching CreateFunction's
725            // hash so GetFunction returns identical CodeSha256 round-trip.
726            let mut hasher = Sha256::new();
727            hasher.update(&bytes);
728            let hash = hasher.finalize();
729            let code_sha256 =
730                base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
731            if code_sha256 != func.code_sha256 {
732                changed = true;
733            }
734            func.code_size = bytes.len() as i64;
735            func.code_zip = Some(bytes);
736            func.code_sha256 = code_sha256;
737            func.image_uri = None;
738            func.package_type = "Zip".to_string();
739        } else if let Some(descriptor_bytes) = new_s3_descriptor {
740            // Hash the S3 descriptor JSON (S3Bucket+S3Key+optional
741            // S3ObjectVersion) so the same descriptor produces a stable
742            // sha and a different descriptor rotates RevisionId. This
743            // mirrors CreateFunction's behavior for S3-sourced code,
744            // which also fingerprints the descriptor rather than fetching
745            // S3 (real Lambda fetches asynchronously).
746            let mut hasher = Sha256::new();
747            hasher.update(&descriptor_bytes);
748            let hash = hasher.finalize();
749            let code_sha256 =
750                base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
751            if code_sha256 != func.code_sha256 {
752                changed = true;
753            }
754            func.code_size = descriptor_bytes.len() as i64;
755            // We don't have the object bytes — clear the cached zip so
756            // the runtime falls back to whatever it had previously cached
757            // rather than serving stale bytes for the new descriptor.
758            func.code_zip = None;
759            func.code_sha256 = code_sha256;
760            func.image_uri = None;
761            func.package_type = "Zip".to_string();
762        } else if let Some(uri) = new_image_uri {
763            if func.image_uri.as_deref() != Some(uri.as_str()) {
764                changed = true;
765            }
766            func.image_uri = Some(uri);
767            func.code_zip = None;
768            func.package_type = "Image".to_string();
769            // AWS reports CodeSize=0 and an empty CodeSha256 for
770            // image-package functions — the actual digest lives on the
771            // ECR side, not in the Lambda response.
772            func.code_size = 0;
773            func.code_sha256 = String::new();
774        }
775
776        if let Some(arns) = new_architectures {
777            if !arns.is_empty() && arns != func.architectures {
778                changed = true;
779                func.architectures = arns;
780            }
781        }
782
783        if let Some(arn) = supplied_signing_profile {
784            if func.signing_profile_version_arn.as_deref() != Some(arn.as_str()) {
785                changed = true;
786            }
787            func.signing_profile_version_arn = Some(arn);
788        }
789
790        // last_modified is bumped on every call (matches AWS), but
791        // revision_id only rotates when code or signing fields actually
792        // change so optimistic-concurrency callers don't see spurious
793        // updates from no-op pings.
794        func.last_modified = Utc::now();
795        if changed {
796            func.revision_id = uuid::Uuid::new_v4().to_string();
797        }
798        // A successful UpdateFunctionCode clears any prior failure
799        // reason — function_config_json elides the field when None,
800        // matching AWS's "no LastUpdateStatusReason on success" shape.
801        func.last_update_status_reason = None;
802        func.last_update_status_reason_code = None;
803
804        // Publish=true mints a new immutable version snapshot off the
805        // freshly updated $LATEST and returns that version's config.
806        if publish {
807            drop(accounts);
808            return self.publish_version(function_name, &req.account_id, req);
809        }
810
811        ok(self.function_config_json(func))
812    }
813
814    fn get_account_settings(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
815        let mut accounts = self.state.write();
816        let state = accounts.get_or_create(account_id);
817        let settings = state.account_settings.clone().unwrap_or(AccountSettings {
818            concurrent_executions: 1000,
819            code_size_zipped: 52_428_800,
820            code_size_unzipped: 262_144_000,
821            total_code_size: 80_530_636_800,
822        });
823        if state.account_settings.is_none() {
824            state.account_settings = Some(settings.clone());
825        }
826        // Real AccountUsage so clients monitoring deployment quotas see
827        // accurate numbers. AWS sums total code size across all functions.
828        let function_count = state.functions.len() as i64;
829        let total_code_size: i64 = state.functions.values().map(|f| f.code_size).sum();
830        ok(json!({
831            "AccountLimit": {
832                "ConcurrentExecutions": settings.concurrent_executions,
833                "CodeSizeZipped": settings.code_size_zipped,
834                "CodeSizeUnzipped": settings.code_size_unzipped,
835                "TotalCodeSize": settings.total_code_size,
836                "UnreservedConcurrentExecutions": settings.concurrent_executions,
837            },
838            "AccountUsage": {
839                "TotalCodeSize": total_code_size,
840                "FunctionCount": function_count,
841            },
842        }))
843    }
844
845    // ── Versions ──
846
847    fn list_versions_by_function(
848        &self,
849        function_name: &str,
850        account_id: &str,
851        req: &AwsRequest,
852    ) -> Result<AwsResponse, AwsServiceError> {
853        let region = self.region_for(account_id);
854        let max_items: usize = req
855            .query_params
856            .get("MaxItems")
857            .and_then(|v| v.parse::<usize>().ok())
858            .map(|n| n.clamp(1, 50))
859            .unwrap_or(50);
860        let marker = req.query_params.get("Marker").cloned();
861        self.with_state_read(account_id, &region, |state| {
862            let func = state
863                .functions
864                .get(function_name)
865                .ok_or_else(|| not_found("Function", function_name))?;
866            // AWS returns $LATEST first, then numbered versions in
867            // ascending order. Each numbered version is an immutable
868            // snapshot of the function at publish time.
869            let mut all: Vec<serde_json::Value> = Vec::new();
870            let mut latest = self.function_config_json(func);
871            latest["Version"] = json!("$LATEST");
872            all.push(latest);
873            let snapshots = state.function_version_snapshots.get(function_name);
874            if let Some(numbered) = state.function_versions.get(function_name) {
875                for v in numbered {
876                    let snap = snapshots.and_then(|m| m.get(v)).unwrap_or(func);
877                    let mut cfg = self.function_config_json(snap);
878                    cfg["Version"] = json!(v);
879                    cfg["FunctionArn"] = json!(format!("{}:{v}", func.function_arn));
880                    cfg["MasterArn"] = json!(func.function_arn);
881                    all.push(cfg);
882                }
883            }
884            // Pagination: skip past Marker if supplied (Marker is the
885            // Version string of the entry to start *after*), then take
886            // up to MaxItems. Emit a NextMarker when truncated.
887            let start = match marker.as_deref() {
888                Some(m) => all
889                    .iter()
890                    .position(|v| v["Version"].as_str() == Some(m))
891                    .map(|i| i + 1)
892                    .unwrap_or(0),
893                None => 0,
894            };
895            let end = (start + max_items).min(all.len());
896            let page: Vec<serde_json::Value> = all[start..end].to_vec();
897            let mut body = json!({ "Versions": page });
898            if end < all.len() {
899                if let Some(last) = all[end - 1]["Version"].as_str() {
900                    body["NextMarker"] = json!(last);
901                }
902            }
903            ok(body)
904        })
905    }
906
907    // ── Aliases ──
908
909    fn alias_key(function: &str, alias: &str) -> String {
910        format!("{function}:{alias}")
911    }
912
913    fn create_alias(
914        &self,
915        function_name: &str,
916        req: &AwsRequest,
917    ) -> Result<AwsResponse, AwsServiceError> {
918        let body = body(req);
919        let name = body["Name"]
920            .as_str()
921            .ok_or_else(|| missing("Name"))?
922            .to_string();
923        let version = body["FunctionVersion"]
924            .as_str()
925            .unwrap_or("$LATEST")
926            .to_string();
927        let mut accounts = self.state.write();
928        let state = accounts.get_or_create(&req.account_id);
929        if !state.functions.contains_key(function_name) {
930            return Err(not_found("Function", function_name));
931        }
932        let alias_arn = format!(
933            "arn:aws:lambda:{}:{}:function:{}:{}",
934            state.region, state.account_id, function_name, name
935        );
936        let alias = FunctionAlias {
937            alias_arn: alias_arn.clone(),
938            name: name.clone(),
939            function_version: version,
940            description: body["Description"].as_str().unwrap_or("").to_string(),
941            revision_id: id_from_time("rev-"),
942            routing_config: body.get("RoutingConfig").cloned(),
943        };
944        state
945            .aliases
946            .insert(Self::alias_key(function_name, &name), alias.clone());
947        ok(serde_json::to_value(alias).unwrap_or_default())
948    }
949
950    fn get_alias(
951        &self,
952        function_name: &str,
953        req: &AwsRequest,
954    ) -> Result<AwsResponse, AwsServiceError> {
955        let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
956        if alias_name.is_empty() {
957            return Err(missing("Name"));
958        }
959        if alias_name.chars().count() > 128 {
960            return Err(AwsServiceError::aws_error(
961                StatusCode::BAD_REQUEST,
962                "InvalidParameterValueException",
963                "Alias name exceeds the 128-character maximum",
964            ));
965        }
966        let region = self.region_for(&req.account_id);
967        self.with_state_read(&req.account_id, &region, |state| {
968            state
969                .aliases
970                .get(&Self::alias_key(function_name, &alias_name))
971                .map(|a| ok(serde_json::to_value(a).unwrap_or_default()))
972                .unwrap_or_else(|| Err(not_found("Alias", &alias_name)))
973        })
974    }
975
976    fn list_aliases(
977        &self,
978        function_name: &str,
979        account_id: &str,
980    ) -> Result<AwsResponse, AwsServiceError> {
981        let region = self.region_for(account_id);
982        self.with_state_read(account_id, &region, |state| {
983            let prefix = format!("{function_name}:");
984            let aliases: Vec<&FunctionAlias> = state
985                .aliases
986                .iter()
987                .filter(|(k, _)| k.starts_with(&prefix))
988                .map(|(_, v)| v)
989                .collect();
990            ok(json!({"Aliases": aliases}))
991        })
992    }
993
994    fn update_alias(
995        &self,
996        function_name: &str,
997        req: &AwsRequest,
998    ) -> Result<AwsResponse, AwsServiceError> {
999        let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
1000        let body = body(req);
1001        let mut accounts = self.state.write();
1002        let state = accounts.get_or_create(&req.account_id);
1003        let key = Self::alias_key(function_name, &alias_name);
1004        let alias = state
1005            .aliases
1006            .get_mut(&key)
1007            .ok_or_else(|| not_found("Alias", &alias_name))?;
1008        if let Some(v) = body["FunctionVersion"].as_str() {
1009            alias.function_version = v.to_string();
1010        }
1011        if let Some(d) = body["Description"].as_str() {
1012            alias.description = d.to_string();
1013        }
1014        if let Some(rc) = body.get("RoutingConfig") {
1015            alias.routing_config = Some(rc.clone());
1016        }
1017        alias.revision_id = id_from_time("rev-");
1018        ok(serde_json::to_value(alias).unwrap_or_default())
1019    }
1020
1021    fn delete_alias(
1022        &self,
1023        function_name: &str,
1024        req: &AwsRequest,
1025    ) -> Result<AwsResponse, AwsServiceError> {
1026        let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
1027        if alias_name.is_empty() {
1028            return Err(missing("Name"));
1029        }
1030        // Smithy `Alias.length 1..128`.
1031        if alias_name.chars().count() > 128 {
1032            return Err(AwsServiceError::aws_error(
1033                StatusCode::BAD_REQUEST,
1034                "InvalidParameterValueException",
1035                "Alias name exceeds the 128-character maximum",
1036            ));
1037        }
1038        let mut accounts = self.state.write();
1039        let state = accounts.get_or_create(&req.account_id);
1040        // `DeleteAlias` is idempotent on AWS — no `ResourceNotFoundException`
1041        // is declared on the operation. Removing without error matches
1042        // the live API.
1043        state
1044            .aliases
1045            .remove(&Self::alias_key(function_name, &alias_name));
1046        empty()
1047    }
1048
1049    // ── Layers ──
1050
1051    fn publish_layer_version(
1052        &self,
1053        layer_name: &str,
1054        req: &AwsRequest,
1055    ) -> Result<AwsResponse, AwsServiceError> {
1056        if layer_name.is_empty() {
1057            return Err(missing("LayerName"));
1058        }
1059        let limit = if layer_name.starts_with("arn:") {
1060            200
1061        } else {
1062            140
1063        };
1064        if layer_name.chars().count() > limit {
1065            return Err(AwsServiceError::aws_error(
1066                StatusCode::BAD_REQUEST,
1067                "InvalidParameterValueException",
1068                "LayerName exceeds the 140-character maximum",
1069            ));
1070        }
1071        let body = body(req);
1072        // `Content` is `@required` on `PublishLayerVersionRequest` —
1073        // reject when missing rather than silently publishing a
1074        // zero-byte layer.
1075        if body.get("Content").is_none() || body["Content"].is_null() {
1076            return Err(missing("Content"));
1077        }
1078        // `Description` is bound to Smithy's `Description` shape
1079        // (`length 0..256`). Reject overlong values up front.
1080        if let Some(desc) = body["Description"].as_str() {
1081            if desc.chars().count() > 256 {
1082                return Err(AwsServiceError::aws_error(
1083                    StatusCode::BAD_REQUEST,
1084                    "InvalidParameterValueException",
1085                    "Description exceeds the 256-character maximum",
1086                ));
1087            }
1088        }
1089        // `LicenseInfo` Smithy shape: `length 0..512`.
1090        if let Some(li) = body["LicenseInfo"].as_str() {
1091            if li.chars().count() > 512 {
1092                return Err(AwsServiceError::aws_error(
1093                    StatusCode::BAD_REQUEST,
1094                    "InvalidParameterValueException",
1095                    "LicenseInfo exceeds the 512-character maximum",
1096                ));
1097            }
1098        }
1099        let zip_bytes: Option<Vec<u8>> = match body["Content"]["ZipFile"].as_str() {
1100            Some(b64) => Some(
1101                base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
1102                    |_| {
1103                        AwsServiceError::aws_error(
1104                            StatusCode::BAD_REQUEST,
1105                            "InvalidParameterValueException",
1106                            "Could not decode Content.ZipFile: invalid base64",
1107                        )
1108                    },
1109                )?,
1110            ),
1111            None => None,
1112        };
1113        let (code_sha256, code_size) = match zip_bytes.as_deref() {
1114            Some(bytes) => {
1115                let mut hasher = Sha256::new();
1116                hasher.update(bytes);
1117                let digest = hasher.finalize();
1118                (
1119                    base64::Engine::encode(&base64::engine::general_purpose::STANDARD, digest),
1120                    bytes.len() as i64,
1121                )
1122            }
1123            None => (String::new(), 0),
1124        };
1125
1126        let mut accounts = self.state.write();
1127        let state = accounts.get_or_create(&req.account_id);
1128        let account_id = state.account_id.clone();
1129        let layer = state
1130            .layers
1131            .entry(layer_name.to_string())
1132            .or_insert_with(|| Layer {
1133                layer_name: layer_name.to_string(),
1134                layer_arn: format!(
1135                    "arn:aws:lambda:{}:{}:layer:{}",
1136                    state.region, state.account_id, layer_name
1137                ),
1138                versions: Vec::new(),
1139            });
1140        let next_version = (layer.versions.len() as i64) + 1;
1141        let version_arn = format!("{}:{}", layer.layer_arn, next_version);
1142        let runtimes: Vec<String> = body["CompatibleRuntimes"]
1143            .as_array()
1144            .map(|arr| {
1145                arr.iter()
1146                    .filter_map(|v| v.as_str().map(String::from))
1147                    .collect()
1148            })
1149            .unwrap_or_default();
1150        let architectures: Vec<String> = body["CompatibleArchitectures"]
1151            .as_array()
1152            .map(|arr| {
1153                arr.iter()
1154                    .filter_map(|v| v.as_str().map(String::from))
1155                    .collect()
1156            })
1157            .unwrap_or_default();
1158        let layer_arn = layer.layer_arn.clone();
1159        let lv = LayerVersion {
1160            version: next_version,
1161            layer_version_arn: version_arn.clone(),
1162            description: body["Description"].as_str().unwrap_or("").to_string(),
1163            created_date: Utc::now(),
1164            compatible_runtimes: runtimes,
1165            license_info: body["LicenseInfo"].as_str().unwrap_or("").to_string(),
1166            policy: None,
1167            code_zip: zip_bytes,
1168            code_sha256: code_sha256.clone(),
1169            code_size,
1170            compatible_architectures: architectures,
1171        };
1172        layer.versions.push(lv.clone());
1173        let location = layer_content_url(req, &account_id, layer_name, next_version);
1174        ok(json!({
1175            "LayerArn": layer_arn,
1176            "LayerVersionArn": version_arn,
1177            "Version": next_version,
1178            "Description": lv.description,
1179            "CreatedDate": lv.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1180            "CompatibleRuntimes": lv.compatible_runtimes,
1181            "CompatibleArchitectures": lv.compatible_architectures,
1182            "LicenseInfo": lv.license_info,
1183            "Content": {
1184                "Location": location,
1185                "CodeSha256": code_sha256,
1186                "CodeSize": code_size,
1187            },
1188        }))
1189    }
1190
1191    fn list_layers(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1192        let region = self.region_for(account_id);
1193        self.with_state_read(account_id, &region, |state| {
1194            let layers: Vec<Value> = state
1195                .layers
1196                .values()
1197                .map(|l| {
1198                    json!({
1199                        "LayerName": l.layer_name,
1200                        "LayerArn": l.layer_arn,
1201                        "LatestMatchingVersion": l.versions.last().map(|v| json!({
1202                            "LayerVersionArn": v.layer_version_arn,
1203                            "Version": v.version,
1204                            "Description": v.description,
1205                            "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1206                            "CompatibleRuntimes": v.compatible_runtimes,
1207                            "CompatibleArchitectures": v.compatible_architectures,
1208                        })),
1209                    })
1210                })
1211                .collect();
1212            ok(json!({"Layers": layers}))
1213        })
1214    }
1215
1216    fn list_layer_versions(
1217        &self,
1218        layer_name: &str,
1219        account_id: &str,
1220    ) -> Result<AwsResponse, AwsServiceError> {
1221        let region = self.region_for(account_id);
1222        self.with_state_read(account_id, &region, |state| {
1223            let versions: Vec<Value> = state
1224                .layers
1225                .get(layer_name)
1226                .map(|l| {
1227                    l.versions
1228                        .iter()
1229                        .map(|v| {
1230                            json!({
1231                                "LayerVersionArn": v.layer_version_arn,
1232                                "Version": v.version,
1233                                "Description": v.description,
1234                                "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1235                                "CompatibleRuntimes": v.compatible_runtimes,
1236                                "CompatibleArchitectures": v.compatible_architectures,
1237                                "LicenseInfo": v.license_info,
1238                            })
1239                        })
1240                        .collect()
1241                })
1242                .unwrap_or_default();
1243            ok(json!({"LayerVersions": versions}))
1244        })
1245    }
1246
1247    fn get_layer_version(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1248        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1249        let version: i64 = req
1250            .path_segments
1251            .get(4)
1252            .and_then(|s| s.parse().ok())
1253            .ok_or_else(|| missing("VersionNumber"))?;
1254        let region = self.region_for(&req.account_id);
1255        let location = layer_content_url(req, &req.account_id, &layer_name, version);
1256        self.with_state_read(&req.account_id, &region, |state| {
1257            state
1258                .layers
1259                .get(&layer_name)
1260                .and_then(|l| l.versions.iter().find(|v| v.version == version))
1261                .map(|v| {
1262                    ok(json!({
1263                        "LayerVersionArn": v.layer_version_arn,
1264                        "Version": v.version,
1265                        "Description": v.description,
1266                        "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1267                        "CompatibleRuntimes": v.compatible_runtimes,
1268                        "CompatibleArchitectures": v.compatible_architectures,
1269                        "LicenseInfo": v.license_info,
1270                        "Content": {
1271                            "Location": location,
1272                            "CodeSha256": v.code_sha256,
1273                            "CodeSize": v.code_size,
1274                        },
1275                    }))
1276                })
1277                .unwrap_or_else(|| Err(not_found("LayerVersion", &layer_name)))
1278        })
1279    }
1280
1281    fn get_layer_version_by_arn(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1282        let arn = req
1283            .query_params
1284            .get("Arn")
1285            .or_else(|| req.query_params.get("find"))
1286            .cloned()
1287            .unwrap_or_default();
1288        let (account_id, layer_name, version) =
1289            parse_layer_version_arn(&arn).ok_or_else(|| missing("Arn"))?;
1290        let region = self.region_for(&account_id);
1291        let location = layer_content_url(req, &account_id, &layer_name, version);
1292        self.with_state_read(&account_id, &region, |state| {
1293            state
1294                .layers
1295                .get(&layer_name)
1296                .and_then(|l| l.versions.iter().find(|v| v.version == version))
1297                .map(|v| {
1298                    ok(json!({
1299                        "LayerVersionArn": v.layer_version_arn,
1300                        "Version": v.version,
1301                        "Description": v.description,
1302                        "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1303                        "CompatibleRuntimes": v.compatible_runtimes,
1304                        "CompatibleArchitectures": v.compatible_architectures,
1305                        "LicenseInfo": v.license_info,
1306                        "Content": {
1307                            "Location": location,
1308                            "CodeSha256": v.code_sha256,
1309                            "CodeSize": v.code_size,
1310                        },
1311                    }))
1312                })
1313                .unwrap_or_else(|| Err(not_found("LayerVersion", &arn)))
1314        })
1315    }
1316
1317    fn delete_layer_version(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1318        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1319        if layer_name.is_empty() {
1320            return Err(missing("LayerName"));
1321        }
1322        let limit = if layer_name.starts_with("arn:") {
1323            200
1324        } else {
1325            140
1326        };
1327        if layer_name.chars().count() > limit {
1328            return Err(AwsServiceError::aws_error(
1329                StatusCode::BAD_REQUEST,
1330                "InvalidParameterValueException",
1331                "LayerName exceeds the 140-character maximum",
1332            ));
1333        }
1334        let version_raw = req.path_segments.get(4).map(|s| s.as_str()).unwrap_or("");
1335        if version_raw.is_empty() {
1336            return Err(missing("VersionNumber"));
1337        }
1338        let version: i64 = version_raw.parse().map_err(|_| {
1339            AwsServiceError::aws_error(
1340                StatusCode::BAD_REQUEST,
1341                "InvalidParameterValueException",
1342                "VersionNumber must be an integer",
1343            )
1344        })?;
1345        let mut accounts = self.state.write();
1346        let state = accounts.get_or_create(&req.account_id);
1347        if let Some(layer) = state.layers.get_mut(&layer_name) {
1348            layer.versions.retain(|v| v.version != version);
1349        }
1350        empty()
1351    }
1352
1353    fn get_layer_version_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1354        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1355        let version: i64 = req
1356            .path_segments
1357            .get(4)
1358            .and_then(|s| s.parse().ok())
1359            .unwrap_or(0);
1360        let region = self.region_for(&req.account_id);
1361        self.with_state_read(&req.account_id, &region, |state| {
1362            let policy = state
1363                .layers
1364                .get(&layer_name)
1365                .and_then(|l| l.versions.iter().find(|v| v.version == version))
1366                .and_then(|v| v.policy.clone())
1367                .unwrap_or_else(|| "{}".to_string());
1368            ok(json!({"Policy": policy, "RevisionId": id_from_time("rev-")}))
1369        })
1370    }
1371
1372    fn add_layer_version_permission(
1373        &self,
1374        req: &AwsRequest,
1375    ) -> Result<AwsResponse, AwsServiceError> {
1376        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1377        let version: i64 = req
1378            .path_segments
1379            .get(4)
1380            .and_then(|s| s.parse().ok())
1381            .unwrap_or(0);
1382        let body = body(req);
1383        let mut accounts = self.state.write();
1384        let state = accounts.get_or_create(&req.account_id);
1385        if let Some(layer) = state.layers.get_mut(&layer_name) {
1386            if let Some(v) = layer.versions.iter_mut().find(|v| v.version == version) {
1387                let policy = v.policy.clone().unwrap_or_else(|| "{}".to_string());
1388                let mut policy_doc: Value = serde_json::from_str(&policy).unwrap_or(json!({}));
1389                let statements = policy_doc["Statement"].as_array_mut();
1390                let new_stmt = json!({
1391                    "Sid": body["StatementId"].as_str().unwrap_or("default"),
1392                    "Effect": "Allow",
1393                    "Principal": body["Principal"].clone(),
1394                    "Action": body["Action"].clone(),
1395                    "Resource": v.layer_version_arn.clone(),
1396                });
1397                if let Some(s) = statements {
1398                    s.push(new_stmt);
1399                } else {
1400                    policy_doc = json!({"Version": "2012-10-17", "Statement": [new_stmt]});
1401                }
1402                v.policy = Some(policy_doc.to_string());
1403            }
1404        }
1405        ok(json!({
1406            "Statement": body["StatementId"],
1407            "RevisionId": id_from_time("rev-"),
1408        }))
1409    }
1410
1411    fn remove_layer_version_permission(
1412        &self,
1413        req: &AwsRequest,
1414    ) -> Result<AwsResponse, AwsServiceError> {
1415        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1416        let version: i64 = req
1417            .path_segments
1418            .get(4)
1419            .and_then(|s| s.parse().ok())
1420            .unwrap_or(0);
1421        let sid = req.path_segments.get(6).cloned().unwrap_or_default();
1422        let mut accounts = self.state.write();
1423        let state = accounts.get_or_create(&req.account_id);
1424        if let Some(layer) = state.layers.get_mut(&layer_name) {
1425            if let Some(v) = layer.versions.iter_mut().find(|v| v.version == version) {
1426                if let Some(policy) = v.policy.clone() {
1427                    let mut policy_doc: Value = serde_json::from_str(&policy).unwrap_or(json!({}));
1428                    if let Some(stmts) = policy_doc["Statement"].as_array_mut() {
1429                        stmts.retain(|s| s["Sid"].as_str() != Some(&sid));
1430                    }
1431                    v.policy = Some(policy_doc.to_string());
1432                }
1433            }
1434        }
1435        empty()
1436    }
1437
1438    // ── Function URL ──
1439
1440    /// Render a `FunctionUrlConfig` into the AWS-shaped JSON the Lambda
1441    /// SDK expects (PascalCase keys, ISO-8601 timestamps). Direct
1442    /// `serde_json::to_value` would emit the struct's snake_case field
1443    /// names, which the SDK silently treats as missing fields — leaving
1444    /// `function_url()` returning an empty string.
1445    fn function_url_config_json(cfg: &FunctionUrlConfig) -> Value {
1446        let mut out = json!({
1447            "FunctionArn": cfg.function_arn,
1448            "FunctionUrl": cfg.function_url,
1449            "AuthType": cfg.auth_type,
1450            "InvokeMode": cfg.invoke_mode,
1451            "CreationTime": cfg.creation_time.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1452            "LastModifiedTime": cfg.last_modified_time.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1453        });
1454        if let Some(cors) = &cfg.cors {
1455            out["Cors"] = cors.clone();
1456        }
1457        out
1458    }
1459
1460    fn create_function_url_config(
1461        &self,
1462        function_name: &str,
1463        req: &AwsRequest,
1464    ) -> Result<AwsResponse, AwsServiceError> {
1465        let body = body(req);
1466        let auth_type = body["AuthType"]
1467            .as_str()
1468            .ok_or_else(|| missing("AuthType"))?
1469            .to_string();
1470        // `FunctionUrlAuthType` enum: `NONE` | `AWS_IAM`. Reject any
1471        // other value rather than persisting an unrecognised auth type.
1472        if auth_type != "NONE" && auth_type != "AWS_IAM" {
1473            return Err(AwsServiceError::aws_error(
1474                StatusCode::BAD_REQUEST,
1475                "InvalidParameterValueException",
1476                format!(
1477                    "Invalid AuthType value '{}'; expected 'NONE' or 'AWS_IAM'",
1478                    auth_type
1479                ),
1480            ));
1481        }
1482        let now = Utc::now();
1483        let mut accounts = self.state.write();
1484        let state = accounts.get_or_create(&req.account_id);
1485        if !state.functions.contains_key(function_name) {
1486            return Err(not_found("Function", function_name));
1487        }
1488        let function_arn = format!(
1489            "arn:aws:lambda:{}:{}:function:{}",
1490            state.region, state.account_id, function_name
1491        );
1492        let cfg = FunctionUrlConfig {
1493            function_arn: function_arn.clone(),
1494            function_url: format!(
1495                "https://{function_name}.lambda-url.{}.on.aws/",
1496                state.region
1497            ),
1498            auth_type: auth_type.clone(),
1499            cors: body.get("Cors").cloned(),
1500            creation_time: now,
1501            last_modified_time: now,
1502            invoke_mode: {
1503                let m = body["InvokeMode"]
1504                    .as_str()
1505                    .unwrap_or("BUFFERED")
1506                    .to_string();
1507                if m != "BUFFERED" && m != "RESPONSE_STREAM" {
1508                    return Err(AwsServiceError::aws_error(
1509                        StatusCode::BAD_REQUEST,
1510                        "InvalidParameterValueException",
1511                        format!(
1512                            "Invalid InvokeMode value '{}'; expected 'BUFFERED' or 'RESPONSE_STREAM'",
1513                            m
1514                        ),
1515                    ));
1516                }
1517                m
1518            },
1519        };
1520        state
1521            .function_url_configs
1522            .insert(function_name.to_string(), cfg.clone());
1523        // `CreateFunctionUrlConfigResponse` lacks `LastModifiedTime` —
1524        // that member only appears on `Get`/`Update` responses. Strip it
1525        // before returning so strict shape validators don't reject it.
1526        let mut created = Self::function_url_config_json(&cfg);
1527        if let Some(obj) = created.as_object_mut() {
1528            obj.remove("LastModifiedTime");
1529        }
1530        ok(created)
1531    }
1532
1533    fn get_function_url_config(
1534        &self,
1535        function_name: &str,
1536        account_id: &str,
1537    ) -> Result<AwsResponse, AwsServiceError> {
1538        let region = self.region_for(account_id);
1539        self.with_state_read(account_id, &region, |state| {
1540            state
1541                .function_url_configs
1542                .get(function_name)
1543                .map(|c| ok(Self::function_url_config_json(c)))
1544                .unwrap_or_else(|| Err(not_found("FunctionUrlConfig", function_name)))
1545        })
1546    }
1547
1548    fn update_function_url_config(
1549        &self,
1550        function_name: &str,
1551        req: &AwsRequest,
1552    ) -> Result<AwsResponse, AwsServiceError> {
1553        let body = body(req);
1554        let mut accounts = self.state.write();
1555        let state = accounts.get_or_create(&req.account_id);
1556        let cfg = state
1557            .function_url_configs
1558            .get_mut(function_name)
1559            .ok_or_else(|| not_found("FunctionUrlConfig", function_name))?;
1560        if let Some(a) = body["AuthType"].as_str() {
1561            cfg.auth_type = a.to_string();
1562        }
1563        if let Some(c) = body.get("Cors") {
1564            cfg.cors = Some(c.clone());
1565        }
1566        if let Some(m) = body["InvokeMode"].as_str() {
1567            cfg.invoke_mode = m.to_string();
1568        }
1569        cfg.last_modified_time = Utc::now();
1570        let snapshot = cfg.clone();
1571        ok(Self::function_url_config_json(&snapshot))
1572    }
1573
1574    fn delete_function_url_config(
1575        &self,
1576        function_name: &str,
1577        account_id: &str,
1578    ) -> Result<AwsResponse, AwsServiceError> {
1579        let mut accounts = self.state.write();
1580        let state = accounts.get_or_create(account_id);
1581        state.function_url_configs.remove(function_name);
1582        empty()
1583    }
1584
1585    fn list_function_url_configs(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1586        let region = self.region_for(account_id);
1587        self.with_state_read(account_id, &region, |state| {
1588            let configs: Vec<Value> = state
1589                .function_url_configs
1590                .values()
1591                .map(Self::function_url_config_json)
1592                .collect();
1593            ok(json!({"FunctionUrlConfigs": configs}))
1594        })
1595    }
1596
1597    // ── Concurrency ──
1598
1599    fn put_function_concurrency(
1600        &self,
1601        function_name: &str,
1602        req: &AwsRequest,
1603    ) -> Result<AwsResponse, AwsServiceError> {
1604        let body = body(req);
1605        let n = body["ReservedConcurrentExecutions"]
1606            .as_i64()
1607            .ok_or_else(|| missing("ReservedConcurrentExecutions"))?;
1608        // Smithy `range(min=0)` — negative values are invalid.
1609        if n < 0 {
1610            return Err(AwsServiceError::aws_error(
1611                StatusCode::BAD_REQUEST,
1612                "InvalidParameterValueException",
1613                format!("ReservedConcurrentExecutions must be >= 0 (got {})", n),
1614            ));
1615        }
1616        let mut accounts = self.state.write();
1617        let state = accounts.get_or_create(&req.account_id);
1618        state
1619            .function_concurrency
1620            .insert(function_name.to_string(), n);
1621        ok(json!({"ReservedConcurrentExecutions": n}))
1622    }
1623
1624    fn get_function_concurrency(
1625        &self,
1626        function_name: &str,
1627        account_id: &str,
1628    ) -> Result<AwsResponse, AwsServiceError> {
1629        let region = self.region_for(account_id);
1630        self.with_state_read(account_id, &region, |state| {
1631            let n = state
1632                .function_concurrency
1633                .get(function_name)
1634                .copied()
1635                .unwrap_or(0);
1636            ok(json!({"ReservedConcurrentExecutions": n}))
1637        })
1638    }
1639
1640    fn delete_function_concurrency(
1641        &self,
1642        function_name: &str,
1643        account_id: &str,
1644    ) -> Result<AwsResponse, AwsServiceError> {
1645        let mut accounts = self.state.write();
1646        let state = accounts.get_or_create(account_id);
1647        state.function_concurrency.remove(function_name);
1648        empty()
1649    }
1650
1651    fn pc_key(function: &str, qualifier: &str) -> String {
1652        format!("{function}:{qualifier}")
1653    }
1654
1655    fn put_provisioned_concurrency(
1656        &self,
1657        function_name: &str,
1658        req: &AwsRequest,
1659    ) -> Result<AwsResponse, AwsServiceError> {
1660        let body = body(req);
1661        let qualifier = require_qualifier(req)?;
1662        let requested = body["ProvisionedConcurrentExecutions"]
1663            .as_i64()
1664            .ok_or_else(|| missing("ProvisionedConcurrentExecutions"))?;
1665        // Smithy `range(min=1)` — zero and negatives are invalid.
1666        if requested < 1 {
1667            return Err(AwsServiceError::aws_error(
1668                StatusCode::BAD_REQUEST,
1669                "InvalidParameterValueException",
1670                format!(
1671                    "ProvisionedConcurrentExecutions must be >= 1 (got {})",
1672                    requested
1673                ),
1674            ));
1675        }
1676        let mut accounts = self.state.write();
1677        let state = accounts.get_or_create(&req.account_id);
1678        let cfg = ProvisionedConcurrencyConfig {
1679            requested,
1680            allocated: requested,
1681            status: "READY".to_string(),
1682            last_modified: Utc::now(),
1683        };
1684        state
1685            .provisioned_concurrency
1686            .insert(Self::pc_key(function_name, &qualifier), cfg.clone());
1687        ok(json!({
1688            "RequestedProvisionedConcurrentExecutions": cfg.requested,
1689            "AvailableProvisionedConcurrentExecutions": cfg.allocated,
1690            "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
1691            "Status": cfg.status,
1692            "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1693        }))
1694    }
1695
1696    fn get_provisioned_concurrency(
1697        &self,
1698        function_name: &str,
1699        req: &AwsRequest,
1700    ) -> Result<AwsResponse, AwsServiceError> {
1701        let qualifier = require_qualifier(req)?;
1702        let region = self.region_for(&req.account_id);
1703        self.with_state_read(&req.account_id, &region, |state| {
1704            state
1705                .provisioned_concurrency
1706                .get(&Self::pc_key(function_name, &qualifier))
1707                .map(|cfg| ok(json!({
1708                    "RequestedProvisionedConcurrentExecutions": cfg.requested,
1709                    "AvailableProvisionedConcurrentExecutions": cfg.allocated,
1710                    "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
1711                    "Status": cfg.status,
1712                    "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1713                })))
1714                .unwrap_or_else(|| Err(not_found("ProvisionedConcurrencyConfig", function_name)))
1715        })
1716    }
1717
1718    fn delete_provisioned_concurrency(
1719        &self,
1720        function_name: &str,
1721        req: &AwsRequest,
1722    ) -> Result<AwsResponse, AwsServiceError> {
1723        let qualifier = require_qualifier(req)?;
1724        let mut accounts = self.state.write();
1725        let state = accounts.get_or_create(&req.account_id);
1726        state
1727            .provisioned_concurrency
1728            .remove(&Self::pc_key(function_name, &qualifier));
1729        empty()
1730    }
1731
1732    fn list_provisioned_concurrency(
1733        &self,
1734        function_name: &str,
1735        account_id: &str,
1736    ) -> Result<AwsResponse, AwsServiceError> {
1737        let region = self.region_for(account_id);
1738        self.with_state_read(account_id, &region, |state| {
1739            let prefix = format!("{function_name}:");
1740            let configs: Vec<Value> = state
1741                .provisioned_concurrency
1742                .iter()
1743                .filter(|(k, _)| k.starts_with(&prefix))
1744                .map(|(k, cfg)| {
1745                    let qualifier = k.split(':').next_back().unwrap_or("$LATEST");
1746                    json!({
1747                        "FunctionArn": format!(
1748                            "arn:aws:lambda:{}:{}:function:{}:{}",
1749                            state.region, state.account_id, function_name, qualifier
1750                        ),
1751                        "Status": cfg.status,
1752                        "RequestedProvisionedConcurrentExecutions": cfg.requested,
1753                        "AvailableProvisionedConcurrentExecutions": cfg.allocated,
1754                        "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
1755                        "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1756                    })
1757                })
1758                .collect();
1759            ok(json!({"ProvisionedConcurrencyConfigs": configs}))
1760        })
1761    }
1762
1763    // ── Code signing ──
1764
1765    fn create_code_signing_config(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1766        let body = body(req);
1767        let mut accounts = self.state.write();
1768        let state = accounts.get_or_create(&req.account_id);
1769        let id = id_from_time("csc-");
1770        let arn = format!(
1771            "arn:aws:lambda:{}:{}:code-signing-config:{}",
1772            state.region, state.account_id, id
1773        );
1774        let publishers: Vec<String> = body
1775            .get("AllowedPublishers")
1776            .and_then(|v| v.get("SigningProfileVersionArns"))
1777            .and_then(|v| v.as_array())
1778            .map(|arr| {
1779                arr.iter()
1780                    .filter_map(|x| x.as_str().map(String::from))
1781                    .collect()
1782            })
1783            .unwrap_or_default();
1784        let csc = CodeSigningConfig {
1785            csc_id: id.clone(),
1786            csc_arn: arn,
1787            description: body["Description"].as_str().unwrap_or("").to_string(),
1788            allowed_publishers: publishers,
1789            untrusted_artifact_action: body["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"]
1790                .as_str()
1791                .unwrap_or("Warn")
1792                .to_string(),
1793            last_modified: Utc::now(),
1794        };
1795        state.code_signing_configs.insert(id, csc.clone());
1796        ok(json!({"CodeSigningConfig": code_signing_json(&csc)}))
1797    }
1798
1799    fn get_code_signing_config(
1800        &self,
1801        csc_id: &str,
1802        account_id: &str,
1803    ) -> Result<AwsResponse, AwsServiceError> {
1804        let id = extract_csc_id(csc_id);
1805        let region = self.region_for(account_id);
1806        self.with_state_read(account_id, &region, |state| {
1807            state
1808                .code_signing_configs
1809                .get(&id)
1810                .map(|c| ok(json!({"CodeSigningConfig": code_signing_json(c)})))
1811                .unwrap_or_else(|| Err(not_found("CodeSigningConfig", &id)))
1812        })
1813    }
1814
1815    fn update_code_signing_config(
1816        &self,
1817        csc_id: &str,
1818        req: &AwsRequest,
1819    ) -> Result<AwsResponse, AwsServiceError> {
1820        let body = body(req);
1821        let mut accounts = self.state.write();
1822        let state = accounts.get_or_create(&req.account_id);
1823        let id = extract_csc_id(csc_id);
1824        let csc = state
1825            .code_signing_configs
1826            .get_mut(&id)
1827            .ok_or_else(|| not_found("CodeSigningConfig", &id))?;
1828        if let Some(d) = body["Description"].as_str() {
1829            csc.description = d.to_string();
1830        }
1831        if let Some(action) = body["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"].as_str()
1832        {
1833            csc.untrusted_artifact_action = action.to_string();
1834        }
1835        csc.last_modified = Utc::now();
1836        ok(json!({"CodeSigningConfig": code_signing_json(csc)}))
1837    }
1838
1839    fn delete_code_signing_config(
1840        &self,
1841        csc_id: &str,
1842        account_id: &str,
1843    ) -> Result<AwsResponse, AwsServiceError> {
1844        let id = extract_csc_id(csc_id);
1845        let mut accounts = self.state.write();
1846        let state = accounts.get_or_create(account_id);
1847        state.code_signing_configs.remove(&id);
1848        empty()
1849    }
1850
1851    fn list_code_signing_configs(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1852        let region = self.region_for(account_id);
1853        self.with_state_read(account_id, &region, |state| {
1854            let cfgs: Vec<Value> = state
1855                .code_signing_configs
1856                .values()
1857                .map(code_signing_json)
1858                .collect();
1859            ok(json!({"CodeSigningConfigs": cfgs}))
1860        })
1861    }
1862
1863    fn put_function_code_signing(
1864        &self,
1865        function_name: &str,
1866        req: &AwsRequest,
1867    ) -> Result<AwsResponse, AwsServiceError> {
1868        let body = body(req);
1869        let csc_arn = body["CodeSigningConfigArn"]
1870            .as_str()
1871            .ok_or_else(|| missing("CodeSigningConfigArn"))?
1872            .to_string();
1873        // Smithy length bound: max 200. Reject overlong inputs rather
1874        // than persisting a malformed ARN.
1875        if csc_arn.chars().count() > 200 {
1876            return Err(AwsServiceError::aws_error(
1877                StatusCode::BAD_REQUEST,
1878                "InvalidParameterValueException",
1879                "CodeSigningConfigArn exceeds the 200-character maximum",
1880            ));
1881        }
1882        let mut accounts = self.state.write();
1883        let state = accounts.get_or_create(&req.account_id);
1884        state
1885            .function_code_signing
1886            .insert(function_name.to_string(), csc_arn.clone());
1887        ok(json!({
1888            "CodeSigningConfigArn": csc_arn,
1889            "FunctionName": function_name,
1890        }))
1891    }
1892
1893    fn get_function_code_signing(
1894        &self,
1895        function_name: &str,
1896        account_id: &str,
1897    ) -> Result<AwsResponse, AwsServiceError> {
1898        let region = self.region_for(account_id);
1899        self.with_state_read(account_id, &region, |state| {
1900            let arn = state
1901                .function_code_signing
1902                .get(function_name)
1903                .cloned()
1904                .unwrap_or_default();
1905            ok(json!({
1906                "CodeSigningConfigArn": arn,
1907                "FunctionName": function_name,
1908            }))
1909        })
1910    }
1911
1912    fn delete_function_code_signing(
1913        &self,
1914        function_name: &str,
1915        account_id: &str,
1916    ) -> Result<AwsResponse, AwsServiceError> {
1917        let mut accounts = self.state.write();
1918        let state = accounts.get_or_create(account_id);
1919        state.function_code_signing.remove(function_name);
1920        empty()
1921    }
1922
1923    fn list_functions_by_code_signing(
1924        &self,
1925        csc_id: &str,
1926        account_id: &str,
1927    ) -> Result<AwsResponse, AwsServiceError> {
1928        let id = extract_csc_id(csc_id);
1929        let region = self.region_for(account_id);
1930        self.with_state_read(account_id, &region, |state| {
1931            let funcs: Vec<&String> = state
1932                .function_code_signing
1933                .iter()
1934                .filter(|(_, v)| v.contains(&id))
1935                .map(|(k, _)| k)
1936                .collect();
1937            ok(json!({"FunctionArns": funcs}))
1938        })
1939    }
1940
1941    // ── Event invoke ──
1942
1943    fn ev_key(function: &str, qualifier: &str) -> String {
1944        format!("{function}:{qualifier}")
1945    }
1946
1947    fn put_function_event_invoke(
1948        &self,
1949        function_name: &str,
1950        req: &AwsRequest,
1951    ) -> Result<AwsResponse, AwsServiceError> {
1952        let body = body(req);
1953        let qualifier = parse_qualifier(req);
1954        let function_arn = format!(
1955            "arn:aws:lambda:{}:{}:function:{}",
1956            self.region_for(&req.account_id),
1957            req.account_id,
1958            function_name
1959        );
1960        // Validate Smithy ranges before persisting:
1961        //   MaximumEventAgeInSeconds: 60..=21600
1962        //   MaximumRetryAttempts:     0..=2
1963        let event_age = body["MaximumEventAgeInSeconds"].as_i64().unwrap_or(21600);
1964        if !(60..=21600).contains(&event_age) {
1965            return Err(AwsServiceError::aws_error(
1966                StatusCode::BAD_REQUEST,
1967                "InvalidParameterValueException",
1968                format!(
1969                    "MaximumEventAgeInSeconds must be 60..21600 (got {})",
1970                    event_age
1971                ),
1972            ));
1973        }
1974        let retries = body["MaximumRetryAttempts"].as_i64().unwrap_or(2);
1975        if !(0..=2).contains(&retries) {
1976            return Err(AwsServiceError::aws_error(
1977                StatusCode::BAD_REQUEST,
1978                "InvalidParameterValueException",
1979                format!("MaximumRetryAttempts must be 0..2 (got {})", retries),
1980            ));
1981        }
1982        let cfg = EventInvokeConfig {
1983            function_arn: function_arn.clone(),
1984            maximum_event_age: event_age,
1985            maximum_retry_attempts: retries,
1986            destination_config: body.get("DestinationConfig").cloned(),
1987            last_modified: Utc::now(),
1988        };
1989        let mut accounts = self.state.write();
1990        let state = accounts.get_or_create(&req.account_id);
1991        state
1992            .event_invoke_configs
1993            .insert(Self::ev_key(function_name, &qualifier), cfg.clone());
1994        ok(event_invoke_json(&cfg))
1995    }
1996
1997    fn get_function_event_invoke(
1998        &self,
1999        function_name: &str,
2000        req: &AwsRequest,
2001    ) -> Result<AwsResponse, AwsServiceError> {
2002        let qualifier = parse_qualifier(req);
2003        let region = self.region_for(&req.account_id);
2004        self.with_state_read(&req.account_id, &region, |state| {
2005            state
2006                .event_invoke_configs
2007                .get(&Self::ev_key(function_name, &qualifier))
2008                .map(|c| ok(event_invoke_json(c)))
2009                .unwrap_or_else(|| Err(not_found("EventInvokeConfig", function_name)))
2010        })
2011    }
2012
2013    fn delete_function_event_invoke(
2014        &self,
2015        function_name: &str,
2016        req: &AwsRequest,
2017    ) -> Result<AwsResponse, AwsServiceError> {
2018        let qualifier = parse_qualifier(req);
2019        let mut accounts = self.state.write();
2020        let state = accounts.get_or_create(&req.account_id);
2021        state
2022            .event_invoke_configs
2023            .remove(&Self::ev_key(function_name, &qualifier));
2024        empty()
2025    }
2026
2027    fn list_function_event_invoke(
2028        &self,
2029        function_name: &str,
2030        account_id: &str,
2031    ) -> Result<AwsResponse, AwsServiceError> {
2032        let region = self.region_for(account_id);
2033        self.with_state_read(account_id, &region, |state| {
2034            let prefix = format!("{function_name}:");
2035            let configs: Vec<Value> = state
2036                .event_invoke_configs
2037                .iter()
2038                .filter(|(k, _)| k.starts_with(&prefix))
2039                .map(|(_, c)| event_invoke_json(c))
2040                .collect();
2041            ok(json!({"FunctionEventInvokeConfigs": configs}))
2042        })
2043    }
2044
2045    // ── Runtime management ──
2046
2047    fn put_runtime_management(
2048        &self,
2049        function_name: &str,
2050        req: &AwsRequest,
2051    ) -> Result<AwsResponse, AwsServiceError> {
2052        let body = body(req);
2053        let qualifier = parse_qualifier(req);
2054        // `UpdateRuntimeOn` is `@required` in the model; reject the
2055        // request rather than silently defaulting to `Auto`.
2056        let update_runtime_on = body["UpdateRuntimeOn"]
2057            .as_str()
2058            .ok_or_else(|| missing("UpdateRuntimeOn"))?
2059            .to_string();
2060        // `UpdateRuntimeOn` enum: Auto | Manual | FunctionUpdate.
2061        if !matches!(
2062            update_runtime_on.as_str(),
2063            "Auto" | "Manual" | "FunctionUpdate"
2064        ) {
2065            return Err(AwsServiceError::aws_error(
2066                StatusCode::BAD_REQUEST,
2067                "InvalidParameterValueException",
2068                format!(
2069                    "Invalid UpdateRuntimeOn value '{}'; expected 'Auto', 'Manual', or 'FunctionUpdate'",
2070                    update_runtime_on
2071                ),
2072            ));
2073        }
2074        let runtime_version_arn = body["RuntimeVersionArn"].as_str().unwrap_or("").to_string();
2075        // `RuntimeVersionArn` Smithy shape: length 26..2048. Empty
2076        // means "unset" (valid); any non-empty value must satisfy the
2077        // minimum.
2078        if !runtime_version_arn.is_empty()
2079            && (runtime_version_arn.chars().count() < 26
2080                || runtime_version_arn.chars().count() > 2048)
2081        {
2082            return Err(AwsServiceError::aws_error(
2083                StatusCode::BAD_REQUEST,
2084                "InvalidParameterValueException",
2085                "RuntimeVersionArn must be 26..2048 characters",
2086            ));
2087        }
2088        let cfg = RuntimeManagementConfig {
2089            update_runtime_on,
2090            runtime_version_arn,
2091        };
2092        let mut accounts = self.state.write();
2093        let state = accounts.get_or_create(&req.account_id);
2094        state
2095            .runtime_management
2096            .insert(format!("{function_name}:{qualifier}"), cfg.clone());
2097        ok(json!({
2098            "FunctionArn": Arn::new("lambda", &state.region, &state.account_id, &format!("function:{function_name}:{qualifier}")).to_string(),
2099            "UpdateRuntimeOn": cfg.update_runtime_on,
2100            "RuntimeVersionArn": cfg.runtime_version_arn,
2101        }))
2102    }
2103
2104    fn get_runtime_management(
2105        &self,
2106        function_name: &str,
2107        req: &AwsRequest,
2108    ) -> Result<AwsResponse, AwsServiceError> {
2109        let qualifier = parse_qualifier(req);
2110        let region = self.region_for(&req.account_id);
2111        self.with_state_read(&req.account_id, &region, |state| {
2112            let cfg = state
2113                .runtime_management
2114                .get(&format!("{function_name}:{qualifier}"))
2115                .cloned()
2116                .unwrap_or(RuntimeManagementConfig {
2117                    update_runtime_on: "Auto".to_string(),
2118                    runtime_version_arn: String::new(),
2119                });
2120            ok(json!({
2121                "FunctionArn": format!(
2122                    "arn:aws:lambda:{}:{}:function:{}:{}",
2123                    state.region, state.account_id, function_name, qualifier
2124                ),
2125                "UpdateRuntimeOn": cfg.update_runtime_on,
2126                "RuntimeVersionArn": cfg.runtime_version_arn,
2127            }))
2128        })
2129    }
2130
2131    // ── Scaling ──
2132
2133    fn put_scaling_config(
2134        &self,
2135        function_name: &str,
2136        req: &AwsRequest,
2137    ) -> Result<AwsResponse, AwsServiceError> {
2138        let _qualifier = require_qualifier(req)?;
2139        let body = body(req);
2140        let inner = body
2141            .get("FunctionScalingConfig")
2142            .cloned()
2143            .unwrap_or_else(|| json!({}));
2144        let cfg = FunctionScalingConfig {
2145            min_execution_environments: inner["MinExecutionEnvironments"].as_i64(),
2146            max_execution_environments: inner["MaxExecutionEnvironments"].as_i64(),
2147        };
2148        let mut accounts = self.state.write();
2149        let state = accounts.get_or_create(&req.account_id);
2150        state.scaling_configs.insert(function_name.to_string(), cfg);
2151        // `PutFunctionScalingConfigResponse` only carries `FunctionState`
2152        // (the post-update steady state). Pending → ready is instant in
2153        // fakecloud since there's no real fleet to scale.
2154        ok(json!({ "FunctionState": "Ready" }))
2155    }
2156
2157    fn get_scaling_config(
2158        &self,
2159        function_name: &str,
2160        account_id: &str,
2161    ) -> Result<AwsResponse, AwsServiceError> {
2162        // Caller validates `Qualifier` via `require_qualifier` before
2163        // delegating here; reads don't need it post-validation since
2164        // scaling config is per-function in fakecloud.
2165        let region = self.region_for(account_id);
2166        self.with_state_read(account_id, &region, |state| {
2167            let cfg = state
2168                .scaling_configs
2169                .get(function_name)
2170                .cloned()
2171                .unwrap_or_default();
2172            let mut applied = serde_json::Map::new();
2173            if let Some(v) = cfg.min_execution_environments {
2174                applied.insert("MinExecutionEnvironments".into(), json!(v));
2175            }
2176            if let Some(v) = cfg.max_execution_environments {
2177                applied.insert("MaxExecutionEnvironments".into(), json!(v));
2178            }
2179            let function_arn = format!(
2180                "arn:aws:lambda:{}:{}:function:{}",
2181                state.region, state.account_id, function_name
2182            );
2183            ok(json!({
2184                "FunctionArn": function_arn,
2185                "AppliedFunctionScalingConfig": Value::Object(applied.clone()),
2186                "RequestedFunctionScalingConfig": Value::Object(applied),
2187            }))
2188        })
2189    }
2190
2191    // ── Recursion ──
2192
2193    fn put_recursion_config(
2194        &self,
2195        function_name: &str,
2196        req: &AwsRequest,
2197    ) -> Result<AwsResponse, AwsServiceError> {
2198        let body = body(req);
2199        // `RecursiveLoop` is `@required` on the model — reject missing
2200        // values instead of defaulting silently to `Terminate`. The
2201        // enum admits only `Allow` and `Terminate`.
2202        let mode = body["RecursiveLoop"]
2203            .as_str()
2204            .ok_or_else(|| missing("RecursiveLoop"))?
2205            .to_string();
2206        if mode != "Allow" && mode != "Terminate" {
2207            return Err(AwsServiceError::aws_error(
2208                StatusCode::BAD_REQUEST,
2209                "InvalidParameterValueException",
2210                format!(
2211                    "Invalid RecursiveLoop value '{}'; expected 'Allow' or 'Terminate'",
2212                    mode
2213                ),
2214            ));
2215        }
2216        let mut accounts = self.state.write();
2217        let state = accounts.get_or_create(&req.account_id);
2218        state
2219            .recursion_configs
2220            .insert(function_name.to_string(), mode.clone());
2221        ok(json!({"RecursiveLoop": mode}))
2222    }
2223
2224    fn get_recursion_config(
2225        &self,
2226        function_name: &str,
2227        account_id: &str,
2228    ) -> Result<AwsResponse, AwsServiceError> {
2229        let region = self.region_for(account_id);
2230        self.with_state_read(account_id, &region, |state| {
2231            let mode = state
2232                .recursion_configs
2233                .get(function_name)
2234                .cloned()
2235                .unwrap_or_else(|| "Terminate".to_string());
2236            ok(json!({"RecursiveLoop": mode}))
2237        })
2238    }
2239
2240    // ── Tags ──
2241
2242    fn tag_resource(
2243        &self,
2244        resource_arn: &str,
2245        req: &AwsRequest,
2246    ) -> Result<AwsResponse, AwsServiceError> {
2247        let body = body(req);
2248        let new_tags: Vec<(String, String)> = body
2249            .get("Tags")
2250            .and_then(|v| v.as_object())
2251            .map(|m| {
2252                m.iter()
2253                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
2254                    .collect()
2255            })
2256            .unwrap_or_default();
2257        // SDKs URL-encode `:` in the path so the ARN arrives as
2258        // `arn%3Aaws%3Alambda%3A...`; decode before parsing.
2259        let resource_arn_decoded = decode_query_segment(resource_arn);
2260        let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
2261            AwsServiceError::aws_error(
2262                StatusCode::BAD_REQUEST,
2263                "InvalidParameterValueException",
2264                format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
2265            )
2266        })?;
2267        let mut accounts = self.state.write();
2268        let state = accounts.get_or_create(&req.account_id);
2269        let func = state.functions.get_mut(&name).ok_or_else(|| {
2270            AwsServiceError::aws_error(
2271                StatusCode::NOT_FOUND,
2272                "ResourceNotFoundException",
2273                format!("Function not found: {name}"),
2274            )
2275        })?;
2276        // Single source of truth: per-function `tags`. `GetFunction`,
2277        // `ListTagsForResource`, and `UntagResource` all read here.
2278        for (k, v) in new_tags {
2279            func.tags.insert(k, v);
2280        }
2281        empty()
2282    }
2283
2284    fn untag_resource(
2285        &self,
2286        resource_arn: &str,
2287        req: &AwsRequest,
2288    ) -> Result<AwsResponse, AwsServiceError> {
2289        // AWS sends keys as repeated `tagKeys=K1&tagKeys=K2` query
2290        // params per the Smithy model (`httpQuery: "tagKeys"`). The
2291        // dispatcher's deduplicated `query_params` HashMap collapses
2292        // repeats, so parse the raw query string for every occurrence.
2293        // Also accept `tagKeys.1=K1` / `tagKeys.member.1=K1` for SDKs
2294        // that serialize list params indexed-style.
2295        //
2296        // As a defensive fallback we also accept a JSON body of the
2297        // form `{"TagKeys": [...]}` / `{"tagKeys": [...]}` for clients
2298        // that mistakenly send the tag keys in the body. Query
2299        // parameters win when both are present, since query is the
2300        // AWS-canonical wire format.
2301        let mut keys: Vec<String> = Vec::new();
2302        for (k, v) in parse_query_pairs(&req.raw_query) {
2303            if k == "tagKeys" || k.starts_with("tagKeys.") {
2304                keys.push(v);
2305            }
2306        }
2307        if keys.is_empty() {
2308            let parsed = body(req);
2309            for field in ["TagKeys", "tagKeys"] {
2310                if let Some(arr) = parsed.get(field).and_then(|v| v.as_array()) {
2311                    for v in arr {
2312                        if let Some(s) = v.as_str() {
2313                            keys.push(s.to_string());
2314                        }
2315                    }
2316                    if !keys.is_empty() {
2317                        break;
2318                    }
2319                }
2320            }
2321        }
2322        let resource_arn_decoded = decode_query_segment(resource_arn);
2323        let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
2324            AwsServiceError::aws_error(
2325                StatusCode::BAD_REQUEST,
2326                "InvalidParameterValueException",
2327                format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
2328            )
2329        })?;
2330        let mut accounts = self.state.write();
2331        let state = accounts.get_or_create(&req.account_id);
2332        let func = state.functions.get_mut(&name).ok_or_else(|| {
2333            AwsServiceError::aws_error(
2334                StatusCode::NOT_FOUND,
2335                "ResourceNotFoundException",
2336                format!("Function not found: {name}"),
2337            )
2338        })?;
2339        for k in &keys {
2340            func.tags.remove(k);
2341        }
2342        empty()
2343    }
2344
2345    fn list_tags(
2346        &self,
2347        resource_arn: &str,
2348        account_id: &str,
2349    ) -> Result<AwsResponse, AwsServiceError> {
2350        let resource_arn_decoded = decode_query_segment(resource_arn);
2351        let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
2352            AwsServiceError::aws_error(
2353                StatusCode::BAD_REQUEST,
2354                "InvalidParameterValueException",
2355                format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
2356            )
2357        })?;
2358        let region = self.region_for(account_id);
2359        self.with_state_read(account_id, &region, |state| {
2360            let func = state.functions.get(&name).ok_or_else(|| {
2361                AwsServiceError::aws_error(
2362                    StatusCode::NOT_FOUND,
2363                    "ResourceNotFoundException",
2364                    format!("Function not found: {name}"),
2365                )
2366            })?;
2367            let tags: serde_json::Map<String, Value> = func
2368                .tags
2369                .iter()
2370                .map(|(k, v)| (k.clone(), Value::String(v.clone())))
2371                .collect();
2372            ok(json!({"Tags": tags}))
2373        })
2374    }
2375
    // ── Event source mappings, region lookup, streaming invoke ──
2377
2378    fn update_event_source_mapping_handler(
2379        &self,
2380        uuid: &str,
2381        req: &AwsRequest,
2382    ) -> Result<AwsResponse, AwsServiceError> {
2383        let body = body(req);
2384        let mut accounts = self.state.write();
2385        let state = accounts.get_or_create(&req.account_id);
2386        let esm = state
2387            .event_source_mappings
2388            .get_mut(uuid)
2389            .ok_or_else(|| not_found("EventSourceMapping", uuid))?;
2390        if let Some(b) = body["BatchSize"].as_i64() {
2391            esm.batch_size = b;
2392        }
2393        if let Some(name) = body["FunctionName"].as_str() {
2394            esm.function_arn = format!(
2395                "arn:aws:lambda:{}:{}:function:{}",
2396                state.region, state.account_id, name
2397            );
2398        }
2399        if let Some(filters) = body
2400            .get("FilterCriteria")
2401            .and_then(|v| v.get("Filters"))
2402            .and_then(|v| v.as_array())
2403        {
2404            esm.filter_patterns = filters
2405                .iter()
2406                .filter_map(|f| f.get("Pattern").and_then(|p| p.as_str()).map(String::from))
2407                .collect();
2408        }
2409        if let Some(types) = body.get("FunctionResponseTypes").and_then(|v| v.as_array()) {
2410            esm.function_response_types = types
2411                .iter()
2412                .filter_map(|v| v.as_str().map(String::from))
2413                .collect();
2414        }
2415        if let Some(w) = body
2416            .get("MaximumBatchingWindowInSeconds")
2417            .and_then(|v| v.as_i64())
2418        {
2419            esm.maximum_batching_window_in_seconds = Some(w);
2420        }
2421        if let Some(p) = body.get("ParallelizationFactor").and_then(|v| v.as_i64()) {
2422            esm.parallelization_factor = Some(p);
2423        }
2424        if let Some(s) = body.get("KMSKeyArn").and_then(|v| v.as_str()) {
2425            esm.kms_key_arn = Some(s.to_string());
2426        }
2427        if let Some(mc) = body.get("MetricsConfig") {
2428            esm.metrics_config = Some(mc.clone());
2429        }
2430        if let Some(dc) = body.get("DestinationConfig") {
2431            esm.destination_config = Some(dc.clone());
2432        }
2433        if let Some(n) = body.get("MaximumRetryAttempts").and_then(|v| v.as_i64()) {
2434            esm.maximum_retry_attempts = Some(n);
2435        }
2436        if let Some(n) = body
2437            .get("MaximumRecordAgeInSeconds")
2438            .and_then(|v| v.as_i64())
2439        {
2440            esm.maximum_record_age_in_seconds = Some(n);
2441        }
2442        if let Some(b) = body
2443            .get("BisectBatchOnFunctionError")
2444            .and_then(|v| v.as_bool())
2445        {
2446            esm.bisect_batch_on_function_error = Some(b);
2447        }
2448        if let Some(n) = body.get("TumblingWindowInSeconds").and_then(|v| v.as_i64()) {
2449            esm.tumbling_window_in_seconds = Some(n);
2450        }
2451        let mut body_json = json!({
2452            "UUID": esm.uuid,
2453            "FunctionArn": esm.function_arn,
2454            "EventSourceArn": esm.event_source_arn,
2455            "BatchSize": esm.batch_size,
2456            "State": "Enabled",
2457            "StateTransitionReason": "USER_INITIATED",
2458            "LastModified": chrono::Utc::now().timestamp() as f64,
2459        });
2460        let obj = body_json.as_object_mut().expect("json! built object");
2461        if !esm.filter_patterns.is_empty() {
2462            obj.insert(
2463                "FilterCriteria".into(),
2464                json!({
2465                    "Filters": esm
2466                        .filter_patterns
2467                        .iter()
2468                        .map(|p| json!({"Pattern": p}))
2469                        .collect::<Vec<_>>(),
2470                }),
2471            );
2472        }
2473        if !esm.function_response_types.is_empty() {
2474            obj.insert(
2475                "FunctionResponseTypes".into(),
2476                json!(esm.function_response_types),
2477            );
2478        }
2479        if let Some(w) = esm.maximum_batching_window_in_seconds {
2480            obj.insert("MaximumBatchingWindowInSeconds".into(), json!(w));
2481        }
2482        if let Some(p) = esm.parallelization_factor {
2483            obj.insert("ParallelizationFactor".into(), json!(p));
2484        }
2485        ok(body_json)
2486    }
2487
2488    fn region_for(&self, account_id: &str) -> String {
2489        let accounts = self.state.read();
2490        accounts
2491            .get(account_id)
2492            .map(|s| s.region.clone())
2493            .unwrap_or_else(|| "us-east-1".to_string())
2494    }
2495
2496    /// `InvokeWithResponseStream` — invoke the function and serialize
2497    /// its response as a sequence of `application/vnd.amazon.eventstream`
2498    /// frames. AWS uses this protocol for response-streaming Lambda
2499    /// invocations (Node.js `awslambda.streamifyResponse`, Python
2500    /// streaming handlers, custom runtimes that flush mid-handler).
2501    ///
2502    /// On success: zero or more `PayloadChunk` events (one per chunk
2503    /// the RIE flushed) followed by an `InvokeComplete` event with
2504    /// `ErrorCode = null`. On a function error (uncaught exception in
2505    /// the handler) or an infrastructure error (timeout, container
2506    /// crash): an `InvokeComplete` with non-null `ErrorCode`/
2507    /// `ErrorDetails`. The HTTP status itself is always 200 — failures
2508    /// surface inside the trailing event, matching AWS.
2509    pub(crate) async fn invoke_with_response_stream(
2510        &self,
2511        function_name: &str,
2512        account_id: &str,
2513        req: &AwsRequest,
2514    ) -> Result<AwsResponse, AwsServiceError> {
2515        // Resolve the function under the same rules as buffered Invoke
2516        // — qualifier, version snapshots, attached layers, code-zip
2517        // presence — but without the InvocationType branch (streaming
2518        // is always synchronous).
2519        let qualifier = req.query_params.get("Qualifier").map(String::as_str);
2520
2521        let resolved_version: Option<String> = {
2522            let accounts = self.state.read();
2523            let empty = LambdaState::new(account_id, "");
2524            let state = accounts.get(account_id).unwrap_or(&empty);
2525            crate::service::resolve_qualifier_to_version(state, function_name, qualifier)
2526        };
2527        let executed_version = resolved_version
2528            .clone()
2529            .unwrap_or_else(|| "$LATEST".to_string());
2530
2531        let (func, layer_zips) = {
2532            let accounts = self.state.read();
2533            let empty = LambdaState::new(account_id, "");
2534            let state = accounts.get(account_id).unwrap_or(&empty);
2535            let func = match resolved_version.as_deref() {
2536                Some(v) => state
2537                    .function_version_snapshots
2538                    .get(function_name)
2539                    .and_then(|m| m.get(v))
2540                    .cloned()
2541                    .or_else(|| state.functions.get(function_name).cloned()),
2542                None => state.functions.get(function_name).cloned(),
2543            }
2544            .ok_or_else(|| {
2545                AwsServiceError::aws_error(
2546                    StatusCode::NOT_FOUND,
2547                    "ResourceNotFoundException",
2548                    format!(
2549                        "Function not found: arn:aws:lambda:{}:{}:function:{}",
2550                        state.region, state.account_id, function_name
2551                    ),
2552                )
2553            })?;
2554            let mut zips: Vec<Vec<u8>> = Vec::with_capacity(func.layers.len());
2555            for attached in &func.layers {
2556                if let Some(b) =
2557                    parse_layer_version_arn(&attached.arn).and_then(|(acct, name, ver)| {
2558                        accounts
2559                            .get(&acct)
2560                            .and_then(|s| s.layers.get(&name))
2561                            .and_then(|l| l.versions.iter().find(|v| v.version == ver))
2562                            .and_then(|v| v.code_zip.clone())
2563                    })
2564                {
2565                    zips.push(b);
2566                }
2567            }
2568            (func, zips)
2569        };
2570
2571        if func.code_zip.is_none() && func.package_type != "Image" {
2572            return Err(AwsServiceError::aws_error(
2573                StatusCode::BAD_REQUEST,
2574                "InvalidParameterValueException",
2575                "Function has no deployment package",
2576            ));
2577        }
2578
2579        let runtime = self.runtime.as_ref().ok_or_else(|| {
2580            AwsServiceError::aws_error(
2581                StatusCode::INTERNAL_SERVER_ERROR,
2582                "ServiceException",
2583                "Docker/Podman is required for Lambda execution but is not available",
2584            )
2585        })?;
2586
2587        // Drive the streaming RIE call and assemble the eventstream
2588        // body. We buffer all frames before returning — `AwsResponse`
2589        // is byte-buffered today — but the chunk boundaries the RIE
2590        // flushed are preserved as separate `PayloadChunk` events, so
2591        // SDK parsers see exactly the streaming structure they expect.
2592        let mut frames: Vec<u8> = Vec::new();
2593        let invoke_result = runtime
2594            .invoke_streaming(&func, &req.body, &layer_zips)
2595            .await;
2596
2597        let (error_code, error_details) = match invoke_result {
2598            Ok(mut stream) => {
2599                let mut last_chunk: Option<bytes::Bytes> = None;
2600                let mut had_chunks = false;
2601                loop {
2602                    match stream.next_chunk().await {
2603                        Ok(Some(chunk)) => {
2604                            had_chunks = true;
2605                            frames.extend_from_slice(&crate::eventstream::payload_chunk_frame(
2606                                &chunk,
2607                            ));
2608                            last_chunk = Some(chunk);
2609                        }
2610                        Ok(None) => break,
2611                        Err(e) => {
2612                            tracing::error!(function = %function_name, error = %e, "Lambda streaming chunk read failed");
2613                            return Err(AwsServiceError::aws_error(
2614                                StatusCode::INTERNAL_SERVER_ERROR,
2615                                "ServiceException",
2616                                format!("Lambda streaming read failed: {e}"),
2617                            ));
2618                        }
2619                    }
2620                }
2621
2622                // The Lambda runtime returns 200 even when the user
2623                // handler threw, packing `errorMessage`/`errorType`
2624                // into the buffered body. Streaming handlers do the
2625                // same on the final chunk. Inspect the last chunk we
2626                // saw and surface that as a function error in the
2627                // terminal `InvokeComplete` event.
2628                let mut error: Option<(String, String)> = None;
2629                if had_chunks {
2630                    if let Some(bytes) = last_chunk {
2631                        if let Ok(v) = serde_json::from_slice::<Value>(&bytes) {
2632                            if let Some(obj) = v.as_object() {
2633                                if obj.contains_key("errorMessage") || obj.contains_key("errorType")
2634                                {
2635                                    let etype = obj
2636                                        .get("errorType")
2637                                        .and_then(|x| x.as_str())
2638                                        .unwrap_or("Runtime.Unknown")
2639                                        .to_string();
2640                                    let emsg = obj
2641                                        .get("errorMessage")
2642                                        .and_then(|x| x.as_str())
2643                                        .unwrap_or("function error")
2644                                        .to_string();
2645                                    error = Some((etype, emsg));
2646                                }
2647                            }
2648                        }
2649                    }
2650                }
2651                match error {
2652                    Some((code, details)) => (Some(code), Some(details)),
2653                    None => (None, None),
2654                }
2655            }
2656            Err(e) => {
2657                tracing::error!(function = %function_name, error = %e, "Lambda streaming invocation failed");
2658                (
2659                    Some("Runtime.InvocationFailure".to_string()),
2660                    Some(e.to_string()),
2661                )
2662            }
2663        };
2664
2665        frames.extend_from_slice(&crate::eventstream::invoke_complete_frame(
2666            error_code.as_deref(),
2667            error_details.as_deref(),
2668            "",
2669        ));
2670
2671        let mut resp = AwsResponse {
2672            status: StatusCode::OK,
2673            content_type: "application/vnd.amazon.eventstream".to_string(),
2674            body: fakecloud_core::service::ResponseBody::Bytes(bytes::Bytes::from(frames)),
2675            headers: http::HeaderMap::new(),
2676        };
2677        if let Ok(v) = http::HeaderValue::from_str(&executed_version) {
2678            resp.headers
2679                .insert(http::HeaderName::from_static("x-amz-executed-version"), v);
2680        }
2681        Ok(resp)
2682    }
2683}
2684
2685fn extract_csc_id(input: &str) -> String {
2686    // Decode percent encoding then take the segment after the last colon
2687    // (csc id), or treat as id if no colon present.
2688    let decoded = percent_decode(input);
2689    decoded.rsplit(':').next().unwrap_or(&decoded).to_string()
2690}
2691
/// Decode `%XX` escapes in `input` and return the result as a string.
///
/// A `%` followed by two hexadecimal digits is replaced by the byte it
/// encodes; any other `%` (truncated or followed by non-hex characters) is
/// kept literally. The decoded bytes are then interpreted as UTF-8, so
/// multi-byte sequences such as `%C3%A9` yield a single character and
/// non-ASCII characters already present in `input` pass through
/// unchanged. Byte sequences that are not valid UTF-8 are replaced with
/// U+FFFD.
///
/// Earlier versions pushed every byte as its own `char`, which
/// re-interpreted UTF-8 data as Latin-1 (`é` became `Ã©`).
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            // `get` returns `None` when fewer than two bytes follow the
            // `%`, so a trailing `%` or `%4` is copied through verbatim.
            if let Some([hi, lo]) = bytes.get(i + 1..i + 3) {
                let high = (*hi as char).to_digit(16);
                let low = (*lo as char).to_digit(16);
                if let (Some(h), Some(l)) = (high, low) {
                    // Two hex digits give at most 0xFF, so the cast is
                    // lossless.
                    decoded.push((h * 16 + l) as u8);
                    i += 3;
                    continue;
                }
            }
        }
        decoded.push(bytes[i]);
        i += 1;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
2711
2712fn code_signing_json(c: &CodeSigningConfig) -> Value {
2713    json!({
2714        "CodeSigningConfigId": c.csc_id,
2715        "CodeSigningConfigArn": c.csc_arn,
2716        "Description": c.description,
2717        "AllowedPublishers": {
2718            "SigningProfileVersionArns": c.allowed_publishers,
2719        },
2720        "CodeSigningPolicies": {
2721            "UntrustedArtifactOnDeployment": c.untrusted_artifact_action,
2722        },
2723        "LastModified": c.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
2724    })
2725}
2726
2727fn event_invoke_json(c: &EventInvokeConfig) -> Value {
2728    // `DestinationConfig` serialization mirrors the three AWS behaviours
2729    // documented in the Smithy `@examples` for Put/UpdateFunctionEventInvokeConfig
2730    // and asserted by the conformance round-trip strategy:
2731    //
2732    //   stored `None`          (input omitted entirely)
2733    //       -> emit `{OnSuccess:{}, OnFailure:{}}`  (Put example)
2734    //   stored `Some({})`      (caller sent `{}`)
2735    //       -> echo `{}` verbatim                    (round-trip strategy)
2736    //   stored `Some({half})`  (one side configured)
2737    //       -> backfill the other half as `{}`       (Update example)
2738    let destination = match &c.destination_config {
2739        None => json!({"OnSuccess": {}, "OnFailure": {}}),
2740        Some(v) if !v.is_object() => json!({}),
2741        Some(v) => {
2742            let mut map = v.as_object().cloned().unwrap_or_default();
2743            if !map.is_empty() {
2744                map.entry("OnSuccess".to_string()).or_insert(json!({}));
2745                map.entry("OnFailure".to_string()).or_insert(json!({}));
2746            }
2747            Value::Object(map)
2748        }
2749    };
2750    json!({
2751        "FunctionArn": c.function_arn,
2752        "MaximumEventAgeInSeconds": c.maximum_event_age,
2753        "MaximumRetryAttempts": c.maximum_retry_attempts,
2754        "DestinationConfig": destination,
2755        // `LastModified` is bound to Smithy's `Date` shape
2756        // (`type: timestamp`). The default REST-JSON serialization
2757        // for `timestamp` is an epoch-seconds float, which is what
2758        // `aws-sdk-lambda` deserializes; emitting an ISO string here
2759        // makes the SDK panic on `f64::from_str("2026-...")`.
2760        "LastModified": c
2761            .last_modified
2762            .timestamp_millis() as f64
2763            / 1000.0,
2764    })
2765}
2766
#[cfg(test)]
mod tests {
    use crate::service::LambdaService;
    use crate::state::{LambdaState, SharedLambdaState};
    use fakecloud_core::multi_account::MultiAccountState;
    use fakecloud_core::service::AwsRequest;
    use http::Method;
    use parking_lot::RwLock;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Account id used by every fixture in this module.
    const ACCOUNT_ID: &str = "000000000000";
    /// Region used by every fixture in this module.
    const REGION: &str = "us-east-1";

    /// Build a service over fresh, empty multi-account state.
    fn svc() -> LambdaService {
        let accounts = MultiAccountState::<LambdaState>::new(ACCOUNT_ID, REGION, "");
        let shared: SharedLambdaState = Arc::new(RwLock::new(accounts));
        LambdaService::new(shared)
    }

    /// Build a POST request for `action` with the given body and path
    /// segments; query string, headers and credentials are left empty.
    fn req(action: &str, body: &str, segs: &[&str]) -> AwsRequest {
        let path_segments: Vec<String> = segs.iter().map(|seg| (*seg).to_owned()).collect();
        let raw_path = format!("/{}", segs.join("/"));
        AwsRequest {
            service: "lambda".to_string(),
            method: Method::POST,
            raw_path,
            raw_query: String::new(),
            path_segments,
            query_params: HashMap::new(),
            headers: http::HeaderMap::new(),
            body: bytes::Bytes::from(body.to_string()),
            body_stream: parking_lot::Mutex::new(None),
            account_id: ACCOUNT_ID.to_string(),
            region: REGION.to_string(),
            request_id: "rid".to_string(),
            action: action.to_string(),
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Dispatch `action` through `handle_extra` and panic unless it
    /// returns a response with a success status.
    async fn run(s: &LambdaService, action: &str, body: &str, res: Option<&str>, segs: &[&str]) {
        let request = req(action, body, segs);
        let resp = s
            .handle_extra(action, res, &request)
            .await
            .unwrap_or_else(|e| panic!("{action} failed: {e:?}"));
        assert!(resp.status.is_success(), "{action} status: {}", resp.status);
    }

    #[tokio::test]
    async fn read_only_listings_succeed_without_state() {
        let service = svc();
        run(&service, "GetAccountSettings", "", None, &[]).await;
        run(&service, "InvokeAsync", r#"{}"#, Some("fn"), &[]).await;
        run(&service, "ListLayers", "", None, &[]).await;
        run(&service, "ListLayerVersions", "", Some("layer"), &[]).await;
    }

    #[tokio::test]
    async fn layers_lifecycle() {
        let service = svc();
        let publish_path = ["2018-10-31", "layers", "layer1", "versions"];
        run(
            &service,
            "PublishLayerVersion",
            r#"{"Content":{"ZipFile":""}}"#,
            Some("layer1"),
            &publish_path,
        )
        .await;
        run(&service, "ListLayers", "", None, &[]).await;
        run(&service, "ListLayerVersions", "", Some("layer1"), &[]).await;
    }

    #[tokio::test]
    async fn code_signing_lifecycle() {
        let service = svc();
        let create_body = r#"{"AllowedPublishers":{"SigningProfileVersionArns":[]}}"#;
        run(&service, "CreateCodeSigningConfig", create_body, None, &[]).await;
        run(&service, "ListCodeSigningConfigs", "", None, &[]).await;
    }
}