1use futures_util::{SinkExt, StreamExt};
2use tokio_tungstenite::{connect_async, tungstenite::Message};
3
4use crate::error::{KickApiError, Result};
5use crate::models::live_chat::{LiveChatMessage, PusherEvent, PusherMessage};
6
7const PUSHER_URL: &str = "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=8.4.0&flash=false";
8
9type WsStream = tokio_tungstenite::WebSocketStream<
10 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
11>;
12
13pub struct LiveChatClient {
35 ws: WsStream,
36}
37
38impl std::fmt::Debug for LiveChatClient {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("LiveChatClient").finish_non_exhaustive()
41 }
42}
43
44impl LiveChatClient {
45 pub async fn connect(chatroom_id: u64) -> Result<Self> {
54 let channel = format!("chatrooms.{chatroom_id}.v2");
55
56 let (mut ws, _) = connect_async(PUSHER_URL)
57 .await
58 .map_err(KickApiError::WebSocketError)?;
59
60 wait_for_event(&mut ws, "pusher:connection_established").await?;
62
63 let subscribe = serde_json::json!({
65 "event": "pusher:subscribe",
66 "data": {
67 "auth": "",
68 "channel": channel,
69 }
70 });
71 ws.send(Message::Text(subscribe.to_string().into()))
72 .await
73 .map_err(KickApiError::WebSocketError)?;
74
75 wait_for_event(&mut ws, "pusher_internal:subscription_succeeded").await?;
77
78 Ok(Self { ws })
79 }
80
81 pub async fn next_event(&mut self) -> Result<Option<PusherEvent>> {
87 loop {
88 let Some(frame) = self.ws.next().await else {
89 return Ok(None);
90 };
91
92 let frame = frame.map_err(KickApiError::WebSocketError)?;
93
94 let text = match frame {
95 Message::Text(t) => t,
96 Message::Close(_) => return Ok(None),
97 Message::Ping(data) => {
98 self.ws
99 .send(Message::Pong(data))
100 .await
101 .map_err(KickApiError::WebSocketError)?;
102 continue;
103 }
104 _ => continue,
105 };
106
107 let pusher_msg: PusherMessage = match serde_json::from_str(&text) {
108 Ok(m) => m,
109 Err(_) => continue,
110 };
111
112 if pusher_msg.event == "pusher:ping" {
114 let pong = serde_json::json!({ "event": "pusher:pong", "data": {} });
115 self.ws
116 .send(Message::Text(pong.to_string().into()))
117 .await
118 .map_err(KickApiError::WebSocketError)?;
119 continue;
120 }
121
122 if pusher_msg.event.starts_with("pusher:")
124 || pusher_msg.event.starts_with("pusher_internal:")
125 {
126 continue;
127 }
128
129 return Ok(Some(PusherEvent {
130 event: pusher_msg.event,
131 channel: pusher_msg.channel,
132 data: pusher_msg.data,
133 }));
134 }
135 }
136
137 pub async fn next_message(&mut self) -> Result<Option<LiveChatMessage>> {
143 loop {
144 let Some(event) = self.next_event().await? else {
145 return Ok(None);
146 };
147
148 if event.event != "App\\Events\\ChatMessageEvent" {
149 continue;
150 }
151
152 let msg: LiveChatMessage = match serde_json::from_str(&event.data) {
154 Ok(m) => m,
155 Err(_) => continue,
156 };
157
158 return Ok(Some(msg));
159 }
160 }
161
162 pub async fn send_ping(&mut self) -> Result<()> {
164 let ping = serde_json::json!({ "event": "pusher:ping", "data": {} });
165 self.ws
166 .send(Message::Text(ping.to_string().into()))
167 .await
168 .map_err(KickApiError::WebSocketError)?;
169 Ok(())
170 }
171
172 pub async fn close(&mut self) -> Result<()> {
174 self.ws
175 .close(None)
176 .await
177 .map_err(KickApiError::WebSocketError)?;
178 Ok(())
179 }
180}
181
182async fn wait_for_event(ws: &mut WsStream, event_name: &str) -> Result<()> {
184 loop {
185 let Some(frame) = ws.next().await else {
186 return Err(KickApiError::UnexpectedError(format!(
187 "Connection closed while waiting for '{event_name}'"
188 )));
189 };
190
191 let frame = frame.map_err(KickApiError::WebSocketError)?;
192
193 let text = match frame {
194 Message::Text(t) => t,
195 Message::Ping(data) => {
196 ws.send(Message::Pong(data))
197 .await
198 .map_err(KickApiError::WebSocketError)?;
199 continue;
200 }
201 _ => continue,
202 };
203
204 let msg: PusherMessage = match serde_json::from_str(&text) {
205 Ok(m) => m,
206 Err(_) => continue,
207 };
208
209 if msg.event == event_name {
210 return Ok(());
211 }
212 }
213}