1use std::{
2 convert::Infallible,
3 sync::{Arc, LazyLock},
4 time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use geph5_broker_protocol::{puzzle::solve_puzzle, AccountLevel, ExitDescriptor, NewsItem};
10
11use itertools::Itertools;
12use moka::future::Cache;
13use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
14use parking_lot::Mutex;
15use serde::{Deserialize, Serialize};
16use slab::Slab;
17
18use crate::{
19 broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num,
20 traffcount::TRAFF_COUNT, Config,
21};
22
/// RPC interface through which a front end controls and inspects the running client.
///
/// The `#[nanorpc_derive]` attribute is applied to this trait; the `ControlService` wrapper used
/// by `DummyControlProtocolTransport` is presumably generated from it (confirm against nanorpc).
/// Every fallible method reports failures as a plain `String`.
#[nanorpc_derive]
#[async_trait]
pub trait ControlProtocol {
    /// Returns the current connection state.
    async fn conn_info(&self) -> ConnInfo;
    /// Returns the numeric value of the statistic named `stat`.
    async fn stat_num(&self, stat: String) -> f64;
    /// Returns the start timestamp tracked for this client.
    async fn start_time(&self) -> SystemTime;
    /// Shuts the client down. `ControlProtocolImpl` does this by exiting the whole process.
    async fn stop(&self);

    /// Returns recent log output, split into lines.
    async fn recent_logs(&self) -> Vec<String>;

    /// Asks the broker whether `secret` belongs to a known user.
    async fn check_secret(&self, secret: String) -> Result<bool, String>;
    /// Returns the account level and expiry for the user identified by `secret`.
    async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
    /// Starts a background registration and returns the index to pass to `poll_registration`.
    async fn start_registration(&self) -> Result<usize, String>;
    /// Returns the latest progress of the registration started under `idx`.
    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
    /// Upgrades a legacy username/password credential through the broker.
    ///
    /// Returns the broker's string result (presumably the new account secret; confirm).
    async fn convert_legacy_account(
        &self,
        username: String,
        password: String,
    ) -> Result<String, String>;
    /// Returns the recorded history of the statistic named `stat`.
    ///
    /// `ControlProtocolImpl` only accepts `"traffic"`.
    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
    /// Returns the exits currently advertised by the broker.
    async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
    /// Returns the news items the broker serves for language `lang`.
    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
    /// Returns the broker's price points.
    ///
    /// `ControlProtocolImpl` divides the second component of each raw pair by 100 before
    /// returning it. NOTE(review): presumably `(days, price)`; confirm with the broker protocol.
    async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
    /// Asks the broker to create a payment covering `days` days using `method`.
    ///
    /// NOTE(review): the implementation in this file names the first parameter `auth_token`
    /// rather than `secret`; confirm which credential the broker expects. Returns the broker's
    /// string response (presumably a payment URL; confirm).
    async fn create_payment(
        &self,
        secret: String,
        days: u32,
        method: String,
    ) -> Result<String, String>;
    /// Uploads a debug pack with the given `contents` and optional contact `email` to the broker.
    async fn export_debug_pack(
        &self,
        email: Option<String>,
        contents: String,
    ) -> Result<(), String>;
}
60
/// Connection state returned by `ControlProtocol::conn_info`.
///
/// Serialized with an internal `"state"` tag, per the `serde` attribute below.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "state")]
pub enum ConnInfo {
    /// Not connected; this is the initial value of `CURRENT_CONN_INFO`.
    Disconnected,
    /// Presumably: a connection attempt is in progress (set outside this file; confirm).
    Connecting,
    /// Connected; carries the details of the established connection.
    Connected(ConnectedInfo),
}
68
/// Details carried by `ConnInfo::Connected`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ConnectedInfo {
    /// Presumably the name of the transport protocol in use (filled in outside this file; confirm).
    pub protocol: String,
    /// Presumably identifies the bridge the connection goes through (confirm).
    pub bridge: String,

    /// Descriptor of the exit associated with this connection.
    pub exit: ExitDescriptor,
}
76
/// Account summary returned by `ControlProtocol::user_info`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct UserInfo {
    /// Account tier; `user_info` reports `Plus` exactly when `expiry` is `Some`.
    pub level: AccountLevel,
    /// The `plus_expires_unix` value from the broker's user record, if any.
    pub expiry: Option<u64>,
}
82
/// Implementation of `ControlProtocol` that serves requests from a client context.
pub struct ControlProtocolImpl {
    /// Context used to reach the broker client, statistics and per-context state.
    pub ctx: AnyCtx<Config>,
}
86
/// Per-context connection state, initialised to `ConnInfo::Disconnected`.
///
/// Read by `conn_info`; whatever updates it lives outside this file.
pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Disconnected);
88
/// Process-wide table of registration attempts.
///
/// `start_registration` inserts an entry and hands out its slab index; `poll_registration` looks
/// entries up by that index. Nothing in this file removes entries.
static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
    LazyLock::new(|| Mutex::new(Slab::new()));
91
/// Snapshot of a registration started by `start_registration`.
#[derive(Serialize, Deserialize, Clone)]
pub struct RegistrationProgress {
    /// Starts at `0.0`, follows the puzzle solver's progress callback, and is set to `1.0` once
    /// the broker has issued a secret.
    pub progress: f64,
    /// The newly registered secret; `None` until registration has succeeded.
    pub secret: Option<String>,
}
97
98#[async_trait]
99impl ControlProtocol for ControlProtocolImpl {
100 async fn conn_info(&self) -> ConnInfo {
101 self.ctx.get(CURRENT_CONN_INFO).lock().clone()
102 }
103
104 async fn stat_num(&self, stat: String) -> f64 {
105 stat_get_num(&self.ctx, &stat)
106 }
107
108 async fn start_time(&self) -> SystemTime {
109 static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
110 *self.ctx.get(START_TIME)
111 }
112
113 async fn stop(&self) {
114 std::thread::spawn(move || {
115 std::thread::sleep(Duration::from_millis(100));
116 std::process::exit(0);
117 });
118 }
119
120 async fn recent_logs(&self) -> Vec<String> {
121 get_json_logs().split("\n").map(|s| s.to_string()).collect()
122 }
123
124 async fn check_secret(&self, secret: String) -> Result<bool, String> {
125 let res = broker_client(&self.ctx)
126 .map_err(|e| format!("{:?}", e))?
127 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
128 .await
129 .map_err(|e| format!("{:?}", e))?
130 .map_err(|e| format!("{:?}", e))?;
131 Ok(res.is_some())
132 }
133
134 async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
135 static USER_INFO_CACHE: CtxField<Cache<String, UserInfo>> = |_| {
136 Cache::builder()
137 .time_to_live(Duration::from_secs(60))
138 .build()
139 };
140
141 let cache = self.ctx.get(USER_INFO_CACHE);
142
143 cache
144 .try_get_with(secret.clone(), async {
145 let res = broker_client(&self.ctx)
146 .map_err(|e| format!("{:?}", e))?
147 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
148 .await
149 .map_err(|e| format!("{:?}", e))?
150 .map_err(|e| format!("{:?}", e))?
151 .ok_or_else(|| "no such user".to_string())?;
152 Ok(UserInfo {
153 level: if res.plus_expires_unix.is_some() {
154 AccountLevel::Plus
155 } else {
156 AccountLevel::Free
157 },
158 expiry: res.plus_expires_unix,
159 })
160 })
161 .await
162 .map_err(|s: Arc<String>| (*s).clone())
163 }
164
165 async fn start_registration(&self) -> Result<usize, String> {
166 let (puzzle, difficulty) = broker_client(&self.ctx)
167 .map_err(|e| format!("{:?}", e))?
168 .get_puzzle()
169 .await
170 .map_err(|e| format!("{:?}", e))?;
171 tracing::debug!(puzzle, difficulty, "got puzzle");
172 let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
173 progress: 0.0,
174 secret: None,
175 });
176 let ctx = self.ctx.clone();
177 smolscale::spawn(async move {
178 loop {
179 let fallible = async {
180 let solution = {
181 let puzzle = puzzle.clone();
182 smol::unblock(move || {
183 solve_puzzle(&puzzle, difficulty, |progress| {
184 REGISTRATIONS.lock()[idx] = RegistrationProgress {
185 progress,
186 secret: None,
187 }
188 })
189 })
190 .await
191 };
192 let secret = broker_client(&ctx)?
193 .register_user_secret(puzzle.clone(), solution)
194 .await?
195 .map_err(|e| anyhow::anyhow!(e))?;
196 REGISTRATIONS.lock()[idx] = RegistrationProgress {
197 progress: 1.0,
198 secret: Some(secret.clone()),
199 };
200 anyhow::Ok(secret)
201 };
202 if let Err(err) = fallible.await {
203 tracing::warn!(err = debug(err), "restarting registration")
204 } else {
205 break;
206 }
207 }
208 })
209 .detach();
210 Ok(idx)
211 }
212
213 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
214 tracing::debug!(idx, "polling registration");
215 let registers = REGISTRATIONS.lock();
216 registers
217 .get(idx)
218 .cloned()
219 .ok_or_else(|| "no such registration".to_string())
220 }
221
222 async fn convert_legacy_account(
223 &self,
224 username: String,
225 password: String,
226 ) -> Result<String, String> {
227 Ok(broker_client(&self.ctx)
228 .map_err(|e| format!("{:?}", e))?
229 .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
230 username,
231 password,
232 })
233 .await
234 .map_err(|e| format!("{:?}", e))?
235 .map_err(|e| format!("{:?}", e))?)
236 }
237
238 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
239 if stat != "traffic" {
240 return Err(format!("bad: {stat}"));
241 }
242 Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
243 }
244
245 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
246 let resp = broker_client(&self.ctx)
247 .map_err(|e| format!("{:?}", e))?
248 .get_exits()
249 .await
250 .map_err(|e| format!("{:?}", e))?
251 .map_err(|e| format!("{:?}", e))?;
252 Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
253 }
254
255 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
256 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
257 Ok(client
258 .get_news(lang)
259 .await
260 .map_err(|s| s.to_string())?
261 .map_err(|s| s.to_string())?)
262 }
263
264 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
265 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
266 Ok(client
267 .raw_price_points()
268 .await
269 .map_err(|s| s.to_string())?
270 .map_err(|s| s.to_string())?
271 .into_iter()
272 .map(|(a, b)| (a, b as f64 / 100.0))
273 .collect())
274 }
275
276 async fn create_payment(
277 &self,
278 auth_token: String,
279 days: u32,
280 method: String,
281 ) -> Result<String, String> {
282 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
283 Ok(client
284 .create_payment(auth_token, days, method)
285 .await
286 .map_err(|s| s.to_string())?
287 .map_err(|s| s.to_string())?)
288 }
289
290 async fn export_debug_pack(
291 &self,
292 email: Option<String>,
293 contents: String,
294 ) -> Result<(), String> {
295 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
296 client
297 .upload_debug_pack(email, contents)
298 .await
299 .map_err(|s| s.to_string())?
300 .map_err(|s| s.to_string())?;
301 Ok(())
302 }
303}
304
/// RPC transport that hands requests straight to a wrapped `ControlService`, with no
/// intermediate channel (see its `RpcTransport` impl).
pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
306
307#[async_trait]
308impl RpcTransport for DummyControlProtocolTransport {
309 type Error = Infallible;
310
311 async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
312 Ok(self.0.respond_raw(req).await)
313 }
314}