// geph5_client/control_prot.rs

1use std::{
2    convert::Infallible,
3    sync::{Arc, LazyLock},
4    time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use geph5_broker_protocol::{puzzle::solve_puzzle, AccountLevel, ExitDescriptor, NewsItem};
10
11use itertools::Itertools;
12use moka::future::Cache;
13use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
14use parking_lot::Mutex;
15use serde::{Deserialize, Serialize};
16use slab::Slab;
17
18use crate::{broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num, Config};
19
20#[nanorpc_derive]
21#[async_trait]
22pub trait ControlProtocol {
23    async fn conn_info(&self) -> ConnInfo;
24    async fn stat_num(&self, stat: String) -> f64;
25    async fn start_time(&self) -> SystemTime;
26    async fn stop(&self);
27
28    async fn recent_logs(&self) -> Vec<String>;
29
30    // broker-proxying stuff
31
32    async fn check_secret(&self, secret: String) -> Result<bool, String>;
33    async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
34    async fn start_registration(&self) -> Result<usize, String>;
35    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
36    async fn convert_legacy_account(
37        &self,
38        username: String,
39        password: String,
40    ) -> Result<String, String>;
41    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
42    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
43    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
44    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
45    async fn create_payment(
46        &self,
47        secret: String,
48        days: u32,
49        method: String,
50    ) -> Result<String, String>;
51    async fn export_debug_pack(
52        &self,
53        email: Option<String>,
54        contents: String,
55    ) -> Result<(), String>;
56}
57
58#[derive(Serialize, Deserialize, Clone, Debug)]
59#[serde(tag = "state")]
60pub enum ConnInfo {
61    Connecting,
62    Connected(ConnectedInfo),
63}
64
65#[derive(Serialize, Deserialize, Clone, Debug)]
66pub struct ConnectedInfo {
67    pub protocol: String,
68    pub bridge: String,
69
70    pub exit: ExitDescriptor,
71}
72
73#[derive(Serialize, Deserialize, Clone, Debug)]
74pub struct UserInfo {
75    pub level: AccountLevel,
76    pub expiry: Option<u64>,
77}
78
79pub struct ControlProtocolImpl {
80    pub ctx: AnyCtx<Config>,
81}
82
83pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Connecting);
84
85static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
86    LazyLock::new(|| Mutex::new(Slab::new()));
87
88#[derive(Serialize, Deserialize, Clone)]
89pub struct RegistrationProgress {
90    pub progress: f64,
91    pub secret: Option<String>,
92}
93
94#[async_trait]
95impl ControlProtocol for ControlProtocolImpl {
96    async fn conn_info(&self) -> ConnInfo {
97        self.ctx.get(CURRENT_CONN_INFO).lock().clone()
98    }
99
100    async fn stat_num(&self, stat: String) -> f64 {
101        stat_get_num(&self.ctx, &stat)
102    }
103
104    async fn start_time(&self) -> SystemTime {
105        static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
106        *self.ctx.get(START_TIME)
107    }
108
109    async fn stop(&self) {
110        std::thread::spawn(move || {
111            std::thread::sleep(Duration::from_millis(100));
112            std::process::exit(0);
113        });
114    }
115
116    async fn recent_logs(&self) -> Vec<String> {
117        get_json_logs().split("\n").map(|s| s.to_string()).collect()
118    }
119
120    async fn check_secret(&self, secret: String) -> Result<bool, String> {
121        let res = broker_client(&self.ctx)
122            .map_err(|e| format!("{:?}", e))?
123            .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
124            .await
125            .map_err(|e| format!("{:?}", e))?
126            .map_err(|e| format!("{:?}", e))?;
127        Ok(res.is_some())
128    }
129
130    async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
131        static USER_INFO_CACHE: CtxField<Cache<String, UserInfo>> = |_| {
132            Cache::builder()
133                .time_to_live(Duration::from_secs(60))
134                .build()
135        };
136
137        let cache = self.ctx.get(USER_INFO_CACHE);
138
139        cache
140            .try_get_with(secret.clone(), async {
141                let res = broker_client(&self.ctx)
142                    .map_err(|e| format!("{:?}", e))?
143                    .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
144                    .await
145                    .map_err(|e| format!("{:?}", e))?
146                    .map_err(|e| format!("{:?}", e))?
147                    .ok_or_else(|| "no such user".to_string())?;
148                Ok(UserInfo {
149                    level: if res.plus_expires_unix.is_some() {
150                        AccountLevel::Plus
151                    } else {
152                        AccountLevel::Free
153                    },
154                    expiry: res.plus_expires_unix,
155                })
156            })
157            .await
158            .map_err(|s: Arc<String>| (*s).clone())
159    }
160
161    async fn start_registration(&self) -> Result<usize, String> {
162        let (puzzle, difficulty) = broker_client(&self.ctx)
163            .map_err(|e| format!("{:?}", e))?
164            .get_puzzle()
165            .await
166            .map_err(|e| format!("{:?}", e))?;
167        tracing::debug!(puzzle, difficulty, "got puzzle");
168        let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
169            progress: 0.0,
170            secret: None,
171        });
172        let ctx = self.ctx.clone();
173        smolscale::spawn(async move {
174            loop {
175                let fallible = async {
176                    let solution = {
177                        let puzzle = puzzle.clone();
178                        smol::unblock(move || {
179                            solve_puzzle(&puzzle, difficulty, |progress| {
180                                REGISTRATIONS.lock()[idx] = RegistrationProgress {
181                                    progress,
182                                    secret: None,
183                                }
184                            })
185                        })
186                        .await
187                    };
188                    let secret = broker_client(&ctx)?
189                        .register_user_secret(puzzle.clone(), solution)
190                        .await?
191                        .map_err(|e| anyhow::anyhow!(e))?;
192                    REGISTRATIONS.lock()[idx] = RegistrationProgress {
193                        progress: 1.0,
194                        secret: Some(secret.clone()),
195                    };
196                    anyhow::Ok(secret)
197                };
198                if let Err(err) = fallible.await {
199                    tracing::warn!(err = debug(err), "restarting registration")
200                } else {
201                    break;
202                }
203            }
204        })
205        .detach();
206        Ok(idx)
207    }
208
209    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
210        tracing::debug!(idx, "polling registration");
211        let registers = REGISTRATIONS.lock();
212        registers
213            .get(idx)
214            .cloned()
215            .ok_or_else(|| "no such registration".to_string())
216    }
217
218    async fn convert_legacy_account(
219        &self,
220        username: String,
221        password: String,
222    ) -> Result<String, String> {
223        Ok(broker_client(&self.ctx)
224            .map_err(|e| format!("{:?}", e))?
225            .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
226                username,
227                password,
228            })
229            .await
230            .map_err(|e| format!("{:?}", e))?
231            .map_err(|e| format!("{:?}", e))?)
232    }
233
234    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
235        Ok(vec![1.0, 2.0, 3.0])
236    }
237
238    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
239        let resp = broker_client(&self.ctx)
240            .map_err(|e| format!("{:?}", e))?
241            .get_exits()
242            .await
243            .map_err(|e| format!("{:?}", e))?
244            .map_err(|e| format!("{:?}", e))?;
245        Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
246    }
247
248    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
249        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
250        Ok(client
251            .get_news(lang)
252            .await
253            .map_err(|s| s.to_string())?
254            .map_err(|s| s.to_string())?)
255    }
256
257    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
258        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
259        Ok(client
260            .raw_price_points()
261            .await
262            .map_err(|s| s.to_string())?
263            .map_err(|s| s.to_string())?
264            .into_iter()
265            .map(|(a, b)| (a, b as f64 / 100.0))
266            .collect())
267    }
268
269    async fn create_payment(
270        &self,
271        auth_token: String,
272        days: u32,
273        method: String,
274    ) -> Result<String, String> {
275        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
276        Ok(client
277            .create_payment(auth_token, days, method)
278            .await
279            .map_err(|s| s.to_string())?
280            .map_err(|s| s.to_string())?)
281    }
282
283    async fn export_debug_pack(
284        &self,
285        email: Option<String>,
286        contents: String,
287    ) -> Result<(), String> {
288        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
289        client
290            .upload_debug_pack(email, contents)
291            .await
292            .map_err(|s| s.to_string())?
293            .map_err(|s| s.to_string())?;
294        Ok(())
295    }
296}
297
298pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
299
300#[async_trait]
301impl RpcTransport for DummyControlProtocolTransport {
302    type Error = Infallible;
303
304    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
305        Ok(self.0.respond_raw(req).await)
306    }
307}