1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{future::Shared, task::noop_waker, FutureExt, TryFutureExt};
6use geph5_broker_protocol::{Credential, ExitList, UserInfo};
7use nanorpc::DynRpcTransport;
8use sillad::Pipe;
9use smol::future::FutureExt as _;
10use std::{net::SocketAddr, path::PathBuf, sync::Arc};
11
12use serde::{Deserialize, Serialize};
13use smolscale::immortal::Immortal;
14
15use crate::{
16 auth::{auth_loop, get_auth_token},
17 broker::{broker_client, BrokerSource},
18 client_inner::{client_inner, open_conn},
19 control_prot::{
20 ControlClient, ControlProtocolImpl, ControlService, DummyControlProtocolTransport,
21 },
22 database::db_read_or_wait,
23 http_proxy::run_http_proxy,
24 route::ExitConstraint,
25 socks5::socks5_loop,
26 vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
27};
28
29#[derive(Serialize, Deserialize, Clone)]
30pub struct Config {
31 pub socks5_listen: Option<SocketAddr>,
32 pub http_proxy_listen: Option<SocketAddr>,
33
34 pub control_listen: Option<SocketAddr>,
35 pub exit_constraint: ExitConstraint,
36 #[serde(default)]
37 pub bridge_mode: BridgeMode,
38 pub cache: Option<PathBuf>,
39
40 pub broker: Option<BrokerSource>,
41 pub broker_keys: Option<BrokerKeys>,
42
43 #[serde(default)]
44 pub vpn: bool,
45 #[serde(default)]
46 pub spoof_dns: bool,
47 #[serde(default)]
48 pub passthrough_china: bool,
49 #[serde(default)]
50 pub dry_run: bool,
51 #[serde(default)]
52 pub credentials: Credential,
53
54 #[serde(default)]
55 pub sess_metadata: serde_json::Value,
56 pub task_limit: Option<u32>,
57}
58
59#[derive(Serialize, Deserialize, Clone)]
60pub struct BrokerKeys {
62 pub master: String,
63 pub mizaru_free: String,
64 pub mizaru_plus: String,
65}
66
67impl Config {
68 pub fn inert(&self) -> Self {
70 let mut this = self.clone();
71 this.dry_run = true;
72 this.socks5_listen = None;
73 this.http_proxy_listen = None;
74
75 this.control_listen = None;
76 this
77 }
78}
79
80#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
81pub enum BridgeMode {
82 Auto,
83 ForceBridges,
84 ForceDirect,
85}
86
87impl Default for BridgeMode {
88 fn default() -> Self {
89 Self::Auto
90 }
91}
92
93pub struct Client {
94 task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
95 ctx: AnyCtx<Config>,
96}
97
98impl Client {
99 pub fn start(cfg: Config) -> Self {
101 std::env::remove_var("http_proxy");
102 std::env::remove_var("https_proxy");
103 std::env::remove_var("HTTP_PROXY");
104 std::env::remove_var("HTTPS_PROXY");
105 let ctx = AnyCtx::new(cfg);
106 let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
107 Client {
108 task: task.shared(),
109 ctx,
110 }
111 }
112
113 pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
115 open_conn(&self.ctx, "tcp", remote).await
116 }
117
118 pub async fn wait_until_dead(self) -> anyhow::Result<()> {
120 self.task.await.map_err(|e| anyhow::anyhow!(e))
121 }
122
123 pub fn check_dead(&self) -> anyhow::Result<()> {
125 match self
126 .task
127 .clone()
128 .poll(&mut std::task::Context::from_waker(&noop_waker()))
129 {
130 std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
131 std::task::Poll::Pending => {}
132 }
133
134 Ok(())
135 }
136
137 pub fn control_client(&self) -> ControlClient {
139 ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
140 ControlService(ControlProtocolImpl {
141 ctx: self.ctx.clone(),
142 }),
143 )))
144 }
145
146 pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
148 let auth_token = get_auth_token(&self.ctx).await?;
149 let user_info = broker_client(&self.ctx)?
150 .get_user_info(auth_token)
151 .await??
152 .context("no such user")?;
153 Ok(user_info)
154 }
155
156 pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
158 send_vpn_packet(&self.ctx, bts).await;
159 Ok(())
160 }
161
162 pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
164 let packet = recv_vpn_packet(&self.ctx).await;
165 Ok(packet)
166 }
167}
168
169pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
170
171async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
172 #[derive(Serialize)]
173 struct DryRunOutput {
174 auth_token: String,
175 exits: ExitList,
176 }
177
178 if ctx.init().dry_run {
179 auth_loop(&ctx)
180 .race(async {
181 let broker_client = broker_client(&ctx)?;
182 let exits = broker_client
183 .get_exits()
184 .await?
185 .map_err(|e| anyhow::anyhow!("{e}"))?;
186 let auth_token = db_read_or_wait(&ctx, "auth_token").await?;
187 let exits = exits.inner;
188 println!(
189 "{}",
190 serde_json::to_string(&DryRunOutput {
191 auth_token: hex::encode(auth_token),
192 exits,
193 })?
194 );
195 anyhow::Ok(())
196 })
197 .await
198 } else {
199 let vpn_loop = vpn_loop(&ctx);
200
201 let _client_loop = Immortal::spawn(client_inner(ctx.clone()));
202
203 let rpc_serve = async {
204 if let Some(control_listen) = ctx.init().control_listen {
205 nanorpc_sillad::rpc_serve(
206 sillad::tcp::TcpListener::bind(control_listen).await?,
207 ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
208 )
209 .await?;
210 anyhow::Ok(())
211 } else {
212 smol::future::pending().await
213 }
214 };
215
216 socks5_loop(&ctx)
217 .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
218 .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
219 .race(
220 run_http_proxy(&ctx)
221 .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
222 )
223 .race(
224 auth_loop(&ctx)
225 .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
226 )
227 .race(rpc_serve)
228 .await
229 }
230}