geph5_client/
client.rs

1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{
6    future::Shared, task::noop_waker, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt,
7};
8use geph5_broker_protocol::{Credential, UserInfo};
9use geph5_misc_rpc::client_control::{ControlClient, ControlService};
10use nanorpc::DynRpcTransport;
11use sillad::Pipe;
12use smol::future::FutureExt as _;
13use std::{fs::File, net::SocketAddr, path::PathBuf, sync::Arc};
14
15use serde::{Deserialize, Serialize};
16use smolscale::immortal::Immortal;
17
18use crate::{
19    auth::{auth_loop, get_auth_token},
20    broker::{broker_client, BrokerSource},
21    bw_token::bw_token_refresh_loop,
22    control_prot::{ControlProtocolImpl, DummyControlProtocolTransport},
23    get_dialer::ExitConstraint,
24    http_proxy::http_proxy_serve,
25    pac::pac_serve,
26    session::{open_conn, run_client_sessions},
27    socks5::socks5_loop,
28    vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
29};
30
31#[derive(Serialize, Deserialize, Clone)]
32pub struct Config {
33    pub socks5_listen: Option<SocketAddr>,
34    pub http_proxy_listen: Option<SocketAddr>,
35    pub pac_listen: Option<SocketAddr>,
36
37    pub control_listen: Option<SocketAddr>,
38    pub exit_constraint: ExitConstraint,
39    #[serde(default)]
40    pub bridge_mode: BridgeMode,
41    pub cache: Option<PathBuf>,
42
43    pub broker: Option<BrokerSource>,
44    pub broker_keys: Option<BrokerKeys>,
45
46    #[serde(default)]
47    pub vpn: bool,
48    #[serde(default)]
49    pub vpn_fd: Option<i32>,
50    #[serde(default)]
51    pub spoof_dns: bool,
52    #[serde(default)]
53    pub passthrough_china: bool,
54    #[serde(default)]
55    pub dry_run: bool,
56    #[serde(default)]
57    pub credentials: Credential,
58
59    #[serde(default)]
60    pub sess_metadata: serde_json::Value,
61    pub task_limit: Option<u32>,
62}
63
64#[derive(Serialize, Deserialize, Clone)]
65/// Broker keys, in hexadecimal format.
66pub struct BrokerKeys {
67    pub master: String,
68    pub mizaru_free: String,
69    pub mizaru_plus: String,
70    pub mizaru_bw: String,
71}
72
73impl Config {
74    /// Create an "inert" version of this config that does not start any processes.
75    pub fn inert(&self) -> Self {
76        let mut this = self.clone();
77        this.dry_run = true;
78        this.socks5_listen = None;
79        this.http_proxy_listen = None;
80        this.pac_listen = None;
81        this.control_listen = None;
82        this
83    }
84}
85
86#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
87pub enum BridgeMode {
88    Auto,
89    ForceBridges,
90}
91
92impl Default for BridgeMode {
93    fn default() -> Self {
94        Self::Auto
95    }
96}
97
/// Handle to a running client, created by [`Client::start`].
///
/// Cloning is cheap in the sense that clones share the same underlying task (through `Shared`)
/// and the same context.
#[derive(Clone)]
pub struct Client {
    /// The spawned `client_main` task. Its error is wrapped in `Arc` so that the result can be
    /// handed to every clone of the shared future.
    task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
    /// Context built from the [`Config`] this client was started with.
    ctx: AnyCtx<Config>,
}
103
104impl Client {
105    /// Starts the client logic in the loop, returning the handle.
106    pub fn start(cfg: Config) -> Self {
107        std::env::remove_var("http_proxy");
108        std::env::remove_var("https_proxy");
109        std::env::remove_var("HTTP_PROXY");
110        std::env::remove_var("HTTPS_PROXY");
111        let ctx = AnyCtx::new(cfg.clone());
112        let ((fd_limit, _), _) = binary_search::binary_search((1, ()), (65536, ()), |lim| {
113            if rlimit::increase_nofile_limit(lim).unwrap_or_default() >= lim {
114                binary_search::Direction::Low(())
115            } else {
116                binary_search::Direction::High(())
117            }
118        });
119        tracing::info!("raised file descriptor limit to {}", fd_limit);
120
121        #[cfg(unix)]
122        if let Some(fd) = cfg.vpn_fd {
123            let ctx_clone = ctx.clone();
124            smolscale::spawn(async move {
125                // Create an async file descriptor from the raw fd
126                let async_fd: smol::Async<File> =
127                    smol::Async::new(unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) })
128                        .expect("could not wrap VPN fd in Async");
129
130                // Split the file descriptor for reading and writingz
131                let (mut reader, mut writer) = async_fd.split();
132
133                // Spawn a task for reading from fd and sending to VPN
134                let read_task = async {
135                    let mut buf = vec![0u8; 65535]; // Buffer for reading packets
136                    loop {
137                        match reader.read(&mut buf).await {
138                            Ok(n) if n > 0 => {
139                                // Send the packet to the VPN
140                                send_vpn_packet(
141                                    &ctx_clone,
142                                    bytes::Bytes::copy_from_slice(&buf[..n]),
143                                )
144                                .await;
145                            }
146                            Ok(0) => {
147                                // EOF
148                                tracing::warn!("VPN fd reached EOF");
149                                break;
150                            }
151                            Err(e) => {
152                                tracing::error!("Error reading from VPN fd: {}", e);
153                                break;
154                            }
155                            _ => break,
156                        }
157                    }
158                    anyhow::Ok(())
159                };
160
161                // Spawn a task for receiving from VPN and writing to fd
162                let write_task = async {
163                    loop {
164                        // Receive a packet from the VPN
165                        let packet = recv_vpn_packet(&ctx_clone).await;
166
167                        // Write the packet to the file descriptor
168                        if let Err(e) = writer.write_all(&packet).await {
169                            tracing::error!("Error writing to VPN fd: {}", e);
170                            break;
171                        }
172
173                        if let Err(e) = writer.flush().await {
174                            tracing::error!("Error flushing VPN fd: {}", e);
175                            break;
176                        }
177                    }
178                    anyhow::Ok(())
179                };
180
181                // Wait for either task to complete (or fail)
182                let _ = read_task.race(write_task).await;
183                tracing::warn!("VPN fd handler exited");
184            })
185            .detach();
186        }
187        let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
188        Client {
189            task: task.shared(),
190            ctx,
191        }
192    }
193
194    /// Opens a connection through the tunnel.
195    pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
196        open_conn(&self.ctx, "tcp", remote).await
197    }
198
199    /// Wait until there's an error.
200    pub async fn wait_until_dead(self) -> anyhow::Result<()> {
201        self.task.await.map_err(|e| anyhow::anyhow!(e))
202    }
203
204    /// Check for an error.
205    pub fn check_dead(&self) -> anyhow::Result<()> {
206        match self
207            .task
208            .clone()
209            .poll(&mut std::task::Context::from_waker(&noop_waker()))
210        {
211            std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
212            std::task::Poll::Pending => {}
213        }
214
215        Ok(())
216    }
217
218    /// Get the control protocol client.
219    pub fn control_client(&self) -> ControlClient {
220        ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
221            ControlService(ControlProtocolImpl {
222                ctx: self.ctx.clone(),
223            }),
224        )))
225    }
226
227    /// Gets the user info.
228    pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
229        let auth_token = get_auth_token(&self.ctx).await?;
230        let user_info = broker_client(&self.ctx)?
231            .get_user_info(auth_token)
232            .await??
233            .context("no such user")?;
234        Ok(user_info)
235    }
236
237    /// Force a particular packet to be sent through VPN mode, regardless of whether VPN mode is on.
238    pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
239        send_vpn_packet(&self.ctx, bts).await;
240        Ok(())
241    }
242
243    /// Receive a packet from VPN mode, regardless of whether VPN mode is on.
244    pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
245        let packet = recv_vpn_packet(&self.ctx).await;
246        Ok(packet)
247    }
248}
249
/// Function pointer that derives a value of type `T` from the client context.
///
/// NOTE(review): presumably the initializer type for lazily computed per-context fields used
/// with `AnyCtx`; confirm against the modules that declare such fields.
pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
251
252async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
253    let rpc_serve = async {
254        if let Some(control_listen) = ctx.init().control_listen {
255            nanorpc_sillad::rpc_serve(
256                sillad::tcp::TcpListener::bind(control_listen).await?,
257                ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
258            )
259            .await?;
260            anyhow::Ok(())
261        } else {
262            smol::future::pending().await
263        }
264    };
265    if ctx.init().dry_run {
266        rpc_serve.await
267    } else {
268        let vpn_loop = vpn_loop(&ctx);
269
270        let _client_loop = Immortal::spawn(run_client_sessions(ctx.clone()));
271
272        socks5_loop(&ctx)
273            .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
274            .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
275            .race(
276                http_proxy_serve(&ctx)
277                    .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
278            )
279            .race(
280                auth_loop(&ctx)
281                    .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
282            )
283            .race(
284                bw_token_refresh_loop(&ctx)
285                    .inspect_err(|e| tracing::error!(err = debug(e), "bw token loop stopped")),
286            )
287            .race(rpc_serve)
288            .race(pac_serve(&ctx))
289            .await
290    }
291}