Skip to main content

ailake_catalog/
provider.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2use ailake_core::{AilakeResult, Centroid, VectorStoragePolicy};
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6
7/// Whether a shard's HNSW index has been built.
8#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
9#[serde(rename_all = "lowercase")]
10pub enum IndexStatus {
11    /// HNSW index embedded in the file — normal HNSW search applies.
12    #[default]
13    Ready,
14    /// Parquet written; HNSW build running in background — flat scan applies.
15    Indexing,
16}
17
/// Fully-qualified table identifier: namespace.table_name.
///
/// Derives `Eq` and `Hash`, so it can be used as a key in hashed collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableIdent {
    /// Namespace part of the identifier.
    pub namespace: String,
    /// Table-name part of the identifier.
    pub name: String,
}

impl TableIdent {
    /// Creates an identifier by copying both borrowed parts into owned `String`s.
    pub fn new(namespace: &str, name: &str) -> Self {
        let (namespace, name) = (String::from(namespace), String::from(name));
        Self { namespace, name }
    }
}
33
/// Snapshot identifier used throughout this module; an alias for `i64`.
pub type SnapshotId = i64;
35
/// HNSW index info for one additional (non-primary) vector column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtraVectorIndex {
    /// Name of the additional vector column.
    pub column: String,
    /// Dimensionality of the vectors in this column.
    pub dim: u32,
    // NOTE(review): presumably the byte offset of this column's HNSW index inside the data
    // file — confirm against the writer.
    pub hnsw_offset: u64,
    // NOTE(review): presumably the byte length of that HNSW index — confirm against the writer.
    pub hnsw_len: u64,
    /// base64-encoded centroid for this column (`encode_centroid_b64` produces this form);
    /// `None` when no centroid was recorded.
    pub centroid_b64: Option<String>,
    /// Radius paired with the centroid.
    // NOTE(review): how readers treat a `None` radius for extra columns is not visible here.
    pub radius: Option<f32>,
}
46
/// Metadata about a single data file in a table snapshot.
///
/// Fields marked `#[serde(default)]` may be missing from serialized input and then take
/// their type's default value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataFileEntry {
    /// Relative path within the warehouse (e.g., "data/part-00001.parquet")
    pub path: String,
    /// Number of records in the file.
    pub record_count: u64,
    /// Size of the file in bytes.
    pub file_size_bytes: u64,
    /// base64-encoded centroid F32 values (primary vector column)
    pub centroid_b64: Option<String>,
    /// Radius stored alongside the centroid; `decode_centroid` substitutes `0.0` when `None`.
    pub radius: Option<f32>,
    /// HNSW index offset for the primary column; `None` while the index is still being built
    /// (see `make_data_file_entry_indexing`).
    // NOTE(review): presumably a byte offset within this file — confirm against the writer.
    pub hnsw_offset: Option<u64>,
    /// HNSW index length for the primary column; `None` while the index is still being built.
    // NOTE(review): presumably a byte length — confirm against the writer.
    pub hnsw_len: Option<u64>,
    /// Name of the primary vector column.
    pub vector_column: Option<String>,
    /// Dimensionality of the primary vector column.
    pub vector_dim: Option<u32>,
    /// Additional vector columns beyond the primary (empty for single-column tables).
    #[serde(default)]
    pub extra_vector_indexes: Vec<ExtraVectorIndex>,
    /// Index build status. Defaults to Ready for backward compatibility with old manifests.
    #[serde(default)]
    pub index_status: IndexStatus,
    /// Caller-supplied idempotency key. When set, `write_batch_idempotent` skips the
    /// write if a file with the same batch_id is already committed in the snapshot.
    /// Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch_id: Option<String>,
    /// Embedding model identifier stored per-file so mixed-model tables (during migration)
    /// can be identified without reading the main metadata.json.
    /// Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub embedding_model: Option<String>,
}
76
/// Iceberg-compatible table metadata read from the catalog.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableMetadata {
    /// Unique identifier of the table, kept as a string.
    pub table_uuid: String,
    // NOTE(review): presumably the Iceberg table format version — confirm against the backends.
    pub format_version: i32,
    // NOTE(review): presumably the table's base location in the warehouse — confirm.
    pub location: String,
    /// Ailake-specific properties: ailake.vector-column, ailake.dim, etc.
    pub properties: HashMap<String, String>,
    /// Id of the table's current snapshot, if any.
    // NOTE(review): `None` presumably means no snapshot has been committed yet — confirm.
    pub current_snapshot_id: Option<SnapshotId>,
}
87
/// Iceberg schema update carried inside a snapshot commit.
///
/// When present, `HadoopCatalog::commit_snapshot` patches `schemas[0].fields`,
/// `last-column-id`, and `schema.name-mapping.default` in the persisted metadata.
/// REST/Glue/JDBC backends that delegate schema management to the server ignore this.
#[derive(Debug, Clone)]
pub struct IcebergSchemaUpdate {
    /// Iceberg-typed field descriptors, e.g. `[{"id":1,"name":"id","required":false,"type":"int"}]`.
    pub fields: Vec<serde_json::Value>,
    /// Value for the `last-column-id` entry mentioned in the struct-level description.
    pub last_column_id: i32,
    /// Compact JSON string: `[{"field-id":1,"names":["id"]},...]`.
    pub name_mapping_json: String,
}
101
/// Snapshot commit request.
///
/// Passed by value to `CatalogProvider::commit_snapshot`.
#[derive(Debug, Clone)]
pub struct NewSnapshot {
    /// Id of the snapshot being committed (see `new_snapshot_id` for one way to generate it).
    pub snapshot_id: SnapshotId,
    /// Id of the parent snapshot, if any.
    // NOTE(review): `None` presumably marks the first snapshot of a table — confirm.
    pub parent_snapshot_id: Option<SnapshotId>,
    /// Data file entries carried by this commit.
    // NOTE(review): whether this is the full file list or only the delta likely depends on
    // `operation` — confirm against the backends.
    pub files: Vec<DataFileEntry>,
    /// Kind of change this commit represents.
    pub operation: SnapshotOperation,
    /// When set, the catalog backend should update the table schema on commit.
    pub iceberg_schema: Option<IcebergSchemaUpdate>,
    /// Additional table-level properties to merge on commit (e.g. secondary column dims).
    /// Keys use `ailake.dim-<col>` / `ailake.metric-<col>` convention.
    pub extra_properties: HashMap<String, String>,
}
115
/// Kind of change a snapshot commit applies to a table (see `NewSnapshot::operation`).
///
/// The enum is fieldless, so it derives `Copy`, `PartialEq`, `Eq` and `Hash`; callers can
/// compare operations with `==` instead of pattern matching.
// NOTE(review): the variant names resemble Iceberg's snapshot operations; the exact semantics
// each backend attaches to them are not visible in this file — confirm before relying on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SnapshotOperation {
    Append,
    Overwrite,
    Delete,
    Replace,
}
123
/// Schema properties passed at table creation time.
///
/// Consumed by `CatalogProvider::create_table`.
#[derive(Debug, Clone)]
pub struct TableProperties {
    /// Vector storage policy for the table (type defined in `ailake_core`).
    pub policy: VectorStoragePolicy,
    /// Additional string key/value properties.
    // NOTE(review): presumably stored as table properties by the backend — confirm.
    pub extra: HashMap<String, String>,
}
130
/// Unified catalog interface. All backends implement this trait.
///
/// Implementors must be `Send + Sync`. `#[async_trait]` is what allows the `async fn`
/// declarations below. Every method reports failure through `AilakeResult`.
#[async_trait]
pub trait CatalogProvider: Send + Sync {
    /// Creates the table `name` using the creation-time properties in `props`.
    // NOTE(review): behavior when the table already exists is backend-defined — confirm.
    async fn create_table(&self, name: &TableIdent, props: &TableProperties) -> AilakeResult<()>;

    /// Loads the metadata of table `name`.
    async fn load_table(&self, name: &TableIdent) -> AilakeResult<TableMetadata>;

    /// Commits `snapshot` to `table` and returns a snapshot id.
    // NOTE(review): presumably the id of the snapshot that was just committed — confirm per backend.
    async fn commit_snapshot(
        &self,
        table: &TableIdent,
        snapshot: NewSnapshot,
    ) -> AilakeResult<SnapshotId>;

    /// Lists the data file entries of `table`.
    // NOTE(review): `snapshot_id = None` presumably selects the current snapshot — confirm.
    async fn list_files(
        &self,
        table: &TableIdent,
        snapshot_id: Option<SnapshotId>,
    ) -> AilakeResult<Vec<DataFileEntry>>;

    /// Drops the table `name`.
    // NOTE(review): whether the underlying data files are removed too is backend-defined — confirm.
    async fn drop_table(&self, name: &TableIdent) -> AilakeResult<()>;
}
152
/// Vector index metadata for a single data file.
///
/// A small borrowed parameter bundle consumed by `make_data_file_entry` and
/// `make_multi_column_data_file_entry`. Every field is `Copy`, so the struct is `Copy` too,
/// and it derives `Debug` so it can appear in logs and assertion messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VectorIndexInfo<'a> {
    /// Name of the vector column the index belongs to.
    pub column: &'a str,
    /// Dimensionality of the vectors in that column.
    pub dim: u32,
    // NOTE(review): presumably the byte offset of the HNSW index inside the file — confirm.
    pub hnsw_offset: u64,
    // NOTE(review): presumably the byte length of the HNSW index — confirm.
    pub hnsw_len: u64,
}
160
161/// Build DataFileEntry from a centroid, encoding it as base64.
162pub fn make_data_file_entry(
163    path: &str,
164    record_count: u64,
165    file_size_bytes: u64,
166    centroid: &Centroid,
167    index: VectorIndexInfo<'_>,
168) -> DataFileEntry {
169    make_multi_column_data_file_entry(path, record_count, file_size_bytes, centroid, index, &[])
170}
171
172/// Build DataFileEntry for a file with multiple vector columns.
173///
174/// `primary_centroid` and `primary_index` describe the primary (first) vector column.
175/// `extra` contains info for additional columns.
176pub fn make_multi_column_data_file_entry(
177    path: &str,
178    record_count: u64,
179    file_size_bytes: u64,
180    primary_centroid: &Centroid,
181    primary_index: VectorIndexInfo<'_>,
182    extra: &[ExtraVectorIndex],
183) -> DataFileEntry {
184    use base64::Engine;
185    let centroid_bytes: Vec<u8> = primary_centroid
186        .values
187        .iter()
188        .flat_map(|v| v.to_le_bytes())
189        .collect();
190    let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(&centroid_bytes);
191    DataFileEntry {
192        path: path.to_string(),
193        record_count,
194        file_size_bytes,
195        centroid_b64: Some(centroid_b64),
196        radius: Some(primary_centroid.radius),
197        hnsw_offset: Some(primary_index.hnsw_offset),
198        hnsw_len: Some(primary_index.hnsw_len),
199        vector_column: Some(primary_index.column.to_string()),
200        vector_dim: Some(primary_index.dim),
201        extra_vector_indexes: extra.to_vec(),
202        index_status: IndexStatus::Ready,
203        batch_id: None,
204        embedding_model: None,
205    }
206}
207
208/// Build a DataFileEntry for a file whose HNSW is still being built asynchronously.
209///
210/// `hnsw_offset` and `hnsw_len` are `None`; `index_status` is `Indexing`.
211/// The centroid is included so geometric pruning still works during the build window.
212pub fn make_data_file_entry_indexing(
213    path: &str,
214    record_count: u64,
215    file_size_bytes: u64,
216    centroid: &Centroid,
217    column: &str,
218    dim: u32,
219) -> DataFileEntry {
220    use base64::Engine;
221    let centroid_bytes: Vec<u8> = centroid
222        .values
223        .iter()
224        .flat_map(|v| v.to_le_bytes())
225        .collect();
226    let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(&centroid_bytes);
227    DataFileEntry {
228        path: path.to_string(),
229        record_count,
230        file_size_bytes,
231        centroid_b64: Some(centroid_b64),
232        radius: Some(centroid.radius),
233        hnsw_offset: None,
234        hnsw_len: None,
235        vector_column: Some(column.to_string()),
236        vector_dim: Some(dim),
237        extra_vector_indexes: vec![],
238        index_status: IndexStatus::Indexing,
239        batch_id: None,
240        embedding_model: None,
241    }
242}
243
244/// Encode a centroid to base64 for use in ExtraVectorIndex.
245pub fn encode_centroid_b64(centroid: &Centroid) -> String {
246    use base64::Engine;
247    let bytes: Vec<u8> = centroid
248        .values
249        .iter()
250        .flat_map(|v| v.to_le_bytes())
251        .collect();
252    base64::engine::general_purpose::STANDARD.encode(&bytes)
253}
254
255/// Decode centroid bytes from base64 in a DataFileEntry.
256pub fn decode_centroid(
257    entry: &DataFileEntry,
258    metric: ailake_core::VectorMetric,
259) -> Option<Centroid> {
260    use base64::Engine;
261    let b64 = entry.centroid_b64.as_ref()?;
262    let bytes = base64::engine::general_purpose::STANDARD.decode(b64).ok()?;
263    let values: Vec<f32> = bytes
264        .chunks_exact(4)
265        .map(|b| f32::from_le_bytes(b.try_into().unwrap()))
266        .collect();
267    Some(Centroid {
268        values,
269        radius: entry.radius.unwrap_or(0.0),
270        metric,
271    })
272}
273
274pub fn new_snapshot_id() -> SnapshotId {
275    // Use timestamp-based ID for simplicity (Iceberg uses i64)
276    std::time::SystemTime::now()
277        .duration_since(std::time::UNIX_EPOCH)
278        .unwrap()
279        .as_millis() as i64
280}