ailake_catalog/provider.rs
1// SPDX-License-Identifier: MIT OR Apache-2.0
2use ailake_core::{AilakeResult, Centroid, VectorStoragePolicy};
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6
7/// Whether a shard's HNSW index has been built.
8#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
9#[serde(rename_all = "lowercase")]
10pub enum IndexStatus {
11 /// HNSW index embedded in the file — normal HNSW search applies.
12 #[default]
13 Ready,
14 /// Parquet written; HNSW build running in background — flat scan applies.
15 Indexing,
16}
17
/// Fully-qualified table identifier: namespace.table_name.
///
/// Derives `Eq` and `Hash`, so identifiers can be compared and used as
/// hash-map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableIdent {
    /// Namespace part of the identifier (the portion before the dot).
    pub namespace: String,
    /// Table name part of the identifier (the portion after the dot).
    pub name: String,
}
24
25impl TableIdent {
26 pub fn new(namespace: &str, name: &str) -> Self {
27 Self {
28 namespace: namespace.to_string(),
29 name: name.to_string(),
30 }
31 }
32}
33
/// Identifier of a table snapshot, represented as a signed 64-bit integer.
pub type SnapshotId = i64;
35
/// Iceberg V3 Deletion Vector reference stored in a manifest entry.
///
/// Points to a Roaring Bitmap blob inside a Puffin `.dvd` file.
/// `offset` + `length` address the blob bytes directly — no full Puffin
/// footer parse required for Phase B read support.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeletionVector {
    /// Absolute path to the Puffin `.dvd` file in the object store.
    pub path: String,
    /// Byte offset of the Roaring Bitmap blob within the Puffin file.
    pub offset: u64,
    /// Byte length of the Roaring Bitmap blob.
    pub length: u64,
    /// Number of deleted rows (bitmap popcount; -1 when unknown).
    ///
    /// Signed only to carry the `-1` sentinel; check for it before treating
    /// the value as a count.
    pub cardinality: i64,
}
52
/// HNSW index info for one additional (non-primary) vector column.
///
/// Stored per data file in `DataFileEntry::extra_vector_indexes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtraVectorIndex {
    /// Name of the additional vector column this index belongs to.
    pub column: String,
    /// Dimensionality of the vectors in `column`.
    pub dim: u32,
    /// Start of this column's HNSW index.
    /// NOTE(review): presumably a byte offset within the data file — confirm.
    pub hnsw_offset: u64,
    /// Length of this column's HNSW index.
    /// NOTE(review): presumably in bytes — confirm.
    pub hnsw_len: u64,
    /// Base64-encoded centroid for this column, if any
    /// (`encode_centroid_b64` produces this encoding).
    pub centroid_b64: Option<String>,
    /// Radius paired with the centroid, if any.
    pub radius: Option<f32>,
}
63
/// Metadata about a single data file in a table snapshot.
///
/// Fields annotated with `#[serde(default)]` may be missing from the
/// serialized form and then take their default value; fields that also carry
/// `skip_serializing_if` are omitted from the output when unset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataFileEntry {
    /// Relative path within the warehouse (e.g., "data/part-00001.parquet")
    pub path: String,
    /// Number of records in the file.
    pub record_count: u64,
    /// Size of the file in bytes.
    pub file_size_bytes: u64,
    /// base64-encoded centroid F32 values (primary vector column)
    pub centroid_b64: Option<String>,
    /// Radius paired with the primary centroid.
    /// `decode_centroid` substitutes `0.0` when this is `None`.
    pub radius: Option<f32>,
    /// Start of the primary column's HNSW index. `None` while the index is
    /// still being built (see `make_data_file_entry_indexing`).
    /// NOTE(review): presumably a byte offset within this file — confirm.
    pub hnsw_offset: Option<u64>,
    /// Length of the primary column's HNSW index. `None` while the index is
    /// still being built.
    pub hnsw_len: Option<u64>,
    /// Name of the primary vector column.
    pub vector_column: Option<String>,
    /// Dimensionality of the primary vector column.
    pub vector_dim: Option<u32>,
    /// Additional vector columns beyond the primary (empty for single-column tables).
    #[serde(default)]
    pub extra_vector_indexes: Vec<ExtraVectorIndex>,
    /// Index build status. Defaults to Ready for backward compatibility with old manifests.
    #[serde(default)]
    pub index_status: IndexStatus,
    /// Caller-supplied idempotency key. When set, `write_batch_idempotent` skips the
    /// write if a file with the same batch_id is already committed in the snapshot.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch_id: Option<String>,
    /// Embedding model identifier stored per-file so mixed-model tables (during migration)
    /// can be identified without reading the main metadata.json.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub embedding_model: Option<String>,
    /// Partition value for this file (e.g. the agent_id UUID).
    /// Written per-file when `VectorStoragePolicy::partition_by` is set.
    /// Enables manifest-level pruning: search skips files whose partition_value
    /// doesn't match the requested partition filter, avoiding all HNSW I/O.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub partition_value: Option<String>,
    /// Iceberg V3 Deletion Vector: Roaring Bitmap of deleted row positions.
    /// None for V2 tables or V3 tables with no deletes for this file.
    /// When present, scanner masks these row IDs from HNSW results.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub deletion_vector: Option<DeletionVector>,
    /// Iceberg V3 Row Lineage: globally unique first row ID assigned to this file.
    /// Computed at commit time from the table's cumulative `next-row-id` counter.
    /// None for V2 tables (row lineage requires format-version=3).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub first_row_id: Option<i64>,
}
109
/// One field from the current Iceberg table schema (Phase G).
///
/// Parsed from `schemas[current-schema-id].fields` in `metadata.json`.
/// Used by `SchemaFiller` to inject missing columns when reading old files.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaField {
    /// Field ID of this column in the table schema.
    pub id: i32,
    /// Column name.
    pub name: String,
    /// Whether the schema marks this field as required.
    pub required: bool,
    /// Iceberg type string, e.g. `"int"`, `"string"`, `"timestamptz"`.
    pub iceberg_type: String,
    /// Value injected when reading old files that predate this field.
    /// `None` → null is used.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub initial_default: Option<serde_json::Value>,
    /// Default value written to new files. Same as `initial_default` in most cases.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub write_default: Option<serde_json::Value>,
}
129
/// Iceberg-compatible table metadata read from the catalog.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableMetadata {
    /// UUID identifying the table.
    pub table_uuid: String,
    /// Iceberg format version of the table (`TableProperties` documents 2 = V2, 3 = V3).
    pub format_version: i32,
    /// Table location.
    /// NOTE(review): presumably the table's root path/URI in the warehouse — confirm.
    pub location: String,
    /// Ailake-specific properties: ailake.vector-column, ailake.dim, etc.
    pub properties: HashMap<String, String>,
    /// ID of the table's current snapshot; `None` when there is none.
    pub current_snapshot_id: Option<SnapshotId>,
    /// Absolute path to the Puffin stats file for the current snapshot (Phase F).
    /// `None` for V2 tables or V3 tables without any committed statistics yet.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub current_statistics_path: Option<String>,
    /// Current schema fields parsed from `metadata.json` (Phase G).
    /// Empty for tables created before Phase G or tables with no schema committed.
    /// Used by `SchemaFiller` in the scanner to inject missing columns with defaults.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub schema_fields: Vec<SchemaField>,
    /// Equality delete files active in the current snapshot (Phase H).
    /// Loaded from delete manifests (content=2 in the manifest list).
    /// Populated by `CatalogProvider::list_equality_deletes` — empty until first call.
    ///
    /// NOTE(review): the docs on `CatalogProvider::list_equality_deletes` describe
    /// delete manifests as manifest-list entries with `content=1` whose file entries
    /// have `content=2`; confirm which level the `content=2` above refers to.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub equality_delete_files: Vec<EqualityDeleteFile>,
    /// Active partition spec for this table (Phase I).
    /// `None` for unpartitioned tables or tables created before Phase I.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub partition_spec: Option<PartitionSpec>,
}
158
/// Iceberg schema update carried inside a snapshot commit.
///
/// When present, `HadoopCatalog::commit_snapshot` patches `schemas[0].fields`,
/// `last-column-id`, and `schema.name-mapping.default` in the persisted metadata.
/// REST/Glue/JDBC backends that delegate schema management to the server ignore this.
#[derive(Debug, Clone)]
pub struct IcebergSchemaUpdate {
    /// Iceberg-typed field descriptors, e.g. `[{"id":1,"name":"id","required":false,"type":"int"}]`.
    pub fields: Vec<serde_json::Value>,
    /// Value to persist as `last-column-id` in the table metadata.
    pub last_column_id: i32,
    /// Compact JSON string: `[{"field-id":1,"names":["id"]},...]`.
    /// Persisted as `schema.name-mapping.default`.
    pub name_mapping_json: String,
}
172
/// Reference to an Iceberg equality delete file (Phase H).
///
/// The delete file is an Avro file with `content=2` whose rows contain equality
/// predicates — any data row matching one of those rows is logically deleted.
/// `equality_ids` lists the field IDs used for the equality check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EqualityDeleteFile {
    /// Absolute or warehouse-relative path of the Avro delete file.
    pub path: String,
    /// Field IDs whose values must all match to delete a data row.
    pub equality_ids: Vec<i32>,
    /// Number of predicates (rows) in the delete file.
    pub record_count: u64,
    /// Size of the delete file in bytes.
    pub file_size_bytes: u64,
}
188
/// Snapshot commit request.
///
/// Passed by value to `CatalogProvider::commit_snapshot`.
#[derive(Debug, Clone)]
pub struct NewSnapshot {
    /// ID for the snapshot being committed (see `new_snapshot_id` for a generator).
    pub snapshot_id: SnapshotId,
    /// ID of the parent snapshot, if any.
    pub parent_snapshot_id: Option<SnapshotId>,
    /// Data file entries carried by this commit.
    pub files: Vec<DataFileEntry>,
    /// Kind of change this commit represents.
    pub operation: SnapshotOperation,
    /// When set, the catalog backend should update the table schema on commit.
    pub iceberg_schema: Option<IcebergSchemaUpdate>,
    /// Additional table-level properties to merge on commit (e.g. secondary column dims).
    /// Keys use `ailake.dim-<col>` / `ailake.metric-<col>` convention.
    pub extra_properties: HashMap<String, String>,
    /// Per-file BM25 Bloom filter bytes for term-level file pruning (Phase F).
    /// Key = data file path (relative, matches `DataFileEntry::path`).
    /// Written to the Puffin stats file on V3 commits; ignored for V2 tables.
    pub bloom_filters: Vec<(String, Vec<u8>)>,
    /// Equality delete files to add to this snapshot (Phase H).
    /// Written as a separate delete manifest with content=2 in the manifest list.
    pub equality_delete_files: Vec<EqualityDeleteFile>,
}
209
/// Kind of change recorded for a snapshot commit (`NewSnapshot::operation`).
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare operations
/// directly instead of pattern-matching.
///
/// NOTE(review): the variant names mirror Iceberg's snapshot operations; how
/// each backend interprets them is not visible in this file — confirm there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SnapshotOperation {
    Append,
    Overwrite,
    Delete,
    Replace,
}
217
/// One field in an Iceberg partition spec (Phase I).
///
/// For an `identity` partition on column "agent_id":
/// `source_id=1` (must match the field id in the table schema),
/// `field_id=1000` (Iceberg convention: partition fields start at 1000).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionField {
    /// Field ID of the source column in the table schema.
    pub source_id: i32,
    /// Partition field ID (≥ 1000 by convention).
    pub field_id: i32,
    /// Partition column name (usually same as the source column).
    pub name: String,
    /// Transform function: "identity", "bucket[N]", "truncate[W]", "year", etc.
    /// Kept as a free-form string; nothing in this type validates it.
    pub transform: String,
    /// Iceberg type of the source column ("string", "int", "long", "uuid").
    /// Derived from the table schema at read time; stored here for encoding.
    pub source_type: String,
}
237
/// Iceberg partition spec (Phase I).
///
/// A spec with an empty `fields` list describes an unpartitioned table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionSpec {
    /// Identifier of this partition spec.
    pub spec_id: i32,
    /// Partition fields that make up the spec.
    pub fields: Vec<PartitionField>,
}
244
245impl PartitionSpec {
246 /// True when this spec has no partition fields (unpartitioned table).
247 pub fn is_unpartitioned(&self) -> bool {
248 self.fields.is_empty()
249 }
250}
251
/// Schema properties passed at table creation time.
///
/// Consumed by `CatalogProvider::create_table`.
#[derive(Debug, Clone)]
pub struct TableProperties {
    /// Vector storage policy for the table.
    pub policy: VectorStoragePolicy,
    /// Additional string key/value properties.
    /// NOTE(review): presumably stored as table-level properties — confirm in the backends.
    pub extra: HashMap<String, String>,
    /// Iceberg format version to write. 2 = default (V2). 3 = opt-in V3.
    pub format_version: u8,
    /// Iceberg type of the partition column when `policy.partition_by` is set.
    /// Defaults to `"string"` when `None`. Supported: "string", "uuid", "int", "long".
    pub partition_column_type: Option<String>,
}
263
264/// Unified catalog interface. All backends implement this trait.
265#[async_trait]
266pub trait CatalogProvider: Send + Sync {
267 async fn create_table(&self, name: &TableIdent, props: &TableProperties) -> AilakeResult<()>;
268
269 async fn load_table(&self, name: &TableIdent) -> AilakeResult<TableMetadata>;
270
271 async fn commit_snapshot(
272 &self,
273 table: &TableIdent,
274 snapshot: NewSnapshot,
275 ) -> AilakeResult<SnapshotId>;
276
277 async fn list_files(
278 &self,
279 table: &TableIdent,
280 snapshot_id: Option<SnapshotId>,
281 ) -> AilakeResult<Vec<DataFileEntry>>;
282
283 async fn drop_table(&self, name: &TableIdent) -> AilakeResult<()>;
284
285 /// Apply schema evolution (add columns / rename columns) without rewriting data files.
286 ///
287 /// Returns the new `schema-id` assigned in `metadata.json`.
288 /// Old files missing new columns will have their values filled at read time using
289 /// `AddColumnRequest::initial_default` (Phase G `SchemaFiller`).
290 ///
291 /// Default implementation returns an error — override in file-based backends.
292 async fn evolve_schema(
293 &self,
294 _table: &TableIdent,
295 _evolution: crate::schema_evolution::SchemaEvolution,
296 ) -> AilakeResult<i32> {
297 Err(ailake_core::AilakeError::Catalog(
298 "evolve_schema not supported by this catalog backend".into(),
299 ))
300 }
301
302 /// Return all equality delete files active in the current (or specified) snapshot.
303 ///
304 /// Reads delete manifests (manifest list entries with `content=1`) and parses their
305 /// `content=2` entries. Default returns empty vec for catalog backends that do not
306 /// support Iceberg equality deletes.
307 async fn list_equality_deletes(
308 &self,
309 _table: &TableIdent,
310 _snapshot_id: Option<SnapshotId>,
311 ) -> AilakeResult<Vec<EqualityDeleteFile>> {
312 Ok(vec![])
313 }
314}
315
/// Vector index metadata for a single data file.
///
/// Borrowed parameter bundle consumed by `make_data_file_entry` and
/// `make_multi_column_data_file_entry`, which copy its fields into the
/// resulting `DataFileEntry`.
pub struct VectorIndexInfo<'a> {
    /// Name of the vector column the index covers.
    pub column: &'a str,
    /// Dimensionality of the vectors in `column`.
    pub dim: u32,
    /// Start of the HNSW index.
    /// NOTE(review): presumably a byte offset within the data file — confirm.
    pub hnsw_offset: u64,
    /// Length of the HNSW index.
    /// NOTE(review): presumably in bytes — confirm.
    pub hnsw_len: u64,
}
323
324/// Build DataFileEntry from a centroid, encoding it as base64.
325pub fn make_data_file_entry(
326 path: &str,
327 record_count: u64,
328 file_size_bytes: u64,
329 centroid: &Centroid,
330 index: VectorIndexInfo<'_>,
331) -> DataFileEntry {
332 make_multi_column_data_file_entry(path, record_count, file_size_bytes, centroid, index, &[])
333}
334
335/// Build DataFileEntry for a file with multiple vector columns.
336///
337/// `primary_centroid` and `primary_index` describe the primary (first) vector column.
338/// `extra` contains info for additional columns.
339pub fn make_multi_column_data_file_entry(
340 path: &str,
341 record_count: u64,
342 file_size_bytes: u64,
343 primary_centroid: &Centroid,
344 primary_index: VectorIndexInfo<'_>,
345 extra: &[ExtraVectorIndex],
346) -> DataFileEntry {
347 use base64::Engine;
348 let centroid_bytes: Vec<u8> = primary_centroid
349 .values
350 .iter()
351 .flat_map(|v| v.to_le_bytes())
352 .collect();
353 let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(¢roid_bytes);
354 DataFileEntry {
355 path: path.to_string(),
356 record_count,
357 file_size_bytes,
358 centroid_b64: Some(centroid_b64),
359 radius: Some(primary_centroid.radius),
360 hnsw_offset: Some(primary_index.hnsw_offset),
361 hnsw_len: Some(primary_index.hnsw_len),
362 vector_column: Some(primary_index.column.to_string()),
363 vector_dim: Some(primary_index.dim),
364 extra_vector_indexes: extra.to_vec(),
365 index_status: IndexStatus::Ready,
366 batch_id: None,
367 embedding_model: None,
368 partition_value: None,
369 deletion_vector: None,
370 first_row_id: None,
371 }
372}
373
374/// Build a DataFileEntry for a file whose HNSW is still being built asynchronously.
375///
376/// `hnsw_offset` and `hnsw_len` are `None`; `index_status` is `Indexing`.
377/// The centroid is included so geometric pruning still works during the build window.
378pub fn make_data_file_entry_indexing(
379 path: &str,
380 record_count: u64,
381 file_size_bytes: u64,
382 centroid: &Centroid,
383 column: &str,
384 dim: u32,
385) -> DataFileEntry {
386 use base64::Engine;
387 let centroid_bytes: Vec<u8> = centroid
388 .values
389 .iter()
390 .flat_map(|v| v.to_le_bytes())
391 .collect();
392 let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(¢roid_bytes);
393 DataFileEntry {
394 path: path.to_string(),
395 record_count,
396 file_size_bytes,
397 centroid_b64: Some(centroid_b64),
398 radius: Some(centroid.radius),
399 hnsw_offset: None,
400 hnsw_len: None,
401 vector_column: Some(column.to_string()),
402 vector_dim: Some(dim),
403 extra_vector_indexes: vec![],
404 index_status: IndexStatus::Indexing,
405 batch_id: None,
406 embedding_model: None,
407 partition_value: None,
408 deletion_vector: None,
409 first_row_id: None,
410 }
411}
412
413/// Encode a centroid to base64 for use in ExtraVectorIndex.
414pub fn encode_centroid_b64(centroid: &Centroid) -> String {
415 use base64::Engine;
416 let bytes: Vec<u8> = centroid
417 .values
418 .iter()
419 .flat_map(|v| v.to_le_bytes())
420 .collect();
421 base64::engine::general_purpose::STANDARD.encode(&bytes)
422}
423
424/// Decode centroid bytes from base64 in a DataFileEntry.
425pub fn decode_centroid(
426 entry: &DataFileEntry,
427 metric: ailake_core::VectorMetric,
428) -> Option<Centroid> {
429 use base64::Engine;
430 let b64 = entry.centroid_b64.as_ref()?;
431 let bytes = base64::engine::general_purpose::STANDARD.decode(b64).ok()?;
432 let values: Vec<f32> = bytes
433 .chunks_exact(4)
434 .map(|b| f32::from_le_bytes(b.try_into().unwrap()))
435 .collect();
436 Some(Centroid {
437 values,
438 radius: entry.radius.unwrap_or(0.0),
439 metric,
440 })
441}
442
443pub fn new_snapshot_id() -> SnapshotId {
444 // Use timestamp-based ID for simplicity (Iceberg uses i64)
445 std::time::SystemTime::now()
446 .duration_since(std::time::UNIX_EPOCH)
447 .unwrap()
448 .as_millis() as i64
449}