use crate::{
config::Config,
error::{CreateIndexError, DropIndexError, IndexChangeError, IndexError, IndexOpsError, PERes},
id::{IndexId, PersyId, RecRef},
index::{
bytevec::ByteVec,
keeper::IndexSegmentKeeper,
keeper_tx::{ExternalRefs, IndexSegmentKeeperTx},
serialization::IndexSerialization,
},
io::{ArcSliceRead, InfallibleRead, InfallibleReadFormat, InfallibleWrite, InfallibleWriteFormat},
persy::PersyImpl,
snapshots::SnapshotRef,
transaction::tx_impl::TransactionImpl,
};
use std::{
collections::{hash_map::Entry, HashMap},
fmt::Display,
str,
sync::{Arc, Mutex},
};
/// Identifies the concrete type used for the keys or the values of an index.
///
/// Every variant is associated with a numeric id through the `From<u8>`
/// conversion below; note that id `11` is not assigned to any variant.
#[derive(Clone)]
pub enum IndexTypeId {
    U8,
    U16,
    U32,
    U64,
    U128,
    I8,
    I16,
    I32,
    I64,
    I128,
    F32W,
    F64W,
    String,
    PersyId,
    ByteVec,
}

impl From<u8> for IndexTypeId {
    /// Maps a numeric type id to the matching [`IndexTypeId`] variant.
    ///
    /// # Panics
    ///
    /// Panics when `val` is not one of the known ids (`1..=10` and `12..=16`).
    fn from(val: u8) -> IndexTypeId {
        match val {
            1 => IndexTypeId::U8,
            2 => IndexTypeId::U16,
            3 => IndexTypeId::U32,
            4 => IndexTypeId::U64,
            5 => IndexTypeId::I8,
            6 => IndexTypeId::I16,
            7 => IndexTypeId::I32,
            8 => IndexTypeId::I64,
            9 => IndexTypeId::F32W,
            10 => IndexTypeId::F64W,
            12 => IndexTypeId::String,
            13 => IndexTypeId::PersyId,
            14 => IndexTypeId::U128,
            15 => IndexTypeId::I128,
            16 => IndexTypeId::ByteVec,
            // The original message read "type node defined", a typo for "not".
            _ => panic!("type not defined for {}", val),
        }
    }
}
/// Marker trait for the types usable with an index: a type qualifies when it
/// can be converted to its internal representation (`IndexTypeWrap`) and cloned.
pub trait IndexType: IndexTypeWrap + Clone {}
/// Contract of the types handled internally by an index (variant compiled when
/// the `index_container_static` feature is disabled).
///
/// An implementor must be displayable, orderable through `IndexOrd`,
/// cloneable, serializable through `IndexSerialization` and convertible back
/// to the wrapped type through `IndexTypeUnwrap`.
#[cfg(not(feature = "index_container_static"))]
pub trait IndexTypeInternal: Display + IndexOrd + Clone + IndexSerialization + IndexTypeUnwrap + 'static {
/// Numeric id of the type; `IndexConfig::check` compares it with the
/// `key_type`/`value_type` stored in the index configuration.
fn get_id() -> u8;
/// Id of the type expressed as an `IndexTypeId` variant.
fn get_type_id() -> IndexTypeId;
/// Tells whether this value is over the size limit; the default
/// implementation always answers `false`.
fn over_size_limit(&self) -> bool {
false
}
}
/// Contract of the types handled internally by an index (variant compiled when
/// the `index_container_static` feature is enabled).
///
/// Same requirements as the non-static variant, plus the
/// `entries_container::Extractor` bound.
#[cfg(feature = "index_container_static")]
pub trait IndexTypeInternal:
Display + IndexOrd + Clone + crate::index::entries_container::Extractor + IndexSerialization + IndexTypeUnwrap + 'static
{
/// Numeric id of the type; `IndexConfig::check` compares it with the
/// `key_type`/`value_type` stored in the index configuration.
fn get_id() -> u8;
/// Id of the type expressed as an `IndexTypeId` variant.
fn get_type_id() -> IndexTypeId;
/// Tells whether this value is over the size limit; the default
/// implementation always answers `false`.
fn over_size_limit(&self) -> bool {
false
}
}
/// Returns `true` when `len` is strictly greater than the 512 KiB
/// (`524_288`) size limit.
pub(crate) fn check_over_size_limit(len: usize) -> bool {
    const SIZE_LIMIT: usize = 512 * 1024;
    SIZE_LIMIT < len
}
/// Conversion from the internal representation of an index type back to the
/// type it wraps.
pub trait IndexTypeUnwrap {
/// Type obtained when unwrapping `Self`.
type Wrapped;
/// Consumes the value and returns the wrapped one.
fn unwrap(self) -> Self::Wrapped;
}
/// Conversion from a type to the internal representation used by the indexes.
pub trait IndexTypeWrap: Sized {
/// Internal representation of `Self`; unwrapping it must give back `Self`.
type Wrapper: IndexTypeInternal + IndexTypeUnwrap<Wrapped = Self>;
/// Consumes the value and returns its internal representation.
fn wrap(self) -> Self::Wrapper
where
Self: Sized;
}
// Implements `IndexTypeUnwrap` and `IndexTypeWrap` for types that are their
// own internal representation: both `wrap` and `unwrap` return `self` as is.
// The type list must end with a trailing comma.
macro_rules! impl_wrapper_trait_self {
($($t:ty),+,) => {
$(
impl IndexTypeUnwrap for $t {
type Wrapped = $t;
fn unwrap(self) -> $t {
self
}
}
impl IndexTypeWrap for $t {
type Wrapper = $t;
fn wrap(self) -> $t {
self
}
}
)+
}
}
// `String` is not in this list.
// NOTE(review): presumably it is wrapped by a dedicated type defined elsewhere — confirm.
impl_wrapper_trait_self!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, f32, f64, PersyId, ByteVec,);
// Implements `IndexTypeInternal` and `IndexType` for the type `$t`:
// `$v` is the numeric id returned by `get_id` and `$v1` is the name of the
// `IndexTypeId` variant returned by `get_type_id`.
macro_rules! impl_index_type {
($t:ty, $v:expr,$v1:ident) => {
impl IndexTypeInternal for $t {
fn get_id() -> u8 {
$v
}
fn get_type_id() -> IndexTypeId {
IndexTypeId::$v1
}
}
impl IndexType for $t {}
};
}
// Registration of the supported index types. Each numeric id matches the one
// mapped to the same variant by `From<u8> for IndexTypeId`.
impl_index_type!(u8, 1, U8);
impl_index_type!(u16, 2, U16);
impl_index_type!(u32, 3, U32);
impl_index_type!(u64, 4, U64);
impl_index_type!(u128, 14, U128);
impl_index_type!(i8, 5, I8);
impl_index_type!(i16, 6, I16);
impl_index_type!(i32, 7, I32);
impl_index_type!(i64, 8, I64);
impl_index_type!(i128, 15, I128);
impl_index_type!(f32, 9, F32W);
impl_index_type!(f64, 10, F64W);
impl_index_type!(PersyId, 13, PersyId);
// Only the marker trait is implemented for `String` here.
// NOTE(review): its `IndexTypeWrap` implementation (and the ids 12 and 16 for
// `String`/`ByteVec`) are presumably provided elsewhere — confirm.
impl IndexType for String {}
/// Ordering used for the index types.
///
/// It is separate from `std::cmp::Ord` so that it can also be implemented for
/// types without a total order in the standard library, such as `f32`/`f64`.
pub trait IndexOrd {
/// Compares `self` with `other` and returns the resulting ordering.
fn cmp(&self, other: &Self) -> std::cmp::Ordering;
}
// Implements `IndexOrd` for every listed type by delegating to its
// `std::cmp::Ord` implementation.
macro_rules! impl_index_ord {
($($t:ty),+) => {
$(
impl IndexOrd for $t {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
std::cmp::Ord::cmp(self, other)
}
}
)+
};
}
// Floats are not listed because they do not implement `Ord`; they get
// hand-written implementations.
impl_index_ord!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, PersyId, ByteVec);
impl IndexOrd for f32 {
    /// Total order over `f32`: `NaN` is equal to `NaN` and lower than every
    /// other value, anything else follows the regular numeric comparison.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering;
        match (self.is_nan(), other.is_nan()) {
            (true, true) => Ordering::Equal,
            (true, false) => Ordering::Less,
            (false, true) => Ordering::Greater,
            // Neither side is NaN, so the partial comparison always succeeds.
            (false, false) => PartialOrd::partial_cmp(self, other).unwrap(),
        }
    }
}
impl IndexOrd for f64 {
    /// Total order over `f64`: `NaN` is equal to `NaN` and lower than every
    /// other value, anything else follows the regular numeric comparison.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering;
        match (self.is_nan(), other.is_nan()) {
            (true, true) => Ordering::Equal,
            (true, false) => Ordering::Less,
            (false, true) => Ordering::Greater,
            // Neither side is NaN, so the partial comparison always succeeds.
            (false, false) => PartialOrd::partial_cmp(self, other).unwrap(),
        }
    }
}
/// Prefix of the name of the segment that stores the configuration record of an index.
pub const INDEX_META_PREFIX: &str = "+_M";
/// Prefix of the name of the data segment created together with an index.
pub const INDEX_DATA_PREFIX: &str = "+_D";
/// Tells whether `name` begins with the index metadata segment prefix.
pub fn is_index_name_meta(name: &str) -> bool {
    name.strip_prefix(INDEX_META_PREFIX).is_some()
}
/// Tells whether `name` begins with the index data segment prefix.
pub fn is_index_name_data(name: &str) -> bool {
    name.strip_prefix(INDEX_DATA_PREFIX).is_some()
}
/// Turns the name of a metadata segment into the index name, in place, by
/// removing as many leading bytes as the metadata prefix is long.
///
/// The presence of the prefix is not verified.
///
/// # Panics
///
/// Panics when `segment_name` is shorter than the prefix or when the cut does
/// not fall on a character boundary.
pub fn change_segment_meta_name_to_index_name(segment_name: &mut String) {
    let prefix_len = INDEX_META_PREFIX.len();
    segment_name.replace_range(..prefix_len, "");
}
/// Returns the index name contained in the name of a metadata segment, that
/// is `segment_name` without as many leading bytes as the metadata prefix is long.
///
/// The presence of the prefix is not verified.
///
/// # Panics
///
/// Panics when `segment_name` is shorter than the prefix or when the cut does
/// not fall on a character boundary.
pub fn index_name_from_meta_segment(segment_name: &str) -> String {
    let index_name = &segment_name[INDEX_META_PREFIX.len()..];
    index_name.to_owned()
}
/// Builds the name of the metadata segment of the index called `index_name`.
pub fn format_segment_name_meta(index_name: &str) -> String {
    [INDEX_META_PREFIX, index_name].concat()
}
/// Builds the name of the data segment of the index called `index_name`.
pub fn format_segment_name_data(index_name: &str) -> String {
    [INDEX_DATA_PREFIX, index_name].concat()
}
/// Value handling mode of an index; it is stored in the index configuration
/// as a single byte (`1`, `2` or `3`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ValueMode {
    Exclusive,
    Cluster,
    Replace,
}

impl From<u8> for ValueMode {
    /// Decodes the byte representation of a value mode.
    ///
    /// # Panics
    ///
    /// Panics when `value` is not `1`, `2` or `3`.
    fn from(value: u8) -> Self {
        match value {
            1 => Self::Exclusive,
            2 => Self::Cluster,
            3 => Self::Replace,
            _ => unreachable!("is impossible to get a value mode from values not 1,2,3"),
        }
    }
}

impl ValueMode {
    /// Encodes the value mode as the byte understood by `From<u8>`.
    fn to_u8(&self) -> u8 {
        match *self {
            Self::Exclusive => 1,
            Self::Cluster => 2,
            Self::Replace => 3,
        }
    }
}
/// Cache of the location of the configuration record of each index, used by
/// `Indexes::get_index`.
#[derive(Default)]
struct ConfigIdCache {
// Reference to the configuration record, keyed by index id.
cache: HashMap<IndexId, RecRef>,
// Number of `Indexes::get_index` calls since the last cache cleanup
// (incremented on every call, not only on cache hits).
hit_count: u32,
}
/// Entry point of the index metadata operations; it holds the cache of the
/// configuration record ids behind a mutex.
pub struct Indexes {
config_ids: Mutex<ConfigIdCache>,
}
/// Configuration of an index, stored as a record of the index metadata segment.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IndexConfig {
/// Name of the index.
pub name: String,
// Reference to the root of the index, `None` when no root is set.
root: Option<RecRef>,
/// Numeric id of the key type (see `IndexTypeInternal::get_id`).
pub key_type: u8,
/// Numeric id of the value type (see `IndexTypeInternal::get_id`).
pub value_type: u8,
// Values received as `min`/`max` at index creation and handed to
// `IndexSegmentKeeperTx::new`.
// NOTE(review): presumably the size bounds of an index page — confirm.
page_min: usize,
page_max: usize,
/// Value handling mode of the index.
pub value_mode: ValueMode,
}
impl IndexConfig {
    /// Writes the configuration to `w`, preceded by the format version byte
    /// (currently always `0`).
    fn serialize(&self, w: &mut dyn InfallibleWrite) {
        w.write_u8(0);
        self.serialize_v0(w);
    }

    /// Writes the version `0` layout, in this order: root page (`u64`), root
    /// position (`u32`), key type (`u8`), value type (`u8`), page min (`u32`),
    /// page max (`u32`), value mode (`u8`), name length (`u16`), name bytes.
    ///
    /// A missing root is written as page `0` and position `0`. The page
    /// bounds and the name length are narrowed with `as` casts.
    fn serialize_v0(&self, w: &mut dyn InfallibleWrite) {
        let (root_page, root_pos) = match self.root {
            Some(ref root) => (root.page, root.pos),
            None => (0, 0),
        };
        w.write_u64(root_page);
        w.write_u32(root_pos);
        w.write_u8(self.key_type);
        w.write_u8(self.value_type);
        w.write_u32(self.page_min as u32);
        w.write_u32(self.page_max as u32);
        w.write_u8(self.value_mode.to_u8());
        let name_bytes = self.name.as_bytes();
        w.write_u16(name_bytes.len() as u16);
        w.write_all(name_bytes);
    }

    /// Reads a configuration written by `serialize`.
    ///
    /// # Panics
    ///
    /// Panics when the leading version byte is not `0`.
    fn deserialize(r: &mut dyn InfallibleRead) -> PERes<IndexConfig> {
        if r.read_u8() == 0u8 {
            IndexConfig::deserialize_v0(r)
        } else {
            panic!("unsupported disc format")
        }
    }

    /// Reads the version `0` layout written by `serialize_v0`.
    ///
    /// Fails when the stored name is not valid UTF-8.
    fn deserialize_v0(r: &mut dyn InfallibleRead) -> PERes<IndexConfig> {
        let root_page = r.read_u64();
        let root_pos = r.read_u32();
        let key_type = r.read_u8();
        let value_type = r.read_u8();
        let page_min = r.read_u32() as usize;
        let page_max = r.read_u32() as usize;
        let value_mode = ValueMode::from(r.read_u8());
        let name_len = r.read_u16() as usize;
        let mut raw_name: Vec<u8> = vec![0; name_len];
        r.read_exact(&mut raw_name);
        let name = str::from_utf8(&raw_name)?.to_owned();
        // The root counts as missing as soon as one of its two parts is zero.
        let root = if root_page == 0 || root_pos == 0 {
            None
        } else {
            Some(RecRef::new(root_page, root_pos))
        };
        Ok(IndexConfig {
            name,
            root,
            key_type,
            value_type,
            page_min,
            page_max,
            value_mode,
        })
    }

    /// Verifies that `K` and `V` are the key and value types the index was
    /// created with; the key type is checked first.
    pub fn check<K: IndexTypeInternal, V: IndexTypeInternal>(&self) -> Result<(), IndexOpsError> {
        if self.key_type != K::get_id() {
            return Err(IndexOpsError::IndexTypeMismatch("key type".into()));
        }
        if self.value_type != V::get_id() {
            return Err(IndexOpsError::IndexTypeMismatch("value type".into()));
        }
        Ok(())
    }

    /// Returns the reference to the root of the index, if any.
    pub fn get_root(&self) -> Option<RecRef> {
        self.root
    }
}
impl Indexes {
pub fn new(_config: &Arc<Config>) -> Indexes {
Indexes {
config_ids: Default::default(),
}
}
/// Creates the index `name` inside the transaction `tx`: the metadata and the
/// data segments are created, then the serialized configuration is inserted
/// as a record of the metadata segment.
///
/// In debug builds `min <= max / 2` is asserted.
pub fn create_index<K, V>(
    p: &PersyImpl,
    tx: &mut TransactionImpl,
    name: &str,
    min: usize,
    max: usize,
    value_mode: ValueMode,
) -> Result<(), CreateIndexError>
where
    K: IndexTypeInternal,
    V: IndexTypeInternal,
{
    debug_assert!(min <= max / 2);
    let meta_segment = format_segment_name_meta(name);
    let data_segment = format_segment_name_data(name);
    p.create_segment(tx, &meta_segment)?;
    p.create_segment(tx, &data_segment)?;
    // A new index starts without a root.
    let config = IndexConfig {
        name: name.to_string(),
        root: None,
        key_type: K::get_id(),
        value_type: V::get_id(),
        page_min: min,
        page_max: max,
        value_mode,
    };
    let mut serialized = Vec::new();
    config.serialize(&mut serialized);
    p.insert_record(tx, &meta_segment, &serialized)?;
    Ok(())
}
/// Drops the index `name` inside the transaction `tx` by dropping first its
/// metadata segment and then its data segment.
pub fn drop_index(p: &PersyImpl, tx: &mut TransactionImpl, name: &str) -> Result<(), DropIndexError> {
    let segments = [format_segment_name_meta(name), format_segment_name_data(name)];
    for segment in segments.iter() {
        p.drop_segment(tx, segment)?;
    }
    Ok(())
}
/// Stores `root` as the new root of the index inside the transaction `tx`.
///
/// The configuration is the first record found scanning the metadata segment;
/// it is rewritten only when the root actually changes. Fails with
/// `IndexChangeError::IndexNotFound` when the segment has no record.
pub fn update_index_root(
    p: &PersyImpl,
    tx: &mut TransactionImpl,
    index_id: &IndexId,
    root: Option<RecRef>,
) -> Result<(), IndexChangeError> {
    let mut records = p.scan_tx(tx, index_id.get_meta_id())?;
    let first_record = records.next(p, tx);
    // Release the scan before the transaction is used to update the record.
    drop(records);
    let (id, mut config) = match first_record {
        Some((rid, content, _)) => (rid, IndexConfig::deserialize(&mut ArcSliceRead::new_vec(content))?),
        None => return Err(IndexChangeError::IndexNotFound),
    };
    if config.root == root {
        return Ok(());
    }
    config.root = root;
    let mut serialized = Vec::new();
    config.serialize(&mut serialized);
    p.update(tx, index_id.get_meta_id(), &id.0, &serialized)?;
    Ok(())
}
/// Reads the configuration of the index as seen by the transaction `tx`.
///
/// Returns the configuration decoded from the first record of the metadata
/// segment together with the version number coming with that record, or
/// `IndexError::IndexNotFound` when the segment has no record.
pub fn get_index_tx(
    p: &PersyImpl,
    tx: &TransactionImpl,
    index_id: &IndexId,
) -> Result<(IndexConfig, u16), IndexError> {
    let mut records = p.scan_tx(tx, index_id.get_meta_id())?;
    let first_record = records.next(p, tx);
    drop(records);
    match first_record {
        Some((_, content, version)) => {
            let config = IndexConfig::deserialize(&mut ArcSliceRead::new_vec(content))?;
            Ok((config, version))
        }
        None => Err(IndexError::IndexNotFound),
    }
}
/// Returns the id of the configuration record of the index, that is the id
/// of the first record of its metadata segment as seen by the transaction
/// `tx`, or `IndexError::IndexNotFound` when the segment has no record.
pub fn get_config_id(p: &PersyImpl, tx: &mut TransactionImpl, index_id: &IndexId) -> Result<PersyId, IndexError> {
    let mut records = p.scan_tx(tx, index_id.get_meta_id())?;
    let first_record = records.next(p, tx);
    drop(records);
    first_record
        .map(|(id, _, _)| id)
        .ok_or(IndexError::IndexNotFound)
}
/// Reads the configuration of the index as visible in the given snapshot.
///
/// The location of the configuration record is cached per index id, so only
/// the first lookup of an index scans its metadata segment. Fails with
/// `IndexError::IndexNotFound` when no configuration record is found.
///
/// # Panics
///
/// Panics when the mutex protecting the cache is poisoned.
pub fn get_index(p: &PersyImpl, snapshot_ref: &SnapshotRef, index_id: &IndexId) -> Result<IndexConfig, IndexError> {
// The cache stays locked for the whole lookup.
let mut indexes = p.indexes().config_ids.lock().unwrap();
let segment_meta = index_id.get_meta_id();
indexes.hit_count += 1;
// Once the counter goes over 1000 calls, reset it and forget the entries
// whose metadata segment does not exist anymore.
if indexes.hit_count > 1000 {
indexes.hit_count = 0;
indexes.cache.retain(|i, _| p.exists_segment_by_id(&i.get_meta_id()));
}
match indexes.cache.entry(index_id.clone()) {
Entry::Occupied(o) => {
// Known record location: read and decode the record directly.
let id = o.get();
let info_read = p
.read_snap_fn(segment_meta, id, snapshot_ref, |mut c| IndexConfig::deserialize(&mut c))
.map_err(IndexError::from)?;
if let Some(info) = info_read {
Ok(info.map_err(IndexError::from)?)
} else {
// The read gave nothing back: drop the cache entry and report the index as missing.
o.remove();
Err(IndexError::IndexNotFound)
}
}
Entry::Vacant(v) => {
// Unknown record location: take the first record returned by the scan
// of the metadata segment and decode it.
let (id, index) = p
.scan_snapshot_index(segment_meta, snapshot_ref)?
.next(p)
.map(|(id, content)| {
Ok((
id,
IndexConfig::deserialize(&mut ArcSliceRead::new_vec(content)).map_err(IndexError::from)?,
))
})
.unwrap_or(Err(IndexError::IndexNotFound))?;
// Remember where the configuration record is for the next lookups.
v.insert(id.0);
Ok(index)
}
}
}
pub fn get_index_keeper<'a, K: IndexTypeInternal, V: IndexTypeInternal>(
p: &'a PersyImpl,
snapshot: &SnapshotRef,
index_id: &IndexId,
) -> Result<IndexSegmentKeeper<'a>, IndexOpsError> {
let config = Indexes::get_index(p, snapshot, index_id)?;
config.check::<K, V>()?;
Ok(IndexSegmentKeeper::new(
&config.name,
index_id,
config.root,
p,
snapshot,
config.value_mode,
))
}
pub fn get_index_keeper_tx_read<'a, K: IndexTypeInternal, V: IndexTypeInternal>(
p: &'a PersyImpl,
snapshot: &SnapshotRef,
tx: &'a mut TransactionImpl,
index_id: &IndexId,
) -> Result<IndexSegmentKeeper<'a>, IndexOpsError> {
let (config, _) = Indexes::get_index_tx(p, tx, index_id)?;
config.check::<K, V>()?;
Ok(IndexSegmentKeeper::new(
&config.name,
index_id,
config.root,
p,
snapshot,
config.value_mode,
))
}
pub fn get_index_keeper_tx<'a, K: IndexTypeInternal, V: IndexTypeInternal>(
store: ExternalRefs<'a>,
index_id: &IndexId,
) -> Result<IndexSegmentKeeperTx<'a, K, V>, IndexOpsError> {
let (config, version) = Indexes::get_index_tx(store.persy, store.tx, index_id)?;
config.check::<K, V>()?;
Ok(IndexSegmentKeeperTx::new(
&config.name,
index_id,
config.root,
version,
store,
config.value_mode,
config.page_min,
config.page_max,
))
}
/// Verifies that `K` and `V` match the key and value types of the index, as
/// recorded in the configuration seen by the transaction `tx`.
pub fn check_index<K: IndexTypeInternal, V: IndexTypeInternal>(
    p: &PersyImpl,
    tx: &mut TransactionImpl,
    index_id: &IndexId,
) -> Result<(), IndexOpsError> {
    let (config, _) = Indexes::get_index_tx(p, tx, index_id)?;
    config.check::<K, V>()
}
}
#[cfg(test)]
mod tests;