1use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use sha2::{Digest, Sha256};
9
10use fakecloud_aws::arn::Arn;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
12
13use crate::service::LambdaService;
14use crate::state::{
15 AccountSettings, AttachedLayer, CodeSigningConfig, EventInvokeConfig, FunctionAlias,
16 FunctionScalingConfig, FunctionUrlConfig, LambdaState, Layer, LayerVersion,
17 ProvisionedConcurrencyConfig, RuntimeManagementConfig,
18};
19
20pub(crate) fn resolve_layer_attachments(
25 accounts: &fakecloud_core::multi_account::MultiAccountState<LambdaState>,
26 arns: Vec<String>,
27) -> Vec<AttachedLayer> {
28 arns.into_iter()
29 .map(|arn| {
30 let code_size = parse_layer_version_arn(&arn)
31 .and_then(|(acct, name, ver)| {
32 accounts
33 .get(&acct)
34 .and_then(|s| s.layers.get(&name))
35 .and_then(|l| l.versions.iter().find(|v| v.version == ver))
36 .map(|v| v.code_size)
37 })
38 .unwrap_or(0);
39 AttachedLayer { arn, code_size }
40 })
41 .collect()
42}
43
44fn missing(name: &str) -> AwsServiceError {
45 AwsServiceError::aws_error(
46 StatusCode::BAD_REQUEST,
47 "InvalidParameterValueException",
48 format!("Missing required field: {name}"),
49 )
50}
51
52fn not_found(entity: &str, name: &str) -> AwsServiceError {
53 AwsServiceError::aws_error(
54 StatusCode::NOT_FOUND,
55 "ResourceNotFoundException",
56 format!("{entity} not found: {name}"),
57 )
58}
59
60fn ok(body: Value) -> Result<AwsResponse, AwsServiceError> {
61 Ok(AwsResponse::json(StatusCode::OK, body.to_string()))
62}
63
64fn empty() -> Result<AwsResponse, AwsServiceError> {
65 Ok(AwsResponse::json(StatusCode::OK, "{}".to_string()))
66}
67
68fn body(req: &AwsRequest) -> Value {
69 serde_json::from_slice(&req.body).unwrap_or_else(|_| Value::Object(Default::default()))
70}
71
72fn function_name_from_arn(arn: &str) -> Option<String> {
79 let rest = arn.strip_prefix("arn:aws:lambda:")?;
80 let mut parts = rest.splitn(5, ':');
81 let _region = parts.next()?;
82 let _account = parts.next()?;
83 let resource_kind = parts.next()?;
84 if resource_kind != "function" {
85 return None;
86 }
87 let name_with_qualifier = parts.next()?;
88 Some(
89 name_with_qualifier
90 .split(':')
91 .next()
92 .unwrap_or(name_with_qualifier)
93 .to_string(),
94 )
95}
96
97fn parse_query_pairs(raw_query: &str) -> Vec<(String, String)> {
104 raw_query
105 .split('&')
106 .filter(|s| !s.is_empty())
107 .map(|pair| {
108 let mut it = pair.splitn(2, '=');
109 let k = it.next().unwrap_or("");
110 let v = it.next().unwrap_or("");
111 (decode_query_segment(k), decode_query_segment(v))
112 })
113 .collect()
114}
115
116fn decode_query_segment(s: &str) -> String {
117 let plus_decoded = s.replace('+', " ");
120 percent_encoding::percent_decode_str(&plus_decoded)
121 .decode_utf8_lossy()
122 .into_owned()
123}
124
125fn layer_content_url(req: &AwsRequest, account_id: &str, layer_name: &str, version: i64) -> String {
130 let host = req
131 .headers
132 .get(http::header::HOST)
133 .and_then(|h| h.to_str().ok())
134 .unwrap_or("localhost");
135 let scheme = req
136 .headers
137 .get("x-forwarded-proto")
138 .and_then(|h| h.to_str().ok())
139 .unwrap_or("http");
140 format!(
141 "{scheme}://{host}/_fakecloud/lambda/layer-content/{account_id}/{layer_name}/{version}.zip"
142 )
143}
144
145pub fn parse_layer_version_arn(arn: &str) -> Option<(String, String, i64)> {
149 let parts: Vec<&str> = arn.split(':').collect();
150 if parts.len() != 8 || parts[0] != "arn" || parts[2] != "lambda" || parts[5] != "layer" {
151 return None;
152 }
153 let account = parts[4].to_string();
154 let name = parts[6].to_string();
155 let version: i64 = parts[7].parse().ok()?;
156 Some((account, name, version))
157}
158
159fn parse_qualifier(req: &AwsRequest) -> String {
160 req.query_params
161 .get("Qualifier")
162 .cloned()
163 .unwrap_or_else(|| "$LATEST".to_string())
164}
165
166fn id_from_time(prefix: &str) -> String {
167 format!(
168 "{}{}",
169 prefix,
170 std::time::SystemTime::now()
171 .duration_since(std::time::UNIX_EPOCH)
172 .map(|d| d.as_nanos())
173 .unwrap_or(0)
174 )
175}
176
177impl LambdaService {
178 pub(crate) async fn handle_extra(
179 &self,
180 action: &str,
181 resource: Option<&str>,
182 req: &AwsRequest,
183 ) -> Result<AwsResponse, AwsServiceError> {
184 let aid = req.account_id.as_str();
185 let res = resource.unwrap_or("");
186 match action {
187 "GetFunctionConfiguration" => self.get_function_configuration(res, aid, req),
189 "UpdateFunctionConfiguration" => self.update_function_configuration(res, req),
190 "UpdateFunctionCode" => self.update_function_code(res, req),
191 "UpdateEventSourceMapping" => self.update_event_source_mapping_handler(res, req),
192 "GetAccountSettings" => self.get_account_settings(aid),
193 "InvokeAsync" => Ok(AwsResponse::json(StatusCode::ACCEPTED, "{}".to_string())),
194 "InvokeWithResponseStream" => self.invoke_with_response_stream(res, aid, req).await,
195
196 "ListVersionsByFunction" => self.list_versions_by_function(res, aid, req),
198
199 "CreateAlias" => self.create_alias(res, req),
201 "GetAlias" => self.get_alias(res, req),
202 "ListAliases" => self.list_aliases(res, aid),
203 "UpdateAlias" => self.update_alias(res, req),
204 "DeleteAlias" => self.delete_alias(res, req),
205
206 "PublishLayerVersion" => self.publish_layer_version(res, req),
208 "GetLayerVersion" => self.get_layer_version(req),
209 "GetLayerVersionByArn" => self.get_layer_version_by_arn(req),
210 "ListLayers" => self.list_layers(aid),
211 "ListLayerVersions" => self.list_layer_versions(res, aid),
212 "DeleteLayerVersion" => self.delete_layer_version(req),
213 "GetLayerVersionPolicy" => self.get_layer_version_policy(req),
214 "AddLayerVersionPermission" => self.add_layer_version_permission(req),
215 "RemoveLayerVersionPermission" => self.remove_layer_version_permission(req),
216
217 "CreateFunctionUrlConfig" => self.create_function_url_config(res, req),
219 "GetFunctionUrlConfig" => self.get_function_url_config(res, aid),
220 "UpdateFunctionUrlConfig" => self.update_function_url_config(res, req),
221 "DeleteFunctionUrlConfig" => self.delete_function_url_config(res, aid),
222 "ListFunctionUrlConfigs" => self.list_function_url_configs(aid),
223
224 "PutFunctionConcurrency" => self.put_function_concurrency(res, req),
226 "GetFunctionConcurrency" => self.get_function_concurrency(res, aid),
227 "DeleteFunctionConcurrency" => self.delete_function_concurrency(res, aid),
228 "PutProvisionedConcurrencyConfig" => self.put_provisioned_concurrency(res, req),
229 "GetProvisionedConcurrencyConfig" => self.get_provisioned_concurrency(res, req),
230 "DeleteProvisionedConcurrencyConfig" => self.delete_provisioned_concurrency(res, req),
231 "ListProvisionedConcurrencyConfigs" => self.list_provisioned_concurrency(res, aid),
232
233 "CreateCodeSigningConfig" => self.create_code_signing_config(req),
235 "GetCodeSigningConfig" => self.get_code_signing_config(res, aid),
236 "UpdateCodeSigningConfig" => self.update_code_signing_config(res, req),
237 "DeleteCodeSigningConfig" => self.delete_code_signing_config(res, aid),
238 "ListCodeSigningConfigs" => self.list_code_signing_configs(aid),
239 "PutFunctionCodeSigningConfig" => self.put_function_code_signing(res, req),
240 "GetFunctionCodeSigningConfig" => self.get_function_code_signing(res, aid),
241 "DeleteFunctionCodeSigningConfig" => self.delete_function_code_signing(res, aid),
242 "ListFunctionsByCodeSigningConfig" => self.list_functions_by_code_signing(res, aid),
243
244 "PutFunctionEventInvokeConfig" | "UpdateFunctionEventInvokeConfig" => {
246 self.put_function_event_invoke(res, req)
247 }
248 "GetFunctionEventInvokeConfig" => self.get_function_event_invoke(res, req),
249 "DeleteFunctionEventInvokeConfig" => self.delete_function_event_invoke(res, req),
250 "ListFunctionEventInvokeConfigs" => self.list_function_event_invoke(res, aid),
251
252 "PutRuntimeManagementConfig" => self.put_runtime_management(res, req),
254 "GetRuntimeManagementConfig" => self.get_runtime_management(res, req),
255
256 "PutFunctionScalingConfig" => self.put_scaling_config(res, req),
258 "GetFunctionScalingConfig" => self.get_scaling_config(res, aid),
259
260 "PutFunctionRecursionConfig" => self.put_recursion_config(res, req),
262 "GetFunctionRecursionConfig" => self.get_recursion_config(res, aid),
263
264 "TagResource" => self.tag_resource(res, req),
266 "UntagResource" => self.untag_resource(res, req),
267 "ListTags" => self.list_tags(res, aid),
268
269 _ => Err(AwsServiceError::action_not_implemented("lambda", action)),
270 }
271 }
272
273 fn with_state_read<F, R>(&self, account_id: &str, region: &str, f: F) -> R
274 where
275 F: FnOnce(&LambdaState) -> R,
276 {
277 let accounts = self.state.read();
278 let empty = LambdaState::new(account_id, region);
279 let state = accounts.get(account_id).unwrap_or(&empty);
280 f(state)
281 }
282
283 fn get_function_configuration(
286 &self,
287 function_name: &str,
288 account_id: &str,
289 req: &AwsRequest,
290 ) -> Result<AwsResponse, AwsServiceError> {
291 let region = self.region_for(account_id);
292 let qualifier = req.query_params.get("Qualifier").cloned();
293 self.with_state_read(account_id, ®ion, |state| {
294 let live = state
295 .functions
296 .get(function_name)
297 .ok_or_else(|| not_found("Function", function_name))?;
298 let resolved = crate::service::resolve_qualifier_to_version(
302 state,
303 function_name,
304 qualifier.as_deref(),
305 );
306 let (func, version_label) = match resolved {
307 None => (live, "$LATEST".to_string()),
308 Some(v) => {
309 let snap = state
310 .function_version_snapshots
311 .get(function_name)
312 .and_then(|m| m.get(&v))
313 .ok_or_else(|| not_found("Function", function_name))?;
314 (snap, v)
315 }
316 };
317 let mut config = self.function_config_json(func);
318 config["Version"] = json!(version_label);
319 if version_label != "$LATEST" {
320 config["FunctionArn"] = json!(format!("{}:{version_label}", live.function_arn));
321 config["MasterArn"] = json!(live.function_arn);
322 }
323 ok(config)
324 })
325 }
326
327 fn update_function_configuration(
328 &self,
329 function_name: &str,
330 req: &AwsRequest,
331 ) -> Result<AwsResponse, AwsServiceError> {
332 let body = body(req);
333 let validated_ephemeral = match body["EphemeralStorage"]["Size"].as_i64() {
337 Some(size) => Some(crate::service::validate_ephemeral_storage(size)?),
338 None => None,
339 };
340 let mut accounts = self.state.write();
341 let layer_attachments: Option<Vec<AttachedLayer>> = body["Layers"].as_array().map(|arr| {
344 let arns: Vec<String> = arr
345 .iter()
346 .filter_map(|v| v.as_str().map(String::from))
347 .collect();
348 resolve_layer_attachments(&accounts, arns)
349 });
350 let state = accounts.get_or_create(&req.account_id);
351 let func = state
352 .functions
353 .get_mut(function_name)
354 .ok_or_else(|| not_found("Function", function_name))?;
355 if let Some(handler) = body["Handler"].as_str() {
356 func.handler = handler.to_string();
357 }
358 if let Some(t) = body["Timeout"].as_i64() {
359 func.timeout = t;
360 }
361 if let Some(m) = body["MemorySize"].as_i64() {
362 func.memory_size = m;
363 }
364 if let Some(role) = body["Role"].as_str() {
365 func.role = role.to_string();
366 }
367 if let Some(desc) = body["Description"].as_str() {
368 func.description = desc.to_string();
369 }
370 if let Some(rt) = body["Runtime"].as_str() {
371 func.runtime = rt.to_string();
372 }
373 if let Some(env) = body["Environment"]["Variables"].as_object() {
374 func.environment = env
375 .iter()
376 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
377 .collect();
378 }
379 if let Some(mode) = body["TracingConfig"]["Mode"].as_str() {
380 func.tracing_mode = Some(mode.to_string());
381 }
382 if let Some(arn) = body["KMSKeyArn"].as_str() {
383 func.kms_key_arn = if arn.is_empty() {
384 None
385 } else {
386 Some(arn.to_string())
387 };
388 }
389 if let Some(size) = validated_ephemeral {
390 func.ephemeral_storage_size = Some(size);
391 }
392 if body["VpcConfig"].is_object() {
393 func.vpc_config = Some(body["VpcConfig"].clone());
394 }
395 if body["SnapStart"].is_object() {
396 func.snap_start = Some(body["SnapStart"].clone());
397 }
398 if let Some(arn) = body["DeadLetterConfig"]["TargetArn"].as_str() {
399 func.dead_letter_config_arn = if arn.is_empty() {
400 None
401 } else {
402 Some(arn.to_string())
403 };
404 }
405 if let Some(fsc) = body["FileSystemConfigs"].as_array() {
406 func.file_system_configs = fsc.clone();
407 }
408 if body["LoggingConfig"].is_object() {
409 func.logging_config = Some(body["LoggingConfig"].clone());
410 }
411 if body["ImageConfig"].is_object() {
412 func.image_config = Some(body["ImageConfig"].clone());
413 }
414 if let Some(attachments) = layer_attachments {
415 func.layers = attachments;
416 }
417 func.revision_id = uuid::Uuid::new_v4().to_string();
421 func.last_modified = Utc::now();
422 ok(self.function_config_json(func))
423 }
424
425 fn update_function_code(
426 &self,
427 function_name: &str,
428 req: &AwsRequest,
429 ) -> Result<AwsResponse, AwsServiceError> {
430 let body: serde_json::Value = serde_json::from_slice(&req.body).unwrap_or_default();
431
432 let new_zip: Option<Vec<u8>> = match body["ZipFile"].as_str() {
437 Some(b64) => Some(
438 base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
439 |_| {
440 AwsServiceError::aws_error(
441 StatusCode::BAD_REQUEST,
442 "InvalidParameterValueException",
443 "Could not decode ZipFile: invalid base64",
444 )
445 },
446 )?,
447 ),
448 None => None,
449 };
450 let new_image_uri = body["ImageUri"].as_str().map(String::from);
451 let s3_fetched_zip: Option<Vec<u8>> = match (
464 body["S3Bucket"].as_str(),
465 body["S3Key"].as_str(),
466 ) {
467 (Some(bucket), Some(key)) if new_zip.is_none() && new_image_uri.is_none() => {
468 if let Some(s3) = &self.s3_delivery {
469 match s3.get_object(&req.account_id, bucket, key) {
470 Ok(bytes) => Some(bytes),
471 Err(e) => {
472 return Err(AwsServiceError::aws_error(
473 StatusCode::BAD_REQUEST,
474 "InvalidParameterValueException",
475 format!("Error occurred while GetObject. S3 Error Code: NoSuchKey. S3 Error Message: {e}"),
476 ));
477 }
478 }
479 } else {
480 None
481 }
482 }
483 _ => None,
484 };
485
486 let new_s3_descriptor: Option<Vec<u8>> =
487 match (body["S3Bucket"].as_str(), body["S3Key"].as_str()) {
488 (Some(bucket), Some(key))
489 if new_zip.is_none() && new_image_uri.is_none() && s3_fetched_zip.is_none() =>
490 {
491 let mut descriptor = serde_json::Map::new();
492 descriptor.insert("S3Bucket".to_string(), Value::String(bucket.to_string()));
493 descriptor.insert("S3Key".to_string(), Value::String(key.to_string()));
494 if let Some(ver) = body["S3ObjectVersion"].as_str() {
495 descriptor.insert(
496 "S3ObjectVersion".to_string(),
497 Value::String(ver.to_string()),
498 );
499 }
500 Some(serde_json::to_vec(&Value::Object(descriptor)).unwrap_or_default())
501 }
502 _ => None,
503 };
504 let new_zip = new_zip.or(s3_fetched_zip);
505 let supplied_signing_profile = body["SigningProfileVersionArn"].as_str().map(String::from);
506 let supplied_revision_id = body["RevisionId"].as_str().map(String::from);
507 let new_architectures: Option<Vec<String>> = body["Architectures"].as_array().map(|arr| {
508 arr.iter()
509 .filter_map(|v| v.as_str().map(String::from))
510 .collect()
511 });
512 let dry_run = body["DryRun"].as_bool().unwrap_or(false);
513 let publish = body["Publish"].as_bool().unwrap_or(false);
514
515 let mut accounts = self.state.write();
516 let state = accounts.get_or_create(&req.account_id);
517
518 if !state.functions.contains_key(function_name) {
522 return Err(not_found("Function", function_name));
523 }
524
525 if let Some(csc_arn) = state.function_code_signing.get(function_name).cloned() {
530 let csc_id = extract_csc_id(&csc_arn);
531 if let Some(csc) = state.code_signing_configs.get(&csc_id).cloned() {
532 if !csc.allowed_publishers.is_empty()
533 && csc
534 .untrusted_artifact_action
535 .eq_ignore_ascii_case("Enforce")
536 {
537 let allowed = match supplied_signing_profile.as_deref() {
538 Some(arn) => csc.allowed_publishers.iter().any(|p| p == arn),
539 None => false,
540 };
541 if !allowed {
542 return Err(AwsServiceError::aws_error(
543 StatusCode::BAD_REQUEST,
544 "CodeVerificationFailedException",
545 "The code signature failed the integrity check or the signing profile is not in the allowed publishers list.",
546 ));
547 }
548 }
549 }
550 }
551
552 let func = state
553 .functions
554 .get_mut(function_name)
555 .ok_or_else(|| not_found("Function", function_name))?;
556
557 if let Some(ref rev) = supplied_revision_id {
561 if rev != &func.revision_id {
562 return Err(AwsServiceError::aws_error(
563 StatusCode::PRECONDITION_FAILED,
564 "PreconditionFailedException",
565 format!(
566 "The Revision Id provided: {rev} does not match the latest Revision Id of function: {function_name}. Call the GetFunction/GetAlias API to retrieve the latest Revision Id"
567 ),
568 ));
569 }
570 }
571
572 if dry_run {
574 return ok(self.function_config_json(func));
575 }
576
577 let mut changed = false;
578 if let Some(bytes) = new_zip {
579 let mut hasher = Sha256::new();
582 hasher.update(&bytes);
583 let hash = hasher.finalize();
584 let code_sha256 =
585 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
586 if code_sha256 != func.code_sha256 {
587 changed = true;
588 }
589 func.code_size = bytes.len() as i64;
590 func.code_zip = Some(bytes);
591 func.code_sha256 = code_sha256;
592 func.image_uri = None;
593 func.package_type = "Zip".to_string();
594 } else if let Some(descriptor_bytes) = new_s3_descriptor {
595 let mut hasher = Sha256::new();
602 hasher.update(&descriptor_bytes);
603 let hash = hasher.finalize();
604 let code_sha256 =
605 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
606 if code_sha256 != func.code_sha256 {
607 changed = true;
608 }
609 func.code_size = descriptor_bytes.len() as i64;
610 func.code_zip = None;
614 func.code_sha256 = code_sha256;
615 func.image_uri = None;
616 func.package_type = "Zip".to_string();
617 } else if let Some(uri) = new_image_uri {
618 if func.image_uri.as_deref() != Some(uri.as_str()) {
619 changed = true;
620 }
621 func.image_uri = Some(uri);
622 func.code_zip = None;
623 func.package_type = "Image".to_string();
624 func.code_size = 0;
628 func.code_sha256 = String::new();
629 }
630
631 if let Some(arns) = new_architectures {
632 if !arns.is_empty() && arns != func.architectures {
633 changed = true;
634 func.architectures = arns;
635 }
636 }
637
638 if let Some(arn) = supplied_signing_profile {
639 if func.signing_profile_version_arn.as_deref() != Some(arn.as_str()) {
640 changed = true;
641 }
642 func.signing_profile_version_arn = Some(arn);
643 }
644
645 func.last_modified = Utc::now();
650 if changed {
651 func.revision_id = uuid::Uuid::new_v4().to_string();
652 }
653 func.last_update_status_reason = None;
657 func.last_update_status_reason_code = None;
658
659 if publish {
662 drop(accounts);
663 return self.publish_version(function_name, &req.account_id, req);
664 }
665
666 ok(self.function_config_json(func))
667 }
668
669 fn get_account_settings(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
670 let mut accounts = self.state.write();
671 let state = accounts.get_or_create(account_id);
672 let settings = state.account_settings.clone().unwrap_or(AccountSettings {
673 concurrent_executions: 1000,
674 code_size_zipped: 52_428_800,
675 code_size_unzipped: 262_144_000,
676 total_code_size: 80_530_636_800,
677 });
678 if state.account_settings.is_none() {
679 state.account_settings = Some(settings.clone());
680 }
681 let function_count = state.functions.len() as i64;
684 let total_code_size: i64 = state.functions.values().map(|f| f.code_size).sum();
685 ok(json!({
686 "AccountLimit": {
687 "ConcurrentExecutions": settings.concurrent_executions,
688 "CodeSizeZipped": settings.code_size_zipped,
689 "CodeSizeUnzipped": settings.code_size_unzipped,
690 "TotalCodeSize": settings.total_code_size,
691 "UnreservedConcurrentExecutions": settings.concurrent_executions,
692 },
693 "AccountUsage": {
694 "TotalCodeSize": total_code_size,
695 "FunctionCount": function_count,
696 },
697 }))
698 }
699
700 fn list_versions_by_function(
703 &self,
704 function_name: &str,
705 account_id: &str,
706 req: &AwsRequest,
707 ) -> Result<AwsResponse, AwsServiceError> {
708 let region = self.region_for(account_id);
709 let max_items: usize = req
710 .query_params
711 .get("MaxItems")
712 .and_then(|v| v.parse::<usize>().ok())
713 .map(|n| n.clamp(1, 50))
714 .unwrap_or(50);
715 let marker = req.query_params.get("Marker").cloned();
716 self.with_state_read(account_id, ®ion, |state| {
717 let func = state
718 .functions
719 .get(function_name)
720 .ok_or_else(|| not_found("Function", function_name))?;
721 let mut all: Vec<serde_json::Value> = Vec::new();
725 let mut latest = self.function_config_json(func);
726 latest["Version"] = json!("$LATEST");
727 all.push(latest);
728 let snapshots = state.function_version_snapshots.get(function_name);
729 if let Some(numbered) = state.function_versions.get(function_name) {
730 for v in numbered {
731 let snap = snapshots.and_then(|m| m.get(v)).unwrap_or(func);
732 let mut cfg = self.function_config_json(snap);
733 cfg["Version"] = json!(v);
734 cfg["FunctionArn"] = json!(format!("{}:{v}", func.function_arn));
735 cfg["MasterArn"] = json!(func.function_arn);
736 all.push(cfg);
737 }
738 }
739 let start = match marker.as_deref() {
743 Some(m) => all
744 .iter()
745 .position(|v| v["Version"].as_str() == Some(m))
746 .map(|i| i + 1)
747 .unwrap_or(0),
748 None => 0,
749 };
750 let end = (start + max_items).min(all.len());
751 let page: Vec<serde_json::Value> = all[start..end].to_vec();
752 let mut body = json!({ "Versions": page });
753 if end < all.len() {
754 if let Some(last) = all[end - 1]["Version"].as_str() {
755 body["NextMarker"] = json!(last);
756 }
757 }
758 ok(body)
759 })
760 }
761
762 fn alias_key(function: &str, alias: &str) -> String {
765 format!("{function}:{alias}")
766 }
767
768 fn create_alias(
769 &self,
770 function_name: &str,
771 req: &AwsRequest,
772 ) -> Result<AwsResponse, AwsServiceError> {
773 let body = body(req);
774 let name = body["Name"]
775 .as_str()
776 .ok_or_else(|| missing("Name"))?
777 .to_string();
778 let version = body["FunctionVersion"]
779 .as_str()
780 .unwrap_or("$LATEST")
781 .to_string();
782 let mut accounts = self.state.write();
783 let state = accounts.get_or_create(&req.account_id);
784 if !state.functions.contains_key(function_name) {
785 return Err(not_found("Function", function_name));
786 }
787 let alias_arn = format!(
788 "arn:aws:lambda:{}:{}:function:{}:{}",
789 state.region, state.account_id, function_name, name
790 );
791 let alias = FunctionAlias {
792 alias_arn: alias_arn.clone(),
793 name: name.clone(),
794 function_version: version,
795 description: body["Description"].as_str().unwrap_or("").to_string(),
796 revision_id: id_from_time("rev-"),
797 routing_config: body.get("RoutingConfig").cloned(),
798 };
799 state
800 .aliases
801 .insert(Self::alias_key(function_name, &name), alias.clone());
802 ok(serde_json::to_value(alias).unwrap_or_default())
803 }
804
805 fn get_alias(
806 &self,
807 function_name: &str,
808 req: &AwsRequest,
809 ) -> Result<AwsResponse, AwsServiceError> {
810 let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
811 let region = self.region_for(&req.account_id);
812 self.with_state_read(&req.account_id, ®ion, |state| {
813 state
814 .aliases
815 .get(&Self::alias_key(function_name, &alias_name))
816 .map(|a| ok(serde_json::to_value(a).unwrap_or_default()))
817 .unwrap_or_else(|| Err(not_found("Alias", &alias_name)))
818 })
819 }
820
821 fn list_aliases(
822 &self,
823 function_name: &str,
824 account_id: &str,
825 ) -> Result<AwsResponse, AwsServiceError> {
826 let region = self.region_for(account_id);
827 self.with_state_read(account_id, ®ion, |state| {
828 let prefix = format!("{function_name}:");
829 let aliases: Vec<&FunctionAlias> = state
830 .aliases
831 .iter()
832 .filter(|(k, _)| k.starts_with(&prefix))
833 .map(|(_, v)| v)
834 .collect();
835 ok(json!({"Aliases": aliases}))
836 })
837 }
838
839 fn update_alias(
840 &self,
841 function_name: &str,
842 req: &AwsRequest,
843 ) -> Result<AwsResponse, AwsServiceError> {
844 let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
845 let body = body(req);
846 let mut accounts = self.state.write();
847 let state = accounts.get_or_create(&req.account_id);
848 let key = Self::alias_key(function_name, &alias_name);
849 let alias = state
850 .aliases
851 .get_mut(&key)
852 .ok_or_else(|| not_found("Alias", &alias_name))?;
853 if let Some(v) = body["FunctionVersion"].as_str() {
854 alias.function_version = v.to_string();
855 }
856 if let Some(d) = body["Description"].as_str() {
857 alias.description = d.to_string();
858 }
859 if let Some(rc) = body.get("RoutingConfig") {
860 alias.routing_config = Some(rc.clone());
861 }
862 alias.revision_id = id_from_time("rev-");
863 ok(serde_json::to_value(alias).unwrap_or_default())
864 }
865
866 fn delete_alias(
867 &self,
868 function_name: &str,
869 req: &AwsRequest,
870 ) -> Result<AwsResponse, AwsServiceError> {
871 let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
872 let mut accounts = self.state.write();
873 let state = accounts.get_or_create(&req.account_id);
874 state
875 .aliases
876 .remove(&Self::alias_key(function_name, &alias_name));
877 empty()
878 }
879
880 fn publish_layer_version(
883 &self,
884 layer_name: &str,
885 req: &AwsRequest,
886 ) -> Result<AwsResponse, AwsServiceError> {
887 let body = body(req);
888 let zip_bytes: Option<Vec<u8>> = match body["Content"]["ZipFile"].as_str() {
889 Some(b64) => Some(
890 base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
891 |_| {
892 AwsServiceError::aws_error(
893 StatusCode::BAD_REQUEST,
894 "InvalidParameterValueException",
895 "Could not decode Content.ZipFile: invalid base64",
896 )
897 },
898 )?,
899 ),
900 None => None,
901 };
902 let (code_sha256, code_size) = match zip_bytes.as_deref() {
903 Some(bytes) => {
904 let mut hasher = Sha256::new();
905 hasher.update(bytes);
906 let digest = hasher.finalize();
907 (
908 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, digest),
909 bytes.len() as i64,
910 )
911 }
912 None => (String::new(), 0),
913 };
914
915 let mut accounts = self.state.write();
916 let state = accounts.get_or_create(&req.account_id);
917 let account_id = state.account_id.clone();
918 let layer = state
919 .layers
920 .entry(layer_name.to_string())
921 .or_insert_with(|| Layer {
922 layer_name: layer_name.to_string(),
923 layer_arn: format!(
924 "arn:aws:lambda:{}:{}:layer:{}",
925 state.region, state.account_id, layer_name
926 ),
927 versions: Vec::new(),
928 });
929 let next_version = (layer.versions.len() as i64) + 1;
930 let version_arn = format!("{}:{}", layer.layer_arn, next_version);
931 let runtimes: Vec<String> = body["CompatibleRuntimes"]
932 .as_array()
933 .map(|arr| {
934 arr.iter()
935 .filter_map(|v| v.as_str().map(String::from))
936 .collect()
937 })
938 .unwrap_or_default();
939 let architectures: Vec<String> = body["CompatibleArchitectures"]
940 .as_array()
941 .map(|arr| {
942 arr.iter()
943 .filter_map(|v| v.as_str().map(String::from))
944 .collect()
945 })
946 .unwrap_or_default();
947 let layer_arn = layer.layer_arn.clone();
948 let lv = LayerVersion {
949 version: next_version,
950 layer_version_arn: version_arn.clone(),
951 description: body["Description"].as_str().unwrap_or("").to_string(),
952 created_date: Utc::now(),
953 compatible_runtimes: runtimes,
954 license_info: body["LicenseInfo"].as_str().unwrap_or("").to_string(),
955 policy: None,
956 code_zip: zip_bytes,
957 code_sha256: code_sha256.clone(),
958 code_size,
959 compatible_architectures: architectures,
960 };
961 layer.versions.push(lv.clone());
962 let location = layer_content_url(req, &account_id, layer_name, next_version);
963 ok(json!({
964 "LayerArn": layer_arn,
965 "LayerVersionArn": version_arn,
966 "Version": next_version,
967 "Description": lv.description,
968 "CreatedDate": lv.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
969 "CompatibleRuntimes": lv.compatible_runtimes,
970 "CompatibleArchitectures": lv.compatible_architectures,
971 "LicenseInfo": lv.license_info,
972 "Content": {
973 "Location": location,
974 "CodeSha256": code_sha256,
975 "CodeSize": code_size,
976 },
977 }))
978 }
979
980 fn list_layers(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
981 let region = self.region_for(account_id);
982 self.with_state_read(account_id, ®ion, |state| {
983 let layers: Vec<Value> = state
984 .layers
985 .values()
986 .map(|l| {
987 json!({
988 "LayerName": l.layer_name,
989 "LayerArn": l.layer_arn,
990 "LatestMatchingVersion": l.versions.last().map(|v| json!({
991 "LayerVersionArn": v.layer_version_arn,
992 "Version": v.version,
993 "Description": v.description,
994 "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
995 "CompatibleRuntimes": v.compatible_runtimes,
996 "CompatibleArchitectures": v.compatible_architectures,
997 })),
998 })
999 })
1000 .collect();
1001 ok(json!({"Layers": layers}))
1002 })
1003 }
1004
1005 fn list_layer_versions(
1006 &self,
1007 layer_name: &str,
1008 account_id: &str,
1009 ) -> Result<AwsResponse, AwsServiceError> {
1010 let region = self.region_for(account_id);
1011 self.with_state_read(account_id, ®ion, |state| {
1012 let versions: Vec<Value> = state
1013 .layers
1014 .get(layer_name)
1015 .map(|l| {
1016 l.versions
1017 .iter()
1018 .map(|v| {
1019 json!({
1020 "LayerVersionArn": v.layer_version_arn,
1021 "Version": v.version,
1022 "Description": v.description,
1023 "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1024 "CompatibleRuntimes": v.compatible_runtimes,
1025 "CompatibleArchitectures": v.compatible_architectures,
1026 "LicenseInfo": v.license_info,
1027 })
1028 })
1029 .collect()
1030 })
1031 .unwrap_or_default();
1032 ok(json!({"LayerVersions": versions}))
1033 })
1034 }
1035
1036 fn get_layer_version(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1037 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1038 let version: i64 = req
1039 .path_segments
1040 .get(4)
1041 .and_then(|s| s.parse().ok())
1042 .ok_or_else(|| missing("VersionNumber"))?;
1043 let region = self.region_for(&req.account_id);
1044 let location = layer_content_url(req, &req.account_id, &layer_name, version);
1045 self.with_state_read(&req.account_id, ®ion, |state| {
1046 state
1047 .layers
1048 .get(&layer_name)
1049 .and_then(|l| l.versions.iter().find(|v| v.version == version))
1050 .map(|v| {
1051 ok(json!({
1052 "LayerVersionArn": v.layer_version_arn,
1053 "Version": v.version,
1054 "Description": v.description,
1055 "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1056 "CompatibleRuntimes": v.compatible_runtimes,
1057 "CompatibleArchitectures": v.compatible_architectures,
1058 "LicenseInfo": v.license_info,
1059 "Content": {
1060 "Location": location,
1061 "CodeSha256": v.code_sha256,
1062 "CodeSize": v.code_size,
1063 },
1064 }))
1065 })
1066 .unwrap_or_else(|| Err(not_found("LayerVersion", &layer_name)))
1067 })
1068 }
1069
1070 fn get_layer_version_by_arn(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1071 let arn = req
1072 .query_params
1073 .get("Arn")
1074 .or_else(|| req.query_params.get("find"))
1075 .cloned()
1076 .unwrap_or_default();
1077 let (account_id, layer_name, version) =
1078 parse_layer_version_arn(&arn).ok_or_else(|| missing("Arn"))?;
1079 let region = self.region_for(&account_id);
1080 let location = layer_content_url(req, &account_id, &layer_name, version);
1081 self.with_state_read(&account_id, ®ion, |state| {
1082 state
1083 .layers
1084 .get(&layer_name)
1085 .and_then(|l| l.versions.iter().find(|v| v.version == version))
1086 .map(|v| {
1087 ok(json!({
1088 "LayerVersionArn": v.layer_version_arn,
1089 "Version": v.version,
1090 "Description": v.description,
1091 "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1092 "CompatibleRuntimes": v.compatible_runtimes,
1093 "CompatibleArchitectures": v.compatible_architectures,
1094 "LicenseInfo": v.license_info,
1095 "Content": {
1096 "Location": location,
1097 "CodeSha256": v.code_sha256,
1098 "CodeSize": v.code_size,
1099 },
1100 }))
1101 })
1102 .unwrap_or_else(|| Err(not_found("LayerVersion", &arn)))
1103 })
1104 }
1105
1106 fn delete_layer_version(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1107 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1108 let version: i64 = req
1109 .path_segments
1110 .get(4)
1111 .and_then(|s| s.parse().ok())
1112 .unwrap_or(0);
1113 let mut accounts = self.state.write();
1114 let state = accounts.get_or_create(&req.account_id);
1115 if let Some(layer) = state.layers.get_mut(&layer_name) {
1116 layer.versions.retain(|v| v.version != version);
1117 }
1118 empty()
1119 }
1120
1121 fn get_layer_version_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1122 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1123 let version: i64 = req
1124 .path_segments
1125 .get(4)
1126 .and_then(|s| s.parse().ok())
1127 .unwrap_or(0);
1128 let region = self.region_for(&req.account_id);
1129 self.with_state_read(&req.account_id, ®ion, |state| {
1130 let policy = state
1131 .layers
1132 .get(&layer_name)
1133 .and_then(|l| l.versions.iter().find(|v| v.version == version))
1134 .and_then(|v| v.policy.clone())
1135 .unwrap_or_else(|| "{}".to_string());
1136 ok(json!({"Policy": policy, "RevisionId": id_from_time("rev-")}))
1137 })
1138 }
1139
1140 fn add_layer_version_permission(
1141 &self,
1142 req: &AwsRequest,
1143 ) -> Result<AwsResponse, AwsServiceError> {
1144 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1145 let version: i64 = req
1146 .path_segments
1147 .get(4)
1148 .and_then(|s| s.parse().ok())
1149 .unwrap_or(0);
1150 let body = body(req);
1151 let mut accounts = self.state.write();
1152 let state = accounts.get_or_create(&req.account_id);
1153 if let Some(layer) = state.layers.get_mut(&layer_name) {
1154 if let Some(v) = layer.versions.iter_mut().find(|v| v.version == version) {
1155 let policy = v.policy.clone().unwrap_or_else(|| "{}".to_string());
1156 let mut policy_doc: Value = serde_json::from_str(&policy).unwrap_or(json!({}));
1157 let statements = policy_doc["Statement"].as_array_mut();
1158 let new_stmt = json!({
1159 "Sid": body["StatementId"].as_str().unwrap_or("default"),
1160 "Effect": "Allow",
1161 "Principal": body["Principal"].clone(),
1162 "Action": body["Action"].clone(),
1163 "Resource": v.layer_version_arn.clone(),
1164 });
1165 if let Some(s) = statements {
1166 s.push(new_stmt);
1167 } else {
1168 policy_doc = json!({"Version": "2012-10-17", "Statement": [new_stmt]});
1169 }
1170 v.policy = Some(policy_doc.to_string());
1171 }
1172 }
1173 ok(json!({
1174 "Statement": body["StatementId"],
1175 "RevisionId": id_from_time("rev-"),
1176 }))
1177 }
1178
1179 fn remove_layer_version_permission(
1180 &self,
1181 req: &AwsRequest,
1182 ) -> Result<AwsResponse, AwsServiceError> {
1183 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
1184 let version: i64 = req
1185 .path_segments
1186 .get(4)
1187 .and_then(|s| s.parse().ok())
1188 .unwrap_or(0);
1189 let sid = req.path_segments.get(6).cloned().unwrap_or_default();
1190 let mut accounts = self.state.write();
1191 let state = accounts.get_or_create(&req.account_id);
1192 if let Some(layer) = state.layers.get_mut(&layer_name) {
1193 if let Some(v) = layer.versions.iter_mut().find(|v| v.version == version) {
1194 if let Some(policy) = v.policy.clone() {
1195 let mut policy_doc: Value = serde_json::from_str(&policy).unwrap_or(json!({}));
1196 if let Some(stmts) = policy_doc["Statement"].as_array_mut() {
1197 stmts.retain(|s| s["Sid"].as_str() != Some(&sid));
1198 }
1199 v.policy = Some(policy_doc.to_string());
1200 }
1201 }
1202 }
1203 empty()
1204 }
1205
1206 fn function_url_config_json(cfg: &FunctionUrlConfig) -> Value {
1214 let mut out = json!({
1215 "FunctionArn": cfg.function_arn,
1216 "FunctionUrl": cfg.function_url,
1217 "AuthType": cfg.auth_type,
1218 "InvokeMode": cfg.invoke_mode,
1219 "CreationTime": cfg.creation_time.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1220 "LastModifiedTime": cfg.last_modified_time.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1221 });
1222 if let Some(cors) = &cfg.cors {
1223 out["Cors"] = cors.clone();
1224 }
1225 out
1226 }
1227
1228 fn create_function_url_config(
1229 &self,
1230 function_name: &str,
1231 req: &AwsRequest,
1232 ) -> Result<AwsResponse, AwsServiceError> {
1233 let body = body(req);
1234 let auth_type = body["AuthType"].as_str().unwrap_or("NONE").to_string();
1235 let now = Utc::now();
1236 let mut accounts = self.state.write();
1237 let state = accounts.get_or_create(&req.account_id);
1238 if !state.functions.contains_key(function_name) {
1239 return Err(not_found("Function", function_name));
1240 }
1241 let function_arn = format!(
1242 "arn:aws:lambda:{}:{}:function:{}",
1243 state.region, state.account_id, function_name
1244 );
1245 let cfg = FunctionUrlConfig {
1246 function_arn: function_arn.clone(),
1247 function_url: format!(
1248 "https://{function_name}.lambda-url.{}.on.aws/",
1249 state.region
1250 ),
1251 auth_type: auth_type.clone(),
1252 cors: body.get("Cors").cloned(),
1253 creation_time: now,
1254 last_modified_time: now,
1255 invoke_mode: body["InvokeMode"]
1256 .as_str()
1257 .unwrap_or("BUFFERED")
1258 .to_string(),
1259 };
1260 state
1261 .function_url_configs
1262 .insert(function_name.to_string(), cfg.clone());
1263 ok(Self::function_url_config_json(&cfg))
1264 }
1265
1266 fn get_function_url_config(
1267 &self,
1268 function_name: &str,
1269 account_id: &str,
1270 ) -> Result<AwsResponse, AwsServiceError> {
1271 let region = self.region_for(account_id);
1272 self.with_state_read(account_id, ®ion, |state| {
1273 state
1274 .function_url_configs
1275 .get(function_name)
1276 .map(|c| ok(Self::function_url_config_json(c)))
1277 .unwrap_or_else(|| Err(not_found("FunctionUrlConfig", function_name)))
1278 })
1279 }
1280
1281 fn update_function_url_config(
1282 &self,
1283 function_name: &str,
1284 req: &AwsRequest,
1285 ) -> Result<AwsResponse, AwsServiceError> {
1286 let body = body(req);
1287 let mut accounts = self.state.write();
1288 let state = accounts.get_or_create(&req.account_id);
1289 let cfg = state
1290 .function_url_configs
1291 .get_mut(function_name)
1292 .ok_or_else(|| not_found("FunctionUrlConfig", function_name))?;
1293 if let Some(a) = body["AuthType"].as_str() {
1294 cfg.auth_type = a.to_string();
1295 }
1296 if let Some(c) = body.get("Cors") {
1297 cfg.cors = Some(c.clone());
1298 }
1299 if let Some(m) = body["InvokeMode"].as_str() {
1300 cfg.invoke_mode = m.to_string();
1301 }
1302 cfg.last_modified_time = Utc::now();
1303 let snapshot = cfg.clone();
1304 ok(Self::function_url_config_json(&snapshot))
1305 }
1306
1307 fn delete_function_url_config(
1308 &self,
1309 function_name: &str,
1310 account_id: &str,
1311 ) -> Result<AwsResponse, AwsServiceError> {
1312 let mut accounts = self.state.write();
1313 let state = accounts.get_or_create(account_id);
1314 state.function_url_configs.remove(function_name);
1315 empty()
1316 }
1317
1318 fn list_function_url_configs(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1319 let region = self.region_for(account_id);
1320 self.with_state_read(account_id, ®ion, |state| {
1321 let configs: Vec<Value> = state
1322 .function_url_configs
1323 .values()
1324 .map(Self::function_url_config_json)
1325 .collect();
1326 ok(json!({"FunctionUrlConfigs": configs}))
1327 })
1328 }
1329
1330 fn put_function_concurrency(
1333 &self,
1334 function_name: &str,
1335 req: &AwsRequest,
1336 ) -> Result<AwsResponse, AwsServiceError> {
1337 let body = body(req);
1338 let n = body["ReservedConcurrentExecutions"]
1339 .as_i64()
1340 .ok_or_else(|| missing("ReservedConcurrentExecutions"))?;
1341 let mut accounts = self.state.write();
1342 let state = accounts.get_or_create(&req.account_id);
1343 state
1344 .function_concurrency
1345 .insert(function_name.to_string(), n);
1346 ok(json!({"ReservedConcurrentExecutions": n}))
1347 }
1348
1349 fn get_function_concurrency(
1350 &self,
1351 function_name: &str,
1352 account_id: &str,
1353 ) -> Result<AwsResponse, AwsServiceError> {
1354 let region = self.region_for(account_id);
1355 self.with_state_read(account_id, ®ion, |state| {
1356 let n = state
1357 .function_concurrency
1358 .get(function_name)
1359 .copied()
1360 .unwrap_or(0);
1361 ok(json!({"ReservedConcurrentExecutions": n}))
1362 })
1363 }
1364
1365 fn delete_function_concurrency(
1366 &self,
1367 function_name: &str,
1368 account_id: &str,
1369 ) -> Result<AwsResponse, AwsServiceError> {
1370 let mut accounts = self.state.write();
1371 let state = accounts.get_or_create(account_id);
1372 state.function_concurrency.remove(function_name);
1373 empty()
1374 }
1375
1376 fn pc_key(function: &str, qualifier: &str) -> String {
1377 format!("{function}:{qualifier}")
1378 }
1379
1380 fn put_provisioned_concurrency(
1381 &self,
1382 function_name: &str,
1383 req: &AwsRequest,
1384 ) -> Result<AwsResponse, AwsServiceError> {
1385 let body = body(req);
1386 let qualifier = parse_qualifier(req);
1387 let requested = body["ProvisionedConcurrentExecutions"]
1388 .as_i64()
1389 .ok_or_else(|| missing("ProvisionedConcurrentExecutions"))?;
1390 let mut accounts = self.state.write();
1391 let state = accounts.get_or_create(&req.account_id);
1392 let cfg = ProvisionedConcurrencyConfig {
1393 requested,
1394 allocated: requested,
1395 status: "READY".to_string(),
1396 last_modified: Utc::now(),
1397 };
1398 state
1399 .provisioned_concurrency
1400 .insert(Self::pc_key(function_name, &qualifier), cfg.clone());
1401 ok(json!({
1402 "RequestedProvisionedConcurrentExecutions": cfg.requested,
1403 "AvailableProvisionedConcurrentExecutions": cfg.allocated,
1404 "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
1405 "Status": cfg.status,
1406 "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1407 }))
1408 }
1409
1410 fn get_provisioned_concurrency(
1411 &self,
1412 function_name: &str,
1413 req: &AwsRequest,
1414 ) -> Result<AwsResponse, AwsServiceError> {
1415 let qualifier = parse_qualifier(req);
1416 let region = self.region_for(&req.account_id);
1417 self.with_state_read(&req.account_id, ®ion, |state| {
1418 state
1419 .provisioned_concurrency
1420 .get(&Self::pc_key(function_name, &qualifier))
1421 .map(|cfg| ok(json!({
1422 "RequestedProvisionedConcurrentExecutions": cfg.requested,
1423 "AvailableProvisionedConcurrentExecutions": cfg.allocated,
1424 "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
1425 "Status": cfg.status,
1426 "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1427 })))
1428 .unwrap_or_else(|| Err(not_found("ProvisionedConcurrencyConfig", function_name)))
1429 })
1430 }
1431
1432 fn delete_provisioned_concurrency(
1433 &self,
1434 function_name: &str,
1435 req: &AwsRequest,
1436 ) -> Result<AwsResponse, AwsServiceError> {
1437 let qualifier = parse_qualifier(req);
1438 let mut accounts = self.state.write();
1439 let state = accounts.get_or_create(&req.account_id);
1440 state
1441 .provisioned_concurrency
1442 .remove(&Self::pc_key(function_name, &qualifier));
1443 empty()
1444 }
1445
1446 fn list_provisioned_concurrency(
1447 &self,
1448 function_name: &str,
1449 account_id: &str,
1450 ) -> Result<AwsResponse, AwsServiceError> {
1451 let region = self.region_for(account_id);
1452 self.with_state_read(account_id, ®ion, |state| {
1453 let prefix = format!("{function_name}:");
1454 let configs: Vec<Value> = state
1455 .provisioned_concurrency
1456 .iter()
1457 .filter(|(k, _)| k.starts_with(&prefix))
1458 .map(|(k, cfg)| {
1459 let qualifier = k.split(':').next_back().unwrap_or("$LATEST");
1460 json!({
1461 "FunctionArn": format!(
1462 "arn:aws:lambda:{}:{}:function:{}:{}",
1463 state.region, state.account_id, function_name, qualifier
1464 ),
1465 "Status": cfg.status,
1466 "RequestedProvisionedConcurrentExecutions": cfg.requested,
1467 "AvailableProvisionedConcurrentExecutions": cfg.allocated,
1468 "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
1469 "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1470 })
1471 })
1472 .collect();
1473 ok(json!({"ProvisionedConcurrencyConfigs": configs}))
1474 })
1475 }
1476
1477 fn create_code_signing_config(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1480 let body = body(req);
1481 let mut accounts = self.state.write();
1482 let state = accounts.get_or_create(&req.account_id);
1483 let id = id_from_time("csc-");
1484 let arn = format!(
1485 "arn:aws:lambda:{}:{}:code-signing-config:{}",
1486 state.region, state.account_id, id
1487 );
1488 let publishers: Vec<String> = body
1489 .get("AllowedPublishers")
1490 .and_then(|v| v.get("SigningProfileVersionArns"))
1491 .and_then(|v| v.as_array())
1492 .map(|arr| {
1493 arr.iter()
1494 .filter_map(|x| x.as_str().map(String::from))
1495 .collect()
1496 })
1497 .unwrap_or_default();
1498 let csc = CodeSigningConfig {
1499 csc_id: id.clone(),
1500 csc_arn: arn,
1501 description: body["Description"].as_str().unwrap_or("").to_string(),
1502 allowed_publishers: publishers,
1503 untrusted_artifact_action: body["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"]
1504 .as_str()
1505 .unwrap_or("Warn")
1506 .to_string(),
1507 last_modified: Utc::now(),
1508 };
1509 state.code_signing_configs.insert(id, csc.clone());
1510 ok(json!({"CodeSigningConfig": code_signing_json(&csc)}))
1511 }
1512
1513 fn get_code_signing_config(
1514 &self,
1515 csc_id: &str,
1516 account_id: &str,
1517 ) -> Result<AwsResponse, AwsServiceError> {
1518 let id = extract_csc_id(csc_id);
1519 let region = self.region_for(account_id);
1520 self.with_state_read(account_id, ®ion, |state| {
1521 state
1522 .code_signing_configs
1523 .get(&id)
1524 .map(|c| ok(json!({"CodeSigningConfig": code_signing_json(c)})))
1525 .unwrap_or_else(|| Err(not_found("CodeSigningConfig", &id)))
1526 })
1527 }
1528
1529 fn update_code_signing_config(
1530 &self,
1531 csc_id: &str,
1532 req: &AwsRequest,
1533 ) -> Result<AwsResponse, AwsServiceError> {
1534 let body = body(req);
1535 let mut accounts = self.state.write();
1536 let state = accounts.get_or_create(&req.account_id);
1537 let id = extract_csc_id(csc_id);
1538 let csc = state
1539 .code_signing_configs
1540 .get_mut(&id)
1541 .ok_or_else(|| not_found("CodeSigningConfig", &id))?;
1542 if let Some(d) = body["Description"].as_str() {
1543 csc.description = d.to_string();
1544 }
1545 if let Some(action) = body["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"].as_str()
1546 {
1547 csc.untrusted_artifact_action = action.to_string();
1548 }
1549 csc.last_modified = Utc::now();
1550 ok(json!({"CodeSigningConfig": code_signing_json(csc)}))
1551 }
1552
1553 fn delete_code_signing_config(
1554 &self,
1555 csc_id: &str,
1556 account_id: &str,
1557 ) -> Result<AwsResponse, AwsServiceError> {
1558 let id = extract_csc_id(csc_id);
1559 let mut accounts = self.state.write();
1560 let state = accounts.get_or_create(account_id);
1561 state.code_signing_configs.remove(&id);
1562 empty()
1563 }
1564
1565 fn list_code_signing_configs(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1566 let region = self.region_for(account_id);
1567 self.with_state_read(account_id, ®ion, |state| {
1568 let cfgs: Vec<Value> = state
1569 .code_signing_configs
1570 .values()
1571 .map(code_signing_json)
1572 .collect();
1573 ok(json!({"CodeSigningConfigs": cfgs}))
1574 })
1575 }
1576
1577 fn put_function_code_signing(
1578 &self,
1579 function_name: &str,
1580 req: &AwsRequest,
1581 ) -> Result<AwsResponse, AwsServiceError> {
1582 let body = body(req);
1583 let csc_arn = body["CodeSigningConfigArn"]
1584 .as_str()
1585 .ok_or_else(|| missing("CodeSigningConfigArn"))?
1586 .to_string();
1587 let mut accounts = self.state.write();
1588 let state = accounts.get_or_create(&req.account_id);
1589 state
1590 .function_code_signing
1591 .insert(function_name.to_string(), csc_arn.clone());
1592 ok(json!({
1593 "CodeSigningConfigArn": csc_arn,
1594 "FunctionName": function_name,
1595 }))
1596 }
1597
1598 fn get_function_code_signing(
1599 &self,
1600 function_name: &str,
1601 account_id: &str,
1602 ) -> Result<AwsResponse, AwsServiceError> {
1603 let region = self.region_for(account_id);
1604 self.with_state_read(account_id, ®ion, |state| {
1605 let arn = state
1606 .function_code_signing
1607 .get(function_name)
1608 .cloned()
1609 .unwrap_or_default();
1610 ok(json!({
1611 "CodeSigningConfigArn": arn,
1612 "FunctionName": function_name,
1613 }))
1614 })
1615 }
1616
1617 fn delete_function_code_signing(
1618 &self,
1619 function_name: &str,
1620 account_id: &str,
1621 ) -> Result<AwsResponse, AwsServiceError> {
1622 let mut accounts = self.state.write();
1623 let state = accounts.get_or_create(account_id);
1624 state.function_code_signing.remove(function_name);
1625 empty()
1626 }
1627
1628 fn list_functions_by_code_signing(
1629 &self,
1630 csc_id: &str,
1631 account_id: &str,
1632 ) -> Result<AwsResponse, AwsServiceError> {
1633 let id = extract_csc_id(csc_id);
1634 let region = self.region_for(account_id);
1635 self.with_state_read(account_id, ®ion, |state| {
1636 let funcs: Vec<&String> = state
1637 .function_code_signing
1638 .iter()
1639 .filter(|(_, v)| v.contains(&id))
1640 .map(|(k, _)| k)
1641 .collect();
1642 ok(json!({"FunctionArns": funcs}))
1643 })
1644 }
1645
1646 fn ev_key(function: &str, qualifier: &str) -> String {
1649 format!("{function}:{qualifier}")
1650 }
1651
1652 fn put_function_event_invoke(
1653 &self,
1654 function_name: &str,
1655 req: &AwsRequest,
1656 ) -> Result<AwsResponse, AwsServiceError> {
1657 let body = body(req);
1658 let qualifier = parse_qualifier(req);
1659 let function_arn = format!(
1660 "arn:aws:lambda:{}:{}:function:{}",
1661 self.region_for(&req.account_id),
1662 req.account_id,
1663 function_name
1664 );
1665 let cfg = EventInvokeConfig {
1666 function_arn: function_arn.clone(),
1667 maximum_event_age: body["MaximumEventAgeInSeconds"].as_i64().unwrap_or(21600),
1668 maximum_retry_attempts: body["MaximumRetryAttempts"].as_i64().unwrap_or(2),
1669 destination_config: body.get("DestinationConfig").cloned().unwrap_or(json!({})),
1670 last_modified: Utc::now(),
1671 };
1672 let mut accounts = self.state.write();
1673 let state = accounts.get_or_create(&req.account_id);
1674 state
1675 .event_invoke_configs
1676 .insert(Self::ev_key(function_name, &qualifier), cfg.clone());
1677 ok(event_invoke_json(&cfg))
1678 }
1679
1680 fn get_function_event_invoke(
1681 &self,
1682 function_name: &str,
1683 req: &AwsRequest,
1684 ) -> Result<AwsResponse, AwsServiceError> {
1685 let qualifier = parse_qualifier(req);
1686 let region = self.region_for(&req.account_id);
1687 self.with_state_read(&req.account_id, ®ion, |state| {
1688 state
1689 .event_invoke_configs
1690 .get(&Self::ev_key(function_name, &qualifier))
1691 .map(|c| ok(event_invoke_json(c)))
1692 .unwrap_or_else(|| Err(not_found("EventInvokeConfig", function_name)))
1693 })
1694 }
1695
1696 fn delete_function_event_invoke(
1697 &self,
1698 function_name: &str,
1699 req: &AwsRequest,
1700 ) -> Result<AwsResponse, AwsServiceError> {
1701 let qualifier = parse_qualifier(req);
1702 let mut accounts = self.state.write();
1703 let state = accounts.get_or_create(&req.account_id);
1704 state
1705 .event_invoke_configs
1706 .remove(&Self::ev_key(function_name, &qualifier));
1707 empty()
1708 }
1709
1710 fn list_function_event_invoke(
1711 &self,
1712 function_name: &str,
1713 account_id: &str,
1714 ) -> Result<AwsResponse, AwsServiceError> {
1715 let region = self.region_for(account_id);
1716 self.with_state_read(account_id, ®ion, |state| {
1717 let prefix = format!("{function_name}:");
1718 let configs: Vec<Value> = state
1719 .event_invoke_configs
1720 .iter()
1721 .filter(|(k, _)| k.starts_with(&prefix))
1722 .map(|(_, c)| event_invoke_json(c))
1723 .collect();
1724 ok(json!({"FunctionEventInvokeConfigs": configs}))
1725 })
1726 }
1727
1728 fn put_runtime_management(
1731 &self,
1732 function_name: &str,
1733 req: &AwsRequest,
1734 ) -> Result<AwsResponse, AwsServiceError> {
1735 let body = body(req);
1736 let qualifier = parse_qualifier(req);
1737 let cfg = RuntimeManagementConfig {
1738 update_runtime_on: body["UpdateRuntimeOn"]
1739 .as_str()
1740 .unwrap_or("Auto")
1741 .to_string(),
1742 runtime_version_arn: body["RuntimeVersionArn"].as_str().unwrap_or("").to_string(),
1743 };
1744 let mut accounts = self.state.write();
1745 let state = accounts.get_or_create(&req.account_id);
1746 state
1747 .runtime_management
1748 .insert(format!("{function_name}:{qualifier}"), cfg.clone());
1749 ok(json!({
1750 "FunctionArn": Arn::new("lambda", &state.region, &state.account_id, &format!("function:{function_name}:{qualifier}")).to_string(),
1751 "UpdateRuntimeOn": cfg.update_runtime_on,
1752 "RuntimeVersionArn": cfg.runtime_version_arn,
1753 }))
1754 }
1755
1756 fn get_runtime_management(
1757 &self,
1758 function_name: &str,
1759 req: &AwsRequest,
1760 ) -> Result<AwsResponse, AwsServiceError> {
1761 let qualifier = parse_qualifier(req);
1762 let region = self.region_for(&req.account_id);
1763 self.with_state_read(&req.account_id, ®ion, |state| {
1764 let cfg = state
1765 .runtime_management
1766 .get(&format!("{function_name}:{qualifier}"))
1767 .cloned()
1768 .unwrap_or(RuntimeManagementConfig {
1769 update_runtime_on: "Auto".to_string(),
1770 runtime_version_arn: String::new(),
1771 });
1772 ok(json!({
1773 "FunctionArn": format!(
1774 "arn:aws:lambda:{}:{}:function:{}:{}",
1775 state.region, state.account_id, function_name, qualifier
1776 ),
1777 "UpdateRuntimeOn": cfg.update_runtime_on,
1778 "RuntimeVersionArn": cfg.runtime_version_arn,
1779 }))
1780 })
1781 }
1782
1783 fn put_scaling_config(
1786 &self,
1787 uuid: &str,
1788 req: &AwsRequest,
1789 ) -> Result<AwsResponse, AwsServiceError> {
1790 let body = body(req);
1791 let cfg = FunctionScalingConfig {
1792 maximum_concurrency: body["MaximumConcurrency"].as_i64().unwrap_or(0),
1793 };
1794 let mut accounts = self.state.write();
1795 let state = accounts.get_or_create(&req.account_id);
1796 state.scaling_configs.insert(uuid.to_string(), cfg.clone());
1797 ok(json!({
1798 "MaximumConcurrency": cfg.maximum_concurrency,
1799 }))
1800 }
1801
1802 fn get_scaling_config(
1803 &self,
1804 uuid: &str,
1805 account_id: &str,
1806 ) -> Result<AwsResponse, AwsServiceError> {
1807 let region = self.region_for(account_id);
1808 self.with_state_read(account_id, ®ion, |state| {
1809 let n = state
1810 .scaling_configs
1811 .get(uuid)
1812 .map(|c| c.maximum_concurrency)
1813 .unwrap_or(0);
1814 ok(json!({"MaximumConcurrency": n}))
1815 })
1816 }
1817
1818 fn put_recursion_config(
1821 &self,
1822 function_name: &str,
1823 req: &AwsRequest,
1824 ) -> Result<AwsResponse, AwsServiceError> {
1825 let body = body(req);
1826 let mode = body["RecursiveLoop"]
1827 .as_str()
1828 .unwrap_or("Terminate")
1829 .to_string();
1830 let mut accounts = self.state.write();
1831 let state = accounts.get_or_create(&req.account_id);
1832 state
1833 .recursion_configs
1834 .insert(function_name.to_string(), mode.clone());
1835 ok(json!({"RecursiveLoop": mode}))
1836 }
1837
1838 fn get_recursion_config(
1839 &self,
1840 function_name: &str,
1841 account_id: &str,
1842 ) -> Result<AwsResponse, AwsServiceError> {
1843 let region = self.region_for(account_id);
1844 self.with_state_read(account_id, ®ion, |state| {
1845 let mode = state
1846 .recursion_configs
1847 .get(function_name)
1848 .cloned()
1849 .unwrap_or_else(|| "Terminate".to_string());
1850 ok(json!({"RecursiveLoop": mode}))
1851 })
1852 }
1853
1854 fn tag_resource(
1857 &self,
1858 resource_arn: &str,
1859 req: &AwsRequest,
1860 ) -> Result<AwsResponse, AwsServiceError> {
1861 let body = body(req);
1862 let new_tags: Vec<(String, String)> = body
1863 .get("Tags")
1864 .and_then(|v| v.as_object())
1865 .map(|m| {
1866 m.iter()
1867 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
1868 .collect()
1869 })
1870 .unwrap_or_default();
1871 let resource_arn_decoded = decode_query_segment(resource_arn);
1874 let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
1875 AwsServiceError::aws_error(
1876 StatusCode::BAD_REQUEST,
1877 "InvalidParameterValueException",
1878 format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
1879 )
1880 })?;
1881 let mut accounts = self.state.write();
1882 let state = accounts.get_or_create(&req.account_id);
1883 let func = state.functions.get_mut(&name).ok_or_else(|| {
1884 AwsServiceError::aws_error(
1885 StatusCode::NOT_FOUND,
1886 "ResourceNotFoundException",
1887 format!("Function not found: {name}"),
1888 )
1889 })?;
1890 for (k, v) in new_tags {
1893 func.tags.insert(k, v);
1894 }
1895 empty()
1896 }
1897
1898 fn untag_resource(
1899 &self,
1900 resource_arn: &str,
1901 req: &AwsRequest,
1902 ) -> Result<AwsResponse, AwsServiceError> {
1903 let mut keys: Vec<String> = Vec::new();
1916 for (k, v) in parse_query_pairs(&req.raw_query) {
1917 if k == "tagKeys" || k.starts_with("tagKeys.") {
1918 keys.push(v);
1919 }
1920 }
1921 if keys.is_empty() {
1922 let parsed = body(req);
1923 for field in ["TagKeys", "tagKeys"] {
1924 if let Some(arr) = parsed.get(field).and_then(|v| v.as_array()) {
1925 for v in arr {
1926 if let Some(s) = v.as_str() {
1927 keys.push(s.to_string());
1928 }
1929 }
1930 if !keys.is_empty() {
1931 break;
1932 }
1933 }
1934 }
1935 }
1936 let resource_arn_decoded = decode_query_segment(resource_arn);
1937 let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
1938 AwsServiceError::aws_error(
1939 StatusCode::BAD_REQUEST,
1940 "InvalidParameterValueException",
1941 format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
1942 )
1943 })?;
1944 let mut accounts = self.state.write();
1945 let state = accounts.get_or_create(&req.account_id);
1946 let func = state.functions.get_mut(&name).ok_or_else(|| {
1947 AwsServiceError::aws_error(
1948 StatusCode::NOT_FOUND,
1949 "ResourceNotFoundException",
1950 format!("Function not found: {name}"),
1951 )
1952 })?;
1953 for k in &keys {
1954 func.tags.remove(k);
1955 }
1956 empty()
1957 }
1958
1959 fn list_tags(
1960 &self,
1961 resource_arn: &str,
1962 account_id: &str,
1963 ) -> Result<AwsResponse, AwsServiceError> {
1964 let resource_arn_decoded = decode_query_segment(resource_arn);
1965 let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
1966 AwsServiceError::aws_error(
1967 StatusCode::BAD_REQUEST,
1968 "InvalidParameterValueException",
1969 format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
1970 )
1971 })?;
1972 let region = self.region_for(account_id);
1973 self.with_state_read(account_id, ®ion, |state| {
1974 let func = state.functions.get(&name).ok_or_else(|| {
1975 AwsServiceError::aws_error(
1976 StatusCode::NOT_FOUND,
1977 "ResourceNotFoundException",
1978 format!("Function not found: {name}"),
1979 )
1980 })?;
1981 let tags: serde_json::Map<String, Value> = func
1982 .tags
1983 .iter()
1984 .map(|(k, v)| (k.clone(), Value::String(v.clone())))
1985 .collect();
1986 ok(json!({"Tags": tags}))
1987 })
1988 }
1989
1990 fn update_event_source_mapping_handler(
1993 &self,
1994 uuid: &str,
1995 req: &AwsRequest,
1996 ) -> Result<AwsResponse, AwsServiceError> {
1997 let body = body(req);
1998 let mut accounts = self.state.write();
1999 let state = accounts.get_or_create(&req.account_id);
2000 let esm = state
2001 .event_source_mappings
2002 .get_mut(uuid)
2003 .ok_or_else(|| not_found("EventSourceMapping", uuid))?;
2004 if let Some(b) = body["BatchSize"].as_i64() {
2005 esm.batch_size = b;
2006 }
2007 if let Some(name) = body["FunctionName"].as_str() {
2008 esm.function_arn = format!(
2009 "arn:aws:lambda:{}:{}:function:{}",
2010 state.region, state.account_id, name
2011 );
2012 }
2013 if let Some(filters) = body
2014 .get("FilterCriteria")
2015 .and_then(|v| v.get("Filters"))
2016 .and_then(|v| v.as_array())
2017 {
2018 esm.filter_patterns = filters
2019 .iter()
2020 .filter_map(|f| f.get("Pattern").and_then(|p| p.as_str()).map(String::from))
2021 .collect();
2022 }
2023 if let Some(types) = body.get("FunctionResponseTypes").and_then(|v| v.as_array()) {
2024 esm.function_response_types = types
2025 .iter()
2026 .filter_map(|v| v.as_str().map(String::from))
2027 .collect();
2028 }
2029 if let Some(w) = body
2030 .get("MaximumBatchingWindowInSeconds")
2031 .and_then(|v| v.as_i64())
2032 {
2033 esm.maximum_batching_window_in_seconds = Some(w);
2034 }
2035 if let Some(p) = body.get("ParallelizationFactor").and_then(|v| v.as_i64()) {
2036 esm.parallelization_factor = Some(p);
2037 }
2038 if let Some(s) = body.get("KMSKeyArn").and_then(|v| v.as_str()) {
2039 esm.kms_key_arn = Some(s.to_string());
2040 }
2041 if let Some(mc) = body.get("MetricsConfig") {
2042 esm.metrics_config = Some(mc.clone());
2043 }
2044 if let Some(dc) = body.get("DestinationConfig") {
2045 esm.destination_config = Some(dc.clone());
2046 }
2047 if let Some(n) = body.get("MaximumRetryAttempts").and_then(|v| v.as_i64()) {
2048 esm.maximum_retry_attempts = Some(n);
2049 }
2050 if let Some(n) = body
2051 .get("MaximumRecordAgeInSeconds")
2052 .and_then(|v| v.as_i64())
2053 {
2054 esm.maximum_record_age_in_seconds = Some(n);
2055 }
2056 if let Some(b) = body
2057 .get("BisectBatchOnFunctionError")
2058 .and_then(|v| v.as_bool())
2059 {
2060 esm.bisect_batch_on_function_error = Some(b);
2061 }
2062 if let Some(n) = body.get("TumblingWindowInSeconds").and_then(|v| v.as_i64()) {
2063 esm.tumbling_window_in_seconds = Some(n);
2064 }
2065 let mut body_json = json!({
2066 "UUID": esm.uuid,
2067 "FunctionArn": esm.function_arn,
2068 "EventSourceArn": esm.event_source_arn,
2069 "BatchSize": esm.batch_size,
2070 "State": "Enabled",
2071 "StateTransitionReason": "USER_INITIATED",
2072 "LastModified": chrono::Utc::now().timestamp() as f64,
2073 });
2074 let obj = body_json.as_object_mut().expect("json! built object");
2075 if !esm.filter_patterns.is_empty() {
2076 obj.insert(
2077 "FilterCriteria".into(),
2078 json!({
2079 "Filters": esm
2080 .filter_patterns
2081 .iter()
2082 .map(|p| json!({"Pattern": p}))
2083 .collect::<Vec<_>>(),
2084 }),
2085 );
2086 }
2087 if !esm.function_response_types.is_empty() {
2088 obj.insert(
2089 "FunctionResponseTypes".into(),
2090 json!(esm.function_response_types),
2091 );
2092 }
2093 if let Some(w) = esm.maximum_batching_window_in_seconds {
2094 obj.insert("MaximumBatchingWindowInSeconds".into(), json!(w));
2095 }
2096 if let Some(p) = esm.parallelization_factor {
2097 obj.insert("ParallelizationFactor".into(), json!(p));
2098 }
2099 ok(body_json)
2100 }
2101
2102 fn region_for(&self, account_id: &str) -> String {
2103 let accounts = self.state.read();
2104 accounts
2105 .get(account_id)
2106 .map(|s| s.region.clone())
2107 .unwrap_or_else(|| "us-east-1".to_string())
2108 }
2109
2110 pub(crate) async fn invoke_with_response_stream(
2124 &self,
2125 function_name: &str,
2126 account_id: &str,
2127 req: &AwsRequest,
2128 ) -> Result<AwsResponse, AwsServiceError> {
2129 let qualifier = req.query_params.get("Qualifier").map(String::as_str);
2134
2135 let resolved_version: Option<String> = {
2136 let accounts = self.state.read();
2137 let empty = LambdaState::new(account_id, "");
2138 let state = accounts.get(account_id).unwrap_or(&empty);
2139 crate::service::resolve_qualifier_to_version(state, function_name, qualifier)
2140 };
2141 let executed_version = resolved_version
2142 .clone()
2143 .unwrap_or_else(|| "$LATEST".to_string());
2144
2145 let (func, layer_zips) = {
2146 let accounts = self.state.read();
2147 let empty = LambdaState::new(account_id, "");
2148 let state = accounts.get(account_id).unwrap_or(&empty);
2149 let func = match resolved_version.as_deref() {
2150 Some(v) => state
2151 .function_version_snapshots
2152 .get(function_name)
2153 .and_then(|m| m.get(v))
2154 .cloned()
2155 .or_else(|| state.functions.get(function_name).cloned()),
2156 None => state.functions.get(function_name).cloned(),
2157 }
2158 .ok_or_else(|| {
2159 AwsServiceError::aws_error(
2160 StatusCode::NOT_FOUND,
2161 "ResourceNotFoundException",
2162 format!(
2163 "Function not found: arn:aws:lambda:{}:{}:function:{}",
2164 state.region, state.account_id, function_name
2165 ),
2166 )
2167 })?;
2168 let mut zips: Vec<Vec<u8>> = Vec::with_capacity(func.layers.len());
2169 for attached in &func.layers {
2170 if let Some(b) =
2171 parse_layer_version_arn(&attached.arn).and_then(|(acct, name, ver)| {
2172 accounts
2173 .get(&acct)
2174 .and_then(|s| s.layers.get(&name))
2175 .and_then(|l| l.versions.iter().find(|v| v.version == ver))
2176 .and_then(|v| v.code_zip.clone())
2177 })
2178 {
2179 zips.push(b);
2180 }
2181 }
2182 (func, zips)
2183 };
2184
2185 if func.code_zip.is_none() && func.package_type != "Image" {
2186 return Err(AwsServiceError::aws_error(
2187 StatusCode::BAD_REQUEST,
2188 "InvalidParameterValueException",
2189 "Function has no deployment package",
2190 ));
2191 }
2192
2193 let runtime = self.runtime.as_ref().ok_or_else(|| {
2194 AwsServiceError::aws_error(
2195 StatusCode::INTERNAL_SERVER_ERROR,
2196 "ServiceException",
2197 "Docker/Podman is required for Lambda execution but is not available",
2198 )
2199 })?;
2200
2201 let mut frames: Vec<u8> = Vec::new();
2207 let invoke_result = runtime
2208 .invoke_streaming(&func, &req.body, &layer_zips)
2209 .await;
2210
2211 let (error_code, error_details) = match invoke_result {
2212 Ok(mut stream) => {
2213 let mut last_chunk: Option<bytes::Bytes> = None;
2214 let mut had_chunks = false;
2215 loop {
2216 match stream.next_chunk().await {
2217 Ok(Some(chunk)) => {
2218 had_chunks = true;
2219 frames.extend_from_slice(&crate::eventstream::payload_chunk_frame(
2220 &chunk,
2221 ));
2222 last_chunk = Some(chunk);
2223 }
2224 Ok(None) => break,
2225 Err(e) => {
2226 tracing::error!(function = %function_name, error = %e, "Lambda streaming chunk read failed");
2227 return Err(AwsServiceError::aws_error(
2228 StatusCode::INTERNAL_SERVER_ERROR,
2229 "ServiceException",
2230 format!("Lambda streaming read failed: {e}"),
2231 ));
2232 }
2233 }
2234 }
2235
2236 let mut error: Option<(String, String)> = None;
2243 if had_chunks {
2244 if let Some(bytes) = last_chunk {
2245 if let Ok(v) = serde_json::from_slice::<Value>(&bytes) {
2246 if let Some(obj) = v.as_object() {
2247 if obj.contains_key("errorMessage") || obj.contains_key("errorType")
2248 {
2249 let etype = obj
2250 .get("errorType")
2251 .and_then(|x| x.as_str())
2252 .unwrap_or("Runtime.Unknown")
2253 .to_string();
2254 let emsg = obj
2255 .get("errorMessage")
2256 .and_then(|x| x.as_str())
2257 .unwrap_or("function error")
2258 .to_string();
2259 error = Some((etype, emsg));
2260 }
2261 }
2262 }
2263 }
2264 }
2265 match error {
2266 Some((code, details)) => (Some(code), Some(details)),
2267 None => (None, None),
2268 }
2269 }
2270 Err(e) => {
2271 tracing::error!(function = %function_name, error = %e, "Lambda streaming invocation failed");
2272 (
2273 Some("Runtime.InvocationFailure".to_string()),
2274 Some(e.to_string()),
2275 )
2276 }
2277 };
2278
2279 frames.extend_from_slice(&crate::eventstream::invoke_complete_frame(
2280 error_code.as_deref(),
2281 error_details.as_deref(),
2282 "",
2283 ));
2284
2285 let mut resp = AwsResponse {
2286 status: StatusCode::OK,
2287 content_type: "application/vnd.amazon.eventstream".to_string(),
2288 body: fakecloud_core::service::ResponseBody::Bytes(bytes::Bytes::from(frames)),
2289 headers: http::HeaderMap::new(),
2290 };
2291 if let Ok(v) = http::HeaderValue::from_str(&executed_version) {
2292 resp.headers
2293 .insert(http::HeaderName::from_static("x-amz-executed-version"), v);
2294 }
2295 Ok(resp)
2296 }
2297}
2298
2299fn extract_csc_id(input: &str) -> String {
2300 let decoded = percent_decode(input);
2303 decoded.rsplit(':').next().unwrap_or(&decoded).to_string()
2304}
2305
2306fn percent_decode(input: &str) -> String {
2307 let mut out = String::with_capacity(input.len());
2308 let bytes = input.as_bytes();
2309 let mut i = 0;
2310 while i < bytes.len() {
2311 if bytes[i] == b'%' && i + 2 < bytes.len() {
2312 let hi = (bytes[i + 1] as char).to_digit(16);
2313 let lo = (bytes[i + 2] as char).to_digit(16);
2314 if let (Some(h), Some(l)) = (hi, lo) {
2315 out.push(((h * 16 + l) as u8) as char);
2316 i += 3;
2317 continue;
2318 }
2319 }
2320 out.push(bytes[i] as char);
2321 i += 1;
2322 }
2323 out
2324}
2325
2326fn code_signing_json(c: &CodeSigningConfig) -> Value {
2327 json!({
2328 "CodeSigningConfigId": c.csc_id,
2329 "CodeSigningConfigArn": c.csc_arn,
2330 "Description": c.description,
2331 "AllowedPublishers": {
2332 "SigningProfileVersionArns": c.allowed_publishers,
2333 },
2334 "CodeSigningPolicies": {
2335 "UntrustedArtifactOnDeployment": c.untrusted_artifact_action,
2336 },
2337 "LastModified": c.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
2338 })
2339}
2340
2341fn event_invoke_json(c: &EventInvokeConfig) -> Value {
2342 json!({
2343 "FunctionArn": c.function_arn,
2344 "MaximumEventAgeInSeconds": c.maximum_event_age,
2345 "MaximumRetryAttempts": c.maximum_retry_attempts,
2346 "DestinationConfig": c.destination_config,
2347 "LastModified": c.last_modified.timestamp(),
2348 })
2349}
2350
2351#[cfg(test)]
2352mod tests {
2353 use crate::service::LambdaService;
2354 use crate::state::{LambdaState, SharedLambdaState};
2355 use fakecloud_core::multi_account::MultiAccountState;
2356 use fakecloud_core::service::AwsRequest;
2357 use http::Method;
2358 use parking_lot::RwLock;
2359 use std::collections::HashMap;
2360 use std::sync::Arc;
2361
2362 fn svc() -> LambdaService {
2363 let state: SharedLambdaState = Arc::new(RwLock::new(
2364 MultiAccountState::<LambdaState>::new("000000000000", "us-east-1", ""),
2365 ));
2366 LambdaService::new(state)
2367 }
2368
2369 fn req(action: &str, body: &str, segs: &[&str]) -> AwsRequest {
2370 AwsRequest {
2371 service: "lambda".to_string(),
2372 method: Method::POST,
2373 raw_path: format!("/{}", segs.join("/")),
2374 raw_query: String::new(),
2375 path_segments: segs.iter().map(|s| s.to_string()).collect(),
2376 query_params: HashMap::new(),
2377 headers: http::HeaderMap::new(),
2378 body: bytes::Bytes::from(body.to_string()),
2379 body_stream: parking_lot::Mutex::new(None),
2380 account_id: "000000000000".to_string(),
2381 region: "us-east-1".to_string(),
2382 request_id: "rid".to_string(),
2383 action: action.to_string(),
2384 is_query_protocol: false,
2385 access_key_id: None,
2386 principal: None,
2387 }
2388 }
2389
2390 async fn run(s: &LambdaService, action: &str, body: &str, res: Option<&str>, segs: &[&str]) {
2391 let r = s.handle_extra(action, res, &req(action, body, segs)).await;
2392 match r {
2393 Ok(resp) => assert!(resp.status.is_success(), "{action} status: {}", resp.status),
2394 Err(e) => panic!("{action} failed: {e:?}"),
2395 }
2396 }
2397
2398 #[tokio::test]
2399 async fn read_only_listings_succeed_without_state() {
2400 let s = svc();
2401 run(&s, "GetAccountSettings", "", None, &[]).await;
2402 run(&s, "InvokeAsync", r#"{}"#, Some("fn"), &[]).await;
2403 run(&s, "ListLayers", "", None, &[]).await;
2404 run(&s, "ListLayerVersions", "", Some("layer"), &[]).await;
2405 }
2406
2407 #[tokio::test]
2408 async fn layers_lifecycle() {
2409 let s = svc();
2410 run(
2411 &s,
2412 "PublishLayerVersion",
2413 r#"{"Content":{"ZipFile":""}}"#,
2414 Some("layer1"),
2415 &["2018-10-31", "layers", "layer1", "versions"],
2416 )
2417 .await;
2418 run(&s, "ListLayers", "", None, &[]).await;
2419 run(&s, "ListLayerVersions", "", Some("layer1"), &[]).await;
2420 }
2421
2422 #[tokio::test]
2423 async fn code_signing_lifecycle() {
2424 let s = svc();
2425 run(
2426 &s,
2427 "CreateCodeSigningConfig",
2428 r#"{"AllowedPublishers":{"SigningProfileVersionArns":[]}}"#,
2429 None,
2430 &[],
2431 )
2432 .await;
2433 run(&s, "ListCodeSigningConfigs", "", None, &[]).await;
2434 }
2435}