use crate::{
address::{
ADDRESS_ENTRY_SIZE, ADDRESS_PAGE_EXP, FLAG_DELETED, FLAG_EXISTS, SEGMENT_DATA_OFFSET, SEGMENT_HASH_OFFSET,
SEGMENT_PAGE_DELETE_COUNT_OFFSET,
},
allocator::Allocator,
device::{Page, PageOps},
error::PERes,
id::{RecRef, SegmentId},
io::{ArcSliceRead, InfallibleRead, InfallibleReadFormat, InfallibleWrite, InfallibleWriteFormat},
};
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
str,
sync::Arc,
vec,
};
pub mod segment_page_iterator;
#[cfg(test)]
mod tests;
/// Format version `0` of the segments root page.
pub(crate) const SEGMENTS_ROOT_PAGE_VERSION_0: u8 = 0;
/// Format version handed to `Allocator::write_address_root` when the segment table is flushed.
pub(crate) const SEGMENTS_ROOT_PAGE_VERSION: u8 = SEGMENTS_ROOT_PAGE_VERSION_0;
/// Result of `Segment::allocate_internal` when a new address page had to be appended to the
/// segment's page chain.
pub struct AllocatedSegmentPage {
/// Index of the newly allocated address page (now the segment's last page).
pub new_page: u64,
/// Index of the page that was the segment's last page before the allocation.
pub previous_page: u64,
}
/// In-memory descriptor of a segment: the boundaries of its chain of address pages plus the
/// position where the next address entry will be handed out.
#[derive(Clone)]
pub struct Segment {
/// Index of the first address page of the chain.
pub first_page: u64,
/// Index of the last address page of the chain; new entries are allocated here.
pub last_page: u64,
/// Offset inside `last_page` of the next entry to allocate.
pub last_pos: u32,
// Identifier persisted in every address page that belongs to this segment.
segment_id: SegmentId,
// Segment name, persisted with a u16 length prefix (see `Segment::write`).
name: String,
}
impl Segment {
    /// Creates the descriptor of a segment whose chain consists of the single page
    /// `first_page`; the allocation cursor starts at `SEGMENT_DATA_OFFSET`.
    pub fn new_allocation(first_page: u64, segment_id: SegmentId, name: &str) -> Segment {
        Segment::new(first_page, first_page, SEGMENT_DATA_OFFSET, segment_id, name)
    }

    /// Creates a descriptor from explicit values; `name` is copied into an owned `String`.
    pub fn new(first_page: u64, last_page: u64, last_pos: u32, segment_id: SegmentId, name: &str) -> Segment {
        let name = String::from(name);
        Segment {
            first_page,
            last_page,
            last_pos,
            segment_id,
            name,
        }
    }

    /// Index of the first address page of the segment.
    pub(crate) fn get_first_page(&self) -> u64 {
        self.first_page
    }

    /// Identifier of the segment.
    pub(crate) fn get_segment_id(&self) -> SegmentId {
        self.segment_id
    }

    /// Name of the segment.
    pub(crate) fn get_name(&self) -> &str {
        self.name.as_str()
    }

    /// Deserializes a segment descriptor written by [`Segment::write`].
    ///
    /// # Errors
    /// Fails when the stored name is not valid UTF-8.
    pub fn read(read: &mut dyn InfallibleRead) -> PERes<Segment> {
        // The field order below must mirror `Segment::write`.
        let first_page = read.read_u64();
        let last_page = read.read_u64();
        let last_pos = read.read_u32();
        let segment_id = SegmentId::read(read);
        // The name is stored as a u16 byte length followed by the raw bytes.
        let name_len = usize::from(read.read_u16());
        let mut raw_name = vec![0u8; name_len];
        read.read_exact(&mut raw_name);
        let name = str::from_utf8(&raw_name)?.to_owned();
        Ok(Segment {
            first_page,
            last_page,
            last_pos,
            segment_id,
            name,
        })
    }

    /// Serializes the descriptor: first page, last page, last position, segment id and the
    /// name prefixed by its byte length as `u16`.
    pub fn write(&self, write: &mut dyn InfallibleWrite) {
        write.write_u64(self.first_page);
        write.write_u64(self.last_page);
        write.write_u32(self.last_pos);
        self.segment_id.write(write);
        let name_bytes = self.name.as_bytes();
        write.write_u16(name_bytes.len() as u16);
        write.write_all(name_bytes);
    }

    /// Reserves the next address entry of the segment and returns its reference.
    ///
    /// Three cases are handled:
    /// * the entry fits in the current last page: the cursor simply advances;
    /// * the last page is full but all of its entries are deleted: the page is reset and
    ///   reused from its first slot;
    /// * otherwise a new page is allocated and linked after the current last page, and
    ///   the second tuple element describes that allocation.
    ///
    /// # Errors
    /// Propagates any error raised by the allocator or by the page operations.
    pub fn allocate_internal(&mut self, allocator: &Allocator) -> PERes<(RecRef, Option<AllocatedSegmentPage>)> {
        let current_page = self.last_page;
        let current_pos = self.last_pos;
        let next_pos = current_pos + ADDRESS_ENTRY_SIZE;
        let mut tail = allocator.write_page(current_page)?;

        // Common case: there is still room for one more entry in the last page.
        if next_pos <= tail.get_content_size() {
            self.last_pos = next_pos;
            return Ok((RecRef::new(current_page, current_pos), None));
        }

        if tail.empty() {
            // Every entry of the full last page is deleted: wipe the page, restore its
            // header (links, segment id, delete counter) and start again from the first slot.
            let prev = tail.get_prev()?;
            tail.reset()?;
            tail.set_next(0)?;
            tail.set_prev(prev)?;
            tail.set_segment_id(self.segment_id)?;
            tail.zero_count()?;
            allocator.flush_page(tail)?;
            self.last_pos = SEGMENT_DATA_OFFSET + ADDRESS_ENTRY_SIZE;
            return Ok((RecRef::new(self.last_page, SEGMENT_DATA_OFFSET), None));
        }

        // The last page is full and still in use: append a new page to the chain.
        let mut fresh = allocator.allocate(ADDRESS_PAGE_EXP)?;
        let new_page = fresh.get_index();
        tail.set_next(new_page)?;
        allocator.flush_page(tail)?;
        fresh.set_next(0)?;
        fresh.set_prev(current_page)?;
        fresh.set_segment_id(self.segment_id)?;
        fresh.zero_count()?;
        allocator.flush_page(fresh)?;
        self.last_page = new_page;
        self.last_pos = SEGMENT_DATA_OFFSET + ADDRESS_ENTRY_SIZE;
        let allocated = AllocatedSegmentPage {
            new_page,
            previous_page: current_page,
        };
        Ok((RecRef::new(self.last_page, SEGMENT_DATA_OFFSET), Some(allocated)))
    }

    /// Walks the chain from `first_page` to `last_page` and returns every page in use by
    /// the segment: the data page of each live entry followed by the address page itself.
    ///
    /// # Errors
    /// Propagates page loading failures from the allocator.
    pub fn collect_segment_pages(&self, allocator: &Allocator) -> PERes<Vec<u64>> {
        let mut collected = Vec::new();
        let mut current = self.first_page;
        loop {
            let mut address_page = allocator.load_page(current)?;
            // The link to the next page is the first field of a freshly loaded page.
            let next = address_page.read_u64();
            let mut entry_pos = SEGMENT_DATA_OFFSET;
            loop {
                address_page.seek(entry_pos);
                let data_page = address_page.read_u64();
                if entry_exits(address_page.read_u8()) {
                    collected.push(data_page);
                }
                entry_pos += ADDRESS_ENTRY_SIZE;
                // Stop once a whole entry no longer fits in the page content.
                if entry_pos > address_page.get_content_size() - ADDRESS_ENTRY_SIZE {
                    break;
                }
            }
            collected.push(current);
            if current == self.last_page {
                return Ok(collected);
            }
            current = next;
        }
    }
}
/// Read-only operations on a segment address page.
///
/// The notes on each method describe the blanket implementation provided in this file for
/// every `InfallibleRead + PageOps` type.
pub(crate) trait SegmentPageRead: PageOps {
/// Reads the entry at `pos` and returns its record page and version.
/// Returns `None` when the id stored in the page differs from `segment_id`, when the entry
/// is not flagged as existing, is flagged as deleted, or points to record page `0`.
fn segment_read_entry(&mut self, segment_id: SegmentId, pos: u32) -> Option<(u64, u16)>;
/// Returns the first u64 read from the page (the next-page link) together with, for every
/// entry slot, its position and whether its `FLAG_EXISTS` bit is set.
fn segment_scan_all_entries(&mut self) -> (u64, Vec<(u32, bool)>);
/// Returns the offset right after the highest slot whose `FLAG_EXISTS` bit is set, or
/// `SEGMENT_DATA_OFFSET` when no slot has it set.
fn segment_first_available_pos(&mut self) -> u32;
/// Reads the link to the next page of the chain (stored at offset 0).
fn get_next(&mut self) -> PERes<u64>;
/// Reads the link to the previous page of the chain (stored at offset 8).
fn get_prev(&mut self) -> PERes<u64>;
/// Tells whether the page's delete counter equals the number of slots the page can hold.
fn empty(&mut self) -> bool;
/// Tells whether the segment id stored in the page equals `id`.
fn match_id(&mut self, id: &SegmentId) -> bool;
/// Number of address entries that fit in the page content after the header.
fn possible_entries(&self) -> u32;
}
/// Mutating operations on a segment address page.
///
/// The notes on each method describe the blanket implementation provided in this file for
/// every `InfallibleRead + InfallibleWrite + PageOps` type.
pub(crate) trait SegmentPage: SegmentPageRead {
/// Writes a new entry at `pos`: the record page, the `FLAG_EXISTS` flag and version `1`.
fn segment_insert_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64);
/// Replaces the record page of the entry at `pos` and advances its version.
fn segment_update_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64);
/// Sets `FLAG_DELETED` on the entry at `pos` and increments the page's delete counter.
/// Returns `true` when the counter reaches the number of slots of the page.
fn segment_delete_entry(&mut self, segment_id: SegmentId, pos: u32) -> bool;
/// Stores `id` at `SEGMENT_HASH_OFFSET`.
fn set_segment_id(&mut self, id: SegmentId) -> PERes<()>;
/// Stores the link to the next page of the chain (offset 0).
fn set_next(&mut self, next: u64) -> PERes<()>;
/// Stores the link to the previous page of the chain (offset 8).
fn set_prev(&mut self, prev: u64) -> PERes<()>;
/// Recomputes the delete counter by counting the slots that carry `FLAG_DELETED`.
fn recalc_count(&mut self) -> PERes<()>;
/// Resets the delete counter to `0`.
fn zero_count(&mut self) -> PERes<()>;
}
/// Tells whether an entry is live: its `FLAG_EXISTS` bits are all set and none of its
/// `FLAG_DELETED` bits is.
fn entry_exits(flags: u8) -> bool {
    let exists = flags & FLAG_EXISTS == FLAG_EXISTS;
    let deleted = flags & FLAG_DELETED != 0;
    exists && !deleted
}
/// Blanket implementation of the read-side segment page operations for every readable page.
impl<T: InfallibleRead + PageOps> SegmentPageRead for T {
    /// Tells whether the segment id stored in the page equals `id`.
    fn match_id(&mut self, id: &SegmentId) -> bool {
        // Always position on the id field first, like every other id check in this file.
        // Callers such as `Segments::recover_remove_pages` invoke this right after
        // `empty()`, which leaves the cursor just past the delete counter.
        self.seek(SEGMENT_HASH_OFFSET);
        let persistent_id = SegmentId::read(self);
        persistent_id == *id
    }

    /// Reads the entry at `pos` and returns its record page and version.
    ///
    /// Returns `None` when the page belongs to a different segment, when the entry is not
    /// live (see `entry_exits`) or when it points to record page `0`.
    fn segment_read_entry(&mut self, segment_id: SegmentId, pos: u32) -> Option<(u64, u16)> {
        self.seek(SEGMENT_HASH_OFFSET);
        let persistent_id = SegmentId::read(self);
        if persistent_id != segment_id {
            return None;
        }
        // Entry layout: record page (u64), flags (u8), version (u16).
        self.seek(pos);
        let record = self.read_u64();
        let flag = self.read_u8();
        let version = self.read_u16();
        if !entry_exits(flag) || record == 0 {
            None
        } else {
            Some((record, version))
        }
    }

    /// Returns the next-page link together with, for every entry slot of the page, its
    /// position and whether its `FLAG_EXISTS` bit is set.
    fn segment_scan_all_entries(&mut self) -> (u64, Vec<(u32, bool)>) {
        // The next-page link lives at offset 0 (see `get_next`); do not rely on the
        // cursor still being at the start of the page.
        self.seek(0);
        let next_page = self.read_u64();
        let mut pos = SEGMENT_DATA_OFFSET;
        let mut recs = Vec::with_capacity(self.possible_entries() as usize);
        loop {
            // The flag byte follows the 8 byte record page pointer.
            self.seek(pos + 8);
            let flag = self.read_u8();
            // Compare against the flag constant rather than a literal, as the rest of
            // the file does.
            recs.push((pos, flag & FLAG_EXISTS == FLAG_EXISTS));
            pos += ADDRESS_ENTRY_SIZE;
            // Stop once a whole entry no longer fits in the page content.
            if pos > self.get_content_size() - ADDRESS_ENTRY_SIZE {
                break;
            }
        }
        (next_page, recs)
    }

    /// Scans the slots from the last one backwards and returns the offset right after the
    /// highest slot whose `FLAG_EXISTS` bit is set, or `SEGMENT_DATA_OFFSET` when no slot
    /// has it set. Deleted entries keep `FLAG_EXISTS`, so their slots count as used.
    fn segment_first_available_pos(&mut self) -> u32 {
        let mut pos = SEGMENT_DATA_OFFSET + (self.possible_entries() - 1) * ADDRESS_ENTRY_SIZE;
        loop {
            self.seek(pos + 8);
            let flag = self.read_u8();
            if flag & FLAG_EXISTS == FLAG_EXISTS {
                // Found the highest used slot: the next one is the first available.
                pos += ADDRESS_ENTRY_SIZE;
                break;
            }
            if pos == SEGMENT_DATA_OFFSET {
                // Reached the first slot without finding any used entry.
                break;
            }
            pos -= ADDRESS_ENTRY_SIZE;
            debug_assert!(pos >= SEGMENT_DATA_OFFSET);
        }
        pos
    }

    /// Reads the link to the next page of the chain, stored at offset 0.
    fn get_next(&mut self) -> PERes<u64> {
        self.seek(0);
        Ok(self.read_u64())
    }

    /// Reads the link to the previous page of the chain, stored at offset 8.
    fn get_prev(&mut self) -> PERes<u64> {
        self.seek(8);
        Ok(self.read_u64())
    }

    /// Tells whether the delete counter of the page equals the number of slots it holds,
    /// i.e. every slot has been used and then deleted.
    fn empty(&mut self) -> bool {
        self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET);
        self.read_u16() as u32 == self.possible_entries()
    }

    /// Number of address entries that fit in the page content after the header.
    fn possible_entries(&self) -> u32 {
        (self.get_content_size() - SEGMENT_DATA_OFFSET) / ADDRESS_ENTRY_SIZE
    }
}
/// Returns the version following `version`.
///
/// The counter wraps from `u16::MAX` to `1`, so `0` is never produced by an increment.
fn inc_version(version: u16) -> u16 {
    // `checked_add` yields `None` only for `u16::MAX`, which is the wrap-around case.
    version.checked_add(1).unwrap_or(1)
}
/// Blanket implementation of the write-side segment page operations for every page that
/// can be both read and written.
impl<T: InfallibleRead + InfallibleWrite + PageOps> SegmentPage for T {
    /// Writes a brand new entry at `pos`: record page, `FLAG_EXISTS` and version `1`.
    /// In debug builds it asserts that `pos` is inside the data area and that the page
    /// belongs to `segment_id`.
    fn segment_insert_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) {
        debug_assert!(pos >= SEGMENT_DATA_OFFSET, "invalid page position {}", pos);
        self.seek(SEGMENT_HASH_OFFSET);
        let stored_id = SegmentId::read(self);
        debug_assert!(stored_id == segment_id);
        // Entry layout: record page (u64), flags (u8), version (u16), written back to back.
        self.seek(pos);
        self.write_u64(record_page);
        self.write_u8(FLAG_EXISTS);
        self.write_u16(1);
    }

    /// Points the entry at `pos` to `record_page` and advances its version; the flag byte
    /// is left untouched.
    fn segment_update_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) {
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
        self.seek(SEGMENT_HASH_OFFSET);
        let stored_id = SegmentId::read(self);
        debug_assert!(stored_id == segment_id);
        // The version follows the 8 byte record page and the 1 byte flag.
        let version_pos = pos + 9;
        self.seek(version_pos);
        let next_version = inc_version(self.read_u16());
        self.seek(pos);
        self.write_u64(record_page);
        self.seek(version_pos);
        self.write_u16(next_version);
    }

    /// Marks the entry at `pos` as deleted and increments the delete counter of the page.
    /// Returns `true` when the counter reaches the number of slots of the page.
    fn segment_delete_entry(&mut self, segment_id: SegmentId, pos: u32) -> bool {
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
        self.seek(SEGMENT_HASH_OFFSET);
        let stored_id = SegmentId::read(self);
        debug_assert!(stored_id == segment_id);
        // Bump the per-page delete counter.
        self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET);
        let deleted = self.read_u16() + 1;
        self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET);
        self.write_u16(deleted);
        // Add the deleted bit to the flag byte, which follows the 8 byte record page.
        let flag_pos = pos + 8;
        self.seek(flag_pos);
        let flags = self.read_u8();
        self.seek(flag_pos);
        self.write_u8(flags | FLAG_DELETED);
        u32::from(deleted) == self.possible_entries()
    }

    /// Stores `id` at `SEGMENT_HASH_OFFSET`.
    fn set_segment_id(&mut self, id: SegmentId) -> PERes<()> {
        self.seek(SEGMENT_HASH_OFFSET);
        id.write(self);
        Ok(())
    }

    /// Stores the link to the next page of the chain at offset 0.
    fn set_next(&mut self, next: u64) -> PERes<()> {
        self.seek(0);
        self.write_u64(next);
        Ok(())
    }

    /// Stores the link to the previous page of the chain at offset 8.
    fn set_prev(&mut self, prev: u64) -> PERes<()> {
        self.seek(8);
        self.write_u64(prev);
        Ok(())
    }

    /// Recomputes the delete counter by counting every slot that carries `FLAG_DELETED`.
    fn recalc_count(&mut self) -> PERes<()> {
        let mut deleted = 0;
        let mut pos = SEGMENT_DATA_OFFSET;
        loop {
            self.seek(pos + 8);
            if self.read_u8() & FLAG_DELETED == FLAG_DELETED {
                deleted += 1;
            }
            pos += ADDRESS_ENTRY_SIZE;
            // Stop once a whole entry no longer fits in the page content.
            if pos > self.get_content_size() - ADDRESS_ENTRY_SIZE {
                break;
            }
        }
        self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET);
        self.write_u16(deleted as u16);
        Ok(())
    }

    /// Resets the delete counter of the page to `0`.
    fn zero_count(&mut self) -> PERes<()> {
        self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET);
        self.write_u16(0);
        Ok(())
    }
}
/// In-memory table of the known segments, indexed both by id and by name.
pub struct Segments {
/// Root page the table is loaded from (`Segments::new`) and written to (`flush_segments`).
pub root_page: u64,
/// Segment descriptors by id.
pub segments: HashMap<SegmentId, Segment>,
/// Segment ids by segment name.
pub segments_id: HashMap<String, SegmentId>,
}
/// Builds a 64 bit value for a segment name: the upper 32 bits are the low 32 bits of the
/// `DefaultHasher` hash of the name, the lower 32 bits are random.
///
/// Two calls with the same name therefore normally return different values.
pub fn segment_hash(segment: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    segment.hash(&mut hasher);
    // Shifting left discards the upper half of the hash and frees the lower 32 bits.
    let name_bits = hasher.finish() << 32;
    name_bits | u64::from(rand::random::<u32>())
}
impl Segments {
    /// Loads the segment table stored at `root_page`.
    ///
    /// The root buffer is a sequence of records, each introduced by a marker byte `1`; any
    /// other marker ends the list. When the allocator returns no buffer the table is empty.
    ///
    /// # Errors
    /// Propagates allocator failures and segment decoding errors.
    pub fn new(root_page: u64, allocator: &Arc<Allocator>) -> PERes<Segments> {
        let mut segments = HashMap::new();
        let mut segments_id = HashMap::new();
        if let Some(buffer) = allocator.read_address_buffer(root_page)? {
            let mut page = ArcSliceRead::new_vec(buffer);
            while page.read_u8() == 1 {
                let segment = Segment::read(&mut page)?;
                segments_id.insert(segment.name.clone(), segment.segment_id);
                segments.insert(segment.segment_id, segment);
            }
        }
        Ok(Segments {
            root_page,
            segments,
            segments_id,
        })
    }

    /// Initializes `root` as an address root page through the allocator.
    pub fn init(root: Page, allocator: &Allocator) -> PERes<()> {
        allocator.create_address_root(root)?;
        Ok(())
    }

    /// Looks up the id of the segment called `segment`; `None` when the name is unknown or
    /// when the id it maps to has no descriptor.
    pub fn segment_id(&self, segment: &str) -> Option<SegmentId> {
        self.segments_id
            .get(segment)
            .and_then(|id| self.segments.get(id))
            .map(|seg| seg.segment_id)
    }

    /// Returns the descriptor of the segment with the given id, if any.
    pub fn segment_by_id(&self, id: SegmentId) -> Option<&Segment> {
        self.segments.get(&id)
    }

    /// Returns a copy of the name of the segment with the given id, if any.
    pub fn segment_name_by_id(&self, id: SegmentId) -> Option<String> {
        self.segment_by_id(id).map(|seg| seg.name.clone())
    }

    /// Tells whether a segment with this name is registered.
    pub fn has_segment(&self, segment: &str) -> bool {
        self.segments_id.contains_key(segment)
    }

    /// Tells whether a segment with this id is registered.
    pub fn has_segment_by_id(&self, segment: &SegmentId) -> bool {
        self.segments.contains_key(segment)
    }

    /// Allocates and initializes the first address page of a new segment and returns its
    /// descriptor. The segment is not registered in the table; see
    /// [`Segments::finalize_create_segment`].
    ///
    /// # Errors
    /// Propagates allocation and flush failures.
    pub fn create_temp_segment(&mut self, allocator: &Allocator, segment: &str) -> PERes<Segment> {
        let mut first = allocator.allocate(ADDRESS_PAGE_EXP)?;
        let first_index = first.get_index();
        let segment_id = SegmentId::new(segment_hash(segment));
        // Page header: next link, previous link (both empty), then the segment id.
        first.write_u64(0);
        first.write_u64(0);
        segment_id.write(&mut first);
        allocator.flush_page(first)?;
        Ok(Segment::new_allocation(first_index, segment_id, segment))
    }

    /// Registers `segment` in both indexes, replacing any previous entry with the same
    /// name or id.
    pub fn finalize_create_segment(&mut self, segment: Segment) {
        let id = segment.segment_id;
        self.segments_id.insert(segment.name.clone(), id);
        self.segments.insert(id, segment);
    }

    /// Registers `segment` unless a segment with the same id is already known.
    pub fn recover_finalize_create_segment(&mut self, segment: Segment) {
        if !self.has_segment_by_id(&segment.get_segment_id()) {
            self.finalize_create_segment(segment)
        }
    }

    /// Removes the segment called `segment` from both indexes; unknown names are ignored.
    pub fn drop_segment(&mut self, segment: &str) {
        if let Some(id) = self.segments_id.remove(segment) {
            self.segments.remove(&id);
        }
    }

    /// Returns every page used by the given segment (see
    /// [`Segment::collect_segment_pages`]), or an empty list when the id is unknown.
    pub fn collect_segment_pages(&self, allocator: &Allocator, segment: SegmentId) -> PERes<Vec<u64>> {
        match self.segments.get(&segment) {
            Some(seg) => seg.collect_segment_pages(allocator),
            None => Ok(Vec::new()),
        }
    }

    /// Sets the first page of the given segment; unknown ids are ignored.
    pub fn set_first_page(&mut self, segment: SegmentId, first_page: u64) {
        if let Some(seg) = self.segments.get_mut(&segment) {
            seg.first_page = first_page;
        }
    }

    /// Sets the first page of the given segment only when its current first page is
    /// `previous_first`; unknown ids are ignored.
    pub fn set_first_page_if_same(&mut self, segment: SegmentId, previous_first: u64, first_page: u64) {
        let matching = self
            .segments
            .get_mut(&segment)
            .filter(|seg| seg.first_page == previous_first);
        if let Some(seg) = matching {
            seg.first_page = first_page;
        }
    }

    /// Recomputes `last_pos` of each listed segment by scanning its last page.
    ///
    /// Segments are looked up in the table first and then in `created`.
    ///
    /// # Panics
    /// Panics when an id is found in neither place.
    ///
    /// # Errors
    /// Propagates page loading failures.
    pub fn recover_allocations(
        &mut self,
        segments: &[SegmentId],
        created: &mut HashMap<SegmentId, Segment>,
        allocator: &Allocator,
    ) -> PERes<()> {
        for seg in segments {
            let segment = match self.segments.get_mut(seg) {
                Some(known) => known,
                None => match created.get_mut(seg) {
                    Some(pending) => pending,
                    None => panic!("segment operation when segment do not exists"),
                },
            };
            let mut page = allocator.load_page(segment.last_page)?;
            segment.last_pos = page.segment_first_available_pos();
        }
        Ok(())
    }

    /// Unlinks the given empty pages from the chain of their segment.
    ///
    /// When an unlinked page was the first of its chain, the segment's first page moves to
    /// the following page. If any first page changed, the table is flushed. The result
    /// holds one `(segment, new first page)` pair per segment whose first page changed.
    ///
    /// # Errors
    /// Propagates page and flush failures.
    pub fn clear_empty(&mut self, allocator: &Allocator, empty: &[(SegmentId, u64)]) -> PERes<Vec<(SegmentId, u64)>> {
        let mut updated_segment = HashMap::new();
        for (segment, page) in empty {
            let mut unlinked = allocator.load_page(*page)?;
            let next = unlinked.get_next()?;
            let prev = unlinked.get_prev()?;
            debug_assert!(next != 0);
            // The following page inherits the previous link of the removed page.
            let mut next_page = allocator.write_page(next)?;
            next_page.set_prev(prev)?;
            allocator.flush_page(next_page)?;
            if prev != 0 {
                let mut prev_page = allocator.write_page(prev)?;
                prev_page.set_next(next)?;
                allocator.flush_page(prev_page)?;
            } else if next != 0 {
                // The removed page was the head of the chain.
                updated_segment.insert(*segment, (*segment, next));
                self.set_first_page(*segment, next);
            }
        }
        // An entry is recorded exactly when some first page changed.
        if !updated_segment.is_empty() {
            self.flush_segments(allocator)?;
        }
        Ok(updated_segment.values().cloned().collect())
    }

    /// Recovery counterpart of [`Segments::clear_empty`]: unlinks every listed page that
    /// is still allocated, empty and owned by the given segment, and returns the pages
    /// that were unlinked so they can be freed. The segment table is not flushed here.
    ///
    /// # Errors
    /// Propagates page failures.
    pub fn recover_remove_pages(
        &mut self,
        allocator: &Allocator,
        delete_pages: &[(SegmentId, u64)],
    ) -> PERes<Vec<u64>> {
        let mut to_free = Vec::new();
        for (segment_id, page_id) in delete_pages {
            let mut page = match allocator.load_page_not_free(*page_id)? {
                Some(loaded) => loaded,
                None => continue,
            };
            if !(page.empty() && page.match_id(segment_id)) {
                continue;
            }
            let next = page.get_next()?;
            let prev = page.get_prev()?;
            debug_assert!(next != 0);
            // The following page inherits the previous link of the removed page.
            let mut next_page = allocator.write_page(next)?;
            next_page.set_prev(prev)?;
            allocator.flush_page(next_page)?;
            if prev != 0 {
                let mut prev_page = allocator.write_page(prev)?;
                prev_page.set_next(next)?;
                allocator.flush_page(prev_page)?;
            } else if next != 0 {
                // The removed page was the head of the chain.
                self.set_first_page_if_same(*segment_id, *page_id, next);
            }
            to_free.push(*page_id);
        }
        Ok(to_free)
    }

    /// For every segment, follows the next links from the recorded last page up to the
    /// real end of the chain, then recomputes `last_pos` from that page.
    ///
    /// # Errors
    /// Propagates page loading failures.
    pub fn recompute_last_pages(&mut self, allocator: &Allocator) -> PERes<()> {
        for segment in self.segments.values_mut() {
            loop {
                let mut candidate = allocator.load_page(segment.last_page)?;
                match candidate.get_next()? {
                    0 => break,
                    following => segment.last_page = following,
                }
            }
            let mut last = allocator.load_page(segment.last_page)?;
            segment.last_pos = last.segment_first_available_pos();
        }
        Ok(())
    }

    /// Serializes the whole table and writes it to the root page.
    ///
    /// Each segment is preceded by a marker byte `1`; a final `0` byte ends the list.
    ///
    /// # Errors
    /// Propagates the failure of the root page write.
    pub fn flush_segments(&mut self, allocator: &Allocator) -> PERes<()> {
        let mut serialized = Vec::<u8>::new();
        for segment in self.segments.values() {
            serialized.write_u8(1);
            segment.write(&mut serialized);
        }
        serialized.write_u8(0);
        allocator.write_address_root(self.root_page, &mut serialized, SEGMENTS_ROOT_PAGE_VERSION)?;
        Ok(())
    }

    /// Lists the name and id of every registered segment.
    pub fn list(&self) -> Vec<(String, SegmentId)> {
        self.segments_id
            .iter()
            .map(|(name, id)| (name.clone(), *id))
            .collect()
    }

    /// Lists name, id and first page of every registered segment.
    pub fn snapshot_list(&self) -> Vec<(String, SegmentId, u64)> {
        self.segments
            .iter()
            .map(|(id, seg)| (seg.name.clone(), *id, seg.first_page))
            .collect()
    }
}