use crate::{
allocator::Allocator,
config::Config,
discref::{Page, PageOps, PAGE_METADATA_SIZE},
error::{GenericError, PERes, ReadError, SegmentError, TimeoutError},
id::{PersyId, RecRef, SegmentId},
locks::{LockManager, RwLockManager},
segment::{AllocatedSegmentPage, SegmentPage, SegmentPageIterator, SegmentPageRead, Segments},
transaction_impl::{DeleteRecord, InsertRecord, NewSegmentPage, SegmentOperation, UpdateRecord},
PrepareError,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::{Arc, RwLock},
};
// Address page geometry:
// - ADDRESS_ROOT_PAGE_EXP is the size exponent handed to the allocator by `Address::init`
//   when it allocates the root page.
// - ADDRESS_PAGE_EXP is the size exponent from which ADDRESS_PAGE_SIZE is derived.
// - ADDRESS_PAGE_SIZE is 2^ADDRESS_PAGE_EXP minus PAGE_METADATA_SIZE.
// - ADDRESS_PAGE_ENTRY_COUNT is how many ADDRESS_ENTRY_SIZE-sized entries fit in an address page
//   after the first SEGMENT_DATA_OFFSET units (presumably bytes — TODO confirm).
pub const ADDRESS_ROOT_PAGE_EXP: u8 = 6; pub const ADDRESS_PAGE_EXP: u8 = 10; pub const ADDRESS_PAGE_SIZE: u32 = (1 << ADDRESS_PAGE_EXP) - PAGE_METADATA_SIZE; pub const ADDRESS_PAGE_ENTRY_COUNT: u32 = (ADDRESS_PAGE_SIZE - SEGMENT_DATA_OFFSET) / ADDRESS_ENTRY_SIZE;
/// Bit mask for bit 0 (presumably set in an entry's flag byte when the entry is in use — TODO confirm).
pub const FLAG_EXISTS: u8 = 0b000_0001;
/// Bit mask for bit 1 (presumably set in an entry's flag byte when the entry was deleted — TODO confirm).
pub const FLAG_DELETED: u8 = 0b000_0010;
/// Offset 16 inside an address page (presumably where the owning segment hash/id is stored — TODO confirm).
pub const SEGMENT_HASH_OFFSET: u32 = 16;
/// Offset 24 inside an address page (presumably where the per-page delete counter is stored — TODO confirm).
pub const SEGMENT_PAGE_DELETE_COUNT_OFFSET: u32 = 24;
/// Offset of the first address entry inside an address page; the entry count above is computed
/// from the space that remains after this offset.
pub const SEGMENT_DATA_OFFSET: u32 = 26;
/// Size of one address entry: 8 + 1 + 2 (presumably an 8-byte record page, a 1-byte flag and a
/// 2-byte version — TODO confirm against the page read/write code).
pub const ADDRESS_ENTRY_SIZE: u32 = 8 + 1 + 2;
/// Persistent state of a record as found in the address structure; produced by
/// `Address::check_persistent_records`.
pub struct OldRecordInfo {
    /// Reference identifying the record.
    pub recref: RecRef,
    /// Segment the record was looked up in.
    pub segment: SegmentId,
    /// Page value read from the record's address entry.
    pub record_page: u64,
    /// Version value read from the record's address entry.
    pub version: u16,
}

impl OldRecordInfo {
    /// Builds a new `OldRecordInfo`, copying the `RecRef` out of the given reference.
    fn new(recref: &RecRef, segment: SegmentId, record_page: u64, version: u16) -> Self {
        let recref = *recref;
        Self {
            recref,
            segment,
            record_page,
            version,
        }
    }
}
/// Entry point for the address structure: resolves record references to record pages,
/// manages the segment table and hands out record/segment locks.
pub struct Address {
    /// Shared configuration; read for the transaction lock timeout.
    config: Arc<Config>,
    /// Shared page allocator used to load, write and flush address pages.
    allocator: Arc<Allocator>,
    /// Lock manager keyed by record reference.
    record_locks: LockManager<RecRef>,
    /// Read/write lock manager keyed by segment id.
    segment_locks: RwLockManager<SegmentId>,
    /// Segment table, guarded by a read/write lock.
    segments: RwLock<Segments>,
}
impl Address {
pub fn new(all: &Arc<Allocator>, config: &Arc<Config>, page: u64) -> PERes<Address> {
let segments = Segments::new(page, all)?;
Ok(Address {
config: config.clone(),
allocator: all.clone(),
record_locks: Default::default(),
segment_locks: Default::default(),
segments: RwLock::new(segments),
})
}
/// Allocates the root page of the address structure and lets `Segments::init` initialise it.
///
/// Returns the index of the allocated root page, which is the value later passed to
/// `Address::new`.
///
/// # Errors
/// Propagates allocation errors and any error returned by `Segments::init`.
pub fn init(all: &Allocator) -> PERes<u64> {
    let root = all.allocate(ADDRESS_ROOT_PAGE_EXP)?;
    // Read the index first: the page itself is moved into `Segments::init`.
    let root_index = root.get_index();
    Segments::init(root, all)?;
    Ok(root_index)
}
/// Creates an iterator over the address pages of `segment`, starting at its first page.
///
/// Regular segments are looked up first, then temporary segments.
///
/// # Errors
/// `SegmentError::SegmentNotFound` when the id matches neither a regular nor a temporary
/// segment; a poisoned segments lock is reported through `GenericError`.
pub fn scan(&self, segment: SegmentId) -> Result<SegmentPageIterator, SegmentError> {
    let guard = self.segments.read().map_err(GenericError::from)?;
    let first_page = match guard.segment_by_id(segment) {
        Some(found) => found.first_page,
        None => match guard.segment_by_id_temp(segment) {
            Some(temp_found) => temp_found.first_page,
            None => return Err(SegmentError::SegmentNotFound),
        },
    };
    Ok(SegmentPageIterator::new(first_page))
}
/// Loads the address page `cur_page` and returns the result of scanning all of its entries.
///
/// The returned tuple is whatever `segment_scan_all_entries` yields: a `u64` (presumably the
/// next page of the chain — TODO confirm) and one `(u32, bool)` pair per entry slot.
///
/// # Errors
/// Fails if the segments lock is poisoned or the page cannot be loaded.
pub fn scan_page_all(&self, cur_page: u64) -> PERes<(u64, [(u32, bool); ADDRESS_PAGE_ENTRY_COUNT as usize])> {
    // The segments read lock is held until the scan of the page has completed.
    let _segments_guard = self.segments.read()?;
    let mut loaded = self.allocator.load_page(cur_page)?;
    let entries = loaded.segment_scan_all_entries();
    Ok(entries)
}
/// Allocates a new record reference inside the temporary segment `segment`.
///
/// Returns the allocated `RecRef` together with the optional `AllocatedSegmentPage` produced
/// by the segment's `allocate_internal`.
///
/// # Errors
/// `SegmentError::SegmentNotFound` when no temporary segment has this id; allocation errors
/// and a poisoned segments lock are propagated.
pub fn allocate_temp(&self, segment: SegmentId) -> Result<(RecRef, Option<AllocatedSegmentPage>), SegmentError> {
    let mut guard = self.segments.write().map_err(GenericError::from)?;
    match guard.get_temp_segment_mut(segment) {
        Some(found) => Ok(found.allocate_internal(&self.allocator)?),
        None => Err(SegmentError::SegmentNotFound),
    }
}
/// Creates a temporary segment named `segment` while holding the segments write lock.
///
/// Returns the pair produced by `Segments::create_temp_segment`: the new segment id and a
/// `u64` (used as the segment's first page by the tests in this file).
pub fn create_temp_segment(&self, segment: &str) -> PERes<(SegmentId, u64)> {
    let mut guard = self.segments.write()?;
    guard.create_temp_segment(&self.allocator, segment)
}
/// Drops the temporary segment `segment` while holding the segments write lock.
///
/// # Errors
/// Fails if the segments lock is poisoned or `Segments::drop_temp_segment` fails.
pub fn drop_temp_segment(&self, segment: SegmentId) -> PERes<()> {
    let mut guard = self.segments.write()?;
    guard.drop_temp_segment(&self.allocator, segment)
}
/// Allocates a new record reference inside the regular segment `segment`.
///
/// Returns the allocated `RecRef` together with the optional `AllocatedSegmentPage` produced
/// by the segment's `allocate_internal`.
///
/// # Errors
/// `SegmentError::SegmentNotFound` when the segment id is unknown; allocation errors and a
/// poisoned segments lock are propagated.
pub fn allocate(&self, segment: SegmentId) -> Result<(RecRef, Option<AllocatedSegmentPage>), SegmentError> {
    let mut guard = self.segments.write().map_err(GenericError::from)?;
    match guard.segments.get_mut(&segment) {
        Some(found) => Ok(found.allocate_internal(&self.allocator)?),
        None => Err(SegmentError::SegmentNotFound),
    }
}
/// Acquires every lock a transaction needs before it can be prepared.
///
/// Locks are taken in this order, all with the configured transaction lock timeout:
/// 1. write locks on the `deleted` segments,
/// 2. read locks on the `created_updated` segments,
/// 3. record locks on every record listed in `records`.
///
/// Afterwards every segment in `created_updated` is checked to still exist (as a regular or
/// temporary segment).
///
/// On any failure, all locks acquired so far are released again before the error is
/// returned. This includes the case where the segments table lock cannot be read.
/// Errors raised while releasing are only reported on stderr, so the original failure is
/// the one the caller sees.
///
/// # Errors
/// * a lock timeout, converted into `PrepareError`,
/// * `PrepareError::SegmentNotFound` when a created/updated segment no longer exists,
/// * the conversion of a poisoned segments lock.
pub fn acquire_locks(
    &self,
    records: &[(SegmentId, RecRef, u16)],
    created_updated: &[SegmentId],
    deleted: &[SegmentId],
) -> Result<(), PrepareError> {
    // Best-effort reporting for unlock failures on the error paths.
    fn report_unlock<T, E: std::fmt::Debug>(outcome: Result<T, E>) {
        if let Err(e) = outcome {
            eprintln!("unlock error: {:?}", e);
        }
    }

    let timeout = *self.config.transaction_lock_timeout();
    self.segment_locks.lock_all_write(deleted, timeout)?;
    if let Err(x) = self.segment_locks.lock_all_read(created_updated, timeout) {
        report_unlock(self.segment_locks.unlock_all_write(deleted));
        return Err(PrepareError::from(x));
    }
    let to_lock: Vec<_> = records.iter().map(|(_, id, _)| *id).collect();
    if let Err(x) = self.record_locks.lock_all(&to_lock, timeout) {
        report_unlock(self.segment_locks.unlock_all_write(deleted));
        report_unlock(self.segment_locks.unlock_all_read(created_updated));
        return Err(PrepareError::from(x));
    }

    // From here on every lock is held; any failure has to give all of them back.
    let release_all = || {
        report_unlock(self.segment_locks.unlock_all_write(deleted));
        report_unlock(self.segment_locks.unlock_all_read(created_updated));
        report_unlock(self.record_locks.unlock_all(&to_lock));
    };

    let segs = match self.segments.read() {
        Ok(guard) => guard,
        Err(poisoned) => {
            // Without this release the locks taken above would stay held forever.
            release_all();
            return Err(PrepareError::from(poisoned));
        }
    };
    let all_exist = created_updated
        .iter()
        .all(|segment| segs.exists_real_or_temp(*segment));
    if !all_exist {
        release_all();
        return Err(PrepareError::SegmentNotFound);
    }
    Ok(())
}
/// Takes a read lock on `segment`, waiting at most the configured transaction lock timeout.
///
/// # Errors
/// Returns a `TimeoutError` when the lock manager reports a failure.
pub fn acquire_segment_read_lock(&self, segment: SegmentId) -> Result<(), TimeoutError> {
    let wait = *self.config.transaction_lock_timeout();
    let targets = [segment];
    self.segment_locks.lock_all_read(&targets, wait)?;
    Ok(())
}
/// Locks the record `id`, waiting at most the configured transaction lock timeout.
///
/// # Errors
/// Returns a `TimeoutError` when the lock manager reports a failure.
pub fn acquire_record_lock(&self, id: &RecRef) -> Result<(), TimeoutError> {
    let wait = *self.config.transaction_lock_timeout();
    let targets = [*id];
    self.record_locks.lock_all(&targets, wait)?;
    Ok(())
}
/// Releases the read lock held on `segment`.
///
/// # Errors
/// Propagates any error reported by the segment lock manager.
pub fn release_segment_read_lock(&self, segment: SegmentId) -> PERes<()> {
    let targets = [segment];
    self.segment_locks.unlock_all_read(&targets)?;
    Ok(())
}
/// Releases the lock held on the record `id`.
///
/// # Errors
/// Propagates any error reported by the record lock manager.
pub fn release_record_lock(&self, id: &RecRef) -> PERes<()> {
    let targets = [*id];
    self.record_locks.unlock_all(&targets)?;
    Ok(())
}
/// Forwards to `Segments::confirm_allocations` for the segments in `segs`, holding the
/// segments write lock for the duration of the call.
///
/// `recover` is passed through unchanged.
///
/// # Errors
/// Fails if the segments lock is poisoned or the confirmation itself fails.
pub fn confirm_allocations(&self, segs: &[SegmentId], recover: bool) -> PERes<()> {
    let mut guard = self.segments.write()?;
    guard.confirm_allocations(segs, &self.allocator, recover)?;
    Ok(())
}
/// Reads the current address entry of every record in `records` and returns what was found.
///
/// Each item of `records` is `(segment, record reference, expected version)`. When
/// `check_version` is true the stored version must equal the expected one.
///
/// # Errors
/// * `PrepareError::RecordNotFound` when a record has no address entry,
/// * `PrepareError::VersionNotLastest` when `check_version` is set and the versions differ,
/// * any read error, converted into `PrepareError`.
pub fn check_persistent_records(
    &self,
    records: &[(SegmentId, RecRef, u16)],
    check_version: bool,
) -> Result<Vec<OldRecordInfo>, PrepareError> {
    let mut found = Vec::with_capacity(records.len());
    for (segment, recref, version) in records {
        match self.read(recref, *segment)? {
            None => return Err(PrepareError::RecordNotFound(PersyId(*recref))),
            Some((record, pers_version)) => {
                if check_version && pers_version != *version {
                    return Err(PrepareError::VersionNotLastest);
                }
                found.push(OldRecordInfo::new(recref, *segment, record, pers_version));
            }
        }
    }
    Ok(found)
}
/// Releases the locks taken for a transaction: the record locks on `records`, the read locks
/// on `created_updated` and the write locks on `deleted`.
///
/// All three releases are always attempted, so a failure on one group cannot leave the
/// other groups locked. The groups are released in the order listed above.
///
/// # Errors
/// Returns the first error encountered, in release order, after every release was attempted.
pub fn release_locks<'a>(
    &self,
    records: impl Iterator<Item = &'a RecRef>,
    created_updated: &[SegmentId],
    deleted: &[SegmentId],
) -> PERes<()> {
    // Run every unlock before looking at any result: bailing out early would leak the
    // remaining locks.
    let records_outcome = self.record_locks.unlock_all_iter(records);
    let read_outcome = self.segment_locks.unlock_all_read(created_updated);
    let write_outcome = self.segment_locks.unlock_all_write(deleted);
    records_outcome?;
    read_outcome?;
    write_outcome?;
    Ok(())
}
/// Undoes the address entries of the given inserts.
///
/// For every insert whose segment is still present in the segment table, the entry is
/// deleted from its address page. Each touched page is opened once and flushed once at the
/// end. Pages for which `segment_delete_entry` returned `true` and that have a following
/// page (`get_next() != 0`) are returned as `(segment, page)` pairs, in the shape accepted
/// by `clear_empty`.
///
/// # Errors
/// Fails if the segments lock is poisoned or a page cannot be opened, read or flushed.
pub fn rollback(&self, inserts: &[InsertRecord]) -> PERes<Vec<(SegmentId, u64)>> {
    let segments = self.segments.write()?;
    let mut touched = HashMap::new();
    let mut removable = Vec::new();
    let still_present = inserts
        .iter()
        .filter(|ins| segments.segments.contains_key(&ins.segment));
    for ins in still_present {
        let page_id = ins.recref.page;
        let seg_page = self.get_or_insert_mut(&mut touched, page_id)?;
        if !seg_page.segment_delete_entry(ins.segment, ins.recref.pos) {
            continue;
        }
        // The next link is only consulted when the delete reported `true`.
        if seg_page.get_next()? != 0 {
            removable.push((ins.segment, page_id));
        }
    }
    for (_, to_flush) in touched {
        self.allocator.flush_page(to_flush)?;
    }
    Ok(removable)
}
/// Applies the address-level effects of a transaction while holding the segments write lock.
///
/// Steps, in order:
/// 1. collect the ids of the segments dropped by `seg_ops`; record operations on those
///    segments are skipped,
/// 2. when `recover` is set, re-link every page in `segs_new_pages` to its previous page,
/// 3. apply inserts, updates and deletes to their address pages (each page is opened once and
///    cached in a map),
/// 4. flush the touched pages; when `recover` is set, additionally recalculate page counts,
///    remove the record pages of inserts/updates from the free list and confirm allocations,
/// 5. drop, then create, the segments listed in `seg_ops` and flush the segment table.
///
/// Returns the `(segment, page)` pairs for which a delete made `segment_delete_entry` return
/// `true` and the page has a following page (`get_next() != 0`).
///
/// # Errors
/// Fails if the segments lock is poisoned or any page/segment operation fails.
pub fn apply(
&self,
segs_new_pages: &[NewSegmentPage],
inserts: &[InsertRecord],
updates: &[UpdateRecord],
deletes: &[DeleteRecord],
seg_ops: &[SegmentOperation],
recover: bool,
) -> PERes<Vec<(SegmentId, u64)>> {
let mut segments = self.segments.write()?;
// Segments dropped by this transaction: their record operations are ignored below.
let mut dropped = HashSet::new();
for seg_op in seg_ops {
if let SegmentOperation::Drop(ref op) = *seg_op {
dropped.insert(op.segment_id);
}
}
// Cache of pages opened for writing, keyed by page index, so each page is flushed once.
let mut pages = HashMap::new();
if recover {
// Only on recovery: rebuild the prev/next links and the segment id of the new pages.
for new_page in segs_new_pages {
let p_page = self.get_or_insert_mut(&mut pages, new_page.previous)?;
p_page.set_next(new_page.page)?;
let n_page = self.get_or_insert_mut(&mut pages, new_page.page)?;
n_page.set_prev(new_page.previous)?;
n_page.set_segment_id(new_page.segment)?;
}
}
for insert in inserts {
if !dropped.contains(&insert.segment) {
let page = insert.recref.page;
let seg_page = self.get_or_insert_mut(&mut pages, page)?;
seg_page.segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page);
}
}
for update in updates {
if !dropped.contains(&update.segment) {
let page = update.recref.page;
let seg_page = self.get_or_insert_mut(&mut pages, page)?;
seg_page.segment_update_entry(update.segment, update.recref.pos, update.record_page);
}
}
let mut pages_to_remove = Vec::new();
for delete in deletes {
if !dropped.contains(&delete.segment) {
let page = delete.recref.page;
let seg_page = self.get_or_insert_mut(&mut pages, page)?;
// A `true` result presumably means the page has become empty — TODO confirm.
if seg_page.segment_delete_entry(delete.segment, delete.recref.pos) {
// Pages without a following page are never reported for removal.
if seg_page.get_next()? != 0 {
pages_to_remove.push((delete.segment, page));
}
}
}
}
if recover {
for (_, mut to_flush) in pages.into_iter() {
// On recovery the page counters are recalculated before the page is written back.
to_flush.recalc_count()?;
self.allocator.flush_page(to_flush)?;
}
// Removes a record page from the allocator free list, using the page's own size exponent.
let recover_page = |record_page: u64| {
let page = self.allocator.load_page(record_page)?;
self.allocator.remove_from_free(record_page, page.get_size_exp())
};
// Every segment touched by a record operation gets its allocations confirmed.
let mut segs = HashSet::new();
for insert in inserts {
recover_page(insert.record_page)?;
segs.insert(insert.segment);
}
for update in updates {
recover_page(update.record_page)?;
segs.insert(update.segment);
}
for delete in deletes {
segs.insert(delete.segment);
}
segments.confirm_allocations(&segs.into_iter().collect::<Vec<_>>(), &self.allocator, true)?;
} else {
for (_, to_flush) in pages.into_iter() {
self.allocator.flush_page(to_flush)?;
}
}
// Drops are processed before creates.
for seg_op in seg_ops {
if let SegmentOperation::Drop(ref op) = *seg_op {
segments.drop_segment(&op.name);
}
}
for seg_op in seg_ops {
if let SegmentOperation::Create(ref op) = *seg_op {
segments.create_segment(op.segment_id, op.first_page);
}
}
segments.flush_segments(&self.allocator)?;
Ok(pages_to_remove)
}
/// Returns the pages collected by `Segments::collect_segment_pages` for `segment`, holding
/// the segments read lock for the duration of the call.
///
/// # Errors
/// Fails if the segments lock is poisoned or the collection itself fails.
pub fn collect_segment_pages(&self, segment: SegmentId) -> PERes<Vec<u64>> {
    let guard = self.segments.read()?;
    guard.collect_segment_pages(&self.allocator, segment)
}
/// Unlinks the given address pages from the page chain of their segment.
///
/// For every `(segment, page)` pair the following page gets the removed page's previous
/// link. Then either the previous page is pointed at the following page, or — when the
/// removed page was the first one — the segment's first page is moved to the following
/// page and the segment table is flushed once at the end.
///
/// A page index of `0` is treated as "no page". Pages without a following page are not
/// expected here (`apply` and `rollback` only report pages whose next link is non-zero):
/// debug builds assert on them and release builds skip them instead of opening page `0`
/// and rewriting its previous link.
///
/// # Errors
/// Fails if the segments lock is poisoned or any page/segment operation fails.
pub fn clear_empty(&self, empty: &[(SegmentId, u64)]) -> PERes<()> {
    let mut segments = self.segments.write()?;
    let mut flush_segments = false;
    for (segment, page) in empty {
        let mut p = self.allocator.load_page(*page)?;
        let next = p.get_next()?;
        let prev = p.get_prev()?;
        debug_assert!(next != 0);
        if next == 0 {
            // No following page to take over the links: leave the chain untouched.
            continue;
        }
        let mut next_page = self.allocator.write_page(next)?;
        next_page.set_prev(prev)?;
        self.allocator.flush_page(next_page)?;
        if prev != 0 {
            let mut prev_page = self.allocator.write_page(prev)?;
            prev_page.set_next(next)?;
            self.allocator.flush_page(prev_page)?;
        } else {
            // The removed page was the head of the chain: the following page becomes the head.
            segments.set_first_page(*segment, next)?;
            flush_segments = true;
        }
    }
    if flush_segments {
        segments.flush_segments(&self.allocator)?;
    }
    Ok(())
}
/// Returns whether the segment table reports a segment named `segment`.
///
/// # Errors
/// Fails only if the segments lock is poisoned.
pub fn exists_segment(&self, segment: &str) -> PERes<bool> {
    let guard = self.segments.read()?;
    let present = guard.has_segment(segment);
    Ok(present)
}
/// Looks up the id of the segment named `segment`; `None` when the segment table has no
/// match.
///
/// # Errors
/// Fails only if the segments lock is poisoned.
pub fn segment_id(&self, segment: &str) -> PERes<Option<SegmentId>> {
    let guard = self.segments.read()?;
    let id = guard.segment_id(segment);
    Ok(id)
}
/// Looks up the name of the segment with id `segment`; `None` when the segment table has no
/// match.
///
/// # Errors
/// Fails only if the segments lock is poisoned.
pub fn segment_name_by_id(&self, segment: SegmentId) -> PERes<Option<String>> {
    let guard = self.segments.read()?;
    let name = guard.segment_name_by_id(segment);
    Ok(name)
}
/// Writes a single address entry straight to its page and flushes the page immediately.
///
/// The entry at `recref.pos` of page `recref.page` is set for `segment_id` with
/// `record_page` as its value. Exercised by the unit tests of this file.
///
/// # Errors
/// Fails if the page cannot be opened for writing or flushed.
#[allow(dead_code)]
pub fn insert(&self, segment_id: SegmentId, recref: &RecRef, record_page: u64) -> PERes<()> {
    let mut target = self.allocator.write_page(recref.page)?;
    target.segment_insert_entry(segment_id, recref.pos, record_page);
    self.allocator.flush_page(target)?;
    Ok(())
}
/// Reads the address entry of `recref` for `segment`.
///
/// Returns `Ok(None)` when `load_page_not_free` yields no page for `recref.page`; otherwise
/// returns whatever `segment_read_entry` yields, a `(u64, u16)` pair (used as record page and
/// version by `check_persistent_records`) or `None`.
///
/// # Errors
/// `ReadError::InvalidPersyId` when the page exists but `recref.pos` lies beyond the last
/// position at which a whole entry fits in an address page; page loading errors are
/// propagated.
pub fn read(&self, recref: &RecRef, segment: SegmentId) -> Result<Option<(u64, u16)>, ReadError> {
    let mut page = match self.allocator.load_page_not_free(recref.page)? {
        Some(page) => page,
        None => return Ok(None),
    };
    // The position is validated only after the page was found.
    let last_valid_pos = ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE;
    if recref.pos > last_valid_pos {
        return Err(ReadError::InvalidPersyId(*recref));
    }
    Ok(page.segment_read_entry(segment, recref.pos))
}
/// Returns the cached page for index `k`, opening it for writing through the allocator and
/// caching it in `map` on first use.
///
/// # Errors
/// Fails if the page is not cached yet and cannot be opened for writing.
fn get_or_insert_mut<'a>(&self, map: &'a mut HashMap<u64, Page>, k: u64) -> PERes<&'a mut Page> {
    let cached = match map.entry(k) {
        Entry::Vacant(slot) => {
            let opened = self.allocator.write_page(k)?;
            slot.insert(opened)
        }
        Entry::Occupied(slot) => slot.into_mut(),
    };
    Ok(cached)
}
/// Returns the `(name, id)` pairs produced by `Segments::list`.
///
/// # Errors
/// Fails only if the segments lock is poisoned.
pub fn list(&self) -> PERes<Vec<(String, SegmentId)>> {
    let guard = self.segments.read()?;
    let listed = guard.list();
    Ok(listed)
}
/// Returns the `(name, id, u64)` triples produced by `Segments::snapshot_list`.
///
/// # Errors
/// Fails only if the segments lock is poisoned.
pub fn snapshot_list(&self) -> PERes<Vec<(String, SegmentId, u64)>> {
    let guard = self.segments.read()?;
    let listed = guard.snapshot_list();
    Ok(listed)
}
}
#[cfg(test)]
mod tests {
use super::Address;
use crate::id::SegmentId;
use crate::transaction_impl::{CreateSegment, DeleteRecord, InsertRecord, SegmentOperation, UpdateRecord};
use crate::{allocator::Allocator, config::Config, discref::DiscRef};
use std::sync::Arc;
use tempfile::Builder;
/// Builds an `Address` backed by a temporary file and registers one segment named "def".
///
/// The segment is first created as a temporary segment and then registered in the segment
/// table with the same id and first page. Returns the address together with that id.
fn init_test_address(file_name: &str) -> (Address, SegmentId) {
    let temp = Builder::new().prefix(file_name).suffix(".persy").tempfile().unwrap();
    let handle = temp.reopen().unwrap();
    let disc = Box::new(DiscRef::new(handle).unwrap());
    let config = Arc::new(Config::new());
    let (_, allocator) = Allocator::init(disc, &config).unwrap();
    let root_page = Address::init(&allocator).unwrap();
    let shared_allocator = Arc::new(allocator);
    let address = Address::new(&shared_allocator, &config, root_page).unwrap();
    let (segment_id, first_page) = address.create_temp_segment("def").unwrap();
    address.segments.write().unwrap().create_segment(segment_id, first_page);
    (address, segment_id)
}
/// A freshly created segment has the expected last page and last position.
#[test]
fn test_init_and_new_address() {
    let (add, segment_id) = init_test_address("./addr_test");
    let table = add.segments.read().unwrap();
    let segment = table.segment_by_id(segment_id).unwrap();
    assert_eq!(segment.last_page, 1088);
    assert_eq!(segment.last_pos, 26);
}
/// `apply` deletes, updates and inserts address entries in a single call.
///
/// Two entries are written directly. `apply` then deletes the first, updates the second and
/// inserts a third; reading them back must show exactly that.
#[test]
fn test_insert_update_delete_read_apply_pointer() {
    let (add, segment_id) = init_test_address("./addr_insert_update_delete_apply_test.persy");
    let (recref, _) = add.allocate(segment_id).unwrap();
    add.insert(segment_id, &recref, 10).unwrap();
    let (recref_1, _) = add.allocate(segment_id).unwrap();
    add.insert(segment_id, &recref_1, 20).unwrap();
    let (recref_2, _) = add.allocate(segment_id).unwrap();

    let inserted = vec![InsertRecord::new(segment_id, &recref_2, 30)];
    let updated = vec![UpdateRecord::new(segment_id, &recref_1, 40, 1)];
    let deleted = vec![DeleteRecord::new(segment_id, &recref, 1)];
    let seg_ops = vec![SegmentOperation::Create(CreateSegment::new(
        "def",
        SegmentId::new(20),
        20,
    ))];
    add.apply(&[], &inserted, &updated, &deleted, &seg_ops, false).unwrap();

    let read = add.read(&recref, segment_id).unwrap();
    let read_1 = add.read(&recref_1, segment_id).unwrap();
    let read_2 = add.read(&recref_2, segment_id).unwrap();
    // Assert on the values themselves so a failure shows what was actually read.
    assert!(read.is_none(), "deleted record must not be readable");
    assert_eq!(read_1.map(|val| val.0), Some(40));
    assert_eq!(read_2.map(|val| val.0), Some(30));
}
/// Scanning a segment yields every inserted entry, in insertion order.
#[test]
fn test_insert_scan() {
    let (add, segment_id) = init_test_address("./addr_scan_test.persy");
    let (first, _) = add.allocate(segment_id).unwrap();
    add.insert(segment_id, &first, 10).unwrap();
    let (second, _) = add.allocate(segment_id).unwrap();
    add.insert(segment_id, &second, 20).unwrap();

    // First pass: only count the entries.
    let mut counter = add.scan(segment_id).unwrap();
    let mut seen = 0;
    while counter.next(&add).is_some() {
        seen += 1;
    }
    assert_eq!(seen, 2);

    // Second pass: check each entry against the reference it was inserted with.
    let mut cursor = add.scan(segment_id).unwrap();
    let found_first = cursor.next(&add).unwrap();
    assert_eq!(found_first.page, first.page);
    assert_eq!(found_first.pos, first.pos);
    let found_second = cursor.next(&add).unwrap();
    assert_eq!(found_second.page, second.page);
    assert_eq!(found_second.pos, second.pos);
}
/// Inserting 1000 entries and scanning the segment afterwards yields all 1000 of them.
#[test]
fn test_insert_over_page() {
    let (add, segment_id) = init_test_address("./addr_insert_over_page.persy");
    for record_page in 0..1000 {
        let (recref, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &recref, record_page).unwrap();
    }
    let mut cursor = add.scan(segment_id).unwrap();
    let mut seen = 0;
    while cursor.next(&add).is_some() {
        seen += 1;
    }
    assert_eq!(seen, 1000);
}
}