monolake_services/http/handlers/
route.rs

1//! Routing and request handling for a service-oriented architecture.
2//!
3//! This module provides components for routing HTTP requests to appropriate upstream
4//! servers based on configured routes. It is designed to work with the `service_async`
5//! crate, implementing its [`Service`] and [`MakeService`] traits for seamless integration
6//! into service stacks.
7//!
8//! # Key Components
9//!
10//! - [`RewriteAndRouteHandler`]: The main service component responsible for routing requests.
11//! - [`RewriteAndRouteHandlerFactory`]: A factory for creating and updating
12//!   `RewriteAndRouteHandler` instances.
13//! - [`RouteConfig`]: Configuration structure for defining routes and their associated upstreams.
14//! - [`Upstream`]: Represents an upstream server configuration.
15//!
16//! # Architecture
17//!
18//! The routing system is built around the following workflow:
19//!
20//! 1. A `RewriteAndRouteHandler` is created by its factory, initialized with a set of routes.
21//! 2. Incoming requests are matched against these routes using a [`matchit::Router`].
22//! 3. When a match is found, an upstream server is selected (with support for load balancing).
23//! 4. The request is rewritten as necessary for the selected upstream.
24//! 5. The rewritten request is passed to an inner handler for further processing
25//!
26//! # Usage
27//!
28//! This module is typically used as part of a larger service stack. Here's a basic example:
29//!
30//! ```rust
31//! use monolake_services::{
32//!     common::ContextService,
33//!     http::{
34//!         core::HttpCoreService,
35//!         detect::H2Detect,
36//!         handlers::{
37//!             route::RouteConfig, ConnectionReuseHandler, ContentHandler, RewriteAndRouteHandler,
38//!             UpstreamHandler,
39//!         },
40//!         HttpServerTimeout,
41//!     },
42//! };
43//! use service_async::{layer::FactoryLayer, stack::FactoryStack, Param};
44//!
45//! // Dummy struct to satisfy Param trait requirements
46//! struct DummyConfig;
47//!
48//! // Implement Param for DummyConfig to return Vec<RouteConfig>
49//! impl Param<Vec<RouteConfig>> for DummyConfig {
50//!     fn param(&self) -> Vec<RouteConfig> {
51//!         vec![]
52//!     }
53//! }
54//! impl Param<HttpServerTimeout> for DummyConfig {
55//!     fn param(&self) -> HttpServerTimeout {
56//!         HttpServerTimeout::default()
57//!     }
58//! }
59//!
60//! let config = DummyConfig;
61//! let stacks = FactoryStack::new(config)
62//!     .replace(UpstreamHandler::factory(
63//!         Default::default(),
64//!         Default::default(),
65//!     ))
66//!     .push(ContentHandler::layer())
67//!     .push(RewriteAndRouteHandler::layer())
68//!     .push(ConnectionReuseHandler::layer())
69//!     .push(HttpCoreService::layer())
70//!     .push(H2Detect::layer());
71//!
72//! // Use the service to handle HTTP requests
73//! ```
74//!
75//! # Configuration
76//!
77//! Routing is configured through [`RouteConfig`] structures, which define paths and their
78//! associated upstreams. These configurations can be dynamically updated by recreating
79//! the handler through its factory.
80//!
81//! # Error Handling
82//!
83//! - Routing errors (no matching route) result in a 404 Not Found response.
84//! - Other errors are propagated from the inner handler.
85//!
86//! # Performance Considerations
87//!
88//! - The module uses [`matchit::Router`] for efficient path matching.
89//! - Upstream selection supports weighted load balancing.
90//!
91//! # Feature Flags
92//!
93//! - `tls`: Enables TLS support for upstream connections.
94//!
95//! # Future Directions
96//!
97//! - Support for more advanced routing patterns (e.g., regex-based routing).
98//! - Enhanced metrics and logging for better observability.
99//! - Integration with service discovery systems for dynamic upstream management.
100use http::{uri::Scheme, HeaderValue, Request, StatusCode};
101use matchit::Router;
102use monoio_http::common::body::FixedBody;
103use monolake_core::{
104    http::{HttpHandler, ResponseWithContinue},
105    util::uri_serde,
106    AnyError,
107};
108use serde::{Deserialize, Serialize};
109use service_async::{
110    layer::{layer_fn, FactoryLayer},
111    AsyncMakeService, MakeService, Param, Service,
112};
113use tracing::debug;
114
115use crate::http::generate_response;
116
117/// A handler that routes incoming requests to appropriate upstream servers based on configured
118/// routes.
119///
120/// [`RewriteAndRouteHandler`] is responsible for matching incoming request paths against a set of
121/// predefined routes, selecting an appropriate upstream server, and forwarding the request to that
122/// server. It implements the `Service` trait from the `service_async` crate, providing an
123/// asynchronous request handling mechanism.
124///
125/// # Type Parameters
126///
127/// - `H`: The type of the inner handler, which must implement `HttpHandler`.
128///
129/// # Fields
130///
131/// - `inner`: The inner handler that processes requests after routing.
132/// - `router`: A `matchit::Router` containing the routing configuration.
133///
134/// # Usage
135///
136/// This handler is typically created using the [`RewriteAndRouteHandlerFactory`], which allows for
137/// dynamic creation and updates of the routing configuration. It can be integrated into a
138/// service stack using the `layer` method, enabling composition with other services.
139///
140///
141/// # Service Implementation
142///
143/// The `call` method of this handler performs the following steps:
144/// 1. Extracts the path from the incoming request.
145/// 2. Matches the path against the configured routes.
146/// 3. If a match is found:
147///    - Selects an upstream server from the matched route.
148///    - Rewrites the request for the selected upstream.
149///    - Forwards the request to the inner handler.
150/// 4. If no match is found, returns a 404 Not Found response.
151///
152/// # Performance Considerations
153///
154/// This handler uses [`matchit::Router`] for efficient path matching, which is generally
155/// faster than iterative matching for a large number of routes.
156#[derive(Clone)]
157pub struct RewriteAndRouteHandler<H> {
158    inner: H,
159    router: Router<RouteConfig>,
160}
161
162impl<H, CX, B> Service<(Request<B>, CX)> for RewriteAndRouteHandler<H>
163where
164    H: HttpHandler<CX, B>,
165    H::Body: FixedBody,
166{
167    type Response = ResponseWithContinue<H::Body>;
168    type Error = H::Error;
169
170    async fn call(
171        &self,
172        (mut request, ctx): (Request<B>, CX),
173    ) -> Result<Self::Response, Self::Error> {
174        let req_path = request.uri().path();
175        tracing::info!("request path: {req_path}");
176
177        match self.router.at(req_path) {
178            Ok(route) => {
179                let route = route.value;
180                tracing::info!("the route id: {}", route.id);
181                if route.upstreams.len() == 1 {
182                    rewrite_request(&mut request, &route.upstreams[0]);
183                    return self.inner.handle(request, ctx).await;
184                }
185                use rand::seq::SliceRandom;
186                let upstream = route
187                    .upstreams
188                    .choose(&mut rand::thread_rng())
189                    .expect("empty upstream list");
190
191                rewrite_request(&mut request, upstream);
192
193                self.inner.handle(request, ctx).await
194            }
195            Err(e) => {
196                debug!("match request uri: {} with error: {e}", request.uri());
197                Ok((generate_response(StatusCode::NOT_FOUND, false), true))
198            }
199        }
200    }
201}
202
203/// Factory for creating [`RewriteAndRouteHandler`] instances.
204///
205/// This factory implements the [`MakeService`] &
206/// [`AsyncMakeService`] trait, allowing for dynamic creation and updates of
207/// `RewriteAndRouteHandler` instances. It's designed to work with the `service_async` crate's
208/// compositional model.
209pub struct RewriteAndRouteHandlerFactory<F> {
210    inner: F,
211    routes: Vec<RouteConfig>,
212}
213
214#[derive(thiserror::Error, Debug)]
215pub enum RoutingFactoryError<E> {
216    #[error("inner error: {0:?}")]
217    Inner(E),
218    #[error("empty upstream")]
219    EmptyUpstream,
220    #[error("router error: {0:?}")]
221    Router(#[from] matchit::InsertError),
222}
223
224impl<F: MakeService> MakeService for RewriteAndRouteHandlerFactory<F> {
225    type Service = RewriteAndRouteHandler<F::Service>;
226    type Error = RoutingFactoryError<F::Error>;
227
228    fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
229        let mut router: Router<RouteConfig> = Router::new();
230        for route in self.routes.iter() {
231            router.insert(&route.path, route.clone())?;
232            if route.upstreams.is_empty() {
233                return Err(RoutingFactoryError::EmptyUpstream);
234            }
235        }
236        Ok(RewriteAndRouteHandler {
237            inner: self
238                .inner
239                .make_via_ref(old.map(|o| &o.inner))
240                .map_err(RoutingFactoryError::Inner)?,
241            router,
242        })
243    }
244}
245
246impl<F: AsyncMakeService> AsyncMakeService for RewriteAndRouteHandlerFactory<F>
247where
248    F::Error: Into<AnyError>,
249{
250    type Service = RewriteAndRouteHandler<F::Service>;
251    type Error = RoutingFactoryError<F::Error>;
252
253    async fn make_via_ref(
254        &self,
255        old: Option<&Self::Service>,
256    ) -> Result<Self::Service, Self::Error> {
257        let mut router: Router<RouteConfig> = Router::new();
258        for route in self.routes.iter() {
259            router.insert(&route.path, route.clone())?;
260            if route.upstreams.is_empty() {
261                return Err(RoutingFactoryError::EmptyUpstream);
262            }
263        }
264        Ok(RewriteAndRouteHandler {
265            inner: self
266                .inner
267                .make_via_ref(old.map(|o| &o.inner))
268                .await
269                .map_err(RoutingFactoryError::Inner)?,
270            router,
271        })
272    }
273}
274
275const fn default_weight() -> u16 {
276    1
277}
278
279#[derive(Debug, Clone, Serialize, Deserialize)]
280#[serde(rename_all = "UPPERCASE")]
281#[derive(Default)]
282pub enum HttpVersion {
283    #[default]
284    HTTP1_1,
285    HTTP2,
286}
287
288impl HttpVersion {
289    pub fn convert_to_http_version(&self) -> http::Version {
290        match self {
291            HttpVersion::HTTP1_1 => http::Version::HTTP_11,
292            HttpVersion::HTTP2 => http::Version::HTTP_2,
293        }
294    }
295}
296
297/// Configuration for a single route in the routing system.
298///
299/// This structure defines how a particular path should be routed to one or more upstream servers.
300#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct RouteConfig {
302    /// Unique identifier for the route.
303    #[serde(skip)]
304    pub id: String,
305
306    /// The path pattern to match incoming requests against.
307    ///
308    /// This can be an exact path or a pattern supported by the routing system.
309    pub path: String,
310
311    /// A list of upstream servers that can handle requests matching this route.
312    ///
313    /// Multiple upstreams allow for load balancing and failover configurations.
314    pub upstreams: Vec<Upstream>,
315}
316
317/// Configuration for an upstream server.
318///
319/// This structure defines the properties of a single upstream server,
320/// including its endpoint, weight for load balancing, and HTTP version.
321#[derive(Debug, Clone, Serialize, Deserialize)]
322pub struct Upstream {
323    /// The endpoint of the upstream server.
324    pub endpoint: Endpoint,
325
326    /// The weight of this upstream for load balancing purposes.
327    ///
328    /// A higher weight means the upstream is more likely to be chosen when distributing requests.
329    /// If not specified, it defaults to a value provided by the `default_weight` function.
330    #[serde(default = "default_weight")]
331    pub weight: u16,
332}
333
334/// Represents different types of endpoints for upstream servers.
335///
336/// This enum allows for flexibility in specifying how to connect to an upstream server,
337/// supporting various protocols and addressing methods.
338#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
339#[serde(tag = "type", content = "value", rename_all = "snake_case")]
340pub enum Endpoint {
341    /// A URI endpoint.
342    ///
343    /// This variant uses custom serialization/deserialization logic defined in `uri_serde`.
344    #[serde(with = "uri_serde")]
345    Uri(http::Uri),
346
347    /// A socket address endpoint.
348    ///
349    /// This can be used for direct IP:port addressing.
350    Socket(std::net::SocketAddr),
351
352    /// A Unix domain socket endpoint.
353    ///
354    /// This is typically used for local inter-process communication on Unix-like systems.
355    Unix(std::path::PathBuf),
356}
357
358impl<F> RewriteAndRouteHandler<F> {
359    pub fn layer<C>() -> impl FactoryLayer<C, F, Factory = RewriteAndRouteHandlerFactory<F>>
360    where
361        C: Param<Vec<RouteConfig>>,
362    {
363        layer_fn(|c: &C, inner| {
364            let routes = c.param();
365            RewriteAndRouteHandlerFactory { inner, routes }
366        })
367    }
368}
369
370fn rewrite_request<B>(request: &mut Request<B>, upstream: &Upstream) {
371    let remote = match &upstream.endpoint {
372        Endpoint::Uri(uri) => uri,
373        _ => unimplemented!("not implement"),
374    };
375
376    if let Some(authority) = remote.authority() {
377        let header_value =
378            HeaderValue::from_str(authority.as_str()).unwrap_or(HeaderValue::from_static(""));
379        tracing::debug!(
380            "Request: {:?} -> {:?}",
381            request.headers().get(http::header::HOST),
382            header_value
383        );
384
385        request.headers_mut().remove(http::header::HOST);
386
387        request
388            .headers_mut()
389            .insert(http::header::HOST, header_value);
390
391        let scheme = match remote.scheme() {
392            Some(scheme) => scheme.to_owned(),
393            None => Scheme::HTTP,
394        };
395
396        let uri = request.uri_mut();
397        let path_and_query = match uri.path_and_query() {
398            Some(path_and_query) => match path_and_query.query() {
399                Some(query) => format!("{}?{}", remote.path(), query),
400                None => String::from(remote.path()),
401            },
402            None => "/".to_string(),
403        };
404        *uri = http::Uri::builder()
405            .authority(authority.to_owned())
406            .scheme(scheme)
407            .path_and_query(path_and_query)
408            .build()
409            .unwrap();
410    }
411}
412
413#[cfg(test)]
414mod tests {
415    use std::time::SystemTime;
416
417    use super::*;
418
419    fn iterate_match<'a>(req_path: &str, routes: &'a [RouteConfig]) -> Option<&'a RouteConfig> {
420        let mut target_route = None;
421        let mut route_len = 0;
422        for route in routes.iter() {
423            let route_path = &route.path;
424            let route_path_len = route_path.len();
425            if req_path.starts_with(route_path) && route_path_len > route_len {
426                target_route = Some(route);
427                route_len = route_path_len;
428            }
429        }
430        target_route
431    }
432
433    fn create_routes() -> impl Iterator<Item = RouteConfig> {
434        let total_routes = 1024 * 100;
435        (0..total_routes).map(|n| RouteConfig {
436            id: "testroute".to_string(),
437            path: format!("/{n}"),
438            upstreams: Vec::from([Upstream {
439                endpoint: Endpoint::Uri(format!("http://test{n}.endpoint").parse().unwrap()),
440                weight: Default::default(),
441            }]),
442        })
443    }
444
445    #[test]
446    fn test_iterate_match() {
447        let mut router: Router<RouteConfig> = Router::new();
448        create_routes().for_each(|route| router.insert(route.path.clone(), route).unwrap());
449        let routes: Vec<RouteConfig> = create_routes().collect();
450        let target_path = "/1024";
451
452        let current = SystemTime::now();
453        let iterate_route = iterate_match(target_path, &routes).unwrap();
454        let iterate_match_elapsed = current.elapsed().unwrap().as_micros();
455
456        let current = SystemTime::now();
457        let matchit_route = router.at(target_path).unwrap().value;
458        let matchit_match_elapsed = current.elapsed().unwrap().as_micros();
459
460        assert_eq!(
461            format!("{:?}", iterate_route),
462            format!("{:?}", matchit_route)
463        );
464        println!("{:?}", iterate_route);
465        assert!(matchit_match_elapsed < (iterate_match_elapsed / 100));
466    }
467}