sms_client/ws/
client.rs

1//! Main WebSocket client implementation.
2
3use crate::ws::error::*;
4use crate::ws::worker::{ControlMessage, WorkerLoop};
5
6/// WebSocket client for real-time message reception.
7pub struct WebSocketClient {
8    config: crate::config::WebSocketConfig,
9    tls_config: Option<crate::config::TLSConfig>,
10    callback: Option<crate::ws::MessageCallback>,
11    control_tx: Option<tokio::sync::mpsc::UnboundedSender<ControlMessage>>,
12    worker_handle: Option<tokio::task::JoinHandle<WebsocketResult<()>>>,
13    is_connected: std::sync::Arc<tokio::sync::RwLock<bool>>,
14}
15impl WebSocketClient {
16    /// Create a new WebSocket client.
17    pub fn new(
18        config: crate::config::WebSocketConfig,
19        tls_config: Option<crate::config::TLSConfig>,
20    ) -> Self {
21        Self {
22            config,
23            tls_config,
24            callback: None,
25            control_tx: None,
26            worker_handle: None,
27            is_connected: std::sync::Arc::new(tokio::sync::RwLock::new(false)),
28        }
29    }
30
31    /// Set the message callback handler.
32    pub fn on_message<F>(&mut self, callback: F)
33    where
34        F: Fn(sms_types::events::Event) + Send + Sync + 'static,
35    {
36        self.callback = Some(std::sync::Arc::new(callback));
37    }
38
39    /// Start the WebSocket connection in the background (spawns a worker task).
40    pub async fn start_background(&mut self) -> WebsocketResult<()> {
41        if self.worker_handle.is_some() {
42            return Err(WebsocketError::AlreadyConnected);
43        }
44
45        let (control_tx, control_rx) = tokio::sync::mpsc::unbounded_channel();
46        self.control_tx = Some(control_tx);
47
48        let worker_loop = WorkerLoop::new(
49            self.config.clone(),
50            self.tls_config.clone(),
51            self.callback.clone(),
52            std::sync::Arc::clone(&self.is_connected),
53        );
54
55        let worker_handle = tokio::spawn(async move { worker_loop.run(control_rx).await });
56
57        self.worker_handle = Some(worker_handle);
58        Ok(())
59    }
60
61    /// Start the WebSocket connection and block until it closes.
62    pub async fn start_blocking(&mut self) -> WebsocketResult<()> {
63        let (control_tx, control_rx) = tokio::sync::mpsc::unbounded_channel();
64        self.control_tx = Some(control_tx);
65
66        let worker_loop = WorkerLoop::new(
67            self.config.clone(),
68            self.tls_config.clone(),
69            self.callback.clone(),
70            std::sync::Arc::clone(&self.is_connected),
71        );
72
73        // Run directly in this task (no spawn)
74        worker_loop.run(control_rx).await
75    }
76
77    /// Stop the WebSocket connection and worker.
78    pub async fn stop_background(&mut self) -> WebsocketResult<()> {
79        if let Some(tx) = &self.control_tx {
80            let _ = tx.send(ControlMessage::Stop);
81        }
82
83        if let Some(handle) = self.worker_handle.take() {
84            // Wait for worker to finish with timeout
85            let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
86        }
87
88        self.control_tx = None;
89        *self.is_connected.write().await = false;
90
91        Ok(())
92    }
93
94    /// Check if the WebSocket is currently connected.
95    pub async fn is_connected(&self) -> bool {
96        *self.is_connected.read().await
97    }
98
99    /// Force a reconnection attempt.
100    pub async fn reconnect(&self) -> WebsocketResult<()> {
101        if let Some(tx) = &self.control_tx {
102            tx.send(ControlMessage::Reconnect)
103                .map_err(|_| WebsocketError::ChannelError)?;
104            Ok(())
105        } else {
106            Err(WebsocketError::NotConnected)
107        }
108    }
109}
110impl Drop for WebSocketClient {
111    fn drop(&mut self) {
112        // Send stop signal to worker if still running.
113        if let Some(tx) = &self.control_tx {
114            let _ = tx.send(ControlMessage::Stop);
115        }
116    }
117}
118impl std::fmt::Debug for WebSocketClient {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        f.debug_struct("WebsocketClient")
121            .field("url", &self.config.url)
122            .field("is_connected", &self.is_connected)
123            .field("has_tls_config", &self.tls_config.is_some())
124            .finish()
125    }
126}