//! persy 0.7.0
//!
//! Transactional Persistence Engine
//!
//! Documentation
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use fs2::FileExt;
pub use std::fs::OpenOptions;
use std::{
    collections::HashMap,
    fs::File,
    io::{Read, Write},
    ops::RangeBounds,
    path::Path,
    rc::Rc,
    str,
    sync::Arc,
};

pub use crate::id::RecRef;
use crate::index::{
    config::{IndexType, IndexTypeId, Indexes, ValueMode, INDEX_DATA_PREFIX, INDEX_META_PREFIX},
    keeper::{IndexKeeper, IndexRawIter},
    tree::{Index, PageIter, PageIterBack, Value},
};
use crate::transaction::{
    PreparedState, Transaction, TxRead,
    TxSegCheck::{CREATED, DROPPED, NONE},
};
use crate::{
    address::Address,
    allocator::Allocator,
    config::Config,
    discref::{DiscRef, PAGE_METADATA_SIZE},
    error::{PRes, PersyError},
    id::{IndexId, PersyId, SegmentId, ToSegmentId},
    journal::{Journal, JOURNAL_PAGE_EXP},
    record_scanner::{SegmentRawIter, TxSegmentRawIter},
    snapshot::{SnapshotId, Snapshots},
};

/// Size exponent of the root page, which is always page 0 (2^10 = 1024 bytes).
/// Used both when the root page is created and when it is read back on open.
const DEFAULT_PAGE_EXP: u8 = 10; // 2^10

/// Engine state of an open persy storage file.
///
/// Built by `PersyImpl::new` from the root page (page 0), which records where
/// the address table, the journal and the allocator start.
pub struct PersyImpl {
    // Shared configuration; also handed to the allocator, address table and indexes.
    config: Arc<Config>,
    // Transaction journal, replayed by `recover` when the file is opened.
    journal: Journal,
    // Segment and record address table: resolves a record reference to (page, version).
    address: Address,
    // Index registry; also provides the per-index read locks used by `get_tx`.
    indexes: Indexes,
    // Page allocator over the underlying file.
    allocator: Arc<Allocator>,
    // Registry of read snapshots; resolves record versions as of a snapshot.
    snapshots: Snapshots,
}

/// A transaction that went through `PersyImpl::prepare_commit` and now waits
/// for either `PersyImpl::commit` or `PersyImpl::rollback_prepared`.
pub struct TxFinalize {
    // The prepared transaction itself.
    transaction: Transaction,
    // State produced by the prepare phase; cloned into commit / rollback_prepared.
    prepared: PreparedState,
    /// Set by the first call to `commit` or `rollback_prepared`; any later call is a no-op.
    pub finished: bool,
}

/// Per-transaction state reconstructed from journal records during recovery.
#[derive(PartialEq, Debug)]
pub enum RecoverStatus {
    /// Operations were logged, but no prepare has been seen yet.
    Started,
    /// A prepare was logged; recovery commits the transaction if the recover
    /// callback accepts its meta id, otherwise it rolls it back.
    PrepareCommit,
    /// A rollback was logged, or a record failed to replay; once set it sticks.
    Rollback,
    /// A commit was logged; recovery takes no further action for it.
    Commit,
}

#[derive(Clone)]
/// Index definition details, as returned by `list_indexes` / `list_indexes_tx`.
pub struct IndexInfo {
    /// Id of the index: the id of the segment that stores its metadata.
    pub id: IndexId,
    /// Value mode the index was created with (see `ValueMode`).
    pub value_mode: ValueMode,
    /// Type id of the index keys.
    pub key_type: IndexTypeId,
    /// Type id of the index values.
    pub value_type: IndexTypeId,
}

impl PersyImpl {
    pub fn create(path: &Path) -> PRes<()> {
        let f = OpenOptions::new().write(true).read(true).create_new(true).open(path)?;
        PersyImpl::create_from_file(f)
    }

    pub fn create_from_file(f: File) -> PRes<()> {
        f.try_lock_exclusive()?;
        PersyImpl::init_file(f)?;
        Ok(())
    }

    fn init_file(fl: File) -> PRes<()> {
        let disc = DiscRef::new(fl);
        // root_page is every time 0
        let root_page = disc.create_page_raw(DEFAULT_PAGE_EXP)?;
        let allocator_page = Allocator::init(&disc)?;
        let allocator = &Allocator::new(disc, &Rc::new(Config::new()), allocator_page)?;
        let address_page = Address::init(allocator)?;
        let journal_page = Journal::init(allocator)?;
        {
            let mut root = allocator.disc().load_page_raw(root_page, DEFAULT_PAGE_EXP)?;
            // Version of the disc format
            root.write_u16::<BigEndian>(0)?;
            // Position of the start of address structure
            root.write_u64::<BigEndian>(address_page)?;
            // Start of the Log data, if shutdown well this will be every time 0
            root.write_u64::<BigEndian>(journal_page)?;
            root.write_u64::<BigEndian>(allocator_page)?;
            allocator.flush_page(&mut root)?;
            // TODO: check this never go over the first page
        }
        allocator.disc().sync()?;
        Ok(())
    }

    fn new(file: File, config: Config) -> PRes<PersyImpl> {
        let disc = DiscRef::new(file);
        let address_page;
        let journal_page;
        let allocator_page;
        {
            let mut pg = disc.load_page_raw(0, DEFAULT_PAGE_EXP)?;
            pg.read_u16::<BigEndian>()?; //THIS NOW is 0 all the time
            address_page = pg.read_u64::<BigEndian>()?;
            journal_page = pg.read_u64::<BigEndian>()?;
            allocator_page = pg.read_u64::<BigEndian>()?;
        }
        let config = Arc::new(config);
        let allocator = Arc::new(Allocator::new(disc, &config, allocator_page)?);
        let address = Address::new(&allocator, &config, address_page)?;
        let journal = Journal::new(&allocator, journal_page)?;
        let indexes = Indexes::new(&config);
        let snapshots = Snapshots::new();
        Ok(PersyImpl {
            config: config.clone(),
            journal,
            address,
            indexes,
            allocator,
            snapshots,
        })
    }

    fn recover<C>(&self, check_if_commit: C) -> PRes<()>
    where
        C: Fn(&Vec<u8>) -> bool,
    {
        let mut last_id = None;
        let mut commit_order = Vec::new();
        let mut transactions = HashMap::new();
        let journal = &self.journal;
        let jp = journal.recover(|record, id| {
            let tx = transactions
                .entry(id.clone())
                .or_insert_with(|| (RecoverStatus::Started, Transaction::recover(id.clone())));
            tx.0 = match record.recover(&mut tx.1) {
                Err(_) => RecoverStatus::Rollback,
                Ok(_) if tx.0 == RecoverStatus::Rollback => RecoverStatus::Rollback,
                Ok(x) => match x {
                    RecoverStatus::Started => RecoverStatus::Started,
                    RecoverStatus::PrepareCommit => {
                        commit_order.push(id.clone());
                        RecoverStatus::PrepareCommit
                    }
                    RecoverStatus::Rollback => RecoverStatus::Rollback,
                    RecoverStatus::Commit => RecoverStatus::Commit,
                },
            }
        })?;

        let allocator = &self.allocator;
        let address = &self.address;
        let indexes = &self.indexes;
        for id in commit_order {
            if let Some((status, mut tx)) = transactions.remove(&id) {
                if status == RecoverStatus::PrepareCommit {
                    if check_if_commit(tx.meta_id()) {
                        let prepared = tx.recover_prepare_commit(journal, address, allocator)?;
                        tx.recover_commit(journal, address, indexes, allocator, prepared)?;
                        last_id = Some(id);
                    } else {
                        tx.recover_rollback(journal, address, allocator)?;
                    }
                }
            }
        }
        for p in jp {
            allocator.remove_from_free(p, JOURNAL_PAGE_EXP)?;
        }

        for (_, (_, tx)) in transactions.iter_mut() {
            tx.recover_rollback(journal, address, allocator)?;
        }
        if let Some(id) = last_id {
            self.journal.clear_all(&[id])?;
        }
        allocator.flush_free_list()?;
        allocator.disc().sync()?;
        Ok(())
    }

    pub fn open(path: &Path, config: Config) -> PRes<PersyImpl> {
        PersyImpl::open_with_recover(path, config, |_| true)
    }

    pub fn open_from_file(f: File, config: Config) -> PRes<PersyImpl> {
        PersyImpl::open_from_file_with_recover(f, config, |_| true)
    }

    pub fn open_with_recover<C>(path: &Path, config: Config, recover: C) -> PRes<PersyImpl>
    where
        C: Fn(&Vec<u8>) -> bool,
    {
        let f = OpenOptions::new()
            .write(true)
            .read(true)
            .create(false)
            .truncate(false)
            .open(path)?;
        PersyImpl::open_from_file_with_recover(f, config, recover)
    }

    pub fn open_from_file_with_recover<C>(f: File, config: Config, recover: C) -> PRes<PersyImpl>
    where
        C: Fn(&Vec<u8>) -> bool,
    {
        f.try_lock_exclusive()?;
        let persy = PersyImpl::new(f, config)?;
        persy.recover(recover)?;
        Ok(persy)
    }

    pub fn begin_id(&self, meta_id: Vec<u8>) -> PRes<Transaction> {
        let journal = &self.journal;
        Ok(Transaction::new(journal, self.config.tx_strategy(), meta_id)?)
    }

    pub fn begin(&self) -> PRes<Transaction> {
        self.begin_id(Vec::new())
    }

    pub fn create_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<SegmentId> {
        match tx.exists_segment(segment) {
            DROPPED => {}
            CREATED(_) => {
                return Err(PersyError::SegmentAlreadyExists);
            }
            NONE => {
                if self.address.exists_segment(&segment)? {
                    return Err(PersyError::SegmentAlreadyExists);
                }
            }
        }
        let (segment_id, first_segment_page) = self.address.create_temp_segment(segment)?;
        tx.add_create_segment(&self.journal, segment, segment_id, first_segment_page)?;
        Ok(SegmentId::new(segment_id))
    }

    pub fn drop_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<()> {
        let (_, segment_id) = self.check_segment_tx(tx, segment)?;
        tx.add_drop_segment(&self.journal, segment, segment_id)?;
        Ok(())
    }

    pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
        self.address.exists_segment(segment)
    }

    pub fn exists_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<bool> {
        match tx.exists_segment(segment) {
            DROPPED => Ok(false),
            CREATED(_) => Ok(true),
            NONE => self.address.exists_segment(segment),
        }
    }
    pub fn exists_index(&self, index: &str) -> PRes<bool> {
        self.exists_segment(&format!("{}{}", INDEX_META_PREFIX, index))
    }

    pub fn exists_index_tx(&self, tx: &Transaction, index: &str) -> PRes<bool> {
        self.exists_segment_tx(tx, &format!("{}{}", INDEX_META_PREFIX, index))
    }

    pub fn segment2id(&self, segment: impl ToSegmentId) -> PRes<SegmentId> {
        segment.to_segment_id(&self.address)
    }

    pub fn segment2id_tx(&self, tx: &Transaction, segment: impl ToSegmentId) -> PRes<SegmentId> {
        let (sid, _) = segment.to_segment_id_tx(self, tx)?;
        Ok(sid)
    }

    /// check if a segment exist persistent or in tx.
    ///
    /// @return true if the segment was created in tx.
    pub fn check_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<(bool, u32)> {
        match tx.exists_segment(segment) {
            DROPPED => Err(PersyError::SegmentNotFound),
            CREATED(segment_id) => Ok((true, segment_id)),
            NONE => self
                .address
                .segment_id(segment)?
                .map_or(Err(PersyError::SegmentNotFound), |id| Ok((false, id))),
        }
    }

    pub fn insert_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, rec: &[u8]) -> PRes<RecRef> {
        let (segment_id, in_tx) = segment.to_segment_id_tx(self, tx)?;
        let len = rec.len() as u64;
        let allocation_exp = exp_from_content_size(len);
        let allocator = &self.allocator;
        let address = &self.address;
        let page = allocator.allocate(allocation_exp)?;
        let (rec_ref, maybe_new_page) = if in_tx {
            address.allocate_temp(segment_id.id)
        } else {
            address.allocate(segment_id.id)
        }?;
        tx.add_insert(&self.journal, segment_id.id, &rec_ref, page)?;
        if let Some(new_page) = maybe_new_page {
            tx.add_new_segment_page(&self.journal, segment_id.id, new_page.new_page, new_page.previus_page)?;
        }
        {
            let mut pg = allocator.write_page(page)?;
            pg.write_u64::<BigEndian>(len)?;
            pg.write_all(rec)?;
            allocator.flush_page(&mut pg)?;
        }
        Ok(rec_ref)
    }

    fn read_ref_segment(&self, tx: &Transaction, segment_id: u32, rec_ref: &RecRef) -> PRes<Option<(u64, u16, u32)>> {
        Ok(match tx.read(rec_ref) {
            TxRead::RECORD(rec) => Some((rec.0, rec.1, segment_id)),
            TxRead::DELETED => None,
            TxRead::NONE => self
                .address
                .read(rec_ref, segment_id)?
                .map(|(pos, version)| (pos, version, segment_id)),
        })
    }

    fn read_ref(&self, tx: &Transaction, segment: impl ToSegmentId, rec_ref: &RecRef) -> PRes<Option<(u64, u16, u32)>> {
        let (segment_id, _) = segment.to_segment_id_tx(self, tx)?;
        self.read_ref_segment(tx, segment_id.id, rec_ref)
    }

    fn read_page(&self, page: u64) -> PRes<Vec<u8>> {
        let mut pg = self.allocator.load_page(page)?;
        let len = pg.read_u64::<BigEndian>()?;
        let mut buffer = Vec::<u8>::with_capacity(len as usize);
        pg.take(len).read_to_end(&mut buffer)?;
        Ok(buffer)
    }

    pub fn read_record_scan_tx(&self, tx: &Transaction, segment_id: u32, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
        Ok(if let Some(page) = self.read_ref_segment(tx, segment_id, rec_ref)? {
            Some(self.read_page(page.0)?)
        } else {
            None
        })
    }

    pub fn read_record_tx(
        &self,
        tx: &mut Transaction,
        segment: impl ToSegmentId,
        rec_ref: &RecRef,
    ) -> PRes<Option<Vec<u8>>> {
        Ok(if let Some(page) = self.read_ref(tx, segment, rec_ref)? {
            tx.add_read(&self.journal, page.2, rec_ref, page.1)?;
            Some(self.read_page(page.0)?)
        } else {
            None
        })
    }

    pub fn read_record(&self, segment: impl ToSegmentId, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
        let segment_id = segment.to_segment_id(&self.address)?.id;
        self.read_record_scan(segment_id, rec_ref)
    }

    pub fn read_record_snapshot(
        &self,
        segment: impl ToSegmentId,
        rec_ref: &RecRef,
        snapshot: SnapshotId,
    ) -> PRes<Option<Vec<u8>>> {
        let segment_id = segment.to_segment_id(&self.address)?.id;
        Ok(if let Some(rec_vers) = self.snapshots.read(snapshot, rec_ref)? {
            Some(self.read_page(rec_vers.pos)?)
        } else if let Some((page, _)) = self.address.read(rec_ref, segment_id)? {
            Some(self.read_page(page)?)
        } else {
            None
        })
    }

    pub fn read_record_scan(&self, segment_id: u32, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
        Ok(if let Some((page, _)) = self.address.read(rec_ref, segment_id)? {
            Some(self.read_page(page)?)
        } else {
            None
        })
    }

    pub fn scan(&self, segment: impl ToSegmentId) -> PRes<SegmentRawIter> {
        let segment_id = segment.to_segment_id(&self.address)?.id;
        Ok(SegmentRawIter::new(segment_id, self.address.scan(segment_id)?))
    }

    pub fn scan_tx<'a>(&'a self, tx: &'a Transaction, segment: impl ToSegmentId) -> PRes<TxSegmentRawIter> {
        let (segment_id, _) = segment.to_segment_id_tx(self, tx)?;
        Ok(TxSegmentRawIter::new(
            tx,
            segment_id.id,
            self.address.scan(segment_id.id)?,
        ))
    }

    pub fn update_record(
        &self,
        tx: &mut Transaction,
        segment: impl ToSegmentId,
        rec_ref: &RecRef,
        rec: &[u8],
    ) -> PRes<()> {
        if let Some((_, version, segment)) = self.read_ref(tx, segment, rec_ref)? {
            let allocator = &self.allocator;
            let journal = &self.journal;
            let len = rec.len();
            let allocation_exp = exp_from_content_size(len as u64);
            let page = allocator.allocate(allocation_exp)?;
            tx.add_update(journal, segment, &rec_ref, page, version)?;
            let mut pg = allocator.write_page(page)?;
            pg.write_u64::<BigEndian>(len as u64)?;
            pg.write_all(rec)?;
            allocator.flush_page(&mut pg)
        } else {
            Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
        }
    }

    pub fn delete_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, rec_ref: &RecRef) -> PRes<()> {
        if let Some((_, version, seg)) = self.read_ref(tx, segment, rec_ref)? {
            tx.add_delete(&self.journal, seg, &rec_ref, version)
        } else {
            Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
        }
    }

    pub fn rollback(&self, mut tx: Transaction) -> PRes<()> {
        let allocator = &self.allocator;
        let journal = &self.journal;
        let address = &self.address;
        tx.rollback(journal, address, allocator)
    }

    pub fn prepare_commit(&self, tx: Transaction) -> PRes<TxFinalize> {
        let indexes = &self.indexes;
        let allocator = &self.allocator;
        let journal = &self.journal;
        let snapshots = &self.snapshots;
        let address = &self.address;
        let (tx, prepared) = tx.prepare_commit(journal, address, indexes, snapshots, self, allocator)?;

        Ok(TxFinalize {
            transaction: tx,
            prepared,
            finished: false,
        })
    }

    pub fn rollback_prepared(&self, finalizer: &mut TxFinalize) -> PRes<()> {
        if finalizer.finished {
            return Ok(());
        }
        finalizer.finished = true;
        let allocator = &self.allocator;
        let journal = &self.journal;
        let address = &self.address;
        let snapshots = &self.snapshots;
        let indexes = &self.indexes;
        let prepared = finalizer.prepared.clone();
        let tx = &mut finalizer.transaction;
        tx.rollback_prepared(journal, address, indexes, snapshots, allocator, prepared)
    }

    pub fn commit(&self, finalizer: &mut TxFinalize) -> PRes<()> {
        if finalizer.finished {
            return Ok(());
        }
        finalizer.finished = true;
        let allocator = &self.allocator;
        let journal = &self.journal;
        let indexes = &self.indexes;
        let snapshots = &self.snapshots;
        let address = &self.address;
        let prepared = finalizer.prepared.clone();
        let tx = &mut finalizer.transaction;
        tx.commit(address, journal, indexes, snapshots, allocator, prepared)
    }

    pub fn create_index<K, V>(&self, tx: &mut Transaction, index_name: &str, value_mode: ValueMode) -> PRes<()>
    where
        K: IndexType,
        V: IndexType,
    {
        Indexes::create_index::<K, V>(self, tx, index_name, 32, 128, value_mode)
    }

    pub fn drop_index(&self, tx: &mut Transaction, index_name: &str) -> PRes<()> {
        Indexes::drop_index(self, tx, index_name)
    }

    pub fn put<K, V>(&self, tx: &mut Transaction, index_name: &str, k: K, v: V) -> PRes<()>
    where
        K: IndexType,
        V: IndexType,
    {
        Indexes::check_and_get_index::<K, V>(self, Some(tx), index_name)?;
        tx.add_put(index_name, k, v);
        Ok(())
    }

    pub fn remove<K, V>(&self, tx: &mut Transaction, index_name: &str, k: K, v: Option<V>) -> PRes<()>
    where
        K: IndexType,
        V: IndexType,
    {
        Indexes::check_and_get_index::<K, V>(self, Some(tx), index_name)?;
        tx.add_remove(index_name, k, v);
        Ok(())
    }

    pub fn get_tx<K, V>(&self, tx: &mut Transaction, index_name: &str, k: &K) -> PRes<Option<Value<V>>>
    where
        K: IndexType,
        V: IndexType,
    {
        let (result, vm) = {
            let mut ik = Indexes::check_and_get_index_keeper::<K, V>(self, Some(tx), None, index_name)?;
            self.indexes.read_lock(index_name.to_string())?;
            (ik.get(k)?, IndexKeeper::<K, V>::value_mode(&ik))
        };
        self.indexes.read_unlock(index_name.to_string())?;
        tx.apply_changes::<K, V>(vm, index_name, k, result)
    }

    pub fn get<K, V>(&self, index_name: &str, k: &K) -> PRes<Option<Value<V>>>
    where
        K: IndexType,
        V: IndexType,
    {
        let read_snapshot = self.snapshots.read_snapshot()?;
        let r = Indexes::check_and_get_index_keeper::<K, V>(self, None, Some(read_snapshot), index_name)?.get(k);
        self.snapshots.release(read_snapshot)?;
        r
    }

    pub fn index_next<K, V>(&self, index_name: &str, read_snapshot: SnapshotId, next: &RecRef) -> PRes<PageIter<K, V>>
    where
        K: IndexType,
        V: IndexType,
    {
        Indexes::check_and_get_index_keeper::<K, V>(self, None, Some(read_snapshot), index_name)?.iter_node(next)
    }

    pub fn index_next_back<K, V>(
        &self,
        index_name: &str,
        read_snapshot: SnapshotId,
        next: &RecRef,
    ) -> PRes<PageIterBack<K, V>>
    where
        K: IndexType,
        V: IndexType,
    {
        Indexes::check_and_get_index_keeper::<K, V>(self, None, Some(read_snapshot), index_name)?.back_iter_node(next)
    }

    pub fn range<K, V, R>(&self, index_name: &str, range: R) -> PRes<(ValueMode, IndexRawIter<K, V>)>
    where
        K: IndexType,
        V: IndexType,
        R: RangeBounds<K>,
    {
        let read_snapshot = self.snapshots.read_snapshot()?;
        let mut ik = Indexes::check_and_get_index_keeper::<K, V>(self, None, Some(read_snapshot), index_name)?;
        let after = ik.iter_from(range.start_bound())?;
        let before = ik.back_iter_from(range.end_bound())?;
        Ok((
            IndexKeeper::<K, V>::value_mode(&ik),
            IndexRawIter::new(index_name, read_snapshot, after, before),
        ))
    }

    pub fn release(&self, snapshot_id: SnapshotId) -> PRes<()> {
        let (to_free, to_clean) = self.snapshots.release(snapshot_id)?;
        for page in to_free {
            self.allocator.free(page.page)?;
        }
        self.journal.clear_all(&to_clean)?;
        self.allocator.disc().sync()?;
        Ok(())
    }
    pub fn address(&self) -> &Address {
        &self.address
    }

    pub fn list_segments(&self) -> PRes<Vec<(String, u32)>> {
        Ok(self
            .address
            .list()?
            .into_iter()
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
            .collect())
    }

    pub fn list_indexes(&self) -> PRes<Vec<(String, IndexInfo)>> {
        self.address
            .list()?
            .into_iter()
            .filter(|(name, _)| name.starts_with(INDEX_META_PREFIX))
            .map(|(mut name, id)| -> PRes<(String, IndexInfo)> {
                name.drain(..INDEX_META_PREFIX.len());
                let info = self.index_info(None, &name, id)?;
                Ok((name, info))
            })
            .collect()
    }

    pub fn list_segments_tx(&self, tx: &mut Transaction) -> PRes<Vec<(String, u32)>> {
        Ok(tx
            .filter_list(&self.address.list()?)
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
            .map(|(name, id)| (name.to_string(), id))
            .collect())
    }

    pub fn list_indexes_tx(&self, tx: &mut Transaction) -> PRes<Vec<(String, IndexInfo)>> {
        tx.filter_list(&self.address.list()?)
            .filter(|(name, _)| name.starts_with(INDEX_META_PREFIX))
            .map(|(name, id)| (name.to_string(), id))
            .map(|(mut name, id)| -> PRes<(String, IndexInfo)> {
                name.drain(..INDEX_META_PREFIX.len());
                let info = self.index_info(Some(tx), &name, id)?;
                Ok((name, info))
            })
            .collect()
    }

    fn index_info(&self, tx: Option<&Transaction>, name: &str, id: u32) -> PRes<IndexInfo> {
        let index = Indexes::get_index(self, tx, name)?;
        Ok(IndexInfo {
            id: IndexId::new(id),
            value_mode: index.value_mode,
            key_type: IndexTypeId::from(index.key_type),
            value_type: IndexTypeId::from(index.value_type),
        })
    }
}

/// Compute the page size exponent needed to store a record of `size` bytes.
///
/// A page must hold:
/// - the record content;
/// - the 8-byte big-endian length prefix written before it;
/// - the page metadata header (`PAGE_METADATA_SIZE` bytes).
///
/// The result is the smallest `exp >= 1` such that this total is strictly
/// less than `2^exp`.
pub fn exp_from_content_size(size: u64) -> u8 {
    // content + size + page_header; saturate instead of overflowing for absurd sizes.
    let final_size = size
        .saturating_add(8)
        .saturating_add(u64::from(PAGE_METADATA_SIZE));
    // Bit length of the total: final_size < 2^bits and final_size >= 2^(bits - 1),
    // so `bits` is the smallest exponent that fits. This replaces a probe loop
    // whose `1 << res` overflowed for totals >= 2^63.
    let bits = 64 - final_size.leading_zeros();
    // Never below 1 (only reachable for a zero total); `bits <= 64`, so the cast is lossless.
    bits.max(1) as u8
}