1use ailake_core::{AilakeResult, Centroid, VectorStoragePolicy};
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
9#[serde(rename_all = "lowercase")]
10pub enum IndexStatus {
11 #[default]
13 Ready,
14 Indexing,
16}
17
/// Identifies a table by namespace and name.
///
/// Implements `Eq` and `Hash`, so it can be used as a map or set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableIdent {
    /// Namespace the table belongs to.
    pub namespace: String,
    /// Table name within the namespace.
    pub name: String,
}

impl TableIdent {
    /// Builds an identifier by copying both string slices into owned `String`s.
    pub fn new(namespace: &str, name: &str) -> Self {
        Self {
            namespace: namespace.to_owned(),
            name: name.to_owned(),
        }
    }
}
33
/// Identifier of a table snapshot.
///
/// `new_snapshot_id` produces values derived from wall-clock milliseconds
/// since the Unix epoch.
pub type SnapshotId = i64;
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct ExtraVectorIndex {
39 pub column: String,
40 pub dim: u32,
41 pub hnsw_offset: u64,
42 pub hnsw_len: u64,
43 pub centroid_b64: Option<String>,
44 pub radius: Option<f32>,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct DataFileEntry {
50 pub path: String,
52 pub record_count: u64,
53 pub file_size_bytes: u64,
54 pub centroid_b64: Option<String>,
56 pub radius: Option<f32>,
57 pub hnsw_offset: Option<u64>,
58 pub hnsw_len: Option<u64>,
59 pub vector_column: Option<String>,
60 pub vector_dim: Option<u32>,
61 #[serde(default)]
63 pub extra_vector_indexes: Vec<ExtraVectorIndex>,
64 #[serde(default)]
66 pub index_status: IndexStatus,
67 #[serde(default, skip_serializing_if = "Option::is_none")]
70 pub batch_id: Option<String>,
71 #[serde(default, skip_serializing_if = "Option::is_none")]
74 pub embedding_model: Option<String>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct TableMetadata {
80 pub table_uuid: String,
81 pub format_version: i32,
82 pub location: String,
83 pub properties: HashMap<String, String>,
85 pub current_snapshot_id: Option<SnapshotId>,
86}
87
88#[derive(Debug, Clone)]
94pub struct IcebergSchemaUpdate {
95 pub fields: Vec<serde_json::Value>,
97 pub last_column_id: i32,
98 pub name_mapping_json: String,
100}
101
102#[derive(Debug, Clone)]
104pub struct NewSnapshot {
105 pub snapshot_id: SnapshotId,
106 pub parent_snapshot_id: Option<SnapshotId>,
107 pub files: Vec<DataFileEntry>,
108 pub operation: SnapshotOperation,
109 pub iceberg_schema: Option<IcebergSchemaUpdate>,
111}
112
/// Kind of change a snapshot represents.
///
/// Derives `PartialEq`/`Eq` so operations can be compared directly instead of
/// only through pattern matching.
// NOTE(review): variant names presumably mirror Iceberg snapshot operations — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SnapshotOperation {
    Append,
    Overwrite,
    Delete,
    Replace,
}
120
121#[derive(Debug, Clone)]
123pub struct TableProperties {
124 pub policy: VectorStoragePolicy,
125 pub extra: HashMap<String, String>,
126}
127
128#[async_trait]
130pub trait CatalogProvider: Send + Sync {
131 async fn create_table(&self, name: &TableIdent, props: &TableProperties) -> AilakeResult<()>;
132
133 async fn load_table(&self, name: &TableIdent) -> AilakeResult<TableMetadata>;
134
135 async fn commit_snapshot(
136 &self,
137 table: &TableIdent,
138 snapshot: NewSnapshot,
139 ) -> AilakeResult<SnapshotId>;
140
141 async fn list_files(
142 &self,
143 table: &TableIdent,
144 snapshot_id: Option<SnapshotId>,
145 ) -> AilakeResult<Vec<DataFileEntry>>;
146
147 async fn drop_table(&self, name: &TableIdent) -> AilakeResult<()>;
148}
149
/// Borrowed description of a vector column's HNSW index, passed to the
/// `make_*data_file_entry` helpers.
///
/// All fields are `Copy`, so the struct is `Copy` as well; `Debug` is derived
/// because this is a public type.
#[derive(Debug, Clone, Copy)]
pub struct VectorIndexInfo<'a> {
    /// Name of the vector column.
    pub column: &'a str,
    /// Dimensionality of the vectors in `column`.
    pub dim: u32,
    /// Offset of the HNSW index.
    // NOTE(review): presumably a byte offset inside the data file — confirm.
    pub hnsw_offset: u64,
    /// Length of the HNSW index.
    // NOTE(review): presumably a byte length — confirm.
    pub hnsw_len: u64,
}
157
158pub fn make_data_file_entry(
160 path: &str,
161 record_count: u64,
162 file_size_bytes: u64,
163 centroid: &Centroid,
164 index: VectorIndexInfo<'_>,
165) -> DataFileEntry {
166 make_multi_column_data_file_entry(path, record_count, file_size_bytes, centroid, index, &[])
167}
168
169pub fn make_multi_column_data_file_entry(
174 path: &str,
175 record_count: u64,
176 file_size_bytes: u64,
177 primary_centroid: &Centroid,
178 primary_index: VectorIndexInfo<'_>,
179 extra: &[ExtraVectorIndex],
180) -> DataFileEntry {
181 use base64::Engine;
182 let centroid_bytes: Vec<u8> = primary_centroid
183 .values
184 .iter()
185 .flat_map(|v| v.to_le_bytes())
186 .collect();
187 let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(¢roid_bytes);
188 DataFileEntry {
189 path: path.to_string(),
190 record_count,
191 file_size_bytes,
192 centroid_b64: Some(centroid_b64),
193 radius: Some(primary_centroid.radius),
194 hnsw_offset: Some(primary_index.hnsw_offset),
195 hnsw_len: Some(primary_index.hnsw_len),
196 vector_column: Some(primary_index.column.to_string()),
197 vector_dim: Some(primary_index.dim),
198 extra_vector_indexes: extra.to_vec(),
199 index_status: IndexStatus::Ready,
200 batch_id: None,
201 embedding_model: None,
202 }
203}
204
205pub fn make_data_file_entry_indexing(
210 path: &str,
211 record_count: u64,
212 file_size_bytes: u64,
213 centroid: &Centroid,
214 column: &str,
215 dim: u32,
216) -> DataFileEntry {
217 use base64::Engine;
218 let centroid_bytes: Vec<u8> = centroid
219 .values
220 .iter()
221 .flat_map(|v| v.to_le_bytes())
222 .collect();
223 let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(¢roid_bytes);
224 DataFileEntry {
225 path: path.to_string(),
226 record_count,
227 file_size_bytes,
228 centroid_b64: Some(centroid_b64),
229 radius: Some(centroid.radius),
230 hnsw_offset: None,
231 hnsw_len: None,
232 vector_column: Some(column.to_string()),
233 vector_dim: Some(dim),
234 extra_vector_indexes: vec![],
235 index_status: IndexStatus::Indexing,
236 batch_id: None,
237 embedding_model: None,
238 }
239}
240
241pub fn encode_centroid_b64(centroid: &Centroid) -> String {
243 use base64::Engine;
244 let bytes: Vec<u8> = centroid
245 .values
246 .iter()
247 .flat_map(|v| v.to_le_bytes())
248 .collect();
249 base64::engine::general_purpose::STANDARD.encode(&bytes)
250}
251
252pub fn decode_centroid(
254 entry: &DataFileEntry,
255 metric: ailake_core::VectorMetric,
256) -> Option<Centroid> {
257 use base64::Engine;
258 let b64 = entry.centroid_b64.as_ref()?;
259 let bytes = base64::engine::general_purpose::STANDARD.decode(b64).ok()?;
260 let values: Vec<f32> = bytes
261 .chunks_exact(4)
262 .map(|b| f32::from_le_bytes(b.try_into().unwrap()))
263 .collect();
264 Some(Centroid {
265 values,
266 radius: entry.radius.unwrap_or(0.0),
267 metric,
268 })
269}
270
271pub fn new_snapshot_id() -> SnapshotId {
272 std::time::SystemTime::now()
274 .duration_since(std::time::UNIX_EPOCH)
275 .unwrap()
276 .as_millis() as i64
277}