1use crate::command_and_control::agent_embodiment_configuration_message::AgentEmbodimentConfigurationMessage;
2use crate::command_and_control::agent_registration_message::{
3 AgentRegistrationMessage, DeregistrationResponse, RegistrationResponse,
4};
5use crate::command_and_control::FeagiMessage;
6use crate::server::auth::AgentAuth;
7use crate::server::wrappers::{
8 CommandControlWrapper, MotorTranslator, SensorTranslator, VisualizationTranslator,
9};
10use crate::{AgentCapabilities, AgentDescriptor, FeagiAgentError};
11use feagi_io::traits_and_enums::server::{
12 FeagiServerPublisher, FeagiServerPublisherProperties, FeagiServerPuller,
13 FeagiServerPullerProperties, FeagiServerRouterProperties,
14};
15use feagi_io::traits_and_enums::shared::{
16 TransportProtocolEndpoint, TransportProtocolImplementation,
17};
18use feagi_io::AgentID;
19use feagi_serialization::FeagiByteContainer;
20use std::collections::{HashMap, HashSet};
21use std::time::{Duration, Instant};
22use tracing::{error, info, warn};
23
/// Position of a command-and-control server inside
/// `FeagiAgentHandler::command_control_servers`.
type CommandServerIndex = usize;
25
/// Timing parameters used by the handler's stale-agent pruning.
#[derive(Debug, Clone)]
pub struct AgentLivenessConfig {
    /// An agent whose last recorded activity is older than this is
    /// deregistered during stale-agent pruning.
    pub heartbeat_timeout: Duration,
    /// Minimum time that must pass between two stale-agent scans.
    pub stale_check_interval: Duration,
}

impl Default for AgentLivenessConfig {
    /// Defaults to a 30 second heartbeat timeout and a 1 second scan interval.
    fn default() -> Self {
        let heartbeat_timeout = Duration::from_secs(30);
        let stale_check_interval = Duration::from_secs(1);
        Self {
            heartbeat_timeout,
            stale_check_interval,
        }
    }
}
45
/// Server-side bookkeeping for connected agents: registration, liveness
/// tracking, and the per-agent transport servers used for sensor, motor and
/// visualization traffic.
pub struct FeagiAgentHandler {
    /// Backend asked to approve every registration request.
    agent_auth_backend: Box<dyn AgentAuth>,
    /// Publisher properties not currently assigned to an agent. Entries are
    /// taken on registration and pushed back on deregistration.
    available_publishers: Vec<Box<dyn FeagiServerPublisherProperties>>,
    /// Puller properties not currently assigned to an agent. Entries are
    /// taken on registration and pushed back on deregistration.
    available_pullers: Vec<Box<dyn FeagiServerPullerProperties>>,
    /// Started command-and-control routers, addressed by `CommandServerIndex`.
    command_control_servers: Vec<CommandControlWrapper>,

    /// Descriptor and requested capabilities of every registered agent.
    all_registered_agents: HashMap<AgentID, (AgentDescriptor, Vec<AgentCapabilities>)>,
    /// Which command-and-control server each agent registered through.
    agent_mapping_to_command_control_server_index: HashMap<AgentID, CommandServerIndex>,
    /// Time of the most recent activity recorded for each agent.
    last_activity_by_agent: HashMap<AgentID, Instant>,
    /// Per-agent sensor translators (created for `SendSensorData`).
    sensors: HashMap<AgentID, SensorTranslator>,
    /// Per-agent motor translators (created for `ReceiveMotorData`).
    motors: HashMap<AgentID, MotorTranslator>,
    /// Per-agent visualization translators (created for
    /// `ReceiveNeuronVisualizations`).
    visualizations: HashMap<AgentID, VisualizationTranslator>,
    /// Heartbeat timeout and scan interval used by stale-agent pruning.
    liveness_config: AgentLivenessConfig,
    /// When the last stale-agent scan ran.
    last_stale_check_at: Instant,

    /// Device registration JSON keyed by agent descriptor.
    device_registrations_by_descriptor: HashMap<AgentDescriptor, serde_json::Value>,
    /// Base64 agent id string stored alongside the descriptor-keyed
    /// device registrations.
    agent_id_by_descriptor: HashMap<AgentDescriptor, String>,
    /// Device registration JSON keyed by agent id.
    device_registrations_by_agent: HashMap<AgentID, serde_json::Value>,
}
70
71impl FeagiAgentHandler {
72 #[allow(dead_code)]
73 fn capabilities_equivalent(
74 existing: &[AgentCapabilities],
75 requested: &[AgentCapabilities],
76 ) -> bool {
77 existing.len() == requested.len()
78 && existing
79 .iter()
80 .all(|capability| requested.contains(capability))
81 }
82
83 #[allow(dead_code)]
89 fn should_replace_existing_descriptor_session(&self, existing_agent_id: AgentID) -> bool {
90 let Some(last_seen) = self.last_activity_by_agent.get(&existing_agent_id) else {
91 return true;
93 };
94
95 let duplicate_guard_window = self
96 .liveness_config
97 .stale_check_interval
98 .checked_mul(2)
99 .unwrap_or(self.liveness_config.stale_check_interval);
100
101 last_seen.elapsed() > duplicate_guard_window
104 }
105
106 pub fn new(agent_auth_backend: Box<dyn AgentAuth>) -> FeagiAgentHandler {
107 Self::new_with_liveness_config(agent_auth_backend, AgentLivenessConfig::default())
108 }
109
110 pub fn new_with_liveness_config(
115 agent_auth_backend: Box<dyn AgentAuth>,
116 liveness_config: AgentLivenessConfig,
117 ) -> FeagiAgentHandler {
118 FeagiAgentHandler {
119 agent_auth_backend,
120 available_publishers: Vec::new(),
121 available_pullers: Vec::new(),
122
123 command_control_servers: Vec::new(),
124 all_registered_agents: HashMap::new(),
125 agent_mapping_to_command_control_server_index: HashMap::new(),
126 last_activity_by_agent: HashMap::new(),
127 sensors: Default::default(),
128 motors: Default::default(),
129 visualizations: Default::default(),
130 liveness_config,
131 last_stale_check_at: Instant::now(),
132
133 device_registrations_by_descriptor: HashMap::new(),
134 agent_id_by_descriptor: HashMap::new(),
135 device_registrations_by_agent: HashMap::new(),
136 }
137 }
138
/// Returns the registry of all currently registered agents, mapping each
/// agent id to its descriptor and requested capabilities.
pub fn get_all_registered_agents(
    &self,
) -> &HashMap<AgentID, (AgentDescriptor, Vec<AgentCapabilities>)> {
    &self.all_registered_agents
}
146
147 pub fn get_all_registered_sensors(&self) -> HashSet<AgentID> {
148 self.sensors.keys().cloned().collect()
149 }
150
151 pub fn get_all_registered_motors(&self) -> HashSet<AgentID> {
152 self.motors.keys().cloned().collect()
153 }
154
155 pub fn get_all_registered_visualizations(&self) -> HashSet<AgentID> {
156 self.visualizations.keys().cloned().collect()
157 }
158
159 pub fn get_command_control_server_info(&self) -> Vec<Box<dyn FeagiServerRouterProperties>> {
160 let mut output: Vec<Box<dyn FeagiServerRouterProperties>> = Vec::new();
161 for command_control_server in &self.command_control_servers {
162 output.push(command_control_server.get_running_server_properties())
163 }
164 output
165 }
166
/// Returns the device registration JSON stored for `agent_id`, or `None`
/// when nothing has been stored for that agent.
pub fn get_device_registrations_by_agent(
    &self,
    agent_id: AgentID,
) -> Option<&serde_json::Value> {
    self.device_registrations_by_agent.get(&agent_id)
}
176
177 pub fn set_device_registrations_by_descriptor(
180 &mut self,
181 agent_id_base64: String,
182 agent_descriptor: AgentDescriptor,
183 device_registrations: serde_json::Value,
184 ) {
185 self.device_registrations_by_descriptor
186 .insert(agent_descriptor.clone(), device_registrations);
187 self.agent_id_by_descriptor
188 .insert(agent_descriptor, agent_id_base64);
189 }
190
/// Returns the device registration JSON stored for `agent_descriptor`, or
/// `None` when nothing has been stored for that descriptor.
pub fn get_device_registrations_by_descriptor(
    &self,
    agent_descriptor: &AgentDescriptor,
) -> Option<&serde_json::Value> {
    self.device_registrations_by_descriptor
        .get(agent_descriptor)
}
199
/// Stores `device_registrations` for `agent_id`, replacing any previous value.
pub fn set_device_registrations_by_agent(
    &mut self,
    agent_id: AgentID,
    device_registrations: serde_json::Value,
) {
    self.device_registrations_by_agent
        .insert(agent_id, device_registrations);
}
209
210 pub fn get_visualization_info_for_agent(&self, agent_id: AgentID) -> Option<(String, f64)> {
214 let device_regs = self.device_registrations_by_agent.get(&agent_id)?;
215 let viz = device_regs.get("visualization")?;
216 let rate_hz = viz.get("rate_hz").and_then(|v| v.as_f64())?;
217
218 if rate_hz > 0.0 {
219 let agent_descriptor = self.all_registered_agents.get(&agent_id)?;
220 let agent_id = self
221 .agent_id_by_descriptor
222 .get(&agent_descriptor.0)?
223 .clone();
224 Some((agent_id, rate_hz))
225 } else {
226 None
227 }
228 }
229
230 pub fn add_and_start_command_control_server(
239 &mut self,
240 router_property: Box<dyn FeagiServerRouterProperties>,
241 ) -> Result<(), FeagiAgentError> {
242 let mut router = router_property.as_boxed_server_router();
243 router.request_start()?;
244 let translator = CommandControlWrapper::new(router);
245 self.command_control_servers.push(translator);
246 Ok(())
247 }
248
/// Adds a publisher to the pool that registrations draw from for motor and
/// visualization capabilities.
pub fn add_publisher_server(&mut self, publisher: Box<dyn FeagiServerPublisherProperties>) {
    self.available_publishers.push(publisher);
}
253
/// Adds a puller to the pool that registrations draw from for the sensor
/// capability.
pub fn add_puller_server(&mut self, puller: Box<dyn FeagiServerPullerProperties>) {
    self.available_pullers.push(puller);
}
258
259 pub fn poll_command_and_control(
280 &mut self,
281 ) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
282 self.try_prune_stale_agents();
283 for (command_index, translator) in self.command_control_servers.iter_mut().enumerate() {
284 let possible_message =
286 translator.poll_for_incoming_messages(&self.all_registered_agents)?;
287
288 match possible_message {
289 None => {
290 continue;
291 }
292 Some((agent_id, message, is_new_agent)) => {
293 if is_new_agent {
294 return self.handle_messages_from_unknown_agent_ids(
295 agent_id,
296 &message,
297 command_index,
298 );
299 } else {
300 return self.handle_messages_from_known_agent_ids(agent_id, message);
301 }
302 }
303 }
304 }
305 Ok(None)
307 }
308
309 pub fn send_message_to_agent(
311 &mut self,
312 agent_id: AgentID,
313 message: FeagiMessage,
314 increment_counter: u16,
315 ) -> Result<(), FeagiAgentError> {
316 let translator_index = match self
317 .agent_mapping_to_command_control_server_index
318 .get(&agent_id)
319 {
320 None => {
321 return Err(FeagiAgentError::Other(
322 "No such Agent ID exists!".to_string(),
323 ))
324 }
325 Some(index) => index,
326 };
327
328 let command_translator = match self.command_control_servers.get_mut(*translator_index) {
329 None => {
330 panic!("Missing Index for command control server!") }
332 Some(translator) => translator,
333 };
334 command_translator.send_message(agent_id, message, increment_counter)
335 }
336
337 fn send_message_via_command_server(
342 &mut self,
343 command_server_index: CommandServerIndex,
344 session_id: AgentID,
345 message: FeagiMessage,
346 increment_counter: u16,
347 ) -> Result<(), FeagiAgentError> {
348 let command_translator = self
349 .command_control_servers
350 .get_mut(command_server_index)
351 .ok_or_else(|| {
352 FeagiAgentError::Other("Missing command control server index".to_string())
353 })?;
354 command_translator.send_message(session_id, message, increment_counter)
355 }
356
357 pub fn send_motor_data_to_agent(
358 &mut self,
359 agent_id: AgentID,
360 data: &FeagiByteContainer,
361 ) -> Result<(), FeagiAgentError> {
362 let motor_translator = self
363 .motors
364 .get_mut(&agent_id)
365 .ok_or_else(|| FeagiAgentError::Other("No Agent ID exists!".to_string()))?;
366 motor_translator.poll_and_send_buffered_motor_data(data)?;
367 self.refresh_agent_activity(agent_id);
368 Ok(())
369 }
370
371 pub fn send_visualization_data_to_agent(
372 &mut self,
373 agent_id: AgentID,
374 data: &FeagiByteContainer,
375 ) -> Result<(), FeagiAgentError> {
376 let visualization_translator = self
377 .visualizations
378 .get_mut(&agent_id)
379 .ok_or_else(|| FeagiAgentError::Other("No Agent ID exists!".to_string()))?;
380 visualization_translator.poll_and_send_visualization_data(data)?;
381 self.refresh_agent_activity(agent_id);
382 Ok(())
383 }
384
385 pub fn poll_agent_sensors(&mut self) -> Result<Option<&FeagiByteContainer>, FeagiAgentError> {
390 for (_id, translator) in self.sensors.iter_mut() {
391 let possible_sensor_data = translator.poll_sensor_server()?;
392 if possible_sensor_data.is_some() {
393 return Ok(possible_sensor_data);
394 }
395 }
396 Ok(None)
397 }
398
399 pub fn poll_agent_motors(&mut self) -> Result<(), FeagiAgentError> {
400 for (_id, translator) in self.motors.iter_mut() {
401 translator.poll_motor_server()?;
402 }
403 Ok(())
404 }
405
406 pub fn poll_agent_visualizers(&mut self) -> Result<(), FeagiAgentError> {
407 for (_id, translator) in self.visualizations.iter_mut() {
408 translator.poll_visualization_server()?;
409 }
410 Ok(())
411 }
412
413 pub fn send_motor_data(
414 &mut self,
415 agent_id: AgentID,
416 motor_data: &FeagiByteContainer,
417 ) -> Result<(), FeagiAgentError> {
418 let embodiment_option = self.motors.get_mut(&agent_id);
419 match embodiment_option {
420 Some(embodiment) => {
421 embodiment.poll_and_send_buffered_motor_data(motor_data)?;
422 self.refresh_agent_activity(agent_id);
423 Ok(())
424 }
425 None => Err(FeagiAgentError::UnableToSendData(
426 "Nonexistant Agent ID!".to_string(),
427 )),
428 }
429 }
430
431 pub fn send_visualization_data(
433 &mut self,
434 agent_id: AgentID,
435 viz_data: &FeagiByteContainer,
436 ) -> Result<(), FeagiAgentError> {
437 let embodiment_option = self.visualizations.get_mut(&agent_id);
438 match embodiment_option {
439 Some(embodiment) => {
440 embodiment.poll_and_send_visualization_data(viz_data)?;
441 self.refresh_agent_activity(agent_id);
442 Ok(())
443 }
444 None => Err(FeagiAgentError::UnableToSendData(
445 "Nonexistant Agent ID!".to_string(),
446 )),
447 }
448 }
449
450 fn try_get_puller_property_index(
457 &mut self,
458 wanted_protocol: &TransportProtocolImplementation,
459 ) -> Result<usize, FeagiAgentError> {
460 for i in 0..self.available_pullers.len() {
461 let available_puller = &self.available_pullers[i];
462 if &available_puller
463 .get_bind_point()
464 .as_transport_protocol_implementation()
465 != wanted_protocol
466 {
467 continue;
469 } else {
470 return Ok(i);
472 }
473 }
474 Err(FeagiAgentError::InitFail(
475 "Missing required protocol puller".to_string(),
476 ))
477 }
478
479 fn try_get_publisher_property_index(
480 &mut self,
481 wanted_protocol: &TransportProtocolImplementation,
482 ) -> Result<usize, FeagiAgentError> {
483 for i in 0..self.available_publishers.len() {
484 let available_publisher = &self.available_publishers[i];
485 if &available_publisher.get_protocol() != wanted_protocol {
486 continue;
488 } else {
489 return Ok(i);
491 }
492 }
493 Err(FeagiAgentError::InitFail(
494 "Missing required protocol publisher".to_string(),
495 ))
496 }
497
498 fn try_get_last_publisher_property_index(
499 &mut self,
500 wanted_protocol: &TransportProtocolImplementation,
501 ) -> Result<usize, FeagiAgentError> {
502 for i in (0..self.available_publishers.len()).rev() {
503 let available_publisher = &self.available_publishers[i];
504 if &available_publisher.get_protocol() != wanted_protocol {
505 continue;
506 } else {
507 return Ok(i);
508 }
509 }
510 Err(FeagiAgentError::InitFail(
511 "Missing required protocol publisher".to_string(),
512 ))
513 }
514
515 fn handle_messages_from_unknown_agent_ids(
520 &mut self,
521 agent_id: AgentID,
522 message: &FeagiMessage,
523 command_control_index: CommandServerIndex,
524 ) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
525 match &message {
526 FeagiMessage::AgentRegistration(register_message) => {
527 match ®ister_message {
528 AgentRegistrationMessage::ClientRequestRegistration(registration_request) => {
529 info!(
530 target: "feagi-agent",
531 "WS registration request received: session={} descriptor={:?} caps={:?} protocol={:?}",
532 agent_id.to_base64(),
533 registration_request.agent_descriptor(),
534 registration_request.requested_capabilities(),
535 registration_request.connection_protocol()
536 );
537 let auth_result = self
538 .agent_auth_backend
539 .verify_agent_allowed_to_connect(registration_request);
540 if auth_result.is_err() {
541 warn!(
542 target: "feagi-agent",
543 "WS registration rejected by auth backend: session={} descriptor={:?}",
544 agent_id.to_base64(),
545 registration_request.agent_descriptor()
546 );
547 self.send_message_via_command_server(
548 command_control_index,
549 agent_id,
550 FeagiMessage::AgentRegistration(
551 AgentRegistrationMessage::ServerRespondsRegistration(
552 RegistrationResponse::FailedInvalidAuth,
553 ),
554 ),
555 0,
556 )?;
557 return Ok(None);
558 }
559 if let Some(existing_agent_id) = self
567 .find_agent_id_by_descriptor(registration_request.agent_descriptor())
568 {
569 if let Some((_, _existing_capabilities)) =
570 self.all_registered_agents.get(&existing_agent_id)
571 {
572 }
592 let replacement_reason = format!(
617 "descriptor replacement by new registration session={}",
618 agent_id.to_base64()
619 );
620 self.deregister_agent_internal(existing_agent_id, &replacement_reason);
621 }
622
623 let mappings = match self.register_agent(
625 agent_id,
626 *registration_request.connection_protocol(),
627 registration_request.requested_capabilities().to_vec(),
628 registration_request.agent_descriptor().clone(),
629 command_control_index,
630 ) {
631 Ok(mappings) => mappings,
632 Err(_) => {
633 error!(
634 target: "feagi-agent",
635 "WS registration failed while creating transport mappings: session={} descriptor={:?}",
636 agent_id.to_base64(),
637 registration_request.agent_descriptor()
638 );
639 self.send_message_via_command_server(
640 command_control_index,
641 agent_id,
642 FeagiMessage::AgentRegistration(
643 AgentRegistrationMessage::ServerRespondsRegistration(
644 RegistrationResponse::FailedInvalidRequest,
645 ),
646 ),
647 0,
648 )?;
649 return Ok(None);
650 }
651 };
652
653 let mapped_caps: Vec<_> = mappings.keys().cloned().collect();
654 let response = RegistrationResponse::Success(agent_id, mappings);
655 let response_message = FeagiMessage::AgentRegistration(
656 AgentRegistrationMessage::ServerRespondsRegistration(response),
657 );
658 self.send_message_via_command_server(
659 command_control_index,
660 agent_id,
661 response_message,
662 0,
663 )?;
664 info!(
665 target: "feagi-agent",
666 "WS registration success response sent: session={} descriptor={:?} mapped_caps={:?}",
667 agent_id.to_base64(),
668 registration_request.agent_descriptor(),
669 mapped_caps
670 );
671 Ok(None)
672 }
673 AgentRegistrationMessage::ClientRequestDeregistration(_) => {
674 let response = FeagiMessage::AgentRegistration(
675 AgentRegistrationMessage::ServerRespondsDeregistration(
676 DeregistrationResponse::NotRegistered,
677 ),
678 );
679 self.send_message_via_command_server(
680 command_control_index,
681 agent_id,
682 response,
683 0,
684 )?;
685 Ok(None)
686 }
687 _ => {
688 Ok(None)
690 }
691 }
692 }
693 _ => {
694 Ok(None)
696 }
697 }
698 }
699
700 fn handle_messages_from_known_agent_ids(
701 &mut self,
702 agent_id: AgentID,
703 message: FeagiMessage,
704 ) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
705 self.refresh_agent_activity(agent_id);
706 match &message {
707 FeagiMessage::AgentRegistration(register_message) => {
708 match register_message {
709 AgentRegistrationMessage::ClientRequestDeregistration(request) => {
710 self.send_message_to_agent(
712 agent_id,
713 FeagiMessage::AgentRegistration(
714 AgentRegistrationMessage::ServerRespondsDeregistration(
715 DeregistrationResponse::Success,
716 ),
717 ),
718 0,
719 )?;
720 let dereg_reason = request
721 .reason()
722 .map(|text| format!("client request: {}", text))
723 .unwrap_or_else(|| "client request".to_string());
724 self.deregister_agent_internal(agent_id, &dereg_reason);
725 Ok(None)
726 }
727 _ => {
728 Ok(None)
731 }
732 }
733 }
734 FeagiMessage::HeartBeat => {
735 self.send_message_to_agent(agent_id, FeagiMessage::HeartBeat, 0)?;
738 Ok(None)
739 }
740 FeagiMessage::AgentConfiguration(
741 AgentEmbodimentConfigurationMessage::AgentConfigurationDetails(device_def),
742 ) => {
743 let device_regs = serde_json::to_value(device_def).unwrap_or_else(|_| {
744 tracing::warn!(
745 target: "feagi-agent",
746 "Failed to serialize AgentConfigurationDetails to JSON"
747 );
748 serde_json::Value::Object(serde_json::Map::new())
749 });
750 self.set_device_registrations_by_agent(agent_id, device_regs.clone());
751 if let Some((descriptor, _)) = self.all_registered_agents.get(&agent_id) {
752 self.set_device_registrations_by_descriptor(
753 agent_id.to_base64(),
754 descriptor.clone(),
755 device_regs,
756 );
757 }
758 info!(
759 target: "feagi-agent",
760 "Stored device registrations for agent {}",
761 agent_id.to_base64()
762 );
763 self.send_message_to_agent(agent_id, FeagiMessage::HeartBeat, 0)?;
765 Ok(None)
766 }
767 _ => {
768 Ok(Some((agent_id, message)))
770 }
771 }
772 }
773
774 fn register_agent(
779 &mut self,
780 agent_id: AgentID,
781 wanted_protocol: TransportProtocolImplementation,
782 agent_capabilities: Vec<AgentCapabilities>,
783 descriptor: AgentDescriptor,
784 command_server_index: CommandServerIndex,
785 ) -> Result<HashMap<AgentCapabilities, TransportProtocolEndpoint>, FeagiAgentError> {
786 let mut used_puller_indices: Vec<usize> = Vec::new();
797 let mut used_publisher_indices: Vec<usize> = Vec::new();
798 let mut sensor_servers: Vec<Box<dyn FeagiServerPuller>> = Vec::new();
799 let mut motor_servers: Vec<Box<dyn FeagiServerPublisher>> = Vec::new();
800 let mut visualizer_servers: Vec<Box<dyn FeagiServerPublisher>> = Vec::new();
801 let mut endpoint_mappings: HashMap<AgentCapabilities, TransportProtocolEndpoint> =
802 HashMap::new();
803
804 for agent_capability in &agent_capabilities {
806 match agent_capability {
807 AgentCapabilities::SendSensorData => {
808 let puller_property_index =
809 self.try_get_puller_property_index(&wanted_protocol)?;
810 let puller_property = &self.available_pullers[puller_property_index];
811 let mut sensor_server = puller_property.as_boxed_server_puller();
812 sensor_server.request_start()?;
813 sensor_servers.push(sensor_server);
814 endpoint_mappings.insert(
815 AgentCapabilities::SendSensorData,
816 puller_property.get_agent_endpoint(),
817 );
818 used_puller_indices.push(puller_property_index);
819 }
820 AgentCapabilities::ReceiveMotorData => {
821 let publisher_index =
822 self.try_get_publisher_property_index(&wanted_protocol)?;
823 let publisher_property = &self.available_publishers[publisher_index];
824 let mut publisher_server = publisher_property.as_boxed_server_publisher();
825 publisher_server.request_start()?;
826 motor_servers.push(publisher_server);
827 endpoint_mappings.insert(
828 AgentCapabilities::ReceiveMotorData,
829 publisher_property.get_agent_endpoint(),
830 );
831 used_publisher_indices.push(publisher_index);
832 }
833 AgentCapabilities::ReceiveNeuronVisualizations => {
834 let publisher_index =
837 self.try_get_last_publisher_property_index(&wanted_protocol)?;
838 let publisher_property = &self.available_publishers[publisher_index];
839 let mut publisher_server = publisher_property.as_boxed_server_publisher();
840 publisher_server.request_start()?;
841 visualizer_servers.push(publisher_server);
842 endpoint_mappings.insert(
843 AgentCapabilities::ReceiveNeuronVisualizations,
844 publisher_property.get_agent_endpoint(),
845 );
846 used_publisher_indices.push(publisher_index);
847 }
848 AgentCapabilities::ReceiveSystemMessages => {
849 todo!()
850 }
851 }
852 }
853
854 used_puller_indices.sort_unstable();
856 used_puller_indices.dedup();
857 for idx in used_puller_indices.into_iter().rev() {
858 self.available_pullers.remove(idx);
859 }
860
861 used_publisher_indices.sort_unstable();
862 used_publisher_indices.dedup();
863 for idx in used_publisher_indices.into_iter().rev() {
864 self.available_publishers.remove(idx);
865 }
866
867 for sensor_server in sensor_servers {
869 let sensor_translator: SensorTranslator =
870 SensorTranslator::new(agent_id, sensor_server);
871 self.sensors.insert(agent_id, sensor_translator);
872 }
873
874 for motor_server in motor_servers {
875 let motor_translator: MotorTranslator = MotorTranslator::new(agent_id, motor_server);
876 self.motors.insert(agent_id, motor_translator);
877 }
878
879 for visualizer_server in visualizer_servers {
880 let visualizer_translator: VisualizationTranslator =
881 VisualizationTranslator::new(agent_id, visualizer_server);
882 self.visualizations.insert(agent_id, visualizer_translator);
883 }
884
885 self.all_registered_agents
886 .insert(agent_id, (descriptor, agent_capabilities));
887 self.agent_mapping_to_command_control_server_index
888 .insert(agent_id, command_server_index);
889 self.last_activity_by_agent.insert(agent_id, Instant::now());
890
891 Ok(endpoint_mappings)
892 }
893
/// Records the current time as the last activity of `agent_id`. The entry is
/// inserted if the agent had none.
fn refresh_agent_activity(&mut self, agent_id: AgentID) {
    self.last_activity_by_agent.insert(agent_id, Instant::now());
}
901
902 fn find_agent_id_by_descriptor(&self, descriptor: &AgentDescriptor) -> Option<AgentID> {
904 self.all_registered_agents
905 .iter()
906 .find_map(|(agent_id, (existing_descriptor, _))| {
907 if existing_descriptor == descriptor {
908 Some(*agent_id)
909 } else {
910 None
911 }
912 })
913 }
914
915 fn try_prune_stale_agents(&mut self) {
917 if self.last_stale_check_at.elapsed() < self.liveness_config.stale_check_interval {
918 return;
919 }
920 self.last_stale_check_at = Instant::now();
921
922 let stale_ids: Vec<AgentID> = self
923 .last_activity_by_agent
924 .iter()
925 .filter_map(|(agent_id, last_seen)| {
926 if last_seen.elapsed() > self.liveness_config.heartbeat_timeout {
927 Some(*agent_id)
928 } else {
929 None
930 }
931 })
932 .collect();
933
934 for stale_id in stale_ids {
935 let stale_reason = format!(
936 "stale heartbeat timeout exceeded ({:.3}s)",
937 self.liveness_config.heartbeat_timeout.as_secs_f64()
938 );
939 self.deregister_agent_internal(stale_id, &stale_reason);
940 }
941 }
942
943 fn deregister_agent_internal(&mut self, agent_id: AgentID, reason: &str) {
948 self.last_activity_by_agent.remove(&agent_id);
949 self.agent_mapping_to_command_control_server_index
950 .remove(&agent_id);
951 let descriptor = self
952 .all_registered_agents
953 .remove(&agent_id)
954 .map(|(descriptor, _)| descriptor);
955 let descriptor_text = descriptor
956 .as_ref()
957 .map(|item| format!("{:?}", item))
958 .unwrap_or_else(|| "<unknown-descriptor>".to_string());
959 info!(
960 target: "feagi-agent",
961 "Agent deregistered: agent_id={} descriptor={} reason={}",
962 agent_id.to_base64(),
963 descriptor_text,
964 reason
965 );
966 self.device_registrations_by_agent.remove(&agent_id);
967
968 if let Some(sensor) = self.sensors.remove(&agent_id) {
969 self.available_pullers.push(sensor.into_puller_properties());
970 }
971 if let Some(motor) = self.motors.remove(&agent_id) {
972 self.available_publishers
973 .push(motor.into_publisher_properties());
974 }
975 if let Some(viz) = self.visualizations.remove(&agent_id) {
976 self.available_publishers
977 .push(viz.into_publisher_properties());
978 }
979
980 if let Some(descriptor) = descriptor {
981 self.agent_id_by_descriptor.remove(&descriptor);
982 self.device_registrations_by_descriptor.remove(&descriptor);
983 }
984 }
985
986 }