use std::borrow::Borrow;
use std::os::unix::thread;
use futures::FutureExt;
use log::info;
use yaaral::RuntimeInterface;
use crate::delegation::transport;
use crate::module::Module;
use crate::{
decision, definitions, delegation, execution, knowledge_base, module, states, utils, Result,
};
use delegation::Task;
/// Data bundle describing one agent.
///
/// `Agent::new` builds a single instance and hands a clone of it to the
/// delegation, decision and execution modules when starting them.
#[derive(Clone)]
pub struct AgentData
{
/// Identifier of the agent; sent along with start-delegation requests.
pub agent_uri: String,
/// Runtime on which the module tasks are spawned.
pub async_runtime: yaaral::Runtime,
/// Knowledge base implementation, stored as a boxed trait object.
pub knowledge_base: Box<dyn knowledge_base::KnowledgeBase>,
/// Shared wrapper around the agent states supplied at construction.
pub states: states::SharedStates,
/// Capabilities declared for this agent.
pub capabilities: definitions::agent::capabilities::Capabilities,
}
/// Private marker trait with `Sync + Send` supertraits.
///
/// Implementing it for `AgentData` makes the compiler reject any change that
/// would stop `AgentData` from being both `Send` and `Sync`.
trait AgentDataTrait: Sync + Send {}
// Compile-time check only: the impl has no items.
impl AgentDataTrait for AgentData {}
/// Handle on a running agent: owns the agent data and keeps the interface
/// used to talk to the delegation module.
pub struct Agent
{
/// Data describing this agent (clones of it are given to the modules).
agent_data: AgentData,
/// Interface of the delegation module; `delegate_task` takes its input
/// sender and output receiver from here.
delegation_interface:
module::ModuleInterface<delegation::InputMessage, delegation::OutputMessage>,
}
impl Agent
{
pub fn new<TTransport, TDecision, TExecution, TKnowledgeBase>(
agent_uri: String,
async_runtime: yaaral::Runtime,
knowledge_base: TKnowledgeBase,
states: crate::states::States,
capabilities: definitions::agent::capabilities::Capabilities,
transport_options: TTransport::Options,
decision_options: TDecision::Options,
executor_options: TExecution::Options,
) -> Result<Self>
where
TKnowledgeBase: knowledge_base::KnowledgeBase,
TTransport: delegation::transport::Module,
TDecision: decision::Module,
TExecution: execution::Module,
{
let agent_data = AgentData {
agent_uri: agent_uri,
async_runtime: async_runtime,
knowledge_base: Box::new(knowledge_base),
states: states::SharedStates::new(states),
capabilities,
};
let transport_interfaces = TTransport::prepare_interfaces(20);
let delegation_interfaces = delegation::Module::prepare_interfaces(20);
let decision_interfaces = TDecision::prepare_interfaces(20);
let executor_interfaces = TExecution::prepare_interfaces(20);
let transport_interface = transport_interfaces.0.clone();
let execution_interface = executor_interfaces.0.clone();
let delegation_interface = delegation_interfaces.0.clone();
let decision_interface = decision_interfaces.0.clone();
let transport_module = TTransport::start(
transport_interfaces,
delegation_interfaces.0.clone(),
transport_options,
)?;
let delegation_module = delegation::Module::start(
agent_data.to_owned(),
delegation_interfaces,
transport_interface,
decision_interface,
);
let decision_module = TDecision::start(
agent_data.to_owned(),
decision_interfaces,
execution_interface,
decision_options,
);
let execution_module =
TExecution::start(agent_data.to_owned(), executor_interfaces, executor_options);
agent_data.async_runtime.spawn_task(transport_module)?;
agent_data.async_runtime.spawn_task(delegation_module)?;
agent_data.async_runtime.spawn_task(decision_module)?;
agent_data.async_runtime.spawn_task(execution_module)?;
Ok(Self {
agent_data,
delegation_interface,
})
}
pub fn states_ref(&self) -> &crate::states::SharedStates
{
&self.agent_data.states
}
pub fn capabilities_ref(&self) -> &definitions::agent::capabilities::Capabilities
{
&self.agent_data.capabilities
}
pub fn delegate_task(
&self,
task: definitions::task::Task,
) -> impl std::future::Future<Output = Result<()>>
{
log::info!("Start delegation of task {:?}", task.task_id());
let agent_data = self.agent_data.clone();
let delegation_input_sender = self.delegation_interface.input_sender();
let mut delegation_output_recv = self.delegation_interface.output_receiver();
async move {
let task_uuid = task.task_id();
let fut = delegation_input_sender
.broadcast(delegation::InputMessage::create_start_delegation(
agent_data.agent_uri.to_owned(),
task,
None,
))
.await;
while let Ok(msg) = delegation_output_recv.recv().await
{
match msg
{
delegation::OutputMessage::Status { uuid, status } =>
{
if uuid == task_uuid
{
info!("Delegation {:?} received status {:?}", uuid, status);
}
}
}
}
Ok(())
}
}
}