feagi_io/protocol_implementations/websocket/websocket_std/
client_implementations.rs1use std::io::ErrorKind;
7use std::net::TcpStream;
8
9use tungstenite::{connect, Message, WebSocket};
10
11use crate::protocol_implementations::websocket::WebSocketUrl;
12use crate::traits_and_enums::client::{
13 FeagiClient, FeagiClientPusher, FeagiClientPusherProperties, FeagiClientRequester,
14 FeagiClientRequesterProperties, FeagiClientSubscriber, FeagiClientSubscriberProperties,
15};
16use crate::traits_and_enums::shared::{FeagiEndpointState, TransportProtocolEndpoint};
17use crate::FeagiNetworkError;
18
19type WsStream = WebSocket<tungstenite::stream::MaybeTlsStream<TcpStream>>;
21
22#[derive(Debug, Clone, PartialEq)]
30pub struct FeagiWebSocketClientSubscriberProperties {
31 server_address: WebSocketUrl,
32}
33
34impl FeagiWebSocketClientSubscriberProperties {
35 pub fn new(server_address: &str) -> Result<Self, FeagiNetworkError> {
41 let url = WebSocketUrl::new(server_address)?;
42 Ok(Self {
43 server_address: url,
44 })
45 }
46}
47
48impl FeagiClientSubscriberProperties for FeagiWebSocketClientSubscriberProperties {
49 fn as_boxed_client_subscriber(&self) -> Box<dyn FeagiClientSubscriber> {
50 Box::new(FeagiWebSocketClientSubscriber {
51 server_address: self.server_address.clone(),
52 current_state: FeagiEndpointState::Inactive,
53 socket: None,
54 receive_buffer: None,
55 has_data: false,
56 })
57 }
58
59 fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
60 TransportProtocolEndpoint::WebSocket(self.server_address.clone())
61 }
62}
63
64pub struct FeagiWebSocketClientSubscriber {
70 server_address: WebSocketUrl,
71 current_state: FeagiEndpointState,
72 socket: Option<WsStream>,
73 receive_buffer: Option<Vec<u8>>,
74 has_data: bool,
75}
76
77impl FeagiWebSocketClientSubscriber {
78 fn try_receive(&mut self) -> bool {
79 let socket = match &mut self.socket {
80 Some(s) => s,
81 None => return false,
82 };
83
84 match socket.read() {
85 Ok(Message::Binary(data)) => {
86 self.receive_buffer = Some(data);
87 true
88 }
89 Ok(Message::Text(text)) => {
90 self.receive_buffer = Some(text.into_bytes());
91 true
92 }
93 Ok(Message::Close(_)) => {
94 self.current_state = FeagiEndpointState::Errored(FeagiNetworkError::ReceiveFailed(
95 "Connection closed".to_string(),
96 ));
97 false
98 }
99 Ok(_) => false, Err(tungstenite::Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock => false,
101 Err(e) => {
102 self.current_state =
103 FeagiEndpointState::Errored(FeagiNetworkError::ReceiveFailed(e.to_string()));
104 false
105 }
106 }
107 }
108}
109
110impl FeagiClient for FeagiWebSocketClientSubscriber {
111 fn poll(&mut self) -> &FeagiEndpointState {
112 if matches!(self.current_state, FeagiEndpointState::ActiveWaiting)
113 && !self.has_data
114 && self.try_receive()
115 {
116 self.has_data = true;
117 self.current_state = FeagiEndpointState::ActiveHasData;
118 }
119 &self.current_state
120 }
121
122 fn request_connect(&mut self) -> Result<(), FeagiNetworkError> {
123 match &self.current_state {
124 FeagiEndpointState::Inactive => {
125 let (socket, _response) = connect(self.server_address.as_str())
126 .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
127
128 if let tungstenite::stream::MaybeTlsStream::Plain(ref stream) = socket.get_ref() {
130 stream
131 .set_nonblocking(true)
132 .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
133 }
134
135 self.socket = Some(socket);
136 self.current_state = FeagiEndpointState::ActiveWaiting;
137 Ok(())
138 }
139 _ => Err(FeagiNetworkError::InvalidSocketProperties(
140 "Cannot connect: client is not in Inactive state".to_string(),
141 )),
142 }
143 }
144
145 fn request_disconnect(&mut self) -> Result<(), FeagiNetworkError> {
146 match &self.current_state {
147 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
148 if let Some(mut socket) = self.socket.take() {
149 let _ = socket.close(None);
150 }
151 self.receive_buffer = None;
152 self.has_data = false;
153 self.current_state = FeagiEndpointState::Inactive;
154 Ok(())
155 }
156 _ => Err(FeagiNetworkError::InvalidSocketProperties(
157 "Cannot disconnect: client is not in Active state".to_string(),
158 )),
159 }
160 }
161
162 fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
163 match &self.current_state {
164 FeagiEndpointState::Errored(_) => {
165 if let Some(mut socket) = self.socket.take() {
166 let _ = socket.close(None);
167 }
168 self.receive_buffer = None;
169 self.has_data = false;
170 self.current_state = FeagiEndpointState::Inactive;
171 Ok(())
172 }
173 _ => Err(FeagiNetworkError::InvalidSocketProperties(
174 "Cannot confirm error: client is not in Errored state".to_string(),
175 )),
176 }
177 }
178
179 fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
180 TransportProtocolEndpoint::WebSocket(self.server_address.clone())
181 }
182}
183
184impl FeagiClientSubscriber for FeagiWebSocketClientSubscriber {
185 fn consume_retrieved_data(&mut self) -> Result<&[u8], FeagiNetworkError> {
186 match &self.current_state {
187 FeagiEndpointState::ActiveHasData => {
188 if self.has_data {
189 if let Some(ref data) = self.receive_buffer {
190 self.has_data = false;
191 self.current_state = FeagiEndpointState::ActiveWaiting;
192 Ok(data.as_slice())
193 } else {
194 Err(FeagiNetworkError::ReceiveFailed(
195 "No data in buffer".to_string(),
196 ))
197 }
198 } else {
199 Err(FeagiNetworkError::ReceiveFailed(
200 "No data available".to_string(),
201 ))
202 }
203 }
204 _ => Err(FeagiNetworkError::ReceiveFailed(
205 "Cannot consume: no data available".to_string(),
206 )),
207 }
208 }
209
210 fn as_boxed_subscriber_properties(&self) -> Box<dyn FeagiClientSubscriberProperties> {
211 Box::new(FeagiWebSocketClientSubscriberProperties {
212 server_address: self.server_address.clone(),
213 })
214 }
215}
216
217#[derive(Debug, Clone, PartialEq)]
227pub struct FeagiWebSocketClientPusherProperties {
228 server_address: WebSocketUrl,
229}
230
231impl FeagiWebSocketClientPusherProperties {
232 pub fn new(server_address: &str) -> Result<Self, FeagiNetworkError> {
234 let url = WebSocketUrl::new(server_address)?;
235 Ok(Self {
236 server_address: url,
237 })
238 }
239}
240
241impl FeagiClientPusherProperties for FeagiWebSocketClientPusherProperties {
242 fn as_boxed_client_pusher(&self) -> Box<dyn FeagiClientPusher> {
243 Box::new(FeagiWebSocketClientPusher {
244 server_address: self.server_address.clone(),
245 current_state: FeagiEndpointState::Inactive,
246 socket: None,
247 })
248 }
249
250 fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
251 TransportProtocolEndpoint::WebSocket(self.server_address.clone())
252 }
253}
254
255pub struct FeagiWebSocketClientPusher {
261 server_address: WebSocketUrl,
262 current_state: FeagiEndpointState,
263 socket: Option<WsStream>,
264}
265
266impl FeagiClient for FeagiWebSocketClientPusher {
267 fn poll(&mut self) -> &FeagiEndpointState {
268 &self.current_state
270 }
271
272 fn request_connect(&mut self) -> Result<(), FeagiNetworkError> {
273 match &self.current_state {
274 FeagiEndpointState::Inactive => {
275 let (socket, _response) = connect(self.server_address.as_str())
276 .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
277
278 if let tungstenite::stream::MaybeTlsStream::Plain(ref stream) = socket.get_ref() {
279 stream
280 .set_nonblocking(true)
281 .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
282 }
283
284 self.socket = Some(socket);
285 self.current_state = FeagiEndpointState::ActiveWaiting;
286 Ok(())
287 }
288 _ => Err(FeagiNetworkError::InvalidSocketProperties(
289 "Cannot connect: client is not in Inactive state".to_string(),
290 )),
291 }
292 }
293
294 fn request_disconnect(&mut self) -> Result<(), FeagiNetworkError> {
295 match &self.current_state {
296 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
297 if let Some(mut socket) = self.socket.take() {
298 let _ = socket.close(None);
299 }
300 self.current_state = FeagiEndpointState::Inactive;
301 Ok(())
302 }
303 _ => Err(FeagiNetworkError::InvalidSocketProperties(
304 "Cannot disconnect: client is not in Active state".to_string(),
305 )),
306 }
307 }
308
309 fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
310 match &self.current_state {
311 FeagiEndpointState::Errored(_) => {
312 if let Some(mut socket) = self.socket.take() {
313 let _ = socket.close(None);
314 }
315 self.current_state = FeagiEndpointState::Inactive;
316 Ok(())
317 }
318 _ => Err(FeagiNetworkError::InvalidSocketProperties(
319 "Cannot confirm error: client is not in Errored state".to_string(),
320 )),
321 }
322 }
323
324 fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
325 TransportProtocolEndpoint::WebSocket(self.server_address.clone())
326 }
327}
328
329impl FeagiClientPusher for FeagiWebSocketClientPusher {
330 fn publish_data(&mut self, data: &[u8]) -> Result<(), FeagiNetworkError> {
331 match &self.current_state {
332 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
333 let socket = self
334 .socket
335 .as_mut()
336 .ok_or_else(|| FeagiNetworkError::SendFailed("Not connected".to_string()))?;
337
338 let message = Message::Binary(data.to_vec());
339 socket
340 .send(message)
341 .map_err(|e| FeagiNetworkError::SendFailed(e.to_string()))?;
342 socket
343 .flush()
344 .map_err(|e| FeagiNetworkError::SendFailed(e.to_string()))?;
345
346 Ok(())
347 }
348 _ => Err(FeagiNetworkError::SendFailed(
349 "Cannot publish: client is not in Active state".to_string(),
350 )),
351 }
352 }
353
354 fn as_boxed_pusher_properties(&self) -> Box<dyn FeagiClientPusherProperties> {
355 Box::new(FeagiWebSocketClientPusherProperties {
356 server_address: self.server_address.clone(),
357 })
358 }
359}
360
361#[derive(Debug, Clone, PartialEq)]
371pub struct FeagiWebSocketClientRequesterProperties {
372 server_address: WebSocketUrl,
373}
374
375impl FeagiWebSocketClientRequesterProperties {
376 pub fn new(server_address: &str) -> Result<Self, FeagiNetworkError> {
378 let url = WebSocketUrl::new(server_address)?;
379 Ok(Self {
380 server_address: url,
381 })
382 }
383}
384
385impl FeagiClientRequesterProperties for FeagiWebSocketClientRequesterProperties {
386 fn as_boxed_client_requester(&self) -> Box<dyn FeagiClientRequester> {
387 Box::new(FeagiWebSocketClientRequester {
388 server_address: self.server_address.clone(),
389 current_state: FeagiEndpointState::Inactive,
390 socket: None,
391 receive_buffer: None,
392 has_data: false,
393 })
394 }
395
396 fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
397 TransportProtocolEndpoint::WebSocket(self.server_address.clone())
398 }
399}
400
401pub struct FeagiWebSocketClientRequester {
407 server_address: WebSocketUrl,
408 current_state: FeagiEndpointState,
409 socket: Option<WsStream>,
410 receive_buffer: Option<Vec<u8>>,
411 has_data: bool,
412}
413
414impl FeagiWebSocketClientRequester {
415 fn try_receive(&mut self) -> bool {
416 let socket = match &mut self.socket {
417 Some(s) => s,
418 None => return false,
419 };
420
421 match socket.read() {
422 Ok(Message::Binary(data)) => {
423 self.receive_buffer = Some(data);
424 true
425 }
426 Ok(Message::Text(text)) => {
427 self.receive_buffer = Some(text.into_bytes());
428 true
429 }
430 Ok(Message::Close(_)) => {
431 self.current_state = FeagiEndpointState::Errored(FeagiNetworkError::ReceiveFailed(
432 "Connection closed".to_string(),
433 ));
434 false
435 }
436 Ok(_) => false,
437 Err(tungstenite::Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock => false,
438 Err(e) => {
439 self.current_state =
440 FeagiEndpointState::Errored(FeagiNetworkError::ReceiveFailed(e.to_string()));
441 false
442 }
443 }
444 }
445}
446
447impl FeagiClient for FeagiWebSocketClientRequester {
448 fn poll(&mut self) -> &FeagiEndpointState {
449 if matches!(self.current_state, FeagiEndpointState::ActiveWaiting)
450 && !self.has_data
451 && self.try_receive()
452 {
453 self.has_data = true;
454 self.current_state = FeagiEndpointState::ActiveHasData;
455 }
456 &self.current_state
457 }
458
459 fn request_connect(&mut self) -> Result<(), FeagiNetworkError> {
460 match &self.current_state {
461 FeagiEndpointState::Inactive => {
462 let (socket, _response) = connect(self.server_address.as_str())
463 .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
464
465 if let tungstenite::stream::MaybeTlsStream::Plain(ref stream) = socket.get_ref() {
466 stream
467 .set_nonblocking(true)
468 .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
469 }
470
471 self.socket = Some(socket);
472 self.current_state = FeagiEndpointState::ActiveWaiting;
473 Ok(())
474 }
475 _ => Err(FeagiNetworkError::InvalidSocketProperties(
476 "Cannot connect: client is not in Inactive state".to_string(),
477 )),
478 }
479 }
480
481 fn request_disconnect(&mut self) -> Result<(), FeagiNetworkError> {
482 match &self.current_state {
483 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
484 if let Some(mut socket) = self.socket.take() {
485 let _ = socket.close(None);
486 }
487 self.receive_buffer = None;
488 self.has_data = false;
489 self.current_state = FeagiEndpointState::Inactive;
490 Ok(())
491 }
492 _ => Err(FeagiNetworkError::InvalidSocketProperties(
493 "Cannot disconnect: client is not in Active state".to_string(),
494 )),
495 }
496 }
497
498 fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
499 match &self.current_state {
500 FeagiEndpointState::Errored(_) => {
501 if let Some(mut socket) = self.socket.take() {
502 let _ = socket.close(None);
503 }
504 self.receive_buffer = None;
505 self.has_data = false;
506 self.current_state = FeagiEndpointState::Inactive;
507 Ok(())
508 }
509 _ => Err(FeagiNetworkError::InvalidSocketProperties(
510 "Cannot confirm error: client is not in Errored state".to_string(),
511 )),
512 }
513 }
514
515 fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
516 TransportProtocolEndpoint::WebSocket(self.server_address.clone())
517 }
518}
519
520impl FeagiClientRequester for FeagiWebSocketClientRequester {
521 fn publish_request(&mut self, request: &[u8]) -> Result<(), FeagiNetworkError> {
522 match &self.current_state {
523 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
524 let socket = self
525 .socket
526 .as_mut()
527 .ok_or_else(|| FeagiNetworkError::SendFailed("Not connected".to_string()))?;
528
529 let message = Message::Binary(request.to_vec());
530 socket
531 .send(message)
532 .map_err(|e| FeagiNetworkError::SendFailed(e.to_string()))?;
533 socket
534 .flush()
535 .map_err(|e| FeagiNetworkError::SendFailed(e.to_string()))?;
536
537 Ok(())
538 }
539 _ => Err(FeagiNetworkError::SendFailed(
540 "Cannot send request: client is not in Active state".to_string(),
541 )),
542 }
543 }
544
545 fn consume_retrieved_response(&mut self) -> Result<&[u8], FeagiNetworkError> {
546 match &self.current_state {
547 FeagiEndpointState::ActiveHasData => {
548 if self.has_data {
549 if let Some(ref data) = self.receive_buffer {
550 self.has_data = false;
551 self.current_state = FeagiEndpointState::ActiveWaiting;
552 Ok(data.as_slice())
553 } else {
554 Err(FeagiNetworkError::ReceiveFailed(
555 "No data in buffer".to_string(),
556 ))
557 }
558 } else {
559 Err(FeagiNetworkError::ReceiveFailed(
560 "No response available".to_string(),
561 ))
562 }
563 }
564 _ => Err(FeagiNetworkError::ReceiveFailed(
565 "Cannot consume: no response available".to_string(),
566 )),
567 }
568 }
569
570 fn as_boxed_requester_properties(&self) -> Box<dyn FeagiClientRequesterProperties> {
571 Box::new(FeagiWebSocketClientRequesterProperties {
572 server_address: self.server_address.clone(),
573 })
574 }
575}
576
577