use persy::{PRes, RecRef, PersyError};
use allocator::Allocator;
use std::sync::{Condvar, Mutex, RwLock, Arc};
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::hash_map::Entry;
use transaction::{InsertRecord, UpdateRecord, DeleteRecord, SegmentOperation};
use config::Config;
use segment::{Segments, SegmentScanner, SegmentPage, SegmentPageRead};
pub const ADDRESS_PAGE_EXP: u8 = 10; pub const ADDRESS_PAGE_SIZE: u32 = (1 << ADDRESS_PAGE_EXP) - 2; pub const FLAG_EXISTS: u8 = 1;
pub const SEGMENT_HASH_OFFSET: u32 = 16;
pub const SEGMENT_DATA_OFFSET: u32 = 20;
pub const ADDRESS_ENTRY_SIZE: u32 = 8 + 1 + 2;
pub struct Address {
config: Arc<Config>,
allocator: Arc<Allocator>,
record_locks: Mutex<HashMap<RecRef, Arc<Condvar>>>,
segment_locks: Mutex<HashMap<u32, SegmentLock>>,
segments: RwLock<Segments>,
}
struct SegmentLock {
write: bool,
read_count: u32,
cond: Arc<Condvar>,
}
impl SegmentLock {
fn new_write() -> SegmentLock {
SegmentLock {
write: true,
read_count: 0,
cond: Arc::new(Condvar::new()),
}
}
fn new_read() -> SegmentLock {
SegmentLock {
write: false,
read_count: 1,
cond: Arc::new(Condvar::new()),
}
}
fn inc_read(&mut self) {
self.read_count += 1;
}
fn dec_read(&mut self) -> bool {
self.read_count -= 1;
self.read_count == 0
}
}
impl Address {
pub fn new(all: &Arc<Allocator>, config: &Arc<Config>, page: u64) -> PRes<Address> {
let segments = Segments::new(page, all)?;
Ok(Address {
config: config.clone(),
allocator: all.clone(),
record_locks: Mutex::new(HashMap::new()),
segment_locks: Mutex::new(HashMap::new()),
segments: RwLock::new(segments),
})
}
pub fn init(all: &Allocator) -> PRes<u64> {
let page = all.allocate(ADDRESS_PAGE_EXP)?;
Segments::init(page, all)?;
Ok(page)
}
pub fn scan<'a>(&'a self, segment: u32) -> PRes<SegmentScanner<'a>> {
let segments = self.segments.read()?;
if let Some(segment) = segments.segment_by_id(segment) {
Ok(SegmentScanner::<'a>::new(self, segment.first_page))
} else {
if let Some(temp_segment) = segments.segment_by_id_temp(segment) {
Ok(SegmentScanner::<'a>::new(self, temp_segment.first_page))
} else {
Err(PersyError::SegmentNotFound)
}
}
}
pub fn scan_page(&self, cur_page: u64) -> PRes<(u64, Vec<u32>)> {
let _ = self.segments.read()?;
let mut page = self.allocator.load_page(cur_page)?;
page.segment_scan_entries()
}
pub fn allocate_temp(&self, segment: u32) -> PRes<RecRef> {
let mut segments = self.segments.write()?;
let found = segments.get_temp_segment_mut(segment);
if found.is_none() {
return Err(PersyError::SegmentNotFound);
}
found.unwrap().allocate_internal(&self.allocator)
}
pub fn create_temp_segment(&self, segment: &String) -> PRes<u32> {
self.segments.write()?.create_temp_segment(&self.allocator, segment)
}
pub fn drop_temp_segment(&self, segment: u32) -> PRes<()> {
self.segments.write()?.drop_temp_segment(&self.allocator, segment)
}
pub fn allocate(&self, segment: u32) -> PRes<RecRef> {
let mut segments = self.segments.write()?;
let found = segments.segments.get_mut(&segment);
if found.is_none() {
return Err(PersyError::SegmentNotFound);
}
found.unwrap().allocate_internal(&self.allocator)
}
pub fn acquire_locks(&self, records: &Vec<(u32, RecRef, u16)>, created_updated: &Vec<u32>, deleted: &Vec<u32>, check_version: bool) -> PRes<()> {
for segment in deleted {
let seg_lock = SegmentLock::new_write();
loop {
let mut lock_manager = self.segment_locks.lock()?;
let cond = match lock_manager.entry(segment.clone()) {
Entry::Occupied(o) => o.get().cond.clone(),
Entry::Vacant(v) => {
v.insert(seg_lock);
break;
}
};
cond.wait_timeout(lock_manager, self.config.transaction_lock_timeout().clone())?;
}
}
for segment in created_updated {
loop {
let mut lock_manager = self.segment_locks.lock()?;
let cond;
match lock_manager.entry(segment.clone()) {
Entry::Occupied(mut o) => {
if o.get().write {
cond = o.get().cond.clone();
} else {
o.get_mut().inc_read();
break;
}
}
Entry::Vacant(v) => {
v.insert(SegmentLock::new_read());
break;
}
};
cond.wait_timeout(lock_manager, self.config.transaction_lock_timeout().clone())?;
}
}
for rec in records {
let cond = Arc::new(Condvar::new());
loop {
let mut lock_manager = self.record_locks.lock()?;
let cond = match lock_manager.entry(rec.1.clone()) {
Entry::Occupied(o) => o.get().clone(),
Entry::Vacant(v) => {
v.insert(cond);
break;
}
};
cond.wait_timeout(lock_manager, self.config.transaction_lock_timeout().clone())?;
}
}
{
let segs = self.segments.read()?;
for segment in created_updated {
if !segs.exists_real_or_temp(segment) {
return Err(PersyError::SegmentNotFound);
}
}
}
for &(segment, ref recref, version) in records {
let val = self.read(recref, segment)?;
if let Some((_, pers_version)) = val {
if check_version && pers_version != version {
return Err(PersyError::VersionNotLastest);
}
} else {
return Err(PersyError::RecordNotFound);
}
}
Ok(())
}
pub fn release_locks(&self, records: Vec<(u32, RecRef, u16)>, created_updated: &Vec<u32>, deleted: &Vec<u32>) -> PRes<()> {
for rec in records {
let mut lock_manager = self.record_locks.lock()?;
if let Some(cond) = lock_manager.remove(&rec.1) {
cond.notify_one();
}
}
for segment in created_updated {
let mut lock_manager = self.segment_locks.lock()?;
if let Entry::Occupied(mut lock) = lock_manager.entry(segment.clone()) {
if lock.get_mut().dec_read() {
let cond = lock.get().cond.clone();
lock.remove();
cond.notify_one();
}
}
}
for segment in deleted {
let mut lock_manager = self.segment_locks.lock()?;
if let Some(lock) = lock_manager.remove(segment) {
lock.cond.notify_one();
}
}
Ok(())
}
pub fn rollback(&self, _: &RecRef) -> PRes<()> {
Ok(())
}
pub fn apply(&self, inserts: &Vec<InsertRecord>, updates: &Vec<UpdateRecord>, deletes: &Vec<DeleteRecord>, seg_ops: &Vec<SegmentOperation>) -> PRes<()> {
let mut segments = self.segments.write()?;
let mut dropped = HashSet::new();
for seg_op in seg_ops {
match *seg_op {
SegmentOperation::CREATE(_) => {}
SegmentOperation::DROP(ref op) => {
dropped.insert(op.segment_id);
}
}
}
let mut segs = Vec::new();
let mut pages = HashMap::new();
for insert in inserts {
if !dropped.contains(&insert.segment) {
segs.push(&insert.segment);
match pages.entry(&insert.recref.page) {
Entry::Vacant(o) => {
let mut page = self.allocator.write_page(insert.recref.page)?;
page.segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page)?;
o.insert(page);
}
Entry::Occupied(mut o) => {
o.get_mut().segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page)?;
}
}
}
}
for update in updates {
if !dropped.contains(&update.segment) {
segs.push(&update.segment);
match pages.entry(&update.recref.page) {
Entry::Vacant(o) => {
let mut page = self.allocator.write_page(update.recref.page)?;
page.segment_update_entry(update.segment, update.recref.pos, update.record_page)?;
o.insert(page);
}
Entry::Occupied(mut o) => {
o.get_mut().segment_update_entry(update.segment, update.recref.pos, update.record_page)?;
}
}
}
}
for delete in deletes {
if !dropped.contains(&delete.segment) {
segs.push(&delete.segment);
match pages.entry(&delete.recref.page) {
Entry::Vacant(o) => {
let mut page = self.allocator.write_page(delete.recref.page)?;
page.segment_delete_entry(delete.segment, delete.recref.pos)?;
o.insert(page);
}
Entry::Occupied(mut o) => {
o.get_mut().segment_delete_entry(delete.segment, delete.recref.pos)?;
}
}
}
}
for (_, v) in &mut pages {
self.allocator.flush_page(v)?;
}
for seg_op in seg_ops {
match *seg_op {
SegmentOperation::CREATE(_) => {}
SegmentOperation::DROP(ref op) => {
segments.drop_segment(&self.allocator, &op.name)?;
}
}
}
for seg_op in seg_ops {
match *seg_op {
SegmentOperation::CREATE(ref op) => {
segments.create_segment(op.segment_id)?;
}
SegmentOperation::DROP(_) => {}
}
}
for seg in segs {
let mut segment = segments.segments.get_mut(seg).unwrap();
segment.persistent_page = segment.alloc_page;
segment.persistent_pos = segment.alloc_pos;
}
segments.flush_segments(&self.allocator)?;
Ok(())
}
pub fn exists_segment(&self, segment: &String) -> PRes<bool> {
Ok(self.segments.read()?.has_segment(segment))
}
pub fn segment_id(&self, segment: &String) -> PRes<Option<u32>> {
Ok(self.segments.read()?.segment_id(segment))
}
pub fn insert(&self, segment_id: u32, recref: &RecRef, record_page: u64) -> PRes<()> {
let mut page = self.allocator.write_page(recref.page)?;
page.segment_insert_entry(segment_id, recref.pos, record_page)?;
self.allocator.flush_page(&mut page)?;
Ok(())
}
pub fn read(&self, recref: &RecRef, segment: u32) -> PRes<Option<(u64, u16)>> {
let mut page = self.allocator.load_page(recref.page)?;
page.segment_read_entry(segment, recref.pos)
}
}
#[cfg(test)]
mod tests {
use std::fs::OpenOptions;
use std::fs;
use discref::DiscRef;
use allocator::Allocator;
use address::Address;
use std::sync::Arc;
use config::Config;
use transaction::{InsertRecord, UpdateRecord, DeleteRecord, CreateSegment, SegmentOperation};
fn init_test_address(file_name: &str) -> (Address, u32) {
let file = OpenOptions::new().read(true).write(true).create(true).open(file_name).unwrap();
let config = Arc::new(Config::new());
let disc = DiscRef::new(file);
let pa = Allocator::init(&disc).unwrap();
let allocator = Allocator::new(disc, &config, pa).unwrap();
let page = Address::init(&allocator).unwrap();
let addr = Address::new(&Arc::new(allocator), &config, page).unwrap();
let id = addr.create_temp_segment(&"def".into()).unwrap();
addr.segments.write().unwrap().create_segment(id).unwrap();
(addr, id)
}
#[test]
fn test_init_and_new_address() {
let (add, segment_id) = init_test_address("./addr_test");
fs::remove_file("./addr_test").unwrap();
assert_eq!(add.segments.read().unwrap().segment_by_id(segment_id).unwrap().alloc_page,
1536);
assert_eq!(add.segments.read().unwrap().segment_by_id(segment_id).unwrap().alloc_pos,
20);
}
#[test]
fn test_insert_update_delete_read_apply_pointer() {
let (add, segment_id) = init_test_address("./addr_insert_update_delete_apply_test.persy");
let recref = add.allocate(segment_id).unwrap();
add.insert(segment_id, &recref, 10).unwrap();
let recref_1 = add.allocate(segment_id).unwrap();
add.insert(segment_id, &recref_1, 20).unwrap();
let mut inserted = Vec::new();
let recref_2 = add.allocate(segment_id).unwrap();
inserted.push(InsertRecord::new(segment_id, &recref_2, 30));
let mut updated = Vec::new();
updated.push(UpdateRecord::new(segment_id, &recref_1, 40, 20, 1));
let mut deleted = Vec::new();
deleted.push(DeleteRecord::new(segment_id, &recref, 10, 1));
let mut seg_ops = Vec::new();
seg_ops.push(SegmentOperation::CREATE(CreateSegment::new("def", 20)));
add.apply(&inserted, &updated, &deleted, &seg_ops).unwrap();
let read = add.read(&recref, segment_id).unwrap();
let read_1 = add.read(&recref_1, segment_id).unwrap();
let read_2 = add.read(&recref_2, segment_id).unwrap();
fs::remove_file("./addr_insert_update_delete_apply_test.persy").unwrap();
match read {
Some(_) => assert!(false),
None => assert!(true),
}
match read_1 {
Some(val) => assert_eq!(val.0, 40),
None => assert!(false),
}
match read_2 {
Some(val) => assert_eq!(val.0, 30),
None => assert!(false),
}
}
#[test]
fn test_insert_scan() {
let (add, segment_id) = init_test_address("./addr_scan_test.persy");
let recref = add.allocate(segment_id).unwrap();
add.insert(segment_id, &recref, 10).unwrap();
let recref_1 = add.allocate(segment_id).unwrap();
add.insert(segment_id, &recref_1, 20).unwrap();
let to_iter = add.scan(segment_id).unwrap();
assert_eq!(to_iter.into_iter().count(), 2);
let mut iter = add.scan(segment_id).unwrap().into_iter();
let re = iter.next().unwrap();
assert_eq!(re.page, recref.page);
assert_eq!(re.pos, recref.pos);
let re_1 = iter.next().unwrap();
assert_eq!(re_1.page, recref_1.page);
assert_eq!(re_1.pos, recref_1.pos);
fs::remove_file("./addr_scan_test.persy").unwrap();
}
#[test]
fn test_insert_over_page() {
let (add, segment_id) = init_test_address("./addr_insert_over_page.persy");
for z in 0..1000 {
let recref = add.allocate(segment_id).unwrap();
add.insert(segment_id, &recref, z).unwrap();
}
let to_iter = add.scan(segment_id).unwrap();
assert_eq!(to_iter.into_iter().count(), 1000);
fs::remove_file("./addr_insert_over_page.persy").unwrap();
}
}