use super::host_router::FrozenHostRouter;
use super::middleware::{MiddlewareFn, Next, Terminal};
use super::request::{Params as RequestParams, RequestHead};
use super::stream::StreamResponse;
pub(super) use super::trie::Handler;
pub(super) use super::trie::SseHandler;
#[cfg(feature = "ws")]
pub(super) use super::trie::WsHandler;
use super::trie::{FrozenNode, RouteHandler, split_path_segments};
use super::{Request, Response};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, LazyLock};
pub(super) enum RouteClass {
Buffered,
StreamingProxy {
backend: Arc<str>,
prefix: Arc<str>,
params: RequestParams,
},
StreamingProxyUnhealthy,
HeadOnly,
}
#[cfg(feature = "grpc")]
pub use super::grpc_support::GrpcRouter;
static NOT_FOUND_HANDLER: LazyLock<Handler> = LazyLock::new(|| {
Box::new(|_req: &Request| {
Box::pin(async { Response::text_raw(404, "not found") })
as Pin<Box<dyn Future<Output = Response> + Send>>
})
});
pub(super) struct FrozenRouter {
pub(super) root: FrozenNode,
pub(super) middleware: Box<[MiddlewareFn]>,
pub(super) skip_middleware_for_internal: bool,
#[cfg(feature = "grpc")]
pub(super) grpc_router: Option<GrpcRouter>,
}
pub(super) enum DispatchResult {
Async(Pin<Box<dyn Future<Output = Response> + Send>>, Request),
Stream(
Pin<Box<dyn Future<Output = StreamResponse> + Send>>,
Request,
),
Sse(SseHandler, Request),
#[cfg(feature = "ws")]
WebSocket(WsHandler, Request),
#[cfg(feature = "ws")]
ProxyWebSocket(Request, Arc<str>, Arc<str>),
ProxyStream(Request, Arc<str>, Arc<str>),
}
impl DispatchResult {
pub(super) fn needs_middleware_gate(&self) -> bool {
match self {
Self::Stream(..) | Self::Sse(..) | Self::ProxyStream(..) => true,
#[cfg(feature = "ws")]
Self::WebSocket(..) | Self::ProxyWebSocket(..) => true,
Self::Async(..) => false,
}
}
#[cfg(feature = "ws")]
pub(super) fn is_websocket(&self) -> bool {
matches!(self, Self::WebSocket(..) | Self::ProxyWebSocket(..))
}
pub(super) fn request_ref(&self) -> &Request {
match self {
Self::Async(_, req)
| Self::Stream(_, req)
| Self::Sse(_, req)
| Self::ProxyStream(req, _, _) => req,
#[cfg(feature = "ws")]
Self::WebSocket(_, req) | Self::ProxyWebSocket(req, _, _) => req,
}
}
}
pub(super) struct GateCheck {
pub(super) reached: Arc<AtomicBool>,
pub(super) fut: Pin<Box<dyn Future<Output = Response> + Send>>,
}
impl FrozenRouter {
pub(super) fn classify_route(&self, head: &RequestHead<'_>) -> RouteClass {
let path = head.path();
let segments = match split_path_segments(path) {
Some(s) => s,
None => return RouteClass::Buffered,
};
match self.root.lookup(head.method(), path, &segments) {
Some((
RouteHandler::ProxyStream {
healthy,
backend,
prefix,
},
_,
)) if healthy.as_ref().is_some_and(|f| !f.load(Ordering::Relaxed)) => {
RouteClass::StreamingProxyUnhealthy
}
Some((
RouteHandler::ProxyStream {
backend, prefix, ..
},
params,
)) => RouteClass::StreamingProxy {
backend: Arc::clone(backend),
prefix: Arc::clone(prefix),
params,
},
Some((RouteHandler::Sse(_), _)) => RouteClass::HeadOnly,
#[cfg(feature = "ws")]
Some((RouteHandler::WebSocket(_), _)) => RouteClass::HeadOnly,
_ => RouteClass::Buffered,
}
}
pub(super) fn dispatch(&self, mut req: Request) -> DispatchResult {
let method = req.method_enum();
let path_owned: Box<str> = req.path().into();
let result = {
let segments = match split_path_segments(&path_owned) {
Some(s) => s,
None => {
let fut = Box::pin(async { Response::text_raw(414, "URI too long") });
return DispatchResult::Async(fut, req);
}
};
self.root.lookup(method, &path_owned, &segments)
};
match result {
Some((RouteHandler::Async(handler), params)) => {
req.set_params(params);
self.dispatch_async(handler, req)
}
Some((RouteHandler::Stream(handler), params)) => {
req.set_params(params);
let fut = handler(&req);
DispatchResult::Stream(fut, req)
}
Some((RouteHandler::Sse(handler), params)) => {
req.set_params(params);
DispatchResult::Sse(Arc::clone(handler), req)
}
#[cfg(feature = "ws")]
Some((RouteHandler::WebSocket(handler), params)) => {
req.set_params(params);
DispatchResult::WebSocket(Arc::clone(handler), req)
}
Some((RouteHandler::Proxy { healthy, .. }, _))
| Some((RouteHandler::ProxyStream { healthy, .. }, _))
if healthy.as_ref().is_some_and(|f| !f.load(Ordering::Relaxed)) =>
{
let fut = Box::pin(async { Response::text_raw(503, "service unavailable") });
DispatchResult::Async(fut, req)
}
Some((
RouteHandler::Proxy {
backend, prefix, ..
},
params,
)) => {
req.set_params(params);
dispatch_proxy_through_middleware(self, req, backend, prefix)
}
Some((
RouteHandler::ProxyStream {
backend, prefix, ..
},
params,
)) => {
req.set_params(params);
dispatch_proxy_stream(req, backend, prefix)
}
None => self.dispatch_async(&NOT_FOUND_HANDLER, req),
}
}
pub(super) fn dispatch_async(&self, handler: &Handler, req: Request) -> DispatchResult {
let terminal = Terminal::Handler(handler);
let next = Next::new(&self.middleware, terminal);
let fut = next.call(&req);
DispatchResult::Async(fut, req)
}
pub(super) fn middleware_gate(&self, req: &Request) -> Option<GateCheck> {
match self.middleware.is_empty() {
true => None,
false => {
let reached = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&reached);
let terminal = Terminal::Gate(flag);
let next = Next::new(&self.middleware, terminal);
let fut = next.call(req);
Some(GateCheck { reached, fut })
}
}
}
pub(super) fn middleware_gate_head(
&self,
head: &RequestHead<'_>,
params: Option<RequestParams>,
) -> Option<GateCheck> {
match self.middleware.is_empty() {
true => None,
false => {
let gate_req = head.to_gate_request(params);
self.middleware_gate(&gate_req)
}
}
}
}
#[cfg(feature = "ws")]
fn is_ws_upgrade(req: &Request) -> bool {
req.headers()
.any(|(k, v)| k.eq_ignore_ascii_case("upgrade") && v.eq_ignore_ascii_case("websocket"))
}
fn dispatch_proxy_through_middleware(
router: &FrozenRouter,
req: Request,
backend: &Arc<str>,
prefix: &Arc<str>,
) -> DispatchResult {
#[cfg(feature = "ws")]
if is_ws_upgrade(&req) {
return DispatchResult::ProxyWebSocket(req, Arc::clone(backend), Arc::clone(prefix));
}
let terminal = Terminal::Proxy {
backend: Arc::clone(backend),
prefix: Arc::clone(prefix),
};
let next = Next::new(&router.middleware, terminal);
let fut = next.call(&req);
DispatchResult::Async(fut, req)
}
fn dispatch_proxy_stream(req: Request, backend: &Arc<str>, prefix: &Arc<str>) -> DispatchResult {
#[cfg(feature = "ws")]
if is_ws_upgrade(&req) {
return DispatchResult::ProxyWebSocket(req, Arc::clone(backend), Arc::clone(prefix));
}
DispatchResult::ProxyStream(req, Arc::clone(backend), Arc::clone(prefix))
}
pub(super) fn gate_result(reached: Arc<AtomicBool>, resp: Response) -> Option<Response> {
match reached.load(Ordering::Acquire) {
true => None,
false => Some(resp),
}
}
pub(super) enum ServerDispatch {
Single(FrozenRouter),
Host(FrozenHostRouter),
}
impl ServerDispatch {
pub(super) fn classify_route(&self, head: &RequestHead<'_>) -> RouteClass {
let router = match self.resolve_from_head(head) {
Some(r) => r,
None => return RouteClass::Buffered,
};
router.classify_route(head)
}
fn resolve_from_head(&self, head: &RequestHead<'_>) -> Option<&FrozenRouter> {
match self {
Self::Single(router) => Some(router),
Self::Host(host_router) => host_router.resolve_from_head(head).ok().flatten(),
}
}
fn resolve(&self, req: &Request) -> Result<Option<&FrozenRouter>, Response> {
match self {
Self::Single(router) => Ok(Some(router)),
Self::Host(host_router) => host_router.resolve(req),
}
}
fn fallback(error_resp: Option<Response>, req: Request) -> DispatchResult {
let fut: Pin<Box<dyn Future<Output = Response> + Send>> = match error_resp {
None => Box::pin(async { Response::text_raw(404, "not found") }),
Some(resp) => Box::pin(async move { resp }),
};
DispatchResult::Async(fut, req)
}
pub(super) fn dispatch(&self, req: Request) -> DispatchResult {
match self.resolve(&req) {
Ok(Some(router)) => router.dispatch(req),
Ok(None) => Self::fallback(None, req),
Err(resp) => Self::fallback(Some(resp), req),
}
}
pub(super) fn middleware_gate(&self, req: &Request) -> Option<GateCheck> {
match self.resolve(req) {
Ok(Some(router)) => router.middleware_gate(req),
_ => None,
}
}
pub(super) fn middleware_gate_head(
&self,
head: &RequestHead<'_>,
params: Option<RequestParams>,
) -> Option<GateCheck> {
let router = self.resolve_from_head(head)?;
router.middleware_gate_head(head, params)
}
pub(super) fn dispatch_with_handler(&self, handler: &Handler, req: Request) -> DispatchResult {
match self.resolve(&req) {
Ok(Some(router)) => router.dispatch_async(handler, req),
Ok(None) => Self::fallback(None, req),
Err(resp) => Self::fallback(Some(resp), req),
}
}
pub(super) fn skip_middleware_for_internal(&self) -> bool {
match self {
Self::Single(router) => router.skip_middleware_for_internal,
Self::Host(_) => false,
}
}
#[cfg(feature = "grpc")]
pub(super) fn grpc_router(&self) -> Option<&super::grpc_support::GrpcRouter> {
match self {
Self::Single(router) => router.grpc_router.as_ref(),
Self::Host(_) => None,
}
}
}