// iflow_cli_sdk_rust/websocket_transport.rs

//! WebSocket transport implementation for iFlow SDK
//!
//! This module provides the low-level WebSocket communication layer.
//! It handles connection management, message sending/receiving, and
//! basic error handling.

use crate::error::{IFlowError, Result};
use futures::{SinkExt, StreamExt};
use serde_json::Value;
use std::time::Duration;
use tokio_tungstenite::{WebSocketStream, connect_async, tungstenite::protocol::Message};
use tracing::debug;
use url::Url;

15/// WebSocket transport for iFlow communication
16///
17/// This class provides a low-level WebSocket interface for communicating
18/// with iFlow. It handles connection management, message serialization,
19/// and error recovery.
20pub struct WebSocketTransport {
21    /// WebSocket URL to connect to
22    url: String,
23    /// Active WebSocket connection (if connected)
24    websocket: Option<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>,
25    /// Whether the transport is currently connected
26    connected: bool,
27    /// Connection timeout in seconds
28    timeout: f64,
29}
30
31impl WebSocketTransport {
32    /// Initialize WebSocket transport
33    ///
34    /// # Arguments
35    /// * `url` - WebSocket URL (e.g., ws://localhost:8090/acp?peer=iflow)
36    /// * `timeout` - Connection timeout in seconds
37    pub fn new(url: String, timeout: f64) -> Self {
38        Self {
39            url,
40            websocket: None,
41            connected: false,
42            timeout,
43        }
44    }
45
46    /// Establish WebSocket connection
47    ///
48    /// # Returns
49    /// * `Ok(())` if the connection was successful
50    /// * `Err(IFlowError)` if the connection failed
51    pub async fn connect(&mut self) -> Result<()> {
52        if self.connected {
53            tracing::warn!("Already connected to {}", self.url);
54            return Ok(());
55        }
56
57        debug!("Connecting to {}", self.url);
58
59        // Parse URL to validate it
60        let url = Url::parse(&self.url)
61            .map_err(|e| IFlowError::Connection(format!("Invalid URL: {}", e)))?;
62
63        // Attempt to connect with timeout
64        let (ws_stream, _) =
65            tokio::time::timeout(Duration::from_secs_f64(self.timeout), connect_async(url))
66                .await
67                .map_err(|_| IFlowError::Timeout("Connection timeout".to_string()))?
68                .map_err(|e| {
69                    IFlowError::Connection(format!("WebSocket connection failed: {}", e))
70                })?;
71
72        self.websocket = Some(ws_stream);
73        self.connected = true;
74        debug!("Connected to {}", self.url);
75
76        Ok(())
77    }
78
79    /// Send a message through WebSocket
80    ///
81    /// # Arguments
82    /// * `message` - Message to send (string or JSON Value)
83    ///
84    /// # Returns
85    /// * `Ok(())` if the message was sent successfully
86    /// * `Err(IFlowError)` if there was an error
87    pub async fn send(&mut self, message: &Value) -> Result<()> {
88        if !self.connected {
89            return Err(IFlowError::NotConnected);
90        }
91
92        let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
93
94        // Serialize message to JSON string
95        let data = serde_json::to_string(message).map_err(|e| IFlowError::JsonParse(e))?;
96
97        // Send the message
98        ws_stream
99            .send(Message::Text(data.clone()))
100            .await
101            .map_err(|e| IFlowError::Transport(format!("Failed to send message: {}", e)))?;
102
103        tracing::debug!(
104            "Sent message: {}",
105            if data.len() > 200 {
106                format!("{}...", &data[..200])
107            } else {
108                data
109            }
110        );
111
112        Ok(())
113    }
114
115    /// Send a raw string message through WebSocket
116    ///
117    /// # Arguments
118    /// * `message` - Raw message string to send
119    ///
120    /// # Returns
121    /// * `Ok(())` if the message was sent successfully
122    /// * `Err(IFlowError)` if there was an error
123    pub async fn send_raw(&mut self, message: &str) -> Result<()> {
124        if !self.connected {
125            return Err(IFlowError::NotConnected);
126        }
127
128        let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
129
130        // Send the message
131        ws_stream
132            .send(Message::Text(message.to_string()))
133            .await
134            .map_err(|e| IFlowError::Transport(format!("Failed to send message: {}", e)))?;
135
136        tracing::debug!(
137            "Sent raw message: {}",
138            if message.len() > 200 {
139                format!("{}...", &message[..200])
140            } else {
141                message.to_string()
142            }
143        );
144
145        Ok(())
146    }
147
148    /// Receive messages from WebSocket
149    ///
150    /// This method receives a single message from the WebSocket connection.
151    ///
152    /// # Returns
153    /// * `Ok(String)` containing the received message
154    /// * `Err(IFlowError)` if there was an error
155    pub async fn receive(&mut self) -> Result<String> {
156        if !self.connected {
157            return Err(IFlowError::NotConnected);
158        }
159
160        let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
161
162        // Receive the next message with proper error handling
163        loop {
164            let msg = match ws_stream.next().await {
165                Some(Ok(msg)) => msg,
166                Some(Err(e)) => {
167                    tracing::error!("WebSocket error: {}", e);
168                    self.connected = false;
169                    return Err(IFlowError::Transport(format!(
170                        "Failed to receive message: {}",
171                        e
172                    )));
173                }
174                None => {
175                    tracing::debug!("WebSocket connection closed");
176                    self.connected = false;
177                    return Err(IFlowError::Connection("Connection closed".to_string()));
178                }
179            };
180
181            match msg {
182                Message::Text(text) => {
183                    // Clean up the text - remove any non-printable characters at the beginning
184                    let cleaned_text = text.trim_start_matches(|c: char| {
185                        !c.is_ascii() || c.is_control() && c != '\n' && c != '\r' && c != '\t'
186                    });
187                    tracing::debug!(
188                        "Received message: {}", cleaned_text.to_string()
189                    );
190                    return Ok(cleaned_text.to_string());
191                }
192                Message::Binary(data) => {
193                    // Convert binary to string if possible
194                    match String::from_utf8(data) {
195                        Ok(text) => return Ok(text),
196                        Err(_) => {
197                            tracing::debug!("Received binary message, ignoring");
198                            continue;
199                        }
200                    }
201                }
202                Message::Ping(data) => {
203                    // Respond to ping with pong
204                    tracing::debug!("Received ping, sending pong");
205                    if let Err(e) = ws_stream.send(Message::Pong(data)).await {
206                        tracing::error!("Failed to send pong: {}", e);
207                        self.connected = false;
208                        return Err(IFlowError::Transport(format!("Failed to send pong: {}", e)));
209                    }
210                    continue;
211                }
212                Message::Pong(_) => {
213                    tracing::debug!("Received pong");
214                    continue;
215                }
216                Message::Close(close_frame) => {
217                    tracing::debug!("Received close frame: {:?}", close_frame);
218                    self.connected = false;
219                    return Err(IFlowError::Connection(
220                        "Connection closed by server".to_string(),
221                    ));
222                }
223                Message::Frame(_) => {
224                    tracing::debug!("Received raw frame, ignoring");
225                    continue;
226                }
227            }
228        }
229    }
230
231    /// Close WebSocket connection gracefully
232    pub async fn close(&mut self) -> Result<()> {
233        if let Some(mut ws_stream) = self.websocket.take() {
234            ws_stream
235                .close(None)
236                .await
237                .map_err(|e| IFlowError::Transport(format!("Error closing WebSocket: {}", e)))?;
238            debug!("WebSocket connection closed");
239        }
240        self.connected = false;
241        Ok(())
242    }
243
244    /// Check if the WebSocket is connected
245    ///
246    /// # Returns
247    /// True if connected, False otherwise
248    pub fn is_connected(&self) -> bool {
249        self.connected
250    }
251
252    /// Get the WebSocket URL
253    ///
254    /// # Returns
255    /// The WebSocket URL
256    pub fn url(&self) -> &str {
257        &self.url
258    }
259}