use std::any::Any;
use std::any::TypeId;
use std::sync::Arc;
use std::sync::Mutex;
use log::{warn, info};
use crate::DErr;
use crate::edge::Edge;
use crate::executor::Executor;
use crate::inport::InSignature;
use crate::signature::IntoIoSignature;
use crate::outport::OutSignature;
/// Couples a [`Node`] implementation with the port collections derived from its
/// input signature `I` and output signature `O`.
///
/// The node itself is stored type-erased behind an `Arc<Mutex<_>>`, so the same
/// node instance can be shared with code outside of the aggregator.
pub struct Aggregator<I, O>
where
    I: IntoIoSignature,
    O: IntoIoSignature,
{
    /// Input-side collection; constructors fill it from `I::into_in_collection()`.
    pub(crate) ins: <I as IntoIoSignature>::InSig,
    /// Output-side collection; constructors fill it from `O::into_out_collection()`.
    pub(crate) outs: <O as IntoIoSignature>::OutSig,
    /// The wrapped node, locked whenever it is asked to process or to provide defaults.
    node: Arc<Mutex<dyn Node<InSig = I, OutSig = O> + Send>>,
    /// Tracks whether the node's defaults count as applied for the current run.
    /// It starts out `false`, is checked before `Node::default` is invoked and is
    /// cleared again after a successful execution.
    defaults_loaded: bool,
}
impl<I, O> Aggregator<I, O>
where
    I: IntoIoSignature,
    O: IntoIoSignature,
{
    /// Builds an aggregator that owns `node`, wrapping it in a new `Arc<Mutex<_>>`.
    ///
    /// The port collections are created from the signatures `I` and `O`, and the
    /// defaults are marked as not loaded.
    pub fn from_node<N>(node: N) -> Self
    where
        N: Node<InSig = I, OutSig = O> + Send + 'static,
    {
        let ins = I::into_in_collection();
        let outs = O::into_out_collection();
        let node = Arc::new(Mutex::new(node));
        Self {
            ins,
            outs,
            node,
            defaults_loaded: false,
        }
    }

    /// Builds an aggregator around a node that is already shared behind an
    /// `Arc<Mutex<_>>`; the caller keeps its own handle to the same node.
    ///
    /// The port collections are created from the signatures `I` and `O`, and the
    /// defaults are marked as not loaded.
    pub fn from_arc<N>(node: Arc<Mutex<N>>) -> Self
    where
        N: Node<InSig = I, OutSig = O> + Send + 'static,
    {
        let ins = I::into_in_collection();
        let outs = O::into_out_collection();
        Self {
            ins,
            outs,
            node,
            defaults_loaded: false,
        }
    }

    /// Runs the wrapped node once and forwards its result to the output side.
    ///
    /// The input value is obtained through `I::from_collection`, handed to
    /// `Node::process` while the node mutex is held, and the returned value is
    /// passed on via its `send` method together with `executor`.
    ///
    /// # Errors
    /// Propagates any error returned by `send`. In that case `defaults_loaded`
    /// is left untouched.
    ///
    /// # Panics
    /// Panics if the node mutex is poisoned.
    fn execute(&mut self, executor: Arc<Executor>) -> Result<(), DErr> {
        let inputs = I::from_collection(&mut self.ins);

        let mut node_guard = self.node.lock().unwrap();
        info!("Executing node [{}]", node_guard.name());
        let outputs = node_guard.process(inputs);
        // Release the node before sending so the lock is not held during `send`.
        drop(node_guard);

        outputs.send(executor, &mut self.outs)?;
        self.defaults_loaded = false;
        Ok(())
    }
}
impl<I, O> AbstAggregator for Aggregator<I, O>
where
    I: IntoIoSignature,
    O: IntoIoSignature,
{
    /// Registers `edge` on the input collection by delegating to its `set_edge`.
    fn set_in_edge(&mut self, edge: Edge) -> Result<(), DErr> {
        self.ins.set_edge(edge)
    }

    /// Registers `edge` on the output collection by delegating to its `set_edge`.
    fn set_out_edge(&mut self, edge: Edge) -> Result<(), DErr> {
        self.outs.set_edge(edge)
    }

    /// Removes and returns the edge at input port `port_idx`.
    ///
    /// # Errors
    /// Propagates the error of the input collection's `remove_edge`.
    fn notify_in_remove(&mut self, port_idx: usize) -> Result<Edge, DErr> {
        let removed = self.ins.remove_edge(port_idx)?;
        // The port no longer has an edge, so it may now be eligible for a default
        // value (see `set_default_value`). Force the defaults to be re-applied
        // before the next incoming value is stored.
        self.defaults_loaded = false;
        Ok(removed)
    }

    /// Removes and returns the edge at output port `port_idx`.
    fn notify_out_remove(&mut self, port_idx: usize) -> Result<Edge, DErr> {
        self.outs.remove_edge(port_idx)
    }

    /// Stores `value` on input port `idx` and executes the node once the input
    /// collection reports `all_set()`.
    ///
    /// Before the value is stored, `Node::default` is invoked with this
    /// aggregator if the defaults are not yet marked as loaded. The flag is then
    /// set, so the hook runs once per run instead of once per incoming value.
    /// It is cleared again after every execution attempt and whenever an input
    /// edge is removed.
    ///
    /// # Errors
    /// Propagates errors from storing the value and from executing the node.
    ///
    /// # Panics
    /// Panics if the node mutex is poisoned.
    fn set_in_from_edge(
        &mut self,
        executor: Arc<Executor>,
        idx: usize,
        value: Box<dyn Any>,
    ) -> Result<(), DErr> {
        if !self.defaults_loaded {
            // Clone the handle so the node can be locked while `self` is lent
            // out mutably to the `default` hook.
            let node = Arc::clone(&self.node);
            node.lock().unwrap().default(self);
            self.defaults_loaded = true;
        }

        self.ins.set(idx, value)?;

        if self.ins.all_set() {
            let outcome = self.execute(executor);
            // Clear the flag even when execution failed; otherwise a failed run
            // would leave the flag set and defaults would never be re-applied.
            self.defaults_loaded = false;
            outcome?;
        }
        Ok(())
    }

    /// Stores `value` on input port `idx` only if that port has no edge.
    ///
    /// For a port that has an edge the value is dropped and `Ok(())` is returned.
    ///
    /// # Errors
    /// Propagates the error of the input collection's `set`.
    fn set_default_value(&mut self, idx: usize, value: Box<dyn Any>) -> Result<(), DErr> {
        if !self.ins.has_edge(idx) {
            self.ins.set(idx, value)?;
        }
        Ok(())
    }

    /// Returns `I::get_type_id(in_port_idx)`.
    fn in_type_id(&self, in_port_idx: usize) -> Option<TypeId> {
        I::get_type_id(in_port_idx)
    }

    /// Returns `O::get_type_id(out_port_idx)`.
    fn out_type_id(&self, out_port_idx: usize) -> Option<TypeId> {
        O::get_type_id(out_port_idx)
    }
}
/// Type-erased view of an aggregator: edge bookkeeping, port type queries and
/// delivery of incoming or default values, independent of the concrete
/// input/output signatures.
pub trait AbstAggregator {
    /// Removes the edge at input port `port_idx` from this aggregator and then
    /// tells the edge's start node to drop its output side of the same edge.
    ///
    /// # Errors
    /// Returns the error of the local removal, or of the partner notification.
    /// When the partner notification fails the local edge has already been
    /// removed; it is not restored.
    ///
    /// # Panics
    /// Panics if the partner's mutex is poisoned.
    fn remove_in_edge(&mut self, port_idx: usize) -> Result<(), DErr> {
        let partner = match self.notify_in_remove(port_idx) {
            Ok(edge) => edge,
            Err(e) => {
                warn!("Could not remove edge on node: {:?}", e);
                return Err(e);
            }
        };

        // Bind the result in its own statement so the partner's lock guard is
        // released before the result is inspected and logged.
        let notified = partner
            .start_node
            .lock()
            .unwrap()
            .notify_out_remove(partner.start_idx);

        match notified {
            Ok(_) => Ok(()),
            Err(err) => {
                warn!("Could not notify partner of edge removal: {:?}", err);
                Err(err)
            }
        }
    }

    /// Removes the edge at output port `port_idx` from this aggregator and then
    /// tells the edge's end node to drop its input side of the same edge.
    ///
    /// # Errors
    /// Returns the error of the local removal, or of the partner notification.
    /// When the partner notification fails the local edge has already been
    /// removed; it is not restored.
    ///
    /// # Panics
    /// Panics if the partner's mutex is poisoned.
    fn remove_out_edge(&mut self, port_idx: usize) -> Result<(), DErr> {
        let partner = match self.notify_out_remove(port_idx) {
            Ok(edge) => edge,
            Err(e) => {
                warn!("Could not remove edge on node: {:?}", e);
                return Err(e);
            }
        };

        // Bind the result in its own statement so the partner's lock guard is
        // released before the result is inspected and logged.
        let notified = partner
            .end_node
            .lock()
            .unwrap()
            .notify_in_remove(partner.end_idx);

        match notified {
            Ok(_) => Ok(()),
            Err(err) => {
                warn!("Could not notify partner of edge removal: {:?}", err);
                Err(err)
            }
        }
    }

    /// Removes the edge at input port `port_idx` on this side only and returns it.
    fn notify_in_remove(&mut self, port_idx: usize) -> Result<Edge, DErr>;

    /// Removes the edge at output port `port_idx` on this side only and returns it.
    fn notify_out_remove(&mut self, port_idx: usize) -> Result<Edge, DErr>;

    /// Registers `edge` on the input side of this aggregator.
    fn set_in_edge(&mut self, edge: Edge) -> Result<(), DErr>;

    /// Registers `edge` on the output side of this aggregator.
    fn set_out_edge(&mut self, edge: Edge) -> Result<(), DErr>;

    /// Type id for input port `in_port_idx`, if any.
    fn in_type_id(&self, in_port_idx: usize) -> Option<TypeId>;

    /// Type id for output port `out_port_idx`, if any.
    fn out_type_id(&self, out_port_idx: usize) -> Option<TypeId>;

    /// Delivers `value` to input port `idx`; `executor` is made available in
    /// case the delivery triggers an execution.
    fn set_in_from_edge(
        &mut self,
        executor: Arc<Executor>,
        idx: usize,
        value: Box<dyn Any>,
    ) -> Result<(), DErr>;

    /// Offers `value` as the default for input port `idx`.
    fn set_default_value(&mut self, idx: usize, value: Box<dyn Any>) -> Result<(), DErr>;
}
/// Something that can be run directly with an [`Executor`].
///
/// NOTE(review): the only implementation visible in this file is for
/// `Aggregator<(), O>`, i.e. aggregators whose input signature is `()` —
/// presumably nodes that can start without incoming values; confirm against callers.
pub trait Executable{
/// Runs once using `executor`, returning a `DErr` on failure.
fn execute(&mut self, executor: Arc<Executor>) -> Result<(), DErr>;
}
impl<O> Executable for Aggregator<(), O> where O: IntoIoSignature{
fn execute(&mut self, executor: Arc<Executor>) -> Result<(), DErr> {
self.execute(executor)
}
}
/// A unit of work that turns one input value into one output value.
pub trait Node {
    /// Type of the value handed to [`Node::process`].
    type InSig;
    /// Type of the value returned by [`Node::process`].
    type OutSig;

    /// Computes the output for `input`; may mutate the node's own state.
    fn process(&mut self, input: Self::InSig) -> Self::OutSig;

    /// Hook that receives the aggregator wrapping this node. The provided
    /// implementation does nothing.
    ///
    /// NOTE(review): presumably meant to be overridden to call
    /// `set_default_value` on `_aggregator` — confirm with implementors.
    fn default(&mut self, _aggregator: &mut dyn AbstAggregator) {}

    /// Name of the node, used in log output when the node is executed.
    /// The provided implementation returns an empty string.
    fn name<'a>(&'a self) -> &'a str {
        ""
    }
}
/// Convenience constructor: wraps `node` in an [`Aggregator`] via
/// `Aggregator::from_node` and returns that aggregator behind an `Arc<Mutex<_>>`.
pub fn arc_node<N, I, O>(node: N) -> Arc<Mutex<Aggregator<I, O>>>
where
    N: Node<InSig = I, OutSig = O> + Send + 'static,
    I: IntoIoSignature,
    O: IntoIoSignature,
{
    let aggregator = Aggregator::from_node(node);
    Arc::new(Mutex::new(aggregator))
}