1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{
6 AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt, future::Shared, task::noop_waker,
7};
8use geph5_broker_protocol::{Credential, ExitConstraint, UserInfo};
9use geph5_misc_rpc::client_control::{ControlClient, ControlService};
10use nanorpc::DynRpcTransport;
11use sillad::Pipe;
12use smol::future::FutureExt as _;
13use std::{fs::File, net::SocketAddr, path::PathBuf, sync::Arc};
14
15use serde::{Deserialize, Serialize};
16use smolscale::immortal::Immortal;
17
18use crate::{
19 auth::{auth_loop, get_auth_token},
20 broker::{BrokerSource, TunneledBrokerSource, broker_client},
21 bw_token::bw_token_refresh_loop,
22 control_prot::{ControlProtocolImpl, DummyControlProtocolTransport},
23 http_proxy::http_proxy_serve,
24 logging,
25 pac::pac_serve,
26 port_forward::port_forward,
27 session::{open_conn, run_session},
28 socks5::socks5_loop,
29 vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
30};
31
32#[derive(Serialize, Deserialize, Clone)]
33pub struct Config {
34 pub socks5_listen: Option<SocketAddr>,
35 pub http_proxy_listen: Option<SocketAddr>,
36 pub pac_listen: Option<SocketAddr>,
37
38 pub control_listen: Option<SocketAddr>,
39 pub exit_constraint: ExitConstraint,
40 #[serde(default)]
41 pub allow_direct: bool,
42
43 pub cache: Option<PathBuf>,
44
45 pub broker: Option<BrokerSource>,
46 #[serde(alias = "tunneled_broker_source")]
47 pub tunneled_broker: Option<TunneledBrokerSource>,
48 pub broker_keys: Option<BrokerKeys>,
49
50 #[serde(default)]
51 pub port_forward: Vec<PortForwardCfg>,
52
53 #[serde(default)]
54 pub vpn: bool,
55 #[serde(default)]
56 pub vpn_fd: Option<i32>,
57 #[serde(default)]
58 pub spoof_dns: bool,
59 #[serde(default)]
60 pub passthrough_china: bool,
61 #[serde(default)]
62 pub dry_run: bool,
63 #[serde(default)]
64 pub credentials: Credential,
65
66 #[serde(default)]
67 pub sess_metadata: serde_json::Value,
68 pub task_limit: Option<u32>,
69}
70
71#[derive(Serialize, Deserialize, Clone)]
72pub struct PortForwardCfg {
73 pub listen: SocketAddr,
74 pub connect: String,
75}
76
77#[derive(Serialize, Deserialize, Clone)]
78pub struct BrokerKeys {
80 pub master: String,
81 pub mizaru_free: String,
82 pub mizaru_plus: String,
83 pub mizaru_bw: String,
84}
85
86impl Config {
87 pub fn inert(&self) -> Self {
89 let mut this = self.clone();
90 this.dry_run = true;
91 this.socks5_listen = None;
92 this.http_proxy_listen = None;
93 this.pac_listen = None;
94 this.control_listen = None;
95 this
96 }
97}
98
99#[derive(Clone)]
100pub struct Client {
101 task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
102 ctx: AnyCtx<Config>,
103}
104
105impl Client {
106 pub fn start(cfg: Config) -> Self {
108 let ctx = AnyCtx::new(cfg.clone());
109 let _ = logging::init_logging(&ctx);
111 let ((fd_limit, _), _) = binary_search::binary_search((1, ()), (65536, ()), |lim| {
112 if rlimit::increase_nofile_limit(lim).unwrap_or_default() >= lim {
113 binary_search::Direction::Low(())
114 } else {
115 binary_search::Direction::High(())
116 }
117 });
118 tracing::info!("raised file descriptor limit to {}", fd_limit);
119
120 #[cfg(unix)]
121 if let Some(fd) = cfg.vpn_fd {
122 let ctx_clone = ctx.clone();
123 smolscale::spawn(async move {
124 let async_fd: smol::Async<File> =
126 smol::Async::new(unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) })
127 .expect("could not wrap VPN fd in Async");
128
129 let (mut reader, mut writer) = async_fd.split();
131
132 let read_task = async {
134 let mut buf = vec![0u8; 65535]; loop {
136 match reader.read(&mut buf).await {
137 Ok(n) if n > 0 => {
138 send_vpn_packet(
140 &ctx_clone,
141 bytes::Bytes::copy_from_slice(&buf[..n]),
142 )
143 .await;
144 }
145 Ok(0) => {
146 tracing::warn!("VPN fd reached EOF");
148 break;
149 }
150 Err(e) => {
151 tracing::error!("Error reading from VPN fd: {}", e);
152 break;
153 }
154 _ => break,
155 }
156 }
157 anyhow::Ok(())
158 };
159
160 let write_task = async {
162 loop {
163 let packet = recv_vpn_packet(&ctx_clone).await;
165
166 if let Err(e) = writer.write_all(&packet).await {
168 tracing::error!("Error writing to VPN fd: {}", e);
169 break;
170 }
171
172 if let Err(e) = writer.flush().await {
173 tracing::error!("Error flushing VPN fd: {}", e);
174 break;
175 }
176 }
177 anyhow::Ok(())
178 };
179
180 let _ = read_task.race(write_task).await;
182 tracing::warn!("VPN fd handler exited");
183 })
184 .detach();
185 }
186 let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
187 Client {
188 task: task.shared(),
189 ctx,
190 }
191 }
192
193 pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
195 open_conn(&self.ctx, "tcp", remote).await
196 }
197
198 pub async fn wait_until_dead(self) -> anyhow::Result<()> {
200 self.task.await.map_err(|e| anyhow::anyhow!(e))
201 }
202
203 pub fn check_dead(&self) -> anyhow::Result<()> {
205 match self
206 .task
207 .clone()
208 .poll(&mut std::task::Context::from_waker(&noop_waker()))
209 {
210 std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
211 std::task::Poll::Pending => {}
212 }
213
214 Ok(())
215 }
216
217 pub fn control_client(&self) -> ControlClient {
219 ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
220 ControlService(ControlProtocolImpl {
221 ctx: self.ctx.clone(),
222 }),
223 )))
224 }
225
226 pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
228 let auth_token = get_auth_token(&self.ctx).await?;
229 let user_info = broker_client(&self.ctx)?
230 .get_user_info(auth_token)
231 .await??
232 .context("no such user")?;
233 Ok(user_info)
234 }
235
236 pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
238 send_vpn_packet(&self.ctx, bts).await;
239 Ok(())
240 }
241
242 pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
244 let packet = recv_vpn_packet(&self.ctx).await;
245 Ok(packet)
246 }
247}
248
249pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
250
251async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
252 let rpc_serve = async {
253 if let Some(control_listen) = ctx.init().control_listen {
254 nanorpc_sillad::rpc_serve(
255 sillad::tcp::TcpListener::bind(control_listen).await?,
256 ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
257 )
258 .await?;
259 anyhow::Ok(())
260 } else {
261 smol::future::pending().await
262 }
263 };
264 if ctx.init().dry_run {
265 rpc_serve.await
266 } else {
267 let vpn_loop = vpn_loop(&ctx);
268
269 let _client_loop = Immortal::spawn(run_session(ctx.clone()));
270
271 socks5_loop(&ctx)
272 .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
273 .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
274 .race(
275 http_proxy_serve(&ctx)
276 .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
277 )
278 .race(
279 auth_loop(&ctx)
280 .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
281 )
282 .race(
283 bw_token_refresh_loop(&ctx)
284 .inspect_err(|e| tracing::error!(err = debug(e), "bw token loop stopped")),
285 )
286 .race(rpc_serve)
287 .race(pac_serve(&ctx))
288 .race(port_forward(&ctx))
289 .await
290 }
291}