geph5_client/
client.rs

1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{
6    future::Shared, task::noop_waker, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt,
7};
8use geph5_broker_protocol::{Credential, ExitList, UserInfo};
9use nanorpc::DynRpcTransport;
10use sillad::Pipe;
11use smol::future::FutureExt as _;
12use std::{fs::File, net::SocketAddr, path::PathBuf, sync::Arc};
13
14use serde::{Deserialize, Serialize};
15use smolscale::immortal::Immortal;
16
17use crate::{
18    auth::{auth_loop, get_auth_token},
19    broker::{broker_client, BrokerSource},
20    client_inner::{client_inner, open_conn},
21    control_prot::{
22        ControlClient, ControlProtocolImpl, ControlService, DummyControlProtocolTransport,
23        CURRENT_CONN_INFO,
24    },
25    database::db_read_or_wait,
26    http_proxy::http_proxy_serve,
27    pac::pac_serve,
28    route::ExitConstraint,
29    socks5::socks5_loop,
30    vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
31    ConnInfo,
32};
33
34#[derive(Serialize, Deserialize, Clone)]
35pub struct Config {
36    pub socks5_listen: Option<SocketAddr>,
37    pub http_proxy_listen: Option<SocketAddr>,
38    pub pac_listen: Option<SocketAddr>,
39
40    pub control_listen: Option<SocketAddr>,
41    pub exit_constraint: ExitConstraint,
42    #[serde(default)]
43    pub bridge_mode: BridgeMode,
44    pub cache: Option<PathBuf>,
45
46    pub broker: Option<BrokerSource>,
47    pub broker_keys: Option<BrokerKeys>,
48
49    #[serde(default)]
50    pub vpn: bool,
51    #[serde(default)]
52    pub vpn_fd: Option<i32>,
53    #[serde(default)]
54    pub spoof_dns: bool,
55    #[serde(default)]
56    pub passthrough_china: bool,
57    #[serde(default)]
58    pub dry_run: bool,
59    #[serde(default)]
60    pub credentials: Credential,
61
62    #[serde(default)]
63    pub sess_metadata: serde_json::Value,
64    pub task_limit: Option<u32>,
65}
66
67#[derive(Serialize, Deserialize, Clone)]
68/// Broker keys, in hexadecimal format.
69pub struct BrokerKeys {
70    pub master: String,
71    pub mizaru_free: String,
72    pub mizaru_plus: String,
73}
74
75impl Config {
76    /// Create an "inert" version of this config that does not start any processes.
77    pub fn inert(&self) -> Self {
78        let mut this = self.clone();
79        this.dry_run = true;
80        this.socks5_listen = None;
81        this.http_proxy_listen = None;
82        this.pac_listen = None;
83        this.control_listen = None;
84        this
85    }
86}
87
88#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
89pub enum BridgeMode {
90    Auto,
91    ForceBridges,
92    ForceDirect,
93}
94
95impl Default for BridgeMode {
96    fn default() -> Self {
97        Self::Auto
98    }
99}
100
101pub struct Client {
102    task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
103    ctx: AnyCtx<Config>,
104}
105
106impl Client {
107    /// Starts the client logic in the loop, returning the handle.
108    pub fn start(cfg: Config) -> Self {
109        std::env::remove_var("http_proxy");
110        std::env::remove_var("https_proxy");
111        std::env::remove_var("HTTP_PROXY");
112        std::env::remove_var("HTTPS_PROXY");
113        let ctx = AnyCtx::new(cfg.clone());
114
115        #[cfg(unix)]
116        if let Some(fd) = cfg.vpn_fd {
117            let ctx_clone = ctx.clone();
118            smolscale::spawn(async move {
119                // Create an async file descriptor from the raw fd
120                let async_fd: smol::Async<File> =
121                    smol::Async::new(unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) })
122                        .expect("could not wrap VPN fd in Async");
123
124                // Split the file descriptor for reading and writingz
125                let (mut reader, mut writer) = async_fd.split();
126
127                // Spawn a task for reading from fd and sending to VPN
128                let read_task = async {
129                    let mut buf = vec![0u8; 65535]; // Buffer for reading packets
130                    loop {
131                        match reader.read(&mut buf).await {
132                            Ok(n) if n > 0 => {
133                                // Send the packet to the VPN
134                                send_vpn_packet(
135                                    &ctx_clone,
136                                    bytes::Bytes::copy_from_slice(&buf[..n]),
137                                )
138                                .await;
139                            }
140                            Ok(0) => {
141                                // EOF
142                                tracing::warn!("VPN fd reached EOF");
143                                break;
144                            }
145                            Err(e) => {
146                                tracing::error!("Error reading from VPN fd: {}", e);
147                                break;
148                            }
149                            _ => break,
150                        }
151                    }
152                    anyhow::Ok(())
153                };
154
155                // Spawn a task for receiving from VPN and writing to fd
156                let write_task = async {
157                    loop {
158                        // Receive a packet from the VPN
159                        let packet = recv_vpn_packet(&ctx_clone).await;
160
161                        // Write the packet to the file descriptor
162                        if let Err(e) = writer.write_all(&packet).await {
163                            tracing::error!("Error writing to VPN fd: {}", e);
164                            break;
165                        }
166
167                        if let Err(e) = writer.flush().await {
168                            tracing::error!("Error flushing VPN fd: {}", e);
169                            break;
170                        }
171                    }
172                    anyhow::Ok(())
173                };
174
175                // Wait for either task to complete (or fail)
176                let _ = read_task.race(write_task).await;
177                tracing::warn!("VPN fd handler exited");
178            })
179            .detach();
180        }
181        let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
182        Client {
183            task: task.shared(),
184            ctx,
185        }
186    }
187
188    /// Opens a connection through the tunnel.
189    pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
190        open_conn(&self.ctx, "tcp", remote).await
191    }
192
193    /// Wait until there's an error.
194    pub async fn wait_until_dead(self) -> anyhow::Result<()> {
195        self.task.await.map_err(|e| anyhow::anyhow!(e))
196    }
197
198    /// Check for an error.
199    pub fn check_dead(&self) -> anyhow::Result<()> {
200        match self
201            .task
202            .clone()
203            .poll(&mut std::task::Context::from_waker(&noop_waker()))
204        {
205            std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
206            std::task::Poll::Pending => {}
207        }
208
209        Ok(())
210    }
211
212    /// Get the control protocol client.
213    pub fn control_client(&self) -> ControlClient {
214        ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
215            ControlService(ControlProtocolImpl {
216                ctx: self.ctx.clone(),
217            }),
218        )))
219    }
220
221    /// Gets the user info.
222    pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
223        let auth_token = get_auth_token(&self.ctx).await?;
224        let user_info = broker_client(&self.ctx)?
225            .get_user_info(auth_token)
226            .await??
227            .context("no such user")?;
228        Ok(user_info)
229    }
230
231    /// Force a particular packet to be sent through VPN mode, regardless of whether VPN mode is on.
232    pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
233        send_vpn_packet(&self.ctx, bts).await;
234        Ok(())
235    }
236
237    /// Receive a packet from VPN mode, regardless of whether VPN mode is on.
238    pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
239        let packet = recv_vpn_packet(&self.ctx).await;
240        Ok(packet)
241    }
242}
243
244pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
245
246async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
247    #[derive(Serialize)]
248    struct DryRunOutput {
249        auth_token: String,
250        exits: ExitList,
251    }
252
253    if ctx.init().dry_run {
254        smol::future::pending().await
255    } else {
256        let rpc_serve = async {
257            if let Some(control_listen) = ctx.init().control_listen {
258                nanorpc_sillad::rpc_serve(
259                    sillad::tcp::TcpListener::bind(control_listen).await?,
260                    ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
261                )
262                .await?;
263                anyhow::Ok(())
264            } else {
265                smol::future::pending().await
266            }
267        };
268
269        let vpn_loop = vpn_loop(&ctx);
270
271        let _client_loop = Immortal::spawn(client_inner(ctx.clone()));
272
273        socks5_loop(&ctx)
274            .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
275            .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
276            .race(
277                http_proxy_serve(&ctx)
278                    .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
279            )
280            .race(
281                auth_loop(&ctx)
282                    .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
283            )
284            .race(rpc_serve)
285            .race(pac_serve(&ctx))
286            .await
287    }
288}