Skip to main content

kick_api/
live_chat.rs

1use futures_util::{SinkExt, StreamExt};
2use tokio_tungstenite::{connect_async, tungstenite::Message};
3
4use crate::error::{KickApiError, Result};
5use crate::models::live_chat::{LiveChatMessage, PusherEvent, PusherMessage};
6
/// Pusher WebSocket endpoint that [`LiveChatClient::connect`] dials.
///
/// The app key and the `protocol`, `client`, `version` and `flash` query
/// parameters are hard-coded in the URL.
const PUSHER_URL: &str = "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=8.4.0&flash=false";

/// Concrete stream type produced by `connect_async`: a WebSocket layered over
/// a TCP stream wrapped in `MaybeTlsStream`.
type WsStream = tokio_tungstenite::WebSocketStream<
    tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>;
12
/// Client for receiving live chat messages over Kick's Pusher WebSocket.
///
/// This connects to the public Pusher channel for a chatroom and yields
/// chat messages in real time. No authentication is required.
///
/// The chatroom ID can be found by visiting
/// `https://kick.com/api/v2/channels/{slug}` in a browser and searching
/// for `"chatroom":{"id":`.
///
/// # Examples
/// ```no_run
/// use kick_api::LiveChatClient;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let mut chat = LiveChatClient::connect(27670567).await?;
/// while let Some(msg) = chat.next_message().await? {
///     println!("{}: {}", msg.sender.username, msg.content);
/// }
/// # Ok(())
/// # }
/// ```
pub struct LiveChatClient {
    /// WebSocket stream to Pusher. `connect` stores it here only after the
    /// chatroom subscription has been confirmed.
    ws: WsStream,
}
37
38impl std::fmt::Debug for LiveChatClient {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("LiveChatClient").finish_non_exhaustive()
41    }
42}
43
44impl LiveChatClient {
45    /// Connect to a chatroom by its ID.
46    ///
47    /// Opens a WebSocket to Pusher and subscribes to the chatroom's public
48    /// channel. No authentication is required.
49    ///
50    /// To find a channel's chatroom ID, visit
51    /// `https://kick.com/api/v2/channels/{slug}` in a browser and look for
52    /// `"chatroom":{"id":`.
53    pub async fn connect(chatroom_id: u64) -> Result<Self> {
54        let channel = format!("chatrooms.{chatroom_id}.v2");
55
56        let (mut ws, _) = connect_async(PUSHER_URL)
57            .await
58            .map_err(KickApiError::WebSocketError)?;
59
60        // Wait for pusher:connection_established
61        wait_for_event(&mut ws, "pusher:connection_established").await?;
62
63        // Subscribe to the chatroom channel
64        let subscribe = serde_json::json!({
65            "event": "pusher:subscribe",
66            "data": {
67                "auth": "",
68                "channel": channel,
69            }
70        });
71        ws.send(Message::Text(subscribe.to_string().into()))
72            .await
73            .map_err(KickApiError::WebSocketError)?;
74
75        // Wait for subscription confirmation
76        wait_for_event(&mut ws, "pusher_internal:subscription_succeeded").await?;
77
78        Ok(Self { ws })
79    }
80
81    /// Receive the next raw Pusher event.
82    ///
83    /// Returns all events from the subscribed channel (chat messages, pins,
84    /// subs, bans, etc.). Automatically handles Pusher-level pings and
85    /// internal protocol events. Returns `None` if the connection is closed.
86    pub async fn next_event(&mut self) -> Result<Option<PusherEvent>> {
87        loop {
88            let Some(frame) = self.ws.next().await else {
89                return Ok(None);
90            };
91
92            let frame = frame.map_err(KickApiError::WebSocketError)?;
93
94            let text = match frame {
95                Message::Text(t) => t,
96                Message::Close(_) => return Ok(None),
97                Message::Ping(data) => {
98                    self.ws
99                        .send(Message::Pong(data))
100                        .await
101                        .map_err(KickApiError::WebSocketError)?;
102                    continue;
103                }
104                _ => continue,
105            };
106
107            let pusher_msg: PusherMessage = match serde_json::from_str(&text) {
108                Ok(m) => m,
109                Err(_) => continue,
110            };
111
112            // Handle Pusher-level pings automatically
113            if pusher_msg.event == "pusher:ping" {
114                let pong = serde_json::json!({ "event": "pusher:pong", "data": {} });
115                self.ws
116                    .send(Message::Text(pong.to_string().into()))
117                    .await
118                    .map_err(KickApiError::WebSocketError)?;
119                continue;
120            }
121
122            // Skip internal Pusher protocol events
123            if pusher_msg.event.starts_with("pusher:")
124                || pusher_msg.event.starts_with("pusher_internal:")
125            {
126                continue;
127            }
128
129            return Ok(Some(PusherEvent {
130                event: pusher_msg.event,
131                channel: pusher_msg.channel,
132                data: pusher_msg.data,
133            }));
134        }
135    }
136
137    /// Receive the next chat message.
138    ///
139    /// Blocks until a chat message arrives. Automatically handles Pusher-level
140    /// pings and skips non-chat events. Returns `None` if the connection is
141    /// closed.
142    pub async fn next_message(&mut self) -> Result<Option<LiveChatMessage>> {
143        loop {
144            let Some(event) = self.next_event().await? else {
145                return Ok(None);
146            };
147
148            if event.event != "App\\Events\\ChatMessageEvent" {
149                continue;
150            }
151
152            // Data is double-encoded: outer JSON has `data` as a string
153            let msg: LiveChatMessage = match serde_json::from_str(&event.data) {
154                Ok(m) => m,
155                Err(_) => continue,
156            };
157
158            return Ok(Some(msg));
159        }
160    }
161
162    /// Send a Pusher-level ping to keep the connection alive.
163    pub async fn send_ping(&mut self) -> Result<()> {
164        let ping = serde_json::json!({ "event": "pusher:ping", "data": {} });
165        self.ws
166            .send(Message::Text(ping.to_string().into()))
167            .await
168            .map_err(KickApiError::WebSocketError)?;
169        Ok(())
170    }
171
172    /// Close the WebSocket connection.
173    pub async fn close(&mut self) -> Result<()> {
174        self.ws
175            .close(None)
176            .await
177            .map_err(KickApiError::WebSocketError)?;
178        Ok(())
179    }
180}
181
182/// Wait for a specific Pusher event on the WebSocket.
183async fn wait_for_event(ws: &mut WsStream, event_name: &str) -> Result<()> {
184    loop {
185        let Some(frame) = ws.next().await else {
186            return Err(KickApiError::UnexpectedError(format!(
187                "Connection closed while waiting for '{event_name}'"
188            )));
189        };
190
191        let frame = frame.map_err(KickApiError::WebSocketError)?;
192
193        let text = match frame {
194            Message::Text(t) => t,
195            Message::Ping(data) => {
196                ws.send(Message::Pong(data))
197                    .await
198                    .map_err(KickApiError::WebSocketError)?;
199                continue;
200            }
201            _ => continue,
202        };
203
204        let msg: PusherMessage = match serde_json::from_str(&text) {
205            Ok(m) => m,
206            Err(_) => continue,
207        };
208
209        if msg.event == event_name {
210            return Ok(());
211        }
212    }
213}