use std::collections::HashMap;
use ailake_core::{AilakeError, AilakeResult, EmbeddingModelInfo, VectorStoragePolicy};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
use crate::provider::{PartitionField, PartitionSpec, SnapshotId, TableMetadata};
/// Serde model of an Iceberg table `metadata.json` document.
///
/// Sections this crate does not interpret structurally (schemas, partition specs, sort
/// orders, logs, refs) are kept as raw `serde_json::Value`s so they round-trip untouched.
/// Field declaration order is the JSON output order.
#[derive(Debug, Serialize, Deserialize)]
pub struct IcebergMetadata {
/// Iceberg format version (`IcebergMetadata::new` writes 2 or 3).
#[serde(rename = "format-version")]
pub format_version: i32,
/// Table UUID string (generated with `Uuid::new_v4` on creation).
#[serde(rename = "table-uuid")]
pub table_uuid: String,
/// Base location of the table (e.g. `s3://…` or `file://…`).
pub location: String,
/// Highest sequence number assigned so far; 0 when absent.
#[serde(rename = "last-sequence-number", default)]
pub last_sequence_number: i64,
/// Next row id to assign; 0 when absent. Presumably the V3 row-lineage counter — confirm.
#[serde(rename = "next-row-id", default)]
pub next_row_id: i64,
/// Last modification time in milliseconds since the Unix epoch.
#[serde(rename = "last-updated-ms")]
pub last_updated_ms: i64,
/// Highest column id assigned in any schema; 0 when absent.
#[serde(rename = "last-column-id", default)]
pub last_column_id: i32,
/// Raw schema objects; each is looked up by its `schema-id` key.
#[serde(default)]
pub schemas: Vec<Value>,
/// `schema-id` of the schema currently in effect.
#[serde(rename = "current-schema-id", default)]
pub current_schema_id: i32,
/// Raw partition-spec objects; each is looked up by its `spec-id` key.
#[serde(rename = "partition-specs", default)]
pub partition_specs: Vec<Value>,
/// `spec-id` of the partition spec used for new writes.
#[serde(rename = "default-spec-id", default)]
pub default_spec_id: i32,
/// Highest partition field id assigned so far.
#[serde(rename = "last-partition-id", default)]
pub last_partition_id: i32,
/// Table properties, including the `ailake.*` keys written by `IcebergMetadata::new`.
#[serde(default)]
pub properties: HashMap<String, String>,
/// Current snapshot, `None` for an empty table (serialized as JSON `null`).
#[serde(rename = "current-snapshot-id", default)]
pub current_snapshot_id: Option<SnapshotId>,
/// All retained snapshots.
#[serde(default)]
pub snapshots: Vec<IcebergSnapshot>,
/// Raw snapshot-log entries, kept opaque.
#[serde(rename = "snapshot-log", default)]
pub snapshot_log: Vec<Value>,
/// Raw metadata-log entries, kept opaque.
#[serde(rename = "metadata-log", default)]
pub metadata_log: Vec<Value>,
/// Raw sort-order objects.
#[serde(rename = "sort-orders", default)]
pub sort_orders: Vec<Value>,
/// Id of the default sort order.
#[serde(rename = "default-sort-order-id", default)]
pub default_sort_order_id: i32,
/// Raw named refs (branches/tags), keyed by ref name.
#[serde(rename = "refs", default)]
pub refs: HashMap<String, Value>,
/// Table-statistics file references; omitted from JSON when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub statistics: Vec<IcebergStatisticsRef>,
/// Partition-statistics file references; omitted from JSON when empty.
#[serde(
rename = "partition-statistics",
default,
skip_serializing_if = "Vec::is_empty"
)]
pub partition_statistics: Vec<IcebergPartitionStatsRef>,
}
/// One entry of the table-metadata `statistics` list: a statistics file tied to a snapshot.
///
/// `IcebergMetadata::to_table_metadata` uses the last entry whose `snapshot_id`
/// matches the current snapshot to pick the current statistics path.
///
/// NOTE(review): the Iceberg table spec calls the per-blob list `blob-metadata`
/// (entries with `type`, `snapshot-id`, `sequence-number`, `fields`, `properties`).
/// This struct writes `blob-file-references` instead and omits it when empty. Confirm
/// that other Iceberg readers accept this before relying on interop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IcebergStatisticsRef {
/// Snapshot the statistics describe.
#[serde(rename = "snapshot-id")]
pub snapshot_id: i64,
/// Location of the statistics file.
#[serde(rename = "statistics-path")]
pub statistics_path: String,
/// Total size of the statistics file in bytes.
#[serde(rename = "file-size-in-bytes")]
pub file_size_in_bytes: u64,
/// Size of the statistics file's footer in bytes.
#[serde(rename = "file-footer-size-in-bytes")]
pub file_footer_size_in_bytes: u64,
/// Blobs stored in the file; omitted from JSON when empty.
#[serde(
rename = "blob-file-references",
default,
skip_serializing_if = "Vec::is_empty"
)]
pub blob_file_references: Vec<BlobRef>,
}
/// Reference to one blob inside a statistics file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobRef {
/// Blob type identifier, serialized as `type`.
#[serde(rename = "type")]
pub blob_type: String,
/// Snapshot the blob was computed for.
#[serde(rename = "snapshot-id")]
pub snapshot_id: i64,
/// Field ids the blob covers; empty when absent.
#[serde(default)]
pub fields: Vec<i32>,
/// Presumably the blob's byte offset within the statistics file — confirm against the writer.
pub offset: u64,
/// Presumably the blob's byte length within the statistics file — confirm against the writer.
pub length: u64,
}
/// One entry of the table-metadata `partition-statistics` list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IcebergPartitionStatsRef {
/// Snapshot the partition statistics describe.
#[serde(rename = "snapshot-id")]
pub snapshot_id: i64,
/// Location of the partition-statistics file.
#[serde(rename = "statistics-path")]
pub statistics_path: String,
/// Size of the partition-statistics file in bytes.
#[serde(rename = "file-size-in-bytes")]
pub file_size_in_bytes: u64,
}
/// One entry of the `snapshots` list in Iceberg table metadata.
///
/// `sequence-number` and `summary` fall back to their defaults (0 and an empty map)
/// when absent. Format-version-1 metadata may lack them, and writers commonly omit the
/// initial sequence number 0. This keeps such files parseable, matching how
/// `IcebergMetadata::last_sequence_number` is already defaulted. Serialization is unchanged.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct IcebergSnapshot {
    /// Unique id of this snapshot.
    #[serde(rename = "snapshot-id")]
    pub snapshot_id: SnapshotId,
    /// Snapshot this one was derived from; omitted from JSON when `None`.
    #[serde(rename = "parent-snapshot-id", skip_serializing_if = "Option::is_none")]
    pub parent_snapshot_id: Option<SnapshotId>,
    /// Data sequence number assigned at commit; 0 when absent.
    #[serde(rename = "sequence-number", default)]
    pub sequence_number: i64,
    /// Commit time in milliseconds since the Unix epoch.
    #[serde(rename = "timestamp-ms")]
    pub timestamp_ms: i64,
    /// Location of the snapshot's manifest list.
    #[serde(rename = "manifest-list")]
    pub manifest_list: String,
    /// Free-form commit summary; empty when absent.
    #[serde(default)]
    pub summary: HashMap<String, String>,
    /// Schema id in effect when the snapshot was written, if recorded.
    #[serde(rename = "schema-id")]
    pub schema_id: Option<i32>,
}
impl IcebergMetadata {
pub fn new(
location: &str,
policy: &VectorStoragePolicy,
format_version: u8,
partition_column_type: Option<&str>,
partition_fields: &[ailake_core::PartitionDef],
) -> Self {
let mut properties = HashMap::new();
properties.insert("ailake.format-version".to_string(), "1".to_string());
properties.insert(
"ailake.vector-column".to_string(),
policy.column_name.clone(),
);
properties.insert("ailake.vector-dim".to_string(), policy.dim.to_string());
properties.insert(
"ailake.vector-metric".to_string(),
format!("{:?}", policy.metric).to_lowercase(),
);
properties.insert(
"ailake.vector-precision".to_string(),
format!("{:?}", policy.precision).to_lowercase(),
);
if let Some(m) = policy.hnsw_m {
properties.insert("ailake.hnsw-m".to_string(), m.to_string());
}
if let Some(ef) = policy.hnsw_ef_construction {
properties.insert("ailake.hnsw-ef-construction".to_string(), ef.to_string());
}
if let Some(model) = &policy.embedding_model {
properties.insert(
EmbeddingModelInfo::property_key().to_string(),
model.to_property_value(),
);
if let Some(dim) = model.dim {
properties.insert("ailake.embedding-model-dim".to_string(), dim.to_string());
}
if let Some(metric) = model.metric {
properties.insert(
"ailake.embedding-model-metric".to_string(),
format!("{:?}", metric).to_lowercase(),
);
}
}
if let Some(modality) = policy.modality {
properties.insert(
format!("ailake.modality-{}", policy.column_name),
modality.as_str().to_string(),
);
}
if let Some(col) = &policy.partition_by {
properties.insert("ailake.partition-by".to_string(), col.clone());
}
let (schemas, last_column_id, partition_specs, default_spec_id, last_partition_id) =
if !partition_fields.is_empty() {
let schema_fields: Vec<serde_json::Value> = partition_fields
.iter()
.enumerate()
.map(|(i, pf)| {
serde_json::json!({
"id": i + 1,
"name": pf.column,
"required": false,
"type": pf.column_type
})
})
.collect();
let spec_fields: Vec<serde_json::Value> = partition_fields
.iter()
.enumerate()
.map(|(i, pf)| {
serde_json::json!({
"name": pf.column,
"transform": pf.transform,
"source-id": i + 1,
"field-id": 1000 + i as i32
})
})
.collect();
let schema = serde_json::json!({
"schema-id": 0, "type": "struct", "fields": schema_fields
});
let spec = serde_json::json!({
"spec-id": 1, "fields": spec_fields
});
let n = partition_fields.len() as i32;
(
vec![schema],
n,
vec![serde_json::json!({"spec-id": 0, "fields": []}), spec],
1,
1000 + n - 1,
)
} else if let Some(col) = &policy.partition_by {
let col_type = partition_column_type.unwrap_or("string");
let schema = serde_json::json!({
"schema-id": 0,
"type": "struct",
"fields": [{
"id": 1,
"name": col,
"required": false,
"type": col_type
}]
});
let spec = serde_json::json!({
"spec-id": 1,
"fields": [{
"name": col,
"transform": "identity",
"source-id": 1,
"field-id": 1000
}]
});
(
vec![schema],
1,
vec![serde_json::json!({"spec-id": 0, "fields": []}), spec],
1,
1000,
)
} else {
(
vec![serde_json::json!({"schema-id": 0, "type": "struct", "fields": []})],
0,
vec![serde_json::json!({"spec-id": 0, "fields": []})],
0,
999,
)
};
let format_version = format_version.max(2) as i32;
if format_version >= 3 {
eprintln!(
"[ailake] WARN: creating Iceberg V3 table at {location} — \
append/update workloads fully supported; \
equality deletes not implemented"
);
}
let now_ms = now_ms();
IcebergMetadata {
format_version,
table_uuid: Uuid::new_v4().to_string(),
location: location.to_string(),
last_sequence_number: 0,
next_row_id: 0,
last_updated_ms: now_ms,
last_column_id,
schemas,
current_schema_id: 0,
partition_specs,
default_spec_id,
last_partition_id,
properties,
current_snapshot_id: None,
snapshots: vec![],
snapshot_log: vec![],
metadata_log: vec![],
sort_orders: vec![serde_json::json!({"order-id": 0, "fields": []})],
default_sort_order_id: 0,
refs: HashMap::new(),
statistics: vec![],
partition_statistics: vec![],
}
}
pub fn to_json(&self) -> AilakeResult<String> {
serde_json::to_string_pretty(self).map_err(AilakeError::Json)
}
pub fn from_json(s: &str) -> AilakeResult<Self> {
serde_json::from_str(s).map_err(AilakeError::Json)
}
pub fn to_table_metadata(&self) -> TableMetadata {
let current_statistics_path = self.current_snapshot_id.and_then(|snap_id| {
self.statistics
.iter()
.rev()
.find(|s| s.snapshot_id == snap_id)
.map(|s| s.statistics_path.clone())
});
let schema_fields = self
.schemas
.iter()
.find(|s| s["schema-id"].as_i64() == Some(self.current_schema_id as i64))
.and_then(|s| s["fields"].as_array().cloned())
.map(|fields| {
fields
.into_iter()
.filter_map(|f| {
let id = f["id"].as_i64()? as i32;
let name = f["name"].as_str()?.to_string();
let required = f["required"].as_bool().unwrap_or(false);
let iceberg_type = match &f["type"] {
Value::String(s) => s.clone(),
other => other.to_string(),
};
let initial_default =
f.get("initial-default").filter(|v| !v.is_null()).cloned();
let write_default =
f.get("write-default").filter(|v| !v.is_null()).cloned();
Some(crate::provider::SchemaField {
id,
name,
required,
iceberg_type,
initial_default,
write_default,
})
})
.collect::<Vec<_>>()
})
.unwrap_or_default();
let partition_spec = self
.partition_specs
.iter()
.find(|s| s["spec-id"].as_i64() == Some(self.default_spec_id as i64))
.and_then(|s| {
let spec_id = s["spec-id"].as_i64()? as i32;
let fields: Vec<PartitionField> = s["fields"]
.as_array()?
.iter()
.filter_map(|f| {
let source_id = f["source-id"].as_i64()? as i32;
let field_id = f["field-id"].as_i64()? as i32;
let name = f["name"].as_str()?.to_string();
let transform = f["transform"].as_str().unwrap_or("identity").to_string();
let source_type = schema_fields
.iter()
.find(|sf| sf.id == source_id)
.map(|sf| sf.iceberg_type.clone())
.unwrap_or_else(|| "string".to_string());
Some(PartitionField {
source_id,
field_id,
name,
transform,
source_type,
})
})
.collect();
if fields.is_empty() {
None
} else {
Some(PartitionSpec { spec_id, fields })
}
});
TableMetadata {
table_uuid: self.table_uuid.clone(),
format_version: self.format_version,
location: self.location.clone(),
properties: self.properties.clone(),
current_snapshot_id: self.current_snapshot_id,
current_statistics_path,
schema_fields,
equality_delete_files: vec![], partition_spec,
}
}
}
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// This function never panics. If the system clock reads earlier than 1970, the result
/// is negative (milliseconds before the epoch) rather than aborting. Values that do not
/// fit in an `i64` saturate at `i64::MAX` / `i64::MIN`.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX),
        Err(before_epoch) => {
            i64::try_from(before_epoch.duration().as_millis()).map_or(i64::MIN, |ms| -ms)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use ailake_core::{VectorMetric, VectorPrecision};

    /// Unpartitioned baseline policy: 4-dim cosine vectors stored as F16 in `embedding`.
    fn make_policy() -> VectorStoragePolicy {
        VectorStoragePolicy {
            column_name: String::from("embedding"),
            dim: 4,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: true,
            pre_normalize: false,
            hnsw_m: None,
            hnsw_ef_construction: None,
            ivf_residual: false,
            embedding_model: None,
            modality: None,
            partition_by: None,
            partition_value: None,
            partition_column_type: None,
            partition_fields: Vec::new(),
        }
    }

    /// Baseline policy partitioned (legacy single-column style) by `column`.
    fn partitioned_policy(column: &str) -> VectorStoragePolicy {
        let mut policy = make_policy();
        policy.partition_by = Some(column.to_string());
        policy
    }

    /// Serializes `meta` to JSON and parses it back.
    fn reparse(meta: &IcebergMetadata) -> IcebergMetadata {
        let text = meta.to_json().unwrap();
        IcebergMetadata::from_json(&text).unwrap()
    }

    #[test]
    fn roundtrip_json() {
        let original = IcebergMetadata::new("s3://my-lake/my_table", &make_policy(), 2, None, &[]);
        let parsed = reparse(&original);
        assert_eq!(parsed.format_version, 2);
        let vector_column = parsed.properties.get("ailake.vector-column");
        assert_eq!(vector_column.map(String::as_str), Some("embedding"));
    }

    #[test]
    fn properties_contain_ailake_keys() {
        let props = IcebergMetadata::new("file:///tmp/tbl", &make_policy(), 2, None, &[]).properties;
        for key in ["ailake.format-version", "ailake.vector-dim"] {
            assert!(props.contains_key(key));
        }
    }

    #[test]
    fn embedding_model_stored_in_properties() {
        use ailake_core::EmbeddingModelInfo;
        let policy = VectorStoragePolicy {
            embedding_model: Some(
                EmbeddingModelInfo::new("text-embedding-3-small").with_version("2024-01"),
            ),
            ..make_policy()
        };
        let meta = IcebergMetadata::new("file:///tmp/tbl", &policy, 2, None, &[]);
        let stored = meta.properties.get("ailake.embedding-model").map(String::as_str);
        assert_eq!(stored, Some("text-embedding-3-small@2024-01"));
    }

    #[test]
    fn format_version_v3_emitted() {
        let original = IcebergMetadata::new("s3://my-lake/v3_table", &make_policy(), 3, None, &[]);
        assert_eq!(reparse(&original).format_version, 3);
    }

    #[test]
    fn format_version_defaults_to_v2() {
        let version =
            IcebergMetadata::new("s3://my-lake/v2_table", &make_policy(), 2, None, &[]).format_version;
        assert_eq!(version, 2);
    }

    #[test]
    fn partition_spec_written_to_metadata_json() {
        let policy = partitioned_policy("agent_id");
        let meta = IcebergMetadata::new("s3://my-lake/agents", &policy, 2, Some("string"), &[]);

        // Raw schema: exactly one field, the partition source column.
        let schema_fields = meta.schemas[0]["fields"].as_array().unwrap();
        assert_eq!(schema_fields.len(), 1);
        let column = &schema_fields[0];
        assert_eq!(column["name"].as_str(), Some("agent_id"));
        assert_eq!(column["id"].as_i64(), Some(1));
        assert_eq!(column["type"].as_str(), Some("string"));

        // Raw specs: spec 1 is the default and holds the identity field.
        assert_eq!(meta.default_spec_id, 1);
        let active_spec = meta
            .partition_specs
            .iter()
            .find(|spec| spec["spec-id"] == 1)
            .unwrap();
        let raw_field = &active_spec["fields"][0];
        assert_eq!(raw_field["name"].as_str(), Some("agent_id"));
        assert_eq!(raw_field["transform"].as_str(), Some("identity"));
        assert_eq!(raw_field["source-id"].as_i64(), Some(1));
        assert_eq!(raw_field["field-id"].as_i64(), Some(1000));
        assert_eq!(meta.last_partition_id, 1000);
        assert_eq!(meta.last_column_id, 1);

        // Converted view mirrors the raw spec.
        let spec = meta
            .to_table_metadata()
            .partition_spec
            .expect("partition_spec must be Some");
        assert_eq!(spec.spec_id, 1);
        assert_eq!(spec.fields.len(), 1);
        let field = &spec.fields[0];
        assert_eq!(field.name, "agent_id");
        assert_eq!(field.source_id, 1);
        assert_eq!(field.field_id, 1000);
        assert_eq!(field.transform, "identity");
        assert_eq!(field.source_type, "string");
    }

    #[test]
    fn unpartitioned_table_has_no_partition_spec() {
        let converted =
            IcebergMetadata::new("s3://my-lake/simple", &make_policy(), 2, None, &[]).to_table_metadata();
        assert!(converted.partition_spec.is_none());
    }

    #[test]
    fn partition_spec_int_type() {
        let policy = partitioned_policy("shard_id");
        let meta = IcebergMetadata::new("s3://my-lake/shards", &policy, 2, Some("int"), &[]);
        assert_eq!(meta.schemas[0]["fields"][0]["type"].as_str(), Some("int"));
        let spec = meta.to_table_metadata().partition_spec.unwrap();
        assert_eq!(spec.fields[0].source_type, "int");
    }
}