1use std::collections::HashMap;
7use std::io::ErrorKind;
8use std::net::{TcpListener, TcpStream};
9
10use tungstenite::handshake::server::{NoCallback, ServerHandshake};
11use tungstenite::handshake::HandshakeError;
12use tungstenite::handshake::MidHandshake;
13use tungstenite::{accept, Message, WebSocket};
14
15use crate::protocol_implementations::websocket::shared::WebSocketUrl;
16use crate::traits_and_enums::server::{
17 FeagiServer, FeagiServerPublisher, FeagiServerPublisherProperties, FeagiServerPuller,
18 FeagiServerPullerProperties, FeagiServerRouter, FeagiServerRouterProperties,
19};
20use crate::traits_and_enums::shared::{
21 FeagiEndpointState, TransportProtocolEndpoint, TransportProtocolImplementation,
22};
23use crate::{AgentID, FeagiNetworkError};
24use feagi_serialization::FeagiByteContainer;
25
/// A WebSocket connection layered directly over a plain `TcpStream`.
type WsStream = WebSocket<TcpStream>;
/// A server-side WebSocket handshake over a `TcpStream` that was interrupted
/// before completing and can be resumed later via `handshake()`.
type WsMidServerHandshake = MidHandshake<ServerHandshake<TcpStream, NoCallback>>;
29
/// Lifecycle of a single accepted client connection.
// NOTE(review): the reason for `allow(dead_code)` is not evident from this file —
// confirm it is still required.
#[allow(dead_code)]
enum HandshakeState {
    /// The WebSocket handshake was interrupted and must be resumed with
    /// `advance_nonblocking_handshake`.
    Handshaking(WsMidServerHandshake),
    /// The handshake completed; the socket can be used to send and receive messages.
    Ready(WsStream),
    /// The handshake failed. Also used as the temporary placeholder while
    /// `advance_nonblocking_handshake` owns the in-progress handshake.
    Failed,
}
40
41fn begin_nonblocking_handshake(stream: TcpStream) -> Option<HandshakeState> {
42 if stream.set_nonblocking(true).is_err() {
43 return None;
44 }
45 match accept(stream) {
46 Ok(ws) => Some(HandshakeState::Ready(ws)),
47 Err(HandshakeError::Interrupted(mid)) => Some(HandshakeState::Handshaking(mid)),
48 Err(HandshakeError::Failure(_)) => None,
49 }
50}
51
52fn advance_nonblocking_handshake(state: &mut HandshakeState) {
53 if matches!(state, HandshakeState::Handshaking(_)) {
54 let current = std::mem::replace(state, HandshakeState::Failed);
55 let HandshakeState::Handshaking(mid) = current else {
56 return;
57 };
58 match mid.handshake() {
59 Ok(ws) => *state = HandshakeState::Ready(ws),
60 Err(HandshakeError::Interrupted(next_mid)) => {
61 *state = HandshakeState::Handshaking(next_mid);
62 }
63 Err(HandshakeError::Failure(_)) => *state = HandshakeState::Failed,
64 }
65 }
66}
67
68fn close_handshake_state(state: HandshakeState) {
69 if let HandshakeState::Ready(mut ws) = state {
70 let _ = ws.close(None);
71 }
72}
73
/// Configuration from which a `FeagiWebSocketServerPublisher` is built.
#[derive(Debug, Clone, PartialEq)]
pub struct FeagiWebSocketServerPublisherProperties {
    /// Address reported by `get_bind_point` and copied into the publisher as its bind address.
    local_bind_address: WebSocketUrl,
    /// Address reported by `get_agent_endpoint`.
    remote_bind_address: WebSocketUrl,
}
86
87impl FeagiWebSocketServerPublisherProperties {
88 pub fn new(
90 local_bind_address: &str,
91 remote_bind_address: &str,
92 ) -> Result<Self, FeagiNetworkError> {
93 let local_bind_address = WebSocketUrl::new(local_bind_address)?;
94 let remote_bind_address = WebSocketUrl::new(remote_bind_address)?;
95 Ok(Self {
96 local_bind_address,
97 remote_bind_address,
98 })
99 }
100}
101
102impl FeagiServerPublisherProperties for FeagiWebSocketServerPublisherProperties {
103 fn as_boxed_server_publisher(&self) -> Box<dyn FeagiServerPublisher> {
104 Box::new(FeagiWebSocketServerPublisher {
105 local_bind_address: self.local_bind_address.clone(),
106 remote_bind_address: self.remote_bind_address.clone(),
107 current_state: FeagiEndpointState::Inactive,
108 listener: None,
109 clients: Vec::new(),
110 })
111 }
112
113 fn get_bind_point(&self) -> TransportProtocolEndpoint {
114 TransportProtocolEndpoint::WebSocket(self.local_bind_address.clone())
115 }
116
117 fn get_agent_endpoint(&self) -> TransportProtocolEndpoint {
118 TransportProtocolEndpoint::WebSocket(self.remote_bind_address.clone())
119 }
120
121 fn get_protocol(&self) -> TransportProtocolImplementation {
122 TransportProtocolImplementation::WebSocket
123 }
124}
125
/// WebSocket server that sends the same binary payload to every ready client.
pub struct FeagiWebSocketServerPublisher {
    /// Address whose `host_port()` the TCP listener binds to in `request_start`.
    local_bind_address: WebSocketUrl,
    /// Address reported by `get_agent_endpoint`.
    remote_bind_address: WebSocketUrl,
    /// Current endpoint state, returned by `poll`.
    current_state: FeagiEndpointState,
    /// Non-blocking listener; `None` while the server is not started.
    listener: Option<TcpListener>,
    /// Accepted clients, including those still completing their handshake.
    clients: Vec<HandshakeState>,
}
138
139impl FeagiWebSocketServerPublisher {
140 fn accept_pending_connections(&mut self) {
142 let listener = match &self.listener {
143 Some(l) => l,
144 None => return,
145 };
146
147 loop {
149 match listener.accept() {
150 Ok((stream, _addr)) => {
151 if let Some(state) = begin_nonblocking_handshake(stream) {
152 self.clients.push(state);
153 }
154 }
155 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
156 break;
158 }
159 Err(_) => {
160 break;
161 }
162 }
163 }
164 }
165
166 fn process_handshakes(&mut self) {
167 let mut failed_indices = Vec::new();
168 for (i, state) in self.clients.iter_mut().enumerate() {
169 advance_nonblocking_handshake(state);
170 if matches!(state, HandshakeState::Failed) {
171 failed_indices.push(i);
172 }
173 }
174 for i in failed_indices.into_iter().rev() {
175 let _ = self.clients.remove(i);
176 }
177 }
178
179 pub fn client_count(&self) -> usize {
181 self.clients
182 .iter()
183 .filter(|s| matches!(s, HandshakeState::Ready(_)))
184 .count()
185 }
186}
187
188impl FeagiServer for FeagiWebSocketServerPublisher {
189 fn poll(&mut self) -> &FeagiEndpointState {
190 if matches!(self.current_state, FeagiEndpointState::ActiveWaiting) {
191 self.accept_pending_connections();
192 self.process_handshakes();
193 }
194 &self.current_state
195 }
196
197 fn request_start(&mut self) -> Result<(), FeagiNetworkError> {
198 match &self.current_state {
199 FeagiEndpointState::Inactive => {
200 let listener = TcpListener::bind(self.local_bind_address.host_port())
201 .map_err(|e| FeagiNetworkError::CannotBind(e.to_string()))?;
202
203 listener
205 .set_nonblocking(true)
206 .map_err(|e| FeagiNetworkError::CannotBind(e.to_string()))?;
207
208 self.listener = Some(listener);
209 self.current_state = FeagiEndpointState::ActiveWaiting;
210 Ok(())
211 }
212 _ => Err(FeagiNetworkError::InvalidSocketProperties(
213 "Cannot start: server is not in Inactive state".to_string(),
214 )),
215 }
216 }
217
218 fn request_stop(&mut self) -> Result<(), FeagiNetworkError> {
219 match &self.current_state {
220 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
221 for state in self.clients.drain(..) {
223 if let HandshakeState::Ready(mut client) = state {
224 let _ = client.close(None);
225 }
226 }
227 self.listener = None;
228 self.current_state = FeagiEndpointState::Inactive;
229 Ok(())
230 }
231 _ => Err(FeagiNetworkError::InvalidSocketProperties(
232 "Cannot stop: server is not in Active state".to_string(),
233 )),
234 }
235 }
236
237 fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
238 match &self.current_state {
239 FeagiEndpointState::Errored(_) => {
240 for state in self.clients.drain(..) {
241 if let HandshakeState::Ready(mut client) = state {
242 let _ = client.close(None);
243 }
244 }
245 self.listener = None;
246 self.current_state = FeagiEndpointState::Inactive;
247 Ok(())
248 }
249 _ => Err(FeagiNetworkError::InvalidSocketProperties(
250 "Cannot confirm error: server is not in Errored state".to_string(),
251 )),
252 }
253 }
254
255 fn get_bind_point(&self) -> TransportProtocolEndpoint {
256 TransportProtocolEndpoint::WebSocket(self.local_bind_address.clone())
257 }
258
259 fn get_agent_endpoint(&self) -> TransportProtocolEndpoint {
260 TransportProtocolEndpoint::WebSocket(self.remote_bind_address.clone())
261 }
262
263 fn get_protocol(&self) -> TransportProtocolImplementation {
264 TransportProtocolImplementation::WebSocket
265 }
266}
267
268impl FeagiServerPublisher for FeagiWebSocketServerPublisher {
269 fn publish_data(&mut self, data: &[u8]) -> Result<(), FeagiNetworkError> {
270 match &self.current_state {
271 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
272 let message = Message::Binary(data.to_vec());
273
274 let mut failed_indices = Vec::new();
276 for (i, state) in self.clients.iter_mut().enumerate() {
277 let client = match state {
278 HandshakeState::Ready(ws) => ws,
279 HandshakeState::Handshaking(_) => continue,
280 HandshakeState::Failed => {
281 failed_indices.push(i);
282 continue;
283 }
284 };
285 match client.send(message.clone()) {
287 Ok(()) => {}
288 Err(tungstenite::Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock => {
289 }
291 Err(_) => {
292 failed_indices.push(i);
293 }
294 }
295 let _ = client.flush();
297 }
298
299 for i in failed_indices.into_iter().rev() {
301 close_handshake_state(self.clients.remove(i));
302 }
303
304 Ok(())
305 }
306 _ => Err(FeagiNetworkError::SendFailed(
307 "Cannot publish: server is not in Active state".to_string(),
308 )),
309 }
310 }
311
312 fn as_boxed_publisher_properties(&self) -> Box<dyn FeagiServerPublisherProperties> {
313 Box::new(FeagiWebSocketServerPublisherProperties {
314 local_bind_address: self.local_bind_address.clone(),
315 remote_bind_address: self.remote_bind_address.clone(),
316 })
317 }
318}
319
/// Configuration from which a `FeagiWebSocketServerPuller` is built.
#[derive(Debug, Clone, PartialEq)]
pub struct FeagiWebSocketServerPullerProperties {
    /// Address reported by `get_bind_point` and copied into the puller as its bind address.
    local_bind_address: WebSocketUrl,
    /// Address reported by `get_agent_endpoint`.
    remote_bind_address: WebSocketUrl,
}
334
335impl FeagiWebSocketServerPullerProperties {
336 pub fn new(local_bind_address: &str) -> Result<Self, FeagiNetworkError> {
340 Self::new_with_remote(local_bind_address, local_bind_address)
341 }
342
343 pub fn new_with_remote(
345 local_bind_address: &str,
346 remote_bind_address: &str,
347 ) -> Result<Self, FeagiNetworkError> {
348 let local_bind_address = WebSocketUrl::new(local_bind_address)?;
349 let remote_bind_address = WebSocketUrl::new(remote_bind_address)?;
350 Ok(Self {
351 local_bind_address,
352 remote_bind_address,
353 })
354 }
355}
356
357impl FeagiServerPullerProperties for FeagiWebSocketServerPullerProperties {
358 fn as_boxed_server_puller(&self) -> Box<dyn FeagiServerPuller> {
359 Box::new(FeagiWebSocketServerPuller {
360 local_bind_address: self.local_bind_address.clone(),
361 remote_bind_address: self.remote_bind_address.clone(),
362 current_state: FeagiEndpointState::Inactive,
363 listener: None,
364 clients: Vec::new(),
365 receive_buffer: None,
366 has_data: false,
367 })
368 }
369
370 fn get_bind_point(&self) -> TransportProtocolEndpoint {
371 TransportProtocolEndpoint::WebSocket(self.local_bind_address.clone())
372 }
373
374 fn get_agent_endpoint(&self) -> TransportProtocolEndpoint {
375 TransportProtocolEndpoint::WebSocket(self.remote_bind_address.clone())
376 }
377
378 fn get_protocol(&self) -> TransportProtocolImplementation {
379 TransportProtocolImplementation::WebSocket
380 }
381}
382
/// WebSocket server that receives one message at a time from any connected client.
pub struct FeagiWebSocketServerPuller {
    /// Address whose `host_port()` the TCP listener binds to in `request_start`.
    local_bind_address: WebSocketUrl,
    /// Address reported by `get_agent_endpoint`.
    remote_bind_address: WebSocketUrl,
    /// Current endpoint state, returned by `poll`.
    current_state: FeagiEndpointState,
    /// Non-blocking listener; `None` while the server is not started.
    listener: Option<TcpListener>,
    /// Accepted clients, including those still completing their handshake.
    clients: Vec<HandshakeState>,
    /// Bytes of the most recently received message, if any.
    receive_buffer: Option<Vec<u8>>,
    /// `true` from the moment `poll` receives a message until it is consumed.
    has_data: bool,
}
398
399impl FeagiWebSocketServerPuller {
400 fn accept_pending_connections(&mut self) {
401 let listener = match &self.listener {
402 Some(l) => l,
403 None => return,
404 };
405
406 loop {
407 match listener.accept() {
408 Ok((stream, _addr)) => {
409 if let Some(state) = begin_nonblocking_handshake(stream) {
410 self.clients.push(state);
411 }
412 }
413 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
414 Err(_) => break,
415 }
416 }
417 }
418
419 fn process_handshakes(&mut self) {
420 let mut failed_indices = Vec::new();
421 for (i, state) in self.clients.iter_mut().enumerate() {
422 advance_nonblocking_handshake(state);
423 if matches!(state, HandshakeState::Failed) {
424 failed_indices.push(i);
425 }
426 }
427 for i in failed_indices.into_iter().rev() {
428 let _ = self.clients.remove(i);
429 }
430 }
431
432 fn try_receive(&mut self) -> bool {
434 let mut failed_indices = Vec::new();
435
436 for (i, state) in self.clients.iter_mut().enumerate() {
437 let client = match state {
438 HandshakeState::Ready(ws) => ws,
439 HandshakeState::Handshaking(_) => continue,
440 HandshakeState::Failed => {
441 failed_indices.push(i);
442 continue;
443 }
444 };
445 match client.read() {
446 Ok(Message::Binary(data)) => {
447 self.receive_buffer = Some(data);
448 for idx in failed_indices.into_iter().rev() {
450 close_handshake_state(self.clients.remove(idx));
451 }
452 return true;
453 }
454 Ok(Message::Text(text)) => {
455 self.receive_buffer = Some(text.into_bytes());
456 for idx in failed_indices.into_iter().rev() {
457 close_handshake_state(self.clients.remove(idx));
458 }
459 return true;
460 }
461 Ok(Message::Close(_)) => {
462 failed_indices.push(i);
463 }
464 Ok(_) => {
465 }
467 Err(tungstenite::Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock => {
468 }
470 Err(_) => {
471 failed_indices.push(i);
472 }
473 }
474 }
475
476 for i in failed_indices.into_iter().rev() {
478 close_handshake_state(self.clients.remove(i));
479 }
480
481 false
482 }
483}
484
485impl FeagiServer for FeagiWebSocketServerPuller {
486 fn poll(&mut self) -> &FeagiEndpointState {
487 if matches!(self.current_state, FeagiEndpointState::ActiveWaiting) && !self.has_data {
488 self.accept_pending_connections();
489 self.process_handshakes();
490
491 if self.try_receive() {
492 self.has_data = true;
493 self.current_state = FeagiEndpointState::ActiveHasData;
494 }
495 }
496 &self.current_state
497 }
498
499 fn request_start(&mut self) -> Result<(), FeagiNetworkError> {
500 match &self.current_state {
501 FeagiEndpointState::Inactive => {
502 let listener = TcpListener::bind(self.local_bind_address.host_port())
503 .map_err(|e| FeagiNetworkError::CannotBind(e.to_string()))?;
504 listener
505 .set_nonblocking(true)
506 .map_err(|e| FeagiNetworkError::CannotBind(e.to_string()))?;
507
508 self.listener = Some(listener);
509 self.current_state = FeagiEndpointState::ActiveWaiting;
510 Ok(())
511 }
512 _ => Err(FeagiNetworkError::InvalidSocketProperties(
513 "Cannot start: server is not in Inactive state".to_string(),
514 )),
515 }
516 }
517
518 fn request_stop(&mut self) -> Result<(), FeagiNetworkError> {
519 match &self.current_state {
520 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
521 for state in self.clients.drain(..) {
522 if let HandshakeState::Ready(mut client) = state {
523 let _ = client.close(None);
524 }
525 }
526 self.listener = None;
527 self.receive_buffer = None;
528 self.has_data = false;
529 self.current_state = FeagiEndpointState::Inactive;
530 Ok(())
531 }
532 _ => Err(FeagiNetworkError::InvalidSocketProperties(
533 "Cannot stop: server is not in Active state".to_string(),
534 )),
535 }
536 }
537
538 fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
539 match &self.current_state {
540 FeagiEndpointState::Errored(_) => {
541 for state in self.clients.drain(..) {
542 if let HandshakeState::Ready(mut client) = state {
543 let _ = client.close(None);
544 }
545 }
546 self.listener = None;
547 self.receive_buffer = None;
548 self.has_data = false;
549 self.current_state = FeagiEndpointState::Inactive;
550 Ok(())
551 }
552 _ => Err(FeagiNetworkError::InvalidSocketProperties(
553 "Cannot confirm error: server is not in Errored state".to_string(),
554 )),
555 }
556 }
557
558 fn get_bind_point(&self) -> TransportProtocolEndpoint {
559 TransportProtocolEndpoint::WebSocket(self.local_bind_address.clone())
560 }
561
562 fn get_agent_endpoint(&self) -> TransportProtocolEndpoint {
563 TransportProtocolEndpoint::WebSocket(self.remote_bind_address.clone())
564 }
565
566 fn get_protocol(&self) -> TransportProtocolImplementation {
567 TransportProtocolImplementation::WebSocket
568 }
569}
570
571impl FeagiServerPuller for FeagiWebSocketServerPuller {
572 fn consume_retrieved_data(&mut self) -> Result<&[u8], FeagiNetworkError> {
573 match &self.current_state {
574 FeagiEndpointState::ActiveHasData => {
575 if self.has_data {
576 if let Some(ref data) = self.receive_buffer {
577 self.has_data = false;
578 self.current_state = FeagiEndpointState::ActiveWaiting;
579 Ok(data.as_slice())
580 } else {
581 Err(FeagiNetworkError::ReceiveFailed(
582 "No data in buffer".to_string(),
583 ))
584 }
585 } else {
586 Err(FeagiNetworkError::ReceiveFailed(
587 "No data available".to_string(),
588 ))
589 }
590 }
591 _ => Err(FeagiNetworkError::ReceiveFailed(
592 "Cannot consume: no data available".to_string(),
593 )),
594 }
595 }
596
597 fn as_boxed_puller_properties(&self) -> Box<dyn FeagiServerPullerProperties> {
598 Box::new(FeagiWebSocketServerPullerProperties {
599 local_bind_address: self.local_bind_address.clone(),
600 remote_bind_address: self.remote_bind_address.clone(),
601 })
602 }
603}
604
/// Configuration from which a `FeagiWebSocketServerRouter` is built.
#[derive(Debug, Clone, PartialEq)]
pub struct FeagiWebSocketServerRouterProperties {
    /// Address reported by `get_bind_point` and copied into the router as its bind address.
    local_bind_address: WebSocketUrl,
    /// Address reported by `get_agent_endpoint`.
    remote_bind_address: WebSocketUrl,
}
619
620impl FeagiWebSocketServerRouterProperties {
621 pub fn new(local_bind_address: &str) -> Result<Self, FeagiNetworkError> {
625 Self::new_with_remote(local_bind_address, local_bind_address)
626 }
627
628 pub fn new_with_remote(
630 local_bind_address: &str,
631 remote_bind_address: &str,
632 ) -> Result<Self, FeagiNetworkError> {
633 let local_bind_address = WebSocketUrl::new(local_bind_address)?;
634 let remote_bind_address = WebSocketUrl::new(remote_bind_address)?;
635 Ok(Self {
636 local_bind_address,
637 remote_bind_address,
638 })
639 }
640}
641
642impl FeagiServerRouterProperties for FeagiWebSocketServerRouterProperties {
643 fn as_boxed_server_router(&self) -> Box<dyn FeagiServerRouter> {
644 Box::new(FeagiWebSocketServerRouter {
645 local_bind_address: self.local_bind_address.clone(),
646 remote_bind_address: self.remote_bind_address.clone(),
647 current_state: FeagiEndpointState::Inactive,
648 listener: None,
649 clients: Vec::new(),
650 receive_buffer: None,
651 current_session: None,
652 has_data: false,
653 index_to_session: HashMap::new(),
654 session_to_index: HashMap::new(),
655 })
656 }
657
658 fn get_bind_point(&self) -> TransportProtocolEndpoint {
659 TransportProtocolEndpoint::WebSocket(self.local_bind_address.clone())
660 }
661
662 fn get_agent_endpoint(&self) -> TransportProtocolEndpoint {
663 TransportProtocolEndpoint::WebSocket(self.remote_bind_address.clone())
664 }
665
666 fn get_protocol(&self) -> TransportProtocolImplementation {
667 TransportProtocolImplementation::WebSocket
668 }
669}
670
/// WebSocket server that receives requests from clients and sends each response back to
/// the client identified by a session id.
pub struct FeagiWebSocketServerRouter {
    /// Address whose `host_port()` the TCP listener binds to in `request_start`.
    local_bind_address: WebSocketUrl,
    /// Address reported by `get_agent_endpoint`.
    remote_bind_address: WebSocketUrl,
    /// Current endpoint state, returned by `poll`.
    current_state: FeagiEndpointState,
    /// Non-blocking listener; `None` while the server is not started.
    listener: Option<TcpListener>,
    /// Accepted clients, including those still completing their handshake.
    clients: Vec<HandshakeState>,
    /// Bytes of the most recently received request, if any.
    receive_buffer: Option<Vec<u8>>,
    /// Session that sent the request currently held in `receive_buffer`.
    current_session: Option<AgentID>,
    /// `true` from the moment `poll` receives a request until it is consumed.
    has_data: bool,
    /// Maps a position in `clients` to the session assigned to that client.
    index_to_session: HashMap<usize, AgentID>,
    /// Reverse of `index_to_session`: maps a session to its position in `clients`.
    session_to_index: HashMap<AgentID, usize>,
}
693
694impl FeagiWebSocketServerRouter {
695 fn try_extract_agent_id_from_payload(payload: &[u8]) -> Option<AgentID> {
696 let start = FeagiByteContainer::GLOBAL_BYTE_HEADER_BYTE_COUNT;
697 let end = start + FeagiByteContainer::AGENT_ID_BYTE_COUNT;
698 if payload.len() < end {
699 return None;
700 }
701
702 let mut id_bytes = [0u8; AgentID::NUMBER_BYTES];
703 id_bytes.copy_from_slice(&payload[start..end]);
704 let parsed_id = AgentID::new(id_bytes);
705 if parsed_id.is_blank() {
706 return None;
707 }
708 Some(parsed_id)
709 }
710
711 fn remap_client_session(&mut self, client_index: usize, new_session_id: AgentID) {
712 let previous_session = self.index_to_session.insert(client_index, new_session_id);
713 if let Some(old_session_id) = previous_session {
714 self.session_to_index.remove(&old_session_id);
715 }
716
717 if let Some(previous_index) = self.session_to_index.insert(new_session_id, client_index) {
718 if previous_index != client_index {
719 self.index_to_session.remove(&previous_index);
720 }
721 }
722 }
723
724 fn align_session_with_payload_agent_id(&mut self, client_index: usize, payload: &[u8]) {
725 if let Some(payload_agent_id) = Self::try_extract_agent_id_from_payload(payload) {
726 self.remap_client_session(client_index, payload_agent_id);
727 }
728 }
729
730 fn accept_pending_connections(&mut self) {
731 let listener = match &self.listener {
732 Some(l) => l,
733 None => return,
734 };
735
736 loop {
738 match listener.accept() {
739 Ok((stream, _addr)) => {
740 if let Some(state) = begin_nonblocking_handshake(stream) {
741 self.clients.push(state);
742 }
743 }
744 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
745 Err(_) => break,
746 }
747 }
748 }
749
750 fn process_handshakes(&mut self) {
751 let mut indices_to_remove = Vec::new();
752
753 for (i, state) in self.clients.iter_mut().enumerate() {
754 match state {
755 HandshakeState::Handshaking(_) => {
756 advance_nonblocking_handshake(state);
757 match state {
758 HandshakeState::Ready(_) => {
759 let session_id = AgentID::new_random();
760 self.index_to_session.insert(i, session_id);
761 self.session_to_index.insert(session_id, i);
762 }
763 HandshakeState::Failed => indices_to_remove.push(i),
764 HandshakeState::Handshaking(_) => {}
765 }
766 }
767 HandshakeState::Failed => {
768 indices_to_remove.push(i);
769 }
770 HandshakeState::Ready(_) => {
771 if let std::collections::hash_map::Entry::Vacant(e) =
775 self.index_to_session.entry(i)
776 {
777 let session_id = AgentID::new_random();
778 e.insert(session_id);
779 self.session_to_index.insert(session_id, i);
780 }
781 }
782 }
783 }
784
785 for &i in indices_to_remove.iter().rev() {
787 if i < self.clients.len() {
788 self.remove_client(i);
789 }
790 }
791 }
792
793 fn try_receive(&mut self) -> bool {
794 let mut failed_indices = Vec::new();
795
796 for i in 0..self.clients.len() {
797 let read_result = {
798 let client = match self.clients.get_mut(i) {
799 Some(HandshakeState::Ready(ws)) => ws,
800 _ => continue, };
802 client.read()
803 };
804
805 match read_result {
806 Ok(Message::Binary(data)) => {
807 self.align_session_with_payload_agent_id(i, &data);
808 if let Some(&session_id) = self.index_to_session.get(&i) {
809 self.receive_buffer = Some(data);
810 self.current_session = Some(session_id);
811 for idx in failed_indices.into_iter().rev() {
813 self.remove_client(idx);
814 }
815 return true;
816 }
817 }
818 Ok(Message::Text(text)) => {
819 let text_bytes = text.into_bytes();
820 self.align_session_with_payload_agent_id(i, &text_bytes);
821 if let Some(&session_id) = self.index_to_session.get(&i) {
822 self.receive_buffer = Some(text_bytes);
823 self.current_session = Some(session_id);
824 for idx in failed_indices.into_iter().rev() {
825 self.remove_client(idx);
826 }
827 return true;
828 }
829 }
830 Ok(Message::Close(_)) => {
831 failed_indices.push(i);
832 }
833 Ok(_) => {}
834 Err(tungstenite::Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock => {}
835 Err(_) => {
836 failed_indices.push(i);
837 }
838 }
839 }
840
841 for i in failed_indices.into_iter().rev() {
842 self.remove_client(i);
843 }
844
845 false
846 }
847
848 fn remove_client(&mut self, index: usize) {
849 if index >= self.clients.len() {
850 return;
851 }
852
853 let removed_state = self.clients.remove(index);
855 if let HandshakeState::Ready(mut ws) = removed_state {
856 let _ = ws.close(None);
857 }
858
859 if let Some(session_id) = self.index_to_session.remove(&index) {
861 self.session_to_index.remove(&session_id);
862 }
863
864 let mut new_index_to_session = HashMap::new();
866 let mut new_session_to_index = HashMap::new();
867
868 for (old_idx, session_id) in self.index_to_session.drain() {
869 let new_idx = if old_idx > index {
870 old_idx - 1
871 } else {
872 old_idx
873 };
874 new_index_to_session.insert(new_idx, session_id);
875 new_session_to_index.insert(session_id, new_idx);
876 }
877
878 self.index_to_session = new_index_to_session;
879 self.session_to_index = new_session_to_index;
880 }
881}
882
883impl FeagiServer for FeagiWebSocketServerRouter {
884 fn poll(&mut self) -> &FeagiEndpointState {
885 if matches!(self.current_state, FeagiEndpointState::ActiveWaiting) && !self.has_data {
886 self.accept_pending_connections();
887 self.process_handshakes(); if self.try_receive() {
890 self.has_data = true;
891 self.current_state = FeagiEndpointState::ActiveHasData;
892 }
893 }
894 &self.current_state
895 }
896
897 fn request_start(&mut self) -> Result<(), FeagiNetworkError> {
898 match &self.current_state {
899 FeagiEndpointState::Inactive => {
900 let listener = TcpListener::bind(self.local_bind_address.host_port())
901 .map_err(|e| FeagiNetworkError::CannotBind(e.to_string()))?;
902 listener
903 .set_nonblocking(true)
904 .map_err(|e| FeagiNetworkError::CannotBind(e.to_string()))?;
905
906 self.listener = Some(listener);
907 self.current_state = FeagiEndpointState::ActiveWaiting;
908 Ok(())
909 }
910 _ => Err(FeagiNetworkError::InvalidSocketProperties(
911 "Cannot start: server is not in Inactive state".to_string(),
912 )),
913 }
914 }
915
916 fn request_stop(&mut self) -> Result<(), FeagiNetworkError> {
917 match &self.current_state {
918 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
919 for state in self.clients.drain(..) {
920 if let HandshakeState::Ready(mut ws) = state {
921 let _ = ws.close(None);
922 }
923 }
924 self.listener = None;
925 self.receive_buffer = None;
926 self.current_session = None;
927 self.has_data = false;
928 self.index_to_session.clear();
929 self.session_to_index.clear();
930 self.current_state = FeagiEndpointState::Inactive;
931 Ok(())
932 }
933 _ => Err(FeagiNetworkError::InvalidSocketProperties(
934 "Cannot stop: server is not in Active state".to_string(),
935 )),
936 }
937 }
938
939 fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
940 match &self.current_state {
941 FeagiEndpointState::Errored(_) => {
942 for state in self.clients.drain(..) {
943 if let HandshakeState::Ready(mut ws) = state {
944 let _ = ws.close(None);
945 }
946 }
947 self.listener = None;
948 self.receive_buffer = None;
949 self.current_session = None;
950 self.has_data = false;
951 self.index_to_session.clear();
952 self.session_to_index.clear();
953 self.current_state = FeagiEndpointState::Inactive;
954 Ok(())
955 }
956 _ => Err(FeagiNetworkError::InvalidSocketProperties(
957 "Cannot confirm error: server is not in Errored state".to_string(),
958 )),
959 }
960 }
961
962 fn get_bind_point(&self) -> TransportProtocolEndpoint {
963 TransportProtocolEndpoint::WebSocket(self.local_bind_address.clone())
964 }
965
966 fn get_agent_endpoint(&self) -> TransportProtocolEndpoint {
967 TransportProtocolEndpoint::WebSocket(self.remote_bind_address.clone())
968 }
969
970 fn get_protocol(&self) -> TransportProtocolImplementation {
971 TransportProtocolImplementation::WebSocket
972 }
973}
974
975impl FeagiServerRouter for FeagiWebSocketServerRouter {
976 fn consume_retrieved_request(&mut self) -> Result<(AgentID, &[u8]), FeagiNetworkError> {
977 match &self.current_state {
978 FeagiEndpointState::ActiveHasData => {
979 if self.has_data {
980 if let (Some(ref data), Some(session_id)) =
981 (&self.receive_buffer, self.current_session)
982 {
983 self.has_data = false;
984 self.current_session = None;
985 self.current_state = FeagiEndpointState::ActiveWaiting;
986 Ok((session_id, data.as_slice()))
987 } else {
988 Err(FeagiNetworkError::ReceiveFailed(
989 "No data or session in buffer".to_string(),
990 ))
991 }
992 } else {
993 Err(FeagiNetworkError::ReceiveFailed(
994 "No data available".to_string(),
995 ))
996 }
997 }
998 _ => Err(FeagiNetworkError::ReceiveFailed(
999 "Cannot consume: no request available".to_string(),
1000 )),
1001 }
1002 }
1003
1004 fn publish_response(
1005 &mut self,
1006 session_id: AgentID,
1007 message: &[u8],
1008 ) -> Result<(), FeagiNetworkError> {
1009 match &self.current_state {
1010 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
1011 let client_index = *self.session_to_index.get(&session_id).ok_or_else(|| {
1012 FeagiNetworkError::SendFailed(format!("Unknown session: {:?}", session_id))
1013 })?;
1014
1015 if client_index >= self.clients.len() {
1016 return Err(FeagiNetworkError::SendFailed(
1017 "Client disconnected".to_string(),
1018 ));
1019 }
1020
1021 let ws_message = Message::Binary(message.to_vec());
1022
1023 let client = match &mut self.clients[client_index] {
1024 HandshakeState::Ready(ws) => ws,
1025 _ => {
1026 return Err(FeagiNetworkError::SendFailed(
1027 "Client not ready".to_string(),
1028 ))
1029 }
1030 };
1031
1032 client
1033 .send(ws_message)
1034 .map_err(|e| FeagiNetworkError::SendFailed(e.to_string()))?;
1035 client
1036 .flush()
1037 .map_err(|e| FeagiNetworkError::SendFailed(e.to_string()))?;
1038
1039 Ok(())
1040 }
1041 _ => Err(FeagiNetworkError::SendFailed(
1042 "Cannot send response: server is not in Active state".to_string(),
1043 )),
1044 }
1045 }
1046
1047 fn as_boxed_router_properties(&self) -> Box<dyn FeagiServerRouterProperties> {
1048 Box::new(FeagiWebSocketServerRouterProperties {
1049 local_bind_address: self.local_bind_address.clone(),
1050 remote_bind_address: self.remote_bind_address.clone(),
1051 })
1052 }
1053}
1054
1055