1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use http::{Method, StatusCode};
7use serde_json::{json, Value};
8use sha2::{Digest, Sha256};
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_persistence::SnapshotStore;
13
14use crate::runtime::ContainerRuntime;
15use crate::state::{
16 EventSourceMapping, LambdaFunction, LambdaSnapshot, LambdaState, SharedLambdaState,
17 LAMBDA_SNAPSHOT_SCHEMA_VERSION,
18};
19
20struct CreateFunctionInput {
24 function_name: String,
25 runtime: String,
26 role: String,
27 handler: String,
28 description: String,
29 timeout: i64,
30 memory_size: i64,
31 package_type: String,
32 tags: HashMap<String, String>,
33 environment: HashMap<String, String>,
34 architectures: Vec<String>,
35 code_zip: Option<Vec<u8>>,
36 code_fallback: Vec<u8>,
37 image_uri: Option<String>,
38}
39
40impl CreateFunctionInput {
41 fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
42 let function_name = body["FunctionName"]
43 .as_str()
44 .ok_or_else(|| {
45 AwsServiceError::aws_error(
46 StatusCode::BAD_REQUEST,
47 "InvalidParameterValueException",
48 "FunctionName is required",
49 )
50 })?
51 .to_string();
52
53 let tags: HashMap<String, String> = body["Tags"]
54 .as_object()
55 .map(|m| {
56 m.iter()
57 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
58 .collect()
59 })
60 .unwrap_or_default();
61
62 let environment: HashMap<String, String> = body["Environment"]["Variables"]
63 .as_object()
64 .map(|m| {
65 m.iter()
66 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
67 .collect()
68 })
69 .unwrap_or_default();
70
71 let architectures = body["Architectures"]
72 .as_array()
73 .map(|a| {
74 a.iter()
75 .filter_map(|v| v.as_str().map(|s| s.to_string()))
76 .collect()
77 })
78 .unwrap_or_else(|| vec!["x86_64".to_string()]);
79
80 let code_zip: Option<Vec<u8>> = match body["Code"]["ZipFile"].as_str() {
81 Some(b64) => Some(
82 base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64).map_err(
83 |_| {
84 AwsServiceError::aws_error(
85 StatusCode::BAD_REQUEST,
86 "InvalidParameterValueException",
87 "Could not decode Code.ZipFile: invalid base64",
88 )
89 },
90 )?,
91 ),
92 None => None,
93 };
94
95 let code_fallback = serde_json::to_vec(&body["Code"]).unwrap_or_default();
96
97 let package_type = body["PackageType"].as_str().unwrap_or("Zip").to_string();
98 let image_uri = if package_type == "Image" {
103 body["Code"]["ImageUri"].as_str().map(String::from)
104 } else {
105 None
106 };
107
108 if package_type == "Image" && image_uri.is_none() {
112 return Err(AwsServiceError::aws_error(
113 StatusCode::BAD_REQUEST,
114 "InvalidParameterValueException",
115 "Code.ImageUri is required when PackageType is Image",
116 ));
117 }
118
119 Ok(Self {
120 function_name,
121 runtime: body["Runtime"].as_str().unwrap_or("python3.12").to_string(),
122 role: body["Role"].as_str().unwrap_or("").to_string(),
123 handler: body["Handler"]
124 .as_str()
125 .unwrap_or("index.handler")
126 .to_string(),
127 description: body["Description"].as_str().unwrap_or("").to_string(),
128 timeout: body["Timeout"].as_i64().unwrap_or(3),
129 memory_size: body["MemorySize"].as_i64().unwrap_or(128),
130 package_type,
131 tags,
132 environment,
133 architectures,
134 code_zip,
135 code_fallback,
136 image_uri,
137 })
138 }
139}
140
141#[derive(Debug, Clone, Copy, PartialEq, Eq)]
143pub enum InvocationType {
144 RequestResponse,
145 Event,
146 DryRun,
147}
148
149impl InvocationType {
150 pub fn from_header(value: Option<&str>) -> Self {
151 match value {
152 Some("Event") => Self::Event,
153 Some("DryRun") => Self::DryRun,
154 _ => Self::RequestResponse,
155 }
156 }
157}
158
159fn route_to_destination(
163 bus: Arc<fakecloud_core::delivery::DeliveryBus>,
164 function_arn: &str,
165 request_payload: &[u8],
166 result: &Result<Vec<u8>, String>,
167 destination_config: Option<&serde_json::Value>,
168) {
169 let Some(cfg) = destination_config else {
170 return;
171 };
172 let (key, condition, response_value): (&str, &str, serde_json::Value) = match result {
173 Ok(bytes) => (
174 "OnSuccess",
175 "Success",
176 serde_json::from_slice(bytes).unwrap_or(serde_json::Value::Null),
177 ),
178 Err(err) => (
179 "OnFailure",
180 "RetriesExhausted",
181 serde_json::json!({ "errorMessage": err }),
182 ),
183 };
184 let Some(dest) = cfg
185 .get(key)
186 .and_then(|v| v.get("Destination"))
187 .and_then(|v| v.as_str())
188 else {
189 return;
190 };
191 let request_payload_v: serde_json::Value =
192 serde_json::from_slice(request_payload).unwrap_or(serde_json::Value::Null);
193 let record = serde_json::json!({
194 "version": "1.0",
195 "timestamp": chrono::Utc::now().to_rfc3339(),
196 "requestContext": {
197 "requestId": uuid::Uuid::new_v4().to_string(),
198 "functionArn": format!("{function_arn}:$LATEST"),
199 "condition": condition,
200 "approximateInvokeCount": 1,
201 },
202 "requestPayload": request_payload_v,
203 "responseContext": {
204 "statusCode": 200,
205 "executedVersion": "$LATEST",
206 },
207 "responsePayload": response_value,
208 });
209 let body = record.to_string();
210 if dest.contains(":sqs:") {
211 bus.send_to_sqs(dest, &body, &std::collections::HashMap::new());
212 } else if dest.contains(":sns:") {
213 bus.publish_to_sns(dest, &body, None);
214 } else if dest.contains(":lambda:") {
215 let dest = dest.to_string();
216 let payload = body.clone();
217 tokio::spawn(async move {
218 let _ = bus.invoke_lambda(&dest, &payload).await;
219 });
220 } else if dest.contains(":events:") || dest.contains(":eventbridge:") {
221 let detail_type = if result.is_ok() {
222 "Lambda Function Invocation Result - Success"
223 } else {
224 "Lambda Function Invocation Result - Failure"
225 };
226 bus.put_event_to_eventbridge("lambda", detail_type, &body, "default");
227 }
228}
229
230pub struct LambdaService {
231 pub(crate) state: SharedLambdaState,
232 runtime: Option<Arc<ContainerRuntime>>,
233 snapshot_store: Option<Arc<dyn SnapshotStore>>,
234 snapshot_lock: Arc<AsyncMutex<()>>,
235 pub(crate) delivery_bus: Option<Arc<fakecloud_core::delivery::DeliveryBus>>,
236 pub(crate) role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
237}
238
239impl LambdaService {
240 pub fn new(state: SharedLambdaState) -> Self {
241 Self {
242 state,
243 runtime: None,
244 snapshot_store: None,
245 snapshot_lock: Arc::new(AsyncMutex::new(())),
246 delivery_bus: None,
247 role_trust_validator: None,
248 }
249 }
250
251 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
252 self.runtime = Some(runtime);
253 self
254 }
255
256 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
257 self.snapshot_store = Some(store);
258 self
259 }
260
261 pub fn with_delivery_bus(mut self, bus: Arc<fakecloud_core::delivery::DeliveryBus>) -> Self {
262 self.delivery_bus = Some(bus);
263 self
264 }
265
266 pub fn with_role_trust_validator(
267 mut self,
268 validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
269 ) -> Self {
270 self.role_trust_validator = Some(validator);
271 self
272 }
273
274 async fn save_snapshot(&self) {
275 let Some(store) = self.snapshot_store.clone() else {
276 return;
277 };
278 let _guard = self.snapshot_lock.lock().await;
279 let snapshot = LambdaSnapshot {
280 schema_version: LAMBDA_SNAPSHOT_SCHEMA_VERSION,
281 accounts: Some(self.state.read().clone()),
282 state: None,
283 };
284 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
285 let bytes = serde_json::to_vec(&snapshot)
286 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
287 store.save(&bytes)
288 })
289 .await;
290 match join {
291 Ok(Ok(())) => {}
292 Ok(Err(err)) => tracing::error!(%err, "failed to write lambda snapshot"),
293 Err(err) => tracing::error!(%err, "lambda snapshot task panicked"),
294 }
295 }
296
297 fn resolve_action(req: &AwsRequest) -> Option<(&'static str, Option<String>)> {
310 let segs = &req.path_segments;
311 if segs.is_empty() {
312 return None;
313 }
314 let prefix = segs[0].as_str();
318
319 if segs.get(1).map(|s| s.as_str()) == Some("account-settings") && req.method == Method::GET
321 {
322 return Some(("GetAccountSettings", None));
323 }
324 if segs.get(1).map(|s| s.as_str()) == Some("functions")
325 && segs.get(3).map(|s| s.as_str()) == Some("invoke-async")
326 && req.method == Method::POST
327 {
328 return Some(("InvokeAsync", segs.get(2).map(|s| s.to_string())));
329 }
330 if segs.get(1).map(|s| s.as_str()) == Some("functions")
331 && segs.get(3).map(|s| s.as_str()) == Some("response-streaming-invocations")
332 && req.method == Method::POST
333 {
334 return Some((
335 "InvokeWithResponseStream",
336 segs.get(2).map(|s| s.to_string()),
337 ));
338 }
339
340 if segs.get(1).map(|s| s.as_str()) == Some("functions")
342 && segs.get(3).map(|s| s.as_str()) == Some("concurrency")
343 {
344 let res = segs.get(2).map(|s| s.to_string());
345 return match req.method {
346 Method::PUT => Some(("PutFunctionConcurrency", res)),
347 Method::GET => Some(("GetFunctionConcurrency", res)),
348 Method::DELETE => Some(("DeleteFunctionConcurrency", res)),
349 _ => None,
350 };
351 }
352
353 if segs.get(1).map(|s| s.as_str()) == Some("functions")
355 && segs.get(3).map(|s| s.as_str()) == Some("provisioned-concurrency")
356 {
357 let res = segs.get(2).map(|s| s.to_string());
358 return match req.method {
359 Method::PUT => Some(("PutProvisionedConcurrencyConfig", res)),
360 Method::GET => Some(("GetProvisionedConcurrencyConfig", res)),
361 Method::DELETE => Some(("DeleteProvisionedConcurrencyConfig", res)),
362 _ => None,
363 };
364 }
365 if segs.get(1).map(|s| s.as_str()) == Some("functions")
366 && segs.get(3).map(|s| s.as_str()) == Some("provisioned-concurrency-configs")
367 && req.method == Method::GET
368 {
369 return Some((
370 "ListProvisionedConcurrencyConfigs",
371 segs.get(2).map(|s| s.to_string()),
372 ));
373 }
374
375 if segs.get(1).map(|s| s.as_str()) == Some("functions")
377 && segs.get(3).map(|s| s.as_str()) == Some("event-invoke-config")
378 {
379 let res = segs.get(2).map(|s| s.to_string());
380 return match req.method {
381 Method::POST => Some(("PutFunctionEventInvokeConfig", res)),
382 Method::PUT => Some(("UpdateFunctionEventInvokeConfig", res)),
383 Method::GET => Some(("GetFunctionEventInvokeConfig", res)),
384 Method::DELETE => Some(("DeleteFunctionEventInvokeConfig", res)),
385 _ => None,
386 };
387 }
388 if segs.get(1).map(|s| s.as_str()) == Some("functions")
389 && (segs.get(3).map(|s| s.as_str()) == Some("event-invoke-config-list")
390 || (segs.get(3).map(|s| s.as_str()) == Some("event-invoke-config")
391 && segs.get(4).map(|s| s.as_str()) == Some("list")))
392 && req.method == Method::GET
393 {
394 return Some((
395 "ListFunctionEventInvokeConfigs",
396 segs.get(2).map(|s| s.to_string()),
397 ));
398 }
399
400 if segs.get(1).map(|s| s.as_str()) == Some("functions")
402 && segs.get(3).map(|s| s.as_str()) == Some("recursion-config")
403 {
404 let res = segs.get(2).map(|s| s.to_string());
405 return match req.method {
406 Method::PUT => Some(("PutFunctionRecursionConfig", res)),
407 Method::GET => Some(("GetFunctionRecursionConfig", res)),
408 _ => None,
409 };
410 }
411
412 if segs.get(1).map(|s| s.as_str()) == Some("functions")
414 && segs.get(3).map(|s| s.as_str()) == Some("runtime-management-config")
415 {
416 let res = segs.get(2).map(|s| s.to_string());
417 return match req.method {
418 Method::PUT => Some(("PutRuntimeManagementConfig", res)),
419 Method::GET => Some(("GetRuntimeManagementConfig", res)),
420 _ => None,
421 };
422 }
423
424 if segs.get(1).map(|s| s.as_str()) == Some("functions")
426 && segs.get(3).map(|s| s.as_str()) == Some("code-signing-config")
427 {
428 let res = segs.get(2).map(|s| s.to_string());
429 return match req.method {
430 Method::PUT => Some(("PutFunctionCodeSigningConfig", res)),
431 Method::GET => Some(("GetFunctionCodeSigningConfig", res)),
432 Method::DELETE => Some(("DeleteFunctionCodeSigningConfig", res)),
433 _ => None,
434 };
435 }
436 if segs.get(1).map(|s| s.as_str()) == Some("code-signing-configs") {
437 let res = segs.get(2).map(|s| s.to_string());
438 return match (
439 req.method.clone(),
440 segs.len(),
441 segs.get(3).map(|s| s.as_str()),
442 ) {
443 (Method::POST, 2, _) => Some(("CreateCodeSigningConfig", None)),
444 (Method::GET, 2, _) => Some(("ListCodeSigningConfigs", None)),
445 (Method::GET, 3, _) => Some(("GetCodeSigningConfig", res)),
446 (Method::PUT, 3, _) => Some(("UpdateCodeSigningConfig", res)),
447 (Method::DELETE, 3, _) => Some(("DeleteCodeSigningConfig", res)),
448 (Method::GET, 4, Some("functions")) => {
449 Some(("ListFunctionsByCodeSigningConfig", res))
450 }
451 _ => None,
452 };
453 }
454
455 if segs.get(1).map(|s| s.as_str()) == Some("tags") && segs.len() >= 3 {
457 let res = segs[2..].join("/");
458 return match req.method {
459 Method::POST => Some(("TagResource", Some(res))),
460 Method::DELETE => Some(("UntagResource", Some(res))),
461 Method::GET => Some(("ListTags", Some(res))),
462 _ => None,
463 };
464 }
465
466 if segs.get(1).map(|s| s.as_str()) == Some("functions")
468 && segs.get(3).map(|s| s.as_str()) == Some("url")
469 {
470 let res = segs.get(2).map(|s| s.to_string());
471 return match req.method {
472 Method::POST => Some(("CreateFunctionUrlConfig", res)),
473 Method::GET => Some(("GetFunctionUrlConfig", res)),
474 Method::PUT => Some(("UpdateFunctionUrlConfig", res)),
475 Method::DELETE => Some(("DeleteFunctionUrlConfig", res)),
476 _ => None,
477 };
478 }
479 if segs.get(1).map(|s| s.as_str()) == Some("function-urls") && req.method == Method::GET {
480 return Some(("ListFunctionUrlConfigs", None));
481 }
482 if segs.get(1).map(|s| s.as_str()) == Some("functions")
483 && segs.get(3).map(|s| s.as_str()) == Some("urls")
484 && req.method == Method::GET
485 {
486 return Some(("ListFunctionUrlConfigs", segs.get(2).map(|s| s.to_string())));
487 }
488 if segs.get(1).map(|s| s.as_str()) == Some("event-source-mappings")
489 && segs.get(3).map(|s| s.as_str()) == Some("scaling-config")
490 {
491 let res = segs.get(2).map(|s| s.to_string());
492 return match req.method {
493 Method::PUT => Some(("PutFunctionScalingConfig", res)),
494 Method::GET => Some(("GetFunctionScalingConfig", res)),
495 _ => None,
496 };
497 }
498
499 if segs.get(1).map(|s| s.as_str()) == Some("capacity-providers") {
501 let res = segs.get(2).map(|s| s.to_string());
502 return match (
503 req.method.clone(),
504 segs.len(),
505 segs.get(3).map(|s| s.as_str()),
506 ) {
507 (Method::POST, 2, _) => Some(("CreateCapacityProvider", None)),
508 (Method::GET, 2, _) => Some(("ListCapacityProviders", None)),
509 (Method::GET, 3, _) => Some(("GetCapacityProvider", res)),
510 (Method::PUT, 3, _) => Some(("UpdateCapacityProvider", res)),
511 (Method::DELETE, 3, _) => Some(("DeleteCapacityProvider", res)),
512 (Method::GET, 4, Some("function-versions")) => {
513 Some(("ListFunctionVersionsByCapacityProvider", res))
514 }
515 _ => None,
516 };
517 }
518
519 if segs.get(1).map(|s| s.as_str()) == Some("functions")
521 && segs.get(3).map(|s| s.as_str()) == Some("durable-executions")
522 && req.method == Method::GET
523 {
524 return Some((
525 "ListDurableExecutionsByFunction",
526 segs.get(2).map(|s| s.to_string()),
527 ));
528 }
529
530 if segs.get(1).map(|s| s.as_str()) == Some("durable-execution-callbacks")
532 && req.method == Method::POST
533 {
534 let res = segs.get(2).map(|s| s.to_string());
535 return match segs.get(3).map(|s| s.as_str()) {
536 Some("success") | Some("succeed") => {
537 Some(("SendDurableExecutionCallbackSuccess", res))
538 }
539 Some("failure") | Some("fail") => {
540 Some(("SendDurableExecutionCallbackFailure", res))
541 }
542 Some("heartbeat") => Some(("SendDurableExecutionCallbackHeartbeat", res)),
543 _ => None,
544 };
545 }
546
547 if segs.get(1).map(|s| s.as_str()) == Some("durable-executions") {
549 let res = segs.get(2).map(|s| s.to_string());
550 return match (
551 req.method.clone(),
552 segs.len(),
553 segs.get(3).map(|s| s.as_str()),
554 segs.get(4).map(|s| s.as_str()),
555 ) {
556 (Method::GET, 3, _, _) => Some(("GetDurableExecution", res)),
557 (Method::GET, 4, Some("history"), _) => Some(("GetDurableExecutionHistory", res)),
558 (Method::GET, 4, Some("state"), _) => Some(("GetDurableExecutionState", res)),
559 (Method::POST, 4, Some("checkpoint"), _) => {
560 Some(("CheckpointDurableExecution", res))
561 }
562 (Method::POST, 4, Some("stop"), _) => Some(("StopDurableExecution", res)),
563 (Method::POST, 5, Some("callback"), Some("success")) => {
564 Some(("SendDurableExecutionCallbackSuccess", res))
565 }
566 (Method::POST, 5, Some("callback"), Some("failure")) => {
567 Some(("SendDurableExecutionCallbackFailure", res))
568 }
569 (Method::POST, 5, Some("callback"), Some("heartbeat")) => {
570 Some(("SendDurableExecutionCallbackHeartbeat", res))
571 }
572 _ => None,
573 };
574 }
575
576 if prefix == "2018-10-31" && segs.get(1).map(|s| s.as_str()) == Some("layers") {
583 let layer = segs.get(2).map(|s| s.to_string());
584 let third = segs.get(3).map(|s| s.as_str());
585 let version = segs.get(4).map(|s| s.to_string());
586 return match (&req.method, segs.len(), third, version.is_some()) {
587 (&Method::GET, 2, _, _) => Some(("ListLayers", None)),
588 (&Method::POST, 4, Some("versions"), false) => Some(("PublishLayerVersion", layer)),
589 (&Method::GET, 4, Some("versions"), false) => Some(("ListLayerVersions", layer)),
590 (&Method::GET, 5, Some("versions"), true) => Some(("GetLayerVersion", version)),
591 (&Method::DELETE, 5, Some("versions"), true) => {
592 Some(("DeleteLayerVersion", version))
593 }
594 (&Method::GET, 6, Some("versions"), true)
595 if segs.get(5).map(|s| s.as_str()) == Some("policy") =>
596 {
597 Some(("GetLayerVersionPolicy", version))
598 }
599 (&Method::POST, 6, Some("versions"), true)
600 if segs.get(5).map(|s| s.as_str()) == Some("policy") =>
601 {
602 Some(("AddLayerVersionPermission", version))
603 }
604 (&Method::DELETE, 7, Some("versions"), true)
605 if segs.get(5).map(|s| s.as_str()) == Some("policy") =>
606 {
607 Some(("RemoveLayerVersionPermission", version))
608 }
609 _ => None,
610 };
611 }
612
613 if prefix == "2018-10-31"
615 && segs.get(1).map(|s| s.as_str()) == Some("layers-by-arn")
616 && req.method == Method::GET
617 {
618 return Some(("GetLayerVersionByArn", None));
619 }
620
621 if prefix != "2015-03-31" {
625 return None;
626 }
627
628 let collection = segs.get(1).map(|s| s.as_str());
629 let resource = segs.get(2).map(|s| s.to_string());
630 let third = segs.get(3).map(|s| s.as_str());
631 let fourth = segs.get(4).map(|s| s.as_str());
632
633 let action = match (&req.method, segs.len(), collection, third) {
634 (&Method::POST, 2, Some("functions"), _) => "CreateFunction",
635 (&Method::GET, 2, Some("functions"), _) => "ListFunctions",
636 (&Method::GET, 3, Some("functions"), _) => "GetFunction",
637 (&Method::DELETE, 3, Some("functions"), _) => "DeleteFunction",
638 (&Method::POST, 4, Some("functions"), Some("invocations")) => "Invoke",
639 (&Method::POST, 4, Some("functions"), Some("invoke-async")) => "InvokeAsync",
640 (&Method::POST, 4, Some("functions"), Some("response-streaming-invocations")) => {
641 "InvokeWithResponseStream"
642 }
643 (&Method::POST, 4, Some("functions"), Some("versions")) => "PublishVersion",
644 (&Method::GET, 4, Some("functions"), Some("versions")) => "ListVersionsByFunction",
645 (&Method::POST, 4, Some("functions"), Some("policy")) => "AddPermission",
646 (&Method::GET, 4, Some("functions"), Some("policy")) => "GetPolicy",
647 (&Method::DELETE, 5, Some("functions"), Some("policy")) => "RemovePermission",
648 (&Method::POST, 4, Some("functions"), Some("aliases")) => "CreateAlias",
649 (&Method::GET, 4, Some("functions"), Some("aliases")) => "ListAliases",
650 (&Method::GET, 5, Some("functions"), Some("aliases")) => "GetAlias",
651 (&Method::PUT, 5, Some("functions"), Some("aliases")) => "UpdateAlias",
652 (&Method::DELETE, 5, Some("functions"), Some("aliases")) => "DeleteAlias",
653 (&Method::GET, 4, Some("functions"), Some("configuration")) => {
654 "GetFunctionConfiguration"
655 }
656 (&Method::PUT, 4, Some("functions"), Some("configuration")) => {
657 "UpdateFunctionConfiguration"
658 }
659 (&Method::PUT, 4, Some("functions"), Some("code")) => "UpdateFunctionCode",
660 (&Method::PUT, 4, Some("functions"), Some("concurrency")) => "PutFunctionConcurrency",
661 (&Method::GET, 4, Some("functions"), Some("concurrency")) => "GetFunctionConcurrency",
662 (&Method::DELETE, 4, Some("functions"), Some("concurrency")) => {
663 "DeleteFunctionConcurrency"
664 }
665 (&Method::PUT, 4, Some("functions"), Some("provisioned-concurrency")) => {
666 "PutProvisionedConcurrencyConfig"
667 }
668 (&Method::GET, 4, Some("functions"), Some("provisioned-concurrency")) => {
669 "GetProvisionedConcurrencyConfig"
670 }
671 (&Method::DELETE, 4, Some("functions"), Some("provisioned-concurrency")) => {
672 "DeleteProvisionedConcurrencyConfig"
673 }
674 (&Method::GET, 4, Some("functions"), Some("provisioned-concurrency-configs")) => {
675 "ListProvisionedConcurrencyConfigs"
676 }
677 (&Method::PUT, 4, Some("functions"), Some("event-invoke-config")) => {
678 "UpdateFunctionEventInvokeConfig"
679 }
680 (&Method::POST, 4, Some("functions"), Some("event-invoke-config")) => {
681 "PutFunctionEventInvokeConfig"
682 }
683 (&Method::GET, 4, Some("functions"), Some("event-invoke-config")) => {
684 "GetFunctionEventInvokeConfig"
685 }
686 (&Method::DELETE, 4, Some("functions"), Some("event-invoke-config")) => {
687 "DeleteFunctionEventInvokeConfig"
688 }
689 (&Method::GET, 4, Some("functions"), Some("event-invoke-config-list")) => {
690 "ListFunctionEventInvokeConfigs"
691 }
692 (&Method::PUT, 4, Some("functions"), Some("code-signing-config")) => {
693 "PutFunctionCodeSigningConfig"
694 }
695 (&Method::GET, 4, Some("functions"), Some("code-signing-config")) => {
696 "GetFunctionCodeSigningConfig"
697 }
698 (&Method::DELETE, 4, Some("functions"), Some("code-signing-config")) => {
699 "DeleteFunctionCodeSigningConfig"
700 }
701 (&Method::PUT, 4, Some("functions"), Some("runtime-management-config")) => {
702 "PutRuntimeManagementConfig"
703 }
704 (&Method::GET, 4, Some("functions"), Some("runtime-management-config")) => {
705 "GetRuntimeManagementConfig"
706 }
707 (&Method::PUT, 4, Some("functions"), Some("scaling-config")) => {
708 "PutFunctionScalingConfig"
709 }
710 (&Method::GET, 4, Some("functions"), Some("scaling-config")) => {
711 "GetFunctionScalingConfig"
712 }
713 (&Method::PUT, 4, Some("functions"), Some("recursion-config")) => {
714 "PutFunctionRecursionConfig"
715 }
716 (&Method::GET, 4, Some("functions"), Some("recursion-config")) => {
717 "GetFunctionRecursionConfig"
718 }
719 (&Method::GET, 4, Some("functions"), Some("durable-executions")) => {
720 "ListDurableExecutionsByFunction"
721 }
722 (&Method::POST, 2, Some("event-source-mappings"), _) => "CreateEventSourceMapping",
723 (&Method::GET, 2, Some("event-source-mappings"), _) => "ListEventSourceMappings",
724 (&Method::GET, 3, Some("event-source-mappings"), _) => "GetEventSourceMapping",
725 (&Method::PUT, 3, Some("event-source-mappings"), _) => "UpdateEventSourceMapping",
726 (&Method::DELETE, 3, Some("event-source-mappings"), _) => "DeleteEventSourceMapping",
727 (&Method::POST, 3, Some("tags"), _) => "TagResource",
728 (&Method::DELETE, 3, Some("tags"), _) => "UntagResource",
729 (&Method::GET, 3, Some("tags"), _) => "ListTags",
730 _ => return None,
731 };
732 let _ = fourth;
733
734 Some((action, resource))
735 }
736
737 fn create_function(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
738 let body: Value = serde_json::from_slice(&req.body).unwrap_or_default();
739 let input = CreateFunctionInput::from_body(&body)?;
740
741 if let Some(ref validator) = self.role_trust_validator {
746 if let Err(err) =
747 validator.validate(&req.account_id, &input.role, "lambda.amazonaws.com")
748 {
749 return Err(AwsServiceError::aws_error(
750 StatusCode::BAD_REQUEST,
751 "InvalidParameterValueException",
752 err.to_string(),
753 ));
754 }
755 }
756
757 let mut accounts = self.state.write();
758 let state = accounts.get_or_create(&req.account_id);
759
760 if state.functions.contains_key(&input.function_name) {
761 return Err(AwsServiceError::aws_error(
762 StatusCode::CONFLICT,
763 "ResourceConflictException",
764 format!("Function already exist: {}", input.function_name),
765 ));
766 }
767
768 let code_bytes = input.code_zip.as_deref().unwrap_or(&input.code_fallback);
771 let mut hasher = Sha256::new();
772 hasher.update(code_bytes);
773 let hash = hasher.finalize();
774 let code_sha256 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
775 let code_size = code_bytes.len() as i64;
776
777 let function_arn = format!(
778 "arn:aws:lambda:{}:{}:function:{}",
779 state.region, state.account_id, input.function_name
780 );
781 let now = Utc::now();
782
783 let func = LambdaFunction {
784 function_name: input.function_name.clone(),
785 function_arn,
786 runtime: input.runtime,
787 role: input.role,
788 handler: input.handler,
789 description: input.description,
790 timeout: input.timeout,
791 memory_size: input.memory_size,
792 code_sha256,
793 code_size,
794 version: "$LATEST".to_string(),
795 last_modified: now,
796 tags: input.tags,
797 environment: input.environment,
798 architectures: input.architectures,
799 package_type: input.package_type,
800 code_zip: input.code_zip,
801 image_uri: input.image_uri,
802 policy: None,
803 };
804
805 let response = self.function_config_json(&func);
806
807 state.functions.insert(input.function_name, func);
808
809 Ok(AwsResponse::json(StatusCode::CREATED, response.to_string()))
810 }
811
812 fn get_function(
813 &self,
814 function_name: &str,
815 account_id: &str,
816 region: &str,
817 ) -> Result<AwsResponse, AwsServiceError> {
818 let accounts = self.state.read();
819 let empty = LambdaState::new(account_id, region);
820 let state = accounts.get(account_id).unwrap_or(&empty);
821 let func = state.functions.get(function_name).ok_or_else(|| {
822 AwsServiceError::aws_error(
823 StatusCode::NOT_FOUND,
824 "ResourceNotFoundException",
825 format!(
826 "Function not found: arn:aws:lambda:{}:{}:function:{}",
827 state.region, state.account_id, function_name
828 ),
829 )
830 })?;
831
832 let config = self.function_config_json(func);
833 let code = if let Some(ref uri) = func.image_uri {
834 json!({
835 "ImageUri": uri,
836 "ResolvedImageUri": uri,
837 "RepositoryType": "ECR",
838 })
839 } else {
840 json!({
841 "Location": format!(
842 "https://awslambda-{}-tasks.s3.{}.amazonaws.com/stub",
843 func.function_arn.split(':').nth(3).unwrap_or("us-east-1"),
844 func.function_arn.split(':').nth(3).unwrap_or("us-east-1")
845 ),
846 "RepositoryType": "S3",
847 })
848 };
849 let response = json!({
850 "Code": code,
851 "Configuration": config,
852 "Tags": func.tags,
853 });
854
855 Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
856 }
857
858 fn delete_function(
859 &self,
860 function_name: &str,
861 account_id: &str,
862 ) -> Result<AwsResponse, AwsServiceError> {
863 let mut accounts = self.state.write();
864 let state = accounts.get_or_create(account_id);
865 let region = state.region.clone();
866 let account_id = state.account_id.clone();
867 if state.functions.remove(function_name).is_none() {
868 return Err(AwsServiceError::aws_error(
869 StatusCode::NOT_FOUND,
870 "ResourceNotFoundException",
871 format!(
872 "Function not found: arn:aws:lambda:{}:{}:function:{}",
873 region, account_id, function_name
874 ),
875 ));
876 }
877
878 if let Some(ref runtime) = self.runtime {
880 let rt = runtime.clone();
881 let name = function_name.to_string();
882 tokio::spawn(async move { rt.stop_container(&name).await });
883 }
884
885 Ok(AwsResponse::json(StatusCode::NO_CONTENT, ""))
886 }
887
888 fn list_functions(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
889 let accounts = self.state.read();
890 let empty = LambdaState::new(account_id, "");
891 let state = accounts.get(account_id).unwrap_or(&empty);
892 let functions: Vec<Value> = state
893 .functions
894 .values()
895 .map(|f| self.function_config_json(f))
896 .collect();
897
898 let response = json!({
899 "Functions": functions,
900 });
901
902 Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
903 }
904
905 async fn invoke(
906 &self,
907 function_name: &str,
908 payload: &[u8],
909 account_id: &str,
910 invocation_type: InvocationType,
911 ) -> Result<AwsResponse, AwsServiceError> {
912 let func = {
913 let accounts = self.state.read();
914 let empty = LambdaState::new(account_id, "");
915 let state = accounts.get(account_id).unwrap_or(&empty);
916 state.functions.get(function_name).cloned().ok_or_else(|| {
917 AwsServiceError::aws_error(
918 StatusCode::NOT_FOUND,
919 "ResourceNotFoundException",
920 format!(
921 "Function not found: arn:aws:lambda:{}:{}:function:{}",
922 state.region, state.account_id, function_name
923 ),
924 )
925 })?
926 };
927
928 if func.code_zip.is_none() {
929 return Err(AwsServiceError::aws_error(
930 StatusCode::BAD_REQUEST,
931 "InvalidParameterValueException",
932 "Function has no deployment package",
933 ));
934 }
935
936 if matches!(invocation_type, InvocationType::DryRun) {
937 let mut resp = AwsResponse::json(StatusCode::NO_CONTENT, "");
938 resp.headers.insert(
939 http::header::HeaderName::from_static("x-amz-executed-version"),
940 http::header::HeaderValue::from_static("$LATEST"),
941 );
942 return Ok(resp);
943 }
944
945 let runtime = self.runtime.as_ref().ok_or_else(|| {
946 AwsServiceError::aws_error(
947 StatusCode::INTERNAL_SERVER_ERROR,
948 "ServiceException",
949 "Docker/Podman is required for Lambda execution but is not available",
950 )
951 })?;
952
953 match invocation_type {
954 InvocationType::Event => {
955 let runtime = runtime.clone();
957 let func_clone = func.clone();
958 let payload_vec = payload.to_vec();
959 let bus = self.delivery_bus.clone();
960 let destination_config = self.lookup_destination_config(&func, account_id);
961 let function_arn = func.function_arn.clone();
962 tokio::spawn(async move {
963 let result = match runtime.invoke(&func_clone, &payload_vec).await {
964 Ok(bytes) => {
965 let parsed: Option<serde_json::Value> =
969 serde_json::from_slice(&bytes).ok();
970 let is_error = parsed
971 .as_ref()
972 .and_then(|v| v.as_object())
973 .map(|m| {
974 m.contains_key("errorMessage") || m.contains_key("errorType")
975 })
976 .unwrap_or(false);
977 if is_error {
978 let msg = parsed
979 .as_ref()
980 .and_then(|v| v.get("errorMessage"))
981 .and_then(|v| v.as_str())
982 .unwrap_or("function error")
983 .to_string();
984 Err(msg)
985 } else {
986 Ok(bytes)
987 }
988 }
989 Err(e) => Err(e.to_string()),
990 };
991 if let Some(bus) = bus {
992 route_to_destination(
993 bus,
994 &function_arn,
995 &payload_vec,
996 &result,
997 destination_config.as_ref(),
998 );
999 }
1000 });
1001 let mut resp = AwsResponse::json(StatusCode::ACCEPTED, "");
1002 resp.headers.insert(
1003 http::header::HeaderName::from_static("x-amz-executed-version"),
1004 http::header::HeaderValue::from_static("$LATEST"),
1005 );
1006 Ok(resp)
1007 }
1008 InvocationType::RequestResponse | InvocationType::DryRun => {
1009 match runtime.invoke(&func, payload).await {
1010 Ok(response_bytes) => {
1011 let mut resp = AwsResponse::json(StatusCode::OK, response_bytes);
1012 resp.headers.insert(
1013 http::header::HeaderName::from_static("x-amz-executed-version"),
1014 http::header::HeaderValue::from_static("$LATEST"),
1015 );
1016 Ok(resp)
1017 }
1018 Err(e) => {
1019 tracing::error!(function = %function_name, error = %e, "Lambda invocation failed");
1020 Err(AwsServiceError::aws_error(
1021 StatusCode::INTERNAL_SERVER_ERROR,
1022 "ServiceException",
1023 format!("Lambda execution failed: {e}"),
1024 ))
1025 }
1026 }
1027 }
1028 }
1029 }
1030
1031 fn lookup_destination_config(
1036 &self,
1037 func: &crate::state::LambdaFunction,
1038 account_id: &str,
1039 ) -> Option<serde_json::Value> {
1040 let accounts = self.state.read();
1041 let state = accounts.get(account_id)?;
1042 let key = format!("{}:$LATEST", func.function_name);
1043 state
1044 .event_invoke_configs
1045 .get(&key)
1046 .map(|cfg| cfg.destination_config.clone())
1047 .filter(|v| !v.is_null() && !v.as_object().map(|o| o.is_empty()).unwrap_or(false))
1048 }
1049
1050 fn publish_version(
1051 &self,
1052 function_name: &str,
1053 account_id: &str,
1054 ) -> Result<AwsResponse, AwsServiceError> {
1055 let accounts = self.state.read();
1056 let empty = LambdaState::new(account_id, "");
1057 let state = accounts.get(account_id).unwrap_or(&empty);
1058 let func = state.functions.get(function_name).ok_or_else(|| {
1059 AwsServiceError::aws_error(
1060 StatusCode::NOT_FOUND,
1061 "ResourceNotFoundException",
1062 format!(
1063 "Function not found: arn:aws:lambda:{}:{}:function:{}",
1064 state.region, state.account_id, function_name
1065 ),
1066 )
1067 })?;
1068
1069 let mut config = self.function_config_json(func);
1070 config["Version"] = json!("1");
1072 config["FunctionArn"] = json!(format!("{}:1", func.function_arn));
1073
1074 Ok(AwsResponse::json(StatusCode::CREATED, config.to_string()))
1075 }
1076
1077 fn create_event_source_mapping(
1078 &self,
1079 req: &AwsRequest,
1080 ) -> Result<AwsResponse, AwsServiceError> {
1081 let body: Value = serde_json::from_slice(&req.body).unwrap_or_default();
1082 let event_source_arn = body["EventSourceArn"]
1083 .as_str()
1084 .ok_or_else(|| {
1085 AwsServiceError::aws_error(
1086 StatusCode::BAD_REQUEST,
1087 "InvalidParameterValueException",
1088 "EventSourceArn is required",
1089 )
1090 })?
1091 .to_string();
1092
1093 let function_name = body["FunctionName"]
1094 .as_str()
1095 .ok_or_else(|| {
1096 AwsServiceError::aws_error(
1097 StatusCode::BAD_REQUEST,
1098 "InvalidParameterValueException",
1099 "FunctionName is required",
1100 )
1101 })?
1102 .to_string();
1103
1104 let mut accounts = self.state.write();
1105 let state = accounts.get_or_create(&req.account_id);
1106
1107 let function_arn = if function_name.starts_with("arn:") {
1109 function_name.clone()
1110 } else {
1111 let func = state.functions.get(&function_name).ok_or_else(|| {
1112 AwsServiceError::aws_error(
1113 StatusCode::NOT_FOUND,
1114 "ResourceNotFoundException",
1115 format!(
1116 "Function not found: arn:aws:lambda:{}:{}:function:{}",
1117 state.region, state.account_id, function_name
1118 ),
1119 )
1120 })?;
1121 func.function_arn.clone()
1122 };
1123
1124 let batch_size = body["BatchSize"].as_i64().unwrap_or(10);
1125 let enabled = body["Enabled"].as_bool().unwrap_or(true);
1126 let mapping_uuid = uuid::Uuid::new_v4().to_string();
1127 let now = Utc::now();
1128
1129 let filter_patterns: Vec<String> = match body.get("FilterCriteria") {
1137 None | Some(Value::Null) => Vec::new(),
1138 Some(Value::Object(_)) => {
1139 match body.get("FilterCriteria").and_then(|v| v.get("Filters")) {
1140 None => Vec::new(),
1141 Some(Value::Array(arr)) => {
1142 let mut out = Vec::with_capacity(arr.len());
1143 for f in arr {
1144 match f.get("Pattern") {
1145 Some(Value::String(s)) => out.push(s.clone()),
1146 _ => {
1147 return Err(AwsServiceError::aws_error(
1148 StatusCode::BAD_REQUEST,
1149 "InvalidParameterValueException",
1150 "FilterCriteria.Filters[].Pattern must be a string",
1151 ));
1152 }
1153 }
1154 }
1155 out
1156 }
1157 Some(_) => {
1158 return Err(AwsServiceError::aws_error(
1159 StatusCode::BAD_REQUEST,
1160 "InvalidParameterValueException",
1161 "FilterCriteria.Filters must be an array",
1162 ));
1163 }
1164 }
1165 }
1166 Some(_) => {
1167 return Err(AwsServiceError::aws_error(
1168 StatusCode::BAD_REQUEST,
1169 "InvalidParameterValueException",
1170 "FilterCriteria must be an object",
1171 ));
1172 }
1173 };
1174 if let Err(err) = crate::filter::FilterSet::validate(filter_patterns.iter()) {
1176 return Err(AwsServiceError::aws_error(
1177 StatusCode::BAD_REQUEST,
1178 "InvalidParameterValueException",
1179 err,
1180 ));
1181 }
1182 let function_response_types: Vec<String> = body
1183 .get("FunctionResponseTypes")
1184 .and_then(|v| v.as_array())
1185 .map(|arr| {
1186 arr.iter()
1187 .filter_map(|v| v.as_str().map(String::from))
1188 .collect()
1189 })
1190 .unwrap_or_default();
1191 let starting_position = body
1192 .get("StartingPosition")
1193 .and_then(|v| v.as_str())
1194 .map(String::from);
1195 let starting_position_timestamp = body
1196 .get("StartingPositionTimestamp")
1197 .and_then(|v| v.as_f64());
1198 let parallelization_factor = body.get("ParallelizationFactor").and_then(|v| v.as_i64());
1199 let maximum_batching_window_in_seconds = body
1200 .get("MaximumBatchingWindowInSeconds")
1201 .and_then(|v| v.as_i64());
1202
1203 let mapping = EventSourceMapping {
1204 uuid: mapping_uuid.clone(),
1205 function_arn: function_arn.clone(),
1206 event_source_arn: event_source_arn.clone(),
1207 batch_size,
1208 enabled,
1209 state: if enabled {
1210 "Enabled".to_string()
1211 } else {
1212 "Disabled".to_string()
1213 },
1214 last_modified: now,
1215 filter_patterns,
1216 maximum_batching_window_in_seconds,
1217 starting_position,
1218 starting_position_timestamp,
1219 parallelization_factor,
1220 function_response_types,
1221 };
1222
1223 let response = self.event_source_mapping_json(&mapping);
1224 state.event_source_mappings.insert(mapping_uuid, mapping);
1225
1226 Ok(AwsResponse::json(
1227 StatusCode::ACCEPTED,
1228 response.to_string(),
1229 ))
1230 }
1231
1232 fn list_event_source_mappings(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1233 let accounts = self.state.read();
1234 let empty = LambdaState::new(account_id, "");
1235 let state = accounts.get(account_id).unwrap_or(&empty);
1236 let mappings: Vec<Value> = state
1237 .event_source_mappings
1238 .values()
1239 .map(|m| self.event_source_mapping_json(m))
1240 .collect();
1241
1242 let response = json!({
1243 "EventSourceMappings": mappings,
1244 });
1245
1246 Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
1247 }
1248
1249 fn get_event_source_mapping(
1250 &self,
1251 uuid: &str,
1252 account_id: &str,
1253 ) -> Result<AwsResponse, AwsServiceError> {
1254 let accounts = self.state.read();
1255 let empty = LambdaState::new(account_id, "");
1256 let state = accounts.get(account_id).unwrap_or(&empty);
1257 let mapping = state.event_source_mappings.get(uuid).ok_or_else(|| {
1258 AwsServiceError::aws_error(
1259 StatusCode::NOT_FOUND,
1260 "ResourceNotFoundException",
1261 format!("The resource you requested does not exist. (Service: Lambda, Status Code: 404, Request ID: {uuid})"),
1262 )
1263 })?;
1264
1265 let response = self.event_source_mapping_json(mapping);
1266 Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
1267 }
1268
1269 fn delete_event_source_mapping(
1270 &self,
1271 uuid: &str,
1272 account_id: &str,
1273 ) -> Result<AwsResponse, AwsServiceError> {
1274 let mut accounts = self.state.write();
1275 let state = accounts.get_or_create(account_id);
1276 let mapping = state.event_source_mappings.remove(uuid).ok_or_else(|| {
1277 AwsServiceError::aws_error(
1278 StatusCode::NOT_FOUND,
1279 "ResourceNotFoundException",
1280 format!("The resource you requested does not exist. (Service: Lambda, Status Code: 404, Request ID: {uuid})"),
1281 )
1282 })?;
1283
1284 let mut response = self.event_source_mapping_json(&mapping);
1285 response["State"] = json!("Deleting");
1286 Ok(AwsResponse::json(
1287 StatusCode::ACCEPTED,
1288 response.to_string(),
1289 ))
1290 }
1291
1292 pub(crate) fn function_config_json(&self, func: &LambdaFunction) -> Value {
1293 let mut env_vars = json!({});
1294 if !func.environment.is_empty() {
1295 env_vars = json!({ "Variables": func.environment });
1296 }
1297
1298 let mut config = json!({
1299 "FunctionName": func.function_name,
1300 "FunctionArn": func.function_arn,
1301 "Runtime": func.runtime,
1302 "Role": func.role,
1303 "Handler": func.handler,
1304 "Description": func.description,
1305 "Timeout": func.timeout,
1306 "MemorySize": func.memory_size,
1307 "CodeSha256": func.code_sha256,
1308 "CodeSize": func.code_size,
1309 "Version": func.version,
1310 "LastModified": func.last_modified.format("%Y-%m-%dT%H:%M:%S%.3f+0000").to_string(),
1311 "PackageType": func.package_type,
1312 "Architectures": func.architectures,
1313 "Environment": env_vars,
1314 "State": "Active",
1315 "LastUpdateStatus": "Successful",
1316 "TracingConfig": { "Mode": "PassThrough" },
1317 "RevisionId": uuid::Uuid::new_v4().to_string(),
1318 });
1319 if let Some(ref uri) = func.image_uri {
1320 config["Code"] = json!({
1321 "ImageUri": uri,
1322 "ResolvedImageUri": uri,
1323 });
1324 }
1325 config
1326 }
1327
1328 fn event_source_mapping_json(&self, mapping: &EventSourceMapping) -> Value {
1329 let mut out = json!({
1330 "UUID": mapping.uuid,
1331 "FunctionArn": mapping.function_arn,
1332 "EventSourceArn": mapping.event_source_arn,
1333 "BatchSize": mapping.batch_size,
1334 "State": mapping.state,
1335 "LastModified": mapping.last_modified.timestamp_millis() as f64 / 1000.0,
1336 });
1337 let obj = out.as_object_mut().expect("json! built object");
1338 if !mapping.filter_patterns.is_empty() {
1339 obj.insert(
1340 "FilterCriteria".into(),
1341 json!({
1342 "Filters": mapping.filter_patterns.iter().map(|p| json!({"Pattern": p})).collect::<Vec<_>>(),
1343 }),
1344 );
1345 }
1346 if !mapping.function_response_types.is_empty() {
1347 obj.insert(
1348 "FunctionResponseTypes".into(),
1349 json!(mapping.function_response_types),
1350 );
1351 }
1352 if let Some(sp) = &mapping.starting_position {
1353 obj.insert("StartingPosition".into(), json!(sp));
1354 }
1355 if let Some(ts) = mapping.starting_position_timestamp {
1356 obj.insert("StartingPositionTimestamp".into(), json!(ts));
1357 }
1358 if let Some(pf) = mapping.parallelization_factor {
1359 obj.insert("ParallelizationFactor".into(), json!(pf));
1360 }
1361 if let Some(w) = mapping.maximum_batching_window_in_seconds {
1362 obj.insert("MaximumBatchingWindowInSeconds".into(), json!(w));
1363 }
1364 out
1365 }
1366
1367 fn add_permission(
1380 &self,
1381 function_name: &str,
1382 req: &AwsRequest,
1383 ) -> Result<AwsResponse, AwsServiceError> {
1384 let body: Value = serde_json::from_slice(&req.body).unwrap_or_default();
1385 let statement_id = body
1386 .get("StatementId")
1387 .and_then(|v| v.as_str())
1388 .ok_or_else(|| {
1389 AwsServiceError::aws_error(
1390 StatusCode::BAD_REQUEST,
1391 "InvalidParameterValueException",
1392 "StatementId is required",
1393 )
1394 })?
1395 .to_string();
1396 let action = body
1397 .get("Action")
1398 .and_then(|v| v.as_str())
1399 .ok_or_else(|| {
1400 AwsServiceError::aws_error(
1401 StatusCode::BAD_REQUEST,
1402 "InvalidParameterValueException",
1403 "Action is required",
1404 )
1405 })?
1406 .to_string();
1407 let principal_raw = body
1408 .get("Principal")
1409 .and_then(|v| v.as_str())
1410 .ok_or_else(|| {
1411 AwsServiceError::aws_error(
1412 StatusCode::BAD_REQUEST,
1413 "InvalidParameterValueException",
1414 "Principal is required",
1415 )
1416 })?
1417 .to_string();
1418 let source_arn = body
1419 .get("SourceArn")
1420 .and_then(|v| v.as_str())
1421 .map(str::to_string);
1422 let source_account = body
1423 .get("SourceAccount")
1424 .and_then(|v| v.as_str())
1425 .map(str::to_string);
1426
1427 let mut accounts = self.state.write();
1428 let state = accounts.get_or_create(&req.account_id);
1429 let func = state.functions.get_mut(function_name).ok_or_else(|| {
1430 AwsServiceError::aws_error(
1431 StatusCode::NOT_FOUND,
1432 "ResourceNotFoundException",
1433 format!("Function not found: {function_name}"),
1434 )
1435 })?;
1436
1437 let mut doc: Value = func
1445 .policy
1446 .as_deref()
1447 .and_then(|s| serde_json::from_str::<Value>(s).ok())
1448 .filter(|v| v.is_object())
1449 .unwrap_or_else(|| json!({"Version": "2012-10-17", "Statement": []}));
1450
1451 if !doc.get("Statement").map(|s| s.is_array()).unwrap_or(false) {
1453 doc["Statement"] = json!([]);
1454 }
1455 let statements = doc["Statement"].as_array_mut().unwrap();
1456
1457 if statements
1460 .iter()
1461 .any(|s| s.get("Sid").and_then(|v| v.as_str()) == Some(statement_id.as_str()))
1462 {
1463 return Err(AwsServiceError::aws_error(
1464 StatusCode::CONFLICT,
1465 "ResourceConflictException",
1466 format!("The statement id ({statement_id}) provided already exists"),
1467 ));
1468 }
1469
1470 let principal_value =
1477 if principal_raw.ends_with(".amazonaws.com") || principal_raw.contains(".amazon") {
1478 json!({ "Service": principal_raw })
1479 } else {
1480 json!({ "AWS": principal_raw })
1481 };
1482
1483 let mut condition = serde_json::Map::new();
1487 if let Some(arn) = source_arn.as_ref() {
1488 condition.insert("ArnLike".to_string(), json!({ "aws:SourceArn": arn }));
1489 }
1490 if let Some(acct) = source_account.as_ref() {
1491 condition.insert(
1492 "StringEquals".to_string(),
1493 json!({ "aws:SourceAccount": acct }),
1494 );
1495 }
1496
1497 let mut new_statement = serde_json::Map::new();
1498 new_statement.insert("Sid".to_string(), json!(statement_id));
1499 new_statement.insert("Effect".to_string(), json!("Allow"));
1500 new_statement.insert("Principal".to_string(), principal_value);
1501 new_statement.insert("Action".to_string(), json!(format!("lambda:{action}")));
1502 new_statement.insert("Resource".to_string(), json!(func.function_arn));
1503 if !condition.is_empty() {
1504 new_statement.insert("Condition".to_string(), Value::Object(condition));
1505 }
1506 let statement_json = Value::Object(new_statement);
1507 statements.push(statement_json.clone());
1508
1509 func.policy = Some(serde_json::to_string(&doc).unwrap());
1510
1511 Ok(AwsResponse::json(
1512 StatusCode::CREATED,
1513 json!({ "Statement": serde_json::to_string(&statement_json).unwrap() }).to_string(),
1514 ))
1515 }
1516
1517 fn remove_permission(
1518 &self,
1519 function_name: &str,
1520 statement_id: &str,
1521 account_id: &str,
1522 ) -> Result<AwsResponse, AwsServiceError> {
1523 let mut accounts = self.state.write();
1524 let state = accounts.get_or_create(account_id);
1525 let func = state.functions.get_mut(function_name).ok_or_else(|| {
1526 AwsServiceError::aws_error(
1527 StatusCode::NOT_FOUND,
1528 "ResourceNotFoundException",
1529 format!("Function not found: {function_name}"),
1530 )
1531 })?;
1532 let policy_str = func.policy.as_deref().ok_or_else(|| {
1533 AwsServiceError::aws_error(
1534 StatusCode::NOT_FOUND,
1535 "ResourceNotFoundException",
1536 format!("No policy is associated with function {function_name}"),
1537 )
1538 })?;
1539 let mut doc: Value = serde_json::from_str(policy_str).map_err(|_| {
1540 AwsServiceError::aws_error(
1541 StatusCode::INTERNAL_SERVER_ERROR,
1542 "InternalError",
1543 "stored resource policy is not valid JSON",
1544 )
1545 })?;
1546 let statements = doc
1547 .get_mut("Statement")
1548 .and_then(|s| s.as_array_mut())
1549 .ok_or_else(|| {
1550 AwsServiceError::aws_error(
1551 StatusCode::INTERNAL_SERVER_ERROR,
1552 "InternalError",
1553 "stored resource policy has no Statement array",
1554 )
1555 })?;
1556 let before = statements.len();
1557 statements.retain(|s| s.get("Sid").and_then(|v| v.as_str()) != Some(statement_id));
1558 if statements.len() == before {
1559 return Err(AwsServiceError::aws_error(
1560 StatusCode::NOT_FOUND,
1561 "ResourceNotFoundException",
1562 format!("Statement {statement_id} is not found in resource policy"),
1563 ));
1564 }
1565 func.policy = Some(serde_json::to_string(&doc).unwrap());
1569 Ok(AwsResponse::json(StatusCode::NO_CONTENT, String::new()))
1570 }
1571
1572 fn get_policy(
1573 &self,
1574 function_name: &str,
1575 account_id: &str,
1576 ) -> Result<AwsResponse, AwsServiceError> {
1577 let accounts = self.state.read();
1578 let empty = LambdaState::new(account_id, "");
1579 let state = accounts.get(account_id).unwrap_or(&empty);
1580 let func = state.functions.get(function_name).ok_or_else(|| {
1581 AwsServiceError::aws_error(
1582 StatusCode::NOT_FOUND,
1583 "ResourceNotFoundException",
1584 format!("Function not found: {function_name}"),
1585 )
1586 })?;
1587 let policy = func.policy.as_deref().ok_or_else(|| {
1588 AwsServiceError::aws_error(
1589 StatusCode::NOT_FOUND,
1590 "ResourceNotFoundException",
1591 format!("No policy is associated with function {function_name}"),
1592 )
1593 })?;
1594 Ok(AwsResponse::json(
1595 StatusCode::OK,
1596 json!({
1597 "Policy": policy,
1598 "RevisionId": uuid::Uuid::new_v4().to_string(),
1599 })
1600 .to_string(),
1601 ))
1602 }
1603}
1604
1605#[async_trait]
1606impl AwsService for LambdaService {
1607 fn service_name(&self) -> &str {
1608 "lambda"
1609 }
1610
1611 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1612 let (action, resource_name) = Self::resolve_action(&req).ok_or_else(|| {
1613 AwsServiceError::aws_error(
1614 StatusCode::NOT_FOUND,
1615 "UnknownOperationException",
1616 format!("Unknown operation: {} {}", req.method, req.raw_path),
1617 )
1618 })?;
1619
1620 let mutates = matches!(
1621 action,
1622 "CreateFunction"
1623 | "DeleteFunction"
1624 | "PublishVersion"
1625 | "AddPermission"
1626 | "RemovePermission"
1627 | "CreateEventSourceMapping"
1628 | "DeleteEventSourceMapping"
1629 | "UpdateEventSourceMapping"
1630 | "UpdateFunctionCode"
1631 | "UpdateFunctionConfiguration"
1632 | "CreateAlias"
1633 | "DeleteAlias"
1634 | "UpdateAlias"
1635 | "PublishLayerVersion"
1636 | "DeleteLayerVersion"
1637 | "AddLayerVersionPermission"
1638 | "RemoveLayerVersionPermission"
1639 | "CreateFunctionUrlConfig"
1640 | "DeleteFunctionUrlConfig"
1641 | "UpdateFunctionUrlConfig"
1642 | "PutFunctionConcurrency"
1643 | "DeleteFunctionConcurrency"
1644 | "PutProvisionedConcurrencyConfig"
1645 | "DeleteProvisionedConcurrencyConfig"
1646 | "CreateCodeSigningConfig"
1647 | "UpdateCodeSigningConfig"
1648 | "DeleteCodeSigningConfig"
1649 | "PutFunctionCodeSigningConfig"
1650 | "DeleteFunctionCodeSigningConfig"
1651 | "PutFunctionEventInvokeConfig"
1652 | "UpdateFunctionEventInvokeConfig"
1653 | "DeleteFunctionEventInvokeConfig"
1654 | "PutRuntimeManagementConfig"
1655 | "PutFunctionScalingConfig"
1656 | "PutFunctionRecursionConfig"
1657 | "TagResource"
1658 | "UntagResource"
1659 | "CreateCapacityProvider"
1660 | "UpdateCapacityProvider"
1661 | "DeleteCapacityProvider"
1662 | "CheckpointDurableExecution"
1663 | "StopDurableExecution"
1664 | "SendDurableExecutionCallbackSuccess"
1665 | "SendDurableExecutionCallbackFailure"
1666 | "SendDurableExecutionCallbackHeartbeat"
1667 | "InvokeAsync"
1668 | "InvokeWithResponseStream"
1669 );
1670
1671 let aid = &req.account_id;
1672 let result = match action {
1673 "CreateFunction" => self.create_function(&req),
1674 "ListFunctions" => self.list_functions(aid),
1675 "GetFunction" => self.get_function(
1676 resource_name.as_deref().unwrap_or(""),
1677 aid,
1678 req.region.as_str(),
1679 ),
1680 "DeleteFunction" => self.delete_function(resource_name.as_deref().unwrap_or(""), aid),
1681 "Invoke" => {
1682 let invocation_type = InvocationType::from_header(
1683 req.headers
1684 .get("x-amz-invocation-type")
1685 .and_then(|v| v.to_str().ok()),
1686 );
1687 self.invoke(
1688 resource_name.as_deref().unwrap_or(""),
1689 &req.body,
1690 aid,
1691 invocation_type,
1692 )
1693 .await
1694 }
1695 "InvokeAsync" => {
1696 self.invoke(
1697 resource_name.as_deref().unwrap_or(""),
1698 &req.body,
1699 aid,
1700 InvocationType::Event,
1701 )
1702 .await
1703 }
1704 "PublishVersion" => self.publish_version(resource_name.as_deref().unwrap_or(""), aid),
1705 "AddPermission" => self.add_permission(resource_name.as_deref().unwrap_or(""), &req),
1706 "GetPolicy" => self.get_policy(resource_name.as_deref().unwrap_or(""), aid),
1707 "RemovePermission" => {
1708 let sid = req.path_segments.get(4).cloned().unwrap_or_default();
1710 self.remove_permission(resource_name.as_deref().unwrap_or(""), &sid, aid)
1711 }
1712 "CreateEventSourceMapping" => self.create_event_source_mapping(&req),
1713 "ListEventSourceMappings" => self.list_event_source_mappings(aid),
1714 "GetEventSourceMapping" => {
1715 self.get_event_source_mapping(resource_name.as_deref().unwrap_or(""), aid)
1716 }
1717 "DeleteEventSourceMapping" => {
1718 self.delete_event_source_mapping(resource_name.as_deref().unwrap_or(""), aid)
1719 }
1720 other => {
1721 self.handle_extra(other, resource_name.as_deref(), &req)
1722 .await
1723 }
1724 };
1725 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
1726 self.save_snapshot().await;
1727 }
1728 result
1729 }
1730
1731 fn supported_actions(&self) -> &[&str] {
1732 &[
1733 "CreateFunction",
1734 "GetFunction",
1735 "DeleteFunction",
1736 "ListFunctions",
1737 "Invoke",
1738 "InvokeAsync",
1739 "InvokeWithResponseStream",
1740 "PublishVersion",
1741 "ListVersionsByFunction",
1742 "AddPermission",
1743 "RemovePermission",
1744 "GetPolicy",
1745 "CreateEventSourceMapping",
1746 "ListEventSourceMappings",
1747 "GetEventSourceMapping",
1748 "UpdateEventSourceMapping",
1749 "DeleteEventSourceMapping",
1750 "GetFunctionConfiguration",
1751 "UpdateFunctionConfiguration",
1752 "UpdateFunctionCode",
1753 "GetAccountSettings",
1754 "CreateAlias",
1755 "GetAlias",
1756 "ListAliases",
1757 "UpdateAlias",
1758 "DeleteAlias",
1759 "PublishLayerVersion",
1760 "GetLayerVersion",
1761 "GetLayerVersionByArn",
1762 "DeleteLayerVersion",
1763 "ListLayerVersions",
1764 "ListLayers",
1765 "GetLayerVersionPolicy",
1766 "AddLayerVersionPermission",
1767 "RemoveLayerVersionPermission",
1768 "CreateFunctionUrlConfig",
1769 "GetFunctionUrlConfig",
1770 "UpdateFunctionUrlConfig",
1771 "DeleteFunctionUrlConfig",
1772 "ListFunctionUrlConfigs",
1773 "PutFunctionConcurrency",
1774 "GetFunctionConcurrency",
1775 "DeleteFunctionConcurrency",
1776 "PutProvisionedConcurrencyConfig",
1777 "GetProvisionedConcurrencyConfig",
1778 "DeleteProvisionedConcurrencyConfig",
1779 "ListProvisionedConcurrencyConfigs",
1780 "CreateCodeSigningConfig",
1781 "GetCodeSigningConfig",
1782 "UpdateCodeSigningConfig",
1783 "DeleteCodeSigningConfig",
1784 "ListCodeSigningConfigs",
1785 "PutFunctionCodeSigningConfig",
1786 "GetFunctionCodeSigningConfig",
1787 "DeleteFunctionCodeSigningConfig",
1788 "ListFunctionsByCodeSigningConfig",
1789 "PutFunctionEventInvokeConfig",
1790 "GetFunctionEventInvokeConfig",
1791 "UpdateFunctionEventInvokeConfig",
1792 "DeleteFunctionEventInvokeConfig",
1793 "ListFunctionEventInvokeConfigs",
1794 "PutRuntimeManagementConfig",
1795 "GetRuntimeManagementConfig",
1796 "PutFunctionScalingConfig",
1797 "GetFunctionScalingConfig",
1798 "PutFunctionRecursionConfig",
1799 "GetFunctionRecursionConfig",
1800 "TagResource",
1801 "UntagResource",
1802 "ListTags",
1803 "CreateCapacityProvider",
1804 "GetCapacityProvider",
1805 "UpdateCapacityProvider",
1806 "DeleteCapacityProvider",
1807 "ListCapacityProviders",
1808 "ListFunctionVersionsByCapacityProvider",
1809 "CheckpointDurableExecution",
1810 "GetDurableExecution",
1811 "GetDurableExecutionHistory",
1812 "GetDurableExecutionState",
1813 "ListDurableExecutionsByFunction",
1814 "StopDurableExecution",
1815 "SendDurableExecutionCallbackSuccess",
1816 "SendDurableExecutionCallbackFailure",
1817 "SendDurableExecutionCallbackHeartbeat",
1818 ]
1819 }
1820
1821 fn iam_enforceable(&self) -> bool {
1822 true
1823 }
1824
1825 fn iam_action_for(&self, request: &AwsRequest) -> Option<fakecloud_core::auth::IamAction> {
1829 let (action_str, resource_name) = Self::resolve_action(request)?;
1834 let action: &'static str = match action_str {
1835 "CreateFunction" => "CreateFunction",
1836 "ListFunctions" => "ListFunctions",
1837 "GetFunction" => "GetFunction",
1838 "DeleteFunction" => "DeleteFunction",
1839 "Invoke" => "InvokeFunction",
1840 "PublishVersion" => "PublishVersion",
1841 "AddPermission" => "AddPermission",
1842 "RemovePermission" => "RemovePermission",
1843 "GetPolicy" => "GetPolicy",
1844 "CreateEventSourceMapping" => "CreateEventSourceMapping",
1845 "ListEventSourceMappings" => "ListEventSourceMappings",
1846 "GetEventSourceMapping" => "GetEventSourceMapping",
1847 "DeleteEventSourceMapping" => "DeleteEventSourceMapping",
1848 _ => return None,
1849 };
1850 let accounts = self.state.read();
1851 let empty = LambdaState::new(&request.account_id, &request.region);
1852 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1853 let resource = match action {
1854 "GetFunction" | "DeleteFunction" | "InvokeFunction" | "PublishVersion"
1855 | "AddPermission" | "RemovePermission" | "GetPolicy" => {
1856 let name = resource_name.unwrap_or_default();
1857 if name.is_empty() {
1858 "*".to_string()
1859 } else {
1860 format!(
1861 "arn:aws:lambda:{}:{}:function:{}",
1862 state.region, state.account_id, name
1863 )
1864 }
1865 }
1866 "CreateFunction" => {
1867 serde_json::from_slice::<Value>(&request.body)
1872 .ok()
1873 .and_then(|v| {
1874 v.get("FunctionName").and_then(|f| f.as_str()).map(|n| {
1875 format!(
1876 "arn:aws:lambda:{}:{}:function:{}",
1877 state.region, state.account_id, n
1878 )
1879 })
1880 })
1881 .unwrap_or_else(|| "*".to_string())
1882 }
1883 _ => "*".to_string(),
1884 };
1885 Some(fakecloud_core::auth::IamAction {
1886 service: "lambda",
1887 action,
1888 resource,
1889 })
1890 }
1891
1892 fn iam_condition_keys_for(
1893 &self,
1894 request: &AwsRequest,
1895 action: &fakecloud_core::auth::IamAction,
1896 ) -> std::collections::BTreeMap<String, Vec<String>> {
1897 let mut out = std::collections::BTreeMap::new();
1898 if action.action == "AddPermission" {
1899 if action.resource != "*" {
1900 out.insert(
1901 "lambda:functionarn".to_string(),
1902 vec![action.resource.clone()],
1903 );
1904 }
1905 if let Ok(body) = serde_json::from_slice::<Value>(&request.body) {
1906 if let Some(principal) = body.get("Principal").and_then(|p| p.as_str()) {
1907 out.insert("lambda:principal".to_string(), vec![principal.to_string()]);
1908 }
1909 }
1910 }
1911 out
1912 }
1913}
1914
1915#[cfg(test)]
1916mod tests {
1917 use super::*;
1918 use bytes::Bytes;
1919 use http::{HeaderMap, Method};
1920 use parking_lot::RwLock;
1921 use std::collections::HashMap;
1922 use std::sync::Arc;
1923
1924 fn make_state() -> SharedLambdaState {
1925 Arc::new(RwLock::new(
1926 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
1927 ))
1928 }
1929
1930 fn make_request(method: Method, path: &str, body: &str) -> AwsRequest {
1931 let path_segments: Vec<String> = path
1932 .split('/')
1933 .filter(|s| !s.is_empty())
1934 .map(|s| s.to_string())
1935 .collect();
1936 AwsRequest {
1937 service: "lambda".to_string(),
1938 action: String::new(),
1939 region: "us-east-1".to_string(),
1940 account_id: "123456789012".to_string(),
1941 request_id: "test-request-id".to_string(),
1942 headers: HeaderMap::new(),
1943 query_params: HashMap::new(),
1944 body: Bytes::from(body.to_string()),
1945 body_stream: parking_lot::Mutex::new(None),
1946 path_segments,
1947 raw_path: path.to_string(),
1948 raw_query: String::new(),
1949 method,
1950 is_query_protocol: false,
1951 access_key_id: None,
1952 principal: None,
1953 }
1954 }
1955
1956 #[test]
1957 fn iam_condition_keys_for_add_permission_populates_arn_and_principal() {
1958 let svc = LambdaService::new(make_state());
1959 let body = json!({
1960 "StatementId": "stmt",
1961 "Action": "lambda:InvokeFunction",
1962 "Principal": "s3.amazonaws.com",
1963 })
1964 .to_string();
1965 let req = make_request(Method::POST, "/2015-03-31/functions/my-func/policy", &body);
1966 let action = fakecloud_core::auth::IamAction {
1967 service: "lambda",
1968 action: "AddPermission",
1969 resource: "arn:aws:lambda:us-east-1:123456789012:function:my-func".to_string(),
1970 };
1971 let keys = svc.iam_condition_keys_for(&req, &action);
1972 assert_eq!(
1973 keys.get("lambda:functionarn"),
1974 Some(&vec![
1975 "arn:aws:lambda:us-east-1:123456789012:function:my-func".to_string()
1976 ])
1977 );
1978 assert_eq!(
1979 keys.get("lambda:principal"),
1980 Some(&vec!["s3.amazonaws.com".to_string()])
1981 );
1982 }
1983
1984 #[test]
1985 fn iam_condition_keys_for_add_permission_omits_missing_principal() {
1986 let svc = LambdaService::new(make_state());
1987 let body = json!({"StatementId": "stmt", "Action": "lambda:InvokeFunction"}).to_string();
1988 let req = make_request(Method::POST, "/2015-03-31/functions/my-func/policy", &body);
1989 let action = fakecloud_core::auth::IamAction {
1990 service: "lambda",
1991 action: "AddPermission",
1992 resource: "arn:aws:lambda:us-east-1:123456789012:function:my-func".to_string(),
1993 };
1994 let keys = svc.iam_condition_keys_for(&req, &action);
1995 assert!(!keys.contains_key("lambda:principal"));
1996 assert!(keys.contains_key("lambda:functionarn"));
1997 }
1998
1999 #[test]
2000 fn iam_condition_keys_for_non_add_permission_is_empty() {
2001 let svc = LambdaService::new(make_state());
2002 let req = make_request(Method::GET, "/2015-03-31/functions/my-func", "");
2003 let action = fakecloud_core::auth::IamAction {
2004 service: "lambda",
2005 action: "GetFunction",
2006 resource: "arn:aws:lambda:us-east-1:123456789012:function:my-func".to_string(),
2007 };
2008 assert!(svc.iam_condition_keys_for(&req, &action).is_empty());
2009 }
2010
2011 #[tokio::test]
2012 async fn test_create_and_get_function() {
2013 let state = make_state();
2014 let svc = LambdaService::new(state);
2015
2016 let create_body = json!({
2017 "FunctionName": "my-func",
2018 "Runtime": "python3.12",
2019 "Role": "arn:aws:iam::123456789012:role/test-role",
2020 "Handler": "index.handler",
2021 "Code": { "ZipFile": "UEsFBgAAAAAAAAAAAAAAAAAAAAA=" }
2022 });
2023
2024 let req = make_request(
2025 Method::POST,
2026 "/2015-03-31/functions",
2027 &create_body.to_string(),
2028 );
2029 let resp = svc.handle(req).await.unwrap();
2030 assert_eq!(resp.status, StatusCode::CREATED);
2031
2032 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2033 assert_eq!(body["FunctionName"], "my-func");
2034 assert_eq!(body["Runtime"], "python3.12");
2035
2036 let req = make_request(Method::GET, "/2015-03-31/functions/my-func", "");
2038 let resp = svc.handle(req).await.unwrap();
2039 assert_eq!(resp.status, StatusCode::OK);
2040 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2041 assert_eq!(body["Configuration"]["FunctionName"], "my-func");
2042 }
2043
2044 #[tokio::test]
2045 async fn test_delete_function() {
2046 let state = make_state();
2047 let svc = LambdaService::new(state);
2048
2049 let create_body = json!({
2050 "FunctionName": "to-delete",
2051 "Runtime": "nodejs20.x",
2052 "Role": "arn:aws:iam::123456789012:role/test",
2053 "Handler": "index.handler",
2054 "Code": {}
2055 });
2056
2057 let req = make_request(
2058 Method::POST,
2059 "/2015-03-31/functions",
2060 &create_body.to_string(),
2061 );
2062 svc.handle(req).await.unwrap();
2063
2064 let req = make_request(Method::DELETE, "/2015-03-31/functions/to-delete", "");
2065 let resp = svc.handle(req).await.unwrap();
2066 assert_eq!(resp.status, StatusCode::NO_CONTENT);
2067
2068 let req = make_request(Method::GET, "/2015-03-31/functions/to-delete", "");
2070 let resp = svc.handle(req).await;
2071 assert!(resp.is_err());
2072 }
2073
2074 #[tokio::test]
2075 async fn test_invoke_without_runtime_returns_error() {
2076 let state = make_state();
2077 let svc = LambdaService::new(state);
2078
2079 let create_body = json!({
2080 "FunctionName": "invoke-me",
2081 "Runtime": "python3.12",
2082 "Role": "arn:aws:iam::123456789012:role/test",
2083 "Handler": "index.handler",
2084 "Code": {}
2085 });
2086
2087 let req = make_request(
2088 Method::POST,
2089 "/2015-03-31/functions",
2090 &create_body.to_string(),
2091 );
2092 svc.handle(req).await.unwrap();
2093
2094 let req = make_request(
2095 Method::POST,
2096 "/2015-03-31/functions/invoke-me/invocations",
2097 r#"{"key": "value"}"#,
2098 );
2099 let resp = svc.handle(req).await;
2100 assert!(resp.is_err());
2101 }
2102
2103 #[tokio::test]
2104 async fn test_invoke_nonexistent_function() {
2105 let state = make_state();
2106 let svc = LambdaService::new(state);
2107
2108 let req = make_request(
2109 Method::POST,
2110 "/2015-03-31/functions/does-not-exist/invocations",
2111 "{}",
2112 );
2113 let resp = svc.handle(req).await;
2114 assert!(resp.is_err());
2115 }
2116
2117 #[tokio::test]
2118 async fn test_list_functions() {
2119 let state = make_state();
2120 let svc = LambdaService::new(state);
2121
2122 for name in &["func-a", "func-b"] {
2123 let create_body = json!({
2124 "FunctionName": name,
2125 "Runtime": "python3.12",
2126 "Role": "arn:aws:iam::123456789012:role/test",
2127 "Handler": "index.handler",
2128 "Code": {}
2129 });
2130 let req = make_request(
2131 Method::POST,
2132 "/2015-03-31/functions",
2133 &create_body.to_string(),
2134 );
2135 svc.handle(req).await.unwrap();
2136 }
2137
2138 let req = make_request(Method::GET, "/2015-03-31/functions", "");
2139 let resp = svc.handle(req).await.unwrap();
2140 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2141 assert_eq!(body["Functions"].as_array().unwrap().len(), 2);
2142 }
2143
2144 #[tokio::test]
2145 async fn test_event_source_mapping() {
2146 let state = make_state();
2147 let svc = LambdaService::new(state);
2148
2149 let create_body = json!({
2151 "FunctionName": "esm-func",
2152 "Runtime": "python3.12",
2153 "Role": "arn:aws:iam::123456789012:role/test",
2154 "Handler": "index.handler",
2155 "Code": {}
2156 });
2157 let req = make_request(
2158 Method::POST,
2159 "/2015-03-31/functions",
2160 &create_body.to_string(),
2161 );
2162 svc.handle(req).await.unwrap();
2163
2164 let mapping_body = json!({
2166 "FunctionName": "esm-func",
2167 "EventSourceArn": "arn:aws:sqs:us-east-1:123456789012:my-queue",
2168 "BatchSize": 5
2169 });
2170 let req = make_request(
2171 Method::POST,
2172 "/2015-03-31/event-source-mappings",
2173 &mapping_body.to_string(),
2174 );
2175 let resp = svc.handle(req).await.unwrap();
2176 assert_eq!(resp.status, StatusCode::ACCEPTED);
2177 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2178 let uuid = body["UUID"].as_str().unwrap().to_string();
2179
2180 let req = make_request(Method::GET, "/2015-03-31/event-source-mappings", "");
2182 let resp = svc.handle(req).await.unwrap();
2183 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2184 assert_eq!(body["EventSourceMappings"].as_array().unwrap().len(), 1);
2185
2186 let req = make_request(
2188 Method::DELETE,
2189 &format!("/2015-03-31/event-source-mappings/{uuid}"),
2190 "",
2191 );
2192 let resp = svc.handle(req).await.unwrap();
2193 assert_eq!(resp.status, StatusCode::ACCEPTED);
2194 }
2195
2196 async fn seed_function(svc: &LambdaService, name: &str) {
2197 let body = json!({
2198 "FunctionName": name,
2199 "Runtime": "python3.12",
2200 "Role": "arn:aws:iam::123456789012:role/r",
2201 "Handler": "index.handler",
2202 "Code": {}
2203 });
2204 let req = make_request(Method::POST, "/2015-03-31/functions", &body.to_string());
2205 svc.handle(req).await.unwrap();
2206 }
2207
2208 #[tokio::test]
2209 async fn add_permission_builds_canonical_statement() {
2210 let svc = LambdaService::new(make_state());
2211 seed_function(&svc, "f").await;
2212
2213 let body = json!({
2214 "StatementId": "s3-invoke",
2215 "Action": "InvokeFunction",
2216 "Principal": "s3.amazonaws.com",
2217 "SourceArn": "arn:aws:s3:::my-bucket",
2218 "SourceAccount": "123456789012",
2219 });
2220 let req = make_request(
2221 Method::POST,
2222 "/2015-03-31/functions/f/policy",
2223 &body.to_string(),
2224 );
2225 let resp = svc.handle(req).await.unwrap();
2226 assert_eq!(resp.status, StatusCode::CREATED);
2227
2228 let out: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2229 let statement: Value = serde_json::from_str(out["Statement"].as_str().unwrap()).unwrap();
2230 assert_eq!(statement["Sid"], "s3-invoke");
2231 assert_eq!(statement["Effect"], "Allow");
2232 assert_eq!(statement["Principal"]["Service"], "s3.amazonaws.com");
2233 assert_eq!(statement["Action"], "lambda:InvokeFunction");
2234 assert_eq!(
2235 statement["Resource"],
2236 "arn:aws:lambda:us-east-1:123456789012:function:f"
2237 );
2238 assert_eq!(
2239 statement["Condition"]["ArnLike"]["aws:SourceArn"],
2240 "arn:aws:s3:::my-bucket"
2241 );
2242 assert_eq!(
2243 statement["Condition"]["StringEquals"]["aws:SourceAccount"],
2244 "123456789012"
2245 );
2246 }
2247
2248 #[tokio::test]
2249 async fn add_permission_aws_principal_emits_aws_key() {
2250 let svc = LambdaService::new(make_state());
2251 seed_function(&svc, "f").await;
2252
2253 let body = json!({
2254 "StatementId": "user-invoke",
2255 "Action": "InvokeFunction",
2256 "Principal": "arn:aws:iam::123456789012:user/alice",
2257 });
2258 let req = make_request(
2259 Method::POST,
2260 "/2015-03-31/functions/f/policy",
2261 &body.to_string(),
2262 );
2263 svc.handle(req).await.unwrap();
2264
2265 let req = make_request(Method::GET, "/2015-03-31/functions/f/policy", "");
2267 let resp = svc.handle(req).await.unwrap();
2268 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2269 let doc: Value = serde_json::from_str(body["Policy"].as_str().unwrap()).unwrap();
2270 let statements = doc["Statement"].as_array().unwrap();
2271 assert_eq!(statements.len(), 1);
2272 assert_eq!(
2273 statements[0]["Principal"]["AWS"],
2274 "arn:aws:iam::123456789012:user/alice"
2275 );
2276 assert!(statements[0].get("Condition").is_none());
2277 }
2278
2279 #[tokio::test]
2280 async fn add_permission_rejects_duplicate_statement_id() {
2281 let svc = LambdaService::new(make_state());
2282 seed_function(&svc, "f").await;
2283
2284 let body = json!({
2285 "StatementId": "dup",
2286 "Action": "InvokeFunction",
2287 "Principal": "arn:aws:iam::123456789012:user/a",
2288 });
2289 let req = make_request(
2290 Method::POST,
2291 "/2015-03-31/functions/f/policy",
2292 &body.to_string(),
2293 );
2294 svc.handle(req).await.unwrap();
2295
2296 let req = make_request(
2297 Method::POST,
2298 "/2015-03-31/functions/f/policy",
2299 &body.to_string(),
2300 );
2301 let err = match svc.handle(req).await {
2302 Err(e) => e,
2303 Ok(_) => panic!("expected error"),
2304 };
2305 assert_eq!(err.status(), StatusCode::CONFLICT);
2306 }
2307
2308 #[tokio::test]
2309 async fn get_policy_returns_404_when_no_policy_attached() {
2310 let svc = LambdaService::new(make_state());
2311 seed_function(&svc, "f").await;
2312
2313 let req = make_request(Method::GET, "/2015-03-31/functions/f/policy", "");
2314 let err = match svc.handle(req).await {
2315 Err(e) => e,
2316 Ok(_) => panic!("expected error"),
2317 };
2318 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2319 }
2320
2321 #[tokio::test]
2322 async fn remove_permission_strips_matching_sid_and_leaves_empty_doc() {
2323 let svc = LambdaService::new(make_state());
2324 seed_function(&svc, "f").await;
2325
2326 for sid in ["a", "b"] {
2327 let body = json!({
2328 "StatementId": sid,
2329 "Action": "InvokeFunction",
2330 "Principal": "arn:aws:iam::123456789012:user/u",
2331 });
2332 let req = make_request(
2333 Method::POST,
2334 "/2015-03-31/functions/f/policy",
2335 &body.to_string(),
2336 );
2337 svc.handle(req).await.unwrap();
2338 }
2339
2340 let req = make_request(Method::DELETE, "/2015-03-31/functions/f/policy/a", "");
2342 let resp = svc.handle(req).await.unwrap();
2343 assert_eq!(resp.status, StatusCode::NO_CONTENT);
2344
2345 let req = make_request(Method::GET, "/2015-03-31/functions/f/policy", "");
2347 let resp = svc.handle(req).await.unwrap();
2348 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2349 let doc: Value = serde_json::from_str(body["Policy"].as_str().unwrap()).unwrap();
2350 let stmts = doc["Statement"].as_array().unwrap();
2351 assert_eq!(stmts.len(), 1);
2352 assert_eq!(stmts[0]["Sid"], "b");
2353
2354 let req = make_request(Method::DELETE, "/2015-03-31/functions/f/policy/b", "");
2356 svc.handle(req).await.unwrap();
2357
2358 let req = make_request(Method::GET, "/2015-03-31/functions/f/policy", "");
2359 let resp = svc.handle(req).await.unwrap();
2360 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2361 let doc: Value = serde_json::from_str(body["Policy"].as_str().unwrap()).unwrap();
2362 assert_eq!(doc["Statement"].as_array().unwrap().len(), 0);
2363 }
2364
2365 #[tokio::test]
2366 async fn remove_permission_unknown_sid_is_404() {
2367 let svc = LambdaService::new(make_state());
2368 seed_function(&svc, "f").await;
2369
2370 let body = json!({
2371 "StatementId": "known",
2372 "Action": "InvokeFunction",
2373 "Principal": "arn:aws:iam::123456789012:user/u",
2374 });
2375 let req = make_request(
2376 Method::POST,
2377 "/2015-03-31/functions/f/policy",
2378 &body.to_string(),
2379 );
2380 svc.handle(req).await.unwrap();
2381
2382 let req = make_request(Method::DELETE, "/2015-03-31/functions/f/policy/other", "");
2383 let err = match svc.handle(req).await {
2384 Err(e) => e,
2385 Ok(_) => panic!("expected error"),
2386 };
2387 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2388 }
2389
2390 #[tokio::test]
2391 async fn add_permission_on_missing_function_is_404() {
2392 let svc = LambdaService::new(make_state());
2393 let body = json!({
2394 "StatementId": "s",
2395 "Action": "InvokeFunction",
2396 "Principal": "arn:aws:iam::123456789012:user/u",
2397 });
2398 let req = make_request(
2399 Method::POST,
2400 "/2015-03-31/functions/missing/policy",
2401 &body.to_string(),
2402 );
2403 let err = match svc.handle(req).await {
2404 Err(e) => e,
2405 Ok(_) => panic!("expected error"),
2406 };
2407 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2408 }
2409
2410 #[test]
2411 fn iam_action_for_maps_invoke_to_function_arn() {
2412 let svc = LambdaService::new(make_state());
2413 let req = make_request(Method::POST, "/2015-03-31/functions/f/invocations", "");
2414 let action = svc.iam_action_for(&req).unwrap();
2415 assert_eq!(action.service, "lambda");
2416 assert_eq!(action.action, "InvokeFunction");
2417 assert_eq!(
2418 action.resource,
2419 "arn:aws:lambda:us-east-1:123456789012:function:f"
2420 );
2421 }
2422
2423 #[test]
2424 fn iam_action_for_maps_list_to_star() {
2425 let svc = LambdaService::new(make_state());
2426 let req = make_request(Method::GET, "/2015-03-31/functions", "");
2427 let action = svc.iam_action_for(&req).unwrap();
2428 assert_eq!(action.action, "ListFunctions");
2429 assert_eq!(action.resource, "*");
2430 }
2431
2432 #[test]
2433 fn iam_action_for_create_reads_function_name_from_body() {
2434 let svc = LambdaService::new(make_state());
2435 let body = json!({
2436 "FunctionName": "newfn",
2437 "Runtime": "python3.12",
2438 "Role": "arn:aws:iam::123456789012:role/r",
2439 "Handler": "index.handler",
2440 "Code": {}
2441 });
2442 let req = make_request(Method::POST, "/2015-03-31/functions", &body.to_string());
2443 let action = svc.iam_action_for(&req).unwrap();
2444 assert_eq!(action.action, "CreateFunction");
2445 assert_eq!(
2446 action.resource,
2447 "arn:aws:lambda:us-east-1:123456789012:function:newfn"
2448 );
2449 }
2450
2451 #[tokio::test]
2454 async fn create_function_duplicate_returns_conflict() {
2455 let svc = LambdaService::new(make_state());
2456 seed_function(&svc, "dup-fn").await;
2457
2458 let body = json!({
2459 "FunctionName": "dup-fn",
2460 "Runtime": "python3.12",
2461 "Role": "arn:aws:iam::123456789012:role/r",
2462 "Handler": "index.handler",
2463 "Code": {"ZipFile": "UEsDBBQ="},
2464 });
2465 let req = make_request(Method::POST, "/2015-03-31/functions", &body.to_string());
2466 let err = match svc.handle(req).await {
2467 Err(e) => e,
2468 Ok(_) => panic!("expected ResourceConflictException"),
2469 };
2470 assert_eq!(err.status(), StatusCode::CONFLICT);
2471 }
2472
2473 #[tokio::test]
2474 async fn get_function_not_found() {
2475 let svc = LambdaService::new(make_state());
2476 let req = make_request(Method::GET, "/2015-03-31/functions/nope", "");
2477 let err = match svc.handle(req).await {
2478 Err(e) => e,
2479 Ok(_) => panic!("expected error"),
2480 };
2481 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2482 }
2483
2484 #[tokio::test]
2485 async fn delete_function_not_found() {
2486 let svc = LambdaService::new(make_state());
2487 let req = make_request(Method::DELETE, "/2015-03-31/functions/nope", "");
2488 let err = match svc.handle(req).await {
2489 Err(e) => e,
2490 Ok(_) => panic!("expected error"),
2491 };
2492 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2493 }
2494
2495 #[tokio::test]
2496 async fn get_event_source_mapping_not_found() {
2497 let svc = LambdaService::new(make_state());
2498 let req = make_request(
2499 Method::GET,
2500 "/2015-03-31/event-source-mappings/nonexistent",
2501 "",
2502 );
2503 let err = match svc.handle(req).await {
2504 Err(e) => e,
2505 Ok(_) => panic!("expected error"),
2506 };
2507 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2508 }
2509
2510 #[tokio::test]
2511 async fn delete_event_source_mapping_not_found() {
2512 let svc = LambdaService::new(make_state());
2513 let req = make_request(
2514 Method::DELETE,
2515 "/2015-03-31/event-source-mappings/nonexistent",
2516 "",
2517 );
2518 let err = match svc.handle(req).await {
2519 Err(e) => e,
2520 Ok(_) => panic!("expected error"),
2521 };
2522 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2523 }
2524
2525 #[tokio::test]
2526 async fn get_policy_on_missing_function() {
2527 let svc = LambdaService::new(make_state());
2528 let req = make_request(Method::GET, "/2015-03-31/functions/nope/policy", "");
2529 let err = match svc.handle(req).await {
2530 Err(e) => e,
2531 Ok(_) => panic!("expected error"),
2532 };
2533 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2534 }
2535
2536 #[tokio::test]
2537 async fn remove_permission_on_missing_function() {
2538 let svc = LambdaService::new(make_state());
2539 let req = make_request(
2540 Method::DELETE,
2541 "/2015-03-31/functions/nope/policy/stmt1",
2542 "",
2543 );
2544 let err = match svc.handle(req).await {
2545 Err(e) => e,
2546 Ok(_) => panic!("expected error"),
2547 };
2548 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2549 }
2550
2551 #[tokio::test]
2552 async fn publish_version_on_missing_function() {
2553 let svc = LambdaService::new(make_state());
2554 let req = make_request(Method::POST, "/2015-03-31/functions/nope/versions", "{}");
2555 let err = match svc.handle(req).await {
2556 Err(e) => e,
2557 Ok(_) => panic!("expected error"),
2558 };
2559 assert_eq!(err.status(), StatusCode::NOT_FOUND);
2560 }
2561
2562 #[tokio::test]
2563 async fn unknown_route_returns_error() {
2564 let svc = LambdaService::new(make_state());
2565 let req = make_request(Method::POST, "/unknown/route", "{}");
2566 assert!(svc.handle(req).await.is_err());
2567 }
2568
2569 #[tokio::test]
2570 async fn publish_version_unknown_function_errors() {
2571 let svc = LambdaService::new(make_state());
2572 assert!(svc.publish_version("ghost", "123456789012").is_err());
2573 }
2574
2575 #[tokio::test]
2576 async fn get_function_unknown_errors() {
2577 let svc = LambdaService::new(make_state());
2578 assert!(svc
2579 .get_function("ghost", "123456789012", "us-east-1")
2580 .is_err());
2581 }
2582
2583 #[tokio::test]
2584 async fn delete_function_unknown_errors() {
2585 let svc = LambdaService::new(make_state());
2586 assert!(svc.delete_function("ghost", "123456789012").is_err());
2587 }
2588
2589 #[tokio::test]
2590 async fn get_event_source_mapping_unknown_errors() {
2591 let svc = LambdaService::new(make_state());
2592 assert!(svc
2593 .get_event_source_mapping("ghost", "123456789012")
2594 .is_err());
2595 }
2596
2597 #[tokio::test]
2598 async fn delete_event_source_mapping_unknown_errors() {
2599 let svc = LambdaService::new(make_state());
2600 assert!(svc
2601 .delete_event_source_mapping("ghost", "123456789012")
2602 .is_err());
2603 }
2604
2605 #[tokio::test]
2606 async fn list_functions_empty_ok() {
2607 let svc = LambdaService::new(make_state());
2608 let resp = svc.list_functions("123456789012").unwrap();
2609 assert_eq!(resp.status, http::StatusCode::OK);
2610 }
2611
2612 #[tokio::test]
2613 async fn list_event_source_mappings_empty_ok() {
2614 let svc = LambdaService::new(make_state());
2615 let resp = svc.list_event_source_mappings("123456789012").unwrap();
2616 assert_eq!(resp.status, http::StatusCode::OK);
2617 }
2618}