1use base64::prelude::*;
2use hyper::server::conn::http1;
3use hyper::service::service_fn as hyper_service_fn;
4use hyper_util::rt::TokioIo;
5use serde::Serialize;
6use std::collections::VecDeque;
7use std::convert::Infallible;
8use std::env;
9use std::future::Future;
10use std::io::prelude::*;
11use std::net::SocketAddr;
12use std::os::unix::net::UnixStream;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::{Arc, Mutex};
16use std::task::{Context, Poll};
17use tokio::net::TcpListener;
18use tower::Service;
19
20pub use hyper::Response;
21pub use types::{Error, ResponseBody};
22pub type Request = hyper::Request<hyper::body::Incoming>;
23
24#[cfg(feature = "axum")]
25pub mod axum;
26
27mod ipc;
28mod types;
29use crate::ipc::core::{EndMessage, StartMessage};
30use crate::ipc::log::{Level, LogMessage};
31use crate::types::IntoFunctionResponse;
32
33static IPC_READY: AtomicBool = AtomicBool::new(false);
34static INIT_LOG_BUF_MAX_BYTES: usize = 1_000_000;
35
36lazy_static::lazy_static! {
37 static ref INIT_LOG_BUFFER: Arc<Mutex<(VecDeque<String>, usize)>> = {
38 register_exit_handler();
39 Arc::new(Mutex::new((VecDeque::new(), 0)))
40 };
41}
42
43fn register_exit_handler() {
45 extern "C" fn exit_handler() {
46 flush_init_log_buf_to_stderr();
47 }
48 unsafe {
49 libc::atexit(exit_handler);
50 }
51}
52
53#[derive(Clone)]
54pub struct LogContext {
55 ipc_stream: Option<Arc<Mutex<UnixStream>>>,
56 invocation_id: Option<String>,
57 request_id: Option<u64>,
58}
59
60impl LogContext {
61 pub fn new(
62 ipc_stream: Option<Arc<Mutex<UnixStream>>>,
63 invocation_id: Option<String>,
64 request_id: Option<u64>,
65 ) -> Self {
66 Self {
67 ipc_stream,
68 invocation_id,
69 request_id,
70 }
71 }
72
73 pub fn info(&self, msg: &str) {
74 self.log(Level::Info, msg);
75 }
76
77 pub fn error(&self, msg: &str) {
78 self.log(Level::Error, msg);
79 }
80
81 pub fn warn(&self, msg: &str) {
82 self.log(Level::Warn, msg);
83 }
84
85 pub fn debug(&self, msg: &str) {
86 self.log(Level::Debug, msg);
87 }
88
89 fn log(&self, level: Level, msg: &str) {
90 if let (Some(inv_id), Some(req_id)) = (&self.invocation_id, &self.request_id) {
91 let log = LogMessage::with_level(inv_id.clone(), *req_id, msg, level);
92 if let Err(_e) = enqueue_or_send_message(&self.ipc_stream, log) {
93 }
95 } else {
96 println!("{:?}: {}", level, msg);
98 }
99 }
100}
101
102#[derive(Clone)]
103pub struct AppState {
104 pub log_context: LogContext,
105}
106
107impl AppState {
108 pub fn new(log_context: LogContext) -> Self {
109 Self { log_context }
110 }
111}
112
113fn send_message<T: Serialize>(stream: &Arc<Mutex<UnixStream>>, message: T) -> Result<(), Error> {
114 let json_str = serde_json::to_string(&message)?;
115 let msg = format!("{json_str}\0");
116
117 let mut stream = stream.lock().map_err(|e| {
118 Box::new(std::io::Error::other(format!(
119 "Failed to acquire stream lock: {}",
120 e
121 ))) as Error
122 })?;
123
124 stream.write_all(msg.as_bytes())?;
125 stream.flush()?;
126 Ok(())
127}
128
129fn enqueue_or_send_message<T: Serialize>(
130 stream: &Option<Arc<Mutex<UnixStream>>>,
131 message: T,
132) -> Result<(), Error> {
133 if IPC_READY.load(Ordering::Relaxed)
134 && let Some(stream) = stream
135 {
136 return send_message(stream, message);
137 }
138
139 let json_str = serde_json::to_string(&message)?;
141 let msg_len = json_str.len();
142
143 if let Ok(mut buffer) = INIT_LOG_BUFFER.lock() {
144 if buffer.1 + msg_len <= INIT_LOG_BUF_MAX_BYTES {
145 buffer.0.push_back(json_str);
146 buffer.1 += msg_len;
147 } else {
148 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&json_str)
150 && let Some(payload) = parsed.get("payload")
151 && let Some(msg) = payload.get("message")
152 && let Some(msg_str) = msg.as_str()
153 && let Ok(decoded) = BASE64_STANDARD.decode(msg_str)
154 && let Ok(text) = String::from_utf8(decoded)
155 {
156 eprint!("{}", text);
157 }
158 }
159 }
160
161 Ok(())
162}
163
164fn flush_init_log_buffer(stream: &Option<Arc<Mutex<UnixStream>>>) {
165 if let Some(stream) = stream {
166 if let Ok(mut buffer) = INIT_LOG_BUFFER.lock() {
167 while let Some(json_str) = buffer.0.pop_front() {
168 if let Ok(message) = serde_json::from_str::<serde_json::Value>(&json_str)
169 && let Err(_e) = send_message(stream, message)
170 {
171 break;
173 }
174 }
175 buffer.1 = 0; }
177 } else {
178 flush_init_log_buf_to_stderr();
179 }
180}
181
182fn flush_init_log_buf_to_stderr() {
183 if let Ok(mut buffer) = INIT_LOG_BUFFER.lock() {
184 let mut combined: Vec<String> = Vec::new();
185
186 while let Some(json_str) = buffer.0.pop_front() {
187 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&json_str)
188 && let Some(payload) = parsed.get("payload")
189 && let Some(msg) = payload.get("message")
190 && let Some(msg_str) = msg.as_str()
191 && let Ok(decoded) = BASE64_STANDARD.decode(msg_str)
192 && let Ok(text) = String::from_utf8(decoded)
193 {
194 combined.push(text);
195 }
196 }
197
198 if !combined.is_empty() {
199 eprint!("{}", combined.join(""));
200 }
201
202 buffer.1 = 0;
203 }
204}
205
206pub trait Handler {
208 type Output: IntoFunctionResponse;
209 type Future: Future<Output = Self::Output> + Send + 'static;
210 fn call(&self, req: Request, state: AppState) -> Self::Future;
211}
212
213impl<F, Fut, R> Handler for F
215where
216 F: Fn(Request, AppState) -> Fut + Clone + Send + 'static,
217 Fut: Future<Output = R> + Send + 'static,
218 R: IntoFunctionResponse,
219{
220 type Output = R;
221 type Future = Fut;
222
223 fn call(&self, req: Request, state: AppState) -> Self::Future {
224 self(req, state)
225 }
226}
227
228#[derive(Clone)]
230pub struct StatelessHandler<F> {
231 f: F,
232}
233
234impl<F, Fut, R> Handler for StatelessHandler<F>
235where
236 F: Fn(Request) -> Fut + Clone + Send + 'static,
237 Fut: Future<Output = R> + Send + 'static,
238 R: IntoFunctionResponse,
239{
240 type Output = R;
241 type Future = Fut;
242
243 fn call(&self, req: Request, _state: AppState) -> Self::Future {
244 (self.f)(req)
245 }
246}
247
248pub trait IntoServiceFn<Args> {
250 type Handler: Handler;
251 fn into_service_fn(self) -> ServiceFn<Self::Handler>;
252}
253
254impl<F, Fut, R> IntoServiceFn<(Request, AppState)> for F
256where
257 F: Fn(Request, AppState) -> Fut + Clone + Send + 'static,
258 Fut: Future<Output = R> + Send + 'static,
259 R: IntoFunctionResponse,
260{
261 type Handler = F;
262
263 fn into_service_fn(self) -> ServiceFn<Self::Handler> {
264 ServiceFn { f: self }
265 }
266}
267
268impl<F, Fut, R> IntoServiceFn<(Request,)> for F
270where
271 F: Fn(Request) -> Fut + Clone + Send + 'static,
272 Fut: Future<Output = R> + Send + 'static,
273 R: IntoFunctionResponse,
274{
275 type Handler = StatelessHandler<F>;
276
277 fn into_service_fn(self) -> ServiceFn<Self::Handler> {
278 ServiceFn {
279 f: StatelessHandler { f: self },
280 }
281 }
282}
283
284pub fn service_fn<F, Args>(handler: F) -> ServiceFn<F::Handler>
286where
287 F: IntoServiceFn<Args>,
288{
289 handler.into_service_fn()
290}
291
292#[derive(Clone)]
294pub struct ServiceFn<H> {
295 f: H,
296}
297
298impl<H> Service<(AppState, Request)> for ServiceFn<H>
299where
300 H: Handler + Clone + Send + 'static,
301 H::Future: Send + 'static,
302 H::Output: Send + 'static,
303{
304 type Response = Response<ResponseBody>;
305 type Error = Error;
306 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
307
308 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
309 Poll::Ready(Ok(()))
310 }
311
312 fn call(&mut self, (state, req): (AppState, Request)) -> Self::Future {
313 let f = self.f.clone();
314 Box::pin(async move {
315 let result = f.call(req, state).await;
316 result.into_response()
317 })
318 }
319}
320
321pub async fn run<S>(service: S) -> Result<(), Error>
323where
324 S: tower::Service<
325 (AppState, hyper::Request<hyper::body::Incoming>),
326 Response = Response<ResponseBody>,
327 Error = Error,
328 > + Send
329 + Clone
330 + 'static,
331 S::Future: Send + 'static,
332{
333 let ipc_stream = match env::var("VERCEL_IPC_PATH") {
334 Ok(ipc_path) => match UnixStream::connect(ipc_path) {
335 Ok(stream) => Some(Arc::new(Mutex::new(stream))),
336 Err(_) => None,
337 },
338 Err(_) => None,
339 };
340
341 let port = 3000;
342 let addr = SocketAddr::from(([127, 0, 0, 1], port));
343 let listener = TcpListener::bind(addr).await?;
344
345 if let Some(ref ipc_stream_ref) = ipc_stream {
346 let start_message = StartMessage::new(0, port);
347 if let Err(_e) = send_message(ipc_stream_ref, start_message) {
348 } else {
350 IPC_READY.store(true, Ordering::Relaxed);
351 flush_init_log_buffer(&ipc_stream);
352 }
353 } else {
354 println!("Dev server listening: {}", port);
355 flush_init_log_buffer(&ipc_stream);
356 };
357
358 loop {
359 let (stream, _) = listener.accept().await?;
360 let io = TokioIo::new(stream);
361 let ipc_stream_clone = ipc_stream.clone();
362 let service_clone = service.clone();
363
364 tokio::task::spawn(async move {
365 if let Err(_e) = http1::Builder::new()
366 .keep_alive(true)
367 .half_close(true)
368 .serve_connection(
369 io,
370 hyper_service_fn(move |req| {
371 let ipc_stream_clone = ipc_stream_clone.clone();
372 let mut service_clone = service_clone.clone();
373
374 let invocation_id = req
376 .headers()
377 .get("x-vercel-internal-invocation-id")
378 .and_then(|v| v.to_str().ok())
379 .map(|s| s.to_owned());
380
381 let request_id = req
382 .headers()
383 .get("x-vercel-internal-request-id")
384 .and_then(|v| v.to_str().ok())
385 .and_then(|s| s.parse::<u64>().ok());
386
387 let app_state = AppState::new(LogContext::new(
388 ipc_stream_clone,
389 invocation_id.clone(),
390 request_id,
391 ));
392
393 async move {
394 let ipc_stream_for_end = app_state.log_context.ipc_stream.clone();
395
396 if req.uri().path() == "/_vercel/ping" {
397 let response = hyper::Response::builder()
398 .status(200)
399 .body(ResponseBody::from("OK"))
400 .unwrap();
401 return Ok::<_, Infallible>(response);
402 }
403
404 let response = match tower::ServiceExt::ready(&mut service_clone).await
405 {
406 Ok(ready_service) => {
407 match tower::Service::call(ready_service, (app_state, req))
408 .await
409 {
410 Ok(resp) => resp,
411 Err(_e) => {
412 hyper::Response::builder()
414 .status(500)
415 .header("connection", "close")
416 .body(ResponseBody::from("Internal Server Error"))
417 .unwrap()
418 }
419 }
420 }
421 Err(_e) => {
422 hyper::Response::builder()
424 .status(500)
425 .header("connection", "close")
426 .body(ResponseBody::from("Service Not Ready"))
427 .unwrap()
428 }
429 };
430
431 if let (Some(ipc_stream), Some(inv_id), Some(req_id)) =
432 (&ipc_stream_for_end, &invocation_id, &request_id)
433 {
434 let end_message = EndMessage::new(inv_id.clone(), *req_id, None);
435 if let Err(_e) = send_message(ipc_stream, end_message) {
436 }
438 }
439
440 Ok::<_, Infallible>(response)
441 }
442 }),
443 )
444 .await
445 {
446 }
448 });
449 }
450}