Skip to main content

feagi_agent/server/
feagi_agent_handler.rs

1use crate::command_and_control::agent_embodiment_configuration_message::AgentEmbodimentConfigurationMessage;
2use crate::command_and_control::agent_registration_message::{
3    AgentRegistrationMessage, DeregistrationResponse, RegistrationResponse,
4};
5use crate::command_and_control::FeagiMessage;
6use crate::server::auth::AgentAuth;
7use crate::server::wrappers::{
8    CommandControlWrapper, MotorTranslator, SensorTranslator, VisualizationTranslator,
9};
10use crate::{AgentCapabilities, AgentDescriptor, FeagiAgentError};
11use feagi_io::traits_and_enums::server::{
12    FeagiServerPublisher, FeagiServerPublisherProperties, FeagiServerPuller,
13    FeagiServerPullerProperties, FeagiServerRouterProperties,
14};
15use feagi_io::traits_and_enums::shared::{
16    TransportProtocolEndpoint, TransportProtocolImplementation,
17};
18use feagi_io::AgentID;
19use feagi_serialization::FeagiByteContainer;
20use std::collections::{HashMap, HashSet};
21use std::time::{Duration, Instant};
22use tracing::{debug, error, info, warn};
23
24type CommandServerIndex = usize;
25
26/// Server-side liveness configuration for command/control sessions.
27///
28/// `heartbeat_timeout` defines when a client is considered stale if no
29/// command/control messages are received.
30/// `stale_check_interval` controls how often stale scans run during polling.
31#[derive(Debug, Clone)]
32pub struct AgentLivenessConfig {
33    pub heartbeat_timeout: Duration,
34    pub stale_check_interval: Duration,
35}
36
37impl Default for AgentLivenessConfig {
38    fn default() -> Self {
39        Self {
40            heartbeat_timeout: Duration::from_secs(30),
41            stale_check_interval: Duration::from_secs(1),
42        }
43    }
44}
45
46pub struct FeagiAgentHandler {
47    agent_auth_backend: Box<dyn AgentAuth>,
48    available_publishers: Vec<Box<dyn FeagiServerPublisherProperties>>,
49    available_pullers: Vec<Box<dyn FeagiServerPullerProperties>>,
50    command_control_servers: Vec<CommandControlWrapper>,
51
52    all_registered_agents: HashMap<AgentID, (AgentDescriptor, Vec<AgentCapabilities>)>,
53    agent_mapping_to_command_control_server_index: HashMap<AgentID, CommandServerIndex>,
54    last_activity_by_agent: HashMap<AgentID, Instant>,
55    sensors: HashMap<AgentID, SensorTranslator>,
56    motors: HashMap<AgentID, MotorTranslator>,
57    visualizations: HashMap<AgentID, VisualizationTranslator>,
58    sensor_poll_cursor: usize,
59    liveness_config: AgentLivenessConfig,
60    last_stale_check_at: Instant,
61
62    // this stuff is likely redundant
63    // REST STUFF
64    /// Device registrations by AgentDescriptor (REST API configuration storage)
65    device_registrations_by_descriptor: HashMap<AgentDescriptor, serde_json::Value>,
66    /// Agent ID (base64) by AgentDescriptor (for REST→WebSocket bridging)
67    agent_id_by_descriptor: HashMap<AgentDescriptor, String>,
68    /// Device registrations by AgentID (active connections)
69    device_registrations_by_agent: HashMap<AgentID, serde_json::Value>,
70}
71
72impl FeagiAgentHandler {
73    #[allow(dead_code)]
74    fn capabilities_equivalent(
75        existing: &[AgentCapabilities],
76        requested: &[AgentCapabilities],
77    ) -> bool {
78        existing.len() == requested.len()
79            && existing
80                .iter()
81                .all(|capability| requested.contains(capability))
82    }
83
84    /// Returns true when an existing descriptor session should be replaced by a new registration.
85    ///
86    /// This suppresses immediate descriptor-replacement churn caused by duplicate
87    /// registration packets that arrive within a very short window for the same
88    /// live agent session.
89    #[allow(dead_code)]
90    fn should_replace_existing_descriptor_session(&self, existing_agent_id: AgentID) -> bool {
91        let Some(last_seen) = self.last_activity_by_agent.get(&existing_agent_id) else {
92            // Missing liveness state is treated as stale and replaceable.
93            return true;
94        };
95
96        let duplicate_guard_window = self
97            .liveness_config
98            .stale_check_interval
99            .checked_mul(2)
100            .unwrap_or(self.liveness_config.stale_check_interval);
101
102        // If the existing session is still very fresh, treat incoming registration
103        // as duplicate/in-flight reconnect noise and keep the existing mapping.
104        last_seen.elapsed() > duplicate_guard_window
105    }
106
107    pub fn new(agent_auth_backend: Box<dyn AgentAuth>) -> FeagiAgentHandler {
108        Self::new_with_liveness_config(agent_auth_backend, AgentLivenessConfig::default())
109    }
110
111    /// Create a handler with explicit liveness configuration.
112    ///
113    /// This constructor is preferred in FEAGI runtime code paths where values
114    /// come from centralized configuration (`feagi_configuration.toml`).
115    pub fn new_with_liveness_config(
116        agent_auth_backend: Box<dyn AgentAuth>,
117        liveness_config: AgentLivenessConfig,
118    ) -> FeagiAgentHandler {
119        FeagiAgentHandler {
120            agent_auth_backend,
121            available_publishers: Vec::new(),
122            available_pullers: Vec::new(),
123
124            command_control_servers: Vec::new(),
125            all_registered_agents: HashMap::new(),
126            agent_mapping_to_command_control_server_index: HashMap::new(),
127            last_activity_by_agent: HashMap::new(),
128            sensors: Default::default(),
129            motors: Default::default(),
130            visualizations: Default::default(),
131            sensor_poll_cursor: 0,
132            liveness_config,
133            last_stale_check_at: Instant::now(),
134
135            device_registrations_by_descriptor: HashMap::new(),
136            agent_id_by_descriptor: HashMap::new(),
137            device_registrations_by_agent: HashMap::new(),
138        }
139    }
140
141    //region Get Properties
142
143    pub fn get_all_registered_agents(
144        &self,
145    ) -> &HashMap<AgentID, (AgentDescriptor, Vec<AgentCapabilities>)> {
146        &self.all_registered_agents
147    }
148
149    pub fn get_all_registered_sensors(&self) -> HashSet<AgentID> {
150        self.sensors.keys().cloned().collect()
151    }
152
153    pub fn get_all_registered_motors(&self) -> HashSet<AgentID> {
154        self.motors.keys().cloned().collect()
155    }
156
157    pub fn get_all_registered_visualizations(&self) -> HashSet<AgentID> {
158        self.visualizations.keys().cloned().collect()
159    }
160
161    /// Register a logical agent entry without transport allocation.
162    ///
163    /// This utility is intended for deterministic transition/integration tests
164    /// that need an active agent session record without starting network servers.
165    pub fn register_logical_agent(
166        &mut self,
167        agent_id: AgentID,
168        descriptor: AgentDescriptor,
169        capabilities: Vec<AgentCapabilities>,
170    ) {
171        self.all_registered_agents
172            .insert(agent_id, (descriptor, capabilities));
173        self.last_activity_by_agent.insert(agent_id, Instant::now());
174    }
175
176    /// Forcefully deregister all currently connected agents.
177    ///
178    /// Returns the removed session IDs (base64) so callers can clear any
179    /// runtime subscriptions keyed by session.
180    pub fn force_deregister_all_agents(&mut self, reason: &str) -> Vec<String> {
181        let ids: Vec<AgentID> = self.all_registered_agents.keys().copied().collect();
182        let mut removed_ids = Vec::with_capacity(ids.len());
183        for agent_id in ids {
184            removed_ids.push(agent_id.to_base64());
185            self.deregister_agent_internal(agent_id, reason);
186        }
187        removed_ids
188    }
189
190    pub fn get_command_control_server_info(&self) -> Vec<Box<dyn FeagiServerRouterProperties>> {
191        let mut output: Vec<Box<dyn FeagiServerRouterProperties>> = Vec::new();
192        for command_control_server in &self.command_control_servers {
193            output.push(command_control_server.get_running_server_properties())
194        }
195        output
196    }
197
198    //region  REST
199
200    /// Get device registrations by AgentID
201    pub fn get_device_registrations_by_agent(
202        &self,
203        agent_id: AgentID,
204    ) -> Option<&serde_json::Value> {
205        self.device_registrations_by_agent.get(&agent_id)
206    }
207
208    /// Store device registrations by AgentDescriptor (REST API - before connection)
209    /// Also stores the original agent_id for later WebSocket→REST bridging
210    pub fn set_device_registrations_by_descriptor(
211        &mut self,
212        agent_id_base64: String,
213        agent_descriptor: AgentDescriptor,
214        device_registrations: serde_json::Value,
215    ) {
216        self.device_registrations_by_descriptor
217            .insert(agent_descriptor.clone(), device_registrations);
218        self.agent_id_by_descriptor
219            .insert(agent_descriptor, agent_id_base64);
220    }
221
222    /// Get device registrations by AgentDescriptor (REST API queries)
223    pub fn get_device_registrations_by_descriptor(
224        &self,
225        agent_descriptor: &AgentDescriptor,
226    ) -> Option<&serde_json::Value> {
227        self.device_registrations_by_descriptor
228            .get(agent_descriptor)
229    }
230
231    /// Store device registrations by AgentID (active connection)
232    pub fn set_device_registrations_by_agent(
233        &mut self,
234        agent_id: AgentID,
235        device_registrations: serde_json::Value,
236    ) {
237        self.device_registrations_by_agent
238            .insert(agent_id, device_registrations);
239    }
240
241    // TODO redudant, you can simply check if a AgentID has the capability hash?
242    /// Check if a agent has visualization capability configured
243    /// Returns (agent_id_base64, rate_hz) for registration with RuntimeService
244    pub fn get_visualization_info_for_agent(&self, agent_id: AgentID) -> Option<(String, f64)> {
245        let device_regs = self.device_registrations_by_agent.get(&agent_id)?;
246        let viz = device_regs.get("visualization")?;
247        let rate_hz = viz.get("rate_hz").and_then(|v| v.as_f64())?;
248
249        if rate_hz > 0.0 {
250            let agent_descriptor = self.all_registered_agents.get(&agent_id)?;
251            let agent_id = self
252                .agent_id_by_descriptor
253                .get(&agent_descriptor.0)?
254                .clone();
255            Some((agent_id, rate_hz))
256        } else {
257            None
258        }
259    }
260
261    //endregion
262
263    //endregion
264
265    //region Add Servers
266
267    /// Add a poll-based command/control server (ZMQ/WS). The router is wrapped in a
268    /// [`CommandControlWrapper`] that only exposes messages.
269    pub fn add_and_start_command_control_server(
270        &mut self,
271        router_property: Box<dyn FeagiServerRouterProperties>,
272    ) -> Result<(), FeagiAgentError> {
273        let mut router = router_property.as_boxed_server_router();
274        router.request_start()?;
275        let translator = CommandControlWrapper::new(router);
276        self.command_control_servers.push(translator);
277        Ok(())
278    }
279
280    pub fn add_publisher_server(&mut self, publisher: Box<dyn FeagiServerPublisherProperties>) {
281        // TODO check for collisions
282        self.available_publishers.push(publisher);
283    }
284
285    pub fn add_puller_server(&mut self, puller: Box<dyn FeagiServerPullerProperties>) {
286        // TODO check for collisions
287        self.available_pullers.push(puller);
288    }
289
290    // TODO talk about forcibly starting servers
291    /*
292    /// Add and start a broadcast publisher server (e.g., visualization on port 9050)
293    /// This creates a running server instance that can be polled and broadcast to
294    /// NOTE: This does NOT add to available_publishers - broadcast publishers are shared
295    pub fn add_and_start_broadcast_publisher(&mut self, publisher_props: Box<dyn FeagiServerPublisherProperties>) -> Result<(), FeagiAgentError> {
296        let mut publisher = publisher_props.as_boxed_server_publisher();
297        publisher.request_start()?;
298        self.broadcast_publishers.push(publisher);
299        Ok(())
300    }
301
302     */
303
304    //endregion
305
306    //region Command and Control
307
308    /// Poll all command and control servers. Messages for registration request and heartbeat are
309    /// handled internally here. Others are raised for FEAGI to act upon
310    pub fn poll_command_and_control(
311        &mut self,
312    ) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
313        self.try_prune_stale_agents();
314        for (command_index, translator) in self.command_control_servers.iter_mut().enumerate() {
315            // TODO smarter error handling. Many things don't deserve a panic
316            let possible_message =
317                translator.poll_for_incoming_messages(&self.all_registered_agents)?;
318
319            match possible_message {
320                None => {
321                    continue;
322                }
323                Some((agent_id, message, is_new_agent)) => {
324                    if is_new_agent {
325                        return self.handle_messages_from_unknown_agent_ids(
326                            agent_id,
327                            &message,
328                            command_index,
329                        );
330                    } else {
331                        return self.handle_messages_from_known_agent_ids(agent_id, message);
332                    }
333                }
334            }
335        }
336        // Nothing to report from anyone!
337        Ok(None)
338    }
339
340    /// Send a command and control message to a specific agent
341    pub fn send_message_to_agent(
342        &mut self,
343        agent_id: AgentID,
344        message: FeagiMessage,
345        increment_counter: u16,
346    ) -> Result<(), FeagiAgentError> {
347        let translator_index = match self
348            .agent_mapping_to_command_control_server_index
349            .get(&agent_id)
350        {
351            None => {
352                return Err(FeagiAgentError::Other(
353                    "No such Agent ID exists!".to_string(),
354                ))
355            }
356            Some(index) => index,
357        };
358
359        let command_translator = match self.command_control_servers.get_mut(*translator_index) {
360            None => {
361                panic!("Missing Index for command control server!") // something went wrong
362            }
363            Some(translator) => translator,
364        };
365        command_translator.send_message(agent_id, message, increment_counter)
366    }
367
368    /// Send a command/control response via the router that received the request.
369    ///
370    /// This is used for unknown sessions (pre-registration), where agent-to-router
371    /// mapping does not exist yet.
372    fn send_message_via_command_server(
373        &mut self,
374        command_server_index: CommandServerIndex,
375        session_id: AgentID,
376        message: FeagiMessage,
377        increment_counter: u16,
378    ) -> Result<(), FeagiAgentError> {
379        let command_translator = self
380            .command_control_servers
381            .get_mut(command_server_index)
382            .ok_or_else(|| {
383                FeagiAgentError::Other("Missing command control server index".to_string())
384            })?;
385        command_translator.send_message(session_id, message, increment_counter)
386    }
387
388    pub fn send_motor_data_to_agent(
389        &mut self,
390        agent_id: AgentID,
391        data: &FeagiByteContainer,
392    ) -> Result<(), FeagiAgentError> {
393        let motor_translator = self
394            .motors
395            .get_mut(&agent_id)
396            .ok_or_else(|| FeagiAgentError::Other("No Agent ID exists!".to_string()))?;
397        motor_translator.poll_and_send_buffered_motor_data(data)?;
398        self.refresh_agent_activity(agent_id);
399        Ok(())
400    }
401
402    pub fn send_visualization_data_to_agent(
403        &mut self,
404        agent_id: AgentID,
405        data: &FeagiByteContainer,
406    ) -> Result<(), FeagiAgentError> {
407        let visualization_translator = self
408            .visualizations
409            .get_mut(&agent_id)
410            .ok_or_else(|| FeagiAgentError::Other("No Agent ID exists!".to_string()))?;
411        visualization_translator.poll_and_send_visualization_data(data)?;
412        self.refresh_agent_activity(agent_id);
413        Ok(())
414    }
415
416    //endregion
417
418    //region Agents
419
420    pub fn poll_agent_sensors(&mut self) -> Result<Option<FeagiByteContainer>, FeagiAgentError> {
421        let mut sensor_ids: Vec<AgentID> = self.sensors.keys().copied().collect();
422        if sensor_ids.is_empty() {
423            return Ok(None);
424        }
425
426        sensor_ids.sort_by_key(|agent_id| agent_id.to_base64());
427        let count = sensor_ids.len();
428        let start = self.sensor_poll_cursor % count;
429
430        for offset in 0..count {
431            let idx = (start + offset) % count;
432            let agent_id = sensor_ids[idx];
433            let polled_data = if let Some(translator) = self.sensors.get_mut(&agent_id) {
434                translator.poll_sensor_server()?.cloned()
435            } else {
436                None
437            };
438
439            if let Some(data) = polled_data {
440                self.sensor_poll_cursor = (idx + 1) % count;
441                self.refresh_agent_activity(agent_id);
442                return Ok(Some(data));
443            }
444        }
445
446        self.sensor_poll_cursor = (start + 1) % count;
447        Ok(None)
448    }
449
450    pub fn poll_agent_motors(&mut self) -> Result<(), FeagiAgentError> {
451        for (_id, translator) in self.motors.iter_mut() {
452            translator.poll_motor_server()?;
453        }
454        Ok(())
455    }
456
457    pub fn poll_agent_visualizers(&mut self) -> Result<(), FeagiAgentError> {
458        for (_id, translator) in self.visualizations.iter_mut() {
459            translator.poll_visualization_server()?;
460        }
461        Ok(())
462    }
463
464    pub fn send_motor_data(
465        &mut self,
466        agent_id: AgentID,
467        motor_data: &FeagiByteContainer,
468    ) -> Result<(), FeagiAgentError> {
469        let embodiment_option = self.motors.get_mut(&agent_id);
470        match embodiment_option {
471            Some(embodiment) => {
472                embodiment.poll_and_send_buffered_motor_data(motor_data)?;
473                self.refresh_agent_activity(agent_id);
474                Ok(())
475            }
476            None => Err(FeagiAgentError::UnableToSendData(
477                "Nonexistant Agent ID!".to_string(),
478            )),
479        }
480    }
481
482    /// Send visualization data to a specific agent via dedicated visualization channel
483    pub fn send_visualization_data(
484        &mut self,
485        agent_id: AgentID,
486        viz_data: &FeagiByteContainer,
487    ) -> Result<(), FeagiAgentError> {
488        let embodiment_option = self.visualizations.get_mut(&agent_id);
489        match embodiment_option {
490            Some(embodiment) => {
491                embodiment.poll_and_send_visualization_data(viz_data)?;
492                self.refresh_agent_activity(agent_id);
493                Ok(())
494            }
495            None => Err(FeagiAgentError::UnableToSendData(
496                "Nonexistant Agent ID!".to_string(),
497            )),
498        }
499    }
500
501    //endregion
502
503    //region Internal
504
505    //region Get property
506
507    fn try_get_puller_property_index(
508        &mut self,
509        wanted_protocol: &TransportProtocolImplementation,
510    ) -> Result<usize, FeagiAgentError> {
511        for i in 0..self.available_pullers.len() {
512            let available_puller = &self.available_pullers[i];
513            if &available_puller
514                .get_bind_point()
515                .as_transport_protocol_implementation()
516                != wanted_protocol
517            {
518                // not the protocol we are looking for
519                continue;
520            } else {
521                // found the protocol we want
522                return Ok(i);
523            }
524        }
525        Err(FeagiAgentError::InitFail(
526            "Missing required protocol puller".to_string(),
527        ))
528    }
529
530    fn try_get_publisher_property_index(
531        &mut self,
532        wanted_protocol: &TransportProtocolImplementation,
533    ) -> Result<usize, FeagiAgentError> {
534        for i in 0..self.available_publishers.len() {
535            let available_publisher = &self.available_publishers[i];
536            if &available_publisher.get_protocol() != wanted_protocol {
537                // not the protocol we are looking for
538                continue;
539            } else {
540                // found the protocol we want
541                return Ok(i);
542            }
543        }
544        Err(FeagiAgentError::InitFail(
545            "Missing required protocol publisher".to_string(),
546        ))
547    }
548
549    fn try_get_last_publisher_property_index(
550        &mut self,
551        wanted_protocol: &TransportProtocolImplementation,
552    ) -> Result<usize, FeagiAgentError> {
553        for i in (0..self.available_publishers.len()).rev() {
554            let available_publisher = &self.available_publishers[i];
555            if &available_publisher.get_protocol() != wanted_protocol {
556                continue;
557            } else {
558                return Ok(i);
559            }
560        }
561        Err(FeagiAgentError::InitFail(
562            "Missing required protocol publisher".to_string(),
563        ))
564    }
565
566    //endregion
567
568    //region Message Handling
569
570    fn handle_messages_from_unknown_agent_ids(
571        &mut self,
572        agent_id: AgentID,
573        message: &FeagiMessage,
574        command_control_index: CommandServerIndex,
575    ) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
576        match &message {
577            FeagiMessage::AgentRegistration(register_message) => {
578                match &register_message {
579                    AgentRegistrationMessage::ClientRequestRegistration(registration_request) => {
580                        debug!(
581                            target: "feagi-agent",
582                            "WS registration request received: session={} descriptor={:?} caps={:?} protocol={:?}",
583                            agent_id.to_base64(),
584                            registration_request.agent_descriptor(),
585                            registration_request.requested_capabilities(),
586                            registration_request.connection_protocol()
587                        );
588                        let auth_result = self
589                            .agent_auth_backend
590                            .verify_agent_allowed_to_connect(registration_request);
591                        if auth_result.is_err() {
592                            warn!(
593                                target: "feagi-agent",
594                                "WS registration rejected by auth backend: session={} descriptor={:?}",
595                                agent_id.to_base64(),
596                                registration_request.agent_descriptor()
597                            );
598                            self.send_message_via_command_server(
599                                command_control_index,
600                                agent_id,
601                                FeagiMessage::AgentRegistration(
602                                    AgentRegistrationMessage::ServerRespondsRegistration(
603                                        RegistrationResponse::FailedInvalidAuth,
604                                    ),
605                                ),
606                                0,
607                            )?;
608                            return Ok(None);
609                        }
610                        // auth passed; if the same descriptor is already connected, replace it
611                        // first so reconnects can reclaim resources immediately.
612                        //
613                        // Important: only replace when capability shape is equivalent. This
614                        // prevents unrelated clients that share a descriptor string from
615                        // evicting each other (for example, a motor/sensor client removing
616                        // a live visualization client).
617                        if let Some(existing_agent_id) = self
618                            .find_agent_id_by_descriptor(registration_request.agent_descriptor())
619                        {
620                            if let Some((_, existing_capabilities)) =
621                                self.all_registered_agents.get(&existing_agent_id)
622                            {
623                                if !Self::capabilities_equivalent(
624                                    existing_capabilities,
625                                    registration_request.requested_capabilities(),
626                                ) {
627                                    info!(
628                                        target: "feagi-agent",
629                                        "Rejecting descriptor-collision registration for {:?}: existing session {} has different capabilities",
630                                        registration_request.agent_descriptor(),
631                                        existing_agent_id.to_base64()
632                                    );
633                                    self.send_message_via_command_server(
634                                        command_control_index,
635                                        agent_id,
636                                        FeagiMessage::AgentRegistration(
637                                            AgentRegistrationMessage::ServerRespondsRegistration(
638                                                RegistrationResponse::AlreadyRegistered,
639                                            ),
640                                        ),
641                                        0,
642                                    )?;
643                                    return Ok(None);
644                                }
645                            }
646                            if !self.should_replace_existing_descriptor_session(existing_agent_id) {
647                                debug!(
648                                    target: "feagi-agent",
649                                    "Ignoring duplicate registration for descriptor {:?}: existing session {} remains active",
650                                    registration_request.agent_descriptor(),
651                                    existing_agent_id.to_base64()
652                                );
653                                self.send_message_via_command_server(
654                                    command_control_index,
655                                    agent_id,
656                                    FeagiMessage::AgentRegistration(
657                                        AgentRegistrationMessage::ServerRespondsRegistration(
658                                            RegistrationResponse::AlreadyRegistered,
659                                        ),
660                                    ),
661                                    0,
662                                )?;
663                                return Ok(None);
664                            }
665                            let replacement_reason = format!(
666                                "descriptor replacement by new registration session={}",
667                                agent_id.to_base64()
668                            );
669                            self.deregister_agent_internal(existing_agent_id, &replacement_reason);
670                        }
671
672                        // register and always respond deterministically (avoid client timeouts).
673                        let mappings = match self.register_agent(
674                            agent_id,
675                            *registration_request.connection_protocol(),
676                            registration_request.requested_capabilities().to_vec(),
677                            registration_request.agent_descriptor().clone(),
678                            command_control_index,
679                        ) {
680                            Ok(mappings) => mappings,
681                            Err(_) => {
682                                error!(
683                                    target: "feagi-agent",
684                                    "WS registration failed while creating transport mappings: session={} descriptor={:?}",
685                                    agent_id.to_base64(),
686                                    registration_request.agent_descriptor()
687                                );
688                                self.send_message_via_command_server(
689                                    command_control_index,
690                                    agent_id,
691                                    FeagiMessage::AgentRegistration(
692                                        AgentRegistrationMessage::ServerRespondsRegistration(
693                                            RegistrationResponse::FailedInvalidRequest,
694                                        ),
695                                    ),
696                                    0,
697                                )?;
698                                return Ok(None);
699                            }
700                        };
701
702                        let mapped_caps: Vec<_> = mappings.keys().cloned().collect();
703                        let response = RegistrationResponse::Success(agent_id, mappings);
704                        let response_message = FeagiMessage::AgentRegistration(
705                            AgentRegistrationMessage::ServerRespondsRegistration(response),
706                        );
707                        self.send_message_via_command_server(
708                            command_control_index,
709                            agent_id,
710                            response_message,
711                            0,
712                        )?;
713                        debug!(
714                            target: "feagi-agent",
715                            "WS registration success response sent: session={} descriptor={:?} mapped_caps={:?}",
716                            agent_id.to_base64(),
717                            registration_request.agent_descriptor(),
718                            mapped_caps
719                        );
720                        Ok(None)
721                    }
722                    AgentRegistrationMessage::ClientRequestDeregistration(_) => {
723                        let response = FeagiMessage::AgentRegistration(
724                            AgentRegistrationMessage::ServerRespondsDeregistration(
725                                DeregistrationResponse::NotRegistered,
726                            ),
727                        );
728                        self.send_message_via_command_server(
729                            command_control_index,
730                            agent_id,
731                            response,
732                            0,
733                        )?;
734                        Ok(None)
735                    }
736                    _ => {
737                        // If not requesting registration, we dont want to hear it
738                        Ok(None)
739                    }
740                }
741            }
742            _ => {
743                // If the new agent is not registering, we don't want to hear it
744                Ok(None)
745            }
746        }
747    }
748
749    fn handle_messages_from_known_agent_ids(
750        &mut self,
751        agent_id: AgentID,
752        message: FeagiMessage,
753    ) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
754        self.refresh_agent_activity(agent_id);
755        match &message {
756            FeagiMessage::AgentRegistration(register_message) => {
757                match register_message {
758                    AgentRegistrationMessage::ClientRequestDeregistration(request) => {
759                        // Respond first so REQ/REP clients can complete the in-flight request.
760                        self.send_message_to_agent(
761                            agent_id,
762                            FeagiMessage::AgentRegistration(
763                                AgentRegistrationMessage::ServerRespondsDeregistration(
764                                    DeregistrationResponse::Success,
765                                ),
766                            ),
767                            0,
768                        )?;
769                        let dereg_reason = request
770                            .reason()
771                            .map(|text| format!("client request: {}", text))
772                            .unwrap_or_else(|| "client request".to_string());
773                        self.deregister_agent_internal(agent_id, &dereg_reason);
774                        Ok(None)
775                    }
776                    _ => {
777                        // Already registered? dont dont register again
778                        // TODO any special exceptions?
779                        Ok(None)
780                    }
781                }
782            }
783            FeagiMessage::HeartBeat => {
784                // We can handle heartbeat here
785                // TODO or maybe we should let the higher levels handle it?
786                self.send_message_to_agent(agent_id, FeagiMessage::HeartBeat, 0)?;
787                Ok(None)
788            }
789            FeagiMessage::AgentConfiguration(
790                AgentEmbodimentConfigurationMessage::AgentConfigurationDetails(device_def),
791            ) => {
792                let device_regs = serde_json::to_value(device_def).unwrap_or_else(|_| {
793                    tracing::warn!(
794                        target: "feagi-agent",
795                        "Failed to serialize AgentConfigurationDetails to JSON"
796                    );
797                    serde_json::Value::Object(serde_json::Map::new())
798                });
799                self.set_device_registrations_by_agent(agent_id, device_regs.clone());
800                if let Some((descriptor, _)) = self.all_registered_agents.get(&agent_id) {
801                    self.set_device_registrations_by_descriptor(
802                        agent_id.to_base64(),
803                        descriptor.clone(),
804                        device_regs,
805                    );
806                }
807                debug!(
808                    target: "feagi-agent",
809                    "Stored device registrations for agent {}",
810                    agent_id.to_base64()
811                );
812                // Send acknowledgment so REQ/REP clients can complete the request
813                self.send_message_to_agent(agent_id, FeagiMessage::HeartBeat, 0)?;
814                Ok(None)
815            }
816            _ => {
817                // Throw up anything else
818                Ok(Some((agent_id, message)))
819            }
820        }
821    }
822
823    //endregion
824
825    //region Registration
826
827    fn register_agent(
828        &mut self,
829        agent_id: AgentID,
830        wanted_protocol: TransportProtocolImplementation,
831        agent_capabilities: Vec<AgentCapabilities>,
832        descriptor: AgentDescriptor,
833        command_server_index: CommandServerIndex,
834    ) -> Result<HashMap<AgentCapabilities, TransportProtocolEndpoint>, FeagiAgentError> {
835        // TODO prevent duplicate registration
836        /*
837        if self.all_registered_agents.contains_key(&agent_id) {
838            return Err(FeagiAgentError::ConnectionFailed(
839                "Agent Already registered".to_string(),
840            ));
841        }
842
843         */
844
845        let mut used_puller_indices: Vec<usize> = Vec::new();
846        let mut used_publisher_indices: Vec<usize> = Vec::new();
847        let mut sensor_servers: Vec<Box<dyn FeagiServerPuller>> = Vec::new();
848        let mut motor_servers: Vec<Box<dyn FeagiServerPublisher>> = Vec::new();
849        let mut visualizer_servers: Vec<Box<dyn FeagiServerPublisher>> = Vec::new();
850        let mut endpoint_mappings: HashMap<AgentCapabilities, TransportProtocolEndpoint> =
851            HashMap::new();
852
853        // We try spawning all the servers first without taking any properties out mof circulation
854        for agent_capability in &agent_capabilities {
855            match agent_capability {
856                AgentCapabilities::SendSensorData => {
857                    let puller_property_index =
858                        self.try_get_puller_property_index(&wanted_protocol)?;
859                    let puller_property = &self.available_pullers[puller_property_index];
860                    let mut sensor_server = puller_property.as_boxed_server_puller();
861                    sensor_server.request_start()?;
862                    sensor_servers.push(sensor_server);
863                    endpoint_mappings.insert(
864                        AgentCapabilities::SendSensorData,
865                        puller_property.get_agent_endpoint(),
866                    );
867                    used_puller_indices.push(puller_property_index);
868                }
869                AgentCapabilities::ReceiveMotorData => {
870                    let publisher_index =
871                        self.try_get_publisher_property_index(&wanted_protocol)?;
872                    let publisher_property = &self.available_publishers[publisher_index];
873                    let mut publisher_server = publisher_property.as_boxed_server_publisher();
874                    publisher_server.request_start()?;
875                    motor_servers.push(publisher_server);
876                    endpoint_mappings.insert(
877                        AgentCapabilities::ReceiveMotorData,
878                        publisher_property.get_agent_endpoint(),
879                    );
880                    used_publisher_indices.push(publisher_index);
881                }
882                AgentCapabilities::ReceiveNeuronVisualizations => {
883                    // Prefer the last matching publisher for visualization so motor/viz publishers
884                    // configured in order [motor, visualization] map correctly.
885                    let publisher_index =
886                        self.try_get_last_publisher_property_index(&wanted_protocol)?;
887                    let publisher_property = &self.available_publishers[publisher_index];
888                    let mut publisher_server = publisher_property.as_boxed_server_publisher();
889                    publisher_server.request_start()?;
890                    visualizer_servers.push(publisher_server);
891                    endpoint_mappings.insert(
892                        AgentCapabilities::ReceiveNeuronVisualizations,
893                        publisher_property.get_agent_endpoint(),
894                    );
895                    used_publisher_indices.push(publisher_index);
896                }
897                AgentCapabilities::ReceiveSystemMessages => {
898                    todo!()
899                }
900            }
901        }
902
903        // everything is good, take used properties out of circulation by exact index
904        used_puller_indices.sort_unstable();
905        used_puller_indices.dedup();
906        for idx in used_puller_indices.into_iter().rev() {
907            self.available_pullers.remove(idx);
908        }
909
910        used_publisher_indices.sort_unstable();
911        used_publisher_indices.dedup();
912        for idx in used_publisher_indices.into_iter().rev() {
913            self.available_publishers.remove(idx);
914        }
915
916        // insert the servers into the cache
917        for sensor_server in sensor_servers {
918            let sensor_translator: SensorTranslator =
919                SensorTranslator::new(agent_id, sensor_server);
920            self.sensors.insert(agent_id, sensor_translator);
921        }
922
923        for motor_server in motor_servers {
924            let motor_translator: MotorTranslator = MotorTranslator::new(agent_id, motor_server);
925            self.motors.insert(agent_id, motor_translator);
926        }
927
928        for visualizer_server in visualizer_servers {
929            let visualizer_translator: VisualizationTranslator =
930                VisualizationTranslator::new(agent_id, visualizer_server);
931            self.visualizations.insert(agent_id, visualizer_translator);
932        }
933
934        self.all_registered_agents
935            .insert(agent_id, (descriptor, agent_capabilities));
936        self.agent_mapping_to_command_control_server_index
937            .insert(agent_id, command_server_index);
938        self.last_activity_by_agent.insert(agent_id, Instant::now());
939
940        Ok(endpoint_mappings)
941    }
942
943    /// Refresh liveness for a known agent based on command/control activity.
944    ///
945    /// FEAGI treats any valid command/control message as proof of liveness
946    /// (not just explicit heartbeat packets).
947    fn refresh_agent_activity(&mut self, agent_id: AgentID) {
948        self.last_activity_by_agent.insert(agent_id, Instant::now());
949    }
950
951    /// Find currently connected agent by descriptor value.
952    fn find_agent_id_by_descriptor(&self, descriptor: &AgentDescriptor) -> Option<AgentID> {
953        self.all_registered_agents
954            .iter()
955            .find_map(|(agent_id, (existing_descriptor, _))| {
956                if existing_descriptor == descriptor {
957                    Some(*agent_id)
958                } else {
959                    None
960                }
961            })
962    }
963
964    /// Periodically scan and remove stale agents that have exceeded heartbeat timeout.
965    fn try_prune_stale_agents(&mut self) {
966        if self.last_stale_check_at.elapsed() < self.liveness_config.stale_check_interval {
967            return;
968        }
969        self.last_stale_check_at = Instant::now();
970
971        let stale_ids: Vec<AgentID> = self
972            .last_activity_by_agent
973            .iter()
974            .filter_map(|(agent_id, last_seen)| {
975                if last_seen.elapsed() > self.liveness_config.heartbeat_timeout {
976                    Some(*agent_id)
977                } else {
978                    None
979                }
980            })
981            .collect();
982
983        for stale_id in stale_ids {
984            let stale_reason = format!(
985                "stale heartbeat timeout exceeded ({:.3}s)",
986                self.liveness_config.heartbeat_timeout.as_secs_f64()
987            );
988            self.deregister_agent_internal(stale_id, &stale_reason);
989        }
990    }
991
992    /// Fully remove an agent and recycle all transport resources.
993    ///
994    /// This is the single teardown path used by both voluntary and forced
995    /// deregistration.
996    fn deregister_agent_internal(&mut self, agent_id: AgentID, reason: &str) {
997        self.last_activity_by_agent.remove(&agent_id);
998        self.agent_mapping_to_command_control_server_index
999            .remove(&agent_id);
1000        let descriptor = self
1001            .all_registered_agents
1002            .remove(&agent_id)
1003            .map(|(descriptor, _)| descriptor);
1004        let descriptor_text = descriptor
1005            .as_ref()
1006            .map(|item| format!("{:?}", item))
1007            .unwrap_or_else(|| "<unknown-descriptor>".to_string());
1008        info!(
1009            target: "feagi-agent",
1010            "Agent deregistered: agent_id={} descriptor={} reason={}",
1011            agent_id.to_base64(),
1012            descriptor_text,
1013            reason
1014        );
1015        self.device_registrations_by_agent.remove(&agent_id);
1016
1017        if let Some(sensor) = self.sensors.remove(&agent_id) {
1018            self.available_pullers.push(sensor.into_puller_properties());
1019        }
1020        if let Some(motor) = self.motors.remove(&agent_id) {
1021            self.available_publishers
1022                .push(motor.into_publisher_properties());
1023        }
1024        if let Some(viz) = self.visualizations.remove(&agent_id) {
1025            self.available_publishers
1026                .push(viz.into_publisher_properties());
1027        }
1028
1029        if let Some(descriptor) = descriptor {
1030            self.agent_id_by_descriptor.remove(&descriptor);
1031            // Preserve descriptor-scoped device registrations across session teardown so
1032            // reconnecting agents can recover motor/sensory mapping state before they
1033            // resend AgentConfiguration. Session-scoped registrations are still removed.
1034        }
1035    }
1036
1037    //endregion
1038
1039    //endregion
1040}