watermark/
lib.rs

1//! A simple watermarking set.
2//!
//! A watermarking set holds integer values and supports two operations:
4//! 
5//! - insert(element: T)
6//!   - Inserts an item into the set
7//! - contains(element: T)
8//!   - Checks whether an item has previously been added to the set
9//!
//! A watermark set works best when *every* value is eventually inserted and the
//! inserts arrive "mostly" in order, for example when keeping track of which
//! message IDs have been seen.
12//!
13//! # Example
14//!
15//! To make a simple idempotent data processor:
16//!
17//! ```rust
18//!
19//! struct message {
20//!   id: u32,
21//!   data: u64,
22//! }
23//!
24//! let message_bus = vec![
25//!   message { id: 1, data: 2 },
26//!   message { id: 2, data: 3 },
27//!   message { id: 1, data: 2 }
28//! ];
29//!
30//! let mut ws = watermark::WatermarkSet::default();
31//! for message in message_bus {
32//!   if !ws.contains(message.id) {
33//!     ws.insert(message.id);
34//!     // Do some work with message.data
35//!   }
36//! }
37//! ```
38//!                         
39//! # Operation
40//! 
//! Internally, a watermarking set contains a "watermark" and a bitvector of
//! "recently added" items. The watermark guarantees that all items below
//! that number have been seen, and the bitvector of recently added items
//! handles everything else. This means that if all elements eventually get
//! added, memory usage is kept very low and membership tests are very cheap.
46
47extern crate num;
48
49use std::collections::VecDeque;
50use num::{Integer, CheckedAdd, CheckedSub, FromPrimitive, ToPrimitive};
51
/// A watermarking set: a low "watermark" plus a bitmap of the values just
/// above it.
///
/// Allows `insert` and `contains` operations.
#[derive(Debug, Clone, Default)]
pub struct WatermarkSet<T> {
    /// Every value strictly below this one is treated as a member of the set.
    pub watermark: T,
    /// Bitmap of the members at or above the watermark: bit `b` of word `w`
    /// represents the value `watermark + 64 * w + b`.
    pub recently_added: VecDeque<u64>,
}
60
61impl <T: Integer + CheckedSub + ToPrimitive> WatermarkSet<T> {
62    fn bucket_and_offset(&self, elem: T) -> (usize, usize) {
63        let diff = elem.checked_sub(&self.watermark).unwrap();
64        let diff: usize = diff.to_usize().unwrap();
65        // We use u64s as bitmasks for elements "just above" the water
66        // so figure out which u64 the element belongs in
67        let bucket = diff / 64;
68        // And within that u64, which bit corresponds to the element
69        let offset = diff % 64;
70        return (bucket, offset);
71    }
72}
73
74impl <T> WatermarkSet<T> {
75    /// Create a new benchmarking set containing all elements less than the first
76    /// parameter.
77    /// ```
78    /// let wm = watermark::WatermarkSet::new(1385);
79    /// assert!(wm.contains(1384));
80    /// assert_eq!(wm.contains(1385), false);
81    /// assert_eq!(wm.contains(1386), false);
82    /// ```
83    pub fn new(watermark: T) -> WatermarkSet<T> {
84        WatermarkSet {
85            watermark: watermark,
86            recently_added: VecDeque::default(),
87        }
88    }
89}
90
91impl <T: Integer + CheckedSub + CheckedAdd + FromPrimitive + ToPrimitive> WatermarkSet<T> {
92    /// Insert an element to the collection
93    /// # Example
94    /// ```
95    /// let mut wm = watermark::WatermarkSet::default();
96    /// wm.insert(123);
97    /// ```
98    /// # Panics
99    ///
100    /// If the collection gets completely full, watermark may overflow the
101    /// bounds of T, resulting in an unwrap panic on a checked_add.
102    ///
103    pub fn insert(&mut self, elem: T) {
104        // It's already below the watermark, so do nothing
105        if self.watermark > elem {
106            return;
107        }
108
109        // Identify which bit we need to flip
110        let (bucket, offset) = self.bucket_and_offset(elem);
111
112        // make sure we have enough capacity for the bit we're about to set
113        if self.recently_added.len() <= bucket {
114            self.recently_added.resize(bucket + 1, 0);
115        }
116
117        // Flip the offset'th bit in the bucket to indicate this element
118        // has been added
119        self.recently_added[bucket] |= 1 << offset;
120
121        // Raise the water as far as we can
122        // If all the bits are set in the first bucket,
123        while !self.recently_added.is_empty() && self.recently_added[0] == !0u64 {
124            // We can pop it off (cheap, because VecDeque is a ring buffer)
125            self.recently_added.pop_front();
126            // And raise the watermark by 64
127            let stride = T::from_u8(64).unwrap();
128            self.watermark = self.watermark.checked_add(&stride).unwrap();
129        }
130    }
131}
132
133impl<T: Integer + ToPrimitive> WatermarkSet<T> {
134    /// Check how many items have been added to the collection
135    /// # Example
136    /// ```
137    /// let mut wm = watermark::WatermarkSet::default();
138    /// for i in 0..=63 {
139    ///     wm.insert(i);
140    /// }
141    /// for i in (64..80).step_by(3) {
142    ///     wm.insert(i);
143    /// }
144    /// assert_eq!(wm.size(), 64 + 6)
145    /// ```
146    pub fn size(&self) -> usize {
147        // Count anything that's submerged in the watermark,
148        let mut size: usize = self.watermark.to_usize().unwrap();
149        // Plus any bits set above the watermark
150        for bucket in &self.recently_added {
151            size += bucket.count_ones() as usize;
152        }
153        return size;
154    }
155}
156
157impl<T: Integer + CheckedSub + ToPrimitive> WatermarkSet<T> {
158    /// Check if an element has been added to the collection
159    /// # Example
160    /// ```
161    /// let mut wm = watermark::WatermarkSet::default();
162    /// wm.insert(1);
163    /// assert!(wm.contains(1));
164    /// assert_eq!(wm.contains(2), false);
165    /// ```
166    pub fn contains(&self, elem: T) -> bool {
167        // If asked about something below the waterline, return true
168        if self.watermark > elem {
169            return true;
170        }
171        // Find out which bit to check
172        let (bucket, offset) = self.bucket_and_offset(elem);
173        // If we don't have a bucket for it yet, return false
174        if self.recently_added.len() <= bucket {
175            return false;
176        }
177        // Otherwise, check if the bit is set
178        return self.recently_added[bucket] & (1 << offset) > 0;
179    }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use num::{BigUint, Num};
    use rand::{thread_rng, Rng};
    use std::convert::TryInto;

    /// A default set reports nothing as present.
    #[test]
    fn can_create_as_default() {
        let collection = WatermarkSet::default();
        assert_eq!(collection.contains(1), false);
        assert_eq!(collection.contains(0), false);
    }

    /// Inserting one value makes that value, and only that value, present.
    #[test]
    fn basic_operation() {
        let mut collection = WatermarkSet::default();
        collection.insert(1);
        assert!(collection.contains(1));
        assert_eq!(collection.contains(0), false);
    }

    /// `size` counts a dense prefix plus a sparse run of later values.
    #[test]
    fn can_check_size() {
        let mut collection = WatermarkSet::default();
        let mut rng = thread_rng();
        let mut expected_count = 0;
        // Dense part: every value in 0..mid
        let mid = rng.gen_range(10, 100);
        for i in 0..mid {
            collection.insert(i);
            expected_count += 1;
        }
        // Sparse part: every step'th value in mid..upper
        let upper = rng.gen_range(mid, 500);
        let step = rng.gen_range(3, 20);
        for i in (mid..upper).step_by(step) {
            collection.insert(i);
            expected_count += 1;
        }
        assert_eq!(collection.size(), expected_count);
    }

    /// In-order inserts keep the bitmap at a single bucket at most.
    #[test]
    fn can_insert_many_with_good_watermarking() {
        let mut collection = WatermarkSet::default();
        // Insert the numbers 0 through 16382 (everything except 16383)
        for i in 0..(1 << 14) - 1 {
            collection.insert(i);
        }
        // We should have been able to raise the watermark to 16320
        assert_eq!(collection.watermark, (1 << 14) - 64);
        // and have one entry tracking recent additions
        assert_eq!(collection.recently_added.len(), 1);
        // with every bit set except the highest (64th) one
        assert_eq!(collection.recently_added[0], !(1u64 << 63));

        // Add 16383 (the 16384th entry, starting at 0)
        collection.insert((1 << 14) - 1);

        // This should have filled up the last bucket, causing
        // us to raise the watermark
        assert_eq!(collection.watermark, 1 << 14);
        assert!(collection.recently_added.is_empty());
    }

    /// Locally shuffled inserts still end with a fully raised watermark.
    #[test]
    fn can_insert_slightly_out_of_order() {
        // Generate a list of IDs we're going to insert
        // that are "mostly" in order
        let mut items: Vec<u32> = (0..1 << 12).collect();
        let mut rng = thread_rng();
        for i in 0..items.len() {
            // If we've swapped this item forward, leave it here
            if items[i] != i.try_into().unwrap() {
                continue;
            }
            // Swapping items with elements within 100 units of them
            // means that the whole list will be "mostly" in order
            // and items will be within 100 units of where they "should" be
            let mut offset: i32 = rng.gen_range(-100, 100);
            let idx: i32 = i.try_into().unwrap();
            let count: i32 = items.len().try_into().unwrap();
            // Clamp the swap target to the bounds of the list
            if idx + offset < 0 {
                offset = 0;
            } else if idx + offset > count - 1 {
                offset = (count - 1) - idx;
            }
            let j: usize = (idx + offset).try_into().unwrap();
            items.swap(i, j);
        }

        let mut coll = WatermarkSet::default();
        // Now, insert each item
        for item in items {
            coll.insert(item);
        }
        // And check that we watermarked correctly to this point
        assert_eq!(coll.watermark, 1 << 12);
        assert!(coll.recently_added.is_empty());
    }

    /// The set works for a narrow primitive and for an arbitrary-precision type.
    #[test]
    fn should_support_other_integer_types() {
        {
            let mut coll: WatermarkSet<u8> = WatermarkSet::default();
            // 255 is deliberately skipped: it would fill the last bucket, and
            // the watermark cannot be raised past the bounds of u8
            for i in 0u8..255u8 {
                coll.insert(i);
                assert!(coll.contains(i));
            }
        }
        {
            let start = BigUint::from_str_radix("10000000000000000000000000000", 10).unwrap();
            let mut coll: WatermarkSet<BigUint> = WatermarkSet {
                watermark: start.clone(),
                recently_added: VecDeque::default(),
            };
            for i in 0u8..255u8 {
                let elem = start.clone() + BigUint::from(i);
                coll.insert(elem.clone());
                assert!(coll.contains(elem));
            }
        }
    }
}