1use ailake_core::{AilakeResult, Centroid, VectorStoragePolicy};
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
9#[serde(rename_all = "lowercase")]
10pub enum IndexStatus {
11 #[default]
13 Ready,
14 Indexing,
16}
17
/// Identifies a table by its namespace and name.
///
/// Implements `Eq` and `Hash`, so it can be used as a map or set key.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct TableIdent {
    /// Namespace part of the identifier.
    pub namespace: String,
    /// Table-name part of the identifier.
    pub name: String,
}

impl TableIdent {
    /// Builds an identifier by copying both string slices into owned `String`s.
    pub fn new(namespace: &str, name: &str) -> Self {
        let namespace = String::from(namespace);
        let name = String::from(name);
        Self { namespace, name }
    }
}
33
/// Identifier of a table snapshot, stored as a signed 64-bit integer.
///
/// Values produced by `new_snapshot_id` are milliseconds since the Unix epoch.
pub type SnapshotId = i64;
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct ExtraVectorIndex {
39 pub column: String,
40 pub dim: u32,
41 pub hnsw_offset: u64,
42 pub hnsw_len: u64,
43 pub centroid_b64: Option<String>,
44 pub radius: Option<f32>,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct DataFileEntry {
50 pub path: String,
52 pub record_count: u64,
53 pub file_size_bytes: u64,
54 pub centroid_b64: Option<String>,
56 pub radius: Option<f32>,
57 pub hnsw_offset: Option<u64>,
58 pub hnsw_len: Option<u64>,
59 pub vector_column: Option<String>,
60 pub vector_dim: Option<u32>,
61 #[serde(default)]
63 pub extra_vector_indexes: Vec<ExtraVectorIndex>,
64 #[serde(default)]
66 pub index_status: IndexStatus,
67 #[serde(default, skip_serializing_if = "Option::is_none")]
70 pub batch_id: Option<String>,
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct TableMetadata {
76 pub table_uuid: String,
77 pub format_version: i32,
78 pub location: String,
79 pub properties: HashMap<String, String>,
81 pub current_snapshot_id: Option<SnapshotId>,
82}
83
84#[derive(Debug, Clone)]
90pub struct IcebergSchemaUpdate {
91 pub fields: Vec<serde_json::Value>,
93 pub last_column_id: i32,
94 pub name_mapping_json: String,
96}
97
98#[derive(Debug, Clone)]
100pub struct NewSnapshot {
101 pub snapshot_id: SnapshotId,
102 pub parent_snapshot_id: Option<SnapshotId>,
103 pub files: Vec<DataFileEntry>,
104 pub operation: SnapshotOperation,
105 pub iceberg_schema: Option<IcebergSchemaUpdate>,
107}
108
/// Kind of change a snapshot applies to a table.
///
/// Derives `PartialEq`/`Eq` so callers can compare operations directly
/// instead of pattern-matching for every check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SnapshotOperation {
    /// Append operation.
    Append,
    /// Overwrite operation.
    Overwrite,
    /// Delete operation.
    Delete,
    /// Replace operation.
    Replace,
}
116
117#[derive(Debug, Clone)]
119pub struct TableProperties {
120 pub policy: VectorStoragePolicy,
121 pub extra: HashMap<String, String>,
122}
123
124#[async_trait]
126pub trait CatalogProvider: Send + Sync {
127 async fn create_table(&self, name: &TableIdent, props: &TableProperties) -> AilakeResult<()>;
128
129 async fn load_table(&self, name: &TableIdent) -> AilakeResult<TableMetadata>;
130
131 async fn commit_snapshot(
132 &self,
133 table: &TableIdent,
134 snapshot: NewSnapshot,
135 ) -> AilakeResult<SnapshotId>;
136
137 async fn list_files(
138 &self,
139 table: &TableIdent,
140 snapshot_id: Option<SnapshotId>,
141 ) -> AilakeResult<Vec<DataFileEntry>>;
142
143 async fn drop_table(&self, name: &TableIdent) -> AilakeResult<()>;
144}
145
/// Borrowed description of one vector column's HNSW index, passed to the
/// data-file-entry builders in this module.
///
/// Every field is `Copy`, so the struct derives `Copy` along with `Debug` and
/// equality; this keeps the public type printable and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VectorIndexInfo<'a> {
    /// Name of the vector column.
    pub column: &'a str,
    /// Dimensionality of the column's vectors.
    pub dim: u32,
    /// Start of the HNSW section (presumably a byte offset; confirm).
    pub hnsw_offset: u64,
    /// Length of the HNSW section (presumably in bytes; confirm).
    pub hnsw_len: u64,
}
153
154pub fn make_data_file_entry(
156 path: &str,
157 record_count: u64,
158 file_size_bytes: u64,
159 centroid: &Centroid,
160 index: VectorIndexInfo<'_>,
161) -> DataFileEntry {
162 make_multi_column_data_file_entry(path, record_count, file_size_bytes, centroid, index, &[])
163}
164
165pub fn make_multi_column_data_file_entry(
170 path: &str,
171 record_count: u64,
172 file_size_bytes: u64,
173 primary_centroid: &Centroid,
174 primary_index: VectorIndexInfo<'_>,
175 extra: &[ExtraVectorIndex],
176) -> DataFileEntry {
177 use base64::Engine;
178 let centroid_bytes: Vec<u8> = primary_centroid
179 .values
180 .iter()
181 .flat_map(|v| v.to_le_bytes())
182 .collect();
183 let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(¢roid_bytes);
184 DataFileEntry {
185 path: path.to_string(),
186 record_count,
187 file_size_bytes,
188 centroid_b64: Some(centroid_b64),
189 radius: Some(primary_centroid.radius),
190 hnsw_offset: Some(primary_index.hnsw_offset),
191 hnsw_len: Some(primary_index.hnsw_len),
192 vector_column: Some(primary_index.column.to_string()),
193 vector_dim: Some(primary_index.dim),
194 extra_vector_indexes: extra.to_vec(),
195 index_status: IndexStatus::Ready,
196 batch_id: None,
197 }
198}
199
200pub fn make_data_file_entry_indexing(
205 path: &str,
206 record_count: u64,
207 file_size_bytes: u64,
208 centroid: &Centroid,
209 column: &str,
210 dim: u32,
211) -> DataFileEntry {
212 use base64::Engine;
213 let centroid_bytes: Vec<u8> = centroid
214 .values
215 .iter()
216 .flat_map(|v| v.to_le_bytes())
217 .collect();
218 let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(¢roid_bytes);
219 DataFileEntry {
220 path: path.to_string(),
221 record_count,
222 file_size_bytes,
223 centroid_b64: Some(centroid_b64),
224 radius: Some(centroid.radius),
225 hnsw_offset: None,
226 hnsw_len: None,
227 vector_column: Some(column.to_string()),
228 vector_dim: Some(dim),
229 extra_vector_indexes: vec![],
230 index_status: IndexStatus::Indexing,
231 batch_id: None,
232 }
233}
234
235pub fn encode_centroid_b64(centroid: &Centroid) -> String {
237 use base64::Engine;
238 let bytes: Vec<u8> = centroid
239 .values
240 .iter()
241 .flat_map(|v| v.to_le_bytes())
242 .collect();
243 base64::engine::general_purpose::STANDARD.encode(&bytes)
244}
245
246pub fn decode_centroid(
248 entry: &DataFileEntry,
249 metric: ailake_core::VectorMetric,
250) -> Option<Centroid> {
251 use base64::Engine;
252 let b64 = entry.centroid_b64.as_ref()?;
253 let bytes = base64::engine::general_purpose::STANDARD.decode(b64).ok()?;
254 let values: Vec<f32> = bytes
255 .chunks_exact(4)
256 .map(|b| f32::from_le_bytes(b.try_into().unwrap()))
257 .collect();
258 Some(Centroid {
259 values,
260 radius: entry.radius.unwrap_or(0.0),
261 metric,
262 })
263}
264
265pub fn new_snapshot_id() -> SnapshotId {
266 std::time::SystemTime::now()
268 .duration_since(std::time::UNIX_EPOCH)
269 .unwrap()
270 .as_millis() as i64
271}