#[cfg(feature = "memmap2")]
use memmap2::Mmap;
use uuid::Uuid;
use xxhash_rust::xxh64::xxh64;
use crate::path_util::path_from_bytes;
use crate::posting::{roaring_util, PostingList};
use crate::IndexError;
/// Magic bytes at the start of every segment file.
pub const MAGIC: &[u8; 4] = b"SNTX";
/// Format v2: posting lists are embedded in the segment file itself
/// (read via `PostingsBacking::V2Mmap`).
pub const FORMAT_VERSION_V2: u32 = 2;
/// Format v3: posting lists live in a separate postings file
/// (read via `PostingsBacking::V3File`).
pub const FORMAT_VERSION_V3: u32 = 3;
/// Version written for newly created segments.
pub const FORMAT_VERSION: u32 = FORMAT_VERSION_V3;
/// Alignment/padding unit — presumably used by the writer for section
/// alignment; confirm in `segment_writer`.
pub const PAGE_SIZE: usize = 4096;
/// Fixed byte size of the segment header.
pub(super) const HEADER_SIZE: usize = 40;
/// Fixed byte size of the trailing footer. Bytes 32..40 of the footer hold
/// the xxh64 checksum of everything preceding it (see `verify_integrity`).
pub const FOOTER_SIZE: usize = 48;
/// One dictionary entry: gram hash (u64) + posting offset (u64) +
/// cardinality (u32), little-endian (see `dict_lookup`).
pub const DICT_ENTRY_SIZE: usize = 20;
/// Hard cap on a single segment file (1 GiB).
pub const MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024;
/// One document record decoded from a segment's document table
/// (see `MmapSegment::get_doc`).
#[derive(Debug, Clone)]
pub struct DocEntry {
    /// Segment-local document id (always `< doc_count`).
    pub doc_id: u32,
    /// Content hash stored verbatim in the document entry — presumably the
    /// hash of the file contents at index time; confirm against the writer.
    pub content_hash: u64,
    /// Document size in bytes as recorded in the entry.
    pub size_bytes: u64,
    /// Path the document was indexed from (decoded via `path_from_bytes`).
    pub path: std::path::PathBuf,
}
/// Descriptive metadata for one segment as tracked by the index.
#[derive(Debug, Clone)]
pub struct SegmentMeta {
    /// Unique identifier of this segment.
    pub segment_id: Uuid,
    /// Name of the main segment file.
    pub filename: String,
    /// Name of the dictionary file — NOTE(review): likely v3-only; confirm
    /// against `segment_writer`/`open`.
    pub dict_filename: String,
    /// Name of the separate postings file — likely v3-only; confirm as above.
    pub post_filename: String,
    /// Number of documents stored in the segment.
    pub doc_count: u32,
    /// Number of distinct grams in the segment's dictionary.
    pub gram_count: u32,
}
// Segment construction.
mod segment_writer;
pub use segment_writer::SegmentWriter;
// Opening existing segments.
mod open;
// Low-level read helpers (e.g. positional posting-list reads).
mod reader;
/// Backing storage for a segment's raw bytes. Derefs to `[u8]`, so readers
/// never need to know which variant they were handed.
pub(super) enum SegmentData {
    /// Memory-mapped file contents (only with the `memmap2` feature).
    #[cfg(feature = "memmap2")]
    Mmap(Mmap),
    /// Whole segment buffered on the heap.
    Heap(Vec<u8>),
}
impl std::ops::Deref for SegmentData {
type Target = [u8];
fn deref(&self) -> &[u8] {
match self {
#[cfg(feature = "memmap2")]
SegmentData::Mmap(m) => m,
SegmentData::Heap(v) => v,
}
}
}
/// Where a segment's posting lists physically live.
pub(super) enum PostingsBacking {
    /// v2 layout: postings are embedded in the segment mmap itself.
    #[cfg(feature = "memmap2")]
    V2Mmap,
    /// v3 layout: postings sit in a separate file, read with positioned
    /// reads (`reader::read_posting_list_pread`).
    #[cfg(feature = "memmap2")]
    V3File(std::fs::File),
    /// Postings buffered entirely in memory; works without `memmap2`.
    InMemory(Vec<u8>),
}
/// Read-only handle to one on-disk (or in-memory) index segment.
pub struct MmapSegment {
    /// Held only to keep the segment file open alongside its mapping;
    /// `None` for heap-backed segments — confirm against `open`.
    pub(super) _file: Option<std::fs::File>,
    /// The raw segment bytes (mmap or heap; see `SegmentData`).
    pub(super) mmap: SegmentData,
    /// Byte length recorded when the segment was opened; accessors bail out
    /// if the live length ever differs (see `check_len`).
    pub(super) expected_len: usize,
    /// Number of documents in this segment.
    pub doc_count: u32,
    /// Number of distinct grams in the dictionary.
    pub gram_count: u32,
    /// Absolute byte offset of the document table within the segment.
    pub(super) doc_table_offset: usize,
    /// Absolute byte offset of the gram dictionary within the segment.
    pub(super) dict_offset: usize,
    /// Start of the embedded postings region (used only by the v2 mmap path).
    #[cfg_attr(not(feature = "memmap2"), allow(dead_code))]
    pub(super) postings_start: usize,
    /// Where to read posting lists from (see `PostingsBacking`).
    pub(super) postings: PostingsBacking,
}
impl MmapSegment {
fn check_len(&self) -> Option<()> {
if self.mmap.len() == self.expected_len {
Some(())
} else {
None
}
}
pub fn verify_integrity(&self) -> Result<(), IndexError> {
let len = self.mmap.len();
if len != self.expected_len {
return Err(IndexError::CorruptIndex(format!(
"segment size changed: expected {}, got {}",
self.expected_len, len,
)));
}
let content = self
.mmap
.get(..len - FOOTER_SIZE)
.ok_or_else(|| IndexError::CorruptIndex("truncated".into()))?;
let footer = self
.mmap
.get(len - FOOTER_SIZE..)
.ok_or_else(|| IndexError::CorruptIndex("truncated".into()))?;
let stored = u64::from_le_bytes(
footer
.get(32..40)
.ok_or_else(|| IndexError::CorruptIndex("truncated footer".into()))?
.try_into()
.map_err(|_| IndexError::CorruptIndex("footer slice".into()))?,
);
if xxh64(content, 0) != stored {
return Err(IndexError::CorruptIndex(
"checksum mismatch on re-verify".into(),
));
}
Ok(())
}
pub fn lookup_gram(&self, gram_hash: u64) -> Option<PostingList> {
self.check_len()?;
let (abs_off, _) = self.dict_lookup(gram_hash)?;
self.read_posting_list(abs_off)
}
pub fn gram_cardinality(&self, gram_hash: u64) -> Option<u32> {
self.check_len()?;
Some(self.dict_lookup(gram_hash)?.1)
}
#[cfg_attr(not(feature = "memmap2"), allow(dead_code))]
pub(crate) fn gram_hashes(&self) -> Result<Vec<u64>, IndexError> {
self.check_len()
.ok_or_else(|| IndexError::CorruptIndex("segment length changed".into()))?;
let dict_len = (self.gram_count as usize)
.checked_mul(DICT_ENTRY_SIZE)
.ok_or_else(|| IndexError::CorruptIndex("dictionary size overflow".into()))?;
let dict = self
.mmap
.get(self.dict_offset..self.dict_offset.saturating_add(dict_len))
.ok_or_else(|| IndexError::CorruptIndex("truncated dictionary".into()))?;
let mut hashes = Vec::with_capacity(self.gram_count as usize);
for entry in dict.chunks_exact(DICT_ENTRY_SIZE) {
hashes.push(u64::from_le_bytes(entry[0..8].try_into().map_err(
|_| IndexError::CorruptIndex("dictionary entry hash".into()),
)?));
}
Ok(hashes)
}
pub fn get_doc(&self, doc_id: u32) -> Option<DocEntry> {
self.check_len()?;
if doc_id >= self.doc_count {
return None;
}
let idx_pos = self
.doc_table_offset
.checked_add((doc_id as usize).checked_mul(8)?)?;
let abs_off =
u64::from_le_bytes(self.mmap.get(idx_pos..idx_pos + 8)?.try_into().ok()?) as usize;
const MIN_DOC_ENTRY_BYTES: usize = 22;
if abs_off < self.doc_table_offset
|| abs_off.saturating_add(MIN_DOC_ENTRY_BYTES) > self.dict_offset
{
return None;
}
let e = self.mmap.get(abs_off..)?;
let doc_id_r = u32::from_le_bytes(e.get(0..4)?.try_into().ok()?);
let content_hash = u64::from_le_bytes(e.get(4..12)?.try_into().ok()?);
let size_bytes = u64::from_le_bytes(e.get(12..20)?.try_into().ok()?);
let path_len = u16::from_le_bytes(e.get(20..22)?.try_into().ok()?) as usize;
if abs_off.saturating_add(22 + path_len) > self.dict_offset {
return None;
}
let path = path_from_bytes(e.get(22..22 + path_len)?);
Some(DocEntry {
doc_id: doc_id_r,
content_hash,
size_bytes,
path,
})
}
fn dict_lookup(&self, gram_hash: u64) -> Option<(usize, u32)> {
let dict = self.mmap.get(self.dict_offset..)?;
let n = self.gram_count as usize;
let mut lo = 0usize;
let mut hi = n;
while lo < hi {
let mid = lo + (hi - lo) / 2;
let base = mid * DICT_ENTRY_SIZE;
let mid_hash = u64::from_le_bytes(dict.get(base..base + 8)?.try_into().ok()?);
match mid_hash.cmp(&gram_hash) {
std::cmp::Ordering::Equal => {
let abs_off =
u64::from_le_bytes(dict.get(base + 8..base + 16)?.try_into().ok()?)
as usize;
let count =
u32::from_le_bytes(dict.get(base + 16..base + 20)?.try_into().ok()?);
return Some((abs_off, count));
}
std::cmp::Ordering::Less => lo = mid + 1,
std::cmp::Ordering::Greater => hi = mid,
}
}
None
}
fn read_posting_list(&self, abs_off: usize) -> Option<PostingList> {
match &self.postings {
#[cfg(feature = "memmap2")]
PostingsBacking::V2Mmap => self.read_posting_list_mmap(abs_off),
#[cfg(feature = "memmap2")]
PostingsBacking::V3File(post_file) => {
reader::read_posting_list_pread(post_file, abs_off as u64).ok()
}
PostingsBacking::InMemory(bytes) => {
use crate::posting::PostingList;
const POST_MAGIC_SIZE: usize = 8;
let off = POST_MAGIC_SIZE + abs_off;
let b = bytes.get(off..)?;
const MIN_POSTING_BYTES: usize = 9;
if b.len() < MIN_POSTING_BYTES {
return None;
}
let encoding = b[0];
let byte_len = u32::from_le_bytes(b[5..9].try_into().ok()?) as usize;
let data = b.get(9..9 + byte_len)?;
match encoding {
0 => Some(PostingList::Small(data.to_vec())),
1 => roaring_util::deserialize(data).ok().map(PostingList::Large),
_ => None,
}
}
}
}
#[cfg_attr(not(feature = "memmap2"), allow(dead_code))]
fn read_posting_list_mmap(&self, abs_off: usize) -> Option<PostingList> {
const MIN_POSTING_BYTES: usize = 9;
if abs_off < self.postings_start
|| abs_off.saturating_add(MIN_POSTING_BYTES) > self.dict_offset
{
return None;
}
let b = self.mmap.get(abs_off..)?;
let encoding = *b.first()?;
let byte_len = u32::from_le_bytes(b.get(5..9)?.try_into().ok()?) as usize;
let data = b.get(9..9 + byte_len)?;
match encoding {
0 => Some(PostingList::Small(data.to_vec())),
1 => roaring_util::deserialize(data).ok().map(PostingList::Large),
_ => None,
}
}
}
#[cfg(test)]
mod tests;