use crate::{Actor, ActorError, Result, Update, Who};
use async_trait::async_trait;
use std::fmt::Display;
use super::PlainActor;
#[async_trait]
pub trait Task: Display + Send {
async fn async_run(&mut self) -> Result<()>;
fn spawn(self) -> tokio::task::JoinHandle<()>;
fn check_inputs(&self) -> Result<()>;
fn check_outputs(&self) -> Result<()>;
async fn task(&mut self);
fn n_inputs(&self) -> usize;
fn n_outputs(&self) -> usize;
fn inputs_hashes(&self) -> Vec<u64>;
fn outputs_hashes(&self) -> Vec<u64>;
fn as_plain(&self) -> PlainActor;
}
#[async_trait]
impl<C, const NI: usize, const NO: usize> Task for Actor<C, NI, NO>
where
C: 'static + Update + Send,
{
fn spawn(mut self) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
self.task().await;
})
}
async fn task(&mut self) {
match self.bootstrap().await {
Err(e) => crate::print_info(
format!("{} bootstrapping failed", Who::highlight(self)),
Some(&e),
),
Ok(_) => {
crate::print_info(
format!("{} loop started", Who::highlight(self)),
None::<&dyn std::error::Error>,
);
if let Err(e) = self.async_run().await {
crate::print_info(format!("{} loop ended", Who::highlight(self)), Some(&e));
}
}
}
}
async fn async_run(&mut self) -> Result<()> {
match (self.inputs.as_ref(), self.outputs.as_ref()) {
(Some(_), Some(_)) => {
if NO >= NI {
loop {
for _ in 0..NO / NI {
self.collect().await?.client.lock().await.update();
}
self.distribute().await?;
}
} else {
loop {
self.collect().await?.client.lock().await.update();
for _ in 0..NI / NO {
self.distribute().await?;
}
}
}
}
(None, Some(_)) => loop {
self.client.lock().await.update();
self.distribute().await?;
},
(Some(_), None) => loop {
self.collect().await?.client.lock().await.update();
},
(None, None) => Ok(()),
}
}
fn check_inputs(&self) -> Result<()> {
match self.inputs {
Some(_) if NI == 0 => Err(ActorError::SomeInputsZeroRate(Who::who(self))),
None if NI > 0 => Err(ActorError::NoInputsPositiveRate(Who::who(self))),
_ => Ok(()),
}
}
fn check_outputs(&self) -> Result<()> {
match self.outputs {
Some(_) if NO == 0 => Err(ActorError::SomeOutputsZeroRate(Who::who(self))),
None if NO > 0 => Err(ActorError::NoOutputsPositiveRate(Who::who(self))),
_ => Ok(()),
}
}
fn n_inputs(&self) -> usize {
self.inputs.as_ref().map_or(0, |i| i.len())
}
fn n_outputs(&self) -> usize {
self.outputs
.as_ref()
.map_or(0, |o| o.iter().map(|o| o.len()).sum())
}
fn inputs_hashes(&self) -> Vec<u64> {
self.inputs.as_ref().map_or(Vec::new(), |inputs| {
inputs.iter().map(|input| input.get_hash()).collect()
})
}
fn outputs_hashes(&self) -> Vec<u64> {
self.outputs.as_ref().map_or(Vec::new(), |outputs| {
outputs
.iter()
.flat_map(|output| vec![output.get_hash(); output.len()])
.collect()
})
}
fn as_plain(&self) -> PlainActor {
self.into()
}
}