lambda_simulator/
extensions_api.rs

1//! Lambda Extensions API HTTP endpoints implementation.
2//!
3//! Implements the Lambda Extensions API as documented at:
4//! <https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html>
5
6use crate::extension::{ExtensionState, LifecycleEvent, RegisterRequest};
7use crate::extension_readiness::ExtensionReadinessTracker;
8use crate::simulator::SimulatorConfig;
9use crate::state::RuntimeState;
10use axum::{
11    Json, Router,
12    extract::State,
13    http::{HeaderMap, HeaderValue, StatusCode},
14    response::{IntoResponse, Response},
15    routing::{get, post},
16};
17use std::sync::Arc;
18
19/// Shared state for Extensions API handlers.
20#[derive(Clone)]
21pub(crate) struct ExtensionsApiState {
22    pub extensions: Arc<ExtensionState>,
23    pub readiness: Arc<ExtensionReadinessTracker>,
24    pub config: Arc<SimulatorConfig>,
25    pub runtime: Arc<RuntimeState>,
26}
27
28/// Creates the Extensions API router.
29///
30/// # Arguments
31///
32/// * `state` - Shared extensions API state
33///
34/// # Returns
35///
36/// An axum router configured with all Extensions API endpoints.
37pub(crate) fn create_extensions_api_router(state: ExtensionsApiState) -> Router {
38    Router::new()
39        .route("/2020-01-01/extension/register", post(register_extension))
40        .route("/2020-01-01/extension/event/next", get(next_event))
41        .with_state(state)
42}
43
44/// POST /2020-01-01/extension/register
45///
46/// Registers an extension with the Lambda environment.
47///
48/// Per the Lambda Extensions API specification, extensions can only register
49/// during the initialization phase. Any registration attempts after the runtime
50/// has called `/next` for the first time will be rejected.
51async fn register_extension(
52    State(state): State<ExtensionsApiState>,
53    headers: HeaderMap,
54    Json(request): Json<RegisterRequest>,
55) -> Response {
56    if state.runtime.is_initialized().await {
57        return (
58            StatusCode::FORBIDDEN,
59            "Extension registration is only allowed during initialization phase",
60        )
61            .into_response();
62    }
63
64    let extension_name = match headers.get("Lambda-Extension-Name") {
65        Some(name) => match name.to_str() {
66            Ok(s) => s.to_string(),
67            Err(_) => {
68                return (
69                    StatusCode::BAD_REQUEST,
70                    "Invalid Lambda-Extension-Name header",
71                )
72                    .into_response();
73            }
74        },
75        None => {
76            return (
77                StatusCode::BAD_REQUEST,
78                "Missing Lambda-Extension-Name header",
79            )
80                .into_response();
81        }
82    };
83
84    let extension = state
85        .extensions
86        .register(extension_name.clone(), request.events.clone())
87        .await;
88
89    let events_str = request
90        .events
91        .iter()
92        .map(|e| format!("{:?}", e))
93        .collect::<Vec<_>>()
94        .join(", ");
95    tracing::info!(target: "lambda_lifecycle", "🔌 Extension registered: {} (events: {})", extension_name, events_str);
96
97    let mut response_headers = HeaderMap::new();
98
99    if let Ok(id) = HeaderValue::from_str(&extension.id) {
100        response_headers.insert("Lambda-Extension-Identifier", id);
101    }
102
103    if let Ok(name) = HeaderValue::from_str(&state.config.function_name) {
104        response_headers.insert("Lambda-Extension-Function-Name", name);
105    }
106
107    if let Ok(version) = HeaderValue::from_str(&state.config.function_version) {
108        response_headers.insert("Lambda-Extension-Function-Version", version);
109    }
110
111    // The lambda_extension crate expects these fields in the response body
112    let response_body = serde_json::json!({
113        "functionName": state.config.function_name,
114        "functionVersion": state.config.function_version,
115        "handler": state.config.handler.clone().unwrap_or_else(|| "handler".to_string()),
116        "accountId": state.config.account_id.clone().unwrap_or_else(|| "123456789012".to_string()),
117        "logGroupName": state.config.log_group_name,
118        "logStreamName": state.config.log_stream_name,
119    });
120
121    (StatusCode::OK, response_headers, Json(response_body)).into_response()
122}
123
124/// GET /2020-01-01/extension/event/next
125///
126/// Retrieves the next lifecycle event for an extension.
127/// This is a long-poll endpoint that blocks until an event is available.
128///
129/// When an extension polls this endpoint, it signals that the extension has
130/// completed its post-invocation work and is ready for the next event.
131/// This is used to track extension readiness for the lifecycle coordination.
132///
133/// During shutdown, if an extension has already received the SHUTDOWN event,
134/// polling this endpoint again signals that the extension has completed its
135/// cleanup work (shutdown acknowledgment).
136async fn next_event(State(state): State<ExtensionsApiState>, headers: HeaderMap) -> Response {
137    use crate::simulator::SimulatorPhase;
138
139    let extension_id = match headers.get("Lambda-Extension-Identifier") {
140        Some(id) => match id.to_str() {
141            Ok(s) => s.to_string(),
142            Err(_) => {
143                return (
144                    StatusCode::BAD_REQUEST,
145                    "Invalid Lambda-Extension-Identifier header",
146                )
147                    .into_response();
148            }
149        },
150        None => {
151            return (
152                StatusCode::BAD_REQUEST,
153                "Missing Lambda-Extension-Identifier header",
154            )
155                .into_response();
156        }
157    };
158
159    match state.extensions.get_extension(&extension_id).await {
160        Some(ext) => {
161            state.readiness.mark_extension_ready(&extension_id).await;
162            tracing::info!(target: "lambda_lifecycle", "⏳ Extension polling /next: {} (waiting)", ext.name);
163
164            match state.extensions.next_event(&extension_id).await {
165                Some(event) => {
166                    let is_shutdown = matches!(event, LifecycleEvent::Shutdown { .. });
167                    let is_invoke = matches!(event, LifecycleEvent::Invoke { .. });
168
169                    if is_invoke {
170                        tracing::info!(target: "lambda_lifecycle", "📨 Extension received INVOKE: {}", ext.name);
171                    } else if is_shutdown {
172                        tracing::info!(target: "lambda_lifecycle", "🛑 Extension received SHUTDOWN: {}", ext.name);
173                        state
174                            .extensions
175                            .mark_shutdown_acknowledged(&extension_id)
176                            .await;
177                    }
178
179                    Json(&event).into_response()
180                }
181                None => {
182                    if state.runtime.get_phase().await == SimulatorPhase::ShuttingDown {
183                        state
184                            .extensions
185                            .mark_shutdown_acknowledged(&extension_id)
186                            .await;
187                    }
188                    (StatusCode::INTERNAL_SERVER_ERROR, "Extension not found").into_response()
189                }
190            }
191        }
192        None => (StatusCode::FORBIDDEN, "Extension not registered").into_response(),
193    }
194}