1use crate::lancedb::LanceDbStore;
5use crate::storage::arrow_convert::build_timestamp_column_from_vid_map;
6use crate::storage::property_builder::PropertyColumnBuilder;
7use anyhow::{Result, anyhow};
8use arrow_array::builder::{FixedSizeBinaryBuilder, ListBuilder, StringBuilder};
9use arrow_array::{ArrayRef, BooleanArray, RecordBatch, UInt64Array};
10use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};
11use lance::dataset::Dataset;
12use lancedb::Table;
13use lancedb::index::Index as LanceDbIndex;
14use lancedb::index::scalar::BTreeIndexBuilder;
15use sha3::{Digest, Sha3_256};
16use std::collections::HashMap;
17use std::sync::Arc;
18use uni_common::Properties;
19use uni_common::core::id::{UniId, Vid};
20use uni_common::core::schema::Schema;
21
/// Handle to the Lance dataset that stores vertices of a single label.
///
/// Construction is cheap (no I/O); the dataset is opened lazily via the
/// `open*` / `*_lancedb` methods.
pub struct VertexDataset {
    // URI of the backing dataset: `<base_uri>/vertices_<label>` (see `new`).
    uri: String,
    // The vertex label this dataset stores.
    label: String,
    // Numeric label id; currently unused by any method in this impl —
    // NOTE(review): presumably kept for forward compatibility, confirm
    // before removing.
    _label_id: u16,
}
27
28impl VertexDataset {
29 pub fn new(base_uri: &str, label: &str, label_id: u16) -> Self {
30 let uri = format!("{}/vertices_{}", base_uri, label);
31 Self {
32 uri,
33 label: label.to_string(),
34 _label_id: label_id,
35 }
36 }
37
38 pub fn compute_vertex_uid(label: &str, ext_id: Option<&str>, properties: &Properties) -> UniId {
41 let mut hasher = Sha3_256::new();
42
43 hasher.update(label.as_bytes());
45 hasher.update(b"\x00"); if let Some(eid) = ext_id {
49 hasher.update(eid.as_bytes());
50 }
51 hasher.update(b"\x00");
52
53 let mut sorted_props: Vec<_> = properties.iter().collect();
55 sorted_props.sort_by_key(|(k, _)| *k);
56 for (key, value) in sorted_props {
57 hasher.update(key.as_bytes());
58 hasher.update(b"=");
59 hasher.update(value.to_string().as_bytes());
60 hasher.update(b"\x00");
61 }
62
63 let hash: [u8; 32] = hasher.finalize().into();
64 UniId::from_bytes(hash)
65 }
66
    /// Opens the latest version of the underlying Lance dataset.
    ///
    /// Convenience wrapper around [`Self::open_at`] with no pinned version.
    pub async fn open(&self) -> Result<Arc<Dataset>> {
        self.open_at(None).await
    }
70
71 pub async fn open_at(&self, version: Option<u64>) -> Result<Arc<Dataset>> {
72 let mut ds = Dataset::open(&self.uri).await?;
73 if let Some(v) = version {
74 ds = ds.checkout_version(v).await?;
75 }
76 Ok(Arc::new(ds))
77 }
78
79 pub async fn open_raw(&self) -> Result<Dataset> {
80 let ds = Dataset::open(&self.uri).await?;
81 Ok(ds)
82 }
83
    /// Builds an Arrow `RecordBatch` for `vertices` with no explicit
    /// created/updated timestamps (both timestamp maps are passed as
    /// `None`).
    ///
    /// Thin wrapper over [`Self::build_record_batch_with_timestamps`]; see
    /// that method for the column layout and argument contracts.
    pub fn build_record_batch(
        &self,
        vertices: &[(Vid, Vec<String>, Properties)],
        deleted: &[bool],
        versions: &[u64],
        schema: &Schema,
    ) -> Result<RecordBatch> {
        self.build_record_batch_with_timestamps(vertices, deleted, versions, schema, None, None)
    }
96
97 pub fn build_record_batch_with_timestamps(
107 &self,
108 vertices: &[(Vid, Vec<String>, Properties)],
109 deleted: &[bool],
110 versions: &[u64],
111 schema: &Schema,
112 created_at: Option<&HashMap<Vid, i64>>,
113 updated_at: Option<&HashMap<Vid, i64>>,
114 ) -> Result<RecordBatch> {
115 let arrow_schema = self.get_arrow_schema(schema)?;
116 let mut columns: Vec<ArrayRef> = Vec::with_capacity(arrow_schema.fields().len());
117
118 let vids: Vec<u64> = vertices.iter().map(|(v, _, _)| v.as_u64()).collect();
119 columns.push(Arc::new(UInt64Array::from(vids)));
120
121 let mut uid_builder = FixedSizeBinaryBuilder::new(32);
122 for (_vid, _labels, props) in vertices.iter() {
123 let ext_id = props.get("ext_id").and_then(|v| v.as_str());
124 let uid = Self::compute_vertex_uid(&self.label, ext_id, props);
125 uid_builder.append_value(uid.as_bytes())?;
126 }
127 columns.push(Arc::new(uid_builder.finish()));
128
129 columns.push(Arc::new(BooleanArray::from(deleted.to_vec())));
130 columns.push(Arc::new(UInt64Array::from(versions.to_vec())));
131
132 let mut ext_id_builder = StringBuilder::new();
134 for (_vid, _labels, props) in vertices.iter() {
135 if let Some(ext_id_val) = props.get("ext_id").and_then(|v| v.as_str()) {
136 ext_id_builder.append_value(ext_id_val);
137 } else {
138 ext_id_builder.append_null();
139 }
140 }
141 columns.push(Arc::new(ext_id_builder.finish()));
142
143 let mut labels_builder = ListBuilder::new(StringBuilder::new());
145 for (_vid, labels, _props) in vertices.iter() {
146 let values = labels_builder.values();
147 for lbl in labels {
148 values.append_value(lbl);
149 }
150 labels_builder.append(true);
151 }
152 columns.push(Arc::new(labels_builder.finish()));
153
154 let vids = vertices.iter().map(|(v, _, _)| *v);
156 columns.push(build_timestamp_column_from_vid_map(
157 vids.clone(),
158 created_at,
159 ));
160 columns.push(build_timestamp_column_from_vid_map(vids, updated_at));
161
162 let prop_columns = PropertyColumnBuilder::new(schema, &self.label, vertices.len())
164 .with_deleted(deleted)
165 .build(|i| &vertices[i].2)?;
166
167 columns.extend(prop_columns);
168
169 let overflow_column = self.build_overflow_json_column(vertices, schema)?;
171 columns.push(overflow_column);
172
173 RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
174 }
175
    /// Builds the `overflow_json` column: for each vertex, properties not
    /// declared for `self.label` in `schema` are serialized by the shared
    /// `property_builder::build_overflow_json_column` helper.
    ///
    /// `ext_id` is excluded because it already has a dedicated column.
    fn build_overflow_json_column(
        &self,
        vertices: &[(Vid, Vec<String>, Properties)],
        schema: &Schema,
    ) -> Result<ArrayRef> {
        crate::storage::property_builder::build_overflow_json_column(
            vertices.len(),
            &self.label,
            schema,
            |i| &vertices[i].2,
            &["ext_id"],
        )
    }
190
191 pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
192 let mut fields = vec![
193 Field::new("_vid", arrow_schema::DataType::UInt64, false),
194 Field::new("_uid", arrow_schema::DataType::FixedSizeBinary(32), true),
195 Field::new("_deleted", arrow_schema::DataType::Boolean, false),
196 Field::new("_version", arrow_schema::DataType::UInt64, false),
197 Field::new("ext_id", arrow_schema::DataType::Utf8, true),
199 Field::new(
200 "_labels",
201 arrow_schema::DataType::List(Arc::new(Field::new(
202 "item",
203 arrow_schema::DataType::Utf8,
204 true,
205 ))),
206 true,
207 ),
208 Field::new(
209 "_created_at",
210 arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
211 true,
212 ),
213 Field::new(
214 "_updated_at",
215 arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
216 true,
217 ),
218 ];
219
220 if let Some(label_props) = schema.properties.get(&self.label) {
221 let mut sorted_props: Vec<_> = label_props.iter().collect();
222 sorted_props.sort_by_key(|(name, _)| *name);
223
224 for (name, meta) in sorted_props {
225 fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
226 }
227 }
228
229 fields.push(Field::new(
231 "overflow_json",
232 arrow_schema::DataType::LargeBinary,
233 true,
234 ));
235
236 Ok(Arc::new(ArrowSchema::new(fields)))
237 }
238
    /// Opens this label's vertex table through the LanceDB `store`.
    ///
    /// Errors if the table does not exist; use
    /// [`Self::open_or_create_lancedb`] to create it on demand.
    pub async fn open_lancedb(&self, store: &LanceDbStore) -> Result<Table> {
        store.open_vertex_table(&self.label).await
    }
249
250 pub async fn open_or_create_lancedb(
252 &self,
253 store: &LanceDbStore,
254 schema: &Schema,
255 ) -> Result<Table> {
256 let arrow_schema = self.get_arrow_schema(schema)?;
257 store
258 .open_or_create_vertex_table(&self.label, arrow_schema)
259 .await
260 }
261
262 pub async fn write_batch_lancedb(
266 &self,
267 store: &LanceDbStore,
268 batch: RecordBatch,
269 _schema: &Schema,
270 ) -> Result<Table> {
271 let table_name = LanceDbStore::vertex_table_name(&self.label);
272
273 if store.table_exists(&table_name).await? {
274 let table = store.open_table(&table_name).await?;
275 store.append_to_table(&table, vec![batch]).await?;
276 Ok(table)
277 } else {
278 store.create_table(&table_name, vec![batch]).await
279 }
280 }
281
282 pub async fn ensure_default_indexes_lancedb(&self, table: &Table) -> Result<()> {
286 let indices = table
287 .list_indices()
288 .await
289 .map_err(|e| anyhow!("Failed to list indices: {}", e))?;
290
291 let has_index = |col: &str| {
292 indices
293 .iter()
294 .any(|idx| idx.columns.contains(&col.to_string()))
295 };
296
297 for column in &["_vid", "_uid", "ext_id"] {
298 if has_index(column) {
299 continue;
300 }
301 log::info!(
302 "Creating {} BTree index for label '{}' via LanceDB",
303 column,
304 self.label
305 );
306 if let Err(e) = table
307 .create_index(&[column], LanceDbIndex::BTree(BTreeIndexBuilder::default()))
308 .execute()
309 .await
310 {
311 log::warn!(
312 "Failed to create {} index for '{}' via LanceDB: {}",
313 column,
314 self.label,
315 e
316 );
317 }
318 }
319
320 Ok(())
321 }
322
    /// Returns the LanceDB table name used for this label's vertices, as
    /// produced by `LanceDbStore::vertex_table_name`.
    pub fn lancedb_table_name(&self) -> String {
        LanceDbStore::vertex_table_name(&self.label)
    }
327
328 pub async fn replace_lancedb(
333 &self,
334 store: &LanceDbStore,
335 batch: RecordBatch,
336 schema: &Schema,
337 ) -> Result<Table> {
338 let table_name = self.lancedb_table_name();
339 let arrow_schema = self.get_arrow_schema(schema)?;
340 store
341 .replace_table_atomic(&table_name, vec![batch], arrow_schema)
342 .await
343 }
344}