1mod fronted_http;
2mod priority_race;
3mod race;
4
5#[cfg(feature = "aws_lambda")]
6mod aws_lambda;
7
8use anyctx::AnyCtx;
9use anyhow::Context;
10
11#[cfg(feature = "aws_lambda")]
12use aws_lambda::AwsLambdaTransport;
13use fronted_http::FrontedHttpTransport;
14use geph5_broker_protocol::{BrokerClient, NetStatus, DOMAIN_NET_STATUS};
15use itertools::Itertools;
16use nanorpc::DynRpcTransport;
17use priority_race::PriorityRaceTransport;
18use race::RaceTransport;
19
20use serde::{Deserialize, Serialize};
21use sillad::tcp::TcpDialer;
22use std::{collections::BTreeMap, net::SocketAddr};
23
24use crate::client::{Config, CtxField};
25
26#[derive(Serialize, Deserialize, Clone)]
27#[serde(rename_all = "snake_case")]
28pub enum BrokerSource {
29 Direct(String),
30 Fronted {
31 front: String,
32 host: String,
33 #[serde(default)]
34 override_dns: Option<Vec<SocketAddr>>,
35 },
36 DirectTcp(SocketAddr),
37 #[cfg(feature = "aws_lambda")]
38 AwsLambda {
39 function_name: String,
40 region: String,
41 obfs_key: String,
42 },
43 Race(Vec<BrokerSource>),
44 PriorityRace(BTreeMap<u64, BrokerSource>),
45}
46
47impl BrokerSource {
48 pub fn rpc_transport(&self) -> DynRpcTransport {
50 match self {
51 BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
52 url: s.clone(),
53 host: None,
54 dns: None,
55 }),
56 BrokerSource::DirectTcp(dest_addr) => {
57 DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
58 dest_addr: *dest_addr,
59 }))
60 }
61 BrokerSource::Fronted {
62 front,
63 host,
64 override_dns,
65 } => DynRpcTransport::new(FrontedHttpTransport {
66 url: front.clone(),
67 host: Some(host.clone()),
68 dns: override_dns.clone(),
69 }),
70 #[cfg(feature = "aws_lambda")]
71 BrokerSource::AwsLambda {
72 function_name,
73 region,
74 obfs_key,
75 } => DynRpcTransport::new(AwsLambdaTransport {
76 function_name: function_name.clone(),
77 region: region.clone(),
78 obfs_key: obfs_key.clone(),
79 }),
80 BrokerSource::Race(race_between) => {
81 let transports = race_between
82 .iter()
83 .map(|bs| bs.rpc_transport())
84 .collect_vec();
85 DynRpcTransport::new(RaceTransport::new(transports))
86 }
87 BrokerSource::PriorityRace(inner) => {
88 let inner = inner.iter().map(|(k, v)| (*k, v.rpc_transport())).collect();
89 DynRpcTransport::new(PriorityRaceTransport::new(inner))
90 }
91 }
92 }
93}
94
95pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
96 ctx.get(BROKER_CLIENT).as_ref().context(
97 "broker information not provided, so cannot use any broker-dependent functionality",
98 )
99}
100
101static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
102 ctx.init()
103 .broker
104 .as_ref()
105 .map(|src| BrokerClient::from(src.rpc_transport()))
106};
107
108pub async fn get_net_status(ctx: &AnyCtx<Config>) -> anyhow::Result<NetStatus> {
109 let broker = broker_client(ctx).context("could not get broker client")?;
110 let net_status_response = broker
111 .get_net_status()
112 .await?
113 .map_err(|e| anyhow::anyhow!("broker refused to serve exits: {e}"))?;
114
115 let net_status_verified = net_status_response
117 .verify(DOMAIN_NET_STATUS, |their_pk| {
118 if let Some(broker_pk) = &ctx.init().broker_keys {
119 hex::encode(their_pk.as_bytes()) == broker_pk.master
120 } else {
121 tracing::warn!("trusting netstatus blindly since broker_keys was not provided");
122 true
123 }
124 })
125 .context("could not verify net status")?;
126
127 Ok(net_status_verified)
128}