// streaming_harness/input.rs
use std::ops::Add;
2
/// An iterator over input times whose upcoming item can be inspected without
/// consuming it and which can report when it has run out.
///
/// Items are produced through the regular [`Iterator`] interface; the two
/// methods below add look-ahead and an exhaustion test on top of it.
///
/// No `#[inline]` hints are placed on the prototypes: the compiler ignores the
/// attribute on body-less trait methods and warns about it. Implementations
/// are free to add the hint on their own method bodies.
pub trait InputTimeResumableIterator<T: Eq+Ord>: Iterator<Item=T> {
    /// Returns a reference to the upcoming item without advancing the
    /// iterator, or `None` when there is nothing left to yield.
    fn peek(&mut self) -> Option<&T>;

    /// Returns `true` when the iterator will not yield any further item.
    fn end(&self) -> bool;
}
9
/// State of a generator that hands out input times separated by a fixed step.
///
/// The struct itself places no trait bounds on `T` or `DT`; the arithmetic and
/// ordering requirements live on the `impl` blocks that actually need them, so
/// the type can be named, stored and derived over without repeating them.
#[derive(Debug, Clone)]
pub struct ConstantThroughputInputTimes<T, DT> {
    /// The input time that will be handed out next.
    next: T,
    /// Step added to `next` each time an input time is handed out.
    inter_arrival: DT,
    /// Exclusive limit: generation stops once `next >= end`.
    end: T,
}
15
16
17impl<T: Copy+Eq+Ord+Add<DT, Output=T>, DT: Copy> ConstantThroughputInputTimes<T, DT> {
18 pub fn new(first: T, inter_arrival: DT, end: T) -> Self {
19 Self {
20 next: first,
21 inter_arrival,
22 end,
23 }
24 }
25}
26
27impl<T: Copy+Eq+Ord+Add<DT, Output=T>, DT: Copy> Iterator for ConstantThroughputInputTimes<T, DT> {
28 type Item = T;
29 #[inline(always)]
30 fn next(&mut self) -> Option<T> {
31 if !self.end() {
32 let n = self.next;
33 self.next = self.next + self.inter_arrival;
34 Some(n)
35 } else {
36 None
37 }
38 }
39}
40
41impl<T: Copy+Eq+Ord+Add<DT, Output=T>, DT: Copy> InputTimeResumableIterator<T> for ConstantThroughputInputTimes<T, DT> {
42 #[inline(always)]
43 fn peek(&mut self) -> Option<&T> {
44 if !self.end() {
45 Some(&self.next)
46 } else {
47 None
48 }
49 }
50 #[inline(always)]
51 fn end(&self) -> bool {
52 self.next >= self.end
53 }
54}
55
/// Owns a source of input times so that it can be drained in successive
/// bounded passes.
///
/// The struct definition carries no trait bounds; the requirements on `T` and
/// `I` are stated on the `impl` block that uses them, so the type can be named
/// without repeating them.
#[derive(Debug)]
pub struct SyntheticInputTimeGenerator<T, I> {
    /// Underlying source of input times.
    input_times: I,
    /// Ties the item type `T` to the struct, since no field stores a `T`.
    _phantom_data: ::std::marker::PhantomData<T>,
}
60
61impl<T: Copy+Eq+Ord, I: InputTimeResumableIterator<T>> SyntheticInputTimeGenerator<T, I> {
62 pub fn new(input_times: I) -> Self {
63 Self {
64 input_times,
65 _phantom_data: ::std::marker::PhantomData,
66 }
67 }
68
69 pub fn iter_until<'a>(&'a mut self, until: T) -> Option<impl Iterator<Item=T>+'a> {
70 if self.input_times.end() {
71 None
72 } else {
73 Some(SyntheticInputTimeGeneratorIterator {
74 referenced: self,
75 until,
76 })
77 }
78 }
79}
80
81struct SyntheticInputTimeGeneratorIterator<'a, T: Copy+Eq+Ord+'a, I: InputTimeResumableIterator<T>+'a> {
82 referenced: &'a mut SyntheticInputTimeGenerator<T, I>,
83 until: T,
84}
85
86impl<'a, T: Copy+Eq+Ord+'a, I: InputTimeResumableIterator<T>+'a> Iterator for SyntheticInputTimeGeneratorIterator<'a, T, I> {
87 type Item = T;
88
89 fn next(&mut self) -> Option<T> {
90 if self.referenced.input_times.end() {
91 None
92 } else if let Some(&next_t) = self.referenced.input_times.peek() {
93 if next_t < self.until {
94 self.referenced.input_times.next().unwrap();
95 Some(next_t)
96 } else {
97 None
98 }
99 } else {
100 None
101 }
102 }
103}