streaming_median/lib.rs
1#[cfg(test)]
2#[macro_use]
3extern crate quickcheck;
4
5extern crate xorshift;
6extern crate arraydeque;
7
8use arraydeque::ArrayDeque;
9use std::iter::{self, FromIterator};
10use std::cmp::Ordering;
11use std::mem::uninitialized;
12
13
14/// `StreamingMedian` provides a simple interface for inserting values
15/// and calculating medians.
16pub struct StreamingMedian {
17 data: ArrayDeque<[u32; 64]>,
18 sorted: [u32; 64],
19 last_median: u32,
20}
21
22impl StreamingMedian {
23 pub fn new(initial_median: u32) -> StreamingMedian {
24 let data = ArrayDeque::from_iter(iter::repeat(initial_median).take(64));
25
26 // We use unsafe here and then immediately assign values to the
27 // unused space
28 let mut sorted: [u32; 64] = [0; 64];
29
30 for (i, t) in data.iter().enumerate() {
31 sorted[i] = *t;
32 }
33
34 StreamingMedian {
35 data,
36 sorted,
37 last_median: initial_median,
38 }
39 }
40
41 /// Returns the last median value without performing any recalculation
42 ///
43 /// # Example
44 /// ```norun
45 /// use sqs_service_handler::autoscaling::median;
46 ///
47 /// let stream = StreamingMedian::new(123_000);
48 /// assert_eq!(stream.last(), 31_000);
49 /// ```
50 pub fn last(&self) -> u32 {
51 self.last_median
52 }
53
54 /// Calculates and returns the median
55 ///
56 /// # Arguments
57 ///
58 /// * `value` - The value to be inserted into the stream
59 /// # Example
60 /// ```norun
61 /// use sqs_service_handler::autoscaling::median;
62 ///
63 /// let stream = StreamingMedian::new(123_000);
64 /// assert_eq!(stream.insert_and_calculate(31_000), 31_000);
65 /// ```
66 /// The algorithm used to efficiently insert and calculate relies
67 /// on the fact that the data is always left in a sorted state.
68 ///
69 /// First we pop off the oldest value, 'removed', from our internal
70 /// ring buffer. Then we add our new value 'value' to the buffer at
71 /// the back. We use this buffer to maintain a temporal relationship
72 /// between our values.
73 ///
74 /// A separate stack array 'self.sorted' is used to maintain a sorted
75 /// representation of the data.
76 ///
77 /// We binary search for 'removed' in our 'sorted' array and store the
78 /// index as 'remove_index'.
79 ///
80 /// We then calculate where to insert the new 'value' by binary searching
81 /// for it, either finding it already or where to insert it.
82 ///
83 /// If the 'insert_index' for our 'value' is less than the 'remove_index'
84 /// we shift the data between the 'remove_index' and the 'insert_index' over
85 /// one space. This overwrites the old value we want to remove while maintaining
86 /// order. We can then insert our value into the array.
87 ///
88 /// Example:
89 /// Starting with a self.sorted of
90 /// [2, 3, 4, 5, 7, 8]
91 /// We then call insert_and_calculate(6)
92 /// Let's assume that '3' is the oldest value. This makes 'remove_index' = 1
93 /// We search for where to insert our value '6' and its' index 3.
94 /// [2, 3, 4, 5, 7, 8] <- remove_index = 1, insert_index = 3
95 /// Shift the data between 1 and 3 over by one.
96 /// [2, 4, 5, 5, 7, 8]
97 /// Insert our value into index 3.
98 /// [2, 4, 5, 6, 7, 8]
99 ///
100 /// A similar approach is performed in the case of the insert_index being before
101 /// the remove index.
102 ///
103 /// Unsafe is used here to dramatically improve performance - a full 3-5x
104 pub fn insert_and_calculate(&mut self, value: u32) -> u32 {
105 let mut scratch_space: [u32; 64] = unsafe { uninitialized() };
106
107 let removed = self.data.pop_front().unwrap();
108 let _ = self.data.push_back(value); // If we pop_front, push_back can never fail
109
110 if removed == value {
111 return self.sorted[31];
112 }
113
114 let remove_index = binary_search(&self.sorted, &removed);
115
116 // If removed is larger than value than the remove_index must be
117 // after the insert_index, allowing us to cut our search down
118 let insert_index = {
119 if removed > value {
120 let sorted_slice = &self.sorted[..remove_index];
121 binary_search(sorted_slice, &value)
122 } else {
123 let sorted_slice = &self.sorted[remove_index..];
124 remove_index + binary_search(sorted_slice, &value)
125 }
126 };
127
128 // shift the data between remove_index and insert_index so that the
129 // value of remove_index is overwritten and the 'value' can be placed
130 // in the gap between them
131
132 if remove_index < insert_index {
133 // Starting with a self.sorted of
134 // [2, 3, 4, 5, 7, 8]
135 // insert_and_calculate(6)
136 // [2, 3, 4, 5, 7, 8] <- remove_index = 1, insert_index = 3
137 // [2, 4, 5, 5, 7, 8]
138 // [2, 4, 5, 6, 7, 8]
139
140 scratch_space[remove_index + 1..insert_index]
141 .copy_from_slice(&self.sorted[remove_index + 1..insert_index]);
142
143 self.sorted[remove_index..insert_index - 1]
144 .copy_from_slice(&scratch_space[remove_index + 1..insert_index]);
145
146 self.sorted[insert_index - 1] = value;
147
148 } else {
149 // Starting with a self.sorted of
150 // [2, 3, 4, 5, 7, 8, 9]
151 // insert_and_calculate(6)
152 // [2, 3, 4, 5, 7, 8, 9] <- remove_index = 5, insert_index = 3
153 // [2, 3, 4, 5, 5, 7, 9] Shift values
154 // [2, 3, 4, 6, 7, 8, 9] Insert value
155 scratch_space[insert_index..remove_index]
156 .copy_from_slice(&self.sorted[insert_index..remove_index]);
157
158 self.sorted[insert_index + 1..remove_index + 1]
159 .copy_from_slice(&scratch_space[insert_index..remove_index]);
160
161 self.sorted[insert_index] = value;
162
163 }
164
165 let median = self.sorted[31];
166 self.last_median = median;
167 median
168 }
169}
170
171fn binary_search<T>(t: &[T], x: &T) -> usize where T: Ord {
172 binary_search_by(t, |p| p.cmp(x))
173}
174
// Binary search driven by a comparator, returning a bare `usize`.
//
// `f` is called on probed elements and must report how the element orders
// relative to the target. The result is the index of a matching element when
// one is probed, and otherwise the position at which the target could be
// inserted while preserving sorted order. Callers here never need to tell
// those two outcomes apart, so a single index is all that is returned.
fn binary_search_by<T, F>(t: &[T], mut f: F) -> usize
where
    F: FnMut(&T) -> Ordering,
{
    // Invariant: the answer lies within the half-open window `lo..hi`.
    let mut lo = 0usize;
    let mut hi = t.len();

    while lo < hi {
        let mid = lo + ((hi - lo) >> 1);
        match f(&t[mid]) {
            Ordering::Equal => return mid,
            Ordering::Less => lo = mid + 1,
            Ordering::Greater => hi = mid,
        }
    }

    lo
}
200
#[cfg(test)]
mod test {
    use super::*;
    use xorshift::{Xoroshiro128, Rng, SeedableRng};
    use std::time::{SystemTime, UNIX_EPOCH};
    use std::time::Duration;

    const NANOS_PER_MILLI: u32 = 1000_000;
    const MILLIS_PER_SEC: u64 = 1000;

    /// Converts a `Duration` to whole milliseconds, truncating any
    /// sub-millisecond remainder.
    pub fn millis(d: Duration) -> u64 {
        // A proper Duration will not overflow, because MIN and MAX are defined
        // such that the range is exactly i64 milliseconds.
        let secs_part = d.as_secs() * MILLIS_PER_SEC;
        let nanos_part = d.subsec_nanos() / NANOS_PER_MILLI;
        secs_part + nanos_part as u64
    }

    /// Returns true when the tracker's internal sorted view is in
    /// non-decreasing order.
    fn is_sorted(tracker: &StreamingMedian) -> bool {
        tracker.sorted.windows(2).all(|pair| pair[0] <= pair[1])
    }

    #[test]
    fn test_median_random() {
        // Seeded from the wall clock, so each run exercises a different
        // sequence of inputs.
        let t = millis(SystemTime::now().duration_since(UNIX_EPOCH).unwrap());
        let mut rng = Xoroshiro128::from_seed(&[t, 71, 1223]);

        let mut median_tracker = StreamingMedian::new(123_000);
        for _ in 0..100_000 {
            median_tracker.insert_and_calculate(rng.gen());
        }

        assert!(is_sorted(&median_tracker));
    }

    #[test]
    fn test_median_ascending() {
        let mut median_tracker = StreamingMedian::new(123_000);

        for value in 0..100_000u32 {
            median_tracker.insert_and_calculate(value);
        }

        assert!(is_sorted(&median_tracker));
        // The window now holds 99_936..=99_999; index 31 of that is 99_967.
        assert_eq!(median_tracker.last(), 99_967);
    }

    #[test]
    fn test_median_descending() {
        let mut median_tracker = StreamingMedian::new(123_000);

        // Feed strictly decreasing values: 199_999 down to 100_000.
        for value in (100_000..200_000u32).rev() {
            median_tracker.insert_and_calculate(value);
        }

        assert!(is_sorted(&median_tracker));
        // The window now holds 100_000..=100_063; index 31 of that is 100_031.
        assert_eq!(median_tracker.last(), 100_031);
    }

    #[test]
    fn test_poison_absence() {
        let mut median_tracker = StreamingMedian::new(123_000);

        // After a full window of new samples none of the initial fill values
        // may remain in the sorted view.
        for _ in 0..64 {
            median_tracker.insert_and_calculate(1);
        }

        for i in median_tracker.sorted.iter() {
            assert_ne!(*i, 123_000);
        }
    }

    quickcheck! {
        fn maintains_sorted(default: u32, input: u32) -> bool {
            let mut median_tracker = StreamingMedian::new(default);
            median_tracker.insert_and_calculate(input);

            is_sorted(&median_tracker)
        }
    }
}