1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{
6 future::Shared, task::noop_waker, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt,
7};
8use geph5_broker_protocol::{Credential, UserInfo};
9use geph5_misc_rpc::client_control::{ControlClient, ControlService};
10use nanorpc::DynRpcTransport;
11use sillad::Pipe;
12use smol::future::FutureExt as _;
13use std::{fs::File, net::SocketAddr, path::PathBuf, sync::Arc};
14
15use serde::{Deserialize, Serialize};
16use smolscale::immortal::Immortal;
17
18use crate::{
19 auth::{auth_loop, get_auth_token},
20 broker::{broker_client, BrokerSource},
21 bw_token::bw_token_refresh_loop,
22 control_prot::{ControlProtocolImpl, DummyControlProtocolTransport},
23 get_dialer::ExitConstraint,
24 http_proxy::http_proxy_serve,
25 logging,
26 pac::pac_serve,
27 port_forward::port_forward,
28 session::{open_conn, run_client_sessions},
29 socks5::socks5_loop,
30 vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
31};
32
/// Top-level configuration for a [`Client`].
///
/// The struct is (de)serializable with serde. Fields annotated with
/// `#[serde(default)]` fall back to their type's `Default` value when they are
/// absent from the input.
#[derive(Serialize, Deserialize, Clone)]
pub struct Config {
    /// Listen address for the SOCKS5 proxy.
    /// NOTE(review): presumably `None` disables the listener — confirm in `socks5_loop`.
    pub socks5_listen: Option<SocketAddr>,
    /// Listen address for the HTTP proxy.
    /// NOTE(review): presumably `None` disables the listener — confirm in `http_proxy_serve`.
    pub http_proxy_listen: Option<SocketAddr>,
    /// Listen address for the PAC server.
    /// NOTE(review): presumably `None` disables the listener — confirm in `pac_serve`.
    pub pac_listen: Option<SocketAddr>,

    /// When set, `client_main` binds a TCP listener on this address and serves
    /// the control RPC protocol on it; when `None`, no control listener is bound.
    pub control_listen: Option<SocketAddr>,
    /// Constraint on which exit may be used.
    /// NOTE(review): semantics live in `get_dialer::ExitConstraint`, not visible here.
    pub exit_constraint: ExitConstraint,
    /// NOTE(review): presumably allows connecting without going through a bridge — confirm.
    #[serde(default)]
    pub allow_direct: bool,

    /// Optional path used for caching.
    /// NOTE(review): what is cached is not visible in this file.
    pub cache: Option<PathBuf>,

    /// Where to reach the broker.
    /// NOTE(review): semantics live in `broker::BrokerSource`.
    pub broker: Option<BrokerSource>,
    /// Key material associated with the broker; see [`BrokerKeys`].
    pub broker_keys: Option<BrokerKeys>,

    /// Port-forwarding rules; see [`PortForwardCfg`]. Defaults to an empty list.
    #[serde(default)]
    pub port_forward: Vec<PortForwardCfg>,

    /// NOTE(review): presumably enables VPN mode — confirm in `vpn_loop`.
    #[serde(default)]
    pub vpn: bool,
    /// Raw file descriptor for VPN traffic. On Unix, when this is set,
    /// `Client::start` wraps the descriptor in an async file and shuttles VPN
    /// packets between it and the client.
    #[serde(default)]
    pub vpn_fd: Option<i32>,
    /// NOTE(review): presumably enables DNS spoofing in VPN mode — confirm.
    #[serde(default)]
    pub spoof_dns: bool,
    /// NOTE(review): presumably lets traffic to Chinese destinations bypass the tunnel — confirm.
    #[serde(default)]
    pub passthrough_china: bool,
    /// When `true`, `client_main` runs only the control RPC server and none of
    /// the session, proxy, VPN, auth, or token-refresh loops.
    #[serde(default)]
    pub dry_run: bool,
    /// Credentials used for authentication.
    /// NOTE(review): the default value is defined by `geph5_broker_protocol::Credential`.
    #[serde(default)]
    pub credentials: Credential,

    /// Arbitrary JSON value.
    /// NOTE(review): presumably attached to sessions as metadata — confirm.
    #[serde(default)]
    pub sess_metadata: serde_json::Value,
    /// NOTE(review): meaning and units are not visible in this file — confirm with the consumer.
    pub task_limit: Option<u32>,
}
69
/// A single port-forwarding rule, used as the element type of `Config::port_forward`.
#[derive(Serialize, Deserialize, Clone)]
pub struct PortForwardCfg {
    /// Socket address of the forwarding rule's listening side.
    /// NOTE(review): presumably a local address that the client binds — confirm in `port_forward`.
    pub listen: SocketAddr,
    /// Destination of the forwarding rule, kept as a free-form string.
    /// NOTE(review): presumably a `host:port` reached through the tunnel — confirm.
    pub connect: String,
}
75
/// Key material associated with the broker, stored as strings.
///
/// NOTE(review): the encoding of these strings and the exact role of each key
/// are not visible in this file — presumably encoded public keys; confirm
/// against the code that consumes them.
#[derive(Serialize, Deserialize, Clone)]
pub struct BrokerKeys {
    /// The broker's master key.
    pub master: String,
    /// Mizaru key for the "free" tier (presumably — confirm).
    pub mizaru_free: String,
    /// Mizaru key for the "plus" tier (presumably — confirm).
    pub mizaru_plus: String,
    /// Mizaru key for bandwidth tokens (presumably — confirm).
    pub mizaru_bw: String,
}
84
85impl Config {
86 pub fn inert(&self) -> Self {
88 let mut this = self.clone();
89 this.dry_run = true;
90 this.socks5_listen = None;
91 this.http_proxy_listen = None;
92 this.pac_listen = None;
93 this.control_listen = None;
94 this
95 }
96}
97
/// Handle to a running client.
///
/// The handle is cloneable: the background task is held behind a `Shared`
/// future, so every clone observes the same task and the same context.
#[derive(Clone)]
pub struct Client {
    /// Shared handle to the spawned `client_main` task. It resolves to that
    /// task's result, with the error wrapped in an `Arc` so it can be shared.
    task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
    /// Context built from the [`Config`] the client was started with.
    ctx: AnyCtx<Config>,
}
103
104impl Client {
105 pub fn start(cfg: Config) -> Self {
107 let ctx = AnyCtx::new(cfg.clone());
108 let _ = logging::init_logging(&ctx);
110 let ((fd_limit, _), _) = binary_search::binary_search((1, ()), (65536, ()), |lim| {
111 if rlimit::increase_nofile_limit(lim).unwrap_or_default() >= lim {
112 binary_search::Direction::Low(())
113 } else {
114 binary_search::Direction::High(())
115 }
116 });
117 tracing::info!("raised file descriptor limit to {}", fd_limit);
118
119 #[cfg(unix)]
120 if let Some(fd) = cfg.vpn_fd {
121 let ctx_clone = ctx.clone();
122 smolscale::spawn(async move {
123 let async_fd: smol::Async<File> =
125 smol::Async::new(unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) })
126 .expect("could not wrap VPN fd in Async");
127
128 let (mut reader, mut writer) = async_fd.split();
130
131 let read_task = async {
133 let mut buf = vec![0u8; 65535]; loop {
135 match reader.read(&mut buf).await {
136 Ok(n) if n > 0 => {
137 send_vpn_packet(
139 &ctx_clone,
140 bytes::Bytes::copy_from_slice(&buf[..n]),
141 )
142 .await;
143 }
144 Ok(0) => {
145 tracing::warn!("VPN fd reached EOF");
147 break;
148 }
149 Err(e) => {
150 tracing::error!("Error reading from VPN fd: {}", e);
151 break;
152 }
153 _ => break,
154 }
155 }
156 anyhow::Ok(())
157 };
158
159 let write_task = async {
161 loop {
162 let packet = recv_vpn_packet(&ctx_clone).await;
164
165 if let Err(e) = writer.write_all(&packet).await {
167 tracing::error!("Error writing to VPN fd: {}", e);
168 break;
169 }
170
171 if let Err(e) = writer.flush().await {
172 tracing::error!("Error flushing VPN fd: {}", e);
173 break;
174 }
175 }
176 anyhow::Ok(())
177 };
178
179 let _ = read_task.race(write_task).await;
181 tracing::warn!("VPN fd handler exited");
182 })
183 .detach();
184 }
185 let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
186 Client {
187 task: task.shared(),
188 ctx,
189 }
190 }
191
192 pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
194 open_conn(&self.ctx, "tcp", remote).await
195 }
196
197 pub async fn wait_until_dead(self) -> anyhow::Result<()> {
199 self.task.await.map_err(|e| anyhow::anyhow!(e))
200 }
201
202 pub fn check_dead(&self) -> anyhow::Result<()> {
204 match self
205 .task
206 .clone()
207 .poll(&mut std::task::Context::from_waker(&noop_waker()))
208 {
209 std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
210 std::task::Poll::Pending => {}
211 }
212
213 Ok(())
214 }
215
216 pub fn control_client(&self) -> ControlClient {
218 ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
219 ControlService(ControlProtocolImpl {
220 ctx: self.ctx.clone(),
221 }),
222 )))
223 }
224
225 pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
227 let auth_token = get_auth_token(&self.ctx).await?;
228 let user_info = broker_client(&self.ctx)?
229 .get_user_info(auth_token)
230 .await??
231 .context("no such user")?;
232 Ok(user_info)
233 }
234
235 pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
237 send_vpn_packet(&self.ctx, bts).await;
238 Ok(())
239 }
240
241 pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
243 let packet = recv_vpn_packet(&self.ctx).await;
244 Ok(packet)
245 }
246}
247
/// A plain function pointer that derives a value of type `T` from the client
/// context (`AnyCtx<Config>`).
pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
249
250async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
251 let rpc_serve = async {
252 if let Some(control_listen) = ctx.init().control_listen {
253 nanorpc_sillad::rpc_serve(
254 sillad::tcp::TcpListener::bind(control_listen).await?,
255 ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
256 )
257 .await?;
258 anyhow::Ok(())
259 } else {
260 smol::future::pending().await
261 }
262 };
263 if ctx.init().dry_run {
264 rpc_serve.await
265 } else {
266 let vpn_loop = vpn_loop(&ctx);
267
268 let _client_loop = Immortal::spawn(run_client_sessions(ctx.clone()));
269
270 socks5_loop(&ctx)
271 .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
272 .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
273 .race(
274 http_proxy_serve(&ctx)
275 .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
276 )
277 .race(
278 auth_loop(&ctx)
279 .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
280 )
281 .race(
282 bw_token_refresh_loop(&ctx)
283 .inspect_err(|e| tracing::error!(err = debug(e), "bw token loop stopped")),
284 )
285 .race(rpc_serve)
286 .race(pac_serve(&ctx))
287 .race(port_forward(&ctx))
288 .await
289 }
290}