Skip to main content

kick_api/
live_chat.rs

1use futures_util::{SinkExt, StreamExt};
2use tokio_tungstenite::{connect_async, tungstenite::Message};
3
4use crate::error::{KickApiError, Result};
5use crate::models::ChannelInfo;
6use crate::models::live_chat::{LiveChatMessage, PusherEvent, PusherMessage};
7
8const PUSHER_URL: &str = "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=8.4.0&flash=false";
9
10type WsStream = tokio_tungstenite::WebSocketStream<
11    tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
12>;
13
14/// Client for receiving live chat messages over Kick's Pusher WebSocket.
15///
16/// Connects to the public Pusher channel for a chatroom and yields chat
17/// messages in real time. **No authentication is required.**
18///
19/// # Connecting
20///
21/// There are two ways to connect:
22///
23/// - [`connect_by_username`](Self::connect_by_username) — pass a Kick username
24///   and the chatroom ID is resolved automatically (requires `curl` on PATH).
25/// - [`connect`](Self::connect) — pass a chatroom ID directly.
26///
27/// # Example
28///
29/// ```no_run
30/// use kick_api::LiveChatClient;
31///
32/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33/// let mut chat = LiveChatClient::connect_by_username("xqc").await?;
34/// while let Some(msg) = chat.next_message().await? {
35///     println!("{}: {}", msg.sender.username, msg.content);
36/// }
37/// # Ok(())
38/// # }
39/// ```
40pub struct LiveChatClient {
41    ws: WsStream,
42}
43
44impl std::fmt::Debug for LiveChatClient {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        f.debug_struct("LiveChatClient").finish_non_exhaustive()
47    }
48}
49
50impl LiveChatClient {
51    /// Connect to a chatroom by the channel's username/slug.
52    ///
53    /// Looks up the chatroom ID via Kick's public API and connects to the
54    /// WebSocket. No authentication is required.
55    ///
56    /// # Example
57    /// ```no_run
58    /// use kick_api::LiveChatClient;
59    ///
60    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
61    /// let mut chat = LiveChatClient::connect_by_username("xqc").await?;
62    /// while let Some(msg) = chat.next_message().await? {
63    ///     println!("{}: {}", msg.sender.username, msg.content);
64    /// }
65    /// # Ok(())
66    /// # }
67    /// ```
68    pub async fn connect_by_username(username: &str) -> Result<Self> {
69        let info = fetch_channel_info_inner(username).await?;
70        Self::connect(info.chatroom.id).await
71    }
72
73    /// Connect to a chatroom by its ID.
74    ///
75    /// Opens a WebSocket to Pusher and subscribes to the chatroom's public
76    /// channel. No authentication is required.
77    ///
78    /// To find a channel's chatroom ID, visit
79    /// `https://kick.com/api/v2/channels/{slug}` in a browser and look for
80    /// `"chatroom":{"id":`.
81    pub async fn connect(chatroom_id: u64) -> Result<Self> {
82        let channel = format!("chatrooms.{chatroom_id}.v2");
83
84        let (mut ws, _) = connect_async(PUSHER_URL)
85            .await
86            .map_err(KickApiError::WebSocketError)?;
87
88        // Wait for pusher:connection_established
89        wait_for_event(&mut ws, "pusher:connection_established").await?;
90
91        // Subscribe to the chatroom channel
92        let subscribe = serde_json::json!({
93            "event": "pusher:subscribe",
94            "data": {
95                "auth": "",
96                "channel": channel,
97            }
98        });
99        ws.send(Message::Text(subscribe.to_string().into()))
100            .await
101            .map_err(KickApiError::WebSocketError)?;
102
103        // Wait for subscription confirmation
104        wait_for_event(&mut ws, "pusher_internal:subscription_succeeded").await?;
105
106        Ok(Self { ws })
107    }
108
109    /// Receive the next raw Pusher event.
110    ///
111    /// Returns all events from the subscribed channel (chat messages, pins,
112    /// subs, bans, etc.). Automatically handles Pusher-level pings and
113    /// internal protocol events. Returns `None` if the connection is closed.
114    pub async fn next_event(&mut self) -> Result<Option<PusherEvent>> {
115        loop {
116            let Some(frame) = self.ws.next().await else {
117                return Ok(None);
118            };
119
120            let frame = frame.map_err(KickApiError::WebSocketError)?;
121
122            let text = match frame {
123                Message::Text(t) => t,
124                Message::Close(_) => return Ok(None),
125                Message::Ping(data) => {
126                    self.ws
127                        .send(Message::Pong(data))
128                        .await
129                        .map_err(KickApiError::WebSocketError)?;
130                    continue;
131                }
132                _ => continue,
133            };
134
135            let pusher_msg: PusherMessage = match serde_json::from_str(&text) {
136                Ok(m) => m,
137                Err(_) => continue,
138            };
139
140            // Handle Pusher-level pings automatically
141            if pusher_msg.event == "pusher:ping" {
142                let pong = serde_json::json!({ "event": "pusher:pong", "data": {} });
143                self.ws
144                    .send(Message::Text(pong.to_string().into()))
145                    .await
146                    .map_err(KickApiError::WebSocketError)?;
147                continue;
148            }
149
150            // Skip internal Pusher protocol events
151            if pusher_msg.event.starts_with("pusher:")
152                || pusher_msg.event.starts_with("pusher_internal:")
153            {
154                continue;
155            }
156
157            return Ok(Some(PusherEvent {
158                event: pusher_msg.event,
159                channel: pusher_msg.channel,
160                data: pusher_msg.data,
161            }));
162        }
163    }
164
165    /// Receive the next chat message.
166    ///
167    /// Blocks until a chat message arrives. Automatically handles Pusher-level
168    /// pings and skips non-chat events. Returns `None` if the connection is
169    /// closed.
170    pub async fn next_message(&mut self) -> Result<Option<LiveChatMessage>> {
171        loop {
172            let Some(event) = self.next_event().await? else {
173                return Ok(None);
174            };
175
176            if event.event != "App\\Events\\ChatMessageEvent" {
177                continue;
178            }
179
180            // Data is double-encoded: outer JSON has `data` as a string
181            let msg: LiveChatMessage = match serde_json::from_str(&event.data) {
182                Ok(m) => m,
183                Err(_) => continue,
184            };
185
186            return Ok(Some(msg));
187        }
188    }
189
190    /// Send a Pusher-level ping to keep the connection alive.
191    pub async fn send_ping(&mut self) -> Result<()> {
192        let ping = serde_json::json!({ "event": "pusher:ping", "data": {} });
193        self.ws
194            .send(Message::Text(ping.to_string().into()))
195            .await
196            .map_err(KickApiError::WebSocketError)?;
197        Ok(())
198    }
199
200    /// Close the WebSocket connection.
201    pub async fn close(&mut self) -> Result<()> {
202        self.ws
203            .close(None)
204            .await
205            .map_err(KickApiError::WebSocketError)?;
206        Ok(())
207    }
208}
209
210/// Fetch public channel information from Kick's v2 API.
211///
212/// Returns chatroom settings, subscriber badges, user profile, and livestream
213/// status for any channel. **No authentication required.**
214///
215/// This uses `curl` as a subprocess because Kick's Cloudflare protection blocks
216/// HTTP libraries based on TLS fingerprinting. `curl` ships with Windows 10+,
217/// macOS, and virtually all Linux distributions.
218///
219/// # Example
220///
221/// ```no_run
222/// use kick_api::fetch_channel_info;
223///
224/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
225/// let info = fetch_channel_info("xqc").await?;
226///
227/// // Chatroom settings
228/// println!("Chatroom ID: {}", info.chatroom.id);
229/// println!("Slow mode: {}", info.chatroom.slow_mode);
230/// println!("Followers only: {}", info.chatroom.followers_mode);
231///
232/// // Subscriber badges
233/// for badge in &info.subscriber_badges {
234///     println!("{}mo badge: {}", badge.months, badge.badge_image.src);
235/// }
236///
237/// // Livestream status
238/// if let Some(stream) = &info.livestream {
239///     println!("{} is live with {} viewers", info.slug, stream.viewer_count);
240/// }
241/// # Ok(())
242/// # }
243/// ```
244pub async fn fetch_channel_info(username: &str) -> Result<ChannelInfo> {
245    fetch_channel_info_inner(username).await
246}
247
248async fn fetch_channel_info_inner(username: &str) -> Result<ChannelInfo> {
249    let url = format!("https://kick.com/api/v2/channels/{}", username);
250
251    let mut cmd = tokio::process::Command::new("curl");
252    cmd.args(["-s", "-H", "Accept: application/json", "-H", "User-Agent: Chatterino7", &url]);
253
254    // Prevent a visible console window from flashing on Windows
255    #[cfg(target_os = "windows")]
256    {
257        #[allow(unused_imports)]
258        use std::os::windows::process::CommandExt;
259        cmd.creation_flags(0x08000000); // CREATE_NO_WINDOW
260    }
261
262    let output = cmd
263        .output()
264        .await
265        .map_err(|e| KickApiError::UnexpectedError(format!(
266            "Failed to run curl (is it installed?): {}", e
267        )))?;
268
269    if !output.status.success() {
270        return Err(KickApiError::ApiError(format!(
271            "curl failed for channel '{}': exit code {:?}",
272            username,
273            output.status.code()
274        )));
275    }
276
277    let info: ChannelInfo = serde_json::from_slice(&output.stdout)
278        .map_err(|e| KickApiError::ApiError(format!(
279            "Failed to parse channel response for '{}': {}", username, e
280        )))?;
281
282    Ok(info)
283}
284
285/// Wait for a specific Pusher event on the WebSocket.
286async fn wait_for_event(ws: &mut WsStream, event_name: &str) -> Result<()> {
287    loop {
288        let Some(frame) = ws.next().await else {
289            return Err(KickApiError::UnexpectedError(format!(
290                "Connection closed while waiting for '{event_name}'"
291            )));
292        };
293
294        let frame = frame.map_err(KickApiError::WebSocketError)?;
295
296        let text = match frame {
297            Message::Text(t) => t,
298            Message::Ping(data) => {
299                ws.send(Message::Pong(data))
300                    .await
301                    .map_err(KickApiError::WebSocketError)?;
302                continue;
303            }
304            _ => continue,
305        };
306
307        let msg: PusherMessage = match serde_json::from_str(&text) {
308            Ok(m) => m,
309            Err(_) => continue,
310        };
311
312        if msg.event == event_name {
313            return Ok(());
314        }
315    }
316}