geph5_client/
client.rs

1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{
6    future::Shared, task::noop_waker, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt,
7};
8use geph5_broker_protocol::{Credential, UserInfo};
9use geph5_misc_rpc::client_control::{ControlClient, ControlService};
10use nanorpc::DynRpcTransport;
11use sillad::Pipe;
12use smol::future::FutureExt as _;
13use std::{fs::File, net::SocketAddr, path::PathBuf, sync::Arc};
14
15use serde::{Deserialize, Serialize};
16use smolscale::immortal::Immortal;
17
18use crate::{
19    auth::{auth_loop, get_auth_token},
20    broker::{broker_client, BrokerSource},
21    bw_token::bw_token_refresh_loop,
22    control_prot::{ControlProtocolImpl, DummyControlProtocolTransport},
23    get_dialer::ExitConstraint,
24    http_proxy::http_proxy_serve,
25    logging,
26    pac::pac_serve,
27    port_forward::port_forward,
28    session::{open_conn, run_client_sessions},
29    socks5::socks5_loop,
30    vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
31};
32
33#[derive(Serialize, Deserialize, Clone)]
34pub struct Config {
35    pub socks5_listen: Option<SocketAddr>,
36    pub http_proxy_listen: Option<SocketAddr>,
37    pub pac_listen: Option<SocketAddr>,
38
39    pub control_listen: Option<SocketAddr>,
40    pub exit_constraint: ExitConstraint,
41    #[serde(default)]
42    pub allow_direct: bool,
43
44    pub cache: Option<PathBuf>,
45
46    pub broker: Option<BrokerSource>,
47    pub broker_keys: Option<BrokerKeys>,
48
49    #[serde(default)]
50    pub port_forward: Vec<PortForwardCfg>,
51
52    #[serde(default)]
53    pub vpn: bool,
54    #[serde(default)]
55    pub vpn_fd: Option<i32>,
56    #[serde(default)]
57    pub spoof_dns: bool,
58    #[serde(default)]
59    pub passthrough_china: bool,
60    #[serde(default)]
61    pub dry_run: bool,
62    #[serde(default)]
63    pub credentials: Credential,
64
65    #[serde(default)]
66    pub sess_metadata: serde_json::Value,
67    pub task_limit: Option<u32>,
68}
69
70#[derive(Serialize, Deserialize, Clone)]
71pub struct PortForwardCfg {
72    pub listen: SocketAddr,
73    pub connect: String,
74}
75
76#[derive(Serialize, Deserialize, Clone)]
77/// Broker keys, in hexadecimal format.
78pub struct BrokerKeys {
79    pub master: String,
80    pub mizaru_free: String,
81    pub mizaru_plus: String,
82    pub mizaru_bw: String,
83}
84
85impl Config {
86    /// Create an "inert" version of this config that does not start any processes.
87    pub fn inert(&self) -> Self {
88        let mut this = self.clone();
89        this.dry_run = true;
90        this.socks5_listen = None;
91        this.http_proxy_listen = None;
92        this.pac_listen = None;
93        this.control_listen = None;
94        this
95    }
96}
97
98#[derive(Clone)]
99pub struct Client {
100    task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
101    ctx: AnyCtx<Config>,
102}
103
104impl Client {
105    /// Starts the client logic in the loop, returning the handle.
106    pub fn start(cfg: Config) -> Self {
107        let ctx = AnyCtx::new(cfg.clone());
108        // Initialize logging once we have context so JSON logs go to SQLite
109        let _ = logging::init_logging(&ctx);
110        let ((fd_limit, _), _) = binary_search::binary_search((1, ()), (65536, ()), |lim| {
111            if rlimit::increase_nofile_limit(lim).unwrap_or_default() >= lim {
112                binary_search::Direction::Low(())
113            } else {
114                binary_search::Direction::High(())
115            }
116        });
117        tracing::info!("raised file descriptor limit to {}", fd_limit);
118
119        #[cfg(unix)]
120        if let Some(fd) = cfg.vpn_fd {
121            let ctx_clone = ctx.clone();
122            smolscale::spawn(async move {
123                // Create an async file descriptor from the raw fd
124                let async_fd: smol::Async<File> =
125                    smol::Async::new(unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) })
126                        .expect("could not wrap VPN fd in Async");
127
128                // Split the file descriptor for reading and writingz
129                let (mut reader, mut writer) = async_fd.split();
130
131                // Spawn a task for reading from fd and sending to VPN
132                let read_task = async {
133                    let mut buf = vec![0u8; 65535]; // Buffer for reading packets
134                    loop {
135                        match reader.read(&mut buf).await {
136                            Ok(n) if n > 0 => {
137                                // Send the packet to the VPN
138                                send_vpn_packet(
139                                    &ctx_clone,
140                                    bytes::Bytes::copy_from_slice(&buf[..n]),
141                                )
142                                .await;
143                            }
144                            Ok(0) => {
145                                // EOF
146                                tracing::warn!("VPN fd reached EOF");
147                                break;
148                            }
149                            Err(e) => {
150                                tracing::error!("Error reading from VPN fd: {}", e);
151                                break;
152                            }
153                            _ => break,
154                        }
155                    }
156                    anyhow::Ok(())
157                };
158
159                // Spawn a task for receiving from VPN and writing to fd
160                let write_task = async {
161                    loop {
162                        // Receive a packet from the VPN
163                        let packet = recv_vpn_packet(&ctx_clone).await;
164
165                        // Write the packet to the file descriptor
166                        if let Err(e) = writer.write_all(&packet).await {
167                            tracing::error!("Error writing to VPN fd: {}", e);
168                            break;
169                        }
170
171                        if let Err(e) = writer.flush().await {
172                            tracing::error!("Error flushing VPN fd: {}", e);
173                            break;
174                        }
175                    }
176                    anyhow::Ok(())
177                };
178
179                // Wait for either task to complete (or fail)
180                let _ = read_task.race(write_task).await;
181                tracing::warn!("VPN fd handler exited");
182            })
183            .detach();
184        }
185        let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
186        Client {
187            task: task.shared(),
188            ctx,
189        }
190    }
191
192    /// Opens a connection through the tunnel.
193    pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
194        open_conn(&self.ctx, "tcp", remote).await
195    }
196
197    /// Wait until there's an error.
198    pub async fn wait_until_dead(self) -> anyhow::Result<()> {
199        self.task.await.map_err(|e| anyhow::anyhow!(e))
200    }
201
202    /// Check for an error.
203    pub fn check_dead(&self) -> anyhow::Result<()> {
204        match self
205            .task
206            .clone()
207            .poll(&mut std::task::Context::from_waker(&noop_waker()))
208        {
209            std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
210            std::task::Poll::Pending => {}
211        }
212
213        Ok(())
214    }
215
216    /// Get the control protocol client.
217    pub fn control_client(&self) -> ControlClient {
218        ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
219            ControlService(ControlProtocolImpl {
220                ctx: self.ctx.clone(),
221            }),
222        )))
223    }
224
225    /// Gets the user info.
226    pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
227        let auth_token = get_auth_token(&self.ctx).await?;
228        let user_info = broker_client(&self.ctx)?
229            .get_user_info(auth_token)
230            .await??
231            .context("no such user")?;
232        Ok(user_info)
233    }
234
235    /// Force a particular packet to be sent through VPN mode, regardless of whether VPN mode is on.
236    pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
237        send_vpn_packet(&self.ctx, bts).await;
238        Ok(())
239    }
240
241    /// Receive a packet from VPN mode, regardless of whether VPN mode is on.
242    pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
243        let packet = recv_vpn_packet(&self.ctx).await;
244        Ok(packet)
245    }
246}
247
248pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
249
250async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
251    let rpc_serve = async {
252        if let Some(control_listen) = ctx.init().control_listen {
253            nanorpc_sillad::rpc_serve(
254                sillad::tcp::TcpListener::bind(control_listen).await?,
255                ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
256            )
257            .await?;
258            anyhow::Ok(())
259        } else {
260            smol::future::pending().await
261        }
262    };
263    if ctx.init().dry_run {
264        rpc_serve.await
265    } else {
266        let vpn_loop = vpn_loop(&ctx);
267
268        let _client_loop = Immortal::spawn(run_client_sessions(ctx.clone()));
269
270        socks5_loop(&ctx)
271            .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
272            .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
273            .race(
274                http_proxy_serve(&ctx)
275                    .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
276            )
277            .race(
278                auth_loop(&ctx)
279                    .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
280            )
281            .race(
282                bw_token_refresh_loop(&ctx)
283                    .inspect_err(|e| tracing::error!(err = debug(e), "bw token loop stopped")),
284            )
285            .race(rpc_serve)
286            .race(pac_serve(&ctx))
287            .race(port_forward(&ctx))
288            .await
289    }
290}