Skip to main content

route_util/
http1_backend.rs

1use crate::pools_with_global_extra::PoolsWithGlobalExtra;
2use anyhow::anyhow;
3use detcd::{DestinationRule, Meta, ServiceState};
4use http_pool::body::{VariantBody, variant_body};
5use http_pool::http1::HttpPool;
6use http_pool::net_pool::{BackendState, Pool, Pools};
7use hyper::body::Incoming;
8use hyper::{Request, Response};
9use std::sync::Arc;
10
11#[macro_export]
12macro_rules! to_backend {
13    (
14         $pools: ident,
15         $req: ident,
16         $service: ident,
17         $service_id: ident
18     ) => {
19        let (pool, meta) = get_pool_and_meta(&$pools, &$service)?;
20
21        // 获取发送者
22        let mut sender = match meta.state {
23            Some(ServiceState::Fixed) => pool
24                .clone()
25                .target(get_fixed_address(&pool, $service_id, meta.instances)?.get_address())
26                .await
27                .map_err(|e| anyhow::Error::new(e))?,
28            _ => pool
29                .get(parse_rule(&$req, &meta)?)
30                .await
31                .map_err(|e| anyhow::Error::new(e))?,
32        };
33
34        let uri = sender
35            .new_uri($req.uri())
36            .map_err(|e| anyhow::Error::new(e))?;
37
38        *$req.uri_mut() = uri;
39
40        sender
41            .send_request($req.map(|b| variant_body(b)))
42            .await
43            .map(|b| b.map(|b| variant_body(b)))
44            .map_err(|e| anyhow::Error::new(e))
45    };
46}
47
48pub(crate) use to_backend;
49
50pub async fn send_to_backend(
51    pools: Pools<http_pool::http1::Pool>,
52    service: String,
53    service_id: Option<u32>,
54    mut req: Request<Incoming>,
55) -> Result<Response<VariantBody>, anyhow::Error> {
56    to_backend! {
57        pools,
58        req,
59        service,
60        service_id
61    }
62}
63
64pub(crate) fn get_pool_and_meta<P: Pool>(
65    pools: &Pools<P>,
66    service: &String,
67) -> Result<(Arc<P>, Arc<Meta>), anyhow::Error> {
68    // 获取连接池
69    let pool = pools
70        .get_pool(&service)
71        .ok_or(anyhow!(format!("service/{} not exist", service)))?;
72
73    // 获取meta
74    let meta = pools
75        .read_global_meta(&service)
76        .ok_or(anyhow!(format!("service/{} no meta", service)))?;
77
78    Ok((pool, meta))
79}
80
81pub(crate) fn get_fixed_address<P: Pool>(
82    pool: &Arc<P>,
83    service_id: Option<u32>,
84    instances: u32,
85) -> Result<BackendState, anyhow::Error> {
86    let service_id = service_id
87        .filter(|&id| id < instances)
88        .ok_or_else(|| anyhow!("invalid target service id/{:?}", service_id))?;
89
90    let bs = pool
91        .get_backend_by_id(service_id)
92        .ok_or(anyhow!("target service id/{} not exist", service_id))?;
93
94    Ok(bs)
95}
96
97pub(crate) fn parse_rule<'a>(
98    req: &'a Request<Incoming>,
99    meta: &'a Meta,
100) -> Result<&'a str, anyhow::Error> {
101    let rule = match meta.destination_rule {
102        DestinationRule::Path(_) => req.uri().path(),
103        DestinationRule::Header(ref h, _s) => req
104            .headers()
105            .get(h)
106            .ok_or(anyhow!(format!("head/{} not exist", h)))?
107            .to_str()
108            .map_err(|_| anyhow!("head/{} not legal", h))?,
109    };
110    Ok(rule)
111}