reservoir_sampler/lib.rs
//! This crate only provides a "standard" trait describing what a streaming
//! sampler using the [Reservoir Algorithm] can do. In my opinion, given a
//! `Whole` consisting of `Item`s of a single type, the sampler decides whether
//! to sample each item as it passes through, and at any moment the sampler
//! knows which samples it currently holds. Once the sampler decides not to
//! accept any more samples, it can `lock` the result.
7//!
//! [Reservoir Algorithm]: https://en.wikipedia.org/wiki/Reservoir_sampling
9use rand::random;
10
/// Interface of a streaming sampler: items are offered one by one, the
/// currently held samples can be inspected at any time, and the sampler can be
/// consumed to obtain the final result.
pub trait ReservoirSampler {
    /// The single kind of item this sampler handles.
    type Item;

    /// Offers one item to the sampler. Items arrive as a stream, so exactly
    /// one item is handled per call.
    ///
    /// ## Return
    /// A tuple of three elements:
    /// - a `usize`: the random number drawn for the current item
    /// - a `usize`: how many items have passed through so far
    /// - an `Option`: the held item that the current item replaced, if any.
    fn sample(&mut self, it: Self::Item) -> (usize, usize, Option<Self::Item>);

    /// Returns the items currently held, whether or not the sampling process
    /// has finished.
    fn samples(&self) -> &[Option<Self::Item>];

    /// Ends the sampling process and returns the result. Implementations are
    /// allowed to shuffle the order of the result.
    fn lock(self) -> Vec<Option<Self::Item>>;
}
32
/// A `Reservoir` is just a pool of sample slots, plus the count (`total`) of
/// items that have passed through so far, which is needed for random number
/// generation.
pub struct Reservoir<T> {
    /// Number of items offered to the reservoir so far.
    total: usize,
    /// The sample slots; a slot is `None` until an item is stored in it.
    pool: Vec<Option<T>>,
}

impl<T> Reservoir<T> {
    /// Creates a reservoir with `n` sample slots, all empty, and an item
    /// count of zero.
    ///
    /// No `Clone` bound is required on `T`: every slot is produced by its own
    /// `None`, so nothing is ever cloned.
    pub fn with_capacity(n: usize) -> Self {
        let mut pool = Vec::with_capacity(n);
        // Fill with `n` fresh `None`s instead of cloning a template element.
        pool.resize_with(n, || None);

        Self { total: 0, pool }
    }
}
48
49impl<T> ReservoirSampler for Reservoir<T> {
50 type Item = T;
51
52 fn sample(&mut self, it: Self::Item) -> (usize, usize, Option<Self::Item>) {
53 let pool_cap = self.pool.capacity();
54
55 self.total += 1;
56
57 // 概率渐小的随机替换
58 let r = random::<usize>() % self.total + 1;
59 let mut replaced = None;
60 if r <= pool_cap {
61 replaced = self.pool[r - 1].take();
62 self.pool[r - 1] = Some(it);
63 }
64
65 if self.total <= pool_cap && r < self.total {
66 self.pool[self.total - 1] = replaced.take();
67 }
68
69 (r, self.total, replaced)
70 }
71
72 fn samples(&self) -> &[Option<Self::Item>] {
73 &self.pool[..]
74 }
75
76 fn lock(mut self) -> Vec<Option<Self::Item>> {
77 let mut i = self.total;
78 while i < self.pool.capacity() {
79 i += 1;
80
81 let r = random::<usize>() % i + 1;
82 if r <= self.pool.capacity() {
83 self.pool[i - 1] = self.pool[r - 1].take();
84 }
85 }
86
87 self.pool
88 }
89}
90
#[cfg(test)]
mod tests {
    use super::*;

    /// With fewer items than slots nothing may be lost: every sampled item
    /// stays in the reservoir, and `lock` only rearranges the slots.
    #[test]
    fn keeps_everything_when_stream_is_shorter_than_reservoir() {
        let list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut reservoir = Reservoir::<i32>::with_capacity(15);

        for (seen, &it) in list.iter().enumerate() {
            let (r, total, replaced) = reservoir.sample(it);

            assert_eq!(total, seen + 1);
            assert!((1..=total).contains(&r));
            // While filling up, displaced items are relocated, never evicted.
            assert!(replaced.is_none());
            assert_eq!(reservoir.samples().len(), 15);
            assert_eq!(reservoir.samples().iter().flatten().count(), total);
        }

        let result = reservoir.lock();
        assert_eq!(result.len(), 15);

        let mut kept: Vec<i32> = result.into_iter().flatten().collect();
        kept.sort_unstable();
        assert_eq!(kept, list);
    }

    /// With more items than slots the reservoir ends up full of distinct
    /// items taken from the stream.
    #[test]
    fn holds_distinct_stream_items_when_stream_is_longer_than_reservoir() {
        let list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut reservoir = Reservoir::<i32>::with_capacity(3);

        for &it in &list {
            let (r, total, replaced) = reservoir.sample(it);
            if total > 3 {
                // Once full, an item is evicted exactly when the draw lands
                // on one of the slots.
                assert_eq!(replaced.is_some(), r <= 3);
            }
        }

        let mut kept: Vec<i32> = reservoir.lock().into_iter().flatten().collect();
        assert_eq!(kept.len(), 3);

        kept.sort_unstable();
        kept.dedup();
        assert_eq!(kept.len(), 3);
        assert!(kept.iter().all(|it| list.contains(it)));
    }
}
107}