1use std::collections::HashMap;
6
7use ailake_core::{AilakeError, AilakeResult, VectorStoragePolicy};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use uuid::Uuid;
11
12use crate::provider::{SnapshotId, TableMetadata};
13
14#[derive(Debug, Serialize, Deserialize)]
15pub struct IcebergMetadata {
16 #[serde(rename = "format-version")]
17 pub format_version: i32,
18 #[serde(rename = "table-uuid")]
19 pub table_uuid: String,
20 pub location: String,
21 #[serde(rename = "last-sequence-number", default)]
22 pub last_sequence_number: i64,
23 #[serde(rename = "last-updated-ms")]
24 pub last_updated_ms: i64,
25 #[serde(rename = "last-column-id", default)]
26 pub last_column_id: i32,
27 #[serde(default)]
28 pub schemas: Vec<Value>,
29 #[serde(rename = "current-schema-id", default)]
30 pub current_schema_id: i32,
31 #[serde(rename = "partition-specs", default)]
32 pub partition_specs: Vec<Value>,
33 #[serde(rename = "default-spec-id", default)]
34 pub default_spec_id: i32,
35 #[serde(rename = "last-partition-id", default)]
36 pub last_partition_id: i32,
37 #[serde(default)]
38 pub properties: HashMap<String, String>,
39 #[serde(rename = "current-snapshot-id", default)]
40 pub current_snapshot_id: Option<SnapshotId>,
41 #[serde(default)]
42 pub snapshots: Vec<IcebergSnapshot>,
43 #[serde(rename = "snapshot-log", default)]
44 pub snapshot_log: Vec<Value>,
45 #[serde(rename = "metadata-log", default)]
46 pub metadata_log: Vec<Value>,
47 #[serde(rename = "sort-orders", default)]
48 pub sort_orders: Vec<Value>,
49 #[serde(rename = "default-sort-order-id", default)]
50 pub default_sort_order_id: i32,
51 #[serde(rename = "refs", default)]
52 pub refs: HashMap<String, Value>,
53}
54
55#[derive(Debug, Serialize, Deserialize, Clone)]
56pub struct IcebergSnapshot {
57 #[serde(rename = "snapshot-id")]
58 pub snapshot_id: SnapshotId,
59 #[serde(rename = "parent-snapshot-id", skip_serializing_if = "Option::is_none")]
60 pub parent_snapshot_id: Option<SnapshotId>,
61 #[serde(rename = "sequence-number")]
62 pub sequence_number: i64,
63 #[serde(rename = "timestamp-ms")]
64 pub timestamp_ms: i64,
65 #[serde(rename = "manifest-list")]
66 pub manifest_list: String,
67 pub summary: HashMap<String, String>,
68 #[serde(rename = "schema-id")]
69 pub schema_id: Option<i32>,
70}
71
72impl IcebergMetadata {
73 pub fn new(location: &str, policy: &VectorStoragePolicy) -> Self {
75 let mut properties = HashMap::new();
76 properties.insert("ailake.format-version".to_string(), "1".to_string());
77 properties.insert(
78 "ailake.vector-column".to_string(),
79 policy.column_name.clone(),
80 );
81 properties.insert("ailake.vector-dim".to_string(), policy.dim.to_string());
82 properties.insert(
83 "ailake.vector-metric".to_string(),
84 format!("{:?}", policy.metric).to_lowercase(),
85 );
86 properties.insert(
87 "ailake.vector-precision".to_string(),
88 format!("{:?}", policy.precision).to_lowercase(),
89 );
90 if let Some(m) = policy.hnsw_m {
91 properties.insert("ailake.hnsw-m".to_string(), m.to_string());
92 }
93 if let Some(ef) = policy.hnsw_ef_construction {
94 properties.insert("ailake.hnsw-ef-construction".to_string(), ef.to_string());
95 }
96
97 let now_ms = now_ms();
98 IcebergMetadata {
99 format_version: 2,
100 table_uuid: Uuid::new_v4().to_string(),
101 location: location.to_string(),
102 last_sequence_number: 0,
103 last_updated_ms: now_ms,
104 last_column_id: 0,
105 schemas: vec![serde_json::json!({"schema-id": 0, "type": "struct", "fields": []})],
106 current_schema_id: 0,
107 partition_specs: vec![serde_json::json!({"spec-id": 0, "fields": []})],
108 default_spec_id: 0,
109 last_partition_id: 999,
110 properties,
111 current_snapshot_id: None,
112 snapshots: vec![],
113 snapshot_log: vec![],
114 metadata_log: vec![],
115 sort_orders: vec![serde_json::json!({"order-id": 0, "fields": []})],
116 default_sort_order_id: 0,
117 refs: HashMap::new(),
118 }
119 }
120
121 pub fn to_json(&self) -> AilakeResult<String> {
122 serde_json::to_string_pretty(self).map_err(AilakeError::Json)
123 }
124
125 pub fn from_json(s: &str) -> AilakeResult<Self> {
126 serde_json::from_str(s).map_err(AilakeError::Json)
127 }
128
129 pub fn to_table_metadata(&self) -> TableMetadata {
130 TableMetadata {
131 table_uuid: self.table_uuid.clone(),
132 format_version: self.format_version,
133 location: self.location.clone(),
134 properties: self.properties.clone(),
135 current_snapshot_id: self.current_snapshot_id,
136 }
137 }
138}
139
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields a negative value instead of
/// panicking. A millisecond count that does not fit in `i64` saturates to
/// `i64::MAX`/`i64::MIN` rather than being silently truncated by an `as` cast.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX),
        // `duration()` is how far *before* the epoch the clock is; negate it.
        // `ms` is non-negative here, so `-ms` cannot overflow.
        Err(before_epoch) => {
            i64::try_from(before_epoch.duration().as_millis()).map_or(i64::MIN, |ms| -ms)
        }
    }
}
146
#[cfg(test)]
mod tests {
    use super::*;
    use ailake_core::{VectorMetric, VectorPrecision};

    /// Baseline policy with every optional knob (PQ, HNSW, RaBitQ) unset.
    fn make_policy() -> VectorStoragePolicy {
        VectorStoragePolicy {
            column_name: "embedding".to_string(),
            dim: 4,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: false,
            pre_normalize: false,
            hnsw_m: None,
            hnsw_ef_construction: None,
            rabitq: None,
        }
    }

    #[test]
    fn roundtrip_json() {
        let meta = IcebergMetadata::new("s3://my-lake/my_table", &make_policy());
        let json = meta.to_json().unwrap();
        let meta2 = IcebergMetadata::from_json(&json).unwrap();
        assert_eq!(meta2.format_version, 2);
        assert_eq!(meta2.table_uuid, meta.table_uuid);
        assert_eq!(meta2.location, "s3://my-lake/my_table");
        assert_eq!(meta2.last_partition_id, 999);
        assert!(meta2.current_snapshot_id.is_none());
        assert!(meta2.snapshots.is_empty());
        assert_eq!(meta2.properties, meta.properties);
        assert_eq!(
            meta2.properties.get("ailake.vector-column"),
            Some(&"embedding".to_string())
        );
    }

    #[test]
    fn properties_contain_ailake_keys() {
        let meta = IcebergMetadata::new("file:///tmp/tbl", &make_policy());
        assert!(meta.properties.contains_key("ailake.format-version"));
        assert_eq!(
            meta.properties.get("ailake.vector-dim").map(String::as_str),
            Some("4")
        );
        assert!(meta.properties.contains_key("ailake.vector-metric"));
        assert!(meta.properties.contains_key("ailake.vector-precision"));
        // HNSW knobs are only recorded when the policy sets them.
        assert!(!meta.properties.contains_key("ailake.hnsw-m"));
        assert!(!meta.properties.contains_key("ailake.hnsw-ef-construction"));
    }

    #[test]
    fn hnsw_parameters_are_recorded_when_set() {
        let policy = VectorStoragePolicy {
            hnsw_m: Some(16),
            hnsw_ef_construction: Some(200),
            ..make_policy()
        };
        let meta = IcebergMetadata::new("file:///tmp/tbl", &policy);
        assert_eq!(
            meta.properties.get("ailake.hnsw-m").map(String::as_str),
            Some("16")
        );
        assert_eq!(
            meta.properties
                .get("ailake.hnsw-ef-construction")
                .map(String::as_str),
            Some("200")
        );
    }

    #[test]
    fn each_table_gets_a_fresh_uuid() {
        let a = IcebergMetadata::new("file:///tmp/a", &make_policy());
        let b = IcebergMetadata::new("file:///tmp/b", &make_policy());
        assert_ne!(a.table_uuid, b.table_uuid);
    }
}