1use std::sync::Arc;
12
13use bytes::Bytes;
14use roaring::RoaringBitmap;
15
16use ailake_catalog::{
17 provider::{
18 new_snapshot_id, CatalogProvider, DeletionVector, NewSnapshot, SnapshotOperation,
19 TableIdent,
20 },
21 DataFileEntry,
22};
23use ailake_core::{AilakeError, AilakeResult};
24use ailake_store::Store;
25
26use crate::dv::load_deletion_vector;
27
/// Four-byte Puffin magic `PFA1` (0x50 0x46 0x41 0x31), written by `PuffinWriter` as the
/// first and the last four bytes of every file it produces.
const PUFFIN_MAGIC: &[u8] = b"PFA1";
32
33pub struct PuffinWriter;
42
43impl PuffinWriter {
44 pub fn write_single_dv(
48 bitmap: &RoaringBitmap,
49 snapshot_id: i64,
50 ) -> AilakeResult<(Bytes, u64, u64)> {
51 let mut blob = Vec::new();
52 bitmap
53 .serialize_into(&mut blob)
54 .map_err(|e| AilakeError::Io(std::io::Error::other(format!("DV serialize: {e}"))))?;
55
56 let blob_offset = PUFFIN_MAGIC.len() as u64;
57 let blob_length = blob.len() as u64;
58
59 let footer_json = serde_json::json!({
61 "blobs": [{
62 "type": "deletion-vector-v1",
63 "snapshot-id": snapshot_id,
64 "sequence-number": 0,
65 "offset": blob_offset,
66 "length": blob_length
67 }],
68 "properties": {}
69 })
70 .to_string();
71 let footer_bytes = footer_json.as_bytes();
72 let footer_len = (footer_bytes.len() as u32).to_le_bytes();
73
74 let mut out =
75 Vec::with_capacity(PUFFIN_MAGIC.len() * 2 + blob.len() + footer_bytes.len() + 4);
76 out.extend_from_slice(PUFFIN_MAGIC);
77 out.extend_from_slice(&blob);
78 out.extend_from_slice(footer_bytes);
79 out.extend_from_slice(&footer_len);
80 out.extend_from_slice(PUFFIN_MAGIC);
81
82 Ok((Bytes::from(out), blob_offset, blob_length))
83 }
84}
85
86pub async fn delete_rows(
116 catalog: Arc<dyn CatalogProvider>,
117 store: Arc<dyn Store>,
118 table: &TableIdent,
119 file_path: &str,
120 row_ids: &[u32],
121) -> AilakeResult<()> {
122 if row_ids.is_empty() {
123 return Ok(());
124 }
125
126 let meta = catalog.load_table(table).await?;
128 if meta.format_version < 3 {
129 return Err(AilakeError::InvalidArgument(format!(
130 "Deletion Vectors require Iceberg V3 table (got format-version={}). \
131 Recreate the table with format_version=3.",
132 meta.format_version
133 )));
134 }
135
136 let mut files: Vec<DataFileEntry> = catalog.list_files(table, None).await?;
138
139 let target_idx = files
141 .iter()
142 .position(|f| f.path == file_path || f.path.ends_with(file_path))
143 .ok_or_else(|| {
144 AilakeError::Catalog(format!("file '{file_path}' not found in current snapshot"))
145 })?;
146
147 let mut bitmap = if let Some(ref dv) = files[target_idx].deletion_vector {
149 load_deletion_vector(&store, dv).await.unwrap_or_default()
150 } else {
151 RoaringBitmap::new()
152 };
153 for &id in row_ids {
154 bitmap.insert(id);
155 }
156 let cardinality = bitmap.len() as i64;
157
158 let snap_id = new_snapshot_id();
160 let (puffin_bytes, blob_offset, blob_length) = PuffinWriter::write_single_dv(&bitmap, snap_id)?;
161 let table_root = meta.location.trim_end_matches('/');
162 let dv_path = format!("{table_root}/metadata/dv-{snap_id}.dvd");
163 store.put(&dv_path, puffin_bytes).await?;
164
165 files[target_idx].deletion_vector = Some(DeletionVector {
167 path: dv_path,
168 offset: blob_offset,
169 length: blob_length,
170 cardinality,
171 });
172
173 let snapshot = NewSnapshot {
176 snapshot_id: snap_id,
177 parent_snapshot_id: meta.current_snapshot_id,
178 files,
179 operation: SnapshotOperation::Replace,
180 iceberg_schema: None,
181 extra_properties: std::collections::HashMap::new(),
182 bloom_filters: vec![],
183 equality_delete_files: vec![],
184 };
185 catalog.commit_snapshot(table, snapshot).await?;
186 Ok(())
187}
188
189pub async fn delete_where(
204 catalog: Arc<dyn CatalogProvider>,
205 store: Arc<dyn Store>,
206 table: &TableIdent,
207 column_name: &str,
208 values: &[&str],
209) -> AilakeResult<()> {
210 if values.is_empty() {
211 return Ok(());
212 }
213
214 let meta = catalog.load_table(table).await?;
215 let table_root = meta.location.trim_end_matches('/');
216
217 let (field_id, iceberg_type) = meta
221 .schema_fields
222 .iter()
223 .find(|sf| sf.name == column_name)
224 .map(|sf| (sf.id, sf.iceberg_type.clone()))
225 .unwrap_or((0, "string".to_string()));
226
227 let snap_id = new_snapshot_id();
229 let eq_del_avro =
230 ailake_catalog::write_equality_delete_avro(column_name, field_id, &iceberg_type, values)
231 .map_err(|e| AilakeError::Catalog(e.to_string()))?;
232 let file_size = eq_del_avro.len() as u64;
233 let eq_del_path = format!("{table_root}/metadata/eq-del-{snap_id}.avro");
234 store.put(&eq_del_path, eq_del_avro).await?;
235
236 let eq_del_file = ailake_catalog::EqualityDeleteFile {
237 path: eq_del_path,
238 equality_ids: vec![field_id],
239 record_count: values.len() as u64,
240 file_size_bytes: file_size,
241 };
242
243 let snapshot = NewSnapshot {
245 snapshot_id: snap_id,
246 parent_snapshot_id: meta.current_snapshot_id,
247 files: vec![],
248 operation: SnapshotOperation::Delete,
249 iceberg_schema: None,
250 extra_properties: std::collections::HashMap::new(),
251 bloom_filters: vec![],
252 equality_delete_files: vec![eq_del_file],
253 };
254 catalog.commit_snapshot(table, snapshot).await?;
255 Ok(())
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use ailake_catalog::{
        provider::{IndexStatus, TableProperties},
        HadoopCatalog,
    };
    use ailake_core::{VectorMetric, VectorPrecision, VectorStoragePolicy};
    use ailake_store::LocalStore;

    /// Path of the single data file registered by `setup_v3_table`.
    const DATA_FILE: &str = "data/part-00001.parquet";

    /// Creates a store rooted in a fresh temporary directory.
    ///
    /// The returned `TempDir` guard must be kept alive for as long as the store is used.
    fn temp_store() -> (tempfile::TempDir, Arc<dyn Store>) {
        let dir = tempfile::tempdir().unwrap();
        let store: Arc<dyn Store> = Arc::new(LocalStore::new(dir.path()));
        (dir, store)
    }

    /// Table properties for a 4-dimensional cosine `embedding` column at the given
    /// Iceberg format version.
    fn make_props(format_version: u8) -> TableProperties {
        let policy = VectorStoragePolicy {
            column_name: "embedding".to_string(),
            dim: 4,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: true,
            pre_normalize: false,
            hnsw_m: None,
            hnsw_ef_construction: None,
            ivf_residual: false,
            embedding_model: None,
            modality: None,
            partition_by: None,
            partition_value: None,
            partition_column_type: None,
            partition_fields: vec![],
        };
        TableProperties {
            policy,
            extra: std::collections::HashMap::new(),
            format_version,
            partition_column_type: None,
        }
    }

    /// A ready data-file entry with 100 records and no deletion vector.
    fn make_file_entry(path: &str) -> DataFileEntry {
        DataFileEntry {
            path: path.to_string(),
            record_count: 100,
            file_size_bytes: 4096,
            centroid_b64: None,
            radius: None,
            hnsw_offset: None,
            hnsw_len: None,
            vector_column: Some("embedding".to_string()),
            vector_dim: Some(4),
            extra_vector_indexes: vec![],
            index_status: IndexStatus::Ready,
            batch_id: None,
            embedding_model: None,
            partition_value: None,
            deletion_vector: None,
            first_row_id: None,
        }
    }

    /// Creates a format-version-3 table `default.docs` containing one appended data file.
    async fn setup_v3_table(
        warehouse: &str,
        store: Arc<dyn Store>,
    ) -> (Arc<dyn CatalogProvider>, TableIdent) {
        let catalog: Arc<dyn CatalogProvider> =
            Arc::new(HadoopCatalog::new(Arc::clone(&store), warehouse));
        let table = TableIdent::new("default", "docs");
        catalog.create_table(&table, &make_props(3)).await.unwrap();

        let initial = NewSnapshot {
            snapshot_id: new_snapshot_id(),
            parent_snapshot_id: None,
            files: vec![make_file_entry(DATA_FILE)],
            operation: SnapshotOperation::Append,
            iceberg_schema: None,
            extra_properties: std::collections::HashMap::new(),
            bloom_filters: vec![],
            equality_delete_files: vec![],
        };
        catalog.commit_snapshot(&table, initial).await.unwrap();
        (catalog, table)
    }

    /// A first delete attaches a DV whose cardinality and contents match the request.
    #[tokio::test]
    async fn writes_dv_and_manifest_reflects_cardinality() {
        let (_dir, store) = temp_store();
        let (catalog, table) = setup_v3_table("", Arc::clone(&store)).await;

        let outcome = delete_rows(
            Arc::clone(&catalog),
            Arc::clone(&store),
            &table,
            DATA_FILE,
            &[5, 10, 42],
        )
        .await;
        outcome.unwrap();

        let files = catalog.list_files(&table, None).await.unwrap();
        assert_eq!(files.len(), 1);
        let dv = files[0]
            .deletion_vector
            .as_ref()
            .expect("DV should be present");
        assert_eq!(dv.cardinality, 3);

        let bm = load_deletion_vector(&store, dv).await.unwrap();
        assert!(bm.contains(5));
        assert!(bm.contains(10));
        assert!(bm.contains(42));
        assert!(!bm.contains(0));
        assert_eq!(bm.len(), 3);
    }

    /// A second delete on the same file extends the existing DV instead of replacing it.
    #[tokio::test]
    async fn merges_with_existing_dv_across_calls() {
        let (_dir, store) = temp_store();
        let (catalog, table) = setup_v3_table("", Arc::clone(&store)).await;

        let first = delete_rows(
            Arc::clone(&catalog),
            Arc::clone(&store),
            &table,
            DATA_FILE,
            &[1, 2],
        )
        .await;
        first.unwrap();

        let second = delete_rows(
            Arc::clone(&catalog),
            Arc::clone(&store),
            &table,
            DATA_FILE,
            &[3, 4],
        )
        .await;
        second.unwrap();

        let files = catalog.list_files(&table, None).await.unwrap();
        let dv = files[0].deletion_vector.as_ref().unwrap();
        let bm = load_deletion_vector(&store, dv).await.unwrap();
        assert!((1u32..=4).all(|id| bm.contains(id)));
        assert_eq!(bm.len(), 4);
    }

    /// Tables below format version 3 are rejected with a message naming the version.
    #[tokio::test]
    async fn rejects_v2_table() {
        let (_dir, store) = temp_store();
        let catalog: Arc<dyn CatalogProvider> =
            Arc::new(HadoopCatalog::new(Arc::clone(&store), ""));
        let table = TableIdent::new("default", "docs");
        catalog.create_table(&table, &make_props(2)).await.unwrap();

        let outcome = delete_rows(
            Arc::clone(&catalog),
            Arc::clone(&store),
            &table,
            DATA_FILE,
            &[0],
        )
        .await;
        let err = outcome.unwrap_err();
        assert!(err.to_string().contains("format-version=2"));
    }

    /// An empty id list succeeds and leaves the file without a deletion vector.
    #[tokio::test]
    async fn noop_when_row_ids_empty() {
        let (_dir, store) = temp_store();
        let (catalog, table) = setup_v3_table("", Arc::clone(&store)).await;

        let outcome = delete_rows(
            Arc::clone(&catalog),
            Arc::clone(&store),
            &table,
            DATA_FILE,
            &[],
        )
        .await;
        outcome.unwrap();

        let files = catalog.list_files(&table, None).await.unwrap();
        assert!(files[0].deletion_vector.is_none());
    }

    /// The written file starts and ends with the magic, and the reported offset/length
    /// slice decodes back to the original bitmap members.
    #[tokio::test]
    async fn puffin_magic_and_structure_valid() {
        let mut bm = RoaringBitmap::new();
        bm.insert(7);
        bm.insert(99);
        let (bytes, offset, length) = PuffinWriter::write_single_dv(&bm, 42).unwrap();

        let head = &bytes[..4];
        let tail = &bytes[bytes.len() - 4..];
        assert_eq!(head, PUFFIN_MAGIC);
        assert_eq!(tail, PUFFIN_MAGIC);

        let blob_start = offset as usize;
        let blob_end = (offset + length) as usize;
        let blob_slice = &bytes[blob_start..blob_end];
        let recovered = RoaringBitmap::deserialize_from(blob_slice).unwrap();
        assert!(recovered.contains(7) && recovered.contains(99));
    }
}