streaming_median/
lib.rs

1#[cfg(test)]
2#[macro_use]
3extern crate quickcheck;
4
5extern crate xorshift;
6extern crate arraydeque;
7
8use arraydeque::ArrayDeque;
9use std::iter::{self, FromIterator};
10use std::cmp::Ordering;
11use std::mem::uninitialized;
12
13
14/// `StreamingMedian` provides a simple interface for inserting values
15/// and calculating medians.
16pub struct StreamingMedian {
17    data: ArrayDeque<[u32; 64]>,
18    sorted: [u32; 64],
19    last_median: u32,
20}
21
22impl StreamingMedian {
23    pub fn new(initial_median: u32) -> StreamingMedian {
24        let data = ArrayDeque::from_iter(iter::repeat(initial_median).take(64));
25
26        // We use unsafe here and then immediately assign values to the
27        // unused space
28        let mut sorted: [u32; 64] = [0; 64];
29
30        for (i, t) in data.iter().enumerate() {
31            sorted[i] = *t;
32        }
33
34        StreamingMedian {
35            data,
36            sorted,
37            last_median: initial_median,
38        }
39    }
40
41    /// Returns the last median value without performing any recalculation
42    ///
43    /// # Example
44    /// ```norun
45    /// use sqs_service_handler::autoscaling::median;
46    ///
47    /// let stream = StreamingMedian::new(123_000);
48    /// assert_eq!(stream.last(), 31_000);
49    /// ```
50    pub fn last(&self) -> u32 {
51        self.last_median
52    }
53
54    /// Calculates and returns the median
55    ///
56    /// # Arguments
57    ///
58    /// * `value` - The value to be inserted into the stream
59    /// # Example
60    /// ```norun
61    /// use sqs_service_handler::autoscaling::median;
62    ///
63    /// let stream = StreamingMedian::new(123_000);
64    /// assert_eq!(stream.insert_and_calculate(31_000), 31_000);
65    /// ```
66    /// The algorithm used to efficiently insert and calculate relies
67    /// on the fact that the data is always left in a sorted state.
68    ///
69    /// First we pop off the oldest value, 'removed', from our internal
70    /// ring buffer. Then we add our new value 'value' to the buffer at
71    /// the back. We use this buffer to maintain a temporal relationship
72    /// between our values.
73    ///
74    /// A separate stack array 'self.sorted' is used to maintain a sorted
75    /// representation of the data.
76    ///
77    /// We binary search for 'removed' in our 'sorted' array and store the
78    /// index as 'remove_index'.
79    ///
80    /// We then calculate where to insert the new 'value' by binary searching
81    /// for it, either finding it already or where to insert it.
82    ///
83    /// If the 'insert_index' for our 'value' is less than the 'remove_index'
84    /// we shift the data between the 'remove_index' and the 'insert_index' over
85    /// one space. This overwrites the old value we want to remove while maintaining
86    /// order. We can then insert our value into the array.
87    ///
88    /// Example:
89    /// Starting with a self.sorted of
90    /// [2, 3, 4, 5, 7, 8]
91    /// We then call insert_and_calculate(6)
92    /// Let's assume that '3' is the oldest value. This makes 'remove_index' = 1
93    /// We search for where to insert our value '6' and its' index 3.
94    /// [2, 3, 4, 5, 7, 8] <- remove_index = 1, insert_index = 3
95    /// Shift the data between 1 and 3 over by one.
96    /// [2, 4, 5, 5, 7, 8]
97    /// Insert our value into index 3.
98    /// [2, 4, 5, 6, 7, 8]
99    ///
100    /// A similar approach is performed in the case of the insert_index being before
101    /// the remove index.
102    ///
103    /// Unsafe is used here to dramatically improve performance - a full 3-5x
104    pub fn insert_and_calculate(&mut self, value: u32) -> u32 {
105        let mut scratch_space: [u32; 64] = unsafe { uninitialized() };
106
107        let removed = self.data.pop_front().unwrap();
108        let _ = self.data.push_back(value);  // If we pop_front, push_back can never fail
109
110        if removed == value {
111            return self.sorted[31];
112        }
113
114        let remove_index = binary_search(&self.sorted, &removed);
115
116        // If removed is larger than value than the remove_index must be
117        // after the insert_index, allowing us to cut our search down
118        let insert_index = {
119            if removed > value {
120                let sorted_slice = &self.sorted[..remove_index];
121                binary_search(sorted_slice, &value)
122            } else {
123                let sorted_slice = &self.sorted[remove_index..];
124                remove_index + binary_search(sorted_slice, &value)
125            }
126        };
127
128        // shift the data between remove_index and insert_index so that the
129        // value of remove_index is overwritten and the 'value' can be placed
130        // in the gap between them
131
132        if remove_index < insert_index {
133            // Starting with a self.sorted of
134            // [2, 3, 4, 5, 7, 8]
135            // insert_and_calculate(6)
136            // [2, 3, 4, 5, 7, 8] <- remove_index = 1, insert_index = 3
137            // [2, 4, 5, 5, 7, 8]
138            // [2, 4, 5, 6, 7, 8]
139
140            scratch_space[remove_index + 1..insert_index]
141                .copy_from_slice(&self.sorted[remove_index + 1..insert_index]);
142
143            self.sorted[remove_index..insert_index - 1]
144                .copy_from_slice(&scratch_space[remove_index + 1..insert_index]);
145
146            self.sorted[insert_index - 1] = value;
147
148        } else {
149            // Starting with a self.sorted of
150            // [2, 3, 4, 5, 7, 8, 9]
151            // insert_and_calculate(6)
152            // [2, 3, 4, 5, 7, 8, 9] <- remove_index = 5, insert_index = 3
153            // [2, 3, 4, 5, 5, 7, 9] Shift values
154            // [2, 3, 4, 6, 7, 8, 9] Insert value
155            scratch_space[insert_index..remove_index]
156                .copy_from_slice(&self.sorted[insert_index..remove_index]);
157
158            self.sorted[insert_index + 1..remove_index + 1]
159                .copy_from_slice(&scratch_space[insert_index..remove_index]);
160
161            self.sorted[insert_index] = value;
162
163        }
164
165        let median = self.sorted[31];
166        self.last_median = median;
167        median
168    }
169}
170
171fn binary_search<T>(t: &[T], x: &T) -> usize where T: Ord {
172    binary_search_by(t, |p| p.cmp(x))
173}
174
/// A custom binary search that always yields a single `usize`: either the
/// index of an element for which `f` reports `Ordering::Equal`, or the index
/// at which such an element could be inserted to preserve sorted order.
///
/// Callers here have no need to tell those two outcomes apart, so one index
/// is enough.
///
/// `f` is called with a probed element and must report how that element
/// orders relative to the target (`Less` meaning the element sorts before
/// the target). An empty slice yields 0.
fn binary_search_by<T, F>(t: &[T], mut f: F) -> usize
    where F: FnMut(&T) -> Ordering
{
    // Half-open range [lo, hi) of positions that may still hold the target.
    let mut lo = 0usize;
    let mut hi = t.len();

    while lo < hi {
        let mid = lo + ((hi - lo) >> 1);
        match f(&t[mid]) {
            // The probe sorts before the target: discard it and everything left of it.
            Ordering::Less => lo = mid + 1,
            // The probe sorts after the target: keep only what lies left of it.
            Ordering::Greater => hi = mid,
            Ordering::Equal => return mid,
        }
    }

    lo
}
200
#[cfg(test)]
mod test {
    use super::*;
    use xorshift::{Xoroshiro128, Rng, SeedableRng};
    use std::collections::VecDeque;
    use std::time::{SystemTime, UNIX_EPOCH};
    use std::time::Duration;

    const NANOS_PER_MILLI: u32 = 1_000_000;
    const MILLIS_PER_SEC: u64 = 1000;

    /// Converts `d` to whole milliseconds, truncating any sub-millisecond
    /// remainder.
    pub fn millis(d: Duration) -> u64 {
        // Only used below to derive an RNG seed from the time since the Unix
        // epoch, so overflow of the multiplication is not a practical concern.
        let secs_part = d.as_secs() * MILLIS_PER_SEC;
        let nanos_part = d.subsec_nanos() / NANOS_PER_MILLI;
        secs_part + nanos_part as u64
    }

    /// Asserts that the tracker's sorted view is in ascending order.
    fn assert_sorted(tracker: &StreamingMedian) {
        for pair in tracker.sorted.windows(2) {
            assert!(pair[0] <= pair[1]);
        }
    }

    #[test]
    fn test_median_random() {
        let seed = millis(SystemTime::now().duration_since(UNIX_EPOCH).unwrap());
        let mut rng = Xoroshiro128::from_seed(&[seed, 71, 1223]);

        let mut median_tracker = StreamingMedian::new(123_000);
        for _ in 0..100_000 {
            median_tracker.insert_and_calculate(rng.gen());
        }

        // Report the time-derived seed so that a failure can be replayed.
        for pair in median_tracker.sorted.windows(2) {
            assert!(
                pair[0] <= pair[1],
                "sorted order violated (seed = {})",
                seed
            );
        }
    }

    #[test]
    fn test_median_ascending() {
        let mut median_tracker = StreamingMedian::new(123_000);

        for value in 0..100_000 {
            median_tracker.insert_and_calculate(value);
        }

        assert_sorted(&median_tracker);
    }

    #[test]
    fn test_median_descending() {
        let mut median_tracker = StreamingMedian::new(123_000);

        // Strictly decreasing input: every new value sorts before all of the
        // values already in the window.
        for value in (0..100_000).rev() {
            median_tracker.insert_and_calculate(value);
        }

        assert_sorted(&median_tracker);
    }

    #[test]
    fn test_median_matches_naive_window() {
        let initial = 500;
        let mut median_tracker = StreamingMedian::new(initial);

        // Reference model: keep the last 64 values and sort them every time.
        let mut window: VecDeque<u32> = VecDeque::from(vec![initial; 64]);

        // Small deterministic generator; the narrow value range forces many
        // duplicates, including values equal to the one being evicted.
        let mut state: u32 = 12_345;
        for _ in 0..10_000 {
            state = state.wrapping_mul(1_664_525).wrapping_add(1_013_904_223);
            let value = (state >> 16) % 1_000;

            window.pop_front();
            window.push_back(value);
            let mut expected: Vec<u32> = window.iter().cloned().collect();
            expected.sort();

            assert_eq!(median_tracker.insert_and_calculate(value), expected[31]);
            assert_eq!(median_tracker.last(), expected[31]);
        }

        assert_sorted(&median_tracker);
    }

    #[test]
    fn test_poison_absence() {
        let mut median_tracker = StreamingMedian::new(123_000);

        for _ in 0..64 {
            median_tracker.insert_and_calculate(1);
        }

        // After 64 insertions none of the initial fill values may remain.
        for i in median_tracker.sorted.iter() {
            assert_ne!(*i, 123_000);
        }
    }

    quickcheck! {
        fn maintains_sorted(default: u32, input: u32) -> bool {
            let mut median_tracker = StreamingMedian::new(default);
            median_tracker.insert_and_calculate(input);

            for i in median_tracker.sorted.windows(2) {
                if i[0] > i[1] {
                    return false
                }
            }
            true
        }
    }
}