Skip to main content

tork_core/
service.rs

1//! Request dispatch: turning an HTTP request into a response.
2
3use std::any::Any;
4use std::panic::AssertUnwindSafe;
5use std::pin::Pin;
6use std::time::Instant;
7
8use futures_util::FutureExt;
9use http::Request;
10use tracing::Instrument;
11use tracing::Level;
12
13use crate::app::{fire_request_hooks, fire_response_hooks, request_info, AppInner};
14use crate::body::ReqBody;
15use crate::error::Error;
16use crate::extract::RequestContext;
17use crate::hooks::RequestInfo;
18use crate::logging::Logger;
19use crate::response::{IntoResponse, Response};
20use crate::router::matcher::Match;
21
22impl AppInner {
    /// Routes `request` to its handler and produces a response.
    ///
    /// Routing failures (`404`/`405`) and a handler's own errors are rendered here
    /// via [`render_error`](AppInner::render_error), so the error hooks and any
    /// exception handler run before rendering. This function never returns an
    /// error, so an application error never tears down the connection.
    ///
    /// Request metadata is only built when `needs_request_info()` reports it is
    /// needed or the matched route has hooks. When no metadata was built, an
    /// error is rendered directly with `into_response` instead of going through
    /// `render_error`.
    ///
    /// For a matched route the steps run in this order: scoped request hooks,
    /// the handler (optionally inside a tracing span and a panic boundary),
    /// error rendering, scoped response hooks, and finally the request log.
    ///
    /// NOTE(review): when the panic boundary catches a handler panic, the `500`
    /// is returned immediately, so the scoped response hooks and the request log
    /// below do not run for that request, and the panic hooks only fire when
    /// request metadata was built — confirm this is intended.
    pub async fn dispatch(&self, request: Request<ReqBody>) -> Response {
        let (head, body) = request.into_parts();
        // Owned copy of the path; presumably so the route lookup does not keep
        // a borrow of `head`, which is moved into the `RequestContext` below.
        let path = head.uri.path().to_owned();
        let needs_info = self.needs_request_info();

        match self.matcher().find(&head.method, &path) {
            Match::Found { route, params } => {
                let handler = route.handler().clone();
                // Build metadata when an app-global hook or this route's scoped
                // hooks need it.
                let info = (needs_info || route.has_hooks()).then(|| {
                    request_info(
                        &head.method,
                        &head.uri,
                        &head.headers,
                        Some(route.path().to_owned()),
                    )
                });

                // Scoped on_request fires after routing, before the handler.
                if let Some(info) = &info {
                    fire_request_hooks(route.request_hooks(), info).await;
                }

                // Start time reported to the response hooks; only taken when
                // metadata exists, because the hooks only fire in that case.
                let started = info.as_ref().map(|_| Instant::now());
                let tracing_enabled = tracing::enabled!(Level::INFO);
                // Snapshot of the fields needed by the tracing span and the
                // request log. It must be captured here because `head` is moved
                // into the `RequestContext` before either of them is emitted.
                let request_meta = if tracing_enabled || self.request_logs() {
                    Some(match &info {
                        // Prefer the already-built request metadata.
                        Some(info) => DispatchMeta {
                            method: info.method().clone(),
                            request_path: info.path().to_owned(),
                            route_path: info.route().unwrap_or(route.path()).to_owned(),
                            request_id: info.request_id().map(str::to_owned),
                        },
                        // No metadata: read the same fields straight from the
                        // request head, taking the id from `x-request-id` when
                        // that header is present and valid text.
                        None => DispatchMeta {
                            method: head.method.clone(),
                            request_path: head.uri.path().to_owned(),
                            route_path: route.path().to_owned(),
                            request_id: head
                                .headers
                                .get("x-request-id")
                                .and_then(|value| value.to_str().ok())
                                .map(str::to_owned),
                        },
                    })
                } else {
                    None
                };
                // Start time for the `duration_ms` field of the request log.
                // Taken after the scoped request hooks, so it excludes them.
                let request_started = Instant::now();

                let ctx = RequestContext::new(head, params, self.state().clone(), body);
                // Both branches are boxed so the instrumented and the plain
                // handler future share one type.
                let future: Pin<
                    Box<dyn std::future::Future<Output = crate::error::Result<Response>> + Send>,
                > = if tracing_enabled {
                    // `request_meta` is always `Some` here: it is built above
                    // whenever `tracing_enabled` is true.
                    let meta = request_meta.as_ref().expect("request meta for tracing");
                    let span = tracing::info_span!(
                        "request",
                        method = %meta.method,
                        route = %meta.route_path,
                        request_id = meta.request_id.as_deref().unwrap_or("")
                    );
                    Box::pin(handler(ctx).instrument(span))
                } else {
                    Box::pin(handler(ctx))
                };

                // With the panic boundary enabled, a handler panic becomes a 500
                // (and fires the panic hooks) instead of tearing down the task.
                let result = if self.catch_panics() {
                    match AssertUnwindSafe(future).catch_unwind().await {
                        Ok(result) => result,
                        Err(payload) => {
                            // Panic hooks need request metadata, so they are
                            // skipped when none was built.
                            if let Some(info) = &info {
                                self.fire_panic(info, &panic_message(payload.as_ref()))
                                    .await;
                            }
                            // Early return: the response hooks and the request
                            // log further down are not reached on this path.
                            return Error::internal("handler panicked").into_response();
                        }
                    }
                } else {
                    future.await
                };

                let response = match result {
                    Ok(response) => response,
                    Err(error) => match &info {
                        Some(info) => self.render_error(error, info, Some(route)).await,
                        // Without metadata the error is rendered directly.
                        None => error.into_response(),
                    },
                };

                // Scoped on_response fires before unwinding back through middleware.
                if let Some(info) = &info {
                    // `started` is `Some` whenever `info` is, so the default is
                    // never used in practice.
                    let elapsed = started.map(|start| start.elapsed()).unwrap_or_default();
                    fire_response_hooks(route.response_hooks(), info, response.status(), elapsed)
                        .await;
                }

                // The automatic HTTP request-completed log.
                if self.request_logs() {
                    // NOTE(review): this `expect` holds as long as
                    // `request_logs()` returns the same value as when
                    // `request_meta` was built above — confirm it cannot change
                    // while a request is in flight.
                    let meta = request_meta.as_ref().expect("request meta for request log");
                    let status = response.status().as_u16();
                    let mut logger = Logger::framework("HTTP");
                    if let Some(request_id) = &meta.request_id {
                        logger = logger.with_field("request_id", request_id.clone());
                    }
                    logger
                        .info(format!("{} {} {status}", meta.method, meta.request_path))
                        .field("method", meta.method.as_str())
                        .field("path", &meta.request_path)
                        .field("route", &meta.route_path)
                        .field("status", status)
                        .field("duration_ms", request_started.elapsed().as_millis() as u64)
                        .emit();
                }

                response
            }
            // No route matched: metadata is built without a route path, and
            // only when `needs_request_info()` asked for it.
            Match::MethodNotAllowed => {
                let info =
                    needs_info.then(|| request_info(&head.method, &head.uri, &head.headers, None));
                self.finish_error(Error::method_not_allowed("method not allowed"), info)
                    .await
            }
            Match::NotFound => {
                let info =
                    needs_info.then(|| request_info(&head.method, &head.uri, &head.headers, None));
                self.finish_error(Error::not_found("resource not found"), info)
                    .await
            }
        }
    }
161
162    /// Routes a WebSocket upgrade request over an in-process duplex stream.
163    ///
164    /// Used by the test client: the matched handler reads the duplex instead of a
165    /// real upgraded socket. Returns the handler's response, which is a `101` on a
166    /// successful handshake or an error response if the handshake or a dependency
167    /// is rejected before the upgrade. (The caller, the test client, lands in a
168    /// later commit of this phase.)
169    #[allow(dead_code)]
170    pub(crate) async fn dispatch_upgrade(
171        &self,
172        request: Request<ReqBody>,
173        duplex: tokio::io::DuplexStream,
174    ) -> Response {
175        let (head, body) = request.into_parts();
176        let path = head.uri.path().to_owned();
177
178        match self.matcher().find(&head.method, &path) {
179            Match::Found { route, params } => {
180                let handler = route.handler().clone();
181                let ctx = RequestContext::with_duplex_upgrade(
182                    head,
183                    params,
184                    self.state().clone(),
185                    body,
186                    duplex,
187                );
188                match handler(ctx).await {
189                    Ok(response) => response,
190                    Err(error) => error.into_response(),
191                }
192            }
193            Match::MethodNotAllowed => {
194                Error::method_not_allowed("method not allowed").into_response()
195            }
196            Match::NotFound => Error::not_found("resource not found").into_response(),
197        }
198    }
199
200    /// Renders a routing error (no matched route), running the app-global hooks
201    /// when request metadata is present.
202    async fn finish_error(&self, error: Error, info: Option<RequestInfo>) -> Response {
203        match info {
204            Some(info) => self.render_error(error, &info, None).await,
205            None => error.into_response(),
206        }
207    }
208}
209
/// Per-request fields captured by `dispatch` before the request head is moved
/// into the handler's context, for use by the tracing span and the request log.
struct DispatchMeta {
    // HTTP method of the request.
    method: http::Method,
    // Path component of the request URI (logged as `path`).
    request_path: String,
    // Path of the matched route as registered (logged as `route`).
    route_path: String,
    // Request id, taken from the request metadata or, when none was built,
    // from the `x-request-id` header; `None` when absent.
    request_id: Option<String>,
}
216
/// Renders a caught panic payload as text for the [`on_panic`](crate::App::on_panic)
/// hooks.
///
/// A panic payload is typically a `&str` or `String`; anything else is reported
/// generically as `"panic"`. The message is truncated to at most 1024 bytes to
/// limit the data exposed to hooks in case the panic message contains sensitive
/// information. The cut is moved back to the nearest UTF-8 character boundary,
/// so a message containing multi-byte characters is never split mid-character
/// (slicing at a fixed byte offset would itself panic in that case).
fn panic_message(payload: &(dyn Any + Send)) -> String {
    /// Upper bound, in bytes, on the text handed to the hooks.
    const MAX_LEN: usize = 1024;

    let message: &str = if let Some(message) = payload.downcast_ref::<&str>() {
        *message
    } else if let Some(message) = payload.downcast_ref::<String>() {
        message.as_str()
    } else {
        return "panic".to_owned();
    };

    truncate_to_char_boundary(message, MAX_LEN).to_owned()
}

/// Returns the longest prefix of `text` that is at most `max_len` bytes long
/// and ends on a UTF-8 character boundary.
fn truncate_to_char_boundary(text: &str, max_len: usize) -> &str {
    if text.len() <= max_len {
        return text;
    }
    // Offset 0 is always a boundary, so this loop terminates.
    let mut end = max_len;
    while !text.is_char_boundary(end) {
        end -= 1;
    }
    &text[..end]
}
242
#[cfg(test)]
mod tests {
    use super::panic_message;

    use crate::app::{App, AppInner};
    use crate::body::{box_body, ReqBody};
    use crate::error::Result;
    use crate::extract::RequestContext;
    use crate::response::Response;
    use crate::router::{BoxFuture, HandlerFn, Route, Router};
    use crate::{json_response, Method, StatusCode};

    use bytes::Bytes;
    use http_body_util::{BodyExt, Full};
    use std::any::Any;
    use std::sync::Arc;

    /// Handler that echoes the `user_id` path parameter back as `{"id": ...}`.
    fn echo_id_handler() -> HandlerFn {
        Arc::new(
            |ctx: RequestContext| -> BoxFuture<'static, Result<Response>> {
                Box::pin(async move {
                    let user_id = ctx.path_param("user_id").unwrap_or_default().to_owned();
                    let payload = serde_json::json!({ "id": user_id });
                    Ok(json_response(StatusCode::OK, &payload))
                })
            },
        )
    }

    /// Builds an app with a single route: `GET /users/{user_id}`.
    fn test_app() -> AppInner {
        let user_route = Route::new(Method::GET, "/{user_id}", echo_id_handler());
        let users = Router::new().prefix("/users").route(user_route);
        App::new().include_router(users).build().unwrap()
    }

    /// Builds a request with the given method and URI and an empty body.
    fn request(method: Method, uri: &str) -> http::Request<ReqBody> {
        let empty_body = box_body(Full::new(Bytes::new()));
        http::Request::builder()
            .method(method)
            .uri(uri)
            .body(empty_body)
            .unwrap()
    }

    /// Collects the whole response body and decodes it as UTF-8.
    async fn body_to_string(response: Response) -> String {
        let collected = response.into_body().collect().await.unwrap();
        String::from_utf8(collected.to_bytes().to_vec()).unwrap()
    }

    /// Boxes `payload` the way a caught panic is delivered and renders it.
    fn render_payload<T: Any + Send>(payload: T) -> String {
        let boxed: Box<dyn Any + Send> = Box::new(payload);
        panic_message(&*boxed)
    }

    #[tokio::test]
    async fn dispatches_to_matching_route() {
        let app = test_app();
        let response = app.dispatch(request(Method::GET, "/users/42")).await;
        assert_eq!(response.status(), StatusCode::OK);

        let body = body_to_string(response).await;
        assert!(body.contains("\"42\""));
    }

    #[tokio::test]
    async fn unknown_path_yields_not_found() {
        let app = test_app();
        let response = app.dispatch(request(Method::GET, "/nope")).await;
        assert_eq!(response.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn wrong_method_yields_method_not_allowed() {
        let app = test_app();
        let response = app.dispatch(request(Method::POST, "/users/42")).await;
        assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED);
    }

    #[test]
    fn panic_message_handles_str_payload() {
        assert_eq!(render_payload("oops"), "oops");
    }

    #[test]
    fn panic_message_handles_string_payload() {
        assert_eq!(render_payload(String::from("from string")), "from string");
    }

    #[test]
    fn panic_message_handles_unknown_payload() {
        assert_eq!(render_payload(42_i32), "panic");
    }
}