Skip to main content

kick_api/
live_chat.rs

1use futures_util::{SinkExt, StreamExt};
2use tokio_tungstenite::{connect_async, tungstenite::Message};
3
4use crate::error::{KickApiError, Result};
5use crate::models::live_chat::{LiveChatMessage, PusherEvent, PusherMessage};
6
7const PUSHER_URL: &str = "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=8.4.0&flash=false";
8
9type WsStream = tokio_tungstenite::WebSocketStream<
10    tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
11>;
12
13/// Client for receiving live chat messages over Kick's Pusher WebSocket.
14///
15/// Connects to the public Pusher channel for a chatroom and yields chat
16/// messages in real time. **No authentication is required.**
17///
18/// # Connecting
19///
20/// There are two ways to connect:
21///
22/// - [`connect_by_username`](Self::connect_by_username) — pass a Kick username
23///   and the chatroom ID is resolved automatically (requires `curl` on PATH).
24/// - [`connect`](Self::connect) — pass a chatroom ID directly.
25///
26/// # Example
27///
28/// ```no_run
29/// use kick_api::LiveChatClient;
30///
31/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
32/// let mut chat = LiveChatClient::connect_by_username("xqc").await?;
33/// while let Some(msg) = chat.next_message().await? {
34///     println!("{}: {}", msg.sender.username, msg.content);
35/// }
36/// # Ok(())
37/// # }
38/// ```
39pub struct LiveChatClient {
40    ws: WsStream,
41}
42
43impl std::fmt::Debug for LiveChatClient {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        f.debug_struct("LiveChatClient").finish_non_exhaustive()
46    }
47}
48
49impl LiveChatClient {
50    /// Connect to a chatroom by the channel's username/slug.
51    ///
52    /// Looks up the chatroom ID via Kick's public API and connects to the
53    /// WebSocket. No authentication is required.
54    ///
55    /// # Example
56    /// ```no_run
57    /// use kick_api::LiveChatClient;
58    ///
59    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
60    /// let mut chat = LiveChatClient::connect_by_username("xqc").await?;
61    /// while let Some(msg) = chat.next_message().await? {
62    ///     println!("{}: {}", msg.sender.username, msg.content);
63    /// }
64    /// # Ok(())
65    /// # }
66    /// ```
67    pub async fn connect_by_username(username: &str) -> Result<Self> {
68        let chatroom_id = lookup_chatroom_id(username).await?;
69        Self::connect(chatroom_id).await
70    }
71
72    /// Connect to a chatroom by its ID.
73    ///
74    /// Opens a WebSocket to Pusher and subscribes to the chatroom's public
75    /// channel. No authentication is required.
76    ///
77    /// To find a channel's chatroom ID, visit
78    /// `https://kick.com/api/v2/channels/{slug}` in a browser and look for
79    /// `"chatroom":{"id":`.
80    pub async fn connect(chatroom_id: u64) -> Result<Self> {
81        let channel = format!("chatrooms.{chatroom_id}.v2");
82
83        let (mut ws, _) = connect_async(PUSHER_URL)
84            .await
85            .map_err(KickApiError::WebSocketError)?;
86
87        // Wait for pusher:connection_established
88        wait_for_event(&mut ws, "pusher:connection_established").await?;
89
90        // Subscribe to the chatroom channel
91        let subscribe = serde_json::json!({
92            "event": "pusher:subscribe",
93            "data": {
94                "auth": "",
95                "channel": channel,
96            }
97        });
98        ws.send(Message::Text(subscribe.to_string().into()))
99            .await
100            .map_err(KickApiError::WebSocketError)?;
101
102        // Wait for subscription confirmation
103        wait_for_event(&mut ws, "pusher_internal:subscription_succeeded").await?;
104
105        Ok(Self { ws })
106    }
107
108    /// Receive the next raw Pusher event.
109    ///
110    /// Returns all events from the subscribed channel (chat messages, pins,
111    /// subs, bans, etc.). Automatically handles Pusher-level pings and
112    /// internal protocol events. Returns `None` if the connection is closed.
113    pub async fn next_event(&mut self) -> Result<Option<PusherEvent>> {
114        loop {
115            let Some(frame) = self.ws.next().await else {
116                return Ok(None);
117            };
118
119            let frame = frame.map_err(KickApiError::WebSocketError)?;
120
121            let text = match frame {
122                Message::Text(t) => t,
123                Message::Close(_) => return Ok(None),
124                Message::Ping(data) => {
125                    self.ws
126                        .send(Message::Pong(data))
127                        .await
128                        .map_err(KickApiError::WebSocketError)?;
129                    continue;
130                }
131                _ => continue,
132            };
133
134            let pusher_msg: PusherMessage = match serde_json::from_str(&text) {
135                Ok(m) => m,
136                Err(_) => continue,
137            };
138
139            // Handle Pusher-level pings automatically
140            if pusher_msg.event == "pusher:ping" {
141                let pong = serde_json::json!({ "event": "pusher:pong", "data": {} });
142                self.ws
143                    .send(Message::Text(pong.to_string().into()))
144                    .await
145                    .map_err(KickApiError::WebSocketError)?;
146                continue;
147            }
148
149            // Skip internal Pusher protocol events
150            if pusher_msg.event.starts_with("pusher:")
151                || pusher_msg.event.starts_with("pusher_internal:")
152            {
153                continue;
154            }
155
156            return Ok(Some(PusherEvent {
157                event: pusher_msg.event,
158                channel: pusher_msg.channel,
159                data: pusher_msg.data,
160            }));
161        }
162    }
163
164    /// Receive the next chat message.
165    ///
166    /// Blocks until a chat message arrives. Automatically handles Pusher-level
167    /// pings and skips non-chat events. Returns `None` if the connection is
168    /// closed.
169    pub async fn next_message(&mut self) -> Result<Option<LiveChatMessage>> {
170        loop {
171            let Some(event) = self.next_event().await? else {
172                return Ok(None);
173            };
174
175            if event.event != "App\\Events\\ChatMessageEvent" {
176                continue;
177            }
178
179            // Data is double-encoded: outer JSON has `data` as a string
180            let msg: LiveChatMessage = match serde_json::from_str(&event.data) {
181                Ok(m) => m,
182                Err(_) => continue,
183            };
184
185            return Ok(Some(msg));
186        }
187    }
188
189    /// Send a Pusher-level ping to keep the connection alive.
190    pub async fn send_ping(&mut self) -> Result<()> {
191        let ping = serde_json::json!({ "event": "pusher:ping", "data": {} });
192        self.ws
193            .send(Message::Text(ping.to_string().into()))
194            .await
195            .map_err(KickApiError::WebSocketError)?;
196        Ok(())
197    }
198
199    /// Close the WebSocket connection.
200    pub async fn close(&mut self) -> Result<()> {
201        self.ws
202            .close(None)
203            .await
204            .map_err(KickApiError::WebSocketError)?;
205        Ok(())
206    }
207}
208
209/// Look up a channel's chatroom ID from its username/slug using Kick's public v2 API.
210///
211/// This uses `curl` as a subprocess because Kick's Cloudflare protection blocks
212/// HTTP libraries based on TLS fingerprinting. `curl` uses the OS-native TLS
213/// stack which Cloudflare allows. `curl` ships with Windows 10+, macOS, and
214/// virtually all Linux distributions.
215async fn lookup_chatroom_id(username: &str) -> Result<u64> {
216    let url = format!("https://kick.com/api/v2/channels/{}", username);
217
218    let output = tokio::process::Command::new("curl")
219        .args(["-s", "-H", "Accept: application/json", "-H", "User-Agent: Chatterino7", &url])
220        .output()
221        .await
222        .map_err(|e| KickApiError::UnexpectedError(format!(
223            "Failed to run curl (is it installed?): {}", e
224        )))?;
225
226    if !output.status.success() {
227        return Err(KickApiError::ApiError(format!(
228            "curl failed for channel '{}': exit code {:?}",
229            username,
230            output.status.code()
231        )));
232    }
233
234    let body: serde_json::Value = serde_json::from_slice(&output.stdout)
235        .map_err(|e| KickApiError::ApiError(format!(
236            "Failed to parse channel response for '{}': {}", username, e
237        )))?;
238
239    body["chatroom"]["id"]
240        .as_u64()
241        .ok_or_else(|| KickApiError::ApiError(format!(
242            "Channel '{}' not found or missing chatroom ID",
243            username
244        )))
245}
246
247/// Wait for a specific Pusher event on the WebSocket.
248async fn wait_for_event(ws: &mut WsStream, event_name: &str) -> Result<()> {
249    loop {
250        let Some(frame) = ws.next().await else {
251            return Err(KickApiError::UnexpectedError(format!(
252                "Connection closed while waiting for '{event_name}'"
253            )));
254        };
255
256        let frame = frame.map_err(KickApiError::WebSocketError)?;
257
258        let text = match frame {
259            Message::Text(t) => t,
260            Message::Ping(data) => {
261                ws.send(Message::Pong(data))
262                    .await
263                    .map_err(KickApiError::WebSocketError)?;
264                continue;
265            }
266            _ => continue,
267        };
268
269        let msg: PusherMessage = match serde_json::from_str(&text) {
270            Ok(m) => m,
271            Err(_) => continue,
272        };
273
274        if msg.event == event_name {
275            return Ok(());
276        }
277    }
278}