Skip to main content

geph5_client/
broker.rs

1mod fronted_http;
2mod priority_race;
3mod race;
4
5#[cfg(feature = "aws_lambda")]
6mod aws_lambda;
7
8use anyctx::AnyCtx;
9use anyhow::Context;
10
11#[cfg(feature = "aws_lambda")]
12use aws_lambda::AwsLambdaTransport;
13use fronted_http::FrontedHttpTransport;
14use geph5_broker_protocol::{BrokerClient, DOMAIN_NET_STATUS, NetStatus};
15use itertools::Itertools;
16use nanorpc::DynRpcTransport;
17use priority_race::PriorityRaceTransport;
18use race::RaceTransport;
19
20use serde::{Deserialize, Serialize};
21use sillad::tcp::TcpDialer;
22use std::{collections::BTreeMap, net::SocketAddr};
23
24use crate::client::{Config, CtxField};
25
26#[derive(Serialize, Deserialize, Clone)]
27#[serde(rename_all = "snake_case")]
28pub enum BrokerSource {
29    Direct(String),
30    Fronted {
31        front: String,
32        host: String,
33        #[serde(default)]
34        override_dns: Option<Vec<SocketAddr>>,
35    },
36    DirectTcp(SocketAddr),
37    AwsLambda {
38        function_name: String,
39        region: String,
40        obfs_key: String,
41    },
42    Race(Vec<BrokerSource>),
43    PriorityRace(BTreeMap<u64, BrokerSource>),
44}
45
46impl BrokerSource {
47    /// Converts to a RpcTransport.
48    pub fn rpc_transport(&self) -> DynRpcTransport {
49        match self {
50            BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
51                url: s.clone(),
52                host: None,
53                dns: None,
54            }),
55            BrokerSource::DirectTcp(dest_addr) => {
56                DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
57                    dest_addr: *dest_addr,
58                }))
59            }
60            BrokerSource::Fronted {
61                front,
62                host,
63                override_dns,
64            } => DynRpcTransport::new(FrontedHttpTransport {
65                url: front.clone(),
66                host: Some(host.clone()),
67                dns: override_dns.clone(),
68            }),
69            #[cfg(feature = "aws_lambda")]
70            BrokerSource::AwsLambda {
71                function_name,
72                region,
73                obfs_key,
74            } => DynRpcTransport::new(AwsLambdaTransport {
75                function_name: function_name.clone(),
76                region: region.clone(),
77                obfs_key: obfs_key.clone(),
78            }),
79            #[cfg(not(feature = "aws_lambda"))]
80            BrokerSource::AwsLambda { .. } => {
81                DynRpcTransport::new(UnsupportedBrokerTransport("aws_lambda"))
82            }
83            BrokerSource::Race(race_between) => {
84                let transports = race_between
85                    .iter()
86                    .map(|bs| bs.rpc_transport())
87                    .collect_vec();
88                DynRpcTransport::new(RaceTransport::new(transports))
89            }
90            BrokerSource::PriorityRace(inner) => {
91                let inner = inner.iter().map(|(k, v)| (*k, v.rpc_transport())).collect();
92                DynRpcTransport::new(PriorityRaceTransport::new(inner))
93            }
94        }
95    }
96}
97
98struct UnsupportedBrokerTransport(&'static str);
99
100#[async_trait::async_trait]
101impl nanorpc::RpcTransport for UnsupportedBrokerTransport {
102    type Error = anyhow::Error;
103
104    async fn call_raw(
105        &self,
106        _req: nanorpc::JrpcRequest,
107    ) -> Result<nanorpc::JrpcResponse, Self::Error> {
108        Err(anyhow::anyhow!(
109            "broker source '{}' is unsupported in this build",
110            self.0
111        ))
112    }
113}
114
115pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
116    ctx.get(BROKER_CLIENT).as_ref().context(
117        "broker information not provided, so cannot use any broker-dependent functionality",
118    )
119}
120
121static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
122    ctx.init()
123        .broker
124        .as_ref()
125        .map(|src| BrokerClient::from(src.rpc_transport()))
126};
127
128pub async fn get_net_status(ctx: &AnyCtx<Config>) -> anyhow::Result<NetStatus> {
129    let broker = broker_client(ctx).context("could not get broker client")?;
130    let net_status_response = broker
131        .get_net_status()
132        .await?
133        .map_err(|e| anyhow::anyhow!("broker refused to serve exits: {e}"))?;
134
135    // Verify the broker's signature over the net status:
136    let net_status_verified = net_status_response
137        .verify(DOMAIN_NET_STATUS, |their_pk| {
138            if let Some(broker_pk) = &ctx.init().broker_keys {
139                hex::encode(their_pk.as_bytes()) == broker_pk.master
140            } else {
141                tracing::warn!("trusting netstatus blindly since broker_keys was not provided");
142                true
143            }
144        })
145        .context("could not verify net status")?;
146
147    Ok(net_status_verified)
148}
149
150#[cfg(test)]
151mod tests {
152    use super::BrokerSource;
153
154    #[test]
155    fn deserializes_aws_lambda_when_feature_disabled() {
156        let parsed: BrokerSource = serde_json::from_str(
157            r#"{"aws_lambda":{"function_name":"f","region":"us-east-1","obfs_key":"k"}}"#,
158        )
159        .unwrap();
160
161        match parsed {
162            BrokerSource::AwsLambda {
163                function_name,
164                region,
165                obfs_key,
166            } => {
167                assert_eq!(function_name, "f");
168                assert_eq!(region, "us-east-1");
169                assert_eq!(obfs_key, "k");
170            }
171            other => panic!("expected other broker source, got {:?}", kind_of(&other)),
172        }
173    }
174
175    fn kind_of(source: &BrokerSource) -> &'static str {
176        match source {
177            BrokerSource::Direct(_) => "direct",
178            BrokerSource::Fronted { .. } => "fronted",
179            BrokerSource::DirectTcp(_) => "direct_tcp",
180            BrokerSource::AwsLambda { .. } => "aws_lambda",
181            BrokerSource::Race(_) => "race",
182            BrokerSource::PriorityRace(_) => "priority_race",
183        }
184    }
185}