ackr/
lib.rs

1use std::collections::HashMap;
2
3#[derive(PartialEq, Eq, Hash, Debug, Copy, Clone)]
4pub struct Source(u64);
5
6impl Source {
7    pub fn to_u64(&self) -> u64 {
8        self.0
9    }
10
11    pub fn as_tuple(&self) -> Tuple {
12        Tuple(self.to_u64())
13    }
14}
15
/// A raw 64-bit value: a newtype around `u64`.
///
/// `Ackr` uses it both for the tuple ids passed to `add_tuple`/`ack` and for
/// the running ack value stored per source.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Tuple(u64);

impl Tuple {
    /// Returns the raw 64-bit value wrapped by this `Tuple`.
    pub fn to_u64(&self) -> u64 {
        let Tuple(raw) = *self;
        raw
    }
}
24
/// Identifier of a task: a newtype around a raw `u32`.
///
/// `Ackr` stores one `Task` alongside the ack value of every tracked source.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Task(u32);

impl Task {
    /// Returns the raw 32-bit id wrapped by this `Task`.
    pub fn to_u32(&self) -> u32 {
        let Task(raw) = *self;
        raw
    }
}
33
34/// Track tuple acks across multiple tasks with minimal memory
35/// requirements. The algorithm is from the stream-processing system
36/// Storm.
37///
38/// The only allocation needed is two 64-bit values per task (task_id, ack value)
39/// and scales to 2^64 tuples.
40///
41/// To track a tuple, you call `tuple`. The tuple's id will be XORed with the previous
42/// ack value. As tuples are acked (marked as arbitrarily completed), the ack value will
43/// once again be XORed. Once all tuples are acked/completed, the ack value will be 0.
44#[derive(PartialEq, Eq, Debug)]
45pub struct Ackr {
46    buckets: HashMap<Source, (Task, Tuple)>
47}
48
49impl Ackr {
50    /// Create a new Ackr with no buckets/tasks.
51    pub fn new() -> Ackr {
52        Ackr {
53            buckets: HashMap::new()
54        }
55    }
56
57    /// Insert a new bucket entry with the Source id as the initial ack value.
58    pub fn insert(&mut self, source_id: Source, task_id: Task) {
59        self.buckets.insert(source_id, (task_id, source_id.as_tuple()));
60    }
61
62    /// Add a tuple to the Source's ack value. This is essentially just the first
63    /// XOR.
64    pub fn add_tuple(&mut self, source_id: Source, tuple_id: Tuple) {
65        self.ack(source_id, tuple_id);
66    }
67
68    /// XOR the ack value for a given Source and the result is the new ack value.
69    /// Acking once adds the tuple to the Source, acking it twice removes it.
70    pub fn ack(&mut self, source_id: Source, tuple_id: Tuple) -> Option<()> {
71        if let Some(&mut (_, Tuple(ref mut x))) = self.buckets.get_mut(&source_id) {
72            *x ^= tuple_id.to_u64();
73
74            Some(())
75        } else {
76            None
77        }
78    }
79
80    pub fn get(&mut self, source_id: Source) -> Tuple {
81        self.buckets[&source_id].1
82    }
83
84    pub fn has_completed(&mut self, source_id: Source) -> bool {
85        self.buckets[&source_id].1.to_u64() == 0
86    }
87}
88
#[cfg(test)]
mod test {
    use super::*;

    /// A freshly inserted source starts with its own id as the ack value.
    #[test]
    fn new() {
        let mut ackr = Ackr::new();
        ackr.insert(Source(0x01), Task(0x02));
        assert_eq!(ackr.get(Source(0x01)), Tuple(0x01));
    }

    /// Acking the source id itself cancels the seed value and completes the source.
    #[test]
    fn ack() {
        let mut ackr = Ackr::new();
        ackr.insert(Source(0x01), Task(0x01));
        assert_eq!(ackr.ack(Source(0x01), Tuple(0x01)), Some(()));
        assert!(ackr.has_completed(Source(0x01)));
    }

    /// Tuples may be acked in any order; once they are all acked only the seed
    /// (the source id) remains, so the source is not yet complete.
    #[test]
    fn ack_2() {
        let mut ackr = Ackr::new();

        // Source id, task id
        ackr.insert(Source(0x01), Task(1));

        ackr.add_tuple(Source(0x01), Tuple(0x03));
        ackr.add_tuple(Source(0x01), Tuple(0x04));
        assert_eq!(ackr.ack(Source(0x01), Tuple(0x04)), Some(()));
        assert_eq!(ackr.ack(Source(0x01), Tuple(0x03)), Some(()));
        assert_eq!(ackr.get(Source(0x01)), Tuple(0x01));
        assert!(!ackr.has_completed(Source(0x01)));
    }

    /// Acking a source that was never inserted reports `None` and leaves the
    /// other sources untouched.
    #[test]
    fn ack_unknown_source() {
        let mut ackr = Ackr::new();
        assert_eq!(ackr.ack(Source(0x01), Tuple(0x01)), None);

        ackr.insert(Source(0x01), Task(0x01));
        assert_eq!(ackr.ack(Source(0x02), Tuple(0x01)), None);
        assert_eq!(ackr.get(Source(0x01)), Tuple(0x01));
    }
}