Skip to main content

geph5_client/
control_prot.rs

1use std::{
2    convert::Infallible,
3    sync::LazyLock,
4    time::{Duration, SystemTime},
5};
6
7use anyctx::AnyCtx;
8use arrayref::array_ref;
9use async_trait::async_trait;
10use chrono::{NaiveDate, NaiveDateTime};
11use geph5_broker_protocol::{NetStatus, puzzle::solve_puzzle};
12
13use geph5_misc_rpc::client_control::{
14    ConnInfo, ConnectedInfo, ControlProtocol, ControlService, NewsItem, RegistrationProgress,
15};
16use nanorpc::{JrpcId, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
17use parking_lot::Mutex;
18
19use serde_json::{Value, json};
20use slab::Slab;
21
22use crate::{
23    Config, broker::get_net_status, broker_client, client::CtxField, logging::get_json_logs,
24    stats::stat_get_num, traffcount::TRAFF_COUNT, updates::get_update_manifest,
25};
26
27pub struct ControlProtocolImpl {
28    pub ctx: AnyCtx<Config>,
29}
30
31pub static CURRENT_CONNECTED_INFOS: CtxField<Mutex<Slab<ConnectedInfo>>> =
32    |_| Mutex::new(Slab::new());
33
34static REGISTRATIONS: LazyLock<Mutex<Slab<RegistrationProgress>>> =
35    LazyLock::new(|| Mutex::new(Slab::new()));
36
37#[async_trait]
38impl ControlProtocol for ControlProtocolImpl {
39    async fn ab_test(&self, key: String, secret: String) -> Result<bool, String> {
40        let id = blake3::hash(secret.as_bytes());
41        let id = u64::from_be_bytes(*array_ref![id.as_bytes(), 0, 8]);
42        let res = self
43            .broker_rpc("opaque_abtest".into(), vec![json!(key), json!(id)])
44            .await?;
45        serde_json::from_value(res).map_err(|e| e.to_string())
46    }
47
48    async fn conn_info(&self) -> ConnInfo {
49        if self.ctx.init().dry_run {
50            return ConnInfo::Disconnected;
51        }
52        let connected_infos = self
53            .ctx
54            .get(CURRENT_CONNECTED_INFOS)
55            .lock()
56            .iter()
57            .map(|(_, info)| info.clone())
58            .collect::<Vec<_>>();
59        if connected_infos.is_empty() {
60            ConnInfo::Connecting
61        } else {
62            ConnInfo::Connected {
63                sessions: connected_infos,
64            }
65        }
66    }
67
68    async fn stat_num(&self, stat: String) -> f64 {
69        stat_get_num(&self.ctx, &stat)
70    }
71
72    async fn start_time(&self) -> SystemTime {
73        static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
74        *self.ctx.get(START_TIME)
75    }
76
77    async fn stop(&self) {
78        std::thread::spawn(move || {
79            std::thread::sleep(Duration::from_millis(100));
80            std::process::exit(0);
81        });
82    }
83
84    async fn recent_logs(&self) -> Vec<String> {
85        get_json_logs(&self.ctx)
86            .await
87            .split("\n")
88            .map(|s| s.to_string())
89            .collect()
90    }
91
92    async fn broker_rpc(&self, method: String, params: Vec<Value>) -> Result<Value, String> {
93        let jrpc = JrpcRequest {
94            jsonrpc: "2.0".into(),
95            method,
96            params,
97            id: JrpcId::String(format!("req-{}", rand::random::<u128>())),
98        };
99        let resp = broker_client(&self.ctx)
100            .map_err(|e| format!("{:?}", e))?
101            .0
102            .call_raw(jrpc)
103            .await
104            .map_err(|e| format!("{:?}", e))?;
105        if let Some(err) = resp.error {
106            Err(err.message)
107        } else {
108            Ok(resp.result.unwrap_or(Value::Null))
109        }
110    }
111
112    async fn start_registration(&self) -> Result<usize, String> {
113        let (puzzle, difficulty) = broker_client(&self.ctx)
114            .map_err(|e| format!("{:?}", e))?
115            .get_puzzle()
116            .await
117            .map_err(|e| format!("{:?}", e))?;
118        tracing::debug!(puzzle, difficulty, "got puzzle");
119        let idx = REGISTRATIONS.lock().insert(RegistrationProgress {
120            progress: 0.0,
121            secret: None,
122        });
123        let ctx = self.ctx.clone();
124        smolscale::spawn(async move {
125            loop {
126                let fallible = async {
127                    let solution = {
128                        let puzzle = puzzle.clone();
129                        smol::unblock(move || {
130                            solve_puzzle(&puzzle, difficulty, |progress| {
131                                REGISTRATIONS.lock()[idx] = RegistrationProgress {
132                                    progress,
133                                    secret: None,
134                                }
135                            })
136                        })
137                        .await
138                    };
139                    let secret = broker_client(&ctx)?
140                        .register_user_secret(puzzle.clone(), solution)
141                        .await?
142                        .map_err(|e| anyhow::anyhow!(e))?;
143                    REGISTRATIONS.lock()[idx] = RegistrationProgress {
144                        progress: 1.0,
145                        secret: Some(secret.clone()),
146                    };
147                    anyhow::Ok(secret)
148                };
149                if let Err(err) = fallible.await {
150                    tracing::warn!(err = debug(err), "restarting registration")
151                } else {
152                    break;
153                }
154            }
155        })
156        .detach();
157        Ok(idx)
158    }
159
160    async fn poll_registration(&self, idx: usize) -> Result<RegistrationProgress, String> {
161        tracing::debug!(idx, "polling registration");
162        let registers = REGISTRATIONS.lock();
163        registers
164            .get(idx)
165            .cloned()
166            .ok_or_else(|| "no such registration".to_string())
167    }
168
169    async fn stat_history(&self, stat: String) -> Result<Vec<f64>, String> {
170        if stat != "traffic" {
171            return Err(format!("bad: {stat}"));
172        }
173        Ok(self.ctx.get(TRAFF_COUNT).read().unwrap().speed_history())
174    }
175
176    async fn net_status(&self) -> Result<NetStatus, String> {
177        let resp = get_net_status(&self.ctx)
178            .await
179            .map_err(|e| format!("{:?}", e))?;
180        Ok(resp)
181    }
182
183    async fn latest_news(&self, lang: String) -> Result<Vec<NewsItem>, String> {
184        let (manifest, _) = get_update_manifest().await.map_err(|e| e.to_string())?;
185        let news = manifest["news"]
186            .as_array()
187            .ok_or_else(|| "No news array".to_string())?;
188
189        let mut out = Vec::new();
190
191        for item in news {
192            let important = item["important"].as_bool().unwrap_or_default();
193            let date_str = item["date"]
194                .as_str()
195                .ok_or_else(|| "No or invalid 'date' field in news item".to_string())?;
196
197            let naive_date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d")
198                .map_err(|_| format!("Invalid date format: {}", date_str))?;
199
200            let naive_dt: NaiveDateTime = naive_date
201                .and_hms_opt(0, 0, 0)
202                .ok_or_else(|| "Unable to create NaiveDateTime from date".to_string())?;
203            let date_unix = naive_dt.and_utc().timestamp() as u64;
204
205            let localized = item[&lang]
206                .as_object()
207                .ok_or_else(|| format!("No localized data for language '{}'", lang))?;
208
209            let title = localized["title"]
210                .as_str()
211                .ok_or_else(|| "Missing 'title' in localized news data".to_string())?;
212            let contents = localized["contents"]
213                .as_str()
214                .ok_or_else(|| "Missing 'contents' in localized news data".to_string())?;
215
216            out.push(NewsItem {
217                title: title.to_string(),
218                date_unix,
219                contents: contents.to_string(),
220                important,
221            });
222        }
223
224        Ok(out)
225    }
226
227    async fn get_update_manifest(&self) -> Result<(serde_json::Value, String), String> {
228        get_update_manifest().await.map_err(|e| format!("{:?}", e))
229    }
230}
231
232pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
233
234#[async_trait]
235impl RpcTransport for DummyControlProtocolTransport {
236    type Error = Infallible;
237
238    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
239        Ok(self.0.respond_raw(req).await)
240    }
241}