watermark/lib.rs
1//! A simple watermarking set.
2//!
//! A watermarking set holds integer values, and supports two operations:
//!
//! - `insert(element: T)`
//!   - Inserts an item into the set
//! - `contains(element: T)`
//!   - Checks whether an item has previously been added to the set
9//!
10//! A watermark set works best when the "inserts" *all* happen, and happen "mostly"
11//! in order. For example, when keeping track of which message IDs have been seen.
12//!
13//! # Example
14//!
15//! To make a simple idempotent data processor:
16//!
17//! ```rust
18//!
19//! struct message {
20//! id: u32,
21//! data: u64,
22//! }
23//!
24//! let message_bus = vec![
25//! message { id: 1, data: 2 },
26//! message { id: 2, data: 3 },
27//! message { id: 1, data: 2 }
28//! ];
29//!
30//! let mut ws = watermark::WatermarkSet::default();
31//! for message in message_bus {
32//! if !ws.contains(message.id) {
33//! ws.insert(message.id);
34//! // Do some work with message.data
35//! }
36//! }
37//! ```
38//!
39//! # Operation
40//!
//! Internally, a watermarking set contains a "watermark" and a bitvector of
//! "recently added" items. The watermark guarantees that all items below
//! that number have been seen, and the recently added items handle everything
//! else. This means that if all elements eventually get added, memory usage
//! is kept very low and membership tests are very cheap.
46
47extern crate num;
48
49use std::collections::VecDeque;
50use num::{Integer, CheckedAdd, CheckedSub, FromPrimitive, ToPrimitive};
51
/// A watermarking set.
///
/// Supports `insert` and `contains` operations. Membership is tracked with a
/// `watermark`, below which every value counts as present, plus a queue of
/// 64-bit masks covering the values at or above the watermark.
#[derive(Debug, Clone, Default)]
pub struct WatermarkSet<T> {
    /// Every value strictly less than this is reported as contained.
    pub watermark: T,
    /// Bitmask buckets for values at or above the watermark: bit `o` of
    /// bucket `b` stands for the value `watermark + 64 * b + o`.
    pub recently_added: VecDeque<u64>,
}
60
61impl <T: Integer + CheckedSub + ToPrimitive> WatermarkSet<T> {
62 fn bucket_and_offset(&self, elem: T) -> (usize, usize) {
63 let diff = elem.checked_sub(&self.watermark).unwrap();
64 let diff: usize = diff.to_usize().unwrap();
65 // We use u64s as bitmasks for elements "just above" the water
66 // so figure out which u64 the element belongs in
67 let bucket = diff / 64;
68 // And within that u64, which bit corresponds to the element
69 let offset = diff % 64;
70 return (bucket, offset);
71 }
72}
73
74impl <T> WatermarkSet<T> {
75 /// Create a new benchmarking set containing all elements less than the first
76 /// parameter.
77 /// ```
78 /// let wm = watermark::WatermarkSet::new(1385);
79 /// assert!(wm.contains(1384));
80 /// assert_eq!(wm.contains(1385), false);
81 /// assert_eq!(wm.contains(1386), false);
82 /// ```
83 pub fn new(watermark: T) -> WatermarkSet<T> {
84 WatermarkSet {
85 watermark: watermark,
86 recently_added: VecDeque::default(),
87 }
88 }
89}
90
91impl <T: Integer + CheckedSub + CheckedAdd + FromPrimitive + ToPrimitive> WatermarkSet<T> {
92 /// Insert an element to the collection
93 /// # Example
94 /// ```
95 /// let mut wm = watermark::WatermarkSet::default();
96 /// wm.insert(123);
97 /// ```
98 /// # Panics
99 ///
100 /// If the collection gets completely full, watermark may overflow the
101 /// bounds of T, resulting in an unwrap panic on a checked_add.
102 ///
103 pub fn insert(&mut self, elem: T) {
104 // It's already below the watermark, so do nothing
105 if self.watermark > elem {
106 return;
107 }
108
109 // Identify which bit we need to flip
110 let (bucket, offset) = self.bucket_and_offset(elem);
111
112 // make sure we have enough capacity for the bit we're about to set
113 if self.recently_added.len() <= bucket {
114 self.recently_added.resize(bucket + 1, 0);
115 }
116
117 // Flip the offset'th bit in the bucket to indicate this element
118 // has been added
119 self.recently_added[bucket] |= 1 << offset;
120
121 // Raise the water as far as we can
122 // If all the bits are set in the first bucket,
123 while !self.recently_added.is_empty() && self.recently_added[0] == !0u64 {
124 // We can pop it off (cheap, because VecDeque is a ring buffer)
125 self.recently_added.pop_front();
126 // And raise the watermark by 64
127 let stride = T::from_u8(64).unwrap();
128 self.watermark = self.watermark.checked_add(&stride).unwrap();
129 }
130 }
131}
132
133impl<T: Integer + ToPrimitive> WatermarkSet<T> {
134 /// Check how many items have been added to the collection
135 /// # Example
136 /// ```
137 /// let mut wm = watermark::WatermarkSet::default();
138 /// for i in 0..=63 {
139 /// wm.insert(i);
140 /// }
141 /// for i in (64..80).step_by(3) {
142 /// wm.insert(i);
143 /// }
144 /// assert_eq!(wm.size(), 64 + 6)
145 /// ```
146 pub fn size(&self) -> usize {
147 // Count anything that's submerged in the watermark,
148 let mut size: usize = self.watermark.to_usize().unwrap();
149 // Plus any bits set above the watermark
150 for bucket in &self.recently_added {
151 size += bucket.count_ones() as usize;
152 }
153 return size;
154 }
155}
156
157impl<T: Integer + CheckedSub + ToPrimitive> WatermarkSet<T> {
158 /// Check if an element has been added to the collection
159 /// # Example
160 /// ```
161 /// let mut wm = watermark::WatermarkSet::default();
162 /// wm.insert(1);
163 /// assert!(wm.contains(1));
164 /// assert_eq!(wm.contains(2), false);
165 /// ```
166 pub fn contains(&self, elem: T) -> bool {
167 // If asked about something below the waterline, return true
168 if self.watermark > elem {
169 return true;
170 }
171 // Find out which bit to check
172 let (bucket, offset) = self.bucket_and_offset(elem);
173 // If we don't have a bucket for it yet, return false
174 if self.recently_added.len() <= bucket {
175 return false;
176 }
177 // Otherwise, check if the bit is set
178 return self.recently_added[bucket] & (1 << offset) > 0;
179 }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use num::{BigUint, Num};
    use rand::{thread_rng, Rng};
    use std::convert::TryInto;

    /// A default-constructed set reports nothing as present.
    #[test]
    fn can_create_as_default() {
        let collection = WatermarkSet::default();
        assert!(!collection.contains(1));
        assert!(!collection.contains(0));
    }

    /// An inserted element is found; a neighbour that was never inserted is not.
    #[test]
    fn basic_operation() {
        let mut collection = WatermarkSet::default();
        collection.insert(1);
        assert!(collection.contains(1));
        assert!(!collection.contains(0));
    }

    /// `size` counts both the contiguous prefix and the sparse tail.
    #[test]
    fn can_check_size() {
        let mut collection = WatermarkSet::default();
        let mut rng = thread_rng();
        let mut expected_count = 0;

        // A dense run starting at zero...
        let mid = rng.gen_range(10, 100);
        for i in 0..mid {
            collection.insert(i);
            expected_count += 1;
        }

        // ...followed by a sparse run with a random stride.
        let upper = rng.gen_range(mid, 500);
        let step = rng.gen_range(3, 20);
        for i in (mid..upper).step_by(step) {
            collection.insert(i);
            expected_count += 1;
        }

        assert_eq!(collection.size(), expected_count);
    }

    /// In-order inserts keep the bitmask queue at no more than one bucket.
    #[test]
    fn can_insert_many_with_good_watermarking() {
        let mut collection = WatermarkSet::default();
        // 16383: the final value of the range, inserted separately below
        let last = (1 << 14) - 1;

        // Insert the numbers 0 through 16383 (exclusive)
        for i in 0..last {
            collection.insert(i);
        }
        // We should have been able to raise the watermark to 16320
        assert_eq!(collection.watermark, (1 << 14) - 64);
        // and have one entry tracking recent additions
        assert_eq!(collection.recently_added.len(), 1);
        // with every bit set except the top one (the 64th)
        assert_eq!(collection.recently_added[0], !(1u64 << 63));

        // Add 16383 (the 16384th entry, starting at 0)
        collection.insert(last);

        // This should have filled up the last bucket, causing
        // us to raise the watermark
        assert_eq!(collection.watermark, 1 << 14);
        assert!(collection.recently_added.is_empty());
    }

    /// Inserts that arrive "mostly" in order still end fully watermarked.
    #[test]
    fn can_insert_slightly_out_of_order() {
        // Generate a list of IDs we're going to insert
        // that are "mostly" in order
        let mut items: Vec<u32> = (0..(1 << 12)).collect();
        let mut rng = thread_rng();
        let count: i32 = items.len().try_into().unwrap();

        for i in 0..items.len() {
            // If this slot no longer holds its own index it was already
            // swapped, so leave it where it is
            let home: u32 = i.try_into().unwrap();
            if items[i] != home {
                continue;
            }
            // Swapping items with elements within 100 units of them
            // means that the whole list will be "mostly" in order
            // and items will be within 100 units of where they "should" be
            let nudge: i32 = rng.gen_range(-100, 100);
            let idx: i32 = i.try_into().unwrap();
            let target = if idx + nudge < 0 {
                // Would fall off the front: swap with itself instead
                idx
            } else if idx + nudge > count - 1 {
                // Would fall off the back: clamp to the last slot
                count - 1
            } else {
                idx + nudge
            };
            let j: usize = target.try_into().unwrap();
            items.swap(i, j);
        }

        let mut coll = WatermarkSet::default();
        // Now, insert each item
        for item in items {
            coll.insert(item);
        }
        // And check that we watermarked correctly to this point
        assert_eq!(coll.watermark, 1 << 12);
        assert!(coll.recently_added.is_empty());
    }

    /// The set works with narrow primitives and with arbitrary-precision integers.
    #[test]
    fn should_support_other_integer_types() {
        {
            let mut coll: WatermarkSet<u8> = WatermarkSet::default();
            // Stop short of 255: filling the final bucket would call for
            // raising the watermark to 256, which a u8 cannot hold
            for i in 0u8..255u8 {
                coll.insert(i);
                assert!(coll.contains(i));
            }
        }
        {
            let start = BigUint::from_str_radix("10000000000000000000000000000", 10).unwrap();
            let mut coll: WatermarkSet<BigUint> = WatermarkSet::new(start.clone());
            for i in 0u8..255u8 {
                let elem = start.clone() + BigUint::from(i);
                coll.insert(elem.clone());
                assert!(coll.contains(elem));
            }
        }
    }
}
310}