1use futures_util::{SinkExt, StreamExt};
2use tokio_tungstenite::{connect_async, tungstenite::Message};
3
4use crate::error::{KickApiError, Result};
5use crate::models::live_chat::{LiveChatMessage, PusherEvent, PusherMessage};
6
7const PUSHER_URL: &str = "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=8.4.0&flash=false";
8
9type WsStream = tokio_tungstenite::WebSocketStream<
10 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
11>;
12
13pub struct LiveChatClient {
35 ws: WsStream,
36}
37
38impl LiveChatClient {
39 pub async fn connect(chatroom_id: u64) -> Result<Self> {
48 let channel = format!("chatrooms.{chatroom_id}.v2");
49
50 let (mut ws, _) = connect_async(PUSHER_URL)
51 .await
52 .map_err(KickApiError::WebSocketError)?;
53
54 wait_for_event(&mut ws, "pusher:connection_established").await?;
56
57 let subscribe = serde_json::json!({
59 "event": "pusher:subscribe",
60 "data": {
61 "auth": "",
62 "channel": channel,
63 }
64 });
65 ws.send(Message::Text(subscribe.to_string().into()))
66 .await
67 .map_err(KickApiError::WebSocketError)?;
68
69 wait_for_event(&mut ws, "pusher_internal:subscription_succeeded").await?;
71
72 Ok(Self { ws })
73 }
74
75 pub async fn next_event(&mut self) -> Result<Option<PusherEvent>> {
81 loop {
82 let Some(frame) = self.ws.next().await else {
83 return Ok(None);
84 };
85
86 let frame = frame.map_err(KickApiError::WebSocketError)?;
87
88 let text = match frame {
89 Message::Text(t) => t,
90 Message::Close(_) => return Ok(None),
91 Message::Ping(data) => {
92 self.ws
93 .send(Message::Pong(data))
94 .await
95 .map_err(KickApiError::WebSocketError)?;
96 continue;
97 }
98 _ => continue,
99 };
100
101 let pusher_msg: PusherMessage = match serde_json::from_str(&text) {
102 Ok(m) => m,
103 Err(_) => continue,
104 };
105
106 if pusher_msg.event == "pusher:ping" {
108 let pong = serde_json::json!({ "event": "pusher:pong", "data": {} });
109 self.ws
110 .send(Message::Text(pong.to_string().into()))
111 .await
112 .map_err(KickApiError::WebSocketError)?;
113 continue;
114 }
115
116 if pusher_msg.event.starts_with("pusher:")
118 || pusher_msg.event.starts_with("pusher_internal:")
119 {
120 continue;
121 }
122
123 return Ok(Some(PusherEvent {
124 event: pusher_msg.event,
125 channel: pusher_msg.channel,
126 data: pusher_msg.data,
127 }));
128 }
129 }
130
131 pub async fn next_message(&mut self) -> Result<Option<LiveChatMessage>> {
137 loop {
138 let Some(event) = self.next_event().await? else {
139 return Ok(None);
140 };
141
142 if event.event != "App\\Events\\ChatMessageEvent" {
143 continue;
144 }
145
146 let msg: LiveChatMessage = match serde_json::from_str(&event.data) {
148 Ok(m) => m,
149 Err(_) => continue,
150 };
151
152 return Ok(Some(msg));
153 }
154 }
155
156 pub async fn send_ping(&mut self) -> Result<()> {
158 let ping = serde_json::json!({ "event": "pusher:ping", "data": {} });
159 self.ws
160 .send(Message::Text(ping.to_string().into()))
161 .await
162 .map_err(KickApiError::WebSocketError)?;
163 Ok(())
164 }
165
166 pub async fn close(&mut self) -> Result<()> {
168 self.ws
169 .close(None)
170 .await
171 .map_err(KickApiError::WebSocketError)?;
172 Ok(())
173 }
174}
175
176async fn wait_for_event(ws: &mut WsStream, event_name: &str) -> Result<()> {
178 loop {
179 let Some(frame) = ws.next().await else {
180 return Err(KickApiError::UnexpectedError(format!(
181 "Connection closed while waiting for '{event_name}'"
182 )));
183 };
184
185 let frame = frame.map_err(KickApiError::WebSocketError)?;
186
187 let text = match frame {
188 Message::Text(t) => t,
189 Message::Ping(data) => {
190 ws.send(Message::Pong(data))
191 .await
192 .map_err(KickApiError::WebSocketError)?;
193 continue;
194 }
195 _ => continue,
196 };
197
198 let msg: PusherMessage = match serde_json::from_str(&text) {
199 Ok(m) => m,
200 Err(_) => continue,
201 };
202
203 if msg.event == event_name {
204 return Ok(());
205 }
206 }
207}