use crate::{
DropSegmentError, GenericError, PrepareError,
address::Address,
address::segment::Segment,
allocator::Allocator,
config::TxStrategy,
error::{IndexChangeError, PERes},
id::{IndexId, RecRef, SegmentId},
index::{
config::{IndexTypeInternal, ValueMode},
keeper_tx::IndexTransactionKeeper,
tree::nodes::Value,
},
journal::{
Journal, JournalId,
records::{
Commit, CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata, NewSegmentPage,
PrepareCommit, ReadRecord, Rollback, RollbackPage, UpdateRecord,
},
},
persy::PersyImpl,
snapshot::data::{CleanInfo, SnapshotEntry},
snapshots::{SnapshotRef, Snapshots},
transaction::{index_locks::IndexDataLocks, iter::TransactionInsertScanner, locks::Locks},
};
use std::{
collections::{BTreeSet, HashMap, HashSet, hash_map::Entry},
ops::RangeBounds,
slice::from_ref,
sync::Arc,
time::Duration,
vec::IntoIter,
};
/// A segment-level operation issued by a transaction, stored in issue order.
pub enum SegmentOperation {
    /// Creation of a new segment.
    Create(CreateSegment),
    /// Removal of an existing segment.
    Drop(DropSegment),
}
/// A record whose persistent version is verified while the transaction is prepared.
#[derive(Clone, Hash, Eq, PartialEq, Debug)]
pub(crate) struct CheckRecord {
    /// Segment that holds the record.
    pub(crate) segment_id: SegmentId,
    /// Address of the record.
    pub(crate) record_id: RecRef,
    /// Version recorded by the transaction for this record.
    pub(crate) version: u16,
}
impl CheckRecord {
    /// Bundles a segment, a record address and a version into a check entry.
    pub(crate) fn new(segment_id: SegmentId, record_id: RecRef, version: u16) -> Self {
        CheckRecord { segment_id, record_id, version }
    }
}
/// Outcome of preparing a transaction, later consumed by commit or rollback.
///
/// It always carries the locks that must be released. After a complete prepare it also
/// carries the snapshot, the snapshot entries and the freed pages that commit publishes.
#[derive(Clone)]
pub struct PreparedState {
    pub(crate) snapshot_ref: Option<SnapshotRef>,
    pub(crate) data_locks: Option<Locks>,
    entries: Option<Vec<SnapshotEntry>>,
    freed_pages: Option<Vec<FreedPage>>,
}
impl PreparedState {
    /// Builds a state that holds only the locks taken for index data (no snapshot, no data).
    fn only_indexes(index: IndexDataLocks) -> Self {
        let (index_records, index_segments) = index.extract(&[]);
        let locks = Locks::new(
            index_records.into_iter().collect(),
            Vec::new(),
            index_segments.into_iter().collect(),
            Vec::new(),
        );
        PreparedState {
            snapshot_ref: None,
            data_locks: Some(locks),
            entries: None,
            freed_pages: None,
        }
    }
    /// Builds a state that holds `data` merged with the index locks, with no snapshot data.
    pub(crate) fn locked(data: Locks, index: IndexDataLocks) -> Self {
        Self::all_int(None, data, index, None, None)
    }
    /// Builds the complete state produced by a successful prepare.
    fn all(
        snapshot_ref: SnapshotRef,
        data: Locks,
        index: IndexDataLocks,
        entries: Vec<SnapshotEntry>,
        freed_pages: Vec<FreedPage>,
    ) -> Self {
        Self::all_int(Some(snapshot_ref), data, index, Some(entries), Some(freed_pages))
    }
    /// Shared constructor that folds the index locks into `data`.
    ///
    /// `data`'s records are passed to `extract`, presumably so records that are already
    /// locked are not added twice (TODO confirm in `IndexDataLocks::extract`).
    fn all_int(
        snapshot_ref: Option<SnapshotRef>,
        mut data: Locks,
        index: IndexDataLocks,
        entries: Option<Vec<SnapshotEntry>>,
        freed_pages: Option<Vec<FreedPage>>,
    ) -> Self {
        let (extra_records, extra_segments) = index.extract(data.records());
        data.add_records(extra_records.into_iter());
        data.add_create_update_segments(extra_segments.into_iter());
        PreparedState {
            snapshot_ref,
            data_locks: Some(data),
            entries,
            freed_pages,
        }
    }
    /// Test helper: calls `leak` on the snapshot reference, if there is one.
    #[cfg(test)]
    pub(crate) fn leak(&mut self) {
        if let Some(snapshot) = self.snapshot_ref.as_mut() {
            snapshot.leak();
        }
    }
}
/// Sync policy handed to `PersyImpl::transaction_sync` when the transaction is prepared
/// and, if it touched segments, when it is committed.
pub enum SyncMode {
Sync,
// NOTE(review): presumably defers the sync to the background — confirm in `transaction_sync`.
BackgroundSync,
}
/// State of an in-flight transaction. Every operation is logged to the journal when it is
/// issued and is also kept here, so it can be validated and applied on commit or undone on
/// rollback.
pub struct TransactionImpl {
/// Conflict strategy. `VersionOnRead` makes `add_read` track reads; `LastWin` disables the
/// record version check in `prepare`.
strategy: TxStrategy,
/// Passed to `PersyImpl::transaction_sync`.
sync_mode: SyncMode,
/// Caller-supplied metadata, logged in the `Metadata` record when the transaction starts.
meta_id: Vec<u8>,
/// Journal entry this transaction logs to.
id: JournalId,
// Pending record inserts, updates and deletes; they are collapsed during `prepare`.
inserted: Vec<InsertRecord>,
updated: Vec<UpdateRecord>,
deleted: Vec<DeleteRecord>,
/// Reads tracked under `VersionOnRead`, keyed by record. Their versions take precedence over
/// the update/delete versions in the prepare-time checks.
read: HashMap<RecRef, ReadRecord>,
/// Segment creates and drops, in issue order.
segments_operations: Vec<SegmentOperation>,
/// Name -> id of the segments created in this transaction.
segs_created_names: HashMap<String, SegmentId>,
/// Names of the segments dropped in this transaction.
segs_dropped_names: HashSet<String>,
/// Segments created in this transaction, keyed by id.
segs_created: HashMap<SegmentId, Segment>,
/// Ids of the segments dropped in this transaction.
segs_dropped: HashSet<SegmentId>,
/// Segments touched by an insert, update or delete.
segs_updated: HashSet<SegmentId>,
/// Pages freed by this transaction; filled in by `prepare`.
freed_pages: Option<Vec<FreedPage>>,
/// Pending index changes; `None` once `prepare` has taken them.
indexes: Option<IndexTransactionKeeper>,
/// Pages added to segments by this transaction. On rollback, those belonging to segments
/// created in the transaction are freed.
segs_new_pages: Vec<NewSegmentPage>,
/// Timeout used when acquiring locks during `prepare`.
timeout: Duration,
}
/// Transaction-local view of a record, as returned by `TransactionImpl::read`.
pub enum TxRead {
/// (record page, version) of the latest write in this transaction; inserts report version 1.
Record((u64, u16)),
/// The record was deleted in this transaction.
Deleted,
/// The transaction has not touched this record.
None,
}
/// Transaction-local view of a segment name, as returned by `TransactionImpl::exists_segment`.
pub enum TxSegCheck {
/// Created in this transaction, with the given id.
Created(SegmentId),
/// Dropped in this transaction.
Dropped,
/// Neither created nor dropped by this transaction.
None,
}
impl TransactionImpl {
pub fn new(
journal: &Journal,
strategy: &TxStrategy,
sync_mode: SyncMode,
meta_id: Vec<u8>,
timeout: Duration,
) -> PERes<TransactionImpl> {
let id = journal.start()?;
journal.log(&Metadata::new(strategy, meta_id.clone()), &id)?;
Ok(TransactionImpl {
strategy: strategy.clone(),
sync_mode,
meta_id,
id,
inserted: Vec::new(),
updated: Vec::new(),
deleted: Vec::new(),
read: HashMap::new(),
segments_operations: Vec::new(),
segs_created_names: HashMap::new(),
segs_dropped_names: HashSet::new(),
segs_created: HashMap::new(),
segs_dropped: HashSet::new(),
segs_updated: HashSet::new(),
freed_pages: None,
indexes: Some(IndexTransactionKeeper::new()),
segs_new_pages: Vec::new(),
timeout,
})
}
pub fn segment_created_in_tx(&self, segment: SegmentId) -> bool {
self.segs_created.contains_key(&segment)
}
pub fn index_created_in_tx(&self, index: &IndexId) -> bool {
self.segment_created_in_tx(index.get_meta_id())
}
pub fn segment_name_by_id(&self, segment: SegmentId) -> Option<String> {
for info in &self.segments_operations {
if let SegmentOperation::Create(c) = info {
if c.segment_id == segment {
return Some(c.name.clone());
}
}
}
None
}
pub fn exists_segment(&self, segment: &str) -> TxSegCheck {
if self.segs_created_names.contains_key(segment) {
for a in &self.segments_operations {
if let SegmentOperation::Create(c) = a {
if c.name == segment {
return TxSegCheck::Created(c.segment_id);
}
}
}
} else if self.segs_dropped_names.contains(segment) {
return TxSegCheck::Dropped;
}
TxSegCheck::None
}
pub fn add_create_segment(&mut self, journal: &Journal, segment: Segment) -> PERes<()> {
let create = CreateSegment::new(segment.get_name(), segment.get_segment_id(), segment.get_first_page());
journal.log(&create, &self.id)?;
self.segments_operations.push(SegmentOperation::Create(create));
self.segs_created_names
.insert(segment.get_name().to_owned(), segment.get_segment_id());
self.segs_created.insert(segment.get_segment_id(), segment);
Ok(())
}
pub fn add_drop_segment(
&mut self,
journal: &Journal,
name: &str,
segment_id: SegmentId,
) -> Result<(), DropSegmentError> {
if self.segs_created_names.contains_key(name) {
Err(DropSegmentError::CannotDropSegmentCreatedInTx)
} else {
let drop = DropSegment::new(name, segment_id);
journal.log(&drop, &self.id)?;
self.segments_operations.push(SegmentOperation::Drop(drop));
self.segs_dropped.insert(segment_id);
self.segs_dropped_names.insert(name.into());
Ok(())
}
}
pub fn add_read(&mut self, journal: &Journal, segment: SegmentId, recref: &RecRef, version: u16) -> PERes<()> {
if self.strategy == TxStrategy::VersionOnRead {
let read = ReadRecord::new(segment, recref, version);
journal.log(&read, &self.id)?;
self.read.insert(*recref, read);
}
Ok(())
}
pub fn add_insert(&mut self, journal: &Journal, segment: SegmentId, rec_ref: &RecRef, record: u64) -> PERes<()> {
self.segs_updated.insert(segment);
let insert = InsertRecord::new(segment, rec_ref, record);
journal.log(&insert, &self.id)?;
self.inserted.push(insert);
Ok(())
}
pub fn add_new_segment_page(
&mut self,
journal: &Journal,
segment: SegmentId,
new_page: u64,
previous_page: u64,
) -> PERes<()> {
let new_page = NewSegmentPage::new(segment, new_page, previous_page);
journal.log(&new_page, &self.id)?;
self.segs_new_pages.push(new_page);
Ok(())
}
pub fn add_update(
&mut self,
journal: &Journal,
segment: SegmentId,
rec_ref: &RecRef,
record: u64,
version: u16,
) -> PERes<()> {
self.segs_updated.insert(segment);
let update = UpdateRecord::new(segment, rec_ref, record, version);
journal.log(&update, &self.id)?;
self.updated.push(update);
Ok(())
}
pub fn add_delete(&mut self, journal: &Journal, segment: SegmentId, rec_ref: &RecRef, version: u16) -> PERes<()> {
self.segs_updated.insert(segment);
let delete = DeleteRecord::new(segment, rec_ref, version);
journal.log(&delete, &self.id)?;
self.deleted.push(delete);
Ok(())
}
pub fn add_put<K, V>(&mut self, index: IndexId, k: K, v: V)
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
if let Some(ref mut indexes) = self.indexes {
indexes.put(index, k, v);
}
}
pub fn add_remove<K, V>(&mut self, index: IndexId, k: K, v: Option<V>)
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
if let Some(ref mut indexes) = self.indexes {
indexes.remove(index, k, v);
}
}
pub fn apply_changes<K, V>(
&self,
vm: ValueMode,
index: IndexId,
index_name: &str,
k: &K,
pers: Option<Value<V>>,
) -> Result<Option<Value<V>>, IndexChangeError>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
if let Some(ref indexes) = self.indexes {
indexes.apply_changes(index, index_name, vm, k, pers)
} else {
Ok(pers)
}
}
pub fn index_range<K, V, R>(&self, index: IndexId, range: R) -> Option<IntoIter<K>>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
R: RangeBounds<K>,
{
if let Some(ind) = &self.indexes {
ind.range::<K, V, R>(index, range)
} else {
None
}
}
pub fn scan_insert(&self, seg: SegmentId) -> TransactionInsertScanner {
TransactionInsertScanner::new(self.inserted.clone(), seg)
}
pub fn read(&self, rec_ref: &RecRef) -> TxRead {
for ele in &self.deleted {
if ele.recref == *rec_ref {
return TxRead::Deleted;
}
}
if let Some(ele) = self.updated.iter().rev().find(|ele| ele.recref == *rec_ref) {
return TxRead::Record((ele.record_page, ele.version));
}
for ele in &self.inserted {
if ele.recref == *rec_ref {
return TxRead::Record((ele.record_page, 1));
}
}
TxRead::None
}
pub fn prepare(mut self, persy_impl: &PersyImpl) -> Result<(TransactionImpl, PreparedState), PrepareError> {
let journal = persy_impl.journal();
let snapshots = persy_impl.snapshots();
let address = persy_impl.address();
let ind = self.indexes;
self.indexes = None;
let mut index_locks = IndexDataLocks::new(self.timeout);
if let Some(mut ind_change) = ind {
let changed_indexes = ind_change.changed_indexes();
for check in changed_indexes {
if self.segs_dropped.contains(&check.get_meta_id()) {
ind_change.remove_changes(&check);
}
}
let to_lock = ind_change.changed_indexes();
if let Err(x) = index_locks.read_lock_indexes(persy_impl, &to_lock) {
self.rollback_prepared(persy_impl, PreparedState::only_indexes(index_locks))?;
return Err(PrepareError::from(x));
}
if let Err(x) = ind_change.apply(&mut index_locks, persy_impl, &mut self) {
self.rollback_prepared(persy_impl, PreparedState::only_indexes(index_locks))?;
return Err(PrepareError::from(x));
}
}
let mut freed_pages = self.collapse_operations();
let (locks, checks) = self.coll_locks(&index_locks);
if let Err(x) = address.acquire_locks(&locks, self.timeout) {
let prepared = PreparedState::only_indexes(index_locks);
self.rollback_prepared(persy_impl, prepared)?;
return Err(x);
};
let mut created = locks.created_segments_cloned();
created.retain(|x| !self.segs_dropped_names.contains(x));
let mut updated_segs = self.segs_updated.clone();
updated_segs.retain(|x| !self.segs_created.contains_key(x));
if let Err(x) = address.check_segments(&created, updated_segs.into_iter()) {
self.rollback_prepared(persy_impl, PreparedState::locked(locks, index_locks))?;
return Err(x);
}
let check_version = self.strategy != TxStrategy::LastWin;
let old_records = match address.check_persistent_records(&checks, check_version) {
Ok(old) => old,
Err(x) => {
self.rollback_prepared(persy_impl, PreparedState::locked(locks, index_locks))?;
return Err(x);
}
};
for dropped_seg in &self.segs_dropped {
let pages = address.collect_segment_pages(*dropped_seg)?;
for p in pages.into_iter().map(FreedPage::new) {
freed_pages.insert(p);
}
}
let mut snapshot_entries = Vec::with_capacity(old_records.len());
for old_record in &old_records {
freed_pages.insert(FreedPage::new(old_record.record_page));
snapshot_entries.push(SnapshotEntry::change(
&old_record.recref,
old_record.record_page,
old_record.version,
));
let rollback = RollbackPage::new(old_record.segment, &old_record.recref, old_record.record_page);
journal.log(&rollback, &self.id)?;
}
for insert in &self.inserted {
snapshot_entries.push(SnapshotEntry::insert(&insert.recref));
}
for freed_page in &freed_pages {
journal.log(freed_page, &self.id)?;
}
let mut freed_pages_vec: Vec<_> = freed_pages.into_iter().collect();
freed_pages_vec.reverse();
let snapshot_ref = snapshots.new_snapshot();
self.freed_pages = Some(freed_pages_vec.clone());
journal.prepare(&PrepareCommit::new(), &self.id)?;
self.sync(persy_impl, &snapshot_ref)?;
Ok((
self,
PreparedState::all(snapshot_ref, locks, index_locks, snapshot_entries, freed_pages_vec),
))
}
fn sync(&self, persy_impl: &PersyImpl, snapshot_ref: &SnapshotRef) -> PERes<()> {
persy_impl.transaction_sync(&self.sync_mode, snapshot_ref)
}
fn collapse_operations(&mut self) -> BTreeSet<FreedPage> {
let mut pages_to_free = BTreeSet::new();
let mut inserted_by_id = HashMap::new();
for insert in self.inserted.drain(..) {
inserted_by_id.insert(insert.recref, insert);
}
let mut updated_by_id = HashMap::new();
for update in self.updated.drain(..) {
match updated_by_id.entry(update.recref) {
Entry::Vacant(e) => {
e.insert(update);
}
Entry::Occupied(mut e) => {
pages_to_free.insert(FreedPage::new(e.get().record_page));
e.get_mut().record_page = update.record_page;
}
}
}
for (k, insert) in &mut inserted_by_id {
if let Some(update) = updated_by_id.remove(k) {
pages_to_free.insert(FreedPage::new(insert.record_page));
insert.record_page = update.record_page;
}
}
let mut i = 0;
while i != self.deleted.len() {
if let Some(insert) = inserted_by_id.remove(&self.deleted[i].recref) {
self.deleted.remove(i);
pages_to_free.insert(FreedPage::new(insert.record_page));
} else {
i += 1;
}
}
for delete in &self.deleted {
if let Some(update) = updated_by_id.remove(&delete.recref) {
pages_to_free.insert(FreedPage::new(update.record_page));
}
}
for (_, insert) in inserted_by_id.drain() {
if self.segs_dropped.contains(&insert.segment) {
pages_to_free.insert(FreedPage::new(insert.record_page));
} else {
self.inserted.push(insert);
}
}
for (_, update) in updated_by_id.drain() {
if self.segs_dropped.contains(&update.segment) {
pages_to_free.insert(FreedPage::new(update.record_page));
} else {
self.updated.push(update);
}
}
pages_to_free
}
fn coll_locks(&self, index_locks: &IndexDataLocks) -> (Locks, Vec<CheckRecord>) {
let mut crt_upd_segs = Vec::new();
for create in self.segs_created.keys() {
if !&self.segs_dropped.contains(create) && !index_locks.is_segment_locked(create) {
crt_upd_segs.push(*create);
}
}
for update in &self.segs_updated {
if !&self.segs_dropped.contains(update) && !index_locks.is_segment_locked(update) {
crt_upd_segs.push(*update);
}
}
let mut dropped_segs: Vec<_> = self.segs_dropped.iter().copied().collect();
let mut check_records = HashSet::new();
let mut lock_records = HashSet::new();
let insert_ids = self.inserted.iter().map(|i| i.recref).collect::<HashSet<_>>();
for update in &self.updated {
let mut version = update.version;
if let Some(read_v) = self.read.get(&update.recref) {
version = read_v.version;
}
if !insert_ids.contains(&update.recref) {
if !index_locks.is_record_locked(&update.recref) {
lock_records.insert(update.recref);
}
check_records.insert(CheckRecord::new(update.segment, update.recref, version));
}
}
for delete in &self.deleted {
let mut version = delete.version;
if let Some(read_v) = self.read.get(&delete.recref) {
version = read_v.version;
}
if !insert_ids.contains(&delete.recref) {
if !index_locks.is_record_locked(&delete.recref) {
lock_records.insert(delete.recref);
}
check_records.insert(CheckRecord::new(delete.segment, delete.recref, version));
}
}
let mut records: Vec<RecRef> = lock_records.into_iter().collect();
records.sort();
crt_upd_segs.sort();
dropped_segs.sort();
let created = self.segs_created_names.keys().cloned().collect();
(
Locks::new(records, created, crt_upd_segs, dropped_segs),
check_records.into_iter().collect(),
)
}
fn internal_rollback(&self, persy_impl: &PersyImpl) -> PERes<(Vec<(SegmentId, u64)>, Vec<u64>)> {
let address = persy_impl.address();
let dropped_segs: Vec<_> = self.segs_created.keys().copied().collect();
let address_to_free = address.rollback(&self.inserted)?;
let mut free_pages = Vec::new();
for insert in &self.inserted {
if dropped_segs.contains(&insert.segment) {
free_pages.push(insert.record_page);
}
}
for page in &self.segs_new_pages {
if self.segs_created.contains_key(&page.segment) {
free_pages.push(page.page);
}
}
for update in &self.updated {
if dropped_segs.contains(&update.segment) {
free_pages.push(update.record_page);
}
}
Ok((address_to_free, free_pages))
}
pub fn rollback(&mut self, persy_impl: &PersyImpl) -> PERes<()> {
let journal = persy_impl.journal();
let snapshots = persy_impl.snapshots();
let allocator = persy_impl.allocator();
journal.end(&Rollback::new(), &self.id)?;
let (address_to_free, free_pages) = self.internal_rollback(persy_impl)?;
allocator.free_pages(&free_pages)?;
journal.finished_to_clean(from_ref(&self.id))?;
let _add_snap_id = snapshots.snapshot(Vec::new(), CleanInfo::new(Vec::new(), address_to_free), self.id.clone());
Ok(())
}
pub fn rollback_prepared(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> PERes<()> {
let journal = persy_impl.journal();
let address = persy_impl.address();
let snapshots = persy_impl.snapshots();
let allocator = persy_impl.allocator();
journal.end(&Rollback::new(), &self.id)?;
let (address_to_free, free_pages) = self.internal_rollback(persy_impl)?;
allocator.free_pages(&free_pages)?;
if let Some(locks) = &prepared.data_locks {
address.release_locks(locks);
}
journal.finished_to_clean(from_ref(&self.id))?;
let infos = CleanInfo::new(Vec::new(), address_to_free);
if let Some(snapshot_ref) = prepared.snapshot_ref {
snapshots.fill_snapshot_clean_info(&snapshot_ref, infos);
}
Ok(())
}
fn internal_commit(&mut self, persy_impl: &PersyImpl, prepared: &PreparedState) -> PERes<Vec<(SegmentId, u64)>> {
let address = persy_impl.address();
let pages_to_unlink = address.apply(
&self.inserted,
&self.updated,
&self.deleted,
&self.segments_operations,
&mut self.segs_created,
)?;
if let Some(locks) = &prepared.data_locks {
address.release_locks(locks);
}
Ok(pages_to_unlink)
}
pub(crate) fn free_address_structures_impl(
journal: &Journal,
snapshots: &Arc<Snapshots>,
address: &Address,
allocator: &Allocator,
address_to_free: &[(SegmentId, u64)],
) -> PERes<()> {
if !address_to_free.is_empty() {
let new_pages = address.clear_empty(address_to_free)?;
let tx_id = journal.start()?;
let mut freed = Vec::new();
for (_, page) in address_to_free {
let fp = FreedPage::new(*page);
journal.log(&fp, &tx_id)?;
freed.push(fp);
}
for (segment, page) in new_pages {
journal.log(&NewSegmentPage::new(segment, page, 0), &tx_id)?;
}
journal.end(&PrepareCommit::new(), &tx_id)?;
journal.end(&Commit::new(), &tx_id)?;
let add_snap_id = snapshots.snapshot(Vec::new(), CleanInfo::new(freed, Vec::new()), tx_id);
if let Some(pc) = snapshots.pending_clean(add_snap_id.id()) {
allocator.to_release_next_sync(pc);
}
}
Ok(())
}
pub fn commit(&mut self, persy_impl: &PersyImpl, mut prepared: PreparedState) -> Result<SnapshotRef, GenericError> {
let allocator = persy_impl.allocator();
let journal = persy_impl.journal();
let snapshots = persy_impl.snapshots();
if let (Some(entries), Some(snapshot_ref)) = (prepared.entries.take(), &prepared.snapshot_ref) {
snapshots.fill_snapshot_address(snapshot_ref, entries, self.id.clone())
}
let address_to_free = self.internal_commit(persy_impl, &prepared)?;
if let (Some(freed_pages), Some(snapshot_ref)) = (prepared.freed_pages.take(), &prepared.snapshot_ref) {
let infos = CleanInfo::new(freed_pages, address_to_free);
snapshots.fill_snapshot_clean_info(snapshot_ref, infos);
}
if !self.segments_operations.is_empty() {
if let Some(snapshot_ref) = &prepared.snapshot_ref {
self.sync(persy_impl, snapshot_ref)?;
}
}
journal.end(&Commit::new(), &self.id)?;
let snapshot_ref = prepared.snapshot_ref.take().expect("everytime has snapshot");
if let Some(pc) = snapshots.pending_clean(snapshot_ref.id()) {
allocator.to_release_next_sync(pc);
}
Ok(snapshot_ref)
}
pub fn meta_id(&self) -> &Vec<u8> {
&self.meta_id
}
pub fn id(&self) -> &JournalId {
&self.id
}
pub fn filter_list<'a>(
&'a self,
pers: &'a [(String, SegmentId)],
) -> impl Iterator<Item = (&'a str, SegmentId)> + 'a {
let outer = pers.iter().map(|(name, id)| (name.as_str(), *id));
let inner = self.segments_operations.iter().filter_map(|seg| {
if let SegmentOperation::Create(c) = seg {
Some((c.name.as_str(), c.segment_id))
} else {
None
}
});
outer.chain(inner).filter(move |x| !self.segs_dropped.contains(&x.1))
}
pub fn filter_list_snap<'a>(
&'a self,
pers: &'a [(String, SegmentId, u64)],
) -> impl Iterator<Item = (&'a str, SegmentId, u64)> + 'a {
let outer = pers.iter().map(|(name, id, d)| (name.as_str(), *id, *d));
let inner = self.segments_operations.iter().filter_map(|seg| {
if let SegmentOperation::Create(c) = seg {
Some((c.name.as_str(), c.segment_id, c.first_page))
} else {
None
}
});
outer.chain(inner).filter(move |x| !self.segs_dropped.contains(&x.1))
}
pub(crate) fn get_segment_mut(&mut self, segment_id: SegmentId) -> Option<&mut Segment> {
self.segs_created.get_mut(&segment_id)
}
pub(crate) fn get_segment(&self, segment_id: SegmentId) -> Option<&Segment> {
self.segs_created.get(&segment_id)
}
pub(crate) fn get_timeout(&self) -> Duration {
self.timeout
}
}