use std::{
cmp,
collections::HashMap,
fmt, io,
marker::PhantomData,
mem,
sync::{Arc, atomic::Ordering},
time,
};
use arrow::ipc::writer::StreamWriter;
use arrow_array::{
Array, ArrayRef, Decimal128Array, FixedSizeListArray, Float32Array, RecordBatch,
};
use bytes::Bytes;
use chrono::Utc;
use datafusion::prelude::Expr;
use object_store::{PutPayload, UploadPart};
use rayon::prelude::*;
use tokio::{runtime::Handle, task::block_in_place, time::sleep};
use super::{
build::fanout_shards,
error::BuildError,
handle::{Supertable, SupertableInner},
manifest::{
FtsSummaryAgg, ManifestSnapshot, ScalarStatsAgg, SubsectionOffsets, SuperfileEntry,
SuperfileUri, VectorSummary, bloom::BloomBuilder,
},
mutations::{
CommitError, CommitResult, MAX_TARGETS_PER_MUTATION, MutationError, MutationStats,
PendingDelete, PendingUpdate,
},
options::{DECIMAL128_PRECISION, DECIMAL128_SCALE, SupertableOptions},
utils::vector_split::split_vectors,
wal::{
WalStore,
pipeline::{self, TombstonePhaseOutcome},
state_doc::{
IdSpan, OpKind, RowId, SCHEMA_VERSION, TombstoneEntry, TombstoneOutcome, WalId,
WalState, WalStateDoc,
},
},
};
#[cfg(test)]
use crate::superfile::ReadError;
use crate::{
InfinoError,
storage::{StorageError, StorageProvider},
superfile::{
SuperfileReader,
builder::SuperfileBuilder,
format::{
CRC_BYTES,
footer::read_kv_metadata,
fts::{HEADER_SIZE as FTS_HEADER_SIZE, U64_BYTES, hdr},
kv,
vec::{
CLUSTER_IDX_ENTRY_BYTES, DIR_ENTRY_SIZE, OUTER_HEADER_SIZE, SUB_HEADER_SIZE,
U32_BYTES, dir_entry, outer_hdr, sub_hdr,
},
},
},
supertable::{
CommitError as SupertableCommitError, ManifestLoadError,
error::ManifestError,
manifest::{
ClusterCentroids,
commit::get_current_manifest_etag,
part::{self as part_mod, PartId},
},
reader_cache::DiskCacheStore,
},
};
/// Exclusive write handle over a supertable.
///
/// Appended batches are buffered in memory and flushed either when the
/// configured size threshold is crossed or on `commit`. Update and delete
/// mutations are queued and only driven through the WAL pipeline on `commit`.
/// Dropping the writer clears the `writer_outstanding` flag on the shared state.
pub struct SupertableWriter {
/// Shared supertable state (options, manifest, id generator, writer flag).
inner: Arc<SupertableInner>,
/// Appended batches not yet built into superfiles.
buffer: Vec<BufferedBatch>,
/// Running size estimate of `buffer`, compared against the commit threshold.
buffer_bytes: usize,
/// Updates queued by `update`, executed in order by `commit`.
pending_updates: Vec<PendingUpdateEntry>,
/// Deletes queued by `delete`, executed by `commit` after all updates.
pending_deletes: Vec<PendingDeleteEntry>,
}
/// An update that has been validated and queued but not yet committed.
struct PendingUpdateEntry {
/// WAL id minted from the supertable's id generator when the update was queued.
wal_id: WalId,
/// Row ids the predicate scan matched; these are the rows to tombstone.
target_ids: Vec<i128>,
/// Random (v4) superfile id chosen at queue time and recorded in the WAL doc.
preallocated_superfile_id: uuid::Uuid,
/// Row-id ranges reserved from the id generator for the replacement rows.
minted_id_spans: Vec<IdSpan>,
/// Number of replacement rows (equal to the number of matched targets).
new_row_count: u32,
/// Hex-encoded BLAKE3 hash of `ipc_bytes`.
new_row_content_hash: String,
/// Replacement rows encoded as an Arrow IPC stream.
ipc_bytes: Bytes,
}
/// A delete that has been queued but not yet committed.
struct PendingDeleteEntry {
/// WAL id minted from the supertable's id generator when the delete was queued.
wal_id: WalId,
/// Row ids the predicate scan matched; these are the rows to tombstone.
target_ids: Vec<i128>,
}
/// Compact debug view: buffer occupancy plus the id of the currently loaded
/// manifest. Buffered data itself is never printed.
impl fmt::Debug for SupertableWriter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let snapshot = self.inner.manifest.load();
        let batch_count = self.buffer.len();
        let mut view = f.debug_struct("SupertableWriter");
        view.field("buffered_batches", &batch_count);
        view.field("buffered_bytes", &self.buffer_bytes);
        view.field("manifest_id", &snapshot.manifest_id);
        view.finish()
    }
}
/// One appended batch, split into its scalar part and its flattened vector columns.
struct BufferedBatch {
/// Scalar columns with the generated id column placed first.
scalar: RecordBatch,
/// One flat `f32` value array per configured vector column, in option order;
/// row `r` of a column with dimension `dim` occupies `[r * dim, (r + 1) * dim)`.
vectors: Vec<Arc<Float32Array>>,
}
/// Distributes the buffered rows over at most `n_shards` shards of near-equal size.
///
/// Rows keep their original order. The first `total % n_shards` shards receive
/// one extra row. A batch that straddles a shard boundary is sliced, and each
/// vector column is sliced in lock-step using its dimension from `vector_dims`
/// (indexed by vector-column position). Shards that end up empty are dropped,
/// and an input with no rows yields an empty result.
fn split_buffer_into_row_shards(
    buffer: Vec<BufferedBatch>,
    n_shards: usize,
    vector_dims: &[usize],
) -> Vec<Vec<BufferedBatch>> {
    debug_assert!(n_shards > 0);
    let row_total: usize = buffer.iter().map(|b| b.scalar.num_rows()).sum();
    if row_total == 0 {
        return Vec::new();
    }

    let (quota, extra) = (row_total / n_shards, row_total % n_shards);
    // Shards with an index below `extra` carry one additional row.
    let quota_for = |shard: usize| quota + usize::from(shard < extra);

    let mut out: Vec<Vec<BufferedBatch>> = Vec::with_capacity(n_shards);
    out.resize_with(n_shards, Vec::new);

    let mut current = 0usize;
    let mut room = quota_for(current);
    for batch in buffer {
        let batch_rows = batch.scalar.num_rows();
        let mut consumed = 0usize;
        // An empty batch never enters this loop and is therefore skipped.
        while consumed < batch_rows {
            // Move on to the next shard that still has capacity.
            while room == 0 && current + 1 < n_shards {
                current += 1;
                room = quota_for(current);
            }
            let chunk = room.min(batch_rows - consumed);

            let mut vectors: Vec<Arc<Float32Array>> = Vec::with_capacity(batch.vectors.len());
            for (col, values) in batch.vectors.iter().enumerate() {
                let width = vector_dims[col];
                vectors.push(Arc::new(values.slice(consumed * width, chunk * width)));
            }
            let scalar = batch.scalar.slice(consumed, chunk);
            out[current].push(BufferedBatch { scalar, vectors });

            consumed += chunk;
            room -= chunk;
        }
    }

    out.into_iter().filter(|shard| !shard.is_empty()).collect()
}
/// Returns the first mutation outcome of a commit, or a backend error when the
/// commit reported no outcome at all.
fn single_outcome(res: CommitResult) -> Result<MutationStats, InfinoError> {
    match res.outcomes.into_iter().next() {
        Some(stats) => Ok(stats),
        None => Err(InfinoError::Backend(
            "commit produced no mutation outcome".to_string(),
        )),
    }
}
impl Supertable {
    /// Appends `batch` and commits it immediately using a short-lived writer.
    pub fn append(&self, batch: &RecordBatch) -> Result<(), InfinoError> {
        let mut writer = self.writer()?;
        writer.append(batch)?;
        let _committed = writer.commit()?;
        Ok(())
    }

    /// Replaces the rows matching `predicate` with `new_rows` and commits,
    /// returning the stats of that single mutation.
    pub fn update(
        &self,
        predicate: Expr,
        new_rows: &RecordBatch,
    ) -> Result<MutationStats, InfinoError> {
        let mut writer = self.writer()?;
        writer.update(predicate, new_rows.clone())?;
        let committed = writer.commit()?;
        single_outcome(committed)
    }

    /// Deletes the rows matching `predicate` and commits, returning the stats
    /// of that single mutation.
    pub fn delete(&self, predicate: Expr) -> Result<MutationStats, InfinoError> {
        let mut writer = self.writer()?;
        writer.delete(predicate)?;
        let committed = writer.commit()?;
        single_outcome(committed)
    }

    // Claims the single writer slot by flipping `writer_outstanding` from
    // false to true; fails with `SupertableInUse` when it is already set.
    test_visible! {
        fn writer(&self) -> Result<SupertableWriter, BuildError> {
            self.inner()
                .writer_outstanding
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                .map(|_| SupertableWriter {
                    inner: Arc::clone(self.inner()),
                    buffer: Vec::new(),
                    buffer_bytes: 0,
                    pending_updates: Vec::new(),
                    pending_deletes: Vec::new(),
                })
                .map_err(|_| BuildError::SupertableInUse)
        }
    }
}
impl SupertableWriter {
/// Number of appended batches currently held in the in-memory buffer.
pub fn buffered_batches(&self) -> usize {
self.buffer.len()
}
/// Running size estimate, in bytes, of the batches currently buffered.
pub fn buffered_bytes(&self) -> usize {
self.buffer_bytes
}
/// Buffers `batch` for a later flush.
///
/// The batch is split into scalar and vector parts, every row is assigned a
/// fresh id from the supertable's id generator (stored as the first scalar
/// column), and the result is added to the in-memory buffer. When the buffer
/// estimate reaches the configured commit threshold (a threshold of 0
/// disables this) the buffered appends are flushed right away.
///
/// # Errors
/// Returns `BuildError::BatchSchemaMismatch` when a configured vector column
/// is missing or is not a fixed-size list of `f32`, or when the scalar
/// columns do not fit the scalar schema. Errors from splitting or flushing
/// are propagated.
pub fn append(&mut self, batch: &RecordBatch) -> Result<(), BuildError> {
    let options = &self.inner.options;
    let (scalar_no_id, _vector_slices) = split_vectors(batch, options)?;

    // Pull the flat f32 value buffer out of each configured vector column.
    let batch_schema = batch.schema();
    let vectors = options
        .vector_columns
        .iter()
        .map(|vc| -> Result<Arc<Float32Array>, BuildError> {
            let position = batch_schema
                .index_of(&vc.column)
                .map_err(|_| BuildError::BatchSchemaMismatch)?;
            let list = batch
                .column(position)
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .ok_or(BuildError::BatchSchemaMismatch)?;
            let flat = list
                .values()
                .as_any()
                .downcast_ref::<Float32Array>()
                .ok_or(BuildError::BatchSchemaMismatch)?;
            Ok(Arc::new(flat.clone()))
        })
        .collect::<Result<Vec<_>, _>>()?;

    // Mint one id per row while holding the generator lock once.
    let row_count = scalar_no_id.num_rows();
    let mut ids: Vec<i128> = Vec::with_capacity(row_count);
    {
        let generator = self
            .inner
            .id_generator
            .lock()
            .expect("id_generator mutex poisoned");
        ids.extend((0..row_count).map(|_| generator.next_id()));
    }
    let id_array = Decimal128Array::from(ids)
        .with_precision_and_scale(DECIMAL128_PRECISION, DECIMAL128_SCALE)
        .expect(
            "invariant: precision 38 + scale 0 always valid \
             for any i128 payload",
        );

    // The id column goes first, followed by the caller's scalar columns.
    let columns: Vec<ArrayRef> = std::iter::once(Arc::new(id_array) as ArrayRef)
        .chain(scalar_no_id.columns().iter().cloned())
        .collect();
    let scalar = RecordBatch::try_new(options.scalar_schema(), columns)
        .map_err(|_| BuildError::BatchSchemaMismatch)?;

    let vector_bytes: usize = vectors
        .iter()
        .map(|v| v.len() * mem::size_of::<f32>())
        .sum();
    let batch_bytes = scalar.get_array_memory_size() + vector_bytes;
    self.buffer.push(BufferedBatch { scalar, vectors });
    self.buffer_bytes += batch_bytes;

    let threshold_bytes = (options.commit_threshold_size_mb as usize)
        .saturating_mul(1024)
        .saturating_mul(1024);
    let should_flush = threshold_bytes > 0 && self.buffer_bytes >= threshold_bytes;
    if should_flush {
        self.commit_appends_internal()?;
    }
    Ok(())
}
/// Queues a delete of every row matching `predicate`.
///
/// The predicate is evaluated immediately against the supertable's reader;
/// the matched ids are stored together with a freshly minted WAL id and are
/// only tombstoned when `commit` runs.
///
/// # Errors
/// `NoStorageAttached` when no storage is configured, `PredicateEval` when
/// the scan fails, and `MatchCountExceedsCap` when more than
/// `MAX_TARGETS_PER_MUTATION` rows match.
pub fn delete(&mut self, predicate: Expr) -> Result<PendingDelete, MutationError> {
    if self.inner.options.storage.is_none() {
        return Err(MutationError::NoStorageAttached);
    }

    let table = Supertable::from_inner(Arc::clone(&self.inner));
    let target_ids = table
        .reader()
        .scan_ids_matching(predicate)
        .map_err(MutationError::PredicateEval)?;

    let matched = target_ids.len();
    if matched > MAX_TARGETS_PER_MUTATION {
        return Err(MutationError::MatchCountExceedsCap {
            matched,
            cap: MAX_TARGETS_PER_MUTATION,
        });
    }

    let wal_id = WalId(
        self.inner
            .id_generator
            .lock()
            .expect("id_generator mutex poisoned")
            .next_id(),
    );
    self.pending_deletes
        .push(PendingDeleteEntry { wal_id, target_ids });
    Ok(PendingDelete { matched })
}
/// Queues a replacement of the rows matching `predicate` with `new_rows`.
///
/// `new_rows` must use exactly the supertable's schema and contain exactly
/// one row per matched target. When nothing matches (and `new_rows` is
/// empty) nothing is queued. Otherwise id spans for the new rows, a WAL id
/// and a superfile id are allocated, the rows are encoded as Arrow IPC and
/// hashed, and the entry is queued until `commit`.
///
/// # Errors
/// `NoStorageAttached`, `SchemaMismatch`, `PredicateEval`,
/// `MatchCountExceedsCap`, `CardinalityMismatch`, or `Storage` when the IPC
/// encoding fails.
pub fn update(
    &mut self,
    predicate: Expr,
    new_rows: RecordBatch,
) -> Result<PendingUpdate, MutationError> {
    if self.inner.options.storage.is_none() {
        return Err(MutationError::NoStorageAttached);
    }
    if new_rows.schema().as_ref() != self.inner.options.schema.as_ref() {
        let detail = format!(
            "expected {:?}, got {:?}",
            self.inner.options.schema.fields(),
            new_rows.schema().fields()
        );
        return Err(MutationError::SchemaMismatch(detail));
    }

    let table = Supertable::from_inner(Arc::clone(&self.inner));
    let target_ids = table
        .reader()
        .scan_ids_matching(predicate)
        .map_err(MutationError::PredicateEval)?;

    let matched = target_ids.len();
    if matched > MAX_TARGETS_PER_MUTATION {
        return Err(MutationError::MatchCountExceedsCap {
            matched,
            cap: MAX_TARGETS_PER_MUTATION,
        });
    }
    let replacement_rows = new_rows.num_rows();
    if matched != replacement_rows {
        return Err(MutationError::CardinalityMismatch {
            matched,
            new_rows: replacement_rows,
        });
    }
    if matched == 0 {
        return Ok(PendingUpdate { matched: 0 });
    }

    // Reserve row ids first, then the WAL id, all under one lock acquisition.
    let (wal_id, minted_id_spans, preallocated_superfile_id) = {
        let idgen = self.inner.id_generator.lock().expect("idgen mutex");
        let minted = idgen
            .reserve_range(matched as u32)
            .into_iter()
            .map(|(first, last)| IdSpan {
                first: RowId(first),
                last: RowId(last),
            })
            .collect::<Vec<_>>();
        let wal = WalId(idgen.next_id());
        (wal, minted, uuid::Uuid::new_v4())
    };

    let ipc_bytes = match encode_record_batch_ipc(&new_rows) {
        Ok(encoded) => encoded,
        Err(e) => {
            return Err(MutationError::Storage(StorageError::Permanent {
                uri: "ipc encode".into(),
                source: Box::new(io::Error::other(e)),
            }));
        }
    };
    let new_row_content_hash = blake3::hash(&ipc_bytes).to_hex().to_string();

    self.pending_updates.push(PendingUpdateEntry {
        wal_id,
        target_ids,
        preallocated_superfile_id,
        minted_id_spans,
        new_row_count: matched as u32,
        new_row_content_hash,
        ipc_bytes,
    });
    Ok(PendingUpdate { matched })
}
/// Flushes buffered appends, then drives every queued update followed by
/// every queued delete, one at a time and in queue order.
///
/// On success the returned `CommitResult` lists the WAL ids and stats of all
/// mutations, updates first.
///
/// # Errors
/// * `CommitError::AppendFlush` when flushing buffered appends fails; no
///   mutation has been attempted at that point.
/// * `CommitError::PartialCommit` when a mutation fails. The mutations that
///   already succeeded are reported in `committed_wal_ids`. The failing entry
///   itself is discarded, and the entries queued after it are put back on the
///   writer so a later `commit` can run them. When an update fails, the
///   queued deletes have not been taken yet and remain queued as well.
pub fn commit(&mut self) -> Result<CommitResult, CommitError> {
if !self.buffer.is_empty() {
self.commit_appends_internal()
.map_err(CommitError::AppendFlush)?;
}
// Captured before the queues are drained so `total` covers updates and deletes.
let total_mutations = self.pending_updates.len() + self.pending_deletes.len();
let mut committed_wal_ids: Vec<WalId> = Vec::with_capacity(total_mutations);
let mut outcomes: Vec<MutationStats> = Vec::with_capacity(total_mutations);
// Take ownership of the update queue; the writer's queue is empty from here on
// unless a failure re-queues the tail below.
let mut updates_to_run = mem::take(&mut self.pending_updates);
let mut update_cursor = 0usize;
while update_cursor < updates_to_run.len() {
let entry = &updates_to_run[update_cursor];
match self.drive_one_update(entry) {
Ok(outcome) => {
committed_wal_ids.push(outcome.wal_id);
outcomes.push(outcome);
update_cursor += 1;
}
Err(cause) => {
// `split_off(cursor + 1)` keeps only the entries AFTER the failed one;
// the failed entry stays in the local vector and is dropped on return.
let remaining: Vec<PendingUpdateEntry> =
updates_to_run.split_off(update_cursor + 1);
self.pending_updates = remaining;
return Err(CommitError::PartialCommit {
committed_wal_ids,
committed: outcomes.len(),
total: total_mutations,
cause: Box::new(cause),
});
}
}
}
// Deletes are only taken once every update has succeeded.
let mut deletes_to_run = mem::take(&mut self.pending_deletes);
let mut delete_cursor = 0usize;
while delete_cursor < deletes_to_run.len() {
let entry = &deletes_to_run[delete_cursor];
match self.drive_one_delete(entry) {
Ok(outcome) => {
committed_wal_ids.push(outcome.wal_id);
outcomes.push(outcome);
delete_cursor += 1;
}
Err(cause) => {
// Same policy as for updates: drop the failed delete, re-queue the rest.
let remaining: Vec<PendingDeleteEntry> =
deletes_to_run.split_off(delete_cursor + 1);
self.pending_deletes = remaining;
return Err(CommitError::PartialCommit {
committed_wal_ids,
committed: outcomes.len(),
total: total_mutations,
cause: Box::new(cause),
});
}
}
}
Ok(CommitResult {
wal_ids: committed_wal_ids,
outcomes,
})
}
/// Executes one queued update end to end and blocks until it finishes.
///
/// Steps, in order: store the Arrow payload in the WAL store, create the WAL
/// state document in the `Intent` state, run the append phase, run the
/// tombstone phase, then delete the payload and the state document.
///
/// Returns the mutation stats (matched / tombstoned / not-found counts).
///
/// # Errors
/// `NoStorageAttached` when no storage is configured, `WalStore` for failures
/// writing the payload or state document, and whatever the pipeline phases
/// return.
fn drive_one_update(&self, entry: &PendingUpdateEntry) -> Result<MutationStats, MutationError> {
let storage = self
.inner
.options
.storage
.as_ref()
.ok_or(MutationError::NoStorageAttached)?
.clone();
// Initial WAL document: every target starts out as a pending tombstone.
let wal_doc = WalStateDoc {
wal_id: entry.wal_id,
schema_version: SCHEMA_VERSION,
op_kind: OpKind::Update,
state: WalState::Intent,
created_at: Utc::now(),
lease: None,
predicate_repr: "writer.update()".into(),
target_ids: entry.target_ids.iter().map(|&v| RowId(v)).collect(),
new_row_count: Some(entry.new_row_count),
new_row_content_hash: Some(entry.new_row_content_hash.clone()),
preallocated_superfile_id: Some(entry.preallocated_superfile_id),
minted_id_spans: entry.minted_id_spans.clone(),
tombstone_progress: entry
.target_ids
.iter()
.map(|&v| TombstoneEntry {
target_id: RowId(v),
outcome: TombstoneOutcome::Pending,
tombstoned_in_superfile: None,
})
.collect(),
};
let wal_store = WalStore::new(Arc::clone(&storage));
let supertable = Supertable::from_inner(Arc::clone(&self.inner));
let wal_id = entry.wal_id;
let ipc_bytes = entry.ipc_bytes.clone();
let drive = async move {
// The payload is written before the state document is created.
wal_store
.put_arrow(wal_id, ipc_bytes)
.await
.map_err(MutationError::WalStore)?;
let etag = wal_store
.create(&wal_doc)
.await
.map_err(MutationError::WalStore)?;
let (_outcome, doc_after_append, etag_after_append) =
pipeline::run_append_phase(&supertable, &wal_store, &wal_doc, &etag).await?;
let (outcome, _post, _post_etag) = pipeline::run_tombstone_phase(
&supertable,
&wal_store,
&doc_after_append,
&etag_after_append,
)
.await?;
// Both outcome variants carry the same counters.
let (n_t, n_nf) = match outcome {
TombstonePhaseOutcome::Applied {
n_tombstoned,
n_not_found,
}
| TombstonePhaseOutcome::AlreadyComplete {
n_tombstoned,
n_not_found,
} => (n_tombstoned, n_not_found),
};
// Best-effort cleanup: failures to delete WAL artifacts are ignored.
let _ = wal_store.delete_arrow(wal_id).await;
let _ = wal_store.delete_state(wal_id).await;
Ok::<_, MutationError>((n_t, n_nf))
};
// Inside a Tokio runtime, block in place on that runtime's handle; otherwise
// fall back to the supertable's own query runtime.
let (n_tombstoned, n_not_found) = match Handle::try_current() {
Ok(handle) => block_in_place(|| handle.block_on(drive))?,
Err(_) => self.inner.query_runtime().block_on(drive)?,
};
Ok(MutationStats {
wal_id: entry.wal_id,
matched: entry.target_ids.len(),
n_tombstoned,
n_not_found,
})
}
/// Executes one queued delete end to end and blocks until it finishes.
///
/// Steps, in order: create the WAL state document in the `Intent` state, run
/// the tombstone phase, then delete the state document. Unlike an update,
/// there is no Arrow payload and no append phase.
///
/// Returns the mutation stats (matched / tombstoned / not-found counts).
///
/// # Errors
/// `NoStorageAttached` when no storage is configured, `WalStore` when the
/// state document cannot be created, and whatever the tombstone phase returns.
fn drive_one_delete(&self, entry: &PendingDeleteEntry) -> Result<MutationStats, MutationError> {
let storage = self
.inner
.options
.storage
.as_ref()
.ok_or(MutationError::NoStorageAttached)?
.clone();
// Initial WAL document: no new-row fields, every target pending.
let wal_doc = WalStateDoc {
wal_id: entry.wal_id,
schema_version: SCHEMA_VERSION,
op_kind: OpKind::Delete,
state: WalState::Intent,
created_at: Utc::now(),
lease: None,
predicate_repr: "writer.delete()".into(),
target_ids: entry.target_ids.iter().map(|&v| RowId(v)).collect(),
new_row_count: None,
new_row_content_hash: None,
preallocated_superfile_id: None,
minted_id_spans: Vec::new(),
tombstone_progress: entry
.target_ids
.iter()
.map(|&v| TombstoneEntry {
target_id: RowId(v),
outcome: TombstoneOutcome::Pending,
tombstoned_in_superfile: None,
})
.collect(),
};
let wal_store = WalStore::new(Arc::clone(&storage));
let supertable = Supertable::from_inner(Arc::clone(&self.inner));
let wal_id = entry.wal_id;
let drive = async move {
let etag = wal_store
.create(&wal_doc)
.await
.map_err(MutationError::WalStore)?;
let (outcome, _post, _post_etag) =
pipeline::run_tombstone_phase(&supertable, &wal_store, &wal_doc, &etag).await?;
// Both outcome variants carry the same counters.
let (n_t, n_nf) = match outcome {
TombstonePhaseOutcome::Applied {
n_tombstoned,
n_not_found,
}
| TombstonePhaseOutcome::AlreadyComplete {
n_tombstoned,
n_not_found,
} => (n_tombstoned, n_not_found),
};
// Best-effort cleanup: a failure to delete the state document is ignored.
let _ = wal_store.delete_state(wal_id).await;
Ok::<_, MutationError>((n_t, n_nf))
};
// Inside a Tokio runtime, block in place on that runtime's handle; otherwise
// fall back to the supertable's own query runtime.
let (n_tombstoned, n_not_found) = match Handle::try_current() {
Ok(handle) => block_in_place(|| handle.block_on(drive))?,
Err(_) => self.inner.query_runtime().block_on(drive)?,
};
Ok(MutationStats {
wal_id: entry.wal_id,
matched: entry.target_ids.len(),
n_tombstoned,
n_not_found,
})
}
fn commit_appends_internal(&mut self) -> Result<(), BuildError> {
if self.buffer.is_empty() {
return Ok(());
}
let buffer = mem::take(&mut self.buffer);
self.buffer_bytes = 0;
let total_rows: usize = buffer.iter().map(|b| b.scalar.num_rows()).sum();
if total_rows == 0 {
return Ok(());
}
let writer_pool = Arc::clone(&self.inner.options.writer_pool);
let n_threads = writer_pool.current_num_threads().max(1);
let n_shards = n_threads.min(total_rows);
let vector_dims: Vec<usize> = self
.inner
.options
.vector_columns
.iter()
.map(|vc| vc.dim)
.collect();
let shards = split_buffer_into_row_shards(buffer, n_shards, &vector_dims);
let outputs: Vec<ShardOutput> = fanout_shards(&writer_pool, &shards, |slice| {
build_one_shard(slice.as_slice(), &self.inner.options)
})?;
publish_superfiles(&self.inner, outputs)?;
Ok(())
}
}
/// Releases the single-writer slot so another writer can be created.
impl Drop for SupertableWriter {
    fn drop(&mut self) {
        let slot = &self.inner.writer_outstanding;
        slot.store(false, Ordering::Release);
    }
}
/// Result of building one shard of buffered rows into a superfile.
pub struct ShardOutput {
/// Serialized superfile contents.
bytes: Bytes,
/// Number of rows in the shard.
n_docs: u64,
/// Smallest row id in the shard (0 when the shard has no rows).
id_min: i128,
/// Largest row id in the shard (0 when the shard has no rows).
id_max: i128,
/// Aggregated scalar statistics, keyed by string
/// (presumably the column name — confirm against `ScalarStatsAgg::from_batches`).
scalar_stats: HashMap<String, ScalarStatsAgg>,
}
impl ShardOutput {
pub fn new_with_params(
bytes: Bytes,
n_docs: u64,
id_min: i128,
id_max: i128,
scalar_stats: HashMap<String, ScalarStatsAgg>,
) -> Self {
Self {
bytes,
n_docs,
id_min,
id_max,
scalar_stats,
}
}
}
/// Builds a single superfile from the batches of one shard.
///
/// Every batch is fed to a `SuperfileBuilder`; along the way the row count
/// and the min/max of the id column (scalar column 0) are tracked. Scalar
/// statistics are aggregated over all batches before the builder is
/// finished. A shard without rows reports `(0, 0)` as its id bounds.
///
/// # Errors
/// `IdColumnWrongType` when the id column is not a `Decimal128Array`;
/// builder errors are propagated.
fn build_one_shard(
    slice: &[BufferedBatch],
    options: &SupertableOptions,
) -> Result<ShardOutput, BuildError> {
    const ID_COLUMN_INDEX: usize = 0;

    let mut builder = SuperfileBuilder::new(options.builder_options())?;
    let scalar_schema = options.scalar_schema();
    let mut bounds: Option<(i128, i128)> = None;
    let mut n_docs: u64 = 0;

    for buffered in slice {
        let ids = buffered
            .scalar
            .column(ID_COLUMN_INDEX)
            .as_any()
            .downcast_ref::<Decimal128Array>()
            .ok_or_else(|| {
                BuildError::IdColumnWrongType(
                    options.id_column.clone(),
                    "<id column not Decimal128 at runtime>".to_string(),
                )
            })?;
        for row in 0..ids.len() {
            let id = ids.value(row);
            bounds = Some(match bounds {
                None => (id, id),
                Some((lo, hi)) => (lo.min(id), hi.max(id)),
            });
        }
        n_docs += ids.len() as u64;

        let mut vector_slices: Vec<&[f32]> = Vec::with_capacity(buffered.vectors.len());
        for column in &buffered.vectors {
            vector_slices.push(column.values().as_ref());
        }
        builder.add_batch(&buffered.scalar, &vector_slices)?;
    }

    let scalar_batches: Vec<&RecordBatch> = slice.iter().map(|b| &b.scalar).collect();
    let scalar_stats = ScalarStatsAgg::from_batches(&scalar_schema, &scalar_batches);
    let bytes = Bytes::from(builder.finish()?);
    // `bounds` is `None` exactly when no row was seen.
    let (id_min, id_max) = bounds.unwrap_or((0, 0));

    Ok(ShardOutput {
        bytes,
        n_docs,
        id_min,
        id_max,
        scalar_stats,
    })
}
/// Derives the subsection layout of a serialized superfile.
///
/// Reads the key/value metadata from `bytes` to locate the vector and FTS
/// sections (a section with a missing, unparsable or zero length counts as
/// absent), computes the byte ranges needed to open each section, and
/// captures those ranges plus the file tail as an "open blob".
///
/// Returns `None` only when the key/value metadata cannot be read. A section
/// whose open ranges cannot be derived contributes an empty range list.
pub(crate) fn build_subsection_offsets(bytes: &Bytes) -> Option<SubsectionOffsets> {
    let metadata = read_kv_metadata(bytes).ok()?;
    let parse_u64 =
        |key: &str| -> Option<u64> { metadata.get(key).and_then(|raw| raw.parse::<u64>().ok()) };
    let locate = |offset_key: &str, length_key: &str| -> Option<(u64, u64)> {
        let offset = parse_u64(offset_key)?;
        let length = parse_u64(length_key)?;
        (length > 0).then_some((offset, length))
    };

    let vec = locate(kv::VEC_OFFSET, kv::VEC_LENGTH);
    let fts = locate(kv::FTS_OFFSET, kv::FTS_LENGTH);
    let total_size = bytes.len() as u64;

    let vec_ranges = match vec {
        Some((off, len)) => vector_open_ranges(bytes, off, len).unwrap_or_default(),
        None => Vec::new(),
    };
    let fts_ranges = match fts {
        Some((off, len)) => fts_open_ranges(bytes, off, len).unwrap_or_default(),
        None => Vec::new(),
    };
    let open_blob = build_open_blob(bytes, total_size, &vec_ranges, &fts_ranges);

    Some(SubsectionOffsets {
        total_size,
        vec,
        fts,
        vec_open_ranges: vec_ranges,
        fts_open_ranges: fts_ranges,
        open_blob,
    })
}
/// Copies out the byte ranges a reader needs in order to open the superfile.
///
/// The result starts with the file tail (the last 64 KiB, or the whole file
/// when it is shorter), followed by the vector open ranges and then the FTS
/// open ranges, each paired with its starting offset. If any requested range
/// does not lie inside `bytes`, an empty list is returned instead of a
/// partial one.
fn build_open_blob(
    bytes: &Bytes,
    total_size: u64,
    vec_open_ranges: &[(u64, u64)],
    fts_open_ranges: &[(u64, u64)],
) -> Vec<(u64, Vec<u8>)> {
    const PARQUET_TAIL_SPEC: u64 = 64 * 1024;

    let tail_len = PARQUET_TAIL_SPEC.min(total_size);
    let tail_start = total_size.saturating_sub(tail_len);
    let tail = (tail_len > 0).then_some((tail_start, tail_len));

    let copy_range = |offset: u64, length: u64| -> Option<Vec<u8>> {
        let begin = offset as usize;
        let finish = begin.checked_add(length as usize)?;
        Some(bytes.get(begin..finish)?.to_vec())
    };

    tail.into_iter()
        .chain(vec_open_ranges.iter().copied())
        .chain(fts_open_ranges.iter().copied())
        .map(|(offset, length)| copy_range(offset, length).map(|data| (offset, data)))
        // Any out-of-bounds range collapses the whole result to `None`.
        .collect::<Option<Vec<_>>>()
        .unwrap_or_default()
}
/// Computes the absolute byte ranges needed to open the vector section that
/// occupies `bytes[off .. off + len]`.
///
/// The returned (merged, sorted) ranges cover: the outer header, the column
/// directory together with its trailing CRC, and for every column its
/// subsection header and its cluster index. All offsets read from the blob
/// are bounds-checked; any inconsistency yields `None`.
fn vector_open_ranges(bytes: &Bytes, off: u64, len: u64) -> Option<Vec<(u64, u64)>> {
let start = off as usize;
let end = start.checked_add(len as usize)?;
let blob = bytes.get(start..end)?;
if blob.len() < OUTER_HEADER_SIZE + CRC_BYTES {
return None;
}
// Outer header fields: number of columns and where the directory starts
// (relative to the start of the vector section).
let n_columns =
read_u32_le(blob.get(outer_hdr::N_COLUMNS_OFF..outer_hdr::N_COLUMNS_OFF + U32_BYTES)?)
as usize;
let dir_offset =
read_u64_le(blob.get(outer_hdr::DIR_OFFSET_OFF..outer_hdr::DIR_OFFSET_OFF + U64_BYTES)?)
as usize;
let dir_size = n_columns.checked_mul(DIR_ENTRY_SIZE)?;
// `dir_end` includes the CRC that follows the directory; it is validated
// against the blob length after the per-column loop.
let dir_end = dir_offset.checked_add(dir_size)?.checked_add(CRC_BYTES)?;
let dir = blob.get(dir_offset..dir_offset + dir_size)?;
let mut ranges = vec![(off + dir_offset as u64, (dir_size + CRC_BYTES) as u64)];
ranges.push((off, OUTER_HEADER_SIZE as u64));
for i in 0..n_columns {
// Byte position of this column's entry inside the directory slice.
let entry = i * DIR_ENTRY_SIZE;
let subsection_off = read_u64_le(dir.get(
entry + dir_entry::SUBSECTION_OFF_OFF
..entry + dir_entry::SUBSECTION_OFF_OFF + U64_BYTES,
)?) as usize;
let subsection_len = read_u64_le(dir.get(
entry + dir_entry::SUBSECTION_LEN_OFF
..entry + dir_entry::SUBSECTION_LEN_OFF + U64_BYTES,
)?) as usize;
let codec_meta_off = read_u32_le(dir.get(
entry + dir_entry::CODEC_META_OFF_OFF
..entry + dir_entry::CODEC_META_OFF_OFF + U32_BYTES,
)?) as usize;
let codec_meta_size = read_u32_le(dir.get(
entry + dir_entry::CODEC_META_SIZE_OFF
..entry + dir_entry::CODEC_META_SIZE_OFF + U32_BYTES,
)?) as usize;
// The subsection, and at least its header, must lie inside the blob.
if subsection_off.checked_add(SUB_HEADER_SIZE)? > blob.len()
|| subsection_off.checked_add(subsection_len)? > blob.len()
{
return None;
}
ranges.push((off + subsection_off as u64, SUB_HEADER_SIZE as u64));
let sub = blob.get(subsection_off..subsection_off + subsection_len)?;
// Offsets below are relative to the start of the subsection.
let centroids_off = read_u64_le(
sub.get(sub_hdr::CENTROIDS_OFF_OFF..sub_hdr::CENTROIDS_OFF_OFF + U64_BYTES)?,
) as usize;
let cluster_idx_off = read_u64_le(
sub.get(sub_hdr::CLUSTER_IDX_OFF_OFF..sub_hdr::CLUSTER_IDX_OFF_OFF + U64_BYTES)?,
) as usize;
// Cluster index length = entry size times the centroid count stored in
// the directory entry.
let cluster_idx_end = cluster_idx_off.checked_add(
CLUSTER_IDX_ENTRY_BYTES
* read_u32_le(dir.get(
entry + dir_entry::N_CENT_OFF..entry + dir_entry::N_CENT_OFF + U32_BYTES,
)?) as usize,
)?;
// The centroid offset is only sanity-checked (it must not point into the
// subsection header); the centroids are not added as a range.
if centroids_off < SUB_HEADER_SIZE || cluster_idx_end > subsection_len {
return None;
}
ranges.push((
off + subsection_off as u64 + cluster_idx_off as u64,
(cluster_idx_end - cluster_idx_off) as u64,
));
// Codec metadata is bounds-checked but not added as a range.
// NOTE(review): confirm that omitting it from the open ranges is intended.
if codec_meta_size > 0 {
let meta_end = codec_meta_off.checked_add(codec_meta_size)?;
if meta_end > subsection_len {
return None;
}
}
}
if dir_end > blob.len() {
return None;
}
Some(merge_ranges(ranges))
}
/// Computes the absolute byte ranges needed to open the FTS section that
/// occupies `bytes[off .. off + len]`.
///
/// Two ranges are produced (and merged when they touch): everything from the
/// start of the section up to the postings offset, and everything from the
/// doc-lengths offset to the end of the section. Returns `None` when the
/// section is out of bounds, shorter than the FTS header, or its header
/// offsets are inconsistent.
fn fts_open_ranges(bytes: &Bytes, off: u64, len: u64) -> Option<Vec<(u64, u64)>> {
    let begin = off as usize;
    let finish = begin.checked_add(len as usize)?;
    let section = bytes.get(begin..finish)?;
    let section_len = section.len();
    if section_len < FTS_HEADER_SIZE {
        return None;
    }

    // Reads a little-endian u64 header field located at `at`.
    let header_field = |at: usize| -> Option<usize> {
        section
            .get(at..at + U64_BYTES)
            .map(|raw| read_u64_le(raw) as usize)
    };
    let postings_start = header_field(hdr::POSTINGS_OFFSET_OFF)?;
    let doc_lengths_start = header_field(hdr::DOC_LENGTHS_DIR_OFF)?;

    let within_section = postings_start <= section_len && doc_lengths_start <= section_len;
    if !within_section || postings_start > doc_lengths_start {
        return None;
    }

    let leading = (off, postings_start as u64);
    let trailing = (
        off + doc_lengths_start as u64,
        (section_len - doc_lengths_start) as u64,
    );
    Some(merge_ranges(vec![leading, trailing]))
}
/// Normalizes a list of `(offset, length)` ranges.
///
/// Zero-length ranges are discarded, the rest are sorted by offset, and
/// ranges that overlap or touch are coalesced into one.
fn merge_ranges(mut ranges: Vec<(u64, u64)>) -> Vec<(u64, u64)> {
    ranges.retain(|range| range.1 != 0);
    ranges.sort_unstable_by_key(|range| range.0);

    let mut out: Vec<(u64, u64)> = Vec::with_capacity(ranges.len());
    for (start, span) in ranges {
        let stop = start + span;
        match out.last_mut() {
            // Starts inside (or directly after) the previous range: extend it.
            Some(prev) if start <= prev.0 + prev.1 => {
                prev.1 = prev.1.max(stop - prev.0);
            }
            _ => out.push((start, span)),
        }
    }
    out
}
/// Decodes a little-endian `u32` from a slice of exactly four bytes.
///
/// # Panics
/// Panics when `bytes` is not exactly four bytes long.
fn read_u32_le(bytes: &[u8]) -> u32 {
    let raw: [u8; 4] = bytes.try_into().expect("u32 slice length");
    u32::from_le_bytes(raw)
}
/// Decodes a little-endian `u64` from a slice of exactly eight bytes.
///
/// # Panics
/// Panics when `bytes` is not exactly eight bytes long.
fn read_u64_le(bytes: &[u8]) -> u64 {
    let raw: [u8; 8] = bytes.try_into().expect("u64 slice length");
    u64::from_le_bytes(raw)
}
/// A built superfile together with the destinations its bytes must be written to.
///
/// Each `bytes_for_*` field is `Some((uri, bytes))` only when the
/// corresponding destination applies to the current configuration.
pub(crate) struct PreparedSuperfile {
/// Manifest entry describing the superfile.
pub(crate) entry: Arc<SuperfileEntry>,
/// Bytes for the in-process store; set unless both a disk cache and storage are attached.
pub(crate) bytes_for_store: Option<(SuperfileUri, Bytes)>,
/// Bytes to upload to storage; set when storage is attached.
pub(crate) bytes_for_storage: Option<(SuperfileUri, Bytes)>,
/// Bytes to pre-populate the disk cache with; set when a cache and storage are
/// attached and `prepopulate_cache_on_commit` is enabled.
pub(crate) bytes_for_cache: Option<(SuperfileUri, Bytes)>,
}
impl PreparedSuperfile {
    /// Test helper: opens a reader over whichever byte copy is available,
    /// preferring the store copy, then the storage copy, then the cache copy.
    /// Returns `None` when no copy is present.
    #[cfg(test)]
    pub(crate) fn open_reader(&self) -> Option<Result<SuperfileReader, ReadError>> {
        let candidates = [
            &self.bytes_for_store,
            &self.bytes_for_storage,
            &self.bytes_for_cache,
        ];
        let chosen = candidates.into_iter().find_map(|slot| slot.as_ref())?;
        Some(SuperfileReader::open(chosen.1.clone()))
    }
}
/// Turns one shard output into a manifest entry plus the byte copies that
/// have to be written out.
///
/// Returns `Ok(None)` for a shard without rows. Otherwise the freshly built
/// superfile is opened once to derive per-column FTS summaries (bloom filter,
/// distinct-term count, min/max term) and vector summaries (centroid, radius,
/// cluster centroids), and the subsection offsets are computed from the raw
/// bytes.
///
/// # Errors
/// `BuildError::Store` when the superfile cannot be opened.
///
/// # Panics
/// Panics when the terms of a configured FTS column cannot be iterated, which
/// would mean the superfile that was just built is invalid.
pub(super) fn prepare_superfile(
    inner: &SupertableInner,
    shard: ShardOutput,
) -> Result<Option<PreparedSuperfile>, BuildError> {
    if shard.n_docs == 0 {
        return Ok(None);
    }
    let options = &inner.options;
    let uri = SuperfileUri::new_v4();

    // Decide which destinations receive a copy of the bytes.
    let has_storage = options.storage.is_some();
    let cache_attached = options.disk_cache.is_some() && has_storage;
    let warm_cache = cache_attached && options.prepopulate_cache_on_commit;
    let bytes_for_storage = has_storage.then(|| (uri, shard.bytes.clone()));
    let bytes_for_store = (!cache_attached).then(|| (uri, shard.bytes.clone()));
    let bytes_for_cache = warm_cache.then(|| (uri, shard.bytes.clone()));

    let reader = SuperfileReader::open_with(shard.bytes.clone(), options.superfile_open_options())
        .map_err(|e| BuildError::Store(format!("opening superfile for summary: {e}")))?;

    let mut fts_summary: HashMap<String, FtsSummaryAgg> = HashMap::new();
    if let Some(fts_reader) = reader.fts() {
        for fc in &options.fts_columns {
            let terms = fts_reader
                .iter_column_terms(&fc.column)
                .expect("FST bytes valid: superfile just built");
            let distinct_terms = terms.len() as u32;
            // An empty term list yields empty min/max terms.
            let smallest = terms.first().cloned().unwrap_or_default();
            let largest = terms.last().cloned().unwrap_or_default();
            let mut bloom = BloomBuilder::new();
            for term in &terms {
                bloom.insert(term);
            }
            let summary =
                FtsSummaryAgg::new_with_params(bloom.finish(), distinct_terms, (smallest, largest));
            fts_summary.insert(fc.column.clone(), summary);
        }
    }

    let mut vector_summary: HashMap<String, VectorSummary> = HashMap::new();
    if let Some(vec_reader) = reader.vec() {
        for vc in &options.vector_columns {
            // Columns without a summary are simply left out.
            let Some((centroid, radius)) = vec_reader.summary(&vc.column) else {
                continue;
            };
            let clusters = vec_reader
                .cluster_centroids(&vc.column)
                .map(|(n_cent, dim, fp32, counts)| {
                    ClusterCentroids::from_fp32(n_cent, dim, &fp32, counts)
                })
                .unwrap_or_default();
            vector_summary.insert(
                vc.column.clone(),
                VectorSummary {
                    centroid,
                    radius,
                    clusters,
                },
            );
        }
    }

    let subsection_offsets = build_subsection_offsets(&shard.bytes);
    let entry = SuperfileEntry {
        superfile_id: uuid::Uuid::new_v4(),
        uri,
        n_docs: shard.n_docs,
        id_min: shard.id_min,
        id_max: shard.id_max,
        scalar_stats: shard.scalar_stats,
        fts_summary,
        vector_summary,
        partition_key: Vec::new(),
        partition_hint: None,
        subsection_offsets,
    };

    Ok(Some(PreparedSuperfile {
        entry: Arc::new(entry),
        bytes_for_store,
        bytes_for_storage,
        bytes_for_cache,
    }))
}
fn publish_superfiles(
inner: &SupertableInner,
outputs: Vec<ShardOutput>,
) -> Result<(), BuildError> {
let prepared: Vec<PreparedSuperfile> = inner.options.writer_pool.install(|| {
outputs
.into_par_iter()
.filter_map(|shard| prepare_superfile(inner, shard).transpose())
.collect::<Result<Vec<_>, _>>()
})?;
let mut new_entries: Vec<Arc<SuperfileEntry>> = Vec::with_capacity(prepared.len());
let mut pending_storage_writes: Vec<(SuperfileUri, Bytes)> = Vec::new();
let mut pending_cache_inserts: Vec<(SuperfileUri, Bytes)> = Vec::new();
for p in prepared {
if let Some((uri, b)) = p.bytes_for_store {
inner
.options
.store
.insert(uri, b)
.map_err(|e| BuildError::Store(e.to_string()))?;
}
if let Some(t) = p.bytes_for_storage {
pending_storage_writes.push(t);
}
if let Some(t) = p.bytes_for_cache {
pending_cache_inserts.push(t);
}
new_entries.push(p.entry);
}
if new_entries.is_empty() {
return Ok(());
}
let old = inner.manifest.load();
if let Some(storage) = inner.options.storage.as_ref().cloned() {
drop(old);
persist_commit(inner, storage, new_entries, &[], pending_storage_writes)
.map_err(|e| BuildError::Store(e.to_string()))?;
if !pending_cache_inserts.is_empty()
&& let Some(cache) = inner.options.disk_cache.as_ref().cloned()
{
warm_cache_after_commit(inner, &cache, pending_cache_inserts);
}
if let (Some(cache), Some(budget)) = (
inner.options.disk_cache.as_ref(),
inner.options.memory_budget_bytes,
) {
cache.sweep_for_budget(budget);
}
return Ok(());
}
let new = old.with_appended(new_entries);
inner.manifest.store(Arc::new(new));
Ok(())
}
/// Delay to wait before retry number `attempt` (0-based).
///
/// The base delay is 10 ms, doubled per attempt up to a shift of 6 and capped
/// at 1000 ms. A jitter of -30 % to +30 % is applied, derived from the
/// sub-second nanoseconds of the wall clock. The result is at least 1 ms.
pub(super) fn backoff_delay(attempt: u32) -> time::Duration {
    const BASE_MS: u64 = 10;
    const CAP_MS: u64 = 1000;
    const MAX_SHIFT: u32 = 6;
    const JITTER_RANGE_PCT: i64 = 30;
    const JITTER_MODULUS: u64 = 61;
    const PERCENT_DIVISOR: i64 = 100;

    let shift = if attempt > MAX_SHIFT { MAX_SHIFT } else { attempt };
    let ceiling_ms = BASE_MS.saturating_mul(1u64 << shift).min(CAP_MS) as i64;

    // A clock before the Unix epoch falls back to a fixed jitter seed of 0.
    let subsec_nanos = match time::SystemTime::now().duration_since(time::UNIX_EPOCH) {
        Ok(elapsed) => u64::from(elapsed.subsec_nanos()),
        Err(_) => 0,
    };
    // `% 61` gives 0..=60, shifted to -30..=30 percent.
    let jitter_pct = (subsec_nanos % JITTER_MODULUS) as i64 - JITTER_RANGE_PCT;
    let jitter_ms = ceiling_ms * jitter_pct / PERCENT_DIVISOR;
    let delay_ms = (ceiling_ms + jitter_ms).max(1) as u64;
    time::Duration::from_millis(delay_ms)
}
/// Persists a manifest change (added and removed superfiles) to storage and,
/// on success, installs the resulting manifest as the current one.
///
/// The commit is attempted up to `max_commit_retries` times (at least once).
/// When an attempt fails with `WriteContentionExhausted` and attempts remain,
/// the in-memory manifest is refreshed from storage and the next attempt
/// follows after a jittered backoff. Any other error, or contention on the
/// last attempt, is returned immediately.
///
/// `pending_storage_writes` is shared across attempts, so superfiles already
/// uploaded by an earlier attempt are not uploaded again.
///
/// This function blocks: inside a Tokio runtime it uses `block_in_place` on
/// the current handle, otherwise it runs on the supertable's query runtime.
pub(in crate::supertable) fn persist_commit(
inner: &SupertableInner,
storage: Arc<dyn StorageProvider>,
new_entries: Vec<Arc<SuperfileEntry>>,
entries_to_remove: &[Arc<SuperfileEntry>],
mut pending_storage_writes: Vec<(SuperfileUri, Bytes)>,
) -> Result<(), SupertableCommitError> {
let storage_async = Arc::clone(&storage);
let opts = Arc::clone(&inner.options);
// A configured value of 0 still yields one attempt.
let max_retries = opts.max_commit_retries.max(1);
let drive = async move {
let mut last_err: Option<SupertableCommitError> = None;
for attempt in 0..max_retries {
// Each attempt starts from the manifest currently installed, which a
// previous iteration may have refreshed.
let old = inner.manifest.load_full();
let pending_writes = &mut pending_storage_writes;
match try_commit_attempt(
Arc::clone(&storage_async),
Arc::clone(&opts),
Arc::clone(&old),
&new_entries,
entries_to_remove,
pending_writes,
)
.await
{
Ok(new_manifest) => {
return Ok::<_, SupertableCommitError>(new_manifest);
}
// Contention with attempts left: refresh, back off, retry.
Err(SupertableCommitError::WriteContentionExhausted)
if attempt + 1 < max_retries =>
{
refresh_inner_state_async(inner, &storage_async).await?;
last_err = Some(SupertableCommitError::WriteContentionExhausted);
sleep(backoff_delay(attempt)).await;
}
Err(e) => return Err(e),
}
}
Err(last_err.unwrap_or(SupertableCommitError::WriteContentionExhausted))
};
let new_manifest = match Handle::try_current() {
Ok(handle) => {
block_in_place(|| handle.block_on(drive))?
}
Err(_) => {
inner.query_runtime().block_on(drive)?
}
};
inner.manifest.store(Arc::new(new_manifest));
Ok(())
}
/// Uploads every pending superfile to storage concurrently.
///
/// Payloads at or above `put_multipart_threshold_bytes` use a multipart
/// upload; smaller ones use an atomic put. A `PreconditionFailed` result is
/// treated as success (the object is already present).
///
/// On return, `pending_storage_writes` contains only the entries whose upload
/// failed, in their original order, so a retry does not upload completed
/// files again.
///
/// # Errors
/// When one or more uploads fail, the error of the failed upload that comes
/// first in `pending_storage_writes` is returned.
pub async fn write_superfile_list(
    storage: &Arc<dyn StorageProvider>,
    opts: &Arc<SupertableOptions>,
    pending_storage_writes: &mut Vec<(SuperfileUri, Bytes)>,
) -> Result<(), SupertableCommitError> {
    let multipart_threshold = opts.put_multipart_threshold_bytes;
    let put_futs = pending_storage_writes.iter().map(|(uri, bytes)| {
        let storage = Arc::clone(storage);
        async move {
            let path = superfile_storage_path(uri);
            let result = if (bytes.len() as u64) >= multipart_threshold {
                put_superfile_multipart(storage.as_ref(), &path, bytes.clone()).await
            } else {
                storage.put_atomic(&path, bytes.clone()).await.map(|_| ())
            };
            match result {
                // An already-existing object counts as written.
                Ok(()) | Err(StorageError::PreconditionFailed { .. }) => Ok(()),
                Err(e) => Err(SupertableCommitError::from(e)),
            }
        }
    });

    // `join_all` yields results in input order, so the i-th result belongs to
    // the i-th pending write.
    let mut results = futures::future::join_all(put_futs).await.into_iter();

    // Single O(n) pass: `retain` visits the elements exactly once, in order,
    // so each call consumes the matching result. Completed writes are dropped,
    // failed ones kept, and the first failure is remembered.
    let mut first_err: Option<SupertableCommitError> = None;
    pending_storage_writes.retain(|_| match results.next() {
        Some(Ok(())) => false,
        Some(Err(e)) => {
            if first_err.is_none() {
                first_err = Some(e);
            }
            true
        }
        None => true,
    });

    match first_err {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
/// Performs a single commit attempt against `current_manifest`.
///
/// Steps, in order: upload the pending superfiles, derive the new manifest
/// (and the manifest parts that need writing) from the current one, fetch the
/// current manifest's etag, and write the new manifest conditioned on that
/// etag. Returns the new manifest on success; it is not installed here.
///
/// Entries uploaded successfully are removed from `pending_storage_writes`
/// even when a later step fails.
pub(crate) async fn try_commit_attempt(
storage: Arc<dyn StorageProvider>,
opts: Arc<SupertableOptions>,
current_manifest: Arc<ManifestSnapshot>,
new_entries: &[Arc<SuperfileEntry>],
entries_to_remove: &[Arc<SuperfileEntry>],
pending_storage_writes: &mut Vec<(SuperfileUri, Bytes)>,
) -> Result<ManifestSnapshot, SupertableCommitError> {
// Data files go first so the manifest never references a missing superfile.
write_superfile_list(&storage, &opts, pending_storage_writes).await?;
let (new_manifest, parts_to_write) = current_manifest
.update(new_entries, entries_to_remove)
.await?;
let prev_etag = get_current_manifest_etag(&storage, current_manifest).await?;
let encoded_refs: Vec<&[u8]> = parts_to_write
.iter()
.map(|ep| ep.encoded.as_slice())
.collect();
new_manifest
.write(storage.as_ref(), prev_etag.as_deref(), &encoded_refs)
.await?;
// No runtime effect.
// NOTE(review): appears to exist only to reference the imported `PartId` / `ContentHash` types — confirm.
let _ = PhantomData::<(PartId, part_mod::ContentHash)>;
Ok(new_manifest)
}
/// Reloads the manifest from `storage` and, when a new snapshot is returned,
/// stores it into `inner.manifest`.
///
/// `ManifestLoadError::PointerNotFound` and `ManifestLoadError::AlreadyLoaded`
/// are not treated as failures: the currently stored manifest is left as-is
/// and `Ok(())` is returned.
///
/// # Errors
///
/// Any other load error is wrapped as
/// `SupertableCommitError::ManifestError(ManifestError::ManifestLoadError(_))`.
async fn refresh_inner_state_async(
    inner: &SupertableInner,
    storage: &Arc<dyn StorageProvider>,
) -> Result<(), SupertableCommitError> {
    let cached = inner.manifest.load_full();
    match ManifestSnapshot::load(Some(cached), Arc::clone(storage), None).await {
        Ok(fresh) => {
            inner.manifest.store(fresh);
            Ok(())
        }
        // Nothing newer to install; keep the manifest that is already stored.
        Err(ManifestLoadError::PointerNotFound | ManifestLoadError::AlreadyLoaded) => Ok(()),
        Err(other) => Err(SupertableCommitError::ManifestError(
            ManifestError::ManifestLoadError(other),
        )),
    }
}
/// Serializes `batch` into an in-memory buffer using an Arrow IPC
/// `StreamWriter` and returns the buffer as `Bytes`.
///
/// # Errors
///
/// Returns a `String` describing the failing stage: `"ipc writer init: …"`,
/// `"ipc write: …"` or `"ipc finish: …"`.
fn encode_record_batch_ipc(batch: &RecordBatch) -> Result<Bytes, String> {
    let mut buf = Vec::<u8>::new();
    let schema = batch.schema();

    let mut ipc = StreamWriter::try_new(&mut buf, &schema)
        .map_err(|e| format!("ipc writer init: {e}"))?;
    ipc.write(batch).map_err(|e| format!("ipc write: {e}"))?;
    ipc.finish().map_err(|e| format!("ipc finish: {e}"))?;
    // Release the writer's mutable borrow of `buf` before taking ownership.
    drop(ipc);

    Ok(Bytes::from(buf))
}
/// Returns the storage path for `uri` by delegating to `uri.storage_path()`.
///
/// Used by the superfile upload code in this file to turn a `SuperfileUri`
/// into the path string handed to the storage provider.
fn superfile_storage_path(uri: &SuperfileUri) -> String {
uri.storage_path()
}
/// Uploads `bytes` to `path` as a multipart upload split into 8 MiB parts.
///
/// Before starting, `storage.head(path)` is consulted: if the object already
/// exists the function returns `StorageError::PreconditionFailed` without
/// uploading anything; any `head` error other than `NotFound` is returned
/// unchanged.
///
/// # Errors
///
/// A failure while uploading parts or while completing the upload triggers a
/// best-effort `abort()` (its own result is ignored) and is reported as
/// `StorageError::Permanent` carrying the underlying error as `source`.
/// Errors from `put_multipart` itself are propagated as-is.
async fn put_superfile_multipart(
    storage: &dyn StorageProvider,
    path: &str,
    bytes: Bytes,
) -> Result<(), StorageError> {
    const PART_BYTES: usize = 8 * (1 << 20);

    match storage.head(path).await {
        Err(StorageError::NotFound { .. }) => {}
        Err(other) => return Err(other),
        Ok(_) => return Err(StorageError::PreconditionFailed { uri: path.into() }),
    }

    let mut upload = storage.put_multipart(path).await?;
    let total = bytes.len();

    // Queue one part per PART_BYTES-sized window; the last window is clamped
    // to the end of the payload.
    let parts: Vec<UploadPart> = (0..total)
        .step_by(PART_BYTES)
        .map(|start| {
            let end = cmp::min(start + PART_BYTES, total);
            upload.put_part(PutPayload::from_bytes(bytes.slice(start..end)))
        })
        .collect();

    let parts_outcome = futures::future::try_join_all(parts).await;
    if let Err(e) = parts_outcome {
        let _ = upload.abort().await;
        return Err(StorageError::Permanent {
            uri: path.into(),
            source: Box::new(e),
        });
    }

    match upload.complete().await {
        Ok(_) => Ok(()),
        Err(e) => {
            let _ = upload.abort().await;
            Err(StorageError::Permanent {
                uri: path.into(),
                source: Box::new(e),
            })
        }
    }
}
/// Inserts each `(uri, bytes)` pair from `pending` into `cache` via
/// `insert_warm`, blocking the calling thread until all inserts have run.
///
/// Insert failures are logged with `tracing::warn!` and otherwise ignored;
/// the function has no return value.
///
/// When called from inside a Tokio runtime the work is driven with
/// `block_in_place` + `Handle::block_on`; otherwise it runs on
/// `inner.query_runtime()`.
fn warm_cache_after_commit(
    inner: &SupertableInner,
    cache: &Arc<DiskCacheStore>,
    pending: Vec<(SuperfileUri, Bytes)>,
) {
    let store = Arc::clone(cache);
    let populate = async move {
        for (uri, bytes) in pending {
            match store.insert_warm(&uri, bytes).await {
                Ok(_) => {}
                Err(e) => {
                    tracing::warn!(
                        "supertable: warm cache pre-population failed for {}: {} \
                         (superfile is durable in storage; first query will cold-fetch)",
                        uri.0,
                        e
                    );
                }
            }
        }
    };

    if let Ok(handle) = Handle::try_current() {
        // Already on a runtime thread: step aside so blocking is permitted.
        block_in_place(|| handle.block_on(populate));
    } else {
        inner.query_runtime().block_on(populate);
    }
}
// Unit tests for the supertable writer: writer-slot exclusivity, append/commit
// publication, per-superfile summaries, writer-pool sharding, auto-flush,
// subsection offsets, and update/delete mutations.
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Instant};
use arrow_array::{FixedSizeListArray, Float32Array, LargeStringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use figment::{
Figment,
providers::{Format, Yaml},
};
use rayon::ThreadPoolBuilder;
use tempfile::TempDir;
use super::*;
use crate::{
config::Config,
superfile::{
builder::{FtsConfig, VectorConfig},
fts::reader::BoolMode,
vector::{distance::Metric, rerank_codec::RerankCodec},
},
supertable::{SupertableOptions, handle::Supertable, storage::LocalFsStorageProvider},
test_helpers::default_tokenizer as tok,
};
/// Schema with a single non-nullable `title: LargeUtf8` column.
fn schema_id_title() -> Arc<Schema> {
Arc::new(Schema::new(vec![Field::new(
"title",
DataType::LargeUtf8,
false,
)]))
}
/// `FixedSizeList<Float32>` data type of length `dim` (nullable `item` field).
fn fixed_list_f32(dim: usize) -> DataType {
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dim as i32,
)
}
/// Schema with `title: LargeUtf8` and `emb: FixedSizeList<Float32; dim>`,
/// both non-nullable.
fn schema_id_title_emb(dim: usize) -> Arc<Schema> {
Arc::new(Schema::new(vec![
Field::new("title", DataType::LargeUtf8, false),
Field::new("emb", fixed_list_f32(dim), false),
]))
}
/// Options over `schema_id_title()` with one FTS config on `title`, no
/// vector configs, and the default test tokenizer.
fn options_id_title() -> SupertableOptions {
SupertableOptions::new(
schema_id_title(),
vec![FtsConfig {
column: "title".into(),
}],
vec![],
Some(tok()),
)
.expect("valid options")
}
/// `options_id_title()` with a single-threaded writer pool.
fn options_id_title_serial() -> SupertableOptions {
let pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(1)
.build()
.expect("build pool"),
);
options_id_title().with_writer_pool(pool)
}
/// Builds a rayon thread pool with `n` threads.
fn writer_pool_with(n: usize) -> Arc<rayon::ThreadPool> {
Arc::new(
ThreadPoolBuilder::new()
.num_threads(n)
.build()
.expect("build pool"),
)
}
/// Builds an `n`-row batch whose titles are `"doc {i} alpha"` for `i` in
/// `0..n`. `_start` is ignored, so every call with the same `n` yields the
/// same titles.
fn build_simple_batch(_start: u64, n: usize) -> RecordBatch {
let titles =
LargeStringArray::from((0..n).map(|i| format!("doc {i} alpha")).collect::<Vec<_>>());
RecordBatch::try_new(schema_id_title(), vec![Arc::new(titles)]).expect("build batch")
}
/// A second `writer()` call while the first writer is alive fails with
/// `BuildError::SupertableInUse`.
#[test]
fn writer_slot_is_exclusive() {
let st = Supertable::create(options_id_title_serial()).expect("create");
let _w = st.writer().expect("first writer");
let err = st.writer().expect_err("second writer should fail");
assert!(matches!(err, BuildError::SupertableInUse));
}
/// After the first writer is dropped, `writer()` succeeds again.
#[test]
fn writer_slot_releases_on_drop() {
let st = Supertable::create(options_id_title_serial()).expect("create");
{
let _w = st.writer().expect("first writer");
}
let _w2 = st.writer().expect("second writer after drop");
}
/// One append of 4 rows followed by a commit yields manifest id 1 with one
/// superfile holding 4 docs.
#[test]
fn append_then_commit_publishes_one_superfile() {
let st = Supertable::create(options_id_title_serial()).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_simple_batch(0, 4)).expect("append");
w.commit().expect("commit");
let r = st.reader();
assert_eq!(r.manifest_id(), 1);
assert_eq!(r.n_superfiles(), 1);
assert_eq!(r.n_docs_total(), 4);
}
/// Committing with nothing buffered leaves the manifest id at 0 and
/// publishes no superfile.
#[test]
fn commit_with_empty_buffer_is_noop() {
let st = Supertable::create(options_id_title_serial()).expect("create");
let mut w = st.writer().expect("writer");
w.commit().expect("commit-empty");
assert_eq!(st.manifest_id(), 0, "no manifest swap on empty commit");
assert_eq!(st.reader().n_superfiles(), 0);
}
/// A committed superfile can be opened through `options().store` and a BM25
/// query for "alpha" returns all 4 rows.
#[tokio::test]
async fn superfile_is_queryable_via_store() {
let st = Supertable::create(options_id_title_serial()).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_simple_batch(0, 4)).expect("append");
w.commit().expect("commit");
let r = st.reader();
let superfile = &r.manifest().superfiles[0];
let store = &st.options().store;
let sf_reader = store.reader(&superfile.uri).expect("reader");
let hits = sf_reader
.bm25_hits_async("title", "alpha", 10, BoolMode::Or)
.await
.expect("bm25");
assert_eq!(hits.len(), 4);
}
/// Two appends (3 + 2 rows) committed together give one entry with
/// `n_docs == 5`, a positive `id_min`, and `id_max > id_min`.
#[test]
fn superfile_entry_records_id_range_and_n_docs() {
let st = Supertable::create(options_id_title_serial()).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_simple_batch(100, 3)).expect("a");
w.append(&build_simple_batch(50, 2)).expect("b");
w.commit().expect("commit");
let r = st.reader();
let seg = &r.manifest().superfiles[0];
assert_eq!(seg.n_docs, 5);
assert!(seg.id_min > 0);
assert!(seg.id_max > seg.id_min, "id_max should exceed id_min");
}
/// The committed entry has an FTS summary for `title` with at least 3
/// distinct terms, `may_contain` hits for "alpha" and "doc", and a non-empty
/// ordered term range.
#[test]
fn superfile_entry_carries_fts_summary() {
let st = Supertable::create(options_id_title_serial()).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_simple_batch(0, 4)).expect("append");
w.commit().expect("commit");
let r = st.reader();
let seg = &r.manifest().superfiles[0];
let fts = seg
.fts_summary
.get("title")
.expect("title FTS summary present");
assert!(
fts.n_terms_distinct >= 3,
"expected ≥ 3 distinct terms, got {}",
fts.n_terms_distinct,
);
assert!(fts.may_contain(b"alpha"));
assert!(fts.may_contain(b"doc"));
let (min_term, max_term) = fts.term_range.as_ref().expect("non-empty FST has a range");
assert!(!min_term.is_empty());
assert!(!max_term.is_empty());
assert!(min_term <= max_term, "min_term <= max_term invariant");
}
/// Builds an `n`-row batch with titles `"doc {i}"` and a `dim`-wide `emb`
/// column whose value at row `i`, position `j` is `(i + j) / 100.0`.
/// `_start` is ignored.
fn build_vector_batch(_start: u64, n: usize, dim: usize) -> RecordBatch {
let titles = LargeStringArray::from((0..n).map(|i| format!("doc {i}")).collect::<Vec<_>>());
let mut flat = Vec::with_capacity(n * dim);
for i in 0..n {
for j in 0..dim {
flat.push(((i + j) as f32) / 100.0);
}
}
let item_field = Arc::new(Field::new("item", DataType::Float32, true));
let values = Float32Array::from(flat);
let fsl = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
.expect("FSL");
RecordBatch::try_new(
schema_id_title_emb(dim),
vec![Arc::new(titles), Arc::new(fsl)],
)
.expect("batch")
}
/// Options over `schema_id_title_emb(dim)` with no FTS, one vector config
/// on `emb` (4 centroids, cosine metric, fp32 rerank codec), no tokenizer,
/// and a single-threaded writer pool.
fn options_with_vector(dim: usize) -> SupertableOptions {
let pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(1)
.build()
.expect("build pool"),
);
SupertableOptions::new(
schema_id_title_emb(dim),
vec![],
vec![VectorConfig {
column: "emb".into(),
dim,
n_cent: 4,
rot_seed: 7,
metric: Metric::Cosine,
rerank_codec: RerankCodec::Fp32,
}],
None,
)
.expect("valid options")
.with_writer_pool(pool)
}
/// The committed entry has a vector summary for `emb` whose centroid has
/// `dim` components, whose per-cluster arrays are sized consistently with
/// `n_cent`, and whose cluster counts sum to `n_docs`.
#[test]
fn superfile_entry_carries_vector_summary() {
let dim = 16;
let st = Supertable::create(options_with_vector(dim)).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_vector_batch(0, 8, dim)).expect("append");
w.commit().expect("commit");
let r = st.reader();
let seg = &r.manifest().superfiles[0];
let vs = seg
.vector_summary
.get("emb")
.expect("emb vector summary present");
assert_eq!(vs.centroid.len(), dim);
assert!(vs.radius >= 0.0);
assert!(
!vs.clusters.is_empty(),
"cluster centroids must be populated"
);
assert_eq!(vs.clusters.dim as usize, dim);
assert!(vs.clusters.n_cent >= 1);
assert_eq!(vs.clusters.counts.len(), vs.clusters.n_cent as usize);
assert_eq!(vs.clusters.mins.len(), vs.clusters.n_cent as usize);
assert_eq!(vs.clusters.scales.len(), vs.clusters.n_cent as usize);
assert_eq!(vs.clusters.codes.len(), vs.clusters.n_cent as usize * dim);
let total: u64 = vs.clusters.counts.iter().map(|&c| c as u64).sum();
assert_eq!(total, seg.n_docs);
}
/// Every recorded `vec_open_ranges` length is smaller than the fp32 centroid
/// table (`n_cent * dim * 4` bytes), and at least one range has exactly the
/// cluster-index size (`n_cent * CLUSTER_IDX_ENTRY_BYTES`).
#[test]
fn open_blob_omits_fp32_centroids_keeps_cluster_idx() {
let dim = 64;
let st = Supertable::create(options_with_vector(dim)).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_vector_batch(0, 8, dim)).expect("append");
w.commit().expect("commit");
let r = st.reader();
let seg = &r.manifest().superfiles[0];
let vs = seg.vector_summary.get("emb").expect("emb summary");
let n_cent = vs.clusters.n_cent as usize;
assert!(n_cent >= 1 && vs.clusters.dim as usize == dim);
let offsets = seg
.subsection_offsets
.as_ref()
.expect("subsection offsets captured at commit");
let centroids_bytes = (n_cent * dim * 4) as u64;
let cluster_idx_bytes = (n_cent * CLUSTER_IDX_ENTRY_BYTES) as u64;
assert!(
offsets
.vec_open_ranges
.iter()
.all(|&(_, len)| len < centroids_bytes),
"open_blob must not carry fp32 centroids; ranges={:?}, centroids={centroids_bytes} B",
offsets.vec_open_ranges,
);
assert!(
offsets
.vec_open_ranges
.iter()
.any(|&(_, len)| len == cluster_idx_bytes),
"open_blob must carry cluster_idx ({cluster_idx_bytes} B); ranges={:?}",
offsets.vec_open_ranges,
);
}
/// For pools of 1, 2 and 4 threads, committing `2 * n_threads` batches of 3
/// rows produces exactly `n_threads` superfiles and the full doc count.
#[test]
fn commit_produces_one_superfile_per_writer_pool_thread() {
for n_threads in [1usize, 2, 4] {
let opts = options_id_title().with_writer_pool(writer_pool_with(n_threads));
let st = Supertable::create(opts).expect("create");
let mut w = st.writer().expect("writer");
for i in 0..n_threads * 2 {
w.append(&build_simple_batch(i as u64 * 10, 3))
.expect("append");
}
w.commit().expect("commit");
let r = st.reader();
assert_eq!(
r.n_superfiles(),
n_threads,
"expected {n_threads} superfiles for {n_threads}-thread pool",
);
assert_eq!(r.n_docs_total(), (n_threads * 2 * 3) as u64);
}
}
/// With a 4-thread pool but only two single-row batches, the commit yields
/// 2 superfiles and 2 docs.
#[test]
fn commit_with_fewer_batches_than_threads_skips_empty_shards() {
let opts = options_id_title().with_writer_pool(writer_pool_with(4));
let st = Supertable::create(opts).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_simple_batch(0, 1)).expect("a");
w.append(&build_simple_batch(1, 1)).expect("b");
w.commit().expect("commit");
let r = st.reader();
assert_eq!(r.n_superfiles(), 2);
assert_eq!(r.n_docs_total(), 2);
}
/// Options derived from a YAML config with `writer_threads: 4` produce 4
/// superfiles when 8 batches of 3 rows are committed (24 docs total).
#[test]
fn apply_config_with_fixed_writer_threads_emits_that_many_superfiles() {
let yaml = r#"
commit_threshold_size_mb: 1024
supertable:
reader_threads: 1
writer_threads: 4
"#;
let cfg =
Config::from_figment(Figment::new().merge(Yaml::string(yaml))).expect("parse config");
let opts = options_id_title().apply_config(&cfg).expect("apply_config");
let st = Supertable::create(opts).expect("create");
let mut w = st.writer().expect("writer");
for i in 0..8u64 {
w.append(&build_simple_batch(i * 10, 3)).expect("append");
}
w.commit().expect("commit");
let r = st.reader();
assert_eq!(
r.n_superfiles(),
4,
"writer_threads=4 should yield 4 shards"
);
assert_eq!(r.n_docs_total(), 24);
}
/// With a 1 MB commit threshold, appending a 50,000-row batch bumps the
/// manifest id to 1 and empties the buffer; a following commit does not
/// change the manifest id.
#[test]
fn append_auto_flushes_when_buffer_crosses_threshold() {
let opts = options_id_title_serial().with_commit_threshold_size_mb(1);
let st = Supertable::create(opts).expect("create");
let mut w = st.writer().expect("writer");
let batch = build_simple_batch(0, 50_000);
w.append(&batch).expect("append");
assert_eq!(st.manifest_id(), 1, "auto-flush should fire");
assert_eq!(w.buffered_batches(), 0, "buffer drained on auto-flush");
w.commit().expect("commit-empty");
assert_eq!(st.manifest_id(), 1);
}
/// With a commit threshold of 0, the same 50,000-row append leaves the
/// manifest id at 0 and the batch still buffered.
#[test]
fn append_does_not_auto_flush_when_threshold_zero() {
let opts = options_id_title_serial().with_commit_threshold_size_mb(0);
let st = Supertable::create(opts).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_simple_batch(0, 50_000)).expect("append");
assert_eq!(st.manifest_id(), 0, "no auto-flush at threshold=0");
assert!(w.buffered_batches() > 0);
}
/// Ignored by default. Times 100 `Supertable::append` calls of 64 rows each
/// against a local-filesystem provider and asserts that the average of the
/// last five latencies is at most 2x the average of the first five.
#[ignore = "known O(n) regression: manifest part rewrite on every commit"]
#[test]
fn commit_latency_is_constant_with_localfs() {
const N: usize = 100;
const DOCS_PER_COMMIT: usize = 64;
const MAX_GROWTH_FACTOR: f64 = 2.0;
let dir = TempDir::new().expect("tempdir");
let storage = Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let opts = options_id_title_serial().with_storage(storage);
let st = Supertable::create(opts).expect("create");
let mut latencies_ms: Vec<u128> = Vec::with_capacity(N);
for i in 0..N {
let batch = build_simple_batch(i as u64, DOCS_PER_COMMIT);
let t0 = Instant::now();
st.append(&batch).expect("append");
latencies_ms.push(t0.elapsed().as_millis());
}
let avg = |slice: &[u128]| slice.iter().sum::<u128>() as f64 / slice.len() as f64;
let first5_avg = avg(&latencies_ms[..5]);
let last5_avg = avg(&latencies_ms[N - 5..]);
// Clamp the denominator to 1 ms so sub-millisecond baselines do not
// inflate the ratio or divide by zero.
let ratio = last5_avg / first5_avg.max(1.0);
println!(
"first-5 avg: {first5_avg:.1}ms last-5 avg: {last5_avg:.1}ms ratio: {ratio:.1}x"
);
assert!(
ratio <= MAX_GROWTH_FACTOR,
"commit latency grew {ratio:.1}x from first-5 ({first5_avg:.1}ms) to \
last-5 ({last5_avg:.1}ms) — O(n) growth in manifest rewrite path"
);
}
/// Three append+commit rounds (2, 3 and 1 rows) end at manifest id 3 with
/// 3 superfiles and 6 docs.
#[test]
fn each_commit_appends_to_existing_superfiles() {
let st = Supertable::create(options_id_title_serial()).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_simple_batch(0, 2)).expect("a1");
w.commit().expect("c1");
w.append(&build_simple_batch(10, 3)).expect("a2");
w.commit().expect("c2");
w.append(&build_simple_batch(20, 1)).expect("a3");
w.commit().expect("commit");
let r = st.reader();
assert_eq!(r.manifest_id(), 3);
assert_eq!(r.n_superfiles(), 3);
assert_eq!(r.n_docs_total(), 6);
}
/// `merge_ranges` on unsorted input returns `[(10, 20), (100, 10)]`.
/// NOTE(review): the expected output is consistent with pairs being
/// `(offset, len)` — 10..20, 15..25 and 25..30 merge into 10..30 and the
/// zero-length pair is dropped; confirm against `merge_ranges`.
#[test]
fn merge_ranges_coalesces_overlapping_and_adjacent_drops_empty() {
let input = vec![
(100u64, 10u64), (0, 0), (10, 10), (15, 10), (25, 5), ];
let merged = merge_ranges(input);
assert_eq!(merged, vec![(10, 20), (100, 10)]);
}
/// `merge_ranges` on an empty vector returns an empty result.
#[test]
fn merge_ranges_empty_input_is_empty() {
assert!(merge_ranges(Vec::new()).is_empty());
}
/// After committing 8 rows, the entry's subsection offsets have a positive
/// `total_size`, an FTS subsection, and non-empty FTS open ranges; the
/// superfile reader reports 8 docs.
#[test]
fn build_subsection_offsets_captures_total_size_and_fts_range() {
let st = Supertable::create(options_id_title_serial()).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_simple_batch(0, 8)).expect("append");
w.commit().expect("commit");
let r = st.reader();
let seg = &r.manifest().superfiles[0];
let store = &st.options().store;
let reader = store.reader(&seg.uri).expect("reader");
let offsets = seg
.subsection_offsets
.as_ref()
.expect("offsets captured at commit");
assert!(offsets.total_size > 0);
assert!(
offsets.fts.is_some(),
"an FTS superfile must record an FTS subsection"
);
assert!(
!offsets.fts_open_ranges.is_empty(),
"FTS open ranges should be populated for the cold-open fast path"
);
assert_eq!(reader.n_docs(), 8);
}
/// `build_subsection_offsets` returns `None` for bytes that are not a valid
/// superfile.
#[test]
fn build_subsection_offsets_on_garbage_returns_none() {
let garbage = Bytes::from_static(b"not a parquet file at all");
assert!(build_subsection_offsets(&garbage).is_none());
}
/// Appending a vector batch makes `buffered_bytes()` positive, and the
/// commit publishes one superfile with 8 docs.
#[test]
fn append_with_vector_column_publishes_superfile() {
let dim = 16;
let st = Supertable::create(options_with_vector(dim)).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_vector_batch(0, 8, dim)).expect("append");
assert!(
w.buffered_bytes() > 0,
"buffered_bytes must account for the vector payload"
);
w.commit().expect("commit");
let r = st.reader();
assert_eq!(r.n_superfiles(), 1);
assert_eq!(r.n_docs_total(), 8);
}
/// Creates a serial-pool supertable backed by a `LocalFsStorageProvider`
/// rooted at `dir`.
fn storage_backed_st(dir: &TempDir) -> Supertable {
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
Supertable::create(options_id_title_serial().with_storage(storage)).expect("create")
}
/// Builds a one-row batch whose `title` is `title`.
fn row(title: &str) -> RecordBatch {
RecordBatch::try_new(
schema_id_title(),
vec![Arc::new(LargeStringArray::from(vec![title]))],
)
.expect("row batch")
}
/// Deleting by a predicate that matches the single appended row reports one
/// match and one tombstone.
#[test]
fn delete_tombstones_matching_row() {
use datafusion::prelude::{col, lit};
let dir = TempDir::new().expect("tempdir");
let st = storage_backed_st(&dir);
st.append(&build_simple_batch(0, 1)).expect("append");
let stats = st
.delete(col("title").eq(lit("doc 0 alpha")))
.expect("delete");
assert_eq!(stats.matched(), 1);
assert_eq!(stats.n_tombstoned(), 1);
}
/// Deleting by a predicate that matches nothing succeeds with zero matches
/// and zero tombstones.
#[test]
fn delete_unmatched_predicate_is_noop() {
use datafusion::prelude::{col, lit};
let dir = TempDir::new().expect("tempdir");
let st = storage_backed_st(&dir);
st.append(&build_simple_batch(0, 1)).expect("append");
let stats = st
.delete(col("title").eq(lit("no such title")))
.expect("delete");
assert_eq!(stats.matched(), 0);
assert_eq!(stats.n_tombstoned(), 0);
}
/// Updating the single matching row with a one-row replacement reports one
/// match and one tombstone.
#[test]
fn update_replaces_matching_row() {
use datafusion::prelude::{col, lit};
let dir = TempDir::new().expect("tempdir");
let st = storage_backed_st(&dir);
st.append(&row("draft")).expect("append");
let stats = st
.update(col("title").eq(lit("draft")), &row("published"))
.expect("update");
assert_eq!(stats.matched(), 1);
assert_eq!(stats.n_tombstoned(), 1);
}
/// Updating one matched row with a two-row batch fails with
/// `MutationError::CardinalityMismatch { matched: 1, new_rows: 2 }`.
#[test]
fn update_cardinality_mismatch_is_rejected() {
use datafusion::prelude::{col, lit};
let dir = TempDir::new().expect("tempdir");
let st = storage_backed_st(&dir);
st.append(&row("draft")).expect("append");
let two = RecordBatch::try_new(
schema_id_title(),
vec![Arc::new(LargeStringArray::from(vec!["a", "b"]))],
)
.expect("two-row batch");
let mut w = st.writer().expect("writer");
let err = w
.update(col("title").eq(lit("draft")), two)
.expect_err("cardinality mismatch");
assert!(
matches!(
err,
MutationError::CardinalityMismatch {
matched: 1,
new_rows: 2
}
),
"{err:?}"
);
}
/// `update` on a supertable created without storage fails with
/// `MutationError::NoStorageAttached`.
#[test]
fn update_without_storage_is_rejected() {
use datafusion::prelude::{col, lit};
let st = Supertable::create(options_id_title_serial()).expect("create");
let mut w = st.writer().expect("writer");
let err = w
.update(col("title").eq(lit("x")), row("y"))
.expect_err("no storage");
assert!(matches!(err, MutationError::NoStorageAttached), "{err:?}");
}
/// `delete` on a supertable created without storage fails with
/// `MutationError::NoStorageAttached`.
#[test]
fn delete_without_storage_is_rejected() {
use datafusion::prelude::{col, lit};
let st = Supertable::create(options_id_title_serial()).expect("create");
let mut w = st.writer().expect("writer");
let err = w.delete(col("title").eq(lit("x"))).expect_err("no storage");
assert!(matches!(err, MutationError::NoStorageAttached), "{err:?}");
}
/// `buffered_bytes()` / `buffered_batches()` start at zero, grow after an
/// append, and return to zero after a commit.
#[test]
fn buffered_bytes_grows_then_resets_on_commit() {
let st = Supertable::create(options_id_title_serial()).expect("create");
let mut w = st.writer().expect("writer");
assert_eq!(w.buffered_bytes(), 0);
w.append(&build_simple_batch(0, 4)).expect("append");
assert!(w.buffered_bytes() > 0, "buffer cost recorded");
assert_eq!(w.buffered_batches(), 1);
w.commit().expect("commit");
assert_eq!(w.buffered_bytes(), 0, "buffer drained on commit");
assert_eq!(w.buffered_batches(), 0);
}
}