tiktoklive/core/
live_client_websocket.rs1use futures_util::{SinkExt, StreamExt};
2use log::info;
3use protobuf::Message;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use tokio::sync::Mutex;
7use tokio::time::{interval, timeout, Duration};
8use tokio_tungstenite::tungstenite::handshake::client::Request;
9use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as WsMessage};
10
11use crate::core::live_client::TikTokLiveClient;
12use crate::core::live_client_mapper::TikTokLiveMessageMapper;
13use crate::data::live_common::ConnectionState::CONNECTED;
14use crate::errors::LibError;
15use crate::generated::events::{TikTokConnectedEvent, TikTokLiveEvent};
16use crate::generated::messages::webcast::{WebcastPushFrame, WebcastResponse};
17use crate::http::http_data::LiveConnectionDataResponse;
18
19pub struct TikTokLiveWebsocketClient {
20 pub(crate) message_mapper: TikTokLiveMessageMapper,
21 pub(crate) running: Arc<AtomicBool>,
22}
23
24impl TikTokLiveWebsocketClient {
25 pub fn new(message_mapper: TikTokLiveMessageMapper) -> Self {
26 TikTokLiveWebsocketClient {
27 message_mapper,
28 running: Arc::new(AtomicBool::new(false)),
29 }
30 }
31
32 pub async fn start(
33 &self,
34 response: LiveConnectionDataResponse,
35 client: Arc<TikTokLiveClient>,
36 ) -> Result<(), LibError> {
37 let host = response
38 .web_socket_url
39 .host_str()
40 .ok_or(LibError::InvalidHost)?;
41
42 let request = Request::builder()
43 .method("GET")
44 .uri(response.web_socket_url.to_string())
45 .header("Host", host)
46 .header("Upgrade", "websocket")
47 .header("Connection", "keep-alive")
48 .header("Cache-Control", "max-age=0")
49 .header("Accept", "text/html,application/json,application/protobuf")
50 .header("Sec-Websocket-Key", "asd")
51 .header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36")
52 .header("Referer", "https://www.tiktok.com/")
53 .header("Origin", "https://www.tiktok.com")
54 .header("Accept-Language", "en-US,en;q=0.9")
55 .header("Accept-Encoding", "gzip, deflate")
56 .header("Cookie", response.web_socket_cookies)
57 .header("Sec-Websocket-Version", "13")
58 .body(())
59 .map_err(|_| LibError::ParamsError)?;
60
61 let (ws_stream, _) = connect_async(request)
62 .await
63 .map_err(|_| LibError::WebSocketConnectFailed)?;
64 let (write, mut read) = ws_stream.split();
65 let write = Arc::new(Mutex::new(write));
66
67 client.set_connection_state(CONNECTED);
68 client.publish_event(TikTokLiveEvent::OnConnected(TikTokConnectedEvent {}));
69
70 let running = self.running.clone();
71 running.store(true, Ordering::SeqCst);
72
73 let message_mapper = self.message_mapper.clone();
74 let client_clone = client.clone();
75 let write_clone = write.clone();
76 let running_clone = running.clone();
77
78 tokio::spawn(async move {
79 info!("Websocket connected");
80 while running_clone.load(Ordering::SeqCst) {
81 if let Some(Ok(message)) = read.next().await {
82 if let WsMessage::Binary(buffer) = message {
83 let mut push_frame = match WebcastPushFrame::parse_from_bytes(&buffer) {
84 Ok(frame) => frame,
85 Err(_) => continue,
86 };
87
88 let webcast_response = match WebcastResponse::parse_from_bytes(
89 push_frame.Payload.as_mut_slice(),
90 ) {
91 Ok(response) => response,
92 Err(_) => continue,
93 };
94
95 if webcast_response.needsAck {
96 let mut push_frame_ack = WebcastPushFrame::new();
97 push_frame_ack.PayloadType = "ack".to_string();
98 push_frame_ack.LogId = push_frame.LogId;
99 push_frame_ack.Payload =
100 webcast_response.internalExt.clone().into_bytes();
101
102 let binary = match push_frame_ack.write_to_bytes() {
103 Ok(bytes) => bytes,
104 Err(_) => continue,
105 };
106
107 let message = WsMessage::Binary(binary);
108 if write_clone.lock().await.send(message).await.is_err() {
109 continue;
110 }
111 }
112
113 message_mapper
114 .handle_webcast_response(webcast_response, client_clone.as_ref());
115 }
116 }
117 }
118 });
119
120 let write_clone = write.clone();
121 let running_clone = running.clone();
122 tokio::spawn(async move {
123 let mut interval = interval(Duration::from_secs(9));
124 while running_clone.load(Ordering::SeqCst) {
125 interval.tick().await;
126
127 let heartbeat_message = WsMessage::Binary(vec![0x3a, 0x02, 0x68, 0x62]);
128
129 match timeout(
130 Duration::from_secs(5),
131 write_clone.lock().await.send(heartbeat_message),
132 )
133 .await
134 {
135 Ok(Ok(_)) => {
136 log::info!("Heartbeat sent");
137 }
138 Ok(Err(e)) => {
139 log::error!("Failed to send heartbeat: {:?}", e);
140 break;
141 }
142 Err(e) => {
143 log::error!("Heartbeat send timed out: {:?}", e);
144 break;
145 }
146 }
147 }
148 log::info!("Heartbeat task stopped");
149 });
150 Ok(())
151 }
152
153 pub fn stop(&self) {
154 self.running.store(false, Ordering::SeqCst);
155 }
156}