1use async_trait::async_trait;
2use chrono::Utc;
3use http::{Method, StatusCode};
4use serde_json::{json, Value};
5use sha2::{Digest, Sha256};
6
7use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
8
9use crate::state::{EventSourceMapping, LambdaFunction, SharedLambdaState};
10
11pub struct LambdaService {
12 state: SharedLambdaState,
13}
14
15impl LambdaService {
16 pub fn new(state: SharedLambdaState) -> Self {
17 Self { state }
18 }
19
20 fn resolve_action(req: &AwsRequest) -> Option<(&str, Option<String>)> {
33 let segs = &req.path_segments;
34 if segs.is_empty() {
35 return None;
36 }
37
38 if segs[0] != "2015-03-31" {
40 return None;
41 }
42
43 match (req.method.clone(), segs.len()) {
44 (Method::POST, 2) if segs[1] == "functions" => Some(("CreateFunction", None)),
46 (Method::GET, 2) if segs[1] == "functions" => Some(("ListFunctions", None)),
47 (Method::GET, 3) if segs[1] == "functions" => {
49 Some(("GetFunction", Some(segs[2].clone())))
50 }
51 (Method::DELETE, 3) if segs[1] == "functions" => {
52 Some(("DeleteFunction", Some(segs[2].clone())))
53 }
54 (Method::POST, 4) if segs[1] == "functions" && segs[3] == "invocations" => {
56 Some(("Invoke", Some(segs[2].clone())))
57 }
58 (Method::POST, 4) if segs[1] == "functions" && segs[3] == "versions" => {
60 Some(("PublishVersion", Some(segs[2].clone())))
61 }
62 (Method::POST, 2) if segs[1] == "event-source-mappings" => {
64 Some(("CreateEventSourceMapping", None))
65 }
66 (Method::GET, 2) if segs[1] == "event-source-mappings" => {
67 Some(("ListEventSourceMappings", None))
68 }
69 (Method::GET, 3) if segs[1] == "event-source-mappings" => {
71 Some(("GetEventSourceMapping", Some(segs[2].clone())))
72 }
73 (Method::DELETE, 3) if segs[1] == "event-source-mappings" => {
74 Some(("DeleteEventSourceMapping", Some(segs[2].clone())))
75 }
76 _ => None,
77 }
78 }
79
80 fn create_function(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
81 let body: Value = serde_json::from_slice(&req.body).unwrap_or_default();
82 let function_name = body["FunctionName"]
83 .as_str()
84 .ok_or_else(|| {
85 AwsServiceError::aws_error(
86 StatusCode::BAD_REQUEST,
87 "InvalidParameterValueException",
88 "FunctionName is required",
89 )
90 })?
91 .to_string();
92
93 let mut state = self.state.write();
94
95 if state.functions.contains_key(&function_name) {
96 return Err(AwsServiceError::aws_error(
97 StatusCode::CONFLICT,
98 "ResourceConflictException",
99 format!("Function already exist: {}", function_name),
100 ));
101 }
102
103 let runtime = body["Runtime"].as_str().unwrap_or("python3.12").to_string();
104 let role = body["Role"].as_str().unwrap_or("").to_string();
105 let handler = body["Handler"]
106 .as_str()
107 .unwrap_or("index.handler")
108 .to_string();
109 let description = body["Description"].as_str().unwrap_or("").to_string();
110 let timeout = body["Timeout"].as_i64().unwrap_or(3);
111 let memory_size = body["MemorySize"].as_i64().unwrap_or(128);
112 let package_type = body["PackageType"].as_str().unwrap_or("Zip").to_string();
113
114 let tags: std::collections::HashMap<String, String> = body["Tags"]
115 .as_object()
116 .map(|m| {
117 m.iter()
118 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
119 .collect()
120 })
121 .unwrap_or_default();
122
123 let environment: std::collections::HashMap<String, String> = body["Environment"]
124 ["Variables"]
125 .as_object()
126 .map(|m| {
127 m.iter()
128 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
129 .collect()
130 })
131 .unwrap_or_default();
132
133 let architectures = body["Architectures"]
134 .as_array()
135 .map(|a| {
136 a.iter()
137 .filter_map(|v| v.as_str().map(|s| s.to_string()))
138 .collect()
139 })
140 .unwrap_or_else(|| vec!["x86_64".to_string()]);
141
142 let code_body = serde_json::to_vec(&body["Code"]).unwrap_or_default();
144 let mut hasher = Sha256::new();
145 hasher.update(&code_body);
146 let hash = hasher.finalize();
147 let code_sha256 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, hash);
148 let code_size = code_body.len() as i64;
149
150 let function_arn = format!(
151 "arn:aws:lambda:{}:{}:function:{}",
152 state.region, state.account_id, function_name
153 );
154 let now = Utc::now();
155
156 let func = LambdaFunction {
157 function_name: function_name.clone(),
158 function_arn: function_arn.clone(),
159 runtime: runtime.clone(),
160 role: role.clone(),
161 handler: handler.clone(),
162 description: description.clone(),
163 timeout,
164 memory_size,
165 code_sha256: code_sha256.clone(),
166 code_size,
167 version: "$LATEST".to_string(),
168 last_modified: now,
169 tags,
170 environment: environment.clone(),
171 architectures: architectures.clone(),
172 package_type: package_type.clone(),
173 };
174
175 let response = self.function_config_json(&func);
176
177 state.functions.insert(function_name, func);
178
179 Ok(AwsResponse::json(StatusCode::CREATED, response.to_string()))
180 }
181
182 fn get_function(&self, function_name: &str) -> Result<AwsResponse, AwsServiceError> {
183 let state = self.state.read();
184 let func = state.functions.get(function_name).ok_or_else(|| {
185 AwsServiceError::aws_error(
186 StatusCode::NOT_FOUND,
187 "ResourceNotFoundException",
188 format!(
189 "Function not found: arn:aws:lambda:{}:{}:function:{}",
190 state.region, state.account_id, function_name
191 ),
192 )
193 })?;
194
195 let config = self.function_config_json(func);
196 let response = json!({
197 "Code": {
198 "Location": format!("https://awslambda-{}-tasks.s3.{}.amazonaws.com/stub",
199 func.function_arn.split(':').nth(3).unwrap_or("us-east-1"),
200 func.function_arn.split(':').nth(3).unwrap_or("us-east-1")),
201 "RepositoryType": "S3"
202 },
203 "Configuration": config,
204 "Tags": func.tags,
205 });
206
207 Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
208 }
209
210 fn delete_function(&self, function_name: &str) -> Result<AwsResponse, AwsServiceError> {
211 let mut state = self.state.write();
212 let region = state.region.clone();
213 let account_id = state.account_id.clone();
214 if state.functions.remove(function_name).is_none() {
215 return Err(AwsServiceError::aws_error(
216 StatusCode::NOT_FOUND,
217 "ResourceNotFoundException",
218 format!(
219 "Function not found: arn:aws:lambda:{}:{}:function:{}",
220 region, account_id, function_name
221 ),
222 ));
223 }
224
225 Ok(AwsResponse::json(StatusCode::NO_CONTENT, ""))
226 }
227
228 fn list_functions(&self) -> Result<AwsResponse, AwsServiceError> {
229 let state = self.state.read();
230 let functions: Vec<Value> = state
231 .functions
232 .values()
233 .map(|f| self.function_config_json(f))
234 .collect();
235
236 let response = json!({
237 "Functions": functions,
238 });
239
240 Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
241 }
242
243 fn invoke(&self, function_name: &str) -> Result<AwsResponse, AwsServiceError> {
244 let state = self.state.read();
245 if !state.functions.contains_key(function_name) {
246 let region = state.region.clone();
247 let account_id = state.account_id.clone();
248 return Err(AwsServiceError::aws_error(
249 StatusCode::NOT_FOUND,
250 "ResourceNotFoundException",
251 format!(
252 "Function not found: arn:aws:lambda:{}:{}:function:{}",
253 region, account_id, function_name
254 ),
255 ));
256 }
257
258 let mut resp = AwsResponse::json(StatusCode::OK, "{}");
260 resp.headers.insert(
261 http::header::HeaderName::from_static("x-amz-executed-version"),
262 http::header::HeaderValue::from_static("$LATEST"),
263 );
264 Ok(resp)
265 }
266
267 fn publish_version(&self, function_name: &str) -> Result<AwsResponse, AwsServiceError> {
268 let state = self.state.read();
269 let func = state.functions.get(function_name).ok_or_else(|| {
270 AwsServiceError::aws_error(
271 StatusCode::NOT_FOUND,
272 "ResourceNotFoundException",
273 format!(
274 "Function not found: arn:aws:lambda:{}:{}:function:{}",
275 state.region, state.account_id, function_name
276 ),
277 )
278 })?;
279
280 let mut config = self.function_config_json(func);
281 config["Version"] = json!("1");
283 config["FunctionArn"] = json!(format!("{}:1", func.function_arn));
284
285 Ok(AwsResponse::json(StatusCode::CREATED, config.to_string()))
286 }
287
288 fn create_event_source_mapping(
289 &self,
290 req: &AwsRequest,
291 ) -> Result<AwsResponse, AwsServiceError> {
292 let body: Value = serde_json::from_slice(&req.body).unwrap_or_default();
293 let event_source_arn = body["EventSourceArn"]
294 .as_str()
295 .ok_or_else(|| {
296 AwsServiceError::aws_error(
297 StatusCode::BAD_REQUEST,
298 "InvalidParameterValueException",
299 "EventSourceArn is required",
300 )
301 })?
302 .to_string();
303
304 let function_name = body["FunctionName"]
305 .as_str()
306 .ok_or_else(|| {
307 AwsServiceError::aws_error(
308 StatusCode::BAD_REQUEST,
309 "InvalidParameterValueException",
310 "FunctionName is required",
311 )
312 })?
313 .to_string();
314
315 let mut state = self.state.write();
316
317 let function_arn = if function_name.starts_with("arn:") {
319 function_name.clone()
320 } else {
321 let func = state.functions.get(&function_name).ok_or_else(|| {
322 AwsServiceError::aws_error(
323 StatusCode::NOT_FOUND,
324 "ResourceNotFoundException",
325 format!(
326 "Function not found: arn:aws:lambda:{}:{}:function:{}",
327 state.region, state.account_id, function_name
328 ),
329 )
330 })?;
331 func.function_arn.clone()
332 };
333
334 let batch_size = body["BatchSize"].as_i64().unwrap_or(10);
335 let enabled = body["Enabled"].as_bool().unwrap_or(true);
336 let mapping_uuid = uuid::Uuid::new_v4().to_string();
337 let now = Utc::now();
338
339 let mapping = EventSourceMapping {
340 uuid: mapping_uuid.clone(),
341 function_arn: function_arn.clone(),
342 event_source_arn: event_source_arn.clone(),
343 batch_size,
344 enabled,
345 state: if enabled {
346 "Enabled".to_string()
347 } else {
348 "Disabled".to_string()
349 },
350 last_modified: now,
351 };
352
353 let response = self.event_source_mapping_json(&mapping);
354 state.event_source_mappings.insert(mapping_uuid, mapping);
355
356 Ok(AwsResponse::json(
357 StatusCode::ACCEPTED,
358 response.to_string(),
359 ))
360 }
361
362 fn list_event_source_mappings(&self) -> Result<AwsResponse, AwsServiceError> {
363 let state = self.state.read();
364 let mappings: Vec<Value> = state
365 .event_source_mappings
366 .values()
367 .map(|m| self.event_source_mapping_json(m))
368 .collect();
369
370 let response = json!({
371 "EventSourceMappings": mappings,
372 });
373
374 Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
375 }
376
377 fn get_event_source_mapping(&self, uuid: &str) -> Result<AwsResponse, AwsServiceError> {
378 let state = self.state.read();
379 let mapping = state.event_source_mappings.get(uuid).ok_or_else(|| {
380 AwsServiceError::aws_error(
381 StatusCode::NOT_FOUND,
382 "ResourceNotFoundException",
383 format!("The resource you requested does not exist. (Service: Lambda, Status Code: 404, Request ID: {uuid})"),
384 )
385 })?;
386
387 let response = self.event_source_mapping_json(mapping);
388 Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
389 }
390
391 fn delete_event_source_mapping(&self, uuid: &str) -> Result<AwsResponse, AwsServiceError> {
392 let mut state = self.state.write();
393 let mapping = state.event_source_mappings.remove(uuid).ok_or_else(|| {
394 AwsServiceError::aws_error(
395 StatusCode::NOT_FOUND,
396 "ResourceNotFoundException",
397 format!("The resource you requested does not exist. (Service: Lambda, Status Code: 404, Request ID: {uuid})"),
398 )
399 })?;
400
401 let mut response = self.event_source_mapping_json(&mapping);
402 response["State"] = json!("Deleting");
403 Ok(AwsResponse::json(
404 StatusCode::ACCEPTED,
405 response.to_string(),
406 ))
407 }
408
409 fn function_config_json(&self, func: &LambdaFunction) -> Value {
410 let mut env_vars = json!({});
411 if !func.environment.is_empty() {
412 env_vars = json!({ "Variables": func.environment });
413 }
414
415 json!({
416 "FunctionName": func.function_name,
417 "FunctionArn": func.function_arn,
418 "Runtime": func.runtime,
419 "Role": func.role,
420 "Handler": func.handler,
421 "Description": func.description,
422 "Timeout": func.timeout,
423 "MemorySize": func.memory_size,
424 "CodeSha256": func.code_sha256,
425 "CodeSize": func.code_size,
426 "Version": func.version,
427 "LastModified": func.last_modified.format("%Y-%m-%dT%H:%M:%S%.3f+0000").to_string(),
428 "PackageType": func.package_type,
429 "Architectures": func.architectures,
430 "Environment": env_vars,
431 "State": "Active",
432 "LastUpdateStatus": "Successful",
433 "TracingConfig": { "Mode": "PassThrough" },
434 "RevisionId": uuid::Uuid::new_v4().to_string(),
435 })
436 }
437
438 fn event_source_mapping_json(&self, mapping: &EventSourceMapping) -> Value {
439 json!({
440 "UUID": mapping.uuid,
441 "FunctionArn": mapping.function_arn,
442 "EventSourceArn": mapping.event_source_arn,
443 "BatchSize": mapping.batch_size,
444 "State": mapping.state,
445 "LastModified": mapping.last_modified.timestamp_millis() as f64 / 1000.0,
446 })
447 }
448}
449
450#[async_trait]
451impl AwsService for LambdaService {
452 fn service_name(&self) -> &str {
453 "lambda"
454 }
455
456 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
457 let (action, resource_name) = Self::resolve_action(&req).ok_or_else(|| {
458 AwsServiceError::aws_error(
459 StatusCode::NOT_FOUND,
460 "UnknownOperationException",
461 format!("Unknown operation: {} {}", req.method, req.raw_path),
462 )
463 })?;
464
465 match action {
466 "CreateFunction" => self.create_function(&req),
467 "ListFunctions" => self.list_functions(),
468 "GetFunction" => self.get_function(resource_name.as_deref().unwrap_or("")),
469 "DeleteFunction" => self.delete_function(resource_name.as_deref().unwrap_or("")),
470 "Invoke" => self.invoke(resource_name.as_deref().unwrap_or("")),
471 "PublishVersion" => self.publish_version(resource_name.as_deref().unwrap_or("")),
472 "CreateEventSourceMapping" => self.create_event_source_mapping(&req),
473 "ListEventSourceMappings" => self.list_event_source_mappings(),
474 "GetEventSourceMapping" => {
475 self.get_event_source_mapping(resource_name.as_deref().unwrap_or(""))
476 }
477 "DeleteEventSourceMapping" => {
478 self.delete_event_source_mapping(resource_name.as_deref().unwrap_or(""))
479 }
480 _ => Err(AwsServiceError::action_not_implemented("lambda", action)),
481 }
482 }
483
484 fn supported_actions(&self) -> &[&str] {
485 &[
486 "CreateFunction",
487 "GetFunction",
488 "DeleteFunction",
489 "ListFunctions",
490 "Invoke",
491 "PublishVersion",
492 "CreateEventSourceMapping",
493 "ListEventSourceMappings",
494 "GetEventSourceMapping",
495 "DeleteEventSourceMapping",
496 ]
497 }
498}
499
500#[cfg(test)]
501mod tests {
502 use super::*;
503 use crate::state::LambdaState;
504 use bytes::Bytes;
505 use http::{HeaderMap, Method};
506 use parking_lot::RwLock;
507 use std::collections::HashMap;
508 use std::sync::Arc;
509
510 fn make_state() -> SharedLambdaState {
511 Arc::new(RwLock::new(LambdaState::new("123456789012", "us-east-1")))
512 }
513
514 fn make_request(method: Method, path: &str, body: &str) -> AwsRequest {
515 let path_segments: Vec<String> = path
516 .split('/')
517 .filter(|s| !s.is_empty())
518 .map(|s| s.to_string())
519 .collect();
520 AwsRequest {
521 service: "lambda".to_string(),
522 action: String::new(),
523 region: "us-east-1".to_string(),
524 account_id: "123456789012".to_string(),
525 request_id: "test-request-id".to_string(),
526 headers: HeaderMap::new(),
527 query_params: HashMap::new(),
528 body: Bytes::from(body.to_string()),
529 path_segments,
530 raw_path: path.to_string(),
531 method,
532 is_query_protocol: false,
533 access_key_id: None,
534 }
535 }
536
537 #[tokio::test]
538 async fn test_create_and_get_function() {
539 let state = make_state();
540 let svc = LambdaService::new(state);
541
542 let create_body = json!({
543 "FunctionName": "my-func",
544 "Runtime": "python3.12",
545 "Role": "arn:aws:iam::123456789012:role/test-role",
546 "Handler": "index.handler",
547 "Code": { "ZipFile": "UEsFBgAAAAAAAAAAAAAAAAAAAAA=" }
548 });
549
550 let req = make_request(
551 Method::POST,
552 "/2015-03-31/functions",
553 &create_body.to_string(),
554 );
555 let resp = svc.handle(req).await.unwrap();
556 assert_eq!(resp.status, StatusCode::CREATED);
557
558 let body: Value = serde_json::from_slice(&resp.body).unwrap();
559 assert_eq!(body["FunctionName"], "my-func");
560 assert_eq!(body["Runtime"], "python3.12");
561
562 let req = make_request(Method::GET, "/2015-03-31/functions/my-func", "");
564 let resp = svc.handle(req).await.unwrap();
565 assert_eq!(resp.status, StatusCode::OK);
566 let body: Value = serde_json::from_slice(&resp.body).unwrap();
567 assert_eq!(body["Configuration"]["FunctionName"], "my-func");
568 }
569
570 #[tokio::test]
571 async fn test_delete_function() {
572 let state = make_state();
573 let svc = LambdaService::new(state);
574
575 let create_body = json!({
576 "FunctionName": "to-delete",
577 "Runtime": "nodejs20.x",
578 "Role": "arn:aws:iam::123456789012:role/test",
579 "Handler": "index.handler",
580 "Code": {}
581 });
582
583 let req = make_request(
584 Method::POST,
585 "/2015-03-31/functions",
586 &create_body.to_string(),
587 );
588 svc.handle(req).await.unwrap();
589
590 let req = make_request(Method::DELETE, "/2015-03-31/functions/to-delete", "");
591 let resp = svc.handle(req).await.unwrap();
592 assert_eq!(resp.status, StatusCode::NO_CONTENT);
593
594 let req = make_request(Method::GET, "/2015-03-31/functions/to-delete", "");
596 let resp = svc.handle(req).await;
597 assert!(resp.is_err());
598 }
599
600 #[tokio::test]
601 async fn test_invoke() {
602 let state = make_state();
603 let svc = LambdaService::new(state);
604
605 let create_body = json!({
606 "FunctionName": "invoke-me",
607 "Runtime": "python3.12",
608 "Role": "arn:aws:iam::123456789012:role/test",
609 "Handler": "index.handler",
610 "Code": {}
611 });
612
613 let req = make_request(
614 Method::POST,
615 "/2015-03-31/functions",
616 &create_body.to_string(),
617 );
618 svc.handle(req).await.unwrap();
619
620 let req = make_request(
621 Method::POST,
622 "/2015-03-31/functions/invoke-me/invocations",
623 r#"{"key": "value"}"#,
624 );
625 let resp = svc.handle(req).await.unwrap();
626 assert_eq!(resp.status, StatusCode::OK);
627 }
628
629 #[tokio::test]
630 async fn test_list_functions() {
631 let state = make_state();
632 let svc = LambdaService::new(state);
633
634 for name in &["func-a", "func-b"] {
635 let create_body = json!({
636 "FunctionName": name,
637 "Runtime": "python3.12",
638 "Role": "arn:aws:iam::123456789012:role/test",
639 "Handler": "index.handler",
640 "Code": {}
641 });
642 let req = make_request(
643 Method::POST,
644 "/2015-03-31/functions",
645 &create_body.to_string(),
646 );
647 svc.handle(req).await.unwrap();
648 }
649
650 let req = make_request(Method::GET, "/2015-03-31/functions", "");
651 let resp = svc.handle(req).await.unwrap();
652 let body: Value = serde_json::from_slice(&resp.body).unwrap();
653 assert_eq!(body["Functions"].as_array().unwrap().len(), 2);
654 }
655
656 #[tokio::test]
657 async fn test_event_source_mapping() {
658 let state = make_state();
659 let svc = LambdaService::new(state);
660
661 let create_body = json!({
663 "FunctionName": "esm-func",
664 "Runtime": "python3.12",
665 "Role": "arn:aws:iam::123456789012:role/test",
666 "Handler": "index.handler",
667 "Code": {}
668 });
669 let req = make_request(
670 Method::POST,
671 "/2015-03-31/functions",
672 &create_body.to_string(),
673 );
674 svc.handle(req).await.unwrap();
675
676 let mapping_body = json!({
678 "FunctionName": "esm-func",
679 "EventSourceArn": "arn:aws:sqs:us-east-1:123456789012:my-queue",
680 "BatchSize": 5
681 });
682 let req = make_request(
683 Method::POST,
684 "/2015-03-31/event-source-mappings",
685 &mapping_body.to_string(),
686 );
687 let resp = svc.handle(req).await.unwrap();
688 assert_eq!(resp.status, StatusCode::ACCEPTED);
689 let body: Value = serde_json::from_slice(&resp.body).unwrap();
690 let uuid = body["UUID"].as_str().unwrap().to_string();
691
692 let req = make_request(Method::GET, "/2015-03-31/event-source-mappings", "");
694 let resp = svc.handle(req).await.unwrap();
695 let body: Value = serde_json::from_slice(&resp.body).unwrap();
696 assert_eq!(body["EventSourceMappings"].as_array().unwrap().len(), 1);
697
698 let req = make_request(
700 Method::DELETE,
701 &format!("/2015-03-31/event-source-mappings/{uuid}"),
702 "",
703 );
704 let resp = svc.handle(req).await.unwrap();
705 assert_eq!(resp.status, StatusCode::ACCEPTED);
706 }
707}