1use std::any::Any;
6use std::any::TypeId;
7use std::sync::Arc;
8use std::sync::Mutex;
9
10use log::{warn, info};
11
12use crate::DErr;
13use crate::edge::Edge;
14use crate::executor::Executor;
15use crate::inport::InSignature;
16use crate::signature::IntoIoSignature;
17use crate::outport::OutSignature;
18
19pub struct Aggregator<I: IntoIoSignature, O: IntoIoSignature>{
22 pub(crate) ins: <I as IntoIoSignature>::InSig,
23 pub(crate) outs: <O as IntoIoSignature>::OutSig,
24 node: Arc<Mutex<dyn Node<InSig = I, OutSig = O> + Send>>,
25
26 defaults_loaded: bool,
28}
29
30impl<I, O> Aggregator<I, O> where I: IntoIoSignature, O: IntoIoSignature{
31 pub fn from_node<N>(node: N) -> Self where N: Node<InSig = I, OutSig = O> + Send + 'static{
34 Aggregator{
35 ins: I::into_in_collection(),
36 outs: O::into_out_collection(),
37 node: Arc::new(Mutex::new(node)),
38 defaults_loaded: false,
39 }
40 }
41
42 pub fn from_arc<N>(node: Arc<Mutex<N>>) -> Self where N: Node<InSig = I, OutSig = O> + Send + 'static{
43 Aggregator{
44 ins: I::into_in_collection(),
45 outs: O::into_out_collection(),
46 node,
47 defaults_loaded: false,
48 }
49 }
50
51 fn execute(&mut self, executor: Arc<Executor>) -> Result<(), DErr>{
52
53 let process_input = I::from_collection(&mut self.ins);
54
55 let process_output = {
56 let mut node_lock = self.node.lock().unwrap();
57 info!("Executing node [{}]", node_lock.name());
58 node_lock.process(process_input)
59 };
60 process_output.send(executor, &mut self.outs)?;
61
62 self.defaults_loaded = false;
63 Ok(())
64 }
65}
66
67impl<I, O> AbstAggregator for Aggregator<I, O> where I: IntoIoSignature, O: IntoIoSignature{
68
69 fn set_in_edge(&mut self, edge: Edge) -> Result<(), DErr>{
70 self.ins.set_edge(edge)
71 }
72 fn set_out_edge(&mut self, edge: Edge) -> Result<(), DErr>{
73 self.outs.set_edge(edge)
74 }
75
76 fn notify_in_remove(&mut self, port_idx: usize) -> Result<Edge, DErr>{
77 self.ins.remove_edge(port_idx)
78 }
79
80 fn notify_out_remove(&mut self, port_idx: usize) -> Result<Edge, DErr>{
81 self.outs.remove_edge(port_idx)
82 }
83
84 fn set_in_from_edge(&mut self, executor: Arc<Executor>, idx: usize, value: Box<dyn Any>) -> Result<(), DErr>{
85 if !self.defaults_loaded{
86 let nodec = self.node.clone();
87 nodec.lock().unwrap().default(self);
88 }
89 self.ins.set(idx, value)?;
91
92 if self.ins.all_set(){
93 self.execute(executor)?;
94 }
95
96 Ok(())
97 }
98
99 fn set_default_value(&mut self, idx: usize, value: Box<dyn Any>) -> Result<(), DErr>{
100
101 if !self.ins.has_edge(idx){
103 self.ins.set(idx, value)?;
104 Ok(())
105 }else{
106 Ok(())
108 }
109
110 }
111
112 fn in_type_id(&self, in_port_idx: usize) -> Option<TypeId>{
113 I::get_type_id(in_port_idx)
114 }
115
116 fn out_type_id(&self, out_port_idx: usize) -> Option<TypeId>{
117 O::get_type_id(out_port_idx)
118 }
119}
120
121pub trait AbstAggregator{
122 fn remove_in_edge(&mut self, port_idx: usize) -> Result<(), DErr>{
125 match self.notify_in_remove(port_idx){
126 Ok(partner) => {
127 if let Err(err) = partner.start_node.lock().unwrap().notify_out_remove(partner.start_idx){
128 warn!("Could not notifiy partner of edge removal");
129 Err(err)
130 }else{
131 Ok(())
132 }
133 },
134 Err(e) => {
135 warn!("Could not remove edge on node: {:?}", e);
136 Err(e)
137 }
138 }
139 }
140 fn remove_out_edge(&mut self, port_idx: usize) -> Result<(), DErr>{
142 match self.notify_out_remove(port_idx){
143 Ok(partner) => {
144 if let Err(err) = partner.end_node.lock().unwrap().notify_in_remove(partner.end_idx){
145 warn!("Could not notifiy partner of edge removal");
146 Err(err)
147 }else{
148 Ok(())
149 }
150 },
151 Err(e) => {
152 warn!("Could not remove edge on node: {:?}", e);
153 Err(e)
154 }
155 }
156 }
157 fn notify_in_remove(&mut self, port_idx: usize) -> Result<Edge, DErr>;
159 fn notify_out_remove(&mut self, port_idx: usize) -> Result<Edge, DErr>;
161 fn set_in_edge(&mut self, edge: Edge) -> Result<(), DErr>;
162 fn set_out_edge(&mut self, edge: Edge) -> Result<(), DErr>;
163 fn in_type_id(&self, in_port_idx: usize) -> Option<TypeId>;
164 fn out_type_id(&self, out_port_idx: usize) -> Option<TypeId>;
165 fn set_in_from_edge(&mut self, executor: Arc<Executor>, idx: usize, value: Box<dyn Any>) -> Result<(), DErr>;
166 fn set_default_value(&mut self, idx: usize, value: Box<dyn Any>) -> Result<(), DErr>;
167}
168
169pub trait Executable{
171 fn execute(&mut self, executor: Arc<Executor>) -> Result<(), DErr>;
172}
173
174impl<O> Executable for Aggregator<(), O> where O: IntoIoSignature{
175 fn execute(&mut self, executor: Arc<Executor>) -> Result<(), DErr> {
176 self.execute(executor)
177 }
178}
179
180pub trait Node{
181 type InSig;
183 type OutSig;
185
186 fn process(&mut self, input: Self::InSig) -> Self::OutSig;
188
189 fn default(&mut self, _aggregator: &mut dyn AbstAggregator){
191
192 }
193
194 fn name<'a>(&'a self) -> &'a str{
196 ""
197 }
198}
199
200
201pub fn arc_node<N, I, O>(node: N) -> Arc<Mutex<Aggregator<I,O>>>
203where N: Node<InSig = I, OutSig = O> + Send + 'static,
204 I: IntoIoSignature,
205 O: IntoIoSignature
206{
207 Arc::new(Mutex::new(Aggregator::from_node(node)))
208}