streaming_harness/
input.rs

1use std::ops::Add;
2
3pub trait InputTimeResumableIterator<T: Eq+Ord>: Iterator<Item=T> {
4    #[inline(always)]
5    fn peek(&mut self) -> Option<&T>;
6    #[inline(always)]
7    fn end(&self) -> bool;
8}
9
10pub struct ConstantThroughputInputTimes<T: Copy+Eq+Ord+Add<DT, Output=T>, DT> {
11    next: T,
12    inter_arrival: DT,
13    end: T,
14}
15
16
17impl<T: Copy+Eq+Ord+Add<DT, Output=T>, DT: Copy> ConstantThroughputInputTimes<T, DT> {
18    pub fn new(first: T, inter_arrival: DT, end: T) -> Self {
19        Self {
20            next: first,
21            inter_arrival,
22            end,
23        }
24    }
25}
26
27impl<T: Copy+Eq+Ord+Add<DT, Output=T>, DT: Copy> Iterator for ConstantThroughputInputTimes<T, DT> {
28    type Item = T;
29    #[inline(always)]
30    fn next(&mut self) -> Option<T> {
31        if !self.end() {
32            let n = self.next;
33            self.next = self.next + self.inter_arrival;
34            Some(n)
35        } else {
36            None
37        }
38    }
39}
40
41impl<T: Copy+Eq+Ord+Add<DT, Output=T>, DT: Copy> InputTimeResumableIterator<T> for ConstantThroughputInputTimes<T, DT> {
42    #[inline(always)]
43    fn peek(&mut self) -> Option<&T> {
44        if !self.end() {
45            Some(&self.next)
46        } else {
47            None
48        }
49    }
50    #[inline(always)]
51    fn end(&self) -> bool {
52        self.next >= self.end
53    }
54}
55
56pub struct SyntheticInputTimeGenerator<T: Copy+Eq+Ord, I: InputTimeResumableIterator<T>> {
57    input_times: I,
58    _phantom_data: ::std::marker::PhantomData<T>,
59}
60
61impl<T: Copy+Eq+Ord, I: InputTimeResumableIterator<T>> SyntheticInputTimeGenerator<T, I> {
62    pub fn new(input_times: I) -> Self {
63        Self {
64            input_times,
65            _phantom_data: ::std::marker::PhantomData,
66        }
67    }
68
69    pub fn iter_until<'a>(&'a mut self, until: T) -> Option<impl Iterator<Item=T>+'a> {
70        if self.input_times.end() {
71            None
72        } else {
73            Some(SyntheticInputTimeGeneratorIterator {
74                referenced: self,
75                until,
76            })
77        }
78    }
79}
80
81struct SyntheticInputTimeGeneratorIterator<'a, T: Copy+Eq+Ord+'a, I: InputTimeResumableIterator<T>+'a> {
82    referenced: &'a mut SyntheticInputTimeGenerator<T, I>,
83    until: T,
84}
85
86impl<'a, T: Copy+Eq+Ord+'a, I: InputTimeResumableIterator<T>+'a> Iterator for SyntheticInputTimeGeneratorIterator<'a, T, I> {
87    type Item = T;
88
89    fn next(&mut self) -> Option<T> {
90        if self.referenced.input_times.end() {
91            None
92        } else if let Some(&next_t) = self.referenced.input_times.peek() {
93            if next_t < self.until {
94                self.referenced.input_times.next().unwrap();
95                Some(next_t)
96            } else {
97                None
98            }
99        } else {
100            None
101        }
102    }
103}