1use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use sha2::{Digest, Sha256};
9
10use fakecloud_aws::arn::Arn;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
12
13use crate::service::LambdaService;
14use crate::state::{
15 AccountSettings, AttachedLayer, CodeSigningConfig, EventInvokeConfig, FunctionAlias,
16 FunctionScalingConfig, FunctionUrlConfig, LambdaState, Layer, LayerVersion,
17 ProvisionedConcurrencyConfig, RuntimeManagementConfig,
18};
19
20pub(crate) fn resolve_layer_attachments(
25 accounts: &fakecloud_core::multi_account::MultiAccountState<LambdaState>,
26 arns: Vec<String>,
27) -> Vec<AttachedLayer> {
28 arns.into_iter()
29 .map(|arn| {
30 let code_size = parse_layer_version_arn(&arn)
31 .and_then(|(acct, name, ver)| {
32 accounts
33 .get(&acct)
34 .and_then(|s| s.layers.get(&name))
35 .and_then(|l| l.versions.iter().find(|v| v.version == ver))
36 .map(|v| v.code_size)
37 })
38 .unwrap_or(0);
39 AttachedLayer { arn, code_size }
40 })
41 .collect()
42}
43
44fn missing(name: &str) -> AwsServiceError {
45 AwsServiceError::aws_error(
46 StatusCode::BAD_REQUEST,
47 "InvalidParameterValueException",
48 format!("Missing required field: {name}"),
49 )
50}
51
52fn not_found(entity: &str, name: &str) -> AwsServiceError {
53 AwsServiceError::aws_error(
54 StatusCode::NOT_FOUND,
55 "ResourceNotFoundException",
56 format!("{entity} not found: {name}"),
57 )
58}
59
60fn ok(body: Value) -> Result<AwsResponse, AwsServiceError> {
61 Ok(AwsResponse::json(StatusCode::OK, body.to_string()))
62}
63
64fn empty() -> Result<AwsResponse, AwsServiceError> {
65 Ok(AwsResponse::json(StatusCode::OK, "{}".to_string()))
66}
67
68fn body(req: &AwsRequest) -> Value {
69 serde_json::from_slice(&req.body).unwrap_or_else(|_| Value::Object(Default::default()))
70}
71
/// Extracts the bare function name from a Lambda function ARN.
///
/// Accepts `arn:aws:lambda:<region>:<account>:function:<name>` with an
/// optional trailing `:<qualifier>`, which is discarded. Returns `None` when
/// the prefix does not match, the resource kind is not `function`, or the
/// name segment is missing altogether.
fn function_name_from_arn(arn: &str) -> Option<String> {
    let tail = arn.strip_prefix("arn:aws:lambda:")?;
    let mut fields = tail.split(':');
    let _region = fields.next()?;
    let _account = fields.next()?;
    match fields.next()? {
        // The segment right after `function` is the name; anything further is
        // a version or alias qualifier.
        "function" => fields.next().map(str::to_owned),
        _ => None,
    }
}
96
97fn parse_query_pairs(raw_query: &str) -> Vec<(String, String)> {
104 raw_query
105 .split('&')
106 .filter(|s| !s.is_empty())
107 .map(|pair| {
108 let mut it = pair.splitn(2, '=');
109 let k = it.next().unwrap_or("");
110 let v = it.next().unwrap_or("");
111 (decode_query_segment(k), decode_query_segment(v))
112 })
113 .collect()
114}
115
116fn decode_query_segment(s: &str) -> String {
117 let plus_decoded = s.replace('+', " ");
120 percent_encoding::percent_decode_str(&plus_decoded)
121 .decode_utf8_lossy()
122 .into_owned()
123}
124
125fn layer_content_url(req: &AwsRequest, account_id: &str, layer_name: &str, version: i64) -> String {
130 let host = req
131 .headers
132 .get(http::header::HOST)
133 .and_then(|h| h.to_str().ok())
134 .unwrap_or("localhost");
135 let scheme = req
136 .headers
137 .get("x-forwarded-proto")
138 .and_then(|h| h.to_str().ok())
139 .unwrap_or("http");
140 format!(
141 "{scheme}://{host}/_fakecloud/lambda/layer-content/{account_id}/{layer_name}/{version}.zip"
142 )
143}
144
145pub(crate) fn function_code_url(
150 req: &AwsRequest,
151 account_id: &str,
152 function_name: &str,
153 version_label: &str,
154) -> String {
155 let host = req
156 .headers
157 .get(http::header::HOST)
158 .and_then(|h| h.to_str().ok())
159 .unwrap_or("localhost");
160 let scheme = req
161 .headers
162 .get("x-forwarded-proto")
163 .and_then(|h| h.to_str().ok())
164 .unwrap_or("http");
165 let file = if version_label == "$LATEST" {
166 "latest.zip".to_string()
167 } else {
168 format!("{version_label}.zip")
169 };
170 format!("{scheme}://{host}/_fakecloud/lambda/function-code/{account_id}/{function_name}/{file}")
171}
172
/// Parses a layer-version ARN into `(account_id, layer_name, version)`.
///
/// The ARN must have exactly eight colon-separated fields shaped like
/// `arn:<partition>:lambda:<region>:<account>:layer:<name>:<version>`, and the
/// version must parse as an `i64`. Anything else yields `None`.
pub fn parse_layer_version_arn(arn: &str) -> Option<(String, String, i64)> {
    let fields: Vec<&str> = arn.split(':').collect();
    match fields.as_slice() {
        ["arn", _partition, "lambda", _region, account, "layer", name, version] => {
            let version = version.parse::<i64>().ok()?;
            Some((account.to_string(), name.to_string(), version))
        }
        _ => None,
    }
}
186
/// Runtime identifiers accepted as a `CompatibleRuntime` filter value.
///
/// `validate_layer_filters` rejects any `CompatibleRuntime` query parameter
/// that is not an exact, case-sensitive match for one of these entries.
const LAMBDA_RUNTIMES: &[&str] = &[
    "nodejs",
    "nodejs4.3",
    "nodejs6.10",
    "nodejs8.10",
    "nodejs10.x",
    "nodejs12.x",
    "nodejs14.x",
    "nodejs16.x",
    "nodejs18.x",
    "nodejs20.x",
    "nodejs22.x",
    "nodejs24.x",
    "nodejs4.3-edge",
    "java8",
    "java8.al2",
    "java11",
    "java17",
    "java21",
    "java25",
    "python2.7",
    "python3.6",
    "python3.7",
    "python3.8",
    "python3.9",
    "python3.10",
    "python3.11",
    "python3.12",
    "python3.13",
    "python3.14",
    "dotnetcore1.0",
    "dotnetcore2.0",
    "dotnetcore2.1",
    "dotnetcore3.1",
    "dotnet6",
    "dotnet8",
    "dotnet10",
    "go1.x",
    "ruby2.5",
    "ruby2.7",
    "ruby3.2",
    "ruby3.3",
    "ruby3.4",
    "provided",
    "provided.al2",
    "provided.al2023",
];
237
238fn validate_layer_filters(req: &AwsRequest) -> Result<(), AwsServiceError> {
241 if let Some(arch) = req.query_params.get("CompatibleArchitecture") {
242 if arch != "x86_64" && arch != "arm64" {
243 return Err(AwsServiceError::aws_error(
244 StatusCode::BAD_REQUEST,
245 "InvalidParameterValueException",
246 format!(
247 "Invalid CompatibleArchitecture value '{}'; expected 'x86_64' or 'arm64'",
248 arch
249 ),
250 ));
251 }
252 }
253 if let Some(rt) = req.query_params.get("CompatibleRuntime") {
254 if !LAMBDA_RUNTIMES.contains(&rt.as_str()) {
255 return Err(AwsServiceError::aws_error(
256 StatusCode::BAD_REQUEST,
257 "InvalidParameterValueException",
258 format!("Invalid CompatibleRuntime value '{}'", rt),
259 ));
260 }
261 }
262 Ok(())
263}
264
265fn parse_qualifier(req: &AwsRequest) -> String {
266 req.query_params
267 .get("Qualifier")
268 .cloned()
269 .unwrap_or_else(|| "$LATEST".to_string())
270}
271
272fn require_qualifier(req: &AwsRequest) -> Result<String, AwsServiceError> {
277 req.query_params.get("Qualifier").cloned().ok_or_else(|| {
278 AwsServiceError::aws_error(
279 StatusCode::BAD_REQUEST,
280 "InvalidParameterValueException",
281 "Qualifier is required for this operation",
282 )
283 })
284}
285
/// Produces an identifier made of `prefix` followed by the current Unix time
/// in nanoseconds.
///
/// If the system clock reports a time before the Unix epoch the numeric part
/// is `0`.
fn id_from_time(prefix: &str) -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    format!("{}{}", prefix, nanos)
}
296
297mod account;
298mod aliases;
299mod code_signing;
300mod concurrency;
301mod event_invoke;
302mod function_url;
303mod layers;
304mod recursion;
305mod runtime;
306mod stream;
307
308impl LambdaService {
309 fn with_state_read<F, R>(&self, account_id: &str, region: &str, f: F) -> R
310 where
311 F: FnOnce(&LambdaState) -> R,
312 {
313 let accounts = self.state.read();
314 let empty = LambdaState::new(account_id, region);
315 let state = accounts.get(account_id).unwrap_or(&empty);
316 f(state)
317 }
318
319 fn get_function_configuration(
322 &self,
323 function_name: &str,
324 account_id: &str,
325 req: &AwsRequest,
326 ) -> Result<AwsResponse, AwsServiceError> {
327 let region = self.region_for(account_id);
328 let qualifier = req.query_params.get("Qualifier").cloned();
329 self.with_state_read(account_id, ®ion, |state| {
330 let live = state
331 .functions
332 .get(function_name)
333 .ok_or_else(|| not_found("Function", function_name))?;
334 let resolved = crate::service::resolve_qualifier_to_version(
338 state,
339 function_name,
340 qualifier.as_deref(),
341 );
342 let (func, version_label) = match resolved {
343 None => (live, "$LATEST".to_string()),
344 Some(v) => {
345 let snap = state
346 .function_version_snapshots
347 .get(function_name)
348 .and_then(|m| m.get(&v))
349 .ok_or_else(|| not_found("Function", function_name))?;
350 (snap, v)
351 }
352 };
353 let mut config = self.function_config_json(func);
354 config["Version"] = json!(version_label);
355 if version_label != "$LATEST" {
356 config["FunctionArn"] = json!(format!("{}:{version_label}", live.function_arn));
357 config["MasterArn"] = json!(live.function_arn);
358 }
359 ok(config)
360 })
361 }
362
363 fn update_function_configuration(
364 &self,
365 function_name: &str,
366 req: &AwsRequest,
367 ) -> Result<AwsResponse, AwsServiceError> {
368 let body = body(req);
369 let validated_ephemeral = match body["EphemeralStorage"]["Size"].as_i64() {
373 Some(size) => Some(crate::service::validate_ephemeral_storage(size)?),
374 None => None,
375 };
376 let mut accounts = self.state.write();
377 let layer_attachments: Option<Vec<AttachedLayer>> = body["Layers"].as_array().map(|arr| {
380 let arns: Vec<String> = arr
381 .iter()
382 .filter_map(|v| v.as_str().map(String::from))
383 .collect();
384 resolve_layer_attachments(&accounts, arns)
385 });
386 let state = accounts.get_or_create(&req.account_id);
387 let func = state
388 .functions
389 .get_mut(function_name)
390 .ok_or_else(|| not_found("Function", function_name))?;
391 if let Some(handler) = body["Handler"].as_str() {
392 func.handler = handler.to_string();
393 }
394 if let Some(t) = body["Timeout"].as_i64() {
395 func.timeout = t;
396 }
397 if let Some(m) = body["MemorySize"].as_i64() {
398 func.memory_size = m;
399 }
400 if let Some(role) = body["Role"].as_str() {
401 func.role = role.to_string();
402 }
403 if let Some(desc) = body["Description"].as_str() {
404 func.description = desc.to_string();
405 }
406 if let Some(rt) = body["Runtime"].as_str() {
407 func.runtime = rt.to_string();
408 }
409 if let Some(env) = body["Environment"]["Variables"].as_object() {
410 func.environment = env
411 .iter()
412 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
413 .collect();
414 }
415 if let Some(mode) = body["TracingConfig"]["Mode"].as_str() {
416 func.tracing_mode = Some(mode.to_string());
417 }
418 if let Some(arn) = body["KMSKeyArn"].as_str() {
419 func.kms_key_arn = if arn.is_empty() {
420 None
421 } else {
422 Some(arn.to_string())
423 };
424 }
425 if let Some(size) = validated_ephemeral {
426 func.ephemeral_storage_size = Some(size);
427 }
428 if body["VpcConfig"].is_object() {
429 func.vpc_config = Some(body["VpcConfig"].clone());
430 }
431 if body["SnapStart"].is_object() {
432 func.snap_start = Some(body["SnapStart"].clone());
433 }
434 if let Some(arn) = body["DeadLetterConfig"]["TargetArn"].as_str() {
435 func.dead_letter_config_arn = if arn.is_empty() {
436 None
437 } else {
438 Some(arn.to_string())
439 };
440 }
441 if let Some(fsc) = body["FileSystemConfigs"].as_array() {
442 func.file_system_configs = fsc.clone();
443 }
444 if body["LoggingConfig"].is_object() {
445 func.logging_config = Some(body["LoggingConfig"].clone());
446 }
447 if body["ImageConfig"].is_object() {
448 func.image_config = Some(body["ImageConfig"].clone());
449 }
450 if body["DurableConfig"].is_object() {
451 func.durable_config = Some(body["DurableConfig"].clone());
452 }
453 if let Some(attachments) = layer_attachments {
454 func.layers = attachments;
455 }
456 func.revision_id = uuid::Uuid::new_v4().to_string();
460 func.last_modified = Utc::now();
461 ok(self.function_config_json(func))
462 }
463
    /// Handles `UpdateFunctionCode`: replaces a function's code package and
    /// returns the resulting configuration JSON.
    ///
    /// The new code is taken from the first source present, in this order:
    /// inline `ZipFile` (base64), an object fetched via `S3Bucket`/`S3Key`
    /// when an S3 delivery backend is configured, an S3 descriptor (bucket,
    /// key, optional version) when no backend is configured, or `ImageUri`.
    /// `Architectures` and `SigningProfileVersionArn` are applied as well.
    ///
    /// Behaviours visible in this method:
    /// - `DryRun: true` returns the current configuration without mutating.
    /// - `RevisionId`, when supplied, must equal the stored revision id.
    /// - A code-signing config attached to the function with a non-empty
    ///   publisher list and an `Enforce` policy rejects requests whose
    ///   signing profile is missing or not listed.
    /// - `revision_id` is regenerated only if something actually changed;
    ///   `last_modified` is refreshed on every non-dry-run call.
    /// - `Publish: true` releases the write lock and delegates to
    ///   `publish_version`.
    ///
    /// # Errors
    ///
    /// `InvalidParameterValueException` (400) for undecodable base64 or a
    /// failed S3 fetch, `ResourceNotFoundException` for an unknown function,
    /// `CodeVerificationFailedException` (400) for a code-signing rejection,
    /// and `PreconditionFailedException` (412) for a revision id mismatch.
    fn update_function_code(
        &self,
        function_name: &str,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        // An unparseable body becomes the default `Value`, so every lookup
        // below simply finds nothing.
        let body: serde_json::Value = serde_json::from_slice(&req.body).unwrap_or_default();

        // Inline zip payload; malformed base64 is rejected up front.
        let new_zip: Option<Vec<u8>> = match body["ZipFile"].as_str() {
            Some(b64) => Some(
                base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
                    |_| {
                        AwsServiceError::aws_error(
                            StatusCode::BAD_REQUEST,
                            "InvalidParameterValueException",
                            "Could not decode ZipFile: invalid base64",
                        )
                    },
                )?,
            ),
            None => None,
        };
        let new_image_uri = body["ImageUri"].as_str().map(String::from);
        // S3 source: only consulted when neither an inline zip nor an image
        // URI was given, and only fetched when a delivery backend exists.
        let s3_fetched_zip: Option<Vec<u8>> = match (
            body["S3Bucket"].as_str(),
            body["S3Key"].as_str(),
        ) {
            (Some(bucket), Some(key)) if new_zip.is_none() && new_image_uri.is_none() => {
                if let Some(s3) = &self.s3_delivery {
                    match s3.get_object(&req.account_id, bucket, key) {
                        Ok(bytes) => Some(bytes),
                        Err(e) => {
                            // NOTE(review): the message always says `NoSuchKey`
                            // whatever the underlying error `e` is.
                            return Err(AwsServiceError::aws_error(
                                StatusCode::BAD_REQUEST,
                                "InvalidParameterValueException",
                                format!("Error occurred while GetObject. S3 Error Code: NoSuchKey. S3 Error Message: {e}"),
                            ));
                        }
                    }
                } else {
                    None
                }
            }
            _ => None,
        };

        // Without a delivery backend the S3 coordinates themselves are kept
        // as a JSON descriptor; its bytes stand in for the code when hashing
        // and sizing below.
        let new_s3_descriptor: Option<Vec<u8>> =
            match (body["S3Bucket"].as_str(), body["S3Key"].as_str()) {
                (Some(bucket), Some(key))
                    if new_zip.is_none() && new_image_uri.is_none() && s3_fetched_zip.is_none() =>
                {
                    let mut descriptor = serde_json::Map::new();
                    descriptor.insert("S3Bucket".to_string(), Value::String(bucket.to_string()));
                    descriptor.insert("S3Key".to_string(), Value::String(key.to_string()));
                    if let Some(ver) = body["S3ObjectVersion"].as_str() {
                        descriptor.insert(
                            "S3ObjectVersion".to_string(),
                            Value::String(ver.to_string()),
                        );
                    }
                    Some(serde_json::to_vec(&Value::Object(descriptor)).unwrap_or_default())
                }
                _ => None,
            };
        // From here on an S3-fetched archive is treated like an inline zip.
        let new_zip = new_zip.or(s3_fetched_zip);
        let supplied_signing_profile = body["SigningProfileVersionArn"].as_str().map(String::from);
        let supplied_revision_id = body["RevisionId"].as_str().map(String::from);
        let new_architectures: Option<Vec<String>> = body["Architectures"].as_array().map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_str().map(String::from))
                .collect()
        });
        let dry_run = body["DryRun"].as_bool().unwrap_or(false);
        let publish = body["Publish"].as_bool().unwrap_or(false);

        // All request parsing and the S3 fetch are done; only now is the
        // write lock taken.
        let mut accounts = self.state.write();
        let state = accounts.get_or_create(&req.account_id);

        // Existence is checked first so an unknown function reports
        // not-found rather than a code-signing failure.
        if !state.functions.contains_key(function_name) {
            return Err(not_found("Function", function_name));
        }

        // Code-signing enforcement. The config is cloned so the borrow of
        // `state` ends before the function is borrowed mutably.
        if let Some(csc_arn) = state.function_code_signing.get(function_name).cloned() {
            let csc_id = extract_csc_id(&csc_arn);
            if let Some(csc) = state.code_signing_configs.get(&csc_id).cloned() {
                if !csc.allowed_publishers.is_empty()
                    && csc
                        .untrusted_artifact_action
                        .eq_ignore_ascii_case("Enforce")
                {
                    // A missing signing profile counts as not allowed.
                    let allowed = match supplied_signing_profile.as_deref() {
                        Some(arn) => csc.allowed_publishers.iter().any(|p| p == arn),
                        None => false,
                    };
                    if !allowed {
                        return Err(AwsServiceError::aws_error(
                            StatusCode::BAD_REQUEST,
                            "CodeVerificationFailedException",
                            "The code signature failed the integrity check or the signing profile is not in the allowed publishers list.",
                        ));
                    }
                }
            }
        }

        let func = state
            .functions
            .get_mut(function_name)
            .ok_or_else(|| not_found("Function", function_name))?;

        // Optimistic-concurrency guard on the caller-supplied revision id.
        if let Some(ref rev) = supplied_revision_id {
            if rev != &func.revision_id {
                return Err(AwsServiceError::aws_error(
                    StatusCode::PRECONDITION_FAILED,
                    "PreconditionFailedException",
                    format!(
                        "The Revision Id provided: {rev} does not match the latest Revision Id of function: {function_name}. Call the GetFunction/GetAlias API to retrieve the latest Revision Id"
                    ),
                ));
            }
        }

        // Dry run: all validation above has passed, nothing is mutated.
        if dry_run {
            return ok(self.function_config_json(func));
        }

        // Tracks whether anything observable changed, which decides whether
        // the revision id is rotated at the end.
        let mut changed = false;
        if let Some(bytes) = new_zip {
            // Zip package: hash and size come from the archive bytes.
            let mut hasher = Sha256::new();
            hasher.update(&bytes);
            let hash = hasher.finalize();
            let code_sha256 =
                base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
            if code_sha256 != func.code_sha256 {
                changed = true;
            }
            func.code_size = bytes.len() as i64;
            func.code_zip = Some(bytes);
            func.code_sha256 = code_sha256;
            func.image_uri = None;
            func.package_type = "Zip".to_string();
        } else if let Some(descriptor_bytes) = new_s3_descriptor {
            // Descriptor-only package: hash and size are those of the JSON
            // descriptor, and no archive is stored.
            let mut hasher = Sha256::new();
            hasher.update(&descriptor_bytes);
            let hash = hasher.finalize();
            let code_sha256 =
                base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
            if code_sha256 != func.code_sha256 {
                changed = true;
            }
            func.code_size = descriptor_bytes.len() as i64;
            func.code_zip = None;
            func.code_sha256 = code_sha256;
            func.image_uri = None;
            func.package_type = "Zip".to_string();
        } else if let Some(uri) = new_image_uri {
            // Container image: no archive, size zero and an empty hash.
            if func.image_uri.as_deref() != Some(uri.as_str()) {
                changed = true;
            }
            func.image_uri = Some(uri);
            func.code_zip = None;
            func.package_type = "Image".to_string();
            func.code_size = 0;
            func.code_sha256 = String::new();
        }

        // An empty `Architectures` array is ignored rather than clearing the
        // stored list.
        if let Some(arns) = new_architectures {
            if !arns.is_empty() && arns != func.architectures {
                changed = true;
                func.architectures = arns;
            }
        }

        if let Some(arn) = supplied_signing_profile {
            if func.signing_profile_version_arn.as_deref() != Some(arn.as_str()) {
                changed = true;
            }
            func.signing_profile_version_arn = Some(arn);
        }

        // The timestamp moves on every call; the revision id only when
        // something actually changed.
        func.last_modified = Utc::now();
        if changed {
            func.revision_id = uuid::Uuid::new_v4().to_string();
        }
        func.last_update_status_reason = None;
        func.last_update_status_reason_code = None;

        if publish {
            // Release the write guard before delegating to `publish_version`.
            drop(accounts);
            return self.publish_version(function_name, &req.account_id, req);
        }

        ok(self.function_config_json(func))
    }
707
708 fn list_versions_by_function(
711 &self,
712 function_name: &str,
713 account_id: &str,
714 req: &AwsRequest,
715 ) -> Result<AwsResponse, AwsServiceError> {
716 let region = self.region_for(account_id);
717 let max_items: usize = req
720 .query_params
721 .get("MaxItems")
722 .and_then(|v| v.parse::<usize>().ok())
723 .map(|n| n.clamp(1, 10000))
724 .unwrap_or(50);
725 let marker = req.query_params.get("Marker").cloned();
726 self.with_state_read(account_id, ®ion, |state| {
727 let func = state
728 .functions
729 .get(function_name)
730 .ok_or_else(|| not_found("Function", function_name))?;
731 let mut all: Vec<serde_json::Value> = Vec::new();
735 let mut latest = self.function_config_json(func);
736 latest["Version"] = json!("$LATEST");
737 latest["FunctionArn"] = json!(format!("{}:$LATEST", func.function_arn));
742 all.push(latest);
743 let snapshots = state.function_version_snapshots.get(function_name);
744 if let Some(numbered) = state.function_versions.get(function_name) {
745 for v in numbered {
746 let snap = snapshots.and_then(|m| m.get(v)).unwrap_or(func);
747 let mut cfg = self.function_config_json(snap);
748 cfg["Version"] = json!(v);
749 cfg["FunctionArn"] = json!(format!("{}:{v}", func.function_arn));
750 cfg["MasterArn"] = json!(func.function_arn);
751 all.push(cfg);
752 }
753 }
754 let start = match marker.as_deref() {
758 Some(m) => all
759 .iter()
760 .position(|v| v["Version"].as_str() == Some(m))
761 .map(|i| i + 1)
762 .unwrap_or(0),
763 None => 0,
764 };
765 let end = (start + max_items).min(all.len());
766 let page: Vec<serde_json::Value> = all[start..end].to_vec();
767 let mut body = json!({ "Versions": page });
768 if end < all.len() {
769 if let Some(last) = all[end - 1]["Version"].as_str() {
770 body["NextMarker"] = json!(last);
771 }
772 }
773 ok(body)
774 })
775 }
776
777 fn pc_key(function: &str, qualifier: &str) -> String {
778 format!("{function}:{qualifier}")
779 }
780
781 fn tag_resource(
784 &self,
785 resource_arn: &str,
786 req: &AwsRequest,
787 ) -> Result<AwsResponse, AwsServiceError> {
788 let body = body(req);
789 let new_tags: Vec<(String, String)> = body
790 .get("Tags")
791 .and_then(|v| v.as_object())
792 .map(|m| {
793 m.iter()
794 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
795 .collect()
796 })
797 .unwrap_or_default();
798 let resource_arn_decoded = decode_query_segment(resource_arn);
801 let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
802 AwsServiceError::aws_error(
803 StatusCode::BAD_REQUEST,
804 "InvalidParameterValueException",
805 format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
806 )
807 })?;
808 let mut accounts = self.state.write();
809 let state = accounts.get_or_create(&req.account_id);
810 let func = state.functions.get_mut(&name).ok_or_else(|| {
811 AwsServiceError::aws_error(
812 StatusCode::NOT_FOUND,
813 "ResourceNotFoundException",
814 format!("Function not found: {name}"),
815 )
816 })?;
817 for (k, v) in new_tags {
820 func.tags.insert(k, v);
821 }
822 empty()
823 }
824
825 fn untag_resource(
826 &self,
827 resource_arn: &str,
828 req: &AwsRequest,
829 ) -> Result<AwsResponse, AwsServiceError> {
830 let mut keys: Vec<String> = Vec::new();
843 for (k, v) in parse_query_pairs(&req.raw_query) {
844 if k == "tagKeys" || k.starts_with("tagKeys.") {
845 keys.push(v);
846 }
847 }
848 if keys.is_empty() {
849 let parsed = body(req);
850 for field in ["TagKeys", "tagKeys"] {
851 if let Some(arr) = parsed.get(field).and_then(|v| v.as_array()) {
852 for v in arr {
853 if let Some(s) = v.as_str() {
854 keys.push(s.to_string());
855 }
856 }
857 if !keys.is_empty() {
858 break;
859 }
860 }
861 }
862 }
863 let resource_arn_decoded = decode_query_segment(resource_arn);
864 let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
865 AwsServiceError::aws_error(
866 StatusCode::BAD_REQUEST,
867 "InvalidParameterValueException",
868 format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
869 )
870 })?;
871 let mut accounts = self.state.write();
872 let state = accounts.get_or_create(&req.account_id);
873 let func = state.functions.get_mut(&name).ok_or_else(|| {
874 AwsServiceError::aws_error(
875 StatusCode::NOT_FOUND,
876 "ResourceNotFoundException",
877 format!("Function not found: {name}"),
878 )
879 })?;
880 for k in &keys {
881 func.tags.remove(k);
882 }
883 empty()
884 }
885
886 fn list_tags(
887 &self,
888 resource_arn: &str,
889 account_id: &str,
890 ) -> Result<AwsResponse, AwsServiceError> {
891 let resource_arn_decoded = decode_query_segment(resource_arn);
892 let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
893 AwsServiceError::aws_error(
894 StatusCode::BAD_REQUEST,
895 "InvalidParameterValueException",
896 format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
897 )
898 })?;
899 let region = self.region_for(account_id);
900 self.with_state_read(account_id, ®ion, |state| {
901 let func = state.functions.get(&name).ok_or_else(|| {
902 AwsServiceError::aws_error(
903 StatusCode::NOT_FOUND,
904 "ResourceNotFoundException",
905 format!("Function not found: {name}"),
906 )
907 })?;
908 let tags: serde_json::Map<String, Value> = func
909 .tags
910 .iter()
911 .map(|(k, v)| (k.clone(), Value::String(v.clone())))
912 .collect();
913 ok(json!({"Tags": tags}))
914 })
915 }
916
    /// Handles `UpdateEventSourceMapping`: applies the fields present in the
    /// request body to the mapping identified by `uuid` and returns its JSON
    /// representation.
    ///
    /// Only keys that are present with the expected JSON type are applied;
    /// everything else keeps its current value. `last_modified` is refreshed
    /// on every successful call.
    ///
    /// # Errors
    ///
    /// Returns `ResourceNotFoundException` when no mapping with `uuid`
    /// exists in the caller's account.
    fn update_event_source_mapping_handler(
        &self,
        uuid: &str,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let body = body(req);
        let mut accounts = self.state.write();
        let state = accounts.get_or_create(&req.account_id);
        let esm = state
            .event_source_mappings
            .get_mut(uuid)
            .ok_or_else(|| not_found("EventSourceMapping", uuid))?;
        if let Some(b) = body["BatchSize"].as_i64() {
            esm.batch_size = b;
        }
        if let Some(name) = body["FunctionName"].as_str() {
            // The ARN is rebuilt from this account's region and id.
            // NOTE(review): `name` is inserted verbatim, so a caller passing a
            // full or partial ARN would produce a nested ARN — confirm callers
            // only send bare function names.
            esm.function_arn = format!(
                "arn:aws:lambda:{}:{}:function:{}",
                state.region, state.account_id, name
            );
        }
        if let Some(filters) = body
            .get("FilterCriteria")
            .and_then(|v| v.get("Filters"))
            .and_then(|v| v.as_array())
        {
            // Replaces the whole pattern list; filters without a string
            // `Pattern` are skipped.
            esm.filter_patterns = filters
                .iter()
                .filter_map(|f| f.get("Pattern").and_then(|p| p.as_str()).map(String::from))
                .collect();
        }
        if let Some(types) = body.get("FunctionResponseTypes").and_then(|v| v.as_array()) {
            esm.function_response_types = types
                .iter()
                .filter_map(|v| v.as_str().map(String::from))
                .collect();
        }
        if let Some(w) = body
            .get("MaximumBatchingWindowInSeconds")
            .and_then(|v| v.as_i64())
        {
            esm.maximum_batching_window_in_seconds = Some(w);
        }
        if let Some(p) = body.get("ParallelizationFactor").and_then(|v| v.as_i64()) {
            esm.parallelization_factor = Some(p);
        }
        if let Some(s) = body.get("KMSKeyArn").and_then(|v| v.as_str()) {
            // Unlike the function-level `KMSKeyArn` handling, an empty string
            // is stored as-is here rather than clearing the key.
            esm.kms_key_arn = Some(s.to_string());
        }
        // `MetricsConfig` and `DestinationConfig` are stored whenever the key
        // is present, whatever its JSON type (including `null`).
        if let Some(mc) = body.get("MetricsConfig") {
            esm.metrics_config = Some(mc.clone());
        }
        if let Some(dc) = body.get("DestinationConfig") {
            esm.destination_config = Some(dc.clone());
        }
        if let Some(n) = body.get("MaximumRetryAttempts").and_then(|v| v.as_i64()) {
            esm.maximum_retry_attempts = Some(n);
        }
        if let Some(n) = body
            .get("MaximumRecordAgeInSeconds")
            .and_then(|v| v.as_i64())
        {
            esm.maximum_record_age_in_seconds = Some(n);
        }
        if let Some(b) = body
            .get("BisectBatchOnFunctionError")
            .and_then(|v| v.as_bool())
        {
            esm.bisect_batch_on_function_error = Some(b);
        }
        if let Some(n) = body.get("TumblingWindowInSeconds").and_then(|v| v.as_i64()) {
            esm.tumbling_window_in_seconds = Some(n);
        }
        if let Some(enabled) = body.get("Enabled").and_then(|v| v.as_bool()) {
            // The flag and the reported state string are kept in sync.
            esm.enabled = enabled;
            esm.state = if enabled { "Enabled" } else { "Disabled" }.to_string();
        }
        if let Some(sac) = body
            .get("SourceAccessConfigurations")
            .and_then(|v| v.as_array())
        {
            esm.source_access_configurations = sac.clone();
        }
        esm.last_modified = chrono::Utc::now();
        let response = self.event_source_mapping_json(esm);
        ok(response)
    }
1016
1017 fn region_for(&self, account_id: &str) -> String {
1018 let accounts = self.state.read();
1019 accounts
1020 .get(account_id)
1021 .map(|s| s.region.clone())
1022 .unwrap_or_else(|| "us-east-1".to_string())
1023 }
1024}
1025
1026fn extract_csc_id(input: &str) -> String {
1027 let decoded = percent_decode(input);
1030 decoded.rsplit(':').next().unwrap_or(&decoded).to_string()
1031}
1032
/// Crate-visible entry point to this module's private `percent_decode`.
///
/// Returns `input` with `%XX` escapes decoded; it delegates to
/// `percent_decode` without any additional processing.
pub(crate) fn percent_decode_for_length(input: &str) -> String {
    percent_decode(input)
}
1038
/// Decodes `%XX` escapes in `input`.
///
/// A `%` that is not followed by two hexadecimal digits is copied through
/// unchanged. `+` is not treated specially.
///
/// Decoding is done on bytes and the result is then interpreted as UTF-8, so
/// multi-byte characters survive whether they arrive escaped (`%C3%A9`) or
/// literally. Byte sequences that are not valid UTF-8 are replaced with
/// U+FFFD.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' && i + 2 < bytes.len() {
            let hi = (bytes[i + 1] as char).to_digit(16);
            let lo = (bytes[i + 2] as char).to_digit(16);
            if let (Some(h), Some(l)) = (hi, lo) {
                // Both nibbles are below 16, so the sum always fits in a byte.
                out.push((h * 16 + l) as u8);
                i += 3;
                continue;
            }
        }
        // Push the raw byte: casting it to `char` would turn every byte of a
        // multi-byte UTF-8 sequence into a separate Latin-1 character.
        out.push(bytes[i]);
        i += 1;
    }
    String::from_utf8_lossy(&out).into_owned()
}
1058
1059fn code_signing_json(c: &CodeSigningConfig) -> Value {
1060 json!({
1061 "CodeSigningConfigId": c.csc_id,
1062 "CodeSigningConfigArn": c.csc_arn,
1063 "Description": c.description,
1064 "AllowedPublishers": {
1065 "SigningProfileVersionArns": c.allowed_publishers,
1066 },
1067 "CodeSigningPolicies": {
1068 "UntrustedArtifactOnDeployment": c.untrusted_artifact_action,
1069 },
1070 "LastModified": c.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1071 })
1072}
1073
1074fn event_invoke_json(c: &EventInvokeConfig) -> Value {
1075 let destination = match &c.destination_config {
1086 None => json!({"OnSuccess": {}, "OnFailure": {}}),
1087 Some(v) if !v.is_object() => json!({}),
1088 Some(v) => {
1089 let mut map = v.as_object().cloned().unwrap_or_default();
1090 if !map.is_empty() {
1091 map.entry("OnSuccess".to_string()).or_insert(json!({}));
1092 map.entry("OnFailure".to_string()).or_insert(json!({}));
1093 }
1094 Value::Object(map)
1095 }
1096 };
1097 let mut out = json!({
1098 "FunctionArn": c.function_arn,
1099 "MaximumRetryAttempts": c.maximum_retry_attempts,
1100 "DestinationConfig": destination,
1101 "LastModified": c
1107 .last_modified
1108 .timestamp_millis() as f64
1109 / 1000.0,
1110 });
1111 if c.maximum_event_age != 0 {
1113 out["MaximumEventAgeInSeconds"] = json!(c.maximum_event_age);
1114 }
1115 out
1116}
1117
#[cfg(test)]
mod tests {
    use crate::service::LambdaService;
    use crate::state::{LambdaState, SharedLambdaState};
    use fakecloud_core::multi_account::MultiAccountState;
    use fakecloud_core::service::AwsRequest;
    use http::Method;
    use parking_lot::RwLock;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Creates a service over fresh state for account `000000000000` in
    /// `us-east-1`.
    fn svc() -> LambdaService {
        let accounts = MultiAccountState::<LambdaState>::new("000000000000", "us-east-1", "");
        let shared: SharedLambdaState = Arc::new(RwLock::new(accounts));
        LambdaService::new(shared)
    }

    /// Builds a POST request for `action` with the given body and path
    /// segments; the query string and headers are left empty.
    fn req(action: &str, body: &str, segs: &[&str]) -> AwsRequest {
        let raw_path = format!("/{}", segs.join("/"));
        let path_segments: Vec<String> = segs.iter().map(|seg| seg.to_string()).collect();
        AwsRequest {
            service: "lambda".to_string(),
            method: Method::POST,
            raw_path,
            raw_query: String::new(),
            path_segments,
            query_params: HashMap::new(),
            headers: http::HeaderMap::new(),
            body: bytes::Bytes::from(body.to_string()),
            body_stream: parking_lot::Mutex::new(None),
            account_id: "000000000000".to_string(),
            region: "us-east-1".to_string(),
            request_id: "rid".to_string(),
            action: action.to_string(),
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Dispatches `action` through `handle_extra` and fails the test unless
    /// the call returns a success status.
    async fn run(s: &LambdaService, action: &str, body: &str, res: Option<&str>, segs: &[&str]) {
        let request = req(action, body, segs);
        let resp = s
            .handle_extra(action, res, &request)
            .await
            .unwrap_or_else(|e| panic!("{action} failed: {e:?}"));
        assert!(resp.status.is_success(), "{action} status: {}", resp.status);
    }

    #[tokio::test]
    async fn read_only_listings_succeed_without_state() {
        let service = svc();
        let calls = [
            ("GetAccountSettings", "", None),
            ("InvokeAsync", r#"{}"#, Some("fn")),
            ("ListLayers", "", None),
            ("ListLayerVersions", "", Some("layer")),
        ];
        for (action, body, resource) in calls {
            run(&service, action, body, resource, &[]).await;
        }
    }

    #[tokio::test]
    async fn layers_lifecycle() {
        let service = svc();
        let publish_path = ["2018-10-31", "layers", "layer1", "versions"];
        run(
            &service,
            "PublishLayerVersion",
            r#"{"Content":{"ZipFile":""}}"#,
            Some("layer1"),
            &publish_path,
        )
        .await;
        run(&service, "ListLayers", "", None, &[]).await;
        run(&service, "ListLayerVersions", "", Some("layer1"), &[]).await;
    }

    #[tokio::test]
    async fn code_signing_lifecycle() {
        let service = svc();
        let create_body = r#"{"AllowedPublishers":{"SigningProfileVersionArns":[]}}"#;
        run(&service, "CreateCodeSigningConfig", create_body, None, &[]).await;
        run(&service, "ListCodeSigningConfigs", "", None, &[]).await;
    }
}