vercel_runtime/
lib.rs

1use base64::prelude::*;
2use hyper::server::conn::http1;
3use hyper::service::service_fn as hyper_service_fn;
4use hyper_util::rt::TokioIo;
5use serde::Serialize;
6use std::collections::VecDeque;
7use std::convert::Infallible;
8use std::env;
9use std::future::Future;
10use std::io::prelude::*;
11use std::net::SocketAddr;
12use std::os::unix::net::UnixStream;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::{Arc, Mutex};
16use std::task::{Context, Poll};
17use tokio::net::TcpListener;
18use tower::Service;
19
20pub use hyper::Response;
21pub use types::{Error, ResponseBody};
22pub type Request = hyper::Request<hyper::body::Incoming>;
23
24#[cfg(feature = "axum")]
25pub mod axum;
26
27mod ipc;
28mod types;
29use crate::ipc::core::{EndMessage, StartMessage};
30use crate::ipc::log::{Level, LogMessage};
31use crate::types::IntoFunctionResponse;
32
/// Flag set by `run` after the IPC start message has been sent successfully.
/// While it is `false`, log messages are buffered instead of written to IPC.
static IPC_READY: AtomicBool = AtomicBool::new(false);
/// Cap on the total length (in bytes of serialized JSON) held in the init log buffer.
static INIT_LOG_BUF_MAX_BYTES: usize = 1_000_000;

lazy_static::lazy_static! {
    // Queue of serialized log messages paired with the running sum of their lengths.
    // The first access also registers the process-exit hook that dumps any
    // leftover messages to stderr.
    static ref INIT_LOG_BUFFER: Arc<Mutex<(VecDeque<String>, usize)>> = {
        register_exit_handler();
        Arc::new(Mutex::new((VecDeque::new(), 0)))
    };
}
42
43// Register exit handler to flush buffered messages
44fn register_exit_handler() {
45    extern "C" fn exit_handler() {
46        flush_init_log_buf_to_stderr();
47    }
48    unsafe {
49        libc::atexit(exit_handler);
50    }
51}
52
53#[derive(Clone)]
54pub struct LogContext {
55    ipc_stream: Option<Arc<Mutex<UnixStream>>>,
56    invocation_id: Option<String>,
57    request_id: Option<u64>,
58}
59
60impl LogContext {
61    pub fn new(
62        ipc_stream: Option<Arc<Mutex<UnixStream>>>,
63        invocation_id: Option<String>,
64        request_id: Option<u64>,
65    ) -> Self {
66        Self {
67            ipc_stream,
68            invocation_id,
69            request_id,
70        }
71    }
72
73    pub fn info(&self, msg: &str) {
74        self.log(Level::Info, msg);
75    }
76
77    pub fn error(&self, msg: &str) {
78        self.log(Level::Error, msg);
79    }
80
81    pub fn warn(&self, msg: &str) {
82        self.log(Level::Warn, msg);
83    }
84
85    pub fn debug(&self, msg: &str) {
86        self.log(Level::Debug, msg);
87    }
88
89    fn log(&self, level: Level, msg: &str) {
90        if let (Some(inv_id), Some(req_id)) = (&self.invocation_id, &self.request_id) {
91            let log = LogMessage::with_level(inv_id.clone(), *req_id, msg, level);
92            if let Err(_e) = enqueue_or_send_message(&self.ipc_stream, log) {
93                // Failed to send or queue log message
94            }
95        } else {
96            // Fall back to regular println when no request context
97            println!("{:?}: {}", level, msg);
98        }
99    }
100}
101
102#[derive(Clone)]
103pub struct AppState {
104    pub log_context: LogContext,
105}
106
107impl AppState {
108    pub fn new(log_context: LogContext) -> Self {
109        Self { log_context }
110    }
111}
112
113fn send_message<T: Serialize>(stream: &Arc<Mutex<UnixStream>>, message: T) -> Result<(), Error> {
114    let json_str = serde_json::to_string(&message)?;
115    let msg = format!("{json_str}\0");
116
117    let mut stream = stream.lock().map_err(|e| {
118        Box::new(std::io::Error::other(format!(
119            "Failed to acquire stream lock: {}",
120            e
121        ))) as Error
122    })?;
123
124    stream.write_all(msg.as_bytes())?;
125    stream.flush()?;
126    Ok(())
127}
128
129fn enqueue_or_send_message<T: Serialize>(
130    stream: &Option<Arc<Mutex<UnixStream>>>,
131    message: T,
132) -> Result<(), Error> {
133    if IPC_READY.load(Ordering::Relaxed)
134        && let Some(stream) = stream
135    {
136        return send_message(stream, message);
137    }
138
139    // Buffer the message if IPC is not ready
140    let json_str = serde_json::to_string(&message)?;
141    let msg_len = json_str.len();
142
143    if let Ok(mut buffer) = INIT_LOG_BUFFER.lock() {
144        if buffer.1 + msg_len <= INIT_LOG_BUF_MAX_BYTES {
145            buffer.0.push_back(json_str);
146            buffer.1 += msg_len;
147        } else {
148            // Fallback to stderr if buffer is full - decode base64
149            if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&json_str)
150                && let Some(payload) = parsed.get("payload")
151                && let Some(msg) = payload.get("message")
152                && let Some(msg_str) = msg.as_str()
153                && let Ok(decoded) = BASE64_STANDARD.decode(msg_str)
154                && let Ok(text) = String::from_utf8(decoded)
155            {
156                eprint!("{}", text);
157            }
158        }
159    }
160
161    Ok(())
162}
163
164fn flush_init_log_buffer(stream: &Option<Arc<Mutex<UnixStream>>>) {
165    if let Some(stream) = stream {
166        if let Ok(mut buffer) = INIT_LOG_BUFFER.lock() {
167            while let Some(json_str) = buffer.0.pop_front() {
168                if let Ok(message) = serde_json::from_str::<serde_json::Value>(&json_str)
169                    && let Err(_e) = send_message(stream, message)
170                {
171                    // Failed to send buffered message
172                    break;
173                }
174            }
175            buffer.1 = 0; // Reset byte count
176        }
177    } else {
178        flush_init_log_buf_to_stderr();
179    }
180}
181
182fn flush_init_log_buf_to_stderr() {
183    if let Ok(mut buffer) = INIT_LOG_BUFFER.lock() {
184        let mut combined: Vec<String> = Vec::new();
185
186        while let Some(json_str) = buffer.0.pop_front() {
187            if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&json_str)
188                && let Some(payload) = parsed.get("payload")
189                && let Some(msg) = payload.get("message")
190                && let Some(msg_str) = msg.as_str()
191                && let Ok(decoded) = BASE64_STANDARD.decode(msg_str)
192                && let Ok(text) = String::from_utf8(decoded)
193            {
194                combined.push(text);
195            }
196        }
197
198        if !combined.is_empty() {
199            eprint!("{}", combined.join(""));
200        }
201
202        buffer.1 = 0;
203    }
204}
205
206/// Trait that abstracts over handler function signatures that return types implementing IntoFunctionResponse
207pub trait Handler {
208    type Output: IntoFunctionResponse;
209    type Future: Future<Output = Self::Output> + Send + 'static;
210    fn call(&self, req: Request, state: AppState) -> Self::Future;
211}
212
213/// Implementation for handlers that return IntoFunctionResponse types
214impl<F, Fut, R> Handler for F
215where
216    F: Fn(Request, AppState) -> Fut + Clone + Send + 'static,
217    Fut: Future<Output = R> + Send + 'static,
218    R: IntoFunctionResponse,
219{
220    type Output = R;
221    type Future = Fut;
222
223    fn call(&self, req: Request, state: AppState) -> Self::Future {
224        self(req, state)
225    }
226}
227
228/// Wrapper for stateless handlers that return IntoFunctionResponse types
229#[derive(Clone)]
230pub struct StatelessHandler<F> {
231    f: F,
232}
233
234impl<F, Fut, R> Handler for StatelessHandler<F>
235where
236    F: Fn(Request) -> Fut + Clone + Send + 'static,
237    Fut: Future<Output = R> + Send + 'static,
238    R: IntoFunctionResponse,
239{
240    type Output = R;
241    type Future = Fut;
242
243    fn call(&self, req: Request, _state: AppState) -> Self::Future {
244        (self.f)(req)
245    }
246}
247
248/// Service function creation trait for different handler signatures
249pub trait IntoServiceFn<Args> {
250    type Handler: Handler;
251    fn into_service_fn(self) -> ServiceFn<Self::Handler>;
252}
253
254/// Implementation for handlers that take Request and AppState
255impl<F, Fut, R> IntoServiceFn<(Request, AppState)> for F
256where
257    F: Fn(Request, AppState) -> Fut + Clone + Send + 'static,
258    Fut: Future<Output = R> + Send + 'static,
259    R: IntoFunctionResponse,
260{
261    type Handler = F;
262
263    fn into_service_fn(self) -> ServiceFn<Self::Handler> {
264        ServiceFn { f: self }
265    }
266}
267
268/// Implementation for handlers that only take Request (stateless)
269impl<F, Fut, R> IntoServiceFn<(Request,)> for F
270where
271    F: Fn(Request) -> Fut + Clone + Send + 'static,
272    Fut: Future<Output = R> + Send + 'static,
273    R: IntoFunctionResponse,
274{
275    type Handler = StatelessHandler<F>;
276
277    fn into_service_fn(self) -> ServiceFn<Self::Handler> {
278        ServiceFn {
279            f: StatelessHandler { f: self },
280        }
281    }
282}
283
284/// Creates a Tower service from a handler function
285pub fn service_fn<F, Args>(handler: F) -> ServiceFn<F::Handler>
286where
287    F: IntoServiceFn<Args>,
288{
289    handler.into_service_fn()
290}
291
292/// A Tower service wrapper around a handler function
293#[derive(Clone)]
294pub struct ServiceFn<H> {
295    f: H,
296}
297
298impl<H> Service<(AppState, Request)> for ServiceFn<H>
299where
300    H: Handler + Clone + Send + 'static,
301    H::Future: Send + 'static,
302    H::Output: Send + 'static,
303{
304    type Response = Response<ResponseBody>;
305    type Error = Error;
306    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
307
308    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
309        Poll::Ready(Ok(()))
310    }
311
312    fn call(&mut self, (state, req): (AppState, Request)) -> Self::Future {
313        let f = self.f.clone();
314        Box::pin(async move {
315            let result = f.call(req, state).await;
316            result.into_response()
317        })
318    }
319}
320
/// Run a Tower service with Vercel's runtime
///
/// Startup:
/// 1. If `VERCEL_IPC_PATH` is set and a Unix socket connection to it succeeds,
///    that stream is used for IPC; otherwise the server runs without IPC.
/// 2. A TCP listener is bound to `127.0.0.1:3000`.
/// 3. With IPC, a `StartMessage` is sent; on success `IPC_READY` is set and the
///    buffered init logs are forwarded. Without IPC, a line is printed to
///    stdout and the buffered init logs are dumped to stderr.
///
/// Serving: each accepted connection is handled on its own spawned task using
/// HTTP/1 with keep-alive and half-close enabled. For every request the
/// `x-vercel-internal-invocation-id` and `x-vercel-internal-request-id`
/// headers are read into a `LogContext`, the service is invoked, and, when
/// IPC and both ids are available, an `EndMessage` is sent before the
/// response is handed back to hyper. Requests to `/_vercel/ping` are answered
/// with `200 OK` directly, without calling the service or sending an end message.
///
/// # Errors
/// Returns an error if binding the listener or accepting a connection fails.
/// Otherwise the accept loop never returns.
pub async fn run<S>(service: S) -> Result<(), Error>
where
    S: tower::Service<
            (AppState, hyper::Request<hyper::body::Incoming>),
            Response = Response<ResponseBody>,
            Error = Error,
        > + Send
        + Clone
        + 'static,
    S::Future: Send + 'static,
{
    // A missing variable and a failed connect both result in "no IPC".
    let ipc_stream = match env::var("VERCEL_IPC_PATH") {
        Ok(ipc_path) => match UnixStream::connect(ipc_path) {
            Ok(stream) => Some(Arc::new(Mutex::new(stream))),
            Err(_) => None,
        },
        Err(_) => None,
    };

    let port = 3000;
    let addr = SocketAddr::from(([127, 0, 0, 1], port));
    let listener = TcpListener::bind(addr).await?;

    if let Some(ref ipc_stream_ref) = ipc_stream {
        let start_message = StartMessage::new(0, port);
        if let Err(_e) = send_message(ipc_stream_ref, start_message) {
            // Failed to send start message to IPC
            // (IPC_READY stays false, so later log messages keep being buffered.)
        } else {
            // Mark IPC as usable first, then forward whatever was buffered so far.
            IPC_READY.store(true, Ordering::Relaxed);
            flush_init_log_buffer(&ipc_stream);
        }
    } else {
        println!("Dev server listening: {}", port);
        // No stream: this call dumps the buffered messages to stderr.
        flush_init_log_buffer(&ipc_stream);
    };

    loop {
        // An accept error propagates out of `run` and ends the server.
        let (stream, _) = listener.accept().await?;
        let io = TokioIo::new(stream);
        let ipc_stream_clone = ipc_stream.clone();
        let service_clone = service.clone();

        // One task per connection; connection-level errors are ignored.
        tokio::task::spawn(async move {
            if let Err(_e) = http1::Builder::new()
                .keep_alive(true)
                .half_close(true)
                .serve_connection(
                    io,
                    hyper_service_fn(move |req| {
                        // Fresh clones per request: the closure may be called
                        // many times on a keep-alive connection.
                        let ipc_stream_clone = ipc_stream_clone.clone();
                        let mut service_clone = service_clone.clone();

                        // Extract information for IPC before calling handler
                        let invocation_id = req
                            .headers()
                            .get("x-vercel-internal-invocation-id")
                            .and_then(|v| v.to_str().ok())
                            .map(|s| s.to_owned());

                        // A header that is missing or not a valid u64 yields `None`.
                        let request_id = req
                            .headers()
                            .get("x-vercel-internal-request-id")
                            .and_then(|v| v.to_str().ok())
                            .and_then(|s| s.parse::<u64>().ok());

                        let app_state = AppState::new(LogContext::new(
                            ipc_stream_clone,
                            invocation_id.clone(),
                            request_id,
                        ));

                        async move {
                            // Keep a handle to the stream: `app_state` is moved
                            // into the service call below.
                            let ipc_stream_for_end = app_state.log_context.ipc_stream.clone();

                            // Health check: answered here, bypassing the service
                            // and the end-message step.
                            if req.uri().path() == "/_vercel/ping" {
                                let response = hyper::Response::builder()
                                    .status(200)
                                    .body(ResponseBody::from("OK"))
                                    .unwrap();
                                return Ok::<_, Infallible>(response);
                            }

                            // Both failure modes are turned into a 500 response that
                            // asks the client to close the connection.
                            let response = match tower::ServiceExt::ready(&mut service_clone).await
                            {
                                Ok(ready_service) => {
                                    match tower::Service::call(ready_service, (app_state, req))
                                        .await
                                    {
                                        Ok(resp) => resp,
                                        Err(_e) => {
                                            // Service error
                                            hyper::Response::builder()
                                                .status(500)
                                                .header("connection", "close")
                                                .body(ResponseBody::from("Internal Server Error"))
                                                .unwrap()
                                        }
                                    }
                                }
                                Err(_e) => {
                                    // Service not ready
                                    hyper::Response::builder()
                                        .status(500)
                                        .header("connection", "close")
                                        .body(ResponseBody::from("Service Not Ready"))
                                        .unwrap()
                                }
                            };

                            // The end message is only sent when the IPC stream and both
                            // ids are present; a failed send is ignored.
                            if let (Some(ipc_stream), Some(inv_id), Some(req_id)) =
                                (&ipc_stream_for_end, &invocation_id, &request_id)
                            {
                                let end_message = EndMessage::new(inv_id.clone(), *req_id, None);
                                if let Err(_e) = send_message(ipc_stream, end_message) {
                                    // Failed to send end message
                                }
                            }

                            Ok::<_, Infallible>(response)
                        }
                    }),
                )
                .await
            {
                // Error serving connection
            }
        });
    }
}