1use std::sync::Arc;
3
4use ailake_catalog::{
5 make_data_file_entry, CatalogProvider, DataFileEntry, NewSnapshot, SnapshotOperation,
6 TableIdent, VectorIndexInfo,
7};
8use ailake_core::{AilakeResult, VectorStoragePolicy};
9use ailake_file::{AilakeFileReader, AilakeFileWriter};
10use ailake_store::Store;
11use ailake_vec::compute_centroid_and_radius;
12use arrow_array::RecordBatch;
13use arrow_schema::SchemaRef;
14use bytes::Bytes;
15
/// Selects how the vector index of a compacted output file is built.
///
/// `CompactionExecutor::compact` translates each variant into a configuration
/// of the `AilakeFileWriter` used for the merged file.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CompactionIndexStrategy {
    /// Delegate the choice to the writer: the executor calls
    /// `with_auto_index()` on it.
    #[default]
    Auto,
    /// Use the writer exactly as `AilakeFileWriter::new` returns it.
    /// NOTE(review): presumably that default writer builds an HNSW index,
    /// going by the variant name — confirm against the writer.
    ForceHnsw,
    /// Build an `IvfPqConfig` from the vector dimension and the number of
    /// merged embeddings, and hand it to the writer via `with_ivf_pq`.
    ForceIvfPq,
}
28
/// Tunables that control which files get picked for compaction.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Minimum number of candidate files required before
    /// `CompactionPlanner::plan` returns anything; with fewer candidates the
    /// plan is empty.
    pub min_files_to_compact: usize,
    /// Size threshold in bytes: only files strictly smaller than this are
    /// treated as compaction candidates.
    pub target_file_size_bytes: u64,
    /// Index strategy carried alongside the planning thresholds.
    /// NOTE(review): `CompactionExecutor` keeps its own strategy (set through
    /// `with_index_strategy`) and nothing visible here copies this field into
    /// it — confirm how callers wire the two together.
    pub index_strategy: CompactionIndexStrategy,
}
38
39impl Default for CompactionConfig {
40 fn default() -> Self {
41 Self {
42 min_files_to_compact: 4,
43 target_file_size_bytes: 128 * 1024 * 1024, index_strategy: CompactionIndexStrategy::Auto,
45 }
46 }
47}
48
/// Compaction mode selector.
///
/// NOTE(review): nothing in the visible part of this file reads this enum, so
/// the exact meaning of each variant is presumed from its name — confirm
/// against the callers before relying on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionMode {
    /// Presumably: compact every eligible file.
    Full,
    /// Presumably: compact only a subset of the eligible files.
    Partial,
}
54
/// Chooses which data files should be merged, based on a `CompactionConfig`.
pub struct CompactionPlanner {
    /// Thresholds consulted by `plan` (size limit and minimum file count).
    config: CompactionConfig,
}
58
59impl CompactionPlanner {
60 pub fn new(config: CompactionConfig) -> Self {
61 Self { config }
62 }
63
64 pub fn plan(&self, files: &[DataFileEntry]) -> Vec<DataFileEntry> {
67 let candidates: Vec<DataFileEntry> = files
68 .iter()
69 .filter(|f| f.file_size_bytes < self.config.target_file_size_bytes)
70 .cloned()
71 .collect();
72 if candidates.len() < self.config.min_files_to_compact {
73 return vec![];
74 }
75 candidates
76 }
77}
78
/// Merges AI-Lake data files read from a `Store` into a single output file.
pub struct CompactionExecutor {
    /// Object store the inputs are read from and the merged file is written to.
    store: Arc<dyn Store>,
    /// Vector storage policy; supplies the column name, dimension and metric
    /// used when reading the inputs and writing the merged file.
    policy: VectorStoragePolicy,
    /// Index strategy applied to the merged file; `new` starts with `Auto`.
    index_strategy: CompactionIndexStrategy,
}
90
91impl CompactionExecutor {
92 pub fn new(store: Arc<dyn Store>, policy: VectorStoragePolicy) -> Self {
93 Self {
94 store,
95 policy,
96 index_strategy: CompactionIndexStrategy::Auto,
97 }
98 }
99
100 pub fn with_index_strategy(mut self, strategy: CompactionIndexStrategy) -> Self {
102 self.index_strategy = strategy;
103 self
104 }
105
106 pub async fn compact(
109 &self,
110 files: &[DataFileEntry],
111 output_path: &str,
112 ) -> AilakeResult<DataFileEntry> {
113 if files.is_empty() {
114 return Err(ailake_core::AilakeError::Catalog(
115 "compact: no files provided".into(),
116 ));
117 }
118
119 let mut all_batches: Vec<RecordBatch> = Vec::new();
120 let mut all_embeddings: Vec<Vec<f32>> = Vec::new();
121 let mut schema: Option<SchemaRef> = None;
122
123 for entry in files {
124 let bytes: Bytes = self.store.get(&entry.path).await?;
125 let reader = AilakeFileReader::new(bytes, &self.policy.column_name, self.policy.dim);
126 if !reader.is_ailake_file() {
127 continue;
128 }
129 let (batch, embs) = reader.read_parquet()?;
130 if schema.is_none() {
131 schema = Some(batch.schema());
132 }
133 all_batches.push(batch);
134 all_embeddings.extend(embs);
135 }
136
137 if all_batches.is_empty() {
138 return Err(ailake_core::AilakeError::Catalog(
139 "compact: no valid AI-Lake files in input".into(),
140 ));
141 }
142
143 let merged_batch = concat_batches(schema.unwrap(), &all_batches)?;
145 let record_count = merged_batch.num_rows() as u64;
146
147 let writer = {
149 let base = AilakeFileWriter::new(self.policy.clone());
150 match &self.index_strategy {
151 CompactionIndexStrategy::Auto => base.with_auto_index(),
152 CompactionIndexStrategy::ForceHnsw => base,
153 CompactionIndexStrategy::ForceIvfPq => {
154 let cfg = ailake_index::IvfPqConfig::for_dataset(
155 self.policy.dim as usize,
156 all_embeddings.len(),
157 );
158 base.with_ivf_pq(cfg)
159 }
160 }
161 };
162 let file_bytes = writer.write(&merged_batch, &all_embeddings)?;
163 let file_size = file_bytes.len() as u64;
164 self.store.put(output_path, file_bytes.clone()).await?;
165
166 let centroid = compute_centroid_and_radius(&all_embeddings, self.policy.metric);
168 let reader = AilakeFileReader::new(file_bytes, &self.policy.column_name, self.policy.dim);
169 let header = reader.read_header()?;
170 let ailk_start = reader.ailk_offset()?;
171
172 let entry = make_data_file_entry(
173 output_path,
174 record_count,
175 file_size,
176 ¢roid,
177 VectorIndexInfo {
178 column: &self.policy.column_name,
179 dim: self.policy.dim,
180 hnsw_offset: ailk_start + header.hnsw_offset,
181 hnsw_len: header.hnsw_len,
182 },
183 );
184 Ok(entry)
185 }
186
187 pub async fn run(
189 &self,
190 planner: &CompactionPlanner,
191 table: &TableIdent,
192 catalog: Arc<dyn CatalogProvider>,
193 output_prefix: &str,
194 ) -> AilakeResult<Option<DataFileEntry>> {
195 let all_files = catalog.list_files(table, None).await?;
196 let to_compact = planner.plan(&all_files);
197 if to_compact.is_empty() {
198 return Ok(None);
199 }
200
201 let ts = std::time::SystemTime::now()
202 .duration_since(std::time::UNIX_EPOCH)
203 .unwrap()
204 .as_millis();
205 let output_path = format!("{output_prefix}/compacted-{ts}.parquet");
206
207 let merged = self.compact(&to_compact, &output_path).await?;
208
209 let snapshot = NewSnapshot {
211 snapshot_id: ailake_catalog::new_snapshot_id(),
212 parent_snapshot_id: None,
213 files: vec![merged.clone()],
214 operation: SnapshotOperation::Replace,
215 iceberg_schema: None,
216 };
217 catalog.commit_snapshot(table, snapshot).await?;
218
219 for entry in &to_compact {
221 let _ = self.store.delete(&entry.path).await;
222 }
223
224 Ok(Some(merged))
225 }
226}
227
228fn concat_batches(schema: SchemaRef, batches: &[RecordBatch]) -> AilakeResult<RecordBatch> {
229 arrow_select::concat::concat_batches(&schema, batches)
230 .map_err(|e| ailake_core::AilakeError::Arrow(e.to_string()))
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline entry with every optional field unset; tests override the
    /// fields they care about with struct-update syntax.
    fn blank_entry() -> DataFileEntry {
        DataFileEntry {
            path: String::new(),
            record_count: 0,
            file_size_bytes: 0,
            centroid_b64: None,
            radius: None,
            hnsw_offset: None,
            hnsw_len: None,
            vector_column: None,
            vector_dim: None,
            extra_vector_indexes: vec![],
            index_status: ailake_catalog::IndexStatus::Ready,
            batch_id: None,
        }
    }

    /// Three small files are below the minimum of four, so nothing is planned.
    #[test]
    fn plan_returns_empty_if_too_few_files() {
        let planner = CompactionPlanner::new(CompactionConfig {
            min_files_to_compact: 4,
            target_file_size_bytes: 1024 * 1024,
            ..Default::default()
        });
        let files: Vec<DataFileEntry> = (0..3)
            .map(|i| DataFileEntry {
                path: format!("file-{i}.parquet"),
                record_count: 10,
                file_size_bytes: 100,
                ..blank_entry()
            })
            .collect();
        assert!(planner.plan(&files).is_empty());
    }

    /// Only the two files under the size threshold are selected.
    #[test]
    fn plan_selects_small_files() {
        let planner = CompactionPlanner::new(CompactionConfig {
            min_files_to_compact: 2,
            target_file_size_bytes: 1000,
            ..Default::default()
        });
        let files = vec![
            DataFileEntry {
                path: "small.parquet".into(),
                record_count: 5,
                file_size_bytes: 500,
                ..blank_entry()
            },
            DataFileEntry {
                path: "large.parquet".into(),
                record_count: 5000,
                file_size_bytes: 200_000_000,
                ..blank_entry()
            },
            DataFileEntry {
                path: "also-small.parquet".into(),
                record_count: 5,
                file_size_bytes: 800,
                ..blank_entry()
            },
        ];
        let selected = planner.plan(&files);
        assert_eq!(selected.len(), 2);
        assert!(selected.iter().any(|f| f.path == "small.parquet"));
        assert!(selected.iter().any(|f| f.path == "also-small.parquet"));
    }

    /// Two two-row files are merged into one four-row file that passes the
    /// reader's integrity check.
    #[tokio::test]
    async fn compact_merges_two_files() {
        use ailake_core::{VectorMetric, VectorPrecision};
        use ailake_store::LocalStore;
        use arrow_array::{Int32Array, RecordBatch};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;
        use tempfile::TempDir;

        let dir = TempDir::new().unwrap();
        let store = Arc::new(LocalStore::new(dir.path()));
        let policy = VectorStoragePolicy {
            column_name: "embedding".into(),
            dim: 4,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: false,
        };

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let embs_a: Vec<Vec<f32>> = vec![vec![1.0, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]];
        let embs_b: Vec<Vec<f32>> = vec![vec![0.0, 0.0, 1.0, 0.0], vec![0.0, 0.0, 0.0, 1.0]];

        let batch_a = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![0i32, 1]))],
        )
        .unwrap();
        let batch_b = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![2i32, 3]))],
        )
        .unwrap();

        let bytes_a = AilakeFileWriter::new(policy.clone())
            .write(&batch_a, &embs_a)
            .unwrap();
        let bytes_b = AilakeFileWriter::new(policy.clone())
            .write(&batch_b, &embs_b)
            .unwrap();

        store.put("data/a.parquet", bytes_a.clone()).await.unwrap();
        store.put("data/b.parquet", bytes_b.clone()).await.unwrap();

        let entries = vec![
            DataFileEntry {
                path: "data/a.parquet".into(),
                record_count: 2,
                file_size_bytes: bytes_a.len() as u64,
                ..blank_entry()
            },
            DataFileEntry {
                path: "data/b.parquet".into(),
                record_count: 2,
                file_size_bytes: bytes_b.len() as u64,
                ..blank_entry()
            },
        ];

        let executor = CompactionExecutor::new(store.clone(), policy.clone());
        let merged = executor
            .compact(&entries, "data/merged.parquet")
            .await
            .unwrap();

        assert_eq!(merged.record_count, 4);
        assert_eq!(merged.path, "data/merged.parquet");

        // Read the merged file back from the store and check its contents.
        let merged_bytes = store.get("data/merged.parquet").await.unwrap();
        let reader = AilakeFileReader::new(merged_bytes, "embedding", 4);
        reader.verify_integrity().unwrap();
        let (batch, embs) = reader.read_parquet().unwrap();
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(embs.len(), 4);
    }
}