extern crate rand;
use persy::{PRes, RecRef, PersyError};
use allocator::Allocator;
use byteorder::{ReadBytesExt, WriteBytesExt, BigEndian};
use std::sync::Arc;
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hasher, Hash};
use address::{Address, ADDRESS_PAGE_EXP, ADDRESS_ENTRY_SIZE, FLAG_EXISTS, ADDRESS_PAGE_SIZE, SEGMENT_DATA_OFFSET, SEGMENT_HASH_OFFSET};
use std::io::{Write, Read};
use discref::PageSeek;
use std::vec;
use std::str;
// Offset inside a segments root page at which the list of serialized segment
// records starts; `Segments::new` and `Segments::flush_segments` seek here
// before reading or writing records.
const SEGMENT_CONTENT_OFFSET: u32 = 8;
// Offset inside a segments root page of the big-endian u64 pointer to the
// next root page; a value of 0 is treated as "no further page".
const SEGMENT_NEXT_PAGE_OFFSET: u32 = 0;
/// In-memory descriptor of a segment: where its chain of address pages starts
/// and where the next address entry will be allocated.
pub struct Segment {
/// First address page of the segment's page chain.
pub first_page: u64,
/// Page part of the allocation cursor that `Segments::flush_segments`
/// serializes and `Segments::new` loads back.
pub persistent_page: u64,
/// Position part of the serialized allocation cursor.
pub persistent_pos: u32,
/// Page part of the in-memory allocation cursor advanced by
/// `allocate_internal`.
pub alloc_page: u64,
/// Position part of the in-memory allocation cursor.
pub alloc_pos: u32,
/// Numeric identifier of the segment, also written into every address page
/// the segment owns.
pub segment_id: u32,
/// Segment name.
pub name: String,
}
impl Segment {
    /// Builds a segment descriptor from its raw parts.
    ///
    /// `name` is cloned into the new value; all other arguments are stored as
    /// given.
    pub fn new(first_page: u64, persistent_page: u64, persistent_pos: u32, alloc_page: u64, alloc_pos: u32, segment_id: u32, name: &String) -> Segment {
        Segment {
            first_page: first_page,
            persistent_page: persistent_page,
            persistent_pos: persistent_pos,
            alloc_page: alloc_page,
            alloc_pos: alloc_pos,
            segment_id: segment_id,
            name: name.clone(),
        }
    }

    /// Reserves the next address slot of this segment and returns its
    /// location.
    ///
    /// When the current allocation page cannot hold another entry a new
    /// address page is allocated, linked after the current one and stamped
    /// with the segment id; the returned slot is then the first one of the
    /// new page.
    ///
    /// # Errors
    ///
    /// Propagates any allocator or page I/O failure. The in-memory cursor is
    /// only advanced after all page writes succeeded.
    pub fn allocate_internal(&mut self, allocator: &Arc<Allocator>) -> PRes<RecRef> {
        let page = self.alloc_page;
        let pos = self.alloc_pos;
        let new_pos = pos + ADDRESS_ENTRY_SIZE;
        if new_pos > ADDRESS_PAGE_SIZE {
            let new_page = allocator.allocate(ADDRESS_PAGE_EXP)?;
            // Link the full page to its successor (next pointer is the first
            // field of the page).
            let mut pg = allocator.write_page(page)?;
            pg.write_u64::<BigEndian>(new_page)?;
            allocator.flush_page(&mut pg)?;
            // New page header: next = 0, previous = old page, segment id.
            let mut new_pg = allocator.write_page(new_page)?;
            new_pg.write_u64::<BigEndian>(0)?;
            new_pg.write_u64::<BigEndian>(page)?;
            new_pg.write_u32::<BigEndian>(self.segment_id)?;
            allocator.flush_page(&mut new_pg)?;
            self.alloc_page = new_page;
            self.alloc_pos = SEGMENT_DATA_OFFSET + ADDRESS_ENTRY_SIZE;
            return Ok(RecRef {
                page: self.alloc_page,
                pos: SEGMENT_DATA_OFFSET,
            });
        }
        self.alloc_pos = new_pos;
        Ok(RecRef {
            page: page,
            pos: pos,
        })
    }

    /// Frees every address page of the segment from `first_page` up to and
    /// including `persistent_page`, together with the record pages referenced
    /// by entries that are flagged as existing.
    ///
    /// Each address page has its segment id field reset to 0 before it is
    /// released, so stale readers no longer match the segment.
    ///
    /// # Errors
    ///
    /// Propagates any allocator or page I/O failure; pages processed before
    /// the failure stay freed.
    pub fn free_segment_pages(&self, allocator: &Arc<Allocator>) -> PRes<()> {
        let mut page = self.first_page;
        let last = self.persistent_page;
        loop {
            let mut pag = allocator.write_page(page)?;
            let next = pag.read_u64::<BigEndian>()?;
            pag.seek(SEGMENT_HASH_OFFSET)?;
            pag.write_u32::<BigEndian>(0)?;
            let mut pos = SEGMENT_DATA_OFFSET;
            loop {
                pag.seek(pos)?;
                let data_page = pag.read_u64::<BigEndian>()?;
                let flag = pag.read_u8()?;
                if flag & FLAG_EXISTS != 0 {
                    allocator.free(data_page)?;
                }
                pos += ADDRESS_ENTRY_SIZE;
                if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
                    break;
                }
            }
            allocator.flush_page(&mut pag)?;
            allocator.free(page)?;
            // Stop after the last persistent page; a 0 next pointer marks the
            // end of the chain and must never be followed.
            if page == last || next == 0 {
                break;
            }
            page = next;
        }
        Ok(())
    }
}
/// Read-only operations on an address page of a segment.
pub trait SegmentPageRead {
/// Reads the entry stored at `pos`.
///
/// Returns `Ok(None)` when the page does not carry `segment_id` or the entry
/// is not flagged as existing, otherwise the record page and the entry
/// version.
fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>>;
/// Scans the page and returns the next-page pointer together with the
/// positions of all entries flagged as existing.
fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)>;
}
/// Mutating operations on an address page of a segment.
///
/// Every method first checks that the page carries `segment_id` and fails
/// otherwise.
pub trait SegmentPage: SegmentPageRead {
/// Writes a new entry at `pos` pointing to `record_page`, flagged as existing,
/// with version 1.
fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
/// Points the entry at `pos` to `record_page` and bumps its version.
fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
/// Marks the entry at `pos` as deleted by changing its flag byte.
fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<()>;
}
impl<T: ReadBytesExt + PageSeek> SegmentPageRead for T {
    /// Looks up the entry at `pos`, provided the page belongs to `segment_id`.
    ///
    /// An entry is laid out as: u64 record page, u8 flags, u16 version.
    /// Yields `None` for a foreign page or an entry without `FLAG_EXISTS`.
    fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>> {
        self.seek(SEGMENT_HASH_OFFSET)?;
        if self.read_u32::<BigEndian>()? != segment_id {
            return Ok(None);
        }
        self.seek(pos)?;
        let record_page = self.read_u64::<BigEndian>()?;
        let flags = self.read_u8()?;
        let version = self.read_u16::<BigEndian>()?;
        let entry = if (flags & FLAG_EXISTS) == 0 {
            None
        } else {
            Some((record_page, version))
        };
        Ok(entry)
    }

    /// Collects the positions of all existing entries of the page.
    ///
    /// The next-page pointer is read from the current cursor position, so the
    /// caller is expected to hand in a page positioned at its start.
    fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)> {
        let next_page = self.read_u64::<BigEndian>()?;
        // Last position at which a complete entry still fits in the page.
        let last_slot = ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE;
        let mut live = Vec::new();
        let mut slot = SEGMENT_DATA_OFFSET;
        loop {
            // The flag byte follows the 8-byte record page pointer.
            self.seek(slot + 8)?;
            let flags = self.read_u8()?;
            if (flags & FLAG_EXISTS) == 1 {
                live.push(slot);
            }
            slot += ADDRESS_ENTRY_SIZE;
            if slot > last_slot {
                break;
            }
        }
        Ok((next_page, live))
    }
}
/// Returns the version following `version`, wrapping around after `u16::MAX`
/// and skipping 0, so a bumped version is never 0.
///
/// Uses wrapping arithmetic so the roll-over behaves the same with and
/// without overflow checks (a plain `+ 1` panics in debug builds).
fn inc_version(version: u16) -> u16 {
    match version.wrapping_add(1) {
        0 => 1,
        next => next,
    }
}
impl<T: WriteBytesExt + ReadBytesExt + PageSeek> SegmentPage for T {
    /// Writes a fresh entry at `pos`: record page pointer, `FLAG_EXISTS` and
    /// version 1.
    ///
    /// # Errors
    ///
    /// `PersyError::SegmentNotFound` when the page does not carry
    /// `segment_id`; page I/O failures are propagated.
    fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Err(PersyError::SegmentNotFound);
        }
        self.seek(pos)?;
        self.write_u64::<BigEndian>(record_page)?;
        self.write_u8(FLAG_EXISTS)?;
        self.write_u16::<BigEndian>(1)?;
        Ok(())
    }

    /// Replaces the record page pointer of the entry at `pos` and increments
    /// its version; the flag byte is left untouched.
    ///
    /// # Errors
    ///
    /// `PersyError::RecordNotFound` when the page does not carry
    /// `segment_id`; page I/O failures are propagated.
    fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Err(PersyError::RecordNotFound);
        }
        // The version sits after the 8-byte pointer and the 1-byte flag.
        self.seek(pos + 9)?;
        let version = self.read_u16::<BigEndian>()?;
        self.seek(pos)?;
        self.write_u64::<BigEndian>(record_page)?;
        self.seek(pos + 9)?;
        self.write_u16::<BigEndian>(inc_version(version))?;
        Ok(())
    }

    /// Marks the entry at `pos` as deleted by clearing `FLAG_EXISTS`.
    ///
    /// The bit is cleared rather than toggled, so deleting an entry that is
    /// already deleted leaves it deleted instead of bringing it back.
    ///
    /// # Errors
    ///
    /// `PersyError::RecordNotFound` when the page does not carry
    /// `segment_id`; page I/O failures are propagated.
    fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<()> {
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Err(PersyError::RecordNotFound);
        }
        self.seek(pos + 8)?;
        let flag = self.read_u8()?;
        self.seek(pos + 8)?;
        self.write_u8(flag & !FLAG_EXISTS)?;
        Ok(())
    }
}
/// Registry of the known segments, split into segments that are part of the
/// serialized root page list and temporary ones that are not yet promoted.
pub struct Segments {
/// First page of the serialized segment list.
pub root_page: u64,
/// Segments loaded from or written to the root pages, keyed by segment id.
pub segments: HashMap<u32, Segment>,
/// Name to id index for `segments`.
pub segments_id: HashMap<String, u32>,
/// Segments created with `create_temp_segment` and not yet promoted by
/// `create_segment`, keyed by segment id.
pub temp_segments: HashMap<u32, Segment>,
/// Name to id index for `temp_segments`.
pub temp_segments_id: HashMap<String, u32>,
}
/// Generates a segment id for `segment`.
///
/// The upper 16 bits come from the low 16 bits of the name's `DefaultHasher`
/// hash, the lower 16 bits are random, so two calls with the same name
/// normally yield different ids.
pub fn segment_hash(segment: &String) -> u32 {
    let mut hasher = DefaultHasher::new();
    segment.hash(&mut hasher);
    let name_bits = (hasher.finish() as u32) << 16;
    let random_bits = rand::random::<u16>() as u32;
    name_bits | random_bits
}
impl Segments {
    /// Loads the segment list stored in the chain of root pages starting at
    /// `root_page`.
    ///
    /// Each root page holds a next-page pointer at
    /// `SEGMENT_NEXT_PAGE_OFFSET`, followed from `SEGMENT_CONTENT_OFFSET` by
    /// records introduced by a flag byte of 1 and terminated by any other
    /// flag byte. Loaded segments start with their allocation cursor equal to
    /// the persisted one.
    ///
    /// # Errors
    ///
    /// Propagates page I/O failures and invalid UTF-8 in a segment name.
    pub fn new(root_page: u64, allocator: &Arc<Allocator>) -> PRes<Segments> {
        let mut root = allocator.load_page(root_page)?;
        root.seek(SEGMENT_CONTENT_OFFSET)?;
        let mut segments = HashMap::new();
        let mut segments_id = HashMap::new();
        loop {
            let flag = root.read_u8()?;
            if flag == 1 {
                let first_page = root.read_u64::<BigEndian>()?;
                let persistent_page = root.read_u64::<BigEndian>()?;
                let persistent_pos = root.read_u32::<BigEndian>()?;
                let pers_hash = root.read_u32::<BigEndian>()?;
                let name_size = root.read_u16::<BigEndian>()? as usize;
                let mut slice: Vec<u8> = vec![0; name_size];
                root.read_exact(&mut slice)?;
                let name: String = str::from_utf8(&slice[0..name_size])?.into();
                segments.insert(pers_hash,
                                Segment::new(first_page,
                                             persistent_page,
                                             persistent_pos,
                                             persistent_page,
                                             persistent_pos,
                                             pers_hash,
                                             &name));
                segments_id.insert(name, pers_hash);
            } else {
                root.seek(SEGMENT_NEXT_PAGE_OFFSET)?;
                let next_page = root.read_u64::<BigEndian>()?;
                if next_page == 0 {
                    break;
                }
                root = allocator.load_page(next_page)?;
                // Records of a continuation page start after its next-page
                // pointer, exactly where `flush_segments` writes them.
                root.seek(SEGMENT_CONTENT_OFFSET)?;
            }
        }
        Ok(Segments {
            root_page: root_page,
            segments: segments,
            segments_id: segments_id,
            temp_segments: HashMap::new(),
            temp_segments_id: HashMap::new(),
        })
    }

    /// Initializes `root_page` as an empty segment list: no next page and an
    /// end-of-list marker as first record flag.
    ///
    /// # Errors
    ///
    /// Propagates page I/O failures.
    pub fn init(root_page: u64, allocator: &Allocator) -> PRes<()> {
        let mut root = allocator.write_page(root_page)?;
        // 0 means "no further root page"; any other value would be followed
        // by `new` as a page pointer.
        root.write_u64::<BigEndian>(0)?;
        root.write_u8(0)?;
        allocator.flush_page(&mut root)?;
        Ok(())
    }

    /// Returns the id of the non-temporary segment called `segment`, if any.
    pub fn segment_id(&self, segment: &String) -> Option<u32> {
        self.segments_id
            .get(segment)
            .and_then(|id| self.segments.get(id))
            .map(|seg| seg.segment_id)
    }

    /// Returns the non-temporary segment with the given id, if any.
    pub fn segment_by_id<'a>(&'a self, id: u32) -> Option<&'a Segment> {
        self.segments.get(&id)
    }

    /// Returns the temporary segment with the given id, if any.
    pub fn segment_by_id_temp<'a>(&'a self, id: u32) -> Option<&'a Segment> {
        self.temp_segments.get(&id)
    }

    /// Tells whether a non-temporary segment called `segment` exists.
    pub fn has_segment(&self, segment: &String) -> bool {
        self.segments_id.contains_key(segment)
    }

    /// Allocates the first address page of a new segment called `segment` and
    /// registers it as temporary; returns the generated segment id.
    ///
    /// The page header is written before the segment is registered, so a
    /// failed write does not leave a half-created segment in the maps.
    ///
    /// # Errors
    ///
    /// Propagates allocator and page I/O failures.
    pub fn create_temp_segment(&mut self, allocator: &Arc<Allocator>, segment: &String) -> PRes<u32> {
        let allocated = allocator.allocate(ADDRESS_PAGE_EXP)?;
        // NOTE(review): the id is partly random and is not checked against
        // ids already in use; a collision would replace the existing map
        // entry. Confirm whether callers guard against this.
        let segment_id = segment_hash(segment);
        // Page header: next = 0, previous = 0, segment id.
        let mut pag = allocator.write_page(allocated)?;
        pag.write_u64::<BigEndian>(0)?;
        pag.write_u64::<BigEndian>(0)?;
        pag.write_u32::<BigEndian>(segment_id)?;
        allocator.flush_page(&mut pag)?;
        let seg = Segment::new(allocated,
                               allocated,
                               SEGMENT_DATA_OFFSET,
                               allocated,
                               SEGMENT_DATA_OFFSET,
                               segment_id,
                               segment);
        self.temp_segments.insert(segment_id, seg);
        self.temp_segments_id.insert(segment.clone(), segment_id);
        Ok(segment_id)
    }

    /// Gives mutable access to the temporary segment with the given id.
    pub fn get_temp_segment_mut(&mut self, segment: u32) -> Option<&mut Segment> {
        self.temp_segments.get_mut(&segment)
    }

    /// Unregisters the temporary segment with the given id and frees its
    /// pages; unknown ids are ignored.
    ///
    /// # Errors
    ///
    /// Propagates failures of `Segment::free_segment_pages`.
    pub fn drop_temp_segment(&mut self, allocator: &Arc<Allocator>, segment: u32) -> PRes<()> {
        if let Some(removed) = self.temp_segments.remove(&segment) {
            self.temp_segments_id.remove(&removed.name);
            removed.free_segment_pages(allocator)?;
        }
        Ok(())
    }

    /// Tells whether `segment` is the id of a temporary or non-temporary
    /// segment.
    pub fn exists_real_or_temp(&self, segment: &u32) -> bool {
        self.segments.contains_key(segment) || self.temp_segments.contains_key(segment)
    }

    /// Promotes the temporary segment with the given id to a regular one;
    /// unknown ids are ignored.
    pub fn create_segment(&mut self, segment: u32) -> PRes<()> {
        if let Some(s) = self.temp_segments.remove(&segment) {
            self.temp_segments_id.remove(&s.name);
            self.segments_id.insert(s.name.clone(), s.segment_id);
            self.segments.insert(s.segment_id, s);
        }
        Ok(())
    }

    /// Unregisters the regular segment called `segment` and frees its pages;
    /// unknown names are ignored.
    ///
    /// # Errors
    ///
    /// Propagates failures of `Segment::free_segment_pages`.
    pub fn drop_segment(&mut self, allocator: &Arc<Allocator>, segment: &String) -> PRes<()> {
        if let Some(id) = self.segments_id.remove(segment) {
            if let Some(removed) = self.segments.remove(&id) {
                removed.free_segment_pages(allocator)?;
            }
        }
        Ok(())
    }

    /// Serializes all regular segments into the chain of root pages starting
    /// at `root_page`, in the format read back by `new`.
    ///
    /// When a record does not fit in the current page the page is closed with
    /// an end marker, linked to a newly allocated page, and the record is
    /// written there.
    ///
    /// # Errors
    ///
    /// Propagates allocator and page I/O failures.
    pub fn flush_segments(&self, allocator: &Arc<Allocator>) -> PRes<()> {
        let mut root = allocator.write_page(self.root_page)?;
        root.seek(SEGMENT_CONTENT_OFFSET)?;
        // Write position inside the current page.
        let mut cursor = SEGMENT_CONTENT_OFFSET;
        for segment in self.segments.values() {
            let mut buffer = Vec::<u8>::new();
            buffer.write_u64::<BigEndian>(segment.first_page)?;
            buffer.write_u64::<BigEndian>(segment.persistent_page)?;
            buffer.write_u32::<BigEndian>(segment.persistent_pos)?;
            buffer.write_u32::<BigEndian>(segment.segment_id)?;
            // NOTE(review): names longer than u16::MAX bytes get a truncated
            // length prefix here; confirm names are bounded by the callers.
            buffer.write_u16::<BigEndian>(segment.name.len() as u16)?;
            buffer.write_all(segment.name.as_bytes())?;
            // Flag byte plus payload.
            let entry_len = 1 + buffer.len() as u32;
            // Keep one byte free for the end marker of the page. Assumes
            // ADDRESS_PAGE_SIZE is the number of usable bytes of a page, as
            // the address-slot bounds checks in this file do.
            // A page that is still empty is never rolled over, so an
            // oversized record fails in the write below instead of chaining
            // empty pages.
            if cursor > SEGMENT_CONTENT_OFFSET && cursor + entry_len + 1 > ADDRESS_PAGE_SIZE {
                // NOTE(review): a fresh page is allocated on every roll-over
                // even if an earlier flush already chained one; consider
                // reusing the existing chain.
                let next = allocator.allocate(ADDRESS_PAGE_EXP)?;
                root.write_u8(0)?;
                root.seek(SEGMENT_NEXT_PAGE_OFFSET)?;
                root.write_u64::<BigEndian>(next)?;
                allocator.flush_page(&mut root)?;
                root = allocator.write_page(next)?;
                root.seek(SEGMENT_CONTENT_OFFSET)?;
                cursor = SEGMENT_CONTENT_OFFSET;
            }
            root.write_u8(1)?;
            root.write_all(&buffer)?;
            cursor += entry_len;
        }
        // Close the last page: end marker and no next page.
        root.write_u8(0)?;
        root.seek(SEGMENT_NEXT_PAGE_OFFSET)?;
        root.write_u64::<BigEndian>(0)?;
        allocator.flush_page(&mut root)?;
        Ok(())
    }
}
/// Entry point for iterating over the records of a segment; turned into a
/// `SegmentIterator` through `IntoIterator`.
pub struct SegmentScanner<'a> {
// Address component used to scan the pages.
address: &'a Address,
// Page the iteration starts from.
page: u64,
}
impl<'a> SegmentScanner<'a> {
    /// Creates a scanner that will start iterating at `page`, scanning pages
    /// through `address`.
    pub fn new<'b>(address: &'b Address, page: u64) -> SegmentScanner<'b> {
        SegmentScanner {
            address: address,
            page: page,
        }
    }
}
/// Iterator over the record references of a segment, walking its pages one at
/// a time.
pub struct SegmentIterator<'a> {
// Address component used to scan the pages.
address: &'a Address,
// Page the positions in `per_page_iterator` belong to.
cur_page: u64,
// Page to scan once the current positions are exhausted; 0 ends the
// iteration.
next_page: u64,
// Entry positions of `cur_page` that have not been yielded yet.
per_page_iterator: vec::IntoIter<u32>,
}
impl<'a> IntoIterator for SegmentScanner<'a> {
    type Item = RecRef;
    type IntoIter = SegmentIterator<'a>;

    /// Builds the iterator with an empty position list, so the first call to
    /// `next` scans the starting page.
    fn into_iter(self) -> Self::IntoIter {
        let start_page = self.page;
        SegmentIterator {
            address: self.address,
            cur_page: start_page,
            next_page: start_page,
            per_page_iterator: Vec::new().into_iter(),
        }
    }
}
impl<'a> Iterator for SegmentIterator<'a> {
    type Item = RecRef;

    /// Yields the next record reference, moving on to the following page
    /// whenever the positions of the current one are exhausted.
    ///
    /// Iteration ends when the next-page pointer is 0; a failed page scan
    /// also ends the current call with `None`, the error itself is dropped.
    fn next(&mut self) -> Option<RecRef> {
        loop {
            if let Some(pos) = self.per_page_iterator.next() {
                return Some(RecRef::new(self.cur_page, pos));
            }
            if self.next_page == 0 {
                return None;
            }
            self.cur_page = self.next_page;
            match self.address.scan_page(self.cur_page) {
                Ok(scanned) => {
                    self.next_page = scanned.0;
                    self.per_page_iterator = scanned.1.into_iter();
                }
                Err(_) => return None,
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use std::fs;
    use std::fs::OpenOptions;
    use std::sync::Arc;
    use std::io;
    use discref::{Page, DiscRef};
    use allocator::Allocator;
    use config::Config;
    use address::ADDRESS_PAGE_EXP;
    use super::{Segments, SegmentPage, SegmentPageRead, segment_hash};

    /// Opens (creating it if needed) `file_name` and builds an allocator on
    /// top of it with one page already allocated.
    fn create_allocator(file_name: &str) -> Allocator {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(file_name)
            .unwrap();
        let config = Arc::new(Config::new());
        let disc = DiscRef::new(file);
        let pa = Allocator::init(&disc).unwrap();
        let allocator = Allocator::new(disc, &config, pa).unwrap();
        allocator.allocate(5).unwrap();
        allocator
    }

    /// Builds a shared allocator over `file_name` plus an initialized
    /// segments root page.
    fn setup_root(file_name: &str) -> (Arc<Allocator>, u64) {
        let all = Arc::new(create_allocator(file_name));
        let root = all.allocate(ADDRESS_PAGE_EXP).unwrap();
        Segments::init(root, &all).unwrap();
        (all, root)
    }

    #[test]
    fn test_create_drop_segment() {
        let (all, root) = setup_root("./raw_segment_create_delete.persy");
        let name = String::from("some");
        let mut segments = Segments::new(root, &all).unwrap();

        let id = segments.create_temp_segment(&all, &name).unwrap();
        segments.create_segment(id).unwrap();
        segments.flush_segments(&all).unwrap();
        assert!(segments.segments_id.contains_key("some"));
        assert!(segments.segments.contains_key(&id));

        segments.drop_segment(&all, &name).unwrap();
        segments.flush_segments(&all).unwrap();
        assert!(!segments.segments_id.contains_key("some"));
        assert!(!segments.segments.contains_key(&id));

        fs::remove_file("./raw_segment_create_delete.persy").unwrap();
    }

    #[test]
    fn test_create_drop_temp_segment() {
        let (all, root) = setup_root("./segment_create_delete.persy");
        let name = String::from("some");
        let mut segments = Segments::new(root, &all).unwrap();

        let id = segments.create_temp_segment(&all, &name).unwrap();
        assert!(segments.temp_segments.contains_key(&id));
        assert!(segments.temp_segments_id.contains_key("some"));

        segments.drop_temp_segment(&all, id).unwrap();
        assert!(!segments.temp_segments.contains_key(&id));
        assert!(!segments.temp_segments_id.contains_key("some"));

        fs::remove_file("./segment_create_delete.persy").unwrap();
    }

    #[test]
    fn test_create_close_drop_close_segment() {
        let (all, root) = setup_root("./segment_pers_create_delete.persy");
        let name = String::from("some");

        // First session: create the segment and persist the list.
        let id = {
            let mut segments = Segments::new(root, &all).unwrap();
            let created = segments.create_temp_segment(&all, &name).unwrap();
            segments.create_segment(created).unwrap();
            segments.flush_segments(&all).unwrap();
            created
        };

        // Second session: the segment is loaded back, then dropped.
        {
            let mut segments = Segments::new(root, &all).unwrap();
            assert_eq!(segments.segments.len(), 1);
            assert!(segments.segments_id.contains_key("some"));
            assert!(segments.segments.contains_key(&id));
            segments.drop_segment(&all, &name).unwrap();
            segments.flush_segments(&all).unwrap();
        }

        // Third session: the drop was persisted.
        {
            let segments = Segments::new(root, &all).unwrap();
            assert!(!segments.segments_id.contains_key("some"));
            assert!(!segments.segments.contains_key(&id));
        }

        fs::remove_file("./segment_pers_create_delete.persy").unwrap();
    }

    #[test]
    fn test_seg_insert_read_pointer() {
        let mut page = Page::new(io::Cursor::new(vec![0; 1024]), 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        let read = page.segment_read_entry(0, 30).unwrap();
        assert_eq!(read.map(|val| val.0), Some(10));
    }

    #[test]
    fn test_seg_insert_update_read_pointer() {
        let mut page = Page::new(io::Cursor::new(vec![0; 1024]), 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        page.segment_update_entry(0, 30, 15).unwrap();
        let read = page.segment_read_entry(0, 30).unwrap();
        assert_eq!(read.map(|val| val.0), Some(15));
    }

    #[test]
    fn test_seg_insert_delete_read_pointer() {
        let mut page = Page::new(io::Cursor::new(vec![0; 1024]), 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        page.segment_delete_entry(0, 30).unwrap();
        let read = page.segment_read_entry(0, 30).unwrap();
        assert!(read.is_none());
    }

    #[test]
    fn test_hash_id_generator() {
        let name = String::from("some");
        assert!(segment_hash(&name) != 0);
    }
}