Skip to main content

ailake_catalog/
provider.rs

// SPDX-License-Identifier: MIT OR Apache-2.0
2use ailake_core::{AilakeResult, Centroid, VectorStoragePolicy};
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6
7/// Whether a shard's HNSW index has been built.
8#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
9#[serde(rename_all = "lowercase")]
10pub enum IndexStatus {
11    /// HNSW index embedded in the file — normal HNSW search applies.
12    #[default]
13    Ready,
14    /// Parquet written; HNSW build running in background — flat scan applies.
15    Indexing,
16}
17
/// Fully-qualified table identifier of the form `namespace.table_name`.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct TableIdent {
    /// Namespace that contains the table.
    pub namespace: String,
    /// Table name within `namespace`.
    pub name: String,
}

impl TableIdent {
    /// Create an identifier, copying both borrowed parts into owned strings.
    pub fn new(namespace: &str, name: &str) -> Self {
        let (namespace, name) = (namespace.to_owned(), name.to_owned());
        TableIdent { namespace, name }
    }
}
33
/// Snapshot identifier. Iceberg snapshot ids are `i64`; `new_snapshot_id` generates them here.
pub type SnapshotId = i64;
35
/// HNSW index info for one additional (non-primary) vector column.
///
/// Carries, for a secondary column, the same information that [`DataFileEntry`] stores in its
/// top-level `vector_column` / `vector_dim` / `hnsw_offset` / `hnsw_len` / `centroid_b64` /
/// `radius` fields for the primary column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtraVectorIndex {
    /// Name of the vector column this index covers.
    pub column: String,
    /// Dimensionality of the column's vectors.
    pub dim: u32,
    /// Offset of this column's embedded HNSW index within the data file
    /// (NOTE(review): units presumably bytes — confirm against the writer).
    pub hnsw_offset: u64,
    /// Length of this column's embedded HNSW index (same units as `hnsw_offset`).
    pub hnsw_len: u64,
    /// Base64-encoded centroid for this column, as produced by `encode_centroid_b64`.
    pub centroid_b64: Option<String>,
    /// Radius accompanying the centroid (`Centroid::radius`), used for geometric pruning.
    pub radius: Option<f32>,
}
46
/// Metadata about a single data file in a table snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataFileEntry {
    /// Relative path within the warehouse (e.g., "data/part-00001.parquet")
    pub path: String,
    /// Number of records in the file.
    pub record_count: u64,
    /// Size of the file in bytes.
    pub file_size_bytes: u64,
    /// base64-encoded centroid F32 values (primary vector column), little-endian byte order;
    /// decoded by `decode_centroid`.
    pub centroid_b64: Option<String>,
    /// Radius accompanying the primary centroid (`Centroid::radius`). `decode_centroid`
    /// reads a missing value as `0.0`.
    pub radius: Option<f32>,
    /// Offset of the primary column's embedded HNSW index; `None` while `index_status` is
    /// `Indexing` (see `make_data_file_entry_indexing`).
    pub hnsw_offset: Option<u64>,
    /// Length of the primary column's embedded HNSW index; `None` while `index_status` is
    /// `Indexing`.
    pub hnsw_len: Option<u64>,
    /// Name of the primary vector column.
    pub vector_column: Option<String>,
    /// Dimensionality of the primary vector column.
    pub vector_dim: Option<u32>,
    /// Additional vector columns beyond the primary (empty for single-column tables).
    #[serde(default)]
    pub extra_vector_indexes: Vec<ExtraVectorIndex>,
    /// Index build status. Defaults to Ready for backward compatibility with old manifests.
    #[serde(default)]
    pub index_status: IndexStatus,
    /// Caller-supplied idempotency key. When set, `write_batch_idempotent` skips the
    /// write if a file with the same batch_id is already committed in the snapshot.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch_id: Option<String>,
    /// Embedding model identifier stored per-file so mixed-model tables (during migration)
    /// can be identified without reading the main metadata.json.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub embedding_model: Option<String>,
}
76
/// Iceberg-compatible table metadata read from the catalog.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableMetadata {
    /// Table UUID, kept as a string.
    pub table_uuid: String,
    /// Iceberg table format version.
    pub format_version: i32,
    /// Table base location (NOTE(review): presumably a warehouse path or URI — confirm per
    /// backend).
    pub location: String,
    /// Ailake-specific properties: ailake.vector-column, ailake.dim, etc.
    pub properties: HashMap<String, String>,
    /// Id of the table's current snapshot. NOTE(review): `None` presumably means no snapshot
    /// has been committed yet — confirm per backend.
    pub current_snapshot_id: Option<SnapshotId>,
}
87
/// Iceberg schema update carried inside a snapshot commit.
///
/// When present, `HadoopCatalog::commit_snapshot` patches `schemas[0].fields`,
/// `last-column-id`, and `schema.name-mapping.default` in the persisted metadata.
/// REST/Glue/JDBC backends that delegate schema management to the server ignore this.
#[derive(Debug, Clone)]
pub struct IcebergSchemaUpdate {
    /// Iceberg-typed field descriptors, e.g. `[{"id":1,"name":"id","required":false,"type":"int"}]`.
    /// Patched into `schemas[0].fields`.
    pub fields: Vec<serde_json::Value>,
    /// Value for `last-column-id` (in Iceberg, the highest assigned field id).
    pub last_column_id: i32,
    /// Compact JSON string: `[{"field-id":1,"names":["id"]},...]`.
    /// Patched into the `schema.name-mapping.default` property.
    pub name_mapping_json: String,
}
101
/// Snapshot commit request.
#[derive(Debug, Clone)]
pub struct NewSnapshot {
    /// Id to assign to the new snapshot, e.g. from `new_snapshot_id`.
    pub snapshot_id: SnapshotId,
    /// Snapshot this commit is based on. NOTE(review): `None` presumably for a table's first
    /// snapshot — confirm.
    pub parent_snapshot_id: Option<SnapshotId>,
    /// Data files carried by this commit. NOTE(review): whether this is the complete file set
    /// or only the change depends on `operation` and the backend — confirm.
    pub files: Vec<DataFileEntry>,
    /// Kind of change this commit represents.
    pub operation: SnapshotOperation,
    /// When set, the catalog backend should update the table schema on commit.
    pub iceberg_schema: Option<IcebergSchemaUpdate>,
}
112
/// Kind of change a [`NewSnapshot`] commit represents.
///
/// NOTE(review): the variant names match Iceberg's snapshot `operation` values
/// (append / overwrite / delete / replace); presumably backends map them one-to-one — confirm.
#[derive(Debug, Clone)]
pub enum SnapshotOperation {
    Append,
    Overwrite,
    Delete,
    Replace,
}
120
/// Schema properties passed at table creation time.
#[derive(Debug, Clone)]
pub struct TableProperties {
    /// Vector storage policy for the new table.
    pub policy: VectorStoragePolicy,
    /// Additional free-form key/value properties. NOTE(review): presumably persisted alongside
    /// the `ailake.*` keys in `TableMetadata::properties` — confirm per backend.
    pub extra: HashMap<String, String>,
}
127
/// Unified catalog interface. All backends implement this trait.
///
/// The `Send + Sync` supertrait bound allows one provider to be shared across threads and
/// async tasks.
#[async_trait]
pub trait CatalogProvider: Send + Sync {
    /// Create table `name` with the given `props`.
    async fn create_table(&self, name: &TableIdent, props: &TableProperties) -> AilakeResult<()>;

    /// Load the metadata of table `name`.
    async fn load_table(&self, name: &TableIdent) -> AilakeResult<TableMetadata>;

    /// Commit `snapshot` to `table`.
    ///
    /// Returns a snapshot id (NOTE(review): presumably the id of the newly committed
    /// snapshot — confirm per backend). Handling of `snapshot.iceberg_schema` is
    /// backend-specific: per [`IcebergSchemaUpdate`], `HadoopCatalog` patches the persisted
    /// schema while REST/Glue/JDBC backends ignore it.
    async fn commit_snapshot(
        &self,
        table: &TableIdent,
        snapshot: NewSnapshot,
    ) -> AilakeResult<SnapshotId>;

    /// List the data files of `table` at `snapshot_id`.
    ///
    /// NOTE(review): `None` presumably selects the current snapshot — confirm per backend.
    async fn list_files(
        &self,
        table: &TableIdent,
        snapshot_id: Option<SnapshotId>,
    ) -> AilakeResult<Vec<DataFileEntry>>;

    /// Drop table `name`.
    async fn drop_table(&self, name: &TableIdent) -> AilakeResult<()>;
}
149
/// Vector index metadata for a single data file: the vector column and where its embedded
/// HNSW index lives.
///
/// Passed by value to `make_data_file_entry` / `make_multi_column_data_file_entry`, which copy
/// these values into the top-level fields of a `DataFileEntry`. All fields are `Copy`, so the
/// struct is too.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VectorIndexInfo<'a> {
    /// Name of the vector column.
    pub column: &'a str,
    /// Dimensionality of the column's vectors.
    pub dim: u32,
    /// Offset of the embedded HNSW index within the file.
    pub hnsw_offset: u64,
    /// Length of the embedded HNSW index.
    pub hnsw_len: u64,
}
157
158/// Build DataFileEntry from a centroid, encoding it as base64.
159pub fn make_data_file_entry(
160    path: &str,
161    record_count: u64,
162    file_size_bytes: u64,
163    centroid: &Centroid,
164    index: VectorIndexInfo<'_>,
165) -> DataFileEntry {
166    make_multi_column_data_file_entry(path, record_count, file_size_bytes, centroid, index, &[])
167}
168
169/// Build DataFileEntry for a file with multiple vector columns.
170///
171/// `primary_centroid` and `primary_index` describe the primary (first) vector column.
172/// `extra` contains info for additional columns.
173pub fn make_multi_column_data_file_entry(
174    path: &str,
175    record_count: u64,
176    file_size_bytes: u64,
177    primary_centroid: &Centroid,
178    primary_index: VectorIndexInfo<'_>,
179    extra: &[ExtraVectorIndex],
180) -> DataFileEntry {
181    use base64::Engine;
182    let centroid_bytes: Vec<u8> = primary_centroid
183        .values
184        .iter()
185        .flat_map(|v| v.to_le_bytes())
186        .collect();
187    let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(&centroid_bytes);
188    DataFileEntry {
189        path: path.to_string(),
190        record_count,
191        file_size_bytes,
192        centroid_b64: Some(centroid_b64),
193        radius: Some(primary_centroid.radius),
194        hnsw_offset: Some(primary_index.hnsw_offset),
195        hnsw_len: Some(primary_index.hnsw_len),
196        vector_column: Some(primary_index.column.to_string()),
197        vector_dim: Some(primary_index.dim),
198        extra_vector_indexes: extra.to_vec(),
199        index_status: IndexStatus::Ready,
200        batch_id: None,
201        embedding_model: None,
202    }
203}
204
205/// Build a DataFileEntry for a file whose HNSW is still being built asynchronously.
206///
207/// `hnsw_offset` and `hnsw_len` are `None`; `index_status` is `Indexing`.
208/// The centroid is included so geometric pruning still works during the build window.
209pub fn make_data_file_entry_indexing(
210    path: &str,
211    record_count: u64,
212    file_size_bytes: u64,
213    centroid: &Centroid,
214    column: &str,
215    dim: u32,
216) -> DataFileEntry {
217    use base64::Engine;
218    let centroid_bytes: Vec<u8> = centroid
219        .values
220        .iter()
221        .flat_map(|v| v.to_le_bytes())
222        .collect();
223    let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(&centroid_bytes);
224    DataFileEntry {
225        path: path.to_string(),
226        record_count,
227        file_size_bytes,
228        centroid_b64: Some(centroid_b64),
229        radius: Some(centroid.radius),
230        hnsw_offset: None,
231        hnsw_len: None,
232        vector_column: Some(column.to_string()),
233        vector_dim: Some(dim),
234        extra_vector_indexes: vec![],
235        index_status: IndexStatus::Indexing,
236        batch_id: None,
237        embedding_model: None,
238    }
239}
240
241/// Encode a centroid to base64 for use in ExtraVectorIndex.
242pub fn encode_centroid_b64(centroid: &Centroid) -> String {
243    use base64::Engine;
244    let bytes: Vec<u8> = centroid
245        .values
246        .iter()
247        .flat_map(|v| v.to_le_bytes())
248        .collect();
249    base64::engine::general_purpose::STANDARD.encode(&bytes)
250}
251
252/// Decode centroid bytes from base64 in a DataFileEntry.
253pub fn decode_centroid(
254    entry: &DataFileEntry,
255    metric: ailake_core::VectorMetric,
256) -> Option<Centroid> {
257    use base64::Engine;
258    let b64 = entry.centroid_b64.as_ref()?;
259    let bytes = base64::engine::general_purpose::STANDARD.decode(b64).ok()?;
260    let values: Vec<f32> = bytes
261        .chunks_exact(4)
262        .map(|b| f32::from_le_bytes(b.try_into().unwrap()))
263        .collect();
264    Some(Centroid {
265        values,
266        radius: entry.radius.unwrap_or(0.0),
267        metric,
268    })
269}
270
271pub fn new_snapshot_id() -> SnapshotId {
272    // Use timestamp-based ID for simplicity (Iceberg uses i64)
273    std::time::SystemTime::now()
274        .duration_since(std::time::UNIX_EPOCH)
275        .unwrap()
276        .as_millis() as i64
277}