use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use crate::error::{KickApiError, Result};
use crate::models::ChannelInfo;
use crate::models::FollowedChannelsResponse;
use crate::models::live_chat::{LiveChatMessage, PusherEvent, PusherMessage};
const PUSHER_URL: &str = "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=8.4.0&flash=false";
type WsStream = tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>;
pub struct LiveChatClient {
ws: WsStream,
}
impl std::fmt::Debug for LiveChatClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LiveChatClient").finish_non_exhaustive()
}
}
impl LiveChatClient {
pub async fn connect_by_username(username: &str) -> Result<Self> {
let info = fetch_channel_info_inner(username).await?;
Self::connect(info.chatroom.id).await
}
pub async fn connect(chatroom_id: u64) -> Result<Self> {
let channel = format!("chatrooms.{chatroom_id}.v2");
let (mut ws, _) = connect_async(PUSHER_URL)
.await
.map_err(KickApiError::WebSocketError)?;
wait_for_event(&mut ws, "pusher:connection_established").await?;
let subscribe = serde_json::json!({
"event": "pusher:subscribe",
"data": {
"auth": "",
"channel": channel,
}
});
ws.send(Message::Text(subscribe.to_string().into()))
.await
.map_err(KickApiError::WebSocketError)?;
wait_for_event(&mut ws, "pusher_internal:subscription_succeeded").await?;
Ok(Self { ws })
}
pub async fn next_event(&mut self) -> Result<Option<PusherEvent>> {
loop {
let Some(frame) = self.ws.next().await else {
return Ok(None);
};
let frame = frame.map_err(KickApiError::WebSocketError)?;
let text = match frame {
Message::Text(t) => t,
Message::Close(_) => return Ok(None),
Message::Ping(data) => {
self.ws
.send(Message::Pong(data))
.await
.map_err(KickApiError::WebSocketError)?;
continue;
}
_ => continue,
};
let pusher_msg: PusherMessage = match serde_json::from_str(&text) {
Ok(m) => m,
Err(_) => continue,
};
if pusher_msg.event == "pusher:ping" {
let pong = serde_json::json!({ "event": "pusher:pong", "data": {} });
self.ws
.send(Message::Text(pong.to_string().into()))
.await
.map_err(KickApiError::WebSocketError)?;
continue;
}
if pusher_msg.event.starts_with("pusher:")
|| pusher_msg.event.starts_with("pusher_internal:")
{
continue;
}
return Ok(Some(PusherEvent {
event: pusher_msg.event,
channel: pusher_msg.channel,
data: pusher_msg.data,
}));
}
}
pub async fn next_message(&mut self) -> Result<Option<LiveChatMessage>> {
loop {
let Some(event) = self.next_event().await? else {
return Ok(None);
};
if event.event != "App\\Events\\ChatMessageEvent" {
continue;
}
let msg: LiveChatMessage = match serde_json::from_str(&event.data) {
Ok(m) => m,
Err(_) => continue,
};
return Ok(Some(msg));
}
}
pub async fn send_ping(&mut self) -> Result<()> {
let ping = serde_json::json!({ "event": "pusher:ping", "data": {} });
self.ws
.send(Message::Text(ping.to_string().into()))
.await
.map_err(KickApiError::WebSocketError)?;
Ok(())
}
pub async fn close(&mut self) -> Result<()> {
self.ws
.close(None)
.await
.map_err(KickApiError::WebSocketError)?;
Ok(())
}
}
pub async fn fetch_channel_info(username: &str) -> Result<ChannelInfo> {
fetch_channel_info_inner(username).await
}
pub async fn fetch_followed_channels(token: &str) -> Result<FollowedChannelsResponse> {
let url = "https://kick.com/api/v2/channels/followed";
let auth_header = format!("Bearer {}", token);
let mut cmd = tokio::process::Command::new("curl");
cmd.args([
"-s",
"-H", "Accept: application/json",
"-H", "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36",
"-H", &format!("Authorization: {}", auth_header),
url,
]);
#[cfg(target_os = "windows")]
{
#[allow(unused_imports)]
use std::os::windows::process::CommandExt;
cmd.creation_flags(0x08000000); }
let output = cmd
.output()
.await
.map_err(|e| KickApiError::UnexpectedError(format!(
"Failed to run curl (is it installed?): {}", e
)))?;
if !output.status.success() {
return Err(KickApiError::ApiError(format!(
"curl failed for followed channels: exit code {:?}",
output.status.code()
)));
}
let resp: FollowedChannelsResponse = serde_json::from_slice(&output.stdout)
.map_err(|e| KickApiError::ApiError(format!(
"Failed to parse followed channels response: {}", e
)))?;
Ok(resp)
}
async fn fetch_channel_info_inner(username: &str) -> Result<ChannelInfo> {
let url = format!("https://kick.com/api/v2/channels/{}", username);
let mut cmd = tokio::process::Command::new("curl");
cmd.args(["-s", "-H", "Accept: application/json", "-H", "User-Agent: Chatterino7", &url]);
#[cfg(target_os = "windows")]
{
#[allow(unused_imports)]
use std::os::windows::process::CommandExt;
cmd.creation_flags(0x08000000); }
let output = cmd
.output()
.await
.map_err(|e| KickApiError::UnexpectedError(format!(
"Failed to run curl (is it installed?): {}", e
)))?;
if !output.status.success() {
return Err(KickApiError::ApiError(format!(
"curl failed for channel '{}': exit code {:?}",
username,
output.status.code()
)));
}
let info: ChannelInfo = serde_json::from_slice(&output.stdout)
.map_err(|e| KickApiError::ApiError(format!(
"Failed to parse channel response for '{}': {}", username, e
)))?;
Ok(info)
}
async fn wait_for_event(ws: &mut WsStream, event_name: &str) -> Result<()> {
loop {
let Some(frame) = ws.next().await else {
return Err(KickApiError::UnexpectedError(format!(
"Connection closed while waiting for '{event_name}'"
)));
};
let frame = frame.map_err(KickApiError::WebSocketError)?;
let text = match frame {
Message::Text(t) => t,
Message::Ping(data) => {
ws.send(Message::Pong(data))
.await
.map_err(KickApiError::WebSocketError)?;
continue;
}
_ => continue,
};
let msg: PusherMessage = match serde_json::from_str(&text) {
Ok(m) => m,
Err(_) => continue,
};
if msg.event == event_name {
return Ok(());
}
}
}