Skip to main content

geph5_client/
broker.rs

1mod fronted_http;
2mod priority_race;
3mod race;
4mod tunneled_http;
5
6#[cfg(feature = "aws_lambda")]
7mod aws_lambda;
8
9use anyctx::AnyCtx;
10use anyhow::Context;
11
12#[cfg(feature = "aws_lambda")]
13use aws_lambda::AwsLambdaTransport;
14use fronted_http::FrontedHttpTransport;
15use geph5_broker_protocol::{BrokerClient, DOMAIN_NET_STATUS, NetStatus};
16use itertools::Itertools;
17use nanorpc::{DynRpcTransport, JrpcRequest, JrpcResponse, RpcTransport};
18use priority_race::PriorityRaceTransport;
19use race::RaceTransport;
20use std::sync::atomic::Ordering;
21use tunneled_http::TunneledHttpTransport;
22
23use serde::{Deserialize, Serialize};
24use sillad::tcp::TcpDialer;
25use std::{collections::BTreeMap, net::SocketAddr};
26
27use crate::{
28    client::{Config, CtxField},
29    control_prot::CURRENT_ACTIVE_SESSIONS,
30    timeout::{BROKER_RPC_TIMEOUT, RpcTransportExt},
31};
32
33#[derive(Serialize, Deserialize, Clone)]
34#[serde(rename_all = "snake_case")]
35pub enum BrokerSource {
36    Direct(String),
37    Fronted {
38        front: String,
39        host: String,
40        #[serde(default)]
41        override_dns: Option<Vec<SocketAddr>>,
42    },
43    DirectTcp(SocketAddr),
44    AwsLambda {
45        function_name: String,
46        region: String,
47        obfs_key: String,
48    },
49    Race(Vec<BrokerSource>),
50    PriorityRace(BTreeMap<u64, BrokerSource>),
51}
52
53#[derive(Serialize, Deserialize, Clone)]
54#[serde(rename_all = "snake_case")]
55pub enum TunneledBrokerSource {
56    Direct(String),
57}
58
59impl BrokerSource {
60    /// Converts to a RpcTransport.
61    pub fn rpc_transport(&self) -> DynRpcTransport {
62        match self {
63            BrokerSource::Direct(s) => DynRpcTransport::new(
64                FrontedHttpTransport {
65                    url: s.clone(),
66                    host: None,
67                    dns: None,
68                }
69                .timeout(BROKER_RPC_TIMEOUT),
70            ),
71            BrokerSource::DirectTcp(dest_addr) => DynRpcTransport::new(
72                nanorpc_sillad::DialerTransport(TcpDialer {
73                    dest_addr: *dest_addr,
74                })
75                .timeout(BROKER_RPC_TIMEOUT),
76            ),
77            BrokerSource::Fronted {
78                front,
79                host,
80                override_dns,
81            } => DynRpcTransport::new(
82                FrontedHttpTransport {
83                    url: front.clone(),
84                    host: Some(host.clone()),
85                    dns: override_dns.clone(),
86                }
87                .timeout(BROKER_RPC_TIMEOUT),
88            ),
89            #[cfg(feature = "aws_lambda")]
90            BrokerSource::AwsLambda {
91                function_name,
92                region,
93                obfs_key,
94            } => DynRpcTransport::new(
95                AwsLambdaTransport {
96                    function_name: function_name.clone(),
97                    region: region.clone(),
98                    obfs_key: obfs_key.clone(),
99                }
100                .timeout(BROKER_RPC_TIMEOUT),
101            ),
102            #[cfg(not(feature = "aws_lambda"))]
103            BrokerSource::AwsLambda { .. } => DynRpcTransport::new(
104                UnsupportedBrokerTransport("aws_lambda").timeout(BROKER_RPC_TIMEOUT),
105            ),
106            BrokerSource::Race(race_between) => {
107                let transports = race_between
108                    .iter()
109                    .map(|bs| bs.rpc_transport())
110                    .collect_vec();
111                DynRpcTransport::new(RaceTransport::new(transports))
112            }
113            BrokerSource::PriorityRace(inner) => {
114                let inner = inner.iter().map(|(k, v)| (*k, v.rpc_transport())).collect();
115                DynRpcTransport::new(PriorityRaceTransport::new(inner))
116            }
117        }
118    }
119}
120
121impl TunneledBrokerSource {
122    fn rpc_transport(&self, ctx: &AnyCtx<Config>) -> DynRpcTransport {
123        match self {
124            TunneledBrokerSource::Direct(url) => DynRpcTransport::new(
125                TunneledHttpTransport::new(ctx.clone(), url.clone()).timeout(BROKER_RPC_TIMEOUT),
126            ),
127        }
128    }
129}
130
131struct UnsupportedBrokerTransport(&'static str);
132
133#[async_trait::async_trait]
134impl nanorpc::RpcTransport for UnsupportedBrokerTransport {
135    type Error = anyhow::Error;
136
137    async fn call_raw(
138        &self,
139        _req: nanorpc::JrpcRequest,
140    ) -> Result<nanorpc::JrpcResponse, Self::Error> {
141        Err(anyhow::anyhow!(
142            "broker source '{}' is unsupported in this build",
143            self.0
144        ))
145    }
146}
147
148pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
149    ctx.get(BROKER_CLIENT).as_ref().context(
150        "broker information not provided, so cannot use any broker-dependent functionality",
151    )
152}
153
154static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
155    ctx.init().broker.as_ref().map(|src| {
156        let normal = src.rpc_transport();
157        let tunneled = ctx
158            .init()
159            .tunneled_broker
160            .as_ref()
161            .map(|src| src.rpc_transport(ctx));
162        BrokerClient::from(DynRpcTransport::new(SwitchingBrokerTransport {
163            normal,
164            tunneled,
165            is_connected: std::sync::Arc::new({
166                let ctx = ctx.clone();
167                move || ctx.get(CURRENT_ACTIVE_SESSIONS).load(Ordering::SeqCst) > 0
168            }),
169        }))
170    })
171};
172
173struct SwitchingBrokerTransport {
174    normal: DynRpcTransport,
175    tunneled: Option<DynRpcTransport>,
176    is_connected: std::sync::Arc<dyn Fn() -> bool + Send + Sync>,
177}
178
179#[async_trait::async_trait]
180impl RpcTransport for SwitchingBrokerTransport {
181    type Error = anyhow::Error;
182
183    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
184        let should_try_tunneled = self.tunneled.is_some() && (self.is_connected)();
185        if should_try_tunneled {
186            let tunneled = self.tunneled.as_ref().unwrap();
187            match tunneled.call_raw(req.clone()).await {
188                Ok(response) => return Ok(response),
189                Err(err) => {
190                    tracing::warn!(
191                        err = debug(&err),
192                        "tunneled broker RPC failed, falling back"
193                    );
194                }
195            }
196        }
197        self.normal.call_raw(req).await
198    }
199}
200
201pub async fn get_net_status(ctx: &AnyCtx<Config>) -> anyhow::Result<NetStatus> {
202    let broker = broker_client(ctx).context("could not get broker client")?;
203    let net_status_response = broker
204        .get_net_status()
205        .await?
206        .map_err(|e| anyhow::anyhow!("broker refused to serve exits: {e}"))?;
207
208    // Verify the broker's signature over the net status:
209    let net_status_verified = net_status_response
210        .verify(DOMAIN_NET_STATUS, |their_pk| {
211            if let Some(broker_pk) = &ctx.init().broker_keys {
212                hex::encode(their_pk.as_bytes()) == broker_pk.master
213            } else {
214                tracing::warn!("trusting netstatus blindly since broker_keys was not provided");
215                true
216            }
217        })
218        .context("could not verify net status")?;
219
220    Ok(net_status_verified)
221}
222
223#[cfg(test)]
224mod tests {
225    use super::BrokerSource;
226
227    #[test]
228    fn deserializes_aws_lambda_when_feature_disabled() {
229        let parsed: BrokerSource = serde_json::from_str(
230            r#"{"aws_lambda":{"function_name":"f","region":"us-east-1","obfs_key":"k"}}"#,
231        )
232        .unwrap();
233
234        match parsed {
235            BrokerSource::AwsLambda {
236                function_name,
237                region,
238                obfs_key,
239            } => {
240                assert_eq!(function_name, "f");
241                assert_eq!(region, "us-east-1");
242                assert_eq!(obfs_key, "k");
243            }
244            other => panic!("expected other broker source, got {:?}", kind_of(&other)),
245        }
246    }
247
248    fn kind_of(source: &BrokerSource) -> &'static str {
249        match source {
250            BrokerSource::Direct(_) => "direct",
251            BrokerSource::Fronted { .. } => "fronted",
252            BrokerSource::DirectTcp(_) => "direct_tcp",
253            BrokerSource::AwsLambda { .. } => "aws_lambda",
254            BrokerSource::Race(_) => "race",
255            BrokerSource::PriorityRace(_) => "priority_race",
256        }
257    }
258}