use crate::{
address::{
Address, ADDRESS_ENTRY_SIZE, ADDRESS_PAGE_EXP, ADDRESS_PAGE_SIZE, FLAG_EXISTS, SEGMENT_DATA_OFFSET,
SEGMENT_HASH_OFFSET, SEGMENT_PAGE_ENTRY_COUNT_OFFSET,
},
allocator::Allocator,
discref::{PageIndex, PageSeek},
error::{PRes, PersyError},
flush_checksum::{double_buffer_check, prepare_buffer_flush},
id::{PersyId, RecRef},
persy::exp_from_content_size,
};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt};
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
io::{Read, Write},
str,
sync::Arc,
vec,
};
/// Result of an allocation that had to extend a segment with a new address page.
///
/// Returned by `Segment::allocate_internal` when the current address page was full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AllocatedSegmentPage {
    /// Id of the freshly allocated address page.
    pub new_page: u64,
    /// Id of the page the new one was linked after (field name kept as-is for API compatibility).
    pub previus_page: u64,
}
/// In-memory descriptor of one segment's chain of address pages.
#[derive(Debug)]
pub struct Segment {
    /// Head of the segment's address-page chain.
    pub first_page: u64,
    /// Page of the last confirmed allocation point (updated by `Segments::confirm_allocations`).
    persistent_page: u64,
    /// Position within `persistent_page` of the last confirmed allocation point.
    persistent_pos: u32,
    /// Address page that the next slot will be taken from.
    pub alloc_page: u64,
    /// Position within `alloc_page` of the next free slot.
    pub alloc_pos: u32,
    /// Identifier written into every address page of this segment.
    segment_id: u32,
    /// User-visible segment name.
    name: String,
}
impl Segment {
    /// Builds an in-memory segment descriptor from its individual fields.
    pub fn new(
        first_page: u64,
        persistent_page: u64,
        persistent_pos: u32,
        alloc_page: u64,
        alloc_pos: u32,
        segment_id: u32,
        name: &str,
    ) -> Segment {
        Segment {
            first_page,
            persistent_page,
            persistent_pos,
            alloc_page,
            alloc_pos,
            segment_id,
            name: name.to_string(),
        }
    }

    /// Reserves the next address-entry slot of this segment.
    ///
    /// If the slot does not fit in the current address page, a new page is allocated
    /// (`ADDRESS_PAGE_EXP`). It is linked after the current one (next/prev links and
    /// segment id are written), and the slot is taken from the new page's first entry.
    ///
    /// Returns the reserved slot plus, when a page was added, the new and previous page ids.
    pub fn allocate_internal(&mut self, allocator: &Arc<Allocator>) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
        let page = self.alloc_page;
        let pos = self.alloc_pos;
        let new_pos = pos + ADDRESS_ENTRY_SIZE;
        Ok(if new_pos > ADDRESS_PAGE_SIZE {
            let new_page = allocator.allocate(ADDRESS_PAGE_EXP)?;
            // Link the full page forward to the new one...
            let mut pg = allocator.write_page(page)?;
            pg.set_next(new_page)?;
            allocator.flush_page(&mut pg)?;
            // ...and initialise the new page as the tail of this segment's chain.
            let mut new_pg = allocator.write_page(new_page)?;
            new_pg.set_next(0)?;
            new_pg.set_prev(page)?;
            new_pg.set_segment_id(self.segment_id)?;
            allocator.flush_page(&mut new_pg)?;
            self.alloc_page = new_page;
            self.alloc_pos = SEGMENT_DATA_OFFSET + ADDRESS_ENTRY_SIZE;
            (
                RecRef {
                    page: self.alloc_page,
                    pos: SEGMENT_DATA_OFFSET,
                },
                Some(AllocatedSegmentPage {
                    new_page,
                    previus_page: page,
                }),
            )
        } else {
            self.alloc_pos = new_pos;
            (RecRef { page, pos }, None)
        })
    }

    /// Lists every page owned by this segment, walking the chain from `first_page` to
    /// `persistent_page`.
    ///
    /// For each address page, the result contains the record pages referenced by its live
    /// entries, followed by the address page itself.
    ///
    /// The walk also stops if the chain ends (next link 0) before `persistent_page` is
    /// reached. Following a 0 link would load page 0 as if it were an address page, and
    /// callers such as `Segments::drop_temp_segment` would then free the bogus results.
    pub fn collect_segment_pages(&self, allocator: &Allocator) -> PRes<Vec<u64>> {
        let mut pages = Vec::new();
        let mut page = self.first_page;
        let last = self.persistent_page;
        loop {
            let mut pag = allocator.load_page(page)?;
            // The first u64 of an address page is the link to the next page in the chain.
            let next = pag.read_u64::<BigEndian>()?;
            let mut pos = SEGMENT_DATA_OFFSET;
            loop {
                pag.seek(pos)?;
                let data_page = pag.read_u64::<BigEndian>()?;
                let flag = pag.read_u8()?;
                if flag & FLAG_EXISTS != 0 {
                    pages.push(data_page);
                }
                pos += ADDRESS_ENTRY_SIZE;
                if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
                    break;
                }
            }
            pages.push(page);
            if page == last || next == 0 {
                break;
            }
            page = next;
        }
        Ok(pages)
    }
}
/// Read-only view over the address entries stored in a segment address page.
pub trait SegmentPageRead: PageIndex {
    /// Scans all entry slots, returning the page's next-page link and the positions of live entries.
    fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)>;

    /// Reads the entry at `pos` as `(record_page, version)`.
    ///
    /// Yields `None` when the page is tagged with another segment id or the entry is not live.
    fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>>;

    /// Returns the slot position following the last live entry; used to resume allocation
    /// when recovering (see `Segments::confirm_allocations`).
    fn segment_first_available_pos(&mut self) -> PRes<u32>;
}
/// Mutable access to a segment address page: entry edits plus page-header maintenance.
pub trait SegmentPage: SegmentPageRead {
    /// Tags the page as belonging to segment `id`.
    fn set_segment_id(&mut self, id: u32) -> PRes<()>;

    /// Reads the link to the next address page in the chain.
    fn get_next(&mut self) -> PRes<u64>;
    /// Writes the link to the next address page in the chain.
    fn set_next(&mut self, next: u64) -> PRes<()>;
    /// Reads the link to the previous address page in the chain.
    fn get_prev(&mut self) -> PRes<u64>;
    /// Writes the link to the previous address page in the chain.
    fn set_prev(&mut self, prev: u64) -> PRes<()>;

    /// Writes a live entry pointing at `record_page` into slot `pos`.
    fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
    /// Repoints the entry at `pos` to `record_page` and bumps its version.
    fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
    /// Marks the entry at `pos` as not live; returns `true` when the page's entry count is zero afterwards.
    fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<bool>;

    /// Recomputes the stored entry count from the live flags of all slots.
    fn recalc_count(&mut self) -> PRes<()>;
}
/// Entry layout used below: record page `u64` at `pos`, flag `u8` at `pos + 8`,
/// version `u16` at `pos + 9`. The page's next link is the `u64` at offset 0.
impl<T: ReadBytesExt + PageSeek + PageIndex> SegmentPageRead for T {
    fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>> {
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Ok(None);
        }
        self.seek(pos)?;
        let record = self.read_u64::<BigEndian>()?;
        let flag = self.read_u8()?;
        let version = self.read_u16::<BigEndian>()?;
        Ok(if flag & FLAG_EXISTS == 0 {
            None
        } else {
            Some((record, version))
        })
    }

    fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)> {
        // Seek explicitly so the next link is read from offset 0 regardless of where earlier
        // reads/writes left the cursor (e.g. when called from `recalc_count`).
        self.seek(0)?;
        let next_page = self.read_u64::<BigEndian>()?;
        let mut pos = SEGMENT_DATA_OFFSET;
        let mut recs = Vec::new();
        loop {
            self.seek(pos + 8)?;
            let flag = self.read_u8()?;
            if flag & FLAG_EXISTS != 0 {
                recs.push(pos);
            }
            pos += ADDRESS_ENTRY_SIZE;
            if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
                break;
            }
        }
        Ok((next_page, recs))
    }

    fn segment_first_available_pos(&mut self) -> PRes<u32> {
        let mut pos = SEGMENT_DATA_OFFSET;
        // A page with no live entry must resume at its first slot, not at offset 0:
        // offset 0 holds the next-page link, which an allocation there would overwrite.
        let mut available = SEGMENT_DATA_OFFSET;
        loop {
            self.seek(pos + 8)?;
            let flag = self.read_u8()?;
            pos += ADDRESS_ENTRY_SIZE;
            if flag & FLAG_EXISTS != 0 {
                available = pos;
            }
            if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
                break;
            }
        }
        Ok(available)
    }
}
/// Returns the version that follows `version`, skipping 0.
///
/// The counter wraps from `u16::MAX` back to 1. `wrapping_add` is used because a plain `+`
/// panics on overflow in debug builds, which defeated the intended wrap-around.
fn inc_version(version: u16) -> u16 {
    match version.wrapping_add(1) {
        0 => 1,
        next => next,
    }
}
/// Entry layout used below: record page `u64` at `pos`, flag `u8` at `pos + 8`,
/// version `u16` at `pos + 9`. Page header: next link `u64` at 0, prev link `u64` at 8,
/// segment id `u32` at `SEGMENT_HASH_OFFSET`, entry count `u8` at
/// `SEGMENT_PAGE_ENTRY_COUNT_OFFSET`.
impl<T: WriteBytesExt + ReadBytesExt + PageSeek + PageIndex> SegmentPage for T {
    /// Writes a live entry pointing at `record_page` with version 1 and increments the count.
    ///
    /// # Errors
    /// `PersyError::SegmentNotFound` when the page is tagged with another segment id.
    fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Err(PersyError::SegmentNotFound);
        }
        self.seek(SEGMENT_PAGE_ENTRY_COUNT_OFFSET)?;
        let count = self.read_u8()?;
        self.seek(SEGMENT_PAGE_ENTRY_COUNT_OFFSET)?;
        self.write_u8(count + 1)?;
        self.seek(pos)?;
        self.write_u64::<BigEndian>(record_page)?;
        self.write_u8(FLAG_EXISTS)?;
        self.write_u16::<BigEndian>(1)?;
        Ok(())
    }

    /// Repoints the entry at `pos` to `record_page` and bumps its version via `inc_version`.
    ///
    /// # Errors
    /// `PersyError::RecordNotFound` when the page is tagged with another segment id.
    fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
        }
        self.seek(pos + 9)?;
        let version = self.read_u16::<BigEndian>()?;
        self.seek(pos)?;
        self.write_u64::<BigEndian>(record_page)?;
        self.seek(pos + 9)?;
        self.write_u16::<BigEndian>(inc_version(version))?;
        Ok(())
    }

    /// Clears the live flag of the entry at `pos`; returns `true` when the page's entry
    /// count is zero afterwards.
    ///
    /// Deleting an entry that is already not live changes nothing. The flag is cleared,
    /// not XOR-toggled, so a repeated delete cannot resurrect the entry. The count is only
    /// decremented on a real live -> deleted transition, so it cannot underflow.
    ///
    /// # Errors
    /// `PersyError::RecordNotFound` when the page is tagged with another segment id.
    fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<bool> {
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
        }
        self.seek(SEGMENT_PAGE_ENTRY_COUNT_OFFSET)?;
        let mut count = self.read_u8()?;
        self.seek(pos + 8)?;
        let flag = self.read_u8()?;
        if flag & FLAG_EXISTS != 0 {
            count = count.saturating_sub(1);
            self.seek(SEGMENT_PAGE_ENTRY_COUNT_OFFSET)?;
            self.write_u8(count)?;
            self.seek(pos + 8)?;
            self.write_u8(flag & !FLAG_EXISTS)?;
        }
        Ok(count == 0)
    }

    fn set_segment_id(&mut self, id: u32) -> PRes<()> {
        self.seek(SEGMENT_HASH_OFFSET)?;
        self.write_u32::<BigEndian>(id)?;
        Ok(())
    }

    fn get_next(&mut self) -> PRes<u64> {
        self.seek(0)?;
        self.read_u64::<BigEndian>().map_err(<_>::into)
    }

    fn set_next(&mut self, next: u64) -> PRes<()> {
        self.seek(0)?;
        self.write_u64::<BigEndian>(next)?;
        Ok(())
    }

    fn get_prev(&mut self) -> PRes<u64> {
        self.seek(8)?;
        self.read_u64::<BigEndian>().map_err(<_>::into)
    }

    fn set_prev(&mut self, prev: u64) -> PRes<()> {
        self.seek(8)?;
        self.write_u64::<BigEndian>(prev)?;
        Ok(())
    }

    /// Rewrites the stored entry count from the number of live slots found by a full scan.
    fn recalc_count(&mut self) -> PRes<()> {
        let count = self.segment_scan_entries()?.1.len();
        self.seek(SEGMENT_PAGE_ENTRY_COUNT_OFFSET)?;
        self.write_u8(count as u8)?;
        Ok(())
    }
}
/// In-memory catalog of segments: the persistent ones, plus temporary ones created via
/// `create_temp_segment` that are not yet promoted by `create_segment`.
pub struct Segments {
    /// Persistent segments keyed by segment id.
    pub segments: HashMap<u32, Segment>,
    /// Name -> id index over `segments`.
    pub segments_id: HashMap<String, u32>,
    /// Temporary segments keyed by segment id.
    pub temp_segments: HashMap<u32, Segment>,
    /// Name -> id index over `temp_segments`.
    pub temp_segments_id: HashMap<String, u32>,
    /// Page holding the two root buffers that point at the catalog content page.
    pub root_page: u64,
    /// Page currently holding the serialized catalog; 0 when none has been written yet.
    content_page: u64,
    /// Flush marker exchanged with `prepare_buffer_flush` on every catalog flush.
    last_flush: u8,
}
/// Generates a fresh, non-zero identifier for a segment named `segment`.
///
/// The upper 16 bits come from the low 16 bits of the name's `DefaultHasher` hash. The
/// lower 16 bits are random, so re-creating a segment with the same name gets a new id.
/// `DefaultHasher` output may differ between Rust releases. That is fine here because ids
/// are stored in the catalog (`Segments::flush_segments`) rather than recomputed.
///
/// 0 is never returned. Page ownership is checked by comparing the id stored in the page
/// against the segment id, and a page whose id field is still zeroed would otherwise
/// match a segment with id 0.
pub fn segment_hash(segment: &str) -> u32 {
    let mut hasher = DefaultHasher::new();
    segment.hash(&mut hasher);
    let name_part = (hasher.finish() as u32) << 16;
    loop {
        let id = name_part | u32::from(rand::random::<u16>());
        if id != 0 {
            return id;
        }
    }
}
impl Segments {
pub fn new(root_page: u64, allocator: &Arc<Allocator>) -> PRes<Segments> {
let mut buffer_0 = [0; 11];
let mut buffer_1 = [0; 11];
let page_id;
let last_flush;
{
let mut root = allocator.load_page(root_page)?;
root.read_exact(&mut buffer_0)?;
root.read_exact(&mut buffer_1)?;
let (flush, first) = double_buffer_check(&buffer_0, &buffer_1);
last_flush = flush;
if first {
page_id = BigEndian::read_u64(&buffer_0[0..8]);
} else {
page_id = BigEndian::read_u64(&buffer_1[0..8]);
}
}
let mut segments = HashMap::new();
let mut segments_id = HashMap::new();
if page_id != 0 {
let mut page = allocator.load_page(page_id)?;
loop {
let flag = page.read_u8()?;
if flag == 1 {
let first_page = page.read_u64::<BigEndian>()?;
let persistent_page = page.read_u64::<BigEndian>()?;
let persistent_pos = page.read_u32::<BigEndian>()?;
let pers_hash = page.read_u32::<BigEndian>()?;
let name_size = page.read_u16::<BigEndian>()? as usize;
let mut slice: Vec<u8> = vec![0; name_size];
page.read_exact(&mut slice)?;
let name: String = str::from_utf8(&slice[0..name_size])?.into();
segments.insert(
pers_hash,
Segment::new(
first_page,
persistent_page,
persistent_pos,
persistent_page,
persistent_pos,
pers_hash,
&name,
),
);
segments_id.insert(name, pers_hash);
} else {
break;
}
}
}
Ok(Segments {
root_page,
content_page: page_id,
last_flush,
segments,
segments_id,
temp_segments: HashMap::new(),
temp_segments_id: HashMap::new(),
})
}
pub fn init(root_page: u64, allocator: &Allocator) -> PRes<()> {
let mut buffer = [0; 11];
BigEndian::write_u64(&mut buffer[0..8], 0);
prepare_buffer_flush(&mut buffer, 0);
let mut root = allocator.write_page(root_page)?;
root.write_all(&buffer)?;
allocator.flush_page(&mut root)?;
Ok(())
}
pub fn segment_id(&self, segment: &str) -> Option<u32> {
if let Some(id) = self.segments_id.get(segment) {
self.segments.get(id).map(|x| x.segment_id)
} else {
None
}
}
pub fn segment_by_id(&self, id: u32) -> Option<&Segment> {
self.segments.get(&id)
}
pub fn segment_by_id_temp(&self, id: u32) -> Option<&Segment> {
self.temp_segments.get(&id)
}
pub fn has_segment(&self, segment: &str) -> bool {
self.segments_id.contains_key(segment)
}
pub fn create_temp_segment(&mut self, allocator: &Arc<Allocator>, segment: &str) -> PRes<(u32, u64)> {
let allocated = allocator.allocate(ADDRESS_PAGE_EXP)?;
let segment_id = segment_hash(segment);
let seg = Segment::new(
allocated,
allocated,
SEGMENT_DATA_OFFSET,
allocated,
SEGMENT_DATA_OFFSET,
segment_id,
segment,
);
self.temp_segments.insert(segment_id, seg);
self.temp_segments_id.insert(segment.to_string(), segment_id);
let mut pag = allocator.write_page(allocated)?;
pag.write_u64::<BigEndian>(0)?;
pag.write_u64::<BigEndian>(0)?;
pag.write_u32::<BigEndian>(segment_id)?;
allocator.flush_page(&mut pag)?;
Ok((segment_id, allocated))
}
pub fn get_temp_segment_mut(&mut self, segment: u32) -> Option<&mut Segment> {
self.temp_segments.get_mut(&segment)
}
pub fn drop_temp_segment(&mut self, allocator: &Arc<Allocator>, segment: u32) -> PRes<()> {
if let Some(segment) = self.temp_segments.remove(&segment) {
self.temp_segments_id.remove(&segment.name);
let pages = segment.collect_segment_pages(allocator)?;
for page in pages {
allocator.free(page)?;
}
}
Ok(())
}
pub fn exists_real_or_temp(&self, segment: u32) -> bool {
self.segments.contains_key(&segment) || self.temp_segments.contains_key(&segment)
}
pub fn create_segment(&mut self, segment: u32, first_page: u64) -> PRes<()> {
if let Some(mut s) = self.temp_segments.remove(&segment) {
s.first_page = first_page;
self.temp_segments_id.remove(&s.name);
self.segments_id.insert(s.name.clone(), s.segment_id);
self.segments.insert(s.segment_id, s);
}
Ok(())
}
pub fn drop_segment(&mut self, segment: &str) -> PRes<()> {
if let Some(seg) = self.segments_id.remove(segment) {
self.segments.remove(&seg);
}
Ok(())
}
pub fn collect_segment_pages(&self, allocator: &Allocator, segment: u32) -> PRes<Vec<u64>> {
if let Some(seg) = self.segments.get(&segment) {
seg.collect_segment_pages(allocator)
} else {
Ok(Vec::new())
}
}
pub fn set_first_page(&mut self, segment: u32, first_page: u64, allocator: &Arc<Allocator>) -> PRes<()> {
if let Some(seg) = self.segments.get_mut(&segment) {
seg.first_page = first_page;
}
self.flush_segments(allocator)?;
allocator.disc().sync()?;
Ok(())
}
pub fn reset(&mut self, segment: u32, allocator: &Arc<Allocator>) -> PRes<()> {
if let Some(seg) = self.segments.get_mut(&segment) {
seg.persistent_page = seg.first_page;
seg.persistent_pos = SEGMENT_DATA_OFFSET;
seg.alloc_page = seg.first_page;
seg.alloc_pos = SEGMENT_DATA_OFFSET;
}
self.flush_segments(allocator)?;
allocator.disc().sync()?;
Ok(())
}
pub fn confirm_allocations(&mut self, segments: &[u32], allocator: &Allocator, recover: bool) -> PRes<()> {
for seg in segments {
let segment = if let Some(s) = self.segments.get_mut(seg) {
s
} else if let Some(s) = self.temp_segments.get_mut(seg) {
s
} else {
panic!("segment operation when segment do not exists");
};
segment.persistent_page = segment.alloc_page;
if recover {
let mut page = allocator.load_page(segment.alloc_page)?;
segment.alloc_pos = page.segment_first_available_pos()?;
}
segment.persistent_pos = segment.alloc_pos;
}
Ok(())
}
pub fn flush_segments(&mut self, allocator: &Arc<Allocator>) -> PRes<()> {
let mut buffer = Vec::<u8>::new();
for segment in self.segments.values() {
buffer.write_u8(1)?;
buffer.write_u64::<BigEndian>(segment.first_page)?;
buffer.write_u64::<BigEndian>(segment.persistent_page)?;
buffer.write_u32::<BigEndian>(segment.persistent_pos)?;
buffer.write_u32::<BigEndian>(segment.segment_id)?;
buffer.write_u16::<BigEndian>(segment.name.len() as u16)?;
buffer.write_all(segment.name.as_bytes())?;
}
buffer.write_u8(0)?;
let exp = exp_from_content_size(buffer.len() as u64);
let content_page_id;
{
content_page_id = allocator.allocate(exp)?;
let mut content_page = allocator.write_page(content_page_id)?;
content_page.write_all(&buffer)?;
allocator.flush_page(&mut content_page)?;
}
let mut root_buffer = [0; 11];
BigEndian::write_u64(&mut root_buffer[0..8], content_page_id);
let (last_flush, offset) = prepare_buffer_flush(&mut root_buffer, self.last_flush);
self.last_flush = last_flush;
{
let mut root = allocator.write_page(self.root_page)?;
root.seek(offset)?;
root.write_all(&root_buffer)?;
allocator.flush_page(&mut root)?;
}
if self.content_page != 0 {
allocator.free(self.content_page)?;
}
self.content_page = content_page_id;
Ok(())
}
pub fn list(&self) -> Vec<(String, u32)> {
self.segments_id.iter().map(|(n, id)| (n.clone(), *id)).collect()
}
}
/// Cursor over the live entries of a segment, one address page at a time.
#[derive(Debug)]
pub struct SegmentPageIterator {
    /// Address page the entries in `per_page_iterator` belong to.
    cur_page: u64,
    /// Next address page to scan; 0 once the chain is exhausted.
    next_page: u64,
    /// Remaining live-entry positions of `cur_page`.
    per_page_iterator: vec::IntoIter<u32>,
}
impl SegmentPageIterator {
    /// Starts a cursor at the segment chain beginning at `first_page`.
    ///
    /// No page is read here; the first call to `next` scans `first_page`.
    pub fn new(first_page: u64) -> SegmentPageIterator {
        SegmentPageIterator {
            cur_page: first_page,
            next_page: first_page,
            per_page_iterator: Vec::new().into_iter(),
        }
    }

    /// Returns the next live entry, moving through the address-page chain as pages run dry.
    ///
    /// Yields `None` when the chain ends (next link 0) or when scanning a page fails.
    pub fn next(&mut self, address: &Address) -> Option<RecRef> {
        let found = loop {
            if let Some(pos) = self.per_page_iterator.next() {
                break Some(pos);
            }
            if self.next_page == 0 {
                break None;
            }
            self.cur_page = self.next_page;
            match address.scan_page(self.cur_page) {
                Ok(scanned) => {
                    self.next_page = scanned.0;
                    self.per_page_iterator = scanned.1.into_iter();
                }
                Err(_) => break None,
            }
        };
        found.map(|pos| RecRef::new(self.cur_page, pos))
    }
}
#[cfg(test)]
mod tests {
    use super::{segment_hash, SegmentPage, SegmentPageRead, Segments};
    use crate::{
        address::ADDRESS_PAGE_EXP,
        allocator::Allocator,
        config::Config,
        discref::{DiscRef, Page},
    };
    use std::sync::Arc;
    use tempfile::Builder;

    /// Creates an allocator over a fresh temporary file whose name starts with `file_name`.
    fn create_allocator(file_name: &str) -> Allocator {
        let file = Builder::new()
            .prefix(file_name)
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        let config = Arc::new(Config::new());
        let disc = DiscRef::new(file);
        let pa = Allocator::init(&disc).unwrap();
        let allocator = Allocator::new(disc, &config, pa).unwrap();
        allocator.allocate(5).unwrap();
        allocator
    }

    /// Creates an allocator plus an initialised, empty segment root page.
    fn create_segments_root(file_name: &str) -> (Arc<Allocator>, u64) {
        let all = Arc::new(create_allocator(file_name));
        let root = all.allocate(ADDRESS_PAGE_EXP).unwrap();
        Segments::init(root, &all).unwrap();
        (all, root)
    }

    #[test]
    fn test_create_drop_segment() {
        let (all, root) = create_segments_root("./raw_segment_create_delete.persy");
        let mut segments = Segments::new(root, &all).unwrap();
        let (id, fp) = segments.create_temp_segment(&all, "some").unwrap();
        segments.create_segment(id, fp).unwrap();
        segments.flush_segments(&all).unwrap();
        assert!(segments.segments_id.contains_key("some"));
        assert!(segments.segments.contains_key(&id));
        segments.drop_segment("some").unwrap();
        segments.flush_segments(&all).unwrap();
        assert!(!segments.segments_id.contains_key("some"));
        assert!(!segments.segments.contains_key(&id));
    }

    #[test]
    fn test_create_drop_temp_segment() {
        let (all, root) = create_segments_root("./segment_create_delete.persy");
        let mut segments = Segments::new(root, &all).unwrap();
        let (id, _) = segments.create_temp_segment(&all, "some").unwrap();
        assert!(segments.temp_segments.contains_key(&id));
        assert!(segments.temp_segments_id.contains_key("some"));
        segments.drop_temp_segment(&all, id).unwrap();
        assert!(!segments.temp_segments.contains_key(&id));
        assert!(!segments.temp_segments_id.contains_key("some"));
    }

    #[test]
    fn test_create_close_drop_close_segment() {
        let (all, root) = create_segments_root("./segment_pers_create_delete.persy");
        let id = {
            let mut segments = Segments::new(root, &all).unwrap();
            let (c_id, fp) = segments.create_temp_segment(&all, "some").unwrap();
            segments.create_segment(c_id, fp).unwrap();
            segments.flush_segments(&all).unwrap();
            c_id
        };
        {
            let mut segments = Segments::new(root, &all).unwrap();
            assert_eq!(segments.segments.len(), 1);
            assert!(segments.segments_id.contains_key("some"));
            assert!(segments.segments.contains_key(&id));
            segments.drop_segment("some").unwrap();
            segments.flush_segments(&all).unwrap();
        }
        {
            let segments = Segments::new(root, &all).unwrap();
            assert!(!segments.segments_id.contains_key("some"));
            assert!(!segments.segments.contains_key(&id));
        }
    }

    #[test]
    fn test_create_close_drop_close_segment_off_page() {
        let (all, root) = create_segments_root("./segment_pers_create_delete_off_page.persy");
        {
            let mut segments = Segments::new(root, &all).unwrap();
            for i in 0..100 {
                let (id, fp) = segments.create_temp_segment(&all, &format!("some{}", i)).unwrap();
                segments.create_segment(id, fp).unwrap();
            }
            segments.flush_segments(&all).unwrap();
        }
        {
            let mut segments = Segments::new(root, &all).unwrap();
            for i in 0..100 {
                let name = format!("some{}", i);
                assert!(segments.segments_id.contains_key(&name));
                segments.drop_segment(&name).unwrap();
            }
            segments.flush_segments(&all).unwrap();
        }
        {
            let segments = Segments::new(root, &all).unwrap();
            for i in 0..100 {
                assert!(!segments.segments_id.contains_key(&format!("some{}", i)));
            }
        }
    }

    #[test]
    fn test_seg_insert_read_pointer() {
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        // A fresh insert starts at version 1.
        assert_eq!(page.segment_read_entry(0, 30).unwrap(), Some((10, 1)));
    }

    #[test]
    fn test_seg_insert_update_read_pointer() {
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        page.segment_update_entry(0, 30, 15).unwrap();
        // The update repoints the entry and bumps the version.
        assert_eq!(page.segment_read_entry(0, 30).unwrap(), Some((15, 2)));
    }

    #[test]
    fn test_seg_insert_delete_read_pointer() {
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        // Deleting the only entry leaves the page empty.
        assert!(page.segment_delete_entry(0, 30).unwrap());
        assert!(page.segment_read_entry(0, 30).unwrap().is_none());
    }

    #[test]
    fn test_hash_id_generator() {
        assert_ne!(segment_hash("some"), 0);
    }
}