kproc/processors/
cache.rs1pub use kproc_pmacros::Processor;
4use std::collections::LinkedList;
5
6use crate::processor::{InputStreams, OutputStreams};
7use crate::{processor, Result};
8
/// Input side of the [`Cache`] processor: a single value of type `T`.
///
/// The stream plumbing is generated by the `InputStreams` derive; its
/// expansion is not visible in this file.
9#[derive(Clone, InputStreams)]
11pub struct Inputs<T>
12where
13 T: Clone,
14{
 // The received value; `Cache::start` moves it out of each incoming item
 // (`i.t`) and either buffers it or forwards it unchanged.
15 t: T,
16}
17
/// Output side of the [`Cache`] processor: a single value of type `T`.
///
/// The stream plumbing is generated by the `OutputStreams` derive; its
/// expansion is not visible in this file.
18#[derive(Clone, OutputStreams)]
20pub struct Outputs<T>
21where
22 T: Clone,
23{
 // The forwarded value; `Cache::start` fills it either from the front of
 // its internal buffer or directly from the item just received.
24 t: T,
25}
26
/// Pass-through processor that buffers values of type `T` while its output
/// reports itself as full, and forwards them in arrival order otherwise.
///
/// The struct holds no runtime state of its own; the buffer lives as a local
/// inside `start`. The `#[streams(...)]` attribute names `Inputs<T>` and
/// `Outputs<T>` as the stream types for the `Processor` derive.
27#[derive(Processor)]
29#[streams(Inputs<T>, Outputs<T>)]
30pub struct Cache<T>
31where
32 T: Clone,
33{
 // Zero-sized marker so the struct can be generic over `T` without
 // storing a `T`.
34 _t: std::marker::PhantomData<T>,
35}
36
// `Processor` implementation for `Cache<T>`: forwards every input value to
// the output, parking values in a FIFO whenever the output is full.
37impl<T> processor::Processor for Cache<T>
38where
39 T: Clone,
40{
 /// Runs the forwarding loop.
 ///
 /// Each iteration waits for one item from `inputs`, then:
 /// * if `outputs.is_full()`, appends the item to the back of the buffer;
 /// * otherwise drains buffered items (oldest first) into `outputs` until
 ///   the buffer is empty or the output becomes full, and then either
 ///   sends the new item or buffers it behind the remaining ones.
 ///
 /// Arrival order is therefore preserved. The loop contains no `break` or
 /// `return`; the only visible way out is the `unwrap()` panic below.
 ///
 /// NOTE(review): buffered items are only flushed after a new input has
 /// arrived, because the drain loop sits behind `inputs.next().await`.
 /// NOTE(review): nothing bounds the buffer, so it grows for as long as the
 /// output stays full while inputs keep arriving.
41 async fn start(self, mut inputs: Self::InputStreams, mut outputs: Self::OutputStreams)
43 {
 // FIFO of values that could not be forwarded yet. Only `push_back`,
 // `pop_front` and `is_empty` are used on it.
44 let mut cache = LinkedList::<T>::new();
45 loop
46 {
 // NOTE(review): `unwrap()` panics if `next()` yields a non-success
 // value — presumably when the input side is closed; confirm against
 // the `InputStreams` API. Anything still buffered is lost then.
47 let i = inputs.next().await.unwrap();
 // NOTE(review): this outer check looks redundant — with a full output
 // the drain loop below would not run and the inner `is_full()` branch
 // would buffer the item anyway. Left as is; confirm `is_full()` is a
 // pure query before merging the branches.
48 if outputs.is_full()
49 {
50 cache.push_back(i.t);
51 }
52 else
53 {
 // Flush older items first so the new one cannot overtake them.
54 while !cache.is_empty() && !outputs.is_full()
55 {
 // The result of the send is deliberately discarded (`let _`).
 // NOTE(review): if `next` can fail, the popped value is dropped
 // silently — confirm this is intended.
56 let _ = outputs
57 .next(Outputs {
 // Cannot panic: the loop condition just checked `!cache.is_empty()`.
58 t: cache.pop_front().unwrap(),
59 })
60 .await;
61 }
 // The output may have filled up during the drain; if so, queue the new
 // item behind whatever is still buffered.
62 if outputs.is_full()
63 {
64 cache.push_back(i.t);
65 }
66 else
67 {
 // Buffer fully drained and room left: forward the new item directly.
68 let _ = outputs.next(Outputs { t: i.t }).await;
69 }
70 }
71 }
72 }
73}