use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use config::Config;
use index::keeper::AddTo;
use index::keeper::IndexSegmentKeeper;
use index::serialization::IndexSerialization;
use persy::{PRes, PersyError, PersyImpl, RecRef};
use snapshot::SnapshotId;
use std::collections::hash_map::Entry;
use std::collections::hash_map::HashMap;
use std::fmt::Display;
use std::io::{Cursor, Read, Write};
use std::str;
use std::sync::{Arc, Condvar, Mutex};
use transaction::Transaction;
/// Identifier of a type that can be used in an index.
///
/// Each variant is associated with the one-byte id accepted by the `From<u8>`
/// conversion below (1-8 for the integer variants, 12 for `STRING`, 13 for
/// `PERSYID`).
// NOTE(review): presumably each variant names the Rust type of the index keys
// or values (`U8` -> `u8`, ...); the mapping to concrete types is defined by the
// `IndexType` implementors, which are outside this file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum IndexTypeId {
    U8,
    U16,
    U32,
    U64,
    I8,
    I16,
    I32,
    I64,
    STRING,
    PERSYID,
}

impl From<u8> for IndexTypeId {
    /// Converts a one-byte type id into the matching `IndexTypeId`.
    ///
    /// # Panics
    ///
    /// Panics when `val` is not one of the known ids (1-8, 12, 13); the message
    /// reports the offending value.
    fn from(val: u8) -> IndexTypeId {
        match val {
            1 => IndexTypeId::U8,
            2 => IndexTypeId::U16,
            3 => IndexTypeId::U32,
            4 => IndexTypeId::U64,
            5 => IndexTypeId::I8,
            6 => IndexTypeId::I16,
            7 => IndexTypeId::I32,
            8 => IndexTypeId::I64,
            12 => IndexTypeId::STRING,
            13 => IndexTypeId::PERSYID,
            _ => panic!("type not defined for {}", val),
        }
    }
}
/// Bound required for the key (`K`) and value (`V`) types of an index.
///
/// `Indexes::create_index` stores `get_id()` of both types in the index
/// configuration, and `Indexes::check_and_get_index` compares the stored ids
/// with the ids of the types requested by the caller.
pub trait IndexType: Display + Sized + Ord + Clone + AddTo + IndexSerialization {
/// One-byte id of the implementing type, as persisted in `IndexConfig`.
fn get_id() -> u8;
/// The implementing type expressed as an `IndexTypeId`.
// NOTE(review): presumably equal to `IndexTypeId::from(Self::get_id())` —
// confirm against the implementors, which are not in this file.
fn get_type_id() -> IndexTypeId;
}
/// Value handling mode of an index.
///
/// The mode is persisted in `IndexConfig` as a single byte: `EXCLUSIVE` = 1,
/// `CLUSTER` = 2, `REPLACE` = 3 (see `ValueMode::to_u8` and `From<u8>`).
// NOTE(review): the behaviour selected by each mode is implemented outside this
// file (the mode is only passed through to `IndexSegmentKeeper` here); the
// per-variant notes below are inferred from the names — confirm.
#[derive(Clone, Debug, PartialEq)]
pub enum ValueMode {
// presumably: a key may hold a single value and a second one is rejected
EXCLUSIVE,
// presumably: a key may hold several values
CLUSTER,
// presumably: a new value for a key replaces the previous one
REPLACE,
}
/// Prefix of the segment that stores the configuration record of an index.
pub const INDEX_META_PREFIX: &str = "+_M";
/// Prefix of the segment that stores the data of an index.
pub const INDEX_DATA_PREFIX: &str = "+_D";

/// Builds the name of the meta segment of `index_name`:
/// `INDEX_META_PREFIX` followed by the index name.
fn format_segment_name_meta(index_name: &str) -> String {
    let mut segment = String::with_capacity(INDEX_META_PREFIX.len() + index_name.len());
    segment.push_str(INDEX_META_PREFIX);
    segment.push_str(index_name);
    segment
}

/// Builds the name of the data segment of `index_name`:
/// `INDEX_DATA_PREFIX` followed by the index name.
fn format_segment_name_data(index_name: &str) -> String {
    let mut segment = String::with_capacity(INDEX_DATA_PREFIX.len() + index_name.len());
    segment.push_str(INDEX_DATA_PREFIX);
    segment.push_str(index_name);
    segment
}
impl From<u8> for ValueMode {
    /// Decodes the one-byte representation of a value mode
    /// (1 = `EXCLUSIVE`, 2 = `CLUSTER`, 3 = `REPLACE`).
    ///
    /// # Panics
    ///
    /// Panics for any other byte.
    fn from(value: u8) -> Self {
        if value == 1 {
            ValueMode::EXCLUSIVE
        } else if value == 2 {
            ValueMode::CLUSTER
        } else if value == 3 {
            ValueMode::REPLACE
        } else {
            unreachable!("is impossible to get a value mode from values not 1,2,3")
        }
    }
}
impl ValueMode {
    /// Returns the one-byte representation of this mode; the inverse of the
    /// `From<u8>` conversion.
    fn to_u8(&self) -> u8 {
        match *self {
            ValueMode::REPLACE => 3,
            ValueMode::CLUSTER => 2,
            ValueMode::EXCLUSIVE => 1,
        }
    }
}
/// Lock state kept for one index name.
struct IndexLock {
    /// `true` when the lock was created by `new_write`.
    write: bool,
    /// Number of read acquisitions currently counted on this lock.
    read_count: u32,
    /// Condition variable handed to callers that have to wait for this lock.
    cond: Arc<Condvar>,
}

impl IndexLock {
    /// Builds a lock in the given state with its own fresh condition variable.
    fn with_state(write: bool, read_count: u32) -> IndexLock {
        let cond = Arc::new(Condvar::new());
        IndexLock {
            write,
            read_count,
            cond,
        }
    }

    /// Creates a write lock: `write` set, no readers counted.
    fn new_write() -> IndexLock {
        IndexLock::with_state(true, 0)
    }

    /// Creates a read lock that already counts its first reader.
    fn new_read() -> IndexLock {
        IndexLock::with_state(false, 1)
    }

    /// Counts one more reader.
    fn inc_read(&mut self) {
        self.read_count = self.read_count + 1;
    }

    /// Counts one reader less and returns `true` when none is left.
    fn dec_read(&mut self) -> bool {
        let remaining = self.read_count - 1;
        self.read_count = remaining;
        remaining == 0
    }
}
/// Index management entry point: holds the table of per-index read/write locks
/// and exposes the associated functions that create, drop and look up indexes.
pub struct Indexes {
// Lock state per index name; an entry exists only while the index is locked
// (entries are inserted on lock and removed on unlock).
index_locks: Mutex<HashMap<String, IndexLock>>,
// Source of `transaction_lock_timeout()`, used to bound each wait for a lock.
config: Arc<Config>,
}
/// Persistent configuration of an index, stored as a record in the index meta
/// segment (see `IndexConfig::serialize` for the byte layout).
#[derive(Clone)]
pub struct IndexConfig {
// Name of the index, as given to `Indexes::create_index`.
name: String,
// Reference to the root of the index; `None` right after creation, changed
// through `Indexes::update_index_root`.
root: Option<RecRef>,
/// Id of the key type, as returned by `IndexType::get_id` of the key type.
pub key_type: u8,
/// Id of the value type, as returned by `IndexType::get_id` of the value type.
pub value_type: u8,
// `min` argument of `Indexes::create_index`.
page_min: usize,
// `max` argument of `Indexes::create_index`.
page_max: usize,
/// Value handling mode chosen when the index was created.
pub value_mode: ValueMode,
}
impl IndexConfig {
    /// Writes this configuration to `w`.
    ///
    /// Layout, all integers big-endian, in this order: root page (`u64`), root
    /// position (`u32`), key type id (`u8`), value type id (`u8`), `page_min`
    /// (`u32`), `page_max` (`u32`), value mode (`u8`), name length in bytes
    /// (`u16`), name bytes. A missing root is written as page 0, position 0.
    ///
    /// # Errors
    ///
    /// Returns an error when the name is longer than `u16::max_value()` bytes or
    /// when `page_min` / `page_max` exceed `u32::max_value()`, because those
    /// values cannot be represented in the layout; also propagates any error of
    /// the writer.
    fn serialize(&self, w: &mut Write) -> PRes<()> {
        // The casts below would silently truncate out-of-range values and produce
        // a record that deserializes to a different configuration, so refuse to
        // write it instead.
        let name_fits = self.name.len() <= u16::max_value() as usize;
        let pages_fit =
            self.page_min <= u32::max_value() as usize && self.page_max <= u32::max_value() as usize;
        if !name_fits || !pages_fit {
            return Err(::std::io::Error::new(
                ::std::io::ErrorKind::InvalidInput,
                "index configuration value does not fit in its serialized field",
            )
            .into());
        }

        let (root_page, root_pos) = match self.root {
            Some(ref root) => (root.page, root.pos),
            None => (0, 0),
        };
        w.write_u64::<BigEndian>(root_page)?;
        w.write_u32::<BigEndian>(root_pos)?;
        w.write_u8(self.key_type)?;
        w.write_u8(self.value_type)?;
        w.write_u32::<BigEndian>(self.page_min as u32)?;
        w.write_u32::<BigEndian>(self.page_max as u32)?;
        w.write_u8(self.value_mode.to_u8())?;
        w.write_u16::<BigEndian>(self.name.len() as u16)?;
        w.write_all(self.name.as_bytes())?;
        Ok(())
    }

    /// Reads a configuration written by `serialize` from `r`.
    ///
    /// # Errors
    ///
    /// Propagates read errors (including a truncated input) and fails when the
    /// name bytes are not valid UTF-8.
    ///
    /// # Panics
    ///
    /// Panics when the value mode byte is not 1, 2 or 3 (see `From<u8>` for
    /// `ValueMode`).
    fn deserialize(r: &mut Read) -> PRes<IndexConfig> {
        let index_root_page = r.read_u64::<BigEndian>()?;
        let index_root_pos = r.read_u32::<BigEndian>()?;
        let key_type = r.read_u8()?;
        let value_type = r.read_u8()?;
        let page_min = r.read_u32::<BigEndian>()? as usize;
        let page_max = r.read_u32::<BigEndian>()? as usize;
        let value_mode = ValueMode::from(r.read_u8()?);
        let name_size = r.read_u16::<BigEndian>()? as usize;
        let mut name_bytes: Vec<u8> = vec![0; name_size];
        r.read_exact(&mut name_bytes)?;
        let name: String = str::from_utf8(&name_bytes)?.into();
        // NOTE(review): a root is rebuilt only when BOTH page and position are
        // non-zero, while `serialize` writes (0, 0) for `None`; a root with exactly
        // one zero component would be read back as `None` — confirm that such a
        // reference can never be produced.
        let root = if index_root_page != 0 && index_root_pos != 0 {
            Some(RecRef::new(index_root_page, index_root_pos))
        } else {
            None
        };
        Ok(IndexConfig {
            name,
            root,
            key_type,
            value_type,
            page_min,
            page_max,
            value_mode,
        })
    }
}
/// Translates the "segment not found" error raised while scanning an index
/// meta segment into `PersyError::IndexNotFound`; any other error is returned
/// unchanged.
fn error_map(err: PersyError) -> PersyError {
    match err {
        PersyError::SegmentNotFound => PersyError::IndexNotFound,
        other => other,
    }
}
impl Indexes {
pub fn new(config: &Arc<Config>) -> Indexes {
Indexes {
index_locks: Mutex::new(HashMap::new()),
config: config.clone(),
}
}
pub fn create_index<K, V>(
p: &PersyImpl,
tx: &mut Transaction,
name: &str,
min: usize,
max: usize,
value_mode: ValueMode,
) -> PRes<()>
where
K: Clone + Ord + IndexType,
V: Clone + IndexType,
{
if min > max / 2 {
return Err(PersyError::IndexMinElementsShouldBeAtLeastDoubleOfMax);
}
let segment_name_meta = format_segment_name_meta(name);
p.create_segment(tx, &segment_name_meta)?;
let segment_name_data = format_segment_name_data(name);
p.create_segment(tx, &segment_name_data)?;
let cfg = IndexConfig {
name: name.to_string(),
root: None,
key_type: K::get_id(),
value_type: V::get_id(),
page_min: min,
page_max: max,
value_mode,
};
let mut scfg = Vec::new();
cfg.serialize(&mut scfg)?;
p.insert_record(tx, &segment_name_meta, &scfg)?;
Ok(())
}
pub fn drop_index(p: &PersyImpl, tx: &mut Transaction, name: &str) -> PRes<()> {
let segment_name_meta = format_segment_name_meta(name);
p.drop_segment(tx, &segment_name_meta)?;
let segment_name_data = format_segment_name_data(name);
p.drop_segment(tx, &segment_name_data)?;
Ok(())
}
pub fn update_index_root(p: &PersyImpl, tx: &mut Transaction, name: &str, root: Option<RecRef>) -> PRes<()> {
let segment_name = format_segment_name_meta(name);
let (id, mut config) = if let Some((rid, content)) = p.scan_tx(tx, &segment_name).map_err(error_map)?.next(p) {
(rid, IndexConfig::deserialize(&mut Cursor::new(content))?)
} else {
return Err(PersyError::IndexNotFound);
};
if config.root != root {
config.root = root;
let mut scfg = Vec::new();
config.serialize(&mut scfg)?;
p.update_record(tx, &segment_name, &id.0, &scfg)?;
}
Ok(())
}
pub fn get_index(p: &PersyImpl, op_tx: Option<&Transaction>, name: &str) -> PRes<IndexConfig> {
let segment_name_meta = format_segment_name_meta(name);
let meta = if let Some(tx) = op_tx {
p.scan_tx(tx, &segment_name_meta).map_err(error_map)?.next(p)
} else {
p.scan(&segment_name_meta).map_err(error_map)?.next(p)
};
if let Some((_, content)) = meta {
Ok(IndexConfig::deserialize(&mut Cursor::new(content))?)
} else {
Err(PersyError::IndexNotFound)
}
}
pub fn check_and_get_index<K: Clone + Ord + IndexType, V: Clone + IndexType>(
p: &PersyImpl,
op_tx: Option<&Transaction>,
name: &str,
) -> PRes<IndexConfig> {
let index = Indexes::get_index(p, op_tx, name)?;
if index.key_type != K::get_id() {
return Err(PersyError::IndexTypeMismatch(
"given key type miss match to persistent key type".to_string(),
));
}
if index.value_type != V::get_id() {
return Err(PersyError::IndexTypeMismatch(
"given value type miss match to persistent key type".to_string(),
));
}
Ok(index)
}
pub fn check_and_get_index_keeper<'a, K: Clone + Ord + IndexType, V: Clone + IndexType>(
p: &'a PersyImpl,
tx: Option<&'a mut Transaction>,
snapshot: Option<SnapshotId>,
name: &str,
) -> PRes<IndexSegmentKeeper<'a>> {
let (config, tx) = if let Some(t) = tx {
(Indexes::check_and_get_index::<K, V>(p, Some(t), name)?, Some(t))
} else {
(Indexes::check_and_get_index::<K, V>(p, None, name)?, None)
};
Ok(IndexSegmentKeeper::new(
name,
&format_segment_name_data(name),
config.root,
p,
tx,
snapshot,
config.value_mode,
))
}
pub fn write_lock(&self, indexes: &[String]) -> PRes<()> {
for index in indexes {
let seg_lock = IndexLock::new_write();
loop {
let mut lock_manager = self.index_locks.lock()?;
let cond = match lock_manager.entry(index.clone()) {
Entry::Occupied(o) => o.get().cond.clone(),
Entry::Vacant(v) => {
v.insert(seg_lock);
break;
}
};
cond.wait_timeout(lock_manager, self.config.transaction_lock_timeout().clone())?;
}
}
Ok(())
}
pub fn read_lock(&self, index: String) -> PRes<()> {
loop {
let mut lock_manager = self.index_locks.lock()?;
let cond;
match lock_manager.entry(index.clone()) {
Entry::Occupied(mut o) => {
if o.get().write {
cond = o.get().cond.clone();
} else {
o.get_mut().inc_read();
break;
}
}
Entry::Vacant(v) => {
v.insert(IndexLock::new_read());
break;
}
};
cond.wait_timeout(lock_manager, self.config.transaction_lock_timeout().clone())?;
}
Ok(())
}
pub fn read_unlock(&self, index: String) -> PRes<()> {
let mut lock_manager = self.index_locks.lock()?;
if let Entry::Occupied(mut lock) = lock_manager.entry(index) {
if lock.get_mut().dec_read() {
let cond = lock.get().cond.clone();
lock.remove();
cond.notify_one();
}
}
Ok(())
}
pub fn write_unlock(&self, indexes: &[String]) -> PRes<()> {
for index in indexes {
let mut lock_manager = self.index_locks.lock()?;
if let Some(lock) = lock_manager.remove(index) {
lock.cond.notify_one();
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::{IndexConfig, ValueMode};
    use std::io::Cursor;

    /// A configuration written by `serialize` is read back field by field by
    /// `deserialize`.
    #[test]
    fn test_config_ser_des() {
        let original = IndexConfig {
            name: "abc".to_string(),
            root: None,
            key_type: 1,
            value_type: 1,
            page_min: 10,
            page_max: 30,
            value_mode: ValueMode::REPLACE,
        };

        let mut bytes = Vec::new();
        {
            let mut writer = Cursor::new(&mut bytes);
            original.serialize(&mut writer).expect("serialization works");
        }
        let decoded = {
            let mut reader = Cursor::new(&mut bytes);
            IndexConfig::deserialize(&mut reader).expect("deserialization works")
        };

        assert_eq!(original.name, decoded.name);
        assert_eq!(original.root, decoded.root);
        assert_eq!(original.key_type, decoded.key_type);
        assert_eq!(original.value_type, decoded.value_type);
        assert_eq!(original.page_min, decoded.page_min);
        assert_eq!(original.page_max, decoded.page_max);
        assert_eq!(original.value_mode, decoded.value_mode);
    }
}