use std::{fs::File, io::Write, os::unix::fs::FileExt};
use tracing::{debug, error, instrument};
use crate::{Document, DocumentId, Result, Version, ZeboError, index::ProbableIndex};
// On-disk page layout (all multi-byte integers are big-endian):
//
//   [0]        format version (u8)
//   [1..5]     document count limit (u32)
//   [5..9]     current document count (u32)
//   [9..13]    next available data offset (u32)
//   [13..17]   next available index slot (u32)
//   [17..25]   starting document id (u64)
//   [25..]     index entries, 16 bytes each: doc id (u64), data offset (u32),
//              data length (u32); document payload bytes follow the index.
pub const VERSION_OFFSET: u64 = 0;
pub const DOCUMENT_COUNT_LIMIT_OFFSET: u64 = VERSION_OFFSET + 1;
pub const DOCUMENT_COUNT_OFFSET: u64 = DOCUMENT_COUNT_LIMIT_OFFSET + 4;
pub const NEXT_AVAILABLE_OFFSET: u64 = DOCUMENT_COUNT_OFFSET + 4;
pub const NEXT_AVAILABLE_HEADER_OFFSET: u64 = NEXT_AVAILABLE_OFFSET + 4;
pub const STARTING_DOCUMENT_ID_OFFSET: u64 = NEXT_AVAILABLE_HEADER_OFFSET + 4;
pub const DOCUMENT_INDEX_OFFSET: u64 = STARTING_DOCUMENT_ID_OFFSET + 8;
/// An on-disk page holding up to `document_limit` documents behind a
/// fixed-size header and index region (see the `*_OFFSET` constants).
pub struct ZeboPage {
    // Maximum number of index entries this page can hold.
    document_limit: u32,
    // Document id assigned to the first slot of this page.
    #[allow(dead_code)]
    pub(crate) starting_document_id: u64,
    // Backing file; all access is positional I/O via `FileExt`.
    pub(crate) page_file: std::fs::File,
    // In-memory mirror of the next free index slot stored on disk.
    pub(crate) next_available_header_offset: u32,
}
impl ZeboPage {
/// Initialize a brand-new page file.
///
/// Sizes the file to the fixed header plus `document_limit` index entries
/// (16 bytes each), persists the version byte, the limit, a zero document
/// count, the first free data offset (right after the index region) and
/// the starting document id, then fsyncs everything.
pub fn try_new(
    document_limit: u32,
    starting_document_id: u64,
    mut page_file: std::fs::File,
) -> Result<Self> {
    // 16 bytes per index entry: data offset (4) + data length (4) + doc id (8).
    let document_header_size = (4 + 4 + 8) * (document_limit as u64);
    // `set_len` zero-fills, which also initializes the document count,
    // the next-available index slot and the whole index region.
    page_file
        .set_len(DOCUMENT_INDEX_OFFSET + document_header_size)
        .map_err(ZeboError::OperationError)?;
    page_file
        .write_all_at(&[Version::V1.into()], VERSION_OFFSET)
        .map_err(ZeboError::OperationError)?;
    page_file
        .write_all_at(&document_limit.to_be_bytes(), DOCUMENT_COUNT_LIMIT_OFFSET)
        .map_err(ZeboError::OperationError)?;
    page_file
        .write_all_at(&[0; 4], DOCUMENT_COUNT_OFFSET)
        .map_err(ZeboError::OperationError)?;
    // Document data starts immediately after the index region.
    // NOTE(review): this cast silently truncates if the computed offset
    // exceeds u32::MAX — presumably document_limit is bounded upstream; confirm.
    let initial_available_offset = (DOCUMENT_INDEX_OFFSET + document_header_size) as u32;
    page_file
        .write_all_at(
            &initial_available_offset.to_be_bytes(),
            NEXT_AVAILABLE_OFFSET,
        )
        .map_err(ZeboError::OperationError)?;
    page_file
        .write_all_at(
            &starting_document_id.to_be_bytes(),
            STARTING_DOCUMENT_ID_OFFSET,
        )
        .map_err(ZeboError::OperationError)?;
    page_file.flush().map_err(ZeboError::OperationError)?;
    page_file.sync_all().map_err(ZeboError::OperationError)?;
    let s = Self {
        document_limit,
        starting_document_id,
        page_file,
        // The page is empty: the next free index slot is the first one.
        next_available_header_offset: 0,
    };
    Ok(s)
}
/// Open an existing page file, validating the format version and reading
/// back the persisted header fields.
pub fn try_load(page_file: std::fs::File) -> Result<Self> {
    let mut buf = [0; 1];
    page_file
        .read_exact_at(&mut buf, VERSION_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let version = buf[0];
    // Only V1 pages are supported.
    if version != Version::V1.into() {
        return Err(ZeboError::UnsupportedVersion {
            version,
            wanted: Version::V1.into(),
        });
    }
    let mut buf = [0; 4];
    page_file
        .read_exact_at(&mut buf, DOCUMENT_COUNT_LIMIT_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let document_limit = u32::from_be_bytes(buf);
    page_file
        .read_exact_at(&mut buf, NEXT_AVAILABLE_HEADER_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let next_available_header_offset = u32::from_be_bytes(buf);
    let mut buf = [0; 8];
    page_file
        .read_exact_at(&mut buf, STARTING_DOCUMENT_ID_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let starting_document_id = u64::from_be_bytes(buf);
    Ok(Self {
        page_file,
        document_limit,
        starting_document_id,
        next_available_header_offset,
    })
}
/// Read the current document count from the on-disk header.
pub fn get_document_count(&self) -> Result<u32> {
    let mut bytes = [0_u8; 4];
    self.page_file
        .read_exact_at(&mut bytes, DOCUMENT_COUNT_OFFSET)
        .map_err(ZeboError::OperationError)?;
    Ok(u32::from_be_bytes(bytes))
}
/// Read the first free byte of the data region from the on-disk header.
fn get_next_available_document_offset(&self) -> Result<u32> {
    let mut bytes = [0_u8; 4];
    self.page_file
        .read_exact_at(&mut bytes, NEXT_AVAILABLE_OFFSET)
        .map_err(ZeboError::OperationError)?;
    Ok(u32::from_be_bytes(bytes))
}
#[inline]
fn is_deleted(doc_id: u64, document_offset: u32, document_len: u32) -> bool {
if document_offset == u32::MAX && document_len == u32::MAX {
return true;
}
if doc_id == u64::MAX && document_offset == u32::MAX {
return true;
}
false
}
/// True when an index slot was never written: a zero data offset can never
/// point at a real document because the fixed header always occupies the
/// beginning of the file.
#[inline]
fn is_uninitialized_entry(document_offset: u32) -> bool {
    matches!(document_offset, 0)
}
/// Build an in-memory snapshot of the page header.
///
/// Walks the index slots collecting `(doc_id, offset, len)` for every live
/// document, skipping deleted entries and stopping at the first
/// uninitialized slot (slots are filled in order, so a zero offset means
/// nothing follows).
pub fn get_header(&self) -> Result<ZeboPageHeader> {
    let document_count = self.get_document_count()?;
    let next_available_document_offset = self.get_next_available_document_offset()?;
    let mut doc_index = Vec::with_capacity(document_count as usize);
    let mut found = 0;
    let mut i: u64 = 0;
    while found < document_count {
        // Valid slots are 0..document_limit. The previous bound (`>`)
        // also read slot `document_limit`, whose bytes belong to document
        // data rather than the index.
        if i >= (self.document_limit as u64) {
            break;
        }
        if let Some((doc_id, document_offset, document_len)) = self.get_at(i)? {
            if Self::is_uninitialized_entry(document_offset) {
                break;
            }
            if Self::is_deleted(doc_id, document_offset, document_len) {
                i += 1;
                continue;
            }
            doc_index.push((doc_id, document_offset, document_len));
            found += 1;
        }
        i += 1;
    }
    let header = ZeboPageHeader {
        document_limit: self.document_limit,
        document_count,
        next_available_document_offset,
        next_available_header_offset: self.next_available_header_offset,
        index: doc_index,
    };
    Ok(header)
}
/// Fetch documents by id, using per-id "probable index" hints.
///
/// For every `(doc_id, probable_index)` pair the hinted slot is read
/// first; if it holds a different id, a delta-corrected slot is tried and
/// finally `fallback_search_document` takes over. Ids that are absent or
/// deleted are skipped, so the returned vector may be shorter than the
/// input.
pub fn get_documents<DocId: DocumentId>(
    &self,
    doc_id_with_index: &[(u64, ProbableIndex)],
) -> Result<Vec<(DocId, Vec<u8>)>> {
    let mut r = Vec::with_capacity(doc_id_with_index.len());
    for (doc_id, probable_index) in doc_id_with_index {
        let mut probable_index = *probable_index;
        // Clamp the hint to the used portion of the index.
        if probable_index.0 >= self.next_available_header_offset as u64 {
            probable_index.0 = self.next_available_header_offset.saturating_sub(1) as u64;
        }
        let mut buff = [0; 16];
        // Propagate I/O failures instead of panicking (was `.unwrap()`).
        self.page_file
            .read_exact_at(
                &mut buff,
                DOCUMENT_INDEX_OFFSET + (probable_index.0 * (4 + 4 + 8)),
            )
            .map_err(ZeboError::OperationError)?;
        let found_id = u64::from_be_bytes([
            buff[0], buff[1], buff[2], buff[3], buff[4], buff[5], buff[6], buff[7],
        ]);
        let document_offset = u32::from_be_bytes([buff[8], buff[9], buff[10], buff[11]]);
        let document_len = u32::from_be_bytes([buff[12], buff[13], buff[14], buff[15]]);
        if &found_id == doc_id {
            if Self::is_uninitialized_entry(document_offset)
                || Self::is_deleted(found_id, document_offset, document_len)
            {
                continue;
            } else {
                let mut doc_buf = vec![0; document_len as usize];
                if document_len > 0 {
                    self.page_file
                        .read_exact_at(&mut doc_buf, document_offset as u64)
                        .map_err(ZeboError::OperationError)?;
                }
                debug!("Found with probable index");
                r.push((DocId::from_u64(*doc_id), doc_buf));
                continue;
            }
        }
        // The hinted slot held another id: re-aim by the id delta (slots
        // are mostly laid out in id order).
        let mut probable_index = if *doc_id > found_id {
            let delta = doc_id - found_id;
            ProbableIndex(probable_index.0 + delta)
        } else {
            let delta = found_id - doc_id;
            ProbableIndex(probable_index.0.saturating_sub(delta))
        };
        if probable_index.0 >= self.next_available_header_offset as u64 {
            probable_index.0 = self.next_available_header_offset.saturating_sub(1) as u64;
        }
        // Same as above: surface read errors to the caller (was `.unwrap()`).
        self.page_file
            .read_exact_at(
                &mut buff,
                DOCUMENT_INDEX_OFFSET + (probable_index.0 * (4 + 4 + 8)),
            )
            .map_err(ZeboError::OperationError)?;
        let new_found_id = u64::from_be_bytes([
            buff[0], buff[1], buff[2], buff[3], buff[4], buff[5], buff[6], buff[7],
        ]);
        let new_document_offset = u32::from_be_bytes([buff[8], buff[9], buff[10], buff[11]]);
        let new_document_len = u32::from_be_bytes([buff[12], buff[13], buff[14], buff[15]]);
        let a = self.fallback_search_document(
            *doc_id,
            Some((
                probable_index.0,
                (new_found_id, new_document_offset, new_document_len),
            )),
        );
        if let Ok(Some((_, document_offset, document_len))) = a {
            let mut doc_buf = vec![0; document_len as usize];
            if document_len > 0 {
                self.page_file
                    .read_exact_at(&mut doc_buf, document_offset as u64)
                    .map_err(ZeboError::OperationError)?;
            }
            r.push((DocId::from_u64(*doc_id), doc_buf));
            continue;
        } else if let Err(e) = a {
            // Best-effort per id: log and move on to the next request.
            error!("Error during fallback search: {:?}", e);
            continue;
        }
    }
    Ok(r)
}
/// Reserve index slots and data space for `documents`, advancing the
/// on-disk counters, and return a [`ZeboPageReservedSpace`] that performs
/// the actual payload writes.
///
/// NOTE(review): nothing here checks the reservation against
/// `document_limit` — presumably enforced by the caller; confirm.
pub fn reserve<'docs, DocId: DocumentId, Doc: Document>(
    &mut self,
    documents: &'docs [(DocId, Doc)],
) -> Result<ZeboPageReservedSpace<'docs, DocId, Doc>> {
    let mut buf = [0; 4];
    self.page_file
        .read_exact_at(&mut buf, NEXT_AVAILABLE_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let next_available_offset = u32::from_be_bytes(buf);
    self.page_file
        .read_exact_at(&mut buf, NEXT_AVAILABLE_HEADER_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let next_available_header_offset = u32::from_be_bytes(buf);
    let document_count = documents.len() as u32;
    let documents_size = documents.iter().map(|(_, doc)| doc.len()).sum::<usize>() as u32;
    let new_next_available_offset = next_available_offset + documents_size;
    let new_next_available_header_offset = next_available_header_offset + document_count;
    self.next_available_header_offset = new_next_available_header_offset;
    // Write each u32 field with an exactly-sized buffer via `write_all_at`.
    // The previous code pushed an 8-byte buffer through `write_at`, which
    // (a) spilled 4 zero bytes past each field — clobbering the adjacent
    // field, including the high half of the starting document id — and
    // (b) could silently short-write.
    self.page_file
        .write_all_at(&new_next_available_offset.to_be_bytes(), NEXT_AVAILABLE_OFFSET)
        .map_err(ZeboError::OperationError)?;
    self.page_file
        .write_all_at(
            &new_next_available_header_offset.to_be_bytes(),
            NEXT_AVAILABLE_HEADER_OFFSET,
        )
        .map_err(ZeboError::OperationError)?;
    self.page_file
        .read_exact_at(&mut buf, DOCUMENT_COUNT_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let current_document_count = u32::from_be_bytes(buf);
    let new_document_count = current_document_count + document_count;
    self.page_file
        .write_all_at(&new_document_count.to_be_bytes(), DOCUMENT_COUNT_OFFSET)
        .map_err(ZeboError::OperationError)?;
    self.page_file.flush().map_err(ZeboError::OperationError)?;
    self.page_file
        .sync_all()
        .map_err(ZeboError::OperationError)?;
    let file = self
        .page_file
        .try_clone()
        .map_err(ZeboError::OperationError)?;
    Ok(ZeboPageReservedSpace {
        file,
        next_available_offset,
        next_available_header_offset,
        documents,
    })
}
/// Mark the given documents as deleted, returning how many were found.
///
/// When `clean_data` is true the payload bytes are zeroed first. Deletion
/// keeps the doc id in the slot and sets offset and length to `u32::MAX`
/// (see `is_deleted`), then decrements the stored document count.
pub fn delete_documents(
    &mut self,
    documents_to_delete: &[(u64, ProbableIndex)],
    clean_data: bool,
) -> Result<u32> {
    if clean_data {
        let header = self.get_header()?;
        // Reusable zero buffer, grown to the largest payload seen so far.
        let mut v: Vec<u8> = vec![];
        for (doc_id, _) in documents_to_delete {
            let found = header.index.iter().find(|(d, _, _)| d == doc_id);
            if let Some((_, document_offset, document_len)) = found {
                let len = *document_len as usize;
                if v.len() < len {
                    v.resize(len, 0);
                }
                self.page_file
                    .write_all_at(&v[0..len], *document_offset as u64)
                    .map_err(ZeboError::OperationError)?;
            }
        }
    }
    let mut found = 0_u32;
    let mut buf = [0; 16];
    for i in 0..self.document_limit {
        let (doc_id, offset, _) = match self.get_at(i as u64)? {
            Some(x) => x,
            None => continue,
        };
        if Self::is_uninitialized_entry(offset) {
            continue;
        }
        // Already deleted (either tombstone format).
        if offset == u32::MAX {
            continue;
        }
        if documents_to_delete.iter().any(|(d, _)| *d == doc_id) {
            buf[0..8].copy_from_slice(&doc_id.to_be_bytes());
            buf[8..12].copy_from_slice(&u32::MAX.to_be_bytes());
            buf[12..16].copy_from_slice(&u32::MAX.to_be_bytes());
            // Widen `i` before multiplying: `i * 16` in u32 could
            // overflow for very large document limits.
            self.page_file
                .write_all_at(&buf, DOCUMENT_INDEX_OFFSET + (i as u64) * (4 + 4 + 8))
                .map_err(ZeboError::OperationError)?;
            found += 1;
        }
    }
    if found > 0 {
        let document_count = self.get_document_count()?;
        // Saturate: an on-disk count smaller than `found` (corruption)
        // must not underflow and wrap.
        let new_document_count = document_count.saturating_sub(found);
        self.page_file
            .write_all_at(&new_document_count.to_be_bytes(), DOCUMENT_COUNT_OFFSET)
            .map_err(ZeboError::OperationError)?;
    }
    self.page_file.flush().map_err(ZeboError::OperationError)?;
    self.page_file
        .sync_all()
        .map_err(ZeboError::OperationError)?;
    Ok(found)
}
/// Read the raw index entry at `document_index`.
///
/// Returns `Ok(None)` when the index is out of range or the read hits EOF
/// (truncated file); the entry's `(doc_id, offset, len)` otherwise.
fn get_at(&self, document_index: u64) -> Result<Option<(u64, u32, u32)>> {
    // Valid slots are 0..document_limit (exclusive). The previous bound
    // (`limit < index`) let `index == limit` through, reading the first
    // bytes of document data as if they were an index entry.
    if document_index >= self.document_limit as u64 {
        return Ok(None);
    }
    let mut buf = [0; 16];
    if let Err(e) = self.page_file.read_exact_at(
        &mut buf,
        DOCUMENT_INDEX_OFFSET + (document_index * (4 + 4 + 8)),
    ) {
        if e.kind() == std::io::ErrorKind::UnexpectedEof {
            return Ok(None);
        }
        return Err(ZeboError::OperationError(e));
    }
    let doc_id = u64::from_be_bytes([
        buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7],
    ]);
    let document_offset = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
    let document_len = u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]);
    Ok(Some((doc_id, document_offset, document_len)))
}
/// Walk the index looking for `target_doc_id`, starting from the best
/// available hint.
///
/// `hint_data` is `(slot_index, (doc_id, offset, len))` for a slot the
/// caller already read. Strategy: try the hint, then a delta-corrected
/// slot (ids are mostly laid out in slot order, so the id difference
/// approximates the slot difference), then a directed linear walk, with
/// `find_in_range` (windowed scan) and `find_from_start` (full scan) as
/// the final fallbacks. Returns `Ok(None)` for missing or deleted ids.
#[instrument(skip(self, hint_data), fields(target_doc_id = target_doc_id))]
fn fallback_search_document(
    &self,
    target_doc_id: u64,
    hint_data: Option<(u64, (u64, u32, u32))>,
) -> Result<Option<(u64, u32, u32)>> {
    let (starting_index, starting_doc_id) = if let Some((index, (doc_id, offset, len))) =
        hint_data
    {
        if doc_id == target_doc_id {
            if Self::is_uninitialized_entry(offset) || Self::is_deleted(doc_id, offset, len) {
                return Ok(None);
            }
            debug!("Found with hint");
            return Ok(Some((doc_id, offset, len)));
        }
        let document_count = self.next_available_header_offset as u64;
        if document_count == 0 {
            return Ok(None);
        }
        // Jump by the id delta, clamped to the used slots.
        let most_probable_index = if doc_id < target_doc_id {
            (target_doc_id - doc_id + index).min(document_count - 1)
        } else {
            index.saturating_sub(doc_id - target_doc_id)
        };
        match self.get_at(most_probable_index)? {
            None => {
                // Out-of-range jump: fall back to walking from the hint slot.
                (index, doc_id)
            }
            Some((found_doc_id, document_offset, document_len)) => {
                if found_doc_id == target_doc_id {
                    if Self::is_uninitialized_entry(document_offset)
                        || Self::is_deleted(found_doc_id, document_offset, document_len)
                    {
                        return Ok(None);
                    }
                    debug!("Found with delta hint");
                    return Ok(Some((found_doc_id, document_offset, document_len)));
                }
                (most_probable_index, found_doc_id)
            }
        }
    } else {
        // No hint: start from whichever of the first/last used slot is
        // closer to the target by id distance.
        let document_count = self.next_available_header_offset;
        if document_count == 0 {
            return Ok(None);
        }
        let first_doc_id = self.starting_document_id;
        if document_count == 1 {
            (0, first_doc_id)
        } else {
            let last_index = (document_count - 1) as u64;
            match self.get_at(last_index)? {
                None => (0, first_doc_id),
                Some((last_doc_id, _, _)) => {
                    let distance_from_first = target_doc_id.abs_diff(first_doc_id);
                    let distance_from_last = target_doc_id.abs_diff(last_doc_id);
                    if distance_from_last < distance_from_first {
                        (last_index, last_doc_id)
                    } else {
                        (0, first_doc_id)
                    }
                }
            }
        }
    };
    // Walk direction: towards larger slots when the start id is below the
    // target, otherwise towards smaller slots.
    let delta: i32 = if starting_doc_id < target_doc_id {
        1
    } else {
        -1
    };
    let mut tries = 0;
    let mut current_index = starting_index;
    loop {
        tries += 1;
        match self.get_at(current_index)? {
            None => {
                // Walked off the index: scan a window around the start instead.
                return self.find_in_range(target_doc_id, starting_index, 50);
            }
            Some((doc_id, document_offset, document_len)) => {
                if Self::is_uninitialized_entry(document_offset)
                    || Self::is_deleted(doc_id, document_offset, document_len)
                {
                    // Skip holes, keeping the same direction.
                    let temp_current_index = current_index as i128 + delta as i128;
                    if temp_current_index < 0 {
                        break;
                    }
                    current_index = temp_current_index as u64;
                    continue;
                }
                if doc_id == target_doc_id {
                    debug!(tries = ?tries, "Found after some retries");
                    return Ok(Some((doc_id, document_offset, document_len)));
                }
                let current_delta = if doc_id < target_doc_id { 1 } else { -1 };
                // Direction flipped: we stepped past where the id should
                // sit, so it is either missing or nearby — window-scan.
                if current_delta != delta {
                    return self.find_in_range(target_doc_id, starting_index, 50);
                }
                let temp_current_index = current_index as i128 + current_delta as i128;
                if temp_current_index < 0 {
                    break;
                }
                current_index = temp_current_index as u64;
            }
        }
    }
    Ok(None)
}
/// Linearly scan index slots in `[index - delta, index + delta]` for
/// `target_doc_id`, falling back to a full scan when it is not there.
fn find_in_range(
    &self,
    target_doc_id: u64,
    index: u64,
    delta: u64,
) -> Result<Option<(u64, u32, u32)>> {
    let starting_index = index.saturating_sub(delta);
    // Clamp to the index region: slot numbers >= document_limit address
    // document data, and interpreting those bytes as index entries could
    // fabricate a bogus match. (The previous code scanned unclamped.)
    let last_slot = (self.document_limit as u64).saturating_sub(1);
    let ending_index = index.saturating_add(delta).min(last_slot);
    let mut buf = [0; 16];
    for i in starting_index..=ending_index {
        match self
            .page_file
            .read_exact_at(&mut buf, DOCUMENT_INDEX_OFFSET + (i * (4 + 4 + 8)))
        {
            Ok(_) => {}
            Err(e) => {
                if e.kind() == std::io::ErrorKind::UnexpectedEof {
                    break;
                }
                return Err(ZeboError::OperationError(e));
            }
        };
        let doc_id = u64::from_be_bytes([
            buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7],
        ]);
        let document_offset = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
        let document_len = u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]);
        if doc_id == target_doc_id {
            if Self::is_uninitialized_entry(document_offset)
                || Self::is_deleted(doc_id, document_offset, document_len)
            {
                return Ok(None);
            }
            debug!("Found in range search");
            return Ok(Some((doc_id, document_offset, document_len)));
        }
    }
    self.find_from_start(target_doc_id)
}
/// Exhaustively scan every index slot from slot 0 for `target_doc_id`.
/// Last-resort path after the hinted and windowed searches have failed.
fn find_from_start(&self, target_doc_id: u64) -> Result<Option<(u64, u32, u32)>> {
    let mut buf = [0; 16];
    // Bound the scan by the page's own slot count. The previous code used
    // a hard-coded 1_179_623, which both overshoots small pages (reading
    // document data as index entries, risking a spurious match) and stops
    // short on pages with a larger limit.
    for i in 0..self.document_limit as u64 {
        match self
            .page_file
            .read_exact_at(&mut buf, DOCUMENT_INDEX_OFFSET + (i * (4 + 4 + 8)))
        {
            Ok(_) => {
                let doc_id = u64::from_be_bytes([
                    buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7],
                ]);
                let document_offset = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
                let document_len = u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]);
                if doc_id == target_doc_id {
                    if Self::is_uninitialized_entry(document_offset)
                        || Self::is_deleted(doc_id, document_offset, document_len)
                    {
                        return Ok(None);
                    } else {
                        return Ok(Some((doc_id, document_offset, document_len)));
                    }
                }
            }
            Err(e) => {
                if e.kind() == std::io::ErrorKind::UnexpectedEof {
                    break;
                }
                return Err(ZeboError::OperationError(e));
            }
        }
    }
    Ok(None)
}
/// Flush buffered writes and fsync the page file to durable storage.
pub fn close(&mut self) -> Result<()> {
    self.page_file.flush().map_err(ZeboError::OperationError)?;
    // `sync_all` also persists file metadata (e.g. length), not just data.
    self.page_file
        .sync_all()
        .map_err(ZeboError::OperationError)
}
#[cfg_attr(coverage_nightly, coverage(off))]
pub fn debug_content_with_options(
&self,
writer: &mut dyn std::io::Write,
skip_content_checks: bool,
skip_document_content: bool,
skip_header_info: bool,
wanted_doc_id: Option<u64>,
starting_doc_id: Option<u64>,
) -> Result<()> {
let mut buf = [0; 1];
self.page_file
.read_exact_at(&mut buf, VERSION_OFFSET)
.unwrap();
let version = u8::from_be_bytes(buf);
writeln!(writer, "Version: {version}").unwrap();
let mut buf = [0; 4];
self.page_file
.read_exact_at(&mut buf, DOCUMENT_COUNT_LIMIT_OFFSET)
.unwrap();
let document_limit = u32::from_be_bytes(buf);
writeln!(writer, "Document Limit: {document_limit}").unwrap();
let mut buf = [0; 4];
self.page_file
.read_exact_at(&mut buf, DOCUMENT_COUNT_OFFSET)
.unwrap();
let document_count = u32::from_be_bytes(buf);
writeln!(writer, "Document Count: {document_count}").unwrap();
let mut buf = [0; 4];
self.page_file
.read_exact_at(&mut buf, NEXT_AVAILABLE_OFFSET)
.unwrap();
let next_available_offset = u32::from_be_bytes(buf);
writeln!(writer, "Next Available Offset: {next_available_offset}").unwrap();
let mut buf = [0; 4];
self.page_file
.read_exact_at(&mut buf, NEXT_AVAILABLE_HEADER_OFFSET)
.unwrap();
let next_available_header_offset = u32::from_be_bytes(buf);
writeln!(
writer,
"Next Available Header Offset: {next_available_header_offset}"
)
.unwrap();
let mut buf = [0; 8];
self.page_file
.read_exact_at(&mut buf, DOCUMENT_INDEX_OFFSET)
.unwrap();
let starting_document_id = u64::from_be_bytes(buf);
writeln!(writer, "Starting Document ID: {starting_document_id}").unwrap();
let mut offset = DOCUMENT_INDEX_OFFSET;
let mut doc_id = [0; 8];
let mut starting_offset = [0; 4];
let mut bytes_length = [0; 4];
let mut docs: Vec<u8> = vec![];
let mut i = -1_i128;
loop {
i += 1;
if i > self.next_available_header_offset as i128 {
break;
}
self.page_file.read_exact_at(&mut doc_id, offset).unwrap();
if doc_id == [0; 8] {
break;
}
match self
.page_file
.read_exact_at(&mut starting_offset, offset + 8)
{
Ok(_) => {}
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
break;
}
return Err(ZeboError::OperationError(e));
}
};
match self.page_file.read_exact_at(&mut bytes_length, offset + 12) {
Ok(_) => {}
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
break;
}
return Err(ZeboError::OperationError(e));
}
};
let doc_id = u64::from_be_bytes(doc_id);
offset += 8 + 4 + 4;
if let Some(wanted_doc_id) = wanted_doc_id
&& doc_id != wanted_doc_id
{
continue;
}
if let Some(starting_doc_id) = starting_doc_id
&& doc_id < starting_doc_id
{
continue;
}
let starting_offset = u32::from_be_bytes(starting_offset);
let bytes_length = u32::from_be_bytes(bytes_length);
if Self::is_uninitialized_entry(starting_offset) {
break; }
if !skip_header_info {
writeln!(
writer,
"# {i} - Document id: {doc_id}, starting_offset: {starting_offset}, bytes_length: {bytes_length}"
)
.unwrap();
}
if doc_id == u64::MAX {
break; }
if bytes_length == u32::MAX || starting_offset == u32::MAX {
if !skip_document_content {
writeln!(writer, "Document is deleted or uninitialized").unwrap();
}
} else {
if docs.len() < bytes_length as usize {
docs.resize(bytes_length as usize, 0);
}
let slice = &mut docs[0..bytes_length as usize];
if !skip_content_checks {
if let Err(e) = self.page_file.read_exact_at(slice, starting_offset as u64) {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
writeln!(writer, "Document content: [incomplete data, expected length {bytes_length} bytes]").unwrap();
continue;
}
return Err(ZeboError::OperationError(e));
}
let probable_index = ProbableIndex(doc_id - starting_document_id);
let output = self
.get_documents::<u64>(&[(doc_id, probable_index)])
.unwrap();
assert_eq!(output.len(), 1, "Document id {doc_id} not found");
let (f_doc_id, f_content) = &output[0];
assert_eq!(*f_doc_id, doc_id, "Document id mismatch");
assert_eq!(*f_content, slice, "Document content mismatch");
}
if !skip_document_content {
self.page_file
.read_exact_at(slice, starting_offset as u64)
.unwrap();
match String::from_utf8(slice.to_vec()) {
Ok(s) => {
writeln!(writer, "{s}").unwrap();
}
Err(_) => {
writeln!(
writer,
"Document content: [binary data of {} bytes]",
slice.len()
)
.unwrap();
}
}
}
}
}
Ok(())
}
}
/// Documents whose index slots and data bytes were already accounted for
/// by [`ZeboPage::reserve`]; `write_all` performs the actual serialization.
#[cfg_attr(test, derive(Debug))]
pub struct ZeboPageReservedSpace<'docs, DocId, Doc> {
    // Clone of the page's file handle.
    file: File,
    // First free data byte at reservation time (where payloads go).
    next_available_offset: u32,
    // First free index slot at reservation time.
    next_available_header_offset: u32,
    // The documents to persist.
    documents: &'docs [(DocId, Doc)],
}
impl<'docs, DocId: DocumentId, Doc: Document> ZeboPageReservedSpace<'docs, DocId, Doc> {
    /// Serialize every reserved document: build the index entries and the
    /// concatenated payload in memory, write both into the slots/offsets
    /// that were reserved, then flush and fsync.
    pub fn write_all(mut self) -> Result<()> {
        let mut all_document_bytes = vec![];
        // 16 bytes of index entry per document: id (8) + offset (4) + len (4).
        let mut document_size_per_doc = Vec::with_capacity(self.documents.len() * 16);
        let mut document_offset = self.next_available_offset;
        for (doc_id, doc) in self.documents.iter() {
            let current_bytes_len = all_document_bytes.len();
            doc.as_bytes(&mut all_document_bytes);
            let doc_bytes_len = (all_document_bytes.len() - current_bytes_len) as u32;
            // Index entry: big-endian id, this document's data offset, length.
            document_size_per_doc.extend_from_slice(&doc_id.as_u64().to_be_bytes());
            document_size_per_doc.extend_from_slice(&document_offset.to_be_bytes());
            document_size_per_doc.extend_from_slice(&doc_bytes_len.to_be_bytes());
            document_offset += doc_bytes_len;
        }
        // Widen before multiplying: `header_offset * 16` computed in u32
        // could overflow for very large pages.
        let header_pos =
            DOCUMENT_INDEX_OFFSET + (self.next_available_header_offset as u64) * 16;
        self.file
            .write_all_at(&document_size_per_doc, header_pos)
            .map_err(ZeboError::OperationError)?;
        self.file
            .write_all_at(&all_document_bytes, self.next_available_offset as u64)
            .map_err(ZeboError::OperationError)?;
        self.file.flush().map_err(ZeboError::OperationError)?;
        self.file.sync_all().map_err(ZeboError::OperationError)?;
        Ok(())
    }
}
/// In-memory snapshot of a page header, produced by [`ZeboPage::get_header`].
#[derive(Debug, PartialEq)]
pub struct ZeboPageHeader {
    pub document_limit: u32,
    pub document_count: u32,
    // First free byte of the data region.
    pub next_available_document_offset: u32,
    // First free index slot.
    pub next_available_header_offset: u32,
    // Live documents only: (doc_id, data offset, data length).
    pub index: Vec<(u64, u32, u32)>,
}
#[cfg(test)]
mod tests {
use crate::tests::prepare_test_dir;
use super::*;
// A freshly created, empty page: verifies the in-memory accessors and then
// the raw on-disk layout (version byte, limit, count, next-available
// offset, zeroed index region).
#[test]
fn test_zebo_page_check_internals_empty() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let page = ZeboPage::try_new(2, 0, zebo_page_file).unwrap();
    assert_eq!(page.document_limit, 2);
    assert_eq!(page.get_document_count().unwrap(), 0);
    let header = page.get_header().unwrap();
    assert_eq!(header.document_limit, 2);
    assert_eq!(header.document_count, 0);
    // 25-byte fixed header + 2 * 16-byte index entries = 57.
    assert_eq!(header.next_available_document_offset, 57);
    assert_eq!(header.index.len(), 0);
    drop(page);
    let file_content = std::fs::read(&file_path).unwrap();
    // Byte 0: format version.
    assert_eq!(file_content[0], Version::V1.into());
    // Bytes 1..5: document limit.
    assert_eq!(
        u32::from_be_bytes([
            file_content[1],
            file_content[2],
            file_content[3],
            file_content[4]
        ]),
        2
    );
    // Bytes 5..9: document count.
    assert_eq!(
        u32::from_be_bytes([
            file_content[5],
            file_content[6],
            file_content[7],
            file_content[8]
        ]),
        0
    );
    // Bytes 9..13: next available data offset.
    assert_eq!(
        u32::from_be_bytes([
            file_content[9],
            file_content[10],
            file_content[11],
            file_content[12]
        ]),
        DOCUMENT_INDEX_OFFSET as u32 + (4 + 4 + 8) * 2
    );
    // Both index entries must still be fully zeroed.
    for i in 0..2 {
        let offset = (DOCUMENT_INDEX_OFFSET + (i * (4 + 4 + 8))) as usize;
        assert_eq!(
            u64::from_be_bytes([
                file_content[offset],
                file_content[offset + 1],
                file_content[offset + 2],
                file_content[offset + 3],
                file_content[offset + 4],
                file_content[offset + 5],
                file_content[offset + 6],
                file_content[offset + 7]
            ]),
            0
        );
        assert_eq!(
            u32::from_be_bytes([
                file_content[offset + 8],
                file_content[offset + 9],
                file_content[offset + 10],
                file_content[offset + 11]
            ]),
            0
        );
        assert_eq!(
            u32::from_be_bytes([
                file_content[offset + 12],
                file_content[offset + 13],
                file_content[offset + 14],
                file_content[offset + 15]
            ]),
            0
        );
    }
}
// One document reserved and written: the count, next-available offset and
// the first index entry must all reflect it on disk.
#[test]
fn test_zebo_page_check_internals_add_doc() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let mut page = ZeboPage::try_new(2, 0, zebo_page_file).unwrap();
    assert_eq!(page.document_limit, 2);
    assert_eq!(page.get_document_count().unwrap(), 0);
    let header = page.get_header().unwrap();
    assert_eq!(header.document_limit, 2);
    assert_eq!(header.document_count, 0);
    assert_eq!(header.next_available_document_offset, 57);
    assert_eq!(header.index.len(), 0);
    page.reserve(&[(1_u64, "ab")]).unwrap().write_all().unwrap();
    drop(page);
    let file_content = std::fs::read(&file_path).unwrap();
    assert_eq!(file_content[0], Version::V1.into());
    assert_eq!(
        u32::from_be_bytes([
            file_content[1],
            file_content[2],
            file_content[3],
            file_content[4]
        ]),
        2
    );
    // Document count is now 1.
    assert_eq!(
        u32::from_be_bytes([
            file_content[5],
            file_content[6],
            file_content[7],
            file_content[8]
        ]),
        1
    );
    // Next available data offset advanced by the 2 payload bytes.
    assert_eq!(
        u32::from_be_bytes([
            file_content[9],
            file_content[10],
            file_content[11],
            file_content[12]
        ]),
        DOCUMENT_INDEX_OFFSET as u32 + (4 + 4 + 8) * 2 + 2
    );
    let i = 0;
    let offset = (DOCUMENT_INDEX_OFFSET + (i * (4 + 4 + 8))) as usize;
    // First index entry: id 1, data at byte 57, length 2.
    assert_eq!(
        u64::from_be_bytes([
            file_content[offset],
            file_content[offset + 1],
            file_content[offset + 2],
            file_content[offset + 3],
            file_content[offset + 4],
            file_content[offset + 5],
            file_content[offset + 6],
            file_content[offset + 7]
        ]),
        1
    );
    assert_eq!(
        u32::from_be_bytes([
            file_content[offset + 8],
            file_content[offset + 9],
            file_content[offset + 10],
            file_content[offset + 11]
        ]),
        57
    );
    assert_eq!(
        u32::from_be_bytes([
            file_content[offset + 12],
            file_content[offset + 13],
            file_content[offset + 14],
            file_content[offset + 15]
        ]),
        2
    );
}
// Insert three documents, delete the middle one with data cleaning, then
// insert a fourth: checks the raw index entries (tombstone for doc 2) and
// the data region ("ab", zeroed gap, "ef", "gh").
#[test]
fn test_zebo_page_check_internals_add_remove_add_doc() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let mut page = ZeboPage::try_new(10, 0, zebo_page_file).unwrap();
    page.reserve(&[(1_u32, "ab")]).unwrap().write_all().unwrap();
    page.reserve(&[(2_u32, "cd")]).unwrap().write_all().unwrap();
    page.reserve(&[(3_u32, "ef")]).unwrap().write_all().unwrap();
    page.delete_documents(&[(2, ProbableIndex(0))], true)
        .unwrap();
    page.reserve(&[(4_u32, "gh")]).unwrap().write_all().unwrap();
    drop(page);
    let file_content = std::fs::read(&file_path).unwrap();
    // Index entries 1..4: doc 2 tombstoned (offset/len = u32::MAX),
    // docs 3 and 4 live at data offsets 189 and 191.
    assert_eq!(
        &file_content[41..89],
        &[
            0, 0, 0, 0, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0,
            0, 3, 0, 0, 0, 189, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 191, 0, 0, 0, 2
        ]
    );
    // Remaining index slots untouched.
    assert_eq!(&file_content[89..185], &[0; 185 - 89],);
    // Data region: "ab", zeroed "cd", "ef", "gh".
    assert_eq!(&file_content[185..], "ab\0\0efgh".as_bytes(),);
}
// Non-contiguous ids (1, 5, 8, 12): lookups must succeed with exact,
// wrong and out-of-range probable indexes, return nothing for missing
// ids, and keep working after one document is deleted.
#[test]
fn test_page_get_documents_with_gaps() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let mut page = ZeboPage::try_new(10, 0, zebo_page_file).unwrap();
    page.reserve(&[(1_u32, "a")]).unwrap().write_all().unwrap();
    page.reserve(&[(5_u32, "e")]).unwrap().write_all().unwrap();
    page.reserve(&[(8_u32, "h")]).unwrap().write_all().unwrap();
    page.reserve(&[(12_u32, "l")]).unwrap().write_all().unwrap();
    // Probable indexes equal to the ids (all wrong: slots are 0..=3).
    let existing_docs = vec![
        (1, ProbableIndex(1)),
        (5, ProbableIndex(5)),
        (8, ProbableIndex(8)),
        (12, ProbableIndex(12)),
    ];
    let result = page.get_documents::<u32>(&existing_docs).unwrap();
    assert_eq!(result.len(), 4);
    let mut sorted_result = result;
    sorted_result.sort_by_key(|(id, _)| *id);
    assert_eq!(sorted_result[0], (1, b"a".to_vec()));
    assert_eq!(sorted_result[1], (5, b"e".to_vec()));
    assert_eq!(sorted_result[2], (8, b"h".to_vec()));
    assert_eq!(sorted_result[3], (12, b"l".to_vec()));
    // Ids that were never written.
    let missing_docs = vec![
        (2, ProbableIndex(2)),
        (3, ProbableIndex(3)),
        (4, ProbableIndex(4)),
        (6, ProbableIndex(6)),
        (7, ProbableIndex(7)),
        (9, ProbableIndex(9)),
        (10, ProbableIndex(10)),
        (11, ProbableIndex(11)),
        (13, ProbableIndex(13)),
        (14, ProbableIndex(14)),
    ];
    let result = page.get_documents::<u32>(&missing_docs).unwrap();
    assert_eq!(
        result.len(),
        0,
        "Should return no documents for missing IDs"
    );
    // Mix of present and absent ids.
    let mixed_docs = vec![
        (1, ProbableIndex(1)),
        (2, ProbableIndex(2)),
        (5, ProbableIndex(5)),
        (6, ProbableIndex(6)),
        (8, ProbableIndex(8)),
        (9, ProbableIndex(9)),
        (12, ProbableIndex(12)),
        (15, ProbableIndex(15)),
    ];
    let result = page.get_documents::<u32>(&mixed_docs).unwrap();
    assert_eq!(result.len(), 4, "Should return only existing documents");
    let mut sorted_result = result;
    sorted_result.sort_by_key(|(id, _)| *id);
    assert_eq!(sorted_result[0], (1, b"a".to_vec()));
    assert_eq!(sorted_result[1], (5, b"e".to_vec()));
    assert_eq!(sorted_result[2], (8, b"h".to_vec()));
    assert_eq!(sorted_result[3], (12, b"l".to_vec()));
    // Deliberately wrong hints, including one far out of range.
    let wrong_probable_docs = vec![
        (1, ProbableIndex(0)),
        (5, ProbableIndex(2)),
        (8, ProbableIndex(50)),
    ];
    let result = page.get_documents::<u32>(&wrong_probable_docs).unwrap();
    assert_eq!(
        result.len(),
        3,
        "Should find documents even with wrong ProbableIndex"
    );
    let mut sorted_result = result;
    sorted_result.sort_by_key(|(id, _)| *id);
    assert_eq!(sorted_result[0], (1, b"a".to_vec()));
    assert_eq!(sorted_result[1], (5, b"e".to_vec()));
    assert_eq!(sorted_result[2], (8, b"h".to_vec()));
    // Delete doc 5 and make sure it vanishes from lookups.
    page.delete_documents(&[(5, ProbableIndex(5))], false)
        .unwrap();
    let after_deletion_docs = vec![
        (1, ProbableIndex(1)),
        (5, ProbableIndex(5)),
        (8, ProbableIndex(8)),
        (12, ProbableIndex(12)),
    ];
    let result = page.get_documents::<u32>(&after_deletion_docs).unwrap();
    assert_eq!(
        result.len(),
        3,
        "Should return 3 documents after deleting one"
    );
    let mut sorted_result = result;
    sorted_result.sort_by_key(|(id, _)| *id);
    assert_eq!(sorted_result[0], (1, b"a".to_vec()));
    assert_eq!(sorted_result[1], (8, b"h".to_vec()));
    assert_eq!(sorted_result[2], (12, b"l".to_vec()));
    // Wrong hints + a deleted id + a missing id in one request.
    let mixed_search_docs = vec![
        (1, ProbableIndex(0)),
        (8, ProbableIndex(50)),
        (12, ProbableIndex(1)),
        (5, ProbableIndex(5)),
        (99, ProbableIndex(2)),
    ];
    let result = page.get_documents::<u32>(&mixed_search_docs).unwrap();
    assert_eq!(
        result.len(),
        3,
        "Should find 3 existing non-deleted documents"
    );
    let mut sorted_result = result;
    sorted_result.sort_by_key(|(id, _)| *id);
    assert_eq!(sorted_result[0], (1, b"a".to_vec()));
    assert_eq!(sorted_result[1], (8, b"h".to_vec()));
    assert_eq!(sorted_result[2], (12, b"l".to_vec()));
}
// Simulates a page written by an older version, where deletion overwrote
// the doc id with u64::MAX instead of keeping it: the page must still hide
// the tombstone and serve the surviving documents.
#[test]
fn test_backwards_compatible_old_deletion_format() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let mut page = ZeboPage::try_new(10, 0, zebo_page_file).unwrap();
    page.reserve(&[(1_u32, "a")]).unwrap().write_all().unwrap();
    page.reserve(&[(2_u32, "b")]).unwrap().write_all().unwrap();
    page.reserve(&[(3_u32, "c")]).unwrap().write_all().unwrap();
    // Hand-craft an old-format tombstone over doc 2 (second index slot).
    let mut old_deletion_buf = [0u8; 16];
    old_deletion_buf[0..8].copy_from_slice(&u64::MAX.to_be_bytes());
    old_deletion_buf[8..12].copy_from_slice(&u32::MAX.to_be_bytes());
    old_deletion_buf[12..16].copy_from_slice(&u32::MAX.to_be_bytes());
    page.page_file
        .write_all_at(&old_deletion_buf, DOCUMENT_INDEX_OFFSET + 16)
        .unwrap();
    // Manually decrement the on-disk count, as the old deletion path did.
    let document_count = page.get_document_count().unwrap();
    page.page_file
        .write_all_at(&(document_count - 1).to_be_bytes(), DOCUMENT_COUNT_OFFSET)
        .unwrap();
    let header = page.get_header().unwrap();
    assert_eq!(header.document_count, 2);
    assert_eq!(header.index.len(), 2);
    let doc_ids: Vec<u64> = header.index.iter().map(|(id, _, _)| *id).collect();
    assert!(doc_ids.contains(&1));
    assert!(doc_ids.contains(&3));
    assert!(!doc_ids.contains(&2));
    assert!(!doc_ids.contains(&u64::MAX));
    let result = page
        .get_documents::<u32>(&[(1, ProbableIndex(0)), (3, ProbableIndex(2))])
        .unwrap();
    assert_eq!(result.len(), 2);
    let mut sorted_result = result;
    sorted_result.sort_by_key(|(id, _)| *id);
    assert_eq!(sorted_result[0], (1, b"a".to_vec()));
    assert_eq!(sorted_result[1], (3, b"c".to_vec()));
    // The tombstoned id must not resolve.
    let deleted_result = page.get_documents::<u32>(&[(2, ProbableIndex(1))]).unwrap();
    assert_eq!(deleted_result.len(), 0);
}
#[test]
fn test_mixed_deletion_formats() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let mut page = ZeboPage::try_new(10, 0, zebo_page_file).unwrap();

    // Insert four documents into consecutive index slots.
    for (id, body) in [(1_u32, "a"), (2, "b"), (3, "c"), (4, "d")] {
        page.reserve(&[(id, body)]).unwrap().write_all().unwrap();
    }

    // Delete document 2 by writing a legacy-format tombstone (all fields
    // set to the all-ones sentinel) directly over its 16-byte index slot.
    let mut legacy_entry = [0u8; 16];
    legacy_entry[0..8].copy_from_slice(&u64::MAX.to_be_bytes());
    legacy_entry[8..12].copy_from_slice(&u32::MAX.to_be_bytes());
    legacy_entry[12..16].copy_from_slice(&u32::MAX.to_be_bytes());
    page.page_file
        .write_all_at(&legacy_entry, DOCUMENT_INDEX_OFFSET + 16)
        .unwrap();

    // Delete document 4 through the current deletion API.
    page.delete_documents(&[(4, ProbableIndex(3))], false)
        .unwrap();

    // The raw tombstone write above did not touch the counter, so the
    // header count only reflects the API-driven deletion; the parsed
    // index nevertheless skips both deleted entries.
    let header = page.get_header().unwrap();
    assert_eq!(header.document_count, 3);
    assert_eq!(header.index.len(), 2);
    let surviving_ids: Vec<u64> = header.index.iter().map(|(id, _, _)| *id).collect();
    assert!(surviving_ids.contains(&1));
    assert!(surviving_ids.contains(&3));
    assert!(!surviving_ids.contains(&2));
    assert!(!surviving_ids.contains(&4));

    // Requesting all four ids must return only the two survivors.
    let mut fetched = page
        .get_documents::<u32>(&[
            (1, ProbableIndex(0)),
            (2, ProbableIndex(1)),
            (3, ProbableIndex(2)),
            (4, ProbableIndex(3)),
        ])
        .unwrap();
    assert_eq!(fetched.len(), 2);
    fetched.sort_by_key(|(id, _)| *id);
    assert_eq!(fetched[0], (1, b"a".to_vec()));
    assert_eq!(fetched[1], (3, b"c".to_vec()));
}
#[test]
fn test_deletion_detection_helper() {
    // Entries whose offset and length both carry the all-ones sentinel are
    // deleted, regardless of the stored document id.
    assert!(ZeboPage::is_deleted(u64::MAX, u32::MAX, u32::MAX));
    assert!(ZeboPage::is_deleted(1, u32::MAX, u32::MAX));
    assert!(ZeboPage::is_deleted(12345, u32::MAX, u32::MAX));
    // Legacy tombstones: sentinel id combined with sentinel offset also
    // counts as deleted even when the length field differs.
    assert!(ZeboPage::is_deleted(u64::MAX, u32::MAX, 0));
    // Live entries are never mistaken for deletions…
    assert!(!ZeboPage::is_deleted(1, 100, 50));
    assert!(!ZeboPage::is_deleted(u64::MAX, 100, 50));
    assert!(!ZeboPage::is_deleted(1, 0, 0));
    // …even when only one of offset/length happens to be the sentinel.
    assert!(!ZeboPage::is_deleted(1, u32::MAX, 50));
    assert!(!ZeboPage::is_deleted(1, 100, u32::MAX));
}
#[test]
fn test_uninitialized_entry_detection() {
    // A zero offset is the one and only marker of a never-written slot.
    assert!(ZeboPage::is_uninitialized_entry(0));
    // Any non-zero offset — including the deletion sentinel — is initialized.
    for offset in [1, 57, 100, u32::MAX] {
        assert!(!ZeboPage::is_uninitialized_entry(offset));
    }
}
#[test]
fn test_fallback_search_optimization_start_from_end() {
    use crate::tests::prepare_test_dir;
    let _ = tracing_subscriber::fmt::try_init();
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("optimization_test_page.zebo");
    let page_file = std::fs::File::options()
        .create(true)
        .truncate(true)
        .read(true)
        .write(true)
        .open(&file_path)
        .expect("Failed to create page file");
    let mut page = ZeboPage::try_new(10, 1000, page_file).expect("Failed to create page");

    // Fill every slot with ids that leave large gaps between them.
    for id in [100_u32, 200, 300, 400, 500, 600, 700, 800, 900, 950] {
        page.reserve(&[(id, "xx")]).unwrap().write_all().unwrap();
    }

    // The last and first stored ids are both found by the fallback scan.
    let hit = page.fallback_search_document(950, None).expect("Search failed");
    assert_eq!(hit.map(|entry| entry.0), Some(950));
    let hit = page.fallback_search_document(100, None).expect("Search failed");
    assert_eq!(hit.map(|entry| entry.0), Some(100));

    // Ids that fall inside the gaps must come back empty.
    let miss = page.fallback_search_document(940, None).expect("Search failed");
    assert!(miss.is_none());
    let miss = page.fallback_search_document(150, None).expect("Search failed");
    assert!(miss.is_none());
}
#[test]
fn test_fallback_search_bug_with_deleted_target_id() {
    use crate::tests::prepare_test_dir;
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("bug_test_page.zebo");
    let page_file = std::fs::File::options()
        .create(true)
        .truncate(true)
        .read(true)
        .write(true)
        .open(&file_path)
        .expect("Failed to create page file");
    let mut page = ZeboPage::try_new(10, 100, page_file).expect("Failed to create page");

    // Three documents; 105 lands in slot 2.
    for (id, body) in [(101_u32, "aa"), (102, "bb"), (105, "ee")] {
        page.reserve(&[(id, body)]).unwrap().write_all().unwrap();
    }

    // Corrupt slot 1 (at byte offset 16 into the index): it claims the
    // target id 105 but carries the deletion sentinel in offset and length.
    let mut bogus_entry = [0u8; 16];
    bogus_entry[0..8].copy_from_slice(&105_u64.to_be_bytes());
    bogus_entry[8..12].copy_from_slice(&u32::MAX.to_be_bytes());
    bogus_entry[12..16].copy_from_slice(&u32::MAX.to_be_bytes());
    page.page_file
        .write_all_at(&bogus_entry, DOCUMENT_INDEX_OFFSET + 16)
        .expect("Failed to write corrupted entry");

    // Even with a wildly wrong probable index, document 105 must still be
    // resolved through its genuine entry rather than the corrupted one.
    let result = page
        .get_documents::<u64>(&[(105, ProbableIndex(999))])
        .expect("Get documents failed");
    assert_eq!(result.len(), 1);
    assert_eq!(result[0].0, 105);
    assert_eq!(result[0].1, b"ee".to_vec());
}
#[test]
fn test_fallback_search_wrong_document_order() {
    use crate::tests::prepare_test_dir;
    let _ = tracing_subscriber::fmt::try_init();
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("wrong_document_order.zebo");
    let page_file = std::fs::File::options()
        .create(true)
        .truncate(true)
        .read(true)
        .write(true)
        .open(&file_path)
        .expect("Failed to create page file");
    let mut page = ZeboPage::try_new(10, 1000, page_file).expect("Failed to create page");

    // Insert out of id order: document 4 occupies slot 2, document 3 slot 3.
    for (id, body) in [
        (1_u32, "hello"),
        (2, "world"),
        (4, "aaaaa"),
        (3, "bbbbb"),
        (5, "ccccc"),
    ] {
        page.reserve(&[(id, body)]).unwrap().write_all().unwrap();
    }

    // With correct probable indices every document resolves to its payload.
    let docs = page
        .get_documents::<u64>(&[
            (1, ProbableIndex(0)),
            (2, ProbableIndex(1)),
            (3, ProbableIndex(3)),
            (4, ProbableIndex(2)),
            (5, ProbableIndex(4)),
        ])
        .unwrap();
    assert_eq!(docs.len(), 5);
    assert_eq!(
        docs,
        vec![
            (1, b"hello".to_vec()),
            (2, b"world".to_vec()),
            (3, b"bbbbb".to_vec()),
            (4, b"aaaaa".to_vec()),
            (5, b"ccccc".to_vec()),
        ]
    );

    // Every id is also reachable through the fallback scan alone.
    for doc_id in 1..=5 {
        let found = page.fallback_search_document(doc_id, None).unwrap();
        assert!(found.is_some());
    }
}
/// Same out-of-order layout as `test_fallback_search_wrong_document_order`,
/// but exercises `find_from_start` (a scan beginning at the initial slot)
/// instead of `fallback_search_document`.
#[test]
fn test_fallback_search_start_from_initial() {
    use crate::tests::prepare_test_dir;
    let _ = tracing_subscriber::fmt::try_init();
    let test_dir = prepare_test_dir();
    // Fix: this test previously reused "wrong_document_order.zebo" — a
    // copy-paste from the test above — which could make the two tests
    // clobber each other's file if the test directory is shared.
    let file_path = test_dir.join("fallback_search_start_from_initial.zebo");
    let page_file = std::fs::File::options()
        .create(true)
        .truncate(true)
        .read(true)
        .write(true)
        .open(&file_path)
        .expect("Failed to create page file");
    let mut page = ZeboPage::try_new(10, 1000, page_file).expect("Failed to create page");

    // Insert out of id order: document 4 occupies slot 2, document 3 slot 3.
    for (id, body) in [
        (1_u32, "hello"),
        (2, "world"),
        (4, "aaaaa"),
        (3, "bbbbb"),
        (5, "ccccc"),
    ] {
        page.reserve(&[(id, body)]).unwrap().write_all().unwrap();
    }

    // Sanity check: with correct probable indices every payload resolves.
    let docs = page
        .get_documents::<u64>(&[
            (1, ProbableIndex(0)),
            (2, ProbableIndex(1)),
            (3, ProbableIndex(3)),
            (4, ProbableIndex(2)),
            (5, ProbableIndex(4)),
        ])
        .unwrap();
    assert_eq!(docs.len(), 5);
    assert_eq!(docs[0], (1, b"hello".to_vec()));
    assert_eq!(docs[1], (2, b"world".to_vec()));
    assert_eq!(docs[2], (3, b"bbbbb".to_vec()));
    assert_eq!(docs[3], (4, b"aaaaa".to_vec()));
    assert_eq!(docs[4], (5, b"ccccc".to_vec()));

    // Every id must also be discoverable by scanning from the first slot.
    for doc_id in 1..=5 {
        let found = page.find_from_start(doc_id).unwrap();
        assert!(found.is_some());
    }
}
}