1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::{Method, StatusCode};
7use serde_json::{json, Value};
8use sha2::{Digest, Sha256};
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_persistence::SnapshotStore;
13
14use crate::runtime::ContainerRuntime;
15use crate::state::{
16 EventSourceMapping, LambdaFunction, LambdaSnapshot, LambdaState, SharedLambdaState,
17 LAMBDA_SNAPSHOT_SCHEMA_VERSION,
18};
19
20fn invalid_param(msg: impl Into<String>) -> AwsServiceError {
21 AwsServiceError::aws_error(
22 StatusCode::BAD_REQUEST,
23 "InvalidParameterValueException",
24 msg,
25 )
26}
27
28fn check_len(field: &str, v: &str, min: usize, max: usize) -> Result<(), AwsServiceError> {
29 if v.len() < min || v.len() > max {
30 return Err(invalid_param(format!(
31 "{field} length must be in [{min},{max}], got {}",
32 v.len()
33 )));
34 }
35 Ok(())
36}
37
38fn check_optional_len(
39 field: &str,
40 v: Option<&str>,
41 min: usize,
42 max: usize,
43) -> Result<(), AwsServiceError> {
44 if let Some(s) = v {
45 check_len(field, s, min, max)?;
46 }
47 Ok(())
48}
49
50fn check_optional_int_range(
51 field: &str,
52 v: Option<i64>,
53 min: i64,
54 max: i64,
55) -> Result<(), AwsServiceError> {
56 if let Some(n) = v {
57 if n < min || n > max {
58 return Err(invalid_param(format!(
59 "{field} must be in [{min},{max}], got {n}"
60 )));
61 }
62 }
63 Ok(())
64}
65
/// Values accepted for the optional `PublishTo` request field. The
/// pre-validation step in this file checks it for `PublishVersion` and
/// `UpdateFunctionCode`.
const LAMBDA_PUBLISH_TO_VALUES: &[&str] = &["LATEST_PUBLISHED"];
67
/// Runtime identifiers accepted for the optional `Runtime` request field.
/// The pre-validation step in this file checks it for
/// `UpdateFunctionConfiguration`; any other string is rejected there.
const LAMBDA_RUNTIMES: &[&str] = &[
    // Node.js
    "nodejs",
    "nodejs4.3",
    "nodejs4.3-edge",
    "nodejs6.10",
    "nodejs8.10",
    "nodejs10.x",
    "nodejs12.x",
    "nodejs14.x",
    "nodejs16.x",
    "nodejs18.x",
    "nodejs20.x",
    "nodejs22.x",
    "nodejs24.x",
    // Java
    "java8",
    "java8.al2",
    "java11",
    "java17",
    "java21",
    "java25",
    // Python
    "python2.7",
    "python3.6",
    "python3.7",
    "python3.8",
    "python3.9",
    "python3.10",
    "python3.11",
    "python3.12",
    "python3.13",
    "python3.14",
    // .NET
    "dotnetcore1.0",
    "dotnetcore2.0",
    "dotnetcore2.1",
    "dotnetcore3.1",
    "dotnet6",
    "dotnet8",
    "dotnet10",
    // Go
    "go1.x",
    // Ruby
    "ruby2.5",
    "ruby2.7",
    "ruby3.2",
    "ruby3.3",
    "ruby3.4",
    // Custom runtimes
    "provided",
    "provided.al2",
    "provided.al2023",
];
120
121fn check_optional_enum(
122 field: &str,
123 v: Option<&str>,
124 allowed: &[&str],
125) -> Result<(), AwsServiceError> {
126 if let Some(s) = v {
127 if !allowed.contains(&s) {
128 return Err(invalid_param(format!(
129 "{field} must be one of the enum values, got '{s}'"
130 )));
131 }
132 }
133 Ok(())
134}
135
136fn prevalidate_lambda(action: &str, req: &AwsRequest) -> Result<(), AwsServiceError> {
137 let body: Value = serde_json::from_slice(&req.body).unwrap_or(Value::Null);
138 match action {
139 "PublishVersion" => {
140 check_optional_len("Description", body["Description"].as_str(), 0, 256)?;
141 check_optional_enum(
142 "PublishTo",
143 body["PublishTo"].as_str(),
144 LAMBDA_PUBLISH_TO_VALUES,
145 )?;
146 }
147 "UpdateFunctionCode" => {
148 check_optional_enum(
149 "PublishTo",
150 body["PublishTo"].as_str(),
151 LAMBDA_PUBLISH_TO_VALUES,
152 )?;
153 check_optional_len("S3Bucket", body["S3Bucket"].as_str(), 3, 63)?;
154 check_optional_len("S3Key", body["S3Key"].as_str(), 1, 1024)?;
155 check_optional_len("S3ObjectVersion", body["S3ObjectVersion"].as_str(), 1, 1024)?;
156 }
157 "UpdateFunctionConfiguration" => {
158 check_optional_len("Description", body["Description"].as_str(), 0, 256)?;
159 check_optional_len("Handler", body["Handler"].as_str(), 0, 128)?;
160 check_optional_int_range("MemorySize", body["MemorySize"].as_i64(), 128, 32768)?;
161 check_optional_int_range("Timeout", body["Timeout"].as_i64(), 1, i64::MAX)?;
162 check_optional_enum("Runtime", body["Runtime"].as_str(), LAMBDA_RUNTIMES)?;
163 }
164 _ => {}
165 }
166 Ok(())
167}
168
/// Reports whether `action` is one of the operations whose resource
/// identifier is a function name (and therefore gets function-name
/// validation and normalization in the request handler).
///
/// Matching is exact and case-sensitive; unknown actions return `false`.
pub(crate) fn action_takes_function_name(action: &str) -> bool {
    const FUNCTION_SCOPED_ACTIONS: &[&str] = &[
        "GetFunction",
        "DeleteFunction",
        "Invoke",
        "InvokeAsync",
        "InvokeWithResponseStream",
        "PublishVersion",
        "ListVersionsByFunction",
        "AddPermission",
        "RemovePermission",
        "GetPolicy",
        "GetFunctionConfiguration",
        "UpdateFunctionConfiguration",
        "UpdateFunctionCode",
        "GetFunctionConcurrency",
        "PutFunctionConcurrency",
        "DeleteFunctionConcurrency",
        "PutProvisionedConcurrencyConfig",
        "GetProvisionedConcurrencyConfig",
        "DeleteProvisionedConcurrencyConfig",
        "ListProvisionedConcurrencyConfigs",
        "PutFunctionEventInvokeConfig",
        "UpdateFunctionEventInvokeConfig",
        "GetFunctionEventInvokeConfig",
        "DeleteFunctionEventInvokeConfig",
        "ListFunctionEventInvokeConfigs",
        "CreateFunctionUrlConfig",
        "UpdateFunctionUrlConfig",
        "GetFunctionUrlConfig",
        "DeleteFunctionUrlConfig",
        "ListFunctionUrlConfigs",
        "PutFunctionCodeSigningConfig",
        "GetFunctionCodeSigningConfig",
        "DeleteFunctionCodeSigningConfig",
        "GetFunctionScalingConfig",
        "PutFunctionScalingConfig",
        "PutFunctionRecursionConfig",
        "GetFunctionRecursionConfig",
        "CreateAlias",
        "GetAlias",
        "ListAliases",
        "UpdateAlias",
        "DeleteAlias",
        "PutRuntimeManagementConfig",
        "GetRuntimeManagementConfig",
    ];
    FUNCTION_SCOPED_ACTIONS
        .iter()
        .any(|known| *known == action)
}
222
223pub(crate) fn normalize_function_name(input: &str) -> String {
238 if input.is_empty() {
239 return String::new();
240 }
241
242 let decoded = percent_encoding::percent_decode_str(input)
247 .decode_utf8_lossy()
248 .into_owned();
249 let input = decoded.as_str();
250
251 if let Some(rest) = input.strip_prefix("arn:aws:lambda:") {
253 let parts: Vec<&str> = rest.splitn(5, ':').collect();
254 if parts.len() >= 4 && parts[2] == "function" && !parts[3].is_empty() {
256 return parts[3].to_string();
257 }
258 return input.to_string();
259 }
260
261 let parts: Vec<&str> = input.splitn(4, ':').collect();
263 if parts.len() >= 3 && parts[1] == "function" && parts[0].chars().all(|c| c.is_ascii_digit()) {
264 if !parts[2].is_empty() {
265 return parts[2].to_string();
266 }
267 return input.to_string();
268 }
269
270 if input.matches(':').count() == 1 {
276 if let Some((name, _qualifier)) = input.split_once(':') {
277 if !name.is_empty() && name.chars().all(is_function_name_char) {
278 return name.to_string();
279 }
280 }
281 }
282
283 input.to_string()
284}
285
/// Returns `true` for characters allowed in a bare function name: ASCII
/// letters, ASCII digits, `-` and `_`.
fn is_function_name_char(c: char) -> bool {
    matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_')
}
289
290pub(crate) fn validate_ephemeral_storage(size: i64) -> Result<i64, AwsServiceError> {
295 if !(512..=10240).contains(&size) {
296 return Err(AwsServiceError::aws_error(
297 StatusCode::BAD_REQUEST,
298 "InvalidParameterValueException",
299 format!(
300 "Value {size} at 'ephemeralStorage.size' failed to satisfy constraint: \
301 Member must satisfy constraint: [Member must have value less than or equal to 10240, \
302 Member must have value greater than or equal to 512]"
303 ),
304 ));
305 }
306 Ok(size)
307}
308
/// Parsed form of a create-function JSON request body, as produced by
/// `CreateFunctionInput::from_body`. Field comments name the JSON key each
/// value is read from and the default applied when the key is absent.
struct CreateFunctionInput {
    /// `FunctionName` (required).
    function_name: String,
    /// `Runtime`; defaults to `"python3.12"`.
    runtime: String,
    /// `Role`; defaults to the empty string.
    role: String,
    /// `Handler`; defaults to `"index.handler"`.
    handler: String,
    /// `Description`; defaults to the empty string.
    description: String,
    /// `Timeout`; defaults to 3.
    timeout: i64,
    /// `MemorySize`; defaults to 128.
    memory_size: i64,
    /// `PackageType`; defaults to `"Zip"`.
    package_type: String,
    /// String-valued entries of `Tags`; non-string values are dropped.
    tags: BTreeMap<String, String>,
    /// String-valued entries of `Environment.Variables`; non-string values
    /// are dropped.
    environment: BTreeMap<String, String>,
    /// String entries of `Architectures`; defaults to `["x86_64"]` when the
    /// key is not an array.
    architectures: Vec<String>,
    /// Base64-decoded `Code.ZipFile`, when present.
    code_zip: Option<Vec<u8>>,
    /// JSON serialization of the whole `Code` object.
    code_fallback: Vec<u8>,
    /// `Code.ImageUri`; only read when the package type is `"Image"`.
    image_uri: Option<String>,
    /// String entries of `Layers`.
    layer_arns: Vec<String>,
    /// `TracingConfig.Mode`.
    tracing_mode: Option<String>,
    /// `KMSKeyArn`.
    kms_key_arn: Option<String>,
    /// `EphemeralStorage.Size`, already range-checked.
    ephemeral_storage_size: Option<i64>,
    /// `VpcConfig`, kept verbatim when it is a JSON object.
    vpc_config: Option<serde_json::Value>,
    /// `SnapStart`, kept verbatim when it is a JSON object.
    snap_start: Option<serde_json::Value>,
    /// `DeadLetterConfig.TargetArn`.
    dead_letter_config_arn: Option<String>,
    /// Elements of `FileSystemConfigs`, kept verbatim.
    file_system_configs: Vec<serde_json::Value>,
    /// `LoggingConfig`, kept verbatim when it is a JSON object.
    logging_config: Option<serde_json::Value>,
    /// `ImageConfig`, kept verbatim when it is a JSON object.
    image_config: Option<serde_json::Value>,
    /// `DurableConfig`, kept verbatim when it is a JSON object.
    durable_config: Option<serde_json::Value>,
}
339
340impl CreateFunctionInput {
341 fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
342 let function_name = body["FunctionName"]
343 .as_str()
344 .ok_or_else(|| {
345 AwsServiceError::aws_error(
346 StatusCode::BAD_REQUEST,
347 "InvalidParameterValueException",
348 "FunctionName is required",
349 )
350 })?
351 .to_string();
352
353 let tags: BTreeMap<String, String> = body["Tags"]
354 .as_object()
355 .map(|m| {
356 m.iter()
357 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
358 .collect()
359 })
360 .unwrap_or_default();
361
362 let environment: BTreeMap<String, String> = body["Environment"]["Variables"]
363 .as_object()
364 .map(|m| {
365 m.iter()
366 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
367 .collect()
368 })
369 .unwrap_or_default();
370
371 let architectures = body["Architectures"]
372 .as_array()
373 .map(|a| {
374 a.iter()
375 .filter_map(|v| v.as_str().map(|s| s.to_string()))
376 .collect()
377 })
378 .unwrap_or_else(|| vec!["x86_64".to_string()]);
379
380 let code_zip: Option<Vec<u8>> = match body["Code"]["ZipFile"].as_str() {
381 Some(b64) => Some(
382 base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
383 |_| {
384 AwsServiceError::aws_error(
385 StatusCode::BAD_REQUEST,
386 "InvalidParameterValueException",
387 "Could not decode Code.ZipFile: invalid base64",
388 )
389 },
390 )?,
391 ),
392 None => None,
393 };
394
395 let code_fallback = serde_json::to_vec(&body["Code"]).unwrap_or_default();
396
397 let package_type = body["PackageType"].as_str().unwrap_or("Zip").to_string();
398 let image_uri = if package_type == "Image" {
403 body["Code"]["ImageUri"].as_str().map(String::from)
404 } else {
405 None
406 };
407
408 if package_type == "Image" && image_uri.is_none() {
412 return Err(AwsServiceError::aws_error(
413 StatusCode::BAD_REQUEST,
414 "InvalidParameterValueException",
415 "Code.ImageUri is required when PackageType is Image",
416 ));
417 }
418
419 let layer_arns: Vec<String> = body["Layers"]
420 .as_array()
421 .map(|arr| {
422 arr.iter()
423 .filter_map(|v| v.as_str().map(String::from))
424 .collect()
425 })
426 .unwrap_or_default();
427
428 let tracing_mode = body["TracingConfig"]["Mode"].as_str().map(String::from);
429 let kms_key_arn = body["KMSKeyArn"].as_str().map(String::from);
430 let ephemeral_storage_size = match body["EphemeralStorage"]["Size"].as_i64() {
431 Some(size) => Some(validate_ephemeral_storage(size)?),
432 None => None,
433 };
434 let vpc_config = body["VpcConfig"]
435 .is_object()
436 .then(|| body["VpcConfig"].clone());
437 let snap_start = body["SnapStart"]
438 .is_object()
439 .then(|| body["SnapStart"].clone());
440 let dead_letter_config_arn = body["DeadLetterConfig"]["TargetArn"]
441 .as_str()
442 .map(String::from);
443 let file_system_configs = body["FileSystemConfigs"]
444 .as_array()
445 .cloned()
446 .unwrap_or_default();
447 let logging_config = body["LoggingConfig"]
448 .is_object()
449 .then(|| body["LoggingConfig"].clone());
450 let image_config = body["ImageConfig"]
451 .is_object()
452 .then(|| body["ImageConfig"].clone());
453 let durable_config = body["DurableConfig"]
454 .is_object()
455 .then(|| body["DurableConfig"].clone());
456
457 Ok(Self {
458 function_name,
459 runtime: body["Runtime"].as_str().unwrap_or("python3.12").to_string(),
460 role: body["Role"].as_str().unwrap_or("").to_string(),
461 handler: body["Handler"]
462 .as_str()
463 .unwrap_or("index.handler")
464 .to_string(),
465 description: body["Description"].as_str().unwrap_or("").to_string(),
466 timeout: body["Timeout"].as_i64().unwrap_or(3),
467 memory_size: body["MemorySize"].as_i64().unwrap_or(128),
468 package_type,
469 tags,
470 environment,
471 architectures,
472 code_zip,
473 code_fallback,
474 image_uri,
475 layer_arns,
476 tracing_mode,
477 kms_key_arn,
478 ephemeral_storage_size,
479 vpc_config,
480 snap_start,
481 dead_letter_config_arn,
482 file_system_configs,
483 logging_config,
484 image_config,
485 durable_config,
486 })
487 }
488}
489
/// How an `Invoke` call asked to be executed, as selected by the
/// `x-amz-invocation-type` request header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvocationType {
    /// The default when the header is absent or unrecognized.
    RequestResponse,
    /// Selected by the header value `Event`.
    Event,
    /// Selected by the header value `DryRun`.
    DryRun,
}

impl InvocationType {
    /// Maps the raw header value to an invocation type.
    ///
    /// Matching is exact and case-sensitive. A missing header and any value
    /// other than `"Event"` or `"DryRun"` yield
    /// [`InvocationType::RequestResponse`].
    pub fn from_header(value: Option<&str>) -> Self {
        match value.unwrap_or_default() {
            "Event" => Self::Event,
            "DryRun" => Self::DryRun,
            _ => Self::RequestResponse,
        }
    }
}
507
508fn route_to_destination(
512 bus: Arc<fakecloud_core::delivery::DeliveryBus>,
513 function_arn: &str,
514 request_payload: &[u8],
515 result: &Result<Vec<u8>, String>,
516 destination_config: Option<&serde_json::Value>,
517) {
518 let Some(cfg) = destination_config else {
519 return;
520 };
521 let (key, condition, response_value): (&str, &str, serde_json::Value) = match result {
522 Ok(bytes) => (
523 "OnSuccess",
524 "Success",
525 serde_json::from_slice(bytes).unwrap_or(serde_json::Value::Null),
526 ),
527 Err(err) => (
528 "OnFailure",
529 "RetriesExhausted",
530 serde_json::json!({ "errorMessage": err }),
531 ),
532 };
533 let Some(dest) = cfg
534 .get(key)
535 .and_then(|v| v.get("Destination"))
536 .and_then(|v| v.as_str())
537 else {
538 return;
539 };
540 let request_payload_v: serde_json::Value =
541 serde_json::from_slice(request_payload).unwrap_or(serde_json::Value::Null);
542 let record = serde_json::json!({
543 "version": "1.0",
544 "timestamp": chrono::Utc::now().to_rfc3339(),
545 "requestContext": {
546 "requestId": uuid::Uuid::new_v4().to_string(),
547 "functionArn": format!("{function_arn}:$LATEST"),
548 "condition": condition,
549 "approximateInvokeCount": 1,
550 },
551 "requestPayload": request_payload_v,
552 "responseContext": {
553 "statusCode": 200,
554 "executedVersion": "$LATEST",
555 },
556 "responsePayload": response_value,
557 });
558 let body = record.to_string();
559 if dest.contains(":sqs:") {
560 bus.send_to_sqs(dest, &body, &std::collections::HashMap::new());
561 } else if dest.contains(":sns:") {
562 bus.publish_to_sns(dest, &body, None);
563 } else if dest.contains(":lambda:") {
564 let dest = dest.to_string();
565 let payload = body.clone();
566 tokio::spawn(async move {
567 let _ = bus.invoke_lambda(&dest, &payload).await;
568 });
569 } else if dest.contains(":events:") || dest.contains(":eventbridge:") {
570 let detail_type = if result.is_ok() {
571 "Lambda Function Invocation Result - Success"
572 } else {
573 "Lambda Function Invocation Result - Failure"
574 };
575 bus.put_event_to_eventbridge("lambda", detail_type, &body, "default");
576 }
577}
578
/// Guard tied to one counted entry in a shared counter map.
///
/// Dropping the guard decrements the counter stored under `key` in `map`,
/// removing the entry once the count would reach zero (see the `Drop` impl).
/// Incrementing the counter when the guard is created is the caller's job;
/// it is not done in this file's visible code.
pub(crate) struct ConcurrencyGuard {
    /// Shared counter map this guard decrements on drop.
    pub(crate) map: Arc<parking_lot::RwLock<BTreeMap<String, i64>>>,
    /// Key of the counter this guard is responsible for.
    pub(crate) key: String,
}
588
589impl Drop for ConcurrencyGuard {
590 fn drop(&mut self) {
591 let mut m = self.map.write();
592 let n = m.get(&self.key).copied().unwrap_or(0);
593 if n <= 1 {
594 m.remove(&self.key);
595 } else {
596 m.insert(self.key.clone(), n - 1);
597 }
598 }
599}
600
601fn function_config_unchanged_for_publish(
617 prev: &LambdaFunction,
618 live: &LambdaFunction,
619 effective_description: &str,
620) -> bool {
621 prev.code_sha256 == live.code_sha256
622 && prev.code_size == live.code_size
623 && prev.image_uri == live.image_uri
624 && prev.package_type == live.package_type
625 && prev.runtime == live.runtime
626 && prev.role == live.role
627 && prev.handler == live.handler
628 && prev.description == effective_description
629 && prev.timeout == live.timeout
630 && prev.memory_size == live.memory_size
631 && prev.environment == live.environment
632 && prev.architectures == live.architectures
633 && prev.layers.len() == live.layers.len()
634 && prev
635 .layers
636 .iter()
637 .zip(live.layers.iter())
638 .all(|(a, b)| a.arn == b.arn && a.code_size == b.code_size)
639 && prev.tracing_mode == live.tracing_mode
640 && prev.kms_key_arn == live.kms_key_arn
641 && prev.ephemeral_storage_size == live.ephemeral_storage_size
642 && prev.vpc_config == live.vpc_config
643 && prev.dead_letter_config_arn == live.dead_letter_config_arn
644 && prev.file_system_configs == live.file_system_configs
645 && prev.logging_config == live.logging_config
646 && prev.image_config == live.image_config
647 && prev.signing_profile_version_arn == live.signing_profile_version_arn
648 && prev.signing_job_arn == live.signing_job_arn
649 && prev.runtime_version_config == live.runtime_version_config
650 && snap_start_apply_on_eq(prev.snap_start.as_ref(), live.snap_start.as_ref())
651}
652
653fn snap_start_apply_on_eq(prev: Option<&Value>, live: Option<&Value>) -> bool {
662 let prev_apply = prev
663 .and_then(|v| v.get("ApplyOn"))
664 .and_then(|v| v.as_str())
665 .unwrap_or("None");
666 let live_apply = live
667 .and_then(|v| v.get("ApplyOn"))
668 .and_then(|v| v.as_str())
669 .unwrap_or("None");
670 prev_apply == live_apply
671}
672
/// Resolves an invocation qualifier to a concrete version string.
///
/// Returns `None` when no qualifier is given, when it is `"$LATEST"`, or
/// when it names an alias that is not present in `state.aliases` (callers
/// cannot tell these cases apart from the return value alone).
///
/// An all-digit qualifier is returned as-is. Any other qualifier is looked
/// up as an alias under the key `"{function_name}:{qualifier}"`. When the
/// alias carries `AdditionalVersionWeights` in its routing config, the
/// returned version is picked at random according to those weights, so
/// repeated calls may return different versions.
pub(crate) fn resolve_qualifier_to_version(
    state: &LambdaState,
    function_name: &str,
    qualifier: Option<&str>,
) -> Option<String> {
    let q = qualifier?;
    if q == "$LATEST" {
        return None;
    }
    // NOTE(review): `all` is vacuously true for an empty string, so an empty
    // qualifier is returned as `Some("")` here — confirm callers reject it
    // earlier.
    if q.chars().all(|c| c.is_ascii_digit()) {
        return Some(q.to_string());
    }
    let alias_key = format!("{function_name}:{q}");
    let alias = state.aliases.get(&alias_key)?;
    let primary = alias.function_version.clone();
    let routing = alias
        .routing_config
        .as_ref()
        .and_then(|rc| rc.get("AdditionalVersionWeights"))
        .and_then(|m| m.as_object());
    // No weighted routing configured: the alias always resolves to its
    // primary version.
    let Some(weights) = routing else {
        return Some(primary);
    };
    let mut additional: Vec<(String, f64)> = Vec::with_capacity(weights.len());
    let mut sum: f64 = 0.0;
    for (ver, w) in weights {
        // Non-numeric weights count as 0; each weight is clamped to [0, 1].
        let weight = w.as_f64().unwrap_or(0.0).clamp(0.0, 1.0);
        sum += weight;
        additional.push((ver.clone(), weight));
    }
    // The primary version receives whatever share is left, never negative.
    let primary_weight = (1.0 - sum).max(0.0);
    // Draw a value in [0, 1) from a small per-thread generator so no RNG
    // crate is needed.
    let pick: f64 = {
        use std::cell::Cell;
        thread_local! {
            static RNG: Cell<u64> = const { Cell::new(0x9E37_79B9_7F4A_7C15) };
        }
        // Wall-clock nanoseconds are mixed into the state on every call;
        // a clock before the Unix epoch contributes 0.
        let now_nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos() as u64)
            .unwrap_or(0);
        RNG.with(|cell| {
            let mut s = cell.get() ^ now_nanos;
            s = s.wrapping_add(0x9E37_79B9_7F4A_7C15);
            // Bit-mixing of the state; the constants and shifts match the
            // SplitMix64 output function.
            let mut z = s;
            z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
            z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
            z ^= z >> 31;
            cell.set(s);
            // Keep the top 53 bits and scale by 2^53 to land in [0, 1).
            (z >> 11) as f64 / ((1u64 << 53) as f64)
        })
    };
    // Walk the cumulative distribution: primary first, then the additional
    // versions in map iteration order.
    let mut acc = primary_weight;
    if pick < acc {
        return Some(primary);
    }
    for (ver, w) in &additional {
        acc += w;
        if pick < acc {
            return Some(ver.clone());
        }
    }
    // Reached when the cumulative weights never exceed `pick`, e.g. because
    // they sum to less than 1 after clamping or through float rounding.
    Some(primary)
}
745
/// The Lambda service implementation: shared state plus optional
/// collaborators that are attached through the `with_*` builder methods.
/// Every optional collaborator starts as `None` in `LambdaService::new`.
pub struct LambdaService {
    /// Shared Lambda state; cloned under a read lock when a snapshot is saved.
    pub(crate) state: SharedLambdaState,
    /// Container runtime, set via `with_runtime`.
    pub(crate) runtime: Option<Arc<ContainerRuntime>>,
    /// Store that receives serialized snapshots; snapshotting is skipped
    /// when this is `None`.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of `save_snapshot`.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Delivery bus, set via `with_delivery_bus`.
    pub(crate) delivery_bus: Option<Arc<fakecloud_core::delivery::DeliveryBus>>,
    /// Role trust validator, set via `with_role_trust_validator`.
    pub(crate) role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
    /// S3 delivery handle, set via `with_s3_delivery`.
    pub(crate) s3_delivery: Option<Arc<dyn fakecloud_core::delivery::S3Delivery>>,
    /// Shared counter map with the same type as `ConcurrencyGuard::map`.
    /// NOTE(review): presumably in-flight invocation counts keyed by
    /// function — confirm against the invoke module.
    pub(crate) inflight_invocations: Arc<parking_lot::RwLock<BTreeMap<String, i64>>>,
}
762
763mod functions;
764mod init;
765mod invoke;
766mod publish;
767
768impl LambdaService {
769 pub fn new(state: SharedLambdaState) -> Self {
770 Self {
771 state,
772 runtime: None,
773 snapshot_store: None,
774 snapshot_lock: Arc::new(AsyncMutex::new(())),
775 delivery_bus: None,
776 role_trust_validator: None,
777 s3_delivery: None,
778 inflight_invocations: Arc::new(parking_lot::RwLock::new(BTreeMap::new())),
779 }
780 }
781
782 pub fn with_s3_delivery(mut self, s3: Arc<dyn fakecloud_core::delivery::S3Delivery>) -> Self {
783 self.s3_delivery = Some(s3);
784 self
785 }
786
787 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
788 self.runtime = Some(runtime);
789 self
790 }
791
792 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
793 self.snapshot_store = Some(store);
794 self
795 }
796
797 pub fn with_delivery_bus(mut self, bus: Arc<fakecloud_core::delivery::DeliveryBus>) -> Self {
798 self.delivery_bus = Some(bus);
799 self
800 }
801
802 pub fn with_role_trust_validator(
803 mut self,
804 validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
805 ) -> Self {
806 self.role_trust_validator = Some(validator);
807 self
808 }
809
810 async fn save_snapshot(&self) {
811 let Some(store) = self.snapshot_store.clone() else {
812 return;
813 };
814 let _guard = self.snapshot_lock.lock().await;
815 let snapshot = LambdaSnapshot {
816 schema_version: LAMBDA_SNAPSHOT_SCHEMA_VERSION,
817 accounts: Some(self.state.read().clone()),
818 state: None,
819 };
820 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
821 let bytes = serde_json::to_vec(&snapshot)
822 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
823 store.save(&bytes)
824 })
825 .await;
826 match join {
827 Ok(Ok(())) => {}
828 Ok(Err(err)) => tracing::error!(%err, "failed to write lambda snapshot"),
829 Err(err) => tracing::error!(%err, "lambda snapshot task panicked"),
830 }
831 }
832}
833
834#[async_trait]
835impl AwsService for LambdaService {
    /// Returns the fixed service identifier, `"lambda"`.
    fn service_name(&self) -> &str {
        "lambda"
    }
839
    /// Entry point for every Lambda request: resolves the action, validates
    /// common inputs, dispatches to the matching handler and persists a
    /// snapshot after successful mutating actions.
    ///
    /// Steps, in order:
    /// 1. Resolve `(action, resource_name)` from the request; unroutable
    ///    requests become a 400 or 404 error.
    /// 2. For function-scoped actions, length-check the raw function
    ///    identifier and normalize it to a bare function name.
    /// 3. Validate the `MaxItems`, `Qualifier` and `FunctionVersion` query
    ///    parameters when present.
    /// 4. Run `prevalidate_lambda` on the body.
    /// 5. Dispatch on the action; actions not listed here go to
    ///    `handle_extra`.
    /// 6. Save a snapshot when the action is in the `mutates` list and the
    ///    response status is a success.
    ///
    /// # Errors
    ///
    /// Returns the validation error from steps 1–4, or whatever error the
    /// dispatched handler produces.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let (action, resource_name) = Self::resolve_action(&req).ok_or_else(|| {
            // Distinguish "known collection, bad identifier" (400) from a
            // path that matches nothing at all (404) by looking at the
            // second path segment.
            const KNOWN_COLLECTIONS: &[&str] = &[
                "functions",
                "layers",
                "layers-by-arn",
                "event-source-mappings",
                "tags",
                "account-settings",
                "code-signing-configs",
            ];
            let is_known_collection = req
                .path_segments
                .get(1)
                .map(|s| KNOWN_COLLECTIONS.contains(&s.as_str()))
                .unwrap_or(false);
            if is_known_collection {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidParameterValueException",
                    format!(
                        "Could not route request {} {} — missing or invalid identifier",
                        req.method, req.raw_path
                    ),
                )
            } else {
                AwsServiceError::aws_error(
                    StatusCode::NOT_FOUND,
                    "UnknownOperationException",
                    format!("Unknown operation: {} {}", req.method, req.raw_path),
                )
            }
        })?;

        // Function-scoped actions: validate the identifier length on the
        // percent-decoded form, then reduce it to the bare function name.
        let resource_name = if action_takes_function_name(action) {
            if let Some(raw) = resource_name.as_ref() {
                let decoded = crate::extras::percent_decode_for_length(raw);
                let len = decoded.chars().count();
                // Identifiers that start with "arn:" get a larger limit.
                let limit = if decoded.starts_with("arn:") {
                    200
                } else {
                    140
                };
                if decoded.is_empty() || len > limit {
                    // InvokeAsync reports an invalid name as "not found"
                    // (404); every other action reports a validation error
                    // (400).
                    // NOTE(review): the validation message always says 140,
                    // even when the 200 limit for ARNs was the one applied —
                    // confirm this is intentional.
                    let (code, msg) = if action == "InvokeAsync" {
                        (
                            "ResourceNotFoundException",
                            format!("Function not found: {}", raw),
                        )
                    } else {
                        (
                            "InvalidParameterValueException",
                            format!(
                                "1 validation error detected: Value '{}' at 'functionName' failed to \
                                 satisfy constraint: Member must have length less than or equal to 140",
                                raw
                            ),
                        )
                    };
                    return Err(AwsServiceError::aws_error(
                        if action == "InvokeAsync" {
                            StatusCode::NOT_FOUND
                        } else {
                            StatusCode::BAD_REQUEST
                        },
                        code,
                        msg,
                    ));
                }
            }
            resource_name.map(|s| normalize_function_name(&s))
        } else {
            resource_name
        };

        // MaxItems must be an integer between 1 and a per-action maximum.
        if let Some(raw) = req.query_params.get("MaxItems") {
            let n = raw.parse::<i64>().map_err(|_| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidParameterValueException",
                    format!("MaxItems must be a number (got '{raw}')"),
                )
            })?;
            let max = match action {
                "ListLayers"
                | "ListLayerVersions"
                | "ListFunctionUrlConfigs"
                | "ListProvisionedConcurrencyConfigs"
                | "ListFunctionEventInvokeConfigs"
                | "ListAliases" => 50,
                _ => 10000,
            };
            if !(1..=max).contains(&n) {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidParameterValueException",
                    format!("MaxItems must be between 1 and {} (got {})", max, n),
                ));
            }
        }

        // Qualifier: 1..=128 characters when present.
        if let Some(q) = req.query_params.get("Qualifier") {
            let len = q.chars().count();
            if q.is_empty() || len > 128 {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidParameterValueException",
                    format!("Qualifier must be 1..128 characters (got length {})", len),
                ));
            }
        }
        // FunctionVersion: 1..=1024 characters when present.
        if let Some(fv) = req.query_params.get("FunctionVersion") {
            let len = fv.chars().count();
            if fv.is_empty() || len > 1024 {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidParameterValueException",
                    format!(
                        "FunctionVersion must be 1..1024 characters (got length {})",
                        len
                    ),
                ));
            }
        }

        // Actions after which a snapshot is saved, provided the response
        // status is a success.
        let mutates = matches!(
            action,
            "CreateFunction"
                | "DeleteFunction"
                | "PublishVersion"
                | "AddPermission"
                | "RemovePermission"
                | "CreateEventSourceMapping"
                | "DeleteEventSourceMapping"
                | "UpdateEventSourceMapping"
                | "UpdateFunctionCode"
                | "UpdateFunctionConfiguration"
                | "CreateAlias"
                | "DeleteAlias"
                | "UpdateAlias"
                | "PublishLayerVersion"
                | "DeleteLayerVersion"
                | "AddLayerVersionPermission"
                | "RemoveLayerVersionPermission"
                | "CreateFunctionUrlConfig"
                | "DeleteFunctionUrlConfig"
                | "UpdateFunctionUrlConfig"
                | "PutFunctionConcurrency"
                | "DeleteFunctionConcurrency"
                | "PutProvisionedConcurrencyConfig"
                | "DeleteProvisionedConcurrencyConfig"
                | "CreateCodeSigningConfig"
                | "UpdateCodeSigningConfig"
                | "DeleteCodeSigningConfig"
                | "PutFunctionCodeSigningConfig"
                | "DeleteFunctionCodeSigningConfig"
                | "PutFunctionEventInvokeConfig"
                | "UpdateFunctionEventInvokeConfig"
                | "DeleteFunctionEventInvokeConfig"
                | "PutRuntimeManagementConfig"
                | "PutFunctionScalingConfig"
                | "PutFunctionRecursionConfig"
                | "TagResource"
                | "UntagResource"
                | "InvokeAsync"
                | "InvokeWithResponseStream"
        );

        let aid = &req.account_id;
        // Body-level field checks for the actions that have them.
        prevalidate_lambda(action, &req)?;
        let result = match action {
            "CreateFunction" => self.create_function(&req),
            "ListFunctions" => self.list_functions(
                aid,
                req.query_params.get("FunctionVersion").map(String::as_str),
            ),
            "GetFunction" => self.get_function(
                &req,
                resource_name.as_deref().unwrap_or(""),
                aid,
                req.region.as_str(),
                req.query_params.get("Qualifier").map(String::as_str),
            ),
            "DeleteFunction" => self.delete_function(
                resource_name.as_deref().unwrap_or(""),
                aid,
                req.query_params.get("Qualifier").map(String::as_str),
            ),
            "Invoke" => {
                // The invocation type comes from a request header; a missing
                // or non-UTF-8 header falls back to the default.
                let invocation_type = InvocationType::from_header(
                    req.headers
                        .get("x-amz-invocation-type")
                        .and_then(|v| v.to_str().ok()),
                );
                let qualifier = req.query_params.get("Qualifier").map(String::as_str);
                self.invoke(
                    resource_name.as_deref().unwrap_or(""),
                    &req.body,
                    aid,
                    invocation_type,
                    qualifier,
                )
                .await
            }
            "InvokeAsync" => {
                // This arm only checks that the function exists in the
                // caller's account state and answers 202; no invocation is
                // started here.
                let name = resource_name.as_deref().unwrap_or("");
                let accounts = self.state.read();
                let exists = accounts
                    .get(aid)
                    .map(|s| s.functions.contains_key(name))
                    .unwrap_or(false);
                if !exists {
                    Err(AwsServiceError::aws_error(
                        StatusCode::NOT_FOUND,
                        "ResourceNotFoundException",
                        format!("Function not found: {}", name),
                    ))
                } else {
                    Ok(AwsResponse::json(
                        StatusCode::ACCEPTED,
                        json!({ "Status": 202 }).to_string(),
                    ))
                }
            }
            "PublishVersion" => {
                self.publish_version(resource_name.as_deref().unwrap_or(""), aid, &req)
            }
            "AddPermission" => self.add_permission(resource_name.as_deref().unwrap_or(""), &req),
            "GetPolicy" => self.get_policy(
                resource_name.as_deref().unwrap_or(""),
                aid,
                req.query_params.get("Qualifier").map(String::as_str),
            ),
            "RemovePermission" => {
                // The statement id is taken from path segment index 4; a
                // missing segment becomes the empty string.
                let sid = req.path_segments.get(4).cloned().unwrap_or_default();
                self.remove_permission(
                    resource_name.as_deref().unwrap_or(""),
                    &sid,
                    aid,
                    req.query_params.get("Qualifier").map(String::as_str),
                )
            }
            "CreateEventSourceMapping" => self.create_event_source_mapping(&req),
            "ListEventSourceMappings" => {
                // The FunctionName filter is only length-checked here; it is
                // not passed on to the listing call.
                if let Some(fn_name) = req.query_params.get("FunctionName") {
                    let len = fn_name.chars().count();
                    if fn_name.is_empty() || len > 140 {
                        return Err(AwsServiceError::aws_error(
                            StatusCode::BAD_REQUEST,
                            "InvalidParameterValueException",
                            "FunctionName must be 1..140 characters",
                        ));
                    }
                }
                self.list_event_source_mappings(aid)
            }
            "GetEventSourceMapping" => {
                self.get_event_source_mapping(resource_name.as_deref().unwrap_or(""), aid)
            }
            "DeleteEventSourceMapping" => {
                self.delete_event_source_mapping(resource_name.as_deref().unwrap_or(""), aid)
            }
            // Capacity-provider and durable-execution actions are handled by
            // free functions in `crate::workflows`.
            "CreateCapacityProvider" => {
                crate::workflows::create_capacity_provider(&self.state, &req, &req.json_body())
            }
            "GetCapacityProvider" => crate::workflows::get_capacity_provider(
                &self.state,
                &req,
                resource_name.as_deref().unwrap_or(""),
            ),
            "ListCapacityProviders" => crate::workflows::list_capacity_providers(&self.state, &req),
            "UpdateCapacityProvider" => crate::workflows::update_capacity_provider(
                &self.state,
                &req,
                resource_name.as_deref().unwrap_or(""),
                &req.json_body(),
            ),
            "DeleteCapacityProvider" => crate::workflows::delete_capacity_provider(
                &self.state,
                &req,
                resource_name.as_deref().unwrap_or(""),
            ),
            "ListFunctionVersionsByCapacityProvider" => {
                crate::workflows::list_function_versions_by_capacity_provider(
                    &self.state,
                    &req,
                    resource_name.as_deref().unwrap_or(""),
                )
            }
            "GetDurableExecution" => crate::workflows::get_durable_execution(
                &self.state,
                &req,
                resource_name.as_deref().unwrap_or(""),
            ),
            "GetDurableExecutionHistory" => crate::workflows::get_durable_execution_history(
                &self.state,
                &req,
                resource_name.as_deref().unwrap_or(""),
            ),
            "GetDurableExecutionState" => crate::workflows::get_durable_execution_state(
                &self.state,
                &req,
                resource_name.as_deref().unwrap_or(""),
            ),
            "ListDurableExecutionsByFunction" => {
                crate::workflows::list_durable_executions_by_function(
                    &self.state,
                    &req,
                    resource_name.as_deref().unwrap_or(""),
                )
            }
            "CheckpointDurableExecution" => crate::workflows::checkpoint_durable_execution(
                &self.state,
                &req,
                resource_name.as_deref().unwrap_or(""),
                &req.json_body(),
            ),
            "StopDurableExecution" => crate::workflows::stop_durable_execution(
                &self.state,
                &req,
                resource_name.as_deref().unwrap_or(""),
            ),
            "SendDurableExecutionCallbackSuccess" => crate::workflows::send_callback_success(
                &self.state,
                &req,
                resource_name.as_deref().unwrap_or(""),
            ),
            "SendDurableExecutionCallbackFailure" => crate::workflows::send_callback_failure(
                &self.state,
                &req,
                resource_name.as_deref().unwrap_or(""),
            ),
            "SendDurableExecutionCallbackHeartbeat" => crate::workflows::send_callback_heartbeat(
                &self.state,
                &req,
                resource_name.as_deref().unwrap_or(""),
            ),
            // Everything else is delegated to the secondary dispatcher.
            other => {
                self.handle_extra(other, resource_name.as_deref(), &req)
                    .await
            }
        };
        // Persist state only after a mutating action actually succeeded.
        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
            self.save_snapshot().await;
        }
        result
    }
1248
1249 fn supported_actions(&self) -> &[&str] {
1250 &[
1251 "CreateFunction",
1252 "GetFunction",
1253 "DeleteFunction",
1254 "ListFunctions",
1255 "Invoke",
1256 "InvokeAsync",
1257 "InvokeWithResponseStream",
1258 "PublishVersion",
1259 "ListVersionsByFunction",
1260 "AddPermission",
1261 "RemovePermission",
1262 "GetPolicy",
1263 "CreateEventSourceMapping",
1264 "ListEventSourceMappings",
1265 "GetEventSourceMapping",
1266 "UpdateEventSourceMapping",
1267 "DeleteEventSourceMapping",
1268 "GetFunctionConfiguration",
1269 "UpdateFunctionConfiguration",
1270 "UpdateFunctionCode",
1271 "GetAccountSettings",
1272 "CreateAlias",
1273 "GetAlias",
1274 "ListAliases",
1275 "UpdateAlias",
1276 "DeleteAlias",
1277 "PublishLayerVersion",
1278 "GetLayerVersion",
1279 "GetLayerVersionByArn",
1280 "DeleteLayerVersion",
1281 "ListLayerVersions",
1282 "ListLayers",
1283 "GetLayerVersionPolicy",
1284 "AddLayerVersionPermission",
1285 "RemoveLayerVersionPermission",
1286 "CreateFunctionUrlConfig",
1287 "GetFunctionUrlConfig",
1288 "UpdateFunctionUrlConfig",
1289 "DeleteFunctionUrlConfig",
1290 "ListFunctionUrlConfigs",
1291 "PutFunctionConcurrency",
1292 "GetFunctionConcurrency",
1293 "DeleteFunctionConcurrency",
1294 "PutProvisionedConcurrencyConfig",
1295 "GetProvisionedConcurrencyConfig",
1296 "DeleteProvisionedConcurrencyConfig",
1297 "ListProvisionedConcurrencyConfigs",
1298 "CreateCodeSigningConfig",
1299 "GetCodeSigningConfig",
1300 "UpdateCodeSigningConfig",
1301 "DeleteCodeSigningConfig",
1302 "ListCodeSigningConfigs",
1303 "PutFunctionCodeSigningConfig",
1304 "GetFunctionCodeSigningConfig",
1305 "DeleteFunctionCodeSigningConfig",
1306 "ListFunctionsByCodeSigningConfig",
1307 "PutFunctionEventInvokeConfig",
1308 "GetFunctionEventInvokeConfig",
1309 "UpdateFunctionEventInvokeConfig",
1310 "DeleteFunctionEventInvokeConfig",
1311 "ListFunctionEventInvokeConfigs",
1312 "PutRuntimeManagementConfig",
1313 "GetRuntimeManagementConfig",
1314 "PutFunctionScalingConfig",
1315 "GetFunctionScalingConfig",
1316 "PutFunctionRecursionConfig",
1317 "GetFunctionRecursionConfig",
1318 "TagResource",
1319 "UntagResource",
1320 "ListTags",
1321 "CreateCapacityProvider",
1322 "GetCapacityProvider",
1323 "ListCapacityProviders",
1324 "UpdateCapacityProvider",
1325 "DeleteCapacityProvider",
1326 "ListFunctionVersionsByCapacityProvider",
1327 "GetDurableExecution",
1328 "GetDurableExecutionHistory",
1329 "GetDurableExecutionState",
1330 "ListDurableExecutionsByFunction",
1331 "CheckpointDurableExecution",
1332 "StopDurableExecution",
1333 "SendDurableExecutionCallbackSuccess",
1334 "SendDurableExecutionCallbackFailure",
1335 "SendDurableExecutionCallbackHeartbeat",
1336 ]
1337 }
1338
1339 fn iam_enforceable(&self) -> bool {
1340 true
1341 }
1342
1343 fn iam_action_for(&self, request: &AwsRequest) -> Option<fakecloud_core::auth::IamAction> {
1347 let (action_str, resource_name) = Self::resolve_action(request)?;
1352 let action: &'static str = match action_str {
1353 "CreateFunction" => "CreateFunction",
1354 "ListFunctions" => "ListFunctions",
1355 "GetFunction" => "GetFunction",
1356 "DeleteFunction" => "DeleteFunction",
1357 "Invoke" => "InvokeFunction",
1358 "InvokeWithResponseStream" => "InvokeFunctionWithResponseStream",
1359 "PublishVersion" => "PublishVersion",
1360 "AddPermission" => "AddPermission",
1361 "RemovePermission" => "RemovePermission",
1362 "GetPolicy" => "GetPolicy",
1363 "CreateEventSourceMapping" => "CreateEventSourceMapping",
1364 "ListEventSourceMappings" => "ListEventSourceMappings",
1365 "GetEventSourceMapping" => "GetEventSourceMapping",
1366 "DeleteEventSourceMapping" => "DeleteEventSourceMapping",
1367 _ => return None,
1368 };
1369 let accounts = self.state.read();
1370 let empty = LambdaState::new(&request.account_id, &request.region);
1371 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1372 let resource = match action {
1373 "GetFunction"
1374 | "DeleteFunction"
1375 | "InvokeFunction"
1376 | "InvokeFunctionWithResponseStream"
1377 | "PublishVersion"
1378 | "AddPermission"
1379 | "RemovePermission"
1380 | "GetPolicy" => {
1381 let raw = resource_name.unwrap_or_default();
1382 if raw.is_empty() {
1383 "*".to_string()
1384 } else {
1385 let name = normalize_function_name(&raw);
1391 format!(
1392 "arn:aws:lambda:{}:{}:function:{}",
1393 state.region, state.account_id, name
1394 )
1395 }
1396 }
1397 "CreateFunction" => {
1398 serde_json::from_slice::<Value>(&request.body)
1403 .ok()
1404 .and_then(|v| {
1405 v.get("FunctionName").and_then(|f| f.as_str()).map(|n| {
1406 format!(
1407 "arn:aws:lambda:{}:{}:function:{}",
1408 state.region, state.account_id, n
1409 )
1410 })
1411 })
1412 .unwrap_or_else(|| "*".to_string())
1413 }
1414 _ => "*".to_string(),
1415 };
1416 Some(fakecloud_core::auth::IamAction {
1417 service: "lambda",
1418 action,
1419 resource,
1420 })
1421 }
1422
1423 fn iam_condition_keys_for(
1424 &self,
1425 request: &AwsRequest,
1426 action: &fakecloud_core::auth::IamAction,
1427 ) -> std::collections::BTreeMap<String, Vec<String>> {
1428 let mut out = std::collections::BTreeMap::new();
1429 if action.action == "AddPermission" {
1430 if action.resource != "*" {
1431 out.insert(
1432 "lambda:functionarn".to_string(),
1433 vec![action.resource.clone()],
1434 );
1435 }
1436 if let Ok(body) = serde_json::from_slice::<Value>(&request.body) {
1437 if let Some(principal) = body.get("Principal").and_then(|p| p.as_str()) {
1438 out.insert("lambda:principal".to_string(), vec![principal.to_string()]);
1439 }
1440 }
1441 }
1442 out
1443 }
1444}
1445
// Child modules whose source lives in files one directory up; `#[path]`
// points each `mod` declaration at its file explicitly.
#[path = "../service_event_sources.rs"]
mod service_event_sources;
#[path = "../service_permissions.rs"]
mod service_permissions;

// Test module, compiled only when building with `cfg(test)`.
#[cfg(test)]
#[path = "../service_tests.rs"]
mod tests;