1use chrono::Utc;
7use http::StatusCode;
8use serde_json::{json, Value};
9use sha2::{Digest, Sha256};
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
13
14use crate::service::LambdaService;
15use crate::state::{
16 AccountSettings, AttachedLayer, CapacityProvider, CodeSigningConfig, DurableExecution,
17 EventInvokeConfig, FunctionAlias, FunctionScalingConfig, FunctionUrlConfig, LambdaState, Layer,
18 LayerVersion, ProvisionedConcurrencyConfig, RuntimeManagementConfig,
19};
20
21pub(crate) fn resolve_layer_attachments(
26 accounts: &fakecloud_core::multi_account::MultiAccountState<LambdaState>,
27 arns: Vec<String>,
28) -> Vec<AttachedLayer> {
29 arns.into_iter()
30 .map(|arn| {
31 let code_size = parse_layer_version_arn(&arn)
32 .and_then(|(acct, name, ver)| {
33 accounts
34 .get(&acct)
35 .and_then(|s| s.layers.get(&name))
36 .and_then(|l| l.versions.iter().find(|v| v.version == ver))
37 .map(|v| v.code_size)
38 })
39 .unwrap_or(0);
40 AttachedLayer { arn, code_size }
41 })
42 .collect()
43}
44
45fn missing(name: &str) -> AwsServiceError {
46 AwsServiceError::aws_error(
47 StatusCode::BAD_REQUEST,
48 "InvalidParameterValueException",
49 format!("Missing required field: {name}"),
50 )
51}
52
53fn not_found(entity: &str, name: &str) -> AwsServiceError {
54 AwsServiceError::aws_error(
55 StatusCode::NOT_FOUND,
56 "ResourceNotFoundException",
57 format!("{entity} not found: {name}"),
58 )
59}
60
61fn ok(body: Value) -> Result<AwsResponse, AwsServiceError> {
62 Ok(AwsResponse::json(StatusCode::OK, body.to_string()))
63}
64
65fn empty() -> Result<AwsResponse, AwsServiceError> {
66 Ok(AwsResponse::json(StatusCode::OK, "{}".to_string()))
67}
68
69fn body(req: &AwsRequest) -> Value {
70 serde_json::from_slice(&req.body).unwrap_or_else(|_| Value::Object(Default::default()))
71}
72
73fn layer_content_url(req: &AwsRequest, account_id: &str, layer_name: &str, version: i64) -> String {
78 let host = req
79 .headers
80 .get(http::header::HOST)
81 .and_then(|h| h.to_str().ok())
82 .unwrap_or("localhost");
83 let scheme = req
84 .headers
85 .get("x-forwarded-proto")
86 .and_then(|h| h.to_str().ok())
87 .unwrap_or("http");
88 format!(
89 "{scheme}://{host}/_fakecloud/lambda/layer-content/{account_id}/{layer_name}/{version}.zip"
90 )
91}
92
/// Splits a layer-version ARN into `(account, layer name, version)`.
///
/// The ARN must consist of exactly eight `:`-separated fields of the form
/// `arn:<partition>:lambda:<region>:<account>:layer:<name>:<version>`, and the
/// version must parse as an `i64`. Anything else yields `None`. The partition
/// and region fields are not inspected.
pub fn parse_layer_version_arn(arn: &str) -> Option<(String, String, i64)> {
    let fields: Vec<&str> = arn.split(':').collect();
    match fields.as_slice() {
        ["arn", _partition, "lambda", _region, account, "layer", name, version] => {
            let version = version.parse::<i64>().ok()?;
            Some((account.to_string(), name.to_string(), version))
        }
        _ => None,
    }
}
106
107fn parse_qualifier(req: &AwsRequest) -> String {
108 req.query_params
109 .get("Qualifier")
110 .cloned()
111 .unwrap_or_else(|| "$LATEST".to_string())
112}
113
/// Produces an identifier made of `prefix` followed by the current time in
/// nanoseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the numeric part is `0`.
fn id_from_time(prefix: &str) -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    format!("{prefix}{nanos}")
}
124
125impl LambdaService {
126 pub(crate) async fn handle_extra(
127 &self,
128 action: &str,
129 resource: Option<&str>,
130 req: &AwsRequest,
131 ) -> Result<AwsResponse, AwsServiceError> {
132 let aid = req.account_id.as_str();
133 let res = resource.unwrap_or("");
134 match action {
135 "GetFunctionConfiguration" => self.get_function_configuration(res, aid),
137 "UpdateFunctionConfiguration" => self.update_function_configuration(res, req),
138 "UpdateFunctionCode" => self.update_function_code(res, req),
139 "UpdateEventSourceMapping" => self.update_event_source_mapping_handler(res, req),
140 "GetAccountSettings" => self.get_account_settings(aid),
141 "InvokeAsync" => Ok(AwsResponse::json(StatusCode::ACCEPTED, "{}".to_string())),
142 "InvokeWithResponseStream" => Ok(AwsResponse::json(StatusCode::OK, "{}".to_string())),
143
144 "ListVersionsByFunction" => self.list_versions_by_function(res, aid),
146
147 "CreateAlias" => self.create_alias(res, req),
149 "GetAlias" => self.get_alias(res, req),
150 "ListAliases" => self.list_aliases(res, aid),
151 "UpdateAlias" => self.update_alias(res, req),
152 "DeleteAlias" => self.delete_alias(res, req),
153
154 "PublishLayerVersion" => self.publish_layer_version(res, req),
156 "GetLayerVersion" => self.get_layer_version(req),
157 "GetLayerVersionByArn" => self.get_layer_version_by_arn(req),
158 "ListLayers" => self.list_layers(aid),
159 "ListLayerVersions" => self.list_layer_versions(res, aid),
160 "DeleteLayerVersion" => self.delete_layer_version(req),
161 "GetLayerVersionPolicy" => self.get_layer_version_policy(req),
162 "AddLayerVersionPermission" => self.add_layer_version_permission(req),
163 "RemoveLayerVersionPermission" => self.remove_layer_version_permission(req),
164
165 "CreateFunctionUrlConfig" => self.create_function_url_config(res, req),
167 "GetFunctionUrlConfig" => self.get_function_url_config(res, aid),
168 "UpdateFunctionUrlConfig" => self.update_function_url_config(res, req),
169 "DeleteFunctionUrlConfig" => self.delete_function_url_config(res, aid),
170 "ListFunctionUrlConfigs" => self.list_function_url_configs(aid),
171
172 "PutFunctionConcurrency" => self.put_function_concurrency(res, req),
174 "GetFunctionConcurrency" => self.get_function_concurrency(res, aid),
175 "DeleteFunctionConcurrency" => self.delete_function_concurrency(res, aid),
176 "PutProvisionedConcurrencyConfig" => self.put_provisioned_concurrency(res, req),
177 "GetProvisionedConcurrencyConfig" => self.get_provisioned_concurrency(res, req),
178 "DeleteProvisionedConcurrencyConfig" => self.delete_provisioned_concurrency(res, req),
179 "ListProvisionedConcurrencyConfigs" => self.list_provisioned_concurrency(res, aid),
180
181 "CreateCodeSigningConfig" => self.create_code_signing_config(req),
183 "GetCodeSigningConfig" => self.get_code_signing_config(res, aid),
184 "UpdateCodeSigningConfig" => self.update_code_signing_config(res, req),
185 "DeleteCodeSigningConfig" => self.delete_code_signing_config(res, aid),
186 "ListCodeSigningConfigs" => self.list_code_signing_configs(aid),
187 "PutFunctionCodeSigningConfig" => self.put_function_code_signing(res, req),
188 "GetFunctionCodeSigningConfig" => self.get_function_code_signing(res, aid),
189 "DeleteFunctionCodeSigningConfig" => self.delete_function_code_signing(res, aid),
190 "ListFunctionsByCodeSigningConfig" => self.list_functions_by_code_signing(res, aid),
191
192 "PutFunctionEventInvokeConfig" | "UpdateFunctionEventInvokeConfig" => {
194 self.put_function_event_invoke(res, req)
195 }
196 "GetFunctionEventInvokeConfig" => self.get_function_event_invoke(res, req),
197 "DeleteFunctionEventInvokeConfig" => self.delete_function_event_invoke(res, req),
198 "ListFunctionEventInvokeConfigs" => self.list_function_event_invoke(res, aid),
199
200 "PutRuntimeManagementConfig" => self.put_runtime_management(res, req),
202 "GetRuntimeManagementConfig" => self.get_runtime_management(res, req),
203
204 "PutFunctionScalingConfig" => self.put_scaling_config(res, req),
206 "GetFunctionScalingConfig" => self.get_scaling_config(res, aid),
207
208 "PutFunctionRecursionConfig" => self.put_recursion_config(res, req),
210 "GetFunctionRecursionConfig" => self.get_recursion_config(res, aid),
211
212 "TagResource" => self.tag_resource(res, req),
214 "UntagResource" => self.untag_resource(res, req),
215 "ListTags" => self.list_tags(res, aid),
216
217 "CreateCapacityProvider" => self.create_capacity_provider(req),
219 "GetCapacityProvider" => self.get_capacity_provider(res, aid),
220 "UpdateCapacityProvider" => self.update_capacity_provider(res, req),
221 "DeleteCapacityProvider" => self.delete_capacity_provider(res, aid),
222 "ListCapacityProviders" => self.list_capacity_providers(aid),
223 "ListFunctionVersionsByCapacityProvider" => {
224 self.list_versions_by_capacity_provider(res, aid)
225 }
226
227 "CheckpointDurableExecution" => self.checkpoint_durable_execution(res, req),
229 "GetDurableExecution" => self.get_durable_execution(res, aid),
230 "GetDurableExecutionHistory" => self.get_durable_execution_history(res, aid),
231 "GetDurableExecutionState" => self.get_durable_execution_state(res, aid),
232 "ListDurableExecutionsByFunction" => self.list_durable_executions_by_function(res, aid),
233 "StopDurableExecution" => self.stop_durable_execution(res, aid),
234 "SendDurableExecutionCallbackSuccess" => {
235 self.send_durable_callback(res, req, "SUCCESS")
236 }
237 "SendDurableExecutionCallbackFailure" => {
238 self.send_durable_callback(res, req, "FAILURE")
239 }
240 "SendDurableExecutionCallbackHeartbeat" => {
241 self.send_durable_callback(res, req, "HEARTBEAT")
242 }
243
244 _ => Err(AwsServiceError::action_not_implemented("lambda", action)),
245 }
246 }
247
248 fn with_state_read<F, R>(&self, account_id: &str, region: &str, f: F) -> R
249 where
250 F: FnOnce(&LambdaState) -> R,
251 {
252 let accounts = self.state.read();
253 let empty = LambdaState::new(account_id, region);
254 let state = accounts.get(account_id).unwrap_or(&empty);
255 f(state)
256 }
257
258 fn get_function_configuration(
261 &self,
262 function_name: &str,
263 account_id: &str,
264 ) -> Result<AwsResponse, AwsServiceError> {
265 let region = self.region_for(account_id);
266 self.with_state_read(account_id, ®ion, |state| {
267 state
268 .functions
269 .get(function_name)
270 .map(|f| ok(self.function_config_json(f)))
271 .unwrap_or_else(|| Err(not_found("Function", function_name)))
272 })
273 }
274
275 fn update_function_configuration(
276 &self,
277 function_name: &str,
278 req: &AwsRequest,
279 ) -> Result<AwsResponse, AwsServiceError> {
280 let body = body(req);
281 let mut accounts = self.state.write();
282 let layer_attachments: Option<Vec<AttachedLayer>> = body["Layers"].as_array().map(|arr| {
285 let arns: Vec<String> = arr
286 .iter()
287 .filter_map(|v| v.as_str().map(String::from))
288 .collect();
289 resolve_layer_attachments(&accounts, arns)
290 });
291 let state = accounts.get_or_create(&req.account_id);
292 let func = state
293 .functions
294 .get_mut(function_name)
295 .ok_or_else(|| not_found("Function", function_name))?;
296 if let Some(handler) = body["Handler"].as_str() {
297 func.handler = handler.to_string();
298 }
299 if let Some(t) = body["Timeout"].as_i64() {
300 func.timeout = t;
301 }
302 if let Some(m) = body["MemorySize"].as_i64() {
303 func.memory_size = m;
304 }
305 if let Some(role) = body["Role"].as_str() {
306 func.role = role.to_string();
307 }
308 if let Some(desc) = body["Description"].as_str() {
309 func.description = desc.to_string();
310 }
311 if let Some(attachments) = layer_attachments {
312 func.layers = attachments;
313 }
314 func.last_modified = Utc::now();
315 ok(self.function_config_json(func))
316 }
317
318 fn update_function_code(
319 &self,
320 function_name: &str,
321 req: &AwsRequest,
322 ) -> Result<AwsResponse, AwsServiceError> {
323 let mut accounts = self.state.write();
324 let state = accounts.get_or_create(&req.account_id);
325 let func = state
326 .functions
327 .get_mut(function_name)
328 .ok_or_else(|| not_found("Function", function_name))?;
329 func.last_modified = Utc::now();
330 ok(self.function_config_json(func))
331 }
332
333 fn get_account_settings(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
334 let mut accounts = self.state.write();
335 let state = accounts.get_or_create(account_id);
336 let settings = state.account_settings.clone().unwrap_or(AccountSettings {
337 concurrent_executions: 1000,
338 code_size_zipped: 52_428_800,
339 code_size_unzipped: 262_144_000,
340 total_code_size: 80_530_636_800,
341 });
342 if state.account_settings.is_none() {
343 state.account_settings = Some(settings.clone());
344 }
345 ok(json!({
346 "AccountLimit": {
347 "ConcurrentExecutions": settings.concurrent_executions,
348 "CodeSizeZipped": settings.code_size_zipped,
349 "CodeSizeUnzipped": settings.code_size_unzipped,
350 "TotalCodeSize": settings.total_code_size,
351 "UnreservedConcurrentExecutions": settings.concurrent_executions,
352 },
353 "AccountUsage": {
354 "TotalCodeSize": 0,
355 "FunctionCount": 0,
356 },
357 }))
358 }
359
360 fn list_versions_by_function(
363 &self,
364 function_name: &str,
365 account_id: &str,
366 ) -> Result<AwsResponse, AwsServiceError> {
367 let region = self.region_for(account_id);
368 self.with_state_read(account_id, ®ion, |state| {
369 if !state.functions.contains_key(function_name) {
370 return Err(not_found("Function", function_name));
371 }
372 let versions: Vec<&String> = state
373 .function_versions
374 .get(function_name)
375 .map(|v| v.iter().collect())
376 .unwrap_or_default();
377 ok(json!({
378 "Versions": versions,
379 }))
380 })
381 }
382
383 fn alias_key(function: &str, alias: &str) -> String {
386 format!("{function}:{alias}")
387 }
388
389 fn create_alias(
390 &self,
391 function_name: &str,
392 req: &AwsRequest,
393 ) -> Result<AwsResponse, AwsServiceError> {
394 let body = body(req);
395 let name = body["Name"]
396 .as_str()
397 .ok_or_else(|| missing("Name"))?
398 .to_string();
399 let version = body["FunctionVersion"]
400 .as_str()
401 .unwrap_or("$LATEST")
402 .to_string();
403 let mut accounts = self.state.write();
404 let state = accounts.get_or_create(&req.account_id);
405 if !state.functions.contains_key(function_name) {
406 return Err(not_found("Function", function_name));
407 }
408 let alias_arn = format!(
409 "arn:aws:lambda:{}:{}:function:{}:{}",
410 state.region, state.account_id, function_name, name
411 );
412 let alias = FunctionAlias {
413 alias_arn: alias_arn.clone(),
414 name: name.clone(),
415 function_version: version,
416 description: body["Description"].as_str().unwrap_or("").to_string(),
417 revision_id: id_from_time("rev-"),
418 routing_config: body.get("RoutingConfig").cloned(),
419 };
420 state
421 .aliases
422 .insert(Self::alias_key(function_name, &name), alias.clone());
423 ok(serde_json::to_value(alias).unwrap_or_default())
424 }
425
426 fn get_alias(
427 &self,
428 function_name: &str,
429 req: &AwsRequest,
430 ) -> Result<AwsResponse, AwsServiceError> {
431 let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
432 let region = self.region_for(&req.account_id);
433 self.with_state_read(&req.account_id, ®ion, |state| {
434 state
435 .aliases
436 .get(&Self::alias_key(function_name, &alias_name))
437 .map(|a| ok(serde_json::to_value(a).unwrap_or_default()))
438 .unwrap_or_else(|| Err(not_found("Alias", &alias_name)))
439 })
440 }
441
442 fn list_aliases(
443 &self,
444 function_name: &str,
445 account_id: &str,
446 ) -> Result<AwsResponse, AwsServiceError> {
447 let region = self.region_for(account_id);
448 self.with_state_read(account_id, ®ion, |state| {
449 let prefix = format!("{function_name}:");
450 let aliases: Vec<&FunctionAlias> = state
451 .aliases
452 .iter()
453 .filter(|(k, _)| k.starts_with(&prefix))
454 .map(|(_, v)| v)
455 .collect();
456 ok(json!({"Aliases": aliases}))
457 })
458 }
459
460 fn update_alias(
461 &self,
462 function_name: &str,
463 req: &AwsRequest,
464 ) -> Result<AwsResponse, AwsServiceError> {
465 let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
466 let body = body(req);
467 let mut accounts = self.state.write();
468 let state = accounts.get_or_create(&req.account_id);
469 let key = Self::alias_key(function_name, &alias_name);
470 let alias = state
471 .aliases
472 .get_mut(&key)
473 .ok_or_else(|| not_found("Alias", &alias_name))?;
474 if let Some(v) = body["FunctionVersion"].as_str() {
475 alias.function_version = v.to_string();
476 }
477 if let Some(d) = body["Description"].as_str() {
478 alias.description = d.to_string();
479 }
480 if let Some(rc) = body.get("RoutingConfig") {
481 alias.routing_config = Some(rc.clone());
482 }
483 alias.revision_id = id_from_time("rev-");
484 ok(serde_json::to_value(alias).unwrap_or_default())
485 }
486
487 fn delete_alias(
488 &self,
489 function_name: &str,
490 req: &AwsRequest,
491 ) -> Result<AwsResponse, AwsServiceError> {
492 let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
493 let mut accounts = self.state.write();
494 let state = accounts.get_or_create(&req.account_id);
495 state
496 .aliases
497 .remove(&Self::alias_key(function_name, &alias_name));
498 empty()
499 }
500
501 fn publish_layer_version(
504 &self,
505 layer_name: &str,
506 req: &AwsRequest,
507 ) -> Result<AwsResponse, AwsServiceError> {
508 let body = body(req);
509 let zip_bytes: Option<Vec<u8>> = match body["Content"]["ZipFile"].as_str() {
510 Some(b64) => Some(
511 base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
512 |_| {
513 AwsServiceError::aws_error(
514 StatusCode::BAD_REQUEST,
515 "InvalidParameterValueException",
516 "Could not decode Content.ZipFile: invalid base64",
517 )
518 },
519 )?,
520 ),
521 None => None,
522 };
523 let (code_sha256, code_size) = match zip_bytes.as_deref() {
524 Some(bytes) => {
525 let mut hasher = Sha256::new();
526 hasher.update(bytes);
527 let digest = hasher.finalize();
528 (
529 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, digest),
530 bytes.len() as i64,
531 )
532 }
533 None => (String::new(), 0),
534 };
535
536 let mut accounts = self.state.write();
537 let state = accounts.get_or_create(&req.account_id);
538 let account_id = state.account_id.clone();
539 let layer = state
540 .layers
541 .entry(layer_name.to_string())
542 .or_insert_with(|| Layer {
543 layer_name: layer_name.to_string(),
544 layer_arn: format!(
545 "arn:aws:lambda:{}:{}:layer:{}",
546 state.region, state.account_id, layer_name
547 ),
548 versions: Vec::new(),
549 });
550 let next_version = (layer.versions.len() as i64) + 1;
551 let version_arn = format!("{}:{}", layer.layer_arn, next_version);
552 let runtimes: Vec<String> = body["CompatibleRuntimes"]
553 .as_array()
554 .map(|arr| {
555 arr.iter()
556 .filter_map(|v| v.as_str().map(String::from))
557 .collect()
558 })
559 .unwrap_or_default();
560 let layer_arn = layer.layer_arn.clone();
561 let lv = LayerVersion {
562 version: next_version,
563 layer_version_arn: version_arn.clone(),
564 description: body["Description"].as_str().unwrap_or("").to_string(),
565 created_date: Utc::now(),
566 compatible_runtimes: runtimes,
567 license_info: body["LicenseInfo"].as_str().unwrap_or("").to_string(),
568 policy: None,
569 code_zip: zip_bytes,
570 code_sha256: code_sha256.clone(),
571 code_size,
572 };
573 layer.versions.push(lv.clone());
574 let location = layer_content_url(req, &account_id, layer_name, next_version);
575 ok(json!({
576 "LayerArn": layer_arn,
577 "LayerVersionArn": version_arn,
578 "Version": next_version,
579 "Description": lv.description,
580 "CreatedDate": lv.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
581 "CompatibleRuntimes": lv.compatible_runtimes,
582 "LicenseInfo": lv.license_info,
583 "Content": {
584 "Location": location,
585 "CodeSha256": code_sha256,
586 "CodeSize": code_size,
587 },
588 }))
589 }
590
591 fn list_layers(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
592 let region = self.region_for(account_id);
593 self.with_state_read(account_id, ®ion, |state| {
594 let layers: Vec<Value> = state
595 .layers
596 .values()
597 .map(|l| {
598 json!({
599 "LayerName": l.layer_name,
600 "LayerArn": l.layer_arn,
601 "LatestMatchingVersion": l.versions.last().map(|v| json!({
602 "LayerVersionArn": v.layer_version_arn,
603 "Version": v.version,
604 "Description": v.description,
605 "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
606 "CompatibleRuntimes": v.compatible_runtimes,
607 })),
608 })
609 })
610 .collect();
611 ok(json!({"Layers": layers}))
612 })
613 }
614
615 fn list_layer_versions(
616 &self,
617 layer_name: &str,
618 account_id: &str,
619 ) -> Result<AwsResponse, AwsServiceError> {
620 let region = self.region_for(account_id);
621 self.with_state_read(account_id, ®ion, |state| {
622 let versions: Vec<Value> = state
623 .layers
624 .get(layer_name)
625 .map(|l| {
626 l.versions
627 .iter()
628 .map(|v| {
629 json!({
630 "LayerVersionArn": v.layer_version_arn,
631 "Version": v.version,
632 "Description": v.description,
633 "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
634 "CompatibleRuntimes": v.compatible_runtimes,
635 "LicenseInfo": v.license_info,
636 })
637 })
638 .collect()
639 })
640 .unwrap_or_default();
641 ok(json!({"LayerVersions": versions}))
642 })
643 }
644
645 fn get_layer_version(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
646 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
647 let version: i64 = req
648 .path_segments
649 .get(4)
650 .and_then(|s| s.parse().ok())
651 .ok_or_else(|| missing("VersionNumber"))?;
652 let region = self.region_for(&req.account_id);
653 let location = layer_content_url(req, &req.account_id, &layer_name, version);
654 self.with_state_read(&req.account_id, ®ion, |state| {
655 state
656 .layers
657 .get(&layer_name)
658 .and_then(|l| l.versions.iter().find(|v| v.version == version))
659 .map(|v| {
660 ok(json!({
661 "LayerVersionArn": v.layer_version_arn,
662 "Version": v.version,
663 "Description": v.description,
664 "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
665 "CompatibleRuntimes": v.compatible_runtimes,
666 "LicenseInfo": v.license_info,
667 "Content": {
668 "Location": location,
669 "CodeSha256": v.code_sha256,
670 "CodeSize": v.code_size,
671 },
672 }))
673 })
674 .unwrap_or_else(|| Err(not_found("LayerVersion", &layer_name)))
675 })
676 }
677
678 fn get_layer_version_by_arn(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
679 let arn = req
680 .query_params
681 .get("Arn")
682 .or_else(|| req.query_params.get("find"))
683 .cloned()
684 .unwrap_or_default();
685 let (account_id, layer_name, version) =
686 parse_layer_version_arn(&arn).ok_or_else(|| missing("Arn"))?;
687 let region = self.region_for(&account_id);
688 let location = layer_content_url(req, &account_id, &layer_name, version);
689 self.with_state_read(&account_id, ®ion, |state| {
690 state
691 .layers
692 .get(&layer_name)
693 .and_then(|l| l.versions.iter().find(|v| v.version == version))
694 .map(|v| {
695 ok(json!({
696 "LayerVersionArn": v.layer_version_arn,
697 "Version": v.version,
698 "Description": v.description,
699 "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
700 "CompatibleRuntimes": v.compatible_runtimes,
701 "LicenseInfo": v.license_info,
702 "Content": {
703 "Location": location,
704 "CodeSha256": v.code_sha256,
705 "CodeSize": v.code_size,
706 },
707 }))
708 })
709 .unwrap_or_else(|| Err(not_found("LayerVersion", &arn)))
710 })
711 }
712
713 fn delete_layer_version(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
714 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
715 let version: i64 = req
716 .path_segments
717 .get(4)
718 .and_then(|s| s.parse().ok())
719 .unwrap_or(0);
720 let mut accounts = self.state.write();
721 let state = accounts.get_or_create(&req.account_id);
722 if let Some(layer) = state.layers.get_mut(&layer_name) {
723 layer.versions.retain(|v| v.version != version);
724 }
725 empty()
726 }
727
728 fn get_layer_version_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
729 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
730 let version: i64 = req
731 .path_segments
732 .get(4)
733 .and_then(|s| s.parse().ok())
734 .unwrap_or(0);
735 let region = self.region_for(&req.account_id);
736 self.with_state_read(&req.account_id, ®ion, |state| {
737 let policy = state
738 .layers
739 .get(&layer_name)
740 .and_then(|l| l.versions.iter().find(|v| v.version == version))
741 .and_then(|v| v.policy.clone())
742 .unwrap_or_else(|| "{}".to_string());
743 ok(json!({"Policy": policy, "RevisionId": id_from_time("rev-")}))
744 })
745 }
746
747 fn add_layer_version_permission(
748 &self,
749 req: &AwsRequest,
750 ) -> Result<AwsResponse, AwsServiceError> {
751 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
752 let version: i64 = req
753 .path_segments
754 .get(4)
755 .and_then(|s| s.parse().ok())
756 .unwrap_or(0);
757 let body = body(req);
758 let mut accounts = self.state.write();
759 let state = accounts.get_or_create(&req.account_id);
760 if let Some(layer) = state.layers.get_mut(&layer_name) {
761 if let Some(v) = layer.versions.iter_mut().find(|v| v.version == version) {
762 let policy = v.policy.clone().unwrap_or_else(|| "{}".to_string());
763 let mut policy_doc: Value = serde_json::from_str(&policy).unwrap_or(json!({}));
764 let statements = policy_doc["Statement"].as_array_mut();
765 let new_stmt = json!({
766 "Sid": body["StatementId"].as_str().unwrap_or("default"),
767 "Effect": "Allow",
768 "Principal": body["Principal"].clone(),
769 "Action": body["Action"].clone(),
770 "Resource": v.layer_version_arn.clone(),
771 });
772 if let Some(s) = statements {
773 s.push(new_stmt);
774 } else {
775 policy_doc = json!({"Version": "2012-10-17", "Statement": [new_stmt]});
776 }
777 v.policy = Some(policy_doc.to_string());
778 }
779 }
780 ok(json!({
781 "Statement": body["StatementId"],
782 "RevisionId": id_from_time("rev-"),
783 }))
784 }
785
786 fn remove_layer_version_permission(
787 &self,
788 req: &AwsRequest,
789 ) -> Result<AwsResponse, AwsServiceError> {
790 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
791 let version: i64 = req
792 .path_segments
793 .get(4)
794 .and_then(|s| s.parse().ok())
795 .unwrap_or(0);
796 let sid = req.path_segments.get(6).cloned().unwrap_or_default();
797 let mut accounts = self.state.write();
798 let state = accounts.get_or_create(&req.account_id);
799 if let Some(layer) = state.layers.get_mut(&layer_name) {
800 if let Some(v) = layer.versions.iter_mut().find(|v| v.version == version) {
801 if let Some(policy) = v.policy.clone() {
802 let mut policy_doc: Value = serde_json::from_str(&policy).unwrap_or(json!({}));
803 if let Some(stmts) = policy_doc["Statement"].as_array_mut() {
804 stmts.retain(|s| s["Sid"].as_str() != Some(&sid));
805 }
806 v.policy = Some(policy_doc.to_string());
807 }
808 }
809 }
810 empty()
811 }
812
813 fn create_function_url_config(
816 &self,
817 function_name: &str,
818 req: &AwsRequest,
819 ) -> Result<AwsResponse, AwsServiceError> {
820 let body = body(req);
821 let auth_type = body["AuthType"].as_str().unwrap_or("NONE").to_string();
822 let now = Utc::now();
823 let mut accounts = self.state.write();
824 let state = accounts.get_or_create(&req.account_id);
825 if !state.functions.contains_key(function_name) {
826 return Err(not_found("Function", function_name));
827 }
828 let function_arn = format!(
829 "arn:aws:lambda:{}:{}:function:{}",
830 state.region, state.account_id, function_name
831 );
832 let cfg = FunctionUrlConfig {
833 function_arn: function_arn.clone(),
834 function_url: format!(
835 "https://{function_name}.lambda-url.{}.on.aws/",
836 state.region
837 ),
838 auth_type: auth_type.clone(),
839 cors: body.get("Cors").cloned(),
840 creation_time: now,
841 last_modified_time: now,
842 invoke_mode: body["InvokeMode"]
843 .as_str()
844 .unwrap_or("BUFFERED")
845 .to_string(),
846 };
847 state
848 .function_url_configs
849 .insert(function_name.to_string(), cfg.clone());
850 ok(serde_json::to_value(cfg).unwrap_or_default())
851 }
852
853 fn get_function_url_config(
854 &self,
855 function_name: &str,
856 account_id: &str,
857 ) -> Result<AwsResponse, AwsServiceError> {
858 let region = self.region_for(account_id);
859 self.with_state_read(account_id, ®ion, |state| {
860 state
861 .function_url_configs
862 .get(function_name)
863 .map(|c| ok(serde_json::to_value(c).unwrap_or_default()))
864 .unwrap_or_else(|| Err(not_found("FunctionUrlConfig", function_name)))
865 })
866 }
867
868 fn update_function_url_config(
869 &self,
870 function_name: &str,
871 req: &AwsRequest,
872 ) -> Result<AwsResponse, AwsServiceError> {
873 let body = body(req);
874 let mut accounts = self.state.write();
875 let state = accounts.get_or_create(&req.account_id);
876 let cfg = state
877 .function_url_configs
878 .get_mut(function_name)
879 .ok_or_else(|| not_found("FunctionUrlConfig", function_name))?;
880 if let Some(a) = body["AuthType"].as_str() {
881 cfg.auth_type = a.to_string();
882 }
883 if let Some(c) = body.get("Cors") {
884 cfg.cors = Some(c.clone());
885 }
886 if let Some(m) = body["InvokeMode"].as_str() {
887 cfg.invoke_mode = m.to_string();
888 }
889 cfg.last_modified_time = Utc::now();
890 ok(serde_json::to_value(cfg).unwrap_or_default())
891 }
892
893 fn delete_function_url_config(
894 &self,
895 function_name: &str,
896 account_id: &str,
897 ) -> Result<AwsResponse, AwsServiceError> {
898 let mut accounts = self.state.write();
899 let state = accounts.get_or_create(account_id);
900 state.function_url_configs.remove(function_name);
901 empty()
902 }
903
904 fn list_function_url_configs(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
905 let region = self.region_for(account_id);
906 self.with_state_read(account_id, ®ion, |state| {
907 let configs: Vec<&FunctionUrlConfig> = state.function_url_configs.values().collect();
908 ok(json!({"FunctionUrlConfigs": configs}))
909 })
910 }
911
912 fn put_function_concurrency(
915 &self,
916 function_name: &str,
917 req: &AwsRequest,
918 ) -> Result<AwsResponse, AwsServiceError> {
919 let body = body(req);
920 let n = body["ReservedConcurrentExecutions"]
921 .as_i64()
922 .ok_or_else(|| missing("ReservedConcurrentExecutions"))?;
923 let mut accounts = self.state.write();
924 let state = accounts.get_or_create(&req.account_id);
925 state
926 .function_concurrency
927 .insert(function_name.to_string(), n);
928 ok(json!({"ReservedConcurrentExecutions": n}))
929 }
930
931 fn get_function_concurrency(
932 &self,
933 function_name: &str,
934 account_id: &str,
935 ) -> Result<AwsResponse, AwsServiceError> {
936 let region = self.region_for(account_id);
937 self.with_state_read(account_id, ®ion, |state| {
938 let n = state
939 .function_concurrency
940 .get(function_name)
941 .copied()
942 .unwrap_or(0);
943 ok(json!({"ReservedConcurrentExecutions": n}))
944 })
945 }
946
947 fn delete_function_concurrency(
948 &self,
949 function_name: &str,
950 account_id: &str,
951 ) -> Result<AwsResponse, AwsServiceError> {
952 let mut accounts = self.state.write();
953 let state = accounts.get_or_create(account_id);
954 state.function_concurrency.remove(function_name);
955 empty()
956 }
957
958 fn pc_key(function: &str, qualifier: &str) -> String {
959 format!("{function}:{qualifier}")
960 }
961
962 fn put_provisioned_concurrency(
963 &self,
964 function_name: &str,
965 req: &AwsRequest,
966 ) -> Result<AwsResponse, AwsServiceError> {
967 let body = body(req);
968 let qualifier = parse_qualifier(req);
969 let requested = body["ProvisionedConcurrentExecutions"]
970 .as_i64()
971 .ok_or_else(|| missing("ProvisionedConcurrentExecutions"))?;
972 let mut accounts = self.state.write();
973 let state = accounts.get_or_create(&req.account_id);
974 let cfg = ProvisionedConcurrencyConfig {
975 requested,
976 allocated: requested,
977 status: "READY".to_string(),
978 last_modified: Utc::now(),
979 };
980 state
981 .provisioned_concurrency
982 .insert(Self::pc_key(function_name, &qualifier), cfg.clone());
983 ok(json!({
984 "RequestedProvisionedConcurrentExecutions": cfg.requested,
985 "AvailableProvisionedConcurrentExecutions": cfg.allocated,
986 "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
987 "Status": cfg.status,
988 "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
989 }))
990 }
991
992 fn get_provisioned_concurrency(
993 &self,
994 function_name: &str,
995 req: &AwsRequest,
996 ) -> Result<AwsResponse, AwsServiceError> {
997 let qualifier = parse_qualifier(req);
998 let region = self.region_for(&req.account_id);
999 self.with_state_read(&req.account_id, ®ion, |state| {
1000 state
1001 .provisioned_concurrency
1002 .get(&Self::pc_key(function_name, &qualifier))
1003 .map(|cfg| ok(json!({
1004 "RequestedProvisionedConcurrentExecutions": cfg.requested,
1005 "AvailableProvisionedConcurrentExecutions": cfg.allocated,
1006 "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
1007 "Status": cfg.status,
1008 "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1009 })))
1010 .unwrap_or_else(|| Err(not_found("ProvisionedConcurrencyConfig", function_name)))
1011 })
1012 }
1013
1014 fn delete_provisioned_concurrency(
1015 &self,
1016 function_name: &str,
1017 req: &AwsRequest,
1018 ) -> Result<AwsResponse, AwsServiceError> {
1019 let qualifier = parse_qualifier(req);
1020 let mut accounts = self.state.write();
1021 let state = accounts.get_or_create(&req.account_id);
1022 state
1023 .provisioned_concurrency
1024 .remove(&Self::pc_key(function_name, &qualifier));
1025 empty()
1026 }
1027
1028 fn list_provisioned_concurrency(
1029 &self,
1030 function_name: &str,
1031 account_id: &str,
1032 ) -> Result<AwsResponse, AwsServiceError> {
1033 let region = self.region_for(account_id);
1034 self.with_state_read(account_id, ®ion, |state| {
1035 let prefix = format!("{function_name}:");
1036 let configs: Vec<Value> = state
1037 .provisioned_concurrency
1038 .iter()
1039 .filter(|(k, _)| k.starts_with(&prefix))
1040 .map(|(k, cfg)| {
1041 let qualifier = k.split(':').next_back().unwrap_or("$LATEST");
1042 json!({
1043 "FunctionArn": format!(
1044 "arn:aws:lambda:{}:{}:function:{}:{}",
1045 state.region, state.account_id, function_name, qualifier
1046 ),
1047 "Status": cfg.status,
1048 "RequestedProvisionedConcurrentExecutions": cfg.requested,
1049 "AvailableProvisionedConcurrentExecutions": cfg.allocated,
1050 "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
1051 "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1052 })
1053 })
1054 .collect();
1055 ok(json!({"ProvisionedConcurrencyConfigs": configs}))
1056 })
1057 }
1058
1059 fn create_code_signing_config(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1062 let body = body(req);
1063 let mut accounts = self.state.write();
1064 let state = accounts.get_or_create(&req.account_id);
1065 let id = id_from_time("csc-");
1066 let arn = format!(
1067 "arn:aws:lambda:{}:{}:code-signing-config:{}",
1068 state.region, state.account_id, id
1069 );
1070 let publishers: Vec<String> = body
1071 .get("AllowedPublishers")
1072 .and_then(|v| v.get("SigningProfileVersionArns"))
1073 .and_then(|v| v.as_array())
1074 .map(|arr| {
1075 arr.iter()
1076 .filter_map(|x| x.as_str().map(String::from))
1077 .collect()
1078 })
1079 .unwrap_or_default();
1080 let csc = CodeSigningConfig {
1081 csc_id: id.clone(),
1082 csc_arn: arn,
1083 description: body["Description"].as_str().unwrap_or("").to_string(),
1084 allowed_publishers: publishers,
1085 untrusted_artifact_action: body["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"]
1086 .as_str()
1087 .unwrap_or("Warn")
1088 .to_string(),
1089 last_modified: Utc::now(),
1090 };
1091 state.code_signing_configs.insert(id, csc.clone());
1092 ok(json!({"CodeSigningConfig": code_signing_json(&csc)}))
1093 }
1094
1095 fn get_code_signing_config(
1096 &self,
1097 csc_id: &str,
1098 account_id: &str,
1099 ) -> Result<AwsResponse, AwsServiceError> {
1100 let id = extract_csc_id(csc_id);
1101 let region = self.region_for(account_id);
1102 self.with_state_read(account_id, ®ion, |state| {
1103 state
1104 .code_signing_configs
1105 .get(&id)
1106 .map(|c| ok(json!({"CodeSigningConfig": code_signing_json(c)})))
1107 .unwrap_or_else(|| Err(not_found("CodeSigningConfig", &id)))
1108 })
1109 }
1110
1111 fn update_code_signing_config(
1112 &self,
1113 csc_id: &str,
1114 req: &AwsRequest,
1115 ) -> Result<AwsResponse, AwsServiceError> {
1116 let body = body(req);
1117 let mut accounts = self.state.write();
1118 let state = accounts.get_or_create(&req.account_id);
1119 let id = extract_csc_id(csc_id);
1120 let csc = state
1121 .code_signing_configs
1122 .get_mut(&id)
1123 .ok_or_else(|| not_found("CodeSigningConfig", &id))?;
1124 if let Some(d) = body["Description"].as_str() {
1125 csc.description = d.to_string();
1126 }
1127 if let Some(action) = body["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"].as_str()
1128 {
1129 csc.untrusted_artifact_action = action.to_string();
1130 }
1131 csc.last_modified = Utc::now();
1132 ok(json!({"CodeSigningConfig": code_signing_json(csc)}))
1133 }
1134
1135 fn delete_code_signing_config(
1136 &self,
1137 csc_id: &str,
1138 account_id: &str,
1139 ) -> Result<AwsResponse, AwsServiceError> {
1140 let id = extract_csc_id(csc_id);
1141 let mut accounts = self.state.write();
1142 let state = accounts.get_or_create(account_id);
1143 state.code_signing_configs.remove(&id);
1144 empty()
1145 }
1146
1147 fn list_code_signing_configs(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1148 let region = self.region_for(account_id);
1149 self.with_state_read(account_id, ®ion, |state| {
1150 let cfgs: Vec<Value> = state
1151 .code_signing_configs
1152 .values()
1153 .map(code_signing_json)
1154 .collect();
1155 ok(json!({"CodeSigningConfigs": cfgs}))
1156 })
1157 }
1158
1159 fn put_function_code_signing(
1160 &self,
1161 function_name: &str,
1162 req: &AwsRequest,
1163 ) -> Result<AwsResponse, AwsServiceError> {
1164 let body = body(req);
1165 let csc_arn = body["CodeSigningConfigArn"]
1166 .as_str()
1167 .ok_or_else(|| missing("CodeSigningConfigArn"))?
1168 .to_string();
1169 let mut accounts = self.state.write();
1170 let state = accounts.get_or_create(&req.account_id);
1171 state
1172 .function_code_signing
1173 .insert(function_name.to_string(), csc_arn.clone());
1174 ok(json!({
1175 "CodeSigningConfigArn": csc_arn,
1176 "FunctionName": function_name,
1177 }))
1178 }
1179
1180 fn get_function_code_signing(
1181 &self,
1182 function_name: &str,
1183 account_id: &str,
1184 ) -> Result<AwsResponse, AwsServiceError> {
1185 let region = self.region_for(account_id);
1186 self.with_state_read(account_id, ®ion, |state| {
1187 let arn = state
1188 .function_code_signing
1189 .get(function_name)
1190 .cloned()
1191 .unwrap_or_default();
1192 ok(json!({
1193 "CodeSigningConfigArn": arn,
1194 "FunctionName": function_name,
1195 }))
1196 })
1197 }
1198
1199 fn delete_function_code_signing(
1200 &self,
1201 function_name: &str,
1202 account_id: &str,
1203 ) -> Result<AwsResponse, AwsServiceError> {
1204 let mut accounts = self.state.write();
1205 let state = accounts.get_or_create(account_id);
1206 state.function_code_signing.remove(function_name);
1207 empty()
1208 }
1209
1210 fn list_functions_by_code_signing(
1211 &self,
1212 csc_id: &str,
1213 account_id: &str,
1214 ) -> Result<AwsResponse, AwsServiceError> {
1215 let id = extract_csc_id(csc_id);
1216 let region = self.region_for(account_id);
1217 self.with_state_read(account_id, ®ion, |state| {
1218 let funcs: Vec<&String> = state
1219 .function_code_signing
1220 .iter()
1221 .filter(|(_, v)| v.contains(&id))
1222 .map(|(k, _)| k)
1223 .collect();
1224 ok(json!({"FunctionArns": funcs}))
1225 })
1226 }
1227
1228 fn ev_key(function: &str, qualifier: &str) -> String {
1231 format!("{function}:{qualifier}")
1232 }
1233
1234 fn put_function_event_invoke(
1235 &self,
1236 function_name: &str,
1237 req: &AwsRequest,
1238 ) -> Result<AwsResponse, AwsServiceError> {
1239 let body = body(req);
1240 let qualifier = parse_qualifier(req);
1241 let function_arn = format!(
1242 "arn:aws:lambda:{}:{}:function:{}",
1243 self.region_for(&req.account_id),
1244 req.account_id,
1245 function_name
1246 );
1247 let cfg = EventInvokeConfig {
1248 function_arn: function_arn.clone(),
1249 maximum_event_age: body["MaximumEventAgeInSeconds"].as_i64().unwrap_or(21600),
1250 maximum_retry_attempts: body["MaximumRetryAttempts"].as_i64().unwrap_or(2),
1251 destination_config: body.get("DestinationConfig").cloned().unwrap_or(json!({})),
1252 last_modified: Utc::now(),
1253 };
1254 let mut accounts = self.state.write();
1255 let state = accounts.get_or_create(&req.account_id);
1256 state
1257 .event_invoke_configs
1258 .insert(Self::ev_key(function_name, &qualifier), cfg.clone());
1259 ok(event_invoke_json(&cfg))
1260 }
1261
1262 fn get_function_event_invoke(
1263 &self,
1264 function_name: &str,
1265 req: &AwsRequest,
1266 ) -> Result<AwsResponse, AwsServiceError> {
1267 let qualifier = parse_qualifier(req);
1268 let region = self.region_for(&req.account_id);
1269 self.with_state_read(&req.account_id, ®ion, |state| {
1270 state
1271 .event_invoke_configs
1272 .get(&Self::ev_key(function_name, &qualifier))
1273 .map(|c| ok(event_invoke_json(c)))
1274 .unwrap_or_else(|| Err(not_found("EventInvokeConfig", function_name)))
1275 })
1276 }
1277
1278 fn delete_function_event_invoke(
1279 &self,
1280 function_name: &str,
1281 req: &AwsRequest,
1282 ) -> Result<AwsResponse, AwsServiceError> {
1283 let qualifier = parse_qualifier(req);
1284 let mut accounts = self.state.write();
1285 let state = accounts.get_or_create(&req.account_id);
1286 state
1287 .event_invoke_configs
1288 .remove(&Self::ev_key(function_name, &qualifier));
1289 empty()
1290 }
1291
1292 fn list_function_event_invoke(
1293 &self,
1294 function_name: &str,
1295 account_id: &str,
1296 ) -> Result<AwsResponse, AwsServiceError> {
1297 let region = self.region_for(account_id);
1298 self.with_state_read(account_id, ®ion, |state| {
1299 let prefix = format!("{function_name}:");
1300 let configs: Vec<Value> = state
1301 .event_invoke_configs
1302 .iter()
1303 .filter(|(k, _)| k.starts_with(&prefix))
1304 .map(|(_, c)| event_invoke_json(c))
1305 .collect();
1306 ok(json!({"FunctionEventInvokeConfigs": configs}))
1307 })
1308 }
1309
1310 fn put_runtime_management(
1313 &self,
1314 function_name: &str,
1315 req: &AwsRequest,
1316 ) -> Result<AwsResponse, AwsServiceError> {
1317 let body = body(req);
1318 let qualifier = parse_qualifier(req);
1319 let cfg = RuntimeManagementConfig {
1320 update_runtime_on: body["UpdateRuntimeOn"]
1321 .as_str()
1322 .unwrap_or("Auto")
1323 .to_string(),
1324 runtime_version_arn: body["RuntimeVersionArn"].as_str().unwrap_or("").to_string(),
1325 };
1326 let mut accounts = self.state.write();
1327 let state = accounts.get_or_create(&req.account_id);
1328 state
1329 .runtime_management
1330 .insert(format!("{function_name}:{qualifier}"), cfg.clone());
1331 ok(json!({
1332 "FunctionArn": Arn::new("lambda", &state.region, &state.account_id, &format!("function:{function_name}:{qualifier}")).to_string(),
1333 "UpdateRuntimeOn": cfg.update_runtime_on,
1334 "RuntimeVersionArn": cfg.runtime_version_arn,
1335 }))
1336 }
1337
1338 fn get_runtime_management(
1339 &self,
1340 function_name: &str,
1341 req: &AwsRequest,
1342 ) -> Result<AwsResponse, AwsServiceError> {
1343 let qualifier = parse_qualifier(req);
1344 let region = self.region_for(&req.account_id);
1345 self.with_state_read(&req.account_id, ®ion, |state| {
1346 let cfg = state
1347 .runtime_management
1348 .get(&format!("{function_name}:{qualifier}"))
1349 .cloned()
1350 .unwrap_or(RuntimeManagementConfig {
1351 update_runtime_on: "Auto".to_string(),
1352 runtime_version_arn: String::new(),
1353 });
1354 ok(json!({
1355 "FunctionArn": format!(
1356 "arn:aws:lambda:{}:{}:function:{}:{}",
1357 state.region, state.account_id, function_name, qualifier
1358 ),
1359 "UpdateRuntimeOn": cfg.update_runtime_on,
1360 "RuntimeVersionArn": cfg.runtime_version_arn,
1361 }))
1362 })
1363 }
1364
1365 fn put_scaling_config(
1368 &self,
1369 uuid: &str,
1370 req: &AwsRequest,
1371 ) -> Result<AwsResponse, AwsServiceError> {
1372 let body = body(req);
1373 let cfg = FunctionScalingConfig {
1374 maximum_concurrency: body["MaximumConcurrency"].as_i64().unwrap_or(0),
1375 };
1376 let mut accounts = self.state.write();
1377 let state = accounts.get_or_create(&req.account_id);
1378 state.scaling_configs.insert(uuid.to_string(), cfg.clone());
1379 ok(json!({
1380 "MaximumConcurrency": cfg.maximum_concurrency,
1381 }))
1382 }
1383
1384 fn get_scaling_config(
1385 &self,
1386 uuid: &str,
1387 account_id: &str,
1388 ) -> Result<AwsResponse, AwsServiceError> {
1389 let region = self.region_for(account_id);
1390 self.with_state_read(account_id, ®ion, |state| {
1391 let n = state
1392 .scaling_configs
1393 .get(uuid)
1394 .map(|c| c.maximum_concurrency)
1395 .unwrap_or(0);
1396 ok(json!({"MaximumConcurrency": n}))
1397 })
1398 }
1399
1400 fn put_recursion_config(
1403 &self,
1404 function_name: &str,
1405 req: &AwsRequest,
1406 ) -> Result<AwsResponse, AwsServiceError> {
1407 let body = body(req);
1408 let mode = body["RecursiveLoop"]
1409 .as_str()
1410 .unwrap_or("Terminate")
1411 .to_string();
1412 let mut accounts = self.state.write();
1413 let state = accounts.get_or_create(&req.account_id);
1414 state
1415 .recursion_configs
1416 .insert(function_name.to_string(), mode.clone());
1417 ok(json!({"RecursiveLoop": mode}))
1418 }
1419
1420 fn get_recursion_config(
1421 &self,
1422 function_name: &str,
1423 account_id: &str,
1424 ) -> Result<AwsResponse, AwsServiceError> {
1425 let region = self.region_for(account_id);
1426 self.with_state_read(account_id, ®ion, |state| {
1427 let mode = state
1428 .recursion_configs
1429 .get(function_name)
1430 .cloned()
1431 .unwrap_or_else(|| "Terminate".to_string());
1432 ok(json!({"RecursiveLoop": mode}))
1433 })
1434 }
1435
1436 fn tag_resource(
1439 &self,
1440 resource_arn: &str,
1441 req: &AwsRequest,
1442 ) -> Result<AwsResponse, AwsServiceError> {
1443 let body = body(req);
1444 let new_tags: Vec<(String, String)> = body
1445 .get("Tags")
1446 .and_then(|v| v.as_object())
1447 .map(|m| {
1448 m.iter()
1449 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
1450 .collect()
1451 })
1452 .unwrap_or_default();
1453 let mut accounts = self.state.write();
1454 let state = accounts.get_or_create(&req.account_id);
1455 let entry = state.tags.entry(resource_arn.to_string()).or_default();
1456 for (k, v) in new_tags {
1457 entry.retain(|(ek, _)| ek != &k);
1458 entry.push((k, v));
1459 }
1460 empty()
1461 }
1462
1463 fn untag_resource(
1464 &self,
1465 resource_arn: &str,
1466 req: &AwsRequest,
1467 ) -> Result<AwsResponse, AwsServiceError> {
1468 let mut keys: Vec<String> = Vec::new();
1469 for (k, v) in &req.query_params {
1470 if k.starts_with("tagKeys") {
1471 keys.push(v.clone());
1472 }
1473 }
1474 let mut accounts = self.state.write();
1475 let state = accounts.get_or_create(&req.account_id);
1476 if let Some(entry) = state.tags.get_mut(resource_arn) {
1477 entry.retain(|(k, _)| !keys.contains(k));
1478 }
1479 empty()
1480 }
1481
1482 fn list_tags(
1483 &self,
1484 resource_arn: &str,
1485 account_id: &str,
1486 ) -> Result<AwsResponse, AwsServiceError> {
1487 let region = self.region_for(account_id);
1488 self.with_state_read(account_id, ®ion, |state| {
1489 let tags: serde_json::Map<String, Value> = state
1490 .tags
1491 .get(resource_arn)
1492 .map(|v| {
1493 v.iter()
1494 .map(|(k, val)| (k.clone(), Value::String(val.clone())))
1495 .collect()
1496 })
1497 .unwrap_or_default();
1498 ok(json!({"Tags": tags}))
1499 })
1500 }
1501
1502 fn create_capacity_provider(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1505 let body = body(req);
1506 let name = body["CapacityProviderName"]
1507 .as_str()
1508 .or_else(|| body["Name"].as_str())
1509 .ok_or_else(|| missing("CapacityProviderName"))?
1510 .to_string();
1511 let mut accounts = self.state.write();
1512 let state = accounts.get_or_create(&req.account_id);
1513 let arn = format!(
1514 "arn:aws:lambda:{}:{}:capacity-provider:{}",
1515 state.region, state.account_id, name
1516 );
1517 let cp = CapacityProvider {
1518 name: name.clone(),
1519 arn: arn.clone(),
1520 status: "ACTIVE".to_string(),
1521 created: Utc::now(),
1522 };
1523 state.capacity_providers.insert(name, cp.clone());
1524 ok(json!({
1525 "Name": cp.name,
1526 "Arn": cp.arn,
1527 "Status": cp.status,
1528 }))
1529 }
1530
1531 fn get_capacity_provider(
1532 &self,
1533 name: &str,
1534 account_id: &str,
1535 ) -> Result<AwsResponse, AwsServiceError> {
1536 let region = self.region_for(account_id);
1537 self.with_state_read(account_id, ®ion, |state| {
1538 state
1539 .capacity_providers
1540 .get(name)
1541 .map(|cp| {
1542 ok(json!({
1543 "Name": cp.name,
1544 "Arn": cp.arn,
1545 "Status": cp.status,
1546 }))
1547 })
1548 .unwrap_or_else(|| Err(not_found("CapacityProvider", name)))
1549 })
1550 }
1551
1552 fn update_capacity_provider(
1553 &self,
1554 name: &str,
1555 req: &AwsRequest,
1556 ) -> Result<AwsResponse, AwsServiceError> {
1557 let mut accounts = self.state.write();
1558 let state = accounts.get_or_create(&req.account_id);
1559 let cp = state
1560 .capacity_providers
1561 .get_mut(name)
1562 .ok_or_else(|| not_found("CapacityProvider", name))?;
1563 cp.status = "ACTIVE".to_string();
1564 ok(json!({
1565 "Name": cp.name,
1566 "Arn": cp.arn,
1567 "Status": cp.status,
1568 }))
1569 }
1570
1571 fn delete_capacity_provider(
1572 &self,
1573 name: &str,
1574 account_id: &str,
1575 ) -> Result<AwsResponse, AwsServiceError> {
1576 let mut accounts = self.state.write();
1577 let state = accounts.get_or_create(account_id);
1578 state.capacity_providers.remove(name);
1579 empty()
1580 }
1581
1582 fn list_capacity_providers(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1583 let region = self.region_for(account_id);
1584 self.with_state_read(account_id, ®ion, |state| {
1585 let cps: Vec<Value> = state
1586 .capacity_providers
1587 .values()
1588 .map(|cp| {
1589 json!({
1590 "Name": cp.name,
1591 "Arn": cp.arn,
1592 "Status": cp.status,
1593 })
1594 })
1595 .collect();
1596 ok(json!({"CapacityProviders": cps}))
1597 })
1598 }
1599
1600 fn list_versions_by_capacity_provider(
1601 &self,
1602 _name: &str,
1603 _account_id: &str,
1604 ) -> Result<AwsResponse, AwsServiceError> {
1605 ok(json!({"FunctionVersions": []}))
1606 }
1607
1608 fn checkpoint_durable_execution(
1611 &self,
1612 id: &str,
1613 req: &AwsRequest,
1614 ) -> Result<AwsResponse, AwsServiceError> {
1615 let body = body(req);
1616 let body_arn = body
1617 .get("FunctionArn")
1618 .and_then(|v| v.as_str())
1619 .map(String::from);
1620 let body_function = body
1621 .get("FunctionName")
1622 .and_then(|v| v.as_str())
1623 .map(String::from);
1624 let mut accounts = self.state.write();
1625 let state = accounts.get_or_create(&req.account_id);
1626 let derived_arn = body_arn.unwrap_or_else(|| match body_function {
1627 Some(name) if name.starts_with("arn:") => name,
1628 Some(name) => format!(
1629 "arn:aws:lambda:us-east-1:{}:function:{name}",
1630 req.account_id
1631 ),
1632 None => String::new(),
1633 });
1634 let exec = state
1635 .durable_executions
1636 .entry(id.to_string())
1637 .or_insert_with(|| DurableExecution {
1638 id: id.to_string(),
1639 function_arn: derived_arn.clone(),
1640 status: "RUNNING".to_string(),
1641 started: Utc::now(),
1642 stopped: None,
1643 history: Vec::new(),
1644 state: json!({}),
1645 });
1646 if exec.function_arn.is_empty() && !derived_arn.is_empty() {
1647 exec.function_arn = derived_arn;
1648 }
1649 if let Some(s) = body.get("State") {
1650 exec.state = s.clone();
1651 }
1652 if let Some(h) = body.get("HistoryEvent") {
1653 exec.history.push(h.clone());
1654 }
1655 empty()
1656 }
1657
1658 fn get_durable_execution(
1659 &self,
1660 id: &str,
1661 account_id: &str,
1662 ) -> Result<AwsResponse, AwsServiceError> {
1663 let region = self.region_for(account_id);
1664 self.with_state_read(account_id, ®ion, |state| {
1665 state
1666 .durable_executions
1667 .get(id)
1668 .map(|e| {
1669 ok(json!({
1670 "Id": e.id,
1671 "FunctionArn": e.function_arn,
1672 "Status": e.status,
1673 "Started": e.started.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1674 "Stopped": e.stopped.map(|d| d.format("%Y-%m-%dT%H:%M:%SZ").to_string()),
1675 }))
1676 })
1677 .unwrap_or_else(|| Err(not_found("DurableExecution", id)))
1678 })
1679 }
1680
1681 fn get_durable_execution_history(
1682 &self,
1683 id: &str,
1684 account_id: &str,
1685 ) -> Result<AwsResponse, AwsServiceError> {
1686 let region = self.region_for(account_id);
1687 self.with_state_read(account_id, ®ion, |state| {
1688 let history = state
1689 .durable_executions
1690 .get(id)
1691 .map(|e| e.history.clone())
1692 .unwrap_or_default();
1693 ok(json!({"Events": history}))
1694 })
1695 }
1696
1697 fn get_durable_execution_state(
1698 &self,
1699 id: &str,
1700 account_id: &str,
1701 ) -> Result<AwsResponse, AwsServiceError> {
1702 let region = self.region_for(account_id);
1703 self.with_state_read(account_id, ®ion, |state| {
1704 let s = state
1705 .durable_executions
1706 .get(id)
1707 .map(|e| e.state.clone())
1708 .unwrap_or(json!({}));
1709 ok(json!({"State": s}))
1710 })
1711 }
1712
1713 fn list_durable_executions_by_function(
1714 &self,
1715 function_name: &str,
1716 account_id: &str,
1717 ) -> Result<AwsResponse, AwsServiceError> {
1718 let region = self.region_for(account_id);
1719 self.with_state_read(account_id, ®ion, |state| {
1720 let executions: Vec<Value> = state
1721 .durable_executions
1722 .values()
1723 .filter(|e| e.function_arn.contains(function_name))
1724 .map(|e| {
1725 json!({
1726 "Id": e.id,
1727 "Status": e.status,
1728 })
1729 })
1730 .collect();
1731 ok(json!({"DurableExecutions": executions}))
1732 })
1733 }
1734
1735 fn stop_durable_execution(
1736 &self,
1737 id: &str,
1738 account_id: &str,
1739 ) -> Result<AwsResponse, AwsServiceError> {
1740 let mut accounts = self.state.write();
1741 let state = accounts.get_or_create(account_id);
1742 if let Some(e) = state.durable_executions.get_mut(id) {
1743 e.status = "STOPPED".to_string();
1744 e.stopped = Some(Utc::now());
1745 }
1746 empty()
1747 }
1748
1749 fn send_durable_callback(
1750 &self,
1751 id: &str,
1752 _req: &AwsRequest,
1753 kind: &str,
1754 ) -> Result<AwsResponse, AwsServiceError> {
1755 let mut accounts = self.state.write();
1756 let state = accounts.get_or_create(_req.account_id.as_str());
1757 if let Some(e) = state.durable_executions.get_mut(id) {
1758 e.history.push(
1759 json!({"type": format!("Callback{kind}"), "timestamp": Utc::now().to_rfc3339()}),
1760 );
1761 if kind == "SUCCESS" {
1762 e.status = "SUCCEEDED".to_string();
1763 } else if kind == "FAILURE" {
1764 e.status = "FAILED".to_string();
1765 }
1766 }
1767 empty()
1768 }
1769
1770 fn update_event_source_mapping_handler(
1771 &self,
1772 uuid: &str,
1773 req: &AwsRequest,
1774 ) -> Result<AwsResponse, AwsServiceError> {
1775 let body = body(req);
1776 let mut accounts = self.state.write();
1777 let state = accounts.get_or_create(&req.account_id);
1778 let esm = state
1779 .event_source_mappings
1780 .get_mut(uuid)
1781 .ok_or_else(|| not_found("EventSourceMapping", uuid))?;
1782 if let Some(b) = body["BatchSize"].as_i64() {
1783 esm.batch_size = b;
1784 }
1785 if let Some(name) = body["FunctionName"].as_str() {
1786 esm.function_arn = format!(
1787 "arn:aws:lambda:{}:{}:function:{}",
1788 state.region, state.account_id, name
1789 );
1790 }
1791 if let Some(filters) = body
1792 .get("FilterCriteria")
1793 .and_then(|v| v.get("Filters"))
1794 .and_then(|v| v.as_array())
1795 {
1796 esm.filter_patterns = filters
1797 .iter()
1798 .filter_map(|f| f.get("Pattern").and_then(|p| p.as_str()).map(String::from))
1799 .collect();
1800 }
1801 if let Some(types) = body.get("FunctionResponseTypes").and_then(|v| v.as_array()) {
1802 esm.function_response_types = types
1803 .iter()
1804 .filter_map(|v| v.as_str().map(String::from))
1805 .collect();
1806 }
1807 if let Some(w) = body
1808 .get("MaximumBatchingWindowInSeconds")
1809 .and_then(|v| v.as_i64())
1810 {
1811 esm.maximum_batching_window_in_seconds = Some(w);
1812 }
1813 if let Some(p) = body.get("ParallelizationFactor").and_then(|v| v.as_i64()) {
1814 esm.parallelization_factor = Some(p);
1815 }
1816 let mut body_json = json!({
1817 "UUID": esm.uuid,
1818 "FunctionArn": esm.function_arn,
1819 "EventSourceArn": esm.event_source_arn,
1820 "BatchSize": esm.batch_size,
1821 "State": "Enabled",
1822 "StateTransitionReason": "USER_INITIATED",
1823 "LastModified": chrono::Utc::now().timestamp() as f64,
1824 });
1825 let obj = body_json.as_object_mut().expect("json! built object");
1826 if !esm.filter_patterns.is_empty() {
1827 obj.insert(
1828 "FilterCriteria".into(),
1829 json!({
1830 "Filters": esm
1831 .filter_patterns
1832 .iter()
1833 .map(|p| json!({"Pattern": p}))
1834 .collect::<Vec<_>>(),
1835 }),
1836 );
1837 }
1838 if !esm.function_response_types.is_empty() {
1839 obj.insert(
1840 "FunctionResponseTypes".into(),
1841 json!(esm.function_response_types),
1842 );
1843 }
1844 if let Some(w) = esm.maximum_batching_window_in_seconds {
1845 obj.insert("MaximumBatchingWindowInSeconds".into(), json!(w));
1846 }
1847 if let Some(p) = esm.parallelization_factor {
1848 obj.insert("ParallelizationFactor".into(), json!(p));
1849 }
1850 ok(body_json)
1851 }
1852
1853 fn region_for(&self, account_id: &str) -> String {
1854 let accounts = self.state.read();
1855 accounts
1856 .get(account_id)
1857 .map(|s| s.region.clone())
1858 .unwrap_or_else(|| "us-east-1".to_string())
1859 }
1860}
1861
1862fn extract_csc_id(input: &str) -> String {
1863 let decoded = percent_decode(input);
1866 decoded.rsplit(':').next().unwrap_or(&decoded).to_string()
1867}
1868
/// Decodes `%XX` escapes in `input`.
///
/// Every `%` followed by two hexadecimal digits is replaced by the byte it
/// encodes. A `%` that is not followed by two hex digits is kept
/// literally. Decoding works on bytes and the result is interpreted as
/// UTF-8, so multi-byte characters survive whether they arrive
/// percent-encoded (`%C3%A9`) or raw. Byte sequences that are not valid
/// UTF-8 after decoding are replaced with U+FFFD.
fn percent_decode(input: &str) -> String {
    /// Returns the value of an ASCII hex digit, or `None` for any other byte.
    fn hex_value(byte: u8) -> Option<u8> {
        // `to_digit(16)` yields at most 15, so the narrowing is lossless.
        (byte as char).to_digit(16).map(|digit| digit as u8)
    }

    let bytes = input.as_bytes();
    let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            // `get` returns `None` when fewer than two bytes follow the `%`.
            if let Some([hi, lo]) = bytes.get(i + 1..i + 3) {
                if let (Some(h), Some(l)) = (hex_value(*hi), hex_value(*lo)) {
                    out.push(h * 16 + l);
                    i += 3;
                    continue;
                }
            }
        }
        out.push(bytes[i]);
        i += 1;
    }
    String::from_utf8_lossy(&out).into_owned()
}
1888
1889fn code_signing_json(c: &CodeSigningConfig) -> Value {
1890 json!({
1891 "CodeSigningConfigId": c.csc_id,
1892 "CodeSigningConfigArn": c.csc_arn,
1893 "Description": c.description,
1894 "AllowedPublishers": {
1895 "SigningProfileVersionArns": c.allowed_publishers,
1896 },
1897 "CodeSigningPolicies": {
1898 "UntrustedArtifactOnDeployment": c.untrusted_artifact_action,
1899 },
1900 "LastModified": c.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1901 })
1902}
1903
1904fn event_invoke_json(c: &EventInvokeConfig) -> Value {
1905 json!({
1906 "FunctionArn": c.function_arn,
1907 "MaximumEventAgeInSeconds": c.maximum_event_age,
1908 "MaximumRetryAttempts": c.maximum_retry_attempts,
1909 "DestinationConfig": c.destination_config,
1910 "LastModified": c.last_modified.timestamp(),
1911 })
1912}
1913
// Smoke tests for the extra Lambda handlers: each test drives a sequence of
// actions through `handle_extra` and only checks that every call succeeds.
#[cfg(test)]
mod tests {
    use crate::service::LambdaService;
    use crate::state::{LambdaState, SharedLambdaState};
    use fakecloud_core::multi_account::MultiAccountState;
    use fakecloud_core::service::AwsRequest;
    use http::Method;
    use parking_lot::RwLock;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Builds a `LambdaService` over fresh shared state constructed with
    /// `"000000000000"`, `"us-east-1"` and an empty third argument.
    fn svc() -> LambdaService {
        let state: SharedLambdaState = Arc::new(RwLock::new(
            MultiAccountState::<LambdaState>::new("000000000000", "us-east-1", ""),
        ));
        LambdaService::new(state)
    }

    /// Builds a POST request for `action` with the given body and path
    /// segments. Query parameters and headers are empty; the account id is
    /// `"000000000000"` and the region `"us-east-1"`.
    fn req(action: &str, body: &str, segs: &[&str]) -> AwsRequest {
        AwsRequest {
            service: "lambda".to_string(),
            method: Method::POST,
            raw_path: format!("/{}", segs.join("/")),
            raw_query: String::new(),
            path_segments: segs.iter().map(|s| s.to_string()).collect(),
            query_params: HashMap::new(),
            headers: http::HeaderMap::new(),
            body: bytes::Bytes::from(body.to_string()),
            body_stream: parking_lot::Mutex::new(None),
            account_id: "000000000000".to_string(),
            region: "us-east-1".to_string(),
            request_id: "rid".to_string(),
            action: action.to_string(),
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Dispatches `action` through `handle_extra` with the optional resource
    /// `res` and panics unless the call returns a success status.
    async fn run(s: &LambdaService, action: &str, body: &str, res: Option<&str>, segs: &[&str]) {
        let r = s.handle_extra(action, res, &req(action, body, segs)).await;
        match r {
            Ok(resp) => assert!(resp.status.is_success(), "{action} status: {}", resp.status),
            Err(e) => panic!("{action} failed: {e:?}"),
        }
    }

    // These actions must succeed against a service with no prior state.
    #[tokio::test]
    async fn read_only_listings_succeed_without_state() {
        let s = svc();
        run(&s, "GetAccountSettings", "", None, &[]).await;
        run(&s, "InvokeAsync", r#"{}"#, Some("fn"), &[]).await;
        run(&s, "InvokeWithResponseStream", r#"{}"#, Some("fn"), &[]).await;
        run(&s, "ListLayers", "", None, &[]).await;
        run(&s, "ListLayerVersions", "", Some("layer"), &[]).await;
        run(&s, "ListCapacityProviders", "", None, &[]).await;
    }

    // Publishes one layer version, then lists layers and that layer's versions.
    #[tokio::test]
    async fn layers_lifecycle() {
        let s = svc();
        run(
            &s,
            "PublishLayerVersion",
            r#"{"Content":{"ZipFile":""}}"#,
            Some("layer1"),
            &["2018-10-31", "layers", "layer1", "versions"],
        )
        .await;
        run(&s, "ListLayers", "", None, &[]).await;
        run(&s, "ListLayerVersions", "", Some("layer1"), &[]).await;
    }

    // Create -> get -> list -> update -> delete. Order matters: get and
    // update return not-found unless the provider was created first.
    #[tokio::test]
    async fn capacity_providers_lifecycle() {
        let s = svc();
        run(
            &s,
            "CreateCapacityProvider",
            r#"{"CapacityProviderName":"cp1"}"#,
            None,
            &[],
        )
        .await;
        run(&s, "GetCapacityProvider", "", Some("cp1"), &[]).await;
        run(&s, "ListCapacityProviders", "", None, &[]).await;
        run(&s, "UpdateCapacityProvider", r#"{}"#, Some("cp1"), &[]).await;
        run(&s, "DeleteCapacityProvider", "", Some("cp1"), &[]).await;
    }

    // The checkpoint creates execution "d1"; the get that follows would
    // return not-found without it.
    #[tokio::test]
    async fn durable_executions() {
        let s = svc();
        run(
            &s,
            "CheckpointDurableExecution",
            r#"{"FunctionName":"fn"}"#,
            Some("d1"),
            &[],
        )
        .await;
        run(&s, "GetDurableExecution", "", Some("d1"), &[]).await;
        run(&s, "GetDurableExecutionHistory", "", Some("d1"), &[]).await;
        run(&s, "GetDurableExecutionState", "", Some("d1"), &[]).await;
        run(&s, "StopDurableExecution", "", Some("d1"), &[]).await;
    }

    // Creates a config with an empty publisher list, then lists configs.
    #[tokio::test]
    async fn code_signing_lifecycle() {
        let s = svc();
        run(
            &s,
            "CreateCodeSigningConfig",
            r#"{"AllowedPublishers":{"SigningProfileVersionArns":[]}}"#,
            None,
            &[],
        )
        .await;
        run(&s, "ListCodeSigningConfigs", "", None, &[]).await;
    }
}