lambda_simulator/
extensions_api.rs1use crate::extension::{ExtensionState, LifecycleEvent, RegisterRequest};
7use crate::extension_readiness::ExtensionReadinessTracker;
8use crate::simulator::SimulatorConfig;
9use crate::state::RuntimeState;
10use axum::{
11 Json, Router,
12 extract::State,
13 http::{HeaderMap, HeaderValue, StatusCode},
14 response::{IntoResponse, Response},
15 routing::{get, post},
16};
17use std::sync::Arc;
18
19#[derive(Clone)]
21pub(crate) struct ExtensionsApiState {
22 pub extensions: Arc<ExtensionState>,
23 pub readiness: Arc<ExtensionReadinessTracker>,
24 pub config: Arc<SimulatorConfig>,
25 pub runtime: Arc<RuntimeState>,
26}
27
28pub(crate) fn create_extensions_api_router(state: ExtensionsApiState) -> Router {
38 Router::new()
39 .route("/2020-01-01/extension/register", post(register_extension))
40 .route("/2020-01-01/extension/event/next", get(next_event))
41 .with_state(state)
42}
43
44async fn register_extension(
52 State(state): State<ExtensionsApiState>,
53 headers: HeaderMap,
54 Json(request): Json<RegisterRequest>,
55) -> Response {
56 if state.runtime.is_initialized().await {
57 return (
58 StatusCode::FORBIDDEN,
59 "Extension registration is only allowed during initialization phase",
60 )
61 .into_response();
62 }
63
64 let extension_name = match headers.get("Lambda-Extension-Name") {
65 Some(name) => match name.to_str() {
66 Ok(s) => s.to_string(),
67 Err(_) => {
68 return (
69 StatusCode::BAD_REQUEST,
70 "Invalid Lambda-Extension-Name header",
71 )
72 .into_response();
73 }
74 },
75 None => {
76 return (
77 StatusCode::BAD_REQUEST,
78 "Missing Lambda-Extension-Name header",
79 )
80 .into_response();
81 }
82 };
83
84 let extension = state
85 .extensions
86 .register(extension_name.clone(), request.events.clone())
87 .await;
88
89 let events_str = request
90 .events
91 .iter()
92 .map(|e| format!("{:?}", e))
93 .collect::<Vec<_>>()
94 .join(", ");
95 tracing::info!(target: "lambda_lifecycle", "🔌 Extension registered: {} (events: {})", extension_name, events_str);
96
97 let mut response_headers = HeaderMap::new();
98
99 if let Ok(id) = HeaderValue::from_str(&extension.id) {
100 response_headers.insert("Lambda-Extension-Identifier", id);
101 }
102
103 if let Ok(name) = HeaderValue::from_str(&state.config.function_name) {
104 response_headers.insert("Lambda-Extension-Function-Name", name);
105 }
106
107 if let Ok(version) = HeaderValue::from_str(&state.config.function_version) {
108 response_headers.insert("Lambda-Extension-Function-Version", version);
109 }
110
111 let response_body = serde_json::json!({
113 "functionName": state.config.function_name,
114 "functionVersion": state.config.function_version,
115 "handler": state.config.handler.clone().unwrap_or_else(|| "handler".to_string()),
116 "accountId": state.config.account_id.clone().unwrap_or_else(|| "123456789012".to_string()),
117 "logGroupName": state.config.log_group_name,
118 "logStreamName": state.config.log_stream_name,
119 });
120
121 (StatusCode::OK, response_headers, Json(response_body)).into_response()
122}
123
124async fn next_event(State(state): State<ExtensionsApiState>, headers: HeaderMap) -> Response {
137 use crate::simulator::SimulatorPhase;
138
139 let extension_id = match headers.get("Lambda-Extension-Identifier") {
140 Some(id) => match id.to_str() {
141 Ok(s) => s.to_string(),
142 Err(_) => {
143 return (
144 StatusCode::BAD_REQUEST,
145 "Invalid Lambda-Extension-Identifier header",
146 )
147 .into_response();
148 }
149 },
150 None => {
151 return (
152 StatusCode::BAD_REQUEST,
153 "Missing Lambda-Extension-Identifier header",
154 )
155 .into_response();
156 }
157 };
158
159 match state.extensions.get_extension(&extension_id).await {
160 Some(ext) => {
161 state.readiness.mark_extension_ready(&extension_id).await;
162 tracing::info!(target: "lambda_lifecycle", "⏳ Extension polling /next: {} (waiting)", ext.name);
163
164 match state.extensions.next_event(&extension_id).await {
165 Some(event) => {
166 let is_shutdown = matches!(event, LifecycleEvent::Shutdown { .. });
167 let is_invoke = matches!(event, LifecycleEvent::Invoke { .. });
168
169 if is_invoke {
170 tracing::info!(target: "lambda_lifecycle", "📨 Extension received INVOKE: {}", ext.name);
171 } else if is_shutdown {
172 tracing::info!(target: "lambda_lifecycle", "🛑 Extension received SHUTDOWN: {}", ext.name);
173 state
174 .extensions
175 .mark_shutdown_acknowledged(&extension_id)
176 .await;
177 }
178
179 Json(&event).into_response()
180 }
181 None => {
182 if state.runtime.get_phase().await == SimulatorPhase::ShuttingDown {
183 state
184 .extensions
185 .mark_shutdown_acknowledged(&extension_id)
186 .await;
187 }
188 (StatusCode::INTERNAL_SERVER_ERROR, "Extension not found").into_response()
189 }
190 }
191 }
192 None => (StatusCode::FORBIDDEN, "Extension not registered").into_response(),
193 }
194}