Skip to main content

ailake_catalog/
provider.rs

1use ailake_core::{AilakeResult, Centroid, VectorStoragePolicy};
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5
/// Whether a shard's HNSW index has been built.
///
/// Serialized in lowercase (`"ready"` / `"indexing"`) because of `rename_all = "lowercase"`.
/// `Ready` is the `Default` variant, which is what `#[serde(default)]` on
/// `DataFileEntry::index_status` falls back to when an older manifest omits the field.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "lowercase")]
pub enum IndexStatus {
    /// HNSW index embedded in the file — normal HNSW search applies.
    #[default]
    Ready,
    /// Parquet written; HNSW build running in background — flat scan applies.
    Indexing,
}
16
/// Fully-qualified table identifier, conceptually `namespace.table_name`.
///
/// Derives `Eq` + `Hash`, so identifiers can be used as map/set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableIdent {
    /// Namespace component of the identifier.
    pub namespace: String,
    /// Table name within `namespace`.
    pub name: String,
}

impl TableIdent {
    /// Create an identifier by copying the borrowed namespace and table name into owned strings.
    pub fn new(namespace: &str, name: &str) -> Self {
        let namespace = namespace.to_owned();
        let name = name.to_owned();
        TableIdent { namespace, name }
    }
}
32
/// Identifier of a table snapshot (Iceberg snapshot ids are `i64`).
///
/// Fresh ids are produced by `new_snapshot_id`.
pub type SnapshotId = i64;
34
/// HNSW index info for one additional (non-primary) vector column.
///
/// Entries live in `DataFileEntry::extra_vector_indexes`; the primary column's equivalent
/// information is held in the top-level fields of `DataFileEntry`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtraVectorIndex {
    /// Name of the vector column this index covers.
    pub column: String,
    /// Vector dimensionality of `column`.
    pub dim: u32,
    /// Start of this column's HNSW index — presumably a byte offset into the data file; confirm
    /// against the writer.
    pub hnsw_offset: u64,
    /// Length of this column's HNSW index — presumably in bytes; confirm against the writer.
    pub hnsw_len: u64,
    /// Column centroid as standard base64 over little-endian `f32` components
    /// (the encoding produced by `encode_centroid_b64`); `None` when not recorded.
    pub centroid_b64: Option<String>,
    /// Radius accompanying the centroid; `None` when not recorded.
    pub radius: Option<f32>,
}
45
/// Metadata about a single data file in a table snapshot.
///
/// The top-level centroid / HNSW / vector fields describe the primary vector column; any further
/// vector columns are listed in `extra_vector_indexes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataFileEntry {
    /// Relative path within the warehouse (e.g., "data/part-00001.parquet")
    pub path: String,
    /// Number of records in the file.
    pub record_count: u64,
    /// File size in bytes.
    pub file_size_bytes: u64,
    /// base64-encoded centroid F32 values (primary vector column): standard base64 alphabet over
    /// little-endian `f32` components; read back with `decode_centroid`.
    pub centroid_b64: Option<String>,
    /// Radius of the primary centroid; `decode_centroid` substitutes 0.0 when this is `None`.
    pub radius: Option<f32>,
    /// Start of the primary column's HNSW index (presumably a byte offset in the file — confirm).
    /// `make_data_file_entry_indexing` leaves this `None` while the index is being built.
    pub hnsw_offset: Option<u64>,
    /// Length of the primary column's HNSW index.
    /// `make_data_file_entry_indexing` leaves this `None` while the index is being built.
    pub hnsw_len: Option<u64>,
    /// Name of the primary vector column.
    pub vector_column: Option<String>,
    /// Dimensionality of the primary vector column.
    pub vector_dim: Option<u32>,
    /// Additional vector columns beyond the primary (empty for single-column tables).
    /// `#[serde(default)]` makes manifests that omit this field deserialize it as empty.
    #[serde(default)]
    pub extra_vector_indexes: Vec<ExtraVectorIndex>,
    /// Index build status. Defaults to Ready for backward compatibility with old manifests.
    #[serde(default)]
    pub index_status: IndexStatus,
}
67
/// Iceberg-compatible table metadata read from the catalog.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableMetadata {
    /// Table UUID, kept as a string.
    pub table_uuid: String,
    /// Iceberg table format version.
    pub format_version: i32,
    /// Table location — presumably the URI/path of the table root in the warehouse; confirm per
    /// backend.
    pub location: String,
    /// Ailake-specific properties: ailake.vector-column, ailake.dim, etc.
    pub properties: HashMap<String, String>,
    /// Current snapshot id; `None` presumably means no snapshot has been committed yet.
    pub current_snapshot_id: Option<SnapshotId>,
}
78
/// Iceberg schema update carried inside a snapshot commit.
///
/// When present, `HadoopCatalog::commit_snapshot` patches `schemas[0].fields`,
/// `last-column-id`, and `schema.name-mapping.default` in the persisted metadata.
/// REST/Glue/JDBC backends that delegate schema management to the server ignore this.
#[derive(Debug, Clone)]
pub struct IcebergSchemaUpdate {
    /// Iceberg-typed field descriptors, e.g. `[{"id":1,"name":"id","required":false,"type":"int"}]`.
    pub fields: Vec<serde_json::Value>,
    /// Value written to the metadata's `last-column-id` (in Iceberg, the highest field id
    /// assigned so far).
    pub last_column_id: i32,
    /// Compact JSON string: `[{"field-id":1,"names":["id"]},...]`.
    /// Written to the `schema.name-mapping.default` table property.
    pub name_mapping_json: String,
}
92
/// Snapshot commit request.
///
/// Passed by value to `CatalogProvider::commit_snapshot`, which returns the committed id.
#[derive(Debug, Clone)]
pub struct NewSnapshot {
    /// Id of the snapshot being committed (typically from `new_snapshot_id`).
    pub snapshot_id: SnapshotId,
    /// Snapshot this one builds on; `None` presumably for a table's first snapshot.
    pub parent_snapshot_id: Option<SnapshotId>,
    /// Data file entries submitted with this commit.
    /// NOTE(review): confirm whether this is the delta for `operation` or the full file list.
    pub files: Vec<DataFileEntry>,
    /// Kind of change this snapshot records.
    pub operation: SnapshotOperation,
    /// When set, the catalog backend should update the table schema on commit.
    pub iceberg_schema: Option<IcebergSchemaUpdate>,
}
103
/// Kind of change recorded by a `NewSnapshot`.
///
/// The variant names match the Iceberg snapshot `operation` values; how each one is applied is
/// decided by the catalog backend.
#[derive(Debug, Clone)]
pub enum SnapshotOperation {
    /// Iceberg `append`: only adds data files.
    Append,
    /// Iceberg `overwrite`: adds and removes data files as a logical overwrite.
    Overwrite,
    /// Iceberg `delete`: removes data files.
    Delete,
    /// Iceberg `replace`: swaps data files without changing table data (e.g. compaction).
    Replace,
}
111
/// Schema properties passed at table creation time.
#[derive(Debug, Clone)]
pub struct TableProperties {
    /// Vector storage policy for the new table.
    pub policy: VectorStoragePolicy,
    /// Additional free-form key/value properties — presumably persisted next to the `ailake.*`
    /// table properties; confirm per backend.
    pub extra: HashMap<String, String>,
}
118
/// Unified catalog interface. All backends implement this trait.
///
/// Methods are async (via `async_trait`), and the `Send + Sync` bound lets a provider be shared
/// across threads (e.g. behind an `Arc`).
#[async_trait]
pub trait CatalogProvider: Send + Sync {
    /// Create table `name` with the given properties.
    async fn create_table(&self, name: &TableIdent, props: &TableProperties) -> AilakeResult<()>;

    /// Load the metadata of table `name` from the catalog.
    async fn load_table(&self, name: &TableIdent) -> AilakeResult<TableMetadata>;

    /// Commit `snapshot` to `table` and return the id of the committed snapshot.
    ///
    /// `snapshot.iceberg_schema` is applied by backends that manage the schema themselves
    /// (e.g. `HadoopCatalog`) and ignored by server-managed ones (REST/Glue/JDBC).
    async fn commit_snapshot(
        &self,
        table: &TableIdent,
        snapshot: NewSnapshot,
    ) -> AilakeResult<SnapshotId>;

    /// List the data files of `table` at `snapshot_id`; `None` presumably selects the current
    /// snapshot — confirm per backend.
    async fn list_files(
        &self,
        table: &TableIdent,
        snapshot_id: Option<SnapshotId>,
    ) -> AilakeResult<Vec<DataFileEntry>>;

    /// Drop table `name`.
    async fn drop_table(&self, name: &TableIdent) -> AilakeResult<()>;
}
140
/// Vector index metadata for a single data file: where the primary column's HNSW index lives.
///
/// A borrowed, plain-data view passed by value to `make_data_file_entry` /
/// `make_multi_column_data_file_entry`. Every field is `Copy`, so the struct derives `Copy` too,
/// and `Debug` is derived like every other public type in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VectorIndexInfo<'a> {
    /// Name of the vector column the index covers.
    pub column: &'a str,
    /// Dimensionality of the vectors in `column`.
    pub dim: u32,
    /// Start of the HNSW index — presumably a byte offset within the data file; confirm with the
    /// writer.
    pub hnsw_offset: u64,
    /// Length of the HNSW index — presumably in bytes; confirm with the writer.
    pub hnsw_len: u64,
}
148
149/// Build DataFileEntry from a centroid, encoding it as base64.
150pub fn make_data_file_entry(
151    path: &str,
152    record_count: u64,
153    file_size_bytes: u64,
154    centroid: &Centroid,
155    index: VectorIndexInfo<'_>,
156) -> DataFileEntry {
157    make_multi_column_data_file_entry(path, record_count, file_size_bytes, centroid, index, &[])
158}
159
160/// Build DataFileEntry for a file with multiple vector columns.
161///
162/// `primary_centroid` and `primary_index` describe the primary (first) vector column.
163/// `extra` contains info for additional columns.
164pub fn make_multi_column_data_file_entry(
165    path: &str,
166    record_count: u64,
167    file_size_bytes: u64,
168    primary_centroid: &Centroid,
169    primary_index: VectorIndexInfo<'_>,
170    extra: &[ExtraVectorIndex],
171) -> DataFileEntry {
172    use base64::Engine;
173    let centroid_bytes: Vec<u8> = primary_centroid
174        .values
175        .iter()
176        .flat_map(|v| v.to_le_bytes())
177        .collect();
178    let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(&centroid_bytes);
179    DataFileEntry {
180        path: path.to_string(),
181        record_count,
182        file_size_bytes,
183        centroid_b64: Some(centroid_b64),
184        radius: Some(primary_centroid.radius),
185        hnsw_offset: Some(primary_index.hnsw_offset),
186        hnsw_len: Some(primary_index.hnsw_len),
187        vector_column: Some(primary_index.column.to_string()),
188        vector_dim: Some(primary_index.dim),
189        extra_vector_indexes: extra.to_vec(),
190        index_status: IndexStatus::Ready,
191    }
192}
193
194/// Build a DataFileEntry for a file whose HNSW is still being built asynchronously.
195///
196/// `hnsw_offset` and `hnsw_len` are `None`; `index_status` is `Indexing`.
197/// The centroid is included so geometric pruning still works during the build window.
198pub fn make_data_file_entry_indexing(
199    path: &str,
200    record_count: u64,
201    file_size_bytes: u64,
202    centroid: &Centroid,
203    column: &str,
204    dim: u32,
205) -> DataFileEntry {
206    use base64::Engine;
207    let centroid_bytes: Vec<u8> = centroid
208        .values
209        .iter()
210        .flat_map(|v| v.to_le_bytes())
211        .collect();
212    let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(&centroid_bytes);
213    DataFileEntry {
214        path: path.to_string(),
215        record_count,
216        file_size_bytes,
217        centroid_b64: Some(centroid_b64),
218        radius: Some(centroid.radius),
219        hnsw_offset: None,
220        hnsw_len: None,
221        vector_column: Some(column.to_string()),
222        vector_dim: Some(dim),
223        extra_vector_indexes: vec![],
224        index_status: IndexStatus::Indexing,
225    }
226}
227
228/// Encode a centroid to base64 for use in ExtraVectorIndex.
229pub fn encode_centroid_b64(centroid: &Centroid) -> String {
230    use base64::Engine;
231    let bytes: Vec<u8> = centroid
232        .values
233        .iter()
234        .flat_map(|v| v.to_le_bytes())
235        .collect();
236    base64::engine::general_purpose::STANDARD.encode(&bytes)
237}
238
239/// Decode centroid bytes from base64 in a DataFileEntry.
240pub fn decode_centroid(
241    entry: &DataFileEntry,
242    metric: ailake_core::VectorMetric,
243) -> Option<Centroid> {
244    use base64::Engine;
245    let b64 = entry.centroid_b64.as_ref()?;
246    let bytes = base64::engine::general_purpose::STANDARD.decode(b64).ok()?;
247    let values: Vec<f32> = bytes
248        .chunks_exact(4)
249        .map(|b| f32::from_le_bytes(b.try_into().unwrap()))
250        .collect();
251    Some(Centroid {
252        values,
253        radius: entry.radius.unwrap_or(0.0),
254        metric,
255    })
256}
257
258pub fn new_snapshot_id() -> SnapshotId {
259    // Use timestamp-based ID for simplicity (Iceberg uses i64)
260    std::time::SystemTime::now()
261        .duration_since(std::time::UNIX_EPOCH)
262        .unwrap()
263        .as_millis() as i64
264}