geph5_client/
broker.rs

1mod fronted_http;
2mod priority_race;
3mod race;
4
5#[cfg(feature = "aws_lambda")]
6mod aws_lambda;
7
8use anyctx::AnyCtx;
9use anyhow::Context;
10
11#[cfg(feature = "aws_lambda")]
12use aws_lambda::AwsLambdaTransport;
13use fronted_http::FrontedHttpTransport;
14use geph5_broker_protocol::BrokerClient;
15use itertools::Itertools;
16use nanorpc::DynRpcTransport;
17use priority_race::PriorityRaceTransport;
18use race::RaceTransport;
19
20use serde::{Deserialize, Serialize};
21use sillad::tcp::TcpDialer;
22use std::{collections::BTreeMap, net::SocketAddr};
23
24use crate::client::{Config, CtxField};
25
26#[derive(Serialize, Deserialize, Clone)]
27#[serde(rename_all = "snake_case")]
28pub enum BrokerSource {
29    Direct(String),
30    Fronted {
31        front: String,
32        host: String,
33        #[serde(default)]
34        override_dns: Option<Vec<SocketAddr>>,
35    },
36    DirectTcp(SocketAddr),
37    #[cfg(feature = "aws_lambda")]
38    AwsLambda {
39        function_name: String,
40        region: String,
41        obfs_key: String,
42    },
43    Race(Vec<BrokerSource>),
44    PriorityRace(BTreeMap<u64, BrokerSource>),
45}
46
47impl BrokerSource {
48    /// Converts to a RpcTransport.
49    pub fn rpc_transport(&self) -> DynRpcTransport {
50        match self {
51            BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
52                url: s.clone(),
53                host: None,
54                dns: None,
55            }),
56            BrokerSource::DirectTcp(dest_addr) => {
57                DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
58                    dest_addr: *dest_addr,
59                }))
60            }
61            BrokerSource::Fronted {
62                front,
63                host,
64                override_dns,
65            } => DynRpcTransport::new(FrontedHttpTransport {
66                url: front.clone(),
67                host: Some(host.clone()),
68                dns: override_dns.clone(),
69            }),
70            #[cfg(feature = "aws_lambda")]
71            BrokerSource::AwsLambda {
72                function_name,
73                region,
74                obfs_key,
75            } => DynRpcTransport::new(AwsLambdaTransport {
76                function_name: function_name.clone(),
77                region: region.clone(),
78                obfs_key: obfs_key.clone(),
79            }),
80            BrokerSource::Race(race_between) => {
81                let transports = race_between
82                    .iter()
83                    .map(|bs| bs.rpc_transport())
84                    .collect_vec();
85                DynRpcTransport::new(RaceTransport::new(transports))
86            }
87            BrokerSource::PriorityRace(inner) => {
88                let inner = inner.iter().map(|(k, v)| (*k, v.rpc_transport())).collect();
89                DynRpcTransport::new(PriorityRaceTransport::new(inner))
90            }
91        }
92    }
93}
94
95pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
96    ctx.get(BROKER_CLIENT).as_ref().context(
97        "broker information not provided, so cannot use any broker-dependent functionality",
98    )
99}
100
101static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
102    ctx.init()
103        .broker
104        .as_ref()
105        .map(|src| BrokerClient::from(src.rpc_transport()))
106};