1use crate::backend::StorageBackend;
19use crate::backend::table_names;
20use crate::backend::types::{ScalarIndexType, ScanRequest};
21use crate::storage::arrow_convert::build_timestamp_column_from_eid_map;
22use anyhow::{Result, anyhow};
23use arrow_array::builder::{LargeBinaryBuilder, StringBuilder};
24use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, UInt64Array};
25use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
26use sha3::{Digest, Sha3_256};
27use std::collections::HashMap;
28use std::sync::Arc;
29use uni_common::Properties;
30use uni_common::core::id::{Eid, UniId, Vid};
31
/// Handle for the main edge table.
///
/// Every operation in the accompanying `impl` is an associated function that
/// receives its storage backend as an argument, so the only state kept here
/// is the base URI passed to [`MainEdgeDataset::new`].
#[derive(Debug)]
pub struct MainEdgeDataset {
    /// Base URI supplied at construction; not read by any code visible in this file.
    _base_uri: String,
}
41
42impl MainEdgeDataset {
43 pub fn new(base_uri: &str) -> Self {
45 Self {
46 _base_uri: base_uri.to_string(),
47 }
48 }
49
50 pub fn compute_edge_uid(
63 src_uid: &UniId,
64 dst_uid: &UniId,
65 edge_type: &str,
66 props: &Properties,
67 ) -> UniId {
68 let mut hasher = Sha3_256::new();
69
70 hasher.update(b"src:");
73 hasher.update(src_uid.as_bytes());
74 hasher.update(b"\0");
75 hasher.update(b"dst:");
76 hasher.update(dst_uid.as_bytes());
77 hasher.update(b"\0");
78
79 hasher.update(b"type:");
81 hasher.update(edge_type.as_bytes());
82 hasher.update(b"\0");
83
84 let mut sorted_keys: Vec<_> = props.keys().collect();
86 sorted_keys.sort();
87 for key in sorted_keys {
88 if let Some(val) = props.get(key) {
89 hasher.update(key.as_bytes());
90 hasher.update(b":");
91 hasher.update(val.to_string().as_bytes());
92 hasher.update(b"\0");
93 }
94 }
95
96 let result = hasher.finalize();
97 UniId::from_bytes(result.into())
98 }
99
100 pub fn get_arrow_schema() -> Arc<ArrowSchema> {
102 Arc::new(ArrowSchema::new(vec![
103 Field::new("_eid", DataType::UInt64, false),
104 Field::new("src_vid", DataType::UInt64, false),
105 Field::new("dst_vid", DataType::UInt64, false),
106 Field::new("type", DataType::Utf8, false),
107 Field::new("props_json", DataType::LargeBinary, true),
108 Field::new("_deleted", DataType::Boolean, false),
109 Field::new("_version", DataType::UInt64, false),
110 Field::new(
111 "_created_at",
112 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
113 true,
114 ),
115 Field::new(
116 "_updated_at",
117 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
118 true,
119 ),
120 ]))
121 }
122
/// Returns the literal table name `"edges"`.
///
/// NOTE(review): the read and write paths in this file resolve their table
/// through `table_names::main_edge_table_name()` rather than this function;
/// confirm the two are meant to agree.
pub fn table_name() -> &'static str {
    const NAME: &str = "edges";
    NAME
}
127
128 pub fn build_record_batch(
135 edges: &[(Eid, Vid, Vid, String, Properties, bool, u64)],
136 created_at: Option<&HashMap<Eid, i64>>,
137 updated_at: Option<&HashMap<Eid, i64>>,
138 ) -> Result<RecordBatch> {
139 let arrow_schema = Self::get_arrow_schema();
140 let mut columns: Vec<ArrayRef> = Vec::with_capacity(arrow_schema.fields().len());
141
142 let eids: Vec<u64> = edges
144 .iter()
145 .map(|(e, _, _, _, _, _, _)| e.as_u64())
146 .collect();
147 columns.push(Arc::new(UInt64Array::from(eids)));
148
149 let src_vids: Vec<u64> = edges
151 .iter()
152 .map(|(_, s, _, _, _, _, _)| s.as_u64())
153 .collect();
154 columns.push(Arc::new(UInt64Array::from(src_vids)));
155
156 let dst_vids: Vec<u64> = edges
158 .iter()
159 .map(|(_, _, d, _, _, _, _)| d.as_u64())
160 .collect();
161 columns.push(Arc::new(UInt64Array::from(dst_vids)));
162
163 let mut type_builder = StringBuilder::new();
165 for (_, _, _, edge_type, _, _, _) in edges.iter() {
166 type_builder.append_value(edge_type);
167 }
168 columns.push(Arc::new(type_builder.finish()));
169
170 let mut props_json_builder = LargeBinaryBuilder::new();
172 for (_, _, _, _, props, _, _) in edges.iter() {
173 let jsonb_bytes = {
174 let json_val = serde_json::to_value(props).unwrap_or(serde_json::json!({}));
175 let uni_val: uni_common::Value = json_val.into();
176 uni_common::cypher_value_codec::encode(&uni_val)
177 };
178 props_json_builder.append_value(&jsonb_bytes);
179 }
180 columns.push(Arc::new(props_json_builder.finish()));
181
182 let deleted: Vec<bool> = edges.iter().map(|(_, _, _, _, _, d, _)| *d).collect();
184 columns.push(Arc::new(BooleanArray::from(deleted)));
185
186 let versions: Vec<u64> = edges.iter().map(|(_, _, _, _, _, _, v)| *v).collect();
188 columns.push(Arc::new(UInt64Array::from(versions)));
189
190 let eids = edges.iter().map(|(e, _, _, _, _, _, _)| *e);
192 columns.push(build_timestamp_column_from_eid_map(
193 eids.clone(),
194 created_at,
195 ));
196 columns.push(build_timestamp_column_from_eid_map(eids, updated_at));
197
198 RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
199 }
200
201 pub async fn write_batch(backend: &dyn StorageBackend, batch: RecordBatch) -> Result<()> {
207 let table_name = table_names::main_edge_table_name();
208 crate::storage::manager::write_batch_with_lance_conflict_retry(backend, table_name, batch)
209 .await
210 }
211
212 pub async fn ensure_default_indexes(backend: &dyn StorageBackend) -> Result<()> {
217 let table_name = table_names::main_edge_table_name();
218 let indices = backend.list_indexes(table_name).await?;
219
220 let has_index = |col: &str| {
221 indices
222 .iter()
223 .any(|idx| idx.columns.contains(&col.to_string()))
224 };
225
226 for (column, idx_type) in [
227 ("_eid", ScalarIndexType::BTree),
228 ("src_vid", ScalarIndexType::BTree),
229 ("dst_vid", ScalarIndexType::BTree),
230 ("type", ScalarIndexType::BTree),
231 ] {
232 if has_index(column) {
233 continue;
234 }
235 log::info!("Creating {} index on main_edges", column);
236 if let Err(e) = backend
237 .create_scalar_index(table_name, column, idx_type)
238 .await
239 {
240 log::warn!("Failed to create {} index on main_edges: {}", column, e);
241 }
242 }
243
244 Ok(())
245 }
246
247 pub async fn find_by_eid(
249 backend: &dyn StorageBackend,
250 eid: Eid,
251 ) -> Result<Option<(Vid, Vid, String, Properties)>> {
252 let filter = format!("_eid = {}", eid.as_u64());
253 let results = Self::execute_query(backend, &filter, None).await?;
254
255 for batch in results {
256 if batch.num_rows() > 0 {
257 let src_vid_col = batch.column_by_name("src_vid");
258 let dst_vid_col = batch.column_by_name("dst_vid");
259 let type_col = batch.column_by_name("type");
260 let props_col = batch.column_by_name("props_json");
261
262 if let (Some(src), Some(dst), Some(typ), Some(props)) =
263 (src_vid_col, dst_vid_col, type_col, props_col)
264 && let (Some(src_arr), Some(dst_arr), Some(type_arr), Some(props_arr)) = (
265 src.as_any().downcast_ref::<UInt64Array>(),
266 dst.as_any().downcast_ref::<UInt64Array>(),
267 typ.as_any().downcast_ref::<arrow_array::StringArray>(),
268 props
269 .as_any()
270 .downcast_ref::<arrow_array::LargeBinaryArray>(),
271 )
272 {
273 let src_vid = Vid::from(src_arr.value(0));
274 let dst_vid = Vid::from(dst_arr.value(0));
275 let edge_type = type_arr.value(0).to_string();
276 let properties: Properties = if props_arr.is_null(0)
277 || props_arr.value(0).is_empty()
278 {
279 Properties::new()
280 } else {
281 let uni_val = uni_common::cypher_value_codec::decode(props_arr.value(0))
282 .unwrap_or(uni_common::Value::Null);
283 let json_val: serde_json::Value = uni_val.into();
284 serde_json::from_value(json_val).unwrap_or_default()
285 };
286
287 return Ok(Some((src_vid, dst_vid, edge_type, properties)));
288 }
289 }
290 }
291
292 Ok(None)
293 }
294
295 pub async fn exists_by_eid(backend: &dyn StorageBackend, eid: Eid) -> Result<bool> {
301 let filter = format!("_eid = {}", eid.as_u64());
302 let batches = Self::execute_query(backend, &filter, Some(vec!["_eid"])).await?;
303 Ok(!batches.is_empty() && batches.iter().any(|b| b.num_rows() > 0))
304 }
305
306 async fn execute_query(
310 backend: &dyn StorageBackend,
311 filter: &str,
312 columns: Option<Vec<&str>>,
313 ) -> Result<Vec<RecordBatch>> {
314 let table_name = table_names::main_edge_table_name();
315
316 if !backend.table_exists(table_name).await? {
317 return Ok(Vec::new());
318 }
319
320 let mut request = ScanRequest::all(table_name).with_filter(filter);
321 if let Some(cols) = columns {
322 request = request.with_columns(cols.into_iter().map(String::from).collect());
323 }
324
325 backend.scan(request).await
326 }
327
328 fn extract_eids(batches: &[RecordBatch]) -> Vec<Eid> {
330 let mut eids = Vec::new();
331 for batch in batches {
332 if let Some(eid_col) = batch.column_by_name("_eid")
333 && let Some(eid_arr) = eid_col.as_any().downcast_ref::<UInt64Array>()
334 {
335 for i in 0..eid_arr.len() {
336 if !eid_arr.is_null(i) {
337 eids.push(Eid::new(eid_arr.value(i)));
338 }
339 }
340 }
341 }
342 eids
343 }
344
345 pub async fn find_all_eids(backend: &dyn StorageBackend) -> Result<Vec<Eid>> {
347 let batches = Self::execute_query(backend, "_deleted = false", Some(vec!["_eid"])).await?;
348 Ok(Self::extract_eids(&batches))
349 }
350
351 pub async fn find_eids_by_type_name(
353 backend: &dyn StorageBackend,
354 type_name: &str,
355 ) -> Result<Vec<Eid>> {
356 let filter = format!(
357 "_deleted = false AND type = '{}'",
358 type_name.replace('\'', "''")
359 );
360 let batches = Self::execute_query(backend, &filter, Some(vec!["_eid"])).await?;
361 Ok(Self::extract_eids(&batches))
362 }
363
364 pub async fn find_props_by_eid(
369 backend: &dyn StorageBackend,
370 eid: Eid,
371 ) -> Result<Option<Properties>> {
372 let filter = format!("_eid = {} AND _deleted = false", eid.as_u64());
373 let batches =
374 Self::execute_query(backend, &filter, Some(vec!["props_json", "_version"])).await?;
375
376 if batches.is_empty() {
377 return Ok(None);
378 }
379
380 let mut best_props: Option<Properties> = None;
382 let mut best_version: u64 = 0;
383
384 for batch in &batches {
385 let props_col = batch.column_by_name("props_json");
386 let version_col = batch.column_by_name("_version");
387
388 if let (Some(props_arr), Some(ver_arr)) = (
389 props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>()),
390 version_col.and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
391 ) {
392 for i in 0..batch.num_rows() {
393 let version = if ver_arr.is_null(i) {
394 0
395 } else {
396 ver_arr.value(i)
397 };
398
399 if version >= best_version {
400 best_version = version;
401 best_props = Some(Self::parse_props_json(props_arr, i)?);
402 }
403 }
404 }
405 }
406
407 Ok(best_props)
408 }
409
410 fn parse_props_json(arr: &arrow_array::LargeBinaryArray, idx: usize) -> Result<Properties> {
412 if arr.is_null(idx) || arr.value(idx).is_empty() {
413 return Ok(Properties::new());
414 }
415 let bytes = arr.value(idx);
416 let uni_val = uni_common::cypher_value_codec::decode(bytes)
417 .map_err(|e| anyhow!("Failed to decode CypherValue: {}", e))?;
418 let json_val: serde_json::Value = uni_val.into();
419 serde_json::from_value(json_val).map_err(|e| anyhow!("Failed to parse props_json: {}", e))
420 }
421
422 pub async fn find_type_by_eid(
424 backend: &dyn StorageBackend,
425 eid: Eid,
426 ) -> Result<Option<String>> {
427 let filter = format!("_eid = {} AND _deleted = false", eid.as_u64());
428 let batches = Self::execute_query(backend, &filter, Some(vec!["type"])).await?;
429
430 for batch in batches {
431 if batch.num_rows() > 0
432 && let Some(type_col) = batch.column_by_name("type")
433 && let Some(type_arr) = type_col.as_any().downcast_ref::<arrow_array::StringArray>()
434 && !type_arr.is_null(0)
435 {
436 return Ok(Some(type_arr.value(0).to_string()));
437 }
438 }
439
440 Ok(None)
441 }
442
443 pub async fn find_edges_by_type_name(
447 backend: &dyn StorageBackend,
448 type_name: &str,
449 ) -> Result<Vec<(Eid, Vid, Vid, Properties)>> {
450 let filter = format!(
451 "_deleted = false AND type = '{}'",
452 type_name.replace('\'', "''")
453 );
454 let batches = Self::execute_query(backend, &filter, None).await?;
456
457 let mut edges = Vec::new();
458 for batch in &batches {
459 Self::extract_edges_from_batch(batch, &mut edges)?;
460 }
461
462 Ok(edges)
463 }
464
465 pub async fn find_edges_by_type_names(
470 backend: &dyn StorageBackend,
471 type_names: &[&str],
472 ) -> Result<Vec<(Eid, Vid, Vid, String, Properties)>> {
473 if type_names.is_empty() {
474 return Ok(Vec::new());
475 }
476
477 let escaped_types: Vec<String> = type_names
479 .iter()
480 .map(|t| format!("'{}'", t.replace('\'', "''")))
481 .collect();
482 let filter = format!(
483 "_deleted = false AND type IN ({})",
484 escaped_types.join(", ")
485 );
486
487 let batches = Self::execute_query(backend, &filter, None).await?;
489
490 let mut edges = Vec::new();
491 for batch in &batches {
492 Self::extract_edges_with_type_from_batch(batch, &mut edges)?;
493 }
494
495 Ok(edges)
496 }
497
498 fn extract_edges_from_batch(
500 batch: &RecordBatch,
501 edges: &mut Vec<(Eid, Vid, Vid, Properties)>,
502 ) -> Result<()> {
503 let mut edges_with_type = Vec::new();
505 Self::extract_edges_with_type_from_batch(batch, &mut edges_with_type)?;
506 edges.extend(
507 edges_with_type
508 .into_iter()
509 .map(|(eid, src, dst, _type, props)| (eid, src, dst, props)),
510 );
511 Ok(())
512 }
513
514 fn extract_edges_with_type_from_batch(
516 batch: &RecordBatch,
517 edges: &mut Vec<(Eid, Vid, Vid, String, Properties)>,
518 ) -> Result<()> {
519 let Some(eid_arr) = batch
520 .column_by_name("_eid")
521 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
522 else {
523 return Ok(());
524 };
525 let Some(src_arr) = batch
526 .column_by_name("src_vid")
527 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
528 else {
529 return Ok(());
530 };
531 let Some(dst_arr) = batch
532 .column_by_name("dst_vid")
533 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
534 else {
535 return Ok(());
536 };
537 let type_arr = batch
538 .column_by_name("type")
539 .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>());
540 let props_arr = batch
541 .column_by_name("props_json")
542 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
543
544 for i in 0..batch.num_rows() {
545 if eid_arr.is_null(i) || src_arr.is_null(i) || dst_arr.is_null(i) {
546 continue;
547 }
548
549 let eid = Eid::new(eid_arr.value(i));
550 let src_vid = Vid::new(src_arr.value(i));
551 let dst_vid = Vid::new(dst_arr.value(i));
552 let edge_type = type_arr
553 .filter(|arr| !arr.is_null(i))
554 .map(|arr| arr.value(i).to_string())
555 .unwrap_or_default();
556 let props = props_arr
557 .map(|arr| Self::parse_props_json(arr, i))
558 .transpose()?
559 .unwrap_or_default();
560
561 edges.push((eid, src_vid, dst_vid, edge_type, props));
562 }
563
564 Ok(())
565 }
566}
567
568#[cfg(test)]
569mod tests {
570 use super::*;
571
/// The schema exposes exactly the nine expected columns.
#[test]
fn test_main_edge_schema() {
    const EXPECTED_FIELDS: [&str; 9] = [
        "_eid",
        "src_vid",
        "dst_vid",
        "type",
        "props_json",
        "_deleted",
        "_version",
        "_created_at",
        "_updated_at",
    ];

    let schema = MainEdgeDataset::get_arrow_schema();
    assert_eq!(schema.fields().len(), 9);
    for field_name in EXPECTED_FIELDS {
        assert!(schema.field_with_name(field_name).is_ok());
    }
}
586
/// A single edge with one property produces a 1 x 9 batch.
#[test]
fn test_build_record_batch() {
    use uni_common::Value;

    let props = HashMap::from([("weight".to_string(), Value::Float(0.5))]);
    let edges = vec![(
        Eid::new(1),
        Vid::new(1),
        Vid::new(2),
        "KNOWS".to_string(),
        props,
        false,
        1u64,
    )];

    let batch = MainEdgeDataset::build_record_batch(&edges, None, None).unwrap();

    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.num_columns(), 9);
}
607
/// Several edges, including one flagged deleted, keep their row order and types.
#[test]
fn test_build_record_batch_multiple_edges() {
    use uni_common::Value;

    // Each fixture row uses its id as the version as well.
    let edge = |id: u64, src: u64, dst: u64, label: &str, props: Properties, deleted: bool| {
        (
            Eid::new(id),
            Vid::new(src),
            Vid::new(dst),
            label.to_string(),
            props,
            deleted,
            id,
        )
    };

    let edges = vec![
        edge(
            1,
            1,
            2,
            "KNOWS",
            HashMap::from([("since".to_string(), Value::Int(2020))]),
            false,
        ),
        edge(2, 2, 3, "WORKS_AT", HashMap::new(), false),
        // Third row carries the deleted flag.
        edge(3, 1, 3, "KNOWS", HashMap::new(), true),
    ];

    let batch = MainEdgeDataset::build_record_batch(&edges, None, None).unwrap();
    assert_eq!(batch.num_rows(), 3);
    assert_eq!(batch.num_columns(), 9);

    let types = batch
        .column_by_name("type")
        .and_then(|column| column.as_any().downcast_ref::<arrow_array::StringArray>())
        .unwrap();
    let observed: Vec<&str> = (0..batch.num_rows()).map(|row| types.value(row)).collect();
    assert_eq!(observed, ["KNOWS", "WORKS_AT", "KNOWS"]);
}
657
/// Supplying a created-at map populates the `_created_at` column.
#[test]
fn test_build_record_batch_with_timestamps() {
    let eid = Eid::new(1);
    let edges = vec![(
        eid,
        Vid::new(1),
        Vid::new(2),
        "KNOWS".to_string(),
        HashMap::new(),
        false,
        1u64,
    )];

    let created_at: HashMap<Eid, i64> = HashMap::from([(eid, 1_000_000_000)]);
    let updated_at: HashMap<Eid, i64> = HashMap::from([(eid, 2_000_000_000)]);

    let batch =
        MainEdgeDataset::build_record_batch(&edges, Some(&created_at), Some(&updated_at))
            .unwrap();
    assert_eq!(batch.num_rows(), 1);

    let created = batch.column_by_name("_created_at").unwrap();
    assert!(!created.is_null(0), "created_at should be populated");
}
685}