Skip to main content

geph5_client/
broker.rs

1mod fronted_http;
2mod priority_race;
3mod race;
4mod tunneled_http;
5
6#[cfg(feature = "aws_lambda")]
7mod aws_lambda;
8
9use anyctx::AnyCtx;
10use anyhow::Context;
11
12#[cfg(feature = "aws_lambda")]
13use aws_lambda::AwsLambdaTransport;
14use fronted_http::FrontedHttpTransport;
15use geph5_broker_protocol::{BrokerClient, DOMAIN_NET_STATUS, NetStatus};
16use itertools::Itertools;
17use nanorpc::{DynRpcTransport, JrpcRequest, JrpcResponse, RpcTransport};
18use priority_race::PriorityRaceTransport;
19use race::RaceTransport;
20use smol_timeout2::TimeoutExt;
21use std::sync::atomic::Ordering;
22use tunneled_http::{TUNNELED_BROKER_TIMEOUT, TunneledHttpTransport};
23
24use serde::{Deserialize, Serialize};
25use sillad::tcp::TcpDialer;
26use std::{collections::BTreeMap, net::SocketAddr};
27
28use crate::{
29    client::{Config, CtxField},
30    control_prot::CURRENT_ACTIVE_SESSIONS,
31};
32
33#[derive(Serialize, Deserialize, Clone)]
34#[serde(rename_all = "snake_case")]
35pub enum BrokerSource {
36    Direct(String),
37    Fronted {
38        front: String,
39        host: String,
40        #[serde(default)]
41        override_dns: Option<Vec<SocketAddr>>,
42    },
43    DirectTcp(SocketAddr),
44    AwsLambda {
45        function_name: String,
46        region: String,
47        obfs_key: String,
48    },
49    Race(Vec<BrokerSource>),
50    PriorityRace(BTreeMap<u64, BrokerSource>),
51}
52
53#[derive(Serialize, Deserialize, Clone)]
54#[serde(rename_all = "snake_case")]
55pub enum TunneledBrokerSource {
56    Direct(String),
57}
58
59impl BrokerSource {
60    /// Converts to a RpcTransport.
61    pub fn rpc_transport(&self) -> DynRpcTransport {
62        match self {
63            BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
64                url: s.clone(),
65                host: None,
66                dns: None,
67            }),
68            BrokerSource::DirectTcp(dest_addr) => {
69                DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
70                    dest_addr: *dest_addr,
71                }))
72            }
73            BrokerSource::Fronted {
74                front,
75                host,
76                override_dns,
77            } => DynRpcTransport::new(FrontedHttpTransport {
78                url: front.clone(),
79                host: Some(host.clone()),
80                dns: override_dns.clone(),
81            }),
82            #[cfg(feature = "aws_lambda")]
83            BrokerSource::AwsLambda {
84                function_name,
85                region,
86                obfs_key,
87            } => DynRpcTransport::new(AwsLambdaTransport {
88                function_name: function_name.clone(),
89                region: region.clone(),
90                obfs_key: obfs_key.clone(),
91            }),
92            #[cfg(not(feature = "aws_lambda"))]
93            BrokerSource::AwsLambda { .. } => {
94                DynRpcTransport::new(UnsupportedBrokerTransport("aws_lambda"))
95            }
96            BrokerSource::Race(race_between) => {
97                let transports = race_between
98                    .iter()
99                    .map(|bs| bs.rpc_transport())
100                    .collect_vec();
101                DynRpcTransport::new(RaceTransport::new(transports))
102            }
103            BrokerSource::PriorityRace(inner) => {
104                let inner = inner.iter().map(|(k, v)| (*k, v.rpc_transport())).collect();
105                DynRpcTransport::new(PriorityRaceTransport::new(inner))
106            }
107        }
108    }
109}
110
111impl TunneledBrokerSource {
112    fn rpc_transport(&self, ctx: &AnyCtx<Config>) -> DynRpcTransport {
113        match self {
114            TunneledBrokerSource::Direct(url) => {
115                DynRpcTransport::new(TunneledHttpTransport::new(ctx.clone(), url.clone()))
116            }
117        }
118    }
119}
120
121struct UnsupportedBrokerTransport(&'static str);
122
123#[async_trait::async_trait]
124impl nanorpc::RpcTransport for UnsupportedBrokerTransport {
125    type Error = anyhow::Error;
126
127    async fn call_raw(
128        &self,
129        _req: nanorpc::JrpcRequest,
130    ) -> Result<nanorpc::JrpcResponse, Self::Error> {
131        Err(anyhow::anyhow!(
132            "broker source '{}' is unsupported in this build",
133            self.0
134        ))
135    }
136}
137
138pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
139    ctx.get(BROKER_CLIENT).as_ref().context(
140        "broker information not provided, so cannot use any broker-dependent functionality",
141    )
142}
143
144static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
145    ctx.init().broker.as_ref().map(|src| {
146        let normal = src.rpc_transport();
147        let tunneled = ctx
148            .init()
149            .tunneled_broker
150            .as_ref()
151            .map(|src| src.rpc_transport(ctx));
152        BrokerClient::from(DynRpcTransport::new(SwitchingBrokerTransport {
153            normal,
154            tunneled,
155            is_connected: std::sync::Arc::new({
156                let ctx = ctx.clone();
157                move || ctx
158                    .get(CURRENT_ACTIVE_SESSIONS)
159                    .load(Ordering::SeqCst)
160                    > 0
161            }),
162        }))
163    })
164};
165
166struct SwitchingBrokerTransport {
167    normal: DynRpcTransport,
168    tunneled: Option<DynRpcTransport>,
169    is_connected: std::sync::Arc<dyn Fn() -> bool + Send + Sync>,
170}
171
172#[async_trait::async_trait]
173impl RpcTransport for SwitchingBrokerTransport {
174    type Error = anyhow::Error;
175
176    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
177        let should_try_tunneled = self.tunneled.is_some() && (self.is_connected)();
178        if should_try_tunneled {
179            let tunneled = self.tunneled.as_ref().unwrap();
180            match tunneled
181                .call_raw(req.clone())
182                .timeout(TUNNELED_BROKER_TIMEOUT)
183                .await
184            {
185                Some(Ok(response)) => return Ok(response),
186                Some(Err(err)) => {
187                    tracing::warn!(err = debug(&err), "tunneled broker RPC failed, falling back");
188                }
189                None => {
190                    tracing::warn!("tunneled broker RPC timed out, falling back");
191                }
192            }
193        }
194        self.normal.call_raw(req).await
195    }
196}
197
198pub async fn get_net_status(ctx: &AnyCtx<Config>) -> anyhow::Result<NetStatus> {
199    let broker = broker_client(ctx).context("could not get broker client")?;
200    let net_status_response = broker
201        .get_net_status()
202        .await?
203        .map_err(|e| anyhow::anyhow!("broker refused to serve exits: {e}"))?;
204
205    // Verify the broker's signature over the net status:
206    let net_status_verified = net_status_response
207        .verify(DOMAIN_NET_STATUS, |their_pk| {
208            if let Some(broker_pk) = &ctx.init().broker_keys {
209                hex::encode(their_pk.as_bytes()) == broker_pk.master
210            } else {
211                tracing::warn!("trusting netstatus blindly since broker_keys was not provided");
212                true
213            }
214        })
215        .context("could not verify net status")?;
216
217    Ok(net_status_verified)
218}
219
220#[cfg(test)]
221mod tests {
222    use super::BrokerSource;
223
224    #[test]
225    fn deserializes_aws_lambda_when_feature_disabled() {
226        let parsed: BrokerSource = serde_json::from_str(
227            r#"{"aws_lambda":{"function_name":"f","region":"us-east-1","obfs_key":"k"}}"#,
228        )
229        .unwrap();
230
231        match parsed {
232            BrokerSource::AwsLambda {
233                function_name,
234                region,
235                obfs_key,
236            } => {
237                assert_eq!(function_name, "f");
238                assert_eq!(region, "us-east-1");
239                assert_eq!(obfs_key, "k");
240            }
241            other => panic!("expected other broker source, got {:?}", kind_of(&other)),
242        }
243    }
244
245    fn kind_of(source: &BrokerSource) -> &'static str {
246        match source {
247            BrokerSource::Direct(_) => "direct",
248            BrokerSource::Fronted { .. } => "fronted",
249            BrokerSource::DirectTcp(_) => "direct_tcp",
250            BrokerSource::AwsLambda { .. } => "aws_lambda",
251            BrokerSource::Race(_) => "race",
252            BrokerSource::PriorityRace(_) => "priority_race",
253        }
254    }
255}