dataflow/pipeline/premade/
stateful.rs

1use crate::pipeline::Node;
2use std::marker::PhantomData;
3
/// A pipeline node that pairs a processing function with a piece of state.
///
/// Type parameters:
/// * `I` – the input type handed to the processing function.
/// * `O` – the output type returned by the processing function.
/// * `S` – the state the processing function receives by mutable reference.
/// * `F` – the processing function, `Fn(I, &mut S) -> O`.
/// * `R` – the function that maps a `usize` count to another `usize` count.
pub struct Stateful<I, O, S, F, R>
where
    F: Fn(I, &mut S) -> O,
    R: Fn(usize) -> usize,
{
    /// Marker that ties `I` and `O` to the struct; neither is stored directly.
    _phantom: PhantomData<(I, O)>,
    /// Invoked with each input and a mutable borrow of `state`.
    function: F,
    /// State owned by the node and handed to `function` on each call.
    state: S,
    /// Maps a "before" count to the count this node reports
    /// (presumably the number of items left after this node — confirm against `Node`).
    remaining: R,
}
10
11impl<I, O, S: Clone, F: Fn(I, &mut S) -> O + Clone, R: Fn(usize) -> usize + Clone> Clone
12    for Stateful<I, O, S, F, R>
13{
14    fn clone(&self) -> Self {
15        Self {
16            _phantom: self._phantom,
17            function: self.function.clone(),
18            state: self.state.clone(),
19            remaining: self.remaining.clone(),
20        }
21    }
22}
23
/// Default `remaining` function: returns `before` unchanged.
///
/// Used by `Stateful::new` as the initial value of the `remaining` field.
fn identity_remaining(before: usize) -> usize {
    before
}
27
28impl<I, O, S, F: Fn(I, &mut S) -> O> Stateful<I, O, S, F, fn(usize) -> usize> {
29    /// Initialize a new stateful node, with a state and a process function.
30    pub fn new(state: S, function: F) -> Self {
31        Stateful {
32            _phantom: PhantomData::default(),
33            function,
34            state,
35            remaining: identity_remaining,
36        }
37    }
38}
39
40impl<I, O, S, F: Fn(I, &mut S) -> O, R: Fn(usize) -> usize> Stateful<I, O, S, F, R> {
41    pub fn remaining<N: Fn(usize) -> usize>(self, remaining_fn: N) -> Stateful<I, O, S, F, N> {
42        Stateful {
43            _phantom: PhantomData::default(),
44            function: self.function,
45            state: self.state,
46            remaining: remaining_fn,
47        }
48    }
49}
50
51impl<I, O, S, F: Fn(I, &mut S) -> O, R: Fn(usize) -> usize> Node<I> for Stateful<I, O, S, F, R> {
52    type Output = O;
53
54    fn process(&mut self, input: I) -> Self::Output {
55        (self.function)(input, &mut self.state)
56    }
57
58    fn data_remaining(&self, before: usize) -> usize {
59        (self.remaining)(before)
60    }
61}