persy 0.7.0

Transactional Persistence Engine
Documentation
use crate::index::{
    config::{IndexType, Indexes, ValueMode},
    keeper::IndexTransactionKeeper,
    tree::Value,
};
use crate::{
    address::Address,
    allocator::Allocator,
    config::TxStrategy,
    error::{PRes, PersyError},
    id::RecRef,
    journal::{Journal, JournalId},
    persy::PersyImpl,
    snapshot::{SnapshotEntry, SnapshotId, Snapshots},
};
use std::{
    collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
    ops::RangeBounds,
    vec::{self, IntoIter},
};

#[derive(Clone)]
pub struct NewSegmentPage {
    pub segment: u32,
    pub page: u64,
    pub previous: u64,
}

#[derive(Clone)]
pub struct InsertRecord {
    pub segment: u32,
    pub recref: RecRef,
    pub record_page: u64,
}

#[derive(Clone)]
pub struct UpdateRecord {
    pub segment: u32,
    pub recref: RecRef,
    pub record_page: u64,
    pub version: u16,
}

#[derive(Clone)]
pub struct ReadRecord {
    pub segment: u32,
    pub recref: RecRef,
    pub version: u16,
}

#[derive(Clone)]
pub struct DeleteRecord {
    pub segment: u32,
    pub recref: RecRef,
    pub version: u16,
}

#[derive(Clone)]
pub struct CreateSegment {
    pub name: String,
    pub segment_id: u32,
    pub first_page: u64,
}

#[derive(Clone)]
pub struct DropSegment {
    pub name: String,
    pub segment_id: u32,
}

#[derive(Clone, PartialEq, Debug, PartialOrd, Ord, Eq)]
pub struct FreedPage {
    pub page: u64,
}

pub struct PrepareCommit {}

pub struct Commit {}

pub struct Rollback {}
pub struct Metadata {
    pub strategy: TxStrategy,
    pub meta_id: Vec<u8>,
}

pub enum SegmentOperation {
    CREATE(CreateSegment),
    DROP(DropSegment),
}

#[derive(Clone)]
pub struct PreparedState {
    locked_indexes: Option<Vec<String>>,
    snapshot_id: Option<SnapshotId>,
    data_locks: Option<(Vec<(u32, RecRef, u16)>, Vec<u32>, Vec<u32>)>,
}

impl PreparedState {
    fn new() -> PreparedState {
        PreparedState {
            locked_indexes: None,
            snapshot_id: None,
            data_locks: None,
        }
    }
}

pub struct Transaction {
    strategy: TxStrategy,
    meta_id: Vec<u8>,
    id: JournalId,
    inserted: Vec<InsertRecord>,
    updated: Vec<UpdateRecord>,
    deleted: Vec<DeleteRecord>,
    read: HashMap<RecRef, ReadRecord>,
    segments_operations: Vec<SegmentOperation>,
    segs_created_names: HashSet<String>,
    segs_dropped_names: HashSet<String>,
    segs_created: HashSet<u32>,
    segs_dropped: HashSet<u32>,
    segs_updated: HashSet<u32>,
    freed_pages: Option<Vec<FreedPage>>,
    indexes: Option<IndexTransactionKeeper>,
    segs_new_pages: Vec<NewSegmentPage>,
}

pub enum TxRead {
    RECORD((u64, u16)),
    DELETED,
    NONE,
}

pub enum TxSegCheck {
    CREATED(u32),
    DROPPED,
    NONE,
}

pub struct TransactionInsertScanner<'a> {
    tx: &'a Transaction,
    segment: u32,
}

pub struct TransactionInsertIterator {
    iter: vec::IntoIter<InsertRecord>,
    segment: u32,
}

impl<'a> IntoIterator for TransactionInsertScanner<'a> {
    type Item = RecRef;
    type IntoIter = TransactionInsertIterator;

    fn into_iter(self) -> Self::IntoIter {
        let iter: vec::IntoIter<InsertRecord> = self.tx.inserted.clone().into_iter();
        TransactionInsertIterator {
            iter,
            segment: self.segment,
        }
    }
}

impl Iterator for TransactionInsertIterator {
    type Item = RecRef;
    fn next(&mut self) -> Option<RecRef> {
        loop {
            let next = self.iter.next();
            if let Some(rec) = next {
                if rec.segment == self.segment {
                    return Some(rec.recref);
                }
            } else {
                return None;
            }
        }
    }
}

impl Transaction {
    pub fn new(journal: &Journal, strategy: &TxStrategy, meta_id: Vec<u8>) -> PRes<Transaction> {
        let id = journal.start()?;
        journal.log(&Metadata::new(strategy, meta_id.clone()), &id)?;
        Ok(Transaction {
            strategy: strategy.clone(),
            meta_id,
            id,
            inserted: Vec::new(),
            updated: Vec::new(),
            deleted: Vec::new(),
            read: HashMap::new(),
            segments_operations: Vec::new(),
            segs_created_names: HashSet::new(),
            segs_dropped_names: HashSet::new(),
            segs_created: HashSet::new(),
            segs_dropped: HashSet::new(),
            segs_updated: HashSet::new(),
            freed_pages: None,
            indexes: Some(IndexTransactionKeeper::new()),
            segs_new_pages: Vec::new(),
        })
    }

    pub fn recover(id: JournalId) -> Transaction {
        Transaction {
            strategy: TxStrategy::LastWin,
            meta_id: Vec::new(),
            id,
            inserted: Vec::new(),
            updated: Vec::new(),
            deleted: Vec::new(),
            read: HashMap::new(),
            segments_operations: Vec::new(),
            segs_created_names: HashSet::new(),
            segs_dropped_names: HashSet::new(),
            segs_created: HashSet::new(),
            segs_dropped: HashSet::new(),
            segs_updated: HashSet::new(),
            freed_pages: None,
            indexes: Some(IndexTransactionKeeper::new()),
            segs_new_pages: Vec::new(),
        }
    }

    pub fn segment_created_in_tx(&self, segment: u32) -> bool {
        self.segs_created.contains(&segment)
    }

    pub fn exists_segment(&self, segment: &str) -> TxSegCheck {
        if self.segs_created_names.contains(segment) {
            for a in &self.segments_operations {
                if let SegmentOperation::CREATE(ref c) = a {
                    if c.name == segment {
                        return TxSegCheck::CREATED(c.segment_id);
                    }
                }
            }
        } else if self.segs_dropped_names.contains(segment) {
            return TxSegCheck::DROPPED;
        }
        TxSegCheck::NONE
    }

    pub fn add_create_segment(&mut self, journal: &Journal, name: &str, segment_id: u32, first_page: u64) -> PRes<()> {
        let create = CreateSegment::new(name, segment_id, first_page);

        journal.log(&create, &self.id)?;
        self.segments_operations.push(SegmentOperation::CREATE(create));
        self.segs_created.insert(segment_id);
        self.segs_created_names.insert(name.into());
        Ok(())
    }

    pub fn recover_add(&mut self, create: &CreateSegment) {
        self.segments_operations.push(SegmentOperation::CREATE(create.clone()));
        self.segs_created.insert(create.segment_id);
        self.segs_created_names.insert(create.name.clone());
    }

    pub fn add_drop_segment(&mut self, journal: &Journal, name: &str, segment_id: u32) -> PRes<()> {
        if self.segs_created_names.contains(name) {
            Err(PersyError::CannotDropSegmentCreatedInTx)
        } else {
            let drop = DropSegment::new(name, segment_id);
            journal.log(&drop, &self.id)?;
            self.segments_operations.push(SegmentOperation::DROP(drop));
            self.segs_dropped.insert(segment_id);
            self.segs_dropped_names.insert(name.into());
            Ok(())
        }
    }

    pub fn recover_drop(&mut self, drop: &DropSegment) {
        self.segments_operations.push(SegmentOperation::DROP(drop.clone()));
        self.segs_dropped.insert(drop.segment_id);
        self.segs_dropped_names.insert(drop.name.clone());
    }

    pub fn add_read(&mut self, journal: &Journal, segment: u32, recref: &RecRef, version: u16) -> PRes<()> {
        if self.strategy == TxStrategy::VersionOnRead {
            let read = ReadRecord::new(segment, recref, version);
            journal.log(&read, &self.id)?;
            self.read.insert(recref.clone(), read);
        }
        Ok(())
    }

    pub fn recover_read(&mut self, read: &ReadRecord) {
        self.read.insert(read.recref.clone(), read.clone());
    }

    pub fn add_insert(&mut self, journal: &Journal, segment: u32, rec_ref: &RecRef, record: u64) -> PRes<()> {
        self.segs_updated.insert(segment);
        let insert = InsertRecord::new(segment, rec_ref, record);

        journal.log(&insert, &self.id)?;
        self.inserted.push(insert);
        Ok(())
    }
    pub fn add_new_segment_page(
        &mut self,
        journal: &Journal,
        segment: u32,
        new_page: u64,
        previous_page: u64,
    ) -> PRes<()> {
        let new_page = NewSegmentPage::new(segment, new_page, previous_page);

        journal.log(&new_page, &self.id)?;
        self.segs_new_pages.push(new_page);
        Ok(())
    }

    pub fn recover_insert(&mut self, insert: &InsertRecord) {
        self.segs_updated.insert(insert.segment);
        self.inserted.push(insert.clone());
    }

    pub fn add_update(
        &mut self,
        journal: &Journal,
        segment: u32,
        rec_ref: &RecRef,
        record: u64,
        version: u16,
    ) -> PRes<()> {
        self.segs_updated.insert(segment);
        let update = UpdateRecord::new(segment, rec_ref, record, version);
        journal.log(&update, &self.id)?;
        self.updated.push(update);
        Ok(())
    }

    pub fn recover_update(&mut self, update: &UpdateRecord) {
        self.segs_updated.insert(update.segment);
        self.updated.push(update.clone());
    }

    pub fn add_delete(&mut self, journal: &Journal, segment: u32, rec_ref: &RecRef, version: u16) -> PRes<()> {
        self.segs_updated.insert(segment);
        let delete = DeleteRecord::new(segment, rec_ref, version);
        journal.log(&delete, &self.id)?;
        self.deleted.push(delete);
        Ok(())
    }

    pub fn add_put<K, V>(&mut self, index: &str, k: K, v: V)
    where
        K: IndexType,
        V: IndexType,
    {
        if let Some(ref mut indexes) = self.indexes {
            indexes.put(index, k, v);
        }
    }

    pub fn add_remove<K, V>(&mut self, index: &str, k: K, v: Option<V>)
    where
        K: IndexType,
        V: IndexType,
    {
        if let Some(ref mut indexes) = self.indexes {
            indexes.remove(index, k, v);
        }
    }

    pub fn apply_changes<K, V>(
        &self,
        vm: ValueMode,
        index: &str,
        k: &K,
        pers: Option<Value<V>>,
    ) -> PRes<Option<Value<V>>>
    where
        K: IndexType,
        V: IndexType,
    {
        if let Some(ref indexes) = self.indexes {
            indexes.apply_changes(index, vm, k, pers)
        } else {
            Ok(pers)
        }
    }

    pub fn index_range<K, V, R>(&self, index_name: &str, range: R) -> Option<IntoIter<K>>
    where
        K: IndexType,
        V: IndexType,
        R: RangeBounds<K>,
    {
        if let Some(ind) = &self.indexes {
            ind.range::<K, V, R>(index_name, range)
        } else {
            None
        }
    }

    pub fn recover_delete(&mut self, delete: &DeleteRecord) {
        self.segs_updated.insert(delete.segment);
        self.deleted.push(delete.clone());
    }

    pub fn scan_insert(&self, seg: u32) -> TransactionInsertScanner {
        TransactionInsertScanner { tx: self, segment: seg }
    }

    pub fn read(&self, rec_ref: &RecRef) -> TxRead {
        for ele in &self.deleted {
            if ele.recref.page == rec_ref.page && ele.recref.pos == rec_ref.pos {
                return TxRead::DELETED;
            }
        }
        if let Some(ele) = self
            .updated
            .iter()
            .rev()
            .find(|ele| ele.recref.page == rec_ref.page && ele.recref.pos == rec_ref.pos)
        {
            return TxRead::RECORD((ele.record_page, ele.version));
        }
        for ele in &self.inserted {
            if ele.recref.page == rec_ref.page && ele.recref.pos == rec_ref.pos {
                return TxRead::RECORD((ele.record_page, 1));
            }
        }
        TxRead::NONE
    }

    pub fn recover_prepare_commit(
        &mut self,
        journal: &Journal,
        address: &Address,
        allocator: &Allocator,
    ) -> PRes<PreparedState> {
        let mut prepared = PreparedState::new();
        let _ = self.collapse_operations();
        let (records, crt_upd_segs, dropped_segs) = self.coll_locks();
        if let Err(x) = address.acquire_locks(&records, &crt_upd_segs, &dropped_segs) {
            self.recover_rollback(journal, address, allocator)?;
            return Err(x);
        }
        prepared.data_locks = Some((records.clone(), crt_upd_segs.clone(), dropped_segs));
        let check_version = self.strategy != TxStrategy::LastWin;
        if let Err(x) = address.check_persistent_records(&records, check_version) {
            self.recover_rollback(journal, address, allocator)?;
            return Err(x);
        }
        if let Err(x) = address.confirm_allocations(&crt_upd_segs, false) {
            self.recover_rollback(journal, address, allocator)?;
            return Err(x);
        }
        Ok(prepared)
    }

    pub fn prepare_commit(
        mut self,
        journal: &Journal,
        address: &Address,
        indexes: &Indexes,
        snapshots: &Snapshots,
        persy_impl: &PersyImpl,
        allocator: &Allocator,
    ) -> PRes<(Transaction, PreparedState)> {
        let mut prepared = PreparedState::new();
        let ind = self.indexes;
        self.indexes = None;
        if let Some(ind_change) = ind {
            let mut to_lock = ind_change.changed_indexes();
            to_lock.sort();
            if let Err(err) = indexes.write_lock(&to_lock) {
                prepared.locked_indexes = Some(to_lock);
                self.rollback_prepared(journal, address, indexes, snapshots, allocator, prepared)?;
                return Err(err);
            }
            prepared.locked_indexes = Some(to_lock);
            ind_change.apply(persy_impl, &mut self)?;
        }

        let mut freed_pages = self.collapse_operations();

        let (records, crt_upd_segs, dropped_segs) = self.coll_locks();
        if let Err(x) = address.acquire_locks(&records, &crt_upd_segs, &dropped_segs) {
            self.rollback_prepared(journal, address, indexes, snapshots, allocator, prepared)?;
            return Err(x);
        };
        prepared.data_locks = Some((records.clone(), crt_upd_segs.clone(), dropped_segs));

        let check_version = self.strategy != TxStrategy::LastWin;
        let old_records = match address.check_persistent_records(&records, check_version) {
            Ok(old) => old,
            Err(x) => {
                self.rollback_prepared(journal, address, indexes, snapshots, allocator, prepared)?;
                return Err(x);
            }
        };
        let segs: Vec<_> = self.segs_updated.iter().map(|x| *x).collect();
        if let Err(x) = address.confirm_allocations(&segs, false) {
            self.rollback_prepared(journal, address, indexes, snapshots, allocator, prepared)?;
            return Err(x);
        }

        for dropped_seg in &self.segs_dropped {
            let pages = address.collect_segment_pages(allocator, *dropped_seg)?;
            for p in pages.into_iter().map(FreedPage::new) {
                freed_pages.insert(p);
            }
        }
        let mut snapshot_entries = Vec::new();
        for old_record in &old_records {
            freed_pages.insert(FreedPage::new(old_record.record_page));
            snapshot_entries.push(SnapshotEntry::new(
                &old_record.recref,
                old_record.record_page,
                old_record.version,
            ));
        }
        for freed_page in &freed_pages {
            journal.log(freed_page, &self.id)?;
        }
        let mut freed_pages_vec: Vec<_> = freed_pages.into_iter().collect();
        freed_pages_vec.reverse();
        prepared.snapshot_id = Some(snapshots.snapshot(snapshot_entries, freed_pages_vec.clone(), self.id.clone())?);
        self.freed_pages = Some(freed_pages_vec);
        journal.prepare(&PrepareCommit::new(), &self.id)?;
        allocator.disc().sync()?;
        Ok((self, prepared))
    }

    fn collapse_operations(&mut self) -> BTreeSet<FreedPage> {
        let mut pages_to_free = BTreeSet::new();
        let mut inserted_by_id = HashMap::new();
        for insert in self.inserted.drain(..) {
            inserted_by_id.insert(insert.recref.clone(), insert);
        }

        let mut updated_by_id = HashMap::new();
        for update in self.updated.drain(..) {
            match updated_by_id.entry(update.recref.clone()) {
                Entry::Vacant(e) => {
                    e.insert(update);
                }
                Entry::Occupied(mut e) => {
                    pages_to_free.insert(FreedPage::new(e.get().record_page));
                    e.get_mut().record_page = update.record_page;
                }
            }
        }

        for (k, insert) in &mut inserted_by_id {
            if let Some(update) = updated_by_id.remove(&k) {
                pages_to_free.insert(FreedPage::new(insert.record_page));
                insert.record_page = update.record_page;
            }
        }

        let mut i = 0;
        while i != self.deleted.len() {
            if let Some(insert) = inserted_by_id.remove(&self.deleted[i].recref) {
                self.deleted.remove(i);
                pages_to_free.insert(FreedPage::new(insert.record_page));
            } else {
                i += 1;
            }
        }

        for delete in &self.deleted {
            if let Some(update) = updated_by_id.remove(&delete.recref) {
                pages_to_free.insert(FreedPage::new(update.record_page));
            }
        }

        for (_, insert) in inserted_by_id.drain() {
            if self.segs_dropped.contains(&insert.segment) {
                pages_to_free.insert(FreedPage::new(insert.record_page));
            } else {
                self.inserted.push(insert);
            }
        }

        for (_, update) in updated_by_id.drain() {
            if self.segs_dropped.contains(&update.segment) {
                pages_to_free.insert(FreedPage::new(update.record_page));
            } else {
                self.updated.push(update);
            }
        }
        pages_to_free
    }

    fn coll_locks(&self) -> (Vec<(u32, RecRef, u16)>, Vec<u32>, Vec<u32>) {
        let mut crt_upd_segs = Vec::new();
        for create in &self.segs_created {
            if !&self.segs_dropped.contains(create) {
                crt_upd_segs.push(create.clone());
            }
        }
        for update in &self.segs_updated {
            if !&self.segs_dropped.contains(update) {
                crt_upd_segs.push(update.clone());
            }
        }

        let mut dropped_segs = Vec::new();
        for dropped in &self.segs_dropped {
            dropped_segs.push(dropped.clone());
        }
        let mut records = HashSet::new();

        // No need to lock on inserted records the new id unique is managed by the address.
        //
        for update in &self.updated {
            let mut version = update.version;
            // I found values in the read only for VersionOnRead
            if let Some(read_v) = self.read.get(&update.recref) {
                version = read_v.version;
            }
            records.insert((update.segment, update.recref.clone(), version));
        }

        for delete in &self.deleted {
            let mut version = delete.version;
            // I found values in the read only for VersionOnRead
            if let Some(read_v) = self.read.get(&delete.recref) {
                version = read_v.version;
            }
            records.insert((delete.segment, delete.recref.clone(), version));
        }

        for insert in &self.inserted {
            records.remove(&(insert.segment, insert.recref.clone(), 1));
        }

        let mut sorted_records: Vec<(u32, RecRef, u16)> = records.iter().cloned().collect();
        sorted_records.sort_by_key(|ref x| x.1.clone());
        crt_upd_segs.sort();
        dropped_segs.sort();
        (sorted_records, crt_upd_segs, dropped_segs)
    }

    fn internal_rollback(&self, address: &Address, allocator: &Allocator) -> PRes<()> {
        let mut dropped_segs = Vec::new();
        for create in &self.segs_created {
            dropped_segs.push(create.clone());
        }
        for insert in &self.inserted {
            address.rollback(&insert.recref)?;
            if dropped_segs.contains(&insert.segment) {
                allocator.free(insert.record_page)?;
            }
        }

        for create in &self.segs_created {
            address.drop_temp_segment(*create)?;
        }

        for update in &self.updated {
            if dropped_segs.contains(&update.segment) {
                allocator.free(update.record_page)?;
            }
        }
        Ok(())
    }

    pub fn recover_rollback(&self, journal: &Journal, address: &Address, allocator: &Allocator) -> PRes<()> {
        journal.end(&Rollback::new(), &self.id)?;
        self.internal_rollback(address, allocator)?;
        journal.clear_all(&[self.id.clone()])?;
        Ok(())
    }

    pub fn rollback(&mut self, journal: &Journal, address: &Address, allocator: &Allocator) -> PRes<()> {
        journal.end(&Rollback::new(), &self.id)?;
        self.internal_rollback(address, allocator)?;
        allocator.flush_free_list()?;
        journal.clear_all(&[self.id.clone()])?;
        allocator.disc().sync()?;
        Ok(())
    }

    pub fn rollback_prepared(
        &mut self,
        journal: &Journal,
        address: &Address,
        indexes: &Indexes,
        snapshots: &Snapshots,
        allocator: &Allocator,
        prepared: PreparedState,
    ) -> PRes<()> {
        journal.end(&Rollback::new(), &self.id)?;

        for insert in &self.inserted {
            address.rollback(&insert.recref)?;
        }
        //TODO: Double check the order of release locks and internal rollback.
        if let Some((records, crt_upd_segs, delete_segs)) = &prepared.data_locks {
            address.release_locks(records, crt_upd_segs, delete_segs)?;
        }

        if let Some(il) = &prepared.locked_indexes {
            indexes.write_unlock(il)?;
        }

        self.internal_rollback(address, allocator)?;

        if let Some(snapshot_id) = prepared.snapshot_id {
            let (to_free, to_clean) = snapshots.release(snapshot_id)?;
            for page in to_free {
                allocator.free(page.page)?;
            }
            allocator.flush_free_list()?;
            journal.clear_all(&to_clean)?;
            allocator.disc().sync()?;
        }
        Ok(())
    }

    pub fn recover_commit(
        &mut self,
        journal: &Journal,
        address: &Address,
        indexes: &Indexes,
        allocator: &Allocator,
        prepared: PreparedState,
    ) -> PRes<()> {
        self.internal_commit(address, indexes, true, &prepared)?;
        journal.end(&Commit::new(), &self.id)?;
        if let Some(ref up_free) = self.freed_pages {
            for to_free in up_free {
                allocator.free(to_free.page)?;
            }
        }
        allocator.disc().sync()?;
        Ok(())
    }

    fn internal_commit(
        &mut self,
        address: &Address,
        indexes: &Indexes,
        recover: bool,
        prepared: &PreparedState,
    ) -> PRes<()> {
        // This are pages that are empty but we can't unlink because of possible running scans
        let _pages_to_unlink = address.apply(
            &self.segs_new_pages,
            &self.inserted,
            &self.updated,
            &self.deleted,
            &self.segments_operations,
            recover,
        )?;
        if let Some((records, crt_upd_segs, deleted_segs)) = &prepared.data_locks {
            address.release_locks(&records, &crt_upd_segs, &deleted_segs)?;
        }

        if let Some(il) = &prepared.locked_indexes {
            indexes.write_unlock(il)?;
        }

        Ok(())
    }

    pub fn commit(
        &mut self,
        address: &Address,
        journal: &Journal,
        indexes: &Indexes,
        snapshots: &Snapshots,
        allocator: &Allocator,
        prepared: PreparedState,
    ) -> PRes<()> {
        self.internal_commit(address, indexes, false, &prepared)?;
        journal.end(&Commit::new(), &self.id)?;

        if let Some(snapshot_id) = prepared.snapshot_id {
            let (to_free, to_clean) = snapshots.release(snapshot_id)?;
            for page in to_free {
                allocator.free(page.page)?;
            }
            allocator.flush_free_list()?;
            journal.clear_all(&to_clean)?;
            allocator.disc().sync()?;
        }
        Ok(())
    }

    pub fn recover_metadata(&mut self, metadata: &Metadata) {
        self.strategy = metadata.strategy.clone();
        self.meta_id = metadata.meta_id.clone();
    }

    pub fn recover_freed_page(&mut self, freed: &FreedPage) {
        let pages = self.freed_pages.get_or_insert(Vec::new());
        pages.push(freed.clone());
    }

    pub fn recover_new_segment_page(&mut self, new_page: &NewSegmentPage) {
        self.segs_new_pages.push(new_page.clone());
    }

    pub fn meta_id<'a>(&'a self) -> &'a Vec<u8> {
        &self.meta_id
    }

    pub fn filter_list<'a>(&'a self, pers: &'a [(String, u32)]) -> impl Iterator<Item = (&'a str, u32)> + 'a {
        let outer = pers.iter().map(|(name, id)| (name.as_str(), *id));
        let inner = self.segments_operations.iter().filter_map(|seg| {
            if let SegmentOperation::CREATE(c) = seg {
                Some((c.name.as_str(), c.segment_id))
            } else {
                None
            }
        });

        outer.chain(inner).filter(move |x| !self.segs_dropped.contains(&x.1))
    }
}

impl DeleteRecord {
    pub fn new(segment: u32, rec_ref: &RecRef, version: u16) -> DeleteRecord {
        DeleteRecord {
            segment,
            recref: rec_ref.clone(),
            version,
        }
    }
}

impl UpdateRecord {
    pub fn new(segment: u32, rec_ref: &RecRef, record: u64, version: u16) -> UpdateRecord {
        UpdateRecord {
            segment,
            recref: rec_ref.clone(),
            record_page: record,
            version,
        }
    }
}

impl ReadRecord {
    pub fn new(segment: u32, recref: &RecRef, version: u16) -> ReadRecord {
        ReadRecord {
            segment,
            recref: recref.clone(),
            version,
        }
    }
}

impl PrepareCommit {
    pub fn new() -> PrepareCommit {
        PrepareCommit {}
    }
}

impl Commit {
    pub fn new() -> Commit {
        Commit {}
    }
}
impl Rollback {
    pub fn new() -> Rollback {
        Rollback {}
    }
}

impl InsertRecord {
    pub fn new(segment: u32, rec_ref: &RecRef, record: u64) -> InsertRecord {
        InsertRecord {
            segment,
            recref: rec_ref.clone(),
            record_page: record,
        }
    }
}

impl CreateSegment {
    pub fn new(name: &str, segment_id: u32, first_page: u64) -> CreateSegment {
        CreateSegment {
            name: name.into(),
            segment_id,
            first_page,
        }
    }
}

impl DropSegment {
    pub fn new(name: &str, segment_id: u32) -> DropSegment {
        DropSegment {
            name: name.into(),
            segment_id,
        }
    }
}

impl Metadata {
    pub fn new(strategy: &TxStrategy, meta_id: Vec<u8>) -> Metadata {
        Metadata {
            strategy: strategy.clone(),
            meta_id,
        }
    }
}

impl FreedPage {
    pub fn new(page: u64) -> FreedPage {
        FreedPage { page }
    }
}

impl NewSegmentPage {
    pub fn new(segment: u32, page: u64, previous: u64) -> NewSegmentPage {
        NewSegmentPage {
            segment,
            page,
            previous,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{DeleteRecord, FreedPage, InsertRecord, Transaction, UpdateRecord};
    use crate::{id::RecRef, journal::JournalId};

    #[test]
    fn test_scan_insert() {
        let mut tx = Transaction::recover(JournalId::new(0, 0));
        tx.inserted.push(InsertRecord::new(10, &RecRef::new(3, 2), 2));
        tx.inserted.push(InsertRecord::new(10, &RecRef::new(4, 2), 2));
        tx.inserted.push(InsertRecord::new(20, &RecRef::new(0, 1), 3));
        let mut count = 0;
        for x in tx.scan_insert(10) {
            assert_eq!(x.pos, 2);
            count += 1;
        }
        assert_eq!(count, 2);
    }

    #[test]
    fn test_collapse() {
        let mut tx = Transaction::recover(JournalId::new(0, 0));
        tx.inserted.push(InsertRecord::new(10, &RecRef::new(3, 1), 1));
        tx.inserted.push(InsertRecord::new(10, &RecRef::new(3, 2), 2));
        tx.inserted.push(InsertRecord::new(20, &RecRef::new(3, 3), 3));
        tx.inserted.push(InsertRecord::new(20, &RecRef::new(3, 4), 4));
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 1), 5, 1));
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 1), 6, 1));
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 2), 7, 1));
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 5), 8, 1));
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 5), 9, 1));
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 6), 10, 1));
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 7), 11, 1));
        tx.deleted.push(DeleteRecord::new(10, &RecRef::new(3, 1), 0));
        tx.deleted.push(DeleteRecord::new(10, &RecRef::new(3, 3), 1));
        tx.deleted.push(DeleteRecord::new(10, &RecRef::new(3, 6), 2));
        tx.deleted.push(DeleteRecord::new(10, &RecRef::new(3, 8), 2));
        let free = tx.collapse_operations();
        assert_eq!(free.len(), 7);
        for e in [1, 2, 3, 5, 6, 8, 10].iter().map(|x| FreedPage::new(*x)) {
            assert!(free.contains(&e));
        }
        assert_eq!(tx.inserted.len(), 2);
        assert_eq!(tx.updated.len(), 2);
        assert_eq!(tx.deleted.len(), 2);
    }
}