Skip to main content

ailake_query/
delete.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2// Iceberg V3 Deletion Vector write support — Phase C.
3//
4// Produces Roaring Bitmap blobs in minimal Puffin `.dvd` files and updates
5// the manifest entry so scanners (Phase B) automatically mask deleted rows
6// from HNSW and flat-scan results.
7//
8// Phase B (read) is independent: existing DVs written by Spark / Trino /
9// PyIceberg are consumed without requiring Phase C.
10
11use std::sync::Arc;
12
13use bytes::Bytes;
14use roaring::RoaringBitmap;
15
16use ailake_catalog::{
17    provider::{
18        new_snapshot_id, CatalogProvider, DeletionVector, NewSnapshot, SnapshotOperation,
19        TableIdent,
20    },
21    DataFileEntry,
22};
23use ailake_core::{AilakeError, AilakeResult};
24use ailake_store::Store;
25
26use crate::dv::load_deletion_vector;
27
28// ── Puffin writer ─────────────────────────────────────────────────────────────
29
30/// Puffin magic bytes — per Iceberg Puffin spec §2.
31const PUFFIN_MAGIC: &[u8] = b"PFAc";
32
33/// Minimal single-blob Puffin file writer for Deletion Vectors.
34///
35/// Puffin format (simplified, one blob):
36/// ```text
37/// [4 bytes magic "PFAc"] [blob bytes] [footer JSON] [4 bytes footer_len LE] [4 bytes magic "PFAc"]
38/// ```
39/// The DV manifest entry stores `offset=4` (after magic) and `length=blob.len()`, so
40/// readers skip the Puffin header/footer and fetch only the bitmap bytes via range GET.
41pub struct PuffinWriter;
42
43impl PuffinWriter {
44    /// Serialize `bitmap` into a single-blob Puffin file.
45    ///
46    /// Returns `(file_bytes, blob_offset, blob_length)`.
47    pub fn write_single_dv(
48        bitmap: &RoaringBitmap,
49        snapshot_id: i64,
50    ) -> AilakeResult<(Bytes, u64, u64)> {
51        let mut blob = Vec::new();
52        bitmap
53            .serialize_into(&mut blob)
54            .map_err(|e| AilakeError::Io(std::io::Error::other(format!("DV serialize: {e}"))))?;
55
56        let blob_offset = PUFFIN_MAGIC.len() as u64;
57        let blob_length = blob.len() as u64;
58
59        // Footer JSON per Iceberg Puffin spec §4.
60        let footer_json = serde_json::json!({
61            "blobs": [{
62                "type": "deletion-vector-v1",
63                "snapshot-id": snapshot_id,
64                "sequence-number": 0,
65                "offset": blob_offset,
66                "length": blob_length
67            }],
68            "properties": {}
69        })
70        .to_string();
71        let footer_bytes = footer_json.as_bytes();
72        let footer_len = (footer_bytes.len() as u32).to_le_bytes();
73
74        let mut out =
75            Vec::with_capacity(PUFFIN_MAGIC.len() * 2 + blob.len() + footer_bytes.len() + 4);
76        out.extend_from_slice(PUFFIN_MAGIC);
77        out.extend_from_slice(&blob);
78        out.extend_from_slice(footer_bytes);
79        out.extend_from_slice(&footer_len);
80        out.extend_from_slice(PUFFIN_MAGIC);
81
82        Ok((Bytes::from(out), blob_offset, blob_length))
83    }
84}
85
86// ── Public API ────────────────────────────────────────────────────────────────
87
88/// Logically delete rows from a V3 AI-Lake table using Iceberg Deletion Vectors.
89///
90/// # What this does
91/// 1. Verifies the table is `format-version=3` (DVs require V3).
92/// 2. Reads the current file list from the catalog.
93/// 3. Finds `file_path` in the snapshot (exact match or suffix match for
94///    tables where the catalog prefixes absolute paths).
95/// 4. Merges `row_ids` into the existing DV bitmap for that file (or creates
96///    a new one if the file has no DV yet).
97/// 5. Writes a new Puffin `.dvd` file to `{table_location}/metadata/dv-{snap_id}.dvd`.
98/// 6. Commits a `Replace` snapshot so all readers see the updated DV immediately.
99///
100/// After the call, `scanner.rs` (Phase B) will automatically exclude the
101/// deleted rows from HNSW and flat-scan results. The data file is not modified.
102///
103/// # Arguments
104/// * `catalog` — catalog for manifest reads and snapshot commits.
105/// * `store` — object store for Puffin file I/O.
106/// * `table` — fully-qualified table identifier (`namespace.name`).
107/// * `file_path` — path of the data file whose rows are being deleted.
108///   May be a relative path (e.g. `"data/part-00001.parquet"`) or an absolute
109///   path as returned by `catalog.list_files()`. Suffix matching is applied.
110/// * `row_ids` — 0-based row positions to delete (within the data file).
111///
112/// # Errors
113/// * `InvalidArgument` if the table is `format-version < 3`.
114/// * `Catalog` if the table has no current snapshot or `file_path` is not found.
115pub async fn delete_rows(
116    catalog: Arc<dyn CatalogProvider>,
117    store: Arc<dyn Store>,
118    table: &TableIdent,
119    file_path: &str,
120    row_ids: &[u32],
121) -> AilakeResult<()> {
122    if row_ids.is_empty() {
123        return Ok(());
124    }
125
126    // Verify table is V3.
127    let meta = catalog.load_table(table).await?;
128    if meta.format_version < 3 {
129        return Err(AilakeError::InvalidArgument(format!(
130            "Deletion Vectors require Iceberg V3 table (got format-version={}). \
131             Recreate the table with format_version=3.",
132            meta.format_version
133        )));
134    }
135
136    // Load current file list.
137    let mut files: Vec<DataFileEntry> = catalog.list_files(table, None).await?;
138
139    // Find target file (exact match or suffix match for absolute-path manifests).
140    let target_idx = files
141        .iter()
142        .position(|f| f.path == file_path || f.path.ends_with(file_path))
143        .ok_or_else(|| {
144            AilakeError::Catalog(format!("file '{file_path}' not found in current snapshot"))
145        })?;
146
147    // Build bitmap: merge existing DV with new row_ids.
148    let mut bitmap = if let Some(ref dv) = files[target_idx].deletion_vector {
149        load_deletion_vector(&store, dv).await.unwrap_or_default()
150    } else {
151        RoaringBitmap::new()
152    };
153    for &id in row_ids {
154        bitmap.insert(id);
155    }
156    let cardinality = bitmap.len() as i64;
157
158    // Write Puffin .dvd file alongside table metadata.
159    let snap_id = new_snapshot_id();
160    let (puffin_bytes, blob_offset, blob_length) = PuffinWriter::write_single_dv(&bitmap, snap_id)?;
161    let table_root = meta.location.trim_end_matches('/');
162    let dv_path = format!("{table_root}/metadata/dv-{snap_id}.dvd");
163    store.put(&dv_path, puffin_bytes).await?;
164
165    // Patch the target entry with the new DV pointer.
166    files[target_idx].deletion_vector = Some(DeletionVector {
167        path: dv_path,
168        offset: blob_offset,
169        length: blob_length,
170        cardinality,
171    });
172
173    // Replace snapshot: carries all files with the updated DV entry.
174    // Replace does not inherit old manifests — the full file list is the new state.
175    let snapshot = NewSnapshot {
176        snapshot_id: snap_id,
177        parent_snapshot_id: meta.current_snapshot_id,
178        files,
179        operation: SnapshotOperation::Replace,
180        iceberg_schema: None,
181        extra_properties: std::collections::HashMap::new(),
182        bloom_filters: vec![],
183        equality_delete_files: vec![],
184    };
185    catalog.commit_snapshot(table, snapshot).await?;
186    Ok(())
187}
188
189// ── Equality Delete (Phase H) ──────────────────────────────────────────────────
190
191/// Logically delete all rows where `column_name` equals any value in `values`.
192///
193/// Writes an Iceberg equality delete Avro file containing one row per value,
194/// then commits a `Delete` snapshot that inherits existing data manifests and
195/// appends a new delete manifest (`content=1`) pointing to that file.
196///
197/// Scanners that load equality delete files (AI-Lake, Spark, Trino with plugin)
198/// will automatically mask matching rows at read time without rewriting data files.
199///
200/// # Arguments
201/// * `column_name` — column to match against (must exist in the table schema)
202/// * `values` — values that identify rows to delete
203pub async fn delete_where(
204    catalog: Arc<dyn CatalogProvider>,
205    store: Arc<dyn Store>,
206    table: &TableIdent,
207    column_name: &str,
208    values: &[&str],
209) -> AilakeResult<()> {
210    if values.is_empty() {
211        return Ok(());
212    }
213
214    let meta = catalog.load_table(table).await?;
215    let table_root = meta.location.trim_end_matches('/');
216
217    // Look up field-id and iceberg_type from schema_fields. Fall back to id=0 / "string"
218    // for tables without schema_fields (old format) — the column name in the Avro file
219    // is still sufficient for AI-Lake's own scanner.
220    let (field_id, iceberg_type) = meta
221        .schema_fields
222        .iter()
223        .find(|sf| sf.name == column_name)
224        .map(|sf| (sf.id, sf.iceberg_type.clone()))
225        .unwrap_or((0, "string".to_string()));
226
227    // Write equality delete Avro file.
228    let snap_id = new_snapshot_id();
229    let eq_del_avro =
230        ailake_catalog::write_equality_delete_avro(column_name, field_id, &iceberg_type, values)
231            .map_err(|e| AilakeError::Catalog(e.to_string()))?;
232    let file_size = eq_del_avro.len() as u64;
233    let eq_del_path = format!("{table_root}/metadata/eq-del-{snap_id}.avro");
234    store.put(&eq_del_path, eq_del_avro).await?;
235
236    let eq_del_file = ailake_catalog::EqualityDeleteFile {
237        path: eq_del_path,
238        equality_ids: vec![field_id],
239        record_count: values.len() as u64,
240        file_size_bytes: file_size,
241    };
242
243    // Commit Delete snapshot — inherits previous data manifests, appends delete manifest.
244    let snapshot = NewSnapshot {
245        snapshot_id: snap_id,
246        parent_snapshot_id: meta.current_snapshot_id,
247        files: vec![],
248        operation: SnapshotOperation::Delete,
249        iceberg_schema: None,
250        extra_properties: std::collections::HashMap::new(),
251        bloom_filters: vec![],
252        equality_delete_files: vec![eq_del_file],
253    };
254    catalog.commit_snapshot(table, snapshot).await?;
255    Ok(())
256}
257
258// ── Tests ─────────────────────────────────────────────────────────────────────
259
#[cfg(test)]
mod tests {
    use super::*;
    use ailake_catalog::{
        provider::{IndexStatus, TableProperties},
        HadoopCatalog,
    };
    use ailake_core::{VectorMetric, VectorPrecision, VectorStoragePolicy};
    use ailake_store::LocalStore;

    /// Path of the single data file registered by `setup_v3_table`.
    const PART_FILE: &str = "data/part-00001.parquet";

    /// Creates a local store rooted in a fresh temp dir. The `TempDir` guard is
    /// returned so the directory outlives the test body.
    fn temp_store() -> (tempfile::TempDir, Arc<dyn Store>) {
        let dir = tempfile::tempdir().unwrap();
        let store: Arc<dyn Store> = Arc::new(LocalStore::new(dir.path()));
        (dir, store)
    }

    /// Builds a Hadoop catalog with an empty warehouse prefix over `store`.
    fn hadoop_catalog(store: &Arc<dyn Store>, warehouse: &str) -> Arc<dyn CatalogProvider> {
        Arc::new(HadoopCatalog::new(Arc::clone(store), warehouse))
    }

    /// Table properties for a 4-dim cosine/F16 `embedding` column at the given
    /// Iceberg format version.
    fn make_props(format_version: u8) -> TableProperties {
        let policy = VectorStoragePolicy {
            column_name: "embedding".to_string(),
            dim: 4,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: true,
            pre_normalize: false,
            hnsw_m: None,
            hnsw_ef_construction: None,
            ivf_residual: false,
            embedding_model: None,
            modality: None,
            partition_by: None,
            partition_value: None,
            partition_column_type: None,
            partition_fields: vec![],
        };
        TableProperties {
            policy,
            extra: std::collections::HashMap::new(),
            format_version,
            partition_column_type: None,
        }
    }

    /// A 100-row data file entry at `path` with no deletion vector attached.
    fn make_file_entry(path: &str) -> DataFileEntry {
        DataFileEntry {
            path: path.to_string(),
            record_count: 100,
            file_size_bytes: 4096,
            centroid_b64: None,
            radius: None,
            hnsw_offset: None,
            hnsw_len: None,
            vector_column: Some("embedding".to_string()),
            vector_dim: Some(4),
            extra_vector_indexes: vec![],
            index_status: IndexStatus::Ready,
            batch_id: None,
            embedding_model: None,
            partition_value: None,
            deletion_vector: None,
            first_row_id: None,
        }
    }

    /// Creates `default.docs` as a V3 table and appends one data file to it.
    async fn setup_v3_table(
        warehouse: &str,
        store: Arc<dyn Store>,
    ) -> (Arc<dyn CatalogProvider>, TableIdent) {
        let catalog = hadoop_catalog(&store, warehouse);
        let table = TableIdent::new("default", "docs");
        catalog.create_table(&table, &make_props(3)).await.unwrap();

        let initial = NewSnapshot {
            snapshot_id: new_snapshot_id(),
            parent_snapshot_id: None,
            files: vec![make_file_entry(PART_FILE)],
            operation: SnapshotOperation::Append,
            iceberg_schema: None,
            extra_properties: std::collections::HashMap::new(),
            bloom_filters: vec![],
            equality_delete_files: vec![],
        };
        catalog.commit_snapshot(&table, initial).await.unwrap();
        (catalog, table)
    }

    /// Runs `delete_rows` against `PART_FILE` with cloned catalog/store handles.
    async fn delete_from_part(
        catalog: &Arc<dyn CatalogProvider>,
        store: &Arc<dyn Store>,
        table: &TableIdent,
        rows: &[u32],
    ) -> AilakeResult<()> {
        delete_rows(
            Arc::clone(catalog),
            Arc::clone(store),
            table,
            PART_FILE,
            rows,
        )
        .await
    }

    #[tokio::test]
    async fn writes_dv_and_manifest_reflects_cardinality() {
        let (_dir, store) = temp_store();
        let (catalog, table) = setup_v3_table("", Arc::clone(&store)).await;

        delete_from_part(&catalog, &store, &table, &[5, 10, 42])
            .await
            .unwrap();

        let files = catalog.list_files(&table, None).await.unwrap();
        assert_eq!(files.len(), 1);
        let dv = files[0]
            .deletion_vector
            .as_ref()
            .expect("DV should be present");
        assert_eq!(dv.cardinality, 3);

        // The Puffin file must exist and decode to exactly the deleted rows.
        let bm = load_deletion_vector(&store, dv).await.unwrap();
        for deleted in [5, 10, 42] {
            assert!(bm.contains(deleted));
        }
        assert!(!bm.contains(0));
        assert_eq!(bm.len(), 3);
    }

    #[tokio::test]
    async fn merges_with_existing_dv_across_calls() {
        let (_dir, store) = temp_store();
        let (catalog, table) = setup_v3_table("", Arc::clone(&store)).await;

        // Two consecutive batches; the second must accumulate onto the first.
        delete_from_part(&catalog, &store, &table, &[1, 2])
            .await
            .unwrap();
        delete_from_part(&catalog, &store, &table, &[3, 4])
            .await
            .unwrap();

        let files = catalog.list_files(&table, None).await.unwrap();
        let dv = files[0].deletion_vector.as_ref().unwrap();
        let bm = load_deletion_vector(&store, dv).await.unwrap();
        assert!((1..=4).all(|row| bm.contains(row)));
        assert_eq!(bm.len(), 4);
    }

    #[tokio::test]
    async fn rejects_v2_table() {
        let (_dir, store) = temp_store();
        let catalog = hadoop_catalog(&store, "");
        let table = TableIdent::new("default", "docs");
        catalog.create_table(&table, &make_props(2)).await.unwrap();

        let err = delete_from_part(&catalog, &store, &table, &[0])
            .await
            .unwrap_err();
        assert!(err.to_string().contains("format-version=2"));
    }

    #[tokio::test]
    async fn noop_when_row_ids_empty() {
        let (_dir, store) = temp_store();
        let (catalog, table) = setup_v3_table("", Arc::clone(&store)).await;

        // Empty input returns Ok immediately and writes no DV.
        delete_from_part(&catalog, &store, &table, &[])
            .await
            .unwrap();

        let files = catalog.list_files(&table, None).await.unwrap();
        assert!(files[0].deletion_vector.is_none());
    }

    #[tokio::test]
    async fn puffin_magic_and_structure_valid() {
        let mut bm = RoaringBitmap::new();
        bm.insert(7);
        bm.insert(99);
        let (bytes, offset, length) = PuffinWriter::write_single_dv(&bm, 42).unwrap();

        // The file is framed by the magic on both ends.
        let tail_start = bytes.len() - 4;
        assert_eq!(&bytes[..4], PUFFIN_MAGIC);
        assert_eq!(&bytes[tail_start..], PUFFIN_MAGIC);

        // The declared offset/length window decodes back to the bitmap.
        let blob_start = offset as usize;
        let blob_end = (offset + length) as usize;
        let recovered = RoaringBitmap::deserialize_from(&bytes[blob_start..blob_end]).unwrap();
        assert!(recovered.contains(7) && recovered.contains(99));
    }
}