1use serde::{Deserialize, Serialize};
2use std::convert::Infallible;
3use std::env;
4use std::future::Future;
5use std::io::prelude::*;
6use std::net::SocketAddr;
7use std::os::unix::net::UnixStream;
8use std::sync::{Arc, Mutex};
9
10use http_body_util::BodyExt;
11use hyper::body::Bytes;
12use hyper::server::conn::http1;
13use hyper::service::service_fn;
14use hyper_util::rt::TokioIo;
15use tokio::net::TcpListener;
16
17#[derive(Serialize, Deserialize, Debug)]
18pub struct RequestContext {
19 #[serde(rename = "invocationId")]
20 pub invocation_id: String,
21 #[serde(rename = "requestId")]
22 pub request_id: u64,
23}
24
25#[derive(Serialize, Deserialize, Debug)]
26pub struct StartMessage {
27 #[serde(rename = "type")]
28 pub message_type: String,
29 pub payload: StartPayload,
30}
31
32#[derive(Serialize, Deserialize, Debug)]
33pub struct StartPayload {
34 #[serde(rename = "initDuration")]
35 pub init_duration: u64,
36 #[serde(rename = "httpPort")]
37 pub http_port: u16,
38}
39
40impl StartMessage {
41 pub fn new(init_duration: u64, http_port: u16) -> Self {
42 Self {
43 message_type: "server-started".to_string(),
44 payload: StartPayload {
45 init_duration,
46 http_port,
47 },
48 }
49 }
50}
51
52#[derive(Serialize, Deserialize, Debug)]
53pub struct EndMessage {
54 #[serde(rename = "type")]
55 pub message_type: String,
56 pub payload: EndPayload,
57}
58
59#[derive(Serialize, Deserialize, Debug)]
60pub struct EndPayload {
61 pub context: RequestContext,
62 pub error: Option<serde_json::Value>,
63}
64
65impl EndMessage {
66 pub fn new(invocation_id: String, request_id: u64, error: Option<serde_json::Value>) -> Self {
67 Self {
68 message_type: "end".to_string(),
69 payload: EndPayload {
70 context: RequestContext {
71 invocation_id,
72 request_id,
73 },
74 error,
75 },
76 }
77 }
78}
79
80#[derive(Serialize, Deserialize, Debug)]
81pub struct MetricMessage {
82 #[serde(rename = "type")]
83 pub message_type: String,
84 pub payload: MetricPayload,
85}
86
87#[derive(Serialize, Deserialize, Debug)]
88pub struct MetricPayload {
89 pub context: RequestContext,
90 #[serde(rename = "type")]
91 pub metric_type: Option<String>,
92 #[serde(rename = "payload")]
93 pub metric_payload: Option<serde_json::Value>,
94}
95
96impl MetricMessage {
97 pub fn new(
98 invocation_id: String,
99 request_id: u64,
100 metric_type: Option<String>,
101 metric_payload: Option<serde_json::Value>,
102 ) -> Self {
103 Self {
104 message_type: "metric".to_string(),
105 payload: MetricPayload {
106 context: RequestContext {
107 invocation_id,
108 request_id,
109 },
110 metric_type,
111 metric_payload,
112 },
113 }
114 }
115}
116
117#[derive(Serialize, Deserialize, Debug)]
118#[serde(rename_all = "lowercase")]
119pub enum Stream {
120 Stdout,
121 Stderr,
122}
123
124#[derive(Serialize, Deserialize, Debug)]
125#[serde(rename_all = "lowercase")]
126pub enum Level {
127 Trace,
128 Debug,
129 Info,
130 Warn,
131 Error,
132}
133
134#[derive(Serialize, Deserialize, Debug)]
135#[serde(untagged)]
136pub enum LogType {
137 Stream { stream: Stream },
138 Level { level: Level },
139}
140
141#[derive(Serialize, Deserialize, Debug)]
142pub struct LogPayload {
143 pub context: RequestContext,
144 pub message: String,
145 #[serde(flatten)]
146 pub log_type: LogType,
147}
148
149#[derive(Serialize, Deserialize, Debug)]
150pub struct LogMessage {
151 #[serde(rename = "type")]
152 pub message_type: String,
153 pub payload: LogPayload,
154}
155
156impl LogMessage {
157 pub fn stream(invocation_id: String, request_id: u64, message: String, stream: Stream) -> Self {
158 Self {
159 message_type: "log".to_string(),
160 payload: LogPayload {
161 context: RequestContext {
162 invocation_id,
163 request_id,
164 },
165 message,
166 log_type: LogType::Stream { stream },
167 },
168 }
169 }
170
171 pub fn level(invocation_id: String, request_id: u64, message: String, level: Level) -> Self {
172 Self {
173 message_type: "log".to_string(),
174 payload: LogPayload {
175 context: RequestContext {
176 invocation_id,
177 request_id,
178 },
179 message,
180 log_type: LogType::Level { level },
181 },
182 }
183 }
184
185 pub fn encode_message(message: &str) -> String {
186 use base64::Engine;
187 use base64::engine::general_purpose::STANDARD as BASE64_ENCODER;
188 BASE64_ENCODER.encode(message)
189 }
190
191 pub fn with_stream(
192 invocation_id: String,
193 request_id: u64,
194 message: &str,
195 stream: Stream,
196 ) -> Self {
197 Self::stream(
198 invocation_id,
199 request_id,
200 Self::encode_message(message),
201 stream,
202 )
203 }
204
205 pub fn with_level(invocation_id: String, request_id: u64, message: &str, level: Level) -> Self {
206 Self::level(
207 invocation_id,
208 request_id,
209 Self::encode_message(message),
210 level,
211 )
212 }
213}
214
215pub type ResponseBody = http_body_util::combinators::BoxBody<Bytes, Error>;
216pub type Error = Box<dyn std::error::Error + Send + Sync>;
217pub use hyper::Response;
218pub type Request = hyper::Request<hyper::body::Incoming>;
219pub struct ResponseBuilder;
220
221impl ResponseBuilder {
222 #[allow(clippy::new_ret_no_self)]
223 pub fn new() -> hyper::http::response::Builder {
224 hyper::Response::builder()
225 }
226}
227
228pub trait IntoResponseBody {
230 fn into_response_body(self) -> ResponseBody;
231}
232
233impl IntoResponseBody for &str {
234 fn into_response_body(self) -> ResponseBody {
235 http_body_util::Full::new(Bytes::from(self.to_string()))
236 .map_err(|e| Box::new(e) as Error)
237 .boxed()
238 }
239}
240
241impl IntoResponseBody for String {
242 fn into_response_body(self) -> ResponseBody {
243 http_body_util::Full::new(Bytes::from(self))
244 .map_err(|e| Box::new(e) as Error)
245 .boxed()
246 }
247}
248
249impl IntoResponseBody for Bytes {
250 fn into_response_body(self) -> ResponseBody {
251 http_body_util::Full::new(self)
252 .map_err(|e| Box::new(e) as Error)
253 .boxed()
254 }
255}
256
257impl IntoResponseBody for http_body_util::Full<Bytes> {
258 fn into_response_body(self) -> ResponseBody {
259 self.map_err(|e| Box::new(e) as Error).boxed()
260 }
261}
262
263impl<T> IntoResponseBody for http_body_util::StreamBody<T>
264where
265 T: tokio_stream::Stream<Item = Result<hyper::body::Frame<Bytes>, Error>>
266 + Send
267 + Sync
268 + 'static,
269{
270 fn into_response_body(self) -> ResponseBody {
271 self.boxed()
272 }
273}
274
275#[derive(Clone)]
276pub struct LogContext {
277 ipc_stream: Option<Arc<Mutex<UnixStream>>>,
278 invocation_id: Option<String>,
279 request_id: Option<u64>,
280}
281
282impl LogContext {
283 pub fn new(
284 ipc_stream: Option<Arc<Mutex<UnixStream>>>,
285 invocation_id: Option<String>,
286 request_id: Option<u64>,
287 ) -> Self {
288 Self {
289 ipc_stream,
290 invocation_id,
291 request_id,
292 }
293 }
294
295 pub fn info(&self, msg: &str) {
296 self.log(Level::Info, msg);
297 }
298
299 pub fn error(&self, msg: &str) {
300 self.log(Level::Error, msg);
301 }
302
303 pub fn warn(&self, msg: &str) {
304 self.log(Level::Warn, msg);
305 }
306
307 pub fn debug(&self, msg: &str) {
308 self.log(Level::Debug, msg);
309 }
310
311 fn log(&self, level: Level, msg: &str) {
312 if let (Some(ipc_stream), Some(inv_id), Some(req_id)) =
313 (&self.ipc_stream, &self.invocation_id, &self.request_id)
314 {
315 let log = LogMessage::with_level(inv_id.clone(), *req_id, msg, level);
316 send_message(ipc_stream, log).unwrap_or_else(|e| {
317 eprintln!("Failed to send log message: {}", e);
318 });
319 } else {
320 println!("{:?}: {}", level, msg);
322 }
323 }
324}
325
326#[derive(Clone)]
327pub struct AppState {
328 pub log_context: LogContext,
329}
330
331impl AppState {
332 pub fn new(log_context: LogContext) -> Self {
333 Self { log_context }
334 }
335}
336
337pub fn send_message<T: Serialize>(
338 stream: &Arc<Mutex<UnixStream>>,
339 message: T,
340) -> Result<(), Error> {
341 let mut stream = stream.lock().unwrap();
342 let json_str = serde_json::to_string(&message)?;
343 let msg = format!("{json_str}\0");
344 stream.write_all(msg.as_bytes())?;
345 Ok(())
346}
347
348pub async fn run<H, F>(handler: H) -> Result<(), Error>
349where
350 H: Fn(AppState, hyper::Request<hyper::body::Incoming>) -> F + Send + Sync + 'static + Copy,
351 F: Future<Output = Result<Response<ResponseBody>, Error>> + Send + 'static,
352{
353 let ipc_stream = match env::var("VERCEL_IPC_PATH") {
354 Ok(ipc_path) => match UnixStream::connect(ipc_path) {
355 Ok(stream) => Some(Arc::new(Mutex::new(stream))),
356 Err(e) => {
357 eprintln!(
358 "Warning: Failed to connect to IPC stream: {}. Running without IPC support.",
359 e
360 );
361 None
362 }
363 },
364 Err(_) => {
365 None
367 }
368 };
369
370 let port = 3000;
371 let addr = SocketAddr::from(([127, 0, 0, 1], port));
372 let listener = TcpListener::bind(addr).await?;
373
374 if let Some(ref ipc_stream) = ipc_stream {
376 let start_message = StartMessage::new(0, port);
377 send_message(ipc_stream, start_message)?;
378 } else {
379 println!("Dev server listening: {}", port);
382 };
383
384 loop {
385 let (stream, _) = listener.accept().await?;
386
387 let io = TokioIo::new(stream);
388 let ipc_stream_clone = ipc_stream.clone();
389
390 tokio::task::spawn(async move {
391 if let Err(err) = http1::Builder::new()
392 .serve_connection(
393 io,
394 service_fn(move |req| {
395 let ipc_stream_clone = ipc_stream_clone.clone();
396
397 let invocation_id = req
399 .headers()
400 .get("x-vercel-internal-invocation-id")
401 .and_then(|v| v.to_str().ok())
402 .map(|s| s.to_owned());
403
404 let request_id = req
405 .headers()
406 .get("x-vercel-internal-request-id")
407 .and_then(|v| v.to_str().ok())
408 .and_then(|s| s.parse::<u64>().ok());
409
410 let app_state = AppState::new(LogContext::new(
411 ipc_stream_clone,
412 invocation_id.clone(),
413 request_id,
414 ));
415
416 async move {
417 let ipc_stream_for_end = app_state.log_context.ipc_stream.clone();
418
419 if req.uri().path() == "/_vercel/ping" {
420 let response = hyper::Response::builder()
421 .status(200)
422 .body("OK".into_response_body())
423 .unwrap();
424 return Ok::<_, Infallible>(response);
425 }
426
427 let response = match handler(app_state, req).await {
428 Ok(resp) => resp,
429 Err(e) => {
430 eprintln!("Handler error: {}", e);
431 let error_body = http_body_util::Full::new(Bytes::from(
432 "Internal Server Error",
433 ));
434 hyper::Response::builder()
435 .status(500)
436 .body(error_body.map_err(|e| Box::new(e) as Error).boxed())
437 .unwrap()
438 }
439 };
440
441 if let (Some(ipc_stream), Some(inv_id), Some(req_id)) =
442 (&ipc_stream_for_end, &invocation_id, &request_id)
443 {
444 let end_message = EndMessage::new(inv_id.clone(), *req_id, None);
445 send_message(ipc_stream, end_message).unwrap_or_else(|e| {
446 eprintln!("Failed to send end message: {}", e);
447 });
448 }
449
450 Ok::<_, Infallible>(response)
451 }
452 }),
453 )
454 .await
455 {
456 eprintln!("Error serving connection: {:?}", err);
457 }
458 });
459 }
460}