kick_api/live_chat.rs
1use futures_util::{SinkExt, StreamExt};
2use tokio_tungstenite::{connect_async, tungstenite::Message};
3
4use crate::error::{KickApiError, Result};
5use crate::models::ChannelInfo;
6use crate::models::live_chat::{LiveChatMessage, PusherEvent, PusherMessage};
7
8const PUSHER_URL: &str = "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=8.4.0&flash=false";
9
10type WsStream = tokio_tungstenite::WebSocketStream<
11 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
12>;
13
14/// Client for receiving live chat messages over Kick's Pusher WebSocket.
15///
16/// Connects to the public Pusher channel for a chatroom and yields chat
17/// messages in real time. **No authentication is required.**
18///
19/// # Connecting
20///
21/// There are two ways to connect:
22///
23/// - [`connect_by_username`](Self::connect_by_username) — pass a Kick username
24/// and the chatroom ID is resolved automatically (requires `curl` on PATH).
25/// - [`connect`](Self::connect) — pass a chatroom ID directly.
26///
27/// # Example
28///
29/// ```no_run
30/// use kick_api::LiveChatClient;
31///
32/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33/// let mut chat = LiveChatClient::connect_by_username("xqc").await?;
34/// while let Some(msg) = chat.next_message().await? {
35/// println!("{}: {}", msg.sender.username, msg.content);
36/// }
37/// # Ok(())
38/// # }
39/// ```
40pub struct LiveChatClient {
41 ws: WsStream,
42}
43
44impl std::fmt::Debug for LiveChatClient {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 f.debug_struct("LiveChatClient").finish_non_exhaustive()
47 }
48}
49
50impl LiveChatClient {
51 /// Connect to a chatroom by the channel's username/slug.
52 ///
53 /// Looks up the chatroom ID via Kick's public API and connects to the
54 /// WebSocket. No authentication is required.
55 ///
56 /// # Example
57 /// ```no_run
58 /// use kick_api::LiveChatClient;
59 ///
60 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
61 /// let mut chat = LiveChatClient::connect_by_username("xqc").await?;
62 /// while let Some(msg) = chat.next_message().await? {
63 /// println!("{}: {}", msg.sender.username, msg.content);
64 /// }
65 /// # Ok(())
66 /// # }
67 /// ```
68 pub async fn connect_by_username(username: &str) -> Result<Self> {
69 let info = fetch_channel_info_inner(username).await?;
70 Self::connect(info.chatroom.id).await
71 }
72
73 /// Connect to a chatroom by its ID.
74 ///
75 /// Opens a WebSocket to Pusher and subscribes to the chatroom's public
76 /// channel. No authentication is required.
77 ///
78 /// To find a channel's chatroom ID, visit
79 /// `https://kick.com/api/v2/channels/{slug}` in a browser and look for
80 /// `"chatroom":{"id":`.
81 pub async fn connect(chatroom_id: u64) -> Result<Self> {
82 let channel = format!("chatrooms.{chatroom_id}.v2");
83
84 let (mut ws, _) = connect_async(PUSHER_URL)
85 .await
86 .map_err(KickApiError::WebSocketError)?;
87
88 // Wait for pusher:connection_established
89 wait_for_event(&mut ws, "pusher:connection_established").await?;
90
91 // Subscribe to the chatroom channel
92 let subscribe = serde_json::json!({
93 "event": "pusher:subscribe",
94 "data": {
95 "auth": "",
96 "channel": channel,
97 }
98 });
99 ws.send(Message::Text(subscribe.to_string().into()))
100 .await
101 .map_err(KickApiError::WebSocketError)?;
102
103 // Wait for subscription confirmation
104 wait_for_event(&mut ws, "pusher_internal:subscription_succeeded").await?;
105
106 Ok(Self { ws })
107 }
108
109 /// Receive the next raw Pusher event.
110 ///
111 /// Returns all events from the subscribed channel (chat messages, pins,
112 /// subs, bans, etc.). Automatically handles Pusher-level pings and
113 /// internal protocol events. Returns `None` if the connection is closed.
114 pub async fn next_event(&mut self) -> Result<Option<PusherEvent>> {
115 loop {
116 let Some(frame) = self.ws.next().await else {
117 return Ok(None);
118 };
119
120 let frame = frame.map_err(KickApiError::WebSocketError)?;
121
122 let text = match frame {
123 Message::Text(t) => t,
124 Message::Close(_) => return Ok(None),
125 Message::Ping(data) => {
126 self.ws
127 .send(Message::Pong(data))
128 .await
129 .map_err(KickApiError::WebSocketError)?;
130 continue;
131 }
132 _ => continue,
133 };
134
135 let pusher_msg: PusherMessage = match serde_json::from_str(&text) {
136 Ok(m) => m,
137 Err(_) => continue,
138 };
139
140 // Handle Pusher-level pings automatically
141 if pusher_msg.event == "pusher:ping" {
142 let pong = serde_json::json!({ "event": "pusher:pong", "data": {} });
143 self.ws
144 .send(Message::Text(pong.to_string().into()))
145 .await
146 .map_err(KickApiError::WebSocketError)?;
147 continue;
148 }
149
150 // Skip internal Pusher protocol events
151 if pusher_msg.event.starts_with("pusher:")
152 || pusher_msg.event.starts_with("pusher_internal:")
153 {
154 continue;
155 }
156
157 return Ok(Some(PusherEvent {
158 event: pusher_msg.event,
159 channel: pusher_msg.channel,
160 data: pusher_msg.data,
161 }));
162 }
163 }
164
165 /// Receive the next chat message.
166 ///
167 /// Blocks until a chat message arrives. Automatically handles Pusher-level
168 /// pings and skips non-chat events. Returns `None` if the connection is
169 /// closed.
170 pub async fn next_message(&mut self) -> Result<Option<LiveChatMessage>> {
171 loop {
172 let Some(event) = self.next_event().await? else {
173 return Ok(None);
174 };
175
176 if event.event != "App\\Events\\ChatMessageEvent" {
177 continue;
178 }
179
180 // Data is double-encoded: outer JSON has `data` as a string
181 let msg: LiveChatMessage = match serde_json::from_str(&event.data) {
182 Ok(m) => m,
183 Err(_) => continue,
184 };
185
186 return Ok(Some(msg));
187 }
188 }
189
190 /// Send a Pusher-level ping to keep the connection alive.
191 pub async fn send_ping(&mut self) -> Result<()> {
192 let ping = serde_json::json!({ "event": "pusher:ping", "data": {} });
193 self.ws
194 .send(Message::Text(ping.to_string().into()))
195 .await
196 .map_err(KickApiError::WebSocketError)?;
197 Ok(())
198 }
199
200 /// Close the WebSocket connection.
201 pub async fn close(&mut self) -> Result<()> {
202 self.ws
203 .close(None)
204 .await
205 .map_err(KickApiError::WebSocketError)?;
206 Ok(())
207 }
208}
209
210/// Fetch public channel information from Kick's v2 API.
211///
212/// Returns chatroom settings, subscriber badges, user profile, and livestream
213/// status for any channel. **No authentication required.**
214///
215/// This uses `curl` as a subprocess because Kick's Cloudflare protection blocks
216/// HTTP libraries based on TLS fingerprinting. `curl` ships with Windows 10+,
217/// macOS, and virtually all Linux distributions.
218///
219/// # Example
220///
221/// ```no_run
222/// use kick_api::fetch_channel_info;
223///
224/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
225/// let info = fetch_channel_info("xqc").await?;
226///
227/// // Chatroom settings
228/// println!("Chatroom ID: {}", info.chatroom.id);
229/// println!("Slow mode: {}", info.chatroom.slow_mode);
230/// println!("Followers only: {}", info.chatroom.followers_mode);
231///
232/// // Subscriber badges
233/// for badge in &info.subscriber_badges {
234/// println!("{}mo badge: {}", badge.months, badge.badge_image.src);
235/// }
236///
237/// // Livestream status
238/// if let Some(stream) = &info.livestream {
239/// println!("{} is live with {} viewers", info.slug, stream.viewer_count);
240/// }
241/// # Ok(())
242/// # }
243/// ```
244pub async fn fetch_channel_info(username: &str) -> Result<ChannelInfo> {
245 fetch_channel_info_inner(username).await
246}
247
248async fn fetch_channel_info_inner(username: &str) -> Result<ChannelInfo> {
249 let url = format!("https://kick.com/api/v2/channels/{}", username);
250
251 let output = tokio::process::Command::new("curl")
252 .args(["-s", "-H", "Accept: application/json", "-H", "User-Agent: Chatterino7", &url])
253 .output()
254 .await
255 .map_err(|e| KickApiError::UnexpectedError(format!(
256 "Failed to run curl (is it installed?): {}", e
257 )))?;
258
259 if !output.status.success() {
260 return Err(KickApiError::ApiError(format!(
261 "curl failed for channel '{}': exit code {:?}",
262 username,
263 output.status.code()
264 )));
265 }
266
267 let info: ChannelInfo = serde_json::from_slice(&output.stdout)
268 .map_err(|e| KickApiError::ApiError(format!(
269 "Failed to parse channel response for '{}': {}", username, e
270 )))?;
271
272 Ok(info)
273}
274
275/// Wait for a specific Pusher event on the WebSocket.
276async fn wait_for_event(ws: &mut WsStream, event_name: &str) -> Result<()> {
277 loop {
278 let Some(frame) = ws.next().await else {
279 return Err(KickApiError::UnexpectedError(format!(
280 "Connection closed while waiting for '{event_name}'"
281 )));
282 };
283
284 let frame = frame.map_err(KickApiError::WebSocketError)?;
285
286 let text = match frame {
287 Message::Text(t) => t,
288 Message::Ping(data) => {
289 ws.send(Message::Pong(data))
290 .await
291 .map_err(KickApiError::WebSocketError)?;
292 continue;
293 }
294 _ => continue,
295 };
296
297 let msg: PusherMessage = match serde_json::from_str(&text) {
298 Ok(m) => m,
299 Err(_) => continue,
300 };
301
302 if msg.event == event_name {
303 return Ok(());
304 }
305 }
306}