1use chrono::Utc;
6use http::StatusCode;
7use serde_json::{json, Value};
8use sha2::{Digest, Sha256};
9
10use fakecloud_aws::arn::Arn;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
12
13use crate::service::LambdaService;
14use crate::state::{
15 AccountSettings, AttachedLayer, CodeSigningConfig, EventInvokeConfig, FunctionAlias,
16 FunctionScalingConfig, FunctionUrlConfig, LambdaState, Layer, LayerVersion,
17 ProvisionedConcurrencyConfig, RuntimeManagementConfig,
18};
19
20pub(crate) fn resolve_layer_attachments(
25 accounts: &fakecloud_core::multi_account::MultiAccountState<LambdaState>,
26 arns: Vec<String>,
27) -> Vec<AttachedLayer> {
28 arns.into_iter()
29 .map(|arn| {
30 let code_size = parse_layer_version_arn(&arn)
31 .and_then(|(acct, name, ver)| {
32 accounts
33 .get(&acct)
34 .and_then(|s| s.layers.get(&name))
35 .and_then(|l| l.versions.iter().find(|v| v.version == ver))
36 .map(|v| v.code_size)
37 })
38 .unwrap_or(0);
39 AttachedLayer { arn, code_size }
40 })
41 .collect()
42}
43
44fn missing(name: &str) -> AwsServiceError {
45 AwsServiceError::aws_error(
46 StatusCode::BAD_REQUEST,
47 "InvalidParameterValueException",
48 format!("Missing required field: {name}"),
49 )
50}
51
52fn not_found(entity: &str, name: &str) -> AwsServiceError {
53 AwsServiceError::aws_error(
54 StatusCode::NOT_FOUND,
55 "ResourceNotFoundException",
56 format!("{entity} not found: {name}"),
57 )
58}
59
60fn ok(body: Value) -> Result<AwsResponse, AwsServiceError> {
61 Ok(AwsResponse::json(StatusCode::OK, body.to_string()))
62}
63
64fn empty() -> Result<AwsResponse, AwsServiceError> {
65 Ok(AwsResponse::json(StatusCode::OK, "{}".to_string()))
66}
67
68fn body(req: &AwsRequest) -> Value {
69 serde_json::from_slice(&req.body).unwrap_or_else(|_| Value::Object(Default::default()))
70}
71
/// Extracts the bare function name from a Lambda function ARN of the form
/// `arn:aws:lambda:<region>:<account>:function:<name>[:<qualifier>...]`.
///
/// Returns `None` when the prefix is not `arn:aws:lambda:`, when the resource
/// kind is not `function`, or when the ARN ends before the name segment. Any
/// qualifier after the name is discarded.
fn function_name_from_arn(arn: &str) -> Option<String> {
    let mut fields = arn.strip_prefix("arn:aws:lambda:")?.split(':');
    let _region = fields.next()?;
    let _account = fields.next()?;
    let kind = fields.next()?;
    // The segment right after the resource kind is the name; whatever follows
    // (version or alias) is never consumed.
    let name = fields.next()?;
    (kind == "function").then(|| name.to_string())
}
96
97fn parse_query_pairs(raw_query: &str) -> Vec<(String, String)> {
104 raw_query
105 .split('&')
106 .filter(|s| !s.is_empty())
107 .map(|pair| {
108 let mut it = pair.splitn(2, '=');
109 let k = it.next().unwrap_or("");
110 let v = it.next().unwrap_or("");
111 (decode_query_segment(k), decode_query_segment(v))
112 })
113 .collect()
114}
115
116fn decode_query_segment(s: &str) -> String {
117 let plus_decoded = s.replace('+', " ");
120 percent_encoding::percent_decode_str(&plus_decoded)
121 .decode_utf8_lossy()
122 .into_owned()
123}
124
125fn layer_content_url(req: &AwsRequest, account_id: &str, layer_name: &str, version: i64) -> String {
130 let host = req
131 .headers
132 .get(http::header::HOST)
133 .and_then(|h| h.to_str().ok())
134 .unwrap_or("localhost");
135 let scheme = req
136 .headers
137 .get("x-forwarded-proto")
138 .and_then(|h| h.to_str().ok())
139 .unwrap_or("http");
140 format!(
141 "{scheme}://{host}/_fakecloud/lambda/layer-content/{account_id}/{layer_name}/{version}.zip"
142 )
143}
144
145pub(crate) fn function_code_url(
150 req: &AwsRequest,
151 account_id: &str,
152 function_name: &str,
153 version_label: &str,
154) -> String {
155 let host = req
156 .headers
157 .get(http::header::HOST)
158 .and_then(|h| h.to_str().ok())
159 .unwrap_or("localhost");
160 let scheme = req
161 .headers
162 .get("x-forwarded-proto")
163 .and_then(|h| h.to_str().ok())
164 .unwrap_or("http");
165 let file = if version_label == "$LATEST" {
166 "latest.zip".to_string()
167 } else {
168 format!("{version_label}.zip")
169 };
170 format!("{scheme}://{host}/_fakecloud/lambda/function-code/{account_id}/{function_name}/{file}")
171}
172
/// Parses a layer-version ARN of the form
/// `arn:<partition>:lambda:<region>:<account>:layer:<name>:<version>`.
///
/// Returns `(account, layer name, version)`, or `None` when the ARN does not
/// have exactly eight colon-separated fields, the fixed fields do not match,
/// or the version is not an integer.
pub fn parse_layer_version_arn(arn: &str) -> Option<(String, String, i64)> {
    let fields: Vec<&str> = arn.split(':').collect();
    match fields.as_slice() {
        ["arn", _partition, "lambda", _region, account, "layer", name, version] => {
            let version = version.parse::<i64>().ok()?;
            Some((account.to_string(), name.to_string(), version))
        }
        _ => None,
    }
}
186
/// Runtime identifiers accepted as a `CompatibleRuntime` filter value,
/// grouped by language family.
#[rustfmt::skip]
const LAMBDA_RUNTIMES: &[&str] = &[
    // Node.js
    "nodejs", "nodejs4.3", "nodejs6.10", "nodejs8.10", "nodejs10.x", "nodejs12.x",
    "nodejs14.x", "nodejs16.x", "nodejs18.x", "nodejs20.x", "nodejs22.x", "nodejs24.x",
    "nodejs4.3-edge",
    // Java
    "java8", "java8.al2", "java11", "java17", "java21", "java25",
    // Python
    "python2.7", "python3.6", "python3.7", "python3.8", "python3.9", "python3.10",
    "python3.11", "python3.12", "python3.13", "python3.14",
    // .NET
    "dotnetcore1.0", "dotnetcore2.0", "dotnetcore2.1", "dotnetcore3.1",
    "dotnet6", "dotnet8", "dotnet10",
    // Go
    "go1.x",
    // Ruby
    "ruby2.5", "ruby2.7", "ruby3.2", "ruby3.3", "ruby3.4",
    // Custom runtimes
    "provided", "provided.al2", "provided.al2023",
];
237
238fn validate_layer_filters(req: &AwsRequest) -> Result<(), AwsServiceError> {
241 if let Some(arch) = req.query_params.get("CompatibleArchitecture") {
242 if arch != "x86_64" && arch != "arm64" {
243 return Err(AwsServiceError::aws_error(
244 StatusCode::BAD_REQUEST,
245 "InvalidParameterValueException",
246 format!(
247 "Invalid CompatibleArchitecture value '{}'; expected 'x86_64' or 'arm64'",
248 arch
249 ),
250 ));
251 }
252 }
253 if let Some(rt) = req.query_params.get("CompatibleRuntime") {
254 if !LAMBDA_RUNTIMES.contains(&rt.as_str()) {
255 return Err(AwsServiceError::aws_error(
256 StatusCode::BAD_REQUEST,
257 "InvalidParameterValueException",
258 format!("Invalid CompatibleRuntime value '{}'", rt),
259 ));
260 }
261 }
262 Ok(())
263}
264
265fn parse_qualifier(req: &AwsRequest) -> String {
266 req.query_params
267 .get("Qualifier")
268 .cloned()
269 .unwrap_or_else(|| "$LATEST".to_string())
270}
271
272fn require_qualifier(req: &AwsRequest) -> Result<String, AwsServiceError> {
277 req.query_params.get("Qualifier").cloned().ok_or_else(|| {
278 AwsServiceError::aws_error(
279 StatusCode::BAD_REQUEST,
280 "InvalidParameterValueException",
281 "Qualifier is required for this operation",
282 )
283 })
284}
285
/// Produces an identifier made of `prefix` followed by the current time in
/// nanoseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the numeric part is
/// `0`.
fn id_from_time(prefix: &str) -> String {
    let nanos = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    format!("{prefix}{nanos}")
}
296
297mod account;
298mod aliases;
299mod code_signing;
300mod concurrency;
301mod event_invoke;
302mod function_url;
303mod layers;
304mod recursion;
305mod runtime;
306mod stream;
307
308impl LambdaService {
309 fn with_state_read<F, R>(&self, account_id: &str, region: &str, f: F) -> R
310 where
311 F: FnOnce(&LambdaState) -> R,
312 {
313 let accounts = self.state.read();
314 let empty = LambdaState::new(account_id, region);
315 let state = accounts.get(account_id).unwrap_or(&empty);
316 f(state)
317 }
318
319 fn get_function_configuration(
322 &self,
323 function_name: &str,
324 account_id: &str,
325 req: &AwsRequest,
326 ) -> Result<AwsResponse, AwsServiceError> {
327 let region = self.region_for(account_id);
328 let qualifier = req.query_params.get("Qualifier").cloned();
329 self.with_state_read(account_id, ®ion, |state| {
330 let live = state
331 .functions
332 .get(function_name)
333 .ok_or_else(|| not_found("Function", function_name))?;
334 let resolved = crate::service::resolve_qualifier_to_version(
338 state,
339 function_name,
340 qualifier.as_deref(),
341 );
342 let (func, version_label) = match resolved {
343 None => (live, "$LATEST".to_string()),
344 Some(v) => {
345 let snap = state
346 .function_version_snapshots
347 .get(function_name)
348 .and_then(|m| m.get(&v))
349 .ok_or_else(|| not_found("Function", function_name))?;
350 (snap, v)
351 }
352 };
353 let mut config = self.function_config_json(func);
354 config["Version"] = json!(version_label);
355 if version_label != "$LATEST" {
356 config["FunctionArn"] = json!(format!("{}:{version_label}", live.function_arn));
357 config["MasterArn"] = json!(live.function_arn);
358 }
359 ok(config)
360 })
361 }
362
363 fn update_function_configuration(
364 &self,
365 function_name: &str,
366 req: &AwsRequest,
367 ) -> Result<AwsResponse, AwsServiceError> {
368 let body = body(req);
369 let validated_ephemeral = match body["EphemeralStorage"]["Size"].as_i64() {
373 Some(size) => Some(crate::service::validate_ephemeral_storage(size)?),
374 None => None,
375 };
376 let mut accounts = self.state.write();
377 let layer_attachments: Option<Vec<AttachedLayer>> = body["Layers"].as_array().map(|arr| {
380 let arns: Vec<String> = arr
381 .iter()
382 .filter_map(|v| v.as_str().map(String::from))
383 .collect();
384 resolve_layer_attachments(&accounts, arns)
385 });
386 let state = accounts.get_or_create(&req.account_id);
387 let func = state
388 .functions
389 .get_mut(function_name)
390 .ok_or_else(|| not_found("Function", function_name))?;
391 if let Some(handler) = body["Handler"].as_str() {
392 func.handler = handler.to_string();
393 }
394 if let Some(t) = body["Timeout"].as_i64() {
395 func.timeout = t;
396 }
397 if let Some(m) = body["MemorySize"].as_i64() {
398 func.memory_size = m;
399 }
400 if let Some(role) = body["Role"].as_str() {
401 func.role = role.to_string();
402 }
403 if let Some(desc) = body["Description"].as_str() {
404 func.description = desc.to_string();
405 }
406 if let Some(rt) = body["Runtime"].as_str() {
407 func.runtime = rt.to_string();
408 }
409 if let Some(env) = body["Environment"]["Variables"].as_object() {
410 func.environment = env
411 .iter()
412 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
413 .collect();
414 }
415 if let Some(mode) = body["TracingConfig"]["Mode"].as_str() {
416 func.tracing_mode = Some(mode.to_string());
417 }
418 if let Some(arn) = body["KMSKeyArn"].as_str() {
419 func.kms_key_arn = if arn.is_empty() {
420 None
421 } else {
422 Some(arn.to_string())
423 };
424 }
425 if let Some(size) = validated_ephemeral {
426 func.ephemeral_storage_size = Some(size);
427 }
428 if body["VpcConfig"].is_object() {
429 func.vpc_config = Some(body["VpcConfig"].clone());
430 }
431 if body["SnapStart"].is_object() {
432 func.snap_start = Some(body["SnapStart"].clone());
433 }
434 if let Some(arn) = body["DeadLetterConfig"]["TargetArn"].as_str() {
435 func.dead_letter_config_arn = if arn.is_empty() {
436 None
437 } else {
438 Some(arn.to_string())
439 };
440 }
441 if let Some(fsc) = body["FileSystemConfigs"].as_array() {
442 func.file_system_configs = fsc.clone();
443 }
444 if body["LoggingConfig"].is_object() {
445 func.logging_config = Some(body["LoggingConfig"].clone());
446 }
447 if body["ImageConfig"].is_object() {
448 func.image_config = Some(body["ImageConfig"].clone());
449 }
450 if body["DurableConfig"].is_object() {
451 func.durable_config = Some(body["DurableConfig"].clone());
452 }
453 if let Some(attachments) = layer_attachments {
454 func.layers = attachments;
455 }
456 func.revision_id = uuid::Uuid::new_v4().to_string();
460 func.last_modified = Utc::now();
461 ok(self.function_config_json(func))
462 }
463
464 fn update_function_code(
465 &self,
466 function_name: &str,
467 req: &AwsRequest,
468 ) -> Result<AwsResponse, AwsServiceError> {
469 let body: serde_json::Value = serde_json::from_slice(&req.body).unwrap_or_default();
470
471 let new_zip: Option<Vec<u8>> = match body["ZipFile"].as_str() {
476 Some(b64) => Some(
477 base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
478 |_| {
479 AwsServiceError::aws_error(
480 StatusCode::BAD_REQUEST,
481 "InvalidParameterValueException",
482 "Could not decode ZipFile: invalid base64",
483 )
484 },
485 )?,
486 ),
487 None => None,
488 };
489 let new_image_uri = body["ImageUri"].as_str().map(String::from);
490 let s3_fetched_zip: Option<Vec<u8>> = match (
503 body["S3Bucket"].as_str(),
504 body["S3Key"].as_str(),
505 ) {
506 (Some(bucket), Some(key)) if new_zip.is_none() && new_image_uri.is_none() => {
507 if let Some(s3) = &self.s3_delivery {
508 match s3.get_object(&req.account_id, bucket, key) {
509 Ok(bytes) => Some(bytes),
510 Err(e) => {
511 return Err(AwsServiceError::aws_error(
512 StatusCode::BAD_REQUEST,
513 "InvalidParameterValueException",
514 format!("Error occurred while GetObject. S3 Error Code: NoSuchKey. S3 Error Message: {e}"),
515 ));
516 }
517 }
518 } else {
519 None
520 }
521 }
522 _ => None,
523 };
524
525 let new_s3_descriptor: Option<Vec<u8>> =
526 match (body["S3Bucket"].as_str(), body["S3Key"].as_str()) {
527 (Some(bucket), Some(key))
528 if new_zip.is_none() && new_image_uri.is_none() && s3_fetched_zip.is_none() =>
529 {
530 let mut descriptor = serde_json::Map::new();
531 descriptor.insert("S3Bucket".to_string(), Value::String(bucket.to_string()));
532 descriptor.insert("S3Key".to_string(), Value::String(key.to_string()));
533 if let Some(ver) = body["S3ObjectVersion"].as_str() {
534 descriptor.insert(
535 "S3ObjectVersion".to_string(),
536 Value::String(ver.to_string()),
537 );
538 }
539 Some(serde_json::to_vec(&Value::Object(descriptor)).unwrap_or_default())
540 }
541 _ => None,
542 };
543 let new_zip = new_zip.or(s3_fetched_zip);
544 let supplied_signing_profile = body["SigningProfileVersionArn"].as_str().map(String::from);
545 let supplied_revision_id = body["RevisionId"].as_str().map(String::from);
546 let new_architectures: Option<Vec<String>> = body["Architectures"].as_array().map(|arr| {
547 arr.iter()
548 .filter_map(|v| v.as_str().map(String::from))
549 .collect()
550 });
551 let dry_run = body["DryRun"].as_bool().unwrap_or(false);
552 let publish = body["Publish"].as_bool().unwrap_or(false);
553
554 let mut accounts = self.state.write();
555 let state = accounts.get_or_create(&req.account_id);
556
557 if !state.functions.contains_key(function_name) {
561 return Err(not_found("Function", function_name));
562 }
563
564 if let Some(csc_arn) = state.function_code_signing.get(function_name).cloned() {
569 let csc_id = extract_csc_id(&csc_arn);
570 if let Some(csc) = state.code_signing_configs.get(&csc_id).cloned() {
571 if !csc.allowed_publishers.is_empty()
572 && csc
573 .untrusted_artifact_action
574 .eq_ignore_ascii_case("Enforce")
575 {
576 let allowed = match supplied_signing_profile.as_deref() {
577 Some(arn) => csc.allowed_publishers.iter().any(|p| p == arn),
578 None => false,
579 };
580 if !allowed {
581 return Err(AwsServiceError::aws_error(
582 StatusCode::BAD_REQUEST,
583 "CodeVerificationFailedException",
584 "The code signature failed the integrity check or the signing profile is not in the allowed publishers list.",
585 ));
586 }
587 }
588 }
589 }
590
591 let func = state
592 .functions
593 .get_mut(function_name)
594 .ok_or_else(|| not_found("Function", function_name))?;
595
596 if let Some(ref rev) = supplied_revision_id {
600 if rev != &func.revision_id {
601 return Err(AwsServiceError::aws_error(
602 StatusCode::PRECONDITION_FAILED,
603 "PreconditionFailedException",
604 format!(
605 "The Revision Id provided: {rev} does not match the latest Revision Id of function: {function_name}. Call the GetFunction/GetAlias API to retrieve the latest Revision Id"
606 ),
607 ));
608 }
609 }
610
611 if dry_run {
613 return ok(self.function_config_json(func));
614 }
615
616 let mut changed = false;
617 if let Some(bytes) = new_zip {
618 let mut hasher = Sha256::new();
621 hasher.update(&bytes);
622 let hash = hasher.finalize();
623 let code_sha256 =
624 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
625 if code_sha256 != func.code_sha256 {
626 changed = true;
627 }
628 func.code_size = bytes.len() as i64;
629 func.code_zip = Some(bytes);
630 func.code_sha256 = code_sha256;
631 func.image_uri = None;
632 func.package_type = "Zip".to_string();
633 } else if let Some(descriptor_bytes) = new_s3_descriptor {
634 let mut hasher = Sha256::new();
641 hasher.update(&descriptor_bytes);
642 let hash = hasher.finalize();
643 let code_sha256 =
644 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
645 if code_sha256 != func.code_sha256 {
646 changed = true;
647 }
648 func.code_size = descriptor_bytes.len() as i64;
649 func.code_zip = None;
653 func.code_sha256 = code_sha256;
654 func.image_uri = None;
655 func.package_type = "Zip".to_string();
656 } else if let Some(uri) = new_image_uri {
657 if func.image_uri.as_deref() != Some(uri.as_str()) {
658 changed = true;
659 }
660 func.image_uri = Some(uri);
661 func.code_zip = None;
662 func.package_type = "Image".to_string();
663 func.code_size = 0;
667 func.code_sha256 = String::new();
668 }
669
670 if let Some(arns) = new_architectures {
671 if !arns.is_empty() && arns != func.architectures {
672 changed = true;
673 func.architectures = arns;
674 }
675 }
676
677 if let Some(arn) = supplied_signing_profile {
678 if func.signing_profile_version_arn.as_deref() != Some(arn.as_str()) {
679 changed = true;
680 }
681 func.signing_profile_version_arn = Some(arn);
682 }
683
684 func.last_modified = Utc::now();
689 if changed {
690 func.revision_id = uuid::Uuid::new_v4().to_string();
691 }
692 func.last_update_status_reason = None;
696 func.last_update_status_reason_code = None;
697
698 if publish {
701 drop(accounts);
702 return self.publish_version(function_name, &req.account_id, req);
703 }
704
705 ok(self.function_config_json(func))
706 }
707
708 fn list_versions_by_function(
711 &self,
712 function_name: &str,
713 account_id: &str,
714 req: &AwsRequest,
715 ) -> Result<AwsResponse, AwsServiceError> {
716 let region = self.region_for(account_id);
717 let max_items: usize = req
718 .query_params
719 .get("MaxItems")
720 .and_then(|v| v.parse::<usize>().ok())
721 .map(|n| n.clamp(1, 50))
722 .unwrap_or(50);
723 let marker = req.query_params.get("Marker").cloned();
724 self.with_state_read(account_id, ®ion, |state| {
725 let func = state
726 .functions
727 .get(function_name)
728 .ok_or_else(|| not_found("Function", function_name))?;
729 let mut all: Vec<serde_json::Value> = Vec::new();
733 let mut latest = self.function_config_json(func);
734 latest["Version"] = json!("$LATEST");
735 all.push(latest);
736 let snapshots = state.function_version_snapshots.get(function_name);
737 if let Some(numbered) = state.function_versions.get(function_name) {
738 for v in numbered {
739 let snap = snapshots.and_then(|m| m.get(v)).unwrap_or(func);
740 let mut cfg = self.function_config_json(snap);
741 cfg["Version"] = json!(v);
742 cfg["FunctionArn"] = json!(format!("{}:{v}", func.function_arn));
743 cfg["MasterArn"] = json!(func.function_arn);
744 all.push(cfg);
745 }
746 }
747 let start = match marker.as_deref() {
751 Some(m) => all
752 .iter()
753 .position(|v| v["Version"].as_str() == Some(m))
754 .map(|i| i + 1)
755 .unwrap_or(0),
756 None => 0,
757 };
758 let end = (start + max_items).min(all.len());
759 let page: Vec<serde_json::Value> = all[start..end].to_vec();
760 let mut body = json!({ "Versions": page });
761 if end < all.len() {
762 if let Some(last) = all[end - 1]["Version"].as_str() {
763 body["NextMarker"] = json!(last);
764 }
765 }
766 ok(body)
767 })
768 }
769
770 fn pc_key(function: &str, qualifier: &str) -> String {
771 format!("{function}:{qualifier}")
772 }
773
774 fn tag_resource(
777 &self,
778 resource_arn: &str,
779 req: &AwsRequest,
780 ) -> Result<AwsResponse, AwsServiceError> {
781 let body = body(req);
782 let new_tags: Vec<(String, String)> = body
783 .get("Tags")
784 .and_then(|v| v.as_object())
785 .map(|m| {
786 m.iter()
787 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
788 .collect()
789 })
790 .unwrap_or_default();
791 let resource_arn_decoded = decode_query_segment(resource_arn);
794 let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
795 AwsServiceError::aws_error(
796 StatusCode::BAD_REQUEST,
797 "InvalidParameterValueException",
798 format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
799 )
800 })?;
801 let mut accounts = self.state.write();
802 let state = accounts.get_or_create(&req.account_id);
803 let func = state.functions.get_mut(&name).ok_or_else(|| {
804 AwsServiceError::aws_error(
805 StatusCode::NOT_FOUND,
806 "ResourceNotFoundException",
807 format!("Function not found: {name}"),
808 )
809 })?;
810 for (k, v) in new_tags {
813 func.tags.insert(k, v);
814 }
815 empty()
816 }
817
818 fn untag_resource(
819 &self,
820 resource_arn: &str,
821 req: &AwsRequest,
822 ) -> Result<AwsResponse, AwsServiceError> {
823 let mut keys: Vec<String> = Vec::new();
836 for (k, v) in parse_query_pairs(&req.raw_query) {
837 if k == "tagKeys" || k.starts_with("tagKeys.") {
838 keys.push(v);
839 }
840 }
841 if keys.is_empty() {
842 let parsed = body(req);
843 for field in ["TagKeys", "tagKeys"] {
844 if let Some(arr) = parsed.get(field).and_then(|v| v.as_array()) {
845 for v in arr {
846 if let Some(s) = v.as_str() {
847 keys.push(s.to_string());
848 }
849 }
850 if !keys.is_empty() {
851 break;
852 }
853 }
854 }
855 }
856 let resource_arn_decoded = decode_query_segment(resource_arn);
857 let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
858 AwsServiceError::aws_error(
859 StatusCode::BAD_REQUEST,
860 "InvalidParameterValueException",
861 format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
862 )
863 })?;
864 let mut accounts = self.state.write();
865 let state = accounts.get_or_create(&req.account_id);
866 let func = state.functions.get_mut(&name).ok_or_else(|| {
867 AwsServiceError::aws_error(
868 StatusCode::NOT_FOUND,
869 "ResourceNotFoundException",
870 format!("Function not found: {name}"),
871 )
872 })?;
873 for k in &keys {
874 func.tags.remove(k);
875 }
876 empty()
877 }
878
879 fn list_tags(
880 &self,
881 resource_arn: &str,
882 account_id: &str,
883 ) -> Result<AwsResponse, AwsServiceError> {
884 let resource_arn_decoded = decode_query_segment(resource_arn);
885 let name = function_name_from_arn(&resource_arn_decoded).ok_or_else(|| {
886 AwsServiceError::aws_error(
887 StatusCode::BAD_REQUEST,
888 "InvalidParameterValueException",
889 format!("Resource ARN is not a Lambda function: {resource_arn_decoded}"),
890 )
891 })?;
892 let region = self.region_for(account_id);
893 self.with_state_read(account_id, ®ion, |state| {
894 let func = state.functions.get(&name).ok_or_else(|| {
895 AwsServiceError::aws_error(
896 StatusCode::NOT_FOUND,
897 "ResourceNotFoundException",
898 format!("Function not found: {name}"),
899 )
900 })?;
901 let tags: serde_json::Map<String, Value> = func
902 .tags
903 .iter()
904 .map(|(k, v)| (k.clone(), Value::String(v.clone())))
905 .collect();
906 ok(json!({"Tags": tags}))
907 })
908 }
909
910 fn update_event_source_mapping_handler(
913 &self,
914 uuid: &str,
915 req: &AwsRequest,
916 ) -> Result<AwsResponse, AwsServiceError> {
917 let body = body(req);
918 let mut accounts = self.state.write();
919 let state = accounts.get_or_create(&req.account_id);
920 let esm = state
921 .event_source_mappings
922 .get_mut(uuid)
923 .ok_or_else(|| not_found("EventSourceMapping", uuid))?;
924 if let Some(b) = body["BatchSize"].as_i64() {
925 esm.batch_size = b;
926 }
927 if let Some(name) = body["FunctionName"].as_str() {
928 esm.function_arn = format!(
929 "arn:aws:lambda:{}:{}:function:{}",
930 state.region, state.account_id, name
931 );
932 }
933 if let Some(filters) = body
934 .get("FilterCriteria")
935 .and_then(|v| v.get("Filters"))
936 .and_then(|v| v.as_array())
937 {
938 esm.filter_patterns = filters
939 .iter()
940 .filter_map(|f| f.get("Pattern").and_then(|p| p.as_str()).map(String::from))
941 .collect();
942 }
943 if let Some(types) = body.get("FunctionResponseTypes").and_then(|v| v.as_array()) {
944 esm.function_response_types = types
945 .iter()
946 .filter_map(|v| v.as_str().map(String::from))
947 .collect();
948 }
949 if let Some(w) = body
950 .get("MaximumBatchingWindowInSeconds")
951 .and_then(|v| v.as_i64())
952 {
953 esm.maximum_batching_window_in_seconds = Some(w);
954 }
955 if let Some(p) = body.get("ParallelizationFactor").and_then(|v| v.as_i64()) {
956 esm.parallelization_factor = Some(p);
957 }
958 if let Some(s) = body.get("KMSKeyArn").and_then(|v| v.as_str()) {
959 esm.kms_key_arn = Some(s.to_string());
960 }
961 if let Some(mc) = body.get("MetricsConfig") {
962 esm.metrics_config = Some(mc.clone());
963 }
964 if let Some(dc) = body.get("DestinationConfig") {
965 esm.destination_config = Some(dc.clone());
966 }
967 if let Some(n) = body.get("MaximumRetryAttempts").and_then(|v| v.as_i64()) {
968 esm.maximum_retry_attempts = Some(n);
969 }
970 if let Some(n) = body
971 .get("MaximumRecordAgeInSeconds")
972 .and_then(|v| v.as_i64())
973 {
974 esm.maximum_record_age_in_seconds = Some(n);
975 }
976 if let Some(b) = body
977 .get("BisectBatchOnFunctionError")
978 .and_then(|v| v.as_bool())
979 {
980 esm.bisect_batch_on_function_error = Some(b);
981 }
982 if let Some(n) = body.get("TumblingWindowInSeconds").and_then(|v| v.as_i64()) {
983 esm.tumbling_window_in_seconds = Some(n);
984 }
985 let mut body_json = json!({
986 "UUID": esm.uuid,
987 "FunctionArn": esm.function_arn,
988 "EventSourceArn": esm.event_source_arn,
989 "BatchSize": esm.batch_size,
990 "State": "Enabled",
991 "StateTransitionReason": "USER_INITIATED",
992 "LastModified": chrono::Utc::now().timestamp() as f64,
993 });
994 let obj = body_json.as_object_mut().expect("json! built object");
995 if !esm.filter_patterns.is_empty() {
996 obj.insert(
997 "FilterCriteria".into(),
998 json!({
999 "Filters": esm
1000 .filter_patterns
1001 .iter()
1002 .map(|p| json!({"Pattern": p}))
1003 .collect::<Vec<_>>(),
1004 }),
1005 );
1006 }
1007 if !esm.function_response_types.is_empty() {
1008 obj.insert(
1009 "FunctionResponseTypes".into(),
1010 json!(esm.function_response_types),
1011 );
1012 }
1013 if let Some(w) = esm.maximum_batching_window_in_seconds {
1014 obj.insert("MaximumBatchingWindowInSeconds".into(), json!(w));
1015 }
1016 if let Some(p) = esm.parallelization_factor {
1017 obj.insert("ParallelizationFactor".into(), json!(p));
1018 }
1019 ok(body_json)
1020 }
1021
1022 fn region_for(&self, account_id: &str) -> String {
1023 let accounts = self.state.read();
1024 accounts
1025 .get(account_id)
1026 .map(|s| s.region.clone())
1027 .unwrap_or_else(|| "us-east-1".to_string())
1028 }
1029}
1030
1031fn extract_csc_id(input: &str) -> String {
1032 let decoded = percent_decode(input);
1035 decoded.rsplit(':').next().unwrap_or(&decoded).to_string()
1036}
1037
1038pub(crate) fn percent_decode_for_length(input: &str) -> String {
1041 percent_decode(input)
1042}
1043
/// Decodes `%XX` escapes in `input`.
///
/// Decoding works on bytes and the result is interpreted as UTF-8, so a
/// multi-byte character survives whether it arrives escaped (`%C3%A9`) or
/// literally (`é`). Previously each byte was pushed as its own `char`, which
/// read the data as Latin-1 and garbled any non-ASCII text.
///
/// A `%` that is not followed by two hex digits is kept verbatim. Byte
/// sequences that are not valid UTF-8 after decoding are replaced with
/// U+FFFD.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            if let Some([hi, lo]) = bytes.get(i + 1..i + 3) {
                let hi = (*hi as char).to_digit(16);
                let lo = (*lo as char).to_digit(16);
                if let (Some(h), Some(l)) = (hi, lo) {
                    // h and l are both < 16, so the sum always fits in a u8.
                    decoded.push((h * 16 + l) as u8);
                    i += 3;
                    continue;
                }
            }
        }
        decoded.push(bytes[i]);
        i += 1;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
1063
1064fn code_signing_json(c: &CodeSigningConfig) -> Value {
1065 json!({
1066 "CodeSigningConfigId": c.csc_id,
1067 "CodeSigningConfigArn": c.csc_arn,
1068 "Description": c.description,
1069 "AllowedPublishers": {
1070 "SigningProfileVersionArns": c.allowed_publishers,
1071 },
1072 "CodeSigningPolicies": {
1073 "UntrustedArtifactOnDeployment": c.untrusted_artifact_action,
1074 },
1075 "LastModified": c.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1076 })
1077}
1078
1079fn event_invoke_json(c: &EventInvokeConfig) -> Value {
1080 let destination = match &c.destination_config {
1091 None => json!({"OnSuccess": {}, "OnFailure": {}}),
1092 Some(v) if !v.is_object() => json!({}),
1093 Some(v) => {
1094 let mut map = v.as_object().cloned().unwrap_or_default();
1095 if !map.is_empty() {
1096 map.entry("OnSuccess".to_string()).or_insert(json!({}));
1097 map.entry("OnFailure".to_string()).or_insert(json!({}));
1098 }
1099 Value::Object(map)
1100 }
1101 };
1102 json!({
1103 "FunctionArn": c.function_arn,
1104 "MaximumEventAgeInSeconds": c.maximum_event_age,
1105 "MaximumRetryAttempts": c.maximum_retry_attempts,
1106 "DestinationConfig": destination,
1107 "LastModified": c
1113 .last_modified
1114 .timestamp_millis() as f64
1115 / 1000.0,
1116 })
1117}
1118
1119#[cfg(test)]
1120mod tests {
1121 use crate::service::LambdaService;
1122 use crate::state::{LambdaState, SharedLambdaState};
1123 use fakecloud_core::multi_account::MultiAccountState;
1124 use fakecloud_core::service::AwsRequest;
1125 use http::Method;
1126 use parking_lot::RwLock;
1127 use std::collections::HashMap;
1128 use std::sync::Arc;
1129
1130 fn svc() -> LambdaService {
1131 let state: SharedLambdaState = Arc::new(RwLock::new(
1132 MultiAccountState::<LambdaState>::new("000000000000", "us-east-1", ""),
1133 ));
1134 LambdaService::new(state)
1135 }
1136
1137 fn req(action: &str, body: &str, segs: &[&str]) -> AwsRequest {
1138 AwsRequest {
1139 service: "lambda".to_string(),
1140 method: Method::POST,
1141 raw_path: format!("/{}", segs.join("/")),
1142 raw_query: String::new(),
1143 path_segments: segs.iter().map(|s| s.to_string()).collect(),
1144 query_params: HashMap::new(),
1145 headers: http::HeaderMap::new(),
1146 body: bytes::Bytes::from(body.to_string()),
1147 body_stream: parking_lot::Mutex::new(None),
1148 account_id: "000000000000".to_string(),
1149 region: "us-east-1".to_string(),
1150 request_id: "rid".to_string(),
1151 action: action.to_string(),
1152 is_query_protocol: false,
1153 access_key_id: None,
1154 principal: None,
1155 }
1156 }
1157
1158 async fn run(s: &LambdaService, action: &str, body: &str, res: Option<&str>, segs: &[&str]) {
1159 let r = s.handle_extra(action, res, &req(action, body, segs)).await;
1160 match r {
1161 Ok(resp) => assert!(resp.status.is_success(), "{action} status: {}", resp.status),
1162 Err(e) => panic!("{action} failed: {e:?}"),
1163 }
1164 }
1165
1166 #[tokio::test]
1167 async fn read_only_listings_succeed_without_state() {
1168 let s = svc();
1169 run(&s, "GetAccountSettings", "", None, &[]).await;
1170 run(&s, "InvokeAsync", r#"{}"#, Some("fn"), &[]).await;
1171 run(&s, "ListLayers", "", None, &[]).await;
1172 run(&s, "ListLayerVersions", "", Some("layer"), &[]).await;
1173 }
1174
1175 #[tokio::test]
1176 async fn layers_lifecycle() {
1177 let s = svc();
1178 run(
1179 &s,
1180 "PublishLayerVersion",
1181 r#"{"Content":{"ZipFile":""}}"#,
1182 Some("layer1"),
1183 &["2018-10-31", "layers", "layer1", "versions"],
1184 )
1185 .await;
1186 run(&s, "ListLayers", "", None, &[]).await;
1187 run(&s, "ListLayerVersions", "", Some("layer1"), &[]).await;
1188 }
1189
1190 #[tokio::test]
1191 async fn code_signing_lifecycle() {
1192 let s = svc();
1193 run(
1194 &s,
1195 "CreateCodeSigningConfig",
1196 r#"{"AllowedPublishers":{"SigningProfileVersionArns":[]}}"#,
1197 None,
1198 &[],
1199 )
1200 .await;
1201 run(&s, "ListCodeSigningConfigs", "", None, &[]).await;
1202 }
1203}