1use std::{
2 convert::Infallible,
3 sync::LazyLock,
4 time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use arrayref::array_ref;
9use async_trait::async_trait;
10use chrono::{NaiveDate, NaiveDateTime};
11use geph5_broker_protocol::{NetStatus, puzzle::solve_puzzle};
12
13use geph5_misc_rpc::client_control::{
14 ConnInfo, ConnectedInfo, ControlProtocol, ControlService, NewsItem, RegistrationProgress,
15};
16use nanorpc::{JrpcId, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
17use parking_lot::Mutex;
18
19use serde_json::{Value, json};
20use slab::Slab;
21
22use crate::{
23 Config, broker::get_net_status, broker_client, client::CtxField, logging::get_json_logs,
24 stats::stat_get_num, traffcount::TRAFF_COUNT, updates::get_update_manifest,
25};
26
27pub struct ControlProtocolImpl {
28 pub ctx: AnyCtx<Config>,
29}
30
31pub static CURRENT_CONNECTED_INFOS: CtxField<Mutex<Slab<ConnectedInfo>>> =
32 |_| Mutex::new(Slab::new());
33
34static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
35 LazyLock::new(|| Mutex::new(Slab::new()));
36
37#[async_trait]
38impl ControlProtocol for ControlProtocolImpl {
39 async fn ab_test(&self, key: String, secret: String) -> Result<bool, String> {
40 let id = blake3::hash(secret.as_bytes());
41 let id = u64::from_be_bytes(*array_ref![id.as_bytes(), 0, 8]);
42 let res = self
43 .broker_rpc("opaque_abtest".into(), vec![json!(key), json!(id)])
44 .await?;
45 serde_json::from_value(res).map_err(|e| e.to_string())
46 }
47
48 async fn conn_info(&self) -> ConnInfo {
49 if self.ctx.init().dry_run {
50 return ConnInfo::Disconnected;
51 }
52 let connected_infos = self
53 .ctx
54 .get(CURRENT_CONNECTED_INFOS)
55 .lock()
56 .iter()
57 .map(|(_, info)| info.clone())
58 .collect::<Vec<_>>();
59 if connected_infos.is_empty() {
60 ConnInfo::Connecting
61 } else {
62 ConnInfo::Connected {
63 sessions: connected_infos,
64 }
65 }
66 }
67
68 async fn stat_num(&self, stat: String) -> f64 {
69 stat_get_num(&self.ctx, &stat)
70 }
71
72 async fn start_time(&self) -> SystemTime {
73 static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
74 *self.ctx.get(START_TIME)
75 }
76
77 async fn stop(&self) {
78 std::thread::spawn(move || {
79 std::thread::sleep(Duration::from_millis(100));
80 std::process::exit(0);
81 });
82 }
83
84 async fn recent_logs(&self) -> Vec<String> {
85 get_json_logs(&self.ctx)
86 .await
87 .split("\n")
88 .map(|s| s.to_string())
89 .collect()
90 }
91
92 async fn broker_rpc(&self, method: String, params: Vec<Value>) -> Result<Value, String> {
93 let jrpc = JrpcRequest {
94 jsonrpc: "2.0".into(),
95 method,
96 params,
97 id: JrpcId::String(format!("req-{}", rand::random::<u128>())),
98 };
99 let resp = broker_client(&self.ctx)
100 .map_err(|e| format!("{:?}", e))?
101 .0
102 .call_raw(jrpc)
103 .await
104 .map_err(|e| format!("{:?}", e))?;
105 if let Some(err) = resp.error {
106 Err(err.message)
107 } else {
108 Ok(resp.result.unwrap_or(Value::Null))
109 }
110 }
111
112 async fn start_registration(&self) -> Result<usize, String> {
113 let (puzzle, difficulty) = broker_client(&self.ctx)
114 .map_err(|e| format!("{:?}", e))?
115 .get_puzzle()
116 .await
117 .map_err(|e| format!("{:?}", e))?;
118 tracing::debug!(puzzle, difficulty, "got puzzle");
119 let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
120 progress: 0.0,
121 secret: None,
122 });
123 let ctx = self.ctx.clone();
124 smolscale::spawn(async move {
125 loop {
126 let fallible = async {
127 let solution = {
128 let puzzle = puzzle.clone();
129 smol::unblock(move || {
130 solve_puzzle(&puzzle, difficulty, |progress| {
131 REGISTRATIONS.lock()[idx] = RegistrationProgress {
132 progress,
133 secret: None,
134 }
135 })
136 })
137 .await
138 };
139 let secret = broker_client(&ctx)?
140 .register_user_secret(puzzle.clone(), solution)
141 .await?
142 .map_err(|e| anyhow::anyhow!(e))?;
143 REGISTRATIONS.lock()[idx] = RegistrationProgress {
144 progress: 1.0,
145 secret: Some(secret.clone()),
146 };
147 anyhow::Ok(secret)
148 };
149 if let Err(err) = fallible.await {
150 tracing::warn!(err = debug(err), "restarting registration")
151 } else {
152 break;
153 }
154 }
155 })
156 .detach();
157 Ok(idx)
158 }
159
160 async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
161 tracing::debug!(idx, "polling registration");
162 let registers = REGISTRATIONS.lock();
163 registers
164 .get(idx)
165 .cloned()
166 .ok_or_else(|| "no such registration".to_string())
167 }
168
169 async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
170 if stat != "traffic" {
171 return Err(format!("bad: {stat}"));
172 }
173 Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
174 }
175
176 async fn net_status(&self) -> Result<NetStatus, String> {
177 let resp = get_net_status(&self.ctx)
178 .await
179 .map_err(|e| format!("{:?}", e))?;
180 Ok(resp)
181 }
182
183 async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
184 let (manifest, _) = get_update_manifest().await.map_err(|e| e.to_string())?;
185 let news = manifest["news"]
186 .as_array()
187 .ok_or_else(|| "No news array".to_string())?;
188
189 let mut out = Vec::new();
190
191 for item in news {
192 let important = item["important"].as_bool().unwrap_or_default();
193 let date_str = item["date"]
194 .as_str()
195 .ok_or_else(|| "No or invalid 'date' field in news item".to_string())?;
196
197 let naive_date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d")
198 .map_err(|_| format!("Invalid date format: {}", date_str))?;
199
200 let naive_dt: NaiveDateTime = naive_date
201 .and_hms_opt(0, 0, 0)
202 .ok_or_else(|| "Unable to create NaiveDateTime from date".to_string())?;
203 let date_unix = naive_dt.and_utc().timestamp() as u64;
204
205 let localized = item[&lang]
206 .as_object()
207 .ok_or_else(|| format!("No localized data for language '{}'", lang))?;
208
209 let title = localized["title"]
210 .as_str()
211 .ok_or_else(|| "Missing 'title' in localized news data".to_string())?;
212 let contents = localized["contents"]
213 .as_str()
214 .ok_or_else(|| "Missing 'contents' in localized news data".to_string())?;
215
216 out.push(NewsItem {
217 title: title.to_string(),
218 date_unix,
219 contents: contents.to_string(),
220 important,
221 });
222 }
223
224 Ok(out)
225 }
226
227 async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String> {
228 get_update_manifest().await.map_err(|e| format!("{:?}", e))
229 }
230}
231
232pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
233
234#[async_trait]
235impl RpcTransport for DummyControlProtocolTransport {
236 type Error = Infallible;
237
238 async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
239 Ok(self.0.respond_raw(req).await)
240 }
241}