// ailake_catalog/provider.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2use ailake_core::{AilakeResult, Centroid, VectorStoragePolicy};
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6
7/// Whether a shard's HNSW index has been built.
8#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
9#[serde(rename_all = "lowercase")]
10pub enum IndexStatus {
11    /// HNSW index embedded in the file — normal HNSW search applies.
12    #[default]
13    Ready,
14    /// Parquet written; HNSW build running in background — flat scan applies.
15    Indexing,
16}
17
/// Identifies a table by namespace and table name (`namespace.table_name`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableIdent {
    /// Namespace part of the identifier.
    pub namespace: String,
    /// Table-name part of the identifier.
    pub name: String,
}

impl TableIdent {
    /// Builds an identifier, copying both borrowed parts into owned `String`s.
    pub fn new(namespace: &str, name: &str) -> Self {
        let namespace = String::from(namespace);
        let name = String::from(name);
        Self { namespace, name }
    }
}
33
/// Identifier of a table snapshot; a signed 64-bit integer, as in Iceberg.
pub type SnapshotId = i64;
35
36/// HNSW index info for one additional (non-primary) vector column.
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct ExtraVectorIndex {
39    pub column: String,
40    pub dim: u32,
41    pub hnsw_offset: u64,
42    pub hnsw_len: u64,
43    pub centroid_b64: Option<String>,
44    pub radius: Option<f32>,
45}
46
47/// Metadata about a single data file in a table snapshot.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct DataFileEntry {
50    /// Relative path within the warehouse (e.g., "data/part-00001.parquet")
51    pub path: String,
52    pub record_count: u64,
53    pub file_size_bytes: u64,
54    /// base64-encoded centroid F32 values (primary vector column)
55    pub centroid_b64: Option<String>,
56    pub radius: Option<f32>,
57    pub hnsw_offset: Option<u64>,
58    pub hnsw_len: Option<u64>,
59    pub vector_column: Option<String>,
60    pub vector_dim: Option<u32>,
61    /// Additional vector columns beyond the primary (empty for single-column tables).
62    #[serde(default)]
63    pub extra_vector_indexes: Vec<ExtraVectorIndex>,
64    /// Index build status. Defaults to Ready for backward compatibility with old manifests.
65    #[serde(default)]
66    pub index_status: IndexStatus,
67    /// Caller-supplied idempotency key. When set, `write_batch_idempotent` skips the
68    /// write if a file with the same batch_id is already committed in the snapshot.
69    #[serde(default, skip_serializing_if = "Option::is_none")]
70    pub batch_id: Option<String>,
71}
72
73/// Iceberg-compatible table metadata read from the catalog.
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct TableMetadata {
76    pub table_uuid: String,
77    pub format_version: i32,
78    pub location: String,
79    /// Ailake-specific properties: ailake.vector-column, ailake.dim, etc.
80    pub properties: HashMap<String, String>,
81    pub current_snapshot_id: Option<SnapshotId>,
82}
83
84/// Iceberg schema update carried inside a snapshot commit.
85///
86/// When present, `HadoopCatalog::commit_snapshot` patches `schemas[0].fields`,
87/// `last-column-id`, and `schema.name-mapping.default` in the persisted metadata.
88/// REST/Glue/JDBC backends that delegate schema management to the server ignore this.
89#[derive(Debug, Clone)]
90pub struct IcebergSchemaUpdate {
91    /// Iceberg-typed field descriptors, e.g. `[{"id":1,"name":"id","required":false,"type":"int"}]`.
92    pub fields: Vec<serde_json::Value>,
93    pub last_column_id: i32,
94    /// Compact JSON string: `[{"field-id":1,"names":["id"]},...]`.
95    pub name_mapping_json: String,
96}
97
98/// Snapshot commit request.
99#[derive(Debug, Clone)]
100pub struct NewSnapshot {
101    pub snapshot_id: SnapshotId,
102    pub parent_snapshot_id: Option<SnapshotId>,
103    pub files: Vec<DataFileEntry>,
104    pub operation: SnapshotOperation,
105    /// When set, the catalog backend should update the table schema on commit.
106    pub iceberg_schema: Option<IcebergSchemaUpdate>,
107}
108
/// Kind of change a snapshot commit represents (see `NewSnapshot::operation`).
///
/// The variant names match the `operation` values Iceberg records in a snapshot
/// summary. How each one is applied is decided by the `CatalogProvider` backend.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SnapshotOperation {
    /// Iceberg `append`.
    Append,
    /// Iceberg `overwrite`.
    Overwrite,
    /// Iceberg `delete`.
    Delete,
    /// Iceberg `replace`.
    Replace,
}
116
117/// Schema properties passed at table creation time.
118#[derive(Debug, Clone)]
119pub struct TableProperties {
120    pub policy: VectorStoragePolicy,
121    pub extra: HashMap<String, String>,
122}
123
124/// Unified catalog interface. All backends implement this trait.
125#[async_trait]
126pub trait CatalogProvider: Send + Sync {
127    async fn create_table(&self, name: &TableIdent, props: &TableProperties) -> AilakeResult<()>;
128
129    async fn load_table(&self, name: &TableIdent) -> AilakeResult<TableMetadata>;
130
131    async fn commit_snapshot(
132        &self,
133        table: &TableIdent,
134        snapshot: NewSnapshot,
135    ) -> AilakeResult<SnapshotId>;
136
137    async fn list_files(
138        &self,
139        table: &TableIdent,
140        snapshot_id: Option<SnapshotId>,
141    ) -> AilakeResult<Vec<DataFileEntry>>;
142
143    async fn drop_table(&self, name: &TableIdent) -> AilakeResult<()>;
144}
145
/// Vector index metadata for one vector column of a single data file.
///
/// A small borrowed view (one `&str` plus three integers), so it is `Copy` and can be
/// passed by value, which is how the `make_*_data_file_entry` builders take it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VectorIndexInfo<'a> {
    /// Name of the vector column.
    pub column: &'a str,
    /// Dimensionality of the vectors in `column`.
    pub dim: u32,
    /// Offset of the column's HNSW index.
    // NOTE(review): presumably a byte offset into the data file — confirm with the writer.
    pub hnsw_offset: u64,
    /// Length of the column's HNSW index (same unit as `hnsw_offset`).
    pub hnsw_len: u64,
}
153
154/// Build DataFileEntry from a centroid, encoding it as base64.
155pub fn make_data_file_entry(
156    path: &str,
157    record_count: u64,
158    file_size_bytes: u64,
159    centroid: &Centroid,
160    index: VectorIndexInfo<'_>,
161) -> DataFileEntry {
162    make_multi_column_data_file_entry(path, record_count, file_size_bytes, centroid, index, &[])
163}
164
165/// Build DataFileEntry for a file with multiple vector columns.
166///
167/// `primary_centroid` and `primary_index` describe the primary (first) vector column.
168/// `extra` contains info for additional columns.
169pub fn make_multi_column_data_file_entry(
170    path: &str,
171    record_count: u64,
172    file_size_bytes: u64,
173    primary_centroid: &Centroid,
174    primary_index: VectorIndexInfo<'_>,
175    extra: &[ExtraVectorIndex],
176) -> DataFileEntry {
177    use base64::Engine;
178    let centroid_bytes: Vec<u8> = primary_centroid
179        .values
180        .iter()
181        .flat_map(|v| v.to_le_bytes())
182        .collect();
183    let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(&centroid_bytes);
184    DataFileEntry {
185        path: path.to_string(),
186        record_count,
187        file_size_bytes,
188        centroid_b64: Some(centroid_b64),
189        radius: Some(primary_centroid.radius),
190        hnsw_offset: Some(primary_index.hnsw_offset),
191        hnsw_len: Some(primary_index.hnsw_len),
192        vector_column: Some(primary_index.column.to_string()),
193        vector_dim: Some(primary_index.dim),
194        extra_vector_indexes: extra.to_vec(),
195        index_status: IndexStatus::Ready,
196        batch_id: None,
197    }
198}
199
200/// Build a DataFileEntry for a file whose HNSW is still being built asynchronously.
201///
202/// `hnsw_offset` and `hnsw_len` are `None`; `index_status` is `Indexing`.
203/// The centroid is included so geometric pruning still works during the build window.
204pub fn make_data_file_entry_indexing(
205    path: &str,
206    record_count: u64,
207    file_size_bytes: u64,
208    centroid: &Centroid,
209    column: &str,
210    dim: u32,
211) -> DataFileEntry {
212    use base64::Engine;
213    let centroid_bytes: Vec<u8> = centroid
214        .values
215        .iter()
216        .flat_map(|v| v.to_le_bytes())
217        .collect();
218    let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(&centroid_bytes);
219    DataFileEntry {
220        path: path.to_string(),
221        record_count,
222        file_size_bytes,
223        centroid_b64: Some(centroid_b64),
224        radius: Some(centroid.radius),
225        hnsw_offset: None,
226        hnsw_len: None,
227        vector_column: Some(column.to_string()),
228        vector_dim: Some(dim),
229        extra_vector_indexes: vec![],
230        index_status: IndexStatus::Indexing,
231        batch_id: None,
232    }
233}
234
235/// Encode a centroid to base64 for use in ExtraVectorIndex.
236pub fn encode_centroid_b64(centroid: &Centroid) -> String {
237    use base64::Engine;
238    let bytes: Vec<u8> = centroid
239        .values
240        .iter()
241        .flat_map(|v| v.to_le_bytes())
242        .collect();
243    base64::engine::general_purpose::STANDARD.encode(&bytes)
244}
245
246/// Decode centroid bytes from base64 in a DataFileEntry.
247pub fn decode_centroid(
248    entry: &DataFileEntry,
249    metric: ailake_core::VectorMetric,
250) -> Option<Centroid> {
251    use base64::Engine;
252    let b64 = entry.centroid_b64.as_ref()?;
253    let bytes = base64::engine::general_purpose::STANDARD.decode(b64).ok()?;
254    let values: Vec<f32> = bytes
255        .chunks_exact(4)
256        .map(|b| f32::from_le_bytes(b.try_into().unwrap()))
257        .collect();
258    Some(Centroid {
259        values,
260        radius: entry.radius.unwrap_or(0.0),
261        metric,
262    })
263}
264
265pub fn new_snapshot_id() -> SnapshotId {
266    // Use timestamp-based ID for simplicity (Iceberg uses i64)
267    std::time::SystemTime::now()
268        .duration_since(std::time::UNIX_EPOCH)
269        .unwrap()
270        .as_millis() as i64
271}