1use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use sha2::{Digest, Sha256};
9
10use fakecloud_aws::arn::Arn;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
12
13use crate::service::LambdaService;
14use crate::state::{
15 AccountSettings, AttachedLayer, CodeSigningConfig, EventInvokeConfig, FunctionAlias,
16 FunctionScalingConfig, FunctionUrlConfig, LambdaState, Layer, LayerVersion,
17 ProvisionedConcurrencyConfig, RuntimeManagementConfig,
18};
19
20pub(crate) fn resolve_layer_attachments(
25 accounts: &fakecloud_core::multi_account::MultiAccountState<LambdaState>,
26 arns: Vec<String>,
27) -> Vec<AttachedLayer> {
28 arns.into_iter()
29 .map(|arn| {
30 let code_size = parse_layer_version_arn(&arn)
31 .and_then(|(acct, name, ver)| {
32 accounts
33 .get(&acct)
34 .and_then(|s| s.layers.get(&name))
35 .and_then(|l| l.versions.iter().find(|v| v.version == ver))
36 .map(|v| v.code_size)
37 })
38 .unwrap_or(0);
39 AttachedLayer { arn, code_size }
40 })
41 .collect()
42}
43
44fn missing(name: &str) -> AwsServiceError {
45 AwsServiceError::aws_error(
46 StatusCode::BAD_REQUEST,
47 "InvalidParameterValueException",
48 format!("Missing required field: {name}"),
49 )
50}
51
52fn not_found(entity: &str, name: &str) -> AwsServiceError {
53 AwsServiceError::aws_error(
54 StatusCode::NOT_FOUND,
55 "ResourceNotFoundException",
56 format!("{entity} not found: {name}"),
57 )
58}
59
60fn ok(body: Value) -> Result<AwsResponse, AwsServiceError> {
61 Ok(AwsResponse::json(StatusCode::OK, body.to_string()))
62}
63
64fn empty() -> Result<AwsResponse, AwsServiceError> {
65 Ok(AwsResponse::json(StatusCode::OK, "{}".to_string()))
66}
67
68fn body(req: &AwsRequest) -> Value {
69 serde_json::from_slice(&req.body).unwrap_or_else(|_| Value::Object(Default::default()))
70}
71
/// Extracts the bare function name from a Lambda function ARN.
///
/// Accepts `arn:aws:lambda:<region>:<account>:function:<name>[:<qualifier>...]`
/// and returns `<name>` with any qualifier dropped. Returns `None` when the
/// `arn:aws:lambda:` prefix is missing, the resource kind is not `function`,
/// or the ARN ends before the name field.
fn function_name_from_arn(arn: &str) -> Option<String> {
    let mut fields = arn.strip_prefix("arn:aws:lambda:")?.split(':');
    // Region and account must be present but are not inspected.
    let (_region, _account) = (fields.next()?, fields.next()?);
    match fields.next()? {
        // The field right after `function` is the name; later fields are qualifiers.
        "function" => fields.next().map(str::to_string),
        _ => None,
    }
}
96
97fn parse_query_pairs(raw_query: &str) -> Vec<(String, String)> {
104 raw_query
105 .split('&')
106 .filter(|s| !s.is_empty())
107 .map(|pair| {
108 let mut it = pair.splitn(2, '=');
109 let k = it.next().unwrap_or("");
110 let v = it.next().unwrap_or("");
111 (decode_query_segment(k), decode_query_segment(v))
112 })
113 .collect()
114}
115
116fn decode_query_segment(s: &str) -> String {
117 let plus_decoded = s.replace('+', " ");
120 percent_encoding::percent_decode_str(&plus_decoded)
121 .decode_utf8_lossy()
122 .into_owned()
123}
124
125fn layer_content_url(req: &AwsRequest, account_id: &str, layer_name: &str, version: i64) -> String {
130 let host = req
131 .headers
132 .get(http::header::HOST)
133 .and_then(|h| h.to_str().ok())
134 .unwrap_or("localhost");
135 let scheme = req
136 .headers
137 .get("x-forwarded-proto")
138 .and_then(|h| h.to_str().ok())
139 .unwrap_or("http");
140 format!(
141 "{scheme}://{host}/_fakecloud/lambda/layer-content/{account_id}/{layer_name}/{version}.zip"
142 )
143}
144
145pub(crate) fn function_code_url(
150 req: &AwsRequest,
151 account_id: &str,
152 function_name: &str,
153 version_label: &str,
154) -> String {
155 let host = req
156 .headers
157 .get(http::header::HOST)
158 .and_then(|h| h.to_str().ok())
159 .unwrap_or("localhost");
160 let scheme = req
161 .headers
162 .get("x-forwarded-proto")
163 .and_then(|h| h.to_str().ok())
164 .unwrap_or("http");
165 let file = if version_label == "$LATEST" {
166 "latest.zip".to_string()
167 } else {
168 format!("{version_label}.zip")
169 };
170 format!("{scheme}://{host}/_fakecloud/lambda/function-code/{account_id}/{function_name}/{file}")
171}
172
/// Parses a layer-version ARN into `(account_id, layer_name, version)`.
///
/// The ARN must have exactly eight `:`-separated fields of the form
/// `arn:<partition>:lambda:<region>:<account>:layer:<name>:<version>`, with
/// `<version>` parseable as an `i64`. Partition and region are not inspected.
/// Anything else yields `None`.
pub fn parse_layer_version_arn(arn: &str) -> Option<(String, String, i64)> {
    let fields: Vec<&str> = arn.split(':').collect();
    match fields.as_slice() {
        ["arn", _partition, "lambda", _region, account, "layer", name, version] => {
            let version = version.parse::<i64>().ok()?;
            Some(((*account).to_owned(), (*name).to_owned(), version))
        }
        _ => None,
    }
}
186
/// Runtime identifiers accepted as a `CompatibleRuntime` filter value.
///
/// `validate_layer_filters` rejects any value that is not in this list.
/// Entries are grouped by language family.
const LAMBDA_RUNTIMES: &[&str] = &[
    // Node.js
    "nodejs",
    "nodejs4.3",
    "nodejs6.10",
    "nodejs8.10",
    "nodejs10.x",
    "nodejs12.x",
    "nodejs14.x",
    "nodejs16.x",
    "nodejs18.x",
    "nodejs20.x",
    "nodejs22.x",
    "nodejs24.x",
    "nodejs4.3-edge",
    // Java
    "java8",
    "java8.al2",
    "java11",
    "java17",
    "java21",
    "java25",
    // Python
    "python2.7",
    "python3.6",
    "python3.7",
    "python3.8",
    "python3.9",
    "python3.10",
    "python3.11",
    "python3.12",
    "python3.13",
    "python3.14",
    // .NET
    "dotnetcore1.0",
    "dotnetcore2.0",
    "dotnetcore2.1",
    "dotnetcore3.1",
    "dotnet6",
    "dotnet8",
    "dotnet10",
    // Go
    "go1.x",
    // Ruby
    "ruby2.5",
    "ruby2.7",
    "ruby3.2",
    "ruby3.3",
    "ruby3.4",
    // Custom runtimes
    "provided",
    "provided.al2",
    "provided.al2023",
];
237
238fn validate_layer_filters(req: &AwsRequest) -> Result<(), AwsServiceError> {
241 if let Some(arch) = req.query_params.get("CompatibleArchitecture") {
242 if arch != "x86_64" && arch != "arm64" {
243 return Err(AwsServiceError::aws_error(
244 StatusCode::BAD_REQUEST,
245 "InvalidParameterValueException",
246 format!(
247 "Invalid CompatibleArchitecture value '{}'; expected 'x86_64' or 'arm64'",
248 arch
249 ),
250 ));
251 }
252 }
253 if let Some(rt) = req.query_params.get("CompatibleRuntime") {
254 if !LAMBDA_RUNTIMES.contains(&rt.as_str()) {
255 return Err(AwsServiceError::aws_error(
256 StatusCode::BAD_REQUEST,
257 "InvalidParameterValueException",
258 format!("Invalid CompatibleRuntime value '{}'", rt),
259 ));
260 }
261 }
262 Ok(())
263}
264
265fn parse_qualifier(req: &AwsRequest) -> String {
266 req.query_params
267 .get("Qualifier")
268 .cloned()
269 .unwrap_or_else(|| "$LATEST".to_string())
270}
271
272fn require_qualifier(req: &AwsRequest) -> Result<String, AwsServiceError> {
277 req.query_params.get("Qualifier").cloned().ok_or_else(|| {
278 AwsServiceError::aws_error(
279 StatusCode::BAD_REQUEST,
280 "InvalidParameterValueException",
281 "Qualifier is required for this operation",
282 )
283 })
284}
285
/// Produces an identifier made of `prefix` followed by the current time in
/// nanoseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the numeric part is `0`.
fn id_from_time(prefix: &str) -> String {
    let nanos = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    format!("{prefix}{nanos}")
}
296
297mod account;
298mod aliases;
299mod code_signing;
300mod concurrency;
301mod event_invoke;
302mod function_url;
303mod layers;
304mod recursion;
305mod runtime;
306mod stream;
307
308impl LambdaService {
309 fn with_state_read<F, R>(&self, account_id: &str, region: &str, f: F) -> R
310 where
311 F: FnOnce(&LambdaState) -> R,
312 {
313 let accounts = self.state.read();
314 let empty = LambdaState::new(account_id, region);
315 let state = accounts.get(account_id).unwrap_or(&empty);
316 f(state)
317 }
318
319 fn get_function_configuration(
322 &self,
323 function_name: &str,
324 account_id: &str,
325 req: &AwsRequest,
326 ) -> Result<AwsResponse, AwsServiceError> {
327 let region = self.region_for(account_id);
328 let qualifier = req.query_params.get("Qualifier").cloned();
329 self.with_state_read(account_id, ®ion, |state| {
330 let live = state
331 .functions
332 .get(function_name)
333 .ok_or_else(|| not_found("Function", function_name))?;
334 let resolved = crate::service::resolve_qualifier_to_version(
338 state,
339 function_name,
340 qualifier.as_deref(),
341 );
342 let (func, version_label) = match resolved {
343 None => (live, "$LATEST".to_string()),
344 Some(v) => {
345 let snap = state
346 .function_version_snapshots
347 .get(function_name)
348 .and_then(|m| m.get(&v))
349 .ok_or_else(|| not_found("Function", function_name))?;
350 (snap, v)
351 }
352 };
353 let mut config = self.function_config_json(func);
354 config["Version"] = json!(version_label);
355 if version_label != "$LATEST" {
356 config["FunctionArn"] = json!(format!("{}:{version_label}", live.function_arn));
357 config["MasterArn"] = json!(live.function_arn);
358 }
359 ok(config)
360 })
361 }
362
363 fn update_function_configuration(
364 &self,
365 function_name: &str,
366 req: &AwsRequest,
367 ) -> Result<AwsResponse, AwsServiceError> {
368 let body = body(req);
369 let validated_ephemeral = match body["EphemeralStorage"]["Size"].as_i64() {
373 Some(size) => Some(crate::service::validate_ephemeral_storage(size)?),
374 None => None,
375 };
376 let mut accounts = self.state.write();
377 let layer_attachments: Option<Vec<AttachedLayer>> = body["Layers"].as_array().map(|arr| {
380 let arns: Vec<String> = arr
381 .iter()
382 .filter_map(|v| v.as_str().map(String::from))
383 .collect();
384 resolve_layer_attachments(&accounts, arns)
385 });
386 let state = accounts.get_or_create(&req.account_id);
387 let func = state
388 .functions
389 .get_mut(function_name)
390 .ok_or_else(|| not_found("Function", function_name))?;
391 if let Some(handler) = body["Handler"].as_str() {
392 func.handler = handler.to_string();
393 }
394 if let Some(t) = body["Timeout"].as_i64() {
395 func.timeout = t;
396 }
397 if let Some(m) = body["MemorySize"].as_i64() {
398 func.memory_size = m;
399 }
400 if let Some(role) = body["Role"].as_str() {
401 func.role = role.to_string();
402 }
403 if let Some(desc) = body["Description"].as_str() {
404 func.description = desc.to_string();
405 }
406 if let Some(rt) = body["Runtime"].as_str() {
407 func.runtime = rt.to_string();
408 }
409 if let Some(env) = body["Environment"]["Variables"].as_object() {
410 func.environment = env
411 .iter()
412 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
413 .collect();
414 }
415 if let Some(mode) = body["TracingConfig"]["Mode"].as_str() {
416 func.tracing_mode = Some(mode.to_string());
417 }
418 if let Some(arn) = body["KMSKeyArn"].as_str() {
419 func.kms_key_arn = if arn.is_empty() {
420 None
421 } else {
422 Some(arn.to_string())
423 };
424 }
425 if let Some(size) = validated_ephemeral {
426 func.ephemeral_storage_size = Some(size);
427 }
428 if body["VpcConfig"].is_object() {
429 func.vpc_config = Some(body["VpcConfig"].clone());
430 }
431 if body["SnapStart"].is_object() {
432 func.snap_start = Some(body["SnapStart"].clone());
433 }
434 if let Some(arn) = body["DeadLetterConfig"]["TargetArn"].as_str() {
435 func.dead_letter_config_arn = if arn.is_empty() {
436 None
437 } else {
438 Some(arn.to_string())
439 };
440 }
441 if let Some(fsc) = body["FileSystemConfigs"].as_array() {
442 func.file_system_configs = fsc.clone();
443 }
444 if body["LoggingConfig"].is_object() {
445 func.logging_config = Some(body["LoggingConfig"].clone());
446 }
447 if body["ImageConfig"].is_object() {
448 func.image_config = Some(body["ImageConfig"].clone());
449 }
450 if body["DurableConfig"].is_object() {
451 func.durable_config = Some(body["DurableConfig"].clone());
452 }
453 if let Some(attachments) = layer_attachments {
454 func.layers = attachments;
455 }
456 func.revision_id = uuid::Uuid::new_v4().to_string();
460 func.last_modified = Utc::now();
461 ok(self.function_config_json(func))
462 }
463
464 fn update_function_code(
465 &self,
466 function_name: &str,
467 req: &AwsRequest,
468 ) -> Result<AwsResponse, AwsServiceError> {
469 let body: serde_json::Value = serde_json::from_slice(&req.body).unwrap_or_default();
470
471 let new_zip: Option<Vec<u8>> = match body["ZipFile"].as_str() {
476 Some(b64) => Some(
477 base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
478 |_| {
479 AwsServiceError::aws_error(
480 StatusCode::BAD_REQUEST,
481 "InvalidParameterValueException",
482 "Could not decode ZipFile: invalid base64",
483 )
484 },
485 )?,
486 ),
487 None => None,
488 };
489 let new_image_uri = body["ImageUri"].as_str().map(String::from);
490 let s3_fetched_zip: Option<Vec<u8>> = match (
503 body["S3Bucket"].as_str(),
504 body["S3Key"].as_str(),
505 ) {
506 (Some(bucket), Some(key)) if new_zip.is_none() && new_image_uri.is_none() => {
507 if let Some(s3) = &self.s3_delivery {
508 match s3.get_object(&req.account_id, bucket, key) {
509 Ok(bytes) => Some(bytes),
510 Err(e) => {
511 return Err(AwsServiceError::aws_error(
512 StatusCode::BAD_REQUEST,
513 "InvalidParameterValueException",
514 format!("Error occurred while GetObject. S3 Error Code: NoSuchKey. S3 Error Message: {e}"),
515 ));
516 }
517 }
518 } else {
519 None
520 }
521 }
522 _ => None,
523 };
524
525 let new_s3_descriptor: Option<Vec<u8>> =
526 match (body["S3Bucket"].as_str(), body["S3Key"].as_str()) {
527 (Some(bucket), Some(key))
528 if new_zip.is_none() && new_image_uri.is_none() && s3_fetched_zip.is_none() =>
529 {
530 let mut descriptor = serde_json::Map::new();
531 descriptor.insert("S3Bucket".to_string(), Value::String(bucket.to_string()));
532 descriptor.insert("S3Key".to_string(), Value::String(key.to_string()));
533 if let Some(ver) = body["S3ObjectVersion"].as_str() {
534 descriptor.insert(
535 "S3ObjectVersion".to_string(),
536 Value::String(ver.to_string()),
537 );
538 }
539 Some(serde_json::to_vec(&Value::Object(descriptor)).unwrap_or_default())
540 }
541 _ => None,
542 };
543 let new_zip = new_zip.or(s3_fetched_zip);
544 let supplied_signing_profile = body["SigningProfileVersionArn"].as_str().map(String::from);
545 let supplied_revision_id = body["RevisionId"].as_str().map(String::from);
546 let new_architectures: Option<Vec<String>> = body["Architectures"].as_array().map(|arr| {
547 arr.iter()
548 .filter_map(|v| v.as_str().map(String::from))
549 .collect()
550 });
551 let dry_run = body["DryRun"].as_bool().unwrap_or(false);
552 let publish = body["Publish"].as_bool().unwrap_or(false);
553
554 let mut accounts = self.state.write();
555 let state = accounts.get_or_create(&req.account_id);
556
557 if !state.functions.contains_key(function_name) {
561 return Err(not_found("Function", function_name));
562 }
563
564 if let Some(csc_arn) = state.function_code_signing.get(function_name).cloned() {
569 let csc_id = extract_csc_id(&csc_arn);
570 if let Some(csc) = state.code_signing_configs.get(&csc_id).cloned() {
571 if !csc.allowed_publishers.is_empty()
572 && csc
573 .untrusted_artifact_action
574 .eq_ignore_ascii_case("Enforce")
575 {
576 let allowed = match supplied_signing_profile.as_deref() {
577 Some(arn) => csc.allowed_publishers.iter().any(|p| p == arn),
578 None => false,
579 };
580 if !allowed {
581 return Err(AwsServiceError::aws_error(
582 StatusCode::BAD_REQUEST,
583 "CodeVerificationFailedException",
584 "The code signature failed the integrity check or the signing profile is not in the allowed publishers list.",
585 ));
586 }
587 }
588 }
589 }
590
591 let func = state
592 .functions
593 .get_mut(function_name)
594 .ok_or_else(|| not_found("Function", function_name))?;
595
596 if let Some(ref rev) = supplied_revision_id {
600 if rev != &func.revision_id {
601 return Err(AwsServiceError::aws_error(
602 StatusCode::PRECONDITION_FAILED,
603 "PreconditionFailedException",
604 format!(
605 "The Revision Id provided: {rev} does not match the latest Revision Id of function: {function_name}. Call the GetFunction/GetAlias API to retrieve the latest Revision Id"
606 ),
607 ));
608 }
609 }
610
611 if dry_run {
613 return ok(self.function_config_json(func));
614 }
615
616 let mut changed = false;
617 if let Some(bytes) = new_zip {
618 let mut hasher = Sha256::new();
621 hasher.update(&bytes);
622 let hash = hasher.finalize();
623 let code_sha256 =
624 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
625 if code_sha256 != func.code_sha256 {
626 changed = true;
627 }
628 func.code_size = bytes.len() as i64;
629 func.code_zip = Some(bytes);
630 func.code_sha256 = code_sha256;
631 func.image_uri = None;
632 func.package_type = "Zip".to_string();
633 } else if let Some(descriptor_bytes) = new_s3_descriptor {
634 let mut hasher = Sha256::new();
641 hasher.update(&descriptor_bytes);
642 let hash = hasher.finalize();
643 let code_sha256 =
644 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
645 if code_sha256 != func.code_sha256 {
646 changed = true;
647 }
648 func.code_size = descriptor_bytes.len() as i64;
649 func.code_zip = None;
653 func.code_sha256 = code_sha256;
654 func.image_uri = None;
655 func.package_type = "Zip".to_string();
656 } else if let Some(uri) = new_image_uri {
657 if func.image_uri.as_deref() != Some(uri.as_str()) {
658 changed = true;
659 }
660 func.image_uri = Some(uri);
661 func.code_zip = None;
662 func.package_type = "Image".to_string();
663 func.code_size = 0;
667 func.code_sha256 = String::new();
668 }
669
670 if let Some(arns) = new_architectures {
671 if !arns.is_empty() && arns != func.architectures {
672 changed = true;
673 func.architectures = arns;
674 }
675 }
676
677 if let Some(arn) = supplied_signing_profile {
678 if func.signing_profile_version_arn.as_deref() != Some(arn.as_str()) {
679 changed = true;
680 }
681 func.signing_profile_version_arn = Some(arn);
682 }
683
684 func.last_modified = Utc::now();
689 if changed {
690 func.revision_id = uuid::Uuid::new_v4().to_string();
691 }
692 func.last_update_status_reason = None;
696 func.last_update_status_reason_code = None;
697
698 if publish {
701 drop(accounts);
702 return self.publish_version(function_name, &req.account_id, req);
703 }
704
705 ok(self.function_config_json(func))
706 }
707
708 fn list_versions_by_function(
711 &self,
712 function_name: &str,
713 account_id: &str,
714 req: &AwsRequest,
715 ) -> Result<AwsResponse, AwsServiceError> {
716 let region = self.region_for(account_id);
717 let max_items: usize = req
718 .query_params
719 .get("MaxItems")
720 .and_then(|v| v.parse::<usize>().ok())
721 .map(|n| n.clamp(1, 50))
722 .unwrap_or(50);
723 let marker = req.query_params.get("Marker").cloned();
724 self.with_state_read(account_id, ®ion, |state| {
725 let func = state
726 .functions
727 .get(function_name)
728 .ok_or_else(|| not_found("Function", function_name))?;
729 let mut all: Vec<serde_json::Value> = Vec::new();
733 let mut latest = self.function_config_json(func);
734 latest["Version"] = json!("$LATEST");
735 latest["FunctionArn"] = json!(format!("{}:$LATEST", func.function_arn));
740 all.push(latest);
741 let snapshots = state.function_version_snapshots.get(function_name);
742 if let Some(numbered) = state.function_versions.get(function_name) {
743 for v in numbered {
744 let snap = snapshots.and_then(|m| m.get(v)).unwrap_or(func);
745 let mut cfg = self.function_config_json(snap);
746 cfg["Version"] = json!(v);
747 cfg["FunctionArn"] = json!(format!("{}:{v}", func.function_arn));
748 cfg["MasterArn"] = json!(func.function_arn);
749 all.push(cfg);
750 }
751 }
752 let start = match marker.as_deref() {
756 Some(m) => all
757 .iter()
758 .position(|v| v["Version"].as_str() == Some(m))
759 .map(|i| i + 1)
760 .unwrap_or(0),
761 None => 0,
762 };
763 let end = (start + max_items).min(all.len());
764 let page: Vec<serde_json::Value> = all[start..end].to_vec();
765 let mut body = json!({ "Versions": page });
766 if end < all.len() {
767 if let Some(last) = all[end - 1]["Version"].as_str() {
768 body["NextMarker"] = json!(last);
769 }
770 }
771 ok(body)
772 })
773 }
774
/// Builds the `<function>:<qualifier>` map key for a function/qualifier pair.
fn pc_key(function: &str, qualifier: &str) -> String {
    [function, qualifier].join(":")
}
778
779 fn tag_resource(
782 &self,
783 resource_arn: &str,
784 req: &AwsRequest,
785 ) -> Result<AwsResponse, AwsServiceError> {
786 let body = body(req);
787 let new_tags: Vec<(String, String)> = body
788 .get("Tags")
789 .and_then(|v| v.as_object())
790 .map(|m| {
791 m.iter()
792 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
793 .collect()
794 })
795 .unwrap_or_default();
796 let resource_arn_decoded = decode_query_segment(resource_arn);
799 let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
800 AwsServiceError::aws_error(
801 StatusCode::BAD_REQUEST,
802 "InvalidParameterValueException",
803 format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
804 )
805 })?;
806 let mut accounts = self.state.write();
807 let state = accounts.get_or_create(&req.account_id);
808 let func = state.functions.get_mut(&name).ok_or_else(|| {
809 AwsServiceError::aws_error(
810 StatusCode::NOT_FOUND,
811 "ResourceNotFoundException",
812 format!("Function not found: {name}"),
813 )
814 })?;
815 for (k, v) in new_tags {
818 func.tags.insert(k, v);
819 }
820 empty()
821 }
822
823 fn untag_resource(
824 &self,
825 resource_arn: &str,
826 req: &AwsRequest,
827 ) -> Result<AwsResponse, AwsServiceError> {
828 let mut keys: Vec<String> = Vec::new();
841 for (k, v) in parse_query_pairs(&req.raw_query) {
842 if k == "tagKeys" || k.starts_with("tagKeys.") {
843 keys.push(v);
844 }
845 }
846 if keys.is_empty() {
847 let parsed = body(req);
848 for field in ["TagKeys", "tagKeys"] {
849 if let Some(arr) = parsed.get(field).and_then(|v| v.as_array()) {
850 for v in arr {
851 if let Some(s) = v.as_str() {
852 keys.push(s.to_string());
853 }
854 }
855 if !keys.is_empty() {
856 break;
857 }
858 }
859 }
860 }
861 let resource_arn_decoded = decode_query_segment(resource_arn);
862 let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
863 AwsServiceError::aws_error(
864 StatusCode::BAD_REQUEST,
865 "InvalidParameterValueException",
866 format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
867 )
868 })?;
869 let mut accounts = self.state.write();
870 let state = accounts.get_or_create(&req.account_id);
871 let func = state.functions.get_mut(&name).ok_or_else(|| {
872 AwsServiceError::aws_error(
873 StatusCode::NOT_FOUND,
874 "ResourceNotFoundException",
875 format!("Function not found: {name}"),
876 )
877 })?;
878 for k in &keys {
879 func.tags.remove(k);
880 }
881 empty()
882 }
883
884 fn list_tags(
885 &self,
886 resource_arn: &str,
887 account_id: &str,
888 ) -> Result<AwsResponse, AwsServiceError> {
889 let resource_arn_decoded = decode_query_segment(resource_arn);
890 let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
891 AwsServiceError::aws_error(
892 StatusCode::BAD_REQUEST,
893 "InvalidParameterValueException",
894 format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
895 )
896 })?;
897 let region = self.region_for(account_id);
898 self.with_state_read(account_id, ®ion, |state| {
899 let func = state.functions.get(&name).ok_or_else(|| {
900 AwsServiceError::aws_error(
901 StatusCode::NOT_FOUND,
902 "ResourceNotFoundException",
903 format!("Function not found: {name}"),
904 )
905 })?;
906 let tags: serde_json::Map<String, Value> = func
907 .tags
908 .iter()
909 .map(|(k, v)| (k.clone(), Value::String(v.clone())))
910 .collect();
911 ok(json!({"Tags": tags}))
912 })
913 }
914
915 fn update_event_source_mapping_handler(
918 &self,
919 uuid: &str,
920 req: &AwsRequest,
921 ) -> Result<AwsResponse, AwsServiceError> {
922 let body = body(req);
923 let mut accounts = self.state.write();
924 let state = accounts.get_or_create(&req.account_id);
925 let esm = state
926 .event_source_mappings
927 .get_mut(uuid)
928 .ok_or_else(|| not_found("EventSourceMapping", uuid))?;
929 if let Some(b) = body["BatchSize"].as_i64() {
930 esm.batch_size = b;
931 }
932 if let Some(name) = body["FunctionName"].as_str() {
933 esm.function_arn = format!(
934 "arn:aws:lambda:{}:{}:function:{}",
935 state.region, state.account_id, name
936 );
937 }
938 if let Some(filters) = body
939 .get("FilterCriteria")
940 .and_then(|v| v.get("Filters"))
941 .and_then(|v| v.as_array())
942 {
943 esm.filter_patterns = filters
944 .iter()
945 .filter_map(|f| f.get("Pattern").and_then(|p| p.as_str()).map(String::from))
946 .collect();
947 }
948 if let Some(types) = body.get("FunctionResponseTypes").and_then(|v| v.as_array()) {
949 esm.function_response_types = types
950 .iter()
951 .filter_map(|v| v.as_str().map(String::from))
952 .collect();
953 }
954 if let Some(w) = body
955 .get("MaximumBatchingWindowInSeconds")
956 .and_then(|v| v.as_i64())
957 {
958 esm.maximum_batching_window_in_seconds = Some(w);
959 }
960 if let Some(p) = body.get("ParallelizationFactor").and_then(|v| v.as_i64()) {
961 esm.parallelization_factor = Some(p);
962 }
963 if let Some(s) = body.get("KMSKeyArn").and_then(|v| v.as_str()) {
964 esm.kms_key_arn = Some(s.to_string());
965 }
966 if let Some(mc) = body.get("MetricsConfig") {
967 esm.metrics_config = Some(mc.clone());
968 }
969 if let Some(dc) = body.get("DestinationConfig") {
970 esm.destination_config = Some(dc.clone());
971 }
972 if let Some(n) = body.get("MaximumRetryAttempts").and_then(|v| v.as_i64()) {
973 esm.maximum_retry_attempts = Some(n);
974 }
975 if let Some(n) = body
976 .get("MaximumRecordAgeInSeconds")
977 .and_then(|v| v.as_i64())
978 {
979 esm.maximum_record_age_in_seconds = Some(n);
980 }
981 if let Some(b) = body
982 .get("BisectBatchOnFunctionError")
983 .and_then(|v| v.as_bool())
984 {
985 esm.bisect_batch_on_function_error = Some(b);
986 }
987 if let Some(n) = body.get("TumblingWindowInSeconds").and_then(|v| v.as_i64()) {
988 esm.tumbling_window_in_seconds = Some(n);
989 }
990 if let Some(enabled) = body.get("Enabled").and_then(|v| v.as_bool()) {
995 esm.enabled = enabled;
996 esm.state = if enabled { "Enabled" } else { "Disabled" }.to_string();
997 }
998 if let Some(sac) = body
1002 .get("SourceAccessConfigurations")
1003 .and_then(|v| v.as_array())
1004 {
1005 esm.source_access_configurations = sac.clone();
1006 }
1007 esm.last_modified = chrono::Utc::now();
1008 let response = self.event_source_mapping_json(esm);
1012 ok(response)
1013 }
1014
1015 fn region_for(&self, account_id: &str) -> String {
1016 let accounts = self.state.read();
1017 accounts
1018 .get(account_id)
1019 .map(|s| s.region.clone())
1020 .unwrap_or_else(|| "us-east-1".to_string())
1021 }
1022}
1023
1024fn extract_csc_id(input: &str) -> String {
1025 let decoded = percent_decode(input);
1028 decoded.rsplit(':').next().unwrap_or(&decoded).to_string()
1029}
1030
1031pub(crate) fn percent_decode_for_length(input: &str) -> String {
1034 percent_decode(input)
1035}
1036
/// Decodes `%XX` escapes in `input`.
///
/// A `%` that is not followed by two hex digits is kept literally, as are all
/// other bytes. Decoding works on bytes and the result is interpreted as
/// UTF-8, so multi-byte characters survive whether they arrive escaped
/// (`%C3%A9`) or literally (`é`). Bytes that do not form valid UTF-8 after
/// decoding are replaced with U+FFFD.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' && i + 2 < bytes.len() {
            let hi = (bytes[i + 1] as char).to_digit(16);
            let lo = (bytes[i + 2] as char).to_digit(16);
            if let (Some(h), Some(l)) = (hi, lo) {
                // Two hex digits are at most 0xFF, so the cast cannot truncate.
                decoded.push((h * 16 + l) as u8);
                i += 3;
                continue;
            }
        }
        decoded.push(bytes[i]);
        i += 1;
    }
    // Interpret the byte stream as UTF-8 instead of mapping each byte to a
    // Latin-1 code point, which would turn `%C3%A9` into "Ã©".
    String::from_utf8_lossy(&decoded).into_owned()
}
1056
1057fn code_signing_json(c: &CodeSigningConfig) -> Value {
1058 json!({
1059 "CodeSigningConfigId": c.csc_id,
1060 "CodeSigningConfigArn": c.csc_arn,
1061 "Description": c.description,
1062 "AllowedPublishers": {
1063 "SigningProfileVersionArns": c.allowed_publishers,
1064 },
1065 "CodeSigningPolicies": {
1066 "UntrustedArtifactOnDeployment": c.untrusted_artifact_action,
1067 },
1068 "LastModified": c.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1069 })
1070}
1071
1072fn event_invoke_json(c: &EventInvokeConfig) -> Value {
1073 let destination = match &c.destination_config {
1084 None => json!({"OnSuccess": {}, "OnFailure": {}}),
1085 Some(v) if !v.is_object() => json!({}),
1086 Some(v) => {
1087 let mut map = v.as_object().cloned().unwrap_or_default();
1088 if !map.is_empty() {
1089 map.entry("OnSuccess".to_string()).or_insert(json!({}));
1090 map.entry("OnFailure".to_string()).or_insert(json!({}));
1091 }
1092 Value::Object(map)
1093 }
1094 };
1095 let mut out = json!({
1096 "FunctionArn": c.function_arn,
1097 "MaximumRetryAttempts": c.maximum_retry_attempts,
1098 "DestinationConfig": destination,
1099 "LastModified": c
1105 .last_modified
1106 .timestamp_millis() as f64
1107 / 1000.0,
1108 });
1109 if c.maximum_event_age != 0 {
1111 out["MaximumEventAgeInSeconds"] = json!(c.maximum_event_age);
1112 }
1113 out
1114}
1115
#[cfg(test)]
mod tests {
    use crate::service::LambdaService;
    use crate::state::{LambdaState, SharedLambdaState};
    use fakecloud_core::multi_account::MultiAccountState;
    use fakecloud_core::service::AwsRequest;
    use http::Method;
    use parking_lot::RwLock;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// A service backed by empty state for account `000000000000` in `us-east-1`.
    fn svc() -> LambdaService {
        let accounts = MultiAccountState::<LambdaState>::new("000000000000", "us-east-1", "");
        let state: SharedLambdaState = Arc::new(RwLock::new(accounts));
        LambdaService::new(state)
    }

    /// A POST request for `action` with the given body and path segments.
    fn req(action: &str, body: &str, segs: &[&str]) -> AwsRequest {
        let path_segments: Vec<String> = segs.iter().map(|s| s.to_string()).collect();
        let raw_path = format!("/{}", segs.join("/"));
        AwsRequest {
            service: "lambda".to_string(),
            method: Method::POST,
            raw_path,
            raw_query: String::new(),
            path_segments,
            query_params: HashMap::new(),
            headers: http::HeaderMap::new(),
            body: bytes::Bytes::from(body.to_string()),
            body_stream: parking_lot::Mutex::new(None),
            account_id: "000000000000".to_string(),
            region: "us-east-1".to_string(),
            request_id: "rid".to_string(),
            action: action.to_string(),
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Dispatches `action` through `handle_extra` and asserts a 2xx response.
    async fn run(s: &LambdaService, action: &str, body: &str, res: Option<&str>, segs: &[&str]) {
        let request = req(action, body, segs);
        let resp = s
            .handle_extra(action, res, &request)
            .await
            .unwrap_or_else(|e| panic!("{action} failed: {e:?}"));
        assert!(resp.status.is_success(), "{action} status: {}", resp.status);
    }

    #[tokio::test]
    async fn read_only_listings_succeed_without_state() {
        let s = svc();
        let calls = [
            ("GetAccountSettings", "", None),
            ("InvokeAsync", r#"{}"#, Some("fn")),
            ("ListLayers", "", None),
            ("ListLayerVersions", "", Some("layer")),
        ];
        for (action, body, resource) in calls {
            run(&s, action, body, resource, &[]).await;
        }
    }

    #[tokio::test]
    async fn layers_lifecycle() {
        let s = svc();
        let publish_path = ["2018-10-31", "layers", "layer1", "versions"];
        run(
            &s,
            "PublishLayerVersion",
            r#"{"Content":{"ZipFile":""}}"#,
            Some("layer1"),
            &publish_path,
        )
        .await;
        for (action, resource) in [("ListLayers", None), ("ListLayerVersions", Some("layer1"))] {
            run(&s, action, "", resource, &[]).await;
        }
    }

    #[tokio::test]
    async fn code_signing_lifecycle() {
        let s = svc();
        let create_body = r#"{"AllowedPublishers":{"SigningProfileVersionArns":[]}}"#;
        run(&s, "CreateCodeSigningConfig", create_body, None, &[]).await;
        run(&s, "ListCodeSigningConfigs", "", None, &[]).await;
    }
}