vercel_runtime/
lib.rs

1use serde::{Deserialize, Serialize};
2use std::convert::Infallible;
3use std::env;
4use std::future::Future;
5use std::io::prelude::*;
6use std::net::SocketAddr;
7use std::os::unix::net::UnixStream;
8use std::sync::{Arc, Mutex};
9
10use http_body_util::BodyExt;
11use hyper::body::Bytes;
12use hyper::server::conn::http1;
13use hyper::service::service_fn;
14use hyper_util::rt::TokioIo;
15use tokio::net::TcpListener;
16
17#[derive(Serialize, Deserialize, Debug)]
18pub struct RequestContext {
19    #[serde(rename = "invocationId")]
20    pub invocation_id: String,
21    #[serde(rename = "requestId")]
22    pub request_id: u64,
23}
24
25#[derive(Serialize, Deserialize, Debug)]
26pub struct StartMessage {
27    #[serde(rename = "type")]
28    pub message_type: String,
29    pub payload: StartPayload,
30}
31
32#[derive(Serialize, Deserialize, Debug)]
33pub struct StartPayload {
34    #[serde(rename = "initDuration")]
35    pub init_duration: u64,
36    #[serde(rename = "httpPort")]
37    pub http_port: u16,
38}
39
40impl StartMessage {
41    pub fn new(init_duration: u64, http_port: u16) -> Self {
42        Self {
43            message_type: "server-started".to_string(),
44            payload: StartPayload {
45                init_duration,
46                http_port,
47            },
48        }
49    }
50}
51
52#[derive(Serialize, Deserialize, Debug)]
53pub struct EndMessage {
54    #[serde(rename = "type")]
55    pub message_type: String,
56    pub payload: EndPayload,
57}
58
59#[derive(Serialize, Deserialize, Debug)]
60pub struct EndPayload {
61    pub context: RequestContext,
62    pub error: Option<serde_json::Value>,
63}
64
65impl EndMessage {
66    pub fn new(invocation_id: String, request_id: u64, error: Option<serde_json::Value>) -> Self {
67        Self {
68            message_type: "end".to_string(),
69            payload: EndPayload {
70                context: RequestContext {
71                    invocation_id,
72                    request_id,
73                },
74                error,
75            },
76        }
77    }
78}
79
80#[derive(Serialize, Deserialize, Debug)]
81pub struct MetricMessage {
82    #[serde(rename = "type")]
83    pub message_type: String,
84    pub payload: MetricPayload,
85}
86
87#[derive(Serialize, Deserialize, Debug)]
88pub struct MetricPayload {
89    pub context: RequestContext,
90    #[serde(rename = "type")]
91    pub metric_type: Option<String>,
92    #[serde(rename = "payload")]
93    pub metric_payload: Option<serde_json::Value>,
94}
95
96impl MetricMessage {
97    pub fn new(
98        invocation_id: String,
99        request_id: u64,
100        metric_type: Option<String>,
101        metric_payload: Option<serde_json::Value>,
102    ) -> Self {
103        Self {
104            message_type: "metric".to_string(),
105            payload: MetricPayload {
106                context: RequestContext {
107                    invocation_id,
108                    request_id,
109                },
110                metric_type,
111                metric_payload,
112            },
113        }
114    }
115}
116
117#[derive(Serialize, Deserialize, Debug)]
118#[serde(rename_all = "lowercase")]
119pub enum Stream {
120    Stdout,
121    Stderr,
122}
123
124#[derive(Serialize, Deserialize, Debug)]
125#[serde(rename_all = "lowercase")]
126pub enum Level {
127    Trace,
128    Debug,
129    Info,
130    Warn,
131    Error,
132}
133
134#[derive(Serialize, Deserialize, Debug)]
135#[serde(untagged)]
136pub enum LogType {
137    Stream { stream: Stream },
138    Level { level: Level },
139}
140
141#[derive(Serialize, Deserialize, Debug)]
142pub struct LogPayload {
143    pub context: RequestContext,
144    pub message: String,
145    #[serde(flatten)]
146    pub log_type: LogType,
147}
148
149#[derive(Serialize, Deserialize, Debug)]
150pub struct LogMessage {
151    #[serde(rename = "type")]
152    pub message_type: String,
153    pub payload: LogPayload,
154}
155
156impl LogMessage {
157    pub fn stream(invocation_id: String, request_id: u64, message: String, stream: Stream) -> Self {
158        Self {
159            message_type: "log".to_string(),
160            payload: LogPayload {
161                context: RequestContext {
162                    invocation_id,
163                    request_id,
164                },
165                message,
166                log_type: LogType::Stream { stream },
167            },
168        }
169    }
170
171    pub fn level(invocation_id: String, request_id: u64, message: String, level: Level) -> Self {
172        Self {
173            message_type: "log".to_string(),
174            payload: LogPayload {
175                context: RequestContext {
176                    invocation_id,
177                    request_id,
178                },
179                message,
180                log_type: LogType::Level { level },
181            },
182        }
183    }
184
185    pub fn encode_message(message: &str) -> String {
186        use base64::Engine;
187        use base64::engine::general_purpose::STANDARD as BASE64_ENCODER;
188        BASE64_ENCODER.encode(message)
189    }
190
191    pub fn with_stream(
192        invocation_id: String,
193        request_id: u64,
194        message: &str,
195        stream: Stream,
196    ) -> Self {
197        Self::stream(
198            invocation_id,
199            request_id,
200            Self::encode_message(message),
201            stream,
202        )
203    }
204
205    pub fn with_level(invocation_id: String, request_id: u64, message: &str, level: Level) -> Self {
206        Self::level(
207            invocation_id,
208            request_id,
209            Self::encode_message(message),
210            level,
211        )
212    }
213}
214
215pub type ResponseBody = http_body_util::combinators::BoxBody<Bytes, Error>;
216pub type Error = Box<dyn std::error::Error + Send + Sync>;
217pub use hyper::Response;
218pub type Request = hyper::Request<hyper::body::Incoming>;
219pub struct ResponseBuilder;
220
221impl ResponseBuilder {
222    #[allow(clippy::new_ret_no_self)]
223    pub fn new() -> hyper::http::response::Builder {
224        hyper::Response::builder()
225    }
226}
227
228/// Trait for automatic body conversion to ResponseBody
229pub trait IntoResponseBody {
230    fn into_response_body(self) -> ResponseBody;
231}
232
233impl IntoResponseBody for &str {
234    fn into_response_body(self) -> ResponseBody {
235        http_body_util::Full::new(Bytes::from(self.to_string()))
236            .map_err(|e| Box::new(e) as Error)
237            .boxed()
238    }
239}
240
241impl IntoResponseBody for String {
242    fn into_response_body(self) -> ResponseBody {
243        http_body_util::Full::new(Bytes::from(self))
244            .map_err(|e| Box::new(e) as Error)
245            .boxed()
246    }
247}
248
249impl IntoResponseBody for Bytes {
250    fn into_response_body(self) -> ResponseBody {
251        http_body_util::Full::new(self)
252            .map_err(|e| Box::new(e) as Error)
253            .boxed()
254    }
255}
256
257impl IntoResponseBody for http_body_util::Full<Bytes> {
258    fn into_response_body(self) -> ResponseBody {
259        self.map_err(|e| Box::new(e) as Error).boxed()
260    }
261}
262
263impl<T> IntoResponseBody for http_body_util::StreamBody<T>
264where
265    T: tokio_stream::Stream<Item = Result<hyper::body::Frame<Bytes>, Error>>
266        + Send
267        + Sync
268        + 'static,
269{
270    fn into_response_body(self) -> ResponseBody {
271        self.boxed()
272    }
273}
274
275#[derive(Clone)]
276pub struct LogContext {
277    ipc_stream: Option<Arc<Mutex<UnixStream>>>,
278    invocation_id: Option<String>,
279    request_id: Option<u64>,
280}
281
282impl LogContext {
283    pub fn new(
284        ipc_stream: Option<Arc<Mutex<UnixStream>>>,
285        invocation_id: Option<String>,
286        request_id: Option<u64>,
287    ) -> Self {
288        Self {
289            ipc_stream,
290            invocation_id,
291            request_id,
292        }
293    }
294
295    pub fn info(&self, msg: &str) {
296        self.log(Level::Info, msg);
297    }
298
299    pub fn error(&self, msg: &str) {
300        self.log(Level::Error, msg);
301    }
302
303    pub fn warn(&self, msg: &str) {
304        self.log(Level::Warn, msg);
305    }
306
307    pub fn debug(&self, msg: &str) {
308        self.log(Level::Debug, msg);
309    }
310
311    fn log(&self, level: Level, msg: &str) {
312        if let (Some(ipc_stream), Some(inv_id), Some(req_id)) =
313            (&self.ipc_stream, &self.invocation_id, &self.request_id)
314        {
315            let log = LogMessage::with_level(inv_id.clone(), *req_id, msg, level);
316            send_message(ipc_stream, log).unwrap_or_else(|e| {
317                eprintln!("Failed to send log message: {}", e);
318            });
319        } else {
320            // Fall back to regular println when IPC is not available
321            println!("{:?}: {}", level, msg);
322        }
323    }
324}
325
326#[derive(Clone)]
327pub struct AppState {
328    pub log_context: LogContext,
329}
330
331impl AppState {
332    pub fn new(log_context: LogContext) -> Self {
333        Self { log_context }
334    }
335}
336
337pub fn send_message<T: Serialize>(
338    stream: &Arc<Mutex<UnixStream>>,
339    message: T,
340) -> Result<(), Error> {
341    let mut stream = stream.lock().unwrap();
342    let json_str = serde_json::to_string(&message)?;
343    let msg = format!("{json_str}\0");
344    stream.write_all(msg.as_bytes())?;
345    Ok(())
346}
347
348pub async fn run<H, F>(handler: H) -> Result<(), Error>
349where
350    H: Fn(AppState, hyper::Request<hyper::body::Incoming>) -> F + Send + Sync + 'static + Copy,
351    F: Future<Output = Result<Response<ResponseBody>, Error>> + Send + 'static,
352{
353    let ipc_stream = match env::var("VERCEL_IPC_PATH") {
354        Ok(ipc_path) => match UnixStream::connect(ipc_path) {
355            Ok(stream) => Some(Arc::new(Mutex::new(stream))),
356            Err(e) => {
357                eprintln!(
358                    "Warning: Failed to connect to IPC stream: {}. Running without IPC support.",
359                    e
360                );
361                None
362            }
363        },
364        Err(_) => {
365            // No IPC available (dev mode like Bun)
366            None
367        }
368    };
369
370    let port = 3000;
371    let addr = SocketAddr::from(([127, 0, 0, 1], port));
372    let listener = TcpListener::bind(addr).await?;
373
374    // Send IPC start message
375    if let Some(ref ipc_stream) = ipc_stream {
376        let start_message = StartMessage::new(0, port);
377        send_message(ipc_stream, start_message)?;
378    } else {
379        // If we couldn't find an IPC stream, we are in `vercel dev` mode,
380        // Print to stdout for dev server to parse (see ./start-dev-server.ts)
381        println!("Dev server listening: {}", port);
382    };
383
384    loop {
385        let (stream, _) = listener.accept().await?;
386
387        let io = TokioIo::new(stream);
388        let ipc_stream_clone = ipc_stream.clone();
389
390        tokio::task::spawn(async move {
391            if let Err(err) = http1::Builder::new()
392                .serve_connection(
393                    io,
394                    service_fn(move |req| {
395                        let ipc_stream_clone = ipc_stream_clone.clone();
396
397                        // Extract information for IPC before calling handler
398                        let invocation_id = req
399                            .headers()
400                            .get("x-vercel-internal-invocation-id")
401                            .and_then(|v| v.to_str().ok())
402                            .map(|s| s.to_owned());
403
404                        let request_id = req
405                            .headers()
406                            .get("x-vercel-internal-request-id")
407                            .and_then(|v| v.to_str().ok())
408                            .and_then(|s| s.parse::<u64>().ok());
409
410                        let app_state = AppState::new(LogContext::new(
411                            ipc_stream_clone,
412                            invocation_id.clone(),
413                            request_id,
414                        ));
415
416                        async move {
417                            let ipc_stream_for_end = app_state.log_context.ipc_stream.clone();
418
419                            if req.uri().path() == "/_vercel/ping" {
420                                let response = hyper::Response::builder()
421                                    .status(200)
422                                    .body("OK".into_response_body())
423                                    .unwrap();
424                                return Ok::<_, Infallible>(response);
425                            }
426
427                            let response = match handler(app_state, req).await {
428                                Ok(resp) => resp,
429                                Err(e) => {
430                                    eprintln!("Handler error: {}", e);
431                                    let error_body = http_body_util::Full::new(Bytes::from(
432                                        "Internal Server Error",
433                                    ));
434                                    hyper::Response::builder()
435                                        .status(500)
436                                        .body(error_body.map_err(|e| Box::new(e) as Error).boxed())
437                                        .unwrap()
438                                }
439                            };
440
441                            if let (Some(ipc_stream), Some(inv_id), Some(req_id)) =
442                                (&ipc_stream_for_end, &invocation_id, &request_id)
443                            {
444                                let end_message = EndMessage::new(inv_id.clone(), *req_id, None);
445                                send_message(ipc_stream, end_message).unwrap_or_else(|e| {
446                                    eprintln!("Failed to send end message: {}", e);
447                                });
448                            }
449
450                            Ok::<_, Infallible>(response)
451                        }
452                    }),
453                )
454                .await
455            {
456                eprintln!("Error serving connection: {:?}", err);
457            }
458        });
459    }
460}