use super::{
io::{Input, InputObject, OutputObject},
plain::{PlainActor, IO},
};
use crate::{
framework::network::{ActorOutput, ActorOutputBuilder, AddActorInput, AddActorOutput},
Result,
};
use futures::{future::join_all, stream::FuturesUnordered};
use interface::{Data, Read, UniqueIdentifier, Update, Who};
use std::{
fmt::{self, Debug},
sync::Arc,
};
use tokio::sync::Mutex;
/// A client of type `C` wrapped together with its optional inputs and outputs.
///
/// `NI` and `NO` are compile-time constants attached to the inputs and outputs
/// respectively (both default to 1). Within this file they are reported as
/// `inputs_rate` / `outputs_rate` when converting to a `PlainActor`, `NI`
/// parameterizes every `Input` created by `add_input`, and `bootstrap` compares
/// them to decide how many times bootstrap data is sent.
pub struct Actor<C, const NI: usize = 1, const NO: usize = 1>
where
C: Update,
{
/// Registered inputs; `None` until the first call to `add_input`.
pub(crate) inputs: Option<Vec<Box<dyn InputObject>>>,
/// Registered outputs; `None` on a freshly built or cloned actor
/// (outputs are attached by code outside this file).
pub(crate) outputs: Option<Vec<Box<dyn OutputObject>>>,
/// The client, shared behind an async mutex; clones of the actor and its
/// inputs hold the same `Arc`.
pub(crate) client: Arc<Mutex<C>>,
/// Optional label; when set, `who()` returns it instead of the type name of `C`.
name: Option<String>,
/// Optional image string, copied verbatim into `PlainActor::image`.
// NOTE(review): presumably an image used when rendering the actor — confirm against `PlainActor`.
image: Option<String>,
}
impl<C: Update, const NI: usize, const NO: usize> Clone for Actor<C, NI, NO> {
fn clone(&self) -> Self {
Self {
inputs: None,
outputs: None,
client: self.client.clone(),
name: self.name.clone(),
image: self.image.clone(),
}
}
}
impl<C, const NI: usize, const NO: usize> From<&Actor<C, NI, NO>> for PlainActor
where
    C: Update,
{
    /// Builds the plain description of an actor.
    ///
    /// * `client` is the actor name if one was set, otherwise the type name of `C`
    ///   (exactly what `who()` returns).
    /// * `inputs_rate` / `outputs_rate` are the `NI` / `NO` constants.
    /// * `inputs` / `outputs` are each converted to `IO`, staying `None` when the
    ///   actor has none.
    /// * `hash` is always initialised to 0.
    /// * `image` is copied from the actor.
    fn from(actor: &Actor<C, NI, NO>) -> Self {
        Self {
            // `who()` already falls back to the type name when no name is set,
            // so there is no need to eagerly build a fallback string here.
            client: actor.who(),
            inputs_rate: NI,
            outputs_rate: NO,
            inputs: actor
                .inputs
                .as_ref()
                .map(|inputs| inputs.iter().map(|input| IO::from(input)).collect()),
            outputs: actor
                .outputs
                .as_ref()
                .map(|outputs| outputs.iter().map(|output| IO::from(output)).collect()),
            hash: 0,
            image: actor.image.clone(),
        }
    }
}
impl<C, const NI: usize, const NO: usize> fmt::Display for Actor<C, NI, NO>
where
    C: Update,
{
    /// Writes the upper-cased actor identity followed by a numbered list of its
    /// inputs and outputs.
    ///
    /// A section is omitted entirely when the corresponding collection is `None`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "{}:", self.who().to_uppercase())?;
        if let Some(inputs) = self.inputs.as_ref() {
            writeln!(f, " - inputs #{:>1}:", inputs.len())?;
            // Entries are numbered from 1 for display.
            for (k, input) in inputs.iter().enumerate() {
                writeln!(f, " {}. {}", 1 + k, input)?;
            }
        }
        if let Some(outputs) = self.outputs.as_ref() {
            writeln!(f, " - outputs #{:>1}:", outputs.len())?;
            for (k, output) in outputs.iter().enumerate() {
                writeln!(f, " {}. {}", 1 + k, output)?;
            }
        }
        Ok(())
    }
}
impl<C, const NI: usize, const NO: usize> fmt::Debug for Actor<C, NI, NO>
where
    C: Update + Debug,
{
    /// Formats the actor as a struct named `Actor` listing all five fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            inputs,
            outputs,
            client,
            name,
            image,
        } = self;
        let mut repr = f.debug_struct("Actor");
        repr.field("inputs", inputs);
        repr.field("outputs", outputs);
        repr.field("client", client);
        repr.field("name", name);
        repr.field("image", image);
        repr.finish()
    }
}
impl<C, const NI: usize, const NO: usize> From<C> for Actor<C, NI, NO>
where
    C: Update,
{
    /// Takes ownership of `client`, wraps it in `Arc<Mutex<_>>` and builds an
    /// unnamed actor without inputs or outputs.
    fn from(client: C) -> Self {
        let shared = Arc::new(Mutex::new(client));
        Self::new(shared)
    }
}
impl<C, S, const NI: usize, const NO: usize> From<(C, S)> for Actor<C, NI, NO>
where
    C: Update,
    S: Into<String>,
{
    /// Builds a named actor from a `(client, name)` pair.
    ///
    /// The client is wrapped in `Arc<Mutex<_>>`; every other field takes the
    /// value `Actor::new` gives it.
    fn from((client, name): (C, S)) -> Self {
        Self {
            name: Some(name.into()),
            ..Self::new(Arc::new(Mutex::new(client)))
        }
    }
}
impl<C, const NI: usize, const NO: usize> Who<C> for Actor<C, NI, NO>
where
    C: Update,
{
    /// Returns the actor name when one was set, otherwise the type name of the
    /// client `C`.
    fn who(&self) -> String {
        match &self.name {
            Some(name) => name.clone(),
            None => std::any::type_name::<C>().into(),
        }
    }
}
impl<C, const NI: usize, const NO: usize> Actor<C, NI, NO>
where
    C: Update,
{
    /// Creates an actor around an already shared `client`.
    ///
    /// The actor starts without inputs, outputs, name or image.
    pub fn new(client: Arc<Mutex<C>>) -> Self {
        Self {
            client,
            inputs: None,
            outputs: None,
            name: None,
            image: None,
        }
    }

    /// Sets the actor name and returns the actor (builder style).
    pub fn name<S: Into<String>>(mut self, name: S) -> Self {
        self.name = Some(name.into());
        self
    }

    /// Sets the actor image and returns the actor (builder style).
    pub fn image<S: Into<String>>(mut self, image: S) -> Self {
        self.image = Some(image.into());
        self
    }

    /// Returns a new handle to the shared client.
    pub fn client(&self) -> Arc<Mutex<C>> {
        self.client.clone()
    }

    /// Calls `recv` on every input and waits for all of them to complete.
    ///
    /// Does nothing when the actor has no inputs.
    ///
    /// # Errors
    ///
    /// Returns the first error found among the collected results.
    pub(super) async fn collect(&mut self) -> Result<&mut Self> {
        if let Some(inputs) = self.inputs.as_mut() {
            let pending: FuturesUnordered<_> =
                inputs.iter_mut().map(|receiver| receiver.recv()).collect();
            let received = join_all(pending).await;
            received.into_iter().collect::<Result<Vec<_>>>()?;
        }
        Ok(self)
    }

    /// Calls `send` on every output and waits for all of them to complete.
    ///
    /// Does nothing when the actor has no outputs.
    ///
    /// # Errors
    ///
    /// Returns the first error found among the collected results.
    pub(super) async fn distribute(&mut self) -> Result<&mut Self> {
        if let Some(outputs) = self.outputs.as_mut() {
            let pending: FuturesUnordered<_> =
                outputs.iter_mut().map(|sender| sender.send()).collect();
            let sent = join_all(pending).await;
            sent.into_iter().collect::<Result<Vec<_>>>()?;
        }
        Ok(self)
    }

    /// Sends the data of every output flagged for bootstrapping.
    ///
    /// Returns `Ok(false)` when the actor has no outputs or when none of them
    /// is flagged for bootstrapping, `Ok(true)` otherwise. When `NO >= NI` the
    /// flagged outputs are sent once; otherwise they are sent `NI / NO` times,
    /// stopping as soon as a round sends nothing.
    ///
    /// # Errors
    ///
    /// Propagates the first error reported by a `send`.
    ///
    /// # Panics
    ///
    /// Panics on the integer division `NI / NO` if outputs are present while
    /// `NO` is 0 and `NI` is not.
    pub(super) async fn bootstrap(&mut self) -> Result<bool> {
        // Sends every output whose `bootstrap()` flag is set, logging each one,
        // and returns one `()` per output actually sent.
        async fn send_flagged(outputs: &mut [Box<dyn OutputObject>]) -> Result<Vec<()>> {
            let pending: Vec<_> = outputs
                .iter_mut()
                .filter(|output| output.bootstrap())
                .inspect(|output| {
                    interface::print_info(
                        format!("{} bootstrapped", output.highlight()),
                        None::<&dyn std::error::Error>,
                    )
                })
                .map(|output| output.send())
                .collect();
            let sent = join_all(pending).await;
            sent.into_iter().collect::<Result<Vec<_>>>()
        }

        let outputs = match self.outputs.as_mut() {
            Some(outputs) => outputs,
            None => return Ok(false),
        };

        if NO >= NI {
            let sent = send_flagged(outputs).await?;
            return Ok(!sent.is_empty());
        }

        // NO < NI: repeat the bootstrap NI / NO times.
        for _ in 0..NI / NO {
            if send_flagged(outputs).await?.is_empty() {
                // Nothing is flagged, so further rounds would send nothing either.
                return Ok(false);
            }
        }
        Ok(true)
    }
}
impl<'a, C, const NI: usize, const NO: usize> AddActorOutput<'a, C, NI, NO> for Actor<C, NI, NO>
where
    C: Update + 'static,
{
    /// Starts the definition of a new output for this actor.
    ///
    /// Returns an `ActorOutput` that mutably borrows the actor together with a
    /// fresh `ActorOutputBuilder`.
    fn add_output(&'a mut self) -> ActorOutput<'a, Actor<C, NI, NO>> {
        // NOTE(review): the meaning of the `1` passed to the builder is not
        // visible from this file — confirm against `ActorOutputBuilder::new`.
        let builder = ActorOutputBuilder::new(1);
        ActorOutput::new(self, builder)
    }
}
impl<U, C, const NI: usize, const NO: usize> AddActorInput<U, C, NI, NO> for Actor<C, NI, NO>
where
C: Read<U> + 'static,
U: 'static + UniqueIdentifier,
{
fn add_input(&mut self, rx: flume::Receiver<Data<U>>, hash: u64) {
let input: Input<C, U, NI> = Input::new(rx, self.client.clone(), hash);
if let Some(ref mut inputs) = self.inputs {
inputs.push(Box::new(input));
} else {
self.inputs = Some(vec![Box::new(input)]);
}
}
}