Skip to main content

feagi_io/protocol_implementations/websocket/websocket_std/
client_implementations.rs

1//! WebSocket client implementations using the poll-based trait design.
2//!
3//! Uses `tungstenite` with non-blocking `std::net::TcpStream` for poll-based
4//! WebSocket communication that works with any async runtime or synchronously.
5
6use std::io::ErrorKind;
7use std::net::TcpStream;
8
9use tungstenite::{connect, Message, WebSocket};
10
11use crate::protocol_implementations::websocket::WebSocketUrl;
12use crate::traits_and_enums::client::{
13    FeagiClient, FeagiClientPusher, FeagiClientPusherProperties, FeagiClientRequester,
14    FeagiClientRequesterProperties, FeagiClientSubscriber, FeagiClientSubscriberProperties,
15};
16use crate::traits_and_enums::shared::{FeagiEndpointState, TransportProtocolEndpoint};
17use crate::FeagiNetworkError;
18
19/// Type alias for WebSocket over TcpStream
20type WsStream = WebSocket<tungstenite::stream::MaybeTlsStream<TcpStream>>;
21
22// ============================================================================
23// Subscriber
24// ============================================================================
25
26//region Subscriber Properties
27
28/// Configuration properties for creating a WebSocket subscriber client.
29#[derive(Debug, Clone, PartialEq)]
30pub struct FeagiWebSocketClientSubscriberProperties {
31    server_address: WebSocketUrl,
32}
33
34impl FeagiWebSocketClientSubscriberProperties {
35    /// Creates new subscriber properties with the given server address.
36    ///
37    /// # Arguments
38    ///
39    /// * `server_address` - The WebSocket URL (e.g., "ws://localhost:8080" or "localhost:8080").
40    pub fn new(server_address: &str) -> Result<Self, FeagiNetworkError> {
41        let url = WebSocketUrl::new(server_address)?;
42        Ok(Self {
43            server_address: url,
44        })
45    }
46}
47
48impl FeagiClientSubscriberProperties for FeagiWebSocketClientSubscriberProperties {
49    fn as_boxed_client_subscriber(&self) -> Box<dyn FeagiClientSubscriber> {
50        Box::new(FeagiWebSocketClientSubscriber {
51            server_address: self.server_address.clone(),
52            current_state: FeagiEndpointState::Inactive,
53            socket: None,
54            receive_buffer: None,
55            has_data: false,
56        })
57    }
58
59    fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
60        TransportProtocolEndpoint::WebSocket(self.server_address.clone())
61    }
62}
63
64//endregion
65
66//region Subscriber Implementation
67
68/// A WebSocket client that subscribes to data from a publisher server.
69pub struct FeagiWebSocketClientSubscriber {
70    server_address: WebSocketUrl,
71    current_state: FeagiEndpointState,
72    socket: Option<WsStream>,
73    receive_buffer: Option<Vec<u8>>,
74    has_data: bool,
75}
76
77impl FeagiWebSocketClientSubscriber {
78    fn try_receive(&mut self) -> bool {
79        let socket = match &mut self.socket {
80            Some(s) => s,
81            None => return false,
82        };
83
84        match socket.read() {
85            Ok(Message::Binary(data)) => {
86                self.receive_buffer = Some(data);
87                true
88            }
89            Ok(Message::Text(text)) => {
90                self.receive_buffer = Some(text.into_bytes());
91                true
92            }
93            Ok(Message::Close(_)) => {
94                self.current_state = FeagiEndpointState::Errored(FeagiNetworkError::ReceiveFailed(
95                    "Connection closed".to_string(),
96                ));
97                false
98            }
99            Ok(_) => false, // Ping/Pong
100            Err(tungstenite::Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock => false,
101            Err(e) => {
102                self.current_state =
103                    FeagiEndpointState::Errored(FeagiNetworkError::ReceiveFailed(e.to_string()));
104                false
105            }
106        }
107    }
108}
109
110impl FeagiClient for FeagiWebSocketClientSubscriber {
111    fn poll(&mut self) -> &FeagiEndpointState {
112        if matches!(self.current_state, FeagiEndpointState::ActiveWaiting)
113            && !self.has_data
114            && self.try_receive()
115        {
116            self.has_data = true;
117            self.current_state = FeagiEndpointState::ActiveHasData;
118        }
119        &self.current_state
120    }
121
122    fn request_connect(&mut self) -> Result<(), FeagiNetworkError> {
123        match &self.current_state {
124            FeagiEndpointState::Inactive => {
125                let (socket, _response) = connect(self.server_address.as_str())
126                    .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
127
128                // Set underlying stream to non-blocking
129                if let tungstenite::stream::MaybeTlsStream::Plain(ref stream) = socket.get_ref() {
130                    stream
131                        .set_nonblocking(true)
132                        .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
133                }
134
135                self.socket = Some(socket);
136                self.current_state = FeagiEndpointState::ActiveWaiting;
137                Ok(())
138            }
139            _ => Err(FeagiNetworkError::InvalidSocketProperties(
140                "Cannot connect: client is not in Inactive state".to_string(),
141            )),
142        }
143    }
144
145    fn request_disconnect(&mut self) -> Result<(), FeagiNetworkError> {
146        match &self.current_state {
147            FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
148                if let Some(mut socket) = self.socket.take() {
149                    let _ = socket.close(None);
150                }
151                self.receive_buffer = None;
152                self.has_data = false;
153                self.current_state = FeagiEndpointState::Inactive;
154                Ok(())
155            }
156            _ => Err(FeagiNetworkError::InvalidSocketProperties(
157                "Cannot disconnect: client is not in Active state".to_string(),
158            )),
159        }
160    }
161
162    fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
163        match &self.current_state {
164            FeagiEndpointState::Errored(_) => {
165                if let Some(mut socket) = self.socket.take() {
166                    let _ = socket.close(None);
167                }
168                self.receive_buffer = None;
169                self.has_data = false;
170                self.current_state = FeagiEndpointState::Inactive;
171                Ok(())
172            }
173            _ => Err(FeagiNetworkError::InvalidSocketProperties(
174                "Cannot confirm error: client is not in Errored state".to_string(),
175            )),
176        }
177    }
178
179    fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
180        TransportProtocolEndpoint::WebSocket(self.server_address.clone())
181    }
182}
183
184impl FeagiClientSubscriber for FeagiWebSocketClientSubscriber {
185    fn consume_retrieved_data(&mut self) -> Result<&[u8], FeagiNetworkError> {
186        match &self.current_state {
187            FeagiEndpointState::ActiveHasData => {
188                if self.has_data {
189                    if let Some(ref data) = self.receive_buffer {
190                        self.has_data = false;
191                        self.current_state = FeagiEndpointState::ActiveWaiting;
192                        Ok(data.as_slice())
193                    } else {
194                        Err(FeagiNetworkError::ReceiveFailed(
195                            "No data in buffer".to_string(),
196                        ))
197                    }
198                } else {
199                    Err(FeagiNetworkError::ReceiveFailed(
200                        "No data available".to_string(),
201                    ))
202                }
203            }
204            _ => Err(FeagiNetworkError::ReceiveFailed(
205                "Cannot consume: no data available".to_string(),
206            )),
207        }
208    }
209
210    fn as_boxed_subscriber_properties(&self) -> Box<dyn FeagiClientSubscriberProperties> {
211        Box::new(FeagiWebSocketClientSubscriberProperties {
212            server_address: self.server_address.clone(),
213        })
214    }
215}
216
217//endregion
218
219// ============================================================================
220// Pusher
221// ============================================================================
222
223//region Pusher Properties
224
225/// Configuration properties for creating a WebSocket pusher client.
226#[derive(Debug, Clone, PartialEq)]
227pub struct FeagiWebSocketClientPusherProperties {
228    server_address: WebSocketUrl,
229}
230
231impl FeagiWebSocketClientPusherProperties {
232    /// Creates new pusher properties with the given server address.
233    pub fn new(server_address: &str) -> Result<Self, FeagiNetworkError> {
234        let url = WebSocketUrl::new(server_address)?;
235        Ok(Self {
236            server_address: url,
237        })
238    }
239}
240
241impl FeagiClientPusherProperties for FeagiWebSocketClientPusherProperties {
242    fn as_boxed_client_pusher(&self) -> Box<dyn FeagiClientPusher> {
243        Box::new(FeagiWebSocketClientPusher {
244            server_address: self.server_address.clone(),
245            current_state: FeagiEndpointState::Inactive,
246            socket: None,
247        })
248    }
249
250    fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
251        TransportProtocolEndpoint::WebSocket(self.server_address.clone())
252    }
253}
254
255//endregion
256
257//region Pusher Implementation
258
259/// A WebSocket client that pushes data to a server.
260pub struct FeagiWebSocketClientPusher {
261    server_address: WebSocketUrl,
262    current_state: FeagiEndpointState,
263    socket: Option<WsStream>,
264}
265
266impl FeagiClient for FeagiWebSocketClientPusher {
267    fn poll(&mut self) -> &FeagiEndpointState {
268        // Pusher doesn't receive data, just return current state
269        &self.current_state
270    }
271
272    fn request_connect(&mut self) -> Result<(), FeagiNetworkError> {
273        match &self.current_state {
274            FeagiEndpointState::Inactive => {
275                let (socket, _response) = connect(self.server_address.as_str())
276                    .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
277
278                if let tungstenite::stream::MaybeTlsStream::Plain(ref stream) = socket.get_ref() {
279                    stream
280                        .set_nonblocking(true)
281                        .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
282                }
283
284                self.socket = Some(socket);
285                self.current_state = FeagiEndpointState::ActiveWaiting;
286                Ok(())
287            }
288            _ => Err(FeagiNetworkError::InvalidSocketProperties(
289                "Cannot connect: client is not in Inactive state".to_string(),
290            )),
291        }
292    }
293
294    fn request_disconnect(&mut self) -> Result<(), FeagiNetworkError> {
295        match &self.current_state {
296            FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
297                if let Some(mut socket) = self.socket.take() {
298                    let _ = socket.close(None);
299                }
300                self.current_state = FeagiEndpointState::Inactive;
301                Ok(())
302            }
303            _ => Err(FeagiNetworkError::InvalidSocketProperties(
304                "Cannot disconnect: client is not in Active state".to_string(),
305            )),
306        }
307    }
308
309    fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
310        match &self.current_state {
311            FeagiEndpointState::Errored(_) => {
312                if let Some(mut socket) = self.socket.take() {
313                    let _ = socket.close(None);
314                }
315                self.current_state = FeagiEndpointState::Inactive;
316                Ok(())
317            }
318            _ => Err(FeagiNetworkError::InvalidSocketProperties(
319                "Cannot confirm error: client is not in Errored state".to_string(),
320            )),
321        }
322    }
323
324    fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
325        TransportProtocolEndpoint::WebSocket(self.server_address.clone())
326    }
327}
328
329impl FeagiClientPusher for FeagiWebSocketClientPusher {
330    fn publish_data(&mut self, data: &[u8]) -> Result<(), FeagiNetworkError> {
331        match &self.current_state {
332            FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
333                let socket = self
334                    .socket
335                    .as_mut()
336                    .ok_or_else(|| FeagiNetworkError::SendFailed("Not connected".to_string()))?;
337
338                let message = Message::Binary(data.to_vec());
339                socket
340                    .send(message)
341                    .map_err(|e| FeagiNetworkError::SendFailed(e.to_string()))?;
342                socket
343                    .flush()
344                    .map_err(|e| FeagiNetworkError::SendFailed(e.to_string()))?;
345
346                Ok(())
347            }
348            _ => Err(FeagiNetworkError::SendFailed(
349                "Cannot publish: client is not in Active state".to_string(),
350            )),
351        }
352    }
353
354    fn as_boxed_pusher_properties(&self) -> Box<dyn FeagiClientPusherProperties> {
355        Box::new(FeagiWebSocketClientPusherProperties {
356            server_address: self.server_address.clone(),
357        })
358    }
359}
360
361//endregion
362
363// ============================================================================
364// Requester
365// ============================================================================
366
367//region Requester Properties
368
369/// Configuration properties for creating a WebSocket requester client.
370#[derive(Debug, Clone, PartialEq)]
371pub struct FeagiWebSocketClientRequesterProperties {
372    server_address: WebSocketUrl,
373}
374
375impl FeagiWebSocketClientRequesterProperties {
376    /// Creates new requester properties with the given server address.
377    pub fn new(server_address: &str) -> Result<Self, FeagiNetworkError> {
378        let url = WebSocketUrl::new(server_address)?;
379        Ok(Self {
380            server_address: url,
381        })
382    }
383}
384
385impl FeagiClientRequesterProperties for FeagiWebSocketClientRequesterProperties {
386    fn as_boxed_client_requester(&self) -> Box<dyn FeagiClientRequester> {
387        Box::new(FeagiWebSocketClientRequester {
388            server_address: self.server_address.clone(),
389            current_state: FeagiEndpointState::Inactive,
390            socket: None,
391            receive_buffer: None,
392            has_data: false,
393        })
394    }
395
396    fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
397        TransportProtocolEndpoint::WebSocket(self.server_address.clone())
398    }
399}
400
401//endregion
402
403//region Requester Implementation
404
405/// A WebSocket client that sends requests and receives responses.
406pub struct FeagiWebSocketClientRequester {
407    server_address: WebSocketUrl,
408    current_state: FeagiEndpointState,
409    socket: Option<WsStream>,
410    receive_buffer: Option<Vec<u8>>,
411    has_data: bool,
412}
413
414impl FeagiWebSocketClientRequester {
415    fn try_receive(&mut self) -> bool {
416        let socket = match &mut self.socket {
417            Some(s) => s,
418            None => return false,
419        };
420
421        match socket.read() {
422            Ok(Message::Binary(data)) => {
423                self.receive_buffer = Some(data);
424                true
425            }
426            Ok(Message::Text(text)) => {
427                self.receive_buffer = Some(text.into_bytes());
428                true
429            }
430            Ok(Message::Close(_)) => {
431                self.current_state = FeagiEndpointState::Errored(FeagiNetworkError::ReceiveFailed(
432                    "Connection closed".to_string(),
433                ));
434                false
435            }
436            Ok(_) => false,
437            Err(tungstenite::Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock => false,
438            Err(e) => {
439                self.current_state =
440                    FeagiEndpointState::Errored(FeagiNetworkError::ReceiveFailed(e.to_string()));
441                false
442            }
443        }
444    }
445}
446
447impl FeagiClient for FeagiWebSocketClientRequester {
448    fn poll(&mut self) -> &FeagiEndpointState {
449        if matches!(self.current_state, FeagiEndpointState::ActiveWaiting)
450            && !self.has_data
451            && self.try_receive()
452        {
453            self.has_data = true;
454            self.current_state = FeagiEndpointState::ActiveHasData;
455        }
456        &self.current_state
457    }
458
459    fn request_connect(&mut self) -> Result<(), FeagiNetworkError> {
460        match &self.current_state {
461            FeagiEndpointState::Inactive => {
462                let (socket, _response) = connect(self.server_address.as_str())
463                    .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
464
465                if let tungstenite::stream::MaybeTlsStream::Plain(ref stream) = socket.get_ref() {
466                    stream
467                        .set_nonblocking(true)
468                        .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
469                }
470
471                self.socket = Some(socket);
472                self.current_state = FeagiEndpointState::ActiveWaiting;
473                Ok(())
474            }
475            _ => Err(FeagiNetworkError::InvalidSocketProperties(
476                "Cannot connect: client is not in Inactive state".to_string(),
477            )),
478        }
479    }
480
481    fn request_disconnect(&mut self) -> Result<(), FeagiNetworkError> {
482        match &self.current_state {
483            FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
484                if let Some(mut socket) = self.socket.take() {
485                    let _ = socket.close(None);
486                }
487                self.receive_buffer = None;
488                self.has_data = false;
489                self.current_state = FeagiEndpointState::Inactive;
490                Ok(())
491            }
492            _ => Err(FeagiNetworkError::InvalidSocketProperties(
493                "Cannot disconnect: client is not in Active state".to_string(),
494            )),
495        }
496    }
497
498    fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
499        match &self.current_state {
500            FeagiEndpointState::Errored(_) => {
501                if let Some(mut socket) = self.socket.take() {
502                    let _ = socket.close(None);
503                }
504                self.receive_buffer = None;
505                self.has_data = false;
506                self.current_state = FeagiEndpointState::Inactive;
507                Ok(())
508            }
509            _ => Err(FeagiNetworkError::InvalidSocketProperties(
510                "Cannot confirm error: client is not in Errored state".to_string(),
511            )),
512        }
513    }
514
515    fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
516        TransportProtocolEndpoint::WebSocket(self.server_address.clone())
517    }
518}
519
520impl FeagiClientRequester for FeagiWebSocketClientRequester {
521    fn publish_request(&mut self, request: &[u8]) -> Result<(), FeagiNetworkError> {
522        match &self.current_state {
523            FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
524                let socket = self
525                    .socket
526                    .as_mut()
527                    .ok_or_else(|| FeagiNetworkError::SendFailed("Not connected".to_string()))?;
528
529                let message = Message::Binary(request.to_vec());
530                socket
531                    .send(message)
532                    .map_err(|e| FeagiNetworkError::SendFailed(e.to_string()))?;
533                socket
534                    .flush()
535                    .map_err(|e| FeagiNetworkError::SendFailed(e.to_string()))?;
536
537                Ok(())
538            }
539            _ => Err(FeagiNetworkError::SendFailed(
540                "Cannot send request: client is not in Active state".to_string(),
541            )),
542        }
543    }
544
545    fn consume_retrieved_response(&mut self) -> Result<&[u8], FeagiNetworkError> {
546        match &self.current_state {
547            FeagiEndpointState::ActiveHasData => {
548                if self.has_data {
549                    if let Some(ref data) = self.receive_buffer {
550                        self.has_data = false;
551                        self.current_state = FeagiEndpointState::ActiveWaiting;
552                        Ok(data.as_slice())
553                    } else {
554                        Err(FeagiNetworkError::ReceiveFailed(
555                            "No data in buffer".to_string(),
556                        ))
557                    }
558                } else {
559                    Err(FeagiNetworkError::ReceiveFailed(
560                        "No response available".to_string(),
561                    ))
562                }
563            }
564            _ => Err(FeagiNetworkError::ReceiveFailed(
565                "Cannot consume: no response available".to_string(),
566            )),
567        }
568    }
569
570    fn as_boxed_requester_properties(&self) -> Box<dyn FeagiClientRequesterProperties> {
571        Box::new(FeagiWebSocketClientRequesterProperties {
572            server_address: self.server_address.clone(),
573        })
574    }
575}
576
577//endregion