1use std::{
2 convert::Infallible,
3 sync::{Arc, LazyLock},
4 time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use geph5_broker_protocol::{puzzle::solve_puzzle, AccountLevel, ExitDescriptor, NewsItem};
10
11use itertools::Itertools;
12use moka::future::Cache;
13use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
14use parking_lot::Mutex;
15use serde::{Deserialize, Serialize};
16use slab::Slab;
17
18use crate::{broker_client, client::CtxField, logging::get_json_logs, stats::stat_get_num, Config};
19
20#[nanorpc_derive]
21#[async_trait]
22pub trait ControlProtocol {
23 async fn conn_info(&self) -> ConnInfo;
24 async fn stat_num(&self, stat: String) -> f64;
25 async fn start_time(&self) -> SystemTime;
26 async fn stop(&self);
27
28 async fn recent_logs(&self) -> Vec<String>;
29
30 async fn check_secret(&self, secret: String) -> Result<bool, String>;
33 async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
34 async fn start_registration(&self) -> Result<usize, String>;
35 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
36 async fn convert_legacy_account(
37 &self,
38 username: String,
39 password: String,
40 ) -> Result<String, String>;
41 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
42 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
43 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
44 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
45 async fn create_payment(
46 &self,
47 secret: String,
48 days: u32,
49 method: String,
50 ) -> Result<String, String>;
51 async fn export_debug_pack(
52 &self,
53 email: Option<String>,
54 contents: String,
55 ) -> Result<(), String>;
56}
57
58#[derive(Serialize, Deserialize, Clone, Debug)]
59#[serde(tag = "state")]
60pub enum ConnInfo {
61 Connecting,
62 Connected(ConnectedInfo),
63}
64
65#[derive(Serialize, Deserialize, Clone, Debug)]
66pub struct ConnectedInfo {
67 pub protocol: String,
68 pub bridge: String,
69
70 pub exit: ExitDescriptor,
71}
72
73#[derive(Serialize, Deserialize, Clone, Debug)]
74pub struct UserInfo {
75 pub level: AccountLevel,
76 pub expiry: Option<u64>,
77}
78
79pub struct ControlProtocolImpl {
80 pub ctx: AnyCtx<Config>,
81}
82
83pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Connecting);
84
85static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
86 LazyLock::new(|| Mutex::new(Slab::new()));
87
88#[derive(Serialize, Deserialize, Clone)]
89pub struct RegistrationProgress {
90 pub progress: f64,
91 pub secret: Option<String>,
92}
93
94#[async_trait]
95impl ControlProtocol for ControlProtocolImpl {
96 async fn conn_info(&self) -> ConnInfo {
97 self.ctx.get(CURRENT_CONN_INFO).lock().clone()
98 }
99
100 async fn stat_num(&self, stat: String) -> f64 {
101 stat_get_num(&self.ctx, &stat)
102 }
103
104 async fn start_time(&self) -> SystemTime {
105 static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
106 *self.ctx.get(START_TIME)
107 }
108
109 async fn stop(&self) {
110 std::thread::spawn(move || {
111 std::thread::sleep(Duration::from_millis(100));
112 std::process::exit(0);
113 });
114 }
115
116 async fn recent_logs(&self) -> Vec<String> {
117 get_json_logs().split("\n").map(|s| s.to_string()).collect()
118 }
119
120 async fn check_secret(&self, secret: String) -> Result<bool, String> {
121 let res = broker_client(&self.ctx)
122 .map_err(|e| format!("{:?}", e))?
123 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
124 .await
125 .map_err(|e| format!("{:?}", e))?
126 .map_err(|e| format!("{:?}", e))?;
127 Ok(res.is_some())
128 }
129
130 async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
131 static USER_INFO_CACHE: CtxField<Cache<String, UserInfo>> = |_| {
132 Cache::builder()
133 .time_to_live(Duration::from_secs(60))
134 .build()
135 };
136
137 let cache = self.ctx.get(USER_INFO_CACHE);
138
139 cache
140 .try_get_with(secret.clone(), async {
141 let res = broker_client(&self.ctx)
142 .map_err(|e| format!("{:?}", e))?
143 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
144 .await
145 .map_err(|e| format!("{:?}", e))?
146 .map_err(|e| format!("{:?}", e))?
147 .ok_or_else(|| "no such user".to_string())?;
148 Ok(UserInfo {
149 level: if res.plus_expires_unix.is_some() {
150 AccountLevel::Plus
151 } else {
152 AccountLevel::Free
153 },
154 expiry: res.plus_expires_unix,
155 })
156 })
157 .await
158 .map_err(|s: Arc<String>| (*s).clone())
159 }
160
161 async fn start_registration(&self) -> Result<usize, String> {
162 let (puzzle, difficulty) = broker_client(&self.ctx)
163 .map_err(|e| format!("{:?}", e))?
164 .get_puzzle()
165 .await
166 .map_err(|e| format!("{:?}", e))?;
167 tracing::debug!(puzzle, difficulty, "got puzzle");
168 let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
169 progress: 0.0,
170 secret: None,
171 });
172 let ctx = self.ctx.clone();
173 smolscale::spawn(async move {
174 loop {
175 let fallible = async {
176 let solution = {
177 let puzzle = puzzle.clone();
178 smol::unblock(move || {
179 solve_puzzle(&puzzle, difficulty, |progress| {
180 REGISTRATIONS.lock()[idx] = RegistrationProgress {
181 progress,
182 secret: None,
183 }
184 })
185 })
186 .await
187 };
188 let secret = broker_client(&ctx)?
189 .register_user_secret(puzzle.clone(), solution)
190 .await?
191 .map_err(|e| anyhow::anyhow!(e))?;
192 REGISTRATIONS.lock()[idx] = RegistrationProgress {
193 progress: 1.0,
194 secret: Some(secret.clone()),
195 };
196 anyhow::Ok(secret)
197 };
198 if let Err(err) = fallible.await {
199 tracing::warn!(err = debug(err), "restarting registration")
200 } else {
201 break;
202 }
203 }
204 })
205 .detach();
206 Ok(idx)
207 }
208
209 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
210 tracing::debug!(idx, "polling registration");
211 let registers = REGISTRATIONS.lock();
212 registers
213 .get(idx)
214 .cloned()
215 .ok_or_else(|| "no such registration".to_string())
216 }
217
218 async fn convert_legacy_account(
219 &self,
220 username: String,
221 password: String,
222 ) -> Result<String, String> {
223 Ok(broker_client(&self.ctx)
224 .map_err(|e| format!("{:?}", e))?
225 .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
226 username,
227 password,
228 })
229 .await
230 .map_err(|e| format!("{:?}", e))?
231 .map_err(|e| format!("{:?}", e))?)
232 }
233
234 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
235 Ok(vec![1.0, 2.0, 3.0])
236 }
237
238 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
239 let resp = broker_client(&self.ctx)
240 .map_err(|e| format!("{:?}", e))?
241 .get_exits()
242 .await
243 .map_err(|e| format!("{:?}", e))?
244 .map_err(|e| format!("{:?}", e))?;
245 Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
246 }
247
248 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
249 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
250 Ok(client
251 .get_news(lang)
252 .await
253 .map_err(|s| s.to_string())?
254 .map_err(|s| s.to_string())?)
255 }
256
257 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
258 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
259 Ok(client
260 .raw_price_points()
261 .await
262 .map_err(|s| s.to_string())?
263 .map_err(|s| s.to_string())?
264 .into_iter()
265 .map(|(a, b)| (a, b as f64 / 100.0))
266 .collect())
267 }
268
269 async fn create_payment(
270 &self,
271 auth_token: String,
272 days: u32,
273 method: String,
274 ) -> Result<String, String> {
275 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
276 Ok(client
277 .create_payment(auth_token, days, method)
278 .await
279 .map_err(|s| s.to_string())?
280 .map_err(|s| s.to_string())?)
281 }
282
283 async fn export_debug_pack(
284 &self,
285 email: Option<String>,
286 contents: String,
287 ) -> Result<(), String> {
288 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
289 client
290 .upload_debug_pack(email, contents)
291 .await
292 .map_err(|s| s.to_string())?
293 .map_err(|s| s.to_string())?;
294 Ok(())
295 }
296}
297
298pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
299
300#[async_trait]
301impl RpcTransport for DummyControlProtocolTransport {
302 type Error = Infallible;
303
304 async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
305 Ok(self.0.respond_raw(req).await)
306 }
307}