1use std::{
2 convert::Infallible,
3 sync::{Arc, LazyLock},
4 time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use async_trait::async_trait;
9use geph5_broker_protocol::{puzzle::solve_puzzle, AccountLevel, ExitDescriptor, NewsItem};
10
11use itertools::Itertools;
12use moka::future::Cache;
13use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
14use parking_lot::Mutex;
15use serde::{Deserialize, Serialize};
16use slab::Slab;
17
18use crate::{broker_client, client::CtxField, logs::LOGS, stats::stat_get_num, Config};
19
20#[nanorpc_derive]
21#[async_trait]
22pub trait ControlProtocol {
23 async fn conn_info(&self) -> ConnInfo;
24 async fn stat_num(&self, stat: String) -> f64;
25 async fn start_time(&self) -> SystemTime;
26 async fn stop(&self);
27
28 async fn recent_logs(&self) -> Vec<String>;
29
30 async fn check_secret(&self, secret: String) -> Result<bool, String>;
33 async fn user_info(&self, secret: String) -> Result<UserInfo, String>;
34 async fn start_registration(&self) -> Result<usize, String>;
35 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String>;
36 async fn convert_legacy_account(
37 &self,
38 username: String,
39 password: String,
40 ) -> Result<String, String>;
41 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String>;
42 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String>;
43 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String>;
44 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String>;
45 async fn create_payment(
46 &self,
47 secret: String,
48 days: u32,
49 method: String,
50 ) -> Result<String, String>;
51}
52
53#[derive(Serialize, Deserialize, Clone, Debug)]
54#[serde(tag = "state")]
55pub enum ConnInfo {
56 Connecting,
57 Connected(ConnectedInfo),
58}
59
60#[derive(Serialize, Deserialize, Clone, Debug)]
61pub struct ConnectedInfo {
62 pub protocol: String,
63 pub bridge: String,
64
65 pub exit: ExitDescriptor,
66}
67
68#[derive(Serialize, Deserialize, Clone, Debug)]
69pub struct UserInfo {
70 pub level: AccountLevel,
71 pub expiry: Option<u64>,
72}
73
74pub struct ControlProtocolImpl {
75 pub ctx: AnyCtx<Config>,
76}
77
78pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Connecting);
79
80static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
81 LazyLock::new(|| Mutex::new(Slab::new()));
82
83#[derive(Serialize, Deserialize, Clone)]
84pub struct RegistrationProgress {
85 pub progress: f64,
86 pub secret: Option<String>,
87}
88
89#[async_trait]
90impl ControlProtocol for ControlProtocolImpl {
91 async fn conn_info(&self) -> ConnInfo {
92 self.ctx.get(CURRENT_CONN_INFO).lock().clone()
93 }
94
95 async fn stat_num(&self, stat: String) -> f64 {
96 stat_get_num(&self.ctx, &stat)
97 }
98
99 async fn start_time(&self) -> SystemTime {
100 static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
101 *self.ctx.get(START_TIME)
102 }
103
104 async fn stop(&self) {
105 std::thread::spawn(move || {
106 std::thread::sleep(Duration::from_millis(100));
107 std::process::exit(0);
108 });
109 }
110
111 async fn recent_logs(&self) -> Vec<String> {
112 let logs = LOGS.lock();
113 String::from_utf8_lossy(&logs)
114 .split('\n')
115 .map(|s| s.to_string())
116 .collect_vec()
117 }
118
119 async fn check_secret(&self, secret: String) -> Result<bool, String> {
120 let res = broker_client(&self.ctx)
121 .map_err(|e| format!("{:?}", e))?
122 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
123 .await
124 .map_err(|e| format!("{:?}", e))?
125 .map_err(|e| format!("{:?}", e))?;
126 Ok(res.is_some())
127 }
128
129 async fn user_info(&self, secret: String) -> Result<UserInfo, String> {
130 static USER_INFO_CACHE: CtxField<Cache<String, UserInfo>> = |_| {
131 Cache::builder()
132 .time_to_live(Duration::from_secs(60))
133 .build()
134 };
135
136 let cache = self.ctx.get(USER_INFO_CACHE);
137
138 cache
139 .try_get_with(secret.clone(), async {
140 let res = broker_client(&self.ctx)
141 .map_err(|e| format!("{:?}", e))?
142 .get_user_info_by_cred(geph5_broker_protocol::Credential::Secret(secret))
143 .await
144 .map_err(|e| format!("{:?}", e))?
145 .map_err(|e| format!("{:?}", e))?
146 .ok_or_else(|| "no such user".to_string())?;
147 Ok(UserInfo {
148 level: if res.plus_expires_unix.is_some() {
149 AccountLevel::Plus
150 } else {
151 AccountLevel::Free
152 },
153 expiry: res.plus_expires_unix,
154 })
155 })
156 .await
157 .map_err(|s: Arc<String>| (*s).clone())
158 }
159
160 async fn start_registration(&self) -> Result<usize, String> {
161 let (puzzle, difficulty) = broker_client(&self.ctx)
162 .map_err(|e| format!("{:?}", e))?
163 .get_puzzle()
164 .await
165 .map_err(|e| format!("{:?}", e))?;
166 tracing::debug!(puzzle, difficulty, "got puzzle");
167 let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
168 progress: 0.0,
169 secret: None,
170 });
171 let ctx = self.ctx.clone();
172 smolscale::spawn(async move {
173 loop {
174 let fallible = async {
175 let solution = {
176 let puzzle = puzzle.clone();
177 smol::unblock(move || {
178 solve_puzzle(&puzzle, difficulty, |progress| {
179 REGISTRATIONS.lock()[idx] = RegistrationProgress {
180 progress,
181 secret: None,
182 }
183 })
184 })
185 .await
186 };
187 let secret = broker_client(&ctx)?
188 .register_user_secret(puzzle.clone(), solution)
189 .await?
190 .map_err(|e| anyhow::anyhow!(e))?;
191 REGISTRATIONS.lock()[idx] = RegistrationProgress {
192 progress: 1.0,
193 secret: Some(secret.clone()),
194 };
195 anyhow::Ok(secret)
196 };
197 if let Err(err) = fallible.await {
198 tracing::warn!(err = debug(err), "restarting registration")
199 } else {
200 break;
201 }
202 }
203 })
204 .detach();
205 Ok(idx)
206 }
207
208 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
209 tracing::debug!(idx, "polling registration");
210 let registers = REGISTRATIONS.lock();
211 registers
212 .get(idx)
213 .cloned()
214 .ok_or_else(|| "no such registration".to_string())
215 }
216
217 async fn convert_legacy_account(
218 &self,
219 username: String,
220 password: String,
221 ) -> Result<String, String> {
222 Ok(broker_client(&self.ctx)
223 .map_err(|e| format!("{:?}", e))?
224 .upgrade_to_secret(geph5_broker_protocol::Credential::LegacyUsernamePassword {
225 username,
226 password,
227 })
228 .await
229 .map_err(|e| format!("{:?}", e))?
230 .map_err(|e| format!("{:?}", e))?)
231 }
232
233 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
234 Ok(vec![1.0, 2.0, 3.0])
235 }
236
237 async fn exit_list(&self) -> Result<Vec<ExitDescriptor>, String> {
238 let resp = broker_client(&self.ctx)
239 .map_err(|e| format!("{:?}", e))?
240 .get_exits()
241 .await
242 .map_err(|e| format!("{:?}", e))?
243 .map_err(|e| format!("{:?}", e))?;
244 Ok(resp.inner.all_exits.iter().map(|s| s.1.clone()).collect())
245 }
246
247 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
248 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
249 Ok(client
250 .get_news(lang)
251 .await
252 .map_err(|s| s.to_string())?
253 .map_err(|s| s.to_string())?)
254 }
255
256 async fn price_points(&self) -> Result<Vec<(u32, f64)>, String> {
257 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
258 Ok(client
259 .raw_price_points()
260 .await
261 .map_err(|s| s.to_string())?
262 .map_err(|s| s.to_string())?
263 .into_iter()
264 .map(|(a, b)| (a, b as f64 / 100.0))
265 .collect())
266 }
267
268 async fn create_payment(
269 &self,
270 auth_token: String,
271 days: u32,
272 method: String,
273 ) -> Result<String, String> {
274 let client = broker_client(&self.ctx).map_err(|e| format!("{:?}", e))?;
275 Ok(client
276 .create_payment(auth_token, days, method)
277 .await
278 .map_err(|s| s.to_string())?
279 .map_err(|s| s.to_string())?)
280 }
281}
282
283pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
284
285#[async_trait]
286impl RpcTransport for DummyControlProtocolTransport {
287 type Error = Infallible;
288
289 async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
290 Ok(self.0.respond_raw(req).await)
291 }
292}