use crate::{
config::Config,
error::{PRes, PersyError},
id::{index_id_to_segment_id_meta, IndexId, PersyId, RecRef},
index::{
keeper::{Extractor, IndexSegmentKeeper, IndexSegmentKeeperTx},
serialization::IndexSerialization,
},
locks::RwLockManager,
persy::PersyImpl,
snapshot::SnapshotId,
transaction::Transaction,
};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::{
fmt::Display,
io::{Cursor, Read, Write},
str,
sync::Arc,
};
use data_encoding::BASE64URL_NOPAD;
/// Identifies one of the types that can be used as index key or value
/// (returned by `IndexType::get_type_id`).
#[derive(Clone)]
pub enum IndexTypeId {
    U8,
    U16,
    U32,
    U64,
    U128,
    I8,
    I16,
    I32,
    I64,
    I128,
    F32W,
    F64W,
    STRING,
    PERSYID,
    BYTEVEC,
}

impl From<u8> for IndexTypeId {
    /// Maps a numeric type id to its [`IndexTypeId`].
    ///
    /// Assigned ids are 1..=10 and 12..=16; id 11 has no mapping.
    ///
    /// # Panics
    /// Panics for any id without a mapping (0, 11 and 17..=255).
    fn from(val: u8) -> IndexTypeId {
        match val {
            1 => IndexTypeId::U8,
            2 => IndexTypeId::U16,
            3 => IndexTypeId::U32,
            4 => IndexTypeId::U64,
            5 => IndexTypeId::I8,
            6 => IndexTypeId::I16,
            7 => IndexTypeId::I32,
            8 => IndexTypeId::I64,
            9 => IndexTypeId::F32W,
            10 => IndexTypeId::F64W,
            12 => IndexTypeId::STRING,
            13 => IndexTypeId::PERSYID,
            14 => IndexTypeId::U128,
            15 => IndexTypeId::I128,
            16 => IndexTypeId::BYTEVEC,
            _ => panic!("type not defined for {}", val),
        }
    }
}
/// Ordered, cloneable wrapper that can be built from a `T` and turned back into one.
pub trait WrapperType<T>
where
    Self: Clone + From<T> + Extractor + Ord,
{
    /// Consumes the wrapper, yielding a `T`.
    fn value(self) -> T;
}
/// A type usable as key or value of an index.
pub trait IndexType
where
    Self: Display + IndexOrd + Clone + IndexSerialization,
{
    /// Wrapper type associated with this index type.
    type Wrapper: WrapperType<Self>;

    /// Numeric id of this type; `IndexConfig::check` compares it with the stored ids.
    fn get_id() -> u8;

    /// Enum form of this type's id.
    fn get_type_id() -> IndexTypeId;
}
/// Comparison used by indexes to order keys and values.
///
/// Unlike [`Ord`] this trait can also be implemented for floating point types.
pub trait IndexOrd {
    /// Compares `self` with `other`.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering;
}

/// Implements [`IndexOrd`] for every listed type by delegating to that type's [`Ord`].
macro_rules! impl_index_ord {
    ($($ty:ty),+) => {
        $(
            impl IndexOrd for $ty {
                fn cmp(&self, other: &Self) -> std::cmp::Ordering {
                    <$ty as std::cmp::Ord>::cmp(self, other)
                }
            }
        )+
    };
}
// Every type with a native `Ord` gets an `IndexOrd` that simply delegates to it.
impl_index_ord!(
    u8, u16, u32, u64, u128, // unsigned integers
    i8, i16, i32, i64, i128, // signed integers
    String, PersyId, ByteVec
);
impl IndexOrd for f32 {
    /// Orders floats for indexing.
    ///
    /// - NaN is equal to NaN and less than every non-NaN value.
    /// - All other values use the IEEE comparison, so `-0.0` and `0.0` compare equal.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering;
        match (self.is_nan(), other.is_nan()) {
            (true, true) => Ordering::Equal,
            (true, false) => Ordering::Less,
            (false, true) => Ordering::Greater,
            // Neither operand is NaN, so `partial_cmp` always returns `Some`.
            (false, false) => self.partial_cmp(other).unwrap(),
        }
    }
}
impl IndexOrd for f64 {
    /// Orders doubles for indexing.
    ///
    /// - NaN is equal to NaN and less than every non-NaN value.
    /// - All other values use the IEEE comparison, so `-0.0` and `0.0` compare equal.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering;
        match (self.is_nan(), other.is_nan()) {
            (true, true) => Ordering::Equal,
            (true, false) => Ordering::Less,
            (false, true) => Ordering::Greater,
            // Neither operand is NaN, so `partial_cmp` always returns `Some`.
            (false, false) => self.partial_cmp(other).unwrap(),
        }
    }
}
/// Owned byte buffer usable as an index key or value.
///
/// Ordering is the derived lexicographic ordering of the bytes.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ByteVec(pub Vec<u8>);

impl From<Vec<u8>> for ByteVec {
    /// Wraps `bytes` without copying them.
    fn from(bytes: Vec<u8>) -> Self {
        Self(bytes)
    }
}

impl From<ByteVec> for Vec<u8> {
    /// Unwraps the inner buffer without copying it.
    fn from(wrapper: ByteVec) -> Self {
        let ByteVec(bytes) = wrapper;
        bytes
    }
}
impl Display for ByteVec {
    /// Renders the bytes as unpadded URL-safe base64 (`BASE64URL_NOPAD`).
    ///
    /// This is the same encoding the `FromStr` implementation decodes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let encoded = BASE64URL_NOPAD.encode(&self.0);
        f.write_str(&encoded)
    }
}
impl std::str::FromStr for ByteVec {
type Err = PersyError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(ByteVec(BASE64URL_NOPAD.decode(s.as_bytes())?))
}
}
/// Prefix of the name of the segment holding an index's configuration record.
pub const INDEX_META_PREFIX: &str = "+_M";
/// Prefix of the name of the segment holding an index's data.
pub const INDEX_DATA_PREFIX: &str = "+_D";

/// Recovers the index name from the name of its metadata segment.
///
/// `segment_name` is expected to start with [`INDEX_META_PREFIX`], which is stripped.
/// A name without that prefix is returned unchanged. Stripping a fixed byte count
/// instead would panic on names shorter than the prefix, or when the cut falls inside
/// a multi-byte character.
pub fn index_name_from_meta_segment(segment_name: &str) -> String {
    segment_name
        .strip_prefix(INDEX_META_PREFIX)
        .unwrap_or(segment_name)
        .to_string()
}

/// Name of the metadata segment of the index `index_name`.
pub fn format_segment_name_meta(index_name: &str) -> String {
    format!("{}{}", INDEX_META_PREFIX, index_name)
}

/// Name of the data segment of the index `index_name`.
pub fn format_segment_name_data(index_name: &str) -> String {
    format!("{}{}", INDEX_DATA_PREFIX, index_name)
}
/// How an index treats the values stored under a key; persisted as a `u8` (1, 2 or 3).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ValueMode {
    EXCLUSIVE,
    CLUSTER,
    REPLACE,
}

impl From<u8> for ValueMode {
    /// Decodes the persisted byte.
    ///
    /// # Panics
    /// Panics for any value other than 1, 2 or 3.
    fn from(value: u8) -> Self {
        match value {
            1 => Self::EXCLUSIVE,
            2 => Self::CLUSTER,
            3 => Self::REPLACE,
            _ => unreachable!("is impossible to get a value mode from values not 1,2,3"),
        }
    }
}

impl ValueMode {
    /// Encodes the mode as its persisted byte; the inverse of `From<u8>`.
    fn to_u8(&self) -> u8 {
        match *self {
            Self::EXCLUSIVE => 1,
            Self::CLUSTER => 2,
            Self::REPLACE => 3,
        }
    }
}
/// Index management helpers plus per-index read/write locks.
pub struct Indexes {
    /// Read/write locks keyed by [`IndexId`].
    index_locks: RwLockManager<IndexId>,
    /// Shared configuration; the lock helpers read `transaction_lock_timeout` from it.
    config: Arc<Config>,
}
/// Configuration of an index, persisted as a record in the index's metadata segment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexConfig {
    /// Name of the index.
    name: String,
    /// Root record of the index, `None` when no root has been set yet.
    root: Option<RecRef>,
    /// Id of the key type (`IndexType::get_id` of the key type).
    pub key_type: u8,
    /// Id of the value type (`IndexType::get_id` of the value type).
    pub value_type: u8,
    /// `min` bound given to `Indexes::create_index`.
    page_min: usize,
    /// `max` bound given to `Indexes::create_index`.
    page_max: usize,
    /// Value handling mode of the index.
    pub value_mode: ValueMode,
}
impl IndexConfig {
fn serialize(&self, w: &mut dyn Write) -> PRes<()> {
if let Some(ref root) = self.root {
w.write_u64::<BigEndian>(root.page)?;
w.write_u32::<BigEndian>(root.pos)?;
} else {
w.write_u64::<BigEndian>(0)?;
w.write_u32::<BigEndian>(0)?;
}
w.write_u8(self.key_type)?;
w.write_u8(self.value_type)?;
w.write_u32::<BigEndian>(self.page_min as u32)?;
w.write_u32::<BigEndian>(self.page_max as u32)?;
w.write_u8(self.value_mode.to_u8())?;
w.write_u16::<BigEndian>(self.name.len() as u16)?;
w.write_all(self.name.as_bytes())?;
Ok(())
}
fn deserialize(r: &mut dyn Read) -> PRes<IndexConfig> {
let index_root_page = r.read_u64::<BigEndian>()?;
let index_root_pos = r.read_u32::<BigEndian>()?;
let key_type = r.read_u8()?;
let value_type = r.read_u8()?;
let page_min = r.read_u32::<BigEndian>()? as usize;
let page_max = r.read_u32::<BigEndian>()? as usize;
let value_mode = ValueMode::from(r.read_u8()?);
let name_size = r.read_u16::<BigEndian>()? as usize;
let mut slice: Vec<u8> = vec![0; name_size];
r.read_exact(&mut slice)?;
let name: String = str::from_utf8(&slice[0..name_size])?.into();
let root = if index_root_page != 0 && index_root_pos != 0 {
Some(RecRef::new(index_root_page, index_root_pos))
} else {
None
};
Ok(IndexConfig {
name,
root,
key_type,
value_type,
page_min,
page_max,
value_mode,
})
}
pub fn check<K: IndexType, V: IndexType>(&self) -> PRes<()> {
if self.key_type != K::get_id() {
Err(PersyError::IndexTypeMismatch("key type".into()))
} else if self.value_type != V::get_id() {
Err(PersyError::IndexTypeMismatch("value type".into()))
} else {
Ok(())
}
}
pub fn get_root(&self) -> Option<RecRef> {
self.root.clone()
}
}
/// Translates a missing-segment error into the index-level `IndexNotFound`.
///
/// Every other error is passed through untouched.
fn error_map(err: PersyError) -> PersyError {
    match err {
        PersyError::SegmentNotFound => PersyError::IndexNotFound,
        other => other,
    }
}
impl Indexes {
    /// Creates the index helper, keeping a shared handle to `config`.
    ///
    /// The lock helpers read the transaction lock timeout from that config.
    pub fn new(config: &Arc<Config>) -> Indexes {
        Indexes {
            index_locks: Default::default(),
            config: config.clone(),
        }
    }

    /// Creates the index `name` inside `tx`.
    ///
    /// Creates the metadata segment (`format_segment_name_meta(name)`) and the data segment
    /// (`format_segment_name_data(name)`). It then inserts into the metadata segment a
    /// serialized [`IndexConfig`] with no root, the type ids of `K` and `V`, the bounds
    /// `min`/`max` and `value_mode`.
    ///
    /// # Errors
    /// Returns `PersyError::IndexMinElementsShouldBeAtLeastDoubleOfMax` when `min > max / 2`.
    /// This is checked before anything is created. Errors from segment creation,
    /// serialization and record insertion are propagated.
    pub fn create_index<K, V>(
        p: &PersyImpl,
        tx: &mut Transaction,
        name: &str,
        min: usize,
        max: usize,
        value_mode: ValueMode,
    ) -> PRes<()>
    where
        K: IndexType,
        V: IndexType,
    {
        // NOTE(review): the error variant name reads inverted relative to this check, which
        // accepts the bounds only when `max >= 2 * min`.
        if min > max / 2 {
            return Err(PersyError::IndexMinElementsShouldBeAtLeastDoubleOfMax);
        }
        let segment_name_meta = format_segment_name_meta(name);
        p.create_segment(tx, &segment_name_meta)?;
        let segment_name_data = format_segment_name_data(name);
        p.create_segment(tx, &segment_name_data)?;
        let cfg = IndexConfig {
            name: name.to_string(),
            root: None,
            key_type: K::get_id(),
            value_type: V::get_id(),
            page_min: min,
            page_max: max,
            value_mode,
        };
        let mut scfg = Vec::new();
        cfg.serialize(&mut scfg)?;
        p.insert_record(tx, &segment_name_meta, &scfg)?;
        Ok(())
    }

    /// Drops the metadata segment and then the data segment of the index `name` inside `tx`.
    ///
    /// # Errors
    /// Propagates the first drop failure. The data segment is not dropped if dropping the
    /// metadata segment fails.
    pub fn drop_index(p: &PersyImpl, tx: &mut Transaction, name: &str) -> PRes<()> {
        let segment_name_meta = format_segment_name_meta(name);
        p.drop_segment(tx, &segment_name_meta)?;
        let segment_name_data = format_segment_name_data(name);
        p.drop_segment(tx, &segment_name_data)?;
        Ok(())
    }

    /// Sets the root of the index `index_id` to `root` inside `tx`.
    ///
    /// Reads the first record of the index's metadata segment as its [`IndexConfig`]. The
    /// record is rewritten only when the stored root differs from `root`.
    ///
    /// # Errors
    /// Returns `PersyError::IndexNotFound` in two cases:
    /// - the metadata segment is missing (mapped by `error_map`);
    /// - the segment has no record.
    ///
    /// Deserialization, serialization and update errors are propagated.
    pub fn update_index_root(
        p: &PersyImpl,
        tx: &mut Transaction,
        index_id: &IndexId,
        root: Option<RecRef>,
    ) -> PRes<()> {
        let segment_meta = index_id_to_segment_id_meta(index_id);
        let (id, mut config) =
            if let Some((rid, content, _)) = p.scan_tx(tx, segment_meta.clone()).map_err(error_map)?.next(p, tx) {
                (rid, IndexConfig::deserialize(&mut Cursor::new(content))?)
            } else {
                return Err(PersyError::IndexNotFound);
            };
        // Skip the write when nothing changed.
        if config.root != root {
            config.root = root;
            let mut scfg = Vec::new();
            config.serialize(&mut scfg)?;
            p.update(tx, segment_meta, &id.0, &scfg)?;
        }
        Ok(())
    }

    /// Reads the configuration of `index_id` as seen by `tx`.
    ///
    /// Also returns the third element yielded by the metadata scan. This is forwarded as
    /// `version` to `IndexSegmentKeeperTx::new`; presumably it is the record version
    /// (TODO confirm).
    ///
    /// # Errors
    /// Returns `PersyError::IndexNotFound` when the metadata segment is missing or empty.
    /// Deserialization errors are propagated.
    pub fn get_index_tx(p: &PersyImpl, tx: &Transaction, index_id: &IndexId) -> PRes<(IndexConfig, u16)> {
        let segment_meta = index_id_to_segment_id_meta(index_id);
        if let Some((_, content, version)) = p.scan_tx(tx, segment_meta).map_err(error_map)?.next(p, tx) {
            Ok((IndexConfig::deserialize(&mut Cursor::new(content))?, version))
        } else {
            Err(PersyError::IndexNotFound)
        }
    }

    /// Returns the id of the configuration record of `index_id`, as seen by `tx`.
    ///
    /// # Errors
    /// Returns `PersyError::IndexNotFound` when the metadata segment is missing or empty.
    pub fn get_config_id(p: &PersyImpl, tx: &mut Transaction, index_id: &IndexId) -> PRes<PersyId> {
        let segment_meta = index_id_to_segment_id_meta(index_id);
        if let Some((id, _, _)) = p.scan_tx(tx, segment_meta).map_err(error_map)?.next(p, tx) {
            Ok(id)
        } else {
            Err(PersyError::IndexNotFound)
        }
    }

    /// Reads the configuration of `index_id` as of the snapshot `snapshot_id`.
    ///
    /// # Errors
    /// Returns `PersyError::IndexNotFound` when the metadata segment is missing or empty.
    /// Deserialization errors are propagated.
    pub fn get_index(p: &PersyImpl, snapshot_id: SnapshotId, index_id: &IndexId) -> PRes<IndexConfig> {
        let segment_meta = index_id_to_segment_id_meta(index_id);
        p.scan_snapshot(segment_meta, snapshot_id)
            .map_err(error_map)?
            .next(p)
            .map(|(_, content)| IndexConfig::deserialize(&mut Cursor::new(content)))
            .unwrap_or(Err(PersyError::IndexNotFound))
    }

    /// Builds an [`IndexSegmentKeeper`] for `index_id` at `snapshot`.
    ///
    /// Before building it, checks that the stored key/value types match `K`/`V`.
    ///
    /// # Errors
    /// Errors from [`Indexes::get_index`] and `PersyError::IndexTypeMismatch` are propagated.
    pub fn get_index_keeper<'a, K: IndexType, V: IndexType>(
        p: &'a PersyImpl,
        snapshot: SnapshotId,
        index_id: &IndexId,
    ) -> PRes<IndexSegmentKeeper<'a>> {
        let config = Indexes::get_index(p, snapshot, index_id)?;
        config.check::<K, V>()?;
        Ok(IndexSegmentKeeper::new(
            &config.name,
            index_id,
            config.root,
            p,
            snapshot,
            config.value_mode,
        ))
    }

    /// Builds an [`IndexSegmentKeeperTx`] for `index_id` bound to `tx`.
    ///
    /// Before building it, checks that the stored key/value types match `K`/`V`.
    ///
    /// # Errors
    /// Errors from [`Indexes::get_index_tx`] and `PersyError::IndexTypeMismatch` are
    /// propagated.
    pub fn get_index_keeper_tx<'a, K: IndexType, V: IndexType>(
        p: &'a PersyImpl,
        tx: &'a mut Transaction,
        index_id: &IndexId,
    ) -> PRes<IndexSegmentKeeperTx<'a, K, V>> {
        let (config, version) = Indexes::get_index_tx(p, tx, index_id)?;
        config.check::<K, V>()?;
        Ok(IndexSegmentKeeperTx::new(
            &config.name,
            index_id,
            config.root,
            version,
            p,
            tx,
            config.value_mode,
            config.page_min,
            config.page_max,
        ))
    }

    /// Checks, as seen by `tx`, that index `index_id` exists and stores `K` keys and `V` values.
    pub fn check_index<K: IndexType, V: IndexType>(
        p: &PersyImpl,
        tx: &mut Transaction,
        index_id: &IndexId,
    ) -> PRes<()> {
        let (config, _version) = Indexes::get_index_tx(p, tx, index_id)?;
        config.check::<K, V>()
    }

    /// Acquires write locks on all `indexes`, passing the configured transaction lock timeout.
    // NOTE(review): `dead_code` is allowed here, presumably because no caller uses it in
    // some builds — confirm whether it is still needed.
    #[allow(dead_code)]
    pub fn write_lock(&self, indexes: &[IndexId]) -> PRes<()> {
        self.index_locks
            .lock_all_write(indexes, self.config.transaction_lock_timeout().clone())
    }

    /// Acquires a read lock on `index`, passing the configured transaction lock timeout.
    pub fn read_lock(&self, index: IndexId) -> PRes<()> {
        self.index_locks
            .lock_all_read(&[index], self.config.transaction_lock_timeout().clone())
    }

    /// Acquires read locks on all `indexes`, passing the configured transaction lock timeout.
    pub fn read_lock_all(&self, indexes: &[IndexId]) -> PRes<()> {
        self.index_locks
            .lock_all_read(indexes, self.config.transaction_lock_timeout().clone())
    }

    /// Releases the read lock on `index`.
    pub fn read_unlock(&self, index: IndexId) -> PRes<()> {
        self.index_locks.unlock_all_read(&[index])
    }

    /// Releases the read locks on all `indexes`.
    pub fn read_unlock_all(&self, indexes: &[IndexId]) -> PRes<()> {
        self.index_locks.unlock_all_read(indexes)
    }

    /// Releases the write locks on all `indexes`.
    // NOTE(review): `dead_code` allowed for the same reason as `write_lock` — confirm.
    #[allow(dead_code)]
    pub fn write_unlock(&self, indexes: &[IndexId]) -> PRes<()> {
        self.index_locks.unlock_all_write(indexes)
    }
}
#[cfg(test)]
mod tests {
    use super::{ByteVec, IndexConfig, ValueMode};
    use std::io::Cursor;

    /// A root-less config must survive a serialize/deserialize round trip unchanged.
    #[test]
    fn test_config_ser_des() {
        let expected = IndexConfig {
            name: "abc".to_string(),
            root: None,
            key_type: 1,
            value_type: 1,
            page_min: 10,
            page_max: 30,
            value_mode: ValueMode::REPLACE,
        };
        let mut bytes: Vec<u8> = Vec::new();
        expected.serialize(&mut bytes).expect("serialization works");
        let mut reader = Cursor::new(bytes);
        let actual = IndexConfig::deserialize(&mut reader).expect("deserialization works");
        assert_eq!(expected, actual);
    }

    /// `Display` and `FromStr` of `ByteVec` must be inverse operations.
    #[test]
    fn test_bytevec_to_from_string() {
        let original = ByteVec(vec![10, 20]);
        let text = original.to_string();
        let parsed: ByteVec = text.parse().unwrap();
        assert_eq!(original, parsed);
    }
}