use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::error;
use std::fs::File;
pub use std::fs::OpenOptions;
use std::io;
use std::io::{Read, Write};
use std::rc::Rc;
use std::sync;
use std::sync::Arc;
use address::Address;
use allocator::Allocator;
use config::Config;
use data_encoding::BASE32_DNSSEC;
use discref::DiscRef;
use discref::PAGE_METADATA_SIZE;
use fs2::FileExt;
use index::config::{IndexType, IndexTypeId, Indexes, ValueMode, INDEX_DATA_PREFIX, INDEX_META_PREFIX};
use index::keeper::{IndexKeeper, IndexRawIter};
use index::tree::{Index, PageIter, PageIterBack, Value};
use journal::Journal;
use journal::JOURNAL_PAGE_EXP;
use record_scanner::{SegmentRawIter, TxSegmentRawIter};
use snapshot::{SnapshotId, Snapshots};
use std::collections::HashMap;
use std::fmt;
use std::ops::RangeBounds;
use std::path::Path;
use std::str;
use transaction::TxSegCheck::{CREATED, DROPPED, NONE};
use transaction::{Transaction, TxRead};
use unsigned_varint::decode::{u32 as u32_vdec, u64 as u64_vdec, Error as VarintError};
use unsigned_varint::encode::{u32 as u32_venc, u32_buffer, u64 as u64_venc, u64_buffer};
use {IndexId, PersyId, SegmentId};
/// Page size exponent used for the root page: `init_file` creates the root
/// page with it and `new` loads page 0 with it.
const DEFAULT_PAGE_EXP: u8 = 10;
/// Reference to a record, made of a `page` number and a `pos` value.
///
/// The `Display` and `FromStr` implementations in this file convert it to and
/// from its textual form.
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Debug)]
pub struct RecRef {
/// NOTE(review): presumably the page that holds the record's address entry — confirm.
pub page: u64,
/// NOTE(review): presumably the position of the entry inside `page` — confirm.
pub pos: u32,
}
/// Storage engine instance; groups the components built by `PersyImpl::new`.
pub struct PersyImpl {
/// Configuration supplied when the storage was opened.
config: Arc<Config>,
/// Journal used to log transaction operations and to drive `recover`.
journal: Journal,
/// Component queried for segments and for record locations.
address: Address,
/// Index management state, including per-index read locks.
indexes: Indexes,
/// Page allocator, shared through an `Arc`.
allocator: Arc<Allocator>,
/// Read snapshot bookkeeping.
snapshots: Snapshots,
}
/// Transaction returned by `PersyImpl::prepare_commit`, waiting for either
/// `commit` or `rollback_prepared`.
pub struct TxFinalize {
/// The prepared transaction.
transaction: Transaction,
/// Set to `true` by `commit` / `rollback_prepared`; once set, further calls
/// to either of them return `Ok(())` without doing anything.
pub finished: bool,
}
/// Status of a transaction while the journal is replayed by
/// `PersyImpl::recover`; each replayed journal record reports one of these.
#[derive(PartialEq, Debug)]
pub enum RecoverStatus {
/// Initial status of a transaction seen in the journal.
Started,
/// The transaction reached prepare-commit; `recover` then decides whether to
/// commit it or roll it back.
PrepareCommit,
/// The transaction must be rolled back; once set, `recover` never replaces it.
Rollback,
/// A commit record was found for the transaction.
Commit,
}
/// Descriptive information about an index, as returned by `list_indexes` and
/// `list_indexes_tx`.
#[derive(Clone)]
pub struct IndexInfo {
/// Identifier of the index.
pub id: IndexId,
/// Value mode configured for the index.
pub value_mode: ValueMode,
/// Type identifier of the index keys.
pub key_type: IndexTypeId,
/// Type identifier of the index values.
pub value_type: IndexTypeId,
}
/// Error type returned by every fallible operation in this file (see `PRes`).
#[derive(PartialEq, Debug)]
pub enum PersyError {
/// I/O failure; carries the formatted `std::io::Error` message.
IO(String),
/// Generic error with a free-form message.
Err(String),
/// A byte sequence was not valid UTF-8.
DecodingUTF(str::Utf8Error),
/// A varint could not be decoded.
DecodingVarint(VarintError),
/// A textual id could not be decoded by `data_encoding`.
DecodingDataEncoding(data_encoding::DecodeError),
/// The record version is not the latest one.
VersionNotLastest,
/// No record exists for the given id.
RecordNotFound(PersyId),
/// The requested segment does not exist.
SegmentNotFound,
/// A segment with the same name already exists.
SegmentAlreadyExists,
/// A segment cannot be created and dropped in the same transaction.
CannotDropSegmentCreatedInTx,
/// A lock was poisoned.
Lock,
/// The index page element limits are inconsistent with each other.
IndexMinElementsShouldBeAtLeastDoubleOfMax,
/// The requested index does not exist.
IndexNotFound,
/// The requested key/value types do not match the persisted index types.
IndexTypeMismatch(String),
/// Duplicate key found: first field is the index, second field is the key.
IndexDuplicateKey(String, String),
}
/// Result alias used across this file: success value `T` or a `PersyError`.
pub type PRes<T> = Result<T, PersyError>;
impl PersyImpl {
pub fn create(path: &Path) -> PRes<()> {
let f = OpenOptions::new().write(true).read(true).create_new(true).open(path)?;
PersyImpl::create_from_file(f)
}
pub fn create_from_file(f: File) -> PRes<()> {
f.try_lock_exclusive()?;
PersyImpl::init_file(f)?;
Ok(())
}
/// Writes the initial on-disk layout: a root page plus the allocator,
/// address and journal structures, then syncs the file.
fn init_file(fl: File) -> PRes<()> {
    let disc = DiscRef::new(fl);
    let root_page = disc.create_page_raw(DEFAULT_PAGE_EXP)?;
    let allocator_root = Allocator::init(&disc)?;
    let default_config = Rc::new(Config::new());
    let allocator = Allocator::new(disc, &default_config, allocator_root)?;
    let address_root = Address::init(&allocator)?;
    let journal_root = Journal::init(&allocator)?;
    {
        // Root page layout: a 16-bit zero followed by the three root page
        // numbers, in the same order `new` reads them back.
        let mut root = allocator.disc().load_page_raw(root_page, DEFAULT_PAGE_EXP)?;
        root.write_u16::<BigEndian>(0)?;
        root.write_u64::<BigEndian>(address_root)?;
        root.write_u64::<BigEndian>(journal_root)?;
        root.write_u64::<BigEndian>(allocator_root)?;
        allocator.flush_page(&mut root)?;
    }
    allocator.disc().sync()?;
    Ok(())
}
/// Builds a `PersyImpl` on top of an already initialised file.
///
/// Page 0 is read to find the address, journal and allocator root pages, and
/// the components are then constructed from those.
fn new(file: File, config: Config) -> PRes<PersyImpl> {
    let disc = DiscRef::new(file);
    let (address_page, journal_page, allocator_page) = {
        let mut root = disc.load_page_raw(0, DEFAULT_PAGE_EXP)?;
        // The leading 16-bit value (written as 0 by `init_file`) is skipped.
        root.read_u16::<BigEndian>()?;
        let address_page = root.read_u64::<BigEndian>()?;
        let journal_page = root.read_u64::<BigEndian>()?;
        let allocator_page = root.read_u64::<BigEndian>()?;
        (address_page, journal_page, allocator_page)
    };
    let shared_config = Arc::new(config);
    let allocator = Arc::new(Allocator::new(disc, &shared_config, allocator_page)?);
    let address = Address::new(&allocator, &shared_config, address_page)?;
    let journal = Journal::new(&allocator, journal_page)?;
    let indexes = Indexes::new(&shared_config);
    let snapshots = Snapshots::new();
    Ok(PersyImpl {
        config: shared_config,
        journal,
        address,
        indexes,
        allocator,
        snapshots,
    })
}
/// Replays the journal and brings the storage back to a consistent state.
///
/// Transactions that reached prepare-commit are handled in the order their
/// prepare-commit records were seen: each is committed when
/// `check_if_commit` returns `true` for its meta id and rolled back
/// otherwise. Every transaction still pending afterwards is rolled back.
fn recover<C>(&self, check_if_commit: C) -> PRes<()>
where
    C: Fn(&Vec<u8>) -> bool,
{
    let journal = &self.journal;
    let mut commit_order = Vec::new();
    let mut transactions = HashMap::new();
    let journal_pages = journal.recover(|record, id| {
        let entry = transactions
            .entry(id.clone())
            .or_insert_with(|| (RecoverStatus::Started, Transaction::recover(id.clone())));
        match record.recover(&mut entry.1) {
            // A failing record or an explicit rollback marks the transaction
            // as rolled back for good.
            Err(_) | Ok(RecoverStatus::Rollback) => entry.0 = RecoverStatus::Rollback,
            // Any other status is recorded unless a rollback was already seen.
            Ok(status) => {
                if entry.0 != RecoverStatus::Rollback {
                    if status == RecoverStatus::PrepareCommit {
                        commit_order.push(id.clone());
                    }
                    entry.0 = status;
                }
            }
        }
    })?;
    let allocator = &self.allocator;
    let address = &self.address;
    let indexes = &self.indexes;
    let snapshots = &self.snapshots;
    let mut last_id = None;
    for id in commit_order {
        let (status, mut tx) = match transactions.remove(&id) {
            Some(found) => found,
            None => continue,
        };
        if status != RecoverStatus::PrepareCommit {
            continue;
        }
        if check_if_commit(tx.meta_id()) {
            tx.recover_prepare_commit(journal, address, snapshots, allocator)?;
            tx.recover_commit(journal, address, indexes, snapshots, allocator)?;
            last_id = Some(id.clone());
        } else {
            tx.recover_rollback(journal, address, snapshots, allocator)?;
        }
    }
    // Pages reported by the journal replay are taken off the free list.
    for page in journal_pages {
        allocator.remove_from_free(page, JOURNAL_PAGE_EXP)?;
    }
    // Whatever was not consumed by the commit pass gets rolled back.
    for pending in transactions.values_mut() {
        pending.1.recover_rollback(journal, address, snapshots, allocator)?;
    }
    if let Some(id) = last_id {
        self.journal.clear(&id)?;
    }
    allocator.flush_free_list()?;
    allocator.disc().sync()?;
    Ok(())
}
pub fn open(path: &Path, config: Config) -> PRes<PersyImpl> {
PersyImpl::open_with_recover(path, config, |_| true)
}
pub fn open_from_file(f: File, config: Config) -> PRes<PersyImpl> {
PersyImpl::open_from_file_with_recover(f, config, |_| true)
}
pub fn open_with_recover<C>(path: &Path, config: Config, recover: C) -> PRes<PersyImpl>
where
C: Fn(&Vec<u8>) -> bool,
{
let f = OpenOptions::new()
.write(true)
.read(true)
.create(false)
.truncate(false)
.open(path)?;
PersyImpl::open_from_file_with_recover(f, config, recover)
}
pub fn open_from_file_with_recover<C>(f: File, config: Config, recover: C) -> PRes<PersyImpl>
where
C: Fn(&Vec<u8>) -> bool,
{
f.try_lock_exclusive()?;
let persy = PersyImpl::new(f, config)?;
persy.recover(recover)?;
Ok(persy)
}
/// Starts a new transaction tagged with `meta_id`, using the transaction
/// strategy from the configuration.
pub fn begin_id(&self, meta_id: Vec<u8>) -> PRes<Transaction> {
    let strategy = self.config.tx_strategy();
    let tx = Transaction::new(&self.journal, strategy, meta_id)?;
    Ok(tx)
}
/// Starts a new transaction with an empty meta id.
pub fn begin(&self) -> PRes<Transaction> {
    let no_meta_id: Vec<u8> = Vec::new();
    self.begin_id(no_meta_id)
}
/// Registers the creation of `segment` inside `tx`.
///
/// # Errors
/// `PersyError::SegmentAlreadyExists` when the segment was already created
/// in this transaction, or exists in the storage and was not dropped by
/// this transaction.
pub fn create_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<()> {
    let already_exists = match tx.exists_segment(segment) {
        CREATED(_) => true,
        // Dropped earlier in this transaction: the name is free again.
        DROPPED => false,
        NONE => self.address.exists_segment(segment)?,
    };
    if already_exists {
        return Err(PersyError::SegmentAlreadyExists);
    }
    let segment_id = self.address.create_temp_segment(segment)?;
    tx.add_create_segment(&self.journal, segment, segment_id)?;
    Ok(())
}
/// Registers the drop of `segment` inside `tx`.
///
/// # Errors
/// `PersyError::SegmentNotFound` when the segment is not visible from `tx`.
pub fn drop_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<()> {
    let segment_id = self.check_segment_tx(tx, segment)?.1;
    tx.add_drop_segment(&self.journal, segment, segment_id)?;
    Ok(())
}
/// Returns whether `segment` exists, as reported by the address component
/// (changes pending in transactions are not considered here).
pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
self.address.exists_segment(segment)
}
/// Returns whether `segment` exists from the point of view of `tx`:
/// segments created in the transaction exist, segments dropped in it do not,
/// and anything else falls back to the persistent state.
pub fn exists_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<bool> {
    match tx.exists_segment(segment) {
        CREATED(_) => Ok(true),
        DROPPED => Ok(false),
        NONE => self.exists_segment(segment),
    }
}
/// Returns whether the index `index` exists, by checking for its metadata
/// segment (`INDEX_META_PREFIX` followed by the index name).
pub fn exists_index(&self, index: &str) -> PRes<bool> {
    let meta_segment = format!("{}{}", INDEX_META_PREFIX, index);
    self.exists_segment(&meta_segment)
}
/// Returns whether the index `index` exists from the point of view of `tx`,
/// by checking for its metadata segment.
pub fn exists_index_tx(&self, tx: &Transaction, index: &str) -> PRes<bool> {
    let meta_segment = format!("{}{}", INDEX_META_PREFIX, index);
    self.exists_segment_tx(tx, &meta_segment)
}
/// Resolves `segment` to its id as seen by `tx`.
///
/// Returns `(created_in_tx, segment_id)`, where the flag is `true` when the
/// segment was created inside `tx`.
///
/// # Errors
/// `PersyError::SegmentNotFound` when the segment was dropped in `tx` or
/// does not exist at all.
fn check_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<(bool, u32)> {
    match tx.exists_segment(segment) {
        CREATED(segment_id) => Ok((true, segment_id)),
        DROPPED => Err(PersyError::SegmentNotFound),
        NONE => self
            .address
            .segment_id(segment)?
            .map(|id| (false, id))
            .ok_or(PersyError::SegmentNotFound),
    }
}
/// Inserts `rec` into `segment` as part of `tx` and returns the reference of
/// the new record.
///
/// The content page stores the record length as a big-endian `u64` followed
/// by the record bytes.
pub fn insert_record(&self, tx: &mut Transaction, segment: &str, rec: &[u8]) -> PRes<RecRef> {
    let (created_in_tx, segment_id) = self.check_segment_tx(tx, segment)?;
    let content_len = rec.len() as u64;
    let page = self.allocator.allocate(exp_from_content_size(content_len))?;
    // Segments created inside this transaction use the temporary allocation path.
    let rec_ref = if created_in_tx {
        self.address.allocate_temp(segment_id)?
    } else {
        self.address.allocate(segment_id)?
    };
    // The journal entry is written before the page content.
    tx.add_insert(&self.journal, segment_id, &rec_ref, page)?;
    {
        let mut content = self.allocator.write_page(page)?;
        content.write_u64::<BigEndian>(content_len)?;
        content.write_all(rec)?;
        self.allocator.flush_page(&mut content)?;
    }
    Ok(rec_ref)
}
/// Locates `rec_ref` as seen by `tx`, for a segment whose id is already known.
///
/// Returns `(page, version, segment_id)`, or `None` when the record was
/// deleted in `tx` or is not found by the address component.
fn read_ref_segment(&self, tx: &Transaction, segment_id: u32, rec_ref: &RecRef) -> PRes<Option<(u64, u16, u32)>> {
    let located = match tx.read(rec_ref) {
        TxRead::DELETED => None,
        TxRead::RECORD(rec) => Some((rec.0, rec.1, segment_id)),
        // Untouched by the transaction: fall back to the persistent state.
        TxRead::NONE => match self.address.read(rec_ref, segment_id)? {
            Some((pos, version)) => Some((pos, version, segment_id)),
            None => None,
        },
    };
    Ok(located)
}
/// Locates `rec_ref` in the segment named `segment`, as seen by `tx`.
///
/// Returns `(page, version, segment_id)` or `None` when the record is not
/// visible; fails with `SegmentNotFound` when the segment is not visible.
fn read_ref(&self, tx: &Transaction, segment: &str, rec_ref: &RecRef) -> PRes<Option<(u64, u16, u32)>> {
    let segment_id = self.check_segment_tx(tx, segment)?.1;
    self.read_ref_segment(tx, segment_id, rec_ref)
}
/// Reads the record content stored in `page`: a big-endian `u64` length
/// followed by that many bytes.
fn read_page(&self, page: u64) -> PRes<Vec<u8>> {
    let mut loaded = self.allocator.load_page(page)?;
    let content_len = loaded.read_u64::<BigEndian>()?;
    let mut content: Vec<u8> = Vec::with_capacity(content_len as usize);
    // Never read past the declared record length.
    let mut limited = loaded.take(content_len);
    limited.read_to_end(&mut content)?;
    Ok(content)
}
/// Reads the content of `rec_ref` as seen by `tx`, for a segment whose id is
/// already known. Returns `None` when the record is not visible.
pub fn read_record_scan_tx(&self, tx: &Transaction, segment_id: u32, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
    match self.read_ref_segment(tx, segment_id, rec_ref)? {
        Some((page, _, _)) => self.read_page(page).map(Some),
        None => Ok(None),
    }
}
/// Reads the content of `rec_ref` in `segment` as seen by `tx`, recording the
/// read (with the record version) in the transaction.
///
/// Returns `None` when the record is not visible from `tx`.
pub fn read_record_tx(&self, tx: &mut Transaction, segment: &str, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
    match self.read_ref(tx, segment, rec_ref)? {
        Some((page, version, segment_id)) => {
            tx.add_read(&self.journal, segment_id, rec_ref, version)?;
            Ok(Some(self.read_page(page)?))
        }
        None => Ok(None),
    }
}
/// Reads the content of `rec_ref` in `segment` outside of any transaction.
///
/// # Errors
/// `PersyError::SegmentNotFound` when the segment does not exist.
pub fn read_record(&self, segment: &str, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
    let segment_id = self
        .address
        .segment_id(segment)?
        .ok_or(PersyError::SegmentNotFound)?;
    self.read_record_scan(segment_id, rec_ref)
}
/// Reads the content of `rec_ref` in `segment` as seen by `snapshot`.
///
/// The version stored for the snapshot is used when there is one; otherwise
/// the current persistent version is read.
///
/// # Errors
/// `PersyError::SegmentNotFound` when the segment does not exist.
pub fn read_record_snapshot(&self, segment: &str, rec_ref: &RecRef, snapshot: SnapshotId) -> PRes<Option<Vec<u8>>> {
    let segment_id = self
        .address
        .segment_id(segment)?
        .ok_or(PersyError::SegmentNotFound)?;
    let page = match self.snapshots.read(snapshot, rec_ref)? {
        Some(rec_vers) => Some(rec_vers.pos),
        None => self.address.read(rec_ref, segment_id)?.map(|(page, _)| page),
    };
    match page {
        Some(page) => Ok(Some(self.read_page(page)?)),
        None => Ok(None),
    }
}
/// Reads the content of `rec_ref` for a segment whose id is already known,
/// outside of any transaction. Returns `None` when the record is not found.
pub fn read_record_scan(&self, segment_id: u32, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
    match self.address.read(rec_ref, segment_id)? {
        Some((page, _)) => self.read_page(page).map(Some),
        None => Ok(None),
    }
}
/// Returns a raw iterator over the records of `segment`.
///
/// # Errors
/// `PersyError::SegmentNotFound` when the segment does not exist.
pub fn scan(&self, segment: &str) -> PRes<SegmentRawIter> {
    let segment_id = match self.address.segment_id(segment)? {
        Some(id) => id,
        None => return Err(PersyError::SegmentNotFound),
    };
    let pages = self.address.scan(segment_id)?;
    Ok(SegmentRawIter::new(segment_id, pages))
}
/// Returns a raw iterator over the records of `segment` that is bound to `tx`.
///
/// # Errors
/// `PersyError::SegmentNotFound` when the segment is not visible from `tx`.
pub fn scan_tx<'a>(&'a self, tx: &'a mut Transaction, segment: &str) -> PRes<TxSegmentRawIter> {
    let segment_id = self.check_segment_tx(tx, segment)?.1;
    let pages = self.address.scan(segment_id)?;
    Ok(TxSegmentRawIter::new(tx, segment_id, pages))
}
/// Replaces the content of `rec_ref` in `segment` with `rec` as part of `tx`.
///
/// The new content goes to a freshly allocated page, stored as a big-endian
/// `u64` length followed by the record bytes.
///
/// # Errors
/// `PersyError::RecordNotFound` when the record is not visible from `tx`.
pub fn update_record(&self, tx: &mut Transaction, segment: &str, rec_ref: &RecRef, rec: &[u8]) -> PRes<()> {
    let (version, segment_id) = match self.read_ref(tx, segment, rec_ref)? {
        Some((_, version, segment_id)) => (version, segment_id),
        None => return Err(PersyError::RecordNotFound(PersyId(rec_ref.clone()))),
    };
    let content_len = rec.len() as u64;
    let page = self.allocator.allocate(exp_from_content_size(content_len))?;
    // The journal entry is written before the page content.
    tx.add_update(&self.journal, segment_id, rec_ref, page, version)?;
    {
        let mut content = self.allocator.write_page(page)?;
        content.write_u64::<BigEndian>(content_len)?;
        content.write_all(rec)?;
        self.allocator.flush_page(&mut content)?;
    }
    Ok(())
}
/// Registers the deletion of `rec_ref` in `segment` as part of `tx`.
///
/// # Errors
/// `PersyError::RecordNotFound` when the record is not visible from `tx`.
pub fn delete_record(&self, tx: &mut Transaction, segment: &str, rec_ref: &RecRef) -> PRes<()> {
    match self.read_ref(tx, segment, rec_ref)? {
        Some((_, version, segment_id)) => {
            tx.add_delete(&self.journal, segment_id, rec_ref, version)?;
            Ok(())
        }
        None => Err(PersyError::RecordNotFound(PersyId(rec_ref.clone()))),
    }
}
/// Rolls back `tx`, consuming it.
pub fn rollback(&self, mut tx: Transaction) -> PRes<()> {
    tx.rollback(&self.journal, &self.address, &self.snapshots, &self.allocator)
}
/// Runs the prepare phase of the commit for `tx`.
///
/// On success returns a `TxFinalize` that must be completed with `commit` or
/// `rollback_prepared`.
pub fn prepare_commit(&self, mut tx: Transaction) -> PRes<TxFinalize> {
    tx = tx.prepare_commit(
        &self.journal,
        &self.address,
        &self.indexes,
        &self.snapshots,
        self,
        &self.allocator,
    )?;
    let finalizer = TxFinalize {
        transaction: tx,
        finished: false,
    };
    Ok(finalizer)
}
/// Rolls back a prepared transaction.
///
/// Does nothing when the finalizer was already completed; otherwise marks it
/// finished before performing the rollback.
pub fn rollback_prepared(&self, finalizer: &mut TxFinalize) -> PRes<()> {
    if !finalizer.finished {
        finalizer.finished = true;
        finalizer.transaction.rollback_prepared(
            &self.journal,
            &self.address,
            &self.indexes,
            &self.snapshots,
            &self.allocator,
        )
    } else {
        Ok(())
    }
}
/// Commits a prepared transaction.
///
/// Does nothing when the finalizer was already completed; otherwise marks it
/// finished before performing the commit.
pub fn commit(&self, finalizer: &mut TxFinalize) -> PRes<()> {
    if !finalizer.finished {
        finalizer.finished = true;
        finalizer.transaction.commit(
            &self.address,
            &self.journal,
            &self.indexes,
            &self.snapshots,
            &self.allocator,
        )
    } else {
        Ok(())
    }
}
/// Creates the index `index_name` with key type `K` and value type `V`
/// inside `tx`, using 32 and 128 as the page element limits.
pub fn create_index<K, V>(&self, tx: &mut Transaction, index_name: &str, value_mode: ValueMode) -> PRes<()>
where
    K: IndexType,
    V: IndexType,
{
    let name = index_name.to_string();
    Indexes::create_index::<K, V>(self, tx, &name, 32, 128, value_mode)
}
/// Drops the index `index_name` inside `tx`.
pub fn drop_index(&self, tx: &mut Transaction, index_name: &str) -> PRes<()> {
    let name = index_name.to_string();
    Indexes::drop_index(self, tx, &name)
}
/// Registers in `tx` the insertion of `k` -> `v` into the index `index_name`.
///
/// The index is checked against the `K`/`V` types first; the change itself
/// is only recorded in the transaction.
pub fn put<K, V>(&self, tx: &mut Transaction, index_name: &str, k: K, v: V) -> PRes<()>
where
    K: IndexType,
    V: IndexType,
{
    Indexes::check_and_get_index::<K, V>(self, Some(tx), index_name)?;
    let name = index_name.to_string();
    tx.add_put(&name, k, v);
    Ok(())
}
/// Registers in `tx` the removal of `k` (optionally restricted to the value
/// `v`) from the index `index_name`.
///
/// The index is checked against the `K`/`V` types first; the change itself
/// is only recorded in the transaction.
pub fn remove<K, V>(&self, tx: &mut Transaction, index_name: &str, k: K, v: Option<V>) -> PRes<()>
where
    K: IndexType,
    V: IndexType,
{
    Indexes::check_and_get_index::<K, V>(self, Some(tx), index_name)?;
    let name = index_name.to_string();
    tx.add_remove(&name, k, v);
    Ok(())
}
/// Looks up `k` in the index `index_name` as seen by `tx`.
///
/// The persisted value is read while holding the index read lock, then the
/// changes pending in `tx` for that key are applied on top of it.
///
/// The read lock is always released once it has been acquired, even when
/// the lookup itself fails; the lookup error is reported afterwards.
pub fn get_tx<K, V>(&self, tx: &mut Transaction, index_name: &str, k: &K) -> PRes<Option<Value<V>>>
where
    K: IndexType,
    V: IndexType,
{
    let index_key = index_name.to_string();
    let (lookup, vm) = {
        let mut ik = Indexes::check_and_get_index_keeper::<K, V>(self, Some(tx), None, index_name)?;
        self.indexes.read_lock(index_key.clone())?;
        // Do not use `?` here: the lock taken above must be released first.
        let lookup = ik.get(k);
        (lookup, IndexKeeper::<K, V>::value_mode(&ik))
    };
    self.indexes.read_unlock(index_key.clone())?;
    let persisted = lookup?;
    tx.apply_changes::<K, V>(vm, &index_key, k, persisted)
}
/// Looks up `k` in the index `index_name` outside of any transaction, using
/// a read snapshot taken for the duration of the call.
///
/// The snapshot is always released through `PersyImpl::release`, so the
/// pages and journal entries it reports are freed and cleaned, including
/// when the lookup fails.
pub fn get<K, V>(&self, index_name: &str, k: &K) -> PRes<Option<Value<V>>>
where
    K: IndexType,
    V: IndexType,
{
    let read_snapshot = self.snapshots.read_snapshot()?;
    let lookup = || -> PRes<Option<Value<V>>> {
        let mut ik = Indexes::check_and_get_index_keeper::<K, V>(self, None, Some(read_snapshot), index_name)?;
        let found = ik.get(k);
        found
    };
    let found = lookup();
    // Release before reporting the lookup outcome so the snapshot never leaks.
    self.release(read_snapshot)?;
    found
}
/// Returns the forward page iterator for the index node `next` of
/// `index_name`, read through the snapshot `read_snapshot`.
pub fn index_next<K, V>(&self, index_name: &str, read_snapshot: SnapshotId, next: &RecRef) -> PRes<PageIter<K, V>>
where
    K: IndexType,
    V: IndexType,
{
    let mut keeper = Indexes::check_and_get_index_keeper::<K, V>(self, None, Some(read_snapshot), index_name)?;
    let page_iter = keeper.iter_node(next);
    page_iter
}
/// Returns the backward page iterator for the index node `next` of
/// `index_name`, read through the snapshot `read_snapshot`.
pub fn index_next_back<K, V>(
    &self,
    index_name: &str,
    read_snapshot: SnapshotId,
    next: &RecRef,
) -> PRes<PageIterBack<K, V>>
where
    K: IndexType,
    V: IndexType,
{
    let mut keeper = Indexes::check_and_get_index_keeper::<K, V>(self, None, Some(read_snapshot), index_name)?;
    let page_iter = keeper.back_iter_node(next);
    page_iter
}
/// Builds a raw iterator over the entries of `index_name` whose keys fall in
/// `range`, together with the value mode of the index.
///
/// A read snapshot is taken for the iteration and handed over to the
/// returned iterator. If anything fails before the iterator is built, the
/// snapshot is released here so it does not leak.
pub fn range<K, V, R>(&self, index_name: &str, range: R) -> PRes<(ValueMode, IndexRawIter<K, V>)>
where
    K: IndexType,
    V: IndexType,
    R: RangeBounds<K>,
{
    let read_snapshot = self.snapshots.read_snapshot()?;
    let prepare = || -> PRes<_> {
        let mut ik = Indexes::check_and_get_index_keeper::<K, V>(self, None, Some(read_snapshot), index_name)?;
        let after = ik.iter_from(range.start_bound())?;
        let before = ik.back_iter_from(range.end_bound())?;
        let vm = IndexKeeper::<K, V>::value_mode(&ik);
        Ok((vm, after, before))
    };
    match prepare() {
        Ok((vm, after, before)) => Ok((vm, IndexRawIter::new(index_name, read_snapshot, after, before))),
        Err(err) => {
            // Nobody else knows about the snapshot yet: release it here.
            self.release(read_snapshot)?;
            Err(err)
        }
    }
}
/// Releases the snapshot `snapshot_id`, then frees every page and clears
/// every journal entry that the snapshot bookkeeping reports as releasable.
pub fn release(&self, snapshot_id: SnapshotId) -> PRes<()> {
    let (free_pages, journal_ids) = self.snapshots.release(snapshot_id)?;
    for free_page in free_pages {
        self.allocator.free(free_page)?;
    }
    for id in journal_ids {
        self.journal.clear(&id)?;
    }
    Ok(())
}
/// Borrows the address component of this instance.
pub fn address(&self) -> &Address {
&self.address
}
/// Lists the user segments as `(name, id)` pairs.
///
/// Segments whose name starts with `INDEX_META_PREFIX` or
/// `INDEX_DATA_PREFIX` are left out.
pub fn list_segments(&self) -> PRes<Vec<(String, u32)>> {
    let mut segments = Vec::new();
    for (name, id) in self.address.list()? {
        let index_related = name.starts_with(INDEX_META_PREFIX) || name.starts_with(INDEX_DATA_PREFIX);
        if !index_related {
            segments.push((name, id));
        }
    }
    Ok(segments)
}
/// Lists the indexes as `(name, info)` pairs.
///
/// Indexes are found through their metadata segments (names starting with
/// `INDEX_META_PREFIX`); the prefix is stripped from the returned name.
/// Stops at the first index whose information cannot be loaded.
pub fn list_indexes(&self) -> PRes<Vec<(String, IndexInfo)>> {
    let mut indexes = Vec::new();
    for (mut name, id) in self.address.list()? {
        if !name.starts_with(INDEX_META_PREFIX) {
            continue;
        }
        name.drain(..INDEX_META_PREFIX.len());
        let info = self.index_info(None, &name, id)?;
        indexes.push((name, info));
    }
    Ok(indexes)
}
/// Lists the user segments as `(name, id)` pairs, after `tx` has filtered
/// the persistent list.
///
/// Segments whose name starts with `INDEX_META_PREFIX` or
/// `INDEX_DATA_PREFIX` are left out.
pub fn list_segments_tx(&self, tx: &mut Transaction) -> PRes<Vec<(String, u32)>> {
    let mut segments = Vec::new();
    for (name, id) in tx.filter_list(self.address.list()?) {
        let index_related = name.starts_with(INDEX_META_PREFIX) || name.starts_with(INDEX_DATA_PREFIX);
        if !index_related {
            segments.push((name, id));
        }
    }
    Ok(segments)
}
/// Lists the indexes as `(name, info)` pairs, after `tx` has filtered the
/// persistent segment list.
///
/// Indexes are found through their metadata segments (names starting with
/// `INDEX_META_PREFIX`); the prefix is stripped from the returned name.
/// Stops at the first index whose information cannot be loaded.
pub fn list_indexes_tx(&self, tx: &mut Transaction) -> PRes<Vec<(String, IndexInfo)>> {
    let mut indexes = Vec::new();
    for (mut name, id) in tx.filter_list(self.address.list()?) {
        if !name.starts_with(INDEX_META_PREFIX) {
            continue;
        }
        name.drain(..INDEX_META_PREFIX.len());
        let info = self.index_info(Some(&mut *tx), &name, id)?;
        indexes.push((name, info));
    }
    Ok(indexes)
}
/// Loads the definition of the index `name` (optionally through `tx`) and
/// packs it, together with `id`, into an `IndexInfo`.
fn index_info(&self, tx: Option<&mut Transaction>, name: &str, id: u32) -> PRes<IndexInfo> {
    let index = Indexes::get_index(self, tx, name)?;
    let value_mode = index.value_mode;
    let key_type = IndexTypeId::from(index.key_type);
    let value_type = IndexTypeId::from(index.value_type);
    Ok(IndexInfo {
        id: IndexId(id),
        value_mode,
        key_type,
        value_type,
    })
}
}
/// Returns the smallest exponent `e >= 1` such that a page of `2^e` bytes is
/// strictly larger than `size` plus the 8-byte length header and the page
/// metadata.
///
/// The additions saturate, so extremely large sizes yield 64 instead of
/// overflowing the addition or the shift.
pub fn exp_from_content_size(size: u64) -> u8 {
    let final_size = size
        .saturating_add(8)
        .saturating_add(u64::from(PAGE_METADATA_SIZE));
    // The bit length of `final_size` is the smallest `e` with `final_size < 2^e`.
    let bit_len = 64 - final_size.leading_zeros();
    // `bit_len` is at most 64, so the cast cannot truncate.
    std::cmp::max(bit_len, 1) as u8
}
/// Converts an I/O error into `PersyError::IO`, keeping only its message.
impl From<io::Error> for PersyError {
    fn from(io_error: io::Error) -> Self {
        let message = io_error.to_string();
        PersyError::IO(message)
    }
}
/// Maps any poisoned-lock error to `PersyError::Lock`, discarding the guard.
impl<T> From<sync::PoisonError<T>> for PersyError {
    fn from(_poisoned: sync::PoisonError<T>) -> Self {
        PersyError::Lock
    }
}
/// Wraps a UTF-8 decoding error into `PersyError::DecodingUTF`.
impl From<str::Utf8Error> for PersyError {
    fn from(utf8_error: str::Utf8Error) -> Self {
        PersyError::DecodingUTF(utf8_error)
    }
}
/// Wraps a `data_encoding` decoding error into
/// `PersyError::DecodingDataEncoding`.
impl From<data_encoding::DecodeError> for PersyError {
    fn from(decode_error: data_encoding::DecodeError) -> Self {
        PersyError::DecodingDataEncoding(decode_error)
    }
}
/// Wraps a varint decoding error into `PersyError::DecodingVarint`.
impl From<VarintError> for PersyError {
    fn from(varint_error: VarintError) -> Self {
        PersyError::DecodingVarint(varint_error)
    }
}
/// Marks `PersyError` as a standard error type; only the default methods of
/// `std::error::Error` are used.
impl error::Error for PersyError {}
impl fmt::Display for PersyError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
PersyError::IO(m) => write!(f, "IO Error: {}", m),
PersyError::Err(g) => write!(f, "Generic Error: {}", g),
PersyError::DecodingUTF(e) => write!(f, "String decoding error: {}", e),
PersyError::DecodingDataEncoding(e) => write!(f, "Data Encoding Decoding error: {}", e),
PersyError::DecodingVarint(e) => write!(f, "Varint decoding error: {}", e),
PersyError::VersionNotLastest => write!(f, "The record version is not latest"),
PersyError::RecordNotFound(r) => write!(f, "Record not found: {}", r),
PersyError::SegmentNotFound => write!(f, "Segment not found"),
PersyError::SegmentAlreadyExists => write!(f, "Segment already exist"),
PersyError::CannotDropSegmentCreatedInTx => {
write!(f, "Create and drop of a segment in the same transaction is not allowed")
}
PersyError::Lock => write!(f, "Failure acquiring lock for poisoning"),
PersyError::IndexMinElementsShouldBeAtLeastDoubleOfMax => write!(
f,
"Index min page elements should be maximum half of the maximum elements"
),
PersyError::IndexNotFound => write!(f, "Index not found"),
PersyError::IndexTypeMismatch(m) => write!(f, "Index method type mismatch persistent types: {}", m),
PersyError::IndexDuplicateKey(i, k) => write!(f, "Found duplicate key:{} for index: {}", k, i),
}
}
}
impl RecRef {
/// Builds a record reference from its `page` and `pos` components.
pub fn new(page: u64, pos: u32) -> RecRef {
RecRef { page, pos }
}
}
/// Textual form of a record reference: the varint of `page`, a marker byte
/// (`0b0101_0101`) and the varint of `pos`, encoded with `BASE32_DNSSEC`.
impl fmt::Display for RecRef {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut page_buf = u64_buffer();
        let mut pos_buf = u32_buffer();
        let mut raw = Vec::new();
        raw.extend_from_slice(u64_venc(self.page, &mut page_buf));
        raw.push(0b0101_0101);
        raw.extend_from_slice(u32_venc(self.pos, &mut pos_buf));
        let encoded = BASE32_DNSSEC.encode(&raw);
        f.write_str(&encoded)
    }
}
/// Parses the textual form produced by the `Display` implementation of
/// `RecRef`.
impl std::str::FromStr for RecRef {
    type Err = PersyError;
    /// Decodes `s` as `BASE32_DNSSEC`, then reads the `page` varint, skips
    /// one marker byte and reads the `pos` varint.
    ///
    /// # Errors
    /// Returns a decoding error when `s` is not valid base32 or when either
    /// varint is missing or malformed. Truncated input yields an error
    /// rather than a panic.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let bytes = BASE32_DNSSEC.decode(s.as_bytes())?;
        let (page, rest) = u64_vdec(&bytes)?;
        // `rest` can be empty when the input stops right after the page
        // varint; slicing with `[1..]` would panic in that case, so fall back
        // to an empty slice and let the varint decoder report the error.
        let pos_bytes = rest.get(1..).unwrap_or(&[]);
        let (pos, _) = u32_vdec(pos_bytes)?;
        Ok(RecRef::new(page, pos))
    }
}
fn write_id(f: &mut fmt::Formatter, id: u32) -> fmt::Result {
let mut buffer = Vec::new();
buffer
.write_all(u32_venc(id, &mut u32_buffer()))
.expect("no failure expected only memory allocation");
buffer.push(0b0101_0101);
write!(f, "{}", BASE32_DNSSEC.encode(&buffer))
}
/// Parses the textual form written by `write_id`: decodes `s` with
/// `BASE32_DNSSEC` and reads the leading varint; trailing bytes are ignored.
fn read_id(s: &str) -> PRes<u32> {
    let raw = BASE32_DNSSEC.decode(s.as_bytes())?;
    let decoded = u32_vdec(&raw)?;
    Ok(decoded.0)
}
impl fmt::Display for SegmentId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write_id(f, self.0)
}
}
impl std::str::FromStr for SegmentId {
type Err = PersyError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(SegmentId(read_id(s)?))
}
}
impl fmt::Display for IndexId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write_id(f, self.0)
}
}
impl std::str::FromStr for IndexId {
type Err = PersyError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(IndexId(read_id(s)?))
}
}
// Round-trip and failure tests for the textual form of the id types.
#[cfg(test)]
mod tests {
use super::{IndexId, RecRef, SegmentId};
// A `RecRef` formatted with `Display` parses back to the same value.
#[test()]
fn test_persy_id_string() {
let id = RecRef::new(20, 30);
let s = format!("{}", id);
let rid = s.parse::<RecRef>();
assert_eq!(rid, Ok(id));
}
// The string "ACCC" is rejected when parsed as a `RecRef`.
#[test()]
fn test_persy_id_parse_failure() {
let s = "ACCC";
let rid = s.parse::<RecRef>();
assert!(rid.is_err());
}
// A `SegmentId` formatted with `Display` parses back to the same value.
#[test()]
fn test_segmend_id_string() {
let id = SegmentId(20);
let s = format!("{}", id);
let rid = s.parse::<SegmentId>();
assert_eq!(rid, Ok(id));
}
// The string "ACCC" is rejected when parsed as a `SegmentId`.
#[test()]
fn test_segment_id_parse_failure() {
let s = "ACCC";
let rid = s.parse::<SegmentId>();
assert!(rid.is_err());
}
// An `IndexId` formatted with `Display` parses back to the same value.
#[test()]
fn test_index_id_string() {
let id = IndexId(20);
let s = format!("{}", id);
let rid = s.parse::<IndexId>();
assert_eq!(rid, Ok(id));
}
// The string "ACCC" is rejected when parsed as an `IndexId`.
#[test()]
fn test_index_id_parse_failure() {
let s = "ACCC";
let rid = s.parse::<IndexId>();
assert!(rid.is_err());
}
}