Skip to main content

kproc/processors/
cache.rs

//! Caches values in memory to keep a stream from clogging.
2
3pub use kproc_pmacros::Processor;
4use std::collections::LinkedList;
5
6use crate::processor::{InputStreams, OutputStreams};
7use crate::{processor, Result};
8
/// Input message of the [`Cache`] processor: a single value of type `T`.
///
/// `Cache::start` reads one of these per iteration and takes its `t` field.
/// The `InputStreams` derive presumably generates the stream type used as
/// `Self::InputStreams`; the generated code is not visible here (TODO confirm).
#[derive(Clone, InputStreams)]
pub struct Inputs<T>
where
  T: Clone,
{
  // The value to forward or buffer.
  t: T,
}
17
/// Output message of the [`Cache`] processor: a single value of type `T`.
///
/// `Cache::start` builds one of these for every value it emits, either
/// directly from an input or from its internal buffer. The `OutputStreams`
/// derive presumably generates the stream type used as `Self::OutputStreams`;
/// the generated code is not visible here (TODO confirm).
#[derive(Clone, OutputStreams)]
pub struct Outputs<T>
where
  T: Clone,
{
  // The value being emitted.
  t: T,
}
26
/// Processor that caches values of type `T` inside a kproc graph.
///
/// It consumes [`Inputs<T>`] and produces [`Outputs<T>`], as declared by the
/// `#[streams(...)]` attribute. The struct itself holds no data; buffering
/// happens in a local list inside `start`.
#[derive(Processor)]
#[streams(Inputs<T>, Outputs<T>)]
pub struct Cache<T>
where
  T: Clone,
{
  // Zero-sized marker that ties the processor to its item type; no `T` is stored.
  _t: std::marker::PhantomData<T>,
}
36
37impl<T> processor::Processor for Cache<T>
38where
39  T: Clone,
40{
41  // async fn process_one(&self, inputs: Self::Inputs) -> Self::Outputs {}
42  async fn start(self, mut inputs: Self::InputStreams, mut outputs: Self::OutputStreams)
43  {
44    let mut cache = LinkedList::<T>::new();
45    loop
46    {
47      let i = inputs.next().await.unwrap();
48      if outputs.is_full()
49      {
50        cache.push_back(i.t);
51      }
52      else
53      {
54        while !cache.is_empty() && !outputs.is_full()
55        {
56          let _ = outputs
57            .next(Outputs {
58              t: cache.pop_front().unwrap(),
59            })
60            .await;
61        }
62        if outputs.is_full()
63        {
64          cache.push_back(i.t);
65        }
66        else
67        {
68          let _ = outputs.next(Outputs { t: i.t }).await;
69        }
70      }
71    }
72  }
73}