use std::{borrow::Cow, io::Write, os::unix::fs::FileExt};
use crate::{DocumentId, Result, Version, ZeboError, index::ProbableIndex};
// On-disk page header layout (all integers big-endian):
//   [0]       u8  - format version
//   [1..5]    u32 - document count limit
//   [5..9]    u32 - current document count
//   [9..13]   u32 - next available data offset (where the next body is written)
//   [13..17]  u32 - next free document-index slot
//   [17..25]  u64 - id of the first document stored in this page
//   [25..]        - index: `document_limit` entries of (doc_id: u64, offset: u32, len: u32)
pub const VERSION_OFFSET: u64 = 0;
pub const DOCUMENT_COUNT_LIMIT_OFFSET: u64 = VERSION_OFFSET + 1;
pub const DOCUMENT_COUNT_OFFSET: u64 = DOCUMENT_COUNT_LIMIT_OFFSET + 4;
pub const NEXT_AVAILABLE_OFFSET: u64 = DOCUMENT_COUNT_OFFSET + 4;
pub const NEXT_AVAILABLE_HEADER_OFFSET: u64 = NEXT_AVAILABLE_OFFSET + 4;
pub const STARTING_DOCUMENT_ID_OFFSET: u64 = NEXT_AVAILABLE_HEADER_OFFSET + 4;
pub const DOCUMENT_INDEX_OFFSET: u64 = STARTING_DOCUMENT_ID_OFFSET + 8;
/// A single on-disk page holding up to `document_limit` documents.
///
/// The backing file starts with a fixed header, followed by a document
/// index (one 16-byte entry per slot), followed by the document bodies.
pub struct ZeboPage {
    /// Maximum number of documents this page can hold.
    document_limit: u32,
    /// Id of the first document stored in this page.
    #[allow(dead_code)]
    pub(crate) starting_document_id: u64,
    /// Backing file; all access uses positioned reads/writes (`FileExt`).
    page_file: std::fs::File,
    /// In-memory cursor of the next free document-index slot.
    next_available_header_offset: u32,
}
impl ZeboPage {
pub fn try_new(
document_limit: u32,
starting_document_id: u64,
mut page_file: std::fs::File,
) -> Result<Self> {
let document_header_size = (4 + 4 + 8) * (document_limit as u64);
page_file
.set_len(DOCUMENT_INDEX_OFFSET + document_header_size)
.map_err(ZeboError::OperationError)?;
page_file
.write_all_at(&[Version::V1.into()], VERSION_OFFSET)
.map_err(ZeboError::OperationError)?;
page_file
.write_all_at(&document_limit.to_be_bytes(), DOCUMENT_COUNT_LIMIT_OFFSET)
.map_err(ZeboError::OperationError)?;
page_file
.write_all_at(&[0; 4], DOCUMENT_COUNT_OFFSET)
.map_err(ZeboError::OperationError)?;
let initial_available_offset = (DOCUMENT_INDEX_OFFSET + document_header_size) as u32;
page_file
.write_all_at(
&initial_available_offset.to_be_bytes(),
NEXT_AVAILABLE_OFFSET,
)
.map_err(ZeboError::OperationError)?;
page_file
.write_all_at(
&starting_document_id.to_be_bytes(),
STARTING_DOCUMENT_ID_OFFSET,
)
.map_err(ZeboError::OperationError)?;
page_file.flush().map_err(ZeboError::OperationError)?;
page_file.sync_all().map_err(ZeboError::OperationError)?;
Ok(Self {
document_limit,
starting_document_id,
page_file,
next_available_header_offset: 0,
})
}
/// Loads an existing page from `page_file`, validating the format version
/// and reading the header fields into memory.
///
/// # Errors
/// Returns [`ZeboError::UnsupportedVersion`] when the version byte is not
/// V1, and [`ZeboError::OperationError`] on I/O failure.
pub fn try_load(page_file: std::fs::File) -> Result<Self> {
    // Validate the version byte before trusting any other field.
    let mut version_buf = [0u8; 1];
    page_file
        .read_exact_at(&mut version_buf, VERSION_OFFSET)
        .map_err(ZeboError::OperationError)?;
    let version = version_buf[0];
    if version != Version::V1.into() {
        return Err(ZeboError::UnsupportedVersion {
            version,
            wanted: Version::V1.into(),
        });
    }
    // Small helpers to read fixed-offset big-endian header fields.
    let read_u32 = |offset: u64| -> Result<u32> {
        let mut raw = [0u8; 4];
        page_file
            .read_exact_at(&mut raw, offset)
            .map_err(ZeboError::OperationError)?;
        Ok(u32::from_be_bytes(raw))
    };
    let read_u64 = |offset: u64| -> Result<u64> {
        let mut raw = [0u8; 8];
        page_file
            .read_exact_at(&mut raw, offset)
            .map_err(ZeboError::OperationError)?;
        Ok(u64::from_be_bytes(raw))
    };
    let document_limit = read_u32(DOCUMENT_COUNT_LIMIT_OFFSET)?;
    let next_available_header_offset = read_u32(NEXT_AVAILABLE_HEADER_OFFSET)?;
    let starting_document_id = read_u64(STARTING_DOCUMENT_ID_OFFSET)?;
    Ok(Self {
        page_file,
        document_limit,
        starting_document_id,
        next_available_header_offset,
    })
}
/// Reads the current document count from the on-disk header.
pub fn get_document_count(&self) -> Result<u32> {
    let mut raw = [0u8; 4];
    match self.page_file.read_exact_at(&mut raw, DOCUMENT_COUNT_OFFSET) {
        Ok(()) => Ok(u32::from_be_bytes(raw)),
        Err(e) => Err(ZeboError::OperationError(e)),
    }
}
/// Reads the next available data offset (where the next document body
/// will be written) from the on-disk header.
fn get_next_available_offset(&self) -> Result<u32> {
    let mut raw = [0u8; 4];
    self.page_file
        .read_exact_at(&mut raw, NEXT_AVAILABLE_OFFSET)
        .map_err(ZeboError::OperationError)
        .map(|()| u32::from_be_bytes(raw))
}
/// Returns the current size of the backing page file in bytes.
pub fn current_file_size(&self) -> Result<u64> {
    self.page_file
        .metadata()
        .map(|metadata| metadata.len())
        .map_err(ZeboError::OperationError)
}
/// Reads the page header and collects the live document-index entries.
///
/// Slots are scanned in order until `document_count` live entries are
/// found. A slot with offset 0 has never been written (and nothing was
/// written after it, so the scan stops); a slot with offset `u32::MAX`
/// is a deleted tombstone and is skipped.
pub fn get_header(&self) -> Result<ZeboPageHeader> {
    let document_count = self.get_document_count()?;
    let next_available_offset = self.get_next_available_offset()?;
    let mut doc_index = Vec::with_capacity(document_count as usize);
    let mut found = 0;
    let mut i: u64 = 0;
    while found < document_count {
        // Only slots 0..document_limit exist. The original `>` bound let
        // the scan read one 16-byte entry past the index region (document
        // data misread as an index entry).
        if i >= (self.document_limit as u64) {
            break;
        }
        if let Some((doc_id, document_offset, document_len)) = self.get_at(i)? {
            if document_offset == 0 {
                // Never-written slot: everything after it is empty.
                break;
            }
            if document_offset == u32::MAX {
                // Tombstoned (deleted) slot: skip it.
                i += 1;
                continue;
            }
            doc_index.push((doc_id, document_offset, document_len));
            found += 1;
        }
        i += 1;
    }
    let header = ZeboPageHeader {
        document_limit: self.document_limit,
        document_count,
        next_available_offset,
        index: doc_index,
    };
    Ok(header)
}
/// Fetches the bodies of the requested documents.
///
/// Documents not present in this page's index are silently omitted from
/// the result, so the output may be shorter than the input.
pub fn get_documents<DocId: DocumentId>(
    &self,
    doc_id_with_index: &[(u64, ProbableIndex)],
) -> Result<Vec<(DocId, Vec<u8>)>> {
    let header = self.get_header()?;
    let mut results = Vec::with_capacity(doc_id_with_index.len());
    for (wanted_id, _) in doc_id_with_index {
        let entry = header.index.iter().find(|(id, _, _)| id == wanted_id);
        let Some((_, offset, len)) = entry else {
            continue;
        };
        let mut body = vec![0u8; *len as usize];
        // Zero-length documents have nothing to read.
        if *len > 0 {
            self.page_file
                .read_exact_at(&mut body, *offset as u64)
                .map_err(ZeboError::OperationError)?;
        }
        results.push((DocId::from_u64(*wanted_id), body));
    }
    Ok(results)
}
/// Reserves `len` bytes of data space for document `doc_id` and records
/// its index entry.
///
/// Persists, in order: the next free index slot, the next available data
/// offset, the incremented document count, and the index entry itself.
/// The caller fills the returned [`ZeboReservedSpace`] with at most
/// `len` bytes.
///
/// NOTE(review): there is no check that a free index slot remains
/// (`next_available_header_offset < document_limit`); presumably callers
/// enforce the page limit — confirm at call sites.
pub fn reserve_space(&mut self, doc_id: u64, len: u32) -> Result<ZeboReservedSpace> {
    let next_available_offset = self.get_next_available_offset()?;
    let document_count = self.get_document_count()?;
    let available_header_offset = self.next_available_header_offset;
    // Advance and persist the next free index-slot counter.
    {
        self.next_available_header_offset += 1;
        let buf = self.next_available_header_offset.to_be_bytes();
        self.page_file
            .write_all_at(&buf, NEXT_AVAILABLE_HEADER_OFFSET)
            .map_err(ZeboError::OperationError)?;
    }
    // Advance and persist the next available data offset.
    {
        let next_available_offset = next_available_offset + len;
        let buf = next_available_offset.to_be_bytes();
        self.page_file
            .write_all_at(&buf, NEXT_AVAILABLE_OFFSET)
            .map_err(ZeboError::OperationError)?;
    }
    // Persist the incremented document count.
    {
        let document_count = document_count + 1;
        let buf = document_count.to_be_bytes();
        self.page_file
            .write_all_at(&buf, DOCUMENT_COUNT_OFFSET)
            .map_err(ZeboError::OperationError)?;
    }
    // Write the index entry: (doc_id: u64, offset: u32, len: u32).
    {
        let document_offset = next_available_offset;
        let mut buf = [0; 16];
        buf[0..8].copy_from_slice(&doc_id.to_be_bytes());
        buf[8..12].copy_from_slice(&document_offset.to_be_bytes());
        buf[12..16].copy_from_slice(&len.to_be_bytes());
        self.page_file
            .write_all_at(
                &buf,
                // Widen before multiplying: `slot * 16` was computed in
                // u32 and could overflow for very large slot indices.
                DOCUMENT_INDEX_OFFSET + (available_header_offset as u64) * (4 + 4 + 8),
            )
            .map_err(ZeboError::OperationError)?;
    }
    Ok(ZeboReservedSpace {
        page: self,
        document_offset: next_available_offset,
        len,
    })
}
/// Marks the given documents as deleted and decrements the stored count.
///
/// When `clean_data` is true, each document body is overwritten with
/// zeros before its index entry is tombstoned. Tombstoned slots are
/// filled with all-ones markers (`u64::MAX` id, `u32::MAX` offset/len).
///
/// Returns the number of documents actually deleted.
pub fn delete_documents(
    &mut self,
    documents_to_delete: &[(u64, ProbableIndex)],
    clean_data: bool,
) -> Result<u32> {
    if clean_data {
        let header = self.get_header()?;
        // Reusable zero buffer, grown to the largest document seen so far.
        let mut zeros: Vec<u8> = vec![];
        for (doc_id, _) in documents_to_delete {
            let found = header.index.iter().find(|(d, _, _)| d == doc_id);
            if let Some((_, document_offset, document_len)) = found {
                let len = *document_len as usize;
                if zeros.len() < len {
                    zeros.resize(len, 0);
                }
                self.page_file
                    .write_all_at(&zeros[0..len], *document_offset as u64)
                    .map_err(ZeboError::OperationError)?;
            }
        }
    }
    let mut found = 0_u32;
    let mut buf = [0; 16];
    for i in 0..self.document_limit {
        let (doc_id, offset, _) = match self.get_at(i as u64)? {
            Some(x) => x,
            None => continue,
        };
        // Offset 0 is a never-written slot; u32::MAX is an existing
        // tombstone (its u64::MAX id must never match a real request).
        if offset == 0 || offset == u32::MAX {
            continue;
        }
        if documents_to_delete.iter().any(|(d, _)| *d == doc_id) {
            // Tombstone the slot: all-ones id, offset and length.
            buf[0..8].copy_from_slice(&u64::MAX.to_be_bytes());
            buf[8..12].copy_from_slice(&u32::MAX.to_be_bytes());
            buf[12..16].copy_from_slice(&u32::MAX.to_be_bytes());
            self.page_file
                .write_all_at(
                    &buf,
                    // Widen before multiplying: `i * 16` was computed in
                    // u32 and could overflow for very large limits.
                    DOCUMENT_INDEX_OFFSET + (i as u64) * (4 + 4 + 8),
                )
                .map_err(ZeboError::OperationError)?;
            found += 1;
        }
    }
    if found > 0 {
        let document_count = self.get_document_count()?;
        // saturating_sub guards against underflow if the stored count is
        // ever inconsistent with the index.
        let new_document_count = document_count.saturating_sub(found);
        self.page_file
            .write_all_at(&new_document_count.to_be_bytes(), DOCUMENT_COUNT_OFFSET)
            .map_err(ZeboError::OperationError)?;
    }
    self.page_file.flush().map_err(ZeboError::OperationError)?;
    self.page_file
        .sync_all()
        .map_err(ZeboError::OperationError)?;
    Ok(found)
}
/// Reads the index entry at `document_index`.
///
/// Returns `(doc_id, offset, len)`, or `None` when the index is out of
/// range. Never-written slots read back as all zeros; tombstoned slots
/// read back as all ones.
fn get_at(&self, document_index: u64) -> Result<Option<(u64, u32, u32)>> {
    // Valid slots are 0..document_limit. The original `<` comparison
    // accepted `document_index == document_limit`, which reads 16 bytes
    // past the index region (document data misread as an entry).
    if document_index >= (self.document_limit as u64) {
        return Ok(None);
    }
    let mut buf = [0; 16];
    self.page_file
        .read_exact_at(
            &mut buf,
            DOCUMENT_INDEX_OFFSET + (document_index * (4 + 4 + 8)),
        )
        .map_err(ZeboError::OperationError)?;
    let doc_id = u64::from_be_bytes([
        buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7],
    ]);
    let document_offset = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
    let document_len = u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]);
    Ok(Some((doc_id, document_offset, document_len)))
}
pub fn close(mut self) -> Result<()> {
self.page_file.flush().map_err(ZeboError::OperationError)?;
self.page_file
.sync_all()
.map_err(ZeboError::OperationError)?;
Ok(())
}
}
/// A region of a page reserved by [`ZeboPage::reserve_space`], waiting to
/// be filled with document data.
pub struct ZeboReservedSpace<'page> {
    /// Page the space was reserved on.
    page: &'page mut ZeboPage,
    /// Number of bytes reserved.
    len: u32,
    /// File offset where the document body starts.
    document_offset: u32,
}
impl ZeboReservedSpace<'_> {
    /// Writes the `data` chunks sequentially into the reserved region.
    ///
    /// # Errors
    /// Returns [`ZeboError::NotEnoughReservedSpace`] when the total byte
    /// length of `data` exceeds the reservation, and
    /// [`ZeboError::OperationError`] on I/O failure.
    pub fn write(self, data: Cow<[Cow<[u8]>]>) -> Result<()> {
        let data_len = data.iter().map(|d| d.len()).sum::<usize>() as u32;
        if data_len > self.len {
            return Err(ZeboError::NotEnoughReservedSpace {
                // Report the total byte length that was wanted; the
                // original passed `data.len()`, i.e. the chunk count.
                wanted: data_len as usize,
                reserved: self.len,
            });
        }
        let mut offset = self.document_offset as u64;
        for d in data.iter() {
            let len = d.len();
            self.page
                .page_file
                .write_all_at(d, offset)
                .map_err(ZeboError::OperationError)?;
            offset += len as u64;
        }
        Ok(())
    }
}
/// In-memory snapshot of a page's header plus its live index entries.
#[derive(Debug, PartialEq)]
pub struct ZeboPageHeader {
    /// Maximum number of documents the page can hold.
    pub document_limit: u32,
    /// Number of live (non-deleted) documents in the page.
    pub document_count: u32,
    /// File offset where the next document body will be written.
    pub next_available_offset: u32,
    /// Live entries as `(doc_id, offset, len)` tuples.
    pub index: Vec<(u64, u32, u32)>,
}
#[cfg(test)]
mod tests {
use crate::tests::prepare_test_dir;
use super::*;
// Creates an empty page with a limit of 2 documents and verifies both the
// in-memory accessors and the raw on-disk header bytes.
#[test]
fn test_zebo_page_check_internals_empty() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let page = ZeboPage::try_new(2, 0, zebo_page_file).unwrap();
    assert_eq!(page.document_limit, 2);
    assert_eq!(page.get_document_count().unwrap(), 0);
    // 57 = DOCUMENT_INDEX_OFFSET (25) + 2 index entries of 16 bytes each.
    assert_eq!(page.get_next_available_offset().unwrap(), 57);
    let header = page.get_header().unwrap();
    assert_eq!(header.document_limit, 2);
    assert_eq!(header.document_count, 0);
    assert_eq!(header.next_available_offset, 57);
    assert_eq!(header.index.len(), 0);
    drop(page);
    let file_content = std::fs::read(&file_path).unwrap();
    // Byte 0: format version.
    assert_eq!(file_content[0], Version::V1.into());
    // Bytes 1..5: document-count limit.
    assert_eq!(
        u32::from_be_bytes([
            file_content[1],
            file_content[2],
            file_content[3],
            file_content[4]
        ]),
        2
    );
    // Bytes 5..9: current document count.
    assert_eq!(
        u32::from_be_bytes([
            file_content[5],
            file_content[6],
            file_content[7],
            file_content[8]
        ]),
        0
    );
    // Bytes 9..13: next available data offset (right after the index).
    assert_eq!(
        u32::from_be_bytes([
            file_content[9],
            file_content[10],
            file_content[11],
            file_content[12]
        ]),
        DOCUMENT_INDEX_OFFSET as u32 + (4 + 4 + 8) * 2
    );
    // Both index entries must be fully zeroed: id, offset and length.
    for i in 0..2 {
        let offset = (DOCUMENT_INDEX_OFFSET + (i * (4 + 4 + 8))) as usize;
        assert_eq!(
            u64::from_be_bytes([
                file_content[offset],
                file_content[offset + 1],
                file_content[offset + 2],
                file_content[offset + 3],
                file_content[offset + 4],
                file_content[offset + 5],
                file_content[offset + 6],
                file_content[offset + 7]
            ]),
            0
        );
        assert_eq!(
            u32::from_be_bytes([
                file_content[offset + 8],
                file_content[offset + 9],
                file_content[offset + 10],
                file_content[offset + 11]
            ]),
            0
        );
        assert_eq!(
            u32::from_be_bytes([
                file_content[offset + 12],
                file_content[offset + 13],
                file_content[offset + 14],
                file_content[offset + 15]
            ]),
            0
        );
    }
}
// Adds a single 2-byte document and verifies the on-disk header fields
// and the first index entry (id 1, offset 57, length 2).
#[test]
fn test_zebo_page_check_internals_add_doc() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let mut page = ZeboPage::try_new(2, 0, zebo_page_file).unwrap();
    assert_eq!(page.document_limit, 2);
    assert_eq!(page.get_document_count().unwrap(), 0);
    // 57 = DOCUMENT_INDEX_OFFSET (25) + 2 index entries of 16 bytes each.
    assert_eq!(page.get_next_available_offset().unwrap(), 57);
    let header = page.get_header().unwrap();
    assert_eq!(header.document_limit, 2);
    assert_eq!(header.document_count, 0);
    assert_eq!(header.next_available_offset, 57);
    assert_eq!(header.index.len(), 0);
    let reserved_space = page.reserve_space(1, 2).unwrap();
    reserved_space
        .write(Cow::Borrowed(&[Cow::Borrowed(b"ab")]))
        .unwrap();
    drop(page);
    let file_content = std::fs::read(&file_path).unwrap();
    // Byte 0: format version.
    assert_eq!(file_content[0], Version::V1.into());
    // Bytes 1..5: document-count limit.
    assert_eq!(
        u32::from_be_bytes([
            file_content[1],
            file_content[2],
            file_content[3],
            file_content[4]
        ]),
        2
    );
    // Bytes 5..9: document count is now 1.
    assert_eq!(
        u32::from_be_bytes([
            file_content[5],
            file_content[6],
            file_content[7],
            file_content[8]
        ]),
        1
    );
    // Bytes 9..13: next available offset advanced by the 2 written bytes.
    assert_eq!(
        u32::from_be_bytes([
            file_content[9],
            file_content[10],
            file_content[11],
            file_content[12]
        ]),
        DOCUMENT_INDEX_OFFSET as u32 + (4 + 4 + 8) * 2 + 2
    );
    // First index entry: doc id 1, offset 57, length 2.
    let i = 0;
    let offset = (DOCUMENT_INDEX_OFFSET + (i * (4 + 4 + 8))) as usize;
    assert_eq!(
        u64::from_be_bytes([
            file_content[offset],
            file_content[offset + 1],
            file_content[offset + 2],
            file_content[offset + 3],
            file_content[offset + 4],
            file_content[offset + 5],
            file_content[offset + 6],
            file_content[offset + 7]
        ]),
        1
    );
    assert_eq!(
        u32::from_be_bytes([
            file_content[offset + 8],
            file_content[offset + 9],
            file_content[offset + 10],
            file_content[offset + 11]
        ]),
        57
    );
    assert_eq!(
        u32::from_be_bytes([
            file_content[offset + 12],
            file_content[offset + 13],
            file_content[offset + 14],
            file_content[offset + 15]
        ]),
        2
    );
}
// Adds three documents, deletes the second, adds a fourth, then checks the
// raw index bytes: slot 1 is a tombstone (all ones), and docs 3 and 4 keep
// their entries. The deleted slot is not reused — doc 4 takes slot 3.
#[test]
fn test_zebo_page_check_internals_add_remove_add_doc() {
    let test_dir = prepare_test_dir();
    let file_path = test_dir.join("page_0.zebo");
    let zebo_page_file = std::fs::File::options()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&file_path)
        .unwrap();
    let mut page = ZeboPage::try_new(10, 0, zebo_page_file).unwrap();
    let reserved_space = page.reserve_space(1, 2).unwrap();
    reserved_space
        .write(Cow::Borrowed(&[Cow::Borrowed(b"ab")]))
        .unwrap();
    let reserved_space = page.reserve_space(2, 2).unwrap();
    reserved_space
        .write(Cow::Borrowed(&[Cow::Borrowed(b"cd")]))
        .unwrap();
    let reserved_space = page.reserve_space(3, 2).unwrap();
    reserved_space
        .write(Cow::Borrowed(&[Cow::Borrowed(b"ef")]))
        .unwrap();
    page.delete_documents(&[(2, ProbableIndex(0))], true)
        .unwrap();
    let reserved_space = page.reserve_space(4, 2).unwrap();
    reserved_space
        .write(Cow::Borrowed(&[Cow::Borrowed(b"ef")]))
        .unwrap();
    drop(page);
    let file_content = std::fs::read(&file_path).unwrap();
    // Bytes 41..89 cover index slots 1..4 (each slot is 16 bytes starting
    // at DOCUMENT_INDEX_OFFSET = 25): slot 1 tombstoned, slot 2 = doc 3 at
    // offset 189, slot 3 = doc 4 at offset 191.
    assert_eq!(
        &file_content[41..89],
        &[
            255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0,
            0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 189, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0,
            191, 0, 0, 0, 2
        ]
    );
}
}