1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::{Method, StatusCode};
7use serde_json::{json, Value};
8use sha2::{Digest, Sha256};
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_persistence::SnapshotStore;
13
14use crate::runtime::ContainerRuntime;
15use crate::state::{
16 EventSourceMapping, LambdaFunction, LambdaSnapshot, LambdaState, SharedLambdaState,
17 LAMBDA_SNAPSHOT_SCHEMA_VERSION,
18};
19
20struct CreateFunctionInput {
24 function_name: String,
25 runtime: String,
26 role: String,
27 handler: String,
28 description: String,
29 timeout: i64,
30 memory_size: i64,
31 package_type: String,
32 tags: HashMap<String, String>,
33 environment: HashMap<String, String>,
34 architectures: Vec<String>,
35 code_zip: Option<Vec<u8>>,
36 code_fallback: Vec<u8>,
37 image_uri: Option<String>,
38}
39
40impl CreateFunctionInput {
41 fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
42 let function_name = body["FunctionName"]
43 .as_str()
44 .ok_or_else(|| {
45 AwsServiceError::aws_error(
46 StatusCode::BAD_REQUEST,
47 "InvalidParameterValueException",
48 "FunctionName is required",
49 )
50 })?
51 .to_string();
52
53 let tags: HashMap<String, String> = body["Tags"]
54 .as_object()
55 .map(|m| {
56 m.iter()
57 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
58 .collect()
59 })
60 .unwrap_or_default();
61
62 let environment: HashMap<String, String> = body["Environment"]["Variables"]
63 .as_object()
64 .map(|m| {
65 m.iter()
66 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
67 .collect()
68 })
69 .unwrap_or_default();
70
71 let architectures = body["Architectures"]
72 .as_array()
73 .map(|a| {
74 a.iter()
75 .filter_map(|v| v.as_str().map(|s| s.to_string()))
76 .collect()
77 })
78 .unwrap_or_else(|| vec!["x86_64".to_string()]);
79
80 let code_zip: Option<Vec<u8>> = match body["Code"]["ZipFile"].as_str() {
81 Some(b64) => Some(
82 base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
83 |_| {
84 AwsServiceError::aws_error(
85 StatusCode::BAD_REQUEST,
86 "InvalidParameterValueException",
87 "Could not decode Code.ZipFile: invalid base64",
88 )
89 },
90 )?,
91 ),
92 None => None,
93 };
94
95 let code_fallback = serde_json::to_vec(&body["Code"]).unwrap_or_default();
96
97 let package_type = body["PackageType"].as_str().unwrap_or("Zip").to_string();
98 let image_uri = if package_type == "Image" {
103 body["Code"]["ImageUri"].as_str().map(String::from)
104 } else {
105 None
106 };
107
108 if package_type == "Image" && image_uri.is_none() {
112 return Err(AwsServiceError::aws_error(
113 StatusCode::BAD_REQUEST,
114 "InvalidParameterValueException",
115 "Code.ImageUri is required when PackageType is Image",
116 ));
117 }
118
119 Ok(Self {
120 function_name,
121 runtime: body["Runtime"].as_str().unwrap_or("python3.12").to_string(),
122 role: body["Role"].as_str().unwrap_or("").to_string(),
123 handler: body["Handler"]
124 .as_str()
125 .unwrap_or("index.handler")
126 .to_string(),
127 description: body["Description"].as_str().unwrap_or("").to_string(),
128 timeout: body["Timeout"].as_i64().unwrap_or(3),
129 memory_size: body["MemorySize"].as_i64().unwrap_or(128),
130 package_type,
131 tags,
132 environment,
133 architectures,
134 code_zip,
135 code_fallback,
136 image_uri,
137 })
138 }
139}
140
141#[derive(Debug, Clone, Copy, PartialEq, Eq)]
143pub enum InvocationType {
144 RequestResponse,
145 Event,
146 DryRun,
147}
148
149impl InvocationType {
150 pub fn from_header(value: Option<&str>) -> Self {
151 match value {
152 Some("Event") => Self::Event,
153 Some("DryRun") => Self::DryRun,
154 _ => Self::RequestResponse,
155 }
156 }
157}
158
159fn route_to_destination(
163 bus: Arc<fakecloud_core::delivery::DeliveryBus>,
164 function_arn: &str,
165 request_payload: &[u8],
166 result: &Result<Vec<u8>, String>,
167 destination_config: Option<&serde_json::Value>,
168) {
169 let Some(cfg) = destination_config else {
170 return;
171 };
172 let (key, condition, response_value): (&str, &str, serde_json::Value) = match result {
173 Ok(bytes) => (
174 "OnSuccess",
175 "Success",
176 serde_json::from_slice(bytes).unwrap_or(serde_json::Value::Null),
177 ),
178 Err(err) => (
179 "OnFailure",
180 "RetriesExhausted",
181 serde_json::json!({ "errorMessage": err }),
182 ),
183 };
184 let Some(dest) = cfg
185 .get(key)
186 .and_then(|v| v.get("Destination"))
187 .and_then(|v| v.as_str())
188 else {
189 return;
190 };
191 let request_payload_v: serde_json::Value =
192 serde_json::from_slice(request_payload).unwrap_or(serde_json::Value::Null);
193 let record = serde_json::json!({
194 "version": "1.0",
195 "timestamp": chrono::Utc::now().to_rfc3339(),
196 "requestContext": {
197 "requestId": uuid::Uuid::new_v4().to_string(),
198 "functionArn": format!("{function_arn}:$LATEST"),
199 "condition": condition,
200 "approximateInvokeCount": 1,
201 },
202 "requestPayload": request_payload_v,
203 "responseContext": {
204 "statusCode": 200,
205 "executedVersion": "$LATEST",
206 },
207 "responsePayload": response_value,
208 });
209 let body = record.to_string();
210 if dest.contains(":sqs:") {
211 bus.send_to_sqs(dest, &body, &std::collections::HashMap::new());
212 } else if dest.contains(":sns:") {
213 bus.publish_to_sns(dest, &body, None);
214 } else if dest.contains(":lambda:") {
215 let dest = dest.to_string();
216 let payload = body.clone();
217 tokio::spawn(async move {
218 let _ = bus.invoke_lambda(&dest, &payload).await;
219 });
220 } else if dest.contains(":events:") || dest.contains(":eventbridge:") {
221 let detail_type = if result.is_ok() {
222 "Lambda Function Invocation Result - Success"
223 } else {
224 "Lambda Function Invocation Result - Failure"
225 };
226 bus.put_event_to_eventbridge("lambda", detail_type, &body, "default");
227 }
228}
229
230pub struct LambdaService {
231 pub(crate) state: SharedLambdaState,
232 runtime: Option<Arc<ContainerRuntime>>,
233 snapshot_store: Option<Arc<dyn SnapshotStore>>,
234 snapshot_lock: Arc<AsyncMutex<()>>,
235 pub(crate) delivery_bus: Option<Arc<fakecloud_core::delivery::DeliveryBus>>,
236 pub(crate) role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
237}
238
239impl LambdaService {
240 pub fn new(state: SharedLambdaState) -> Self {
241 Self {
242 state,
243 runtime: None,
244 snapshot_store: None,
245 snapshot_lock: Arc::new(AsyncMutex::new(())),
246 delivery_bus: None,
247 role_trust_validator: None,
248 }
249 }
250
251 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
252 self.runtime = Some(runtime);
253 self
254 }
255
256 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
257 self.snapshot_store = Some(store);
258 self
259 }
260
261 pub fn with_delivery_bus(mut self, bus: Arc<fakecloud_core::delivery::DeliveryBus>) -> Self {
262 self.delivery_bus = Some(bus);
263 self
264 }
265
266 pub fn with_role_trust_validator(
267 mut self,
268 validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
269 ) -> Self {
270 self.role_trust_validator = Some(validator);
271 self
272 }
273
274 async fn save_snapshot(&self) {
275 let Some(store) = self.snapshot_store.clone() else {
276 return;
277 };
278 let _guard = self.snapshot_lock.lock().await;
279 let snapshot = LambdaSnapshot {
280 schema_version: LAMBDA_SNAPSHOT_SCHEMA_VERSION,
281 accounts: Some(self.state.read().clone()),
282 state: None,
283 };
284 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
285 let bytes = serde_json::to_vec(&snapshot)
286 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
287 store.save(&bytes)
288 })
289 .await;
290 match join {
291 Ok(Ok(())) => {}
292 Ok(Err(err)) => tracing::error!(%err, "failed to write lambda snapshot"),
293 Err(err) => tracing::error!(%err, "lambda snapshot task panicked"),
294 }
295 }
296
297 fn resolve_action(req: &AwsRequest) -> Option<(&'static str, Option<String>)> {
310 let segs = &req.path_segments;
311 if segs.is_empty() {
312 return None;
313 }
314 let prefix = segs[0].as_str();
318
319 if segs.get(1).map(|s| s.as_str()) == Some("account-settings") && req.method == Method::GET
321 {
322 return Some(("GetAccountSettings", None));
323 }
324 if segs.get(1).map(|s| s.as_str()) == Some("functions")
325 && segs.get(3).map(|s| s.as_str()) == Some("invoke-async")
326 && req.method == Method::POST
327 {
328 return Some(("InvokeAsync", segs.get(2).map(|s| s.to_string())));
329 }
330 if segs.get(1).map(|s| s.as_str()) == Some("functions")
331 && segs.get(3).map(|s| s.as_str()) == Some("response-streaming-invocations")
332 && req.method == Method::POST
333 {
334 return Some((
335 "InvokeWithResponseStream",
336 segs.get(2).map(|s| s.to_string()),
337 ));
338 }
339
340 if segs.get(1).map(|s| s.as_str()) == Some("functions")
342 && segs.get(3).map(|s| s.as_str()) == Some("concurrency")
343 {
344 let res = segs.get(2).map(|s| s.to_string());
345 return match req.method {
346 Method::PUT => Some(("PutFunctionConcurrency", res)),
347 Method::GET => Some(("GetFunctionConcurrency", res)),
348 Method::DELETE => Some(("DeleteFunctionConcurrency", res)),
349 _ => None,
350 };
351 }
352
353 if segs.get(1).map(|s| s.as_str()) == Some("functions")
355 && segs.get(3).map(|s| s.as_str()) == Some("provisioned-concurrency")
356 {
357 let res = segs.get(2).map(|s| s.to_string());
358 return match req.method {
359 Method::PUT => Some(("PutProvisionedConcurrencyConfig", res)),
360 Method::GET => Some(("GetProvisionedConcurrencyConfig", res)),
361 Method::DELETE => Some(("DeleteProvisionedConcurrencyConfig", res)),
362 _ => None,
363 };
364 }
365 if segs.get(1).map(|s| s.as_str()) == Some("functions")
366 && segs.get(3).map(|s| s.as_str()) == Some("provisioned-concurrency-configs")
367 && req.method == Method::GET
368 {
369 return Some((
370 "ListProvisionedConcurrencyConfigs",
371 segs.get(2).map(|s| s.to_string()),
372 ));
373 }
374
375 if segs.get(1).map(|s| s.as_str()) == Some("functions")
377 && segs.get(3).map(|s| s.as_str()) == Some("event-invoke-config")
378 {
379 let res = segs.get(2).map(|s| s.to_string());
380 return match req.method {
381 Method::POST => Some(("PutFunctionEventInvokeConfig", res)),
382 Method::PUT => Some(("UpdateFunctionEventInvokeConfig", res)),
383 Method::GET => Some(("GetFunctionEventInvokeConfig", res)),
384 Method::DELETE => Some(("DeleteFunctionEventInvokeConfig", res)),
385 _ => None,
386 };
387 }
388 if segs.get(1).map(|s| s.as_str()) == Some("functions")
389 && (segs.get(3).map(|s| s.as_str()) == Some("event-invoke-config-list")
390 || (segs.get(3).map(|s| s.as_str()) == Some("event-invoke-config")
391 && segs.get(4).map(|s| s.as_str()) == Some("list")))
392 && req.method == Method::GET
393 {
394 return Some((
395 "ListFunctionEventInvokeConfigs",
396 segs.get(2).map(|s| s.to_string()),
397 ));
398 }
399
400 if segs.get(1).map(|s| s.as_str()) == Some("functions")
402 && segs.get(3).map(|s| s.as_str()) == Some("recursion-config")
403 {
404 let res = segs.get(2).map(|s| s.to_string());
405 return match req.method {
406 Method::PUT => Some(("PutFunctionRecursionConfig", res)),
407 Method::GET => Some(("GetFunctionRecursionConfig", res)),
408 _ => None,
409 };
410 }
411
412 if segs.get(1).map(|s| s.as_str()) == Some("functions")
414 && segs.get(3).map(|s| s.as_str()) == Some("runtime-management-config")
415 {
416 let res = segs.get(2).map(|s| s.to_string());
417 return match req.method {
418 Method::PUT => Some(("PutRuntimeManagementConfig", res)),
419 Method::GET => Some(("GetRuntimeManagementConfig", res)),
420 _ => None,
421 };
422 }
423
424 if segs.get(1).map(|s| s.as_str()) == Some("functions")
426 && segs.get(3).map(|s| s.as_str()) == Some("code-signing-config")
427 {
428 let res = segs.get(2).map(|s| s.to_string());
429 return match req.method {
430 Method::PUT => Some(("PutFunctionCodeSigningConfig", res)),
431 Method::GET => Some(("GetFunctionCodeSigningConfig", res)),
432 Method::DELETE => Some(("DeleteFunctionCodeSigningConfig", res)),
433 _ => None,
434 };
435 }
436 if segs.get(1).map(|s| s.as_str()) == Some("code-signing-configs") {
437 let res = segs.get(2).map(|s| s.to_string());
438 return match (
439 req.method.clone(),
440 segs.len(),
441 segs.get(3).map(|s| s.as_str()),
442 ) {
443 (Method::POST, 2, _) => Some(("CreateCodeSigningConfig", None)),
444 (Method::GET, 2, _) => Some(("ListCodeSigningConfigs", None)),
445 (Method::GET, 3, _) => Some(("GetCodeSigningConfig", res)),
446 (Method::PUT, 3, _) => Some(("UpdateCodeSigningConfig", res)),
447 (Method::DELETE, 3, _) => Some(("DeleteCodeSigningConfig", res)),
448 (Method::GET, 4, Some("functions")) => {
449 Some(("ListFunctionsByCodeSigningConfig", res))
450 }
451 _ => None,
452 };
453 }
454
455 if segs.get(1).map(|s| s.as_str()) == Some("tags") && segs.len() >= 3 {
457 let res = segs[2..].join("/");
458 return match req.method {
459 Method::POST => Some(("TagResource", Some(res))),
460 Method::DELETE => Some(("UntagResource", Some(res))),
461 Method::GET => Some(("ListTags", Some(res))),
462 _ => None,
463 };
464 }
465
466 if segs.get(1).map(|s| s.as_str()) == Some("functions")
468 && segs.get(3).map(|s| s.as_str()) == Some("url")
469 {
470 let res = segs.get(2).map(|s| s.to_string());
471 return match req.method {
472 Method::POST => Some(("CreateFunctionUrlConfig", res)),
473 Method::GET => Some(("GetFunctionUrlConfig", res)),
474 Method::PUT => Some(("UpdateFunctionUrlConfig", res)),
475 Method::DELETE => Some(("DeleteFunctionUrlConfig", res)),
476 _ => None,
477 };
478 }
479 if segs.get(1).map(|s| s.as_str()) == Some("function-urls") && req.method == Method::GET {
480 return Some(("ListFunctionUrlConfigs", None));
481 }
482 if segs.get(1).map(|s| s.as_str()) == Some("functions")
483 && segs.get(3).map(|s| s.as_str()) == Some("urls")
484 && req.method == Method::GET
485 {
486 return Some(("ListFunctionUrlConfigs", segs.get(2).map(|s| s.to_string())));
487 }
488 if segs.get(1).map(|s| s.as_str()) == Some("event-source-mappings")
489 && segs.get(3).map(|s| s.as_str()) == Some("scaling-config")
490 {
491 let res = segs.get(2).map(|s| s.to_string());
492 return match req.method {
493 Method::PUT => Some(("PutFunctionScalingConfig", res)),
494 Method::GET => Some(("GetFunctionScalingConfig", res)),
495 _ => None,
496 };
497 }
498
499 if segs.get(1).map(|s| s.as_str()) == Some("capacity-providers") {
501 let res = segs.get(2).map(|s| s.to_string());
502 return match (
503 req.method.clone(),
504 segs.len(),
505 segs.get(3).map(|s| s.as_str()),
506 ) {
507 (Method::POST, 2, _) => Some(("CreateCapacityProvider", None)),
508 (Method::GET, 2, _) => Some(("ListCapacityProviders", None)),
509 (Method::GET, 3, _) => Some(("GetCapacityProvider", res)),
510 (Method::PUT, 3, _) => Some(("UpdateCapacityProvider", res)),
511 (Method::DELETE, 3, _) => Some(("DeleteCapacityProvider", res)),
512 (Method::GET, 4, Some("function-versions")) => {
513 Some(("ListFunctionVersionsByCapacityProvider", res))
514 }
515 _ => None,
516 };
517 }
518
519 if segs.get(1).map(|s| s.as_str()) == Some("functions")
521 && segs.get(3).map(|s| s.as_str()) == Some("durable-executions")
522 && req.method == Method::GET
523 {
524 return Some((
525 "ListDurableExecutionsByFunction",
526 segs.get(2).map(|s| s.to_string()),
527 ));
528 }
529
530 if segs.get(1).map(|s| s.as_str()) == Some("durable-execution-callbacks")
532 && req.method == Method::POST
533 {
534 let res = segs.get(2).map(|s| s.to_string());
535 return match segs.get(3).map(|s| s.as_str()) {
536 Some("success") | Some("succeed") => {
537 Some(("SendDurableExecutionCallbackSuccess", res))
538 }
539 Some("failure") | Some("fail") => {
540 Some(("SendDurableExecutionCallbackFailure", res))
541 }
542 Some("heartbeat") => Some(("SendDurableExecutionCallbackHeartbeat", res)),
543 _ => None,
544 };
545 }
546
547 if segs.get(1).map(|s| s.as_str()) == Some("durable-executions") {
549 let res = segs.get(2).map(|s| s.to_string());
550 return match (
551 req.method.clone(),
552 segs.len(),
553 segs.get(3).map(|s| s.as_str()),
554 segs.get(4).map(|s| s.as_str()),
555 ) {
556 (Method::GET, 3, _, _) => Some(("GetDurableExecution", res)),
557 (Method::GET, 4, Some("history"), _) => Some(("GetDurableExecutionHistory", res)),
558 (Method::GET, 4, Some("state"), _) => Some(("GetDurableExecutionState", res)),
559 (Method::POST, 4, Some("checkpoint"), _) => {
560 Some(("CheckpointDurableExecution", res))
561 }
562 (Method::POST, 4, Some("stop"), _) => Some(("StopDurableExecution", res)),
563 (Method::POST, 5, Some("callback"), Some("success")) => {
564 Some(("SendDurableExecutionCallbackSuccess", res))
565 }
566 (Method::POST, 5, Some("callback"), Some("failure")) => {
567 Some(("SendDurableExecutionCallbackFailure", res))
568 }
569 (Method::POST, 5, Some("callback"), Some("heartbeat")) => {
570 Some(("SendDurableExecutionCallbackHeartbeat", res))
571 }
572 _ => None,
573 };
574 }
575
576 if prefix == "2018-10-31" && segs.get(1).map(|s| s.as_str()) == Some("layers") {
583 let layer = segs.get(2).map(|s| s.to_string());
584 let third = segs.get(3).map(|s| s.as_str());
585 let version = segs.get(4).map(|s| s.to_string());
586 return match (&req.method, segs.len(), third, version.is_some()) {
587 (&Method::GET, 2, _, _) => Some(("ListLayers", None)),
588 (&Method::POST, 4, Some("versions"), false) => Some(("PublishLayerVersion", layer)),
589 (&Method::GET, 4, Some("versions"), false) => Some(("ListLayerVersions", layer)),
590 (&Method::GET, 5, Some("versions"), true) => Some(("GetLayerVersion", version)),
591 (&Method::DELETE, 5, Some("versions"), true) => {
592 Some(("DeleteLayerVersion", version))
593 }
594 (&Method::GET, 6, Some("versions"), true)
595 if segs.get(5).map(|s| s.as_str()) == Some("policy") =>
596 {
597 Some(("GetLayerVersionPolicy", version))
598 }
599 (&Method::POST, 6, Some("versions"), true)
600 if segs.get(5).map(|s| s.as_str()) == Some("policy") =>
601 {
602 Some(("AddLayerVersionPermission", version))
603 }
604 (&Method::DELETE, 7, Some("versions"), true)
605 if segs.get(5).map(|s| s.as_str()) == Some("policy") =>
606 {
607 Some(("RemoveLayerVersionPermission", version))
608 }
609 _ => None,
610 };
611 }
612
613 if prefix == "2018-10-31"
615 && segs.get(1).map(|s| s.as_str()) == Some("layers-by-arn")
616 && req.method == Method::GET
617 {
618 return Some(("GetLayerVersionByArn", None));
619 }
620
621 if prefix != "2015-03-31" {
625 return None;
626 }
627
628 let collection = segs.get(1).map(|s| s.as_str());
629 let resource = segs.get(2).map(|s| s.to_string());
630 let third = segs.get(3).map(|s| s.as_str());
631 let fourth = segs.get(4).map(|s| s.as_str());
632
633 let action = match (&req.method, segs.len(), collection, third) {
634 (&Method::POST, 2, Some("functions"), _) => "CreateFunction",
635 (&Method::GET, 2, Some("functions"), _) => "ListFunctions",
636 (&Method::GET, 3, Some("functions"), _) => "GetFunction",
637 (&Method::DELETE, 3, Some("functions"), _) => "DeleteFunction",
638 (&Method::POST, 4, Some("functions"), Some("invocations")) => "Invoke",
639 (&Method::POST, 4, Some("functions"), Some("invoke-async")) => "InvokeAsync",
640 (&Method::POST, 4, Some("functions"), Some("response-streaming-invocations")) => {
641 "InvokeWithResponseStream"
642 }
643 (&Method::POST, 4, Some("functions"), Some("versions")) => "PublishVersion",
644 (&Method::GET, 4, Some("functions"), Some("versions")) => "ListVersionsByFunction",
645 (&Method::POST, 4, Some("functions"), Some("policy")) => "AddPermission",
646 (&Method::GET, 4, Some("functions"), Some("policy")) => "GetPolicy",
647 (&Method::DELETE, 5, Some("functions"), Some("policy")) => "RemovePermission",
648 (&Method::POST, 4, Some("functions"), Some("aliases")) => "CreateAlias",
649 (&Method::GET, 4, Some("functions"), Some("aliases")) => "ListAliases",
650 (&Method::GET, 5, Some("functions"), Some("aliases")) => "GetAlias",
651 (&Method::PUT, 5, Some("functions"), Some("aliases")) => "UpdateAlias",
652 (&Method::DELETE, 5, Some("functions"), Some("aliases")) => "DeleteAlias",
653 (&Method::GET, 4, Some("functions"), Some("configuration")) => {
654 "GetFunctionConfiguration"
655 }
656 (&Method::PUT, 4, Some("functions"), Some("configuration")) => {
657 "UpdateFunctionConfiguration"
658 }
659 (&Method::PUT, 4, Some("functions"), Some("code")) => "UpdateFunctionCode",
660 (&Method::PUT, 4, Some("functions"), Some("concurrency")) => "PutFunctionConcurrency",
661 (&Method::GET, 4, Some("functions"), Some("concurrency")) => "GetFunctionConcurrency",
662 (&Method::DELETE, 4, Some("functions"), Some("concurrency")) => {
663 "DeleteFunctionConcurrency"
664 }
665 (&Method::PUT, 4, Some("functions"), Some("provisioned-concurrency")) => {
666 "PutProvisionedConcurrencyConfig"
667 }
668 (&Method::GET, 4, Some("functions"), Some("provisioned-concurrency")) => {
669 "GetProvisionedConcurrencyConfig"
670 }
671 (&Method::DELETE, 4, Some("functions"), Some("provisioned-concurrency")) => {
672 "DeleteProvisionedConcurrencyConfig"
673 }
674 (&Method::GET, 4, Some("functions"), Some("provisioned-concurrency-configs")) => {
675 "ListProvisionedConcurrencyConfigs"
676 }
677 (&Method::PUT, 4, Some("functions"), Some("event-invoke-config")) => {
678 "UpdateFunctionEventInvokeConfig"
679 }
680 (&Method::POST, 4, Some("functions"), Some("event-invoke-config")) => {
681 "PutFunctionEventInvokeConfig"
682 }
683 (&Method::GET, 4, Some("functions"), Some("event-invoke-config")) => {
684 "GetFunctionEventInvokeConfig"
685 }
686 (&Method::DELETE, 4, Some("functions"), Some("event-invoke-config")) => {
687 "DeleteFunctionEventInvokeConfig"
688 }
689 (&Method::GET, 4, Some("functions"), Some("event-invoke-config-list")) => {
690 "ListFunctionEventInvokeConfigs"
691 }
692 (&Method::PUT, 4, Some("functions"), Some("code-signing-config")) => {
693 "PutFunctionCodeSigningConfig"
694 }
695 (&Method::GET, 4, Some("functions"), Some("code-signing-config")) => {
696 "GetFunctionCodeSigningConfig"
697 }
698 (&Method::DELETE, 4, Some("functions"), Some("code-signing-config")) => {
699 "DeleteFunctionCodeSigningConfig"
700 }
701 (&Method::PUT, 4, Some("functions"), Some("runtime-management-config")) => {
702 "PutRuntimeManagementConfig"
703 }
704 (&Method::GET, 4, Some("functions"), Some("runtime-management-config")) => {
705 "GetRuntimeManagementConfig"
706 }
707 (&Method::PUT, 4, Some("functions"), Some("scaling-config")) => {
708 "PutFunctionScalingConfig"
709 }
710 (&Method::GET, 4, Some("functions"), Some("scaling-config")) => {
711 "GetFunctionScalingConfig"
712 }
713 (&Method::PUT, 4, Some("functions"), Some("recursion-config")) => {
714 "PutFunctionRecursionConfig"
715 }
716 (&Method::GET, 4, Some("functions"), Some("recursion-config")) => {
717 "GetFunctionRecursionConfig"
718 }
719 (&Method::GET, 4, Some("functions"), Some("durable-executions")) => {
720 "ListDurableExecutionsByFunction"
721 }
722 (&Method::POST, 2, Some("event-source-mappings"), _) => "CreateEventSourceMapping",
723 (&Method::GET, 2, Some("event-source-mappings"), _) => "ListEventSourceMappings",
724 (&Method::GET, 3, Some("event-source-mappings"), _) => "GetEventSourceMapping",
725 (&Method::PUT, 3, Some("event-source-mappings"), _) => "UpdateEventSourceMapping",
726 (&Method::DELETE, 3, Some("event-source-mappings"), _) => "DeleteEventSourceMapping",
727 (&Method::POST, 3, Some("tags"), _) => "TagResource",
728 (&Method::DELETE, 3, Some("tags"), _) => "UntagResource",
729 (&Method::GET, 3, Some("tags"), _) => "ListTags",
730 _ => return None,
731 };
732 let _ = fourth;
733
734 Some((action, resource))
735 }
736
737 fn create_function(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
738 let body: Value = serde_json::from_slice(&req.body).unwrap_or_default();
739 let input = CreateFunctionInput::from_body(&body)?;
740
741 if let Some(ref validator) = self.role_trust_validator {
746 if let Err(err) =
747 validator.validate(&req.account_id, &input.role, "lambda.amazonaws.com")
748 {
749 return Err(AwsServiceError::aws_error(
750 StatusCode::BAD_REQUEST,
751 "InvalidParameterValueException",
752 err.to_string(),
753 ));
754 }
755 }
756
757 let mut accounts = self.state.write();
758 let state = accounts.get_or_create(&req.account_id);
759
760 if state.functions.contains_key(&input.function_name) {
761 return Err(AwsServiceError::aws_error(
762 StatusCode::CONFLICT,
763 "ResourceConflictException",
764 format!("Function already exist: {}", input.function_name),
765 ));
766 }
767
768 let code_bytes = input.code_zip.as_deref().unwrap_or(&input.code_fallback);
771 let mut hasher = Sha256::new();
772 hasher.update(code_bytes);
773 let hash = hasher.finalize();
774 let code_sha256 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
775 let code_size = code_bytes.len() as i64;
776
777 let function_arn = format!(
778 "arn:aws:lambda:{}:{}:function:{}",
779 state.region, state.account_id, input.function_name
780 );
781 let now = Utc::now();
782
783 let func = LambdaFunction {
784 function_name: input.function_name.clone(),
785 function_arn,
786 runtime: input.runtime,
787 role: input.role,
788 handler: input.handler,
789 description: input.description,
790 timeout: input.timeout,
791 memory_size: input.memory_size,
792 code_sha256,
793 code_size,
794 version: "$LATEST".to_string(),
795 last_modified: now,
796 tags: input.tags,
797 environment: input.environment,
798 architectures: input.architectures,
799 package_type: input.package_type,
800 code_zip: input.code_zip,
801 image_uri: input.image_uri,
802 policy: None,
803 };
804
805 let response = self.function_config_json(&func);
806
807 state.functions.insert(input.function_name, func);
808
809 Ok(AwsResponse::json(StatusCode::CREATED, response.to_string()))
810 }
811
812 fn get_function(
813 &self,
814 function_name: &str,
815 account_id: &str,
816 region: &str,
817 ) -> Result<AwsResponse, AwsServiceError> {
818 let accounts = self.state.read();
819 let empty = LambdaState::new(account_id, region);
820 let state = accounts.get(account_id).unwrap_or(&empty);
821 let func = state.functions.get(function_name).ok_or_else(|| {
822 AwsServiceError::aws_error(
823 StatusCode::NOT_FOUND,
824 "ResourceNotFoundException",
825 format!(
826 "Function not found: arn:aws:lambda:{}:{}:function:{}",
827 state.region, state.account_id, function_name
828 ),
829 )
830 })?;
831
832 let config = self.function_config_json(func);
833 let code = if let Some(ref uri) = func.image_uri {
834 json!({
835 "ImageUri": uri,
836 "ResolvedImageUri": uri,
837 "RepositoryType": "ECR",
838 })
839 } else {
840 json!({
841 "Location": format!(
842 "https://awslambda-{}-tasks.s3.{}.amazonaws.com/stub",
843 func.function_arn.split(':').nth(3).unwrap_or("us-east-1"),
844 func.function_arn.split(':').nth(3).unwrap_or("us-east-1")
845 ),
846 "RepositoryType": "S3",
847 })
848 };
849 let response = json!({
850 "Code": code,
851 "Configuration": config,
852 "Tags": func.tags,
853 });
854
855 Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
856 }
857
858 fn delete_function(
859 &self,
860 function_name: &str,
861 account_id: &str,
862 ) -> Result<AwsResponse, AwsServiceError> {
863 let mut accounts = self.state.write();
864 let state = accounts.get_or_create(account_id);
865 let region = state.region.clone();
866 let account_id = state.account_id.clone();
867 if state.functions.remove(function_name).is_none() {
868 return Err(AwsServiceError::aws_error(
869 StatusCode::NOT_FOUND,
870 "ResourceNotFoundException",
871 format!(
872 "Function not found: arn:aws:lambda:{}:{}:function:{}",
873 region, account_id, function_name
874 ),
875 ));
876 }
877
878 if let Some(ref runtime) = self.runtime {
880 let rt = runtime.clone();
881 let name = function_name.to_string();
882 tokio::spawn(async move { rt.stop_container(&name).await });
883 }
884
885 Ok(AwsResponse::json(StatusCode::NO_CONTENT, ""))
886 }
887
888 fn list_functions(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
889 let accounts = self.state.read();
890 let empty = LambdaState::new(account_id, "");
891 let state = accounts.get(account_id).unwrap_or(&empty);
892 let functions: Vec<Value> = state
893 .functions
894 .values()
895 .map(|f| self.function_config_json(f))
896 .collect();
897
898 let response = json!({
899 "Functions": functions,
900 });
901
902 Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
903 }
904
905 async fn invoke(
906 &self,
907 function_name: &str,
908 payload: &[u8],
909 account_id: &str,
910 invocation_type: InvocationType,
911 ) -> Result<AwsResponse, AwsServiceError> {
912 let func = {
913 let accounts = self.state.read();
914 let empty = LambdaState::new(account_id, "");
915 let state = accounts.get(account_id).unwrap_or(&empty);
916 state.functions.get(function_name).cloned().ok_or_else(|| {
917 AwsServiceError::aws_error(
918 StatusCode::NOT_FOUND,
919 "ResourceNotFoundException",
920 format!(
921 "Function not found: arn:aws:lambda:{}:{}:function:{}",
922 state.region, state.account_id, function_name
923 ),
924 )
925 })?
926 };
927
928 if func.code_zip.is_none() {
929 return Err(AwsServiceError::aws_error(
930 StatusCode::BAD_REQUEST,
931 "InvalidParameterValueException",
932 "Function has no deployment package",
933 ));
934 }
935
936 if matches!(invocation_type, InvocationType::DryRun) {
937 let mut resp = AwsResponse::json(StatusCode::NO_CONTENT, "");
938 resp.headers.insert(
939 http::header::HeaderName::from_static("x-amz-executed-version"),
940 http::header::HeaderValue::from_static("$LATEST"),
941 );
942 return Ok(resp);
943 }
944
945 let runtime = self.runtime.as_ref().ok_or_else(|| {
946 AwsServiceError::aws_error(
947 StatusCode::INTERNAL_SERVER_ERROR,
948 "ServiceException",
949 "Docker/Podman is required for Lambda execution but is not available",
950 )
951 })?;
952
953 match invocation_type {
954 InvocationType::Event => {
955 let runtime = runtime.clone();
957 let func_clone = func.clone();
958 let payload_vec = payload.to_vec();
959 let bus = self.delivery_bus.clone();
960 let destination_config = self.lookup_destination_config(&func, account_id);
961 let function_arn = func.function_arn.clone();
962 tokio::spawn(async move {
963 let result = match runtime.invoke(&func_clone, &payload_vec).await {
964 Ok(bytes) => {
965 let parsed: Option<serde_json::Value> =
969 serde_json::from_slice(&bytes).ok();
970 let is_error = parsed
971 .as_ref()
972 .and_then(|v| v.as_object())
973 .map(|m| {
974 m.contains_key("errorMessage") || m.contains_key("errorType")
975 })
976 .unwrap_or(false);
977 if is_error {
978 let msg = parsed
979 .as_ref()
980 .and_then(|v| v.get("errorMessage"))
981 .and_then(|v| v.as_str())
982 .unwrap_or("function error")
983 .to_string();
984 Err(msg)
985 } else {
986 Ok(bytes)
987 }
988 }
989 Err(e) => Err(e.to_string()),
990 };
991 if let Some(bus) = bus {
992 route_to_destination(
993 bus,
994 &function_arn,
995 &payload_vec,
996 &result,
997 destination_config.as_ref(),
998 );
999 }
1000 });
1001 let mut resp = AwsResponse::json(StatusCode::ACCEPTED, "");
1002 resp.headers.insert(
1003 http::header::HeaderName::from_static("x-amz-executed-version"),
1004 http::header::HeaderValue::from_static("$LATEST"),
1005 );
1006 Ok(resp)
1007 }
1008 InvocationType::RequestResponse | InvocationType::DryRun => {
1009 match runtime.invoke(&func, payload).await {
1010 Ok(response_bytes) => {
1011 let mut resp = AwsResponse::json(StatusCode::OK, response_bytes);
1012 resp.headers.insert(
1013 http::header::HeaderName::from_static("x-amz-executed-version"),
1014 http::header::HeaderValue::from_static("$LATEST"),
1015 );
1016 Ok(resp)
1017 }
1018 Err(e) => {
1019 tracing::error!(function = %function_name, error = %e, "Lambda invocation failed");
1020 Err(AwsServiceError::aws_error(
1021 StatusCode::INTERNAL_SERVER_ERROR,
1022 "ServiceException",
1023 format!("Lambda execution failed: {e}"),
1024 ))
1025 }
1026 }
1027 }
1028 }
1029 }
1030
1031 fn lookup_destination_config(
1036 &self,
1037 func: &crate::state::LambdaFunction,
1038 account_id: &str,
1039 ) -> Option<serde_json::Value> {
1040 let accounts = self.state.read();
1041 let state = accounts.get(account_id)?;
1042 let key = format!("{}:$LATEST", func.function_name);
1043 state
1044 .event_invoke_configs
1045 .get(&key)
1046 .map(|cfg| cfg.destination_config.clone())
1047 .filter(|v| !v.is_null() && !v.as_object().map(|o| o.is_empty()).unwrap_or(false))
1048 }
1049
1050 fn publish_version(
1051 &self,
1052 function_name: &str,
1053 account_id: &str,
1054 ) -> Result<AwsResponse, AwsServiceError> {
1055 let accounts = self.state.read();
1056 let empty = LambdaState::new(account_id, "");
1057 let state = accounts.get(account_id).unwrap_or(&empty);
1058 let func = state.functions.get(function_name).ok_or_else(|| {
1059 AwsServiceError::aws_error(
1060 StatusCode::NOT_FOUND,
1061 "ResourceNotFoundException",
1062 format!(
1063 "Function not found: arn:aws:lambda:{}:{}:function:{}",
1064 state.region, state.account_id, function_name
1065 ),
1066 )
1067 })?;
1068
1069 let mut config = self.function_config_json(func);
1070 config["Version"] = json!("1");
1072 config["FunctionArn"] = json!(format!("{}:1", func.function_arn));
1073
1074 Ok(AwsResponse::json(StatusCode::CREATED, config.to_string()))
1075 }
1076
1077 fn create_event_source_mapping(
1078 &self,
1079 req: &AwsRequest,
1080 ) -> Result<AwsResponse, AwsServiceError> {
1081 let body: Value = serde_json::from_slice(&req.body).unwrap_or_default();
1082 let event_source_arn = body["EventSourceArn"]
1083 .as_str()
1084 .ok_or_else(|| {
1085 AwsServiceError::aws_error(
1086 StatusCode::BAD_REQUEST,
1087 "InvalidParameterValueException",
1088 "EventSourceArn is required",
1089 )
1090 })?
1091 .to_string();
1092
1093 let function_name = body["FunctionName"]
1094 .as_str()
1095 .ok_or_else(|| {
1096 AwsServiceError::aws_error(
1097 StatusCode::BAD_REQUEST,
1098 "InvalidParameterValueException",
1099 "FunctionName is required",
1100 )
1101 })?
1102 .to_string();
1103
1104 let mut accounts = self.state.write();
1105 let state = accounts.get_or_create(&req.account_id);
1106
1107 let function_arn = if function_name.starts_with("arn:") {
1109 function_name.clone()
1110 } else {
1111 let func = state.functions.get(&function_name).ok_or_else(|| {
1112 AwsServiceError::aws_error(
1113 StatusCode::NOT_FOUND,
1114 "ResourceNotFoundException",
1115 format!(
1116 "Function not found: arn:aws:lambda:{}:{}:function:{}",
1117 state.region, state.account_id, function_name
1118 ),
1119 )
1120 })?;
1121 func.function_arn.clone()
1122 };
1123
1124 let batch_size = body["BatchSize"].as_i64().unwrap_or(10);
1125 let enabled = body["Enabled"].as_bool().unwrap_or(true);
1126 let mapping_uuid = uuid::Uuid::new_v4().to_string();
1127 let now = Utc::now();
1128
1129 let filter_patterns: Vec<String> = match body.get("FilterCriteria") {
1137 None | Some(Value::Null) => Vec::new(),
1138 Some(Value::Object(_)) => {
1139 match body.get("FilterCriteria").and_then(|v| v.get("Filters")) {
1140 None => Vec::new(),
1141 Some(Value::Array(arr)) => {
1142 let mut out = Vec::with_capacity(arr.len());
1143 for f in arr {
1144 match f.get("Pattern") {
1145 Some(Value::String(s)) => out.push(s.clone()),
1146 _ => {
1147 return Err(AwsServiceError::aws_error(
1148 StatusCode::BAD_REQUEST,
1149 "InvalidParameterValueException",
1150 "FilterCriteria.Filters[].Pattern must be a string",
1151 ));
1152 }
1153 }
1154 }
1155 out
1156 }
1157 Some(_) => {
1158 return Err(AwsServiceError::aws_error(
1159 StatusCode::BAD_REQUEST,
1160 "InvalidParameterValueException",
1161 "FilterCriteria.Filters must be an array",
1162 ));
1163 }
1164 }
1165 }
1166 Some(_) => {
1167 return Err(AwsServiceError::aws_error(
1168 StatusCode::BAD_REQUEST,
1169 "InvalidParameterValueException",
1170 "FilterCriteria must be an object",
1171 ));
1172 }
1173 };
1174 if let Err(err) = crate::filter::FilterSet::validate(filter_patterns.iter()) {
1176 return Err(AwsServiceError::aws_error(
1177 StatusCode::BAD_REQUEST,
1178 "InvalidParameterValueException",
1179 err,
1180 ));
1181 }
1182 let function_response_types: Vec<String> = body
1183 .get("FunctionResponseTypes")
1184 .and_then(|v| v.as_array())
1185 .map(|arr| {
1186 arr.iter()
1187 .filter_map(|v| v.as_str().map(String::from))
1188 .collect()
1189 })
1190 .unwrap_or_default();
1191 let starting_position = body
1192 .get("StartingPosition")
1193 .and_then(|v| v.as_str())
1194 .map(String::from);
1195 let starting_position_timestamp = body
1196 .get("StartingPositionTimestamp")
1197 .and_then(|v| v.as_f64());
1198 let parallelization_factor = body.get("ParallelizationFactor").and_then(|v| v.as_i64());
1199 let maximum_batching_window_in_seconds = body
1200 .get("MaximumBatchingWindowInSeconds")
1201 .and_then(|v| v.as_i64());
1202
1203 let mapping = EventSourceMapping {
1204 uuid: mapping_uuid.clone(),
1205 function_arn: function_arn.clone(),
1206 event_source_arn: event_source_arn.clone(),
1207 batch_size,
1208 enabled,
1209 state: if enabled {
1210 "Enabled".to_string()
1211 } else {
1212 "Disabled".to_string()
1213 },
1214 last_modified: now,
1215 filter_patterns,
1216 maximum_batching_window_in_seconds,
1217 starting_position,
1218 starting_position_timestamp,
1219 parallelization_factor,
1220 function_response_types,
1221 };
1222
1223 let response = self.event_source_mapping_json(&mapping);
1224 state.event_source_mappings.insert(mapping_uuid, mapping);
1225
1226 Ok(AwsResponse::json(
1227 StatusCode::ACCEPTED,
1228 response.to_string(),
1229 ))
1230 }
1231
1232 fn list_event_source_mappings(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1233 let accounts = self.state.read();
1234 let empty = LambdaState::new(account_id, "");
1235 let state = accounts.get(account_id).unwrap_or(&empty);
1236 let mappings: Vec<Value> = state
1237 .event_source_mappings
1238 .values()
1239 .map(|m| self.event_source_mapping_json(m))
1240 .collect();
1241
1242 let response = json!({
1243 "EventSourceMappings": mappings,
1244 });
1245
1246 Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
1247 }
1248
1249 fn get_event_source_mapping(
1250 &self,
1251 uuid: &str,
1252 account_id: &str,
1253 ) -> Result<AwsResponse, AwsServiceError> {
1254 let accounts = self.state.read();
1255 let empty = LambdaState::new(account_id, "");
1256 let state = accounts.get(account_id).unwrap_or(&empty);
1257 let mapping = state.event_source_mappings.get(uuid).ok_or_else(|| {
1258 AwsServiceError::aws_error(
1259 StatusCode::NOT_FOUND,
1260 "ResourceNotFoundException",
1261 format!("The resource you requested does not exist. (Service: Lambda, Status Code: 404, Request ID: {uuid})"),
1262 )
1263 })?;
1264
1265 let response = self.event_source_mapping_json(mapping);
1266 Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
1267 }
1268
1269 fn delete_event_source_mapping(
1270 &self,
1271 uuid: &str,
1272 account_id: &str,
1273 ) -> Result<AwsResponse, AwsServiceError> {
1274 let mut accounts = self.state.write();
1275 let state = accounts.get_or_create(account_id);
1276 let mapping = state.event_source_mappings.remove(uuid).ok_or_else(|| {
1277 AwsServiceError::aws_error(
1278 StatusCode::NOT_FOUND,
1279 "ResourceNotFoundException",
1280 format!("The resource you requested does not exist. (Service: Lambda, Status Code: 404, Request ID: {uuid})"),
1281 )
1282 })?;
1283
1284 let mut response = self.event_source_mapping_json(&mapping);
1285 response["State"] = json!("Deleting");
1286 Ok(AwsResponse::json(
1287 StatusCode::ACCEPTED,
1288 response.to_string(),
1289 ))
1290 }
1291
1292 pub(crate) fn function_config_json(&self, func: &LambdaFunction) -> Value {
1293 let mut env_vars = json!({});
1294 if !func.environment.is_empty() {
1295 env_vars = json!({ "Variables": func.environment });
1296 }
1297
1298 let mut config = json!({
1299 "FunctionName": func.function_name,
1300 "FunctionArn": func.function_arn,
1301 "Runtime": func.runtime,
1302 "Role": func.role,
1303 "Handler": func.handler,
1304 "Description": func.description,
1305 "Timeout": func.timeout,
1306 "MemorySize": func.memory_size,
1307 "CodeSha256": func.code_sha256,
1308 "CodeSize": func.code_size,
1309 "Version": func.version,
1310 "LastModified": func.last_modified.format("%Y-%m-%dT%H:%M:%S%.3f+0000").to_string(),
1311 "PackageType": func.package_type,
1312 "Architectures": func.architectures,
1313 "Environment": env_vars,
1314 "State": "Active",
1315 "LastUpdateStatus": "Successful",
1316 "TracingConfig": { "Mode": "PassThrough" },
1317 "RevisionId": uuid::Uuid::new_v4().to_string(),
1318 });
1319 if let Some(ref uri) = func.image_uri {
1320 config["Code"] = json!({
1321 "ImageUri": uri,
1322 "ResolvedImageUri": uri,
1323 });
1324 }
1325 config
1326 }
1327
1328 fn event_source_mapping_json(&self, mapping: &EventSourceMapping) -> Value {
1329 let mut out = json!({
1330 "UUID": mapping.uuid,
1331 "FunctionArn": mapping.function_arn,
1332 "EventSourceArn": mapping.event_source_arn,
1333 "BatchSize": mapping.batch_size,
1334 "State": mapping.state,
1335 "LastModified": mapping.last_modified.timestamp_millis() as f64 / 1000.0,
1336 });
1337 let obj = out.as_object_mut().expect("json! built object");
1338 if !mapping.filter_patterns.is_empty() {
1339 obj.insert(
1340 "FilterCriteria".into(),
1341 json!({
1342 "Filters": mapping.filter_patterns.iter().map(|p| json!({"Pattern": p})).collect::<Vec<_>>(),
1343 }),
1344 );
1345 }
1346 if !mapping.function_response_types.is_empty() {
1347 obj.insert(
1348 "FunctionResponseTypes".into(),
1349 json!(mapping.function_response_types),
1350 );
1351 }
1352 if let Some(sp) = &mapping.starting_position {
1353 obj.insert("StartingPosition".into(), json!(sp));
1354 }
1355 if let Some(ts) = mapping.starting_position_timestamp {
1356 obj.insert("StartingPositionTimestamp".into(), json!(ts));
1357 }
1358 if let Some(pf) = mapping.parallelization_factor {
1359 obj.insert("ParallelizationFactor".into(), json!(pf));
1360 }
1361 if let Some(w) = mapping.maximum_batching_window_in_seconds {
1362 obj.insert("MaximumBatchingWindowInSeconds".into(), json!(w));
1363 }
1364 out
1365 }
1366
1367 fn add_permission(
1380 &self,
1381 function_name: &str,
1382 req: &AwsRequest,
1383 ) -> Result<AwsResponse, AwsServiceError> {
1384 let body: Value = serde_json::from_slice(&req.body).unwrap_or_default();
1385 let statement_id = body
1386 .get("StatementId")
1387 .and_then(|v| v.as_str())
1388 .ok_or_else(|| {
1389 AwsServiceError::aws_error(
1390 StatusCode::BAD_REQUEST,
1391 "InvalidParameterValueException",
1392 "StatementId is required",
1393 )
1394 })?
1395 .to_string();
1396 let action = body
1397 .get("Action")
1398 .and_then(|v| v.as_str())
1399 .ok_or_else(|| {
1400 AwsServiceError::aws_error(
1401 StatusCode::BAD_REQUEST,
1402 "InvalidParameterValueException",
1403 "Action is required",
1404 )
1405 })?
1406 .to_string();
1407 let principal_raw = body
1408 .get("Principal")
1409 .and_then(|v| v.as_str())
1410 .ok_or_else(|| {
1411 AwsServiceError::aws_error(
1412 StatusCode::BAD_REQUEST,
1413 "InvalidParameterValueException",
1414 "Principal is required",
1415 )
1416 })?
1417 .to_string();
1418 let source_arn = body
1419 .get("SourceArn")
1420 .and_then(|v| v.as_str())
1421 .map(str::to_string);
1422 let source_account = body
1423 .get("SourceAccount")
1424 .and_then(|v| v.as_str())
1425 .map(str::to_string);
1426
1427 let mut accounts = self.state.write();
1428 let state = accounts.get_or_create(&req.account_id);
1429 let func = state.functions.get_mut(function_name).ok_or_else(|| {
1430 AwsServiceError::aws_error(
1431 StatusCode::NOT_FOUND,
1432 "ResourceNotFoundException",
1433 format!("Function not found: {function_name}"),
1434 )
1435 })?;
1436
1437 let mut doc: Value = func
1445 .policy
1446 .as_deref()
1447 .and_then(|s| serde_json::from_str::<Value>(s).ok())
1448 .filter(|v| v.is_object())
1449 .unwrap_or_else(|| json!({"Version": "2012-10-17", "Statement": []}));
1450
1451 if !doc.get("Statement").map(|s| s.is_array()).unwrap_or(false) {
1453 doc["Statement"] = json!([]);
1454 }
1455 let statements = doc["Statement"].as_array_mut().unwrap();
1456
1457 if statements
1460 .iter()
1461 .any(|s| s.get("Sid").and_then(|v| v.as_str()) == Some(statement_id.as_str()))
1462 {
1463 return Err(AwsServiceError::aws_error(
1464 StatusCode::CONFLICT,
1465 "ResourceConflictException",
1466 format!("The statement id ({statement_id}) provided already exists"),
1467 ));
1468 }
1469
1470 let principal_value =
1477 if principal_raw.ends_with(".amazonaws.com") || principal_raw.contains(".amazon") {
1478 json!({ "Service": principal_raw })
1479 } else {
1480 json!({ "AWS": principal_raw })
1481 };
1482
1483 let mut condition = serde_json::Map::new();
1487 if let Some(arn) = source_arn.as_ref() {
1488 condition.insert("ArnLike".to_string(), json!({ "aws:SourceArn": arn }));
1489 }
1490 if let Some(acct) = source_account.as_ref() {
1491 condition.insert(
1492 "StringEquals".to_string(),
1493 json!({ "aws:SourceAccount": acct }),
1494 );
1495 }
1496
1497 let mut new_statement = serde_json::Map::new();
1498 new_statement.insert("Sid".to_string(), json!(statement_id));
1499 new_statement.insert("Effect".to_string(), json!("Allow"));
1500 new_statement.insert("Principal".to_string(), principal_value);
1501 new_statement.insert("Action".to_string(), json!(format!("lambda:{action}")));
1502 new_statement.insert("Resource".to_string(), json!(func.function_arn));
1503 if !condition.is_empty() {
1504 new_statement.insert("Condition".to_string(), Value::Object(condition));
1505 }
1506 let statement_json = Value::Object(new_statement);
1507 statements.push(statement_json.clone());
1508
1509 func.policy = Some(serde_json::to_string(&doc).unwrap());
1510
1511 Ok(AwsResponse::json(
1512 StatusCode::CREATED,
1513 json!({ "Statement": serde_json::to_string(&statement_json).unwrap() }).to_string(),
1514 ))
1515 }
1516
1517 fn remove_permission(
1518 &self,
1519 function_name: &str,
1520 statement_id: &str,
1521 account_id: &str,
1522 ) -> Result<AwsResponse, AwsServiceError> {
1523 let mut accounts = self.state.write();
1524 let state = accounts.get_or_create(account_id);
1525 let func = state.functions.get_mut(function_name).ok_or_else(|| {
1526 AwsServiceError::aws_error(
1527 StatusCode::NOT_FOUND,
1528 "ResourceNotFoundException",
1529 format!("Function not found: {function_name}"),
1530 )
1531 })?;
1532 let policy_str = func.policy.as_deref().ok_or_else(|| {
1533 AwsServiceError::aws_error(
1534 StatusCode::NOT_FOUND,
1535 "ResourceNotFoundException",
1536 format!("No policy is associated with function {function_name}"),
1537 )
1538 })?;
1539 let mut doc: Value = serde_json::from_str(policy_str).map_err(|_| {
1540 AwsServiceError::aws_error(
1541 StatusCode::INTERNAL_SERVER_ERROR,
1542 "InternalError",
1543 "stored resource policy is not valid JSON",
1544 )
1545 })?;
1546 let statements = doc
1547 .get_mut("Statement")
1548 .and_then(|s| s.as_array_mut())
1549 .ok_or_else(|| {
1550 AwsServiceError::aws_error(
1551 StatusCode::INTERNAL_SERVER_ERROR,
1552 "InternalError",
1553 "stored resource policy has no Statement array",
1554 )
1555 })?;
1556 let before = statements.len();
1557 statements.retain(|s| s.get("Sid").and_then(|v| v.as_str()) != Some(statement_id));
1558 if statements.len() == before {
1559 return Err(AwsServiceError::aws_error(
1560 StatusCode::NOT_FOUND,
1561 "ResourceNotFoundException",
1562 format!("Statement {statement_id} is not found in resource policy"),
1563 ));
1564 }
1565 func.policy = Some(serde_json::to_string(&doc).unwrap());
1569 Ok(AwsResponse::json(StatusCode::NO_CONTENT, String::new()))
1570 }
1571
1572 fn get_policy(
1573 &self,
1574 function_name: &str,
1575 account_id: &str,
1576 ) -> Result<AwsResponse, AwsServiceError> {
1577 let accounts = self.state.read();
1578 let empty = LambdaState::new(account_id, "");
1579 let state = accounts.get(account_id).unwrap_or(&empty);
1580 let func = state.functions.get(function_name).ok_or_else(|| {
1581 AwsServiceError::aws_error(
1582 StatusCode::NOT_FOUND,
1583 "ResourceNotFoundException",
1584 format!("Function not found: {function_name}"),
1585 )
1586 })?;
1587 let policy = func.policy.as_deref().ok_or_else(|| {
1588 AwsServiceError::aws_error(
1589 StatusCode::NOT_FOUND,
1590 "ResourceNotFoundException",
1591 format!("No policy is associated with function {function_name}"),
1592 )
1593 })?;
1594 Ok(AwsResponse::json(
1595 StatusCode::OK,
1596 json!({
1597 "Policy": policy,
1598 "RevisionId": uuid::Uuid::new_v4().to_string(),
1599 })
1600 .to_string(),
1601 ))
1602 }
1603}
1604
1605#[async_trait]
1606impl AwsService for LambdaService {
1607 fn service_name(&self) -> &str {
1608 "lambda"
1609 }
1610
1611 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1612 let (action, resource_name) = Self::resolve_action(&req).ok_or_else(|| {
1613 AwsServiceError::aws_error(
1614 StatusCode::NOT_FOUND,
1615 "UnknownOperationException",
1616 format!("Unknown operation: {} {}", req.method, req.raw_path),
1617 )
1618 })?;
1619
1620 let mutates = matches!(
1621 action,
1622 "CreateFunction"
1623 | "DeleteFunction"
1624 | "PublishVersion"
1625 | "AddPermission"
1626 | "RemovePermission"
1627 | "CreateEventSourceMapping"
1628 | "DeleteEventSourceMapping"
1629 | "UpdateEventSourceMapping"
1630 | "UpdateFunctionCode"
1631 | "UpdateFunctionConfiguration"
1632 | "CreateAlias"
1633 | "DeleteAlias"
1634 | "UpdateAlias"
1635 | "PublishLayerVersion"
1636 | "DeleteLayerVersion"
1637 | "AddLayerVersionPermission"
1638 | "RemoveLayerVersionPermission"
1639 | "CreateFunctionUrlConfig"
1640 | "DeleteFunctionUrlConfig"
1641 | "UpdateFunctionUrlConfig"
1642 | "PutFunctionConcurrency"
1643 | "DeleteFunctionConcurrency"
1644 | "PutProvisionedConcurrencyConfig"
1645 | "DeleteProvisionedConcurrencyConfig"
1646 | "CreateCodeSigningConfig"
1647 | "UpdateCodeSigningConfig"
1648 | "DeleteCodeSigningConfig"
1649 | "PutFunctionCodeSigningConfig"
1650 | "DeleteFunctionCodeSigningConfig"
1651 | "PutFunctionEventInvokeConfig"
1652 | "UpdateFunctionEventInvokeConfig"
1653 | "DeleteFunctionEventInvokeConfig"
1654 | "PutRuntimeManagementConfig"
1655 | "PutFunctionScalingConfig"
1656 | "PutFunctionRecursionConfig"
1657 | "TagResource"
1658 | "UntagResource"
1659 | "CreateCapacityProvider"
1660 | "UpdateCapacityProvider"
1661 | "DeleteCapacityProvider"
1662 | "CheckpointDurableExecution"
1663 | "StopDurableExecution"
1664 | "SendDurableExecutionCallbackSuccess"
1665 | "SendDurableExecutionCallbackFailure"
1666 | "SendDurableExecutionCallbackHeartbeat"
1667 | "InvokeAsync"
1668 | "InvokeWithResponseStream"
1669 );
1670
1671 let aid = &req.account_id;
1672 let result = match action {
1673 "CreateFunction" => self.create_function(&req),
1674 "ListFunctions" => self.list_functions(aid),
1675 "GetFunction" => self.get_function(
1676 resource_name.as_deref().unwrap_or(""),
1677 aid,
1678 req.region.as_str(),
1679 ),
1680 "DeleteFunction" => self.delete_function(resource_name.as_deref().unwrap_or(""), aid),
1681 "Invoke" => {
1682 let invocation_type = InvocationType::from_header(
1683 req.headers
1684 .get("x-amz-invocation-type")
1685 .and_then(|v| v.to_str().ok()),
1686 );
1687 self.invoke(
1688 resource_name.as_deref().unwrap_or(""),
1689 &req.body,
1690 aid,
1691 invocation_type,
1692 )
1693 .await
1694 }
1695 "InvokeAsync" => {
1696 self.invoke(
1697 resource_name.as_deref().unwrap_or(""),
1698 &req.body,
1699 aid,
1700 InvocationType::Event,
1701 )
1702 .await
1703 }
1704 "PublishVersion" => self.publish_version(resource_name.as_deref().unwrap_or(""), aid),
1705 "AddPermission" => self.add_permission(resource_name.as_deref().unwrap_or(""), &req),
1706 "GetPolicy" => self.get_policy(resource_name.as_deref().unwrap_or(""), aid),
1707 "RemovePermission" => {
1708 let sid = req.path_segments.get(4).cloned().unwrap_or_default();
1710 self.remove_permission(resource_name.as_deref().unwrap_or(""), &sid, aid)
1711 }
1712 "CreateEventSourceMapping" => self.create_event_source_mapping(&req),
1713 "ListEventSourceMappings" => self.list_event_source_mappings(aid),
1714 "GetEventSourceMapping" => {
1715 self.get_event_source_mapping(resource_name.as_deref().unwrap_or(""), aid)
1716 }
1717 "DeleteEventSourceMapping" => {
1718 self.delete_event_source_mapping(resource_name.as_deref().unwrap_or(""), aid)
1719 }
1720 other => {
1721 self.handle_extra(other, resource_name.as_deref(), &req)
1722 .await
1723 }
1724 };
1725 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
1726 self.save_snapshot().await;
1727 }
1728 result
1729 }
1730
1731 fn supported_actions(&self) -> &[&str] {
1732 &[
1733 "CreateFunction",
1734 "GetFunction",
1735 "DeleteFunction",
1736 "ListFunctions",
1737 "Invoke",
1738 "InvokeAsync",
1739 "InvokeWithResponseStream",
1740 "PublishVersion",
1741 "ListVersionsByFunction",
1742 "AddPermission",
1743 "RemovePermission",
1744 "GetPolicy",
1745 "CreateEventSourceMapping",
1746 "ListEventSourceMappings",
1747 "GetEventSourceMapping",
1748 "UpdateEventSourceMapping",
1749 "DeleteEventSourceMapping",
1750 "GetFunctionConfiguration",
1751 "UpdateFunctionConfiguration",
1752 "UpdateFunctionCode",
1753 "GetAccountSettings",
1754 "CreateAlias",
1755 "GetAlias",
1756 "ListAliases",
1757 "UpdateAlias",
1758 "DeleteAlias",
1759 "PublishLayerVersion",
1760 "GetLayerVersion",
1761 "GetLayerVersionByArn",
1762 "DeleteLayerVersion",
1763 "ListLayerVersions",
1764 "ListLayers",
1765 "GetLayerVersionPolicy",
1766 "AddLayerVersionPermission",
1767 "RemoveLayerVersionPermission",
1768 "CreateFunctionUrlConfig",
1769 "GetFunctionUrlConfig",
1770 "UpdateFunctionUrlConfig",
1771 "DeleteFunctionUrlConfig",
1772 "ListFunctionUrlConfigs",
1773 "PutFunctionConcurrency",
1774 "GetFunctionConcurrency",
1775 "DeleteFunctionConcurrency",
1776 "PutProvisionedConcurrencyConfig",
1777 "GetProvisionedConcurrencyConfig",
1778 "DeleteProvisionedConcurrencyConfig",
1779 "ListProvisionedConcurrencyConfigs",
1780 "CreateCodeSigningConfig",
1781 "GetCodeSigningConfig",
1782 "UpdateCodeSigningConfig",
1783 "DeleteCodeSigningConfig",
1784 "ListCodeSigningConfigs",
1785 "PutFunctionCodeSigningConfig",
1786 "GetFunctionCodeSigningConfig",
1787 "DeleteFunctionCodeSigningConfig",
1788 "ListFunctionsByCodeSigningConfig",
1789 "PutFunctionEventInvokeConfig",
1790 "GetFunctionEventInvokeConfig",
1791 "UpdateFunctionEventInvokeConfig",
1792 "DeleteFunctionEventInvokeConfig",
1793 "ListFunctionEventInvokeConfigs",
1794 "PutRuntimeManagementConfig",
1795 "GetRuntimeManagementConfig",
1796 "PutFunctionScalingConfig",
1797 "GetFunctionScalingConfig",
1798 "PutFunctionRecursionConfig",
1799 "GetFunctionRecursionConfig",
1800 "TagResource",
1801 "UntagResource",
1802 "ListTags",
1803 "CreateCapacityProvider",
1804 "GetCapacityProvider",
1805 "UpdateCapacityProvider",
1806 "DeleteCapacityProvider",
1807 "ListCapacityProviders",
1808 "ListFunctionVersionsByCapacityProvider",
1809 "CheckpointDurableExecution",
1810 "GetDurableExecution",
1811 "GetDurableExecutionHistory",
1812 "GetDurableExecutionState",
1813 "ListDurableExecutionsByFunction",
1814 "StopDurableExecution",
1815 "SendDurableExecutionCallbackSuccess",
1816 "SendDurableExecutionCallbackFailure",
1817 "SendDurableExecutionCallbackHeartbeat",
1818 ]
1819 }
1820
1821 fn iam_enforceable(&self) -> bool {
1822 true
1823 }
1824
1825 fn iam_action_for(&self, request: &AwsRequest) -> Option<fakecloud_core::auth::IamAction> {
1829 let (action_str, resource_name) = Self::resolve_action(request)?;
1834 let action: &'static str = match action_str {
1835 "CreateFunction" => "CreateFunction",
1836 "ListFunctions" => "ListFunctions",
1837 "GetFunction" => "GetFunction",
1838 "DeleteFunction" => "DeleteFunction",
1839 "Invoke" => "InvokeFunction",
1840 "PublishVersion" => "PublishVersion",
1841 "AddPermission" => "AddPermission",
1842 "RemovePermission" => "RemovePermission",
1843 "GetPolicy" => "GetPolicy",
1844 "CreateEventSourceMapping" => "CreateEventSourceMapping",
1845 "ListEventSourceMappings" => "ListEventSourceMappings",
1846 "GetEventSourceMapping" => "GetEventSourceMapping",
1847 "DeleteEventSourceMapping" => "DeleteEventSourceMapping",
1848 _ => return None,
1849 };
1850 let accounts = self.state.read();
1851 let empty = LambdaState::new(&request.account_id, &request.region);
1852 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1853 let resource = match action {
1854 "GetFunction" | "DeleteFunction" | "InvokeFunction" | "PublishVersion"
1855 | "AddPermission" | "RemovePermission" | "GetPolicy" => {
1856 let name = resource_name.unwrap_or_default();
1857 if name.is_empty() {
1858 "*".to_string()
1859 } else {
1860 format!(
1861 "arn:aws:lambda:{}:{}:function:{}",
1862 state.region, state.account_id, name
1863 )
1864 }
1865 }
1866 "CreateFunction" => {
1867 serde_json::from_slice::<Value>(&request.body)
1872 .ok()
1873 .and_then(|v| {
1874 v.get("FunctionName").and_then(|f| f.as_str()).map(|n| {
1875 format!(
1876 "arn:aws:lambda:{}:{}:function:{}",
1877 state.region, state.account_id, n
1878 )
1879 })
1880 })
1881 .unwrap_or_else(|| "*".to_string())
1882 }
1883 _ => "*".to_string(),
1884 };
1885 Some(fakecloud_core::auth::IamAction {
1886 service: "lambda",
1887 action,
1888 resource,
1889 })
1890 }
1891
1892 fn iam_condition_keys_for(
1893 &self,
1894 request: &AwsRequest,
1895 action: &fakecloud_core::auth::IamAction,
1896 ) -> std::collections::BTreeMap<String, Vec<String>> {
1897 let mut out = std::collections::BTreeMap::new();
1898 if action.action == "AddPermission" {
1899 if action.resource != "*" {
1900 out.insert(
1901 "lambda:functionarn".to_string(),
1902 vec![action.resource.clone()],
1903 );
1904 }
1905 if let Ok(body) = serde_json::from_slice::<Value>(&request.body) {
1906 if let Some(principal) = body.get("Principal").and_then(|p| p.as_str()) {
1907 out.insert("lambda:principal".to_string(), vec![principal.to_string()]);
1908 }
1909 }
1910 }
1911 out
1912 }
1913}
1914
1915#[cfg(test)]
1916mod tests {
1917 use super::*;
1918 use bytes::Bytes;
1919 use http::{HeaderMap, Method};
1920 use parking_lot::RwLock;
1921 use std::collections::HashMap;
1922 use std::sync::Arc;
1923
1924 fn make_state() -> SharedLambdaState {
1925 Arc::new(RwLock::new(
1926 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
1927 ))
1928 }
1929
1930 fn make_request(method: Method, path: &str, body: &str) -> AwsRequest {
1931 let path_segments: Vec<String> = path
1932 .split('/')
1933 .filter(|s| !s.is_empty())
1934 .map(|s| s.to_string())
1935 .collect();
1936 AwsRequest {
1937 service: "lambda".to_string(),
1938 action: String::new(),
1939 region: "us-east-1".to_string(),
1940 account_id: "123456789012".to_string(),
1941 request_id: "test-request-id".to_string(),
1942 headers: HeaderMap::new(),
1943 query_params: HashMap::new(),
1944 body: Bytes::from(body.to_string()),
1945 path_segments,
1946 raw_path: path.to_string(),
1947 raw_query: String::new(),
1948 method,
1949 is_query_protocol: false,
1950 access_key_id: None,
1951 principal: None,
1952 }
1953 }
1954
1955 #[test]
1956 fn iam_condition_keys_for_add_permission_populates_arn_and_principal() {
1957 let svc = LambdaService::new(make_state());
1958 let body = json!({
1959 "StatementId": "stmt",
1960 "Action": "lambda:InvokeFunction",
1961 "Principal": "s3.amazonaws.com",
1962 })
1963 .to_string();
1964 let req = make_request(Method::POST, "/2015-03-31/functions/my-func/policy", &body);
1965 let action = fakecloud_core::auth::IamAction {
1966 service: "lambda",
1967 action: "AddPermission",
1968 resource: "arn:aws:lambda:us-east-1:123456789012:function:my-func".to_string(),
1969 };
1970 let keys = svc.iam_condition_keys_for(&req, &action);
1971 assert_eq!(
1972 keys.get("lambda:functionarn"),
1973 Some(&vec![
1974 "arn:aws:lambda:us-east-1:123456789012:function:my-func".to_string()
1975 ])
1976 );
1977 assert_eq!(
1978 keys.get("lambda:principal"),
1979 Some(&vec!["s3.amazonaws.com".to_string()])
1980 );
1981 }
1982
1983 #[test]
1984 fn iam_condition_keys_for_add_permission_omits_missing_principal() {
1985 let svc = LambdaService::new(make_state());
1986 let body = json!({"StatementId": "stmt", "Action": "lambda:InvokeFunction"}).to_string();
1987 let req = make_request(Method::POST, "/2015-03-31/functions/my-func/policy", &body);
1988 let action = fakecloud_core::auth::IamAction {
1989 service: "lambda",
1990 action: "AddPermission",
1991 resource: "arn:aws:lambda:us-east-1:123456789012:function:my-func".to_string(),
1992 };
1993 let keys = svc.iam_condition_keys_for(&req, &action);
1994 assert!(!keys.contains_key("lambda:principal"));
1995 assert!(keys.contains_key("lambda:functionarn"));
1996 }
1997
1998 #[test]
1999 fn iam_condition_keys_for_non_add_permission_is_empty() {
2000 let svc = LambdaService::new(make_state());
2001 let req = make_request(Method::GET, "/2015-03-31/functions/my-func", "");
2002 let action = fakecloud_core::auth::IamAction {
2003 service: "lambda",
2004 action: "GetFunction",
2005 resource: "arn:aws:lambda:us-east-1:123456789012:function:my-func".to_string(),
2006 };
2007 assert!(svc.iam_condition_keys_for(&req, &action).is_empty());
2008 }
2009
2010 #[tokio::test]
2011 async fn test_create_and_get_function() {
2012 let state = make_state();
2013 let svc = LambdaService::new(state);
2014
2015 let create_body = json!({
2016 "FunctionName": "my-func",
2017 "Runtime": "python3.12",
2018 "Role": "arn:aws:iam::123456789012:role/test-role",
2019 "Handler": "index.handler",
2020 "Code": { "ZipFile": "UEsFBgAAAAAAAAAAAAAAAAAAAAA=" }
2021 });
2022
2023 let req = make_request(
2024 Method::POST,
2025 "/2015-03-31/functions",
2026 &create_body.to_string(),
2027 );
2028 let resp = svc.handle(req).await.unwrap();
2029 assert_eq!(resp.status, StatusCode::CREATED);
2030
2031 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2032 assert_eq!(body["FunctionName"], "my-func");
2033 assert_eq!(body["Runtime"], "python3.12");
2034
2035 let req = make_request(Method::GET, "/2015-03-31/functions/my-func", "");
2037 let resp = svc.handle(req).await.unwrap();
2038 assert_eq!(resp.status, StatusCode::OK);
2039 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2040 assert_eq!(body["Configuration"]["FunctionName"], "my-func");
2041 }
2042
2043 #[tokio::test]
2044 async fn test_delete_function() {
2045 let state = make_state();
2046 let svc = LambdaService::new(state);
2047
2048 let create_body = json!({
2049 "FunctionName": "to-delete",
2050 "Runtime": "nodejs20.x",
2051 "Role": "arn:aws:iam::123456789012:role/test",
2052 "Handler": "index.handler",
2053 "Code": {}
2054 });
2055
2056 let req = make_request(
2057 Method::POST,
2058 "/2015-03-31/functions",
2059 &create_body.to_string(),
2060 );
2061 svc.handle(req).await.unwrap();
2062
2063 let req = make_request(Method::DELETE, "/2015-03-31/functions/to-delete", "");
2064 let resp = svc.handle(req).await.unwrap();
2065 assert_eq!(resp.status, StatusCode::NO_CONTENT);
2066
2067 let req = make_request(Method::GET, "/2015-03-31/functions/to-delete", "");
2069 let resp = svc.handle(req).await;
2070 assert!(resp.is_err());
2071 }
2072
2073 #[tokio::test]
2074 async fn test_invoke_without_runtime_returns_error() {
2075 let state = make_state();
2076 let svc = LambdaService::new(state);
2077
2078 let create_body = json!({
2079 "FunctionName": "invoke-me",
2080 "Runtime": "python3.12",
2081 "Role": "arn:aws:iam::123456789012:role/test",
2082 "Handler": "index.handler",
2083 "Code": {}
2084 });
2085
2086 let req = make_request(
2087 Method::POST,
2088 "/2015-03-31/functions",
2089 &create_body.to_string(),
2090 );
2091 svc.handle(req).await.unwrap();
2092
2093 let req = make_request(
2094 Method::POST,
2095 "/2015-03-31/functions/invoke-me/invocations",
2096 r#"{"key": "value"}"#,
2097 );
2098 let resp = svc.handle(req).await;
2099 assert!(resp.is_err());
2100 }
2101
2102 #[tokio::test]
2103 async fn test_invoke_nonexistent_function() {
2104 let state = make_state();
2105 let svc = LambdaService::new(state);
2106
2107 let req = make_request(
2108 Method::POST,
2109 "/2015-03-31/functions/does-not-exist/invocations",
2110 "{}",
2111 );
2112 let resp = svc.handle(req).await;
2113 assert!(resp.is_err());
2114 }
2115
2116 #[tokio::test]
2117 async fn test_list_functions() {
2118 let state = make_state();
2119 let svc = LambdaService::new(state);
2120
2121 for name in &["func-a", "func-b"] {
2122 let create_body = json!({
2123 "FunctionName": name,
2124 "Runtime": "python3.12",
2125 "Role": "arn:aws:iam::123456789012:role/test",
2126 "Handler": "index.handler",
2127 "Code": {}
2128 });
2129 let req = make_request(
2130 Method::POST,
2131 "/2015-03-31/functions",
2132 &create_body.to_string(),
2133 );
2134 svc.handle(req).await.unwrap();
2135 }
2136
2137 let req = make_request(Method::GET, "/2015-03-31/functions", "");
2138 let resp = svc.handle(req).await.unwrap();
2139 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2140 assert_eq!(body["Functions"].as_array().unwrap().len(), 2);
2141 }
2142
2143 #[tokio::test]
2144 async fn test_event_source_mapping() {
2145 let state = make_state();
2146 let svc = LambdaService::new(state);
2147
2148 let create_body = json!({
2150 "FunctionName": "esm-func",
2151 "Runtime": "python3.12",
2152 "Role": "arn:aws:iam::123456789012:role/test",
2153 "Handler": "index.handler",
2154 "Code": {}
2155 });
2156 let req = make_request(
2157 Method::POST,
2158 "/2015-03-31/functions",
2159 &create_body.to_string(),
2160 );
2161 svc.handle(req).await.unwrap();
2162
2163 let mapping_body = json!({
2165 "FunctionName": "esm-func",
2166 "EventSourceArn": "arn:aws:sqs:us-east-1:123456789012:my-queue",
2167 "BatchSize": 5
2168 });
2169 let req = make_request(
2170 Method::POST,
2171 "/2015-03-31/event-source-mappings",
2172 &mapping_body.to_string(),
2173 );
2174 let resp = svc.handle(req).await.unwrap();
2175 assert_eq!(resp.status, StatusCode::ACCEPTED);
2176 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2177 let uuid = body["UUID"].as_str().unwrap().to_string();
2178
2179 let req = make_request(Method::GET, "/2015-03-31/event-source-mappings", "");
2181 let resp = svc.handle(req).await.unwrap();
2182 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2183 assert_eq!(body["EventSourceMappings"].as_array().unwrap().len(), 1);
2184
2185 let req = make_request(
2187 Method::DELETE,
2188 &format!("/2015-03-31/event-source-mappings/{uuid}"),
2189 "",
2190 );
2191 let resp = svc.handle(req).await.unwrap();
2192 assert_eq!(resp.status, StatusCode::ACCEPTED);
2193 }
2194
2195 async fn seed_function(svc: &LambdaService, name: &str) {
2196 let body = json!({
2197 "FunctionName": name,
2198 "Runtime": "python3.12",
2199 "Role": "arn:aws:iam::123456789012:role/r",
2200 "Handler": "index.handler",
2201 "Code": {}
2202 });
2203 let req = make_request(Method::POST, "/2015-03-31/functions", &body.to_string());
2204 svc.handle(req).await.unwrap();
2205 }
2206
2207 #[tokio::test]
2208 async fn add_permission_builds_canonical_statement() {
2209 let svc = LambdaService::new(make_state());
2210 seed_function(&svc, "f").await;
2211
2212 let body = json!({
2213 "StatementId": "s3-invoke",
2214 "Action": "InvokeFunction",
2215 "Principal": "s3.amazonaws.com",
2216 "SourceArn": "arn:aws:s3:::my-bucket",
2217 "SourceAccount": "123456789012",
2218 });
2219 let req = make_request(
2220 Method::POST,
2221 "/2015-03-31/functions/f/policy",
2222 &body.to_string(),
2223 );
2224 let resp = svc.handle(req).await.unwrap();
2225 assert_eq!(resp.status, StatusCode::CREATED);
2226
2227 let out: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2228 let statement: Value = serde_json::from_str(out["Statement"].as_str().unwrap()).unwrap();
2229 assert_eq!(statement["Sid"], "s3-invoke");
2230 assert_eq!(statement["Effect"], "Allow");
2231 assert_eq!(statement["Principal"]["Service"], "s3.amazonaws.com");
2232 assert_eq!(statement["Action"], "lambda:InvokeFunction");
2233 assert_eq!(
2234 statement["Resource"],
2235 "arn:aws:lambda:us-east-1:123456789012:function:f"
2236 );
2237 assert_eq!(
2238 statement["Condition"]["ArnLike"]["aws:SourceArn"],
2239 "arn:aws:s3:::my-bucket"
2240 );
2241 assert_eq!(
2242 statement["Condition"]["StringEquals"]["aws:SourceAccount"],
2243 "123456789012"
2244 );
2245 }
2246
2247 #[tokio::test]
2248 async fn add_permission_aws_principal_emits_aws_key() {
2249 let svc = LambdaService::new(make_state());
2250 seed_function(&svc, "f").await;
2251
2252 let body = json!({
2253 "StatementId": "user-invoke",
2254 "Action": "InvokeFunction",
2255 "Principal": "arn:aws:iam::123456789012:user/alice",
2256 });
2257 let req = make_request(
2258 Method::POST,
2259 "/2015-03-31/functions/f/policy",
2260 &body.to_string(),
2261 );
2262 svc.handle(req).await.unwrap();
2263
2264 let req = make_request(Method::GET, "/2015-03-31/functions/f/policy", "");
2266 let resp = svc.handle(req).await.unwrap();
2267 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2268 let doc: Value = serde_json::from_str(body["Policy"].as_str().unwrap()).unwrap();
2269 let statements = doc["Statement"].as_array().unwrap();
2270 assert_eq!(statements.len(), 1);
2271 assert_eq!(
2272 statements[0]["Principal"]["AWS"],
2273 "arn:aws:iam::123456789012:user/alice"
2274 );
2275 assert!(statements[0].get("Condition").is_none());
2276 }
2277
2278 #[tokio::test]
2279 async fn add_permission_rejects_duplicate_statement_id() {
2280 let svc = LambdaService::new(make_state());
2281 seed_function(&svc, "f").await;
2282
2283 let body = json!({
2284 "StatementId": "dup",
2285 "Action": "InvokeFunction",
2286 "Principal": "arn:aws:iam::123456789012:user/a",
2287 });
2288 let req = make_request(
2289 Method::POST,
2290 "/2015-03-31/functions/f/policy",
2291 &body.to_string(),
2292 );
2293 svc.handle(req).await.unwrap();
2294
2295 let req = make_request(
2296 Method::POST,
2297 "/2015-03-31/functions/f/policy",
2298 &body.to_string(),
2299 );
2300 let err = match svc.handle(req).await {
2301 Err(e) => e,
2302 Ok(_) => panic!("expected error"),
2303 };
2304 assert_eq!(err.status(), StatusCode::CONFLICT);
2305 }
2306
2307 #[tokio::test]
2308 async fn get_policy_returns_404_when_no_policy_attached() {
2309 let svc = LambdaService::new(make_state());
2310 seed_function(&svc, "f").await;
2311
2312 let req = make_request(Method::GET, "/2015-03-31/functions/f/policy", "");
2313 let err = match svc.handle(req).await {
2314 Err(e) => e,
2315 Ok(_) => panic!("expected error"),
2316 };
2317 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2318 }
2319
2320 #[tokio::test]
2321 async fn remove_permission_strips_matching_sid_and_leaves_empty_doc() {
2322 let svc = LambdaService::new(make_state());
2323 seed_function(&svc, "f").await;
2324
2325 for sid in ["a", "b"] {
2326 let body = json!({
2327 "StatementId": sid,
2328 "Action": "InvokeFunction",
2329 "Principal": "arn:aws:iam::123456789012:user/u",
2330 });
2331 let req = make_request(
2332 Method::POST,
2333 "/2015-03-31/functions/f/policy",
2334 &body.to_string(),
2335 );
2336 svc.handle(req).await.unwrap();
2337 }
2338
2339 let req = make_request(Method::DELETE, "/2015-03-31/functions/f/policy/a", "");
2341 let resp = svc.handle(req).await.unwrap();
2342 assert_eq!(resp.status, StatusCode::NO_CONTENT);
2343
2344 let req = make_request(Method::GET, "/2015-03-31/functions/f/policy", "");
2346 let resp = svc.handle(req).await.unwrap();
2347 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2348 let doc: Value = serde_json::from_str(body["Policy"].as_str().unwrap()).unwrap();
2349 let stmts = doc["Statement"].as_array().unwrap();
2350 assert_eq!(stmts.len(), 1);
2351 assert_eq!(stmts[0]["Sid"], "b");
2352
2353 let req = make_request(Method::DELETE, "/2015-03-31/functions/f/policy/b", "");
2355 svc.handle(req).await.unwrap();
2356
2357 let req = make_request(Method::GET, "/2015-03-31/functions/f/policy", "");
2358 let resp = svc.handle(req).await.unwrap();
2359 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2360 let doc: Value = serde_json::from_str(body["Policy"].as_str().unwrap()).unwrap();
2361 assert_eq!(doc["Statement"].as_array().unwrap().len(), 0);
2362 }
2363
2364 #[tokio::test]
2365 async fn remove_permission_unknown_sid_is_404() {
2366 let svc = LambdaService::new(make_state());
2367 seed_function(&svc, "f").await;
2368
2369 let body = json!({
2370 "StatementId": "known",
2371 "Action": "InvokeFunction",
2372 "Principal": "arn:aws:iam::123456789012:user/u",
2373 });
2374 let req = make_request(
2375 Method::POST,
2376 "/2015-03-31/functions/f/policy",
2377 &body.to_string(),
2378 );
2379 svc.handle(req).await.unwrap();
2380
2381 let req = make_request(Method::DELETE, "/2015-03-31/functions/f/policy/other", "");
2382 let err = match svc.handle(req).await {
2383 Err(e) => e,
2384 Ok(_) => panic!("expected error"),
2385 };
2386 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2387 }
2388
2389 #[tokio::test]
2390 async fn add_permission_on_missing_function_is_404() {
2391 let svc = LambdaService::new(make_state());
2392 let body = json!({
2393 "StatementId": "s",
2394 "Action": "InvokeFunction",
2395 "Principal": "arn:aws:iam::123456789012:user/u",
2396 });
2397 let req = make_request(
2398 Method::POST,
2399 "/2015-03-31/functions/missing/policy",
2400 &body.to_string(),
2401 );
2402 let err = match svc.handle(req).await {
2403 Err(e) => e,
2404 Ok(_) => panic!("expected error"),
2405 };
2406 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2407 }
2408
2409 #[test]
2410 fn iam_action_for_maps_invoke_to_function_arn() {
2411 let svc = LambdaService::new(make_state());
2412 let req = make_request(Method::POST, "/2015-03-31/functions/f/invocations", "");
2413 let action = svc.iam_action_for(&req).unwrap();
2414 assert_eq!(action.service, "lambda");
2415 assert_eq!(action.action, "InvokeFunction");
2416 assert_eq!(
2417 action.resource,
2418 "arn:aws:lambda:us-east-1:123456789012:function:f"
2419 );
2420 }
2421
2422 #[test]
2423 fn iam_action_for_maps_list_to_star() {
2424 let svc = LambdaService::new(make_state());
2425 let req = make_request(Method::GET, "/2015-03-31/functions", "");
2426 let action = svc.iam_action_for(&req).unwrap();
2427 assert_eq!(action.action, "ListFunctions");
2428 assert_eq!(action.resource, "*");
2429 }
2430
2431 #[test]
2432 fn iam_action_for_create_reads_function_name_from_body() {
2433 let svc = LambdaService::new(make_state());
2434 let body = json!({
2435 "FunctionName": "newfn",
2436 "Runtime": "python3.12",
2437 "Role": "arn:aws:iam::123456789012:role/r",
2438 "Handler": "index.handler",
2439 "Code": {}
2440 });
2441 let req = make_request(Method::POST, "/2015-03-31/functions", &body.to_string());
2442 let action = svc.iam_action_for(&req).unwrap();
2443 assert_eq!(action.action, "CreateFunction");
2444 assert_eq!(
2445 action.resource,
2446 "arn:aws:lambda:us-east-1:123456789012:function:newfn"
2447 );
2448 }
2449
2450 #[tokio::test]
2453 async fn create_function_duplicate_returns_conflict() {
2454 let svc = LambdaService::new(make_state());
2455 seed_function(&svc, "dup-fn").await;
2456
2457 let body = json!({
2458 "FunctionName": "dup-fn",
2459 "Runtime": "python3.12",
2460 "Role": "arn:aws:iam::123456789012:role/r",
2461 "Handler": "index.handler",
2462 "Code": {"ZipFile": "UEsDBBQ="},
2463 });
2464 let req = make_request(Method::POST, "/2015-03-31/functions", &body.to_string());
2465 let err = match svc.handle(req).await {
2466 Err(e) => e,
2467 Ok(_) => panic!("expected ResourceConflictException"),
2468 };
2469 assert_eq!(err.status(), StatusCode::CONFLICT);
2470 }
2471
2472 #[tokio::test]
2473 async fn get_function_not_found() {
2474 let svc = LambdaService::new(make_state());
2475 let req = make_request(Method::GET, "/2015-03-31/functions/nope", "");
2476 let err = match svc.handle(req).await {
2477 Err(e) => e,
2478 Ok(_) => panic!("expected error"),
2479 };
2480 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2481 }
2482
2483 #[tokio::test]
2484 async fn delete_function_not_found() {
2485 let svc = LambdaService::new(make_state());
2486 let req = make_request(Method::DELETE, "/2015-03-31/functions/nope", "");
2487 let err = match svc.handle(req).await {
2488 Err(e) => e,
2489 Ok(_) => panic!("expected error"),
2490 };
2491 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2492 }
2493
2494 #[tokio::test]
2495 async fn get_event_source_mapping_not_found() {
2496 let svc = LambdaService::new(make_state());
2497 let req = make_request(
2498 Method::GET,
2499 "/2015-03-31/event-source-mappings/nonexistent",
2500 "",
2501 );
2502 let err = match svc.handle(req).await {
2503 Err(e) => e,
2504 Ok(_) => panic!("expected error"),
2505 };
2506 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2507 }
2508
2509 #[tokio::test]
2510 async fn delete_event_source_mapping_not_found() {
2511 let svc = LambdaService::new(make_state());
2512 let req = make_request(
2513 Method::DELETE,
2514 "/2015-03-31/event-source-mappings/nonexistent",
2515 "",
2516 );
2517 let err = match svc.handle(req).await {
2518 Err(e) => e,
2519 Ok(_) => panic!("expected error"),
2520 };
2521 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2522 }
2523
2524 #[tokio::test]
2525 async fn get_policy_on_missing_function() {
2526 let svc = LambdaService::new(make_state());
2527 let req = make_request(Method::GET, "/2015-03-31/functions/nope/policy", "");
2528 let err = match svc.handle(req).await {
2529 Err(e) => e,
2530 Ok(_) => panic!("expected error"),
2531 };
2532 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2533 }
2534
2535 #[tokio::test]
2536 async fn remove_permission_on_missing_function() {
2537 let svc = LambdaService::new(make_state());
2538 let req = make_request(
2539 Method::DELETE,
2540 "/2015-03-31/functions/nope/policy/stmt1",
2541 "",
2542 );
2543 let err = match svc.handle(req).await {
2544 Err(e) => e,
2545 Ok(_) => panic!("expected error"),
2546 };
2547 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2548 }
2549
2550 #[tokio::test]
2551 async fn publish_version_on_missing_function() {
2552 let svc = LambdaService::new(make_state());
2553 let req = make_request(Method::POST, "/2015-03-31/functions/nope/versions", "{}");
2554 let err = match svc.handle(req).await {
2555 Err(e) => e,
2556 Ok(_) => panic!("expected error"),
2557 };
2558 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2559 }
2560
2561 #[tokio::test]
2562 async fn unknown_route_returns_error() {
2563 let svc = LambdaService::new(make_state());
2564 let req = make_request(Method::POST, "/unknown/route", "{}");
2565 assert!(svc.handle(req).await.is_err());
2566 }
2567
2568 #[tokio::test]
2569 async fn publish_version_unknown_function_errors() {
2570 let svc = LambdaService::new(make_state());
2571 assert!(svc.publish_version("ghost", "123456789012").is_err());
2572 }
2573
2574 #[tokio::test]
2575 async fn get_function_unknown_errors() {
2576 let svc = LambdaService::new(make_state());
2577 assert!(svc
2578 .get_function("ghost", "123456789012", "us-east-1")
2579 .is_err());
2580 }
2581
2582 #[tokio::test]
2583 async fn delete_function_unknown_errors() {
2584 let svc = LambdaService::new(make_state());
2585 assert!(svc.delete_function("ghost", "123456789012").is_err());
2586 }
2587
2588 #[tokio::test]
2589 async fn get_event_source_mapping_unknown_errors() {
2590 let svc = LambdaService::new(make_state());
2591 assert!(svc
2592 .get_event_source_mapping("ghost", "123456789012")
2593 .is_err());
2594 }
2595
2596 #[tokio::test]
2597 async fn delete_event_source_mapping_unknown_errors() {
2598 let svc = LambdaService::new(make_state());
2599 assert!(svc
2600 .delete_event_source_mapping("ghost", "123456789012")
2601 .is_err());
2602 }
2603
2604 #[tokio::test]
2605 async fn list_functions_empty_ok() {
2606 let svc = LambdaService::new(make_state());
2607 let resp = svc.list_functions("123456789012").unwrap();
2608 assert_eq!(resp.status, http::StatusCode::OK);
2609 }
2610
2611 #[tokio::test]
2612 async fn list_event_source_mappings_empty_ok() {
2613 let svc = LambdaService::new(make_state());
2614 let resp = svc.list_event_source_mappings("123456789012").unwrap();
2615 assert_eq!(resp.status, http::StatusCode::OK);
2616 }
2617}