1use crate::message_parser::MessageParser;
3use crate::types::{BufferStats, *};
4use crate::fetch::client::KickApiClient;
5use crate::fetch::useragent::{generate_browser_fingerprint, get_rotating_user_agent};
6use futures_util::{SinkExt, StreamExt};
7use serde_json::json;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::{Mutex, RwLock};
13use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream};
14use tracing::{debug, error, info, warn};
15use url::Url;
16use tokio_tungstenite::tungstenite::http::Request;
17pub type EventHandler<T> = Arc<dyn Fn(T) + Send + Sync>;
21
22#[derive(Clone)]
24pub struct WebSocketManager {
25 ws: Arc<Mutex<Option<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>>,
26 channel_name: Arc<RwLock<String>>,
27 channel_id: Arc<RwLock<u64>>,
28 connection_state: Arc<RwLock<ConnectionState>>,
29 options: Arc<RwLock<KickWebSocketOptions>>,
30 message_buffer: Arc<RwLock<std::collections::VecDeque<String>>>,
31 event_handlers: Arc<RwLock<EventHandlers>>,
32 is_manual_disconnect: Arc<RwLock<bool>>,
33 reconnect_timer: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
34 custom_websocket_url: Arc<RwLock<Option<String>>>,
35 custom_websocket_params: Arc<RwLock<Option<HashMap<String, String>>>>,
36 fetch_client: Arc<KickApiClient>,
37}
38
39#[derive(Default)]
41struct EventHandlers {
42 chat_message: Vec<EventHandler<ChatMessageEvent>>,
43 message_deleted: Vec<EventHandler<MessageDeletedEvent>>,
44 user_banned: Vec<EventHandler<UserBannedEvent>>,
45 user_unbanned: Vec<EventHandler<UserUnbannedEvent>>,
46 subscription: Vec<EventHandler<SubscriptionEvent>>,
47 gifted_subscriptions: Vec<EventHandler<GiftedSubscriptionsEvent>>,
48 pinned_message_created: Vec<EventHandler<PinnedMessageCreatedEvent>>,
49 stream_host: Vec<EventHandler<StreamHostEvent>>,
50 poll_update: Vec<EventHandler<PollUpdateEvent>>,
51 poll_delete: Vec<EventHandler<PollDeleteEvent>>,
52 raw_message: Vec<EventHandler<RawMessage>>,
53 error: Vec<EventHandler<KickError>>,
54 ready: Vec<EventHandler<()>>,
55 disconnected: Vec<EventHandler<()>>,
56}
57
58impl WebSocketManager {
59 pub fn new() -> Self {
61 Self::with_options(KickWebSocketOptions::default())
62 }
63
64 pub fn with_options(options: KickWebSocketOptions) -> Self {
66 let fetch_client = Arc::new(KickApiClient::new().expect("Failed to create KickApiClient with robust User-Agent"));
67 Self {
68 ws: Arc::new(Mutex::new(None)),
69 channel_name: Arc::new(RwLock::new(String::new())),
70 channel_id: Arc::new(RwLock::new(0)),
71 connection_state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
72 options: Arc::new(RwLock::new(options)),
73 message_buffer: Arc::new(RwLock::new(std::collections::VecDeque::new())),
74 event_handlers: Arc::new(RwLock::new(EventHandlers::default())),
75 is_manual_disconnect: Arc::new(RwLock::new(false)),
76 reconnect_timer: Arc::new(RwLock::new(None)),
77 custom_websocket_url: Arc::new(RwLock::new(None)),
78 custom_websocket_params: Arc::new(RwLock::new(None)),
79 fetch_client,
80 }
81 }
82
83 pub async fn connect(&self, channel_name: &str) -> Result<()> {
85 info!("Connecting to channel: {}", channel_name);
86 let current_state = *self.connection_state.read().await;
87 if current_state == ConnectionState::Connected || current_state == ConnectionState::Connecting {
88 warn!("Already connected or connecting");
89 return Ok(());
90 }
91
92 *self.channel_name.write().await = channel_name.to_string();
93 *self.is_manual_disconnect.write().await = false;
94
95 if let Err(e) = self.perform_connection().await {
96 self.handle_connection_error(&e).await;
97 return Err(e);
98 }
99
100 Ok(())
101 }
102
103 async fn perform_connection(&self) -> Result<()> {
105 self.set_connection_state(ConnectionState::Connecting).await;
106 info!("Connecting to channel: {}", *self.channel_name.read().await);
107
108 let chatroom_id = self.get_chatroom_id_from_name(&*self.channel_name.read().await).await?;
110 *self.channel_id.write().await = chatroom_id;
111
112 let ws_url = self.build_websocket_url().await?;
114
115 let options = self.options.read().await;
117 let request = self.create_websocket_request(&ws_url, &*options).await?;
118 drop(options);
119
120 match connect_async(request).await {
121 Ok((ws_stream, response)) => {
122 info!("WebSocket connected! Status: {}", response.status());
123 *self.ws.lock().await = Some(ws_stream);
124
125 self.setup_websocket_handlers().await?;
127 }
128 Err(e) => {
129 error!("WebSocket connection failed: {}", e);
130 return Err(KickError::WebSocket(e));
131 }
132 }
133
134 Ok(())
135 }
136
137
138
139 async fn setup_websocket_handlers(&self) -> Result<()> {
141 let ws = self.ws.clone();
142 let channel_id = *self.channel_id.read().await;
143 let connection_state = self.connection_state.clone();
144 let message_buffer = self.message_buffer.clone();
145 let options = self.options.clone();
146 let event_handlers = self.event_handlers.clone();
147
148 tokio::spawn(async move {
149 let mut connection_established = false;
151 let mut subscription_succeeded = false;
152
153 loop {
155 let msg = {
156 let mut ws_lock = ws.lock().await;
157 if let Some(ws_stream) = ws_lock.as_mut() {
158 ws_stream.next().await
159 } else {
160 break;
161 }
162 };
163
164 match msg {
165 Some(Ok(Message::Text(text))) => {
166 debug!("Received message: {}", text);
167
168 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&text) {
170 if let Some(event) = parsed.get("event").and_then(|e| e.as_str()) {
171 if event == "pusher:connection_established" && !connection_established {
173 info!("✓ Pusher connection established!");
174 connection_established = true;
175
176 let subscribe_msg = json!({
178 "event": "pusher:subscribe",
179 "data": {
180 "auth": "",
181 "channel": format!("chatrooms.{}.v2", channel_id)
182 }
183 });
184
185 info!("→ Sending subscription: {}", subscribe_msg);
186
187 let mut ws_lock = ws.lock().await;
188 if let Some(ws_stream) = ws_lock.as_mut() {
189 if let Err(e) = ws_stream.send(Message::Text(subscribe_msg.to_string())).await {
190 error!("Failed to subscribe: {}", e);
191 break;
192 }
193 }
194 drop(ws_lock); continue;
196 }
197
198 if event == "pusher_internal:subscription_succeeded" && !subscription_succeeded {
200 info!("✓ Subscription succeeded!");
201 subscription_succeeded = true;
202 *connection_state.write().await = ConnectionState::Connected;
203
204 let handlers = event_handlers.read().await;
206 for handler in &handlers.ready {
207 handler(());
208 }
209 continue;
210 }
211
212 {
214 let handlers = event_handlers.read().await;
215 let raw_msg = RawMessage {
216 event_type: event.to_string(),
217 data: parsed.get("data").unwrap_or(&serde_json::Value::Null).to_string(),
218 raw_json: text.clone(),
219 };
220 for handler in &handlers.raw_message {
221 handler(raw_msg.clone());
222 }
223 }
224 }
225 }
226
227 if options.read().await.enable_buffer {
229 let mut buffer = message_buffer.write().await;
230 if buffer.len() >= options.read().await.buffer_size {
231 buffer.pop_front();
232 }
233 buffer.push_back(text.clone());
234 }
235
236 match MessageParser::parse_message(&text) {
238 Ok(Some(parsed_message)) => {
239 let handlers = event_handlers.read().await;
240 if !options.read().await.filtered_events.contains(&parsed_message.r#type) {
241 match parsed_message.data {
242 KickEventData::ChatMessage(data) => {
243 for handler in &handlers.chat_message {
244 handler(data.clone());
245 }
246 }
247 KickEventData::MessageDeleted(data) => {
248 for handler in &handlers.message_deleted {
249 handler(data.clone());
250 }
251 }
252 KickEventData::UserBanned(data) => {
253 for handler in &handlers.user_banned {
254 handler(data.clone());
255 }
256 }
257 KickEventData::UserUnbanned(data) => {
258 for handler in &handlers.user_unbanned {
259 handler(data.clone());
260 }
261 }
262 KickEventData::Subscription(data) => {
263 for handler in &handlers.subscription {
264 handler(data.clone());
265 }
266 }
267 KickEventData::GiftedSubscriptions(data) => {
268 for handler in &handlers.gifted_subscriptions {
269 handler(data.clone());
270 }
271 }
272 KickEventData::PinnedMessageCreated(data) => {
273 for handler in &handlers.pinned_message_created {
274 handler(data.clone());
275 }
276 }
277 KickEventData::StreamHost(data) => {
278 for handler in &handlers.stream_host {
279 handler(data.clone());
280 }
281 }
282 KickEventData::PollUpdate(data) => {
283 for handler in &handlers.poll_update {
284 handler(data.clone());
285 }
286 }
287 KickEventData::PollDelete(data) => {
288 for handler in &handlers.poll_delete {
289 handler(data.clone());
290 }
291 }
292 }
293 }
294 }
295 Ok(None) => {
296 }
298 Err(e) => {
299 debug!("Parse error: {}", e);
300 }
301 }
302 }
303 Some(Ok(Message::Close(_))) => {
304 info!("WebSocket closed");
305 break;
306 }
307 Some(Ok(Message::Ping(data))) => {
308 let mut ws_lock = ws.lock().await;
309 if let Some(ws_stream) = ws_lock.as_mut() {
310 if let Err(e) = ws_stream.send(Message::Pong(data)).await {
311 error!("Failed to send pong: {}", e);
312 }
313 }
314 }
315 Some(Err(e)) => {
316 error!("WebSocket error: {}", e);
317 let handlers = event_handlers.read().await;
318 for handler in &handlers.error {
319 handler(KickError::Connection(format!("{}", e)));
320 }
321 break;
322 }
323 None => break,
324 _ => {}
325 }
326 }
327
328 *connection_state.write().await = ConnectionState::Disconnected;
330 let handlers = event_handlers.read().await;
331 for handler in &handlers.disconnected {
332 handler(());
333 }
334 });
335
336 Ok(())
337 }
338
339 pub async fn disconnect(&self) -> Result<()> {
341 *self.is_manual_disconnect.write().await = true;
342
343 if let Some(timer) = self.reconnect_timer.write().await.take() {
344 timer.abort();
345 }
346
347 let mut ws_lock = self.ws.lock().await;
348 if let Some(ws_stream) = ws_lock.as_mut() {
349 let _ = ws_stream.close(None).await;
350 }
351 *ws_lock = None;
352
353 let mut handlers = self.event_handlers.write().await;
355 handlers.chat_message.clear();
356 handlers.message_deleted.clear();
357 handlers.user_banned.clear();
358 handlers.user_unbanned.clear();
359 handlers.subscription.clear();
360 handlers.gifted_subscriptions.clear();
361 handlers.pinned_message_created.clear();
362 handlers.stream_host.clear();
363 handlers.poll_update.clear();
364 handlers.poll_delete.clear();
365 handlers.raw_message.clear();
366 handlers.error.clear();
367 handlers.ready.clear();
368 handlers.disconnected.clear();
369
370 self.set_connection_state(ConnectionState::Disconnected).await;
371 info!("Disconnected");
372 Ok(())
373 }
374
375 async fn set_connection_state(&self, state: ConnectionState) {
376 *self.connection_state.write().await = state;
377 }
378
379 async fn handle_connection_error(&self, error: &KickError) {
380 self.set_connection_state(ConnectionState::Error).await;
381 error!("Connection error: {}", error);
382 }
383
384 pub async fn on_chat_message<F>(&self, handler: F)
386 where
387 F: Fn(ChatMessageEvent) + Send + Sync + 'static,
388 {
389 self.event_handlers.write().await.chat_message.push(Arc::new(handler));
390 }
391
392 pub async fn on_message<F>(&self, handler: F)
393 where
394 F: Fn(SimpleMessage) + Send + Sync + 'static,
395 {
396 let simple_handler = move |chat_msg: ChatMessageEvent| {
397 let simple_msg = SimpleMessage::from(&chat_msg);
398 handler(simple_msg);
399 };
400 self.event_handlers.write().await.chat_message.push(Arc::new(simple_handler));
401 }
402
403 pub async fn on_ready<F>(&self, handler: F)
404 where
405 F: Fn(()) + Send + Sync + 'static,
406 {
407 self.event_handlers.write().await.ready.push(Arc::new(handler));
408 }
409
410 pub async fn on_disconnected<F>(&self, handler: F)
411 where
412 F: Fn(()) + Send + Sync + 'static,
413 {
414 self.event_handlers.write().await.disconnected.push(Arc::new(handler));
415 }
416
417 pub async fn on_raw_message<F>(&self, handler: F)
418 where
419 F: Fn(RawMessage) + Send + Sync + 'static,
420 {
421 self.event_handlers.write().await.raw_message.push(Arc::new(handler));
422 }
423
424 pub async fn on_error<F>(&self, handler: F)
425 where
426 F: Fn(KickError) + Send + Sync + 'static,
427 {
428 self.event_handlers.write().await.error.push(Arc::new(handler));
429 }
430
431 pub async fn on_message_deleted<F>(&self, handler: F)
432 where
433 F: Fn(MessageDeletedEvent) + Send + Sync + 'static,
434 {
435 self.event_handlers.write().await.message_deleted.push(Arc::new(handler));
436 }
437
438 pub async fn on_user_banned<F>(&self, handler: F)
439 where
440 F: Fn(UserBannedEvent) + Send + Sync + 'static,
441 {
442 self.event_handlers.write().await.user_banned.push(Arc::new(handler));
443 }
444
445 pub async fn on_user_unbanned<F>(&self, handler: F)
446 where
447 F: Fn(UserUnbannedEvent) + Send + Sync + 'static,
448 {
449 self.event_handlers.write().await.user_unbanned.push(Arc::new(handler));
450 }
451
452 pub async fn on_subscription<F>(&self, handler: F)
453 where
454 F: Fn(SubscriptionEvent) + Send + Sync + 'static,
455 {
456 self.event_handlers.write().await.subscription.push(Arc::new(handler));
457 }
458
459 pub async fn on_gifted_subscriptions<F>(&self, handler: F)
460 where
461 F: Fn(GiftedSubscriptionsEvent) + Send + Sync + 'static,
462 {
463 self.event_handlers.write().await.gifted_subscriptions.push(Arc::new(handler));
464 }
465
466 pub async fn on_pinned_message_created<F>(&self, handler: F)
467 where
468 F: Fn(PinnedMessageCreatedEvent) + Send + Sync + 'static,
469 {
470 self.event_handlers.write().await.pinned_message_created.push(Arc::new(handler));
471 }
472
473 pub async fn on_stream_host<F>(&self, handler: F)
474 where
475 F: Fn(StreamHostEvent) + Send + Sync + 'static,
476 {
477 self.event_handlers.write().await.stream_host.push(Arc::new(handler));
478 }
479
480 pub async fn on_poll_update<F>(&self, handler: F)
481 where
482 F: Fn(PollUpdateEvent) + Send + Sync + 'static,
483 {
484 self.event_handlers.write().await.poll_update.push(Arc::new(handler));
485 }
486
487 pub async fn on_poll_delete<F>(&self, handler: F)
488 where
489 F: Fn(PollDeleteEvent) + Send + Sync + 'static,
490 {
491 self.event_handlers.write().await.poll_delete.push(Arc::new(handler));
492 }
493
494 pub async fn get_connection_state(&self) -> ConnectionState {
496 *self.connection_state.read().await
497 }
498
499 pub async fn get_channel_name(&self) -> String {
500 self.channel_name.read().await.clone()
501 }
502
503 pub async fn get_channel_id(&self) -> u64 {
504 *self.channel_id.read().await
505 }
506
507 pub async fn build_websocket_url(&self) -> Result<Url> {
509 let base_url = if let Some(custom_url) = self.custom_websocket_url.read().await.clone() {
510 custom_url
511 } else {
512 "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679".to_string()
513 };
514
515 let mut url = Url::parse(&base_url)?;
516
517 let mut params: Vec<(String, String)> = vec![
518 ("protocol".to_string(), "7".to_string()),
519 ("client".to_string(), "js".to_string()),
520 ("version".to_string(), "8.4.0".to_string()),
521 ("flash".to_string(), "false".to_string()),
522 ];
523
524 if let Some(custom_params) = self.custom_websocket_params.read().await.clone() {
525 for (key, value) in custom_params {
526 params.push((key, value));
527 }
528 }
529
530 for (key, value) in params {
531 url.query_pairs_mut().append_pair(&key, &value);
532 }
533
534 Ok(url)
535 }
536
537 pub async fn set_websocket_url(&self, url: String) {
539 *self.custom_websocket_url.write().await = Some(url);
540 info!("Custom WebSocket URL set");
541 }
542
543 pub async fn set_websocket_params(&self, params: HashMap<String, String>) {
545 *self.custom_websocket_params.write().await = Some(params);
546 info!("Custom WebSocket params set");
547 }
548
549 pub async fn reset_websocket_config(&self) {
551 *self.custom_websocket_url.write().await = None;
552 *self.custom_websocket_params.write().await = None;
553 info!("WebSocket configuration reset to defaults");
554 }
555
556 pub async fn get_channel_id_from_name(&self, channel_name: &str) -> Result<u64> {
558 self.fetch_client.get_channel_id(channel_name).await.map_err(Into::into)
559 }
560
561 pub async fn get_chatroom_id_from_name(&self, channel_name: &str) -> Result<u64> {
563 self.fetch_client.get_chatroom_id(channel_name).await.map_err(|e| KickError::ChannelNotFound(format!("Failed to get chatroom ID: {}", e)))
564 }
565
566 pub async fn export_raw_messages(&self) -> Vec<String> {
573 self.message_buffer.read().await.iter().cloned().collect()
574 }
575
576 pub async fn get_buffer_stats(&self) -> BufferStats {
577 let buffer = self.message_buffer.read().await;
578 BufferStats {
579 total: buffer.len(),
580 by_type: HashMap::new(),
581 oldest_timestamp: None,
582 newest_timestamp: None,
583 }
584 }
585
586 pub async fn export_raw_messages_by_event_type(&self, event_type: KickEventType) -> Vec<String> {
588 let buffer = self.message_buffer.read().await;
589 buffer
590 .iter()
591 .filter(|msg| {
592 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(msg) {
593 if let Some(event) = parsed.get("event").and_then(|e| e.as_str()) {
594 let mapped_type = MessageParser::map_kick_event_to_standard(event);
595 mapped_type == Some(event_type)
596 } else {
597 false
598 }
599 } else {
600 false
601 }
602 })
603 .cloned()
604 .collect()
605 }
606
607 pub async fn export_raw_messages_in_range(&self, start_index: usize, end_index: Option<usize>) -> Vec<String> {
609 let buffer = self.message_buffer.read().await;
610 let end = end_index.unwrap_or(buffer.len());
611 buffer
612 .iter()
613 .skip(start_index)
614 .take(end - start_index)
615 .cloned()
616 .collect()
617 }
618
619 pub async fn clear_raw_messages_by_event_type(&self, event_type: KickEventType) -> Result<()> {
621 let mut buffer = self.message_buffer.write().await;
622 buffer.retain(|msg| {
623 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(msg) {
624 if let Some(event) = parsed.get("event").and_then(|e| e.as_str()) {
625 let should_remove = match (event, event_type) {
626 ("ChatMessage", KickEventType::ChatMessage) => true,
627 ("MessageDeleted", KickEventType::MessageDeleted) => true,
628 ("UserBanned", KickEventType::UserBanned) => true,
629 ("UserUnbanned", KickEventType::UserUnbanned) => true,
630 ("Subscription", KickEventType::Subscription) => true,
631 ("GiftedSubscriptions", KickEventType::GiftedSubscriptions) => true,
632 ("PinnedMessageCreated", KickEventType::PinnedMessageCreated) => true,
633 ("StreamHost", KickEventType::StreamHost) => true,
634 ("PollUpdate", KickEventType::PollUpdate) => true,
635 ("PollDelete", KickEventType::PollDelete) => true,
636 _ => false,
637 };
638 !should_remove } else {
640 true }
642 } else {
643 true }
645 });
646 Ok(())
647 }
648
649 async fn create_websocket_request(&self, ws_url: &Url, options: &KickWebSocketOptions) -> Result<Request<()>> {
651 let fingerprint = generate_browser_fingerprint();
653
654 let user_agent = if let Some(custom_ua) = &options.custom_user_agent {
656 custom_ua.clone()
657 } else if options.rotate_user_agent {
658 get_rotating_user_agent().to_string()
659 } else {
660 fingerprint.user_agent.clone()
661 };
662
663 let host = format!("{}:{}", ws_url.host_str().unwrap_or("kick.com"),
665 ws_url.port().unwrap_or(443));
666
667 let request = Request::builder()
669 .uri(ws_url.as_str())
670 .header("Host", &host)
671 .header("User-Agent", user_agent)
672 .header("Accept", "*/*")
673 .header("Accept-Language", "en-US,en;q=0.9")
674 .header("Cache-Control", "no-cache")
675 .header("Pragma", "no-cache")
676 .header("Sec-Ch-Ua", fingerprint.sec_ch_ua)
677 .header("Sec-Ch-Ua-Mobile", fingerprint.sec_ch_ua_mobile)
678 .header("Sec-Ch-Ua-Platform", fingerprint.sec_ch_ua_platform)
679 .header("Sec-Fetch-Dest", "websocket")
680 .header("Sec-Fetch-Mode", "websocket")
681 .header("Sec-Fetch-Site", "cross-site")
682 .header("Origin", "https://kick.com")
683 .header("Referer", "https://kick.com/")
684 .header("Connection", "Upgrade")
685 .header("Upgrade", "websocket")
686 .header("Sec-WebSocket-Key", tokio_tungstenite::tungstenite::handshake::client::generate_key())
687 .header("Sec-WebSocket-Version", "13")
688 .body(())
689 .map_err(|_e| KickError::WebSocket(tokio_tungstenite::tungstenite::Error::Http(
690 tokio_tungstenite::tungstenite::http::Response::builder()
691 .status(400)
692 .body(None)
693 .unwrap()
694 )))?;
695
696 Ok(request)
697 }
698
699 pub async fn set_custom_user_agent(&self, user_agent: String) {
701 let mut options = self.options.write().await;
702 options.custom_user_agent = Some(user_agent);
703 }
704
705 pub async fn set_user_agent_rotation(&self, enabled: bool) {
707 let mut options = self.options.write().await;
708 options.rotate_user_agent = enabled;
709 }
710
711 pub async fn get_user_agent_config(&self) -> (Option<String>, bool) {
713 let options = self.options.read().await;
714 (options.custom_user_agent.clone(), options.rotate_user_agent)
715 }
716
717 pub async fn reset_user_agent(&self) {
719 let mut options = self.options.write().await;
720 options.custom_user_agent = None;
721 options.rotate_user_agent = true;
722 }
723
724 pub async fn is_connected(&self) -> bool {
726 matches!(*self.connection_state.read().await, ConnectionState::Connected)
727 }
728
729 pub async fn has_handlers(&self) -> bool {
731 let handlers = self.event_handlers.read().await;
732 !handlers.chat_message.is_empty() ||
733 !handlers.message_deleted.is_empty() ||
734 !handlers.user_banned.is_empty() ||
735 !handlers.user_unbanned.is_empty() ||
736 !handlers.subscription.is_empty() ||
737 !handlers.gifted_subscriptions.is_empty() ||
738 !handlers.pinned_message_created.is_empty() ||
739 !handlers.stream_host.is_empty() ||
740 !handlers.poll_update.is_empty() ||
741 !handlers.poll_delete.is_empty() ||
742 !handlers.raw_message.is_empty() ||
743 !handlers.error.is_empty() ||
744 !handlers.ready.is_empty() ||
745 !handlers.disconnected.is_empty()
746 }
747
748 pub async fn connect_with_timeout(&self, channel_name: &str, timeout: Duration) -> Result<()> {
750 let connect_future = self.connect(channel_name);
751 match tokio::time::timeout(timeout, connect_future).await {
752 Ok(result) => result,
753 Err(_) => Err(KickError::Connection(format!("Connection timeout after {:?}", timeout))),
754 }
755 }
756}
757
758impl Default for WebSocketManager {
759 fn default() -> Self {
760 Self::new()
761 }
762}