geph5_client/
control_prot.rs

1use std::{
2    convert::Infallible,
3    sync::{Arc, LazyLock},
4    time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use geph5_broker_protocol::{
10    puzzle::solve_puzzle, AccountLevel, ExitDescriptor, NewsItem, VoucherInfo,
11};
12
13use itertools::Itertools;
14use moka::future::Cache;
15use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
16use parking_lot::Mutex;
17use serde::{Deserialize, Serialize};
18use slab::Slab;
19
20use crate::{
21    broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num,
22    traffcount::TRAFF_COUNT, Config,
23};
24
/// RPC interface used to inspect and control a running client.
///
/// `#[nanorpc_derive]` generates the RPC plumbing for this trait (presumably
/// including the `ControlService` wrapper used at the bottom of this file).
/// Methods returning `Result<_, String>` report failures as a
/// human-readable message.
#[nanorpc_derive]
#[async_trait]
pub trait ControlProtocol {
    /// Returns the current connection state.
    async fn conn_info(&self) -> ConnInfo;
    /// Returns the value of the numeric statistic named `stat`.
    async fn stat_num(&self, stat: String) -> f64;
    /// Returns the time recorded as the start time of the client.
    async fn start_time(&self) -> SystemTime;
    /// Shuts the client down.
    async fn stop(&self);

    /// Returns the recent log output, split into individual lines.
    async fn recent_logs(&self) -> Vec<String>;

    // broker-proxying stuff

    /// Reports whether the broker knows an account for `secret`.
    async fn check_secret(&self, secret: String) -> Result<bool, String>;
    /// Returns the account level and expiry of the account identified by `secret`.
    async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
    /// Starts registering a new account in the background and returns a
    /// handle to pass to [`ControlProtocol::poll_registration`].
    async fn start_registration(&self) -> Result<usize, String>;
    /// Returns the progress of the registration identified by `idx`.
    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
    /// Upgrades a legacy username/password account and returns the resulting
    /// string (presumably the new account secret; confirm against the broker API).
    async fn convert_legacy_account(
        &self,
        username: String,
        password: String,
    ) -> Result<String, String>;
    /// Returns the recorded history of the statistic named `stat`.
    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
    /// Returns the exits currently advertised by the broker.
    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
    /// Returns the latest news items for the language `lang`.
    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
    /// Returns the available price points as pairs (presumably
    /// `(days, price)`; confirm against the broker API).
    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
    /// Creates a payment of `days` days for the account `secret` using the
    /// payment `method`, returning the broker's string response (presumably
    /// a payment URL; confirm against the broker API).
    async fn create_payment(
        &self,
        secret: String,
        days: u32,
        method: String,
    ) -> Result<String, String>;
    /// Requests a free voucher for the account `secret`, if one is available.
    async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String>;
    /// Redeems the voucher `code` for the account `secret`, returning the
    /// broker's integer result (presumably the number of days granted; confirm).
    async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String>;
    /// Uploads the debug pack `contents` to the broker, optionally tagged
    /// with a contact `email`.
    async fn export_debug_pack(
        &self,
        email: Option<String>,
        contents: String,
    ) -> Result<(), String>;
}
64
65#[derive(Serialize, Deserialize, Clone, Debug)]
66#[serde(tag = "state")]
67pub enum ConnInfo {
68    Disconnected,
69    Connecting,
70    Connected(ConnectedInfo),
71}
72
73#[derive(Serialize, Deserialize, Clone, Debug)]
74pub struct ConnectedInfo {
75    pub protocol: String,
76    pub bridge: String,
77
78    pub exit: ExitDescriptor,
79}
80
81#[derive(Serialize, Deserialize, Clone, Debug)]
82pub struct UserInfo {
83    pub level: AccountLevel,
84    pub expiry: Option<u64>,
85}
86
/// Implementation of [`ControlProtocol`] backed by a client context.
pub struct ControlProtocolImpl {
    /// Context that every method reads its state (and broker client) from.
    pub ctx: AnyCtx<Config>,
}
90
91pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Disconnected);
92
93static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
94    LazyLock::new(|| Mutex::new(Slab::new()));
95
96#[derive(Serialize, Deserialize, Clone)]
97pub struct RegistrationProgress {
98    pub progress: f64,
99    pub secret: Option<String>,
100}
101
102#[async_trait]
103impl ControlProtocol for ControlProtocolImpl {
104    async fn conn_info(&self) -> ConnInfo {
105        self.ctx.get(CURRENT_CONN_INFO).lock().clone()
106    }
107
108    async fn stat_num(&self, stat: String) -> f64 {
109        stat_get_num(&self.ctx, &stat)
110    }
111
112    async fn start_time(&self) -> SystemTime {
113        static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
114        *self.ctx.get(START_TIME)
115    }
116
117    async fn stop(&self) {
118        std::thread::spawn(move || {
119            std::thread::sleep(Duration::from_millis(100));
120            std::process::exit(0);
121        });
122    }
123
124    async fn recent_logs(&self) -> Vec<String> {
125        get_json_logs().split("\n").map(|s| s.to_string()).collect()
126    }
127
128    async fn check_secret(&self, secret: String) -> Result<bool, String> {
129        let res = broker_client(&self.ctx)
130            .map_err(|e| format!("{:?}", e))?
131            .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
132            .await
133            .map_err(|e| format!("{:?}", e))?
134            .map_err(|e| format!("{:?}", e))?;
135        Ok(res.is_some())
136    }
137
138    async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
139        let res = broker_client(&self.ctx)
140            .map_err(|e| format!("{:?}", e))?
141            .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
142            .await
143            .map_err(|e| format!("{:?}", e))?
144            .map_err(|e| format!("{:?}", e))?
145            .ok_or_else(|| "no such user".to_string())?;
146        Ok(UserInfo {
147            level: if res.plus_expires_unix.is_some() {
148                AccountLevel::Plus
149            } else {
150                AccountLevel::Free
151            },
152            expiry: res.plus_expires_unix,
153        })
154    }
155
156    async fn start_registration(&self) -> Result<usize, String> {
157        let (puzzle, difficulty) = broker_client(&self.ctx)
158            .map_err(|e| format!("{:?}", e))?
159            .get_puzzle()
160            .await
161            .map_err(|e| format!("{:?}", e))?;
162        tracing::debug!(puzzle, difficulty, "got puzzle");
163        let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
164            progress: 0.0,
165            secret: None,
166        });
167        let ctx = self.ctx.clone();
168        smolscale::spawn(async move {
169            loop {
170                let fallible = async {
171                    let solution = {
172                        let puzzle = puzzle.clone();
173                        smol::unblock(move || {
174                            solve_puzzle(&puzzle, difficulty, |progress| {
175                                REGISTRATIONS.lock()[idx] = RegistrationProgress {
176                                    progress,
177                                    secret: None,
178                                }
179                            })
180                        })
181                        .await
182                    };
183                    let secret = broker_client(&ctx)?
184                        .register_user_secret(puzzle.clone(), solution)
185                        .await?
186                        .map_err(|e| anyhow::anyhow!(e))?;
187                    REGISTRATIONS.lock()[idx] = RegistrationProgress {
188                        progress: 1.0,
189                        secret: Some(secret.clone()),
190                    };
191                    anyhow::Ok(secret)
192                };
193                if let Err(err) = fallible.await {
194                    tracing::warn!(err = debug(err), "restarting registration")
195                } else {
196                    break;
197                }
198            }
199        })
200        .detach();
201        Ok(idx)
202    }
203
204    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
205        tracing::debug!(idx, "polling registration");
206        let registers = REGISTRATIONS.lock();
207        registers
208            .get(idx)
209            .cloned()
210            .ok_or_else(|| "no such registration".to_string())
211    }
212
213    async fn convert_legacy_account(
214        &self,
215        username: String,
216        password: String,
217    ) -> Result<String, String> {
218        Ok(broker_client(&self.ctx)
219            .map_err(|e| format!("{:?}", e))?
220            .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
221                username,
222                password,
223            })
224            .await
225            .map_err(|e| format!("{:?}", e))?
226            .map_err(|e| format!("{:?}", e))?)
227    }
228
229    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
230        if stat != "traffic" {
231            return Err(format!("bad: {stat}"));
232        }
233        Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
234    }
235
236    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
237        let resp = broker_client(&self.ctx)
238            .map_err(|e| format!("{:?}", e))?
239            .get_exits()
240            .await
241            .map_err(|e| format!("{:?}", e))?
242            .map_err(|e| format!("{:?}", e))?;
243        Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
244    }
245
246    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
247        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
248        Ok(client
249            .get_news(lang)
250            .await
251            .map_err(|s| s.to_string())?
252            .map_err(|s| s.to_string())?)
253    }
254
255    async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String> {
256        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
257        Ok(client
258            .get_free_voucher(secret)
259            .await
260            .map_err(|s| s.to_string())?
261            .map_err(|s| s.to_string())?)
262    }
263
264    async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String> {
265        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
266
267        // Call the broker's redeem_voucher method directly with the secret
268        client
269            .redeem_voucher(secret, code)
270            .await
271            .map_err(|s| s.to_string())?
272            .map_err(|s| s.to_string())
273    }
274
275    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
276        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
277        Ok(client
278            .raw_price_points()
279            .await
280            .map_err(|s| s.to_string())?
281            .map_err(|s| s.to_string())?
282            .into_iter()
283            .map(|(a, b)| (a, b as f64 / 100.0))
284            .collect())
285    }
286
287    async fn create_payment(
288        &self,
289        secret: String,
290        days: u32,
291        method: String,
292    ) -> Result<String, String> {
293        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
294        Ok(client
295            .create_payment(secret, days, method)
296            .await
297            .map_err(|s| s.to_string())?
298            .map_err(|s| s.to_string())?)
299    }
300
301    async fn export_debug_pack(
302        &self,
303        email: Option<String>,
304        contents: String,
305    ) -> Result<(), String> {
306        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
307        client
308            .upload_debug_pack(email, contents)
309            .await
310            .map_err(|s| s.to_string())?
311            .map_err(|s| s.to_string())?;
312        Ok(())
313    }
314}
315
/// In-process RPC transport that hands every request straight to the wrapped
/// `ControlService` instead of sending it anywhere.
pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
317
318#[async_trait]
319impl RpcTransport for DummyControlProtocolTransport {
320    type Error = Infallible;
321
322    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
323        Ok(self.0.respond_raw(req).await)
324    }
325}