1use std::{
2 convert::Infallible,
3 sync::{Arc, LazyLock},
4 time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use geph5_broker_protocol::{
10 puzzle::solve_puzzle, AccountLevel, ExitDescriptor, NewsItem, VoucherInfo,
11};
12
13use itertools::Itertools;
14use moka::future::Cache;
15use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
16use parking_lot::Mutex;
17use serde::{Deserialize, Serialize};
18use slab::Slab;
19
20use crate::{
21 broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num,
22 traffcount::TRAFF_COUNT, updates::get_update_manifest, Config,
23};
24
25#[nanorpc_derive]
26#[async_trait]
27pub trait ControlProtocol {
28 async fn conn_info(&self) -> ConnInfo;
29 async fn stat_num(&self, stat: String) -> f64;
30 async fn start_time(&self) -> SystemTime;
31 async fn stop(&self);
32
33 async fn recent_logs(&self) -> Vec<String>;
34
35 async fn check_secret(&self, secret: String) -> Result<bool, String>;
38 async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
39 async fn start_registration(&self) -> Result<usize, String>;
40 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
41 async fn convert_legacy_account(
42 &self,
43 username: String,
44 password: String,
45 ) -> Result<String, String>;
46 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
47 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
48 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
49 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
50 async fn create_payment(
51 &self,
52 secret: String,
53 days: u32,
54 method: String,
55 ) -> Result<String, String>;
56 async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String>;
57 async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String>;
58 async fn export_debug_pack(
59 &self,
60 email: Option<String>,
61 contents: String,
62 ) -> Result<(), String>;
63
64 async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String>;
65}
66
67#[derive(Serialize, Deserialize, Clone, Debug)]
68#[serde(tag = "state")]
69pub enum ConnInfo {
70 Disconnected,
71 Connecting,
72 Connected(ConnectedInfo),
73}
74
75#[derive(Serialize, Deserialize, Clone, Debug)]
76pub struct ConnectedInfo {
77 pub protocol: String,
78 pub bridge: String,
79
80 pub exit: ExitDescriptor,
81}
82
83#[derive(Serialize, Deserialize, Clone, Debug)]
84pub struct UserInfo {
85 pub user_id: u64,
86 pub level: AccountLevel,
87
88 pub recurring: bool,
89 pub expiry: Option<u64>,
90}
91
92pub struct ControlProtocolImpl {
93 pub ctx: AnyCtx<Config>,
94}
95
96pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Disconnected);
97
98static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
99 LazyLock::new(|| Mutex::new(Slab::new()));
100
101#[derive(Serialize, Deserialize, Clone)]
102pub struct RegistrationProgress {
103 pub progress: f64,
104 pub secret: Option<String>,
105}
106
107#[async_trait]
108impl ControlProtocol for ControlProtocolImpl {
109 async fn conn_info(&self) -> ConnInfo {
110 self.ctx.get(CURRENT_CONN_INFO).lock().clone()
111 }
112
113 async fn stat_num(&self, stat: String) -> f64 {
114 stat_get_num(&self.ctx, &stat)
115 }
116
117 async fn start_time(&self) -> SystemTime {
118 static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
119 *self.ctx.get(START_TIME)
120 }
121
122 async fn stop(&self) {
123 std::thread::spawn(move || {
124 std::thread::sleep(Duration::from_millis(100));
125 std::process::exit(0);
126 });
127 }
128
129 async fn recent_logs(&self) -> Vec<String> {
130 get_json_logs().split("\n").map(|s| s.to_string()).collect()
131 }
132
133 async fn check_secret(&self, secret: String) -> Result<bool, String> {
134 let res = broker_client(&self.ctx)
135 .map_err(|e| format!("{:?}", e))?
136 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
137 .await
138 .map_err(|e| format!("{:?}", e))?
139 .map_err(|e| format!("{:?}", e))?;
140 Ok(res.is_some())
141 }
142
143 async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
144 let res = broker_client(&self.ctx)
145 .map_err(|e| format!("{:?}", e))?
146 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
147 .await
148 .map_err(|e| format!("{:?}", e))?
149 .map_err(|e| format!("{:?}", e))?
150 .ok_or_else(|| "no such user".to_string())?;
151 Ok(UserInfo {
152 user_id: res.user_id,
153 level: if res.plus_expires_unix.is_some() {
154 AccountLevel::Plus
155 } else {
156 AccountLevel::Free
157 },
158 expiry: res.plus_expires_unix,
159 recurring: res.recurring,
160 })
161 }
162
163 async fn start_registration(&self) -> Result<usize, String> {
164 let (puzzle, difficulty) = broker_client(&self.ctx)
165 .map_err(|e| format!("{:?}", e))?
166 .get_puzzle()
167 .await
168 .map_err(|e| format!("{:?}", e))?;
169 tracing::debug!(puzzle, difficulty, "got puzzle");
170 let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
171 progress: 0.0,
172 secret: None,
173 });
174 let ctx = self.ctx.clone();
175 smolscale::spawn(async move {
176 loop {
177 let fallible = async {
178 let solution = {
179 let puzzle = puzzle.clone();
180 smol::unblock(move || {
181 solve_puzzle(&puzzle, difficulty, |progress| {
182 REGISTRATIONS.lock()[idx] = RegistrationProgress {
183 progress,
184 secret: None,
185 }
186 })
187 })
188 .await
189 };
190 let secret = broker_client(&ctx)?
191 .register_user_secret(puzzle.clone(), solution)
192 .await?
193 .map_err(|e| anyhow::anyhow!(e))?;
194 REGISTRATIONS.lock()[idx] = RegistrationProgress {
195 progress: 1.0,
196 secret: Some(secret.clone()),
197 };
198 anyhow::Ok(secret)
199 };
200 if let Err(err) = fallible.await {
201 tracing::warn!(err = debug(err), "restarting registration")
202 } else {
203 break;
204 }
205 }
206 })
207 .detach();
208 Ok(idx)
209 }
210
211 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
212 tracing::debug!(idx, "polling registration");
213 let registers = REGISTRATIONS.lock();
214 registers
215 .get(idx)
216 .cloned()
217 .ok_or_else(|| "no such registration".to_string())
218 }
219
220 async fn convert_legacy_account(
221 &self,
222 username: String,
223 password: String,
224 ) -> Result<String, String> {
225 Ok(broker_client(&self.ctx)
226 .map_err(|e| format!("{:?}", e))?
227 .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
228 username,
229 password,
230 })
231 .await
232 .map_err(|e| format!("{:?}", e))?
233 .map_err(|e| format!("{:?}", e))?)
234 }
235
236 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
237 if stat != "traffic" {
238 return Err(format!("bad: {stat}"));
239 }
240 Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
241 }
242
243 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
244 let resp = broker_client(&self.ctx)
245 .map_err(|e| format!("{:?}", e))?
246 .get_exits()
247 .await
248 .map_err(|e| format!("{:?}", e))?
249 .map_err(|e| format!("{:?}", e))?;
250 Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
251 }
252
253 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
254 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
255 Ok(client
256 .get_news(lang)
257 .await
258 .map_err(|s| s.to_string())?
259 .map_err(|s| s.to_string())?)
260 }
261
262 async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String> {
263 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
264 Ok(client
265 .get_free_voucher(secret)
266 .await
267 .map_err(|s| s.to_string())?
268 .map_err(|s| s.to_string())?)
269 }
270
271 async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String> {
272 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
273
274 client
276 .redeem_voucher(secret, code)
277 .await
278 .map_err(|s| s.to_string())?
279 .map_err(|s| s.to_string())
280 }
281
282 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
283 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
284 Ok(client
285 .raw_price_points()
286 .await
287 .map_err(|s| s.to_string())?
288 .map_err(|s| s.to_string())?
289 .into_iter()
290 .map(|(a, b)| (a, b as f64 / 100.0))
291 .collect())
292 }
293
294 async fn create_payment(
295 &self,
296 secret: String,
297 days: u32,
298 method: String,
299 ) -> Result<String, String> {
300 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
301 Ok(client
302 .create_payment(secret, days, method)
303 .await
304 .map_err(|s| s.to_string())?
305 .map_err(|s| s.to_string())?)
306 }
307
308 async fn export_debug_pack(
309 &self,
310 email: Option<String>,
311 contents: String,
312 ) -> Result<(), String> {
313 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
314 client
315 .upload_debug_pack(email, contents)
316 .await
317 .map_err(|s| s.to_string())?
318 .map_err(|s| s.to_string())?;
319 Ok(())
320 }
321
322 async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String> {
323 get_update_manifest().await.map_err(|e| format!("{:?}", e))
324 }
325}
326
327pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
328
329#[async_trait]
330impl RpcTransport for DummyControlProtocolTransport {
331 type Error = Infallible;
332
333 async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
334 Ok(self.0.respond_raw(req).await)
335 }
336}