Skip to main content

hinge_rs/
ws.rs

1use crate::errors::HingeError;
2use crate::models::SendbirdSyevEvent;
3use serde::{Deserialize, Serialize};
4use tokio::sync::{broadcast, mpsc};
5
6#[cfg_attr(feature = "json-schema", derive(schemars::JsonSchema))]
7#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
8#[serde(rename_all = "camelCase", tag = "kind")]
9pub enum SendbirdWsEvent {
10    SessionKey {
11        key: String,
12    },
13    Read {
14        #[serde(default)]
15        req_id: Option<String>,
16        payload: serde_json::Value,
17    },
18    Typing {
19        event: SendbirdSyevEvent,
20    },
21    Ping {
22        payload: serde_json::Value,
23    },
24    Pong {
25        payload: serde_json::Value,
26    },
27    Close {
28        code: Option<u16>,
29        reason: Option<String>,
30    },
31    Raw {
32        frame: String,
33    },
34}
35
36pub struct SendbirdWsSubscription {
37    commands: mpsc::UnboundedSender<String>,
38    raw: broadcast::Receiver<String>,
39}
40
41impl SendbirdWsSubscription {
42    pub fn new(commands: mpsc::UnboundedSender<String>, raw: broadcast::Receiver<String>) -> Self {
43        Self { commands, raw }
44    }
45
46    pub fn commands(&self) -> mpsc::UnboundedSender<String> {
47        self.commands.clone()
48    }
49
50    pub fn resubscribe_raw(&self) -> broadcast::Receiver<String> {
51        self.raw.resubscribe()
52    }
53
54    pub async fn recv(&mut self) -> Result<SendbirdWsEvent, HingeError> {
55        let frame = self.recv_raw().await?;
56        parse_sendbird_ws_frame(&frame)
57    }
58
59    pub async fn recv_raw(&mut self) -> Result<String, HingeError> {
60        self.raw
61            .recv()
62            .await
63            .map_err(|err| HingeError::Http(format!("sendbird ws receive failed: {err}")))
64    }
65}
66
67pub fn parse_sendbird_ws_frame(frame: &str) -> Result<SendbirdWsEvent, HingeError> {
68    if let Some(rest) = frame.strip_prefix("__SESSION_KEY__:") {
69        return Ok(SendbirdWsEvent::SessionKey {
70            key: rest.to_string(),
71        });
72    }
73
74    if let Some(rest) = frame.strip_prefix("__SYEV__:") {
75        let event = serde_json::from_str::<SendbirdSyevEvent>(rest)
76            .map_err(|err| HingeError::Serde(err.to_string()))?;
77        return Ok(SendbirdWsEvent::Typing { event });
78    }
79
80    if let Some(rest) = frame.strip_prefix("__CLOSE__:") {
81        let mut parts = rest.splitn(2, ':');
82        let code = parts.next().and_then(|part| part.parse::<u16>().ok());
83        let reason = parts
84            .next()
85            .filter(|part| !part.is_empty())
86            .map(ToOwned::to_owned);
87        return Ok(SendbirdWsEvent::Close { code, reason });
88    }
89
90    parse_prefixed_json(frame)
91}
92
93pub(crate) fn sendbird_logi_session_key(payload: &serde_json::Value) -> Option<&str> {
94    payload
95        .get("key")
96        .or_else(|| payload.get("session_key"))
97        .and_then(|value| value.as_str())
98}
99
100fn parse_prefixed_json(frame: &str) -> Result<SendbirdWsEvent, HingeError> {
101    let Some(start) = frame.find('{') else {
102        return Ok(SendbirdWsEvent::Raw {
103            frame: frame.to_string(),
104        });
105    };
106    let (prefix, json) = frame.split_at(start);
107    let payload = serde_json::from_str::<serde_json::Value>(json)
108        .map_err(|err| HingeError::Serde(err.to_string()))?;
109
110    match prefix {
111        "LOGI" => Ok(SendbirdWsEvent::SessionKey {
112            key: sendbird_logi_session_key(&payload)
113                .unwrap_or_default()
114                .to_string(),
115        }),
116        "READ" => Ok(SendbirdWsEvent::Read {
117            req_id: payload
118                .get("req_id")
119                .and_then(|value| value.as_str())
120                .map(ToOwned::to_owned),
121            payload,
122        }),
123        "SYEV" => {
124            let event = serde_json::from_value::<SendbirdSyevEvent>(payload)
125                .map_err(|err| HingeError::Serde(err.to_string()))?;
126            Ok(SendbirdWsEvent::Typing { event })
127        }
128        "PING" => Ok(SendbirdWsEvent::Ping { payload }),
129        "PONG" => Ok(SendbirdWsEvent::Pong { payload }),
130        _ => Ok(SendbirdWsEvent::Raw {
131            frame: frame.to_string(),
132        }),
133    }
134}
135
136#[cfg(test)]
137mod tests {
138    use super::*;
139
140    #[test]
141    fn parses_logi_session_key() {
142        let event = parse_sendbird_ws_frame(r#"LOGI{"key":"session-key","user_id":"u1"}"#)
143            .expect("LOGI frame should parse");
144        assert_eq!(
145            event,
146            SendbirdWsEvent::SessionKey {
147                key: "session-key".to_string()
148            }
149        );
150    }
151
152    #[test]
153    fn parses_logi_session_key_alias() {
154        let event = parse_sendbird_ws_frame(r#"LOGI{"session_key":"session-key","user_id":"u1"}"#)
155            .expect("LOGI frame should parse");
156        assert_eq!(
157            event,
158            SendbirdWsEvent::SessionKey {
159                key: "session-key".to_string()
160            }
161        );
162    }
163
164    #[test]
165    fn parses_read_ack() {
166        let event = parse_sendbird_ws_frame(
167            r#"READ{"req_id":"r1","channel_id":1,"channel_url":"c","channel_type":"group"}"#,
168        )
169        .expect("READ frame should parse");
170        match event {
171            SendbirdWsEvent::Read { req_id, payload } => {
172                assert_eq!(req_id.as_deref(), Some("r1"));
173                assert_eq!(payload["channel_url"], "c");
174            }
175            other => panic!("unexpected event: {other:?}"),
176        }
177    }
178
179    #[test]
180    fn parses_typing_event() {
181        let event = parse_sendbird_ws_frame(
182            r#"SYEV{"cat":10900,"channel_url":"c","channel_type":"group","ts":1,"sts":1}"#,
183        )
184        .expect("SYEV frame should parse");
185        match event {
186            SendbirdWsEvent::Typing { event } => {
187                assert_eq!(event.cat, SendbirdSyevEvent::CATEGORY_TYPING_START);
188                assert_eq!(event.channel_url, "c");
189            }
190            other => panic!("unexpected event: {other:?}"),
191        }
192    }
193
194    #[test]
195    fn parses_ping_pong_and_close() {
196        let ping =
197            parse_sendbird_ws_frame(r#"PING{"req_id":"p1"}"#).expect("PING frame should parse");
198        assert!(matches!(ping, SendbirdWsEvent::Ping { .. }));
199
200        let pong =
201            parse_sendbird_ws_frame(r#"PONG{"req_id":"p1"}"#).expect("PONG frame should parse");
202        assert!(matches!(pong, SendbirdWsEvent::Pong { .. }));
203
204        let close =
205            parse_sendbird_ws_frame("__CLOSE__:1000:done").expect("close sentinel should parse");
206        assert_eq!(
207            close,
208            SendbirdWsEvent::Close {
209                code: Some(1000),
210                reason: Some("done".to_string())
211            }
212        );
213    }
214
215    #[test]
216    fn keeps_unknown_frames_raw() {
217        let event = parse_sendbird_ws_frame("NOPE{}").expect("unknown frame should parse");
218        assert_eq!(
219            event,
220            SendbirdWsEvent::Raw {
221                frame: "NOPE{}".to_string()
222            }
223        );
224    }
225}