use anyctx::AnyCtx;
use anyhow::Context;
use bytes::Bytes;
use futures_util::{
future::Shared, task::noop_waker, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt,
};
use geph5_broker_protocol::{Credential, UserInfo};
use geph5_misc_rpc::client_control::{ControlClient, ControlService};
use nanorpc::DynRpcTransport;
use sillad::Pipe;
use smol::future::FutureExt as _;
use std::{fs::File, net::SocketAddr, path::PathBuf, sync::Arc};
use serde::{Deserialize, Serialize};
use smolscale::immortal::Immortal;
use crate::{
auth::{auth_loop, get_auth_token},
broker::{broker_client, BrokerSource},
bw_token::bw_token_refresh_loop,
client_inner::{client_inner, open_conn},
control_prot::{ControlProtocolImpl, DummyControlProtocolTransport},
get_dialer::ExitConstraint,
http_proxy::http_proxy_serve,
pac::pac_serve,
socks5::socks5_loop,
vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
};
#[derive(Serialize, Deserialize, Clone)]
pub struct Config {
pub socks5_listen: Option<SocketAddr>,
pub http_proxy_listen: Option<SocketAddr>,
pub pac_listen: Option<SocketAddr>,
pub control_listen: Option<SocketAddr>,
pub exit_constraint: ExitConstraint,
#[serde(default)]
pub bridge_mode: BridgeMode,
pub cache: Option<PathBuf>,
pub broker: Option<BrokerSource>,
pub broker_keys: Option<BrokerKeys>,
#[serde(default)]
pub vpn: bool,
#[serde(default)]
pub vpn_fd: Option<i32>,
#[serde(default)]
pub spoof_dns: bool,
#[serde(default)]
pub passthrough_china: bool,
#[serde(default)]
pub dry_run: bool,
#[serde(default)]
pub credentials: Credential,
#[serde(default)]
pub sess_metadata: serde_json::Value,
pub task_limit: Option<u32>,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct BrokerKeys {
pub master: String,
pub mizaru_free: String,
pub mizaru_plus: String,
pub mizaru_bw: String,
}
impl Config {
pub fn inert(&self) -> Self {
let mut this = self.clone();
this.dry_run = true;
this.socks5_listen = None;
this.http_proxy_listen = None;
this.pac_listen = None;
this.control_listen = None;
this
}
}
#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
pub enum BridgeMode {
Auto,
ForceBridges,
}
impl Default for BridgeMode {
fn default() -> Self {
Self::Auto
}
}
#[derive(Clone)]
pub struct Client {
task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
ctx: AnyCtx<Config>,
}
impl Client {
pub fn start(cfg: Config) -> Self {
std::env::remove_var("http_proxy");
std::env::remove_var("https_proxy");
std::env::remove_var("HTTP_PROXY");
std::env::remove_var("HTTPS_PROXY");
let ctx = AnyCtx::new(cfg.clone());
#[cfg(unix)]
if let Some(fd) = cfg.vpn_fd {
let ctx_clone = ctx.clone();
smolscale::spawn(async move {
let async_fd: smol::Async<File> =
smol::Async::new(unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) })
.expect("could not wrap VPN fd in Async");
let (mut reader, mut writer) = async_fd.split();
let read_task = async {
let mut buf = vec![0u8; 65535]; loop {
match reader.read(&mut buf).await {
Ok(n) if n > 0 => {
send_vpn_packet(
&ctx_clone,
bytes::Bytes::copy_from_slice(&buf[..n]),
)
.await;
}
Ok(0) => {
tracing::warn!("VPN fd reached EOF");
break;
}
Err(e) => {
tracing::error!("Error reading from VPN fd: {}", e);
break;
}
_ => break,
}
}
anyhow::Ok(())
};
let write_task = async {
loop {
let packet = recv_vpn_packet(&ctx_clone).await;
if let Err(e) = writer.write_all(&packet).await {
tracing::error!("Error writing to VPN fd: {}", e);
break;
}
if let Err(e) = writer.flush().await {
tracing::error!("Error flushing VPN fd: {}", e);
break;
}
}
anyhow::Ok(())
};
let _ = read_task.race(write_task).await;
tracing::warn!("VPN fd handler exited");
})
.detach();
}
let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
Client {
task: task.shared(),
ctx,
}
}
pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
open_conn(&self.ctx, "tcp", remote).await
}
pub async fn wait_until_dead(self) -> anyhow::Result<()> {
self.task.await.map_err(|e| anyhow::anyhow!(e))
}
pub fn check_dead(&self) -> anyhow::Result<()> {
match self
.task
.clone()
.poll(&mut std::task::Context::from_waker(&noop_waker()))
{
std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
std::task::Poll::Pending => {}
}
Ok(())
}
pub fn control_client(&self) -> ControlClient {
ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
ControlService(ControlProtocolImpl {
ctx: self.ctx.clone(),
}),
)))
}
pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
let auth_token = get_auth_token(&self.ctx).await?;
let user_info = broker_client(&self.ctx)?
.get_user_info(auth_token)
.await??
.context("no such user")?;
Ok(user_info)
}
pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
send_vpn_packet(&self.ctx, bts).await;
Ok(())
}
pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
let packet = recv_vpn_packet(&self.ctx).await;
Ok(packet)
}
}
pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
let rpc_serve = async {
if let Some(control_listen) = ctx.init().control_listen {
nanorpc_sillad::rpc_serve(
sillad::tcp::TcpListener::bind(control_listen).await?,
ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
)
.await?;
anyhow::Ok(())
} else {
smol::future::pending().await
}
};
if ctx.init().dry_run {
rpc_serve.await
} else {
let vpn_loop = vpn_loop(&ctx);
let _client_loop = Immortal::spawn(client_inner(ctx.clone()));
socks5_loop(&ctx)
.inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
.race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
.race(
http_proxy_serve(&ctx)
.inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
)
.race(
auth_loop(&ctx)
.inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
)
.race(
bw_token_refresh_loop(&ctx)
.inspect_err(|e| tracing::error!(err = debug(e), "bw token loop stopped")),
)
.race(rpc_serve)
.race(pac_serve(&ctx))
.await
}
}