use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::error;
use std::fs::File;
pub use std::fs::OpenOptions;
use std::io;
use std::io::{Read, Write};
use std::rc::Rc;
use std::sync;
use std::sync::Arc;
use address::Address;
use allocator::Allocator;
use config::Config;
use data_encoding::BASE32_DNSSEC;
use discref::DiscRef;
use fs2::FileExt;
use index::config::{IndexType, IndexTypeId, Indexes, ValueMode, INDEX_DATA_PREFIX, INDEX_META_PREFIX};
use index::keeper::{IndexKeeper, IndexRawIter};
use index::tree::{Index, PageIter, Value};
use journal::Journal;
use journal::JOURNAL_PAGE_EXP;
use record_scanner::{SegmentRawIter, TxSegmentRawIter};
use snapshot::{SnapshotId, Snapshots};
use std::collections::HashMap;
use std::fmt;
use std::ops::{Bound, RangeBounds};
use std::str;
use transaction::TxSegCheck::{CREATED, DROPPED, NONE};
use transaction::{Transaction, TxRead};
use unsigned_varint::decode::{u32 as u32_vdec, u64 as u64_vdec, Error as VarintError};
use unsigned_varint::encode::{u32 as u32_venc, u32_buffer, u64 as u64_venc, u64_buffer};
use {IndexId, PersyId, SegmentId};
/// Size exponent passed to `create_page_raw` / `load_page_raw` for the root page
/// (presumably the page is `2^10` bytes long — confirm against `DiscRef`).
const DEFAULT_PAGE_EXP: u8 = 10;
/// Reference to a record, made of a page number and a position value.
///
/// It can be rendered to and parsed from a base32 string through the `Display` and
/// `FromStr` implementations found in this file.
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Debug)]
pub struct RecRef {
    /// Page component of the reference.
    pub page: u64,
    /// Position component of the reference (presumably an offset inside `page` — confirm).
    pub pos: u32,
}
/// Core engine object bundling the components opened from a single storage file.
///
/// Instances are built by `PersyImpl::new` from the page numbers stored in the root page.
pub struct PersyImpl {
    /// Shared configuration, also handed to the allocator, address and index components.
    config: Arc<Config>,
    /// Transaction journal; replayed by `recover` when the file is opened.
    journal: Journal,
    /// Segment and record address bookkeeping.
    address: Address,
    /// Index registry, used for index read locking and during commit.
    indexes: Indexes,
    /// Page allocator, shared through an `Arc`.
    allocator: Arc<Allocator>,
    /// Registry of read snapshots.
    snapshots: Snapshots,
}
/// Handle returned by `PersyImpl::prepare_commit` for a transaction that is ready to be
/// committed or rolled back.
pub struct TxFinalize {
    /// The prepared transaction.
    transaction: Transaction,
    /// Set to `true` by `commit` / `rollback_prepared`; once set, both calls return
    /// immediately without doing anything.
    pub finished: bool,
}
/// State tracked for each transaction while the journal is replayed in `PersyImpl::recover`.
#[derive(PartialEq, Debug)]
pub enum RecoverStatus {
    /// Initial state of a transaction seen in the journal.
    Started,
    /// The transaction reached its prepare-commit record; only transactions in this state
    /// are committed (or rolled back on request) during recovery.
    PrepareCommit,
    /// The transaction was rolled back, or one of its records failed to recover.
    Rollback,
    /// A commit record was found for the transaction.
    Commit,
}
/// Descriptive information about an index, as returned by `list_indexes` and
/// `list_indexes_tx`.
#[derive(Clone)]
pub struct IndexInfo {
    /// Identifier of the index.
    pub id: IndexId,
    /// Value mode the index was created with.
    pub value_mode: ValueMode,
    /// Type identifier of the keys.
    pub key_type: IndexTypeId,
    /// Type identifier of the values.
    pub value_type: IndexTypeId,
}
/// Error type returned by every fallible operation in this module (see `PRes`).
#[derive(PartialEq, Debug)]
pub enum PersyError {
    /// An I/O failure; carries the formatted message of the original `io::Error`.
    IO(String),
    /// Generic error with a free-form message.
    Err(String),
    /// UTF-8 decoding failure.
    DecodingUTF(str::Utf8Error),
    /// Varint decoding failure.
    DecodingVarint(VarintError),
    /// Base32 (`data_encoding`) decoding failure.
    DecodingDataEncoding(data_encoding::DecodeError),
    /// The record version is not the latest one.
    VersionNotLastest,
    /// No readable record exists for the given id.
    RecordNotFound(PersyId),
    /// The named segment does not exist (or was dropped in the current transaction).
    SegmentNotFound,
    /// A segment with the same name already exists.
    SegmentAlreadyExists,
    /// A segment cannot be created and dropped in the same transaction.
    CannotDropSegmentCreatedInTx,
    /// A lock was poisoned.
    Lock,
    /// The index page element limits are not consistent with each other.
    IndexMinElementsShouldBeAtLeastDoubleOfMax,
    /// The named index does not exist.
    IndexNotFound,
    /// The types used with an index do not match its persistent types.
    IndexTypeMismatch(String),
    /// A duplicate key was found; carries the index name and the key, in that order.
    IndexDuplicateKey(String, String),
}
/// Result alias used throughout this module, with `PersyError` as the error type.
pub type PRes<T> = Result<T, PersyError>;
impl PersyImpl {
    /// Creates a new storage file at `path` and initialises it.
    ///
    /// The file is opened with `create_new`, so the call fails when something already
    /// exists at `path`.
    pub fn create<P: Into<String>>(path: P) -> PRes<()> {
        let location: String = path.into();
        let mut options = OpenOptions::new();
        options.write(true).read(true).create_new(true);
        let file: File = options.open(location)?;
        PersyImpl::create_from_file(file)
    }
    /// Initialises an already opened file as a storage file.
    ///
    /// An exclusive lock is attempted on the file first; failure to obtain it is returned
    /// as an error.
    pub fn create_from_file(f: File) -> PRes<()> {
        f.try_lock_exclusive()?;
        PersyImpl::init_file(f)
    }
    /// Writes the initial on-disk layout into a fresh file.
    ///
    /// Creates the root page, initialises the allocator, address and journal structures,
    /// stores their page numbers in the root page and finally syncs the disc.
    fn init_file(fl: File) -> PRes<()> {
        let disc = DiscRef::new(fl);
        // First page created on the fresh disc; `new` reads this header back from page 0,
        // so this is presumably always page 0 — confirm against `DiscRef`.
        let root_page = disc.create_page_raw(DEFAULT_PAGE_EXP)?;
        let allocator_page = Allocator::init(&disc)?;
        // A default configuration is enough here: the allocator is only used to set up the file.
        let allocator = &Allocator::new(disc, &Rc::new(Config::new()), allocator_page)?;
        let address_page = Address::init(allocator)?;
        let journal_page = Journal::init(allocator)?;
        {
            let mut root = allocator.disc().load_page_raw(root_page, DEFAULT_PAGE_EXP)?;
            // Leading u16 of the header, written as 0 (presumably a format version — confirm).
            root.write_u16::<BigEndian>(0)?;
            // Page number of the address structure.
            root.write_u64::<BigEndian>(address_page)?;
            // Page numbers of the journal and of the allocator, in the order `new` reads them.
            root.write_u64::<BigEndian>(journal_page)?;
            root.write_u64::<BigEndian>(allocator_page)?;
            allocator.flush_page(&mut root)?;
            
        }
        allocator.disc().sync()?;
        Ok(())
    }
    /// Builds the engine on top of an already initialised file.
    ///
    /// The root page (page 0) is read to find the address, journal and allocator pages,
    /// then every component is created from those page numbers.
    fn new(file: File, config: Config) -> PRes<PersyImpl> {
        let disc = DiscRef::new(file);
        let (address_page, journal_page, allocator_page) = {
            let mut root = disc.load_page_raw(0, DEFAULT_PAGE_EXP)?;
            // The leading u16 of the header is read and discarded.
            root.read_u16::<BigEndian>()?;
            let address_at = root.read_u64::<BigEndian>()?;
            let journal_at = root.read_u64::<BigEndian>()?;
            let allocator_at = root.read_u64::<BigEndian>()?;
            (address_at, journal_at, allocator_at)
        };
        let shared_config = Arc::new(config);
        let allocator = Arc::new(Allocator::new(disc, &shared_config, allocator_page)?);
        let address = Address::new(&allocator, &shared_config, address_page)?;
        let journal = Journal::new(&allocator, journal_page)?;
        let indexes = Indexes::new(&shared_config);
        let snapshots = Snapshots::new();
        Ok(PersyImpl {
            config: shared_config,
            journal,
            address,
            indexes,
            allocator,
            snapshots,
        })
    }
    /// Replays the journal and settles every transaction found in it.
    ///
    /// Transactions whose final status is `PrepareCommit` are committed when
    /// `check_if_commit` returns `true` for their meta id, and rolled back otherwise.
    /// Transactions that never reached the prepare-commit record are rolled back.
    /// Afterwards the journal is cleared up to the last committed transaction, the free
    /// list is flushed and the disc is synced.
    fn recover<C>(&self, check_if_commit: C) -> PRes<()>
    where
        C: Fn(&Vec<u8>) -> bool,
    {
        let mut last_id = None;
        // Ids in the order their prepare-commit records were met in the journal.
        let mut commit_order = Vec::new();
        // Per-transaction recovery state: (status, rebuilt transaction).
        let mut transactions = HashMap::new();
        let journal = &self.journal;
        let jp = journal.recover(|record, id| {
            let tx = transactions
                .entry(id.clone())
                .or_insert_with(|| (RecoverStatus::Started, Transaction::recover(id.clone())));
            let res = record.recover(&mut tx.1);
            if res.is_err() {
                // A record that cannot be recovered marks the whole transaction for rollback.
                tx.0 = RecoverStatus::Rollback;
            } else {
                // `Rollback` is sticky: once set, no later record changes the status.
                match res.unwrap() {
                    RecoverStatus::Started => {
                        if tx.0 != RecoverStatus::Rollback {
                            tx.0 = RecoverStatus::Started;
                        }
                    }
                    RecoverStatus::PrepareCommit => {
                        if tx.0 != RecoverStatus::Rollback {
                            tx.0 = RecoverStatus::PrepareCommit;
                            commit_order.push(id.clone());
                        }
                    }
                    RecoverStatus::Rollback => {
                        tx.0 = RecoverStatus::Rollback;
                    }
                    RecoverStatus::Commit => {
                        if tx.0 != RecoverStatus::Rollback {
                            tx.0 = RecoverStatus::Commit;
                        }
                    }
                }
            }
        })?;
        let allocator = &self.allocator;
        let address = &self.address;
        let indexes = &self.indexes;
        let snapshots = &self.snapshots;
        for id in commit_order {
            // Every id in `commit_order` is removed from the map; entries whose final status
            // is not `PrepareCommit` are dropped here without further action.
            if let Some(mut rec) = transactions.remove(&id) {
                if rec.0 == RecoverStatus::PrepareCommit {
                    if check_if_commit(rec.1.meta_id()) {
                        rec.1.recover_prepare_commit(journal, address, snapshots, allocator)?;
                        rec.1.recover_commit(journal, address, indexes, snapshots, allocator)?;
                        last_id = Some(id.clone());
                    } else {
                        rec.1.recover_rollback(journal, address, snapshots, allocator)?;
                    }
                }
            }
        }
        // Pages reported by the journal replay are taken out of the free list
        // (presumably because the journal still uses them — confirm against `Journal::recover`).
        for p in jp {
            allocator.remove_from_free(p, JOURNAL_PAGE_EXP)?;
        }
        // Whatever is still in the map never entered `commit_order`: roll it back.
        for (_, rec) in transactions.iter_mut() {
            rec.1.recover_rollback(journal, address, snapshots, allocator)?;
        }
        if let Some(id) = last_id {
            self.journal.clear(&id)?;
        }
        allocator.flush_free_list()?;
        allocator.disc().sync()?;
        Ok(())
    }
    /// Opens the storage file at `path`, committing every prepared transaction found
    /// during recovery.
    pub fn open<P: Into<String>>(path: P, config: Config) -> PRes<PersyImpl> {
        let always_commit = |_: &Vec<u8>| true;
        PersyImpl::open_with_recover(path, config, always_commit)
    }
    /// Opens a storage from an already opened file, committing every prepared transaction
    /// found during recovery.
    pub fn open_from_file(f: File, config: Config) -> PRes<PersyImpl> {
        let always_commit = |_: &Vec<u8>| true;
        PersyImpl::open_from_file_with_recover(f, config, always_commit)
    }
    /// Opens the existing storage file at `path`; `recover` decides, from the meta id of
    /// each prepared transaction, whether it gets committed during recovery.
    ///
    /// The file is never created nor truncated by this call.
    pub fn open_with_recover<C, P: Into<String>>(path: P, config: Config, recover: C) -> PRes<PersyImpl>
    where
        C: Fn(&Vec<u8>) -> bool,
    {
        let location: String = path.into();
        let mut options = OpenOptions::new();
        options.write(true).read(true).create(false).truncate(false);
        let file = options.open(location)?;
        PersyImpl::open_from_file_with_recover(file, config, recover)
    }
    /// Locks the file exclusively, builds the engine on it and runs the recovery with the
    /// given commit decision callback.
    pub fn open_from_file_with_recover<C>(f: File, config: Config, recover: C) -> PRes<PersyImpl>
    where
        C: Fn(&Vec<u8>) -> bool,
    {
        f.try_lock_exclusive()?;
        let engine = PersyImpl::new(f, config)?;
        match engine.recover(recover) {
            Ok(()) => Ok(engine),
            Err(failure) => Err(failure),
        }
    }
    /// Starts a transaction tagged with `meta_id`, using the transaction strategy from
    /// the configuration.
    pub fn begin_id(&self, meta_id: Vec<u8>) -> PRes<Transaction> {
        let started = Transaction::new(&self.journal, self.config.tx_strategy(), meta_id)?;
        Ok(started)
    }
    /// Starts a transaction with an empty meta id.
    pub fn begin(&self) -> PRes<Transaction> {
        let no_meta: Vec<u8> = Vec::new();
        self.begin_id(no_meta)
    }
    /// Registers the creation of `segment` inside `tx`.
    ///
    /// Fails with `SegmentAlreadyExists` when the segment was already created in this
    /// transaction or, if the transaction knows nothing about it, when it exists in the
    /// address structure. A segment dropped in this transaction can be created again.
    pub fn create_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<()> {
        let already_present = match tx.exists_segment(segment) {
            DROPPED => false,
            CREATED(_) => true,
            NONE => self.address.exists_segment(segment)?,
        };
        if already_present {
            return Err(PersyError::SegmentAlreadyExists);
        }
        let new_id = self.address.create_temp_segment(segment)?;
        tx.add_create_segment(&self.journal, segment, new_id)?;
        Ok(())
    }
    /// Registers the drop of `segment` inside `tx`; fails with `SegmentNotFound` when the
    /// segment is not visible from the transaction.
    pub fn drop_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<()> {
        let segment_id = self.check_segment_tx(tx, segment)?.1;
        tx.add_drop_segment(&self.journal, segment, segment_id)?;
        Ok(())
    }
    /// Tells whether `segment` exists according to the address structure.
    pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
        let address = &self.address;
        address.exists_segment(segment)
    }
    /// Tells whether `segment` exists as seen from `tx`: creations and drops registered
    /// in the transaction win over the address structure.
    pub fn exists_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<bool> {
        let known_by_tx = match tx.exists_segment(segment) {
            NONE => return self.address.exists_segment(segment),
            CREATED(_) => true,
            DROPPED => false,
        };
        Ok(known_by_tx)
    }
    /// Tells whether an index named `index` exists, by looking for its metadata segment.
    pub fn exists_index(&self, index: &str) -> PRes<bool> {
        let meta_segment = format!("{}{}", INDEX_META_PREFIX, index);
        self.exists_segment(&meta_segment)
    }
    /// Tells whether an index named `index` exists as seen from `tx`, by looking for its
    /// metadata segment.
    pub fn exists_index_tx(&self, tx: &Transaction, index: &str) -> PRes<bool> {
        let meta_segment = format!("{}{}", INDEX_META_PREFIX, index);
        self.exists_segment_tx(tx, &meta_segment)
    }
    
    
    
    /// Resolves `segment` to its id as seen from `tx`.
    ///
    /// Returns `(true, id)` when the segment was created inside the transaction and
    /// `(false, id)` when it comes from the address structure; fails with
    /// `SegmentNotFound` when it was dropped in the transaction or does not exist.
    fn check_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<(bool, u32)> {
        match tx.exists_segment(segment) {
            CREATED(segment_id) => Ok((true, segment_id)),
            DROPPED => Err(PersyError::SegmentNotFound),
            NONE => self
                .address
                .segment_id(segment)?
                .map(|id| (false, id))
                .ok_or(PersyError::SegmentNotFound),
        }
    }
    /// Inserts `rec` as a new record of `segment` inside `tx` and returns its reference.
    ///
    /// A page large enough for the content is allocated, the record slot is reserved
    /// (temporarily when the segment itself was created in this transaction), the insert
    /// is registered in the transaction and the content is written to the page prefixed
    /// by its length as a big-endian `u64`.
    pub fn insert_record(&self, tx: &mut Transaction, segment: &str, rec: &[u8]) -> PRes<RecRef> {
        let (created_in_tx, segment_id) = self.check_segment_tx(tx, segment)?;
        let content_len = rec.len() as u64;
        let page = self.allocator.allocate(exp_from_content_size(content_len))?;
        let rec_ref;
        if created_in_tx {
            rec_ref = self.address.allocate_temp(segment_id)?;
        } else {
            rec_ref = self.address.allocate(segment_id)?;
        }
        tx.add_insert(&self.journal, segment_id, &rec_ref, page)?;
        let mut target = self.allocator.write_page(page)?;
        target.write_u64::<BigEndian>(content_len)?;
        target.write_all(rec)?;
        self.allocator.flush_page(&mut target)?;
        Ok(rec_ref)
    }
    /// Looks up where `rec_ref` currently lives, as seen from `tx`.
    ///
    /// Returns `(page, version, segment_id)`; changes registered in the transaction win
    /// over the address structure, and a record deleted in the transaction yields `None`.
    fn read_ref_segment(&self, tx: &Transaction, segment_id: u32, rec_ref: &RecRef) -> PRes<Option<(u64, u16, u32)>> {
        let located = match tx.read(rec_ref) {
            TxRead::DELETED => None,
            TxRead::RECORD(in_tx) => Some((in_tx.0, in_tx.1, segment_id)),
            TxRead::NONE => match self.address.read(rec_ref, segment_id)? {
                Some((pos, version)) => Some((pos, version, segment_id)),
                None => None,
            },
        };
        Ok(located)
    }
    /// Same as `read_ref_segment`, resolving the segment by name first.
    fn read_ref(&self, tx: &Transaction, segment: &str, rec_ref: &RecRef) -> PRes<Option<(u64, u16, u32)>> {
        let segment_id = self.check_segment_tx(tx, segment)?.1;
        self.read_ref_segment(tx, segment_id, rec_ref)
    }
    /// Reads the content stored in `page`: a big-endian `u64` length followed by at most
    /// that many bytes of content.
    fn read_page(&self, page: u64) -> PRes<Vec<u8>> {
        let mut source = self.allocator.load_page(page)?;
        let len = source.read_u64::<BigEndian>()?;
        let mut content: Vec<u8> = Vec::with_capacity(len as usize);
        let mut limited = source.take(len);
        limited.read_to_end(&mut content)?;
        Ok(content)
    }
    /// Reads the content of `rec_ref` in the segment `segment_id` as seen from `tx`,
    /// without registering the read in the transaction.
    pub fn read_record_scan_tx(&self, tx: &Transaction, segment_id: u32, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
        match self.read_ref_segment(tx, segment_id, rec_ref)? {
            Some((page, _, _)) => Ok(Some(self.read_page(page)?)),
            None => Ok(None),
        }
    }
    /// Reads the content of `rec_ref` in `segment` as seen from `tx`, registering the
    /// read (segment id and record version) in the transaction.
    pub fn read_record_tx(&self, tx: &mut Transaction, segment: &str, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
        match self.read_ref(tx, segment, rec_ref)? {
            Some((page, version, segment_id)) => {
                tx.add_read(&self.journal, segment_id, rec_ref, version)?;
                Ok(Some(self.read_page(page)?))
            }
            None => Ok(None),
        }
    }
    /// Reads the committed content of `rec_ref` in `segment`; fails with
    /// `SegmentNotFound` when the segment does not exist.
    pub fn read_record(&self, segment: &str, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
        match self.address.segment_id(segment)? {
            Some(segment_id) => self.read_record_scan(segment_id, rec_ref),
            None => Err(PersyError::SegmentNotFound),
        }
    }
    /// Reads the content of `rec_ref` in `segment` as seen from `snapshot`.
    ///
    /// The version kept by the snapshot is used when there is one, otherwise the address
    /// structure is consulted. Fails with `SegmentNotFound` when the segment does not exist.
    pub fn read_record_snapshot(&self, segment: &str, rec_ref: &RecRef, snapshot: SnapshotId) -> PRes<Option<Vec<u8>>> {
        let segment_id = match self.address.segment_id(segment)? {
            Some(id) => id,
            None => return Err(PersyError::SegmentNotFound),
        };
        if let Some(rec_vers) = self.snapshots.read(snapshot, rec_ref)? {
            return Ok(Some(self.read_page(rec_vers.pos)?));
        }
        match self.address.read(rec_ref, segment_id)? {
            Some((page, _)) => Ok(Some(self.read_page(page)?)),
            None => Ok(None),
        }
    }
    /// Reads the committed content of `rec_ref` in the segment `segment_id`.
    pub fn read_record_scan(&self, segment_id: u32, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
        match self.address.read(rec_ref, segment_id)? {
            Some((page, _)) => Ok(Some(self.read_page(page)?)),
            None => Ok(None),
        }
    }
    /// Returns a raw iterator over the records of `segment`; fails with
    /// `SegmentNotFound` when the segment does not exist.
    pub fn scan(&self, segment: &str) -> PRes<SegmentRawIter> {
        let segment_id = match self.address.segment_id(segment)? {
            Some(id) => id,
            None => return Err(PersyError::SegmentNotFound),
        };
        let address_scan = self.address.scan(segment_id)?;
        Ok(SegmentRawIter::new(segment_id, address_scan))
    }
    /// Returns a raw iterator over the records of `segment` bound to the transaction `tx`.
    pub fn scan_tx<'a>(&'a self, tx: &'a Transaction, segment: &str) -> PRes<TxSegmentRawIter> {
        let segment_id = self.check_segment_tx(tx, segment)?.1;
        let address_scan = self.address.scan(segment_id)?;
        Ok(TxSegmentRawIter::new(tx, segment_id, address_scan))
    }
    /// Replaces the content of `rec_ref` in `segment` with `rec` inside `tx`.
    ///
    /// A new page is allocated for the new content and the update is registered in the
    /// transaction together with the previous page and version. Fails with
    /// `RecordNotFound` when the record is not visible from the transaction.
    pub fn update_record(&self, tx: &mut Transaction, segment: &str, rec_ref: &RecRef, rec: &[u8]) -> PRes<()> {
        let (old_page, old_version, segment_id) = match self.read_ref(tx, segment, rec_ref)? {
            Some(found) => found,
            None => return Err(PersyError::RecordNotFound(PersyId(rec_ref.clone()))),
        };
        let content_len = rec.len() as u64;
        let page = self.allocator.allocate(exp_from_content_size(content_len))?;
        tx.add_update(&self.journal, segment_id, rec_ref, page, old_page, old_version)?;
        let mut target = self.allocator.write_page(page)?;
        target.write_u64::<BigEndian>(content_len)?;
        target.write_all(rec)?;
        self.allocator.flush_page(&mut target)?;
        Ok(())
    }
    /// Registers the deletion of `rec_ref` in `segment` inside `tx`; fails with
    /// `RecordNotFound` when the record is not visible from the transaction.
    pub fn delete_record(&self, tx: &mut Transaction, segment: &str, rec_ref: &RecRef) -> PRes<()> {
        match self.read_ref(tx, segment, rec_ref)? {
            Some((old_page, old_version, segment_id)) => {
                tx.add_delete(&self.journal, segment_id, rec_ref, old_page, old_version)?;
                Ok(())
            }
            None => Err(PersyError::RecordNotFound(PersyId(rec_ref.clone()))),
        }
    }
    /// Rolls back a transaction that has not been prepared for commit.
    pub fn rollback(&self, mut tx: Transaction) -> PRes<()> {
        tx.rollback(&self.journal, &self.address, &self.snapshots, &self.allocator)
    }
    /// Runs the prepare-commit phase of `tx` and wraps the result in a `TxFinalize`
    /// that can later be committed or rolled back.
    pub fn prepare_commit(&self, mut tx: Transaction) -> PRes<TxFinalize> {
        tx = tx.prepare_commit(
            &self.journal,
            &self.address,
            &self.indexes,
            &self.snapshots,
            self,
            &self.allocator,
        )?;
        let finalizer = TxFinalize {
            transaction: tx,
            finished: false,
        };
        Ok(finalizer)
    }
    /// Rolls back a prepared transaction.
    ///
    /// Does nothing when the finalizer was already used; otherwise it is marked as
    /// finished before the rollback runs.
    pub fn rollback_prepared(&self, finalizer: &mut TxFinalize) -> PRes<()> {
        if !finalizer.finished {
            finalizer.finished = true;
            finalizer.transaction.rollback_prepared(
                &self.journal,
                &self.address,
                &self.indexes,
                &self.snapshots,
                &self.allocator,
            )
        } else {
            Ok(())
        }
    }
    /// Commits a prepared transaction.
    ///
    /// Does nothing when the finalizer was already used; otherwise it is marked as
    /// finished before the commit runs.
    pub fn commit(&self, finalizer: &mut TxFinalize) -> PRes<()> {
        if !finalizer.finished {
            finalizer.finished = true;
            finalizer.transaction.commit(
                &self.address,
                &self.journal,
                &self.indexes,
                &self.snapshots,
                &self.allocator,
            )
        } else {
            Ok(())
        }
    }
    /// Creates the index `index_name` inside `tx` with key type `K`, value type `V` and
    /// the given value mode; the two page element limits are fixed to 32 and 128.
    pub fn create_index<K, V>(&self, tx: &mut Transaction, index_name: &str, value_mode: ValueMode) -> PRes<()>
    where
        K: IndexType,
        V: IndexType,
    {
        let name = index_name.to_string();
        Indexes::create_index::<K, V>(self, tx, &name, 32, 128, value_mode)
    }
    /// Drops the index `index_name` inside `tx`.
    pub fn drop_index(&self, tx: &mut Transaction, index_name: &str) -> PRes<()> {
        let name = index_name.to_string();
        Indexes::drop_index(self, tx, &name)
    }
    /// Registers in `tx` the association of `v` to `k` for the index `index_name`,
    /// after checking that the index exists with matching types.
    pub fn put<K, V>(&self, tx: &mut Transaction, index_name: &str, k: K, v: V) -> PRes<()>
    where
        K: IndexType,
        V: IndexType,
    {
        Indexes::check_and_get_index::<K, V>(self, Some(tx), index_name)?;
        let name = index_name.to_string();
        tx.add_put(&name, k, v);
        Ok(())
    }
    /// Registers in `tx` the removal of `k` (optionally restricted to the value `v`)
    /// from the index `index_name`, after checking that the index exists with matching
    /// types.
    pub fn remove<K, V>(&self, tx: &mut Transaction, index_name: &str, k: K, v: Option<V>) -> PRes<()>
    where
        K: IndexType,
        V: IndexType,
    {
        Indexes::check_and_get_index::<K, V>(self, Some(tx), index_name)?;
        let name = index_name.to_string();
        tx.add_remove(&name, k, v);
        Ok(())
    }
    /// Looks up `k` in the index `index_name` as seen from `tx`.
    ///
    /// The persistent value is read under the index read lock, then the changes
    /// registered in the transaction for that key are applied on top of it.
    ///
    /// The read lock is always given back once it has been taken, even when the lookup
    /// itself fails; in that case the lookup error is the one returned.
    pub fn get_tx<K, V>(&self, tx: &mut Transaction, index_name: &str, k: &K) -> PRes<Option<Value<V>>>
    where
        K: IndexType,
        V: IndexType,
    {
        let lookup: PRes<Option<Value<V>>>;
        let vm;
        {
            let mut ik = Indexes::check_and_get_index_keeper::<K, V>(self, Some(tx), None, index_name)?;
            self.indexes.read_lock(index_name.to_string())?;
            // Do not use `?` here: the lock taken just above must be released first.
            lookup = ik.get(k);
            vm = IndexKeeper::<K, V>::value_mode(&ik);
        }
        let unlocked = self.indexes.read_unlock(index_name.to_string());
        let result = lookup?;
        unlocked?;
        tx.apply_changes::<K, V>(vm, &index_name.to_string(), k, result)
    }
    /// Looks up `k` in the committed state of the index `index_name`.
    ///
    /// A read snapshot is taken for the duration of the lookup and released through
    /// `release`, so that the pages and journal entries that become reclaimable with it
    /// are actually freed and cleared. The snapshot is released even when the index
    /// cannot be opened or the lookup fails.
    pub fn get<K, V>(&self, index_name: &str, k: &K) -> PRes<Option<Value<V>>>
    where
        K: IndexType,
        V: IndexType,
    {
        let read_snapshot = self.snapshots.read_snapshot()?;
        let result: PRes<Option<Value<V>>> =
            match Indexes::check_and_get_index_keeper::<K, V>(self, None, Some(read_snapshot), index_name) {
                Ok(mut ik) => ik.get(k),
                Err(e) => Err(e.into()),
            };
        self.release(read_snapshot)?;
        result
    }
    /// Loads the index node referenced by `next` for the index `index_name`, reading
    /// through the snapshot `read_snapshot`, and returns an iterator over it.
    pub fn index_next<K, V>(&self, index_name: &str, read_snapshot: SnapshotId, next: &RecRef) -> PRes<PageIter<K, V>>
    where
        K: IndexType,
        V: IndexType,
    {
        let snapshot = Some(read_snapshot);
        let mut keeper = Indexes::check_and_get_index_keeper::<K, V>(self, None, snapshot, index_name)?;
        keeper.iter_node(next)
    }
    /// Starts a range scan over the index `index_name`.
    ///
    /// Returns the value mode of the index together with a raw iterator positioned at
    /// the start bound of `range`; the end bound is cloned into the iterator. A read
    /// snapshot is taken and handed over to the returned iterator.
    ///
    /// When the index cannot be opened or the first positioning fails, the snapshot is
    /// released before the error is returned, since nothing else would release it.
    pub fn range<K, V, R>(&self, index_name: &str, range: R) -> PRes<(ValueMode, IndexRawIter<K, V>)>
    where
        K: IndexType,
        V: IndexType,
        R: RangeBounds<K>,
    {
        let read_snapshot = self.snapshots.read_snapshot()?;
        let mut ik = match Indexes::check_and_get_index_keeper::<K, V>(self, None, Some(read_snapshot), index_name) {
            Ok(keeper) => keeper,
            Err(e) => {
                // Best effort: the original failure is the one worth reporting.
                let _ = self.release(read_snapshot);
                return Err(e.into());
            }
        };
        let after = match ik.iter_from(range.start_bound()) {
            Ok(start) => start,
            Err(e) => {
                // Best effort: the original failure is the one worth reporting.
                let _ = self.release(read_snapshot);
                return Err(e.into());
            }
        };
        let vm = IndexKeeper::<K, V>::value_mode(&ik);
        let end = match range.end_bound() {
            Bound::Included(x) => Bound::Included(x.clone()),
            Bound::Excluded(x) => Bound::Excluded(x.clone()),
            Bound::Unbounded => Bound::Unbounded,
        };
        Ok((vm, IndexRawIter::new(index_name, read_snapshot, after, end)))
    }
    /// Releases the snapshot `snapshot_id`, then frees every page and clears every
    /// journal entry that the snapshot registry reports as reclaimable.
    pub fn release(&self, snapshot_id: SnapshotId) -> PRes<()> {
        let reclaimable = self.snapshots.release(snapshot_id)?;
        let (pages, journal_ids) = reclaimable;
        for page in pages {
            self.allocator.free(page)?;
        }
        for journal_id in journal_ids {
            self.journal.clear(&journal_id)?;
        }
        Ok(())
    }
    /// Gives access to the address component.
    pub fn address(&self) -> &Address {
        let address = &self.address;
        address
    }
    /// Lists name and id of every segment, leaving out the segments that back indexes
    /// (names starting with the index metadata or index data prefix).
    pub fn list_segments(&self) -> PRes<Vec<(String, u32)>> {
        let mut segments = Vec::new();
        for (name, id) in self.address.list()? {
            let backs_an_index = name.starts_with(INDEX_META_PREFIX) || name.starts_with(INDEX_DATA_PREFIX);
            if !backs_an_index {
                segments.push((name, id));
            }
        }
        Ok(segments)
    }
    /// Lists name and information of every index, found through the segments whose name
    /// starts with the index metadata prefix; the prefix is stripped from the returned
    /// names.
    pub fn list_indexes(&self) -> PRes<Vec<(String, IndexInfo)>> {
        let mut indexes = Vec::new();
        for (mut name, id) in self.address.list()? {
            if !name.starts_with(INDEX_META_PREFIX) {
                continue;
            }
            name.drain(..INDEX_META_PREFIX.len());
            let info = self.index_info(None, &name, id)?;
            indexes.push((name, info));
        }
        Ok(indexes)
    }
    /// Lists name and id of every segment as seen from `tx`, leaving out the segments
    /// that back indexes.
    pub fn list_segments_tx(&self, tx: &Transaction) -> PRes<Vec<(String, u32)>> {
        let mut segments = Vec::new();
        for (name, id) in tx.filter_list(self.address.list()?) {
            let backs_an_index = name.starts_with(INDEX_META_PREFIX) || name.starts_with(INDEX_DATA_PREFIX);
            if !backs_an_index {
                segments.push((name, id));
            }
        }
        Ok(segments)
    }
    /// Lists name and information of every index as seen from `tx`; the index metadata
    /// prefix is stripped from the returned names.
    pub fn list_indexes_tx(&self, tx: &Transaction) -> PRes<Vec<(String, IndexInfo)>> {
        let mut indexes = Vec::new();
        for (mut name, id) in tx.filter_list(self.address.list()?) {
            if !name.starts_with(INDEX_META_PREFIX) {
                continue;
            }
            name.drain(..INDEX_META_PREFIX.len());
            let info = self.index_info(Some(tx), &name, id)?;
            indexes.push((name, info));
        }
        Ok(indexes)
    }
    /// Builds the `IndexInfo` of the index `name`, whose id is `id`, from its stored
    /// configuration.
    fn index_info(&self, tx: Option<&Transaction>, name: &str, id: u32) -> PRes<IndexInfo> {
        let stored = Indexes::get_index(self, tx, name)?;
        let info = IndexInfo {
            id: IndexId(id),
            value_mode: stored.value_mode,
            key_type: IndexTypeId::from(stored.key_type),
            value_type: IndexTypeId::from(stored.value_type),
        };
        Ok(info)
    }
}
/// Returns the smallest exponent `e` such that `size` plus the per-record overhead is
/// strictly smaller than `2^e`.
///
/// The overhead is 10 bytes: 8 for the `u64` length written in front of the content (see
/// `insert_record`) plus 2 more bytes (presumably page metadata — confirm against the
/// allocator).
///
/// The addition saturates instead of overflowing, and the exponent is derived from the
/// bit length of the total, so the function terminates for every input; the result is
/// at most 64.
pub fn exp_from_content_size(size: u64) -> u8 {
    let final_size = size.saturating_add(8 + 2);
    // Bit length of `final_size`: 2^(bits - 1) <= final_size < 2^bits.
    // `final_size` is at least 10, so `bits` is at least 4 and never above 64.
    let bits = 64 - final_size.leading_zeros();
    bits as u8
}
/// Converts an I/O error into `PersyError::IO`, keeping only its display message.
impl From<io::Error> for PersyError {
    fn from(erro: io::Error) -> PersyError {
        // `to_string` already goes through `Display`; no extra `format!` is needed.
        PersyError::IO(erro.to_string())
    }
}
/// Converts any poisoned-lock error into `PersyError::Lock`; the poisoned guard is dropped.
impl<T> From<sync::PoisonError<T>> for PersyError {
    fn from(_poisoned: sync::PoisonError<T>) -> PersyError {
        PersyError::Lock
    }
}
/// Wraps a UTF-8 decoding error into `PersyError::DecodingUTF`.
impl From<str::Utf8Error> for PersyError {
    fn from(utf8_failure: str::Utf8Error) -> PersyError {
        PersyError::DecodingUTF(utf8_failure)
    }
}
/// Wraps a `data_encoding` decoding error into `PersyError::DecodingDataEncoding`.
impl From<data_encoding::DecodeError> for PersyError {
    fn from(decode_failure: data_encoding::DecodeError) -> PersyError {
        PersyError::DecodingDataEncoding(decode_failure)
    }
}
/// Wraps a varint decoding error into `PersyError::DecodingVarint`.
impl From<VarintError> for PersyError {
    fn from(varint_failure: VarintError) -> PersyError {
        PersyError::DecodingVarint(varint_failure)
    }
}
/// Marks `PersyError` as a standard error type; only the default trait methods are used.
impl error::Error for PersyError {}
impl fmt::Display for PersyError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            PersyError::IO(m) => write!(f, "IO Error: {}", m),
            PersyError::Err(g) => write!(f, "Generic Error: {}", g),
            PersyError::DecodingUTF(e) => write!(f, "String decoding error: {}", e),
            PersyError::DecodingDataEncoding(e) => write!(f, "Data Encoding Decoding error: {}", e),
            PersyError::DecodingVarint(e) => write!(f, "Varint decoding error: {}", e),
            PersyError::VersionNotLastest => write!(f, "The record version is not latest"),
            PersyError::RecordNotFound(r) => write!(f, "Record not found: {}", r),
            PersyError::SegmentNotFound => write!(f, "Segment not found"),
            PersyError::SegmentAlreadyExists => write!(f, "Segment already exist"),
            PersyError::CannotDropSegmentCreatedInTx => {
                write!(f, "Create and drop of a segment in the same transaction is not allowed")
            }
            PersyError::Lock => write!(f, "Failure acquiring lock for poisoning"),
            PersyError::IndexMinElementsShouldBeAtLeastDoubleOfMax => write!(
                f,
                "Index min page elements should be maximum half of the maximum elements"
            ),
            PersyError::IndexNotFound => write!(f, "Index not found"),
            PersyError::IndexTypeMismatch(m) => write!(f, "Index method type mismatch persistent types: {}", m),
            PersyError::IndexDuplicateKey(i, k) => write!(f, "Found duplicate key:{} for index: {}", k, i),
        }
    }
}
impl RecRef {
    /// Builds a reference from its page and position components.
    pub fn new(page: u64, pos: u32) -> RecRef {
        RecRef { page: page, pos: pos }
    }
}
/// Renders the reference as base32 (DNSSEC alphabet) text: the varint of the page, a
/// `0b0101_0101` separator byte, then the varint of the position.
impl fmt::Display for RecRef {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut page_scratch = u64_buffer();
        let mut pos_scratch = u32_buffer();
        let mut raw: Vec<u8> = Vec::new();
        raw.write_all(u64_venc(self.page, &mut page_scratch))
            .expect("no failure expected only memory allocation");
        raw.push(0b0101_0101);
        raw.write_all(u32_venc(self.pos, &mut pos_scratch))
            .expect("no failure expected only memory allocation");
        let text = BASE32_DNSSEC.encode(&raw);
        write!(f, "{}", text)
    }
}
/// Parses the text produced by the `Display` implementation of `RecRef`.
///
/// The text is base32 decoded, then the page varint, one separator byte and the
/// position varint are read. The value of the separator byte is not checked.
///
/// Input that ends right after the page varint is reported as a varint decoding error
/// instead of panicking on an out-of-range slice.
impl std::str::FromStr for RecRef {
    type Err = PersyError;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let bytes = BASE32_DNSSEC.decode(s.as_bytes())?;
        let (page, rest) = u64_vdec(&bytes)?;
        // Skip the separator byte; when nothing follows the page varint, hand an empty
        // slice to the decoder so that it reports the truncated input as an error.
        let after_separator = rest.get(1..).unwrap_or(&[]);
        let (pos, _) = u32_vdec(after_separator)?;
        Ok(RecRef::new(page, pos))
    }
}
/// Writes `id` as base32 (DNSSEC alphabet) text: the varint of the id followed by a
/// `0b0101_0101` byte.
fn write_id(f: &mut fmt::Formatter, id: u32) -> fmt::Result {
    let mut scratch = u32_buffer();
    let mut raw: Vec<u8> = Vec::new();
    raw.write_all(u32_venc(id, &mut scratch))
        .expect("no failure expected only memory allocation");
    raw.push(0b0101_0101);
    let text = BASE32_DNSSEC.encode(&raw);
    write!(f, "{}", text)
}
/// Parses an id written by `write_id`: base32 decodes the text and reads the leading
/// varint, ignoring whatever follows it.
fn read_id(s: &str) -> PRes<u32> {
    let decoded = BASE32_DNSSEC.decode(s.as_bytes())?;
    let id = u32_vdec(&decoded)?.0;
    Ok(id)
}
/// Renders the segment id through `write_id`.
impl fmt::Display for SegmentId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let raw_id = self.0;
        write_id(f, raw_id)
    }
}
/// Parses a segment id from the text produced by its `Display` implementation.
impl std::str::FromStr for SegmentId {
    type Err = PersyError;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let raw_id = read_id(s)?;
        Ok(SegmentId(raw_id))
    }
}
/// Renders the index id through `write_id`.
impl fmt::Display for IndexId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let raw_id = self.0;
        write_id(f, raw_id)
    }
}
/// Parses an index id from the text produced by its `Display` implementation.
impl std::str::FromStr for IndexId {
    type Err = PersyError;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let raw_id = read_id(s)?;
        Ok(IndexId(raw_id))
    }
}
// Tests for the text form of the ids: formatting followed by parsing gives back the
// same value, and the string "ACCC" is rejected by every parser.
#[cfg(test)]
mod tests {
    use super::{IndexId, RecRef, SegmentId};
    /// A `RecRef` survives a format / parse round trip.
    #[test()]
    fn test_persy_id_string() {
        let id = RecRef::new(20, 30);
        let s = format!("{}", id);
        let rid = s.parse::<RecRef>();
        assert_eq!(rid, Ok(id));
    }
    /// Parsing "ACCC" as a `RecRef` fails.
    #[test()]
    fn test_persy_id_parse_failure() {
        let s = "ACCC";
        let rid = s.parse::<RecRef>();
        assert!(rid.is_err());
    }
    /// A `SegmentId` survives a format / parse round trip.
    #[test()]
    fn test_segmend_id_string() {
        let id = SegmentId(20);
        let s = format!("{}", id);
        let rid = s.parse::<SegmentId>();
        assert_eq!(rid, Ok(id));
    }
    /// Parsing "ACCC" as a `SegmentId` fails.
    #[test()]
    fn test_segment_id_parse_failure() {
        let s = "ACCC";
        let rid = s.parse::<SegmentId>();
        assert!(rid.is_err());
    }
    /// An `IndexId` survives a format / parse round trip.
    #[test()]
    fn test_index_id_string() {
        let id = IndexId(20);
        let s = format!("{}", id);
        let rid = s.parse::<IndexId>();
        assert_eq!(rid, Ok(id));
    }
    /// Parsing "ACCC" as an `IndexId` fails.
    #[test()]
    fn test_index_id_parse_failure() {
        let s = "ACCC";
        let rid = s.parse::<IndexId>();
        assert!(rid.is_err());
    }
}