use crate::command_and_control::agent_registration_message::RegistrationRequest;
use crate::command_and_control::FeagiMessage;
use crate::{AgentCapabilities, AgentDescriptor, FeagiAgentError};
use feagi_io::traits_and_enums::server::{FeagiServerRouter, FeagiServerRouterProperties};
use feagi_io::traits_and_enums::shared::FeagiEndpointState;
use feagi_io::{AgentID, FeagiNetworkError};
use feagi_serialization::FeagiByteContainer;
use std::collections::HashMap;
/// Flag returned alongside each decoded message: `true` when the sending session ID was not
/// present in the caller-supplied map of known sessions at the time the message was processed.
pub type IsNewSessionId = bool;
/// Wraps a [`FeagiServerRouter`] and translates between raw request/response bytes and
/// [`FeagiMessage`] values, reusing one buffer for incoming data and one for outgoing data.
pub struct CommandControlWrapper {
/// Router that is polled for incoming requests and used to publish responses.
router: Box<dyn FeagiServerRouter>,
/// Reused container that incoming request bytes are copied into (and verified) before decoding.
request_buffer: FeagiByteContainer,
/// Reused container that outgoing messages are serialized into before being published.
send_buffer: FeagiByteContainer,
}
impl CommandControlWrapper {
pub fn new(router: Box<dyn FeagiServerRouter>) -> Self {
Self {
router,
request_buffer: FeagiByteContainer::new_empty(),
send_buffer: FeagiByteContainer::new_empty(),
}
}
pub fn poll_for_incoming_messages(
&mut self,
known_session_ids: &HashMap<AgentID, (AgentDescriptor, Vec<AgentCapabilities>)>,
) -> Result<Option<(AgentID, FeagiMessage, IsNewSessionId)>, FeagiAgentError> {
let state = self.router.poll().clone();
match state {
FeagiEndpointState::Inactive => Ok(None),
FeagiEndpointState::Pending => Ok(None),
FeagiEndpointState::ActiveWaiting => Ok(None),
FeagiEndpointState::ActiveHasData => {
self.process_incoming_data_into_message(known_session_ids)
}
FeagiEndpointState::Errored(error) => {
match error {
FeagiNetworkError::CannotBind(err) => {
self.router.confirm_error_and_close()?;
Err(FeagiAgentError::SocketFailure(err.clone()))
}
FeagiNetworkError::CannotUnbind(err) => {
self.router.confirm_error_and_close()?;
Err(FeagiAgentError::SocketFailure(err.clone()))
}
FeagiNetworkError::CannotConnect(err) => {
Err(FeagiAgentError::UnableToSendData(err.clone()))
}
FeagiNetworkError::CannotDisconnect(err) => {
self.router.confirm_error_and_close()?;
Err(FeagiAgentError::SocketFailure(err.clone()))
}
FeagiNetworkError::SendFailed(err) => {
Err(FeagiAgentError::UnableToSendData(err.clone()))
}
FeagiNetworkError::ReceiveFailed(err) => {
Err(FeagiAgentError::UnableToDecodeReceivedData(err.clone()))
}
FeagiNetworkError::InvalidSocketProperties(err) => {
self.router.confirm_error_and_close()?;
Err(FeagiAgentError::SocketFailure(err.clone()))
}
FeagiNetworkError::SocketCreationFailed(err) => {
self.router.confirm_error_and_close()?;
Err(FeagiAgentError::SocketFailure(err.clone()))
}
FeagiNetworkError::GeneralFailure(err) => {
self.router.confirm_error_and_close()?;
Err(FeagiAgentError::SocketFailure(err.clone()))
}
}
}
}
}
pub fn send_message(
&mut self,
session_id: AgentID,
message: FeagiMessage,
increment_counter: u16,
) -> Result<(), FeagiAgentError> {
let container = &mut self.send_buffer;
message.serialize_to_byte_container(container, session_id, increment_counter)?;
self.router
.publish_response(session_id, container.get_byte_ref())?;
Ok(())
}
pub fn get_running_server_properties(&self) -> Box<dyn FeagiServerRouterProperties> {
self.router.as_boxed_router_properties()
}
fn process_incoming_data_into_message(
&mut self,
known_session_ids: &HashMap<AgentID, (AgentDescriptor, Vec<AgentCapabilities>)>,
) -> Result<Option<(AgentID, FeagiMessage, IsNewSessionId)>, FeagiAgentError> {
let (session_id, incoming_data) = self.router.consume_retrieved_request()?;
let min_len = FeagiByteContainer::GLOBAL_BYTE_HEADER_BYTE_COUNT
+ FeagiByteContainer::AGENT_ID_BYTE_COUNT;
if incoming_data.len() < min_len {
tracing::warn!(
"Rejecting command/control payload: {} bytes (minimum {} for 48-byte AgentDescriptor)",
incoming_data.len(),
min_len
);
return Ok(None);
}
let is_new_session = !known_session_ids.contains_key(&session_id);
if is_new_session {
if incoming_data.len() > RegistrationRequest::MAX_REQUEST_SIZE {
return Ok(None);
}
}
self.request_buffer
.try_write_data_by_copy_and_verify(incoming_data)?; let feagi_message: FeagiMessage = (&self.request_buffer).try_into()?;
Ok(Some((session_id, feagi_message, is_new_session)))
}
}