tiktoklive 0.0.19

A Rust library. Use it to receive live stream events such as comments and gifts in realtime from TikTok LIVE No credentials are required.
Documentation
use futures_util::{SinkExt, StreamExt};
use log::info;
use protobuf::Message;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{interval, timeout, Duration};
use tokio_tungstenite::tungstenite::handshake::client::Request;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as WsMessage};

use crate::core::live_client::TikTokLiveClient;
use crate::core::live_client_mapper::TikTokLiveMessageMapper;
use crate::data::live_common::ConnectionState::CONNECTED;
use crate::errors::LibError;
use crate::generated::events::{TikTokConnectedEvent, TikTokLiveEvent};
use crate::generated::messages::webcast::{WebcastPushFrame, WebcastResponse};
use crate::http::http_data::LiveConnectionDataResponse;

pub struct TikTokLiveWebsocketClient {
    pub(crate) message_mapper: TikTokLiveMessageMapper,
    pub(crate) running: Arc<AtomicBool>,
}

impl TikTokLiveWebsocketClient {
    pub fn new(message_mapper: TikTokLiveMessageMapper) -> Self {
        TikTokLiveWebsocketClient {
            message_mapper,
            running: Arc::new(AtomicBool::new(false)),
        }
    }

    pub async fn start(
        &self,
        response: LiveConnectionDataResponse,
        client: Arc<TikTokLiveClient>,
    ) -> Result<(), LibError> {
        let host = response
            .web_socket_url
            .host_str()
            .ok_or(LibError::InvalidHost)?;

        let request = Request::builder()
            .method("GET")
            .uri(response.web_socket_url.to_string())
            .header("Host", host)
            .header("Upgrade", "websocket")
            .header("Connection", "keep-alive")
            .header("Cache-Control", "max-age=0")
            .header("Accept", "text/html,application/json,application/protobuf")
            .header("Sec-Websocket-Key", "asd")
            .header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36")
            .header("Referer", "https://www.tiktok.com/")
            .header("Origin", "https://www.tiktok.com")
            .header("Accept-Language", "en-US,en;q=0.9")
            .header("Accept-Encoding", "gzip, deflate")
            .header("Cookie", response.web_socket_cookies)
            .header("Sec-Websocket-Version", "13")
            .body(())
            .map_err(|_| LibError::ParamsError)?;

        let (ws_stream, _) = connect_async(request)
            .await
            .map_err(|_| LibError::WebSocketConnectFailed)?;
        let (write, mut read) = ws_stream.split();
        let write = Arc::new(Mutex::new(write));

        client.set_connection_state(CONNECTED);
        client.publish_event(TikTokLiveEvent::OnConnected(TikTokConnectedEvent {}));

        let running = self.running.clone();
        running.store(true, Ordering::SeqCst);

        let message_mapper = self.message_mapper.clone();
        let client_clone = client.clone();
        let write_clone = write.clone();
        let running_clone = running.clone();

        tokio::spawn(async move {
            info!("Websocket connected");
            while running_clone.load(Ordering::SeqCst) {
                if let Some(Ok(message)) = read.next().await {
                    if let WsMessage::Binary(buffer) = message {
                        let mut push_frame = match WebcastPushFrame::parse_from_bytes(&buffer) {
                            Ok(frame) => frame,
                            Err(_) => continue,
                        };

                        let webcast_response = match WebcastResponse::parse_from_bytes(
                            push_frame.Payload.as_mut_slice(),
                        ) {
                            Ok(response) => response,
                            Err(_) => continue,
                        };

                        if webcast_response.needsAck {
                            let mut push_frame_ack = WebcastPushFrame::new();
                            push_frame_ack.PayloadType = "ack".to_string();
                            push_frame_ack.LogId = push_frame.LogId;
                            push_frame_ack.Payload =
                                webcast_response.internalExt.clone().into_bytes();

                            let binary = match push_frame_ack.write_to_bytes() {
                                Ok(bytes) => bytes,
                                Err(_) => continue,
                            };

                            let message = WsMessage::Binary(binary);
                            if write_clone.lock().await.send(message).await.is_err() {
                                continue;
                            }
                        }

                        message_mapper
                            .handle_webcast_response(webcast_response, client_clone.as_ref());
                    }
                }
            }
        });

        let write_clone = write.clone();
        let running_clone = running.clone();
        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(9));
            while running_clone.load(Ordering::SeqCst) {
                interval.tick().await;

                let heartbeat_message = WsMessage::Binary(vec![0x3a, 0x02, 0x68, 0x62]);

                match timeout(
                    Duration::from_secs(5),
                    write_clone.lock().await.send(heartbeat_message),
                )
                .await
                {
                    Ok(Ok(_)) => {
                        log::info!("Heartbeat sent");
                    }
                    Ok(Err(e)) => {
                        log::error!("Failed to send heartbeat: {:?}", e);
                        break;
                    }
                    Err(e) => {
                        log::error!("Heartbeat send timed out: {:?}", e);
                        break;
                    }
                }
            }
            log::info!("Heartbeat task stopped");
        });
        Ok(())
    }

    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }
}