Skip to main content

ailake_query/
compaction.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2use std::sync::Arc;
3
4use ailake_catalog::{
5    make_data_file_entry, CatalogProvider, DataFileEntry, NewSnapshot, SnapshotOperation,
6    TableIdent, VectorIndexInfo,
7};
8use ailake_core::{AilakeResult, VectorStoragePolicy};
9use ailake_file::{AilakeFileReader, AilakeFileWriter};
10use ailake_store::Store;
11use ailake_vec::compute_centroid_and_radius;
12use arrow_array::RecordBatch;
13use arrow_schema::SchemaRef;
14use bytes::Bytes;
15
/// Index strategy for the merged file produced by compaction.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum CompactionIndexStrategy {
    /// Detect GPU / CPU cores at compaction time and pick the best index:
    /// IVF-PQ on GPU/many-core machines, HNSW elsewhere. (default)
    #[default]
    Auto,
    /// Always rebuild with HNSW — highest recall, larger index.
    ForceHnsw,
    /// Always rebuild with IVF-PQ — smaller index, better S3 throughput.
    ForceIvfPq,
}

/// Tuning knobs for [`CompactionPlanner`].
///
/// Defaults: at least 4 candidate files, a 128 MiB size threshold, and the
/// [`CompactionIndexStrategy::Auto`] index strategy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionConfig {
    /// Minimum number of candidate files required before any compaction is planned.
    pub min_files_to_compact: usize,
    /// Size threshold in bytes: files strictly smaller than this are merge
    /// candidates. The merged output is *not* capped at this size — every
    /// candidate is combined into a single output file.
    pub target_file_size_bytes: u64,
    /// Index algorithm for the merged output file. The planner itself does not
    /// read this; it must be handed to the executor (e.g. via
    /// `CompactionExecutor::with_index_strategy`).
    pub index_strategy: CompactionIndexStrategy,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            min_files_to_compact: 4,
            target_file_size_bytes: 128 * 1024 * 1024, // 128 MiB
            index_strategy: CompactionIndexStrategy::Auto,
        }
    }
}
48
/// How aggressively a compaction pass selects its input files.
///
/// NOTE(review): `CompactionPlanner::plan` in this module does not take a mode;
/// it always selects every file below the size threshold (i.e. behaves like `Full`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompactionMode {
    /// Compact every file below the target size.
    Full,
    /// Compact only the smallest N files.
    Partial,
}
54
55pub struct CompactionPlanner {
56    config: CompactionConfig,
57}
58
59impl CompactionPlanner {
60    pub fn new(config: CompactionConfig) -> Self {
61        Self { config }
62    }
63
64    /// Select files to compact: all files smaller than `target_file_size_bytes`,
65    /// provided at least `min_files_to_compact` qualify.
66    pub fn plan(&self, files: &[DataFileEntry]) -> Vec<DataFileEntry> {
67        let candidates: Vec<DataFileEntry> = files
68            .iter()
69            .filter(|f| f.file_size_bytes < self.config.target_file_size_bytes)
70            .cloned()
71            .collect();
72        if candidates.len() < self.config.min_files_to_compact {
73            return vec![];
74        }
75        candidates
76    }
77}
78
79/// Executes compaction plans: reads N small files, merges them into a single
80/// AI-Lake file with a rebuilt index, and commits to the catalog.
81///
82/// The index algorithm is chosen via `CompactionIndexStrategy` (default: `Auto`,
83/// which detects GPU / CPU cores at compaction time — the same heuristic used
84/// by `write_batch_auto`).
85pub struct CompactionExecutor {
86    store: Arc<dyn Store>,
87    policy: VectorStoragePolicy,
88    index_strategy: CompactionIndexStrategy,
89}
90
91impl CompactionExecutor {
92    pub fn new(store: Arc<dyn Store>, policy: VectorStoragePolicy) -> Self {
93        Self {
94            store,
95            policy,
96            index_strategy: CompactionIndexStrategy::Auto,
97        }
98    }
99
100    /// Override the default (Auto) index strategy for this executor.
101    pub fn with_index_strategy(mut self, strategy: CompactionIndexStrategy) -> Self {
102        self.index_strategy = strategy;
103        self
104    }
105
106    /// Merge `files` into a single new file at `output_path`.
107    /// Returns the DataFileEntry for the merged file.
108    pub async fn compact(
109        &self,
110        files: &[DataFileEntry],
111        output_path: &str,
112    ) -> AilakeResult<DataFileEntry> {
113        if files.is_empty() {
114            return Err(ailake_core::AilakeError::Catalog(
115                "compact: no files provided".into(),
116            ));
117        }
118
119        let mut all_batches: Vec<RecordBatch> = Vec::new();
120        let mut all_embeddings: Vec<Vec<f32>> = Vec::new();
121        let mut schema: Option<SchemaRef> = None;
122
123        for entry in files {
124            let bytes: Bytes = self.store.get(&entry.path).await?;
125            let reader = AilakeFileReader::new(bytes, &self.policy.column_name, self.policy.dim);
126            if !reader.is_ailake_file() {
127                continue;
128            }
129            let (batch, embs) = reader.read_parquet()?;
130            if schema.is_none() {
131                schema = Some(batch.schema());
132            }
133            all_batches.push(batch);
134            all_embeddings.extend(embs);
135        }
136
137        if all_batches.is_empty() {
138            return Err(ailake_core::AilakeError::Catalog(
139                "compact: no valid AI-Lake files in input".into(),
140            ));
141        }
142
143        // Concatenate all row groups into one batch
144        let merged_batch = concat_batches(schema.unwrap(), &all_batches)?;
145        let record_count = merged_batch.num_rows() as u64;
146
147        // Write merged file with adaptive index selection.
148        let writer = {
149            let base = AilakeFileWriter::new(self.policy.clone());
150            match &self.index_strategy {
151                CompactionIndexStrategy::Auto => base.with_auto_index(),
152                CompactionIndexStrategy::ForceHnsw => base,
153                CompactionIndexStrategy::ForceIvfPq => {
154                    let cfg = ailake_index::IvfPqConfig::for_dataset(
155                        self.policy.dim as usize,
156                        all_embeddings.len(),
157                    );
158                    base.with_ivf_pq(cfg)
159                }
160            }
161        };
162        let file_bytes = writer.write(&merged_batch, &all_embeddings)?;
163        let file_size = file_bytes.len() as u64;
164        self.store.put(output_path, file_bytes.clone()).await?;
165
166        // Compute centroid and HNSW offsets for catalog entry
167        let centroid = compute_centroid_and_radius(&all_embeddings, self.policy.metric);
168        let reader = AilakeFileReader::new(file_bytes, &self.policy.column_name, self.policy.dim);
169        let header = reader.read_header()?;
170        let ailk_start = reader.ailk_offset()?;
171
172        let entry = make_data_file_entry(
173            output_path,
174            record_count,
175            file_size,
176            &centroid,
177            VectorIndexInfo {
178                column: &self.policy.column_name,
179                dim: self.policy.dim,
180                hnsw_offset: ailk_start + header.hnsw_offset,
181                hnsw_len: header.hnsw_len,
182            },
183        );
184        Ok(entry)
185    }
186
187    /// Full compaction workflow: plan, compact, drop old files from catalog, commit.
188    pub async fn run(
189        &self,
190        planner: &CompactionPlanner,
191        table: &TableIdent,
192        catalog: Arc<dyn CatalogProvider>,
193        output_prefix: &str,
194    ) -> AilakeResult<Option<DataFileEntry>> {
195        let all_files = catalog.list_files(table, None).await?;
196        let to_compact = planner.plan(&all_files);
197        if to_compact.is_empty() {
198            return Ok(None);
199        }
200
201        let ts = std::time::SystemTime::now()
202            .duration_since(std::time::UNIX_EPOCH)
203            .unwrap()
204            .as_millis();
205        let output_path = format!("{output_prefix}/compacted-{ts}.parquet");
206
207        let merged = self.compact(&to_compact, &output_path).await?;
208
209        // Commit: add merged file, remove input files (via Overwrite snapshot)
210        let snapshot = NewSnapshot {
211            snapshot_id: ailake_catalog::new_snapshot_id(),
212            parent_snapshot_id: None,
213            files: vec![merged.clone()],
214            operation: SnapshotOperation::Replace,
215            iceberg_schema: None,
216        };
217        catalog.commit_snapshot(table, snapshot).await?;
218
219        // Delete old files from store
220        for entry in &to_compact {
221            let _ = self.store.delete(&entry.path).await;
222        }
223
224        Ok(Some(merged))
225    }
226}
227
228fn concat_batches(schema: SchemaRef, batches: &[RecordBatch]) -> AilakeResult<RecordBatch> {
229    arrow_select::concat::concat_batches(&schema, batches)
230        .map_err(|e| ailake_core::AilakeError::Arrow(e.to_string()))
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use ailake_core::{VectorMetric, VectorPrecision};

    /// Catalog entry for `path` with the given on-disk size and no centroid or
    /// index metadata. Override other fields with struct-update syntax.
    fn entry(path: &str, file_size_bytes: u64) -> DataFileEntry {
        DataFileEntry {
            path: path.into(),
            record_count: 0,
            file_size_bytes,
            centroid_b64: None,
            radius: None,
            hnsw_offset: None,
            hnsw_len: None,
            vector_column: None,
            vector_dim: None,
            extra_vector_indexes: vec![],
            index_status: ailake_catalog::IndexStatus::Ready,
            batch_id: None,
        }
    }

    /// 4-dimensional cosine / F16 policy on the `embedding` column.
    fn test_policy() -> VectorStoragePolicy {
        VectorStoragePolicy {
            column_name: "embedding".into(),
            dim: 4,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: false,
        }
    }

    #[test]
    fn plan_returns_empty_if_too_few_files() {
        let planner = CompactionPlanner::new(CompactionConfig {
            min_files_to_compact: 4,
            target_file_size_bytes: 1024 * 1024,
            ..Default::default()
        });
        let files: Vec<DataFileEntry> = (0..3)
            .map(|i| DataFileEntry {
                record_count: 10,
                ..entry(&format!("file-{i}.parquet"), 100) // below target
            })
            .collect();
        assert!(planner.plan(&files).is_empty());
    }

    #[test]
    fn plan_selects_small_files() {
        let planner = CompactionPlanner::new(CompactionConfig {
            min_files_to_compact: 2,
            target_file_size_bytes: 1000,
            ..Default::default()
        });
        let files = vec![
            DataFileEntry {
                record_count: 5,
                ..entry("small.parquet", 500)
            },
            DataFileEntry {
                record_count: 5000,
                ..entry("large.parquet", 200_000_000)
            },
            DataFileEntry {
                record_count: 5,
                ..entry("also-small.parquet", 800)
            },
        ];
        let selected = planner.plan(&files);
        assert_eq!(selected.len(), 2);
        assert!(selected.iter().any(|f| f.path == "small.parquet"));
        assert!(selected.iter().any(|f| f.path == "also-small.parquet"));
    }

    #[test]
    fn plan_uses_strict_size_threshold() {
        let planner = CompactionPlanner::new(CompactionConfig {
            min_files_to_compact: 1,
            target_file_size_bytes: 1000,
            ..Default::default()
        });
        let files = vec![
            entry("at-target.parquet", 1000),
            entry("just-below.parquet", 999),
        ];
        let selected = planner.plan(&files);
        assert_eq!(selected.len(), 1);
        assert_eq!(selected[0].path, "just-below.parquet");
    }

    #[tokio::test]
    async fn compact_rejects_empty_input() {
        use ailake_store::LocalStore;
        use std::sync::Arc;
        use tempfile::TempDir;

        let dir = TempDir::new().unwrap();
        let store = Arc::new(LocalStore::new(dir.path()));
        let executor = CompactionExecutor::new(store, test_policy());
        let result = executor.compact(&[], "data/merged.parquet").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn compact_merges_two_files() {
        use ailake_store::LocalStore;
        use arrow_array::{Int32Array, RecordBatch};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;
        use tempfile::TempDir;

        let dir = TempDir::new().unwrap();
        let store = Arc::new(LocalStore::new(dir.path()));
        let policy = test_policy();

        // Write two small files
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let embs_a: Vec<Vec<f32>> = vec![vec![1.0, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]];
        let embs_b: Vec<Vec<f32>> = vec![vec![0.0, 0.0, 1.0, 0.0], vec![0.0, 0.0, 0.0, 1.0]];

        let batch_a = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![0i32, 1]))],
        )
        .unwrap();
        let batch_b = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![2i32, 3]))],
        )
        .unwrap();

        let bytes_a = AilakeFileWriter::new(policy.clone())
            .write(&batch_a, &embs_a)
            .unwrap();
        let bytes_b = AilakeFileWriter::new(policy.clone())
            .write(&batch_b, &embs_b)
            .unwrap();

        store.put("data/a.parquet", bytes_a.clone()).await.unwrap();
        store.put("data/b.parquet", bytes_b.clone()).await.unwrap();

        let entries = vec![
            DataFileEntry {
                record_count: 2,
                ..entry("data/a.parquet", bytes_a.len() as u64)
            },
            DataFileEntry {
                record_count: 2,
                ..entry("data/b.parquet", bytes_b.len() as u64)
            },
        ];

        let executor = CompactionExecutor::new(store.clone(), policy.clone());
        let merged = executor
            .compact(&entries, "data/merged.parquet")
            .await
            .unwrap();

        assert_eq!(merged.record_count, 4);
        assert_eq!(merged.path, "data/merged.parquet");

        // Verify merged file is a valid AI-Lake file with all 4 rows
        let merged_bytes = store.get("data/merged.parquet").await.unwrap();
        let reader = AilakeFileReader::new(merged_bytes, "embedding", 4);
        reader.verify_integrity().unwrap();
        let (batch, embs) = reader.read_parquet().unwrap();
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(embs.len(), 4);
    }
}