Skip to main content

feagi_agent/server/
feagi_agent_handler.rs

1use crate::command_and_control::agent_embodiment_configuration_message::AgentEmbodimentConfigurationMessage;
2use crate::command_and_control::agent_registration_message::{
3    AgentRegistrationMessage, DeregistrationResponse, RegistrationResponse,
4};
5use crate::command_and_control::FeagiMessage;
6use crate::server::auth::AgentAuth;
7use crate::server::wrappers::{
8    CommandControlWrapper, MotorTranslator, SensorTranslator, VisualizationTranslator,
9};
10use crate::{AgentCapabilities, AgentDescriptor, FeagiAgentError};
11use feagi_io::traits_and_enums::server::{
12    FeagiServerPublisher, FeagiServerPublisherProperties, FeagiServerPuller,
13    FeagiServerPullerProperties, FeagiServerRouterProperties,
14};
15use feagi_io::traits_and_enums::shared::{
16    TransportProtocolEndpoint, TransportProtocolImplementation,
17};
18use feagi_io::AgentID;
19use feagi_serialization::FeagiByteContainer;
20use std::collections::{HashMap, HashSet};
21use std::time::{Duration, Instant};
22use tracing::{error, info, warn};
23
24type CommandServerIndex = usize;
25
26/// Server-side liveness configuration for command/control sessions.
27///
28/// `heartbeat_timeout` defines when a client is considered stale if no
29/// command/control messages are received.
30/// `stale_check_interval` controls how often stale scans run during polling.
31#[derive(Debug, Clone)]
32pub struct AgentLivenessConfig {
33    pub heartbeat_timeout: Duration,
34    pub stale_check_interval: Duration,
35}
36
37impl Default for AgentLivenessConfig {
38    fn default() -> Self {
39        Self {
40            heartbeat_timeout: Duration::from_secs(30),
41            stale_check_interval: Duration::from_secs(1),
42        }
43    }
44}
45
46pub struct FeagiAgentHandler {
47    agent_auth_backend: Box<dyn AgentAuth>,
48    available_publishers: Vec<Box<dyn FeagiServerPublisherProperties>>,
49    available_pullers: Vec<Box<dyn FeagiServerPullerProperties>>,
50    command_control_servers: Vec<CommandControlWrapper>,
51
52    all_registered_agents: HashMap<AgentID, (AgentDescriptor, Vec<AgentCapabilities>)>,
53    agent_mapping_to_command_control_server_index: HashMap<AgentID, CommandServerIndex>,
54    last_activity_by_agent: HashMap<AgentID, Instant>,
55    sensors: HashMap<AgentID, SensorTranslator>,
56    motors: HashMap<AgentID, MotorTranslator>,
57    visualizations: HashMap<AgentID, VisualizationTranslator>,
58    liveness_config: AgentLivenessConfig,
59    last_stale_check_at: Instant,
60
61    // this stuff is likely redundant
62    // REST STUFF
63    /// Device registrations by AgentDescriptor (REST API configuration storage)
64    device_registrations_by_descriptor: HashMap<AgentDescriptor, serde_json::Value>,
65    /// Agent ID (base64) by AgentDescriptor (for REST→WebSocket bridging)
66    agent_id_by_descriptor: HashMap<AgentDescriptor, String>,
67    /// Device registrations by AgentID (active connections)
68    device_registrations_by_agent: HashMap<AgentID, serde_json::Value>,
69}
70
71impl FeagiAgentHandler {
72    #[allow(dead_code)]
73    fn capabilities_equivalent(
74        existing: &[AgentCapabilities],
75        requested: &[AgentCapabilities],
76    ) -> bool {
77        existing.len() == requested.len()
78            && existing
79                .iter()
80                .all(|capability| requested.contains(capability))
81    }
82
83    /// Returns true when an existing descriptor session should be replaced by a new registration.
84    ///
85    /// This suppresses immediate descriptor-replacement churn caused by duplicate
86    /// registration packets that arrive within a very short window for the same
87    /// live agent session.
88    #[allow(dead_code)]
89    fn should_replace_existing_descriptor_session(&self, existing_agent_id: AgentID) -> bool {
90        let Some(last_seen) = self.last_activity_by_agent.get(&existing_agent_id) else {
91            // Missing liveness state is treated as stale and replaceable.
92            return true;
93        };
94
95        let duplicate_guard_window = self
96            .liveness_config
97            .stale_check_interval
98            .checked_mul(2)
99            .unwrap_or(self.liveness_config.stale_check_interval);
100
101        // If the existing session is still very fresh, treat incoming registration
102        // as duplicate/in-flight reconnect noise and keep the existing mapping.
103        last_seen.elapsed() > duplicate_guard_window
104    }
105
106    pub fn new(agent_auth_backend: Box<dyn AgentAuth>) -> FeagiAgentHandler {
107        Self::new_with_liveness_config(agent_auth_backend, AgentLivenessConfig::default())
108    }
109
110    /// Create a handler with explicit liveness configuration.
111    ///
112    /// This constructor is preferred in FEAGI runtime code paths where values
113    /// come from centralized configuration (`feagi_configuration.toml`).
114    pub fn new_with_liveness_config(
115        agent_auth_backend: Box<dyn AgentAuth>,
116        liveness_config: AgentLivenessConfig,
117    ) -> FeagiAgentHandler {
118        FeagiAgentHandler {
119            agent_auth_backend,
120            available_publishers: Vec::new(),
121            available_pullers: Vec::new(),
122
123            command_control_servers: Vec::new(),
124            all_registered_agents: HashMap::new(),
125            agent_mapping_to_command_control_server_index: HashMap::new(),
126            last_activity_by_agent: HashMap::new(),
127            sensors: Default::default(),
128            motors: Default::default(),
129            visualizations: Default::default(),
130            liveness_config,
131            last_stale_check_at: Instant::now(),
132
133            device_registrations_by_descriptor: HashMap::new(),
134            agent_id_by_descriptor: HashMap::new(),
135            device_registrations_by_agent: HashMap::new(),
136        }
137    }
138
139    //region Get Properties
140
141    pub fn get_all_registered_agents(
142        &self,
143    ) -> &HashMap<AgentID, (AgentDescriptor, Vec<AgentCapabilities>)> {
144        &self.all_registered_agents
145    }
146
147    pub fn get_all_registered_sensors(&self) -> HashSet<AgentID> {
148        self.sensors.keys().cloned().collect()
149    }
150
151    pub fn get_all_registered_motors(&self) -> HashSet<AgentID> {
152        self.motors.keys().cloned().collect()
153    }
154
155    pub fn get_all_registered_visualizations(&self) -> HashSet<AgentID> {
156        self.visualizations.keys().cloned().collect()
157    }
158
159    pub fn get_command_control_server_info(&self) -> Vec<Box<dyn FeagiServerRouterProperties>> {
160        let mut output: Vec<Box<dyn FeagiServerRouterProperties>> = Vec::new();
161        for command_control_server in &self.command_control_servers {
162            output.push(command_control_server.get_running_server_properties())
163        }
164        output
165    }
166
167    //region  REST
168
169    /// Get device registrations by AgentID
170    pub fn get_device_registrations_by_agent(
171        &self,
172        agent_id: AgentID,
173    ) -> Option<&serde_json::Value> {
174        self.device_registrations_by_agent.get(&agent_id)
175    }
176
177    /// Store device registrations by AgentDescriptor (REST API - before connection)
178    /// Also stores the original agent_id for later WebSocket→REST bridging
179    pub fn set_device_registrations_by_descriptor(
180        &mut self,
181        agent_id_base64: String,
182        agent_descriptor: AgentDescriptor,
183        device_registrations: serde_json::Value,
184    ) {
185        self.device_registrations_by_descriptor
186            .insert(agent_descriptor.clone(), device_registrations);
187        self.agent_id_by_descriptor
188            .insert(agent_descriptor, agent_id_base64);
189    }
190
191    /// Get device registrations by AgentDescriptor (REST API queries)
192    pub fn get_device_registrations_by_descriptor(
193        &self,
194        agent_descriptor: &AgentDescriptor,
195    ) -> Option<&serde_json::Value> {
196        self.device_registrations_by_descriptor
197            .get(agent_descriptor)
198    }
199
200    /// Store device registrations by AgentID (active connection)
201    pub fn set_device_registrations_by_agent(
202        &mut self,
203        agent_id: AgentID,
204        device_registrations: serde_json::Value,
205    ) {
206        self.device_registrations_by_agent
207            .insert(agent_id, device_registrations);
208    }
209
210    // TODO redudant, you can simply check if a AgentID has the capability hash?
211    /// Check if a agent has visualization capability configured
212    /// Returns (agent_id_base64, rate_hz) for registration with RuntimeService
213    pub fn get_visualization_info_for_agent(&self, agent_id: AgentID) -> Option<(String, f64)> {
214        let device_regs = self.device_registrations_by_agent.get(&agent_id)?;
215        let viz = device_regs.get("visualization")?;
216        let rate_hz = viz.get("rate_hz").and_then(|v| v.as_f64())?;
217
218        if rate_hz > 0.0 {
219            let agent_descriptor = self.all_registered_agents.get(&agent_id)?;
220            let agent_id = self
221                .agent_id_by_descriptor
222                .get(&agent_descriptor.0)?
223                .clone();
224            Some((agent_id, rate_hz))
225        } else {
226            None
227        }
228    }
229
230    //endregion
231
232    //endregion
233
234    //region Add Servers
235
236    /// Add a poll-based command/control server (ZMQ/WS). The router is wrapped in a
237    /// [`CommandControlWrapper`] that only exposes messages.
238    pub fn add_and_start_command_control_server(
239        &mut self,
240        router_property: Box<dyn FeagiServerRouterProperties>,
241    ) -> Result<(), FeagiAgentError> {
242        let mut router = router_property.as_boxed_server_router();
243        router.request_start()?;
244        let translator = CommandControlWrapper::new(router);
245        self.command_control_servers.push(translator);
246        Ok(())
247    }
248
249    pub fn add_publisher_server(&mut self, publisher: Box<dyn FeagiServerPublisherProperties>) {
250        // TODO check for collisions
251        self.available_publishers.push(publisher);
252    }
253
254    pub fn add_puller_server(&mut self, puller: Box<dyn FeagiServerPullerProperties>) {
255        // TODO check for collisions
256        self.available_pullers.push(puller);
257    }
258
259    // TODO talk about forcibly starting servers
260    /*
261    /// Add and start a broadcast publisher server (e.g., visualization on port 9050)
262    /// This creates a running server instance that can be polled and broadcast to
263    /// NOTE: This does NOT add to available_publishers - broadcast publishers are shared
264    pub fn add_and_start_broadcast_publisher(&mut self, publisher_props: Box<dyn FeagiServerPublisherProperties>) -> Result<(), FeagiAgentError> {
265        let mut publisher = publisher_props.as_boxed_server_publisher();
266        publisher.request_start()?;
267        self.broadcast_publishers.push(publisher);
268        Ok(())
269    }
270
271     */
272
273    //endregion
274
275    //region Command and Control
276
277    /// Poll all command and control servers. Messages for registration request and heartbeat are
278    /// handled internally here. Others are raised for FEAGI to act upon
279    pub fn poll_command_and_control(
280        &mut self,
281    ) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
282        self.try_prune_stale_agents();
283        for (command_index, translator) in self.command_control_servers.iter_mut().enumerate() {
284            // TODO smarter error handling. Many things don't deserve a panic
285            let possible_message =
286                translator.poll_for_incoming_messages(&self.all_registered_agents)?;
287
288            match possible_message {
289                None => {
290                    continue;
291                }
292                Some((agent_id, message, is_new_agent)) => {
293                    if is_new_agent {
294                        return self.handle_messages_from_unknown_agent_ids(
295                            agent_id,
296                            &message,
297                            command_index,
298                        );
299                    } else {
300                        return self.handle_messages_from_known_agent_ids(agent_id, message);
301                    }
302                }
303            }
304        }
305        // Nothing to report from anyone!
306        Ok(None)
307    }
308
309    /// Send a command and control message to a specific agent
310    pub fn send_message_to_agent(
311        &mut self,
312        agent_id: AgentID,
313        message: FeagiMessage,
314        increment_counter: u16,
315    ) -> Result<(), FeagiAgentError> {
316        let translator_index = match self
317            .agent_mapping_to_command_control_server_index
318            .get(&agent_id)
319        {
320            None => {
321                return Err(FeagiAgentError::Other(
322                    "No such Agent ID exists!".to_string(),
323                ))
324            }
325            Some(index) => index,
326        };
327
328        let command_translator = match self.command_control_servers.get_mut(*translator_index) {
329            None => {
330                panic!("Missing Index for command control server!") // something went wrong
331            }
332            Some(translator) => translator,
333        };
334        command_translator.send_message(agent_id, message, increment_counter)
335    }
336
337    /// Send a command/control response via the router that received the request.
338    ///
339    /// This is used for unknown sessions (pre-registration), where agent-to-router
340    /// mapping does not exist yet.
341    fn send_message_via_command_server(
342        &mut self,
343        command_server_index: CommandServerIndex,
344        session_id: AgentID,
345        message: FeagiMessage,
346        increment_counter: u16,
347    ) -> Result<(), FeagiAgentError> {
348        let command_translator = self
349            .command_control_servers
350            .get_mut(command_server_index)
351            .ok_or_else(|| {
352                FeagiAgentError::Other("Missing command control server index".to_string())
353            })?;
354        command_translator.send_message(session_id, message, increment_counter)
355    }
356
357    pub fn send_motor_data_to_agent(
358        &mut self,
359        agent_id: AgentID,
360        data: &FeagiByteContainer,
361    ) -> Result<(), FeagiAgentError> {
362        let motor_translator = self
363            .motors
364            .get_mut(&agent_id)
365            .ok_or_else(|| FeagiAgentError::Other("No Agent ID exists!".to_string()))?;
366        motor_translator.poll_and_send_buffered_motor_data(data)?;
367        self.refresh_agent_activity(agent_id);
368        Ok(())
369    }
370
371    pub fn send_visualization_data_to_agent(
372        &mut self,
373        agent_id: AgentID,
374        data: &FeagiByteContainer,
375    ) -> Result<(), FeagiAgentError> {
376        let visualization_translator = self
377            .visualizations
378            .get_mut(&agent_id)
379            .ok_or_else(|| FeagiAgentError::Other("No Agent ID exists!".to_string()))?;
380        visualization_translator.poll_and_send_visualization_data(data)?;
381        self.refresh_agent_activity(agent_id);
382        Ok(())
383    }
384
385    //endregion
386
387    //region Agents
388
389    pub fn poll_agent_sensors(&mut self) -> Result<Option<&FeagiByteContainer>, FeagiAgentError> {
390        for (_id, translator) in self.sensors.iter_mut() {
391            let possible_sensor_data = translator.poll_sensor_server()?;
392            if possible_sensor_data.is_some() {
393                return Ok(possible_sensor_data);
394            }
395        }
396        Ok(None)
397    }
398
399    pub fn poll_agent_motors(&mut self) -> Result<(), FeagiAgentError> {
400        for (_id, translator) in self.motors.iter_mut() {
401            translator.poll_motor_server()?;
402        }
403        Ok(())
404    }
405
406    pub fn poll_agent_visualizers(&mut self) -> Result<(), FeagiAgentError> {
407        for (_id, translator) in self.visualizations.iter_mut() {
408            translator.poll_visualization_server()?;
409        }
410        Ok(())
411    }
412
413    pub fn send_motor_data(
414        &mut self,
415        agent_id: AgentID,
416        motor_data: &FeagiByteContainer,
417    ) -> Result<(), FeagiAgentError> {
418        let embodiment_option = self.motors.get_mut(&agent_id);
419        match embodiment_option {
420            Some(embodiment) => {
421                embodiment.poll_and_send_buffered_motor_data(motor_data)?;
422                self.refresh_agent_activity(agent_id);
423                Ok(())
424            }
425            None => Err(FeagiAgentError::UnableToSendData(
426                "Nonexistant Agent ID!".to_string(),
427            )),
428        }
429    }
430
431    /// Send visualization data to a specific agent via dedicated visualization channel
432    pub fn send_visualization_data(
433        &mut self,
434        agent_id: AgentID,
435        viz_data: &FeagiByteContainer,
436    ) -> Result<(), FeagiAgentError> {
437        let embodiment_option = self.visualizations.get_mut(&agent_id);
438        match embodiment_option {
439            Some(embodiment) => {
440                embodiment.poll_and_send_visualization_data(viz_data)?;
441                self.refresh_agent_activity(agent_id);
442                Ok(())
443            }
444            None => Err(FeagiAgentError::UnableToSendData(
445                "Nonexistant Agent ID!".to_string(),
446            )),
447        }
448    }
449
450    //endregion
451
452    //region Internal
453
454    //region Get property
455
456    fn try_get_puller_property_index(
457        &mut self,
458        wanted_protocol: &TransportProtocolImplementation,
459    ) -> Result<usize, FeagiAgentError> {
460        for i in 0..self.available_pullers.len() {
461            let available_puller = &self.available_pullers[i];
462            if &available_puller
463                .get_bind_point()
464                .as_transport_protocol_implementation()
465                != wanted_protocol
466            {
467                // not the protocol we are looking for
468                continue;
469            } else {
470                // found the protocol we want
471                return Ok(i);
472            }
473        }
474        Err(FeagiAgentError::InitFail(
475            "Missing required protocol puller".to_string(),
476        ))
477    }
478
479    fn try_get_publisher_property_index(
480        &mut self,
481        wanted_protocol: &TransportProtocolImplementation,
482    ) -> Result<usize, FeagiAgentError> {
483        for i in 0..self.available_publishers.len() {
484            let available_publisher = &self.available_publishers[i];
485            if &available_publisher.get_protocol() != wanted_protocol {
486                // not the protocol we are looking for
487                continue;
488            } else {
489                // found the protocol we want
490                return Ok(i);
491            }
492        }
493        Err(FeagiAgentError::InitFail(
494            "Missing required protocol publisher".to_string(),
495        ))
496    }
497
498    fn try_get_last_publisher_property_index(
499        &mut self,
500        wanted_protocol: &TransportProtocolImplementation,
501    ) -> Result<usize, FeagiAgentError> {
502        for i in (0..self.available_publishers.len()).rev() {
503            let available_publisher = &self.available_publishers[i];
504            if &available_publisher.get_protocol() != wanted_protocol {
505                continue;
506            } else {
507                return Ok(i);
508            }
509        }
510        Err(FeagiAgentError::InitFail(
511            "Missing required protocol publisher".to_string(),
512        ))
513    }
514
515    //endregion
516
517    //region Message Handling
518
519    fn handle_messages_from_unknown_agent_ids(
520        &mut self,
521        agent_id: AgentID,
522        message: &FeagiMessage,
523        command_control_index: CommandServerIndex,
524    ) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
525        match &message {
526            FeagiMessage::AgentRegistration(register_message) => {
527                match &register_message {
528                    AgentRegistrationMessage::ClientRequestRegistration(registration_request) => {
529                        info!(
530                            target: "feagi-agent",
531                            "WS registration request received: session={} descriptor={:?} caps={:?} protocol={:?}",
532                            agent_id.to_base64(),
533                            registration_request.agent_descriptor(),
534                            registration_request.requested_capabilities(),
535                            registration_request.connection_protocol()
536                        );
537                        let auth_result = self
538                            .agent_auth_backend
539                            .verify_agent_allowed_to_connect(registration_request);
540                        if auth_result.is_err() {
541                            warn!(
542                                target: "feagi-agent",
543                                "WS registration rejected by auth backend: session={} descriptor={:?}",
544                                agent_id.to_base64(),
545                                registration_request.agent_descriptor()
546                            );
547                            self.send_message_via_command_server(
548                                command_control_index,
549                                agent_id,
550                                FeagiMessage::AgentRegistration(
551                                    AgentRegistrationMessage::ServerRespondsRegistration(
552                                        RegistrationResponse::FailedInvalidAuth,
553                                    ),
554                                ),
555                                0,
556                            )?;
557                            return Ok(None);
558                        }
559                        // auth passed; if the same descriptor is already connected, replace it
560                        // first so reconnects can reclaim resources immediately.
561                        //
562                        // Important: only replace when capability shape is equivalent. This
563                        // prevents unrelated clients that share a descriptor string from
564                        // evicting each other (for example, a motor/sensor client removing
565                        // a live visualization client).
566                        if let Some(existing_agent_id) = self
567                            .find_agent_id_by_descriptor(registration_request.agent_descriptor())
568                        {
569                            if let Some((_, _existing_capabilities)) =
570                                self.all_registered_agents.get(&existing_agent_id)
571                            {
572                                // TODO disable colliding agent check for now to temporarily bypass duplicate agent
573                                /*
574                                if !Self::capabilities_equivalent(
575                                    existing_capabilities,
576                                    registration_request.requested_capabilities(),
577                                ) {
578                                    self.send_message_via_command_server(
579                                        command_control_index,
580                                        agent_id,
581                                        FeagiMessage::AgentRegistration(
582                                            AgentRegistrationMessage::ServerRespondsRegistration(
583                                                RegistrationResponse::AlreadyRegistered,
584                                            ),
585                                        ),
586                                        0,
587                                    )?;
588                                    return Ok(None);
589                                }
590                                */
591                            }
592                            // TODO prevent duplicate agent
593                            /*
594
595                            if !self.should_replace_existing_descriptor_session(existing_agent_id) {
596                                info!(
597                                    target: "feagi-agent",
598                                    "Ignoring duplicate registration for descriptor {:?}: existing session {} remains active",
599                                    registration_request.agent_descriptor(),
600                                    existing_agent_id.to_base64()
601                                );
602                                self.send_message_via_command_server(
603                                    command_control_index,
604                                    agent_id,
605                                    FeagiMessage::AgentRegistration(
606                                        AgentRegistrationMessage::ServerRespondsRegistration(
607                                            RegistrationResponse::AlreadyRegistered,
608                                        ),
609                                    ),
610                                    0,
611                                )?;
612                                return Ok(None);
613                            }
614
615                             */
616                            let replacement_reason = format!(
617                                "descriptor replacement by new registration session={}",
618                                agent_id.to_base64()
619                            );
620                            self.deregister_agent_internal(existing_agent_id, &replacement_reason);
621                        }
622
623                        // register and always respond deterministically (avoid client timeouts).
624                        let mappings = match self.register_agent(
625                            agent_id,
626                            *registration_request.connection_protocol(),
627                            registration_request.requested_capabilities().to_vec(),
628                            registration_request.agent_descriptor().clone(),
629                            command_control_index,
630                        ) {
631                            Ok(mappings) => mappings,
632                            Err(_) => {
633                                error!(
634                                    target: "feagi-agent",
635                                    "WS registration failed while creating transport mappings: session={} descriptor={:?}",
636                                    agent_id.to_base64(),
637                                    registration_request.agent_descriptor()
638                                );
639                                self.send_message_via_command_server(
640                                    command_control_index,
641                                    agent_id,
642                                    FeagiMessage::AgentRegistration(
643                                        AgentRegistrationMessage::ServerRespondsRegistration(
644                                            RegistrationResponse::FailedInvalidRequest,
645                                        ),
646                                    ),
647                                    0,
648                                )?;
649                                return Ok(None);
650                            }
651                        };
652
653                        let mapped_caps: Vec<_> = mappings.keys().cloned().collect();
654                        let response = RegistrationResponse::Success(agent_id, mappings);
655                        let response_message = FeagiMessage::AgentRegistration(
656                            AgentRegistrationMessage::ServerRespondsRegistration(response),
657                        );
658                        self.send_message_via_command_server(
659                            command_control_index,
660                            agent_id,
661                            response_message,
662                            0,
663                        )?;
664                        info!(
665                            target: "feagi-agent",
666                            "WS registration success response sent: session={} descriptor={:?} mapped_caps={:?}",
667                            agent_id.to_base64(),
668                            registration_request.agent_descriptor(),
669                            mapped_caps
670                        );
671                        Ok(None)
672                    }
673                    AgentRegistrationMessage::ClientRequestDeregistration(_) => {
674                        let response = FeagiMessage::AgentRegistration(
675                            AgentRegistrationMessage::ServerRespondsDeregistration(
676                                DeregistrationResponse::NotRegistered,
677                            ),
678                        );
679                        self.send_message_via_command_server(
680                            command_control_index,
681                            agent_id,
682                            response,
683                            0,
684                        )?;
685                        Ok(None)
686                    }
687                    _ => {
688                        // If not requesting registration, we dont want to hear it
689                        Ok(None)
690                    }
691                }
692            }
693            _ => {
694                // If the new agent is not registering, we don't want to hear it
695                Ok(None)
696            }
697        }
698    }
699
700    fn handle_messages_from_known_agent_ids(
701        &mut self,
702        agent_id: AgentID,
703        message: FeagiMessage,
704    ) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
705        self.refresh_agent_activity(agent_id);
706        match &message {
707            FeagiMessage::AgentRegistration(register_message) => {
708                match register_message {
709                    AgentRegistrationMessage::ClientRequestDeregistration(request) => {
710                        // Respond first so REQ/REP clients can complete the in-flight request.
711                        self.send_message_to_agent(
712                            agent_id,
713                            FeagiMessage::AgentRegistration(
714                                AgentRegistrationMessage::ServerRespondsDeregistration(
715                                    DeregistrationResponse::Success,
716                                ),
717                            ),
718                            0,
719                        )?;
720                        let dereg_reason = request
721                            .reason()
722                            .map(|text| format!("client request: {}", text))
723                            .unwrap_or_else(|| "client request".to_string());
724                        self.deregister_agent_internal(agent_id, &dereg_reason);
725                        Ok(None)
726                    }
727                    _ => {
728                        // Already registered? dont dont register again
729                        // TODO any special exceptions?
730                        Ok(None)
731                    }
732                }
733            }
734            FeagiMessage::HeartBeat => {
735                // We can handle heartbeat here
736                // TODO or maybe we should let the higher levels handle it?
737                self.send_message_to_agent(agent_id, FeagiMessage::HeartBeat, 0)?;
738                Ok(None)
739            }
740            FeagiMessage::AgentConfiguration(
741                AgentEmbodimentConfigurationMessage::AgentConfigurationDetails(device_def),
742            ) => {
743                let device_regs = serde_json::to_value(device_def).unwrap_or_else(|_| {
744                    tracing::warn!(
745                        target: "feagi-agent",
746                        "Failed to serialize AgentConfigurationDetails to JSON"
747                    );
748                    serde_json::Value::Object(serde_json::Map::new())
749                });
750                self.set_device_registrations_by_agent(agent_id, device_regs.clone());
751                if let Some((descriptor, _)) = self.all_registered_agents.get(&agent_id) {
752                    self.set_device_registrations_by_descriptor(
753                        agent_id.to_base64(),
754                        descriptor.clone(),
755                        device_regs,
756                    );
757                }
758                info!(
759                    target: "feagi-agent",
760                    "Stored device registrations for agent {}",
761                    agent_id.to_base64()
762                );
763                // Send acknowledgment so REQ/REP clients can complete the request
764                self.send_message_to_agent(agent_id, FeagiMessage::HeartBeat, 0)?;
765                Ok(None)
766            }
767            _ => {
768                // Throw up anything else
769                Ok(Some((agent_id, message)))
770            }
771        }
772    }
773
774    //endregion
775
776    //region Registration
777
778    fn register_agent(
779        &mut self,
780        agent_id: AgentID,
781        wanted_protocol: TransportProtocolImplementation,
782        agent_capabilities: Vec<AgentCapabilities>,
783        descriptor: AgentDescriptor,
784        command_server_index: CommandServerIndex,
785    ) -> Result<HashMap<AgentCapabilities, TransportProtocolEndpoint>, FeagiAgentError> {
786        // TODO prevent duplicate registration
787        /*
788        if self.all_registered_agents.contains_key(&agent_id) {
789            return Err(FeagiAgentError::ConnectionFailed(
790                "Agent Already registered".to_string(),
791            ));
792        }
793
794         */
795
796        let mut used_puller_indices: Vec<usize> = Vec::new();
797        let mut used_publisher_indices: Vec<usize> = Vec::new();
798        let mut sensor_servers: Vec<Box<dyn FeagiServerPuller>> = Vec::new();
799        let mut motor_servers: Vec<Box<dyn FeagiServerPublisher>> = Vec::new();
800        let mut visualizer_servers: Vec<Box<dyn FeagiServerPublisher>> = Vec::new();
801        let mut endpoint_mappings: HashMap<AgentCapabilities, TransportProtocolEndpoint> =
802            HashMap::new();
803
804        // We try spawning all the servers first without taking any properties out mof circulation
805        for agent_capability in &agent_capabilities {
806            match agent_capability {
807                AgentCapabilities::SendSensorData => {
808                    let puller_property_index =
809                        self.try_get_puller_property_index(&wanted_protocol)?;
810                    let puller_property = &self.available_pullers[puller_property_index];
811                    let mut sensor_server = puller_property.as_boxed_server_puller();
812                    sensor_server.request_start()?;
813                    sensor_servers.push(sensor_server);
814                    endpoint_mappings.insert(
815                        AgentCapabilities::SendSensorData,
816                        puller_property.get_agent_endpoint(),
817                    );
818                    used_puller_indices.push(puller_property_index);
819                }
820                AgentCapabilities::ReceiveMotorData => {
821                    let publisher_index =
822                        self.try_get_publisher_property_index(&wanted_protocol)?;
823                    let publisher_property = &self.available_publishers[publisher_index];
824                    let mut publisher_server = publisher_property.as_boxed_server_publisher();
825                    publisher_server.request_start()?;
826                    motor_servers.push(publisher_server);
827                    endpoint_mappings.insert(
828                        AgentCapabilities::ReceiveMotorData,
829                        publisher_property.get_agent_endpoint(),
830                    );
831                    used_publisher_indices.push(publisher_index);
832                }
833                AgentCapabilities::ReceiveNeuronVisualizations => {
834                    // Prefer the last matching publisher for visualization so motor/viz publishers
835                    // configured in order [motor, visualization] map correctly.
836                    let publisher_index =
837                        self.try_get_last_publisher_property_index(&wanted_protocol)?;
838                    let publisher_property = &self.available_publishers[publisher_index];
839                    let mut publisher_server = publisher_property.as_boxed_server_publisher();
840                    publisher_server.request_start()?;
841                    visualizer_servers.push(publisher_server);
842                    endpoint_mappings.insert(
843                        AgentCapabilities::ReceiveNeuronVisualizations,
844                        publisher_property.get_agent_endpoint(),
845                    );
846                    used_publisher_indices.push(publisher_index);
847                }
848                AgentCapabilities::ReceiveSystemMessages => {
849                    todo!()
850                }
851            }
852        }
853
854        // everything is good, take used properties out of circulation by exact index
855        used_puller_indices.sort_unstable();
856        used_puller_indices.dedup();
857        for idx in used_puller_indices.into_iter().rev() {
858            self.available_pullers.remove(idx);
859        }
860
861        used_publisher_indices.sort_unstable();
862        used_publisher_indices.dedup();
863        for idx in used_publisher_indices.into_iter().rev() {
864            self.available_publishers.remove(idx);
865        }
866
867        // insert the servers into the cache
868        for sensor_server in sensor_servers {
869            let sensor_translator: SensorTranslator =
870                SensorTranslator::new(agent_id, sensor_server);
871            self.sensors.insert(agent_id, sensor_translator);
872        }
873
874        for motor_server in motor_servers {
875            let motor_translator: MotorTranslator = MotorTranslator::new(agent_id, motor_server);
876            self.motors.insert(agent_id, motor_translator);
877        }
878
879        for visualizer_server in visualizer_servers {
880            let visualizer_translator: VisualizationTranslator =
881                VisualizationTranslator::new(agent_id, visualizer_server);
882            self.visualizations.insert(agent_id, visualizer_translator);
883        }
884
885        self.all_registered_agents
886            .insert(agent_id, (descriptor, agent_capabilities));
887        self.agent_mapping_to_command_control_server_index
888            .insert(agent_id, command_server_index);
889        self.last_activity_by_agent.insert(agent_id, Instant::now());
890
891        Ok(endpoint_mappings)
892    }
893
894    /// Refresh liveness for a known agent based on command/control activity.
895    ///
896    /// FEAGI treats any valid command/control message as proof of liveness
897    /// (not just explicit heartbeat packets).
898    fn refresh_agent_activity(&mut self, agent_id: AgentID) {
899        self.last_activity_by_agent.insert(agent_id, Instant::now());
900    }
901
902    /// Find currently connected agent by descriptor value.
903    fn find_agent_id_by_descriptor(&self, descriptor: &AgentDescriptor) -> Option<AgentID> {
904        self.all_registered_agents
905            .iter()
906            .find_map(|(agent_id, (existing_descriptor, _))| {
907                if existing_descriptor == descriptor {
908                    Some(*agent_id)
909                } else {
910                    None
911                }
912            })
913    }
914
915    /// Periodically scan and remove stale agents that have exceeded heartbeat timeout.
916    fn try_prune_stale_agents(&mut self) {
917        if self.last_stale_check_at.elapsed() < self.liveness_config.stale_check_interval {
918            return;
919        }
920        self.last_stale_check_at = Instant::now();
921
922        let stale_ids: Vec<AgentID> = self
923            .last_activity_by_agent
924            .iter()
925            .filter_map(|(agent_id, last_seen)| {
926                if last_seen.elapsed() > self.liveness_config.heartbeat_timeout {
927                    Some(*agent_id)
928                } else {
929                    None
930                }
931            })
932            .collect();
933
934        for stale_id in stale_ids {
935            let stale_reason = format!(
936                "stale heartbeat timeout exceeded ({:.3}s)",
937                self.liveness_config.heartbeat_timeout.as_secs_f64()
938            );
939            self.deregister_agent_internal(stale_id, &stale_reason);
940        }
941    }
942
943    /// Fully remove an agent and recycle all transport resources.
944    ///
945    /// This is the single teardown path used by both voluntary and forced
946    /// deregistration.
947    fn deregister_agent_internal(&mut self, agent_id: AgentID, reason: &str) {
948        self.last_activity_by_agent.remove(&agent_id);
949        self.agent_mapping_to_command_control_server_index
950            .remove(&agent_id);
951        let descriptor = self
952            .all_registered_agents
953            .remove(&agent_id)
954            .map(|(descriptor, _)| descriptor);
955        let descriptor_text = descriptor
956            .as_ref()
957            .map(|item| format!("{:?}", item))
958            .unwrap_or_else(|| "<unknown-descriptor>".to_string());
959        info!(
960            target: "feagi-agent",
961            "Agent deregistered: agent_id={} descriptor={} reason={}",
962            agent_id.to_base64(),
963            descriptor_text,
964            reason
965        );
966        self.device_registrations_by_agent.remove(&agent_id);
967
968        if let Some(sensor) = self.sensors.remove(&agent_id) {
969            self.available_pullers.push(sensor.into_puller_properties());
970        }
971        if let Some(motor) = self.motors.remove(&agent_id) {
972            self.available_publishers
973                .push(motor.into_publisher_properties());
974        }
975        if let Some(viz) = self.visualizations.remove(&agent_id) {
976            self.available_publishers
977                .push(viz.into_publisher_properties());
978        }
979
980        if let Some(descriptor) = descriptor {
981            self.agent_id_by_descriptor.remove(&descriptor);
982            self.device_registrations_by_descriptor.remove(&descriptor);
983        }
984    }
985
986    //endregion
987
988    //endregion
989}