lynx_core/
schedular.rs

1use std::sync::Arc;
2
3use anyhow::{Error, Result};
4use glob_match::glob_match;
5use http::header::CONTENT_TYPE;
6use http::status;
7use http_body_util::combinators::BoxBody;
8use hyper::body::{Bytes, Incoming};
9use hyper::{Method, Request, Response};
10use nanoid::nanoid;
11use tracing::{debug, info, trace};
12
13use crate::entities::app_config::{get_app_config, get_enabled_ssl_config, SSLConfigRule};
14use crate::proxy::http_proxy::proxy_http_request;
15use crate::proxy::https_proxy::https_proxy;
16use crate::self_service::{handle_self_service, match_self_service};
17use crate::tunnel_proxy::tunnel_proxy;
18use crate::utils::{full, is_http};
19
20pub fn get_req_trace_id(req: &Request<hyper::body::Incoming>) -> Arc<String> {
21    req.extensions()
22        .get::<Arc<String>>()
23        .map(Arc::clone)
24        .expect("trace id not found")
25}
26
27pub async fn capture_ssl(req: &Request<Incoming>) -> Result<bool> {
28    let app_config = get_app_config().await;
29    if !app_config.capture_ssl {
30        return Ok(false);
31    }
32    let (include, exclude) = get_enabled_ssl_config().await?;
33
34    let uri = req.uri();
35
36    let host = uri.host();
37    let port = uri.port_u16();
38
39    let match_host = |config: &SSLConfigRule, host: &str, port: u16| -> bool {
40        let glob_match_host = glob_match(&config.host, host);
41        trace!(
42            "matching host: {:?} {:?} {:?}",
43            config.host,
44            host,
45            glob_match_host
46        );
47        if !glob_match_host {
48            return false;
49        }
50        if matches!(config.port, Some(p) if p != port) {
51            return false;
52        }
53        true
54    };
55
56    match (host, port) {
57        (Some(host), Some(port)) => {
58            let include = include.iter().any(|config| match_host(config, host, port));
59            let exclude = exclude.iter().any(|config| match_host(config, host, port));
60            trace!("capture ssl: {:?} {:?} {:?}", include, exclude, uri);
61            Ok(include && !exclude)
62        }
63        _ => Ok(false),
64    }
65}
66
67pub async fn dispatch(
68    mut req: Request<hyper::body::Incoming>,
69) -> Result<Response<BoxBody<Bytes, Error>>> {
70    if match_self_service(&req) {
71        return handle_self_service(req).await;
72    }
73
74    info!("dispatching request {:?}", req.uri());
75    debug!("dispatching request {:?}", req);
76
77    req.extensions_mut().insert(Arc::new(nanoid!()));
78
79    if is_http(req.uri()) {
80        trace!("proxying http request {:?}", req);
81        return proxy_http_request(req).await;
82    }
83
84    if capture_ssl(&req).await? {
85        // TODO: support websocket
86        // let is_websocket = hyper_tungstenite::is_upgrade_request(&req);
87        // if is_websocket {
88        //     return WebsocketProxy {}.proxy(req).await;
89        // }
90        trace!("proxying https request {:?}", req);
91        if req.method() == Method::CONNECT {
92            return https_proxy(req).await;
93        }
94    } else {
95        trace!("tunnel proxy {:?}", req);
96        return tunnel_proxy(req).await;
97    }
98
99    Ok(Response::builder()
100        .status(status::StatusCode::NOT_FOUND)
101        .header(CONTENT_TYPE, "text/plain")
102        .body(full(Bytes::from(
103            "The service does not support the current protocol",
104        )))
105        .unwrap())
106}
107
/// Placeholder test with an empty body; it makes no assertions and passes
/// whenever this file compiles under the test harness.
#[test]
fn global_() {
    // Intentionally empty.
}