geph5_client/route.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
use std::{net::SocketAddr, time::Duration};
use anyctx::AnyCtx;
use anyhow::Context;
use ed25519_dalek::VerifyingKey;
use futures_util::TryFutureExt as _;
use geph5_broker_protocol::{
AccountLevel, ExitDescriptor, RouteDescriptor, DOMAIN_EXIT_DESCRIPTOR,
};
use isocountry::CountryCode;
use moka::sync::Cache;
use once_cell::sync::Lazy;
use rand::seq::SliceRandom;
use serde::{Deserialize, Serialize};
use sillad::{
dialer::{DialerExt, DynDialer, FailingDialer},
tcp::TcpDialer,
};
use sillad_sosistab3::{dialer::SosistabDialer, Cookie};
use crate::{
auth::get_connect_token, broker::broker_client, client::Config, client_inner::CONCURRENCY,
vpn::vpn_whitelist,
};
static ROUTE_SHITLIST: Lazy<Cache<SocketAddr, usize>> = Lazy::new(|| {
Cache::builder()
.time_to_live(Duration::from_secs(60))
.build()
});
fn shitlist_delay(addr: SocketAddr) -> Duration {
let recent_deaths = ROUTE_SHITLIST.get(&addr).unwrap_or_default() as f64 / CONCURRENCY as f64;
Duration::from_secs_f64((recent_deaths - 1.0).max(0.0).powi(2) / 10.0)
}
/// Deprioritizes routes with this address.
pub fn deprioritize_route(addr: SocketAddr) {
ROUTE_SHITLIST.insert(addr, ROUTE_SHITLIST.get_with(addr, || 1) + 1)
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "snake_case")]
pub enum ExitConstraint {
Auto,
Direct(String),
Hostname(String),
Country(CountryCode),
CountryCity(CountryCode, String),
}
/// Gets a sillad Dialer that produces a single, pre-authentication pipe, as well as the public key.
pub async fn get_dialer(
ctx: &AnyCtx<Config>,
) -> anyhow::Result<(VerifyingKey, ExitDescriptor, DynDialer)> {
let mut country_constraint = None;
let mut city_constraint = None;
let mut hostname_constraint = None;
match &ctx.init().exit_constraint {
ExitConstraint::Direct(dir) => {
let (dir, pubkey) = dir
.split_once('/')
.context("did not find / in a direct constraint")?;
let pubkey = VerifyingKey::from_bytes(
hex::decode(pubkey)
.context("cannot decode pubkey as hex")?
.as_slice()
.try_into()
.context("pubkey wrong length")?,
)?;
let dest_addr = *smol::net::resolve(dir)
.await?
.choose(&mut rand::thread_rng())
.context("could not resolve destination for direct exit connection")?;
vpn_whitelist(dest_addr.ip());
return Ok((
pubkey,
ExitDescriptor {
c2e_listen: "0.0.0.0:0".parse()?,
b2e_listen: "0.0.0.0:0".parse()?,
country: CountryCode::ABW,
city: "".to_string(),
load: 0.0,
expiry: 0,
},
TcpDialer { dest_addr }.dynamic(),
));
}
ExitConstraint::Country(country) => country_constraint = Some(*country),
ExitConstraint::CountryCity(country, city) => {
country_constraint = Some(*country);
city_constraint = Some(city.clone())
}
ExitConstraint::Hostname(hostname) => {
hostname_constraint = Some(hostname.clone());
}
ExitConstraint::Auto => {}
}
// First get the conn token
let (level, conn_token, sig) = get_connect_token(ctx)
.await
.context("could not get connect token")?;
let broker = broker_client(ctx).context("could not get broker client")?;
let exits = match level {
AccountLevel::Plus => broker.get_exits().await,
AccountLevel::Free => broker.get_free_exits().await,
}?
.map_err(|e| anyhow::anyhow!("broker refused to serve exits: {e}"))?;
let exits = exits
.verify(DOMAIN_EXIT_DESCRIPTOR, |their_pk| {
if let Some(broker_pk) = &ctx.init().broker_keys {
hex::encode(their_pk.as_bytes()) == broker_pk.master
} else {
true
}
})
.context("could not verify")?;
// filter for things that fit
let (pubkey, exit) = if let Some(min) = exits
.all_exits
.iter()
.filter(|(_, exit)| {
let country_pass = if let Some(country) = &country_constraint {
exit.country == *country
} else {
true
};
let city_pass = if let Some(city) = &city_constraint {
&exit.city == city
} else {
true
};
let hostname_pass = if let Some(hostname) = &hostname_constraint {
&exit.b2e_listen.ip().to_string() == hostname
} else {
true
};
country_pass && city_pass && hostname_pass
})
.min_by_key(|e| (e.1.load * 1000.0) as u64)
{
min
} else {
exits
.all_exits
.iter()
.min_by_key(|e| (e.1.load * 1000.0) as u64)
.context("no exits that fit the criterion")?
};
tracing::debug!(exit = debug(&exit), "narrowed down choice of exit");
vpn_whitelist(exit.c2e_listen.ip());
let exit_c2e = exit.c2e_listen;
let direct_dialer = TcpDialer {
dest_addr: exit_c2e,
}
.dyn_delay(move || shitlist_delay(exit_c2e));
tracing::debug!(token = debug(&conn_token), sig = debug(&sig), "CONN TOKEN");
// also get bridges
let bridge_routes = broker
.get_routes(conn_token, sig, exit.b2e_listen)
.await?
.map_err(|e| anyhow::anyhow!("broker refused to serve bridge routes: {e}"))?;
tracing::debug!(
bridge_routes = debug(&bridge_routes),
"bridge routes obtained too"
);
let bridge_dialer = route_to_dialer(&bridge_routes);
let final_dialer = match ctx.init().bridge_mode {
crate::BridgeMode::Auto => direct_dialer
.race(bridge_dialer.delay(Duration::from_millis(1000)))
.dynamic(),
crate::BridgeMode::ForceBridges => bridge_dialer,
crate::BridgeMode::ForceDirect => direct_dialer.dynamic(),
};
Ok((*pubkey, exit.clone(), final_dialer))
}
// async fn reachability_test(
// ctx: AnyCtx<Config>,
// dialers: BTreeMap<String, DynDialer>,
// ) -> anyhow::Result<()> {
// let nfo = IP_INFO.get().unwrap();
// let country = nfo["country"].as_str().context("country code not found")?;
// let asn = nfo["org"]
// .as_str()
// .context("no org")?
// .split_ascii_whitespace()
// .next()
// .unwrap();
// for (name, dialer) in dialers {
// tracing::debug!(name = display(&name), "doing a reachability test");
// let ctx = ctx.clone();
// smolscale::spawn(async move {
// let broker = broker_client(&ctx).context("could not get broker client")?;
// let success = if let Err(err) = dialer.timeout(Duration::from_secs(10)).dial().await {
// tracing::warn!(
// name = display(&name),
// err = debug(err),
// "could not reach something"
// );
// false
// } else {
// true
// };
// broker
// .upload_available(AvailabilityData {
// listen: name,
// country: country.to_string(),
// asn: asn.to_string(),
// success,
// })
// .await?;
// anyhow::Ok(())
// })
// .detach();
// }
// Ok(())
// }
// fn route_to_flat_dialers(route: &RouteDescriptor) -> BTreeMap<String, DynDialer> {
// match route {
// RouteDescriptor::Tcp(socket_addr) => std::iter::once((
// socket_addr.ip().to_string(),
// TcpDialer {
// dest_addr: *socket_addr,
// }
// .dynamic(),
// ))
// .collect(),
// RouteDescriptor::Sosistab3 { cookie, lower } => route_to_flat_dialers(lower)
// .into_iter()
// .map(|(k, inner)| {
// (
// k,
// SosistabDialer {
// inner,
// cookie: Cookie::new(cookie),
// }
// .dynamic(),
// )
// })
// .collect(),
// RouteDescriptor::Race(vec) | RouteDescriptor::Fallback(vec) => vec
// .iter()
// .flat_map(|v| route_to_flat_dialers(v).into_iter())
// .collect(),
// RouteDescriptor::Timeout {
// milliseconds: _,
// lower,
// }
// | RouteDescriptor::Delay {
// milliseconds: _,
// lower,
// } => route_to_flat_dialers(lower),
// _ => BTreeMap::new(),
// }
// }
fn route_to_dialer(route: &RouteDescriptor) -> DynDialer {
match route {
RouteDescriptor::Tcp(addr) => {
vpn_whitelist(addr.ip());
let addr = *addr;
TcpDialer { dest_addr: addr }
.dyn_delay(move || shitlist_delay(addr))
.dynamic()
}
RouteDescriptor::Sosistab3 { cookie, lower } => {
let inner = route_to_dialer(lower);
SosistabDialer {
inner,
cookie: Cookie::new(cookie),
}
.dynamic()
}
RouteDescriptor::Race(inside) => inside
.iter()
.map(route_to_dialer)
.reduce(|a, b| a.race(b).dynamic())
.unwrap_or_else(|| FailingDialer.dynamic()),
RouteDescriptor::Fallback(a) => a
.iter()
.map(route_to_dialer)
.reduce(|a, b| a.fallback(b).dynamic())
.unwrap_or_else(|| FailingDialer.dynamic()),
RouteDescriptor::Timeout {
milliseconds,
lower,
} => route_to_dialer(lower)
.timeout(Duration::from_millis(*milliseconds as _))
.dynamic(),
RouteDescriptor::Delay {
milliseconds,
lower,
} => route_to_dialer(lower)
.delay(Duration::from_millis((*milliseconds).into()))
.dynamic(),
RouteDescriptor::Other(_) => FailingDialer.dynamic(),
}
}