1use std::{
2 convert::Infallible,
3 sync::LazyLock,
4 time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use chrono::{NaiveDate, NaiveDateTime};
10use geph5_broker_protocol::{puzzle::solve_puzzle, AccountLevel, ExitDescriptor, VoucherInfo};
11
12use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use slab::Slab;
16use tap::Tap;
17
18use crate::{
19 broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num,
20 traffcount::TRAFF_COUNT, updates::get_update_manifest, Config,
21};
22
23#[nanorpc_derive]
24#[async_trait]
25pub trait ControlProtocol {
26 async fn conn_info(&self) -> ConnInfo;
27 async fn stat_num(&self, stat: String) -> f64;
28 async fn start_time(&self) -> SystemTime;
29 async fn stop(&self);
30
31 async fn recent_logs(&self) -> Vec<String>;
32
33 async fn check_secret(&self, secret: String) -> Result<bool, String>;
36 async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
37 async fn start_registration(&self) -> Result<usize, String>;
38 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
39 async fn convert_legacy_account(
40 &self,
41 username: String,
42 password: String,
43 ) -> Result<String, String>;
44 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
45 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
46 async fn free_exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
47 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
48 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
49 async fn payment_methods(&self) -> Result<Vec<String>, String>;
50 async fn create_payment(
51 &self,
52 secret: String,
53 days: u32,
54 method: String,
55 ) -> Result<String, String>;
56 async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String>;
57 async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String>;
58 async fn export_debug_pack(
59 &self,
60 email: Option<String>,
61 contents: String,
62 ) -> Result<(), String>;
63
64 async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String>;
65}
66
67#[derive(Serialize, Deserialize, Clone, Debug)]
68#[serde(tag = "state")]
69pub enum ConnInfo {
70 Disconnected,
71 Connecting,
72 Connected(ConnectedInfo),
73}
74
75#[derive(Serialize, Deserialize, Clone, Debug)]
76pub struct ConnectedInfo {
77 pub protocol: String,
78 pub bridge: String,
79
80 pub exit: ExitDescriptor,
81}
82
83#[derive(Serialize, Deserialize, Clone, Debug)]
84pub struct UserInfo {
85 pub user_id: u64,
86 pub level: AccountLevel,
87
88 pub recurring: bool,
89 pub expiry: Option<u64>,
90}
91
92pub struct ControlProtocolImpl {
93 pub ctx: AnyCtx<Config>,
94}
95
96pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Disconnected);
97
98static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
99 LazyLock::new(|| Mutex::new(Slab::new()));
100
101#[derive(Serialize, Deserialize, Clone)]
102pub struct RegistrationProgress {
103 pub progress: f64,
104 pub secret: Option<String>,
105}
106
107#[async_trait]
108impl ControlProtocol for ControlProtocolImpl {
109 async fn conn_info(&self) -> ConnInfo {
110 self.ctx.get(CURRENT_CONN_INFO).lock().clone()
111 }
112
113 async fn stat_num(&self, stat: String) -> f64 {
114 stat_get_num(&self.ctx, &stat)
115 }
116
117 async fn start_time(&self) -> SystemTime {
118 static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
119 *self.ctx.get(START_TIME)
120 }
121
122 async fn stop(&self) {
123 std::thread::spawn(move || {
124 std::thread::sleep(Duration::from_millis(100));
125 std::process::exit(0);
126 });
127 }
128
129 async fn recent_logs(&self) -> Vec<String> {
130 get_json_logs().split("\n").map(|s| s.to_string()).collect()
131 }
132
133 async fn check_secret(&self, secret: String) -> Result<bool, String> {
134 let res = broker_client(&self.ctx)
135 .map_err(|e| format!("{:?}", e))?
136 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
137 .await
138 .map_err(|e| format!("{:?}", e))?
139 .map_err(|e| format!("{:?}", e))?;
140 Ok(res.is_some())
141 }
142
143 async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
144 let res = broker_client(&self.ctx)
145 .map_err(|e| format!("{:?}", e))?
146 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
147 .await
148 .map_err(|e| format!("{:?}", e))?
149 .map_err(|e| format!("{:?}", e))?
150 .ok_or_else(|| "no such user".to_string())?;
151 Ok(UserInfo {
152 user_id: res.user_id,
153 level: if res.plus_expires_unix.is_some() {
154 AccountLevel::Plus
155 } else {
156 AccountLevel::Free
157 },
158 expiry: res.plus_expires_unix,
159 recurring: res.recurring,
160 })
161 }
162
163 async fn start_registration(&self) -> Result<usize, String> {
164 let (puzzle, difficulty) = broker_client(&self.ctx)
165 .map_err(|e| format!("{:?}", e))?
166 .get_puzzle()
167 .await
168 .map_err(|e| format!("{:?}", e))?;
169 tracing::debug!(puzzle, difficulty, "got puzzle");
170 let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
171 progress: 0.0,
172 secret: None,
173 });
174 let ctx = self.ctx.clone();
175 smolscale::spawn(async move {
176 loop {
177 let fallible = async {
178 let solution = {
179 let puzzle = puzzle.clone();
180 smol::unblock(move || {
181 solve_puzzle(&puzzle, difficulty, |progress| {
182 REGISTRATIONS.lock()[idx] = RegistrationProgress {
183 progress,
184 secret: None,
185 }
186 })
187 })
188 .await
189 };
190 let secret = broker_client(&ctx)?
191 .register_user_secret(puzzle.clone(), solution)
192 .await?
193 .map_err(|e| anyhow::anyhow!(e))?;
194 REGISTRATIONS.lock()[idx] = RegistrationProgress {
195 progress: 1.0,
196 secret: Some(secret.clone()),
197 };
198 anyhow::Ok(secret)
199 };
200 if let Err(err) = fallible.await {
201 tracing::warn!(err = debug(err), "restarting registration")
202 } else {
203 break;
204 }
205 }
206 })
207 .detach();
208 Ok(idx)
209 }
210
211 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
212 tracing::debug!(idx, "polling registration");
213 let registers = REGISTRATIONS.lock();
214 registers
215 .get(idx)
216 .cloned()
217 .ok_or_else(|| "no such registration".to_string())
218 }
219
220 async fn convert_legacy_account(
221 &self,
222 username: String,
223 password: String,
224 ) -> Result<String, String> {
225 Ok(broker_client(&self.ctx)
226 .map_err(|e| format!("{:?}", e))?
227 .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
228 username,
229 password,
230 })
231 .await
232 .map_err(|e| format!("{:?}", e))?
233 .map_err(|e| format!("{:?}", e))?)
234 }
235
236 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
237 if stat != "traffic" {
238 return Err(format!("bad: {stat}"));
239 }
240 Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
241 }
242
243 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
244 let resp = broker_client(&self.ctx)
245 .map_err(|e| format!("{:?}", e))?
246 .get_exits()
247 .await
248 .map_err(|e| format!("{:?}", e))?
249 .map_err(|e| format!("{:?}", e))?;
250 Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
251 }
252
253 async fn free_exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
254 let resp = broker_client(&self.ctx)
255 .map_err(|e| format!("{:?}", e))?
256 .get_free_exits()
257 .await
258 .map_err(|e| format!("{:?}", e))?
259 .map_err(|e| format!("{:?}", e))?;
260 Ok(resp
261 .inner
262 .all_exits
263 .iter()
264 .map(|s| s.1.clone().tap_mut(|e| e.load = e.load.powf(0.5)))
265 .collect())
266 }
267
268 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
269 let (manifest, _) = get_update_manifest().await.map_err(|e| e.to_string())?;
270 let news = manifest["news"]
271 .as_array()
272 .ok_or_else(|| "No news array".to_string())?;
273
274 let mut out = Vec::new();
275
276 for item in news {
277 let important = item["important"].as_bool().unwrap_or_default();
278 let date_str = item["date"]
279 .as_str()
280 .ok_or_else(|| "No or invalid 'date' field in news item".to_string())?;
281
282 let naive_date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d")
283 .map_err(|_| format!("Invalid date format: {}", date_str))?;
284
285 let naive_dt: NaiveDateTime = naive_date
286 .and_hms_opt(0, 0, 0)
287 .ok_or_else(|| "Unable to create NaiveDateTime from date".to_string())?;
288 let date_unix = naive_dt.and_utc().timestamp() as u64;
289
290 let localized = item[&lang]
291 .as_object()
292 .ok_or_else(|| format!("No localized data for language '{}'", lang))?;
293
294 let title = localized["title"]
295 .as_str()
296 .ok_or_else(|| "Missing 'title' in localized news data".to_string())?;
297 let contents = localized["contents"]
298 .as_str()
299 .ok_or_else(|| "Missing 'contents' in localized news data".to_string())?;
300
301 out.push(NewsItem {
302 title: title.to_string(),
303 date_unix,
304 contents: contents.to_string(),
305 important,
306 });
307 }
308
309 Ok(out)
310 }
311
312 async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String> {
313 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
314 Ok(client
315 .get_free_voucher(secret)
316 .await
317 .map_err(|s| s.to_string())?
318 .map_err(|s| s.to_string())?)
319 }
320
321 async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String> {
322 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
323
324 client
326 .redeem_voucher(secret, code)
327 .await
328 .map_err(|s| s.to_string())?
329 .map_err(|s| s.to_string())
330 }
331
332 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
333 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
334 Ok(client
335 .raw_price_points()
336 .await
337 .map_err(|s| s.to_string())?
338 .map_err(|s| s.to_string())?
339 .into_iter()
340 .map(|(a, b)| (a, b as f64 / 100.0))
341 .collect())
342 }
343
344 async fn payment_methods(&self) -> Result<Vec<String>, String> {
345 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
346 Ok(client
347 .payment_methods()
348 .await
349 .map_err(|s| s.to_string())?
350 .map_err(|s| s.to_string())?)
351 }
352
353 async fn create_payment(
354 &self,
355 secret: String,
356 days: u32,
357 method: String,
358 ) -> Result<String, String> {
359 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
360 Ok(client
361 .create_payment(secret, days, method)
362 .await
363 .map_err(|s| s.to_string())?
364 .map_err(|s| s.to_string())?)
365 }
366
367 async fn export_debug_pack(
368 &self,
369 email: Option<String>,
370 contents: String,
371 ) -> Result<(), String> {
372 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
373 client
374 .upload_debug_pack(email, contents)
375 .await
376 .map_err(|s| s.to_string())?
377 .map_err(|s| s.to_string())?;
378 Ok(())
379 }
380
381 async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String> {
382 get_update_manifest().await.map_err(|e| format!("{:?}", e))
383 }
384}
385
386pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
387
388#[async_trait]
389impl RpcTransport for DummyControlProtocolTransport {
390 type Error = Infallible;
391
392 async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
393 Ok(self.0.respond_raw(req).await)
394 }
395}
396
397#[derive(Serialize, Deserialize, Clone, Debug)]
398pub struct NewsItem {
399 pub title: String,
400 pub date_unix: u64,
401 pub contents: String,
402 pub important: bool,
403}