Skip to main content

ailake_query/
compaction.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2use std::sync::Arc;
3use tracing::{debug, error, info};
4
5use ailake_catalog::{
6    make_data_file_entry, make_data_file_entry_indexing, CatalogProvider, DataFileEntry,
7    NewSnapshot, SnapshotOperation, TableIdent, VectorIndexInfo,
8};
9use ailake_core::{AilakeResult, RowId, VectorStoragePolicy};
10use ailake_file::{AilakeFileReader, AilakeFileWriter};
11use ailake_store::Store;
12use ailake_vec::compute_centroid_and_radius;
13use arrow_array::RecordBatch;
14use arrow_schema::SchemaRef;
15use bytes::Bytes;
16use futures::future::try_join_all;
17
18use crate::writer::build_and_patch_index;
19
/// Index strategy for the merged file produced by compaction.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub enum CompactionIndexStrategy {
    /// Detect GPU / CPU cores at compaction time and pick the best index.
    /// IVF-PQ on GPU/many-core machines; HNSW elsewhere. (default)
    #[default]
    Auto,
    /// Always rebuild with HNSW — highest recall, larger index.
    ForceHnsw,
    /// Always rebuild with IVF-PQ — smaller index, better S3 throughput.
    ///
    /// Recommended for large compactions (N > 100 000) on CPU-only machines
    /// where HNSW rebuild cost becomes prohibitive.
    ForceIvfPq,
}

/// Tuning knobs for compaction planning.
///
/// `min_files_to_compact`, `target_file_size_bytes` and `max_files_per_pass`
/// drive `CompactionPlanner::plan`; see each field for details.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionConfig {
    /// Trigger compaction only if at least this many files are eligible.
    pub min_files_to_compact: usize,
    /// Target output file size in bytes. Files strictly below this are merge candidates.
    pub target_file_size_bytes: u64,
    /// Index algorithm for the merged output file.
    ///
    /// `CompactionPlanner::plan` does not read this field; pass it to the
    /// executor via `CompactionExecutor::with_index_strategy` to take effect.
    pub index_strategy: CompactionIndexStrategy,
    /// Maximum files merged in a single compaction pass.
    ///
    /// Candidates are sorted smallest-first; only the first `max_files_per_pass`
    /// are compacted each run. This bounds peak RAM and HNSW rebuild CPU cost —
    /// O(N log N) stays proportional to this limit rather than table size.
    /// Default: 20. Set to `usize::MAX` to compact all eligible files at once.
    pub max_files_per_pass: usize,
}

impl Default for CompactionConfig {
    /// At least 4 eligible files, a 128 MiB target size, `Auto` index selection
    /// and at most 20 files per pass.
    fn default() -> Self {
        Self {
            min_files_to_compact: 4,
            target_file_size_bytes: 128 * 1024 * 1024, // 128 MiB
            index_strategy: CompactionIndexStrategy::Auto,
            max_files_per_pass: 20,
        }
    }
}
63
/// How a compaction pass selects its input files.
///
/// NOTE(review): not referenced by the visible part of this file — confirm
/// whether any caller still uses it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionMode {
    /// Compact all files below the target size.
    Full,
    /// Compact only the smallest N files.
    Partial,
}
69
70pub struct CompactionPlanner {
71    config: CompactionConfig,
72}
73
74impl CompactionPlanner {
75    pub fn new(config: CompactionConfig) -> Self {
76        Self { config }
77    }
78
79    /// Select files to compact.
80    ///
81    /// Picks files smaller than `target_file_size_bytes`, sorts them smallest-first
82    /// (cheapest to read), and caps the selection at `max_files_per_pass`. This
83    /// tiered approach prevents a single pass from compacting the entire table into
84    /// memory when thousands of small files exist.
85    pub fn plan(&self, files: &[DataFileEntry]) -> Vec<DataFileEntry> {
86        let mut candidates: Vec<DataFileEntry> = files
87            .iter()
88            .filter(|f| f.file_size_bytes < self.config.target_file_size_bytes)
89            .cloned()
90            .collect();
91        if candidates.len() < self.config.min_files_to_compact {
92            debug!(
93                "ailake: compaction skipped — {} eligible files < min_files_to_compact={}",
94                candidates.len(),
95                self.config.min_files_to_compact
96            );
97            return vec![];
98        }
99        // Sort smallest-first so each pass handles the cheapest files first.
100        // This bounds peak RAM to max_files_per_pass * avg_small_file_size.
101        candidates.sort_unstable_by_key(|f| f.file_size_bytes);
102        candidates.truncate(self.config.max_files_per_pass);
103        let total_bytes: u64 = candidates.iter().map(|f| f.file_size_bytes).sum();
104        info!(
105            "ailake: compaction plan — {} files ({} bytes) → 1 merged file",
106            candidates.len(),
107            total_bytes
108        );
109        candidates
110    }
111}
112
113/// Executes compaction plans: reads N small files, merges them into a single
114/// AI-Lake file with a rebuilt index, and commits to the catalog.
115///
116/// The index algorithm is chosen via `CompactionIndexStrategy` (default: `Auto`,
117/// which detects GPU / CPU cores at compaction time — the same heuristic used
118/// by `write_batch_auto`).
119///
120/// For large tables use `compact_deferred` / `run_deferred`: the merged Parquet
121/// is persisted immediately and the HNSW build runs in a background Tokio task,
122/// decoupling I/O cost from CPU cost.
123pub struct CompactionExecutor {
124    store: Arc<dyn Store>,
125    policy: VectorStoragePolicy,
126    index_strategy: CompactionIndexStrategy,
127}
128
129impl CompactionExecutor {
130    pub fn new(store: Arc<dyn Store>, policy: VectorStoragePolicy) -> Self {
131        Self {
132            store,
133            policy,
134            index_strategy: CompactionIndexStrategy::Auto,
135        }
136    }
137
138    /// Override the default (Auto) index strategy for this executor.
139    pub fn with_index_strategy(mut self, strategy: CompactionIndexStrategy) -> Self {
140        self.index_strategy = strategy;
141        self
142    }
143
144    /// Read all input files in parallel, returning ordered (batch, embeddings) pairs.
145    async fn read_files_parallel(
146        &self,
147        files: &[DataFileEntry],
148    ) -> AilakeResult<Vec<(RecordBatch, Vec<Vec<f32>>)>> {
149        let futs = files.iter().map(|entry| {
150            let store = self.store.clone();
151            let path = entry.path.clone();
152            let column = self.policy.column_name.clone();
153            let dim = self.policy.dim;
154            async move {
155                let bytes: Bytes = store.get(&path).await?;
156                let reader = AilakeFileReader::new(bytes, &column, dim);
157                if !reader.is_ailake_file() {
158                    debug!("ailake: compaction skipping {} — not an AI-Lake file", path);
159                    return Ok::<Option<(RecordBatch, Vec<Vec<f32>>)>, ailake_core::AilakeError>(
160                        None,
161                    );
162                }
163                let pair = reader.read_parquet()?;
164                Ok(Some(pair))
165            }
166        });
167        let results = try_join_all(futs).await?;
168        Ok(results.into_iter().flatten().collect())
169    }
170
171    /// Merge `files` into a single new file at `output_path`.
172    ///
173    /// Reads all input files **in parallel** to minimise S3 latency, then
174    /// rebuilds the HNSW / IVF-PQ index synchronously. For very large merges
175    /// (N > 100 000 vectors) prefer `compact_deferred`, which offloads the
176    /// index build to a background Tokio task.
177    ///
178    /// Returns the DataFileEntry for the merged file.
179    pub async fn compact(
180        &self,
181        files: &[DataFileEntry],
182        output_path: &str,
183    ) -> AilakeResult<DataFileEntry> {
184        if files.is_empty() {
185            return Err(ailake_core::AilakeError::Catalog(
186                "compact: no files provided".into(),
187            ));
188        }
189
190        let pairs = self.read_files_parallel(files).await?;
191
192        if pairs.is_empty() {
193            return Err(ailake_core::AilakeError::Catalog(
194                "compact: no valid AI-Lake files in input".into(),
195            ));
196        }
197
198        let schema: SchemaRef = pairs[0].0.schema();
199        let (all_batches, all_embeddings): (Vec<_>, Vec<_>) = pairs.into_iter().unzip();
200        let all_embeddings: Vec<Vec<f32>> = all_embeddings.into_iter().flatten().collect();
201
202        // Concatenate all row groups into one batch
203        let merged_batch = concat_batches(schema, &all_batches)?;
204        let record_count = merged_batch.num_rows() as u64;
205
206        // Write merged file with adaptive index selection.
207        let writer = {
208            let base = AilakeFileWriter::new(self.policy.clone());
209            match &self.index_strategy {
210                CompactionIndexStrategy::Auto => base.with_auto_index(),
211                CompactionIndexStrategy::ForceHnsw => base,
212                CompactionIndexStrategy::ForceIvfPq => {
213                    let cfg = ailake_index::IvfPqConfig::for_dataset(
214                        self.policy.dim as usize,
215                        all_embeddings.len(),
216                    );
217                    base.with_ivf_pq(cfg)
218                }
219            }
220        };
221        let file_bytes = writer.write(&merged_batch, &all_embeddings)?;
222        let file_size = file_bytes.len() as u64;
223        self.store.put(output_path, file_bytes.clone()).await?;
224
225        // Compute centroid and HNSW offsets for catalog entry
226        let centroid = compute_centroid_and_radius(&all_embeddings, self.policy.metric);
227        let reader = AilakeFileReader::new(file_bytes, &self.policy.column_name, self.policy.dim);
228        let header = reader.read_header()?;
229        let ailk_start = reader.ailk_offset()?;
230
231        // Preserve row-ID continuity: merged file inherits the minimum first_row_id of
232        // its sources so commit_snapshot doesn't allocate fresh IDs and grow next_row_id.
233        let source_first_row_id = files.iter().filter_map(|f| f.first_row_id).min();
234
235        let mut entry = make_data_file_entry(
236            output_path,
237            record_count,
238            file_size,
239            &centroid,
240            VectorIndexInfo {
241                column: &self.policy.column_name,
242                dim: self.policy.dim,
243                hnsw_offset: ailk_start + header.hnsw_offset,
244                hnsw_len: header.hnsw_len,
245            },
246        );
247        entry.first_row_id = source_first_row_id;
248        Ok(entry)
249    }
250
251    /// Merge `files` into a single new file using incremental HNSW insertion.
252    ///
253    /// Identifies the **dominant file** — the file holding >= 40 % of the total
254    /// row count — loads its existing HNSW graph from the AILK section, then
255    /// calls `HnswIndex::insert_node` for every vector from the remaining files.
256    ///
257    /// **Complexity vs `compact`**:
258    /// - Full rebuild: O(N log N), N = total rows.
259    /// - Incremental (this method): O(N_dom) deserialization + O(N_small × log N_dom).
260    ///   For a 90 / 10 split (N = 1 M, N_dom = 900 k) the speedup is ~7×.
261    ///
262    /// **Fallbacks** (all degrade gracefully to `compact`):
263    /// - No file holds >= 40 % of rows.
264    /// - Dominant file's HNSW cannot be loaded (IVF-PQ, `IndexStatus::Indexing`, corrupt).
265    ///
266    /// **RowId contract**: dominant file's vectors are placed first in the merged
267    /// Parquet (positions 0..N_dom-1); other files follow. The existing RowIds from
268    /// the dominant HNSW remain valid; new nodes receive RowIds N_dom..N-1.
269    pub async fn compact_incremental(
270        &self,
271        files: &[DataFileEntry],
272        output_path: &str,
273    ) -> AilakeResult<DataFileEntry> {
274        const DOMINANT_RATIO: f64 = 0.40;
275
276        if files.is_empty() {
277            return Err(ailake_core::AilakeError::Catalog(
278                "compact_incremental: no files provided".into(),
279            ));
280        }
281
282        // Find the dominant file by record_count.
283        let total_rows: u64 = files.iter().map(|f| f.record_count).sum();
284        let dom_idx = files
285            .iter()
286            .enumerate()
287            .max_by_key(|(_, f)| f.record_count)
288            .map(|(i, _)| i)
289            .unwrap_or(0);
290        let dom_rows = files[dom_idx].record_count;
291
292        if (dom_rows as f64 / total_rows as f64) < DOMINANT_RATIO {
293            debug!(
294                "ailake: compact_incremental — no dominant file ({}/{} rows < {:.0}% threshold), \
295                 falling back to full rebuild",
296                dom_rows,
297                total_rows,
298                DOMINANT_RATIO * 100.0
299            );
300            return self.compact(files, output_path).await;
301        }
302
303        let column = self.policy.column_name.clone();
304        let dim = self.policy.dim;
305        let dom_path = files[dom_idx].path.clone();
306
307        // Read all files in parallel. Retain raw bytes only for the dominant file
308        // (needed to load its HNSW without a second round-trip).
309        let futs: Vec<_> = files
310            .iter()
311            .map(|entry| {
312                let store = self.store.clone();
313                let path = entry.path.clone();
314                let col = column.clone();
315                let is_dom = path == dom_path;
316                async move {
317                    let bytes: Bytes = store.get(&path).await?;
318                    let reader = AilakeFileReader::new(bytes.clone(), &col, dim);
319                    if !reader.is_ailake_file() {
320                        debug!(
321                            "ailake: compact_incremental skipping {} — not an AI-Lake file",
322                            path
323                        );
324                        return Ok::<
325                            Option<(RecordBatch, Vec<Vec<f32>>, bool, Option<Bytes>)>,
326                            ailake_core::AilakeError,
327                        >(None);
328                    }
329                    let (batch, vecs) = reader.read_parquet()?;
330                    let retained = if is_dom { Some(bytes) } else { None };
331                    Ok(Some((batch, vecs, is_dom, retained)))
332                }
333            })
334            .collect();
335
336        #[allow(clippy::type_complexity)]
337        let raw: Vec<(RecordBatch, Vec<Vec<f32>>, bool, Option<Bytes>)> =
338            try_join_all(futs).await?.into_iter().flatten().collect();
339
340        if raw.is_empty() {
341            return Err(ailake_core::AilakeError::Catalog(
342                "compact_incremental: no valid AI-Lake files in input".into(),
343            ));
344        }
345
346        // Separate dominant from others; dominant goes first in the merged file.
347        let mut dom_batch: Option<RecordBatch> = None;
348        let mut dom_vecs: Vec<Vec<f32>> = Vec::new();
349        let mut dom_bytes_found: Option<Bytes> = None;
350        let mut other_batches: Vec<RecordBatch> = Vec::new();
351        let mut other_vecs: Vec<Vec<f32>> = Vec::new();
352
353        for (batch, vecs, is_dom, retained) in raw {
354            if is_dom {
355                dom_batch = Some(batch);
356                dom_vecs = vecs;
357                dom_bytes_found = retained;
358            } else {
359                other_batches.push(batch);
360                other_vecs.extend(vecs);
361            }
362        }
363
364        let (dom_batch, dom_bytes) = match (dom_batch, dom_bytes_found) {
365            (Some(b), Some(byt)) => (b, byt),
366            _ => {
367                debug!(
368                    "ailake: compact_incremental — dominant file missing from read results, \
369                     falling back to full rebuild"
370                );
371                return self.compact(files, output_path).await;
372            }
373        };
374
375        // Load the dominant file's existing HNSW graph.
376        let dom_reader = AilakeFileReader::new(dom_bytes, &column, dim);
377        let mut hnsw = match dom_reader.load_index() {
378            Ok(idx) => idx,
379            Err(e) => {
380                debug!(
381                    "ailake: compact_incremental — cannot load dominant HNSW ({}), \
382                     falling back to full rebuild",
383                    e
384                );
385                return self.compact(files, output_path).await;
386            }
387        };
388
389        let dom_count = dom_batch.num_rows() as u64;
390
391        // Insert vectors from non-dominant files into the loaded graph.
392        // RowIds are assigned starting at dom_count to match positions in the merged Parquet.
393        for (j, vec) in other_vecs.iter().enumerate() {
394            hnsw.insert_node(RowId::new(dom_count + j as u64), vec.clone());
395        }
396        hnsw.quantize_to_f16();
397
398        // Assemble merged batch (dominant rows first) and all embeddings.
399        let schema: SchemaRef = dom_batch.schema();
400        let mut all_batches = vec![dom_batch];
401        all_batches.extend(other_batches);
402        let merged_batch = concat_batches(schema, &all_batches)?;
403        let record_count = merged_batch.num_rows() as u64;
404
405        let mut all_embeddings = dom_vecs;
406        all_embeddings.extend(other_vecs);
407
408        // Write the merged file using the pre-built index (no rebuild).
409        let writer = AilakeFileWriter::new(self.policy.clone());
410        let file_bytes = writer.write_with_prebuilt_hnsw(&merged_batch, &all_embeddings, &hnsw)?;
411        let file_size = file_bytes.len() as u64;
412        self.store.put(output_path, file_bytes.clone()).await?;
413
414        let centroid = compute_centroid_and_radius(&all_embeddings, self.policy.metric);
415        let reader = AilakeFileReader::new(file_bytes, &self.policy.column_name, self.policy.dim);
416        let header = reader.read_header()?;
417        let ailk_start = reader.ailk_offset()?;
418
419        // Dominant file goes first in the merged output, so the merged file's first
420        // logical row was the dominant file's first row.  Use its first_row_id so
421        // commit_snapshot doesn't grow next_row_id unnecessarily.
422        let source_first_row_id = files[dom_idx].first_row_id;
423
424        let mut entry = make_data_file_entry(
425            output_path,
426            record_count,
427            file_size,
428            &centroid,
429            VectorIndexInfo {
430                column: &self.policy.column_name,
431                dim: self.policy.dim,
432                hnsw_offset: ailk_start + header.hnsw_offset,
433                hnsw_len: header.hnsw_len,
434            },
435        );
436        entry.first_row_id = source_first_row_id;
437
438        info!(
439            "ailake: compact_incremental — merged {} files into {} \
440             ({} rows from dominant + {} inserted incrementally)",
441            files.len(),
442            output_path,
443            dom_count,
444            record_count - dom_count
445        );
446
447        Ok(entry)
448    }
449
450    /// Merge `files` into a single new file at `output_path`, writing Parquet
451    /// immediately and building the HNSW / IVF-PQ index in a background Tokio task.
452    ///
453    /// The merged file appears in the catalog as `IndexStatus::Indexing` until
454    /// the background task completes; queries fall back to flat scan during that
455    /// window (same behaviour as `write_batch_deferred`).
456    ///
457    /// Returns the `DataFileEntry` with `IndexStatus::Indexing`. The entry
458    /// transitions to `Ready` automatically when the background build finishes.
459    pub async fn compact_deferred(
460        &self,
461        files: &[DataFileEntry],
462        output_path: &str,
463        catalog: Arc<dyn CatalogProvider>,
464        table: &TableIdent,
465    ) -> AilakeResult<DataFileEntry> {
466        if files.is_empty() {
467            return Err(ailake_core::AilakeError::Catalog(
468                "compact_deferred: no files provided".into(),
469            ));
470        }
471
472        let pairs = self.read_files_parallel(files).await?;
473
474        if pairs.is_empty() {
475            return Err(ailake_core::AilakeError::Catalog(
476                "compact_deferred: no valid AI-Lake files in input".into(),
477            ));
478        }
479
480        let schema: SchemaRef = pairs[0].0.schema();
481        let (all_batches, all_embeddings): (Vec<_>, Vec<_>) = pairs.into_iter().unzip();
482        let all_embeddings: Vec<Vec<f32>> = all_embeddings.into_iter().flatten().collect();
483
484        let merged_batch = concat_batches(schema, &all_batches)?;
485        let record_count = merged_batch.num_rows() as u64;
486
487        // Write Parquet-only immediately — fast path, no HNSW build.
488        let file_writer = AilakeFileWriter::new(self.policy.clone());
489        let parquet_bytes = file_writer.write_parquet_only(&merged_batch, &all_embeddings)?;
490        let file_size = parquet_bytes.len() as u64;
491        self.store.put(output_path, parquet_bytes).await?;
492
493        // Centroid available for geometric pruning during the build window.
494        let centroid = compute_centroid_and_radius(&all_embeddings, self.policy.metric);
495        let source_first_row_id = files.iter().filter_map(|f| f.first_row_id).min();
496        let mut entry = make_data_file_entry_indexing(
497            output_path,
498            record_count,
499            file_size,
500            &centroid,
501            &self.policy.column_name,
502            self.policy.dim,
503        );
504        entry.first_row_id = source_first_row_id;
505
506        // Spawn background index build; errors are logged, not propagated.
507        let store = self.store.clone();
508        let policy = self.policy.clone();
509        let table_id = table.clone();
510        let fp = output_path.to_string();
511        tokio::spawn(async move {
512            if let Err(e) = build_and_patch_index(store, catalog, policy, table_id, fp).await {
513                error!(
514                    "ailake: compaction deferred HNSW build failed — file indexed as \
515                     Parquet-only until next compaction rebuilds the index: {}",
516                    e
517                );
518            }
519        });
520
521        Ok(entry)
522    }
523
524    /// Full compaction workflow: plan, compact (synchronous HNSW rebuild),
525    /// drop old files from catalog, commit.
526    pub async fn run(
527        &self,
528        planner: &CompactionPlanner,
529        table: &TableIdent,
530        catalog: Arc<dyn CatalogProvider>,
531        output_prefix: &str,
532    ) -> AilakeResult<Option<DataFileEntry>> {
533        let all_files = catalog.list_files(table, None).await?;
534        let to_compact = planner.plan(&all_files);
535        if to_compact.is_empty() {
536            return Ok(None);
537        }
538
539        let ts = std::time::SystemTime::now()
540            .duration_since(std::time::UNIX_EPOCH)
541            .unwrap_or_else(|e| e.duration())
542            .as_millis();
543        let output_path = format!("{output_prefix}/compacted-{ts}.parquet");
544
545        // Use incremental merge when a dominant file exists (falls back to full rebuild automatically).
546        let merged = self.compact_incremental(&to_compact, &output_path).await?;
547
548        // Commit: add merged file, remove input files (via Replace snapshot)
549        let snapshot = NewSnapshot {
550            snapshot_id: ailake_catalog::new_snapshot_id(),
551            parent_snapshot_id: None,
552            files: vec![merged.clone()],
553            operation: SnapshotOperation::Replace,
554            iceberg_schema: None,
555            extra_properties: std::collections::HashMap::new(),
556            bloom_filters: vec![],
557            equality_delete_files: vec![],
558        };
559        catalog.commit_snapshot(table, snapshot).await?;
560
561        info!(
562            "ailake: compaction committed — merged {} files into {}",
563            to_compact.len(),
564            output_path
565        );
566
567        delete_old_files(&self.store, &to_compact).await;
568
569        Ok(Some(merged))
570    }
571
572    /// Full compaction workflow with deferred HNSW build: plan, write merged
573    /// Parquet immediately, commit as `Indexing`, spawn background index build.
574    ///
575    /// Use for large tables where inline HNSW rebuild blocks too long.
576    pub async fn run_deferred(
577        &self,
578        planner: &CompactionPlanner,
579        table: &TableIdent,
580        catalog: Arc<dyn CatalogProvider>,
581        output_prefix: &str,
582    ) -> AilakeResult<Option<DataFileEntry>> {
583        let all_files = catalog.list_files(table, None).await?;
584        let to_compact = planner.plan(&all_files);
585        if to_compact.is_empty() {
586            return Ok(None);
587        }
588
589        let ts = std::time::SystemTime::now()
590            .duration_since(std::time::UNIX_EPOCH)
591            .unwrap_or_else(|e| e.duration())
592            .as_millis();
593        let output_path = format!("{output_prefix}/compacted-{ts}.parquet");
594
595        let merged = self
596            .compact_deferred(&to_compact, &output_path, catalog.clone(), table)
597            .await?;
598
599        // Commit immediately: merged file in Indexing state replaces input files.
600        let snapshot = NewSnapshot {
601            snapshot_id: ailake_catalog::new_snapshot_id(),
602            parent_snapshot_id: None,
603            files: vec![merged.clone()],
604            operation: SnapshotOperation::Replace,
605            iceberg_schema: None,
606            extra_properties: std::collections::HashMap::new(),
607            bloom_filters: vec![],
608            equality_delete_files: vec![],
609        };
610        catalog.commit_snapshot(table, snapshot).await?;
611
612        info!(
613            "ailake: compaction committed (deferred) — merged {} files into {} \
614             (index building in background)",
615            to_compact.len(),
616            output_path
617        );
618
619        delete_old_files(&self.store, &to_compact).await;
620
621        Ok(Some(merged))
622    }
623}
624
625async fn delete_old_files(store: &Arc<dyn Store>, files: &[DataFileEntry]) {
626    for entry in files {
627        if let Err(e) = store.delete(&entry.path).await {
628            error!(
629                "ailake: compaction cleanup failed — could not delete {}: {} \
630                 (orphan file in object store after successful catalog commit; \
631                 delete manually to reclaim storage)",
632                entry.path, e
633            );
634        }
635    }
636}
637
638fn concat_batches(schema: SchemaRef, batches: &[RecordBatch]) -> AilakeResult<RecordBatch> {
639    arrow_select::concat::concat_batches(&schema, batches)
640        .map_err(|e| ailake_core::AilakeError::Arrow(e.to_string()))
641}
642
643#[cfg(test)]
644mod tests {
645    use super::*;
646    use ailake_catalog::IndexStatus;
647
648    #[test]
649    fn plan_returns_empty_if_too_few_files() {
650        let planner = CompactionPlanner::new(CompactionConfig {
651            min_files_to_compact: 4,
652            target_file_size_bytes: 1024 * 1024,
653            ..Default::default()
654        });
655        let files: Vec<DataFileEntry> = (0..3)
656            .map(|i| DataFileEntry {
657                path: format!("file-{i}.parquet"),
658                record_count: 10,
659                file_size_bytes: 100,
660                centroid_b64: None,
661                radius: None,
662                hnsw_offset: None,
663                hnsw_len: None,
664                vector_column: None,
665                vector_dim: None,
666                extra_vector_indexes: vec![],
667                index_status: IndexStatus::Ready,
668                batch_id: None,
669                embedding_model: None,
670                partition_value: None,
671                deletion_vector: None,
672                first_row_id: None,
673            })
674            .collect();
675        assert!(planner.plan(&files).is_empty());
676    }
677
678    #[test]
679    fn plan_selects_small_files() {
680        let planner = CompactionPlanner::new(CompactionConfig {
681            min_files_to_compact: 2,
682            target_file_size_bytes: 1000,
683            ..Default::default()
684        });
685        let files = vec![
686            DataFileEntry {
687                path: "small.parquet".into(),
688                record_count: 5,
689                file_size_bytes: 500,
690                centroid_b64: None,
691                radius: None,
692                hnsw_offset: None,
693                hnsw_len: None,
694                vector_column: None,
695                vector_dim: None,
696                extra_vector_indexes: vec![],
697                index_status: IndexStatus::Ready,
698                batch_id: None,
699                embedding_model: None,
700                partition_value: None,
701                deletion_vector: None,
702                first_row_id: None,
703            },
704            DataFileEntry {
705                path: "large.parquet".into(),
706                record_count: 5000,
707                file_size_bytes: 200_000_000,
708                centroid_b64: None,
709                radius: None,
710                hnsw_offset: None,
711                hnsw_len: None,
712                vector_column: None,
713                vector_dim: None,
714                extra_vector_indexes: vec![],
715                index_status: IndexStatus::Ready,
716                batch_id: None,
717                embedding_model: None,
718                partition_value: None,
719                deletion_vector: None,
720                first_row_id: None,
721            },
722            DataFileEntry {
723                path: "also-small.parquet".into(),
724                record_count: 5,
725                file_size_bytes: 800,
726                centroid_b64: None,
727                radius: None,
728                hnsw_offset: None,
729                hnsw_len: None,
730                vector_column: None,
731                vector_dim: None,
732                extra_vector_indexes: vec![],
733                index_status: IndexStatus::Ready,
734                batch_id: None,
735                embedding_model: None,
736                partition_value: None,
737                deletion_vector: None,
738                first_row_id: None,
739            },
740        ];
741        let selected = planner.plan(&files);
742        assert_eq!(selected.len(), 2);
743        assert!(selected.iter().any(|f| f.path == "small.parquet"));
744        assert!(selected.iter().any(|f| f.path == "also-small.parquet"));
745    }
746
747    #[test]
748    fn plan_respects_max_files_per_pass() {
749        let planner = CompactionPlanner::new(CompactionConfig {
750            min_files_to_compact: 2,
751            target_file_size_bytes: 1_000_000,
752            max_files_per_pass: 3,
753            ..Default::default()
754        });
755        let files: Vec<DataFileEntry> = (0..5)
756            .map(|i| DataFileEntry {
757                path: format!("f{i}.parquet"),
758                record_count: 10,
759                file_size_bytes: 100 + i as u64 * 100,
760                centroid_b64: None,
761                radius: None,
762                hnsw_offset: None,
763                hnsw_len: None,
764                vector_column: None,
765                vector_dim: None,
766                extra_vector_indexes: vec![],
767                index_status: IndexStatus::Ready,
768                batch_id: None,
769                embedding_model: None,
770                partition_value: None,
771                deletion_vector: None,
772                first_row_id: None,
773            })
774            .collect();
775        let selected = planner.plan(&files);
776        assert_eq!(selected.len(), 3);
777        assert_eq!(selected[0].file_size_bytes, 100);
778        assert_eq!(selected[1].file_size_bytes, 200);
779        assert_eq!(selected[2].file_size_bytes, 300);
780    }
781
782    #[test]
783    fn plan_sorts_smallest_first() {
784        let planner = CompactionPlanner::new(CompactionConfig {
785            min_files_to_compact: 2,
786            target_file_size_bytes: 10_000,
787            max_files_per_pass: 4,
788            ..Default::default()
789        });
790        let files = vec![
791            DataFileEntry {
792                path: "c.parquet".into(),
793                record_count: 1,
794                file_size_bytes: 300,
795                centroid_b64: None,
796                radius: None,
797                hnsw_offset: None,
798                hnsw_len: None,
799                vector_column: None,
800                vector_dim: None,
801                extra_vector_indexes: vec![],
802                index_status: IndexStatus::Ready,
803                batch_id: None,
804                embedding_model: None,
805                partition_value: None,
806                deletion_vector: None,
807                first_row_id: None,
808            },
809            DataFileEntry {
810                path: "a.parquet".into(),
811                record_count: 1,
812                file_size_bytes: 100,
813                centroid_b64: None,
814                radius: None,
815                hnsw_offset: None,
816                hnsw_len: None,
817                vector_column: None,
818                vector_dim: None,
819                extra_vector_indexes: vec![],
820                index_status: IndexStatus::Ready,
821                batch_id: None,
822                embedding_model: None,
823                partition_value: None,
824                deletion_vector: None,
825                first_row_id: None,
826            },
827            DataFileEntry {
828                path: "b.parquet".into(),
829                record_count: 1,
830                file_size_bytes: 200,
831                centroid_b64: None,
832                radius: None,
833                hnsw_offset: None,
834                hnsw_len: None,
835                vector_column: None,
836                vector_dim: None,
837                extra_vector_indexes: vec![],
838                index_status: IndexStatus::Ready,
839                batch_id: None,
840                embedding_model: None,
841                partition_value: None,
842                deletion_vector: None,
843                first_row_id: None,
844            },
845        ];
846        let selected = planner.plan(&files);
847        assert_eq!(selected[0].file_size_bytes, 100);
848        assert_eq!(selected[1].file_size_bytes, 200);
849        assert_eq!(selected[2].file_size_bytes, 300);
850    }
851
852    #[tokio::test]
853    async fn compact_merges_two_files() {
854        use ailake_core::{VectorMetric, VectorPrecision};
855        use ailake_store::LocalStore;
856        use arrow_array::{Int32Array, RecordBatch};
857        use arrow_schema::{DataType, Field, Schema};
858        use std::sync::Arc;
859        use tempfile::TempDir;
860
861        let dir = TempDir::new().unwrap();
862        let store = Arc::new(LocalStore::new(dir.path()));
863        let policy = VectorStoragePolicy {
864            column_name: "embedding".into(),
865            dim: 4,
866            metric: VectorMetric::Cosine,
867            precision: VectorPrecision::F16,
868            pq: None,
869            keep_raw_for_reranking: true,
870            pre_normalize: false,
871            hnsw_m: None,
872            hnsw_ef_construction: None,
873            ivf_residual: false,
874            embedding_model: None,
875            modality: None,
876            partition_by: None,
877            partition_value: None,
878            partition_column_type: None,
879            partition_fields: vec![],
880        };
881
882        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
883        let embs_a: Vec<Vec<f32>> = vec![vec![1.0, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]];
884        let embs_b: Vec<Vec<f32>> = vec![vec![0.0, 0.0, 1.0, 0.0], vec![0.0, 0.0, 0.0, 1.0]];
885
886        let batch_a = RecordBatch::try_new(
887            schema.clone(),
888            vec![Arc::new(Int32Array::from(vec![0i32, 1]))],
889        )
890        .unwrap();
891        let batch_b = RecordBatch::try_new(
892            schema.clone(),
893            vec![Arc::new(Int32Array::from(vec![2i32, 3]))],
894        )
895        .unwrap();
896
897        let writer_a = AilakeFileWriter::new(policy.clone());
898        let bytes_a = writer_a.write(&batch_a, &embs_a).unwrap();
899        let writer_b = AilakeFileWriter::new(policy.clone());
900        let bytes_b = writer_b.write(&batch_b, &embs_b).unwrap();
901
902        store.put("data/a.parquet", bytes_a.clone()).await.unwrap();
903        store.put("data/b.parquet", bytes_b.clone()).await.unwrap();
904
905        let entries = vec![
906            DataFileEntry {
907                path: "data/a.parquet".into(),
908                record_count: 2,
909                file_size_bytes: bytes_a.len() as u64,
910                centroid_b64: None,
911                radius: None,
912                hnsw_offset: None,
913                hnsw_len: None,
914                vector_column: None,
915                vector_dim: None,
916                extra_vector_indexes: vec![],
917                index_status: IndexStatus::Ready,
918                batch_id: None,
919                embedding_model: None,
920                partition_value: None,
921                deletion_vector: None,
922                first_row_id: None,
923            },
924            DataFileEntry {
925                path: "data/b.parquet".into(),
926                record_count: 2,
927                file_size_bytes: bytes_b.len() as u64,
928                centroid_b64: None,
929                radius: None,
930                hnsw_offset: None,
931                hnsw_len: None,
932                vector_column: None,
933                vector_dim: None,
934                extra_vector_indexes: vec![],
935                index_status: IndexStatus::Ready,
936                batch_id: None,
937                embedding_model: None,
938                partition_value: None,
939                deletion_vector: None,
940                first_row_id: None,
941            },
942        ];
943
944        let executor = CompactionExecutor::new(store.clone(), policy.clone());
945        let merged = executor
946            .compact(&entries, "data/merged.parquet")
947            .await
948            .unwrap();
949
950        assert_eq!(merged.record_count, 4);
951        assert_eq!(merged.path, "data/merged.parquet");
952
953        let merged_bytes = store.get("data/merged.parquet").await.unwrap();
954        let reader = AilakeFileReader::new(merged_bytes, "embedding", 4);
955        reader.verify_integrity().unwrap();
956        let (batch, embs) = reader.read_parquet().unwrap();
957        assert_eq!(batch.num_rows(), 4);
958        assert_eq!(embs.len(), 4);
959    }
960
961    #[tokio::test]
962    async fn compact_incremental_merges_dominant_plus_small() {
963        use ailake_core::{RowId, VectorMetric, VectorPrecision};
964        use ailake_store::LocalStore;
965        use arrow_array::{Int32Array, RecordBatch};
966        use arrow_schema::{DataType, Field, Schema};
967        use std::sync::Arc;
968        use tempfile::TempDir;
969
970        let dir = TempDir::new().unwrap();
971        let store = Arc::new(LocalStore::new(dir.path()));
972        let policy = VectorStoragePolicy {
973            column_name: "embedding".into(),
974            dim: 4,
975            metric: VectorMetric::Cosine,
976            precision: VectorPrecision::F16,
977            pq: None,
978            keep_raw_for_reranking: true,
979            pre_normalize: false,
980            hnsw_m: None,
981            hnsw_ef_construction: None,
982            ivf_residual: false,
983            embedding_model: None,
984            modality: None,
985            partition_by: None,
986            partition_value: None,
987            partition_column_type: None,
988            partition_fields: vec![],
989        };
990
991        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
992
993        // Dominant file: 6 rows (75% of total 8 rows — above 40% threshold).
994        let embs_dom: Vec<Vec<f32>> = vec![
995            vec![1.0, 0.0, 0.0, 0.0],
996            vec![0.0, 1.0, 0.0, 0.0],
997            vec![0.0, 0.0, 1.0, 0.0],
998            vec![0.7, 0.7, 0.0, 0.0],
999            vec![0.0, 0.7, 0.7, 0.0],
1000            vec![0.0, 0.0, 0.7, 0.7],
1001        ];
1002        let batch_dom = RecordBatch::try_new(
1003            schema.clone(),
1004            vec![Arc::new(Int32Array::from(vec![0i32, 1, 2, 3, 4, 5]))],
1005        )
1006        .unwrap();
1007
1008        // Small file: 2 rows.
1009        let embs_small: Vec<Vec<f32>> = vec![vec![0.0, 0.0, 0.0, 1.0], vec![0.5, 0.5, 0.5, 0.5]];
1010        let batch_small = RecordBatch::try_new(
1011            schema.clone(),
1012            vec![Arc::new(Int32Array::from(vec![6i32, 7]))],
1013        )
1014        .unwrap();
1015
1016        let bytes_dom = AilakeFileWriter::new(policy.clone())
1017            .write(&batch_dom, &embs_dom)
1018            .unwrap();
1019        let bytes_small = AilakeFileWriter::new(policy.clone())
1020            .write(&batch_small, &embs_small)
1021            .unwrap();
1022
1023        store
1024            .put("data/dominant.parquet", bytes_dom.clone())
1025            .await
1026            .unwrap();
1027        store
1028            .put("data/small.parquet", bytes_small.clone())
1029            .await
1030            .unwrap();
1031
1032        let entries = vec![
1033            DataFileEntry {
1034                path: "data/dominant.parquet".into(),
1035                record_count: 6,
1036                file_size_bytes: bytes_dom.len() as u64,
1037                centroid_b64: None,
1038                radius: None,
1039                hnsw_offset: None,
1040                hnsw_len: None,
1041                vector_column: None,
1042                vector_dim: None,
1043                extra_vector_indexes: vec![],
1044                index_status: IndexStatus::Ready,
1045                batch_id: None,
1046                embedding_model: None,
1047                partition_value: None,
1048                deletion_vector: None,
1049                first_row_id: None,
1050            },
1051            DataFileEntry {
1052                path: "data/small.parquet".into(),
1053                record_count: 2,
1054                file_size_bytes: bytes_small.len() as u64,
1055                centroid_b64: None,
1056                radius: None,
1057                hnsw_offset: None,
1058                hnsw_len: None,
1059                vector_column: None,
1060                vector_dim: None,
1061                extra_vector_indexes: vec![],
1062                index_status: IndexStatus::Ready,
1063                batch_id: None,
1064                embedding_model: None,
1065                partition_value: None,
1066                deletion_vector: None,
1067                first_row_id: None,
1068            },
1069        ];
1070
1071        let executor = CompactionExecutor::new(store.clone(), policy.clone());
1072        let merged = executor
1073            .compact_incremental(&entries, "data/merged.parquet")
1074            .await
1075            .unwrap();
1076
1077        // Structural checks.
1078        assert_eq!(merged.record_count, 8);
1079        assert_eq!(merged.path, "data/merged.parquet");
1080
1081        // Load merged file and verify it's a valid AI-Lake file.
1082        let merged_bytes = store.get("data/merged.parquet").await.unwrap();
1083        let reader = AilakeFileReader::new(merged_bytes, "embedding", 4);
1084        reader.verify_integrity().unwrap();
1085
1086        let (batch, embs) = reader.read_parquet().unwrap();
1087        assert_eq!(batch.num_rows(), 8);
1088        assert_eq!(embs.len(), 8);
1089
1090        // Dominant rows must come first (positions 0..5).
1091        for f in &embs[..6] {
1092            assert_eq!(f.len(), 4);
1093        }
1094
1095        // HNSW must be searchable and return the nearest neighbor for a known query.
1096        let hnsw = reader.load_index().unwrap();
1097        assert_eq!(hnsw.node_count(), 8);
1098
1099        // Query [1, 0, 0, 0] → nearest should be RowId 0 (embs_dom[0]).
1100        let results = hnsw.search(&[1.0, 0.0, 0.0, 0.0], 1, 50);
1101        assert_eq!(results[0].0, RowId::new(0));
1102
1103        // Query [0, 0, 0, 1] → nearest should be RowId 6 (first row of small file,
1104        // inserted at position 6 in the merged file).
1105        let results = hnsw.search(&[0.0, 0.0, 0.0, 1.0], 1, 50);
1106        assert_eq!(results[0].0, RowId::new(6));
1107    }
1108
1109    #[tokio::test]
1110    async fn compact_incremental_falls_back_when_no_dominant() {
1111        use ailake_core::{VectorMetric, VectorPrecision};
1112        use ailake_store::LocalStore;
1113        use arrow_array::{Int32Array, RecordBatch};
1114        use arrow_schema::{DataType, Field, Schema};
1115        use std::sync::Arc;
1116        use tempfile::TempDir;
1117
1118        let dir = TempDir::new().unwrap();
1119        let store = Arc::new(LocalStore::new(dir.path()));
1120        let policy = VectorStoragePolicy {
1121            column_name: "embedding".into(),
1122            dim: 4,
1123            metric: VectorMetric::Cosine,
1124            precision: VectorPrecision::F16,
1125            pq: None,
1126            keep_raw_for_reranking: true,
1127            pre_normalize: false,
1128            hnsw_m: None,
1129            hnsw_ef_construction: None,
1130            ivf_residual: false,
1131            embedding_model: None,
1132            modality: None,
1133            partition_by: None,
1134            partition_value: None,
1135            partition_column_type: None,
1136            partition_fields: vec![],
1137        };
1138
1139        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
1140
1141        // Two equal-sized files (50/50 split — no dominant, both below 40% threshold).
1142        let make_batch = |ids: Vec<i32>, embs: Vec<Vec<f32>>| {
1143            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(ids))])
1144                .unwrap();
1145            AilakeFileWriter::new(policy.clone())
1146                .write(&batch, &embs)
1147                .unwrap()
1148        };
1149
1150        let embs_a: Vec<Vec<f32>> = vec![vec![1.0, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]];
1151        let embs_b: Vec<Vec<f32>> = vec![vec![0.0, 0.0, 1.0, 0.0], vec![0.0, 0.0, 0.0, 1.0]];
1152        let bytes_a = make_batch(vec![0, 1], embs_a);
1153        let bytes_b = make_batch(vec![2, 3], embs_b);
1154
1155        store.put("data/a.parquet", bytes_a.clone()).await.unwrap();
1156        store.put("data/b.parquet", bytes_b.clone()).await.unwrap();
1157
1158        let entries = vec![
1159            DataFileEntry {
1160                path: "data/a.parquet".into(),
1161                record_count: 2,
1162                file_size_bytes: bytes_a.len() as u64,
1163                centroid_b64: None,
1164                radius: None,
1165                hnsw_offset: None,
1166                hnsw_len: None,
1167                vector_column: None,
1168                vector_dim: None,
1169                extra_vector_indexes: vec![],
1170                index_status: IndexStatus::Ready,
1171                batch_id: None,
1172                embedding_model: None,
1173                partition_value: None,
1174                deletion_vector: None,
1175                first_row_id: None,
1176            },
1177            DataFileEntry {
1178                path: "data/b.parquet".into(),
1179                record_count: 2,
1180                file_size_bytes: bytes_b.len() as u64,
1181                centroid_b64: None,
1182                radius: None,
1183                hnsw_offset: None,
1184                hnsw_len: None,
1185                vector_column: None,
1186                vector_dim: None,
1187                extra_vector_indexes: vec![],
1188                index_status: IndexStatus::Ready,
1189                batch_id: None,
1190                embedding_model: None,
1191                partition_value: None,
1192                deletion_vector: None,
1193                first_row_id: None,
1194            },
1195        ];
1196
1197        let executor = CompactionExecutor::new(store.clone(), policy.clone());
1198        // Should fall back to full rebuild without error.
1199        let merged = executor
1200            .compact_incremental(&entries, "data/merged.parquet")
1201            .await
1202            .unwrap();
1203
1204        assert_eq!(merged.record_count, 4);
1205
1206        let merged_bytes = store.get("data/merged.parquet").await.unwrap();
1207        let reader = AilakeFileReader::new(merged_bytes, "embedding", 4);
1208        reader.verify_integrity().unwrap();
1209    }
1210
1211    #[tokio::test]
1212    async fn compact_deferred_produces_parquet_only_file() {
1213        use ailake_catalog::HadoopCatalog;
1214        use ailake_core::{VectorMetric, VectorPrecision};
1215        use ailake_store::LocalStore;
1216        use arrow_array::{Int32Array, RecordBatch};
1217        use arrow_schema::{DataType, Field, Schema};
1218        use std::sync::Arc;
1219        use tempfile::TempDir;
1220
1221        let dir = TempDir::new().unwrap();
1222        let store = Arc::new(LocalStore::new(dir.path()));
1223        let catalog_dir = TempDir::new().unwrap();
1224        let catalog_store = Arc::new(LocalStore::new(catalog_dir.path()));
1225        let catalog = Arc::new(HadoopCatalog::new(catalog_store, ""));
1226        let table = TableIdent {
1227            namespace: "ns".into(),
1228            name: "tbl".into(),
1229        };
1230
1231        let policy = VectorStoragePolicy {
1232            column_name: "embedding".into(),
1233            dim: 4,
1234            metric: VectorMetric::Cosine,
1235            precision: VectorPrecision::F16,
1236            pq: None,
1237            keep_raw_for_reranking: true,
1238            pre_normalize: false,
1239            hnsw_m: None,
1240            hnsw_ef_construction: None,
1241            ivf_residual: false,
1242            embedding_model: None,
1243            modality: None,
1244            partition_by: None,
1245            partition_value: None,
1246            partition_column_type: None,
1247            partition_fields: vec![],
1248        };
1249
1250        use ailake_catalog::TableProperties;
1251        catalog
1252            .create_table(
1253                &table,
1254                &TableProperties {
1255                    policy: policy.clone(),
1256                    extra: std::collections::HashMap::new(),
1257                    format_version: 2,
1258                    partition_column_type: None,
1259                },
1260            )
1261            .await
1262            .unwrap();
1263
1264        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
1265        let embs_a: Vec<Vec<f32>> = vec![vec![1.0, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]];
1266        let batch_a = RecordBatch::try_new(
1267            schema.clone(),
1268            vec![Arc::new(Int32Array::from(vec![0i32, 1]))],
1269        )
1270        .unwrap();
1271        let bytes_a = AilakeFileWriter::new(policy.clone())
1272            .write(&batch_a, &embs_a)
1273            .unwrap();
1274        store.put("data/a.parquet", bytes_a.clone()).await.unwrap();
1275
1276        let embs_b: Vec<Vec<f32>> = vec![vec![0.0, 0.0, 1.0, 0.0], vec![0.0, 0.0, 0.0, 1.0]];
1277        let batch_b = RecordBatch::try_new(
1278            schema.clone(),
1279            vec![Arc::new(Int32Array::from(vec![2i32, 3]))],
1280        )
1281        .unwrap();
1282        let bytes_b = AilakeFileWriter::new(policy.clone())
1283            .write(&batch_b, &embs_b)
1284            .unwrap();
1285        store.put("data/b.parquet", bytes_b.clone()).await.unwrap();
1286
1287        let entries = vec![
1288            DataFileEntry {
1289                path: "data/a.parquet".into(),
1290                record_count: 2,
1291                file_size_bytes: bytes_a.len() as u64,
1292                centroid_b64: None,
1293                radius: None,
1294                hnsw_offset: None,
1295                hnsw_len: None,
1296                vector_column: None,
1297                vector_dim: None,
1298                extra_vector_indexes: vec![],
1299                index_status: IndexStatus::Ready,
1300                batch_id: None,
1301                embedding_model: None,
1302                partition_value: None,
1303                deletion_vector: None,
1304                first_row_id: None,
1305            },
1306            DataFileEntry {
1307                path: "data/b.parquet".into(),
1308                record_count: 2,
1309                file_size_bytes: bytes_b.len() as u64,
1310                centroid_b64: None,
1311                radius: None,
1312                hnsw_offset: None,
1313                hnsw_len: None,
1314                vector_column: None,
1315                vector_dim: None,
1316                extra_vector_indexes: vec![],
1317                index_status: IndexStatus::Ready,
1318                batch_id: None,
1319                embedding_model: None,
1320                partition_value: None,
1321                deletion_vector: None,
1322                first_row_id: None,
1323            },
1324        ];
1325
1326        let executor = CompactionExecutor::new(store.clone(), policy.clone());
1327        let entry = executor
1328            .compact_deferred(&entries, "data/merged.parquet", catalog.clone(), &table)
1329            .await
1330            .unwrap();
1331
1332        // Entry is Indexing — HNSW build pending in background
1333        assert_eq!(entry.index_status, IndexStatus::Indexing);
1334        assert_eq!(entry.record_count, 4);
1335
1336        // The written file must be valid Parquet (readable) even without HNSW
1337        let merged_bytes = store.get("data/merged.parquet").await.unwrap();
1338        let pq_reader = ailake_parquet::ParquetVectorReader::new(merged_bytes, "embedding");
1339        let count = pq_reader.record_count().unwrap();
1340        assert_eq!(count, 4);
1341    }
1342}