dataflow/pipeline/premade/
stateful.rs1use crate::pipeline::Node;
2use std::marker::PhantomData;
3
4pub struct Stateful<I, O, S, F: Fn(I, &mut S) -> O, R: Fn(usize) -> usize> {
5 _phantom: PhantomData<(I, O)>,
6 function: F,
7 state: S,
8 remaining: R,
9}
10
11impl<I, O, S: Clone, F: Fn(I, &mut S) -> O + Clone, R: Fn(usize) -> usize + Clone> Clone
12 for Stateful<I, O, S, F, R>
13{
14 fn clone(&self) -> Self {
15 Self {
16 _phantom: self._phantom,
17 function: self.function.clone(),
18 state: self.state.clone(),
19 remaining: self.remaining.clone(),
20 }
21 }
22}
23
24fn identity_remaining(before: usize) -> usize {
25 before
26}
27
28impl<I, O, S, F: Fn(I, &mut S) -> O> Stateful<I, O, S, F, fn(usize) -> usize> {
29 pub fn new(state: S, function: F) -> Self {
31 Stateful {
32 _phantom: PhantomData::default(),
33 function,
34 state,
35 remaining: identity_remaining,
36 }
37 }
38}
39
40impl<I, O, S, F: Fn(I, &mut S) -> O, R: Fn(usize) -> usize> Stateful<I, O, S, F, R> {
41 pub fn remaining<N: Fn(usize) -> usize>(self, remaining_fn: N) -> Stateful<I, O, S, F, N> {
42 Stateful {
43 _phantom: PhantomData::default(),
44 function: self.function,
45 state: self.state,
46 remaining: remaining_fn,
47 }
48 }
49}
50
51impl<I, O, S, F: Fn(I, &mut S) -> O, R: Fn(usize) -> usize> Node<I> for Stateful<I, O, S, F, R> {
52 type Output = O;
53
54 fn process(&mut self, input: I) -> Self::Output {
55 (self.function)(input, &mut self.state)
56 }
57
58 fn data_remaining(&self, before: usize) -> usize {
59 (self.remaining)(before)
60 }
61}