Skip to main content

fakecloud_lambda/
extras.rs

//! Lambda handlers added to close the conformance gap: aliases, layers,
//! function URL configs, concurrency, code signing, event invoke, runtime
//! management, scaling, recursion, tagging, and account settings.
4
5use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use sha2::{Digest, Sha256};
9
10use fakecloud_aws::arn::Arn;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
12
13use crate::service::LambdaService;
14use crate::state::{
15    AccountSettings, AttachedLayer, CodeSigningConfig, EventInvokeConfig, FunctionAlias,
16    FunctionScalingConfig, FunctionUrlConfig, LambdaState, Layer, LayerVersion,
17    ProvisionedConcurrencyConfig, RuntimeManagementConfig,
18};
19
20/// Resolve a layer-version ARN to its current `CodeSize` from the
21/// multi-account state. Returns 0 when the ARN is unparseable, when the
22/// referenced account/layer/version is unknown, or when the version was
23/// published without ZIP content (legacy snapshots).
24pub(crate) fn resolve_layer_attachments(
25    accounts: &fakecloud_core::multi_account::MultiAccountState<LambdaState>,
26    arns: Vec<String>,
27) -> Vec<AttachedLayer> {
28    arns.into_iter()
29        .map(|arn| {
30            let code_size = parse_layer_version_arn(&arn)
31                .and_then(|(acct, name, ver)| {
32                    accounts
33                        .get(&acct)
34                        .and_then(|s| s.layers.get(&name))
35                        .and_then(|l| l.versions.iter().find(|v| v.version == ver))
36                        .map(|v| v.code_size)
37                })
38                .unwrap_or(0);
39            AttachedLayer { arn, code_size }
40        })
41        .collect()
42}
43
44fn missing(name: &str) -> AwsServiceError {
45    AwsServiceError::aws_error(
46        StatusCode::BAD_REQUEST,
47        "InvalidParameterValueException",
48        format!("Missing required field: {name}"),
49    )
50}
51
52fn not_found(entity: &str, name: &str) -> AwsServiceError {
53    AwsServiceError::aws_error(
54        StatusCode::NOT_FOUND,
55        "ResourceNotFoundException",
56        format!("{entity} not found: {name}"),
57    )
58}
59
60fn ok(body: Value) -> Result<AwsResponse, AwsServiceError> {
61    Ok(AwsResponse::json(StatusCode::OK, body.to_string()))
62}
63
64fn empty() -> Result<AwsResponse, AwsServiceError> {
65    Ok(AwsResponse::json(StatusCode::OK, "{}".to_string()))
66}
67
68fn body(req: &AwsRequest) -> Value {
69    serde_json::from_slice(&req.body).unwrap_or_else(|_| Value::Object(Default::default()))
70}
71
/// Extract the function name from a Lambda function ARN of the form
/// `arn:aws:lambda:<region>:<account>:function:<name>[:<qualifier>]`,
/// ignoring any trailing `:version` / `:alias` qualifier.
///
/// Returns `None` when:
/// * the string does not start with `arn:aws:lambda:`,
/// * the region, account, resource-kind, or name segment is missing,
/// * the resource kind is anything other than `function` (event-source
///   mapping, code-signing config, layer, ...),
/// * the name segment is empty.
fn function_name_from_arn(arn: &str) -> Option<String> {
    let rest = arn.strip_prefix("arn:aws:lambda:")?;
    // Remaining layout: region : account : "function" : name [: qualifier]
    let mut parts = rest.split(':');
    let _region = parts.next()?;
    let _account = parts.next()?;
    if parts.next()? != "function" {
        return None;
    }
    // The segment right after `function` is the bare name; whatever follows
    // (a qualifier) is ignored. An empty name cannot identify a function.
    parts
        .next()
        .filter(|name| !name.is_empty())
        .map(str::to_string)
}
96
97/// Parse a raw query string into key/value pairs preserving repeats.
98/// `req.query_params` is a `HashMap<String, String>` and so collapses
99/// `tagKeys=A&tagKeys=B` to a single entry; this lets the
100/// `UntagResource` handler see every value the caller actually sent.
101/// Percent-decodes both key and value with the same lossy fallback the
102/// rest of the dispatch path uses.
103fn parse_query_pairs(raw_query: &str) -> Vec<(String, String)> {
104    raw_query
105        .split('&')
106        .filter(|s| !s.is_empty())
107        .map(|pair| {
108            let mut it = pair.splitn(2, '=');
109            let k = it.next().unwrap_or("");
110            let v = it.next().unwrap_or("");
111            (decode_query_segment(k), decode_query_segment(v))
112        })
113        .collect()
114}
115
116fn decode_query_segment(s: &str) -> String {
117    // Replace `+` with space to match `application/x-www-form-urlencoded`,
118    // then percent-decode. SDKs hit both shapes for path/query data.
119    let plus_decoded = s.replace('+', " ");
120    percent_encoding::percent_decode_str(&plus_decoded)
121        .decode_utf8_lossy()
122        .into_owned()
123}
124
125/// Build a fakecloud-hosted download URL for a layer version's ZIP. The URL
126/// is reachable on the same authority the SDK used for the original
127/// request, so test harnesses get a working `Location` they can `GET`
128/// directly instead of the placeholder AWS clients otherwise see.
129fn layer_content_url(req: &AwsRequest, account_id: &str, layer_name: &str, version: i64) -> String {
130    let host = req
131        .headers
132        .get(http::header::HOST)
133        .and_then(|h| h.to_str().ok())
134        .unwrap_or("localhost");
135    let scheme = req
136        .headers
137        .get("x-forwarded-proto")
138        .and_then(|h| h.to_str().ok())
139        .unwrap_or("http");
140    format!(
141        "{scheme}://{host}/_fakecloud/lambda/layer-content/{account_id}/{layer_name}/{version}.zip"
142    )
143}
144
/// Parse an AWS layer-version ARN:
/// `arn:<partition>:lambda:<region>:<account>:layer:<name>:<version>`.
///
/// Returns `(account_id, layer_name, version)`. Used to resolve
/// cross-account layer references attached to a function.
///
/// Returns `None` when the ARN does not have exactly eight `:`-separated
/// segments, when the fixed segments are not `arn` / `lambda` / `layer`,
/// when the account or layer name is empty, or when the version is not a
/// plain positive decimal integer (signs, zero, and non-digits are
/// rejected because layer versions are numbered from 1).
pub fn parse_layer_version_arn(arn: &str) -> Option<(String, String, i64)> {
    let parts: Vec<&str> = arn.split(':').collect();
    match parts.as_slice() {
        ["arn", _partition, "lambda", _region, account, "layer", name, version]
            if !account.is_empty() && !name.is_empty() =>
        {
            // `i64::from_str` accepts a leading `+`/`-`; require bare digits.
            if version.is_empty() || !version.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            let version: i64 = version.parse().ok().filter(|v| *v >= 1)?;
            Some(((*account).to_owned(), (*name).to_owned(), version))
        }
        _ => None,
    }
}
158
159fn parse_qualifier(req: &AwsRequest) -> String {
160    req.query_params
161        .get("Qualifier")
162        .cloned()
163        .unwrap_or_else(|| "$LATEST".to_string())
164}
165
/// Build an identifier by appending the current wall-clock time, in
/// nanoseconds since the Unix epoch, to `prefix`.
///
/// If the system clock reports a time before the epoch the numeric part
/// is `0`. Uniqueness relies entirely on clock resolution: two calls that
/// observe the same timestamp produce the same id.
fn id_from_time(prefix: &str) -> String {
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    format!("{prefix}{nanos}")
}
176
177impl LambdaService {
178    pub(crate) async fn handle_extra(
179        &self,
180        action: &str,
181        resource: Option<&str>,
182        req: &AwsRequest,
183    ) -> Result<AwsResponse, AwsServiceError> {
184        let aid = req.account_id.as_str();
185        let res = resource.unwrap_or("");
186        match action {
187            // Function lifecycle extras
188            "GetFunctionConfiguration" => self.get_function_configuration(res, aid, req),
189            "UpdateFunctionConfiguration" => self.update_function_configuration(res, req),
190            "UpdateFunctionCode" => self.update_function_code(res, req),
191            "UpdateEventSourceMapping" => self.update_event_source_mapping_handler(res, req),
192            "GetAccountSettings" => self.get_account_settings(aid),
193            "InvokeAsync" => Ok(AwsResponse::json(StatusCode::ACCEPTED, "{}".to_string())),
194            "InvokeWithResponseStream" => self.invoke_with_response_stream(res, aid, req).await,
195
196            // Versions
197            "ListVersionsByFunction" => self.list_versions_by_function(res, aid, req),
198
199            // Aliases
200            "CreateAlias" => self.create_alias(res, req),
201            "GetAlias" => self.get_alias(res, req),
202            "ListAliases" => self.list_aliases(res, aid),
203            "UpdateAlias" => self.update_alias(res, req),
204            "DeleteAlias" => self.delete_alias(res, req),
205
206            // Layers
207            "PublishLayerVersion" => self.publish_layer_version(res, req),
208            "GetLayerVersion" => self.get_layer_version(req),
209            "GetLayerVersionByArn" => self.get_layer_version_by_arn(req),
210            "ListLayers" => self.list_layers(aid),
211            "ListLayerVersions" => self.list_layer_versions(res, aid),
212            "DeleteLayerVersion" => self.delete_layer_version(req),
213            "GetLayerVersionPolicy" => self.get_layer_version_policy(req),
214            "AddLayerVersionPermission" => self.add_layer_version_permission(req),
215            "RemoveLayerVersionPermission" => self.remove_layer_version_permission(req),
216
217            // Function URL
218            "CreateFunctionUrlConfig" => self.create_function_url_config(res, req),
219            "GetFunctionUrlConfig" => self.get_function_url_config(res, aid),
220            "UpdateFunctionUrlConfig" => self.update_function_url_config(res, req),
221            "DeleteFunctionUrlConfig" => self.delete_function_url_config(res, aid),
222            "ListFunctionUrlConfigs" => self.list_function_url_configs(aid),
223
224            // Concurrency
225            "PutFunctionConcurrency" => self.put_function_concurrency(res, req),
226            "GetFunctionConcurrency" => self.get_function_concurrency(res, aid),
227            "DeleteFunctionConcurrency" => self.delete_function_concurrency(res, aid),
228            "PutProvisionedConcurrencyConfig" => self.put_provisioned_concurrency(res, req),
229            "GetProvisionedConcurrencyConfig" => self.get_provisioned_concurrency(res, req),
230            "DeleteProvisionedConcurrencyConfig" => self.delete_provisioned_concurrency(res, req),
231            "ListProvisionedConcurrencyConfigs" => self.list_provisioned_concurrency(res, aid),
232
233            // Code signing
234            "CreateCodeSigningConfig" => self.create_code_signing_config(req),
235            "GetCodeSigningConfig" => self.get_code_signing_config(res, aid),
236            "UpdateCodeSigningConfig" => self.update_code_signing_config(res, req),
237            "DeleteCodeSigningConfig" => self.delete_code_signing_config(res, aid),
238            "ListCodeSigningConfigs" => self.list_code_signing_configs(aid),
239            "PutFunctionCodeSigningConfig" => self.put_function_code_signing(res, req),
240            "GetFunctionCodeSigningConfig" => self.get_function_code_signing(res, aid),
241            "DeleteFunctionCodeSigningConfig" => self.delete_function_code_signing(res, aid),
242            "ListFunctionsByCodeSigningConfig" => self.list_functions_by_code_signing(res, aid),
243
244            // Event invoke
245            "PutFunctionEventInvokeConfig" | "UpdateFunctionEventInvokeConfig" => {
246                self.put_function_event_invoke(res, req)
247            }
248            "GetFunctionEventInvokeConfig" => self.get_function_event_invoke(res, req),
249            "DeleteFunctionEventInvokeConfig" => self.delete_function_event_invoke(res, req),
250            "ListFunctionEventInvokeConfigs" => self.list_function_event_invoke(res, aid),
251
252            // Runtime management
253            "PutRuntimeManagementConfig" => self.put_runtime_management(res, req),
254            "GetRuntimeManagementConfig" => self.get_runtime_management(res, req),
255
256            // Scaling
257            "PutFunctionScalingConfig" => self.put_scaling_config(res, req),
258            "GetFunctionScalingConfig" => self.get_scaling_config(res, aid),
259
260            // Recursion
261            "PutFunctionRecursionConfig" => self.put_recursion_config(res, req),
262            "GetFunctionRecursionConfig" => self.get_recursion_config(res, aid),
263
264            // Tags
265            "TagResource" => self.tag_resource(res, req),
266            "UntagResource" => self.untag_resource(res, req),
267            "ListTags" => self.list_tags(res, aid),
268
269            _ => Err(AwsServiceError::action_not_implemented("lambda", action)),
270        }
271    }
272
273    fn with_state_read<F, R>(&self, account_id: &str, region: &str, f: F) -> R
274    where
275        F: FnOnce(&LambdaState) -> R,
276    {
277        let accounts = self.state.read();
278        let empty = LambdaState::new(account_id, region);
279        let state = accounts.get(account_id).unwrap_or(&empty);
280        f(state)
281    }
282
283    // ── Function lifecycle extras ──
284
285    fn get_function_configuration(
286        &self,
287        function_name: &str,
288        account_id: &str,
289        req: &AwsRequest,
290    ) -> Result<AwsResponse, AwsServiceError> {
291        let region = self.region_for(account_id);
292        let qualifier = req.query_params.get("Qualifier").cloned();
293        self.with_state_read(account_id, &region, |state| {
294            let live = state
295                .functions
296                .get(function_name)
297                .ok_or_else(|| not_found("Function", function_name))?;
298            // Qualifier resolution mirrors GetFunction: $LATEST or omitted
299            // returns the live config; numeric / alias qualifiers resolve
300            // to a numbered snapshot.
301            let resolved = crate::service::resolve_qualifier_to_version(
302                state,
303                function_name,
304                qualifier.as_deref(),
305            );
306            let (func, version_label) = match resolved {
307                None => (live, "$LATEST".to_string()),
308                Some(v) => {
309                    let snap = state
310                        .function_version_snapshots
311                        .get(function_name)
312                        .and_then(|m| m.get(&v))
313                        .ok_or_else(|| not_found("Function", function_name))?;
314                    (snap, v)
315                }
316            };
317            let mut config = self.function_config_json(func);
318            config["Version"] = json!(version_label);
319            if version_label != "$LATEST" {
320                config["FunctionArn"] = json!(format!("{}:{version_label}", live.function_arn));
321                config["MasterArn"] = json!(live.function_arn);
322            }
323            ok(config)
324        })
325    }
326
327    fn update_function_configuration(
328        &self,
329        function_name: &str,
330        req: &AwsRequest,
331    ) -> Result<AwsResponse, AwsServiceError> {
332        let body = body(req);
333        // Validate before taking the write lock and before any mutation:
334        // an invalid EphemeralStorage.Size on an otherwise valid request
335        // must not silently apply the surrounding fields.
336        let validated_ephemeral = match body["EphemeralStorage"]["Size"].as_i64() {
337            Some(size) => Some(crate::service::validate_ephemeral_storage(size)?),
338            None => None,
339        };
340        let mut accounts = self.state.write();
341        // Pre-resolve layer attachments before re-borrowing accounts mutably
342        // for the function. Layer ARNs may live in sibling accounts.
343        let layer_attachments: Option<Vec<AttachedLayer>> = body["Layers"].as_array().map(|arr| {
344            let arns: Vec<String> = arr
345                .iter()
346                .filter_map(|v| v.as_str().map(String::from))
347                .collect();
348            resolve_layer_attachments(&accounts, arns)
349        });
350        let state = accounts.get_or_create(&req.account_id);
351        let func = state
352            .functions
353            .get_mut(function_name)
354            .ok_or_else(|| not_found("Function", function_name))?;
355        if let Some(handler) = body["Handler"].as_str() {
356            func.handler = handler.to_string();
357        }
358        if let Some(t) = body["Timeout"].as_i64() {
359            func.timeout = t;
360        }
361        if let Some(m) = body["MemorySize"].as_i64() {
362            func.memory_size = m;
363        }
364        if let Some(role) = body["Role"].as_str() {
365            func.role = role.to_string();
366        }
367        if let Some(desc) = body["Description"].as_str() {
368            func.description = desc.to_string();
369        }
370        if let Some(rt) = body["Runtime"].as_str() {
371            func.runtime = rt.to_string();
372        }
373        if let Some(env) = body["Environment"]["Variables"].as_object() {
374            func.environment = env
375                .iter()
376                .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
377                .collect();
378        }
379        if let Some(mode) = body["TracingConfig"]["Mode"].as_str() {
380            func.tracing_mode = Some(mode.to_string());
381        }
382        if let Some(arn) = body["KMSKeyArn"].as_str() {
383            func.kms_key_arn = if arn.is_empty() {
384                None
385            } else {
386                Some(arn.to_string())
387            };
388        }
389        if let Some(size) = validated_ephemeral {
390            func.ephemeral_storage_size = Some(size);
391        }
392        if body["VpcConfig"].is_object() {
393            func.vpc_config = Some(body["VpcConfig"].clone());
394        }
395        if body["SnapStart"].is_object() {
396            func.snap_start = Some(body["SnapStart"].clone());
397        }
398        if let Some(arn) = body["DeadLetterConfig"]["TargetArn"].as_str() {
399            func.dead_letter_config_arn = if arn.is_empty() {
400                None
401            } else {
402                Some(arn.to_string())
403            };
404        }
405        if let Some(fsc) = body["FileSystemConfigs"].as_array() {
406            func.file_system_configs = fsc.clone();
407        }
408        if body["LoggingConfig"].is_object() {
409            func.logging_config = Some(body["LoggingConfig"].clone());
410        }
411        if body["ImageConfig"].is_object() {
412            func.image_config = Some(body["ImageConfig"].clone());
413        }
414        if let Some(attachments) = layer_attachments {
415            func.layers = attachments;
416        }
417        // RevisionId rotates only on real config changes — clients
418        // round-trip it through optimistic-concurrency calls, so we
419        // mint a fresh one here to signal "config changed".
420        func.revision_id = uuid::Uuid::new_v4().to_string();
421        func.last_modified = Utc::now();
422        ok(self.function_config_json(func))
423    }
424
425    fn update_function_code(
426        &self,
427        function_name: &str,
428        req: &AwsRequest,
429    ) -> Result<AwsResponse, AwsServiceError> {
430        let body: serde_json::Value = serde_json::from_slice(&req.body).unwrap_or_default();
431
432        // ZipFile / ImageUri / S3Bucket+S3Key are mutually exclusive; AWS
433        // rejects the request when more than one is present. The handler
434        // picks one with a defined precedence: ZipFile, S3 descriptor,
435        // ImageUri.
436        let new_zip: Option<Vec<u8>> = match body["ZipFile"].as_str() {
437            Some(b64) => Some(
438                base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
439                    |_| {
440                        AwsServiceError::aws_error(
441                            StatusCode::BAD_REQUEST,
442                            "InvalidParameterValueException",
443                            "Could not decode ZipFile: invalid base64",
444                        )
445                    },
446                )?,
447            ),
448            None => None,
449        };
450        let new_image_uri = body["ImageUri"].as_str().map(String::from);
451        // S3 source descriptor: when the caller didn't supply ZipFile or
452        // ImageUri, AWS expects S3Bucket+S3Key (S3ObjectVersion is
453        // optional). fakecloud doesn't fetch the object — CreateFunction
454        // takes the same shortcut — so we synthesize a fingerprint from
455        // the descriptor and use that as the new code identity. The hash
456        // and size still rotate when the descriptor differs, so
457        // optimistic-concurrency callers see RevisionId bump on real
458        // changes.
459        // S3-sourced code: if an S3Delivery hook is wired, fetch the
460        // actual object bytes and treat them as a ZIP upload. This
461        // matches real Lambda's S3-pull semantics. Fall back to the
462        // descriptor-hash shortcut when no hook is available.
463        let s3_fetched_zip: Option<Vec<u8>> = match (
464            body["S3Bucket"].as_str(),
465            body["S3Key"].as_str(),
466        ) {
467            (Some(bucket), Some(key)) if new_zip.is_none() && new_image_uri.is_none() => {
468                if let Some(s3) = &self.s3_delivery {
469                    match s3.get_object(&req.account_id, bucket, key) {
470                        Ok(bytes) => Some(bytes),
471                        Err(e) => {
472                            return Err(AwsServiceError::aws_error(
473                                StatusCode::BAD_REQUEST,
474                                "InvalidParameterValueException",
475                                format!("Error occurred while GetObject. S3 Error Code: NoSuchKey. S3 Error Message: {e}"),
476                            ));
477                        }
478                    }
479                } else {
480                    None
481                }
482            }
483            _ => None,
484        };
485
486        let new_s3_descriptor: Option<Vec<u8>> =
487            match (body["S3Bucket"].as_str(), body["S3Key"].as_str()) {
488                (Some(bucket), Some(key))
489                    if new_zip.is_none() && new_image_uri.is_none() && s3_fetched_zip.is_none() =>
490                {
491                    let mut descriptor = serde_json::Map::new();
492                    descriptor.insert("S3Bucket".to_string(), Value::String(bucket.to_string()));
493                    descriptor.insert("S3Key".to_string(), Value::String(key.to_string()));
494                    if let Some(ver) = body["S3ObjectVersion"].as_str() {
495                        descriptor.insert(
496                            "S3ObjectVersion".to_string(),
497                            Value::String(ver.to_string()),
498                        );
499                    }
500                    Some(serde_json::to_vec(&Value::Object(descriptor)).unwrap_or_default())
501                }
502                _ => None,
503            };
504        let new_zip = new_zip.or(s3_fetched_zip);
505        let supplied_signing_profile = body["SigningProfileVersionArn"].as_str().map(String::from);
506        let supplied_revision_id = body["RevisionId"].as_str().map(String::from);
507        let new_architectures: Option<Vec<String>> = body["Architectures"].as_array().map(|arr| {
508            arr.iter()
509                .filter_map(|v| v.as_str().map(String::from))
510                .collect()
511        });
512        let dry_run = body["DryRun"].as_bool().unwrap_or(false);
513        let publish = body["Publish"].as_bool().unwrap_or(false);
514
515        let mut accounts = self.state.write();
516        let state = accounts.get_or_create(&req.account_id);
517
518        // Function existence is the first check so callers always see
519        // ResourceNotFoundException 404 even when CSC / sig-profile
520        // fields would otherwise reject the request.
521        if !state.functions.contains_key(function_name) {
522            return Err(not_found("Function", function_name));
523        }
524
525        // Code-signing gate: if a CSC is bound to this function and at
526        // least one allowed publisher is registered, the caller must
527        // supply a SigningProfileVersionArn from that allow-list when
528        // the policy is Enforce. Warn just lets the upload through.
529        if let Some(csc_arn) = state.function_code_signing.get(function_name).cloned() {
530            let csc_id = extract_csc_id(&csc_arn);
531            if let Some(csc) = state.code_signing_configs.get(&csc_id).cloned() {
532                if !csc.allowed_publishers.is_empty()
533                    && csc
534                        .untrusted_artifact_action
535                        .eq_ignore_ascii_case("Enforce")
536                {
537                    let allowed = match supplied_signing_profile.as_deref() {
538                        Some(arn) => csc.allowed_publishers.iter().any(|p| p == arn),
539                        None => false,
540                    };
541                    if !allowed {
542                        return Err(AwsServiceError::aws_error(
543                            StatusCode::BAD_REQUEST,
544                            "CodeVerificationFailedException",
545                            "The code signature failed the integrity check or the signing profile is not in the allowed publishers list.",
546                        ));
547                    }
548                }
549            }
550        }
551
552        let func = state
553            .functions
554            .get_mut(function_name)
555            .ok_or_else(|| not_found("Function", function_name))?;
556
557        // Optimistic-concurrency precondition: when the caller supplies
558        // a RevisionId, it must match the function's current revision
559        // or AWS rejects with PreconditionFailedException 412.
560        if let Some(ref rev) = supplied_revision_id {
561            if rev != &func.revision_id {
562                return Err(AwsServiceError::aws_error(
563                    StatusCode::PRECONDITION_FAILED,
564                    "PreconditionFailedException",
565                    format!(
566                        "The Revision Id provided: {rev} does not match the latest Revision Id of function: {function_name}. Call the GetFunction/GetAlias API to retrieve the latest Revision Id"
567                    ),
568                ));
569            }
570        }
571
572        // DryRun validates the request shape but never mutates state.
573        if dry_run {
574            return ok(self.function_config_json(func));
575        }
576
577        let mut changed = false;
578        if let Some(bytes) = new_zip {
579            // SHA256(base64) of the new code, matching CreateFunction's
580            // hash so GetFunction returns identical CodeSha256 round-trip.
581            let mut hasher = Sha256::new();
582            hasher.update(&bytes);
583            let hash = hasher.finalize();
584            let code_sha256 =
585                base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
586            if code_sha256 != func.code_sha256 {
587                changed = true;
588            }
589            func.code_size = bytes.len() as i64;
590            func.code_zip = Some(bytes);
591            func.code_sha256 = code_sha256;
592            func.image_uri = None;
593            func.package_type = "Zip".to_string();
594        } else if let Some(descriptor_bytes) = new_s3_descriptor {
595            // Hash the S3 descriptor JSON (S3Bucket+S3Key+optional
596            // S3ObjectVersion) so the same descriptor produces a stable
597            // sha and a different descriptor rotates RevisionId. This
598            // mirrors CreateFunction's behavior for S3-sourced code,
599            // which also fingerprints the descriptor rather than fetching
600            // S3 (real Lambda fetches asynchronously).
601            let mut hasher = Sha256::new();
602            hasher.update(&descriptor_bytes);
603            let hash = hasher.finalize();
604            let code_sha256 =
605                base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
606            if code_sha256 != func.code_sha256 {
607                changed = true;
608            }
609            func.code_size = descriptor_bytes.len() as i64;
610            // We don't have the object bytes — clear the cached zip so
611            // the runtime falls back to whatever it had previously cached
612            // rather than serving stale bytes for the new descriptor.
613            func.code_zip = None;
614            func.code_sha256 = code_sha256;
615            func.image_uri = None;
616            func.package_type = "Zip".to_string();
617        } else if let Some(uri) = new_image_uri {
618            if func.image_uri.as_deref() != Some(uri.as_str()) {
619                changed = true;
620            }
621            func.image_uri = Some(uri);
622            func.code_zip = None;
623            func.package_type = "Image".to_string();
624            // AWS reports CodeSize=0 and an empty CodeSha256 for
625            // image-package functions — the actual digest lives on the
626            // ECR side, not in the Lambda response.
627            func.code_size = 0;
628            func.code_sha256 = String::new();
629        }
630
631        if let Some(arns) = new_architectures {
632            if !arns.is_empty() && arns != func.architectures {
633                changed = true;
634                func.architectures = arns;
635            }
636        }
637
638        if let Some(arn) = supplied_signing_profile {
639            if func.signing_profile_version_arn.as_deref() != Some(arn.as_str()) {
640                changed = true;
641            }
642            func.signing_profile_version_arn = Some(arn);
643        }
644
645        // last_modified is bumped on every call (matches AWS), but
646        // revision_id only rotates when code or signing fields actually
647        // change so optimistic-concurrency callers don't see spurious
648        // updates from no-op pings.
649        func.last_modified = Utc::now();
650        if changed {
651            func.revision_id = uuid::Uuid::new_v4().to_string();
652        }
653        // A successful UpdateFunctionCode clears any prior failure
654        // reason — function_config_json elides the field when None,
655        // matching AWS's "no LastUpdateStatusReason on success" shape.
656        func.last_update_status_reason = None;
657        func.last_update_status_reason_code = None;
658
659        // Publish=true mints a new immutable version snapshot off the
660        // freshly updated $LATEST and returns that version's config.
661        if publish {
662            drop(accounts);
663            return self.publish_version(function_name, &req.account_id, req);
664        }
665
666        ok(self.function_config_json(func))
667    }
668
669    fn get_account_settings(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
670        let mut accounts = self.state.write();
671        let state = accounts.get_or_create(account_id);
672        let settings = state.account_settings.clone().unwrap_or(AccountSettings {
673            concurrent_executions: 1000,
674            code_size_zipped: 52_428_800,
675            code_size_unzipped: 262_144_000,
676            total_code_size: 80_530_636_800,
677        });
678        if state.account_settings.is_none() {
679            state.account_settings = Some(settings.clone());
680        }
681        // Real AccountUsage so clients monitoring deployment quotas see
682        // accurate numbers. AWS sums total code size across all functions.
683        let function_count = state.functions.len() as i64;
684        let total_code_size: i64 = state.functions.values().map(|f| f.code_size).sum();
685        ok(json!({
686            "AccountLimit": {
687                "ConcurrentExecutions": settings.concurrent_executions,
688                "CodeSizeZipped": settings.code_size_zipped,
689                "CodeSizeUnzipped": settings.code_size_unzipped,
690                "TotalCodeSize": settings.total_code_size,
691                "UnreservedConcurrentExecutions": settings.concurrent_executions,
692            },
693            "AccountUsage": {
694                "TotalCodeSize": total_code_size,
695                "FunctionCount": function_count,
696            },
697        }))
698    }
699
700    // ── Versions ──
701
702    fn list_versions_by_function(
703        &self,
704        function_name: &str,
705        account_id: &str,
706        req: &AwsRequest,
707    ) -> Result<AwsResponse, AwsServiceError> {
708        let region = self.region_for(account_id);
709        let max_items: usize = req
710            .query_params
711            .get("MaxItems")
712            .and_then(|v| v.parse::<usize>().ok())
713            .map(|n| n.clamp(1, 50))
714            .unwrap_or(50);
715        let marker = req.query_params.get("Marker").cloned();
716        self.with_state_read(account_id, &region, |state| {
717            let func = state
718                .functions
719                .get(function_name)
720                .ok_or_else(|| not_found("Function", function_name))?;
721            // AWS returns $LATEST first, then numbered versions in
722            // ascending order. Each numbered version is an immutable
723            // snapshot of the function at publish time.
724            let mut all: Vec<serde_json::Value> = Vec::new();
725            let mut latest = self.function_config_json(func);
726            latest["Version"] = json!("$LATEST");
727            all.push(latest);
728            let snapshots = state.function_version_snapshots.get(function_name);
729            if let Some(numbered) = state.function_versions.get(function_name) {
730                for v in numbered {
731                    let snap = snapshots.and_then(|m| m.get(v)).unwrap_or(func);
732                    let mut cfg = self.function_config_json(snap);
733                    cfg["Version"] = json!(v);
734                    cfg["FunctionArn"] = json!(format!("{}:{v}", func.function_arn));
735                    cfg["MasterArn"] = json!(func.function_arn);
736                    all.push(cfg);
737                }
738            }
739            // Pagination: skip past Marker if supplied (Marker is the
740            // Version string of the entry to start *after*), then take
741            // up to MaxItems. Emit a NextMarker when truncated.
742            let start = match marker.as_deref() {
743                Some(m) => all
744                    .iter()
745                    .position(|v| v["Version"].as_str() == Some(m))
746                    .map(|i| i + 1)
747                    .unwrap_or(0),
748                None => 0,
749            };
750            let end = (start + max_items).min(all.len());
751            let page: Vec<serde_json::Value> = all[start..end].to_vec();
752            let mut body = json!({ "Versions": page });
753            if end < all.len() {
754                if let Some(last) = all[end - 1]["Version"].as_str() {
755                    body["NextMarker"] = json!(last);
756                }
757            }
758            ok(body)
759        })
760    }
761
762    // ── Aliases ──
763
764    fn alias_key(function: &str, alias: &str) -> String {
765        format!("{function}:{alias}")
766    }
767
768    fn create_alias(
769        &self,
770        function_name: &str,
771        req: &AwsRequest,
772    ) -> Result<AwsResponse, AwsServiceError> {
773        let body = body(req);
774        let name = body["Name"]
775            .as_str()
776            .ok_or_else(|| missing("Name"))?
777            .to_string();
778        let version = body["FunctionVersion"]
779            .as_str()
780            .unwrap_or("$LATEST")
781            .to_string();
782        let mut accounts = self.state.write();
783        let state = accounts.get_or_create(&req.account_id);
784        if !state.functions.contains_key(function_name) {
785            return Err(not_found("Function", function_name));
786        }
787        let alias_arn = format!(
788            "arn:aws:lambda:{}:{}:function:{}:{}",
789            state.region, state.account_id, function_name, name
790        );
791        let alias = FunctionAlias {
792            alias_arn: alias_arn.clone(),
793            name: name.clone(),
794            function_version: version,
795            description: body["Description"].as_str().unwrap_or("").to_string(),
796            revision_id: id_from_time("rev-"),
797            routing_config: body.get("RoutingConfig").cloned(),
798        };
799        state
800            .aliases
801            .insert(Self::alias_key(function_name, &name), alias.clone());
802        ok(serde_json::to_value(alias).unwrap_or_default())
803    }
804
805    fn get_alias(
806        &self,
807        function_name: &str,
808        req: &AwsRequest,
809    ) -> Result<AwsResponse, AwsServiceError> {
810        let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
811        let region = self.region_for(&req.account_id);
812        self.with_state_read(&req.account_id, &region, |state| {
813            state
814                .aliases
815                .get(&Self::alias_key(function_name, &alias_name))
816                .map(|a| ok(serde_json::to_value(a).unwrap_or_default()))
817                .unwrap_or_else(|| Err(not_found("Alias", &alias_name)))
818        })
819    }
820
821    fn list_aliases(
822        &self,
823        function_name: &str,
824        account_id: &str,
825    ) -> Result<AwsResponse, AwsServiceError> {
826        let region = self.region_for(account_id);
827        self.with_state_read(account_id, &region, |state| {
828            let prefix = format!("{function_name}:");
829            let aliases: Vec<&FunctionAlias> = state
830                .aliases
831                .iter()
832                .filter(|(k, _)| k.starts_with(&prefix))
833                .map(|(_, v)| v)
834                .collect();
835            ok(json!({"Aliases": aliases}))
836        })
837    }
838
839    fn update_alias(
840        &self,
841        function_name: &str,
842        req: &AwsRequest,
843    ) -> Result<AwsResponse, AwsServiceError> {
844        let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
845        let body = body(req);
846        let mut accounts = self.state.write();
847        let state = accounts.get_or_create(&req.account_id);
848        let key = Self::alias_key(function_name, &alias_name);
849        let alias = state
850            .aliases
851            .get_mut(&key)
852            .ok_or_else(|| not_found("Alias", &alias_name))?;
853        if let Some(v) = body["FunctionVersion"].as_str() {
854            alias.function_version = v.to_string();
855        }
856        if let Some(d) = body["Description"].as_str() {
857            alias.description = d.to_string();
858        }
859        if let Some(rc) = body.get("RoutingConfig") {
860            alias.routing_config = Some(rc.clone());
861        }
862        alias.revision_id = id_from_time("rev-");
863        ok(serde_json::to_value(alias).unwrap_or_default())
864    }
865
866    fn delete_alias(
867        &self,
868        function_name: &str,
869        req: &AwsRequest,
870    ) -> Result<AwsResponse, AwsServiceError> {
871        let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
872        let mut accounts = self.state.write();
873        let state = accounts.get_or_create(&req.account_id);
874        state
875            .aliases
876            .remove(&Self::alias_key(function_name, &alias_name));
877        empty()
878    }
879
880    // ── Layers ──
881
882    fn publish_layer_version(
883        &self,
884        layer_name: &str,
885        req: &AwsRequest,
886    ) -> Result<AwsResponse, AwsServiceError> {
887        let body = body(req);
888        let zip_bytes: Option<Vec<u8>> = match body["Content"]["ZipFile"].as_str() {
889            Some(b64) => Some(
890                base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
891                    |_| {
892                        AwsServiceError::aws_error(
893                            StatusCode::BAD_REQUEST,
894                            "InvalidParameterValueException",
895                            "Could not decode Content.ZipFile: invalid base64",
896                        )
897                    },
898                )?,
899            ),
900            None => None,
901        };
902        let (code_sha256, code_size) = match zip_bytes.as_deref() {
903            Some(bytes) => {
904                let mut hasher = Sha256::new();
905                hasher.update(bytes);
906                let digest = hasher.finalize();
907                (
908                    base64::Engine::encode(&base64::engine::general_purpose::STANDARD, digest),
909                    bytes.len() as i64,
910                )
911            }
912            None => (String::new(), 0),
913        };
914
915        let mut accounts = self.state.write();
916        let state = accounts.get_or_create(&req.account_id);
917        let account_id = state.account_id.clone();
918        let layer = state
919            .layers
920            .entry(layer_name.to_string())
921            .or_insert_with(|| Layer {
922                layer_name: layer_name.to_string(),
923                layer_arn: format!(
924                    "arn:aws:lambda:{}:{}:layer:{}",
925                    state.region, state.account_id, layer_name
926                ),
927                versions: Vec::new(),
928            });
929        let next_version = (layer.versions.len() as i64) + 1;
930        let version_arn = format!("{}:{}", layer.layer_arn, next_version);
931        let runtimes: Vec<String> = body["CompatibleRuntimes"]
932            .as_array()
933            .map(|arr| {
934                arr.iter()
935                    .filter_map(|v| v.as_str().map(String::from))
936                    .collect()
937            })
938            .unwrap_or_default();
939        let architectures: Vec<String> = body["CompatibleArchitectures"]
940            .as_array()
941            .map(|arr| {
942                arr.iter()
943                    .filter_map(|v| v.as_str().map(String::from))
944                    .collect()
945            })
946            .unwrap_or_default();
947        let layer_arn = layer.layer_arn.clone();
948        let lv = LayerVersion {
949            version: next_version,
950            layer_version_arn: version_arn.clone(),
951            description: body["Description"].as_str().unwrap_or("").to_string(),
952            created_date: Utc::now(),
953            compatible_runtimes: runtimes,
954            license_info: body["LicenseInfo"].as_str().unwrap_or("").to_string(),
955            policy: None,
956            code_zip: zip_bytes,
957            code_sha256: code_sha256.clone(),
958            code_size,
959            compatible_architectures: architectures,
960        };
961        layer.versions.push(lv.clone());
962        let location = layer_content_url(req, &account_id, layer_name, next_version);
963        ok(json!({
964            "LayerArn": layer_arn,
965            "LayerVersionArn": version_arn,
966            "Version": next_version,
967            "Description": lv.description,
968            "CreatedDate": lv.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
969            "CompatibleRuntimes": lv.compatible_runtimes,
970            "CompatibleArchitectures": lv.compatible_architectures,
971            "LicenseInfo": lv.license_info,
972            "Content": {
973                "Location": location,
974                "CodeSha256": code_sha256,
975                "CodeSize": code_size,
976            },
977        }))
978    }
979
980    fn list_layers(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
981        let region = self.region_for(account_id);
982        self.with_state_read(account_id, &region, |state| {
983            let layers: Vec<Value> = state
984                .layers
985                .values()
986                .map(|l| {
987                    json!({
988                        "LayerName": l.layer_name,
989                        "LayerArn": l.layer_arn,
990                        "LatestMatchingVersion": l.versions.last().map(|v| json!({
991                            "LayerVersionArn": v.layer_version_arn,
992                            "Version": v.version,
993                            "Description": v.description,
994                            "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
995                            "CompatibleRuntimes": v.compatible_runtimes,
996                            "CompatibleArchitectures": v.compatible_architectures,
997                        })),
998                    })
999                })
1000                .collect();
1001            ok(json!({"Layers": layers}))
1002        })
1003    }
1004
1005    fn list_layer_versions(
1006        &self,
1007        layer_name: &str,
1008        account_id: &str,
1009    ) -> Result<AwsResponse, AwsServiceError> {
1010        let region = self.region_for(account_id);
1011        self.with_state_read(account_id, &region, |state| {
1012            let versions: Vec<Value> = state
1013                .layers
1014                .get(layer_name)
1015                .map(|l| {
1016                    l.versions
1017                        .iter()
1018                        .map(|v| {
1019                            json!({
1020                                "LayerVersionArn": v.layer_version_arn,
1021                                "Version": v.version,
1022                                "Description": v.description,
1023                                "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1024                                "CompatibleRuntimes": v.compatible_runtimes,
1025                                "CompatibleArchitectures": v.compatible_architectures,
1026                                "LicenseInfo": v.license_info,
1027                            })
1028                        })
1029                        .collect()
1030                })
1031                .unwrap_or_default();
1032            ok(json!({"LayerVersions": versions}))
1033        })
1034    }
1035
1036    fn get_layer_version(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1037        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1038        let version: i64 = req
1039            .path_segments
1040            .get(4)
1041            .and_then(|s| s.parse().ok())
1042            .ok_or_else(|| missing("VersionNumber"))?;
1043        let region = self.region_for(&req.account_id);
1044        let location = layer_content_url(req, &req.account_id, &layer_name, version);
1045        self.with_state_read(&req.account_id, &region, |state| {
1046            state
1047                .layers
1048                .get(&layer_name)
1049                .and_then(|l| l.versions.iter().find(|v| v.version == version))
1050                .map(|v| {
1051                    ok(json!({
1052                        "LayerVersionArn": v.layer_version_arn,
1053                        "Version": v.version,
1054                        "Description": v.description,
1055                        "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1056                        "CompatibleRuntimes": v.compatible_runtimes,
1057                        "CompatibleArchitectures": v.compatible_architectures,
1058                        "LicenseInfo": v.license_info,
1059                        "Content": {
1060                            "Location": location,
1061                            "CodeSha256": v.code_sha256,
1062                            "CodeSize": v.code_size,
1063                        },
1064                    }))
1065                })
1066                .unwrap_or_else(|| Err(not_found("LayerVersion", &layer_name)))
1067        })
1068    }
1069
1070    fn get_layer_version_by_arn(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1071        let arn = req
1072            .query_params
1073            .get("Arn")
1074            .or_else(|| req.query_params.get("find"))
1075            .cloned()
1076            .unwrap_or_default();
1077        let (account_id, layer_name, version) =
1078            parse_layer_version_arn(&arn).ok_or_else(|| missing("Arn"))?;
1079        let region = self.region_for(&account_id);
1080        let location = layer_content_url(req, &account_id, &layer_name, version);
1081        self.with_state_read(&account_id, &region, |state| {
1082            state
1083                .layers
1084                .get(&layer_name)
1085                .and_then(|l| l.versions.iter().find(|v| v.version == version))
1086                .map(|v| {
1087                    ok(json!({
1088                        "LayerVersionArn": v.layer_version_arn,
1089                        "Version": v.version,
1090                        "Description": v.description,
1091                        "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1092                        "CompatibleRuntimes": v.compatible_runtimes,
1093                        "CompatibleArchitectures": v.compatible_architectures,
1094                        "LicenseInfo": v.license_info,
1095                        "Content": {
1096                            "Location": location,
1097                            "CodeSha256": v.code_sha256,
1098                            "CodeSize": v.code_size,
1099                        },
1100                    }))
1101                })
1102                .unwrap_or_else(|| Err(not_found("LayerVersion", &arn)))
1103        })
1104    }
1105
1106    fn delete_layer_version(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1107        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1108        let version: i64 = req
1109            .path_segments
1110            .get(4)
1111            .and_then(|s| s.parse().ok())
1112            .unwrap_or(0);
1113        let mut accounts = self.state.write();
1114        let state = accounts.get_or_create(&req.account_id);
1115        if let Some(layer) = state.layers.get_mut(&layer_name) {
1116            layer.versions.retain(|v| v.version != version);
1117        }
1118        empty()
1119    }
1120
1121    fn get_layer_version_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1122        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1123        let version: i64 = req
1124            .path_segments
1125            .get(4)
1126            .and_then(|s| s.parse().ok())
1127            .unwrap_or(0);
1128        let region = self.region_for(&req.account_id);
1129        self.with_state_read(&req.account_id, &region, |state| {
1130            let policy = state
1131                .layers
1132                .get(&layer_name)
1133                .and_then(|l| l.versions.iter().find(|v| v.version == version))
1134                .and_then(|v| v.policy.clone())
1135                .unwrap_or_else(|| "{}".to_string());
1136            ok(json!({"Policy": policy, "RevisionId": id_from_time("rev-")}))
1137        })
1138    }
1139
1140    fn add_layer_version_permission(
1141        &self,
1142        req: &AwsRequest,
1143    ) -> Result<AwsResponse, AwsServiceError> {
1144        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1145        let version: i64 = req
1146            .path_segments
1147            .get(4)
1148            .and_then(|s| s.parse().ok())
1149            .unwrap_or(0);
1150        let body = body(req);
1151        let mut accounts = self.state.write();
1152        let state = accounts.get_or_create(&req.account_id);
1153        if let Some(layer) = state.layers.get_mut(&layer_name) {
1154            if let Some(v) = layer.versions.iter_mut().find(|v| v.version == version) {
1155                let policy = v.policy.clone().unwrap_or_else(|| "{}".to_string());
1156                let mut policy_doc: Value = serde_json::from_str(&policy).unwrap_or(json!({}));
1157                let statements = policy_doc["Statement"].as_array_mut();
1158                let new_stmt = json!({
1159                    "Sid": body["StatementId"].as_str().unwrap_or("default"),
1160                    "Effect": "Allow",
1161                    "Principal": body["Principal"].clone(),
1162                    "Action": body["Action"].clone(),
1163                    "Resource": v.layer_version_arn.clone(),
1164                });
1165                if let Some(s) = statements {
1166                    s.push(new_stmt);
1167                } else {
1168                    policy_doc = json!({"Version": "2012-10-17", "Statement": [new_stmt]});
1169                }
1170                v.policy = Some(policy_doc.to_string());
1171            }
1172        }
1173        ok(json!({
1174            "Statement": body["StatementId"],
1175            "RevisionId": id_from_time("rev-"),
1176        }))
1177    }
1178
1179    fn remove_layer_version_permission(
1180        &self,
1181        req: &AwsRequest,
1182    ) -> Result<AwsResponse, AwsServiceError> {
1183        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1184        let version: i64 = req
1185            .path_segments
1186            .get(4)
1187            .and_then(|s| s.parse().ok())
1188            .unwrap_or(0);
1189        let sid = req.path_segments.get(6).cloned().unwrap_or_default();
1190        let mut accounts = self.state.write();
1191        let state = accounts.get_or_create(&req.account_id);
1192        if let Some(layer) = state.layers.get_mut(&layer_name) {
1193            if let Some(v) = layer.versions.iter_mut().find(|v| v.version == version) {
1194                if let Some(policy) = v.policy.clone() {
1195                    let mut policy_doc: Value = serde_json::from_str(&policy).unwrap_or(json!({}));
1196                    if let Some(stmts) = policy_doc["Statement"].as_array_mut() {
1197                        stmts.retain(|s| s["Sid"].as_str() != Some(&sid));
1198                    }
1199                    v.policy = Some(policy_doc.to_string());
1200                }
1201            }
1202        }
1203        empty()
1204    }
1205
1206    // ── Function URL ──
1207
1208    /// Render a `FunctionUrlConfig` into the AWS-shaped JSON the Lambda
1209    /// SDK expects (PascalCase keys, ISO-8601 timestamps). Direct
1210    /// `serde_json::to_value` would emit the struct's snake_case field
1211    /// names, which the SDK silently treats as missing fields — leaving
1212    /// `function_url()` returning an empty string.
1213    fn function_url_config_json(cfg: &FunctionUrlConfig) -> Value {
1214        let mut out = json!({
1215            "FunctionArn": cfg.function_arn,
1216            "FunctionUrl": cfg.function_url,
1217            "AuthType": cfg.auth_type,
1218            "InvokeMode": cfg.invoke_mode,
1219            "CreationTime": cfg.creation_time.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1220            "LastModifiedTime": cfg.last_modified_time.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1221        });
1222        if let Some(cors) = &cfg.cors {
1223            out["Cors"] = cors.clone();
1224        }
1225        out
1226    }
1227
1228    fn create_function_url_config(
1229        &self,
1230        function_name: &str,
1231        req: &AwsRequest,
1232    ) -> Result<AwsResponse, AwsServiceError> {
1233        let body = body(req);
1234        let auth_type = body["AuthType"].as_str().unwrap_or("NONE").to_string();
1235        let now = Utc::now();
1236        let mut accounts = self.state.write();
1237        let state = accounts.get_or_create(&req.account_id);
1238        if !state.functions.contains_key(function_name) {
1239            return Err(not_found("Function", function_name));
1240        }
1241        let function_arn = format!(
1242            "arn:aws:lambda:{}:{}:function:{}",
1243            state.region, state.account_id, function_name
1244        );
1245        let cfg = FunctionUrlConfig {
1246            function_arn: function_arn.clone(),
1247            function_url: format!(
1248                "https://{function_name}.lambda-url.{}.on.aws/",
1249                state.region
1250            ),
1251            auth_type: auth_type.clone(),
1252            cors: body.get("Cors").cloned(),
1253            creation_time: now,
1254            last_modified_time: now,
1255            invoke_mode: body["InvokeMode"]
1256                .as_str()
1257                .unwrap_or("BUFFERED")
1258                .to_string(),
1259        };
1260        state
1261            .function_url_configs
1262            .insert(function_name.to_string(), cfg.clone());
1263        ok(Self::function_url_config_json(&cfg))
1264    }
1265
1266    fn get_function_url_config(
1267        &self,
1268        function_name: &str,
1269        account_id: &str,
1270    ) -> Result<AwsResponse, AwsServiceError> {
1271        let region = self.region_for(account_id);
1272        self.with_state_read(account_id, &region, |state| {
1273            state
1274                .function_url_configs
1275                .get(function_name)
1276                .map(|c| ok(Self::function_url_config_json(c)))
1277                .unwrap_or_else(|| Err(not_found("FunctionUrlConfig", function_name)))
1278        })
1279    }
1280
1281    fn update_function_url_config(
1282        &self,
1283        function_name: &str,
1284        req: &AwsRequest,
1285    ) -> Result<AwsResponse, AwsServiceError> {
1286        let body = body(req);
1287        let mut accounts = self.state.write();
1288        let state = accounts.get_or_create(&req.account_id);
1289        let cfg = state
1290            .function_url_configs
1291            .get_mut(function_name)
1292            .ok_or_else(|| not_found("FunctionUrlConfig", function_name))?;
1293        if let Some(a) = body["AuthType"].as_str() {
1294            cfg.auth_type = a.to_string();
1295        }
1296        if let Some(c) = body.get("Cors") {
1297            cfg.cors = Some(c.clone());
1298        }
1299        if let Some(m) = body["InvokeMode"].as_str() {
1300            cfg.invoke_mode = m.to_string();
1301        }
1302        cfg.last_modified_time = Utc::now();
1303        let snapshot = cfg.clone();
1304        ok(Self::function_url_config_json(&snapshot))
1305    }
1306
1307    fn delete_function_url_config(
1308        &self,
1309        function_name: &str,
1310        account_id: &str,
1311    ) -> Result<AwsResponse, AwsServiceError> {
1312        let mut accounts = self.state.write();
1313        let state = accounts.get_or_create(account_id);
1314        state.function_url_configs.remove(function_name);
1315        empty()
1316    }
1317
1318    fn list_function_url_configs(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1319        let region = self.region_for(account_id);
1320        self.with_state_read(account_id, &region, |state| {
1321            let configs: Vec<Value> = state
1322                .function_url_configs
1323                .values()
1324                .map(Self::function_url_config_json)
1325                .collect();
1326            ok(json!({"FunctionUrlConfigs": configs}))
1327        })
1328    }
1329
1330    // ── Concurrency ──
1331
1332    fn put_function_concurrency(
1333        &self,
1334        function_name: &str,
1335        req: &AwsRequest,
1336    ) -> Result<AwsResponse, AwsServiceError> {
1337        let body = body(req);
1338        let n = body["ReservedConcurrentExecutions"]
1339            .as_i64()
1340            .ok_or_else(|| missing("ReservedConcurrentExecutions"))?;
1341        let mut accounts = self.state.write();
1342        let state = accounts.get_or_create(&req.account_id);
1343        state
1344            .function_concurrency
1345            .insert(function_name.to_string(), n);
1346        ok(json!({"ReservedConcurrentExecutions": n}))
1347    }
1348
1349    fn get_function_concurrency(
1350        &self,
1351        function_name: &str,
1352        account_id: &str,
1353    ) -> Result<AwsResponse, AwsServiceError> {
1354        let region = self.region_for(account_id);
1355        self.with_state_read(account_id, &region, |state| {
1356            let n = state
1357                .function_concurrency
1358                .get(function_name)
1359                .copied()
1360                .unwrap_or(0);
1361            ok(json!({"ReservedConcurrentExecutions": n}))
1362        })
1363    }
1364
1365    fn delete_function_concurrency(
1366        &self,
1367        function_name: &str,
1368        account_id: &str,
1369    ) -> Result<AwsResponse, AwsServiceError> {
1370        let mut accounts = self.state.write();
1371        let state = accounts.get_or_create(account_id);
1372        state.function_concurrency.remove(function_name);
1373        empty()
1374    }
1375
1376    fn pc_key(function: &str, qualifier: &str) -> String {
1377        format!("{function}:{qualifier}")
1378    }
1379
1380    fn put_provisioned_concurrency(
1381        &self,
1382        function_name: &str,
1383        req: &AwsRequest,
1384    ) -> Result<AwsResponse, AwsServiceError> {
1385        let body = body(req);
1386        let qualifier = parse_qualifier(req);
1387        let requested = body["ProvisionedConcurrentExecutions"]
1388            .as_i64()
1389            .ok_or_else(|| missing("ProvisionedConcurrentExecutions"))?;
1390        let mut accounts = self.state.write();
1391        let state = accounts.get_or_create(&req.account_id);
1392        let cfg = ProvisionedConcurrencyConfig {
1393            requested,
1394            allocated: requested,
1395            status: "READY".to_string(),
1396            last_modified: Utc::now(),
1397        };
1398        state
1399            .provisioned_concurrency
1400            .insert(Self::pc_key(function_name, &qualifier), cfg.clone());
1401        ok(json!({
1402            "RequestedProvisionedConcurrentExecutions": cfg.requested,
1403            "AvailableProvisionedConcurrentExecutions": cfg.allocated,
1404            "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
1405            "Status": cfg.status,
1406            "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1407        }))
1408    }
1409
1410    fn get_provisioned_concurrency(
1411        &self,
1412        function_name: &str,
1413        req: &AwsRequest,
1414    ) -> Result<AwsResponse, AwsServiceError> {
1415        let qualifier = parse_qualifier(req);
1416        let region = self.region_for(&req.account_id);
1417        self.with_state_read(&req.account_id, &region, |state| {
1418            state
1419                .provisioned_concurrency
1420                .get(&Self::pc_key(function_name, &qualifier))
1421                .map(|cfg| ok(json!({
1422                    "RequestedProvisionedConcurrentExecutions": cfg.requested,
1423                    "AvailableProvisionedConcurrentExecutions": cfg.allocated,
1424                    "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
1425                    "Status": cfg.status,
1426                    "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1427                })))
1428                .unwrap_or_else(|| Err(not_found("ProvisionedConcurrencyConfig", function_name)))
1429        })
1430    }
1431
1432    fn delete_provisioned_concurrency(
1433        &self,
1434        function_name: &str,
1435        req: &AwsRequest,
1436    ) -> Result<AwsResponse, AwsServiceError> {
1437        let qualifier = parse_qualifier(req);
1438        let mut accounts = self.state.write();
1439        let state = accounts.get_or_create(&req.account_id);
1440        state
1441            .provisioned_concurrency
1442            .remove(&Self::pc_key(function_name, &qualifier));
1443        empty()
1444    }
1445
1446    fn list_provisioned_concurrency(
1447        &self,
1448        function_name: &str,
1449        account_id: &str,
1450    ) -> Result<AwsResponse, AwsServiceError> {
1451        let region = self.region_for(account_id);
1452        self.with_state_read(account_id, &region, |state| {
1453            let prefix = format!("{function_name}:");
1454            let configs: Vec<Value> = state
1455                .provisioned_concurrency
1456                .iter()
1457                .filter(|(k, _)| k.starts_with(&prefix))
1458                .map(|(k, cfg)| {
1459                    let qualifier = k.split(':').next_back().unwrap_or("$LATEST");
1460                    json!({
1461                        "FunctionArn": format!(
1462                            "arn:aws:lambda:{}:{}:function:{}:{}",
1463                            state.region, state.account_id, function_name, qualifier
1464                        ),
1465                        "Status": cfg.status,
1466                        "RequestedProvisionedConcurrentExecutions": cfg.requested,
1467                        "AvailableProvisionedConcurrentExecutions": cfg.allocated,
1468                        "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
1469                        "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1470                    })
1471                })
1472                .collect();
1473            ok(json!({"ProvisionedConcurrencyConfigs": configs}))
1474        })
1475    }
1476
1477    // ── Code signing ──
1478
1479    fn create_code_signing_config(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1480        let body = body(req);
1481        let mut accounts = self.state.write();
1482        let state = accounts.get_or_create(&req.account_id);
1483        let id = id_from_time("csc-");
1484        let arn = format!(
1485            "arn:aws:lambda:{}:{}:code-signing-config:{}",
1486            state.region, state.account_id, id
1487        );
1488        let publishers: Vec<String> = body
1489            .get("AllowedPublishers")
1490            .and_then(|v| v.get("SigningProfileVersionArns"))
1491            .and_then(|v| v.as_array())
1492            .map(|arr| {
1493                arr.iter()
1494                    .filter_map(|x| x.as_str().map(String::from))
1495                    .collect()
1496            })
1497            .unwrap_or_default();
1498        let csc = CodeSigningConfig {
1499            csc_id: id.clone(),
1500            csc_arn: arn,
1501            description: body["Description"].as_str().unwrap_or("").to_string(),
1502            allowed_publishers: publishers,
1503            untrusted_artifact_action: body["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"]
1504                .as_str()
1505                .unwrap_or("Warn")
1506                .to_string(),
1507            last_modified: Utc::now(),
1508        };
1509        state.code_signing_configs.insert(id, csc.clone());
1510        ok(json!({"CodeSigningConfig": code_signing_json(&csc)}))
1511    }
1512
1513    fn get_code_signing_config(
1514        &self,
1515        csc_id: &str,
1516        account_id: &str,
1517    ) -> Result<AwsResponse, AwsServiceError> {
1518        let id = extract_csc_id(csc_id);
1519        let region = self.region_for(account_id);
1520        self.with_state_read(account_id, &region, |state| {
1521            state
1522                .code_signing_configs
1523                .get(&id)
1524                .map(|c| ok(json!({"CodeSigningConfig": code_signing_json(c)})))
1525                .unwrap_or_else(|| Err(not_found("CodeSigningConfig", &id)))
1526        })
1527    }
1528
1529    fn update_code_signing_config(
1530        &self,
1531        csc_id: &str,
1532        req: &AwsRequest,
1533    ) -> Result<AwsResponse, AwsServiceError> {
1534        let body = body(req);
1535        let mut accounts = self.state.write();
1536        let state = accounts.get_or_create(&req.account_id);
1537        let id = extract_csc_id(csc_id);
1538        let csc = state
1539            .code_signing_configs
1540            .get_mut(&id)
1541            .ok_or_else(|| not_found("CodeSigningConfig", &id))?;
1542        if let Some(d) = body["Description"].as_str() {
1543            csc.description = d.to_string();
1544        }
1545        if let Some(action) = body["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"].as_str()
1546        {
1547            csc.untrusted_artifact_action = action.to_string();
1548        }
1549        csc.last_modified = Utc::now();
1550        ok(json!({"CodeSigningConfig": code_signing_json(csc)}))
1551    }
1552
1553    fn delete_code_signing_config(
1554        &self,
1555        csc_id: &str,
1556        account_id: &str,
1557    ) -> Result<AwsResponse, AwsServiceError> {
1558        let id = extract_csc_id(csc_id);
1559        let mut accounts = self.state.write();
1560        let state = accounts.get_or_create(account_id);
1561        state.code_signing_configs.remove(&id);
1562        empty()
1563    }
1564
1565    fn list_code_signing_configs(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1566        let region = self.region_for(account_id);
1567        self.with_state_read(account_id, &region, |state| {
1568            let cfgs: Vec<Value> = state
1569                .code_signing_configs
1570                .values()
1571                .map(code_signing_json)
1572                .collect();
1573            ok(json!({"CodeSigningConfigs": cfgs}))
1574        })
1575    }
1576
1577    fn put_function_code_signing(
1578        &self,
1579        function_name: &str,
1580        req: &AwsRequest,
1581    ) -> Result<AwsResponse, AwsServiceError> {
1582        let body = body(req);
1583        let csc_arn = body["CodeSigningConfigArn"]
1584            .as_str()
1585            .ok_or_else(|| missing("CodeSigningConfigArn"))?
1586            .to_string();
1587        let mut accounts = self.state.write();
1588        let state = accounts.get_or_create(&req.account_id);
1589        state
1590            .function_code_signing
1591            .insert(function_name.to_string(), csc_arn.clone());
1592        ok(json!({
1593            "CodeSigningConfigArn": csc_arn,
1594            "FunctionName": function_name,
1595        }))
1596    }
1597
1598    fn get_function_code_signing(
1599        &self,
1600        function_name: &str,
1601        account_id: &str,
1602    ) -> Result<AwsResponse, AwsServiceError> {
1603        let region = self.region_for(account_id);
1604        self.with_state_read(account_id, &region, |state| {
1605            let arn = state
1606                .function_code_signing
1607                .get(function_name)
1608                .cloned()
1609                .unwrap_or_default();
1610            ok(json!({
1611                "CodeSigningConfigArn": arn,
1612                "FunctionName": function_name,
1613            }))
1614        })
1615    }
1616
1617    fn delete_function_code_signing(
1618        &self,
1619        function_name: &str,
1620        account_id: &str,
1621    ) -> Result<AwsResponse, AwsServiceError> {
1622        let mut accounts = self.state.write();
1623        let state = accounts.get_or_create(account_id);
1624        state.function_code_signing.remove(function_name);
1625        empty()
1626    }
1627
1628    fn list_functions_by_code_signing(
1629        &self,
1630        csc_id: &str,
1631        account_id: &str,
1632    ) -> Result<AwsResponse, AwsServiceError> {
1633        let id = extract_csc_id(csc_id);
1634        let region = self.region_for(account_id);
1635        self.with_state_read(account_id, &region, |state| {
1636            let funcs: Vec<&String> = state
1637                .function_code_signing
1638                .iter()
1639                .filter(|(_, v)| v.contains(&id))
1640                .map(|(k, _)| k)
1641                .collect();
1642            ok(json!({"FunctionArns": funcs}))
1643        })
1644    }
1645
1646    // ── Event invoke ──
1647
1648    fn ev_key(function: &str, qualifier: &str) -> String {
1649        format!("{function}:{qualifier}")
1650    }
1651
1652    fn put_function_event_invoke(
1653        &self,
1654        function_name: &str,
1655        req: &AwsRequest,
1656    ) -> Result<AwsResponse, AwsServiceError> {
1657        let body = body(req);
1658        let qualifier = parse_qualifier(req);
1659        let function_arn = format!(
1660            "arn:aws:lambda:{}:{}:function:{}",
1661            self.region_for(&req.account_id),
1662            req.account_id,
1663            function_name
1664        );
1665        let cfg = EventInvokeConfig {
1666            function_arn: function_arn.clone(),
1667            maximum_event_age: body["MaximumEventAgeInSeconds"].as_i64().unwrap_or(21600),
1668            maximum_retry_attempts: body["MaximumRetryAttempts"].as_i64().unwrap_or(2),
1669            destination_config: body.get("DestinationConfig").cloned().unwrap_or(json!({})),
1670            last_modified: Utc::now(),
1671        };
1672        let mut accounts = self.state.write();
1673        let state = accounts.get_or_create(&req.account_id);
1674        state
1675            .event_invoke_configs
1676            .insert(Self::ev_key(function_name, &qualifier), cfg.clone());
1677        ok(event_invoke_json(&cfg))
1678    }
1679
1680    fn get_function_event_invoke(
1681        &self,
1682        function_name: &str,
1683        req: &AwsRequest,
1684    ) -> Result<AwsResponse, AwsServiceError> {
1685        let qualifier = parse_qualifier(req);
1686        let region = self.region_for(&req.account_id);
1687        self.with_state_read(&req.account_id, &region, |state| {
1688            state
1689                .event_invoke_configs
1690                .get(&Self::ev_key(function_name, &qualifier))
1691                .map(|c| ok(event_invoke_json(c)))
1692                .unwrap_or_else(|| Err(not_found("EventInvokeConfig", function_name)))
1693        })
1694    }
1695
1696    fn delete_function_event_invoke(
1697        &self,
1698        function_name: &str,
1699        req: &AwsRequest,
1700    ) -> Result<AwsResponse, AwsServiceError> {
1701        let qualifier = parse_qualifier(req);
1702        let mut accounts = self.state.write();
1703        let state = accounts.get_or_create(&req.account_id);
1704        state
1705            .event_invoke_configs
1706            .remove(&Self::ev_key(function_name, &qualifier));
1707        empty()
1708    }
1709
1710    fn list_function_event_invoke(
1711        &self,
1712        function_name: &str,
1713        account_id: &str,
1714    ) -> Result<AwsResponse, AwsServiceError> {
1715        let region = self.region_for(account_id);
1716        self.with_state_read(account_id, &region, |state| {
1717            let prefix = format!("{function_name}:");
1718            let configs: Vec<Value> = state
1719                .event_invoke_configs
1720                .iter()
1721                .filter(|(k, _)| k.starts_with(&prefix))
1722                .map(|(_, c)| event_invoke_json(c))
1723                .collect();
1724            ok(json!({"FunctionEventInvokeConfigs": configs}))
1725        })
1726    }
1727
1728    // ── Runtime management ──
1729
1730    fn put_runtime_management(
1731        &self,
1732        function_name: &str,
1733        req: &AwsRequest,
1734    ) -> Result<AwsResponse, AwsServiceError> {
1735        let body = body(req);
1736        let qualifier = parse_qualifier(req);
1737        let cfg = RuntimeManagementConfig {
1738            update_runtime_on: body["UpdateRuntimeOn"]
1739                .as_str()
1740                .unwrap_or("Auto")
1741                .to_string(),
1742            runtime_version_arn: body["RuntimeVersionArn"].as_str().unwrap_or("").to_string(),
1743        };
1744        let mut accounts = self.state.write();
1745        let state = accounts.get_or_create(&req.account_id);
1746        state
1747            .runtime_management
1748            .insert(format!("{function_name}:{qualifier}"), cfg.clone());
1749        ok(json!({
1750            "FunctionArn": Arn::new("lambda", &state.region, &state.account_id, &format!("function:{function_name}:{qualifier}")).to_string(),
1751            "UpdateRuntimeOn": cfg.update_runtime_on,
1752            "RuntimeVersionArn": cfg.runtime_version_arn,
1753        }))
1754    }
1755
1756    fn get_runtime_management(
1757        &self,
1758        function_name: &str,
1759        req: &AwsRequest,
1760    ) -> Result<AwsResponse, AwsServiceError> {
1761        let qualifier = parse_qualifier(req);
1762        let region = self.region_for(&req.account_id);
1763        self.with_state_read(&req.account_id, &region, |state| {
1764            let cfg = state
1765                .runtime_management
1766                .get(&format!("{function_name}:{qualifier}"))
1767                .cloned()
1768                .unwrap_or(RuntimeManagementConfig {
1769                    update_runtime_on: "Auto".to_string(),
1770                    runtime_version_arn: String::new(),
1771                });
1772            ok(json!({
1773                "FunctionArn": format!(
1774                    "arn:aws:lambda:{}:{}:function:{}:{}",
1775                    state.region, state.account_id, function_name, qualifier
1776                ),
1777                "UpdateRuntimeOn": cfg.update_runtime_on,
1778                "RuntimeVersionArn": cfg.runtime_version_arn,
1779            }))
1780        })
1781    }
1782
1783    // ── Scaling ──
1784
1785    fn put_scaling_config(
1786        &self,
1787        uuid: &str,
1788        req: &AwsRequest,
1789    ) -> Result<AwsResponse, AwsServiceError> {
1790        let body = body(req);
1791        let cfg = FunctionScalingConfig {
1792            maximum_concurrency: body["MaximumConcurrency"].as_i64().unwrap_or(0),
1793        };
1794        let mut accounts = self.state.write();
1795        let state = accounts.get_or_create(&req.account_id);
1796        state.scaling_configs.insert(uuid.to_string(), cfg.clone());
1797        ok(json!({
1798            "MaximumConcurrency": cfg.maximum_concurrency,
1799        }))
1800    }
1801
1802    fn get_scaling_config(
1803        &self,
1804        uuid: &str,
1805        account_id: &str,
1806    ) -> Result<AwsResponse, AwsServiceError> {
1807        let region = self.region_for(account_id);
1808        self.with_state_read(account_id, &region, |state| {
1809            let n = state
1810                .scaling_configs
1811                .get(uuid)
1812                .map(|c| c.maximum_concurrency)
1813                .unwrap_or(0);
1814            ok(json!({"MaximumConcurrency": n}))
1815        })
1816    }
1817
1818    // ── Recursion ──
1819
1820    fn put_recursion_config(
1821        &self,
1822        function_name: &str,
1823        req: &AwsRequest,
1824    ) -> Result<AwsResponse, AwsServiceError> {
1825        let body = body(req);
1826        let mode = body["RecursiveLoop"]
1827            .as_str()
1828            .unwrap_or("Terminate")
1829            .to_string();
1830        let mut accounts = self.state.write();
1831        let state = accounts.get_or_create(&req.account_id);
1832        state
1833            .recursion_configs
1834            .insert(function_name.to_string(), mode.clone());
1835        ok(json!({"RecursiveLoop": mode}))
1836    }
1837
1838    fn get_recursion_config(
1839        &self,
1840        function_name: &str,
1841        account_id: &str,
1842    ) -> Result<AwsResponse, AwsServiceError> {
1843        let region = self.region_for(account_id);
1844        self.with_state_read(account_id, &region, |state| {
1845            let mode = state
1846                .recursion_configs
1847                .get(function_name)
1848                .cloned()
1849                .unwrap_or_else(|| "Terminate".to_string());
1850            ok(json!({"RecursiveLoop": mode}))
1851        })
1852    }
1853
1854    // ── Tags ──
1855
1856    fn tag_resource(
1857        &self,
1858        resource_arn: &str,
1859        req: &AwsRequest,
1860    ) -> Result<AwsResponse, AwsServiceError> {
1861        let body = body(req);
1862        let new_tags: Vec<(String, String)> = body
1863            .get("Tags")
1864            .and_then(|v| v.as_object())
1865            .map(|m| {
1866                m.iter()
1867                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
1868                    .collect()
1869            })
1870            .unwrap_or_default();
1871        // SDKs URL-encode `:` in the path so the ARN arrives as
1872        // `arn%3Aaws%3Alambda%3A...`; decode before parsing.
1873        let resource_arn_decoded = decode_query_segment(resource_arn);
1874        let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
1875            AwsServiceError::aws_error(
1876                StatusCode::BAD_REQUEST,
1877                "InvalidParameterValueException",
1878                format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
1879            )
1880        })?;
1881        let mut accounts = self.state.write();
1882        let state = accounts.get_or_create(&req.account_id);
1883        let func = state.functions.get_mut(&name).ok_or_else(|| {
1884            AwsServiceError::aws_error(
1885                StatusCode::NOT_FOUND,
1886                "ResourceNotFoundException",
1887                format!("Function not found: {name}"),
1888            )
1889        })?;
1890        // Single source of truth: per-function `tags`. `GetFunction`,
1891        // `ListTagsForResource`, and `UntagResource` all read here.
1892        for (k, v) in new_tags {
1893            func.tags.insert(k, v);
1894        }
1895        empty()
1896    }
1897
1898    fn untag_resource(
1899        &self,
1900        resource_arn: &str,
1901        req: &AwsRequest,
1902    ) -> Result<AwsResponse, AwsServiceError> {
1903        // AWS sends keys as repeated `tagKeys=K1&tagKeys=K2` query
1904        // params per the Smithy model (`httpQuery: "tagKeys"`). The
1905        // dispatcher's deduplicated `query_params` HashMap collapses
1906        // repeats, so parse the raw query string for every occurrence.
1907        // Also accept `tagKeys.1=K1` / `tagKeys.member.1=K1` for SDKs
1908        // that serialize list params indexed-style.
1909        //
1910        // As a defensive fallback we also accept a JSON body of the
1911        // form `{"TagKeys": [...]}` / `{"tagKeys": [...]}` for clients
1912        // that mistakenly send the tag keys in the body. Query
1913        // parameters win when both are present, since query is the
1914        // AWS-canonical wire format.
1915        let mut keys: Vec<String> = Vec::new();
1916        for (k, v) in parse_query_pairs(&req.raw_query) {
1917            if k == "tagKeys" || k.starts_with("tagKeys.") {
1918                keys.push(v);
1919            }
1920        }
1921        if keys.is_empty() {
1922            let parsed = body(req);
1923            for field in ["TagKeys", "tagKeys"] {
1924                if let Some(arr) = parsed.get(field).and_then(|v| v.as_array()) {
1925                    for v in arr {
1926                        if let Some(s) = v.as_str() {
1927                            keys.push(s.to_string());
1928                        }
1929                    }
1930                    if !keys.is_empty() {
1931                        break;
1932                    }
1933                }
1934            }
1935        }
1936        let resource_arn_decoded = decode_query_segment(resource_arn);
1937        let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
1938            AwsServiceError::aws_error(
1939                StatusCode::BAD_REQUEST,
1940                "InvalidParameterValueException",
1941                format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
1942            )
1943        })?;
1944        let mut accounts = self.state.write();
1945        let state = accounts.get_or_create(&req.account_id);
1946        let func = state.functions.get_mut(&name).ok_or_else(|| {
1947            AwsServiceError::aws_error(
1948                StatusCode::NOT_FOUND,
1949                "ResourceNotFoundException",
1950                format!("Function not found: {name}"),
1951            )
1952        })?;
1953        for k in &keys {
1954            func.tags.remove(k);
1955        }
1956        empty()
1957    }
1958
1959    fn list_tags(
1960        &self,
1961        resource_arn: &str,
1962        account_id: &str,
1963    ) -> Result<AwsResponse, AwsServiceError> {
1964        let resource_arn_decoded = decode_query_segment(resource_arn);
1965        let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
1966            AwsServiceError::aws_error(
1967                StatusCode::BAD_REQUEST,
1968                "InvalidParameterValueException",
1969                format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
1970            )
1971        })?;
1972        let region = self.region_for(account_id);
1973        self.with_state_read(account_id, &region, |state| {
1974            let func = state.functions.get(&name).ok_or_else(|| {
1975                AwsServiceError::aws_error(
1976                    StatusCode::NOT_FOUND,
1977                    "ResourceNotFoundException",
1978                    format!("Function not found: {name}"),
1979                )
1980            })?;
1981            let tags: serde_json::Map<String, Value> = func
1982                .tags
1983                .iter()
1984                .map(|(k, v)| (k.clone(), Value::String(v.clone())))
1985                .collect();
1986            ok(json!({"Tags": tags}))
1987        })
1988    }
1989
1990    // ── Capacity providers ──
1991
1992    fn update_event_source_mapping_handler(
1993        &self,
1994        uuid: &str,
1995        req: &AwsRequest,
1996    ) -> Result<AwsResponse, AwsServiceError> {
1997        let body = body(req);
1998        let mut accounts = self.state.write();
1999        let state = accounts.get_or_create(&req.account_id);
2000        let esm = state
2001            .event_source_mappings
2002            .get_mut(uuid)
2003            .ok_or_else(|| not_found("EventSourceMapping", uuid))?;
2004        if let Some(b) = body["BatchSize"].as_i64() {
2005            esm.batch_size = b;
2006        }
2007        if let Some(name) = body["FunctionName"].as_str() {
2008            esm.function_arn = format!(
2009                "arn:aws:lambda:{}:{}:function:{}",
2010                state.region, state.account_id, name
2011            );
2012        }
2013        if let Some(filters) = body
2014            .get("FilterCriteria")
2015            .and_then(|v| v.get("Filters"))
2016            .and_then(|v| v.as_array())
2017        {
2018            esm.filter_patterns = filters
2019                .iter()
2020                .filter_map(|f| f.get("Pattern").and_then(|p| p.as_str()).map(String::from))
2021                .collect();
2022        }
2023        if let Some(types) = body.get("FunctionResponseTypes").and_then(|v| v.as_array()) {
2024            esm.function_response_types = types
2025                .iter()
2026                .filter_map(|v| v.as_str().map(String::from))
2027                .collect();
2028        }
2029        if let Some(w) = body
2030            .get("MaximumBatchingWindowInSeconds")
2031            .and_then(|v| v.as_i64())
2032        {
2033            esm.maximum_batching_window_in_seconds = Some(w);
2034        }
2035        if let Some(p) = body.get("ParallelizationFactor").and_then(|v| v.as_i64()) {
2036            esm.parallelization_factor = Some(p);
2037        }
2038        if let Some(s) = body.get("KMSKeyArn").and_then(|v| v.as_str()) {
2039            esm.kms_key_arn = Some(s.to_string());
2040        }
2041        if let Some(mc) = body.get("MetricsConfig") {
2042            esm.metrics_config = Some(mc.clone());
2043        }
2044        if let Some(dc) = body.get("DestinationConfig") {
2045            esm.destination_config = Some(dc.clone());
2046        }
2047        if let Some(n) = body.get("MaximumRetryAttempts").and_then(|v| v.as_i64()) {
2048            esm.maximum_retry_attempts = Some(n);
2049        }
2050        if let Some(n) = body
2051            .get("MaximumRecordAgeInSeconds")
2052            .and_then(|v| v.as_i64())
2053        {
2054            esm.maximum_record_age_in_seconds = Some(n);
2055        }
2056        if let Some(b) = body
2057            .get("BisectBatchOnFunctionError")
2058            .and_then(|v| v.as_bool())
2059        {
2060            esm.bisect_batch_on_function_error = Some(b);
2061        }
2062        if let Some(n) = body.get("TumblingWindowInSeconds").and_then(|v| v.as_i64()) {
2063            esm.tumbling_window_in_seconds = Some(n);
2064        }
2065        let mut body_json = json!({
2066            "UUID": esm.uuid,
2067            "FunctionArn": esm.function_arn,
2068            "EventSourceArn": esm.event_source_arn,
2069            "BatchSize": esm.batch_size,
2070            "State": "Enabled",
2071            "StateTransitionReason": "USER_INITIATED",
2072            "LastModified": chrono::Utc::now().timestamp() as f64,
2073        });
2074        let obj = body_json.as_object_mut().expect("json! built object");
2075        if !esm.filter_patterns.is_empty() {
2076            obj.insert(
2077                "FilterCriteria".into(),
2078                json!({
2079                    "Filters": esm
2080                        .filter_patterns
2081                        .iter()
2082                        .map(|p| json!({"Pattern": p}))
2083                        .collect::<Vec<_>>(),
2084                }),
2085            );
2086        }
2087        if !esm.function_response_types.is_empty() {
2088            obj.insert(
2089                "FunctionResponseTypes".into(),
2090                json!(esm.function_response_types),
2091            );
2092        }
2093        if let Some(w) = esm.maximum_batching_window_in_seconds {
2094            obj.insert("MaximumBatchingWindowInSeconds".into(), json!(w));
2095        }
2096        if let Some(p) = esm.parallelization_factor {
2097            obj.insert("ParallelizationFactor".into(), json!(p));
2098        }
2099        ok(body_json)
2100    }
2101
2102    fn region_for(&self, account_id: &str) -> String {
2103        let accounts = self.state.read();
2104        accounts
2105            .get(account_id)
2106            .map(|s| s.region.clone())
2107            .unwrap_or_else(|| "us-east-1".to_string())
2108    }
2109
2110    /// `InvokeWithResponseStream` — invoke the function and serialize
2111    /// its response as a sequence of `application/vnd.amazon.eventstream`
2112    /// frames. AWS uses this protocol for response-streaming Lambda
2113    /// invocations (Node.js `awslambda.streamifyResponse`, Python
2114    /// streaming handlers, custom runtimes that flush mid-handler).
2115    ///
2116    /// On success: zero or more `PayloadChunk` events (one per chunk
2117    /// the RIE flushed) followed by an `InvokeComplete` event with
2118    /// `ErrorCode = null`. On a function error (uncaught exception in
2119    /// the handler) or an infrastructure error (timeout, container
2120    /// crash): an `InvokeComplete` with non-null `ErrorCode`/
2121    /// `ErrorDetails`. The HTTP status itself is always 200 — failures
2122    /// surface inside the trailing event, matching AWS.
2123    pub(crate) async fn invoke_with_response_stream(
2124        &self,
2125        function_name: &str,
2126        account_id: &str,
2127        req: &AwsRequest,
2128    ) -> Result<AwsResponse, AwsServiceError> {
2129        // Resolve the function under the same rules as buffered Invoke
2130        // — qualifier, version snapshots, attached layers, code-zip
2131        // presence — but without the InvocationType branch (streaming
2132        // is always synchronous).
2133        let qualifier = req.query_params.get("Qualifier").map(String::as_str);
2134
2135        let resolved_version: Option<String> = {
2136            let accounts = self.state.read();
2137            let empty = LambdaState::new(account_id, "");
2138            let state = accounts.get(account_id).unwrap_or(&empty);
2139            crate::service::resolve_qualifier_to_version(state, function_name, qualifier)
2140        };
2141        let executed_version = resolved_version
2142            .clone()
2143            .unwrap_or_else(|| "$LATEST".to_string());
2144
2145        let (func, layer_zips) = {
2146            let accounts = self.state.read();
2147            let empty = LambdaState::new(account_id, "");
2148            let state = accounts.get(account_id).unwrap_or(&empty);
2149            let func = match resolved_version.as_deref() {
2150                Some(v) => state
2151                    .function_version_snapshots
2152                    .get(function_name)
2153                    .and_then(|m| m.get(v))
2154                    .cloned()
2155                    .or_else(|| state.functions.get(function_name).cloned()),
2156                None => state.functions.get(function_name).cloned(),
2157            }
2158            .ok_or_else(|| {
2159                AwsServiceError::aws_error(
2160                    StatusCode::NOT_FOUND,
2161                    "ResourceNotFoundException",
2162                    format!(
2163                        "Function not found: arn:aws:lambda:{}:{}:function:{}",
2164                        state.region, state.account_id, function_name
2165                    ),
2166                )
2167            })?;
2168            let mut zips: Vec<Vec<u8>> = Vec::with_capacity(func.layers.len());
2169            for attached in &func.layers {
2170                if let Some(b) =
2171                    parse_layer_version_arn(&attached.arn).and_then(|(acct, name, ver)| {
2172                        accounts
2173                            .get(&acct)
2174                            .and_then(|s| s.layers.get(&name))
2175                            .and_then(|l| l.versions.iter().find(|v| v.version == ver))
2176                            .and_then(|v| v.code_zip.clone())
2177                    })
2178                {
2179                    zips.push(b);
2180                }
2181            }
2182            (func, zips)
2183        };
2184
2185        if func.code_zip.is_none() && func.package_type != "Image" {
2186            return Err(AwsServiceError::aws_error(
2187                StatusCode::BAD_REQUEST,
2188                "InvalidParameterValueException",
2189                "Function has no deployment package",
2190            ));
2191        }
2192
2193        let runtime = self.runtime.as_ref().ok_or_else(|| {
2194            AwsServiceError::aws_error(
2195                StatusCode::INTERNAL_SERVER_ERROR,
2196                "ServiceException",
2197                "Docker/Podman is required for Lambda execution but is not available",
2198            )
2199        })?;
2200
2201        // Drive the streaming RIE call and assemble the eventstream
2202        // body. We buffer all frames before returning — `AwsResponse`
2203        // is byte-buffered today — but the chunk boundaries the RIE
2204        // flushed are preserved as separate `PayloadChunk` events, so
2205        // SDK parsers see exactly the streaming structure they expect.
2206        let mut frames: Vec<u8> = Vec::new();
2207        let invoke_result = runtime
2208            .invoke_streaming(&func, &req.body, &layer_zips)
2209            .await;
2210
2211        let (error_code, error_details) = match invoke_result {
2212            Ok(mut stream) => {
2213                let mut last_chunk: Option<bytes::Bytes> = None;
2214                let mut had_chunks = false;
2215                loop {
2216                    match stream.next_chunk().await {
2217                        Ok(Some(chunk)) => {
2218                            had_chunks = true;
2219                            frames.extend_from_slice(&crate::eventstream::payload_chunk_frame(
2220                                &chunk,
2221                            ));
2222                            last_chunk = Some(chunk);
2223                        }
2224                        Ok(None) => break,
2225                        Err(e) => {
2226                            tracing::error!(function = %function_name, error = %e, "Lambda streaming chunk read failed");
2227                            return Err(AwsServiceError::aws_error(
2228                                StatusCode::INTERNAL_SERVER_ERROR,
2229                                "ServiceException",
2230                                format!("Lambda streaming read failed: {e}"),
2231                            ));
2232                        }
2233                    }
2234                }
2235
2236                // The Lambda runtime returns 200 even when the user
2237                // handler threw, packing `errorMessage`/`errorType`
2238                // into the buffered body. Streaming handlers do the
2239                // same on the final chunk. Inspect the last chunk we
2240                // saw and surface that as a function error in the
2241                // terminal `InvokeComplete` event.
2242                let mut error: Option<(String, String)> = None;
2243                if had_chunks {
2244                    if let Some(bytes) = last_chunk {
2245                        if let Ok(v) = serde_json::from_slice::<Value>(&bytes) {
2246                            if let Some(obj) = v.as_object() {
2247                                if obj.contains_key("errorMessage") || obj.contains_key("errorType")
2248                                {
2249                                    let etype = obj
2250                                        .get("errorType")
2251                                        .and_then(|x| x.as_str())
2252                                        .unwrap_or("Runtime.Unknown")
2253                                        .to_string();
2254                                    let emsg = obj
2255                                        .get("errorMessage")
2256                                        .and_then(|x| x.as_str())
2257                                        .unwrap_or("function error")
2258                                        .to_string();
2259                                    error = Some((etype, emsg));
2260                                }
2261                            }
2262                        }
2263                    }
2264                }
2265                match error {
2266                    Some((code, details)) => (Some(code), Some(details)),
2267                    None => (None, None),
2268                }
2269            }
2270            Err(e) => {
2271                tracing::error!(function = %function_name, error = %e, "Lambda streaming invocation failed");
2272                (
2273                    Some("Runtime.InvocationFailure".to_string()),
2274                    Some(e.to_string()),
2275                )
2276            }
2277        };
2278
2279        frames.extend_from_slice(&crate::eventstream::invoke_complete_frame(
2280            error_code.as_deref(),
2281            error_details.as_deref(),
2282            "",
2283        ));
2284
2285        let mut resp = AwsResponse {
2286            status: StatusCode::OK,
2287            content_type: "application/vnd.amazon.eventstream".to_string(),
2288            body: fakecloud_core::service::ResponseBody::Bytes(bytes::Bytes::from(frames)),
2289            headers: http::HeaderMap::new(),
2290        };
2291        if let Ok(v) = http::HeaderValue::from_str(&executed_version) {
2292            resp.headers
2293                .insert(http::HeaderName::from_static("x-amz-executed-version"), v);
2294        }
2295        Ok(resp)
2296    }
2297}
2298
2299fn extract_csc_id(input: &str) -> String {
2300    // Decode percent encoding then take the segment after the last colon
2301    // (csc id), or treat as id if no colon present.
2302    let decoded = percent_decode(input);
2303    decoded.rsplit(':').next().unwrap_or(&decoded).to_string()
2304}
2305
/// Decode `%XX` percent escapes in `input` and return the result as text.
///
/// A `%` that is not followed by two hexadecimal digits (including a `%`
/// too close to the end of the input) is copied through unchanged.
///
/// Decoding works on bytes and the byte sequence is interpreted as UTF-8
/// once at the end, so multi-byte characters survive whether they arrive
/// escaped (`%C3%A9`) or as raw text. Escapes that do not form valid UTF-8
/// are replaced with U+FFFD rather than being mapped byte-by-byte to
/// Latin-1 code points.
fn percent_decode(input: &str) -> String {
    /// Numeric value of one ASCII hex digit, or `None` for any other byte.
    fn hex_value(byte: u8) -> Option<u8> {
        match byte {
            b'0'..=b'9' => Some(byte - b'0'),
            b'a'..=b'f' => Some(byte - b'a' + 10),
            b'A'..=b'F' => Some(byte - b'A' + 10),
            _ => None,
        }
    }

    let bytes = input.as_bytes();
    // Decoding never grows the data, so the input length is an upper bound.
    let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            // `get` yields `None` when fewer than two bytes follow the `%`.
            if let Some([hi, lo]) = bytes.get(i + 1..i + 3) {
                if let (Some(h), Some(l)) = (hex_value(*hi), hex_value(*lo)) {
                    out.push((h << 4) | l);
                    i += 3;
                    continue;
                }
            }
        }
        // Ordinary byte, or a malformed escape: copy it through verbatim.
        out.push(bytes[i]);
        i += 1;
    }
    String::from_utf8_lossy(&out).into_owned()
}
2325
2326fn code_signing_json(c: &CodeSigningConfig) -> Value {
2327    json!({
2328        "CodeSigningConfigId": c.csc_id,
2329        "CodeSigningConfigArn": c.csc_arn,
2330        "Description": c.description,
2331        "AllowedPublishers": {
2332            "SigningProfileVersionArns": c.allowed_publishers,
2333        },
2334        "CodeSigningPolicies": {
2335            "UntrustedArtifactOnDeployment": c.untrusted_artifact_action,
2336        },
2337        "LastModified": c.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
2338    })
2339}
2340
2341fn event_invoke_json(c: &EventInvokeConfig) -> Value {
2342    json!({
2343        "FunctionArn": c.function_arn,
2344        "MaximumEventAgeInSeconds": c.maximum_event_age,
2345        "MaximumRetryAttempts": c.maximum_retry_attempts,
2346        "DestinationConfig": c.destination_config,
2347        "LastModified": c.last_modified.timestamp(),
2348    })
2349}
2350
#[cfg(test)]
mod tests {
    use crate::service::LambdaService;
    use crate::state::{LambdaState, SharedLambdaState};
    use fakecloud_core::multi_account::MultiAccountState;
    use fakecloud_core::service::AwsRequest;
    use http::Method;
    use parking_lot::RwLock;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Account id used for both the service state and every synthetic request.
    const ACCOUNT_ID: &str = "000000000000";
    /// Region used for both the service state and every synthetic request.
    const REGION: &str = "us-east-1";

    /// Build a `LambdaService` backed by freshly created multi-account state.
    fn svc() -> LambdaService {
        let accounts = MultiAccountState::<LambdaState>::new(ACCOUNT_ID, REGION, "");
        let state: SharedLambdaState = Arc::new(RwLock::new(accounts));
        LambdaService::new(state)
    }

    /// Assemble a POST `AwsRequest` for `action` carrying `body`, with the
    /// raw path and path segments both derived from `segs`.
    fn req(action: &str, body: &str, segs: &[&str]) -> AwsRequest {
        let raw_path = format!("/{}", segs.join("/"));
        AwsRequest {
            service: "lambda".to_string(),
            method: Method::POST,
            raw_path,
            raw_query: String::new(),
            path_segments: segs.iter().map(|seg| (*seg).to_owned()).collect(),
            query_params: HashMap::new(),
            headers: http::HeaderMap::new(),
            body: bytes::Bytes::from(body.to_owned()),
            body_stream: parking_lot::Mutex::new(None),
            account_id: ACCOUNT_ID.to_string(),
            region: REGION.to_string(),
            request_id: "rid".to_string(),
            action: action.to_string(),
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Dispatch `action` through `handle_extra` and require a success status;
    /// panics with the action name if the handler returns an error.
    async fn run(s: &LambdaService, action: &str, body: &str, res: Option<&str>, segs: &[&str]) {
        let request = req(action, body, segs);
        let resp = s
            .handle_extra(action, res, &request)
            .await
            .unwrap_or_else(|e| panic!("{action} failed: {e:?}"));
        assert!(resp.status.is_success(), "{action} status: {}", resp.status);
    }

    #[tokio::test]
    async fn read_only_listings_succeed_without_state() {
        let s = svc();
        // (action, body, resource) triples, each issued with no path segments.
        let calls: [(&str, &str, Option<&str>); 4] = [
            ("GetAccountSettings", "", None),
            ("InvokeAsync", r#"{}"#, Some("fn")),
            ("ListLayers", "", None),
            ("ListLayerVersions", "", Some("layer")),
        ];
        for (action, body, res) in calls {
            run(&s, action, body, res, &[]).await;
        }
    }

    #[tokio::test]
    async fn layers_lifecycle() {
        let s = svc();
        let publish_path = ["2018-10-31", "layers", "layer1", "versions"];
        let publish_body = r#"{"Content":{"ZipFile":""}}"#;
        run(
            &s,
            "PublishLayerVersion",
            publish_body,
            Some("layer1"),
            &publish_path,
        )
        .await;
        run(&s, "ListLayers", "", None, &[]).await;
        run(&s, "ListLayerVersions", "", Some("layer1"), &[]).await;
    }

    #[tokio::test]
    async fn code_signing_lifecycle() {
        let s = svc();
        let create_body = r#"{"AllowedPublishers":{"SigningProfileVersionArns":[]}}"#;
        run(&s, "CreateCodeSigningConfig", create_body, None, &[]).await;
        run(&s, "ListCodeSigningConfigs", "", None, &[]).await;
    }
}