1use std::{collections::VecDeque, sync::mpsc, time::Duration};
2
3use crate::DeltaDuration;
4
5type MapFunc<In, Out> = Box<dyn FnMut(&mut VecDeque<In>, Duration) -> Option<TimedValue<Out>> + Send>;
6
7pub struct StreamMapper<In, Out> {
8 pub values_in: VecDeque<In>,
9 step: MapFunc<In, Out>,
10}
11
12impl<In, Out> StreamMapper<In, Out> {
13 pub fn new<F>(step: F) -> StreamMapper<In, Out>
14 where
15 F: FnMut(&mut VecDeque<In>, Duration) -> Option<TimedValue<Out>> + Send + 'static,
16 {
17 StreamMapper {
18 values_in: VecDeque::new(),
19 step: Box::new(step),
20 }
21 }
22
23 pub fn step(&mut self, since_start: Duration) -> Option<TimedValue<Out>> {
24 if !self.values_in.is_empty() {
25 (self.step)(&mut self.values_in, since_start)
26 } else {
27 None
28 }
29 }
30}
31
/// A value paired with the time offset it is associated with.
#[derive(Debug, Clone)]
pub struct TimedValue<T> {
    /// Time offset attached to `value`.
    // NOTE(review): the name suggests "elapsed since stream start" — confirm with producers.
    pub since_start: Duration,
    /// The carried payload.
    pub value: T,
}
37
/// Blocking consumer that hands every value received on a channel to a callback.
pub struct IntermittentSink<Output> {
    /// Receiving end of the channel feeding this sink.
    channel_in: mpsc::Receiver<Output>,
    /// Callback invoked once for each received value.
    send: Box<dyn FnMut(Output)>,
}

impl<Output> IntermittentSink<Output> {
    /// Creates a sink that reads from `channel_in` and passes each value to `send`.
    pub fn new<F>(channel_in: mpsc::Receiver<Output>, send: F) -> Self
    where
        F: FnMut(Output) + 'static,
    {
        IntermittentSink {
            channel_in,
            send: Box::new(send),
        }
    }

    /// Forwards received values to the callback until the channel closes.
    ///
    /// Blocks the calling thread while waiting for the next value and returns
    /// once the receiver reports that the channel has hung up, i.e. after all
    /// senders are dropped and the buffered values have been delivered.
    pub fn start(&mut self) {
        // `Receiver::iter` blocks like `recv` and ends when the channel hangs up.
        for value in self.channel_in.iter() {
            (self.send)(value);
        }
    }
}
61
62pub struct IntermittentSource<Input, Converted> {
63 relative: Option<DeltaDuration>,
64 channel_out: mpsc::Sender<TimedValue<Converted>>,
65 mapper: StreamMapper<Input, Converted>,
66}
67
68impl<Input, Converted> IntermittentSource<Input, Converted> {
69 pub fn new<F>(out: mpsc::Sender<TimedValue<Converted>>, convert: F) -> Self
70 where
71 F: FnMut(&mut VecDeque<Input>, Duration) -> Option<TimedValue<Converted>> + 'static + Send,
72 {
73 IntermittentSource {
74 relative: None,
75 channel_out: out,
76 mapper: StreamMapper::new(convert),
77 }
78 }
79
80 pub fn input_messages(
81 &mut self,
82 messages_in: impl IntoIterator<Item = Input>,
83 since_start: Duration,
84 timestamp: Duration,
85 ) {
86 let processed_timestamp = if let Some(relative) = &self.relative {
87 relative.add_to(timestamp)
88 } else {
89 self.relative = Some(DeltaDuration::sub(timestamp, since_start));
90
91 since_start
92 };
93
94 self.mapper.values_in.extend(messages_in);
95
96 while let Some(value) = self.mapper.step(processed_timestamp) {
97 if self.channel_out.send(value).is_err() {
98 return; }
100 }
101 }
102}