Skip to main content

uni_store/storage/
main_edge.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Main edge table for unified edge storage.
5//!
6//! This module implements the main `edges` table as described in STORAGE_DESIGN.md.
7//! The main table contains all edges in the graph with:
8//! - `_eid`: Internal edge ID (primary key)
9//! - `src_vid`: Source vertex ID
10//! - `dst_vid`: Destination vertex ID
11//! - `type`: Edge type name
12//! - `props_json`: All properties as JSONB blob
13//! - `_deleted`: Soft-delete flag
14//! - `_version`: MVCC version
15//! - `_created_at`: Creation timestamp
16//! - `_updated_at`: Update timestamp
17
18use crate::backend::StorageBackend;
19use crate::backend::table_names;
20use crate::backend::types::{ScalarIndexType, ScanRequest};
21use crate::storage::arrow_convert::build_timestamp_column_from_eid_map;
22use anyhow::{Result, anyhow};
23use arrow_array::builder::{LargeBinaryBuilder, StringBuilder};
24use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, UInt64Array};
25use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
26use sha3::{Digest, Sha3_256};
27use std::collections::HashMap;
28use std::sync::Arc;
29use uni_common::Properties;
30use uni_common::core::id::{Eid, UniId, Vid};
31
/// Which edge endpoint a pushed-down vid set constrains in
/// [`MainEdgeDataset::find_edges_by_type_names`].
///
/// `Src` for outgoing traversals, `Dst` for incoming, `Either` for
/// undirected (`Both`) traversals.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EndpointSide {
    /// Constrain `src_vid` to the supplied vid set.
    Src,
    /// Constrain `dst_vid` to the supplied vid set.
    Dst,
    /// Match edges whose `src_vid` or `dst_vid` is in the supplied vid set.
    Either,
}
43
/// Main edge dataset for the unified `edges` table.
///
/// This table contains all edges regardless of type, providing:
/// - Fast ID-based lookups without knowing the edge type
/// - Unified traversal queries
#[derive(Debug)]
pub struct MainEdgeDataset {
    // Owned copy of the URI handed to `new`. No method visible in this file
    // reads it; the table operations all take an explicit backend.
    _base_uri: String,
}
53
54impl MainEdgeDataset {
55    /// Create a new MainEdgeDataset.
56    pub fn new(base_uri: &str) -> Self {
57        Self {
58            _base_uri: base_uri.to_string(),
59        }
60    }
61
62    /// Compute the content-addressed UID for an edge.
63    ///
64    /// Edge identity is the SHA3-256 of
65    /// `(src_uid, dst_uid, edge_type, sorted_properties)` — the same
66    /// content-addressed pattern as
67    /// `MainVertexDataset::compute_vertex_uid` but extended with
68    /// edge endpoint UIDs and the edge type. This lets fork diff and
69    /// promote distinguish parallel edges between the same endpoints
70    /// when their property bags differ (multi-edge support).
71    ///
72    /// Property iteration is sorted by key for deterministic hashing
73    /// across machines and runs.
74    pub fn compute_edge_uid(
75        src_uid: &UniId,
76        dst_uid: &UniId,
77        edge_type: &str,
78        props: &Properties,
79    ) -> UniId {
80        let mut hasher = Sha3_256::new();
81
82        // Endpoint UIDs first — direction is significant
83        // (src→dst ≠ dst→src for the same property bag).
84        hasher.update(b"src:");
85        hasher.update(src_uid.as_bytes());
86        hasher.update(b"\0");
87        hasher.update(b"dst:");
88        hasher.update(dst_uid.as_bytes());
89        hasher.update(b"\0");
90
91        // Edge type.
92        hasher.update(b"type:");
93        hasher.update(edge_type.as_bytes());
94        hasher.update(b"\0");
95
96        // Properties sorted by key (matches compute_vertex_uid).
97        let mut sorted_keys: Vec<_> = props.keys().collect();
98        sorted_keys.sort();
99        for key in sorted_keys {
100            if let Some(val) = props.get(key) {
101                hasher.update(key.as_bytes());
102                hasher.update(b":");
103                hasher.update(val.to_string().as_bytes());
104                hasher.update(b"\0");
105            }
106        }
107
108        let result = hasher.finalize();
109        UniId::from_bytes(result.into())
110    }
111
112    /// Get the Arrow schema for the main edges table.
113    pub fn get_arrow_schema() -> Arc<ArrowSchema> {
114        Arc::new(ArrowSchema::new(vec![
115            Field::new("_eid", DataType::UInt64, false),
116            Field::new("src_vid", DataType::UInt64, false),
117            Field::new("dst_vid", DataType::UInt64, false),
118            Field::new("type", DataType::Utf8, false),
119            Field::new("props_json", DataType::LargeBinary, true),
120            Field::new("_deleted", DataType::Boolean, false),
121            Field::new("_version", DataType::UInt64, false),
122            Field::new(
123                "_created_at",
124                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
125                true,
126            ),
127            Field::new(
128                "_updated_at",
129                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
130                true,
131            ),
132        ]))
133    }
134
    /// Get the table name for the main edges table.
    ///
    /// Returns the literal `"edges"`.
    // NOTE(review): the read and write paths in this file resolve the table
    // through `table_names::main_edge_table_name()` rather than this
    // function — confirm the two names agree.
    pub fn table_name() -> &'static str {
        "edges"
    }
139
140    /// Build a record batch for the main edges table.
141    ///
142    /// # Arguments
143    /// * `edges` - List of (eid, src_vid, dst_vid, edge_type, properties, deleted, version) tuples
144    /// * `created_at` - Optional map of Eid -> nanoseconds since epoch
145    /// * `updated_at` - Optional map of Eid -> nanoseconds since epoch
146    pub fn build_record_batch(
147        edges: &[(Eid, Vid, Vid, String, Properties, bool, u64)],
148        created_at: Option<&HashMap<Eid, i64>>,
149        updated_at: Option<&HashMap<Eid, i64>>,
150    ) -> Result<RecordBatch> {
151        let arrow_schema = Self::get_arrow_schema();
152        let mut columns: Vec<ArrayRef> = Vec::with_capacity(arrow_schema.fields().len());
153
154        // _eid column
155        let eids: Vec<u64> = edges
156            .iter()
157            .map(|(e, _, _, _, _, _, _)| e.as_u64())
158            .collect();
159        columns.push(Arc::new(UInt64Array::from(eids)));
160
161        // src_vid column
162        let src_vids: Vec<u64> = edges
163            .iter()
164            .map(|(_, s, _, _, _, _, _)| s.as_u64())
165            .collect();
166        columns.push(Arc::new(UInt64Array::from(src_vids)));
167
168        // dst_vid column
169        let dst_vids: Vec<u64> = edges
170            .iter()
171            .map(|(_, _, d, _, _, _, _)| d.as_u64())
172            .collect();
173        columns.push(Arc::new(UInt64Array::from(dst_vids)));
174
175        // type column
176        let mut type_builder = StringBuilder::new();
177        for (_, _, _, edge_type, _, _, _) in edges.iter() {
178            type_builder.append_value(edge_type);
179        }
180        columns.push(Arc::new(type_builder.finish()));
181
182        // props_json column (JSONB binary encoding)
183        let mut props_json_builder = LargeBinaryBuilder::new();
184        for (_, _, _, _, props, _, _) in edges.iter() {
185            let jsonb_bytes = {
186                let json_val = serde_json::to_value(props).unwrap_or(serde_json::json!({}));
187                let uni_val: uni_common::Value = json_val.into();
188                uni_common::cypher_value_codec::encode(&uni_val)
189            };
190            props_json_builder.append_value(&jsonb_bytes);
191        }
192        columns.push(Arc::new(props_json_builder.finish()));
193
194        // _deleted column
195        let deleted: Vec<bool> = edges.iter().map(|(_, _, _, _, _, d, _)| *d).collect();
196        columns.push(Arc::new(BooleanArray::from(deleted)));
197
198        // _version column
199        let versions: Vec<u64> = edges.iter().map(|(_, _, _, _, _, _, v)| *v).collect();
200        columns.push(Arc::new(UInt64Array::from(versions)));
201
202        // _created_at and _updated_at columns using shared builder
203        let eids = edges.iter().map(|(e, _, _, _, _, _, _)| *e);
204        columns.push(build_timestamp_column_from_eid_map(
205            eids.clone(),
206            created_at,
207        ));
208        columns.push(build_timestamp_column_from_eid_map(eids, updated_at));
209
210        RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
211    }
212
213    /// Write a batch to the main edges table.
214    ///
215    /// Creates the table if it doesn't exist, otherwise appends to it.
216    /// Race-safe under async-flush — see
217    /// `crate::storage::manager::write_batch_with_lance_conflict_retry`.
218    pub async fn write_batch(backend: &dyn StorageBackend, batch: RecordBatch) -> Result<()> {
219        let table_name = table_names::main_edge_table_name();
220        crate::storage::manager::write_batch_with_lance_conflict_retry(backend, table_name, batch)
221            .await
222    }
223
224    /// Ensure default indexes exist on the main edges table.
225    ///
226    /// Checks for existing indexes before creating to avoid expensive
227    /// full-table rebuilds on every flush (LanceDB replaces indexes on create).
228    pub async fn ensure_default_indexes(backend: &dyn StorageBackend) -> Result<()> {
229        let table_name = table_names::main_edge_table_name();
230        let indices = backend.list_indexes(table_name).await?;
231
232        let has_index = |col: &str| {
233            indices
234                .iter()
235                .any(|idx| idx.columns.contains(&col.to_string()))
236        };
237
238        for (column, idx_type) in [
239            ("_eid", ScalarIndexType::BTree),
240            ("src_vid", ScalarIndexType::BTree),
241            ("dst_vid", ScalarIndexType::BTree),
242            ("type", ScalarIndexType::BTree),
243        ] {
244            if has_index(column) {
245                continue;
246            }
247            log::info!("Creating {} index on main_edges", column);
248            if let Err(e) = backend
249                .create_scalar_index(table_name, column, idx_type)
250                .await
251            {
252                log::warn!("Failed to create {} index on main_edges: {}", column, e);
253            }
254        }
255
256        Ok(())
257    }
258
259    /// Query the main edges table for an edge by eid.
260    pub async fn find_by_eid(
261        backend: &dyn StorageBackend,
262        eid: Eid,
263    ) -> Result<Option<(Vid, Vid, String, Properties)>> {
264        let filter = format!("_eid = {}", eid.as_u64());
265        let results = Self::execute_query(backend, &filter, None).await?;
266
267        for batch in results {
268            if batch.num_rows() > 0 {
269                let src_vid_col = batch.column_by_name("src_vid");
270                let dst_vid_col = batch.column_by_name("dst_vid");
271                let type_col = batch.column_by_name("type");
272                let props_col = batch.column_by_name("props_json");
273
274                if let (Some(src), Some(dst), Some(typ), Some(props)) =
275                    (src_vid_col, dst_vid_col, type_col, props_col)
276                    && let (Some(src_arr), Some(dst_arr), Some(type_arr), Some(props_arr)) = (
277                        src.as_any().downcast_ref::<UInt64Array>(),
278                        dst.as_any().downcast_ref::<UInt64Array>(),
279                        typ.as_any().downcast_ref::<arrow_array::StringArray>(),
280                        props
281                            .as_any()
282                            .downcast_ref::<arrow_array::LargeBinaryArray>(),
283                    )
284                {
285                    let src_vid = Vid::from(src_arr.value(0));
286                    let dst_vid = Vid::from(dst_arr.value(0));
287                    let edge_type = type_arr.value(0).to_string();
288                    let properties: Properties = if props_arr.is_null(0)
289                        || props_arr.value(0).is_empty()
290                    {
291                        Properties::new()
292                    } else {
293                        let uni_val = uni_common::cypher_value_codec::decode(props_arr.value(0))
294                            .unwrap_or(uni_common::Value::Null);
295                        let json_val: serde_json::Value = uni_val.into();
296                        serde_json::from_value(json_val).unwrap_or_default()
297                    };
298
299                    return Ok(Some((src_vid, dst_vid, edge_type, properties)));
300                }
301            }
302        }
303
304        Ok(None)
305    }
306
307    /// Check whether an edge exists by EID, regardless of deletion status.
308    ///
309    /// Unlike `find_props_by_eid`, this does NOT filter by `_deleted = false`,
310    /// so it returns true for both active and soft-deleted edges. Used by the
311    /// compaction invariant check to verify dual-writes occurred.
312    pub async fn exists_by_eid(backend: &dyn StorageBackend, eid: Eid) -> Result<bool> {
313        let filter = format!("_eid = {}", eid.as_u64());
314        let batches = Self::execute_query(backend, &filter, Some(vec!["_eid"])).await?;
315        Ok(!batches.is_empty() && batches.iter().any(|b| b.num_rows() > 0))
316    }
317
318    /// Execute a query on the main edges table.
319    ///
320    /// Returns empty vec if table doesn't exist.
321    async fn execute_query(
322        backend: &dyn StorageBackend,
323        filter: &str,
324        columns: Option<Vec<&str>>,
325    ) -> Result<Vec<RecordBatch>> {
326        let table_name = table_names::main_edge_table_name();
327
328        if !backend.table_exists(table_name).await? {
329            return Ok(Vec::new());
330        }
331
332        let mut request = ScanRequest::all(table_name).with_filter(filter);
333        if let Some(cols) = columns {
334            request = request.with_columns(cols.into_iter().map(String::from).collect());
335        }
336
337        backend.scan(request).await
338    }
339
340    /// Extract EIDs from record batches.
341    fn extract_eids(batches: &[RecordBatch]) -> Vec<Eid> {
342        let mut eids = Vec::new();
343        for batch in batches {
344            if let Some(eid_col) = batch.column_by_name("_eid")
345                && let Some(eid_arr) = eid_col.as_any().downcast_ref::<UInt64Array>()
346            {
347                for i in 0..eid_arr.len() {
348                    if !eid_arr.is_null(i) {
349                        eids.push(Eid::new(eid_arr.value(i)));
350                    }
351                }
352            }
353        }
354        eids
355    }
356
357    /// Find all non-deleted EIDs from the main edges table.
358    pub async fn find_all_eids(backend: &dyn StorageBackend) -> Result<Vec<Eid>> {
359        let batches = Self::execute_query(backend, "_deleted = false", Some(vec!["_eid"])).await?;
360        Ok(Self::extract_eids(&batches))
361    }
362
363    /// Find EIDs by type name in the main edges table.
364    pub async fn find_eids_by_type_name(
365        backend: &dyn StorageBackend,
366        type_name: &str,
367    ) -> Result<Vec<Eid>> {
368        let filter = format!(
369            "_deleted = false AND type = '{}'",
370            type_name.replace('\'', "''")
371        );
372        let batches = Self::execute_query(backend, &filter, Some(vec!["_eid"])).await?;
373        Ok(Self::extract_eids(&batches))
374    }
375
376    /// Find properties for an edge by EID in the main edges table.
377    ///
378    /// Returns the props_json parsed into a Properties HashMap if found.
379    /// This is used as a fallback for unknown/schemaless edge types.
380    pub async fn find_props_by_eid(
381        backend: &dyn StorageBackend,
382        eid: Eid,
383    ) -> Result<Option<Properties>> {
384        // MVCC (review C2): the scan must see deletion tombstones — the
385        // highest-version row wins, and a deleted winner yields `None`.
386        // Filtering `_deleted = false` here would let an OLDER live version
387        // resurrect an edge whose tombstone is the true (highest-version)
388        // winner.
389        let filter = format!("_eid = {}", eid.as_u64());
390        let batches = Self::execute_query(
391            backend,
392            &filter,
393            Some(vec!["props_json", "_version", "_deleted"]),
394        )
395        .await?;
396
397        if batches.is_empty() {
398            return Ok(None);
399        }
400
401        // Find the row with highest version (latest), tombstones included.
402        let mut best_props: Option<Properties> = None;
403        let mut best_version: u64 = 0;
404        let mut best_deleted = false;
405
406        for batch in &batches {
407            let props_col = batch.column_by_name("props_json");
408            let version_col = batch.column_by_name("_version");
409            let deleted_col = batch
410                .column_by_name("_deleted")
411                .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>());
412
413            if let (Some(props_arr), Some(ver_arr)) = (
414                props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>()),
415                version_col.and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
416            ) {
417                for i in 0..batch.num_rows() {
418                    let version = if ver_arr.is_null(i) {
419                        0
420                    } else {
421                        ver_arr.value(i)
422                    };
423
424                    if version >= best_version {
425                        best_version = version;
426                        best_deleted = deleted_col.is_some_and(|d| d.value(i));
427                        best_props = if best_deleted {
428                            Some(Properties::new())
429                        } else {
430                            Some(Self::parse_props_json(props_arr, i)?)
431                        };
432                    }
433                }
434            }
435        }
436
437        if best_deleted {
438            return Ok(None);
439        }
440        Ok(best_props)
441    }
442
443    /// Parse props_json from a LargeBinaryArray (JSONB) at the given index.
444    fn parse_props_json(arr: &arrow_array::LargeBinaryArray, idx: usize) -> Result<Properties> {
445        if arr.is_null(idx) || arr.value(idx).is_empty() {
446            return Ok(Properties::new());
447        }
448        let bytes = arr.value(idx);
449        let uni_val = uni_common::cypher_value_codec::decode(bytes)
450            .map_err(|e| anyhow!("Failed to decode CypherValue: {}", e))?;
451        let json_val: serde_json::Value = uni_val.into();
452        serde_json::from_value(json_val).map_err(|e| anyhow!("Failed to parse props_json: {}", e))
453    }
454
455    /// Find edge type name by EID in the main edges table.
456    pub async fn find_type_by_eid(
457        backend: &dyn StorageBackend,
458        eid: Eid,
459    ) -> Result<Option<String>> {
460        // MVCC (review C2): take the highest-version row — including tombstones —
461        // and return `None` if that winner is deleted. The old code filtered
462        // `_deleted = false` and took `value(0)` with no version comparison, so
463        // it could return an arbitrary (and possibly stale) edge type.
464        let filter = format!("_eid = {}", eid.as_u64());
465        let batches =
466            Self::execute_query(backend, &filter, Some(vec!["type", "_version", "_deleted"]))
467                .await?;
468
469        let mut best_type: Option<String> = None;
470        let mut best_version: u64 = 0;
471        let mut best_deleted = false;
472
473        for batch in &batches {
474            let type_col = batch
475                .column_by_name("type")
476                .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>());
477            let ver_col = batch
478                .column_by_name("_version")
479                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>());
480            let deleted_col = batch
481                .column_by_name("_deleted")
482                .and_then(|c| c.as_any().downcast_ref::<arrow_array::BooleanArray>());
483
484            if let (Some(type_arr), Some(ver_arr)) = (type_col, ver_col) {
485                for i in 0..batch.num_rows() {
486                    let version = if ver_arr.is_null(i) {
487                        0
488                    } else {
489                        ver_arr.value(i)
490                    };
491
492                    if version >= best_version {
493                        best_version = version;
494                        best_deleted = deleted_col.is_some_and(|d| d.value(i));
495                        best_type = if best_deleted || type_arr.is_null(i) {
496                            None
497                        } else {
498                            Some(type_arr.value(i).to_string())
499                        };
500                    }
501                }
502            }
503        }
504
505        if best_deleted {
506            return Ok(None);
507        }
508        Ok(best_type)
509    }
510
511    /// Find edge data (eid, src_vid, dst_vid, props) by type name in the main edges table.
512    ///
513    /// Returns all non-deleted edges with the given type name.
514    pub async fn find_edges_by_type_name(
515        backend: &dyn StorageBackend,
516        type_name: &str,
517    ) -> Result<Vec<(Eid, Vid, Vid, Properties)>> {
518        let filter = format!(
519            "_deleted = false AND type = '{}'",
520            type_name.replace('\'', "''")
521        );
522        // Fetch all columns for edge data
523        let batches = Self::execute_query(backend, &filter, None).await?;
524
525        let mut edges = Vec::new();
526        for batch in &batches {
527            Self::extract_edges_from_batch(batch, &mut edges)?;
528        }
529
530        Ok(edges)
531    }
532
533    /// Find edge data (eid, src_vid, dst_vid, edge_type, props) by multiple type names in the main edges table.
534    ///
535    /// Returns all non-deleted edges with any of the given type names.
536    /// This is used for OR relationship type queries like `[:KNOWS|HATES]`.
537    ///
538    /// `endpoint_filter` pushes a bounded endpoint set into the scan (review
539    /// perf #5: a 1-source schemaless traversal used to materialize the whole
540    /// edge type). `None` keeps the full-type scan.
541    pub async fn find_edges_by_type_names(
542        backend: &dyn StorageBackend,
543        type_names: &[&str],
544        endpoint_filter: Option<(EndpointSide, &[Vid])>,
545    ) -> Result<Vec<(Eid, Vid, Vid, String, Properties)>> {
546        if type_names.is_empty() {
547            return Ok(Vec::new());
548        }
549
550        // Build IN clause: type IN ('T1', 'T2', ...)
551        let escaped_types: Vec<String> = type_names
552            .iter()
553            .map(|t| format!("'{}'", t.replace('\'', "''")))
554            .collect();
555        let base_filter = format!(
556            "_deleted = false AND type IN ({})",
557            escaped_types.join(", ")
558        );
559
560        let mut edges = Vec::new();
561        match endpoint_filter {
562            None => {
563                // Fetch all columns for edge data
564                let batches = Self::execute_query(backend, &base_filter, None).await?;
565                for batch in &batches {
566                    Self::extract_edges_with_type_from_batch(batch, &mut edges)?;
567                }
568            }
569            Some((_, [])) => {}
570            Some((side, vids)) => {
571                // Chunked so the filter string stays parseable for large sets.
572                const VID_CHUNK: usize = 8192;
573                for chunk in vids.chunks(VID_CHUNK) {
574                    let list = chunk
575                        .iter()
576                        .map(|v| v.as_u64().to_string())
577                        .collect::<Vec<_>>()
578                        .join(", ");
579                    let endpoint_clause = match side {
580                        EndpointSide::Src => format!("src_vid IN ({list})"),
581                        EndpointSide::Dst => format!("dst_vid IN ({list})"),
582                        EndpointSide::Either => {
583                            format!("(src_vid IN ({list}) OR dst_vid IN ({list}))")
584                        }
585                    };
586                    let filter = format!("{base_filter} AND {endpoint_clause}");
587                    let batches = Self::execute_query(backend, &filter, None).await?;
588                    for batch in &batches {
589                        Self::extract_edges_with_type_from_batch(batch, &mut edges)?;
590                    }
591                }
592            }
593        }
594
595        Ok(edges)
596    }
597
598    /// Extract edge data from a record batch (without edge type).
599    fn extract_edges_from_batch(
600        batch: &RecordBatch,
601        edges: &mut Vec<(Eid, Vid, Vid, Properties)>,
602    ) -> Result<()> {
603        // Reuse the with-type extraction and discard the edge type
604        let mut edges_with_type = Vec::new();
605        Self::extract_edges_with_type_from_batch(batch, &mut edges_with_type)?;
606        edges.extend(
607            edges_with_type
608                .into_iter()
609                .map(|(eid, src, dst, _type, props)| (eid, src, dst, props)),
610        );
611        Ok(())
612    }
613
614    /// Extract edge data with type from a record batch.
615    fn extract_edges_with_type_from_batch(
616        batch: &RecordBatch,
617        edges: &mut Vec<(Eid, Vid, Vid, String, Properties)>,
618    ) -> Result<()> {
619        let Some(eid_arr) = batch
620            .column_by_name("_eid")
621            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
622        else {
623            return Ok(());
624        };
625        let Some(src_arr) = batch
626            .column_by_name("src_vid")
627            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
628        else {
629            return Ok(());
630        };
631        let Some(dst_arr) = batch
632            .column_by_name("dst_vid")
633            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
634        else {
635            return Ok(());
636        };
637        let type_arr = batch
638            .column_by_name("type")
639            .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>());
640        let props_arr = batch
641            .column_by_name("props_json")
642            .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
643
644        for i in 0..batch.num_rows() {
645            if eid_arr.is_null(i) || src_arr.is_null(i) || dst_arr.is_null(i) {
646                continue;
647            }
648
649            let eid = Eid::new(eid_arr.value(i));
650            let src_vid = Vid::new(src_arr.value(i));
651            let dst_vid = Vid::new(dst_arr.value(i));
652            let edge_type = type_arr
653                .filter(|arr| !arr.is_null(i))
654                .map(|arr| arr.value(i).to_string())
655                .unwrap_or_default();
656            let props = props_arr
657                .map(|arr| Self::parse_props_json(arr, i))
658                .transpose()?
659                .unwrap_or_default();
660
661            edges.push((eid, src_vid, dst_vid, edge_type, props));
662        }
663
664        Ok(())
665    }
666}
667
668#[cfg(test)]
669mod tests {
670    use super::*;
671
672    #[test]
673    fn test_main_edge_schema() {
674        let schema = MainEdgeDataset::get_arrow_schema();
675        assert_eq!(schema.fields().len(), 9);
676        assert!(schema.field_with_name("_eid").is_ok());
677        assert!(schema.field_with_name("src_vid").is_ok());
678        assert!(schema.field_with_name("dst_vid").is_ok());
679        assert!(schema.field_with_name("type").is_ok());
680        assert!(schema.field_with_name("props_json").is_ok());
681        assert!(schema.field_with_name("_deleted").is_ok());
682        assert!(schema.field_with_name("_version").is_ok());
683        assert!(schema.field_with_name("_created_at").is_ok());
684        assert!(schema.field_with_name("_updated_at").is_ok());
685    }
686
687    #[test]
688    fn test_build_record_batch() {
689        use uni_common::Value;
690        let mut props = HashMap::new();
691        props.insert("weight".to_string(), Value::Float(0.5));
692
693        let edges = vec![(
694            Eid::new(1),
695            Vid::new(1),
696            Vid::new(2),
697            "KNOWS".to_string(),
698            props,
699            false,
700            1u64,
701        )];
702
703        let batch = MainEdgeDataset::build_record_batch(&edges, None, None).unwrap();
704        assert_eq!(batch.num_rows(), 1);
705        assert_eq!(batch.num_columns(), 9);
706    }
707
708    #[test]
709    fn test_build_record_batch_multiple_edges() {
710        use uni_common::Value;
711
712        let edges = vec![
713            (
714                Eid::new(1),
715                Vid::new(1),
716                Vid::new(2),
717                "KNOWS".to_string(),
718                HashMap::from([("since".to_string(), Value::Int(2020))]),
719                false,
720                1u64,
721            ),
722            (
723                Eid::new(2),
724                Vid::new(2),
725                Vid::new(3),
726                "WORKS_AT".to_string(),
727                HashMap::new(),
728                false,
729                2u64,
730            ),
731            (
732                Eid::new(3),
733                Vid::new(1),
734                Vid::new(3),
735                "KNOWS".to_string(),
736                HashMap::new(),
737                true, // deleted
738                3u64,
739            ),
740        ];
741
742        let batch = MainEdgeDataset::build_record_batch(&edges, None, None).unwrap();
743        assert_eq!(batch.num_rows(), 3);
744        assert_eq!(batch.num_columns(), 9);
745
746        // Verify type column has correct values
747        let type_col = batch
748            .column_by_name("type")
749            .unwrap()
750            .as_any()
751            .downcast_ref::<arrow_array::StringArray>()
752            .unwrap();
753        assert_eq!(type_col.value(0), "KNOWS");
754        assert_eq!(type_col.value(1), "WORKS_AT");
755        assert_eq!(type_col.value(2), "KNOWS");
756    }
757
758    #[test]
759    fn test_build_record_batch_with_timestamps() {
760        let edges = vec![(
761            Eid::new(1),
762            Vid::new(1),
763            Vid::new(2),
764            "KNOWS".to_string(),
765            HashMap::new(),
766            false,
767            1u64,
768        )];
769
770        let mut created_at: HashMap<Eid, i64> = HashMap::new();
771        created_at.insert(Eid::new(1), 1_000_000_000);
772
773        let mut updated_at: HashMap<Eid, i64> = HashMap::new();
774        updated_at.insert(Eid::new(1), 2_000_000_000);
775
776        let batch =
777            MainEdgeDataset::build_record_batch(&edges, Some(&created_at), Some(&updated_at))
778                .unwrap();
779        assert_eq!(batch.num_rows(), 1);
780
781        // Timestamp columns should exist and not be all null
782        let created_col = batch.column_by_name("_created_at").unwrap();
783        assert!(!created_col.is_null(0), "created_at should be populated");
784    }
785
    /// MVCC regression (review C2): a deletion tombstone written at a higher
    /// version must win over the older live row. `find_props_by_eid` filtered
    /// `_deleted = false` before version-ranking (so an older live version
    /// resurrected a deleted edge), and `find_type_by_eid` took `value(0)` with
    /// no version comparison.
    ///
    /// The test writes a live row at version 1, checks both lookups see it,
    /// then appends a tombstone at version 2 for the same eid and checks both
    /// lookups return `None`.
    #[tokio::test]
    async fn test_edge_key_reads_respect_tombstone_winner() {
        use crate::backend::lance::LanceDbBackend;
        use uni_common::Value;

        // Backend rooted in a temp directory; `dir` must stay alive for the
        // duration of the test so the directory is not removed early.
        let dir = tempfile::TempDir::new().unwrap();
        let be = LanceDbBackend::connect(dir.path().to_str().unwrap(), None)
            .await
            .unwrap();
        let backend: &dyn StorageBackend = &be;

        let mut props = HashMap::new();
        props.insert("weight".to_string(), Value::Float(0.5));

        // v1: live edge.
        let live = MainEdgeDataset::build_record_batch(
            &[(
                Eid::new(1),
                Vid::new(1),
                Vid::new(2),
                "KNOWS".to_string(),
                props.clone(),
                false,
                1u64,
            )],
            None,
            None,
        )
        .unwrap();
        MainEdgeDataset::write_batch(backend, live).await.unwrap();

        // Sanity: visible while live.
        assert!(
            MainEdgeDataset::find_props_by_eid(backend, Eid::new(1))
                .await
                .unwrap()
                .is_some()
        );
        assert_eq!(
            MainEdgeDataset::find_type_by_eid(backend, Eid::new(1))
                .await
                .unwrap(),
            Some("KNOWS".to_string())
        );

        // v2: deletion tombstone at a higher version — the winning row.
        // Same eid, endpoints, type and props as v1; only `deleted` and the
        // version differ.
        let dead = MainEdgeDataset::build_record_batch(
            &[(
                Eid::new(1),
                Vid::new(1),
                Vid::new(2),
                "KNOWS".to_string(),
                props,
                true,
                2u64,
            )],
            None,
            None,
        )
        .unwrap();
        MainEdgeDataset::write_batch(backend, dead).await.unwrap();

        // Both rows are now in the table; the tombstone must hide the edge.
        assert_eq!(
            MainEdgeDataset::find_props_by_eid(backend, Eid::new(1))
                .await
                .unwrap(),
            None,
            "deleted (highest-version) winner must not resurrect edge props"
        );
        assert_eq!(
            MainEdgeDataset::find_type_by_eid(backend, Eid::new(1))
                .await
                .unwrap(),
            None,
            "deleted winner must not return an edge type"
        );
    }
868}