use crate::{
address::Address,
config::TxStrategy,
error::{IndexChangeError, PERes, TimeoutError},
id::{index_id_to_segment_id_data, index_id_to_segment_id_meta, IndexId, RecRef, SegmentId},
journal::{Journal, JournalId},
persy::PersyImpl,
snapshots::{release_snapshot, SnapshotEntry, SnapshotId},
GenericError, PrepareError,
};
use crate::{
index::{
config::{IndexTypeInternal, ValueMode},
keeper::IndexTransactionKeeper,
tree::nodes::Value,
},
DropSegmentError,
};
use std::{
collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
ops::RangeBounds,
vec::{self, IntoIter},
};
/// Journal entry recording that a page was added to a segment.
///
/// Logged by `TransactionImpl::add_new_segment_page` and replayed through
/// `TransactionImpl::recover_new_segment_page`.
#[derive(Clone, Default)]
pub struct NewSegmentPage {
/// Segment the page was added to.
pub segment: SegmentId,
/// The newly added page.
pub page: u64,
/// The `previous_page` value supplied when the page was added
/// (presumably the page preceding `page` in the segment — TODO confirm).
pub previous: u64,
}
/// Journal entry describing a record inserted inside a transaction.
#[derive(Clone, Default)]
pub struct InsertRecord {
/// Segment the record was inserted into.
pub segment: SegmentId,
/// Identifier of the inserted record.
pub recref: RecRef,
/// Page holding the record content.
pub record_page: u64,
}
/// Journal entry describing a record updated inside a transaction.
#[derive(Clone, Default)]
pub struct UpdateRecord {
/// Segment that owns the record.
pub segment: SegmentId,
/// Identifier of the updated record.
pub recref: RecRef,
/// Page holding the new record content.
pub record_page: u64,
/// Record version supplied by the caller of `add_update`; it is part of the
/// lock/verification tuple built at prepare time.
pub version: u16,
}
/// Journal entry describing a record read inside a transaction.
///
/// `TransactionImpl::add_read` only logs it when the strategy is
/// `TxStrategy::VersionOnRead`.
#[derive(Clone, Default)]
pub struct ReadRecord {
/// Segment that owns the record.
pub segment: SegmentId,
/// Identifier of the record that was read.
pub recref: RecRef,
/// Version observed at read time; it replaces the update/delete version when
/// the lock list is collected.
pub version: u16,
}
/// Journal entry describing a record deleted inside a transaction.
#[derive(Clone, Default)]
pub struct DeleteRecord {
/// Segment that owns the record.
pub segment: SegmentId,
/// Identifier of the deleted record.
pub recref: RecRef,
/// Record version supplied by the caller of `add_delete`.
pub version: u16,
}
/// Journal entry describing a segment created inside a transaction.
#[derive(Clone, Default)]
pub struct CreateSegment {
/// Name of the new segment.
pub name: String,
/// Identifier assigned to the new segment.
pub segment_id: SegmentId,
/// First page of the new segment.
pub first_page: u64,
}
/// Journal entry describing a segment dropped inside a transaction.
#[derive(Clone, Default)]
pub struct DropSegment {
/// Name of the dropped segment.
pub name: String,
/// Identifier of the dropped segment.
pub segment_id: SegmentId,
}
/// Journal entry naming a page that the transaction releases.
///
/// The derived ordering allows it to be stored in a `BTreeSet`, as done while
/// collapsing the transaction operations.
#[derive(Clone, PartialEq, Debug, PartialOrd, Ord, Eq, Default)]
pub struct FreedPage {
/// The page to free.
pub page: u64,
}
/// Field-less journal marker written when a transaction has been prepared.
#[derive(Default)]
pub struct PrepareCommit {}
/// Field-less journal marker used to end a transaction as committed.
#[derive(Default)]
pub struct Commit {}
/// Field-less journal marker.
///
/// NOTE(review): it is not used by the code visible in this file; presumably it
/// marks a completed cleanup — confirm against the journal implementation.
#[derive(Default)]
pub struct Cleanup {}
/// Field-less journal marker used to end a transaction as rolled back.
#[derive(Default)]
pub struct Rollback {}
/// Journal entry with the transaction-level settings, logged when a
/// transaction starts and restored by `TransactionImpl::recover_metadata`.
#[derive(Default)]
pub struct Metadata {
/// Concurrency strategy of the transaction.
pub strategy: TxStrategy,
/// Opaque identifier bytes supplied when the transaction was created.
pub meta_id: Vec<u8>,
}
/// A segment-level operation performed inside a transaction, kept in the order
/// in which it was registered.
pub enum SegmentOperation {
/// A segment was created.
Create(CreateSegment),
/// A segment was dropped.
Drop(DropSegment),
}
/// Resources acquired during the prepare phase that commit or rollback must
/// release afterwards.
#[derive(Clone)]
pub struct PreparedState {
/// Indexes that were read-locked, if any.
locked_indexes: Option<Vec<IndexId>>,
/// Snapshot registered for the transaction, if any.
snapshot_id: Option<SnapshotId>,
/// Acquired data locks as `(records, created/updated segments, dropped segments)`.
data_locks: Option<(Vec<(SegmentId, RecRef, u16)>, Vec<SegmentId>, Vec<SegmentId>)>,
}
impl PreparedState {
fn new() -> PreparedState {
PreparedState {
locked_indexes: None,
snapshot_id: None,
data_locks: None,
}
}
}
/// How the transaction data is synced; the value is forwarded unchanged to
/// `PersyImpl::transaction_sync`.
pub enum SyncMode {
/// NOTE(review): presumably the sync is performed inline — confirm.
Sync,
/// NOTE(review): presumably the sync is delegated to a background task — confirm.
BackgroundSync,
}
/// In-memory state of a transaction: every operation registered so far plus the
/// locks taken on behalf of index changes.
pub struct TransactionImpl {
/// Concurrency strategy; `LastWin` disables the version check at prepare time
/// and `VersionOnRead` makes reads be recorded.
strategy: TxStrategy,
/// Sync mode forwarded to `PersyImpl::transaction_sync`.
sync_mode: SyncMode,
/// Opaque identifier bytes supplied at creation.
meta_id: Vec<u8>,
/// Identifier of this transaction in the journal.
id: JournalId,
/// Records inserted in this transaction.
inserted: Vec<InsertRecord>,
/// Records updated in this transaction, in registration order.
updated: Vec<UpdateRecord>,
/// Records deleted in this transaction.
deleted: Vec<DeleteRecord>,
/// Recorded reads, keyed by record id.
read: HashMap<RecRef, ReadRecord>,
/// Segment create/drop operations, in registration order.
segments_operations: Vec<SegmentOperation>,
/// Names of the segments created in this transaction.
segs_created_names: HashSet<String>,
/// Names of the segments dropped in this transaction.
segs_dropped_names: HashSet<String>,
/// Ids of the segments created in this transaction.
segs_created: HashSet<SegmentId>,
/// Ids of the segments dropped in this transaction.
segs_dropped: HashSet<SegmentId>,
/// Ids of the segments with inserted, updated or deleted records.
segs_updated: HashSet<SegmentId>,
/// Pages to free; filled by `prepare` or while recovering from the journal.
freed_pages: Option<Vec<FreedPage>>,
/// Pending index changes; taken out (set to `None`) by `prepare`.
indexes: Option<IndexTransactionKeeper>,
/// Pages added to segments in this transaction.
segs_new_pages: Vec<NewSegmentPage>,
/// Index segments read-locked during `prepare`.
locked_index_segs: HashSet<SegmentId>,
/// Records locked through `lock_record`.
locked_index_pages: HashSet<RecRef>,
/// `(segment, record, version)` tuples registered by `lock_record`; never read
/// back by the code visible in this file.
locked_index_tracking: HashSet<(SegmentId, RecRef, u16)>,
}
/// Result of looking a record up in the pending changes of a transaction.
pub enum TxRead {
/// The record was inserted or updated in the transaction: `(record_page, version)`.
Record((u64, u16)),
/// The record was deleted in the transaction.
Deleted,
/// The transaction did not touch the record.
None,
}
/// Result of looking a segment name up in the pending changes of a transaction.
pub enum TxSegCheck {
/// The segment was created in the transaction with the given id.
Created(SegmentId),
/// The segment was dropped in the transaction.
Dropped,
/// The transaction neither created nor dropped a segment with that name.
None,
}
/// Iterable view over the records a transaction inserted into one segment.
pub struct TransactionInsertScanner<'a> {
/// Transaction whose inserts are scanned.
tx: &'a TransactionImpl,
/// Segment the scan is restricted to.
segment: SegmentId,
}
/// Owning iterator produced by `TransactionInsertScanner`; yields the ids of the
/// inserted records that belong to `segment`.
pub struct TransactionInsertIterator {
/// Snapshot of the transaction inserts taken when the iterator was created.
iter: vec::IntoIter<InsertRecord>,
/// Segment used to filter the inserts.
segment: SegmentId,
}
impl<'a> IntoIterator for TransactionInsertScanner<'a> {
type Item = RecRef;
type IntoIter = TransactionInsertIterator;
fn into_iter(self) -> Self::IntoIter {
let iter: vec::IntoIter<InsertRecord> = self.tx.inserted.clone().into_iter();
TransactionInsertIterator {
iter,
segment: self.segment,
}
}
}
impl Iterator for TransactionInsertIterator {
    type Item = RecRef;

    /// Advances to the next insert that belongs to the scanned segment and
    /// returns its record id, or `None` once the inserts are exhausted.
    fn next(&mut self) -> Option<RecRef> {
        // copied first so the closure does not borrow `self` while `iter` is borrowed mutably
        let wanted = self.segment;
        self.iter
            .by_ref()
            .find(|candidate| candidate.segment == wanted)
            .map(|found| found.recref)
    }
}
impl TransactionImpl {
pub fn new(
journal: &Journal,
strategy: &TxStrategy,
sync_mode: SyncMode,
meta_id: Vec<u8>,
) -> PERes<TransactionImpl> {
let id = journal.start()?;
journal.log(&Metadata::new(strategy, meta_id.clone()), &id)?;
Ok(TransactionImpl {
strategy: strategy.clone(),
sync_mode,
meta_id,
id,
inserted: Vec::new(),
updated: Vec::new(),
deleted: Vec::new(),
read: HashMap::new(),
segments_operations: Vec::new(),
segs_created_names: HashSet::new(),
segs_dropped_names: HashSet::new(),
segs_created: HashSet::new(),
segs_dropped: HashSet::new(),
segs_updated: HashSet::new(),
freed_pages: None,
indexes: Some(IndexTransactionKeeper::new()),
segs_new_pages: Vec::new(),
locked_index_segs: HashSet::new(),
locked_index_pages: HashSet::new(),
locked_index_tracking: HashSet::new(),
})
}
pub fn recover(id: JournalId) -> TransactionImpl {
TransactionImpl {
strategy: TxStrategy::LastWin,
sync_mode: SyncMode::Sync,
meta_id: Vec::new(),
id,
inserted: Vec::new(),
updated: Vec::new(),
deleted: Vec::new(),
read: HashMap::new(),
segments_operations: Vec::new(),
segs_created_names: HashSet::new(),
segs_dropped_names: HashSet::new(),
segs_created: HashSet::new(),
segs_dropped: HashSet::new(),
segs_updated: HashSet::new(),
freed_pages: None,
indexes: Some(IndexTransactionKeeper::new()),
segs_new_pages: Vec::new(),
locked_index_segs: HashSet::new(),
locked_index_pages: HashSet::new(),
locked_index_tracking: HashSet::new(),
}
}
/// Returns `true` when `segment` was created inside this transaction.
pub fn segment_created_in_tx(&self, segment: SegmentId) -> bool {
self.segs_created.contains(&segment)
}
/// Returns the name of the segment with id `segment` if it was created inside
/// this transaction, `None` otherwise.
pub fn segment_name_by_id(&self, segment: SegmentId) -> Option<String> {
    self.segments_operations.iter().find_map(|operation| match operation {
        SegmentOperation::Create(created) if created.segment_id == segment => Some(created.name.clone()),
        _ => None,
    })
}
/// Tells whether the segment named `segment` was created or dropped by this
/// transaction.
///
/// A name registered as created takes precedence over a dropped one.
pub fn exists_segment(&self, segment: &str) -> TxSegCheck {
    if self.segs_created_names.contains(segment) {
        // the first matching create operation provides the id
        self.segments_operations
            .iter()
            .find_map(|operation| match operation {
                SegmentOperation::Create(created) if created.name == segment => Some(created.segment_id),
                _ => None,
            })
            .map_or(TxSegCheck::None, TxSegCheck::Created)
    } else if self.segs_dropped_names.contains(segment) {
        TxSegCheck::Dropped
    } else {
        TxSegCheck::None
    }
}
/// Logs the creation of a segment in the journal and registers it in the
/// transaction.
///
/// # Errors
/// Propagates the journal error; in that case nothing is registered.
pub fn add_create_segment(
    &mut self,
    journal: &Journal,
    name: &str,
    segment_id: SegmentId,
    first_page: u64,
) -> PERes<()> {
    let entry = CreateSegment::new(name, segment_id, first_page);
    journal.log(&entry, &self.id)?;
    self.segs_created_names.insert(name.to_owned());
    self.segs_created.insert(segment_id);
    self.segments_operations.push(SegmentOperation::Create(entry));
    Ok(())
}
/// Registers a segment creation read back from the journal, without logging it
/// again.
pub fn recover_add(&mut self, create: &CreateSegment) {
    let entry = create.clone();
    self.segs_created_names.insert(entry.name.clone());
    self.segs_created.insert(entry.segment_id);
    self.segments_operations.push(SegmentOperation::Create(entry));
}
/// Logs the drop of a segment in the journal and registers it in the
/// transaction.
///
/// # Errors
/// Returns `DropSegmentError::CannotDropSegmentCreatedInTx` when a segment with
/// that name was created by this same transaction; journal errors are
/// propagated.
pub fn add_drop_segment(
    &mut self,
    journal: &Journal,
    name: &str,
    segment_id: SegmentId,
) -> Result<(), DropSegmentError> {
    if self.segs_created_names.contains(name) {
        return Err(DropSegmentError::CannotDropSegmentCreatedInTx);
    }
    let entry = DropSegment::new(name, segment_id);
    journal.log(&entry, &self.id)?;
    self.segs_dropped_names.insert(name.to_owned());
    self.segs_dropped.insert(segment_id);
    self.segments_operations.push(SegmentOperation::Drop(entry));
    Ok(())
}
/// Registers a segment drop read back from the journal, without logging it
/// again.
pub fn recover_drop(&mut self, drop: &DropSegment) {
    let entry = drop.clone();
    self.segs_dropped_names.insert(entry.name.clone());
    self.segs_dropped.insert(entry.segment_id);
    self.segments_operations.push(SegmentOperation::Drop(entry));
}
/// Records that `recref` was read at `version`.
///
/// Only the `VersionOnRead` strategy tracks reads; with any other strategy the
/// call is a no-op. A later read of the same record replaces the earlier one.
///
/// # Errors
/// Propagates the journal error.
pub fn add_read(&mut self, journal: &Journal, segment: SegmentId, recref: &RecRef, version: u16) -> PERes<()> {
    if self.strategy != TxStrategy::VersionOnRead {
        return Ok(());
    }
    let entry = ReadRecord::new(segment, recref, version);
    journal.log(&entry, &self.id)?;
    self.read.insert(*recref, entry);
    Ok(())
}
/// Registers a read entry read back from the journal, replacing any previous
/// read of the same record.
pub fn recover_read(&mut self, read: &ReadRecord) {
self.read.insert(read.recref, read.clone());
}
/// Logs the insert of `rec_ref`, stored in page `record`, and registers it in
/// the transaction.
///
/// The segment is marked as updated before the journal write, so it stays
/// marked even when logging fails.
///
/// # Errors
/// Propagates the journal error.
pub fn add_insert(&mut self, journal: &Journal, segment: SegmentId, rec_ref: &RecRef, record: u64) -> PERes<()> {
    self.segs_updated.insert(segment);
    let entry = InsertRecord::new(segment, rec_ref, record);
    journal.log(&entry, &self.id).map(|_| self.inserted.push(entry))
}
/// Logs that `new_page` was added to `segment` after `previous_page` and
/// registers it in the transaction.
///
/// # Errors
/// Propagates the journal error; in that case nothing is registered.
pub fn add_new_segment_page(
    &mut self,
    journal: &Journal,
    segment: SegmentId,
    new_page: u64,
    previous_page: u64,
) -> PERes<()> {
    let entry = NewSegmentPage::new(segment, new_page, previous_page);
    journal.log(&entry, &self.id)?;
    self.segs_new_pages.push(entry);
    Ok(())
}
/// Registers an insert read back from the journal and marks its segment as
/// updated.
pub fn recover_insert(&mut self, insert: &InsertRecord) {
    let entry = insert.clone();
    self.segs_updated.insert(entry.segment);
    self.inserted.push(entry);
}
/// Logs the update of `rec_ref`, whose new content is in page `record`, and
/// registers it in the transaction.
///
/// The segment is marked as updated before the journal write, so it stays
/// marked even when logging fails.
///
/// # Errors
/// Propagates the journal error.
pub fn add_update(
    &mut self,
    journal: &Journal,
    segment: SegmentId,
    rec_ref: &RecRef,
    record: u64,
    version: u16,
) -> PERes<()> {
    self.segs_updated.insert(segment);
    let entry = UpdateRecord::new(segment, rec_ref, record, version);
    journal.log(&entry, &self.id).map(|_| self.updated.push(entry))
}
/// Registers an update read back from the journal and marks its segment as
/// updated.
pub fn recover_update(&mut self, update: &UpdateRecord) {
    let entry = update.clone();
    self.segs_updated.insert(entry.segment);
    self.updated.push(entry);
}
/// Logs the delete of `rec_ref` and registers it in the transaction.
///
/// The segment is marked as updated before the journal write, so it stays
/// marked even when logging fails.
///
/// # Errors
/// Propagates the journal error.
pub fn add_delete(&mut self, journal: &Journal, segment: SegmentId, rec_ref: &RecRef, version: u16) -> PERes<()> {
    self.segs_updated.insert(segment);
    let entry = DeleteRecord::new(segment, rec_ref, version);
    journal.log(&entry, &self.id).map(|_| self.deleted.push(entry))
}
/// Queues the put of `k` => `v` on `index`.
///
/// Does nothing when the index changes were already taken by `prepare`.
pub fn add_put<K, V>(&mut self, index: IndexId, k: K, v: V)
where
    K: IndexTypeInternal,
    V: IndexTypeInternal,
{
    if let Some(keeper) = self.indexes.as_mut() {
        keeper.put(index, k, v);
    }
}
/// Queues the removal of `k` (and optionally of the single value `v`) from
/// `index`.
///
/// Does nothing when the index changes were already taken by `prepare`.
pub fn add_remove<K, V>(&mut self, index: IndexId, k: K, v: Option<V>)
where
    K: IndexTypeInternal,
    V: IndexTypeInternal,
{
    if let Some(keeper) = self.indexes.as_mut() {
        keeper.remove(index, k, v);
    }
}
/// Applies the index changes pending in this transaction for key `k` on top of
/// the persistent value `pers`.
///
/// Returns `pers` unchanged when the index changes were already taken by
/// `prepare`.
///
/// # Errors
/// Propagates the `IndexChangeError` reported by the index keeper.
pub fn apply_changes<K, V>(
    &self,
    vm: ValueMode,
    index: IndexId,
    index_name: &str,
    k: &K,
    pers: Option<Value<V>>,
) -> Result<Option<Value<V>>, IndexChangeError>
where
    K: IndexTypeInternal,
    V: IndexTypeInternal,
{
    match self.indexes.as_ref() {
        Some(keeper) => keeper.apply_changes(index, index_name, vm, k, pers),
        None => Ok(pers),
    }
}
/// Returns the keys changed by this transaction on `index` that fall inside
/// `range`, as reported by the index keeper.
///
/// Returns `None` when the index changes were already taken by `prepare`.
pub fn index_range<K, V, R>(&self, index: IndexId, range: R) -> Option<IntoIter<K>>
where
    K: IndexTypeInternal,
    V: IndexTypeInternal,
    R: RangeBounds<K>,
{
    self.indexes
        .as_ref()
        .and_then(|keeper| keeper.range::<K, V, R>(index, range))
}
/// Registers a delete read back from the journal and marks its segment as
/// updated.
pub fn recover_delete(&mut self, delete: &DeleteRecord) {
    let entry = delete.clone();
    self.segs_updated.insert(entry.segment);
    self.deleted.push(entry);
}
/// Returns a scanner over the records this transaction inserted into `seg`.
pub fn scan_insert(&self, seg: SegmentId) -> TransactionInsertScanner {
TransactionInsertScanner { tx: self, segment: seg }
}
/// Looks `rec_ref` up in the pending changes of this transaction.
///
/// Deletes win over updates and updates win over inserts; among the updates
/// the most recently registered one is returned. An inserted record is
/// reported with version `1`.
pub fn read(&self, rec_ref: &RecRef) -> TxRead {
    // records are matched on page and position only
    let same_record = |other: &RecRef| other.page == rec_ref.page && other.pos == rec_ref.pos;

    if self.deleted.iter().any(|delete| same_record(&delete.recref)) {
        return TxRead::Deleted;
    }
    if let Some(update) = self.updated.iter().rev().find(|update| same_record(&update.recref)) {
        return TxRead::Record((update.record_page, update.version));
    }
    match self.inserted.iter().find(|insert| same_record(&insert.recref)) {
        Some(insert) => TxRead::Record((insert.record_page, 1)),
        None => TxRead::None,
    }
}
/// Prepare step used while recovering a transaction from the journal.
///
/// Collapses the recorded operations, acquires the data locks, verifies the
/// persistent records (versions are checked unless the strategy is `LastWin`)
/// and confirms the allocations of the created/updated segments.
///
/// # Errors
/// On any failure the transaction is rolled back with `recover_rollback` and
/// the original error is returned; an error of the rollback itself takes
/// precedence.
///
/// NOTE(review): the failure paths that follow a successful `acquire_locks`
/// call `recover_rollback`, which in this file does not release the data
/// locks — confirm this is intended during recovery.
pub fn recover_prepare(&mut self, persy_impl: &PersyImpl) -> Result<PreparedState, PrepareError> {
let address = persy_impl.address();
let mut prepared = PreparedState::new();
// the pages freed by the collapse are intentionally discarded here
let _ = self.collapse_operations();
let (records, crt_upd_segs, dropped_segs) = self.coll_locks();
if let Err(x) = address.acquire_locks(&records, &crt_upd_segs, &dropped_segs) {
self.recover_rollback(persy_impl)?;
return Err(x);
}
// the locks are recorded only once they are actually held
prepared.data_locks = Some((records.clone(), crt_upd_segs.clone(), dropped_segs));
let check_version = self.strategy != TxStrategy::LastWin;
if let Err(x) = address.check_persistent_records(&records, check_version) {
self.recover_rollback(persy_impl)?;
return Err(x);
}
if let Err(x) = address.confirm_allocations(&crt_upd_segs, true) {
self.recover_rollback(persy_impl)?;
return Err(PrepareError::from(x));
}
Ok(prepared)
}
/// Collects, without duplicates, the `(segment, record, version)` tuples of the
/// updated and deleted records that were locked through `lock_record`.
///
/// The order of the returned tuples is unspecified.
fn solve_index_locks(&self) -> Vec<(SegmentId, RecRef, u16)> {
    let locked = &self.locked_index_pages;
    let from_updates = self
        .updated
        .iter()
        .filter(|update| locked.contains(&update.recref))
        .map(|update| (update.segment, update.recref, update.version));
    let from_deletes = self
        .deleted
        .iter()
        .filter(|delete| locked.contains(&delete.recref))
        .map(|delete| (delete.segment, delete.recref, delete.version));
    let unique: HashSet<(SegmentId, RecRef, u16)> = from_updates.chain(from_deletes).collect();
    unique.into_iter().collect()
}
/// Runs the prepare phase of the commit, consuming the transaction.
///
/// Steps, in order:
/// 1. discard the index changes of dropped indexes, read-lock the changed
///    indexes and their meta/data segments, then apply the index changes
///    (which registers further record operations on this transaction);
/// 2. collapse the record operations and acquire the data locks;
/// 3. verify the persistent records (versions are checked unless the strategy
///    is `LastWin`) and confirm the allocations of the updated segments;
/// 4. log every page to free, register the snapshot, write the prepare marker
///    in the journal, flush the free list and sync.
///
/// Returns the transaction together with the `PreparedState` that `commit` or
/// `rollback_prepared` need in order to release what was acquired.
///
/// # Errors
/// When locking, applying the index changes, verifying the records or
/// confirming the allocations fails, the transaction is rolled back with
/// `rollback_prepared` and the error is returned (an error of the rollback
/// itself takes precedence).
///
/// NOTE(review): the failures propagated with a bare `?` (segment read locks,
/// `collect_segment_pages`, journal, snapshot, flush and sync) return without
/// rolling back — confirm that callers handle this.
pub fn prepare(mut self, persy_impl: &PersyImpl) -> Result<(TransactionImpl, PreparedState), PrepareError> {
    let indexes = persy_impl.indexes();
    let allocator = persy_impl.allocator();
    let journal = persy_impl.journal();
    let snapshots = persy_impl.snapshots();
    let address = persy_impl.address();
    let mut prepared = PreparedState::new();
    // The keeper is taken out of `self` so that applying it can borrow the
    // transaction mutably.
    if let Some(mut ind_change) = self.indexes.take() {
        let changed_indexes = ind_change.changed_indexes();
        for check in changed_indexes {
            let segment_meta = index_id_to_segment_id_meta(&check);
            if self.segs_dropped.contains(&segment_meta) {
                ind_change.remove_changes(&check);
            }
        }
        let to_lock = ind_change.changed_indexes();
        if let Err(err) = indexes.read_lock_all(&to_lock) {
            // The index locks were NOT acquired: `prepared.locked_indexes` must
            // stay `None`, otherwise the rollback would unlock locks this
            // transaction never held. This mirrors `data_locks`, which is only
            // recorded after `acquire_locks` succeeds.
            self.rollback_prepared(persy_impl, prepared)?;
            return Err(PrepareError::from(err));
        }
        for index_id in &to_lock {
            let segment_meta = index_id_to_segment_id_meta(index_id);
            address.acquire_segment_read_lock(segment_meta)?;
            self.locked_index_segs.insert(segment_meta);
            let segment_data = index_id_to_segment_id_data(index_id);
            address.acquire_segment_read_lock(segment_data)?;
            self.locked_index_segs.insert(segment_data);
        }
        // From here on the index locks are held and must be released on rollback.
        prepared.locked_indexes = Some(to_lock);
        if let Err(x) = ind_change.apply(persy_impl, &mut self) {
            self.rollback_prepared(persy_impl, prepared)?;
            return Err(PrepareError::from(x));
        }
    }
    let mut freed_pages = self.collapse_operations();
    let (mut records, crt_upd_segs, dropped_segs) = self.coll_locks();
    if let Err(x) = address.acquire_locks(&records, &crt_upd_segs, &dropped_segs) {
        self.rollback_prepared(persy_impl, prepared)?;
        return Err(x);
    };
    // The records already locked through `lock_record` are added after the
    // acquisition: they must be verified and released, not locked again.
    records.extend_from_slice(&self.solve_index_locks());
    prepared.data_locks = Some((records.clone(), crt_upd_segs, dropped_segs));
    let check_version = self.strategy != TxStrategy::LastWin;
    let old_records = match address.check_persistent_records(&records, check_version) {
        Ok(old) => old,
        Err(x) => {
            self.rollback_prepared(persy_impl, prepared)?;
            return Err(x);
        }
    };
    let segs: Vec<_> = self.segs_updated.iter().copied().collect();
    if let Err(x) = address.confirm_allocations(&segs, false) {
        self.rollback_prepared(persy_impl, prepared)?;
        return Err(PrepareError::from(x));
    }
    // Every page of a dropped segment is released.
    for dropped_seg in &self.segs_dropped {
        let pages = address.collect_segment_pages(*dropped_seg)?;
        for p in pages.into_iter().map(FreedPage::new) {
            freed_pages.insert(p);
        }
    }
    // The previous content of each changed record is released and recorded in
    // the snapshot.
    let mut snapshot_entries = Vec::with_capacity(old_records.len());
    for old_record in &old_records {
        freed_pages.insert(FreedPage::new(old_record.record_page));
        snapshot_entries.push(SnapshotEntry::change(
            &old_record.recref,
            old_record.record_page,
            old_record.version,
        ));
    }
    for insert in &self.inserted {
        snapshot_entries.push(SnapshotEntry::insert(&insert.recref));
    }
    for freed_page in &freed_pages {
        journal.log(freed_page, &self.id)?;
    }
    // The pages are handed over in descending order.
    let mut freed_pages_vec: Vec<_> = freed_pages.into_iter().collect();
    freed_pages_vec.reverse();
    let snapshot_id = snapshots.snapshot(snapshot_entries, freed_pages_vec.clone(), self.id.clone())?;
    prepared.snapshot_id = Some(snapshot_id);
    self.freed_pages = Some(freed_pages_vec);
    journal.prepare(&PrepareCommit::new(), &self.id)?;
    allocator.flush_free_list()?;
    self.sync(persy_impl, snapshot_id)?;
    journal.clear_in_queque()?;
    Ok((self, prepared))
}
/// Delegates to `PersyImpl::transaction_sync` with the sync mode of this
/// transaction and the given snapshot.
fn sync(&self, persy_impl: &PersyImpl, snapshot_id: SnapshotId) -> PERes<()> {
persy_impl.transaction_sync(&self.sync_mode, snapshot_id)
}
fn collapse_operations(&mut self) -> BTreeSet<FreedPage> {
let mut pages_to_free = BTreeSet::new();
let mut inserted_by_id = HashMap::new();
for insert in self.inserted.drain(..) {
inserted_by_id.insert(insert.recref, insert);
}
let mut updated_by_id = HashMap::new();
for update in self.updated.drain(..) {
match updated_by_id.entry(update.recref) {
Entry::Vacant(e) => {
e.insert(update);
}
Entry::Occupied(mut e) => {
pages_to_free.insert(FreedPage::new(e.get().record_page));
e.get_mut().record_page = update.record_page;
}
}
}
for (k, insert) in &mut inserted_by_id {
if let Some(update) = updated_by_id.remove(k) {
pages_to_free.insert(FreedPage::new(insert.record_page));
insert.record_page = update.record_page;
}
}
let mut i = 0;
while i != self.deleted.len() {
if let Some(insert) = inserted_by_id.remove(&self.deleted[i].recref) {
self.deleted.remove(i);
pages_to_free.insert(FreedPage::new(insert.record_page));
} else {
i += 1;
}
}
for delete in &self.deleted {
if let Some(update) = updated_by_id.remove(&delete.recref) {
pages_to_free.insert(FreedPage::new(update.record_page));
}
}
for (_, insert) in inserted_by_id.drain() {
if self.segs_dropped.contains(&insert.segment) {
pages_to_free.insert(FreedPage::new(insert.record_page));
} else {
self.inserted.push(insert);
}
}
for (_, update) in updated_by_id.drain() {
if self.segs_dropped.contains(&update.segment) {
pages_to_free.insert(FreedPage::new(update.record_page));
} else {
self.updated.push(update);
}
}
pages_to_free
}
/// Collects what has to be locked at prepare time, as
/// `(records, created/updated segments, dropped segments)`; every list is
/// sorted.
///
/// * Segments: the created and the updated ones, skipping those dropped in
///   this transaction and those already locked as index segments.
/// * Records: the updated and deleted ones not already locked through
///   `lock_record`, paired with the version of a recorded read when there is
///   one and with their own version otherwise; the tuples matching a record
///   inserted in this transaction at version `1` are removed.
fn coll_locks(&self) -> (Vec<(SegmentId, RecRef, u16)>, Vec<SegmentId>, Vec<SegmentId>) {
    let lockable =
        |seg: &SegmentId| !self.segs_dropped.contains(seg) && !self.locked_index_segs.contains(seg);
    let mut crt_upd_segs: Vec<SegmentId> = self
        .segs_created
        .iter()
        .chain(self.segs_updated.iter())
        .filter(|seg| lockable(*seg))
        .copied()
        .collect();
    let mut dropped_segs: Vec<SegmentId> = self.segs_dropped.iter().copied().collect();

    let changed = self
        .updated
        .iter()
        .map(|update| (update.segment, update.recref, update.version))
        .chain(
            self.deleted
                .iter()
                .map(|delete| (delete.segment, delete.recref, delete.version)),
        );
    let mut unique = HashSet::new();
    for (segment, recref, own_version) in changed {
        if self.locked_index_pages.contains(&recref) {
            continue;
        }
        // a recorded read overrides the version carried by the operation
        let version = self.read.get(&recref).map_or(own_version, |read| read.version);
        unique.insert((segment, recref, version));
    }
    for insert in &self.inserted {
        unique.remove(&(insert.segment, insert.recref, 1));
    }

    let mut records: Vec<(SegmentId, RecRef, u16)> = unique.into_iter().collect();
    records.sort_by_key(|(_, recref, _)| *recref);
    crt_upd_segs.sort();
    dropped_segs.sort();
    (records, crt_upd_segs, dropped_segs)
}
/// Undoes the effects of the transaction on the address structures and on the
/// allocator.
///
/// Rolls the inserts back in the address, frees the record pages of the
/// inserts and updates that belong to segments created by this transaction and
/// drops those segments. Returns what `Address::rollback` reported, to be
/// cleaned up by the caller.
fn internal_rollback(&self, persy_impl: &PersyImpl) -> PERes<Vec<(SegmentId, u64)>> {
    let allocator = persy_impl.allocator();
    let address = persy_impl.address();
    // the segments created here disappear with the rollback
    let created = &self.segs_created;
    let address_to_free = address.rollback(&self.inserted)?;
    for insert in self.inserted.iter().filter(|insert| created.contains(&insert.segment)) {
        allocator.free(insert.record_page)?;
    }
    for segment in created {
        address.drop_temp_segment(*segment)?;
    }
    for update in self.updated.iter().filter(|update| created.contains(&update.segment)) {
        allocator.free(update.record_page)?;
    }
    Ok(address_to_free)
}
/// Rolls the transaction back during recovery.
///
/// Ends the transaction in the journal as rolled back, undoes its effects,
/// marks it as finished and finally clears and frees the pages reported by
/// the rollback.
pub fn recover_rollback(&self, persy_impl: &PersyImpl) -> PERes<()> {
    let allocator = persy_impl.allocator();
    let journal = persy_impl.journal();
    let address = persy_impl.address();
    journal.end(&Rollback::new(), &self.id)?;
    let address_to_free = self.internal_rollback(persy_impl)?;
    journal.finished_to_clean(&[self.id.clone()])?;
    if address_to_free.is_empty() {
        return Ok(());
    }
    address.clear_empty(&address_to_free)?;
    for (_segment, page) in address_to_free {
        allocator.free(page)?;
    }
    Ok(())
}
/// Rolls back a transaction that was not prepared.
///
/// Ends the transaction in the journal as rolled back, undoes its effects,
/// marks it as finished and frees the address pages reported by the rollback.
pub fn rollback(&mut self, persy_impl: &PersyImpl) -> PERes<()> {
    let journal = persy_impl.journal();
    journal.end(&Rollback::new(), &self.id)?;
    let address_to_free = self.internal_rollback(persy_impl)?;
    let finished = [self.id.clone()];
    journal.finished_to_clean(&finished)?;
    self.free_address_structures(address_to_free, persy_impl)
}
/// Rolls back a transaction during or after the prepare phase, releasing
/// everything recorded in `prepared`.
///
/// Ends the transaction in the journal as rolled back, undoes its effects,
/// releases the data locks, the record/segment locks taken for the indexes and
/// the index read locks, frees the address pages reported by the rollback and
/// finally releases the snapshot, if one was registered.
pub fn rollback_prepared(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> PERes<()> {
let indexes = persy_impl.indexes();
let allocator = persy_impl.allocator();
let journal = persy_impl.journal();
let snapshots = persy_impl.snapshots();
let address = persy_impl.address();
journal.end(&Rollback::new(), &self.id)?;
let address_to_free = self.internal_rollback(persy_impl)?;
// records locked through `lock_record` that are also part of the data locks
// are released with the data locks, so they are removed from this set to
// avoid releasing them twice
let mut indexes_to_unlock = self.locked_index_pages.clone();
if let Some((records, crt_upd_segs, delete_segs)) = &prepared.data_locks {
for rec in records {
indexes_to_unlock.remove(&rec.1);
}
address.release_locks(records.iter().map(|(_, id, _)| id), crt_upd_segs, delete_segs)?;
}
// remaining index record locks plus the index segments read-locked in `prepare`
address.release_locks(
indexes_to_unlock.iter(),
&self.locked_index_segs.iter().copied().collect::<Vec<_>>(),
&[],
)?;
if let Some(il) = &prepared.locked_indexes {
indexes.read_unlock_all(il)?;
}
self.free_address_structures(address_to_free, persy_impl)?;
if let Some(snapshot_id) = prepared.snapshot_id {
release_snapshot(snapshot_id, snapshots, allocator, journal)?;
}
Ok(())
}
/// Commits the transaction during recovery.
///
/// Applies the changes in recover mode, ends the transaction in the journal as
/// committed, clears and frees the pages reported by the apply step and then
/// runs `recover_cleanup`.
pub fn recover_commit(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> PERes<()> {
    let allocator = persy_impl.allocator();
    let journal = persy_impl.journal();
    let address = persy_impl.address();
    let address_to_free = self.internal_commit(persy_impl, true, &prepared)?;
    journal.end(&Commit::new(), &self.id)?;
    let has_pages_to_free = !address_to_free.is_empty();
    if has_pages_to_free {
        address.clear_empty(&address_to_free)?;
        for (_segment, page) in address_to_free {
            allocator.recover_free(page)?;
        }
    }
    self.recover_cleanup(persy_impl)
}
/// Frees, in recover mode, every page listed in `freed_pages` (if any) and
/// then asks the allocator for a disc sync.
pub fn recover_cleanup(&self, persy_impl: &PersyImpl) -> PERes<()> {
    let allocator = persy_impl.allocator();
    for to_free in self.freed_pages.iter().flatten() {
        allocator.recover_free(to_free.page)?;
    }
    persy_impl.allocator().disc_sync()?;
    Ok(())
}
/// Writes a separate, already committed journal transaction that frees the
/// given pages.
///
/// Returns the id of that transaction together with the logged `FreedPage`
/// entries, in the same order as `pages_to_free`.
fn free_pages_tx(
    &self,
    journal: &Journal,
    pages_to_free: &[(SegmentId, u64)],
) -> PERes<(JournalId, Vec<FreedPage>)> {
    let id = journal.start()?;
    let mut freed = Vec::new();
    for &(_, page) in pages_to_free {
        let entry = FreedPage::new(page);
        journal.log(&entry, &id)?;
        freed.push(entry);
    }
    journal.log(&PrepareCommit::new(), &id)?;
    journal.end(&Commit::new(), &id)?;
    Ok((id, freed))
}
/// Applies the changes of the transaction to the address structures and
/// releases every lock recorded in `prepared`.
///
/// `recover` is forwarded to `Address::apply`. Returns what `Address::apply`
/// reported, to be cleaned up by the caller.
fn internal_commit(
&mut self,
persy_impl: &PersyImpl,
recover: bool,
prepared: &PreparedState,
) -> PERes<Vec<(SegmentId, u64)>> {
let indexes = persy_impl.indexes();
let address = persy_impl.address();
let pages_to_unlink = address.apply(
&self.segs_new_pages,
&self.inserted,
&self.updated,
&self.deleted,
&self.segments_operations,
recover,
)?;
// records locked through `lock_record` that are also part of the data locks
// are released with the data locks, so they are removed from this set to
// avoid releasing them twice
let mut indexes_to_unlock = self.locked_index_pages.clone();
if let Some((records, crt_upd_segs, deleted_segs)) = &prepared.data_locks {
for rec in records {
indexes_to_unlock.remove(&rec.1);
}
address.release_locks(records.iter().map(|(_, id, _)| id), crt_upd_segs, deleted_segs)?;
}
// remaining index record locks plus the index segments read-locked in `prepare`
address.release_locks(
indexes_to_unlock.iter(),
&self.locked_index_segs.iter().copied().collect::<Vec<_>>(),
&[],
)?;
if let Some(il) = &prepared.locked_indexes {
indexes.read_unlock_all(il)?;
}
Ok(pages_to_unlink)
}
/// Frees the address pages in `address_to_free` through a dedicated journal
/// transaction and snapshot; does nothing when the list is empty.
fn free_address_structures(&self, address_to_free: Vec<(SegmentId, u64)>, persy_impl: &PersyImpl) -> PERes<()> {
    let allocator = persy_impl.allocator();
    let journal = persy_impl.journal();
    let snapshots = persy_impl.snapshots();
    let address = persy_impl.address();
    if address_to_free.is_empty() {
        return Ok(());
    }
    let (free_tx_id, freed_pages) = self.free_pages_tx(journal, &address_to_free)?;
    address.clear_empty(&address_to_free)?;
    let free_snapshot = snapshots.snapshot(Vec::new(), freed_pages, free_tx_id)?;
    self.sync(persy_impl, free_snapshot)?;
    release_snapshot(free_snapshot, snapshots, allocator, journal)?;
    Ok(())
}
/// Commits a prepared transaction.
///
/// Applies the changes and releases the locks, ends the transaction in the
/// journal as committed, frees the address pages reported by the apply step
/// and finally releases the snapshot registered by `prepare`, if any.
pub fn commit(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> Result<(), GenericError> {
    let allocator = persy_impl.allocator();
    let journal = persy_impl.journal();
    let snapshots = persy_impl.snapshots();
    let address_to_free = self.internal_commit(persy_impl, false, &prepared)?;
    journal.end(&Commit::new(), &self.id)?;
    self.free_address_structures(address_to_free, persy_impl)?;
    match prepared.snapshot_id {
        Some(snapshot_id) => {
            release_snapshot(snapshot_id, snapshots, allocator, journal)?;
        }
        None => {}
    }
    Ok(())
}
pub fn recover_metadata(&mut self, metadata: &Metadata) {
self.strategy = metadata.strategy.clone();
self.meta_id = metadata.meta_id.clone();
}
/// Appends a freed page read back from the journal, creating the list on the
/// first call.
pub fn recover_freed_page(&mut self, freed: &FreedPage) {
    let pages = self.freed_pages.get_or_insert_with(Vec::new);
    pages.push(freed.clone());
}
/// Registers a new-segment-page entry read back from the journal.
pub fn recover_new_segment_page(&mut self, new_page: &NewSegmentPage) {
self.segs_new_pages.push(new_page.clone());
}
/// Returns the opaque identifier bytes associated with this transaction.
pub fn meta_id(&self) -> &Vec<u8> {
&self.meta_id
}
/// Returns the segments visible from this transaction: the persistent ones in
/// `pers` followed by the ones created in the transaction, without the
/// segments whose id was dropped in the transaction.
pub fn filter_list<'a>(
    &'a self,
    pers: &'a [(String, SegmentId)],
) -> impl Iterator<Item = (&'a str, SegmentId)> + 'a {
    let persisted = pers.iter().map(|(name, id)| (name.as_str(), *id));
    let created = self.segments_operations.iter().filter_map(|operation| match operation {
        SegmentOperation::Create(create) => Some((create.name.as_str(), create.segment_id)),
        SegmentOperation::Drop(_) => None,
    });
    let dropped = &self.segs_dropped;
    persisted
        .chain(created)
        .filter(move |(_, id)| !dropped.contains(id))
}
/// Locks the record `id` on behalf of an index operation and checks that its
/// persistent version still matches `version`.
///
/// Returns `Ok(true)` when the check passes; a lock acquired by this call is
/// then kept and tracked by the transaction. Returns `Ok(false)` when the
/// check fails; a lock acquired by this call is then released. A record
/// already locked by this transaction is not locked again.
///
/// # Errors
/// Fails when the record lock cannot be acquired or released.
pub fn lock_record(
    &mut self,
    address: &Address,
    segment: SegmentId,
    id: &RecRef,
    version: u16,
) -> Result<bool, TimeoutError> {
    let newly_locked = !self.locked_index_pages.contains(id);
    if newly_locked {
        address.acquire_record_lock(id)?;
    }
    let still_valid = address
        .check_persistent_records(&[(segment, *id, version)], true)
        .is_ok();
    match (still_valid, newly_locked) {
        (true, true) => {
            self.locked_index_pages.insert(*id);
            self.locked_index_tracking.insert((segment, *id, version));
        }
        (false, true) => {
            address.release_record_lock(id)?;
        }
        // the lock was already held by this transaction: nothing to track or undo
        (_, false) => {}
    }
    Ok(still_valid)
}
/// Releases a record lock previously taken with `lock_record`.
///
/// # Panics
/// Panics when `id` is not among the records locked by this transaction.
///
/// NOTE(review): the matching entry of `locked_index_tracking` is not removed
/// here — confirm that this is intended.
pub fn unlock_record(&mut self, address: &Address, _segment: SegmentId, id: &RecRef) -> PERes<()> {
// the removal is the side effect; the assertion only validates that the lock was held
assert!(self.locked_index_pages.remove(id));
address.release_record_lock(id)?;
Ok(())
}
}
impl DeleteRecord {
pub fn new(segment: SegmentId, rec_ref: &RecRef, version: u16) -> DeleteRecord {
DeleteRecord {
segment,
recref: *rec_ref,
version,
}
}
}
impl UpdateRecord {
pub fn new(segment: SegmentId, rec_ref: &RecRef, record: u64, version: u16) -> UpdateRecord {
UpdateRecord {
segment,
recref: *rec_ref,
record_page: record,
version,
}
}
}
impl ReadRecord {
pub fn new(segment: SegmentId, recref: &RecRef, version: u16) -> ReadRecord {
ReadRecord {
segment,
recref: *recref,
version,
}
}
}
impl PrepareCommit {
pub fn new() -> PrepareCommit {
PrepareCommit {}
}
}
impl Commit {
pub fn new() -> Commit {
Commit {}
}
}
impl Rollback {
pub fn new() -> Rollback {
Rollback {}
}
}
impl Cleanup {
pub fn new() -> Cleanup {
Cleanup {}
}
}
impl InsertRecord {
pub fn new(segment: SegmentId, rec_ref: &RecRef, record: u64) -> InsertRecord {
InsertRecord {
segment,
recref: *rec_ref,
record_page: record,
}
}
}
impl CreateSegment {
pub fn new(name: &str, segment_id: SegmentId, first_page: u64) -> CreateSegment {
CreateSegment {
name: name.into(),
segment_id,
first_page,
}
}
}
impl DropSegment {
pub fn new(name: &str, segment_id: SegmentId) -> DropSegment {
DropSegment {
name: name.into(),
segment_id,
}
}
}
impl Metadata {
pub fn new(strategy: &TxStrategy, meta_id: Vec<u8>) -> Metadata {
Metadata {
strategy: strategy.clone(),
meta_id,
}
}
}
impl FreedPage {
pub fn new(page: u64) -> FreedPage {
FreedPage { page }
}
}
impl NewSegmentPage {
pub fn new(segment: SegmentId, page: u64, previous: u64) -> NewSegmentPage {
NewSegmentPage {
segment,
page,
previous,
}
}
}
#[cfg(test)]
mod tests {
use super::{DeleteRecord, FreedPage, InsertRecord, TransactionImpl, UpdateRecord};
use crate::{
id::{RecRef, SegmentId},
journal::JournalId,
};
/// Scanning the inserts of a segment yields only the records of that segment.
#[test]
fn test_scan_insert() {
let segment_id = SegmentId::new(10);
let segment_id_other = SegmentId::new(20);
let mut tx = TransactionImpl::recover(JournalId::new(0, 0));
tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(3, 2), 2));
tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(4, 2), 2));
tx.inserted
.push(InsertRecord::new(segment_id_other, &RecRef::new(0, 1), 3));
let mut count = 0;
for x in tx.scan_insert(segment_id) {
// both records of `segment_id` are at position 2, the other segment's one is at 1
assert_eq!(x.pos, 2);
count += 1;
}
assert_eq!(count, 2);
}
/// Collapsing merges the operations on the same record and reports the pages
/// that are no longer needed.
#[test]
fn test_collapse() {
let segment_id = SegmentId::new(10);
let segment_id_other = SegmentId::new(20);
let mut tx = TransactionImpl::recover(JournalId::new(0, 0));
tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(3, 1), 1));
tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(3, 2), 2));
tx.inserted
.push(InsertRecord::new(segment_id_other, &RecRef::new(3, 3), 3));
tx.inserted
.push(InsertRecord::new(segment_id_other, &RecRef::new(3, 4), 4));
tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 1), 5, 1));
tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 1), 6, 1));
tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 2), 7, 1));
tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 5), 8, 1));
tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 5), 9, 1));
tx.updated
.push(UpdateRecord::new(segment_id, &RecRef::new(3, 6), 10, 1));
tx.updated
.push(UpdateRecord::new(segment_id, &RecRef::new(3, 7), 11, 1));
tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 1), 0));
tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 3), 1));
tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 6), 2));
tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 8), 2));
let free = tx.collapse_operations();
// freed: 5 and 8 (superseded updates), 1 and 2 (inserts replaced by their
// update), 6 and 3 (inserts cancelled by a delete), 10 (update of a deleted record)
assert_eq!(free.len(), 7);
for e in [1, 2, 3, 5, 6, 8, 10].iter().map(|x| FreedPage::new(*x)) {
assert!(free.contains(&e));
}
// left: inserts (3,2) and (3,4), updates (3,5) and (3,7), deletes (3,6) and (3,8)
assert_eq!(tx.inserted.len(), 2);
assert_eq!(tx.updated.len(), 2);
assert_eq!(tx.deleted.len(), 2);
}
}