use crate::conflict::Conflict;
use crate::sized_queue::SizedQueue;
use crate::sized_queue_drain::SizedQueueDrain;
use crate::suggestion::Suggestion;
use crate::versioned::{Versioned, VersionedStatus};
use ckey::CKey;
use hashlru::Cache;
use std::cmp::Ordering::Greater;
use std::hash::Hash;
use std::iter::Iterator;
use std::sync::{Arc, RwLock};
use vclock::VClock;
/// In-memory store of versioned items, with side queues for conflicts and
/// pending suggestions.
///
/// Every field is wrapped in `Arc<RwLock<..>>`, and all methods take `&self`
/// and lock the field they need.
#[derive(Debug)]
pub struct MemoryStore<C, V>
where
C: Eq + Hash + Clone,
V: Clone,
{
/// Bounded cache mapping each key to its latest accepted versioned entry.
/// An entry whose value is `None` records a kill.
items: Arc<RwLock<Cache<CKey, Versioned<C, V>>>>,
/// Bounded queue of conflicts awaiting retrieval via `pop_conflict`,
/// `drain_conflicts` or `freeze`.
conflicts: Arc<RwLock<SizedQueue<Conflict<C, V>>>>,
/// Bounded queue of suggestions parked while the mode is `SuggestionMode::Queue`.
suggestions: Arc<RwLock<SizedQueue<Suggestion<C, V>>>>,
/// Current policy applied by `suggest_versioned`.
suggestion_mode: Arc<RwLock<SuggestionMode>>,
}
/// Capacity given to the item cache by `MemoryStore::new`.
const MEMORY_STORE_ITEMS_SIZE: usize = 10_000;
/// Capacity given to the conflicts queue by `MemoryStore::new`.
const MEMORY_STORE_CONFLICTS_SIZE: usize = 10_000;
/// Capacity given to the suggestions queue by `MemoryStore::new`.
const MEMORY_STORE_SUGGESTIONS_SIZE: usize = 10_000;
/// Policy applied by `MemoryStore::suggest_versioned` to incoming suggestions.
///
/// Derives `PartialEq`/`Eq` so callers can compare the value returned by
/// `get_suggestion_mode` directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SuggestionMode {
    /// Apply the suggestion to the items right away; a resulting conflict is
    /// pushed onto the conflicts queue.
    Assign,
    /// Park the suggestion in the suggestions queue without touching the items.
    Queue,
    /// Discard the suggestion.
    Drop,
}
impl<C, V> MemoryStore<C, V>
where
C: Eq + Hash + Clone,
V: Clone,
{
pub fn new() -> Self {
MemoryStore::<C, V> {
items: Arc::new(RwLock::new(Cache::new(MEMORY_STORE_ITEMS_SIZE))),
conflicts: Arc::new(RwLock::new(SizedQueue::new(MEMORY_STORE_CONFLICTS_SIZE))),
suggestions: Arc::new(RwLock::new(SizedQueue::new(MEMORY_STORE_SUGGESTIONS_SIZE))),
suggestion_mode: Arc::new(RwLock::new(SuggestionMode::Assign)),
}
}
/// Returns the number of entries currently held in the item cache.
///
/// Kill entries (value `None`) are stored in the same cache and are counted.
/// Panics if the items lock is poisoned.
pub fn items_len(&self) -> usize {
    let cache = self.items.read().unwrap();
    cache.len()
}
/// Returns the capacity reported by the item cache.
///
/// Panics if the items lock is poisoned.
pub fn items_capacity(&self) -> usize {
    let cache = self.items.read().unwrap();
    cache.capacity()
}
/// Resizes the item cache to `size` and returns whatever the cache's
/// `resize` reports.
///
/// NOTE(review): presumably the number of entries dropped by the resize —
/// confirm against the `hashlru` documentation.
/// Panics if the items lock is poisoned.
pub fn resize_items(&self, size: usize) -> usize {
    let mut cache = self.items.write().unwrap();
    cache.resize(size)
}
/// Returns the number of conflicts currently waiting in the conflicts queue.
///
/// Panics if the conflicts lock is poisoned.
pub fn conflicts_len(&self) -> usize {
    let queue = self.conflicts.read().unwrap();
    queue.len()
}
/// Returns the capacity reported by the conflicts queue.
///
/// Panics if the conflicts lock is poisoned.
pub fn conflicts_capacity(&self) -> usize {
    let queue = self.conflicts.read().unwrap();
    queue.capacity()
}
/// Resizes the conflicts queue to `size` and returns whatever the queue's
/// `resize` reports.
///
/// NOTE(review): the meaning of the returned count is defined by
/// `SizedQueue::resize` — confirm there.
/// Panics if the conflicts lock is poisoned.
pub fn resize_conflicts(&self, size: usize) -> usize {
    let mut queue = self.conflicts.write().unwrap();
    queue.resize(size)
}
/// Replaces the current suggestion mode with `mode`.
///
/// Only the mode is written; suggestions already queued are left untouched.
/// Panics if the suggestion-mode lock is poisoned.
pub fn set_suggestion_mode(&self, mode: SuggestionMode) {
    let mut current = self.suggestion_mode.write().unwrap();
    *current = mode;
}
/// Returns a copy of the current suggestion mode.
///
/// Panics if the suggestion-mode lock is poisoned.
pub fn get_suggestion_mode(&self) -> SuggestionMode {
    let current = self.suggestion_mode.read().unwrap();
    *current
}
/// Returns the number of suggestions currently waiting in the suggestions queue.
///
/// Panics if the suggestions lock is poisoned.
pub fn suggestions_len(&self) -> usize {
    // Must read the suggestions queue; reading `items` here would report the
    // item count instead.
    self.suggestions.read().unwrap().len()
}
/// Returns the capacity reported by the suggestions queue.
///
/// Panics if the suggestions lock is poisoned.
pub fn suggestions_capacity(&self) -> usize {
    let queue = self.suggestions.read().unwrap();
    queue.capacity()
}
/// Resizes the suggestions queue to `size` and returns whatever the queue's
/// `resize` reports.
///
/// NOTE(review): the meaning of the returned count is defined by
/// `SizedQueue::resize` — confirm there.
/// Panics if the suggestions lock is poisoned.
pub fn resize_suggestions(&self, size: usize) -> usize {
    let mut queue = self.suggestions.write().unwrap();
    queue.resize(size)
}
/// Stores `versioned` under `key` if it supersedes what is already there.
///
/// The incoming version is compared to the stored one with `partial_cmp`:
///
/// * no stored entry, or incoming strictly greater: the entry is inserted
///   and `None` is returned;
/// * incoming less than or equal: the stored entry is kept, `None` is returned;
/// * versions are not comparable: the store is left unchanged and a
///   `Conflict` is returned, with `left` a clone of the stored entry and
///   `right` the incoming one.
///
/// The returned conflict is not queued here; that is up to the caller.
/// Panics if the items lock is poisoned.
pub fn assign_versioned(
    &self,
    key: CKey,
    versioned: Versioned<C, V>,
) -> Option<Conflict<C, V>> {
    // The lookup goes through `get`, which is called on the write guard.
    let mut cache = self.items.write().unwrap();
    if let Some(current) = cache.get(&key) {
        match versioned.version.partial_cmp(&current.version) {
            // Incomparable versions: report instead of picking a winner.
            None => {
                return Some(Conflict {
                    key,
                    left: current.clone(),
                    right: versioned,
                });
            }
            // Strictly newer: fall through to the insert below.
            Some(Greater) => {}
            // Older or identical version: keep what is stored.
            Some(_) => return None,
        }
    }
    cache.insert(key, versioned);
    None
}
/// Assigns `value` at `version` to `key`.
///
/// Wraps the value in a `Versioned` with `Some(value)` and forwards to
/// `assign_versioned`; returns the conflict it reports, if any.
pub fn assign_value(&self, key: CKey, version: VClock<C>, value: V) -> Option<Conflict<C, V>> {
    let entry = Versioned {
        version,
        value: Some(value),
    };
    self.assign_versioned(key, entry)
}
/// Records a kill of `key` at `version`.
///
/// A kill is a `Versioned` whose value is `None`; it is forwarded to
/// `assign_versioned`, and the conflict it reports, if any, is returned.
pub fn assign_kill(&self, key: CKey, version: VClock<C>) -> Option<Conflict<C, V>> {
    let tombstone = Versioned {
        version,
        value: None,
    };
    self.assign_versioned(key, tombstone)
}
/// Handles a suggested entry according to the current `SuggestionMode`.
///
/// * `Assign`: the entry goes through `assign_versioned`; a resulting
///   conflict is pushed onto the conflicts queue.
/// * `Queue`: the entry is pushed onto the suggestions queue.
/// * `Drop`: the entry is discarded.
///
/// Panics if any of the locks involved is poisoned.
pub fn suggest_versioned(&self, key: CKey, versioned: Versioned<C, V>) {
    // The read guard on the mode is kept until the function returns, so a
    // writer (e.g. `freeze`/`unfreeze`) cannot switch the mode mid-way.
    let mode = self.suggestion_mode.read().unwrap();
    match *mode {
        SuggestionMode::Drop => {}
        SuggestionMode::Queue => {
            let mut pending = self.suggestions.write().unwrap();
            pending.push(Suggestion { key, versioned });
        }
        SuggestionMode::Assign => {
            let outcome = self.assign_versioned(key, versioned);
            if let Some(conflict) = outcome {
                self.push_conflict(conflict);
            }
        }
    }
}
/// Suggests `value` at `version` for `key`.
///
/// Wraps the value in a `Versioned` with `Some(value)` and forwards to
/// `suggest_versioned`.
pub fn suggest_value(&self, key: CKey, version: VClock<C>, value: V) {
    let entry = Versioned {
        version,
        value: Some(value),
    };
    self.suggest_versioned(key, entry);
}
/// Suggests a kill of `key` at `version`.
///
/// Builds a `Versioned` whose value is `None` and forwards to
/// `suggest_versioned`.
pub fn suggest_kill(&self, key: CKey, version: VClock<C>) {
    let tombstone = Versioned {
        version,
        value: None,
    };
    self.suggest_versioned(key, tombstone);
}
/// Returns a clone of the versioned entry stored under `key`, if any.
///
/// Kill entries are returned too (their `value` is `None`).
/// The lookup uses the cache's `get` on a write guard, unlike the
/// `peek`-based queries that only take the read lock.
/// Panics if the items lock is poisoned.
pub fn get_versioned(&self, key: &CKey) -> Option<Versioned<C, V>> {
    let mut cache = self.items.write().unwrap();
    cache.get(key).cloned()
}
/// Returns a clone of the value stored under `key`.
///
/// Yields `None` both when the key is absent and when the stored entry is a
/// kill (its value is `None`).
/// The lookup uses the cache's `get` on a write guard.
/// Panics if the items lock is poisoned.
pub fn get_value(&self, key: &CKey) -> Option<V> {
    let mut cache = self.items.write().unwrap();
    cache.get(key).and_then(|entry| entry.value.clone())
}
/// Classifies `key` without going through the cache's `get`.
///
/// Returns `NotExist` when the key is absent, `WasKilled` when the stored
/// entry has no value, and `HasValue` otherwise.
/// Panics if the items lock is poisoned.
pub fn get_status(&self, key: &CKey) -> VersionedStatus {
    let cache = self.items.read().unwrap();
    match cache.peek(key) {
        None => VersionedStatus::NotExist,
        Some(entry) if entry.value.is_some() => VersionedStatus::HasValue,
        Some(_) => VersionedStatus::WasKilled,
    }
}
/// Returns `true` when the cache holds any entry for `key`, whether it is a
/// value or a kill.
///
/// Panics if the items lock is poisoned.
pub fn has_versioned(&self, key: &CKey) -> bool {
    let cache = self.items.read().unwrap();
    cache.contains_key(key)
}
/// Returns `true` when `key` is present and its entry carries a value.
///
/// Absent keys and kill entries both yield `false`.
/// Panics if the items lock is poisoned.
pub fn has_value(&self, key: &CKey) -> bool {
    let cache = self.items.read().unwrap();
    matches!(cache.peek(key), Some(entry) if entry.value.is_some())
}
/// Returns `true` when `key` is present and its entry is a kill (no value).
///
/// Absent keys yield `false`.
/// Panics if the items lock is poisoned.
pub fn is_killed(&self, key: &CKey) -> bool {
    let cache = self.items.read().unwrap();
    cache
        .peek(key)
        .map_or(false, |entry| entry.value.is_none())
}
/// Appends `conflict` to the conflicts queue.
///
/// Panics if the conflicts lock is poisoned.
pub fn push_conflict(&self, conflict: Conflict<C, V>) {
    let mut queue = self.conflicts.write().unwrap();
    queue.push(conflict);
}
/// Removes one conflict from the conflicts queue and returns it, or returns
/// `None` when the queue's `pop` yields nothing.
///
/// Panics if the conflicts lock is poisoned.
pub fn pop_conflict(&self) -> Option<Conflict<C, V>> {
    let mut queue = self.conflicts.write().unwrap();
    queue.pop()
}
pub fn iter_items(&self) -> MemoryStoreItemsIterator<C, V> {
MemoryStoreItemsIterator::new(self.items.clone())
}
/// Builds a `SizedQueueDrain` from the conflicts queue while holding its
/// write lock.
///
/// The returned drain carries no lifetime, so it does not keep the lock.
/// NOTE(review): presumably this empties the queue and hands the conflicts
/// to the drain — confirm in `SizedQueueDrain::new`.
/// Panics if the conflicts lock is poisoned.
pub fn drain_conflicts(&self) -> SizedQueueDrain<Conflict<C, V>> {
    let mut queue = self.conflicts.write().unwrap();
    SizedQueueDrain::new(&mut queue)
}
/// Builds a `SizedQueueDrain` from the suggestions queue while holding its
/// write lock.
///
/// The returned drain carries no lifetime, so it does not keep the lock.
/// NOTE(review): presumably this empties the queue and hands the
/// suggestions to the drain — confirm in `SizedQueueDrain::new`.
/// Panics if the suggestions lock is poisoned.
pub fn drain_suggestions(&self) -> SizedQueueDrain<Suggestion<C, V>> {
    let mut queue = self.suggestions.write().unwrap();
    SizedQueueDrain::new(&mut queue)
}
/// Drains the pending conflicts and switches the store to
/// `SuggestionMode::Queue`.
///
/// The write lock on the suggestion mode is taken first and held across
/// both steps, so `suggest_versioned` (which holds the read lock while it
/// works) cannot run in between. After this call, suggestions are queued
/// rather than applied until the mode changes again.
/// Panics if a lock is poisoned.
pub fn freeze(&self) -> SizedQueueDrain<Conflict<C, V>> {
    let mut mode = self.suggestion_mode.write().unwrap();
    let drained = self.drain_conflicts();
    *mode = SuggestionMode::Queue;
    // `mode` is released when the function returns.
    drained
}
/// Switches the store back to `SuggestionMode::Assign` and replays the
/// queued suggestions.
///
/// The write lock on the suggestion mode is held while the suggestions
/// queue is drained and the mode is switched, then released before the
/// replay. Each drained suggestion goes through `assign_versioned`; a
/// conflict it reports is pushed onto the conflicts queue, exactly as
/// `suggest_versioned` does in `Assign` mode, so it is not lost.
///
/// Returns the number of suggestions replayed, whether or not each one
/// ended up modifying the store.
/// Panics if a lock is poisoned.
pub fn unfreeze(&self) -> usize {
    let pending = {
        let mut mode = self.suggestion_mode.write().unwrap();
        let pending = self.drain_suggestions();
        *mode = SuggestionMode::Assign;
        pending
    };
    let mut replayed = 0;
    for suggestion in pending {
        if let Some(conflict) = self.assign_versioned(suggestion.key, suggestion.versioned) {
            self.push_conflict(conflict);
        }
        replayed += 1;
    }
    replayed
}
}
/// Iterator over the items of a `MemoryStore`, yielding clones of entries
/// that carry a value.
///
/// It works from a list of keys captured at construction and takes the read
/// lock on the shared cache for each lookup.
pub struct MemoryStoreItemsIterator<C, V>
where
C: Eq + Hash + Clone,
V: Clone,
{
/// Index into `keys` of the next key to examine.
pos: usize,
/// Keys that were in the cache when the iterator was created.
keys: Vec<CKey>,
/// Cache shared with the originating store.
items: Arc<RwLock<Cache<CKey, Versioned<C, V>>>>,
}
impl<C, V> MemoryStoreItemsIterator<C, V>
where
    C: Eq + Hash + Clone,
    V: Clone,
{
    /// Creates an iterator positioned at the start of a snapshot of the keys
    /// currently in `items`.
    ///
    /// Only the keys are copied; the entries are read later, during
    /// iteration. Panics if the lock is poisoned.
    fn new(items: Arc<RwLock<Cache<CKey, Versioned<C, V>>>>) -> Self {
        let guard = items.read().unwrap();
        let keys: Vec<CKey> = guard.iter().map(|entry| *entry.0).collect();
        // Release the read lock before `items` is moved into the iterator.
        drop(guard);
        Self {
            pos: 0,
            keys,
            items,
        }
    }
}
impl<C, V> Iterator for MemoryStoreItemsIterator<C, V>
where
    C: Eq + Hash + Clone,
    V: Clone,
{
    type Item = Versioned<C, V>;

    /// Returns a clone of the next entry that still exists and carries a
    /// value, or `None` once every captured key has been examined.
    ///
    /// Keys whose entry has disappeared from the cache, or whose entry has
    /// no value, are skipped. The read lock is taken per lookup, not for the
    /// whole iteration. Panics if the lock is poisoned.
    fn next(&mut self) -> Option<Versioned<C, V>> {
        while self.pos < self.keys.len() {
            // Advance before the lookup so that returning an item also moves
            // past it; otherwise the same entry would be yielded forever.
            let index = self.pos;
            self.pos += 1;
            let cache = self.items.read().unwrap();
            if let Some(versioned) = cache.peek(&self.keys[index]) {
                if versioned.value.is_some() {
                    return Some(versioned.clone());
                }
            }
        }
        None
    }
}