geph5_client/
control_prot.rs

1use std::{
2    convert::Infallible,
3    sync::{Arc, LazyLock},
4    time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use geph5_broker_protocol::{
10    puzzle::solve_puzzle, AccountLevel, ExitDescriptor, NewsItem, VoucherInfo,
11};
12
13use itertools::Itertools;
14use moka::future::Cache;
15use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
16use parking_lot::Mutex;
17use serde::{Deserialize, Serialize};
18use slab::Slab;
19
20use crate::{
21    broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num,
22    traffcount::TRAFF_COUNT, updates::get_update_manifest, Config,
23};
24
25#[nanorpc_derive]
26#[async_trait]
27pub trait ControlProtocol {
28    async fn conn_info(&self) -> ConnInfo;
29    async fn stat_num(&self, stat: String) -> f64;
30    async fn start_time(&self) -> SystemTime;
31    async fn stop(&self);
32
33    async fn recent_logs(&self) -> Vec<String>;
34
35    // broker-proxying stuff
36
37    async fn check_secret(&self, secret: String) -> Result<bool, String>;
38    async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
39    async fn start_registration(&self) -> Result<usize, String>;
40    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
41    async fn convert_legacy_account(
42        &self,
43        username: String,
44        password: String,
45    ) -> Result<String, String>;
46    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
47    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
48    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
49    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
50    async fn create_payment(
51        &self,
52        secret: String,
53        days: u32,
54        method: String,
55    ) -> Result<String, String>;
56    async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String>;
57    async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String>;
58    async fn export_debug_pack(
59        &self,
60        email: Option<String>,
61        contents: String,
62    ) -> Result<(), String>;
63
64    async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String>;
65}
66
67#[derive(Serialize, Deserialize, Clone, Debug)]
68#[serde(tag = "state")]
69pub enum ConnInfo {
70    Disconnected,
71    Connecting,
72    Connected(ConnectedInfo),
73}
74
75#[derive(Serialize, Deserialize, Clone, Debug)]
76pub struct ConnectedInfo {
77    pub protocol: String,
78    pub bridge: String,
79
80    pub exit: ExitDescriptor,
81}
82
83#[derive(Serialize, Deserialize, Clone, Debug)]
84pub struct UserInfo {
85    pub user_id: u64,
86    pub level: AccountLevel,
87
88    pub recurring: bool,
89    pub expiry: Option<u64>,
90}
91
92pub struct ControlProtocolImpl {
93    pub ctx: AnyCtx<Config>,
94}
95
96pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Disconnected);
97
98static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
99    LazyLock::new(|| Mutex::new(Slab::new()));
100
101#[derive(Serialize, Deserialize, Clone)]
102pub struct RegistrationProgress {
103    pub progress: f64,
104    pub secret: Option<String>,
105}
106
107#[async_trait]
108impl ControlProtocol for ControlProtocolImpl {
109    async fn conn_info(&self) -> ConnInfo {
110        self.ctx.get(CURRENT_CONN_INFO).lock().clone()
111    }
112
113    async fn stat_num(&self, stat: String) -> f64 {
114        stat_get_num(&self.ctx, &stat)
115    }
116
117    async fn start_time(&self) -> SystemTime {
118        static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
119        *self.ctx.get(START_TIME)
120    }
121
122    async fn stop(&self) {
123        std::thread::spawn(move || {
124            std::thread::sleep(Duration::from_millis(100));
125            std::process::exit(0);
126        });
127    }
128
129    async fn recent_logs(&self) -> Vec<String> {
130        get_json_logs().split("\n").map(|s| s.to_string()).collect()
131    }
132
133    async fn check_secret(&self, secret: String) -> Result<bool, String> {
134        let res = broker_client(&self.ctx)
135            .map_err(|e| format!("{:?}", e))?
136            .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
137            .await
138            .map_err(|e| format!("{:?}", e))?
139            .map_err(|e| format!("{:?}", e))?;
140        Ok(res.is_some())
141    }
142
143    async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
144        let res = broker_client(&self.ctx)
145            .map_err(|e| format!("{:?}", e))?
146            .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
147            .await
148            .map_err(|e| format!("{:?}", e))?
149            .map_err(|e| format!("{:?}", e))?
150            .ok_or_else(|| "no such user".to_string())?;
151        Ok(UserInfo {
152            user_id: res.user_id,
153            level: if res.plus_expires_unix.is_some() {
154                AccountLevel::Plus
155            } else {
156                AccountLevel::Free
157            },
158            expiry: res.plus_expires_unix,
159            recurring: res.recurring,
160        })
161    }
162
163    async fn start_registration(&self) -> Result<usize, String> {
164        let (puzzle, difficulty) = broker_client(&self.ctx)
165            .map_err(|e| format!("{:?}", e))?
166            .get_puzzle()
167            .await
168            .map_err(|e| format!("{:?}", e))?;
169        tracing::debug!(puzzle, difficulty, "got puzzle");
170        let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
171            progress: 0.0,
172            secret: None,
173        });
174        let ctx = self.ctx.clone();
175        smolscale::spawn(async move {
176            loop {
177                let fallible = async {
178                    let solution = {
179                        let puzzle = puzzle.clone();
180                        smol::unblock(move || {
181                            solve_puzzle(&puzzle, difficulty, |progress| {
182                                REGISTRATIONS.lock()[idx] = RegistrationProgress {
183                                    progress,
184                                    secret: None,
185                                }
186                            })
187                        })
188                        .await
189                    };
190                    let secret = broker_client(&ctx)?
191                        .register_user_secret(puzzle.clone(), solution)
192                        .await?
193                        .map_err(|e| anyhow::anyhow!(e))?;
194                    REGISTRATIONS.lock()[idx] = RegistrationProgress {
195                        progress: 1.0,
196                        secret: Some(secret.clone()),
197                    };
198                    anyhow::Ok(secret)
199                };
200                if let Err(err) = fallible.await {
201                    tracing::warn!(err = debug(err), "restarting registration")
202                } else {
203                    break;
204                }
205            }
206        })
207        .detach();
208        Ok(idx)
209    }
210
211    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
212        tracing::debug!(idx, "polling registration");
213        let registers = REGISTRATIONS.lock();
214        registers
215            .get(idx)
216            .cloned()
217            .ok_or_else(|| "no such registration".to_string())
218    }
219
220    async fn convert_legacy_account(
221        &self,
222        username: String,
223        password: String,
224    ) -> Result<String, String> {
225        Ok(broker_client(&self.ctx)
226            .map_err(|e| format!("{:?}", e))?
227            .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
228                username,
229                password,
230            })
231            .await
232            .map_err(|e| format!("{:?}", e))?
233            .map_err(|e| format!("{:?}", e))?)
234    }
235
236    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
237        if stat != "traffic" {
238            return Err(format!("bad: {stat}"));
239        }
240        Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
241    }
242
243    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
244        let resp = broker_client(&self.ctx)
245            .map_err(|e| format!("{:?}", e))?
246            .get_exits()
247            .await
248            .map_err(|e| format!("{:?}", e))?
249            .map_err(|e| format!("{:?}", e))?;
250        Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
251    }
252
253    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
254        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
255        Ok(client
256            .get_news(lang)
257            .await
258            .map_err(|s| s.to_string())?
259            .map_err(|s| s.to_string())?)
260    }
261
262    async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String> {
263        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
264        Ok(client
265            .get_free_voucher(secret)
266            .await
267            .map_err(|s| s.to_string())?
268            .map_err(|s| s.to_string())?)
269    }
270
271    async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String> {
272        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
273
274        // Call the broker's redeem_voucher method directly with the secret
275        client
276            .redeem_voucher(secret, code)
277            .await
278            .map_err(|s| s.to_string())?
279            .map_err(|s| s.to_string())
280    }
281
282    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
283        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
284        Ok(client
285            .raw_price_points()
286            .await
287            .map_err(|s| s.to_string())?
288            .map_err(|s| s.to_string())?
289            .into_iter()
290            .map(|(a, b)| (a, b as f64 / 100.0))
291            .collect())
292    }
293
294    async fn create_payment(
295        &self,
296        secret: String,
297        days: u32,
298        method: String,
299    ) -> Result<String, String> {
300        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
301        Ok(client
302            .create_payment(secret, days, method)
303            .await
304            .map_err(|s| s.to_string())?
305            .map_err(|s| s.to_string())?)
306    }
307
308    async fn export_debug_pack(
309        &self,
310        email: Option<String>,
311        contents: String,
312    ) -> Result<(), String> {
313        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
314        client
315            .upload_debug_pack(email, contents)
316            .await
317            .map_err(|s| s.to_string())?
318            .map_err(|s| s.to_string())?;
319        Ok(())
320    }
321
322    async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String> {
323        get_update_manifest().await.map_err(|e| format!("{:?}", e))
324    }
325}
326
327pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
328
329#[async_trait]
330impl RpcTransport for DummyControlProtocolTransport {
331    type Error = Infallible;
332
333    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
334        Ok(self.0.respond_raw(req).await)
335    }
336}