1use crate::backend::StorageBackend;
19use crate::backend::table_names;
20use crate::backend::types::{ScalarIndexType, ScanRequest};
21use crate::storage::arrow_convert::build_timestamp_column_from_eid_map;
22use anyhow::{Result, anyhow};
23use arrow_array::builder::{LargeBinaryBuilder, StringBuilder};
24use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, UInt64Array};
25use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
26use sha3::{Digest, Sha3_256};
27use std::collections::HashMap;
28use std::sync::Arc;
29use uni_common::Properties;
30use uni_common::core::id::{Eid, UniId, Vid};
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38pub enum EndpointSide {
39 Src,
40 Dst,
41 Either,
42}
43
44#[derive(Debug)]
50pub struct MainEdgeDataset {
51 _base_uri: String,
52}
53
54impl MainEdgeDataset {
55 pub fn new(base_uri: &str) -> Self {
57 Self {
58 _base_uri: base_uri.to_string(),
59 }
60 }
61
62 pub fn compute_edge_uid(
75 src_uid: &UniId,
76 dst_uid: &UniId,
77 edge_type: &str,
78 props: &Properties,
79 ) -> UniId {
80 let mut hasher = Sha3_256::new();
81
82 hasher.update(b"src:");
85 hasher.update(src_uid.as_bytes());
86 hasher.update(b"\0");
87 hasher.update(b"dst:");
88 hasher.update(dst_uid.as_bytes());
89 hasher.update(b"\0");
90
91 hasher.update(b"type:");
93 hasher.update(edge_type.as_bytes());
94 hasher.update(b"\0");
95
96 let mut sorted_keys: Vec<_> = props.keys().collect();
98 sorted_keys.sort();
99 for key in sorted_keys {
100 if let Some(val) = props.get(key) {
101 hasher.update(key.as_bytes());
102 hasher.update(b":");
103 hasher.update(val.to_string().as_bytes());
104 hasher.update(b"\0");
105 }
106 }
107
108 let result = hasher.finalize();
109 UniId::from_bytes(result.into())
110 }
111
112 pub fn get_arrow_schema() -> Arc<ArrowSchema> {
114 Arc::new(ArrowSchema::new(vec![
115 Field::new("_eid", DataType::UInt64, false),
116 Field::new("src_vid", DataType::UInt64, false),
117 Field::new("dst_vid", DataType::UInt64, false),
118 Field::new("type", DataType::Utf8, false),
119 Field::new("props_json", DataType::LargeBinary, true),
120 Field::new("_deleted", DataType::Boolean, false),
121 Field::new("_version", DataType::UInt64, false),
122 Field::new(
123 "_created_at",
124 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
125 true,
126 ),
127 Field::new(
128 "_updated_at",
129 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
130 true,
131 ),
132 ]))
133 }
134
135 pub fn table_name() -> &'static str {
137 "edges"
138 }
139
140 pub fn build_record_batch(
147 edges: &[(Eid, Vid, Vid, String, Properties, bool, u64)],
148 created_at: Option<&HashMap<Eid, i64>>,
149 updated_at: Option<&HashMap<Eid, i64>>,
150 ) -> Result<RecordBatch> {
151 let arrow_schema = Self::get_arrow_schema();
152 let mut columns: Vec<ArrayRef> = Vec::with_capacity(arrow_schema.fields().len());
153
154 let eids: Vec<u64> = edges
156 .iter()
157 .map(|(e, _, _, _, _, _, _)| e.as_u64())
158 .collect();
159 columns.push(Arc::new(UInt64Array::from(eids)));
160
161 let src_vids: Vec<u64> = edges
163 .iter()
164 .map(|(_, s, _, _, _, _, _)| s.as_u64())
165 .collect();
166 columns.push(Arc::new(UInt64Array::from(src_vids)));
167
168 let dst_vids: Vec<u64> = edges
170 .iter()
171 .map(|(_, _, d, _, _, _, _)| d.as_u64())
172 .collect();
173 columns.push(Arc::new(UInt64Array::from(dst_vids)));
174
175 let mut type_builder = StringBuilder::new();
177 for (_, _, _, edge_type, _, _, _) in edges.iter() {
178 type_builder.append_value(edge_type);
179 }
180 columns.push(Arc::new(type_builder.finish()));
181
182 let mut props_json_builder = LargeBinaryBuilder::new();
184 for (_, _, _, _, props, _, _) in edges.iter() {
185 let jsonb_bytes = {
186 let json_val = serde_json::to_value(props).unwrap_or(serde_json::json!({}));
187 let uni_val: uni_common::Value = json_val.into();
188 uni_common::cypher_value_codec::encode(&uni_val)
189 };
190 props_json_builder.append_value(&jsonb_bytes);
191 }
192 columns.push(Arc::new(props_json_builder.finish()));
193
194 let deleted: Vec<bool> = edges.iter().map(|(_, _, _, _, _, d, _)| *d).collect();
196 columns.push(Arc::new(BooleanArray::from(deleted)));
197
198 let versions: Vec<u64> = edges.iter().map(|(_, _, _, _, _, _, v)| *v).collect();
200 columns.push(Arc::new(UInt64Array::from(versions)));
201
202 let eids = edges.iter().map(|(e, _, _, _, _, _, _)| *e);
204 columns.push(build_timestamp_column_from_eid_map(
205 eids.clone(),
206 created_at,
207 ));
208 columns.push(build_timestamp_column_from_eid_map(eids, updated_at));
209
210 RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
211 }
212
213 pub async fn write_batch(backend: &dyn StorageBackend, batch: RecordBatch) -> Result<()> {
219 let table_name = table_names::main_edge_table_name();
220 crate::storage::manager::write_batch_with_lance_conflict_retry(backend, table_name, batch)
221 .await
222 }
223
224 pub async fn ensure_default_indexes(backend: &dyn StorageBackend) -> Result<()> {
229 let table_name = table_names::main_edge_table_name();
230 let indices = backend.list_indexes(table_name).await?;
231
232 let has_index = |col: &str| {
233 indices
234 .iter()
235 .any(|idx| idx.columns.contains(&col.to_string()))
236 };
237
238 for (column, idx_type) in [
239 ("_eid", ScalarIndexType::BTree),
240 ("src_vid", ScalarIndexType::BTree),
241 ("dst_vid", ScalarIndexType::BTree),
242 ("type", ScalarIndexType::BTree),
243 ] {
244 if has_index(column) {
245 continue;
246 }
247 log::info!("Creating {} index on main_edges", column);
248 if let Err(e) = backend
249 .create_scalar_index(table_name, column, idx_type)
250 .await
251 {
252 log::warn!("Failed to create {} index on main_edges: {}", column, e);
253 }
254 }
255
256 Ok(())
257 }
258
259 pub async fn find_by_eid(
261 backend: &dyn StorageBackend,
262 eid: Eid,
263 ) -> Result<Option<(Vid, Vid, String, Properties)>> {
264 let filter = format!("_eid = {}", eid.as_u64());
265 let results = Self::execute_query(backend, &filter, None).await?;
266
267 for batch in results {
268 if batch.num_rows() > 0 {
269 let src_vid_col = batch.column_by_name("src_vid");
270 let dst_vid_col = batch.column_by_name("dst_vid");
271 let type_col = batch.column_by_name("type");
272 let props_col = batch.column_by_name("props_json");
273
274 if let (Some(src), Some(dst), Some(typ), Some(props)) =
275 (src_vid_col, dst_vid_col, type_col, props_col)
276 && let (Some(src_arr), Some(dst_arr), Some(type_arr), Some(props_arr)) = (
277 src.as_any().downcast_ref::<UInt64Array>(),
278 dst.as_any().downcast_ref::<UInt64Array>(),
279 typ.as_any().downcast_ref::<arrow_array::StringArray>(),
280 props
281 .as_any()
282 .downcast_ref::<arrow_array::LargeBinaryArray>(),
283 )
284 {
285 let src_vid = Vid::from(src_arr.value(0));
286 let dst_vid = Vid::from(dst_arr.value(0));
287 let edge_type = type_arr.value(0).to_string();
288 let properties: Properties = if props_arr.is_null(0)
289 || props_arr.value(0).is_empty()
290 {
291 Properties::new()
292 } else {
293 let uni_val = uni_common::cypher_value_codec::decode(props_arr.value(0))
294 .unwrap_or(uni_common::Value::Null);
295 let json_val: serde_json::Value = uni_val.into();
296 serde_json::from_value(json_val).unwrap_or_default()
297 };
298
299 return Ok(Some((src_vid, dst_vid, edge_type, properties)));
300 }
301 }
302 }
303
304 Ok(None)
305 }
306
307 pub async fn exists_by_eid(backend: &dyn StorageBackend, eid: Eid) -> Result<bool> {
313 let filter = format!("_eid = {}", eid.as_u64());
314 let batches = Self::execute_query(backend, &filter, Some(vec!["_eid"])).await?;
315 Ok(!batches.is_empty() && batches.iter().any(|b| b.num_rows() > 0))
316 }
317
318 async fn execute_query(
322 backend: &dyn StorageBackend,
323 filter: &str,
324 columns: Option<Vec<&str>>,
325 ) -> Result<Vec<RecordBatch>> {
326 let table_name = table_names::main_edge_table_name();
327
328 if !backend.table_exists(table_name).await? {
329 return Ok(Vec::new());
330 }
331
332 let mut request = ScanRequest::all(table_name).with_filter(filter);
333 if let Some(cols) = columns {
334 request = request.with_columns(cols.into_iter().map(String::from).collect());
335 }
336
337 backend.scan(request).await
338 }
339
340 fn extract_eids(batches: &[RecordBatch]) -> Vec<Eid> {
342 let mut eids = Vec::new();
343 for batch in batches {
344 if let Some(eid_col) = batch.column_by_name("_eid")
345 && let Some(eid_arr) = eid_col.as_any().downcast_ref::<UInt64Array>()
346 {
347 for i in 0..eid_arr.len() {
348 if !eid_arr.is_null(i) {
349 eids.push(Eid::new(eid_arr.value(i)));
350 }
351 }
352 }
353 }
354 eids
355 }
356
357 pub async fn find_all_eids(backend: &dyn StorageBackend) -> Result<Vec<Eid>> {
359 let batches = Self::execute_query(backend, "_deleted = false", Some(vec!["_eid"])).await?;
360 Ok(Self::extract_eids(&batches))
361 }
362
363 pub async fn find_eids_by_type_name(
365 backend: &dyn StorageBackend,
366 type_name: &str,
367 ) -> Result<Vec<Eid>> {
368 let filter = format!(
369 "_deleted = false AND type = '{}'",
370 type_name.replace('\'', "''")
371 );
372 let batches = Self::execute_query(backend, &filter, Some(vec!["_eid"])).await?;
373 Ok(Self::extract_eids(&batches))
374 }
375
376 pub async fn find_props_by_eid(
381 backend: &dyn StorageBackend,
382 eid: Eid,
383 ) -> Result<Option<Properties>> {
384 let filter = format!("_eid = {}", eid.as_u64());
390 let batches = Self::execute_query(
391 backend,
392 &filter,
393 Some(vec!["props_json", "_version", "_deleted"]),
394 )
395 .await?;
396
397 if batches.is_empty() {
398 return Ok(None);
399 }
400
401 let mut best_props: Option<Properties> = None;
403 let mut best_version: u64 = 0;
404 let mut best_deleted = false;
405
406 for batch in &batches {
407 let props_col = batch.column_by_name("props_json");
408 let version_col = batch.column_by_name("_version");
409 let deleted_col = batch
410 .column_by_name("_deleted")
411 .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>());
412
413 if let (Some(props_arr), Some(ver_arr)) = (
414 props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>()),
415 version_col.and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
416 ) {
417 for i in 0..batch.num_rows() {
418 let version = if ver_arr.is_null(i) {
419 0
420 } else {
421 ver_arr.value(i)
422 };
423
424 if version >= best_version {
425 best_version = version;
426 best_deleted = deleted_col.is_some_and(|d| d.value(i));
427 best_props = if best_deleted {
428 Some(Properties::new())
429 } else {
430 Some(Self::parse_props_json(props_arr, i)?)
431 };
432 }
433 }
434 }
435 }
436
437 if best_deleted {
438 return Ok(None);
439 }
440 Ok(best_props)
441 }
442
443 fn parse_props_json(arr: &arrow_array::LargeBinaryArray, idx: usize) -> Result<Properties> {
445 if arr.is_null(idx) || arr.value(idx).is_empty() {
446 return Ok(Properties::new());
447 }
448 let bytes = arr.value(idx);
449 let uni_val = uni_common::cypher_value_codec::decode(bytes)
450 .map_err(|e| anyhow!("Failed to decode CypherValue: {}", e))?;
451 let json_val: serde_json::Value = uni_val.into();
452 serde_json::from_value(json_val).map_err(|e| anyhow!("Failed to parse props_json: {}", e))
453 }
454
455 pub async fn find_type_by_eid(
457 backend: &dyn StorageBackend,
458 eid: Eid,
459 ) -> Result<Option<String>> {
460 let filter = format!("_eid = {}", eid.as_u64());
465 let batches =
466 Self::execute_query(backend, &filter, Some(vec!["type", "_version", "_deleted"]))
467 .await?;
468
469 let mut best_type: Option<String> = None;
470 let mut best_version: u64 = 0;
471 let mut best_deleted = false;
472
473 for batch in &batches {
474 let type_col = batch
475 .column_by_name("type")
476 .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>());
477 let ver_col = batch
478 .column_by_name("_version")
479 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());
480 let deleted_col = batch
481 .column_by_name("_deleted")
482 .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>());
483
484 if let (Some(type_arr), Some(ver_arr)) = (type_col, ver_col) {
485 for i in 0..batch.num_rows() {
486 let version = if ver_arr.is_null(i) {
487 0
488 } else {
489 ver_arr.value(i)
490 };
491
492 if version >= best_version {
493 best_version = version;
494 best_deleted = deleted_col.is_some_and(|d| d.value(i));
495 best_type = if best_deleted || type_arr.is_null(i) {
496 None
497 } else {
498 Some(type_arr.value(i).to_string())
499 };
500 }
501 }
502 }
503 }
504
505 if best_deleted {
506 return Ok(None);
507 }
508 Ok(best_type)
509 }
510
511 pub async fn find_edges_by_type_name(
515 backend: &dyn StorageBackend,
516 type_name: &str,
517 ) -> Result<Vec<(Eid, Vid, Vid, Properties)>> {
518 let filter = format!(
519 "_deleted = false AND type = '{}'",
520 type_name.replace('\'', "''")
521 );
522 let batches = Self::execute_query(backend, &filter, None).await?;
524
525 let mut edges = Vec::new();
526 for batch in &batches {
527 Self::extract_edges_from_batch(batch, &mut edges)?;
528 }
529
530 Ok(edges)
531 }
532
533 pub async fn find_edges_by_type_names(
542 backend: &dyn StorageBackend,
543 type_names: &[&str],
544 endpoint_filter: Option<(EndpointSide, &[Vid])>,
545 ) -> Result<Vec<(Eid, Vid, Vid, String, Properties)>> {
546 if type_names.is_empty() {
547 return Ok(Vec::new());
548 }
549
550 let escaped_types: Vec<String> = type_names
552 .iter()
553 .map(|t| format!("'{}'", t.replace('\'', "''")))
554 .collect();
555 let base_filter = format!(
556 "_deleted = false AND type IN ({})",
557 escaped_types.join(", ")
558 );
559
560 let mut edges = Vec::new();
561 match endpoint_filter {
562 None => {
563 let batches = Self::execute_query(backend, &base_filter, None).await?;
565 for batch in &batches {
566 Self::extract_edges_with_type_from_batch(batch, &mut edges)?;
567 }
568 }
569 Some((_, [])) => {}
570 Some((side, vids)) => {
571 const VID_CHUNK: usize = 8192;
573 for chunk in vids.chunks(VID_CHUNK) {
574 let list = chunk
575 .iter()
576 .map(|v| v.as_u64().to_string())
577 .collect::<Vec<_>>()
578 .join(", ");
579 let endpoint_clause = match side {
580 EndpointSide::Src => format!("src_vid IN ({list})"),
581 EndpointSide::Dst => format!("dst_vid IN ({list})"),
582 EndpointSide::Either => {
583 format!("(src_vid IN ({list}) OR dst_vid IN ({list}))")
584 }
585 };
586 let filter = format!("{base_filter} AND {endpoint_clause}");
587 let batches = Self::execute_query(backend, &filter, None).await?;
588 for batch in &batches {
589 Self::extract_edges_with_type_from_batch(batch, &mut edges)?;
590 }
591 }
592 }
593 }
594
595 Ok(edges)
596 }
597
598 fn extract_edges_from_batch(
600 batch: &RecordBatch,
601 edges: &mut Vec<(Eid, Vid, Vid, Properties)>,
602 ) -> Result<()> {
603 let mut edges_with_type = Vec::new();
605 Self::extract_edges_with_type_from_batch(batch, &mut edges_with_type)?;
606 edges.extend(
607 edges_with_type
608 .into_iter()
609 .map(|(eid, src, dst, _type, props)| (eid, src, dst, props)),
610 );
611 Ok(())
612 }
613
614 fn extract_edges_with_type_from_batch(
616 batch: &RecordBatch,
617 edges: &mut Vec<(Eid, Vid, Vid, String, Properties)>,
618 ) -> Result<()> {
619 let Some(eid_arr) = batch
620 .column_by_name("_eid")
621 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
622 else {
623 return Ok(());
624 };
625 let Some(src_arr) = batch
626 .column_by_name("src_vid")
627 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
628 else {
629 return Ok(());
630 };
631 let Some(dst_arr) = batch
632 .column_by_name("dst_vid")
633 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
634 else {
635 return Ok(());
636 };
637 let type_arr = batch
638 .column_by_name("type")
639 .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>());
640 let props_arr = batch
641 .column_by_name("props_json")
642 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
643
644 for i in 0..batch.num_rows() {
645 if eid_arr.is_null(i) || src_arr.is_null(i) || dst_arr.is_null(i) {
646 continue;
647 }
648
649 let eid = Eid::new(eid_arr.value(i));
650 let src_vid = Vid::new(src_arr.value(i));
651 let dst_vid = Vid::new(dst_arr.value(i));
652 let edge_type = type_arr
653 .filter(|arr| !arr.is_null(i))
654 .map(|arr| arr.value(i).to_string())
655 .unwrap_or_default();
656 let props = props_arr
657 .map(|arr| Self::parse_props_json(arr, i))
658 .transpose()?
659 .unwrap_or_default();
660
661 edges.push((eid, src_vid, dst_vid, edge_type, props));
662 }
663
664 Ok(())
665 }
666}
667
668#[cfg(test)]
669mod tests {
670 use super::*;
671
672 #[test]
673 fn test_main_edge_schema() {
674 let schema = MainEdgeDataset::get_arrow_schema();
675 assert_eq!(schema.fields().len(), 9);
676 assert!(schema.field_with_name("_eid").is_ok());
677 assert!(schema.field_with_name("src_vid").is_ok());
678 assert!(schema.field_with_name("dst_vid").is_ok());
679 assert!(schema.field_with_name("type").is_ok());
680 assert!(schema.field_with_name("props_json").is_ok());
681 assert!(schema.field_with_name("_deleted").is_ok());
682 assert!(schema.field_with_name("_version").is_ok());
683 assert!(schema.field_with_name("_created_at").is_ok());
684 assert!(schema.field_with_name("_updated_at").is_ok());
685 }
686
687 #[test]
688 fn test_build_record_batch() {
689 use uni_common::Value;
690 let mut props = HashMap::new();
691 props.insert("weight".to_string(), Value::Float(0.5));
692
693 let edges = vec![(
694 Eid::new(1),
695 Vid::new(1),
696 Vid::new(2),
697 "KNOWS".to_string(),
698 props,
699 false,
700 1u64,
701 )];
702
703 let batch = MainEdgeDataset::build_record_batch(&edges, None, None).unwrap();
704 assert_eq!(batch.num_rows(), 1);
705 assert_eq!(batch.num_columns(), 9);
706 }
707
708 #[test]
709 fn test_build_record_batch_multiple_edges() {
710 use uni_common::Value;
711
712 let edges = vec![
713 (
714 Eid::new(1),
715 Vid::new(1),
716 Vid::new(2),
717 "KNOWS".to_string(),
718 HashMap::from([("since".to_string(), Value::Int(2020))]),
719 false,
720 1u64,
721 ),
722 (
723 Eid::new(2),
724 Vid::new(2),
725 Vid::new(3),
726 "WORKS_AT".to_string(),
727 HashMap::new(),
728 false,
729 2u64,
730 ),
731 (
732 Eid::new(3),
733 Vid::new(1),
734 Vid::new(3),
735 "KNOWS".to_string(),
736 HashMap::new(),
737 true, 3u64,
739 ),
740 ];
741
742 let batch = MainEdgeDataset::build_record_batch(&edges, None, None).unwrap();
743 assert_eq!(batch.num_rows(), 3);
744 assert_eq!(batch.num_columns(), 9);
745
746 let type_col = batch
748 .column_by_name("type")
749 .unwrap()
750 .as_any()
751 .downcast_ref::<arrow_array::StringArray>()
752 .unwrap();
753 assert_eq!(type_col.value(0), "KNOWS");
754 assert_eq!(type_col.value(1), "WORKS_AT");
755 assert_eq!(type_col.value(2), "KNOWS");
756 }
757
758 #[test]
759 fn test_build_record_batch_with_timestamps() {
760 let edges = vec![(
761 Eid::new(1),
762 Vid::new(1),
763 Vid::new(2),
764 "KNOWS".to_string(),
765 HashMap::new(),
766 false,
767 1u64,
768 )];
769
770 let mut created_at: HashMap<Eid, i64> = HashMap::new();
771 created_at.insert(Eid::new(1), 1_000_000_000);
772
773 let mut updated_at: HashMap<Eid, i64> = HashMap::new();
774 updated_at.insert(Eid::new(1), 2_000_000_000);
775
776 let batch =
777 MainEdgeDataset::build_record_batch(&edges, Some(&created_at), Some(&updated_at))
778 .unwrap();
779 assert_eq!(batch.num_rows(), 1);
780
781 let created_col = batch.column_by_name("_created_at").unwrap();
783 assert!(!created_col.is_null(0), "created_at should be populated");
784 }
785
786 #[tokio::test]
792 async fn test_edge_key_reads_respect_tombstone_winner() {
793 use crate::backend::lance::LanceDbBackend;
794 use uni_common::Value;
795
796 let dir = tempfile::TempDir::new().unwrap();
797 let be = LanceDbBackend::connect(dir.path().to_str().unwrap(), None)
798 .await
799 .unwrap();
800 let backend: &dyn StorageBackend = &be;
801
802 let mut props = HashMap::new();
803 props.insert("weight".to_string(), Value::Float(0.5));
804
805 let live = MainEdgeDataset::build_record_batch(
807 &[(
808 Eid::new(1),
809 Vid::new(1),
810 Vid::new(2),
811 "KNOWS".to_string(),
812 props.clone(),
813 false,
814 1u64,
815 )],
816 None,
817 None,
818 )
819 .unwrap();
820 MainEdgeDataset::write_batch(backend, live).await.unwrap();
821
822 assert!(
824 MainEdgeDataset::find_props_by_eid(backend, Eid::new(1))
825 .await
826 .unwrap()
827 .is_some()
828 );
829 assert_eq!(
830 MainEdgeDataset::find_type_by_eid(backend, Eid::new(1))
831 .await
832 .unwrap(),
833 Some("KNOWS".to_string())
834 );
835
836 let dead = MainEdgeDataset::build_record_batch(
838 &[(
839 Eid::new(1),
840 Vid::new(1),
841 Vid::new(2),
842 "KNOWS".to_string(),
843 props,
844 true,
845 2u64,
846 )],
847 None,
848 None,
849 )
850 .unwrap();
851 MainEdgeDataset::write_batch(backend, dead).await.unwrap();
852
853 assert_eq!(
854 MainEdgeDataset::find_props_by_eid(backend, Eid::new(1))
855 .await
856 .unwrap(),
857 None,
858 "deleted (highest-version) winner must not resurrect edge props"
859 );
860 assert_eq!(
861 MainEdgeDataset::find_type_by_eid(backend, Eid::new(1))
862 .await
863 .unwrap(),
864 None,
865 "deleted winner must not return an edge type"
866 );
867 }
868}