1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{future::Shared, task::noop_waker, FutureExt, TryFutureExt};
6use geph5_broker_protocol::{Credential, ExitList, UserInfo};
7use nanorpc::DynRpcTransport;
8use sillad::Pipe;
9use smol::future::FutureExt as _;
10use std::{net::SocketAddr, path::PathBuf, sync::Arc};
11
12use serde::{Deserialize, Serialize};
13use smolscale::immortal::Immortal;
14
15use crate::{
16 auth::{auth_loop, get_auth_token},
17 broker::{broker_client, BrokerSource},
18 client_inner::{client_inner, open_conn},
19 control_prot::{
20 ControlClient, ControlProtocolImpl, ControlService, DummyControlProtocolTransport,
21 },
22 database::db_read_or_wait,
23 http_proxy::http_proxy_serve,
24 pac::pac_serve,
25 route::ExitConstraint,
26 socks5::socks5_loop,
27 vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
28};
29
30#[derive(Serialize, Deserialize, Clone)]
31pub struct Config {
32 pub socks5_listen: Option<SocketAddr>,
33 pub http_proxy_listen: Option<SocketAddr>,
34 pub pac_listen: Option<SocketAddr>,
35
36 pub control_listen: Option<SocketAddr>,
37 pub exit_constraint: ExitConstraint,
38 #[serde(default)]
39 pub bridge_mode: BridgeMode,
40 pub cache: Option<PathBuf>,
41
42 pub broker: Option<BrokerSource>,
43 pub broker_keys: Option<BrokerKeys>,
44
45 #[serde(default)]
46 pub vpn: bool,
47 #[serde(default)]
48 pub spoof_dns: bool,
49 #[serde(default)]
50 pub passthrough_china: bool,
51 #[serde(default)]
52 pub dry_run: bool,
53 #[serde(default)]
54 pub credentials: Credential,
55
56 #[serde(default)]
57 pub sess_metadata: serde_json::Value,
58 pub task_limit: Option<u32>,
59}
60
61#[derive(Serialize, Deserialize, Clone)]
62pub struct BrokerKeys {
64 pub master: String,
65 pub mizaru_free: String,
66 pub mizaru_plus: String,
67}
68
69impl Config {
70 pub fn inert(&self) -> Self {
72 let mut this = self.clone();
73 this.dry_run = true;
74 this.socks5_listen = None;
75 this.http_proxy_listen = None;
76 this.pac_listen = None;
77 this.control_listen = None;
78 this
79 }
80}
81
82#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
83pub enum BridgeMode {
84 Auto,
85 ForceBridges,
86 ForceDirect,
87}
88
89impl Default for BridgeMode {
90 fn default() -> Self {
91 Self::Auto
92 }
93}
94
95pub struct Client {
96 task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
97 ctx: AnyCtx<Config>,
98}
99
100impl Client {
101 pub fn start(cfg: Config) -> Self {
103 std::env::remove_var("http_proxy");
104 std::env::remove_var("https_proxy");
105 std::env::remove_var("HTTP_PROXY");
106 std::env::remove_var("HTTPS_PROXY");
107 let ctx = AnyCtx::new(cfg);
108 let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
109 Client {
110 task: task.shared(),
111 ctx,
112 }
113 }
114
115 pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
117 open_conn(&self.ctx, "tcp", remote).await
118 }
119
120 pub async fn wait_until_dead(self) -> anyhow::Result<()> {
122 self.task.await.map_err(|e| anyhow::anyhow!(e))
123 }
124
125 pub fn check_dead(&self) -> anyhow::Result<()> {
127 match self
128 .task
129 .clone()
130 .poll(&mut std::task::Context::from_waker(&noop_waker()))
131 {
132 std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
133 std::task::Poll::Pending => {}
134 }
135
136 Ok(())
137 }
138
139 pub fn control_client(&self) -> ControlClient {
141 ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
142 ControlService(ControlProtocolImpl {
143 ctx: self.ctx.clone(),
144 }),
145 )))
146 }
147
148 pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
150 let auth_token = get_auth_token(&self.ctx).await?;
151 let user_info = broker_client(&self.ctx)?
152 .get_user_info(auth_token)
153 .await??
154 .context("no such user")?;
155 Ok(user_info)
156 }
157
158 pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
160 send_vpn_packet(&self.ctx, bts).await;
161 Ok(())
162 }
163
164 pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
166 let packet = recv_vpn_packet(&self.ctx).await;
167 Ok(packet)
168 }
169}
170
171pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
172
173async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
174 #[derive(Serialize)]
175 struct DryRunOutput {
176 auth_token: String,
177 exits: ExitList,
178 }
179
180 if ctx.init().dry_run {
181 auth_loop(&ctx)
182 .race(async {
183 let broker_client = broker_client(&ctx)?;
184 let exits = broker_client
185 .get_exits()
186 .await?
187 .map_err(|e| anyhow::anyhow!("{e}"))?;
188 let auth_token = db_read_or_wait(&ctx, "auth_token").await?;
189 let exits = exits.inner;
190 println!(
191 "{}",
192 serde_json::to_string(&DryRunOutput {
193 auth_token: hex::encode(auth_token),
194 exits,
195 })?
196 );
197 anyhow::Ok(())
198 })
199 .await
200 } else {
201 let vpn_loop = vpn_loop(&ctx);
202
203 let _client_loop = Immortal::spawn(client_inner(ctx.clone()));
204
205 let rpc_serve = async {
206 if let Some(control_listen) = ctx.init().control_listen {
207 nanorpc_sillad::rpc_serve(
208 sillad::tcp::TcpListener::bind(control_listen).await?,
209 ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
210 )
211 .await?;
212 anyhow::Ok(())
213 } else {
214 smol::future::pending().await
215 }
216 };
217
218 socks5_loop(&ctx)
219 .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
220 .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
221 .race(
222 http_proxy_serve(&ctx)
223 .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
224 )
225 .race(
226 auth_loop(&ctx)
227 .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
228 )
229 .race(rpc_serve)
230 .race(pac_serve(&ctx))
231 .await
232 }
233}