dager/
node.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::any::Any;
6use std::any::TypeId;
7use std::sync::Arc;
8use std::sync::Mutex;
9
10use log::{warn, info};
11
12use crate::DErr;
13use crate::edge::Edge;
14use crate::executor::Executor;
15use crate::inport::InSignature;
16use crate::signature::IntoIoSignature;
17use crate::outport::OutSignature;
18
19///The Aggregator is put "around" some implementation of `Node`. It handles waiting for data, edge assignment
20/// as well as using default data provided by the nodes `default()` function if no edge is supplied for some port.
21pub struct Aggregator<I: IntoIoSignature, O: IntoIoSignature>{
22    pub(crate) ins: <I as IntoIoSignature>::InSig,
23    pub(crate) outs: <O as IntoIoSignature>::OutSig,
24    node: Arc<Mutex<dyn Node<InSig = I, OutSig = O> + Send>>,
25
26    //flag that is true if the default values have been loaded since the last execution.
27    defaults_loaded: bool,
28}
29
30impl<I, O> Aggregator<I, O> where I: IntoIoSignature, O: IntoIoSignature{
31    ///Creates this aggregator from some `Node` implementing object. Takes ownership of that node. If that is not desired, use the
32    /// `from_arc()` function.
33    pub fn from_node<N>(node: N) -> Self where N: Node<InSig = I, OutSig = O> + Send + 'static{
34	Aggregator{
35	    ins: I::into_in_collection(),
36	    outs: O::into_out_collection(),
37	    node: Arc::new(Mutex::new(node)),
38	    defaults_loaded: false,
39	}
40    }
41
42    pub fn from_arc<N>(node: Arc<Mutex<N>>) -> Self where N: Node<InSig = I, OutSig = O> + Send + 'static{
43	Aggregator{
44	    ins: I::into_in_collection(),
45	    outs: O::into_out_collection(),
46	    node,
47	    defaults_loaded: false,
48	}
49    }
50
51    fn execute(&mut self, executor: Arc<Executor>) -> Result<(), DErr>{
52	
53	let process_input = I::from_collection(&mut self.ins);
54
55	let process_output = {
56	    let mut node_lock = self.node.lock().unwrap();
57	    info!("Executing node [{}]", node_lock.name());
58	    node_lock.process(process_input)
59	};
60	process_output.send(executor, &mut self.outs)?;
61	
62	self.defaults_loaded = false;
63	Ok(())
64    }
65}
66
67impl<I, O> AbstAggregator for Aggregator<I, O> where I: IntoIoSignature, O: IntoIoSignature{
68    
69    fn set_in_edge(&mut self, edge: Edge) -> Result<(), DErr>{
70	self.ins.set_edge(edge)
71    }
72    fn set_out_edge(&mut self, edge: Edge) -> Result<(), DErr>{
73	self.outs.set_edge(edge)
74    }
75
76    fn notify_in_remove(&mut self, port_idx: usize) -> Result<Edge, DErr>{
77	self.ins.remove_edge(port_idx)	
78    }
79
80    fn notify_out_remove(&mut self, port_idx: usize) -> Result<Edge, DErr>{
81	self.outs.remove_edge(port_idx)
82    }
83    
84    fn set_in_from_edge(&mut self, executor: Arc<Executor>, idx: usize, value: Box<dyn Any>) -> Result<(), DErr>{
85	if !self.defaults_loaded{
86	    let nodec = self.node.clone();
87	    nodec.lock().unwrap().default(self);
88	}
89	//Set anyways since this comes from an edge which overwrites default data
90	self.ins.set(idx, value)?;
91
92	if self.ins.all_set(){
93	    self.execute(executor)?;
94	}
95	
96	Ok(())
97    }
98    
99    fn set_default_value(&mut self, idx: usize, value: Box<dyn Any>) -> Result<(), DErr>{
100
101	//Only set default value if this input has no edge assigned
102	if !self.ins.has_edge(idx){
103	    self.ins.set(idx, value)?;
104	    Ok(())
105	}else{
106	    //Silent ignore
107	    Ok(())
108	}
109	
110    }
111
112    fn in_type_id(&self, in_port_idx: usize) -> Option<TypeId>{
113	I::get_type_id(in_port_idx)
114    }
115    
116    fn out_type_id(&self, out_port_idx: usize) -> Option<TypeId>{
117	O::get_type_id(out_port_idx)
118    }
119}
120
121pub trait AbstAggregator{
122    ///Removes some `edge`. Should also notify the partner on this edge. Should do nothing,
123    ///if the node has not edge set at the `edge`'s end_idx.
124    fn remove_in_edge(&mut self, port_idx: usize) -> Result<(), DErr>{
125	match self.notify_in_remove(port_idx){
126	    Ok(partner) => {
127		if let Err(err) = partner.start_node.lock().unwrap().notify_out_remove(partner.start_idx){
128		    warn!("Could not notifiy partner of edge removal");
129		    Err(err)
130		}else{
131		    Ok(())
132		}
133	    },
134	    Err(e) => {
135		warn!("Could not remove edge on node: {:?}", e);
136		Err(e)
137	    }
138	}
139    }
140    ///See remove_in_edge, but in reverse.
141    fn remove_out_edge(&mut self, port_idx: usize) -> Result<(), DErr>{
142	match self.notify_out_remove(port_idx){
143	    Ok(partner) => {
144		if let Err(err) = partner.end_node.lock().unwrap().notify_in_remove(partner.end_idx){
145		    warn!("Could not notifiy partner of edge removal");
146		    Err(err)
147		}else{
148		    Ok(())
149		}
150	    },
151	    Err(e) => {
152		warn!("Could not remove edge on node: {:?}", e);
153		Err(e)
154	    }
155	}
156    }
157    ///Removes the in edge at `port_idx`. Returns the removed edge, or why this wasn't possible.
158    fn notify_in_remove(&mut self, port_idx: usize) -> Result<Edge, DErr>;
159    ///Removes the out edge at `port_idx`. Returns the removed edge, or why this wasn't possible.
160    fn notify_out_remove(&mut self, port_idx: usize) -> Result<Edge, DErr>;
161    fn set_in_edge(&mut self, edge: Edge) -> Result<(), DErr>;
162    fn set_out_edge(&mut self, edge: Edge) -> Result<(), DErr>;
163    fn in_type_id(&self, in_port_idx: usize) -> Option<TypeId>;
164    fn out_type_id(&self, out_port_idx: usize) -> Option<TypeId>;
165    fn set_in_from_edge(&mut self, executor: Arc<Executor>, idx: usize, value: Box<dyn Any>) -> Result<(), DErr>;
166    fn set_default_value(&mut self, idx: usize, value: Box<dyn Any>) -> Result<(), DErr>;
167}
168
169///Trait that is implemented for nodes without inputs. Can be used to start a graph from some "start" node.
170pub trait Executable{
171    fn execute(&mut self, executor: Arc<Executor>) -> Result<(), DErr>;
172}
173
174impl<O> Executable for Aggregator<(), O> where O: IntoIoSignature{
175    fn execute(&mut self, executor: Arc<Executor>) -> Result<(), DErr> {
176	self.execute(executor)
177    }
178} 
179
180pub trait Node{
181    ///Signature of the input data
182    type InSig;
183    ///Signature of the output data
184    type OutSig;
185
186    ///The main process function that needs to be defined. Transforms some `input` to some output.
187    fn process(&mut self, input: Self::InSig) -> Self::OutSig;
188
189    ///Is called before the procesing on some node starts. Makes it possible for the node to set default values for its inputs
190    fn default(&mut self, _aggregator: &mut dyn AbstAggregator){
191	
192    }
193
194    ///If implemented this name is used while logging things about this node.
195    fn name<'a>(&'a self) -> &'a str{
196	""
197    }
198}
199
200
201///Creates and arc wrapped Aggregator. Something that's needed quiet often when connecting nodes by hand.
202pub fn arc_node<N, I, O>(node: N) -> Arc<Mutex<Aggregator<I,O>>>
203where N: Node<InSig = I, OutSig = O> + Send + 'static,
204      I: IntoIoSignature,
205      O: IntoIoSignature
206{
207    Arc::new(Mutex::new(Aggregator::from_node(node)))
208}