geph5_client/
control_prot.rs

1use std::{
2    convert::Infallible,
3    sync::LazyLock,
4    time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use chrono::{NaiveDate, NaiveDateTime};
10use geph5_broker_protocol::{puzzle::solve_puzzle, AccountLevel, ExitDescriptor, VoucherInfo};
11
12use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use slab::Slab;
16use tap::Tap;
17
18use crate::{
19    broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num,
20    traffcount::TRAFF_COUNT, updates::get_update_manifest, Config,
21};
22
23#[nanorpc_derive]
24#[async_trait]
25pub trait ControlProtocol {
26    async fn conn_info(&self) -> ConnInfo;
27    async fn stat_num(&self, stat: String) -> f64;
28    async fn start_time(&self) -> SystemTime;
29    async fn stop(&self);
30
31    async fn recent_logs(&self) -> Vec<String>;
32
33    // broker-proxying stuff
34
35    async fn check_secret(&self, secret: String) -> Result<bool, String>;
36    async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
37    async fn start_registration(&self) -> Result<usize, String>;
38    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
39    async fn convert_legacy_account(
40        &self,
41        username: String,
42        password: String,
43    ) -> Result<String, String>;
44    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
45    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
46    async fn free_exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
47    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
48    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
49    async fn payment_methods(&self) -> Result<Vec<String>, String>;
50    async fn create_payment(
51        &self,
52        secret: String,
53        days: u32,
54        method: String,
55    ) -> Result<String, String>;
56    async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String>;
57    async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String>;
58    async fn export_debug_pack(
59        &self,
60        email: Option<String>,
61        contents: String,
62    ) -> Result<(), String>;
63
64    async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String>;
65}
66
67#[derive(Serialize, Deserialize, Clone, Debug)]
68#[serde(tag = "state")]
69pub enum ConnInfo {
70    Disconnected,
71    Connecting,
72    Connected(ConnectedInfo),
73}
74
75#[derive(Serialize, Deserialize, Clone, Debug)]
76pub struct ConnectedInfo {
77    pub protocol: String,
78    pub bridge: String,
79
80    pub exit: ExitDescriptor,
81}
82
83#[derive(Serialize, Deserialize, Clone, Debug)]
84pub struct UserInfo {
85    pub user_id: u64,
86    pub level: AccountLevel,
87
88    pub recurring: bool,
89    pub expiry: Option<u64>,
90}
91
92pub struct ControlProtocolImpl {
93    pub ctx: AnyCtx<Config>,
94}
95
96pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Disconnected);
97
98static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
99    LazyLock::new(|| Mutex::new(Slab::new()));
100
101#[derive(Serialize, Deserialize, Clone)]
102pub struct RegistrationProgress {
103    pub progress: f64,
104    pub secret: Option<String>,
105}
106
107#[async_trait]
108impl ControlProtocol for ControlProtocolImpl {
109    async fn conn_info(&self) -> ConnInfo {
110        self.ctx.get(CURRENT_CONN_INFO).lock().clone()
111    }
112
113    async fn stat_num(&self, stat: String) -> f64 {
114        stat_get_num(&self.ctx, &stat)
115    }
116
117    async fn start_time(&self) -> SystemTime {
118        static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
119        *self.ctx.get(START_TIME)
120    }
121
122    async fn stop(&self) {
123        std::thread::spawn(move || {
124            std::thread::sleep(Duration::from_millis(100));
125            std::process::exit(0);
126        });
127    }
128
129    async fn recent_logs(&self) -> Vec<String> {
130        get_json_logs().split("\n").map(|s| s.to_string()).collect()
131    }
132
133    async fn check_secret(&self, secret: String) -> Result<bool, String> {
134        let res = broker_client(&self.ctx)
135            .map_err(|e| format!("{:?}", e))?
136            .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
137            .await
138            .map_err(|e| format!("{:?}", e))?
139            .map_err(|e| format!("{:?}", e))?;
140        Ok(res.is_some())
141    }
142
143    async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
144        let res = broker_client(&self.ctx)
145            .map_err(|e| format!("{:?}", e))?
146            .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
147            .await
148            .map_err(|e| format!("{:?}", e))?
149            .map_err(|e| format!("{:?}", e))?
150            .ok_or_else(|| "no such user".to_string())?;
151        Ok(UserInfo {
152            user_id: res.user_id,
153            level: if res.plus_expires_unix.is_some() {
154                AccountLevel::Plus
155            } else {
156                AccountLevel::Free
157            },
158            expiry: res.plus_expires_unix,
159            recurring: res.recurring,
160        })
161    }
162
163    async fn start_registration(&self) -> Result<usize, String> {
164        let (puzzle, difficulty) = broker_client(&self.ctx)
165            .map_err(|e| format!("{:?}", e))?
166            .get_puzzle()
167            .await
168            .map_err(|e| format!("{:?}", e))?;
169        tracing::debug!(puzzle, difficulty, "got puzzle");
170        let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
171            progress: 0.0,
172            secret: None,
173        });
174        let ctx = self.ctx.clone();
175        smolscale::spawn(async move {
176            loop {
177                let fallible = async {
178                    let solution = {
179                        let puzzle = puzzle.clone();
180                        smol::unblock(move || {
181                            solve_puzzle(&puzzle, difficulty, |progress| {
182                                REGISTRATIONS.lock()[idx] = RegistrationProgress {
183                                    progress,
184                                    secret: None,
185                                }
186                            })
187                        })
188                        .await
189                    };
190                    let secret = broker_client(&ctx)?
191                        .register_user_secret(puzzle.clone(), solution)
192                        .await?
193                        .map_err(|e| anyhow::anyhow!(e))?;
194                    REGISTRATIONS.lock()[idx] = RegistrationProgress {
195                        progress: 1.0,
196                        secret: Some(secret.clone()),
197                    };
198                    anyhow::Ok(secret)
199                };
200                if let Err(err) = fallible.await {
201                    tracing::warn!(err = debug(err), "restarting registration")
202                } else {
203                    break;
204                }
205            }
206        })
207        .detach();
208        Ok(idx)
209    }
210
211    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
212        tracing::debug!(idx, "polling registration");
213        let registers = REGISTRATIONS.lock();
214        registers
215            .get(idx)
216            .cloned()
217            .ok_or_else(|| "no such registration".to_string())
218    }
219
220    async fn convert_legacy_account(
221        &self,
222        username: String,
223        password: String,
224    ) -> Result<String, String> {
225        Ok(broker_client(&self.ctx)
226            .map_err(|e| format!("{:?}", e))?
227            .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
228                username,
229                password,
230            })
231            .await
232            .map_err(|e| format!("{:?}", e))?
233            .map_err(|e| format!("{:?}", e))?)
234    }
235
236    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
237        if stat != "traffic" {
238            return Err(format!("bad: {stat}"));
239        }
240        Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
241    }
242
243    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
244        let resp = broker_client(&self.ctx)
245            .map_err(|e| format!("{:?}", e))?
246            .get_exits()
247            .await
248            .map_err(|e| format!("{:?}", e))?
249            .map_err(|e| format!("{:?}", e))?;
250        Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
251    }
252
253    async fn free_exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
254        let resp = broker_client(&self.ctx)
255            .map_err(|e| format!("{:?}", e))?
256            .get_free_exits()
257            .await
258            .map_err(|e| format!("{:?}", e))?
259            .map_err(|e| format!("{:?}", e))?;
260        Ok(resp
261            .inner
262            .all_exits
263            .iter()
264            .map(|s| s.1.clone().tap_mut(|e| e.load = e.load.powf(0.5)))
265            .collect())
266    }
267
268    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
269        let (manifest, _) = get_update_manifest().await.map_err(|e| e.to_string())?;
270        let news = manifest["news"]
271            .as_array()
272            .ok_or_else(|| "No news array".to_string())?;
273
274        let mut out = Vec::new();
275
276        for item in news {
277            let important = item["important"].as_bool().unwrap_or_default();
278            let date_str = item["date"]
279                .as_str()
280                .ok_or_else(|| "No or invalid 'date' field in news item".to_string())?;
281
282            let naive_date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d")
283                .map_err(|_| format!("Invalid date format: {}", date_str))?;
284
285            let naive_dt: NaiveDateTime = naive_date
286                .and_hms_opt(0, 0, 0)
287                .ok_or_else(|| "Unable to create NaiveDateTime from date".to_string())?;
288            let date_unix = naive_dt.and_utc().timestamp() as u64;
289
290            let localized = item[&lang]
291                .as_object()
292                .ok_or_else(|| format!("No localized data for language '{}'", lang))?;
293
294            let title = localized["title"]
295                .as_str()
296                .ok_or_else(|| "Missing 'title' in localized news data".to_string())?;
297            let contents = localized["contents"]
298                .as_str()
299                .ok_or_else(|| "Missing 'contents' in localized news data".to_string())?;
300
301            out.push(NewsItem {
302                title: title.to_string(),
303                date_unix,
304                contents: contents.to_string(),
305                important,
306            });
307        }
308
309        Ok(out)
310    }
311
312    async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String> {
313        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
314        Ok(client
315            .get_free_voucher(secret)
316            .await
317            .map_err(|s| s.to_string())?
318            .map_err(|s| s.to_string())?)
319    }
320
321    async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String> {
322        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
323
324        // Call the broker's redeem_voucher method directly with the secret
325        client
326            .redeem_voucher(secret, code)
327            .await
328            .map_err(|s| s.to_string())?
329            .map_err(|s| s.to_string())
330    }
331
332    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
333        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
334        Ok(client
335            .raw_price_points()
336            .await
337            .map_err(|s| s.to_string())?
338            .map_err(|s| s.to_string())?
339            .into_iter()
340            .map(|(a, b)| (a, b as f64 / 100.0))
341            .collect())
342    }
343
344    async fn payment_methods(&self) -> Result<Vec<String>, String> {
345        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
346        Ok(client
347            .payment_methods()
348            .await
349            .map_err(|s| s.to_string())?
350            .map_err(|s| s.to_string())?)
351    }
352
353    async fn create_payment(
354        &self,
355        secret: String,
356        days: u32,
357        method: String,
358    ) -> Result<String, String> {
359        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
360        Ok(client
361            .create_payment(secret, days, method)
362            .await
363            .map_err(|s| s.to_string())?
364            .map_err(|s| s.to_string())?)
365    }
366
367    async fn export_debug_pack(
368        &self,
369        email: Option<String>,
370        contents: String,
371    ) -> Result<(), String> {
372        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
373        client
374            .upload_debug_pack(email, contents)
375            .await
376            .map_err(|s| s.to_string())?
377            .map_err(|s| s.to_string())?;
378        Ok(())
379    }
380
381    async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String> {
382        get_update_manifest().await.map_err(|e| format!("{:?}", e))
383    }
384}
385
386pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
387
388#[async_trait]
389impl RpcTransport for DummyControlProtocolTransport {
390    type Error = Infallible;
391
392    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
393        Ok(self.0.respond_raw(req).await)
394    }
395}
396
397#[derive(Serialize, Deserialize, Clone, Debug)]
398pub struct NewsItem {
399    pub title: String,
400    pub date_unix: u64,
401    pub contents: String,
402    pub important: bool,
403}