kick_rust/
websocket_manager.rs

1//! WebSocket manager for handling Kick.com WebSocket connections
2use crate::message_parser::MessageParser;
3use crate::types::{BufferStats, *};
4use crate::fetch::client::KickApiClient;
5use crate::fetch::useragent::{generate_browser_fingerprint, get_rotating_user_agent};
6use futures_util::{SinkExt, StreamExt};
7use serde_json::json;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::{Mutex, RwLock};
13use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream};
14use tracing::{debug, error, info, warn};
15use url::Url;
16use tokio_tungstenite::tungstenite::http::Request;
17// use tokio_tungstenite::tungstenite::client::IntoClientRequest; // No longer needed
18
19/// Event handler callback type
///
/// A reference-counted, thread-safe (`Send + Sync`) closure that receives one event
/// payload of type `T` by value and returns nothing.
20pub type EventHandler<T> = Arc<dyn Fn(T) + Send + Sync>;
21
22/// WebSocket manager for Kick.com
///
/// Every field is wrapped in an `Arc`, so the derived `Clone` produces another handle to
/// the same underlying state rather than an independent manager.
23#[derive(Clone)]
24pub struct WebSocketManager {
    /// The active socket; `None` until `perform_connection` stores a stream and again
    /// after `disconnect` clears it.
25    ws: Arc<Mutex<Option<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>>,
    /// Channel name passed to the most recent `connect` call.
26    channel_name: Arc<RwLock<String>>,
    /// Identifier used in the `chatrooms.{id}.v2` subscription; `perform_connection`
    /// stores the chatroom id here (0 until then).
27    channel_id: Arc<RwLock<u64>>,
    /// Current connection state.
28    connection_state: Arc<RwLock<ConnectionState>>,
    /// Runtime options (buffering, filtered events, User-Agent settings).
29    options: Arc<RwLock<KickWebSocketOptions>>,
    /// Raw text frames kept when `enable_buffer` is set; the oldest entry is dropped once
    /// `buffer_size` is reached.
30    message_buffer: Arc<RwLock<std::collections::VecDeque<String>>>,
    /// Registered callbacks, grouped by event kind.
31    event_handlers: Arc<RwLock<EventHandlers>>,
    /// Set to `false` by `connect` and `true` by `disconnect`; not read in this section.
32    is_manual_disconnect: Arc<RwLock<bool>>,
    /// Pending reconnect task, aborted by `disconnect`; never assigned in this section.
33    reconnect_timer: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Overrides the default Pusher endpoint when set.
34    custom_websocket_url: Arc<RwLock<Option<String>>>,
    /// Extra query parameters appended after the default ones when set.
35    custom_websocket_params: Arc<RwLock<Option<HashMap<String, String>>>>,
    /// HTTP client used to resolve channel and chatroom ids.
36    fetch_client: Arc<KickApiClient>,
37}
38
39/// Container for event handlers
///
/// One list of callbacks per event kind. Callbacks are invoked in registration order by
/// the background read task, and all lists are emptied by `WebSocketManager::disconnect`.
40#[derive(Default)]
41struct EventHandlers {
42    chat_message: Vec<EventHandler<ChatMessageEvent>>,
43    message_deleted: Vec<EventHandler<MessageDeletedEvent>>,
44    user_banned: Vec<EventHandler<UserBannedEvent>>,
45    user_unbanned: Vec<EventHandler<UserUnbannedEvent>>,
46    subscription: Vec<EventHandler<SubscriptionEvent>>,
47    gifted_subscriptions: Vec<EventHandler<GiftedSubscriptionsEvent>>,
48    pinned_message_created: Vec<EventHandler<PinnedMessageCreatedEvent>>,
49    stream_host: Vec<EventHandler<StreamHostEvent>>,
50    poll_update: Vec<EventHandler<PollUpdateEvent>>,
51    poll_delete: Vec<EventHandler<PollDeleteEvent>>,
    /// Receives every JSON text frame that carries a string `event` field, except the
    /// Pusher handshake frames consumed by the read task.
52    raw_message: Vec<EventHandler<RawMessage>>,
    /// Invoked when reading from the socket fails.
53    error: Vec<EventHandler<KickError>>,
    /// Invoked once the channel subscription has been acknowledged.
54    ready: Vec<EventHandler<()>>,
    /// Invoked when the background read task ends.
55    disconnected: Vec<EventHandler<()>>,
56}
57
58impl WebSocketManager {
59    /// Create a new WebSocket manager with default options
60    pub fn new() -> Self {
61        Self::with_options(KickWebSocketOptions::default())
62    }
63
64    /// Create a new WebSocket manager with custom options
65    pub fn with_options(options: KickWebSocketOptions) -> Self {
66        let fetch_client = Arc::new(KickApiClient::new().expect("Failed to create KickApiClient with robust User-Agent"));
67        Self {
68            ws: Arc::new(Mutex::new(None)),
69            channel_name: Arc::new(RwLock::new(String::new())),
70            channel_id: Arc::new(RwLock::new(0)),
71            connection_state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
72            options: Arc::new(RwLock::new(options)),
73            message_buffer: Arc::new(RwLock::new(std::collections::VecDeque::new())),
74            event_handlers: Arc::new(RwLock::new(EventHandlers::default())),
75            is_manual_disconnect: Arc::new(RwLock::new(false)),
76            reconnect_timer: Arc::new(RwLock::new(None)),
77            custom_websocket_url: Arc::new(RwLock::new(None)),
78            custom_websocket_params: Arc::new(RwLock::new(None)),
79            fetch_client,
80        }
81    }
82
83    /// Connect to a specific channel's WebSocket
84    pub async fn connect(&self, channel_name: &str) -> Result<()> {
85        info!("Connecting to channel: {}", channel_name);
86        let current_state = *self.connection_state.read().await;
87        if current_state == ConnectionState::Connected || current_state == ConnectionState::Connecting {
88            warn!("Already connected or connecting");
89            return Ok(());
90        }
91
92        *self.channel_name.write().await = channel_name.to_string();
93        *self.is_manual_disconnect.write().await = false;
94
95        if let Err(e) = self.perform_connection().await {
96            self.handle_connection_error(&e).await;
97            return Err(e);
98        }
99
100        Ok(())
101    }
102
103    /// Perform the actual WebSocket connection
104    async fn perform_connection(&self) -> Result<()> {
105        self.set_connection_state(ConnectionState::Connecting).await;
106        info!("Connecting to channel: {}", *self.channel_name.read().await);
107
108        // Get chatroom ID for WebSocket subscription
109        let chatroom_id = self.get_chatroom_id_from_name(&*self.channel_name.read().await).await?;
110        *self.channel_id.write().await = chatroom_id;
111
112        // Build WebSocket URL
113        let ws_url = self.build_websocket_url().await?;
114
115        // Create WebSocket connection with realistic headers
116        let options = self.options.read().await;
117        let request = self.create_websocket_request(&ws_url, &*options).await?;
118        drop(options);
119
120        match connect_async(request).await {
121            Ok((ws_stream, response)) => {
122                info!("WebSocket connected! Status: {}", response.status());
123                *self.ws.lock().await = Some(ws_stream);
124
125                // Setup WebSocket handlers
126                self.setup_websocket_handlers().await?;
127            }
128            Err(e) => {
129                error!("WebSocket connection failed: {}", e);
130                return Err(KickError::WebSocket(e));
131            }
132        }
133
134        Ok(())
135    }
136
137
138
139    /// Setup WebSocket message handlers
140    async fn setup_websocket_handlers(&self) -> Result<()> {
141        let ws = self.ws.clone();
142        let channel_id = *self.channel_id.read().await;
143        let connection_state = self.connection_state.clone();
144        let message_buffer = self.message_buffer.clone();
145        let options = self.options.clone();
146        let event_handlers = self.event_handlers.clone();
147
148        tokio::spawn(async move {
149            // Estado para controlar la suscripción
150            let mut connection_established = false;
151            let mut subscription_succeeded = false;
152
153            // Handle messages
154            loop {
155                let msg = {
156                    let mut ws_lock = ws.lock().await;
157                    if let Some(ws_stream) = ws_lock.as_mut() {
158                        ws_stream.next().await
159                    } else {
160                        break;
161                    }
162                };
163
164                match msg {
165                    Some(Ok(Message::Text(text))) => {
166                        debug!("Received message: {}", text);
167
168                        // Parsear el mensaje
169                        if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&text) {
170                            if let Some(event) = parsed.get("event").and_then(|e| e.as_str()) {
171                                // 1. Detectar pusher:connection_established
172                                if event == "pusher:connection_established" && !connection_established {
173                                    info!("✓ Pusher connection established!");
174                                    connection_established = true;
175
176                                    // ENVIAR SUSCRIPCIÓN INMEDIATAMENTE
177                                    let subscribe_msg = json!({
178                                        "event": "pusher:subscribe",
179                                        "data": {
180                                            "auth": "",
181                                            "channel": format!("chatrooms.{}.v2", channel_id)
182                                        }
183                                    });
184
185                                    info!("→ Sending subscription: {}", subscribe_msg);
186
187                                    let mut ws_lock = ws.lock().await;
188                                    if let Some(ws_stream) = ws_lock.as_mut() {
189                                        if let Err(e) = ws_stream.send(Message::Text(subscribe_msg.to_string())).await {
190                                            error!("Failed to subscribe: {}", e);
191                                            break;
192                                        }
193                                    }
194                                    drop(ws_lock); // Liberar el lock explícitamente
195                                    continue;
196                                }
197
198                                // 2. Detectar pusher_internal:subscription_succeeded
199                                if event == "pusher_internal:subscription_succeeded" && !subscription_succeeded {
200                                    info!("✓ Subscription succeeded!");
201                                    subscription_succeeded = true;
202                                    *connection_state.write().await = ConnectionState::Connected;
203
204                                    // Emitir evento ready
205                                    let handlers = event_handlers.read().await;
206                                    for handler in &handlers.ready {
207                                        handler(());
208                                    }
209                                    continue;
210                                }
211
212                                // Emitir raw message
213                                {
214                                    let handlers = event_handlers.read().await;
215                                    let raw_msg = RawMessage {
216                                        event_type: event.to_string(),
217                                        data: parsed.get("data").unwrap_or(&serde_json::Value::Null).to_string(),
218                                        raw_json: text.clone(),
219                                    };
220                                    for handler in &handlers.raw_message {
221                                        handler(raw_msg.clone());
222                                    }
223                                }
224                            }
225                        }
226
227                        // Agregar al buffer si está habilitado
228                        if options.read().await.enable_buffer {
229                            let mut buffer = message_buffer.write().await;
230                            if buffer.len() >= options.read().await.buffer_size {
231                                buffer.pop_front();
232                            }
233                            buffer.push_back(text.clone());
234                        }
235
236                        // Parsear y manejar eventos de Kick
237                        match MessageParser::parse_message(&text) {
238                            Ok(Some(parsed_message)) => {
239                                let handlers = event_handlers.read().await;
240                                if !options.read().await.filtered_events.contains(&parsed_message.r#type) {
241                                    match parsed_message.data {
242                                        KickEventData::ChatMessage(data) => {
243                                            for handler in &handlers.chat_message {
244                                                handler(data.clone());
245                                            }
246                                        }
247                                        KickEventData::MessageDeleted(data) => {
248                                            for handler in &handlers.message_deleted {
249                                                handler(data.clone());
250                                            }
251                                        }
252                                        KickEventData::UserBanned(data) => {
253                                            for handler in &handlers.user_banned {
254                                                handler(data.clone());
255                                            }
256                                        }
257                                        KickEventData::UserUnbanned(data) => {
258                                            for handler in &handlers.user_unbanned {
259                                                handler(data.clone());
260                                            }
261                                        }
262                                        KickEventData::Subscription(data) => {
263                                            for handler in &handlers.subscription {
264                                                handler(data.clone());
265                                            }
266                                        }
267                                        KickEventData::GiftedSubscriptions(data) => {
268                                            for handler in &handlers.gifted_subscriptions {
269                                                handler(data.clone());
270                                            }
271                                        }
272                                        KickEventData::PinnedMessageCreated(data) => {
273                                            for handler in &handlers.pinned_message_created {
274                                                handler(data.clone());
275                                            }
276                                        }
277                                        KickEventData::StreamHost(data) => {
278                                            for handler in &handlers.stream_host {
279                                                handler(data.clone());
280                                            }
281                                        }
282                                        KickEventData::PollUpdate(data) => {
283                                            for handler in &handlers.poll_update {
284                                                handler(data.clone());
285                                            }
286                                        }
287                                        KickEventData::PollDelete(data) => {
288                                            for handler in &handlers.poll_delete {
289                                                handler(data.clone());
290                                            }
291                                        }
292                                    }
293                                }
294                            }
295                            Ok(None) => {
296                                // Evento de sistema de Pusher, ignorar
297                            }
298                            Err(e) => {
299                                debug!("Parse error: {}", e);
300                            }
301                        }
302                    }
303                    Some(Ok(Message::Close(_))) => {
304                        info!("WebSocket closed");
305                        break;
306                    }
307                    Some(Ok(Message::Ping(data))) => {
308                        let mut ws_lock = ws.lock().await;
309                        if let Some(ws_stream) = ws_lock.as_mut() {
310                            if let Err(e) = ws_stream.send(Message::Pong(data)).await {
311                                error!("Failed to send pong: {}", e);
312                            }
313                        }
314                    }
315                    Some(Err(e)) => {
316                        error!("WebSocket error: {}", e);
317                        let handlers = event_handlers.read().await;
318                        for handler in &handlers.error {
319                            handler(KickError::Connection(format!("{}", e)));
320                        }
321                        break;
322                    }
323                    None => break,
324                    _ => {}
325                }
326            }
327
328            // Manejar desconexión
329            *connection_state.write().await = ConnectionState::Disconnected;
330            let handlers = event_handlers.read().await;
331            for handler in &handlers.disconnected {
332                handler(());
333            }
334        });
335
336        Ok(())
337    }
338
339    /// Disconnect from the WebSocket
340    pub async fn disconnect(&self) -> Result<()> {
341        *self.is_manual_disconnect.write().await = true;
342
343        if let Some(timer) = self.reconnect_timer.write().await.take() {
344            timer.abort();
345        }
346
347        let mut ws_lock = self.ws.lock().await;
348        if let Some(ws_stream) = ws_lock.as_mut() {
349            let _ = ws_stream.close(None).await;
350        }
351        *ws_lock = None;
352
353        // Clear all event handlers
354        let mut handlers = self.event_handlers.write().await;
355        handlers.chat_message.clear();
356        handlers.message_deleted.clear();
357        handlers.user_banned.clear();
358        handlers.user_unbanned.clear();
359        handlers.subscription.clear();
360        handlers.gifted_subscriptions.clear();
361        handlers.pinned_message_created.clear();
362        handlers.stream_host.clear();
363        handlers.poll_update.clear();
364        handlers.poll_delete.clear();
365        handlers.raw_message.clear();
366        handlers.error.clear();
367        handlers.ready.clear();
368        handlers.disconnected.clear();
369
370        self.set_connection_state(ConnectionState::Disconnected).await;
371        info!("Disconnected");
372        Ok(())
373    }
374
375    async fn set_connection_state(&self, state: ConnectionState) {
376        *self.connection_state.write().await = state;
377    }
378
379    async fn handle_connection_error(&self, error: &KickError) {
380        self.set_connection_state(ConnectionState::Error).await;
381        error!("Connection error: {}", error);
382    }
383
384    // Event handler registration methods
385    pub async fn on_chat_message<F>(&self, handler: F)
386    where
387        F: Fn(ChatMessageEvent) + Send + Sync + 'static,
388    {
389        self.event_handlers.write().await.chat_message.push(Arc::new(handler));
390    }
391
392    pub async fn on_message<F>(&self, handler: F)
393    where
394        F: Fn(SimpleMessage) + Send + Sync + 'static,
395    {
396        let simple_handler = move |chat_msg: ChatMessageEvent| {
397            let simple_msg = SimpleMessage::from(&chat_msg);
398            handler(simple_msg);
399        };
400        self.event_handlers.write().await.chat_message.push(Arc::new(simple_handler));
401    }
402
403    pub async fn on_ready<F>(&self, handler: F)
404    where
405        F: Fn(()) + Send + Sync + 'static,
406    {
407        self.event_handlers.write().await.ready.push(Arc::new(handler));
408    }
409
410    pub async fn on_disconnected<F>(&self, handler: F)
411    where
412        F: Fn(()) + Send + Sync + 'static,
413    {
414        self.event_handlers.write().await.disconnected.push(Arc::new(handler));
415    }
416
417    pub async fn on_raw_message<F>(&self, handler: F)
418    where
419        F: Fn(RawMessage) + Send + Sync + 'static,
420    {
421        self.event_handlers.write().await.raw_message.push(Arc::new(handler));
422    }
423
424    pub async fn on_error<F>(&self, handler: F)
425    where
426        F: Fn(KickError) + Send + Sync + 'static,
427    {
428        self.event_handlers.write().await.error.push(Arc::new(handler));
429    }
430
431    pub async fn on_message_deleted<F>(&self, handler: F)
432    where
433        F: Fn(MessageDeletedEvent) + Send + Sync + 'static,
434    {
435        self.event_handlers.write().await.message_deleted.push(Arc::new(handler));
436    }
437
438    pub async fn on_user_banned<F>(&self, handler: F)
439    where
440        F: Fn(UserBannedEvent) + Send + Sync + 'static,
441    {
442        self.event_handlers.write().await.user_banned.push(Arc::new(handler));
443    }
444
445    pub async fn on_user_unbanned<F>(&self, handler: F)
446    where
447        F: Fn(UserUnbannedEvent) + Send + Sync + 'static,
448    {
449        self.event_handlers.write().await.user_unbanned.push(Arc::new(handler));
450    }
451
452    pub async fn on_subscription<F>(&self, handler: F)
453    where
454        F: Fn(SubscriptionEvent) + Send + Sync + 'static,
455    {
456        self.event_handlers.write().await.subscription.push(Arc::new(handler));
457    }
458
459    pub async fn on_gifted_subscriptions<F>(&self, handler: F)
460    where
461        F: Fn(GiftedSubscriptionsEvent) + Send + Sync + 'static,
462    {
463        self.event_handlers.write().await.gifted_subscriptions.push(Arc::new(handler));
464    }
465
466    pub async fn on_pinned_message_created<F>(&self, handler: F)
467    where
468        F: Fn(PinnedMessageCreatedEvent) + Send + Sync + 'static,
469    {
470        self.event_handlers.write().await.pinned_message_created.push(Arc::new(handler));
471    }
472
473    pub async fn on_stream_host<F>(&self, handler: F)
474    where
475        F: Fn(StreamHostEvent) + Send + Sync + 'static,
476    {
477        self.event_handlers.write().await.stream_host.push(Arc::new(handler));
478    }
479
480    pub async fn on_poll_update<F>(&self, handler: F)
481    where
482        F: Fn(PollUpdateEvent) + Send + Sync + 'static,
483    {
484        self.event_handlers.write().await.poll_update.push(Arc::new(handler));
485    }
486
487    pub async fn on_poll_delete<F>(&self, handler: F)
488    where
489        F: Fn(PollDeleteEvent) + Send + Sync + 'static,
490    {
491        self.event_handlers.write().await.poll_delete.push(Arc::new(handler));
492    }
493
494    // Getters
495    pub async fn get_connection_state(&self) -> ConnectionState {
496        *self.connection_state.read().await
497    }
498
499    pub async fn get_channel_name(&self) -> String {
500        self.channel_name.read().await.clone()
501    }
502
503    pub async fn get_channel_id(&self) -> u64 {
504        *self.channel_id.read().await
505    }
506
507    /// Build the WebSocket URL with parameters (public method for testing)
508    pub async fn build_websocket_url(&self) -> Result<Url> {
509        let base_url = if let Some(custom_url) = self.custom_websocket_url.read().await.clone() {
510            custom_url
511        } else {
512            "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679".to_string()
513        };
514
515        let mut url = Url::parse(&base_url)?;
516
517        let mut params: Vec<(String, String)> = vec![
518            ("protocol".to_string(), "7".to_string()),
519            ("client".to_string(), "js".to_string()),
520            ("version".to_string(), "8.4.0".to_string()),
521            ("flash".to_string(), "false".to_string()),
522        ];
523
524        if let Some(custom_params) = self.custom_websocket_params.read().await.clone() {
525            for (key, value) in custom_params {
526                params.push((key, value));
527            }
528        }
529
530        for (key, value) in params {
531            url.query_pairs_mut().append_pair(&key, &value);
532        }
533
534        Ok(url)
535    }
536
537    /// Set a custom WebSocket URL
538    pub async fn set_websocket_url(&self, url: String) {
539        *self.custom_websocket_url.write().await = Some(url);
540        info!("Custom WebSocket URL set");
541    }
542
543    /// Set custom WebSocket parameters
544    pub async fn set_websocket_params(&self, params: HashMap<String, String>) {
545        *self.custom_websocket_params.write().await = Some(params);
546        info!("Custom WebSocket params set");
547    }
548
549    /// Reset WebSocket configuration to defaults
550    pub async fn reset_websocket_config(&self) {
551        *self.custom_websocket_url.write().await = None;
552        *self.custom_websocket_params.write().await = None;
553        info!("WebSocket configuration reset to defaults");
554    }
555
556    /// Get channel ID from channel name (public method for testing)
557    pub async fn get_channel_id_from_name(&self, channel_name: &str) -> Result<u64> {
558        self.fetch_client.get_channel_id(channel_name).await.map_err(Into::into)
559    }
560
561    /// Get chatroom ID for WebSocket subscription
562    pub async fn get_chatroom_id_from_name(&self, channel_name: &str) -> Result<u64> {
563        self.fetch_client.get_chatroom_id(channel_name).await.map_err(|e| KickError::ChannelNotFound(format!("Failed to get chatroom ID: {}", e)))
564    }
565
566    /// Clear the channel ID cache
567
568
569
570
571
572    pub async fn export_raw_messages(&self) -> Vec<String> {
573        self.message_buffer.read().await.iter().cloned().collect()
574    }
575
576    pub async fn get_buffer_stats(&self) -> BufferStats {
577        let buffer = self.message_buffer.read().await;
578        BufferStats {
579            total: buffer.len(),
580            by_type: HashMap::new(),
581            oldest_timestamp: None,
582            newest_timestamp: None,
583        }
584    }
585
586    /// Export raw messages by event type
587    pub async fn export_raw_messages_by_event_type(&self, event_type: KickEventType) -> Vec<String> {
588        let buffer = self.message_buffer.read().await;
589        buffer
590            .iter()
591            .filter(|msg| {
592                if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(msg) {
593                    if let Some(event) = parsed.get("event").and_then(|e| e.as_str()) {
594                        let mapped_type = MessageParser::map_kick_event_to_standard(event);
595                        mapped_type == Some(event_type)
596                    } else {
597                        false
598                    }
599                } else {
600                    false
601                }
602            })
603            .cloned()
604            .collect()
605    }
606
607    /// Export raw messages in range
608    pub async fn export_raw_messages_in_range(&self, start_index: usize, end_index: Option<usize>) -> Vec<String> {
609        let buffer = self.message_buffer.read().await;
610        let end = end_index.unwrap_or(buffer.len());
611        buffer
612            .iter()
613            .skip(start_index)
614            .take(end - start_index)
615            .cloned()
616            .collect()
617    }
618
619    /// Clear raw messages by event type
620    pub async fn clear_raw_messages_by_event_type(&self, event_type: KickEventType) -> Result<()> {
621        let mut buffer = self.message_buffer.write().await;
622        buffer.retain(|msg| {
623            if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(msg) {
624                if let Some(event) = parsed.get("event").and_then(|e| e.as_str()) {
625                    let should_remove = match (event, event_type) {
626                        ("ChatMessage", KickEventType::ChatMessage) => true,
627                        ("MessageDeleted", KickEventType::MessageDeleted) => true,
628                        ("UserBanned", KickEventType::UserBanned) => true,
629                        ("UserUnbanned", KickEventType::UserUnbanned) => true,
630                        ("Subscription", KickEventType::Subscription) => true,
631                        ("GiftedSubscriptions", KickEventType::GiftedSubscriptions) => true,
632                        ("PinnedMessageCreated", KickEventType::PinnedMessageCreated) => true,
633                        ("StreamHost", KickEventType::StreamHost) => true,
634                        ("PollUpdate", KickEventType::PollUpdate) => true,
635                        ("PollDelete", KickEventType::PollDelete) => true,
636                        _ => false,
637                    };
638                    !should_remove // Keep if not matching
639                } else {
640                    true // Keep if no event field
641                }
642            } else {
643                true // Keep if not valid JSON
644            }
645        });
646        Ok(())
647    }
648
649    /// Create a WebSocket request with realistic browser headers
650    async fn create_websocket_request(&self, ws_url: &Url, options: &KickWebSocketOptions) -> Result<Request<()>> {
651        // Generate realistic browser fingerprint
652        let fingerprint = generate_browser_fingerprint();
653
654        // Use custom User-Agent if provided, otherwise use rotating one
655        let user_agent = if let Some(custom_ua) = &options.custom_user_agent {
656            custom_ua.clone()
657        } else if options.rotate_user_agent {
658            get_rotating_user_agent().to_string()
659        } else {
660            fingerprint.user_agent.clone()
661        };
662
663        // Extract host from URL
664        let host = format!("{}:{}", ws_url.host_str().unwrap_or("kick.com"),
665                          ws_url.port().unwrap_or(443));
666
667        // Create request with headers
668        let request = Request::builder()
669            .uri(ws_url.as_str())
670            .header("Host", &host)
671            .header("User-Agent", user_agent)
672            .header("Accept", "*/*")
673            .header("Accept-Language", "en-US,en;q=0.9")
674            .header("Cache-Control", "no-cache")
675            .header("Pragma", "no-cache")
676            .header("Sec-Ch-Ua", fingerprint.sec_ch_ua)
677            .header("Sec-Ch-Ua-Mobile", fingerprint.sec_ch_ua_mobile)
678            .header("Sec-Ch-Ua-Platform", fingerprint.sec_ch_ua_platform)
679            .header("Sec-Fetch-Dest", "websocket")
680            .header("Sec-Fetch-Mode", "websocket")
681            .header("Sec-Fetch-Site", "cross-site")
682            .header("Origin", "https://kick.com")
683            .header("Referer", "https://kick.com/")
684            .header("Connection", "Upgrade")
685            .header("Upgrade", "websocket")
686            .header("Sec-WebSocket-Key", tokio_tungstenite::tungstenite::handshake::client::generate_key())
687            .header("Sec-WebSocket-Version", "13")
688            .body(())
689                .map_err(|_e| KickError::WebSocket(tokio_tungstenite::tungstenite::Error::Http(
690                    tokio_tungstenite::tungstenite::http::Response::builder()
691                        .status(400)
692                        .body(None)
693                        .unwrap()
694                )))?;
695
696        Ok(request)
697    }
698
699    /// Set a custom User-Agent for WebSocket connections
700    pub async fn set_custom_user_agent(&self, user_agent: String) {
701        let mut options = self.options.write().await;
702        options.custom_user_agent = Some(user_agent);
703    }
704
705    /// Enable or disable User-Agent rotation for WebSocket connections
706    pub async fn set_user_agent_rotation(&self, enabled: bool) {
707        let mut options = self.options.write().await;
708        options.rotate_user_agent = enabled;
709    }
710
711    /// Get current User-Agent configuration
712    pub async fn get_user_agent_config(&self) -> (Option<String>, bool) {
713        let options = self.options.read().await;
714        (options.custom_user_agent.clone(), options.rotate_user_agent)
715    }
716
717    /// Reset User-Agent to default behavior (rotation enabled)
718    pub async fn reset_user_agent(&self) {
719        let mut options = self.options.write().await;
720        options.custom_user_agent = None;
721        options.rotate_user_agent = true;
722    }
723
724    /// Check if the client is connected to WebSocket
725    pub async fn is_connected(&self) -> bool {
726        matches!(*self.connection_state.read().await, ConnectionState::Connected)
727    }
728
729    /// Check if any event handlers are registered
730    pub async fn has_handlers(&self) -> bool {
731        let handlers = self.event_handlers.read().await;
732        !handlers.chat_message.is_empty() ||
733        !handlers.message_deleted.is_empty() ||
734        !handlers.user_banned.is_empty() ||
735        !handlers.user_unbanned.is_empty() ||
736        !handlers.subscription.is_empty() ||
737        !handlers.gifted_subscriptions.is_empty() ||
738        !handlers.pinned_message_created.is_empty() ||
739        !handlers.stream_host.is_empty() ||
740        !handlers.poll_update.is_empty() ||
741        !handlers.poll_delete.is_empty() ||
742        !handlers.raw_message.is_empty() ||
743        !handlers.error.is_empty() ||
744        !handlers.ready.is_empty() ||
745        !handlers.disconnected.is_empty()
746    }
747
748    /// Connect to a channel with timeout
749    pub async fn connect_with_timeout(&self, channel_name: &str, timeout: Duration) -> Result<()> {
750        let connect_future = self.connect(channel_name);
751        match tokio::time::timeout(timeout, connect_future).await {
752            Ok(result) => result,
753            Err(_) => Err(KickError::Connection(format!("Connection timeout after {:?}", timeout))),
754        }
755    }
756}
757
758impl Default for WebSocketManager {
759    fn default() -> Self {
760        Self::new()
761    }
762}