use async_broadcast::Sender;
use async_std::task;
use crate::definitions::agent as definitions_agent;
use crate::definitions::task as definitions_task;
use crate::delegation::Task as DelTask;
use crate::knowledge_base::{KnowledgeBase, KnowledgeBaseInterface};
use crate::{agent, decision, delegation, execution, knowledge_base, module, utils, Result};
/// Configuration for this decision module.
///
/// The struct currently has no fields; it exists so the module satisfies the
/// `Options` associated type expected by `decision::Module`.
#[derive(Debug, Clone, Default)]
pub struct Options {}

impl Options
{
  /// Creates the (empty) option set. Equivalent to [`Options::default`].
  #[must_use]
  pub fn new() -> Self
  {
    Self::default()
  }
}
// Declares `ModulePrivateInterface` for the decision message types.
// The rest of this file relies on the generated type exposing an
// `input_receiver` (activated in `start`) and an `output_sender`.
// NOTE(review): the exact shape of the generated type is defined by the macro
// in `crate::module` — confirm there before relying on other members.
module::create_module_private_interface!(
ModulePrivateInterface,
decision::InputMessage,
decision::OutputMessage
);
pub struct Module {}
impl Module
{
async fn handle_input_message(
msg: decision::InputMessage,
agent_data: &agent::AgentData,
output_sender: &Sender<decision::OutputMessage>,
execution_input_sender: &Sender<execution::InputMessage>,
) -> Result<()>
{
match msg
{
decision::InputMessage::DecideCFPAcceptance { cfp } =>
{
let task = definitions_task::Task::from_description(&cfp.task_type, &cfp.task_description)?;
agent_data
.knowledge_base
.insert("tasks", cfp.uuid.to_hex(), &task)?;
match task.get_container()
{
definitions_task::TaskContainer::Tst(tst) =>
{
let sr = crate::simulation::tst::simulate_execution(
agent_data.states.to_owned_states()?,
agent_data.capabilities.clone(),
tst.clone(),
)
.await?;
output_sender
.broadcast(decision::OutputMessage::CFPProposal {
proposal: delegation::Proposal {
agent_uri: agent_data.agent_uri.to_owned(),
cost: sr.get_estimated_cost(),
signature: Default::default(),
task_uuid: task.task_id(),
},
})
.await;
}
}
}
decision::InputMessage::QueueExecution { uuid } =>
{
execution_input_sender
.broadcast(execution::InputMessage::QueueExecution { uuid })
.await;
output_sender
.broadcast(decision::OutputMessage::QueueExecutionResult {
uuid,
accepted: true,
})
.await;
}
decision::InputMessage::CancelExecution { uuid } =>
{
execution_input_sender
.broadcast(execution::InputMessage::CancelExecution {
uuid: uuid.to_owned(),
})
.await;
}
}
Ok(())
}
}
impl decision::Module for Module
{
type Options = Options;
fn start(
agent_data: agent::AgentData,
module_interfaces: (decision::ModuleInterface, ModulePrivateInterface),
execution_interface: execution::ModuleInterface,
_: Options,
) -> impl std::future::Future<Output = ()> + std::marker::Send + 'static
{
let (module_interface, module_private_interface) = module_interfaces;
let mut input_receiver = module_private_interface.input_receiver.activate();
let output_sender = module_private_interface.output_sender;
let execution_input_sender = execution_interface.input_sender();
async move {
loop
{
let msg = input_receiver.recv().await;
if let Ok(msg) = msg
{
if let Err(e) =
Self::handle_input_message(msg, &agent_data, &output_sender, &execution_input_sender)
.await
{
log::error!(
"An error occured for agent {} when handling decision message: {}",
agent_data.agent_uri,
e
);
}
}
else
{
log::info!("Decision loop for {:?} is closing", agent_data.agent_uri);
return;
}
}
}
}
}
// Registers this type with the generic `module::Module` trait by naming the
// decision message types and the private interface declared earlier in this file.
impl module::Module for Module
{
// Messages received by the decision loop.
type InputMessage = decision::InputMessage;
// Messages broadcast by the decision loop.
type OutputMessage = decision::OutputMessage;
// Interface type produced by `create_module_private_interface!` above.
type ModulePrivateInterface = ModulePrivateInterface;
}