1use ailake_core::{AilakeResult, Centroid, VectorStoragePolicy};
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5
6#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
8#[serde(rename_all = "lowercase")]
9pub enum IndexStatus {
10 #[default]
12 Ready,
13 Indexing,
15}
16
/// Identifier of a catalog table: a namespace together with a table name.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct TableIdent {
    /// Namespace the table belongs to.
    pub namespace: String,
    /// Name of the table within `namespace`.
    pub name: String,
}

impl TableIdent {
    /// Creates an identifier, copying both borrowed strings into owned `String`s.
    pub fn new(namespace: &str, name: &str) -> Self {
        let namespace = String::from(namespace);
        let name = name.to_owned();
        Self { namespace, name }
    }
}

/// Identifier of a table snapshot.
pub type SnapshotId = i64;
34
/// Index metadata for an additional (non-primary) vector column of a data file.
///
/// Carries, per extra column, the same information `DataFileEntry` stores for its
/// primary column (`vector_column`, `vector_dim`, `hnsw_offset`, `hnsw_len`,
/// `centroid_b64`, `radius`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtraVectorIndex {
    /// Name of the vector column this index covers.
    pub column: String,
    /// Dimension of the vectors in `column`.
    pub dim: u32,
    /// Offset of the serialized HNSW index; presumably a byte offset within the data
    /// file — TODO confirm.
    pub hnsw_offset: u64,
    /// Length of the serialized HNSW index, in the same units as `hnsw_offset`.
    pub hnsw_len: u64,
    /// Base64-encoded centroid; presumably the same little-endian `f32` encoding that
    /// `encode_centroid_b64` produces — TODO confirm against writers.
    pub centroid_b64: Option<String>,
    /// Radius of the centroid, if recorded.
    pub radius: Option<f32>,
}
45
/// Catalog entry describing one data file together with its vector-index metadata.
///
/// Built by `make_data_file_entry`, `make_multi_column_data_file_entry` and
/// `make_data_file_entry_indexing`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataFileEntry {
    /// Location of the data file.
    pub path: String,
    /// Number of records in the file.
    pub record_count: u64,
    /// Size of the file in bytes.
    pub file_size_bytes: u64,
    /// Primary-column centroid as base64 of little-endian `f32` values, the format
    /// written by `encode_centroid_b64` and read by `decode_centroid`.
    pub centroid_b64: Option<String>,
    /// Radius of the primary-column centroid; `decode_centroid` treats `None` as `0.0`.
    pub radius: Option<f32>,
    /// Offset of the primary column's HNSW index; `None` in entries built by
    /// `make_data_file_entry_indexing`.
    pub hnsw_offset: Option<u64>,
    /// Length of the primary column's HNSW index; `None` in entries built by
    /// `make_data_file_entry_indexing`.
    pub hnsw_len: Option<u64>,
    /// Name of the primary vector column.
    pub vector_column: Option<String>,
    /// Dimension of the primary vector column.
    pub vector_dim: Option<u32>,
    /// Indexes for additional vector columns; deserializes as empty when the field is
    /// absent (presumably for entries written before multi-column support — confirm).
    #[serde(default)]
    pub extra_vector_indexes: Vec<ExtraVectorIndex>,
    /// Index build state; deserializes as `IndexStatus::default()` when the field is absent.
    #[serde(default)]
    pub index_status: IndexStatus,
}
67
/// Table-level metadata, as returned by `CatalogProvider::load_table`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableMetadata {
    /// Unique identifier of the table, stored as a string.
    pub table_uuid: String,
    /// Table format version; presumably the Iceberg format version — TODO confirm.
    pub format_version: i32,
    /// Base location of the table; presumably a storage URI or path — TODO confirm.
    pub location: String,
    /// Free-form string key/value table properties.
    pub properties: HashMap<String, String>,
    /// Id of the table's current snapshot; presumably `None` before the first commit.
    pub current_snapshot_id: Option<SnapshotId>,
}
78
/// Schema information to record together with a snapshot commit
/// (carried in `NewSnapshot::iceberg_schema`).
#[derive(Debug, Clone)]
pub struct IcebergSchemaUpdate {
    /// Schema fields as raw JSON values; presumably Iceberg struct-field objects — TODO confirm.
    pub fields: Vec<serde_json::Value>,
    /// Highest column id in use; presumably maps to Iceberg `last-column-id` — confirm.
    pub last_column_id: i32,
    /// Name-mapping document, already serialized to a JSON string.
    pub name_mapping_json: String,
}
92
/// A snapshot to be committed with `CatalogProvider::commit_snapshot`.
#[derive(Debug, Clone)]
pub struct NewSnapshot {
    /// Id of the new snapshot; `new_snapshot_id` can be used to generate one.
    pub snapshot_id: SnapshotId,
    /// Snapshot this one is based on; presumably `None` for a table's first snapshot.
    pub parent_snapshot_id: Option<SnapshotId>,
    /// Data files included in this commit.
    pub files: Vec<DataFileEntry>,
    /// Kind of change this snapshot applies.
    pub operation: SnapshotOperation,
    /// Optional schema information to record with the commit.
    pub iceberg_schema: Option<IcebergSchemaUpdate>,
}
103
/// Kind of change a snapshot applies to a table.
///
/// NOTE(review): the precise effect of each variant (which existing files are kept or
/// removed) is decided by the `CatalogProvider` implementation — confirm there.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SnapshotOperation {
    /// Adds data files.
    Append,
    /// Overwrites existing data.
    Overwrite,
    /// Removes data.
    Delete,
    /// Replaces existing data files.
    Replace,
}
111
/// Properties supplied when creating a table with `CatalogProvider::create_table`.
#[derive(Debug, Clone)]
pub struct TableProperties {
    /// Vector storage policy for the table.
    pub policy: VectorStoragePolicy,
    /// Additional free-form string key/value properties.
    pub extra: HashMap<String, String>,
}
118
/// Table catalog abstraction: create, load and drop tables, commit snapshots and list
/// data files.
///
/// Implementors must be `Send + Sync`; the async methods are desugared by `#[async_trait]`.
#[async_trait]
pub trait CatalogProvider: Send + Sync {
    /// Creates table `name` using the given properties.
    async fn create_table(&self, name: &TableIdent, props: &TableProperties) -> AilakeResult<()>;

    /// Loads the metadata of table `name`.
    async fn load_table(&self, name: &TableIdent) -> AilakeResult<TableMetadata>;

    /// Commits `snapshot` to `table` and returns a snapshot id.
    ///
    /// NOTE(review): whether the returned id always equals `snapshot.snapshot_id`, and how
    /// a stale `parent_snapshot_id` is handled, is implementation-defined — confirm.
    async fn commit_snapshot(
        &self,
        table: &TableIdent,
        snapshot: NewSnapshot,
    ) -> AilakeResult<SnapshotId>;

    /// Lists the data files of `table` for the given snapshot.
    ///
    /// Presumably `snapshot_id == None` selects the current snapshot — TODO confirm
    /// against implementations.
    async fn list_files(
        &self,
        table: &TableIdent,
        snapshot_id: Option<SnapshotId>,
    ) -> AilakeResult<Vec<DataFileEntry>>;

    /// Drops table `name`.
    ///
    /// NOTE(review): whether underlying data files are deleted is implementation-defined.
    async fn drop_table(&self, name: &TableIdent) -> AilakeResult<()>;
}
140
/// Borrowed description of a built HNSW index for one vector column, passed to
/// `make_data_file_entry` / `make_multi_column_data_file_entry`.
///
/// All fields are `Copy`, so the struct is `Copy` and can be reused freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct VectorIndexInfo<'a> {
    /// Name of the indexed vector column.
    pub column: &'a str,
    /// Dimension of the column's vectors.
    pub dim: u32,
    /// Offset of the serialized HNSW index (stored as `DataFileEntry::hnsw_offset`).
    pub hnsw_offset: u64,
    /// Length of the serialized HNSW index (stored as `DataFileEntry::hnsw_len`).
    pub hnsw_len: u64,
}
148
149pub fn make_data_file_entry(
151 path: &str,
152 record_count: u64,
153 file_size_bytes: u64,
154 centroid: &Centroid,
155 index: VectorIndexInfo<'_>,
156) -> DataFileEntry {
157 make_multi_column_data_file_entry(path, record_count, file_size_bytes, centroid, index, &[])
158}
159
160pub fn make_multi_column_data_file_entry(
165 path: &str,
166 record_count: u64,
167 file_size_bytes: u64,
168 primary_centroid: &Centroid,
169 primary_index: VectorIndexInfo<'_>,
170 extra: &[ExtraVectorIndex],
171) -> DataFileEntry {
172 use base64::Engine;
173 let centroid_bytes: Vec<u8> = primary_centroid
174 .values
175 .iter()
176 .flat_map(|v| v.to_le_bytes())
177 .collect();
178 let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(¢roid_bytes);
179 DataFileEntry {
180 path: path.to_string(),
181 record_count,
182 file_size_bytes,
183 centroid_b64: Some(centroid_b64),
184 radius: Some(primary_centroid.radius),
185 hnsw_offset: Some(primary_index.hnsw_offset),
186 hnsw_len: Some(primary_index.hnsw_len),
187 vector_column: Some(primary_index.column.to_string()),
188 vector_dim: Some(primary_index.dim),
189 extra_vector_indexes: extra.to_vec(),
190 index_status: IndexStatus::Ready,
191 }
192}
193
194pub fn make_data_file_entry_indexing(
199 path: &str,
200 record_count: u64,
201 file_size_bytes: u64,
202 centroid: &Centroid,
203 column: &str,
204 dim: u32,
205) -> DataFileEntry {
206 use base64::Engine;
207 let centroid_bytes: Vec<u8> = centroid
208 .values
209 .iter()
210 .flat_map(|v| v.to_le_bytes())
211 .collect();
212 let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(¢roid_bytes);
213 DataFileEntry {
214 path: path.to_string(),
215 record_count,
216 file_size_bytes,
217 centroid_b64: Some(centroid_b64),
218 radius: Some(centroid.radius),
219 hnsw_offset: None,
220 hnsw_len: None,
221 vector_column: Some(column.to_string()),
222 vector_dim: Some(dim),
223 extra_vector_indexes: vec![],
224 index_status: IndexStatus::Indexing,
225 }
226}
227
228pub fn encode_centroid_b64(centroid: &Centroid) -> String {
230 use base64::Engine;
231 let bytes: Vec<u8> = centroid
232 .values
233 .iter()
234 .flat_map(|v| v.to_le_bytes())
235 .collect();
236 base64::engine::general_purpose::STANDARD.encode(&bytes)
237}
238
239pub fn decode_centroid(
241 entry: &DataFileEntry,
242 metric: ailake_core::VectorMetric,
243) -> Option<Centroid> {
244 use base64::Engine;
245 let b64 = entry.centroid_b64.as_ref()?;
246 let bytes = base64::engine::general_purpose::STANDARD.decode(b64).ok()?;
247 let values: Vec<f32> = bytes
248 .chunks_exact(4)
249 .map(|b| f32::from_le_bytes(b.try_into().unwrap()))
250 .collect();
251 Some(Centroid {
252 values,
253 radius: entry.radius.unwrap_or(0.0),
254 metric,
255 })
256}
257
258pub fn new_snapshot_id() -> SnapshotId {
259 std::time::SystemTime::now()
261 .duration_since(std::time::UNIX_EPOCH)
262 .unwrap()
263 .as_millis() as i64
264}