iflow_cli_sdk_rust/
websocket_transport.rs

1//! WebSocket transport implementation for iFlow SDK
2//!
3//! This module provides the low-level WebSocket communication layer.
4//! It handles connection management, message sending/receiving, and
5//! basic error handling.
6
7use crate::error::{IFlowError, Result};
8use futures::{SinkExt, StreamExt};
9use serde_json::Value;
10use std::time::Duration;
11use tokio_tungstenite::{WebSocketStream, connect_async, tungstenite::protocol::Message};
12use tracing::debug;
13use url::Url;
14
15/// WebSocket transport for iFlow communication
16///
17/// This class provides a low-level WebSocket interface for communicating
18/// with iFlow. It handles connection management, message serialization,
19/// and error recovery.
20pub struct WebSocketTransport {
21    /// WebSocket URL to connect to
22    url: String,
23    /// Active WebSocket connection (if connected)
24    websocket: Option<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>,
25    /// Whether the transport is currently connected
26    connected: bool,
27    /// Connection timeout in seconds
28    timeout: f64,
29}
30
31impl WebSocketTransport {
32    /// Initialize WebSocket transport
33    ///
34    /// # Arguments
35    /// * `url` - WebSocket URL (e.g., ws://localhost:8090/acp?peer=iflow)
36    /// * `timeout` - Connection timeout in seconds
37    pub fn new(url: String, timeout: f64) -> Self {
38        Self {
39            url,
40            websocket: None,
41            connected: false,
42            timeout,
43        }
44    }
45
46    /// Establish WebSocket connection
47    ///
48    /// # Returns
49    /// * `Ok(())` if the connection was successful
50    /// * `Err(IFlowError)` if the connection failed
51    pub async fn connect(&mut self) -> Result<()> {
52        if self.connected {
53            tracing::warn!("Already connected to {}", self.url);
54            return Ok(());
55        }
56
57        debug!("Connecting to {}", self.url);
58
59        // Parse URL to validate it
60        let url = Url::parse(&self.url)
61            .map_err(|e| IFlowError::Connection(format!("Invalid URL: {}", e)))?;
62
63        // Attempt to connect with timeout
64        let (ws_stream, _) =
65            tokio::time::timeout(Duration::from_secs_f64(self.timeout), connect_async(url))
66                .await
67                .map_err(|_| IFlowError::Timeout("Connection timeout".to_string()))?
68                .map_err(|e| {
69                    IFlowError::Connection(format!("WebSocket connection failed: {}", e))
70                })?;
71
72        self.websocket = Some(ws_stream);
73        self.connected = true;
74        debug!("Connected to {}", self.url);
75
76        Ok(())
77    }
78
79    /// Send a message through WebSocket
80    ///
81    /// # Arguments
82    /// * `message` - Message to send (string or JSON Value)
83    ///
84    /// # Returns
85    /// * `Ok(())` if the message was sent successfully
86    /// * `Err(IFlowError)` if there was an error
87    pub async fn send(&mut self, message: &Value) -> Result<()> {
88        if !self.connected {
89            return Err(IFlowError::NotConnected);
90        }
91
92        let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
93
94        // Serialize message to JSON string
95        let data = serde_json::to_string(message).map_err(|e| IFlowError::JsonParse(e))?;
96
97        // Send the message
98        ws_stream
99            .send(Message::Text(data.clone()))
100            .await
101            .map_err(|e| IFlowError::Transport(format!("Failed to send message: {}", e)))?;
102
103        tracing::debug!(
104            "Sent message: {}",
105            if data.len() > 200 {
106                format!("{}...", &data[..200])
107            } else {
108                data
109            }
110        );
111
112        Ok(())
113    }
114
115    /// Send a raw string message through WebSocket
116    ///
117    /// # Arguments
118    /// * `message` - Raw message string to send
119    ///
120    /// # Returns
121    /// * `Ok(())` if the message was sent successfully
122    /// * `Err(IFlowError)` if there was an error
123    pub async fn send_raw(&mut self, message: &str) -> Result<()> {
124        if !self.connected {
125            return Err(IFlowError::NotConnected);
126        }
127
128        let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
129
130        // Send the message
131        ws_stream
132            .send(Message::Text(message.to_string()))
133            .await
134            .map_err(|e| IFlowError::Transport(format!("Failed to send message: {}", e)))?;
135
136        tracing::debug!(
137            "Sent raw message: {}",
138            if message.len() > 200 {
139                format!("{}...", &message[..200])
140            } else {
141                message.to_string()
142            }
143        );
144
145        Ok(())
146    }
147
148    /// Receive messages from WebSocket
149    ///
150    /// This method receives a single message from the WebSocket connection.
151    ///
152    /// # Returns
153    /// * `Ok(String)` containing the received message
154    /// * `Err(IFlowError)` if there was an error
155    pub async fn receive(&mut self) -> Result<String> {
156        if !self.connected {
157            return Err(IFlowError::NotConnected);
158        }
159
160        let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
161
162        // Receive the next message with proper error handling
163        loop {
164            let msg = match ws_stream.next().await {
165                Some(Ok(msg)) => msg,
166                Some(Err(e)) => {
167                    tracing::error!("WebSocket error: {}", e);
168                    self.connected = false;
169                    return Err(IFlowError::Transport(format!(
170                        "Failed to receive message: {}",
171                        e
172                    )));
173                }
174                None => {
175                    tracing::debug!("WebSocket connection closed");
176                    self.connected = false;
177                    return Err(IFlowError::Connection("Connection closed".to_string()));
178                }
179            };
180
181            match msg {
182                Message::Text(text) => {
183                    // Clean up the text - remove any non-printable characters at the beginning
184                    let cleaned_text = text.trim_start_matches(|c: char| {
185                        !c.is_ascii() || c.is_control() && c != '\n' && c != '\r' && c != '\t'
186                    });
187                    tracing::debug!(
188                        "Received message: {}",
189                        if cleaned_text.len() > 1000 {
190                            format!("{}...", &cleaned_text[..1000])
191                        } else {
192                            cleaned_text.to_string()
193                        }
194                    );
195                    return Ok(cleaned_text.to_string());
196                }
197                Message::Binary(data) => {
198                    // Convert binary to string if possible
199                    match String::from_utf8(data) {
200                        Ok(text) => return Ok(text),
201                        Err(_) => {
202                            tracing::debug!("Received binary message, ignoring");
203                            continue;
204                        }
205                    }
206                }
207                Message::Ping(data) => {
208                    // Respond to ping with pong
209                    tracing::debug!("Received ping, sending pong");
210                    if let Err(e) = ws_stream.send(Message::Pong(data)).await {
211                        tracing::error!("Failed to send pong: {}", e);
212                        self.connected = false;
213                        return Err(IFlowError::Transport(format!("Failed to send pong: {}", e)));
214                    }
215                    continue;
216                }
217                Message::Pong(_) => {
218                    tracing::debug!("Received pong");
219                    continue;
220                }
221                Message::Close(close_frame) => {
222                    tracing::debug!("Received close frame: {:?}", close_frame);
223                    self.connected = false;
224                    return Err(IFlowError::Connection(
225                        "Connection closed by server".to_string(),
226                    ));
227                }
228                Message::Frame(_) => {
229                    tracing::debug!("Received raw frame, ignoring");
230                    continue;
231                }
232            }
233        }
234    }
235
236    /// Close WebSocket connection gracefully
237    pub async fn close(&mut self) -> Result<()> {
238        if let Some(mut ws_stream) = self.websocket.take() {
239            ws_stream
240                .close(None)
241                .await
242                .map_err(|e| IFlowError::Transport(format!("Error closing WebSocket: {}", e)))?;
243            debug!("WebSocket connection closed");
244        }
245        self.connected = false;
246        Ok(())
247    }
248
249    /// Check if the WebSocket is connected
250    ///
251    /// # Returns
252    /// True if connected, False otherwise
253    pub fn is_connected(&self) -> bool {
254        self.connected
255    }
256
257    /// Get the WebSocket URL
258    ///
259    /// # Returns
260    /// The WebSocket URL
261    pub fn url(&self) -> &str {
262        &self.url
263    }
264}