geph5_client/
control_prot.rs

1use std::{
2    convert::Infallible,
3    sync::LazyLock,
4    time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use chrono::{NaiveDate, NaiveDateTime};
10use geph5_broker_protocol::{puzzle::solve_puzzle, AccountLevel, ExitDescriptor, VoucherInfo};
11
12use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use slab::Slab;
16
17use crate::{
18    broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num,
19    traffcount::TRAFF_COUNT, updates::get_update_manifest, Config,
20};
21
/// RPC interface for querying and controlling a running client.
///
/// `#[nanorpc_derive]` generates the nanorpc plumbing for this trait. Every method below the
/// "broker-proxying" marker reports failures as a human-readable `String`.
#[nanorpc_derive]
#[async_trait]
pub trait ControlProtocol {
    /// Returns the current connection state.
    async fn conn_info(&self) -> ConnInfo;
    /// Returns the current value of the numeric statistic named `stat`.
    async fn stat_num(&self, stat: String) -> f64;
    /// Returns the time recorded as the client's start time.
    async fn start_time(&self) -> SystemTime;
    /// Asks the client to shut down.
    async fn stop(&self);

    /// Returns recent log output as a list of strings.
    async fn recent_logs(&self) -> Vec<String>;

    // broker-proxying stuff

    /// Returns whether `secret` identifies an existing account.
    async fn check_secret(&self, secret: String) -> Result<bool, String>;
    /// Returns account details for the account identified by `secret`.
    async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
    /// Starts registering a new account and returns a handle to pass to `poll_registration`.
    async fn start_registration(&self) -> Result<usize, String>;
    /// Returns the progress of the registration identified by `idx`.
    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
    /// Upgrades a legacy username/password account.
    ///
    /// NOTE(review): the returned string is presumably the account's new secret — confirm
    /// against the broker protocol.
    async fn convert_legacy_account(
        &self,
        username: String,
        password: String,
    ) -> Result<String, String>;
    /// Returns a series of historical samples for the statistic named `stat`.
    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
    /// Returns the descriptors of the available exits.
    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
    /// Returns the news items localized to the language key `lang`.
    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
    /// Returns the available price points.
    ///
    /// NOTE(review): the pair is presumably (days, price) — confirm against the broker protocol.
    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
    /// Returns the names of the supported payment methods.
    async fn payment_methods(&self) -> Result<Vec<String>, String>;
    /// Creates a payment for `days` days using `method`, on behalf of the account `secret`.
    ///
    /// NOTE(review): the meaning of the returned string is defined by the broker and is not
    /// visible here (presumably a payment URL) — confirm.
    async fn create_payment(
        &self,
        secret: String,
        days: u32,
        method: String,
    ) -> Result<String, String>;
    /// Requests a free voucher for the account `secret`; `None` if the broker returns none.
    async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String>;
    /// Redeems the voucher `code` for the account `secret`.
    ///
    /// NOTE(review): the meaning of the returned integer is defined by the broker
    /// (presumably the number of days granted) — confirm.
    async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String>;
    /// Uploads a debug pack with the given `contents` and optional contact `email`.
    async fn export_debug_pack(
        &self,
        email: Option<String>,
        contents: String,
    ) -> Result<(), String>;

    /// Returns the update manifest as JSON, together with an accompanying string.
    ///
    /// NOTE(review): the meaning of the second tuple element is not visible in this file.
    async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String>;
}
64
/// Connection state reported by [`ControlProtocol::conn_info`].
///
/// Uses serde's internally tagged representation: the variant name is carried in a
/// `state` field alongside the variant's own fields.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "state")]
pub enum ConnInfo {
    /// No connection; this is the initial value of `CURRENT_CONN_INFO`.
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// A connection is established; the payload describes it.
    Connected(ConnectedInfo),
}
72
/// Details of an established connection, carried by [`ConnInfo::Connected`].
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ConnectedInfo {
    /// Name of the protocol in use.
    pub protocol: String,
    /// Identifies the bridge in use (exact format is set by whoever fills this in).
    pub bridge: String,

    /// Descriptor of the exit the client is connected to.
    pub exit: ExitDescriptor,
}
80
/// Account details returned by [`ControlProtocol::user_info`].
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct UserInfo {
    /// Broker-assigned user id.
    pub user_id: u64,
    /// Account level; `user_info` reports `Plus` exactly when an expiry is present.
    pub level: AccountLevel,

    /// Whether the broker marks the account as recurring.
    pub recurring: bool,
    /// Copied from the broker's `plus_expires_unix` (presumably a Unix timestamp in seconds).
    pub expiry: Option<u64>,
}
89
/// Implementation of [`ControlProtocol`] backed by a client context.
pub struct ControlProtocolImpl {
    /// Client context used to reach per-context state and the broker client.
    pub ctx: AnyCtx<Config>,
}
93
/// Per-context connection state, initialized to [`ConnInfo::Disconnected`].
///
/// Read by `ControlProtocolImpl::conn_info`; writers are outside this file.
pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Disconnected);
95
/// Process-wide table of registration attempts.
///
/// The slab index of an entry is the handle returned by `start_registration` and accepted by
/// `poll_registration`. Nothing in this file removes entries.
static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
    LazyLock::new(|| Mutex::new(Slab::new()));
98
/// Snapshot of one registration attempt, as returned by `poll_registration`.
#[derive(Serialize, Deserialize, Clone)]
pub struct RegistrationProgress {
    /// Starts at `0.0`, follows the value reported by the puzzle solver, and is set to `1.0`
    /// once the account has been registered.
    pub progress: f64,
    /// The new account secret; `None` until registration has succeeded.
    pub secret: Option<String>,
}
104
105#[async_trait]
106impl ControlProtocol for ControlProtocolImpl {
107    async fn conn_info(&self) -> ConnInfo {
108        self.ctx.get(CURRENT_CONN_INFO).lock().clone()
109    }
110
111    async fn stat_num(&self, stat: String) -> f64 {
112        stat_get_num(&self.ctx, &stat)
113    }
114
115    async fn start_time(&self) -> SystemTime {
116        static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
117        *self.ctx.get(START_TIME)
118    }
119
120    async fn stop(&self) {
121        std::thread::spawn(move || {
122            std::thread::sleep(Duration::from_millis(100));
123            std::process::exit(0);
124        });
125    }
126
127    async fn recent_logs(&self) -> Vec<String> {
128        get_json_logs().split("\n").map(|s| s.to_string()).collect()
129    }
130
131    async fn check_secret(&self, secret: String) -> Result<bool, String> {
132        let res = broker_client(&self.ctx)
133            .map_err(|e| format!("{:?}", e))?
134            .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
135            .await
136            .map_err(|e| format!("{:?}", e))?
137            .map_err(|e| format!("{:?}", e))?;
138        Ok(res.is_some())
139    }
140
141    async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
142        let res = broker_client(&self.ctx)
143            .map_err(|e| format!("{:?}", e))?
144            .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
145            .await
146            .map_err(|e| format!("{:?}", e))?
147            .map_err(|e| format!("{:?}", e))?
148            .ok_or_else(|| "no such user".to_string())?;
149        Ok(UserInfo {
150            user_id: res.user_id,
151            level: if res.plus_expires_unix.is_some() {
152                AccountLevel::Plus
153            } else {
154                AccountLevel::Free
155            },
156            expiry: res.plus_expires_unix,
157            recurring: res.recurring,
158        })
159    }
160
161    async fn start_registration(&self) -> Result<usize, String> {
162        let (puzzle, difficulty) = broker_client(&self.ctx)
163            .map_err(|e| format!("{:?}", e))?
164            .get_puzzle()
165            .await
166            .map_err(|e| format!("{:?}", e))?;
167        tracing::debug!(puzzle, difficulty, "got puzzle");
168        let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
169            progress: 0.0,
170            secret: None,
171        });
172        let ctx = self.ctx.clone();
173        smolscale::spawn(async move {
174            loop {
175                let fallible = async {
176                    let solution = {
177                        let puzzle = puzzle.clone();
178                        smol::unblock(move || {
179                            solve_puzzle(&puzzle, difficulty, |progress| {
180                                REGISTRATIONS.lock()[idx] = RegistrationProgress {
181                                    progress,
182                                    secret: None,
183                                }
184                            })
185                        })
186                        .await
187                    };
188                    let secret = broker_client(&ctx)?
189                        .register_user_secret(puzzle.clone(), solution)
190                        .await?
191                        .map_err(|e| anyhow::anyhow!(e))?;
192                    REGISTRATIONS.lock()[idx] = RegistrationProgress {
193                        progress: 1.0,
194                        secret: Some(secret.clone()),
195                    };
196                    anyhow::Ok(secret)
197                };
198                if let Err(err) = fallible.await {
199                    tracing::warn!(err = debug(err), "restarting registration")
200                } else {
201                    break;
202                }
203            }
204        })
205        .detach();
206        Ok(idx)
207    }
208
209    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
210        tracing::debug!(idx, "polling registration");
211        let registers = REGISTRATIONS.lock();
212        registers
213            .get(idx)
214            .cloned()
215            .ok_or_else(|| "no such registration".to_string())
216    }
217
218    async fn convert_legacy_account(
219        &self,
220        username: String,
221        password: String,
222    ) -> Result<String, String> {
223        Ok(broker_client(&self.ctx)
224            .map_err(|e| format!("{:?}", e))?
225            .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
226                username,
227                password,
228            })
229            .await
230            .map_err(|e| format!("{:?}", e))?
231            .map_err(|e| format!("{:?}", e))?)
232    }
233
234    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
235        if stat != "traffic" {
236            return Err(format!("bad: {stat}"));
237        }
238        Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
239    }
240
241    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
242        let resp = broker_client(&self.ctx)
243            .map_err(|e| format!("{:?}", e))?
244            .get_exits()
245            .await
246            .map_err(|e| format!("{:?}", e))?
247            .map_err(|e| format!("{:?}", e))?;
248        Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
249    }
250
251    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
252        let (manifest, _) = get_update_manifest().await.map_err(|e| e.to_string())?;
253        let news = manifest["news"]
254            .as_array()
255            .ok_or_else(|| "No news array".to_string())?;
256
257        let mut out = Vec::new();
258
259        for item in news {
260            let important = item["important"].as_bool().unwrap_or_default();
261            let date_str = item["date"]
262                .as_str()
263                .ok_or_else(|| "No or invalid 'date' field in news item".to_string())?;
264
265            let naive_date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d")
266                .map_err(|_| format!("Invalid date format: {}", date_str))?;
267
268            let naive_dt: NaiveDateTime = naive_date
269                .and_hms_opt(0, 0, 0)
270                .ok_or_else(|| "Unable to create NaiveDateTime from date".to_string())?;
271            let date_unix = naive_dt.and_utc().timestamp() as u64;
272
273            let localized = item[&lang]
274                .as_object()
275                .ok_or_else(|| format!("No localized data for language '{}'", lang))?;
276
277            let title = localized["title"]
278                .as_str()
279                .ok_or_else(|| "Missing 'title' in localized news data".to_string())?;
280            let contents = localized["contents"]
281                .as_str()
282                .ok_or_else(|| "Missing 'contents' in localized news data".to_string())?;
283
284            out.push(NewsItem {
285                title: title.to_string(),
286                date_unix,
287                contents: contents.to_string(),
288                important,
289            });
290        }
291
292        Ok(out)
293    }
294
295    async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String> {
296        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
297        Ok(client
298            .get_free_voucher(secret)
299            .await
300            .map_err(|s| s.to_string())?
301            .map_err(|s| s.to_string())?)
302    }
303
304    async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String> {
305        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
306
307        // Call the broker's redeem_voucher method directly with the secret
308        client
309            .redeem_voucher(secret, code)
310            .await
311            .map_err(|s| s.to_string())?
312            .map_err(|s| s.to_string())
313    }
314
315    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
316        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
317        Ok(client
318            .raw_price_points()
319            .await
320            .map_err(|s| s.to_string())?
321            .map_err(|s| s.to_string())?
322            .into_iter()
323            .map(|(a, b)| (a, b as f64 / 100.0))
324            .collect())
325    }
326
327    async fn payment_methods(&self) -> Result<Vec<String>, String> {
328        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
329        Ok(client
330            .payment_methods()
331            .await
332            .map_err(|s| s.to_string())?
333            .map_err(|s| s.to_string())?)
334    }
335
336    async fn create_payment(
337        &self,
338        secret: String,
339        days: u32,
340        method: String,
341    ) -> Result<String, String> {
342        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
343        Ok(client
344            .create_payment(secret, days, method)
345            .await
346            .map_err(|s| s.to_string())?
347            .map_err(|s| s.to_string())?)
348    }
349
350    async fn export_debug_pack(
351        &self,
352        email: Option<String>,
353        contents: String,
354    ) -> Result<(), String> {
355        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
356        client
357            .upload_debug_pack(email, contents)
358            .await
359            .map_err(|s| s.to_string())?
360            .map_err(|s| s.to_string())?;
361        Ok(())
362    }
363
364    async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String> {
365        get_update_manifest().await.map_err(|e| format!("{:?}", e))
366    }
367}
368
/// An [`RpcTransport`] that hands requests straight to the wrapped `ControlService` in-process,
/// so a control client can be used without any real transport in between.
pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
370
371#[async_trait]
372impl RpcTransport for DummyControlProtocolTransport {
373    type Error = Infallible;
374
375    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
376        Ok(self.0.respond_raw(req).await)
377    }
378}
379
/// A single localized news entry returned by `latest_news`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct NewsItem {
    /// Localized title.
    pub title: String,
    /// Midnight UTC of the item's date, as Unix seconds.
    pub date_unix: u64,
    /// Localized body text.
    pub contents: String,
    /// Value of the item's `important` flag; `false` when the manifest omits it.
    pub important: bool,
}