use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use crate::blockfile::{BlockFile, write_blocks};
use crate::descriptor::FilterableField;
use crate::error::{CoreError, Result};
use crate::page::{PageCodec, PageType};
use crate::sec::{SecIndex, SecPredicate};
/// On-disk layout version of a sealed segment.
///
/// `write_segment` stores it in [`SegmentDir::format_version`]; `open_segment` rejects a
/// directory carrying any other value with [`CoreError::UnsupportedVersion`].
pub(crate) const SEGMENT_FORMAT_VERSION: u16 = 3;
/// Per-row record stored in a segment's `.dir` file.
///
/// Serialized with postcard as part of [`SegmentDir`]. Field order is part of the on-disk
/// encoding, so changing it requires bumping [`SEGMENT_FORMAT_VERSION`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct RowEntry {
    /// Identifier supplied for this row at seal time.
    pub external_id: String,
    /// Byte offset of this row's payload within the concatenated payload blob (`.pay`).
    pub pay_off: u64,
    /// Byte length of this row's payload.
    pub pay_len: u32,
}
/// Contents of a segment's `.dir` file.
///
/// Encoded with postcard by `write_segment` and decoded by `open_segment`. Field order is
/// part of the on-disk encoding, so changing it requires bumping [`SEGMENT_FORMAT_VERSION`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct SegmentDir {
    /// Layout version; `open_segment` requires it to equal [`SEGMENT_FORMAT_VERSION`].
    pub format_version: u16,
    /// Id of the segment this directory was written for.
    pub segment_id: u64,
    /// One entry per row, in row-number order.
    pub rows: Vec<RowEntry>,
}
/// One row handed to [`write_segment`] for sealing.
pub(crate) struct SealRow<'a> {
    /// Identifier recorded in the segment directory for this row.
    pub external_id: &'a str,
    /// Raw vector bytes, appended verbatim to the `.vec` file. Readers address vectors as
    /// `row * stride`, so every row of a segment presumably needs the same length — TODO
    /// confirm callers enforce this.
    pub vector: &'a [u8],
    /// Raw payload bytes, appended to the `.pay` file and, when filterable fields are
    /// configured, fed to the secondary-index builder.
    pub payload: &'a [u8],
}
/// In-memory copy of a row's payload location, taken from its [`RowEntry`].
#[derive(Debug, Clone, Copy)]
pub(crate) struct PayLoc {
    /// Byte offset within the payload block file.
    off: u64,
    /// Byte length of the payload.
    len: u32,
}
/// An opened sealed segment together with its in-memory tombstone set.
///
/// Built by [`open_segment`]. A row number (`u32`) indexes `paylocs`, `row_ids` and `dead`.
pub(crate) struct SealedSegment {
    /// Id of the segment; also the `seg-{id:010}` prefix of its files.
    pub seg_id: u64,
    /// Block file holding all vectors, concatenated in row order (`.vec`).
    vec: BlockFile,
    /// Block file holding all payloads, concatenated in row order (`.pay`).
    pay: BlockFile,
    /// Payload location of each row, in row order.
    paylocs: Vec<PayLoc>,
    /// External id of each row, in row order.
    row_ids: Vec<String>,
    /// Tombstoned row numbers: loaded from `.del` on open, extended in memory by `mark_dead`.
    dead: RoaringBitmap,
    /// Secondary index loaded from `.sec`, or `SecIndex::default()` when that file is absent.
    sec: SecIndex,
}
impl SealedSegment {
pub(crate) fn row_ids(&self) -> &[String] {
&self.row_ids
}
pub(crate) fn sec_query(&self, predicate: &SecPredicate) -> Result<Option<RoaringBitmap>> {
self.sec.query(predicate)
}
pub(crate) fn read_vector(
&self,
codec: &dyn PageCodec,
row: u32,
stride: usize,
) -> Result<Vec<u8>> {
self.vec.read_range(codec, row as usize * stride, stride)
}
pub(crate) fn read_payload(&self, codec: &dyn PageCodec, row: u32) -> Result<Vec<u8>> {
let loc = self.paylocs.get(row as usize).ok_or_else(|| {
CoreError::MalformedPage(format!(
"segment {} has no row {row} (row count {})",
self.seg_id,
self.paylocs.len()
))
})?;
self.pay
.read_range(codec, loc.off as usize, loc.len as usize)
}
pub(crate) fn row_count(&self) -> u32 {
self.paylocs.len() as u32
}
pub(crate) fn is_dead(&self, row: u32) -> bool {
self.dead.contains(row)
}
pub(crate) fn live_count(&self) -> u64 {
u64::from(self.row_count()) - self.dead.len()
}
pub(crate) fn mark_dead(&mut self, rows: &RoaringBitmap) {
self.dead |= rows;
}
pub(crate) fn dead_bitmap(&self) -> RoaringBitmap {
self.dead.clone()
}
}
pub(crate) fn write_segment(
seg_dir: &Path,
segment_id: u64,
codec: &dyn PageCodec,
rows: &[SealRow<'_>],
filterable: &[FilterableField],
) -> Result<()> {
let mut vec_blob = Vec::new();
let mut pay_blob = Vec::new();
let mut dir_rows = Vec::with_capacity(rows.len());
for row in rows {
vec_blob.extend_from_slice(row.vector);
let off = pay_blob.len() as u64;
pay_blob.extend_from_slice(row.payload);
dir_rows.push(RowEntry {
external_id: row.external_id.to_owned(),
pay_off: off,
pay_len: row.payload.len() as u32,
});
}
let dir = SegmentDir {
format_version: SEGMENT_FORMAT_VERSION,
segment_id,
rows: dir_rows,
};
let dir_blob = postcard::to_allocvec(&dir)?;
write_blocks(
&vec_path(seg_dir, segment_id),
codec,
PageType::Segment,
segment_id,
&vec_blob,
)?;
write_blocks(
&pay_path(seg_dir, segment_id),
codec,
PageType::Segment,
segment_id,
&pay_blob,
)?;
crate::paged::write_paged(
&dir_path(seg_dir, segment_id),
codec,
PageType::Segment,
segment_id,
&dir_blob,
)?;
if !filterable.is_empty() {
let payloads: Vec<&[u8]> = rows.iter().map(|r| r.payload).collect();
let sec = SecIndex::build(filterable, &payloads)?;
crate::paged::write_paged(
&sec_path(seg_dir, segment_id),
codec,
PageType::Segment,
segment_id,
&sec.encode()?,
)?;
}
Ok(())
}
/// Persists the tombstone bitmap `dead` of segment `segment_id`.
///
/// The bitmap is serialized in roaring's portable format and written to the `.del.tmp`
/// staging file. The staging file is then renamed over `.del`, and `seg_dir` is fsynced so
/// the rename itself is durable. Readers therefore see either the previous `.del` or the new
/// one, never a half-written file.
///
/// # Errors
/// Serialization, page-write, rename and directory-sync failures are returned.
pub(crate) fn write_del(
    seg_dir: &Path,
    segment_id: u64,
    codec: &dyn PageCodec,
    dead: &RoaringBitmap,
) -> Result<()> {
    let encoded = {
        let mut buf = Vec::with_capacity(dead.serialized_size());
        dead.serialize_into(&mut buf)?;
        buf
    };
    let staging = del_tmp_path(seg_dir, segment_id);
    crate::paged::write_paged(&staging, codec, PageType::Segment, segment_id, &encoded)?;

    let target = del_path(seg_dir, segment_id);
    std::fs::rename(&staging, &target).map_err(|err| CoreError::io(&target, err))?;
    crate::paged::fsync_dir(seg_dir)?;
    Ok(())
}
fn read_del(seg_dir: &Path, segment_id: u64, codec: &dyn PageCodec) -> Result<RoaringBitmap> {
let path = del_path(seg_dir, segment_id);
if !path.exists() {
return Ok(RoaringBitmap::new());
}
let blob = crate::paged::read_paged(&path, codec, PageType::Segment)?;
Ok(RoaringBitmap::deserialize_from(&blob[..])?)
}
fn read_sec(seg_dir: &Path, segment_id: u64, codec: &dyn PageCodec) -> Result<SecIndex> {
let path = sec_path(seg_dir, segment_id);
if !path.exists() {
return Ok(SecIndex::default());
}
let blob = crate::paged::read_paged(&path, codec, PageType::Segment)?;
SecIndex::decode(&blob)
}
pub(crate) fn open_segment(
seg_dir: &Path,
segment_id: u64,
codec: &dyn PageCodec,
) -> Result<SealedSegment> {
let dir_blob =
crate::paged::read_paged(&dir_path(seg_dir, segment_id), codec, PageType::Segment)?;
let dir: SegmentDir = postcard::from_bytes(&dir_blob)?;
if dir.format_version != SEGMENT_FORMAT_VERSION {
return Err(CoreError::UnsupportedVersion {
found: dir.format_version,
supported: SEGMENT_FORMAT_VERSION,
});
}
let vec = BlockFile::open(&vec_path(seg_dir, segment_id), codec, PageType::Segment)?;
let pay = BlockFile::open(&pay_path(seg_dir, segment_id), codec, PageType::Segment)?;
let dead = read_del(seg_dir, segment_id, codec)?;
let sec = read_sec(seg_dir, segment_id, codec)?;
let mut row_ids = Vec::with_capacity(dir.rows.len());
let mut paylocs = Vec::with_capacity(dir.rows.len());
for r in dir.rows {
row_ids.push(r.external_id);
paylocs.push(PayLoc {
off: r.pay_off,
len: r.pay_len,
});
}
Ok(SealedSegment {
seg_id: segment_id,
vec,
pay,
paylocs,
row_ids,
dead,
sec,
})
}
/// Path of the vector block file (`seg-{id:010}.vec`) of segment `seg_id`.
fn vec_path(seg_dir: &Path, seg_id: u64) -> PathBuf {
    let file_name = format!("seg-{:010}.vec", seg_id);
    seg_dir.join(file_name)
}
/// Path of the payload block file (`seg-{id:010}.pay`) of segment `seg_id`.
fn pay_path(seg_dir: &Path, seg_id: u64) -> PathBuf {
    let file_name = format!("seg-{:010}.pay", seg_id);
    seg_dir.join(file_name)
}
/// Path of the postcard-encoded directory file (`seg-{id:010}.dir`) of segment `seg_id`.
fn dir_path(seg_dir: &Path, seg_id: u64) -> PathBuf {
    let file_name = format!("seg-{:010}.dir", seg_id);
    seg_dir.join(file_name)
}
/// Path of the secondary-index file (`seg-{id:010}.sec`) of segment `seg_id`.
fn sec_path(seg_dir: &Path, seg_id: u64) -> PathBuf {
    let file_name = format!("seg-{:010}.sec", seg_id);
    seg_dir.join(file_name)
}
/// Path of the tombstone file (`seg-{id:010}.del`) of segment `seg_id`.
fn del_path(seg_dir: &Path, seg_id: u64) -> PathBuf {
    let file_name = format!("seg-{:010}.del", seg_id);
    seg_dir.join(file_name)
}
/// Path of the staging file (`seg-{id:010}.del.tmp`) used while replacing the tombstone
/// file of segment `seg_id`.
fn del_tmp_path(seg_dir: &Path, seg_id: u64) -> PathBuf {
    let file_name = format!("seg-{:010}.del.tmp", seg_id);
    seg_dir.join(file_name)
}
/// Extracts the segment id from a companion file name such as `seg-0000000007.vec`.
///
/// The id is the text between the `seg-` prefix and the first `.`, so temp files such as
/// `seg-0000000001.del.tmp` also map to their segment. Returns `None` for names without the
/// prefix, without a `.`, or whose id part does not parse as a `u64`.
pub(crate) fn seg_id_of_file(name: &str) -> Option<u64> {
    let (id_text, _extension) = name.strip_prefix("seg-")?.split_once('.')?;
    id_text.parse().ok()
}
/// Whether `name` is a staging file (suffix `.tmp`), e.g. an unfinished `.del` replacement.
pub(crate) fn is_temp_file(name: &str) -> bool {
    name.strip_suffix(".tmp").is_some()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::page::PlainCodec;

    /// Two-row fixture: 4-byte vectors (so reads use `stride = 4`) and payloads of
    /// different lengths, so payload offsets are non-trivial.
    fn rows() -> Vec<SealRow<'static>> {
        vec![
            SealRow {
                external_id: "a",
                vector: &[0, 1, 2, 3],
                payload: b"{}",
            },
            SealRow {
                external_id: "b",
                vector: &[4, 5, 6, 7],
                payload: b"[1,2,3]",
            },
        ]
    }

    /// Write then open: ids, vectors and payloads come back unchanged, nothing is dead, and
    /// no `.sec` file is produced when there are no filterable fields.
    #[test]
    fn segment_roundtrips() {
        let dir = tempfile::tempdir().unwrap();
        let seg_dir = dir.path();
        write_segment(seg_dir, 1, &PlainCodec, &rows(), &[]).unwrap();
        let seg = open_segment(seg_dir, 1, &PlainCodec).unwrap();
        assert_eq!(seg.row_ids(), &["a".to_owned(), "b".to_owned()]);
        assert_eq!(seg.row_count(), 2);
        assert_eq!(seg.live_count(), 2);
        assert!(!seg.is_dead(0));
        assert_eq!(
            seg.read_vector(&PlainCodec, 0, 4).unwrap(),
            vec![0, 1, 2, 3]
        );
        assert_eq!(
            seg.read_vector(&PlainCodec, 1, 4).unwrap(),
            vec![4, 5, 6, 7]
        );
        assert_eq!(seg.read_payload(&PlainCodec, 0).unwrap(), b"{}");
        assert_eq!(seg.read_payload(&PlainCodec, 1).unwrap(), b"[1,2,3]");
        assert!(!sec_path(seg_dir, 1).exists());
    }

    /// A persisted tombstone is visible after reopening, and the `.del.tmp` staging file
    /// does not survive `write_del`.
    #[test]
    fn tombstone_bitmap_roundtrips_atomically() {
        let dir = tempfile::tempdir().unwrap();
        let seg_dir = dir.path();
        write_segment(seg_dir, 1, &PlainCodec, &rows(), &[]).unwrap();
        let seg = open_segment(seg_dir, 1, &PlainCodec).unwrap();
        assert_eq!(seg.live_count(), 2);
        let mut dead = RoaringBitmap::new();
        dead.insert(0);
        write_del(seg_dir, 1, &PlainCodec, &dead).unwrap();
        assert!(
            !del_tmp_path(seg_dir, 1).exists(),
            "temp must be renamed away"
        );
        // Reopen so the tombstones are read back from disk rather than from memory.
        let seg = open_segment(seg_dir, 1, &PlainCodec).unwrap();
        assert!(seg.is_dead(0));
        assert!(!seg.is_dead(1));
        assert_eq!(seg.live_count(), 1);
    }

    /// With a keyword field configured, `.sec` is written and an equality query returns
    /// exactly the matching row.
    #[test]
    fn secondary_index_is_written_and_queryable() {
        use crate::descriptor::FilterableField;
        let dir = tempfile::tempdir().unwrap();
        let seg_dir = dir.path();
        let rows = vec![
            SealRow {
                external_id: "a",
                vector: &[0, 0, 0, 0],
                payload: br#"{"city":"paris"}"#,
            },
            SealRow {
                external_id: "b",
                vector: &[0, 0, 0, 0],
                payload: br#"{"city":"lyon"}"#,
            },
        ];
        let filterable = [FilterableField::keyword("city")];
        write_segment(seg_dir, 2, &PlainCodec, &rows, &filterable).unwrap();
        assert!(sec_path(seg_dir, 2).exists(), ".sec must be written");
        let seg = open_segment(seg_dir, 2, &PlainCodec).unwrap();
        // Outer unwrap: query succeeded; inner unwrap: the index produced an answer.
        let hit = seg
            .sec_query(&SecPredicate::Eq {
                field: "city".into(),
                value: crate::sec::SecValue::Keyword("paris".into()),
            })
            .unwrap()
            .unwrap();
        assert_eq!(hit.iter().collect::<Vec<_>>(), vec![0]);
    }

    /// A segment with zero rows can be written and opened; any row lookup fails cleanly.
    #[test]
    fn empty_segment_roundtrips() {
        let dir = tempfile::tempdir().unwrap();
        write_segment(dir.path(), 5, &PlainCodec, &[], &[]).unwrap();
        let seg = open_segment(dir.path(), 5, &PlainCodec).unwrap();
        assert!(seg.row_ids().is_empty());
        assert_eq!(seg.row_count(), 0);
        assert!(seg.read_payload(&PlainCodec, 0).is_err());
    }

    /// File-name parsing recognises every companion extension, rejects foreign names, and
    /// classifies `.tmp` staging files.
    #[test]
    fn seg_id_parses_from_any_companion() {
        assert_eq!(seg_id_of_file("seg-0000000007.vec"), Some(7));
        assert_eq!(seg_id_of_file("seg-0000000042.pay"), Some(42));
        assert_eq!(seg_id_of_file("seg-0000000003.dir"), Some(3));
        assert_eq!(seg_id_of_file("seg-0000000009.del"), Some(9));
        assert_eq!(seg_id_of_file("seg-0000000005.sec"), Some(5));
        assert_eq!(seg_id_of_file("CURRENT"), None);
        assert_eq!(seg_id_of_file("seg-bogus.vec"), None);
        assert!(is_temp_file("seg-0000000001.del.tmp"));
        assert!(!is_temp_file("seg-0000000001.del"));
    }
}