Skip to main content

geph5_client/
broker.rs

1mod fronted_http;
2mod priority_race;
3mod race;
4mod tunneled_http;
5
6#[cfg(feature = "aws_lambda")]
7mod aws_lambda;
8
9use anyctx::AnyCtx;
10use anyhow::Context;
11
12#[cfg(feature = "aws_lambda")]
13use aws_lambda::AwsLambdaTransport;
14use fronted_http::FrontedHttpTransport;
15use geph5_broker_protocol::{BrokerClient, DOMAIN_NET_STATUS, NetStatus};
16use itertools::Itertools;
17use nanorpc::{DynRpcTransport, JrpcRequest, JrpcResponse, RpcTransport};
18use priority_race::PriorityRaceTransport;
19use race::RaceTransport;
20use tunneled_http::TunneledHttpTransport;
21
22use serde::{Deserialize, Serialize};
23use sillad::tcp::TcpDialer;
24use std::{collections::BTreeMap, net::SocketAddr};
25
26use crate::{
27    client::{Config, CtxField},
28    control_prot::CURRENT_CONNECTED_INFOS,
29    timeout::{BROKER_RPC_TIMEOUT, RpcTransportExt},
30};
31
32#[derive(Serialize, Deserialize, Clone)]
33#[serde(rename_all = "snake_case")]
34pub enum BrokerSource {
35    Direct(String),
36    Fronted {
37        front: String,
38        host: String,
39        #[serde(default)]
40        override_dns: Option<Vec<SocketAddr>>,
41    },
42    DirectTcp(SocketAddr),
43    AwsLambda {
44        function_name: String,
45        region: String,
46        obfs_key: String,
47    },
48    Race(Vec<BrokerSource>),
49    PriorityRace(BTreeMap<u64, BrokerSource>),
50}
51
52#[derive(Serialize, Deserialize, Clone)]
53#[serde(rename_all = "snake_case")]
54pub enum TunneledBrokerSource {
55    Direct(String),
56}
57
58impl BrokerSource {
59    /// Converts to a RpcTransport.
60    pub fn rpc_transport(&self) -> DynRpcTransport {
61        match self {
62            BrokerSource::Direct(s) => DynRpcTransport::new(
63                FrontedHttpTransport {
64                    url: s.clone(),
65                    host: None,
66                    dns: None,
67                }
68                .timeout(BROKER_RPC_TIMEOUT),
69            ),
70            BrokerSource::DirectTcp(dest_addr) => DynRpcTransport::new(
71                nanorpc_sillad::DialerTransport(TcpDialer {
72                    dest_addr: *dest_addr,
73                })
74                .timeout(BROKER_RPC_TIMEOUT),
75            ),
76            BrokerSource::Fronted {
77                front,
78                host,
79                override_dns,
80            } => DynRpcTransport::new(
81                FrontedHttpTransport {
82                    url: front.clone(),
83                    host: Some(host.clone()),
84                    dns: override_dns.clone(),
85                }
86                .timeout(BROKER_RPC_TIMEOUT),
87            ),
88            #[cfg(feature = "aws_lambda")]
89            BrokerSource::AwsLambda {
90                function_name,
91                region,
92                obfs_key,
93            } => DynRpcTransport::new(
94                AwsLambdaTransport {
95                    function_name: function_name.clone(),
96                    region: region.clone(),
97                    obfs_key: obfs_key.clone(),
98                }
99                .timeout(BROKER_RPC_TIMEOUT),
100            ),
101            #[cfg(not(feature = "aws_lambda"))]
102            BrokerSource::AwsLambda { .. } => DynRpcTransport::new(
103                UnsupportedBrokerTransport("aws_lambda").timeout(BROKER_RPC_TIMEOUT),
104            ),
105            BrokerSource::Race(race_between) => {
106                let transports = race_between
107                    .iter()
108                    .map(|bs| bs.rpc_transport())
109                    .collect_vec();
110                DynRpcTransport::new(RaceTransport::new(transports))
111            }
112            BrokerSource::PriorityRace(inner) => {
113                let inner = inner.iter().map(|(k, v)| (*k, v.rpc_transport())).collect();
114                DynRpcTransport::new(PriorityRaceTransport::new(inner))
115            }
116        }
117    }
118}
119
120impl TunneledBrokerSource {
121    fn rpc_transport(&self, ctx: &AnyCtx<Config>) -> DynRpcTransport {
122        match self {
123            TunneledBrokerSource::Direct(url) => DynRpcTransport::new(
124                TunneledHttpTransport::new(ctx.clone(), url.clone()).timeout(BROKER_RPC_TIMEOUT),
125            ),
126        }
127    }
128}
129
130struct UnsupportedBrokerTransport(&'static str);
131
132#[async_trait::async_trait]
133impl nanorpc::RpcTransport for UnsupportedBrokerTransport {
134    type Error = anyhow::Error;
135
136    async fn call_raw(
137        &self,
138        _req: nanorpc::JrpcRequest,
139    ) -> Result<nanorpc::JrpcResponse, Self::Error> {
140        Err(anyhow::anyhow!(
141            "broker source '{}' is unsupported in this build",
142            self.0
143        ))
144    }
145}
146
147pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
148    ctx.get(BROKER_CLIENT).as_ref().context(
149        "broker information not provided, so cannot use any broker-dependent functionality",
150    )
151}
152
153static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
154    ctx.init().broker.as_ref().map(|src| {
155        let normal = src.rpc_transport();
156        let tunneled = ctx
157            .init()
158            .tunneled_broker
159            .as_ref()
160            .map(|src| src.rpc_transport(ctx));
161        BrokerClient::from(DynRpcTransport::new(SwitchingBrokerTransport {
162            normal,
163            tunneled,
164            is_connected: std::sync::Arc::new({
165                let ctx = ctx.clone();
166                move || !ctx.get(CURRENT_CONNECTED_INFOS).lock().is_empty()
167            }),
168        }))
169    })
170};
171
172struct SwitchingBrokerTransport {
173    normal: DynRpcTransport,
174    tunneled: Option<DynRpcTransport>,
175    is_connected: std::sync::Arc<dyn Fn() -> bool + Send + Sync>,
176}
177
178#[async_trait::async_trait]
179impl RpcTransport for SwitchingBrokerTransport {
180    type Error = anyhow::Error;
181
182    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
183        let should_try_tunneled = self.tunneled.is_some() && (self.is_connected)();
184        if should_try_tunneled {
185            let tunneled = self.tunneled.as_ref().unwrap();
186            match tunneled.call_raw(req.clone()).await {
187                Ok(response) => return Ok(response),
188                Err(err) => {
189                    tracing::warn!(
190                        err = debug(&err),
191                        "tunneled broker RPC failed, falling back"
192                    );
193                }
194            }
195        }
196        self.normal.call_raw(req).await
197    }
198}
199
200pub async fn get_net_status(ctx: &AnyCtx<Config>) -> anyhow::Result<NetStatus> {
201    let broker = broker_client(ctx).context("could not get broker client")?;
202    let net_status_response = broker
203        .get_net_status()
204        .await?
205        .map_err(|e| anyhow::anyhow!("broker refused to serve exits: {e}"))?;
206
207    // Verify the broker's signature over the net status:
208    let net_status_verified = net_status_response
209        .verify(DOMAIN_NET_STATUS, |their_pk| {
210            if let Some(broker_pk) = &ctx.init().broker_keys {
211                hex::encode(their_pk.as_bytes()) == broker_pk.master
212            } else {
213                tracing::warn!("trusting netstatus blindly since broker_keys was not provided");
214                true
215            }
216        })
217        .context("could not verify net status")?;
218
219    Ok(net_status_verified)
220}
221
222#[cfg(test)]
223mod tests {
224    use super::BrokerSource;
225
226    #[test]
227    fn deserializes_aws_lambda_when_feature_disabled() {
228        let parsed: BrokerSource = serde_json::from_str(
229            r#"{"aws_lambda":{"function_name":"f","region":"us-east-1","obfs_key":"k"}}"#,
230        )
231        .unwrap();
232
233        match parsed {
234            BrokerSource::AwsLambda {
235                function_name,
236                region,
237                obfs_key,
238            } => {
239                assert_eq!(function_name, "f");
240                assert_eq!(region, "us-east-1");
241                assert_eq!(obfs_key, "k");
242            }
243            other => panic!("expected other broker source, got {:?}", kind_of(&other)),
244        }
245    }
246
247    fn kind_of(source: &BrokerSource) -> &'static str {
248        match source {
249            BrokerSource::Direct(_) => "direct",
250            BrokerSource::Fronted { .. } => "fronted",
251            BrokerSource::DirectTcp(_) => "direct_tcp",
252            BrokerSource::AwsLambda { .. } => "aws_lambda",
253            BrokerSource::Race(_) => "race",
254            BrokerSource::PriorityRace(_) => "priority_race",
255        }
256    }
257}