dataflow/pipeline/
node.rs

1use super::{Duplicator, Pair};
2
/// A single stage of a dataflow pipeline that turns an `Input` into an
/// [`Node::Output`].
pub trait Node<Input> {
    /// The value produced by [`Node::process`].
    type Output;

    /// Processes one batch of data and returns the result.
    fn process(&mut self, input: Input) -> Self::Output;

    /// Receives the reset signal that propagates through the pipeline.
    ///
    /// The default implementation does nothing.
    fn reset(&mut self) {}

    /// Returns the number of examples left, where `before` is the previously
    /// reported remaining count.
    ///
    /// The default implementation passes `before` through unchanged.
    fn data_remaining(&self, before: usize) -> usize {
        before
    }
}
15
16impl<I, O, F: FnMut(I) -> O> Node<I> for F {
17    type Output = O;
18    fn process(&mut self, input: I) -> Self::Output {
19        (self)(input)
20    }
21}
22
23pub trait ExtendNode<Input, Output, E: Node<Input, Output = Output>> {
24    fn chain<O, N: Node<Output, Output = O>>(self, node: N) -> (E, N);
25}
26
27impl<Input, Output, E: Node<Input, Output = Output>> ExtendNode<Input, Output, E> for E {
28    fn chain<O, N: Node<Output, Output = O>>(self, node: N) -> (E, N)
29    where
30        Self: std::marker::Sized,
31    {
32        (self, node)
33    }
34}
35
36pub trait ExtendNodeSplit<Input, Output: Clone, E: Node<Input, Output = Output>> {
37    #[allow(clippy::type_complexity)]
38    fn split<O1, O2, E1: Node<Output, Output = O1>, E2: Node<Output, Output = O2>>(
39        self,
40        node1: E1,
41        node2: E2,
42    ) -> (E, Duplicator<Output>, Pair<Output, Output, E1, E2>);
43}
44
45impl<Input, Output: Clone, E: Node<Input, Output = Output>> ExtendNodeSplit<Input, Output, E>
46    for E
47{
48    #[allow(clippy::type_complexity)]
49    fn split<O1, O2, E1: Node<Output, Output = O1>, E2: Node<Output, Output = O2>>(
50        self,
51        node1: E1,
52        node2: E2,
53    ) -> (E, Duplicator<Output>, Pair<Output, Output, E1, E2>) {
54        (self, Duplicator::default(), Pair::new(node1, node2))
55    }
56}
57
58pub trait ExtendNodePair<Input, Out1, Out2, E: Node<Input, Output = (Out1, Out2)>> {
59    #[allow(clippy::type_complexity)]
60    fn pair<F1, F2, N1: Node<Out1, Output = F1>, N2: Node<Out2, Output = F2>>(
61        self,
62        node1: N1,
63        node2: N2,
64    ) -> (E, Pair<Out1, Out2, N1, N2>);
65}
66
67impl<Input, Out1, Out2, E: Node<Input, Output = (Out1, Out2)>> ExtendNodePair<Input, Out1, Out2, E>
68    for E
69{
70    fn pair<F1, F2, N1: Node<Out1, Output = F1>, N2: Node<Out2, Output = F2>>(
71        self,
72        node1: N1,
73        node2: N2,
74    ) -> (E, Pair<Out1, Out2, N1, N2>) {
75        (self, Pair::new(node1, node2))
76    }
77}