clocked/
intermittent.rs

1use std::{collections::VecDeque, sync::mpsc, time::Duration};
2
3use crate::DeltaDuration;
4
5type MapFunc<In, Out> = Box<dyn FnMut(&mut VecDeque<In>, Duration) -> Option<TimedValue<Out>> + Send>;
6
7pub struct StreamMapper<In, Out> {
8    pub values_in: VecDeque<In>,
9    step: MapFunc<In, Out>,
10}
11
12impl<In, Out> StreamMapper<In, Out> {
13    pub fn new<F>(step: F) -> StreamMapper<In, Out>
14    where
15        F: FnMut(&mut VecDeque<In>, Duration) -> Option<TimedValue<Out>> + Send + 'static,
16    {
17        StreamMapper {
18            values_in: VecDeque::new(),
19            step: Box::new(step),
20        }
21    }
22
23    pub fn step(&mut self, since_start: Duration) -> Option<TimedValue<Out>> {
24        if !self.values_in.is_empty() {
25            (self.step)(&mut self.values_in, since_start)
26        } else {
27            None
28        }
29    }
30}
31
32#[derive(Debug, Clone)]
33pub struct TimedValue<T> {
34    pub since_start: Duration,
35    pub value: T,
36}
37
38pub struct IntermittentSink<Output> {
39    channel_in: mpsc::Receiver<Output>,
40    send: Box<dyn FnMut(Output)>,
41}
42
43impl<Output> IntermittentSink<Output> {
44    pub fn new<F>(channel_in: mpsc::Receiver<Output>, send: F) -> Self
45    where
46        F: FnMut(Output) + 'static,
47    {
48        IntermittentSink {
49            channel_in,
50            send: Box::new(send),
51        }
52    }
53
54    /// this function blocks; probably best to run in a thread
55    pub fn start(&mut self) {
56        while let Ok(value) = self.channel_in.recv() {
57            (self.send)(value);
58        }
59    }
60}
61
62pub struct IntermittentSource<Input, Converted> {
63    relative: Option<DeltaDuration>,
64    channel_out: mpsc::Sender<TimedValue<Converted>>,
65    mapper: StreamMapper<Input, Converted>,
66}
67
68impl<Input, Converted> IntermittentSource<Input, Converted> {
69    pub fn new<F>(out: mpsc::Sender<TimedValue<Converted>>, convert: F) -> Self
70    where
71        F: FnMut(&mut VecDeque<Input>, Duration) -> Option<TimedValue<Converted>> + 'static + Send,
72    {
73        IntermittentSource {
74            relative: None,
75            channel_out: out,
76            mapper: StreamMapper::new(convert),
77        }
78    }
79
80    pub fn input_messages(
81        &mut self,
82        messages_in: impl IntoIterator<Item = Input>,
83        since_start: Duration,
84        timestamp: Duration,
85    ) {
86        let processed_timestamp = if let Some(relative) = &self.relative {
87            relative.add_to(timestamp)
88        } else {
89            self.relative = Some(DeltaDuration::sub(timestamp, since_start));
90
91            since_start
92        };
93
94        self.mapper.values_in.extend(messages_in);
95
96        while let Some(value) = self.mapper.step(processed_timestamp) {
97            if self.channel_out.send(value).is_err() {
98                return; // looks like the channel hung up
99            }
100        }
101    }
102}