use ::std::borrow::Borrow;
use std::time::{Duration, Instant};
#[cfg(feature = "mqtt")]
pub mod mqtt;
pub mod transport;
pub mod transport_messages;
use futures::join;
use yaaral::TaskInterface as _;
use yaaral::time::TimeInterface as _;
use crate::prelude::*;
/// Request sent to other agents so that they can bid on a task
/// (CFP: presumably "call for proposals" — confirm the expansion).
///
/// Built by `InputMessage::create_start_delegation` and handed to the transport module
/// through `transport::InputMessage::SendCFP`.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct CFP
{
/// Identifier of the delegated task (taken from `Task::task_id`). Incoming proposals and
/// execution acceptances are matched against this value.
pub uuid: uuid::Uuid,
/// URI of the agent requesting the delegation.
pub requester_uri: String,
/// Optional team URI. NOTE(review): presumably restricts the CFP to one team — confirm.
pub team_uri: Option<String>,
/// Task type, as returned by `Task::task_type`.
pub task_type: String,
/// Serialized task, as returned by `Task::to_description`.
pub task_description: String,
/// Signature bytes; left empty by `InputMessage::create_start_delegation`.
/// NOTE(review): presumably filled in by another layer before sending — confirm.
pub signature: Vec<u8>,
}
/// Offer made by an agent in answer to a [`CFP`].
///
/// The requester gathers the proposals whose `task_uuid` matches the CFP it sent, orders them
/// by ascending `cost` and tries the proposers in that order.
///
/// `Debug` is derived so that proposals can be logged and inspected like [`CFP`] and
/// [`Status`], which already derive it.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct Proposal
{
  /// URI of the agent making the offer; acceptances are addressed to it.
  pub agent_uri: String,
  /// URI of the agent that issued the CFP.
  pub requester_uri: String,
  /// Cost announced by the proposer; the lower the cost, the earlier the proposal is tried.
  pub cost: f32,
  /// Identifier of the task this proposal answers; compared with [`CFP::uuid`].
  pub task_uuid: uuid::Uuid,
  /// Signature bytes. NOTE(review): no code in this file reads or checks them — confirm where
  /// they are verified.
  pub signature: Vec<u8>,
}
/// Progress of a delegation, published in `OutputMessage::Status`.
///
/// This file only emits `ReceivedProposals`, `NoReceivedProposals`, `ProposalAccepted` and
/// `NoAcceptedProposal`; the other variants are not produced here.
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub enum Status
{
/// Not emitted in this file. NOTE(review): presumably signals that the CFP was sent — confirm.
SendCFP,
/// The proposal collection window ended with this many matching proposals.
ReceivedProposals(usize),
/// A proposer confirmed that it accepts to execute the task.
ProposalAccepted,
/// The proposal collection window ended without any matching proposal.
NoReceivedProposals,
/// Every proposer was tried and none confirmed the execution.
NoAcceptedProposal,
/// Not emitted in this file. NOTE(review): presumably reported once execution begins — confirm.
ExecutionStarted,
/// Not emitted in this file. NOTE(review): presumably per-node execution progress — confirm.
NodeExecutionStarted
{
/// Identifier of the node concerned.
node: uuid::Uuid,
},
/// Not emitted in this file. NOTE(review): presumably per-node execution progress — confirm.
NodeExecutionCompleted
{
/// Identifier of the node concerned.
node: uuid::Uuid,
},
}
/// A task that can be delegated: it can be turned into the textual fields of a [`CFP`] and
/// rebuilt from them.
pub trait Task: Sized
{
/// Rebuilds a task from a task type and a description.
///
/// NOTE(review): presumably the inverse of `task_type` / `to_description`; the error
/// conditions depend on the implementor.
fn from_description(task_type: impl AsRef<str>, description: impl AsRef<str>) -> Result<Self>;
/// Type of the task; copied into `CFP::task_type`.
fn task_type(&self) -> &str;
/// Textual description of the task; copied into `CFP::task_description`.
fn to_description(&self) -> String;
/// Identifier of the task; used as `CFP::uuid`.
fn task_id(&self) -> uuid::Uuid;
}
/// Messages accepted by the delegation module.
#[derive(Clone)]
pub enum InputMessage
{
/// Asks the module to delegate a task: the CFP is sent through the transport module and the
/// received proposals are then processed.
StartDelegation
{
/// The CFP to send.
cfp: CFP,
},
}
impl InputMessage
{
  /// Builds the [`InputMessage::StartDelegation`] message that delegates `task`.
  ///
  /// The CFP takes its identifier, type and description from `task`; `requester_uri` and
  /// `team_uri` are stored as given and the signature is left empty.
  pub(crate) fn create_start_delegation(
    requester_uri: String,
    task: impl Task,
    team_uri: Option<String>,
  ) -> InputMessage
  {
    let uuid = task.task_id();
    let task_type = String::from(task.task_type());
    let task_description = task.to_description();
    let cfp = CFP {
      uuid,
      requester_uri,
      team_uri,
      task_type,
      task_description,
      signature: Vec::new(),
    };
    Self::StartDelegation { cfp }
  }
}
/// Messages published by the delegation module.
#[derive(Clone)]
pub enum OutputMessage
{
/// Progress report for one delegation.
Status
{
/// Identifier of the delegated task (the `uuid` of its CFP).
uuid: uuid::Uuid,
/// New status of that delegation.
status: Status,
},
}
/// Public interface of the delegation module, specialised for its message types.
pub type ModuleInterface = module::ModuleInterface<InputMessage, OutputMessage>;
// Declares `ModulePrivateInterface`, the module-side counterpart of `ModuleInterface`.
// `Module::start` reads its `input_receiver` and `output_sender` fields.
module::create_module_private_interface!(ModulePrivateInterface, InputMessage, OutputMessage);
/// Delegation module. It holds no state of its own: everything lives in the futures created
/// by `Module::start`.
pub(crate) struct Module {}
impl Module
{
pub(crate) async fn start(
agent_data: agent::AgentData,
module_interfaces: (
crate::module::ModuleInterface<InputMessage, OutputMessage>,
ModulePrivateInterface,
),
transport_interface: transport::ModuleInterface,
decision_interface: decision::ModuleInterface,
) -> ()
{
let (_, module_private_interface) = module_interfaces;
let mut input_receiver = module_private_interface.input_receiver.activate();
let output_sender = module_private_interface.output_sender;
let transport_interface = transport_interface;
let ir_fut = {
let agent_data = agent_data.clone();
let transport_interface = transport_interface.clone();
async move {
loop
{
let msg = input_receiver.recv().await;
if let Ok(msg) = msg
{
match msg
{
InputMessage::StartDelegation { cfp } =>
{
let output_sender = output_sender.clone();
let transport_input_sender = transport_interface.input_sender();
let mut transport_output_receiver = transport_interface.output_receiver();
let agent_data_agent_uri = agent_data.agent_uri.to_owned();
let delegation = async move {
let cfp_uuid = cfp.uuid.to_owned();
match transport_input_sender
.broadcast_direct(transport::InputMessage::SendCFP { cfp })
.await
{
Ok(_) =>
{}
Err(e) => log::error!("While sending CFP: {:?}", e.to_string()),
}
let mut proposals = Vec::<delegation::Proposal>::new();
let start = Instant::now();
while start.elapsed().as_secs() < 10
{
let msg = crate::Runtime::timeout(
Duration::from_millis(10 * 1000 - start.elapsed().as_millis() as u64),
transport_output_receiver.recv(),
)
.await;
if let Ok(Ok(transport::OutputMessage::ReceivedProposal { proposal })) = msg
&& proposal.task_uuid == cfp_uuid
{
proposals.push(proposal);
}
}
log::info!(
"Received {} proposals for delegation {:?}",
proposals.len(),
cfp_uuid
);
proposals.sort_by(|a, b| a.cost.partial_cmp(b.cost.borrow()).unwrap());
if proposals.is_empty()
{
log::error!("No proposals received.");
ccutils::log_error!(
output_sender
.broadcast_direct(OutputMessage::Status {
uuid: cfp_uuid,
status: Status::NoReceivedProposals,
})
.await,
"sending no received proposals"
);
return;
}
else
{
ccutils::log::log_error!(
output_sender
.broadcast(OutputMessage::Status {
uuid: cfp_uuid.to_owned(),
status: Status::ReceivedProposals(proposals.len()),
})
.await,
"sending status"
);
}
for p in proposals
{
ccutils::log::log_error!(
transport_input_sender
.broadcast(transport::InputMessage::SendProposalAcceptance {
acceptance: delegation::transport_messages::Acceptance {
agent_uri: p.agent_uri.to_owned(),
requester_uri: agent_data_agent_uri.to_owned(),
acceptance: true,
uuid: p.task_uuid,
signature: Default::default(),
},
})
.await,
"sending proposal acceptance"
);
let start = Instant::now();
while start.elapsed().as_secs() < 10
{
let msg = crate::Runtime::timeout(
Duration::from_millis(10 * 1000 - start.elapsed().as_millis() as u64),
transport_output_receiver.recv(),
)
.await;
if let Ok(Ok(transport::OutputMessage::ReceivedExecutionAccepted {
acceptance,
})) = msg
&& acceptance.uuid == cfp_uuid
{
if acceptance.acceptance
{
ccutils::log::log_error!(
output_sender
.broadcast(OutputMessage::Status {
uuid: cfp_uuid,
status: Status::ProposalAccepted,
})
.await,
"sending proposal accepted"
);
return;
}
else
{
break;
}
}
}
ccutils::log::log_error!(
transport_input_sender
.broadcast(transport::InputMessage::SendCancelAcceptance {
cancel_acceptance: transport_messages::CancelAcceptance {
agent_uri: p.agent_uri,
uuid: p.task_uuid,
signature: Default::default(),
},
})
.await,
"sending acceptance cancellation"
);
}
ccutils::log::log_error!(
output_sender
.broadcast(OutputMessage::Status {
uuid: cfp_uuid.to_owned(),
status: Status::NoAcceptedProposal,
})
.await,
"sending no accepted proposal"
);
};
ccutils::log::log_error!(
agent_data.async_runtime.spawn_task(delegation).detach(),
"spawning delergation task"
);
}
}
}
else
{
return;
}
}
}
};
let tr_fut = {
let agent_data = agent_data.clone();
let decision_input_sender = decision_interface.input_sender();
let mut transport_output_receiver = transport_interface.output_receiver();
async move {
loop
{
match transport_output_receiver.recv().await
{
Ok(msg) => match msg
{
transport::OutputMessage::ReceivedCFP { cfp } =>
{
ccutils::log::log_error!(
decision_input_sender
.broadcast(decision::InputMessage::DecideCFPAcceptance { cfp })
.await,
"sending cfp acceptance"
);
}
transport::OutputMessage::ReceivedProposalAccepted { acceptance } =>
{
if acceptance.agent_uri == agent_data.agent_uri
{
ccutils::log::log_error!(
decision_input_sender
.broadcast(decision::InputMessage::QueueExecution {
uuid: acceptance.uuid,
requester_uri: acceptance.requester_uri,
})
.await,
"sending queue execution"
);
}
}
transport::OutputMessage::ReceivedCancelAcceptance { cancel_acceptance } =>
{
if cancel_acceptance.agent_uri == agent_data.agent_uri
{
ccutils::log::log_error!(
decision_input_sender
.broadcast(decision::InputMessage::CancelExecution {
uuid: cancel_acceptance.uuid,
})
.await,
"sending cancel execution"
);
}
}
_ =>
{}
},
Err(_) =>
{
return;
}
}
}
}
};
let dr_fut = {
let mut decision_output_receiver = decision_interface.output_receiver();
let transport_input_sender = transport_interface.input_sender();
async move {
loop
{
match decision_output_receiver.recv().await
{
Ok(msg) => match msg
{
decision::OutputMessage::CFPProposal { proposal } =>
{
ccutils::log_error!(
transport_input_sender
.broadcast(transport::InputMessage::SendProposal { proposal })
.await,
"sending proposal"
);
}
decision::OutputMessage::QueueExecutionResult {
uuid,
requester_uri,
accepted,
} =>
{
ccutils::log::log_error!(
transport_input_sender
.broadcast(transport::InputMessage::SendExecutionAcceptance {
acceptance: transport_messages::Acceptance {
agent_uri: agent_data.agent_uri.to_owned(),
requester_uri,
acceptance: accepted,
uuid,
signature: Default::default(),
},
})
.await,
"sending execution acceptance"
);
}
},
Err(_) =>
{
return;
}
}
}
}
};
join!(ir_fut, tr_fut, dr_fut);
}
}
/// Binds the delegation module's message and private-interface types to the crate-wide
/// `Module` trait.
impl crate::module::Module for Module
{
type InputMessage = InputMessage;
type OutputMessage = OutputMessage;
type ModulePrivateInterface = ModulePrivateInterface;
}