route_util/
http1_backend.rs1use crate::pools_with_global_extra::PoolsWithGlobalExtra;
2use anyhow::anyhow;
3use detcd::{DestinationRule, Meta, ServiceState};
4use http_pool::body::{VariantBody, variant_body};
5use http_pool::http1::HttpPool;
6use http_pool::net_pool::{BackendState, Pool, Pools};
7use hyper::body::Incoming;
8use hyper::{Request, Response};
9use std::sync::Arc;
10
/// Forwards `$req` to a backend of `$service` and evaluates to the outcome of
/// the send as a `Result<_, anyhow::Error>`.
///
/// The expansion performs these steps in order:
///
/// 1. Resolves the pool and metadata for `$service` with `get_pool_and_meta`.
/// 2. Acquires a sender. When the metadata state is `Some(ServiceState::Fixed)`
///    the backend chosen by `$service_id` (through `get_fixed_address`) is
///    targeted directly; for any other state `pool.get` is called with the key
///    returned by `parse_rule`.
/// 3. Replaces the request URI with the one produced by `sender.new_uri`.
/// 4. Sends the request, passing the request body and the body of a successful
///    result through `variant_body`.
///
/// # Usage notes
///
/// * The expansion is a sequence of statements ending in an expression and
///   uses both `?` and `.await`. It is therefore meant to be invoked with
///   braces as the last statement of an `async` body whose error type can be
///   built from `anyhow::Error`.
/// * `$req` must be a mutable binding because its URI is overwritten in place.
/// * `get_pool_and_meta`, `get_fixed_address`, `parse_rule`, `ServiceState`,
///   `variant_body` and `anyhow` are referenced by bare name, so they must be
///   in scope wherever the macro is invoked.
#[macro_export]
macro_rules! to_backend {
    (
        $pools:ident,
        $req:ident,
        $service:ident,
        $service_id:ident
    ) => {
        let (backend_pool, service_meta) = get_pool_and_meta(&$pools, &$service)?;

        // Fixed services address one specific instance; every other state lets
        // the pool choose a backend from the routing key.
        let mut sender = match service_meta.state {
            Some(ServiceState::Fixed) => backend_pool
                .clone()
                .target(
                    get_fixed_address(&backend_pool, $service_id, service_meta.instances)?
                        .get_address(),
                )
                .await
                .map_err(anyhow::Error::new)?,
            _ => backend_pool
                .get(parse_rule(&$req, &service_meta)?)
                .await
                .map_err(anyhow::Error::new)?,
        };

        // Swap the incoming URI for the one the sender derives from it.
        let backend_uri = sender.new_uri($req.uri()).map_err(anyhow::Error::new)?;
        *$req.uri_mut() = backend_uri;

        sender
            .send_request($req.map(|body| variant_body(body)))
            .await
            .map(|response| response.map(|body| variant_body(body)))
            .map_err(anyhow::Error::new)
    };
}
47
48pub(crate) use to_backend;
49
50pub async fn send_to_backend(
51 pools: Pools<http_pool::http1::Pool>,
52 service: String,
53 service_id: Option<u32>,
54 mut req: Request<Incoming>,
55) -> Result<Response<VariantBody>, anyhow::Error> {
56 to_backend! {
57 pools,
58 req,
59 service,
60 service_id
61 }
62}
63
64pub(crate) fn get_pool_and_meta<P: Pool>(
65 pools: &Pools<P>,
66 service: &String,
67) -> Result<(Arc<P>, Arc<Meta>), anyhow::Error> {
68 let pool = pools
70 .get_pool(&service)
71 .ok_or(anyhow!(format!("service/{} not exist", service)))?;
72
73 let meta = pools
75 .read_global_meta(&service)
76 .ok_or(anyhow!(format!("service/{} no meta", service)))?;
77
78 Ok((pool, meta))
79}
80
81pub(crate) fn get_fixed_address<P: Pool>(
82 pool: &Arc<P>,
83 service_id: Option<u32>,
84 instances: u32,
85) -> Result<BackendState, anyhow::Error> {
86 let service_id = service_id
87 .filter(|&id| id < instances)
88 .ok_or_else(|| anyhow!("invalid target service id/{:?}", service_id))?;
89
90 let bs = pool
91 .get_backend_by_id(service_id)
92 .ok_or(anyhow!("target service id/{} not exist", service_id))?;
93
94 Ok(bs)
95}
96
97pub(crate) fn parse_rule<'a>(
98 req: &'a Request<Incoming>,
99 meta: &'a Meta,
100) -> Result<&'a str, anyhow::Error> {
101 let rule = match meta.destination_rule {
102 DestinationRule::Path(_) => req.uri().path(),
103 DestinationRule::Header(ref h, _s) => req
104 .headers()
105 .get(h)
106 .ok_or(anyhow!(format!("head/{} not exist", h)))?
107 .to_str()
108 .map_err(|_| anyhow!("head/{} not legal", h))?,
109 };
110 Ok(rule)
111}