use awsim_core::{AwsError, RequestContext};
use serde_json::{Value, json};
use tracing::{debug, warn};
use crate::{
error::resource_not_found,
executor,
state::{InvocationRecord, InvocationSlot, LambdaState},
util::{new_uuid, now_iso8601, opt_str, require_str},
};
fn map_io_err(e: std::io::Error) -> AwsError {
AwsError::internal(format!("read function code: {e}"))
}
thread_local! {
static INVOCATION_STACK: std::cell::RefCell<Vec<String>> =
const { std::cell::RefCell::new(Vec::new()) };
}
struct RecursionFrame;
impl RecursionFrame {
fn push(name: &str) -> Self {
INVOCATION_STACK.with(|s| s.borrow_mut().push(name.to_string()));
RecursionFrame
}
fn contains(name: &str) -> bool {
INVOCATION_STACK.with(|s| s.borrow().iter().any(|n| n == name))
}
fn snapshot() -> Vec<String> {
INVOCATION_STACK.with(|s| s.borrow().clone())
}
}
impl Drop for RecursionFrame {
fn drop(&mut self) {
INVOCATION_STACK.with(|s| {
s.borrow_mut().pop();
});
}
}
fn too_many_requests(function_name: &str, current: u32, cap: u32) -> AwsError {
AwsError::too_many_requests(
"TooManyRequestsException",
format!(
"Rate Exceeded: function `{function_name}` has {current} in-flight \
invocation(s), at its ReservedConcurrentExecutions cap of {cap}."
),
)
.with_extra(
"Reason",
serde_json::Value::String("ConcurrentInvocationLimitExceeded".into()),
)
}
fn extract_zip(zip_bytes: &[u8], dest: &std::path::Path) -> Result<(), String> {
use std::io::Read;
std::fs::create_dir_all(dest).map_err(|e| format!("create_dir_all failed: {e}"))?;
let cursor = std::io::Cursor::new(zip_bytes);
let mut archive = zip::ZipArchive::new(cursor).map_err(|e| format!("zip open failed: {e}"))?;
for i in 0..archive.len() {
let mut entry = archive
.by_index(i)
.map_err(|e| format!("zip entry {i}: {e}"))?;
let entry_path = dest.join(entry.name());
if entry.is_dir() {
std::fs::create_dir_all(&entry_path)
.map_err(|e| format!("mkdir {}: {e}", entry_path.display()))?;
} else {
if let Some(parent) = entry_path.parent() {
std::fs::create_dir_all(parent).map_err(|e| format!("mkdir parent: {e}"))?;
}
let mut data = Vec::new();
entry
.read_to_end(&mut data)
.map_err(|e| format!("read entry: {e}"))?;
std::fs::write(&entry_path, &data)
.map_err(|e| format!("write {}: {e}", entry_path.display()))?;
}
}
Ok(())
}
fn ensure_code_dir(
function_name: &str,
code_data: &[u8],
code_sha256: &str,
) -> Result<std::path::PathBuf, String> {
let cache_dir = std::env::temp_dir()
.join("awsim-lambda")
.join(function_name)
.join("code");
let stamp_path = cache_dir.join(".awsim_sha256");
if cache_dir.exists() {
if let Ok(existing) = std::fs::read_to_string(&stamp_path)
&& existing.trim() == code_sha256
{
debug!(function_name, "Using cached code directory");
return Ok(cache_dir);
}
std::fs::remove_dir_all(&cache_dir).map_err(|e| format!("remove stale cache: {e}"))?;
}
debug!(function_name, "Extracting zip to cache directory");
extract_zip(code_data, &cache_dir)?;
std::fs::write(&stamp_path, code_sha256).map_err(|e| format!("write sha stamp: {e}"))?;
Ok(cache_dir)
}
pub fn invoke(
state: &LambdaState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let name = require_str(input, "FunctionName")?;
let invocation_type = opt_str(input, "InvocationType").unwrap_or("RequestResponse");
let payload = input.get("Payload").cloned().unwrap_or(json!({}));
let function_info = {
let f = state
.functions
.get(name)
.ok_or_else(|| resource_not_found("function", name))?;
let code_bytes = f
.code
.as_ref()
.map(|c| c.read_all())
.transpose()
.map_err(map_io_err)?;
(
f.runtime.clone(),
f.handler.clone(),
code_bytes,
f.code_sha256.clone(),
f.environment.clone(),
f.timeout,
f.memory_size,
f.reserved_concurrent_executions,
f.recursive_loop.clone(),
)
};
let (
runtime,
handler,
code_data,
code_sha256,
env_vars,
timeout,
memory_size,
reserved_concurrent_executions,
recursive_loop,
) = function_info;
if invocation_type != "DryRun" && recursive_loop != "Allow" && RecursionFrame::contains(name) {
let chain = RecursionFrame::snapshot().join(" -> ");
return Err(AwsError::bad_request(
"InvalidRequestContentException",
format!(
"Function `{name}` is already on the invocation chain ({chain}); \
RecursiveLoop=Terminate refuses to re-invoke."
),
));
}
if invocation_type == "DryRun" {
return Ok(json!({
"StatusCode": 204u64,
"__status_code": 204u64,
}));
}
let _recursion_frame = if invocation_type != "DryRun" {
Some(RecursionFrame::push(name))
} else {
None
};
let concurrency_slot = if let Some(cap) = reserved_concurrent_executions {
let current = InvocationSlot::current(state, name);
if current >= cap {
return Err(too_many_requests(name, current, cap));
}
let counter = InvocationSlot::acquire(state, name);
Some(InvocationSlot::from_acquired(counter))
} else {
None
};
let request_id = new_uuid();
let mut invocation_env = env_vars.clone();
invocation_env
.entry("AWS_LAMBDA_FUNCTION_NAME".to_string())
.or_insert_with(|| name.to_string());
invocation_env
.entry("AWS_LAMBDA_FUNCTION_MEMORY_SIZE".to_string())
.or_insert_with(|| memory_size.to_string());
invocation_env
.entry("AWS_REQUEST_ID".to_string())
.or_insert_with(|| request_id.clone());
let event_json = serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string());
if invocation_type == "Event" {
if let (Some(rt), Some(hndlr), Some(data)) =
(runtime.as_deref(), handler.as_deref(), code_data.as_deref())
{
match ensure_code_dir(name, data, &code_sha256) {
Ok(code_dir) => {
let rt = rt.to_string();
let hndlr = hndlr.to_string();
let env = invocation_env.clone();
let async_slot = concurrency_slot;
std::thread::spawn(move || {
let _ = executor::execute_function(
&rt,
&hndlr,
&code_dir,
&event_json,
&env,
timeout,
);
drop(async_slot);
});
}
Err(e) => {
warn!(
error = %e,
function_name = name,
"Async invoke: failed to prepare code; dropping"
);
}
}
}
return Ok(json!({
"StatusCode": 202u64,
"__status_code": 202u64,
"__headers": { "X-Awsim-Memory-MB": memory_size.to_string() },
}));
}
let (response_payload, exec_error, exec_logs) = if let (Some(rt), Some(hndlr), Some(data)) =
(runtime.as_deref(), handler.as_deref(), code_data.as_deref())
{
match ensure_code_dir(name, data, &code_sha256) {
Ok(code_dir) => {
let result = executor::execute_function(
rt,
hndlr,
&code_dir,
&event_json,
&invocation_env,
timeout,
);
let parsed_payload: Value = serde_json::from_str(&result.payload)
.unwrap_or(Value::String(result.payload.clone()));
(parsed_payload, result.error, result.logs)
}
Err(e) => {
warn!(error = %e, function_name = name, "Failed to extract function code");
let err_payload = json!({
"errorMessage": format!("Failed to prepare function code: {}", e),
"errorType": "ServiceException"
});
(
err_payload,
Some("ServiceException".to_string()),
String::new(),
)
}
}
} else {
warn!(
function_name = name,
runtime = ?runtime,
handler = ?handler,
has_code = code_data.is_some(),
"Falling back to mock response (no executable code)"
);
(
json!({ "statusCode": 200, "body": "{}" }),
None,
String::new(),
)
};
let status_code: u16 = 200;
let record = InvocationRecord {
invocation_id: request_id,
invocation_type: invocation_type.to_string(),
payload: payload.clone(),
response: response_payload.clone(),
status_code,
timestamp: now_iso8601(),
};
if let Some(mut f) = state.functions.get_mut(name) {
f.invocations.push(record);
if f.invocations.len() > 1000 {
f.invocations.remove(0);
}
}
let mut response = json!({
"StatusCode": 200u64,
"Payload": response_payload,
"__status_code": 200u64,
});
let mut headers = serde_json::Map::new();
headers.insert(
"X-Awsim-Memory-MB".to_string(),
Value::String(memory_size.to_string()),
);
if let Some(err_type) = exec_error {
response["FunctionError"] = Value::String(err_type.clone());
headers.insert("X-Amz-Function-Error".to_string(), Value::String(err_type));
}
if opt_str(input, "LogType").is_some_and(|v| v.eq_ignore_ascii_case("Tail")) {
use base64::Engine as _;
const TAIL_BYTES: usize = 4096;
let tail = if exec_logs.len() > TAIL_BYTES {
let cut = exec_logs.len() - TAIL_BYTES;
let mut start = cut;
while start < exec_logs.len() && !exec_logs.is_char_boundary(start) {
start += 1;
}
&exec_logs[start..]
} else {
exec_logs.as_str()
};
let encoded = base64::engine::general_purpose::STANDARD.encode(tail.as_bytes());
response["LogResult"] = Value::String(encoded.clone());
headers.insert("X-Amz-Log-Result".to_string(), Value::String(encoded));
}
response["__headers"] = Value::Object(headers);
Ok(response)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::operations::functions::create_function;
fn ctx() -> RequestContext {
RequestContext::new("lambda", "us-east-1")
}
fn empty_zip_b64() -> String {
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
let bytes: [u8; 22] = [
0x50, 0x4b, 0x05, 0x06, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
BASE64.encode(bytes)
}
fn create_test_fn(state: &LambdaState) {
create_function(
state,
&json!({
"FunctionName": "f",
"Role": "arn:aws:iam::000000000000:role/test",
"Code": { "ZipFile": empty_zip_b64() },
}),
&ctx(),
)
.unwrap();
}
#[test]
fn event_invocation_returns_202_with_no_payload() {
let state = LambdaState::default();
create_test_fn(&state);
let resp = invoke(
&state,
&json!({
"FunctionName": "f",
"InvocationType": "Event",
"Payload": {"hello": "world"},
}),
&ctx(),
)
.unwrap();
assert_eq!(resp["StatusCode"], json!(202));
assert_eq!(resp["__status_code"], json!(202));
assert!(resp.get("Payload").is_none());
assert!(resp.get("FunctionError").is_none());
}
#[test]
fn event_invocation_does_not_record_invocation_history() {
let state = LambdaState::default();
create_test_fn(&state);
invoke(
&state,
&json!({
"FunctionName": "f",
"InvocationType": "Event",
"Payload": {},
}),
&ctx(),
)
.unwrap();
let f = state.functions.get("f").unwrap();
assert!(f.invocations.is_empty());
}
#[test]
fn dry_run_invocation_returns_204_with_no_payload() {
let state = LambdaState::default();
create_test_fn(&state);
let resp = invoke(
&state,
&json!({
"FunctionName": "f",
"InvocationType": "DryRun",
}),
&ctx(),
)
.unwrap();
assert_eq!(resp["StatusCode"], json!(204));
assert_eq!(resp["__status_code"], json!(204));
assert!(resp.get("Payload").is_none());
}
#[test]
fn request_response_invocation_returns_200_with_payload() {
let state = LambdaState::default();
create_test_fn(&state);
let resp = invoke(
&state,
&json!({
"FunctionName": "f",
"InvocationType": "RequestResponse",
"Payload": {},
}),
&ctx(),
)
.unwrap();
assert_eq!(resp["StatusCode"], json!(200));
assert!(resp.get("Payload").is_some());
}
#[test]
fn log_type_tail_emits_log_result_field_and_header() {
let state = LambdaState::default();
create_test_fn(&state);
let resp = invoke(
&state,
&json!({
"FunctionName": "f",
"InvocationType": "RequestResponse",
"LogType": "Tail",
"Payload": {},
}),
&ctx(),
)
.unwrap();
let body_log = resp.get("LogResult").and_then(Value::as_str).unwrap();
let header_log = resp["__headers"]["X-Amz-Log-Result"].as_str().unwrap();
assert_eq!(body_log, header_log);
}
#[test]
fn log_type_tail_omitted_when_not_requested() {
let state = LambdaState::default();
create_test_fn(&state);
let resp = invoke(
&state,
&json!({
"FunctionName": "f",
"InvocationType": "RequestResponse",
"Payload": {},
}),
&ctx(),
)
.unwrap();
assert!(resp.get("LogResult").is_none());
assert!(resp["__headers"].get("X-Amz-Log-Result").is_none());
}
#[test]
fn recursive_loop_terminate_blocks_self_invoke_chain() {
let state = LambdaState::default();
create_test_fn(&state);
let _outer = RecursionFrame::push("f");
let err = invoke(
&state,
&json!({ "FunctionName": "f", "InvocationType": "RequestResponse" }),
&ctx(),
)
.unwrap_err();
assert_eq!(err.code, "InvalidRequestContentException");
assert!(
err.message.contains("RecursiveLoop=Terminate"),
"error must explain the recursion gate: {}",
err.message
);
}
#[test]
fn recursive_loop_allow_permits_self_invoke() {
let state = LambdaState::default();
create_test_fn(&state);
state.functions.get_mut("f").unwrap().recursive_loop = "Allow".to_string();
let _outer = RecursionFrame::push("f");
invoke(
&state,
&json!({ "FunctionName": "f", "InvocationType": "RequestResponse" }),
&ctx(),
)
.unwrap();
}
#[test]
fn recursion_frame_pops_on_return() {
let state = LambdaState::default();
create_test_fn(&state);
invoke(
&state,
&json!({ "FunctionName": "f", "InvocationType": "RequestResponse" }),
&ctx(),
)
.unwrap();
assert!(RecursionFrame::snapshot().is_empty());
}
#[test]
fn reserved_concurrency_caps_invocations_at_429() {
let state = LambdaState::default();
create_test_fn(&state);
state
.functions
.get_mut("f")
.unwrap()
.reserved_concurrent_executions = Some(1);
let counter = InvocationSlot::acquire(&state, "f");
let err = invoke(
&state,
&json!({
"FunctionName": "f",
"InvocationType": "RequestResponse",
}),
&ctx(),
)
.unwrap_err();
assert_eq!(err.code, "TooManyRequestsException");
let extras = err.extras.as_ref().expect("extras populated");
assert_eq!(
extras.get("Reason").and_then(|v| v.as_str()),
Some("ConcurrentInvocationLimitExceeded")
);
drop(InvocationSlot::from_acquired(counter));
invoke(
&state,
&json!({
"FunctionName": "f",
"InvocationType": "RequestResponse",
}),
&ctx(),
)
.unwrap();
}
#[test]
fn reserved_concurrency_unset_allows_unbounded_invokes() {
let state = LambdaState::default();
create_test_fn(&state);
for _ in 0..10 {
invoke(
&state,
&json!({ "FunctionName": "f", "InvocationType": "RequestResponse" }),
&ctx(),
)
.unwrap();
}
}
#[test]
fn reserved_concurrency_slot_decrements_on_completion() {
let state = LambdaState::default();
create_test_fn(&state);
state
.functions
.get_mut("f")
.unwrap()
.reserved_concurrent_executions = Some(2);
invoke(
&state,
&json!({ "FunctionName": "f", "InvocationType": "RequestResponse" }),
&ctx(),
)
.unwrap();
assert_eq!(InvocationSlot::current(&state, "f"), 0);
}
#[test]
fn dry_run_does_not_count_against_concurrency() {
let state = LambdaState::default();
create_test_fn(&state);
state
.functions
.get_mut("f")
.unwrap()
.reserved_concurrent_executions = Some(1);
let _occupied = InvocationSlot::from_acquired(InvocationSlot::acquire(&state, "f"));
let resp = invoke(
&state,
&json!({ "FunctionName": "f", "InvocationType": "DryRun" }),
&ctx(),
)
.unwrap();
assert_eq!(resp["StatusCode"], json!(204));
}
}