iflow_cli_sdk_rust/
websocket_transport.rs1use crate::error::{IFlowError, Result};
8use futures::{SinkExt, StreamExt};
9use serde_json::Value;
10use std::time::Duration;
11use tokio_tungstenite::{WebSocketStream, connect_async, tungstenite::protocol::Message};
12use tracing::debug;
13use url::Url;
14
15pub struct WebSocketTransport {
21 url: String,
23 websocket: Option<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>,
25 connected: bool,
27 timeout: f64,
29}
30
31impl WebSocketTransport {
32 pub fn new(url: String, timeout: f64) -> Self {
38 Self {
39 url,
40 websocket: None,
41 connected: false,
42 timeout,
43 }
44 }
45
46 pub async fn connect(&mut self) -> Result<()> {
52 if self.connected {
53 tracing::warn!("Already connected to {}", self.url);
54 return Ok(());
55 }
56
57 debug!("Connecting to {}", self.url);
58
59 let url = Url::parse(&self.url)
61 .map_err(|e| IFlowError::Connection(format!("Invalid URL: {}", e)))?;
62
63 let (ws_stream, _) =
65 tokio::time::timeout(Duration::from_secs_f64(self.timeout), connect_async(url))
66 .await
67 .map_err(|_| IFlowError::Timeout("Connection timeout".to_string()))?
68 .map_err(|e| {
69 IFlowError::Connection(format!("WebSocket connection failed: {}", e))
70 })?;
71
72 self.websocket = Some(ws_stream);
73 self.connected = true;
74 debug!("Connected to {}", self.url);
75
76 Ok(())
77 }
78
79 pub async fn send(&mut self, message: &Value) -> Result<()> {
88 if !self.connected {
89 return Err(IFlowError::NotConnected);
90 }
91
92 let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
93
94 let data = serde_json::to_string(message).map_err(|e| IFlowError::JsonParse(e))?;
96
97 ws_stream
99 .send(Message::Text(data.clone()))
100 .await
101 .map_err(|e| IFlowError::Transport(format!("Failed to send message: {}", e)))?;
102
103 tracing::debug!(
104 "Sent message: {}",
105 if data.len() > 200 {
106 format!("{}...", &data[..200])
107 } else {
108 data
109 }
110 );
111
112 Ok(())
113 }
114
115 pub async fn send_raw(&mut self, message: &str) -> Result<()> {
124 if !self.connected {
125 return Err(IFlowError::NotConnected);
126 }
127
128 let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
129
130 ws_stream
132 .send(Message::Text(message.to_string()))
133 .await
134 .map_err(|e| IFlowError::Transport(format!("Failed to send message: {}", e)))?;
135
136 tracing::debug!(
137 "Sent raw message: {}",
138 if message.len() > 200 {
139 format!("{}...", &message[..200])
140 } else {
141 message.to_string()
142 }
143 );
144
145 Ok(())
146 }
147
148 pub async fn receive(&mut self) -> Result<String> {
156 if !self.connected {
157 return Err(IFlowError::NotConnected);
158 }
159
160 let ws_stream = self.websocket.as_mut().ok_or(IFlowError::NotConnected)?;
161
162 loop {
164 let msg = match ws_stream.next().await {
165 Some(Ok(msg)) => msg,
166 Some(Err(e)) => {
167 tracing::error!("WebSocket error: {}", e);
168 self.connected = false;
169 return Err(IFlowError::Transport(format!(
170 "Failed to receive message: {}",
171 e
172 )));
173 }
174 None => {
175 tracing::debug!("WebSocket connection closed");
176 self.connected = false;
177 return Err(IFlowError::Connection("Connection closed".to_string()));
178 }
179 };
180
181 match msg {
182 Message::Text(text) => {
183 let cleaned_text = text.trim_start_matches(|c: char| {
185 !c.is_ascii() || c.is_control() && c != '\n' && c != '\r' && c != '\t'
186 });
187 tracing::debug!(
188 "Received message: {}", cleaned_text.to_string()
189 );
190 return Ok(cleaned_text.to_string());
191 }
192 Message::Binary(data) => {
193 match String::from_utf8(data) {
195 Ok(text) => return Ok(text),
196 Err(_) => {
197 tracing::debug!("Received binary message, ignoring");
198 continue;
199 }
200 }
201 }
202 Message::Ping(data) => {
203 tracing::debug!("Received ping, sending pong");
205 if let Err(e) = ws_stream.send(Message::Pong(data)).await {
206 tracing::error!("Failed to send pong: {}", e);
207 self.connected = false;
208 return Err(IFlowError::Transport(format!("Failed to send pong: {}", e)));
209 }
210 continue;
211 }
212 Message::Pong(_) => {
213 tracing::debug!("Received pong");
214 continue;
215 }
216 Message::Close(close_frame) => {
217 tracing::debug!("Received close frame: {:?}", close_frame);
218 self.connected = false;
219 return Err(IFlowError::Connection(
220 "Connection closed by server".to_string(),
221 ));
222 }
223 Message::Frame(_) => {
224 tracing::debug!("Received raw frame, ignoring");
225 continue;
226 }
227 }
228 }
229 }
230
231 pub async fn close(&mut self) -> Result<()> {
233 if let Some(mut ws_stream) = self.websocket.take() {
234 ws_stream
235 .close(None)
236 .await
237 .map_err(|e| IFlowError::Transport(format!("Error closing WebSocket: {}", e)))?;
238 debug!("WebSocket connection closed");
239 }
240 self.connected = false;
241 Ok(())
242 }
243
244 pub fn is_connected(&self) -> bool {
249 self.connected
250 }
251
252 pub fn url(&self) -> &str {
257 &self.url
258 }
259}