1use std::sync::Arc;
3use tracing::{debug, error, info};
4
5use ailake_catalog::{
6 make_data_file_entry, make_data_file_entry_indexing, CatalogProvider, DataFileEntry,
7 NewSnapshot, SnapshotOperation, TableIdent, VectorIndexInfo,
8};
9use ailake_core::{AilakeResult, RowId, VectorStoragePolicy};
10use ailake_file::{AilakeFileReader, AilakeFileWriter};
11use ailake_store::Store;
12use ailake_vec::compute_centroid_and_radius;
13use arrow_array::RecordBatch;
14use arrow_schema::SchemaRef;
15use bytes::Bytes;
16use futures::future::try_join_all;
17
18use crate::writer::build_and_patch_index;
19
/// Selects which vector index is built for the file produced by a full
/// compaction (`CompactionExecutor::compact`).
///
/// `PartialEq`/`Eq` are derived so strategies can be compared directly, e.g.
/// in tests and configuration checks.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum CompactionIndexStrategy {
    /// Let the file writer decide (`AilakeFileWriter::with_auto_index`).
    /// This is the default strategy.
    #[default]
    Auto,
    /// Use the file writer exactly as constructed from the storage policy,
    /// without any index override.
    // NOTE(review): the variant name implies the writer's stock index is HNSW — confirm.
    ForceHnsw,
    /// Build an IVF-PQ index whose configuration is derived from the vector
    /// dimension and the number of merged embeddings
    /// (`IvfPqConfig::for_dataset`).
    ForceIvfPq,
}
35
36#[derive(Debug, Clone)]
37pub struct CompactionConfig {
38 pub min_files_to_compact: usize,
40 pub target_file_size_bytes: u64,
42 pub index_strategy: CompactionIndexStrategy,
44 pub max_files_per_pass: usize,
51}
52
53impl Default for CompactionConfig {
54 fn default() -> Self {
55 Self {
56 min_files_to_compact: 4,
57 target_file_size_bytes: 128 * 1024 * 1024, index_strategy: CompactionIndexStrategy::Auto,
59 max_files_per_pass: 20,
60 }
61 }
62}
63
/// Compaction mode selector.
///
/// `PartialEq`/`Eq` are derived so modes can be compared directly.
// NOTE(review): this enum is not referenced in the visible part of the file;
// the exact meaning of each variant should be confirmed against its callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionMode {
    /// Full compaction.
    Full,
    /// Partial compaction.
    Partial,
}
69
70pub struct CompactionPlanner {
71 config: CompactionConfig,
72}
73
74impl CompactionPlanner {
75 pub fn new(config: CompactionConfig) -> Self {
76 Self { config }
77 }
78
79 pub fn plan(&self, files: &[DataFileEntry]) -> Vec<DataFileEntry> {
86 let mut candidates: Vec<DataFileEntry> = files
87 .iter()
88 .filter(|f| f.file_size_bytes < self.config.target_file_size_bytes)
89 .cloned()
90 .collect();
91 if candidates.len() < self.config.min_files_to_compact {
92 debug!(
93 "ailake: compaction skipped — {} eligible files < min_files_to_compact={}",
94 candidates.len(),
95 self.config.min_files_to_compact
96 );
97 return vec![];
98 }
99 candidates.sort_unstable_by_key(|f| f.file_size_bytes);
102 candidates.truncate(self.config.max_files_per_pass);
103 let total_bytes: u64 = candidates.iter().map(|f| f.file_size_bytes).sum();
104 info!(
105 "ailake: compaction plan — {} files ({} bytes) → 1 merged file",
106 candidates.len(),
107 total_bytes
108 );
109 candidates
110 }
111}
112
113pub struct CompactionExecutor {
124 store: Arc<dyn Store>,
125 policy: VectorStoragePolicy,
126 index_strategy: CompactionIndexStrategy,
127}
128
129impl CompactionExecutor {
130 pub fn new(store: Arc<dyn Store>, policy: VectorStoragePolicy) -> Self {
131 Self {
132 store,
133 policy,
134 index_strategy: CompactionIndexStrategy::Auto,
135 }
136 }
137
138 pub fn with_index_strategy(mut self, strategy: CompactionIndexStrategy) -> Self {
140 self.index_strategy = strategy;
141 self
142 }
143
144 async fn read_files_parallel(
146 &self,
147 files: &[DataFileEntry],
148 ) -> AilakeResult<Vec<(RecordBatch, Vec<Vec<f32>>)>> {
149 let futs = files.iter().map(|entry| {
150 let store = self.store.clone();
151 let path = entry.path.clone();
152 let column = self.policy.column_name.clone();
153 let dim = self.policy.dim;
154 async move {
155 let bytes: Bytes = store.get(&path).await?;
156 let reader = AilakeFileReader::new(bytes, &column, dim);
157 if !reader.is_ailake_file() {
158 debug!("ailake: compaction skipping {} — not an AI-Lake file", path);
159 return Ok::<Option<(RecordBatch, Vec<Vec<f32>>)>, ailake_core::AilakeError>(
160 None,
161 );
162 }
163 let pair = reader.read_parquet()?;
164 Ok(Some(pair))
165 }
166 });
167 let results = try_join_all(futs).await?;
168 Ok(results.into_iter().flatten().collect())
169 }
170
171 pub async fn compact(
180 &self,
181 files: &[DataFileEntry],
182 output_path: &str,
183 ) -> AilakeResult<DataFileEntry> {
184 if files.is_empty() {
185 return Err(ailake_core::AilakeError::Catalog(
186 "compact: no files provided".into(),
187 ));
188 }
189
190 let pairs = self.read_files_parallel(files).await?;
191
192 if pairs.is_empty() {
193 return Err(ailake_core::AilakeError::Catalog(
194 "compact: no valid AI-Lake files in input".into(),
195 ));
196 }
197
198 let schema: SchemaRef = pairs[0].0.schema();
199 let (all_batches, all_embeddings): (Vec<_>, Vec<_>) = pairs.into_iter().unzip();
200 let all_embeddings: Vec<Vec<f32>> = all_embeddings.into_iter().flatten().collect();
201
202 let merged_batch = concat_batches(schema, &all_batches)?;
204 let record_count = merged_batch.num_rows() as u64;
205
206 let writer = {
208 let base = AilakeFileWriter::new(self.policy.clone());
209 match &self.index_strategy {
210 CompactionIndexStrategy::Auto => base.with_auto_index(),
211 CompactionIndexStrategy::ForceHnsw => base,
212 CompactionIndexStrategy::ForceIvfPq => {
213 let cfg = ailake_index::IvfPqConfig::for_dataset(
214 self.policy.dim as usize,
215 all_embeddings.len(),
216 );
217 base.with_ivf_pq(cfg)
218 }
219 }
220 };
221 let file_bytes = writer.write(&merged_batch, &all_embeddings)?;
222 let file_size = file_bytes.len() as u64;
223 self.store.put(output_path, file_bytes.clone()).await?;
224
225 let centroid = compute_centroid_and_radius(&all_embeddings, self.policy.metric);
227 let reader = AilakeFileReader::new(file_bytes, &self.policy.column_name, self.policy.dim);
228 let header = reader.read_header()?;
229 let ailk_start = reader.ailk_offset()?;
230
231 let source_first_row_id = files.iter().filter_map(|f| f.first_row_id).min();
234
235 let mut entry = make_data_file_entry(
236 output_path,
237 record_count,
238 file_size,
239 ¢roid,
240 VectorIndexInfo {
241 column: &self.policy.column_name,
242 dim: self.policy.dim,
243 hnsw_offset: ailk_start + header.hnsw_offset,
244 hnsw_len: header.hnsw_len,
245 },
246 );
247 entry.first_row_id = source_first_row_id;
248 Ok(entry)
249 }
250
251 pub async fn compact_incremental(
270 &self,
271 files: &[DataFileEntry],
272 output_path: &str,
273 ) -> AilakeResult<DataFileEntry> {
274 const DOMINANT_RATIO: f64 = 0.40;
275
276 if files.is_empty() {
277 return Err(ailake_core::AilakeError::Catalog(
278 "compact_incremental: no files provided".into(),
279 ));
280 }
281
282 let total_rows: u64 = files.iter().map(|f| f.record_count).sum();
284 let dom_idx = files
285 .iter()
286 .enumerate()
287 .max_by_key(|(_, f)| f.record_count)
288 .map(|(i, _)| i)
289 .unwrap_or(0);
290 let dom_rows = files[dom_idx].record_count;
291
292 if (dom_rows as f64 / total_rows as f64) < DOMINANT_RATIO {
293 debug!(
294 "ailake: compact_incremental — no dominant file ({}/{} rows < {:.0}% threshold), \
295 falling back to full rebuild",
296 dom_rows,
297 total_rows,
298 DOMINANT_RATIO * 100.0
299 );
300 return self.compact(files, output_path).await;
301 }
302
303 let column = self.policy.column_name.clone();
304 let dim = self.policy.dim;
305 let dom_path = files[dom_idx].path.clone();
306
307 let futs: Vec<_> = files
310 .iter()
311 .map(|entry| {
312 let store = self.store.clone();
313 let path = entry.path.clone();
314 let col = column.clone();
315 let is_dom = path == dom_path;
316 async move {
317 let bytes: Bytes = store.get(&path).await?;
318 let reader = AilakeFileReader::new(bytes.clone(), &col, dim);
319 if !reader.is_ailake_file() {
320 debug!(
321 "ailake: compact_incremental skipping {} — not an AI-Lake file",
322 path
323 );
324 return Ok::<
325 Option<(RecordBatch, Vec<Vec<f32>>, bool, Option<Bytes>)>,
326 ailake_core::AilakeError,
327 >(None);
328 }
329 let (batch, vecs) = reader.read_parquet()?;
330 let retained = if is_dom { Some(bytes) } else { None };
331 Ok(Some((batch, vecs, is_dom, retained)))
332 }
333 })
334 .collect();
335
336 #[allow(clippy::type_complexity)]
337 let raw: Vec<(RecordBatch, Vec<Vec<f32>>, bool, Option<Bytes>)> =
338 try_join_all(futs).await?.into_iter().flatten().collect();
339
340 if raw.is_empty() {
341 return Err(ailake_core::AilakeError::Catalog(
342 "compact_incremental: no valid AI-Lake files in input".into(),
343 ));
344 }
345
346 let mut dom_batch: Option<RecordBatch> = None;
348 let mut dom_vecs: Vec<Vec<f32>> = Vec::new();
349 let mut dom_bytes_found: Option<Bytes> = None;
350 let mut other_batches: Vec<RecordBatch> = Vec::new();
351 let mut other_vecs: Vec<Vec<f32>> = Vec::new();
352
353 for (batch, vecs, is_dom, retained) in raw {
354 if is_dom {
355 dom_batch = Some(batch);
356 dom_vecs = vecs;
357 dom_bytes_found = retained;
358 } else {
359 other_batches.push(batch);
360 other_vecs.extend(vecs);
361 }
362 }
363
364 let (dom_batch, dom_bytes) = match (dom_batch, dom_bytes_found) {
365 (Some(b), Some(byt)) => (b, byt),
366 _ => {
367 debug!(
368 "ailake: compact_incremental — dominant file missing from read results, \
369 falling back to full rebuild"
370 );
371 return self.compact(files, output_path).await;
372 }
373 };
374
375 let dom_reader = AilakeFileReader::new(dom_bytes, &column, dim);
377 let mut hnsw = match dom_reader.load_index() {
378 Ok(idx) => idx,
379 Err(e) => {
380 debug!(
381 "ailake: compact_incremental — cannot load dominant HNSW ({}), \
382 falling back to full rebuild",
383 e
384 );
385 return self.compact(files, output_path).await;
386 }
387 };
388
389 let dom_count = dom_batch.num_rows() as u64;
390
391 for (j, vec) in other_vecs.iter().enumerate() {
394 hnsw.insert_node(RowId::new(dom_count + j as u64), vec.clone());
395 }
396 hnsw.quantize_to_f16();
397
398 let schema: SchemaRef = dom_batch.schema();
400 let mut all_batches = vec![dom_batch];
401 all_batches.extend(other_batches);
402 let merged_batch = concat_batches(schema, &all_batches)?;
403 let record_count = merged_batch.num_rows() as u64;
404
405 let mut all_embeddings = dom_vecs;
406 all_embeddings.extend(other_vecs);
407
408 let writer = AilakeFileWriter::new(self.policy.clone());
410 let file_bytes = writer.write_with_prebuilt_hnsw(&merged_batch, &all_embeddings, &hnsw)?;
411 let file_size = file_bytes.len() as u64;
412 self.store.put(output_path, file_bytes.clone()).await?;
413
414 let centroid = compute_centroid_and_radius(&all_embeddings, self.policy.metric);
415 let reader = AilakeFileReader::new(file_bytes, &self.policy.column_name, self.policy.dim);
416 let header = reader.read_header()?;
417 let ailk_start = reader.ailk_offset()?;
418
419 let source_first_row_id = files[dom_idx].first_row_id;
423
424 let mut entry = make_data_file_entry(
425 output_path,
426 record_count,
427 file_size,
428 ¢roid,
429 VectorIndexInfo {
430 column: &self.policy.column_name,
431 dim: self.policy.dim,
432 hnsw_offset: ailk_start + header.hnsw_offset,
433 hnsw_len: header.hnsw_len,
434 },
435 );
436 entry.first_row_id = source_first_row_id;
437
438 info!(
439 "ailake: compact_incremental — merged {} files into {} \
440 ({} rows from dominant + {} inserted incrementally)",
441 files.len(),
442 output_path,
443 dom_count,
444 record_count - dom_count
445 );
446
447 Ok(entry)
448 }
449
450 pub async fn compact_deferred(
460 &self,
461 files: &[DataFileEntry],
462 output_path: &str,
463 catalog: Arc<dyn CatalogProvider>,
464 table: &TableIdent,
465 ) -> AilakeResult<DataFileEntry> {
466 if files.is_empty() {
467 return Err(ailake_core::AilakeError::Catalog(
468 "compact_deferred: no files provided".into(),
469 ));
470 }
471
472 let pairs = self.read_files_parallel(files).await?;
473
474 if pairs.is_empty() {
475 return Err(ailake_core::AilakeError::Catalog(
476 "compact_deferred: no valid AI-Lake files in input".into(),
477 ));
478 }
479
480 let schema: SchemaRef = pairs[0].0.schema();
481 let (all_batches, all_embeddings): (Vec<_>, Vec<_>) = pairs.into_iter().unzip();
482 let all_embeddings: Vec<Vec<f32>> = all_embeddings.into_iter().flatten().collect();
483
484 let merged_batch = concat_batches(schema, &all_batches)?;
485 let record_count = merged_batch.num_rows() as u64;
486
487 let file_writer = AilakeFileWriter::new(self.policy.clone());
489 let parquet_bytes = file_writer.write_parquet_only(&merged_batch, &all_embeddings)?;
490 let file_size = parquet_bytes.len() as u64;
491 self.store.put(output_path, parquet_bytes).await?;
492
493 let centroid = compute_centroid_and_radius(&all_embeddings, self.policy.metric);
495 let source_first_row_id = files.iter().filter_map(|f| f.first_row_id).min();
496 let mut entry = make_data_file_entry_indexing(
497 output_path,
498 record_count,
499 file_size,
500 ¢roid,
501 &self.policy.column_name,
502 self.policy.dim,
503 );
504 entry.first_row_id = source_first_row_id;
505
506 let store = self.store.clone();
508 let policy = self.policy.clone();
509 let table_id = table.clone();
510 let fp = output_path.to_string();
511 tokio::spawn(async move {
512 if let Err(e) = build_and_patch_index(store, catalog, policy, table_id, fp).await {
513 error!(
514 "ailake: compaction deferred HNSW build failed — file indexed as \
515 Parquet-only until next compaction rebuilds the index: {}",
516 e
517 );
518 }
519 });
520
521 Ok(entry)
522 }
523
524 pub async fn run(
527 &self,
528 planner: &CompactionPlanner,
529 table: &TableIdent,
530 catalog: Arc<dyn CatalogProvider>,
531 output_prefix: &str,
532 ) -> AilakeResult<Option<DataFileEntry>> {
533 let all_files = catalog.list_files(table, None).await?;
534 let to_compact = planner.plan(&all_files);
535 if to_compact.is_empty() {
536 return Ok(None);
537 }
538
539 let ts = std::time::SystemTime::now()
540 .duration_since(std::time::UNIX_EPOCH)
541 .unwrap_or_else(|e| e.duration())
542 .as_millis();
543 let output_path = format!("{output_prefix}/compacted-{ts}.parquet");
544
545 let merged = self.compact_incremental(&to_compact, &output_path).await?;
547
548 let snapshot = NewSnapshot {
550 snapshot_id: ailake_catalog::new_snapshot_id(),
551 parent_snapshot_id: None,
552 files: vec![merged.clone()],
553 operation: SnapshotOperation::Replace,
554 iceberg_schema: None,
555 extra_properties: std::collections::HashMap::new(),
556 bloom_filters: vec![],
557 equality_delete_files: vec![],
558 };
559 catalog.commit_snapshot(table, snapshot).await?;
560
561 info!(
562 "ailake: compaction committed — merged {} files into {}",
563 to_compact.len(),
564 output_path
565 );
566
567 delete_old_files(&self.store, &to_compact).await;
568
569 Ok(Some(merged))
570 }
571
572 pub async fn run_deferred(
577 &self,
578 planner: &CompactionPlanner,
579 table: &TableIdent,
580 catalog: Arc<dyn CatalogProvider>,
581 output_prefix: &str,
582 ) -> AilakeResult<Option<DataFileEntry>> {
583 let all_files = catalog.list_files(table, None).await?;
584 let to_compact = planner.plan(&all_files);
585 if to_compact.is_empty() {
586 return Ok(None);
587 }
588
589 let ts = std::time::SystemTime::now()
590 .duration_since(std::time::UNIX_EPOCH)
591 .unwrap_or_else(|e| e.duration())
592 .as_millis();
593 let output_path = format!("{output_prefix}/compacted-{ts}.parquet");
594
595 let merged = self
596 .compact_deferred(&to_compact, &output_path, catalog.clone(), table)
597 .await?;
598
599 let snapshot = NewSnapshot {
601 snapshot_id: ailake_catalog::new_snapshot_id(),
602 parent_snapshot_id: None,
603 files: vec![merged.clone()],
604 operation: SnapshotOperation::Replace,
605 iceberg_schema: None,
606 extra_properties: std::collections::HashMap::new(),
607 bloom_filters: vec![],
608 equality_delete_files: vec![],
609 };
610 catalog.commit_snapshot(table, snapshot).await?;
611
612 info!(
613 "ailake: compaction committed (deferred) — merged {} files into {} \
614 (index building in background)",
615 to_compact.len(),
616 output_path
617 );
618
619 delete_old_files(&self.store, &to_compact).await;
620
621 Ok(Some(merged))
622 }
623}
624
625async fn delete_old_files(store: &Arc<dyn Store>, files: &[DataFileEntry]) {
626 for entry in files {
627 if let Err(e) = store.delete(&entry.path).await {
628 error!(
629 "ailake: compaction cleanup failed — could not delete {}: {} \
630 (orphan file in object store after successful catalog commit; \
631 delete manually to reclaim storage)",
632 entry.path, e
633 );
634 }
635 }
636}
637
638fn concat_batches(schema: SchemaRef, batches: &[RecordBatch]) -> AilakeResult<RecordBatch> {
639 arrow_select::concat::concat_batches(&schema, batches)
640 .map_err(|e| ailake_core::AilakeError::Arrow(e.to_string()))
641}
642
643#[cfg(test)]
644mod tests {
645 use super::*;
646 use ailake_catalog::IndexStatus;
647
/// Three eligible files against a minimum of four must yield an empty plan.
#[test]
fn plan_returns_empty_if_too_few_files() {
    /// Catalog entry for a 100-byte, 10-row file with no vector metadata.
    fn tiny_file(index: usize) -> DataFileEntry {
        DataFileEntry {
            path: format!("file-{index}.parquet"),
            record_count: 10,
            file_size_bytes: 100,
            centroid_b64: None,
            radius: None,
            hnsw_offset: None,
            hnsw_len: None,
            vector_column: None,
            vector_dim: None,
            extra_vector_indexes: vec![],
            index_status: IndexStatus::Ready,
            batch_id: None,
            embedding_model: None,
            partition_value: None,
            deletion_vector: None,
            first_row_id: None,
        }
    }

    let config = CompactionConfig {
        min_files_to_compact: 4,
        target_file_size_bytes: 1024 * 1024,
        ..Default::default()
    };
    let planner = CompactionPlanner::new(config);

    let mut files: Vec<DataFileEntry> = Vec::with_capacity(3);
    for index in 0..3 {
        files.push(tiny_file(index));
    }

    let plan = planner.plan(&files);
    assert!(plan.is_empty());
}
677
/// Only files below the size threshold may appear in the plan.
#[test]
fn plan_selects_small_files() {
    /// Catalog entry with the given path, row count and size, and no vector metadata.
    fn entry(path: &str, record_count: u64, file_size_bytes: u64) -> DataFileEntry {
        DataFileEntry {
            path: path.into(),
            record_count,
            file_size_bytes,
            centroid_b64: None,
            radius: None,
            hnsw_offset: None,
            hnsw_len: None,
            vector_column: None,
            vector_dim: None,
            extra_vector_indexes: vec![],
            index_status: IndexStatus::Ready,
            batch_id: None,
            embedding_model: None,
            partition_value: None,
            deletion_vector: None,
            first_row_id: None,
        }
    }

    let planner = CompactionPlanner::new(CompactionConfig {
        min_files_to_compact: 2,
        target_file_size_bytes: 1000,
        ..Default::default()
    });

    // Two files under the 1000-byte threshold and one far above it.
    let files = vec![
        entry("small.parquet", 5, 500),
        entry("large.parquet", 5000, 200_000_000),
        entry("also-small.parquet", 5, 800),
    ];

    let selected = planner.plan(&files);
    let paths: Vec<&str> = selected.iter().map(|f| f.path.as_str()).collect();

    assert_eq!(paths.len(), 2);
    assert!(paths.contains(&"small.parquet"));
    assert!(paths.contains(&"also-small.parquet"));
}
746
/// With five eligible files and a cap of three, the three smallest are kept.
#[test]
fn plan_respects_max_files_per_pass() {
    /// Catalog entry for a 10-row file of the given size with no vector metadata.
    fn entry(path: String, file_size_bytes: u64) -> DataFileEntry {
        DataFileEntry {
            path,
            record_count: 10,
            file_size_bytes,
            centroid_b64: None,
            radius: None,
            hnsw_offset: None,
            hnsw_len: None,
            vector_column: None,
            vector_dim: None,
            extra_vector_indexes: vec![],
            index_status: IndexStatus::Ready,
            batch_id: None,
            embedding_model: None,
            partition_value: None,
            deletion_vector: None,
            first_row_id: None,
        }
    }

    let planner = CompactionPlanner::new(CompactionConfig {
        min_files_to_compact: 2,
        target_file_size_bytes: 1_000_000,
        max_files_per_pass: 3,
        ..Default::default()
    });

    // Sizes 100, 200, 300, 400 and 500 bytes.
    let mut files: Vec<DataFileEntry> = Vec::new();
    for i in 0..5u64 {
        files.push(entry(format!("f{i}.parquet"), 100 + i * 100));
    }

    let selected = planner.plan(&files);
    let sizes: Vec<u64> = selected.iter().map(|f| f.file_size_bytes).collect();
    assert_eq!(sizes, vec![100, 200, 300]);
}
781
/// The plan lists files in ascending size order regardless of input order.
#[test]
fn plan_sorts_smallest_first() {
    /// Catalog entry for a 1-row file of the given size with no vector metadata.
    fn entry(path: &str, file_size_bytes: u64) -> DataFileEntry {
        DataFileEntry {
            path: path.into(),
            record_count: 1,
            file_size_bytes,
            centroid_b64: None,
            radius: None,
            hnsw_offset: None,
            hnsw_len: None,
            vector_column: None,
            vector_dim: None,
            extra_vector_indexes: vec![],
            index_status: IndexStatus::Ready,
            batch_id: None,
            embedding_model: None,
            partition_value: None,
            deletion_vector: None,
            first_row_id: None,
        }
    }

    let planner = CompactionPlanner::new(CompactionConfig {
        min_files_to_compact: 2,
        target_file_size_bytes: 10_000,
        max_files_per_pass: 4,
        ..Default::default()
    });

    // Deliberately out of order: 300, 100, 200.
    let files = vec![
        entry("c.parquet", 300),
        entry("a.parquet", 100),
        entry("b.parquet", 200),
    ];

    let selected = planner.plan(&files);
    let first_three: Vec<u64> = selected.iter().take(3).map(|f| f.file_size_bytes).collect();
    assert_eq!(first_three, vec![100, 200, 300]);
}
851
852 #[tokio::test]
853 async fn compact_merges_two_files() {
854 use ailake_core::{VectorMetric, VectorPrecision};
855 use ailake_store::LocalStore;
856 use arrow_array::{Int32Array, RecordBatch};
857 use arrow_schema::{DataType, Field, Schema};
858 use std::sync::Arc;
859 use tempfile::TempDir;
860
861 let dir = TempDir::new().unwrap();
862 let store = Arc::new(LocalStore::new(dir.path()));
863 let policy = VectorStoragePolicy {
864 column_name: "embedding".into(),
865 dim: 4,
866 metric: VectorMetric::Cosine,
867 precision: VectorPrecision::F16,
868 pq: None,
869 keep_raw_for_reranking: true,
870 pre_normalize: false,
871 hnsw_m: None,
872 hnsw_ef_construction: None,
873 ivf_residual: false,
874 embedding_model: None,
875 modality: None,
876 partition_by: None,
877 partition_value: None,
878 partition_column_type: None,
879 partition_fields: vec![],
880 };
881
882 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
883 let embs_a: Vec<Vec<f32>> = vec![vec![1.0, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]];
884 let embs_b: Vec<Vec<f32>> = vec![vec![0.0, 0.0, 1.0, 0.0], vec![0.0, 0.0, 0.0, 1.0]];
885
886 let batch_a = RecordBatch::try_new(
887 schema.clone(),
888 vec![Arc::new(Int32Array::from(vec![0i32, 1]))],
889 )
890 .unwrap();
891 let batch_b = RecordBatch::try_new(
892 schema.clone(),
893 vec![Arc::new(Int32Array::from(vec![2i32, 3]))],
894 )
895 .unwrap();
896
897 let writer_a = AilakeFileWriter::new(policy.clone());
898 let bytes_a = writer_a.write(&batch_a, &embs_a).unwrap();
899 let writer_b = AilakeFileWriter::new(policy.clone());
900 let bytes_b = writer_b.write(&batch_b, &embs_b).unwrap();
901
902 store.put("data/a.parquet", bytes_a.clone()).await.unwrap();
903 store.put("data/b.parquet", bytes_b.clone()).await.unwrap();
904
905 let entries = vec![
906 DataFileEntry {
907 path: "data/a.parquet".into(),
908 record_count: 2,
909 file_size_bytes: bytes_a.len() as u64,
910 centroid_b64: None,
911 radius: None,
912 hnsw_offset: None,
913 hnsw_len: None,
914 vector_column: None,
915 vector_dim: None,
916 extra_vector_indexes: vec![],
917 index_status: IndexStatus::Ready,
918 batch_id: None,
919 embedding_model: None,
920 partition_value: None,
921 deletion_vector: None,
922 first_row_id: None,
923 },
924 DataFileEntry {
925 path: "data/b.parquet".into(),
926 record_count: 2,
927 file_size_bytes: bytes_b.len() as u64,
928 centroid_b64: None,
929 radius: None,
930 hnsw_offset: None,
931 hnsw_len: None,
932 vector_column: None,
933 vector_dim: None,
934 extra_vector_indexes: vec![],
935 index_status: IndexStatus::Ready,
936 batch_id: None,
937 embedding_model: None,
938 partition_value: None,
939 deletion_vector: None,
940 first_row_id: None,
941 },
942 ];
943
944 let executor = CompactionExecutor::new(store.clone(), policy.clone());
945 let merged = executor
946 .compact(&entries, "data/merged.parquet")
947 .await
948 .unwrap();
949
950 assert_eq!(merged.record_count, 4);
951 assert_eq!(merged.path, "data/merged.parquet");
952
953 let merged_bytes = store.get("data/merged.parquet").await.unwrap();
954 let reader = AilakeFileReader::new(merged_bytes, "embedding", 4);
955 reader.verify_integrity().unwrap();
956 let (batch, embs) = reader.read_parquet().unwrap();
957 assert_eq!(batch.num_rows(), 4);
958 assert_eq!(embs.len(), 4);
959 }
960
961 #[tokio::test]
962 async fn compact_incremental_merges_dominant_plus_small() {
963 use ailake_core::{RowId, VectorMetric, VectorPrecision};
964 use ailake_store::LocalStore;
965 use arrow_array::{Int32Array, RecordBatch};
966 use arrow_schema::{DataType, Field, Schema};
967 use std::sync::Arc;
968 use tempfile::TempDir;
969
970 let dir = TempDir::new().unwrap();
971 let store = Arc::new(LocalStore::new(dir.path()));
972 let policy = VectorStoragePolicy {
973 column_name: "embedding".into(),
974 dim: 4,
975 metric: VectorMetric::Cosine,
976 precision: VectorPrecision::F16,
977 pq: None,
978 keep_raw_for_reranking: true,
979 pre_normalize: false,
980 hnsw_m: None,
981 hnsw_ef_construction: None,
982 ivf_residual: false,
983 embedding_model: None,
984 modality: None,
985 partition_by: None,
986 partition_value: None,
987 partition_column_type: None,
988 partition_fields: vec![],
989 };
990
991 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
992
993 let embs_dom: Vec<Vec<f32>> = vec![
995 vec![1.0, 0.0, 0.0, 0.0],
996 vec![0.0, 1.0, 0.0, 0.0],
997 vec![0.0, 0.0, 1.0, 0.0],
998 vec![0.7, 0.7, 0.0, 0.0],
999 vec![0.0, 0.7, 0.7, 0.0],
1000 vec![0.0, 0.0, 0.7, 0.7],
1001 ];
1002 let batch_dom = RecordBatch::try_new(
1003 schema.clone(),
1004 vec![Arc::new(Int32Array::from(vec![0i32, 1, 2, 3, 4, 5]))],
1005 )
1006 .unwrap();
1007
1008 let embs_small: Vec<Vec<f32>> = vec![vec![0.0, 0.0, 0.0, 1.0], vec![0.5, 0.5, 0.5, 0.5]];
1010 let batch_small = RecordBatch::try_new(
1011 schema.clone(),
1012 vec![Arc::new(Int32Array::from(vec![6i32, 7]))],
1013 )
1014 .unwrap();
1015
1016 let bytes_dom = AilakeFileWriter::new(policy.clone())
1017 .write(&batch_dom, &embs_dom)
1018 .unwrap();
1019 let bytes_small = AilakeFileWriter::new(policy.clone())
1020 .write(&batch_small, &embs_small)
1021 .unwrap();
1022
1023 store
1024 .put("data/dominant.parquet", bytes_dom.clone())
1025 .await
1026 .unwrap();
1027 store
1028 .put("data/small.parquet", bytes_small.clone())
1029 .await
1030 .unwrap();
1031
1032 let entries = vec![
1033 DataFileEntry {
1034 path: "data/dominant.parquet".into(),
1035 record_count: 6,
1036 file_size_bytes: bytes_dom.len() as u64,
1037 centroid_b64: None,
1038 radius: None,
1039 hnsw_offset: None,
1040 hnsw_len: None,
1041 vector_column: None,
1042 vector_dim: None,
1043 extra_vector_indexes: vec![],
1044 index_status: IndexStatus::Ready,
1045 batch_id: None,
1046 embedding_model: None,
1047 partition_value: None,
1048 deletion_vector: None,
1049 first_row_id: None,
1050 },
1051 DataFileEntry {
1052 path: "data/small.parquet".into(),
1053 record_count: 2,
1054 file_size_bytes: bytes_small.len() as u64,
1055 centroid_b64: None,
1056 radius: None,
1057 hnsw_offset: None,
1058 hnsw_len: None,
1059 vector_column: None,
1060 vector_dim: None,
1061 extra_vector_indexes: vec![],
1062 index_status: IndexStatus::Ready,
1063 batch_id: None,
1064 embedding_model: None,
1065 partition_value: None,
1066 deletion_vector: None,
1067 first_row_id: None,
1068 },
1069 ];
1070
1071 let executor = CompactionExecutor::new(store.clone(), policy.clone());
1072 let merged = executor
1073 .compact_incremental(&entries, "data/merged.parquet")
1074 .await
1075 .unwrap();
1076
1077 assert_eq!(merged.record_count, 8);
1079 assert_eq!(merged.path, "data/merged.parquet");
1080
1081 let merged_bytes = store.get("data/merged.parquet").await.unwrap();
1083 let reader = AilakeFileReader::new(merged_bytes, "embedding", 4);
1084 reader.verify_integrity().unwrap();
1085
1086 let (batch, embs) = reader.read_parquet().unwrap();
1087 assert_eq!(batch.num_rows(), 8);
1088 assert_eq!(embs.len(), 8);
1089
1090 for f in &embs[..6] {
1092 assert_eq!(f.len(), 4);
1093 }
1094
1095 let hnsw = reader.load_index().unwrap();
1097 assert_eq!(hnsw.node_count(), 8);
1098
1099 let results = hnsw.search(&[1.0, 0.0, 0.0, 0.0], 1, 50);
1101 assert_eq!(results[0].0, RowId::new(0));
1102
1103 let results = hnsw.search(&[0.0, 0.0, 0.0, 1.0], 1, 50);
1106 assert_eq!(results[0].0, RowId::new(6));
1107 }
1108
1109 #[tokio::test]
1110 async fn compact_incremental_falls_back_when_no_dominant() {
1111 use ailake_core::{VectorMetric, VectorPrecision};
1112 use ailake_store::LocalStore;
1113 use arrow_array::{Int32Array, RecordBatch};
1114 use arrow_schema::{DataType, Field, Schema};
1115 use std::sync::Arc;
1116 use tempfile::TempDir;
1117
1118 let dir = TempDir::new().unwrap();
1119 let store = Arc::new(LocalStore::new(dir.path()));
1120 let policy = VectorStoragePolicy {
1121 column_name: "embedding".into(),
1122 dim: 4,
1123 metric: VectorMetric::Cosine,
1124 precision: VectorPrecision::F16,
1125 pq: None,
1126 keep_raw_for_reranking: true,
1127 pre_normalize: false,
1128 hnsw_m: None,
1129 hnsw_ef_construction: None,
1130 ivf_residual: false,
1131 embedding_model: None,
1132 modality: None,
1133 partition_by: None,
1134 partition_value: None,
1135 partition_column_type: None,
1136 partition_fields: vec![],
1137 };
1138
1139 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
1140
1141 let make_batch = |ids: Vec<i32>, embs: Vec<Vec<f32>>| {
1143 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(ids))])
1144 .unwrap();
1145 AilakeFileWriter::new(policy.clone())
1146 .write(&batch, &embs)
1147 .unwrap()
1148 };
1149
1150 let embs_a: Vec<Vec<f32>> = vec![vec![1.0, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]];
1151 let embs_b: Vec<Vec<f32>> = vec![vec![0.0, 0.0, 1.0, 0.0], vec![0.0, 0.0, 0.0, 1.0]];
1152 let bytes_a = make_batch(vec![0, 1], embs_a);
1153 let bytes_b = make_batch(vec![2, 3], embs_b);
1154
1155 store.put("data/a.parquet", bytes_a.clone()).await.unwrap();
1156 store.put("data/b.parquet", bytes_b.clone()).await.unwrap();
1157
1158 let entries = vec![
1159 DataFileEntry {
1160 path: "data/a.parquet".into(),
1161 record_count: 2,
1162 file_size_bytes: bytes_a.len() as u64,
1163 centroid_b64: None,
1164 radius: None,
1165 hnsw_offset: None,
1166 hnsw_len: None,
1167 vector_column: None,
1168 vector_dim: None,
1169 extra_vector_indexes: vec![],
1170 index_status: IndexStatus::Ready,
1171 batch_id: None,
1172 embedding_model: None,
1173 partition_value: None,
1174 deletion_vector: None,
1175 first_row_id: None,
1176 },
1177 DataFileEntry {
1178 path: "data/b.parquet".into(),
1179 record_count: 2,
1180 file_size_bytes: bytes_b.len() as u64,
1181 centroid_b64: None,
1182 radius: None,
1183 hnsw_offset: None,
1184 hnsw_len: None,
1185 vector_column: None,
1186 vector_dim: None,
1187 extra_vector_indexes: vec![],
1188 index_status: IndexStatus::Ready,
1189 batch_id: None,
1190 embedding_model: None,
1191 partition_value: None,
1192 deletion_vector: None,
1193 first_row_id: None,
1194 },
1195 ];
1196
1197 let executor = CompactionExecutor::new(store.clone(), policy.clone());
1198 let merged = executor
1200 .compact_incremental(&entries, "data/merged.parquet")
1201 .await
1202 .unwrap();
1203
1204 assert_eq!(merged.record_count, 4);
1205
1206 let merged_bytes = store.get("data/merged.parquet").await.unwrap();
1207 let reader = AilakeFileReader::new(merged_bytes, "embedding", 4);
1208 reader.verify_integrity().unwrap();
1209 }
1210
1211 #[tokio::test]
1212 async fn compact_deferred_produces_parquet_only_file() {
1213 use ailake_catalog::HadoopCatalog;
1214 use ailake_core::{VectorMetric, VectorPrecision};
1215 use ailake_store::LocalStore;
1216 use arrow_array::{Int32Array, RecordBatch};
1217 use arrow_schema::{DataType, Field, Schema};
1218 use std::sync::Arc;
1219 use tempfile::TempDir;
1220
1221 let dir = TempDir::new().unwrap();
1222 let store = Arc::new(LocalStore::new(dir.path()));
1223 let catalog_dir = TempDir::new().unwrap();
1224 let catalog_store = Arc::new(LocalStore::new(catalog_dir.path()));
1225 let catalog = Arc::new(HadoopCatalog::new(catalog_store, ""));
1226 let table = TableIdent {
1227 namespace: "ns".into(),
1228 name: "tbl".into(),
1229 };
1230
1231 let policy = VectorStoragePolicy {
1232 column_name: "embedding".into(),
1233 dim: 4,
1234 metric: VectorMetric::Cosine,
1235 precision: VectorPrecision::F16,
1236 pq: None,
1237 keep_raw_for_reranking: true,
1238 pre_normalize: false,
1239 hnsw_m: None,
1240 hnsw_ef_construction: None,
1241 ivf_residual: false,
1242 embedding_model: None,
1243 modality: None,
1244 partition_by: None,
1245 partition_value: None,
1246 partition_column_type: None,
1247 partition_fields: vec![],
1248 };
1249
1250 use ailake_catalog::TableProperties;
1251 catalog
1252 .create_table(
1253 &table,
1254 &TableProperties {
1255 policy: policy.clone(),
1256 extra: std::collections::HashMap::new(),
1257 format_version: 2,
1258 partition_column_type: None,
1259 },
1260 )
1261 .await
1262 .unwrap();
1263
1264 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
1265 let embs_a: Vec<Vec<f32>> = vec![vec![1.0, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]];
1266 let batch_a = RecordBatch::try_new(
1267 schema.clone(),
1268 vec![Arc::new(Int32Array::from(vec![0i32, 1]))],
1269 )
1270 .unwrap();
1271 let bytes_a = AilakeFileWriter::new(policy.clone())
1272 .write(&batch_a, &embs_a)
1273 .unwrap();
1274 store.put("data/a.parquet", bytes_a.clone()).await.unwrap();
1275
1276 let embs_b: Vec<Vec<f32>> = vec![vec![0.0, 0.0, 1.0, 0.0], vec![0.0, 0.0, 0.0, 1.0]];
1277 let batch_b = RecordBatch::try_new(
1278 schema.clone(),
1279 vec![Arc::new(Int32Array::from(vec![2i32, 3]))],
1280 )
1281 .unwrap();
1282 let bytes_b = AilakeFileWriter::new(policy.clone())
1283 .write(&batch_b, &embs_b)
1284 .unwrap();
1285 store.put("data/b.parquet", bytes_b.clone()).await.unwrap();
1286
1287 let entries = vec![
1288 DataFileEntry {
1289 path: "data/a.parquet".into(),
1290 record_count: 2,
1291 file_size_bytes: bytes_a.len() as u64,
1292 centroid_b64: None,
1293 radius: None,
1294 hnsw_offset: None,
1295 hnsw_len: None,
1296 vector_column: None,
1297 vector_dim: None,
1298 extra_vector_indexes: vec![],
1299 index_status: IndexStatus::Ready,
1300 batch_id: None,
1301 embedding_model: None,
1302 partition_value: None,
1303 deletion_vector: None,
1304 first_row_id: None,
1305 },
1306 DataFileEntry {
1307 path: "data/b.parquet".into(),
1308 record_count: 2,
1309 file_size_bytes: bytes_b.len() as u64,
1310 centroid_b64: None,
1311 radius: None,
1312 hnsw_offset: None,
1313 hnsw_len: None,
1314 vector_column: None,
1315 vector_dim: None,
1316 extra_vector_indexes: vec![],
1317 index_status: IndexStatus::Ready,
1318 batch_id: None,
1319 embedding_model: None,
1320 partition_value: None,
1321 deletion_vector: None,
1322 first_row_id: None,
1323 },
1324 ];
1325
1326 let executor = CompactionExecutor::new(store.clone(), policy.clone());
1327 let entry = executor
1328 .compact_deferred(&entries, "data/merged.parquet", catalog.clone(), &table)
1329 .await
1330 .unwrap();
1331
1332 assert_eq!(entry.index_status, IndexStatus::Indexing);
1334 assert_eq!(entry.record_count, 4);
1335
1336 let merged_bytes = store.get("data/merged.parquet").await.unwrap();
1338 let pq_reader = ailake_parquet::ParquetVectorReader::new(merged_bytes, "embedding");
1339 let count = pq_reader.record_count().unwrap();
1340 assert_eq!(count, 4);
1341 }
1342}