feature_probe_server/
realtime.rs1use std::pin::Pin;
2use std::sync::Arc;
3
4use futures::{Future, FutureExt};
5use serde_json::Value;
6use socketio_rs::{Payload, Server, ServerBuilder, ServerSocket};
7use tracing::{info, trace, warn};
8
/// Boxed, pinned, `Send` future resolving to `()`.
///
/// This is the return type of `RealtimeSocket::register`, the handler that is
/// wired to the `"register"` event when the server is built.
type SocketCallback = Pin<Box<dyn Future<Output = ()> + Send>>;
10
11#[derive(Clone)]
12pub struct RealtimeSocket {
13 server: Arc<Server>,
14 port: u16,
15 path: Arc<String>,
18}
19
20impl RealtimeSocket {
21 pub fn serve(port: u16, path: &str) -> Self {
22 info!("serve_socektio on port {}", port);
23 let callback =
24 |payload: Option<Payload>, socket: ServerSocket, _| Self::register(payload, socket);
25
26 let server = ServerBuilder::new(port)
27 .on(path, "register", callback)
28 .build();
29
30 let server_clone = server.clone();
31
32 tokio::spawn(async move {
33 server_clone.serve().await;
34 });
35
36 let path = Arc::new(path.to_owned());
37
38 Self { server, port, path }
39 }
40
41 pub async fn notify_sdk(
42 &self,
43 server_sdk_key: String,
44 client_sdk_key: Option<String>,
45 event: &str,
46 data: serde_json::Value,
47 ) {
48 trace!(
49 "notify_sdk {} {:?} {} {:?}",
50 server_sdk_key,
51 client_sdk_key,
52 event,
53 data
54 );
55
56 let mut keys: Vec<&str> = vec![&server_sdk_key];
57 if let Some(client_sdk_key) = &client_sdk_key {
58 keys.push(client_sdk_key);
59 }
60
61 self.server.emit_to(&self.path, keys, event, data).await
62 }
63
64 fn register(payload: Option<Payload>, socket: ServerSocket) -> SocketCallback {
65 async move {
66 info!("socketio recv {:?}", payload);
67 if let Some(Payload::Json(value)) = payload {
68 match value.get("key") {
69 Some(Value::String(sdk_key)) => socket.join(vec![sdk_key]).await,
70 _ => {
71 warn!("unkown register payload")
72 }
73 }
74 }
75
76 let _ = socket.emit("update", serde_json::json!("")).await;
77 }
78 .boxed()
79 }
80}
81
82impl std::fmt::Debug for RealtimeSocket {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 f.debug_tuple("RealtimeSocket").field(&self.port).finish()
85 }
86}