use crate::{
allocator::Allocator,
config::Config,
discref::{Page, PageOps, PAGE_METADATA_SIZE},
error::{PRes, PersyError},
id::{PersyId, RecRef},
locks::{LockManager, RwLockManager},
segment::{AllocatedSegmentPage, SegmentPage, SegmentPageIterator, SegmentPageRead, Segments},
transaction::{DeleteRecord, InsertRecord, NewSegmentPage, SegmentOperation, UpdateRecord},
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::{Arc, RwLock},
};
/// Size exponent handed to the allocator when the address root page is created
/// (see `Address::init`).
pub const ADDRESS_ROOT_PAGE_EXP: u8 = 6;
/// Size exponent from which `ADDRESS_PAGE_SIZE` is derived.
pub const ADDRESS_PAGE_EXP: u8 = 10;
/// `2^ADDRESS_PAGE_EXP` minus the `PAGE_METADATA_SIZE` reserved on each page.
pub const ADDRESS_PAGE_SIZE: u32 = (1 << ADDRESS_PAGE_EXP) - PAGE_METADATA_SIZE;
/// Lowest bit of a flag byte.
// NOTE(review): presumably marks an address entry as holding a live record — confirm.
pub const FLAG_EXISTS: u8 = 0b0000_0001;
// NOTE(review): the three offsets below look like byte positions inside a segment
// address page (hash, entry counter, first entry) — confirm against the page layout code.
pub const SEGMENT_HASH_OFFSET: u32 = 16;
pub const SEGMENT_PAGE_ENTRY_COUNT_OFFSET: u32 = 20;
pub const SEGMENT_DATA_OFFSET: u32 = 21;
/// Size of one address entry, spelled out as the sum of its three parts.
// NOTE(review): presumably 8 bytes of record page + 1 flag byte + 2 bytes of version — confirm.
pub const ADDRESS_ENTRY_SIZE: u32 = 8 + 1 + 2;
/// Snapshot of what the address table currently stores for one record.
///
/// Instances are produced by `Address::check_persistent_records`, one per
/// record that was found.
pub struct OldRecordInfo {
/// Reference (address page + position) identifying the record.
pub recref: RecRef,
/// Id of the segment the record was looked up in.
pub segment: u32,
/// First value of the address entry read for the record.
// NOTE(review): presumably the page holding the record data — confirm.
pub record_page: u64,
/// Version stored in the address entry at the time of the read.
pub version: u16,
}
impl OldRecordInfo {
fn new(recref: &RecRef, segment: u32, record_page: u64, version: u16) -> OldRecordInfo {
OldRecordInfo {
recref: recref.clone(),
segment,
record_page,
version,
}
}
}
/// Entry point for address-page operations: segment lookup, record address
/// entries, and the record/segment locks taken around transactions.
pub struct Address {
/// Shared configuration; read here for the transaction lock timeout.
config: Arc<Config>,
/// Shared page allocator used to load, write, flush and free pages.
allocator: Arc<Allocator>,
/// Locks keyed by record reference.
record_locks: LockManager<RecRef>,
/// Read/write locks keyed by segment id.
segment_locks: RwLockManager<u32>,
/// Segment registry, guarded by a read/write lock.
segments: RwLock<Segments>,
}
impl Address {
/// Opens the address structure whose segment registry is rooted at `page`.
///
/// The registry is loaded through `Segments::new`; the allocator and the
/// configuration handles are cloned (reference-count bump only) and both lock
/// managers start from their `Default` value.
///
/// # Errors
/// Propagates whatever `Segments::new` returns on failure.
pub fn new(all: &Arc<Allocator>, config: &Arc<Config>, page: u64) -> PRes<Address> {
    let registry = Segments::new(page, all)?;
    let address = Address {
        config: Arc::clone(config),
        allocator: Arc::clone(all),
        record_locks: Default::default(),
        segment_locks: Default::default(),
        segments: RwLock::new(registry),
    };
    Ok(address)
}
/// Allocates the address root page and initialises the segment registry on it.
///
/// Returns the index of the freshly allocated root page, which is the value
/// later expected by `Address::new`.
///
/// # Errors
/// Propagates allocation or `Segments::init` failures.
pub fn init(all: &Allocator) -> PRes<u64> {
    let root = all.allocate(ADDRESS_ROOT_PAGE_EXP)?;
    // Read the index first: `root` is handed over by value on the next line.
    let root_index = root.get_index();
    Segments::init(root, all)?;
    Ok(root_index)
}
/// Creates an iterator over the address pages of `segment`.
///
/// The segment is looked up among the regular segments first and among the
/// temporary ones second; the iterator starts at the first page of whichever
/// matched.
///
/// # Errors
/// `PersyError::SegmentNotFound` when neither lookup knows the id.
pub fn scan(&self, segment: u32) -> PRes<SegmentPageIterator> {
    let registry = self.segments.read()?;
    let first_page = match registry.segment_by_id(segment) {
        Some(found) => found.first_page,
        None => match registry.segment_by_id_temp(segment) {
            Some(temp) => temp.first_page,
            None => return Err(PersyError::SegmentNotFound),
        },
    };
    Ok(SegmentPageIterator::new(first_page))
}
/// Loads the address page `cur_page` and returns the result of scanning its entries.
// NOTE(review): the returned pair is presumably (next page, positions of the
// entries in use) — confirm against `segment_scan_entries`.
pub fn scan_page(&self, cur_page: u64) -> PRes<(u64, Vec<u32>)> {
    // Only the guard matters here: the registry stays read-locked while the page is read.
    let _registry_guard = self.segments.read()?;
    let mut loaded = self.allocator.load_page(cur_page)?;
    loaded.segment_scan_entries()
}
/// Reserves a new record reference inside the temporary segment `segment`.
///
/// The second tuple element is whatever `allocate_internal` reports alongside
/// the reference (an `AllocatedSegmentPage`, when present).
///
/// # Errors
/// `PersyError::SegmentNotFound` when no temporary segment has this id.
pub fn allocate_temp(&self, segment: u32) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
    let mut registry = self.segments.write()?;
    match registry.get_temp_segment_mut(segment) {
        Some(found) => found.allocate_internal(&self.allocator),
        None => Err(PersyError::SegmentNotFound),
    }
}
/// Registers a temporary segment named `segment`, holding the registry write
/// lock for the duration of the call.
// NOTE(review): the returned pair is used by the tests as (segment id, first page) — confirm.
pub fn create_temp_segment(&self, segment: &str) -> PRes<(u32, u64)> {
    let mut registry = self.segments.write()?;
    registry.create_temp_segment(&self.allocator, segment)
}
/// Removes the temporary segment `segment` from the registry, holding the
/// registry write lock for the duration of the call.
pub fn drop_temp_segment(&self, segment: u32) -> PRes<()> {
    let mut registry = self.segments.write()?;
    registry.drop_temp_segment(&self.allocator, segment)
}
/// Reserves a new record reference inside the regular segment `segment`.
///
/// The second tuple element is whatever `allocate_internal` reports alongside
/// the reference (an `AllocatedSegmentPage`, when present).
///
/// # Errors
/// `PersyError::SegmentNotFound` when the id is not among the regular segments.
pub fn allocate(&self, segment: u32) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
    let mut registry = self.segments.write()?;
    match registry.segments.get_mut(&segment) {
        Some(found) => found.allocate_internal(&self.allocator),
        None => Err(PersyError::SegmentNotFound),
    }
}
/// Acquires every lock a transaction needs, or none at all.
///
/// Locks are taken in this order, each with the configured transaction lock
/// timeout:
/// 1. write locks on the segments in `deleted`,
/// 2. read locks on the segments in `created_updated`,
/// 3. record locks on every `RecRef` in `records`.
///
/// Afterwards every segment in `created_updated` must exist (as a regular or
/// temporary segment).
///
/// # Errors
/// Returns the locking error of the step that failed, the conversion of a
/// poisoned segment-registry lock, or `PersyError::SegmentNotFound` when one
/// of the `created_updated` segments does not exist. In every error case the
/// locks acquired so far are released on a best-effort basis: failures while
/// unlocking are reported on stderr and do not replace the original error.
pub fn acquire_locks(&self, records: &[(u32, RecRef, u16)], created_updated: &[u32], deleted: &[u32]) -> PRes<()> {
    let timeout = self.config.transaction_lock_timeout();

    // Best-effort rollback helpers; an unlock failure is logged, never propagated,
    // so the caller always sees the error that caused the rollback.
    let unlock_segment_writes = || {
        if let Err(e) = self.segment_locks.unlock_all_write(&deleted) {
            eprintln!("unlock error: {:?}", e);
        }
    };
    let unlock_segment_reads = || {
        if let Err(e) = self.segment_locks.unlock_all_read(&created_updated) {
            eprintln!("unlock error: {:?}", e);
        }
    };

    self.segment_locks.lock_all_write(&deleted, timeout.clone())?;
    if let Err(x) = self.segment_locks.lock_all_read(&created_updated, timeout.clone()) {
        unlock_segment_writes();
        return Err(x);
    }

    let to_lock: Vec<_> = records.iter().map(|(_, id, _)| id.clone()).collect();
    if let Err(x) = self.record_locks.lock_all(&to_lock, timeout.clone()) {
        unlock_segment_writes();
        unlock_segment_reads();
        return Err(x);
    }

    // From here on all three lock groups are held.
    let unlock_everything = || {
        unlock_segment_writes();
        unlock_segment_reads();
        if let Err(e) = self.record_locks.unlock_all(&to_lock) {
            eprintln!("unlock error: {:?}", e);
        }
    };

    // A poisoned registry lock must not leave the transaction locks behind.
    let segs = match self.segments.read() {
        Ok(segs) => segs,
        Err(poisoned) => {
            unlock_everything();
            return Err(PersyError::from(poisoned));
        }
    };
    if created_updated.iter().any(|segment| !segs.exists_real_or_temp(*segment)) {
        unlock_everything();
        return Err(PersyError::SegmentNotFound);
    }
    Ok(())
}
/// Takes a read lock on a single segment, waiting at most the configured
/// transaction lock timeout.
pub fn acquire_segment_read_lock(&self, segment: u32) -> PRes<()> {
    let wait = self.config.transaction_lock_timeout().clone();
    let wanted = [segment];
    self.segment_locks.lock_all_read(&wanted, wait)?;
    Ok(())
}
/// Takes the lock of a single record, waiting at most the configured
/// transaction lock timeout.
pub fn acquire_record_lock(&self, id: &RecRef) -> PRes<()> {
    let wait = self.config.transaction_lock_timeout().clone();
    let wanted = [id.clone()];
    self.record_locks.lock_all(&wanted, wait)?;
    Ok(())
}
/// Releases the read lock held on a single segment.
pub fn release_segment_read_lock(&self, segment: u32) -> PRes<()> {
    let held = [segment];
    self.segment_locks.unlock_all_read(&held)?;
    Ok(())
}
/// Releases the lock held on a single record.
pub fn release_record_lock(&self, id: &RecRef) -> PRes<()> {
    let held = [id.clone()];
    self.record_locks.unlock_all(&held)?;
    Ok(())
}
/// Forwards to `Segments::confirm_allocations` for the segments in `segs`
/// while holding the registry write lock; `recover` is passed through unchanged.
pub fn confirm_allocations(&self, segs: &[u32], recover: bool) -> PRes<()> {
    self.segments
        .write()?
        .confirm_allocations(&segs, &self.allocator, recover)?;
    Ok(())
}
pub fn check_persistent_records(
&self,
records: &[(u32, RecRef, u16)],
check_version: bool,
) -> PRes<Vec<OldRecordInfo>> {
let mut current_record_pages = Vec::with_capacity(records.len());
for &(segment, ref recref, version) in records {
let val = self.read(recref, segment)?;
if let Some((record, pers_version)) = val {
current_record_pages.push(OldRecordInfo::new(&recref, segment, record, pers_version));
if check_version && pers_version != version {
return Err(PersyError::VersionNotLastest);
}
} else {
return Err(PersyError::RecordNotFound(PersyId(recref.clone())));
}
}
Ok(current_record_pages)
}
/// Releases the locks taken by `acquire_locks`: the record locks, then the
/// segment read locks on `created_updated`, then the segment write locks on
/// `deleted`.
///
/// All three releases are always attempted, so a failure in one group does not
/// leave the remaining groups locked.
///
/// # Errors
/// Returns the first error encountered, in the order records, segment reads,
/// segment writes.
pub fn release_locks<'a>(
    &self,
    records: impl Iterator<Item = &'a RecRef>,
    created_updated: &[u32],
    deleted: &[u32],
) -> PRes<()> {
    // Run every unlock before inspecting any result: propagating early would
    // skip the later unlocks and keep those locks held.
    let records_released = self.record_locks.unlock_all_iter(records);
    let reads_released = self.segment_locks.unlock_all_read(&created_updated);
    let writes_released = self.segment_locks.unlock_all_write(&deleted);
    records_released?;
    reads_released?;
    writes_released?;
    Ok(())
}
/// No-op: the record reference is ignored and the call always succeeds.
pub fn rollback(&self, _recref: &RecRef) -> PRes<()> {
    Ok(())
}
/// Applies the address-level effects of a transaction while holding the
/// segment registry write lock.
///
/// Steps, in order:
/// 1. When `recover` is set, re-link every page listed in `segs_new_pages`
///    (next/prev pointers and segment id).
/// 2. Apply inserts, then updates, then deletes to their address pages,
///    skipping any record whose segment is dropped by `seg_ops`.
/// 3. Flush every touched page. When `recover` is set the entry count of each
///    page is recalculated first, the record pages of all inserts and updates
///    are removed from the allocator free list, and allocations are confirmed
///    for every segment mentioned by an insert, update or delete.
/// 4. Apply all segment drops, then all segment creations, and flush the
///    segment registry.
///
/// Returns the `(segment, page)` pairs for which deleting an entry reported `true`.
// NOTE(review): `true` presumably means the address page became empty — confirm
// against `segment_delete_entry`.
pub fn apply(
    &self,
    segs_new_pages: &[NewSegmentPage],
    inserts: &[InsertRecord],
    updates: &[UpdateRecord],
    deletes: &[DeleteRecord],
    seg_ops: &[SegmentOperation],
    recover: bool,
) -> PRes<Vec<(u32, u64)>> {
    let mut segments = self.segments.write()?;

    // Records that belong to a segment dropped in this same transaction are ignored.
    let dropped: HashSet<_> = seg_ops
        .iter()
        .filter_map(|seg_op| match seg_op {
            SegmentOperation::DROP(op) => Some(op.segment_id),
            _ => None,
        })
        .collect();

    // Every page is opened for writing once and shared by all operations touching it.
    let mut pages = HashMap::new();
    if recover {
        for new_page in segs_new_pages {
            self.get_or_insert_mut(&mut pages, new_page.previous)?
                .set_next(new_page.page)?;
            let added = self.get_or_insert_mut(&mut pages, new_page.page)?;
            added.set_prev(new_page.previous)?;
            added.set_segment_id(new_page.segment)?;
        }
    }

    for insert in inserts.iter().filter(|rec| !dropped.contains(&rec.segment)) {
        self.get_or_insert_mut(&mut pages, insert.recref.page)?
            .segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page)?;
    }
    for update in updates.iter().filter(|rec| !dropped.contains(&rec.segment)) {
        self.get_or_insert_mut(&mut pages, update.recref.page)?
            .segment_update_entry(update.segment, update.recref.pos, update.record_page)?;
    }

    let mut pages_to_remove = Vec::new();
    for delete in deletes.iter().filter(|rec| !dropped.contains(&rec.segment)) {
        let page = delete.recref.page;
        let reported = self
            .get_or_insert_mut(&mut pages, page)?
            .segment_delete_entry(delete.segment, delete.recref.pos)?;
        if reported {
            pages_to_remove.push((delete.segment, page));
        }
    }

    if recover {
        for (_, mut touched) in pages {
            touched.recalc_count()?;
            self.allocator.flush_page(touched)?;
        }
        let recover_page = |record_page: u64| {
            let loaded = self.allocator.load_page(record_page)?;
            self.allocator.remove_from_free(record_page, loaded.get_size_exp())
        };
        for insert in inserts {
            recover_page(insert.record_page)?;
        }
        for update in updates {
            recover_page(update.record_page)?;
        }
        // Distinct segments mentioned by any record operation, dropped or not.
        let mut mentioned = HashSet::new();
        mentioned.extend(inserts.iter().map(|rec| rec.segment));
        mentioned.extend(updates.iter().map(|rec| rec.segment));
        mentioned.extend(deletes.iter().map(|rec| rec.segment));
        let to_confirm: Vec<_> = mentioned.into_iter().collect();
        segments.confirm_allocations(&to_confirm, &self.allocator, true)?;
    } else {
        for (_, touched) in pages {
            self.allocator.flush_page(touched)?;
        }
    }

    // All drops are applied before any creation.
    for seg_op in seg_ops {
        if let SegmentOperation::DROP(op) = seg_op {
            segments.drop_segment(&op.name)?;
        }
    }
    for seg_op in seg_ops {
        if let SegmentOperation::CREATE(op) = seg_op {
            segments.create_segment(op.segment_id, op.first_page)?;
        }
    }
    segments.flush_segments(&self.allocator)?;
    Ok(pages_to_remove)
}
/// Collects the pages of `segment` through the registry, under its read lock.
///
/// Note that the `allocator` argument is the one used for the lookup, not the
/// allocator stored in `self`.
pub fn collect_segment_pages(&self, allocator: &Allocator, segment: u32) -> PRes<Vec<u64>> {
    self.segments.read()?.collect_segment_pages(allocator, segment)
}
/// Unlinks each `(segment, page)` pair in `empty` from its page chain and
/// frees the page.
///
/// For every page the neighbours are re-pointed at each other; a page index of
/// `0` is treated as "no neighbour". When the page had no predecessor the
/// segment is updated instead: its first page becomes the successor, or the
/// segment is reset when there is no successor either.
#[allow(dead_code)]
pub fn clear_empty(&self, empty: Vec<(u32, u64)>) -> PRes<()> {
    let mut registry = self.segments.write()?;
    for (segment, page) in empty {
        let mut unlinked = self.allocator.write_page(page)?;
        let next = unlinked.get_next()?;
        let prev = unlinked.get_prev()?;

        // The successor, when present, is always re-pointed first.
        if next != 0 {
            let mut successor = self.allocator.write_page(next)?;
            successor.set_prev(prev)?;
            self.allocator.flush_page(successor)?;
        }

        match (prev, next) {
            // Only page of the segment.
            (0, 0) => {
                registry.reset(segment, &self.allocator)?;
            }
            // Head of the chain: the successor becomes the first page.
            (0, _) => {
                registry.set_first_page(segment, next, &self.allocator)?;
            }
            // There is a predecessor to re-point.
            _ => {
                let mut predecessor = self.allocator.write_page(prev)?;
                predecessor.set_next(next)?;
                self.allocator.flush_page(predecessor)?;
            }
        }

        self.allocator.flush_page(unlinked)?;
        self.allocator.free(page)?;
    }
    Ok(())
}
/// Reports whether the registry knows a segment named `segment`.
pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
    let registry = self.segments.read()?;
    Ok(registry.has_segment(segment))
}
/// Looks up the id of the segment named `segment`; `None` when the registry
/// has no match.
pub fn segment_id(&self, segment: &str) -> PRes<Option<u32>> {
    let registry = self.segments.read()?;
    Ok(registry.segment_id(segment))
}
/// Looks up the name of the segment with id `segment`; `None` when the
/// registry has no match.
pub fn segment_name_by_id(&self, segment: u32) -> PRes<Option<String>> {
    let registry = self.segments.read()?;
    Ok(registry.segment_name_by_id(segment))
}
/// Writes a single address entry immediately: the entry at `recref` is set to
/// point at `record_page` for `segment_id`, and the address page is flushed.
#[allow(dead_code)]
pub fn insert(&self, segment_id: u32, recref: &RecRef, record_page: u64) -> PRes<()> {
    let mut address_page = self.allocator.write_page(recref.page)?;
    address_page.segment_insert_entry(segment_id, recref.pos, record_page)?;
    self.allocator.flush_page(address_page)?;
    Ok(())
}
/// Reads the address entry stored at `recref` for `segment`.
///
/// Returns `None` when the page reports no entry, otherwise the stored pair
/// (used elsewhere in this file as record page and version).
pub fn read(&self, recref: &RecRef, segment: u32) -> PRes<Option<(u64, u16)>> {
    self.allocator
        .load_page(recref.page)?
        .segment_read_entry(segment, recref.pos)
}
/// Returns the cached writable page for index `k`, opening it through the
/// allocator on first use.
///
/// When opening fails the error is returned and `map` is left without an
/// entry for `k`.
fn get_or_insert_mut<'a>(&self, map: &'a mut HashMap<u64, Page>, k: u64) -> PRes<&'a mut Page> {
    match map.entry(k) {
        Entry::Occupied(cached) => Ok(cached.into_mut()),
        Entry::Vacant(slot) => {
            let opened = self.allocator.write_page(k)?;
            Ok(slot.insert(opened))
        }
    }
}
/// Returns the registry's segment listing as `(name, id)` pairs, taken under
/// the registry read lock.
pub fn list(&self) -> PRes<Vec<(String, u32)>> {
    let registry = self.segments.read()?;
    Ok(registry.list())
}
/// Returns the registry's snapshot listing, taken under the registry read lock.
// NOTE(review): the triple is presumably (name, id, first page) — confirm
// against `Segments::snapshot_list`.
pub fn snapshot_list(&self) -> PRes<Vec<(String, u32, u64)>> {
    let registry = self.segments.read()?;
    Ok(registry.snapshot_list())
}
}
#[cfg(test)]
mod tests {
use super::Address;
use crate::transaction::{CreateSegment, DeleteRecord, InsertRecord, SegmentOperation, UpdateRecord};
use crate::{allocator::Allocator, config::Config, discref::DiscRef};
use std::sync::Arc;
use tempfile::Builder;
/// Test fixture: builds an `Address` on a fresh temporary file and registers
/// one segment named "def", returning the address together with that
/// segment's id.
fn init_test_address(file_name: &str) -> (Address, u32) {
    let temp = Builder::new().prefix(file_name).suffix(".persy").tempfile().unwrap();
    let config = Arc::new(Config::new());
    let disc = DiscRef::new(temp.reopen().unwrap());
    let allocator_root = Allocator::init(&disc).unwrap();
    let allocator = Arc::new(Allocator::new(disc, &config, allocator_root).unwrap());
    let address_root = Address::init(&allocator).unwrap();
    let addr = Address::new(&allocator, &config, address_root).unwrap();
    // Create the segment as temporary first, then promote it to a regular one.
    let (segment_id, first_page) = addr.create_temp_segment("def").unwrap();
    addr.segments
        .write()
        .unwrap()
        .create_segment(segment_id, first_page)
        .unwrap();
    (addr, segment_id)
}
/// A freshly created segment starts allocating at the expected page and offset.
#[test]
fn test_init_and_new_address() {
    let (add, segment_id) = init_test_address("./addr_test");
    let registry = add.segments.read().unwrap();
    let segment = registry.segment_by_id(segment_id).unwrap();
    assert_eq!(segment.alloc_page, 1088);
    assert_eq!(segment.alloc_pos, 21);
}
/// `apply` must make an insert and an update visible through `read`, and must
/// make a deleted record unreadable.
#[test]
fn test_insert_update_delete_read_apply_pointer() {
    let (add, segment_id) = init_test_address("./addr_insert_update_delete_apply_test.persy");

    // Two records written directly, one more only allocated.
    let (recref, _) = add.allocate(segment_id).unwrap();
    add.insert(segment_id, &recref, 10).unwrap();
    let (recref_1, _) = add.allocate(segment_id).unwrap();
    add.insert(segment_id, &recref_1, 20).unwrap();
    let (recref_2, _) = add.allocate(segment_id).unwrap();

    let inserted = vec![InsertRecord::new(segment_id, &recref_2, 30)];
    let updated = vec![UpdateRecord::new(segment_id, &recref_1, 40, 1)];
    let deleted = vec![DeleteRecord::new(segment_id, &recref, 1)];
    let seg_ops = vec![SegmentOperation::CREATE(CreateSegment::new("def", 20, 20))];
    add.apply(&[], &inserted, &updated, &deleted, &seg_ops, false).unwrap();

    let read = add.read(&recref, segment_id).unwrap();
    let read_1 = add.read(&recref_1, segment_id).unwrap();
    let read_2 = add.read(&recref_2, segment_id).unwrap();

    // Assert on the values themselves so a failure shows what was actually read.
    assert!(read.is_none(), "deleted record is still readable");
    assert_eq!(read_1.map(|entry| entry.0), Some(40));
    assert_eq!(read_2.map(|entry| entry.0), Some(30));
}
/// Scanning a segment yields exactly the inserted records, in insertion order.
#[test]
fn test_insert_scan() {
    let (add, segment_id) = init_test_address("./addr_scan_test.persy");
    let (first, _) = add.allocate(segment_id).unwrap();
    add.insert(segment_id, &first, 10).unwrap();
    let (second, _) = add.allocate(segment_id).unwrap();
    add.insert(segment_id, &second, 20).unwrap();

    // First pass: only count the entries.
    let mut counter = add.scan(segment_id).unwrap();
    let mut seen = 0;
    while counter.next(&add).is_some() {
        seen += 1;
    }
    assert_eq!(seen, 2);

    // Second pass: check identity and order.
    let mut walker = add.scan(segment_id).unwrap();
    let got_first = walker.next(&add).unwrap();
    assert_eq!((got_first.page, got_first.pos), (first.page, first.pos));
    let got_second = walker.next(&add).unwrap();
    assert_eq!((got_second.page, got_second.pos), (second.page, second.pos));
}
/// Inserting 1000 records and scanning them back must find all 1000.
// NOTE(review): the test name suggests 1000 entries exceed one address page — confirm.
#[test]
fn test_insert_over_page() {
    let (add, segment_id) = init_test_address("./addr_insert_over_page.persy");
    for record_page in 0..1000 {
        let (slot, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &slot, record_page).unwrap();
    }

    let mut walker = add.scan(segment_id).unwrap();
    let mut seen = 0;
    while walker.next(&add).is_some() {
        seen += 1;
    }
    assert_eq!(seen, 1000);
}
}