geph5_client/
client.rs

1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{future::Shared, task::noop_waker, FutureExt, TryFutureExt};
6use geph5_broker_protocol::{Credential, ExitList, UserInfo};
7use nanorpc::DynRpcTransport;
8use sillad::Pipe;
9use smol::future::FutureExt as _;
10use std::{net::SocketAddr, path::PathBuf, sync::Arc};
11
12use serde::{Deserialize, Serialize};
13use smolscale::immortal::Immortal;
14
15use crate::{
16    auth::{auth_loop, get_auth_token},
17    broker::{broker_client, BrokerSource},
18    client_inner::{client_inner, open_conn},
19    control_prot::{
20        ControlClient, ControlProtocolImpl, ControlService, DummyControlProtocolTransport,
21    },
22    database::db_read_or_wait,
23    http_proxy::run_http_proxy,
24    route::ExitConstraint,
25    socks5::socks5_loop,
26    vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
27};
28
29#[derive(Serialize, Deserialize, Clone)]
30pub struct Config {
31    pub socks5_listen: Option<SocketAddr>,
32    pub http_proxy_listen: Option<SocketAddr>,
33
34    pub control_listen: Option<SocketAddr>,
35    pub exit_constraint: ExitConstraint,
36    #[serde(default)]
37    pub bridge_mode: BridgeMode,
38    pub cache: Option<PathBuf>,
39
40    pub broker: Option<BrokerSource>,
41    pub broker_keys: Option<BrokerKeys>,
42
43    #[serde(default)]
44    pub vpn: bool,
45    #[serde(default)]
46    pub spoof_dns: bool,
47    #[serde(default)]
48    pub passthrough_china: bool,
49    #[serde(default)]
50    pub dry_run: bool,
51    #[serde(default)]
52    pub credentials: Credential,
53
54    #[serde(default)]
55    pub sess_metadata: serde_json::Value,
56    pub task_limit: Option<u32>,
57}
58
59#[derive(Serialize, Deserialize, Clone)]
60/// Broker keys, in hexadecimal format.
61pub struct BrokerKeys {
62    pub master: String,
63    pub mizaru_free: String,
64    pub mizaru_plus: String,
65}
66
67impl Config {
68    /// Create an "inert" version of this config that does not start any processes.
69    pub fn inert(&self) -> Self {
70        let mut this = self.clone();
71        this.dry_run = true;
72        this.socks5_listen = None;
73        this.http_proxy_listen = None;
74
75        this.control_listen = None;
76        this
77    }
78}
79
80#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
81pub enum BridgeMode {
82    Auto,
83    ForceBridges,
84    ForceDirect,
85}
86
87impl Default for BridgeMode {
88    fn default() -> Self {
89        Self::Auto
90    }
91}
92
93pub struct Client {
94    task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
95    ctx: AnyCtx<Config>,
96}
97
98impl Client {
99    /// Starts the client logic in the loop, returning the handle.
100    pub fn start(cfg: Config) -> Self {
101        std::env::remove_var("http_proxy");
102        std::env::remove_var("https_proxy");
103        std::env::remove_var("HTTP_PROXY");
104        std::env::remove_var("HTTPS_PROXY");
105        let ctx = AnyCtx::new(cfg);
106        let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
107        Client {
108            task: task.shared(),
109            ctx,
110        }
111    }
112
113    /// Opens a connection through the tunnel.
114    pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
115        open_conn(&self.ctx, "tcp", remote).await
116    }
117
118    /// Wait until there's an error.
119    pub async fn wait_until_dead(self) -> anyhow::Result<()> {
120        self.task.await.map_err(|e| anyhow::anyhow!(e))
121    }
122
123    /// Check for an error.
124    pub fn check_dead(&self) -> anyhow::Result<()> {
125        match self
126            .task
127            .clone()
128            .poll(&mut std::task::Context::from_waker(&noop_waker()))
129        {
130            std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
131            std::task::Poll::Pending => {}
132        }
133
134        Ok(())
135    }
136
137    /// Get the control protocol client.
138    pub fn control_client(&self) -> ControlClient {
139        ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
140            ControlService(ControlProtocolImpl {
141                ctx: self.ctx.clone(),
142            }),
143        )))
144    }
145
146    /// Gets the user info.
147    pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
148        let auth_token = get_auth_token(&self.ctx).await?;
149        let user_info = broker_client(&self.ctx)?
150            .get_user_info(auth_token)
151            .await??
152            .context("no such user")?;
153        Ok(user_info)
154    }
155
156    /// Force a particular packet to be sent through VPN mode, regardless of whether VPN mode is on.
157    pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
158        send_vpn_packet(&self.ctx, bts).await;
159        Ok(())
160    }
161
162    /// Receive a packet from VPN mode, regardless of whether VPN mode is on.
163    pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
164        let packet = recv_vpn_packet(&self.ctx).await;
165        Ok(packet)
166    }
167}
168
169pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
170
171async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
172    #[derive(Serialize)]
173    struct DryRunOutput {
174        auth_token: String,
175        exits: ExitList,
176    }
177
178    if ctx.init().dry_run {
179        auth_loop(&ctx)
180            .race(async {
181                let broker_client = broker_client(&ctx)?;
182                let exits = broker_client
183                    .get_exits()
184                    .await?
185                    .map_err(|e| anyhow::anyhow!("{e}"))?;
186                let auth_token = db_read_or_wait(&ctx, "auth_token").await?;
187                let exits = exits.inner;
188                println!(
189                    "{}",
190                    serde_json::to_string(&DryRunOutput {
191                        auth_token: hex::encode(auth_token),
192                        exits,
193                    })?
194                );
195                anyhow::Ok(())
196            })
197            .await
198    } else {
199        let vpn_loop = vpn_loop(&ctx);
200
201        let _client_loop = Immortal::spawn(client_inner(ctx.clone()));
202
203        let rpc_serve = async {
204            if let Some(control_listen) = ctx.init().control_listen {
205                nanorpc_sillad::rpc_serve(
206                    sillad::tcp::TcpListener::bind(control_listen).await?,
207                    ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
208                )
209                .await?;
210                anyhow::Ok(())
211            } else {
212                smol::future::pending().await
213            }
214        };
215
216        socks5_loop(&ctx)
217            .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
218            .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
219            .race(
220                run_http_proxy(&ctx)
221                    .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
222            )
223            .race(
224                auth_loop(&ctx)
225                    .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
226            )
227            .race(rpc_serve)
228            .await
229    }
230}