use ailake_core::{AilakeResult, Centroid, VectorStoragePolicy};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Build state of a data file's vector index.
///
/// Serialized in lowercase (`"ready"` / `"indexing"`). `Ready` is the `Default` variant;
/// `DataFileEntry::index_status` is `#[serde(default)]`, so an entry serialized without
/// that field deserializes as `Ready`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "lowercase")]
pub enum IndexStatus {
    /// Status set by `make_multi_column_data_file_entry` (and `make_data_file_entry`),
    /// which always populate the HNSW offset/length fields.
    #[default]
    Ready,
    /// Status set by `make_data_file_entry_indexing`, which leaves the HNSW offset/length
    /// as `None`. NOTE(review): presumably means the index is still being built — confirm.
    Indexing,
}
/// Identifier of a catalog table, made of a namespace and a table name.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct TableIdent {
    /// Namespace component of the identifier.
    pub namespace: String,
    /// Table name within `namespace`.
    pub name: String,
}

impl TableIdent {
    /// Builds an identifier by copying `namespace` and `name` into owned strings.
    pub fn new(namespace: &str, name: &str) -> Self {
        let (namespace, name) = (namespace.to_owned(), name.to_owned());
        Self { namespace, name }
    }
}
/// Identifier of a table snapshot. `new_snapshot_id` derives values from wall-clock
/// milliseconds since the Unix epoch.
pub type SnapshotId = i64;
/// Vector-index metadata for an additional (non-primary) vector column of a data file.
///
/// Stored in `DataFileEntry::extra_vector_indexes`. The primary column's equivalent fields
/// live directly on `DataFileEntry`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtraVectorIndex {
    /// Name of the vector column.
    pub column: String,
    /// Vector dimension of the column.
    pub dim: u32,
    /// Offset of this column's HNSW index.
    /// NOTE(review): presumably a byte offset within the data file — confirm.
    pub hnsw_offset: u64,
    /// Length of this column's HNSW index.
    pub hnsw_len: u64,
    /// Base64-encoded centroid for this column, if recorded.
    /// NOTE(review): presumably the same little-endian `f32` layout as
    /// `DataFileEntry::centroid_b64` — confirm against the writer.
    pub centroid_b64: Option<String>,
    /// Radius of this column's centroid, if recorded.
    pub radius: Option<f32>,
}
/// Catalog record for a single data file.
///
/// Besides basic file stats, it carries optional vector-index metadata for the primary
/// vector column and a list of indexes for additional vector columns.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataFileEntry {
    /// Path of the data file.
    pub path: String,
    /// Number of records in the file.
    pub record_count: u64,
    /// File size in bytes.
    pub file_size_bytes: u64,
    /// Standard-alphabet base64 of the primary centroid's values, each written as 4
    /// little-endian `f32` bytes (the layout of `encode_centroid_b64`). Read back by
    /// `decode_centroid`.
    pub centroid_b64: Option<String>,
    /// Radius of the primary centroid; `decode_centroid` substitutes `0.0` when `None`.
    pub radius: Option<f32>,
    /// Offset of the primary column's HNSW index; `make_data_file_entry_indexing` leaves it
    /// `None`. NOTE(review): presumably a byte offset within the file at `path` — confirm.
    pub hnsw_offset: Option<u64>,
    /// Length of the primary column's HNSW index; `make_data_file_entry_indexing` leaves it
    /// `None`.
    pub hnsw_len: Option<u64>,
    /// Name of the primary vector column.
    pub vector_column: Option<String>,
    /// Vector dimension of the primary column.
    pub vector_dim: Option<u32>,
    /// Indexes for additional vector columns; empty when the field is absent on deserialize.
    #[serde(default)]
    pub extra_vector_indexes: Vec<ExtraVectorIndex>,
    /// Build state of the primary index; `IndexStatus::Ready` when absent on deserialize.
    #[serde(default)]
    pub index_status: IndexStatus,
}
/// Table-level metadata, as returned by `CatalogProvider::load_table`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableMetadata {
    /// Table UUID, held as a string.
    pub table_uuid: String,
    /// Table format version.
    /// NOTE(review): presumably the Iceberg table format version — confirm.
    pub format_version: i32,
    /// Table location.
    /// NOTE(review): presumably a storage URI or root path — confirm.
    pub location: String,
    /// Free-form string key/value table properties.
    pub properties: HashMap<String, String>,
    /// Id of the current snapshot.
    /// NOTE(review): presumably `None` for a table with no snapshots yet — confirm.
    pub current_snapshot_id: Option<SnapshotId>,
}
/// Schema change carried with a snapshot commit via `NewSnapshot::iceberg_schema`.
#[derive(Debug, Clone)]
pub struct IcebergSchemaUpdate {
    /// Schema fields as raw JSON values.
    /// NOTE(review): presumably Iceberg schema field objects — confirm against the writer.
    pub fields: Vec<serde_json::Value>,
    /// NOTE(review): presumably the highest assigned column id (Iceberg `last-column-id`)
    /// after this update — confirm.
    pub last_column_id: i32,
    /// Name mapping, serialized as a JSON string.
    pub name_mapping_json: String,
}
/// Snapshot to be committed through `CatalogProvider::commit_snapshot`.
#[derive(Debug, Clone)]
pub struct NewSnapshot {
    /// Id of the new snapshot (for example one produced by `new_snapshot_id`).
    pub snapshot_id: SnapshotId,
    /// Id of the parent snapshot, if any.
    pub parent_snapshot_id: Option<SnapshotId>,
    /// Data files carried by this commit.
    /// NOTE(review): whether these are only the changed files or the full file set likely
    /// depends on `operation` and the catalog implementation — confirm.
    pub files: Vec<DataFileEntry>,
    /// Kind of change this snapshot represents.
    pub operation: SnapshotOperation,
    /// Optional schema change committed together with the snapshot.
    pub iceberg_schema: Option<IcebergSchemaUpdate>,
}
/// Kind of change a committed snapshot represents.
///
/// NOTE(review): the variants presumably mirror Iceberg's snapshot operations
/// (append / overwrite / delete / replace). Their exact semantics are defined by the
/// `CatalogProvider` implementation — confirm.
#[derive(Debug, Clone)]
pub enum SnapshotOperation {
    Append,
    Overwrite,
    Delete,
    Replace,
}
/// Properties supplied when creating a table via `CatalogProvider::create_table`.
#[derive(Debug, Clone)]
pub struct TableProperties {
    /// Vector storage policy for the table.
    pub policy: VectorStoragePolicy,
    /// Additional free-form string key/value properties.
    pub extra: HashMap<String, String>,
}
/// Catalog backend abstraction: create, load, commit to, list, and drop tables.
///
/// Implementors must be `Send + Sync`. Methods are `async` through `async_trait` and
/// report failures as `AilakeResult` errors.
#[async_trait]
pub trait CatalogProvider: Send + Sync {
    /// Creates the table `name` with the given properties.
    async fn create_table(&self, name: &TableIdent, props: &TableProperties) -> AilakeResult<()>;
    /// Loads the metadata of the table `name`.
    async fn load_table(&self, name: &TableIdent) -> AilakeResult<TableMetadata>;
    /// Commits `snapshot` to `table` and returns a `SnapshotId`.
    /// NOTE(review): presumably the id of the committed snapshot — confirm with
    /// implementations.
    async fn commit_snapshot(
        &self,
        table: &TableIdent,
        snapshot: NewSnapshot,
    ) -> AilakeResult<SnapshotId>;
    /// Lists the data files of `table` at `snapshot_id`.
    /// NOTE(review): presumably `None` selects the current snapshot — confirm.
    async fn list_files(
        &self,
        table: &TableIdent,
        snapshot_id: Option<SnapshotId>,
    ) -> AilakeResult<Vec<DataFileEntry>>;
    /// Drops the table `name`.
    async fn drop_table(&self, name: &TableIdent) -> AilakeResult<()>;
}
/// Borrowed description of a data file's primary vector index.
///
/// Consumed by `make_data_file_entry` / `make_multi_column_data_file_entry`, which copy
/// each field into the corresponding `DataFileEntry` field.
pub struct VectorIndexInfo<'a> {
    /// Name of the vector column (copied into `DataFileEntry::vector_column`).
    pub column: &'a str,
    /// Vector dimension (copied into `DataFileEntry::vector_dim`).
    pub dim: u32,
    /// Offset of the HNSW index (copied into `DataFileEntry::hnsw_offset`).
    pub hnsw_offset: u64,
    /// Length of the HNSW index (copied into `DataFileEntry::hnsw_len`).
    pub hnsw_len: u64,
}
/// Builds a [`DataFileEntry`] for a data file with a single vector column.
///
/// This is shorthand for [`make_multi_column_data_file_entry`] with no extra vector
/// indexes. The resulting entry is marked [`IndexStatus::Ready`].
pub fn make_data_file_entry(
    path: &str,
    record_count: u64,
    file_size_bytes: u64,
    centroid: &Centroid,
    index: VectorIndexInfo<'_>,
) -> DataFileEntry {
    // Single-column files have no extra vector indexes.
    make_multi_column_data_file_entry(
        path,
        record_count,
        file_size_bytes,
        centroid,
        index,
        &[],
    )
}
/// Builds a [`DataFileEntry`] for a data file with a primary vector column plus zero or
/// more additional vector columns. The entry is marked [`IndexStatus::Ready`].
///
/// Field handling:
/// - `primary_centroid`'s values are stored as standard-alphabet base64 of their
///   little-endian `f32` bytes, the same layout as [`encode_centroid_b64`].
/// - The centroid's radius is stored alongside.
/// - `primary_index`'s column, dimension and HNSW offset/length are copied into the entry.
/// - `extra` is cloned into `extra_vector_indexes` unchanged.
pub fn make_multi_column_data_file_entry(
    path: &str,
    record_count: u64,
    file_size_bytes: u64,
    primary_centroid: &Centroid,
    primary_index: VectorIndexInfo<'_>,
    extra: &[ExtraVectorIndex],
) -> DataFileEntry {
    use base64::Engine;
    let centroid_bytes: Vec<u8> = primary_centroid
        .values
        .iter()
        .flat_map(|v| v.to_le_bytes())
        .collect();
    let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(&centroid_bytes);
    DataFileEntry {
        path: path.to_string(),
        record_count,
        file_size_bytes,
        centroid_b64: Some(centroid_b64),
        radius: Some(primary_centroid.radius),
        hnsw_offset: Some(primary_index.hnsw_offset),
        hnsw_len: Some(primary_index.hnsw_len),
        vector_column: Some(primary_index.column.to_string()),
        vector_dim: Some(primary_index.dim),
        extra_vector_indexes: extra.to_vec(),
        index_status: IndexStatus::Ready,
    }
}
/// Builds a [`DataFileEntry`] for a data file whose HNSW index is not available yet.
///
/// Stored fields:
/// - The centroid, as standard-alphabet base64 of its little-endian `f32` bytes (the
///   layout of [`encode_centroid_b64`]), plus its radius.
/// - The vector column name and dimension.
///
/// `hnsw_offset` / `hnsw_len` are left `None` and no extra vector indexes are recorded.
/// The status is [`IndexStatus::Indexing`].
pub fn make_data_file_entry_indexing(
    path: &str,
    record_count: u64,
    file_size_bytes: u64,
    centroid: &Centroid,
    column: &str,
    dim: u32,
) -> DataFileEntry {
    use base64::Engine;
    let centroid_bytes: Vec<u8> = centroid
        .values
        .iter()
        .flat_map(|v| v.to_le_bytes())
        .collect();
    let centroid_b64 = base64::engine::general_purpose::STANDARD.encode(&centroid_bytes);
    DataFileEntry {
        path: path.to_string(),
        record_count,
        file_size_bytes,
        centroid_b64: Some(centroid_b64),
        radius: Some(centroid.radius),
        // No index has been written yet, so there is nothing to point at.
        hnsw_offset: None,
        hnsw_len: None,
        vector_column: Some(column.to_string()),
        vector_dim: Some(dim),
        extra_vector_indexes: vec![],
        index_status: IndexStatus::Indexing,
    }
}
/// Encodes `centroid.values` as standard-alphabet base64.
///
/// Each `f32` contributes its 4 little-endian bytes, in order, to the encoded payload.
pub fn encode_centroid_b64(centroid: &Centroid) -> String {
    use base64::Engine;
    // 4 bytes per f32 value.
    let mut raw: Vec<u8> = Vec::with_capacity(centroid.values.len() * 4);
    for value in centroid.values.iter() {
        raw.extend_from_slice(&value.to_le_bytes());
    }
    base64::engine::general_purpose::STANDARD.encode(&raw)
}
/// Decodes the primary centroid stored on `entry`.
///
/// `entry.centroid_b64` must hold standard-alphabet base64 of consecutive little-endian
/// `f32` values, the layout written by [`encode_centroid_b64`]. The returned centroid uses
/// `entry.radius` (or `0.0` when absent) and the caller-supplied `metric`.
///
/// Returns `None` in any of these cases:
/// - the entry has no centroid;
/// - the base64 is invalid;
/// - the decoded byte length is not a multiple of 4, i.e. the payload is truncated or
///   corrupt and cannot be an exact sequence of `f32` values.
pub fn decode_centroid(
    entry: &DataFileEntry,
    metric: ailake_core::VectorMetric,
) -> Option<Centroid> {
    use base64::Engine;
    let b64 = entry.centroid_b64.as_ref()?;
    let bytes = base64::engine::general_purpose::STANDARD.decode(b64).ok()?;
    let chunks = bytes.chunks_exact(4);
    // Reject leftover bytes instead of silently dropping them: a partial f32 means the
    // payload is corrupt, and truncating would produce a centroid of the wrong dimension.
    if !chunks.remainder().is_empty() {
        return None;
    }
    // `chunks_exact(4)` guarantees every chunk has exactly 4 bytes.
    let values: Vec<f32> = chunks
        .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]]))
        .collect();
    Some(Centroid {
        values,
        radius: entry.radius.unwrap_or(0.0),
        metric,
    })
}
/// Generates a new snapshot id from the current wall-clock time in milliseconds since the
/// Unix epoch.
///
/// Ids returned by this function strictly increase within a single process. If two calls
/// land in the same millisecond, or the clock steps backwards, the later call returns the
/// previously issued id plus one instead of repeating it. Uniqueness across processes is
/// not guaranteed here.
///
/// A clock set before the Unix epoch is treated as time 0 instead of panicking. A
/// millisecond count that does not fit in `i64` saturates to `i64::MAX`.
pub fn new_snapshot_id() -> SnapshotId {
    use std::sync::atomic::{AtomicI64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Last id handed out by this process; used to break same-millisecond ties.
    static LAST_ID: AtomicI64 = AtomicI64::new(0);

    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0);

    let next = |prev: i64| now_ms.max(prev.saturating_add(1));
    // `fetch_update` retries its CAS until it succeeds and reports the value it replaced,
    // so recomputing `next(prev)` yields exactly the id that was stored.
    match LAST_ID.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| Some(next(prev))) {
        Ok(prev) | Err(prev) => next(prev),
    }
}