1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{
6 future::Shared, task::noop_waker, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt,
7};
8use geph5_broker_protocol::{Credential, ExitList, UserInfo};
9use nanorpc::DynRpcTransport;
10use sillad::Pipe;
11use smol::future::FutureExt as _;
12use std::{fs::File, net::SocketAddr, path::PathBuf, sync::Arc};
13
14use serde::{Deserialize, Serialize};
15use smolscale::immortal::Immortal;
16
17use crate::{
18 auth::{auth_loop, get_auth_token},
19 broker::{broker_client, BrokerSource},
20 client_inner::{client_inner, open_conn},
21 control_prot::{
22 ControlClient, ControlProtocolImpl, ControlService, DummyControlProtocolTransport,
23 CURRENT_CONN_INFO,
24 },
25 database::db_read_or_wait,
26 http_proxy::http_proxy_serve,
27 pac::pac_serve,
28 route::ExitConstraint,
29 socks5::socks5_loop,
30 vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
31 ConnInfo,
32};
33
34#[derive(Serialize, Deserialize, Clone)]
35pub struct Config {
36 pub socks5_listen: Option<SocketAddr>,
37 pub http_proxy_listen: Option<SocketAddr>,
38 pub pac_listen: Option<SocketAddr>,
39
40 pub control_listen: Option<SocketAddr>,
41 pub exit_constraint: ExitConstraint,
42 #[serde(default)]
43 pub bridge_mode: BridgeMode,
44 pub cache: Option<PathBuf>,
45
46 pub broker: Option<BrokerSource>,
47 pub broker_keys: Option<BrokerKeys>,
48
49 #[serde(default)]
50 pub vpn: bool,
51 #[serde(default)]
52 pub vpn_fd: Option<i32>,
53 #[serde(default)]
54 pub spoof_dns: bool,
55 #[serde(default)]
56 pub passthrough_china: bool,
57 #[serde(default)]
58 pub dry_run: bool,
59 #[serde(default)]
60 pub credentials: Credential,
61
62 #[serde(default)]
63 pub sess_metadata: serde_json::Value,
64 pub task_limit: Option<u32>,
65}
66
67#[derive(Serialize, Deserialize, Clone)]
68pub struct BrokerKeys {
70 pub master: String,
71 pub mizaru_free: String,
72 pub mizaru_plus: String,
73}
74
75impl Config {
76 pub fn inert(&self) -> Self {
78 let mut this = self.clone();
79 this.dry_run = true;
80 this.socks5_listen = None;
81 this.http_proxy_listen = None;
82 this.pac_listen = None;
83 this.control_listen = None;
84 this
85 }
86}
87
88#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
89pub enum BridgeMode {
90 Auto,
91 ForceBridges,
92 ForceDirect,
93}
94
95impl Default for BridgeMode {
96 fn default() -> Self {
97 Self::Auto
98 }
99}
100
101pub struct Client {
102 task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
103 ctx: AnyCtx<Config>,
104}
105
106impl Client {
107 pub fn start(cfg: Config) -> Self {
109 std::env::remove_var("http_proxy");
110 std::env::remove_var("https_proxy");
111 std::env::remove_var("HTTP_PROXY");
112 std::env::remove_var("HTTPS_PROXY");
113 let ctx = AnyCtx::new(cfg.clone());
114
115 #[cfg(unix)]
116 if let Some(fd) = cfg.vpn_fd {
117 let ctx_clone = ctx.clone();
118 smolscale::spawn(async move {
119 let async_fd: smol::Async<File> =
121 smol::Async::new(unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) })
122 .expect("could not wrap VPN fd in Async");
123
124 let (mut reader, mut writer) = async_fd.split();
126
127 let read_task = async {
129 let mut buf = vec![0u8; 65535]; loop {
131 match reader.read(&mut buf).await {
132 Ok(n) if n > 0 => {
133 send_vpn_packet(
135 &ctx_clone,
136 bytes::Bytes::copy_from_slice(&buf[..n]),
137 )
138 .await;
139 }
140 Ok(0) => {
141 tracing::warn!("VPN fd reached EOF");
143 break;
144 }
145 Err(e) => {
146 tracing::error!("Error reading from VPN fd: {}", e);
147 break;
148 }
149 _ => break,
150 }
151 }
152 anyhow::Ok(())
153 };
154
155 let write_task = async {
157 loop {
158 let packet = recv_vpn_packet(&ctx_clone).await;
160
161 if let Err(e) = writer.write_all(&packet).await {
163 tracing::error!("Error writing to VPN fd: {}", e);
164 break;
165 }
166
167 if let Err(e) = writer.flush().await {
168 tracing::error!("Error flushing VPN fd: {}", e);
169 break;
170 }
171 }
172 anyhow::Ok(())
173 };
174
175 let _ = read_task.race(write_task).await;
177 tracing::warn!("VPN fd handler exited");
178 })
179 .detach();
180 }
181 let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
182 Client {
183 task: task.shared(),
184 ctx,
185 }
186 }
187
188 pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
190 open_conn(&self.ctx, "tcp", remote).await
191 }
192
193 pub async fn wait_until_dead(self) -> anyhow::Result<()> {
195 self.task.await.map_err(|e| anyhow::anyhow!(e))
196 }
197
198 pub fn check_dead(&self) -> anyhow::Result<()> {
200 match self
201 .task
202 .clone()
203 .poll(&mut std::task::Context::from_waker(&noop_waker()))
204 {
205 std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
206 std::task::Poll::Pending => {}
207 }
208
209 Ok(())
210 }
211
212 pub fn control_client(&self) -> ControlClient {
214 ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
215 ControlService(ControlProtocolImpl {
216 ctx: self.ctx.clone(),
217 }),
218 )))
219 }
220
221 pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
223 let auth_token = get_auth_token(&self.ctx).await?;
224 let user_info = broker_client(&self.ctx)?
225 .get_user_info(auth_token)
226 .await??
227 .context("no such user")?;
228 Ok(user_info)
229 }
230
231 pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
233 send_vpn_packet(&self.ctx, bts).await;
234 Ok(())
235 }
236
237 pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
239 let packet = recv_vpn_packet(&self.ctx).await;
240 Ok(packet)
241 }
242}
243
244pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
245
246async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
247 #[derive(Serialize)]
248 struct DryRunOutput {
249 auth_token: String,
250 exits: ExitList,
251 }
252
253 if ctx.init().dry_run {
254 smol::future::pending().await
255 } else {
256 let rpc_serve = async {
257 if let Some(control_listen) = ctx.init().control_listen {
258 nanorpc_sillad::rpc_serve(
259 sillad::tcp::TcpListener::bind(control_listen).await?,
260 ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
261 )
262 .await?;
263 anyhow::Ok(())
264 } else {
265 smol::future::pending().await
266 }
267 };
268
269 let vpn_loop = vpn_loop(&ctx);
270
271 let _client_loop = Immortal::spawn(client_inner(ctx.clone()));
272
273 socks5_loop(&ctx)
274 .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
275 .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
276 .race(
277 http_proxy_serve(&ctx)
278 .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
279 )
280 .race(
281 auth_loop(&ctx)
282 .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
283 )
284 .race(rpc_serve)
285 .race(pac_serve(&ctx))
286 .await
287 }
288}