Skip to main content

uni_query/query/df_graph/
catalog_scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Physical execution plans that dispatch graph reads against
5//! plugin-registered `CatalogTable`s (M5 Batch 2 follow-up #6 — virtual
6//! label-id allocation).
7//!
8//! When the planner encounters a `MATCH (n:External)` whose label is
9//! not in the native schema, follow-up #5 consults registered
10//! `CatalogProvider` / `ReplacementScanProvider`s for a claim. This
11//! file implements the "what happens after the claim succeeds" leg:
12//! a virtual `u16` label-id is allocated on `PluginRegistry`, the
13//! claiming `CatalogTable` is stashed alongside, and at physical-plan
14//! time `CatalogVertexScanExec` adapts that table's rows into the
15//! graph-row schema convention every downstream operator expects
16//! (`{var}._vid`, `{var}._labels`, `{var}.<prop>` columns).
17//!
18//! ## Adaptation contract
19//!
20//! - `_vid` (UInt64) is **synthesized** per row as
21//!   `(virtual_label_id as u64) << 48 | row_offset`. The high-16-bit
22//!   encoding makes virtual vids unambiguously distinguishable from
23//!   native vids (sequentially allocated from 0, well below
24//!   `0xFF00_0000_0000_0000`). Row offset increments across batches
25//!   within a single `execute()` call via an `AtomicU64`.
26//! - `_labels` is **synthesized** as a single-element `[label_name]`
27//!   `List<Utf8>` per row.
28//! - Property columns are projected from the catalog table's columns
29//!   by name match (`prop` ↔ table column named `prop`), then renamed
30//!   to the `{var}.{prop}` convention. Properties the table does not
31//!   expose materialize as null columns of `Utf8` type (loose typing
32//!   for now; tighten when the planner gains a property-type oracle
33//!   for virtual labels).
34//! - Reserved system column names (`_vid`, `_labels`, etc., and any
35//!   name starting with `_`) on the catalog table are rejected at
36//!   constructor time — they would collide with our synthesized
37//!   columns and silently produce wrong results.
38//!
39//! ## Edges
40//!
//! `CatalogEdgeScanExec` follows the same pattern but synthesizes
//! `_eid`, `_src_vid`, `_dst_vid` columns; `_eid` is built as
//! `(virtual_type_id as u64) << 32 | row_offset`. The catalog table MUST
//! declare `src_id` and `dst_id` columns (Int64, UInt64, or UInt32) so the
//! exec can populate `_src_vid`/`_dst_vid`. Without them the
//! constructor errors immediately.
46
47use std::any::Any;
48use std::collections::HashMap;
49use std::fmt;
50use std::pin::Pin;
51use std::sync::Arc;
52use std::sync::atomic::{AtomicU64, Ordering};
53use std::task::{Context, Poll};
54
55use arrow_array::builder::ListBuilder;
56use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt64Array};
57use arrow_schema::{DataType, Field, Schema, SchemaRef};
58use datafusion::common::Result as DFResult;
59use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
60use datafusion::logical_expr::Expr as DfExpr;
61use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
62use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
63use futures::Stream;
64use uni_plugin::traits::catalog::CatalogTable;
65
66use crate::query::df_graph::common::{compute_plan_properties, labels_data_type};
67
/// Returns the prefix shared by every synthesized vid of one virtual label:
/// the label id occupies bits 48..64 and the low 48 bits (the per-row
/// offset) are left zero for the caller to fill in.
#[inline]
fn virtual_vid_base(virtual_label_id: u16) -> u64 {
    let label_bits = u64::from(virtual_label_id);
    label_bits << 48
}
73
74/// Verify the catalog table's schema has no reserved column names.
75/// Returns the offending name on failure so the caller can surface it.
76fn check_no_reserved_columns(schema: &SchemaRef) -> Result<(), String> {
77    for field in schema.fields() {
78        if field.name().starts_with('_') {
79            return Err(field.name().clone());
80        }
81    }
82    Ok(())
83}
84
85// ── Vertex scan ──────────────────────────────────────────────────────
86
87/// Adapts a virtual-label `CatalogTable` into a graph-row-shaped
88/// vertex scan. See module docs for the adaptation contract.
89pub struct CatalogVertexScanExec {
90    table: Arc<dyn CatalogTable>,
91    virtual_label_id: u16,
92    label_name: String,
93    variable: String,
94    /// Properties to project, in output order. Each must either match
95    /// a catalog-table column name (case-sensitive) or be served as a
96    /// nullable `Utf8` column of nulls.
97    properties: Vec<String>,
98    /// DataFusion filter expressions to pass to `table.scan(filters=)`.
99    /// The catalog is free to ignore them; the planner re-applies the
100    /// same predicates as a top-level `FilterExec` for safety.
101    pushdown_filters: Vec<DfExpr>,
102    /// Limit to pass to `table.scan(limit=)`; same "advisory" semantics.
103    pushdown_limit: Option<usize>,
104    /// Output schema with graph-row convention.
105    schema: SchemaRef,
106    properties_plan: Arc<PlanProperties>,
107    metrics: ExecutionPlanMetricsSet,
108}
109
110impl fmt::Debug for CatalogVertexScanExec {
111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112        f.debug_struct("CatalogVertexScanExec")
113            .field("label_name", &self.label_name)
114            .field(
115                "virtual_label_id",
116                &format_args!("{:#x}", self.virtual_label_id),
117            )
118            .field("variable", &self.variable)
119            .field("properties", &self.properties)
120            .field("pushdown_filters", &self.pushdown_filters.len())
121            .field("pushdown_limit", &self.pushdown_limit)
122            .finish()
123    }
124}
125
126impl CatalogVertexScanExec {
127    /// Construct a new catalog-backed vertex scan.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if the catalog table's schema contains a column
132    /// whose name starts with `_` (reserved for synthesized graph-row
133    /// system columns).
134    pub fn try_new(
135        table: Arc<dyn CatalogTable>,
136        virtual_label_id: u16,
137        label_name: impl Into<String>,
138        variable: impl Into<String>,
139        properties: Vec<String>,
140        pushdown_filters: Vec<DfExpr>,
141        pushdown_limit: Option<usize>,
142    ) -> anyhow::Result<Self> {
143        let label_name = label_name.into();
144        let variable = variable.into();
145        let table_schema = table.schema();
146        if let Err(bad) = check_no_reserved_columns(&table_schema) {
147            return Err(anyhow::anyhow!(
148                "CatalogTable for label `{label_name}` declares reserved column \
149                 `{bad}` (names starting with `_` are synthesized by the graph-row \
150                 adapter — rename it in the underlying table)"
151            ));
152        }
153        let schema = Self::build_output_schema(&variable, &properties, &table_schema);
154        let properties_plan = compute_plan_properties(schema.clone());
155        Ok(Self {
156            table,
157            virtual_label_id,
158            label_name,
159            variable,
160            properties,
161            pushdown_filters,
162            pushdown_limit,
163            schema,
164            properties_plan,
165            metrics: ExecutionPlanMetricsSet::new(),
166        })
167    }
168
169    fn build_output_schema(
170        variable: &str,
171        properties: &[String],
172        table_schema: &SchemaRef,
173    ) -> SchemaRef {
174        let mut fields = vec![
175            Field::new(format!("{variable}._vid"), DataType::UInt64, false),
176            Field::new(format!("{variable}._labels"), labels_data_type(), false),
177        ];
178        let table_by_name: HashMap<&str, &Field> = table_schema
179            .fields()
180            .iter()
181            .map(|f| (f.name().as_str(), f.as_ref()))
182            .collect();
183        for prop in properties {
184            let col_name = format!("{variable}.{prop}");
185            let (dtype, nullable) = match table_by_name.get(prop.as_str()) {
186                Some(f) => (f.data_type().clone(), true),
187                None => (DataType::Utf8, true),
188            };
189            fields.push(Field::new(&col_name, dtype, nullable));
190        }
191        Arc::new(Schema::new(fields))
192    }
193}
194
195impl DisplayAs for CatalogVertexScanExec {
196    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197        write!(
198            f,
199            "CatalogVertexScanExec: label={}, virtual_id={:#x}, variable={}, props={:?}",
200            self.label_name, self.virtual_label_id, self.variable, self.properties
201        )?;
202        if !self.pushdown_filters.is_empty() {
203            write!(f, ", filters={}", self.pushdown_filters.len())?;
204        }
205        if let Some(lim) = self.pushdown_limit {
206            write!(f, ", limit={lim}")?;
207        }
208        Ok(())
209    }
210}
211
212impl ExecutionPlan for CatalogVertexScanExec {
213    fn name(&self) -> &str {
214        "CatalogVertexScanExec"
215    }
216
217    fn as_any(&self) -> &dyn Any {
218        self
219    }
220
221    fn schema(&self) -> SchemaRef {
222        self.schema.clone()
223    }
224
225    fn properties(&self) -> &Arc<PlanProperties> {
226        &self.properties_plan
227    }
228
229    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
230        vec![]
231    }
232
233    fn with_new_children(
234        self: Arc<Self>,
235        children: Vec<Arc<dyn ExecutionPlan>>,
236    ) -> DFResult<Arc<dyn ExecutionPlan>> {
237        if !children.is_empty() {
238            return Err(datafusion::error::DataFusionError::Plan(
239                "CatalogVertexScanExec has no children".into(),
240            ));
241        }
242        Ok(self)
243    }
244
245    fn execute(
246        &self,
247        partition: usize,
248        _context: Arc<TaskContext>,
249    ) -> DFResult<SendableRecordBatchStream> {
250        let metrics = BaselineMetrics::new(&self.metrics, partition);
251        // Build projection: indices into the catalog table's schema for
252        // every property name we actually want. Properties the table
253        // doesn't expose are populated as null columns by the adapter
254        // (no projection index).
255        let table_schema = self.table.schema();
256        let projection: Vec<usize> = self
257            .properties
258            .iter()
259            .filter_map(|p| table_schema.index_of(p).ok())
260            .collect();
261        let projection_opt = if projection.is_empty() {
262            None
263        } else {
264            Some(projection.as_slice())
265        };
266        let stream = self
267            .table
268            .scan(projection_opt, &self.pushdown_filters, self.pushdown_limit)
269            .map_err(|e| {
270                datafusion::error::DataFusionError::Execution(format!(
271                    "CatalogTable::scan failed: {e}"
272                ))
273            })?;
274        Ok(Box::pin(VertexAdapterStream {
275            inner: stream,
276            output_schema: self.schema.clone(),
277            virtual_label_id: self.virtual_label_id,
278            label_name: self.label_name.clone(),
279            variable: self.variable.clone(),
280            properties: self.properties.clone(),
281            next_offset: AtomicU64::new(0),
282            metrics,
283        }))
284    }
285
286    fn metrics(&self) -> Option<MetricsSet> {
287        Some(self.metrics.clone_inner())
288    }
289}
290
291struct VertexAdapterStream {
292    inner: SendableRecordBatchStream,
293    output_schema: SchemaRef,
294    virtual_label_id: u16,
295    label_name: String,
296    variable: String,
297    properties: Vec<String>,
298    next_offset: AtomicU64,
299    metrics: BaselineMetrics,
300}
301
302impl RecordBatchStream for VertexAdapterStream {
303    fn schema(&self) -> SchemaRef {
304        self.output_schema.clone()
305    }
306}
307
308impl Stream for VertexAdapterStream {
309    type Item = DFResult<RecordBatch>;
310
311    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
312        match Pin::new(&mut self.inner).poll_next(cx) {
313            Poll::Pending => Poll::Pending,
314            Poll::Ready(None) => Poll::Ready(None),
315            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
316            Poll::Ready(Some(Ok(batch))) => {
317                let row_count = batch.num_rows();
318                let base = virtual_vid_base(self.virtual_label_id)
319                    | self
320                        .next_offset
321                        .fetch_add(row_count as u64, Ordering::SeqCst);
322                let adapted = adapt_vertex_batch(
323                    &batch,
324                    &self.output_schema,
325                    base,
326                    &self.label_name,
327                    &self.variable,
328                    &self.properties,
329                );
330                self.metrics.record_output(row_count);
331                Poll::Ready(Some(adapted))
332            }
333        }
334    }
335}
336
337/// Build a graph-row-shaped batch from the catalog table's batch. The
338/// `vid_start` is the value of `_vid` for the first row.
339fn adapt_vertex_batch(
340    in_batch: &RecordBatch,
341    output_schema: &SchemaRef,
342    vid_start: u64,
343    label_name: &str,
344    variable: &str,
345    properties: &[String],
346) -> DFResult<RecordBatch> {
347    let n = in_batch.num_rows();
348    let vid_array: ArrayRef = Arc::new(UInt64Array::from_iter_values(
349        (0..n as u64).map(|i| vid_start + i),
350    ));
351    let labels_array: ArrayRef = {
352        let mut b = ListBuilder::new(arrow_array::builder::StringBuilder::new());
353        for _ in 0..n {
354            b.values().append_value(label_name);
355            b.append(true);
356        }
357        Arc::new(b.finish())
358    };
359    let in_schema = in_batch.schema();
360    let in_by_name: HashMap<&str, ArrayRef> = in_schema
361        .fields()
362        .iter()
363        .enumerate()
364        .map(|(i, f)| (f.name().as_str(), in_batch.column(i).clone()))
365        .collect();
366    let _ = variable; // already embedded in output_schema field names
367    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
368    columns.push(vid_array);
369    columns.push(labels_array);
370    for prop in properties {
371        let col = in_by_name
372            .get(prop.as_str())
373            .cloned()
374            .unwrap_or_else(|| Arc::new(StringArray::new_null(n)));
375        columns.push(col);
376    }
377    RecordBatch::try_new(output_schema.clone(), columns).map_err(|e| {
378        datafusion::error::DataFusionError::Execution(format!(
379            "CatalogVertexScanExec: failed to assemble adapted batch: {e}"
380        ))
381    })
382}
383
384// ── Edge scan ────────────────────────────────────────────────────────
385
386/// Adapts a virtual-edge-type `CatalogTable` into a graph-row-shaped
387/// edge scan. The table MUST declare `src_id` and `dst_id` columns;
388/// `_eid` is synthesized per row from the virtual edge-type id.
389pub struct CatalogEdgeScanExec {
390    table: Arc<dyn CatalogTable>,
391    virtual_type_id: u32,
392    type_name: String,
393    variable: String,
394    properties: Vec<String>,
395    pushdown_filters: Vec<DfExpr>,
396    pushdown_limit: Option<usize>,
397    schema: SchemaRef,
398    properties_plan: Arc<PlanProperties>,
399    metrics: ExecutionPlanMetricsSet,
400}
401
402impl fmt::Debug for CatalogEdgeScanExec {
403    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404        f.debug_struct("CatalogEdgeScanExec")
405            .field("type_name", &self.type_name)
406            .field(
407                "virtual_type_id",
408                &format_args!("{:#x}", self.virtual_type_id),
409            )
410            .field("variable", &self.variable)
411            .field("properties", &self.properties)
412            .finish()
413    }
414}
415
416impl CatalogEdgeScanExec {
417    /// # Errors
418    /// Returns an error if the table's schema lacks `src_id`/`dst_id`,
419    /// or if it declares a column whose name starts with `_`.
420    pub fn try_new(
421        table: Arc<dyn CatalogTable>,
422        virtual_type_id: u32,
423        type_name: impl Into<String>,
424        variable: impl Into<String>,
425        properties: Vec<String>,
426        pushdown_filters: Vec<DfExpr>,
427        pushdown_limit: Option<usize>,
428    ) -> anyhow::Result<Self> {
429        let type_name = type_name.into();
430        let variable = variable.into();
431        let table_schema = table.schema();
432        if let Err(bad) = check_no_reserved_columns(&table_schema) {
433            return Err(anyhow::anyhow!(
434                "CatalogTable for edge type `{type_name}` declares reserved column \
435                 `{bad}` (names starting with `_` are synthesized by the graph-row adapter)"
436            ));
437        }
438        for required in ["src_id", "dst_id"] {
439            if table_schema.index_of(required).is_err() {
440                return Err(anyhow::anyhow!(
441                    "CatalogTable for edge type `{type_name}` must declare a \
442                     `{required}` column (mapped to `_{}_vid` in the graph-row \
443                     adapter)",
444                    if required == "src_id" { "src" } else { "dst" }
445                ));
446            }
447        }
448        let schema = Self::build_output_schema(&variable, &properties, &table_schema);
449        let properties_plan = compute_plan_properties(schema.clone());
450        Ok(Self {
451            table,
452            virtual_type_id,
453            type_name,
454            variable,
455            properties,
456            pushdown_filters,
457            pushdown_limit,
458            schema,
459            properties_plan,
460            metrics: ExecutionPlanMetricsSet::new(),
461        })
462    }
463
464    fn build_output_schema(
465        variable: &str,
466        properties: &[String],
467        table_schema: &SchemaRef,
468    ) -> SchemaRef {
469        let mut fields = vec![
470            Field::new(format!("{variable}._eid"), DataType::UInt64, false),
471            Field::new(format!("{variable}._src_vid"), DataType::UInt64, false),
472            Field::new(format!("{variable}._dst_vid"), DataType::UInt64, false),
473        ];
474        let table_by_name: HashMap<&str, &Field> = table_schema
475            .fields()
476            .iter()
477            .map(|f| (f.name().as_str(), f.as_ref()))
478            .collect();
479        for prop in properties {
480            if prop == "src_id" || prop == "dst_id" {
481                // These are surfaced via the synthesized `_src_vid` /
482                // `_dst_vid` system columns; don't double-project.
483                continue;
484            }
485            let col_name = format!("{variable}.{prop}");
486            let (dtype, nullable) = match table_by_name.get(prop.as_str()) {
487                Some(f) => (f.data_type().clone(), true),
488                None => (DataType::Utf8, true),
489            };
490            fields.push(Field::new(&col_name, dtype, nullable));
491        }
492        Arc::new(Schema::new(fields))
493    }
494}
495
496impl DisplayAs for CatalogEdgeScanExec {
497    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
498        write!(
499            f,
500            "CatalogEdgeScanExec: type={}, virtual_id={:#x}, variable={}, props={:?}",
501            self.type_name, self.virtual_type_id, self.variable, self.properties
502        )
503    }
504}
505
506impl ExecutionPlan for CatalogEdgeScanExec {
507    fn name(&self) -> &str {
508        "CatalogEdgeScanExec"
509    }
510    fn as_any(&self) -> &dyn Any {
511        self
512    }
513    fn schema(&self) -> SchemaRef {
514        self.schema.clone()
515    }
516    fn properties(&self) -> &Arc<PlanProperties> {
517        &self.properties_plan
518    }
519    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
520        vec![]
521    }
522    fn with_new_children(
523        self: Arc<Self>,
524        children: Vec<Arc<dyn ExecutionPlan>>,
525    ) -> DFResult<Arc<dyn ExecutionPlan>> {
526        if !children.is_empty() {
527            return Err(datafusion::error::DataFusionError::Plan(
528                "CatalogEdgeScanExec has no children".into(),
529            ));
530        }
531        Ok(self)
532    }
533    fn execute(
534        &self,
535        partition: usize,
536        _context: Arc<TaskContext>,
537    ) -> DFResult<SendableRecordBatchStream> {
538        let metrics = BaselineMetrics::new(&self.metrics, partition);
539        let table_schema = self.table.schema();
540        // The projection must include src_id/dst_id (so the adapter
541        // can populate _src_vid/_dst_vid) plus the requested
542        // properties. Build the projection list deterministically.
543        let mut wanted: Vec<&str> = vec!["src_id", "dst_id"];
544        for p in &self.properties {
545            if p != "src_id" && p != "dst_id" {
546                wanted.push(p.as_str());
547            }
548        }
549        let projection: Vec<usize> = wanted
550            .iter()
551            .filter_map(|p| table_schema.index_of(p).ok())
552            .collect();
553        let projection_opt = if projection.is_empty() {
554            None
555        } else {
556            Some(projection.as_slice())
557        };
558        let stream = self
559            .table
560            .scan(projection_opt, &self.pushdown_filters, self.pushdown_limit)
561            .map_err(|e| {
562                datafusion::error::DataFusionError::Execution(format!(
563                    "CatalogTable::scan failed: {e}"
564                ))
565            })?;
566        Ok(Box::pin(EdgeAdapterStream {
567            inner: stream,
568            output_schema: self.schema.clone(),
569            virtual_type_id: self.virtual_type_id,
570            variable: self.variable.clone(),
571            properties: self.properties.clone(),
572            next_offset: AtomicU64::new(0),
573            metrics,
574        }))
575    }
576    fn metrics(&self) -> Option<MetricsSet> {
577        Some(self.metrics.clone_inner())
578    }
579}
580
581struct EdgeAdapterStream {
582    inner: SendableRecordBatchStream,
583    output_schema: SchemaRef,
584    virtual_type_id: u32,
585    variable: String,
586    properties: Vec<String>,
587    next_offset: AtomicU64,
588    metrics: BaselineMetrics,
589}
590
591impl RecordBatchStream for EdgeAdapterStream {
592    fn schema(&self) -> SchemaRef {
593        self.output_schema.clone()
594    }
595}
596
597impl Stream for EdgeAdapterStream {
598    type Item = DFResult<RecordBatch>;
599    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
600        match Pin::new(&mut self.inner).poll_next(cx) {
601            Poll::Pending => Poll::Pending,
602            Poll::Ready(None) => Poll::Ready(None),
603            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
604            Poll::Ready(Some(Ok(batch))) => {
605                let row_count = batch.num_rows();
606                let base = ((self.virtual_type_id as u64) << 32)
607                    | self
608                        .next_offset
609                        .fetch_add(row_count as u64, Ordering::SeqCst);
610                let adapted = adapt_edge_batch(
611                    &batch,
612                    &self.output_schema,
613                    base,
614                    &self.variable,
615                    &self.properties,
616                );
617                self.metrics.record_output(row_count);
618                Poll::Ready(Some(adapted))
619            }
620        }
621    }
622}
623
624fn adapt_edge_batch(
625    in_batch: &RecordBatch,
626    output_schema: &SchemaRef,
627    eid_start: u64,
628    variable: &str,
629    properties: &[String],
630) -> DFResult<RecordBatch> {
631    use arrow_array::cast::AsArray;
632    use arrow_array::types::Int64Type;
633    let n = in_batch.num_rows();
634    let eid: ArrayRef = Arc::new(UInt64Array::from_iter_values(
635        (0..n as u64).map(|i| eid_start + i),
636    ));
637    let in_schema = in_batch.schema();
638    let in_by_name: HashMap<&str, ArrayRef> = in_schema
639        .fields()
640        .iter()
641        .enumerate()
642        .map(|(i, f)| (f.name().as_str(), in_batch.column(i).clone()))
643        .collect();
644    let to_u64 = |arr: &ArrayRef| -> DFResult<ArrayRef> {
645        match arr.data_type() {
646            DataType::UInt64 => Ok(arr.clone()),
647            DataType::Int64 => {
648                let a = arr.as_primitive::<Int64Type>();
649                Ok(Arc::new(UInt64Array::from_iter_values(
650                    (0..a.len()).map(|i| a.value(i) as u64),
651                )))
652            }
653            DataType::UInt32 => {
654                let a = arr.as_primitive::<arrow_array::types::UInt32Type>();
655                Ok(Arc::new(UInt64Array::from_iter_values(
656                    (0..a.len()).map(|i| u64::from(a.value(i))),
657                )))
658            }
659            other => Err(datafusion::error::DataFusionError::Execution(format!(
660                "CatalogEdgeScanExec: src_id/dst_id must be Int64/UInt64/UInt32, got {other:?}"
661            ))),
662        }
663    };
664    let src_arr = in_by_name.get("src_id").ok_or_else(|| {
665        datafusion::error::DataFusionError::Execution("missing src_id column".into())
666    })?;
667    let dst_arr = in_by_name.get("dst_id").ok_or_else(|| {
668        datafusion::error::DataFusionError::Execution("missing dst_id column".into())
669    })?;
670    let src_vid = to_u64(src_arr)?;
671    let dst_vid = to_u64(dst_arr)?;
672
673    let _ = variable;
674    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
675    columns.push(eid);
676    columns.push(src_vid);
677    columns.push(dst_vid);
678    for prop in properties {
679        if prop == "src_id" || prop == "dst_id" {
680            continue;
681        }
682        let col = in_by_name
683            .get(prop.as_str())
684            .cloned()
685            .unwrap_or_else(|| Arc::new(StringArray::new_null(n)));
686        columns.push(col);
687    }
688    RecordBatch::try_new(output_schema.clone(), columns).map_err(|e| {
689        datafusion::error::DataFusionError::Execution(format!(
690            "CatalogEdgeScanExec: failed to assemble adapted batch: {e}"
691        ))
692    })
693}