// persy 0.7.0
//
// Transactional Persistence Engine
// Documentation
use crate::{
    config::Config,
    error::{PRes, PersyError},
    id::{PersyId, RecRef},
    index::{
        keeper::{AddTo, IndexSegmentKeeper},
        serialization::IndexSerialization,
    },
    locks::RwLockManager,
    persy::PersyImpl,
    snapshot::SnapshotId,
    transaction::Transaction,
};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::{
    cmp::Ordering,
    fmt::Display,
    io::{Cursor, Read, Write},
    str,
    sync::Arc,
};

use data_encoding::BASE64URL_NOPAD;

// Generates the `IndexTypeId` enum together with its `From<u8>` conversion from a
// list of `numeric id => Variant` pairs. The list must end with a trailing comma.
macro_rules! index_type_id_def {
    ($($id:expr => $variant:ident),+,) => {
        /// Enum of all the possible Key or Value types for indexes
        #[derive(Clone)]
        pub enum IndexTypeId {
            $(
                $variant,
            )+
        }

        impl From<u8> for IndexTypeId {
            /// Maps a numeric type id to the matching `IndexTypeId` variant.
            ///
            /// # Panics
            ///
            /// Panics if `val` is not one of the ids listed in the macro invocation.
            fn from(val: u8) -> IndexTypeId {
                match val {
                    $(
                        $id => IndexTypeId::$variant,
                    )+
                    // The message used to read "type node defined", an obvious typo.
                    _ => panic!("type not defined for {}", val),
                }
            }
        }
    };
}

// Numeric ids of every supported key/value type, as accepted by `IndexTypeId::from(u8)`.
// The entries are grouped by kind rather than by id (14 and 15 sit next to the other
// unsigned/signed integers), and id 11 is not assigned in this list.
index_type_id_def!(
    1 => U8,
    2 => U16,
    3 => U32,
    4 => U64,
    14 => U128,
    5 => I8,
    6 => I16,
    7 => I32,
    8 => I64,
    15 => I128,
    9 => F32W,
    10 => F64W,
    12 => STRING,
    13 => PERSYID,
    16 => BYTEVEC,
);

/// Contract that every type usable as an index key or value has to satisfy.
pub trait IndexType
where
    Self: Sized + Clone + Display + IndexOrd + AddTo + IndexSerialization,
{
    /// Numeric id of the implementing type; this is the value recorded as the
    /// key/value type of an index configuration.
    fn get_id() -> u8;
    /// `IndexTypeId` variant describing the implementing type.
    fn get_type_id() -> IndexTypeId;
}

/// Comparison required of every `IndexType`, kept separate from `std::cmp::Ord` so
/// that types without an `Ord` implementation can still provide one.
pub trait IndexOrd {
    /// Compares `self` with `other` and reports their relative order.
    fn cmp(&self, other: &Self) -> Ordering;
}

// Implements `IndexOrd` for each listed type by forwarding to the type's own `Ord`.
macro_rules! impl_index_ord {
    ($($ty:ty),+) => {
        $(
            impl IndexOrd for $ty {
                fn cmp(&self, rhs: &Self) -> ::std::cmp::Ordering {
                    <$ty as ::std::cmp::Ord>::cmp(self, rhs)
                }
            }
        )+
    };
}

// Types whose `IndexOrd` simply forwards to their `Ord` implementation. `f32` and `f64`
// are not listed here: their `IndexOrd` is generated by `float_wrapper!`.
impl_index_ord!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, String, PersyId, ByteVec);

// Declares a public tuple struct `$name` wrapping `$inner`, plus `Eq` and `From`
// conversions in both directions. Attributes written before the name in the
// invocation are applied to the struct after the built-in derives.
macro_rules! simple_wrapper {
    ($(#[$attr:meta])* $name:ident, $inner:ty) => {
        #[derive(Debug, PartialOrd, PartialEq, Clone)]
        $(#[$attr])*
        pub struct $name(pub $inner);

        impl Eq for $name {}

        impl From<$inner> for $name {
            fn from(value: $inner) -> Self {
                $name(value)
            }
        }

        impl From<$name> for $inner {
            fn from(wrapper: $name) -> Self {
                wrapper.0
            }
        }
    };
}

// Defines the float wrapper `$t` around the primitive float `$tw`:
// - `IndexOrd` for the primitive itself,
// - the wrapper struct (via `simple_wrapper!`) with `Ord`, `Display` and `From<&$tw>`.
//
// Note that the two orderings treat NaN differently: `IndexOrd` on the primitive puts
// NaN before every other value, while `Ord` on the wrapper puts NaN after every other
// value. Both treat two NaNs as equal.
macro_rules! float_wrapper {
    ($t:ident, $tw:ty) => {
        impl IndexOrd for $tw {
            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
                if self.is_nan() {
                    if other.is_nan() {
                        std::cmp::Ordering::Equal
                    } else {
                        std::cmp::Ordering::Less
                    }
                } else if other.is_nan() {
                    std::cmp::Ordering::Greater
                } else {
                    // Neither side is NaN here, so the partial order is defined.
                    std::cmp::PartialOrd::partial_cmp(self, other)
                        .expect("non-NaN floats are always comparable")
                }
            }
        }

        simple_wrapper!($t, $tw);

        impl Ord for $t {
            fn cmp(&self, other: &Self) -> Ordering {
                match PartialOrd::partial_cmp(&self.0, &other.0) {
                    Some(ordering) => ordering,
                    // `None` means at least one side is NaN.
                    None => match (self.0.is_nan(), other.0.is_nan()) {
                        // Two NaNs must compare as equal: answering `Greater` in both
                        // directions (the previous behavior) is not a valid ordering
                        // and can break sorting and ordered collections.
                        (true, true) => Ordering::Equal,
                        (true, false) => Ordering::Greater,
                        _ => Ordering::Less,
                    },
                }
            }
        }

        impl Display for $t {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                Display::fmt(&self.0, f)
            }
        }

        impl From<&$tw> for $t {
            fn from(f: &$tw) -> $t {
                $t(*f)
            }
        }
    };
}

// Float wrappers: `F32W`/`F64W` get `Ord`, `Display` and `From` conversions, while the
// raw `f32`/`f64` get their `IndexOrd` implementation.
float_wrapper!(F32W, f32);
float_wrapper!(F64W, f64);
simple_wrapper!(
    /// Wraps a `Vec<u8>` so that it can be used as an index key or value
    #[derive(Ord)]
    ByteVec,
    Vec<u8>
);

/// Renders the wrapped bytes as text using the `BASE64URL_NOPAD` encoding.
impl Display for ByteVec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let encoded = BASE64URL_NOPAD.encode(&self.0);
        f.write_str(&encoded)
    }
}

/// Parses `BASE64URL_NOPAD`-encoded text (the format written by `Display`) into a `ByteVec`.
impl std::str::FromStr for ByteVec {
    type Err = PersyError;

    /// Decodes `s`; a decoding failure is converted into a `PersyError`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let decoded = BASE64URL_NOPAD.decode(s.as_bytes())?;
        Ok(ByteVec(decoded))
    }
}

/// Prefix put in front of an index name to obtain the name of its metadata segment.
pub const INDEX_META_PREFIX: &str = "+_M";
/// Prefix put in front of an index name to obtain the name of its data segment.
pub const INDEX_DATA_PREFIX: &str = "+_D";

/// Builds the name of the segment that stores the metadata of the index `index_name`.
fn format_segment_name_meta(index_name: &str) -> String {
    [INDEX_META_PREFIX, index_name].concat()
}

/// Builds the name of the segment that stores the data of the index `index_name`.
fn format_segment_name_data(index_name: &str) -> String {
    [INDEX_DATA_PREFIX, index_name].concat()
}

// Generates the `ValueMode` enum from a list of documented `VARIANT: id` entries (the
// list must end with a trailing comma), together with the conversions between a
// variant and its numeric id in both directions.
macro_rules! value_mode_def {
    ($($(#[$doc:meta])+ $name:ident: $code:expr),+,) => {
        /// Define the behavior of the index in case a key value pair already exists
        #[derive(Clone, Debug, PartialEq, Eq)]
        pub enum ValueMode {
            $(
                $(#[$doc])*
                $name,
            )+
        }

        impl From<u8> for ValueMode {
            // Panics (through `unreachable!`) when `raw` is not one of the listed ids.
            fn from(raw: u8) -> Self {
                match raw {
                    $(
                        $code => ValueMode::$name,
                    )+
                    _ => unreachable!("is impossible to get a value mode from {}", raw),
                }
            }
        }

        impl ValueMode {
            // Numeric id of this mode, the inverse of `From<u8>`.
            fn to_u8(&self) -> u8 {
                match *self {
                    $(
                        ValueMode::$name => $code,
                    )+
                }
            }
        }
    };
}

value_mode_def!(
    /// An error is returned if a key value pair already exists
    EXCLUSIVE: 1,
    /// The value is added to the list of values for the key; duplicate values are collapsed
    /// into a single entry
    CLUSTER: 2,
    /// The existing value is replaced with the new value if a key value pair already exists
    REPLACE: 3,
);

/// Entry point for index management: holds the per-index locks and the shared
/// configuration; the index operations themselves are associated functions that
/// receive the `PersyImpl` and transaction explicitly.
pub struct Indexes {
    /// Read/write locks keyed by index name, used by the `*_lock` / `*_unlock` methods.
    index_locks: RwLockManager<String>,
    /// Shared configuration; supplies the timeout used when acquiring index locks.
    config: Arc<Config>,
}

/// Persistent description of an index, stored as a record in the index metadata segment.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IndexConfig {
    /// Name of the index.
    name: String,
    /// Reference to the root record of the index, `None` when no root has been set
    /// (a newly created index starts without one).
    root: Option<RecRef>,
    /// Numeric id of the key type, as returned by `IndexType::get_id`.
    pub key_type: u8,
    /// Numeric id of the value type, as returned by `IndexType::get_id`.
    pub value_type: u8,
    /// The `min` given to `create_index` — presumably the lower bound on entries
    /// per index page; TODO confirm against the index implementation.
    page_min: usize,
    /// The `max` given to `create_index` — presumably the upper bound on entries
    /// per index page; TODO confirm against the index implementation.
    page_max: usize,
    /// Behavior when a key value pair already exists.
    pub value_mode: ValueMode,
}

impl IndexConfig {
    fn serialize(&self, w: &mut dyn Write) -> PRes<()> {
        if let Some(ref root) = self.root {
            w.write_u64::<BigEndian>(root.page)?;
            w.write_u32::<BigEndian>(root.pos)?;
        } else {
            w.write_u64::<BigEndian>(0)?;
            w.write_u32::<BigEndian>(0)?;
        }
        w.write_u8(self.key_type)?;
        w.write_u8(self.value_type)?;
        w.write_u32::<BigEndian>(self.page_min as u32)?;
        w.write_u32::<BigEndian>(self.page_max as u32)?;
        w.write_u8(self.value_mode.to_u8())?;
        w.write_u16::<BigEndian>(self.name.len() as u16)?;
        w.write_all(self.name.as_bytes())?;
        Ok(())
    }
    /// Reads a configuration from `r`, expecting the layout produced by `serialize`.
    ///
    /// Fails if `r` ends early or errors, or if the name bytes are not valid UTF-8.
    /// An unknown value-mode byte panics inside `ValueMode::from`.
    fn deserialize(r: &mut dyn Read) -> PRes<IndexConfig> {
        let root_page = r.read_u64::<BigEndian>()?;
        let root_pos = r.read_u32::<BigEndian>()?;
        let key_type = r.read_u8()?;
        let value_type = r.read_u8()?;
        let page_min = r.read_u32::<BigEndian>()? as usize;
        let page_max = r.read_u32::<BigEndian>()? as usize;
        let value_mode = ValueMode::from(r.read_u8()?);

        let name_len = usize::from(r.read_u16::<BigEndian>()?);
        let mut name_bytes = vec![0u8; name_len];
        r.read_exact(&mut name_bytes)?;
        let name = str::from_utf8(&name_bytes)?.to_owned();

        // NOTE(review): a root is rebuilt only when BOTH page and pos are non-zero, so
        // a root with a zero page or a zero pos reads back as `None` — confirm that a
        // valid `RecRef` never has a zero component.
        let root = if root_page == 0 || root_pos == 0 {
            None
        } else {
            Some(RecRef::new(root_page, root_pos))
        };

        Ok(IndexConfig {
            name,
            root,
            key_type,
            value_type,
            page_min,
            page_max,
            value_mode,
        })
    }
}

/// Translates a missing-segment error into `IndexNotFound`, since index segments are an
/// implementation detail; every other error is passed through untouched.
fn error_map(err: PersyError) -> PersyError {
    match err {
        PersyError::SegmentNotFound => PersyError::IndexNotFound,
        other => other,
    }
}

impl Indexes {
    /// Creates an `Indexes` with no locks held, sharing the given configuration.
    pub fn new(config: &Arc<Config>) -> Indexes {
        let index_locks = Default::default();
        let config = Arc::clone(config);
        Indexes { index_locks, config }
    }

    /// Creates the index `name` inside the transaction `tx`.
    ///
    /// Two segments are created (metadata first, then data) and the initial
    /// configuration, without a root, is inserted as a record of the metadata segment.
    /// `K` and `V` determine the recorded key and value type ids.
    ///
    /// # Errors
    ///
    /// Returns `IndexMinElementsShouldBeAtLeastDoubleOfMax` when `min` is greater than
    /// `max / 2`, and propagates any failure from creating the segments or inserting
    /// the configuration record.
    pub fn create_index<K: IndexType, V: IndexType>(
        p: &PersyImpl,
        tx: &mut Transaction,
        name: &str,
        min: usize,
        max: usize,
        value_mode: ValueMode,
    ) -> PRes<()> {
        if min > max / 2 {
            return Err(PersyError::IndexMinElementsShouldBeAtLeastDoubleOfMax);
        }

        let meta_segment = format_segment_name_meta(name);
        let data_segment = format_segment_name_data(name);
        p.create_segment(tx, &meta_segment)?;
        p.create_segment(tx, &data_segment)?;

        let mut serialized = Vec::new();
        IndexConfig {
            name: name.to_string(),
            root: None,
            key_type: K::get_id(),
            value_type: V::get_id(),
            page_min: min,
            page_max: max,
            value_mode,
        }
        .serialize(&mut serialized)?;

        p.insert_record(tx, &meta_segment, &serialized)?;
        Ok(())
    }

    /// Drops the index `name` inside `tx` by dropping its metadata segment and then its
    /// data segment; the first failure is returned and stops the operation.
    pub fn drop_index(p: &PersyImpl, tx: &mut Transaction, name: &str) -> PRes<()> {
        let meta_segment = format_segment_name_meta(name);
        let data_segment = format_segment_name_data(name);
        p.drop_segment(tx, &meta_segment)?;
        p.drop_segment(tx, &data_segment)?;
        Ok(())
    }

    /// Stores `root` as the new root of the index `name` inside `tx`.
    ///
    /// The configuration is read from the first record of the metadata segment and is
    /// rewritten only when the root actually changes.
    ///
    /// # Errors
    ///
    /// Returns `IndexNotFound` when the metadata segment is missing or has no record.
    pub fn update_index_root(p: &PersyImpl, tx: &mut Transaction, name: &str, root: Option<RecRef>) -> PRes<()> {
        let meta_segment = format_segment_name_meta(name);
        let (record_id, mut config) = match p.scan_tx(tx, &meta_segment).map_err(error_map)?.next(p, tx) {
            Some((rid, content)) => (rid, IndexConfig::deserialize(&mut Cursor::new(content))?),
            None => return Err(PersyError::IndexNotFound),
        };

        // Nothing to persist when the stored root is already the requested one.
        if config.root == root {
            return Ok(());
        }

        config.root = root;
        let mut serialized = Vec::new();
        config.serialize(&mut serialized)?;
        p.update_record(tx, &meta_segment, &record_id.0, &serialized)?;
        Ok(())
    }

    /// Loads the configuration of the index `name`.
    ///
    /// With `Some(tx)` the metadata segment is scanned through the transaction,
    /// otherwise through a plain scan; the first record found holds the configuration.
    ///
    /// # Errors
    ///
    /// Returns `IndexNotFound` when the metadata segment is missing or has no record.
    pub fn get_index(p: &PersyImpl, op_tx: Option<&Transaction>, name: &str) -> PRes<IndexConfig> {
        let meta_segment = format_segment_name_meta(name);
        let first_record = match op_tx {
            Some(tx) => p.scan_tx(tx, &meta_segment).map_err(error_map)?.next(p, tx),
            None => p.scan(&meta_segment).map_err(error_map)?.next(p),
        };

        match first_record {
            Some((_, content)) => IndexConfig::deserialize(&mut Cursor::new(content)),
            None => Err(PersyError::IndexNotFound),
        }
    }

    /// Loads the configuration of the index `name` and verifies that its recorded key
    /// and value types are `K` and `V`.
    ///
    /// # Errors
    ///
    /// Returns `IndexTypeMismatch` naming the offending side ("key type" is checked
    /// first, then "value type"), or whatever `get_index` fails with.
    pub fn check_and_get_index<K: IndexType, V: IndexType>(
        p: &PersyImpl,
        op_tx: Option<&Transaction>,
        name: &str,
    ) -> PRes<IndexConfig> {
        let index = Indexes::get_index(p, op_tx, name)?;
        if index.key_type != K::get_id() {
            return Err(PersyError::IndexTypeMismatch("key type".into()));
        }
        if index.value_type != V::get_id() {
            return Err(PersyError::IndexTypeMismatch("value type".into()));
        }
        Ok(index)
    }

    /// Type-checks the index `name` against `K`/`V` and builds the
    /// `IndexSegmentKeeper` used to access it.
    ///
    /// The transaction, when present, is first lent to the type check and then handed
    /// over to the keeper together with the optional snapshot.
    pub fn check_and_get_index_keeper<'a, K: IndexType, V: IndexType>(
        p: &'a PersyImpl,
        tx: Option<&'a mut Transaction>,
        snapshot: Option<SnapshotId>,
        name: &str,
    ) -> PRes<IndexSegmentKeeper<'a, K, V>> {
        let (config, tx) = match tx {
            Some(t) => (Indexes::check_and_get_index::<K, V>(p, Some(t), name)?, Some(t)),
            None => (Indexes::check_and_get_index::<K, V>(p, None, name)?, None),
        };
        let data_segment = format_segment_name_data(name);
        let keeper = IndexSegmentKeeper::new(
            name,
            &data_segment,
            config.root,
            p,
            tx,
            snapshot,
            config.value_mode,
        );
        Ok(keeper)
    }

    /// Acquires the write lock of every index in `indexes`, using the configured
    /// transaction lock timeout.
    pub fn write_lock(&self, indexes: &[String]) -> PRes<()> {
        let timeout = self.config.transaction_lock_timeout().clone();
        self.index_locks.lock_all_write(indexes, timeout)
    }

    /// Acquires the read lock of the single index `index`, using the configured
    /// transaction lock timeout.
    pub fn read_lock(&self, index: String) -> PRes<()> {
        let timeout = self.config.transaction_lock_timeout().clone();
        let targets = [index];
        self.index_locks.lock_all_read(&targets, timeout)
    }

    /// Releases the read lock of the single index `index`.
    pub fn read_unlock(&self, index: String) -> PRes<()> {
        let targets = [index];
        self.index_locks.unlock_all_read(&targets)
    }

    /// Releases the write lock of every index in `indexes`.
    pub fn write_unlock(&self, indexes: &[String]) -> PRes<()> {
        let locks = &self.index_locks;
        locks.unlock_all_write(indexes)
    }
}

#[cfg(test)]
mod tests {
    use super::{ByteVec, IndexConfig, ValueMode};
    use std::io::Cursor;

    /// A configuration written by `serialize` is read back unchanged by `deserialize`.
    #[test]
    fn test_config_ser_des() {
        let original = IndexConfig {
            name: "abc".to_string(),
            root: None,
            key_type: 1,
            value_type: 1,
            page_min: 10,
            page_max: 30,
            value_mode: ValueMode::REPLACE,
        };

        let mut bytes = Vec::new();
        original
            .serialize(&mut Cursor::new(&mut bytes))
            .expect("serialization works");
        let decoded = IndexConfig::deserialize(&mut Cursor::new(&mut bytes)).expect("deserialization works");

        assert_eq!(original, decoded);
    }

    /// The text form of a `ByteVec` parses back into an equal `ByteVec`.
    #[test]
    fn test_bytevec_to_from_string() {
        let bytes = ByteVec(vec![10, 20]);
        let text = bytes.to_string();
        let parsed: ByteVec = text.parse().unwrap();
        assert_eq!(bytes, parsed);
    }
}