tiktoklive/core/
live_client_websocket.rs

1use futures_util::{SinkExt, StreamExt};
2use log::info;
3use protobuf::Message;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use tokio::sync::Mutex;
7use tokio::time::{interval, timeout, Duration};
8use tokio_tungstenite::tungstenite::handshake::client::Request;
9use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as WsMessage};
10
11use crate::core::live_client::TikTokLiveClient;
12use crate::core::live_client_mapper::TikTokLiveMessageMapper;
13use crate::data::live_common::ConnectionState::CONNECTED;
14use crate::errors::LibError;
15use crate::generated::events::{TikTokConnectedEvent, TikTokLiveEvent};
16use crate::generated::messages::webcast::{WebcastPushFrame, WebcastResponse};
17use crate::http::http_data::LiveConnectionDataResponse;
18
19pub struct TikTokLiveWebsocketClient {
20    pub(crate) message_mapper: TikTokLiveMessageMapper,
21    pub(crate) running: Arc<AtomicBool>,
22}
23
24impl TikTokLiveWebsocketClient {
25    pub fn new(message_mapper: TikTokLiveMessageMapper) -> Self {
26        TikTokLiveWebsocketClient {
27            message_mapper,
28            running: Arc::new(AtomicBool::new(false)),
29        }
30    }
31
32    pub async fn start(
33        &self,
34        response: LiveConnectionDataResponse,
35        client: Arc<TikTokLiveClient>,
36    ) -> Result<(), LibError> {
37        let host = response
38            .web_socket_url
39            .host_str()
40            .ok_or(LibError::InvalidHost)?;
41
42        let request = Request::builder()
43            .method("GET")
44            .uri(response.web_socket_url.to_string())
45            .header("Host", host)
46            .header("Upgrade", "websocket")
47            .header("Connection", "keep-alive")
48            .header("Cache-Control", "max-age=0")
49            .header("Accept", "text/html,application/json,application/protobuf")
50            .header("Sec-Websocket-Key", "asd")
51            .header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36")
52            .header("Referer", "https://www.tiktok.com/")
53            .header("Origin", "https://www.tiktok.com")
54            .header("Accept-Language", "en-US,en;q=0.9")
55            .header("Accept-Encoding", "gzip, deflate")
56            .header("Cookie", response.web_socket_cookies)
57            .header("Sec-Websocket-Version", "13")
58            .body(())
59            .map_err(|_| LibError::ParamsError)?;
60
61        let (ws_stream, _) = connect_async(request)
62            .await
63            .map_err(|_| LibError::WebSocketConnectFailed)?;
64        let (write, mut read) = ws_stream.split();
65        let write = Arc::new(Mutex::new(write));
66
67        client.set_connection_state(CONNECTED);
68        client.publish_event(TikTokLiveEvent::OnConnected(TikTokConnectedEvent {}));
69
70        let running = self.running.clone();
71        running.store(true, Ordering::SeqCst);
72
73        let message_mapper = self.message_mapper.clone();
74        let client_clone = client.clone();
75        let write_clone = write.clone();
76        let running_clone = running.clone();
77
78        tokio::spawn(async move {
79            info!("Websocket connected");
80            while running_clone.load(Ordering::SeqCst) {
81                if let Some(Ok(message)) = read.next().await {
82                    if let WsMessage::Binary(buffer) = message {
83                        let mut push_frame = match WebcastPushFrame::parse_from_bytes(&buffer) {
84                            Ok(frame) => frame,
85                            Err(_) => continue,
86                        };
87
88                        let webcast_response = match WebcastResponse::parse_from_bytes(
89                            push_frame.Payload.as_mut_slice(),
90                        ) {
91                            Ok(response) => response,
92                            Err(_) => continue,
93                        };
94
95                        if webcast_response.needsAck {
96                            let mut push_frame_ack = WebcastPushFrame::new();
97                            push_frame_ack.PayloadType = "ack".to_string();
98                            push_frame_ack.LogId = push_frame.LogId;
99                            push_frame_ack.Payload =
100                                webcast_response.internalExt.clone().into_bytes();
101
102                            let binary = match push_frame_ack.write_to_bytes() {
103                                Ok(bytes) => bytes,
104                                Err(_) => continue,
105                            };
106
107                            let message = WsMessage::Binary(binary);
108                            if write_clone.lock().await.send(message).await.is_err() {
109                                continue;
110                            }
111                        }
112
113                        message_mapper
114                            .handle_webcast_response(webcast_response, client_clone.as_ref());
115                    }
116                }
117            }
118        });
119
120        let write_clone = write.clone();
121        let running_clone = running.clone();
122        tokio::spawn(async move {
123            let mut interval = interval(Duration::from_secs(9));
124            while running_clone.load(Ordering::SeqCst) {
125                interval.tick().await;
126
127                let heartbeat_message = WsMessage::Binary(vec![0x3a, 0x02, 0x68, 0x62]);
128
129                match timeout(
130                    Duration::from_secs(5),
131                    write_clone.lock().await.send(heartbeat_message),
132                )
133                .await
134                {
135                    Ok(Ok(_)) => {
136                        log::info!("Heartbeat sent");
137                    }
138                    Ok(Err(e)) => {
139                        log::error!("Failed to send heartbeat: {:?}", e);
140                        break;
141                    }
142                    Err(e) => {
143                        log::error!("Heartbeat send timed out: {:?}", e);
144                        break;
145                    }
146                }
147            }
148            log::info!("Heartbeat task stopped");
149        });
150        Ok(())
151    }
152
153    pub fn stop(&self) {
154        self.running.store(false, Ordering::SeqCst);
155    }
156}