Skip to main content

geph5_client/
client.rs

1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{
6    AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt, future::Shared, task::noop_waker,
7};
8use geph5_broker_protocol::{Credential, ExitConstraint, UserInfo};
9use geph5_misc_rpc::client_control::{ControlClient, ControlService};
10use nanorpc::DynRpcTransport;
11use sillad::Pipe;
12use smol::future::FutureExt as _;
13use std::{fs::File, net::SocketAddr, path::PathBuf, sync::Arc};
14
15use serde::{Deserialize, Serialize};
16use smolscale::immortal::Immortal;
17
18use crate::{
19    auth::{auth_loop, get_auth_token},
20    broker::{BrokerSource, broker_client},
21    bw_token::bw_token_refresh_loop,
22    control_prot::{ControlProtocolImpl, DummyControlProtocolTransport},
23    http_proxy::http_proxy_serve,
24    logging,
25    pac::pac_serve,
26    port_forward::port_forward,
27    session::{open_conn, run_client_sessions},
28    socks5::socks5_loop,
29    vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
30};
31
/// Configuration for a [`Client`].
///
/// Serializable with serde; fields marked `#[serde(default)]` may be omitted
/// from the serialized form. Several fields are only consumed by modules
/// outside this file, so their notes below are deliberately hedged.
#[derive(Serialize, Deserialize, Clone)]
pub struct Config {
    /// Listen address for the SOCKS5 proxy, if any. Cleared by [`Config::inert`].
    /// (Presumably consumed by `socks5_loop` — confirm there.)
    pub socks5_listen: Option<SocketAddr>,
    /// Listen address for the HTTP proxy, if any. Cleared by [`Config::inert`].
    /// (Presumably consumed by `http_proxy_serve` — confirm there.)
    pub http_proxy_listen: Option<SocketAddr>,
    /// Listen address for the PAC server, if any. Cleared by [`Config::inert`].
    /// (Presumably consumed by `pac_serve` — confirm there.)
    pub pac_listen: Option<SocketAddr>,

    /// TCP address on which `client_main` serves the control RPC protocol.
    /// When `None`, no control listener is started. Cleared by [`Config::inert`].
    pub control_listen: Option<SocketAddr>,
    /// Constraint on exit selection; semantics are defined by
    /// `geph5_broker_protocol::ExitConstraint`.
    pub exit_constraint: ExitConstraint,
    /// NOTE(review): looks like this permits direct (non-bridged) connections —
    /// confirm against the session code.
    #[serde(default)]
    pub allow_direct: bool,

    /// Optional path used for caching. NOTE(review): the exact on-disk format
    /// is not visible in this file.
    pub cache: Option<PathBuf>,

    /// Where to reach the broker, if configured.
    pub broker: Option<BrokerSource>,
    /// Hex-encoded broker keys, if configured.
    pub broker_keys: Option<BrokerKeys>,

    /// Port forwarding entries; defaults to an empty list.
    #[serde(default)]
    pub port_forward: Vec<PortForwardCfg>,

    /// NOTE(review): presumably enables VPN mode — confirm against `vpn_loop`.
    #[serde(default)]
    pub vpn: bool,
    /// Raw file descriptor for VPN packets. On Unix, [`Client::start`] wraps it
    /// and copies packets between it and the VPN packet functions.
    #[serde(default)]
    pub vpn_fd: Option<i32>,
    /// NOTE(review): presumably enables DNS spoofing in VPN mode — confirm.
    #[serde(default)]
    pub spoof_dns: bool,
    /// NOTE(review): presumably lets traffic to Chinese destinations bypass the
    /// tunnel — confirm against the consumer of this flag.
    #[serde(default)]
    pub passthrough_china: bool,
    /// When set, `client_main` runs only the control RPC server and starts none
    /// of the proxy, VPN, auth, or session loops. Set by [`Config::inert`].
    #[serde(default)]
    pub dry_run: bool,
    /// Credentials used for authentication; defaults to `Credential::default()`.
    #[serde(default)]
    pub credentials: Credential,

    /// Free-form session metadata; defaults to the default `serde_json::Value`.
    #[serde(default)]
    pub sess_metadata: serde_json::Value,
    /// NOTE(review): optional limit on tasks; units and enforcement point are
    /// not visible in this file.
    pub task_limit: Option<u32>,
}
68
/// One entry of [`Config::port_forward`].
#[derive(Serialize, Deserialize, Clone)]
pub struct PortForwardCfg {
    /// Socket address of the forwarder's listening side.
    /// (Presumably a local listener — confirm against `port_forward`.)
    pub listen: SocketAddr,
    /// Destination to connect to, as a string.
    /// NOTE(review): presumably `host:port` reached through the tunnel — confirm.
    pub connect: String,
}
74
/// Broker keys, in hexadecimal format.
#[derive(Serialize, Deserialize, Clone)]
pub struct BrokerKeys {
    /// Hex-encoded master key.
    pub master: String,
    /// Hex-encoded Mizaru key for the free tier (per the field name — confirm).
    pub mizaru_free: String,
    /// Hex-encoded Mizaru key for the plus tier (per the field name — confirm).
    pub mizaru_plus: String,
    /// Hex-encoded Mizaru key for bandwidth tokens (per the field name — confirm).
    pub mizaru_bw: String,
}
83
84impl Config {
85    /// Create an "inert" version of this config that does not start any processes.
86    pub fn inert(&self) -> Self {
87        let mut this = self.clone();
88        this.dry_run = true;
89        this.socks5_listen = None;
90        this.http_proxy_listen = None;
91        this.pac_listen = None;
92        this.control_listen = None;
93        this
94    }
95}
96
/// Handle to a running client.
///
/// Cloning is cheap in the sense that clones share the same background task
/// (via [`Shared`]) and the same context.
#[derive(Clone)]
pub struct Client {
    /// Shared handle to the spawned `client_main` task; its error is wrapped
    /// in an `Arc` so that every clone can observe the same result.
    task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
    /// Context built from the [`Config`] passed to [`Client::start`].
    ctx: AnyCtx<Config>,
}
102
103impl Client {
104    /// Starts the client logic in the loop, returning the handle.
105    pub fn start(cfg: Config) -> Self {
106        let ctx = AnyCtx::new(cfg.clone());
107        // Initialize logging once we have context so JSON logs go to SQLite
108        let _ = logging::init_logging(&ctx);
109        let ((fd_limit, _), _) = binary_search::binary_search((1, ()), (65536, ()), |lim| {
110            if rlimit::increase_nofile_limit(lim).unwrap_or_default() >= lim {
111                binary_search::Direction::Low(())
112            } else {
113                binary_search::Direction::High(())
114            }
115        });
116        tracing::info!("raised file descriptor limit to {}", fd_limit);
117
118        #[cfg(unix)]
119        if let Some(fd) = cfg.vpn_fd {
120            let ctx_clone = ctx.clone();
121            smolscale::spawn(async move {
122                // Create an async file descriptor from the raw fd
123                let async_fd: smol::Async<File> =
124                    smol::Async::new(unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) })
125                        .expect("could not wrap VPN fd in Async");
126
127                // Split the file descriptor for reading and writingz
128                let (mut reader, mut writer) = async_fd.split();
129
130                // Spawn a task for reading from fd and sending to VPN
131                let read_task = async {
132                    let mut buf = vec![0u8; 65535]; // Buffer for reading packets
133                    loop {
134                        match reader.read(&mut buf).await {
135                            Ok(n) if n > 0 => {
136                                // Send the packet to the VPN
137                                send_vpn_packet(
138                                    &ctx_clone,
139                                    bytes::Bytes::copy_from_slice(&buf[..n]),
140                                )
141                                .await;
142                            }
143                            Ok(0) => {
144                                // EOF
145                                tracing::warn!("VPN fd reached EOF");
146                                break;
147                            }
148                            Err(e) => {
149                                tracing::error!("Error reading from VPN fd: {}", e);
150                                break;
151                            }
152                            _ => break,
153                        }
154                    }
155                    anyhow::Ok(())
156                };
157
158                // Spawn a task for receiving from VPN and writing to fd
159                let write_task = async {
160                    loop {
161                        // Receive a packet from the VPN
162                        let packet = recv_vpn_packet(&ctx_clone).await;
163
164                        // Write the packet to the file descriptor
165                        if let Err(e) = writer.write_all(&packet).await {
166                            tracing::error!("Error writing to VPN fd: {}", e);
167                            break;
168                        }
169
170                        if let Err(e) = writer.flush().await {
171                            tracing::error!("Error flushing VPN fd: {}", e);
172                            break;
173                        }
174                    }
175                    anyhow::Ok(())
176                };
177
178                // Wait for either task to complete (or fail)
179                let _ = read_task.race(write_task).await;
180                tracing::warn!("VPN fd handler exited");
181            })
182            .detach();
183        }
184        let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
185        Client {
186            task: task.shared(),
187            ctx,
188        }
189    }
190
191    /// Opens a connection through the tunnel.
192    pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
193        open_conn(&self.ctx, "tcp", remote).await
194    }
195
196    /// Wait until there's an error.
197    pub async fn wait_until_dead(self) -> anyhow::Result<()> {
198        self.task.await.map_err(|e| anyhow::anyhow!(e))
199    }
200
201    /// Check for an error.
202    pub fn check_dead(&self) -> anyhow::Result<()> {
203        match self
204            .task
205            .clone()
206            .poll(&mut std::task::Context::from_waker(&noop_waker()))
207        {
208            std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
209            std::task::Poll::Pending => {}
210        }
211
212        Ok(())
213    }
214
215    /// Get the control protocol client.
216    pub fn control_client(&self) -> ControlClient {
217        ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
218            ControlService(ControlProtocolImpl {
219                ctx: self.ctx.clone(),
220            }),
221        )))
222    }
223
224    /// Gets the user info.
225    pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
226        let auth_token = get_auth_token(&self.ctx).await?;
227        let user_info = broker_client(&self.ctx)?
228            .get_user_info(auth_token)
229            .await??
230            .context("no such user")?;
231        Ok(user_info)
232    }
233
234    /// Force a particular packet to be sent through VPN mode, regardless of whether VPN mode is on.
235    pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
236        send_vpn_packet(&self.ctx, bts).await;
237        Ok(())
238    }
239
240    /// Receive a packet from VPN mode, regardless of whether VPN mode is on.
241    pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
242        let packet = recv_vpn_packet(&self.ctx).await;
243        Ok(packet)
244    }
245}
246
/// Function-pointer type that computes a `T` from the client context.
///
/// NOTE(review): presumably used as the initializer for context-scoped values
/// elsewhere in the crate — confirm against the `anyctx` usage sites.
pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
248
249async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
250    let rpc_serve = async {
251        if let Some(control_listen) = ctx.init().control_listen {
252            nanorpc_sillad::rpc_serve(
253                sillad::tcp::TcpListener::bind(control_listen).await?,
254                ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
255            )
256            .await?;
257            anyhow::Ok(())
258        } else {
259            smol::future::pending().await
260        }
261    };
262    if ctx.init().dry_run {
263        rpc_serve.await
264    } else {
265        let vpn_loop = vpn_loop(&ctx);
266
267        let _client_loop = Immortal::spawn(run_client_sessions(ctx.clone()));
268
269        socks5_loop(&ctx)
270            .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
271            .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
272            .race(
273                http_proxy_serve(&ctx)
274                    .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
275            )
276            .race(
277                auth_loop(&ctx)
278                    .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
279            )
280            .race(
281                bw_token_refresh_loop(&ctx)
282                    .inspect_err(|e| tracing::error!(err = debug(e), "bw token loop stopped")),
283            )
284            .race(rpc_serve)
285            .race(pac_serve(&ctx))
286            .race(port_forward(&ctx))
287            .await
288    }
289}