use crate::{
config::Config,
error::{PRes, PersyError},
id::{PersyId, RecRef},
index::{
keeper::{AddTo, IndexSegmentKeeper},
serialization::IndexSerialization,
},
locks::RwLockManager,
persy::PersyImpl,
snapshot::SnapshotId,
transaction::Transaction,
};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::{
cmp::Ordering,
fmt::Display,
io::{Cursor, Read, Write},
str,
sync::Arc,
};
use data_encoding::BASE64URL_NOPAD;
/// Declares [`IndexTypeId`] and its `From<u8>` conversion from a list of `id => Variant`
/// pairs (the list must end with a trailing comma).
///
/// The generated `from` panics when the byte is not one of the listed ids.
macro_rules! index_type_id_def {
    ($($id:expr => $variant:ident),+,) => {
        #[derive(Clone)]
        pub enum IndexTypeId {
            $(
                $variant,
            )+
        }
        impl From<u8> for IndexTypeId {
            fn from(val: u8) -> IndexTypeId {
                match val {
                    $(
                        $id => IndexTypeId::$variant,
                    )+
                    _ => panic!("type not defined for {}", val),
                }
            }
        }
    };
}
// Byte id -> `IndexTypeId` variant table. The ids are not contiguous (there is no 11) and are
// not in numeric order. NOTE(review): these are presumably the same bytes returned by
// `IndexType::get_id()`, which `IndexConfig` persists as `key_type`/`value_type`; treat the
// numbers as a stable on-disk format and confirm before renumbering anything.
index_type_id_def!(
1 => U8,
2 => U16,
3 => U32,
4 => U64,
14 => U128,
5 => I8,
6 => I16,
7 => I32,
8 => I64,
15 => I128,
9 => F32W,
10 => F64W,
12 => STRING,
13 => PERSYID,
16 => BYTEVEC,
);
/// A type that can be stored as an index key or value.
///
/// Implementors must be cloneable, printable, ordered through [`IndexOrd`], and support the
/// index serialization (`IndexSerialization`) and `AddTo` operations.
pub trait IndexType: Sized + Clone + Display + IndexOrd + AddTo + IndexSerialization {
    /// Byte identifying this type. `Indexes::create_index` stores it in the index
    /// configuration and `Indexes::check_and_get_index` compares against it.
    fn get_id() -> u8;

    /// The [`IndexTypeId`] describing this type.
    fn get_type_id() -> IndexTypeId;
}
/// Ordering used for index keys and values.
///
/// This is separate from [`Ord`] so that types without an `Ord` impl, such as the primitive
/// floats, can still be ordered inside an index.
pub trait IndexOrd {
    /// Compares `self` against `other`.
    fn cmp(&self, other: &Self) -> Ordering;
}
/// Implements [`IndexOrd`] for every listed type by forwarding to that type's [`Ord`] impl.
macro_rules! impl_index_ord {
    ($($ty:ty),+) => {
        $(
            impl IndexOrd for $ty {
                fn cmp(&self, other: &Self) -> std::cmp::Ordering {
                    <$ty as std::cmp::Ord>::cmp(self, other)
                }
            }
        )+
    };
}
// Types that already implement `Ord` get `IndexOrd` by delegation. The primitive floats,
// which have no `Ord`, get a NaN-aware implementation from `float_wrapper!` instead.
impl_index_ord!(
    u8, u16, u32, u64, u128, // unsigned integers
    i8, i16, i32, i64, i128, // signed integers
    String, PersyId, ByteVec
);
/// Declares a public single-field tuple newtype `$name(pub $inner)`.
///
/// The generated type derives `Debug`, `PartialOrd`, `PartialEq` and `Clone`, then applies
/// any extra attributes given before the name. It implements `Eq` unconditionally, even when
/// `$inner` is a float, and converts to and from `$inner` in both directions.
macro_rules! simple_wrapper {
    ($(#[$attr:meta])* $name:ident, $inner:ty) => {
        #[derive(Debug, PartialOrd, PartialEq, Clone)]
        $(#[$attr])*
        pub struct $name(pub $inner);

        impl Eq for $name {}

        impl From<$name> for $inner {
            fn from(wrapped: $name) -> $inner {
                wrapped.0
            }
        }

        impl From<$inner> for $name {
            fn from(raw: $inner) -> $name {
                $name(raw)
            }
        }
    };
}
/// Adds NaN-aware ordering for the float type `$tw` and declares the newtype `$t` around it
/// (through `simple_wrapper!`), plus `Ord`, `Display` and `From<&$tw>` for the newtype.
///
/// NaN handling:
/// * [`IndexOrd`] on the raw float sorts NaN before every other value.
/// * `Ord` on the wrapper sorts NaN after every other value.
///
/// Both treat two NaNs as equal. NOTE(review): the two directions disagree; confirm whether
/// callers rely on that before aligning them. The wrapper's derived `PartialEq`/`PartialOrd`
/// still follow IEEE semantics (NaN != NaN, NaN unordered), so they disagree with `Ord` for
/// NaN values.
macro_rules! float_wrapper {
    ($t:ident, $tw:ty) => {
        impl IndexOrd for $tw {
            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
                if self.is_nan() {
                    if other.is_nan() {
                        std::cmp::Ordering::Equal
                    } else {
                        std::cmp::Ordering::Less
                    }
                } else if other.is_nan() {
                    std::cmp::Ordering::Greater
                } else {
                    std::cmp::PartialOrd::partial_cmp(self, other)
                        .expect("non-NaN floats are always comparable")
                }
            }
        }
        simple_wrapper!($t, $tw);
        impl Ord for $t {
            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
                // `Ord` must be reflexive. Two NaNs compare `Equal`, so a NaN key can be
                // found again (and is not duplicated) in ordered collections such as
                // `BTreeMap`.
                match (self.0.is_nan(), other.0.is_nan()) {
                    (true, true) => std::cmp::Ordering::Equal,
                    (true, false) => std::cmp::Ordering::Greater,
                    (false, true) => std::cmp::Ordering::Less,
                    (false, false) => std::cmp::PartialOrd::partial_cmp(&self.0, &other.0)
                        .expect("non-NaN floats are always comparable"),
                }
            }
        }
        impl std::fmt::Display for $t {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                std::fmt::Display::fmt(&self.0, f)
            }
        }
        impl From<&$tw> for $t {
            fn from(f: &$tw) -> $t {
                $t(*f)
            }
        }
    };
}
// NaN-aware `IndexOrd` for the primitive floats, plus their `Ord` newtypes (matching the
// `F32W`/`F64W` entries of `IndexTypeId`).
float_wrapper!(F64W, f64);
float_wrapper!(F32W, f32);
simple_wrapper!(
    /// Owned byte-string newtype, listed among the index types (`IndexTypeId::BYTEVEC`).
    ///
    /// Ordered lexicographically by its bytes (derived `Ord`). Its `Display`/`FromStr`
    /// implementations use unpadded URL-safe base64.
    #[derive(Ord)]
    ByteVec,
    Vec<u8>
);
/// Renders the bytes as unpadded URL-safe base64, the same encoding `FromStr` parses.
impl Display for ByteVec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let encoded = BASE64URL_NOPAD.encode(&self.0);
        f.write_str(&encoded)
    }
}
impl std::str::FromStr for ByteVec {
type Err = PersyError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(ByteVec(BASE64URL_NOPAD.decode(s.as_bytes())?))
}
}
/// Prefix of the segment that holds an index's configuration record.
pub const INDEX_META_PREFIX: &str = "+_M";
/// Prefix of the segment that `Indexes` hands to `IndexSegmentKeeper` as the index data segment.
pub const INDEX_DATA_PREFIX: &str = "+_D";

/// Name of the meta (configuration) segment backing the index `index_name`.
fn format_segment_name_meta(index_name: &str) -> String {
    [INDEX_META_PREFIX, index_name].concat()
}

/// Name of the data segment backing the index `index_name`.
fn format_segment_name_data(index_name: &str) -> String {
    [INDEX_DATA_PREFIX, index_name].concat()
}
/// Declares [`ValueMode`] and its byte conversions (`From<u8>` and the private `to_u8`) from
/// a list of `VARIANT: id` entries. The list must end with a trailing comma.
///
/// Each entry may be preceded by zero or more attributes, such as doc comments, which are
/// copied onto the variant. The pattern used to require at least one (`+`), so an entry
/// without documentation failed to match.
///
/// The generated `from` panics (`unreachable!`) on a byte that is not listed.
macro_rules! value_mode_def {
    ($($(#[$m:meta])* $variant:ident: $id:expr),+,) => {
        #[derive(Clone, Debug, PartialEq, Eq)]
        pub enum ValueMode {
            $(
                $(#[$m])*
                $variant,
            )+
        }
        impl From<u8> for ValueMode {
            fn from(value: u8) -> Self {
                match value {
                    $(
                        $id => ValueMode::$variant,
                    )+
                    _ => unreachable!("is impossible to get a value mode from {}", value),
                }
            }
        }
        impl ValueMode {
            /// Byte written for this mode in a serialized index configuration.
            fn to_u8(&self) -> u8 {
                match self {
                    $(
                        ValueMode::$variant => $id,
                    )+
                }
            }
        }
    };
}
// Persisted byte for each mode: `IndexConfig::serialize` writes `to_u8()` and
// `IndexConfig::deserialize` maps it back with `ValueMode::from`, so these ids must stay stable.
value_mode_def!(
/// Value mode stored as byte `1`. Presumably a key holds a single value and a conflicting
/// insert is rejected (TODO confirm against the index keeper).
EXCLUSIVE: 1,
/// Value mode stored as byte `2`. Presumably a key may hold several values
/// (TODO confirm against the index keeper).
CLUSTER: 2,
/// Value mode stored as byte `3`. Presumably a key holds a single value and a new insert
/// replaces it (TODO confirm against the index keeper).
REPLACE: 3,
);
/// Shared index bookkeeping: per-index read/write locks and the configuration that supplies
/// the lock timeout.
pub struct Indexes {
/// Read/write locks keyed by `String`, presumably index names (see `write_lock`/`read_lock`).
index_locks: RwLockManager<String>,
/// Source of `transaction_lock_timeout()` used when acquiring `index_locks`.
config: Arc<Config>,
}
/// Persistent configuration of one index, stored as a record in the index's meta segment
/// (the byte layout is defined by `IndexConfig::serialize`/`deserialize`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IndexConfig {
/// Index name, as passed to `Indexes::create_index`.
name: String,
/// Root record of the index (presumably its root node). `create_index` starts with `None`;
/// `Indexes::update_index_root` changes it.
root: Option<RecRef>,
/// Byte id of the key type (`IndexType::get_id` of `K`).
pub key_type: u8,
/// Byte id of the value type (`IndexType::get_id` of `V`).
pub value_type: u8,
/// The `min` passed to `create_index`; presumably the minimum entries per index page
/// (confirm in the index keeper).
page_min: usize,
/// The `max` passed to `create_index`; `create_index` rejects configs where
/// `page_min > page_max / 2`.
page_max: usize,
/// Value mode chosen when the index was created.
pub value_mode: ValueMode,
}
impl IndexConfig {
fn serialize(&self, w: &mut dyn Write) -> PRes<()> {
if let Some(ref root) = self.root {
w.write_u64::<BigEndian>(root.page)?;
w.write_u32::<BigEndian>(root.pos)?;
} else {
w.write_u64::<BigEndian>(0)?;
w.write_u32::<BigEndian>(0)?;
}
w.write_u8(self.key_type)?;
w.write_u8(self.value_type)?;
w.write_u32::<BigEndian>(self.page_min as u32)?;
w.write_u32::<BigEndian>(self.page_max as u32)?;
w.write_u8(self.value_mode.to_u8())?;
w.write_u16::<BigEndian>(self.name.len() as u16)?;
w.write_all(self.name.as_bytes())?;
Ok(())
}
fn deserialize(r: &mut dyn Read) -> PRes<IndexConfig> {
let index_root_page = r.read_u64::<BigEndian>()?;
let index_root_pos = r.read_u32::<BigEndian>()?;
let key_type = r.read_u8()?;
let value_type = r.read_u8()?;
let page_min = r.read_u32::<BigEndian>()? as usize;
let page_max = r.read_u32::<BigEndian>()? as usize;
let value_mode = ValueMode::from(r.read_u8()?);
let name_size = r.read_u16::<BigEndian>()? as usize;
let mut slice: Vec<u8> = vec![0; name_size];
r.read_exact(&mut slice)?;
let name: String = str::from_utf8(&slice[0..name_size])?.into();
let root = if index_root_page != 0 && index_root_pos != 0 {
Some(RecRef::new(index_root_page, index_root_pos))
} else {
None
};
Ok(IndexConfig {
name,
root,
key_type,
value_type,
page_min,
page_max,
value_mode,
})
}
}
/// Turns a missing segment into [`PersyError::IndexNotFound`] so index lookups report the
/// index-level error; every other error is returned unchanged.
fn error_map(err: PersyError) -> PersyError {
    match err {
        PersyError::SegmentNotFound => PersyError::IndexNotFound,
        other => other,
    }
}
impl Indexes {
    /// Creates the index manager with empty lock tables, sharing `config`.
    pub fn new(config: &Arc<Config>) -> Indexes {
        let shared_config = Arc::clone(config);
        Indexes {
            index_locks: RwLockManager::default(),
            config: shared_config,
        }
    }

    /// Creates the index `name` for `K` keys and `V` values inside `tx`.
    ///
    /// Creates the index's meta and data segments, then inserts the initial configuration
    /// (no root yet) into the meta segment.
    ///
    /// # Errors
    /// Returns `IndexMinElementsShouldBeAtLeastDoubleOfMax` when `min > max / 2`, before
    /// touching storage. Errors from segment creation or record insertion are propagated.
    pub fn create_index<K, V>(
        p: &PersyImpl,
        tx: &mut Transaction,
        name: &str,
        min: usize,
        max: usize,
        value_mode: ValueMode,
    ) -> PRes<()>
    where
        K: IndexType,
        V: IndexType,
    {
        if max / 2 < min {
            return Err(PersyError::IndexMinElementsShouldBeAtLeastDoubleOfMax);
        }
        let meta_segment = format_segment_name_meta(name);
        let data_segment = format_segment_name_data(name);
        p.create_segment(tx, &meta_segment)?;
        p.create_segment(tx, &data_segment)?;

        let initial = IndexConfig {
            name: String::from(name),
            root: None,
            key_type: K::get_id(),
            value_type: V::get_id(),
            page_min: min,
            page_max: max,
            value_mode,
        };
        let mut encoded = Vec::new();
        initial.serialize(&mut encoded)?;
        p.insert_record(tx, &meta_segment, &encoded)?;
        Ok(())
    }

    /// Drops the meta segment and then the data segment of the index `name` inside `tx`.
    pub fn drop_index(p: &PersyImpl, tx: &mut Transaction, name: &str) -> PRes<()> {
        p.drop_segment(tx, &format_segment_name_meta(name))?;
        p.drop_segment(tx, &format_segment_name_data(name))?;
        Ok(())
    }

    /// Records `root` as the root of the index `name` inside `tx`.
    ///
    /// The configuration record is rewritten only when the stored root differs.
    ///
    /// # Errors
    /// Returns `IndexNotFound` when the meta segment is missing or holds no record.
    pub fn update_index_root(p: &PersyImpl, tx: &mut Transaction, name: &str, root: Option<RecRef>) -> PRes<()> {
        let segment_name = format_segment_name_meta(name);
        let first = p.scan_tx(tx, &segment_name).map_err(error_map)?.next(p, tx);
        let (id, mut config) = match first {
            Some((rid, content)) => (rid, IndexConfig::deserialize(&mut Cursor::new(content))?),
            None => return Err(PersyError::IndexNotFound),
        };
        if config.root == root {
            return Ok(());
        }
        config.root = root;
        let mut encoded = Vec::new();
        config.serialize(&mut encoded)?;
        p.update_record(tx, &segment_name, &id.0, &encoded)?;
        Ok(())
    }

    /// Loads the configuration of the index `name`, reading inside `op_tx` when one is given.
    ///
    /// # Errors
    /// Returns `IndexNotFound` when the meta segment is missing or holds no record.
    pub fn get_index(p: &PersyImpl, op_tx: Option<&Transaction>, name: &str) -> PRes<IndexConfig> {
        let segment_name_meta = format_segment_name_meta(name);
        let meta = match op_tx {
            Some(tx) => p.scan_tx(tx, &segment_name_meta).map_err(error_map)?.next(p, tx),
            None => p.scan(&segment_name_meta).map_err(error_map)?.next(p),
        };
        match meta {
            Some((_, content)) => IndexConfig::deserialize(&mut Cursor::new(content)),
            None => Err(PersyError::IndexNotFound),
        }
    }

    /// Like [`Indexes::get_index`], but also checks that the stored key and value types are
    /// `K` and `V`.
    ///
    /// # Errors
    /// Returns `IndexTypeMismatch("key type")` or `IndexTypeMismatch("value type")`, with the
    /// key type checked first, in addition to the `get_index` errors.
    pub fn check_and_get_index<K: IndexType, V: IndexType>(
        p: &PersyImpl,
        op_tx: Option<&Transaction>,
        name: &str,
    ) -> PRes<IndexConfig> {
        let index = Self::get_index(p, op_tx, name)?;
        if index.key_type != K::get_id() {
            return Err(PersyError::IndexTypeMismatch("key type".into()));
        }
        if index.value_type != V::get_id() {
            return Err(PersyError::IndexTypeMismatch("value type".into()));
        }
        Ok(index)
    }

    /// Type-checks the index `name` and builds an [`IndexSegmentKeeper`] over its data
    /// segment, starting from the stored root and value mode.
    pub fn check_and_get_index_keeper<'a, K: IndexType, V: IndexType>(
        p: &'a PersyImpl,
        tx: Option<&'a mut Transaction>,
        snapshot: Option<SnapshotId>,
        name: &str,
    ) -> PRes<IndexSegmentKeeper<'a, K, V>> {
        // The transaction is first lent out for the config lookup, then handed to the keeper.
        let (config, tx) = match tx {
            Some(t) => (Self::check_and_get_index::<K, V>(p, Some(t), name)?, Some(t)),
            None => (Self::check_and_get_index::<K, V>(p, None, name)?, None),
        };
        Ok(IndexSegmentKeeper::new(
            name,
            &format_segment_name_data(name),
            config.root,
            p,
            tx,
            snapshot,
            config.value_mode,
        ))
    }

    /// Write-locks all `indexes`, waiting at most the configured transaction lock timeout.
    pub fn write_lock(&self, indexes: &[String]) -> PRes<()> {
        let timeout = self.config.transaction_lock_timeout().clone();
        self.index_locks.lock_all_write(indexes, timeout)
    }

    /// Read-locks `index`, waiting at most the configured transaction lock timeout.
    pub fn read_lock(&self, index: String) -> PRes<()> {
        let timeout = self.config.transaction_lock_timeout().clone();
        self.index_locks.lock_all_read(&[index], timeout)
    }

    /// Releases a read lock taken with [`Indexes::read_lock`].
    pub fn read_unlock(&self, index: String) -> PRes<()> {
        self.index_locks.unlock_all_read(&[index])
    }

    /// Releases write locks taken with [`Indexes::write_lock`].
    pub fn write_unlock(&self, indexes: &[String]) -> PRes<()> {
        self.index_locks.unlock_all_write(indexes)
    }
}
#[cfg(test)]
mod tests {
    use super::{ByteVec, F64W, IndexConfig, IndexOrd, ValueMode};
    use crate::id::RecRef;
    use std::{cmp::Ordering, io::Cursor};

    /// Builds a small config with the given root and value mode.
    fn sample_config(root: Option<RecRef>, value_mode: ValueMode) -> IndexConfig {
        IndexConfig {
            name: "abc".to_string(),
            root,
            key_type: 1,
            value_type: 1,
            page_min: 10,
            page_max: 30,
            value_mode,
        }
    }

    /// Serializes `cfg` into a buffer and deserializes it back.
    fn round_trip(cfg: &IndexConfig) -> IndexConfig {
        let mut buff = Vec::new();
        cfg.serialize(&mut Cursor::new(&mut buff)).expect("serialization works");
        IndexConfig::deserialize(&mut Cursor::new(&buff)).expect("deserialization works")
    }

    #[test]
    fn test_config_ser_des() {
        let cfg = sample_config(None, ValueMode::REPLACE);
        assert_eq!(cfg, round_trip(&cfg));
    }

    #[test]
    fn test_config_ser_des_with_root() {
        for mode in &[ValueMode::EXCLUSIVE, ValueMode::CLUSTER, ValueMode::REPLACE] {
            let cfg = sample_config(Some(RecRef::new(10, 20)), mode.clone());
            assert_eq!(cfg, round_trip(&cfg));
        }
    }

    #[test]
    fn test_value_mode_byte_round_trip() {
        for mode in &[ValueMode::EXCLUSIVE, ValueMode::CLUSTER, ValueMode::REPLACE] {
            assert_eq!(&ValueMode::from(mode.to_u8()), mode);
        }
    }

    #[test]
    fn test_float_index_ord_nan() {
        assert_eq!(IndexOrd::cmp(&f64::NAN, &1.0), Ordering::Less);
        assert_eq!(IndexOrd::cmp(&1.0, &f64::NAN), Ordering::Greater);
        assert_eq!(IndexOrd::cmp(&f64::NAN, &f64::NAN), Ordering::Equal);
        assert_eq!(IndexOrd::cmp(&1.0, &2.0), Ordering::Less);
    }

    #[test]
    fn test_float_wrapper_ord_nan_vs_number() {
        assert_eq!(std::cmp::Ord::cmp(&F64W(f64::NAN), &F64W(1.0)), Ordering::Greater);
        assert_eq!(std::cmp::Ord::cmp(&F64W(1.0), &F64W(f64::NAN)), Ordering::Less);
        assert_eq!(std::cmp::Ord::cmp(&F64W(1.0), &F64W(2.0)), Ordering::Less);
    }

    #[test]
    fn test_bytevec_to_from_string() {
        let bv = ByteVec(vec![10, 20]);
        let nbv = bv.to_string().parse().unwrap();
        assert_eq!(bv, nbv);
    }

    #[test]
    fn test_bytevec_empty_to_from_string() {
        let bv = ByteVec(Vec::new());
        let nbv: ByteVec = bv.to_string().parse().unwrap();
        assert_eq!(bv, nbv);
    }

    #[test]
    fn test_bytevec_ordering_is_lexicographic() {
        assert!(ByteVec(vec![1, 2]) < ByteVec(vec![1, 3]));
        assert!(ByteVec(vec![1]) < ByteVec(vec![1, 0]));
    }
}