geph5_client/
client.rs

1use anyctx::AnyCtx;
2
3use anyhow::Context;
4use bytes::Bytes;
5use futures_util::{
6    future::Shared, task::noop_waker, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt,
7};
8use geph5_broker_protocol::{Credential, UserInfo};
9use geph5_misc_rpc::client_control::{ControlClient, ControlService};
10use nanorpc::DynRpcTransport;
11use sillad::Pipe;
12use smol::future::FutureExt as _;
13use std::{fs::File, net::SocketAddr, path::PathBuf, sync::Arc};
14
15use serde::{Deserialize, Serialize};
16use smolscale::immortal::Immortal;
17
18use crate::{
19    auth::{auth_loop, get_auth_token},
20    broker::{broker_client, BrokerSource},
21    bw_token::bw_token_refresh_loop,
22    control_prot::{ControlProtocolImpl, DummyControlProtocolTransport},
23    get_dialer::ExitConstraint,
24    http_proxy::http_proxy_serve,
25    pac::pac_serve,
26    logging,
27    session::{open_conn, run_client_sessions},
28    socks5::socks5_loop,
29    vpn::{recv_vpn_packet, send_vpn_packet, vpn_loop},
30};
31
32#[derive(Serialize, Deserialize, Clone)]
33pub struct Config {
34    pub socks5_listen: Option<SocketAddr>,
35    pub http_proxy_listen: Option<SocketAddr>,
36    pub pac_listen: Option<SocketAddr>,
37
38    pub control_listen: Option<SocketAddr>,
39    pub exit_constraint: ExitConstraint,
40    #[serde(default)]
41    pub bridge_mode: BridgeMode,
42    pub cache: Option<PathBuf>,
43
44    pub broker: Option<BrokerSource>,
45    pub broker_keys: Option<BrokerKeys>,
46
47    #[serde(default)]
48    pub vpn: bool,
49    #[serde(default)]
50    pub vpn_fd: Option<i32>,
51    #[serde(default)]
52    pub spoof_dns: bool,
53    #[serde(default)]
54    pub passthrough_china: bool,
55    #[serde(default)]
56    pub dry_run: bool,
57    #[serde(default)]
58    pub credentials: Credential,
59
60    #[serde(default)]
61    pub sess_metadata: serde_json::Value,
62    pub task_limit: Option<u32>,
63}
64
65#[derive(Serialize, Deserialize, Clone)]
66/// Broker keys, in hexadecimal format.
67pub struct BrokerKeys {
68    pub master: String,
69    pub mizaru_free: String,
70    pub mizaru_plus: String,
71    pub mizaru_bw: String,
72}
73
74impl Config {
75    /// Create an "inert" version of this config that does not start any processes.
76    pub fn inert(&self) -> Self {
77        let mut this = self.clone();
78        this.dry_run = true;
79        this.socks5_listen = None;
80        this.http_proxy_listen = None;
81        this.pac_listen = None;
82        this.control_listen = None;
83        this
84    }
85}
86
87#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
88pub enum BridgeMode {
89    Auto,
90    ForceBridges,
91}
92
93impl Default for BridgeMode {
94    fn default() -> Self {
95        Self::Auto
96    }
97}
98
99#[derive(Clone)]
100pub struct Client {
101    task: Shared<smol::Task<Result<(), Arc<anyhow::Error>>>>,
102    ctx: AnyCtx<Config>,
103}
104
105impl Client {
106    /// Starts the client logic in the loop, returning the handle.
107    pub fn start(cfg: Config) -> Self {
108        std::env::remove_var("http_proxy");
109        std::env::remove_var("https_proxy");
110        std::env::remove_var("HTTP_PROXY");
111        std::env::remove_var("HTTPS_PROXY");
112        let ctx = AnyCtx::new(cfg.clone());
113        // Initialize logging once we have context so JSON logs go to SQLite
114        let _ = logging::init_logging(&ctx);
115        let ((fd_limit, _), _) = binary_search::binary_search((1, ()), (65536, ()), |lim| {
116            if rlimit::increase_nofile_limit(lim).unwrap_or_default() >= lim {
117                binary_search::Direction::Low(())
118            } else {
119                binary_search::Direction::High(())
120            }
121        });
122        tracing::info!("raised file descriptor limit to {}", fd_limit);
123
124        #[cfg(unix)]
125        if let Some(fd) = cfg.vpn_fd {
126            let ctx_clone = ctx.clone();
127            smolscale::spawn(async move {
128                // Create an async file descriptor from the raw fd
129                let async_fd: smol::Async<File> =
130                    smol::Async::new(unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) })
131                        .expect("could not wrap VPN fd in Async");
132
133                // Split the file descriptor for reading and writingz
134                let (mut reader, mut writer) = async_fd.split();
135
136                // Spawn a task for reading from fd and sending to VPN
137                let read_task = async {
138                    let mut buf = vec![0u8; 65535]; // Buffer for reading packets
139                    loop {
140                        match reader.read(&mut buf).await {
141                            Ok(n) if n > 0 => {
142                                // Send the packet to the VPN
143                                send_vpn_packet(
144                                    &ctx_clone,
145                                    bytes::Bytes::copy_from_slice(&buf[..n]),
146                                )
147                                .await;
148                            }
149                            Ok(0) => {
150                                // EOF
151                                tracing::warn!("VPN fd reached EOF");
152                                break;
153                            }
154                            Err(e) => {
155                                tracing::error!("Error reading from VPN fd: {}", e);
156                                break;
157                            }
158                            _ => break,
159                        }
160                    }
161                    anyhow::Ok(())
162                };
163
164                // Spawn a task for receiving from VPN and writing to fd
165                let write_task = async {
166                    loop {
167                        // Receive a packet from the VPN
168                        let packet = recv_vpn_packet(&ctx_clone).await;
169
170                        // Write the packet to the file descriptor
171                        if let Err(e) = writer.write_all(&packet).await {
172                            tracing::error!("Error writing to VPN fd: {}", e);
173                            break;
174                        }
175
176                        if let Err(e) = writer.flush().await {
177                            tracing::error!("Error flushing VPN fd: {}", e);
178                            break;
179                        }
180                    }
181                    anyhow::Ok(())
182                };
183
184                // Wait for either task to complete (or fail)
185                let _ = read_task.race(write_task).await;
186                tracing::warn!("VPN fd handler exited");
187            })
188            .detach();
189        }
190        let task = smolscale::spawn(client_main(ctx.clone()).map_err(Arc::new));
191        Client {
192            task: task.shared(),
193            ctx,
194        }
195    }
196
197    /// Opens a connection through the tunnel.
198    pub async fn open_conn(&self, remote: &str) -> anyhow::Result<Box<dyn Pipe>> {
199        open_conn(&self.ctx, "tcp", remote).await
200    }
201
202    /// Wait until there's an error.
203    pub async fn wait_until_dead(self) -> anyhow::Result<()> {
204        self.task.await.map_err(|e| anyhow::anyhow!(e))
205    }
206
207    /// Check for an error.
208    pub fn check_dead(&self) -> anyhow::Result<()> {
209        match self
210            .task
211            .clone()
212            .poll(&mut std::task::Context::from_waker(&noop_waker()))
213        {
214            std::task::Poll::Ready(val) => val.map_err(|e| anyhow::anyhow!(e))?,
215            std::task::Poll::Pending => {}
216        }
217
218        Ok(())
219    }
220
221    /// Get the control protocol client.
222    pub fn control_client(&self) -> ControlClient {
223        ControlClient(DynRpcTransport::new(DummyControlProtocolTransport(
224            ControlService(ControlProtocolImpl {
225                ctx: self.ctx.clone(),
226            }),
227        )))
228    }
229
230    /// Gets the user info.
231    pub async fn user_info(&self) -> anyhow::Result<UserInfo> {
232        let auth_token = get_auth_token(&self.ctx).await?;
233        let user_info = broker_client(&self.ctx)?
234            .get_user_info(auth_token)
235            .await??
236            .context("no such user")?;
237        Ok(user_info)
238    }
239
240    /// Force a particular packet to be sent through VPN mode, regardless of whether VPN mode is on.
241    pub async fn send_vpn_packet(&self, bts: Bytes) -> anyhow::Result<()> {
242        send_vpn_packet(&self.ctx, bts).await;
243        Ok(())
244    }
245
246    /// Receive a packet from VPN mode, regardless of whether VPN mode is on.
247    pub async fn recv_vpn_packet(&self) -> anyhow::Result<Bytes> {
248        let packet = recv_vpn_packet(&self.ctx).await;
249        Ok(packet)
250    }
251}
252
253pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;
254
255async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
256    let rpc_serve = async {
257        if let Some(control_listen) = ctx.init().control_listen {
258            nanorpc_sillad::rpc_serve(
259                sillad::tcp::TcpListener::bind(control_listen).await?,
260                ControlService(ControlProtocolImpl { ctx: ctx.clone() }),
261            )
262            .await?;
263            anyhow::Ok(())
264        } else {
265            smol::future::pending().await
266        }
267    };
268    if ctx.init().dry_run {
269        rpc_serve.await
270    } else {
271        let vpn_loop = vpn_loop(&ctx);
272
273        let _client_loop = Immortal::spawn(run_client_sessions(ctx.clone()));
274
275        socks5_loop(&ctx)
276            .inspect_err(|e| tracing::error!(err = debug(e), "socks5 loop stopped"))
277            .race(vpn_loop.inspect_err(|e| tracing::error!(err = debug(e), "vpn loop stopped")))
278            .race(
279                http_proxy_serve(&ctx)
280                    .inspect_err(|e| tracing::error!(err = debug(e), "http proxy stopped")),
281            )
282            .race(
283                auth_loop(&ctx)
284                    .inspect_err(|e| tracing::error!(err = debug(e), "auth loop stopped")),
285            )
286            .race(
287                bw_token_refresh_loop(&ctx)
288                    .inspect_err(|e| tracing::error!(err = debug(e), "bw token loop stopped")),
289            )
290            .race(rpc_serve)
291            .race(pac_serve(&ctx))
292            .await
293    }
294}