Skip to main content

ailake_query/
compaction.rs

1use std::sync::Arc;
2
3use ailake_catalog::{
4    make_data_file_entry, CatalogProvider, DataFileEntry, NewSnapshot, SnapshotOperation,
5    TableIdent, VectorIndexInfo,
6};
7use ailake_core::{AilakeResult, VectorStoragePolicy};
8use ailake_file::{AilakeFileReader, AilakeFileWriter};
9use ailake_store::Store;
10use ailake_vec::compute_centroid_and_radius;
11use arrow_array::RecordBatch;
12use arrow_schema::SchemaRef;
13use bytes::Bytes;
14
/// Selects which vector index is built for the file produced by compaction.
#[derive(Debug, Clone, Default)]
pub enum CompactionIndexStrategy {
    /// Let the writer choose at compaction time (`AilakeFileWriter::with_auto_index`).
    /// Intended to pick IVF-PQ on GPU / many-core machines and HNSW elsewhere —
    /// NOTE(review): the selection logic lives in the writer; confirm there.
    /// This is the default variant.
    #[default]
    Auto,
    /// Always build HNSW: highest recall, larger index.
    ForceHnsw,
    /// Always build IVF-PQ: smaller index, better S3 throughput.
    ForceIvfPq,
}
27
28#[derive(Debug, Clone)]
29pub struct CompactionConfig {
30    /// Trigger compaction only if at least this many files are eligible.
31    pub min_files_to_compact: usize,
32    /// Target output file size in bytes. Files below this are merged.
33    pub target_file_size_bytes: u64,
34    /// Index algorithm for the merged output file.
35    pub index_strategy: CompactionIndexStrategy,
36}
37
38impl Default for CompactionConfig {
39    fn default() -> Self {
40        Self {
41            min_files_to_compact: 4,
42            target_file_size_bytes: 128 * 1024 * 1024, // 128 MB
43            index_strategy: CompactionIndexStrategy::Auto,
44        }
45    }
46}
47
/// How much of a table one compaction pass should cover.
///
/// NOTE(review): not referenced elsewhere in this file; `CompactionPlanner::plan`
/// always behaves like `Full`.
#[derive(Debug, Clone, Copy)]
pub enum CompactionMode {
    /// Compact every file below the target size.
    Full,
    /// Compact only the smallest N files.
    Partial,
}
53
54pub struct CompactionPlanner {
55    config: CompactionConfig,
56}
57
58impl CompactionPlanner {
59    pub fn new(config: CompactionConfig) -> Self {
60        Self { config }
61    }
62
63    /// Select files to compact: all files smaller than `target_file_size_bytes`,
64    /// provided at least `min_files_to_compact` qualify.
65    pub fn plan(&self, files: &[DataFileEntry]) -> Vec<DataFileEntry> {
66        let candidates: Vec<DataFileEntry> = files
67            .iter()
68            .filter(|f| f.file_size_bytes < self.config.target_file_size_bytes)
69            .cloned()
70            .collect();
71        if candidates.len() < self.config.min_files_to_compact {
72            return vec![];
73        }
74        candidates
75    }
76}
77
78/// Executes compaction plans: reads N small files, merges them into a single
79/// AI-Lake file with a rebuilt index, and commits to the catalog.
80///
81/// The index algorithm is chosen via `CompactionIndexStrategy` (default: `Auto`,
82/// which detects GPU / CPU cores at compaction time — the same heuristic used
83/// by `write_batch_auto`).
84pub struct CompactionExecutor {
85    store: Arc<dyn Store>,
86    policy: VectorStoragePolicy,
87    index_strategy: CompactionIndexStrategy,
88}
89
90impl CompactionExecutor {
91    pub fn new(store: Arc<dyn Store>, policy: VectorStoragePolicy) -> Self {
92        Self {
93            store,
94            policy,
95            index_strategy: CompactionIndexStrategy::Auto,
96        }
97    }
98
99    /// Override the default (Auto) index strategy for this executor.
100    pub fn with_index_strategy(mut self, strategy: CompactionIndexStrategy) -> Self {
101        self.index_strategy = strategy;
102        self
103    }
104
105    /// Merge `files` into a single new file at `output_path`.
106    /// Returns the DataFileEntry for the merged file.
107    pub async fn compact(
108        &self,
109        files: &[DataFileEntry],
110        output_path: &str,
111    ) -> AilakeResult<DataFileEntry> {
112        if files.is_empty() {
113            return Err(ailake_core::AilakeError::Catalog(
114                "compact: no files provided".into(),
115            ));
116        }
117
118        let mut all_batches: Vec<RecordBatch> = Vec::new();
119        let mut all_embeddings: Vec<Vec<f32>> = Vec::new();
120        let mut schema: Option<SchemaRef> = None;
121
122        for entry in files {
123            let bytes: Bytes = self.store.get(&entry.path).await?;
124            let reader = AilakeFileReader::new(bytes, &self.policy.column_name, self.policy.dim);
125            if !reader.is_ailake_file() {
126                continue;
127            }
128            let (batch, embs) = reader.read_parquet()?;
129            if schema.is_none() {
130                schema = Some(batch.schema());
131            }
132            all_batches.push(batch);
133            all_embeddings.extend(embs);
134        }
135
136        if all_batches.is_empty() {
137            return Err(ailake_core::AilakeError::Catalog(
138                "compact: no valid AI-Lake files in input".into(),
139            ));
140        }
141
142        // Concatenate all row groups into one batch
143        let merged_batch = concat_batches(schema.unwrap(), &all_batches)?;
144        let record_count = merged_batch.num_rows() as u64;
145
146        // Write merged file with adaptive index selection.
147        let writer = {
148            let base = AilakeFileWriter::new(self.policy.clone());
149            match &self.index_strategy {
150                CompactionIndexStrategy::Auto => base.with_auto_index(),
151                CompactionIndexStrategy::ForceHnsw => base,
152                CompactionIndexStrategy::ForceIvfPq => {
153                    let cfg = ailake_index::IvfPqConfig::for_dataset(
154                        self.policy.dim as usize,
155                        all_embeddings.len(),
156                    );
157                    base.with_ivf_pq(cfg)
158                }
159            }
160        };
161        let file_bytes = writer.write(&merged_batch, &all_embeddings)?;
162        let file_size = file_bytes.len() as u64;
163        self.store.put(output_path, file_bytes.clone()).await?;
164
165        // Compute centroid and HNSW offsets for catalog entry
166        let centroid = compute_centroid_and_radius(&all_embeddings, self.policy.metric);
167        let reader = AilakeFileReader::new(file_bytes, &self.policy.column_name, self.policy.dim);
168        let header = reader.read_header()?;
169        let ailk_start = reader.ailk_offset()?;
170
171        let entry = make_data_file_entry(
172            output_path,
173            record_count,
174            file_size,
175            &centroid,
176            VectorIndexInfo {
177                column: &self.policy.column_name,
178                dim: self.policy.dim,
179                hnsw_offset: ailk_start + header.hnsw_offset,
180                hnsw_len: header.hnsw_len,
181            },
182        );
183        Ok(entry)
184    }
185
186    /// Full compaction workflow: plan, compact, drop old files from catalog, commit.
187    pub async fn run(
188        &self,
189        planner: &CompactionPlanner,
190        table: &TableIdent,
191        catalog: Arc<dyn CatalogProvider>,
192        output_prefix: &str,
193    ) -> AilakeResult<Option<DataFileEntry>> {
194        let all_files = catalog.list_files(table, None).await?;
195        let to_compact = planner.plan(&all_files);
196        if to_compact.is_empty() {
197            return Ok(None);
198        }
199
200        let ts = std::time::SystemTime::now()
201            .duration_since(std::time::UNIX_EPOCH)
202            .unwrap()
203            .as_millis();
204        let output_path = format!("{output_prefix}/compacted-{ts}.parquet");
205
206        let merged = self.compact(&to_compact, &output_path).await?;
207
208        // Commit: add merged file, remove input files (via Overwrite snapshot)
209        let snapshot = NewSnapshot {
210            snapshot_id: ailake_catalog::new_snapshot_id(),
211            parent_snapshot_id: None,
212            files: vec![merged.clone()],
213            operation: SnapshotOperation::Replace,
214            iceberg_schema: None,
215        };
216        catalog.commit_snapshot(table, snapshot).await?;
217
218        // Delete old files from store
219        for entry in &to_compact {
220            let _ = self.store.delete(&entry.path).await;
221        }
222
223        Ok(Some(merged))
224    }
225}
226
227fn concat_batches(schema: SchemaRef, batches: &[RecordBatch]) -> AilakeResult<RecordBatch> {
228    arrow_select::concat::concat_batches(&schema, batches)
229        .map_err(|e| ailake_core::AilakeError::Arrow(e.to_string()))
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a catalog entry for `path` with the given size and no vector-index
    /// metadata. `record_count` starts at 0; override it with struct-update syntax.
    fn plain_entry(path: &str, file_size_bytes: u64) -> DataFileEntry {
        DataFileEntry {
            path: path.to_string(),
            record_count: 0,
            file_size_bytes,
            centroid_b64: None,
            radius: None,
            hnsw_offset: None,
            hnsw_len: None,
            vector_column: None,
            vector_dim: None,
            extra_vector_indexes: vec![],
            index_status: ailake_catalog::IndexStatus::Ready,
        }
    }

    #[test]
    fn plan_returns_empty_if_too_few_files() {
        let config = CompactionConfig {
            min_files_to_compact: 4,
            target_file_size_bytes: 1024 * 1024,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        // Three files, all below the target size, but four are required.
        let files: Vec<DataFileEntry> = (0..3)
            .map(|i| DataFileEntry {
                record_count: 10,
                ..plain_entry(&format!("file-{i}.parquet"), 100)
            })
            .collect();

        assert!(planner.plan(&files).is_empty());
    }

    #[test]
    fn plan_selects_small_files() {
        let config = CompactionConfig {
            min_files_to_compact: 2,
            target_file_size_bytes: 1000,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        let files = vec![
            DataFileEntry {
                record_count: 5,
                ..plain_entry("small.parquet", 500)
            },
            DataFileEntry {
                record_count: 5000,
                ..plain_entry("large.parquet", 200_000_000)
            },
            DataFileEntry {
                record_count: 5,
                ..plain_entry("also-small.parquet", 800)
            },
        ];

        let selected = planner.plan(&files);
        let paths: Vec<&str> = selected.iter().map(|f| f.path.as_str()).collect();
        assert_eq!(paths.len(), 2);
        assert!(paths.contains(&"small.parquet"));
        assert!(paths.contains(&"also-small.parquet"));
    }

    #[tokio::test]
    async fn compact_merges_two_files() {
        use ailake_core::{VectorMetric, VectorPrecision};
        use ailake_store::LocalStore;
        use arrow_array::{Int32Array, RecordBatch};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;
        use tempfile::TempDir;

        // `dir` must outlive the store that writes into it.
        let dir = TempDir::new().unwrap();
        let store = Arc::new(LocalStore::new(dir.path()));
        let policy = VectorStoragePolicy {
            column_name: "embedding".into(),
            dim: 4,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: false,
        };

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        // Two small input files, two rows each, with orthogonal unit embeddings.
        let inputs: Vec<(&str, Vec<i32>, Vec<Vec<f32>>)> = vec![
            (
                "data/a.parquet",
                vec![0, 1],
                vec![vec![1.0, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]],
            ),
            (
                "data/b.parquet",
                vec![2, 3],
                vec![vec![0.0, 0.0, 1.0, 0.0], vec![0.0, 0.0, 0.0, 1.0]],
            ),
        ];

        let mut entries: Vec<DataFileEntry> = Vec::with_capacity(inputs.len());
        for (path, ids, embs) in inputs {
            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![Arc::new(Int32Array::from(ids))],
            )
            .unwrap();
            let bytes = AilakeFileWriter::new(policy.clone()).write(&batch, &embs).unwrap();
            let size = bytes.len() as u64;
            store.put(path, bytes).await.unwrap();
            entries.push(DataFileEntry {
                record_count: 2,
                ..plain_entry(path, size)
            });
        }

        let executor = CompactionExecutor::new(store.clone(), policy.clone());
        let merged = executor
            .compact(&entries, "data/merged.parquet")
            .await
            .unwrap();

        assert_eq!(merged.record_count, 4);
        assert_eq!(merged.path, "data/merged.parquet");

        // The merged output must be a valid AI-Lake file holding all four rows.
        let merged_bytes = store.get("data/merged.parquet").await.unwrap();
        let reader = AilakeFileReader::new(merged_bytes, "embedding", 4);
        reader.verify_integrity().unwrap();
        let (batch, embs) = reader.read_parquet().unwrap();
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(embs.len(), 4);
    }
}