use crate::query::datetime::parse_datetime_utc;
use crate::query::df_graph::GraphExecutionContext;
use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
use arrow_array::builder::{
BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
};
use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
use chrono::{NaiveDate, NaiveTime, Timelike};
use datafusion::common::Result as DFResult;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::Stream;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use uni_common::Properties;
use uni_common::Value;
use uni_common::core::id::Vid;
use uni_common::core::schema::Schema as UniSchema;
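/// Leaf `ExecutionPlan` that scans vertices or edges from graph storage.
///
/// A scan merges the durable Lance tables with the in-memory L0 write
/// buffers, MVCC-deduplicates by id, drops deleted rows and tombstoned ids,
/// and projects the result into `<variable>`-prefixed output columns.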
pub struct GraphScanExec {
graph_ctx: Arc<GraphExecutionContext>,
label: String,
variable: String,
projected_properties: Vec<String>,
filter: Option<Arc<dyn PhysicalExpr>>,
is_edge_scan: bool,
is_schemaless: bool,
schema: SchemaRef,
properties: PlanProperties,
metrics: ExecutionPlanMetricsSet,
}
impl fmt::Debug for GraphScanExec {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("GraphScanExec")
.field("label", &self.label)
.field("variable", &self.variable)
.field("projected_properties", &self.projected_properties)
.field("is_edge_scan", &self.is_edge_scan)
.finish()
}
}
impl GraphScanExec {
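/// Creates a scan over a single schema-backed vertex label. The output
/// schema is `<variable>._vid`, `<variable>._labels`, then one typed column
/// per projected property.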
pub fn new_vertex_scan(
graph_ctx: Arc<GraphExecutionContext>,
label: impl Into<String>,
variable: impl Into<String>,
projected_properties: Vec<String>,
filter: Option<Arc<dyn PhysicalExpr>>,
) -> Self {
let label = label.into();
let variable = variable.into();
let uni_schema = graph_ctx.storage().schema_manager().schema();
let schema =
Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
let properties = compute_plan_properties(schema.clone());
Self {
graph_ctx,
label,
variable,
projected_properties,
filter,
is_edge_scan: false,
is_schemaless: false,
schema,
properties,
metrics: ExecutionPlanMetricsSet::new(),
}
}
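/// Creates a label scan against the schemaless main vertex table. `_vid` and
/// `_labels` are always emitted, so they are stripped from the projection.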
pub fn new_schemaless_vertex_scan(
graph_ctx: Arc<GraphExecutionContext>,
label_name: impl Into<String>,
variable: impl Into<String>,
projected_properties: Vec<String>,
filter: Option<Arc<dyn PhysicalExpr>>,
) -> Self {
let label = label_name.into();
let variable = variable.into();
let projected_properties: Vec<String> = projected_properties
.into_iter()
.filter(|p| p != "_vid" && p != "_labels")
.collect();
let uni_schema = graph_ctx.storage().schema_manager().schema();
let schema =
Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
let properties = compute_plan_properties(schema.clone());
Self {
graph_ctx,
label,
variable,
projected_properties,
filter,
is_edge_scan: false,
is_schemaless: true,
schema,
properties,
metrics: ExecutionPlanMetricsSet::new(),
}
}
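/// Creates a scan requiring all of `labels` on each vertex. The labels are
/// encoded as a `:`-joined string in `self.label` and split again when the
/// pushdown filter and L0 lookups are built.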
pub fn new_multi_label_vertex_scan(
graph_ctx: Arc<GraphExecutionContext>,
labels: Vec<String>,
variable: impl Into<String>,
projected_properties: Vec<String>,
filter: Option<Arc<dyn PhysicalExpr>>,
) -> Self {
let variable = variable.into();
let projected_properties: Vec<String> = projected_properties
.into_iter()
.filter(|p| p != "_vid" && p != "_labels")
.collect();
let uni_schema = graph_ctx.storage().schema_manager().schema();
let schema =
Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
let properties = compute_plan_properties(schema.clone());
let encoded_labels = labels.join(":");
Self {
graph_ctx,
label: encoded_labels,
variable,
projected_properties,
filter,
is_edge_scan: false,
is_schemaless: true,
schema,
properties,
metrics: ExecutionPlanMetricsSet::new(),
}
}
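/// Creates a scan over every vertex regardless of label (`label` is empty).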
pub fn new_schemaless_all_scan(
graph_ctx: Arc<GraphExecutionContext>,
variable: impl Into<String>,
projected_properties: Vec<String>,
filter: Option<Arc<dyn PhysicalExpr>>,
) -> Self {
let variable = variable.into();
let projected_properties: Vec<String> = projected_properties
.into_iter()
.filter(|p| p != "_vid" && p != "_labels")
.collect();
let uni_schema = graph_ctx.storage().schema_manager().schema();
let schema =
Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
let properties = compute_plan_properties(schema.clone());
Self {
graph_ctx,
label: String::new(),
variable,
projected_properties,
filter,
is_edge_scan: false,
is_schemaless: true,
schema,
properties,
metrics: ExecutionPlanMetricsSet::new(),
}
}
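/// Builds the output schema for schemaless scans. Property types are
/// resolved by merging every label's property map (the first definition
/// encountered wins); unknown properties fall back to `LargeBinary`.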
fn build_schemaless_vertex_schema(
variable: &str,
properties: &[String],
uni_schema: &UniSchema,
) -> SchemaRef {
let mut merged: HashMap<&str, &uni_common::core::schema::PropertyMeta> = HashMap::new();
for label_props in uni_schema.properties.values() {
for (name, meta) in label_props {
merged.entry(name.as_str()).or_insert(meta);
}
}
let mut fields = vec![
Field::new(format!("{}._vid", variable), DataType::UInt64, false),
Field::new(format!("{}._labels", variable), labels_data_type(), true),
];
for prop in properties {
let col_name = format!("{}.{}", variable, prop);
let arrow_type = merged
.get(prop.as_str())
.map(|meta| meta.r#type.to_arrow())
.unwrap_or(DataType::LargeBinary);
fields.push(Field::new(&col_name, arrow_type, true));
}
Arc::new(Schema::new(fields))
}
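/// Creates a scan over a single edge type; the output schema starts with
/// `<variable>._eid`, `<variable>._src_vid` and `<variable>._dst_vid`.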
pub fn new_edge_scan(
graph_ctx: Arc<GraphExecutionContext>,
edge_type: impl Into<String>,
variable: impl Into<String>,
projected_properties: Vec<String>,
filter: Option<Arc<dyn PhysicalExpr>>,
) -> Self {
let label = edge_type.into();
let variable = variable.into();
let uni_schema = graph_ctx.storage().schema_manager().schema();
let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
let properties = compute_plan_properties(schema.clone());
Self {
graph_ctx,
label,
variable,
projected_properties,
filter,
is_edge_scan: true,
is_schemaless: false,
schema,
properties,
metrics: ExecutionPlanMetricsSet::new(),
}
}
fn build_vertex_schema(
variable: &str,
label: &str,
properties: &[String],
uni_schema: &UniSchema,
) -> SchemaRef {
let mut fields = vec![
Field::new(format!("{}._vid", variable), DataType::UInt64, false),
Field::new(format!("{}._labels", variable), labels_data_type(), true),
];
let label_props = uni_schema.properties.get(label);
for prop in properties {
let col_name = format!("{}.{}", variable, prop);
let arrow_type = resolve_property_type(prop, label_props);
fields.push(Field::new(&col_name, arrow_type, true));
}
Arc::new(Schema::new(fields))
}
fn build_edge_schema(
variable: &str,
edge_type: &str,
properties: &[String],
uni_schema: &UniSchema,
) -> SchemaRef {
let mut fields = vec![
Field::new(format!("{}._eid", variable), DataType::UInt64, false),
Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
];
let edge_props = uni_schema.properties.get(edge_type);
for prop in properties {
let col_name = format!("{}.{}", variable, prop);
let arrow_type = resolve_property_type(prop, edge_props);
fields.push(Field::new(&col_name, arrow_type, true));
}
Arc::new(Schema::new(fields))
}
}
impl DisplayAs for GraphScanExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
write!(
f,
"GraphScanExec: {}={}, properties={:?}",
scan_type, self.label, self.projected_properties
)?;
if self.filter.is_some() {
write!(f, ", filter=<pushed>")?;
}
Ok(())
}
}
impl ExecutionPlan for GraphScanExec {
fn name(&self) -> &str {
"GraphScanExec"
}
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
if children.is_empty() {
Ok(self)
} else {
Err(datafusion::error::DataFusionError::Plan(
"GraphScanExec does not accept children".to_string(),
))
}
}
fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let metrics = BaselineMetrics::new(&self.metrics, partition);
Ok(Box::pin(GraphScanStream::new(
self.graph_ctx.clone(),
self.label.clone(),
self.variable.clone(),
self.projected_properties.clone(),
self.is_edge_scan,
self.is_schemaless,
self.schema.clone(),
metrics,
)))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
}
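/// State of a `GraphScanStream`: `Init` before the scan future exists,
/// `Executing` while the boxed future is being polled, `Done` afterwards.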
enum GraphScanState {
Init,
Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
Done,
}
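/// Stream adapter that lazily runs the columnar scan and yields its result.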
struct GraphScanStream {
graph_ctx: Arc<GraphExecutionContext>,
label: String,
variable: String,
properties: Vec<String>,
is_edge_scan: bool,
is_schemaless: bool,
schema: SchemaRef,
state: GraphScanState,
metrics: BaselineMetrics,
}
impl GraphScanStream {
#[expect(clippy::too_many_arguments)]
fn new(
graph_ctx: Arc<GraphExecutionContext>,
label: String,
variable: String,
properties: Vec<String>,
is_edge_scan: bool,
is_schemaless: bool,
schema: SchemaRef,
metrics: BaselineMetrics,
) -> Self {
Self {
graph_ctx,
label,
variable,
properties,
is_edge_scan,
is_schemaless,
schema,
state: GraphScanState::Init,
metrics,
}
}
}
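/// Resolves a projected property to an Arrow type: `overflow_json` is always
/// `LargeBinary`; schema-declared properties use their declared type; unknown
/// properties fall back to `LargeBinary`.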
pub(crate) fn resolve_property_type(
prop: &str,
schema_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DataType {
if prop == "overflow_json" {
DataType::LargeBinary
} else {
schema_props
.and_then(|props| props.get(prop))
.map(|meta| meta.r#type.to_arrow())
.unwrap_or(DataType::LargeBinary)
}
}
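/// Test-only convenience wrapper: MVCC dedup keyed on `_vid`.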
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
mvcc_dedup_batch_by(batch, "_vid")
}
fn mvcc_dedup_to_option(
batch: Option<RecordBatch>,
id_column: &str,
) -> DFResult<Option<RecordBatch>> {
match batch {
Some(b) => {
let deduped = mvcc_dedup_batch_by(&b, id_column)?;
Ok(if deduped.num_rows() > 0 {
Some(deduped)
} else {
None
})
}
None => Ok(None),
}
}
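/// Merges the deduplicated Lance batch with the L0 overlay batch. When both
/// have rows they are concatenated and deduplicated again so newer L0
/// versions shadow older Lance versions of the same id.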
fn merge_lance_and_l0(
lance_deduped: Option<RecordBatch>,
l0_batch: RecordBatch,
internal_schema: &SchemaRef,
id_column: &str,
) -> DFResult<Option<RecordBatch>> {
let has_l0 = l0_batch.num_rows() > 0;
match (lance_deduped, has_l0) {
(Some(lance), true) => {
let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
.map_err(arrow_err)?;
Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
}
(Some(lance), false) => Ok(Some(lance)),
(None, true) => Ok(Some(l0_batch)),
(None, false) => Ok(None),
}
}
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
if !columns.iter().any(|c| c == col_name) {
columns.push(col_name.to_string());
}
}
fn extract_from_overflow_blob(
overflow_arr: Option<&arrow_array::LargeBinaryArray>,
row: usize,
prop: &str,
) -> Option<Vec<u8>> {
let arr = overflow_arr?;
if arr.is_null(row) {
return None;
}
uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
}
fn build_overflow_property_column(
num_rows: usize,
vid_arr: &UInt64Array,
overflow_arr: Option<&arrow_array::LargeBinaryArray>,
prop: &str,
l0_ctx: &crate::query::df_graph::L0Context,
) -> ArrayRef {
let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
for i in 0..num_rows {
let vid = Vid::from(vid_arr.value(i));
let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
if let Some(val_opt) = l0_val {
append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
} else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
builder.append_value(&bytes);
} else {
builder.append_null();
}
}
Arc::new(builder.finish())
}
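/// Looks up `prop` for `vid` across all L0 buffers; later buffers override
/// earlier ones. Returns `None` when no L0 buffer has the property.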
fn resolve_l0_property(
vid: &Vid,
prop: &str,
l0_ctx: &crate::query::df_graph::L0Context,
) -> Option<Option<Value>> {
let mut result = None;
for l0 in l0_ctx.iter_l0_buffers() {
let guard = l0.read();
if let Some(props) = guard.vertex_properties.get(vid)
&& let Some(val) = props.get(prop)
{
result = Some(Some(val.clone()));
}
}
result
}
fn append_value_as_cypher_binary(
builder: &mut arrow_array::builder::LargeBinaryBuilder,
val: Option<&Value>,
) {
match val {
Some(v) if !v.is_null() => {
let json_val: serde_json::Value = v.clone().into();
match encode_cypher_value(&json_val) {
Ok(bytes) => builder.append_value(bytes),
Err(_) => builder.append_null(),
}
}
_ => builder.append_null(),
}
}
fn build_all_props_column_with_l0_overlay(
num_rows: usize,
vid_arr: &UInt64Array,
props_arr: Option<&arrow_array::LargeBinaryArray>,
l0_ctx: &crate::query::df_graph::L0Context,
) -> ArrayRef {
let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
for i in 0..num_rows {
let vid = Vid::from(vid_arr.value(i));
let mut merged_props = serde_json::Map::new();
if let Some(arr) = props_arr
&& !arr.is_null(i)
&& let Ok(uni_common::Value::Map(map)) =
uni_common::cypher_value_codec::decode(arr.value(i))
{
for (k, v) in map {
let json_val: serde_json::Value = v.into();
merged_props.insert(k, json_val);
}
}
for l0 in l0_ctx.iter_l0_buffers() {
let guard = l0.read();
if let Some(l0_props) = guard.vertex_properties.get(&vid) {
for (k, v) in l0_props {
let json_val: serde_json::Value = v.clone().into();
merged_props.insert(k.clone(), json_val);
}
}
}
if merged_props.is_empty() {
builder.append_null();
} else {
let json_obj = serde_json::Value::Object(merged_props);
match encode_cypher_value(&json_obj) {
Ok(bytes) => builder.append_value(bytes),
Err(_) => builder.append_null(),
}
}
}
Arc::new(builder.finish())
}
fn build_all_props_column_for_schema_scan(
batch: &RecordBatch,
vid_arr: &UInt64Array,
overflow_arr: Option<&arrow_array::LargeBinaryArray>,
projected_properties: &[String],
l0_ctx: &crate::query::df_graph::L0Context,
) -> ArrayRef {
let schema_props: Vec<&str> = projected_properties
.iter()
.filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
.map(String::as_str)
.collect();
let num_rows = batch.num_rows();
let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
for i in 0..num_rows {
let vid = Vid::from(vid_arr.value(i));
let mut merged_props = serde_json::Map::new();
for &prop in &schema_props {
if let Some(col) = batch.column_by_name(prop) {
let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
if !val.is_null() {
let json_val: serde_json::Value = val.into();
merged_props.insert(prop.to_string(), json_val);
}
}
}
if let Some(arr) = overflow_arr
&& !arr.is_null(i)
&& let Ok(uni_common::Value::Map(map)) =
uni_common::cypher_value_codec::decode(arr.value(i))
{
for (k, v) in map {
let json_val: serde_json::Value = v.into();
merged_props.insert(k, json_val);
}
}
for l0 in l0_ctx.iter_l0_buffers() {
let guard = l0.read();
if let Some(l0_props) = guard.vertex_properties.get(&vid) {
for (k, v) in l0_props {
let json_val: serde_json::Value = v.clone().into();
merged_props.insert(k.clone(), json_val);
}
}
}
if merged_props.is_empty() {
builder.append_null();
} else {
let json_obj = serde_json::Value::Object(merged_props);
match encode_cypher_value(&json_obj) {
Ok(bytes) => builder.append_value(bytes),
Err(_) => builder.append_null(),
}
}
}
Arc::new(builder.finish())
}
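/// MVCC dedup: lexsorts by (`id_column` ascending, `_version` descending)
/// and keeps the first row per id, i.e. the highest version. For example,
/// rows (vid=1, v=1), (vid=1, v=2), (vid=2, v=5) reduce to (vid=1, v=2) and
/// (vid=2, v=5).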
fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
if batch.num_rows() == 0 {
return Ok(batch.clone());
}
let id_col = batch
.column_by_name(id_column)
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
})?
.clone();
let version_col = batch
.column_by_name("_version")
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
})?
.clone();
let sort_columns = vec![
arrow::compute::SortColumn {
values: id_col,
options: Some(arrow::compute::SortOptions {
descending: false,
nulls_first: false,
}),
},
arrow::compute::SortColumn {
values: version_col,
options: Some(arrow::compute::SortOptions {
descending: true,
nulls_first: false,
}),
},
];
let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;
let sorted_columns: Vec<ArrayRef> = batch
.columns()
.iter()
.map(|col| arrow::compute::take(col.as_ref(), &indices, None))
.collect::<Result<_, _>>()
.map_err(arrow_err)?;
let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;
let sorted_id = sorted
.column_by_name(id_column)
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let mut keep = vec![false; sorted.num_rows()];
if !keep.is_empty() {
keep[0] = true;
for (i, flag) in keep.iter_mut().enumerate().skip(1) {
if sorted_id.value(i) != sorted_id.value(i - 1) {
*flag = true;
}
}
}
let mask = arrow_array::BooleanArray::from(keep);
arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
}
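#[cfg(test)]
mod mvcc_dedup_sketch {
// A minimal sketch (not exhaustive) of the dedup contract above: two
// versions of the same id collapse to the row with the highest `_version`.
use super::*;

#[test]
fn keeps_highest_version_per_id() -> DFResult<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("_vid", DataType::UInt64, false),
Field::new("_version", DataType::UInt64, false),
]));
let columns: Vec<ArrayRef> = vec![
Arc::new(UInt64Array::from(vec![1u64, 1, 2])),
Arc::new(UInt64Array::from(vec![1u64, 2, 5])),
];
let batch = RecordBatch::try_new(schema, columns).map_err(arrow_err)?;
let deduped = mvcc_dedup_batch_by(&batch, "_vid")?;
assert_eq!(deduped.num_rows(), 2);
let versions = deduped
.column_by_name("_version")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
// Output is sorted by id ascending; vid=1 keeps version 2, vid=2 keeps 5.
assert_eq!(versions.value(0), 2);
assert_eq!(versions.value(1), 5);
Ok(())
}
}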
fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
if batch.num_rows() == 0 {
return Ok(batch.clone());
}
let op_col = match batch.column_by_name("op") {
Some(col) => col
.as_any()
.downcast_ref::<arrow_array::UInt8Array>()
.unwrap(),
None => return Ok(batch.clone()),
};
let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
let mask = arrow_array::BooleanArray::from(keep);
arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
}
fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
if batch.num_rows() == 0 {
return Ok(batch.clone());
}
let deleted_col = match batch.column_by_name("_deleted") {
Some(col) => col
.as_any()
.downcast_ref::<arrow_array::BooleanArray>()
.unwrap(),
None => return Ok(batch.clone()),
};
let keep: Vec<bool> = (0..deleted_col.len())
.map(|i| !deleted_col.value(i))
.collect();
let mask = arrow_array::BooleanArray::from(keep);
arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
}
fn filter_l0_tombstones(
batch: &RecordBatch,
l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
if batch.num_rows() == 0 {
return Ok(batch.clone());
}
let mut tombstones: HashSet<u64> = HashSet::new();
for l0 in l0_ctx.iter_l0_buffers() {
let guard = l0.read();
for vid in guard.vertex_tombstones.iter() {
tombstones.insert(vid.as_u64());
}
}
if tombstones.is_empty() {
return Ok(batch.clone());
}
let vid_col = batch
.column_by_name("_vid")
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
})?
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let keep: Vec<bool> = (0..vid_col.len())
.map(|i| !tombstones.contains(&vid_col.value(i)))
.collect();
let mask = arrow_array::BooleanArray::from(keep);
arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
}
fn filter_l0_edge_tombstones(
batch: &RecordBatch,
l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
if batch.num_rows() == 0 {
return Ok(batch.clone());
}
let mut tombstones: HashSet<u64> = HashSet::new();
for l0 in l0_ctx.iter_l0_buffers() {
let guard = l0.read();
for eid in guard.tombstones.keys() {
tombstones.insert(eid.as_u64());
}
}
if tombstones.is_empty() {
return Ok(batch.clone());
}
let eid_col = batch
.column_by_name("eid")
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
})?
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let keep: Vec<bool> = (0..eid_col.len())
.map(|i| !tombstones.contains(&eid_col.value(i)))
.collect();
let mask = arrow_array::BooleanArray::from(keep);
arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
}
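/// Materializes in-memory L0 vertex writes for `label` into a batch matching
/// the Lance scan schema, so the two sides can be concatenated and deduped
/// together. Tombstoned vids are dropped; properties absent from the label's
/// schema are folded into the `overflow_json` blob.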
fn build_l0_vertex_batch(
l0_ctx: &crate::query::df_graph::L0Context,
label: &str,
lance_schema: &SchemaRef,
label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new();
let mut tombstones: HashSet<u64> = HashSet::new();
for l0 in l0_ctx.iter_l0_buffers() {
let guard = l0.read();
for vid in guard.vertex_tombstones.iter() {
tombstones.insert(vid.as_u64());
}
for vid in guard.vids_for_label(label) {
let vid_u64 = vid.as_u64();
if tombstones.contains(&vid_u64) {
continue;
}
let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
let entry = vid_data
.entry(vid_u64)
.or_insert_with(|| (Properties::new(), 0));
if let Some(props) = guard.vertex_properties.get(&vid) {
for (k, v) in props {
entry.0.insert(k.clone(), v.clone());
}
}
if version > entry.1 {
entry.1 = version;
}
}
}
for t in &tombstones {
vid_data.remove(t);
}
if vid_data.is_empty() {
return Ok(RecordBatch::new_empty(lance_schema.clone()));
}
let mut vids: Vec<u64> = vid_data.keys().copied().collect();
vids.sort_unstable();
let num_rows = vids.len();
let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());
let schema_prop_names: HashSet<&str> = label_props
.map(|lp| lp.keys().map(|k| k.as_str()).collect())
.unwrap_or_default();
for field in lance_schema.fields() {
let col_name = field.name().as_str();
match col_name {
"_vid" => {
columns.push(Arc::new(UInt64Array::from(vids.clone())));
}
"_deleted" => {
let vals = vec![false; num_rows];
columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
}
"_version" => {
let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
columns.push(Arc::new(UInt64Array::from(vals)));
}
"overflow_json" => {
let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
for vid_u64 in &vids {
let (props, _) = &vid_data[vid_u64];
let mut overflow = serde_json::Map::new();
for (k, v) in props {
if k == "ext_id" || k.starts_with('_') {
continue;
}
if !schema_prop_names.contains(k.as_str()) {
let json_val: serde_json::Value = v.clone().into();
overflow.insert(k.clone(), json_val);
}
}
if overflow.is_empty() {
builder.append_null();
} else {
let json_val = serde_json::Value::Object(overflow);
match encode_cypher_value(&json_val) {
Ok(bytes) => builder.append_value(bytes),
Err(_) => builder.append_null(),
}
}
}
columns.push(Arc::new(builder.finish()));
}
_ => {
let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
columns.push(col);
}
}
}
RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
}
fn build_l0_property_column(
vids: &[u64],
vid_data: &HashMap<u64, (Properties, u64)>,
prop_name: &str,
data_type: &DataType,
) -> DFResult<ArrayRef> {
let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
let props_map: HashMap<Vid, Properties> = vid_data
.iter()
.map(|(k, (props, _))| (Vid::from(*k), props.clone()))
.collect();
build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
}
fn build_l0_edge_batch(
l0_ctx: &crate::query::df_graph::L0Context,
edge_type: &str,
internal_schema: &SchemaRef,
type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
let mut tombstones: HashSet<u64> = HashSet::new();
for l0 in l0_ctx.iter_l0_buffers() {
let guard = l0.read();
for eid in guard.tombstones.keys() {
tombstones.insert(eid.as_u64());
}
for eid in guard.eids_for_type(edge_type) {
let eid_u64 = eid.as_u64();
if tombstones.contains(&eid_u64) {
continue;
}
let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
None => continue,
};
let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
let entry = eid_data
.entry(eid_u64)
.or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
if let Some(props) = guard.edge_properties.get(&eid) {
for (k, v) in props {
entry.2.insert(k.clone(), v.clone());
}
}
entry.0 = src_vid;
entry.1 = dst_vid;
if version > entry.3 {
entry.3 = version;
}
}
}
for t in &tombstones {
eid_data.remove(t);
}
if eid_data.is_empty() {
return Ok(RecordBatch::new_empty(internal_schema.clone()));
}
let mut eids: Vec<u64> = eid_data.keys().copied().collect();
eids.sort_unstable();
let num_rows = eids.len();
let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());
let schema_prop_names: HashSet<&str> = type_props
.map(|tp| tp.keys().map(|k| k.as_str()).collect())
.unwrap_or_default();
for field in internal_schema.fields() {
let col_name = field.name().as_str();
match col_name {
"eid" => {
columns.push(Arc::new(UInt64Array::from(eids.clone())));
}
"src_vid" => {
let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
columns.push(Arc::new(UInt64Array::from(vals)));
}
"dst_vid" => {
let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
columns.push(Arc::new(UInt64Array::from(vals)));
}
"op" => {
let vals = vec![0u8; num_rows];
columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
}
"_version" => {
let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
columns.push(Arc::new(UInt64Array::from(vals)));
}
"overflow_json" => {
let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
for eid_u64 in &eids {
let (_, _, props, _) = &eid_data[eid_u64];
let mut overflow = serde_json::Map::new();
for (k, v) in props {
if k.starts_with('_') {
continue;
}
if !schema_prop_names.contains(k.as_str()) {
let json_val: serde_json::Value = v.clone().into();
overflow.insert(k.clone(), json_val);
}
}
if overflow.is_empty() {
builder.append_null();
} else {
let json_val = serde_json::Value::Object(overflow);
match encode_cypher_value(&json_val) {
Ok(bytes) => builder.append_value(bytes),
Err(_) => builder.append_null(),
}
}
}
columns.push(Arc::new(builder.finish()));
}
_ => {
let col =
build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
columns.push(col);
}
}
}
RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
fn build_l0_edge_property_column(
eids: &[u64],
eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
prop_name: &str,
data_type: &DataType,
) -> DFResult<ArrayRef> {
let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
let props_map: HashMap<Vid, Properties> = eid_data
.iter()
.map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
.collect();
build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
}
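/// Builds the `_labels` list column for a label-specific scan: stored labels
/// (or the scanned label when none are stored), the scanned label itself if
/// missing, plus any extra labels recorded in the L0 buffers.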
fn build_labels_column_for_known_label(
vid_arr: &UInt64Array,
label: &str,
l0_ctx: &crate::query::df_graph::L0Context,
batch_labels_col: Option<&arrow_array::ListArray>,
) -> DFResult<ArrayRef> {
use uni_store::storage::arrow_convert::labels_from_list_array;
let mut labels_builder = ListBuilder::new(StringBuilder::new());
for i in 0..vid_arr.len() {
let vid = Vid::from(vid_arr.value(i));
let mut labels = match batch_labels_col {
Some(list_arr) => {
let stored = labels_from_list_array(list_arr, i);
if stored.is_empty() {
vec![label.to_string()]
} else {
stored
}
}
None => vec![label.to_string()],
};
if !labels.iter().any(|l| l == label) {
labels.push(label.to_string());
}
for l0 in l0_ctx.iter_l0_buffers() {
let guard = l0.read();
if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
for lbl in l0_labels {
if !labels.contains(lbl) {
labels.push(lbl.clone());
}
}
}
}
let values = labels_builder.values();
for lbl in &labels {
values.append_value(lbl);
}
labels_builder.append(true);
}
Ok(Arc::new(labels_builder.finish()))
}
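/// Projects an internal vertex batch into the output schema: `_vid`, the
/// rebuilt `_labels` column, then each projected property. Properties with no
/// dedicated column are resolved from the L0 buffers or the `overflow_json`
/// blob; `_all_props` merges all of these sources into one encoded map per
/// row.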
fn map_to_output_schema(
batch: &RecordBatch,
label: &str,
_variable: &str,
projected_properties: &[String],
output_schema: &SchemaRef,
l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
if batch.num_rows() == 0 {
return Ok(RecordBatch::new_empty(output_schema.clone()));
}
let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
let vid_col = batch
.column_by_name("_vid")
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
})?
.clone();
let vid_arr = vid_col
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
})?;
let batch_labels_col = batch
.column_by_name("_labels")
.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
columns.push(vid_col.clone());
columns.push(labels_col);
let overflow_arr = batch
.column_by_name("overflow_json")
.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
for prop in projected_properties {
if prop == "overflow_json" {
match batch.column_by_name("overflow_json") {
Some(col) => columns.push(col.clone()),
None => {
columns.push(arrow_array::new_null_array(
&DataType::LargeBinary,
batch.num_rows(),
));
}
}
} else if prop == "_all_props" {
let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
let guard = l0.read();
!guard.vertex_properties.is_empty()
});
let has_schema_cols = projected_properties
.iter()
.any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));
if !any_l0_has_vertex_props && !has_schema_cols {
match batch.column_by_name("overflow_json") {
Some(col) => columns.push(col.clone()),
None => {
columns.push(arrow_array::new_null_array(
&DataType::LargeBinary,
batch.num_rows(),
));
}
}
} else {
let col = build_all_props_column_for_schema_scan(
batch,
vid_arr,
overflow_arr,
projected_properties,
l0_ctx,
);
columns.push(col);
}
} else {
match batch.column_by_name(prop) {
Some(col) => columns.push(col.clone()),
None => {
let col = build_overflow_property_column(
batch.num_rows(),
vid_arr,
overflow_arr,
prop,
l0_ctx,
);
columns.push(col);
}
}
}
}
RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}
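/// Edge analogue of `map_to_output_schema`: forwards `eid`, `src_vid` and
/// `dst_vid`, resolves missing properties from the `overflow_json` blob, and
/// otherwise emits typed null columns.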
fn map_edge_to_output_schema(
batch: &RecordBatch,
variable: &str,
projected_properties: &[String],
output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
if batch.num_rows() == 0 {
return Ok(RecordBatch::new_empty(output_schema.clone()));
}
let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
let eid_col = batch
.column_by_name("eid")
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
})?
.clone();
columns.push(eid_col);
let src_col = batch
.column_by_name("src_vid")
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
})?
.clone();
columns.push(src_col);
let dst_col = batch
.column_by_name("dst_vid")
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
})?
.clone();
columns.push(dst_col);
for prop in projected_properties {
if prop == "overflow_json" {
match batch.column_by_name("overflow_json") {
Some(col) => columns.push(col.clone()),
None => {
columns.push(arrow_array::new_null_array(
&DataType::LargeBinary,
batch.num_rows(),
));
}
}
} else {
match batch.column_by_name(prop) {
Some(col) => columns.push(col.clone()),
None => {
let overflow_arr = batch
.column_by_name("overflow_json")
.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
if let Some(arr) = overflow_arr {
let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
for i in 0..batch.num_rows() {
if !arr.is_null(i) {
let blob = arr.value(i);
if let Some(sub_bytes) =
uni_common::cypher_value_codec::extract_map_entry_raw(
blob, prop,
)
{
builder.append_value(&sub_bytes);
} else {
builder.append_null();
}
} else {
builder.append_null();
}
}
columns.push(Arc::new(builder.finish()));
} else {
let target_field = output_schema
.fields()
.iter()
.find(|f| f.name() == &format!("{}.{}", variable, prop));
let dt = target_field
.map(|f| f.data_type().clone())
.unwrap_or(DataType::LargeBinary);
columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
}
}
}
}
}
RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}
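/// End-to-end vertex scan for a schema-backed label:
/// 1. scan the Lance vertex table with the required columns,
/// 2. MVCC-dedup by `_vid`,
/// 3. build and merge the L0 overlay batch (dedup again),
/// 4. drop `_deleted` rows and L0-tombstoned vids,
/// 5. project into the `<variable>.<prop>` output schema.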
async fn columnar_scan_vertex_batch_static(
graph_ctx: &GraphExecutionContext,
label: &str,
variable: &str,
projected_properties: &[String],
output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
let storage = graph_ctx.storage();
let l0_ctx = graph_ctx.l0_context();
let uni_schema = storage.schema_manager().schema();
let label_props = uni_schema.properties.get(label);
let mut lance_columns: Vec<String> = vec![
"_vid".to_string(),
"_deleted".to_string(),
"_version".to_string(),
];
for prop in projected_properties {
if prop == "overflow_json" {
push_column_if_absent(&mut lance_columns, "overflow_json");
} else {
let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
if exists_in_schema {
push_column_if_absent(&mut lance_columns, prop);
}
}
}
let needs_overflow = projected_properties
.iter()
.any(|p| p == "overflow_json" || !label_props.is_some_and(|lp| lp.contains_key(p)));
if needs_overflow {
push_column_if_absent(&mut lance_columns, "overflow_json");
}
let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
let lance_batch = storage
.scan_vertex_table(label, &lance_columns_refs, None)
.await
.map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
let internal_schema = match &lance_deduped {
Some(batch) => batch.schema(),
None => {
let mut fields = vec![
Field::new("_vid", DataType::UInt64, false),
Field::new("_deleted", DataType::Boolean, false),
Field::new("_version", DataType::UInt64, false),
];
for col in &lance_columns {
if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
continue;
}
if col == "overflow_json" {
fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
} else {
let arrow_type = label_props
.and_then(|lp| lp.get(col.as_str()))
.map(|meta| meta.r#type.to_arrow())
.unwrap_or(DataType::LargeBinary);
fields.push(Field::new(col, arrow_type, true));
}
}
Arc::new(Schema::new(fields))
}
};
let l0_batch = build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props)?;
let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
else {
return Ok(RecordBatch::new_empty(output_schema.clone()));
};
let merged = filter_deleted_rows(&merged)?;
if merged.num_rows() == 0 {
return Ok(RecordBatch::new_empty(output_schema.clone()));
}
let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
if filtered.num_rows() == 0 {
return Ok(RecordBatch::new_empty(output_schema.clone()));
}
map_to_output_schema(
&filtered,
label,
variable,
projected_properties,
output_schema,
l0_ctx,
)
}
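/// Edge counterpart of `columnar_scan_vertex_batch_static`: keyed on `eid`,
/// scanned from the forward ("fwd") delta table, and filtered on delete ops
/// (`op != 0`) instead of a `_deleted` flag.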
async fn columnar_scan_edge_batch_static(
graph_ctx: &GraphExecutionContext,
edge_type: &str,
variable: &str,
projected_properties: &[String],
output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
let storage = graph_ctx.storage();
let l0_ctx = graph_ctx.l0_context();
let uni_schema = storage.schema_manager().schema();
let type_props = uni_schema.properties.get(edge_type);
let mut lance_columns: Vec<String> = vec![
"eid".to_string(),
"src_vid".to_string(),
"dst_vid".to_string(),
"op".to_string(),
"_version".to_string(),
];
for prop in projected_properties {
if prop == "overflow_json" {
push_column_if_absent(&mut lance_columns, "overflow_json");
} else {
let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
if exists_in_schema {
push_column_if_absent(&mut lance_columns, prop);
}
}
}
let needs_overflow = projected_properties
.iter()
.any(|p| p == "overflow_json" || !type_props.is_some_and(|tp| tp.contains_key(p)));
if needs_overflow {
push_column_if_absent(&mut lance_columns, "overflow_json");
}
let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
let lance_batch = storage
.scan_delta_table(edge_type, "fwd", &lance_columns_refs, None)
.await
.map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;
let internal_schema = match &lance_deduped {
Some(batch) => batch.schema(),
None => {
let mut fields = vec![
Field::new("eid", DataType::UInt64, false),
Field::new("src_vid", DataType::UInt64, false),
Field::new("dst_vid", DataType::UInt64, false),
Field::new("op", DataType::UInt8, false),
Field::new("_version", DataType::UInt64, false),
];
for col in &lance_columns {
if matches!(
col.as_str(),
"eid" | "src_vid" | "dst_vid" | "op" | "_version"
) {
continue;
}
if col == "overflow_json" {
fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
} else {
let arrow_type = type_props
.and_then(|tp| tp.get(col.as_str()))
.map(|meta| meta.r#type.to_arrow())
.unwrap_or(DataType::LargeBinary);
fields.push(Field::new(col, arrow_type, true));
}
}
Arc::new(Schema::new(fields))
}
};
let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;
let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
return Ok(RecordBatch::new_empty(output_schema.clone()));
};
let merged = filter_deleted_edge_ops(&merged)?;
if merged.num_rows() == 0 {
return Ok(RecordBatch::new_empty(output_schema.clone()));
}
let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;
if filtered.num_rows() == 0 {
return Ok(RecordBatch::new_empty(output_schema.clone()));
}
map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
}
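/// Schemaless vertex scan over the main vertex table. Label filtering is
/// pushed down as AND-ed `array_contains(labels, ...)` predicates; stored
/// properties arrive as a single `props_json` blob that is decoded per
/// projected property during output mapping.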
async fn columnar_scan_schemaless_vertex_batch_static(
graph_ctx: &GraphExecutionContext,
label: &str,
variable: &str,
projected_properties: &[String],
output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
let storage = graph_ctx.storage();
let l0_ctx = graph_ctx.l0_context();
let filter = {
let mut parts = Vec::new();
if !label.is_empty() {
// split(':') yields the whole string when no ':' is present, so this
// covers both single-label and ":"-joined multi-label scans.
for lbl in label.split(':') {
parts.push(format!("array_contains(labels, '{}')", lbl));
}
}
if parts.is_empty() {
None
} else {
Some(parts.join(" AND "))
}
};
let lance_batch = storage
.scan_main_vertex_table(
&["_vid", "_deleted", "labels", "props_json", "_version"],
filter.as_deref(),
)
.await
.map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
let internal_schema = match &lance_deduped {
Some(batch) => batch.schema(),
None => Arc::new(Schema::new(vec![
Field::new("_vid", DataType::UInt64, false),
Field::new("_deleted", DataType::Boolean, false),
Field::new("labels", labels_data_type(), false),
Field::new("props_json", DataType::LargeBinary, true),
Field::new("_version", DataType::UInt64, false),
])),
};
let l0_batch = build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema)?;
let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
else {
return Ok(RecordBatch::new_empty(output_schema.clone()));
};
let merged = filter_deleted_rows(&merged)?;
if merged.num_rows() == 0 {
return Ok(RecordBatch::new_empty(output_schema.clone()));
}
let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
if filtered.num_rows() == 0 {
return Ok(RecordBatch::new_empty(output_schema.clone()));
}
map_to_schemaless_output_schema(
&filtered,
variable,
projected_properties,
output_schema,
l0_ctx,
)
}
fn build_l0_schemaless_vertex_batch(
l0_ctx: &crate::query::df_graph::L0Context,
label: &str,
internal_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
let mut tombstones: HashSet<u64> = HashSet::new();
let label_filter: Vec<&str> = if label.is_empty() {
vec![]
} else {
// ":"-joined multi-label scans split into one required label each.
label.split(':').collect()
};
for l0 in l0_ctx.iter_l0_buffers() {
let guard = l0.read();
for vid in guard.vertex_tombstones.iter() {
tombstones.insert(vid.as_u64());
}
let vids: Vec<Vid> = if label_filter.is_empty() {
guard.all_vertex_vids()
} else if label_filter.len() == 1 {
guard.vids_for_label(label_filter[0])
} else {
guard.vids_with_all_labels(&label_filter)
};
for vid in vids {
let vid_u64 = vid.as_u64();
if tombstones.contains(&vid_u64) {
continue;
}
let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
let entry = vid_data
.entry(vid_u64)
.or_insert_with(|| (Properties::new(), 0, Vec::new()));
if let Some(props) = guard.vertex_properties.get(&vid) {
for (k, v) in props {
entry.0.insert(k.clone(), v.clone());
}
}
if version > entry.1 {
entry.1 = version;
}
if let Some(labels) = guard.vertex_labels.get(&vid) {
entry.2 = labels.clone();
}
}
}
for t in &tombstones {
vid_data.remove(t);
}
if vid_data.is_empty() {
return Ok(RecordBatch::new_empty(internal_schema.clone()));
}
let mut vids: Vec<u64> = vid_data.keys().copied().collect();
vids.sort_unstable();
let num_rows = vids.len();
let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());
for field in internal_schema.fields() {
match field.name().as_str() {
"_vid" => {
columns.push(Arc::new(UInt64Array::from(vids.clone())));
}
"labels" => {
let mut labels_builder = ListBuilder::new(StringBuilder::new());
for vid_u64 in &vids {
let (_, _, labels) = &vid_data[vid_u64];
let values = labels_builder.values();
for lbl in labels {
values.append_value(lbl);
}
labels_builder.append(true);
}
columns.push(Arc::new(labels_builder.finish()));
}
"props_json" => {
let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
for vid_u64 in &vids {
let (props, _, _) = &vid_data[vid_u64];
if props.is_empty() {
builder.append_null();
} else {
let json_obj: serde_json::Value = {
let mut map = serde_json::Map::new();
for (k, v) in props {
let json_val: serde_json::Value = v.clone().into();
map.insert(k.clone(), json_val);
}
serde_json::Value::Object(map)
};
match encode_cypher_value(&json_obj) {
Ok(bytes) => builder.append_value(bytes),
Err(_) => builder.append_null(),
}
}
}
columns.push(Arc::new(builder.finish()));
}
"_deleted" => {
columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
false;
num_rows
])));
}
"_version" => {
let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
columns.push(Arc::new(UInt64Array::from(vals)));
}
_ => {
columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
}
}
}
RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
fn map_to_schemaless_output_schema(
batch: &RecordBatch,
variable: &str,
projected_properties: &[String],
output_schema: &SchemaRef,
l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
if batch.num_rows() == 0 {
return Ok(RecordBatch::new_empty(output_schema.clone()));
}
let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
let vid_col = batch
.column_by_name("_vid")
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
})?
.clone();
let vid_arr = vid_col
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
})?;
columns.push(vid_col.clone());
let labels_col = batch.column_by_name("labels");
let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
let mut labels_builder = ListBuilder::new(StringBuilder::new());
for i in 0..vid_arr.len() {
let vid_u64 = vid_arr.value(i);
let vid = Vid::from(vid_u64);
let mut row_labels: Vec<String> = Vec::new();
if let Some(arr) = labels_arr
&& !arr.is_null(i)
{
let list_val = arr.value(i);
if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
for j in 0..str_arr.len() {
if !str_arr.is_null(j) {
row_labels.push(str_arr.value(j).to_string());
}
}
}
}
for l0 in l0_ctx.iter_l0_buffers() {
let guard = l0.read();
if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
for lbl in l0_labels {
if !row_labels.contains(lbl) {
row_labels.push(lbl.clone());
}
}
}
}
let values = labels_builder.values();
for lbl in &row_labels {
values.append_value(lbl);
}
labels_builder.append(true);
}
columns.push(Arc::new(labels_builder.finish()));
let props_col = batch.column_by_name("props_json");
let props_arr =
props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
for prop in projected_properties {
if prop == "_all_props" {
let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
let guard = l0.read();
!guard.vertex_properties.is_empty()
});
if !any_l0_has_vertex_props {
match props_col {
Some(col) => columns.push(col.clone()),
None => {
columns.push(arrow_array::new_null_array(
&DataType::LargeBinary,
batch.num_rows(),
));
}
}
} else {
let col = build_all_props_column_with_l0_overlay(
batch.num_rows(),
vid_arr,
props_arr,
l0_ctx,
);
columns.push(col);
}
} else {
let expected_type = output_schema
.field_with_name(&format!("{variable}.{prop}"))
.map(|f| f.data_type().clone())
.unwrap_or(DataType::LargeBinary);
if expected_type == DataType::LargeBinary {
let col = build_overflow_property_column(
batch.num_rows(),
vid_arr,
props_arr,
prop,
l0_ctx,
);
columns.push(col);
} else {
let mut prop_values: HashMap<Vid, Properties> = HashMap::new();
for i in 0..batch.num_rows() {
let vid = Vid::from(vid_arr.value(i));
let resolved =
resolve_l0_property(&vid, prop, l0_ctx)
.flatten()
.or_else(|| {
extract_from_overflow_blob(props_arr, i, prop).and_then(|bytes| {
uni_common::cypher_value_codec::decode(&bytes).ok()
})
});
if let Some(val) = resolved {
prop_values.insert(vid, HashMap::from([(prop.to_string(), val)]));
}
}
let vids: Vec<Vid> = (0..batch.num_rows())
.map(|i| Vid::from(vid_arr.value(i)))
.collect();
let col = build_property_column_static(&vids, &prop_values, prop, &expected_type)
.unwrap_or_else(|_| {
arrow_array::new_null_array(&expected_type, batch.num_rows())
});
columns.push(col);
}
}
}
RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}
pub(crate) fn get_property_value(
vid: &Vid,
props_map: &HashMap<Vid, Properties>,
prop_name: &str,
) -> Option<Value> {
if prop_name == "_all_props" {
return props_map.get(vid).map(|p| {
let map: HashMap<String, Value> =
p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
Value::Map(map)
});
}
props_map
.get(vid)
.and_then(|props| props.get(prop_name))
.cloned()
}
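/// Encodes a JSON value with the cypher value codec by converting through
/// `uni_common::Value`. Currently always returns `Ok`.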
pub(crate) fn encode_cypher_value(val: &serde_json::Value) -> Result<Vec<u8>, String> {
let uni_val: uni_common::Value = val.clone().into();
Ok(uni_common::cypher_value_codec::encode(&uni_val))
}
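/// Builds a primitive Arrow column from per-vid property values: `$extractor`
/// pulls an optional primitive out of each `Value`, `$cast` converts it to
/// the builder's native type, and anything missing becomes null.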
macro_rules! build_numeric_column {
($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
let mut builder = <$builder_ty>::new();
for vid in $vids {
match get_property_value(vid, $props_map, $prop_name) {
Some(ref v) => {
if let Some(val) = $extractor(v) {
builder.append_value($cast(val));
} else {
builder.append_null();
}
}
None => builder.append_null(),
}
}
Ok(Arc::new(builder.finish()) as ArrayRef)
}};
}
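/// Coerces `prop_name` values into an Arrow column of `data_type`. Notable
/// cases: `LargeBinary` passes through byte payloads that already decode with
/// the cypher codec and re-encodes everything else; temporal types accept
/// both `Value::Temporal` and parseable strings; unhandled types fall back to
/// stringified values.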
pub(crate) fn build_property_column_static(
vids: &[Vid],
props_map: &HashMap<Vid, Properties>,
prop_name: &str,
data_type: &DataType,
) -> DFResult<ArrayRef> {
match data_type {
DataType::LargeBinary => {
use arrow_array::builder::LargeBinaryBuilder;
let mut builder = LargeBinaryBuilder::new();
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::Null) | None => builder.append_null(),
Some(Value::Bytes(bytes)) => {
builder.append_value(&bytes);
}
Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
let bytes: Vec<u8> = arr
.iter()
.filter_map(|v| v.as_u64().map(|n| n as u8))
.collect();
if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
builder.append_value(&bytes);
} else {
let json_val: serde_json::Value = Value::List(arr).into();
match encode_cypher_value(&json_val) {
Ok(encoded) => builder.append_value(encoded),
Err(_) => builder.append_null(),
}
}
}
Some(val @ Value::Temporal(_)) => {
builder.append_value(uni_common::cypher_value_codec::encode(&val));
}
Some(val) => {
let json_val: serde_json::Value = val.into();
match encode_cypher_value(&json_val) {
Ok(bytes) => builder.append_value(bytes),
Err(_) => builder.append_null(),
}
}
}
}
Ok(Arc::new(builder.finish()))
}
DataType::Binary => {
let mut builder = BinaryBuilder::new();
for vid in vids {
let bytes = get_property_value(vid, props_map, prop_name)
.filter(|v| !v.is_null())
.and_then(|v| {
let json_val: serde_json::Value = v.into();
serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
})
.and_then(|crdt| crdt.to_msgpack().ok());
match bytes {
Some(b) => builder.append_value(&b),
None => builder.append_null(),
}
}
Ok(Arc::new(builder.finish()))
}
DataType::Utf8 => {
let mut builder = StringBuilder::new();
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::String(s)) => builder.append_value(s),
Some(Value::Null) | None => builder.append_null(),
Some(other) => builder.append_value(other.to_string()),
}
}
Ok(Arc::new(builder.finish()))
}
DataType::Int64 => {
build_numeric_column!(
vids,
props_map,
prop_name,
Int64Builder,
|v: &Value| v.as_i64(),
|v| v
)
}
DataType::Int32 => {
build_numeric_column!(
vids,
props_map,
prop_name,
Int32Builder,
|v: &Value| v.as_i64(),
|v: i64| v as i32
)
}
DataType::Float64 => {
build_numeric_column!(
vids,
props_map,
prop_name,
Float64Builder,
|v: &Value| v.as_f64(),
|v| v
)
}
DataType::Float32 => {
build_numeric_column!(
vids,
props_map,
prop_name,
Float32Builder,
|v: &Value| v.as_f64(),
|v: f64| v as f32
)
}
DataType::Boolean => {
let mut builder = BooleanBuilder::new();
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::Bool(b)) => builder.append_value(b),
_ => builder.append_null(),
}
}
Ok(Arc::new(builder.finish()))
}
DataType::UInt64 => {
build_numeric_column!(
vids,
props_map,
prop_name,
UInt64Builder,
|v: &Value| v.as_u64(),
|v| v
)
}
DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
let values_builder = Float32Builder::new();
let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::Vector(v)) => {
for val in v {
list_builder.values().append_value(val);
}
list_builder.append(true);
}
Some(Value::List(arr)) => {
for v in arr {
list_builder
.values()
.append_value(v.as_f64().unwrap_or(0.0) as f32);
}
list_builder.append(true);
}
_ => {
for _ in 0..*dim {
list_builder.values().append_null();
}
list_builder.append(false);
}
}
}
Ok(Arc::new(list_builder.finish()))
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::Temporal(tv)) => match tv {
uni_common::TemporalValue::DateTime {
nanos_since_epoch, ..
}
| uni_common::TemporalValue::LocalDateTime {
nanos_since_epoch, ..
} => {
builder.append_value(nanos_since_epoch);
}
uni_common::TemporalValue::Date { days_since_epoch } => {
builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
}
_ => builder.append_null(),
},
Some(Value::String(s)) => match parse_datetime_utc(&s) {
Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
Err(_) => builder.append_null(),
},
Some(Value::Int(n)) => {
builder.append_value(n);
}
_ => builder.append_null(),
}
}
Ok(Arc::new(builder.finish()))
}
DataType::Date32 => {
let mut builder = Date32Builder::new();
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
builder.append_value(days_since_epoch);
}
Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
Ok(d) => builder.append_value((d - epoch).num_days() as i32),
Err(_) => builder.append_null(),
},
Some(Value::Int(n)) => {
builder.append_value(n as i32);
}
_ => builder.append_null(),
}
}
Ok(Arc::new(builder.finish()))
}
DataType::Time64(TimeUnit::Nanosecond) => {
let mut builder = Time64NanosecondBuilder::new();
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::Temporal(
uni_common::TemporalValue::LocalTime {
nanos_since_midnight,
}
| uni_common::TemporalValue::Time {
nanos_since_midnight,
..
},
)) => {
builder.append_value(nanos_since_midnight);
}
Some(Value::Temporal(_)) => builder.append_null(),
Some(Value::String(s)) => {
match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
.or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
{
Ok(t) => {
let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
+ t.nanosecond() as i64;
builder.append_value(nanos);
}
Err(_) => builder.append_null(),
}
}
Some(Value::Int(n)) => {
builder.append_value(n);
}
_ => builder.append_null(),
}
}
Ok(Arc::new(builder.finish()))
}
DataType::Interval(IntervalUnit::MonthDayNano) => {
let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
Vec::with_capacity(vids.len());
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::Temporal(uni_common::TemporalValue::Duration {
months,
days,
nanos,
})) => {
values.push(Some(arrow::datatypes::IntervalMonthDayNano {
months: months as i32,
days: days as i32,
nanoseconds: nanos,
}));
}
Some(Value::Int(_n)) => {
values.push(None);
}
_ => values.push(None),
}
}
let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
Ok(Arc::new(arr))
}
DataType::List(inner_field) => {
build_list_property_column(vids, props_map, prop_name, inner_field)
}
DataType::Struct(fields) => {
build_struct_property_column(vids, props_map, prop_name, fields)
}
DataType::FixedSizeBinary(24) => {
use arrow_array::builder::FixedSizeBinaryBuilder;
const BTIC_LEN: i32 = 24;
let mut builder = FixedSizeBinaryBuilder::with_capacity(vids.len(), BTIC_LEN);
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => {
match uni_btic::Btic::new(lo, hi, meta) {
Ok(b) => {
builder
.append_value(uni_btic::encode::encode(&b))
.map_err(arrow_err)?;
}
Err(e) => {
tracing::warn!(
"BTIC coercion failed for property '{}': invalid value (lo={}, hi={}, meta={:#x}): {}",
prop_name,
lo,
hi,
meta,
e
);
builder.append_null()
}
}
}
Some(Value::String(s)) => match uni_btic::parse::parse_btic_literal(&s) {
Ok(b) => {
builder
.append_value(uni_btic::encode::encode(&b))
.map_err(arrow_err)?;
}
Err(e) => {
tracing::warn!(
"BTIC coercion failed for property '{}': '{}' is not a valid BTIC literal: {}",
prop_name,
s,
e
);
builder.append_null()
}
},
_ => builder.append_null(),
}
}
Ok(Arc::new(builder.finish()))
}
_ => {
let mut builder = StringBuilder::new();
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::Null) | None => builder.append_null(),
Some(other) => builder.append_value(other.to_string()),
}
}
Ok(Arc::new(builder.finish()))
}
}
}
fn build_list_property_column(
vids: &[Vid],
props_map: &HashMap<Vid, Properties>,
prop_name: &str,
inner_field: &Arc<Field>,
) -> DFResult<ArrayRef> {
match inner_field.data_type() {
DataType::Utf8 => {
let mut builder = ListBuilder::new(StringBuilder::new());
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::List(arr)) => {
for v in arr {
match v {
Value::String(s) => builder.values().append_value(s),
Value::Null => builder.values().append_null(),
other => builder.values().append_value(format!("{other:?}")),
}
}
builder.append(true);
}
_ => builder.append(false),
}
}
Ok(Arc::new(builder.finish()))
}
DataType::Int64 => {
let mut builder = ListBuilder::new(Int64Builder::new());
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::List(arr)) => {
for v in arr {
match v.as_i64() {
Some(n) => builder.values().append_value(n),
None => builder.values().append_null(),
}
}
builder.append(true);
}
_ => builder.append(false),
}
}
Ok(Arc::new(builder.finish()))
}
DataType::Float64 => {
let mut builder = ListBuilder::new(Float64Builder::new());
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::List(arr)) => {
for v in arr {
match v.as_f64() {
Some(n) => builder.values().append_value(n),
None => builder.values().append_null(),
}
}
builder.append(true);
}
_ => builder.append(false),
}
}
Ok(Arc::new(builder.finish()))
}
DataType::Boolean => {
let mut builder = ListBuilder::new(BooleanBuilder::new());
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::List(arr)) => {
for v in arr {
match v.as_bool() {
Some(b) => builder.values().append_value(b),
None => builder.values().append_null(),
}
}
builder.append(true);
}
_ => builder.append(false),
}
}
Ok(Arc::new(builder.finish()))
}
DataType::Struct(fields) => {
build_list_of_structs_column(vids, props_map, prop_name, fields)
}
_ => {
let mut builder = ListBuilder::new(StringBuilder::new());
for vid in vids {
match get_property_value(vid, props_map, prop_name) {
Some(Value::List(arr)) => {
for v in arr {
match v {
Value::Null => builder.values().append_null(),
other => builder.values().append_value(format!("{other:?}")),
}
}
builder.append(true);
}
_ => builder.append(false),
}
}
Ok(Arc::new(builder.finish()))
}
}
}
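/// Builds a `List<Struct>` column for `prop_name`. Each row is normalized to
/// a vector of maps, flattened into one child array per struct field, and
/// reassembled into a `ListArray` via explicit offsets and a validity buffer.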
fn build_list_of_structs_column(
vids: &[Vid],
props_map: &HashMap<Vid, Properties>,
prop_name: &str,
fields: &Fields,
) -> DFResult<ArrayRef> {
use arrow_array::StructArray;
let values: Vec<Option<Value>> = vids
.iter()
.map(|vid| get_property_value(vid, props_map, prop_name))
.collect();
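    // Normalize each value: a list keeps only its map elements (an empty
    // result becomes a null row), a bare map is exploded into {key, value}
    // pairs, and anything else becomes a null row.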
let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
.iter()
.map(|val| match val {
Some(Value::List(arr)) => {
let objs: Vec<HashMap<String, Value>> = arr
.iter()
.filter_map(|v| {
if let Value::Map(m) = v {
Some(m.clone())
} else {
None
}
})
.collect();
if objs.is_empty() { None } else { Some(objs) }
}
Some(Value::Map(obj)) => {
let kv_pairs: Vec<HashMap<String, Value>> = obj
.iter()
.map(|(k, v)| {
let mut m = HashMap::new();
m.insert("key".to_string(), Value::String(k.clone()));
m.insert("value".to_string(), v.clone());
m
})
.collect();
Some(kv_pairs)
}
_ => None,
})
.collect();
let total_items: usize = rows
.iter()
.filter_map(|r| r.as_ref())
.map(|v| v.len())
.sum();
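    // Flatten every struct item across all non-null rows into one child
    // array per field, coercing by the field's declared type.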
let child_arrays: Vec<ArrayRef> = fields
.iter()
.map(|field| {
let field_name = field.name();
match field.data_type() {
DataType::Utf8 => {
let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
for obj in rows.iter().flatten().flatten() {
match obj.get(field_name) {
Some(Value::String(s)) => builder.append_value(s),
Some(Value::Null) | None => builder.append_null(),
Some(other) => builder.append_value(format!("{other:?}")),
}
}
Arc::new(builder.finish()) as ArrayRef
}
DataType::Int64 => {
let mut builder = Int64Builder::with_capacity(total_items);
for obj in rows.iter().flatten().flatten() {
match obj.get(field_name).and_then(|v| v.as_i64()) {
Some(n) => builder.append_value(n),
None => builder.append_null(),
}
}
Arc::new(builder.finish()) as ArrayRef
}
DataType::Float64 => {
let mut builder = Float64Builder::with_capacity(total_items);
for obj in rows.iter().flatten().flatten() {
match obj.get(field_name).and_then(|v| v.as_f64()) {
Some(n) => builder.append_value(n),
None => builder.append_null(),
}
}
Arc::new(builder.finish()) as ArrayRef
}
_ => {
let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
for obj in rows.iter().flatten().flatten() {
match obj.get(field_name) {
Some(Value::Null) | None => builder.append_null(),
Some(other) => builder.append_value(format!("{other:?}")),
}
}
Arc::new(builder.finish()) as ArrayRef
}
}
})
.collect();
    let struct_array =
        StructArray::try_new(fields.clone(), child_arrays, None).map_err(arrow_err)?;
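    // Rebuild the list structure: offsets accumulate per-row item counts and
    // the validity buffer marks which rows were null.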
let mut offsets = Vec::with_capacity(vids.len() + 1);
let mut nulls = Vec::with_capacity(vids.len());
let mut offset = 0i32;
offsets.push(offset);
for row in &rows {
match row {
Some(objs) => {
offset += objs.len() as i32;
offsets.push(offset);
nulls.push(true);
}
None => {
offsets.push(offset);
nulls.push(false);
}
}
}
let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
let list_array = arrow_array::ListArray::try_new(
list_field,
arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
Arc::new(struct_array),
Some(arrow::buffer::NullBuffer::from(nulls)),
)
    .map_err(arrow_err)?;
Ok(Arc::new(list_array))
}
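/// Flattens a `TemporalValue` into a field map matching the Arrow struct
/// layout used for temporal properties (e.g. `nanos_since_epoch`,
/// `offset_seconds`, and `timezone_name` for a zoned `DateTime`).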
fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
use uni_common::value::TemporalValue;
let mut m = HashMap::new();
match tv {
TemporalValue::DateTime {
nanos_since_epoch,
offset_seconds,
timezone_name,
} => {
m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
if let Some(tz) = timezone_name {
m.insert("timezone_name".into(), Value::String(tz.clone()));
}
}
TemporalValue::LocalDateTime { nanos_since_epoch } => {
m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
}
TemporalValue::Time {
nanos_since_midnight,
offset_seconds,
} => {
m.insert(
"nanos_since_midnight".into(),
Value::Int(*nanos_since_midnight),
);
m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
}
TemporalValue::LocalTime {
nanos_since_midnight,
} => {
m.insert(
"nanos_since_midnight".into(),
Value::Int(*nanos_since_midnight),
);
}
TemporalValue::Date { days_since_epoch } => {
m.insert(
"days_since_epoch".into(),
Value::Int(*days_since_epoch as i64),
);
}
TemporalValue::Duration {
months,
days,
nanos,
} => {
m.insert("months".into(), Value::Int(*months));
m.insert("days".into(), Value::Int(*days));
m.insert("nanos".into(), Value::Int(*nanos));
}
TemporalValue::Btic { lo, hi, meta } => {
m.insert("lo".into(), Value::Int(*lo));
m.insert("hi".into(), Value::Int(*hi));
m.insert("meta".into(), Value::Int(*meta as i64));
}
}
m
}
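/// Builds a `Struct` column for `prop_name`. Temporal values are first
/// flattened to field maps via `temporal_to_struct_map`; each struct field is
/// then populated by coercing the corresponding map entry to its Arrow type.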
fn build_struct_property_column(
vids: &[Vid],
props_map: &HashMap<Vid, Properties>,
prop_name: &str,
fields: &Fields,
) -> DFResult<ArrayRef> {
use arrow_array::StructArray;
let values: Vec<Option<Value>> = vids
.iter()
.map(|vid| {
let val = get_property_value(vid, props_map, prop_name);
match val {
Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
other => other,
}
})
.collect();
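    // One child array per struct field, coerced from the normalized maps;
    // rows that did not normalize to a map contribute nulls throughout.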
let child_arrays: Vec<ArrayRef> = fields
.iter()
.map(|field| {
let field_name = field.name();
match field.data_type() {
DataType::Float64 => {
let mut builder = Float64Builder::with_capacity(vids.len());
for val in &values {
match val {
Some(Value::Map(obj)) => {
match obj.get(field_name).and_then(|v| v.as_f64()) {
Some(n) => builder.append_value(n),
None => builder.append_null(),
}
}
_ => builder.append_null(),
}
}
Arc::new(builder.finish()) as ArrayRef
}
DataType::Utf8 => {
let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
for val in &values {
match val {
Some(Value::Map(obj)) => match obj.get(field_name) {
Some(Value::String(s)) => builder.append_value(s),
Some(Value::Null) | None => builder.append_null(),
Some(other) => builder.append_value(format!("{other:?}")),
},
_ => builder.append_null(),
}
}
Arc::new(builder.finish()) as ArrayRef
}
DataType::Int64 => {
let mut builder = Int64Builder::with_capacity(vids.len());
for val in &values {
match val {
Some(Value::Map(obj)) => {
match obj.get(field_name).and_then(|v| v.as_i64()) {
Some(n) => builder.append_value(n),
None => builder.append_null(),
}
}
_ => builder.append_null(),
}
}
Arc::new(builder.finish()) as ArrayRef
}
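                // Temporal struct fields carry raw i64 nanos, so the declared
                // time unit is ignored and a nanosecond builder is used.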
DataType::Timestamp(_, _) => {
let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
for val in &values {
match val {
Some(Value::Map(obj)) => {
match obj.get(field_name).and_then(|v| v.as_i64()) {
Some(n) => builder.append_value(n),
None => builder.append_null(),
}
}
_ => builder.append_null(),
}
}
Arc::new(builder.finish()) as ArrayRef
}
DataType::Int32 => {
let mut builder = Int32Builder::with_capacity(vids.len());
for val in &values {
match val {
Some(Value::Map(obj)) => {
match obj.get(field_name).and_then(|v| v.as_i64()) {
Some(n) => builder.append_value(n as i32),
None => builder.append_null(),
}
}
_ => builder.append_null(),
}
}
Arc::new(builder.finish()) as ArrayRef
}
DataType::Time64(_) => {
let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
for val in &values {
match val {
Some(Value::Map(obj)) => {
match obj.get(field_name).and_then(|v| v.as_i64()) {
Some(n) => builder.append_value(n),
None => builder.append_null(),
}
}
_ => builder.append_null(),
}
}
Arc::new(builder.finish()) as ArrayRef
}
_ => {
let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
for val in &values {
match val {
Some(Value::Map(obj)) => match obj.get(field_name) {
Some(Value::Null) | None => builder.append_null(),
Some(other) => builder.append_value(format!("{other:?}")),
},
_ => builder.append_null(),
}
}
Arc::new(builder.finish()) as ArrayRef
}
}
})
.collect();
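    // A row is valid only when its value normalized to a map.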
let nulls: Vec<bool> = values
.iter()
.map(|v| matches!(v, Some(Value::Map(_))))
.collect();
let struct_array = StructArray::try_new(
fields.clone(),
child_arrays,
Some(arrow::buffer::NullBuffer::from(nulls)),
)
    .map_err(arrow_err)?;
Ok(Arc::new(struct_array))
}
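// Single-shot stream: `Init` builds the async scan future, `Executing` polls
// it to completion, and `Done` terminates the stream after the one result
// batch (or error) has been yielded.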
impl Stream for GraphScanStream {
type Item = DFResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
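            // Take the state out by value (leaving `Done` as a placeholder)
            // so the boxed future can be owned and polled.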
let state = std::mem::replace(&mut self.state, GraphScanState::Done);
match state {
GraphScanState::Init => {
let graph_ctx = self.graph_ctx.clone();
let label = self.label.clone();
let variable = self.variable.clone();
let properties = self.properties.clone();
let is_edge_scan = self.is_edge_scan;
let is_schemaless = self.is_schemaless;
let schema = self.schema.clone();
let fut = async move {
graph_ctx.check_timeout().map_err(|e| {
datafusion::error::DataFusionError::Execution(e.to_string())
})?;
let batch = if is_edge_scan {
columnar_scan_edge_batch_static(
&graph_ctx,
&label,
&variable,
&properties,
&schema,
)
.await?
} else if is_schemaless {
columnar_scan_schemaless_vertex_batch_static(
&graph_ctx,
&label,
&variable,
&properties,
&schema,
)
.await?
} else {
columnar_scan_vertex_batch_static(
&graph_ctx,
&label,
&variable,
&properties,
&schema,
)
.await?
};
Ok(Some(batch))
};
self.state = GraphScanState::Executing(Box::pin(fut));
}
GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
Poll::Ready(Ok(batch)) => {
self.state = GraphScanState::Done;
self.metrics
.record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
return Poll::Ready(batch.map(Ok));
}
Poll::Ready(Err(e)) => {
self.state = GraphScanState::Done;
return Poll::Ready(Some(Err(e)));
}
Poll::Pending => {
self.state = GraphScanState::Executing(fut);
return Poll::Pending;
}
},
GraphScanState::Done => {
return Poll::Ready(None);
}
}
}
}
}
impl RecordBatchStream for GraphScanStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_build_vertex_schema() {
let uni_schema = UniSchema::default();
let schema = GraphScanExec::build_vertex_schema(
"n",
"Person",
&["name".to_string(), "age".to_string()],
&uni_schema,
);
assert_eq!(schema.fields().len(), 4);
assert_eq!(schema.field(0).name(), "n._vid");
assert_eq!(schema.field(1).name(), "n._labels");
assert_eq!(schema.field(2).name(), "n.name");
assert_eq!(schema.field(3).name(), "n.age");
}
#[test]
fn test_build_edge_schema() {
let uni_schema = UniSchema::default();
let schema =
GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);
assert_eq!(schema.fields().len(), 4);
assert_eq!(schema.field(0).name(), "r._eid");
assert_eq!(schema.field(1).name(), "r._src_vid");
assert_eq!(schema.field(2).name(), "r._dst_vid");
assert_eq!(schema.field(3).name(), "r.weight");
}
#[test]
fn test_build_schemaless_vertex_schema() {
let empty_schema = uni_common::core::schema::Schema::default();
let schema = GraphScanExec::build_schemaless_vertex_schema(
"n",
&["name".to_string(), "age".to_string()],
&empty_schema,
);
assert_eq!(schema.fields().len(), 4);
assert_eq!(schema.field(0).name(), "n._vid");
assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
assert_eq!(schema.field(1).name(), "n._labels");
assert_eq!(schema.field(2).name(), "n.name");
assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
assert_eq!(schema.field(3).name(), "n.age");
assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
}
#[test]
fn test_schemaless_all_scan_has_empty_label() {
let empty_schema = uni_common::core::schema::Schema::default();
let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[], &empty_schema);
assert_eq!(schema.fields().len(), 2);
assert_eq!(schema.field(0).name(), "n._vid");
assert_eq!(schema.field(1).name(), "n._labels");
}
#[test]
fn test_cypher_value_all_props_extraction() {
let json_obj = serde_json::json!({"age": 30, "name": "Alice"});
let cv_bytes = encode_cypher_value(&json_obj).unwrap();
let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
match decoded {
uni_common::Value::Map(map) => {
let age_val = map.get("age").unwrap();
assert_eq!(age_val, &uni_common::Value::Int(30));
}
_ => panic!("Expected Map"),
}
let single_val = serde_json::json!(30);
let single_bytes = encode_cypher_value(&single_val).unwrap();
let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
assert_eq!(single_decoded, uni_common::Value::Int(30));
}
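    /// Test helper: builds a batch with MVCC bookkeeping columns (`_vid`,
    /// `_deleted`, `_version`) plus a `name` column encoding vid/version so
    /// assertions can check which row version survives.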
fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("_vid", DataType::UInt64, false),
Field::new("_deleted", DataType::Boolean, false),
Field::new("_version", DataType::UInt64, false),
Field::new("name", DataType::Utf8, true),
]));
let names: Vec<String> = vids
.iter()
.zip(versions.iter())
.map(|(v, ver)| format!("v{}_ver{}", v, ver))
.collect();
let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();
RecordBatch::try_new(
schema,
vec![
Arc::new(UInt64Array::from(vids.to_vec())),
Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
Arc::new(UInt64Array::from(versions.to_vec())),
Arc::new(name_arr),
],
)
.unwrap()
}
#[test]
fn test_mvcc_dedup_multiple_versions() {
let batch = make_mvcc_batch(
&[1, 1, 1, 2, 2],
&[3, 1, 5, 2, 4],
&[false, false, false, false, false],
);
let result = mvcc_dedup_batch(&batch).unwrap();
assert_eq!(result.num_rows(), 2);
let vid_col = result
.column_by_name("_vid")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let ver_col = result
.column_by_name("_version")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let name_col = result
.column_by_name("name")
.unwrap()
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap();
assert_eq!(vid_col.value(0), 1);
assert_eq!(ver_col.value(0), 5);
assert_eq!(name_col.value(0), "v1_ver5");
assert_eq!(vid_col.value(1), 2);
assert_eq!(ver_col.value(1), 4);
assert_eq!(name_col.value(1), "v2_ver4");
}
#[test]
fn test_mvcc_dedup_single_rows() {
let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
let result = mvcc_dedup_batch(&batch).unwrap();
assert_eq!(result.num_rows(), 3);
}
#[test]
fn test_mvcc_dedup_empty() {
let batch = make_mvcc_batch(&[], &[], &[]);
let result = mvcc_dedup_batch(&batch).unwrap();
assert_eq!(result.num_rows(), 0);
}
#[test]
fn test_filter_l0_tombstones_removes_tombstoned() {
use crate::query::df_graph::L0Context;
let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
let l0_buf = std::sync::Arc::new(parking_lot::RwLock::new(l0));
l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));
let l0_ctx = L0Context {
current_l0: Some(l0_buf),
transaction_l0: None,
pending_flush_l0s: vec![],
};
let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
assert_eq!(result.num_rows(), 2);
let vid_col = result
.column_by_name("_vid")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(vid_col.value(0), 1);
assert_eq!(vid_col.value(1), 3);
}
#[test]
fn test_filter_l0_tombstones_none() {
use crate::query::df_graph::L0Context;
let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
let l0_ctx = L0Context::default();
let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
assert_eq!(result.num_rows(), 3);
}
#[test]
fn test_map_to_output_schema_basic() {
use crate::query::df_graph::L0Context;
let lance_schema = Arc::new(Schema::new(vec![
Field::new("_vid", DataType::UInt64, false),
Field::new("_deleted", DataType::Boolean, false),
Field::new("_version", DataType::UInt64, false),
Field::new("name", DataType::Utf8, true),
]));
let name_arr: arrow_array::StringArray =
vec![Some("Alice"), Some("Bob")].into_iter().collect();
let batch = RecordBatch::try_new(
lance_schema,
vec![
Arc::new(UInt64Array::from(vec![1u64, 2])),
Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
Arc::new(UInt64Array::from(vec![1u64, 1])),
Arc::new(name_arr),
],
)
.unwrap();
let output_schema = Arc::new(Schema::new(vec![
Field::new("n._vid", DataType::UInt64, false),
Field::new("n._labels", labels_data_type(), true),
Field::new("n.name", DataType::Utf8, true),
]));
let l0_ctx = L0Context::default();
let result = map_to_output_schema(
&batch,
"Person",
"n",
&["name".to_string()],
&output_schema,
&l0_ctx,
)
.unwrap();
assert_eq!(result.num_rows(), 2);
assert_eq!(result.schema().fields().len(), 3);
assert_eq!(result.schema().field(0).name(), "n._vid");
assert_eq!(result.schema().field(1).name(), "n._labels");
assert_eq!(result.schema().field(2).name(), "n.name");
let name_col = result
.column(2)
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap();
assert_eq!(name_col.value(0), "Alice");
assert_eq!(name_col.value(1), "Bob");
}
}