incr/lib.rs
//! Simple, fast, and self-contained structs for tracking whether a newly observed
//! value (`u64`) is greater than the largest previously observed value.
//!
//! Use cases include message sequence numbers, timestamps, and other situations
//! that call for quickly assessing whether incoming data is "new", i.e. whether its
//! numbering is larger than any previous value.
//!
//! All of the structs include an `is_new` function that returns `true` if the
//! passed value is a new maximum, while simultaneously storing the new value to
//! check against future values.
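//!
//! For example, a minimal check with `Incr` (the other types behave the same
//! way, modulo `&self` vs. `&mut self`):
//!
//! ```
//! use incr::Incr;
//!
//! let mut last = Incr::default();
//! // 5 is larger than the default of 0, so it is "new" and becomes the stored max.
//! assert!(last.is_new(5));
//! // Neither 5 nor anything smaller is new anymore.
//! assert!(!last.is_new(5));
//! assert!(!last.is_new(3));
//! assert_eq!(last.get(), 5);
//! ```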
//!
//! Two of the `is_new` implementations (`Incr` and `Map`) require an `&mut self`
//! signature, while `RcIncr`, `AtomicIncr`, and `AtomicMap` require only `&self`,
//! thanks to `RcIncr`'s interior mutability and `AtomicIncr`'s thread-safe synchronization.
//!
//! The cost of checking a new value is minimal: 0-2ns for the single-threaded
//! implementations, and ~5-10ns for `AtomicIncr`, except in cases of pathological
//! contention. In a worst-case, nightmare-scenario benchmark for `AtomicIncr`,
//! it's possible to induce delays of hundreds of nanoseconds. A more realistic
//! case of 24 threads contending to increment the atomic, but yielding on each
//! iteration, resulted in checks in the ~5-10ns range.
//!
//! Enabling the "nightly" feature (on by default) allows the use of `AtomicU64`
//! as the backing storage for `AtomicIncr` (vs. `AtomicUsize` otherwise). Also,
//! nightly is required to run the benchmarks.
//!
//! Enabling the "fnv" feature will use the Fowler-Noll-Vo hash function from the `fnv`
//! crate for the `HashMap` used by `Map` and `AtomicMap`. FNV is faster than the
//! default hasher but provides no protection against malicious inputs.
//!
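//! To opt into the "fnv" feature, the dependency entry in `Cargo.toml` might look
//! like this (a sketch; the version is a placeholder):
//!
//! ```toml
//! [dependencies]
//! incr = { version = "*", features = ["fnv"] }
//! ```
//!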
#![cfg_attr(feature = "nightly", feature(test))]

#[cfg(all(test, feature = "nightly"))]
extern crate test;
#[cfg(feature = "fnv")]
extern crate fnv;

#[cfg(not(feature = "fnv"))]
use std::collections;
use std::hash::Hash;
use std::cmp;
use std::rc::Rc;
use std::cell::Cell;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::borrow::Borrow;
#[cfg(feature = "nightly")]
use std::sync::atomic::AtomicU64;
#[cfg(not(feature = "nightly"))]
use std::sync::atomic::AtomicUsize;

#[cfg(feature = "nightly")]
type Atomic = AtomicU64;
#[cfg(not(feature = "nightly"))]
type Atomic = AtomicUsize;
#[cfg(feature = "fnv")]
type HashMap<K, V> = fnv::FnvHashMap<K, V>;
#[cfg(not(feature = "fnv"))]
type HashMap<K, V> = collections::HashMap<K, V>;

/// A self-contained struct for quickly checking whether a newly observed value
/// is greater than any previously observed value.
///
/// # Examples
///
/// ```
/// use incr::Incr;
/// let mut last = Incr::default();
/// assert_eq!(last.is_new(1), true);
/// assert_eq!(last.is_new(1), false);
/// assert_eq!(last.is_new(2), true);
/// assert_eq!(last.get(), 2);
/// ```
///
#[derive(Default, Clone, PartialEq, PartialOrd, Eq, Debug)]
pub struct Incr(u64);

/// A map interface allowing fast checks of whether a newly observed value
/// is greater than any previously observed value for a given key.
///
/// The inner map is a `HashMap<K, u64>`. The signature of `Map`'s `is_new`
/// differs from the others in also requiring a key.
///
/// # Examples
///
/// ```
/// use incr::Map;
/// let mut last: Map<&'static str> = Default::default();
/// assert_eq!(last.is_new("a", 1), true);
/// assert_eq!(last.is_new("b", 1), true);
/// assert_eq!(last.is_new("a", 1), false);
/// assert_eq!(last.is_new("b", 3), true);
/// assert_eq!(last.is_new("c", 1), true);
/// assert_eq!(last.is_new("c", 1), false);
/// assert_eq!(last.get(&"b"), 3);
/// assert_eq!(last.get(&"not a key"), 0);
/// ```
///
#[derive(Default, Clone, PartialEq, Debug)]
pub struct Map<K: Eq + Hash>(HashMap<K, u64>);

/// The `Rc<Cell<_>>`-backed storage of `RcIncr` provides flexibility in situations
/// where the counter must be shared among several disparate objects/contexts while
/// retaining consistency between all the references to the count. `RcIncr` is not
/// thread-safe, and even in single-threaded code `Rc<Cell<T>>` has some tricky edge
/// cases, for instance if a `Cell<T>` is used as the key to a hash map and the
/// interior value is mutated (fair warning).
///
/// # Examples
///
/// ```
/// use incr::RcIncr;
/// let last = RcIncr::default();
/// assert_eq!(last.is_new(1), true);
/// let mut xs = Vec::new();
/// for i in 2..5 {
///     xs.push(last.clone());
///     xs.last().unwrap().is_new(i);
/// }
/// assert_eq!(last.get(), 4);
/// for x in &xs {
///     assert_eq!(x.get(), 4);
/// }
/// ```
///
#[derive(Default, Clone, PartialEq, PartialOrd, Eq, Debug)]
pub struct RcIncr(Rc<Cell<u64>>);

/// `AtomicIncr` is a thread-safe, yet very fast counter, utilizing compare-and-swap
/// instructions to provide speed and safety in the same package. There are some
/// cases where 5ns matters, but in many, many other situations it's a perfectly
/// good decision to just use `AtomicIncr`, knowing it can handle anything, and
/// move on to other problems.
///
/// # Examples
///
/// ```
/// use std::thread::{spawn, JoinHandle};
/// use std::sync::{Arc, Barrier};
/// use incr::AtomicIncr;
///
/// let last: AtomicIncr = Default::default();
/// let barrier = Arc::new(Barrier::new(2));
/// let thread: JoinHandle<u64> = {
///     let barrier = Arc::clone(&barrier);
///     let last = last.clone();
///     spawn(move || {
///         assert_eq!(last.is_new(2), true);
///         assert_eq!(last.is_new(3), true);
///         assert_eq!(last.is_new(3), false);
///         barrier.wait();
///         last.get()
///     })
/// };
/// barrier.wait();
/// assert_eq!(last.is_new(3), false);
/// assert_eq!(thread.join().unwrap(), 3);
/// ```
///
/// It's also possible to access the inner `Arc<Atomic>` (an `Arc<AtomicU64>` on
/// nightly, an `Arc<AtomicUsize>` otherwise) by consuming the outer wrapper:
///
/// ```
/// # #![cfg_attr(feature = "nightly", feature(integer_atomics))]
/// # use std::sync::atomic::*;
/// # use std::sync::Arc;
/// # use std::thread;
/// # #[cfg(feature = "nightly")]
/// # use std::sync::atomic::AtomicU64;
/// # #[cfg(not(feature = "nightly"))]
/// # use std::sync::atomic::AtomicUsize;
/// use incr::AtomicIncr;
///
/// #[cfg(feature = "nightly")]
/// type Atomic = AtomicU64;
/// #[cfg(not(feature = "nightly"))]
/// type Atomic = AtomicUsize;
///
/// let stop = Arc::new(AtomicBool::new(false));
/// let last: AtomicIncr = Default::default();
/// let mut threads = Vec::new();
/// for _ in 0..5 {
///     let val: Arc<Atomic> = last.clone().into_inner();
///     let stop = Arc::clone(&stop);
///     threads.push(thread::spawn(move || {
///         loop {
///             val.fetch_add(1, Ordering::Relaxed);
///             thread::yield_now();
///             if stop.load(Ordering::Relaxed) { break }
///         }
///     }));
/// }
///
/// let mut i = 1;
///
/// for _ in 0..100 {
///     i = match last.is_new(i) {
///         true => i + 1,
///         false => i.max(last.get()),
///     };
/// }
/// stop.store(true, Ordering::SeqCst);
/// ```
///
#[derive(Default, Clone, Debug)]
pub struct AtomicIncr(Arc<Atomic>);

/// Like `Map`, `AtomicMap` provides simple, fast sequence checking by key, but with
/// the thread-safe backing storage of `AtomicIncr`.
///
/// # Tradeoffs
///
/// `AtomicMap` is not a concurrent hashmap. Importantly, **key insertion is not
/// synchronized**. The intended use case is to initialize the map fully on program
/// start, inserting whatever keys will be used throughout its life, and cloning
/// this master instance to be used by any threads tracking sequences by those keys.
///
/// A fully synchronized map was not chosen for performance reasons. If keys
/// are not fully known when threads are launched, the best options include:
///
/// - wrapping a normal `Map` in an `Arc<Mutex<Map>>` or `Arc<RwLock<Map>>` (as
///   sketched below), or
/// - utilizing a third-party crate providing a concurrent hashmap implementation
///   (with `Incr` values).
///
/// For a given (already inserted) key, any `clone()`d `AtomicMap` will use/have a
/// value at that key that *is* synchronized across threads (the inner value is
/// an `Arc`-shared atomic).
///
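/// A minimal sketch of the `Arc<Mutex<Map>>` alternative for keys that arrive
/// late (the key name here is illustrative):
///
/// ```
/// use std::sync::{Arc, Mutex};
/// use std::thread;
/// use incr::Map;
///
/// let shared: Arc<Mutex<Map<String>>> = Arc::new(Mutex::new(Default::default()));
/// let handle = {
///     let shared = Arc::clone(&shared);
///     thread::spawn(move || {
///         // Insertion from any thread is safe here, at the cost of locking.
///         shared.lock().unwrap().is_new("late-key".to_string(), 1)
///     })
/// };
/// assert_eq!(handle.join().unwrap(), true);
/// assert_eq!(shared.lock().unwrap().get("late-key"), 1);
/// ```
///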
/// # Examples
///
/// ```
/// use incr::AtomicMap;
///
/// let mut last: AtomicMap<&'static str> = Default::default();
///
/// assert_eq!(last.insert("a", 1), true);
/// assert_eq!(last.is_new("missing_key", 1), false); // note the difference from `Map`
/// assert_eq!(last.insert("b", 1), true);
/// assert_eq!(last.is_new("a", 1), false);
/// assert_eq!(last.is_new("b", 3), true);
/// assert_eq!(last.is_new_or_insert("c", 1), true);
/// assert_eq!(last.is_new("c", 1), false);
/// assert_eq!(last.get("b"), 3);
/// assert_eq!(last.get("not a key"), 0);
/// ```
///
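/// A sketch of the intended initialize-then-clone pattern (the key name is
/// illustrative; updates at shared keys are visible through every clone because
/// the per-key counter is reference-counted):
///
/// ```
/// use std::thread;
/// use incr::AtomicMap;
///
/// let mut last: AtomicMap<&'static str> = Default::default();
/// // Insert every key up front, before clones cross thread boundaries.
/// last.insert("orders", 0);
///
/// let clone = last.clone();
/// let t = thread::spawn(move || {
///     assert_eq!(clone.is_new("orders", 1), true);
/// });
/// t.join().unwrap();
/// // The update made on the other thread is visible here.
/// assert_eq!(last.get("orders"), 1);
/// ```
///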
#[derive(Default, Clone, Debug)]
pub struct AtomicMap<K: Eq + Hash>(HashMap<K, AtomicIncr>);

impl Incr {
    /// Returns `true` if `val` is greater than the highest previously observed
    /// value. If `val` is a new maximum, it is stored in `self` for checks against
    /// future values; subsequent calls to `Self::get(&self)` will return `val`
    /// until a new maximum is observed.
    ///
    pub fn is_new(&mut self, val: u64) -> bool {
        if val > self.0 {
            self.0 = val;
            true
        } else {
            false
        }
    }

    /// Returns the current value of `self`, which is the maximum observed value.
    pub fn get(&self) -> u64 { self.0 }
}

impl<K> Map<K>
    where K: Eq + Hash
{
    /// Returns `true` if `val` is greater than the highest observed value at
    /// `key`. If `key` does not exist, inserts `val` at `key` and returns `true`.
    pub fn is_new(&mut self, k: K, val: u64) -> bool {
        let prev = self.0.entry(k).or_insert(0);
        if val > *prev {
            *prev = val;
            true
        } else {
            false
        }
    }

    /// Returns the highest observed value at `key`, or, if `key` does not exist,
    /// returns `0`.
    pub fn get<Q>(&self, key: &Q) -> u64
        where K: Borrow<Q>,
              Q: ?Sized + Hash + Eq
    {
        self.0.get(key)
            .cloned()
            .unwrap_or(0)
    }

    /// Returns `true` if the map contains a value for `key`.
    pub fn contains_key<Q>(&self, key: &Q) -> bool
        where K: Borrow<Q>,
              Q: ?Sized + Hash + Eq
    {
        self.0.contains_key(key)
    }

    /// Returns the number of keys in the map.
    pub fn len(&self) -> usize { self.0.len() }

    /// Returns `true` if the map contains no keys.
    pub fn is_empty(&self) -> bool { self.0.is_empty() }
}

impl RcIncr {
    /// Returns `true` if `val` is greater than the highest previously observed
    /// value. If `val` is a new maximum, it is stored in `self` for checks against
    /// future values; subsequent calls to `Self::get(&self)` will return `val`
    /// until a new maximum is observed.
    ///
    pub fn is_new(&self, val: u64) -> bool {
        if val > self.get() {
            self.0.set(val);
            true
        } else {
            false
        }
    }

    /// Returns the current value of `self`, which is the maximum observed value.
    pub fn get(&self) -> u64 { self.0.get() }
}

impl AtomicIncr {
    /// Returns `true` if `val` is greater than the highest previously observed
    /// value. If `val` is a new maximum, it is stored in `self` for checks against
    /// future values; subsequent calls to `Self::get(&self)` will return `val`
    /// until a new maximum is observed.
    ///
    pub fn is_new(&self, val: u64) -> bool {
        let mut gt = false;

        #[cfg(not(feature = "nightly"))]
        let val = val as usize;

        loop {
            let prev = self.0.load(Ordering::Acquire);
            if val > prev {
                // Try to publish `val`; if another thread raced in and changed
                // the stored value since the load above, retry against the
                // freshly observed value.
                if self.0
                    .compare_exchange(prev, val, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
                {
                    gt = true;
                    break;
                }
            } else {
                break;
            }
        }
        gt
    }

    /// Returns the current value of `self`, which is the maximum observed value.
    pub fn get(&self) -> u64 {
        self.0.load(Ordering::Acquire) as u64
    }

    /// Consumes the outer struct, returning the inner `Arc<Atomic>`.
    pub fn into_inner(self) -> Arc<Atomic> {
        self.0
    }
}

impl<K> AtomicMap<K>
    where K: Eq + Hash
{
    /// Returns `true` if `key` exists and `val` is greater than the largest
    /// previously observed value (for `key`). Returns `false` if `key` does
    /// not exist in the inner map. See `AtomicMap::is_new_or_insert` for a
    /// function that behaves similarly to `Map::is_new`.
    ///
    /// # Tradeoffs
    ///
    /// This function has a different signature and behaves differently from
    /// `Map::is_new`.
    ///
    /// Specifically, `Map::is_new`:
    ///
    /// - takes `&mut self`
    /// - consumes `key`
    /// - inserts `val` at `key` if `key` was not already present in the map.
    ///
    /// By contrast, `AtomicMap::is_new`:
    ///
    /// - takes `&self`
    /// - borrows `&key`
    /// - does not insert `val` on a key miss, instead "silently" returning `false`
    ///
    /// This design was chosen for several reasons, including:
    ///
    /// - key insertions are not synchronized across threads. Instead, the map is
    ///   expected to have been initialized on program start, and a key miss is most
    ///   likely an error
    /// - a `&self` signature provides more flexibility, and is possible here,
    ///   unlike with `Map`, because `AtomicIncr::is_new` takes `&self`
    ///
    /// The `AtomicMap::is_new_or_insert` function provides insert-on-key-miss
    /// functionality if desired.
    ///
    /// Possibly, it would be less confusing if this function returned `Option<bool>`,
    /// where a key miss would return `None`. Feedback on this issue would be
    /// appreciated.
    ///
    /// # Examples
    ///
    /// ```
    /// # extern crate incr;
    /// use incr::AtomicMap;
    /// # fn main() {
    /// let mut last: AtomicMap<&'static str> = Default::default();
    ///
    /// assert_eq!(last.is_new("a", 1), false);
    /// assert_eq!(last.contains_key("a"), false);
    /// assert_eq!(last.is_new_or_insert("a", 1), true);
    /// assert_eq!(last.get("a"), 1);
    /// assert_eq!(last.insert("b", 1), true);
    /// assert_eq!(last.is_new("b", 2), true);
    /// assert_eq!(last.is_new("b", 2), false);
    /// # }
    /// ```
    ///
    pub fn is_new<Q>(&self, key: &Q, val: u64) -> bool
        where K: Borrow<Q>,
              Q: ?Sized + Hash + Eq
    {
        self.0.get(key)
            .map(move |x| x.is_new(val))
            .unwrap_or(false)
    }

    /// Like `is_new`, but inserts `val` at `key` if the inner map did not
    /// previously contain `key`.
    ///
    /// This may be renamed to `check_or_insert` in the future.
    pub fn is_new_or_insert(&mut self, key: K, val: u64) -> bool {
        self.0.entry(key)
            .or_insert_with(Default::default)
            .is_new(val)
    }

    /// An alias for, and works identically to, `is_new_or_insert`. It's
    /// not possible, using the public API, to decrease the value at a given
    /// key, so calling this with a `val` lower than the current value
    /// would simply return `false` and leave the higher value in the map
    /// unchanged.
    pub fn insert(&mut self, key: K, val: u64) -> bool {
        self.is_new_or_insert(key, val)
    }

    /// Returns the highest observed value at `key`, or, if `key` does not exist,
    /// returns `0`.
    pub fn get<Q>(&self, key: &Q) -> u64
        where K: Borrow<Q>,
              Q: ?Sized + Hash + Eq
    {
        self.0.get(key)
            .map(|x| x.get())
            .unwrap_or(0)
    }

    /// Returns `true` if the map contains a value for `key`.
    pub fn contains_key<Q>(&self, key: &Q) -> bool
        where K: Borrow<Q>,
              Q: ?Sized + Hash + Eq
    {
        self.0.contains_key(key)
    }

    /// Returns the number of keys in the map.
    pub fn len(&self) -> usize { self.0.len() }

    /// Returns `true` if the map contains no keys.
    pub fn is_empty(&self) -> bool { self.0.is_empty() }
}

impl PartialEq for AtomicIncr {
    fn eq(&self, other: &Self) -> bool {
        self.get() == other.get()
    }
}

impl PartialOrd for AtomicIncr {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.get().cmp(&other.get()))
    }
}

impl Eq for AtomicIncr {}

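/// Converting from a `u64` seeds the counter with an initial maximum, so values
/// at or below the seed are not considered new (a minimal sketch; the same
/// applies to `RcIncr::from` and `AtomicIncr::from`):
///
/// ```
/// use incr::Incr;
///
/// let mut last = Incr::from(10);
/// assert_eq!(last.is_new(10), false);
/// assert_eq!(last.is_new(11), true);
/// ```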
impl From<u64> for Incr {
    fn from(val: u64) -> Self {
        Incr(val)
    }
}

impl From<u64> for RcIncr {
    fn from(val: u64) -> Self {
        RcIncr(Rc::new(Cell::new(val)))
    }
}

impl From<u64> for AtomicIncr {
    fn from(val: u64) -> Self {
        // Without the "nightly" feature the backing type is `AtomicUsize`,
        // so the `u64` must be narrowed to `usize` first (mirroring `is_new`).
        #[cfg(not(feature = "nightly"))]
        let val = val as usize;
        AtomicIncr(Arc::new(Atomic::new(val)))
    }
}

#[allow(unused)]
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicBool;
    use std::thread;
    #[cfg(feature = "nightly")]
    use test::Bencher;

    #[test]
    fn atomic_map_key_ergonomics() {
        let mut last: AtomicMap<String> = Default::default();
        let a = String::from("a");
        last.insert(a.clone(), 10);
        assert_eq!(last.get(&a), 10);

        let mut last: AtomicMap<&'static str> = Default::default();
        last.insert("a", 11);
        assert_eq!(last.get("a"), 11);

        let mut last: AtomicMap<u64> = Default::default();
        last.insert(1, 12);
        assert_eq!(last.get(&1), 12);
    }

    macro_rules! stairway_to_heaven {
        ($f:ident, $t:ident) => {
            #[test]
            fn $f() {
                let mut last: $t = Default::default();
                for i in 1..1_000_000u64 {
                    assert!(last.is_new(i), "i = {}", i);
                }
            }
        }
    }

    stairway_to_heaven!(all_true_to_one_million_incr, Incr);
    stairway_to_heaven!(all_true_to_one_million_rc_incr, RcIncr);
    stairway_to_heaven!(all_true_to_one_million_atomic_incr, AtomicIncr);

    macro_rules! purgatory {
        ($f:ident, $t:ident) => {
            #[test]
            fn $f() {
                let mut last: $t = Default::default();
                for _ in 1..1_000_000u64 {
                    assert!(!last.is_new(0), "i = {}", 0);
                }
            }
        }
    }

    purgatory!(never_true_one_million_times_incr, Incr);
    purgatory!(never_true_one_million_times_rc_incr, RcIncr);
    purgatory!(never_true_one_million_times_atomic_incr, AtomicIncr);

    macro_rules! stairway_to_heaven_bench {
        ($f:ident, $t:ident) => {
            #[cfg(feature = "nightly")]
            #[bench]
            fn $f(b: &mut Bencher) {
                let mut last: $t = Default::default();
                let mut i = 1;
                b.iter(|| {
                    i += 1;
                    last.is_new(i - 1)
                })
            }
        }
    }

    stairway_to_heaven_bench!(always_increasing_bench_incr, Incr);
    stairway_to_heaven_bench!(always_increasing_bench_rc_incr, RcIncr);
    stairway_to_heaven_bench!(always_increasing_bench_atomic_incr, AtomicIncr);

    macro_rules! purgatory_bench {
        ($f:ident, $t:ident) => {
            #[cfg(feature = "nightly")]
            #[bench]
            fn $f(b: &mut Bencher) {
                let mut last: $t = Default::default();
                b.iter(|| {
                    last.is_new(0)
                })
            }
        }
    }

    purgatory_bench!(never_incr_bench_incr, Incr);
    purgatory_bench!(never_incr_bench_rc_incr, RcIncr);
    purgatory_bench!(never_incr_bench_atomic_incr, AtomicIncr);

    #[cfg(feature = "nightly")]
    #[bench]
    fn atomic_incr_nightmare_scenario_except_threads_yield_each_iteration(b: &mut Bencher) {
        let n = 24;
        let stop = Arc::new(AtomicBool::new(false));
        let last: AtomicIncr = Default::default();
        let mut threads = Vec::new();
        for _ in 0..n {
            let val = last.clone().into_inner();
            let stop = Arc::clone(&stop);
            threads.push(thread::spawn(move || {
                loop {
                    val.fetch_add(1, Ordering::Relaxed);
                    thread::yield_now();
                    if stop.load(Ordering::Relaxed) { break }
                }
            }));
        }

        let mut i = 1;

        b.iter(|| {
            let is_new = last.is_new(i);
            i = match is_new {
                true => i + 1,
                false => i.max(last.get()),
            };
            is_new
        });
        stop.store(true, Ordering::SeqCst);
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn im_your_worst_nightmare(b: &mut Bencher) {
        let n = 24;
        let stop = Arc::new(AtomicBool::new(false));
        let last: AtomicIncr = Default::default();
        let mut threads = Vec::new();
        for _ in 0..n {
            let val = last.clone().into_inner();
            let stop = Arc::clone(&stop);
            threads.push(thread::spawn(move || {
                loop {
                    val.fetch_add(1, Ordering::Relaxed);
                    if stop.load(Ordering::Relaxed) { break }
                }
            }));
        }

        let mut i = 1;

        b.iter(|| {
            let is_new = last.is_new(i);
            i = match is_new {
                true => i + 1,
                false => i.max(last.get()),
            };
            is_new
        });
        stop.store(true, Ordering::SeqCst);
    }

    #[test]
    fn check_atomic_incr_default_initial_value() {
        let last = AtomicIncr::default();
        assert_eq!(last.get(), 0);
    }
}
665}