1use std::sync::Arc;
3use tracing::{debug, error, info};
4
5use ailake_catalog::{
6 make_data_file_entry, CatalogProvider, DataFileEntry, NewSnapshot, SnapshotOperation,
7 TableIdent, VectorIndexInfo,
8};
9use ailake_core::{AilakeResult, VectorStoragePolicy};
10use ailake_file::{AilakeFileReader, AilakeFileWriter};
11use ailake_store::Store;
12use ailake_vec::compute_centroid_and_radius;
13use arrow_array::RecordBatch;
14use arrow_schema::SchemaRef;
15use bytes::Bytes;
16
/// How [`CompactionExecutor`] chooses the vector index written into a merged file.
///
/// A fieldless enum, so it is `Copy` and comparable; callers can pass it by
/// value and test it with `==`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CompactionIndexStrategy {
    /// Let the writer pick the index (`AilakeFileWriter::with_auto_index`).
    /// This is the default.
    #[default]
    Auto,
    /// Use the writer as constructed, with no index override applied.
    /// NOTE(review): presumably the writer's default index is HNSW, as the
    /// variant name says — confirm against `AilakeFileWriter::new`.
    ForceHnsw,
    /// Build an IVF-PQ index configured by
    /// `IvfPqConfig::for_dataset(dim, number_of_vectors)`.
    ForceIvfPq,
}
29
30#[derive(Debug, Clone)]
31pub struct CompactionConfig {
32 pub min_files_to_compact: usize,
34 pub target_file_size_bytes: u64,
36 pub index_strategy: CompactionIndexStrategy,
38}
39
40impl Default for CompactionConfig {
41 fn default() -> Self {
42 Self {
43 min_files_to_compact: 4,
44 target_file_size_bytes: 128 * 1024 * 1024, index_strategy: CompactionIndexStrategy::Auto,
46 }
47 }
48}
49
/// Compaction scope selector.
///
/// NOTE(review): nothing in this file reads this enum, so the precise meaning
/// of each variant is not established here — confirm against its callers.
#[derive(Debug, Clone, Copy)]
pub enum CompactionMode {
    /// Presumably compacts every eligible file — TODO confirm.
    Full,
    /// Presumably compacts only a subset of files — TODO confirm.
    Partial,
}
55
56pub struct CompactionPlanner {
57 config: CompactionConfig,
58}
59
60impl CompactionPlanner {
61 pub fn new(config: CompactionConfig) -> Self {
62 Self { config }
63 }
64
65 pub fn plan(&self, files: &[DataFileEntry]) -> Vec<DataFileEntry> {
68 let candidates: Vec<DataFileEntry> = files
69 .iter()
70 .filter(|f| f.file_size_bytes < self.config.target_file_size_bytes)
71 .cloned()
72 .collect();
73 if candidates.len() < self.config.min_files_to_compact {
74 debug!(
75 "ailake: compaction skipped — {} eligible files < min_files_to_compact={}",
76 candidates.len(),
77 self.config.min_files_to_compact
78 );
79 return vec![];
80 }
81 let total_bytes: u64 = candidates.iter().map(|f| f.file_size_bytes).sum();
82 info!(
83 "ailake: compaction plan — {} files ({} bytes) → 1 merged file",
84 candidates.len(),
85 total_bytes
86 );
87 candidates
88 }
89}
90
91pub struct CompactionExecutor {
98 store: Arc<dyn Store>,
99 policy: VectorStoragePolicy,
100 index_strategy: CompactionIndexStrategy,
101}
102
103impl CompactionExecutor {
104 pub fn new(store: Arc<dyn Store>, policy: VectorStoragePolicy) -> Self {
105 Self {
106 store,
107 policy,
108 index_strategy: CompactionIndexStrategy::Auto,
109 }
110 }
111
112 pub fn with_index_strategy(mut self, strategy: CompactionIndexStrategy) -> Self {
114 self.index_strategy = strategy;
115 self
116 }
117
118 pub async fn compact(
121 &self,
122 files: &[DataFileEntry],
123 output_path: &str,
124 ) -> AilakeResult<DataFileEntry> {
125 if files.is_empty() {
126 return Err(ailake_core::AilakeError::Catalog(
127 "compact: no files provided".into(),
128 ));
129 }
130
131 let mut all_batches: Vec<RecordBatch> = Vec::new();
132 let mut all_embeddings: Vec<Vec<f32>> = Vec::new();
133 let mut schema: Option<SchemaRef> = None;
134
135 for entry in files {
136 let bytes: Bytes = self.store.get(&entry.path).await?;
137 let reader = AilakeFileReader::new(bytes, &self.policy.column_name, self.policy.dim);
138 if !reader.is_ailake_file() {
139 debug!(
140 "ailake: compaction skipping {} — not an AI-Lake file",
141 entry.path
142 );
143 continue;
144 }
145 let (batch, embs) = reader.read_parquet()?;
146 if schema.is_none() {
147 schema = Some(batch.schema());
148 }
149 all_batches.push(batch);
150 all_embeddings.extend(embs);
151 }
152
153 if all_batches.is_empty() {
154 return Err(ailake_core::AilakeError::Catalog(
155 "compact: no valid AI-Lake files in input".into(),
156 ));
157 }
158
159 let merged_batch = concat_batches(
162 schema.expect("schema set because all_batches is non-empty"),
163 &all_batches,
164 )?;
165 let record_count = merged_batch.num_rows() as u64;
166
167 let writer = {
169 let base = AilakeFileWriter::new(self.policy.clone());
170 match &self.index_strategy {
171 CompactionIndexStrategy::Auto => base.with_auto_index(),
172 CompactionIndexStrategy::ForceHnsw => base,
173 CompactionIndexStrategy::ForceIvfPq => {
174 let cfg = ailake_index::IvfPqConfig::for_dataset(
175 self.policy.dim as usize,
176 all_embeddings.len(),
177 );
178 base.with_ivf_pq(cfg)
179 }
180 }
181 };
182 let file_bytes = writer.write(&merged_batch, &all_embeddings)?;
183 let file_size = file_bytes.len() as u64;
184 self.store.put(output_path, file_bytes.clone()).await?;
185
186 let centroid = compute_centroid_and_radius(&all_embeddings, self.policy.metric);
188 let reader = AilakeFileReader::new(file_bytes, &self.policy.column_name, self.policy.dim);
189 let header = reader.read_header()?;
190 let ailk_start = reader.ailk_offset()?;
191
192 let entry = make_data_file_entry(
193 output_path,
194 record_count,
195 file_size,
196 ¢roid,
197 VectorIndexInfo {
198 column: &self.policy.column_name,
199 dim: self.policy.dim,
200 hnsw_offset: ailk_start + header.hnsw_offset,
201 hnsw_len: header.hnsw_len,
202 },
203 );
204 Ok(entry)
205 }
206
207 pub async fn run(
209 &self,
210 planner: &CompactionPlanner,
211 table: &TableIdent,
212 catalog: Arc<dyn CatalogProvider>,
213 output_prefix: &str,
214 ) -> AilakeResult<Option<DataFileEntry>> {
215 let all_files = catalog.list_files(table, None).await?;
216 let to_compact = planner.plan(&all_files);
217 if to_compact.is_empty() {
218 return Ok(None);
219 }
220
221 let ts = std::time::SystemTime::now()
222 .duration_since(std::time::UNIX_EPOCH)
223 .unwrap_or_else(|e| e.duration()) .as_millis();
225 let output_path = format!("{output_prefix}/compacted-{ts}.parquet");
226
227 let merged = self.compact(&to_compact, &output_path).await?;
228
229 let snapshot = NewSnapshot {
231 snapshot_id: ailake_catalog::new_snapshot_id(),
232 parent_snapshot_id: None,
233 files: vec![merged.clone()],
234 operation: SnapshotOperation::Replace,
235 iceberg_schema: None,
236 extra_properties: std::collections::HashMap::new(),
237 };
238 catalog.commit_snapshot(table, snapshot).await?;
239
240 info!(
241 "ailake: compaction committed — merged {} files into {}",
242 to_compact.len(),
243 output_path
244 );
245
246 for entry in &to_compact {
248 if let Err(e) = self.store.delete(&entry.path).await {
249 error!(
250 "ailake: compaction cleanup failed — could not delete {}: {} \
251 (orphan file in object store after successful catalog commit; \
252 delete manually to reclaim storage)",
253 entry.path, e
254 );
255 }
256 }
257
258 Ok(Some(merged))
259 }
260}
261
262fn concat_batches(schema: SchemaRef, batches: &[RecordBatch]) -> AilakeResult<RecordBatch> {
263 arrow_select::concat::concat_batches(&schema, batches)
264 .map_err(|e| ailake_core::AilakeError::Arrow(e.to_string()))
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    /// Catalog entry for `path` with `file_size_bytes` and every optional
    /// vector-metadata field empty. `record_count` is 0; tests override it with
    /// struct-update syntax.
    fn bare_entry(path: &str, file_size_bytes: u64) -> DataFileEntry {
        DataFileEntry {
            path: path.into(),
            record_count: 0,
            file_size_bytes,
            centroid_b64: None,
            radius: None,
            hnsw_offset: None,
            hnsw_len: None,
            vector_column: None,
            vector_dim: None,
            extra_vector_indexes: vec![],
            index_status: ailake_catalog::IndexStatus::Ready,
            batch_id: None,
            embedding_model: None,
        }
    }

    #[test]
    fn plan_returns_empty_if_too_few_files() {
        let config = CompactionConfig {
            min_files_to_compact: 4,
            target_file_size_bytes: 1024 * 1024,
            ..Default::default()
        };
        let files: Vec<DataFileEntry> = (0..3)
            .map(|i| DataFileEntry {
                record_count: 10,
                ..bare_entry(&format!("file-{i}.parquet"), 100)
            })
            .collect();
        assert!(CompactionPlanner::new(config).plan(&files).is_empty());
    }

    #[test]
    fn plan_selects_small_files() {
        let planner = CompactionPlanner::new(CompactionConfig {
            min_files_to_compact: 2,
            target_file_size_bytes: 1000,
            ..Default::default()
        });
        let files = vec![
            DataFileEntry {
                record_count: 5,
                ..bare_entry("small.parquet", 500)
            },
            DataFileEntry {
                record_count: 5000,
                ..bare_entry("large.parquet", 200_000_000)
            },
            DataFileEntry {
                record_count: 5,
                ..bare_entry("also-small.parquet", 800)
            },
        ];

        let selected = planner.plan(&files);
        assert_eq!(selected.len(), 2);
        for wanted in ["small.parquet", "also-small.parquet"] {
            assert!(selected.iter().any(|f| f.path == wanted));
        }
    }

    #[tokio::test]
    async fn compact_merges_two_files() {
        use ailake_core::{VectorMetric, VectorPrecision};
        use ailake_store::LocalStore;
        use arrow_array::{Int32Array, RecordBatch};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;
        use tempfile::TempDir;

        let dir = TempDir::new().unwrap();
        let store = Arc::new(LocalStore::new(dir.path()));
        let policy = VectorStoragePolicy {
            column_name: "embedding".into(),
            dim: 4,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: true,
            pre_normalize: false,
            hnsw_m: None,
            hnsw_ef_construction: None,
            ivf_residual: false,
            embedding_model: None,
            modality: None,
        };
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        // Write two 2-row input files and record a catalog entry for each.
        let mut entries = Vec::new();
        for (path, ids, embs) in [
            (
                "data/a.parquet",
                vec![0i32, 1],
                vec![vec![1.0f32, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]],
            ),
            (
                "data/b.parquet",
                vec![2i32, 3],
                vec![vec![0.0f32, 0.0, 1.0, 0.0], vec![0.0, 0.0, 0.0, 1.0]],
            ),
        ] {
            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![Arc::new(Int32Array::from(ids))],
            )
            .unwrap();
            let file_bytes = AilakeFileWriter::new(policy.clone())
                .write(&batch, &embs)
                .unwrap();
            let size = file_bytes.len() as u64;
            store.put(path, file_bytes).await.unwrap();
            entries.push(DataFileEntry {
                record_count: 2,
                ..bare_entry(path, size)
            });
        }

        let executor = CompactionExecutor::new(store.clone(), policy.clone());
        let merged = executor
            .compact(&entries, "data/merged.parquet")
            .await
            .unwrap();
        assert_eq!(merged.record_count, 4);
        assert_eq!(merged.path, "data/merged.parquet");

        let merged_bytes = store.get("data/merged.parquet").await.unwrap();
        let reader = AilakeFileReader::new(merged_bytes, "embedding", 4);
        reader.verify_integrity().unwrap();
        let (batch, embs) = reader.read_parquet().unwrap();
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(embs.len(), 4);
    }
}