geph5_client/
control_prot.rs

1use std::{
2    convert::Infallible,
3    sync::{Arc, LazyLock},
4    time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use geph5_broker_protocol::{puzzle::solve_puzzle, AccountLevel, ExitDescriptor, NewsItem};
10
11use itertools::Itertools;
12use moka::future::Cache;
13use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
14use parking_lot::Mutex;
15use serde::{Deserialize, Serialize};
16use slab::Slab;
17
18use crate::{broker_client, client::CtxField, logs::LOGS, stats::stat_get_num, Config};
19
20#[nanorpc_derive]
21#[async_trait]
22pub trait ControlProtocol {
23    async fn conn_info(&self) -> ConnInfo;
24    async fn stat_num(&self, stat: String) -> f64;
25    async fn start_time(&self) -> SystemTime;
26    async fn stop(&self);
27
28    async fn recent_logs(&self) -> Vec<String>;
29
30    // broker-proxying stuff
31
32    async fn check_secret(&self, secret: String) -> Result<bool, String>;
33    async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
34    async fn start_registration(&self) -> Result<usize, String>;
35    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
36    async fn convert_legacy_account(
37        &self,
38        username: String,
39        password: String,
40    ) -> Result<String, String>;
41    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
42    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
43    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
44    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
45    async fn create_payment(
46        &self,
47        secret: String,
48        days: u32,
49        method: String,
50    ) -> Result<String, String>;
51}
52
53#[derive(Serialize, Deserialize, Clone, Debug)]
54#[serde(tag = "state")]
55pub enum ConnInfo {
56    Connecting,
57    Connected(ConnectedInfo),
58}
59
60#[derive(Serialize, Deserialize, Clone, Debug)]
61pub struct ConnectedInfo {
62    pub protocol: String,
63    pub bridge: String,
64
65    pub exit: ExitDescriptor,
66}
67
68#[derive(Serialize, Deserialize, Clone, Debug)]
69pub struct UserInfo {
70    pub level: AccountLevel,
71    pub expiry: Option<u64>,
72}
73
74pub struct ControlProtocolImpl {
75    pub ctx: AnyCtx<Config>,
76}
77
78pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Connecting);
79
80static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
81    LazyLock::new(|| Mutex::new(Slab::new()));
82
83#[derive(Serialize, Deserialize, Clone)]
84pub struct RegistrationProgress {
85    pub progress: f64,
86    pub secret: Option<String>,
87}
88
89#[async_trait]
90impl ControlProtocol for ControlProtocolImpl {
91    async fn conn_info(&self) -> ConnInfo {
92        self.ctx.get(CURRENT_CONN_INFO).lock().clone()
93    }
94
95    async fn stat_num(&self, stat: String) -> f64 {
96        stat_get_num(&self.ctx, &stat)
97    }
98
99    async fn start_time(&self) -> SystemTime {
100        static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
101        *self.ctx.get(START_TIME)
102    }
103
104    async fn stop(&self) {
105        std::thread::spawn(move || {
106            std::thread::sleep(Duration::from_millis(100));
107            std::process::exit(0);
108        });
109    }
110
111    async fn recent_logs(&self) -> Vec<String> {
112        let logs = LOGS.lock();
113        String::from_utf8_lossy(&logs)
114            .split('\n')
115            .map(|s| s.to_string())
116            .collect_vec()
117    }
118
119    async fn check_secret(&self, secret: String) -> Result<bool, String> {
120        let res = broker_client(&self.ctx)
121            .map_err(|e| format!("{:?}", e))?
122            .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
123            .await
124            .map_err(|e| format!("{:?}", e))?
125            .map_err(|e| format!("{:?}", e))?;
126        Ok(res.is_some())
127    }
128
129    async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
130        static USER_INFO_CACHE: CtxField<Cache<String, UserInfo>> = |_| {
131            Cache::builder()
132                .time_to_live(Duration::from_secs(60))
133                .build()
134        };
135
136        let cache = self.ctx.get(USER_INFO_CACHE);
137
138        cache
139            .try_get_with(secret.clone(), async {
140                let res = broker_client(&self.ctx)
141                    .map_err(|e| format!("{:?}", e))?
142                    .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
143                    .await
144                    .map_err(|e| format!("{:?}", e))?
145                    .map_err(|e| format!("{:?}", e))?
146                    .ok_or_else(|| "no such user".to_string())?;
147                Ok(UserInfo {
148                    level: if res.plus_expires_unix.is_some() {
149                        AccountLevel::Plus
150                    } else {
151                        AccountLevel::Free
152                    },
153                    expiry: res.plus_expires_unix,
154                })
155            })
156            .await
157            .map_err(|s: Arc<String>| (*s).clone())
158    }
159
160    async fn start_registration(&self) -> Result<usize, String> {
161        let (puzzle, difficulty) = broker_client(&self.ctx)
162            .map_err(|e| format!("{:?}", e))?
163            .get_puzzle()
164            .await
165            .map_err(|e| format!("{:?}", e))?;
166        tracing::debug!(puzzle, difficulty, "got puzzle");
167        let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
168            progress: 0.0,
169            secret: None,
170        });
171        let ctx = self.ctx.clone();
172        smolscale::spawn(async move {
173            loop {
174                let fallible = async {
175                    let solution = {
176                        let puzzle = puzzle.clone();
177                        smol::unblock(move || {
178                            solve_puzzle(&puzzle, difficulty, |progress| {
179                                REGISTRATIONS.lock()[idx] = RegistrationProgress {
180                                    progress,
181                                    secret: None,
182                                }
183                            })
184                        })
185                        .await
186                    };
187                    let secret = broker_client(&ctx)?
188                        .register_user_secret(puzzle.clone(), solution)
189                        .await?
190                        .map_err(|e| anyhow::anyhow!(e))?;
191                    REGISTRATIONS.lock()[idx] = RegistrationProgress {
192                        progress: 1.0,
193                        secret: Some(secret.clone()),
194                    };
195                    anyhow::Ok(secret)
196                };
197                if let Err(err) = fallible.await {
198                    tracing::warn!(err = debug(err), "restarting registration")
199                } else {
200                    break;
201                }
202            }
203        })
204        .detach();
205        Ok(idx)
206    }
207
208    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
209        tracing::debug!(idx, "polling registration");
210        let registers = REGISTRATIONS.lock();
211        registers
212            .get(idx)
213            .cloned()
214            .ok_or_else(|| "no such registration".to_string())
215    }
216
217    async fn convert_legacy_account(
218        &self,
219        username: String,
220        password: String,
221    ) -> Result<String, String> {
222        Ok(broker_client(&self.ctx)
223            .map_err(|e| format!("{:?}", e))?
224            .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
225                username,
226                password,
227            })
228            .await
229            .map_err(|e| format!("{:?}", e))?
230            .map_err(|e| format!("{:?}", e))?)
231    }
232
233    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
234        Ok(vec![1.0, 2.0, 3.0])
235    }
236
237    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
238        let resp = broker_client(&self.ctx)
239            .map_err(|e| format!("{:?}", e))?
240            .get_exits()
241            .await
242            .map_err(|e| format!("{:?}", e))?
243            .map_err(|e| format!("{:?}", e))?;
244        Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
245    }
246
247    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
248        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
249        Ok(client
250            .get_news(lang)
251            .await
252            .map_err(|s| s.to_string())?
253            .map_err(|s| s.to_string())?)
254    }
255
256    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
257        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
258        Ok(client
259            .raw_price_points()
260            .await
261            .map_err(|s| s.to_string())?
262            .map_err(|s| s.to_string())?
263            .into_iter()
264            .map(|(a, b)| (a, b as f64 / 100.0))
265            .collect())
266    }
267
268    async fn create_payment(
269        &self,
270        auth_token: String,
271        days: u32,
272        method: String,
273    ) -> Result<String, String> {
274        let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
275        Ok(client
276            .create_payment(auth_token, days, method)
277            .await
278            .map_err(|s| s.to_string())?
279            .map_err(|s| s.to_string())?)
280    }
281}
282
283pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
284
285#[async_trait]
286impl RpcTransport for DummyControlProtocolTransport {
287    type Error = Infallible;
288
289    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
290        Ok(self.0.respond_raw(req).await)
291    }
292}