1use std::{
2 convert::Infallible,
3 sync::LazyLock,
4 time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use chrono::{NaiveDate, NaiveDateTime};
10use geph5_broker_protocol::{puzzle::solve_puzzle, AccountLevel, ExitDescriptor, VoucherInfo};
11
12use itertools::Itertools;
13use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
14use parking_lot::Mutex;
15use serde::{Deserialize, Serialize};
16use slab::Slab;
17
18use crate::{
19 broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num,
20 traffcount::TRAFF_COUNT, updates::get_update_manifest, Config,
21};
22
23#[nanorpc_derive]
24#[async_trait]
25pub trait ControlProtocol {
26 async fn conn_info(&self) -> ConnInfo;
27 async fn stat_num(&self, stat: String) -> f64;
28 async fn start_time(&self) -> SystemTime;
29 async fn stop(&self);
30
31 async fn recent_logs(&self) -> Vec<String>;
32
33 async fn check_secret(&self, secret: String) -> Result<bool, String>;
36 async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
37 async fn start_registration(&self) -> Result<usize, String>;
38 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
39 async fn convert_legacy_account(
40 &self,
41 username: String,
42 password: String,
43 ) -> Result<String, String>;
44 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
45 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
46 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
47 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
48 async fn create_payment(
49 &self,
50 secret: String,
51 days: u32,
52 method: String,
53 ) -> Result<String, String>;
54 async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String>;
55 async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String>;
56 async fn export_debug_pack(
57 &self,
58 email: Option<String>,
59 contents: String,
60 ) -> Result<(), String>;
61
62 async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String>;
63}
64
65#[derive(Serialize, Deserialize, Clone, Debug)]
66#[serde(tag = "state")]
67pub enum ConnInfo {
68 Disconnected,
69 Connecting,
70 Connected(ConnectedInfo),
71}
72
73#[derive(Serialize, Deserialize, Clone, Debug)]
74pub struct ConnectedInfo {
75 pub protocol: String,
76 pub bridge: String,
77
78 pub exit: ExitDescriptor,
79}
80
81#[derive(Serialize, Deserialize, Clone, Debug)]
82pub struct UserInfo {
83 pub user_id: u64,
84 pub level: AccountLevel,
85
86 pub recurring: bool,
87 pub expiry: Option<u64>,
88}
89
90pub struct ControlProtocolImpl {
91 pub ctx: AnyCtx<Config>,
92}
93
94pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Disconnected);
95
96static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
97 LazyLock::new(|| Mutex::new(Slab::new()));
98
99#[derive(Serialize, Deserialize, Clone)]
100pub struct RegistrationProgress {
101 pub progress: f64,
102 pub secret: Option<String>,
103}
104
105#[async_trait]
106impl ControlProtocol for ControlProtocolImpl {
107 async fn conn_info(&self) -> ConnInfo {
108 self.ctx.get(CURRENT_CONN_INFO).lock().clone()
109 }
110
111 async fn stat_num(&self, stat: String) -> f64 {
112 stat_get_num(&self.ctx, &stat)
113 }
114
115 async fn start_time(&self) -> SystemTime {
116 static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
117 *self.ctx.get(START_TIME)
118 }
119
120 async fn stop(&self) {
121 std::thread::spawn(move || {
122 std::thread::sleep(Duration::from_millis(100));
123 std::process::exit(0);
124 });
125 }
126
127 async fn recent_logs(&self) -> Vec<String> {
128 get_json_logs().split("\n").map(|s| s.to_string()).collect()
129 }
130
131 async fn check_secret(&self, secret: String) -> Result<bool, String> {
132 let res = broker_client(&self.ctx)
133 .map_err(|e| format!("{:?}", e))?
134 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
135 .await
136 .map_err(|e| format!("{:?}", e))?
137 .map_err(|e| format!("{:?}", e))?;
138 Ok(res.is_some())
139 }
140
141 async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
142 let res = broker_client(&self.ctx)
143 .map_err(|e| format!("{:?}", e))?
144 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
145 .await
146 .map_err(|e| format!("{:?}", e))?
147 .map_err(|e| format!("{:?}", e))?
148 .ok_or_else(|| "no such user".to_string())?;
149 Ok(UserInfo {
150 user_id: res.user_id,
151 level: if res.plus_expires_unix.is_some() {
152 AccountLevel::Plus
153 } else {
154 AccountLevel::Free
155 },
156 expiry: res.plus_expires_unix,
157 recurring: res.recurring,
158 })
159 }
160
161 async fn start_registration(&self) -> Result<usize, String> {
162 let (puzzle, difficulty) = broker_client(&self.ctx)
163 .map_err(|e| format!("{:?}", e))?
164 .get_puzzle()
165 .await
166 .map_err(|e| format!("{:?}", e))?;
167 tracing::debug!(puzzle, difficulty, "got puzzle");
168 let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
169 progress: 0.0,
170 secret: None,
171 });
172 let ctx = self.ctx.clone();
173 smolscale::spawn(async move {
174 loop {
175 let fallible = async {
176 let solution = {
177 let puzzle = puzzle.clone();
178 smol::unblock(move || {
179 solve_puzzle(&puzzle, difficulty, |progress| {
180 REGISTRATIONS.lock()[idx] = RegistrationProgress {
181 progress,
182 secret: None,
183 }
184 })
185 })
186 .await
187 };
188 let secret = broker_client(&ctx)?
189 .register_user_secret(puzzle.clone(), solution)
190 .await?
191 .map_err(|e| anyhow::anyhow!(e))?;
192 REGISTRATIONS.lock()[idx] = RegistrationProgress {
193 progress: 1.0,
194 secret: Some(secret.clone()),
195 };
196 anyhow::Ok(secret)
197 };
198 if let Err(err) = fallible.await {
199 tracing::warn!(err = debug(err), "restarting registration")
200 } else {
201 break;
202 }
203 }
204 })
205 .detach();
206 Ok(idx)
207 }
208
209 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
210 tracing::debug!(idx, "polling registration");
211 let registers = REGISTRATIONS.lock();
212 registers
213 .get(idx)
214 .cloned()
215 .ok_or_else(|| "no such registration".to_string())
216 }
217
218 async fn convert_legacy_account(
219 &self,
220 username: String,
221 password: String,
222 ) -> Result<String, String> {
223 Ok(broker_client(&self.ctx)
224 .map_err(|e| format!("{:?}", e))?
225 .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
226 username,
227 password,
228 })
229 .await
230 .map_err(|e| format!("{:?}", e))?
231 .map_err(|e| format!("{:?}", e))?)
232 }
233
234 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
235 if stat != "traffic" {
236 return Err(format!("bad: {stat}"));
237 }
238 Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
239 }
240
241 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
242 let resp = broker_client(&self.ctx)
243 .map_err(|e| format!("{:?}", e))?
244 .get_exits()
245 .await
246 .map_err(|e| format!("{:?}", e))?
247 .map_err(|e| format!("{:?}", e))?;
248 Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
249 }
250
251 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
252 let (manifest, _) = get_update_manifest().await.map_err(|e| e.to_string())?;
253 let news = manifest["news"]
254 .as_array()
255 .ok_or_else(|| "No news array".to_string())?;
256
257 let mut out = Vec::new();
258
259 for item in news {
260 let important = item["important"].as_bool().unwrap_or_default();
261 let date_str = item["date"]
262 .as_str()
263 .ok_or_else(|| "No or invalid 'date' field in news item".to_string())?;
264
265 let naive_date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d")
266 .map_err(|_| format!("Invalid date format: {}", date_str))?;
267
268 let naive_dt: NaiveDateTime = naive_date
269 .and_hms_opt(0, 0, 0)
270 .ok_or_else(|| "Unable to create NaiveDateTime from date".to_string())?;
271 let date_unix = naive_dt.and_utc().timestamp() as u64;
272
273 let localized = item[&lang]
274 .as_object()
275 .ok_or_else(|| format!("No localized data for language '{}'", lang))?;
276
277 let title = localized["title"]
278 .as_str()
279 .ok_or_else(|| "Missing 'title' in localized news data".to_string())?;
280 let contents = localized["contents"]
281 .as_str()
282 .ok_or_else(|| "Missing 'contents' in localized news data".to_string())?;
283
284 out.push(NewsItem {
285 title: title.to_string(),
286 date_unix,
287 contents: contents.to_string(),
288 important,
289 });
290 }
291
292 Ok(out)
293 }
294
295 async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String> {
296 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
297 Ok(client
298 .get_free_voucher(secret)
299 .await
300 .map_err(|s| s.to_string())?
301 .map_err(|s| s.to_string())?)
302 }
303
304 async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String> {
305 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
306
307 client
309 .redeem_voucher(secret, code)
310 .await
311 .map_err(|s| s.to_string())?
312 .map_err(|s| s.to_string())
313 }
314
315 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
316 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
317 Ok(client
318 .raw_price_points()
319 .await
320 .map_err(|s| s.to_string())?
321 .map_err(|s| s.to_string())?
322 .into_iter()
323 .map(|(a, b)| (a, b as f64 / 100.0))
324 .collect())
325 }
326
327 async fn create_payment(
328 &self,
329 secret: String,
330 days: u32,
331 method: String,
332 ) -> Result<String, String> {
333 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
334 Ok(client
335 .create_payment(secret, days, method)
336 .await
337 .map_err(|s| s.to_string())?
338 .map_err(|s| s.to_string())?)
339 }
340
341 async fn export_debug_pack(
342 &self,
343 email: Option<String>,
344 contents: String,
345 ) -> Result<(), String> {
346 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
347 client
348 .upload_debug_pack(email, contents)
349 .await
350 .map_err(|s| s.to_string())?
351 .map_err(|s| s.to_string())?;
352 Ok(())
353 }
354
355 async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String> {
356 get_update_manifest().await.map_err(|e| format!("{:?}", e))
357 }
358}
359
360pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
361
362#[async_trait]
363impl RpcTransport for DummyControlProtocolTransport {
364 type Error = Infallible;
365
366 async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
367 Ok(self.0.respond_raw(req).await)
368 }
369}
370
371#[derive(Serialize, Deserialize, Clone, Debug)]
372pub struct NewsItem {
373 pub title: String,
374 pub date_unix: u64,
375 pub contents: String,
376 pub important: bool,
377}