iflow_cli_sdk_rust/
websocket_transport.rs1use crate::error::{IFlowError, Result};
8use futures::{SinkExt, StreamExt};
9use serde_json::Value;
10use std::time::Duration;
11use tokio_tungstenite::{WebSocketStream, connect_async, tungstenite::protocol::Message};
12use tracing::debug;
13use url::Url;
14
15pub struct WebSocketTransport {
21 url: String,
23 websocket: Option<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>,
25 connected: bool,
27 timeout: f64,
29}
30
31impl WebSocketTransport {
32 pub fn new(url: String, timeout: f64) -> Self {
38 Self {
39 url,
40 websocket: None,
41 connected: false,
42 timeout,
43 }
44 }
45
46 pub async fn connect(&mut self) -> Result<()> {
52 if self.connected {
53 tracing::warn!("Already connected to {}", self.url);
54 return Ok(());
55 }
56
57 debug!("Connecting to {}", self.url);
58
59 let url = Url::parse(&self.url)
61 .map_err(|e| IFlowError::Connection(format!("Invalid URL: {}", e)))?;
62
63 let (ws_stream, _) =
65 tokio::time::timeout(Duration::from_secs_f64(self.timeout), connect_async(url))
66 .await
67 .map_err(|_| IFlowError::Timeout("Connection timeout".to_string()))?
68 .map_err(|e| {
69 IFlowError::Connection(format!("WebSocket connection failed: {}", e))
70 })?;
71
72 self.websocket = Some(ws_stream);
73 self.connected = true;
74 debug!("Connected to {}", self.url);
75
76 Ok(())
77 }
78
79 pub async fn send(&mut self, message: &Value) -> Result<()> {
88 if !self.connected {
89 return Err(IFlowError::NotConnected);
90 }
91
92 let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
93
94 let data = serde_json::to_string(message).map_err(|e| IFlowError::JsonParse(e))?;
96
97 ws_stream
99 .send(Message::Text(data.clone()))
100 .await
101 .map_err(|e| IFlowError::Transport(format!("Failed to send message: {}", e)))?;
102
103 tracing::debug!(
104 "Sent message: {}",
105 if data.len() > 200 {
106 format!("{}...", &data[..200])
107 } else {
108 data
109 }
110 );
111
112 Ok(())
113 }
114
115 pub async fn send_raw(&mut self, message: &str) -> Result<()> {
124 if !self.connected {
125 return Err(IFlowError::NotConnected);
126 }
127
128 let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
129
130 ws_stream
132 .send(Message::Text(message.to_string()))
133 .await
134 .map_err(|e| IFlowError::Transport(format!("Failed to send message: {}", e)))?;
135
136 tracing::debug!(
137 "Sent raw message: {}",
138 if message.len() > 200 {
139 format!("{}...", &message[..200])
140 } else {
141 message.to_string()
142 }
143 );
144
145 Ok(())
146 }
147
148 pub async fn receive(&mut self) -> Result<String> {
156 if !self.connected {
157 return Err(IFlowError::NotConnected);
158 }
159
160 let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
161
162 loop {
164 let msg = match ws_stream.next().await {
165 Some(Ok(msg)) => msg,
166 Some(Err(e)) => {
167 tracing::error!("WebSocket error: {}", e);
168 self.connected = false;
169 return Err(IFlowError::Transport(format!(
170 "Failed to receive message: {}",
171 e
172 )));
173 }
174 None => {
175 tracing::debug!("WebSocket connection closed");
176 self.connected = false;
177 return Err(IFlowError::Connection("Connection closed".to_string()));
178 }
179 };
180
181 match msg {
182 Message::Text(text) => {
183 let cleaned_text = text.trim_start_matches(|c: char| {
185 !c.is_ascii() || c.is_control() && c != '\n' && c != '\r' && c != '\t'
186 });
187 tracing::debug!(
188 "Received message: {}",
189 if cleaned_text.len() > 1000 {
190 format!("{}...", &cleaned_text[..1000])
191 } else {
192 cleaned_text.to_string()
193 }
194 );
195 return Ok(cleaned_text.to_string());
196 }
197 Message::Binary(data) => {
198 match String::from_utf8(data) {
200 Ok(text) => return Ok(text),
201 Err(_) => {
202 tracing::debug!("Received binary message, ignoring");
203 continue;
204 }
205 }
206 }
207 Message::Ping(data) => {
208 tracing::debug!("Received ping, sending pong");
210 if let Err(e) = ws_stream.send(Message::Pong(data)).await {
211 tracing::error!("Failed to send pong: {}", e);
212 self.connected = false;
213 return Err(IFlowError::Transport(format!("Failed to send pong: {}", e)));
214 }
215 continue;
216 }
217 Message::Pong(_) => {
218 tracing::debug!("Received pong");
219 continue;
220 }
221 Message::Close(close_frame) => {
222 tracing::debug!("Received close frame: {:?}", close_frame);
223 self.connected = false;
224 return Err(IFlowError::Connection(
225 "Connection closed by server".to_string(),
226 ));
227 }
228 Message::Frame(_) => {
229 tracing::debug!("Received raw frame, ignoring");
230 continue;
231 }
232 }
233 }
234 }
235
236 pub async fn close(&mut self) -> Result<()> {
238 if let Some(mut ws_stream) = self.websocket.take() {
239 ws_stream
240 .close(None)
241 .await
242 .map_err(|e| IFlowError::Transport(format!("Error closing WebSocket: {}", e)))?;
243 debug!("WebSocket connection closed");
244 }
245 self.connected = false;
246 Ok(())
247 }
248
249 pub fn is_connected(&self) -> bool {
254 self.connected
255 }
256
257 pub fn url(&self) -> &str {
262 &self.url
263 }
264}