#![deny(clippy::unwrap_used)]
use std::sync::Arc;
use infino::{
superfile::{builder::FtsConfig, fts::tokenize::Tokenizer},
supertable::{
Supertable,
manifest::list::PartitionStrategy,
storage::{LocalFsStorageProvider, StorageProvider},
},
test_helpers::{build_title_batch, default_supertable_options, default_tokenizer},
};
const COMMITS_PER_TEST: usize = 3;
const HASH_N_BUCKETS: u32 = 4;
const DAY_GRANULARITY_SECS: i64 = 86_400;
const RAYON_POOL_THREADS: usize = 1;
const PARTITION_KEY_BYTES: usize = 8;
use tempfile::TempDir;
#[test]
fn default_strategy_is_ingestion_time_with_one_day_granularity() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
for _i in 0..COMMITS_PER_TEST {
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["x"])).expect("append");
w.commit().expect("commit");
}
let r = st.reader();
let m = r.manifest();
let list_entries = m.get_all_list_entries();
assert_eq!(
list_entries.len(),
1,
"ingestion-time default within same day → one list entry; got {} entries",
list_entries.len()
);
assert_eq!(
list_entries[0].n_superfiles, 3,
"after 3 single-superfile commits the part should hold 3 superfiles"
);
assert_eq!(list_entries[0].partition_key.len(), 8);
}
#[test]
fn rewrite_path_produces_fresh_part_id_per_commit() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let mut part_ids = Vec::new();
for _i in 0..COMMITS_PER_TEST {
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["x"])).expect("append");
w.commit().expect("commit");
let m_id = {
let r = st.reader();
let m = r.manifest();
let list_entries = m.get_all_list_entries();
list_entries[0].part_id
};
part_ids.push(m_id);
}
assert_ne!(part_ids[0], part_ids[1], "rewrite must mint a new part_id");
assert_ne!(part_ids[1], part_ids[2]);
assert_ne!(part_ids[0], part_ids[2]);
}
#[test]
fn target_superfiles_per_partition_triggers_part_split() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let opts = default_supertable_options()
.with_storage(Arc::clone(&storage))
.with_target_superfiles_per_part(2);
let st = Supertable::create(opts).expect("create");
for _i in 0..COMMITS_PER_TEST {
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["x"])).expect("append");
w.commit().expect("commit");
}
let r = st.reader();
let m = r.manifest();
let list_entries = m.get_all_list_entries();
assert_eq!(
list_entries.len(),
2,
"after 3 commits with target=2, the partition should split into 2 entries; \
got {} entries",
list_entries.len()
);
assert_eq!(
list_entries[0].partition_key, list_entries[1].partition_key,
"both entries should share the same partition_key (same partition, split into 2 parts)"
);
let total_superfiles: u64 = list_entries.iter().map(|p| p.n_superfiles).sum();
assert_eq!(total_superfiles, 3);
}
#[test]
fn hash_strategy_with_multiple_buckets_errors_without_partition_hint() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let opts = default_supertable_options()
.with_storage(Arc::clone(&storage))
.with_partition_strategy(PartitionStrategy::Hash {
column: "doc_id".into(),
n_buckets: HASH_N_BUCKETS,
});
let st = Supertable::create(opts).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["alpha"])).expect("append");
let err = w.commit().expect_err("commit must fail without pre-shard");
let s = format!("{err}");
assert!(
s.contains("pre-sharded") || s.contains("partition_hint"),
"expected partition-assignment error, got: {s}"
);
}
#[test]
fn time_range_strategy_on_unsupported_column_type_errors_cleanly() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let opts = default_supertable_options()
.with_storage(Arc::clone(&storage))
.with_partition_strategy(PartitionStrategy::TimeRange {
column: "_id".into(),
granularity_secs: DAY_GRANULARITY_SECS,
});
let st = Supertable::create(opts).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["alpha"])).expect("append");
let err = w
.commit()
.expect_err("commit must fail on unsupported column type");
let s = format!("{err}");
assert!(
s.contains("unsupported type") || s.contains("expected Int64 or Timestamp"),
"expected unsupported-type TimeRange error; got: {s}"
);
}
#[test]
fn time_range_assigns_int64_superfiles_to_bucket_zero() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let schema = Arc::new(arrow_schema::Schema::new(vec![
arrow_schema::Field::new("doc_id", arrow_schema::DataType::UInt64, false),
arrow_schema::Field::new("ts_secs", arrow_schema::DataType::Int64, false),
arrow_schema::Field::new("title", arrow_schema::DataType::LargeUtf8, false),
]));
let tk: Arc<dyn Tokenizer> = default_tokenizer();
let pool = Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(RAYON_POOL_THREADS)
.build()
.expect("pool"),
);
let opts = infino::supertable::SupertableOptions::new(
schema.clone(),
vec![FtsConfig {
column: "title".into(),
}],
vec![],
Some(tk),
)
.expect("opts")
.with_writer_pool(pool)
.with_storage(Arc::clone(&storage))
.with_partition_strategy(PartitionStrategy::TimeRange {
column: "ts_secs".into(),
granularity_secs: DAY_GRANULARITY_SECS,
});
let st = Supertable::create(opts).expect("create");
let batch = arrow_array::RecordBatch::try_new(
schema,
vec![
Arc::new(arrow_array::UInt64Array::from(vec![0u64, 1])),
Arc::new(arrow_array::Int64Array::from(vec![10_i64, 20])),
Arc::new(arrow_array::LargeStringArray::from(vec!["a", "b"])),
],
)
.expect("batch");
{
let mut w = st.writer().expect("writer");
w.append(&batch).expect("append");
w.commit()
.expect("TimeRange commit must succeed for a single-bucket batch");
}
let r = st.reader();
let m = r.manifest();
let list_entries = m.get_all_list_entries();
assert_eq!(
list_entries.len(),
1,
"single-bucket commit produces one part"
);
assert_eq!(list_entries[0].partition_key.len(), PARTITION_KEY_BYTES);
let bucket = u64::from_le_bytes(
list_entries[0]
.partition_key
.as_slice()
.try_into()
.expect("8-byte le"),
);
assert_eq!(bucket, 0, "ts in [10, 20] @ granularity 86400 → bucket 0");
}
#[test]
fn time_range_superfile_spanning_two_buckets_errors() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let schema = Arc::new(arrow_schema::Schema::new(vec![
arrow_schema::Field::new("doc_id", arrow_schema::DataType::UInt64, false),
arrow_schema::Field::new("ts_secs", arrow_schema::DataType::Int64, false),
arrow_schema::Field::new("title", arrow_schema::DataType::LargeUtf8, false),
]));
let tk: Arc<dyn Tokenizer> = default_tokenizer();
let pool = Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(RAYON_POOL_THREADS)
.build()
.expect("pool"),
);
let opts = infino::supertable::SupertableOptions::new(
schema.clone(),
vec![FtsConfig {
column: "title".into(),
}],
vec![],
Some(tk),
)
.expect("opts")
.with_writer_pool(pool)
.with_storage(Arc::clone(&storage))
.with_partition_strategy(PartitionStrategy::TimeRange {
column: "ts_secs".into(),
granularity_secs: DAY_GRANULARITY_SECS,
});
let st = Supertable::create(opts).expect("create");
let batch = arrow_array::RecordBatch::try_new(
schema,
vec![
Arc::new(arrow_array::UInt64Array::from(vec![0u64, 1])),
Arc::new(arrow_array::Int64Array::from(vec![10_i64, 86_500])),
Arc::new(arrow_array::LargeStringArray::from(vec!["a", "b"])),
],
)
.expect("batch");
let mut w = st.writer().expect("writer");
w.append(&batch).expect("append");
let err = w.commit().expect_err("spanning two buckets must error");
let s = format!("{err}");
assert!(
s.contains("spans buckets"),
"expected SuperfileSpansPartition with spans-buckets detail; got: {s}"
);
}