1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{
6 future::Shared, task::noop_waker, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt,
7};
8use geph5_broker_protocol::{Credential, UserInfo};
9use geph5_misc_rpc::client_control::{ControlClient, ControlService};
10use nanorpc::DynRpcTransport;
11use sillad::Pipe;
12use smol::future::FutureExt as _;
13use std::{fs::File, net::SocketAddr, path::PathBuf, sync::Arc};
14
15use serde::{Deserialize, Serialize};
16use smolscale::immortal::Immortal;
17
18use crate::{
19 auth::{auth_loop, get_auth_token},
20 broker::{broker_client, BrokerSource},
21 bw_token::bw_token_refresh_loop,
22 control_prot::{ControlProtocolImpl, DummyControlProtocolTransport},
23 get_dialer::ExitConstraint,
24 http_proxy::http_proxy_serve,
25 pac::pac_serve,
26 logging,
27 session::{open_conn, run_client_sessions},
28 socks5::socks5_loop,
29 vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
30};
31
32#[derive(Serialize, Deserialize, Clone)]
33pub struct Config {
34 pub socks5_listen: Option<SocketAddr>,
35 pub http_proxy_listen: Option<SocketAddr>,
36 pub pac_listen: Option<SocketAddr>,
37
38 pub control_listen: Option<SocketAddr>,
39 pub exit_constraint: ExitConstraint,
40 #[serde(default)]
41 pub bridge_mode: BridgeMode,
42 pub cache: Option<PathBuf>,
43
44 pub broker: Option<BrokerSource>,
45 pub broker_keys: Option<BrokerKeys>,
46
47 #[serde(default)]
48 pub vpn: bool,
49 #[serde(default)]
50 pub vpn_fd: Option<i32>,
51 #[serde(default)]
52 pub spoof_dns: bool,
53 #[serde(default)]
54 pub passthrough_china: bool,
55 #[serde(default)]
56 pub dry_run: bool,
57 #[serde(default)]
58 pub credentials: Credential,
59
60 #[serde(default)]
61 pub sess_metadata: serde_json::Value,
62 pub task_limit: Option<u32>,
63}
64
65#[derive(Serialize, Deserialize, Clone)]
66pub struct BrokerKeys {
68 pub master: String,
69 pub mizaru_free: String,
70 pub mizaru_plus: String,
71 pub mizaru_bw: String,
72}
73
74impl Config {
75 pub fn inert(&self) -> Self {
77 let mut this = self.clone();
78 this.dry_run = true;
79 this.socks5_listen = None;
80 this.http_proxy_listen = None;
81 this.pac_listen = None;
82 this.control_listen = None;
83 this
84 }
85}
86
87#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
88pub enum BridgeMode {
89 Auto,
90 ForceBridges,
91}
92
93impl Default for BridgeMode {
94 fn default() -> Self {
95 Self::Auto
96 }
97}
98
/// Handle to a running client.
///
/// Cloning is cheap in the sense that clones share the same background task
/// (through `Shared`) rather than starting a new one.
#[derive(Clone)]
pub struct Client {
    /// The spawned `client_main` task, wrapped in `Shared` so that every
    /// clone of the handle can await or poll its result. The error is held in
    /// an `Arc` so the output is `Clone`, as `Shared` requires.
    task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
    /// Context built from the `Config` this client was started with.
    ctx: AnyCtx<Config>,
}
104
105impl Client {
106 pub fn start(cfg: Config) -> Self {
108 std::env::remove_var("http_proxy");
109 std::env::remove_var("https_proxy");
110 std::env::remove_var("HTTP_PROXY");
111 std::env::remove_var("HTTPS_PROXY");
112 let ctx = AnyCtx::new(cfg.clone());
113 let _ = logging::init_logging(&ctx);
115 let ((fd_limit, _), _) = binary_search::binary_search((1, ()), (65536, ()), |lim| {
116 if rlimit::increase_nofile_limit(lim).unwrap_or_default() >= lim {
117 binary_search::Direction::Low(())
118 } else {
119 binary_search::Direction::High(())
120 }
121 });
122 tracing::info!("raised file descriptor limit to {}", fd_limit);
123
124 #[cfg(unix)]
125 if let Some(fd) = cfg.vpn_fd {
126 let ctx_clone = ctx.clone();
127 smolscale::spawn(async move {
128 let async_fd: smol::Async<File> =
130 smol::Async::new(unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) })
131 .expect("could not wrap VPN fd in Async");
132
133 let (mut reader, mut writer) = async_fd.split();
135
136 let read_task = async {
138 let mut buf = vec![0u8; 65535]; loop {
140 match reader.read(&mut buf).await {
141 Ok(n) if n > 0 => {
142 send_vpn_packet(
144 &ctx_clone,
145 bytes::Bytes::copy_from_slice(&buf[..n]),
146 )
147 .await;
148 }
149 Ok(0) => {
150 tracing::warn!("VPN fd reached EOF");
152 break;
153 }
154 Err(e) => {
155 tracing::error!("Error reading from VPN fd: {}", e);
156 break;
157 }
158 _ => break,
159 }
160 }
161 anyhow::Ok(())
162 };
163
164 let write_task = async {
166 loop {
167 let packet = recv_vpn_packet(&ctx_clone).await;
169
170 if let Err(e) = writer.write_all(&packet).await {
172 tracing::error!("Error writing to VPN fd: {}", e);
173 break;
174 }
175
176 if let Err(e) = writer.flush().await {
177 tracing::error!("Error flushing VPN fd: {}", e);
178 break;
179 }
180 }
181 anyhow::Ok(())
182 };
183
184 let _ = read_task.race(write_task).await;
186 tracing::warn!("VPN fd handler exited");
187 })
188 .detach();
189 }
190 let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
191 Client {
192 task: task.shared(),
193 ctx,
194 }
195 }
196
197 pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
199 open_conn(&self.ctx, "tcp", remote).await
200 }
201
202 pub async fn wait_until_dead(self) -> anyhow::Result<()> {
204 self.task.await.map_err(|e| anyhow::anyhow!(e))
205 }
206
207 pub fn check_dead(&self) -> anyhow::Result<()> {
209 match self
210 .task
211 .clone()
212 .poll(&mut std::task::Context::from_waker(&noop_waker()))
213 {
214 std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
215 std::task::Poll::Pending => {}
216 }
217
218 Ok(())
219 }
220
221 pub fn control_client(&self) -> ControlClient {
223 ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
224 ControlService(ControlProtocolImpl {
225 ctx: self.ctx.clone(),
226 }),
227 )))
228 }
229
230 pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
232 let auth_token = get_auth_token(&self.ctx).await?;
233 let user_info = broker_client(&self.ctx)?
234 .get_user_info(auth_token)
235 .await??
236 .context("no such user")?;
237 Ok(user_info)
238 }
239
240 pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
242 send_vpn_packet(&self.ctx, bts).await;
243 Ok(())
244 }
245
246 pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
248 let packet = recv_vpn_packet(&self.ctx).await;
249 Ok(packet)
250 }
251}
252
/// Function-pointer type that takes a reference to the client context and
/// produces a `T`.
///
/// NOTE(review): presumably the initializer signature for context-scoped
/// values held by `AnyCtx` — confirm against the `anyctx` documentation.
pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
254
255async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
256 let rpc_serve = async {
257 if let Some(control_listen) = ctx.init().control_listen {
258 nanorpc_sillad::rpc_serve(
259 sillad::tcp::TcpListener::bind(control_listen).await?,
260 ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
261 )
262 .await?;
263 anyhow::Ok(())
264 } else {
265 smol::future::pending().await
266 }
267 };
268 if ctx.init().dry_run {
269 rpc_serve.await
270 } else {
271 let vpn_loop = vpn_loop(&ctx);
272
273 let _client_loop = Immortal::spawn(run_client_sessions(ctx.clone()));
274
275 socks5_loop(&ctx)
276 .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
277 .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
278 .race(
279 http_proxy_serve(&ctx)
280 .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
281 )
282 .race(
283 auth_loop(&ctx)
284 .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
285 )
286 .race(
287 bw_token_refresh_loop(&ctx)
288 .inspect_err(|e| tracing::error!(err = debug(e), "bw token loop stopped")),
289 )
290 .race(rpc_serve)
291 .race(pac_serve(&ctx))
292 .await
293 }
294}