// rivet_adapter_lambda/adapter.rs

use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::{Arc, Mutex, OnceLock, PoisonError};

use lambda_http::{
    run, service_fn, Body, Error as LambdaError, Request as LambdaRequest,
    Response as LambdaResponse,
};
use rivet_core::{set_current, Dispatcher};
use rivet_http::Response;

use crate::converters::{AdapterLambdaResponse, AdapterRequest, AdapterRequestInput};
use crate::error::AdapterError;
use crate::options::AdapterOptions;
14
15type DispatcherBuilder =
16    dyn Fn() -> Result<Arc<dyn Dispatcher>, AdapterError> + Send + Sync + 'static;
17
18struct AdapterConfig {
19    builder: Arc<DispatcherBuilder>,
20    options: AdapterOptions,
21}
22
23static ADAPTER_CONFIG: OnceLock<AdapterConfig> = OnceLock::new();
24static APP: OnceLock<Mutex<Option<Arc<dyn Dispatcher>>>> = OnceLock::new();
25
26/// Configure the Lambda adapter with a dispatcher builder and default options.
27///
28/// This can only be called once per process. Additional calls return
29/// [`AdapterError::AlreadyConfigured`].
30pub fn configure<F>(builder: F) -> Result<(), AdapterError>
31where
32    F: Fn() -> Result<Arc<dyn Dispatcher>, AdapterError> + Send + Sync + 'static,
33{
34    configure_with_options(builder, AdapterOptions::default())
35}
36
37/// Configure the Lambda adapter with a dispatcher builder and explicit options.
38///
39/// The builder is invoked lazily on first request and the resulting dispatcher
40/// is cached for subsequent invocations in the same execution environment.
41pub fn configure_with_options<F>(builder: F, options: AdapterOptions) -> Result<(), AdapterError>
42where
43    F: Fn() -> Result<Arc<dyn Dispatcher>, AdapterError> + Send + Sync + 'static,
44{
45    ADAPTER_CONFIG
46        .set(AdapterConfig {
47            builder: Arc::new(builder),
48            options,
49        })
50        .map_err(|_| AdapterError::already_configured())
51}
52
53/// Start the Lambda runtime event loop using [`handle`] as the request handler.
54pub async fn serve() -> Result<(), LambdaError> {
55    run(service_fn(handle)).await
56}
57
58/// Handle a single Lambda invocation request.
59///
60/// Request conversion failures are mapped to rivet responses:
61/// - unsupported methods => `501`
62/// - body limit exceeded => `413`
63pub async fn handle(req: LambdaRequest) -> Result<LambdaResponse<Body>, LambdaError> {
64    let config = match ADAPTER_CONFIG.get() {
65        Some(config) => config,
66        None => {
67            return AdapterLambdaResponse::try_from(Response::internal_error())
68                .map(LambdaResponse::from);
69        }
70    };
71
72    let request: rivet_http::Request = match AdapterRequest::try_from(AdapterRequestInput {
73        request: &req,
74        body_limit: config.options.body_limit,
75    })
76    .map(rivet_http::Request::from)
77    {
78        Ok(req) => req,
79        Err(error) => {
80            return AdapterLambdaResponse::try_from(Response::from(error)).map(LambdaResponse::from)
81        }
82    };
83
84    let app = match dispatcher() {
85        Ok(app) => app,
86        Err(_) => {
87            return AdapterLambdaResponse::try_from(Response::internal_error())
88                .map(LambdaResponse::from);
89        }
90    };
91
92    let response = match catch_unwind(AssertUnwindSafe(|| app.dispatch(request))) {
93        Ok(res) => res,
94        Err(_) => Response::internal_error(),
95    };
96
97    AdapterLambdaResponse::try_from(response).map(LambdaResponse::from)
98}
99
100/// Resolve or initialize the cached dispatcher for the current execution environment.
101///
102/// The configured builder is executed at most once per process and the result
103/// is reused for warm invocations.
104fn dispatcher() -> Result<Arc<dyn Dispatcher>, AdapterError> {
105    let config = ADAPTER_CONFIG
106        .get()
107        .ok_or_else(AdapterError::not_configured)?;
108
109    let cache = APP.get_or_init(|| Mutex::new(None));
110
111    let mut guard = cache
112        .lock()
113        .map_err(|_| AdapterError::initialization("application cache lock poisoned"))?;
114
115    if let Some(app) = guard.as_ref() {
116        return Ok(Arc::clone(app));
117    }
118
119    let app = (config.builder)().map_err(|err| AdapterError::initialization(err.to_string()))?;
120    let _ = set_current(Arc::clone(&app));
121    *guard = Some(Arc::clone(&app));
122
123    Ok(app)
124}