Skip to main content

ailake_catalog/
provider.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2use ailake_core::{AilakeResult, Centroid, VectorStoragePolicy};
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6
7/// Whether a shard's HNSW index has been built.
8#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
9#[serde(rename_all = "lowercase")]
10pub enum IndexStatus {
11    /// HNSW index embedded in the file — normal HNSW search applies.
12    #[default]
13    Ready,
14    /// Parquet written; HNSW build running in background — flat scan applies.
15    Indexing,
16}
17
/// Identifies a table by its fully-qualified name: `namespace.table_name`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableIdent {
    /// Namespace part of the identifier.
    pub namespace: String,
    /// Table-name part of the identifier.
    pub name: String,
}
24
25impl TableIdent {
26    pub fn new(namespace: &str, name: &str) -> Self {
27        Self {
28            namespace: namespace.to_string(),
29            name: name.to_string(),
30        }
31    }
32}
33
/// Identifier of a table snapshot, represented as a signed 64-bit integer.
pub type SnapshotId = i64;
35
36/// Iceberg V3 Deletion Vector reference stored in a manifest entry.
37///
38/// Points to a Roaring Bitmap blob inside a Puffin `.dvd` file.
39/// `offset` + `length` address the blob bytes directly — no full Puffin
40/// footer parse required for Phase B read support.
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct DeletionVector {
43    /// Absolute path to the Puffin `.dvd` file in the object store.
44    pub path: String,
45    /// Byte offset of the Roaring Bitmap blob within the Puffin file.
46    pub offset: u64,
47    /// Byte length of the Roaring Bitmap blob.
48    pub length: u64,
49    /// Number of deleted rows (bitmap popcount; -1 when unknown).
50    pub cardinality: i64,
51}
52
53/// HNSW index info for one additional (non-primary) vector column.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct ExtraVectorIndex {
56    pub column: String,
57    pub dim: u32,
58    pub hnsw_offset: u64,
59    pub hnsw_len: u64,
60    pub centroid_b64: Option<String>,
61    pub radius: Option<f32>,
62}
63
64/// Metadata about a single data file in a table snapshot.
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct DataFileEntry {
67    /// Relative path within the warehouse (e.g., "data/part-00001.parquet")
68    pub path: String,
69    pub record_count: u64,
70    pub file_size_bytes: u64,
71    /// base64-encoded centroid F32 values (primary vector column)
72    pub centroid_b64: Option<String>,
73    pub radius: Option<f32>,
74    pub hnsw_offset: Option<u64>,
75    pub hnsw_len: Option<u64>,
76    pub vector_column: Option<String>,
77    pub vector_dim: Option<u32>,
78    /// Additional vector columns beyond the primary (empty for single-column tables).
79    #[serde(default)]
80    pub extra_vector_indexes: Vec<ExtraVectorIndex>,
81    /// Index build status. Defaults to Ready for backward compatibility with old manifests.
82    #[serde(default)]
83    pub index_status: IndexStatus,
84    /// Caller-supplied idempotency key. When set, `write_batch_idempotent` skips the
85    /// write if a file with the same batch_id is already committed in the snapshot.
86    #[serde(default, skip_serializing_if = "Option::is_none")]
87    pub batch_id: Option<String>,
88    /// Embedding model identifier stored per-file so mixed-model tables (during migration)
89    /// can be identified without reading the main metadata.json.
90    #[serde(default, skip_serializing_if = "Option::is_none")]
91    pub embedding_model: Option<String>,
92    /// Partition value for this file (e.g. the agent_id UUID).
93    /// Written per-file when `VectorStoragePolicy::partition_by` is set.
94    /// Enables manifest-level pruning: search skips files whose partition_value
95    /// doesn't match the requested partition filter, avoiding all HNSW I/O.
96    #[serde(default, skip_serializing_if = "Option::is_none")]
97    pub partition_value: Option<String>,
98    /// Iceberg V3 Deletion Vector: Roaring Bitmap of deleted row positions.
99    /// None for V2 tables or V3 tables with no deletes for this file.
100    /// When present, scanner masks these row IDs from HNSW results.
101    #[serde(default, skip_serializing_if = "Option::is_none")]
102    pub deletion_vector: Option<DeletionVector>,
103    /// Iceberg V3 Row Lineage: globally unique first row ID assigned to this file.
104    /// Computed at commit time from the table's cumulative `next-row-id` counter.
105    /// None for V2 tables (row lineage requires format-version=3).
106    #[serde(default, skip_serializing_if = "Option::is_none")]
107    pub first_row_id: Option<i64>,
108}
109
110/// One field from the current Iceberg table schema (Phase G).
111///
112/// Parsed from `schemas[current-schema-id].fields` in `metadata.json`.
113/// Used by `SchemaFiller` to inject missing columns when reading old files.
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct SchemaField {
116    pub id: i32,
117    pub name: String,
118    pub required: bool,
119    /// Iceberg type string, e.g. `"int"`, `"string"`, `"timestamptz"`.
120    pub iceberg_type: String,
121    /// Value injected when reading old files that predate this field.
122    /// `None` → null is used.
123    #[serde(default, skip_serializing_if = "Option::is_none")]
124    pub initial_default: Option<serde_json::Value>,
125    /// Default value written to new files. Same as `initial_default` in most cases.
126    #[serde(default, skip_serializing_if = "Option::is_none")]
127    pub write_default: Option<serde_json::Value>,
128}
129
130/// Iceberg-compatible table metadata read from the catalog.
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct TableMetadata {
133    pub table_uuid: String,
134    pub format_version: i32,
135    pub location: String,
136    /// Ailake-specific properties: ailake.vector-column, ailake.dim, etc.
137    pub properties: HashMap<String, String>,
138    pub current_snapshot_id: Option<SnapshotId>,
139    /// Absolute path to the Puffin stats file for the current snapshot (Phase F).
140    /// `None` for V2 tables or V3 tables without any committed statistics yet.
141    #[serde(default, skip_serializing_if = "Option::is_none")]
142    pub current_statistics_path: Option<String>,
143    /// Current schema fields parsed from `metadata.json` (Phase G).
144    /// Empty for tables created before Phase G or tables with no schema committed.
145    /// Used by `SchemaFiller` in the scanner to inject missing columns with defaults.
146    #[serde(default, skip_serializing_if = "Vec::is_empty")]
147    pub schema_fields: Vec<SchemaField>,
148    /// Equality delete files active in the current snapshot (Phase H).
149    /// Loaded from delete manifests (content=2 in the manifest list).
150    /// Populated by `CatalogProvider::list_equality_deletes` — empty until first call.
151    #[serde(default, skip_serializing_if = "Vec::is_empty")]
152    pub equality_delete_files: Vec<EqualityDeleteFile>,
153    /// Active partition spec for this table (Phase I).
154    /// `None` for unpartitioned tables or tables created before Phase I.
155    #[serde(default, skip_serializing_if = "Option::is_none")]
156    pub partition_spec: Option<PartitionSpec>,
157}
158
159/// Iceberg schema update carried inside a snapshot commit.
160///
161/// When present, `HadoopCatalog::commit_snapshot` patches `schemas[0].fields`,
162/// `last-column-id`, and `schema.name-mapping.default` in the persisted metadata.
163/// REST/Glue/JDBC backends that delegate schema management to the server ignore this.
164#[derive(Debug, Clone)]
165pub struct IcebergSchemaUpdate {
166    /// Iceberg-typed field descriptors, e.g. `[{"id":1,"name":"id","required":false,"type":"int"}]`.
167    pub fields: Vec<serde_json::Value>,
168    pub last_column_id: i32,
169    /// Compact JSON string: `[{"field-id":1,"names":["id"]},...]`.
170    pub name_mapping_json: String,
171}
172
173/// Reference to an Iceberg equality delete file (Phase H).
174///
175/// The delete file is an Avro file with `content=2` whose rows contain equality
176/// predicates — any data row matching one of those rows is logically deleted.
177/// `equality_ids` lists the field IDs used for the equality check.
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct EqualityDeleteFile {
180    /// Absolute or warehouse-relative path of the Avro delete file.
181    pub path: String,
182    /// Field IDs whose values must all match to delete a data row.
183    pub equality_ids: Vec<i32>,
184    /// Number of predicates (rows) in the delete file.
185    pub record_count: u64,
186    pub file_size_bytes: u64,
187}
188
189/// Snapshot commit request.
190#[derive(Debug, Clone)]
191pub struct NewSnapshot {
192    pub snapshot_id: SnapshotId,
193    pub parent_snapshot_id: Option<SnapshotId>,
194    pub files: Vec<DataFileEntry>,
195    pub operation: SnapshotOperation,
196    /// When set, the catalog backend should update the table schema on commit.
197    pub iceberg_schema: Option<IcebergSchemaUpdate>,
198    /// Additional table-level properties to merge on commit (e.g. secondary column dims).
199    /// Keys use `ailake.dim-<col>` / `ailake.metric-<col>` convention.
200    pub extra_properties: HashMap<String, String>,
201    /// Per-file BM25 Bloom filter bytes for term-level file pruning (Phase F).
202    /// Key = data file path (relative, matches `DataFileEntry::path`).
203    /// Written to the Puffin stats file on V3 commits; ignored for V2 tables.
204    pub bloom_filters: Vec<(String, Vec<u8>)>,
205    /// Equality delete files to add to this snapshot (Phase H).
206    /// Written as a separate delete manifest with content=2 in the manifest list.
207    pub equality_delete_files: Vec<EqualityDeleteFile>,
208}
209
/// Operation kind recorded on a snapshot commit (see `NewSnapshot::operation`).
#[derive(Debug, Clone)]
pub enum SnapshotOperation {
    Append,
    Overwrite,
    Delete,
    Replace,
}
217
218/// One field in an Iceberg partition spec (Phase I).
219///
220/// For an `identity` partition on column "agent_id":
221/// `source_id=1` (must match the field id in the table schema),
222/// `field_id=1000` (Iceberg convention: partition fields start at 1000).
223#[derive(Debug, Clone, Serialize, Deserialize)]
224pub struct PartitionField {
225    /// Field ID of the source column in the table schema.
226    pub source_id: i32,
227    /// Partition field ID (≥ 1000 by convention).
228    pub field_id: i32,
229    /// Partition column name (usually same as the source column).
230    pub name: String,
231    /// Transform function: "identity", "bucket[N]", "truncate[W]", "year", etc.
232    pub transform: String,
233    /// Iceberg type of the source column ("string", "int", "long", "uuid").
234    /// Derived from the table schema at read time; stored here for encoding.
235    pub source_type: String,
236}
237
238/// Iceberg partition spec (Phase I).
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct PartitionSpec {
241    pub spec_id: i32,
242    pub fields: Vec<PartitionField>,
243}
244
245impl PartitionSpec {
246    /// True when this spec has no partition fields (unpartitioned table).
247    pub fn is_unpartitioned(&self) -> bool {
248        self.fields.is_empty()
249    }
250}
251
252/// Schema properties passed at table creation time.
253#[derive(Debug, Clone)]
254pub struct TableProperties {
255    pub policy: VectorStoragePolicy,
256    pub extra: HashMap<String, String>,
257    /// Iceberg format version to write. 2 = default (V2). 3 = opt-in V3.
258    pub format_version: u8,
259    /// Iceberg type of the partition column when `policy.partition_by` is set.
260    /// Defaults to `"string"` when `None`. Supported: "string", "uuid", "int", "long".
261    pub partition_column_type: Option<String>,
262}
263
264/// Unified catalog interface. All backends implement this trait.
265#[async_trait]
266pub trait CatalogProvider: Send + Sync {
267    async fn create_table(&self, name: &TableIdent, props: &TableProperties) -> AilakeResult<()>;
268
269    async fn load_table(&self, name: &TableIdent) -> AilakeResult<TableMetadata>;
270
271    async fn commit_snapshot(
272        &self,
273        table: &TableIdent,
274        snapshot: NewSnapshot,
275    ) -> AilakeResult<SnapshotId>;
276
277    async fn list_files(
278        &self,
279        table: &TableIdent,
280        snapshot_id: Option<SnapshotId>,
281    ) -> AilakeResult<Vec<DataFileEntry>>;
282
283    async fn drop_table(&self, name: &TableIdent) -> AilakeResult<()>;
284
285    /// Apply schema evolution (add columns / rename columns) without rewriting data files.
286    ///
287    /// Returns the new `schema-id` assigned in `metadata.json`.
288    /// Old files missing new columns will have their values filled at read time using
289    /// `AddColumnRequest::initial_default` (Phase G `SchemaFiller`).
290    ///
291    /// Default implementation returns an error — override in file-based backends.
292    async fn evolve_schema(
293        &self,
294        _table: &TableIdent,
295        _evolution: crate::schema_evolution::SchemaEvolution,
296    ) -> AilakeResult<i32> {
297        Err(ailake_core::AilakeError::Catalog(
298            "evolve_schema not supported by this catalog backend".into(),
299        ))
300    }
301
302    /// Return all equality delete files active in the current (or specified) snapshot.
303    ///
304    /// Reads delete manifests (manifest list entries with `content=1`) and parses their
305    /// `content=2` entries. Default returns empty vec for catalog backends that do not
306    /// support Iceberg equality deletes.
307    async fn list_equality_deletes(
308        &self,
309        _table: &TableIdent,
310        _snapshot_id: Option<SnapshotId>,
311    ) -> AilakeResult<Vec<EqualityDeleteFile>> {
312        Ok(vec![])
313    }
314}
315
/// Vector index metadata for a single data file.
///
/// A lightweight, borrowed description of one vector column's HNSW index; it
/// is the `index` argument of the `make_*_data_file_entry` builders. All
/// fields are `Copy`, so the struct derives `Copy` and a value can be reused
/// after being passed to a builder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VectorIndexInfo<'a> {
    /// Name of the vector column the index belongs to.
    pub column: &'a str,
    /// Dimensionality of the vectors in `column`.
    pub dim: u32,
    /// Byte offset of the HNSW index.
    pub hnsw_offset: u64,
    /// Byte length of the HNSW index.
    pub hnsw_len: u64,
}
323
324/// Build DataFileEntry from a centroid, encoding it as base64.
325pub fn make_data_file_entry(
326    path: &str,
327    record_count: u64,
328    file_size_bytes: u64,
329    centroid: &Centroid,
330    index: VectorIndexInfo<'_>,
331) -> DataFileEntry {
332    make_multi_column_data_file_entry(path, record_count, file_size_bytes, centroid, index, &[])
333}
334
335/// Build DataFileEntry for a file with multiple vector columns.
336///
337/// `primary_centroid` and `primary_index` describe the primary (first) vector column.
338/// `extra` contains info for additional columns.
339pub fn make_multi_column_data_file_entry(
340    path: &str,
341    record_count: u64,
342    file_size_bytes: u64,
343    primary_centroid: &Centroid,
344    primary_index: VectorIndexInfo<'_>,
345    extra: &[ExtraVectorIndex],
346) -> DataFileEntry {
347    use base64::Engine;
348    let centroid_bytes: Vec<u8> = primary_centroid
349        .values
350        .iter()
351        .flat_map(|v| v.to_le_bytes())
352        .collect();
353    let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(&centroid_bytes);
354    DataFileEntry {
355        path: path.to_string(),
356        record_count,
357        file_size_bytes,
358        centroid_b64: Some(centroid_b64),
359        radius: Some(primary_centroid.radius),
360        hnsw_offset: Some(primary_index.hnsw_offset),
361        hnsw_len: Some(primary_index.hnsw_len),
362        vector_column: Some(primary_index.column.to_string()),
363        vector_dim: Some(primary_index.dim),
364        extra_vector_indexes: extra.to_vec(),
365        index_status: IndexStatus::Ready,
366        batch_id: None,
367        embedding_model: None,
368        partition_value: None,
369        deletion_vector: None,
370        first_row_id: None,
371    }
372}
373
374/// Build a DataFileEntry for a file whose HNSW is still being built asynchronously.
375///
376/// `hnsw_offset` and `hnsw_len` are `None`; `index_status` is `Indexing`.
377/// The centroid is included so geometric pruning still works during the build window.
378pub fn make_data_file_entry_indexing(
379    path: &str,
380    record_count: u64,
381    file_size_bytes: u64,
382    centroid: &Centroid,
383    column: &str,
384    dim: u32,
385) -> DataFileEntry {
386    use base64::Engine;
387    let centroid_bytes: Vec<u8> = centroid
388        .values
389        .iter()
390        .flat_map(|v| v.to_le_bytes())
391        .collect();
392    let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(&centroid_bytes);
393    DataFileEntry {
394        path: path.to_string(),
395        record_count,
396        file_size_bytes,
397        centroid_b64: Some(centroid_b64),
398        radius: Some(centroid.radius),
399        hnsw_offset: None,
400        hnsw_len: None,
401        vector_column: Some(column.to_string()),
402        vector_dim: Some(dim),
403        extra_vector_indexes: vec![],
404        index_status: IndexStatus::Indexing,
405        batch_id: None,
406        embedding_model: None,
407        partition_value: None,
408        deletion_vector: None,
409        first_row_id: None,
410    }
411}
412
413/// Encode a centroid to base64 for use in ExtraVectorIndex.
414pub fn encode_centroid_b64(centroid: &Centroid) -> String {
415    use base64::Engine;
416    let bytes: Vec<u8> = centroid
417        .values
418        .iter()
419        .flat_map(|v| v.to_le_bytes())
420        .collect();
421    base64::engine::general_purpose::STANDARD.encode(&bytes)
422}
423
424/// Decode centroid bytes from base64 in a DataFileEntry.
425pub fn decode_centroid(
426    entry: &DataFileEntry,
427    metric: ailake_core::VectorMetric,
428) -> Option<Centroid> {
429    use base64::Engine;
430    let b64 = entry.centroid_b64.as_ref()?;
431    let bytes = base64::engine::general_purpose::STANDARD.decode(b64).ok()?;
432    let values: Vec<f32> = bytes
433        .chunks_exact(4)
434        .map(|b| f32::from_le_bytes(b.try_into().unwrap()))
435        .collect();
436    Some(Centroid {
437        values,
438        radius: entry.radius.unwrap_or(0.0),
439        metric,
440    })
441}
442
443pub fn new_snapshot_id() -> SnapshotId {
444    // Use timestamp-based ID for simplicity (Iceberg uses i64)
445    std::time::SystemTime::now()
446        .duration_since(std::time::UNIX_EPOCH)
447        .unwrap()
448        .as_millis() as i64
449}