geph5_client/
control_prot.rs

1use std::{
2    convert::Infallible,
3    sync::LazyLock,
4    time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use geph5_broker_protocol::{
10    puzzle::solve_puzzle, AccountLevel, ExitDescriptor, NewsItem, VoucherInfo,
11};
12
13use itertools::Itertools;
14use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
15use parking_lot::Mutex;
16use serde::{Deserialize, Serialize};
17use slab::Slab;
18
19use crate::{
20    broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num,
21    traffcount::TRAFF_COUNT, updates::get_update_manifest, Config,
22};
23
/// RPC interface used to control and inspect a running client.
///
/// `nanorpc_derive` generates the RPC plumbing for this trait (this file uses the
/// generated `ControlService` type). Methods in the second half mostly forward to
/// the broker and report failures as plain `String`s.
#[nanorpc_derive]
#[async_trait]
pub trait ControlProtocol {
    /// Returns the current connection state.
    async fn conn_info(&self) -> ConnInfo;
    /// Returns the current value of the numeric statistic named `stat`.
    async fn stat_num(&self, stat: String) -> f64;
    /// Returns the timestamp the client reports as its start time.
    async fn start_time(&self) -> SystemTime;
    /// Asks the client process to shut down.
    async fn stop(&self);

    /// Returns the recent log output, split into individual lines.
    async fn recent_logs(&self) -> Vec<String>;

    // broker-proxying stuff

    /// Checks whether the broker knows an account for `secret`.
    async fn check_secret(&self, secret: String) -> Result<bool, String>;
    /// Returns the account level and expiry of the account identified by `secret`.
    async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
    /// Starts a background account registration and returns an index that can be
    /// passed to `poll_registration`.
    async fn start_registration(&self) -> Result<usize, String>;
    /// Returns the progress of the registration identified by `idx`.
    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
    /// Upgrades a legacy username/password account through the broker and returns
    /// the string the broker answers with (presumably the new account secret —
    /// confirm against the broker protocol).
    async fn convert_legacy_account(
        &self,
        username: String,
        password: String,
    ) -> Result<String, String>;
    /// Returns the recorded history of the statistic named `stat`.
    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
    /// Returns the exits advertised by the broker.
    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
    /// Returns the broker's news items for the language `lang`.
    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
    /// Returns the broker's price points as `(u32, f64)` pairs.
    // NOTE(review): presumably (days, price) — confirm against the broker protocol.
    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
    /// Asks the broker to create a payment for `days` days using `method` on behalf
    /// of the account identified by `secret`, returning the broker's string answer.
    // NOTE(review): the returned string is presumably a payment URL — confirm.
    async fn create_payment(
        &self,
        secret: String,
        days: u32,
        method: String,
    ) -> Result<String, String>;
    /// Asks the broker for a free voucher for the account identified by `secret`, if
    /// one is available.
    async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String>;
    /// Redeems the voucher `code` for the account identified by `secret`.
    // NOTE(review): the meaning of the returned `i32` is defined by the broker
    // (presumably days credited) — confirm.
    async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String>;
    /// Uploads a debug pack with the given `contents`, optionally tagged with a
    /// contact `email`, to the broker.
    async fn export_debug_pack(
        &self,
        email: Option<String>,
        contents: String,
    ) -> Result<(), String>;

    /// Fetches the update manifest as a JSON value plus an accompanying string.
    // NOTE(review): the meaning of the `String` half is defined in `crate::updates` — confirm.
    async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String>;
}
65
/// Connection state reported by `ControlProtocol::conn_info`.
///
/// Serialized with serde's internally tagged representation: a `state` field names
/// the variant.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "state")]
pub enum ConnInfo {
    /// No connection; this is the initial value of `CURRENT_CONN_INFO`.
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// A connection is established; carries its details.
    Connected(ConnectedInfo),
}
73
/// Details attached to `ConnInfo::Connected`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ConnectedInfo {
    /// Name of the protocol in use, as a free-form string.
    pub protocol: String,
    /// Identifies the bridge in use, as a free-form string.
    // NOTE(review): the exact format (address vs. name) is not visible here — confirm.
    pub bridge: String,

    /// Descriptor of the exit the client is connected through.
    pub exit: ExitDescriptor,
}
81
/// Account summary returned by `ControlProtocol::user_info`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct UserInfo {
    /// Account tier; `user_info` reports `Plus` exactly when `expiry` is `Some`.
    pub level: AccountLevel,
    /// Copied from the broker's `plus_expires_unix` field.
    // NOTE(review): presumably a Unix timestamp in seconds — confirm.
    pub expiry: Option<u64>,
}
87
/// Implementation of `ControlProtocol` that operates on a client context.
pub struct ControlProtocolImpl {
    /// Client context used to look up per-context state and to reach the broker.
    pub ctx: AnyCtx<Config>,
}
91
/// Per-context connection state, initialised to `ConnInfo::Disconnected`.
///
/// This file only reads it (in `conn_info`); being `pub`, it is presumably updated
/// by the connection logic elsewhere in the crate.
pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Disconnected);
93
/// Process-wide table of registration attempts.
///
/// `start_registration` inserts an entry and hands out its slab index;
/// `poll_registration` looks entries up by that index. Nothing in this file removes
/// entries, so they stay for the lifetime of the process.
static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
    LazyLock::new(|| Mutex::new(Slab::new()));
96
/// Snapshot of a background registration started by `start_registration`.
#[derive(Serialize, Deserialize, Clone)]
pub struct RegistrationProgress {
    /// Starts at `0.0`, follows the values reported by the puzzle solver, and is set
    /// to `1.0` once registration has succeeded.
    pub progress: f64,
    /// `None` until registration succeeds, then the secret returned by the broker.
    pub secret: Option<String>,
}
102
103#[async_trait]
104impl ControlProtocol for ControlProtocolImpl {
105    async fn conn_info(&self) -> ConnInfo {
106        self.ctx.get(CURRENT_CONN_INFO).lock().clone()
107    }
108
109    async fn stat_num(&self, stat: String) -> f64 {
110        stat_get_num(&self.ctx, &stat)
111    }
112
113    async fn start_time(&self) -> SystemTime {
114        static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
115        *self.ctx.get(START_TIME)
116    }
117
118    async fn stop(&self) {
119        std::thread::spawn(move || {
120            std::thread::sleep(Duration::from_millis(100));
121            std::process::exit(0);
122        });
123    }
124
125    async fn recent_logs(&self) -> Vec<String> {
126        get_json_logs().split("\n").map(|s| s.to_string()).collect()
127    }
128
129    async fn check_secret(&self, secret: String) -> Result<bool, String> {
130        let res = broker_client(&self.ctx)
131            .map_err(|e| format!("{:?}", e))?
132            .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
133            .await
134            .map_err(|e| format!("{:?}", e))?
135            .map_err(|e| format!("{:?}", e))?;
136        Ok(res.is_some())
137    }
138
139    async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
140        let res = broker_client(&self.ctx)
141            .map_err(|e| format!("{:?}", e))?
142            .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
143            .await
144            .map_err(|e| format!("{:?}", e))?
145            .map_err(|e| format!("{:?}", e))?
146            .ok_or_else(|| "no such user".to_string())?;
147        Ok(UserInfo {
148            level: if res.plus_expires_unix.is_some() {
149                AccountLevel::Plus
150            } else {
151                AccountLevel::Free
152            },
153            expiry: res.plus_expires_unix,
154        })
155    }
156
157    async fn start_registration(&self) -> Result<usize, String> {
158        let (puzzle, difficulty) = broker_client(&self.ctx)
159            .map_err(|e| format!("{:?}", e))?
160            .get_puzzle()
161            .await
162            .map_err(|e| format!("{:?}", e))?;
163        tracing::debug!(puzzle, difficulty, "got puzzle");
164        let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
165            progress: 0.0,
166            secret: None,
167        });
168        let ctx = self.ctx.clone();
169        smolscale::spawn(async move {
170            loop {
171                let fallible = async {
172                    let solution = {
173                        let puzzle = puzzle.clone();
174                        smol::unblock(move || {
175                            solve_puzzle(&puzzle, difficulty, |progress| {
176                                REGISTRATIONS.lock()[idx] = RegistrationProgress {
177                                    progress,
178                                    secret: None,
179                                }
180                            })
181                        })
182                        .await
183                    };
184                    let secret = broker_client(&ctx)?
185                        .register_user_secret(puzzle.clone(), solution)
186                        .await?
187                        .map_err(|e| anyhow::anyhow!(e))?;
188                    REGISTRATIONS.lock()[idx] = RegistrationProgress {
189                        progress: 1.0,
190                        secret: Some(secret.clone()),
191                    };
192                    anyhow::Ok(secret)
193                };
194                if let Err(err) = fallible.await {
195                    tracing::warn!(err = debug(err), "restarting registration")
196                } else {
197                    break;
198                }
199            }
200        })
201        .detach();
202        Ok(idx)
203    }
204
205    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
206        tracing::debug!(idx, "polling registration");
207        let registers = REGISTRATIONS.lock();
208        registers
209            .get(idx)
210            .cloned()
211            .ok_or_else(|| "no such registration".to_string())
212    }
213
214    async fn convert_legacy_account(
215        &self,
216        username: String,
217        password: String,
218    ) -> Result<String, String> {
219        Ok(broker_client(&self.ctx)
220            .map_err(|e| format!("{:?}", e))?
221            .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
222                username,
223                password,
224            })
225            .await
226            .map_err(|e| format!("{:?}", e))?
227            .map_err(|e| format!("{:?}", e))?)
228    }
229
230    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
231        if stat != "traffic" {
232            return Err(format!("bad: {stat}"));
233        }
234        Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
235    }
236
237    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
238        let resp = broker_client(&self.ctx)
239            .map_err(|e| format!("{:?}", e))?
240            .get_exits()
241            .await
242            .map_err(|e| format!("{:?}", e))?
243            .map_err(|e| format!("{:?}", e))?;
244        Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
245    }
246
247    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
248        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
249        Ok(client
250            .get_news(lang)
251            .await
252            .map_err(|s| s.to_string())?
253            .map_err(|s| s.to_string())?)
254    }
255
256    async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String> {
257        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
258        Ok(client
259            .get_free_voucher(secret)
260            .await
261            .map_err(|s| s.to_string())?
262            .map_err(|s| s.to_string())?)
263    }
264
265    async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String> {
266        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
267
268        // Call the broker's redeem_voucher method directly with the secret
269        client
270            .redeem_voucher(secret, code)
271            .await
272            .map_err(|s| s.to_string())?
273            .map_err(|s| s.to_string())
274    }
275
276    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
277        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
278        Ok(client
279            .raw_price_points()
280            .await
281            .map_err(|s| s.to_string())?
282            .map_err(|s| s.to_string())?
283            .into_iter()
284            .map(|(a, b)| (a, b as f64 / 100.0))
285            .collect())
286    }
287
288    async fn create_payment(
289        &self,
290        secret: String,
291        days: u32,
292        method: String,
293    ) -> Result<String, String> {
294        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
295        Ok(client
296            .create_payment(secret, days, method)
297            .await
298            .map_err(|s| s.to_string())?
299            .map_err(|s| s.to_string())?)
300    }
301
302    async fn export_debug_pack(
303        &self,
304        email: Option<String>,
305        contents: String,
306    ) -> Result<(), String> {
307        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
308        client
309            .upload_debug_pack(email, contents)
310            .await
311            .map_err(|s| s.to_string())?
312            .map_err(|s| s.to_string())?;
313        Ok(())
314    }
315
316    async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String> {
317        get_update_manifest().await.map_err(|e| format!("{:?}", e))
318    }
319}
320
/// An `RpcTransport` that answers requests by calling the wrapped `ControlService`
/// directly, in-process.
pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
322
323#[async_trait]
324impl RpcTransport for DummyControlProtocolTransport {
325    type Error = Infallible;
326
327    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
328        Ok(self.0.respond_raw(req).await)
329    }
330}