iflow_cli_sdk_rust/
websocket_transport.rs1use crate::error::{IFlowError, Result};
8use futures::{SinkExt, StreamExt};
9use serde_json::Value;
10use std::time::Duration;
11use tokio_tungstenite::{WebSocketStream, connect_async, tungstenite::protocol::Message};
12use tracing::debug;
13use url::Url;
14
15pub struct WebSocketTransport {
21 url: String,
23 websocket: Option<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>,
25 connected: bool,
27 timeout: f64,
29}
30
31impl WebSocketTransport {
32 pub fn new(url: String, timeout: f64) -> Self {
38 Self {
39 url,
40 websocket: None,
41 connected: false,
42 timeout,
43 }
44 }
45
46 pub async fn connect(&mut self) -> Result<()> {
52 if self.connected {
53 tracing::warn!("Already connected to {}", self.url);
54 return Ok(());
55 }
56
57 debug!("Connecting to {}", self.url);
58
59 let _url = Url::parse(&self.url)
61 .map_err(|e| IFlowError::Connection(format!("Invalid URL: {}", e)))?;
62
63 let (ws_stream, _) =
65 tokio::time::timeout(Duration::from_secs_f64(self.timeout), connect_async(&self.url))
66 .await
67 .map_err(|_| IFlowError::Timeout("Connection timeout".to_string()))?
68 .map_err(|e| {
69 IFlowError::Connection(format!("WebSocket connection failed: {}", e))
70 })?;
71
72 self.websocket = Some(ws_stream);
73 self.connected = true;
74 debug!("Connected to {}", self.url);
75
76 Ok(())
77 }
78
79 pub async fn send(&mut self, message: &Value) -> Result<()> {
88 if !self.connected {
89 return Err(IFlowError::NotConnected);
90 }
91
92 let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
93
94 let data = serde_json::to_string(message).map_err(|e| IFlowError::JsonParse(e))?;
96
97 ws_stream
99 .send(Message::Text(data.clone().into()))
100 .await
101 .map_err(|e| IFlowError::Transport(format!("Failed to send message: {}", e)))?;
102
103 tracing::debug!(
104 "Sent message: {}", data
105 );
106
107 Ok(())
108 }
109
110 pub async fn send_raw(&mut self, message: &str) -> Result<()> {
119 if !self.connected {
120 return Err(IFlowError::NotConnected);
121 }
122
123 let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
124
125 ws_stream
127 .send(Message::Text(message.to_string().into()))
128 .await
129 .map_err(|e| IFlowError::Transport(format!("Failed to send message: {}", e)))?;
130
131 tracing::debug!(
132 "Sent raw message: {}",
133 message.to_string()
134 );
135
136 Ok(())
137 }
138
139 pub async fn receive(&mut self) -> Result<String> {
147 if !self.connected {
148 return Err(IFlowError::NotConnected);
149 }
150
151 let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
152
153 loop {
155 let msg = match ws_stream.next().await {
156 Some(Ok(msg)) => msg,
157 Some(Err(e)) => {
158 tracing::error!("WebSocket error: {}", e);
159 self.connected = false;
160 return Err(IFlowError::Transport(format!(
161 "Failed to receive message: {}",
162 e
163 )));
164 }
165 None => {
166 tracing::debug!("WebSocket connection closed");
167 self.connected = false;
168 return Err(IFlowError::Connection("Connection closed".to_string()));
169 }
170 };
171
172 match msg {
173 Message::Text(text) => {
174 let cleaned_text = text.trim_start_matches(|c: char| {
176 !c.is_ascii() || c.is_control() && c != '\n' && c != '\r' && c != '\t'
177 });
178 tracing::debug!(
179 "Received message: {}", cleaned_text.to_string()
180 );
181 return Ok(cleaned_text.to_string());
182 }
183 Message::Binary(data) => {
184 match String::from_utf8(data.to_vec()) {
186 Ok(text) => return Ok(text),
187 Err(_) => {
188 tracing::debug!("Received binary message, ignoring");
189 continue;
190 }
191 }
192 }
193 Message::Ping(data) => {
194 tracing::debug!("Received ping, sending pong");
196 if let Err(e) = ws_stream.send(Message::Pong(data)).await {
197 tracing::error!("Failed to send pong: {}", e);
198 self.connected = false;
199 return Err(IFlowError::Transport(format!("Failed to send pong: {}", e)));
200 }
201 continue;
202 }
203 Message::Pong(_) => {
204 tracing::debug!("Received pong");
205 continue;
206 }
207 Message::Close(close_frame) => {
208 tracing::debug!("Received close frame: {:?}", close_frame);
209 self.connected = false;
210 return Err(IFlowError::Connection(
211 "Connection closed by server".to_string(),
212 ));
213 }
214 Message::Frame(_) => {
215 tracing::debug!("Received raw frame, ignoring");
216 continue;
217 }
218 }
219 }
220 }
221
222 pub async fn close(&mut self) -> Result<()> {
224 if let Some(mut ws_stream) = self.websocket.take() {
225 ws_stream
226 .close(None)
227 .await
228 .map_err(|e| IFlowError::Transport(format!("Error closing WebSocket: {}", e)))?;
229 debug!("WebSocket connection closed");
230 }
231 self.connected = false;
232 Ok(())
233 }
234
235 pub fn is_connected(&self) -> bool {
240 self.connected
241 }
242
243 pub fn url(&self) -> &str {
248 &self.url
249 }
250}