1use crate::lancedb::LanceDbStore;
5use crate::storage::arrow_convert::build_timestamp_column_from_vid_map;
6use crate::storage::property_builder::PropertyColumnBuilder;
7use anyhow::{Result, anyhow};
8use arrow_array::builder::{FixedSizeBinaryBuilder, ListBuilder, StringBuilder};
9use arrow_array::{ArrayRef, BooleanArray, RecordBatch, UInt64Array};
10use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};
11use lance::dataset::Dataset;
12use lancedb::Table;
13use lancedb::index::Index as LanceDbIndex;
14use lancedb::index::scalar::BTreeIndexBuilder;
15use sha3::{Digest, Sha3_256};
16use std::sync::Arc;
17use uni_common::Properties;
18use uni_common::core::id::{UniId, Vid};
19use uni_common::core::schema::Schema;
20
/// Handle to the Lance / LanceDB dataset that stores all vertices of a
/// single label.
///
/// Construction is cheap and purely in-memory; storage is only touched by
/// the `open*` / `*_lancedb` methods on the `impl`.
pub struct VertexDataset {
    /// Dataset location, always `<base_uri>/vertices/<label>` (see `new`).
    uri: String,
    /// The vertex label this dataset serves.
    label: String,
    /// Numeric label id captured at construction; underscore-prefixed because
    /// it is not read anywhere in this impl — retained for parity with the
    /// constructor's signature.
    _label_id: u16,
}
26
impl VertexDataset {
    /// Creates a handle for the vertex dataset of `label` rooted at `base_uri`.
    ///
    /// The URI is fixed to `<base_uri>/vertices/<label>`. Nothing is opened
    /// or created on disk here.
    pub fn new(base_uri: &str, label: &str, label_id: u16) -> Self {
        let uri = format!("{}/vertices/{}", base_uri, label);
        Self {
            uri,
            label: label.to_string(),
            _label_id: label_id,
        }
    }

    /// Computes a deterministic, content-addressed id for a vertex.
    ///
    /// SHA3-256 over the label, the optional external id, and every property
    /// (sorted by key so the result is independent of map iteration order).
    /// NUL bytes separate fields and `key=value` pairs so adjacent values
    /// cannot collide by concatenation.
    ///
    /// NOTE(review): when `ext_id` is also present inside `properties` (as it
    /// is at the call site in `build_record_batch_with_timestamps`), it is
    /// hashed twice — once explicitly and once via the property loop. That is
    /// harmless for determinism, but it is part of the hash contract; confirm
    /// before changing either side.
    pub fn compute_vertex_uid(label: &str, ext_id: Option<&str>, properties: &Properties) -> UniId {
        let mut hasher = Sha3_256::new();

        hasher.update(label.as_bytes());
        hasher.update(b"\x00");
        if let Some(eid) = ext_id {
            hasher.update(eid.as_bytes());
        }
        // Separator is written even when ext_id is absent, so (label, None)
        // and (label, Some("")) hash identically — presumably intended; verify
        // if external-id uniqueness ever matters for empty strings.
        hasher.update(b"\x00");

        // Sort properties by key for a canonical, order-independent digest.
        let mut sorted_props: Vec<_> = properties.iter().collect();
        sorted_props.sort_by_key(|(k, _)| *k);
        for (key, value) in sorted_props {
            hasher.update(key.as_bytes());
            hasher.update(b"=");
            hasher.update(value.to_string().as_bytes());
            hasher.update(b"\x00");
        }

        let hash: [u8; 32] = hasher.finalize().into();
        UniId::from_bytes(hash)
    }

    /// Opens the latest version of the underlying Lance dataset.
    pub async fn open(&self) -> Result<Arc<Dataset>> {
        self.open_at(None).await
    }

    /// Opens the dataset, optionally checked out at a historical `version`
    /// (time travel). `None` opens the latest version.
    pub async fn open_at(&self, version: Option<u64>) -> Result<Arc<Dataset>> {
        let mut ds = Dataset::open(&self.uri).await?;
        if let Some(v) = version {
            ds = ds.checkout_version(v).await?;
        }
        Ok(Arc::new(ds))
    }

    /// Opens the latest dataset without wrapping it in an `Arc`, for callers
    /// that need owned (non-shared) access.
    pub async fn open_raw(&self) -> Result<Dataset> {
        let ds = Dataset::open(&self.uri).await?;
        Ok(ds)
    }

    /// Convenience wrapper over [`Self::build_record_batch_with_timestamps`]
    /// with no `created_at` / `updated_at` maps supplied.
    pub fn build_record_batch(
        &self,
        vertices: &[(Vid, Vec<String>, Properties)],
        deleted: &[bool],
        versions: &[u64],
        schema: &Schema,
    ) -> Result<RecordBatch> {
        self.build_record_batch_with_timestamps(vertices, deleted, versions, schema, None, None)
    }

    /// Builds an Arrow `RecordBatch` whose columns mirror
    /// [`Self::get_arrow_schema`] exactly, in this order:
    /// `_vid`, `_uid`, `_deleted`, `_version`, `ext_id`, `_labels`,
    /// `_created_at`, `_updated_at`, one column per declared property
    /// (sorted by name), then `overflow_json`.
    ///
    /// `deleted` and `versions` are parallel slices to `vertices`; a length
    /// mismatch is not checked here and will surface as an error from
    /// `RecordBatch::try_new` (column length validation). `created_at` /
    /// `updated_at` map vids to `i64` timestamps — presumably nanoseconds
    /// since epoch to match the schema's `Timestamp(Nanosecond, "UTC")`
    /// columns; confirm in `build_timestamp_column_from_vid_map`.
    pub fn build_record_batch_with_timestamps(
        &self,
        vertices: &[(Vid, Vec<String>, Properties)],
        deleted: &[bool],
        versions: &[u64],
        schema: &Schema,
        created_at: Option<&std::collections::HashMap<uni_common::core::id::Vid, i64>>,
        updated_at: Option<&std::collections::HashMap<uni_common::core::id::Vid, i64>>,
    ) -> Result<RecordBatch> {
        let arrow_schema = self.get_arrow_schema(schema)?;
        let mut columns: Vec<ArrayRef> = Vec::with_capacity(arrow_schema.fields().len());

        // _vid: raw u64 vertex ids.
        let vids: Vec<u64> = vertices.iter().map(|(v, _, _)| v.as_u64()).collect();
        columns.push(Arc::new(UInt64Array::from(vids)));

        // _uid: 32-byte content hash derived from label + ext_id + properties.
        let mut uid_builder = FixedSizeBinaryBuilder::new(32);
        for (_vid, _labels, props) in vertices.iter() {
            let ext_id = props.get("ext_id").and_then(|v| v.as_str());
            let uid = Self::compute_vertex_uid(&self.label, ext_id, props);
            uid_builder.append_value(uid.as_bytes())?;
        }
        columns.push(Arc::new(uid_builder.finish()));

        // _deleted and _version, taken verbatim from the parallel slices.
        columns.push(Arc::new(BooleanArray::from(deleted.to_vec())));
        columns.push(Arc::new(UInt64Array::from(versions.to_vec())));

        // ext_id: pulled out of the property map into its own nullable column.
        let mut ext_id_builder = StringBuilder::new();
        for (_vid, _labels, props) in vertices.iter() {
            if let Some(ext_id_val) = props.get("ext_id").and_then(|v| v.as_str()) {
                ext_id_builder.append_value(ext_id_val);
            } else {
                ext_id_builder.append_null();
            }
        }
        columns.push(Arc::new(ext_id_builder.finish()));

        // _labels: list<utf8> of all labels attached to each vertex.
        let mut labels_builder = ListBuilder::new(StringBuilder::new());
        for (_vid, labels, _props) in vertices.iter() {
            let values = labels_builder.values();
            for lbl in labels {
                values.append_value(lbl);
            }
            labels_builder.append(true);
        }
        columns.push(Arc::new(labels_builder.finish()));

        // _created_at / _updated_at, looked up per vid from the optional maps.
        let vids = vertices.iter().map(|(v, _, _)| *v);
        columns.push(build_timestamp_column_from_vid_map(
            vids.clone(),
            created_at,
        ));
        columns.push(build_timestamp_column_from_vid_map(vids, updated_at));

        // Declared property columns. NOTE(review): the builder must emit
        // columns in the same name-sorted order as get_arrow_schema — confirm
        // PropertyColumnBuilder sorts identically.
        let prop_columns = PropertyColumnBuilder::new(schema, &self.label, vertices.len())
            .with_deleted(deleted)
            .build(|i| &vertices[i].2)?;

        columns.extend(prop_columns);

        // overflow_json: catch-all for properties not declared in the schema.
        let overflow_column = self.build_overflow_json_column(vertices, schema)?;
        columns.push(overflow_column);

        RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
    }

    /// Builds the `overflow_json` column holding properties not declared in
    /// the schema for this label; `ext_id` is excluded because it already has
    /// a dedicated column.
    fn build_overflow_json_column(
        &self,
        vertices: &[(Vid, Vec<String>, Properties)],
        schema: &Schema,
    ) -> Result<ArrayRef> {
        crate::storage::property_builder::build_overflow_json_column(
            vertices.len(),
            &self.label,
            schema,
            |i| &vertices[i].2,
            &["ext_id"],
        )
    }

    /// Returns the Arrow schema for this label: fixed system columns, the
    /// label's declared properties sorted by name (so the ordering is
    /// deterministic and matches the batch builder), and a trailing
    /// `overflow_json` column for everything undeclared.
    pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
        let mut fields = vec![
            Field::new("_vid", arrow_schema::DataType::UInt64, false),
            // _uid is nullable in the schema even though the builder always
            // writes a value — presumably to tolerate legacy data; confirm.
            Field::new("_uid", arrow_schema::DataType::FixedSizeBinary(32), true),
            Field::new("_deleted", arrow_schema::DataType::Boolean, false),
            Field::new("_version", arrow_schema::DataType::UInt64, false),
            Field::new("ext_id", arrow_schema::DataType::Utf8, true),
            Field::new(
                "_labels",
                arrow_schema::DataType::List(Arc::new(Field::new(
                    "item",
                    arrow_schema::DataType::Utf8,
                    true,
                ))),
                true,
            ),
            Field::new(
                "_created_at",
                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                true,
            ),
            Field::new(
                "_updated_at",
                arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                true,
            ),
        ];

        if let Some(label_props) = schema.properties.get(&self.label) {
            let mut sorted_props: Vec<_> = label_props.iter().collect();
            sorted_props.sort_by_key(|(name, _)| *name);

            for (name, meta) in sorted_props {
                fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
            }
        }

        fields.push(Field::new(
            "overflow_json",
            arrow_schema::DataType::LargeBinary,
            true,
        ));

        Ok(Arc::new(ArrowSchema::new(fields)))
    }

    /// Opens this label's vertex table through the LanceDB store; errors if
    /// it does not exist.
    pub async fn open_lancedb(&self, store: &LanceDbStore) -> Result<Table> {
        store.open_vertex_table(&self.label).await
    }

    /// Opens this label's vertex table, creating it (empty, with the schema
    /// from [`Self::get_arrow_schema`]) if it does not exist yet.
    pub async fn open_or_create_lancedb(
        &self,
        store: &LanceDbStore,
        schema: &Schema,
    ) -> Result<Table> {
        let arrow_schema = self.get_arrow_schema(schema)?;
        store
            .open_or_create_vertex_table(&self.label, arrow_schema)
            .await
    }

    /// Writes `batch` to this label's table: appends when the table already
    /// exists, otherwise creates the table from the batch. `_schema` is
    /// currently unused (the batch carries its own Arrow schema).
    pub async fn write_batch_lancedb(
        &self,
        store: &LanceDbStore,
        batch: RecordBatch,
        _schema: &Schema,
    ) -> Result<Table> {
        let table_name = LanceDbStore::vertex_table_name(&self.label);

        if store.table_exists(&table_name).await? {
            let table = store.open_table(&table_name).await?;
            store.append_to_table(&table, vec![batch]).await?;
            Ok(table)
        } else {
            store.create_table(&table_name, vec![batch]).await
        }
    }

    /// Best-effort creation of the default BTree indexes on `_vid`, `_uid`,
    /// and `ext_id`. Columns that already have an index are skipped;
    /// individual creation failures are logged as warnings and never
    /// propagated, so this only errors if the index listing itself fails.
    pub async fn ensure_default_indexes_lancedb(&self, table: &Table) -> Result<()> {
        let indices = table
            .list_indices()
            .await
            .map_err(|e| anyhow!("Failed to list indices: {}", e))?;

        // A column counts as indexed if any existing index covers it.
        let has_index = |col: &str| {
            indices
                .iter()
                .any(|idx| idx.columns.contains(&col.to_string()))
        };

        for column in &["_vid", "_uid", "ext_id"] {
            if has_index(column) {
                continue;
            }
            log::info!(
                "Creating {} BTree index for label '{}' via LanceDB",
                column,
                self.label
            );
            if let Err(e) = table
                .create_index(&[column], LanceDbIndex::BTree(BTreeIndexBuilder::default()))
                .execute()
                .await
            {
                log::warn!(
                    "Failed to create {} index for '{}' via LanceDB: {}",
                    column,
                    self.label,
                    e
                );
            }
        }

        Ok(())
    }

    /// Returns the LanceDB table name for this label (delegates to the
    /// store's naming convention).
    pub fn lancedb_table_name(&self) -> String {
        LanceDbStore::vertex_table_name(&self.label)
    }

    /// Atomically replaces the table's entire contents with `batch`, using
    /// the schema derived from [`Self::get_arrow_schema`].
    pub async fn replace_lancedb(
        &self,
        store: &LanceDbStore,
        batch: RecordBatch,
        schema: &Schema,
    ) -> Result<Table> {
        let table_name = self.lancedb_table_name();
        let arrow_schema = self.get_arrow_schema(schema)?;
        store
            .replace_table_atomic(&table_name, vec![batch], arrow_schema)
            .await
    }
}
343}