1mod fronted_http;
2mod priority_race;
3mod race;
4
5#[cfg(feature = "aws_lambda")]
6mod aws_lambda;
7
8use anyctx::AnyCtx;
9use anyhow::Context;
10
11#[cfg(feature = "aws_lambda")]
12use aws_lambda::AwsLambdaTransport;
13use fronted_http::FrontedHttpTransport;
14use geph5_broker_protocol::{BrokerClient, DOMAIN_NET_STATUS, NetStatus};
15use itertools::Itertools;
16use nanorpc::DynRpcTransport;
17use priority_race::PriorityRaceTransport;
18use race::RaceTransport;
19
20use serde::{Deserialize, Serialize};
21use sillad::tcp::TcpDialer;
22use std::{collections::BTreeMap, net::SocketAddr};
23
24use crate::client::{Config, CtxField};
25
/// Describes one way of reaching the broker, or a combination of several ways.
///
/// Variant names are serialized in snake_case. [`BrokerSource::rpc_transport`] turns a
/// value of this type into a concrete RPC transport.
#[derive(Serialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum BrokerSource {
    /// A URL handed to `FrontedHttpTransport` with no host and no DNS override.
    Direct(String),
    /// A URL handed to `FrontedHttpTransport` together with a host override.
    Fronted {
        /// URL given to the transport.
        front: String,
        /// Host value given to the transport.
        host: String,
        /// Optional socket addresses given to the transport as its `dns` setting.
        #[serde(default)]
        override_dns: Option<Vec<SocketAddr>>,
    },
    /// A socket address dialed through `TcpDialer`.
    DirectTcp(SocketAddr),
    /// Parameters for `AwsLambdaTransport`; only exists when the `aws_lambda` feature is on.
    #[cfg(feature = "aws_lambda")]
    AwsLambda {
        function_name: String,
        region: String,
        obfs_key: String,
    },
    /// Several sources combined through `RaceTransport`.
    Race(Vec<BrokerSource>),
    /// Several sources, each keyed by a `u64`, combined through `PriorityRaceTransport`.
    PriorityRace(BTreeMap<u64, BrokerSource>),
    /// A source this build does not recognize or support; holds the name of its kind.
    /// Every RPC call made through it fails.
    Other(String),
}
47
/// Deserialization-only mirror of [`BrokerSource`].
///
/// Because the enum is `untagged`, serde tries the variants in declaration order and keeps
/// the first one that matches, so the catch-all `Other` variant has to stay last.
#[derive(Deserialize)]
#[serde(untagged)]
enum RawBrokerSource {
    /// A map with a string `direct` field.
    Direct {
        direct: String,
    },
    /// A map with a `fronted` field shaped like [`FrontedFields`].
    Fronted {
        fronted: FrontedFields,
    },
    /// A map with a `direct_tcp` field holding a socket address.
    DirectTcp {
        direct_tcp: SocketAddr,
    },
    /// A map with an `aws_lambda` field shaped like `AwsLambdaFields`.
    #[cfg(feature = "aws_lambda")]
    AwsLambda {
        aws_lambda: AwsLambdaFields,
    },
    /// Compiled in only when the `aws_lambda` feature is off: accepts any `aws_lambda`
    /// payload without interpreting it.
    #[cfg(not(feature = "aws_lambda"))]
    AwsLambdaUnsupported {
        #[serde(rename = "aws_lambda")]
        _aws_lambda: serde::de::IgnoredAny,
    },
    /// A map with a `race` field holding a list of sources.
    Race {
        race: Vec<BrokerSource>,
    },
    /// A map with a `priority_race` field holding sources keyed by `u64`.
    PriorityRace {
        priority_race: BTreeMap<u64, BrokerSource>,
    },
    /// Catch-all for any other map with string keys.
    Other(BTreeMap<String, serde_json::Value>),
}
77
/// Payload of the `fronted` key when deserializing a broker source; mirrors the fields of
/// [`BrokerSource::Fronted`].
#[derive(Deserialize)]
struct FrontedFields {
    front: String,
    host: String,
    /// May be omitted in the input, in which case it is `None`.
    #[serde(default)]
    override_dns: Option<Vec<SocketAddr>>,
}
85
/// Payload of the `aws_lambda` key when deserializing a broker source; mirrors the fields of
/// `BrokerSource::AwsLambda`. Only compiled with the `aws_lambda` feature.
#[cfg(feature = "aws_lambda")]
#[derive(Deserialize)]
struct AwsLambdaFields {
    function_name: String,
    region: String,
    obfs_key: String,
}
93
94impl<'de> Deserialize<'de> for BrokerSource {
95 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
96 where
97 D: serde::Deserializer<'de>,
98 {
99 let raw = RawBrokerSource::deserialize(deserializer)?;
100 Ok(match raw {
101 RawBrokerSource::Direct { direct } => BrokerSource::Direct(direct),
102 RawBrokerSource::Fronted { fronted } => BrokerSource::Fronted {
103 front: fronted.front,
104 host: fronted.host,
105 override_dns: fronted.override_dns,
106 },
107 RawBrokerSource::DirectTcp { direct_tcp } => BrokerSource::DirectTcp(direct_tcp),
108 #[cfg(feature = "aws_lambda")]
109 RawBrokerSource::AwsLambda { aws_lambda } => BrokerSource::AwsLambda {
110 function_name: aws_lambda.function_name,
111 region: aws_lambda.region,
112 obfs_key: aws_lambda.obfs_key,
113 },
114 #[cfg(not(feature = "aws_lambda"))]
115 RawBrokerSource::AwsLambdaUnsupported { .. } => {
116 warn_unknown_broker_source("aws_lambda");
117 BrokerSource::Other("aws_lambda".to_string())
118 }
119 RawBrokerSource::Race { race } => BrokerSource::Race(race),
120 RawBrokerSource::PriorityRace { priority_race } => {
121 BrokerSource::PriorityRace(priority_race)
122 }
123 RawBrokerSource::Other(other) => {
124 let kind = other
125 .into_iter()
126 .next()
127 .map(|(key, _)| key)
128 .unwrap_or_else(|| "unknown".to_string());
129 warn_unknown_broker_source(&kind);
130 BrokerSource::Other(kind)
131 }
132 })
133 }
134}
135
/// Logs a warning that the config contains a broker source of the given `kind` that this
/// build does not recognize or support. The kind is attached as the `broker_source` field.
fn warn_unknown_broker_source(kind: &str) {
    tracing::warn!(
        broker_source = kind,
        "unknown or unsupported broker source in config; ignoring it until used"
    );
}
142
143impl BrokerSource {
144 pub fn rpc_transport(&self) -> DynRpcTransport {
146 match self {
147 BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
148 url: s.clone(),
149 host: None,
150 dns: None,
151 }),
152 BrokerSource::DirectTcp(dest_addr) => {
153 DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
154 dest_addr: *dest_addr,
155 }))
156 }
157 BrokerSource::Fronted {
158 front,
159 host,
160 override_dns,
161 } => DynRpcTransport::new(FrontedHttpTransport {
162 url: front.clone(),
163 host: Some(host.clone()),
164 dns: override_dns.clone(),
165 }),
166 #[cfg(feature = "aws_lambda")]
167 BrokerSource::AwsLambda {
168 function_name,
169 region,
170 obfs_key,
171 } => DynRpcTransport::new(AwsLambdaTransport {
172 function_name: function_name.clone(),
173 region: region.clone(),
174 obfs_key: obfs_key.clone(),
175 }),
176 BrokerSource::Race(race_between) => {
177 let transports = race_between
178 .iter()
179 .map(|bs| bs.rpc_transport())
180 .collect_vec();
181 DynRpcTransport::new(RaceTransport::new(transports))
182 }
183 BrokerSource::PriorityRace(inner) => {
184 let inner = inner.iter().map(|(k, v)| (*k, v.rpc_transport())).collect();
185 DynRpcTransport::new(PriorityRaceTransport::new(inner))
186 }
187 BrokerSource::Other(kind) => {
188 DynRpcTransport::new(UnsupportedBrokerTransport { kind: kind.clone() })
189 }
190 }
191 }
192}
193
/// Placeholder transport for a broker source this build cannot use; every call through it
/// returns an error.
struct UnsupportedBrokerTransport {
    /// Name of the unsupported source kind, included in the error message.
    kind: String,
}
197
198#[async_trait::async_trait]
199impl nanorpc::RpcTransport for UnsupportedBrokerTransport {
200 type Error = anyhow::Error;
201
202 async fn call_raw(
203 &self,
204 _req: nanorpc::JrpcRequest,
205 ) -> Result<nanorpc::JrpcResponse, Self::Error> {
206 Err(anyhow::anyhow!(
207 "broker source '{}' is unsupported in this build",
208 self.kind
209 ))
210 }
211}
212
213pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
214 ctx.get(BROKER_CLIENT).as_ref().context(
215 "broker information not provided, so cannot use any broker-dependent functionality",
216 )
217}
218
219static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
220 ctx.init()
221 .broker
222 .as_ref()
223 .map(|src| BrokerClient::from(src.rpc_transport()))
224};
225
226pub async fn get_net_status(ctx: &AnyCtx<Config>) -> anyhow::Result<NetStatus> {
227 let broker = broker_client(ctx).context("could not get broker client")?;
228 let net_status_response = broker
229 .get_net_status()
230 .await?
231 .map_err(|e| anyhow::anyhow!("broker refused to serve exits: {e}"))?;
232
233 let net_status_verified = net_status_response
235 .verify(DOMAIN_NET_STATUS, |their_pk| {
236 if let Some(broker_pk) = &ctx.init().broker_keys {
237 hex::encode(their_pk.as_bytes()) == broker_pk.master
238 } else {
239 tracing::warn!("trusting netstatus blindly since broker_keys was not provided");
240 true
241 }
242 })
243 .context("could not verify net status")?;
244
245 Ok(net_status_verified)
246}
247
#[cfg(test)]
mod tests {
    use super::BrokerSource;

    /// Parses `json` as a broker source and checks that it came out as
    /// `BrokerSource::Other` carrying `expected_kind`.
    fn assert_parses_as_other(json: &str, expected_kind: &str) {
        let parsed: BrokerSource = serde_json::from_str(json).unwrap();

        if let BrokerSource::Other(kind) = &parsed {
            assert_eq!(kind, expected_kind);
        } else {
            panic!("expected other broker source, got {:?}", kind_of(&parsed));
        }
    }

    /// Short label for a variant, used in failure messages because `BrokerSource` has no
    /// `Debug` impl.
    fn kind_of(source: &BrokerSource) -> &'static str {
        match source {
            BrokerSource::Other(_) => "other",
            BrokerSource::Direct(_) => "direct",
            BrokerSource::DirectTcp(_) => "direct_tcp",
            BrokerSource::Fronted { .. } => "fronted",
            #[cfg(feature = "aws_lambda")]
            BrokerSource::AwsLambda { .. } => "aws_lambda",
            BrokerSource::Race(_) => "race",
            BrokerSource::PriorityRace(_) => "priority_race",
        }
    }

    #[test]
    fn deserializes_unknown_broker_source_as_other() {
        assert_parses_as_other(r#"{"future_transport":{"foo":"bar"}}"#, "future_transport");
    }

    #[cfg(not(feature = "aws_lambda"))]
    #[test]
    fn deserializes_aws_lambda_as_other_when_feature_disabled() {
        assert_parses_as_other(
            r#"{"aws_lambda":{"function_name":"f","region":"us-east-1","obfs_key":"k"}}"#,
            "aws_lambda",
        );
    }
}