Skip to main content

uni_store/storage/
main_edge.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Main edge table for unified edge storage.
5//!
6//! This module implements the main `edges` table as described in STORAGE_DESIGN.md.
7//! The main table contains all edges in the graph with:
8//! - `_eid`: Internal edge ID (primary key)
9//! - `src_vid`: Source vertex ID
10//! - `dst_vid`: Destination vertex ID
11//! - `type`: Edge type name
12//! - `props_json`: All properties as JSONB blob
13//! - `_deleted`: Soft-delete flag
14//! - `_version`: MVCC version
15//! - `_created_at`: Creation timestamp
16//! - `_updated_at`: Update timestamp
17
18use crate::backend::StorageBackend;
19use crate::backend::table_names;
20use crate::backend::types::{ScalarIndexType, ScanRequest};
21use crate::storage::arrow_convert::build_timestamp_column_from_eid_map;
22use anyhow::{Result, anyhow};
23use arrow_array::builder::{LargeBinaryBuilder, StringBuilder};
24use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, UInt64Array};
25use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
26use sha3::{Digest, Sha3_256};
27use std::collections::HashMap;
28use std::sync::Arc;
29use uni_common::Properties;
30use uni_common::core::id::{Eid, UniId, Vid};
31
/// Main edge dataset for the unified `edges` table.
///
/// This table contains all edges regardless of type, providing:
/// - ID-based lookups without knowing the edge type
/// - Unified traversal queries across edge types
///
/// The query and write helpers in this file are associated functions that
/// take a `&dyn StorageBackend`; an instance only records the base URI it
/// was created with.
#[derive(Debug)]
pub struct MainEdgeDataset {
    // Base URI passed to `new`. No method in this file reads it; the leading
    // underscore keeps the dead-code lint quiet.
    _base_uri: String,
}
41
42impl MainEdgeDataset {
43    /// Create a new MainEdgeDataset.
44    pub fn new(base_uri: &str) -> Self {
45        Self {
46            _base_uri: base_uri.to_string(),
47        }
48    }
49
50    /// Compute the content-addressed UID for an edge.
51    ///
52    /// Edge identity is the SHA3-256 of
53    /// `(src_uid, dst_uid, edge_type, sorted_properties)` — the same
54    /// content-addressed pattern as
55    /// `MainVertexDataset::compute_vertex_uid` but extended with
56    /// edge endpoint UIDs and the edge type. This lets fork diff and
57    /// promote distinguish parallel edges between the same endpoints
58    /// when their property bags differ (multi-edge support).
59    ///
60    /// Property iteration is sorted by key for deterministic hashing
61    /// across machines and runs.
62    pub fn compute_edge_uid(
63        src_uid: &UniId,
64        dst_uid: &UniId,
65        edge_type: &str,
66        props: &Properties,
67    ) -> UniId {
68        let mut hasher = Sha3_256::new();
69
70        // Endpoint UIDs first — direction is significant
71        // (src→dst ≠ dst→src for the same property bag).
72        hasher.update(b"src:");
73        hasher.update(src_uid.as_bytes());
74        hasher.update(b"\0");
75        hasher.update(b"dst:");
76        hasher.update(dst_uid.as_bytes());
77        hasher.update(b"\0");
78
79        // Edge type.
80        hasher.update(b"type:");
81        hasher.update(edge_type.as_bytes());
82        hasher.update(b"\0");
83
84        // Properties sorted by key (matches compute_vertex_uid).
85        let mut sorted_keys: Vec<_> = props.keys().collect();
86        sorted_keys.sort();
87        for key in sorted_keys {
88            if let Some(val) = props.get(key) {
89                hasher.update(key.as_bytes());
90                hasher.update(b":");
91                hasher.update(val.to_string().as_bytes());
92                hasher.update(b"\0");
93            }
94        }
95
96        let result = hasher.finalize();
97        UniId::from_bytes(result.into())
98    }
99
100    /// Get the Arrow schema for the main edges table.
101    pub fn get_arrow_schema() -> Arc<ArrowSchema> {
102        Arc::new(ArrowSchema::new(vec![
103            Field::new("_eid", DataType::UInt64, false),
104            Field::new("src_vid", DataType::UInt64, false),
105            Field::new("dst_vid", DataType::UInt64, false),
106            Field::new("type", DataType::Utf8, false),
107            Field::new("props_json", DataType::LargeBinary, true),
108            Field::new("_deleted", DataType::Boolean, false),
109            Field::new("_version", DataType::UInt64, false),
110            Field::new(
111                "_created_at",
112                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
113                true,
114            ),
115            Field::new(
116                "_updated_at",
117                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
118                true,
119            ),
120        ]))
121    }
122
123    /// Get the table name for the main edges table.
124    pub fn table_name() -> &'static str {
125        "edges"
126    }
127
128    /// Build a record batch for the main edges table.
129    ///
130    /// # Arguments
131    /// * `edges` - List of (eid, src_vid, dst_vid, edge_type, properties, deleted, version) tuples
132    /// * `created_at` - Optional map of Eid -> nanoseconds since epoch
133    /// * `updated_at` - Optional map of Eid -> nanoseconds since epoch
134    pub fn build_record_batch(
135        edges: &[(Eid, Vid, Vid, String, Properties, bool, u64)],
136        created_at: Option<&HashMap<Eid, i64>>,
137        updated_at: Option<&HashMap<Eid, i64>>,
138    ) -> Result<RecordBatch> {
139        let arrow_schema = Self::get_arrow_schema();
140        let mut columns: Vec<ArrayRef> = Vec::with_capacity(arrow_schema.fields().len());
141
142        // _eid column
143        let eids: Vec<u64> = edges
144            .iter()
145            .map(|(e, _, _, _, _, _, _)| e.as_u64())
146            .collect();
147        columns.push(Arc::new(UInt64Array::from(eids)));
148
149        // src_vid column
150        let src_vids: Vec<u64> = edges
151            .iter()
152            .map(|(_, s, _, _, _, _, _)| s.as_u64())
153            .collect();
154        columns.push(Arc::new(UInt64Array::from(src_vids)));
155
156        // dst_vid column
157        let dst_vids: Vec<u64> = edges
158            .iter()
159            .map(|(_, _, d, _, _, _, _)| d.as_u64())
160            .collect();
161        columns.push(Arc::new(UInt64Array::from(dst_vids)));
162
163        // type column
164        let mut type_builder = StringBuilder::new();
165        for (_, _, _, edge_type, _, _, _) in edges.iter() {
166            type_builder.append_value(edge_type);
167        }
168        columns.push(Arc::new(type_builder.finish()));
169
170        // props_json column (JSONB binary encoding)
171        let mut props_json_builder = LargeBinaryBuilder::new();
172        for (_, _, _, _, props, _, _) in edges.iter() {
173            let jsonb_bytes = {
174                let json_val = serde_json::to_value(props).unwrap_or(serde_json::json!({}));
175                let uni_val: uni_common::Value = json_val.into();
176                uni_common::cypher_value_codec::encode(&uni_val)
177            };
178            props_json_builder.append_value(&jsonb_bytes);
179        }
180        columns.push(Arc::new(props_json_builder.finish()));
181
182        // _deleted column
183        let deleted: Vec<bool> = edges.iter().map(|(_, _, _, _, _, d, _)| *d).collect();
184        columns.push(Arc::new(BooleanArray::from(deleted)));
185
186        // _version column
187        let versions: Vec<u64> = edges.iter().map(|(_, _, _, _, _, _, v)| *v).collect();
188        columns.push(Arc::new(UInt64Array::from(versions)));
189
190        // _created_at and _updated_at columns using shared builder
191        let eids = edges.iter().map(|(e, _, _, _, _, _, _)| *e);
192        columns.push(build_timestamp_column_from_eid_map(
193            eids.clone(),
194            created_at,
195        ));
196        columns.push(build_timestamp_column_from_eid_map(eids, updated_at));
197
198        RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
199    }
200
201    /// Write a batch to the main edges table.
202    ///
203    /// Creates the table if it doesn't exist, otherwise appends to it.
204    /// Race-safe under async-flush — see
205    /// `crate::storage::manager::write_batch_with_lance_conflict_retry`.
206    pub async fn write_batch(backend: &dyn StorageBackend, batch: RecordBatch) -> Result<()> {
207        let table_name = table_names::main_edge_table_name();
208        crate::storage::manager::write_batch_with_lance_conflict_retry(backend, table_name, batch)
209            .await
210    }
211
212    /// Ensure default indexes exist on the main edges table.
213    ///
214    /// Checks for existing indexes before creating to avoid expensive
215    /// full-table rebuilds on every flush (LanceDB replaces indexes on create).
216    pub async fn ensure_default_indexes(backend: &dyn StorageBackend) -> Result<()> {
217        let table_name = table_names::main_edge_table_name();
218        let indices = backend.list_indexes(table_name).await?;
219
220        let has_index = |col: &str| {
221            indices
222                .iter()
223                .any(|idx| idx.columns.contains(&col.to_string()))
224        };
225
226        for (column, idx_type) in [
227            ("_eid", ScalarIndexType::BTree),
228            ("src_vid", ScalarIndexType::BTree),
229            ("dst_vid", ScalarIndexType::BTree),
230            ("type", ScalarIndexType::BTree),
231        ] {
232            if has_index(column) {
233                continue;
234            }
235            log::info!("Creating {} index on main_edges", column);
236            if let Err(e) = backend
237                .create_scalar_index(table_name, column, idx_type)
238                .await
239            {
240                log::warn!("Failed to create {} index on main_edges: {}", column, e);
241            }
242        }
243
244        Ok(())
245    }
246
247    /// Query the main edges table for an edge by eid.
248    pub async fn find_by_eid(
249        backend: &dyn StorageBackend,
250        eid: Eid,
251    ) -> Result<Option<(Vid, Vid, String, Properties)>> {
252        let filter = format!("_eid = {}", eid.as_u64());
253        let results = Self::execute_query(backend, &filter, None).await?;
254
255        for batch in results {
256            if batch.num_rows() > 0 {
257                let src_vid_col = batch.column_by_name("src_vid");
258                let dst_vid_col = batch.column_by_name("dst_vid");
259                let type_col = batch.column_by_name("type");
260                let props_col = batch.column_by_name("props_json");
261
262                if let (Some(src), Some(dst), Some(typ), Some(props)) =
263                    (src_vid_col, dst_vid_col, type_col, props_col)
264                    && let (Some(src_arr), Some(dst_arr), Some(type_arr), Some(props_arr)) = (
265                        src.as_any().downcast_ref::<UInt64Array>(),
266                        dst.as_any().downcast_ref::<UInt64Array>(),
267                        typ.as_any().downcast_ref::<arrow_array::StringArray>(),
268                        props
269                            .as_any()
270                            .downcast_ref::<arrow_array::LargeBinaryArray>(),
271                    )
272                {
273                    let src_vid = Vid::from(src_arr.value(0));
274                    let dst_vid = Vid::from(dst_arr.value(0));
275                    let edge_type = type_arr.value(0).to_string();
276                    let properties: Properties = if props_arr.is_null(0)
277                        || props_arr.value(0).is_empty()
278                    {
279                        Properties::new()
280                    } else {
281                        let uni_val = uni_common::cypher_value_codec::decode(props_arr.value(0))
282                            .unwrap_or(uni_common::Value::Null);
283                        let json_val: serde_json::Value = uni_val.into();
284                        serde_json::from_value(json_val).unwrap_or_default()
285                    };
286
287                    return Ok(Some((src_vid, dst_vid, edge_type, properties)));
288                }
289            }
290        }
291
292        Ok(None)
293    }
294
295    /// Check whether an edge exists by EID, regardless of deletion status.
296    ///
297    /// Unlike `find_props_by_eid`, this does NOT filter by `_deleted = false`,
298    /// so it returns true for both active and soft-deleted edges. Used by the
299    /// compaction invariant check to verify dual-writes occurred.
300    pub async fn exists_by_eid(backend: &dyn StorageBackend, eid: Eid) -> Result<bool> {
301        let filter = format!("_eid = {}", eid.as_u64());
302        let batches = Self::execute_query(backend, &filter, Some(vec!["_eid"])).await?;
303        Ok(!batches.is_empty() && batches.iter().any(|b| b.num_rows() > 0))
304    }
305
306    /// Execute a query on the main edges table.
307    ///
308    /// Returns empty vec if table doesn't exist.
309    async fn execute_query(
310        backend: &dyn StorageBackend,
311        filter: &str,
312        columns: Option<Vec<&str>>,
313    ) -> Result<Vec<RecordBatch>> {
314        let table_name = table_names::main_edge_table_name();
315
316        if !backend.table_exists(table_name).await? {
317            return Ok(Vec::new());
318        }
319
320        let mut request = ScanRequest::all(table_name).with_filter(filter);
321        if let Some(cols) = columns {
322            request = request.with_columns(cols.into_iter().map(String::from).collect());
323        }
324
325        backend.scan(request).await
326    }
327
328    /// Extract EIDs from record batches.
329    fn extract_eids(batches: &[RecordBatch]) -> Vec<Eid> {
330        let mut eids = Vec::new();
331        for batch in batches {
332            if let Some(eid_col) = batch.column_by_name("_eid")
333                && let Some(eid_arr) = eid_col.as_any().downcast_ref::<UInt64Array>()
334            {
335                for i in 0..eid_arr.len() {
336                    if !eid_arr.is_null(i) {
337                        eids.push(Eid::new(eid_arr.value(i)));
338                    }
339                }
340            }
341        }
342        eids
343    }
344
345    /// Find all non-deleted EIDs from the main edges table.
346    pub async fn find_all_eids(backend: &dyn StorageBackend) -> Result<Vec<Eid>> {
347        let batches = Self::execute_query(backend, "_deleted = false", Some(vec!["_eid"])).await?;
348        Ok(Self::extract_eids(&batches))
349    }
350
351    /// Find EIDs by type name in the main edges table.
352    pub async fn find_eids_by_type_name(
353        backend: &dyn StorageBackend,
354        type_name: &str,
355    ) -> Result<Vec<Eid>> {
356        let filter = format!(
357            "_deleted = false AND type = '{}'",
358            type_name.replace('\'', "''")
359        );
360        let batches = Self::execute_query(backend, &filter, Some(vec!["_eid"])).await?;
361        Ok(Self::extract_eids(&batches))
362    }
363
364    /// Find properties for an edge by EID in the main edges table.
365    ///
366    /// Returns the props_json parsed into a Properties HashMap if found.
367    /// This is used as a fallback for unknown/schemaless edge types.
368    pub async fn find_props_by_eid(
369        backend: &dyn StorageBackend,
370        eid: Eid,
371    ) -> Result<Option<Properties>> {
372        let filter = format!("_eid = {} AND _deleted = false", eid.as_u64());
373        let batches =
374            Self::execute_query(backend, &filter, Some(vec!["props_json", "_version"])).await?;
375
376        if batches.is_empty() {
377            return Ok(None);
378        }
379
380        // Find the row with highest version (latest)
381        let mut best_props: Option<Properties> = None;
382        let mut best_version: u64 = 0;
383
384        for batch in &batches {
385            let props_col = batch.column_by_name("props_json");
386            let version_col = batch.column_by_name("_version");
387
388            if let (Some(props_arr), Some(ver_arr)) = (
389                props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>()),
390                version_col.and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
391            ) {
392                for i in 0..batch.num_rows() {
393                    let version = if ver_arr.is_null(i) {
394                        0
395                    } else {
396                        ver_arr.value(i)
397                    };
398
399                    if version >= best_version {
400                        best_version = version;
401                        best_props = Some(Self::parse_props_json(props_arr, i)?);
402                    }
403                }
404            }
405        }
406
407        Ok(best_props)
408    }
409
410    /// Parse props_json from a LargeBinaryArray (JSONB) at the given index.
411    fn parse_props_json(arr: &arrow_array::LargeBinaryArray, idx: usize) -> Result<Properties> {
412        if arr.is_null(idx) || arr.value(idx).is_empty() {
413            return Ok(Properties::new());
414        }
415        let bytes = arr.value(idx);
416        let uni_val = uni_common::cypher_value_codec::decode(bytes)
417            .map_err(|e| anyhow!("Failed to decode CypherValue: {}", e))?;
418        let json_val: serde_json::Value = uni_val.into();
419        serde_json::from_value(json_val).map_err(|e| anyhow!("Failed to parse props_json: {}", e))
420    }
421
422    /// Find edge type name by EID in the main edges table.
423    pub async fn find_type_by_eid(
424        backend: &dyn StorageBackend,
425        eid: Eid,
426    ) -> Result<Option<String>> {
427        let filter = format!("_eid = {} AND _deleted = false", eid.as_u64());
428        let batches = Self::execute_query(backend, &filter, Some(vec!["type"])).await?;
429
430        for batch in batches {
431            if batch.num_rows() > 0
432                && let Some(type_col) = batch.column_by_name("type")
433                && let Some(type_arr) = type_col.as_any().downcast_ref::<arrow_array::StringArray>()
434                && !type_arr.is_null(0)
435            {
436                return Ok(Some(type_arr.value(0).to_string()));
437            }
438        }
439
440        Ok(None)
441    }
442
443    /// Find edge data (eid, src_vid, dst_vid, props) by type name in the main edges table.
444    ///
445    /// Returns all non-deleted edges with the given type name.
446    pub async fn find_edges_by_type_name(
447        backend: &dyn StorageBackend,
448        type_name: &str,
449    ) -> Result<Vec<(Eid, Vid, Vid, Properties)>> {
450        let filter = format!(
451            "_deleted = false AND type = '{}'",
452            type_name.replace('\'', "''")
453        );
454        // Fetch all columns for edge data
455        let batches = Self::execute_query(backend, &filter, None).await?;
456
457        let mut edges = Vec::new();
458        for batch in &batches {
459            Self::extract_edges_from_batch(batch, &mut edges)?;
460        }
461
462        Ok(edges)
463    }
464
465    /// Find edge data (eid, src_vid, dst_vid, edge_type, props) by multiple type names in the main edges table.
466    ///
467    /// Returns all non-deleted edges with any of the given type names.
468    /// This is used for OR relationship type queries like `[:KNOWS|HATES]`.
469    pub async fn find_edges_by_type_names(
470        backend: &dyn StorageBackend,
471        type_names: &[&str],
472    ) -> Result<Vec<(Eid, Vid, Vid, String, Properties)>> {
473        if type_names.is_empty() {
474            return Ok(Vec::new());
475        }
476
477        // Build IN clause: type IN ('T1', 'T2', ...)
478        let escaped_types: Vec<String> = type_names
479            .iter()
480            .map(|t| format!("'{}'", t.replace('\'', "''")))
481            .collect();
482        let filter = format!(
483            "_deleted = false AND type IN ({})",
484            escaped_types.join(", ")
485        );
486
487        // Fetch all columns for edge data
488        let batches = Self::execute_query(backend, &filter, None).await?;
489
490        let mut edges = Vec::new();
491        for batch in &batches {
492            Self::extract_edges_with_type_from_batch(batch, &mut edges)?;
493        }
494
495        Ok(edges)
496    }
497
498    /// Extract edge data from a record batch (without edge type).
499    fn extract_edges_from_batch(
500        batch: &RecordBatch,
501        edges: &mut Vec<(Eid, Vid, Vid, Properties)>,
502    ) -> Result<()> {
503        // Reuse the with-type extraction and discard the edge type
504        let mut edges_with_type = Vec::new();
505        Self::extract_edges_with_type_from_batch(batch, &mut edges_with_type)?;
506        edges.extend(
507            edges_with_type
508                .into_iter()
509                .map(|(eid, src, dst, _type, props)| (eid, src, dst, props)),
510        );
511        Ok(())
512    }
513
514    /// Extract edge data with type from a record batch.
515    fn extract_edges_with_type_from_batch(
516        batch: &RecordBatch,
517        edges: &mut Vec<(Eid, Vid, Vid, String, Properties)>,
518    ) -> Result<()> {
519        let Some(eid_arr) = batch
520            .column_by_name("_eid")
521            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
522        else {
523            return Ok(());
524        };
525        let Some(src_arr) = batch
526            .column_by_name("src_vid")
527            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
528        else {
529            return Ok(());
530        };
531        let Some(dst_arr) = batch
532            .column_by_name("dst_vid")
533            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
534        else {
535            return Ok(());
536        };
537        let type_arr = batch
538            .column_by_name("type")
539            .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>());
540        let props_arr = batch
541            .column_by_name("props_json")
542            .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
543
544        for i in 0..batch.num_rows() {
545            if eid_arr.is_null(i) || src_arr.is_null(i) || dst_arr.is_null(i) {
546                continue;
547            }
548
549            let eid = Eid::new(eid_arr.value(i));
550            let src_vid = Vid::new(src_arr.value(i));
551            let dst_vid = Vid::new(dst_arr.value(i));
552            let edge_type = type_arr
553                .filter(|arr| !arr.is_null(i))
554                .map(|arr| arr.value(i).to_string())
555                .unwrap_or_default();
556            let props = props_arr
557                .map(|arr| Self::parse_props_json(arr, i))
558                .transpose()?
559                .unwrap_or_default();
560
561            edges.push((eid, src_vid, dst_vid, edge_type, props));
562        }
563
564        Ok(())
565    }
566}
567
#[cfg(test)]
mod tests {
    use super::*;
    use uni_common::Value;

    #[test]
    fn test_main_edge_schema() {
        let schema = MainEdgeDataset::get_arrow_schema();
        assert_eq!(schema.fields().len(), 9);

        // Column order is part of the on-disk layout.
        let names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            names,
            [
                "_eid",
                "src_vid",
                "dst_vid",
                "type",
                "props_json",
                "_deleted",
                "_version",
                "_created_at",
                "_updated_at",
            ]
        );

        assert!(!schema.field_with_name("_eid").unwrap().is_nullable());
        assert!(schema.field_with_name("props_json").unwrap().is_nullable());
        assert!(schema.field_with_name("_created_at").unwrap().is_nullable());
        assert!(schema.field_with_name("_updated_at").unwrap().is_nullable());
    }

    #[test]
    fn test_build_record_batch() {
        let mut props = HashMap::new();
        props.insert("weight".to_string(), Value::Float(0.5));

        let edges = vec![(
            Eid::new(1),
            Vid::new(1),
            Vid::new(2),
            "KNOWS".to_string(),
            props,
            false,
            1u64,
        )];

        let batch = MainEdgeDataset::build_record_batch(&edges, None, None).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 9);
    }

    #[test]
    fn test_build_record_batch_multiple_edges() {
        let edges = vec![
            (
                Eid::new(1),
                Vid::new(1),
                Vid::new(2),
                "KNOWS".to_string(),
                HashMap::from([("since".to_string(), Value::Int(2020))]),
                false,
                1u64,
            ),
            (
                Eid::new(2),
                Vid::new(2),
                Vid::new(3),
                "WORKS_AT".to_string(),
                HashMap::new(),
                false,
                2u64,
            ),
            (
                Eid::new(3),
                Vid::new(1),
                Vid::new(3),
                "KNOWS".to_string(),
                HashMap::new(),
                true, // deleted
                3u64,
            ),
        ];

        let batch = MainEdgeDataset::build_record_batch(&edges, None, None).unwrap();
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 9);

        // Verify type column has correct values
        let type_col = batch
            .column_by_name("type")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(type_col.value(0), "KNOWS");
        assert_eq!(type_col.value(1), "WORKS_AT");
        assert_eq!(type_col.value(2), "KNOWS");

        // Verify the soft-delete flag is carried through per row
        let deleted_col = batch
            .column_by_name("_deleted")
            .unwrap()
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(!deleted_col.value(0));
        assert!(!deleted_col.value(1));
        assert!(deleted_col.value(2));
    }

    #[test]
    fn test_build_record_batch_with_timestamps() {
        let edges = vec![(
            Eid::new(1),
            Vid::new(1),
            Vid::new(2),
            "KNOWS".to_string(),
            HashMap::new(),
            false,
            1u64,
        )];

        let mut created_at: HashMap<Eid, i64> = HashMap::new();
        created_at.insert(Eid::new(1), 1_000_000_000);

        let mut updated_at: HashMap<Eid, i64> = HashMap::new();
        updated_at.insert(Eid::new(1), 2_000_000_000);

        let batch =
            MainEdgeDataset::build_record_batch(&edges, Some(&created_at), Some(&updated_at))
                .unwrap();
        assert_eq!(batch.num_rows(), 1);

        // Both timestamp columns should be populated with the supplied values.
        let timestamp_at = |name: &str| {
            let col = batch
                .column_by_name(name)
                .unwrap()
                .as_any()
                .downcast_ref::<arrow_array::TimestampNanosecondArray>()
                .unwrap();
            assert!(!col.is_null(0), "{name} should be populated");
            col.value(0)
        };
        assert_eq!(timestamp_at("_created_at"), 1_000_000_000);
        assert_eq!(timestamp_at("_updated_at"), 2_000_000_000);
    }

    #[test]
    fn test_props_json_round_trip() {
        let edges = vec![(
            Eid::new(1),
            Vid::new(1),
            Vid::new(2),
            "KNOWS".to_string(),
            HashMap::from([("since".to_string(), Value::Int(2020))]),
            false,
            1u64,
        )];

        let batch = MainEdgeDataset::build_record_batch(&edges, None, None).unwrap();
        let props_col = batch
            .column_by_name("props_json")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::LargeBinaryArray>()
            .unwrap();

        let decoded = MainEdgeDataset::parse_props_json(props_col, 0).unwrap();
        assert_eq!(decoded.len(), 1);
        assert!(matches!(decoded.get("since"), Some(Value::Int(2020))));
    }

    #[test]
    fn test_edge_uid_is_deterministic_and_order_independent() {
        let src = UniId::from_bytes([1u8; 32]);
        let dst = UniId::from_bytes([2u8; 32]);

        let mut first = Properties::new();
        first.insert("since".to_string(), Value::Int(2020));
        first.insert("weight".to_string(), Value::Int(3));

        let mut second = Properties::new();
        second.insert("weight".to_string(), Value::Int(3));
        second.insert("since".to_string(), Value::Int(2020));

        let a = MainEdgeDataset::compute_edge_uid(&src, &dst, "KNOWS", &first);
        let b = MainEdgeDataset::compute_edge_uid(&src, &dst, "KNOWS", &second);
        assert_eq!(a.as_bytes(), b.as_bytes());
    }

    #[test]
    fn test_edge_uid_depends_on_direction_and_type() {
        let src = UniId::from_bytes([1u8; 32]);
        let dst = UniId::from_bytes([2u8; 32]);
        let props = Properties::new();

        let forward = MainEdgeDataset::compute_edge_uid(&src, &dst, "KNOWS", &props);
        let reverse = MainEdgeDataset::compute_edge_uid(&dst, &src, "KNOWS", &props);
        let other_type = MainEdgeDataset::compute_edge_uid(&src, &dst, "LIKES", &props);

        assert_ne!(forward.as_bytes(), reverse.as_bytes());
        assert_ne!(forward.as_bytes(), other_type.as_bytes());
    }
}