pub mod aggregates;
pub mod bloom;
pub mod commit;
pub mod disk_cache;
pub mod encoding;
pub mod hll;
pub mod list;
pub mod list_prune;
pub mod options_hash;
pub mod part;
pub mod partition;
pub mod term_range;
use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt,
ops::Deref,
sync::{Arc, OnceLock},
};
use arrow::compute::kernels::aggregate as agg;
use arrow_array::*;
use arrow_schema::DataType;
use dashmap::DashMap;
use futures::future;
pub use list::{FtsSummaryAgg, ScalarStatsAgg};
use tokio::sync::OnceCell;
use uuid::Uuid;
use xxhash_rust::xxh3::xxh3_64;
use super::options::SupertableOptions;
use crate::{
storage::{StorageError, StorageProvider},
superfile::vector::distance::{
COSINE_DISTANCE_BASE, L2_CROSS_TERM_COEFF, Metric, sq8_dot, u8_sum_sumsq,
},
supertable::{
CommitError,
error::ManifestError,
manifest::{
commit::{
EncodedPart, PointerFile, frame_content_size, part_uri, read_pointer,
translate_contention, write_manifest, write_part_bytes, write_pointer,
},
disk_cache::ManifestDiskCache,
list::{
FORMAT_VERSION as LIST_FORMAT_VERSION, Manifest, ManifestPartEntry,
PartitionStrategy,
},
part::{ContentHash, ManifestPart, PartId},
partition::{assign_partition, encode_partition_key},
},
query::{hierarchical_iter, prune::PruneLeaf},
},
};
/// Compression level handed to `part::encode` whenever this module (re)builds a
/// manifest part; presumably a zstd level, per the name.
pub const MANIFEST_ZSTD_LEVEL: i32 = 3;
/// Flat view of a manifest: its id, the table options, and the superfile entries.
#[derive(Debug, Clone)]
pub struct SuperfileList {
/// Identifier of the manifest this list belongs to; `empty` starts at 0 and
/// `with_appended` increments it by one.
pub manifest_id: u64,
/// Shared table options.
pub options: Arc<SupertableOptions>,
/// Superfile entries carried by this list.
pub superfiles: Vec<Arc<SuperfileEntry>>,
}
impl SuperfileList {
    /// Creates a list with no superfiles and `manifest_id` 0.
    pub fn empty(options: Arc<SupertableOptions>) -> Self {
        let superfiles = Vec::new();
        Self {
            manifest_id: 0,
            options,
            superfiles,
        }
    }

    /// Returns a new list holding the current entries followed by `new_entries`,
    /// with `manifest_id` bumped by one. `self` is left untouched.
    pub fn with_appended(&self, new_entries: Vec<Arc<SuperfileEntry>>) -> Self {
        let superfiles = self
            .superfiles
            .iter()
            .cloned()
            .chain(new_entries)
            .collect::<Vec<_>>();
        Self {
            manifest_id: self.manifest_id + 1,
            options: Arc::clone(&self.options),
            superfiles,
        }
    }

    /// Sums `n_docs` over every superfile in the list.
    pub fn n_docs_total(&self) -> u64 {
        self.superfiles
            .iter()
            .fold(0u64, |total, entry| total + entry.n_docs)
    }
}
/// A manifest as seen by this process: the flat superfile list plus, when the
/// snapshot is backed by a manifest list, the list itself, a per-part cell map,
/// and the loader used to fetch parts.
pub struct ManifestSnapshot {
// Flat superfile view; also the `Deref` target of the snapshot.
superfile_list: SuperfileList,
// Manifest list; `None` means the snapshot is in-process only
// (see `is_in_process_only`).
list: Option<Manifest>,
// One cell per part id. A cell may exist without being initialised yet:
// `load` inserts empty cells when it does not fetch parts eagerly.
parts: DashMap<PartId, Arc<OnceCell<Arc<ManifestPart>>>>,
// Fetches parts on demand; `get_part_by_id` fails with `NoLoaderAttached`
// when this is `None`.
loader: Option<Arc<ManifestPartLoader>>,
}
impl fmt::Debug for ManifestSnapshot {
    /// Prints a compact summary (ids and counts) rather than the full contents.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let total_parts = match &self.list {
            Some(list) => list.parts.len(),
            None => 0,
        };
        let mut out = f.debug_struct("ManifestSnapshot");
        out.field("manifest_id", &self.superfile_list.manifest_id);
        out.field("n_superfiles", &self.superfile_list.superfiles.len());
        out.field("has_list", &self.list.is_some());
        out.field("n_parts", &total_parts);
        out.field("n_parts_loaded", &self.parts.len());
        out.field("has_loader", &self.loader.is_some());
        out.finish()
    }
}
/// Lets a snapshot be used wherever a `&SuperfileList` is expected.
impl Deref for ManifestSnapshot {
    type Target = SuperfileList;

    fn deref(&self) -> &SuperfileList {
        let Self { superfile_list, .. } = self;
        superfile_list
    }
}
impl ManifestSnapshot {
/// Builds a snapshot from already-materialised superfile entries.
///
/// The manifest list and a part loader are attached only when BOTH `storage`
/// and `list` are provided; otherwise the snapshot has neither (a `list`
/// passed without `storage` is dropped).
pub fn new(
    manifest_id: u64,
    options: Arc<SupertableOptions>,
    superfile_list: Vec<Arc<SuperfileEntry>>,
    storage: Option<Arc<dyn StorageProvider>>,
    list: Option<Manifest>,
) -> Self {
    let superfiles = superfile_list;
    let superfile_list = SuperfileList {
        manifest_id,
        options,
        superfiles,
    };
    let (list, loader) = match (storage, list) {
        (Some(storage), Some(list)) => {
            let disk_cache = superfile_list.options.manifest_disk_cache.clone();
            let loader = ManifestPartLoader::new_with_cache(storage, &list, disk_cache);
            (Some(list), Some(Arc::new(loader)))
        }
        _ => (None, None),
    };
    Self {
        superfile_list,
        list,
        parts: DashMap::new(),
        loader,
    }
}
/// Test-only helper: an empty snapshot with `superfiles` appended to it.
#[cfg(test)]
pub fn new_from_superfiles(
    opts: Arc<SupertableOptions>,
    superfiles: Vec<Arc<SuperfileEntry>>,
) -> Self {
    let base = Self::empty(opts);
    base.with_appended(superfiles)
}
/// Creates a snapshot with no superfiles, no manifest list, no cached parts
/// and no loader.
pub fn empty(options: Arc<SupertableOptions>) -> Self {
    let superfile_list = SuperfileList::empty(options);
    Self {
        superfile_list,
        list: None,
        loader: None,
        parts: DashMap::new(),
    }
}
/// Returns the manifest id stored in the snapshot's superfile list.
pub fn get_manifest_id(&self) -> u64 {
self.superfile_list.manifest_id
}
/// Returns the id a successor snapshot would carry: the current manifest id plus one.
pub fn get_next_manifest_id(&self) -> u64 {
self.get_manifest_id() + 1
}
/// Returns a new handle to the shared table options.
pub fn get_opts(&self) -> Arc<SupertableOptions> {
    Arc::clone(&self.superfile_list.options)
}
/// Returns the partition strategy in effect for this snapshot.
///
/// The strategy recorded in the manifest list wins when a list is attached;
/// otherwise the options' effective strategy is used. The fallback is only
/// computed when it is actually needed (the previous `unwrap_or(..)` form
/// evaluated it on every call, even when a list was present).
pub fn get_partition_strategy(&self) -> PartitionStrategy {
    match &self.list {
        Some(list) => list.partition_strategy.clone(),
        None => self.superfile_list.options.effective_partition_strategy(),
    }
}
/// Number of part entries in the manifest list, or 0 when no list is attached.
pub fn get_num_parts(&self) -> usize {
    match &self.list {
        Some(list) => list.parts.len(),
        None => 0,
    }
}
/// Returns the number of cells currently held in the part map.
///
/// NOTE(review): this counts every cell, including ones that are not yet
/// initialised (`load` pre-inserts empty cells when it does not fetch eagerly),
/// so the value can exceed the number of parts actually fetched — confirm that
/// this is the intended meaning of "loaded".
pub fn get_num_parts_loaded(&self) -> usize {
self.parts.len()
}
/// True when the snapshot has no manifest list attached.
pub fn is_in_process_only(&self) -> bool {
    matches!(&self.list, None)
}
/// Returns the part for `part_id` if its cell exists and is already
/// initialised. Never triggers a fetch.
pub fn get_cached_part_by_id(&self, part_id: &PartId) -> Option<Arc<ManifestPart>> {
    match self.parts.get(part_id) {
        Some(cell) => cell.value().get().cloned(),
        None => None,
    }
}
/// Returns the already-loaded part referenced by the `idx`-th entry of the
/// manifest list.
///
/// Yields `None` when no list is attached, when `idx` is out of range
/// (previously this panicked on the slice index), or when the part has not
/// been loaded yet. Never triggers a fetch.
pub fn get_cached_part_by_list_idx(&self, idx: usize) -> Option<Arc<ManifestPart>> {
    let list = self.list.as_ref()?;
    // Checked lookup: the return type already models "not available".
    let part_id = list.parts.get(idx)?.part_id;
    self.get_cached_part_by_id(&part_id)
}
pub(crate) async fn load(
current_manifest: Option<Arc<Self>>,
storage: Arc<dyn StorageProvider>,
options: Option<Arc<SupertableOptions>>,
) -> Result<Arc<Self>, ManifestLoadError> {
let (pointer, _) = match read_pointer(storage.as_ref()).await? {
Some(p) => p,
None => return Err(ManifestLoadError::PointerNotFound),
};
if let Some(current_manifest) = ¤t_manifest
&& current_manifest.superfile_list.manifest_id >= pointer.manifest_id
{
return Err(ManifestLoadError::AlreadyLoaded);
}
let (list_bytes, _) = storage
.get(&pointer.manifest_uri)
.await
.map_err(ManifestLoadError::Storage)?;
let list = list::decode(&list_bytes).map_err(ManifestLoadError::ListParse)?;
let options = if let Some(options) = options {
options
} else if let Some(current) = ¤t_manifest {
current.options.clone()
} else {
return Err(ManifestLoadError::ContentHashMismatch {
expected: "valid options".to_string(),
actual: "None options".to_string(),
});
};
let expected_hash = options_hash::compute_options_hash(&options, &list.partition_strategy);
if let Err(mismatch) = options_hash::verify_options_hash(expected_hash, list.options_hash) {
return Err(ManifestLoadError::ContentHashMismatch {
expected: mismatch.expected,
actual: mismatch.actual,
});
}
let loader = Arc::new(ManifestPartLoader::new_with_cache(
Arc::clone(&storage),
&list,
options.manifest_disk_cache.clone(),
));
let parts: DashMap<_, _> = DashMap::new();
let mut all_superfiles: Vec<Arc<SuperfileEntry>> = Vec::new();
if let Some(current_manifest) = ¤t_manifest {
let mut missing_part_ids = Vec::new();
for entry in &list.parts {
if let Some(existing) = current_manifest.parts.get(&entry.part_id) {
parts.insert(entry.part_id, existing.value().clone());
} else {
missing_part_ids.push(entry.part_id);
}
}
let threshold = options.eager_load_threshold_parts as usize;
let eager = list.parts.len() <= threshold;
if eager {
let load_futs = missing_part_ids
.iter()
.map(|id| {
let loader = Arc::clone(&loader);
let pid = *id;
async move { loader.load(pid).await }
})
.collect::<Vec<_>>();
let loaded = future::join_all(load_futs).await;
for (pid, result) in missing_part_ids.iter().zip(loaded) {
let part = result?;
let cell = OnceCell::new();
cell.set(part).expect("fresh cell");
parts.insert(*pid, Arc::new(cell));
}
for entry in &list.parts {
let cell = parts.get(&entry.part_id).expect("part inserted above");
let part = cell
.value()
.get()
.expect("eager-fetched or inherited; must be set");
all_superfiles.extend(part.superfiles.iter().cloned());
}
} else {
for pid in &missing_part_ids {
parts.insert(*pid, Arc::new(OnceCell::new()));
}
}
} else {
let n_parts = list.parts.len();
let threshold = options.eager_load_threshold_parts as usize;
let eager = n_parts <= threshold;
if eager {
let part_ids: Vec<_> = list.parts.iter().map(|p| p.part_id).collect();
let load_futs = part_ids
.iter()
.map(|id| {
let loader = Arc::clone(&loader);
let pid = *id;
async move { loader.load(pid).await }
})
.collect::<Vec<_>>();
let loaded = future::join_all(load_futs).await;
for (pid, result) in part_ids.iter().zip(loaded) {
let part = result?;
all_superfiles.extend(part.superfiles.iter().cloned());
let cell = OnceCell::new();
cell.set(part).expect("fresh OnceCell");
parts.insert(*pid, Arc::new(cell));
}
} else {
for entry in &list.parts {
parts.insert(entry.part_id, Arc::new(OnceCell::new()));
}
}
}
let mut new_superfile_list = SuperfileList::empty(options.clone());
new_superfile_list.manifest_id = pointer.manifest_id;
new_superfile_list.superfiles = all_superfiles;
let new_manifest = ManifestSnapshot {
superfile_list: new_superfile_list,
list: Some(list),
parts,
loader: Some(loader),
};
Ok(Arc::new(new_manifest))
}
/// Persists this snapshot: the manifest list and all `parts_to_write` are
/// written concurrently, then the pointer file is updated with
/// `expected_prev_etag`.
///
/// A snapshot without a manifest list writes nothing and returns `Ok(())`.
/// Errors from the list/part writes are passed through `translate_contention`;
/// the pointer is only written once all of them succeeded.
pub async fn write(
    &self,
    storage: &dyn StorageProvider,
    expected_prev_etag: Option<&str>,
    parts_to_write: &[&[u8]],
) -> Result<(), CommitError> {
    let list_to_write = match &self.list {
        Some(list) => list,
        None => return Ok(()),
    };
    let list_write = write_manifest(storage, list_to_write);
    let part_writes = future::join_all(
        parts_to_write
            .iter()
            .map(|bytes| write_part_bytes(storage, bytes)),
    );
    let (list_outcome, part_outcomes) = tokio::join!(list_write, part_writes);
    let written_list = list_outcome.map_err(translate_contention)?;
    for outcome in part_outcomes {
        outcome.map_err(translate_contention)?;
    }
    let pointer = PointerFile {
        manifest_id: self.get_manifest_id(),
        manifest_uri: written_list.uri,
        content_hash: written_list.content_hash,
    };
    write_pointer(storage, &pointer, expected_prev_etag).await?;
    Ok(())
}
/// Borrows the snapshot's flat superfile list.
pub fn get_all_superfiles(&self) -> &[Arc<SuperfileEntry>] {
    self.superfile_list.superfiles.as_slice()
}
/// Returns the superfiles of the parts that survive every prune leaf.
///
/// Each leaf may nominate a set of parts to keep (`keep_parts` returning
/// `Some`); the surviving parts are the intersection of all nominated sets,
/// visited in manifest-list order. Leaves that return `None` impose no
/// restriction, and with no restricting leaf every part is kept. Without a
/// manifest list the flat superfile fallback is used.
pub(crate) async fn get_pruned_superfiles(
    &self,
    leaves: &[PruneLeaf],
) -> Result<Vec<Arc<SuperfileEntry>>, ManifestLoadError> {
    let Some(list) = &self.list else {
        return Ok(hierarchical_iter::fallback_to_flat_superfiles(self));
    };

    let mut kept: Option<HashSet<PartId>> = None;
    for leaf in leaves {
        let Some(part_ids) = leaf.keep_parts(list) else {
            continue;
        };
        let candidate: HashSet<PartId> = part_ids.into_iter().collect();
        kept = Some(match kept.take() {
            Some(previous) => previous.intersection(&candidate).copied().collect(),
            None => candidate,
        });
    }

    let mut ordered: Vec<PartId> = Vec::new();
    for entry in &list.parts {
        let survives = match &kept {
            Some(set) => set.contains(&entry.part_id),
            None => true,
        };
        if survives {
            ordered.push(entry.part_id);
        }
    }
    hierarchical_iter::load_and_flatten(self, &ordered).await
}
/// Returns the superfiles of the parts selected by
/// `list_prune::prune_parts_for_vector` for `column` and `query` (called with
/// `f32::INFINITY` as its last argument). Without a manifest list the flat
/// superfile fallback is used.
pub(crate) async fn get_pruned_superfiles_for_vector(
    &self,
    column: &str,
    query: &[f32],
) -> Result<Vec<Arc<SuperfileEntry>>, ManifestLoadError> {
    let Some(list) = &self.list else {
        return Ok(hierarchical_iter::fallback_to_flat_superfiles(self));
    };
    let surviving = list_prune::prune_parts_for_vector(list, column, query, f32::INFINITY);
    hierarchical_iter::load_and_flatten(self, &surviving).await
}
/// Borrows the part entries of the manifest list; empty when no list is attached.
pub fn get_all_list_entries(&self) -> &[ManifestPartEntry] {
    let Some(list) = &self.list else {
        return &[];
    };
    &list.parts
}
/// Returns a snapshot whose flat superfile list has `new_entries` appended
/// (and its manifest id bumped by `SuperfileList::with_appended`).
///
/// The manifest list and loader are cloned unchanged; the part cell map of the
/// result starts out empty.
pub fn with_appended(&self, new_entries: Vec<Arc<SuperfileEntry>>) -> Self {
    let superfile_list = self.superfile_list.with_appended(new_entries);
    let list = self.list.clone();
    let loader = self.loader.clone();
    Self {
        superfile_list,
        list,
        parts: DashMap::new(),
        loader,
    }
}
/// Returns the part for `part_id`, fetching it through the loader on first use.
///
/// Fails with `NoLoaderAttached` when the snapshot has no loader, or with the
/// loader's error when the fetch fails. A failed initialisation leaves the cell
/// empty, so a later call can retry.
pub async fn get_part_by_id(
&self,
part_id: PartId,
) -> Result<Arc<ManifestPart>, ManifestLoadError> {
let loader = self
.loader
.as_ref()
.ok_or(ManifestLoadError::NoLoaderAttached)?;
// Get-or-create the cell and clone the Arc out within a single statement so
// the map's entry guard is released before the await below.
let cell = self
.parts
.entry(part_id)
.or_insert_with(|| Arc::new(OnceCell::new()))
.clone();
// Initialise the cell through the loader if it is still empty.
let loaded = cell.get_or_try_init(|| loader.load(part_id)).await?;
Ok(Arc::clone(loaded))
}
/// Derives the successor snapshot (manifest id + 1) by adding `new_entries`
/// and removing `entries_to_remove`.
///
/// Returns the new snapshot together with every part that was (re)built and
/// therefore still has to be written by the caller. Nothing is written here.
///
/// # Errors
/// Propagates failures from `assign_partition` and from loading an existing
/// part that must be extended; returns `ManifestError::UnknownPartId` when a
/// part needed for a removal cannot be obtained.
pub async fn update(
&self,
new_entries: &[Arc<SuperfileEntry>],
entries_to_remove: &[Arc<SuperfileEntry>],
) -> Result<(ManifestSnapshot, Vec<EncodedPart>), ManifestError> {
let opts = self.get_opts();
let strategy = self.get_partition_strategy();
// Bucket additions by encoded partition key.
let mut new_by_partition: BTreeMap<Vec<u8>, Vec<Arc<SuperfileEntry>>> = BTreeMap::new();
for entry in new_entries {
let pk = assign_partition(entry, &strategy)?;
new_by_partition
.entry(encode_partition_key(&pk))
.or_default()
.push(Arc::clone(entry));
}
// Bucket removals the same way.
let mut removals_by_partition: BTreeMap<Vec<u8>, Vec<Arc<SuperfileEntry>>> =
BTreeMap::new();
for entry in entries_to_remove {
let pk = assign_partition(entry, &strategy)?;
removals_by_partition
.entry(encode_partition_key(&pk))
.or_default()
.push(Arc::clone(entry));
}
// For each partition key, remember the index of its LAST entry in the
// current list; only that entry is considered for extension.
let mut latest_index_for_partition: HashMap<Vec<u8>, usize> = HashMap::new();
let list_entries = self.get_all_list_entries();
for (i, entry) in list_entries.iter().enumerate() {
latest_index_for_partition.insert(entry.partition_key.clone(), i);
}
let mut out_list_entries: Vec<ManifestPartEntry> = Vec::new();
let mut parts_to_write: Vec<EncodedPart> = Vec::new();
let mut handled_partitions: HashSet<Vec<u8>> = HashSet::new();
// Pass 1: walk the existing entries in order, folding additions into the
// latest part of each touched partition.
for (i, entry) in list_entries.iter().enumerate() {
let is_latest_for_partition = latest_index_for_partition
.get(&entry.partition_key)
.copied()
== Some(i);
let touched = new_by_partition.contains_key(&entry.partition_key);
if is_latest_for_partition && touched {
let new_for_pk = new_by_partition
.remove(&entry.partition_key)
.expect("touched implies present");
let combined_n = entry.n_superfiles as usize + new_for_pk.len();
if combined_n as u64 > self.superfile_list.options.target_superfiles_per_part {
// Extending would exceed the per-part target: keep the existing part
// untouched and open a fresh part holding only the new superfiles.
out_list_entries.push(entry.clone());
let (fresh_entry, fresh_part, fresh_encoded) = rebuild_part_and_entry(
opts.clone(),
vec![],
new_for_pk,
entry.partition_key.clone(),
None,
);
out_list_entries.push(fresh_entry);
parts_to_write.push(EncodedPart {
part: fresh_part,
encoded: fresh_encoded,
});
} else {
// Room left: rebuild the part as existing + new superfiles, passing the
// current entry as the base for the aggregates.
let existing_part = self.get_part_by_id(entry.part_id).await?;
let (rebuilt_entry, rebuilt_part, rebuilt_encoded) = rebuild_part_and_entry(
opts.clone(),
existing_part.superfiles.clone(),
new_for_pk,
entry.partition_key.clone(),
Some(entry),
);
out_list_entries.push(rebuilt_entry);
parts_to_write.push(EncodedPart {
part: rebuilt_part,
encoded: rebuilt_encoded,
});
}
handled_partitions.insert(entry.partition_key.clone());
} else {
out_list_entries.push(entry.clone());
}
}
// Pass 2: partitions with additions but no existing entry get a fresh part.
for (pk, new_for_pk) in new_by_partition {
// NOTE(review): handled partitions were already removed from
// `new_by_partition` above, so this check looks redundant.
if handled_partitions.contains(&pk) {
continue;
}
let (fresh_entry, fresh_part, fresh_encoded) =
rebuild_part_and_entry(opts.clone(), vec![], new_for_pk, pk, None);
out_list_entries.push(fresh_entry);
parts_to_write.push(EncodedPart {
part: fresh_part,
encoded: fresh_encoded,
});
}
// Pass 3: apply removals to every entry whose partition has any.
let mut out_list_entries_after_removal = Vec::new();
for entry in out_list_entries {
let Some(removals) = removals_by_partition.get(&entry.partition_key) else {
out_list_entries_after_removal.push(entry);
continue;
};
let removal_ids = removals
.iter()
.map(|r| r.superfile_id)
.collect::<HashSet<_>>();
// Prefer the in-memory part built earlier in this call; otherwise load the
// part through the snapshot.
// NOTE(review): any load failure is reported as `UnknownPartId`, which
// hides the underlying error.
let (superfile_entries_in_part, existing_part_to_update) = if let Some(existing) =
parts_to_write
.iter_mut()
.find(|ep| ep.part.part_id == entry.part_id)
{
(existing.part.superfiles.clone(), Some(existing))
} else if let Ok(existing_part) = self.get_part_by_id(entry.part_id).await {
(existing_part.superfiles.clone(), None)
} else {
return Err(ManifestError::UnknownPartId(entry.part_id));
};
let final_superfile_entries = superfile_entries_in_part
.iter()
.filter(|s| !removal_ids.contains(&s.superfile_id))
.cloned()
.collect::<Vec<_>>();
// Nothing in this part was actually removed: keep the entry unchanged.
if final_superfile_entries.len() == superfile_entries_in_part.len() {
out_list_entries_after_removal.push(entry);
continue;
}
// Rebuild from the survivors only (no base entry), so the aggregates are
// computed from the remaining superfiles.
let (fresh_entry, fresh_part, fresh_encoded) = rebuild_part_and_entry(
opts.clone(),
vec![],
final_superfile_entries,
entry.partition_key,
None,
);
if let Some(existing) = existing_part_to_update {
// Replace the pending write in place rather than queueing a second one.
*existing = EncodedPart {
part: fresh_part,
encoded: fresh_encoded,
};
} else {
parts_to_write.push(EncodedPart {
part: fresh_part,
encoded: fresh_encoded,
});
}
out_list_entries_after_removal.push(fresh_entry);
}
// Assemble the new manifest list.
let opts_hash = options_hash::compute_options_hash(opts.as_ref(), &strategy);
let new_list = Manifest {
format_version: LIST_FORMAT_VERSION.into(),
manifest_id: self.get_next_manifest_id(),
options_hash: opts_hash,
schema: Vec::new(),
id_column: opts.id_column.clone(),
fts_columns: opts
.fts_columns
.iter()
.map(|f| list::FtsColumnInfo {
column: f.column.clone(),
})
.collect(),
vector_columns: opts
.vector_columns
.iter()
.map(|v| list::VectorColumnInfo {
column: v.column.clone(),
dim: v.dim,
n_cent: v.n_cent,
rot_seed: v.rot_seed,
// Metric is stored as the lowercased Debug name of the enum variant.
metric: format!("{:?}", v.metric).to_lowercase(),
})
.collect(),
partition_strategy: strategy,
parts: out_list_entries_after_removal,
};
// Flat superfile list: current entries followed by the new ones, minus removals.
let ids_to_remove = entries_to_remove
.iter()
.map(|e| e.superfile_id)
.collect::<HashSet<_>>();
let mut new_superfile_list = self
.get_all_superfiles()
.iter()
.chain(new_entries.iter())
.map(Arc::clone)
.collect::<Vec<_>>();
new_superfile_list.retain(|e| !ids_to_remove.contains(&e.superfile_id));
let new_superfile_list = SuperfileList {
manifest_id: self.get_next_manifest_id(),
options: self.get_opts(),
superfiles: new_superfile_list,
};
// A loader is only attached when the options carry a storage handle.
let loader = opts.storage.as_ref().map(|storage| {
Arc::new(ManifestPartLoader::new_with_cache(
storage.clone(),
&new_list,
opts.manifest_disk_cache.clone(),
))
});
// Carry over cells for parts that are still listed, then seed cells for the
// parts built in this call so they need no fetch.
let live_part_ids: HashSet<_> = new_list.parts.iter().map(|e| e.part_id).collect();
let parts = DashMap::new();
for kv in self.parts.iter() {
if live_part_ids.contains(kv.key()) {
parts.insert(*kv.key(), kv.value().clone());
}
}
for part in parts_to_write.iter() {
let part = part.part.clone();
parts.insert(
part.part_id,
Arc::new(OnceCell::new_with(Some(Arc::new(part)))),
);
}
let new_manifest = ManifestSnapshot {
superfile_list: new_superfile_list,
list: Some(new_list),
parts,
loader,
};
Ok((new_manifest, parts_to_write))
}
}
/// Builds a brand-new part (fresh `PartId`) containing `old_superfiles`
/// followed by `new_superfiles`, encodes it, and derives its list entry.
///
/// Aggregates are computed from `new_superfiles` together with `base_part`;
/// `old_superfiles` only contribute to them through `base_part`.
/// `opts` is currently unused.
///
/// Returns `(list entry, part, encoded bytes)`.
fn rebuild_part_and_entry(
    opts: Arc<SupertableOptions>,
    old_superfiles: Vec<Arc<SuperfileEntry>>,
    new_superfiles: Vec<Arc<SuperfileEntry>>,
    partition_key: Vec<u8>,
    base_part: Option<&ManifestPartEntry>,
) -> (
    ManifestPartEntry,
    ManifestPart,
    Vec<u8>, // pre-encoded compressed bytes — reused by write path, no second encode
) {
    let _ = opts;
    // Must run before `new_superfiles` is moved into the part below.
    let stats = crate::supertable::manifest::aggregates::compute(&new_superfiles, base_part);

    let mut superfiles = old_superfiles;
    superfiles.extend(new_superfiles);
    let part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles,
    };

    let encoded = part::encode(&part, MANIFEST_ZSTD_LEVEL);
    let content_hash = ContentHash::of(&encoded);
    let size_bytes_compressed = encoded.len() as u64;
    let size_bytes_uncompressed = frame_content_size(&encoded, size_bytes_compressed);

    let entry = ManifestPartEntry {
        part_id: part.part_id,
        uri: part_uri(&content_hash),
        n_superfiles: part.superfiles.len() as u64,
        size_bytes_compressed,
        size_bytes_uncompressed,
        content_hash,
        partition_key,
        id_range: stats.id_range,
        scalar_stats_agg: stats.scalar_stats_agg,
        fts_summary_agg: stats.fts_summary_agg,
        vector_summary_agg: stats.vector_summary_agg,
    };
    (entry, part, encoded)
}
/// Fetches and decodes manifest parts for one manifest list.
pub struct ManifestPartLoader {
// Backing store parts are read from.
storage: Arc<dyn StorageProvider>,
// Expected content hash and URI of every part in the list, keyed by part id.
parts_index: HashMap<PartId, (ContentHash, String)>,
// Optional disk cache keyed by content hash; consulted before storage.
manifest_disk_cache: Option<Arc<ManifestDiskCache>>,
}
impl ManifestPartLoader {
    /// Creates a loader for `list` without a disk cache.
    pub fn new(storage: Arc<dyn StorageProvider>, list: &Manifest) -> Self {
        Self::new_with_cache(storage, list, None)
    }

    /// Creates a loader for `list`, indexing every part's expected hash and
    /// URI by part id, with an optional disk cache.
    pub fn new_with_cache(
        storage: Arc<dyn StorageProvider>,
        list: &Manifest,
        manifest_disk_cache: Option<Arc<ManifestDiskCache>>,
    ) -> Self {
        let parts_index = list
            .parts
            .iter()
            .map(|entry| (entry.part_id, (entry.content_hash, entry.uri.clone())))
            .collect::<HashMap<_, _>>();
        Self {
            storage,
            parts_index,
            manifest_disk_cache,
        }
    }

    /// Loads and decodes the part identified by `part_id`.
    ///
    /// A disk-cache hit is decoded directly. Otherwise the bytes are fetched
    /// from storage, checked against the expected content hash, stored in the
    /// cache (when one is configured) and then decoded.
    ///
    /// # Errors
    /// `PartNotInList`, `Storage`, `ContentHashMismatch`, or `Parse`.
    pub async fn load(&self, part_id: PartId) -> Result<Arc<ManifestPart>, ManifestLoadError> {
        let Some((expected_hash, uri)) = self.parts_index.get(&part_id) else {
            return Err(ManifestLoadError::PartNotInList { part_id });
        };

        let disk_cache = self.manifest_disk_cache.as_ref();
        if let Some(cache) = disk_cache {
            if let Some(cached) = cache.get(expected_hash).await {
                let parsed = part::decode(&cached)?;
                return Ok(Arc::new(parsed));
            }
        }

        let (bytes, _) = self
            .storage
            .get(uri)
            .await
            .map_err(ManifestLoadError::Storage)?;
        let actual_hash = ContentHash::of(&bytes);
        if actual_hash != *expected_hash {
            return Err(ManifestLoadError::ContentHashMismatch {
                expected: expected_hash.to_hex(),
                actual: actual_hash.to_hex(),
            });
        }
        if let Some(cache) = disk_cache {
            cache.put(actual_hash, &bytes).await;
        }
        let parsed = part::decode(&bytes)?;
        Ok(Arc::new(parsed))
    }
}
/// Errors produced while loading a manifest snapshot or one of its parts.
#[derive(Debug, thiserror::Error)]
pub enum ManifestLoadError {
/// No pointer file was found in storage.
#[error("pointer not found in storage")]
PointerNotFound,
/// The caller's current snapshot is at least as new as the pointer.
#[error("already loaded")]
AlreadyLoaded,
/// The pointer file could not be parsed (not constructed in this file).
#[error("pointer parse error: {0}")]
PointerParse(String),
/// A part was requested from a snapshot that has no loader.
#[error("no storage / loader attached to this manifest")]
NoLoaderAttached,
/// The manifest list bytes could not be decoded.
#[error("list parse error: {0}")]
ListParse(#[source] list::ListParseError),
/// The requested part id is not part of the loader's manifest list.
#[error("part_id not in manifest list: {part_id}")]
PartNotInList { part_id: PartId },
/// The storage provider failed while fetching the list or a part.
#[error("storage error during part load: {0}")]
Storage(#[source] StorageError),
/// Fetched part bytes or the options hash did not match the expected value;
/// also used by `load` when no options are available.
#[error("content-hash mismatch: expected {expected}, got {actual}")]
ContentHashMismatch { expected: String, actual: String },
/// Part bytes could not be decoded.
#[error("part parse failed")]
Parse(#[from] part::PartParseError),
}
/// Manifest record describing one superfile.
#[derive(Debug)]
pub struct SuperfileEntry {
/// Identity of the superfile; used to match entries on removal.
pub superfile_id: Uuid,
/// Handle from which storage and cache file names are derived.
pub uri: SuperfileUri,
/// Number of documents in the superfile.
pub n_docs: u64,
/// Lower bound of the ids in the file — presumably inclusive; TODO confirm.
pub id_min: i128,
/// Upper bound of the ids in the file — presumably inclusive; TODO confirm.
pub id_max: i128,
/// Scalar statistics keyed by string (presumably column name).
pub scalar_stats: HashMap<String, ScalarStatsAgg>,
/// Full-text-search summaries keyed by string (presumably column name).
pub fts_summary: HashMap<String, FtsSummaryAgg>,
/// Vector summaries keyed by string (presumably column name).
pub vector_summary: HashMap<String, VectorSummary>,
/// Partition key bytes recorded on the entry.
pub partition_key: Vec<u8>,
/// Optional partition hint; semantics are not visible in this file.
pub partition_hint: Option<u32>,
/// Optional byte-range metadata for sub-sections of the file.
pub subsection_offsets: Option<SubsectionOffsets>,
}
/// Byte-range metadata for the sub-sections of a superfile.
///
/// NOTE(review): the meaning of the `(u64, u64)` pairs (offset/length vs.
/// start/end) is not visible in this file — confirm before relying on it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubsectionOffsets {
/// Total size — presumably of the whole file in bytes.
pub total_size: u64,
/// Range of the vector sub-section, if present.
pub vec: Option<(u64, u64)>,
/// Range of the full-text-search sub-section, if present.
pub fts: Option<(u64, u64)>,
/// Ranges associated with opening the vector sub-section.
pub vec_open_ranges: Vec<(u64, u64)>,
/// Ranges associated with opening the full-text-search sub-section.
pub fts_open_ranges: Vec<(u64, u64)>,
/// Pairs of a `u64` and raw bytes — presumably pre-fetched ranges keyed by offset.
pub open_blob: Vec<(u64, Vec<u8>)>,
}
/// UUID-based handle for a superfile; storage and cache file names are derived from it.
#[derive(Clone, Copy, Hash, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct SuperfileUri(pub Uuid);
impl SuperfileUri {
pub fn new_v4() -> Self {
Self(Uuid::new_v4())
}
pub fn storage_path(self) -> String {
format!("data/seg-{}.sf.parquet", self.0)
}
pub fn cache_filename(self) -> String {
format!("seg-{}.sf.parquet", self.0)
}
pub fn cache_tmp_filename(self) -> String {
format!("seg-{}.sf.parquet.tmp", self.0)
}
}
pub(crate) fn merge_min_max_arrays(
existing_min: &ArrayRef,
other_min: &ArrayRef,
existing_max: &ArrayRef,
other_max: &ArrayRef,
) -> Option<(ArrayRef, ArrayRef)> {
macro_rules! prim_merge {
($array_ty:ty) => {{
let ex_min_arr = existing_min.as_any().downcast_ref::<$array_ty>()?;
let ot_min_arr = other_min.as_any().downcast_ref::<$array_ty>()?;
let ex_max_arr = existing_max.as_any().downcast_ref::<$array_ty>()?;
let ot_max_arr = other_max.as_any().downcast_ref::<$array_ty>()?;
let ex_min = ex_min_arr.value(0);
let ot_min = ot_min_arr.value(0);
let ex_max = ex_max_arr.value(0);
let ot_max = ot_max_arr.value(0);
let merged_min = if ex_min < ot_min { ex_min } else { ot_min };
let merged_max = if ex_max > ot_max { ex_max } else { ot_max };
Some((
Arc::new(<$array_ty>::from(vec![merged_min])) as ArrayRef,
Arc::new(<$array_ty>::from(vec![merged_max])) as ArrayRef,
))
}};
}
match existing_min.data_type() {
DataType::UInt8 => prim_merge!(UInt8Array),
DataType::UInt16 => prim_merge!(UInt16Array),
DataType::UInt32 => prim_merge!(UInt32Array),
DataType::UInt64 => prim_merge!(UInt64Array),
DataType::Int8 => prim_merge!(Int8Array),
DataType::Int16 => prim_merge!(Int16Array),
DataType::Int32 => prim_merge!(Int32Array),
DataType::Int64 => prim_merge!(Int64Array),
DataType::Float32 => prim_merge!(Float32Array),
DataType::Float64 => prim_merge!(Float64Array),
DataType::Boolean => {
let ex_min = existing_min
.as_any()
.downcast_ref::<BooleanArray>()?
.value(0);
let ot_min = other_min.as_any().downcast_ref::<BooleanArray>()?.value(0);
let ex_max = existing_max
.as_any()
.downcast_ref::<BooleanArray>()?
.value(0);
let ot_max = other_max.as_any().downcast_ref::<BooleanArray>()?.value(0);
let merged_min = ex_min && ot_min;
let merged_max = ex_max || ot_max;
Some((
Arc::new(BooleanArray::from(vec![merged_min])),
Arc::new(BooleanArray::from(vec![merged_max])),
))
}
DataType::Utf8 => {
let ex_min = existing_min
.as_any()
.downcast_ref::<StringArray>()?
.value(0);
let ot_min = other_min.as_any().downcast_ref::<StringArray>()?.value(0);
let ex_max = existing_max
.as_any()
.downcast_ref::<StringArray>()?
.value(0);
let ot_max = other_max.as_any().downcast_ref::<StringArray>()?.value(0);
let merged_min = if ex_min < ot_min { ex_min } else { ot_min };
let merged_max = if ex_max > ot_max { ex_max } else { ot_max };
Some((
Arc::new(StringArray::from(vec![merged_min])),
Arc::new(StringArray::from(vec![merged_max])),
))
}
DataType::LargeUtf8 => {
let ex_min = existing_min
.as_any()
.downcast_ref::<LargeStringArray>()?
.value(0);
let ot_min = other_min
.as_any()
.downcast_ref::<LargeStringArray>()?
.value(0);
let ex_max = existing_max
.as_any()
.downcast_ref::<LargeStringArray>()?
.value(0);
let ot_max = other_max
.as_any()
.downcast_ref::<LargeStringArray>()?
.value(0);
let merged_min = if ex_min < ot_min { ex_min } else { ot_min };
let merged_max = if ex_max > ot_max { ex_max } else { ot_max };
Some((
Arc::new(LargeStringArray::from(vec![merged_min])),
Arc::new(LargeStringArray::from(vec![merged_max])),
))
}
DataType::Decimal128(precision, scale) => {
let ex_min = existing_min
.as_any()
.downcast_ref::<Decimal128Array>()?
.value(0);
let ot_min = other_min
.as_any()
.downcast_ref::<Decimal128Array>()?
.value(0);
let ex_max = existing_max
.as_any()
.downcast_ref::<Decimal128Array>()?
.value(0);
let ot_max = other_max
.as_any()
.downcast_ref::<Decimal128Array>()?
.value(0);
let merged_min = if ex_min < ot_min { ex_min } else { ot_min };
let merged_max = if ex_max > ot_max { ex_max } else { ot_max };
Some((
Arc::new(
Decimal128Array::from(vec![merged_min])
.with_precision_and_scale(*precision, *scale)
.ok()?,
),
Arc::new(
Decimal128Array::from(vec![merged_max])
.with_precision_and_scale(*precision, *scale)
.ok()?,
),
))
}
_ => None,
}
}
pub(crate) fn column_sum(col: &ArrayRef) -> Option<ArrayRef> {
macro_rules! signed {
($array_ty:ty) => {{
let a = col.as_any().downcast_ref::<$array_ty>()?;
let total: i128 = a.iter().flatten().map(i128::from).sum();
let v = i64::try_from(total).ok()?;
Some(Arc::new(Int64Array::from(vec![v])) as ArrayRef)
}};
}
macro_rules! unsigned {
($array_ty:ty) => {{
let a = col.as_any().downcast_ref::<$array_ty>()?;
let total: u128 = a.iter().flatten().map(u128::from).sum();
let v = u64::try_from(total).ok()?;
Some(Arc::new(UInt64Array::from(vec![v])) as ArrayRef)
}};
}
macro_rules! float {
($array_ty:ty) => {{
let a = col.as_any().downcast_ref::<$array_ty>()?;
let total: f64 = a.iter().flatten().map(f64::from).sum();
Some(Arc::new(Float64Array::from(vec![total])) as ArrayRef)
}};
}
match col.data_type() {
DataType::Int8 => signed!(Int8Array),
DataType::Int16 => signed!(Int16Array),
DataType::Int32 => signed!(Int32Array),
DataType::Int64 => signed!(Int64Array),
DataType::UInt8 => unsigned!(UInt8Array),
DataType::UInt16 => unsigned!(UInt16Array),
DataType::UInt32 => unsigned!(UInt32Array),
DataType::UInt64 => unsigned!(UInt64Array),
DataType::Float32 => float!(Float32Array),
DataType::Float64 => float!(Float64Array),
_ => None,
}
}
/// Adds two one-element sum arrays of the same type (`Int64`, `UInt64` or
/// `Float64`), reading element 0 of each.
///
/// Returns `None` when either array is empty or null at index 0 (previously
/// an empty array panicked), when the types differ or are unsupported, or
/// when an integer addition overflows.
pub(crate) fn add_sum_arrays(a: &ArrayRef, b: &ArrayRef) -> Option<ArrayRef> {
    // Both branches below read slot 0, so make sure it exists and is valid.
    if [a, b].iter().any(|arr| arr.is_empty() || arr.is_null(0)) {
        return None;
    }

    // Downcasts `$arr` to `$array_ty` and reads element 0.
    macro_rules! first {
        ($array_ty:ty, $arr:expr) => {
            $arr.as_any().downcast_ref::<$array_ty>()?.value(0)
        };
    }

    match (a.data_type(), b.data_type()) {
        (DataType::Int64, DataType::Int64) => {
            let x = first!(Int64Array, a);
            let y = first!(Int64Array, b);
            let total = x.checked_add(y)?;
            Some(Arc::new(Int64Array::from(vec![total])) as ArrayRef)
        }
        (DataType::UInt64, DataType::UInt64) => {
            let x = first!(UInt64Array, a);
            let y = first!(UInt64Array, b);
            let total = x.checked_add(y)?;
            Some(Arc::new(UInt64Array::from(vec![total])) as ArrayRef)
        }
        (DataType::Float64, DataType::Float64) => {
            let x = first!(Float64Array, a);
            let y = first!(Float64Array, b);
            Some(Arc::new(Float64Array::from(vec![x + y])) as ArrayRef)
        }
        _ => None,
    }
}
pub(crate) fn column_hll(col: &ArrayRef) -> Option<hll::HllSketch> {
let mut sketch = hll::HllSketch::new();
macro_rules! ints {
($array_ty:ty) => {{
let a = col.as_any().downcast_ref::<$array_ty>()?;
for v in a.iter().flatten() {
sketch.insert_hash(xxh3_64(&v.to_le_bytes()));
}
}};
}
match col.data_type() {
DataType::Int8 => ints!(Int8Array),
DataType::Int16 => ints!(Int16Array),
DataType::Int32 => ints!(Int32Array),
DataType::Int64 => ints!(Int64Array),
DataType::UInt8 => ints!(UInt8Array),
DataType::UInt16 => ints!(UInt16Array),
DataType::UInt32 => ints!(UInt32Array),
DataType::UInt64 => ints!(UInt64Array),
DataType::Float32 => {
let a = col.as_any().downcast_ref::<Float32Array>()?;
for v in a.iter().flatten() {
sketch.insert_hash(xxh3_64(&v.to_bits().to_le_bytes()));
}
}
DataType::Float64 => {
let a = col.as_any().downcast_ref::<Float64Array>()?;
for v in a.iter().flatten() {
sketch.insert_hash(xxh3_64(&v.to_bits().to_le_bytes()));
}
}
DataType::Utf8 => {
let a = col.as_any().downcast_ref::<StringArray>()?;
for v in a.iter().flatten() {
sketch.insert_hash(xxh3_64(v.as_bytes()));
}
}
DataType::LargeUtf8 => {
let a = col.as_any().downcast_ref::<LargeStringArray>()?;
for v in a.iter().flatten() {
sketch.insert_hash(xxh3_64(v.as_bytes()));
}
}
_ => return None,
}
Some(sketch)
}
pub(crate) fn column_min_max(col: &ArrayRef) -> Option<(ArrayRef, ArrayRef)> {
macro_rules! prim {
($array_ty:ty) => {{
let a = col.as_any().downcast_ref::<$array_ty>()?;
let mn = agg::min(a)?;
let mx = agg::max(a)?;
let mn_arr: ArrayRef = Arc::new(<$array_ty>::from(vec![mn]));
let mx_arr: ArrayRef = Arc::new(<$array_ty>::from(vec![mx]));
Some((mn_arr, mx_arr))
}};
}
match col.data_type() {
DataType::UInt8 => prim!(UInt8Array),
DataType::UInt16 => prim!(UInt16Array),
DataType::UInt32 => prim!(UInt32Array),
DataType::UInt64 => prim!(UInt64Array),
DataType::Int8 => prim!(Int8Array),
DataType::Int16 => prim!(Int16Array),
DataType::Int32 => prim!(Int32Array),
DataType::Int64 => prim!(Int64Array),
DataType::Float32 => prim!(Float32Array),
DataType::Float64 => prim!(Float64Array),
DataType::Boolean => {
let a = col.as_any().downcast_ref::<BooleanArray>()?;
let mn = agg::min_boolean(a)?;
let mx = agg::max_boolean(a)?;
Some((
Arc::new(BooleanArray::from(vec![mn])),
Arc::new(BooleanArray::from(vec![mx])),
))
}
DataType::Utf8 => {
let a = col.as_any().downcast_ref::<StringArray>()?;
let mn = agg::min_string(a)?;
let mx = agg::max_string(a)?;
Some((
Arc::new(StringArray::from(vec![mn])),
Arc::new(StringArray::from(vec![mx])),
))
}
DataType::LargeUtf8 => {
let a = col.as_any().downcast_ref::<LargeStringArray>()?;
let mn = agg::min_string(a)?;
let mx = agg::max_string(a)?;
Some((
Arc::new(LargeStringArray::from(vec![mn])),
Arc::new(LargeStringArray::from(vec![mx])),
))
}
DataType::Decimal128(precision, scale) => {
let a = col.as_any().downcast_ref::<Decimal128Array>()?;
let mn = agg::min(a)?;
let mx = agg::max(a)?;
Some((
Arc::new(
Decimal128Array::from(vec![mn])
.with_precision_and_scale(*precision, *scale)
.ok()?,
),
Arc::new(
Decimal128Array::from(vec![mx])
.with_precision_and_scale(*precision, *scale)
.ok()?,
),
))
}
_ => None,
}
}
/// Summary of one vector column.
#[derive(Debug, Clone)]
pub struct VectorSummary {
/// Centroid vector — presumably the mean of the summarised vectors; TODO confirm.
pub centroid: Vec<f32>,
/// Radius around `centroid` — presumably bounding the summarised vectors; TODO confirm.
pub radius: f32,
/// Quantized per-cluster centroids.
pub clusters: ClusterCentroids,
}
// Largest 8-bit code value. `from_fp32` divides each centroid's value range by
// this to obtain its scale and clamps quantized codes to `0.0..=SQ8_CODE_MAX`.
const SQ8_CODE_MAX: f32 = 255.0;
/// Cluster centroids stored as 8-bit codes with a per-centroid minimum and scale.
///
/// Component `i` of centroid `c` dequantizes to
/// `mins[c] + codes[c * dim + i] as f32 * scales[c]`.
#[derive(Debug, Clone, Default)]
pub struct ClusterCentroids {
/// Number of centroids; 0 means empty.
pub n_cent: u32,
/// Dimensionality of each centroid.
pub dim: u32,
/// Quantized components, `n_cent * dim` bytes laid out centroid after centroid.
pub codes: Vec<u8>,
/// Per-centroid minimum component value.
pub mins: Vec<f32>,
/// Per-centroid quantization step; 0.0 when the centroid's components do not
/// span a positive range.
pub scales: Vec<f32>,
/// Per-centroid counts; centroids with a count of 0 are skipped when scoring.
pub counts: Vec<u32>,
/// Lazily computed per-centroid result of `u8_sum_sumsq` over the codes;
/// only built and used for the `L2Sq` metric.
pub code_moments: OnceLock<Vec<(f32, f32)>>,
}
impl ClusterCentroids {
    /// Returns a value with no centroids.
    pub fn empty() -> Self {
        Self::default()
    }

    /// True when there are no centroids.
    pub fn is_empty(&self) -> bool {
        self.n_cent == 0
    }

    /// Quantizes `n_cent` centroids of `dim` f32 components each to 8-bit codes.
    ///
    /// `centroids` is read as `n_cent` consecutive rows of `dim` values and
    /// must be at least that long (slicing panics otherwise). Each row gets its
    /// own minimum and scale; non-finite bounds are replaced by 0.0, and a row
    /// without a positive value range gets scale 0.0 and all-zero codes.
    pub fn from_fp32(n_cent: u32, dim: u32, centroids: &[f32], counts: Vec<u32>) -> Self {
        let nc = n_cent as usize;
        let d = dim as usize;
        let mut codes = vec![0u8; nc * d];
        let mut mins = Vec::with_capacity(nc);
        let mut scales = Vec::with_capacity(nc);

        for c in 0..nc {
            let row = c * d..(c + 1) * d;
            let src = &centroids[row.clone()];
            let (lo, hi) = src
                .iter()
                .fold((f32::INFINITY, f32::NEG_INFINITY), |(lo, hi), &v| {
                    (lo.min(v), hi.max(v))
                });
            let lo = if lo.is_finite() { lo } else { 0.0 };
            let hi = if hi.is_finite() { hi } else { 0.0 };
            let scale = if hi > lo {
                (hi - lo) / SQ8_CODE_MAX
            } else {
                0.0
            };
            mins.push(lo);
            scales.push(scale);

            // With a zero scale the row keeps the zeros it was initialised with.
            if scale > 0.0 {
                for (code, &v) in codes[row].iter_mut().zip(src) {
                    *code = ((v - lo) / scale).round().clamp(0.0, SQ8_CODE_MAX) as u8;
                }
            }
        }

        Self {
            n_cent,
            dim,
            codes,
            mins,
            scales,
            counts,
            code_moments: OnceLock::new(),
        }
    }

    /// Scores `query` against every centroid with a non-zero count and calls
    /// `emit(cluster_index, score)` for each, in index order.
    ///
    /// `sum_q` and `norm_q_sq` are supplied by the caller — presumably the sum
    /// and squared norm of `query`; TODO confirm. Scores are computed directly
    /// on the quantized codes. Debug builds assert `query.len() == dim`.
    pub fn score_clusters_into(
        &self,
        metric: Metric,
        query: &[f32],
        sum_q: f32,
        norm_q_sq: f32,
        mut emit: impl FnMut(u32, f32),
    ) {
        let d = self.dim as usize;
        debug_assert_eq!(query.len(), d);

        // Per-centroid code moments are only needed (and only built) for L2.
        let moments = if matches!(metric, Metric::L2Sq) {
            let built = self.code_moments.get_or_init(|| {
                (0..self.n_cent as usize)
                    .map(|c| u8_sum_sumsq(&self.codes[c * d..(c + 1) * d]))
                    .collect()
            });
            Some(built)
        } else {
            None
        };

        for c in 0..self.n_cent as usize {
            if self.counts[c] == 0 {
                continue;
            }
            let codes = &self.codes[c * d..(c + 1) * d];
            let mn = self.mins[c];
            let scale = self.scales[c];
            let dot_qc = mn * sum_q + scale * sq8_dot(query, codes, d);
            let score = match metric {
                Metric::Cosine => COSINE_DISTANCE_BASE - dot_qc,
                Metric::NegDot => -dot_qc,
                Metric::L2Sq => {
                    let (sum_c, sumsq_c) = moments.expect("moments built for L2 above")[c];
                    let centroid_norm_sq = d as f32 * mn * mn
                        + L2_CROSS_TERM_COEFF * mn * scale * sum_c
                        + scale * scale * sumsq_c;
                    norm_q_sq - L2_CROSS_TERM_COEFF * dot_qc + centroid_norm_sq
                }
            };
            emit(c as u32, score);
        }
    }

    /// Writes the dequantized components of centroid `c` into `out`.
    ///
    /// Fills at most `min(out.len(), dim)` elements; panics when `c` is out of range.
    pub fn dequantize_into(&self, c: usize, out: &mut [f32]) {
        let d = self.dim as usize;
        let row = &self.codes[c * d..(c + 1) * d];
        let base = self.mins[c];
        let step = self.scales[c];
        for (slot, code) in out.iter_mut().zip(row.iter().copied()) {
            *slot = base + f32::from(code) * step;
        }
    }
}
#[cfg(test)]
mod tests {
use std::{hint::black_box, slice::from_ref, sync::Arc, time::Instant};
use arrow_array::Array;
use arrow_schema::{DataType, Field, Schema};
use dashmap::DashMap;
use tempfile::TempDir;
use tokio::sync::OnceCell;
use super::*;
use crate::{
storage::LocalFsStorageProvider,
superfile::{builder::FtsConfig, vector::distance::distance},
supertable::manifest::{
commit::{PartWriteResult, write_manifest_part},
list::{Manifest, PartitionStrategy},
},
test_helpers::default_tokenizer,
};
/// Builds a deterministic synthetic centroid set for tests.
///
/// Returns the quantized [`ClusterCentroids`] together with the fp32 rows they
/// were built from. The centroid at index `n_cent / 2` gets a count of `0`
/// (so scoring must skip it); every other centroid gets a count of `10`.
fn synth_clusters(n_cent: u32, dim: u32, seed: u64) -> (ClusterCentroids, Vec<f32>) {
    let (nc, d) = (n_cent as usize, dim as usize);
    let centroids: Vec<f32> = (0..nc)
        .flat_map(|c| {
            (0..d).map(move |j| {
                // Cheap multiplicative hash folded into roughly [-2, 2), with a
                // per-centroid offset so the rows are distinct.
                ((seed + (c * d + j) as u64 * 2_654_435_761) % 1000) as f32 / 250.0 - 2.0
                    + c as f32 * 0.1
            })
        })
        .collect();
    let counts: Vec<u32> = (0..nc).map(|c| if c == nc / 2 { 0 } else { 10 }).collect();
    let cc = ClusterCentroids::from_fp32(n_cent, dim, &centroids, counts);
    (cc, centroids)
}
/// The folded (quantized-domain) scores must agree with `distance` evaluated
/// on the dequantized centroid, for every metric and every non-empty cluster.
#[test]
fn score_clusters_into_matches_dequantized_distance() {
    let (n_cent, dim) = (17u32, 96u32);
    let (cc, _) = synth_clusters(n_cent, dim, 7);
    let query: Vec<f32> = (0..dim)
        .map(|j| ((j as u64 * 40_503 + 11) % 997) as f32 / 500.0 - 1.0)
        .collect();
    let sum_q: f32 = query.iter().sum();
    let norm_q_sq: f32 = query.iter().map(|v| v * v).sum();
    // Scratch row; fully overwritten by every `dequantize_into` call.
    let mut deq = vec![0f32; dim as usize];
    for metric in [Metric::Cosine, Metric::L2Sq, Metric::NegDot] {
        let mut folded: Vec<(u32, f32)> = Vec::new();
        cc.score_clusters_into(metric, &query, sum_q, norm_q_sq, |c, s| folded.push((c, s)));

        let mut reference: Vec<(u32, f32)> = Vec::new();
        for c in (0..n_cent as usize).filter(|&c| cc.counts[c] != 0) {
            cc.dequantize_into(c, &mut deq);
            reference.push((c as u32, distance(metric, &query, &deq)));
        }

        assert_eq!(
            folded.len(),
            reference.len(),
            "{metric:?}: cluster sets differ (count-0 skip)"
        );
        for ((fc, fs), (rc, rs)) in folded.iter().zip(&reference) {
            assert_eq!(fc, rc, "{metric:?}: cluster order");
            // Relative tolerance: both paths accumulate in f32 in different orders.
            let tol = 1e-3 * (1.0 + rs.abs());
            assert!(
                (fs - rs).abs() <= tol,
                "{metric:?} cluster {fc}: folded {fs} vs dequantized {rs} (tol {tol})"
            );
        }
    }
}
/// Prints the mean per-iteration time of the folded scorer next to the
/// dequantize-then-`distance` baseline. Ignored by default.
#[test]
#[ignore = "perf microbench, not a correctness gate"]
fn score_clusters_microbench() {
    let (n_cent, dim) = (4096u32, 384u32);
    let iters = 50usize;
    let (cc, _) = synth_clusters(n_cent, dim, 99);
    let query: Vec<f32> = (0..dim).map(|j| (j as f32).sin()).collect();
    let sum_q: f32 = query.iter().sum();
    let norm_q_sq: f32 = query.iter().map(|v| v * v).sum();
    for metric in [Metric::Cosine, Metric::L2Sq] {
        // Folded path: scores computed directly on the 8-bit codes.
        let started = Instant::now();
        for _ in 0..iters {
            let mut total = 0f32;
            cc.score_clusters_into(metric, &query, sum_q, norm_q_sq, |_, s| total += s);
            black_box(total);
        }
        let folded_us = started.elapsed().as_micros() as f64 / iters as f64;

        // Baseline: dequantize each centroid, then call `distance`.
        let mut deq = vec![0f32; dim as usize];
        let started = Instant::now();
        for _ in 0..iters {
            let mut total = 0f32;
            for c in 0..n_cent as usize {
                if cc.counts[c] != 0 {
                    cc.dequantize_into(c, &mut deq);
                    total += distance(metric, &query, &deq);
                }
            }
            black_box(total);
        }
        let dequant_us = started.elapsed().as_micros() as f64 / iters as f64;

        println!(
            "score_clusters {metric:?}: folded {folded_us:.0} µs vs dequantize {dequant_us:.0} µs ({:.1}×)",
            dequant_us / folded_us
        );
    }
}
/// Single-column test schema: a non-nullable `LargeUtf8` column named `title`.
fn schema() -> Arc<Schema> {
    let title = Field::new("title", DataType::LargeUtf8, false);
    Arc::new(Schema::new(vec![title]))
}
/// Test options over [`schema`] with one FTS column (`title`) and the default tokenizer.
fn opts() -> Arc<SupertableOptions> {
    let fts = vec![FtsConfig {
        column: "title".into(),
    }];
    let options = SupertableOptions::new(schema(), fts, vec![], Some(default_tokenizer()))
        .expect("valid options");
    Arc::new(options)
}
/// Builds a minimal superfile entry whose id and URI share `uuid`, with ids
/// spanning `0..=n_docs - 1` (clamped to `0` when `n_docs` is zero) and no stats.
fn seg_entry(uuid: Uuid, n_docs: u64) -> Arc<SuperfileEntry> {
    let last_id = n_docs.saturating_sub(1) as i128;
    let entry = SuperfileEntry {
        superfile_id: uuid,
        uri: SuperfileUri(uuid),
        n_docs,
        id_min: 0,
        id_max: last_id,
        scalar_stats: HashMap::new(),
        fts_summary: HashMap::new(),
        vector_summary: HashMap::new(),
        partition_key: Vec::new(),
        partition_hint: None,
        subsection_offsets: None,
    };
    Arc::new(entry)
}
/// An empty snapshot has id 0, no superfiles and no documents.
#[test]
fn empty_manifest_starts_at_zero() {
    let snapshot = ManifestSnapshot::empty(opts());
    assert_eq!(snapshot.manifest_id, 0);
    assert_eq!(snapshot.superfiles.len(), 0);
    assert_eq!(snapshot.n_docs_total(), 0);
}
/// Appending yields a new snapshot (id + 1, one more superfile) and leaves the
/// source snapshot untouched.
#[test]
fn with_appended_increments_manifest_id_and_extends_superfiles() {
    let before = ManifestSnapshot::empty(opts());
    let entry = seg_entry(Uuid::new_v4(), 100);
    let after = before.with_appended(vec![entry.clone()]);

    // The new snapshot reflects the append...
    assert_eq!(after.manifest_id, 1);
    assert_eq!(after.superfiles.len(), 1);
    assert_eq!(after.n_docs_total(), 100);

    // ...while the original is unchanged.
    assert_eq!(before.manifest_id, 0);
    assert_eq!(before.superfiles.len(), 0);
    assert_eq!(before.n_docs_total(), 0);
}
/// Chained appends produce ids 0, 1, 2 and cumulative superfile lists.
#[test]
fn with_appended_chains_to_higher_manifest_ids() {
    let m0 = ManifestSnapshot::empty(opts());
    let first = seg_entry(Uuid::new_v4(), 50);
    let m1 = m0.with_appended(vec![first]);
    let second = seg_entry(Uuid::new_v4(), 75);
    let m2 = m1.with_appended(vec![second]);

    assert_eq!(m0.manifest_id, 0);
    assert_eq!(m1.manifest_id, 1);
    assert_eq!(m2.manifest_id, 2);

    assert_eq!(m0.superfiles.len(), 0);
    assert_eq!(m1.superfiles.len(), 1);
    assert_eq!(m2.superfiles.len(), 2);

    assert_eq!(m2.n_docs_total(), 50 + 75);
}
/// Entries carried over by an append are the same `Arc`, not deep copies.
#[test]
fn with_appended_shares_old_superfiles_via_arc() {
    let carried = seg_entry(Uuid::new_v4(), 1);
    let base = ManifestSnapshot::empty(opts()).with_appended(vec![carried.clone()]);
    let extended = base.with_appended(vec![seg_entry(Uuid::new_v4(), 2)]);
    assert!(Arc::ptr_eq(&base.superfiles[0], &extended.superfiles[0]));
}
/// Appending nothing still advances the manifest id.
#[test]
fn with_appended_empty_input_still_bumps_manifest_id() {
    let bumped = ManifestSnapshot::empty(opts()).with_appended(vec![]);
    assert_eq!(bumped.manifest_id, 1);
    assert_eq!(bumped.superfiles.len(), 0);
}
/// `new_from_superfiles` starts at id 1, keeps the given `Arc`s in order, and
/// reports itself as in-process only.
#[test]
fn new_from_superfiles_builds_manifest_at_id_one_with_entries() {
    let first = seg_entry(Uuid::new_v4(), 10);
    let second = seg_entry(Uuid::new_v4(), 20);
    let snapshot =
        ManifestSnapshot::new_from_superfiles(opts(), vec![first.clone(), second.clone()]);

    assert_eq!(snapshot.manifest_id, 1);
    assert_eq!(snapshot.superfiles.len(), 2);
    assert_eq!(snapshot.n_docs_total(), 30);
    assert!(Arc::ptr_eq(&snapshot.superfiles[0], &first));
    assert!(Arc::ptr_eq(&snapshot.superfiles[1], &second));
    assert!(snapshot.is_in_process_only());
}
/// With no entries, `new_from_superfiles` still yields id 1 and an empty list.
#[test]
fn new_from_superfiles_with_empty_input_is_empty_at_id_one() {
    let snapshot = ManifestSnapshot::new_from_superfiles(opts(), Vec::new());
    assert_eq!(snapshot.manifest_id, 1);
    assert_eq!(snapshot.superfiles.len(), 0);
    assert_eq!(snapshot.n_docs_total(), 0);
}
/// The "next" id is always the current id plus one, before and after an append.
#[test]
fn get_next_manifest_id_is_current_plus_one() {
    let initial = ManifestSnapshot::empty(opts());
    assert_eq!(initial.get_manifest_id(), 0);
    assert_eq!(initial.get_next_manifest_id(), 1);

    let appended = initial.with_appended(vec![seg_entry(Uuid::new_v4(), 1)]);
    assert_eq!(appended.get_manifest_id(), 1);
    assert_eq!(appended.get_next_manifest_id(), 2);
}
/// Asking for the next id must not advance the current id, and repeated calls agree.
#[test]
fn get_next_manifest_id_is_a_pure_read() {
    let snapshot = ManifestSnapshot::empty(opts());
    let _ = snapshot.get_next_manifest_id();
    assert_eq!(snapshot.get_manifest_id(), 0, "current id unchanged");
    assert_eq!(
        snapshot.get_next_manifest_id(),
        snapshot.get_next_manifest_id()
    );
}
/// Two freshly generated URIs must differ.
#[test]
fn superfile_uri_is_distinct_per_call() {
    let (first, second) = (SuperfileUri::new_v4(), SuperfileUri::new_v4());
    assert_ne!(first, second);
}
mod lazy_load {
use std::{
collections::HashMap,
error::Error,
ops::Range,
slice::from_ref,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::SystemTime,
};
use arrow_schema::{DataType, Field, Schema};
use async_trait::async_trait;
use bytes::Bytes;
use dashmap::DashMap;
use tokio::spawn;
use uuid::Uuid;
use super::super::*;
use crate::{
storage::{ObjectMeta, StorageError, StorageProvider},
supertable::{
SupertableOptions,
manifest::{
list::{FORMAT_VERSION as LIST_FORMAT_VERSION, PartitionStrategy},
part::{self as part_mod, ContentHash, ManifestPart, PartId},
},
},
};
/// In-memory storage mock that serves a fixed set of objects and counts how
/// many times `get` is called.
#[derive(Debug)]
struct CountingMockStorage {
    /// Object bytes keyed by URI.
    objects: HashMap<String, Bytes>,
    /// Number of `get` calls observed so far (hits and misses alike).
    get_calls: AtomicUsize,
}
impl CountingMockStorage {
    /// Wraps `objects` with a `get` counter starting at zero.
    fn new(objects: HashMap<String, Bytes>) -> Self {
        let get_calls = AtomicUsize::new(0);
        Self { objects, get_calls }
    }

    /// Number of `get` calls made against this mock so far.
    fn get_call_count(&self) -> usize {
        self.get_calls.load(Ordering::Acquire)
    }
}
/// Read-only mock: `head`/`get` serve the fixed object map, every write-style
/// call fails with a permanent error, and `delete` is a no-op.
#[async_trait]
impl StorageProvider for CountingMockStorage {
    /// Returns metadata for `uri`, or `NotFound` when the object is absent.
    async fn head(&self, uri: &str) -> Result<ObjectMeta, StorageError> {
        let Some(body) = self.objects.get(uri) else {
            return Err(StorageError::NotFound { uri: uri.into() });
        };
        Ok(ObjectMeta {
            size: body.len() as u64,
            etag: Some("mock-etag".into()),
            last_modified: SystemTime::UNIX_EPOCH,
        })
    }

    /// Returns the object and its metadata. The call is counted before the
    /// lookup, so misses are counted too.
    async fn get(&self, uri: &str) -> Result<(Bytes, ObjectMeta), StorageError> {
        self.get_calls.fetch_add(1, Ordering::AcqRel);
        let Some(body) = self.objects.get(uri) else {
            return Err(StorageError::NotFound { uri: uri.into() });
        };
        let meta = ObjectMeta {
            size: body.len() as u64,
            etag: Some("mock-etag".into()),
            last_modified: SystemTime::UNIX_EPOCH,
        };
        Ok((body.clone(), meta))
    }

    async fn get_range(&self, uri: &str, _range: Range<u64>) -> Result<Bytes, StorageError> {
        Err(permanent(uri, "get_range unimplemented for mock"))
    }

    async fn put_atomic(&self, uri: &str, _bytes: Bytes) -> Result<Option<String>, StorageError> {
        Err(permanent(uri, "put_atomic unimplemented for mock"))
    }

    async fn put_if_match(
        &self,
        uri: &str,
        _bytes: Bytes,
        _expected_etag: Option<&str>,
    ) -> Result<Option<String>, StorageError> {
        Err(permanent(uri, "put_if_match unimplemented for mock"))
    }

    async fn put_multipart(
        &self,
        uri: &str,
    ) -> Result<Box<dyn object_store::MultipartUpload>, StorageError> {
        Err(permanent(uri, "put_multipart unimplemented for mock"))
    }

    /// Deleting always succeeds and changes nothing.
    async fn delete(&self, _uri: &str) -> Result<(), StorageError> {
        Ok(())
    }
}
/// Builds a `StorageError::Permanent` for `uri` whose source is the message `msg`.
fn permanent(uri: &str, msg: &'static str) -> StorageError {
    let source: Box<dyn Error + Send + Sync> = Box::from(msg);
    StorageError::Permanent {
        uri: uri.into(),
        source,
    }
}
/// Builds an empty manifest part whose id is the UUID made of 16 copies of `seed`.
fn make_test_part(seed: u8) -> ManifestPart {
    let part_id = PartId(Uuid::from_bytes([seed; 16]));
    ManifestPart {
        format_version: part_mod::FORMAT_VERSION.into(),
        part_id,
        superfiles: vec![],
    }
}
/// Encodes every part and returns both the object map a storage mock should
/// serve (keyed by content-hash URI) and the matching list entries.
///
/// The uncompressed size is filled with the compressed size; these tests do
/// not depend on it.
fn encode_and_index(
    parts: &[ManifestPart],
) -> (HashMap<String, Bytes>, Vec<ManifestPartEntry>) {
    let mut objects = HashMap::new();
    let mut entries = Vec::new();
    for part in parts {
        let encoded = part_mod::encode(part, 3);
        let content_hash = ContentHash::of(&encoded);
        let uri = format!("manifests/part-{}.avro.zst", content_hash.to_hex());
        let n_bytes = encoded.len() as u64;
        objects.insert(uri.clone(), Bytes::from(encoded));
        entries.push(ManifestPartEntry {
            part_id: part.part_id,
            uri,
            n_superfiles: part.superfiles.len() as u64,
            size_bytes_compressed: n_bytes,
            size_bytes_uncompressed: n_bytes,
            content_hash,
            partition_key: Vec::new(),
            id_range: (0, 0),
            scalar_stats_agg: Default::default(),
            fts_summary_agg: Default::default(),
            vector_summary_agg: Default::default(),
        });
    }
    (objects, entries)
}
/// Wraps `entries` in a manifest list (id 1) hash-partitioned on `doc_id`
/// into 64 buckets, with an all-zero options hash and no columns.
fn fresh_list(entries: Vec<ManifestPartEntry>) -> list::Manifest {
    let partition_strategy = PartitionStrategy::Hash {
        column: "doc_id".into(),
        n_buckets: 64,
    };
    list::Manifest {
        format_version: LIST_FORMAT_VERSION.into(),
        manifest_id: 1,
        options_hash: ContentHash([0u8; 32]),
        schema: Vec::new(),
        id_column: "doc_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy,
        parts: entries,
    }
}
/// Options over a single non-nullable `LargeUtf8` column `title`, with no FTS
/// columns, no vector columns and no tokenizer.
fn options_for_test() -> Arc<SupertableOptions> {
    let title = Field::new("title", DataType::LargeUtf8, false);
    let schema = Arc::new(Schema::new(vec![title]));
    let options = SupertableOptions::new(schema, vec![], vec![], None).expect("opts");
    Arc::new(options)
}
/// Assembles a snapshot around `list` with a cold part cache and a loader
/// backed by `storage`.
fn build_manifest_with_loader(
    list: Manifest,
    storage: Arc<dyn StorageProvider>,
) -> ManifestSnapshot {
    // The loader is built from a borrow of `list` before `list` moves into the snapshot.
    let loader = ManifestPartLoader::new(Arc::clone(&storage), &list);
    ManifestSnapshot {
        superfile_list: SuperfileList::empty(options_for_test()),
        list: Some(list),
        parts: DashMap::new(),
        loader: Some(Arc::new(loader)),
    }
}
/// The first lookup of a part costs exactly one `storage.get`.
#[tokio::test]
async fn part_first_touch_loads_and_caches() {
    let part = make_test_part(7);
    let (objects, entries) = encode_and_index(from_ref(&part));
    let storage = Arc::new(CountingMockStorage::new(objects));
    let storage_dyn = Arc::clone(&storage) as Arc<dyn StorageProvider>;
    let manifest = build_manifest_with_loader(fresh_list(entries), storage_dyn);

    let loaded = manifest.get_part_by_id(part.part_id).await.expect("load");

    assert_eq!(loaded.part_id, part.part_id);
    assert_eq!(storage.get_call_count(), 1, "exactly one storage.get");
}
/// A repeated lookup returns the cached `Arc` and issues no further `storage.get`.
#[tokio::test]
async fn second_touch_hits_cache_zero_additional_gets() {
    let part = make_test_part(11);
    let (objects, entries) = encode_and_index(from_ref(&part));
    let storage = Arc::new(CountingMockStorage::new(objects));
    let storage_dyn = Arc::clone(&storage) as Arc<dyn StorageProvider>;
    let manifest = build_manifest_with_loader(fresh_list(entries), storage_dyn);

    let first = manifest
        .get_part_by_id(part.part_id)
        .await
        .expect("first load");
    let second = manifest
        .get_part_by_id(part.part_id)
        .await
        .expect("second load");

    assert!(
        Arc::ptr_eq(&first, &second),
        "second touch must return cached Arc"
    );
    assert_eq!(storage.get_call_count(), 1, "cache hit ⇒ no extra get");
}
/// 100 tasks racing on a cold part must all receive the same `Arc` and trigger
/// a single `storage.get` between them.
#[tokio::test]
async fn concurrent_loaders_coalesce_to_one_get() {
    let part = make_test_part(13);
    let (objects, entries) = encode_and_index(from_ref(&part));
    let storage = Arc::new(CountingMockStorage::new(objects));
    let storage_dyn = Arc::clone(&storage) as Arc<dyn StorageProvider>;
    let manifest = Arc::new(build_manifest_with_loader(fresh_list(entries), storage_dyn));

    let part_id = part.part_id;
    let mut handles = Vec::with_capacity(100);
    for _ in 0..100 {
        let shared = Arc::clone(&manifest);
        handles.push(spawn(async move { shared.get_part_by_id(part_id).await }));
    }

    let mut first: Option<Arc<ManifestPart>> = None;
    for handle in handles {
        let loaded = handle.await.expect("join").expect("load");
        if let Some(reference) = &first {
            assert!(
                Arc::ptr_eq(reference, &loaded),
                "all concurrent loaders must share the same Arc"
            );
        } else {
            first = Some(loaded);
        }
    }

    assert_eq!(
        storage.get_call_count(),
        1,
        "100 concurrent loaders on cold cell ⇒ exactly one storage.get"
    );
}
/// Serving bytes that do not match the listed content hash must fail with
/// `ContentHashMismatch`, on the first attempt and on a retry.
#[tokio::test]
async fn content_hash_mismatch_surfaces_typed_error_without_refetch() {
    let part = make_test_part(17);
    let (mut objects, entries) = encode_and_index(from_ref(&part));

    // Flip the last byte of the stored object but keep it under the original URI.
    let original = objects.values().next().expect("one obj").clone();
    let mut tampered = original.to_vec();
    let last = tampered.len() - 1;
    tampered[last] ^= 0xff;
    objects.insert(entries[0].uri.clone(), Bytes::from(tampered));

    // The list entries still carry the hash of the untampered encoding.
    let (_, fresh_entries) = encode_and_index(from_ref(&part));
    let storage = Arc::new(CountingMockStorage::new(objects));
    let storage_dyn = Arc::clone(&storage) as Arc<dyn StorageProvider>;
    let manifest = build_manifest_with_loader(fresh_list(fresh_entries), storage_dyn);

    let first_err = manifest
        .get_part_by_id(part.part_id)
        .await
        .expect_err("must reject tampered bytes");
    assert!(
        matches!(first_err, ManifestLoadError::ContentHashMismatch { .. }),
        "expected ContentHashMismatch, got {first_err:?}"
    );

    // NOTE(review): the call count is sampled here but never compared against
    // anything, so the "without_refetch" part of the test name is not enforced.
    // Confirm the intended contract before adding an assertion.
    let _pre = storage.get_call_count();
    let second_err = manifest
        .get_part_by_id(part.part_id)
        .await
        .expect_err("must reject on retry too");
    assert!(matches!(
        second_err,
        ManifestLoadError::ContentHashMismatch { .. }
    ));
}
/// Looking up a part id that the list does not contain fails with
/// `PartNotInList` before storage is touched.
#[tokio::test]
async fn part_id_not_in_list_surfaces_typed_error() {
    let (objects, entries) = encode_and_index(&[make_test_part(19)]);
    let storage = Arc::new(CountingMockStorage::new(objects));
    let storage_dyn = Arc::clone(&storage) as Arc<dyn StorageProvider>;
    let manifest = build_manifest_with_loader(fresh_list(entries), storage_dyn);

    let unknown_id = PartId(Uuid::from_bytes([0xff; 16]));
    let err = manifest
        .get_part_by_id(unknown_id)
        .await
        .expect_err("must reject");

    assert!(
        matches!(err, ManifestLoadError::PartNotInList { .. }),
        "expected PartNotInList, got {err:?}"
    );
    assert_eq!(
        storage.get_call_count(),
        0,
        "missing-id check happens before any storage.get"
    );
}
#[tokio::test]
async fn disk_cache_hit_serves_second_loader_without_storage_get() {
let part = make_test_part(23);
let (objects, entries) = encode_and_index(from_ref(&part));
let storage = Arc::new(CountingMockStorage::new(objects));
let storage_dyn = Arc::clone(&storage) as Arc<dyn StorageProvider>;
let list = fresh_list(entries);
let cache_root = std::env::temp_dir()
.join("infino-manifest-cache-loader-test-disk_cache_hit_second_loader");
let _ = std::fs::remove_dir_all(&cache_root);
let cache = ManifestDiskCache::new(cache_root.clone(), 1 << 20).expect("cache");
let loader_a = ManifestPartLoader::new_with_cache(
Arc::clone(&storage_dyn),
&list,
Some(Arc::clone(&cache)),
);
let a = loader_a.load(part.part_id).await.expect("first load");
assert_eq!(a.part_id, part.part_id);
assert_eq!(storage.get_call_count(), 1, "first loader fetches once");
assert_eq!(cache.stats().n_entries, 1, "part bytes cached on disk");
let loader_b = ManifestPartLoader::new_with_cache(
Arc::clone(&storage_dyn),
&list,
Some(Arc::clone(&cache)),
);
let b = loader_b.load(part.part_id).await.expect("second load");
assert_eq!(b.part_id, part.part_id);
assert_eq!(
storage.get_call_count(),
1,
"disk-cache hit ⇒ no additional storage.get"
);
assert!(cache.stats().n_hits >= 1, "recorded a cache hit");
let _ = std::fs::remove_dir_all(&cache_root);
}
/// A bare loader (no disk cache) goes to storage on every `load`.
#[tokio::test]
async fn loader_without_cache_always_hits_storage() {
    let part = make_test_part(29);
    let (objects, entries) = encode_and_index(from_ref(&part));
    let storage = Arc::new(CountingMockStorage::new(objects));
    let storage_dyn = Arc::clone(&storage) as Arc<dyn StorageProvider>;
    let list = fresh_list(entries);
    let loader = ManifestPartLoader::new(Arc::clone(&storage_dyn), &list);

    for label in ["load 1", "load 2"] {
        loader.load(part.part_id).await.expect(label);
    }

    assert_eq!(
        storage.get_call_count(),
        2,
        "no cache ⇒ every load round-trips to storage"
    );
}
/// A snapshot built without a loader rejects part lookups with `NoLoaderAttached`.
#[tokio::test]
async fn no_loader_attached_surfaces_typed_error() {
    let manifest = ManifestSnapshot::empty(options_for_test());
    let nil_id = PartId(Uuid::nil());
    let err = manifest
        .get_part_by_id(nil_id)
        .await
        .expect_err("must error");
    assert!(
        matches!(err, ManifestLoadError::NoLoaderAttached),
        "expected NoLoaderAttached, got {err:?}"
    );
}
}
/// The storage path, cache filename and temp filename all embed the same UUID.
#[test]
fn superfile_uri_path_helpers_share_the_same_uuid() {
    let id = Uuid::from_u128(0x1234_5678);
    let uri = SuperfileUri(id);
    assert_eq!(uri.storage_path(), format!("data/seg-{id}.sf.parquet"));
    assert_eq!(uri.cache_filename(), format!("seg-{id}.sf.parquet"));
    assert_eq!(uri.cache_tmp_filename(), format!("seg-{id}.sf.parquet.tmp"));
}
/// The `Debug` rendering names the type and its summary fields.
#[test]
fn manifest_debug_reports_counts() {
    let snapshot =
        ManifestSnapshot::empty(opts()).with_appended(vec![seg_entry(Uuid::new_v4(), 3)]);
    let rendered = format!("{snapshot:?}");
    assert!(rendered.contains("Manifest"));
    assert!(rendered.contains("manifest_id"));
    assert!(rendered.contains("n_superfiles"));
    assert!(rendered.contains("has_loader"));
}
/// With a list attached, the `Debug` rendering reports the part count and `has_list`.
#[test]
fn manifest_debug_with_list_reports_part_count() {
    use list::PartitionStrategy;

    let zero_hash = part::ContentHash([0u8; 32]);
    let only_part = list::ManifestPartEntry {
        part_id: part::PartId::new_v4(),
        uri: "manifests/part-x".into(),
        n_superfiles: 0,
        size_bytes_compressed: 0,
        size_bytes_uncompressed: 0,
        content_hash: part::ContentHash([0u8; 32]),
        partition_key: Vec::new(),
        id_range: (0, 0),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 1,
        options_hash: zero_hash,
        schema: Vec::new(),
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 1,
        },
        parts: vec![only_part],
    };
    let snapshot = ManifestSnapshot {
        superfile_list: SuperfileList::empty(opts()),
        list: Some(list),
        parts: DashMap::new(),
        loader: None,
    };

    let rendered = format!("{snapshot:?}");
    assert!(rendered.contains("n_parts: 1"), "{rendered}");
    assert!(rendered.contains("has_list: true"), "{rendered}");
}
/// `empty()` reports empty; a set built from two all-zero rows does not.
#[test]
fn cluster_centroids_empty_is_empty_and_default_matches() {
    let none = ClusterCentroids::empty();
    assert!(none.is_empty());
    assert_eq!(none.n_cent, 0);

    let zeros = [0.0f32; 8];
    let two = ClusterCentroids::from_fp32(2, 4, &zeros, vec![1, 1]);
    assert!(!two.is_empty());
    assert_eq!(two.n_cent, 2);
    assert_eq!(two.dim, 4);
}
/// `add_sum_arrays` adds single-element Int64 / UInt64 / Float64 arrays and
/// returns `None` on i64 overflow or mismatched element types.
#[test]
fn add_sum_arrays_handles_each_type_and_overflow() {
    use arrow_array::{Float64Array, Int64Array, UInt64Array};

    // One-element array builders for each supported type.
    fn int(v: i64) -> ArrayRef {
        Arc::new(Int64Array::from(vec![v]))
    }
    fn uint(v: u64) -> ArrayRef {
        Arc::new(UInt64Array::from(vec![v]))
    }
    fn float(v: f64) -> ArrayRef {
        Arc::new(Float64Array::from(vec![v]))
    }

    let summed = add_sum_arrays(&int(3), &int(4)).expect("int sum");
    let got = summed
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("test")
        .value(0);
    assert_eq!(got, 7);

    let summed = add_sum_arrays(&uint(3), &uint(4)).expect("uint sum");
    let got = summed
        .as_any()
        .downcast_ref::<UInt64Array>()
        .expect("test")
        .value(0);
    assert_eq!(got, 7);

    let summed = add_sum_arrays(&float(1.5), &float(2.5)).expect("float sum");
    let got = summed
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("test")
        .value(0);
    assert!((got - 4.0).abs() < 1e-9);

    let overflowed = add_sum_arrays(&int(i64::MAX), &int(1));
    assert!(overflowed.is_none(), "i64 overflow drops the stat");

    let mismatched = add_sum_arrays(&int(1), &uint(1));
    assert!(mismatched.is_none(), "type mismatch drops the stat");
}
fn make_superfile_entry(docs: u64, pk: Vec<u8>) -> Arc<SuperfileEntry> {
Arc::new(SuperfileEntry {
superfile_id: uuid::Uuid::new_v4(),
uri: SuperfileUri::new_v4(),
n_docs: docs,
id_min: 0,
id_max: docs as i128 - 1,
scalar_stats: Default::default(),
fts_summary: Default::default(),
vector_summary: Default::default(),
partition_key: pk,
partition_hint: None,
subsection_offsets: None,
})
}
/// Partition key used by these tests for hash bucket 0: four zero bytes.
fn hash_bucket_0_pk() -> Vec<u8> {
    vec![0u8; 4]
}
/// Single-column schema: a non-nullable `LargeUtf8` column named `text`.
fn simple_schema() -> Arc<Schema> {
    let text = Field::new("text", DataType::LargeUtf8, false);
    Arc::new(Schema::new(vec![text]))
}
/// Options over [`simple_schema`] with no FTS columns, vector columns or tokenizer.
fn make_opts() -> Arc<SupertableOptions> {
    let options =
        SupertableOptions::new(simple_schema(), vec![], vec![], None).expect("valid options");
    Arc::new(options)
}
/// Snapshot at id 0 with an attached, part-less list (single hash bucket on
/// `_id`), an empty part cache and no loader.
fn empty_manifest(opts: &Arc<SupertableOptions>) -> Arc<ManifestSnapshot> {
    let list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 1,
        },
        parts: vec![],
    };
    Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList::empty(opts.clone()),
        list: Some(list),
        parts: DashMap::new(),
        loader: None,
    })
}
/// Adding one superfile to an empty manifest creates one list entry and one
/// part to write, both for the superfile's partition.
#[tokio::test]
async fn update_fresh_start_cold_partition_should_create_entry() {
    let opts = make_opts();
    let old_manifest = empty_manifest(&opts);
    let pk = hash_bucket_0_pk();
    let added = vec![make_superfile_entry(100, pk.clone())];

    let (new_manifest, parts) = old_manifest.update(&added, &[]).await.expect("update");

    let list_entries = new_manifest.get_all_list_entries();
    assert_eq!(list_entries.len(), 1);
    assert_eq!(parts.len(), 1);

    let entry = &list_entries[0];
    assert_eq!(entry.partition_key, pk);
    assert_eq!(entry.n_superfiles, 1);

    let written = &parts[0].part.superfiles;
    assert_eq!(written.len(), 1);
    assert_eq!(written[0].n_docs, 100);
}
/// Adding two superfiles that share one partition key to an empty manifest
/// yields a single list entry / part holding both.
#[tokio::test]
async fn update_fresh_start_multiple_cold_partitions_should_create_entries() {
    let opts = make_opts();
    let old_manifest = empty_manifest(&opts);
    let pk = hash_bucket_0_pk();
    let added = vec![
        make_superfile_entry(100, pk.clone()),
        make_superfile_entry(200, pk.clone()),
    ];

    let (new_manifest, parts) = old_manifest.update(&added, &[]).await.expect("update");

    let list_entries = new_manifest.get_all_list_entries();
    assert_eq!(list_entries.len(), 1);
    assert_eq!(parts.len(), 1);
    assert_eq!(list_entries[0].partition_key, pk);
    assert_eq!(list_entries[0].n_superfiles, 2);

    let written = &parts[0].part.superfiles;
    assert_eq!(written.len(), 2);
    let total_docs: u64 = written.iter().map(|s| s.n_docs).sum();
    assert_eq!(total_docs, 300);
}
/// Creates a local-filesystem storage provider rooted in a fresh temp dir.
/// The `TempDir` is returned so the caller keeps the directory alive.
fn local_storage() -> (TempDir, Arc<dyn StorageProvider>) {
    let dir = TempDir::new().expect("tempdir");
    let provider = LocalFsStorageProvider::new(dir.path()).expect("local");
    let store: Arc<dyn StorageProvider> = Arc::new(provider);
    (dir, store)
}
/// Adding a superfile to a partition that already has a one-superfile part
/// rewrites that part so it holds both superfiles.
#[tokio::test]
async fn update_add_to_existing_partition_rewrites_part() {
    let opts = make_opts();
    let pk_untouched = hash_bucket_0_pk();
    let (_dir, storage) = local_storage();

    // Persist an existing part with a single 100-doc superfile.
    let old_superfile = make_superfile_entry(100, pk_untouched.clone());
    let existing_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![old_superfile.clone()],
    };
    let pw = write_manifest_part(storage.as_ref(), &existing_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part");
    let existing_part_id = pw.part_id;

    let existing_entry = ManifestPartEntry {
        part_id: pw.part_id,
        uri: pw.uri,
        content_hash: pw.content_hash,
        size_bytes_compressed: pw.size_bytes_compressed,
        size_bytes_uncompressed: pw.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: pk_untouched.clone(),
        id_range: (0, 99),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 1,
        },
        parts: vec![existing_entry],
    };
    let loader = ManifestPartLoader::new(storage, &list);

    // Pre-warm the part cache with the already-decoded part.
    let parts = DashMap::new();
    parts.insert(
        existing_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(existing_part)))),
    );
    let old_manifest = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: opts.clone(),
            superfiles: vec![old_superfile],
        },
        list: Some(list),
        parts,
        loader: Some(Arc::new(loader)),
    });

    let added = vec![make_superfile_entry(50, pk_untouched.clone())];
    let (new_manifest, parts) = old_manifest.update(&added, &[]).await.expect("update");

    let list_entries = new_manifest.get_all_list_entries();
    assert_eq!(list_entries.len(), 1);
    assert_eq!(parts.len(), 1);
    assert_eq!(list_entries[0].partition_key, pk_untouched);
    assert_eq!(list_entries[0].n_superfiles, 2);

    let written = &parts[0].part.superfiles;
    assert_eq!(written.len(), 2);
    let total_docs: u64 = written.iter().map(|s| s.n_docs).sum();
    assert_eq!(total_docs, 150);
}
/// An `update` must only rewrite the part it actually changes.
///
/// Setup: partition A has two parts (an older one and a latest one) and
/// partition B has one, each holding two superfiles, with a per-part target
/// of three. The test then
/// 1. adds one superfile to A and checks that only the latest-A part is
///    re-emitted under a new part id, while the older A part and the B part
///    keep their id, URI and content hash;
/// 2. removes that same superfile again and checks that, once more, only the
///    part that held it is rebuilt.
#[tokio::test]
async fn update_leaves_unchanged_parts_untouched() {
    const SUPERFILES_PER_PART: u64 = 2;
    const TARGET_SUPERFILES_PER_PART: u64 = 3;
    let pk_a = hash2_pk(0);
    let pk_b = hash2_pk(1);
    let (_dir, storage) = local_storage();
    let mut base_opts =
        SupertableOptions::new(simple_schema(), vec![], vec![], None).expect("valid options");
    base_opts.target_superfiles_per_part = TARGET_SUPERFILES_PER_PART;
    let opts = Arc::new(base_opts.with_storage(storage.clone()));
    // Writes a two-superfile part for `pk` to storage and returns both the
    // in-memory part and its write result.
    async fn two_superfile_part(
        storage: &dyn StorageProvider,
        pk: &[u8],
        hint: u32,
        docs: [u64; 2],
    ) -> (ManifestPart, PartWriteResult) {
        let part = ManifestPart {
            format_version: part::FORMAT_VERSION.into(),
            part_id: PartId::new_v4(),
            superfiles: vec![
                make_superfile_entry_hinted(docs[0], pk.to_vec(), hint),
                make_superfile_entry_hinted(docs[1], pk.to_vec(), hint),
            ],
        };
        let pw = write_manifest_part(storage, &part, MANIFEST_ZSTD_LEVEL)
            .await
            .expect("write part");
        (part, pw)
    }
    let (part_a_old, pw_a_old) =
        two_superfile_part(storage.as_ref(), &pk_a, 0, [100, 110]).await;
    let (part_a_latest, pw_a_latest) =
        two_superfile_part(storage.as_ref(), &pk_a, 0, [120, 130]).await;
    let (part_b, pw_b) = two_superfile_part(storage.as_ref(), &pk_b, 1, [200, 210]).await;
    // Builds the list entry that points at an already-written part.
    let entry_for = |pw: &PartWriteResult, pk: &[u8]| -> ManifestPartEntry {
        ManifestPartEntry {
            part_id: pw.part_id,
            uri: pw.uri.clone(),
            content_hash: pw.content_hash,
            size_bytes_compressed: pw.size_bytes_compressed,
            size_bytes_uncompressed: pw.size_bytes_uncompressed,
            n_superfiles: SUPERFILES_PER_PART,
            partition_key: pk.to_vec(),
            id_range: (0, 0),
            scalar_stats_agg: Default::default(),
            fts_summary_agg: Default::default(),
            vector_summary_agg: Default::default(),
        }
    };
    // List order matters for the scenario: older A part, latest A part, then B.
    let list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 2,
        },
        parts: vec![
            entry_for(&pw_a_old, &pk_a),
            entry_for(&pw_a_latest, &pk_a),
            entry_for(&pw_b, &pk_b),
        ],
    };
    let loader = ManifestPartLoader::new(storage, &list);
    // Only the latest-A part is pre-warmed in the part cache; the other two
    // parts are reachable through the loader only.
    let parts_map = DashMap::new();
    parts_map.insert(
        part_a_latest.part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(part_a_latest)))),
    );
    let old_manifest = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: opts.clone(),
            superfiles: part_a_old
                .superfiles
                .iter()
                .chain(part_b.superfiles.iter())
                .cloned()
                .collect(),
        },
        list: Some(list),
        parts: parts_map,
        loader: Some(Arc::new(loader)),
    });
    // Step 1: add one superfile to partition A.
    let new_entry = make_superfile_entry_hinted(140, pk_a.clone(), 0);
    let (new_manifest, parts_to_write) = old_manifest
        .update(from_ref(&new_entry), &[])
        .await
        .expect("update");
    let list_entries = new_manifest.get_all_list_entries();
    assert_eq!(list_entries.len(), 3, "list entry count");
    assert_eq!(
        parts_to_write.len(),
        1,
        "only the rewritten latest-A part should be re-emitted; \
        unchanged parts must not be re-encoded/PUT",
    );
    // Looks up a list entry by part id, failing the test when it is gone.
    let find = |part_id: PartId| {
        list_entries
            .iter()
            .find(|e| e.part_id == part_id)
            .unwrap_or_else(|| panic!("entry for part {part_id:?} missing after update"))
    };
    let a_old_after = find(pw_a_old.part_id);
    assert_eq!(a_old_after.uri, pw_a_old.uri, "frozen older A part uri");
    assert_eq!(
        a_old_after.content_hash, pw_a_old.content_hash,
        "frozen older A part content_hash",
    );
    assert_eq!(a_old_after.n_superfiles, SUPERFILES_PER_PART);
    let b_after = find(pw_b.part_id);
    assert_eq!(b_after.uri, pw_b.uri, "untouched B part uri");
    assert_eq!(
        b_after.content_hash, pw_b.content_hash,
        "untouched B part content_hash",
    );
    assert_eq!(b_after.n_superfiles, SUPERFILES_PER_PART);
    assert_eq!(
        parts_to_write[0].part.superfiles.len(),
        (SUPERFILES_PER_PART + 1) as usize,
        "rewritten latest-A part should hold its 2 superfiles + the new one",
    );
    assert!(
        !list_entries
            .iter()
            .any(|e| e.part_id == pw_a_latest.part_id),
        "the rewritten latest-A part is replaced, so its old part_id must not survive",
    );
    // The rewritten latest-A part is the A entry that is not the older A part.
    let latest_a_v1_part_id = list_entries
        .iter()
        .find(|e| e.partition_key == pk_a && e.part_id != pw_a_old.part_id)
        .expect("rewritten latest-A entry present after the add")
        .part_id;
    // Step 2: remove the superfile that step 1 added.
    let (after_removal, removal_parts) = new_manifest
        .update(&[], from_ref(&new_entry))
        .await
        .expect("update removal");
    let entries_after = after_removal.get_all_list_entries();
    assert_eq!(entries_after.len(), 3, "list entry count after removal");
    assert!(
        !entries_after
            .iter()
            .any(|e| e.part_id == latest_a_v1_part_id),
        "the part we removed a superfile from must be rebuilt (new part_id)",
    );
    let b_after_removal = entries_after
        .iter()
        .find(|e| e.part_id == pw_b.part_id)
        .expect("untouched B part must survive the removal unchanged");
    assert_eq!(b_after_removal.uri, pw_b.uri, "B uri after removal");
    assert_eq!(
        b_after_removal.content_hash, pw_b.content_hash,
        "B content_hash after removal",
    );
    assert!(
        entries_after.iter().any(|e| e.part_id == pw_a_old.part_id),
        "frozen older A part holds none of the removed superfile and must stay \
        unchanged, but the removal rebuilt it under a new part_id; entries now: {:?}",
        entries_after
            .iter()
            .map(|e| (e.part_id, e.partition_key.clone(), e.n_superfiles))
            .collect::<Vec<_>>(),
    );
    let a_old_after_removal = entries_after
        .iter()
        .find(|e| e.part_id == pw_a_old.part_id)
        .expect("frozen older A part must survive the removal unchanged");
    assert_eq!(
        a_old_after_removal.uri, pw_a_old.uri,
        "frozen older A part uri after removal",
    );
    assert_eq!(
        a_old_after_removal.content_hash, pw_a_old.content_hash,
        "frozen older A part content_hash after removal",
    );
    assert_eq!(
        removal_parts.len(),
        1,
        "only the part we removed from should be rewritten; unchanged parts \
        must not be re-encoded/PUT",
    );
}
/// One partition, one existing part with two superfiles, target of three per part.
///
/// Adds a single 75-doc superfile and asserts that the manifest still lists one
/// entry, that exactly one part is emitted, and that the emitted part carries all
/// three superfiles (100 + 150 + 75 = 325 docs).
#[tokio::test]
async fn update_rewrite_partition_within_target() {
    let mut raw_options =
        SupertableOptions::new(simple_schema(), vec![], vec![], None).expect("valid options");
    raw_options.target_superfiles_per_part = 3;
    let options = Arc::new(raw_options);
    let partition = hash_bucket_0_pk();
    let (_dir, storage) = local_storage();

    // Seed part: two superfiles, persisted before the update runs.
    let first_sf = make_superfile_entry(100, partition.clone());
    let second_sf = make_superfile_entry(150, partition.clone());
    let seed_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![first_sf.clone(), second_sf.clone()],
    };
    let written = write_manifest_part(storage.as_ref(), &seed_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part");
    let seed_part_id = written.part_id;

    // Manifest list describing the seed part.
    let seed_entry = ManifestPartEntry {
        part_id: seed_part_id,
        uri: written.uri,
        content_hash: written.content_hash,
        size_bytes_compressed: written.size_bytes_compressed,
        size_bytes_uncompressed: written.size_bytes_uncompressed,
        n_superfiles: 2,
        partition_key: partition.clone(),
        id_range: (0, 149),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let manifest_list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 1,
        },
        parts: vec![seed_entry],
    };

    // Snapshot with the seed part already resident in the in-memory part map.
    let loader = ManifestPartLoader::new(storage, &manifest_list);
    let cached_parts = DashMap::new();
    cached_parts.insert(
        seed_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(seed_part)))),
    );
    let snapshot = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: Arc::clone(&options),
            superfiles: vec![first_sf, second_sf],
        },
        list: Some(manifest_list),
        parts: cached_parts,
        loader: Some(Arc::new(loader)),
    });

    // Apply the update: one more superfile in the same partition.
    let added = vec![make_superfile_entry(75, partition.clone())];
    let (updated, emitted) = snapshot.update(&added, &[]).await.expect("update");

    let entries = updated.get_all_list_entries();
    assert_eq!(entries.len(), 1);
    assert_eq!(emitted.len(), 1);
    let only_entry = &entries[0];
    assert_eq!(only_entry.partition_key, partition);
    assert_eq!(only_entry.n_superfiles, 3);

    let rewritten = &emitted[0].part.superfiles;
    assert_eq!(rewritten.len(), 3);
    let doc_sum = rewritten.iter().fold(0u64, |acc, sf| acc + sf.n_docs);
    assert_eq!(doc_sum, 325);
}
/// One partition whose only part already holds two superfiles, with a target of
/// two per part.
///
/// Adds two more superfiles (75 and 80 docs) and asserts that the manifest lists
/// two entries of two superfiles each for the same partition key, while only one
/// part is emitted, holding two superfiles that total 155 docs.
#[tokio::test]
async fn update_split_partition_exceeds_target() {
    let mut raw_options =
        SupertableOptions::new(simple_schema(), vec![], vec![], None).expect("valid options");
    raw_options.target_superfiles_per_part = 2;
    let options = Arc::new(raw_options);
    let partition = hash_bucket_0_pk();
    let (_dir, storage) = local_storage();

    // Seed part: already at the configured target of two superfiles.
    let first_sf = make_superfile_entry(100, partition.clone());
    let second_sf = make_superfile_entry(150, partition.clone());
    let seed_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![first_sf.clone(), second_sf.clone()],
    };
    let written = write_manifest_part(storage.as_ref(), &seed_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part");
    let seed_part_id = written.part_id;

    let seed_entry = ManifestPartEntry {
        part_id: seed_part_id,
        uri: written.uri,
        content_hash: written.content_hash,
        size_bytes_compressed: written.size_bytes_compressed,
        size_bytes_uncompressed: written.size_bytes_uncompressed,
        n_superfiles: 2,
        partition_key: partition.clone(),
        id_range: (0, 149),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let manifest_list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 1,
        },
        parts: vec![seed_entry],
    };

    let loader = ManifestPartLoader::new(storage, &manifest_list);
    let cached_parts = DashMap::new();
    cached_parts.insert(
        seed_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(seed_part)))),
    );
    let snapshot = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: Arc::clone(&options),
            superfiles: vec![first_sf, second_sf],
        },
        list: Some(manifest_list),
        parts: cached_parts,
        loader: Some(Arc::new(loader)),
    });

    // Apply the update: two additional superfiles in the same partition.
    let third_sf = make_superfile_entry(75, partition.clone());
    let fourth_sf = make_superfile_entry(80, partition.clone());
    let added = vec![third_sf, fourth_sf];
    let (updated, emitted) = snapshot.update(&added, &[]).await.expect("update");

    let entries = updated.get_all_list_entries();
    assert_eq!(entries.len(), 2);
    assert_eq!(emitted.len(), 1);
    let (head, tail) = (&entries[0], &entries[1]);
    assert_eq!(head.partition_key, partition);
    assert_eq!(tail.partition_key, partition);
    assert_eq!(head.n_superfiles, 2);
    assert_eq!(tail.n_superfiles, 2);

    let fresh = &emitted[0].part.superfiles;
    assert_eq!(fresh.len(), 2);
    let doc_sum = fresh.iter().fold(0u64, |acc, sf| acc + sf.n_docs);
    assert_eq!(doc_sum, 155);
}
/// Test fixture: builds a shared `SuperfileEntry` with fresh random ids, `docs`
/// documents, an id span of `0..=docs - 1`, default summaries, the given
/// partition key, and `Some(hint)` as its partition hint.
fn make_superfile_entry_hinted(docs: u64, pk: Vec<u8>, hint: u32) -> Arc<SuperfileEntry> {
    // Widen before subtracting so `docs == 0` yields -1 rather than wrapping.
    let last_id = docs as i128 - 1;
    let entry = SuperfileEntry {
        superfile_id: uuid::Uuid::new_v4(),
        uri: SuperfileUri::new_v4(),
        n_docs: docs,
        id_min: 0,
        id_max: last_id,
        scalar_stats: Default::default(),
        fts_summary: Default::default(),
        vector_summary: Default::default(),
        partition_key: pk,
        partition_hint: Some(hint),
        subsection_offsets: None,
    };
    Arc::new(entry)
}
/// Test fixture: returns `bucket` encoded as its four little-endian bytes, used
/// by the tests here as a partition key.
fn hash2_pk(bucket: u32) -> Vec<u8> {
    Vec::from(bucket.to_le_bytes())
}
/// One partition already split into two single-superfile parts, target of two.
///
/// Only the later part is placed in the in-memory part map. Adding a 75-doc
/// superfile must leave the first list entry untouched (same uri, one superfile)
/// and emit exactly one part with two superfiles totalling 150 + 75 = 225 docs.
#[tokio::test]
async fn update_older_entry_preserved_when_latest_rewritten() {
    let mut raw_options =
        SupertableOptions::new(simple_schema(), vec![], vec![], None).expect("valid options");
    raw_options.target_superfiles_per_part = 2;
    let options = Arc::new(raw_options);
    let partition = hash_bucket_0_pk();
    let (_dir, storage) = local_storage();

    let older_sf = make_superfile_entry(100, partition.clone());
    let newest_sf = make_superfile_entry(150, partition.clone());

    // Older part of the partition.
    let older_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![older_sf.clone()],
    };
    let older_written = write_manifest_part(storage.as_ref(), &older_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part_old");

    // Latest part of the partition.
    let newest_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![newest_sf.clone()],
    };
    let newest_written =
        write_manifest_part(storage.as_ref(), &newest_part, MANIFEST_ZSTD_LEVEL)
            .await
            .expect("write part_latest");

    let older_entry = ManifestPartEntry {
        part_id: older_written.part_id,
        uri: older_written.uri.clone(),
        content_hash: older_written.content_hash,
        size_bytes_compressed: older_written.size_bytes_compressed,
        size_bytes_uncompressed: older_written.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: partition.clone(),
        id_range: (0, 99),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let newest_entry = ManifestPartEntry {
        part_id: newest_written.part_id,
        uri: newest_written.uri,
        content_hash: newest_written.content_hash,
        size_bytes_compressed: newest_written.size_bytes_compressed,
        size_bytes_uncompressed: newest_written.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: partition.clone(),
        id_range: (0, 149),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let manifest_list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 1,
        },
        parts: vec![older_entry, newest_entry],
    };

    // Only the latest part is pre-populated in the part map.
    let loader = ManifestPartLoader::new(storage, &manifest_list);
    let cached_parts = DashMap::new();
    let newest_part_id = newest_part.part_id;
    cached_parts.insert(
        newest_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(newest_part)))),
    );
    let snapshot = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: Arc::clone(&options),
            superfiles: vec![older_sf, newest_sf],
        },
        list: Some(manifest_list),
        parts: cached_parts,
        loader: Some(Arc::new(loader)),
    });

    let added = vec![make_superfile_entry(75, partition.clone())];
    let (updated, emitted) = snapshot.update(&added, &[]).await.expect("update");

    let entries = updated.get_all_list_entries();
    assert_eq!(entries.len(), 2);
    assert_eq!(emitted.len(), 1);
    let (head, tail) = (&entries[0], &entries[1]);
    assert_eq!(head.partition_key, partition);
    assert_eq!(tail.partition_key, partition);
    assert_eq!(head.n_superfiles, 1);
    assert_eq!(head.uri, older_written.uri);
    assert_eq!(tail.n_superfiles, 2);

    let rewritten = &emitted[0].part.superfiles;
    assert_eq!(rewritten.len(), 2);
    let doc_sum = rewritten.iter().fold(0u64, |acc, sf| acc + sf.n_docs);
    assert_eq!(doc_sum, 225);
}
/// Two partitions (buckets 0 and 1), each with a single one-superfile part and a
/// target of three per part.
///
/// Adds one superfile to each partition and asserts two list entries, two
/// emitted parts, and per-partition totals of 100 + 50 = 150 and
/// 200 + 80 = 280 docs.
#[tokio::test]
async fn update_two_partitions_both_touched() {
    let mut raw_options =
        SupertableOptions::new(simple_schema(), vec![], vec![], None).expect("valid options");
    raw_options.target_superfiles_per_part = 3;
    let options = Arc::new(raw_options);
    let key_a = hash2_pk(0);
    let key_b = hash2_pk(1);
    let (_dir, storage) = local_storage();

    // Partition A seed part.
    let a_sf = make_superfile_entry_hinted(100, key_a.clone(), 0);
    let a_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![a_sf.clone()],
    };
    let a_written = write_manifest_part(storage.as_ref(), &a_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part_a");

    // Partition B seed part.
    let b_sf = make_superfile_entry_hinted(200, key_b.clone(), 1);
    let b_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![b_sf.clone()],
    };
    let b_written = write_manifest_part(storage.as_ref(), &b_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part_b");

    let a_entry = ManifestPartEntry {
        part_id: a_written.part_id,
        uri: a_written.uri,
        content_hash: a_written.content_hash,
        size_bytes_compressed: a_written.size_bytes_compressed,
        size_bytes_uncompressed: a_written.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: key_a.clone(),
        id_range: (0, 99),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let b_entry = ManifestPartEntry {
        part_id: b_written.part_id,
        uri: b_written.uri,
        content_hash: b_written.content_hash,
        size_bytes_compressed: b_written.size_bytes_compressed,
        size_bytes_uncompressed: b_written.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: key_b.clone(),
        id_range: (0, 199),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let manifest_list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 2,
        },
        parts: vec![a_entry, b_entry],
    };

    // Both seed parts are resident in the part map.
    let loader = ManifestPartLoader::new(storage, &manifest_list);
    let cached_parts = DashMap::new();
    let a_part_id = a_part.part_id;
    cached_parts.insert(
        a_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(a_part)))),
    );
    let b_part_id = b_part.part_id;
    cached_parts.insert(
        b_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(b_part)))),
    );
    let snapshot = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: Arc::clone(&options),
            superfiles: vec![a_sf, b_sf],
        },
        list: Some(manifest_list),
        parts: cached_parts,
        loader: Some(Arc::new(loader)),
    });

    // One new superfile per partition.
    let added = vec![
        make_superfile_entry_hinted(50, key_a.clone(), 0),
        make_superfile_entry_hinted(80, key_b.clone(), 1),
    ];
    let (updated, emitted) = snapshot.update(&added, &[]).await.expect("update");

    let entries = updated.get_all_list_entries();
    assert_eq!(entries.len(), 2);
    assert_eq!(emitted.len(), 2);
    assert_eq!(entries[0].partition_key, key_a);
    assert_eq!(entries[1].partition_key, key_b);

    // Partition A: 100 + 50 docs.
    assert_eq!(entries[0].n_superfiles, 2);
    let a_rewritten = &emitted[0].part.superfiles;
    assert_eq!(a_rewritten.len(), 2);
    let a_docs = a_rewritten.iter().fold(0u64, |acc, sf| acc + sf.n_docs);
    assert_eq!(a_docs, 150);

    // Partition B: 200 + 80 docs.
    assert_eq!(entries[1].n_superfiles, 2);
    let b_rewritten = &emitted[1].part.superfiles;
    assert_eq!(b_rewritten.len(), 2);
    let b_docs = b_rewritten.iter().fold(0u64, |acc, sf| acc + sf.n_docs);
    assert_eq!(b_docs, 280);
}
/// Two partitions, each with a single one-superfile part; only partition A
/// receives a new superfile.
///
/// Asserts that exactly one part is emitted (A, with 100 + 50 = 150 docs) and
/// that partition B's list entry keeps its original uri, content hash and
/// superfile count.
#[tokio::test]
async fn update_two_partitions_one_touched_exact_carry_over() {
    let mut raw_options =
        SupertableOptions::new(simple_schema(), vec![], vec![], None).expect("valid options");
    raw_options.target_superfiles_per_part = 3;
    let options = Arc::new(raw_options);
    let key_a = hash2_pk(0);
    let key_b = hash2_pk(1);
    let (_dir, storage) = local_storage();

    // Partition A seed part.
    let a_sf = make_superfile_entry_hinted(100, key_a.clone(), 0);
    let a_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![a_sf.clone()],
    };
    let a_written = write_manifest_part(storage.as_ref(), &a_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part_a");

    // Partition B seed part (left untouched by the update below).
    let b_sf = make_superfile_entry_hinted(200, key_b.clone(), 1);
    let b_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![b_sf.clone()],
    };
    let b_written = write_manifest_part(storage.as_ref(), &b_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part_b");

    let a_entry = ManifestPartEntry {
        part_id: a_written.part_id,
        uri: a_written.uri,
        content_hash: a_written.content_hash,
        size_bytes_compressed: a_written.size_bytes_compressed,
        size_bytes_uncompressed: a_written.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: key_a.clone(),
        id_range: (0, 99),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let b_entry = ManifestPartEntry {
        part_id: b_written.part_id,
        uri: b_written.uri.clone(),
        content_hash: b_written.content_hash,
        size_bytes_compressed: b_written.size_bytes_compressed,
        size_bytes_uncompressed: b_written.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: key_b.clone(),
        id_range: (0, 199),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let manifest_list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 2,
        },
        parts: vec![a_entry, b_entry],
    };

    let loader = ManifestPartLoader::new(storage, &manifest_list);
    let cached_parts = DashMap::new();
    let a_part_id = a_part.part_id;
    cached_parts.insert(
        a_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(a_part)))),
    );
    let b_part_id = b_part.part_id;
    cached_parts.insert(
        b_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(b_part)))),
    );
    let snapshot = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: Arc::clone(&options),
            superfiles: vec![a_sf, b_sf],
        },
        list: Some(manifest_list),
        parts: cached_parts,
        loader: Some(Arc::new(loader)),
    });

    // Only partition A gets a new superfile.
    let added = vec![make_superfile_entry_hinted(50, key_a.clone(), 0)];
    let (updated, emitted) = snapshot.update(&added, &[]).await.expect("update");

    let entries = updated.get_all_list_entries();
    assert_eq!(entries.len(), 2);
    assert_eq!(emitted.len(), 1);

    // Partition A was rewritten with the extra superfile.
    let a_after = &entries[0];
    assert_eq!(a_after.partition_key, key_a);
    assert_eq!(a_after.n_superfiles, 2);
    let a_rewritten = &emitted[0].part.superfiles;
    assert_eq!(a_rewritten.len(), 2);
    let a_docs = a_rewritten.iter().fold(0u64, |acc, sf| acc + sf.n_docs);
    assert_eq!(a_docs, 150);

    // Partition B's entry is carried over as-is.
    let b_after = &entries[1];
    assert_eq!(b_after.partition_key, key_b);
    assert_eq!(b_after.n_superfiles, 1);
    assert_eq!(b_after.uri, b_written.uri);
    assert_eq!(b_after.content_hash, b_written.content_hash);
}
/// Two partitions, each already split into an older and a latest
/// single-superfile part, with a target of two per part.
///
/// Only the two latest parts are placed in the in-memory part map. Adding one
/// superfile per partition must keep both older entries byte-for-byte (uri and
/// content hash) and emit two parts: A with 150 + 75 = 225 docs and B with
/// 250 + 90 = 340 docs.
#[tokio::test]
async fn update_two_partitions_each_with_prior_split() {
    let mut raw_options =
        SupertableOptions::new(simple_schema(), vec![], vec![], None).expect("valid options");
    raw_options.target_superfiles_per_part = 2;
    let options = Arc::new(raw_options);
    let key_a = hash2_pk(0);
    let key_b = hash2_pk(1);
    let (_dir, storage) = local_storage();

    // Partition A, older part.
    let a_old_sf = make_superfile_entry_hinted(100, key_a.clone(), 0);
    let a_old_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![a_old_sf.clone()],
    };
    let a_old_written = write_manifest_part(storage.as_ref(), &a_old_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part_a_old");

    // Partition A, latest part.
    let a_new_sf = make_superfile_entry_hinted(150, key_a.clone(), 0);
    let a_new_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![a_new_sf.clone()],
    };
    let a_new_written = write_manifest_part(storage.as_ref(), &a_new_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part_a_latest");

    // Partition B, older part.
    let b_old_sf = make_superfile_entry_hinted(200, key_b.clone(), 1);
    let b_old_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![b_old_sf.clone()],
    };
    let b_old_written = write_manifest_part(storage.as_ref(), &b_old_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part_b_old");

    // Partition B, latest part.
    let b_new_sf = make_superfile_entry_hinted(250, key_b.clone(), 1);
    let b_new_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![b_new_sf.clone()],
    };
    let b_new_written = write_manifest_part(storage.as_ref(), &b_new_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part_b_latest");

    let a_old_entry = ManifestPartEntry {
        part_id: a_old_written.part_id,
        uri: a_old_written.uri.clone(),
        content_hash: a_old_written.content_hash,
        size_bytes_compressed: a_old_written.size_bytes_compressed,
        size_bytes_uncompressed: a_old_written.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: key_a.clone(),
        id_range: (0, 99),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let a_new_entry = ManifestPartEntry {
        part_id: a_new_written.part_id,
        uri: a_new_written.uri,
        content_hash: a_new_written.content_hash,
        size_bytes_compressed: a_new_written.size_bytes_compressed,
        size_bytes_uncompressed: a_new_written.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: key_a.clone(),
        id_range: (0, 149),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let b_old_entry = ManifestPartEntry {
        part_id: b_old_written.part_id,
        uri: b_old_written.uri.clone(),
        content_hash: b_old_written.content_hash,
        size_bytes_compressed: b_old_written.size_bytes_compressed,
        size_bytes_uncompressed: b_old_written.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: key_b.clone(),
        id_range: (0, 199),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let b_new_entry = ManifestPartEntry {
        part_id: b_new_written.part_id,
        uri: b_new_written.uri,
        content_hash: b_new_written.content_hash,
        size_bytes_compressed: b_new_written.size_bytes_compressed,
        size_bytes_uncompressed: b_new_written.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: key_b.clone(),
        id_range: (0, 249),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let manifest_list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 2,
        },
        parts: vec![a_old_entry, a_new_entry, b_old_entry, b_new_entry],
    };

    // Only the latest part of each partition is pre-populated in the part map.
    let loader = ManifestPartLoader::new(storage, &manifest_list);
    let cached_parts = DashMap::new();
    let a_new_part_id = a_new_part.part_id;
    cached_parts.insert(
        a_new_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(a_new_part)))),
    );
    let b_new_part_id = b_new_part.part_id;
    cached_parts.insert(
        b_new_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(b_new_part)))),
    );
    let snapshot = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: Arc::clone(&options),
            superfiles: vec![a_old_sf, a_new_sf, b_old_sf, b_new_sf],
        },
        list: Some(manifest_list),
        parts: cached_parts,
        loader: Some(Arc::new(loader)),
    });

    // One new superfile per partition.
    let added = vec![
        make_superfile_entry_hinted(75, key_a.clone(), 0),
        make_superfile_entry_hinted(90, key_b.clone(), 1),
    ];
    let (updated, emitted) = snapshot.update(&added, &[]).await.expect("update");

    let entries = updated.get_all_list_entries();
    assert_eq!(entries.len(), 4);
    assert_eq!(emitted.len(), 2);

    // Partition A: older entry carried over, latest rewritten.
    let a_old_after = &entries[0];
    assert_eq!(a_old_after.partition_key, key_a);
    assert_eq!(a_old_after.n_superfiles, 1);
    assert_eq!(a_old_after.uri, a_old_written.uri);
    assert_eq!(a_old_after.content_hash, a_old_written.content_hash);
    let a_new_after = &entries[1];
    assert_eq!(a_new_after.partition_key, key_a);
    assert_eq!(a_new_after.n_superfiles, 2);
    let a_rewritten = &emitted[0].part.superfiles;
    assert_eq!(a_rewritten.len(), 2);
    let a_docs = a_rewritten.iter().fold(0u64, |acc, sf| acc + sf.n_docs);
    assert_eq!(a_docs, 225);

    // Partition B: older entry carried over, latest rewritten.
    let b_old_after = &entries[2];
    assert_eq!(b_old_after.partition_key, key_b);
    assert_eq!(b_old_after.n_superfiles, 1);
    assert_eq!(b_old_after.uri, b_old_written.uri);
    assert_eq!(b_old_after.content_hash, b_old_written.content_hash);
    let b_new_after = &entries[3];
    assert_eq!(b_new_after.partition_key, key_b);
    assert_eq!(b_new_after.n_superfiles, 2);
    let b_rewritten = &emitted[1].part.superfiles;
    assert_eq!(b_rewritten.len(), 2);
    let b_docs = b_rewritten.iter().fold(0u64, |acc, sf| acc + sf.n_docs);
    assert_eq!(b_docs, 340);
}
/// One partition with a single two-superfile part; removes one of the two.
///
/// Asserts one list entry and one emitted part, both holding a single
/// superfile, and that the survivor is the 100-doc superfile that was kept.
#[tokio::test]
async fn update_remove_one_superfile_from_partition() {
    let options = make_opts();
    let partition = hash_bucket_0_pk();
    let (_dir, storage) = local_storage();

    // Seed part: the superfile to keep and the one to remove.
    let kept_sf = make_superfile_entry(100, partition.clone());
    let dropped_sf = make_superfile_entry(150, partition.clone());
    let seed_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![kept_sf.clone(), dropped_sf.clone()],
    };
    let written = write_manifest_part(storage.as_ref(), &seed_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part");

    let seed_entry = ManifestPartEntry {
        part_id: written.part_id,
        uri: written.uri,
        content_hash: written.content_hash,
        size_bytes_compressed: written.size_bytes_compressed,
        size_bytes_uncompressed: written.size_bytes_uncompressed,
        n_superfiles: 2,
        partition_key: partition.clone(),
        id_range: (0, 149),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let manifest_list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 1,
        },
        parts: vec![seed_entry],
    };

    let loader = ManifestPartLoader::new(storage, &manifest_list);
    let cached_parts = DashMap::new();
    let seed_part_id = seed_part.part_id;
    cached_parts.insert(
        seed_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(seed_part)))),
    );
    let snapshot = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: options.clone(),
            superfiles: vec![kept_sf.clone(), dropped_sf.clone()],
        },
        list: Some(manifest_list),
        parts: cached_parts,
        loader: Some(Arc::new(loader)),
    });

    // Removal-only update.
    let (updated, emitted) = snapshot
        .update(&[], from_ref(&dropped_sf))
        .await
        .expect("update");

    let entries = updated.get_all_list_entries();
    assert_eq!(entries.len(), 1);
    assert_eq!(emitted.len(), 1);
    let only_entry = &entries[0];
    assert_eq!(only_entry.partition_key, partition);
    assert_eq!(only_entry.n_superfiles, 1);

    let survivors = &emitted[0].part.superfiles;
    assert_eq!(survivors.len(), 1);
    assert_eq!(survivors[0].superfile_id, kept_sf.superfile_id);
    let doc_sum = survivors.iter().fold(0u64, |acc, sf| acc + sf.n_docs);
    assert_eq!(doc_sum, 100);
}
/// One partition with a single two-superfile part and a target of three; a
/// single update both adds a 75-doc superfile and removes the 150-doc one.
///
/// Asserts one list entry and one emitted part with two superfiles: the kept
/// one and the new one, never the removed one, totalling 100 + 75 = 175 docs.
#[tokio::test]
async fn update_add_and_remove_in_same_partition() {
    let mut raw_options =
        SupertableOptions::new(simple_schema(), vec![], vec![], None).expect("valid options");
    raw_options.target_superfiles_per_part = 3;
    let options = Arc::new(raw_options);
    let partition = hash_bucket_0_pk();
    let (_dir, storage) = local_storage();

    // Seed part: the superfile to keep and the one to remove.
    let kept_sf = make_superfile_entry(100, partition.clone());
    let dropped_sf = make_superfile_entry(150, partition.clone());
    let seed_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![kept_sf.clone(), dropped_sf.clone()],
    };
    let written = write_manifest_part(storage.as_ref(), &seed_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part");

    let seed_entry = ManifestPartEntry {
        part_id: written.part_id,
        uri: written.uri,
        content_hash: written.content_hash,
        size_bytes_compressed: written.size_bytes_compressed,
        size_bytes_uncompressed: written.size_bytes_uncompressed,
        n_superfiles: 2,
        partition_key: partition.clone(),
        id_range: (0, 149),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let manifest_list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 1,
        },
        parts: vec![seed_entry],
    };

    let loader = ManifestPartLoader::new(storage, &manifest_list);
    let cached_parts = DashMap::new();
    let seed_part_id = seed_part.part_id;
    cached_parts.insert(
        seed_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(seed_part)))),
    );
    let snapshot = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: Arc::clone(&options),
            superfiles: vec![kept_sf.clone(), dropped_sf.clone()],
        },
        list: Some(manifest_list),
        parts: cached_parts,
        loader: Some(Arc::new(loader)),
    });

    // Combined add + remove in one update call.
    let fresh_sf = make_superfile_entry(75, partition.clone());
    let added = vec![fresh_sf.clone()];
    let (updated, emitted) = snapshot
        .update(&added, from_ref(&dropped_sf))
        .await
        .expect("update");

    let entries = updated.get_all_list_entries();
    assert_eq!(entries.len(), 1);
    assert_eq!(emitted.len(), 1);
    let only_entry = &entries[0];
    assert_eq!(only_entry.partition_key, partition);
    assert_eq!(only_entry.n_superfiles, 2);

    let rewritten = &emitted[0].part.superfiles;
    assert_eq!(rewritten.len(), 2);
    let present_ids: Vec<_> = rewritten.iter().map(|sf| sf.superfile_id).collect();
    assert!(present_ids.contains(&kept_sf.superfile_id));
    assert!(present_ids.contains(&fresh_sf.superfile_id));
    assert!(!present_ids.contains(&dropped_sf.superfile_id));
    let doc_sum = rewritten.iter().fold(0u64, |acc, sf| acc + sf.n_docs);
    assert_eq!(doc_sum, 175);
}
/// Two partitions: A has one two-superfile part, B has one one-superfile part.
/// Removes one superfile from A only.
///
/// Asserts that a single part is emitted (A, now holding just the kept 100-doc
/// superfile) and that B's list entry keeps its original uri, content hash and
/// superfile count.
#[tokio::test]
async fn update_remove_from_one_partition_other_carried_over_exactly() {
    let options = make_opts();
    let key_a = hash2_pk(0);
    let key_b = hash2_pk(1);
    let (_dir, storage) = local_storage();

    // Partition A seed part: one superfile to keep, one to remove.
    let a_kept_sf = make_superfile_entry_hinted(100, key_a.clone(), 0);
    let a_dropped_sf = make_superfile_entry_hinted(150, key_a.clone(), 0);
    let a_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![a_kept_sf.clone(), a_dropped_sf.clone()],
    };
    let a_written = write_manifest_part(storage.as_ref(), &a_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part_a");

    // Partition B seed part (left untouched by the update below).
    let b_sf = make_superfile_entry_hinted(200, key_b.clone(), 1);
    let b_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![b_sf.clone()],
    };
    let b_written = write_manifest_part(storage.as_ref(), &b_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part_b");

    let a_entry = ManifestPartEntry {
        part_id: a_written.part_id,
        uri: a_written.uri,
        content_hash: a_written.content_hash,
        size_bytes_compressed: a_written.size_bytes_compressed,
        size_bytes_uncompressed: a_written.size_bytes_uncompressed,
        n_superfiles: 2,
        partition_key: key_a.clone(),
        id_range: (0, 149),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let b_entry = ManifestPartEntry {
        part_id: b_written.part_id,
        uri: b_written.uri.clone(),
        content_hash: b_written.content_hash,
        size_bytes_compressed: b_written.size_bytes_compressed,
        size_bytes_uncompressed: b_written.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: key_b.clone(),
        id_range: (0, 199),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let manifest_list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 2,
        },
        parts: vec![a_entry, b_entry],
    };

    let loader = ManifestPartLoader::new(storage, &manifest_list);
    let cached_parts = DashMap::new();
    let a_part_id = a_part.part_id;
    cached_parts.insert(
        a_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(a_part)))),
    );
    let b_part_id = b_part.part_id;
    cached_parts.insert(
        b_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(b_part)))),
    );
    let snapshot = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: options.clone(),
            superfiles: vec![a_kept_sf.clone(), a_dropped_sf.clone(), b_sf.clone()],
        },
        list: Some(manifest_list),
        parts: cached_parts,
        loader: Some(Arc::new(loader)),
    });

    // Removal-only update touching partition A.
    let (updated, emitted) = snapshot
        .update(&[], from_ref(&a_dropped_sf))
        .await
        .expect("update");

    let entries = updated.get_all_list_entries();
    assert_eq!(entries.len(), 2);
    assert_eq!(emitted.len(), 1);

    // Partition A now holds only the kept superfile.
    let a_after = &entries[0];
    assert_eq!(a_after.partition_key, key_a);
    assert_eq!(a_after.n_superfiles, 1);
    let a_survivors = &emitted[0].part.superfiles;
    assert_eq!(a_survivors.len(), 1);
    assert_eq!(a_survivors[0].superfile_id, a_kept_sf.superfile_id);
    let a_docs = a_survivors.iter().fold(0u64, |acc, sf| acc + sf.n_docs);
    assert_eq!(a_docs, 100);

    // Partition B's entry is carried over as-is.
    let b_after = &entries[1];
    assert_eq!(b_after.partition_key, key_b);
    assert_eq!(b_after.n_superfiles, 1);
    assert_eq!(b_after.uri, b_written.uri);
    assert_eq!(b_after.content_hash, b_written.content_hash);
}
/// One partition split into an older one-superfile part and a latest
/// two-superfile part, with a target of two; removes one superfile from the
/// latest part.
///
/// Asserts two list entries for the partition with one superfile each, a single
/// emitted part, and that the emitted superfiles include the kept one but not
/// the removed one.
#[tokio::test]
async fn update_remove_from_latest_part_in_split_partition() {
    let mut raw_options =
        SupertableOptions::new(simple_schema(), vec![], vec![], None).expect("valid options");
    raw_options.target_superfiles_per_part = 2;
    let options = Arc::new(raw_options);
    let partition = hash_bucket_0_pk();
    let (_dir, storage) = local_storage();

    // Older part: a single superfile.
    let old_sf = make_superfile_entry(100, partition.clone());
    let old_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![old_sf.clone()],
    };
    let old_written = write_manifest_part(storage.as_ref(), &old_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part_a_old");

    // Latest part: one superfile to keep, one to remove.
    let kept_sf = make_superfile_entry(150, partition.clone());
    let dropped_sf = make_superfile_entry(200, partition.clone());
    let latest_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![kept_sf.clone(), dropped_sf.clone()],
    };
    let latest_written =
        write_manifest_part(storage.as_ref(), &latest_part, MANIFEST_ZSTD_LEVEL)
            .await
            .expect("write part_a_latest");

    let old_entry = ManifestPartEntry {
        part_id: old_written.part_id,
        uri: old_written.uri.clone(),
        content_hash: old_written.content_hash,
        size_bytes_compressed: old_written.size_bytes_compressed,
        size_bytes_uncompressed: old_written.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: partition.clone(),
        id_range: (0, 99),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let latest_entry = ManifestPartEntry {
        part_id: latest_written.part_id,
        uri: latest_written.uri.clone(),
        content_hash: latest_written.content_hash,
        size_bytes_compressed: latest_written.size_bytes_compressed,
        size_bytes_uncompressed: latest_written.size_bytes_uncompressed,
        n_superfiles: 2,
        partition_key: partition.clone(),
        id_range: (0, 199),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let manifest_list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 1,
        },
        parts: vec![old_entry, latest_entry],
    };

    // Both parts are resident in the part map.
    let loader = ManifestPartLoader::new(storage, &manifest_list);
    let cached_parts = DashMap::new();
    let old_part_id = old_part.part_id;
    cached_parts.insert(
        old_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(old_part)))),
    );
    let latest_part_id = latest_part.part_id;
    cached_parts.insert(
        latest_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(latest_part)))),
    );
    let snapshot = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: Arc::clone(&options),
            superfiles: vec![old_sf.clone(), kept_sf.clone(), dropped_sf.clone()],
        },
        list: Some(manifest_list),
        parts: cached_parts,
        loader: Some(Arc::new(loader)),
    });

    // Removal-only update targeting the latest part.
    let (updated, emitted) = snapshot
        .update(&[], from_ref(&dropped_sf))
        .await
        .expect("update");

    let entries = updated.get_all_list_entries();
    assert_eq!(entries.len(), 2);
    assert_eq!(emitted.len(), 1);
    let (head, tail) = (&entries[0], &entries[1]);
    assert_eq!(head.partition_key, partition);
    assert_eq!(tail.partition_key, partition);

    // Gather every superfile id that appears in any emitted part.
    let mut emitted_ids = Vec::new();
    for encoded in emitted.iter() {
        emitted_ids.extend(encoded.part.superfiles.iter().map(|sf| sf.superfile_id));
    }
    assert!(
        emitted_ids.contains(&kept_sf.superfile_id),
        "sf_a_latest_keep must survive"
    );
    assert!(
        !emitted_ids.contains(&dropped_sf.superfile_id),
        "sf_a_latest_remove must be absent"
    );
    assert_eq!(head.n_superfiles, 1);
    assert_eq!(tail.n_superfiles, 1);
}
/// Passes both superfiles of a single-part partition as the second argument
/// of `update` (the removal set, going by the test name) and checks that the
/// new snapshot still has exactly one list entry for the partition, that one
/// part is handed back for writing, and that both report zero superfiles.
#[tokio::test]
async fn update_remove_all_superfiles_empties_partition() {
    let opts = make_opts();
    let pk = hash_bucket_0_pk();
    let (_dir, storage) = local_storage();

    // Fixture: a single part holding two superfiles, written to storage.
    let sf1 = make_superfile_entry(100, pk.clone());
    let sf2 = make_superfile_entry(150, pk.clone());
    let seed_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![sf1.clone(), sf2.clone()],
    };
    let written = write_manifest_part(storage.as_ref(), &seed_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part");

    // The list entry mirrors what was just written.
    let seed_entry = ManifestPartEntry {
        part_id: written.part_id,
        uri: written.uri,
        content_hash: written.content_hash,
        size_bytes_compressed: written.size_bytes_compressed,
        size_bytes_uncompressed: written.size_bytes_uncompressed,
        n_superfiles: 2,
        partition_key: pk.clone(),
        id_range: (0, 149),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 1,
        },
        parts: vec![seed_entry],
    };
    let loader = ManifestPartLoader::new(storage, &list);

    // Seed the part cache with an already-initialised cell for the part.
    let seed_part_id = seed_part.part_id;
    let part_cache = DashMap::new();
    part_cache.insert(
        seed_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(seed_part)))),
    );

    let old_manifest = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: opts.clone(),
            superfiles: vec![sf1.clone(), sf2.clone()],
        },
        list: Some(list),
        parts: part_cache,
        loader: Some(Arc::new(loader)),
    });

    // Act: nothing in the first slice, both superfiles in the second.
    let removals = [sf1.clone(), sf2.clone()];
    let (new_manifest, parts) = old_manifest
        .update(&[], &removals)
        .await
        .expect("update");

    // Assert: the entry and the rewritten part both exist and are empty.
    let entries = new_manifest.get_all_list_entries();
    assert_eq!(entries.len(), 1);
    assert_eq!(parts.len(), 1);
    assert_eq!(entries[0].partition_key, pk);
    assert_eq!(entries[0].n_superfiles, 0);
    assert_eq!(parts[0].part.superfiles.len(), 0);
}
/// Passes a superfile that is not in the snapshot as the second argument of
/// `update` (the removal set, going by the test name) and checks that no part
/// is handed back for writing and the single list entry still reports two
/// superfiles.
#[tokio::test]
async fn update_remove_nonexistent_superfile_id_is_noop() {
    let opts = make_opts();
    let pk = hash_bucket_0_pk();
    let (_dir, storage) = local_storage();

    // Fixture: a single part holding two superfiles, written to storage.
    let sf1 = make_superfile_entry(100, pk.clone());
    let sf2 = make_superfile_entry(150, pk.clone());
    let seed_part = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![sf1.clone(), sf2.clone()],
    };
    let written = write_manifest_part(storage.as_ref(), &seed_part, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part");

    // The list entry mirrors what was just written.
    let seed_entry = ManifestPartEntry {
        part_id: written.part_id,
        uri: written.uri,
        content_hash: written.content_hash,
        size_bytes_compressed: written.size_bytes_compressed,
        size_bytes_uncompressed: written.size_bytes_uncompressed,
        n_superfiles: 2,
        partition_key: pk.clone(),
        id_range: (0, 149),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 1,
        },
        parts: vec![seed_entry],
    };
    let loader = ManifestPartLoader::new(storage, &list);

    // Seed the part cache with an already-initialised cell for the part.
    let seed_part_id = seed_part.part_id;
    let part_cache = DashMap::new();
    part_cache.insert(
        seed_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(seed_part)))),
    );

    let old_manifest = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: opts.clone(),
            superfiles: vec![sf1.clone(), sf2.clone()],
        },
        list: Some(list),
        parts: part_cache,
        loader: Some(Arc::new(loader)),
    });

    // Act: a freshly made entry that was never added to the snapshot.
    let sf_ghost = make_superfile_entry(50, pk.clone());
    let (new_manifest, parts_to_write) = old_manifest
        .update(&[], from_ref(&sf_ghost))
        .await
        .expect("update");

    // Assert: nothing to write, and the existing entry is unchanged in count.
    let entries = new_manifest.get_all_list_entries();
    assert_eq!(entries.len(), 1);
    assert_eq!(parts_to_write.len(), 0);
    assert_eq!(entries[0].partition_key, pk);
    assert_eq!(entries[0].n_superfiles, 2);
}
/// Two parts share one partition key; the superfile passed as the second
/// argument of `update` (the removal set, going by the test name) lives in
/// the first-listed ("old") part. Checks that both list entries remain, that
/// exactly one part is handed back for writing, that it keeps
/// `sf_a_old_keep` and drops `sf_a_old_remove`, and that each list entry then
/// reports one superfile.
#[tokio::test]
async fn update_remove_from_older_frozen_part_in_split_partition() {
    // Options with `target_superfiles_per_part` set to 2.
    let mut base_opts =
        SupertableOptions::new(simple_schema(), vec![], vec![], None).expect("valid options");
    base_opts.target_superfiles_per_part = 2;
    let opts = Arc::new(base_opts);
    let pk = hash_bucket_0_pk();
    let (_dir, storage) = local_storage();

    // Old part: two superfiles, one of which will be removed.
    let sf_a_old_keep = make_superfile_entry(100, pk.clone());
    let sf_a_old_remove = make_superfile_entry(150, pk.clone());
    let part_a_old = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![sf_a_old_keep.clone(), sf_a_old_remove.clone()],
    };
    let written_old = write_manifest_part(storage.as_ref(), &part_a_old, MANIFEST_ZSTD_LEVEL)
        .await
        .expect("write part_a_old");

    // Latest part: a single superfile that is left alone.
    let sf_a_latest = make_superfile_entry(200, pk.clone());
    let part_a_latest = ManifestPart {
        format_version: part::FORMAT_VERSION.into(),
        part_id: PartId::new_v4(),
        superfiles: vec![sf_a_latest.clone()],
    };
    let written_latest =
        write_manifest_part(storage.as_ref(), &part_a_latest, MANIFEST_ZSTD_LEVEL)
            .await
            .expect("write part_a_latest");

    // One list entry per written part, old first.
    let entry_old = ManifestPartEntry {
        part_id: written_old.part_id,
        uri: written_old.uri,
        content_hash: written_old.content_hash,
        size_bytes_compressed: written_old.size_bytes_compressed,
        size_bytes_uncompressed: written_old.size_bytes_uncompressed,
        n_superfiles: 2,
        partition_key: pk.clone(),
        id_range: (0, 149),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let entry_latest = ManifestPartEntry {
        part_id: written_latest.part_id,
        uri: written_latest.uri,
        content_hash: written_latest.content_hash,
        size_bytes_compressed: written_latest.size_bytes_compressed,
        size_bytes_uncompressed: written_latest.size_bytes_uncompressed,
        n_superfiles: 1,
        partition_key: pk.clone(),
        id_range: (0, 199),
        scalar_stats_agg: Default::default(),
        fts_summary_agg: Default::default(),
        vector_summary_agg: Default::default(),
    };
    let list = Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 0,
        options_hash: ContentHash([0u8; 32]),
        schema: vec![],
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy: PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 1,
        },
        parts: vec![entry_old, entry_latest],
    };
    let loader = ManifestPartLoader::new(storage, &list);

    // Seed the part cache with already-initialised cells for both parts.
    let part_cache = DashMap::new();
    let old_part_id = part_a_old.part_id;
    part_cache.insert(
        old_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(part_a_old)))),
    );
    let latest_part_id = part_a_latest.part_id;
    part_cache.insert(
        latest_part_id,
        Arc::new(OnceCell::new_with(Some(Arc::new(part_a_latest)))),
    );

    let old_manifest = Arc::new(ManifestSnapshot {
        superfile_list: SuperfileList {
            manifest_id: 0,
            options: opts.clone(),
            superfiles: vec![
                sf_a_old_keep.clone(),
                sf_a_old_remove.clone(),
                sf_a_latest.clone(),
            ],
        },
        list: Some(list),
        parts: part_cache,
        loader: Some(Arc::new(loader)),
    });

    // Act: nothing in the first slice, one superfile in the second.
    let (new_manifest, parts_to_write) = old_manifest
        .update(&[], from_ref(&sf_a_old_remove))
        .await
        .expect("update");

    // Assert on the list shape and on how many parts need writing.
    let entries = new_manifest.get_all_list_entries();
    assert_eq!(entries.len(), 2);
    assert_eq!(parts_to_write.len(), 1);
    assert_eq!(entries[0].partition_key, pk);
    assert_eq!(entries[1].partition_key, pk);

    // Every superfile id found in the parts handed back for writing.
    let rewritten_ids: Vec<_> = parts_to_write
        .iter()
        .flat_map(|encoded| encoded.part.superfiles.iter().map(|sf| sf.superfile_id))
        .collect();
    assert!(
        rewritten_ids.contains(&sf_a_old_keep.superfile_id),
        "sf_a_old_keep must survive"
    );
    assert!(
        !rewritten_ids.contains(&sf_a_old_remove.superfile_id),
        "sf_a_old_remove must be absent"
    );
    assert_eq!(entries[0].n_superfiles, 1);
    assert_eq!(entries[1].n_superfiles, 1);
}
/// Builds a `list::Manifest` (manifest id 1, hash partitioning on `_id` with
/// one bucket) containing `n_parts` placeholder part entries.
///
/// Entry `i` gets the part id `Uuid::from_u128(i + 1)` and the URI
/// `manifests/part-{i}`; every other field is zeroed, empty or defaulted.
fn list_with_parts(n_parts: usize) -> list::Manifest {
    use list::{ManifestPartEntry, PartitionStrategy};

    let mut parts = Vec::with_capacity(n_parts);
    for i in 0..n_parts {
        // Ids start at 1, so entry 0 does not get the all-zero UUID.
        let part_id = part::PartId(Uuid::from_u128(i as u128 + 1));
        parts.push(ManifestPartEntry {
            part_id,
            uri: format!("manifests/part-{i}"),
            n_superfiles: 0,
            size_bytes_compressed: 0,
            size_bytes_uncompressed: 0,
            content_hash: part::ContentHash([0u8; 32]),
            partition_key: Vec::new(),
            id_range: (0, 0),
            scalar_stats_agg: Default::default(),
            fts_summary_agg: Default::default(),
            vector_summary_agg: Default::default(),
        });
    }

    let partition_strategy = PartitionStrategy::Hash {
        column: "_id".into(),
        n_buckets: 1,
    };
    list::Manifest {
        format_version: list::FORMAT_VERSION.into(),
        manifest_id: 1,
        options_hash: part::ContentHash([0u8; 32]),
        schema: Vec::new(),
        id_column: "_id".into(),
        fts_columns: vec![],
        vector_columns: vec![],
        partition_strategy,
        parts,
    }
}
/// Wraps `list` in a `ManifestSnapshot` that has an empty superfile list, an
/// empty part cache and no loader.
fn manifest_with_list(list: list::Manifest) -> ManifestSnapshot {
    let superfile_list = SuperfileList::empty(opts());
    let parts = DashMap::new();
    ManifestSnapshot {
        superfile_list,
        list: Some(list),
        parts,
        loader: None,
    }
}
/// A snapshot with a three-part list attached reports three parts and three
/// list entries, none loaded, and is not in-process-only; a snapshot from
/// `ManifestSnapshot::empty` reports no parts and is in-process-only.
#[test]
fn list_accessors_read_from_attached_list() {
    // Snapshot backed by a list with three placeholder parts.
    let with_list = manifest_with_list(list_with_parts(3));
    assert_eq!(with_list.get_num_parts(), 3);
    assert_eq!(with_list.get_all_list_entries().len(), 3);
    assert_eq!(
        with_list.get_num_parts_loaded(),
        0,
        "nothing eagerly loaded"
    );
    assert!(!with_list.is_in_process_only(), "a list is attached");

    // Snapshot with no list at all.
    let without_list = ManifestSnapshot::empty(opts());
    assert_eq!(without_list.get_num_parts(), 0);
    assert!(without_list.get_all_list_entries().is_empty());
    assert!(without_list.is_in_process_only());
}
/// With an empty part cache, cached-part lookups by id and by list index
/// return `None`, both for a snapshot with a two-part list and for an empty
/// snapshot.
#[test]
fn cached_part_lookups_miss_before_load() {
    let with_list = manifest_with_list(list_with_parts(2));

    // `list_with_parts` assigns ids starting at 1, so this id is in the list.
    let known_id = part::PartId(Uuid::from_u128(1));
    assert!(with_list.get_cached_part_by_id(&known_id).is_none());
    for list_idx in 0..2 {
        assert!(with_list.get_cached_part_by_list_idx(list_idx).is_none());
    }

    let without_list = ManifestSnapshot::empty(opts());
    assert!(without_list.get_cached_part_by_list_idx(0).is_none());
}
/// `ManifestSnapshot::new` called with `None` for its last two arguments
/// yields a snapshot that keeps the given manifest id and superfiles, has no
/// parts, and reports itself as in-process-only.
#[test]
fn manifest_new_without_storage_is_in_process_only() {
    let options = opts();
    let superfiles = vec![seg_entry(Uuid::new_v4(), 4)];
    let snapshot = ManifestSnapshot::new(7, options, superfiles, None, None);

    assert_eq!(snapshot.get_manifest_id(), 7);
    assert!(snapshot.is_in_process_only());
    assert_eq!(snapshot.get_num_parts(), 0);
    assert_eq!(snapshot.superfiles.len(), 1);
}
/// `ClusterCentroids::from_fp32` must cope with infinite components in the
/// input: the first scale comes out as exactly `0.0`, the first minimum is
/// finite, and every code is `0`.
#[test]
fn from_fp32_handles_non_finite_components() {
    // One +inf and one -inf alongside two ordinary values.
    let centroids = [f32::INFINITY, f32::NEG_INFINITY, 0.0, 1.0];
    // NOTE(review): the leading `1, 4` presumably mean one centroid of
    // dimension four — confirm against `from_fp32`'s signature.
    let cc = ClusterCentroids::from_fp32(1, 4, &centroids, vec![1]);
    assert_eq!(cc.scales[0], 0.0);
    assert!(cc.mins[0].is_finite());
    assert!(cc.codes.iter().all(|&c| c == 0));
}
}