reservoir_sampler/
lib.rs

//! This crate only provides a "standard" trait describing what a streaming
//! sampler using the [Reservoir Algorithm] can do. In my opinion, given a
//! `Whole` that consists of `Item`s of a single type, the sampler decides
//! whether to sample each item as it passes through, and at any moment the
//! sampler should know which samples it currently holds. Once the sampler
//! decides not to accept any more samples, it can `lock` the result.
//!
//! [Reservoir Algorithm]: https://en.wikipedia.org/wiki/Reservoir_sampling
9use rand::random;
10
/// Contract for a streaming sampler built on the reservoir algorithm.
///
/// Items are offered one at a time through [`sample`](Self::sample), the
/// samples held so far can be inspected with [`samples`](Self::samples), and
/// [`lock`](Self::lock) ends the process and hands over the result.
pub trait ReservoirSampler {
    /// The one item type this sampler handles.
    type Item;

    /// Offers a single item from the stream to the sampler.
    ///
    /// # Returns
    ///
    /// A 3-tuple made of:
    /// - the random number (`usize`) drawn for the current item,
    /// - the number of items (`usize`) that have passed through so far,
    /// - the previously held item that the current item replaced, if any.
    fn sample(&mut self, it: Self::Item) -> (usize, usize, Option<Self::Item>);

    /// Borrows the items currently held; this must work at any point of the
    /// sampling process, not only once it is finished.
    fn samples(&self) -> &[Option<Self::Item>];

    /// Ends the sampling process and returns the result. Implementations are
    /// allowed to shuffle the order of the result.
    fn lock(self) -> Vec<Option<Self::Item>>;
}
32
/// A `Reservoir` is just a pool of sample slots, plus `total`: the count of
/// items that have passed through so far, which is needed to draw the random
/// number for the next item.
#[derive(Debug, Clone)]
pub struct Reservoir<T> {
    /// Number of items offered to the reservoir so far.
    total: usize,
    /// The sample slots; `None` marks a slot that holds no item.
    pool: Vec<Option<T>>,
}

impl<T> Reservoir<T> {
    /// Creates a reservoir with `n` sample slots, all of them empty (`None`),
    /// and an item count of zero.
    ///
    /// The slots are produced by a closure rather than by cloning a template
    /// value, so `T` does not need to implement `Clone`.
    pub fn with_capacity(n: usize) -> Self {
        Self {
            total: 0,
            pool: std::iter::repeat_with(|| None).take(n).collect(),
        }
    }
}
48
49impl<T> ReservoirSampler for Reservoir<T> {
50    type Item = T;
51
52    fn sample(&mut self, it: Self::Item) -> (usize, usize, Option<Self::Item>) {
53        let pool_cap = self.pool.capacity();
54
55        self.total += 1;
56
57        // 概率渐小的随机替换
58        let r = random::<usize>() % self.total + 1;
59        let mut replaced = None;
60        if r <= pool_cap {
61            replaced = self.pool[r - 1].take();
62            self.pool[r - 1] = Some(it);
63        }
64
65        if self.total <= pool_cap && r < self.total {
66            self.pool[self.total - 1] = replaced.take();
67        }
68
69        (r, self.total, replaced)
70    }
71
72    fn samples(&self) -> &[Option<Self::Item>] {
73        &self.pool[..]
74    }
75
76    fn lock(mut self) -> Vec<Option<Self::Item>> {
77        let mut i = self.total;
78        while i < self.pool.capacity() {
79            i += 1;
80
81            let r = random::<usize>() % i + 1;
82            if r <= self.pool.capacity() {
83                self.pool[i - 1] = self.pool[r - 1].take();
84            }
85        }
86
87        self.pool
88    }
89}
90
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the items held in `pool` (skipping empty slots) in ascending
    /// order, so results can be compared regardless of shuffling.
    fn held(pool: &[Option<i32>]) -> Vec<i32> {
        let mut items: Vec<i32> = pool.iter().flatten().copied().collect();
        items.sort_unstable();
        items
    }

    /// With more slots than items, every item must be kept and none evicted.
    #[test]
    fn test() {
        let list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut reservoir = Reservoir::<i32>::with_capacity(15);

        for (seen, &it) in list.iter().enumerate() {
            let (r, total, replaced) = reservoir.sample(it);

            assert_eq!(total, seen + 1);
            assert!((1..=total).contains(&r));
            assert_eq!(replaced, None);
            assert_eq!(reservoir.samples().len(), 15);
            assert_eq!(held(reservoir.samples()), list[..=seen].to_vec());
        }

        let result = reservoir.lock();
        assert_eq!(result.len(), 15);
        assert_eq!(held(&result), list.to_vec());
    }

    /// With more items than slots, the pool ends up full of distinct items
    /// from the stream, and evicted items are no longer held.
    #[test]
    fn overflowing_stream_keeps_a_full_pool_of_distinct_items() {
        let list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut reservoir = Reservoir::<i32>::with_capacity(3);
        let mut evicted = Vec::new();

        for &it in &list {
            let (_, _, replaced) = reservoir.sample(it);
            evicted.extend(replaced);
        }

        let result = reservoir.lock();
        assert_eq!(result.len(), 3);

        let mut kept = held(&result);
        assert_eq!(kept.len(), 3);
        assert!(kept.iter().all(|item| list.contains(item)));
        assert!(evicted.iter().all(|item| !kept.contains(item)));

        kept.dedup();
        assert_eq!(kept.len(), 3);
    }
}