use async_broadcast::Sender;
use futures::join;
use crate::definitions::task as definitions_task;
use crate::delegation::Task;
use crate::knowledge_base::KnowledgeBaseInterface;
use crate::{agent, delegation, execution, module, utils, uuid, Error, Result};
mod task_executor;
pub trait TaskExecutor: Sync + Send + 'static
{
fn execute_task(
&self,
task: definitions_task::Task,
) -> futures::future::BoxFuture<'static, Result<()>>;
fn can_execute(&self, task: &definitions_task::Task) -> bool;
}
pub struct Options
{
task_executor: Box<dyn TaskExecutor>,
}
impl Options
{
pub fn new<TTaskExecutor>(task_executor: TTaskExecutor) -> Self
where
TTaskExecutor: TaskExecutor,
{
Self {
task_executor: Box::new(task_executor),
}
}
}
module::create_module_private_interface!(
ModulePrivateInterface,
execution::InputMessage,
execution::OutputMessage
);
struct ModuleData
{
cancelled_tasks: utils::ArcMutex<Vec<uuid::Uuid>>,
task_sender: Sender<definitions_task::Task>,
}
pub struct Module {}
impl Module
{
async fn handle_input_message(
msg: execution::InputMessage,
agent_data: &agent::AgentData,
module_data: &ModuleData,
output_sender: &Sender<execution::OutputMessage>,
) -> Result<()>
{
match msg
{
execution::InputMessage::QueueExecution { uuid } =>
{
let task = agent_data
.knowledge_base
.retrieve::<definitions_task::Task>("tasks", uuid.to_hex())?;
module_data.task_sender.broadcast(task).await;
}
execution::InputMessage::CancelExecution { uuid } =>
{
module_data.cancelled_tasks.lock()?.push(uuid);
}
}
Ok(())
}
fn handle_task_execution(
task: definitions_task::Task,
cancelled_tasks: &utils::ArcMutex<Vec<uuid::Uuid>>,
task_executor: &Box<dyn TaskExecutor>,
) -> Result<Option<futures::future::BoxFuture<'static, Result<()>>>>
{
if cancelled_tasks.lock()?.contains(&task.task_id())
{
Ok(None)
}
else if (task_executor.can_execute(&task))
{
Ok(Some(task_executor.execute_task(task)))
}
else
{
Err(Error::NoExecutor())
}
}
}
impl execution::Module for Module
{
type Options = Options;
fn start(
agent_data: agent::AgentData,
module_interfaces: (
crate::module::ModuleInterface<execution::InputMessage, execution::OutputMessage>,
ModulePrivateInterface,
),
options: Options,
) -> impl std::future::Future<Output = ()> + std::marker::Send + 'static
{
async move {
let (task_sender, mut task_receiver) =
async_broadcast::broadcast::<definitions_task::Task>(20);
let cancelled_tasks: utils::ArcMutex<Vec<uuid::Uuid>> = Default::default();
let msg_fut = async {
let (module_interface, module_private_interface) = module_interfaces;
let mut input_receiver = module_private_interface.input_receiver.activate();
let output_sender = module_private_interface.output_sender;
let module_data = ModuleData {
cancelled_tasks: cancelled_tasks.to_owned(),
task_sender,
};
loop
{
let msg = input_receiver.recv().await;
if let Ok(msg) = msg
{
if let Err(e) =
Self::handle_input_message(msg, &agent_data, &module_data, &output_sender).await
{
log::error!(
"An error occured when handling input execution message: {} for agent {}",
e,
agent_data.agent_uri
);
}
}
else
{
return;
}
}
};
let exec_fut = async {
loop
{
let msg = task_receiver.recv().await;
if let Ok(msg) = msg
{
match Self::handle_task_execution(msg, &cancelled_tasks, &options.task_executor)
{
Ok(Some(fut)) =>
{
if let Err(e) = fut.await
{
log::error!(
"An error occured when executing a task: {} for agent {}",
e,
agent_data.agent_uri
);
}
}
Ok(None) =>
{}
Err(e) =>
{
log::error!(
"An error occured when handling task execution message: {} for agent {}",
e,
agent_data.agent_uri
);
}
}
}
else
{
return;
}
}
};
join!(msg_fut, exec_fut);
}
}
}
impl module::Module for Module
{
type InputMessage = execution::InputMessage;
type OutputMessage = execution::OutputMessage;
type ModulePrivateInterface = ModulePrivateInterface;
}