1use std::sync::Arc;
2
3use ailake_catalog::{
4 make_data_file_entry, CatalogProvider, DataFileEntry, NewSnapshot, SnapshotOperation,
5 TableIdent, VectorIndexInfo,
6};
7use ailake_core::{AilakeResult, VectorStoragePolicy};
8use ailake_file::{AilakeFileReader, AilakeFileWriter};
9use ailake_store::Store;
10use ailake_vec::compute_centroid_and_radius;
11use arrow_array::RecordBatch;
12use arrow_schema::SchemaRef;
13use bytes::Bytes;
14
/// Which vector index the merged output file is written with.
///
/// Read by `CompactionExecutor::compact` when it configures the `AilakeFileWriter`.
#[derive(Clone, Debug, Default)]
pub enum CompactionIndexStrategy {
    /// Let the writer decide (`AilakeFileWriter::with_auto_index`). This is the default.
    #[default]
    Auto,
    /// Use the writer exactly as `AilakeFileWriter::new` configures it, with no index
    /// override; presumably that default is HNSW given the name — confirm in ailake_file.
    ForceHnsw,
    /// Build an IVF-PQ index whose parameters come from
    /// `IvfPqConfig::for_dataset(dim, number_of_embeddings)`.
    ForceIvfPq,
}
27
28#[derive(Debug, Clone)]
29pub struct CompactionConfig {
30 pub min_files_to_compact: usize,
32 pub target_file_size_bytes: u64,
34 pub index_strategy: CompactionIndexStrategy,
36}
37
38impl Default for CompactionConfig {
39 fn default() -> Self {
40 Self {
41 min_files_to_compact: 4,
42 target_file_size_bytes: 128 * 1024 * 1024, index_strategy: CompactionIndexStrategy::Auto,
44 }
45 }
46}
47
/// Scope of a compaction pass.
///
/// NOTE(review): nothing in this file reads this enum; the meanings below are inferred
/// from the variant names only — confirm with callers.
#[derive(Clone, Copy, Debug)]
pub enum CompactionMode {
    /// Presumably: compact every eligible file.
    Full,
    /// Presumably: compact only a subset of the eligible files.
    Partial,
}
53
54pub struct CompactionPlanner {
55 config: CompactionConfig,
56}
57
58impl CompactionPlanner {
59 pub fn new(config: CompactionConfig) -> Self {
60 Self { config }
61 }
62
63 pub fn plan(&self, files: &[DataFileEntry]) -> Vec<DataFileEntry> {
66 let candidates: Vec<DataFileEntry> = files
67 .iter()
68 .filter(|f| f.file_size_bytes < self.config.target_file_size_bytes)
69 .cloned()
70 .collect();
71 if candidates.len() < self.config.min_files_to_compact {
72 return vec![];
73 }
74 candidates
75 }
76}
77
78pub struct CompactionExecutor {
85 store: Arc<dyn Store>,
86 policy: VectorStoragePolicy,
87 index_strategy: CompactionIndexStrategy,
88}
89
90impl CompactionExecutor {
91 pub fn new(store: Arc<dyn Store>, policy: VectorStoragePolicy) -> Self {
92 Self {
93 store,
94 policy,
95 index_strategy: CompactionIndexStrategy::Auto,
96 }
97 }
98
99 pub fn with_index_strategy(mut self, strategy: CompactionIndexStrategy) -> Self {
101 self.index_strategy = strategy;
102 self
103 }
104
105 pub async fn compact(
108 &self,
109 files: &[DataFileEntry],
110 output_path: &str,
111 ) -> AilakeResult<DataFileEntry> {
112 if files.is_empty() {
113 return Err(ailake_core::AilakeError::Catalog(
114 "compact: no files provided".into(),
115 ));
116 }
117
118 let mut all_batches: Vec<RecordBatch> = Vec::new();
119 let mut all_embeddings: Vec<Vec<f32>> = Vec::new();
120 let mut schema: Option<SchemaRef> = None;
121
122 for entry in files {
123 let bytes: Bytes = self.store.get(&entry.path).await?;
124 let reader = AilakeFileReader::new(bytes, &self.policy.column_name, self.policy.dim);
125 if !reader.is_ailake_file() {
126 continue;
127 }
128 let (batch, embs) = reader.read_parquet()?;
129 if schema.is_none() {
130 schema = Some(batch.schema());
131 }
132 all_batches.push(batch);
133 all_embeddings.extend(embs);
134 }
135
136 if all_batches.is_empty() {
137 return Err(ailake_core::AilakeError::Catalog(
138 "compact: no valid AI-Lake files in input".into(),
139 ));
140 }
141
142 let merged_batch = concat_batches(schema.unwrap(), &all_batches)?;
144 let record_count = merged_batch.num_rows() as u64;
145
146 let writer = {
148 let base = AilakeFileWriter::new(self.policy.clone());
149 match &self.index_strategy {
150 CompactionIndexStrategy::Auto => base.with_auto_index(),
151 CompactionIndexStrategy::ForceHnsw => base,
152 CompactionIndexStrategy::ForceIvfPq => {
153 let cfg = ailake_index::IvfPqConfig::for_dataset(
154 self.policy.dim as usize,
155 all_embeddings.len(),
156 );
157 base.with_ivf_pq(cfg)
158 }
159 }
160 };
161 let file_bytes = writer.write(&merged_batch, &all_embeddings)?;
162 let file_size = file_bytes.len() as u64;
163 self.store.put(output_path, file_bytes.clone()).await?;
164
165 let centroid = compute_centroid_and_radius(&all_embeddings, self.policy.metric);
167 let reader = AilakeFileReader::new(file_bytes, &self.policy.column_name, self.policy.dim);
168 let header = reader.read_header()?;
169 let ailk_start = reader.ailk_offset()?;
170
171 let entry = make_data_file_entry(
172 output_path,
173 record_count,
174 file_size,
175 ¢roid,
176 VectorIndexInfo {
177 column: &self.policy.column_name,
178 dim: self.policy.dim,
179 hnsw_offset: ailk_start + header.hnsw_offset,
180 hnsw_len: header.hnsw_len,
181 },
182 );
183 Ok(entry)
184 }
185
186 pub async fn run(
188 &self,
189 planner: &CompactionPlanner,
190 table: &TableIdent,
191 catalog: Arc<dyn CatalogProvider>,
192 output_prefix: &str,
193 ) -> AilakeResult<Option<DataFileEntry>> {
194 let all_files = catalog.list_files(table, None).await?;
195 let to_compact = planner.plan(&all_files);
196 if to_compact.is_empty() {
197 return Ok(None);
198 }
199
200 let ts = std::time::SystemTime::now()
201 .duration_since(std::time::UNIX_EPOCH)
202 .unwrap()
203 .as_millis();
204 let output_path = format!("{output_prefix}/compacted-{ts}.parquet");
205
206 let merged = self.compact(&to_compact, &output_path).await?;
207
208 let snapshot = NewSnapshot {
210 snapshot_id: ailake_catalog::new_snapshot_id(),
211 parent_snapshot_id: None,
212 files: vec![merged.clone()],
213 operation: SnapshotOperation::Replace,
214 iceberg_schema: None,
215 };
216 catalog.commit_snapshot(table, snapshot).await?;
217
218 for entry in &to_compact {
220 let _ = self.store.delete(&entry.path).await;
221 }
222
223 Ok(Some(merged))
224 }
225}
226
227fn concat_batches(schema: SchemaRef, batches: &[RecordBatch]) -> AilakeResult<RecordBatch> {
228 arrow_select::concat::concat_batches(&schema, batches)
229 .map_err(|e| ailake_core::AilakeError::Arrow(e.to_string()))
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a catalog entry with only `path` and `file_size_bytes` meaningful: every
    /// optional vector/index field is empty and `record_count` is 0. Tests override
    /// individual fields with struct-update syntax.
    fn bare_entry(path: &str, file_size_bytes: u64) -> DataFileEntry {
        DataFileEntry {
            path: path.to_string(),
            record_count: 0,
            file_size_bytes,
            centroid_b64: None,
            radius: None,
            hnsw_offset: None,
            hnsw_len: None,
            vector_column: None,
            vector_dim: None,
            extra_vector_indexes: vec![],
            index_status: ailake_catalog::IndexStatus::Ready,
        }
    }

    #[test]
    fn plan_returns_empty_if_too_few_files() {
        let planner = CompactionPlanner::new(CompactionConfig {
            min_files_to_compact: 4,
            target_file_size_bytes: 1024 * 1024,
            ..Default::default()
        });
        // Three small files: all are candidates, but fewer than the four required.
        let files: Vec<DataFileEntry> = (0..3)
            .map(|i| DataFileEntry {
                record_count: 10,
                ..bare_entry(&format!("file-{i}.parquet"), 100)
            })
            .collect();
        assert!(planner.plan(&files).is_empty());
    }

    #[test]
    fn plan_selects_small_files() {
        let planner = CompactionPlanner::new(CompactionConfig {
            min_files_to_compact: 2,
            target_file_size_bytes: 1000,
            ..Default::default()
        });
        let files = vec![
            DataFileEntry {
                record_count: 5,
                ..bare_entry("small.parquet", 500)
            },
            DataFileEntry {
                record_count: 5000,
                ..bare_entry("large.parquet", 200_000_000)
            },
            DataFileEntry {
                record_count: 5,
                ..bare_entry("also-small.parquet", 800)
            },
        ];

        let selected = planner.plan(&files);
        let paths: Vec<&str> = selected.iter().map(|f| f.path.as_str()).collect();
        assert_eq!(paths.len(), 2);
        assert!(paths.contains(&"small.parquet"));
        assert!(paths.contains(&"also-small.parquet"));
    }

    #[tokio::test]
    async fn compact_merges_two_files() {
        use ailake_core::{VectorMetric, VectorPrecision};
        use ailake_store::LocalStore;
        use arrow_array::{Int32Array, RecordBatch};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;
        use tempfile::TempDir;

        let dir = TempDir::new().unwrap();
        let store = Arc::new(LocalStore::new(dir.path()));
        let policy = VectorStoragePolicy {
            column_name: "embedding".into(),
            dim: 4,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: false,
        };

        // Two tiny inputs: ids 0 and 1 carry the first two unit vectors, ids 2 and 3 the
        // last two.
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let embs_a: Vec<Vec<f32>> = vec![vec![1.0, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]];
        let embs_b: Vec<Vec<f32>> = vec![vec![0.0, 0.0, 1.0, 0.0], vec![0.0, 0.0, 0.0, 1.0]];
        let batch_a = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![0i32, 1]))],
        )
        .unwrap();
        let batch_b = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![2i32, 3]))],
        )
        .unwrap();

        let writer_a = AilakeFileWriter::new(policy.clone());
        let bytes_a = writer_a.write(&batch_a, &embs_a).unwrap();
        let writer_b = AilakeFileWriter::new(policy.clone());
        let bytes_b = writer_b.write(&batch_b, &embs_b).unwrap();

        store.put("data/a.parquet", bytes_a.clone()).await.unwrap();
        store.put("data/b.parquet", bytes_b.clone()).await.unwrap();

        let entries = vec![
            DataFileEntry {
                record_count: 2,
                ..bare_entry("data/a.parquet", bytes_a.len() as u64)
            },
            DataFileEntry {
                record_count: 2,
                ..bare_entry("data/b.parquet", bytes_b.len() as u64)
            },
        ];

        let executor = CompactionExecutor::new(store.clone(), policy.clone());
        let merged = executor
            .compact(&entries, "data/merged.parquet")
            .await
            .unwrap();

        assert_eq!(merged.record_count, 4);
        assert_eq!(merged.path, "data/merged.parquet");

        // The merged file must be intact and contain every row and embedding.
        let merged_bytes = store.get("data/merged.parquet").await.unwrap();
        let reader = AilakeFileReader::new(merged_bytes, "embedding", 4);
        reader.verify_integrity().unwrap();
        let (batch, embs) = reader.read_parquet().unwrap();
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(embs.len(), 4);
    }
}