pub struct CompactionExecutor { /* private fields */ }
Executes compaction plans: reads N small files, merges them into a single AI-Lake file with a rebuilt index, and commits to the catalog.
The index algorithm is chosen via CompactionIndexStrategy (default: Auto,
which detects GPU / CPU cores at compaction time — the same heuristic used
by write_batch_auto).
For large tables use compact_deferred / run_deferred: the merged Parquet
is persisted immediately and the HNSW build runs in a background Tokio task,
decoupling I/O cost from CPU cost.
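The split between I/O cost and CPU cost can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: `std::thread::spawn` stands in for a Tokio task, and `IndexStatus`, `MergedFile` and `compact_deferred_sketch` are hypothetical names. The merged rows are available as soon as the function returns. The "index build" (here only a node count) finishes later and flips the shared status to `Ready`.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for the catalog's per-file index status.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum IndexStatus {
    Indexing,
    Ready,
}

/// Hypothetical merged-file record: rows are "persisted" up front and the
/// status is shared with the background builder.
pub struct MergedFile {
    pub rows: Arc<Vec<Vec<f32>>>,
    pub status: Arc<Mutex<IndexStatus>>,
}

/// Persist the merged rows immediately (the I/O half), then hand the index
/// build (the CPU half) to a background thread standing in for a Tokio task.
pub fn compact_deferred_sketch(rows: Vec<Vec<f32>>) -> (MergedFile, JoinHandle<usize>) {
    let rows = Arc::new(rows);
    let status = Arc::new(Mutex::new(IndexStatus::Indexing));

    let (bg_rows, bg_status) = (Arc::clone(&rows), Arc::clone(&status));
    let handle = thread::spawn(move || {
        // Placeholder for the HNSW / IVF-PQ build: we only count the nodes.
        let nodes_indexed = bg_rows.len();
        *bg_status.lock().unwrap() = IndexStatus::Ready;
        nodes_indexed
    });

    (MergedFile { rows, status }, handle)
}
```

Because the status lives behind `Arc<Mutex<_>>`, readers can check it at any time without waiting for the builder thread.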
Implementations
impl CompactionExecutor
pub fn new(store: Arc<dyn Store>, policy: VectorStoragePolicy) -> Self
pub fn with_index_strategy(self, strategy: CompactionIndexStrategy) -> Self
Override the default (Auto) index strategy for this executor.
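`with_index_strategy` takes `self` by value and returns `Self`, the usual consuming-builder shape. A minimal sketch of that pattern, assuming a hypothetical `IndexStrategy` enum (only `Auto` is named on this page; `Hnsw` and `IvfPq` are illustrative) and a stripped-down `Executor` in place of `CompactionExecutor`:

```rust
/// Hypothetical mirror of CompactionIndexStrategy; only `Auto` is documented.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum IndexStrategy {
    #[default]
    Auto,
    Hnsw,
    IvfPq,
}

/// Hypothetical executor holding nothing but the chosen strategy.
#[derive(Debug, Default)]
pub struct Executor {
    strategy: IndexStrategy,
}

impl Executor {
    /// Starts with the default strategy, `Auto`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Consuming builder: takes `self` by value and returns it, so calls chain.
    pub fn with_index_strategy(mut self, strategy: IndexStrategy) -> Self {
        self.strategy = strategy;
        self
    }

    pub fn strategy(&self) -> IndexStrategy {
        self.strategy
    }
}
```

Taking `self` by value lets the override be chained directly onto the constructor without a separate mutable binding.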
pub async fn compact(&self, files: &[DataFileEntry], output_path: &str) -> AilakeResult<DataFileEntry>
Merge files into a single new file at output_path.
Reads all input files in parallel to minimise S3 latency, then
rebuilds the HNSW / IVF-PQ index synchronously. For very large merges
(N > 100 000 vectors) prefer compact_deferred, which offloads the
index build to a background Tokio task.
Returns the DataFileEntry for the merged file.
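The parallel-read step can be sketched with scoped threads. This assumes a hypothetical `read_file` in place of an S3 fetch and plain strings in place of record batches; the real executor presumably uses async I/O rather than OS threads. Collecting the handles into a `Vec` before joining is what makes the reads overlap, and joining in input order keeps the merged row order deterministic.

```rust
use std::thread;

/// Hypothetical reader standing in for fetching one data file from S3.
fn read_file(path: &str) -> Vec<String> {
    (0..2).map(|i| format!("{path}#{i}")).collect()
}

/// Read every input concurrently, then concatenate in input order so the
/// merged row order does not depend on which read finishes first.
pub fn read_all_parallel(paths: &[&str]) -> Vec<String> {
    thread::scope(|s| {
        // Spawn all readers first (collect forces this), so they run concurrently.
        let handles: Vec<_> = paths
            .iter()
            .map(|p| s.spawn(move || read_file(p)))
            .collect();
        // Join in the original order and flatten the per-file rows.
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("reader thread panicked"))
            .collect()
    })
}
```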
pub async fn compact_incremental(&self, files: &[DataFileEntry], output_path: &str) -> AilakeResult<DataFileEntry>
Merge files into a single new file using incremental HNSW insertion.
Identifies the dominant file — the file holding >= 40 % of the total
row count — loads its existing HNSW graph from the AILK section, then
calls HnswIndex::insert_node for every vector from the remaining files.
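Dominant-file selection reduces to a threshold test on row counts. In this sketch `dominant_file` is a hypothetical helper; when two files both clear 40 % (e.g. a 40 / 40 / 20 split) it picks the largest, which is an assumption about tie handling rather than documented behaviour. Comparing `rows * 100 >= total * 40` in integers avoids floating-point rounding exactly at the boundary.

```rust
/// Return the index of the file holding at least 40 % of all rows, if any.
/// Assumption: when several files qualify, the one with the most rows wins.
pub fn dominant_file(row_counts: &[u64]) -> Option<usize> {
    let total: u64 = row_counts.iter().sum();
    if total == 0 {
        return None;
    }
    row_counts
        .iter()
        .enumerate()
        .max_by_key(|&(_, &rows)| rows)
        // Integer form of rows / total >= 0.40.
        .filter(|&(_, &rows)| rows * 100 >= total * 40)
        .map(|(i, _)| i)
}
```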
Complexity vs compact:
- Full rebuild: O(N log N), where N = total rows.
- Incremental (this method): O(N_dom) to deserialize the dominant graph plus O(N_small × log N_dom) to insert the remaining vectors, where N_small = N - N_dom. For a 90 / 10 split (N = 1 M, N_dom = 900 k) the estimated speedup is ~7×.
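The ~7× estimate can be reproduced from the two cost expressions by taking unit constants and base-2 logarithms. Both are simplifying assumptions; real constant factors for deserialization and insertion differ. `estimated_speedup` is a hypothetical helper:

```rust
/// Unit-cost model from the complexity notes (constant factors ignored):
/// full rebuild ~ N log2 N; incremental ~ N_dom + N_small * log2 N_dom.
pub fn estimated_speedup(n_total: f64, n_dom: f64) -> f64 {
    let n_small = n_total - n_dom;
    let full = n_total * n_total.log2();
    let incremental = n_dom + n_small * n_dom.log2();
    full / incremental
}
```

With N = 1 M and N_dom = 900 k the ratio comes out at roughly 6.9, consistent with the ~7× figure.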
Fallbacks (all degrade gracefully to compact):
- No file holds >= 40 % of rows.
- The dominant file's HNSW graph cannot be loaded (it uses IVF-PQ, is still IndexStatus::Indexing, or is corrupt).
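The fallback rule can be written as a small decision function. `GraphLoad`, `Plan` and `choose_plan` are hypothetical; the `load` closure stands in for reading the AILK section, and every outcome other than a successfully loaded HNSW graph routes to the full rebuild.

```rust
/// Hypothetical outcome of trying to load the dominant file's HNSW graph.
#[derive(Debug, PartialEq, Eq)]
pub enum GraphLoad {
    Loaded,
    IvfPq,    // file is indexed with IVF-PQ, not HNSW
    Indexing, // background build has not finished yet
    Corrupt,  // AILK section failed to deserialize
}

#[derive(Debug, PartialEq, Eq)]
pub enum Plan {
    Incremental { dominant: usize },
    FullRebuild,
}

/// Every failure path degrades to the full rebuild used by `compact`.
pub fn choose_plan(dominant: Option<usize>, load: impl FnOnce(usize) -> GraphLoad) -> Plan {
    match dominant {
        Some(i) => match load(i) {
            GraphLoad::Loaded => Plan::Incremental { dominant: i },
            _ => Plan::FullRebuild,
        },
        // No file reaches the 40 % threshold.
        None => Plan::FullRebuild,
    }
}
```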
RowId contract: the dominant file's vectors are placed first in the merged Parquet (positions 0 to N_dom - 1), followed by the rows of the other files. RowIds stored in the dominant file's HNSW graph therefore remain valid, and newly inserted nodes receive RowIds N_dom to N - 1.
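A sketch of the RowId layout, assuming the non-dominant files keep their input order (the docs only say they follow the dominant file). `merged_layout` is a hypothetical helper that returns each input file's index together with the RowId range its rows occupy:

```rust
use std::ops::Range;

/// Place the dominant file first, then the others in input order (assumed),
/// and report the half-open RowId range each file occupies in the merge.
pub fn merged_layout(row_counts: &[u64], dominant: usize) -> Vec<(usize, Range<u64>)> {
    let order = std::iter::once(dominant)
        .chain((0..row_counts.len()).filter(|&i| i != dominant));
    let mut next = 0u64;
    order
        .map(|i| {
            let start = next;
            next += row_counts[i];
            (i, start..next)
        })
        .collect()
}
```

For row counts `[100, 900, 50]` with file 1 dominant, the ranges are `0..900`, `900..1000` and `1000..1050`, so every RowId already present in file 1's graph is unchanged.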
pub async fn compact_deferred(&self, files: &[DataFileEntry], output_path: &str, catalog: Arc<dyn CatalogProvider>, table: &TableIdent) -> AilakeResult<DataFileEntry>
Merge files into a single new file at output_path, writing Parquet
immediately and building the HNSW / IVF-PQ index in a background Tokio task.
The merged file appears in the catalog as IndexStatus::Indexing until
the background task completes; queries fall back to flat scan during that
window (same behaviour as write_batch_deferred).
Returns the DataFileEntry with IndexStatus::Indexing. The entry
transitions to Ready automatically when the background build finishes.
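While a file is `IndexStatus::Indexing`, a flat scan means comparing the query against every row. A minimal sketch of that routing, with hypothetical `flat_scan` and `search` helpers, squared L2 distance as an assumed metric, and the `ann` closure standing in for the real HNSW / IVF-PQ lookup:

```rust
/// Hypothetical stand-in for the catalog's index status.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum IndexStatus {
    Indexing,
    Ready,
}

/// Exact nearest neighbour by brute force (squared L2): no index is used,
/// every row is compared with the query.
pub fn flat_scan(rows: &[Vec<f32>], query: &[f32]) -> Option<usize> {
    rows.iter()
        .enumerate()
        .map(|(i, r)| {
            let d: f32 = r.iter().zip(query).map(|(a, b)| (a - b) * (a - b)).sum();
            (i, d)
        })
        .min_by(|a, b| a.1.total_cmp(&b.1))
        .map(|(i, _)| i)
}

/// Route a query: use the ANN index only once the file is Ready.
pub fn search(
    status: IndexStatus,
    rows: &[Vec<f32>],
    query: &[f32],
    ann: impl Fn(&[f32]) -> Option<usize>,
) -> Option<usize> {
    match status {
        IndexStatus::Indexing => flat_scan(rows, query),
        IndexStatus::Ready => ann(query),
    }
}
```

The flat scan is exact but linear in the row count, which is why it is only a stopgap until the background build completes.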
pub async fn run(&self, planner: &CompactionPlanner, table: &TableIdent, catalog: Arc<dyn CatalogProvider>, output_prefix: &str) -> AilakeResult<Option<DataFileEntry>>
Full compaction workflow: plan, compact (synchronous HNSW rebuild), drop old files from catalog, commit.
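The catalog side of `run` (drop the replaced files, add the merged one, commit) can be modelled as building the new file set and swapping it in as a single step. This is a hypothetical in-memory sketch: `Catalog` and `run_sketch` are illustrative names, the merge itself is elided, and treating an empty plan as `None` is an assumption about what the `Option` in `run`'s return type signals.

```rust
use std::collections::BTreeSet;

/// Hypothetical in-memory catalog: just the set of live file paths.
#[derive(Debug, Default)]
pub struct Catalog {
    pub files: BTreeSet<String>,
}

/// plan -> (merge elided) -> drop old files -> commit.
/// Assumption: an empty plan means "nothing to compact" and yields None.
pub fn run_sketch(catalog: &mut Catalog, plan: &[&str], output: &str) -> Option<String> {
    if plan.is_empty() {
        return None;
    }
    // Build the next snapshot off to the side, mirroring an atomic commit:
    // the old inputs and the merged output are never both live or both gone.
    let mut next = catalog.files.clone();
    for path in plan {
        next.remove(*path);
    }
    next.insert(output.to_string());
    catalog.files = next;
    Some(output.to_string())
}
```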
pub async fn run_deferred(&self, planner: &CompactionPlanner, table: &TableIdent, catalog: Arc<dyn CatalogProvider>, output_prefix: &str) -> AilakeResult<Option<DataFileEntry>>
Full compaction workflow with deferred HNSW build: plan, write merged
Parquet immediately, commit as Indexing, spawn background index build.
Use this for large tables where an inline HNSW rebuild would block for too long.
Auto Trait Implementations
impl !RefUnwindSafe for CompactionExecutor
impl !UnwindSafe for CompactionExecutor
impl Freeze for CompactionExecutor
impl Send for CompactionExecutor
impl Sync for CompactionExecutor
impl Unpin for CompactionExecutor
impl UnsafeUnpin for CompactionExecutor
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.