1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
//! `LambdaService` `stream` family — extracted from service.rs by audit-2026-05-19.
use super::*;
impl LambdaService {
/// `InvokeWithResponseStream` — invoke the function and serialize
/// its response as a sequence of `application/vnd.amazon.eventstream`
/// frames. AWS uses this protocol for response-streaming Lambda
/// invocations (Node.js `awslambda.streamifyResponse`, Python
/// streaming handlers, custom runtimes that flush mid-handler).
///
/// On success: zero or more `PayloadChunk` events (one per chunk
/// the RIE flushed) followed by an `InvokeComplete` event with
/// `ErrorCode = null`. On a function error (uncaught exception in
/// the handler) or an infrastructure error (timeout, container
/// crash): an `InvokeComplete` with non-null `ErrorCode`/
/// `ErrorDetails`. The HTTP status itself is always 200 — failures
/// surface inside the trailing event, matching AWS.
pub(super) async fn invoke_with_response_stream(
&self,
function_name: &str,
account_id: &str,
req: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
// Resolve the function under the same rules as buffered Invoke
// — qualifier, version snapshots, attached layers, code-zip
// presence — but without the InvocationType branch (streaming
// is always synchronous).
let qualifier = req.query_params.get("Qualifier").map(String::as_str);
let resolved_version: Option<String> = {
let accounts = self.state.read();
let empty = LambdaState::new(account_id, "");
let state = accounts.get(account_id).unwrap_or(&empty);
crate::service::resolve_qualifier_to_version(state, function_name, qualifier)
};
let executed_version = resolved_version
.clone()
.unwrap_or_else(|| "$LATEST".to_string());
let (func, layer_zips) = {
let accounts = self.state.read();
let empty = LambdaState::new(account_id, "");
let state = accounts.get(account_id).unwrap_or(&empty);
let func = match resolved_version.as_deref() {
Some(v) => state
.function_version_snapshots
.get(function_name)
.and_then(|m| m.get(v))
.cloned()
.or_else(|| state.functions.get(function_name).cloned()),
None => state.functions.get(function_name).cloned(),
}
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"ResourceNotFoundException",
format!(
"Function not found: arn:aws:lambda:{}:{}:function:{}",
state.region, state.account_id, function_name
),
)
})?;
let mut zips: Vec<Vec<u8>> = Vec::with_capacity(func.layers.len());
for attached in &func.layers {
if let Some(b) =
parse_layer_version_arn(&attached.arn).and_then(|(acct, name, ver)| {
accounts
.get(&acct)
.and_then(|s| s.layers.get(&name))
.and_then(|l| l.versions.iter().find(|v| v.version == ver))
.and_then(|v| v.code_zip.clone())
})
{
zips.push(b);
}
}
(func, zips)
};
if func.code_zip.is_none() && func.package_type != "Image" {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterValueException",
"Function has no deployment package",
));
}
let runtime = self.runtime.as_ref().ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::INTERNAL_SERVER_ERROR,
"ServiceException",
"Docker/Podman is required for Lambda execution but is not available",
)
})?;
// Drive the streaming RIE call and assemble the eventstream
// body. We buffer all frames before returning — `AwsResponse`
// is byte-buffered today — but the chunk boundaries the RIE
// flushed are preserved as separate `PayloadChunk` events, so
// SDK parsers see exactly the streaming structure they expect.
let mut frames: Vec<u8> = Vec::new();
let invoke_result = runtime
.invoke_streaming(&func, &req.body, &layer_zips)
.await;
let (error_code, error_details) = match invoke_result {
Ok(mut stream) => {
let mut last_chunk: Option<bytes::Bytes> = None;
let mut had_chunks = false;
loop {
match stream.next_chunk().await {
Ok(Some(chunk)) => {
had_chunks = true;
frames.extend_from_slice(&crate::eventstream::payload_chunk_frame(
&chunk,
));
last_chunk = Some(chunk);
}
Ok(None) => break,
Err(e) => {
// Stream-invoke contract: errors are encoded
// in-stream as a terminal InvokeComplete frame
// with HTTP 200. Returning HTTP 500 here breaks
// SDK parsers expecting that envelope.
tracing::error!(function = %function_name, error = %e, "Lambda streaming chunk read failed");
let detail = format!("Lambda streaming read failed: {e}");
frames.extend_from_slice(&crate::eventstream::invoke_complete_frame(
Some("Runtime.StreamReadFailure"),
Some(&detail),
"",
));
return Ok(AwsResponse {
status: StatusCode::OK,
headers: http::HeaderMap::new(),
body: bytes::Bytes::from(frames).into(),
content_type: "application/vnd.amazon.eventstream".to_string(),
});
}
}
}
// The Lambda runtime returns 200 even when the user
// handler threw, packing `errorMessage`/`errorType`
// into the buffered body. Streaming handlers do the
// same on the final chunk. Inspect the last chunk we
// saw and surface that as a function error in the
// terminal `InvokeComplete` event.
let mut error: Option<(String, String)> = None;
if had_chunks {
if let Some(bytes) = last_chunk {
if let Ok(v) = serde_json::from_slice::<Value>(&bytes) {
if let Some(obj) = v.as_object() {
if obj.contains_key("errorMessage") || obj.contains_key("errorType")
{
let etype = obj
.get("errorType")
.and_then(|x| x.as_str())
.unwrap_or("Runtime.Unknown")
.to_string();
let emsg = obj
.get("errorMessage")
.and_then(|x| x.as_str())
.unwrap_or("function error")
.to_string();
error = Some((etype, emsg));
}
}
}
}
}
match error {
Some((code, details)) => (Some(code), Some(details)),
None => (None, None),
}
}
Err(e) => {
tracing::error!(function = %function_name, error = %e, "Lambda streaming invocation failed");
(
Some("Runtime.InvocationFailure".to_string()),
Some(e.to_string()),
)
}
};
frames.extend_from_slice(&crate::eventstream::invoke_complete_frame(
error_code.as_deref(),
error_details.as_deref(),
"",
));
let mut resp = AwsResponse {
status: StatusCode::OK,
content_type: "application/vnd.amazon.eventstream".to_string(),
body: fakecloud_core::service::ResponseBody::Bytes(bytes::Bytes::from(frames)),
headers: http::HeaderMap::new(),
};
if let Ok(v) = http::HeaderValue::from_str(&executed_version) {
resp.headers
.insert(http::HeaderName::from_static("x-amz-executed-version"), v);
}
Ok(resp)
}
}