// ailake_query/compaction.rs

// SPDX-License-Identifier: MIT OR Apache-2.0
2use std::sync::Arc;
3use tracing::{debug, error, info};
4
5use ailake_catalog::{
6    make_data_file_entry, CatalogProvider, DataFileEntry, NewSnapshot, SnapshotOperation,
7    TableIdent, VectorIndexInfo,
8};
9use ailake_core::{AilakeResult, VectorStoragePolicy};
10use ailake_file::{AilakeFileReader, AilakeFileWriter};
11use ailake_store::Store;
12use ailake_vec::compute_centroid_and_radius;
13use arrow_array::RecordBatch;
14use arrow_schema::SchemaRef;
15use bytes::Bytes;
16
/// Selects which vector index compaction builds for the merged output file.
#[derive(Debug, Clone, Default)]
pub enum CompactionIndexStrategy {
    /// Defer the choice to `AilakeFileWriter::with_auto_index` at compaction
    /// time — the same selection `write_batch_auto` makes (per its author:
    /// IVF-PQ on GPU/many-core hosts, HNSW otherwise). This is the default.
    #[default]
    Auto,
    /// Use the plain `AilakeFileWriter` configuration, i.e. rebuild with HNSW
    /// (higher recall, larger index).
    ForceHnsw,
    /// Rebuild with IVF-PQ sized by `IvfPqConfig::for_dataset` (smaller index,
    /// better object-store throughput).
    ForceIvfPq,
}
29
30#[derive(Debug, Clone)]
31pub struct CompactionConfig {
32    /// Trigger compaction only if at least this many files are eligible.
33    pub min_files_to_compact: usize,
34    /// Target output file size in bytes. Files below this are merged.
35    pub target_file_size_bytes: u64,
36    /// Index algorithm for the merged output file.
37    pub index_strategy: CompactionIndexStrategy,
38}
39
40impl Default for CompactionConfig {
41    fn default() -> Self {
42        Self {
43            min_files_to_compact: 4,
44            target_file_size_bytes: 128 * 1024 * 1024, // 128 MB
45            index_strategy: CompactionIndexStrategy::Auto,
46        }
47    }
48}
49
/// Scope of a compaction pass.
///
/// NOTE(review): nothing in this file reads this enum — `CompactionPlanner::plan`
/// always selects every undersized file (the `Full` behaviour). Confirm whether
/// callers elsewhere rely on it.
#[derive(Debug, Clone, Copy)]
pub enum CompactionMode {
    /// Compact every file below the target size.
    Full,
    /// Compact only the smallest N files.
    Partial,
}
55
56pub struct CompactionPlanner {
57    config: CompactionConfig,
58}
59
60impl CompactionPlanner {
61    pub fn new(config: CompactionConfig) -> Self {
62        Self { config }
63    }
64
65    /// Select files to compact: all files smaller than `target_file_size_bytes`,
66    /// provided at least `min_files_to_compact` qualify.
67    pub fn plan(&self, files: &[DataFileEntry]) -> Vec<DataFileEntry> {
68        let candidates: Vec<DataFileEntry> = files
69            .iter()
70            .filter(|f| f.file_size_bytes < self.config.target_file_size_bytes)
71            .cloned()
72            .collect();
73        if candidates.len() < self.config.min_files_to_compact {
74            debug!(
75                "ailake: compaction skipped — {} eligible files < min_files_to_compact={}",
76                candidates.len(),
77                self.config.min_files_to_compact
78            );
79            return vec![];
80        }
81        let total_bytes: u64 = candidates.iter().map(|f| f.file_size_bytes).sum();
82        info!(
83            "ailake: compaction plan — {} files ({} bytes) → 1 merged file",
84            candidates.len(),
85            total_bytes
86        );
87        candidates
88    }
89}
90
91/// Executes compaction plans: reads N small files, merges them into a single
92/// AI-Lake file with a rebuilt index, and commits to the catalog.
93///
94/// The index algorithm is chosen via `CompactionIndexStrategy` (default: `Auto`,
95/// which detects GPU / CPU cores at compaction time — the same heuristic used
96/// by `write_batch_auto`).
97pub struct CompactionExecutor {
98    store: Arc<dyn Store>,
99    policy: VectorStoragePolicy,
100    index_strategy: CompactionIndexStrategy,
101}
102
103impl CompactionExecutor {
104    pub fn new(store: Arc<dyn Store>, policy: VectorStoragePolicy) -> Self {
105        Self {
106            store,
107            policy,
108            index_strategy: CompactionIndexStrategy::Auto,
109        }
110    }
111
112    /// Override the default (Auto) index strategy for this executor.
113    pub fn with_index_strategy(mut self, strategy: CompactionIndexStrategy) -> Self {
114        self.index_strategy = strategy;
115        self
116    }
117
118    /// Merge `files` into a single new file at `output_path`.
119    /// Returns the DataFileEntry for the merged file.
120    pub async fn compact(
121        &self,
122        files: &[DataFileEntry],
123        output_path: &str,
124    ) -> AilakeResult<DataFileEntry> {
125        if files.is_empty() {
126            return Err(ailake_core::AilakeError::Catalog(
127                "compact: no files provided".into(),
128            ));
129        }
130
131        let mut all_batches: Vec<RecordBatch> = Vec::new();
132        let mut all_embeddings: Vec<Vec<f32>> = Vec::new();
133        let mut schema: Option<SchemaRef> = None;
134
135        for entry in files {
136            let bytes: Bytes = self.store.get(&entry.path).await?;
137            let reader = AilakeFileReader::new(bytes, &self.policy.column_name, self.policy.dim);
138            if !reader.is_ailake_file() {
139                debug!(
140                    "ailake: compaction skipping {} — not an AI-Lake file",
141                    entry.path
142                );
143                continue;
144            }
145            let (batch, embs) = reader.read_parquet()?;
146            if schema.is_none() {
147                schema = Some(batch.schema());
148            }
149            all_batches.push(batch);
150            all_embeddings.extend(embs);
151        }
152
153        if all_batches.is_empty() {
154            return Err(ailake_core::AilakeError::Catalog(
155                "compact: no valid AI-Lake files in input".into(),
156            ));
157        }
158
159        // Concatenate all row groups into one batch
160        // SAFETY: schema is set whenever a batch is pushed; all_batches non-empty above.
161        let merged_batch = concat_batches(
162            schema.expect("schema set because all_batches is non-empty"),
163            &all_batches,
164        )?;
165        let record_count = merged_batch.num_rows() as u64;
166
167        // Write merged file with adaptive index selection.
168        let writer = {
169            let base = AilakeFileWriter::new(self.policy.clone());
170            match &self.index_strategy {
171                CompactionIndexStrategy::Auto => base.with_auto_index(),
172                CompactionIndexStrategy::ForceHnsw => base,
173                CompactionIndexStrategy::ForceIvfPq => {
174                    let cfg = ailake_index::IvfPqConfig::for_dataset(
175                        self.policy.dim as usize,
176                        all_embeddings.len(),
177                    );
178                    base.with_ivf_pq(cfg)
179                }
180            }
181        };
182        let file_bytes = writer.write(&merged_batch, &all_embeddings)?;
183        let file_size = file_bytes.len() as u64;
184        self.store.put(output_path, file_bytes.clone()).await?;
185
186        // Compute centroid and HNSW offsets for catalog entry
187        let centroid = compute_centroid_and_radius(&all_embeddings, self.policy.metric);
188        let reader = AilakeFileReader::new(file_bytes, &self.policy.column_name, self.policy.dim);
189        let header = reader.read_header()?;
190        let ailk_start = reader.ailk_offset()?;
191
192        let entry = make_data_file_entry(
193            output_path,
194            record_count,
195            file_size,
196            &centroid,
197            VectorIndexInfo {
198                column: &self.policy.column_name,
199                dim: self.policy.dim,
200                hnsw_offset: ailk_start + header.hnsw_offset,
201                hnsw_len: header.hnsw_len,
202            },
203        );
204        Ok(entry)
205    }
206
207    /// Full compaction workflow: plan, compact, drop old files from catalog, commit.
208    pub async fn run(
209        &self,
210        planner: &CompactionPlanner,
211        table: &TableIdent,
212        catalog: Arc<dyn CatalogProvider>,
213        output_prefix: &str,
214    ) -> AilakeResult<Option<DataFileEntry>> {
215        let all_files = catalog.list_files(table, None).await?;
216        let to_compact = planner.plan(&all_files);
217        if to_compact.is_empty() {
218            return Ok(None);
219        }
220
221        let ts = std::time::SystemTime::now()
222            .duration_since(std::time::UNIX_EPOCH)
223            .unwrap_or_else(|e| e.duration()) // handle misconfigured system clocks
224            .as_millis();
225        let output_path = format!("{output_prefix}/compacted-{ts}.parquet");
226
227        let merged = self.compact(&to_compact, &output_path).await?;
228
229        // Commit: add merged file, remove input files (via Overwrite snapshot)
230        let snapshot = NewSnapshot {
231            snapshot_id: ailake_catalog::new_snapshot_id(),
232            parent_snapshot_id: None,
233            files: vec![merged.clone()],
234            operation: SnapshotOperation::Replace,
235            iceberg_schema: None,
236            extra_properties: std::collections::HashMap::new(),
237        };
238        catalog.commit_snapshot(table, snapshot).await?;
239
240        info!(
241            "ailake: compaction committed — merged {} files into {}",
242            to_compact.len(),
243            output_path
244        );
245
246        // Delete old files from store
247        for entry in &to_compact {
248            if let Err(e) = self.store.delete(&entry.path).await {
249                error!(
250                    "ailake: compaction cleanup failed — could not delete {}: {} \
251                     (orphan file in object store after successful catalog commit; \
252                     delete manually to reclaim storage)",
253                    entry.path, e
254                );
255            }
256        }
257
258        Ok(Some(merged))
259    }
260}
261
262fn concat_batches(schema: SchemaRef, batches: &[RecordBatch]) -> AilakeResult<RecordBatch> {
263    arrow_select::concat::concat_batches(&schema, batches)
264        .map_err(|e| ailake_core::AilakeError::Arrow(e.to_string()))
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a catalog entry with only `path` and `file_size_bytes` populated
    /// (everything optional left unset, index marked Ready). Tests override
    /// `record_count` with struct-update syntax.
    fn entry(path: &str, file_size_bytes: u64) -> DataFileEntry {
        DataFileEntry {
            path: path.into(),
            record_count: 0,
            file_size_bytes,
            centroid_b64: None,
            radius: None,
            hnsw_offset: None,
            hnsw_len: None,
            vector_column: None,
            vector_dim: None,
            extra_vector_indexes: vec![],
            index_status: ailake_catalog::IndexStatus::Ready,
            batch_id: None,
            embedding_model: None,
        }
    }

    #[test]
    fn plan_returns_empty_if_too_few_files() {
        let planner = CompactionPlanner::new(CompactionConfig {
            min_files_to_compact: 4,
            target_file_size_bytes: 1024 * 1024,
            ..Default::default()
        });
        // Three undersized files: one short of the required minimum.
        let files: Vec<DataFileEntry> = (0..3)
            .map(|i| DataFileEntry {
                record_count: 10,
                ..entry(&format!("file-{i}.parquet"), 100)
            })
            .collect();
        assert!(planner.plan(&files).is_empty());
    }

    #[test]
    fn plan_selects_small_files() {
        let planner = CompactionPlanner::new(CompactionConfig {
            min_files_to_compact: 2,
            target_file_size_bytes: 1000,
            ..Default::default()
        });
        let files = vec![
            DataFileEntry {
                record_count: 5,
                ..entry("small.parquet", 500)
            },
            DataFileEntry {
                record_count: 5000,
                ..entry("large.parquet", 200_000_000)
            },
            DataFileEntry {
                record_count: 5,
                ..entry("also-small.parquet", 800)
            },
        ];
        let chosen = planner.plan(&files);
        assert_eq!(chosen.len(), 2);
        for wanted in ["small.parquet", "also-small.parquet"] {
            assert!(chosen.iter().any(|f| f.path == wanted));
        }
    }

    #[tokio::test]
    async fn compact_merges_two_files() {
        use ailake_core::{VectorMetric, VectorPrecision};
        use ailake_store::LocalStore;
        use arrow_array::{Int32Array, RecordBatch};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;
        use tempfile::TempDir;

        let dir = TempDir::new().unwrap();
        let store = Arc::new(LocalStore::new(dir.path()));
        let policy = VectorStoragePolicy {
            column_name: "embedding".into(),
            dim: 4,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: true,
            pre_normalize: false,
            hnsw_m: None,
            hnsw_ef_construction: None,
            ivf_residual: false,
            embedding_model: None,
            modality: None,
        };

        // Two small AI-Lake files, two rows each, with one-hot embeddings.
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let inputs: [(&str, Vec<i32>, Vec<Vec<f32>>); 2] = [
            (
                "data/a.parquet",
                vec![0, 1],
                vec![vec![1.0, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]],
            ),
            (
                "data/b.parquet",
                vec![2, 3],
                vec![vec![0.0, 0.0, 1.0, 0.0], vec![0.0, 0.0, 0.0, 1.0]],
            ),
        ];

        let mut entries = Vec::with_capacity(inputs.len());
        for (path, ids, embs) in inputs {
            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![Arc::new(Int32Array::from(ids))],
            )
            .unwrap();
            let bytes = AilakeFileWriter::new(policy.clone())
                .write(&batch, &embs)
                .unwrap();
            let size = bytes.len() as u64;
            store.put(path, bytes).await.unwrap();
            entries.push(DataFileEntry {
                record_count: 2,
                ..entry(path, size)
            });
        }

        let executor = CompactionExecutor::new(store.clone(), policy.clone());
        let merged = executor
            .compact(&entries, "data/merged.parquet")
            .await
            .unwrap();

        assert_eq!(merged.record_count, 4);
        assert_eq!(merged.path, "data/merged.parquet");

        // The merged output must be a valid AI-Lake file holding all 4 rows.
        let merged_bytes = store.get("data/merged.parquet").await.unwrap();
        let reader = AilakeFileReader::new(merged_bytes, "embedding", 4);
        reader.verify_integrity().unwrap();
        let (batch, embs) = reader.read_parquet().unwrap();
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(embs.len(), 4);
    }
}