geph5_client/
control_prot.rs1use std::{
2 convert::Infallible,
3 time::{Duration, SystemTime},
4};
5
6use anyctx::AnyCtx;
7use async_trait::async_trait;
8use geph5_broker_protocol::ExitDescriptor;
9
10use itertools::Itertools;
11use nanorpc::{nanorpc_derive, JrpcRequest, JrpcResponse, RpcService, RpcTransport};
12use parking_lot::Mutex;
13use serde::{Deserialize, Serialize};
14
15use crate::{client::CtxField, logs::LOGS, stats::stat_get_num, Config};
16
17#[nanorpc_derive]
18#[async_trait]
19pub trait ControlProtocol {
20 async fn conn_info(&self) -> ConnInfo;
21 async fn stat_num(&self, stat: String) -> f64;
22 async fn start_time(&self) -> SystemTime;
23 async fn stop(&self);
24
25 async fn recent_logs(&self) -> Vec<String>;
26}
27
28#[derive(Serialize, Deserialize, Clone, Debug)]
29#[serde(tag = "state")]
30pub enum ConnInfo {
31 Connecting,
32 Connected(ConnectedInfo),
33}
34
35#[derive(Serialize, Deserialize, Clone, Debug)]
36pub struct ConnectedInfo {
37 pub protocol: String,
38 pub bridge: String,
39
40 pub exit: ExitDescriptor,
41}
42
43pub struct ControlProtocolImpl {
44 pub ctx: AnyCtx<Config>,
45}
46
47pub static CURRENT_CONN_INFO: CtxField<Mutex<ConnInfo>> = |_| Mutex::new(ConnInfo::Connecting);
48
49#[async_trait]
50impl ControlProtocol for ControlProtocolImpl {
51 async fn conn_info(&self) -> ConnInfo {
52 self.ctx.get(CURRENT_CONN_INFO).lock().clone()
53 }
54
55 async fn stat_num(&self, stat: String) -> f64 {
56 stat_get_num(&self.ctx, &stat)
57 }
58
59 async fn start_time(&self) -> SystemTime {
60 static START_TIME: CtxField<SystemTime> = |_| SystemTime::now();
61 *self.ctx.get(START_TIME)
62 }
63
64 async fn stop(&self) {
65 smolscale::spawn(async move {
66 smol::Timer::after(Duration::from_millis(100)).await;
67 std::process::exit(0);
68 })
69 .detach();
70 }
71
72 async fn recent_logs(&self) -> Vec<String> {
73 let logs = LOGS.lock();
74 String::from_utf8_lossy(&logs)
75 .split('\n')
76 .map(|s| s.to_string())
77 .collect_vec()
78 }
79}
80
81pub struct DummyControlProtocolTransport(pub ControlService<ControlProtocolImpl>);
82
83#[async_trait]
84impl RpcTransport for DummyControlProtocolTransport {
85 type Error = Infallible;
86
87 async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
88 Ok(self.0.respond_raw(req).await)
89 }
90}