Skip to main content

ailake_catalog/
metadata.rs

// SPDX-License-Identifier: MIT OR Apache-2.0
// Iceberg Spec v2 metadata.json read/write.
// Only the fields needed by AI-Lake are strongly typed; the other known Iceberg fields are
// kept as raw JSON values. Unknown top-level keys are not preserved on a read/write round-trip.
4
5use std::collections::HashMap;
6
7use ailake_core::{AilakeError, AilakeResult, VectorStoragePolicy};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use uuid::Uuid;
11
12use crate::provider::{SnapshotId, TableMetadata};
13
14#[derive(Debug, Serialize, Deserialize)]
15pub struct IcebergMetadata {
16    #[serde(rename = "format-version")]
17    pub format_version: i32,
18    #[serde(rename = "table-uuid")]
19    pub table_uuid: String,
20    pub location: String,
21    #[serde(rename = "last-sequence-number", default)]
22    pub last_sequence_number: i64,
23    #[serde(rename = "last-updated-ms")]
24    pub last_updated_ms: i64,
25    #[serde(rename = "last-column-id", default)]
26    pub last_column_id: i32,
27    #[serde(default)]
28    pub schemas: Vec<Value>,
29    #[serde(rename = "current-schema-id", default)]
30    pub current_schema_id: i32,
31    #[serde(rename = "partition-specs", default)]
32    pub partition_specs: Vec<Value>,
33    #[serde(rename = "default-spec-id", default)]
34    pub default_spec_id: i32,
35    #[serde(rename = "last-partition-id", default)]
36    pub last_partition_id: i32,
37    #[serde(default)]
38    pub properties: HashMap<String, String>,
39    #[serde(rename = "current-snapshot-id", default)]
40    pub current_snapshot_id: Option<SnapshotId>,
41    #[serde(default)]
42    pub snapshots: Vec<IcebergSnapshot>,
43    #[serde(rename = "snapshot-log", default)]
44    pub snapshot_log: Vec<Value>,
45    #[serde(rename = "metadata-log", default)]
46    pub metadata_log: Vec<Value>,
47    #[serde(rename = "sort-orders", default)]
48    pub sort_orders: Vec<Value>,
49    #[serde(rename = "default-sort-order-id", default)]
50    pub default_sort_order_id: i32,
51    #[serde(rename = "refs", default)]
52    pub refs: HashMap<String, Value>,
53}
54
55#[derive(Debug, Serialize, Deserialize, Clone)]
56pub struct IcebergSnapshot {
57    #[serde(rename = "snapshot-id")]
58    pub snapshot_id: SnapshotId,
59    #[serde(rename = "parent-snapshot-id", skip_serializing_if = "Option::is_none")]
60    pub parent_snapshot_id: Option<SnapshotId>,
61    #[serde(rename = "sequence-number")]
62    pub sequence_number: i64,
63    #[serde(rename = "timestamp-ms")]
64    pub timestamp_ms: i64,
65    #[serde(rename = "manifest-list")]
66    pub manifest_list: String,
67    pub summary: HashMap<String, String>,
68    #[serde(rename = "schema-id")]
69    pub schema_id: Option<i32>,
70}
71
72impl IcebergMetadata {
73    /// Create a new metadata.json for a fresh AI-Lake table.
74    pub fn new(location: &str, policy: &VectorStoragePolicy) -> Self {
75        let mut properties = HashMap::new();
76        properties.insert("ailake.format-version".to_string(), "1".to_string());
77        properties.insert(
78            "ailake.vector-column".to_string(),
79            policy.column_name.clone(),
80        );
81        properties.insert("ailake.vector-dim".to_string(), policy.dim.to_string());
82        properties.insert(
83            "ailake.vector-metric".to_string(),
84            format!("{:?}", policy.metric).to_lowercase(),
85        );
86        properties.insert(
87            "ailake.vector-precision".to_string(),
88            format!("{:?}", policy.precision).to_lowercase(),
89        );
90        if let Some(m) = policy.hnsw_m {
91            properties.insert("ailake.hnsw-m".to_string(), m.to_string());
92        }
93        if let Some(ef) = policy.hnsw_ef_construction {
94            properties.insert("ailake.hnsw-ef-construction".to_string(), ef.to_string());
95        }
96
97        let now_ms = now_ms();
98        IcebergMetadata {
99            format_version: 2,
100            table_uuid: Uuid::new_v4().to_string(),
101            location: location.to_string(),
102            last_sequence_number: 0,
103            last_updated_ms: now_ms,
104            last_column_id: 0,
105            schemas: vec![serde_json::json!({"schema-id": 0, "type": "struct", "fields": []})],
106            current_schema_id: 0,
107            partition_specs: vec![serde_json::json!({"spec-id": 0, "fields": []})],
108            default_spec_id: 0,
109            last_partition_id: 999,
110            properties,
111            current_snapshot_id: None,
112            snapshots: vec![],
113            snapshot_log: vec![],
114            metadata_log: vec![],
115            sort_orders: vec![serde_json::json!({"order-id": 0, "fields": []})],
116            default_sort_order_id: 0,
117            refs: HashMap::new(),
118        }
119    }
120
121    pub fn to_json(&self) -> AilakeResult<String> {
122        serde_json::to_string_pretty(self).map_err(AilakeError::Json)
123    }
124
125    pub fn from_json(s: &str) -> AilakeResult<Self> {
126        serde_json::from_str(s).map_err(AilakeError::Json)
127    }
128
129    pub fn to_table_metadata(&self) -> TableMetadata {
130        TableMetadata {
131            table_uuid: self.table_uuid.clone(),
132            format_version: self.format_version,
133            location: self.location.clone(),
134            properties: self.properties.clone(),
135            current_snapshot_id: self.current_snapshot_id,
136        }
137    }
138}
139
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Never panics:
/// - a system clock set before the epoch yields 0;
/// - a millisecond count too large for `i64` saturates at `i64::MAX` instead of wrapping.
fn now_ms() -> i64 {
    let elapsed = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
}
146
#[cfg(test)]
mod tests {
    use super::*;
    use ailake_core::{VectorMetric, VectorPrecision};

    /// A 4-dimensional cosine / f16 policy with every optional feature turned off.
    fn make_policy() -> VectorStoragePolicy {
        VectorStoragePolicy {
            column_name: String::from("embedding"),
            dim: 4,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: false,
            pre_normalize: false,
            hnsw_m: None,
            hnsw_ef_construction: None,
            rabitq: None,
        }
    }

    #[test]
    fn roundtrip_json() {
        let original = IcebergMetadata::new("s3://my-lake/my_table", &make_policy());
        let reparsed = original
            .to_json()
            .and_then(|text| IcebergMetadata::from_json(&text))
            .unwrap();
        assert_eq!(reparsed.format_version, 2);
        assert_eq!(
            reparsed
                .properties
                .get("ailake.vector-column")
                .map(String::as_str),
            Some("embedding")
        );
    }

    #[test]
    fn properties_contain_ailake_keys() {
        let meta = IcebergMetadata::new("file:///tmp/tbl", &make_policy());
        for key in ["ailake.format-version", "ailake.vector-dim"] {
            assert!(meta.properties.contains_key(key), "missing property {}", key);
        }
    }
}