1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{
6 future::Shared, task::noop_waker, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt,
7};
8use geph5_broker_protocol::{Credential, UserInfo};
9use geph5_misc_rpc::client_control::{ControlClient, ControlService};
10use nanorpc::DynRpcTransport;
11use sillad::Pipe;
12use smol::future::FutureExt as _;
13use std::{fs::File, net::SocketAddr, path::PathBuf, sync::Arc};
14
15use serde::{Deserialize, Serialize};
16use smolscale::immortal::Immortal;
17
18use crate::{
19 auth::{auth_loop, get_auth_token},
20 broker::{broker_client, BrokerSource},
21 bw_token::bw_token_refresh_loop,
22 control_prot::{ControlProtocolImpl, DummyControlProtocolTransport},
23 get_dialer::ExitConstraint,
24 http_proxy::http_proxy_serve,
25 pac::pac_serve,
26 session::{open_conn, run_client_sessions},
27 socks5::socks5_loop,
28 vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
29};
30
/// Client configuration, serializable and deserializable through serde.
///
/// Fields annotated with `#[serde(default)]` fall back to their type's
/// `Default` value when absent from the serialized form.
#[derive(Serialize, Deserialize, Clone)]
pub struct Config {
    /// Optional address for the SOCKS5 listener; cleared by [`Config::inert`].
    /// NOTE(review): presumably consumed by `socks5_loop` — not visible here.
    pub socks5_listen: Option<SocketAddr>,
    /// Optional address for the HTTP proxy listener; cleared by [`Config::inert`].
    /// NOTE(review): presumably consumed by `http_proxy_serve` — not visible here.
    pub http_proxy_listen: Option<SocketAddr>,
    /// Optional address for the PAC listener; cleared by [`Config::inert`].
    /// NOTE(review): presumably consumed by `pac_serve` — not visible here.
    pub pac_listen: Option<SocketAddr>,

    /// Optional address on which `client_main` binds the TCP control RPC
    /// server; when `None`, no control server is started.
    pub control_listen: Option<SocketAddr>,
    /// Constraint on exit selection; its semantics are defined by
    /// `ExitConstraint` in the `get_dialer` module.
    pub exit_constraint: ExitConstraint,
    /// Bridge usage policy; defaults to `BridgeMode::Auto` when omitted.
    #[serde(default)]
    pub bridge_mode: BridgeMode,
    /// Optional filesystem path.
    /// NOTE(review): presumably a cache location — confirm how it is used.
    pub cache: Option<PathBuf>,

    /// Optional broker source; how it is consumed is not visible in this file.
    pub broker: Option<BrokerSource>,
    /// Optional broker key material; see [`BrokerKeys`].
    pub broker_keys: Option<BrokerKeys>,

    /// VPN flag; defaults to `false`.
    /// NOTE(review): presumably consulted by `vpn_loop` — confirm.
    #[serde(default)]
    pub vpn: bool,
    /// Raw file descriptor of a VPN device. On Unix, `Client::start` wraps it
    /// in an async file and shuttles packets between it and
    /// `send_vpn_packet` / `recv_vpn_packet`.
    #[serde(default)]
    pub vpn_fd: Option<i32>,
    /// Defaults to `false`.
    /// NOTE(review): presumably enables DNS spoofing in the VPN path — confirm.
    #[serde(default)]
    pub spoof_dns: bool,
    /// Defaults to `false`.
    /// NOTE(review): presumably lets traffic to Chinese destinations bypass
    /// the tunnel — confirm.
    #[serde(default)]
    pub passthrough_china: bool,
    /// When `true`, `client_main` only runs the control RPC server and starts
    /// none of the proxy, VPN, auth, or session loops. Set by [`Config::inert`].
    #[serde(default)]
    pub dry_run: bool,
    /// Credential, defaulting to `Credential::default()`.
    /// NOTE(review): presumably used to authenticate with the broker — confirm.
    #[serde(default)]
    pub credentials: Credential,

    /// Arbitrary JSON value; defaults to JSON `null`.
    /// NOTE(review): presumably metadata attached to sessions — confirm.
    #[serde(default)]
    pub sess_metadata: serde_json::Value,
    /// Optional numeric limit.
    /// NOTE(review): presumably caps concurrent tasks — confirm against users.
    pub task_limit: Option<u32>,
}
63
/// Key material associated with the broker, stored as strings.
///
/// NOTE(review): the encoding of each string (hex, base64, ...) and whether
/// these are public keys is not visible in this file — confirm at use sites.
#[derive(Serialize, Deserialize, Clone)]
pub struct BrokerKeys {
    /// Presumably the broker's master key — confirm.
    pub master: String,
    /// Presumably the Mizaru key for the free tier — confirm.
    pub mizaru_free: String,
    /// Presumably the Mizaru key for the plus tier — confirm.
    pub mizaru_plus: String,
    /// Presumably the Mizaru key for bandwidth tokens — confirm.
    pub mizaru_bw: String,
}
72
73impl Config {
74 pub fn inert(&self) -> Self {
76 let mut this = self.clone();
77 this.dry_run = true;
78 this.socks5_listen = None;
79 this.http_proxy_listen = None;
80 this.pac_listen = None;
81 this.control_listen = None;
82 this
83 }
84}
85
86#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
87pub enum BridgeMode {
88 Auto,
89 ForceBridges,
90}
91
92impl Default for BridgeMode {
93 fn default() -> Self {
94 Self::Auto
95 }
96}
97
/// Cloneable handle to a running client.
///
/// Clones share the same background task and the same context.
#[derive(Clone)]
pub struct Client {
    /// The spawned `client_main` task, wrapped in `Shared` so that every clone
    /// of the handle can await or poll its result. The error is `Arc`-wrapped
    /// so the task output is cloneable.
    task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
    /// Context built from the `Config` passed to `Client::start`.
    ctx: AnyCtx<Config>,
}
103
104impl Client {
105 pub fn start(cfg: Config) -> Self {
107 std::env::remove_var("http_proxy");
108 std::env::remove_var("https_proxy");
109 std::env::remove_var("HTTP_PROXY");
110 std::env::remove_var("HTTPS_PROXY");
111 let ctx = AnyCtx::new(cfg.clone());
112 let ((fd_limit, _), _) = binary_search::binary_search((1, ()), (65536, ()), |lim| {
113 if rlimit::increase_nofile_limit(lim).unwrap_or_default() >= lim {
114 binary_search::Direction::Low(())
115 } else {
116 binary_search::Direction::High(())
117 }
118 });
119 tracing::info!("raised file descriptor limit to {}", fd_limit);
120
121 #[cfg(unix)]
122 if let Some(fd) = cfg.vpn_fd {
123 let ctx_clone = ctx.clone();
124 smolscale::spawn(async move {
125 let async_fd: smol::Async<File> =
127 smol::Async::new(unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) })
128 .expect("could not wrap VPN fd in Async");
129
130 let (mut reader, mut writer) = async_fd.split();
132
133 let read_task = async {
135 let mut buf = vec![0u8; 65535]; loop {
137 match reader.read(&mut buf).await {
138 Ok(n) if n > 0 => {
139 send_vpn_packet(
141 &ctx_clone,
142 bytes::Bytes::copy_from_slice(&buf[..n]),
143 )
144 .await;
145 }
146 Ok(0) => {
147 tracing::warn!("VPN fd reached EOF");
149 break;
150 }
151 Err(e) => {
152 tracing::error!("Error reading from VPN fd: {}", e);
153 break;
154 }
155 _ => break,
156 }
157 }
158 anyhow::Ok(())
159 };
160
161 let write_task = async {
163 loop {
164 let packet = recv_vpn_packet(&ctx_clone).await;
166
167 if let Err(e) = writer.write_all(&packet).await {
169 tracing::error!("Error writing to VPN fd: {}", e);
170 break;
171 }
172
173 if let Err(e) = writer.flush().await {
174 tracing::error!("Error flushing VPN fd: {}", e);
175 break;
176 }
177 }
178 anyhow::Ok(())
179 };
180
181 let _ = read_task.race(write_task).await;
183 tracing::warn!("VPN fd handler exited");
184 })
185 .detach();
186 }
187 let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
188 Client {
189 task: task.shared(),
190 ctx,
191 }
192 }
193
194 pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
196 open_conn(&self.ctx, "tcp", remote).await
197 }
198
199 pub async fn wait_until_dead(self) -> anyhow::Result<()> {
201 self.task.await.map_err(|e| anyhow::anyhow!(e))
202 }
203
204 pub fn check_dead(&self) -> anyhow::Result<()> {
206 match self
207 .task
208 .clone()
209 .poll(&mut std::task::Context::from_waker(&noop_waker()))
210 {
211 std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
212 std::task::Poll::Pending => {}
213 }
214
215 Ok(())
216 }
217
218 pub fn control_client(&self) -> ControlClient {
220 ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
221 ControlService(ControlProtocolImpl {
222 ctx: self.ctx.clone(),
223 }),
224 )))
225 }
226
227 pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
229 let auth_token = get_auth_token(&self.ctx).await?;
230 let user_info = broker_client(&self.ctx)?
231 .get_user_info(auth_token)
232 .await??
233 .context("no such user")?;
234 Ok(user_info)
235 }
236
237 pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
239 send_vpn_packet(&self.ctx, bts).await;
240 Ok(())
241 }
242
243 pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
245 let packet = recv_vpn_packet(&self.ctx).await;
246 Ok(packet)
247 }
248}
249
/// Function-pointer type that maps a client context to a value of type `T`.
///
/// NOTE(review): presumably the initializer signature for lazily computed
/// per-context fields used by other modules — confirm against `anyctx`.
pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
251
252async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
253 let rpc_serve = async {
254 if let Some(control_listen) = ctx.init().control_listen {
255 nanorpc_sillad::rpc_serve(
256 sillad::tcp::TcpListener::bind(control_listen).await?,
257 ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
258 )
259 .await?;
260 anyhow::Ok(())
261 } else {
262 smol::future::pending().await
263 }
264 };
265 if ctx.init().dry_run {
266 rpc_serve.await
267 } else {
268 let vpn_loop = vpn_loop(&ctx);
269
270 let _client_loop = Immortal::spawn(run_client_sessions(ctx.clone()));
271
272 socks5_loop(&ctx)
273 .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
274 .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
275 .race(
276 http_proxy_serve(&ctx)
277 .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
278 )
279 .race(
280 auth_loop(&ctx)
281 .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
282 )
283 .race(
284 bw_token_refresh_loop(&ctx)
285 .inspect_err(|e| tracing::error!(err = debug(e), "bw token loop stopped")),
286 )
287 .race(rpc_serve)
288 .race(pac_serve(&ctx))
289 .await
290 }
291}