1use std::{
2 convert::Infallible,
3 sync::LazyLock,
4 time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use geph5_broker_protocol::{
10 puzzle::solve_puzzle, AccountLevel, ExitDescriptor, NewsItem, VoucherInfo,
11};
12
13use itertools::Itertools;
14use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
15use parking_lot::Mutex;
16use serde::{Deserialize, Serialize};
17use slab::Slab;
18
19use crate::{
20 broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num,
21 traffcount::TRAFF_COUNT, updates::get_update_manifest, Config,
22};
23
/// RPC interface for inspecting and controlling the running client.
///
/// Methods that talk to the broker return `Result<_, String>`, where the error is a
/// human-readable rendering of the underlying failure.
#[nanorpc_derive]
#[async_trait]
pub trait ControlProtocol {
    /// Returns the current connection state.
    async fn conn_info(&self) -> ConnInfo;
    /// Returns the current value of the numeric statistic named `stat`.
    async fn stat_num(&self, stat: String) -> f64;
    /// Returns the timestamp recorded as the client's start time.
    async fn start_time(&self) -> SystemTime;
    /// Asks the client to shut down.
    async fn stop(&self);

    /// Returns recent log output, split into one string per line.
    async fn recent_logs(&self) -> Vec<String>;

    /// Reports whether the broker knows an account for `secret`.
    async fn check_secret(&self, secret: String) -> Result<bool, String>;
    /// Fetches the account level and Plus expiry for the account identified by `secret`.
    async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
    /// Starts a background account registration and returns the index to pass to
    /// `poll_registration`.
    async fn start_registration(&self) -> Result<usize, String>;
    /// Returns the current progress of the registration started under index `idx`.
    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
    /// Upgrades a legacy username/password credential through the broker and returns the
    /// resulting string (presumably the new account secret; verify against the broker API).
    async fn convert_legacy_account(
        &self,
        username: String,
        password: String,
    ) -> Result<String, String>;
    /// Returns the recorded history of the statistic named `stat`.
    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
    /// Lists the exits currently advertised by the broker.
    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
    /// Fetches the latest news items for the language `lang`.
    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
    /// Returns the available price points as `(u32, f64)` pairs.
    // NOTE(review): presumably (days, price); confirm against the broker protocol.
    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
    /// Creates a payment for `days` days using `method` on behalf of the account `secret`,
    /// returning the string the broker replies with.
    async fn create_payment(
        &self,
        secret: String,
        days: u32,
        method: String,
    ) -> Result<String, String>;
    /// Asks the broker for a free voucher for the account `secret`, if one is available.
    async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String>;
    /// Redeems the voucher `code` for the account `secret`.
    async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String>;
    /// Uploads a debug pack with the given `contents`, optionally tagged with `email`.
    async fn export_debug_pack(
        &self,
        email: Option<String>,
        contents: String,
    ) -> Result<(), String>;

    /// Fetches the update manifest as a JSON value together with an accompanying string.
    async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String>;
}
65
/// Connection state reported by `ControlProtocol::conn_info`.
///
/// Serialized as an internally tagged enum: the variant name is stored under the
/// `"state"` key.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "state")]
pub enum ConnInfo {
    /// No connection; this is the initial value of `CURRENT_CONN_INFO`.
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// A connection is established; carries its details.
    Connected(ConnectedInfo),
}
73
/// Details of an established connection, carried by `ConnInfo::Connected`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ConnectedInfo {
    /// Name of the protocol in use.
    pub protocol: String,
    /// Identifier of the bridge in use.
    pub bridge: String,

    /// Descriptor of the exit the client is connected to.
    pub exit: ExitDescriptor,
}
81
/// Account information returned by `ControlProtocol::user_info`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct UserInfo {
    /// Account tier; `user_info` reports `Plus` exactly when `expiry` is `Some`.
    pub level: AccountLevel,
    /// Copied from the broker's `plus_expires_unix` field; `None` for free accounts.
    pub expiry: Option<u64>,
}
87
/// Implementation of `ControlProtocol` backed by a client context.
pub struct ControlProtocolImpl {
    /// Context used to reach the broker client, statistics and connection state.
    pub ctx: AnyCtx<Config>,
}
91
/// Context field holding the current connection state; starts out as `Disconnected`.
pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Disconnected);
93
/// Process-wide table of registrations started through `start_registration`, keyed by the
/// slab index handed back to the caller. Nothing in this file removes entries.
static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
    LazyLock::new(|| Mutex::new(Slab::new()));
96
/// Progress snapshot of a background registration.
#[derive(Serialize, Deserialize, Clone)]
pub struct RegistrationProgress {
    /// `0.0` when the registration starts, then whatever the puzzle solver reports, and
    /// `1.0` once registration has succeeded.
    pub progress: f64,
    /// The newly registered account secret; `None` until registration has succeeded.
    pub secret: Option<String>,
}
102
103#[async_trait]
104impl ControlProtocol for ControlProtocolImpl {
105 async fn conn_info(&self) -> ConnInfo {
106 self.ctx.get(CURRENT_CONN_INFO).lock().clone()
107 }
108
109 async fn stat_num(&self, stat: String) -> f64 {
110 stat_get_num(&self.ctx, &stat)
111 }
112
113 async fn start_time(&self) -> SystemTime {
114 static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
115 *self.ctx.get(START_TIME)
116 }
117
118 async fn stop(&self) {
119 std::thread::spawn(move || {
120 std::thread::sleep(Duration::from_millis(100));
121 std::process::exit(0);
122 });
123 }
124
125 async fn recent_logs(&self) -> Vec<String> {
126 get_json_logs().split("\n").map(|s| s.to_string()).collect()
127 }
128
129 async fn check_secret(&self, secret: String) -> Result<bool, String> {
130 let res = broker_client(&self.ctx)
131 .map_err(|e| format!("{:?}", e))?
132 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
133 .await
134 .map_err(|e| format!("{:?}", e))?
135 .map_err(|e| format!("{:?}", e))?;
136 Ok(res.is_some())
137 }
138
139 async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
140 let res = broker_client(&self.ctx)
141 .map_err(|e| format!("{:?}", e))?
142 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
143 .await
144 .map_err(|e| format!("{:?}", e))?
145 .map_err(|e| format!("{:?}", e))?
146 .ok_or_else(|| "no such user".to_string())?;
147 Ok(UserInfo {
148 level: if res.plus_expires_unix.is_some() {
149 AccountLevel::Plus
150 } else {
151 AccountLevel::Free
152 },
153 expiry: res.plus_expires_unix,
154 })
155 }
156
157 async fn start_registration(&self) -> Result<usize, String> {
158 let (puzzle, difficulty) = broker_client(&self.ctx)
159 .map_err(|e| format!("{:?}", e))?
160 .get_puzzle()
161 .await
162 .map_err(|e| format!("{:?}", e))?;
163 tracing::debug!(puzzle, difficulty, "got puzzle");
164 let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
165 progress: 0.0,
166 secret: None,
167 });
168 let ctx = self.ctx.clone();
169 smolscale::spawn(async move {
170 loop {
171 let fallible = async {
172 let solution = {
173 let puzzle = puzzle.clone();
174 smol::unblock(move || {
175 solve_puzzle(&puzzle, difficulty, |progress| {
176 REGISTRATIONS.lock()[idx] = RegistrationProgress {
177 progress,
178 secret: None,
179 }
180 })
181 })
182 .await
183 };
184 let secret = broker_client(&ctx)?
185 .register_user_secret(puzzle.clone(), solution)
186 .await?
187 .map_err(|e| anyhow::anyhow!(e))?;
188 REGISTRATIONS.lock()[idx] = RegistrationProgress {
189 progress: 1.0,
190 secret: Some(secret.clone()),
191 };
192 anyhow::Ok(secret)
193 };
194 if let Err(err) = fallible.await {
195 tracing::warn!(err = debug(err), "restarting registration")
196 } else {
197 break;
198 }
199 }
200 })
201 .detach();
202 Ok(idx)
203 }
204
205 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
206 tracing::debug!(idx, "polling registration");
207 let registers = REGISTRATIONS.lock();
208 registers
209 .get(idx)
210 .cloned()
211 .ok_or_else(|| "no such registration".to_string())
212 }
213
214 async fn convert_legacy_account(
215 &self,
216 username: String,
217 password: String,
218 ) -> Result<String, String> {
219 Ok(broker_client(&self.ctx)
220 .map_err(|e| format!("{:?}", e))?
221 .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
222 username,
223 password,
224 })
225 .await
226 .map_err(|e| format!("{:?}", e))?
227 .map_err(|e| format!("{:?}", e))?)
228 }
229
230 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
231 if stat != "traffic" {
232 return Err(format!("bad: {stat}"));
233 }
234 Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
235 }
236
237 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
238 let resp = broker_client(&self.ctx)
239 .map_err(|e| format!("{:?}", e))?
240 .get_exits()
241 .await
242 .map_err(|e| format!("{:?}", e))?
243 .map_err(|e| format!("{:?}", e))?;
244 Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
245 }
246
247 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
248 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
249 Ok(client
250 .get_news(lang)
251 .await
252 .map_err(|s| s.to_string())?
253 .map_err(|s| s.to_string())?)
254 }
255
256 async fn get_free_voucher(&self, secret: String) -> Result<Option<VoucherInfo>, String> {
257 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
258 Ok(client
259 .get_free_voucher(secret)
260 .await
261 .map_err(|s| s.to_string())?
262 .map_err(|s| s.to_string())?)
263 }
264
265 async fn redeem_voucher(&self, secret: String, code: String) -> Result<i32, String> {
266 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
267
268 client
270 .redeem_voucher(secret, code)
271 .await
272 .map_err(|s| s.to_string())?
273 .map_err(|s| s.to_string())
274 }
275
276 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
277 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
278 Ok(client
279 .raw_price_points()
280 .await
281 .map_err(|s| s.to_string())?
282 .map_err(|s| s.to_string())?
283 .into_iter()
284 .map(|(a, b)| (a, b as f64 / 100.0))
285 .collect())
286 }
287
288 async fn create_payment(
289 &self,
290 secret: String,
291 days: u32,
292 method: String,
293 ) -> Result<String, String> {
294 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
295 Ok(client
296 .create_payment(secret, days, method)
297 .await
298 .map_err(|s| s.to_string())?
299 .map_err(|s| s.to_string())?)
300 }
301
302 async fn export_debug_pack(
303 &self,
304 email: Option<String>,
305 contents: String,
306 ) -> Result<(), String> {
307 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
308 client
309 .upload_debug_pack(email, contents)
310 .await
311 .map_err(|s| s.to_string())?
312 .map_err(|s| s.to_string())?;
313 Ok(())
314 }
315
316 async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String> {
317 get_update_manifest().await.map_err(|e| format!("{:?}", e))
318 }
319}
320
/// `RpcTransport` that answers requests by calling the wrapped `ControlService` directly.
pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
322
323#[async_trait]
324impl RpcTransport for DummyControlProtocolTransport {
325 type Error = Infallible;
326
327 async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
328 Ok(self.0.respond_raw(req).await)
329 }
330}