confitul 0.1.4

ConfitUL contains utilities for ConfitDB, an experimental, distributed, real-time database that gives full control over conflict resolution.
Documentation
use crate::conflict::Conflict;
use crate::sized_queue::SizedQueue;
use crate::sized_queue_drain::SizedQueueDrain;
use crate::suggestion::Suggestion;
use crate::versioned::{Versioned, VersionedStatus};
use ckey::CKey;
use hashlru::Cache;
use std::cmp::Ordering::Greater;
use std::hash::Hash;
use std::iter::Iterator;
use std::sync::{Arc, RwLock};
use vclock::VClock;

/// In-memory store of versioned items, together with bounded queues of
/// pending conflicts and pending suggestions.
///
/// Every field is wrapped in `Arc<RwLock<..>>` and all methods take `&self`;
/// each accessor acquires the lock(s) it needs for the duration of the call.
#[derive(Debug)]
pub struct MemoryStore<C, V>
where
    C: Eq + Hash + Clone,
    V: Clone,
{
    /// Versioned items indexed by key, held in a `hashlru::Cache`
    /// (presumably with LRU eviction, per the crate name -- confirm).
    items: Arc<RwLock<Cache<CKey, Versioned<C, V>>>>,
    /// Conflicts recorded by `push_conflict`, waiting to be popped or drained.
    conflicts: Arc<RwLock<SizedQueue<Conflict<C, V>>>>,
    /// Suggestions received while the mode is `SuggestionMode::Queue`.
    suggestions: Arc<RwLock<SizedQueue<Suggestion<C, V>>>>,
    /// Current policy applied by `suggest_versioned` to incoming suggestions.
    suggestion_mode: Arc<RwLock<SuggestionMode>>,
}

/// Size given to the items cache by `MemoryStore::new`.
const MEMORY_STORE_ITEMS_SIZE: usize = 10000; // [TODO] make this a param
/// Size given to the conflicts queue by `MemoryStore::new`.
const MEMORY_STORE_CONFLICTS_SIZE: usize = 10000; // [TODO] make this a param
/// Size given to the suggestions queue by `MemoryStore::new`.
const MEMORY_STORE_SUGGESTIONS_SIZE: usize = 10000; // [TODO] make this a param

/// Policy applied by `MemoryStore::suggest_versioned` to an incoming suggestion.
#[derive(Debug, Clone, Copy)]
pub enum SuggestionMode {
    /// Assign the suggestion right away; a resulting conflict is pushed to
    /// the conflicts queue.
    Assign,
    /// Store the suggestion in the suggestions queue for later processing.
    Queue,
    /// Ignore the suggestion.
    Drop,
}

impl<C, V> MemoryStore<C, V>
where
    C: Eq + Hash + Clone,
    V: Clone,
{
    /// Creates an empty store using the default sizes for items, conflicts
    /// and suggestions. The initial suggestion mode is `SuggestionMode::Assign`.
    pub fn new() -> Self {
        MemoryStore::<C, V> {
            items: Arc::new(RwLock::new(Cache::new(MEMORY_STORE_ITEMS_SIZE))),
            conflicts: Arc::new(RwLock::new(SizedQueue::new(MEMORY_STORE_CONFLICTS_SIZE))),
            suggestions: Arc::new(RwLock::new(SizedQueue::new(MEMORY_STORE_SUGGESTIONS_SIZE))),
            suggestion_mode: Arc::new(RwLock::new(SuggestionMode::Assign)),
        }
    }

    /// Returns the number of entries currently held in the items cache.
    ///
    /// Killed entries (those whose value is `None`) are entries too.
    pub fn items_len(&self) -> usize {
        self.items.read().unwrap().len()
    }

    /// Returns the capacity reported by the items cache.
    pub fn items_capacity(&self) -> usize {
        self.items.read().unwrap().capacity()
    }

    /// Resizes the items cache, returning whatever the cache's `resize` reports.
    pub fn resize_items(&self, size: usize) -> usize {
        self.items.write().unwrap().resize(size)
    }

    /// Returns the number of conflicts currently queued.
    pub fn conflicts_len(&self) -> usize {
        self.conflicts.read().unwrap().len()
    }

    /// Returns the capacity reported by the conflicts queue.
    pub fn conflicts_capacity(&self) -> usize {
        self.conflicts.read().unwrap().capacity()
    }

    /// Resizes the conflicts queue, returning whatever the queue's `resize` reports.
    pub fn resize_conflicts(&self, size: usize) -> usize {
        self.conflicts.write().unwrap().resize(size)
    }

    /// Sets the policy applied to subsequent suggestions.
    pub fn set_suggestion_mode(&self, mode: SuggestionMode) {
        *self.suggestion_mode.write().unwrap() = mode;
    }

    /// Returns a copy of the current suggestion policy.
    pub fn get_suggestion_mode(&self) -> SuggestionMode {
        *self.suggestion_mode.read().unwrap()
    }

    /// Returns the number of suggestions currently queued.
    pub fn suggestions_len(&self) -> usize {
        // Must query the suggestions queue, not the items cache.
        self.suggestions.read().unwrap().len()
    }

    /// Returns the capacity reported by the suggestions queue.
    pub fn suggestions_capacity(&self) -> usize {
        self.suggestions.read().unwrap().capacity()
    }

    /// Resizes the suggestions queue, returning whatever the queue's `resize` reports.
    pub fn resize_suggestions(&self, size: usize) -> usize {
        self.suggestions.write().unwrap().resize(size)
    }

    /// Tries to store `versioned` under `key`.
    ///
    /// * No entry for `key`: the new entry is stored.
    /// * Existing entry with a strictly older version: it is replaced.
    /// * Existing entry with an equal or newer version: nothing changes.
    /// * Versions are not comparable (`partial_cmp` yields `None`): nothing
    ///   changes and a `Conflict` holding the stored entry (`left`) and the
    ///   rejected one (`right`) is returned.
    ///
    /// The returned conflict is *not* queued; the caller decides what to do
    /// with it.
    pub fn assign_versioned(
        &self,
        key: CKey,
        versioned: Versioned<C, V>,
    ) -> Option<Conflict<C, V>> {
        let mut items = self.items.write().unwrap();
        match items.get(&key) {
            Some(old) => match versioned.version.partial_cmp(&old.version) {
                Some(Greater) => {
                    items.insert(key, versioned);
                }
                // Equal or older: the stored entry wins.
                Some(_) => (),
                None => {
                    return Some(Conflict {
                        key,
                        left: old.clone(),
                        right: versioned,
                    })
                }
            },
            None => {
                items.insert(key, versioned);
            }
        }
        None
    }

    /// Shorthand for `assign_versioned` with an entry carrying `value`.
    pub fn assign_value(&self, key: CKey, version: VClock<C>, value: V) -> Option<Conflict<C, V>> {
        self.assign_versioned(
            key,
            Versioned {
                version,
                value: Some(value),
            },
        )
    }

    /// Shorthand for `assign_versioned` with a "killed" entry (no value).
    pub fn assign_kill(&self, key: CKey, version: VClock<C>) -> Option<Conflict<C, V>> {
        self.assign_versioned(
            key,
            Versioned {
                value: None,
                version,
            },
        )
    }

    /// Handles a suggestion according to the current `SuggestionMode`.
    ///
    /// * `Assign`: the entry is assigned immediately and any resulting
    ///   conflict is pushed to the conflicts queue.
    /// * `Queue`: the suggestion is pushed to the suggestions queue.
    /// * `Drop`: the suggestion is ignored.
    ///
    /// The read lock on the mode is held for the whole call, so `freeze` and
    /// `unfreeze` (which take the write lock) cannot switch the mode while a
    /// suggestion is being handled.
    pub fn suggest_versioned(&self, key: CKey, versioned: Versioned<C, V>) {
        let suggestion_mode = self.suggestion_mode.read().unwrap();
        match *suggestion_mode {
            SuggestionMode::Assign => {
                if let Some(conflict) = self.assign_versioned(key, versioned) {
                    self.push_conflict(conflict);
                }
            }
            SuggestionMode::Queue => {
                self.suggestions
                    .write()
                    .unwrap()
                    .push(Suggestion { key, versioned });
            }
            SuggestionMode::Drop => (),
        }
        drop(suggestion_mode);
    }

    /// Shorthand for `suggest_versioned` with an entry carrying `value`.
    pub fn suggest_value(&self, key: CKey, version: VClock<C>, value: V) {
        self.suggest_versioned(
            key,
            Versioned {
                version,
                value: Some(value),
            },
        );
    }

    /// Shorthand for `suggest_versioned` with a "killed" entry (no value).
    pub fn suggest_kill(&self, key: CKey, version: VClock<C>) {
        self.suggest_versioned(
            key,
            Versioned {
                value: None,
                version,
            },
        );
    }

    /// Returns a clone of the entry stored under `key`, if any.
    ///
    /// Entries are cloned on purpose: callers get a copy they can use freely
    /// outside the store, and modifications have to go through the
    /// assign/suggest API so that conflicts are detected.
    ///
    /// The write lock is taken because the lookup uses the cache's `get`
    /// rather than `peek` (presumably `get` refreshes the entry's usage
    /// rank -- confirm against the cache documentation).
    pub fn get_versioned(&self, key: &CKey) -> Option<Versioned<C, V>> {
        let mut items = self.items.write().unwrap();

        items.get(key).cloned()
    }

    /// Returns a clone of the value stored under `key`.
    ///
    /// Yields `None` both when the key is unknown and when the entry was
    /// killed; use `get_status` to tell these apart. Like `get_versioned`,
    /// this goes through the cache's `get` and therefore takes the write lock.
    pub fn get_value(&self, key: &CKey) -> Option<V> {
        let mut items = self.items.write().unwrap();

        items.get(key).and_then(|v| v.value.clone())
    }

    /// Reports whether `key` holds a value, was killed, or is unknown.
    ///
    /// Uses `peek` under a read lock.
    pub fn get_status(&self, key: &CKey) -> VersionedStatus {
        let items = self.items.read().unwrap();

        match items.peek(key) {
            Some(v) if v.value.is_some() => VersionedStatus::HasValue,
            Some(_) => VersionedStatus::WasKilled,
            None => VersionedStatus::NotExist,
        }
    }

    /// Returns `true` if an entry (with a value or killed) exists for `key`.
    pub fn has_versioned(&self, key: &CKey) -> bool {
        self.items.read().unwrap().contains_key(key)
    }

    /// Returns `true` if an entry exists for `key` and carries a value.
    pub fn has_value(&self, key: &CKey) -> bool {
        let items = self.items.read().unwrap();

        items.peek(key).map_or(false, |v| v.value.is_some())
    }

    /// Returns `true` if an entry exists for `key` and was killed.
    ///
    /// An unknown key is *not* considered killed.
    pub fn is_killed(&self, key: &CKey) -> bool {
        let items = self.items.read().unwrap();

        items.peek(key).map_or(false, |v| v.value.is_none())
    }

    /// Appends a conflict to the conflicts queue.
    pub fn push_conflict(&self, conflict: Conflict<C, V>) {
        self.conflicts.write().unwrap().push(conflict);
    }

    /// Removes and returns one conflict from the queue, if any.
    pub fn pop_conflict(&self) -> Option<Conflict<C, V>> {
        self.conflicts.write().unwrap().pop()
    }

    /// Returns an iterator over the items of the store.
    ///
    /// The list of keys is captured when this method is called; see
    /// `MemoryStoreItemsIterator`.
    pub fn iter_items(&self) -> MemoryStoreItemsIterator<C, V> {
        MemoryStoreItemsIterator::new(self.items.clone())
    }

    /// Builds a `SizedQueueDrain` from the conflicts queue while holding its
    /// write lock. The lock is released when this method returns.
    pub fn drain_conflicts(&self) -> SizedQueueDrain<Conflict<C, V>> {
        let mut conflicts = self.conflicts.write().unwrap();
        SizedQueueDrain::new(&mut conflicts)
    }

    /// Builds a `SizedQueueDrain` from the suggestions queue while holding
    /// its write lock. The lock is released when this method returns.
    pub fn drain_suggestions(&self) -> SizedQueueDrain<Suggestion<C, V>> {
        let mut suggestions = self.suggestions.write().unwrap();
        SizedQueueDrain::new(&mut suggestions)
    }

    /// Drains the pending conflicts and switches the mode to
    /// `SuggestionMode::Queue`, whatever the previous mode was.
    ///
    /// Both steps happen under the write lock on the suggestion mode, so no
    /// `suggest_*` call can run in between. Direct `assign_*` and
    /// `push_conflict` calls are not blocked by that lock.
    pub fn freeze(&self) -> SizedQueueDrain<Conflict<C, V>> {
        let mut suggestion_mode = self.suggestion_mode.write().unwrap();
        let conflicts = self.drain_conflicts();
        *suggestion_mode = SuggestionMode::Queue;
        drop(suggestion_mode);

        conflicts
    }

    /// Drains the queued suggestions, switches the mode to
    /// `SuggestionMode::Assign` (whatever the previous mode was), then
    /// assigns every drained suggestion.
    ///
    /// Conflicts raised while assigning are pushed to the conflicts queue,
    /// exactly as `suggest_versioned` does in `Assign` mode, so none of them
    /// is lost. Returns the number of suggestions that were processed.
    pub fn unfreeze(&self) -> usize {
        let mut suggestion_mode = self.suggestion_mode.write().unwrap();
        let suggestions = self.drain_suggestions();
        *suggestion_mode = SuggestionMode::Assign;
        drop(suggestion_mode);

        // The mode lock is released before replaying: `assign_versioned`
        // and `push_conflict` take their own locks.
        let mut processed = 0;
        for suggestion in suggestions {
            if let Some(conflict) = self.assign_versioned(suggestion.key, suggestion.versioned) {
                self.push_conflict(conflict);
            }
            processed += 1;
        }
        processed
    }
}

/// Iterator over the items of a `MemoryStore`.
///
/// It works from a list of keys captured at construction time and looks
/// each key up in the shared cache when it is visited.
pub struct MemoryStoreItemsIterator<C, V>
where
    C: Eq + Hash + Clone,
    V: Clone,
{
    /// Index in `keys` of the next key to visit.
    pos: usize,
    /// Keys that were present in the cache when the iterator was created.
    keys: Vec<CKey>,
    /// Shared handle on the store's items cache.
    items: Arc<RwLock<Cache<CKey, Versioned<C, V>>>>,
}

impl<C, V> MemoryStoreItemsIterator<C, V>
where
    C: Eq + Hash + Clone,
    V: Clone,
{
    fn new(items: Arc<RwLock<Cache<CKey, Versioned<C, V>>>>) -> Self {
        // Build a list of all keys to iterate on, we do not know
        // what will happen to the store afterwards...
        let keys: Vec<CKey> = items.read().unwrap().iter().map(|x| *x.0).collect();
        Self {
            pos: 0,
            keys,
            items,
        }
    }
}

impl<C, V> Iterator for MemoryStoreItemsIterator<C, V>
where
    C: Eq + Hash + Clone,
    V: Clone,
{
    type Item = Versioned<C, V>;

    /// Returns a clone of the next entry that still carries a value.
    ///
    /// Keys from the snapshot that are no longer in the cache, or whose
    /// entry was killed (value is `None`), are skipped. Returns `None` once
    /// every key of the snapshot has been visited.
    fn next(&mut self) -> Option<Versioned<C, V>> {
        while let Some(key) = self.keys.get(self.pos) {
            // Advance the cursor *before* any early return, otherwise the
            // same entry would be yielded again on every subsequent call.
            self.pos += 1;

            // The read lock is taken per key and released before moving on.
            let items = self.items.read().unwrap();
            if let Some(versioned) = items.peek(key) {
                if versioned.value.is_some() {
                    return Some(versioned.clone());
                }
            }
        }
        None
    }
}