1use crate::command_and_control::agent_embodiment_configuration_message::AgentEmbodimentConfigurationMessage;
2use crate::command_and_control::agent_registration_message::{
3 AgentRegistrationMessage, DeregistrationResponse, RegistrationResponse,
4};
5use crate::command_and_control::FeagiMessage;
6use crate::server::auth::AgentAuth;
7use crate::server::wrappers::{
8 CommandControlWrapper, MotorTranslator, SensorTranslator, VisualizationTranslator,
9};
10use crate::{AgentCapabilities, AgentDescriptor, FeagiAgentError};
11use feagi_io::traits_and_enums::server::{
12 FeagiServerPublisher, FeagiServerPublisherProperties, FeagiServerPuller,
13 FeagiServerPullerProperties, FeagiServerRouterProperties,
14};
15use feagi_io::traits_and_enums::shared::{
16 TransportProtocolEndpoint, TransportProtocolImplementation,
17};
18use feagi_io::AgentID;
19use feagi_serialization::FeagiByteContainer;
20use std::collections::{HashMap, HashSet};
21use std::time::{Duration, Instant};
22use tracing::{debug, error, info, warn};
23
24type CommandServerIndex = usize;
25
26#[derive(Debug, Clone)]
32pub struct AgentLivenessConfig {
33 pub heartbeat_timeout: Duration,
34 pub stale_check_interval: Duration,
35}
36
37impl Default for AgentLivenessConfig {
38 fn default() -> Self {
39 Self {
40 heartbeat_timeout: Duration::from_secs(30),
41 stale_check_interval: Duration::from_secs(1),
42 }
43 }
44}
45
46pub struct FeagiAgentHandler {
47 agent_auth_backend: Box<dyn AgentAuth>,
48 available_publishers: Vec<Box<dyn FeagiServerPublisherProperties>>,
49 available_pullers: Vec<Box<dyn FeagiServerPullerProperties>>,
50 command_control_servers: Vec<CommandControlWrapper>,
51
52 all_registered_agents: HashMap<AgentID, (AgentDescriptor, Vec<AgentCapabilities>)>,
53 agent_mapping_to_command_control_server_index: HashMap<AgentID, CommandServerIndex>,
54 last_activity_by_agent: HashMap<AgentID, Instant>,
55 sensors: HashMap<AgentID, SensorTranslator>,
56 motors: HashMap<AgentID, MotorTranslator>,
57 visualizations: HashMap<AgentID, VisualizationTranslator>,
58 sensor_poll_cursor: usize,
59 liveness_config: AgentLivenessConfig,
60 last_stale_check_at: Instant,
61
62 device_registrations_by_descriptor: HashMap<AgentDescriptor, serde_json::Value>,
66 agent_id_by_descriptor: HashMap<AgentDescriptor, String>,
68 device_registrations_by_agent: HashMap<AgentID, serde_json::Value>,
70}
71
72impl FeagiAgentHandler {
73 #[allow(dead_code)]
74 fn capabilities_equivalent(
75 existing: &[AgentCapabilities],
76 requested: &[AgentCapabilities],
77 ) -> bool {
78 existing.len() == requested.len()
79 && existing
80 .iter()
81 .all(|capability| requested.contains(capability))
82 }
83
84 #[allow(dead_code)]
90 fn should_replace_existing_descriptor_session(&self, existing_agent_id: AgentID) -> bool {
91 let Some(last_seen) = self.last_activity_by_agent.get(&existing_agent_id) else {
92 return true;
94 };
95
96 let duplicate_guard_window = self
97 .liveness_config
98 .stale_check_interval
99 .checked_mul(2)
100 .unwrap_or(self.liveness_config.stale_check_interval);
101
102 last_seen.elapsed() > duplicate_guard_window
105 }
106
107 pub fn new(agent_auth_backend: Box<dyn AgentAuth>) -> FeagiAgentHandler {
108 Self::new_with_liveness_config(agent_auth_backend, AgentLivenessConfig::default())
109 }
110
111 pub fn new_with_liveness_config(
116 agent_auth_backend: Box<dyn AgentAuth>,
117 liveness_config: AgentLivenessConfig,
118 ) -> FeagiAgentHandler {
119 FeagiAgentHandler {
120 agent_auth_backend,
121 available_publishers: Vec::new(),
122 available_pullers: Vec::new(),
123
124 command_control_servers: Vec::new(),
125 all_registered_agents: HashMap::new(),
126 agent_mapping_to_command_control_server_index: HashMap::new(),
127 last_activity_by_agent: HashMap::new(),
128 sensors: Default::default(),
129 motors: Default::default(),
130 visualizations: Default::default(),
131 sensor_poll_cursor: 0,
132 liveness_config,
133 last_stale_check_at: Instant::now(),
134
135 device_registrations_by_descriptor: HashMap::new(),
136 agent_id_by_descriptor: HashMap::new(),
137 device_registrations_by_agent: HashMap::new(),
138 }
139 }
140
141 pub fn get_all_registered_agents(
144 &self,
145 ) -> &HashMap<AgentID, (AgentDescriptor, Vec<AgentCapabilities>)> {
146 &self.all_registered_agents
147 }
148
149 pub fn get_all_registered_sensors(&self) -> HashSet<AgentID> {
150 self.sensors.keys().cloned().collect()
151 }
152
153 pub fn get_all_registered_motors(&self) -> HashSet<AgentID> {
154 self.motors.keys().cloned().collect()
155 }
156
157 pub fn get_all_registered_visualizations(&self) -> HashSet<AgentID> {
158 self.visualizations.keys().cloned().collect()
159 }
160
161 pub fn register_logical_agent(
166 &mut self,
167 agent_id: AgentID,
168 descriptor: AgentDescriptor,
169 capabilities: Vec<AgentCapabilities>,
170 ) {
171 self.all_registered_agents
172 .insert(agent_id, (descriptor, capabilities));
173 self.last_activity_by_agent.insert(agent_id, Instant::now());
174 }
175
176 pub fn force_deregister_all_agents(&mut self, reason: &str) -> Vec<String> {
181 let ids: Vec<AgentID> = self.all_registered_agents.keys().copied().collect();
182 let mut removed_ids = Vec::with_capacity(ids.len());
183 for agent_id in ids {
184 removed_ids.push(agent_id.to_base64());
185 self.deregister_agent_internal(agent_id, reason);
186 }
187 removed_ids
188 }
189
190 pub fn get_command_control_server_info(&self) -> Vec<Box<dyn FeagiServerRouterProperties>> {
191 let mut output: Vec<Box<dyn FeagiServerRouterProperties>> = Vec::new();
192 for command_control_server in &self.command_control_servers {
193 output.push(command_control_server.get_running_server_properties())
194 }
195 output
196 }
197
198 pub fn get_device_registrations_by_agent(
202 &self,
203 agent_id: AgentID,
204 ) -> Option<&serde_json::Value> {
205 self.device_registrations_by_agent.get(&agent_id)
206 }
207
208 pub fn set_device_registrations_by_descriptor(
211 &mut self,
212 agent_id_base64: String,
213 agent_descriptor: AgentDescriptor,
214 device_registrations: serde_json::Value,
215 ) {
216 self.device_registrations_by_descriptor
217 .insert(agent_descriptor.clone(), device_registrations);
218 self.agent_id_by_descriptor
219 .insert(agent_descriptor, agent_id_base64);
220 }
221
222 pub fn get_device_registrations_by_descriptor(
224 &self,
225 agent_descriptor: &AgentDescriptor,
226 ) -> Option<&serde_json::Value> {
227 self.device_registrations_by_descriptor
228 .get(agent_descriptor)
229 }
230
231 pub fn set_device_registrations_by_agent(
233 &mut self,
234 agent_id: AgentID,
235 device_registrations: serde_json::Value,
236 ) {
237 self.device_registrations_by_agent
238 .insert(agent_id, device_registrations);
239 }
240
241 pub fn get_visualization_info_for_agent(&self, agent_id: AgentID) -> Option<(String, f64)> {
245 let device_regs = self.device_registrations_by_agent.get(&agent_id)?;
246 let viz = device_regs.get("visualization")?;
247 let rate_hz = viz.get("rate_hz").and_then(|v| v.as_f64())?;
248
249 if rate_hz > 0.0 {
250 let agent_descriptor = self.all_registered_agents.get(&agent_id)?;
251 let agent_id = self
252 .agent_id_by_descriptor
253 .get(&agent_descriptor.0)?
254 .clone();
255 Some((agent_id, rate_hz))
256 } else {
257 None
258 }
259 }
260
261 pub fn add_and_start_command_control_server(
270 &mut self,
271 router_property: Box<dyn FeagiServerRouterProperties>,
272 ) -> Result<(), FeagiAgentError> {
273 let mut router = router_property.as_boxed_server_router();
274 router.request_start()?;
275 let translator = CommandControlWrapper::new(router);
276 self.command_control_servers.push(translator);
277 Ok(())
278 }
279
280 pub fn add_publisher_server(&mut self, publisher: Box<dyn FeagiServerPublisherProperties>) {
281 self.available_publishers.push(publisher);
283 }
284
285 pub fn add_puller_server(&mut self, puller: Box<dyn FeagiServerPullerProperties>) {
286 self.available_pullers.push(puller);
288 }
289
290 pub fn poll_command_and_control(
311 &mut self,
312 ) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
313 self.try_prune_stale_agents();
314 for (command_index, translator) in self.command_control_servers.iter_mut().enumerate() {
315 let possible_message =
317 translator.poll_for_incoming_messages(&self.all_registered_agents)?;
318
319 match possible_message {
320 None => {
321 continue;
322 }
323 Some((agent_id, message, is_new_agent)) => {
324 if is_new_agent {
325 return self.handle_messages_from_unknown_agent_ids(
326 agent_id,
327 &message,
328 command_index,
329 );
330 } else {
331 return self.handle_messages_from_known_agent_ids(agent_id, message);
332 }
333 }
334 }
335 }
336 Ok(None)
338 }
339
340 pub fn send_message_to_agent(
342 &mut self,
343 agent_id: AgentID,
344 message: FeagiMessage,
345 increment_counter: u16,
346 ) -> Result<(), FeagiAgentError> {
347 let translator_index = match self
348 .agent_mapping_to_command_control_server_index
349 .get(&agent_id)
350 {
351 None => {
352 return Err(FeagiAgentError::Other(
353 "No such Agent ID exists!".to_string(),
354 ))
355 }
356 Some(index) => index,
357 };
358
359 let command_translator = match self.command_control_servers.get_mut(*translator_index) {
360 None => {
361 panic!("Missing Index for command control server!") }
363 Some(translator) => translator,
364 };
365 command_translator.send_message(agent_id, message, increment_counter)
366 }
367
368 fn send_message_via_command_server(
373 &mut self,
374 command_server_index: CommandServerIndex,
375 session_id: AgentID,
376 message: FeagiMessage,
377 increment_counter: u16,
378 ) -> Result<(), FeagiAgentError> {
379 let command_translator = self
380 .command_control_servers
381 .get_mut(command_server_index)
382 .ok_or_else(|| {
383 FeagiAgentError::Other("Missing command control server index".to_string())
384 })?;
385 command_translator.send_message(session_id, message, increment_counter)
386 }
387
388 pub fn send_motor_data_to_agent(
389 &mut self,
390 agent_id: AgentID,
391 data: &FeagiByteContainer,
392 ) -> Result<(), FeagiAgentError> {
393 let motor_translator = self
394 .motors
395 .get_mut(&agent_id)
396 .ok_or_else(|| FeagiAgentError::Other("No Agent ID exists!".to_string()))?;
397 motor_translator.poll_and_send_buffered_motor_data(data)?;
398 self.refresh_agent_activity(agent_id);
399 Ok(())
400 }
401
402 pub fn send_visualization_data_to_agent(
403 &mut self,
404 agent_id: AgentID,
405 data: &FeagiByteContainer,
406 ) -> Result<(), FeagiAgentError> {
407 let visualization_translator = self
408 .visualizations
409 .get_mut(&agent_id)
410 .ok_or_else(|| FeagiAgentError::Other("No Agent ID exists!".to_string()))?;
411 visualization_translator.poll_and_send_visualization_data(data)?;
412 self.refresh_agent_activity(agent_id);
413 Ok(())
414 }
415
416 pub fn poll_agent_sensors(&mut self) -> Result<Option<FeagiByteContainer>, FeagiAgentError> {
421 let mut sensor_ids: Vec<AgentID> = self.sensors.keys().copied().collect();
422 if sensor_ids.is_empty() {
423 return Ok(None);
424 }
425
426 sensor_ids.sort_by_key(|agent_id| agent_id.to_base64());
427 let count = sensor_ids.len();
428 let start = self.sensor_poll_cursor % count;
429
430 for offset in 0..count {
431 let idx = (start + offset) % count;
432 let agent_id = sensor_ids[idx];
433 let polled_data = if let Some(translator) = self.sensors.get_mut(&agent_id) {
434 translator.poll_sensor_server()?.cloned()
435 } else {
436 None
437 };
438
439 if let Some(data) = polled_data {
440 self.sensor_poll_cursor = (idx + 1) % count;
441 self.refresh_agent_activity(agent_id);
442 return Ok(Some(data));
443 }
444 }
445
446 self.sensor_poll_cursor = (start + 1) % count;
447 Ok(None)
448 }
449
450 pub fn poll_agent_motors(&mut self) -> Result<(), FeagiAgentError> {
451 for (_id, translator) in self.motors.iter_mut() {
452 translator.poll_motor_server()?;
453 }
454 Ok(())
455 }
456
457 pub fn poll_agent_visualizers(&mut self) -> Result<(), FeagiAgentError> {
458 for (_id, translator) in self.visualizations.iter_mut() {
459 translator.poll_visualization_server()?;
460 }
461 Ok(())
462 }
463
464 pub fn send_motor_data(
465 &mut self,
466 agent_id: AgentID,
467 motor_data: &FeagiByteContainer,
468 ) -> Result<(), FeagiAgentError> {
469 let embodiment_option = self.motors.get_mut(&agent_id);
470 match embodiment_option {
471 Some(embodiment) => {
472 embodiment.poll_and_send_buffered_motor_data(motor_data)?;
473 self.refresh_agent_activity(agent_id);
474 Ok(())
475 }
476 None => Err(FeagiAgentError::UnableToSendData(
477 "Nonexistant Agent ID!".to_string(),
478 )),
479 }
480 }
481
482 pub fn send_visualization_data(
484 &mut self,
485 agent_id: AgentID,
486 viz_data: &FeagiByteContainer,
487 ) -> Result<(), FeagiAgentError> {
488 let embodiment_option = self.visualizations.get_mut(&agent_id);
489 match embodiment_option {
490 Some(embodiment) => {
491 embodiment.poll_and_send_visualization_data(viz_data)?;
492 self.refresh_agent_activity(agent_id);
493 Ok(())
494 }
495 None => Err(FeagiAgentError::UnableToSendData(
496 "Nonexistant Agent ID!".to_string(),
497 )),
498 }
499 }
500
501 fn try_get_puller_property_index(
508 &mut self,
509 wanted_protocol: &TransportProtocolImplementation,
510 ) -> Result<usize, FeagiAgentError> {
511 for i in 0..self.available_pullers.len() {
512 let available_puller = &self.available_pullers[i];
513 if &available_puller
514 .get_bind_point()
515 .as_transport_protocol_implementation()
516 != wanted_protocol
517 {
518 continue;
520 } else {
521 return Ok(i);
523 }
524 }
525 Err(FeagiAgentError::InitFail(
526 "Missing required protocol puller".to_string(),
527 ))
528 }
529
530 fn try_get_publisher_property_index(
531 &mut self,
532 wanted_protocol: &TransportProtocolImplementation,
533 ) -> Result<usize, FeagiAgentError> {
534 for i in 0..self.available_publishers.len() {
535 let available_publisher = &self.available_publishers[i];
536 if &available_publisher.get_protocol() != wanted_protocol {
537 continue;
539 } else {
540 return Ok(i);
542 }
543 }
544 Err(FeagiAgentError::InitFail(
545 "Missing required protocol publisher".to_string(),
546 ))
547 }
548
549 fn try_get_last_publisher_property_index(
550 &mut self,
551 wanted_protocol: &TransportProtocolImplementation,
552 ) -> Result<usize, FeagiAgentError> {
553 for i in (0..self.available_publishers.len()).rev() {
554 let available_publisher = &self.available_publishers[i];
555 if &available_publisher.get_protocol() != wanted_protocol {
556 continue;
557 } else {
558 return Ok(i);
559 }
560 }
561 Err(FeagiAgentError::InitFail(
562 "Missing required protocol publisher".to_string(),
563 ))
564 }
565
566 fn handle_messages_from_unknown_agent_ids(
571 &mut self,
572 agent_id: AgentID,
573 message: &FeagiMessage,
574 command_control_index: CommandServerIndex,
575 ) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
576 match &message {
577 FeagiMessage::AgentRegistration(register_message) => {
578 match ®ister_message {
579 AgentRegistrationMessage::ClientRequestRegistration(registration_request) => {
580 debug!(
581 target: "feagi-agent",
582 "WS registration request received: session={} descriptor={:?} caps={:?} protocol={:?}",
583 agent_id.to_base64(),
584 registration_request.agent_descriptor(),
585 registration_request.requested_capabilities(),
586 registration_request.connection_protocol()
587 );
588 let auth_result = self
589 .agent_auth_backend
590 .verify_agent_allowed_to_connect(registration_request);
591 if auth_result.is_err() {
592 warn!(
593 target: "feagi-agent",
594 "WS registration rejected by auth backend: session={} descriptor={:?}",
595 agent_id.to_base64(),
596 registration_request.agent_descriptor()
597 );
598 self.send_message_via_command_server(
599 command_control_index,
600 agent_id,
601 FeagiMessage::AgentRegistration(
602 AgentRegistrationMessage::ServerRespondsRegistration(
603 RegistrationResponse::FailedInvalidAuth,
604 ),
605 ),
606 0,
607 )?;
608 return Ok(None);
609 }
610 if let Some(existing_agent_id) = self
618 .find_agent_id_by_descriptor(registration_request.agent_descriptor())
619 {
620 if let Some((_, existing_capabilities)) =
621 self.all_registered_agents.get(&existing_agent_id)
622 {
623 if !Self::capabilities_equivalent(
624 existing_capabilities,
625 registration_request.requested_capabilities(),
626 ) {
627 info!(
628 target: "feagi-agent",
629 "Rejecting descriptor-collision registration for {:?}: existing session {} has different capabilities",
630 registration_request.agent_descriptor(),
631 existing_agent_id.to_base64()
632 );
633 self.send_message_via_command_server(
634 command_control_index,
635 agent_id,
636 FeagiMessage::AgentRegistration(
637 AgentRegistrationMessage::ServerRespondsRegistration(
638 RegistrationResponse::AlreadyRegistered,
639 ),
640 ),
641 0,
642 )?;
643 return Ok(None);
644 }
645 }
646 if !self.should_replace_existing_descriptor_session(existing_agent_id) {
647 debug!(
648 target: "feagi-agent",
649 "Ignoring duplicate registration for descriptor {:?}: existing session {} remains active",
650 registration_request.agent_descriptor(),
651 existing_agent_id.to_base64()
652 );
653 self.send_message_via_command_server(
654 command_control_index,
655 agent_id,
656 FeagiMessage::AgentRegistration(
657 AgentRegistrationMessage::ServerRespondsRegistration(
658 RegistrationResponse::AlreadyRegistered,
659 ),
660 ),
661 0,
662 )?;
663 return Ok(None);
664 }
665 let replacement_reason = format!(
666 "descriptor replacement by new registration session={}",
667 agent_id.to_base64()
668 );
669 self.deregister_agent_internal(existing_agent_id, &replacement_reason);
670 }
671
672 let mappings = match self.register_agent(
674 agent_id,
675 *registration_request.connection_protocol(),
676 registration_request.requested_capabilities().to_vec(),
677 registration_request.agent_descriptor().clone(),
678 command_control_index,
679 ) {
680 Ok(mappings) => mappings,
681 Err(_) => {
682 error!(
683 target: "feagi-agent",
684 "WS registration failed while creating transport mappings: session={} descriptor={:?}",
685 agent_id.to_base64(),
686 registration_request.agent_descriptor()
687 );
688 self.send_message_via_command_server(
689 command_control_index,
690 agent_id,
691 FeagiMessage::AgentRegistration(
692 AgentRegistrationMessage::ServerRespondsRegistration(
693 RegistrationResponse::FailedInvalidRequest,
694 ),
695 ),
696 0,
697 )?;
698 return Ok(None);
699 }
700 };
701
702 let mapped_caps: Vec<_> = mappings.keys().cloned().collect();
703 let response = RegistrationResponse::Success(agent_id, mappings);
704 let response_message = FeagiMessage::AgentRegistration(
705 AgentRegistrationMessage::ServerRespondsRegistration(response),
706 );
707 self.send_message_via_command_server(
708 command_control_index,
709 agent_id,
710 response_message,
711 0,
712 )?;
713 debug!(
714 target: "feagi-agent",
715 "WS registration success response sent: session={} descriptor={:?} mapped_caps={:?}",
716 agent_id.to_base64(),
717 registration_request.agent_descriptor(),
718 mapped_caps
719 );
720 Ok(None)
721 }
722 AgentRegistrationMessage::ClientRequestDeregistration(_) => {
723 let response = FeagiMessage::AgentRegistration(
724 AgentRegistrationMessage::ServerRespondsDeregistration(
725 DeregistrationResponse::NotRegistered,
726 ),
727 );
728 self.send_message_via_command_server(
729 command_control_index,
730 agent_id,
731 response,
732 0,
733 )?;
734 Ok(None)
735 }
736 _ => {
737 Ok(None)
739 }
740 }
741 }
742 _ => {
743 Ok(None)
745 }
746 }
747 }
748
749 fn handle_messages_from_known_agent_ids(
750 &mut self,
751 agent_id: AgentID,
752 message: FeagiMessage,
753 ) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
754 self.refresh_agent_activity(agent_id);
755 match &message {
756 FeagiMessage::AgentRegistration(register_message) => {
757 match register_message {
758 AgentRegistrationMessage::ClientRequestDeregistration(request) => {
759 self.send_message_to_agent(
761 agent_id,
762 FeagiMessage::AgentRegistration(
763 AgentRegistrationMessage::ServerRespondsDeregistration(
764 DeregistrationResponse::Success,
765 ),
766 ),
767 0,
768 )?;
769 let dereg_reason = request
770 .reason()
771 .map(|text| format!("client request: {}", text))
772 .unwrap_or_else(|| "client request".to_string());
773 self.deregister_agent_internal(agent_id, &dereg_reason);
774 Ok(None)
775 }
776 _ => {
777 Ok(None)
780 }
781 }
782 }
783 FeagiMessage::HeartBeat => {
784 self.send_message_to_agent(agent_id, FeagiMessage::HeartBeat, 0)?;
787 Ok(None)
788 }
789 FeagiMessage::AgentConfiguration(
790 AgentEmbodimentConfigurationMessage::AgentConfigurationDetails(device_def),
791 ) => {
792 let device_regs = serde_json::to_value(device_def).unwrap_or_else(|_| {
793 tracing::warn!(
794 target: "feagi-agent",
795 "Failed to serialize AgentConfigurationDetails to JSON"
796 );
797 serde_json::Value::Object(serde_json::Map::new())
798 });
799 self.set_device_registrations_by_agent(agent_id, device_regs.clone());
800 if let Some((descriptor, _)) = self.all_registered_agents.get(&agent_id) {
801 self.set_device_registrations_by_descriptor(
802 agent_id.to_base64(),
803 descriptor.clone(),
804 device_regs,
805 );
806 }
807 debug!(
808 target: "feagi-agent",
809 "Stored device registrations for agent {}",
810 agent_id.to_base64()
811 );
812 self.send_message_to_agent(agent_id, FeagiMessage::HeartBeat, 0)?;
814 Ok(None)
815 }
816 _ => {
817 Ok(Some((agent_id, message)))
819 }
820 }
821 }
822
823 fn register_agent(
828 &mut self,
829 agent_id: AgentID,
830 wanted_protocol: TransportProtocolImplementation,
831 agent_capabilities: Vec<AgentCapabilities>,
832 descriptor: AgentDescriptor,
833 command_server_index: CommandServerIndex,
834 ) -> Result<HashMap<AgentCapabilities, TransportProtocolEndpoint>, FeagiAgentError> {
835 let mut used_puller_indices: Vec<usize> = Vec::new();
846 let mut used_publisher_indices: Vec<usize> = Vec::new();
847 let mut sensor_servers: Vec<Box<dyn FeagiServerPuller>> = Vec::new();
848 let mut motor_servers: Vec<Box<dyn FeagiServerPublisher>> = Vec::new();
849 let mut visualizer_servers: Vec<Box<dyn FeagiServerPublisher>> = Vec::new();
850 let mut endpoint_mappings: HashMap<AgentCapabilities, TransportProtocolEndpoint> =
851 HashMap::new();
852
853 for agent_capability in &agent_capabilities {
855 match agent_capability {
856 AgentCapabilities::SendSensorData => {
857 let puller_property_index =
858 self.try_get_puller_property_index(&wanted_protocol)?;
859 let puller_property = &self.available_pullers[puller_property_index];
860 let mut sensor_server = puller_property.as_boxed_server_puller();
861 sensor_server.request_start()?;
862 sensor_servers.push(sensor_server);
863 endpoint_mappings.insert(
864 AgentCapabilities::SendSensorData,
865 puller_property.get_agent_endpoint(),
866 );
867 used_puller_indices.push(puller_property_index);
868 }
869 AgentCapabilities::ReceiveMotorData => {
870 let publisher_index =
871 self.try_get_publisher_property_index(&wanted_protocol)?;
872 let publisher_property = &self.available_publishers[publisher_index];
873 let mut publisher_server = publisher_property.as_boxed_server_publisher();
874 publisher_server.request_start()?;
875 motor_servers.push(publisher_server);
876 endpoint_mappings.insert(
877 AgentCapabilities::ReceiveMotorData,
878 publisher_property.get_agent_endpoint(),
879 );
880 used_publisher_indices.push(publisher_index);
881 }
882 AgentCapabilities::ReceiveNeuronVisualizations => {
883 let publisher_index =
886 self.try_get_last_publisher_property_index(&wanted_protocol)?;
887 let publisher_property = &self.available_publishers[publisher_index];
888 let mut publisher_server = publisher_property.as_boxed_server_publisher();
889 publisher_server.request_start()?;
890 visualizer_servers.push(publisher_server);
891 endpoint_mappings.insert(
892 AgentCapabilities::ReceiveNeuronVisualizations,
893 publisher_property.get_agent_endpoint(),
894 );
895 used_publisher_indices.push(publisher_index);
896 }
897 AgentCapabilities::ReceiveSystemMessages => {
898 todo!()
899 }
900 }
901 }
902
903 used_puller_indices.sort_unstable();
905 used_puller_indices.dedup();
906 for idx in used_puller_indices.into_iter().rev() {
907 self.available_pullers.remove(idx);
908 }
909
910 used_publisher_indices.sort_unstable();
911 used_publisher_indices.dedup();
912 for idx in used_publisher_indices.into_iter().rev() {
913 self.available_publishers.remove(idx);
914 }
915
916 for sensor_server in sensor_servers {
918 let sensor_translator: SensorTranslator =
919 SensorTranslator::new(agent_id, sensor_server);
920 self.sensors.insert(agent_id, sensor_translator);
921 }
922
923 for motor_server in motor_servers {
924 let motor_translator: MotorTranslator = MotorTranslator::new(agent_id, motor_server);
925 self.motors.insert(agent_id, motor_translator);
926 }
927
928 for visualizer_server in visualizer_servers {
929 let visualizer_translator: VisualizationTranslator =
930 VisualizationTranslator::new(agent_id, visualizer_server);
931 self.visualizations.insert(agent_id, visualizer_translator);
932 }
933
934 self.all_registered_agents
935 .insert(agent_id, (descriptor, agent_capabilities));
936 self.agent_mapping_to_command_control_server_index
937 .insert(agent_id, command_server_index);
938 self.last_activity_by_agent.insert(agent_id, Instant::now());
939
940 Ok(endpoint_mappings)
941 }
942
943 fn refresh_agent_activity(&mut self, agent_id: AgentID) {
948 self.last_activity_by_agent.insert(agent_id, Instant::now());
949 }
950
951 fn find_agent_id_by_descriptor(&self, descriptor: &AgentDescriptor) -> Option<AgentID> {
953 self.all_registered_agents
954 .iter()
955 .find_map(|(agent_id, (existing_descriptor, _))| {
956 if existing_descriptor == descriptor {
957 Some(*agent_id)
958 } else {
959 None
960 }
961 })
962 }
963
964 fn try_prune_stale_agents(&mut self) {
966 if self.last_stale_check_at.elapsed() < self.liveness_config.stale_check_interval {
967 return;
968 }
969 self.last_stale_check_at = Instant::now();
970
971 let stale_ids: Vec<AgentID> = self
972 .last_activity_by_agent
973 .iter()
974 .filter_map(|(agent_id, last_seen)| {
975 if last_seen.elapsed() > self.liveness_config.heartbeat_timeout {
976 Some(*agent_id)
977 } else {
978 None
979 }
980 })
981 .collect();
982
983 for stale_id in stale_ids {
984 let stale_reason = format!(
985 "stale heartbeat timeout exceeded ({:.3}s)",
986 self.liveness_config.heartbeat_timeout.as_secs_f64()
987 );
988 self.deregister_agent_internal(stale_id, &stale_reason);
989 }
990 }
991
992 fn deregister_agent_internal(&mut self, agent_id: AgentID, reason: &str) {
997 self.last_activity_by_agent.remove(&agent_id);
998 self.agent_mapping_to_command_control_server_index
999 .remove(&agent_id);
1000 let descriptor = self
1001 .all_registered_agents
1002 .remove(&agent_id)
1003 .map(|(descriptor, _)| descriptor);
1004 let descriptor_text = descriptor
1005 .as_ref()
1006 .map(|item| format!("{:?}", item))
1007 .unwrap_or_else(|| "<unknown-descriptor>".to_string());
1008 info!(
1009 target: "feagi-agent",
1010 "Agent deregistered: agent_id={} descriptor={} reason={}",
1011 agent_id.to_base64(),
1012 descriptor_text,
1013 reason
1014 );
1015 self.device_registrations_by_agent.remove(&agent_id);
1016
1017 if let Some(sensor) = self.sensors.remove(&agent_id) {
1018 self.available_pullers.push(sensor.into_puller_properties());
1019 }
1020 if let Some(motor) = self.motors.remove(&agent_id) {
1021 self.available_publishers
1022 .push(motor.into_publisher_properties());
1023 }
1024 if let Some(viz) = self.visualizations.remove(&agent_id) {
1025 self.available_publishers
1026 .push(viz.into_publisher_properties());
1027 }
1028
1029 if let Some(descriptor) = descriptor {
1030 self.agent_id_by_descriptor.remove(&descriptor);
1031 }
1035 }
1036
1037 }