use std::{collections::HashSet, fmt, sync::Arc, time::Duration};
use arrow_schema::{DataType, Field, Schema};
use rayon::{ThreadPool, ThreadPoolBuilder};
use super::{
error::BuildError,
reader_cache::{
ColdFetchMode, DiskCacheConfig, DiskCacheStore, InMemoryReaderCache, LruPolicy,
SuperfileReaderCache,
},
};
use crate::{
config::{Config, StorageBackend, StorageColdFetchMode},
storage::{AzureStorageProvider, LocalFsStorageProvider, S3StorageProvider, StorageProvider},
superfile::{
OpenOptions,
builder::{BuilderOptions, FtsConfig, VectorConfig},
fts::tokenize::Tokenizer,
},
supertable::manifest::{disk_cache::ManifestDiskCache, list::PartitionStrategy},
};
/// Smallest `dim` accepted for a vector column (inclusive).
const VECTOR_DIM_MIN: usize = 16;
/// Largest `dim` accepted for a vector column (inclusive).
const VECTOR_DIM_MAX: usize = 4096;
/// ASCII unit separator (U+001F); user FTS/vector column names must not contain it.
const RESERVED_SEPARATOR: char = '\u{1F}';
/// Prefix that user FTS/vector column names must not start with.
const RESERVED_PREFIX: &str = "inf.";
/// Default writer-pool size: half of `num_cpus::get()`, rounded up, and never below one.
fn default_writer_thread_count() -> usize {
    let half = num_cpus::get().div_ceil(2);
    if half == 0 { 1 } else { half }
}
/// Default reader-pool size: `num_cpus::get()`, clamped to at least one.
fn default_reader_thread_count() -> usize {
    match num_cpus::get() {
        0 => 1,
        n => n,
    }
}
/// Name of the synthetic id column used unless overridden via `with_id_column` or config.
fn default_id_column() -> String {
    String::from("_id")
}
/// Precision of the `Decimal128` type used for the synthetic id column.
pub(crate) const DECIMAL128_PRECISION: u8 = 38;
/// Scale of the `Decimal128` id column (integral values only).
pub(crate) const DECIMAL128_SCALE: i8 = 0;
/// Window, in seconds, of the default `Consistency::BoundedStaleness`.
const DEFAULT_READ_STALENESS_SECS: u64 = 1;
/// Default for `SupertableOptions::target_superfiles_per_part`.
const DEFAULT_TARGET_SUPERFILES_PER_PART: u64 = 10_000;
/// Default for `SupertableOptions::part_size_threshold_bytes` (10 MiB).
const DEFAULT_PART_SIZE_THRESHOLD_BYTES: u64 = 10 << 20;
/// Default for `SupertableOptions::eager_load_threshold_parts`.
const DEFAULT_EAGER_LOAD_THRESHOLD_PARTS: u32 = 4;
/// Subdirectory of `storage.disk_cache_root` that holds the manifest disk cache.
const MANIFEST_CACHE_SUBDIR: &str = "manifest-parts";
/// Default for `SupertableOptions::max_commit_retries`.
const DEFAULT_MAX_COMMIT_RETRIES: u32 = 10;
/// Default for `SupertableOptions::commit_threshold_size_mb`.
const DEFAULT_COMMIT_THRESHOLD_SIZE_MB: u64 = 1024;
/// Default for `SupertableOptions::put_multipart_threshold_bytes` (100 MiB).
const DEFAULT_PUT_MULTIPART_THRESHOLD_BYTES: u64 = 100 << 20;
/// Read-consistency level requested for supertable reads.
///
/// The `Default` is `BoundedStaleness` with a `DEFAULT_READ_STALENESS_SECS`-second window.
/// NOTE(review): the guarantees behind each level are enforced by the read path, which is
/// not visible here; the variant descriptions below are presumptions to confirm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Consistency {
    /// Presumably: reads observe the latest committed state.
    Strong,
    /// Presumably: reads may lag the latest commit by at most the given duration.
    BoundedStaleness(Duration),
    /// Presumably: reads observe a fixed snapshot.
    Snapshot,
}
impl Default for Consistency {
    /// Bounded staleness with a window of `DEFAULT_READ_STALENESS_SECS` seconds.
    fn default() -> Self {
        let window = Duration::from_secs(DEFAULT_READ_STALENESS_SECS);
        Self::BoundedStaleness(window)
    }
}
/// Options used to build a supertable: the user schema, FTS and vector column specs,
/// thread pools, reader cache, optional storage/disk caches, and commit/partitioning
/// tunables.
///
/// Create with `SupertableOptions::new` (which validates the schema), then adjust with the
/// `with_*` builders or `apply_config`.
pub struct SupertableOptions {
    /// User-supplied schema. `new`/`with_id_column`/`apply_config` ensure it does not
    /// contain a field named `id_column`.
    pub schema: Arc<Schema>,
    /// Name of the synthetic id column prepended by `effective_schema` and
    /// `scalar_schema` (defaults to `_id`).
    pub id_column: String,
    /// Full-text-search columns; `new` requires each to be a `LargeUtf8` field of `schema`.
    pub fts_columns: Vec<FtsConfig>,
    /// Vector columns; `new` requires each to be a `FixedSizeList<Float32>` field of
    /// `schema` whose length equals the configured `dim`.
    pub vector_columns: Vec<VectorConfig>,
    /// Tokenizer for FTS columns; `new` requires it when `fts_columns` is non-empty.
    pub tokenizer: Option<Arc<dyn Tokenizer>>,
    /// Rayon pool for reads (`new`/`apply_config` name its threads `supertable-reader-{i}`).
    pub reader_pool: Arc<ThreadPool>,
    /// Rayon pool for writes (`new`/`apply_config` name its threads `supertable-writer-{i}`).
    pub writer_pool: Arc<ThreadPool>,
    /// Superfile reader cache; `new` installs an `InMemoryReaderCache`.
    pub store: Arc<dyn SuperfileReaderCache>,
    /// Backing storage provider; `None` unless set by `with_storage` or by `apply_config`
    /// with a non-`None` `storage.backend`.
    pub storage: Option<Arc<dyn StorageProvider>>,
    /// Disk cache in front of `storage`; built by `apply_config` when
    /// `storage.disk_cache_root` is set, or attached via `with_disk_cache`.
    pub disk_cache: Option<Arc<DiskCacheStore>>,
    /// Manifest-part disk cache; `apply_config` places it under
    /// `<disk_cache_root>/manifest-parts`.
    pub manifest_disk_cache: Option<Arc<ManifestDiskCache>>,
    /// Optional memory budget in bytes (`None` by default).
    /// NOTE(review): the consumer of this budget is not visible here.
    pub memory_budget_bytes: Option<u64>,
    /// Defaults to `true`. Presumably controls populating the reader cache at commit time;
    /// confirm against the commit path.
    pub prepopulate_cache_on_commit: bool,
    /// Explicit partition strategy; when `None`, `effective_partition_strategy` falls back
    /// to ingestion-time partitions with a granularity of 86_400 seconds.
    pub partition_strategy: Option<PartitionStrategy>,
    /// Defaults to `DEFAULT_TARGET_SUPERFILES_PER_PART` (10_000).
    pub target_superfiles_per_part: u64,
    /// Defaults to `DEFAULT_PART_SIZE_THRESHOLD_BYTES` (10 MiB).
    pub part_size_threshold_bytes: u64,
    /// Defaults to `DEFAULT_EAGER_LOAD_THRESHOLD_PARTS` (4).
    pub eager_load_threshold_parts: u32,
    /// Defaults to `DEFAULT_MAX_COMMIT_RETRIES` (10).
    pub max_commit_retries: u32,
    /// Defaults to `DEFAULT_COMMIT_THRESHOLD_SIZE_MB` (1024); `apply_config` overwrites it
    /// from `supertable.commit_threshold_size_mb`.
    pub commit_threshold_size_mb: u64,
    /// Defaults to `DEFAULT_PUT_MULTIPART_THRESHOLD_BYTES` (100 MiB).
    pub put_multipart_threshold_bytes: u64,
    /// Whether superfile CRCs are verified on open (forwarded by `superfile_open_options`);
    /// defaults to `true`.
    pub verify_crc_on_open: bool,
    /// Read consistency; defaults to `Consistency::default()`.
    pub read_consistency: Consistency,
}
impl SupertableOptions {
pub fn new(
schema: Arc<Schema>,
fts_columns: Vec<FtsConfig>,
vector_columns: Vec<VectorConfig>,
tokenizer: Option<Arc<dyn Tokenizer>>,
) -> Result<Self, BuildError> {
let id_column = default_id_column();
if schema.fields().iter().any(|f| f.name() == &id_column) {
return Err(BuildError::IdColumnReserved(id_column.clone()));
}
for fc in &fts_columns {
check_user_column_name(&fc.column)?;
let idx = schema
.index_of(&fc.column)
.map_err(|_| BuildError::FtsColumnMissing {
column: fc.column.clone(),
})?;
let f = schema.field(idx);
if f.data_type() != &DataType::LargeUtf8 {
return Err(BuildError::FtsColumnMustBeLargeUtf8 {
column: fc.column.clone(),
actual: format!("{:?}", f.data_type()),
});
}
}
for vc in &vector_columns {
check_user_column_name(&vc.column)?;
if vc.dim < VECTOR_DIM_MIN || vc.dim > VECTOR_DIM_MAX {
return Err(BuildError::VectorDimOutOfRange {
column: vc.column.clone(),
dim: vc.dim,
});
}
let idx = schema
.index_of(&vc.column)
.map_err(|_| BuildError::VectorColumnMissing {
column: vc.column.clone(),
})?;
let f = schema.field(idx);
match f.data_type() {
DataType::FixedSizeList(item_field, list_size) => {
if item_field.data_type() != &DataType::Float32 {
return Err(BuildError::VectorColumnNotFixedSizeList {
column: vc.column.clone(),
dim: vc.dim,
actual: format!("{:?}", f.data_type()),
});
}
let list_size = usize::try_from(*list_size).unwrap_or(usize::MAX);
if list_size != vc.dim {
return Err(BuildError::VectorColumnDimMismatch {
column: vc.column.clone(),
expected: vc.dim,
actual: list_size,
});
}
}
other => {
return Err(BuildError::VectorColumnNotFixedSizeList {
column: vc.column.clone(),
dim: vc.dim,
actual: format!("{:?}", other),
});
}
}
}
let mut seen_logical: HashSet<&str> = HashSet::new();
for fc in &fts_columns {
if !seen_logical.insert(fc.column.as_str()) {
return Err(BuildError::DuplicateLogicalName(fc.column.clone()));
}
}
for vc in &vector_columns {
if !seen_logical.insert(vc.column.as_str()) {
return Err(BuildError::DuplicateLogicalName(vc.column.clone()));
}
}
if !fts_columns.is_empty() && tokenizer.is_none() {
return Err(BuildError::MissingTokenizer);
}
let reader_pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(default_reader_thread_count())
.thread_name(|i| format!("supertable-reader-{i}"))
.build()
.map_err(|e| BuildError::ThreadPoolCreation(e.to_string()))?,
);
let writer_threads = default_writer_thread_count();
let writer_pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(writer_threads)
.thread_name(|i| format!("supertable-writer-{i}"))
.build()
.map_err(|e| BuildError::ThreadPoolCreation(e.to_string()))?,
);
let store: Arc<dyn SuperfileReaderCache> = Arc::new(InMemoryReaderCache::new());
Ok(Self {
schema,
id_column,
fts_columns,
vector_columns,
tokenizer,
reader_pool,
writer_pool,
store,
storage: None,
disk_cache: None,
manifest_disk_cache: None,
memory_budget_bytes: None,
prepopulate_cache_on_commit: true,
partition_strategy: None,
target_superfiles_per_part: DEFAULT_TARGET_SUPERFILES_PER_PART,
part_size_threshold_bytes: DEFAULT_PART_SIZE_THRESHOLD_BYTES,
eager_load_threshold_parts: DEFAULT_EAGER_LOAD_THRESHOLD_PARTS,
max_commit_retries: DEFAULT_MAX_COMMIT_RETRIES,
commit_threshold_size_mb: DEFAULT_COMMIT_THRESHOLD_SIZE_MB,
put_multipart_threshold_bytes: DEFAULT_PUT_MULTIPART_THRESHOLD_BYTES,
verify_crc_on_open: true,
read_consistency: Consistency::default(),
})
}
pub fn user_schema(&self) -> Arc<Schema> {
Arc::clone(&self.schema)
}
pub fn effective_schema(&self) -> Arc<Schema> {
let mut fields = vec![Arc::new(Field::new(
&self.id_column,
DataType::Decimal128(DECIMAL128_PRECISION, DECIMAL128_SCALE),
false,
))];
fields.extend(self.schema.fields().iter().cloned());
Arc::new(Schema::new(fields))
}
pub fn effective_partition_strategy(&self) -> PartitionStrategy {
self.partition_strategy
.clone()
.unwrap_or(PartitionStrategy::IngestionTime {
granularity_secs: 86_400,
})
}
pub fn with_id_column(mut self, name: impl Into<String>) -> Result<Self, BuildError> {
let name = name.into();
if self.schema.fields().iter().any(|f| f.name() == &name) {
return Err(BuildError::IdColumnReserved(name));
}
self.id_column = name;
Ok(self)
}
pub fn with_reader_pool(mut self, pool: Arc<ThreadPool>) -> Self {
self.reader_pool = pool;
self
}
pub fn with_writer_pool(mut self, pool: Arc<ThreadPool>) -> Self {
self.writer_pool = pool;
self
}
pub fn with_read_consistency(mut self, consistency: Consistency) -> Self {
self.read_consistency = consistency;
self
}
pub fn with_store(mut self, store: Arc<dyn SuperfileReaderCache>) -> Self {
self.store = store;
self
}
pub fn with_storage(mut self, storage: Arc<dyn StorageProvider>) -> Self {
self.storage = Some(storage);
self
}
pub fn with_disk_cache(mut self, cache: Arc<DiskCacheStore>) -> Self {
self.disk_cache = Some(cache);
self
}
pub fn with_manifest_disk_cache(mut self, cache: Arc<ManifestDiskCache>) -> Self {
self.manifest_disk_cache = Some(cache);
self
}
pub fn with_memory_budget(mut self, budget_bytes: u64) -> Self {
self.memory_budget_bytes = Some(budget_bytes);
self
}
pub fn with_cache_prepopulation(mut self, enabled: bool) -> Self {
self.prepopulate_cache_on_commit = enabled;
self
}
pub fn with_partition_strategy(mut self, strategy: PartitionStrategy) -> Self {
self.partition_strategy = Some(strategy);
self
}
pub fn with_target_superfiles_per_part(mut self, n: u64) -> Self {
self.target_superfiles_per_part = n;
self
}
pub fn with_part_size_threshold_bytes(mut self, n: u64) -> Self {
self.part_size_threshold_bytes = n;
self
}
pub fn with_eager_load_threshold(mut self, n: u32) -> Self {
self.eager_load_threshold_parts = n;
self
}
pub fn with_max_commit_retries(mut self, n: u32) -> Self {
self.max_commit_retries = n;
self
}
pub fn with_commit_threshold_size_mb(mut self, mb: u64) -> Self {
self.commit_threshold_size_mb = mb;
self
}
pub fn with_put_multipart_threshold_bytes(mut self, n: u64) -> Self {
self.put_multipart_threshold_bytes = n;
self
}
pub fn with_verify_crc_on_open(mut self, v: bool) -> Self {
self.verify_crc_on_open = v;
self
}
pub(crate) fn superfile_open_options(&self) -> OpenOptions {
OpenOptions {
verify_crc: self.verify_crc_on_open,
}
}
pub fn apply_config(mut self, cfg: &Config) -> Result<Self, BuildError> {
let reader_n = cfg
.supertable
.reader_threads
.resolve_or_default(default_reader_thread_count());
let writer_n = cfg
.supertable
.writer_threads
.resolve_or_default(default_writer_thread_count());
self.reader_pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(reader_n)
.thread_name(|i| format!("supertable-reader-{i}"))
.build()
.map_err(|e| BuildError::ThreadPoolCreation(e.to_string()))?,
);
self.writer_pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(writer_n)
.thread_name(|i| format!("supertable-writer-{i}"))
.build()
.map_err(|e| BuildError::ThreadPoolCreation(e.to_string()))?,
);
self.commit_threshold_size_mb = cfg.supertable.commit_threshold_size_mb;
self.verify_crc_on_open = cfg.supertable.verify_crc_on_open;
if cfg.supertable.id_column != self.id_column {
if self
.schema
.fields()
.iter()
.any(|f| f.name() == &cfg.supertable.id_column)
{
return Err(BuildError::IdColumnReserved(
cfg.supertable.id_column.clone(),
));
}
self.id_column = cfg.supertable.id_column.clone();
}
self.apply_storage_config(cfg)?;
Ok(self)
}
fn apply_storage_config(&mut self, cfg: &Config) -> Result<(), BuildError> {
let storage: Option<Arc<dyn StorageProvider>> = match cfg.storage.backend {
StorageBackend::None => None,
StorageBackend::LocalFs => {
let root = cfg.storage.local_root.as_ref().ok_or_else(|| {
BuildError::Store("storage.backend=local_fs requires storage.local_root".into())
})?;
Some(Arc::new(LocalFsStorageProvider::new(root)?) as Arc<dyn StorageProvider>)
}
StorageBackend::S3 => {
let bucket = cfg.storage.bucket.as_ref().ok_or_else(|| {
BuildError::Store("storage.backend=s3 requires storage.bucket".into())
})?;
Some(Arc::new(S3StorageProvider::new_with_prefix(
bucket,
&cfg.storage.prefix,
)?) as Arc<dyn StorageProvider>)
}
StorageBackend::Azure => {
let container = cfg.storage.bucket.as_ref().ok_or_else(|| {
BuildError::Store("storage.backend=azure requires storage.bucket".into())
})?;
Some(Arc::new(AzureStorageProvider::new_with_prefix(
container,
&cfg.storage.prefix,
)?) as Arc<dyn StorageProvider>)
}
};
let Some(storage) = storage else {
return Ok(());
};
if let Some(cache_root) = cfg.storage.disk_cache_root.as_ref() {
let cold_fetch_mode = match cfg.storage.cold_fetch_mode {
StorageColdFetchMode::HybridWithPrefetch => ColdFetchMode::HybridWithPrefetch,
StorageColdFetchMode::RangeOnly => ColdFetchMode::RangeOnly,
StorageColdFetchMode::LazyForegroundWithBackgroundFill => {
ColdFetchMode::LazyForegroundWithBackgroundFill
}
};
let disk_cfg = DiskCacheConfig {
cache_root: cache_root.clone(),
disk_budget_bytes: cfg.storage.disk_budget_bytes,
cold_fetch_mode,
cold_fetch_streams: cfg.storage.cold_fetch_streams.max(1),
cold_fetch_chunk_bytes: cfg.storage.cold_fetch_chunk_bytes.max(1),
prefetch_concurrency: cfg.storage.prefetch_concurrency.max(1),
mmap_cold_threshold_secs: cfg.storage.mmap_cold_threshold_secs,
mmap_sweep_interval_secs: cfg.storage.mmap_sweep_interval_secs,
eviction: Box::new(LruPolicy::new()),
verify_crc_on_open: cfg.supertable.verify_crc_on_open,
};
let cache = DiskCacheStore::new_unpinned(Arc::clone(&storage), disk_cfg)
.map_err(|e| BuildError::Store(format!("disk cache construction: {e}")))?;
self.disk_cache = Some(cache);
let manifest_cache_root = cache_root.join(MANIFEST_CACHE_SUBDIR);
let manifest_cache =
ManifestDiskCache::new(manifest_cache_root, cfg.storage.manifest_disk_budget_bytes)
.map_err(|e| {
BuildError::Store(format!("manifest disk cache construction: {e}"))
})?;
self.manifest_disk_cache = Some(manifest_cache);
}
self.storage = Some(storage);
Ok(())
}
pub fn builder_options(&self) -> BuilderOptions {
BuilderOptions::new(
self.scalar_schema(),
self.id_column.clone(),
self.fts_columns.clone(),
self.vector_columns.clone(),
self.tokenizer.clone(),
)
}
pub fn scalar_schema(&self) -> Arc<Schema> {
let vector_names: HashSet<&str> = self
.vector_columns
.iter()
.map(|vc| vc.column.as_str())
.collect();
let mut kept: Vec<Arc<Field>> = Vec::with_capacity(self.schema.fields().len() + 1);
kept.push(Arc::new(Field::new(
&self.id_column,
DataType::Decimal128(DECIMAL128_PRECISION, DECIMAL128_SCALE),
false,
)));
kept.extend(
self.schema
.fields()
.iter()
.filter(|f| !vector_names.contains(f.name().as_str()))
.cloned(),
);
Arc::new(Schema::new(kept))
}
}
impl fmt::Debug for SupertableOptions {
    /// Summarizes the options for diagnostics.
    ///
    /// The schema and column specs are reported as counts. Tokenizer, storage and caches
    /// are reported as presence flags. The thread pools and reader cache are omitted;
    /// `finish_non_exhaustive` appends `..` to make that elision explicit.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SupertableOptions")
            .field("schema_fields", &self.schema.fields().len())
            .field("id_column", &self.id_column)
            .field("n_fts_columns", &self.fts_columns.len())
            .field("n_vector_columns", &self.vector_columns.len())
            .field("has_tokenizer", &self.tokenizer.is_some())
            .field("has_storage", &self.storage.is_some())
            .field("has_disk_cache", &self.disk_cache.is_some())
            .field("has_manifest_disk_cache", &self.manifest_disk_cache.is_some())
            .field("memory_budget_bytes", &self.memory_budget_bytes)
            .field("prepopulate_cache_on_commit", &self.prepopulate_cache_on_commit)
            .field("partition_strategy", &self.partition_strategy)
            .field("target_superfiles_per_part", &self.target_superfiles_per_part)
            .field("part_size_threshold_bytes", &self.part_size_threshold_bytes)
            .field("eager_load_threshold_parts", &self.eager_load_threshold_parts)
            .field("max_commit_retries", &self.max_commit_retries)
            .field("commit_threshold_size_mb", &self.commit_threshold_size_mb)
            .field(
                "put_multipart_threshold_bytes",
                &self.put_multipart_threshold_bytes,
            )
            .field("verify_crc_on_open", &self.verify_crc_on_open)
            .field("read_consistency", &self.read_consistency)
            .finish_non_exhaustive()
    }
}
/// Rejects user column names that would clash with internal naming.
///
/// # Errors
/// - `ReservedSeparatorInColumnName` if `name` contains `RESERVED_SEPARATOR`. This check
///   runs first.
/// - `ReservedPrefixInColumnName` if `name` starts with `RESERVED_PREFIX`.
fn check_user_column_name(name: &str) -> Result<(), BuildError> {
    if name.contains(RESERVED_SEPARATOR) {
        Err(BuildError::ReservedSeparatorInColumnName(name.to_owned()))
    } else if name.starts_with(RESERVED_PREFIX) {
        Err(BuildError::ReservedPrefixInColumnName(name.to_owned()))
    } else {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    //! Tests for `SupertableOptions`: constructor validation, schema derivation, builder
    //! setters, defaults, thread-pool naming, and `apply_config` (threads, id column,
    //! storage backends).

    use std::{env, fs, path::PathBuf, sync::Arc};

    use arrow_schema::{DataType, Field};
    use figment::{
        Figment,
        providers::{Format, Yaml},
    };
    use uuid::Uuid;

    use super::*;
    use crate::{
        superfile::vector::{distance::Metric, rerank_codec::RerankCodec},
        test_helpers::default_tokenizer as tok,
    };

    /// Arrow type for a vector column: `FixedSizeList<Float32>` of length `dim`.
    fn fixed_list_f32(dim: usize) -> DataType {
        DataType::FixedSizeList(
            Arc::new(Field::new("item", DataType::Float32, true)),
            i32::try_from(dim).expect("test dim fits in i32"),
        )
    }

    /// Schema with a `LargeUtf8` `title` column and a `dim`-wide `emb` vector column.
    fn schema_with_vector(dim: usize) -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("title", DataType::LargeUtf8, false),
            Field::new("emb", fixed_list_f32(dim), false),
        ]))
    }

    fn vc(name: &str, dim: usize) -> VectorConfig {
        VectorConfig {
            column: name.into(),
            dim,
            n_cent: 4,
            rot_seed: 0,
            metric: Metric::Cosine,
            rerank_codec: RerankCodec::Fp32,
        }
    }

    fn fc(name: &str) -> FtsConfig {
        FtsConfig {
            column: name.into(),
        }
    }

    /// Options over a single scalar `category` column, with no FTS or vector columns.
    fn plain_opts() -> SupertableOptions {
        let s = Arc::new(Schema::new(vec![Field::new(
            "category",
            DataType::Utf8,
            false,
        )]));
        SupertableOptions::new(s, vec![], vec![], None).expect("valid options")
    }

    /// Parses `yaml` into a `Config`.
    fn cfg_from_yaml(yaml: &str) -> Config {
        Config::from_figment(Figment::new().merge(Yaml::string(yaml))).expect("parse config")
    }

    /// Name of a worker thread in `pool`, observed from inside the pool.
    fn worker_thread_name(pool: &ThreadPool) -> Option<String> {
        pool.install(|| std::thread::current().name().map(str::to_owned))
    }

    /// Unique temp directory that is removed on drop, so a failing assertion does not
    /// leak it.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        fn new(tag: &str) -> Self {
            let path = env::temp_dir().join(format!("{tag}-{}", Uuid::new_v4()));
            fs::create_dir_all(&path).expect("mkdir");
            Self(path)
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn valid_options_with_fts_and_vector_succeeds() {
        let s = schema_with_vector(16);
        let opts = SupertableOptions::new(s, vec![fc("title")], vec![vc("emb", 16)], Some(tok()))
            .expect("valid options should succeed");
        assert_eq!(opts.id_column, "_id");
        assert_eq!(opts.fts_columns.len(), 1);
        assert_eq!(opts.vector_columns.len(), 1);
    }

    #[test]
    fn schema_that_contains_id_column_is_rejected() {
        let s = Arc::new(Schema::new(vec![
            Field::new("_id", DataType::UInt64, false),
            Field::new("emb", fixed_list_f32(16), false),
        ]));
        let err = SupertableOptions::new(s, vec![], vec![vc("emb", 16)], None)
            .expect_err("expected error");
        assert!(matches!(err, BuildError::IdColumnReserved(c) if c == "_id"));
    }

    #[test]
    fn fts_column_missing_from_schema_rejected() {
        let s = schema_with_vector(16);
        let err = SupertableOptions::new(s, vec![fc("absent")], vec![], Some(tok()))
            .expect_err("expected error");
        assert!(matches!(err, BuildError::FtsColumnMissing { column } if column == "absent"));
    }

    #[test]
    fn fts_column_wrong_type_rejected() {
        let s = Arc::new(Schema::new(vec![Field::new("body", DataType::Utf8, false)]));
        let err = SupertableOptions::new(s, vec![fc("body")], vec![], Some(tok()))
            .expect_err("expected error");
        assert!(
            matches!(err, BuildError::FtsColumnMustBeLargeUtf8 { column, .. } if column == "body")
        );
    }

    #[test]
    fn vector_column_missing_from_schema_rejected() {
        let s = Arc::new(Schema::new(vec![Field::new(
            "category",
            DataType::Utf8,
            false,
        )]));
        let err = SupertableOptions::new(s, vec![], vec![vc("emb", 16)], None)
            .expect_err("expected error");
        assert!(matches!(err, BuildError::VectorColumnMissing { column } if column == "emb"));
    }

    #[test]
    fn vector_column_not_fixed_size_list_rejected() {
        let s = Arc::new(Schema::new(vec![Field::new(
            "emb",
            DataType::Float32,
            false,
        )]));
        let err = SupertableOptions::new(s, vec![], vec![vc("emb", 16)], None)
            .expect_err("expected error");
        assert!(matches!(
            err,
            BuildError::VectorColumnNotFixedSizeList { column, .. } if column == "emb"
        ));
    }

    #[test]
    fn vector_column_wrong_inner_type_rejected() {
        let s = Arc::new(Schema::new(vec![Field::new(
            "emb",
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float64, true)), 16),
            false,
        )]));
        let err = SupertableOptions::new(s, vec![], vec![vc("emb", 16)], None)
            .expect_err("expected error");
        assert!(matches!(
            err,
            BuildError::VectorColumnNotFixedSizeList { column, .. } if column == "emb"
        ));
    }

    #[test]
    fn vector_column_dim_mismatch_rejected() {
        let s = Arc::new(Schema::new(vec![Field::new(
            "emb",
            fixed_list_f32(8),
            false,
        )]));
        let err = SupertableOptions::new(s, vec![], vec![vc("emb", 16)], None)
            .expect_err("expected error");
        assert!(matches!(
            err,
            BuildError::VectorColumnDimMismatch { expected: 16, actual: 8, column } if column == "emb"
        ));
    }

    #[test]
    fn vector_dim_below_min_rejected() {
        let s = Arc::new(Schema::new(vec![Field::new(
            "emb",
            fixed_list_f32(8),
            false,
        )]));
        let err = SupertableOptions::new(s, vec![], vec![vc("emb", 8)], None)
            .expect_err("expected error");
        assert!(matches!(
            err,
            BuildError::VectorDimOutOfRange { column, dim: 8 } if column == "emb"
        ));
    }

    #[test]
    fn vector_dim_above_max_rejected() {
        let s = Arc::new(Schema::new(vec![Field::new(
            "emb",
            fixed_list_f32(8192),
            false,
        )]));
        let err = SupertableOptions::new(s, vec![], vec![vc("emb", 8192)], None)
            .expect_err("expected error");
        assert!(matches!(
            err,
            BuildError::VectorDimOutOfRange { column, dim: 8192 } if column == "emb"
        ));
    }

    // A name cannot appear in both the FTS and vector lists: validation requires
    // `LargeUtf8` for one and `FixedSizeList<Float32>` for the other, so the type checks
    // reject it first. The duplicate check is therefore exercised within each list.
    #[test]
    fn duplicate_vector_logical_name_rejected() {
        let s = Arc::new(Schema::new(vec![
            Field::new("title", DataType::LargeUtf8, false),
            Field::new("emb", fixed_list_f32(16), false),
        ]));
        let err = SupertableOptions::new(s, vec![], vec![vc("emb", 16), vc("emb", 16)], None)
            .expect_err("expected error");
        assert!(matches!(err, BuildError::DuplicateLogicalName(n) if n == "emb"));
    }

    #[test]
    fn duplicate_fts_logical_name_rejected() {
        let s = schema_with_vector(16);
        let err = SupertableOptions::new(s, vec![fc("title"), fc("title")], vec![], Some(tok()))
            .expect_err("expected error");
        assert!(matches!(err, BuildError::DuplicateLogicalName(n) if n == "title"));
    }

    #[test]
    fn reserved_separator_in_fts_column_name_rejected() {
        let s = Arc::new(Schema::new(vec![Field::new(
            "ti\u{1F}tle",
            DataType::LargeUtf8,
            false,
        )]));
        let err = SupertableOptions::new(s, vec![fc("ti\u{1F}tle")], vec![], Some(tok()))
            .expect_err("expected error");
        assert!(matches!(err, BuildError::ReservedSeparatorInColumnName(_)));
    }

    #[test]
    fn reserved_prefix_in_vector_column_name_rejected() {
        let s = Arc::new(Schema::new(vec![Field::new(
            "inf.emb",
            fixed_list_f32(16),
            false,
        )]));
        let err = SupertableOptions::new(s, vec![], vec![vc("inf.emb", 16)], None)
            .expect_err("expected error");
        assert!(matches!(err, BuildError::ReservedPrefixInColumnName(_)));
    }

    #[test]
    fn fts_columns_without_tokenizer_rejected() {
        let s = Arc::new(Schema::new(vec![Field::new(
            "title",
            DataType::LargeUtf8,
            false,
        )]));
        let err =
            SupertableOptions::new(s, vec![fc("title")], vec![], None).expect_err("expected error");
        assert!(matches!(err, BuildError::MissingTokenizer));
    }

    #[test]
    fn empty_fts_and_vector_succeeds_without_tokenizer() {
        let s = Arc::new(Schema::new(vec![Field::new(
            "category",
            DataType::Utf8,
            false,
        )]));
        SupertableOptions::new(s, vec![], vec![], None).expect("empty fts + vector should succeed");
    }

    #[test]
    fn scalar_schema_drops_vector_columns_and_prepends_id() {
        let s = schema_with_vector(16);
        let opts = SupertableOptions::new(s, vec![fc("title")], vec![vc("emb", 16)], Some(tok()))
            .expect("valid options");
        let scalar = opts.scalar_schema();
        let names: Vec<_> = scalar.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(names, vec!["_id", "title"]);
        let id_field = scalar.field(0);
        assert_eq!(
            id_field.data_type(),
            &DataType::Decimal128(DECIMAL128_PRECISION, DECIMAL128_SCALE)
        );
    }

    #[test]
    fn scalar_schema_no_vector_columns_still_prepends_id() {
        let s = Arc::new(Schema::new(vec![Field::new(
            "title",
            DataType::LargeUtf8,
            false,
        )]));
        let opts = SupertableOptions::new(s, vec![fc("title")], vec![], Some(tok()))
            .expect("valid options");
        let scalar = opts.scalar_schema();
        let names: Vec<_> = scalar.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(names, vec!["_id", "title"]);
    }

    #[test]
    fn effective_schema_prepends_id_keeps_vector_columns() {
        let s = schema_with_vector(16);
        let opts = SupertableOptions::new(s, vec![fc("title")], vec![vc("emb", 16)], Some(tok()))
            .expect("valid options");
        let eff = opts.effective_schema();
        let names: Vec<_> = eff.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(names, vec!["_id", "title", "emb"]);
    }

    #[test]
    fn user_schema_returns_input_schema_unchanged() {
        let s = schema_with_vector(16);
        let opts = SupertableOptions::new(
            Arc::clone(&s),
            vec![fc("title")],
            vec![vc("emb", 16)],
            Some(tok()),
        )
        .expect("valid options");
        let us = opts.user_schema();
        assert_eq!(us.fields().len(), s.fields().len());
        for (a, b) in us.fields().iter().zip(s.fields().iter()) {
            assert_eq!(a.name(), b.name());
        }
    }

    #[test]
    fn with_id_column_overrides_default() {
        let s = schema_with_vector(16);
        let opts = SupertableOptions::new(s, vec![fc("title")], vec![vc("emb", 16)], Some(tok()))
            .expect("valid options")
            .with_id_column("row_id")
            .expect("override accepted");
        assert_eq!(opts.id_column, "row_id");
        let eff = opts.effective_schema();
        assert_eq!(eff.field(0).name(), "row_id");
    }

    #[test]
    fn with_id_column_rejects_name_that_collides_with_user_schema() {
        let s = schema_with_vector(16);
        let opts = SupertableOptions::new(s, vec![fc("title")], vec![vc("emb", 16)], Some(tok()))
            .expect("valid options");
        let err = opts.with_id_column("title").expect_err("collision");
        assert!(matches!(err, BuildError::IdColumnReserved(c) if c == "title"));
    }

    #[test]
    fn apply_config_sets_writer_pool_size_to_fixed_value() {
        let cfg = cfg_from_yaml(
            "supertable:\n  reader_threads: 3\n  writer_threads: 5\n  commit_threshold_size_mb: 7\n",
        );
        let s = schema_with_vector(16);
        let opts = SupertableOptions::new(s, vec![fc("title")], vec![vc("emb", 16)], Some(tok()))
            .expect("valid options")
            .apply_config(&cfg)
            .expect("apply_config");
        assert_eq!(opts.commit_threshold_size_mb, 7);
        assert_eq!(opts.reader_pool.current_num_threads(), 3);
        assert_eq!(opts.writer_pool.current_num_threads(), 5);
    }

    #[test]
    fn apply_config_auto_resolves_to_num_cpus_defaults() {
        let cfg = Config::defaults().expect("embedded default");
        let s = schema_with_vector(16);
        let opts = SupertableOptions::new(s, vec![fc("title")], vec![vc("emb", 16)], Some(tok()))
            .expect("valid options")
            .apply_config(&cfg)
            .expect("apply_config");
        let reader_default = num_cpus::get().max(1);
        let writer_default = num_cpus::get().div_ceil(2).max(1);
        assert_eq!(opts.reader_pool.current_num_threads(), reader_default);
        assert_eq!(opts.writer_pool.current_num_threads(), writer_default);
        assert_eq!(opts.commit_threshold_size_mb, 1024);
        assert_eq!(opts.id_column, "_id");
    }

    #[test]
    fn pools_use_role_specific_thread_names() {
        let check = |opts: &SupertableOptions| {
            let reader = worker_thread_name(&opts.reader_pool).expect("reader thread named");
            let writer = worker_thread_name(&opts.writer_pool).expect("writer thread named");
            assert!(reader.starts_with("supertable-reader-"), "{reader}");
            assert!(writer.starts_with("supertable-writer-"), "{writer}");
        };
        let opts = plain_opts();
        check(&opts);
        let opts = opts
            .apply_config(&Config::defaults().expect("defaults"))
            .expect("apply_config");
        check(&opts);
    }

    #[test]
    fn debug_format_doesnt_explode() {
        let s = schema_with_vector(16);
        let opts = SupertableOptions::new(s, vec![fc("title")], vec![vc("emb", 16)], Some(tok()))
            .expect("valid options");
        let s = format!("{:?}", opts);
        assert!(s.contains("SupertableOptions"));
        assert!(s.contains("_id"));
    }

    #[test]
    fn consistency_default_is_bounded_staleness_one_sec() {
        assert_eq!(
            Consistency::default(),
            Consistency::BoundedStaleness(Duration::from_secs(DEFAULT_READ_STALENESS_SECS))
        );
    }

    #[test]
    fn new_sets_documented_defaults() {
        let opts = plain_opts();
        assert!(opts.prepopulate_cache_on_commit);
        assert!(opts.verify_crc_on_open);
        assert!(opts.storage.is_none());
        assert!(opts.disk_cache.is_none());
        assert!(opts.manifest_disk_cache.is_none());
        assert!(opts.memory_budget_bytes.is_none());
        assert!(opts.partition_strategy.is_none());
        assert_eq!(
            opts.target_superfiles_per_part,
            DEFAULT_TARGET_SUPERFILES_PER_PART
        );
        assert_eq!(
            opts.part_size_threshold_bytes,
            DEFAULT_PART_SIZE_THRESHOLD_BYTES
        );
        assert_eq!(
            opts.eager_load_threshold_parts,
            DEFAULT_EAGER_LOAD_THRESHOLD_PARTS
        );
        assert_eq!(opts.max_commit_retries, DEFAULT_MAX_COMMIT_RETRIES);
        assert_eq!(
            opts.commit_threshold_size_mb,
            DEFAULT_COMMIT_THRESHOLD_SIZE_MB
        );
        assert_eq!(
            opts.put_multipart_threshold_bytes,
            DEFAULT_PUT_MULTIPART_THRESHOLD_BYTES
        );
        assert_eq!(opts.read_consistency, Consistency::default());
    }

    #[test]
    fn effective_partition_strategy_defaults_to_ingestion_time() {
        let opts = plain_opts();
        match opts.effective_partition_strategy() {
            PartitionStrategy::IngestionTime { granularity_secs } => {
                assert_eq!(granularity_secs, 86_400);
            }
            other => panic!("expected IngestionTime with 1-day granularity, got {other:?}"),
        }
    }

    #[test]
    fn effective_partition_strategy_returns_configured_strategy() {
        let strat = PartitionStrategy::Hash {
            column: "category".into(),
            n_buckets: 64,
        };
        let opts = plain_opts().with_partition_strategy(strat.clone());
        assert_eq!(opts.effective_partition_strategy(), strat);
    }

    #[test]
    fn scalar_threshold_builders_set_their_fields() {
        let opts = plain_opts()
            .with_target_superfiles_per_part(42)
            .with_part_size_threshold_bytes(4096)
            .with_eager_load_threshold(0)
            .with_max_commit_retries(99)
            .with_commit_threshold_size_mb(7)
            .with_put_multipart_threshold_bytes(1)
            .with_memory_budget(1 << 30)
            .with_cache_prepopulation(false)
            .with_verify_crc_on_open(false);
        assert_eq!(opts.target_superfiles_per_part, 42);
        assert_eq!(opts.part_size_threshold_bytes, 4096);
        assert_eq!(opts.eager_load_threshold_parts, 0);
        assert_eq!(opts.max_commit_retries, 99);
        assert_eq!(opts.commit_threshold_size_mb, 7);
        assert_eq!(opts.put_multipart_threshold_bytes, 1);
        assert_eq!(opts.memory_budget_bytes, Some(1 << 30));
        assert!(!opts.prepopulate_cache_on_commit);
        assert!(!opts.verify_crc_on_open);
    }

    #[test]
    fn with_read_consistency_overrides_default() {
        let opts = plain_opts().with_read_consistency(Consistency::Strong);
        assert_eq!(opts.read_consistency, Consistency::Strong);
        let opts = opts.with_read_consistency(Consistency::Snapshot);
        assert_eq!(opts.read_consistency, Consistency::Snapshot);
    }

    #[test]
    fn with_reader_and_writer_pool_override_pools() {
        let reader = Arc::new(
            ThreadPoolBuilder::new()
                .num_threads(2)
                .build()
                .expect("reader pool"),
        );
        let writer = Arc::new(
            ThreadPoolBuilder::new()
                .num_threads(3)
                .build()
                .expect("writer pool"),
        );
        let opts = plain_opts()
            .with_reader_pool(Arc::clone(&reader))
            .with_writer_pool(Arc::clone(&writer));
        assert_eq!(opts.reader_pool.current_num_threads(), 2);
        assert_eq!(opts.writer_pool.current_num_threads(), 3);
        assert!(Arc::ptr_eq(&opts.reader_pool, &reader));
        assert!(Arc::ptr_eq(&opts.writer_pool, &writer));
    }

    #[test]
    fn with_store_replaces_default_store() {
        let store: Arc<dyn SuperfileReaderCache> = Arc::new(InMemoryReaderCache::new());
        let opts = plain_opts().with_store(Arc::clone(&store));
        let opts_store: Arc<dyn SuperfileReaderCache> = Arc::clone(&opts.store);
        assert!(Arc::ptr_eq(&opts_store, &store));
    }

    #[test]
    fn with_storage_attaches_provider() {
        let dir = ScratchDir::new("infino-opts-test");
        let storage: Arc<dyn StorageProvider> =
            Arc::new(LocalFsStorageProvider::new(&dir.0).expect("provider"));
        let opts = plain_opts().with_storage(storage);
        assert!(opts.storage.is_some());
    }

    #[test]
    fn superfile_open_options_track_verify_crc_flag() {
        let opts = plain_opts();
        assert!(opts.superfile_open_options().verify_crc);
        let opts = opts.with_verify_crc_on_open(false);
        assert!(!opts.superfile_open_options().verify_crc);
    }

    #[test]
    fn builder_options_use_scalar_schema_and_id_column() {
        let s = schema_with_vector(16);
        let opts = SupertableOptions::new(s, vec![fc("title")], vec![vc("emb", 16)], Some(tok()))
            .expect("valid options");
        let bo = opts.builder_options();
        let names: Vec<_> = bo
            .schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();
        assert_eq!(names, vec!["_id", "title"]);
        assert_eq!(bo.id_column, "_id");
        assert_eq!(bo.fts_columns.len(), 1);
        assert_eq!(bo.vector_columns.len(), 1);
    }

    #[test]
    fn apply_config_overrides_id_column_when_no_collision() {
        let cfg = cfg_from_yaml("supertable:\n  id_column: row_pk\n");
        let opts = plain_opts().apply_config(&cfg).expect("apply_config");
        assert_eq!(opts.id_column, "row_pk");
    }

    #[test]
    fn apply_config_rejects_id_column_that_collides_with_schema() {
        let cfg = cfg_from_yaml("supertable:\n  id_column: category\n");
        let err = plain_opts().apply_config(&cfg).expect_err("collision");
        assert!(matches!(err, BuildError::IdColumnReserved(c) if c == "category"));
    }

    #[test]
    fn apply_config_with_none_backend_leaves_storage_unattached() {
        let cfg = Config::defaults().expect("defaults");
        let opts = plain_opts().apply_config(&cfg).expect("apply_config");
        assert!(opts.storage.is_none());
        assert!(opts.disk_cache.is_none());
    }

    #[test]
    fn with_disk_cache_attaches_cache() {
        use crate::supertable::reader_cache::{DiskCacheConfig, DiskCacheStore, LruPolicy};
        let dir = ScratchDir::new("infino-opts-dc");
        let storage: Arc<dyn StorageProvider> =
            Arc::new(LocalFsStorageProvider::new(&dir.0).expect("provider"));
        let cache = DiskCacheStore::new_unpinned(
            Arc::clone(&storage),
            DiskCacheConfig {
                cache_root: dir.0.join("cache"),
                mmap_cold_threshold_secs: 0,
                eviction: Box::new(LruPolicy::new()),
                ..Default::default()
            },
        )
        .expect("disk cache");
        let opts = plain_opts().with_storage(storage).with_disk_cache(cache);
        assert!(opts.disk_cache.is_some());
        assert!(opts.storage.is_some());
    }

    #[test]
    fn apply_config_attaches_local_fs_storage() {
        let dir = ScratchDir::new("infino-opts-localfs");
        let yaml = format!(
            "storage:\n  backend: local_fs\n  local_root: {}\n",
            dir.0.display()
        );
        let cfg = cfg_from_yaml(&yaml);
        let opts = plain_opts().apply_config(&cfg).expect("apply_config");
        assert!(opts.storage.is_some(), "local_fs backend attaches storage");
        assert!(
            opts.disk_cache.is_none(),
            "no disk_cache_root ⇒ no cache attached"
        );
    }

    #[test]
    fn apply_config_local_fs_with_disk_cache_root_attaches_both() {
        let dir = ScratchDir::new("infino-opts-dcroot");
        let cache_root = dir.0.join("cache");
        let yaml = format!(
            "storage:\n  backend: local_fs\n  local_root: {}\n  disk_cache_root: {}\n",
            dir.0.display(),
            cache_root.display()
        );
        let cfg = cfg_from_yaml(&yaml);
        let opts = plain_opts().apply_config(&cfg).expect("apply_config");
        assert!(opts.storage.is_some());
        assert!(
            opts.disk_cache.is_some(),
            "disk_cache_root ⇒ cache attached"
        );
    }

    #[test]
    fn apply_config_local_fs_without_root_is_rejected() {
        let cfg = cfg_from_yaml("storage:\n  backend: local_fs\n");
        let err = plain_opts()
            .apply_config(&cfg)
            .expect_err("missing local_root");
        assert!(matches!(err, BuildError::Store(_)), "{err:?}");
    }

    #[test]
    fn apply_config_attaches_s3_storage_from_bucket() {
        let cfg = cfg_from_yaml(
            "storage:\n  backend: s3\n  bucket: example-bucket\n  prefix: tbl/example\n",
        );
        let opts = plain_opts().apply_config(&cfg).expect("apply_config");
        assert!(opts.storage.is_some(), "s3 backend attaches storage");
        assert!(
            opts.disk_cache.is_none(),
            "no disk_cache_root ⇒ no cache attached"
        );
    }

    #[test]
    fn apply_config_s3_without_bucket_is_rejected() {
        let cfg = cfg_from_yaml("storage:\n  backend: s3\n");
        let err = plain_opts().apply_config(&cfg).expect_err("missing bucket");
        assert!(matches!(err, BuildError::Store(_)), "{err:?}");
    }

    #[test]
    fn apply_config_azure_without_bucket_is_rejected() {
        let cfg = cfg_from_yaml("storage:\n  backend: azure\n");
        let err = plain_opts()
            .apply_config(&cfg)
            .expect_err("missing container");
        assert!(matches!(err, BuildError::Store(_)), "{err:?}");
    }
}