1use std::{
2 convert::Infallible,
3 sync::{Arc, LazyLock},
4 time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use geph5_broker_protocol::{
10 puzzle::solve_puzzle, AccountLevel, ExitDescriptor, NewsItem, VoucherInfo,
11};
12
13use itertools::Itertools;
14use moka::future::Cache;
15use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
16use parking_lot::Mutex;
17use serde::{Deserialize, Serialize};
18use slab::Slab;
19
20use crate::{
21 broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num,
22 traffcount::TRAFF_COUNT, Config,
23};
24
25#[nanorpc_derive]
26#[async_trait]
27pub trait ControlProtocol {
28 async fn conn_info(&self) -> ConnInfo;
29 async fn stat_num(&self, stat: String) -> f64;
30 async fn start_time(&self) -> SystemTime;
31 async fn stop(&self);
32
33 async fn recent_logs(&self) -> Vec<String>;
34
35 async fn check_secret(&self, secret: String) -> Result<bool, String>;
38 async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
39 async fn start_registration(&self) -> Result<usize, String>;
40 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
41 async fn convert_legacy_account(
42 &self,
43 username: String,
44 password: String,
45 ) -> Result<String, String>;
46 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
47 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
48 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
49 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
50 async fn create_payment(
51 &self,
52 secret: String,
53 days: u32,
54 method: String,
55 ) -> Result<String, String>;
56 async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String>;
57 async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String>;
58 async fn export_debug_pack(
59 &self,
60 email: Option<String>,
61 contents: String,
62 ) -> Result<(), String>;
63}
64
65#[derive(Serialize, Deserialize, Clone, Debug)]
66#[serde(tag = "state")]
67pub enum ConnInfo {
68 Disconnected,
69 Connecting,
70 Connected(ConnectedInfo),
71}
72
73#[derive(Serialize, Deserialize, Clone, Debug)]
74pub struct ConnectedInfo {
75 pub protocol: String,
76 pub bridge: String,
77
78 pub exit: ExitDescriptor,
79}
80
81#[derive(Serialize, Deserialize, Clone, Debug)]
82pub struct UserInfo {
83 pub level: AccountLevel,
84 pub expiry: Option<u64>,
85}
86
87pub struct ControlProtocolImpl {
88 pub ctx: AnyCtx<Config>,
89}
90
91pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Disconnected);
92
93static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
94 LazyLock::new(|| Mutex::new(Slab::new()));
95
96#[derive(Serialize, Deserialize, Clone)]
97pub struct RegistrationProgress {
98 pub progress: f64,
99 pub secret: Option<String>,
100}
101
102#[async_trait]
103impl ControlProtocol for ControlProtocolImpl {
104 async fn conn_info(&self) -> ConnInfo {
105 self.ctx.get(CURRENT_CONN_INFO).lock().clone()
106 }
107
108 async fn stat_num(&self, stat: String) -> f64 {
109 stat_get_num(&self.ctx, &stat)
110 }
111
112 async fn start_time(&self) -> SystemTime {
113 static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
114 *self.ctx.get(START_TIME)
115 }
116
117 async fn stop(&self) {
118 std::thread::spawn(move || {
119 std::thread::sleep(Duration::from_millis(100));
120 std::process::exit(0);
121 });
122 }
123
124 async fn recent_logs(&self) -> Vec<String> {
125 get_json_logs().split("\n").map(|s| s.to_string()).collect()
126 }
127
128 async fn check_secret(&self, secret: String) -> Result<bool, String> {
129 let res = broker_client(&self.ctx)
130 .map_err(|e| format!("{:?}", e))?
131 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
132 .await
133 .map_err(|e| format!("{:?}", e))?
134 .map_err(|e| format!("{:?}", e))?;
135 Ok(res.is_some())
136 }
137
138 async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
139 let res = broker_client(&self.ctx)
140 .map_err(|e| format!("{:?}", e))?
141 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
142 .await
143 .map_err(|e| format!("{:?}", e))?
144 .map_err(|e| format!("{:?}", e))?
145 .ok_or_else(|| "no such user".to_string())?;
146 Ok(UserInfo {
147 level: if res.plus_expires_unix.is_some() {
148 AccountLevel::Plus
149 } else {
150 AccountLevel::Free
151 },
152 expiry: res.plus_expires_unix,
153 })
154 }
155
156 async fn start_registration(&self) -> Result<usize, String> {
157 let (puzzle, difficulty) = broker_client(&self.ctx)
158 .map_err(|e| format!("{:?}", e))?
159 .get_puzzle()
160 .await
161 .map_err(|e| format!("{:?}", e))?;
162 tracing::debug!(puzzle, difficulty, "got puzzle");
163 let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
164 progress: 0.0,
165 secret: None,
166 });
167 let ctx = self.ctx.clone();
168 smolscale::spawn(async move {
169 loop {
170 let fallible = async {
171 let solution = {
172 let puzzle = puzzle.clone();
173 smol::unblock(move || {
174 solve_puzzle(&puzzle, difficulty, |progress| {
175 REGISTRATIONS.lock()[idx] = RegistrationProgress {
176 progress,
177 secret: None,
178 }
179 })
180 })
181 .await
182 };
183 let secret = broker_client(&ctx)?
184 .register_user_secret(puzzle.clone(), solution)
185 .await?
186 .map_err(|e| anyhow::anyhow!(e))?;
187 REGISTRATIONS.lock()[idx] = RegistrationProgress {
188 progress: 1.0,
189 secret: Some(secret.clone()),
190 };
191 anyhow::Ok(secret)
192 };
193 if let Err(err) = fallible.await {
194 tracing::warn!(err = debug(err), "restarting registration")
195 } else {
196 break;
197 }
198 }
199 })
200 .detach();
201 Ok(idx)
202 }
203
204 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
205 tracing::debug!(idx, "polling registration");
206 let registers = REGISTRATIONS.lock();
207 registers
208 .get(idx)
209 .cloned()
210 .ok_or_else(|| "no such registration".to_string())
211 }
212
213 async fn convert_legacy_account(
214 &self,
215 username: String,
216 password: String,
217 ) -> Result<String, String> {
218 Ok(broker_client(&self.ctx)
219 .map_err(|e| format!("{:?}", e))?
220 .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
221 username,
222 password,
223 })
224 .await
225 .map_err(|e| format!("{:?}", e))?
226 .map_err(|e| format!("{:?}", e))?)
227 }
228
229 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
230 if stat != "traffic" {
231 return Err(format!("bad: {stat}"));
232 }
233 Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
234 }
235
236 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
237 let resp = broker_client(&self.ctx)
238 .map_err(|e| format!("{:?}", e))?
239 .get_exits()
240 .await
241 .map_err(|e| format!("{:?}", e))?
242 .map_err(|e| format!("{:?}", e))?;
243 Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
244 }
245
246 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
247 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
248 Ok(client
249 .get_news(lang)
250 .await
251 .map_err(|s| s.to_string())?
252 .map_err(|s| s.to_string())?)
253 }
254
255 async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String> {
256 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
257 Ok(client
258 .get_free_voucher(secret)
259 .await
260 .map_err(|s| s.to_string())?
261 .map_err(|s| s.to_string())?)
262 }
263
264 async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String> {
265 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
266
267 client
269 .redeem_voucher(secret, code)
270 .await
271 .map_err(|s| s.to_string())?
272 .map_err(|s| s.to_string())
273 }
274
275 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
276 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
277 Ok(client
278 .raw_price_points()
279 .await
280 .map_err(|s| s.to_string())?
281 .map_err(|s| s.to_string())?
282 .into_iter()
283 .map(|(a, b)| (a, b as f64 / 100.0))
284 .collect())
285 }
286
287 async fn create_payment(
288 &self,
289 secret: String,
290 days: u32,
291 method: String,
292 ) -> Result<String, String> {
293 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
294 Ok(client
295 .create_payment(secret, days, method)
296 .await
297 .map_err(|s| s.to_string())?
298 .map_err(|s| s.to_string())?)
299 }
300
301 async fn export_debug_pack(
302 &self,
303 email: Option<String>,
304 contents: String,
305 ) -> Result<(), String> {
306 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
307 client
308 .upload_debug_pack(email, contents)
309 .await
310 .map_err(|s| s.to_string())?
311 .map_err(|s| s.to_string())?;
312 Ok(())
313 }
314}
315
316pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
317
318#[async_trait]
319impl RpcTransport for DummyControlProtocolTransport {
320 type Error = Infallible;
321
322 async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
323 Ok(self.0.respond_raw(req).await)
324 }
325}