1use crate::ws::error::*;
4use crate::ws::worker::{ControlMessage, WorkerLoop};
5
6pub struct WebSocketClient {
8 config: crate::config::WebSocketConfig,
9 tls_config: Option<crate::config::TLSConfig>,
10 callback: Option<crate::ws::MessageCallback>,
11 control_tx: Option<tokio::sync::mpsc::UnboundedSender<ControlMessage>>,
12 worker_handle: Option<tokio::task::JoinHandle<WebsocketResult<()>>>,
13 is_connected: std::sync::Arc<tokio::sync::RwLock<bool>>,
14}
15impl WebSocketClient {
16 pub fn new(
18 config: crate::config::WebSocketConfig,
19 tls_config: Option<crate::config::TLSConfig>,
20 ) -> Self {
21 Self {
22 config,
23 tls_config,
24 callback: None,
25 control_tx: None,
26 worker_handle: None,
27 is_connected: std::sync::Arc::new(tokio::sync::RwLock::new(false)),
28 }
29 }
30
31 pub fn on_message<F>(&mut self, callback: F)
33 where
34 F: Fn(sms_types::events::Event) + Send + Sync + 'static,
35 {
36 self.callback = Some(std::sync::Arc::new(callback));
37 }
38
39 pub async fn start_background(&mut self) -> WebsocketResult<()> {
41 if self.worker_handle.is_some() {
42 return Err(WebsocketError::AlreadyConnected);
43 }
44
45 let (control_tx, control_rx) = tokio::sync::mpsc::unbounded_channel();
46 self.control_tx = Some(control_tx);
47
48 let worker_loop = WorkerLoop::new(
49 self.config.clone(),
50 self.tls_config.clone(),
51 self.callback.clone(),
52 std::sync::Arc::clone(&self.is_connected),
53 );
54
55 let worker_handle = tokio::spawn(async move { worker_loop.run(control_rx).await });
56
57 self.worker_handle = Some(worker_handle);
58 Ok(())
59 }
60
61 pub async fn start_blocking(&mut self) -> WebsocketResult<()> {
63 let (control_tx, control_rx) = tokio::sync::mpsc::unbounded_channel();
64 self.control_tx = Some(control_tx);
65
66 let worker_loop = WorkerLoop::new(
67 self.config.clone(),
68 self.tls_config.clone(),
69 self.callback.clone(),
70 std::sync::Arc::clone(&self.is_connected),
71 );
72
73 worker_loop.run(control_rx).await
75 }
76
77 pub async fn stop_background(&mut self) -> WebsocketResult<()> {
79 if let Some(tx) = &self.control_tx {
80 let _ = tx.send(ControlMessage::Stop);
81 }
82
83 if let Some(handle) = self.worker_handle.take() {
84 let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
86 }
87
88 self.control_tx = None;
89 *self.is_connected.write().await = false;
90
91 Ok(())
92 }
93
94 pub async fn is_connected(&self) -> bool {
96 *self.is_connected.read().await
97 }
98
99 pub async fn reconnect(&self) -> WebsocketResult<()> {
101 if let Some(tx) = &self.control_tx {
102 tx.send(ControlMessage::Reconnect)
103 .map_err(|_| WebsocketError::ChannelError)?;
104 Ok(())
105 } else {
106 Err(WebsocketError::NotConnected)
107 }
108 }
109}
110impl Drop for WebSocketClient {
111 fn drop(&mut self) {
112 if let Some(tx) = &self.control_tx {
114 let _ = tx.send(ControlMessage::Stop);
115 }
116 }
117}
118impl std::fmt::Debug for WebSocketClient {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 f.debug_struct("WebsocketClient")
121 .field("url", &self.config.url)
122 .field("is_connected", &self.is_connected)
123 .field("has_tls_config", &self.tls_config.is_some())
124 .finish()
125 }
126}