1use chrono::Utc;
7use http::StatusCode;
8use serde_json::{json, Value};
9
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
11
12use crate::service::LambdaService;
13use crate::state::{
14 AccountSettings, CapacityProvider, CodeSigningConfig, DurableExecution, EventInvokeConfig,
15 FunctionAlias, FunctionScalingConfig, FunctionUrlConfig, LambdaState, Layer, LayerVersion,
16 ProvisionedConcurrencyConfig, RuntimeManagementConfig,
17};
18
19fn missing(name: &str) -> AwsServiceError {
20 AwsServiceError::aws_error(
21 StatusCode::BAD_REQUEST,
22 "InvalidParameterValueException",
23 format!("Missing required field: {name}"),
24 )
25}
26
27fn not_found(entity: &str, name: &str) -> AwsServiceError {
28 AwsServiceError::aws_error(
29 StatusCode::NOT_FOUND,
30 "ResourceNotFoundException",
31 format!("{entity} not found: {name}"),
32 )
33}
34
35fn ok(body: Value) -> Result<AwsResponse, AwsServiceError> {
36 Ok(AwsResponse::json(StatusCode::OK, body.to_string()))
37}
38
39fn empty() -> Result<AwsResponse, AwsServiceError> {
40 Ok(AwsResponse::json(StatusCode::OK, "{}".to_string()))
41}
42
43fn body(req: &AwsRequest) -> Value {
44 serde_json::from_slice(&req.body).unwrap_or_else(|_| Value::Object(Default::default()))
45}
46
47fn parse_qualifier(req: &AwsRequest) -> String {
48 req.query_params
49 .get("Qualifier")
50 .cloned()
51 .unwrap_or_else(|| "$LATEST".to_string())
52}
53
/// Produces an identifier made of `prefix` followed by the current time in
/// nanoseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is used as the
/// numeric part instead of failing.
fn id_from_time(prefix: &str) -> String {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    let nanos = match since_epoch {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    format!("{}{}", prefix, nanos)
}
64
65impl LambdaService {
66 pub(crate) async fn handle_extra(
67 &self,
68 action: &str,
69 resource: Option<&str>,
70 req: &AwsRequest,
71 ) -> Result<AwsResponse, AwsServiceError> {
72 let aid = req.account_id.as_str();
73 let res = resource.unwrap_or("");
74 match action {
75 "GetFunctionConfiguration" => self.get_function_configuration(res, aid),
77 "UpdateFunctionConfiguration" => self.update_function_configuration(res, req),
78 "UpdateFunctionCode" => self.update_function_code(res, req),
79 "UpdateEventSourceMapping" => self.update_event_source_mapping_handler(res, req),
80 "GetAccountSettings" => self.get_account_settings(aid),
81 "InvokeAsync" => Ok(AwsResponse::json(StatusCode::ACCEPTED, "{}".to_string())),
82 "InvokeWithResponseStream" => Ok(AwsResponse::json(StatusCode::OK, "{}".to_string())),
83
84 "ListVersionsByFunction" => self.list_versions_by_function(res, aid),
86
87 "CreateAlias" => self.create_alias(res, req),
89 "GetAlias" => self.get_alias(res, req),
90 "ListAliases" => self.list_aliases(res, aid),
91 "UpdateAlias" => self.update_alias(res, req),
92 "DeleteAlias" => self.delete_alias(res, req),
93
94 "PublishLayerVersion" => self.publish_layer_version(res, req),
96 "GetLayerVersion" => self.get_layer_version(req),
97 "GetLayerVersionByArn" => self.get_layer_version_by_arn(req),
98 "ListLayers" => self.list_layers(aid),
99 "ListLayerVersions" => self.list_layer_versions(res, aid),
100 "DeleteLayerVersion" => self.delete_layer_version(req),
101 "GetLayerVersionPolicy" => self.get_layer_version_policy(req),
102 "AddLayerVersionPermission" => self.add_layer_version_permission(req),
103 "RemoveLayerVersionPermission" => self.remove_layer_version_permission(req),
104
105 "CreateFunctionUrlConfig" => self.create_function_url_config(res, req),
107 "GetFunctionUrlConfig" => self.get_function_url_config(res, aid),
108 "UpdateFunctionUrlConfig" => self.update_function_url_config(res, req),
109 "DeleteFunctionUrlConfig" => self.delete_function_url_config(res, aid),
110 "ListFunctionUrlConfigs" => self.list_function_url_configs(aid),
111
112 "PutFunctionConcurrency" => self.put_function_concurrency(res, req),
114 "GetFunctionConcurrency" => self.get_function_concurrency(res, aid),
115 "DeleteFunctionConcurrency" => self.delete_function_concurrency(res, aid),
116 "PutProvisionedConcurrencyConfig" => self.put_provisioned_concurrency(res, req),
117 "GetProvisionedConcurrencyConfig" => self.get_provisioned_concurrency(res, req),
118 "DeleteProvisionedConcurrencyConfig" => self.delete_provisioned_concurrency(res, req),
119 "ListProvisionedConcurrencyConfigs" => self.list_provisioned_concurrency(res, aid),
120
121 "CreateCodeSigningConfig" => self.create_code_signing_config(req),
123 "GetCodeSigningConfig" => self.get_code_signing_config(res, aid),
124 "UpdateCodeSigningConfig" => self.update_code_signing_config(res, req),
125 "DeleteCodeSigningConfig" => self.delete_code_signing_config(res, aid),
126 "ListCodeSigningConfigs" => self.list_code_signing_configs(aid),
127 "PutFunctionCodeSigningConfig" => self.put_function_code_signing(res, req),
128 "GetFunctionCodeSigningConfig" => self.get_function_code_signing(res, aid),
129 "DeleteFunctionCodeSigningConfig" => self.delete_function_code_signing(res, aid),
130 "ListFunctionsByCodeSigningConfig" => self.list_functions_by_code_signing(res, aid),
131
132 "PutFunctionEventInvokeConfig" | "UpdateFunctionEventInvokeConfig" => {
134 self.put_function_event_invoke(res, req)
135 }
136 "GetFunctionEventInvokeConfig" => self.get_function_event_invoke(res, req),
137 "DeleteFunctionEventInvokeConfig" => self.delete_function_event_invoke(res, req),
138 "ListFunctionEventInvokeConfigs" => self.list_function_event_invoke(res, aid),
139
140 "PutRuntimeManagementConfig" => self.put_runtime_management(res, req),
142 "GetRuntimeManagementConfig" => self.get_runtime_management(res, req),
143
144 "PutFunctionScalingConfig" => self.put_scaling_config(res, req),
146 "GetFunctionScalingConfig" => self.get_scaling_config(res, aid),
147
148 "PutFunctionRecursionConfig" => self.put_recursion_config(res, req),
150 "GetFunctionRecursionConfig" => self.get_recursion_config(res, aid),
151
152 "TagResource" => self.tag_resource(res, req),
154 "UntagResource" => self.untag_resource(res, req),
155 "ListTags" => self.list_tags(res, aid),
156
157 "CreateCapacityProvider" => self.create_capacity_provider(req),
159 "GetCapacityProvider" => self.get_capacity_provider(res, aid),
160 "UpdateCapacityProvider" => self.update_capacity_provider(res, req),
161 "DeleteCapacityProvider" => self.delete_capacity_provider(res, aid),
162 "ListCapacityProviders" => self.list_capacity_providers(aid),
163 "ListFunctionVersionsByCapacityProvider" => {
164 self.list_versions_by_capacity_provider(res, aid)
165 }
166
167 "CheckpointDurableExecution" => self.checkpoint_durable_execution(res, req),
169 "GetDurableExecution" => self.get_durable_execution(res, aid),
170 "GetDurableExecutionHistory" => self.get_durable_execution_history(res, aid),
171 "GetDurableExecutionState" => self.get_durable_execution_state(res, aid),
172 "ListDurableExecutionsByFunction" => self.list_durable_executions_by_function(res, aid),
173 "StopDurableExecution" => self.stop_durable_execution(res, aid),
174 "SendDurableExecutionCallbackSuccess" => {
175 self.send_durable_callback(res, req, "SUCCESS")
176 }
177 "SendDurableExecutionCallbackFailure" => {
178 self.send_durable_callback(res, req, "FAILURE")
179 }
180 "SendDurableExecutionCallbackHeartbeat" => {
181 self.send_durable_callback(res, req, "HEARTBEAT")
182 }
183
184 _ => Err(AwsServiceError::action_not_implemented("lambda", action)),
185 }
186 }
187
188 fn with_state_read<F, R>(&self, account_id: &str, region: &str, f: F) -> R
189 where
190 F: FnOnce(&LambdaState) -> R,
191 {
192 let accounts = self.state.read();
193 let empty = LambdaState::new(account_id, region);
194 let state = accounts.get(account_id).unwrap_or(&empty);
195 f(state)
196 }
197
198 fn get_function_configuration(
201 &self,
202 function_name: &str,
203 account_id: &str,
204 ) -> Result<AwsResponse, AwsServiceError> {
205 let region = self.region_for(account_id);
206 self.with_state_read(account_id, ®ion, |state| {
207 state
208 .functions
209 .get(function_name)
210 .map(|f| ok(self.function_config_json(f)))
211 .unwrap_or_else(|| Err(not_found("Function", function_name)))
212 })
213 }
214
215 fn update_function_configuration(
216 &self,
217 function_name: &str,
218 req: &AwsRequest,
219 ) -> Result<AwsResponse, AwsServiceError> {
220 let body = body(req);
221 let mut accounts = self.state.write();
222 let state = accounts.get_or_create(&req.account_id);
223 let func = state
224 .functions
225 .get_mut(function_name)
226 .ok_or_else(|| not_found("Function", function_name))?;
227 if let Some(handler) = body["Handler"].as_str() {
228 func.handler = handler.to_string();
229 }
230 if let Some(t) = body["Timeout"].as_i64() {
231 func.timeout = t;
232 }
233 if let Some(m) = body["MemorySize"].as_i64() {
234 func.memory_size = m;
235 }
236 if let Some(role) = body["Role"].as_str() {
237 func.role = role.to_string();
238 }
239 if let Some(desc) = body["Description"].as_str() {
240 func.description = desc.to_string();
241 }
242 func.last_modified = Utc::now();
243 ok(self.function_config_json(func))
244 }
245
246 fn update_function_code(
247 &self,
248 function_name: &str,
249 req: &AwsRequest,
250 ) -> Result<AwsResponse, AwsServiceError> {
251 let mut accounts = self.state.write();
252 let state = accounts.get_or_create(&req.account_id);
253 let func = state
254 .functions
255 .get_mut(function_name)
256 .ok_or_else(|| not_found("Function", function_name))?;
257 func.last_modified = Utc::now();
258 ok(self.function_config_json(func))
259 }
260
261 fn get_account_settings(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
262 let mut accounts = self.state.write();
263 let state = accounts.get_or_create(account_id);
264 let settings = state.account_settings.clone().unwrap_or(AccountSettings {
265 concurrent_executions: 1000,
266 code_size_zipped: 52_428_800,
267 code_size_unzipped: 262_144_000,
268 total_code_size: 80_530_636_800,
269 });
270 if state.account_settings.is_none() {
271 state.account_settings = Some(settings.clone());
272 }
273 ok(json!({
274 "AccountLimit": {
275 "ConcurrentExecutions": settings.concurrent_executions,
276 "CodeSizeZipped": settings.code_size_zipped,
277 "CodeSizeUnzipped": settings.code_size_unzipped,
278 "TotalCodeSize": settings.total_code_size,
279 "UnreservedConcurrentExecutions": settings.concurrent_executions,
280 },
281 "AccountUsage": {
282 "TotalCodeSize": 0,
283 "FunctionCount": 0,
284 },
285 }))
286 }
287
288 fn list_versions_by_function(
291 &self,
292 function_name: &str,
293 account_id: &str,
294 ) -> Result<AwsResponse, AwsServiceError> {
295 let region = self.region_for(account_id);
296 self.with_state_read(account_id, ®ion, |state| {
297 if !state.functions.contains_key(function_name) {
298 return Err(not_found("Function", function_name));
299 }
300 let versions: Vec<&String> = state
301 .function_versions
302 .get(function_name)
303 .map(|v| v.iter().collect())
304 .unwrap_or_default();
305 ok(json!({
306 "Versions": versions,
307 }))
308 })
309 }
310
/// Builds the `function:alias` key under which an alias is stored.
fn alias_key(function: &str, alias: &str) -> String {
    let mut key = String::with_capacity(function.len() + alias.len() + 1);
    key.push_str(function);
    key.push(':');
    key.push_str(alias);
    key
}
316
317 fn create_alias(
318 &self,
319 function_name: &str,
320 req: &AwsRequest,
321 ) -> Result<AwsResponse, AwsServiceError> {
322 let body = body(req);
323 let name = body["Name"]
324 .as_str()
325 .ok_or_else(|| missing("Name"))?
326 .to_string();
327 let version = body["FunctionVersion"]
328 .as_str()
329 .unwrap_or("$LATEST")
330 .to_string();
331 let mut accounts = self.state.write();
332 let state = accounts.get_or_create(&req.account_id);
333 if !state.functions.contains_key(function_name) {
334 return Err(not_found("Function", function_name));
335 }
336 let alias_arn = format!(
337 "arn:aws:lambda:{}:{}:function:{}:{}",
338 state.region, state.account_id, function_name, name
339 );
340 let alias = FunctionAlias {
341 alias_arn: alias_arn.clone(),
342 name: name.clone(),
343 function_version: version,
344 description: body["Description"].as_str().unwrap_or("").to_string(),
345 revision_id: id_from_time("rev-"),
346 routing_config: body.get("RoutingConfig").cloned(),
347 };
348 state
349 .aliases
350 .insert(Self::alias_key(function_name, &name), alias.clone());
351 ok(serde_json::to_value(alias).unwrap_or_default())
352 }
353
354 fn get_alias(
355 &self,
356 function_name: &str,
357 req: &AwsRequest,
358 ) -> Result<AwsResponse, AwsServiceError> {
359 let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
360 let region = self.region_for(&req.account_id);
361 self.with_state_read(&req.account_id, ®ion, |state| {
362 state
363 .aliases
364 .get(&Self::alias_key(function_name, &alias_name))
365 .map(|a| ok(serde_json::to_value(a).unwrap_or_default()))
366 .unwrap_or_else(|| Err(not_found("Alias", &alias_name)))
367 })
368 }
369
370 fn list_aliases(
371 &self,
372 function_name: &str,
373 account_id: &str,
374 ) -> Result<AwsResponse, AwsServiceError> {
375 let region = self.region_for(account_id);
376 self.with_state_read(account_id, ®ion, |state| {
377 let prefix = format!("{function_name}:");
378 let aliases: Vec<&FunctionAlias> = state
379 .aliases
380 .iter()
381 .filter(|(k, _)| k.starts_with(&prefix))
382 .map(|(_, v)| v)
383 .collect();
384 ok(json!({"Aliases": aliases}))
385 })
386 }
387
388 fn update_alias(
389 &self,
390 function_name: &str,
391 req: &AwsRequest,
392 ) -> Result<AwsResponse, AwsServiceError> {
393 let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
394 let body = body(req);
395 let mut accounts = self.state.write();
396 let state = accounts.get_or_create(&req.account_id);
397 let key = Self::alias_key(function_name, &alias_name);
398 let alias = state
399 .aliases
400 .get_mut(&key)
401 .ok_or_else(|| not_found("Alias", &alias_name))?;
402 if let Some(v) = body["FunctionVersion"].as_str() {
403 alias.function_version = v.to_string();
404 }
405 if let Some(d) = body["Description"].as_str() {
406 alias.description = d.to_string();
407 }
408 if let Some(rc) = body.get("RoutingConfig") {
409 alias.routing_config = Some(rc.clone());
410 }
411 alias.revision_id = id_from_time("rev-");
412 ok(serde_json::to_value(alias).unwrap_or_default())
413 }
414
415 fn delete_alias(
416 &self,
417 function_name: &str,
418 req: &AwsRequest,
419 ) -> Result<AwsResponse, AwsServiceError> {
420 let alias_name = req.path_segments.get(4).cloned().unwrap_or_default();
421 let mut accounts = self.state.write();
422 let state = accounts.get_or_create(&req.account_id);
423 state
424 .aliases
425 .remove(&Self::alias_key(function_name, &alias_name));
426 empty()
427 }
428
429 fn publish_layer_version(
432 &self,
433 layer_name: &str,
434 req: &AwsRequest,
435 ) -> Result<AwsResponse, AwsServiceError> {
436 let body = body(req);
437 let mut accounts = self.state.write();
438 let state = accounts.get_or_create(&req.account_id);
439 let layer = state
440 .layers
441 .entry(layer_name.to_string())
442 .or_insert_with(|| Layer {
443 layer_name: layer_name.to_string(),
444 layer_arn: format!(
445 "arn:aws:lambda:{}:{}:layer:{}",
446 state.region, state.account_id, layer_name
447 ),
448 versions: Vec::new(),
449 });
450 let next_version = (layer.versions.len() as i64) + 1;
451 let version_arn = format!("{}:{}", layer.layer_arn, next_version);
452 let runtimes: Vec<String> = body["CompatibleRuntimes"]
453 .as_array()
454 .map(|arr| {
455 arr.iter()
456 .filter_map(|v| v.as_str().map(String::from))
457 .collect()
458 })
459 .unwrap_or_default();
460 let lv = LayerVersion {
461 version: next_version,
462 layer_version_arn: version_arn.clone(),
463 description: body["Description"].as_str().unwrap_or("").to_string(),
464 created_date: Utc::now(),
465 compatible_runtimes: runtimes,
466 license_info: body["LicenseInfo"].as_str().unwrap_or("").to_string(),
467 policy: None,
468 };
469 layer.versions.push(lv.clone());
470 ok(json!({
471 "LayerArn": layer.layer_arn,
472 "LayerVersionArn": version_arn,
473 "Version": next_version,
474 "Description": lv.description,
475 "CreatedDate": lv.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
476 "CompatibleRuntimes": lv.compatible_runtimes,
477 "LicenseInfo": lv.license_info,
478 }))
479 }
480
481 fn list_layers(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
482 let region = self.region_for(account_id);
483 self.with_state_read(account_id, ®ion, |state| {
484 let layers: Vec<Value> = state
485 .layers
486 .values()
487 .map(|l| {
488 json!({
489 "LayerName": l.layer_name,
490 "LayerArn": l.layer_arn,
491 "LatestMatchingVersion": l.versions.last().map(|v| json!({
492 "LayerVersionArn": v.layer_version_arn,
493 "Version": v.version,
494 "Description": v.description,
495 "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
496 "CompatibleRuntimes": v.compatible_runtimes,
497 })),
498 })
499 })
500 .collect();
501 ok(json!({"Layers": layers}))
502 })
503 }
504
505 fn list_layer_versions(
506 &self,
507 layer_name: &str,
508 account_id: &str,
509 ) -> Result<AwsResponse, AwsServiceError> {
510 let region = self.region_for(account_id);
511 self.with_state_read(account_id, ®ion, |state| {
512 let versions: Vec<Value> = state
513 .layers
514 .get(layer_name)
515 .map(|l| {
516 l.versions
517 .iter()
518 .map(|v| {
519 json!({
520 "LayerVersionArn": v.layer_version_arn,
521 "Version": v.version,
522 "Description": v.description,
523 "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
524 "CompatibleRuntimes": v.compatible_runtimes,
525 "LicenseInfo": v.license_info,
526 })
527 })
528 .collect()
529 })
530 .unwrap_or_default();
531 ok(json!({"LayerVersions": versions}))
532 })
533 }
534
535 fn get_layer_version(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
536 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
537 let version: i64 = req
538 .path_segments
539 .get(4)
540 .and_then(|s| s.parse().ok())
541 .ok_or_else(|| missing("VersionNumber"))?;
542 let region = self.region_for(&req.account_id);
543 self.with_state_read(&req.account_id, ®ion, |state| {
544 state
545 .layers
546 .get(&layer_name)
547 .and_then(|l| l.versions.iter().find(|v| v.version == version))
548 .map(|v| {
549 ok(json!({
550 "LayerVersionArn": v.layer_version_arn,
551 "Version": v.version,
552 "Description": v.description,
553 "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
554 "CompatibleRuntimes": v.compatible_runtimes,
555 "LicenseInfo": v.license_info,
556 "Content": {
557 "Location": "https://example.com/layer.zip",
558 "CodeSha256": "",
559 "CodeSize": 0,
560 },
561 }))
562 })
563 .unwrap_or_else(|| Err(not_found("LayerVersion", &layer_name)))
564 })
565 }
566
567 fn get_layer_version_by_arn(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
568 let arn = req
569 .query_params
570 .get("Arn")
571 .or_else(|| req.query_params.get("find"))
572 .cloned()
573 .unwrap_or_default();
574 let parts: Vec<&str> = arn.rsplitn(3, ':').collect();
576 if parts.len() < 3 {
577 return Err(missing("Arn"));
578 }
579 let version: i64 = parts[0].parse().map_err(|_| missing("Arn"))?;
580 let layer_name = parts[1].to_string();
581 let region = self.region_for(&req.account_id);
582 self.with_state_read(&req.account_id, ®ion, |state| {
583 state
584 .layers
585 .get(&layer_name)
586 .and_then(|l| l.versions.iter().find(|v| v.version == version))
587 .map(|v| {
588 ok(json!({
589 "LayerVersionArn": v.layer_version_arn,
590 "Version": v.version,
591 "Description": v.description,
592 "CreatedDate": v.created_date.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
593 "CompatibleRuntimes": v.compatible_runtimes,
594 "LicenseInfo": v.license_info,
595 }))
596 })
597 .unwrap_or_else(|| Err(not_found("LayerVersion", &arn)))
598 })
599 }
600
601 fn delete_layer_version(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
602 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
603 let version: i64 = req
604 .path_segments
605 .get(4)
606 .and_then(|s| s.parse().ok())
607 .unwrap_or(0);
608 let mut accounts = self.state.write();
609 let state = accounts.get_or_create(&req.account_id);
610 if let Some(layer) = state.layers.get_mut(&layer_name) {
611 layer.versions.retain(|v| v.version != version);
612 }
613 empty()
614 }
615
616 fn get_layer_version_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
617 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
618 let version: i64 = req
619 .path_segments
620 .get(4)
621 .and_then(|s| s.parse().ok())
622 .unwrap_or(0);
623 let region = self.region_for(&req.account_id);
624 self.with_state_read(&req.account_id, ®ion, |state| {
625 let policy = state
626 .layers
627 .get(&layer_name)
628 .and_then(|l| l.versions.iter().find(|v| v.version == version))
629 .and_then(|v| v.policy.clone())
630 .unwrap_or_else(|| "{}".to_string());
631 ok(json!({"Policy": policy, "RevisionId": id_from_time("rev-")}))
632 })
633 }
634
635 fn add_layer_version_permission(
636 &self,
637 req: &AwsRequest,
638 ) -> Result<AwsResponse, AwsServiceError> {
639 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
640 let version: i64 = req
641 .path_segments
642 .get(4)
643 .and_then(|s| s.parse().ok())
644 .unwrap_or(0);
645 let body = body(req);
646 let mut accounts = self.state.write();
647 let state = accounts.get_or_create(&req.account_id);
648 if let Some(layer) = state.layers.get_mut(&layer_name) {
649 if let Some(v) = layer.versions.iter_mut().find(|v| v.version == version) {
650 let policy = v.policy.clone().unwrap_or_else(|| "{}".to_string());
651 let mut policy_doc: Value = serde_json::from_str(&policy).unwrap_or(json!({}));
652 let statements = policy_doc["Statement"].as_array_mut();
653 let new_stmt = json!({
654 "Sid": body["StatementId"].as_str().unwrap_or("default"),
655 "Effect": "Allow",
656 "Principal": body["Principal"].clone(),
657 "Action": body["Action"].clone(),
658 "Resource": v.layer_version_arn.clone(),
659 });
660 if let Some(s) = statements {
661 s.push(new_stmt);
662 } else {
663 policy_doc = json!({"Version": "2012-10-17", "Statement": [new_stmt]});
664 }
665 v.policy = Some(policy_doc.to_string());
666 }
667 }
668 ok(json!({
669 "Statement": body["StatementId"],
670 "RevisionId": id_from_time("rev-"),
671 }))
672 }
673
674 fn remove_layer_version_permission(
675 &self,
676 req: &AwsRequest,
677 ) -> Result<AwsResponse, AwsServiceError> {
678 let layer_name = req.path_segments.get(2).cloned().unwrap_or_default();
679 let version: i64 = req
680 .path_segments
681 .get(4)
682 .and_then(|s| s.parse().ok())
683 .unwrap_or(0);
684 let sid = req.path_segments.get(6).cloned().unwrap_or_default();
685 let mut accounts = self.state.write();
686 let state = accounts.get_or_create(&req.account_id);
687 if let Some(layer) = state.layers.get_mut(&layer_name) {
688 if let Some(v) = layer.versions.iter_mut().find(|v| v.version == version) {
689 if let Some(policy) = v.policy.clone() {
690 let mut policy_doc: Value = serde_json::from_str(&policy).unwrap_or(json!({}));
691 if let Some(stmts) = policy_doc["Statement"].as_array_mut() {
692 stmts.retain(|s| s["Sid"].as_str() != Some(&sid));
693 }
694 v.policy = Some(policy_doc.to_string());
695 }
696 }
697 }
698 empty()
699 }
700
701 fn create_function_url_config(
704 &self,
705 function_name: &str,
706 req: &AwsRequest,
707 ) -> Result<AwsResponse, AwsServiceError> {
708 let body = body(req);
709 let auth_type = body["AuthType"].as_str().unwrap_or("NONE").to_string();
710 let now = Utc::now();
711 let mut accounts = self.state.write();
712 let state = accounts.get_or_create(&req.account_id);
713 if !state.functions.contains_key(function_name) {
714 return Err(not_found("Function", function_name));
715 }
716 let function_arn = format!(
717 "arn:aws:lambda:{}:{}:function:{}",
718 state.region, state.account_id, function_name
719 );
720 let cfg = FunctionUrlConfig {
721 function_arn: function_arn.clone(),
722 function_url: format!(
723 "https://{function_name}.lambda-url.{}.on.aws/",
724 state.region
725 ),
726 auth_type: auth_type.clone(),
727 cors: body.get("Cors").cloned(),
728 creation_time: now,
729 last_modified_time: now,
730 invoke_mode: body["InvokeMode"]
731 .as_str()
732 .unwrap_or("BUFFERED")
733 .to_string(),
734 };
735 state
736 .function_url_configs
737 .insert(function_name.to_string(), cfg.clone());
738 ok(serde_json::to_value(cfg).unwrap_or_default())
739 }
740
741 fn get_function_url_config(
742 &self,
743 function_name: &str,
744 account_id: &str,
745 ) -> Result<AwsResponse, AwsServiceError> {
746 let region = self.region_for(account_id);
747 self.with_state_read(account_id, ®ion, |state| {
748 state
749 .function_url_configs
750 .get(function_name)
751 .map(|c| ok(serde_json::to_value(c).unwrap_or_default()))
752 .unwrap_or_else(|| Err(not_found("FunctionUrlConfig", function_name)))
753 })
754 }
755
756 fn update_function_url_config(
757 &self,
758 function_name: &str,
759 req: &AwsRequest,
760 ) -> Result<AwsResponse, AwsServiceError> {
761 let body = body(req);
762 let mut accounts = self.state.write();
763 let state = accounts.get_or_create(&req.account_id);
764 let cfg = state
765 .function_url_configs
766 .get_mut(function_name)
767 .ok_or_else(|| not_found("FunctionUrlConfig", function_name))?;
768 if let Some(a) = body["AuthType"].as_str() {
769 cfg.auth_type = a.to_string();
770 }
771 if let Some(c) = body.get("Cors") {
772 cfg.cors = Some(c.clone());
773 }
774 if let Some(m) = body["InvokeMode"].as_str() {
775 cfg.invoke_mode = m.to_string();
776 }
777 cfg.last_modified_time = Utc::now();
778 ok(serde_json::to_value(cfg).unwrap_or_default())
779 }
780
781 fn delete_function_url_config(
782 &self,
783 function_name: &str,
784 account_id: &str,
785 ) -> Result<AwsResponse, AwsServiceError> {
786 let mut accounts = self.state.write();
787 let state = accounts.get_or_create(account_id);
788 state.function_url_configs.remove(function_name);
789 empty()
790 }
791
792 fn list_function_url_configs(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
793 let region = self.region_for(account_id);
794 self.with_state_read(account_id, ®ion, |state| {
795 let configs: Vec<&FunctionUrlConfig> = state.function_url_configs.values().collect();
796 ok(json!({"FunctionUrlConfigs": configs}))
797 })
798 }
799
800 fn put_function_concurrency(
803 &self,
804 function_name: &str,
805 req: &AwsRequest,
806 ) -> Result<AwsResponse, AwsServiceError> {
807 let body = body(req);
808 let n = body["ReservedConcurrentExecutions"]
809 .as_i64()
810 .ok_or_else(|| missing("ReservedConcurrentExecutions"))?;
811 let mut accounts = self.state.write();
812 let state = accounts.get_or_create(&req.account_id);
813 state
814 .function_concurrency
815 .insert(function_name.to_string(), n);
816 ok(json!({"ReservedConcurrentExecutions": n}))
817 }
818
819 fn get_function_concurrency(
820 &self,
821 function_name: &str,
822 account_id: &str,
823 ) -> Result<AwsResponse, AwsServiceError> {
824 let region = self.region_for(account_id);
825 self.with_state_read(account_id, ®ion, |state| {
826 let n = state
827 .function_concurrency
828 .get(function_name)
829 .copied()
830 .unwrap_or(0);
831 ok(json!({"ReservedConcurrentExecutions": n}))
832 })
833 }
834
835 fn delete_function_concurrency(
836 &self,
837 function_name: &str,
838 account_id: &str,
839 ) -> Result<AwsResponse, AwsServiceError> {
840 let mut accounts = self.state.write();
841 let state = accounts.get_or_create(account_id);
842 state.function_concurrency.remove(function_name);
843 empty()
844 }
845
/// Builds the `function:qualifier` key under which a provisioned concurrency
/// configuration is stored.
fn pc_key(function: &str, qualifier: &str) -> String {
    let mut key = String::with_capacity(function.len() + qualifier.len() + 1);
    key.push_str(function);
    key.push(':');
    key.push_str(qualifier);
    key
}
849
850 fn put_provisioned_concurrency(
851 &self,
852 function_name: &str,
853 req: &AwsRequest,
854 ) -> Result<AwsResponse, AwsServiceError> {
855 let body = body(req);
856 let qualifier = parse_qualifier(req);
857 let requested = body["ProvisionedConcurrentExecutions"]
858 .as_i64()
859 .ok_or_else(|| missing("ProvisionedConcurrentExecutions"))?;
860 let mut accounts = self.state.write();
861 let state = accounts.get_or_create(&req.account_id);
862 let cfg = ProvisionedConcurrencyConfig {
863 requested,
864 allocated: requested,
865 status: "READY".to_string(),
866 last_modified: Utc::now(),
867 };
868 state
869 .provisioned_concurrency
870 .insert(Self::pc_key(function_name, &qualifier), cfg.clone());
871 ok(json!({
872 "RequestedProvisionedConcurrentExecutions": cfg.requested,
873 "AvailableProvisionedConcurrentExecutions": cfg.allocated,
874 "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
875 "Status": cfg.status,
876 "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
877 }))
878 }
879
880 fn get_provisioned_concurrency(
881 &self,
882 function_name: &str,
883 req: &AwsRequest,
884 ) -> Result<AwsResponse, AwsServiceError> {
885 let qualifier = parse_qualifier(req);
886 let region = self.region_for(&req.account_id);
887 self.with_state_read(&req.account_id, ®ion, |state| {
888 state
889 .provisioned_concurrency
890 .get(&Self::pc_key(function_name, &qualifier))
891 .map(|cfg| ok(json!({
892 "RequestedProvisionedConcurrentExecutions": cfg.requested,
893 "AvailableProvisionedConcurrentExecutions": cfg.allocated,
894 "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
895 "Status": cfg.status,
896 "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
897 })))
898 .unwrap_or_else(|| Err(not_found("ProvisionedConcurrencyConfig", function_name)))
899 })
900 }
901
902 fn delete_provisioned_concurrency(
903 &self,
904 function_name: &str,
905 req: &AwsRequest,
906 ) -> Result<AwsResponse, AwsServiceError> {
907 let qualifier = parse_qualifier(req);
908 let mut accounts = self.state.write();
909 let state = accounts.get_or_create(&req.account_id);
910 state
911 .provisioned_concurrency
912 .remove(&Self::pc_key(function_name, &qualifier));
913 empty()
914 }
915
916 fn list_provisioned_concurrency(
917 &self,
918 function_name: &str,
919 account_id: &str,
920 ) -> Result<AwsResponse, AwsServiceError> {
921 let region = self.region_for(account_id);
922 self.with_state_read(account_id, ®ion, |state| {
923 let prefix = format!("{function_name}:");
924 let configs: Vec<Value> = state
925 .provisioned_concurrency
926 .iter()
927 .filter(|(k, _)| k.starts_with(&prefix))
928 .map(|(k, cfg)| {
929 let qualifier = k.split(':').next_back().unwrap_or("$LATEST");
930 json!({
931 "FunctionArn": format!(
932 "arn:aws:lambda:{}:{}:function:{}:{}",
933 state.region, state.account_id, function_name, qualifier
934 ),
935 "Status": cfg.status,
936 "RequestedProvisionedConcurrentExecutions": cfg.requested,
937 "AvailableProvisionedConcurrentExecutions": cfg.allocated,
938 "AllocatedProvisionedConcurrentExecutions": cfg.allocated,
939 "LastModified": cfg.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
940 })
941 })
942 .collect();
943 ok(json!({"ProvisionedConcurrencyConfigs": configs}))
944 })
945 }
946
947 fn create_code_signing_config(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
950 let body = body(req);
951 let mut accounts = self.state.write();
952 let state = accounts.get_or_create(&req.account_id);
953 let id = id_from_time("csc-");
954 let arn = format!(
955 "arn:aws:lambda:{}:{}:code-signing-config:{}",
956 state.region, state.account_id, id
957 );
958 let publishers: Vec<String> = body
959 .get("AllowedPublishers")
960 .and_then(|v| v.get("SigningProfileVersionArns"))
961 .and_then(|v| v.as_array())
962 .map(|arr| {
963 arr.iter()
964 .filter_map(|x| x.as_str().map(String::from))
965 .collect()
966 })
967 .unwrap_or_default();
968 let csc = CodeSigningConfig {
969 csc_id: id.clone(),
970 csc_arn: arn,
971 description: body["Description"].as_str().unwrap_or("").to_string(),
972 allowed_publishers: publishers,
973 untrusted_artifact_action: body["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"]
974 .as_str()
975 .unwrap_or("Warn")
976 .to_string(),
977 last_modified: Utc::now(),
978 };
979 state.code_signing_configs.insert(id, csc.clone());
980 ok(json!({"CodeSigningConfig": code_signing_json(&csc)}))
981 }
982
983 fn get_code_signing_config(
984 &self,
985 csc_id: &str,
986 account_id: &str,
987 ) -> Result<AwsResponse, AwsServiceError> {
988 let id = extract_csc_id(csc_id);
989 let region = self.region_for(account_id);
990 self.with_state_read(account_id, ®ion, |state| {
991 state
992 .code_signing_configs
993 .get(&id)
994 .map(|c| ok(json!({"CodeSigningConfig": code_signing_json(c)})))
995 .unwrap_or_else(|| Err(not_found("CodeSigningConfig", &id)))
996 })
997 }
998
999 fn update_code_signing_config(
1000 &self,
1001 csc_id: &str,
1002 req: &AwsRequest,
1003 ) -> Result<AwsResponse, AwsServiceError> {
1004 let body = body(req);
1005 let mut accounts = self.state.write();
1006 let state = accounts.get_or_create(&req.account_id);
1007 let id = extract_csc_id(csc_id);
1008 let csc = state
1009 .code_signing_configs
1010 .get_mut(&id)
1011 .ok_or_else(|| not_found("CodeSigningConfig", &id))?;
1012 if let Some(d) = body["Description"].as_str() {
1013 csc.description = d.to_string();
1014 }
1015 if let Some(action) = body["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"].as_str()
1016 {
1017 csc.untrusted_artifact_action = action.to_string();
1018 }
1019 csc.last_modified = Utc::now();
1020 ok(json!({"CodeSigningConfig": code_signing_json(csc)}))
1021 }
1022
1023 fn delete_code_signing_config(
1024 &self,
1025 csc_id: &str,
1026 account_id: &str,
1027 ) -> Result<AwsResponse, AwsServiceError> {
1028 let id = extract_csc_id(csc_id);
1029 let mut accounts = self.state.write();
1030 let state = accounts.get_or_create(account_id);
1031 state.code_signing_configs.remove(&id);
1032 empty()
1033 }
1034
1035 fn list_code_signing_configs(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1036 let region = self.region_for(account_id);
1037 self.with_state_read(account_id, ®ion, |state| {
1038 let cfgs: Vec<Value> = state
1039 .code_signing_configs
1040 .values()
1041 .map(code_signing_json)
1042 .collect();
1043 ok(json!({"CodeSigningConfigs": cfgs}))
1044 })
1045 }
1046
1047 fn put_function_code_signing(
1048 &self,
1049 function_name: &str,
1050 req: &AwsRequest,
1051 ) -> Result<AwsResponse, AwsServiceError> {
1052 let body = body(req);
1053 let csc_arn = body["CodeSigningConfigArn"]
1054 .as_str()
1055 .ok_or_else(|| missing("CodeSigningConfigArn"))?
1056 .to_string();
1057 let mut accounts = self.state.write();
1058 let state = accounts.get_or_create(&req.account_id);
1059 state
1060 .function_code_signing
1061 .insert(function_name.to_string(), csc_arn.clone());
1062 ok(json!({
1063 "CodeSigningConfigArn": csc_arn,
1064 "FunctionName": function_name,
1065 }))
1066 }
1067
1068 fn get_function_code_signing(
1069 &self,
1070 function_name: &str,
1071 account_id: &str,
1072 ) -> Result<AwsResponse, AwsServiceError> {
1073 let region = self.region_for(account_id);
1074 self.with_state_read(account_id, ®ion, |state| {
1075 let arn = state
1076 .function_code_signing
1077 .get(function_name)
1078 .cloned()
1079 .unwrap_or_default();
1080 ok(json!({
1081 "CodeSigningConfigArn": arn,
1082 "FunctionName": function_name,
1083 }))
1084 })
1085 }
1086
1087 fn delete_function_code_signing(
1088 &self,
1089 function_name: &str,
1090 account_id: &str,
1091 ) -> Result<AwsResponse, AwsServiceError> {
1092 let mut accounts = self.state.write();
1093 let state = accounts.get_or_create(account_id);
1094 state.function_code_signing.remove(function_name);
1095 empty()
1096 }
1097
1098 fn list_functions_by_code_signing(
1099 &self,
1100 csc_id: &str,
1101 account_id: &str,
1102 ) -> Result<AwsResponse, AwsServiceError> {
1103 let id = extract_csc_id(csc_id);
1104 let region = self.region_for(account_id);
1105 self.with_state_read(account_id, ®ion, |state| {
1106 let funcs: Vec<&String> = state
1107 .function_code_signing
1108 .iter()
1109 .filter(|(_, v)| v.contains(&id))
1110 .map(|(k, _)| k)
1111 .collect();
1112 ok(json!({"FunctionArns": funcs}))
1113 })
1114 }
1115
1116 fn ev_key(function: &str, qualifier: &str) -> String {
1119 format!("{function}:{qualifier}")
1120 }
1121
1122 fn put_function_event_invoke(
1123 &self,
1124 function_name: &str,
1125 req: &AwsRequest,
1126 ) -> Result<AwsResponse, AwsServiceError> {
1127 let body = body(req);
1128 let qualifier = parse_qualifier(req);
1129 let function_arn = format!(
1130 "arn:aws:lambda:{}:{}:function:{}",
1131 self.region_for(&req.account_id),
1132 req.account_id,
1133 function_name
1134 );
1135 let cfg = EventInvokeConfig {
1136 function_arn: function_arn.clone(),
1137 maximum_event_age: body["MaximumEventAgeInSeconds"].as_i64().unwrap_or(21600),
1138 maximum_retry_attempts: body["MaximumRetryAttempts"].as_i64().unwrap_or(2),
1139 destination_config: body.get("DestinationConfig").cloned().unwrap_or(json!({})),
1140 last_modified: Utc::now(),
1141 };
1142 let mut accounts = self.state.write();
1143 let state = accounts.get_or_create(&req.account_id);
1144 state
1145 .event_invoke_configs
1146 .insert(Self::ev_key(function_name, &qualifier), cfg.clone());
1147 ok(event_invoke_json(&cfg))
1148 }
1149
1150 fn get_function_event_invoke(
1151 &self,
1152 function_name: &str,
1153 req: &AwsRequest,
1154 ) -> Result<AwsResponse, AwsServiceError> {
1155 let qualifier = parse_qualifier(req);
1156 let region = self.region_for(&req.account_id);
1157 self.with_state_read(&req.account_id, ®ion, |state| {
1158 state
1159 .event_invoke_configs
1160 .get(&Self::ev_key(function_name, &qualifier))
1161 .map(|c| ok(event_invoke_json(c)))
1162 .unwrap_or_else(|| Err(not_found("EventInvokeConfig", function_name)))
1163 })
1164 }
1165
1166 fn delete_function_event_invoke(
1167 &self,
1168 function_name: &str,
1169 req: &AwsRequest,
1170 ) -> Result<AwsResponse, AwsServiceError> {
1171 let qualifier = parse_qualifier(req);
1172 let mut accounts = self.state.write();
1173 let state = accounts.get_or_create(&req.account_id);
1174 state
1175 .event_invoke_configs
1176 .remove(&Self::ev_key(function_name, &qualifier));
1177 empty()
1178 }
1179
1180 fn list_function_event_invoke(
1181 &self,
1182 function_name: &str,
1183 account_id: &str,
1184 ) -> Result<AwsResponse, AwsServiceError> {
1185 let region = self.region_for(account_id);
1186 self.with_state_read(account_id, ®ion, |state| {
1187 let prefix = format!("{function_name}:");
1188 let configs: Vec<Value> = state
1189 .event_invoke_configs
1190 .iter()
1191 .filter(|(k, _)| k.starts_with(&prefix))
1192 .map(|(_, c)| event_invoke_json(c))
1193 .collect();
1194 ok(json!({"FunctionEventInvokeConfigs": configs}))
1195 })
1196 }
1197
1198 fn put_runtime_management(
1201 &self,
1202 function_name: &str,
1203 req: &AwsRequest,
1204 ) -> Result<AwsResponse, AwsServiceError> {
1205 let body = body(req);
1206 let qualifier = parse_qualifier(req);
1207 let cfg = RuntimeManagementConfig {
1208 update_runtime_on: body["UpdateRuntimeOn"]
1209 .as_str()
1210 .unwrap_or("Auto")
1211 .to_string(),
1212 runtime_version_arn: body["RuntimeVersionArn"].as_str().unwrap_or("").to_string(),
1213 };
1214 let mut accounts = self.state.write();
1215 let state = accounts.get_or_create(&req.account_id);
1216 state
1217 .runtime_management
1218 .insert(format!("{function_name}:{qualifier}"), cfg.clone());
1219 ok(json!({
1220 "FunctionArn": format!("arn:aws:lambda:{}:{}:function:{}:{}", state.region, state.account_id, function_name, qualifier),
1221 "UpdateRuntimeOn": cfg.update_runtime_on,
1222 "RuntimeVersionArn": cfg.runtime_version_arn,
1223 }))
1224 }
1225
1226 fn get_runtime_management(
1227 &self,
1228 function_name: &str,
1229 req: &AwsRequest,
1230 ) -> Result<AwsResponse, AwsServiceError> {
1231 let qualifier = parse_qualifier(req);
1232 let region = self.region_for(&req.account_id);
1233 self.with_state_read(&req.account_id, ®ion, |state| {
1234 let cfg = state
1235 .runtime_management
1236 .get(&format!("{function_name}:{qualifier}"))
1237 .cloned()
1238 .unwrap_or(RuntimeManagementConfig {
1239 update_runtime_on: "Auto".to_string(),
1240 runtime_version_arn: String::new(),
1241 });
1242 ok(json!({
1243 "FunctionArn": format!(
1244 "arn:aws:lambda:{}:{}:function:{}:{}",
1245 state.region, state.account_id, function_name, qualifier
1246 ),
1247 "UpdateRuntimeOn": cfg.update_runtime_on,
1248 "RuntimeVersionArn": cfg.runtime_version_arn,
1249 }))
1250 })
1251 }
1252
1253 fn put_scaling_config(
1256 &self,
1257 uuid: &str,
1258 req: &AwsRequest,
1259 ) -> Result<AwsResponse, AwsServiceError> {
1260 let body = body(req);
1261 let cfg = FunctionScalingConfig {
1262 maximum_concurrency: body["MaximumConcurrency"].as_i64().unwrap_or(0),
1263 };
1264 let mut accounts = self.state.write();
1265 let state = accounts.get_or_create(&req.account_id);
1266 state.scaling_configs.insert(uuid.to_string(), cfg.clone());
1267 ok(json!({
1268 "MaximumConcurrency": cfg.maximum_concurrency,
1269 }))
1270 }
1271
1272 fn get_scaling_config(
1273 &self,
1274 uuid: &str,
1275 account_id: &str,
1276 ) -> Result<AwsResponse, AwsServiceError> {
1277 let region = self.region_for(account_id);
1278 self.with_state_read(account_id, ®ion, |state| {
1279 let n = state
1280 .scaling_configs
1281 .get(uuid)
1282 .map(|c| c.maximum_concurrency)
1283 .unwrap_or(0);
1284 ok(json!({"MaximumConcurrency": n}))
1285 })
1286 }
1287
1288 fn put_recursion_config(
1291 &self,
1292 function_name: &str,
1293 req: &AwsRequest,
1294 ) -> Result<AwsResponse, AwsServiceError> {
1295 let body = body(req);
1296 let mode = body["RecursiveLoop"]
1297 .as_str()
1298 .unwrap_or("Terminate")
1299 .to_string();
1300 let mut accounts = self.state.write();
1301 let state = accounts.get_or_create(&req.account_id);
1302 state
1303 .recursion_configs
1304 .insert(function_name.to_string(), mode.clone());
1305 ok(json!({"RecursiveLoop": mode}))
1306 }
1307
1308 fn get_recursion_config(
1309 &self,
1310 function_name: &str,
1311 account_id: &str,
1312 ) -> Result<AwsResponse, AwsServiceError> {
1313 let region = self.region_for(account_id);
1314 self.with_state_read(account_id, ®ion, |state| {
1315 let mode = state
1316 .recursion_configs
1317 .get(function_name)
1318 .cloned()
1319 .unwrap_or_else(|| "Terminate".to_string());
1320 ok(json!({"RecursiveLoop": mode}))
1321 })
1322 }
1323
1324 fn tag_resource(
1327 &self,
1328 resource_arn: &str,
1329 req: &AwsRequest,
1330 ) -> Result<AwsResponse, AwsServiceError> {
1331 let body = body(req);
1332 let new_tags: Vec<(String, String)> = body
1333 .get("Tags")
1334 .and_then(|v| v.as_object())
1335 .map(|m| {
1336 m.iter()
1337 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
1338 .collect()
1339 })
1340 .unwrap_or_default();
1341 let mut accounts = self.state.write();
1342 let state = accounts.get_or_create(&req.account_id);
1343 let entry = state.tags.entry(resource_arn.to_string()).or_default();
1344 for (k, v) in new_tags {
1345 entry.retain(|(ek, _)| ek != &k);
1346 entry.push((k, v));
1347 }
1348 empty()
1349 }
1350
1351 fn untag_resource(
1352 &self,
1353 resource_arn: &str,
1354 req: &AwsRequest,
1355 ) -> Result<AwsResponse, AwsServiceError> {
1356 let mut keys: Vec<String> = Vec::new();
1357 for (k, v) in &req.query_params {
1358 if k.starts_with("tagKeys") {
1359 keys.push(v.clone());
1360 }
1361 }
1362 let mut accounts = self.state.write();
1363 let state = accounts.get_or_create(&req.account_id);
1364 if let Some(entry) = state.tags.get_mut(resource_arn) {
1365 entry.retain(|(k, _)| !keys.contains(k));
1366 }
1367 empty()
1368 }
1369
1370 fn list_tags(
1371 &self,
1372 resource_arn: &str,
1373 account_id: &str,
1374 ) -> Result<AwsResponse, AwsServiceError> {
1375 let region = self.region_for(account_id);
1376 self.with_state_read(account_id, ®ion, |state| {
1377 let tags: serde_json::Map<String, Value> = state
1378 .tags
1379 .get(resource_arn)
1380 .map(|v| {
1381 v.iter()
1382 .map(|(k, val)| (k.clone(), Value::String(val.clone())))
1383 .collect()
1384 })
1385 .unwrap_or_default();
1386 ok(json!({"Tags": tags}))
1387 })
1388 }
1389
1390 fn create_capacity_provider(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1393 let body = body(req);
1394 let name = body["CapacityProviderName"]
1395 .as_str()
1396 .or_else(|| body["Name"].as_str())
1397 .ok_or_else(|| missing("CapacityProviderName"))?
1398 .to_string();
1399 let mut accounts = self.state.write();
1400 let state = accounts.get_or_create(&req.account_id);
1401 let arn = format!(
1402 "arn:aws:lambda:{}:{}:capacity-provider:{}",
1403 state.region, state.account_id, name
1404 );
1405 let cp = CapacityProvider {
1406 name: name.clone(),
1407 arn: arn.clone(),
1408 status: "ACTIVE".to_string(),
1409 created: Utc::now(),
1410 };
1411 state.capacity_providers.insert(name, cp.clone());
1412 ok(json!({
1413 "Name": cp.name,
1414 "Arn": cp.arn,
1415 "Status": cp.status,
1416 }))
1417 }
1418
1419 fn get_capacity_provider(
1420 &self,
1421 name: &str,
1422 account_id: &str,
1423 ) -> Result<AwsResponse, AwsServiceError> {
1424 let region = self.region_for(account_id);
1425 self.with_state_read(account_id, ®ion, |state| {
1426 state
1427 .capacity_providers
1428 .get(name)
1429 .map(|cp| {
1430 ok(json!({
1431 "Name": cp.name,
1432 "Arn": cp.arn,
1433 "Status": cp.status,
1434 }))
1435 })
1436 .unwrap_or_else(|| Err(not_found("CapacityProvider", name)))
1437 })
1438 }
1439
1440 fn update_capacity_provider(
1441 &self,
1442 name: &str,
1443 req: &AwsRequest,
1444 ) -> Result<AwsResponse, AwsServiceError> {
1445 let mut accounts = self.state.write();
1446 let state = accounts.get_or_create(&req.account_id);
1447 let cp = state
1448 .capacity_providers
1449 .get_mut(name)
1450 .ok_or_else(|| not_found("CapacityProvider", name))?;
1451 cp.status = "ACTIVE".to_string();
1452 ok(json!({
1453 "Name": cp.name,
1454 "Arn": cp.arn,
1455 "Status": cp.status,
1456 }))
1457 }
1458
1459 fn delete_capacity_provider(
1460 &self,
1461 name: &str,
1462 account_id: &str,
1463 ) -> Result<AwsResponse, AwsServiceError> {
1464 let mut accounts = self.state.write();
1465 let state = accounts.get_or_create(account_id);
1466 state.capacity_providers.remove(name);
1467 empty()
1468 }
1469
1470 fn list_capacity_providers(&self, account_id: &str) -> Result<AwsResponse, AwsServiceError> {
1471 let region = self.region_for(account_id);
1472 self.with_state_read(account_id, ®ion, |state| {
1473 let cps: Vec<Value> = state
1474 .capacity_providers
1475 .values()
1476 .map(|cp| {
1477 json!({
1478 "Name": cp.name,
1479 "Arn": cp.arn,
1480 "Status": cp.status,
1481 })
1482 })
1483 .collect();
1484 ok(json!({"CapacityProviders": cps}))
1485 })
1486 }
1487
1488 fn list_versions_by_capacity_provider(
1489 &self,
1490 _name: &str,
1491 _account_id: &str,
1492 ) -> Result<AwsResponse, AwsServiceError> {
1493 ok(json!({"FunctionVersions": []}))
1494 }
1495
1496 fn checkpoint_durable_execution(
1499 &self,
1500 id: &str,
1501 req: &AwsRequest,
1502 ) -> Result<AwsResponse, AwsServiceError> {
1503 let body = body(req);
1504 let body_arn = body
1505 .get("FunctionArn")
1506 .and_then(|v| v.as_str())
1507 .map(String::from);
1508 let body_function = body
1509 .get("FunctionName")
1510 .and_then(|v| v.as_str())
1511 .map(String::from);
1512 let mut accounts = self.state.write();
1513 let state = accounts.get_or_create(&req.account_id);
1514 let derived_arn = body_arn.unwrap_or_else(|| match body_function {
1515 Some(name) if name.starts_with("arn:") => name,
1516 Some(name) => format!(
1517 "arn:aws:lambda:us-east-1:{}:function:{name}",
1518 req.account_id
1519 ),
1520 None => String::new(),
1521 });
1522 let exec = state
1523 .durable_executions
1524 .entry(id.to_string())
1525 .or_insert_with(|| DurableExecution {
1526 id: id.to_string(),
1527 function_arn: derived_arn.clone(),
1528 status: "RUNNING".to_string(),
1529 started: Utc::now(),
1530 stopped: None,
1531 history: Vec::new(),
1532 state: json!({}),
1533 });
1534 if exec.function_arn.is_empty() && !derived_arn.is_empty() {
1535 exec.function_arn = derived_arn;
1536 }
1537 if let Some(s) = body.get("State") {
1538 exec.state = s.clone();
1539 }
1540 if let Some(h) = body.get("HistoryEvent") {
1541 exec.history.push(h.clone());
1542 }
1543 empty()
1544 }
1545
1546 fn get_durable_execution(
1547 &self,
1548 id: &str,
1549 account_id: &str,
1550 ) -> Result<AwsResponse, AwsServiceError> {
1551 let region = self.region_for(account_id);
1552 self.with_state_read(account_id, ®ion, |state| {
1553 state
1554 .durable_executions
1555 .get(id)
1556 .map(|e| {
1557 ok(json!({
1558 "Id": e.id,
1559 "FunctionArn": e.function_arn,
1560 "Status": e.status,
1561 "Started": e.started.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1562 "Stopped": e.stopped.map(|d| d.format("%Y-%m-%dT%H:%M:%SZ").to_string()),
1563 }))
1564 })
1565 .unwrap_or_else(|| Err(not_found("DurableExecution", id)))
1566 })
1567 }
1568
1569 fn get_durable_execution_history(
1570 &self,
1571 id: &str,
1572 account_id: &str,
1573 ) -> Result<AwsResponse, AwsServiceError> {
1574 let region = self.region_for(account_id);
1575 self.with_state_read(account_id, ®ion, |state| {
1576 let history = state
1577 .durable_executions
1578 .get(id)
1579 .map(|e| e.history.clone())
1580 .unwrap_or_default();
1581 ok(json!({"Events": history}))
1582 })
1583 }
1584
1585 fn get_durable_execution_state(
1586 &self,
1587 id: &str,
1588 account_id: &str,
1589 ) -> Result<AwsResponse, AwsServiceError> {
1590 let region = self.region_for(account_id);
1591 self.with_state_read(account_id, ®ion, |state| {
1592 let s = state
1593 .durable_executions
1594 .get(id)
1595 .map(|e| e.state.clone())
1596 .unwrap_or(json!({}));
1597 ok(json!({"State": s}))
1598 })
1599 }
1600
1601 fn list_durable_executions_by_function(
1602 &self,
1603 function_name: &str,
1604 account_id: &str,
1605 ) -> Result<AwsResponse, AwsServiceError> {
1606 let region = self.region_for(account_id);
1607 self.with_state_read(account_id, ®ion, |state| {
1608 let executions: Vec<Value> = state
1609 .durable_executions
1610 .values()
1611 .filter(|e| e.function_arn.contains(function_name))
1612 .map(|e| {
1613 json!({
1614 "Id": e.id,
1615 "Status": e.status,
1616 })
1617 })
1618 .collect();
1619 ok(json!({"DurableExecutions": executions}))
1620 })
1621 }
1622
1623 fn stop_durable_execution(
1624 &self,
1625 id: &str,
1626 account_id: &str,
1627 ) -> Result<AwsResponse, AwsServiceError> {
1628 let mut accounts = self.state.write();
1629 let state = accounts.get_or_create(account_id);
1630 if let Some(e) = state.durable_executions.get_mut(id) {
1631 e.status = "STOPPED".to_string();
1632 e.stopped = Some(Utc::now());
1633 }
1634 empty()
1635 }
1636
1637 fn send_durable_callback(
1638 &self,
1639 id: &str,
1640 _req: &AwsRequest,
1641 kind: &str,
1642 ) -> Result<AwsResponse, AwsServiceError> {
1643 let mut accounts = self.state.write();
1644 let state = accounts.get_or_create(_req.account_id.as_str());
1645 if let Some(e) = state.durable_executions.get_mut(id) {
1646 e.history.push(
1647 json!({"type": format!("Callback{kind}"), "timestamp": Utc::now().to_rfc3339()}),
1648 );
1649 if kind == "SUCCESS" {
1650 e.status = "SUCCEEDED".to_string();
1651 } else if kind == "FAILURE" {
1652 e.status = "FAILED".to_string();
1653 }
1654 }
1655 empty()
1656 }
1657
1658 fn update_event_source_mapping_handler(
1659 &self,
1660 uuid: &str,
1661 req: &AwsRequest,
1662 ) -> Result<AwsResponse, AwsServiceError> {
1663 let body = body(req);
1664 let mut accounts = self.state.write();
1665 let state = accounts.get_or_create(&req.account_id);
1666 let esm = state
1667 .event_source_mappings
1668 .get_mut(uuid)
1669 .ok_or_else(|| not_found("EventSourceMapping", uuid))?;
1670 if let Some(b) = body["BatchSize"].as_i64() {
1671 esm.batch_size = b;
1672 }
1673 if let Some(name) = body["FunctionName"].as_str() {
1674 esm.function_arn = format!(
1675 "arn:aws:lambda:{}:{}:function:{}",
1676 state.region, state.account_id, name
1677 );
1678 }
1679 if let Some(filters) = body
1680 .get("FilterCriteria")
1681 .and_then(|v| v.get("Filters"))
1682 .and_then(|v| v.as_array())
1683 {
1684 esm.filter_patterns = filters
1685 .iter()
1686 .filter_map(|f| f.get("Pattern").and_then(|p| p.as_str()).map(String::from))
1687 .collect();
1688 }
1689 if let Some(types) = body.get("FunctionResponseTypes").and_then(|v| v.as_array()) {
1690 esm.function_response_types = types
1691 .iter()
1692 .filter_map(|v| v.as_str().map(String::from))
1693 .collect();
1694 }
1695 if let Some(w) = body
1696 .get("MaximumBatchingWindowInSeconds")
1697 .and_then(|v| v.as_i64())
1698 {
1699 esm.maximum_batching_window_in_seconds = Some(w);
1700 }
1701 if let Some(p) = body.get("ParallelizationFactor").and_then(|v| v.as_i64()) {
1702 esm.parallelization_factor = Some(p);
1703 }
1704 let mut body_json = json!({
1705 "UUID": esm.uuid,
1706 "FunctionArn": esm.function_arn,
1707 "EventSourceArn": esm.event_source_arn,
1708 "BatchSize": esm.batch_size,
1709 "State": "Enabled",
1710 "StateTransitionReason": "USER_INITIATED",
1711 "LastModified": chrono::Utc::now().timestamp() as f64,
1712 });
1713 let obj = body_json.as_object_mut().expect("json! built object");
1714 if !esm.filter_patterns.is_empty() {
1715 obj.insert(
1716 "FilterCriteria".into(),
1717 json!({
1718 "Filters": esm
1719 .filter_patterns
1720 .iter()
1721 .map(|p| json!({"Pattern": p}))
1722 .collect::<Vec<_>>(),
1723 }),
1724 );
1725 }
1726 if !esm.function_response_types.is_empty() {
1727 obj.insert(
1728 "FunctionResponseTypes".into(),
1729 json!(esm.function_response_types),
1730 );
1731 }
1732 if let Some(w) = esm.maximum_batching_window_in_seconds {
1733 obj.insert("MaximumBatchingWindowInSeconds".into(), json!(w));
1734 }
1735 if let Some(p) = esm.parallelization_factor {
1736 obj.insert("ParallelizationFactor".into(), json!(p));
1737 }
1738 ok(body_json)
1739 }
1740
1741 fn region_for(&self, account_id: &str) -> String {
1742 let accounts = self.state.read();
1743 accounts
1744 .get(account_id)
1745 .map(|s| s.region.clone())
1746 .unwrap_or_else(|| "us-east-1".to_string())
1747 }
1748}
1749
1750fn extract_csc_id(input: &str) -> String {
1751 let decoded = percent_decode(input);
1754 decoded.rsplit(':').next().unwrap_or(&decoded).to_string()
1755}
1756
/// Decodes `%XX` escapes in `input`.
///
/// A `%` that is not followed by two hexadecimal digits is kept literally. Decoding happens at
/// the byte level and the result is then interpreted as UTF-8, so both literal multi-byte
/// characters and percent-encoded UTF-8 sequences (e.g. `%C3%A9`) come out as the intended
/// characters. Byte sequences that are not valid UTF-8 after decoding are replaced with
/// U+FFFD.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let hex_digit = |byte: &u8| (*byte as char).to_digit(16);

    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            let hi = bytes.get(i + 1).and_then(hex_digit);
            let lo = bytes.get(i + 2).and_then(hex_digit);
            if let (Some(h), Some(l)) = (hi, lo) {
                // Both nibbles are < 16, so the combined value always fits in a byte.
                decoded.push((h * 16 + l) as u8);
                i += 3;
                continue;
            }
        }
        decoded.push(bytes[i]);
        i += 1;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
1776
1777fn code_signing_json(c: &CodeSigningConfig) -> Value {
1778 json!({
1779 "CodeSigningConfigId": c.csc_id,
1780 "CodeSigningConfigArn": c.csc_arn,
1781 "Description": c.description,
1782 "AllowedPublishers": {
1783 "SigningProfileVersionArns": c.allowed_publishers,
1784 },
1785 "CodeSigningPolicies": {
1786 "UntrustedArtifactOnDeployment": c.untrusted_artifact_action,
1787 },
1788 "LastModified": c.last_modified.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string(),
1789 })
1790}
1791
1792fn event_invoke_json(c: &EventInvokeConfig) -> Value {
1793 json!({
1794 "FunctionArn": c.function_arn,
1795 "MaximumEventAgeInSeconds": c.maximum_event_age,
1796 "MaximumRetryAttempts": c.maximum_retry_attempts,
1797 "DestinationConfig": c.destination_config,
1798 "LastModified": c.last_modified.timestamp(),
1799 })
1800}
1801
#[cfg(test)]
mod tests {
    use crate::service::LambdaService;
    use crate::state::{LambdaState, SharedLambdaState};
    use fakecloud_core::multi_account::MultiAccountState;
    use fakecloud_core::service::AwsRequest;
    use http::Method;
    use parking_lot::RwLock;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Builds a service over fresh state for account `000000000000` in `us-east-1`.
    fn svc() -> LambdaService {
        let accounts = MultiAccountState::<LambdaState>::new("000000000000", "us-east-1", "");
        let shared: SharedLambdaState = Arc::new(RwLock::new(accounts));
        LambdaService::new(shared)
    }

    /// Builds a POST request for `action` with the given body and path segments; every other
    /// field carries a fixed test value.
    fn req(action: &str, body: &str, segs: &[&str]) -> AwsRequest {
        let path_segments: Vec<String> = segs.iter().map(|s| s.to_string()).collect();
        let raw_path = format!("/{}", segs.join("/"));
        AwsRequest {
            service: "lambda".to_string(),
            method: Method::POST,
            raw_path,
            raw_query: String::new(),
            path_segments,
            query_params: HashMap::new(),
            headers: http::HeaderMap::new(),
            body: bytes::Bytes::from(body.to_string()),
            body_stream: parking_lot::Mutex::new(None),
            account_id: "000000000000".to_string(),
            region: "us-east-1".to_string(),
            request_id: "rid".to_string(),
            action: action.to_string(),
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// Dispatches one action through `handle_extra` and fails the test unless the handler
    /// returns a success status.
    async fn run(s: &LambdaService, action: &str, body: &str, res: Option<&str>, segs: &[&str]) {
        let request = req(action, body, segs);
        let resp = s
            .handle_extra(action, res, &request)
            .await
            .unwrap_or_else(|e| panic!("{action} failed: {e:?}"));
        assert!(resp.status.is_success(), "{action} status: {}", resp.status);
    }

    #[tokio::test]
    async fn read_only_listings_succeed_without_state() {
        let s = svc();
        let calls: [(&str, &str, Option<&str>); 6] = [
            ("GetAccountSettings", "", None),
            ("InvokeAsync", "{}", Some("fn")),
            ("InvokeWithResponseStream", "{}", Some("fn")),
            ("ListLayers", "", None),
            ("ListLayerVersions", "", Some("layer")),
            ("ListCapacityProviders", "", None),
        ];
        for (action, body, res) in calls.iter().copied() {
            run(&s, action, body, res, &[]).await;
        }
    }

    #[tokio::test]
    async fn layers_lifecycle() {
        let s = svc();
        let publish_path = ["2018-10-31", "layers", "layer1", "versions"];
        run(
            &s,
            "PublishLayerVersion",
            r#"{"Content":{"ZipFile":""}}"#,
            Some("layer1"),
            &publish_path,
        )
        .await;
        run(&s, "ListLayers", "", None, &[]).await;
        run(&s, "ListLayerVersions", "", Some("layer1"), &[]).await;
    }

    #[tokio::test]
    async fn capacity_providers_lifecycle() {
        let s = svc();
        let steps: [(&str, &str, Option<&str>); 5] = [
            (
                "CreateCapacityProvider",
                r#"{"CapacityProviderName":"cp1"}"#,
                None,
            ),
            ("GetCapacityProvider", "", Some("cp1")),
            ("ListCapacityProviders", "", None),
            ("UpdateCapacityProvider", "{}", Some("cp1")),
            ("DeleteCapacityProvider", "", Some("cp1")),
        ];
        for (action, body, res) in steps.iter().copied() {
            run(&s, action, body, res, &[]).await;
        }
    }

    #[tokio::test]
    async fn durable_executions() {
        let s = svc();
        let steps: [(&str, &str); 5] = [
            ("CheckpointDurableExecution", r#"{"FunctionName":"fn"}"#),
            ("GetDurableExecution", ""),
            ("GetDurableExecutionHistory", ""),
            ("GetDurableExecutionState", ""),
            ("StopDurableExecution", ""),
        ];
        for (action, body) in steps.iter().copied() {
            run(&s, action, body, Some("d1"), &[]).await;
        }
    }

    #[tokio::test]
    async fn code_signing_lifecycle() {
        let s = svc();
        let create_body = r#"{"AllowedPublishers":{"SigningProfileVersionArns":[]}}"#;
        run(&s, "CreateCodeSigningConfig", create_body, None, &[]).await;
        run(&s, "ListCodeSigningConfigs", "", None, &[]).await;
    }
}