1use std::borrow::Cow;
2use std::collections::{BTreeMap, HashMap};
3use std::ops::{Bound, RangeBounds};
4use std::sync::{Arc, Condvar, Mutex, Weak};
5use std::time::Duration;
6use std::vec::IntoIter as VecIter;
7
8use crate::{utils, Entry, Error, Range, RangeableStore, Store, SubscribeableStore, Subscription};
9
10use string_cache::DefaultAtom as Atom;
11
12#[derive(Clone, Default)]
13struct MemoryStoreInternal {
14 entries: BTreeMap<(i64, Atom), Vec<Vec<u8>>>,
15 subscribers: HashMap<Atom, Vec<Weak<MemoryStreamSubscriptionInternal>>>,
16}
17
18struct MemoryStreamSubscriptionInternal {
19 latest: Mutex<Option<Entry>>,
20 cvar: Condvar,
21}
22
23impl MemoryStreamSubscriptionInternal {
24 fn notify(&self, entry: Entry) {
25 let mut latest = self.latest.lock().unwrap();
26 *latest = Some(entry);
27 self.cvar.notify_all();
28 }
29}
30
31#[derive(Clone, Default)]
32pub struct MemoryStore(Arc<Mutex<MemoryStoreInternal>>);
33
34impl Store for MemoryStore {
35 fn push(&self, entry: Cow<Entry>) -> Result<(), Error> {
36 let mut internal = self.0.lock().unwrap();
37
38 internal
39 .entries
40 .entry((entry.timestamp, entry.name.clone()))
41 .or_insert_with(Vec::default)
42 .push(entry.value.clone());
43
44 if let Some(subscribers) = internal.subscribers.get_mut(&entry.name) {
45 let entry = entry.into_owned();
46 let mut new_subscribers = Vec::<Weak<MemoryStreamSubscriptionInternal>>::default();
47 for subscriber in subscribers.drain(..) {
48 if let Some(subscriber) = Weak::upgrade(&subscriber) {
49 subscriber.notify(entry.clone());
50 new_subscribers.push(Arc::downgrade(&subscriber));
51 }
52 }
53 *subscribers = new_subscribers;
54 }
55
56 Ok(())
57 }
58
59 fn latest<A: Into<Atom>>(&self, name: A) -> Result<Option<Entry>, Error> {
60 let name = name.into();
61 let internal = self.0.lock().unwrap();
62 for ((map_timestamp, map_name), map_values) in internal.entries.iter().rev() {
63 if map_name != &name {
64 continue;
65 }
66 if let Some(value) = map_values.last() {
67 return Ok(Some(Entry::new_with_timestamp(*map_timestamp, name, value.clone())));
68 }
69 }
70
71 Ok(None)
72 }
73}
74
75impl RangeableStore for MemoryStore {
76 type Range = MemoryRange;
77
78 fn range<A: Into<Atom>, R: RangeBounds<i64>>(&self, range: R, name: Option<A>) -> Result<Self::Range, Error> {
79 utils::check_bounds(range.start_bound(), range.end_bound())?;
80 Ok(Self::Range {
81 internal: self.0.clone(),
82 start_bound: range.start_bound().cloned(),
83 end_bound: range.end_bound().cloned(),
84 name: name.map(|n| n.into()),
85 })
86 }
87}
88
89pub struct MemoryRange {
90 internal: Arc<Mutex<MemoryStoreInternal>>,
91 start_bound: Bound<i64>,
92 end_bound: Bound<i64>,
93 name: Option<Atom>,
94}
95
96impl MemoryRange {
97 fn full_start_bound(&self) -> (i64, Atom) {
98 match self.start_bound {
99 Bound::Included(timestamp) => (timestamp, Atom::from("")),
100 Bound::Excluded(timestamp) => (timestamp + 1, Atom::from("")),
101 Bound::Unbounded => (i64::min_value(), Atom::from("")),
102 }
103 }
104
105 fn done_iterating_in_range(&self, timestamp: i64) -> bool {
106 match self.end_bound {
107 Bound::Included(end_bound_timestamp) => timestamp <= end_bound_timestamp,
108 Bound::Excluded(end_bound_timestamp) => timestamp < end_bound_timestamp,
109 Bound::Unbounded => false,
110 }
111 }
112
113 fn filter_name_in_range(&self, name: &Atom) -> bool {
114 if let Some(ref expected_name) = self.name {
115 name != expected_name
116 } else {
117 false
118 }
119 }
120}
121
122impl Range for MemoryRange {
123 type Iter = VecIter<Result<Entry, Error>>;
124
125 fn count(&self) -> Result<u64, Error> {
126 let mut count: u64 = 0;
127 let internal = self.internal.lock().unwrap();
128 for ((timestamp, name), values) in internal.entries.range(self.full_start_bound()..) {
129 if self.done_iterating_in_range(*timestamp) {
130 break;
131 }
132 if self.filter_name_in_range(name) {
133 continue;
134 }
135 count += values.len() as u64;
136 }
137 Ok(count)
138 }
139
140 fn remove(self) -> Result<(), Error> {
141 let mut removeable_keys = Vec::default();
142 let mut internal = self.internal.lock().unwrap();
143 for ((timestamp, name), _values) in internal.entries.range(self.full_start_bound()..) {
144 if self.done_iterating_in_range(*timestamp) {
145 break;
146 }
147 if self.filter_name_in_range(name) {
148 continue;
149 }
150 removeable_keys.push((*timestamp, name.clone()));
151 }
152 for key in removeable_keys {
153 internal.entries.remove(&key);
154 }
155 Ok(())
156 }
157
158 fn iter(self) -> Result<Self::Iter, Error> {
159 let mut returnable_entries = Vec::default();
160 let internal = self.internal.lock().unwrap();
161 for ((timestamp, name), values) in internal.entries.range(self.full_start_bound()..) {
162 if self.done_iterating_in_range(*timestamp) {
163 break;
164 }
165 if self.filter_name_in_range(name) {
166 continue;
167 }
168 for value in values.iter() {
169 returnable_entries.push(Ok(Entry::new_with_timestamp(*timestamp, name.clone(), value.clone())));
170 }
171 }
172 Ok(returnable_entries.into_iter())
173 }
174}
175
176impl SubscribeableStore for MemoryStore {
177 type Subscription = MemoryStreamSubscription;
178 fn subscribe<A: Into<Atom>>(&self, name: A) -> Result<Self::Subscription, Error> {
179 let name = name.into();
180 let latest = self.latest(&name)?;
181 let subscription_internal = Arc::new(MemoryStreamSubscriptionInternal {
182 latest: Mutex::new(latest),
183 cvar: Condvar::new(),
184 });
185
186 let mut internal = self.0.lock().unwrap();
187 internal
188 .subscribers
189 .entry(name)
190 .or_insert_with(Vec::default)
191 .push(Arc::downgrade(&subscription_internal));
192
193 Ok(MemoryStreamSubscription {
194 internal: subscription_internal,
195 last_timestamp: None,
196 })
197 }
198}
199
200#[derive(Clone)]
201pub struct MemoryStreamSubscription {
202 internal: Arc<MemoryStreamSubscriptionInternal>,
203 last_timestamp: Option<i64>,
204}
205
206impl Subscription for MemoryStreamSubscription {
207 fn next(&mut self, timeout: Option<Duration>) -> Result<Option<Entry>, Error> {
208 let mut latest = self.internal.latest.lock().unwrap();
209
210 loop {
211 if let Some(latest) = &*latest {
212 if let Some(last_timestamp) = self.last_timestamp {
213 if last_timestamp < latest.timestamp {
214 self.last_timestamp = Some(latest.timestamp);
215 return Ok(Some(latest.clone()));
216 }
217 } else {
218 self.last_timestamp = Some(latest.timestamp);
219 return Ok(Some(latest.clone()));
220 }
221 }
222
223 if let Some(timeout) = timeout {
224 let result = self.internal.cvar.wait_timeout(latest, timeout).unwrap();
225 if result.1.timed_out() {
226 return Ok(None);
227 }
228 latest = result.0;
229 } else {
230 latest = self.internal.cvar.wait(latest).unwrap();
231 }
232 }
233 }
234}
235
// Instantiates the crate's `test_*_store_impl!` macros with a default `MemoryStore`.
#[cfg(test)]
mod tests {
    use crate::MemoryStore;
    use crate::{define_test, test_rangeable_store_impl, test_store_impl, test_subscribeable_store_impl};

    test_store_impl!(MemoryStore::default());
    test_rangeable_store_impl!(MemoryStore::default());
    test_subscribeable_store_impl!(MemoryStore::default());
}
243
// Instantiates the crate's `bench_*_store_impl!` macros with a default `MemoryStore`.
// Only compiled for test builds that enable the `benches` feature.
#[cfg(all(test, feature = "benches"))]
mod benches {
    use crate::MemoryStore;
    use crate::{bench_rangeable_store_impl, bench_store_impl, define_bench};

    bench_store_impl!(MemoryStore::default());
    bench_rangeable_store_impl!(MemoryStore::default());
}