Skip to main content

fakecloud_lambda/
extras.rs

1//! Lambda handlers added to close the conformance gap. Aliases, layers,
2//! function URL configs, concurrency, code signing, event invoke, runtime
3//! management, scaling, recursion, capacity providers, durable executions,
4//! tagging, and account settings.
5
6use chrono::Utc;
7use http::StatusCode;
8use serde_json::{json, Value};
9
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
11
12use crate::service::LambdaService;
13use crate::state::{
14    AccountSettings, CapacityProvider, CodeSigningConfig, DurableExecution, EventInvokeConfig,
15    FunctionAlias, FunctionScalingConfig, FunctionUrlConfig, LambdaState, Layer, LayerVersion,
16    ProvisionedConcurrencyConfig, RuntimeManagementConfig,
17};
18
19fn missing(name: &str) -> AwsServiceError {
20    AwsServiceError::aws_error(
21        StatusCode::BAD_REQUEST,
22        "InvalidParameterValueException",
23        format!("Missing required field: {name}"),
24    )
25}
26
27fn not_found(entity: &str, name: &str) -> AwsServiceError {
28    AwsServiceError::aws_error(
29        StatusCode::NOT_FOUND,
30        "ResourceNotFoundException",
31        format!("{entity} not found: {name}"),
32    )
33}
34
35fn ok(body: Value) -> Result<AwsResponse, AwsServiceError> {
36    Ok(AwsResponse::json(StatusCode::OK, body.to_string()))
37}
38
39fn empty() -> Result<AwsResponse, AwsServiceError> {
40    Ok(AwsResponse::json(StatusCode::OK, "{}".to_string()))
41}
42
43fn body(req: &AwsRequest) -> Value {
44    serde_json::from_slice(&req.body).unwrap_or_else(|_| Value::Object(Default::default()))
45}
46
47fn parse_qualifier(req: &AwsRequest) -> String {
48    req.query_params
49        .get("Qualifier")
50        .cloned()
51        .unwrap_or_else(|| "$LATEST".to_string())
52}
53
54fn id_from_time(prefix: &str) -> String {
55    format!(
56        "{}{}",
57        prefix,
58        std::time::SystemTime::now()
59            .duration_since(std::time::UNIX_EPOCH)
60            .map(|d| d.as_nanos())
61            .unwrap_or(0)
62    )
63}
64
65impl LambdaService {
66    pub(crate) async fn handle_extra(
67        &self,
68        action: &str,
69        resource: Option<&str>,
70        req: &AwsRequest,
71    ) -> Result<AwsResponse, AwsServiceError> {
72        let aid = req.account_id.as_str();
73        let res = resource.unwrap_or("");
74        match action {
75            // Function lifecycle extras
76            "GetFunctionConfiguration" => self.get_function_configuration(res, aid),
77            "UpdateFunctionConfiguration" => self.update_function_configuration(res, req),
78            "UpdateFunctionCode" => self.update_function_code(res, req),
79            "UpdateEventSourceMapping" => self.update_event_source_mapping_handler(res, req),
80            "GetAccountSettings" => self.get_account_settings(aid),
81            "InvokeAsync" => Ok(AwsResponse::json(StatusCode::ACCEPTED, "{}".to_string())),
82            "InvokeWithResponseStream" => Ok(AwsResponse::json(StatusCode::OK, "{}".to_string())),
83
84            // Versions
85            "ListVersionsByFunction" => self.list_versions_by_function(res, aid),
86
87            // Aliases
88            "CreateAlias" => self.create_alias(res, req),
89            "GetAlias" => self.get_alias(res, req),
90            "ListAliases" => self.list_aliases(res, aid),
91            "UpdateAlias" => self.update_alias(res, req),
92            "DeleteAlias" => self.delete_alias(res, req),
93
94            // Layers
95            "PublishLayerVersion" => self.publish_layer_version(res, req),
96            "GetLayerVersion" => self.get_layer_version(req),
97            "GetLayerVersionByArn" => self.get_layer_version_by_arn(req),
98            "ListLayers" => self.list_layers(aid),
99            "ListLayerVersions" => self.list_layer_versions(res, aid),
100            "DeleteLayerVersion" => self.delete_layer_version(req),
101            "GetLayerVersionPolicy" => self.get_layer_version_policy(req),
102            "AddLayerVersionPermission" => self.add_layer_version_permission(req),
103            "RemoveLayerVersionPermission" => self.remove_layer_version_permission(req),
104
105            // Function URL
106            "CreateFunctionUrlConfig" => self.create_function_url_config(res, req),
107            "GetFunctionUrlConfig" => self.get_function_url_config(res, aid),
108            "UpdateFunctionUrlConfig" => self.update_function_url_config(res, req),
109            "DeleteFunctionUrlConfig" => self.delete_function_url_config(res, aid),
110            "ListFunctionUrlConfigs" => self.list_function_url_configs(aid),
111
112            // Concurrency
113            "PutFunctionConcurrency" => self.put_function_concurrency(res, req),
114            "GetFunctionConcurrency" => self.get_function_concurrency(res, aid),
115            "DeleteFunctionConcurrency" => self.delete_function_concurrency(res, aid),
116            "PutProvisionedConcurrencyConfig" => self.put_provisioned_concurrency(res, req),
117            "GetProvisionedConcurrencyConfig" => self.get_provisioned_concurrency(res, req),
118            "DeleteProvisionedConcurrencyConfig" => self.delete_provisioned_concurrency(res, req),
119            "ListProvisionedConcurrencyConfigs" => self.list_provisioned_concurrency(res, aid),
120
121            // Code signing
122            "CreateCodeSigningConfig" => self.create_code_signing_config(req),
123            "GetCodeSigningConfig" => self.get_code_signing_config(res, aid),
124            "UpdateCodeSigningConfig" => self.update_code_signing_config(res, req),
125            "DeleteCodeSigningConfig" => self.delete_code_signing_config(res, aid),
126            "ListCodeSigningConfigs" => self.list_code_signing_configs(aid),
127            "PutFunctionCodeSigningConfig" => self.put_function_code_signing(res, req),
128            "GetFunctionCodeSigningConfig" => self.get_function_code_signing(res, aid),
129            "DeleteFunctionCodeSigningConfig" => self.delete_function_code_signing(res, aid),
130            "ListFunctionsByCodeSigningConfig" => self.list_functions_by_code_signing(res, aid),
131
132            // Event invoke
133            "PutFunctionEventInvokeConfig" | "UpdateFunctionEventInvokeConfig" => {
134                self.put_function_event_invoke(res, req)
135            }
136            "GetFunctionEventInvokeConfig" => self.get_function_event_invoke(res, req),
137            "DeleteFunctionEventInvokeConfig" => self.delete_function_event_invoke(res, req),
138            "ListFunctionEventInvokeConfigs" => self.list_function_event_invoke(res, aid),
139
140            // Runtime management
141            "PutRuntimeManagementConfig" => self.put_runtime_management(res, req),
142            "GetRuntimeManagementConfig" => self.get_runtime_management(res, req),
143
144            // Scaling
145            "PutFunctionScalingConfig" => self.put_scaling_config(res, req),
146            "GetFunctionScalingConfig" => self.get_scaling_config(res, aid),
147
148            // Recursion
149            "PutFunctionRecursionConfig" => self.put_recursion_config(res, req),
150            "GetFunctionRecursionConfig" => self.get_recursion_config(res, aid),
151
152            // Tags
153            "TagResource" => self.tag_resource(res, req),
154            "UntagResource" => self.untag_resource(res, req),
155            "ListTags" => self.list_tags(res, aid),
156
157            // Capacity providers
158            "CreateCapacityProvider" => self.create_capacity_provider(req),
159            "GetCapacityProvider" => self.get_capacity_provider(res, aid),
160            "UpdateCapacityProvider" => self.update_capacity_provider(res, req),
161            "DeleteCapacityProvider" => self.delete_capacity_provider(res, aid),
162            "ListCapacityProviders" => self.list_capacity_providers(aid),
163            "ListFunctionVersionsByCapacityProvider" => {
164                self.list_versions_by_capacity_provider(res, aid)
165            }
166
167            // Durable executions
168            "CheckpointDurableExecution" => self.checkpoint_durable_execution(res, req),
169            "GetDurableExecution" => self.get_durable_execution(res, aid),
170            "GetDurableExecutionHistory" => self.get_durable_execution_history(res, aid),
171            "GetDurableExecutionState" => self.get_durable_execution_state(res, aid),
172            "ListDurableExecutionsByFunction" => self.list_durable_executions_by_function(res, aid),
173            "StopDurableExecution" => self.stop_durable_execution(res, aid),
174            "SendDurableExecutionCallbackSuccess" => {
175                self.send_durable_callback(res, req, "SUCCESS")
176            }
177            "SendDurableExecutionCallbackFailure" => {
178                self.send_durable_callback(res, req, "FAILURE")
179            }
180            "SendDurableExecutionCallbackHeartbeat" => {
181                self.send_durable_callback(res, req, "HEARTBEAT")
182            }
183
184            _ => Err(AwsServiceError::action_not_implemented("lambda", action)),
185        }
186    }
187
188    fn with_state_read<F, R>(&self, account_id: &str, region: &str, f: F) -> R
189    where
190        F: FnOnce(&LambdaState) -> R,
191    {
192        let accounts = self.state.read();
193        let empty = LambdaState::new(account_id, region);
194        let state = accounts.get(account_id).unwrap_or(&empty);
195        f(state)
196    }
197
198    // ── Function lifecycle extras ──
199
200    fn get_function_configuration(
201        &self,
202        function_name: &str,
203        account_id: &str,
204    ) -> Result<AwsResponse, AwsServiceError> {
205        let region = self.region_for(account_id);
206        self.with_state_read(account_id, &region, |state| {
207            state
208                .functions
209                .get(function_name)
210                .map(|f| ok(self.function_config_json(f)))
211                .unwrap_or_else(|| Err(not_found("Function", function_name)))
212        })
213    }
214
215    fn update_function_configuration(
216        &self,
217        function_name: &str,
218        req: &AwsRequest,
219    ) -> Result<AwsResponse, AwsServiceError> {
220        let body = body(req);
221        let mut accounts = self.state.write();
222        let state = accounts.get_or_create(&req.account_id);
223        let func = state
224            .functions
225            .get_mut(function_name)
226            .ok_or_else(|| not_found("Function", function_name))?;
227        if let Some(handler) = body["Handler"].as_str() {
228            func.handler = handler.to_string();
229        }
230        if let Some(t) = body["Timeout"].as_i64() {
231            func.timeout = t;
232        }
233        if let Some(m) = body["MemorySize"].as_i64() {
234            func.memory_size = m;
235        }
236        if let Some(role) = body["Role"].as_str() {
237            func.role = role.to_string();
238        }
239        if let Some(desc) = body["Description"].as_str() {
240            func.description = desc.to_string();
241        }
242        func.last_modified = Utc::now();
243        ok(self.function_config_json(func))
244    }
245
246    fn update_function_code(
247        &self,
248        function_name: &str,
249        req: &AwsRequest,
250    ) -> Result<AwsResponse, AwsServiceError> {
251        let mut accounts = self.state.write();
252        let state = accounts.get_or_create(&req.account_id);
253        let func = state
254            .functions
255            .get_mut(function_name)
256            .ok_or_else(|| not_found("Function", function_name))?;
257        func.last_modified = Utc::now();
258        ok(self.function_config_json(func))
259    }
260
261    fn get_account_settings(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
262        let mut accounts = self.state.write();
263        let state = accounts.get_or_create(account_id);
264        let settings = state.account_settings.clone().unwrap_or(AccountSettings {
265            concurrent_executions: 1000,
266            code_size_zipped: 52_428_800,
267            code_size_unzipped: 262_144_000,
268            total_code_size: 80_530_636_800,
269        });
270        if state.account_settings.is_none() {
271            state.account_settings = Some(settings.clone());
272        }
273        ok(json!({
274            "AccountLimit": {
275                "ConcurrentExecutions": settings.concurrent_executions,
276                "CodeSizeZipped": settings.code_size_zipped,
277                "CodeSizeUnzipped": settings.code_size_unzipped,
278                "TotalCodeSize": settings.total_code_size,
279                "UnreservedConcurrentExecutions": settings.concurrent_executions,
280            },
281            "AccountUsage": {
282                "TotalCodeSize": 0,
283                "FunctionCount": 0,
284            },
285        }))
286    }
287
288    // ── Versions ──
289
290    fn list_versions_by_function(
291        &self,
292        function_name: &str,
293        account_id: &str,
294    ) -> Result<AwsResponse, AwsServiceError> {
295        let region = self.region_for(account_id);
296        self.with_state_read(account_id, &region, |state| {
297            if !state.functions.contains_key(function_name) {
298                return Err(not_found("Function", function_name));
299            }
300            let versions: Vec<&String> = state
301                .function_versions
302                .get(function_name)
303                .map(|v| v.iter().collect())
304                .unwrap_or_default();
305            ok(json!({
306                "Versions": versions,
307            }))
308        })
309    }
310
311    // ── Aliases ──
312
313    fn alias_key(function: &str, alias: &str) -> String {
314        format!("{function}:{alias}")
315    }
316
317    fn create_alias(
318        &self,
319        function_name: &str,
320        req: &AwsRequest,
321    ) -> Result<AwsResponse, AwsServiceError> {
322        let body = body(req);
323        let name = body["Name"]
324            .as_str()
325            .ok_or_else(|| missing("Name"))?
326            .to_string();
327        let version = body["FunctionVersion"]
328            .as_str()
329            .unwrap_or("$LATEST")
330            .to_string();
331        let mut accounts = self.state.write();
332        let state = accounts.get_or_create(&req.account_id);
333        if !state.functions.contains_key(function_name) {
334            return Err(not_found("Function", function_name));
335        }
336        let alias_arn = format!(
337            "arn:aws:lambda:{}:{}:function:{}:{}",
338            state.region, state.account_id, function_name, name
339        );
340        let alias = FunctionAlias {
341            alias_arn: alias_arn.clone(),
342            name: name.clone(),
343            function_version: version,
344            description: body["Description"].as_str().unwrap_or("").to_string(),
345            revision_id: id_from_time("rev-"),
346            routing_config: body.get("RoutingConfig").cloned(),
347        };
348        state
349            .aliases
350            .insert(Self::alias_key(function_name, &name), alias.clone());
351        ok(serde_json::to_value(alias).unwrap_or_default())
352    }
353
354    fn get_alias(
355        &self,
356        function_name: &str,
357        req: &AwsRequest,
358    ) -> Result<AwsResponse, AwsServiceError> {
359        let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
360        let region = self.region_for(&req.account_id);
361        self.with_state_read(&req.account_id, &region, |state| {
362            state
363                .aliases
364                .get(&Self::alias_key(function_name, &alias_name))
365                .map(|a| ok(serde_json::to_value(a).unwrap_or_default()))
366                .unwrap_or_else(|| Err(not_found("Alias", &alias_name)))
367        })
368    }
369
370    fn list_aliases(
371        &self,
372        function_name: &str,
373        account_id: &str,
374    ) -> Result<AwsResponse, AwsServiceError> {
375        let region = self.region_for(account_id);
376        self.with_state_read(account_id, &region, |state| {
377            let prefix = format!("{function_name}:");
378            let aliases: Vec<&FunctionAlias> = state
379                .aliases
380                .iter()
381                .filter(|(k, _)| k.starts_with(&prefix))
382                .map(|(_, v)| v)
383                .collect();
384            ok(json!({"Aliases": aliases}))
385        })
386    }
387
388    fn update_alias(
389        &self,
390        function_name: &str,
391        req: &AwsRequest,
392    ) -> Result<AwsResponse, AwsServiceError> {
393        let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
394        let body = body(req);
395        let mut accounts = self.state.write();
396        let state = accounts.get_or_create(&req.account_id);
397        let key = Self::alias_key(function_name, &alias_name);
398        let alias = state
399            .aliases
400            .get_mut(&key)
401            .ok_or_else(|| not_found("Alias", &alias_name))?;
402        if let Some(v) = body["FunctionVersion"].as_str() {
403            alias.function_version = v.to_string();
404        }
405        if let Some(d) = body["Description"].as_str() {
406            alias.description = d.to_string();
407        }
408        if let Some(rc) = body.get("RoutingConfig") {
409            alias.routing_config = Some(rc.clone());
410        }
411        alias.revision_id = id_from_time("rev-");
412        ok(serde_json::to_value(alias).unwrap_or_default())
413    }
414
415    fn delete_alias(
416        &self,
417        function_name: &str,
418        req: &AwsRequest,
419    ) -> Result<AwsResponse, AwsServiceError> {
420        let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
421        let mut accounts = self.state.write();
422        let state = accounts.get_or_create(&req.account_id);
423        state
424            .aliases
425            .remove(&Self::alias_key(function_name, &alias_name));
426        empty()
427    }
428
429    // ── Layers ──
430
431    fn publish_layer_version(
432        &self,
433        layer_name: &str,
434        req: &AwsRequest,
435    ) -> Result<AwsResponse, AwsServiceError> {
436        let body = body(req);
437        let mut accounts = self.state.write();
438        let state = accounts.get_or_create(&req.account_id);
439        let layer = state
440            .layers
441            .entry(layer_name.to_string())
442            .or_insert_with(|| Layer {
443                layer_name: layer_name.to_string(),
444                layer_arn: format!(
445                    "arn:aws:lambda:{}:{}:layer:{}",
446                    state.region, state.account_id, layer_name
447                ),
448                versions: Vec::new(),
449            });
450        let next_version = (layer.versions.len() as i64) + 1;
451        let version_arn = format!("{}:{}", layer.layer_arn, next_version);
452        let runtimes: Vec<String> = body["CompatibleRuntimes"]
453            .as_array()
454            .map(|arr| {
455                arr.iter()
456                    .filter_map(|v| v.as_str().map(String::from))
457                    .collect()
458            })
459            .unwrap_or_default();
460        let lv = LayerVersion {
461            version: next_version,
462            layer_version_arn: version_arn.clone(),
463            description: body["Description"].as_str().unwrap_or("").to_string(),
464            created_date: Utc::now(),
465            compatible_runtimes: runtimes,
466            license_info: body["LicenseInfo"].as_str().unwrap_or("").to_string(),
467            policy: None,
468        };
469        layer.versions.push(lv.clone());
470        ok(json!({
471            "LayerArn": layer.layer_arn,
472            "LayerVersionArn": version_arn,
473            "Version": next_version,
474            "Description": lv.description,
475            "CreatedDate": lv.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
476            "CompatibleRuntimes": lv.compatible_runtimes,
477            "LicenseInfo": lv.license_info,
478        }))
479    }
480
481    fn list_layers(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
482        let region = self.region_for(account_id);
483        self.with_state_read(account_id, &region, |state| {
484            let layers: Vec<Value> = state
485                .layers
486                .values()
487                .map(|l| {
488                    json!({
489                        "LayerName": l.layer_name,
490                        "LayerArn": l.layer_arn,
491                        "LatestMatchingVersion": l.versions.last().map(|v| json!({
492                            "LayerVersionArn": v.layer_version_arn,
493                            "Version": v.version,
494                            "Description": v.description,
495                            "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
496                            "CompatibleRuntimes": v.compatible_runtimes,
497                        })),
498                    })
499                })
500                .collect();
501            ok(json!({"Layers": layers}))
502        })
503    }
504
505    fn list_layer_versions(
506        &self,
507        layer_name: &str,
508        account_id: &str,
509    ) -> Result<AwsResponse, AwsServiceError> {
510        let region = self.region_for(account_id);
511        self.with_state_read(account_id, &region, |state| {
512            let versions: Vec<Value> = state
513                .layers
514                .get(layer_name)
515                .map(|l| {
516                    l.versions
517                        .iter()
518                        .map(|v| {
519                            json!({
520                                "LayerVersionArn": v.layer_version_arn,
521                                "Version": v.version,
522                                "Description": v.description,
523                                "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
524                                "CompatibleRuntimes": v.compatible_runtimes,
525                                "LicenseInfo": v.license_info,
526                            })
527                        })
528                        .collect()
529                })
530                .unwrap_or_default();
531            ok(json!({"LayerVersions": versions}))
532        })
533    }
534
535    fn get_layer_version(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
536        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
537        let version: i64 = req
538            .path_segments
539            .get(4)
540            .and_then(|s| s.parse().ok())
541            .ok_or_else(|| missing("VersionNumber"))?;
542        let region = self.region_for(&req.account_id);
543        self.with_state_read(&req.account_id, &region, |state| {
544            state
545                .layers
546                .get(&layer_name)
547                .and_then(|l| l.versions.iter().find(|v| v.version == version))
548                .map(|v| {
549                    ok(json!({
550                        "LayerVersionArn": v.layer_version_arn,
551                        "Version": v.version,
552                        "Description": v.description,
553                        "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
554                        "CompatibleRuntimes": v.compatible_runtimes,
555                        "LicenseInfo": v.license_info,
556                        "Content": {
557                            "Location": "https://example.com/layer.zip",
558                            "CodeSha256": "",
559                            "CodeSize": 0,
560                        },
561                    }))
562                })
563                .unwrap_or_else(|| Err(not_found("LayerVersion", &layer_name)))
564        })
565    }
566
567    fn get_layer_version_by_arn(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
568        let arn = req
569            .query_params
570            .get("Arn")
571            .or_else(|| req.query_params.get("find"))
572            .cloned()
573            .unwrap_or_default();
574        // arn:aws:lambda:region:account:layer:name:version
575        let parts: Vec<&str> = arn.rsplitn(3, ':').collect();
576        if parts.len() < 3 {
577            return Err(missing("Arn"));
578        }
579        let version: i64 = parts[0].parse().map_err(|_| missing("Arn"))?;
580        let layer_name = parts[1].to_string();
581        let region = self.region_for(&req.account_id);
582        self.with_state_read(&req.account_id, &region, |state| {
583            state
584                .layers
585                .get(&layer_name)
586                .and_then(|l| l.versions.iter().find(|v| v.version == version))
587                .map(|v| {
588                    ok(json!({
589                        "LayerVersionArn": v.layer_version_arn,
590                        "Version": v.version,
591                        "Description": v.description,
592                        "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
593                        "CompatibleRuntimes": v.compatible_runtimes,
594                        "LicenseInfo": v.license_info,
595                    }))
596                })
597                .unwrap_or_else(|| Err(not_found("LayerVersion", &arn)))
598        })
599    }
600
601    fn delete_layer_version(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
602        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
603        let version: i64 = req
604            .path_segments
605            .get(4)
606            .and_then(|s| s.parse().ok())
607            .unwrap_or(0);
608        let mut accounts = self.state.write();
609        let state = accounts.get_or_create(&req.account_id);
610        if let Some(layer) = state.layers.get_mut(&layer_name) {
611            layer.versions.retain(|v| v.version != version);
612        }
613        empty()
614    }
615
616    fn get_layer_version_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
617        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
618        let version: i64 = req
619            .path_segments
620            .get(4)
621            .and_then(|s| s.parse().ok())
622            .unwrap_or(0);
623        let region = self.region_for(&req.account_id);
624        self.with_state_read(&req.account_id, &region, |state| {
625            let policy = state
626                .layers
627                .get(&layer_name)
628                .and_then(|l| l.versions.iter().find(|v| v.version == version))
629                .and_then(|v| v.policy.clone())
630                .unwrap_or_else(|| "{}".to_string());
631            ok(json!({"Policy": policy, "RevisionId": id_from_time("rev-")}))
632        })
633    }
634
635    fn add_layer_version_permission(
636        &self,
637        req: &AwsRequest,
638    ) -> Result<AwsResponse, AwsServiceError> {
639        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
640        let version: i64 = req
641            .path_segments
642            .get(4)
643            .and_then(|s| s.parse().ok())
644            .unwrap_or(0);
645        let body = body(req);
646        let mut accounts = self.state.write();
647        let state = accounts.get_or_create(&req.account_id);
648        if let Some(layer) = state.layers.get_mut(&layer_name) {
649            if let Some(v) = layer.versions.iter_mut().find(|v| v.version == version) {
650                let policy = v.policy.clone().unwrap_or_else(|| "{}".to_string());
651                let mut policy_doc: Value = serde_json::from_str(&policy).unwrap_or(json!({}));
652                let statements = policy_doc["Statement"].as_array_mut();
653                let new_stmt = json!({
654                    "Sid": body["StatementId"].as_str().unwrap_or("default"),
655                    "Effect": "Allow",
656                    "Principal": body["Principal"].clone(),
657                    "Action": body["Action"].clone(),
658                    "Resource": v.layer_version_arn.clone(),
659                });
660                if let Some(s) = statements {
661                    s.push(new_stmt);
662                } else {
663                    policy_doc = json!({"Version": "2012-10-17", "Statement": [new_stmt]});
664                }
665                v.policy = Some(policy_doc.to_string());
666            }
667        }
668        ok(json!({
669            "Statement": body["StatementId"],
670            "RevisionId": id_from_time("rev-"),
671        }))
672    }
673
674    fn remove_layer_version_permission(
675        &self,
676        req: &AwsRequest,
677    ) -> Result<AwsResponse, AwsServiceError> {
678        let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
679        let version: i64 = req
680            .path_segments
681            .get(4)
682            .and_then(|s| s.parse().ok())
683            .unwrap_or(0);
684        let sid = req.path_segments.get(6).cloned().unwrap_or_default();
685        let mut accounts = self.state.write();
686        let state = accounts.get_or_create(&req.account_id);
687        if let Some(layer) = state.layers.get_mut(&layer_name) {
688            if let Some(v) = layer.versions.iter_mut().find(|v| v.version == version) {
689                if let Some(policy) = v.policy.clone() {
690                    let mut policy_doc: Value = serde_json::from_str(&policy).unwrap_or(json!({}));
691                    if let Some(stmts) = policy_doc["Statement"].as_array_mut() {
692                        stmts.retain(|s| s["Sid"].as_str() != Some(&sid));
693                    }
694                    v.policy = Some(policy_doc.to_string());
695                }
696            }
697        }
698        empty()
699    }
700
701    // ── Function URL ──
702
703    fn create_function_url_config(
704        &self,
705        function_name: &str,
706        req: &AwsRequest,
707    ) -> Result<AwsResponse, AwsServiceError> {
708        let body = body(req);
709        let auth_type = body["AuthType"].as_str().unwrap_or("NONE").to_string();
710        let now = Utc::now();
711        let mut accounts = self.state.write();
712        let state = accounts.get_or_create(&req.account_id);
713        if !state.functions.contains_key(function_name) {
714            return Err(not_found("Function", function_name));
715        }
716        let function_arn = format!(
717            "arn:aws:lambda:{}:{}:function:{}",
718            state.region, state.account_id, function_name
719        );
720        let cfg = FunctionUrlConfig {
721            function_arn: function_arn.clone(),
722            function_url: format!(
723                "https://{function_name}.lambda-url.{}.on.aws/",
724                state.region
725            ),
726            auth_type: auth_type.clone(),
727            cors: body.get("Cors").cloned(),
728            creation_time: now,
729            last_modified_time: now,
730            invoke_mode: body["InvokeMode"]
731                .as_str()
732                .unwrap_or("BUFFERED")
733                .to_string(),
734        };
735        state
736            .function_url_configs
737            .insert(function_name.to_string(), cfg.clone());
738        ok(serde_json::to_value(cfg).unwrap_or_default())
739    }
740
741    fn get_function_url_config(
742        &self,
743        function_name: &str,
744        account_id: &str,
745    ) -> Result<AwsResponse, AwsServiceError> {
746        let region = self.region_for(account_id);
747        self.with_state_read(account_id, &region, |state| {
748            state
749                .function_url_configs
750                .get(function_name)
751                .map(|c| ok(serde_json::to_value(c).unwrap_or_default()))
752                .unwrap_or_else(|| Err(not_found("FunctionUrlConfig", function_name)))
753        })
754    }
755
756    fn update_function_url_config(
757        &self,
758        function_name: &str,
759        req: &AwsRequest,
760    ) -> Result<AwsResponse, AwsServiceError> {
761        let body = body(req);
762        let mut accounts = self.state.write();
763        let state = accounts.get_or_create(&req.account_id);
764        let cfg = state
765            .function_url_configs
766            .get_mut(function_name)
767            .ok_or_else(|| not_found("FunctionUrlConfig", function_name))?;
768        if let Some(a) = body["AuthType"].as_str() {
769            cfg.auth_type = a.to_string();
770        }
771        if let Some(c) = body.get("Cors") {
772            cfg.cors = Some(c.clone());
773        }
774        if let Some(m) = body["InvokeMode"].as_str() {
775            cfg.invoke_mode = m.to_string();
776        }
777        cfg.last_modified_time = Utc::now();
778        ok(serde_json::to_value(cfg).unwrap_or_default())
779    }
780
781    fn delete_function_url_config(
782        &self,
783        function_name: &str,
784        account_id: &str,
785    ) -> Result<AwsResponse, AwsServiceError> {
786        let mut accounts = self.state.write();
787        let state = accounts.get_or_create(account_id);
788        state.function_url_configs.remove(function_name);
789        empty()
790    }
791
792    fn list_function_url_configs(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
793        let region = self.region_for(account_id);
794        self.with_state_read(account_id, &region, |state| {
795            let configs: Vec<&FunctionUrlConfig> = state.function_url_configs.values().collect();
796            ok(json!({"FunctionUrlConfigs": configs}))
797        })
798    }
799
800    // ── Concurrency ──
801
802    fn put_function_concurrency(
803        &self,
804        function_name: &str,
805        req: &AwsRequest,
806    ) -> Result<AwsResponse, AwsServiceError> {
807        let body = body(req);
808        let n = body["ReservedConcurrentExecutions"]
809            .as_i64()
810            .ok_or_else(|| missing("ReservedConcurrentExecutions"))?;
811        let mut accounts = self.state.write();
812        let state = accounts.get_or_create(&req.account_id);
813        state
814            .function_concurrency
815            .insert(function_name.to_string(), n);
816        ok(json!({"ReservedConcurrentExecutions": n}))
817    }
818
819    fn get_function_concurrency(
820        &self,
821        function_name: &str,
822        account_id: &str,
823    ) -> Result<AwsResponse, AwsServiceError> {
824        let region = self.region_for(account_id);
825        self.with_state_read(account_id, &region, |state| {
826            let n = state
827                .function_concurrency
828                .get(function_name)
829                .copied()
830                .unwrap_or(0);
831            ok(json!({"ReservedConcurrentExecutions": n}))
832        })
833    }
834
835    fn delete_function_concurrency(
836        &self,
837        function_name: &str,
838        account_id: &str,
839    ) -> Result<AwsResponse, AwsServiceError> {
840        let mut accounts = self.state.write();
841        let state = accounts.get_or_create(account_id);
842        state.function_concurrency.remove(function_name);
843        empty()
844    }
845
846    fn pc_key(function: &str, qualifier: &str) -> String {
847        format!("{function}:{qualifier}")
848    }
849
850    fn put_provisioned_concurrency(
851        &self,
852        function_name: &str,
853        req: &AwsRequest,
854    ) -> Result<AwsResponse, AwsServiceError> {
855        let body = body(req);
856        let qualifier = parse_qualifier(req);
857        let requested = body["ProvisionedConcurrentExecutions"]
858            .as_i64()
859            .ok_or_else(|| missing("ProvisionedConcurrentExecutions"))?;
860        let mut accounts = self.state.write();
861        let state = accounts.get_or_create(&req.account_id);
862        let cfg = ProvisionedConcurrencyConfig {
863            requested,
864            allocated: requested,
865            status: "READY".to_string(),
866            last_modified: Utc::now(),
867        };
868        state
869            .provisioned_concurrency
870            .insert(Self::pc_key(function_name, &qualifier), cfg.clone());
871        ok(json!({
872            "RequestedProvisionedConcurrentExecutions": cfg.requested,
873            "AvailableProvisionedConcurrentExecutions": cfg.allocated,
874            "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
875            "Status": cfg.status,
876            "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
877        }))
878    }
879
880    fn get_provisioned_concurrency(
881        &self,
882        function_name: &str,
883        req: &AwsRequest,
884    ) -> Result<AwsResponse, AwsServiceError> {
885        let qualifier = parse_qualifier(req);
886        let region = self.region_for(&req.account_id);
887        self.with_state_read(&req.account_id, &region, |state| {
888            state
889                .provisioned_concurrency
890                .get(&Self::pc_key(function_name, &qualifier))
891                .map(|cfg| ok(json!({
892                    "RequestedProvisionedConcurrentExecutions": cfg.requested,
893                    "AvailableProvisionedConcurrentExecutions": cfg.allocated,
894                    "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
895                    "Status": cfg.status,
896                    "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
897                })))
898                .unwrap_or_else(|| Err(not_found("ProvisionedConcurrencyConfig", function_name)))
899        })
900    }
901
902    fn delete_provisioned_concurrency(
903        &self,
904        function_name: &str,
905        req: &AwsRequest,
906    ) -> Result<AwsResponse, AwsServiceError> {
907        let qualifier = parse_qualifier(req);
908        let mut accounts = self.state.write();
909        let state = accounts.get_or_create(&req.account_id);
910        state
911            .provisioned_concurrency
912            .remove(&Self::pc_key(function_name, &qualifier));
913        empty()
914    }
915
916    fn list_provisioned_concurrency(
917        &self,
918        function_name: &str,
919        account_id: &str,
920    ) -> Result<AwsResponse, AwsServiceError> {
921        let region = self.region_for(account_id);
922        self.with_state_read(account_id, &region, |state| {
923            let prefix = format!("{function_name}:");
924            let configs: Vec<Value> = state
925                .provisioned_concurrency
926                .iter()
927                .filter(|(k, _)| k.starts_with(&prefix))
928                .map(|(k, cfg)| {
929                    let qualifier = k.split(':').next_back().unwrap_or("$LATEST");
930                    json!({
931                        "FunctionArn": format!(
932                            "arn:aws:lambda:{}:{}:function:{}:{}",
933                            state.region, state.account_id, function_name, qualifier
934                        ),
935                        "Status": cfg.status,
936                        "RequestedProvisionedConcurrentExecutions": cfg.requested,
937                        "AvailableProvisionedConcurrentExecutions": cfg.allocated,
938                        "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
939                        "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
940                    })
941                })
942                .collect();
943            ok(json!({"ProvisionedConcurrencyConfigs": configs}))
944        })
945    }
946
947    // ── Code signing ──
948
949    fn create_code_signing_config(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
950        let body = body(req);
951        let mut accounts = self.state.write();
952        let state = accounts.get_or_create(&req.account_id);
953        let id = id_from_time("csc-");
954        let arn = format!(
955            "arn:aws:lambda:{}:{}:code-signing-config:{}",
956            state.region, state.account_id, id
957        );
958        let publishers: Vec<String> = body
959            .get("AllowedPublishers")
960            .and_then(|v| v.get("SigningProfileVersionArns"))
961            .and_then(|v| v.as_array())
962            .map(|arr| {
963                arr.iter()
964                    .filter_map(|x| x.as_str().map(String::from))
965                    .collect()
966            })
967            .unwrap_or_default();
968        let csc = CodeSigningConfig {
969            csc_id: id.clone(),
970            csc_arn: arn,
971            description: body["Description"].as_str().unwrap_or("").to_string(),
972            allowed_publishers: publishers,
973            untrusted_artifact_action: body["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"]
974                .as_str()
975                .unwrap_or("Warn")
976                .to_string(),
977            last_modified: Utc::now(),
978        };
979        state.code_signing_configs.insert(id, csc.clone());
980        ok(json!({"CodeSigningConfig": code_signing_json(&csc)}))
981    }
982
983    fn get_code_signing_config(
984        &self,
985        csc_id: &str,
986        account_id: &str,
987    ) -> Result<AwsResponse, AwsServiceError> {
988        let id = extract_csc_id(csc_id);
989        let region = self.region_for(account_id);
990        self.with_state_read(account_id, &region, |state| {
991            state
992                .code_signing_configs
993                .get(&id)
994                .map(|c| ok(json!({"CodeSigningConfig": code_signing_json(c)})))
995                .unwrap_or_else(|| Err(not_found("CodeSigningConfig", &id)))
996        })
997    }
998
999    fn update_code_signing_config(
1000        &self,
1001        csc_id: &str,
1002        req: &AwsRequest,
1003    ) -> Result<AwsResponse, AwsServiceError> {
1004        let body = body(req);
1005        let mut accounts = self.state.write();
1006        let state = accounts.get_or_create(&req.account_id);
1007        let id = extract_csc_id(csc_id);
1008        let csc = state
1009            .code_signing_configs
1010            .get_mut(&id)
1011            .ok_or_else(|| not_found("CodeSigningConfig", &id))?;
1012        if let Some(d) = body["Description"].as_str() {
1013            csc.description = d.to_string();
1014        }
1015        if let Some(action) = body["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"].as_str()
1016        {
1017            csc.untrusted_artifact_action = action.to_string();
1018        }
1019        csc.last_modified = Utc::now();
1020        ok(json!({"CodeSigningConfig": code_signing_json(csc)}))
1021    }
1022
1023    fn delete_code_signing_config(
1024        &self,
1025        csc_id: &str,
1026        account_id: &str,
1027    ) -> Result<AwsResponse, AwsServiceError> {
1028        let id = extract_csc_id(csc_id);
1029        let mut accounts = self.state.write();
1030        let state = accounts.get_or_create(account_id);
1031        state.code_signing_configs.remove(&id);
1032        empty()
1033    }
1034
1035    fn list_code_signing_configs(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1036        let region = self.region_for(account_id);
1037        self.with_state_read(account_id, &region, |state| {
1038            let cfgs: Vec<Value> = state
1039                .code_signing_configs
1040                .values()
1041                .map(code_signing_json)
1042                .collect();
1043            ok(json!({"CodeSigningConfigs": cfgs}))
1044        })
1045    }
1046
1047    fn put_function_code_signing(
1048        &self,
1049        function_name: &str,
1050        req: &AwsRequest,
1051    ) -> Result<AwsResponse, AwsServiceError> {
1052        let body = body(req);
1053        let csc_arn = body["CodeSigningConfigArn"]
1054            .as_str()
1055            .ok_or_else(|| missing("CodeSigningConfigArn"))?
1056            .to_string();
1057        let mut accounts = self.state.write();
1058        let state = accounts.get_or_create(&req.account_id);
1059        state
1060            .function_code_signing
1061            .insert(function_name.to_string(), csc_arn.clone());
1062        ok(json!({
1063            "CodeSigningConfigArn": csc_arn,
1064            "FunctionName": function_name,
1065        }))
1066    }
1067
1068    fn get_function_code_signing(
1069        &self,
1070        function_name: &str,
1071        account_id: &str,
1072    ) -> Result<AwsResponse, AwsServiceError> {
1073        let region = self.region_for(account_id);
1074        self.with_state_read(account_id, &region, |state| {
1075            let arn = state
1076                .function_code_signing
1077                .get(function_name)
1078                .cloned()
1079                .unwrap_or_default();
1080            ok(json!({
1081                "CodeSigningConfigArn": arn,
1082                "FunctionName": function_name,
1083            }))
1084        })
1085    }
1086
1087    fn delete_function_code_signing(
1088        &self,
1089        function_name: &str,
1090        account_id: &str,
1091    ) -> Result<AwsResponse, AwsServiceError> {
1092        let mut accounts = self.state.write();
1093        let state = accounts.get_or_create(account_id);
1094        state.function_code_signing.remove(function_name);
1095        empty()
1096    }
1097
1098    fn list_functions_by_code_signing(
1099        &self,
1100        csc_id: &str,
1101        account_id: &str,
1102    ) -> Result<AwsResponse, AwsServiceError> {
1103        let id = extract_csc_id(csc_id);
1104        let region = self.region_for(account_id);
1105        self.with_state_read(account_id, &region, |state| {
1106            let funcs: Vec<&String> = state
1107                .function_code_signing
1108                .iter()
1109                .filter(|(_, v)| v.contains(&id))
1110                .map(|(k, _)| k)
1111                .collect();
1112            ok(json!({"FunctionArns": funcs}))
1113        })
1114    }
1115
1116    // ── Event invoke ──
1117
1118    fn ev_key(function: &str, qualifier: &str) -> String {
1119        format!("{function}:{qualifier}")
1120    }
1121
1122    fn put_function_event_invoke(
1123        &self,
1124        function_name: &str,
1125        req: &AwsRequest,
1126    ) -> Result<AwsResponse, AwsServiceError> {
1127        let body = body(req);
1128        let qualifier = parse_qualifier(req);
1129        let function_arn = format!(
1130            "arn:aws:lambda:{}:{}:function:{}",
1131            self.region_for(&req.account_id),
1132            req.account_id,
1133            function_name
1134        );
1135        let cfg = EventInvokeConfig {
1136            function_arn: function_arn.clone(),
1137            maximum_event_age: body["MaximumEventAgeInSeconds"].as_i64().unwrap_or(21600),
1138            maximum_retry_attempts: body["MaximumRetryAttempts"].as_i64().unwrap_or(2),
1139            destination_config: body.get("DestinationConfig").cloned().unwrap_or(json!({})),
1140            last_modified: Utc::now(),
1141        };
1142        let mut accounts = self.state.write();
1143        let state = accounts.get_or_create(&req.account_id);
1144        state
1145            .event_invoke_configs
1146            .insert(Self::ev_key(function_name, &qualifier), cfg.clone());
1147        ok(event_invoke_json(&cfg))
1148    }
1149
1150    fn get_function_event_invoke(
1151        &self,
1152        function_name: &str,
1153        req: &AwsRequest,
1154    ) -> Result<AwsResponse, AwsServiceError> {
1155        let qualifier = parse_qualifier(req);
1156        let region = self.region_for(&req.account_id);
1157        self.with_state_read(&req.account_id, &region, |state| {
1158            state
1159                .event_invoke_configs
1160                .get(&Self::ev_key(function_name, &qualifier))
1161                .map(|c| ok(event_invoke_json(c)))
1162                .unwrap_or_else(|| Err(not_found("EventInvokeConfig", function_name)))
1163        })
1164    }
1165
1166    fn delete_function_event_invoke(
1167        &self,
1168        function_name: &str,
1169        req: &AwsRequest,
1170    ) -> Result<AwsResponse, AwsServiceError> {
1171        let qualifier = parse_qualifier(req);
1172        let mut accounts = self.state.write();
1173        let state = accounts.get_or_create(&req.account_id);
1174        state
1175            .event_invoke_configs
1176            .remove(&Self::ev_key(function_name, &qualifier));
1177        empty()
1178    }
1179
1180    fn list_function_event_invoke(
1181        &self,
1182        function_name: &str,
1183        account_id: &str,
1184    ) -> Result<AwsResponse, AwsServiceError> {
1185        let region = self.region_for(account_id);
1186        self.with_state_read(account_id, &region, |state| {
1187            let prefix = format!("{function_name}:");
1188            let configs: Vec<Value> = state
1189                .event_invoke_configs
1190                .iter()
1191                .filter(|(k, _)| k.starts_with(&prefix))
1192                .map(|(_, c)| event_invoke_json(c))
1193                .collect();
1194            ok(json!({"FunctionEventInvokeConfigs": configs}))
1195        })
1196    }
1197
1198    // ── Runtime management ──
1199
1200    fn put_runtime_management(
1201        &self,
1202        function_name: &str,
1203        req: &AwsRequest,
1204    ) -> Result<AwsResponse, AwsServiceError> {
1205        let body = body(req);
1206        let qualifier = parse_qualifier(req);
1207        let cfg = RuntimeManagementConfig {
1208            update_runtime_on: body["UpdateRuntimeOn"]
1209                .as_str()
1210                .unwrap_or("Auto")
1211                .to_string(),
1212            runtime_version_arn: body["RuntimeVersionArn"].as_str().unwrap_or("").to_string(),
1213        };
1214        let mut accounts = self.state.write();
1215        let state = accounts.get_or_create(&req.account_id);
1216        state
1217            .runtime_management
1218            .insert(format!("{function_name}:{qualifier}"), cfg.clone());
1219        ok(json!({
1220            "FunctionArn": format!("arn:aws:lambda:{}:{}:function:{}:{}", state.region, state.account_id, function_name, qualifier),
1221            "UpdateRuntimeOn": cfg.update_runtime_on,
1222            "RuntimeVersionArn": cfg.runtime_version_arn,
1223        }))
1224    }
1225
1226    fn get_runtime_management(
1227        &self,
1228        function_name: &str,
1229        req: &AwsRequest,
1230    ) -> Result<AwsResponse, AwsServiceError> {
1231        let qualifier = parse_qualifier(req);
1232        let region = self.region_for(&req.account_id);
1233        self.with_state_read(&req.account_id, &region, |state| {
1234            let cfg = state
1235                .runtime_management
1236                .get(&format!("{function_name}:{qualifier}"))
1237                .cloned()
1238                .unwrap_or(RuntimeManagementConfig {
1239                    update_runtime_on: "Auto".to_string(),
1240                    runtime_version_arn: String::new(),
1241                });
1242            ok(json!({
1243                "FunctionArn": format!(
1244                    "arn:aws:lambda:{}:{}:function:{}:{}",
1245                    state.region, state.account_id, function_name, qualifier
1246                ),
1247                "UpdateRuntimeOn": cfg.update_runtime_on,
1248                "RuntimeVersionArn": cfg.runtime_version_arn,
1249            }))
1250        })
1251    }
1252
1253    // ── Scaling ──
1254
1255    fn put_scaling_config(
1256        &self,
1257        uuid: &str,
1258        req: &AwsRequest,
1259    ) -> Result<AwsResponse, AwsServiceError> {
1260        let body = body(req);
1261        let cfg = FunctionScalingConfig {
1262            maximum_concurrency: body["MaximumConcurrency"].as_i64().unwrap_or(0),
1263        };
1264        let mut accounts = self.state.write();
1265        let state = accounts.get_or_create(&req.account_id);
1266        state.scaling_configs.insert(uuid.to_string(), cfg.clone());
1267        ok(json!({
1268            "MaximumConcurrency": cfg.maximum_concurrency,
1269        }))
1270    }
1271
1272    fn get_scaling_config(
1273        &self,
1274        uuid: &str,
1275        account_id: &str,
1276    ) -> Result<AwsResponse, AwsServiceError> {
1277        let region = self.region_for(account_id);
1278        self.with_state_read(account_id, &region, |state| {
1279            let n = state
1280                .scaling_configs
1281                .get(uuid)
1282                .map(|c| c.maximum_concurrency)
1283                .unwrap_or(0);
1284            ok(json!({"MaximumConcurrency": n}))
1285        })
1286    }
1287
1288    // ── Recursion ──
1289
1290    fn put_recursion_config(
1291        &self,
1292        function_name: &str,
1293        req: &AwsRequest,
1294    ) -> Result<AwsResponse, AwsServiceError> {
1295        let body = body(req);
1296        let mode = body["RecursiveLoop"]
1297            .as_str()
1298            .unwrap_or("Terminate")
1299            .to_string();
1300        let mut accounts = self.state.write();
1301        let state = accounts.get_or_create(&req.account_id);
1302        state
1303            .recursion_configs
1304            .insert(function_name.to_string(), mode.clone());
1305        ok(json!({"RecursiveLoop": mode}))
1306    }
1307
1308    fn get_recursion_config(
1309        &self,
1310        function_name: &str,
1311        account_id: &str,
1312    ) -> Result<AwsResponse, AwsServiceError> {
1313        let region = self.region_for(account_id);
1314        self.with_state_read(account_id, &region, |state| {
1315            let mode = state
1316                .recursion_configs
1317                .get(function_name)
1318                .cloned()
1319                .unwrap_or_else(|| "Terminate".to_string());
1320            ok(json!({"RecursiveLoop": mode}))
1321        })
1322    }
1323
1324    // ── Tags ──
1325
1326    fn tag_resource(
1327        &self,
1328        resource_arn: &str,
1329        req: &AwsRequest,
1330    ) -> Result<AwsResponse, AwsServiceError> {
1331        let body = body(req);
1332        let new_tags: Vec<(String, String)> = body
1333            .get("Tags")
1334            .and_then(|v| v.as_object())
1335            .map(|m| {
1336                m.iter()
1337                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
1338                    .collect()
1339            })
1340            .unwrap_or_default();
1341        let mut accounts = self.state.write();
1342        let state = accounts.get_or_create(&req.account_id);
1343        let entry = state.tags.entry(resource_arn.to_string()).or_default();
1344        for (k, v) in new_tags {
1345            entry.retain(|(ek, _)| ek != &k);
1346            entry.push((k, v));
1347        }
1348        empty()
1349    }
1350
1351    fn untag_resource(
1352        &self,
1353        resource_arn: &str,
1354        req: &AwsRequest,
1355    ) -> Result<AwsResponse, AwsServiceError> {
1356        let mut keys: Vec<String> = Vec::new();
1357        for (k, v) in &req.query_params {
1358            if k.starts_with("tagKeys") {
1359                keys.push(v.clone());
1360            }
1361        }
1362        let mut accounts = self.state.write();
1363        let state = accounts.get_or_create(&req.account_id);
1364        if let Some(entry) = state.tags.get_mut(resource_arn) {
1365            entry.retain(|(k, _)| !keys.contains(k));
1366        }
1367        empty()
1368    }
1369
1370    fn list_tags(
1371        &self,
1372        resource_arn: &str,
1373        account_id: &str,
1374    ) -> Result<AwsResponse, AwsServiceError> {
1375        let region = self.region_for(account_id);
1376        self.with_state_read(account_id, &region, |state| {
1377            let tags: serde_json::Map<String, Value> = state
1378                .tags
1379                .get(resource_arn)
1380                .map(|v| {
1381                    v.iter()
1382                        .map(|(k, val)| (k.clone(), Value::String(val.clone())))
1383                        .collect()
1384                })
1385                .unwrap_or_default();
1386            ok(json!({"Tags": tags}))
1387        })
1388    }
1389
1390    // ── Capacity providers ──
1391
1392    fn create_capacity_provider(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1393        let body = body(req);
1394        let name = body["CapacityProviderName"]
1395            .as_str()
1396            .or_else(|| body["Name"].as_str())
1397            .ok_or_else(|| missing("CapacityProviderName"))?
1398            .to_string();
1399        let mut accounts = self.state.write();
1400        let state = accounts.get_or_create(&req.account_id);
1401        let arn = format!(
1402            "arn:aws:lambda:{}:{}:capacity-provider:{}",
1403            state.region, state.account_id, name
1404        );
1405        let cp = CapacityProvider {
1406            name: name.clone(),
1407            arn: arn.clone(),
1408            status: "ACTIVE".to_string(),
1409            created: Utc::now(),
1410        };
1411        state.capacity_providers.insert(name, cp.clone());
1412        ok(json!({
1413            "Name": cp.name,
1414            "Arn": cp.arn,
1415            "Status": cp.status,
1416        }))
1417    }
1418
1419    fn get_capacity_provider(
1420        &self,
1421        name: &str,
1422        account_id: &str,
1423    ) -> Result<AwsResponse, AwsServiceError> {
1424        let region = self.region_for(account_id);
1425        self.with_state_read(account_id, &region, |state| {
1426            state
1427                .capacity_providers
1428                .get(name)
1429                .map(|cp| {
1430                    ok(json!({
1431                        "Name": cp.name,
1432                        "Arn": cp.arn,
1433                        "Status": cp.status,
1434                    }))
1435                })
1436                .unwrap_or_else(|| Err(not_found("CapacityProvider", name)))
1437        })
1438    }
1439
1440    fn update_capacity_provider(
1441        &self,
1442        name: &str,
1443        req: &AwsRequest,
1444    ) -> Result<AwsResponse, AwsServiceError> {
1445        let mut accounts = self.state.write();
1446        let state = accounts.get_or_create(&req.account_id);
1447        let cp = state
1448            .capacity_providers
1449            .get_mut(name)
1450            .ok_or_else(|| not_found("CapacityProvider", name))?;
1451        cp.status = "ACTIVE".to_string();
1452        ok(json!({
1453            "Name": cp.name,
1454            "Arn": cp.arn,
1455            "Status": cp.status,
1456        }))
1457    }
1458
1459    fn delete_capacity_provider(
1460        &self,
1461        name: &str,
1462        account_id: &str,
1463    ) -> Result<AwsResponse, AwsServiceError> {
1464        let mut accounts = self.state.write();
1465        let state = accounts.get_or_create(account_id);
1466        state.capacity_providers.remove(name);
1467        empty()
1468    }
1469
1470    fn list_capacity_providers(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1471        let region = self.region_for(account_id);
1472        self.with_state_read(account_id, &region, |state| {
1473            let cps: Vec<Value> = state
1474                .capacity_providers
1475                .values()
1476                .map(|cp| {
1477                    json!({
1478                        "Name": cp.name,
1479                        "Arn": cp.arn,
1480                        "Status": cp.status,
1481                    })
1482                })
1483                .collect();
1484            ok(json!({"CapacityProviders": cps}))
1485        })
1486    }
1487
1488    fn list_versions_by_capacity_provider(
1489        &self,
1490        _name: &str,
1491        _account_id: &str,
1492    ) -> Result<AwsResponse, AwsServiceError> {
1493        ok(json!({"FunctionVersions": []}))
1494    }
1495
1496    // ── Durable executions ──
1497
1498    fn checkpoint_durable_execution(
1499        &self,
1500        id: &str,
1501        req: &AwsRequest,
1502    ) -> Result<AwsResponse, AwsServiceError> {
1503        let body = body(req);
1504        let body_arn = body
1505            .get("FunctionArn")
1506            .and_then(|v| v.as_str())
1507            .map(String::from);
1508        let body_function = body
1509            .get("FunctionName")
1510            .and_then(|v| v.as_str())
1511            .map(String::from);
1512        let mut accounts = self.state.write();
1513        let state = accounts.get_or_create(&req.account_id);
1514        let derived_arn = body_arn.unwrap_or_else(|| match body_function {
1515            Some(name) if name.starts_with("arn:") => name,
1516            Some(name) => format!(
1517                "arn:aws:lambda:us-east-1:{}:function:{name}",
1518                req.account_id
1519            ),
1520            None => String::new(),
1521        });
1522        let exec = state
1523            .durable_executions
1524            .entry(id.to_string())
1525            .or_insert_with(|| DurableExecution {
1526                id: id.to_string(),
1527                function_arn: derived_arn.clone(),
1528                status: "RUNNING".to_string(),
1529                started: Utc::now(),
1530                stopped: None,
1531                history: Vec::new(),
1532                state: json!({}),
1533            });
1534        if exec.function_arn.is_empty() && !derived_arn.is_empty() {
1535            exec.function_arn = derived_arn;
1536        }
1537        if let Some(s) = body.get("State") {
1538            exec.state = s.clone();
1539        }
1540        if let Some(h) = body.get("HistoryEvent") {
1541            exec.history.push(h.clone());
1542        }
1543        empty()
1544    }
1545
1546    fn get_durable_execution(
1547        &self,
1548        id: &str,
1549        account_id: &str,
1550    ) -> Result<AwsResponse, AwsServiceError> {
1551        let region = self.region_for(account_id);
1552        self.with_state_read(account_id, &region, |state| {
1553            state
1554                .durable_executions
1555                .get(id)
1556                .map(|e| {
1557                    ok(json!({
1558                        "Id": e.id,
1559                        "FunctionArn": e.function_arn,
1560                        "Status": e.status,
1561                        "Started": e.started.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1562                        "Stopped": e.stopped.map(|d| d.format("%Y-%m-%dT%H:%M:%SZ").to_string()),
1563                    }))
1564                })
1565                .unwrap_or_else(|| Err(not_found("DurableExecution", id)))
1566        })
1567    }
1568
1569    fn get_durable_execution_history(
1570        &self,
1571        id: &str,
1572        account_id: &str,
1573    ) -> Result<AwsResponse, AwsServiceError> {
1574        let region = self.region_for(account_id);
1575        self.with_state_read(account_id, &region, |state| {
1576            let history = state
1577                .durable_executions
1578                .get(id)
1579                .map(|e| e.history.clone())
1580                .unwrap_or_default();
1581            ok(json!({"Events": history}))
1582        })
1583    }
1584
1585    fn get_durable_execution_state(
1586        &self,
1587        id: &str,
1588        account_id: &str,
1589    ) -> Result<AwsResponse, AwsServiceError> {
1590        let region = self.region_for(account_id);
1591        self.with_state_read(account_id, &region, |state| {
1592            let s = state
1593                .durable_executions
1594                .get(id)
1595                .map(|e| e.state.clone())
1596                .unwrap_or(json!({}));
1597            ok(json!({"State": s}))
1598        })
1599    }
1600
1601    fn list_durable_executions_by_function(
1602        &self,
1603        function_name: &str,
1604        account_id: &str,
1605    ) -> Result<AwsResponse, AwsServiceError> {
1606        let region = self.region_for(account_id);
1607        self.with_state_read(account_id, &region, |state| {
1608            let executions: Vec<Value> = state
1609                .durable_executions
1610                .values()
1611                .filter(|e| e.function_arn.contains(function_name))
1612                .map(|e| {
1613                    json!({
1614                        "Id": e.id,
1615                        "Status": e.status,
1616                    })
1617                })
1618                .collect();
1619            ok(json!({"DurableExecutions": executions}))
1620        })
1621    }
1622
1623    fn stop_durable_execution(
1624        &self,
1625        id: &str,
1626        account_id: &str,
1627    ) -> Result<AwsResponse, AwsServiceError> {
1628        let mut accounts = self.state.write();
1629        let state = accounts.get_or_create(account_id);
1630        if let Some(e) = state.durable_executions.get_mut(id) {
1631            e.status = "STOPPED".to_string();
1632            e.stopped = Some(Utc::now());
1633        }
1634        empty()
1635    }
1636
1637    fn send_durable_callback(
1638        &self,
1639        id: &str,
1640        _req: &AwsRequest,
1641        kind: &str,
1642    ) -> Result<AwsResponse, AwsServiceError> {
1643        let mut accounts = self.state.write();
1644        let state = accounts.get_or_create(_req.account_id.as_str());
1645        if let Some(e) = state.durable_executions.get_mut(id) {
1646            e.history.push(
1647                json!({"type": format!("Callback{kind}"), "timestamp": Utc::now().to_rfc3339()}),
1648            );
1649            if kind == "SUCCESS" {
1650                e.status = "SUCCEEDED".to_string();
1651            } else if kind == "FAILURE" {
1652                e.status = "FAILED".to_string();
1653            }
1654        }
1655        empty()
1656    }
1657
1658    fn update_event_source_mapping_handler(
1659        &self,
1660        uuid: &str,
1661        req: &AwsRequest,
1662    ) -> Result<AwsResponse, AwsServiceError> {
1663        let body = body(req);
1664        let mut accounts = self.state.write();
1665        let state = accounts.get_or_create(&req.account_id);
1666        let esm = state
1667            .event_source_mappings
1668            .get_mut(uuid)
1669            .ok_or_else(|| not_found("EventSourceMapping", uuid))?;
1670        if let Some(b) = body["BatchSize"].as_i64() {
1671            esm.batch_size = b;
1672        }
1673        if let Some(name) = body["FunctionName"].as_str() {
1674            esm.function_arn = format!(
1675                "arn:aws:lambda:{}:{}:function:{}",
1676                state.region, state.account_id, name
1677            );
1678        }
1679        if let Some(filters) = body
1680            .get("FilterCriteria")
1681            .and_then(|v| v.get("Filters"))
1682            .and_then(|v| v.as_array())
1683        {
1684            esm.filter_patterns = filters
1685                .iter()
1686                .filter_map(|f| f.get("Pattern").and_then(|p| p.as_str()).map(String::from))
1687                .collect();
1688        }
1689        if let Some(types) = body.get("FunctionResponseTypes").and_then(|v| v.as_array()) {
1690            esm.function_response_types = types
1691                .iter()
1692                .filter_map(|v| v.as_str().map(String::from))
1693                .collect();
1694        }
1695        if let Some(w) = body
1696            .get("MaximumBatchingWindowInSeconds")
1697            .and_then(|v| v.as_i64())
1698        {
1699            esm.maximum_batching_window_in_seconds = Some(w);
1700        }
1701        if let Some(p) = body.get("ParallelizationFactor").and_then(|v| v.as_i64()) {
1702            esm.parallelization_factor = Some(p);
1703        }
1704        let mut body_json = json!({
1705            "UUID": esm.uuid,
1706            "FunctionArn": esm.function_arn,
1707            "EventSourceArn": esm.event_source_arn,
1708            "BatchSize": esm.batch_size,
1709            "State": "Enabled",
1710            "StateTransitionReason": "USER_INITIATED",
1711            "LastModified": chrono::Utc::now().timestamp() as f64,
1712        });
1713        let obj = body_json.as_object_mut().expect("json! built object");
1714        if !esm.filter_patterns.is_empty() {
1715            obj.insert(
1716                "FilterCriteria".into(),
1717                json!({
1718                    "Filters": esm
1719                        .filter_patterns
1720                        .iter()
1721                        .map(|p| json!({"Pattern": p}))
1722                        .collect::<Vec<_>>(),
1723                }),
1724            );
1725        }
1726        if !esm.function_response_types.is_empty() {
1727            obj.insert(
1728                "FunctionResponseTypes".into(),
1729                json!(esm.function_response_types),
1730            );
1731        }
1732        if let Some(w) = esm.maximum_batching_window_in_seconds {
1733            obj.insert("MaximumBatchingWindowInSeconds".into(), json!(w));
1734        }
1735        if let Some(p) = esm.parallelization_factor {
1736            obj.insert("ParallelizationFactor".into(), json!(p));
1737        }
1738        ok(body_json)
1739    }
1740
1741    fn region_for(&self, account_id: &str) -> String {
1742        let accounts = self.state.read();
1743        accounts
1744            .get(account_id)
1745            .map(|s| s.region.clone())
1746            .unwrap_or_else(|| "us-east-1".to_string())
1747    }
1748}
1749
1750fn extract_csc_id(input: &str) -> String {
1751    // Decode percent encoding then take the segment after the last colon
1752    // (csc id), or treat as id if no colon present.
1753    let decoded = percent_decode(input);
1754    decoded.rsplit(':').next().unwrap_or(&decoded).to_string()
1755}
1756
1757fn percent_decode(input: &str) -> String {
1758    let mut out = String::with_capacity(input.len());
1759    let bytes = input.as_bytes();
1760    let mut i = 0;
1761    while i < bytes.len() {
1762        if bytes[i] == b'%' && i + 2 < bytes.len() {
1763            let hi = (bytes[i + 1] as char).to_digit(16);
1764            let lo = (bytes[i + 2] as char).to_digit(16);
1765            if let (Some(h), Some(l)) = (hi, lo) {
1766                out.push(((h * 16 + l) as u8) as char);
1767                i += 3;
1768                continue;
1769            }
1770        }
1771        out.push(bytes[i] as char);
1772        i += 1;
1773    }
1774    out
1775}
1776
1777fn code_signing_json(c: &CodeSigningConfig) -> Value {
1778    json!({
1779        "CodeSigningConfigId": c.csc_id,
1780        "CodeSigningConfigArn": c.csc_arn,
1781        "Description": c.description,
1782        "AllowedPublishers": {
1783            "SigningProfileVersionArns": c.allowed_publishers,
1784        },
1785        "CodeSigningPolicies": {
1786            "UntrustedArtifactOnDeployment": c.untrusted_artifact_action,
1787        },
1788        "LastModified": c.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1789    })
1790}
1791
1792fn event_invoke_json(c: &EventInvokeConfig) -> Value {
1793    json!({
1794        "FunctionArn": c.function_arn,
1795        "MaximumEventAgeInSeconds": c.maximum_event_age,
1796        "MaximumRetryAttempts": c.maximum_retry_attempts,
1797        "DestinationConfig": c.destination_config,
1798        "LastModified": c.last_modified.timestamp(),
1799    })
1800}
1801
1802#[cfg(test)]
1803mod tests {
1804    use crate::service::LambdaService;
1805    use crate::state::{LambdaState, SharedLambdaState};
1806    use fakecloud_core::multi_account::MultiAccountState;
1807    use fakecloud_core::service::AwsRequest;
1808    use http::Method;
1809    use parking_lot::RwLock;
1810    use std::collections::HashMap;
1811    use std::sync::Arc;
1812
1813    fn svc() -> LambdaService {
1814        let state: SharedLambdaState = Arc::new(RwLock::new(
1815            MultiAccountState::<LambdaState>::new("000000000000", "us-east-1", ""),
1816        ));
1817        LambdaService::new(state)
1818    }
1819
1820    fn req(action: &str, body: &str, segs: &[&str]) -> AwsRequest {
1821        AwsRequest {
1822            service: "lambda".to_string(),
1823            method: Method::POST,
1824            raw_path: format!("/{}", segs.join("/")),
1825            raw_query: String::new(),
1826            path_segments: segs.iter().map(|s| s.to_string()).collect(),
1827            query_params: HashMap::new(),
1828            headers: http::HeaderMap::new(),
1829            body: bytes::Bytes::from(body.to_string()),
1830            body_stream: parking_lot::Mutex::new(None),
1831            account_id: "000000000000".to_string(),
1832            region: "us-east-1".to_string(),
1833            request_id: "rid".to_string(),
1834            action: action.to_string(),
1835            is_query_protocol: false,
1836            access_key_id: None,
1837            principal: None,
1838        }
1839    }
1840
1841    async fn run(s: &LambdaService, action: &str, body: &str, res: Option<&str>, segs: &[&str]) {
1842        let r = s.handle_extra(action, res, &req(action, body, segs)).await;
1843        match r {
1844            Ok(resp) => assert!(resp.status.is_success(), "{action} status: {}", resp.status),
1845            Err(e) => panic!("{action} failed: {e:?}"),
1846        }
1847    }
1848
1849    #[tokio::test]
1850    async fn read_only_listings_succeed_without_state() {
1851        let s = svc();
1852        run(&s, "GetAccountSettings", "", None, &[]).await;
1853        run(&s, "InvokeAsync", r#"{}"#, Some("fn"), &[]).await;
1854        run(&s, "InvokeWithResponseStream", r#"{}"#, Some("fn"), &[]).await;
1855        run(&s, "ListLayers", "", None, &[]).await;
1856        run(&s, "ListLayerVersions", "", Some("layer"), &[]).await;
1857        run(&s, "ListCapacityProviders", "", None, &[]).await;
1858    }
1859
1860    #[tokio::test]
1861    async fn layers_lifecycle() {
1862        let s = svc();
1863        run(
1864            &s,
1865            "PublishLayerVersion",
1866            r#"{"Content":{"ZipFile":""}}"#,
1867            Some("layer1"),
1868            &["2018-10-31", "layers", "layer1", "versions"],
1869        )
1870        .await;
1871        run(&s, "ListLayers", "", None, &[]).await;
1872        run(&s, "ListLayerVersions", "", Some("layer1"), &[]).await;
1873    }
1874
1875    #[tokio::test]
1876    async fn capacity_providers_lifecycle() {
1877        let s = svc();
1878        run(
1879            &s,
1880            "CreateCapacityProvider",
1881            r#"{"CapacityProviderName":"cp1"}"#,
1882            None,
1883            &[],
1884        )
1885        .await;
1886        run(&s, "GetCapacityProvider", "", Some("cp1"), &[]).await;
1887        run(&s, "ListCapacityProviders", "", None, &[]).await;
1888        run(&s, "UpdateCapacityProvider", r#"{}"#, Some("cp1"), &[]).await;
1889        run(&s, "DeleteCapacityProvider", "", Some("cp1"), &[]).await;
1890    }
1891
1892    #[tokio::test]
1893    async fn durable_executions() {
1894        let s = svc();
1895        run(
1896            &s,
1897            "CheckpointDurableExecution",
1898            r#"{"FunctionName":"fn"}"#,
1899            Some("d1"),
1900            &[],
1901        )
1902        .await;
1903        run(&s, "GetDurableExecution", "", Some("d1"), &[]).await;
1904        run(&s, "GetDurableExecutionHistory", "", Some("d1"), &[]).await;
1905        run(&s, "GetDurableExecutionState", "", Some("d1"), &[]).await;
1906        run(&s, "StopDurableExecution", "", Some("d1"), &[]).await;
1907    }
1908
1909    #[tokio::test]
1910    async fn code_signing_lifecycle() {
1911        let s = svc();
1912        run(
1913            &s,
1914            "CreateCodeSigningConfig",
1915            r#"{"AllowedPublishers":{"SigningProfileVersionArns":[]}}"#,
1916            None,
1917            &[],
1918        )
1919        .await;
1920        run(&s, "ListCodeSigningConfigs", "", None, &[]).await;
1921    }
1922}