use crate::index::{
config::{IndexType, Indexes, ValueMode},
keeper::IndexTransactionKeeper,
tree::Value,
};
use crate::{
address::Address,
allocator::Allocator,
config::TxStrategy,
error::{PRes, PersyError},
id::RecRef,
journal::{Journal, JournalId},
persy::PersyImpl,
snapshot::{SnapshotEntry, SnapshotId, Snapshots},
};
use std::{
collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
ops::RangeBounds,
vec::{self, IntoIter},
};
/// Transaction entry describing a page added to a segment.
///
/// Created by `Transaction::add_new_segment_page`, which also writes it to the journal.
#[derive(Clone)]
pub struct NewSegmentPage {
/// Id of the segment the page was added to.
pub segment: u32,
/// The newly added page.
pub page: u64,
/// The page given as the "previous" one when the entry was created.
pub previous: u64,
}
/// Transaction entry describing a record inserted by the transaction.
#[derive(Clone)]
pub struct InsertRecord {
/// Id of the segment the record was inserted in.
pub segment: u32,
/// Reference identifying the inserted record.
pub recref: RecRef,
/// Page associated with the inserted record (returned by `Transaction::read`).
pub record_page: u64,
}
/// Transaction entry describing an update of a record.
#[derive(Clone)]
pub struct UpdateRecord {
/// Id of the segment the record belongs to.
pub segment: u32,
/// Reference identifying the updated record.
pub recref: RecRef,
/// Page associated with the updated record (returned by `Transaction::read`).
pub record_page: u64,
/// Record version supplied together with the update.
pub version: u16,
}
/// Transaction entry describing a record read tracked by the transaction.
///
/// `Transaction::add_read` only creates these when the strategy is
/// `TxStrategy::VersionOnRead`.
#[derive(Clone)]
pub struct ReadRecord {
/// Id of the segment the record belongs to.
pub segment: u32,
/// Reference identifying the record that was read.
pub recref: RecRef,
/// Record version supplied together with the read.
pub version: u16,
}
/// Transaction entry describing the deletion of a record.
#[derive(Clone)]
pub struct DeleteRecord {
/// Id of the segment the record belongs to.
pub segment: u32,
/// Reference identifying the deleted record.
pub recref: RecRef,
/// Record version supplied together with the delete.
pub version: u16,
}
/// Transaction entry describing a segment created by the transaction.
#[derive(Clone)]
pub struct CreateSegment {
/// Name of the created segment.
pub name: String,
/// Id of the created segment.
pub segment_id: u32,
/// First page of the created segment.
pub first_page: u64,
}
/// Transaction entry describing a segment dropped by the transaction.
#[derive(Clone)]
pub struct DropSegment {
/// Name of the dropped segment.
pub name: String,
/// Id of the dropped segment.
pub segment_id: u32,
}
/// Transaction entry naming a page to be freed.
///
/// The ordering derives allow these entries to be collected in a `BTreeSet`,
/// as `Transaction::collapse_operations` does.
#[derive(Clone, PartialEq, Debug, PartialOrd, Ord, Eq)]
pub struct FreedPage {
/// The page to free.
pub page: u64,
}
/// Marker entry passed to `journal.prepare` at the end of `Transaction::prepare_commit`.
pub struct PrepareCommit {}
/// Marker entry passed to `journal.end` when a transaction is committed.
pub struct Commit {}
/// Marker entry passed to `journal.end` when a transaction is rolled back.
pub struct Rollback {}
/// Transaction entry carrying the transaction settings.
///
/// Logged by `Transaction::new` and applied again by `Transaction::recover_metadata`.
pub struct Metadata {
/// Strategy the transaction was started with.
pub strategy: TxStrategy,
/// Identifier bytes supplied when the transaction was created.
pub meta_id: Vec<u8>,
}
/// A segment-level operation recorded by a transaction, kept in the order it was added.
pub enum SegmentOperation {
/// A segment was created.
CREATE(CreateSegment),
/// A segment was dropped.
DROP(DropSegment),
}
/// Resources held by a transaction after its prepare phase.
///
/// The fields are filled in by `Transaction::prepare_commit` /
/// `Transaction::recover_prepare_commit` and consumed by the commit and
/// rollback methods, which release them.
#[derive(Clone)]
pub struct PreparedState {
/// Names of the indexes handed to `Indexes::write_lock`, if any.
locked_indexes: Option<Vec<String>>,
/// Snapshot registered during prepare, if any.
snapshot_id: Option<SnapshotId>,
/// Lock set as produced by `Transaction::coll_locks`:
/// `(segment, record, version)` triples, created/updated segment ids, dropped segment ids.
data_locks: Option<(Vec<(u32, RecRef, u16)>, Vec<u32>, Vec<u32>)>,
}
impl PreparedState {
fn new() -> PreparedState {
PreparedState {
locked_indexes: None,
snapshot_id: None,
data_locks: None,
}
}
}
/// In-memory state of a single transaction.
///
/// Operations are appended through the `add_*` methods (which also write to
/// the journal) or through the `recover_*` methods (which only rebuild the
/// in-memory state).
pub struct Transaction {
/// Strategy of the transaction; decides whether reads are tracked and
/// whether versions are checked at prepare time.
strategy: TxStrategy,
/// Identifier bytes supplied when the transaction was created.
meta_id: Vec<u8>,
/// Journal id of this transaction.
id: JournalId,
/// Records inserted by the transaction.
inserted: Vec<InsertRecord>,
/// Record updates, in the order they were added.
updated: Vec<UpdateRecord>,
/// Record deletions.
deleted: Vec<DeleteRecord>,
/// Tracked reads, keyed by record reference.
read: HashMap<RecRef, ReadRecord>,
/// Segment create/drop operations, in the order they were added.
segments_operations: Vec<SegmentOperation>,
/// Names of the segments created by the transaction.
segs_created_names: HashSet<String>,
/// Names of the segments dropped by the transaction.
segs_dropped_names: HashSet<String>,
/// Ids of the segments created by the transaction.
segs_created: HashSet<u32>,
/// Ids of the segments dropped by the transaction.
segs_dropped: HashSet<u32>,
/// Ids of the segments touched by an insert, update or delete.
segs_updated: HashSet<u32>,
/// Pages to free; set by `prepare_commit` or rebuilt by `recover_freed_page`.
freed_pages: Option<Vec<FreedPage>>,
/// Pending index changes; set to `None` once `prepare_commit` has taken them.
indexes: Option<IndexTransactionKeeper>,
/// Pages added to segments by the transaction.
segs_new_pages: Vec<NewSegmentPage>,
}
/// Result of looking a record up in the pending changes of a transaction.
pub enum TxRead {
/// The record was inserted or updated in the transaction: `(record page, version)`.
RECORD((u64, u16)),
/// The record was deleted in the transaction.
DELETED,
/// The transaction holds no change for the record.
NONE,
}
/// Result of looking a segment name up in the pending changes of a transaction.
pub enum TxSegCheck {
/// The segment was created in the transaction; carries its segment id.
CREATED(u32),
/// The segment was dropped in the transaction.
DROPPED,
/// The transaction neither created nor dropped a segment with that name.
NONE,
}
/// Handle, returned by `Transaction::scan_insert`, that can be turned into an
/// iterator over the records inserted by a transaction into one segment.
pub struct TransactionInsertScanner<'a> {
/// Transaction whose inserts are scanned.
tx: &'a Transaction,
/// Id of the segment to scan.
segment: u32,
}
/// Owning iterator over the references of the records inserted into one segment.
pub struct TransactionInsertIterator {
/// Owned copy of insert entries taken from the transaction.
iter: vec::IntoIter<InsertRecord>,
/// Id of the segment whose records are yielded.
segment: u32,
}
impl<'a> IntoIterator for TransactionInsertScanner<'a> {
type Item = RecRef;
type IntoIter = TransactionInsertIterator;
fn into_iter(self) -> Self::IntoIter {
let iter: vec::IntoIter<InsertRecord> = self.tx.inserted.clone().into_iter();
TransactionInsertIterator {
iter,
segment: self.segment,
}
}
}
impl Iterator for TransactionInsertIterator {
    type Item = RecRef;

    /// Returns the reference of the next insert entry that belongs to the
    /// scanned segment, skipping entries of other segments.
    fn next(&mut self) -> Option<RecRef> {
        let wanted = self.segment;
        self.iter
            .find(|candidate| candidate.segment == wanted)
            .map(|found| found.recref)
    }
}
impl Transaction {
pub fn new(journal: &Journal, strategy: &TxStrategy, meta_id: Vec<u8>) -> PRes<Transaction> {
let id = journal.start()?;
journal.log(&Metadata::new(strategy, meta_id.clone()), &id)?;
Ok(Transaction {
strategy: strategy.clone(),
meta_id,
id,
inserted: Vec::new(),
updated: Vec::new(),
deleted: Vec::new(),
read: HashMap::new(),
segments_operations: Vec::new(),
segs_created_names: HashSet::new(),
segs_dropped_names: HashSet::new(),
segs_created: HashSet::new(),
segs_dropped: HashSet::new(),
segs_updated: HashSet::new(),
freed_pages: None,
indexes: Some(IndexTransactionKeeper::new()),
segs_new_pages: Vec::new(),
})
}
pub fn recover(id: JournalId) -> Transaction {
Transaction {
strategy: TxStrategy::LastWin,
meta_id: Vec::new(),
id,
inserted: Vec::new(),
updated: Vec::new(),
deleted: Vec::new(),
read: HashMap::new(),
segments_operations: Vec::new(),
segs_created_names: HashSet::new(),
segs_dropped_names: HashSet::new(),
segs_created: HashSet::new(),
segs_dropped: HashSet::new(),
segs_updated: HashSet::new(),
freed_pages: None,
indexes: Some(IndexTransactionKeeper::new()),
segs_new_pages: Vec::new(),
}
}
/// Tells whether the segment with the given id was created by this transaction.
pub fn segment_created_in_tx(&self, segment: u32) -> bool {
    let created_here = &self.segs_created;
    created_here.contains(&segment)
}
/// Looks a segment name up in the segment operations of this transaction.
///
/// Returns `CREATED(id)` when the name was created here, `DROPPED` when it
/// was dropped here, and `NONE` otherwise. Creation takes precedence over a drop.
pub fn exists_segment(&self, segment: &str) -> TxSegCheck {
    if self.segs_created_names.contains(segment) {
        // The name set only says "created"; the id lives in the operation list.
        self.segments_operations
            .iter()
            .find_map(|op| match op {
                SegmentOperation::CREATE(c) if c.name == segment => Some(TxSegCheck::CREATED(c.segment_id)),
                _ => None,
            })
            .unwrap_or(TxSegCheck::NONE)
    } else if self.segs_dropped_names.contains(segment) {
        TxSegCheck::DROPPED
    } else {
        TxSegCheck::NONE
    }
}
/// Logs the creation of a segment to the journal and records it in the transaction.
///
/// # Errors
/// Propagates the journal error; in that case the transaction is left unchanged.
pub fn add_create_segment(&mut self, journal: &Journal, name: &str, segment_id: u32, first_page: u64) -> PRes<()> {
    let entry = CreateSegment::new(name, segment_id, first_page);
    journal.log(&entry, &self.id)?;
    self.segs_created_names.insert(name.to_owned());
    self.segs_created.insert(segment_id);
    self.segments_operations.push(SegmentOperation::CREATE(entry));
    Ok(())
}
pub fn recover_add(&mut self, create: &CreateSegment) {
self.segments_operations.push(SegmentOperation::CREATE(create.clone()));
self.segs_created.insert(create.segment_id);
self.segs_created_names.insert(create.name.clone());
}
/// Logs the drop of a segment to the journal and records it in the transaction.
///
/// # Errors
/// Returns `PersyError::CannotDropSegmentCreatedInTx` when a segment with
/// that name was created by this same transaction; otherwise propagates
/// the journal error.
pub fn add_drop_segment(&mut self, journal: &Journal, name: &str, segment_id: u32) -> PRes<()> {
    if self.segs_created_names.contains(name) {
        return Err(PersyError::CannotDropSegmentCreatedInTx);
    }
    let entry = DropSegment::new(name, segment_id);
    journal.log(&entry, &self.id)?;
    self.segs_dropped_names.insert(name.to_owned());
    self.segs_dropped.insert(segment_id);
    self.segments_operations.push(SegmentOperation::DROP(entry));
    Ok(())
}
pub fn recover_drop(&mut self, drop: &DropSegment) {
self.segments_operations.push(SegmentOperation::DROP(drop.clone()));
self.segs_dropped.insert(drop.segment_id);
self.segs_dropped_names.insert(drop.name.clone());
}
/// Tracks the read of a record, but only under `TxStrategy::VersionOnRead`.
///
/// With any other strategy this is a no-op that returns `Ok(())`. A later
/// read of the same record replaces the previously tracked one.
///
/// # Errors
/// Propagates the journal error.
pub fn add_read(&mut self, journal: &Journal, segment: u32, recref: &RecRef, version: u16) -> PRes<()> {
    if self.strategy != TxStrategy::VersionOnRead {
        return Ok(());
    }
    let entry = ReadRecord::new(segment, recref, version);
    journal.log(&entry, &self.id)?;
    self.read.insert(recref.clone(), entry);
    Ok(())
}
/// Records a tracked read read back from the journal, without logging it again.
pub fn recover_read(&mut self, read: &ReadRecord) {
    let key = read.recref.clone();
    self.read.insert(key, read.clone());
}
/// Logs the insert of a record to the journal and records it in the transaction.
///
/// The segment is marked as updated before the journal is written, so it
/// stays marked even when logging fails.
///
/// # Errors
/// Propagates the journal error.
pub fn add_insert(&mut self, journal: &Journal, segment: u32, rec_ref: &RecRef, record: u64) -> PRes<()> {
    let entry = InsertRecord::new(segment, rec_ref, record);
    self.segs_updated.insert(segment);
    journal.log(&entry, &self.id)?;
    self.inserted.push(entry);
    Ok(())
}
/// Logs a page added to a segment to the journal and records it in the transaction.
///
/// # Errors
/// Propagates the journal error; in that case the transaction is left unchanged.
pub fn add_new_segment_page(
    &mut self,
    journal: &Journal,
    segment: u32,
    new_page: u64,
    previous_page: u64,
) -> PRes<()> {
    let entry = NewSegmentPage::new(segment, new_page, previous_page);
    journal.log(&entry, &self.id)?;
    self.segs_new_pages.push(entry);
    Ok(())
}
/// Records an insert read back from the journal, without logging it again.
pub fn recover_insert(&mut self, insert: &InsertRecord) {
    let entry = insert.clone();
    self.segs_updated.insert(entry.segment);
    self.inserted.push(entry);
}
/// Logs the update of a record to the journal and records it in the transaction.
///
/// The segment is marked as updated before the journal is written, so it
/// stays marked even when logging fails.
///
/// # Errors
/// Propagates the journal error.
pub fn add_update(
    &mut self,
    journal: &Journal,
    segment: u32,
    rec_ref: &RecRef,
    record: u64,
    version: u16,
) -> PRes<()> {
    let entry = UpdateRecord::new(segment, rec_ref, record, version);
    self.segs_updated.insert(segment);
    journal.log(&entry, &self.id)?;
    self.updated.push(entry);
    Ok(())
}
/// Records an update read back from the journal, without logging it again.
pub fn recover_update(&mut self, update: &UpdateRecord) {
    let entry = update.clone();
    self.segs_updated.insert(entry.segment);
    self.updated.push(entry);
}
/// Logs the deletion of a record to the journal and records it in the transaction.
///
/// The segment is marked as updated before the journal is written, so it
/// stays marked even when logging fails.
///
/// # Errors
/// Propagates the journal error.
pub fn add_delete(&mut self, journal: &Journal, segment: u32, rec_ref: &RecRef, version: u16) -> PRes<()> {
    let entry = DeleteRecord::new(segment, rec_ref, version);
    self.segs_updated.insert(segment);
    journal.log(&entry, &self.id)?;
    self.deleted.push(entry);
    Ok(())
}
/// Forwards a key/value put on the named index to the pending index changes.
///
/// Does nothing when the transaction no longer holds its index changes.
pub fn add_put<K, V>(&mut self, index: &str, k: K, v: V)
where
    K: IndexType,
    V: IndexType,
{
    if let Some(keeper) = self.indexes.as_mut() {
        keeper.put(index, k, v);
    }
}
/// Forwards a removal on the named index to the pending index changes.
///
/// Does nothing when the transaction no longer holds its index changes.
pub fn add_remove<K, V>(&mut self, index: &str, k: K, v: Option<V>)
where
    K: IndexType,
    V: IndexType,
{
    if let Some(keeper) = self.indexes.as_mut() {
        keeper.remove(index, k, v);
    }
}
/// Applies the pending index changes for key `k` on top of the persistent value `pers`.
///
/// Delegates to the index change keeper; when the transaction no longer
/// holds its index changes, `pers` is returned untouched.
pub fn apply_changes<K, V>(
    &self,
    vm: ValueMode,
    index: &str,
    k: &K,
    pers: Option<Value<V>>,
) -> PRes<Option<Value<V>>>
where
    K: IndexType,
    V: IndexType,
{
    match self.indexes.as_ref() {
        Some(keeper) => keeper.apply_changes(index, vm, k, pers),
        None => Ok(pers),
    }
}
/// Asks the pending index changes for the keys of the named index that fall in `range`.
///
/// Returns `None` when the transaction no longer holds its index changes,
/// or when the keeper itself returns `None`.
pub fn index_range<K, V, R>(&self, index_name: &str, range: R) -> Option<IntoIter<K>>
where
    K: IndexType,
    V: IndexType,
    R: RangeBounds<K>,
{
    self.indexes
        .as_ref()
        .and_then(|keeper| keeper.range::<K, V, R>(index_name, range))
}
/// Records a deletion read back from the journal, without logging it again.
pub fn recover_delete(&mut self, delete: &DeleteRecord) {
    let entry = delete.clone();
    self.segs_updated.insert(entry.segment);
    self.deleted.push(entry);
}
/// Returns a scanner over the records this transaction inserted into segment `seg`.
pub fn scan_insert(&self, seg: u32) -> TransactionInsertScanner {
    let segment = seg;
    TransactionInsertScanner { segment, tx: self }
}
/// Looks a record up in the pending changes of this transaction.
///
/// Precedence: a deletion wins, then the most recently added update, then an
/// insert (reported with version `1`). Records are matched on `page` and `pos`.
pub fn read(&self, rec_ref: &RecRef) -> TxRead {
    let same = |other: &RecRef| other.page == rec_ref.page && other.pos == rec_ref.pos;

    if self.deleted.iter().any(|del| same(&del.recref)) {
        return TxRead::DELETED;
    }
    // Walk the updates backwards so the latest one is found first.
    if let Some(upd) = self.updated.iter().rev().find(|upd| same(&upd.recref)) {
        return TxRead::RECORD((upd.record_page, upd.version));
    }
    match self.inserted.iter().find(|ins| same(&ins.recref)) {
        Some(ins) => TxRead::RECORD((ins.record_page, 1)),
        None => TxRead::NONE,
    }
}
/// Re-runs the prepare phase for a transaction rebuilt from the journal.
///
/// Collapses the recorded operations, acquires the locks they need, checks
/// the persistent records (versions are checked unless the strategy is
/// `LastWin`) and confirms the allocations of the created/updated segments.
/// The checks run in that order and stop at the first failure.
///
/// # Errors
/// On the first failing check `recover_rollback` is invoked and the error of
/// the check is returned; if the rollback itself fails, its error is
/// returned instead.
pub fn recover_prepare_commit(
    &mut self,
    journal: &Journal,
    address: &Address,
    allocator: &Allocator,
) -> PRes<PreparedState> {
    // The pages freed by collapsing are not needed on this path.
    let _ = self.collapse_operations();
    let (records, crt_upd_segs, dropped_segs) = self.coll_locks();
    let check_version = self.strategy != TxStrategy::LastWin;

    let checks = address
        .acquire_locks(&records, &crt_upd_segs, &dropped_segs)
        .map(|_| ())
        .and_then(|_| address.check_persistent_records(&records, check_version).map(|_| ()))
        .and_then(|_| address.confirm_allocations(&crt_upd_segs, false).map(|_| ()));
    if let Err(failure) = checks {
        self.recover_rollback(journal, address, allocator)?;
        return Err(failure);
    }

    let mut prepared = PreparedState::new();
    prepared.data_locks = Some((records, crt_upd_segs, dropped_segs));
    Ok(prepared)
}
/// Runs the prepare phase of a commit.
///
/// Consumes the transaction and, on success, hands it back together with the
/// `PreparedState` describing what was locked and which snapshot was taken.
/// The steps are, in order: lock and apply the pending index changes,
/// collapse the record operations, acquire the data locks, check the
/// persistent records, confirm the allocations, collect and log the pages to
/// free, register a snapshot, write the prepare marker to the journal and
/// sync the disc.
///
/// # Errors
/// When locking the indexes, acquiring the data locks, checking the records
/// or confirming the allocations fails, `rollback_prepared` is called and the
/// original error is returned (or the rollback error, if the rollback fails).
/// Every other error is propagated directly with `?`.
pub fn prepare_commit(
mut self,
journal: &Journal,
address: &Address,
indexes: &Indexes,
snapshots: &Snapshots,
persy_impl: &PersyImpl,
allocator: &Allocator,
) -> PRes<(Transaction, PreparedState)> {
let mut prepared = PreparedState::new();
// Take the pending index changes out of the transaction, leaving `None` behind.
let ind = self.indexes;
self.indexes = None;
if let Some(ind_change) = ind {
let mut to_lock = ind_change.changed_indexes();
to_lock.sort();
if let Err(err) = indexes.write_lock(&to_lock) {
// NOTE(review): the names are recorded as locked even though `write_lock`
// failed, so `rollback_prepared` will call `write_unlock` on them — confirm
// that unlocking is safe for locks that were not acquired.
prepared.locked_indexes = Some(to_lock);
self.rollback_prepared(journal, address, indexes, snapshots, allocator, prepared)?;
return Err(err);
}
prepared.locked_indexes = Some(to_lock);
// NOTE(review): a failure here (like the other bare `?` calls further down)
// returns without calling `rollback_prepared` — confirm this is intended.
ind_change.apply(persy_impl, &mut self)?;
}
// Merge redundant record operations; the result is the set of pages they made obsolete.
let mut freed_pages = self.collapse_operations();
let (records, crt_upd_segs, dropped_segs) = self.coll_locks();
if let Err(x) = address.acquire_locks(&records, &crt_upd_segs, &dropped_segs) {
self.rollback_prepared(journal, address, indexes, snapshots, allocator, prepared)?;
return Err(x);
};
// From here on a rollback has to release the data locks as well.
prepared.data_locks = Some((records.clone(), crt_upd_segs.clone(), dropped_segs));
// Versions are checked for every strategy except `LastWin`.
let check_version = self.strategy != TxStrategy::LastWin;
let old_records = match address.check_persistent_records(&records, check_version) {
Ok(old) => old,
Err(x) => {
self.rollback_prepared(journal, address, indexes, snapshots, allocator, prepared)?;
return Err(x);
}
};
let segs: Vec<_> = self.segs_updated.iter().map(|x| *x).collect();
if let Err(x) = address.confirm_allocations(&segs, false) {
self.rollback_prepared(journal, address, indexes, snapshots, allocator, prepared)?;
return Err(x);
}
// Every page of a dropped segment is going to be freed.
for dropped_seg in &self.segs_dropped {
let pages = address.collect_segment_pages(allocator, *dropped_seg)?;
for p in pages.into_iter().map(FreedPage::new) {
freed_pages.insert(p);
}
}
// The records returned by the check are both freed and kept as snapshot entries.
let mut snapshot_entries = Vec::new();
for old_record in &old_records {
freed_pages.insert(FreedPage::new(old_record.record_page));
snapshot_entries.push(SnapshotEntry::new(
&old_record.recref,
old_record.record_page,
old_record.version,
));
}
// Log every page to free in the journal.
for freed_page in &freed_pages {
journal.log(freed_page, &self.id)?;
}
// The `BTreeSet` iterates in ascending page order; the list is stored in descending order.
let mut freed_pages_vec: Vec<_> = freed_pages.into_iter().collect();
freed_pages_vec.reverse();
prepared.snapshot_id = Some(snapshots.snapshot(snapshot_entries, freed_pages_vec.clone(), self.id.clone())?);
self.freed_pages = Some(freed_pages_vec);
journal.prepare(&PrepareCommit::new(), &self.id)?;
allocator.disc().sync()?;
Ok((self, prepared))
}
fn collapse_operations(&mut self) -> BTreeSet<FreedPage> {
let mut pages_to_free = BTreeSet::new();
let mut inserted_by_id = HashMap::new();
for insert in self.inserted.drain(..) {
inserted_by_id.insert(insert.recref.clone(), insert);
}
let mut updated_by_id = HashMap::new();
for update in self.updated.drain(..) {
match updated_by_id.entry(update.recref.clone()) {
Entry::Vacant(e) => {
e.insert(update);
}
Entry::Occupied(mut e) => {
pages_to_free.insert(FreedPage::new(e.get().record_page));
e.get_mut().record_page = update.record_page;
}
}
}
for (k, insert) in &mut inserted_by_id {
if let Some(update) = updated_by_id.remove(&k) {
pages_to_free.insert(FreedPage::new(insert.record_page));
insert.record_page = update.record_page;
}
}
let mut i = 0;
while i != self.deleted.len() {
if let Some(insert) = inserted_by_id.remove(&self.deleted[i].recref) {
self.deleted.remove(i);
pages_to_free.insert(FreedPage::new(insert.record_page));
} else {
i += 1;
}
}
for delete in &self.deleted {
if let Some(update) = updated_by_id.remove(&delete.recref) {
pages_to_free.insert(FreedPage::new(update.record_page));
}
}
for (_, insert) in inserted_by_id.drain() {
if self.segs_dropped.contains(&insert.segment) {
pages_to_free.insert(FreedPage::new(insert.record_page));
} else {
self.inserted.push(insert);
}
}
for (_, update) in updated_by_id.drain() {
if self.segs_dropped.contains(&update.segment) {
pages_to_free.insert(FreedPage::new(update.record_page));
} else {
self.updated.push(update);
}
}
pages_to_free
}
fn coll_locks(&self) -> (Vec<(u32, RecRef, u16)>, Vec<u32>, Vec<u32>) {
let mut crt_upd_segs = Vec::new();
for create in &self.segs_created {
if !&self.segs_dropped.contains(create) {
crt_upd_segs.push(create.clone());
}
}
for update in &self.segs_updated {
if !&self.segs_dropped.contains(update) {
crt_upd_segs.push(update.clone());
}
}
let mut dropped_segs = Vec::new();
for dropped in &self.segs_dropped {
dropped_segs.push(dropped.clone());
}
let mut records = HashSet::new();
for update in &self.updated {
let mut version = update.version;
if let Some(read_v) = self.read.get(&update.recref) {
version = read_v.version;
}
records.insert((update.segment, update.recref.clone(), version));
}
for delete in &self.deleted {
let mut version = delete.version;
if let Some(read_v) = self.read.get(&delete.recref) {
version = read_v.version;
}
records.insert((delete.segment, delete.recref.clone(), version));
}
for insert in &self.inserted {
records.remove(&(insert.segment, insert.recref.clone(), 1));
}
let mut sorted_records: Vec<(u32, RecRef, u16)> = records.iter().cloned().collect();
sorted_records.sort_by_key(|ref x| x.1.clone());
crt_upd_segs.sort();
dropped_segs.sort();
(sorted_records, crt_upd_segs, dropped_segs)
}
/// Undoes the in-progress effects of the transaction on the address
/// structure and the allocator.
///
/// * every inserted record is rolled back through `address.rollback`;
/// * every segment created by the transaction is removed with
///   `address.drop_temp_segment`;
/// * the record pages of inserts and updates that target a segment created
///   by the transaction are freed.
///
/// # Errors
/// Stops at, and returns, the first error reported by the address structure
/// or the allocator.
fn internal_rollback(&self, address: &Address, allocator: &Allocator) -> PRes<()> {
    // Query the set of created segments directly instead of copying it into
    // a vector and scanning that vector for every record.
    let created = &self.segs_created;
    for insert in &self.inserted {
        address.rollback(&insert.recref)?;
        if created.contains(&insert.segment) {
            allocator.free(insert.record_page)?;
        }
    }
    for segment in created {
        address.drop_temp_segment(*segment)?;
    }
    for update in &self.updated {
        if created.contains(&update.segment) {
            allocator.free(update.record_page)?;
        }
    }
    Ok(())
}
/// Rolls the transaction back during recovery.
///
/// Writes the rollback marker to the journal, undoes the in-progress changes
/// and clears the journal entries of this transaction. Unlike `rollback`,
/// the free list is not flushed and the disc is not synced here.
pub fn recover_rollback(&self, journal: &Journal, address: &Address, allocator: &Allocator) -> PRes<()> {
    let marker = Rollback::new();
    journal.end(&marker, &self.id)?;
    self.internal_rollback(address, allocator)?;
    let own_entries = [self.id.clone()];
    journal.clear_all(&own_entries)?;
    Ok(())
}
/// Rolls back a transaction that has not been prepared.
///
/// Writes the rollback marker to the journal, undoes the in-progress
/// changes, flushes the allocator free list, clears the journal entries of
/// this transaction and finally syncs the disc.
pub fn rollback(&mut self, journal: &Journal, address: &Address, allocator: &Allocator) -> PRes<()> {
    let marker = Rollback::new();
    journal.end(&marker, &self.id)?;
    self.internal_rollback(address, allocator)?;
    allocator.flush_free_list()?;
    let own_entries = [self.id.clone()];
    journal.clear_all(&own_entries)?;
    allocator.disc().sync()?;
    Ok(())
}
/// Rolls back a transaction whose prepare phase was started.
///
/// Writes the rollback marker to the journal, rolls back the inserted
/// records, releases the data locks and the index locks recorded in
/// `prepared`, undoes the in-progress changes and, when a snapshot was
/// registered, releases it and frees/cleans what the release returns.
///
/// # Errors
/// Stops at, and returns, the first error reported by any of the steps.
pub fn rollback_prepared(
&mut self,
journal: &Journal,
address: &Address,
indexes: &Indexes,
snapshots: &Snapshots,
allocator: &Allocator,
prepared: PreparedState,
) -> PRes<()> {
journal.end(&Rollback::new(), &self.id)?;
// NOTE(review): `internal_rollback` below calls `address.rollback` for every
// inserted record as well, so each insert is rolled back twice on this path —
// confirm that `address.rollback` tolerates being called twice.
for insert in &self.inserted {
address.rollback(&insert.recref)?;
}
// Release only what the prepare phase recorded as locked.
if let Some((records, crt_upd_segs, delete_segs)) = &prepared.data_locks {
address.release_locks(records, crt_upd_segs, delete_segs)?;
}
if let Some(il) = &prepared.locked_indexes {
indexes.write_unlock(il)?;
}
self.internal_rollback(address, allocator)?;
// NOTE(review): the free list flush, the journal cleanup and the disc sync
// only happen when a snapshot was registered; without one the journal
// entries of this transaction are not cleared here — confirm this is intended.
if let Some(snapshot_id) = prepared.snapshot_id {
let (to_free, to_clean) = snapshots.release(snapshot_id)?;
for page in to_free {
allocator.free(page.page)?;
}
allocator.flush_free_list()?;
journal.clear_all(&to_clean)?;
allocator.disc().sync()?;
}
Ok(())
}
/// Completes the commit of a transaction rebuilt from the journal.
///
/// Applies the changes in recovery mode, writes the commit marker to the
/// journal, frees the pages recorded in `freed_pages` (if any) and syncs the disc.
pub fn recover_commit(
    &mut self,
    journal: &Journal,
    address: &Address,
    indexes: &Indexes,
    allocator: &Allocator,
    prepared: PreparedState,
) -> PRes<()> {
    self.internal_commit(address, indexes, true, &prepared)?;
    let marker = Commit::new();
    journal.end(&marker, &self.id)?;
    // `freed_pages` may be absent; flattening the option yields nothing then.
    for freed in self.freed_pages.iter().flatten() {
        allocator.free(freed.page)?;
    }
    allocator.disc().sync()?;
    Ok(())
}
/// Applies the recorded operations to the address structure and releases
/// the locks recorded in `prepared`.
///
/// `recover` is forwarded to `address.apply`. Data locks are released before
/// index locks; each is released only when `prepared` recorded it.
fn internal_commit(
    &mut self,
    address: &Address,
    indexes: &Indexes,
    recover: bool,
    prepared: &PreparedState,
) -> PRes<()> {
    // The value returned by `apply` is currently unused.
    let _pages_to_unlink = address.apply(
        &self.segs_new_pages,
        &self.inserted,
        &self.updated,
        &self.deleted,
        &self.segments_operations,
        recover,
    )?;
    if let Some((records, crt_upd_segs, deleted_segs)) = prepared.data_locks.as_ref() {
        address.release_locks(records, crt_upd_segs, deleted_segs)?;
    }
    if let Some(locked) = prepared.locked_indexes.as_ref() {
        indexes.write_unlock(locked)?;
    }
    Ok(())
}
/// Commits a prepared transaction.
///
/// Applies the changes, writes the commit marker to the journal and, when
/// the prepare phase registered a snapshot, releases it: the pages returned
/// by the release are freed, the free list is flushed, the returned journal
/// entries are cleared and the disc is synced.
pub fn commit(
    &mut self,
    address: &Address,
    journal: &Journal,
    indexes: &Indexes,
    snapshots: &Snapshots,
    allocator: &Allocator,
    prepared: PreparedState,
) -> PRes<()> {
    self.internal_commit(address, indexes, false, &prepared)?;
    let marker = Commit::new();
    journal.end(&marker, &self.id)?;

    let snapshot_id = match prepared.snapshot_id {
        Some(id) => id,
        // Without a snapshot there is nothing left to release.
        None => return Ok(()),
    };
    let (to_free, to_clean) = snapshots.release(snapshot_id)?;
    for released in to_free {
        allocator.free(released.page)?;
    }
    allocator.flush_free_list()?;
    journal.clear_all(&to_clean)?;
    allocator.disc().sync()?;
    Ok(())
}
pub fn recover_metadata(&mut self, metadata: &Metadata) {
self.strategy = metadata.strategy.clone();
self.meta_id = metadata.meta_id.clone();
}
/// Appends a freed page read back from the journal, creating the list on first use.
pub fn recover_freed_page(&mut self, freed: &FreedPage) {
    self.freed_pages.get_or_insert_with(Vec::new).push(freed.clone());
}
/// Records a new segment page read back from the journal, without logging it again.
pub fn recover_new_segment_page(&mut self, new_page: &NewSegmentPage) {
    let entry = new_page.clone();
    self.segs_new_pages.push(entry);
}
pub fn meta_id<'a>(&'a self) -> &'a Vec<u8> {
&self.meta_id
}
/// Lists the segments visible to this transaction.
///
/// Yields the persistent `(name, id)` pairs in `pers` followed by the
/// segments created in this transaction, skipping every segment whose id
/// was dropped in this transaction.
pub fn filter_list<'a>(&'a self, pers: &'a [(String, u32)]) -> impl Iterator<Item = (&'a str, u32)> + 'a {
    let created_here = self.segments_operations.iter().filter_map(|op| match op {
        SegmentOperation::CREATE(created) => Some((created.name.as_str(), created.segment_id)),
        SegmentOperation::DROP(_) => None,
    });
    let dropped = &self.segs_dropped;
    pers.iter()
        .map(|(name, id)| (name.as_str(), *id))
        .chain(created_here)
        .filter(move |(_, id)| !dropped.contains(id))
}
}
impl DeleteRecord {
pub fn new(segment: u32, rec_ref: &RecRef, version: u16) -> DeleteRecord {
DeleteRecord {
segment,
recref: rec_ref.clone(),
version,
}
}
}
impl UpdateRecord {
pub fn new(segment: u32, rec_ref: &RecRef, record: u64, version: u16) -> UpdateRecord {
UpdateRecord {
segment,
recref: rec_ref.clone(),
record_page: record,
version,
}
}
}
impl ReadRecord {
pub fn new(segment: u32, recref: &RecRef, version: u16) -> ReadRecord {
ReadRecord {
segment,
recref: recref.clone(),
version,
}
}
}
impl PrepareCommit {
pub fn new() -> PrepareCommit {
PrepareCommit {}
}
}
impl Commit {
pub fn new() -> Commit {
Commit {}
}
}
impl Rollback {
pub fn new() -> Rollback {
Rollback {}
}
}
impl InsertRecord {
pub fn new(segment: u32, rec_ref: &RecRef, record: u64) -> InsertRecord {
InsertRecord {
segment,
recref: rec_ref.clone(),
record_page: record,
}
}
}
impl CreateSegment {
pub fn new(name: &str, segment_id: u32, first_page: u64) -> CreateSegment {
CreateSegment {
name: name.into(),
segment_id,
first_page,
}
}
}
impl DropSegment {
pub fn new(name: &str, segment_id: u32) -> DropSegment {
DropSegment {
name: name.into(),
segment_id,
}
}
}
impl Metadata {
pub fn new(strategy: &TxStrategy, meta_id: Vec<u8>) -> Metadata {
Metadata {
strategy: strategy.clone(),
meta_id,
}
}
}
impl FreedPage {
pub fn new(page: u64) -> FreedPage {
FreedPage { page }
}
}
impl NewSegmentPage {
pub fn new(segment: u32, page: u64, previous: u64) -> NewSegmentPage {
NewSegmentPage {
segment,
page,
previous,
}
}
}
#[cfg(test)]
mod tests {
    use super::{DeleteRecord, FreedPage, InsertRecord, Transaction, UpdateRecord};
    use crate::{id::RecRef, journal::JournalId};

    /// Scanning the inserts of a segment only yields the records of that segment.
    #[test]
    fn test_scan_insert() {
        let mut tx = Transaction::recover(JournalId::new(0, 0));
        tx.inserted.extend(vec![
            InsertRecord::new(10, &RecRef::new(3, 2), 2),
            InsertRecord::new(10, &RecRef::new(4, 2), 2),
            InsertRecord::new(20, &RecRef::new(0, 1), 3),
        ]);
        let found: Vec<RecRef> = tx.scan_insert(10).into_iter().collect();
        for rec in &found {
            assert_eq!(rec.pos, 2);
        }
        assert_eq!(found.len(), 2);
    }

    /// Collapsing merges redundant operations and reports the obsolete pages.
    #[test]
    fn test_collapse() {
        let mut tx = Transaction::recover(JournalId::new(0, 0));
        // Every record of this test lives on page 3; only the position varies.
        let at = |pos| RecRef::new(3, pos);

        // (segment, position, record page)
        for &(segment, pos, page) in [(10, 1, 1), (10, 2, 2), (20, 3, 3), (20, 4, 4)].iter() {
            tx.inserted.push(InsertRecord::new(segment, &at(pos), page));
        }
        // (position, record page), all in segment 10 with version 1
        for &(pos, page) in [(1, 5), (1, 6), (2, 7), (5, 8), (5, 9), (6, 10), (7, 11)].iter() {
            tx.updated.push(UpdateRecord::new(10, &at(pos), page, 1));
        }
        // (position, version), all in segment 10
        for &(pos, version) in [(1, 0), (3, 1), (6, 2), (8, 2)].iter() {
            tx.deleted.push(DeleteRecord::new(10, &at(pos), version));
        }

        let free = tx.collapse_operations();

        let expected_pages = [1, 2, 3, 5, 6, 8, 10];
        assert_eq!(free.len(), 7);
        for page in expected_pages.iter() {
            assert!(free.contains(&FreedPage::new(*page)));
        }
        assert_eq!(tx.inserted.len(), 2);
        assert_eq!(tx.updated.len(), 2);
        assert_eq!(tx.deleted.len(), 2);
    }
}