// arthas 0.3.0
//
// Arthas is an in-memory structure database.
// Documentation

use std::sync::Mutex;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use query::Query;
use item::{ItemWrapper, Id, get_len_field_int};
use serde_json::Value;
use to_value;
use store;
use traits::{Structure, FieldIntMap};
use persistence::service::PersistenceService;
use error::Error;
use item::convertor::Convertor;
use encoder;
use tree::Tree;
use query::QueryType;
use num_cpus;
use utils::hash_map::revert;
use scoped_pool::Pool;
use vec_map::VecMap;


/// In-memory store for the records of one `Structure` type.
///
/// Encoded values live in `tree`; every mutation in this file is mirrored to
/// `service` when `store::is_persistence()` reports that persistence is on.
pub struct Memory {
    /// Map returned by `T::get_field_int_map()`; passed to the encoder when a
    /// wrapper is encoded for storage.
    pub field_int_map: Arc<FieldIntMap>,
    /// `T::get_field_int_map()` passed through `revert`; used to validate
    /// query condition fields and to convert stored values back into wrappers.
    pub int_field_map: Arc<FieldIntMap>,
    /// Persistence backend; in this file it is only touched when
    /// `store::is_persistence()` is true.
    pub service: PersistenceService,
    /// Holds the encoded values and answers all searches.
    pub tree: Tree,
    /// Value of `T::is_one()`; when true, `insert_encoded_value` clears the
    /// store before inserting.
    pub is_one: bool,
    /// Worker pool created with `num_cpus::get()`; used by `search`.
    pub pool: Pool,
}

impl Memory {
    /// Creates an empty store for structure type `T`.
    ///
    /// The field map is requested from `T` twice: once as-is and once to be
    /// passed through `revert`. The worker pool is created with
    /// `num_cpus::get()`.
    pub fn new<T: Structure>() -> Memory {
        let forward_map = Arc::new(T::get_field_int_map());
        let reverse_map = Arc::new(revert(T::get_field_int_map()));
        let persistence = PersistenceService::new::<T>();
        let index = Tree::new::<T>();
        let single_record = T::is_one();
        let workers = Pool::new(num_cpus::get());

        Memory {
            field_int_map: forward_map,
            int_field_map: reverse_map,
            service: persistence,
            tree: index,
            is_one: single_record,
            pool: workers,
        }
    }

    /// Stores the item carried by `query` and returns the id it was stored under.
    ///
    /// The item is taken out of `query` (leaving `query.item` as `None`),
    /// wrapped, encoded with `field_int_map` and handed to
    /// `insert_encoded_value`.
    ///
    /// # Errors
    /// Fails if `check_query_field` rejects a condition field or if
    /// `insert_encoded_value` fails.
    ///
    /// # Panics
    /// Panics if `query.item` is `None`.
    pub fn insert<T: Structure>(&mut self, query: &mut Query<T>) -> Result<Id, Error> {
        self.check_query_field(query)?;

        let item = query.item.take().unwrap();
        let wrapped = ItemWrapper::new(item);
        let new_id = wrapped.id.clone();

        let raw = to_value(wrapped);
        let encoded = encoder::encode_wrapper(&raw, &self.field_int_map);

        self.insert_encoded_value(new_id.clone(), encoded)?;
        Ok(new_id)
    }

    /// Returns the items of every record matching `query`.
    ///
    /// # Errors
    /// Fails if `check_query_field` rejects a condition field or if the
    /// underlying search fails.
    pub fn find<T: Structure>(&self, query: &mut Query<T>) -> Result<Vec<T>, Error> {
        self.check_query_field(query)?;

        let (_, wrappers) = self.search(&query, &QueryType::Find)?;

        // Strip the wrappers and hand back only the user-facing items.
        let mut items = Vec::with_capacity(wrappers.len());
        for wrapper in wrappers {
            items.push(wrapper.item);
        }

        Ok(items)
    }

    /// Returns the count reported by a `QueryType::Count` search for `query`.
    ///
    /// # Errors
    /// Fails if `check_query_field` rejects a condition field or if the
    /// underlying search fails.
    pub fn count<T: Structure>(&self, query: &mut Query<T>) -> Result<usize, Error> {
        self.check_query_field(query)?;

        let (total, _) = self.search(&query, &QueryType::Count)?;
        Ok(total)
    }

    pub fn find_one<T: Structure>(&self, query: &mut Query<T>) -> Result<Option<T>, Error> {
        self.check_query_field(query)?;

        query.limit = Some(1);
        let mut items = self.find(query)?;

        if !items.is_empty() {
            Ok(Some(items.remove(0)))
        } else if T::is_one() {
            Ok(Some(T::new_empty()))
        } else {
            Ok(None)
        }
    }

    /// Applies `query.updater` to every record matching `query` and returns
    /// how many records were updated.
    ///
    /// Matching records are first removed via `delete_to_wrapper`, mutated in
    /// place, then re-inserted into the tree and, when persistence is on,
    /// written to the persistence service. The updater is taken out of
    /// `query`, leaving `query.updater` as `None`.
    ///
    /// # Errors
    /// Fails if `check_query_field` rejects a condition field, if the
    /// delete step fails, or if a persistence write fails.
    ///
    /// # Panics
    /// Panics if `query.updater` is `None`.
    pub fn update<T: Structure>(&mut self, query: &mut Query<T>) -> Result<usize, Error> {
        self.check_query_field(query)?;

        let apply = query.updater.take().unwrap();
        let mut matched = self.delete_to_wrapper(query)?;
        let updated = matched.len();

        for entry in matched.iter_mut() {
            apply(&mut entry.item);
        }

        // All tree insertions happen before any persistence write.
        for entry in matched.iter() {
            let raw = to_value(entry);
            let encoded = encoder::encode_wrapper(&raw, &self.field_int_map);
            self.tree.insert(entry.id.clone(), encoded);
        }

        if store::is_persistence() {
            // NOTE(review): this persists the un-encoded `to_value` output,
            // whereas `insert_encoded_value` persists the encoded value.
            // Confirm which form the persistence layer expects.
            for entry in matched {
                let id = entry.id.clone();
                self.service.insert(id, to_value(&entry))?;
            }
        }

        Ok(updated)
    }

    /// Replaces the record that has the same id as the item in `query`.
    ///
    /// If `query.id` is set it is first written onto the item with `set_id`.
    /// Any record stored under the resulting wrapper id is deleted, then the
    /// item is inserted through `insert`.
    ///
    /// # Errors
    /// Fails if `check_query_field` rejects a condition field or if the
    /// delete or insert step fails.
    ///
    /// # Panics
    /// Panics if `query.item` is `None`.
    pub fn replace<T: Structure>(&mut self, query: &mut Query<T>) -> Result<(), Error> {
        self.check_query_field(query)?;

        if let Some(forced_id) = query.id.clone() {
            query.item.as_mut().unwrap().set_id(forced_id);
        }

        // Wrap a copy of the item only to learn which id it will be stored under.
        let probe = ItemWrapper::new(query.item.clone().unwrap());
        let target_id = probe.id.clone();

        self.delete_to_wrapper::<T>(&mut Query::new().id(target_id))?;
        self.insert(query)?;

        Ok(())
    }

    /// Deletes every record matching `query` and returns the removed items.
    ///
    /// # Errors
    /// Fails if `check_query_field` rejects a condition field or if
    /// `delete_to_wrapper` fails.
    pub fn delete<T: Structure>(&mut self, query: &mut Query<T>) -> Result<Vec<T>, Error> {
        self.check_query_field(query)?;

        let removed = self.delete_to_wrapper(query)?;

        let mut items = Vec::with_capacity(removed.len());
        for wrapper in removed {
            items.push(wrapper.item);
        }

        Ok(items)
    }

    /// Inserts an already encoded `value` under `id`.
    ///
    /// When `is_one` is set the store is cleared first, so at most one record
    /// remains afterwards. The value is written to the tree and, when
    /// persistence is on, to the persistence service as well.
    ///
    /// # Errors
    /// Fails if clearing the store or writing to the persistence service fails.
    pub fn insert_encoded_value(&mut self, id: Id, value: Value) -> Result<(), Error> {
        if self.is_one {
            // `clear` empties the tree and, when persistence is on, the
            // persistence service too, so no separate `service.clear()` is
            // needed before the insert below.
            self.clear()?;
        }

        if store::is_persistence() {
            // Both sinks need an owned copy; the originals go to the last one.
            self.tree.insert(id.clone(), value.clone());
            self.service.insert(id, value)?;
        } else {
            self.tree.insert(id, value);
        }

        Ok(())
    }

    /// Empties the tree and, when persistence is on, the persistence service.
    ///
    /// # Errors
    /// Fails if clearing the persistence service fails.
    fn clear(&mut self) -> Result<(), Error> {
        self.tree.clear();

        if !store::is_persistence() {
            return Ok(());
        }

        self.service.clear()?;
        Ok(())
    }

    /// Removes every record matching `query` and returns the removed wrappers.
    ///
    /// Each match is deleted from the tree and, when persistence is on, from
    /// the persistence service. A persistence failure aborts the loop, so
    /// records handled before the failure stay deleted.
    ///
    /// # Errors
    /// Fails if the search or a persistence delete fails.
    fn delete_to_wrapper<T: Structure>(&mut self,
                                       query: &mut Query<T>)
                                       -> Result<Vec<ItemWrapper<T>>, Error> {
        let (_, matched) = self.search::<T>(&query, &QueryType::Find)?;

        for entry in matched.iter() {
            self.tree.delete(&entry.id);

            if !store::is_persistence() {
                continue;
            }

            self.service.delete(entry.id.clone())?;
        }

        Ok(matched)
    }

    /// Runs `query` against the tree and, for `QueryType::Find`, converts
    /// every returned value into an `ItemWrapper<T>`.
    ///
    /// Returns `(count, wrappers)` where `count` is whatever `Tree::search`
    /// reported. `wrappers` is empty unless `query_type` is `Find` and
    /// `count > 0`; otherwise it holds the converted values in the order the
    /// tree returned them.
    ///
    /// # Errors
    /// Propagates any error from `Tree::search`.
    ///
    /// # Panics
    /// Panics if `to_wrapper` fails for a value (the `unwrap` runs inside the
    /// pool job), if the result mutex is poisoned, or if no converted value
    /// exists for some index in `0..count`.
    fn search<T: Structure>(&self,
                            query: &&mut Query<T>,
                            query_type: &QueryType)
                            -> Result<(usize, Vec<ItemWrapper<T>>), Error> {
        let (count, values) = self.tree.search(&self.pool, query, query_type)?;
        let wrappers = if query_type == &QueryType::Find && count > 0 {
            // Results are keyed by their original position so the order can be
            // restored after the jobs finish in arbitrary order.
            let wrapper_map = Mutex::new(VecMap::new());
            self.pool.scoped(|scope| {
                // Re-bind as references so the `move` closures below copy the
                // references instead of taking ownership.
                let wrapper_map = &wrapper_map;
                let int_field_map = &self.int_field_map;

                for (i, value) in values.into_iter().enumerate() {
                    // Wrapping the pointer in `AtomicPtr` lets it be moved into
                    // the job closure.
                    // NOTE(review): presumably `value` is a raw pointer into
                    // data owned by `self.tree` — confirm in `Tree::search`.
                    let ptr = AtomicPtr::new(value as *mut Value);

                    // One job per value: convert it and store it under its index.
                    scope.execute(move || {
                        // NOTE(review): the dereference is only sound if the
                        // pointed-to `Value` stays alive and unmodified until
                        // the scoped jobs finish — TODO confirm.
                        let wrapper = unsafe { &*ptr.load(Ordering::SeqCst) }
                            .to_wrapper::<T>(&*int_field_map)
                            .unwrap();
                        wrapper_map.lock().unwrap().insert(i, wrapper);
                    })
                }
            });

            let mut wrappers = Vec::new();
            let mut wrapper_map = wrapper_map.into_inner().unwrap();

            // Drain by ascending index; this assumes exactly `count` values
            // were converted.
            for i in 0..count {
                wrappers.push(wrapper_map.remove(i).unwrap());
            }

            wrappers

        } else {
            Vec::new()
        };

        Ok((count, wrappers))
    }

    /// Checks that every condition key in `query` refers to a known field.
    ///
    /// A key is accepted when it is a key of `int_field_map`, or when it
    /// equals `get_len_field_int(k)` for some key `k` of `int_field_map`.
    ///
    /// # Errors
    /// Returns `Error::FieldNotFound` for the first key that is neither.
    #[inline]
    fn check_query_field<T>(&self, query: &Query<T>) -> Result<(), Error> {
        for field in &query.conditions {
            if self.int_field_map.contains_key(field.0) {
                continue;
            }

            // Not a plain field key: see whether it is the derived "len" key
            // of any known field.
            let is_len_key = self.int_field_map
                .keys()
                .any(|field_int| get_len_field_int(field_int) == *field.0);

            if !is_len_key {
                return Err(Error::FieldNotFound);
            }
        }

        Ok(())
    }
}