//! binlog/stores/memory.rs — in-memory implementations of the binlog store traits.

use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::ops::{Bound, RangeBounds};
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::time::{Duration, Instant};
use std::vec::IntoIter as VecIter;

use string_cache::DefaultAtom as Atom;

use crate::{utils, Entry, Error, Range, RangeableStore, Store, SubscribeableStore, Subscription};
11
/// Shared mutable state behind the `MemoryStore` mutex.
#[derive(Clone, Default)]
struct MemoryStoreInternal {
    // Log entries keyed by (timestamp, name); the BTreeMap keeps keys ordered
    // by timestamp first, so range queries can use `BTreeMap::range`. Several
    // values may share the same (timestamp, name) key, hence the Vec payload.
    entries: BTreeMap<(i64, Atom), Vec<Vec<u8>>>,
    // Registered subscriptions per stream name. Stored as Weak so a dropped
    // subscription handle does not keep its notification state alive.
    subscribers: HashMap<Atom, Vec<Weak<MemoryStreamSubscriptionInternal>>>,
}
17
/// State shared between the store and one subscription: the most recently
/// pushed entry for the subscribed name, plus a condvar used to wake
/// `Subscription::next` callers blocked on it.
struct MemoryStreamSubscriptionInternal {
    // Latest entry seen for the subscribed name; seeded from the store's
    // existing contents at subscription time, None if nothing was pushed yet.
    latest: Mutex<Option<Entry>>,
    cvar: Condvar,
}
22
23impl MemoryStreamSubscriptionInternal {
24    fn notify(&self, entry: Entry) {
25        let mut latest = self.latest.lock().unwrap();
26        *latest = Some(entry);
27        self.cvar.notify_all();
28    }
29}
30
/// In-memory store handle. Cloning the handle clones only the `Arc`, so all
/// clones share the same mutex-guarded state.
#[derive(Clone, Default)]
pub struct MemoryStore(Arc<Mutex<MemoryStoreInternal>>);
33
34impl Store for MemoryStore {
35    fn push(&self, entry: Cow<Entry>) -> Result<(), Error> {
36        let mut internal = self.0.lock().unwrap();
37
38        internal
39            .entries
40            .entry((entry.timestamp, entry.name.clone()))
41            .or_insert_with(Vec::default)
42            .push(entry.value.clone());
43
44        if let Some(subscribers) = internal.subscribers.get_mut(&entry.name) {
45            let entry = entry.into_owned();
46            let mut new_subscribers = Vec::<Weak<MemoryStreamSubscriptionInternal>>::default();
47            for subscriber in subscribers.drain(..) {
48                if let Some(subscriber) = Weak::upgrade(&subscriber) {
49                    subscriber.notify(entry.clone());
50                    new_subscribers.push(Arc::downgrade(&subscriber));
51                }
52            }
53            *subscribers = new_subscribers;
54        }
55
56        Ok(())
57    }
58
59    fn latest<A: Into<Atom>>(&self, name: A) -> Result<Option<Entry>, Error> {
60        let name = name.into();
61        let internal = self.0.lock().unwrap();
62        for ((map_timestamp, map_name), map_values) in internal.entries.iter().rev() {
63            if map_name != &name {
64                continue;
65            }
66            if let Some(value) = map_values.last() {
67                return Ok(Some(Entry::new_with_timestamp(*map_timestamp, name, value.clone())));
68            }
69        }
70
71        Ok(None)
72    }
73}
74
75impl RangeableStore for MemoryStore {
76    type Range = MemoryRange;
77
78    fn range<A: Into<Atom>, R: RangeBounds<i64>>(&self, range: R, name: Option<A>) -> Result<Self::Range, Error> {
79        utils::check_bounds(range.start_bound(), range.end_bound())?;
80        Ok(Self::Range {
81            internal: self.0.clone(),
82            start_bound: range.start_bound().cloned(),
83            end_bound: range.end_bound().cloned(),
84            name: name.map(|n| n.into()),
85        })
86    }
87}
88
/// Lazy view over a timestamp range of a `MemoryStore`, optionally filtered
/// to a single stream name. The store is only locked and scanned when
/// `count`, `remove`, or `iter` is invoked.
pub struct MemoryRange {
    // Handle to the shared state of the store this range was created from.
    internal: Arc<Mutex<MemoryStoreInternal>>,
    // Timestamp bounds captured when the range was constructed.
    start_bound: Bound<i64>,
    end_bound: Bound<i64>,
    // When Some, only entries carrying exactly this name are visited.
    name: Option<Atom>,
}
95
96impl MemoryRange {
97    fn full_start_bound(&self) -> (i64, Atom) {
98        match self.start_bound {
99            Bound::Included(timestamp) => (timestamp, Atom::from("")),
100            Bound::Excluded(timestamp) => (timestamp + 1, Atom::from("")),
101            Bound::Unbounded => (i64::min_value(), Atom::from("")),
102        }
103    }
104
105    fn done_iterating_in_range(&self, timestamp: i64) -> bool {
106        match self.end_bound {
107            Bound::Included(end_bound_timestamp) => timestamp <= end_bound_timestamp,
108            Bound::Excluded(end_bound_timestamp) => timestamp < end_bound_timestamp,
109            Bound::Unbounded => false,
110        }
111    }
112
113    fn filter_name_in_range(&self, name: &Atom) -> bool {
114        if let Some(ref expected_name) = self.name {
115            name != expected_name
116        } else {
117            false
118        }
119    }
120}
121
122impl Range for MemoryRange {
123    type Iter = VecIter<Result<Entry, Error>>;
124
125    fn count(&self) -> Result<u64, Error> {
126        let mut count: u64 = 0;
127        let internal = self.internal.lock().unwrap();
128        for ((timestamp, name), values) in internal.entries.range(self.full_start_bound()..) {
129            if self.done_iterating_in_range(*timestamp) {
130                break;
131            }
132            if self.filter_name_in_range(name) {
133                continue;
134            }
135            count += values.len() as u64;
136        }
137        Ok(count)
138    }
139
140    fn remove(self) -> Result<(), Error> {
141        let mut removeable_keys = Vec::default();
142        let mut internal = self.internal.lock().unwrap();
143        for ((timestamp, name), _values) in internal.entries.range(self.full_start_bound()..) {
144            if self.done_iterating_in_range(*timestamp) {
145                break;
146            }
147            if self.filter_name_in_range(name) {
148                continue;
149            }
150            removeable_keys.push((*timestamp, name.clone()));
151        }
152        for key in removeable_keys {
153            internal.entries.remove(&key);
154        }
155        Ok(())
156    }
157
158    fn iter(self) -> Result<Self::Iter, Error> {
159        let mut returnable_entries = Vec::default();
160        let internal = self.internal.lock().unwrap();
161        for ((timestamp, name), values) in internal.entries.range(self.full_start_bound()..) {
162            if self.done_iterating_in_range(*timestamp) {
163                break;
164            }
165            if self.filter_name_in_range(name) {
166                continue;
167            }
168            for value in values.iter() {
169                returnable_entries.push(Ok(Entry::new_with_timestamp(*timestamp, name.clone(), value.clone())));
170            }
171        }
172        Ok(returnable_entries.into_iter())
173    }
174}
175
176impl SubscribeableStore for MemoryStore {
177    type Subscription = MemoryStreamSubscription;
178    fn subscribe<A: Into<Atom>>(&self, name: A) -> Result<Self::Subscription, Error> {
179        let name = name.into();
180        let latest = self.latest(&name)?;
181        let subscription_internal = Arc::new(MemoryStreamSubscriptionInternal {
182            latest: Mutex::new(latest),
183            cvar: Condvar::new(),
184        });
185
186        let mut internal = self.0.lock().unwrap();
187        internal
188            .subscribers
189            .entry(name)
190            .or_insert_with(Vec::default)
191            .push(Arc::downgrade(&subscription_internal));
192
193        Ok(MemoryStreamSubscription {
194            internal: subscription_internal,
195            last_timestamp: None,
196        })
197    }
198}
199
/// Handle returned by `MemoryStore::subscribe`.
///
/// Cloning shares the underlying notification state (`internal`) but copies
/// `last_timestamp`, so each clone tracks its own delivery progress.
#[derive(Clone)]
pub struct MemoryStreamSubscription {
    internal: Arc<MemoryStreamSubscriptionInternal>,
    // Timestamp of the last entry handed out by `next`; None before the
    // first successful call.
    last_timestamp: Option<i64>,
}
205
206impl Subscription for MemoryStreamSubscription {
207    fn next(&mut self, timeout: Option<Duration>) -> Result<Option<Entry>, Error> {
208        let mut latest = self.internal.latest.lock().unwrap();
209
210        loop {
211            if let Some(latest) = &*latest {
212                if let Some(last_timestamp) = self.last_timestamp {
213                    if last_timestamp < latest.timestamp {
214                        self.last_timestamp = Some(latest.timestamp);
215                        return Ok(Some(latest.clone()));
216                    }
217                } else {
218                    self.last_timestamp = Some(latest.timestamp);
219                    return Ok(Some(latest.clone()));
220                }
221            }
222
223            if let Some(timeout) = timeout {
224                let result = self.internal.cvar.wait_timeout(latest, timeout).unwrap();
225                if result.1.timed_out() {
226                    return Ok(None);
227                }
228                latest = result.0;
229            } else {
230                latest = self.internal.cvar.wait(latest).unwrap();
231            }
232        }
233    }
234}
235
// Shared trait-conformance suites defined by the crate's test macros, run
// against a fresh in-memory store.
#[cfg(test)]
mod tests {
    use crate::{define_test, test_rangeable_store_impl, test_store_impl, test_subscribeable_store_impl, MemoryStore};
    test_store_impl!(MemoryStore::default());
    test_rangeable_store_impl!(MemoryStore::default());
    test_subscribeable_store_impl!(MemoryStore::default());
}
243
// Shared benchmark suites from the crate's macros, compiled only with the
// `benches` feature, run against a fresh in-memory store.
#[cfg(test)]
#[cfg(feature = "benches")]
mod benches {
    use crate::{bench_rangeable_store_impl, bench_store_impl, define_bench, MemoryStore};
    bench_store_impl!(MemoryStore::default());
    bench_rangeable_store_impl!(MemoryStore::default());
}