geph5_client/
control_prot.rs

1use std::{
2    convert::Infallible,
3    sync::{Arc, LazyLock},
4    time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use geph5_broker_protocol::{puzzle::solve_puzzle, AccountLevel, ExitDescriptor, NewsItem};
10
11use itertools::Itertools;
12use moka::future::Cache;
13use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
14use parking_lot::Mutex;
15use serde::{Deserialize, Serialize};
16use slab::Slab;
17
18use crate::{
19    broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num,
20    traffcount::TRAFF_COUNT, Config,
21};
22
23#[nanorpc_derive]
24#[async_trait]
25pub trait ControlProtocol {
26    async fn conn_info(&self) -> ConnInfo;
27    async fn stat_num(&self, stat: String) -> f64;
28    async fn start_time(&self) -> SystemTime;
29    async fn stop(&self);
30
31    async fn recent_logs(&self) -> Vec<String>;
32
33    // broker-proxying stuff
34
35    async fn check_secret(&self, secret: String) -> Result<bool, String>;
36    async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
37    async fn start_registration(&self) -> Result<usize, String>;
38    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
39    async fn convert_legacy_account(
40        &self,
41        username: String,
42        password: String,
43    ) -> Result<String, String>;
44    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
45    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
46    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
47    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
48    async fn create_payment(
49        &self,
50        secret: String,
51        days: u32,
52        method: String,
53    ) -> Result<String, String>;
54    async fn export_debug_pack(
55        &self,
56        email: Option<String>,
57        contents: String,
58    ) -> Result<(), String>;
59}
60
61#[derive(Serialize, Deserialize, Clone, Debug)]
62#[serde(tag = "state")]
63pub enum ConnInfo {
64    Disconnected,
65    Connecting,
66    Connected(ConnectedInfo),
67}
68
69#[derive(Serialize, Deserialize, Clone, Debug)]
70pub struct ConnectedInfo {
71    pub protocol: String,
72    pub bridge: String,
73
74    pub exit: ExitDescriptor,
75}
76
77#[derive(Serialize, Deserialize, Clone, Debug)]
78pub struct UserInfo {
79    pub level: AccountLevel,
80    pub expiry: Option<u64>,
81}
82
83pub struct ControlProtocolImpl {
84    pub ctx: AnyCtx<Config>,
85}
86
87pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Disconnected);
88
89static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
90    LazyLock::new(|| Mutex::new(Slab::new()));
91
92#[derive(Serialize, Deserialize, Clone)]
93pub struct RegistrationProgress {
94    pub progress: f64,
95    pub secret: Option<String>,
96}
97
98#[async_trait]
99impl ControlProtocol for ControlProtocolImpl {
100    async fn conn_info(&self) -> ConnInfo {
101        self.ctx.get(CURRENT_CONN_INFO).lock().clone()
102    }
103
104    async fn stat_num(&self, stat: String) -> f64 {
105        stat_get_num(&self.ctx, &stat)
106    }
107
108    async fn start_time(&self) -> SystemTime {
109        static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
110        *self.ctx.get(START_TIME)
111    }
112
113    async fn stop(&self) {
114        std::thread::spawn(move || {
115            std::thread::sleep(Duration::from_millis(100));
116            std::process::exit(0);
117        });
118    }
119
120    async fn recent_logs(&self) -> Vec<String> {
121        get_json_logs().split("\n").map(|s| s.to_string()).collect()
122    }
123
124    async fn check_secret(&self, secret: String) -> Result<bool, String> {
125        let res = broker_client(&self.ctx)
126            .map_err(|e| format!("{:?}", e))?
127            .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
128            .await
129            .map_err(|e| format!("{:?}", e))?
130            .map_err(|e| format!("{:?}", e))?;
131        Ok(res.is_some())
132    }
133
134    async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
135        static USER_INFO_CACHE: CtxField<Cache<String, UserInfo>> = |_| {
136            Cache::builder()
137                .time_to_live(Duration::from_secs(60))
138                .build()
139        };
140
141        let cache = self.ctx.get(USER_INFO_CACHE);
142
143        cache
144            .try_get_with(secret.clone(), async {
145                let res = broker_client(&self.ctx)
146                    .map_err(|e| format!("{:?}", e))?
147                    .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
148                    .await
149                    .map_err(|e| format!("{:?}", e))?
150                    .map_err(|e| format!("{:?}", e))?
151                    .ok_or_else(|| "no such user".to_string())?;
152                Ok(UserInfo {
153                    level: if res.plus_expires_unix.is_some() {
154                        AccountLevel::Plus
155                    } else {
156                        AccountLevel::Free
157                    },
158                    expiry: res.plus_expires_unix,
159                })
160            })
161            .await
162            .map_err(|s: Arc<String>| (*s).clone())
163    }
164
165    async fn start_registration(&self) -> Result<usize, String> {
166        let (puzzle, difficulty) = broker_client(&self.ctx)
167            .map_err(|e| format!("{:?}", e))?
168            .get_puzzle()
169            .await
170            .map_err(|e| format!("{:?}", e))?;
171        tracing::debug!(puzzle, difficulty, "got puzzle");
172        let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
173            progress: 0.0,
174            secret: None,
175        });
176        let ctx = self.ctx.clone();
177        smolscale::spawn(async move {
178            loop {
179                let fallible = async {
180                    let solution = {
181                        let puzzle = puzzle.clone();
182                        smol::unblock(move || {
183                            solve_puzzle(&puzzle, difficulty, |progress| {
184                                REGISTRATIONS.lock()[idx] = RegistrationProgress {
185                                    progress,
186                                    secret: None,
187                                }
188                            })
189                        })
190                        .await
191                    };
192                    let secret = broker_client(&ctx)?
193                        .register_user_secret(puzzle.clone(), solution)
194                        .await?
195                        .map_err(|e| anyhow::anyhow!(e))?;
196                    REGISTRATIONS.lock()[idx] = RegistrationProgress {
197                        progress: 1.0,
198                        secret: Some(secret.clone()),
199                    };
200                    anyhow::Ok(secret)
201                };
202                if let Err(err) = fallible.await {
203                    tracing::warn!(err = debug(err), "restarting registration")
204                } else {
205                    break;
206                }
207            }
208        })
209        .detach();
210        Ok(idx)
211    }
212
213    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
214        tracing::debug!(idx, "polling registration");
215        let registers = REGISTRATIONS.lock();
216        registers
217            .get(idx)
218            .cloned()
219            .ok_or_else(|| "no such registration".to_string())
220    }
221
222    async fn convert_legacy_account(
223        &self,
224        username: String,
225        password: String,
226    ) -> Result<String, String> {
227        Ok(broker_client(&self.ctx)
228            .map_err(|e| format!("{:?}", e))?
229            .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
230                username,
231                password,
232            })
233            .await
234            .map_err(|e| format!("{:?}", e))?
235            .map_err(|e| format!("{:?}", e))?)
236    }
237
238    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
239        if stat != "traffic" {
240            return Err(format!("bad: {stat}"));
241        }
242        Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
243    }
244
245    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
246        let resp = broker_client(&self.ctx)
247            .map_err(|e| format!("{:?}", e))?
248            .get_exits()
249            .await
250            .map_err(|e| format!("{:?}", e))?
251            .map_err(|e| format!("{:?}", e))?;
252        Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
253    }
254
255    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
256        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
257        Ok(client
258            .get_news(lang)
259            .await
260            .map_err(|s| s.to_string())?
261            .map_err(|s| s.to_string())?)
262    }
263
264    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
265        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
266        Ok(client
267            .raw_price_points()
268            .await
269            .map_err(|s| s.to_string())?
270            .map_err(|s| s.to_string())?
271            .into_iter()
272            .map(|(a, b)| (a, b as f64 / 100.0))
273            .collect())
274    }
275
276    async fn create_payment(
277        &self,
278        auth_token: String,
279        days: u32,
280        method: String,
281    ) -> Result<String, String> {
282        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
283        Ok(client
284            .create_payment(auth_token, days, method)
285            .await
286            .map_err(|s| s.to_string())?
287            .map_err(|s| s.to_string())?)
288    }
289
290    async fn export_debug_pack(
291        &self,
292        email: Option<String>,
293        contents: String,
294    ) -> Result<(), String> {
295        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
296        client
297            .upload_debug_pack(email, contents)
298            .await
299            .map_err(|s| s.to_string())?
300            .map_err(|s| s.to_string())?;
301        Ok(())
302    }
303}
304
305pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
306
307#[async_trait]
308impl RpcTransport for DummyControlProtocolTransport {
309    type Error = Infallible;
310
311    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
312        Ok(self.0.respond_raw(req).await)
313    }
314}