use crate::command_and_control::agent_embodiment_configuration_message::AgentEmbodimentConfigurationMessage;
use crate::command_and_control::agent_registration_message::{
AgentRegistrationMessage, DeregistrationResponse, RegistrationResponse,
};
use crate::command_and_control::FeagiMessage;
use crate::server::auth::AgentAuth;
use crate::server::wrappers::{
CommandControlWrapper, MotorTranslator, SensorTranslator, VisualizationTranslator,
};
use crate::{AgentCapabilities, AgentDescriptor, FeagiAgentError};
use feagi_io::traits_and_enums::server::{
FeagiServerPublisher, FeagiServerPublisherProperties, FeagiServerPuller,
FeagiServerPullerProperties, FeagiServerRouterProperties,
};
use feagi_io::traits_and_enums::shared::{
TransportProtocolEndpoint, TransportProtocolImplementation,
};
use feagi_io::AgentID;
use feagi_serialization::FeagiByteContainer;
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};
use tracing::{error, info, warn};
/// Position of a command/control server inside
/// `FeagiAgentHandler::command_control_servers`.
type CommandServerIndex = usize;
/// Timing knobs that control how the handler detects and evicts idle agents.
#[derive(Debug, Clone)]
pub struct AgentLivenessConfig {
    /// Maximum time an agent may go without recorded activity before the
    /// stale sweep deregisters it.
    pub heartbeat_timeout: Duration,
    /// Minimum time between two consecutive stale-agent sweeps.
    pub stale_check_interval: Duration,
}

impl Default for AgentLivenessConfig {
    /// Defaults: 30 second heartbeat timeout, stale sweep at most once per second.
    fn default() -> Self {
        let heartbeat_timeout = Duration::from_secs(30);
        let stale_check_interval = Duration::from_secs(1);
        AgentLivenessConfig {
            heartbeat_timeout,
            stale_check_interval,
        }
    }
}
/// Server-side bookkeeping for connected agents: registration state,
/// command/control servers, per-agent data translators and liveness tracking.
pub struct FeagiAgentHandler {
/// Backend asked whether a registration request is allowed to connect.
agent_auth_backend: Box<dyn AgentAuth>,
/// Publisher properties not currently assigned to an agent; entries are
/// removed on registration and pushed back on deregistration.
available_publishers: Vec<Box<dyn FeagiServerPublisherProperties>>,
/// Puller properties not currently assigned to an agent; entries are
/// removed on registration and pushed back on deregistration.
available_pullers: Vec<Box<dyn FeagiServerPullerProperties>>,
/// Started command/control servers, addressed by `CommandServerIndex`.
command_control_servers: Vec<CommandControlWrapper>,
/// Descriptor and capability list for every registered agent.
all_registered_agents: HashMap<AgentID, (AgentDescriptor, Vec<AgentCapabilities>)>,
/// Which command/control server each agent registered through.
agent_mapping_to_command_control_server_index: HashMap<AgentID, CommandServerIndex>,
/// Time of the most recent recorded activity per agent (used for stale pruning).
last_activity_by_agent: HashMap<AgentID, Instant>,
/// Sensor translator per agent that registered `SendSensorData`.
sensors: HashMap<AgentID, SensorTranslator>,
/// Motor translator per agent that registered `ReceiveMotorData`.
motors: HashMap<AgentID, MotorTranslator>,
/// Visualization translator per agent that registered `ReceiveNeuronVisualizations`.
visualizations: HashMap<AgentID, VisualizationTranslator>,
/// Round-robin start position used by `poll_agent_sensors`.
sensor_poll_cursor: usize,
/// Heartbeat timeout and stale-sweep interval.
liveness_config: AgentLivenessConfig,
/// When the stale-agent sweep last ran.
last_stale_check_at: Instant,
/// Device registration JSON keyed by agent descriptor.
device_registrations_by_descriptor: HashMap<AgentDescriptor, serde_json::Value>,
/// Base64 agent ID string keyed by agent descriptor.
agent_id_by_descriptor: HashMap<AgentDescriptor, String>,
/// Device registration JSON keyed by agent ID.
device_registrations_by_agent: HashMap<AgentID, serde_json::Value>,
}
impl FeagiAgentHandler {
/// Returns `true` when both slices list the same capabilities, ignoring order.
///
/// The slices must have equal length and each must contain every element of
/// the other. Checking containment in both directions matters when a slice
/// holds duplicates: `[A, A]` must not be treated as equivalent to `[A, B]`.
#[allow(dead_code)]
fn capabilities_equivalent(
    existing: &[AgentCapabilities],
    requested: &[AgentCapabilities],
) -> bool {
    existing.len() == requested.len()
        && existing
            .iter()
            .all(|capability| requested.contains(capability))
        && requested
            .iter()
            .all(|capability| existing.contains(capability))
}
/// Decides whether a new registration may displace the session currently
/// holding the same descriptor.
///
/// Returns `true` when the existing agent has no recorded activity, or when
/// its last activity is older than twice the stale-check interval (falling
/// back to a single interval if doubling overflows).
#[allow(dead_code)]
fn should_replace_existing_descriptor_session(&self, existing_agent_id: AgentID) -> bool {
    match self.last_activity_by_agent.get(&existing_agent_id) {
        None => true,
        Some(last_seen) => {
            let interval = self.liveness_config.stale_check_interval;
            let guard_window = interval.checked_mul(2).unwrap_or(interval);
            last_seen.elapsed() > guard_window
        }
    }
}
/// Creates a handler that uses the default [`AgentLivenessConfig`].
pub fn new(agent_auth_backend: Box<dyn AgentAuth>) -> FeagiAgentHandler {
    let liveness_config = AgentLivenessConfig::default();
    Self::new_with_liveness_config(agent_auth_backend, liveness_config)
}
pub fn new_with_liveness_config(
agent_auth_backend: Box<dyn AgentAuth>,
liveness_config: AgentLivenessConfig,
) -> FeagiAgentHandler {
FeagiAgentHandler {
agent_auth_backend,
available_publishers: Vec::new(),
available_pullers: Vec::new(),
command_control_servers: Vec::new(),
all_registered_agents: HashMap::new(),
agent_mapping_to_command_control_server_index: HashMap::new(),
last_activity_by_agent: HashMap::new(),
sensors: Default::default(),
motors: Default::default(),
visualizations: Default::default(),
sensor_poll_cursor: 0,
liveness_config,
last_stale_check_at: Instant::now(),
device_registrations_by_descriptor: HashMap::new(),
agent_id_by_descriptor: HashMap::new(),
device_registrations_by_agent: HashMap::new(),
}
}
/// Borrows the map of every registered agent to its descriptor and capabilities.
pub fn get_all_registered_agents(
&self,
) -> &HashMap<AgentID, (AgentDescriptor, Vec<AgentCapabilities>)> {
&self.all_registered_agents
}
/// Returns the IDs of all agents that currently have a sensor translator.
pub fn get_all_registered_sensors(&self) -> HashSet<AgentID> {
    let sensor_agent_ids = self.sensors.keys().copied();
    sensor_agent_ids.collect()
}
/// Returns the IDs of all agents that currently have a motor translator.
pub fn get_all_registered_motors(&self) -> HashSet<AgentID> {
    let motor_agent_ids = self.motors.keys().copied();
    motor_agent_ids.collect()
}
/// Returns the IDs of all agents that currently have a visualization translator.
pub fn get_all_registered_visualizations(&self) -> HashSet<AgentID> {
    let visualization_agent_ids = self.visualizations.keys().copied();
    visualization_agent_ids.collect()
}
pub fn register_logical_agent(
&mut self,
agent_id: AgentID,
descriptor: AgentDescriptor,
capabilities: Vec<AgentCapabilities>,
) {
self.all_registered_agents
.insert(agent_id, (descriptor, capabilities));
self.last_activity_by_agent.insert(agent_id, Instant::now());
}
/// Deregisters every registered agent, logging `reason` for each.
///
/// Returns the base64 form of each removed agent ID.
pub fn force_deregister_all_agents(&mut self, reason: &str) -> Vec<String> {
    // Snapshot the keys first: deregistration mutates the map being listed.
    let agent_ids: Vec<AgentID> = self.all_registered_agents.keys().copied().collect();
    agent_ids
        .into_iter()
        .map(|agent_id| {
            let encoded_id = agent_id.to_base64();
            self.deregister_agent_internal(agent_id, reason);
            encoded_id
        })
        .collect()
}
/// Collects the running-server properties of every command/control server,
/// in the order the servers were added.
pub fn get_command_control_server_info(&self) -> Vec<Box<dyn FeagiServerRouterProperties>> {
    self.command_control_servers
        .iter()
        .map(|server| server.get_running_server_properties())
        .collect()
}
/// Looks up the device registration JSON stored for `agent_id`, if any.
pub fn get_device_registrations_by_agent(
&self,
agent_id: AgentID,
) -> Option<&serde_json::Value> {
self.device_registrations_by_agent.get(&agent_id)
}
/// Stores device registrations under `agent_descriptor` and remembers which
/// base64 agent ID last supplied them. Existing entries for the descriptor
/// are overwritten.
pub fn set_device_registrations_by_descriptor(
    &mut self,
    agent_id_base64: String,
    agent_descriptor: AgentDescriptor,
    device_registrations: serde_json::Value,
) {
    // The descriptor keys two maps, so one copy is needed.
    let descriptor_key = agent_descriptor.clone();
    self.agent_id_by_descriptor
        .insert(agent_descriptor, agent_id_base64);
    self.device_registrations_by_descriptor
        .insert(descriptor_key, device_registrations);
}
/// Looks up the device registration JSON stored for `agent_descriptor`, if any.
pub fn get_device_registrations_by_descriptor(
&self,
agent_descriptor: &AgentDescriptor,
) -> Option<&serde_json::Value> {
self.device_registrations_by_descriptor
.get(agent_descriptor)
}
/// Stores (or overwrites) the device registration JSON for `agent_id`.
pub fn set_device_registrations_by_agent(
&mut self,
agent_id: AgentID,
device_registrations: serde_json::Value,
) {
self.device_registrations_by_agent
.insert(agent_id, device_registrations);
}
/// Returns the agent's base64 ID string and requested visualization rate.
///
/// Reads `visualization.rate_hz` from the agent's stored device
/// registrations. Yields `None` when any lookup step is missing, when the
/// rate is not a number, or when it is not strictly positive.
pub fn get_visualization_info_for_agent(&self, agent_id: AgentID) -> Option<(String, f64)> {
    let rate_hz = self
        .device_registrations_by_agent
        .get(&agent_id)?
        .get("visualization")?
        .get("rate_hz")
        .and_then(|value| value.as_f64())
        .filter(|hz| *hz > 0.0)?;
    let (descriptor, _) = self.all_registered_agents.get(&agent_id)?;
    let agent_id_text = self.agent_id_by_descriptor.get(descriptor)?.clone();
    Some((agent_id_text, rate_hz))
}
/// Builds a router from `router_property`, requests its start, and adds it
/// to the list of command/control servers.
///
/// # Errors
/// Propagates the error from `request_start`; the server is not added then.
pub fn add_and_start_command_control_server(
    &mut self,
    router_property: Box<dyn FeagiServerRouterProperties>,
) -> Result<(), FeagiAgentError> {
    let mut server_router = router_property.as_boxed_server_router();
    server_router.request_start()?;
    self.command_control_servers
        .push(CommandControlWrapper::new(server_router));
    Ok(())
}
/// Adds a publisher to the pool that registering agents can draw from.
pub fn add_publisher_server(&mut self, publisher: Box<dyn FeagiServerPublisherProperties>) {
self.available_publishers.push(publisher);
}
/// Adds a puller to the pool that registering agents can draw from.
pub fn add_puller_server(&mut self, puller: Box<dyn FeagiServerPullerProperties>) {
self.available_pullers.push(puller);
}
/// Polls the command/control servers in order and handles the first
/// message found.
///
/// A stale-agent sweep is attempted first. Messages flagged as coming from
/// a new agent go to the unknown-agent handler, all others to the
/// known-agent handler. Returns `Ok(None)` when no server had a message or
/// the handler consumed it; returns `Ok(Some(..))` when the known-agent
/// handler hands the message back to the caller.
///
/// # Errors
/// Propagates polling and handler errors.
pub fn poll_command_and_control(
    &mut self,
) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
    self.try_prune_stale_agents();
    for (command_index, translator) in self.command_control_servers.iter_mut().enumerate() {
        let Some((agent_id, message, is_new_agent)) =
            translator.poll_for_incoming_messages(&self.all_registered_agents)?
        else {
            continue;
        };
        // Only one message is processed per call.
        return if is_new_agent {
            self.handle_messages_from_unknown_agent_ids(agent_id, &message, command_index)
        } else {
            self.handle_messages_from_known_agent_ids(agent_id, message)
        };
    }
    Ok(None)
}
/// Sends `message` to a registered agent through the command/control server
/// it registered on.
///
/// # Errors
/// * `FeagiAgentError::Other` if `agent_id` has no command-server mapping.
/// * `FeagiAgentError::Other` if the mapped server index no longer exists.
///   This used to panic; it is now reported the same way
///   `send_message_via_command_server` reports it.
/// * Any error returned while sending.
pub fn send_message_to_agent(
    &mut self,
    agent_id: AgentID,
    message: FeagiMessage,
    increment_counter: u16,
) -> Result<(), FeagiAgentError> {
    let command_server_index = *self
        .agent_mapping_to_command_control_server_index
        .get(&agent_id)
        .ok_or_else(|| FeagiAgentError::Other("No such Agent ID exists!".to_string()))?;
    self.send_message_via_command_server(
        command_server_index,
        agent_id,
        message,
        increment_counter,
    )
}
/// Sends `message` to `session_id` through the command/control server at
/// `command_server_index`, without requiring the session to be registered.
///
/// # Errors
/// `FeagiAgentError::Other` when the index is out of range, otherwise
/// whatever the send itself returns.
fn send_message_via_command_server(
    &mut self,
    command_server_index: CommandServerIndex,
    session_id: AgentID,
    message: FeagiMessage,
    increment_counter: u16,
) -> Result<(), FeagiAgentError> {
    match self.command_control_servers.get_mut(command_server_index) {
        Some(command_server) => {
            command_server.send_message(session_id, message, increment_counter)
        }
        None => Err(FeagiAgentError::Other(
            "Missing command control server index".to_string(),
        )),
    }
}
/// Passes `data` to the agent's motor translator and, on success, records
/// activity for the agent.
///
/// # Errors
/// `FeagiAgentError::Other` when the agent has no motor translator;
/// otherwise the translator's error is propagated.
pub fn send_motor_data_to_agent(
    &mut self,
    agent_id: AgentID,
    data: &FeagiByteContainer,
) -> Result<(), FeagiAgentError> {
    let Some(translator) = self.motors.get_mut(&agent_id) else {
        return Err(FeagiAgentError::Other("No Agent ID exists!".to_string()));
    };
    translator.poll_and_send_buffered_motor_data(data)?;
    self.refresh_agent_activity(agent_id);
    Ok(())
}
/// Passes `data` to the agent's visualization translator and, on success,
/// records activity for the agent.
///
/// # Errors
/// `FeagiAgentError::Other` when the agent has no visualization translator;
/// otherwise the translator's error is propagated.
pub fn send_visualization_data_to_agent(
    &mut self,
    agent_id: AgentID,
    data: &FeagiByteContainer,
) -> Result<(), FeagiAgentError> {
    let Some(translator) = self.visualizations.get_mut(&agent_id) else {
        return Err(FeagiAgentError::Other("No Agent ID exists!".to_string()));
    };
    translator.poll_and_send_visualization_data(data)?;
    self.refresh_agent_activity(agent_id);
    Ok(())
}
/// Polls sensor translators round-robin and returns the first payload found.
///
/// Agents are visited in base64-ID order, starting at `sensor_poll_cursor`.
/// When a sensor yields data the cursor moves just past it and activity is
/// recorded for that agent. When nothing is available the cursor advances
/// by one so the next call starts at a different agent.
///
/// # Errors
/// Propagates the first polling error. The cursor is moved past the failing
/// sensor before returning, so a persistently failing sensor cannot block
/// the sensors that follow it on later calls.
pub fn poll_agent_sensors(&mut self) -> Result<Option<FeagiByteContainer>, FeagiAgentError> {
    let mut sensor_ids: Vec<AgentID> = self.sensors.keys().copied().collect();
    if sensor_ids.is_empty() {
        return Ok(None);
    }
    // Map iteration order is unspecified; sort for a stable rotation. The
    // cached-key sort builds each base64 key once instead of per comparison.
    sensor_ids.sort_by_cached_key(|agent_id| agent_id.to_base64());
    let count = sensor_ids.len();
    let start = self.sensor_poll_cursor % count;
    for offset in 0..count {
        let idx = (start + offset) % count;
        let agent_id = sensor_ids[idx];
        let Some(translator) = self.sensors.get_mut(&agent_id) else {
            continue;
        };
        let polled_data = match translator.poll_sensor_server() {
            Ok(data) => data.cloned(),
            Err(err) => {
                // Skip the failing sensor next time to keep the rotation fair.
                self.sensor_poll_cursor = (idx + 1) % count;
                return Err(err.into());
            }
        };
        if let Some(data) = polled_data {
            self.sensor_poll_cursor = (idx + 1) % count;
            self.refresh_agent_activity(agent_id);
            return Ok(Some(data));
        }
    }
    self.sensor_poll_cursor = (start + 1) % count;
    Ok(None)
}
/// Polls every motor translator once, stopping at the first error.
pub fn poll_agent_motors(&mut self) -> Result<(), FeagiAgentError> {
    for motor_translator in self.motors.values_mut() {
        motor_translator.poll_motor_server()?;
    }
    Ok(())
}
/// Polls every visualization translator once, stopping at the first error.
pub fn poll_agent_visualizers(&mut self) -> Result<(), FeagiAgentError> {
    for visualization_translator in self.visualizations.values_mut() {
        visualization_translator.poll_visualization_server()?;
    }
    Ok(())
}
/// Passes `motor_data` to the agent's motor translator and, on success,
/// records activity for the agent.
///
/// # Errors
/// `FeagiAgentError::UnableToSendData` when the agent has no motor
/// translator; otherwise the translator's error is propagated.
pub fn send_motor_data(
    &mut self,
    agent_id: AgentID,
    motor_data: &FeagiByteContainer,
) -> Result<(), FeagiAgentError> {
    let motor_translator = self.motors.get_mut(&agent_id).ok_or_else(|| {
        FeagiAgentError::UnableToSendData("Nonexistant Agent ID!".to_string())
    })?;
    motor_translator.poll_and_send_buffered_motor_data(motor_data)?;
    self.refresh_agent_activity(agent_id);
    Ok(())
}
/// Passes `viz_data` to the agent's visualization translator and, on
/// success, records activity for the agent.
///
/// # Errors
/// `FeagiAgentError::UnableToSendData` when the agent has no visualization
/// translator; otherwise the translator's error is propagated.
pub fn send_visualization_data(
    &mut self,
    agent_id: AgentID,
    viz_data: &FeagiByteContainer,
) -> Result<(), FeagiAgentError> {
    let visualization_translator =
        self.visualizations.get_mut(&agent_id).ok_or_else(|| {
            FeagiAgentError::UnableToSendData("Nonexistant Agent ID!".to_string())
        })?;
    visualization_translator.poll_and_send_visualization_data(viz_data)?;
    self.refresh_agent_activity(agent_id);
    Ok(())
}
/// Finds the first available puller whose bind point uses `wanted_protocol`.
///
/// # Errors
/// `FeagiAgentError::InitFail` when no available puller matches.
fn try_get_puller_property_index(
    &mut self,
    wanted_protocol: &TransportProtocolImplementation,
) -> Result<usize, FeagiAgentError> {
    self.available_pullers
        .iter()
        .position(|puller| {
            &puller
                .get_bind_point()
                .as_transport_protocol_implementation()
                == wanted_protocol
        })
        .ok_or_else(|| FeagiAgentError::InitFail("Missing required protocol puller".to_string()))
}
/// Finds the first available publisher that uses `wanted_protocol`.
///
/// # Errors
/// `FeagiAgentError::InitFail` when no available publisher matches.
fn try_get_publisher_property_index(
    &mut self,
    wanted_protocol: &TransportProtocolImplementation,
) -> Result<usize, FeagiAgentError> {
    self.available_publishers
        .iter()
        .position(|publisher| &publisher.get_protocol() == wanted_protocol)
        .ok_or_else(|| {
            FeagiAgentError::InitFail("Missing required protocol publisher".to_string())
        })
}
/// Finds the last available publisher that uses `wanted_protocol`
/// (searching from the end of the list).
///
/// # Errors
/// `FeagiAgentError::InitFail` when no available publisher matches.
fn try_get_last_publisher_property_index(
    &mut self,
    wanted_protocol: &TransportProtocolImplementation,
) -> Result<usize, FeagiAgentError> {
    self.available_publishers
        .iter()
        .rposition(|publisher| &publisher.get_protocol() == wanted_protocol)
        .ok_or_else(|| {
            FeagiAgentError::InitFail("Missing required protocol publisher".to_string())
        })
}
fn handle_messages_from_unknown_agent_ids(
&mut self,
agent_id: AgentID,
message: &FeagiMessage,
command_control_index: CommandServerIndex,
) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
match &message {
FeagiMessage::AgentRegistration(register_message) => {
match ®ister_message {
AgentRegistrationMessage::ClientRequestRegistration(registration_request) => {
info!(
target: "feagi-agent",
"WS registration request received: session={} descriptor={:?} caps={:?} protocol={:?}",
agent_id.to_base64(),
registration_request.agent_descriptor(),
registration_request.requested_capabilities(),
registration_request.connection_protocol()
);
let auth_result = self
.agent_auth_backend
.verify_agent_allowed_to_connect(registration_request);
if auth_result.is_err() {
warn!(
target: "feagi-agent",
"WS registration rejected by auth backend: session={} descriptor={:?}",
agent_id.to_base64(),
registration_request.agent_descriptor()
);
self.send_message_via_command_server(
command_control_index,
agent_id,
FeagiMessage::AgentRegistration(
AgentRegistrationMessage::ServerRespondsRegistration(
RegistrationResponse::FailedInvalidAuth,
),
),
0,
)?;
return Ok(None);
}
if let Some(existing_agent_id) = self
.find_agent_id_by_descriptor(registration_request.agent_descriptor())
{
if let Some((_, existing_capabilities)) =
self.all_registered_agents.get(&existing_agent_id)
{
if !Self::capabilities_equivalent(
existing_capabilities,
registration_request.requested_capabilities(),
) {
info!(
target: "feagi-agent",
"Rejecting descriptor-collision registration for {:?}: existing session {} has different capabilities",
registration_request.agent_descriptor(),
existing_agent_id.to_base64()
);
self.send_message_via_command_server(
command_control_index,
agent_id,
FeagiMessage::AgentRegistration(
AgentRegistrationMessage::ServerRespondsRegistration(
RegistrationResponse::AlreadyRegistered,
),
),
0,
)?;
return Ok(None);
}
}
if !self.should_replace_existing_descriptor_session(existing_agent_id) {
info!(
target: "feagi-agent",
"Ignoring duplicate registration for descriptor {:?}: existing session {} remains active",
registration_request.agent_descriptor(),
existing_agent_id.to_base64()
);
self.send_message_via_command_server(
command_control_index,
agent_id,
FeagiMessage::AgentRegistration(
AgentRegistrationMessage::ServerRespondsRegistration(
RegistrationResponse::AlreadyRegistered,
),
),
0,
)?;
return Ok(None);
}
let replacement_reason = format!(
"descriptor replacement by new registration session={}",
agent_id.to_base64()
);
self.deregister_agent_internal(existing_agent_id, &replacement_reason);
}
let mappings = match self.register_agent(
agent_id,
*registration_request.connection_protocol(),
registration_request.requested_capabilities().to_vec(),
registration_request.agent_descriptor().clone(),
command_control_index,
) {
Ok(mappings) => mappings,
Err(_) => {
error!(
target: "feagi-agent",
"WS registration failed while creating transport mappings: session={} descriptor={:?}",
agent_id.to_base64(),
registration_request.agent_descriptor()
);
self.send_message_via_command_server(
command_control_index,
agent_id,
FeagiMessage::AgentRegistration(
AgentRegistrationMessage::ServerRespondsRegistration(
RegistrationResponse::FailedInvalidRequest,
),
),
0,
)?;
return Ok(None);
}
};
let mapped_caps: Vec<_> = mappings.keys().cloned().collect();
let response = RegistrationResponse::Success(agent_id, mappings);
let response_message = FeagiMessage::AgentRegistration(
AgentRegistrationMessage::ServerRespondsRegistration(response),
);
self.send_message_via_command_server(
command_control_index,
agent_id,
response_message,
0,
)?;
info!(
target: "feagi-agent",
"WS registration success response sent: session={} descriptor={:?} mapped_caps={:?}",
agent_id.to_base64(),
registration_request.agent_descriptor(),
mapped_caps
);
Ok(None)
}
AgentRegistrationMessage::ClientRequestDeregistration(_) => {
let response = FeagiMessage::AgentRegistration(
AgentRegistrationMessage::ServerRespondsDeregistration(
DeregistrationResponse::NotRegistered,
),
);
self.send_message_via_command_server(
command_control_index,
agent_id,
response,
0,
)?;
Ok(None)
}
_ => {
Ok(None)
}
}
}
_ => {
Ok(None)
}
}
}
/// Handles a message from an already registered agent.
///
/// Activity is recorded for the agent first. Then:
/// * Deregistration request: replies `Success`, then deregisters the agent.
/// * Other registration messages: ignored.
/// * Heartbeat: echoed back.
/// * Agent configuration details: serialized to JSON (an empty object if
///   serialization fails), stored by agent ID and, when the agent has a
///   registration entry, by descriptor; a heartbeat is sent as the reply.
/// * Anything else: returned to the caller as `Ok(Some((agent_id, message)))`.
///
/// # Errors
/// Propagates errors from sending replies.
fn handle_messages_from_known_agent_ids(
    &mut self,
    agent_id: AgentID,
    message: FeagiMessage,
) -> Result<Option<(AgentID, FeagiMessage)>, FeagiAgentError> {
    self.refresh_agent_activity(agent_id);
    match &message {
        FeagiMessage::AgentRegistration(
            AgentRegistrationMessage::ClientRequestDeregistration(request),
        ) => {
            let success_reply = FeagiMessage::AgentRegistration(
                AgentRegistrationMessage::ServerRespondsDeregistration(
                    DeregistrationResponse::Success,
                ),
            );
            // Reply before tearing down the mapping the reply is routed through.
            self.send_message_to_agent(agent_id, success_reply, 0)?;
            let dereg_reason = match request.reason() {
                Some(text) => format!("client request: {}", text),
                None => "client request".to_string(),
            };
            self.deregister_agent_internal(agent_id, &dereg_reason);
            Ok(None)
        }
        FeagiMessage::AgentRegistration(_) => Ok(None),
        FeagiMessage::HeartBeat => {
            self.send_message_to_agent(agent_id, FeagiMessage::HeartBeat, 0)?;
            Ok(None)
        }
        FeagiMessage::AgentConfiguration(
            AgentEmbodimentConfigurationMessage::AgentConfigurationDetails(device_def),
        ) => {
            let device_regs = match serde_json::to_value(device_def) {
                Ok(value) => value,
                Err(_) => {
                    tracing::warn!(
                        target: "feagi-agent",
                        "Failed to serialize AgentConfigurationDetails to JSON"
                    );
                    serde_json::Value::Object(serde_json::Map::new())
                }
            };
            self.set_device_registrations_by_agent(agent_id, device_regs.clone());
            let known_descriptor = self
                .all_registered_agents
                .get(&agent_id)
                .map(|(descriptor, _)| descriptor.clone());
            if let Some(descriptor) = known_descriptor {
                self.set_device_registrations_by_descriptor(
                    agent_id.to_base64(),
                    descriptor,
                    device_regs,
                );
            }
            info!(
                target: "feagi-agent",
                "Stored device registrations for agent {}",
                agent_id.to_base64()
            );
            self.send_message_to_agent(agent_id, FeagiMessage::HeartBeat, 0)?;
            Ok(None)
        }
        _ => Ok(Some((agent_id, message))),
    }
}
fn register_agent(
&mut self,
agent_id: AgentID,
wanted_protocol: TransportProtocolImplementation,
agent_capabilities: Vec<AgentCapabilities>,
descriptor: AgentDescriptor,
command_server_index: CommandServerIndex,
) -> Result<HashMap<AgentCapabilities, TransportProtocolEndpoint>, FeagiAgentError> {
let mut used_puller_indices: Vec<usize> = Vec::new();
let mut used_publisher_indices: Vec<usize> = Vec::new();
let mut sensor_servers: Vec<Box<dyn FeagiServerPuller>> = Vec::new();
let mut motor_servers: Vec<Box<dyn FeagiServerPublisher>> = Vec::new();
let mut visualizer_servers: Vec<Box<dyn FeagiServerPublisher>> = Vec::new();
let mut endpoint_mappings: HashMap<AgentCapabilities, TransportProtocolEndpoint> =
HashMap::new();
for agent_capability in &agent_capabilities {
match agent_capability {
AgentCapabilities::SendSensorData => {
let puller_property_index =
self.try_get_puller_property_index(&wanted_protocol)?;
let puller_property = &self.available_pullers[puller_property_index];
let mut sensor_server = puller_property.as_boxed_server_puller();
sensor_server.request_start()?;
sensor_servers.push(sensor_server);
endpoint_mappings.insert(
AgentCapabilities::SendSensorData,
puller_property.get_agent_endpoint(),
);
used_puller_indices.push(puller_property_index);
}
AgentCapabilities::ReceiveMotorData => {
let publisher_index =
self.try_get_publisher_property_index(&wanted_protocol)?;
let publisher_property = &self.available_publishers[publisher_index];
let mut publisher_server = publisher_property.as_boxed_server_publisher();
publisher_server.request_start()?;
motor_servers.push(publisher_server);
endpoint_mappings.insert(
AgentCapabilities::ReceiveMotorData,
publisher_property.get_agent_endpoint(),
);
used_publisher_indices.push(publisher_index);
}
AgentCapabilities::ReceiveNeuronVisualizations => {
let publisher_index =
self.try_get_last_publisher_property_index(&wanted_protocol)?;
let publisher_property = &self.available_publishers[publisher_index];
let mut publisher_server = publisher_property.as_boxed_server_publisher();
publisher_server.request_start()?;
visualizer_servers.push(publisher_server);
endpoint_mappings.insert(
AgentCapabilities::ReceiveNeuronVisualizations,
publisher_property.get_agent_endpoint(),
);
used_publisher_indices.push(publisher_index);
}
AgentCapabilities::ReceiveSystemMessages => {
todo!()
}
}
}
used_puller_indices.sort_unstable();
used_puller_indices.dedup();
for idx in used_puller_indices.into_iter().rev() {
self.available_pullers.remove(idx);
}
used_publisher_indices.sort_unstable();
used_publisher_indices.dedup();
for idx in used_publisher_indices.into_iter().rev() {
self.available_publishers.remove(idx);
}
for sensor_server in sensor_servers {
let sensor_translator: SensorTranslator =
SensorTranslator::new(agent_id, sensor_server);
self.sensors.insert(agent_id, sensor_translator);
}
for motor_server in motor_servers {
let motor_translator: MotorTranslator = MotorTranslator::new(agent_id, motor_server);
self.motors.insert(agent_id, motor_translator);
}
for visualizer_server in visualizer_servers {
let visualizer_translator: VisualizationTranslator =
VisualizationTranslator::new(agent_id, visualizer_server);
self.visualizations.insert(agent_id, visualizer_translator);
}
self.all_registered_agents
.insert(agent_id, (descriptor, agent_capabilities));
self.agent_mapping_to_command_control_server_index
.insert(agent_id, command_server_index);
self.last_activity_by_agent.insert(agent_id, Instant::now());
Ok(endpoint_mappings)
}
/// Stamps `agent_id` with the current time as its latest activity.
/// Inserts an entry if the agent had none.
fn refresh_agent_activity(&mut self, agent_id: AgentID) {
self.last_activity_by_agent.insert(agent_id, Instant::now());
}
/// Returns the ID of a registered agent whose descriptor equals `descriptor`,
/// or `None` when no registered agent matches.
fn find_agent_id_by_descriptor(&self, descriptor: &AgentDescriptor) -> Option<AgentID> {
    self.all_registered_agents
        .iter()
        .find(|(_, (existing_descriptor, _))| existing_descriptor == descriptor)
        .map(|(agent_id, _)| *agent_id)
}
/// Deregisters agents whose last activity is older than the heartbeat timeout.
///
/// Does nothing unless at least `stale_check_interval` has elapsed since the
/// previous sweep.
fn try_prune_stale_agents(&mut self) {
    let check_interval = self.liveness_config.stale_check_interval;
    if self.last_stale_check_at.elapsed() < check_interval {
        return;
    }
    self.last_stale_check_at = Instant::now();
    let timeout = self.liveness_config.heartbeat_timeout;
    // Collect first: deregistration mutates the map being scanned.
    let stale_ids: Vec<AgentID> = self
        .last_activity_by_agent
        .iter()
        .filter(|(_, last_seen)| last_seen.elapsed() > timeout)
        .map(|(agent_id, _)| *agent_id)
        .collect();
    for stale_id in stale_ids {
        let stale_reason = format!(
            "stale heartbeat timeout exceeded ({:.3}s)",
            timeout.as_secs_f64()
        );
        self.deregister_agent_internal(stale_id, &stale_reason);
    }
}
/// Removes all per-agent state for `agent_id` and logs `reason`.
///
/// Clears the activity timestamp, command-server mapping, registration entry
/// and per-agent device registrations. The agent's sensor, motor and
/// visualization translators are converted back into properties and returned
/// to the available pools. The descriptor-to-agent-ID entry is removed when
/// the agent had a registration entry. Safe to call for an unknown agent.
fn deregister_agent_internal(&mut self, agent_id: AgentID, reason: &str) {
    self.last_activity_by_agent.remove(&agent_id);
    self.agent_mapping_to_command_control_server_index
        .remove(&agent_id);
    let removed_descriptor = self
        .all_registered_agents
        .remove(&agent_id)
        .map(|(descriptor, _)| descriptor);
    let descriptor_text = match &removed_descriptor {
        Some(item) => format!("{:?}", item),
        None => "<unknown-descriptor>".to_string(),
    };
    info!(
        target: "feagi-agent",
        "Agent deregistered: agent_id={} descriptor={} reason={}",
        agent_id.to_base64(),
        descriptor_text,
        reason
    );
    self.device_registrations_by_agent.remove(&agent_id);
    // Hand the transport properties back so later registrations can reuse them.
    let freed_puller = self
        .sensors
        .remove(&agent_id)
        .map(|sensor| sensor.into_puller_properties());
    self.available_pullers.extend(freed_puller);
    let freed_motor_publisher = self
        .motors
        .remove(&agent_id)
        .map(|motor| motor.into_publisher_properties());
    self.available_publishers.extend(freed_motor_publisher);
    let freed_viz_publisher = self
        .visualizations
        .remove(&agent_id)
        .map(|viz| viz.into_publisher_properties());
    self.available_publishers.extend(freed_viz_publisher);
    if let Some(descriptor) = removed_descriptor {
        self.agent_id_by_descriptor.remove(&descriptor);
    }
}
}