rivet_adapter_lambda/
adapter.rs1use std::panic::{catch_unwind, AssertUnwindSafe};
2use std::sync::{Arc, Mutex, OnceLock};
3
4use rivet_core::{set_current, Dispatcher};
5use rivet_http::Response;
6use lambda_http::{
7 run, service_fn, Body, Error as LambdaError, Request as LambdaRequest,
8 Response as LambdaResponse,
9};
10
11use crate::converters::{AdapterLambdaResponse, AdapterRequest, AdapterRequestInput};
12use crate::error::AdapterError;
13use crate::options::AdapterOptions;
14
15type DispatcherBuilder =
16 dyn Fn() -> Result<Arc<dyn Dispatcher>, AdapterError> + Send + Sync + 'static;
17
18struct AdapterConfig {
19 builder: Arc<DispatcherBuilder>,
20 options: AdapterOptions,
21}
22
23static ADAPTER_CONFIG: OnceLock<AdapterConfig> = OnceLock::new();
24static APP: OnceLock<Mutex<Option<Arc<dyn Dispatcher>>>> = OnceLock::new();
25
26pub fn configure<F>(builder: F) -> Result<(), AdapterError>
31where
32 F: Fn() -> Result<Arc<dyn Dispatcher>, AdapterError> + Send + Sync + 'static,
33{
34 configure_with_options(builder, AdapterOptions::default())
35}
36
37pub fn configure_with_options<F>(builder: F, options: AdapterOptions) -> Result<(), AdapterError>
42where
43 F: Fn() -> Result<Arc<dyn Dispatcher>, AdapterError> + Send + Sync + 'static,
44{
45 ADAPTER_CONFIG
46 .set(AdapterConfig {
47 builder: Arc::new(builder),
48 options,
49 })
50 .map_err(|_| AdapterError::already_configured())
51}
52
53pub async fn serve() -> Result<(), LambdaError> {
55 run(service_fn(handle)).await
56}
57
58pub async fn handle(req: LambdaRequest) -> Result<LambdaResponse<Body>, LambdaError> {
64 let config = match ADAPTER_CONFIG.get() {
65 Some(config) => config,
66 None => {
67 return AdapterLambdaResponse::try_from(Response::internal_error())
68 .map(LambdaResponse::from);
69 }
70 };
71
72 let request: rivet_http::Request = match AdapterRequest::try_from(AdapterRequestInput {
73 request: &req,
74 body_limit: config.options.body_limit,
75 })
76 .map(rivet_http::Request::from)
77 {
78 Ok(req) => req,
79 Err(error) => {
80 return AdapterLambdaResponse::try_from(Response::from(error)).map(LambdaResponse::from)
81 }
82 };
83
84 let app = match dispatcher() {
85 Ok(app) => app,
86 Err(_) => {
87 return AdapterLambdaResponse::try_from(Response::internal_error())
88 .map(LambdaResponse::from);
89 }
90 };
91
92 let response = match catch_unwind(AssertUnwindSafe(|| app.dispatch(request))) {
93 Ok(res) => res,
94 Err(_) => Response::internal_error(),
95 };
96
97 AdapterLambdaResponse::try_from(response).map(LambdaResponse::from)
98}
99
100fn dispatcher() -> Result<Arc<dyn Dispatcher>, AdapterError> {
105 let config = ADAPTER_CONFIG
106 .get()
107 .ok_or_else(AdapterError::not_configured)?;
108
109 let cache = APP.get_or_init(|| Mutex::new(None));
110
111 let mut guard = cache
112 .lock()
113 .map_err(|_| AdapterError::initialization("application cache lock poisoned"))?;
114
115 if let Some(app) = guard.as_ref() {
116 return Ok(Arc::clone(app));
117 }
118
119 let app = (config.builder)().map_err(|err| AdapterError::initialization(err.to_string()))?;
120 let _ = set_current(Arc::clone(&app));
121 *guard = Some(Arc::clone(&app));
122
123 Ok(app)
124}