use std::{io::Write, os::unix::fs::FileExt};
use tracing::{debug, error, instrument};
use crate::{DocumentId, Result, Version, ZeboError, index::ProbableIndex};
// On-disk page header layout. All multi-byte integers are big-endian.
// The fixed header is followed by `document_limit` index entries of
// 16 bytes each (u64 doc id, u32 offset, u32 len), then raw payloads.

/// Byte offset of the 1-byte format version.
pub const VERSION_OFFSET: u64 = 0;
/// Byte offset of the u32 maximum number of documents the page may hold.
pub const DOCUMENT_COUNT_LIMIT_OFFSET: u64 = VERSION_OFFSET + 1;
/// Byte offset of the u32 count of live (non-deleted) documents.
pub const DOCUMENT_COUNT_OFFSET: u64 = DOCUMENT_COUNT_LIMIT_OFFSET + 4;
/// Byte offset of the u32 file offset where the next document body goes.
pub const NEXT_AVAILABLE_OFFSET: u64 = DOCUMENT_COUNT_OFFSET + 4;
/// Byte offset of the u32 index of the next free header (index) slot.
pub const NEXT_AVAILABLE_HEADER_OFFSET: u64 = NEXT_AVAILABLE_OFFSET + 4;
/// Byte offset of the u64 first document id assigned to this page.
pub const STARTING_DOCUMENT_ID_OFFSET: u64 = NEXT_AVAILABLE_HEADER_OFFSET + 4;
/// Byte offset where the fixed-size document index begins.
pub const DOCUMENT_INDEX_OFFSET: u64 = STARTING_DOCUMENT_ID_OFFSET + 8;
/// A single fixed-capacity page of the store, backed by one file.
///
/// All reads and writes use positioned I/O (`FileExt`), so the file's
/// cursor is never relied upon.
pub struct ZeboPage {
    /// Maximum number of documents this page can index.
    document_limit: u32,
    /// Smallest document id assigned to this page; used to guess index slots.
    #[allow(dead_code)]
    pub(crate) starting_document_id: u64,
    /// Backing page file.
    page_file: std::fs::File,
    /// Next free index slot (in-memory mirror of the on-disk field at
    /// `NEXT_AVAILABLE_HEADER_OFFSET`).
    next_available_header_offset: u32,
}
impl ZeboPage {
/// Creates and initializes a brand-new page file.
///
/// Sizes the file to the fixed header plus `document_limit` index entries
/// (16 bytes each), then writes the version byte, the document limit, a
/// zero live-document count, the initial data offset (immediately after
/// the index region) and the starting document id, and fsyncs.
///
/// The on-disk next-free-slot field is not written explicitly: `set_len`
/// zero-fills the grown region, which matches the in-memory
/// `next_available_header_offset: 0` below.
pub fn try_new(
    document_limit: u32,
    starting_document_id: u64,
    mut page_file: std::fs::File,
) -> Result<Self> {
    // 16 bytes per index entry: u32 offset + u32 len + u64 doc id.
    let document_header_size = (4 + 4 + 8) * (document_limit as u64);
    page_file
        .set_len(DOCUMENT_INDEX_OFFSET + document_header_size)
        .map_err(ZeboError::OperationError)?;
    page_file
        .write_all_at(&[Version::V1.into()], VERSION_OFFSET)
        .map_err(ZeboError::OperationError)?;
    page_file
        .write_all_at(&document_limit.to_be_bytes(), DOCUMENT_COUNT_LIMIT_OFFSET)
        .map_err(ZeboError::OperationError)?;
    page_file
        .write_all_at(&[0; 4], DOCUMENT_COUNT_OFFSET)
        .map_err(ZeboError::OperationError)?;
    // First document body starts right after the index region.
    let initial_available_offset = (DOCUMENT_INDEX_OFFSET + document_header_size) as u32;
    page_file
        .write_all_at(
            &initial_available_offset.to_be_bytes(),
            NEXT_AVAILABLE_OFFSET,
        )
        .map_err(ZeboError::OperationError)?;
    page_file
        .write_all_at(
            &starting_document_id.to_be_bytes(),
            STARTING_DOCUMENT_ID_OFFSET,
        )
        .map_err(ZeboError::OperationError)?;
    page_file.flush().map_err(ZeboError::OperationError)?;
    page_file.sync_all().map_err(ZeboError::OperationError)?;
    Ok(Self {
        document_limit,
        starting_document_id,
        page_file,
        next_available_header_offset: 0,
    })
}
/// Opens an existing page file, validating the format version and reading
/// the persisted header fields back into memory.
///
/// Returns `UnsupportedVersion` if the on-disk version byte is not V1.
pub fn try_load(page_file: std::fs::File) -> Result<Self> {
    let mut buf = [0; 1];
    page_file
        .read_exact_at(&mut buf, VERSION_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let version = buf[0];
    if version != Version::V1.into() {
        return Err(ZeboError::UnsupportedVersion {
            version,
            wanted: Version::V1.into(),
        });
    }
    let mut buf = [0; 4];
    page_file
        .read_exact_at(&mut buf, DOCUMENT_COUNT_LIMIT_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let document_limit = u32::from_be_bytes(buf);
    // Reuses `buf`: the next-free-slot field is also 4 bytes.
    page_file
        .read_exact_at(&mut buf, NEXT_AVAILABLE_HEADER_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let next_available_header_offset = u32::from_be_bytes(buf);
    let mut buf = [0; 8];
    page_file
        .read_exact_at(&mut buf, STARTING_DOCUMENT_ID_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let starting_document_id = u64::from_be_bytes(buf);
    Ok(Self {
        page_file,
        document_limit,
        starting_document_id,
        next_available_header_offset,
    })
}
/// Reads the live (non-deleted) document count from the page header.
pub fn get_document_count(&self) -> Result<u32> {
    let mut raw = [0u8; 4];
    self.page_file
        .read_exact_at(&mut raw, DOCUMENT_COUNT_OFFSET)
        .map_err(ZeboError::OperationError)?;
    Ok(u32::from_be_bytes(raw))
}
/// Reads the file offset where the next document body will be written.
fn get_next_available_offset(&self) -> Result<u32> {
    let mut raw = [0u8; 4];
    self.page_file
        .read_exact_at(&mut raw, NEXT_AVAILABLE_OFFSET)
        .map_err(ZeboError::OperationError)?;
    Ok(u32::from_be_bytes(raw))
}
/// Size in bytes of the backing page file, as reported by the filesystem.
pub fn current_file_size(&self) -> Result<u64> {
    self.page_file
        .metadata()
        .map(|meta| meta.len())
        .map_err(ZeboError::OperationError)
}
/// Returns `true` when an index entry marks a deleted document.
///
/// Two on-disk tombstone formats are recognized:
/// - current format: offset and length are both `u32::MAX` (doc id kept);
/// - legacy format: doc id is `u64::MAX` together with offset `u32::MAX`,
///   regardless of length.
#[inline]
fn is_deleted(doc_id: u64, document_offset: u32, document_len: u32) -> bool {
    let current_format = document_offset == u32::MAX && document_len == u32::MAX;
    let legacy_format = doc_id == u64::MAX && document_offset == u32::MAX;
    current_format || legacy_format
}
/// An index slot whose offset is still zero was never written: the header
/// region is zero-filled at creation, and no real document can start at
/// offset 0 (the page header occupies it).
#[inline]
fn is_uninitialized_entry(document_offset: u32) -> bool {
    matches!(document_offset, 0)
}
/// Reads the page header and collects the index entries of all live
/// (written, non-deleted) documents, in slot order.
///
/// Walks index slots sequentially, skipping tombstones, until either
/// `document_count` live entries were found, an uninitialized slot is hit
/// (slots are filled in order, so nothing can follow it), or the slot
/// index runs out of range.
pub fn get_header(&self) -> Result<ZeboPageHeader> {
    let document_count = self.get_document_count()?;
    let next_available_offset = self.get_next_available_offset()?;
    let mut doc_index = Vec::with_capacity(document_count as usize);
    let mut found = 0;
    let mut i: u64 = 0;
    while found < document_count {
        // Valid slot indexes are 0..document_limit. The previous `>` test
        // allowed one probe at index == document_limit, which lies past the
        // header region and inside document data.
        if i >= (self.document_limit as u64) {
            break;
        }
        if let Some((doc_id, document_offset, document_len)) = self.get_at(i)? {
            if Self::is_uninitialized_entry(document_offset) {
                break;
            }
            if Self::is_deleted(doc_id, document_offset, document_len) {
                i += 1;
                continue;
            }
            doc_index.push((doc_id, document_offset, document_len));
            found += 1;
        }
        i += 1;
    }
    let header = ZeboPageHeader {
        document_limit: self.document_limit,
        document_count,
        next_available_offset,
        index: doc_index,
    };
    Ok(header)
}
/// Fetches the bodies of the requested documents.
///
/// For each `(doc_id, probable_index)` pair the guessed index slot is
/// tried first; on a miss (slot out of range, deleted, uninitialized, or
/// holding a different id) the lookup falls back to a directed search via
/// `fallback_search_document`, using the slot's contents as a positional
/// hint when usable. Documents that cannot be found are silently omitted,
/// so the result may be shorter than the input.
pub fn get_documents<DocId: DocumentId>(
    &self,
    doc_id_with_index: &[(u64, ProbableIndex)],
) -> Result<Vec<(DocId, Vec<u8>)>> {
    let mut r = Vec::with_capacity(doc_id_with_index.len());
    for (doc_id, probable_index) in doc_id_with_index {
        // Fast path: peek at the guessed slot if it is in range.
        let data_at_probable_index = if probable_index.0 < self.document_limit as u64 {
            match self.get_at(probable_index.0)? {
                Some((found_id, document_offset, document_len)) => {
                    if Self::is_deleted(found_id, document_offset, document_len)
                        || Self::is_uninitialized_entry(document_offset)
                    {
                        // Miss; keep the raw slot contents — filtered below.
                        Some((found_id, document_offset, document_len))
                    } else if found_id == *doc_id {
                        // Direct hit: read the body and move to the next id.
                        let mut doc_buf = vec![0; document_len as usize];
                        if document_len > 0 {
                            self.page_file
                                .read_exact_at(&mut doc_buf, document_offset as u64)
                                .map_err(ZeboError::OperationError)?;
                        }
                        debug!("Found with probable index");
                        r.push((DocId::from_u64(*doc_id), doc_buf));
                        continue;
                    } else {
                        // A different live document occupies this slot; it
                        // still tells us where we are in the index.
                        Some((found_id, document_offset, document_len))
                    }
                }
                None => None,
            }
        } else {
            None
        };
        // Deleted / uninitialized slots carry no positional information,
        // so drop them before handing the hint to the fallback search.
        let data_at_probable_index =
            data_at_probable_index.and_then(|(found_id, document_offset, document_len)| {
                if Self::is_uninitialized_entry(document_offset)
                    || Self::is_deleted(found_id, document_offset, document_len)
                {
                    return None;
                }
                Some((probable_index.0, (found_id, document_offset, document_len)))
            });
        // Slow path: directed scan of the index.
        if let Some((_, document_offset, document_len)) =
            self.fallback_search_document(*doc_id, data_at_probable_index)?
        {
            let mut doc_buf = vec![0; document_len as usize];
            if document_len > 0 {
                self.page_file
                    .read_exact_at(&mut doc_buf, document_offset as u64)
                    .map_err(ZeboError::OperationError)?;
            }
            r.push((DocId::from_u64(*doc_id), doc_buf));
        }
    }
    Ok(r)
}
/// Reserves `len` bytes for document `doc_id` and records its index entry.
///
/// Persists, in order: the bumped next-free index slot, the bumped
/// next-free data offset, the incremented document count, and the new
/// 16-byte index entry. The caller completes the operation by writing the
/// body through the returned [`ZeboReservedSpace`]. No fsync happens here.
///
/// NOTE(review): nothing here checks the new slot against `document_limit`
/// or the new data offset for u32 overflow — presumably callers enforce
/// page capacity before reserving; verify against the call sites.
pub fn reserve_space(&mut self, doc_id: u64, len: u32) -> Result<ZeboReservedSpace<'_>> {
    let next_available_offset = self.get_next_available_offset()?;
    let document_count = self.get_document_count()?;
    let available_header_offset = self.next_available_header_offset;
    {
        // Claim the index slot, both in memory and on disk.
        self.next_available_header_offset += 1;
        let buf = self.next_available_header_offset.to_be_bytes();
        self.page_file
            .write_all_at(&buf, NEXT_AVAILABLE_HEADER_OFFSET)
            .map_err(ZeboError::OperationError)?;
    }
    {
        // Advance the data cursor past the reserved bytes.
        let next_available_offset = next_available_offset + len;
        let buf = next_available_offset.to_be_bytes();
        self.page_file
            .write_all_at(&buf, NEXT_AVAILABLE_OFFSET)
            .map_err(ZeboError::OperationError)?;
    }
    {
        let document_count = document_count + 1;
        let buf = document_count.to_be_bytes();
        self.page_file
            .write_all_at(&buf, DOCUMENT_COUNT_OFFSET)
            .map_err(ZeboError::OperationError)?;
    }
    {
        // Write the index entry: doc id, body offset, body length.
        let document_offset = next_available_offset;
        let mut buf = [0; 16];
        buf[0..8].copy_from_slice(&doc_id.to_be_bytes());
        buf[8..12].copy_from_slice(&document_offset.to_be_bytes());
        buf[12..16].copy_from_slice(&len.to_be_bytes());
        self.page_file
            .write_all_at(
                &buf,
                DOCUMENT_INDEX_OFFSET + (available_header_offset * (4 + 4 + 8)) as u64,
            )
            .map_err(ZeboError::OperationError)?;
    }
    Ok(ZeboReservedSpace {
        page: self,
        document_offset: next_available_offset,
        len,
    })
}
/// Tombstones the given documents and, optionally, zeroes their bodies.
///
/// With `clean_data`, each target's payload bytes are overwritten with
/// zeros before its index entry is touched. Every index slot is then
/// scanned: live entries whose id matches are rewritten as
/// `(doc_id, u32::MAX, u32::MAX)` tombstones — the id is preserved so the
/// slot keeps working as a landmark for the fallback search. The live
/// document count is decremented by the number of entries tombstoned, and
/// the file is flushed and fsynced.
///
/// Returns how many entries were actually tombstoned.
pub fn delete_documents(
    &mut self,
    documents_to_delete: &[(u64, ProbableIndex)],
    clean_data: bool,
) -> Result<u32> {
    if clean_data {
        let header = self.get_header()?;
        // Reusable zero buffer, grown to the largest body seen so far.
        let mut v: Vec<u8> = vec![];
        for (doc_id, _) in documents_to_delete {
            let found = header.index.iter().find(|(d, _, _)| d == doc_id);
            if let Some((_, document_offset, document_len)) = found {
                let len = *document_len as usize;
                if v.len() < len {
                    v.resize(len, 0);
                }
                self.page_file
                    .write_all_at(&v[0..len], *document_offset as u64)
                    .map_err(ZeboError::OperationError)?;
            }
        }
    }
    let mut found = 0_u32;
    let mut buf = [0; 16];
    for i in 0..self.document_limit {
        let (doc_id, offset, _) = match self.get_at(i as u64)? {
            Some(x) => x,
            None => continue,
        };
        // Never-written slot: nothing to delete there.
        if Self::is_uninitialized_entry(offset) {
            continue;
        }
        // Already a tombstone (either format): don't count it again.
        if offset == u32::MAX {
            continue;
        }
        if documents_to_delete.iter().any(|(d, _)| *d == doc_id) {
            // Keep the doc id, poison offset and length.
            buf[0..8].copy_from_slice(&doc_id.to_be_bytes());
            buf[8..12].copy_from_slice(&u32::MAX.to_be_bytes());
            buf[12..16].copy_from_slice(&u32::MAX.to_be_bytes());
            self.page_file
                .write_all_at(&buf, DOCUMENT_INDEX_OFFSET + (i * (4 + 4 + 8)) as u64)
                .map_err(ZeboError::OperationError)?;
            found += 1;
        }
    }
    if found > 0 {
        let document_count = self.get_document_count()?;
        let new_document_count = document_count - found;
        self.page_file
            .write_all_at(&new_document_count.to_be_bytes(), DOCUMENT_COUNT_OFFSET)
            .map_err(ZeboError::OperationError)?;
    }
    self.page_file.flush().map_err(ZeboError::OperationError)?;
    self.page_file
        .sync_all()
        .map_err(ZeboError::OperationError)?;
    Ok(found)
}
/// Reads the 16-byte index entry stored in slot `document_index`.
///
/// Returns `Ok(None)` when the slot index is out of range or the read
/// would run past end-of-file; otherwise yields
/// `(doc_id, document_offset, document_len)` decoded big-endian.
fn get_at(&self, document_index: u64) -> Result<Option<(u64, u32, u32)>> {
    // Valid slots are 0..document_limit. The previous `<` comparison let
    // `document_index == document_limit` through, which reads the first
    // 16 bytes of the document *data* region as if they were an index
    // entry.
    if document_index >= (self.document_limit as u64) {
        return Ok(None);
    }
    let mut buf = [0; 16];
    if let Err(e) = self.page_file.read_exact_at(
        &mut buf,
        DOCUMENT_INDEX_OFFSET + (document_index * (4 + 4 + 8)),
    ) {
        if e.kind() == std::io::ErrorKind::UnexpectedEof {
            // Slot lies past the current file length.
            return Ok(None);
        }
        return Err(ZeboError::OperationError(e));
    }
    let doc_id = u64::from_be_bytes([
        buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7],
    ]);
    let document_offset = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
    let document_len = u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]);
    Ok(Some((doc_id, document_offset, document_len)))
}
/// Locates `target_doc_id`'s index entry when the probable slot missed.
///
/// `hint_data` is `(slot_index, (doc_id, offset, len))` of whatever live
/// entry was found at the probable slot. Strategy:
/// 1. if the hint entry already matches the target id, use it directly;
/// 2. otherwise guess a new slot assuming ids are roughly contiguous
///    (shift the hint's slot by the id distance, clamped to the index);
/// 3. without a hint, start from whichever end of the index (first or
///    last entry) is numerically closer to the target id;
/// 4. walk slot-by-slot toward the target; if the required direction ever
///    flips relative to the initial one, ids are not monotonic around
///    here, so fall back to a ±50-slot range scan around the start.
///
/// Returns `Ok(None)` when the document is absent, deleted, or the walk
/// runs off the initialized part of the index.
#[instrument(skip(self, hint_data), fields(target_doc_id = target_doc_id))]
fn fallback_search_document(
    &self,
    target_doc_id: u64,
    hint_data: Option<(u64, (u64, u32, u32))>,
) -> Result<Option<(u64, u32, u32)>> {
    let (starting_index, starting_doc_id) = if let Some((index, (doc_id, offset, len))) =
        hint_data
    {
        if doc_id == target_doc_id {
            if Self::is_uninitialized_entry(offset) || Self::is_deleted(doc_id, offset, len) {
                return Ok(None);
            }
            debug!("Found with hint");
            return Ok(Some((doc_id, offset, len)));
        }
        // Delta guess: assume one slot per id between hint and target.
        // (document_count >= 1 here, since a live hint entry exists.)
        let document_count = self.get_document_count()? as u64;
        let most_probable_index = if doc_id < target_doc_id {
            (target_doc_id - doc_id + index).min(document_count - 1)
        } else {
            index.saturating_sub(doc_id - target_doc_id)
        };
        match self.get_at(most_probable_index)? {
            None => {
                // Guess unreadable: fall back to walking from the hint.
                (index, doc_id)
            }
            Some((found_doc_id, document_offset, document_len)) => {
                if found_doc_id == target_doc_id {
                    if Self::is_uninitialized_entry(document_offset)
                        || Self::is_deleted(found_doc_id, document_offset, document_len)
                    {
                        return Ok(None);
                    }
                    debug!("Found with delta hint");
                    return Ok(Some((found_doc_id, document_offset, document_len)));
                }
                (most_probable_index, found_doc_id)
            }
        }
    } else {
        // No hint: pick the cheaper end of the index to start from.
        let document_count = self.get_document_count()?;
        if document_count == 0 {
            return Ok(None);
        }
        let first_doc_id = self.starting_document_id;
        if document_count == 1 {
            (0, first_doc_id)
        } else {
            let last_index = (document_count - 1) as u64;
            match self.get_at(last_index)? {
                None => (0, first_doc_id),
                Some((last_doc_id, _, _)) => {
                    let distance_from_first = target_doc_id.abs_diff(first_doc_id);
                    let distance_from_last = target_doc_id.abs_diff(last_doc_id);
                    if distance_from_last < distance_from_first {
                        (last_index, last_doc_id)
                    } else {
                        (0, first_doc_id)
                    }
                }
            }
        }
    };
    // Walk direction implied by the starting entry's id vs the target.
    let delta: i32 = if starting_doc_id < target_doc_id {
        1
    } else {
        -1
    };
    let mut tries = 0;
    let mut current_index = starting_index;
    loop {
        tries += 1;
        match self.get_at(current_index)? {
            None => {
                // Walked off the index: the document is not on this page.
                return Ok(None);
            }
            Some((doc_id, document_offset, document_len)) => {
                if Self::is_uninitialized_entry(document_offset)
                    || Self::is_deleted(doc_id, document_offset, document_len)
                {
                    // Skip holes; i128 arithmetic guards the 0 - 1 case.
                    let temp_current_index = current_index as i128 + delta as i128;
                    if temp_current_index < 0 {
                        break;
                    }
                    current_index = temp_current_index as u64;
                    continue;
                }
                if doc_id == target_doc_id {
                    debug!(tries = ?tries, "Found after some retries");
                    return Ok(Some((doc_id, document_offset, document_len)));
                }
                let current_delta = if doc_id < target_doc_id { 1 } else { -1 };
                if current_delta != delta {
                    // Direction flipped: ids are not monotonic here, so do
                    // a bounded range scan instead of oscillating.
                    return self.find_in_range(target_doc_id, starting_index, 50);
                }
                let temp_current_index = current_index as i128 + current_delta as i128;
                if temp_current_index < 0 {
                    break;
                }
                current_index = temp_current_index as u64;
            }
        }
    }
    Ok(None)
}
/// Linearly scans index slots in `[index - delta, index + delta]` looking
/// for `target_doc_id`.
///
/// Last-resort path used when the directed walk in
/// `fallback_search_document` detects non-monotonic ids. Read errors on
/// individual slots are logged and skipped; an out-of-range/EOF slot ends
/// the scan. A deleted or never-written target counts as "not found".
fn find_in_range(
    &self,
    target_doc_id: u64,
    index: u64,
    delta: u64,
) -> Result<Option<(u64, u32, u32)>> {
    let lo = index.saturating_sub(delta);
    let hi = index + delta;
    let mut slot = lo;
    while slot <= hi {
        match self.get_at(slot) {
            Ok(None) => break,
            Ok(Some(entry)) => {
                let (doc_id, document_offset, document_len) = entry;
                if doc_id == target_doc_id {
                    if Self::is_uninitialized_entry(document_offset)
                        || Self::is_deleted(doc_id, document_offset, document_len)
                    {
                        return Ok(None);
                    }
                    debug!("Found in range search");
                    return Ok(Some(entry));
                }
            }
            Err(e) => {
                error!("Error during range search: {:?}", e);
            }
        }
        slot += 1;
    }
    Ok(None)
}
/// Flushes buffered writes and fsyncs the page file to durable storage.
pub fn close(&mut self) -> Result<()> {
    self.page_file.flush().map_err(ZeboError::OperationError)?;
    self.page_file
        .sync_all()
        .map_err(ZeboError::OperationError)
}
/// Reserves space for a whole batch of documents in one shot.
///
/// `len` is the number of `(doc_id, byte_len)` pairs `docs` yields; it is
/// used to pre-claim that many index slots and size the staging buffer.
/// All index entries are staged in memory and written with a single
/// positioned write, then the next-slot, next-offset and document-count
/// header fields are updated. The returned handle writes all bodies as
/// one contiguous blob starting at the first reserved offset.
///
/// NOTE(review): as with `reserve_space`, the new slot count is not
/// checked against `document_limit`, and nothing fsyncs here — presumably
/// the caller enforces capacity and syncs later; verify.
pub fn reserve_multiple_space<'a, I: Iterator<Item = &'a (u64, u32)>>(
    &mut self,
    len: usize,
    docs: I,
) -> Result<ZeboMultiReservedSpace<'_>> {
    let mut next_available_offset = self.get_next_available_offset()?;
    let mut document_count = self.get_document_count()?;
    let mut available_header_offset = self.next_available_header_offset;
    let initial_available_header_offset = available_header_offset;
    self.next_available_header_offset += len as u32;
    let mut total_len = 0;
    let initial_next_available_offset = next_available_offset;
    // One 16-byte entry per document, staged then written in one call.
    let mut index_buf = Vec::with_capacity(len * 16);
    for &(doc_id, len) in docs {
        let mut buf = [0u8; 16];
        buf[0..8].copy_from_slice(&doc_id.to_be_bytes());
        buf[8..12].copy_from_slice(&next_available_offset.to_be_bytes());
        buf[12..16].copy_from_slice(&len.to_be_bytes());
        index_buf.extend_from_slice(&buf);
        total_len += len;
        // checked_add: data offsets are u32, so overflow means the page
        // physically cannot hold the allocation.
        next_available_offset =
            next_available_offset
                .checked_add(len)
                .ok_or(ZeboError::NotEnoughSpace {
                    limit: u32::MAX,
                    new_allocation_requested: len,
                })?;
        available_header_offset =
            available_header_offset
                .checked_add(1)
                .ok_or(ZeboError::NotEnoughSpace {
                    limit: u32::MAX,
                    new_allocation_requested: 1,
                })?;
        document_count = document_count
            .checked_add(1)
            .ok_or(ZeboError::NotEnoughSpace {
                limit: u32::MAX,
                new_allocation_requested: 1,
            })?;
    }
    let start_index_offset =
        DOCUMENT_INDEX_OFFSET + (initial_available_header_offset as u64) * 16;
    self.page_file
        .write_all_at(&index_buf, start_index_offset)
        .map_err(ZeboError::OperationError)?;
    let buf = self.next_available_header_offset.to_be_bytes();
    self.page_file
        .write_all_at(&buf, NEXT_AVAILABLE_HEADER_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let buf = next_available_offset.to_be_bytes();
    self.page_file
        .write_all_at(&buf, NEXT_AVAILABLE_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let buf = document_count.to_be_bytes();
    self.page_file
        .write_all_at(&buf, DOCUMENT_COUNT_OFFSET)
        .map_err(ZeboError::OperationError)?;
    Ok(ZeboMultiReservedSpace {
        page: self,
        len: total_len,
        document_offset: initial_next_available_offset,
    })
}
pub fn debug_content_with_options(
&self,
writer: &mut dyn std::io::Write,
skip_content_checks: bool,
skip_document_content: bool,
skip_header_info: bool,
wanted_doc_id: Option<u64>,
starting_doc_id: Option<u64>,
) -> Result<()> {
let mut buf = [0; 1];
self.page_file
.read_exact_at(&mut buf, VERSION_OFFSET)
.unwrap();
let version = u8::from_be_bytes(buf);
writeln!(writer, "Version: {version}").unwrap();
let mut buf = [0; 4];
self.page_file
.read_exact_at(&mut buf, DOCUMENT_COUNT_LIMIT_OFFSET)
.unwrap();
let document_limit = u32::from_be_bytes(buf);
writeln!(writer, "Document Limit: {document_limit}").unwrap();
let mut buf = [0; 4];
self.page_file
.read_exact_at(&mut buf, DOCUMENT_COUNT_OFFSET)
.unwrap();
let document_count = u32::from_be_bytes(buf);
writeln!(writer, "Document Count: {document_count}").unwrap();
let mut buf = [0; 4];
self.page_file
.read_exact_at(&mut buf, NEXT_AVAILABLE_OFFSET)
.unwrap();
let next_available_offset = u32::from_be_bytes(buf);
writeln!(writer, "Next Available Offset: {next_available_offset}").unwrap();
let mut buf = [0; 4];
self.page_file
.read_exact_at(&mut buf, NEXT_AVAILABLE_HEADER_OFFSET)
.unwrap();
let next_available_header_offset = u32::from_be_bytes(buf);
writeln!(
writer,
"Next Available Header Offset: {next_available_header_offset}"
)
.unwrap();
let mut buf = [0; 8];
self.page_file
.read_exact_at(&mut buf, DOCUMENT_INDEX_OFFSET)
.unwrap();
let starting_document_id = u64::from_be_bytes(buf);
writeln!(writer, "Starting Document ID: {starting_document_id}").unwrap();
let mut offset = DOCUMENT_INDEX_OFFSET;
let mut doc_id = [0; 8];
let mut starting_offset = [0; 4];
let mut bytes_length = [0; 4];
let mut docs: Vec<u8> = vec![];
let mut i = -1_i128;
loop {
i += 1;
if i > document_count as i128 {
break;
}
self.page_file.read_exact_at(&mut doc_id, offset).unwrap();
if doc_id == [0; 8] {
break;
}
self.page_file
.read_exact_at(&mut starting_offset, offset + 8)
.unwrap();
self.page_file
.read_exact_at(&mut bytes_length, offset + 12)
.unwrap();
let doc_id = u64::from_be_bytes(doc_id);
offset += 8 + 4 + 4;
if let Some(wanted_doc_id) = wanted_doc_id
&& doc_id != wanted_doc_id
{
continue;
}
if let Some(starting_doc_id) = starting_doc_id
&& doc_id < starting_doc_id
{
continue;
}
let starting_offset = u32::from_be_bytes(starting_offset);
let bytes_length = u32::from_be_bytes(bytes_length);
if !skip_header_info {
writeln!(
writer,
"# {i} - Document id: {doc_id}, starting_offset: {starting_offset}, bytes_length: {bytes_length}"
)
.unwrap();
}
if doc_id == u64::MAX {
break; }
if bytes_length == u32::MAX || starting_offset == u32::MAX {
if !skip_document_content {
writeln!(writer, "Document is deleted or uninitialized").unwrap();
}
} else {
if docs.len() < bytes_length as usize {
docs.resize(bytes_length as usize, 0);
}
let slice = &mut docs[0..bytes_length as usize];
if !skip_content_checks {
self.page_file
.read_exact_at(slice, starting_offset as u64)
.unwrap();
let probable_index = ProbableIndex(doc_id - starting_document_id);
let output = self
.get_documents::<u64>(&[(doc_id, probable_index)])
.unwrap();
assert_eq!(output.len(), 1, "Document id {doc_id} not found");
let (f_doc_id, f_content) = &output[0];
assert_eq!(*f_doc_id, doc_id, "Document id mismatch");
assert_eq!(*f_content, slice, "Document content mismatch");
}
if !skip_document_content {
self.page_file
.read_exact_at(slice, starting_offset as u64)
.unwrap();
match String::from_utf8(slice.to_vec()) {
Ok(s) => {
writeln!(writer, "{s}").unwrap();
}
Err(_) => {
writeln!(
writer,
"Document content: [binary data of {} bytes]",
slice.len()
)
.unwrap();
}
}
}
}
}
Ok(())
}
}
/// Handle for a batch reservation made by `reserve_multiple_space`;
/// consumed by writing all document bodies as one contiguous blob.
pub struct ZeboMultiReservedSpace<'page> {
    /// Page the space was reserved on.
    page: &'page ZeboPage,
    /// Total reserved byte count (sum of all documents' lengths).
    len: u32,
    /// File offset where the first reserved body begins.
    document_offset: u32,
}
impl ZeboMultiReservedSpace<'_> {
    /// Writes the concatenated bodies of every reserved document.
    ///
    /// `data` must be exactly as long as the total reserved byte count;
    /// otherwise a `WrongReservedSpace` error is returned and nothing is
    /// written.
    pub fn write(self, data: Vec<u8>) -> Result<()> {
        let reserved = self.len as usize;
        if data.len() != reserved {
            return Err(ZeboError::WrongReservedSpace {
                wanted: data.len(),
                reserved: self.len,
            });
        }
        self.page
            .page_file
            .write_all_at(&data, u64::from(self.document_offset))
            .map_err(ZeboError::OperationError)
    }
}
/// Handle for a single-document reservation made by `reserve_space`;
/// consumed by writing the document body into the reserved region.
pub struct ZeboReservedSpace<'page> {
    /// Page the space was reserved on.
    page: &'page ZeboPage,
    /// Reserved byte count for this document.
    len: u32,
    /// File offset where the body begins.
    document_offset: u32,
}
impl ZeboReservedSpace<'_> {
    /// Writes the document body into the reserved region.
    ///
    /// `data` may be shorter than the reservation (the tail of the region
    /// simply stays as-is) but never longer.
    pub fn write(self, data: &[u8]) -> Result<()> {
        // Compare in usize: the old `data.len() as u32` truncated lengths
        // above u32::MAX, which could make an oversized buffer pass the
        // check and overrun the reservation.
        if data.len() > self.len as usize {
            return Err(ZeboError::NotEnoughReservedSpace {
                wanted: data.len(),
                reserved: self.len,
            });
        }
        self.page
            .page_file
            .write_all_at(data, self.document_offset as u64)
            .map_err(ZeboError::OperationError)?;
        Ok(())
    }
}
/// In-memory snapshot of a page's header plus its live document index.
#[derive(Debug, PartialEq)]
pub struct ZeboPageHeader {
    /// Maximum number of documents the page can hold.
    pub document_limit: u32,
    /// Number of live (non-deleted) documents.
    pub document_count: u32,
    /// File offset where the next document body would be written.
    pub next_available_offset: u32,
    /// `(doc_id, offset, len)` for each live document, in slot order.
    pub index: Vec<(u64, u32, u32)>,
}
#[cfg(test)]
mod tests {
use crate::tests::prepare_test_dir;
use super::*;
// A freshly created page has the expected header fields both through the
// API and in the raw file bytes, and its index region is zero-filled.
#[test]
fn test_zebo_page_check_internals_empty() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let page = ZeboPage::try_new(2, 0, zebo_page_file).unwrap();
    assert_eq!(page.document_limit, 2);
    assert_eq!(page.get_document_count().unwrap(), 0);
    // 57 = DOCUMENT_INDEX_OFFSET (25) + 2 slots * 16 bytes.
    assert_eq!(page.get_next_available_offset().unwrap(), 57);
    let header = page.get_header().unwrap();
    assert_eq!(header.document_limit, 2);
    assert_eq!(header.document_count, 0);
    assert_eq!(header.next_available_offset, 57);
    assert_eq!(header.index.len(), 0);
    drop(page);
    // Raw on-disk layout checks.
    let file_content = std::fs::read(&file_path).unwrap();
    assert_eq!(file_content[0], Version::V1.into());
    assert_eq!(
        u32::from_be_bytes([
            file_content[1],
            file_content[2],
            file_content[3],
            file_content[4]
        ]),
        2
    );
    assert_eq!(
        u32::from_be_bytes([
            file_content[5],
            file_content[6],
            file_content[7],
            file_content[8]
        ]),
        0
    );
    assert_eq!(
        u32::from_be_bytes([
            file_content[9],
            file_content[10],
            file_content[11],
            file_content[12]
        ]),
        DOCUMENT_INDEX_OFFSET as u32 + (4 + 4 + 8) * 2
    );
    // Both index slots must still be all zeros.
    for i in 0..2 {
        let offset = (DOCUMENT_INDEX_OFFSET + (i * (4 + 4 + 8))) as usize;
        assert_eq!(
            u64::from_be_bytes([
                file_content[offset],
                file_content[offset + 1],
                file_content[offset + 2],
                file_content[offset + 3],
                file_content[offset + 4],
                file_content[offset + 5],
                file_content[offset + 6],
                file_content[offset + 7]
            ]),
            0
        );
        assert_eq!(
            u32::from_be_bytes([
                file_content[offset + 8],
                file_content[offset + 9],
                file_content[offset + 10],
                file_content[offset + 11]
            ]),
            0
        );
        assert_eq!(
            u32::from_be_bytes([
                file_content[offset + 12],
                file_content[offset + 13],
                file_content[offset + 14],
                file_content[offset + 15]
            ]),
            0
        );
    }
}
// Reserving and writing one document updates the count, the next-free
// data offset and the first index slot, all verified against raw bytes.
#[test]
fn test_zebo_page_check_internals_add_doc() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let mut page = ZeboPage::try_new(2, 0, zebo_page_file).unwrap();
    assert_eq!(page.document_limit, 2);
    assert_eq!(page.get_document_count().unwrap(), 0);
    assert_eq!(page.get_next_available_offset().unwrap(), 57);
    let header = page.get_header().unwrap();
    assert_eq!(header.document_limit, 2);
    assert_eq!(header.document_count, 0);
    assert_eq!(header.next_available_offset, 57);
    assert_eq!(header.index.len(), 0);
    let reserved_space = page.reserve_space(1, 2).unwrap();
    reserved_space.write("ab".as_bytes()).unwrap();
    drop(page);
    let file_content = std::fs::read(&file_path).unwrap();
    assert_eq!(file_content[0], Version::V1.into());
    assert_eq!(
        u32::from_be_bytes([
            file_content[1],
            file_content[2],
            file_content[3],
            file_content[4]
        ]),
        2
    );
    // Document count bumped to 1.
    assert_eq!(
        u32::from_be_bytes([
            file_content[5],
            file_content[6],
            file_content[7],
            file_content[8]
        ]),
        1
    );
    // Next-free data offset advanced by the 2 reserved bytes.
    assert_eq!(
        u32::from_be_bytes([
            file_content[9],
            file_content[10],
            file_content[11],
            file_content[12]
        ]),
        DOCUMENT_INDEX_OFFSET as u32 + (4 + 4 + 8) * 2 + 2
    );
    // First slot: id 1, body at offset 57, length 2.
    let i = 0;
    let offset = (DOCUMENT_INDEX_OFFSET + (i * (4 + 4 + 8))) as usize;
    assert_eq!(
        u64::from_be_bytes([
            file_content[offset],
            file_content[offset + 1],
            file_content[offset + 2],
            file_content[offset + 3],
            file_content[offset + 4],
            file_content[offset + 5],
            file_content[offset + 6],
            file_content[offset + 7]
        ]),
        1
    );
    assert_eq!(
        u32::from_be_bytes([
            file_content[offset + 8],
            file_content[offset + 9],
            file_content[offset + 10],
            file_content[offset + 11]
        ]),
        57
    );
    assert_eq!(
        u32::from_be_bytes([
            file_content[offset + 12],
            file_content[offset + 13],
            file_content[offset + 14],
            file_content[offset + 15]
        ]),
        2
    );
}
// Deleting with clean_data leaves a tombstone (id kept, offset/len MAX)
// and later inserts keep appending after the deleted region.
#[test]
fn test_zebo_page_check_internals_add_remove_add_doc() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let mut page = ZeboPage::try_new(10, 0, zebo_page_file).unwrap();
    let reserved_space = page.reserve_space(1, 2).unwrap();
    reserved_space.write("ab".as_bytes()).unwrap();
    let reserved_space = page.reserve_space(2, 2).unwrap();
    reserved_space.write("cd".as_bytes()).unwrap();
    let reserved_space = page.reserve_space(3, 2).unwrap();
    reserved_space.write("ef".as_bytes()).unwrap();
    page.delete_documents(&[(2, ProbableIndex(0))], true)
        .unwrap();
    let reserved_space = page.reserve_space(4, 2).unwrap();
    reserved_space.write("ef".as_bytes()).unwrap();
    drop(page);
    let file_content = std::fs::read(&file_path).unwrap();
    // Bytes 41..89 are index slots 1..=3: the doc-2 tombstone, then doc 3
    // at offset 189 and doc 4 at offset 191.
    assert_eq!(
        &file_content[41..89],
        &[
            0, 0, 0, 0, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0,
            0, 3, 0, 0, 0, 189, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 191, 0, 0, 0, 2
        ]
    );
}
// Lookups on a page with non-contiguous ids: exact probable indexes,
// entirely missing ids, mixed hits/misses, wrong probable indexes, and
// lookups after a (non-cleaning) deletion.
#[test]
fn test_page_get_documents_with_gaps() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let mut page = ZeboPage::try_new(10, 0, zebo_page_file).unwrap();
    let reserved1 = page.reserve_space(1, 1).unwrap();
    reserved1.write(b"a").unwrap();
    let reserved5 = page.reserve_space(5, 1).unwrap();
    reserved5.write(b"e").unwrap();
    let reserved8 = page.reserve_space(8, 1).unwrap();
    reserved8.write(b"h").unwrap();
    let reserved12 = page.reserve_space(12, 1).unwrap();
    reserved12.write(b"l").unwrap();
    // All ids exist; probable indexes are id-based, not slot-based.
    let existing_docs = vec![
        (1, ProbableIndex(1)),
        (5, ProbableIndex(5)),
        (8, ProbableIndex(8)),
        (12, ProbableIndex(12)),
    ];
    let result = page.get_documents::<u32>(&existing_docs).unwrap();
    assert_eq!(result.len(), 4);
    let mut sorted_result = result;
    sorted_result.sort_by_key(|(id, _)| *id);
    assert_eq!(sorted_result[0], (1, b"a".to_vec()));
    assert_eq!(sorted_result[1], (5, b"e".to_vec()));
    assert_eq!(sorted_result[2], (8, b"h".to_vec()));
    assert_eq!(sorted_result[3], (12, b"l".to_vec()));
    // None of these ids were ever written.
    let missing_docs = vec![
        (2, ProbableIndex(2)),
        (3, ProbableIndex(3)),
        (4, ProbableIndex(4)),
        (6, ProbableIndex(6)),
        (7, ProbableIndex(7)),
        (9, ProbableIndex(9)),
        (10, ProbableIndex(10)),
        (11, ProbableIndex(11)),
        (13, ProbableIndex(13)),
        (14, ProbableIndex(14)),
    ];
    let result = page.get_documents::<u32>(&missing_docs).unwrap();
    assert_eq!(
        result.len(),
        0,
        "Should return no documents for missing IDs"
    );
    // Mixture of present and absent ids.
    let mixed_docs = vec![
        (1, ProbableIndex(1)),
        (2, ProbableIndex(2)),
        (5, ProbableIndex(5)),
        (6, ProbableIndex(6)),
        (8, ProbableIndex(8)),
        (9, ProbableIndex(9)),
        (12, ProbableIndex(12)),
        (15, ProbableIndex(15)),
    ];
    let result = page.get_documents::<u32>(&mixed_docs).unwrap();
    assert_eq!(result.len(), 4, "Should return only existing documents");
    let mut sorted_result = result;
    sorted_result.sort_by_key(|(id, _)| *id);
    assert_eq!(sorted_result[0], (1, b"a".to_vec()));
    assert_eq!(sorted_result[1], (5, b"e".to_vec()));
    assert_eq!(sorted_result[2], (8, b"h".to_vec()));
    assert_eq!(sorted_result[3], (12, b"l".to_vec()));
    // Bad hints must still resolve through the fallback search.
    let wrong_probable_docs = vec![
        (1, ProbableIndex(0)),
        (5, ProbableIndex(2)),
        (8, ProbableIndex(50)),
    ];
    let result = page.get_documents::<u32>(&wrong_probable_docs).unwrap();
    assert_eq!(
        result.len(),
        3,
        "Should find documents even with wrong ProbableIndex"
    );
    let mut sorted_result = result;
    sorted_result.sort_by_key(|(id, _)| *id);
    assert_eq!(sorted_result[0], (1, b"a".to_vec()));
    assert_eq!(sorted_result[1], (5, b"e".to_vec()));
    assert_eq!(sorted_result[2], (8, b"h".to_vec()));
    // Deleted documents disappear from lookups.
    page.delete_documents(&[(5, ProbableIndex(5))], false)
        .unwrap();
    let after_deletion_docs = vec![
        (1, ProbableIndex(1)),
        (5, ProbableIndex(5)),
        (8, ProbableIndex(8)),
        (12, ProbableIndex(12)),
    ];
    let result = page.get_documents::<u32>(&after_deletion_docs).unwrap();
    assert_eq!(
        result.len(),
        3,
        "Should return 3 documents after deleting one"
    );
    let mut sorted_result = result;
    sorted_result.sort_by_key(|(id, _)| *id);
    assert_eq!(sorted_result[0], (1, b"a".to_vec()));
    assert_eq!(sorted_result[1], (8, b"h".to_vec()));
    assert_eq!(sorted_result[2], (12, b"l".to_vec()));
    // Wrong hints + a deleted id + a missing id together.
    let mixed_search_docs = vec![
        (1, ProbableIndex(0)),
        (8, ProbableIndex(50)),
        (12, ProbableIndex(1)),
        (5, ProbableIndex(5)),
        (99, ProbableIndex(2)),
    ];
    let result = page.get_documents::<u32>(&mixed_search_docs).unwrap();
    assert_eq!(
        result.len(),
        3,
        "Should find 3 existing non-deleted documents"
    );
    let mut sorted_result = result;
    sorted_result.sort_by_key(|(id, _)| *id);
    assert_eq!(sorted_result[0], (1, b"a".to_vec()));
    assert_eq!(sorted_result[1], (8, b"h".to_vec()));
    assert_eq!(sorted_result[2], (12, b"l".to_vec()));
}
// A legacy tombstone (doc id u64::MAX, offset/len u32::MAX) written by
// hand is recognized: the entry is excluded from the header index and
// lookups, while its neighbors stay reachable.
#[test]
fn test_backwards_compatible_old_deletion_format() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let mut page = ZeboPage::try_new(10, 0, zebo_page_file).unwrap();
    let reserved1 = page.reserve_space(1, 1).unwrap();
    reserved1.write(b"a").unwrap();
    let reserved2 = page.reserve_space(2, 1).unwrap();
    reserved2.write(b"b").unwrap();
    let reserved3 = page.reserve_space(3, 1).unwrap();
    reserved3.write(b"c").unwrap();
    // Overwrite slot 1 (doc 2) with a legacy-format tombstone.
    let mut old_deletion_buf = [0u8; 16];
    old_deletion_buf[0..8].copy_from_slice(&u64::MAX.to_be_bytes());
    old_deletion_buf[8..12].copy_from_slice(&u32::MAX.to_be_bytes());
    old_deletion_buf[12..16].copy_from_slice(&u32::MAX.to_be_bytes());
    page.page_file
        .write_all_at(&old_deletion_buf, DOCUMENT_INDEX_OFFSET + 16)
        .unwrap();
    // Manually decrement the live count, as the old writer would have.
    let document_count = page.get_document_count().unwrap();
    page.page_file
        .write_all_at(&(document_count - 1).to_be_bytes(), DOCUMENT_COUNT_OFFSET)
        .unwrap();
    let header = page.get_header().unwrap();
    assert_eq!(header.document_count, 2);
    assert_eq!(header.index.len(), 2);
    let doc_ids: Vec<u64> = header.index.iter().map(|(id, _, _)| *id).collect();
    assert!(doc_ids.contains(&1));
    assert!(doc_ids.contains(&3));
    assert!(!doc_ids.contains(&2));
    assert!(!doc_ids.contains(&u64::MAX));
    let result = page
        .get_documents::<u32>(&[(1, ProbableIndex(0)), (3, ProbableIndex(2))])
        .unwrap();
    assert_eq!(result.len(), 2);
    let mut sorted_result = result;
    sorted_result.sort_by_key(|(id, _)| *id);
    assert_eq!(sorted_result[0], (1, b"a".to_vec()));
    assert_eq!(sorted_result[1], (3, b"c".to_vec()));
    let deleted_result = page.get_documents::<u32>(&[(2, ProbableIndex(1))]).unwrap();
    assert_eq!(deleted_result.len(), 0);
}
// Legacy and current tombstone formats coexist: both deleted ids vanish
// from the index and lookups; the remaining live ids stay reachable.
#[test]
fn test_mixed_deletion_formats() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let mut page = ZeboPage::try_new(10, 0, zebo_page_file).unwrap();
    let reserved1 = page.reserve_space(1, 1).unwrap();
    reserved1.write(b"a").unwrap();
    let reserved2 = page.reserve_space(2, 1).unwrap();
    reserved2.write(b"b").unwrap();
    let reserved3 = page.reserve_space(3, 1).unwrap();
    reserved3.write(b"c").unwrap();
    let reserved4 = page.reserve_space(4, 1).unwrap();
    reserved4.write(b"d").unwrap();
    // Doc 2: legacy tombstone written by hand (count not decremented).
    let mut old_deletion_buf = [0u8; 16];
    old_deletion_buf[0..8].copy_from_slice(&u64::MAX.to_be_bytes());
    old_deletion_buf[8..12].copy_from_slice(&u32::MAX.to_be_bytes());
    old_deletion_buf[12..16].copy_from_slice(&u32::MAX.to_be_bytes());
    page.page_file
        .write_all_at(&old_deletion_buf, DOCUMENT_INDEX_OFFSET + 16)
        .unwrap();
    // Doc 4: current-format deletion through the API.
    page.delete_documents(&[(4, ProbableIndex(3))], false)
        .unwrap();
    let header = page.get_header().unwrap();
    // Count is 3 (only the API deletion decremented it) while the live
    // index holds 2 entries.
    assert_eq!(header.document_count, 3);
    assert_eq!(header.index.len(), 2);
    let doc_ids: Vec<u64> = header.index.iter().map(|(id, _, _)| *id).collect();
    assert!(doc_ids.contains(&1));
    assert!(doc_ids.contains(&3));
    assert!(!doc_ids.contains(&2));
    assert!(!doc_ids.contains(&4));
    let all_docs = page
        .get_documents::<u32>(&[
            (1, ProbableIndex(0)),
            (2, ProbableIndex(1)),
            (3, ProbableIndex(2)),
            (4, ProbableIndex(3)),
        ])
        .unwrap();
    assert_eq!(all_docs.len(), 2);
    let mut sorted_result = all_docs;
    sorted_result.sort_by_key(|(id, _)| *id);
    assert_eq!(sorted_result[0], (1, b"a".to_vec()));
    assert_eq!(sorted_result[1], (3, b"c".to_vec()));
}
#[test]
fn test_deletion_detection_helper() {
    // Cases that must be reported as deleted: both tombstone encodings
    // (legacy id = u64::MAX, and current offset/len = u32::MAX).
    let deleted_cases: [(u64, u32, u32); 4] = [
        (u64::MAX, u32::MAX, u32::MAX),
        (u64::MAX, u32::MAX, 0),
        (1, u32::MAX, u32::MAX),
        (12345, u32::MAX, u32::MAX),
    ];
    for (id, offset, len) in deleted_cases {
        assert!(ZeboPage::is_deleted(id, offset, len));
    }

    // Cases that must be reported as live: ordinary entries, and entries
    // where only one of the sentinel fields is maxed out.
    let live_cases: [(u64, u32, u32); 5] = [
        (1, 100, 50),
        (u64::MAX, 100, 50),
        (1, 0, 0),
        (1, u32::MAX, 50),
        (1, 100, u32::MAX),
    ];
    for (id, offset, len) in live_cases {
        assert!(!ZeboPage::is_deleted(id, offset, len));
    }
}
#[test]
fn test_uninitialized_entry_detection() {
    // A zero offset is the one and only marker of a never-written slot.
    assert!(ZeboPage::is_uninitialized_entry(0));
    // Any non-zero offset — including the u32::MAX tombstone sentinel —
    // means the slot has been touched at some point.
    for occupied_offset in [1, 57, 100, u32::MAX] {
        assert!(!ZeboPage::is_uninitialized_entry(occupied_offset));
    }
}
#[test]
fn test_fallback_search_bug_with_deleted_target_id() {
    // Regression: a tombstone that still carries the searched id must not
    // shadow the live entry for that same id further along the index.
    use crate::tests::prepare_test_dir;
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("bug_test_page.zebo");
    let page_file = std::fs::File::options()
        .create(true)
        .truncate(true)
        .read(true)
        .write(true)
        .open(&file_path)
        .expect("Failed to create page file");
    let mut page = ZeboPage::try_new(10, 100, page_file).expect("Failed to create page");

    // Three two-byte documents; ids are deliberately sparse.
    for (doc_id, body) in [(101, b"aa"), (102, b"bb"), (105, b"ee")] {
        let slot = page.reserve_space(doc_id, 2).expect("Failed to reserve");
        slot.write(body).expect("Failed to write");
    }

    // Stamp header slot 1 (16 bytes per entry) with a tombstone whose id
    // field still says 105, while the genuine 105 lives in slot 2.
    let mut poisoned_entry = [0u8; 16];
    poisoned_entry[0..8].copy_from_slice(&105_u64.to_be_bytes());
    poisoned_entry[8..12].copy_from_slice(&u32::MAX.to_be_bytes());
    poisoned_entry[12..16].copy_from_slice(&u32::MAX.to_be_bytes());
    page.page_file
        .write_all_at(&poisoned_entry, DOCUMENT_INDEX_OFFSET + 16)
        .expect("Failed to write corrupted entry");

    // An out-of-range probable index forces the fallback scan, which must
    // skip the poisoned slot and return the live document.
    let fetched = page
        .get_documents::<u64>(&[(105, ProbableIndex(999))])
        .expect("Get documents failed");
    assert_eq!(fetched.len(), 1);
    assert_eq!(fetched[0].0, 105);
    assert_eq!(fetched[0].1, b"ee".to_vec());
}
#[test]
fn test_fallback_search_optimization_start_from_end() {
    // Exercise the fallback scan directly: both ends of the index must be
    // found, and ids between stored ones must come back as None.
    use crate::tests::prepare_test_dir;
    let _ = tracing_subscriber::fmt::try_init();
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("optimization_test_page.zebo");
    let page_file = std::fs::File::options()
        .create(true)
        .truncate(true)
        .read(true)
        .write(true)
        .open(&file_path)
        .expect("Failed to create page file");
    let mut page = ZeboPage::try_new(10, 1000, page_file).expect("Failed to create page");

    // Fill all ten slots with sparse, increasing ids.
    for doc_id in [100, 200, 300, 400, 500, 600, 700, 800, 900, 950] {
        let slot = page.reserve_space(doc_id, 2).expect("Failed to reserve");
        slot.write(b"xx").expect("Failed to write");
    }

    // Last and first stored ids must both be located.
    for present_id in [950, 100] {
        let hit = page
            .fallback_search_document(present_id, None)
            .expect("Search failed");
        assert!(hit.is_some());
        assert_eq!(hit.unwrap().0, present_id);
    }

    // Ids inside the range but never stored must come back empty.
    for absent_id in [940, 150] {
        let miss = page
            .fallback_search_document(absent_id, None)
            .expect("Search failed");
        assert!(miss.is_none());
    }
}
#[test]
fn test_fallback_search_wrong_document_order() {
    // Documents stored out of id order (4 before 3) must still be fetched
    // correctly when the probable indices reflect the true storage slots.
    use crate::tests::prepare_test_dir;
    let _ = tracing_subscriber::fmt::try_init();
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("wrong_document_order.zebo");
    let page_file = std::fs::File::options()
        .create(true)
        .truncate(true)
        .read(true)
        .write(true)
        .open(&file_path)
        .expect("Failed to create page file");
    let mut page = ZeboPage::try_new(10, 1000, page_file).expect("Failed to create page");

    // Insertion order: 1, 2, 4, 3, 5 — note the swap in the middle.
    for (doc_id, payload) in [
        (1, b"hello"),
        (2, b"world"),
        (4, b"aaaaa"),
        (3, b"bbbbb"),
        (5, b"ccccc"),
    ] {
        let slot = page.reserve_space(doc_id, 5).unwrap();
        slot.write(payload).unwrap();
    }

    // Probable indices mirror the physical slots (3 sits in slot 3, 4 in 2).
    let fetched = page
        .get_documents::<u64>(&[
            (1, ProbableIndex(0)),
            (2, ProbableIndex(1)),
            (3, ProbableIndex(3)),
            (4, ProbableIndex(2)),
            (5, ProbableIndex(4)),
        ])
        .unwrap();
    assert_eq!(fetched.len(), 5);
    assert_eq!(fetched[0], (1, b"hello".to_vec()));
    assert_eq!(fetched[1], (2, b"world".to_vec()));
    assert_eq!(fetched[2], (3, b"bbbbb".to_vec()));
    assert_eq!(fetched[3], (4, b"aaaaa".to_vec()));
    assert_eq!(fetched[4], (5, b"ccccc".to_vec()));
}
}