1use futures_util::{SinkExt, StreamExt};
2use tokio_tungstenite::{connect_async, tungstenite::Message};
3
4use crate::error::{KickApiError, Result};
5use crate::models::live_chat::{LiveChatMessage, PusherEvent, PusherMessage};
6
/// WebSocket URL handed to `connect_async` when a chat connection is opened.
///
/// Written as endpoint + query string for readability; `concat!` joins the two
/// literals at compile time into one `&'static str`.
const PUSHER_URL: &str = concat!(
    "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679",
    "?protocol=7&client=js&version=8.4.0&flash=false",
);
8
9type WsStream = tokio_tungstenite::WebSocketStream<
10 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
11>;
12
13pub struct LiveChatClient {
40 ws: WsStream,
41}
42
43impl std::fmt::Debug for LiveChatClient {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 f.debug_struct("LiveChatClient").finish_non_exhaustive()
46 }
47}
48
49impl LiveChatClient {
50 pub async fn connect_by_username(username: &str) -> Result<Self> {
68 let chatroom_id = lookup_chatroom_id(username).await?;
69 Self::connect(chatroom_id).await
70 }
71
72 pub async fn connect(chatroom_id: u64) -> Result<Self> {
81 let channel = format!("chatrooms.{chatroom_id}.v2");
82
83 let (mut ws, _) = connect_async(PUSHER_URL)
84 .await
85 .map_err(KickApiError::WebSocketError)?;
86
87 wait_for_event(&mut ws, "pusher:connection_established").await?;
89
90 let subscribe = serde_json::json!({
92 "event": "pusher:subscribe",
93 "data": {
94 "auth": "",
95 "channel": channel,
96 }
97 });
98 ws.send(Message::Text(subscribe.to_string().into()))
99 .await
100 .map_err(KickApiError::WebSocketError)?;
101
102 wait_for_event(&mut ws, "pusher_internal:subscription_succeeded").await?;
104
105 Ok(Self { ws })
106 }
107
108 pub async fn next_event(&mut self) -> Result<Option<PusherEvent>> {
114 loop {
115 let Some(frame) = self.ws.next().await else {
116 return Ok(None);
117 };
118
119 let frame = frame.map_err(KickApiError::WebSocketError)?;
120
121 let text = match frame {
122 Message::Text(t) => t,
123 Message::Close(_) => return Ok(None),
124 Message::Ping(data) => {
125 self.ws
126 .send(Message::Pong(data))
127 .await
128 .map_err(KickApiError::WebSocketError)?;
129 continue;
130 }
131 _ => continue,
132 };
133
134 let pusher_msg: PusherMessage = match serde_json::from_str(&text) {
135 Ok(m) => m,
136 Err(_) => continue,
137 };
138
139 if pusher_msg.event == "pusher:ping" {
141 let pong = serde_json::json!({ "event": "pusher:pong", "data": {} });
142 self.ws
143 .send(Message::Text(pong.to_string().into()))
144 .await
145 .map_err(KickApiError::WebSocketError)?;
146 continue;
147 }
148
149 if pusher_msg.event.starts_with("pusher:")
151 || pusher_msg.event.starts_with("pusher_internal:")
152 {
153 continue;
154 }
155
156 return Ok(Some(PusherEvent {
157 event: pusher_msg.event,
158 channel: pusher_msg.channel,
159 data: pusher_msg.data,
160 }));
161 }
162 }
163
164 pub async fn next_message(&mut self) -> Result<Option<LiveChatMessage>> {
170 loop {
171 let Some(event) = self.next_event().await? else {
172 return Ok(None);
173 };
174
175 if event.event != "App\\Events\\ChatMessageEvent" {
176 continue;
177 }
178
179 let msg: LiveChatMessage = match serde_json::from_str(&event.data) {
181 Ok(m) => m,
182 Err(_) => continue,
183 };
184
185 return Ok(Some(msg));
186 }
187 }
188
189 pub async fn send_ping(&mut self) -> Result<()> {
191 let ping = serde_json::json!({ "event": "pusher:ping", "data": {} });
192 self.ws
193 .send(Message::Text(ping.to_string().into()))
194 .await
195 .map_err(KickApiError::WebSocketError)?;
196 Ok(())
197 }
198
199 pub async fn close(&mut self) -> Result<()> {
201 self.ws
202 .close(None)
203 .await
204 .map_err(KickApiError::WebSocketError)?;
205 Ok(())
206 }
207}
208
209async fn lookup_chatroom_id(username: &str) -> Result<u64> {
216 let url = format!("https://kick.com/api/v2/channels/{}", username);
217
218 let output = tokio::process::Command::new("curl")
219 .args(["-s", "-H", "Accept: application/json", "-H", "User-Agent: Chatterino7", &url])
220 .output()
221 .await
222 .map_err(|e| KickApiError::UnexpectedError(format!(
223 "Failed to run curl (is it installed?): {}", e
224 )))?;
225
226 if !output.status.success() {
227 return Err(KickApiError::ApiError(format!(
228 "curl failed for channel '{}': exit code {:?}",
229 username,
230 output.status.code()
231 )));
232 }
233
234 let body: serde_json::Value = serde_json::from_slice(&output.stdout)
235 .map_err(|e| KickApiError::ApiError(format!(
236 "Failed to parse channel response for '{}': {}", username, e
237 )))?;
238
239 body["chatroom"]["id"]
240 .as_u64()
241 .ok_or_else(|| KickApiError::ApiError(format!(
242 "Channel '{}' not found or missing chatroom ID",
243 username
244 )))
245}
246
247async fn wait_for_event(ws: &mut WsStream, event_name: &str) -> Result<()> {
249 loop {
250 let Some(frame) = ws.next().await else {
251 return Err(KickApiError::UnexpectedError(format!(
252 "Connection closed while waiting for '{event_name}'"
253 )));
254 };
255
256 let frame = frame.map_err(KickApiError::WebSocketError)?;
257
258 let text = match frame {
259 Message::Text(t) => t,
260 Message::Ping(data) => {
261 ws.send(Message::Pong(data))
262 .await
263 .map_err(KickApiError::WebSocketError)?;
264 continue;
265 }
266 _ => continue,
267 };
268
269 let msg: PusherMessage = match serde_json::from_str(&text) {
270 Ok(m) => m,
271 Err(_) => continue,
272 };
273
274 if msg.event == event_name {
275 return Ok(());
276 }
277 }
278}