Skip to main content

feagi_io/protocol_implementations/websocket/websocket_std/
server_implementations.rs

1//! WebSocket server implementations using the poll-based trait design.
2//!
3//! Uses `tungstenite` with non-blocking `std::net::TcpStream` for poll-based
4//! WebSocket communication that works with any async runtime or synchronously.
5
6use std::collections::HashMap;
7use std::io::ErrorKind;
8use std::net::{TcpListener, TcpStream};
9
10use tungstenite::handshake::server::{NoCallback, ServerHandshake};
11use tungstenite::handshake::HandshakeError;
12use tungstenite::handshake::MidHandshake;
13use tungstenite::{accept, Message, WebSocket};
14
15use crate::protocol_implementations::websocket::shared::WebSocketUrl;
16use crate::traits_and_enums::server::{
17    FeagiServer, FeagiServerPublisher, FeagiServerPublisherProperties, FeagiServerPuller,
18    FeagiServerPullerProperties, FeagiServerRouter, FeagiServerRouterProperties,
19};
20use crate::traits_and_enums::shared::{
21    FeagiEndpointState, TransportProtocolEndpoint, TransportProtocolImplementation,
22};
23use crate::{AgentID, FeagiNetworkError};
24use feagi_serialization::FeagiByteContainer;
25
26/// Type alias for WebSocket over TcpStream
27type WsStream = WebSocket<TcpStream>;
28type WsMidServerHandshake = MidHandshake<ServerHandshake<TcpStream, NoCallback>>;
29
30/// State of a WebSocket connection during handshake
31#[allow(dead_code)]
32enum HandshakeState {
33    /// TCP accepted, handshake in progress
34    Handshaking(WsMidServerHandshake),
35    /// Handshake complete, WebSocket ready
36    Ready(WsStream),
37    /// Handshake failed
38    Failed,
39}
40
41fn begin_nonblocking_handshake(stream: TcpStream) -> Option<HandshakeState> {
42    if stream.set_nonblocking(true).is_err() {
43        return None;
44    }
45    match accept(stream) {
46        Ok(ws) => Some(HandshakeState::Ready(ws)),
47        Err(HandshakeError::Interrupted(mid)) => Some(HandshakeState::Handshaking(mid)),
48        Err(HandshakeError::Failure(_)) => None,
49    }
50}
51
52fn advance_nonblocking_handshake(state: &mut HandshakeState) {
53    if matches!(state, HandshakeState::Handshaking(_)) {
54        let current = std::mem::replace(state, HandshakeState::Failed);
55        let HandshakeState::Handshaking(mid) = current else {
56            return;
57        };
58        match mid.handshake() {
59            Ok(ws) => *state = HandshakeState::Ready(ws),
60            Err(HandshakeError::Interrupted(next_mid)) => {
61                *state = HandshakeState::Handshaking(next_mid);
62            }
63            Err(HandshakeError::Failure(_)) => *state = HandshakeState::Failed,
64        }
65    }
66}
67
68fn close_handshake_state(state: HandshakeState) {
69    if let HandshakeState::Ready(mut ws) = state {
70        let _ = ws.close(None);
71    }
72}
73
74// ============================================================================
75// Publisher
76// ============================================================================
77
78//region Publisher Properties
79
80/// Configuration properties for creating a WebSocket publisher server.
81#[derive(Debug, Clone, PartialEq)]
82pub struct FeagiWebSocketServerPublisherProperties {
83    local_bind_address: WebSocketUrl,
84    remote_bind_address: WebSocketUrl,
85}
86
87impl FeagiWebSocketServerPublisherProperties {
88    /// Creates new publisher properties with explicit local/remote endpoints.
89    pub fn new(
90        local_bind_address: &str,
91        remote_bind_address: &str,
92    ) -> Result<Self, FeagiNetworkError> {
93        let local_bind_address = WebSocketUrl::new(local_bind_address)?;
94        let remote_bind_address = WebSocketUrl::new(remote_bind_address)?;
95        Ok(Self {
96            local_bind_address,
97            remote_bind_address,
98        })
99    }
100}
101
102impl FeagiServerPublisherProperties for FeagiWebSocketServerPublisherProperties {
103    fn as_boxed_server_publisher(&self) -> Box<dyn FeagiServerPublisher> {
104        Box::new(FeagiWebSocketServerPublisher {
105            local_bind_address: self.local_bind_address.clone(),
106            remote_bind_address: self.remote_bind_address.clone(),
107            current_state: FeagiEndpointState::Inactive,
108            listener: None,
109            clients: Vec::new(),
110        })
111    }
112
113    fn get_bind_point(&self) -> TransportProtocolEndpoint {
114        TransportProtocolEndpoint::WebSocket(self.local_bind_address.clone())
115    }
116
117    fn get_agent_endpoint(&self) -> TransportProtocolEndpoint {
118        TransportProtocolEndpoint::WebSocket(self.remote_bind_address.clone())
119    }
120
121    fn get_protocol(&self) -> TransportProtocolImplementation {
122        TransportProtocolImplementation::WebSocket
123    }
124}
125
126//endregion
127
128//region Publisher Implementation
129
130/// A WebSocket server that broadcasts data to all connected clients.
131pub struct FeagiWebSocketServerPublisher {
132    local_bind_address: WebSocketUrl,
133    remote_bind_address: WebSocketUrl,
134    current_state: FeagiEndpointState,
135    listener: Option<TcpListener>,
136    clients: Vec<HandshakeState>,
137}
138
139impl FeagiWebSocketServerPublisher {
140    /// Accept any pending connections (non-blocking).
141    fn accept_pending_connections(&mut self) {
142        let listener = match &self.listener {
143            Some(l) => l,
144            None => return,
145        };
146
147        // Try to accept connections in non-blocking mode
148        loop {
149            match listener.accept() {
150                Ok((stream, _addr)) => {
151                    if let Some(state) = begin_nonblocking_handshake(stream) {
152                        self.clients.push(state);
153                    }
154                }
155                Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
156                    // No more pending connections
157                    break;
158                }
159                Err(_) => {
160                    break;
161                }
162            }
163        }
164    }
165
166    fn process_handshakes(&mut self) {
167        let mut failed_indices = Vec::new();
168        for (i, state) in self.clients.iter_mut().enumerate() {
169            advance_nonblocking_handshake(state);
170            if matches!(state, HandshakeState::Failed) {
171                failed_indices.push(i);
172            }
173        }
174        for i in failed_indices.into_iter().rev() {
175            let _ = self.clients.remove(i);
176        }
177    }
178
179    /// Get the number of connected clients.
180    pub fn client_count(&self) -> usize {
181        self.clients
182            .iter()
183            .filter(|s| matches!(s, HandshakeState::Ready(_)))
184            .count()
185    }
186}
187
188impl FeagiServer for FeagiWebSocketServerPublisher {
189    fn poll(&mut self) -> &FeagiEndpointState {
190        if matches!(self.current_state, FeagiEndpointState::ActiveWaiting) {
191            self.accept_pending_connections();
192            self.process_handshakes();
193        }
194        &self.current_state
195    }
196
197    fn request_start(&mut self) -> Result<(), FeagiNetworkError> {
198        match &self.current_state {
199            FeagiEndpointState::Inactive => {
200                let listener = TcpListener::bind(self.local_bind_address.host_port())
201                    .map_err(|e| FeagiNetworkError::CannotBind(e.to_string()))?;
202
203                // Set listener to non-blocking for poll-based accept
204                listener
205                    .set_nonblocking(true)
206                    .map_err(|e| FeagiNetworkError::CannotBind(e.to_string()))?;
207
208                self.listener = Some(listener);
209                self.current_state = FeagiEndpointState::ActiveWaiting;
210                Ok(())
211            }
212            _ => Err(FeagiNetworkError::InvalidSocketProperties(
213                "Cannot start: server is not in Inactive state".to_string(),
214            )),
215        }
216    }
217
218    fn request_stop(&mut self) -> Result<(), FeagiNetworkError> {
219        match &self.current_state {
220            FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
221                // Close all client connections
222                for state in self.clients.drain(..) {
223                    if let HandshakeState::Ready(mut client) = state {
224                        let _ = client.close(None);
225                    }
226                }
227                self.listener = None;
228                self.current_state = FeagiEndpointState::Inactive;
229                Ok(())
230            }
231            _ => Err(FeagiNetworkError::InvalidSocketProperties(
232                "Cannot stop: server is not in Active state".to_string(),
233            )),
234        }
235    }
236
237    fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
238        match &self.current_state {
239            FeagiEndpointState::Errored(_) => {
240                for state in self.clients.drain(..) {
241                    if let HandshakeState::Ready(mut client) = state {
242                        let _ = client.close(None);
243                    }
244                }
245                self.listener = None;
246                self.current_state = FeagiEndpointState::Inactive;
247                Ok(())
248            }
249            _ => Err(FeagiNetworkError::InvalidSocketProperties(
250                "Cannot confirm error: server is not in Errored state".to_string(),
251            )),
252        }
253    }
254
255    fn get_bind_point(&self) -> TransportProtocolEndpoint {
256        TransportProtocolEndpoint::WebSocket(self.local_bind_address.clone())
257    }
258
259    fn get_agent_endpoint(&self) -> TransportProtocolEndpoint {
260        TransportProtocolEndpoint::WebSocket(self.remote_bind_address.clone())
261    }
262
263    fn get_protocol(&self) -> TransportProtocolImplementation {
264        TransportProtocolImplementation::WebSocket
265    }
266}
267
268impl FeagiServerPublisher for FeagiWebSocketServerPublisher {
269    fn publish_data(&mut self, data: &[u8]) -> Result<(), FeagiNetworkError> {
270        match &self.current_state {
271            FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
272                let message = Message::Binary(data.to_vec());
273
274                // Send to all clients, tracking which ones fail
275                let mut failed_indices = Vec::new();
276                for (i, state) in self.clients.iter_mut().enumerate() {
277                    let client = match state {
278                        HandshakeState::Ready(ws) => ws,
279                        HandshakeState::Handshaking(_) => continue,
280                        HandshakeState::Failed => {
281                            failed_indices.push(i);
282                            continue;
283                        }
284                    };
285                    // Try to send, handling WouldBlock gracefully
286                    match client.send(message.clone()) {
287                        Ok(()) => {}
288                        Err(tungstenite::Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock => {
289                            // Socket buffer full - could buffer, but for now skip
290                        }
291                        Err(_) => {
292                            failed_indices.push(i);
293                        }
294                    }
295                    // Flush to actually send
296                    let _ = client.flush();
297                }
298
299                // Remove failed clients (in reverse order to preserve indices)
300                for i in failed_indices.into_iter().rev() {
301                    close_handshake_state(self.clients.remove(i));
302                }
303
304                Ok(())
305            }
306            _ => Err(FeagiNetworkError::SendFailed(
307                "Cannot publish: server is not in Active state".to_string(),
308            )),
309        }
310    }
311
312    fn as_boxed_publisher_properties(&self) -> Box<dyn FeagiServerPublisherProperties> {
313        Box::new(FeagiWebSocketServerPublisherProperties {
314            local_bind_address: self.local_bind_address.clone(),
315            remote_bind_address: self.remote_bind_address.clone(),
316        })
317    }
318}
319
320//endregion
321
322// ============================================================================
323// Puller
324// ============================================================================
325
326//region Puller Properties
327
328/// Configuration properties for creating a WebSocket puller server.
329#[derive(Debug, Clone, PartialEq)]
330pub struct FeagiWebSocketServerPullerProperties {
331    local_bind_address: WebSocketUrl,
332    remote_bind_address: WebSocketUrl,
333}
334
335impl FeagiWebSocketServerPullerProperties {
336    /// Creates new puller properties with local bind address.
337    ///
338    /// The remote endpoint defaults to the same address with `ws://` scheme.
339    pub fn new(local_bind_address: &str) -> Result<Self, FeagiNetworkError> {
340        Self::new_with_remote(local_bind_address, local_bind_address)
341    }
342
343    /// Creates new puller properties with explicit local/remote endpoints.
344    pub fn new_with_remote(
345        local_bind_address: &str,
346        remote_bind_address: &str,
347    ) -> Result<Self, FeagiNetworkError> {
348        let local_bind_address = WebSocketUrl::new(local_bind_address)?;
349        let remote_bind_address = WebSocketUrl::new(remote_bind_address)?;
350        Ok(Self {
351            local_bind_address,
352            remote_bind_address,
353        })
354    }
355}
356
357impl FeagiServerPullerProperties for FeagiWebSocketServerPullerProperties {
358    fn as_boxed_server_puller(&self) -> Box<dyn FeagiServerPuller> {
359        Box::new(FeagiWebSocketServerPuller {
360            local_bind_address: self.local_bind_address.clone(),
361            remote_bind_address: self.remote_bind_address.clone(),
362            current_state: FeagiEndpointState::Inactive,
363            listener: None,
364            clients: Vec::new(),
365            receive_buffer: None,
366            has_data: false,
367        })
368    }
369
370    fn get_bind_point(&self) -> TransportProtocolEndpoint {
371        TransportProtocolEndpoint::WebSocket(self.local_bind_address.clone())
372    }
373
374    fn get_agent_endpoint(&self) -> TransportProtocolEndpoint {
375        TransportProtocolEndpoint::WebSocket(self.remote_bind_address.clone())
376    }
377
378    fn get_protocol(&self) -> TransportProtocolImplementation {
379        TransportProtocolImplementation::WebSocket
380    }
381}
382
383//endregion
384
385//region Puller Implementation
386
387/// A WebSocket server that receives pushed data from clients.
388pub struct FeagiWebSocketServerPuller {
389    local_bind_address: WebSocketUrl,
390    remote_bind_address: WebSocketUrl,
391    current_state: FeagiEndpointState,
392    listener: Option<TcpListener>,
393    clients: Vec<HandshakeState>,
394    /// Buffer for received data
395    receive_buffer: Option<Vec<u8>>,
396    has_data: bool,
397}
398
399impl FeagiWebSocketServerPuller {
400    fn accept_pending_connections(&mut self) {
401        let listener = match &self.listener {
402            Some(l) => l,
403            None => return,
404        };
405
406        loop {
407            match listener.accept() {
408                Ok((stream, _addr)) => {
409                    if let Some(state) = begin_nonblocking_handshake(stream) {
410                        self.clients.push(state);
411                    }
412                }
413                Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
414                Err(_) => break,
415            }
416        }
417    }
418
419    fn process_handshakes(&mut self) {
420        let mut failed_indices = Vec::new();
421        for (i, state) in self.clients.iter_mut().enumerate() {
422            advance_nonblocking_handshake(state);
423            if matches!(state, HandshakeState::Failed) {
424                failed_indices.push(i);
425            }
426        }
427        for i in failed_indices.into_iter().rev() {
428            let _ = self.clients.remove(i);
429        }
430    }
431
432    /// Try to receive data from any client (non-blocking).
433    fn try_receive(&mut self) -> bool {
434        let mut failed_indices = Vec::new();
435
436        for (i, state) in self.clients.iter_mut().enumerate() {
437            let client = match state {
438                HandshakeState::Ready(ws) => ws,
439                HandshakeState::Handshaking(_) => continue,
440                HandshakeState::Failed => {
441                    failed_indices.push(i);
442                    continue;
443                }
444            };
445            match client.read() {
446                Ok(Message::Binary(data)) => {
447                    self.receive_buffer = Some(data);
448                    // Remove failed clients before returning
449                    for idx in failed_indices.into_iter().rev() {
450                        close_handshake_state(self.clients.remove(idx));
451                    }
452                    return true;
453                }
454                Ok(Message::Text(text)) => {
455                    self.receive_buffer = Some(text.into_bytes());
456                    for idx in failed_indices.into_iter().rev() {
457                        close_handshake_state(self.clients.remove(idx));
458                    }
459                    return true;
460                }
461                Ok(Message::Close(_)) => {
462                    failed_indices.push(i);
463                }
464                Ok(_) => {
465                    // Ping/Pong/Frame - continue
466                }
467                Err(tungstenite::Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock => {
468                    // No data from this client
469                }
470                Err(_) => {
471                    failed_indices.push(i);
472                }
473            }
474        }
475
476        // Remove failed clients
477        for i in failed_indices.into_iter().rev() {
478            close_handshake_state(self.clients.remove(i));
479        }
480
481        false
482    }
483}
484
485impl FeagiServer for FeagiWebSocketServerPuller {
486    fn poll(&mut self) -> &FeagiEndpointState {
487        if matches!(self.current_state, FeagiEndpointState::ActiveWaiting) && !self.has_data {
488            self.accept_pending_connections();
489            self.process_handshakes();
490
491            if self.try_receive() {
492                self.has_data = true;
493                self.current_state = FeagiEndpointState::ActiveHasData;
494            }
495        }
496        &self.current_state
497    }
498
499    fn request_start(&mut self) -> Result<(), FeagiNetworkError> {
500        match &self.current_state {
501            FeagiEndpointState::Inactive => {
502                let listener = TcpListener::bind(self.local_bind_address.host_port())
503                    .map_err(|e| FeagiNetworkError::CannotBind(e.to_string()))?;
504                listener
505                    .set_nonblocking(true)
506                    .map_err(|e| FeagiNetworkError::CannotBind(e.to_string()))?;
507
508                self.listener = Some(listener);
509                self.current_state = FeagiEndpointState::ActiveWaiting;
510                Ok(())
511            }
512            _ => Err(FeagiNetworkError::InvalidSocketProperties(
513                "Cannot start: server is not in Inactive state".to_string(),
514            )),
515        }
516    }
517
518    fn request_stop(&mut self) -> Result<(), FeagiNetworkError> {
519        match &self.current_state {
520            FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
521                for state in self.clients.drain(..) {
522                    if let HandshakeState::Ready(mut client) = state {
523                        let _ = client.close(None);
524                    }
525                }
526                self.listener = None;
527                self.receive_buffer = None;
528                self.has_data = false;
529                self.current_state = FeagiEndpointState::Inactive;
530                Ok(())
531            }
532            _ => Err(FeagiNetworkError::InvalidSocketProperties(
533                "Cannot stop: server is not in Active state".to_string(),
534            )),
535        }
536    }
537
538    fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
539        match &self.current_state {
540            FeagiEndpointState::Errored(_) => {
541                for state in self.clients.drain(..) {
542                    if let HandshakeState::Ready(mut client) = state {
543                        let _ = client.close(None);
544                    }
545                }
546                self.listener = None;
547                self.receive_buffer = None;
548                self.has_data = false;
549                self.current_state = FeagiEndpointState::Inactive;
550                Ok(())
551            }
552            _ => Err(FeagiNetworkError::InvalidSocketProperties(
553                "Cannot confirm error: server is not in Errored state".to_string(),
554            )),
555        }
556    }
557
558    fn get_bind_point(&self) -> TransportProtocolEndpoint {
559        TransportProtocolEndpoint::WebSocket(self.local_bind_address.clone())
560    }
561
562    fn get_agent_endpoint(&self) -> TransportProtocolEndpoint {
563        TransportProtocolEndpoint::WebSocket(self.remote_bind_address.clone())
564    }
565
566    fn get_protocol(&self) -> TransportProtocolImplementation {
567        TransportProtocolImplementation::WebSocket
568    }
569}
570
571impl FeagiServerPuller for FeagiWebSocketServerPuller {
572    fn consume_retrieved_data(&mut self) -> Result<&[u8], FeagiNetworkError> {
573        match &self.current_state {
574            FeagiEndpointState::ActiveHasData => {
575                if self.has_data {
576                    if let Some(ref data) = self.receive_buffer {
577                        self.has_data = false;
578                        self.current_state = FeagiEndpointState::ActiveWaiting;
579                        Ok(data.as_slice())
580                    } else {
581                        Err(FeagiNetworkError::ReceiveFailed(
582                            "No data in buffer".to_string(),
583                        ))
584                    }
585                } else {
586                    Err(FeagiNetworkError::ReceiveFailed(
587                        "No data available".to_string(),
588                    ))
589                }
590            }
591            _ => Err(FeagiNetworkError::ReceiveFailed(
592                "Cannot consume: no data available".to_string(),
593            )),
594        }
595    }
596
597    fn as_boxed_puller_properties(&self) -> Box<dyn FeagiServerPullerProperties> {
598        Box::new(FeagiWebSocketServerPullerProperties {
599            local_bind_address: self.local_bind_address.clone(),
600            remote_bind_address: self.remote_bind_address.clone(),
601        })
602    }
603}
604
605//endregion
606
607// ============================================================================
608// Router
609// ============================================================================
610
611//region Router Properties
612
613/// Configuration properties for creating a WebSocket router server.
614#[derive(Debug, Clone, PartialEq)]
615pub struct FeagiWebSocketServerRouterProperties {
616    local_bind_address: WebSocketUrl,
617    remote_bind_address: WebSocketUrl,
618}
619
620impl FeagiWebSocketServerRouterProperties {
621    /// Creates new router properties with local bind address.
622    ///
623    /// The remote endpoint defaults to the same address with `ws://` scheme.
624    pub fn new(local_bind_address: &str) -> Result<Self, FeagiNetworkError> {
625        Self::new_with_remote(local_bind_address, local_bind_address)
626    }
627
628    /// Creates new router properties with explicit local/remote endpoints.
629    pub fn new_with_remote(
630        local_bind_address: &str,
631        remote_bind_address: &str,
632    ) -> Result<Self, FeagiNetworkError> {
633        let local_bind_address = WebSocketUrl::new(local_bind_address)?;
634        let remote_bind_address = WebSocketUrl::new(remote_bind_address)?;
635        Ok(Self {
636            local_bind_address,
637            remote_bind_address,
638        })
639    }
640}
641
642impl FeagiServerRouterProperties for FeagiWebSocketServerRouterProperties {
643    fn as_boxed_server_router(&self) -> Box<dyn FeagiServerRouter> {
644        Box::new(FeagiWebSocketServerRouter {
645            local_bind_address: self.local_bind_address.clone(),
646            remote_bind_address: self.remote_bind_address.clone(),
647            current_state: FeagiEndpointState::Inactive,
648            listener: None,
649            clients: Vec::new(),
650            receive_buffer: None,
651            current_session: None,
652            has_data: false,
653            index_to_session: HashMap::new(),
654            session_to_index: HashMap::new(),
655        })
656    }
657
658    fn get_bind_point(&self) -> TransportProtocolEndpoint {
659        TransportProtocolEndpoint::WebSocket(self.local_bind_address.clone())
660    }
661
662    fn get_agent_endpoint(&self) -> TransportProtocolEndpoint {
663        TransportProtocolEndpoint::WebSocket(self.remote_bind_address.clone())
664    }
665
666    fn get_protocol(&self) -> TransportProtocolImplementation {
667        TransportProtocolImplementation::WebSocket
668    }
669}
670
671//endregion
672
673//region Router Implementation
674
675/// A WebSocket server that handles request-response communication with multiple clients.
676pub struct FeagiWebSocketServerRouter {
677    local_bind_address: WebSocketUrl,
678    remote_bind_address: WebSocketUrl,
679    current_state: FeagiEndpointState,
680    listener: Option<TcpListener>,
681    /// Connections in various states (handshaking or ready)
682    clients: Vec<HandshakeState>,
683    /// Buffer for received request
684    receive_buffer: Option<Vec<u8>>,
685    /// Session ID of the client that sent the current request
686    current_session: Option<AgentID>,
687    has_data: bool,
688    /// Client index to AgentID mapping
689    index_to_session: HashMap<usize, AgentID>,
690    /// AgentID to client index mapping
691    session_to_index: HashMap<AgentID, usize>,
692}
693
694impl FeagiWebSocketServerRouter {
695    fn try_extract_agent_id_from_payload(payload: &[u8]) -> Option<AgentID> {
696        let start = FeagiByteContainer::GLOBAL_BYTE_HEADER_BYTE_COUNT;
697        let end = start + FeagiByteContainer::AGENT_ID_BYTE_COUNT;
698        if payload.len() < end {
699            return None;
700        }
701
702        let mut id_bytes = [0u8; AgentID::NUMBER_BYTES];
703        id_bytes.copy_from_slice(&payload[start..end]);
704        let parsed_id = AgentID::new(id_bytes);
705        if parsed_id.is_blank() {
706            return None;
707        }
708        Some(parsed_id)
709    }
710
711    fn remap_client_session(&mut self, client_index: usize, new_session_id: AgentID) {
712        let previous_session = self.index_to_session.insert(client_index, new_session_id);
713        if let Some(old_session_id) = previous_session {
714            self.session_to_index.remove(&old_session_id);
715        }
716
717        if let Some(previous_index) = self.session_to_index.insert(new_session_id, client_index) {
718            if previous_index != client_index {
719                self.index_to_session.remove(&previous_index);
720            }
721        }
722    }
723
724    fn align_session_with_payload_agent_id(&mut self, client_index: usize, payload: &[u8]) {
725        if let Some(payload_agent_id) = Self::try_extract_agent_id_from_payload(payload) {
726            self.remap_client_session(client_index, payload_agent_id);
727        }
728    }
729
730    fn accept_pending_connections(&mut self) {
731        let listener = match &self.listener {
732            Some(l) => l,
733            None => return,
734        };
735
736        // Accept new TCP connections and start handshake
737        loop {
738            match listener.accept() {
739                Ok((stream, _addr)) => {
740                    if let Some(state) = begin_nonblocking_handshake(stream) {
741                        self.clients.push(state);
742                    }
743                }
744                Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
745                Err(_) => break,
746            }
747        }
748    }
749
750    fn process_handshakes(&mut self) {
751        let mut indices_to_remove = Vec::new();
752
753        for (i, state) in self.clients.iter_mut().enumerate() {
754            match state {
755                HandshakeState::Handshaking(_) => {
756                    advance_nonblocking_handshake(state);
757                    match state {
758                        HandshakeState::Ready(_) => {
759                            let session_id = AgentID::new_random();
760                            self.index_to_session.insert(i, session_id);
761                            self.session_to_index.insert(session_id, i);
762                        }
763                        HandshakeState::Failed => indices_to_remove.push(i),
764                        HandshakeState::Handshaking(_) => {}
765                    }
766                }
767                HandshakeState::Failed => {
768                    indices_to_remove.push(i);
769                }
770                HandshakeState::Ready(_) => {
771                    // Ensure every ready client has a stable session mapping.
772                    // This also covers immediate-handshake success where a client
773                    // was inserted as Ready directly from accept path.
774                    if let std::collections::hash_map::Entry::Vacant(e) =
775                        self.index_to_session.entry(i)
776                    {
777                        let session_id = AgentID::new_random();
778                        e.insert(session_id);
779                        self.session_to_index.insert(session_id, i);
780                    }
781                }
782            }
783        }
784
785        // Remove failed handshakes
786        for &i in indices_to_remove.iter().rev() {
787            if i < self.clients.len() {
788                self.remove_client(i);
789            }
790        }
791    }
792
793    fn try_receive(&mut self) -> bool {
794        let mut failed_indices = Vec::new();
795
796        for i in 0..self.clients.len() {
797            let read_result = {
798                let client = match self.clients.get_mut(i) {
799                    Some(HandshakeState::Ready(ws)) => ws,
800                    _ => continue, // Skip handshaking/failed connections
801                };
802                client.read()
803            };
804
805            match read_result {
806                Ok(Message::Binary(data)) => {
807                    self.align_session_with_payload_agent_id(i, &data);
808                    if let Some(&session_id) = self.index_to_session.get(&i) {
809                        self.receive_buffer = Some(data);
810                        self.current_session = Some(session_id);
811                        // Remove failed clients before returning
812                        for idx in failed_indices.into_iter().rev() {
813                            self.remove_client(idx);
814                        }
815                        return true;
816                    }
817                }
818                Ok(Message::Text(text)) => {
819                    let text_bytes = text.into_bytes();
820                    self.align_session_with_payload_agent_id(i, &text_bytes);
821                    if let Some(&session_id) = self.index_to_session.get(&i) {
822                        self.receive_buffer = Some(text_bytes);
823                        self.current_session = Some(session_id);
824                        for idx in failed_indices.into_iter().rev() {
825                            self.remove_client(idx);
826                        }
827                        return true;
828                    }
829                }
830                Ok(Message::Close(_)) => {
831                    failed_indices.push(i);
832                }
833                Ok(_) => {}
834                Err(tungstenite::Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock => {}
835                Err(_) => {
836                    failed_indices.push(i);
837                }
838            }
839        }
840
841        for i in failed_indices.into_iter().rev() {
842            self.remove_client(i);
843        }
844
845        false
846    }
847
848    fn remove_client(&mut self, index: usize) {
849        if index >= self.clients.len() {
850            return;
851        }
852
853        // Remove and close the client
854        let removed_state = self.clients.remove(index);
855        if let HandshakeState::Ready(mut ws) = removed_state {
856            let _ = ws.close(None);
857        }
858
859        // Remove from mappings
860        if let Some(session_id) = self.index_to_session.remove(&index) {
861            self.session_to_index.remove(&session_id);
862        }
863
864        // Update indices for all clients after the removed one
865        let mut new_index_to_session = HashMap::new();
866        let mut new_session_to_index = HashMap::new();
867
868        for (old_idx, session_id) in self.index_to_session.drain() {
869            let new_idx = if old_idx > index {
870                old_idx - 1
871            } else {
872                old_idx
873            };
874            new_index_to_session.insert(new_idx, session_id);
875            new_session_to_index.insert(session_id, new_idx);
876        }
877
878        self.index_to_session = new_index_to_session;
879        self.session_to_index = new_session_to_index;
880    }
881}
882
883impl FeagiServer for FeagiWebSocketServerRouter {
884    fn poll(&mut self) -> &FeagiEndpointState {
885        if matches!(self.current_state, FeagiEndpointState::ActiveWaiting) && !self.has_data {
886            self.accept_pending_connections();
887            self.process_handshakes(); // Process pending handshakes
888
889            if self.try_receive() {
890                self.has_data = true;
891                self.current_state = FeagiEndpointState::ActiveHasData;
892            }
893        }
894        &self.current_state
895    }
896
897    fn request_start(&mut self) -> Result<(), FeagiNetworkError> {
898        match &self.current_state {
899            FeagiEndpointState::Inactive => {
900                let listener = TcpListener::bind(self.local_bind_address.host_port())
901                    .map_err(|e| FeagiNetworkError::CannotBind(e.to_string()))?;
902                listener
903                    .set_nonblocking(true)
904                    .map_err(|e| FeagiNetworkError::CannotBind(e.to_string()))?;
905
906                self.listener = Some(listener);
907                self.current_state = FeagiEndpointState::ActiveWaiting;
908                Ok(())
909            }
910            _ => Err(FeagiNetworkError::InvalidSocketProperties(
911                "Cannot start: server is not in Inactive state".to_string(),
912            )),
913        }
914    }
915
916    fn request_stop(&mut self) -> Result<(), FeagiNetworkError> {
917        match &self.current_state {
918            FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
919                for state in self.clients.drain(..) {
920                    if let HandshakeState::Ready(mut ws) = state {
921                        let _ = ws.close(None);
922                    }
923                }
924                self.listener = None;
925                self.receive_buffer = None;
926                self.current_session = None;
927                self.has_data = false;
928                self.index_to_session.clear();
929                self.session_to_index.clear();
930                self.current_state = FeagiEndpointState::Inactive;
931                Ok(())
932            }
933            _ => Err(FeagiNetworkError::InvalidSocketProperties(
934                "Cannot stop: server is not in Active state".to_string(),
935            )),
936        }
937    }
938
939    fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
940        match &self.current_state {
941            FeagiEndpointState::Errored(_) => {
942                for state in self.clients.drain(..) {
943                    if let HandshakeState::Ready(mut ws) = state {
944                        let _ = ws.close(None);
945                    }
946                }
947                self.listener = None;
948                self.receive_buffer = None;
949                self.current_session = None;
950                self.has_data = false;
951                self.index_to_session.clear();
952                self.session_to_index.clear();
953                self.current_state = FeagiEndpointState::Inactive;
954                Ok(())
955            }
956            _ => Err(FeagiNetworkError::InvalidSocketProperties(
957                "Cannot confirm error: server is not in Errored state".to_string(),
958            )),
959        }
960    }
961
962    fn get_bind_point(&self) -> TransportProtocolEndpoint {
963        TransportProtocolEndpoint::WebSocket(self.local_bind_address.clone())
964    }
965
966    fn get_agent_endpoint(&self) -> TransportProtocolEndpoint {
967        TransportProtocolEndpoint::WebSocket(self.remote_bind_address.clone())
968    }
969
970    fn get_protocol(&self) -> TransportProtocolImplementation {
971        TransportProtocolImplementation::WebSocket
972    }
973}
974
975impl FeagiServerRouter for FeagiWebSocketServerRouter {
976    fn consume_retrieved_request(&mut self) -> Result<(AgentID, &[u8]), FeagiNetworkError> {
977        match &self.current_state {
978            FeagiEndpointState::ActiveHasData => {
979                if self.has_data {
980                    if let (Some(ref data), Some(session_id)) =
981                        (&self.receive_buffer, self.current_session)
982                    {
983                        self.has_data = false;
984                        self.current_session = None;
985                        self.current_state = FeagiEndpointState::ActiveWaiting;
986                        Ok((session_id, data.as_slice()))
987                    } else {
988                        Err(FeagiNetworkError::ReceiveFailed(
989                            "No data or session in buffer".to_string(),
990                        ))
991                    }
992                } else {
993                    Err(FeagiNetworkError::ReceiveFailed(
994                        "No data available".to_string(),
995                    ))
996                }
997            }
998            _ => Err(FeagiNetworkError::ReceiveFailed(
999                "Cannot consume: no request available".to_string(),
1000            )),
1001        }
1002    }
1003
1004    fn publish_response(
1005        &mut self,
1006        session_id: AgentID,
1007        message: &[u8],
1008    ) -> Result<(), FeagiNetworkError> {
1009        match &self.current_state {
1010            FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
1011                let client_index = *self.session_to_index.get(&session_id).ok_or_else(|| {
1012                    FeagiNetworkError::SendFailed(format!("Unknown session: {:?}", session_id))
1013                })?;
1014
1015                if client_index >= self.clients.len() {
1016                    return Err(FeagiNetworkError::SendFailed(
1017                        "Client disconnected".to_string(),
1018                    ));
1019                }
1020
1021                let ws_message = Message::Binary(message.to_vec());
1022
1023                let client = match &mut self.clients[client_index] {
1024                    HandshakeState::Ready(ws) => ws,
1025                    _ => {
1026                        return Err(FeagiNetworkError::SendFailed(
1027                            "Client not ready".to_string(),
1028                        ))
1029                    }
1030                };
1031
1032                client
1033                    .send(ws_message)
1034                    .map_err(|e| FeagiNetworkError::SendFailed(e.to_string()))?;
1035                client
1036                    .flush()
1037                    .map_err(|e| FeagiNetworkError::SendFailed(e.to_string()))?;
1038
1039                Ok(())
1040            }
1041            _ => Err(FeagiNetworkError::SendFailed(
1042                "Cannot send response: server is not in Active state".to_string(),
1043            )),
1044        }
1045    }
1046
1047    fn as_boxed_router_properties(&self) -> Box<dyn FeagiServerRouterProperties> {
1048        Box::new(FeagiWebSocketServerRouterProperties {
1049            local_bind_address: self.local_bind_address.clone(),
1050            remote_bind_address: self.remote_bind_address.clone(),
1051        })
1052    }
1053}
1054
1055//endregion