feature_probe_server/
realtime.rs

1use std::pin::Pin;
2use std::sync::Arc;
3
4use futures::{Future, FutureExt};
5use serde_json::Value;
6use socketio_rs::{Payload, Server, ServerBuilder, ServerSocket};
7use tracing::{info, trace, warn};
8
/// Heap-allocated, pinned, `Send` future that resolves to `()`.
///
/// This is the type `RealtimeSocket::register` hands back for each incoming event.
type SocketCallback = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
10
11#[derive(Clone)]
12pub struct RealtimeSocket {
13    server: Arc<Server>,
14    port: u16,
15    // this path shoule be the same as gateway
16    // if nginx forward to {host}/{path}, so the PATH should be {path}
17    path: Arc<String>,
18}
19
20impl RealtimeSocket {
21    pub fn serve(port: u16, path: &str) -> Self {
22        info!("serve_socektio on port {}", port);
23        let callback =
24            |payload: Option<Payload>, socket: ServerSocket, _| Self::register(payload, socket);
25
26        let server = ServerBuilder::new(port)
27            .on(path, "register", callback)
28            .build();
29
30        let server_clone = server.clone();
31
32        tokio::spawn(async move {
33            server_clone.serve().await;
34        });
35
36        let path = Arc::new(path.to_owned());
37
38        Self { server, port, path }
39    }
40
41    pub async fn notify_sdk(
42        &self,
43        server_sdk_key: String,
44        client_sdk_key: Option<String>,
45        event: &str,
46        data: serde_json::Value,
47    ) {
48        trace!(
49            "notify_sdk {} {:?} {} {:?}",
50            server_sdk_key,
51            client_sdk_key,
52            event,
53            data
54        );
55
56        let mut keys: Vec<&str> = vec![&server_sdk_key];
57        if let Some(client_sdk_key) = &client_sdk_key {
58            keys.push(client_sdk_key);
59        }
60
61        self.server.emit_to(&self.path, keys, event, data).await
62    }
63
64    fn register(payload: Option<Payload>, socket: ServerSocket) -> SocketCallback {
65        async move {
66            info!("socketio recv {:?}", payload);
67            if let Some(Payload::Json(value)) = payload {
68                match value.get("key") {
69                    Some(Value::String(sdk_key)) => socket.join(vec![sdk_key]).await,
70                    _ => {
71                        warn!("unkown register payload")
72                    }
73                }
74            }
75
76            let _ = socket.emit("update", serde_json::json!("")).await;
77        }
78        .boxed()
79    }
80}
81
82impl std::fmt::Debug for RealtimeSocket {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        f.debug_tuple("RealtimeSocket").field(&self.port).finish()
85    }
86}