1use ailake_core::{AilakeResult, Centroid, VectorStoragePolicy};
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
9#[serde(rename_all = "lowercase")]
10pub enum IndexStatus {
11 #[default]
13 Ready,
14 Indexing,
16}
17
/// Table identifier made of a namespace and a table name within it.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct TableIdent {
    /// Namespace containing the table.
    pub namespace: String,
    /// Table name within `namespace`.
    pub name: String,
}

impl TableIdent {
    /// Builds an identifier, copying both parts into owned strings.
    pub fn new(namespace: &str, name: &str) -> Self {
        let namespace = String::from(namespace);
        let name = String::from(name);
        TableIdent { namespace, name }
    }
}
33
/// Identifier of a table snapshot. Ids produced by `new_snapshot_id` are
/// derived from wall-clock milliseconds since the Unix epoch.
pub type SnapshotId = i64;
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct ExtraVectorIndex {
39 pub column: String,
40 pub dim: u32,
41 pub hnsw_offset: u64,
42 pub hnsw_len: u64,
43 pub centroid_b64: Option<String>,
44 pub radius: Option<f32>,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct DataFileEntry {
50 pub path: String,
52 pub record_count: u64,
53 pub file_size_bytes: u64,
54 pub centroid_b64: Option<String>,
56 pub radius: Option<f32>,
57 pub hnsw_offset: Option<u64>,
58 pub hnsw_len: Option<u64>,
59 pub vector_column: Option<String>,
60 pub vector_dim: Option<u32>,
61 #[serde(default)]
63 pub extra_vector_indexes: Vec<ExtraVectorIndex>,
64 #[serde(default)]
66 pub index_status: IndexStatus,
67 #[serde(default, skip_serializing_if = "Option::is_none")]
70 pub batch_id: Option<String>,
71 #[serde(default, skip_serializing_if = "Option::is_none")]
74 pub embedding_model: Option<String>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct TableMetadata {
80 pub table_uuid: String,
81 pub format_version: i32,
82 pub location: String,
83 pub properties: HashMap<String, String>,
85 pub current_snapshot_id: Option<SnapshotId>,
86}
87
88#[derive(Debug, Clone)]
94pub struct IcebergSchemaUpdate {
95 pub fields: Vec<serde_json::Value>,
97 pub last_column_id: i32,
98 pub name_mapping_json: String,
100}
101
102#[derive(Debug, Clone)]
104pub struct NewSnapshot {
105 pub snapshot_id: SnapshotId,
106 pub parent_snapshot_id: Option<SnapshotId>,
107 pub files: Vec<DataFileEntry>,
108 pub operation: SnapshotOperation,
109 pub iceberg_schema: Option<IcebergSchemaUpdate>,
111 pub extra_properties: HashMap<String, String>,
114}
115
/// Kind of change a `NewSnapshot` applies to a table.
///
/// Derives `PartialEq`/`Eq` (like `IndexStatus`) so callers can compare
/// operations with `==` instead of only through `match`/`matches!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SnapshotOperation {
    Append,
    Overwrite,
    Delete,
    Replace,
}
123
124#[derive(Debug, Clone)]
126pub struct TableProperties {
127 pub policy: VectorStoragePolicy,
128 pub extra: HashMap<String, String>,
129}
130
/// Catalog backend: creates, loads and drops tables, commits snapshots, and
/// lists a table's data files.
///
/// Implementors must be `Send + Sync`; the async methods are declared through
/// `#[async_trait]`.
#[async_trait]
pub trait CatalogProvider: Send + Sync {
    /// Creates table `name` using `props`.
    async fn create_table(&self, name: &TableIdent, props: &TableProperties) -> AilakeResult<()>;

    /// Loads the [`TableMetadata`] of table `name`.
    async fn load_table(&self, name: &TableIdent) -> AilakeResult<TableMetadata>;

    /// Commits `snapshot` to `table`, returning a [`SnapshotId`].
    ///
    /// NOTE(review): presumably the returned id is `snapshot.snapshot_id`;
    /// confirm against implementations.
    async fn commit_snapshot(
        &self,
        table: &TableIdent,
        snapshot: NewSnapshot,
    ) -> AilakeResult<SnapshotId>;

    /// Lists the [`DataFileEntry`] records of `table` for `snapshot_id`.
    ///
    /// NOTE(review): presumably `None` selects the current snapshot — TODO confirm.
    async fn list_files(
        &self,
        table: &TableIdent,
        snapshot_id: Option<SnapshotId>,
    ) -> AilakeResult<Vec<DataFileEntry>>;

    /// Drops table `name`.
    async fn drop_table(&self, name: &TableIdent) -> AilakeResult<()>;
}
152
/// Location and shape of the primary vector column's HNSW index, passed to
/// `make_data_file_entry` / `make_multi_column_data_file_entry`.
///
/// Every field is `Copy`, so the struct derives `Copy` (plus `Debug`, like the
/// other public types here) and can be reused without moving it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VectorIndexInfo<'a> {
    /// Name of the vector column.
    pub column: &'a str,
    /// Vector dimensionality.
    pub dim: u32,
    /// Offset of the HNSW index (presumably bytes within the data file — TODO confirm).
    pub hnsw_offset: u64,
    /// Length of the HNSW index (presumably in bytes — TODO confirm).
    pub hnsw_len: u64,
}
160
161pub fn make_data_file_entry(
163 path: &str,
164 record_count: u64,
165 file_size_bytes: u64,
166 centroid: &Centroid,
167 index: VectorIndexInfo<'_>,
168) -> DataFileEntry {
169 make_multi_column_data_file_entry(path, record_count, file_size_bytes, centroid, index, &[])
170}
171
172pub fn make_multi_column_data_file_entry(
177 path: &str,
178 record_count: u64,
179 file_size_bytes: u64,
180 primary_centroid: &Centroid,
181 primary_index: VectorIndexInfo<'_>,
182 extra: &[ExtraVectorIndex],
183) -> DataFileEntry {
184 use base64::Engine;
185 let centroid_bytes: Vec<u8> = primary_centroid
186 .values
187 .iter()
188 .flat_map(|v| v.to_le_bytes())
189 .collect();
190 let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(¢roid_bytes);
191 DataFileEntry {
192 path: path.to_string(),
193 record_count,
194 file_size_bytes,
195 centroid_b64: Some(centroid_b64),
196 radius: Some(primary_centroid.radius),
197 hnsw_offset: Some(primary_index.hnsw_offset),
198 hnsw_len: Some(primary_index.hnsw_len),
199 vector_column: Some(primary_index.column.to_string()),
200 vector_dim: Some(primary_index.dim),
201 extra_vector_indexes: extra.to_vec(),
202 index_status: IndexStatus::Ready,
203 batch_id: None,
204 embedding_model: None,
205 }
206}
207
208pub fn make_data_file_entry_indexing(
213 path: &str,
214 record_count: u64,
215 file_size_bytes: u64,
216 centroid: &Centroid,
217 column: &str,
218 dim: u32,
219) -> DataFileEntry {
220 use base64::Engine;
221 let centroid_bytes: Vec<u8> = centroid
222 .values
223 .iter()
224 .flat_map(|v| v.to_le_bytes())
225 .collect();
226 let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(¢roid_bytes);
227 DataFileEntry {
228 path: path.to_string(),
229 record_count,
230 file_size_bytes,
231 centroid_b64: Some(centroid_b64),
232 radius: Some(centroid.radius),
233 hnsw_offset: None,
234 hnsw_len: None,
235 vector_column: Some(column.to_string()),
236 vector_dim: Some(dim),
237 extra_vector_indexes: vec![],
238 index_status: IndexStatus::Indexing,
239 batch_id: None,
240 embedding_model: None,
241 }
242}
243
244pub fn encode_centroid_b64(centroid: &Centroid) -> String {
246 use base64::Engine;
247 let bytes: Vec<u8> = centroid
248 .values
249 .iter()
250 .flat_map(|v| v.to_le_bytes())
251 .collect();
252 base64::engine::general_purpose::STANDARD.encode(&bytes)
253}
254
255pub fn decode_centroid(
257 entry: &DataFileEntry,
258 metric: ailake_core::VectorMetric,
259) -> Option<Centroid> {
260 use base64::Engine;
261 let b64 = entry.centroid_b64.as_ref()?;
262 let bytes = base64::engine::general_purpose::STANDARD.decode(b64).ok()?;
263 let values: Vec<f32> = bytes
264 .chunks_exact(4)
265 .map(|b| f32::from_le_bytes(b.try_into().unwrap()))
266 .collect();
267 Some(Centroid {
268 values,
269 radius: entry.radius.unwrap_or(0.0),
270 metric,
271 })
272}
273
274pub fn new_snapshot_id() -> SnapshotId {
275 std::time::SystemTime::now()
277 .duration_since(std::time::UNIX_EPOCH)
278 .unwrap()
279 .as_millis() as i64
280}