iflow_cli_sdk_rust/
websocket_transport.rs

1//! WebSocket transport implementation for iFlow SDK
2//!
3//! This module provides the low-level WebSocket communication layer.
4//! It handles connection management, message sending/receiving, and
5//! basic error handling.
6
7use crate::error::{IFlowError, Result};
8use futures::{SinkExt, StreamExt};
9use serde_json::Value;
10use std::time::Duration;
11use tokio_tungstenite::{WebSocketStream, connect_async, tungstenite::protocol::Message};
12use tracing::debug;
13use url::Url;
14
15/// WebSocket transport for iFlow communication
16///
17/// This class provides a low-level WebSocket interface for communicating
18/// with iFlow. It handles connection management, message serialization,
19/// and error recovery.
20pub struct WebSocketTransport {
21    /// WebSocket URL to connect to
22    url: String,
23    /// Active WebSocket connection (if connected)
24    websocket: Option<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>,
25    /// Whether the transport is currently connected
26    connected: bool,
27    /// Connection timeout in seconds
28    timeout: f64,
29}
30
31impl WebSocketTransport {
32    /// Initialize WebSocket transport
33    ///
34    /// # Arguments
35    /// * `url` - WebSocket URL (e.g., ws://localhost:8090/acp?peer=iflow)
36    /// * `timeout` - Connection timeout in seconds
37    pub fn new(url: String, timeout: f64) -> Self {
38        Self {
39            url,
40            websocket: None,
41            connected: false,
42            timeout,
43        }
44    }
45
46    /// Establish WebSocket connection
47    ///
48    /// # Returns
49    /// * `Ok(())` if the connection was successful
50    /// * `Err(IFlowError)` if the connection failed
51    pub async fn connect(&mut self) -> Result<()> {
52        if self.connected {
53            tracing::warn!("Already connected to {}", self.url);
54            return Ok(());
55        }
56
57        debug!("Connecting to {}", self.url);
58
59        // Parse URL to validate it
60        let _url = Url::parse(&self.url)
61            .map_err(|e| IFlowError::Connection(format!("Invalid URL: {}", e)))?;
62
63        // Attempt to connect with timeout
64        let (ws_stream, _) =
65            tokio::time::timeout(Duration::from_secs_f64(self.timeout), connect_async(&self.url))
66                .await
67                .map_err(|_| IFlowError::Timeout("Connection timeout".to_string()))?
68                .map_err(|e| {
69                    IFlowError::Connection(format!("WebSocket connection failed: {}", e))
70                })?;
71
72        self.websocket = Some(ws_stream);
73        self.connected = true;
74        debug!("Connected to {}", self.url);
75
76        Ok(())
77    }
78
79    /// Send a message through WebSocket
80    ///
81    /// # Arguments
82    /// * `message` - Message to send (string or JSON Value)
83    ///
84    /// # Returns
85    /// * `Ok(())` if the message was sent successfully
86    /// * `Err(IFlowError)` if there was an error
87    pub async fn send(&mut self, message: &Value) -> Result<()> {
88        if !self.connected {
89            return Err(IFlowError::NotConnected);
90        }
91
92        let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
93
94        // Serialize message to JSON string
95        let data = serde_json::to_string(message).map_err(|e| IFlowError::JsonParse(e))?;
96
97        // Send the message
98        ws_stream
99            .send(Message::Text(data.clone().into()))
100            .await
101            .map_err(|e| IFlowError::Transport(format!("Failed to send message: {}", e)))?;
102
103        tracing::debug!(
104            "Sent message: {}", data
105        );
106
107        Ok(())
108    }
109
110    /// Send a raw string message through WebSocket
111    ///
112    /// # Arguments
113    /// * `message` - Raw message string to send
114    ///
115    /// # Returns
116    /// * `Ok(())` if the message was sent successfully
117    /// * `Err(IFlowError)` if there was an error
118    pub async fn send_raw(&mut self, message: &str) -> Result<()> {
119        if !self.connected {
120            return Err(IFlowError::NotConnected);
121        }
122
123        let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
124
125        // Send the message
126        ws_stream
127            .send(Message::Text(message.to_string().into()))
128            .await
129            .map_err(|e| IFlowError::Transport(format!("Failed to send message: {}", e)))?;
130
131        tracing::debug!(
132            "Sent raw message: {}",
133            message.to_string()
134        );
135
136        Ok(())
137    }
138
139    /// Receive messages from WebSocket
140    ///
141    /// This method receives a single message from the WebSocket connection.
142    ///
143    /// # Returns
144    /// * `Ok(String)` containing the received message
145    /// * `Err(IFlowError)` if there was an error
146    pub async fn receive(&mut self) -> Result<String> {
147        if !self.connected {
148            return Err(IFlowError::NotConnected);
149        }
150
151        let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
152
153        // Receive the next message with proper error handling
154        loop {
155            let msg = match ws_stream.next().await {
156                Some(Ok(msg)) => msg,
157                Some(Err(e)) => {
158                    tracing::error!("WebSocket error: {}", e);
159                    self.connected = false;
160                    return Err(IFlowError::Transport(format!(
161                        "Failed to receive message: {}",
162                        e
163                    )));
164                }
165                None => {
166                    tracing::debug!("WebSocket connection closed");
167                    self.connected = false;
168                    return Err(IFlowError::Connection("Connection closed".to_string()));
169                }
170            };
171
172            match msg {
173                Message::Text(text) => {
174                    // Clean up the text - remove any non-printable characters at the beginning
175                    let cleaned_text = text.trim_start_matches(|c: char| {
176                        !c.is_ascii() || c.is_control() && c != '\n' && c != '\r' && c != '\t'
177                    });
178                    tracing::debug!(
179                        "Received message: {}", cleaned_text.to_string()
180                    );
181                    return Ok(cleaned_text.to_string());
182                }
183                Message::Binary(data) => {
184                    // Convert binary to string if possible
185                    match String::from_utf8(data.to_vec()) {
186                        Ok(text) => return Ok(text),
187                        Err(_) => {
188                            tracing::debug!("Received binary message, ignoring");
189                            continue;
190                        }
191                    }
192                }
193                Message::Ping(data) => {
194                    // Respond to ping with pong
195                    tracing::debug!("Received ping, sending pong");
196                    if let Err(e) = ws_stream.send(Message::Pong(data)).await {
197                        tracing::error!("Failed to send pong: {}", e);
198                        self.connected = false;
199                        return Err(IFlowError::Transport(format!("Failed to send pong: {}", e)));
200                    }
201                    continue;
202                }
203                Message::Pong(_) => {
204                    tracing::debug!("Received pong");
205                    continue;
206                }
207                Message::Close(close_frame) => {
208                    tracing::debug!("Received close frame: {:?}", close_frame);
209                    self.connected = false;
210                    return Err(IFlowError::Connection(
211                        "Connection closed by server".to_string(),
212                    ));
213                }
214                Message::Frame(_) => {
215                    tracing::debug!("Received raw frame, ignoring");
216                    continue;
217                }
218            }
219        }
220    }
221
222    /// Close WebSocket connection gracefully
223    pub async fn close(&mut self) -> Result<()> {
224        if let Some(mut ws_stream) = self.websocket.take() {
225            ws_stream
226                .close(None)
227                .await
228                .map_err(|e| IFlowError::Transport(format!("Error closing WebSocket: {}", e)))?;
229            debug!("WebSocket connection closed");
230        }
231        self.connected = false;
232        Ok(())
233    }
234
235    /// Check if the WebSocket is connected
236    ///
237    /// # Returns
238    /// True if connected, False otherwise
239    pub fn is_connected(&self) -> bool {
240        self.connected
241    }
242
243    /// Get the WebSocket URL
244    ///
245    /// # Returns
246    /// The WebSocket URL
247    pub fn url(&self) -> &str {
248        &self.url
249    }
250}