1use crate::errors::HingeError;
2use crate::models::SendbirdSyevEvent;
3use serde::{Deserialize, Serialize};
4use tokio::sync::{broadcast, mpsc};
5
6#[cfg_attr(feature = "json-schema", derive(schemars::JsonSchema))]
7#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
8#[serde(rename_all = "camelCase", tag = "kind")]
9pub enum SendbirdWsEvent {
10 SessionKey {
11 key: String,
12 },
13 Read {
14 #[serde(default)]
15 req_id: Option<String>,
16 payload: serde_json::Value,
17 },
18 Typing {
19 event: SendbirdSyevEvent,
20 },
21 Ping {
22 payload: serde_json::Value,
23 },
24 Pong {
25 payload: serde_json::Value,
26 },
27 Close {
28 code: Option<u16>,
29 reason: Option<String>,
30 },
31 Raw {
32 frame: String,
33 },
34}
35
36pub struct SendbirdWsSubscription {
37 commands: mpsc::UnboundedSender<String>,
38 raw: broadcast::Receiver<String>,
39}
40
41impl SendbirdWsSubscription {
42 pub fn new(commands: mpsc::UnboundedSender<String>, raw: broadcast::Receiver<String>) -> Self {
43 Self { commands, raw }
44 }
45
46 pub fn commands(&self) -> mpsc::UnboundedSender<String> {
47 self.commands.clone()
48 }
49
50 pub fn resubscribe_raw(&self) -> broadcast::Receiver<String> {
51 self.raw.resubscribe()
52 }
53
54 pub async fn recv(&mut self) -> Result<SendbirdWsEvent, HingeError> {
55 let frame = self.recv_raw().await?;
56 parse_sendbird_ws_frame(&frame)
57 }
58
59 pub async fn recv_raw(&mut self) -> Result<String, HingeError> {
60 self.raw
61 .recv()
62 .await
63 .map_err(|err| HingeError::Http(format!("sendbird ws receive failed: {err}")))
64 }
65}
66
67pub fn parse_sendbird_ws_frame(frame: &str) -> Result<SendbirdWsEvent, HingeError> {
68 if let Some(rest) = frame.strip_prefix("__SESSION_KEY__:") {
69 return Ok(SendbirdWsEvent::SessionKey {
70 key: rest.to_string(),
71 });
72 }
73
74 if let Some(rest) = frame.strip_prefix("__SYEV__:") {
75 let event = serde_json::from_str::<SendbirdSyevEvent>(rest)
76 .map_err(|err| HingeError::Serde(err.to_string()))?;
77 return Ok(SendbirdWsEvent::Typing { event });
78 }
79
80 if let Some(rest) = frame.strip_prefix("__CLOSE__:") {
81 let mut parts = rest.splitn(2, ':');
82 let code = parts.next().and_then(|part| part.parse::<u16>().ok());
83 let reason = parts
84 .next()
85 .filter(|part| !part.is_empty())
86 .map(ToOwned::to_owned);
87 return Ok(SendbirdWsEvent::Close { code, reason });
88 }
89
90 parse_prefixed_json(frame)
91}
92
93pub(crate) fn sendbird_logi_session_key(payload: &serde_json::Value) -> Option<&str> {
94 payload
95 .get("key")
96 .or_else(|| payload.get("session_key"))
97 .and_then(|value| value.as_str())
98}
99
100fn parse_prefixed_json(frame: &str) -> Result<SendbirdWsEvent, HingeError> {
101 let Some(start) = frame.find('{') else {
102 return Ok(SendbirdWsEvent::Raw {
103 frame: frame.to_string(),
104 });
105 };
106 let (prefix, json) = frame.split_at(start);
107 let payload = serde_json::from_str::<serde_json::Value>(json)
108 .map_err(|err| HingeError::Serde(err.to_string()))?;
109
110 match prefix {
111 "LOGI" => Ok(SendbirdWsEvent::SessionKey {
112 key: sendbird_logi_session_key(&payload)
113 .unwrap_or_default()
114 .to_string(),
115 }),
116 "READ" => Ok(SendbirdWsEvent::Read {
117 req_id: payload
118 .get("req_id")
119 .and_then(|value| value.as_str())
120 .map(ToOwned::to_owned),
121 payload,
122 }),
123 "SYEV" => {
124 let event = serde_json::from_value::<SendbirdSyevEvent>(payload)
125 .map_err(|err| HingeError::Serde(err.to_string()))?;
126 Ok(SendbirdWsEvent::Typing { event })
127 }
128 "PING" => Ok(SendbirdWsEvent::Ping { payload }),
129 "PONG" => Ok(SendbirdWsEvent::Pong { payload }),
130 _ => Ok(SendbirdWsEvent::Raw {
131 frame: frame.to_string(),
132 }),
133 }
134}
135
#[cfg(test)]
mod tests {
    use super::*;

    /// A `LOGI` frame exposes its `key` field as the session key.
    #[test]
    fn parses_logi_session_key() {
        let frame = r#"LOGI{"key":"session-key","user_id":"u1"}"#;
        let expected = SendbirdWsEvent::SessionKey {
            key: "session-key".to_string(),
        };

        let event = parse_sendbird_ws_frame(frame).expect("LOGI frame should parse");

        assert_eq!(event, expected);
    }

    /// A `LOGI` frame may carry the key under `session_key` instead.
    #[test]
    fn parses_logi_session_key_alias() {
        let frame = r#"LOGI{"session_key":"session-key","user_id":"u1"}"#;
        let expected = SendbirdWsEvent::SessionKey {
            key: "session-key".to_string(),
        };

        let event = parse_sendbird_ws_frame(frame).expect("LOGI frame should parse");

        assert_eq!(event, expected);
    }

    /// A `READ` frame keeps its full payload and surfaces `req_id`.
    #[test]
    fn parses_read_ack() {
        let frame =
            r#"READ{"req_id":"r1","channel_id":1,"channel_url":"c","channel_type":"group"}"#;

        match parse_sendbird_ws_frame(frame).expect("READ frame should parse") {
            SendbirdWsEvent::Read { req_id, payload } => {
                assert_eq!(req_id.as_deref(), Some("r1"));
                assert_eq!(payload["channel_url"], "c");
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// A `SYEV` frame is decoded into a typed event.
    #[test]
    fn parses_typing_event() {
        let frame = r#"SYEV{"cat":10900,"channel_url":"c","channel_type":"group","ts":1,"sts":1}"#;

        match parse_sendbird_ws_frame(frame).expect("SYEV frame should parse") {
            SendbirdWsEvent::Typing { event } => {
                assert_eq!(event.cat, SendbirdSyevEvent::CATEGORY_TYPING_START);
                assert_eq!(event.channel_url, "c");
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// `PING`, `PONG` and the close sentinel each map to their own variant.
    #[test]
    fn parses_ping_pong_and_close() {
        let ping_event =
            parse_sendbird_ws_frame(r#"PING{"req_id":"p1"}"#).expect("PING frame should parse");
        assert!(matches!(ping_event, SendbirdWsEvent::Ping { .. }));

        let pong_event =
            parse_sendbird_ws_frame(r#"PONG{"req_id":"p1"}"#).expect("PONG frame should parse");
        assert!(matches!(pong_event, SendbirdWsEvent::Pong { .. }));

        let expected_close = SendbirdWsEvent::Close {
            code: Some(1000),
            reason: Some("done".to_string()),
        };
        let close_event =
            parse_sendbird_ws_frame("__CLOSE__:1000:done").expect("close sentinel should parse");
        assert_eq!(close_event, expected_close);
    }

    /// A frame with an unrecognised command is passed through untouched.
    #[test]
    fn keeps_unknown_frames_raw() {
        let frame = "NOPE{}";
        let expected = SendbirdWsEvent::Raw {
            frame: frame.to_string(),
        };

        let event = parse_sendbird_ws_frame(frame).expect("unknown frame should parse");

        assert_eq!(event, expected);
    }
}