geph5_client/
client.rs

1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{future::Shared, task::noop_waker, FutureExt, TryFutureExt};
6use geph5_broker_protocol::{Credential, ExitList, UserInfo};
7use nanorpc::DynRpcTransport;
8use sillad::Pipe;
9use smol::future::FutureExt as _;
10use std::{net::SocketAddr, path::PathBuf, sync::Arc};
11
12use serde::{Deserialize, Serialize};
13use smolscale::immortal::Immortal;
14
15use crate::{
16    auth::{auth_loop, get_auth_token},
17    broker::{broker_client, BrokerSource},
18    client_inner::{client_inner, open_conn},
19    control_prot::{
20        ControlClient, ControlProtocolImpl, ControlService, DummyControlProtocolTransport,
21    },
22    database::db_read_or_wait,
23    http_proxy::http_proxy_serve,
24    pac::pac_serve,
25    route::ExitConstraint,
26    socks5::socks5_loop,
27    vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
28};
29
30#[derive(Serialize, Deserialize, Clone)]
31pub struct Config {
32    pub socks5_listen: Option<SocketAddr>,
33    pub http_proxy_listen: Option<SocketAddr>,
34    pub pac_listen: Option<SocketAddr>,
35
36    pub control_listen: Option<SocketAddr>,
37    pub exit_constraint: ExitConstraint,
38    #[serde(default)]
39    pub bridge_mode: BridgeMode,
40    pub cache: Option<PathBuf>,
41
42    pub broker: Option<BrokerSource>,
43    pub broker_keys: Option<BrokerKeys>,
44
45    #[serde(default)]
46    pub vpn: bool,
47    #[serde(default)]
48    pub spoof_dns: bool,
49    #[serde(default)]
50    pub passthrough_china: bool,
51    #[serde(default)]
52    pub dry_run: bool,
53    #[serde(default)]
54    pub credentials: Credential,
55
56    #[serde(default)]
57    pub sess_metadata: serde_json::Value,
58    pub task_limit: Option<u32>,
59}
60
61#[derive(Serialize, Deserialize, Clone)]
62/// Broker keys, in hexadecimal format.
63pub struct BrokerKeys {
64    pub master: String,
65    pub mizaru_free: String,
66    pub mizaru_plus: String,
67}
68
69impl Config {
70    /// Create an "inert" version of this config that does not start any processes.
71    pub fn inert(&self) -> Self {
72        let mut this = self.clone();
73        this.dry_run = true;
74        this.socks5_listen = None;
75        this.http_proxy_listen = None;
76        this.pac_listen = None;
77        this.control_listen = None;
78        this
79    }
80}
81
82#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
83pub enum BridgeMode {
84    Auto,
85    ForceBridges,
86    ForceDirect,
87}
88
89impl Default for BridgeMode {
90    fn default() -> Self {
91        Self::Auto
92    }
93}
94
95pub struct Client {
96    task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
97    ctx: AnyCtx<Config>,
98}
99
100impl Client {
101    /// Starts the client logic in the loop, returning the handle.
102    pub fn start(cfg: Config) -> Self {
103        std::env::remove_var("http_proxy");
104        std::env::remove_var("https_proxy");
105        std::env::remove_var("HTTP_PROXY");
106        std::env::remove_var("HTTPS_PROXY");
107        let ctx = AnyCtx::new(cfg);
108        let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
109        Client {
110            task: task.shared(),
111            ctx,
112        }
113    }
114
115    /// Opens a connection through the tunnel.
116    pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
117        open_conn(&self.ctx, "tcp", remote).await
118    }
119
120    /// Wait until there's an error.
121    pub async fn wait_until_dead(self) -> anyhow::Result<()> {
122        self.task.await.map_err(|e| anyhow::anyhow!(e))
123    }
124
125    /// Check for an error.
126    pub fn check_dead(&self) -> anyhow::Result<()> {
127        match self
128            .task
129            .clone()
130            .poll(&mut std::task::Context::from_waker(&noop_waker()))
131        {
132            std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
133            std::task::Poll::Pending => {}
134        }
135
136        Ok(())
137    }
138
139    /// Get the control protocol client.
140    pub fn control_client(&self) -> ControlClient {
141        ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
142            ControlService(ControlProtocolImpl {
143                ctx: self.ctx.clone(),
144            }),
145        )))
146    }
147
148    /// Gets the user info.
149    pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
150        let auth_token = get_auth_token(&self.ctx).await?;
151        let user_info = broker_client(&self.ctx)?
152            .get_user_info(auth_token)
153            .await??
154            .context("no such user")?;
155        Ok(user_info)
156    }
157
158    /// Force a particular packet to be sent through VPN mode, regardless of whether VPN mode is on.
159    pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
160        send_vpn_packet(&self.ctx, bts).await;
161        Ok(())
162    }
163
164    /// Receive a packet from VPN mode, regardless of whether VPN mode is on.
165    pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
166        let packet = recv_vpn_packet(&self.ctx).await;
167        Ok(packet)
168    }
169}
170
171pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
172
173async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
174    #[derive(Serialize)]
175    struct DryRunOutput {
176        auth_token: String,
177        exits: ExitList,
178    }
179
180    if ctx.init().dry_run {
181        auth_loop(&ctx)
182            .race(async {
183                let broker_client = broker_client(&ctx)?;
184                let exits = broker_client
185                    .get_exits()
186                    .await?
187                    .map_err(|e| anyhow::anyhow!("{e}"))?;
188                let auth_token = db_read_or_wait(&ctx, "auth_token").await?;
189                let exits = exits.inner;
190                println!(
191                    "{}",
192                    serde_json::to_string(&DryRunOutput {
193                        auth_token: hex::encode(auth_token),
194                        exits,
195                    })?
196                );
197                anyhow::Ok(())
198            })
199            .await
200    } else {
201        let vpn_loop = vpn_loop(&ctx);
202
203        let _client_loop = Immortal::spawn(client_inner(ctx.clone()));
204
205        let rpc_serve = async {
206            if let Some(control_listen) = ctx.init().control_listen {
207                nanorpc_sillad::rpc_serve(
208                    sillad::tcp::TcpListener::bind(control_listen).await?,
209                    ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
210                )
211                .await?;
212                anyhow::Ok(())
213            } else {
214                smol::future::pending().await
215            }
216        };
217
218        socks5_loop(&ctx)
219            .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
220            .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
221            .race(
222                http_proxy_serve(&ctx)
223                    .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
224            )
225            .race(
226                auth_loop(&ctx)
227                    .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
228            )
229            .race(rpc_serve)
230            .race(pac_serve(&ctx))
231            .await
232    }
233}