1use crate::backend::StorageBackend;
5use crate::backend::table_names;
6use crate::backend::types::{ScalarIndexType, WriteMode};
7use crate::storage::arrow_convert::build_timestamp_column_from_vid_map;
8use crate::storage::property_builder::PropertyColumnBuilder;
9use anyhow::{Result, anyhow};
10use arrow_array::builder::{FixedSizeBinaryBuilder, ListBuilder, StringBuilder};
11use arrow_array::{ArrayRef, BooleanArray, RecordBatch, UInt64Array};
12use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};
13#[cfg(feature = "lance-backend")]
14use lance::dataset::Dataset;
15use sha3::{Digest, Sha3_256};
16use std::collections::HashMap;
17use std::sync::Arc;
18use uni_common::Properties;
19use uni_common::core::id::{UniId, Vid};
20use uni_common::core::schema::Schema;
21
/// Handle for one per-label vertex dataset (one backend table per label).
///
/// Construction is cheap and purely in-memory; nothing is opened or created
/// until one of the async backend methods is called.
pub struct VertexDataset {
    // Dataset location, "<base_uri>/vertices_<label>". Only the
    // lance-backend code paths read it, hence the conditional allow.
    #[cfg_attr(not(feature = "lance-backend"), allow(dead_code))]
    uri: String,
    // Vertex label whose rows this dataset stores.
    label: String,
    // Numeric id of the label; currently unused (underscore-prefixed).
    _label_id: u16,
}
28
29impl VertexDataset {
30 pub fn new(base_uri: &str, label: &str, label_id: u16) -> Self {
31 let uri = format!("{}/vertices_{}", base_uri, label);
32 Self {
33 uri,
34 label: label.to_string(),
35 _label_id: label_id,
36 }
37 }
38
39 pub fn compute_vertex_uid(label: &str, ext_id: Option<&str>, properties: &Properties) -> UniId {
42 let mut hasher = Sha3_256::new();
43
44 hasher.update(label.as_bytes());
46 hasher.update(b"\x00"); if let Some(eid) = ext_id {
50 hasher.update(eid.as_bytes());
51 }
52 hasher.update(b"\x00");
53
54 let mut sorted_props: Vec<_> = properties.iter().collect();
56 sorted_props.sort_by_key(|(k, _)| *k);
57 for (key, value) in sorted_props {
58 hasher.update(key.as_bytes());
59 hasher.update(b"=");
60 hasher.update(value.to_string().as_bytes());
61 hasher.update(b"\x00");
62 }
63
64 let hash: [u8; 32] = hasher.finalize().into();
65 UniId::from_bytes(hash)
66 }
67
    /// Open the dataset at its latest version (convenience wrapper over
    /// [`Self::open_at`] with no version pin).
    #[cfg(feature = "lance-backend")]
    pub async fn open(&self) -> Result<Arc<Dataset>> {
        self.open_at(None).await
    }
72
73 #[cfg(feature = "lance-backend")]
74 pub async fn open_at(&self, version: Option<u64>) -> Result<Arc<Dataset>> {
75 let mut ds = Dataset::open(&self.uri).await?;
76 if let Some(v) = version {
77 ds = ds.checkout_version(v).await?;
78 }
79 Ok(Arc::new(ds))
80 }
81
82 #[cfg(feature = "lance-backend")]
83 pub async fn open_raw(&self) -> Result<Dataset> {
84 let ds = Dataset::open(&self.uri).await?;
85 Ok(ds)
86 }
87
    /// Build a [`RecordBatch`] for `vertices` without supplying any
    /// per-vertex timestamp maps.
    ///
    /// Thin forwarder to [`Self::build_record_batch_with_timestamps`] with
    /// `None` for both `created_at` and `updated_at`.
    pub fn build_record_batch(
        &self,
        vertices: &[(Vid, Vec<String>, Properties)],
        deleted: &[bool],
        versions: &[u64],
        schema: &Schema,
    ) -> Result<RecordBatch> {
        self.build_record_batch_with_timestamps(vertices, deleted, versions, schema, None, None)
    }
100
101 pub fn build_record_batch_with_timestamps(
111 &self,
112 vertices: &[(Vid, Vec<String>, Properties)],
113 deleted: &[bool],
114 versions: &[u64],
115 schema: &Schema,
116 created_at: Option<&HashMap<Vid, i64>>,
117 updated_at: Option<&HashMap<Vid, i64>>,
118 ) -> Result<RecordBatch> {
119 let arrow_schema = self.get_arrow_schema(schema)?;
120 let mut columns: Vec<ArrayRef> = Vec::with_capacity(arrow_schema.fields().len());
121
122 let vids: Vec<u64> = vertices.iter().map(|(v, _, _)| v.as_u64()).collect();
123 columns.push(Arc::new(UInt64Array::from(vids)));
124
125 let mut uid_builder = FixedSizeBinaryBuilder::new(32);
126 for (_vid, _labels, props) in vertices.iter() {
127 let ext_id = props.get("ext_id").and_then(|v| v.as_str());
128 let uid = Self::compute_vertex_uid(&self.label, ext_id, props);
129 uid_builder.append_value(uid.as_bytes())?;
130 }
131 columns.push(Arc::new(uid_builder.finish()));
132
133 columns.push(Arc::new(BooleanArray::from(deleted.to_vec())));
134 columns.push(Arc::new(UInt64Array::from(versions.to_vec())));
135
136 let mut ext_id_builder = StringBuilder::new();
138 for (_vid, _labels, props) in vertices.iter() {
139 if let Some(ext_id_val) = props.get("ext_id").and_then(|v| v.as_str()) {
140 ext_id_builder.append_value(ext_id_val);
141 } else {
142 ext_id_builder.append_null();
143 }
144 }
145 columns.push(Arc::new(ext_id_builder.finish()));
146
147 let mut labels_builder = ListBuilder::new(StringBuilder::new());
149 for (_vid, labels, _props) in vertices.iter() {
150 let values = labels_builder.values();
151 for lbl in labels {
152 values.append_value(lbl);
153 }
154 labels_builder.append(true);
155 }
156 columns.push(Arc::new(labels_builder.finish()));
157
158 let vids = vertices.iter().map(|(v, _, _)| *v);
160 columns.push(build_timestamp_column_from_vid_map(
161 vids.clone(),
162 created_at,
163 ));
164 columns.push(build_timestamp_column_from_vid_map(vids, updated_at));
165
166 let prop_columns = PropertyColumnBuilder::new(schema, &self.label, vertices.len())
168 .with_deleted(deleted)
169 .build(|i| &vertices[i].2)?;
170
171 columns.extend(prop_columns);
172
173 let overflow_column = self.build_overflow_json_column(vertices, schema)?;
175 columns.push(overflow_column);
176
177 RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
178 }
179
    /// Build the `overflow_json` column for `vertices` via the shared
    /// property-builder helper.
    ///
    /// `"ext_id"` is passed in the final slice argument — presumably an
    /// exclusion list so the external id is not duplicated into the overflow
    /// blob, since it already has a dedicated column (verify against the
    /// helper's definition).
    fn build_overflow_json_column(
        &self,
        vertices: &[(Vid, Vec<String>, Properties)],
        schema: &Schema,
    ) -> Result<ArrayRef> {
        crate::storage::property_builder::build_overflow_json_column(
            vertices.len(),
            &self.label,
            schema,
            |i| &vertices[i].2,
            &["ext_id"],
        )
    }
194
195 pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
196 let mut fields = vec![
197 Field::new("_vid", arrow_schema::DataType::UInt64, false),
198 Field::new("_uid", arrow_schema::DataType::FixedSizeBinary(32), true),
199 Field::new("_deleted", arrow_schema::DataType::Boolean, false),
200 Field::new("_version", arrow_schema::DataType::UInt64, false),
201 Field::new("ext_id", arrow_schema::DataType::Utf8, true),
203 Field::new(
204 "_labels",
205 arrow_schema::DataType::List(Arc::new(Field::new(
206 "item",
207 arrow_schema::DataType::Utf8,
208 true,
209 ))),
210 true,
211 ),
212 Field::new(
213 "_created_at",
214 arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
215 true,
216 ),
217 Field::new(
218 "_updated_at",
219 arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
220 true,
221 ),
222 ];
223
224 if let Some(label_props) = schema.properties.get(&self.label) {
225 let mut sorted_props: Vec<_> = label_props.iter().collect();
226 sorted_props.sort_by_key(|(name, _)| *name);
227
228 for (name, meta) in sorted_props {
229 fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
230 }
231 }
232
233 fields.push(Field::new(
235 "overflow_json",
236 arrow_schema::DataType::LargeBinary,
237 true,
238 ));
239
240 Ok(Arc::new(ArrowSchema::new(fields)))
241 }
242
243 pub async fn open_or_create(
249 &self,
250 backend: &dyn StorageBackend,
251 schema: &Schema,
252 ) -> Result<()> {
253 let table_name = table_names::vertex_table_name(&self.label);
254 let arrow_schema = self.get_arrow_schema(schema)?;
255 backend
256 .open_or_create_table(&table_name, arrow_schema)
257 .await
258 }
259
260 pub async fn write_batch(
264 &self,
265 backend: &dyn StorageBackend,
266 batch: RecordBatch,
267 _schema: &Schema,
268 ) -> Result<()> {
269 let table_name = table_names::vertex_table_name(&self.label);
270 if backend.table_exists(&table_name).await? {
271 backend
272 .write(&table_name, vec![batch], WriteMode::Append)
273 .await
274 } else {
275 backend.create_table(&table_name, vec![batch]).await
276 }
277 }
278
279 pub async fn ensure_default_indexes(&self, backend: &dyn StorageBackend) -> Result<()> {
281 let table_name = table_names::vertex_table_name(&self.label);
282 let indices = backend.list_indexes(&table_name).await?;
283
284 let has_index = |col: &str| {
285 indices
286 .iter()
287 .any(|idx| idx.columns.contains(&col.to_string()))
288 };
289
290 for column in &["_vid", "_uid", "ext_id"] {
291 if has_index(column) {
292 continue;
293 }
294 log::info!("Creating {} BTree index for label '{}'", column, self.label);
295 if let Err(e) = backend
296 .create_scalar_index(&table_name, column, ScalarIndexType::BTree)
297 .await
298 {
299 log::warn!(
300 "Failed to create {} index for '{}': {}",
301 column,
302 self.label,
303 e
304 );
305 }
306 }
307
308 Ok(())
309 }
310
    /// Backend table name for this label's vertices.
    pub fn table_name(&self) -> String {
        table_names::vertex_table_name(&self.label)
    }
315
316 pub async fn replace(
320 &self,
321 backend: &dyn StorageBackend,
322 batch: RecordBatch,
323 schema: &Schema,
324 ) -> Result<()> {
325 let table_name = self.table_name();
326 let arrow_schema = self.get_arrow_schema(schema)?;
327 backend
328 .replace_table_atomic(&table_name, vec![batch], arrow_schema)
329 .await
330 }
331}