Skip to main content

kick_api/
live_chat.rs

1use futures_util::{SinkExt, StreamExt};
2use tokio_tungstenite::{connect_async, tungstenite::Message};
3
4use crate::error::{KickApiError, Result};
5use crate::models::ChannelInfo;
6use crate::models::live_chat::{LiveChatMessage, PusherEvent, PusherMessage};
7
8const PUSHER_URL: &str = "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=8.4.0&flash=false";
9
10type WsStream = tokio_tungstenite::WebSocketStream<
11    tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
12>;
13
14/// Client for receiving live chat messages over Kick's Pusher WebSocket.
15///
16/// Connects to the public Pusher channel for a chatroom and yields chat
17/// messages in real time. **No authentication is required.**
18///
19/// # Connecting
20///
21/// There are two ways to connect:
22///
23/// - [`connect_by_username`](Self::connect_by_username) — pass a Kick username
24///   and the chatroom ID is resolved automatically (requires `curl` on PATH).
25/// - [`connect`](Self::connect) — pass a chatroom ID directly.
26///
27/// # Example
28///
29/// ```no_run
30/// use kick_api::LiveChatClient;
31///
32/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33/// let mut chat = LiveChatClient::connect_by_username("xqc").await?;
34/// while let Some(msg) = chat.next_message().await? {
35///     println!("{}: {}", msg.sender.username, msg.content);
36/// }
37/// # Ok(())
38/// # }
39/// ```
40pub struct LiveChatClient {
41    ws: WsStream,
42}
43
44impl std::fmt::Debug for LiveChatClient {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        f.debug_struct("LiveChatClient").finish_non_exhaustive()
47    }
48}
49
50impl LiveChatClient {
51    /// Connect to a chatroom by the channel's username/slug.
52    ///
53    /// Looks up the chatroom ID via Kick's public API and connects to the
54    /// WebSocket. No authentication is required.
55    ///
56    /// # Example
57    /// ```no_run
58    /// use kick_api::LiveChatClient;
59    ///
60    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
61    /// let mut chat = LiveChatClient::connect_by_username("xqc").await?;
62    /// while let Some(msg) = chat.next_message().await? {
63    ///     println!("{}: {}", msg.sender.username, msg.content);
64    /// }
65    /// # Ok(())
66    /// # }
67    /// ```
68    pub async fn connect_by_username(username: &str) -> Result<Self> {
69        let info = fetch_channel_info_inner(username).await?;
70        Self::connect(info.chatroom.id).await
71    }
72
73    /// Connect to a chatroom by its ID.
74    ///
75    /// Opens a WebSocket to Pusher and subscribes to the chatroom's public
76    /// channel. No authentication is required.
77    ///
78    /// To find a channel's chatroom ID, visit
79    /// `https://kick.com/api/v2/channels/{slug}` in a browser and look for
80    /// `"chatroom":{"id":`.
81    pub async fn connect(chatroom_id: u64) -> Result<Self> {
82        let channel = format!("chatrooms.{chatroom_id}.v2");
83
84        let (mut ws, _) = connect_async(PUSHER_URL)
85            .await
86            .map_err(KickApiError::WebSocketError)?;
87
88        // Wait for pusher:connection_established
89        wait_for_event(&mut ws, "pusher:connection_established").await?;
90
91        // Subscribe to the chatroom channel
92        let subscribe = serde_json::json!({
93            "event": "pusher:subscribe",
94            "data": {
95                "auth": "",
96                "channel": channel,
97            }
98        });
99        ws.send(Message::Text(subscribe.to_string().into()))
100            .await
101            .map_err(KickApiError::WebSocketError)?;
102
103        // Wait for subscription confirmation
104        wait_for_event(&mut ws, "pusher_internal:subscription_succeeded").await?;
105
106        Ok(Self { ws })
107    }
108
109    /// Receive the next raw Pusher event.
110    ///
111    /// Returns all events from the subscribed channel (chat messages, pins,
112    /// subs, bans, etc.). Automatically handles Pusher-level pings and
113    /// internal protocol events. Returns `None` if the connection is closed.
114    pub async fn next_event(&mut self) -> Result<Option<PusherEvent>> {
115        loop {
116            let Some(frame) = self.ws.next().await else {
117                return Ok(None);
118            };
119
120            let frame = frame.map_err(KickApiError::WebSocketError)?;
121
122            let text = match frame {
123                Message::Text(t) => t,
124                Message::Close(_) => return Ok(None),
125                Message::Ping(data) => {
126                    self.ws
127                        .send(Message::Pong(data))
128                        .await
129                        .map_err(KickApiError::WebSocketError)?;
130                    continue;
131                }
132                _ => continue,
133            };
134
135            let pusher_msg: PusherMessage = match serde_json::from_str(&text) {
136                Ok(m) => m,
137                Err(_) => continue,
138            };
139
140            // Handle Pusher-level pings automatically
141            if pusher_msg.event == "pusher:ping" {
142                let pong = serde_json::json!({ "event": "pusher:pong", "data": {} });
143                self.ws
144                    .send(Message::Text(pong.to_string().into()))
145                    .await
146                    .map_err(KickApiError::WebSocketError)?;
147                continue;
148            }
149
150            // Skip internal Pusher protocol events
151            if pusher_msg.event.starts_with("pusher:")
152                || pusher_msg.event.starts_with("pusher_internal:")
153            {
154                continue;
155            }
156
157            return Ok(Some(PusherEvent {
158                event: pusher_msg.event,
159                channel: pusher_msg.channel,
160                data: pusher_msg.data,
161            }));
162        }
163    }
164
165    /// Receive the next chat message.
166    ///
167    /// Blocks until a chat message arrives. Automatically handles Pusher-level
168    /// pings and skips non-chat events. Returns `None` if the connection is
169    /// closed.
170    pub async fn next_message(&mut self) -> Result<Option<LiveChatMessage>> {
171        loop {
172            let Some(event) = self.next_event().await? else {
173                return Ok(None);
174            };
175
176            if event.event != "App\\Events\\ChatMessageEvent" {
177                continue;
178            }
179
180            // Data is double-encoded: outer JSON has `data` as a string
181            let msg: LiveChatMessage = match serde_json::from_str(&event.data) {
182                Ok(m) => m,
183                Err(_) => continue,
184            };
185
186            return Ok(Some(msg));
187        }
188    }
189
190    /// Send a Pusher-level ping to keep the connection alive.
191    pub async fn send_ping(&mut self) -> Result<()> {
192        let ping = serde_json::json!({ "event": "pusher:ping", "data": {} });
193        self.ws
194            .send(Message::Text(ping.to_string().into()))
195            .await
196            .map_err(KickApiError::WebSocketError)?;
197        Ok(())
198    }
199
200    /// Close the WebSocket connection.
201    pub async fn close(&mut self) -> Result<()> {
202        self.ws
203            .close(None)
204            .await
205            .map_err(KickApiError::WebSocketError)?;
206        Ok(())
207    }
208}
209
210/// Fetch public channel information from Kick's v2 API.
211///
212/// Returns chatroom settings, subscriber badges, user profile, and livestream
213/// status for any channel. **No authentication required.**
214///
215/// This uses `curl` as a subprocess because Kick's Cloudflare protection blocks
216/// HTTP libraries based on TLS fingerprinting. `curl` ships with Windows 10+,
217/// macOS, and virtually all Linux distributions.
218///
219/// # Example
220///
221/// ```no_run
222/// use kick_api::fetch_channel_info;
223///
224/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
225/// let info = fetch_channel_info("xqc").await?;
226///
227/// // Chatroom settings
228/// println!("Chatroom ID: {}", info.chatroom.id);
229/// println!("Slow mode: {}", info.chatroom.slow_mode);
230/// println!("Followers only: {}", info.chatroom.followers_mode);
231///
232/// // Subscriber badges
233/// for badge in &info.subscriber_badges {
234///     println!("{}mo badge: {}", badge.months, badge.badge_image.src);
235/// }
236///
237/// // Livestream status
238/// if let Some(stream) = &info.livestream {
239///     println!("{} is live with {} viewers", info.slug, stream.viewer_count);
240/// }
241/// # Ok(())
242/// # }
243/// ```
244pub async fn fetch_channel_info(username: &str) -> Result<ChannelInfo> {
245    fetch_channel_info_inner(username).await
246}
247
248async fn fetch_channel_info_inner(username: &str) -> Result<ChannelInfo> {
249    let url = format!("https://kick.com/api/v2/channels/{}", username);
250
251    let output = tokio::process::Command::new("curl")
252        .args(["-s", "-H", "Accept: application/json", "-H", "User-Agent: Chatterino7", &url])
253        .output()
254        .await
255        .map_err(|e| KickApiError::UnexpectedError(format!(
256            "Failed to run curl (is it installed?): {}", e
257        )))?;
258
259    if !output.status.success() {
260        return Err(KickApiError::ApiError(format!(
261            "curl failed for channel '{}': exit code {:?}",
262            username,
263            output.status.code()
264        )));
265    }
266
267    let info: ChannelInfo = serde_json::from_slice(&output.stdout)
268        .map_err(|e| KickApiError::ApiError(format!(
269            "Failed to parse channel response for '{}': {}", username, e
270        )))?;
271
272    Ok(info)
273}
274
275/// Wait for a specific Pusher event on the WebSocket.
276async fn wait_for_event(ws: &mut WsStream, event_name: &str) -> Result<()> {
277    loop {
278        let Some(frame) = ws.next().await else {
279            return Err(KickApiError::UnexpectedError(format!(
280                "Connection closed while waiting for '{event_name}'"
281            )));
282        };
283
284        let frame = frame.map_err(KickApiError::WebSocketError)?;
285
286        let text = match frame {
287            Message::Text(t) => t,
288            Message::Ping(data) => {
289                ws.send(Message::Pong(data))
290                    .await
291                    .map_err(KickApiError::WebSocketError)?;
292                continue;
293            }
294            _ => continue,
295        };
296
297        let msg: PusherMessage = match serde_json::from_str(&text) {
298            Ok(m) => m,
299            Err(_) => continue,
300        };
301
302        if msg.event == event_name {
303            return Ok(());
304        }
305    }
306}