use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::ops::{Bound, RangeBounds};
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::time::Duration;
use std::vec::IntoIter as VecIter;
use crate::{utils, Entry, Error, Range, RangeableStore, Store, SubscribeableStore, Subscription};
use string_cache::DefaultAtom as Atom;
#[derive(Clone, Default)]
struct MemoryStoreInternal {
entries: BTreeMap<(i64, Atom), Vec<Vec<u8>>>,
subscribers: HashMap<Atom, Vec<Weak<MemoryStreamSubscriptionInternal>>>,
}
struct MemoryStreamSubscriptionInternal {
latest: Mutex<Option<Entry>>,
cvar: Condvar,
}
impl MemoryStreamSubscriptionInternal {
fn notify(&self, entry: Entry) {
let mut latest = self.latest.lock().unwrap();
*latest = Some(entry);
self.cvar.notify_all();
}
}
#[derive(Clone, Default)]
pub struct MemoryStore(Arc<Mutex<MemoryStoreInternal>>);
impl Store for MemoryStore {
fn push(&self, entry: Cow<Entry>) -> Result<(), Error> {
let mut internal = self.0.lock().unwrap();
internal
.entries
.entry((entry.timestamp, entry.name.clone()))
.or_insert_with(Vec::default)
.push(entry.value.clone());
if let Some(subscribers) = internal.subscribers.get_mut(&entry.name) {
let entry = entry.into_owned();
let mut new_subscribers = Vec::<Weak<MemoryStreamSubscriptionInternal>>::default();
for subscriber in subscribers.drain(..) {
if let Some(subscriber) = Weak::upgrade(&subscriber) {
subscriber.notify(entry.clone());
new_subscribers.push(Arc::downgrade(&subscriber));
}
}
*subscribers = new_subscribers;
}
Ok(())
}
fn latest<A: Into<Atom>>(&self, name: A) -> Result<Option<Entry>, Error> {
let name = name.into();
let internal = self.0.lock().unwrap();
for ((map_timestamp, map_name), map_values) in internal.entries.iter().rev() {
if map_name != &name {
continue;
}
if let Some(value) = map_values.last() {
return Ok(Some(Entry::new_with_timestamp(*map_timestamp, name, value.clone())));
}
}
Ok(None)
}
}
impl RangeableStore for MemoryStore {
type Range = MemoryRange;
fn range<A: Into<Atom>, R: RangeBounds<i64>>(&self, range: R, name: Option<A>) -> Result<Self::Range, Error> {
utils::check_bounds(range.start_bound(), range.end_bound())?;
Ok(Self::Range {
internal: self.0.clone(),
start_bound: range.start_bound().cloned(),
end_bound: range.end_bound().cloned(),
name: name.map(|n| n.into()),
})
}
}
pub struct MemoryRange {
internal: Arc<Mutex<MemoryStoreInternal>>,
start_bound: Bound<i64>,
end_bound: Bound<i64>,
name: Option<Atom>,
}
impl MemoryRange {
fn full_start_bound(&self) -> (i64, Atom) {
match self.start_bound {
Bound::Included(timestamp) => (timestamp, Atom::from("")),
Bound::Excluded(timestamp) => (timestamp + 1, Atom::from("")),
Bound::Unbounded => (i64::min_value(), Atom::from("")),
}
}
fn done_iterating_in_range(&self, timestamp: i64) -> bool {
match self.end_bound {
Bound::Included(end_bound_timestamp) => timestamp <= end_bound_timestamp,
Bound::Excluded(end_bound_timestamp) => timestamp < end_bound_timestamp,
Bound::Unbounded => false,
}
}
fn filter_name_in_range(&self, name: &Atom) -> bool {
if let Some(ref expected_name) = self.name {
name != expected_name
} else {
false
}
}
}
impl Range for MemoryRange {
type Iter = VecIter<Result<Entry, Error>>;
fn count(&self) -> Result<u64, Error> {
let mut count: u64 = 0;
let internal = self.internal.lock().unwrap();
for ((timestamp, name), values) in internal.entries.range(self.full_start_bound()..) {
if self.done_iterating_in_range(*timestamp) {
break;
}
if self.filter_name_in_range(name) {
continue;
}
count += values.len() as u64;
}
Ok(count)
}
fn remove(self) -> Result<(), Error> {
let mut removeable_keys = Vec::default();
let mut internal = self.internal.lock().unwrap();
for ((timestamp, name), _values) in internal.entries.range(self.full_start_bound()..) {
if self.done_iterating_in_range(*timestamp) {
break;
}
if self.filter_name_in_range(name) {
continue;
}
removeable_keys.push((*timestamp, name.clone()));
}
for key in removeable_keys {
internal.entries.remove(&key);
}
Ok(())
}
fn iter(self) -> Result<Self::Iter, Error> {
let mut returnable_entries = Vec::default();
let internal = self.internal.lock().unwrap();
for ((timestamp, name), values) in internal.entries.range(self.full_start_bound()..) {
if self.done_iterating_in_range(*timestamp) {
break;
}
if self.filter_name_in_range(name) {
continue;
}
for value in values.iter() {
returnable_entries.push(Ok(Entry::new_with_timestamp(*timestamp, name.clone(), value.clone())));
}
}
Ok(returnable_entries.into_iter())
}
}
impl SubscribeableStore for MemoryStore {
type Subscription = MemoryStreamSubscription;
fn subscribe<A: Into<Atom>>(&self, name: A) -> Result<Self::Subscription, Error> {
let name = name.into();
let latest = self.latest(&name)?;
let subscription_internal = Arc::new(MemoryStreamSubscriptionInternal {
latest: Mutex::new(latest),
cvar: Condvar::new(),
});
let mut internal = self.0.lock().unwrap();
internal
.subscribers
.entry(name)
.or_insert_with(Vec::default)
.push(Arc::downgrade(&subscription_internal));
Ok(MemoryStreamSubscription {
internal: subscription_internal,
last_timestamp: None,
})
}
}
#[derive(Clone)]
pub struct MemoryStreamSubscription {
internal: Arc<MemoryStreamSubscriptionInternal>,
last_timestamp: Option<i64>,
}
impl Subscription for MemoryStreamSubscription {
fn next(&mut self, timeout: Option<Duration>) -> Result<Option<Entry>, Error> {
let mut latest = self.internal.latest.lock().unwrap();
loop {
if let Some(latest) = &*latest {
if let Some(last_timestamp) = self.last_timestamp {
if last_timestamp < latest.timestamp {
self.last_timestamp = Some(latest.timestamp);
return Ok(Some(latest.clone()));
}
} else {
self.last_timestamp = Some(latest.timestamp);
return Ok(Some(latest.clone()));
}
}
if let Some(timeout) = timeout {
let result = self.internal.cvar.wait_timeout(latest, timeout).unwrap();
if result.1.timed_out() {
return Ok(None);
}
latest = result.0;
} else {
latest = self.internal.cvar.wait(latest).unwrap();
}
}
}
}
#[cfg(test)]
mod tests {
use crate::{define_test, test_rangeable_store_impl, test_store_impl, test_subscribeable_store_impl, MemoryStore};
test_store_impl!(MemoryStore::default());
test_rangeable_store_impl!(MemoryStore::default());
test_subscribeable_store_impl!(MemoryStore::default());
}
#[cfg(test)]
#[cfg(feature = "benches")]
mod benches {
use crate::{bench_rangeable_store_impl, bench_store_impl, define_bench, MemoryStore};
bench_store_impl!(MemoryStore::default());
bench_rangeable_store_impl!(MemoryStore::default());
}