Skip to main content

kick_api/
live_chat.rs

1use futures_util::{SinkExt, StreamExt};
2use tokio_tungstenite::{connect_async, tungstenite::Message};
3
4use crate::error::{KickApiError, Result};
5use crate::models::live_chat::{LiveChatMessage, PusherEvent, PusherMessage};
6
7const PUSHER_URL: &str = "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=8.4.0&flash=false";
8
9type WsStream = tokio_tungstenite::WebSocketStream<
10    tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
11>;
12
13/// Client for receiving live chat messages over Kick's Pusher WebSocket.
14///
15/// This connects to the public Pusher channel for a chatroom and yields
16/// chat messages in real time. No authentication is required.
17///
18/// The chatroom ID can be found by visiting
19/// `https://kick.com/api/v2/channels/{slug}` in a browser and searching
20/// for `"chatroom":{"id":`.
21///
22/// # Example
23/// ```no_run
24/// use kick_api::LiveChatClient;
25///
26/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
27/// let mut chat = LiveChatClient::connect(27670567).await?;
28/// while let Some(msg) = chat.next_message().await? {
29///     println!("{}: {}", msg.sender.username, msg.content);
30/// }
31/// # Ok(())
32/// # }
33/// ```
34pub struct LiveChatClient {
35    ws: WsStream,
36}
37
38impl LiveChatClient {
39    /// Connect to a chatroom by its ID.
40    ///
41    /// Opens a WebSocket to Pusher and subscribes to the chatroom's public
42    /// channel. No authentication is required.
43    ///
44    /// To find a channel's chatroom ID, visit
45    /// `https://kick.com/api/v2/channels/{slug}` in a browser and look for
46    /// `"chatroom":{"id":`.
47    pub async fn connect(chatroom_id: u64) -> Result<Self> {
48        let channel = format!("chatrooms.{chatroom_id}.v2");
49
50        let (mut ws, _) = connect_async(PUSHER_URL)
51            .await
52            .map_err(KickApiError::WebSocketError)?;
53
54        // Wait for pusher:connection_established
55        wait_for_event(&mut ws, "pusher:connection_established").await?;
56
57        // Subscribe to the chatroom channel
58        let subscribe = serde_json::json!({
59            "event": "pusher:subscribe",
60            "data": {
61                "auth": "",
62                "channel": channel,
63            }
64        });
65        ws.send(Message::Text(subscribe.to_string().into()))
66            .await
67            .map_err(KickApiError::WebSocketError)?;
68
69        // Wait for subscription confirmation
70        wait_for_event(&mut ws, "pusher_internal:subscription_succeeded").await?;
71
72        Ok(Self { ws })
73    }
74
75    /// Receive the next raw Pusher event.
76    ///
77    /// Returns all events from the subscribed channel (chat messages, pins,
78    /// subs, bans, etc.). Automatically handles Pusher-level pings and
79    /// internal protocol events. Returns `None` if the connection is closed.
80    pub async fn next_event(&mut self) -> Result<Option<PusherEvent>> {
81        loop {
82            let Some(frame) = self.ws.next().await else {
83                return Ok(None);
84            };
85
86            let frame = frame.map_err(KickApiError::WebSocketError)?;
87
88            let text = match frame {
89                Message::Text(t) => t,
90                Message::Close(_) => return Ok(None),
91                Message::Ping(data) => {
92                    self.ws
93                        .send(Message::Pong(data))
94                        .await
95                        .map_err(KickApiError::WebSocketError)?;
96                    continue;
97                }
98                _ => continue,
99            };
100
101            let pusher_msg: PusherMessage = match serde_json::from_str(&text) {
102                Ok(m) => m,
103                Err(_) => continue,
104            };
105
106            // Handle Pusher-level pings automatically
107            if pusher_msg.event == "pusher:ping" {
108                let pong = serde_json::json!({ "event": "pusher:pong", "data": {} });
109                self.ws
110                    .send(Message::Text(pong.to_string().into()))
111                    .await
112                    .map_err(KickApiError::WebSocketError)?;
113                continue;
114            }
115
116            // Skip internal Pusher protocol events
117            if pusher_msg.event.starts_with("pusher:")
118                || pusher_msg.event.starts_with("pusher_internal:")
119            {
120                continue;
121            }
122
123            return Ok(Some(PusherEvent {
124                event: pusher_msg.event,
125                channel: pusher_msg.channel,
126                data: pusher_msg.data,
127            }));
128        }
129    }
130
131    /// Receive the next chat message.
132    ///
133    /// Blocks until a chat message arrives. Automatically handles Pusher-level
134    /// pings and skips non-chat events. Returns `None` if the connection is
135    /// closed.
136    pub async fn next_message(&mut self) -> Result<Option<LiveChatMessage>> {
137        loop {
138            let Some(event) = self.next_event().await? else {
139                return Ok(None);
140            };
141
142            if event.event != "App\\Events\\ChatMessageEvent" {
143                continue;
144            }
145
146            // Data is double-encoded: outer JSON has `data` as a string
147            let msg: LiveChatMessage = match serde_json::from_str(&event.data) {
148                Ok(m) => m,
149                Err(_) => continue,
150            };
151
152            return Ok(Some(msg));
153        }
154    }
155
156    /// Send a Pusher-level ping to keep the connection alive.
157    pub async fn send_ping(&mut self) -> Result<()> {
158        let ping = serde_json::json!({ "event": "pusher:ping", "data": {} });
159        self.ws
160            .send(Message::Text(ping.to_string().into()))
161            .await
162            .map_err(KickApiError::WebSocketError)?;
163        Ok(())
164    }
165
166    /// Close the WebSocket connection.
167    pub async fn close(&mut self) -> Result<()> {
168        self.ws
169            .close(None)
170            .await
171            .map_err(KickApiError::WebSocketError)?;
172        Ok(())
173    }
174}
175
176/// Wait for a specific Pusher event on the WebSocket.
177async fn wait_for_event(ws: &mut WsStream, event_name: &str) -> Result<()> {
178    loop {
179        let Some(frame) = ws.next().await else {
180            return Err(KickApiError::UnexpectedError(format!(
181                "Connection closed while waiting for '{event_name}'"
182            )));
183        };
184
185        let frame = frame.map_err(KickApiError::WebSocketError)?;
186
187        let text = match frame {
188            Message::Text(t) => t,
189            Message::Ping(data) => {
190                ws.send(Message::Pong(data))
191                    .await
192                    .map_err(KickApiError::WebSocketError)?;
193                continue;
194            }
195            _ => continue,
196        };
197
198        let msg: PusherMessage = match serde_json::from_str(&text) {
199            Ok(m) => m,
200            Err(_) => continue,
201        };
202
203        if msg.event == event_name {
204            return Ok(());
205        }
206    }
207}