Skip to main content

geph5_client/
broker.rs

1mod fronted_http;
2mod priority_race;
3mod race;
4
5#[cfg(feature = "aws_lambda")]
6mod aws_lambda;
7
8use anyctx::AnyCtx;
9use anyhow::Context;
10
11#[cfg(feature = "aws_lambda")]
12use aws_lambda::AwsLambdaTransport;
13use fronted_http::FrontedHttpTransport;
14use geph5_broker_protocol::{BrokerClient, DOMAIN_NET_STATUS, NetStatus};
15use itertools::Itertools;
16use nanorpc::DynRpcTransport;
17use priority_race::PriorityRaceTransport;
18use race::RaceTransport;
19
20use serde::{Deserialize, Serialize};
21use sillad::tcp::TcpDialer;
22use std::{collections::BTreeMap, net::SocketAddr};
23
24use crate::client::{Config, CtxField};
25
26#[derive(Serialize, Clone)]
27#[serde(rename_all = "snake_case")]
28pub enum BrokerSource {
29    Direct(String),
30    Fronted {
31        front: String,
32        host: String,
33        #[serde(default)]
34        override_dns: Option<Vec<SocketAddr>>,
35    },
36    DirectTcp(SocketAddr),
37    #[cfg(feature = "aws_lambda")]
38    AwsLambda {
39        function_name: String,
40        region: String,
41        obfs_key: String,
42    },
43    Race(Vec<BrokerSource>),
44    PriorityRace(BTreeMap<u64, BrokerSource>),
45    Other(String),
46}
47
48#[derive(Deserialize)]
49#[serde(untagged)]
50enum RawBrokerSource {
51    Direct {
52        direct: String,
53    },
54    Fronted {
55        fronted: FrontedFields,
56    },
57    DirectTcp {
58        direct_tcp: SocketAddr,
59    },
60    #[cfg(feature = "aws_lambda")]
61    AwsLambda {
62        aws_lambda: AwsLambdaFields,
63    },
64    #[cfg(not(feature = "aws_lambda"))]
65    AwsLambdaUnsupported {
66        #[serde(rename = "aws_lambda")]
67        _aws_lambda: serde::de::IgnoredAny,
68    },
69    Race {
70        race: Vec<BrokerSource>,
71    },
72    PriorityRace {
73        priority_race: BTreeMap<u64, BrokerSource>,
74    },
75    Other(BTreeMap<String, serde_json::Value>),
76}
77
78#[derive(Deserialize)]
79struct FrontedFields {
80    front: String,
81    host: String,
82    #[serde(default)]
83    override_dns: Option<Vec<SocketAddr>>,
84}
85
/// Payload of the `aws_lambda` key; copied field-for-field into `BrokerSource::AwsLambda`.
///
/// Only compiled when the `aws_lambda` feature is enabled.
#[cfg(feature = "aws_lambda")]
#[derive(Deserialize)]
struct AwsLambdaFields {
    function_name: String,
    region: String,
    obfs_key: String,
}
93
94impl<'de> Deserialize<'de> for BrokerSource {
95    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
96    where
97        D: serde::Deserializer<'de>,
98    {
99        let raw = RawBrokerSource::deserialize(deserializer)?;
100        Ok(match raw {
101            RawBrokerSource::Direct { direct } => BrokerSource::Direct(direct),
102            RawBrokerSource::Fronted { fronted } => BrokerSource::Fronted {
103                front: fronted.front,
104                host: fronted.host,
105                override_dns: fronted.override_dns,
106            },
107            RawBrokerSource::DirectTcp { direct_tcp } => BrokerSource::DirectTcp(direct_tcp),
108            #[cfg(feature = "aws_lambda")]
109            RawBrokerSource::AwsLambda { aws_lambda } => BrokerSource::AwsLambda {
110                function_name: aws_lambda.function_name,
111                region: aws_lambda.region,
112                obfs_key: aws_lambda.obfs_key,
113            },
114            #[cfg(not(feature = "aws_lambda"))]
115            RawBrokerSource::AwsLambdaUnsupported { .. } => {
116                warn_unknown_broker_source("aws_lambda");
117                BrokerSource::Other("aws_lambda".to_string())
118            }
119            RawBrokerSource::Race { race } => BrokerSource::Race(race),
120            RawBrokerSource::PriorityRace { priority_race } => {
121                BrokerSource::PriorityRace(priority_race)
122            }
123            RawBrokerSource::Other(other) => {
124                let kind = other
125                    .into_iter()
126                    .next()
127                    .map(|(key, _)| key)
128                    .unwrap_or_else(|| "unknown".to_string());
129                warn_unknown_broker_source(&kind);
130                BrokerSource::Other(kind)
131            }
132        })
133    }
134}
135
136fn warn_unknown_broker_source(kind: &str) {
137    tracing::warn!(
138        broker_source = kind,
139        "unknown or unsupported broker source in config; ignoring it until used"
140    );
141}
142
143impl BrokerSource {
144    /// Converts to a RpcTransport.
145    pub fn rpc_transport(&self) -> DynRpcTransport {
146        match self {
147            BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
148                url: s.clone(),
149                host: None,
150                dns: None,
151            }),
152            BrokerSource::DirectTcp(dest_addr) => {
153                DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
154                    dest_addr: *dest_addr,
155                }))
156            }
157            BrokerSource::Fronted {
158                front,
159                host,
160                override_dns,
161            } => DynRpcTransport::new(FrontedHttpTransport {
162                url: front.clone(),
163                host: Some(host.clone()),
164                dns: override_dns.clone(),
165            }),
166            #[cfg(feature = "aws_lambda")]
167            BrokerSource::AwsLambda {
168                function_name,
169                region,
170                obfs_key,
171            } => DynRpcTransport::new(AwsLambdaTransport {
172                function_name: function_name.clone(),
173                region: region.clone(),
174                obfs_key: obfs_key.clone(),
175            }),
176            BrokerSource::Race(race_between) => {
177                let transports = race_between
178                    .iter()
179                    .map(|bs| bs.rpc_transport())
180                    .collect_vec();
181                DynRpcTransport::new(RaceTransport::new(transports))
182            }
183            BrokerSource::PriorityRace(inner) => {
184                let inner = inner.iter().map(|(k, v)| (*k, v.rpc_transport())).collect();
185                DynRpcTransport::new(PriorityRaceTransport::new(inner))
186            }
187            BrokerSource::Other(kind) => {
188                DynRpcTransport::new(UnsupportedBrokerTransport { kind: kind.clone() })
189            }
190        }
191    }
192}
193
194struct UnsupportedBrokerTransport {
195    kind: String,
196}
197
198#[async_trait::async_trait]
199impl nanorpc::RpcTransport for UnsupportedBrokerTransport {
200    type Error = anyhow::Error;
201
202    async fn call_raw(
203        &self,
204        _req: nanorpc::JrpcRequest,
205    ) -> Result<nanorpc::JrpcResponse, Self::Error> {
206        Err(anyhow::anyhow!(
207            "broker source '{}' is unsupported in this build",
208            self.kind
209        ))
210    }
211}
212
213pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
214    ctx.get(BROKER_CLIENT).as_ref().context(
215        "broker information not provided, so cannot use any broker-dependent functionality",
216    )
217}
218
219static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
220    ctx.init()
221        .broker
222        .as_ref()
223        .map(|src| BrokerClient::from(src.rpc_transport()))
224};
225
226pub async fn get_net_status(ctx: &AnyCtx<Config>) -> anyhow::Result<NetStatus> {
227    let broker = broker_client(ctx).context("could not get broker client")?;
228    let net_status_response = broker
229        .get_net_status()
230        .await?
231        .map_err(|e| anyhow::anyhow!("broker refused to serve exits: {e}"))?;
232
233    // Verify the broker's signature over the net status:
234    let net_status_verified = net_status_response
235        .verify(DOMAIN_NET_STATUS, |their_pk| {
236            if let Some(broker_pk) = &ctx.init().broker_keys {
237                hex::encode(their_pk.as_bytes()) == broker_pk.master
238            } else {
239                tracing::warn!("trusting netstatus blindly since broker_keys was not provided");
240                true
241            }
242        })
243        .context("could not verify net status")?;
244
245    Ok(net_status_verified)
246}
247
#[cfg(test)]
mod tests {
    use super::BrokerSource;

    /// Names the variant of `source`, for use in panic messages (`BrokerSource` does not
    /// derive `Debug`).
    fn kind_of(source: &BrokerSource) -> &'static str {
        match source {
            BrokerSource::Direct(_) => "direct",
            BrokerSource::Fronted { .. } => "fronted",
            BrokerSource::DirectTcp(_) => "direct_tcp",
            #[cfg(feature = "aws_lambda")]
            BrokerSource::AwsLambda { .. } => "aws_lambda",
            BrokerSource::Race(_) => "race",
            BrokerSource::PriorityRace(_) => "priority_race",
            BrokerSource::Other(_) => "other",
        }
    }

    /// Returns the kind carried by `BrokerSource::Other`, panicking on any other variant.
    fn expect_other(source: BrokerSource) -> String {
        match source {
            BrokerSource::Other(kind) => kind,
            wrong => panic!("expected other broker source, got {:?}", kind_of(&wrong)),
        }
    }

    #[test]
    fn deserializes_unknown_broker_source_as_other() {
        let json = r#"{"future_transport":{"foo":"bar"}}"#;
        let parsed: BrokerSource = serde_json::from_str(json).unwrap();

        assert_eq!(expect_other(parsed), "future_transport");
    }

    #[cfg(not(feature = "aws_lambda"))]
    #[test]
    fn deserializes_aws_lambda_as_other_when_feature_disabled() {
        let json = r#"{"aws_lambda":{"function_name":"f","region":"us-east-1","obfs_key":"k"}}"#;
        let parsed: BrokerSource = serde_json::from_str(json).unwrap();

        assert_eq!(expect_other(parsed), "aws_lambda");
    }
}