use crate::{
RecoverStatus,
address::segment::Segment,
config::TxStrategy,
error::PERes,
id::{RecRef, SegmentId},
journal::{
JournalId,
records::{
Cleanup, Commit, CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata,
NewSegmentPage, ReadRecord, Rollback, RollbackPage, UpdateRecord,
},
recover_impl::PagesUseTracking,
},
persy::{CommitStatus, PersyImpl},
transaction::tx_impl::SegmentOperation,
};
use std::collections::{HashMap, HashSet, hash_map::Entry};
#[cfg(test)]
use crate::transaction::iter::TransactionInsertScanner;
pub struct RecoverTransaction {
strategy: TxStrategy,
meta_id: Vec<u8>,
id: JournalId,
inserted: Vec<InsertRecord>,
updated: Vec<UpdateRecord>,
deleted: Vec<DeleteRecord>,
read: HashMap<RecRef, ReadRecord>,
segments_operations: Vec<SegmentOperation>,
segs_created: HashMap<SegmentId, Segment>,
segs_dropped: HashSet<SegmentId>,
segs_updated: HashSet<SegmentId>,
freed_pages: Option<Vec<FreedPage>>,
rollback_pages: Option<Vec<RollbackPage>>,
segs_new_pages: Vec<NewSegmentPage>,
recover_status: RecoverStatus,
reached_prepared: bool,
}
impl std::fmt::Display for RecoverTransaction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "id:{:?} state:{:?}", self.id, self.recover_status)
}
}
impl std::fmt::Debug for RecoverTransaction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "id:{:?} state:{:?}", self.id, self.recover_status)
}
}
impl RecoverTransaction {
pub fn recover(id: JournalId, recover_status: RecoverStatus) -> Self {
RecoverTransaction {
strategy: TxStrategy::LastWin,
meta_id: Vec::new(),
id,
inserted: Vec::new(),
updated: Vec::new(),
deleted: Vec::new(),
read: HashMap::new(),
segments_operations: Vec::new(),
segs_created: HashMap::new(),
segs_dropped: HashSet::new(),
segs_updated: HashSet::new(),
freed_pages: None,
rollback_pages: None,
segs_new_pages: Vec::new(),
recover_status,
reached_prepared: false,
}
}
pub fn id(&self) -> &JournalId {
&self.id
}
pub(crate) fn final_recover(
&mut self,
persy: &PersyImpl,
chosen: Option<CommitStatus>,
page_use: &mut PagesUseTracking,
) -> PERes<bool> {
match self.recover_status {
RecoverStatus::PrepareCommit => match chosen {
Some(CommitStatus::Commit) | Option::None => {
self.recover_prepared(persy, page_use)?;
Ok(true)
}
Some(CommitStatus::Rollback) => {
self.recover_rollback(persy, page_use)?;
Ok(true)
}
},
RecoverStatus::Commit => {
self.recover_commit_cleanup(persy, page_use)?;
Ok(true)
}
RecoverStatus::Cleanup => Ok(false),
RecoverStatus::Started | RecoverStatus::Rollback => {
self.recover_rollback(persy, page_use)?;
Ok(true)
}
RecoverStatus::Partial => Ok(false),
}
}
pub fn recover_add(&mut self, create: &CreateSegment) {
self.segments_operations.push(SegmentOperation::Create(create.clone()));
let segment = Segment::new_allocation(create.first_page, create.segment_id, &create.name);
self.segs_created.insert(create.segment_id, segment);
}
pub fn recover_status(&self) -> &RecoverStatus {
&self.recover_status
}
pub fn recover_drop(&mut self, drop: &DropSegment) {
self.segments_operations.push(SegmentOperation::Drop(drop.clone()));
self.segs_dropped.insert(drop.segment_id);
}
pub fn recover_read(&mut self, read: &ReadRecord) {
self.read.insert(read.recref, read.clone());
}
pub fn recover_insert(&mut self, insert: &InsertRecord) {
self.segs_updated.insert(insert.segment);
self.inserted.push(insert.clone());
}
pub fn recover_update(&mut self, update: &UpdateRecord) {
self.segs_updated.insert(update.segment);
self.updated.push(update.clone());
}
pub fn recover_delete(&mut self, delete: &DeleteRecord) {
self.segs_updated.insert(delete.segment);
self.deleted.push(delete.clone());
}
pub fn recover_metadata(&mut self, metadata: &Metadata) {
self.strategy = metadata.strategy.clone();
self.meta_id = metadata.meta_id.clone();
}
pub fn recover_rollback_page(&mut self, rollback_page: &RollbackPage) {
self.rollback_pages
.get_or_insert(Vec::new())
.push(rollback_page.clone());
}
pub fn recover_freed_page(&mut self, freed: &FreedPage) {
self.freed_pages.get_or_insert(Vec::new()).push(freed.clone());
}
pub fn recover_new_segment_page(&mut self, new_page: &NewSegmentPage) {
self.segs_new_pages.push(new_page.clone());
}
pub fn meta_id(&self) -> &Vec<u8> {
&self.meta_id
}
pub(crate) fn recover_commit_cleanup(
&mut self,
persy_impl: &PersyImpl,
page_use: &mut PagesUseTracking,
) -> PERes<()> {
let address = persy_impl.address();
for page in &self.segs_new_pages {
page_use.used_page(page.page);
}
address.recover_segment_operations(&self.segments_operations, &mut self.segs_created, &self.segs_new_pages)?;
if let Some(ref up_free) = self.freed_pages {
for to_free in up_free {
page_use.freed_page(to_free.page);
}
}
let delete_pages = self
.deleted
.iter()
.map(|r| (r.segment, r.recref.page))
.collect::<Vec<_>>();
let to_free = address.recover_remove_pages(&delete_pages)?;
for page in to_free {
page_use.freed_page(page);
}
persy_impl.allocator().device_sync()?;
Ok(())
}
pub(crate) fn recover_prepared(&mut self, persy_impl: &PersyImpl, page_use: &mut PagesUseTracking) -> PERes<()> {
let address = persy_impl.address();
let journal = persy_impl.journal();
for page in &self.segs_new_pages {
page_use.used_page(page.page);
}
self.collapse_operations(page_use);
let crt_upd_segs = self.coll_segments();
if address
.recover_allocations(&crt_upd_segs, &mut self.segs_created)
.is_err()
{
self.recover_rollback(persy_impl, page_use)?;
} else {
let pages_to_unlink = address.apply_recover(
&self.segs_new_pages,
&self.inserted,
&self.updated,
&self.deleted,
&self.segments_operations,
&mut self.segs_created,
)?;
for (_, to_free) in pages_to_unlink {
page_use.freed_page(to_free);
}
journal.end(&Commit::new(), &self.id)?;
address.recover_segment_operations(
&self.segments_operations,
&mut self.segs_created,
&self.segs_new_pages,
)?;
let delete_pages = self
.deleted
.iter()
.map(|r| (r.segment, r.recref.page))
.collect::<Vec<_>>();
let to_free = address.recover_remove_pages(&delete_pages)?;
for page in to_free {
page_use.freed_page(page);
}
}
Ok(())
}
pub fn log_cleanup(&self, persy_impl: &PersyImpl) -> PERes<()> {
let journal = persy_impl.journal();
journal.end(&Cleanup::new(), &self.id)?;
Ok(())
}
fn coll_segments(&self) -> Vec<SegmentId> {
let mut crt_upd_segs = Vec::new();
for create in self.segs_created.keys() {
if !&self.segs_dropped.contains(create) {
crt_upd_segs.push(*create);
}
}
for update in &self.segs_updated {
if !&self.segs_dropped.contains(update) {
crt_upd_segs.push(*update);
}
}
crt_upd_segs.sort();
crt_upd_segs
}
pub(crate) fn recover_rollback(&self, persy_impl: &PersyImpl, page_use: &mut PagesUseTracking) -> PERes<()> {
let journal = persy_impl.journal();
let address = persy_impl.address();
journal.end(&Rollback::new(), &self.id)?;
let dropped_segs: Vec<_> = self.segs_created.keys().copied().collect();
let address_to_free = address.rollback(&self.inserted)?;
for insert in &self.inserted {
if dropped_segs.contains(&insert.segment) {
page_use.freed_page(insert.record_page);
}
}
for page in &self.segs_new_pages {
if self.segs_created.contains_key(&page.segment) {
page_use.freed_page(page.page);
}
}
for update in &self.updated {
if dropped_segs.contains(&update.segment) {
page_use.freed_page(update.record_page);
}
}
if self.reached_prepared {
if let Some(rollback_pages) = &self.rollback_pages {
address.recover_rollback(rollback_pages)?;
for rbp in rollback_pages {
page_use.used_page(rbp.record_page);
}
}
}
let to_free = address.recover_remove_pages(&address_to_free)?;
for page in to_free {
page_use.freed_page(page);
}
Ok(())
}
pub(crate) fn remove_free_pages(&mut self, pages: Vec<u64>) {
if let Some(fp) = &mut self.freed_pages {
fp.retain(|x| !pages.contains(&x.page));
}
}
fn collapse_operations(&mut self, page_use: &mut PagesUseTracking) {
let mut inserted_by_id = HashMap::new();
for insert in self.inserted.drain(..) {
inserted_by_id.insert(insert.recref, insert);
}
let mut updated_by_id = HashMap::new();
for update in self.updated.drain(..) {
match updated_by_id.entry(update.recref) {
Entry::Vacant(e) => {
e.insert(update);
}
Entry::Occupied(mut e) => {
page_use.freed_page(e.get().record_page);
e.get_mut().record_page = update.record_page;
}
}
}
for (k, insert) in &mut inserted_by_id {
if let Some(update) = updated_by_id.remove(k) {
page_use.freed_page(insert.record_page);
insert.record_page = update.record_page;
}
}
let mut i = 0;
while i != self.deleted.len() {
if let Some(insert) = inserted_by_id.remove(&self.deleted[i].recref) {
self.deleted.remove(i);
page_use.freed_page(insert.record_page);
} else {
i += 1;
}
}
for delete in &self.deleted {
if let Some(update) = updated_by_id.remove(&delete.recref) {
page_use.freed_page(update.record_page);
}
}
for (_, insert) in inserted_by_id.drain() {
if self.segs_dropped.contains(&insert.segment) {
page_use.freed_page(insert.record_page);
} else {
page_use.used_page(insert.record_page);
self.inserted.push(insert);
}
}
for (_, update) in updated_by_id.drain() {
if self.segs_dropped.contains(&update.segment) {
page_use.freed_page(update.record_page);
} else {
page_use.used_page(update.record_page);
self.updated.push(update);
}
}
}
pub(crate) fn recover_prepare_commit(&mut self) {
self.set_status(RecoverStatus::PrepareCommit);
self.reached_prepared = true;
}
pub(crate) fn recover_commit(&mut self) {
self.set_status(RecoverStatus::Commit);
}
pub(crate) fn recover_cleanup(&mut self) {
self.set_status(RecoverStatus::Cleanup);
}
pub(crate) fn recover_rollback_log(&mut self) {
self.set_status(RecoverStatus::Rollback);
}
#[cfg(test)]
pub fn scan_insert(&self, seg: SegmentId) -> TransactionInsertScanner {
TransactionInsertScanner::new(self.inserted.clone(), seg)
}
fn set_status(&mut self, status: RecoverStatus) {
if self.recover_status != RecoverStatus::Partial {
self.recover_status = status;
}
}
}
#[cfg(test)]
mod tests {
use crate::{
RecoverStatus,
id::{RecRef, SegmentId},
journal::{
JournalId,
records::{DeleteRecord, InsertRecord, UpdateRecord},
recover_impl::PagesUseTracking,
},
transaction::recover_tx::RecoverTransaction,
};
#[test]
fn test_scan_insert() {
let segment_id = SegmentId::new(10);
let segment_id_other = SegmentId::new(20);
let mut tx = RecoverTransaction::recover(JournalId::new(0, 0), RecoverStatus::Started);
tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(3, 2), 2));
tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(4, 2), 2));
tx.inserted
.push(InsertRecord::new(segment_id_other, &RecRef::new(0, 1), 3));
let mut count = 0;
for x in tx.scan_insert(segment_id) {
assert_eq!(x.pos, 2);
count += 1;
}
assert_eq!(count, 2);
}
#[test]
fn test_collapse() {
let segment_id = SegmentId::new(10);
let segment_id_other = SegmentId::new(20);
let mut tx = RecoverTransaction::recover(JournalId::new(0, 0), RecoverStatus::Started);
tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(3, 1), 1));
tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(3, 2), 2));
tx.inserted
.push(InsertRecord::new(segment_id_other, &RecRef::new(3, 3), 3));
tx.inserted
.push(InsertRecord::new(segment_id_other, &RecRef::new(3, 4), 4));
tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 1), 5, 1));
tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 1), 6, 1));
tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 2), 7, 1));
tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 5), 8, 1));
tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 5), 9, 1));
tx.updated
.push(UpdateRecord::new(segment_id, &RecRef::new(3, 6), 10, 1));
tx.updated
.push(UpdateRecord::new(segment_id, &RecRef::new(3, 7), 11, 1));
tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 1), 0));
tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 3), 1));
tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 6), 2));
tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 8), 2));
let mut page_use = PagesUseTracking::default();
tx.collapse_operations(&mut page_use);
assert_eq!(page_use.freed_pages().len(), 7);
for e in [1, 2, 3, 5, 6, 8, 10].iter() {
assert!(page_use.freed_pages().contains(&e));
}
assert_eq!(tx.inserted.len(), 2);
assert_eq!(tx.updated.len(), 2);
assert_eq!(tx.deleted.len(), 2);
}
}