1use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30 BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31 Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32 Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
/// DataFusion physical operator that scans vertices or edges from graph storage.
///
/// The node is a leaf (it reports no children) and hands its configuration to a
/// `GraphScanStream` when executed.
pub struct GraphScanExec {
    /// Shared graph execution context; the constructors use it to reach the
    /// storage schema manager, and it is forwarded to the scan routines.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Vertex label or edge type being scanned. Multi-label scans store the
    /// labels joined with `:`; the schemaless scan-all constructor stores an
    /// empty string.
    label: String,

    /// Query variable name; output columns are named `{variable}.{property}`.
    variable: String,

    /// Property names emitted as output columns, in order.
    projected_properties: Vec<String>,

    /// Optional physical filter expression handed to the scan.
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// Optional list of vertex ids restricting the scan; set through
    /// `with_vid_list_filter`.
    vid_list_filter: Option<Vec<u64>>,

    /// Optional additional filter string. Presumably a Lance predicate pushed
    /// down to storage — TODO confirm against the scan routines.
    extra_lance_filter: Option<String>,

    /// Optional additional physical filter. Presumably evaluated on scanned
    /// batches at runtime — TODO confirm against the scan routines.
    extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,

    /// `true` when this operator scans edges rather than vertices.
    is_edge_scan: bool,

    /// `true` for the schemaless vertex scan variants.
    is_schemaless: bool,

    /// Output schema of this operator.
    schema: SchemaRef,

    /// Plan properties computed from `schema` at construction time.
    properties: Arc<PlanProperties>,

    /// Metrics set from which per-partition baseline metrics are created.
    metrics: ExecutionPlanMetricsSet,
}
127
128impl fmt::Debug for GraphScanExec {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 f.debug_struct("GraphScanExec")
131 .field("label", &self.label)
132 .field("variable", &self.variable)
133 .field("projected_properties", &self.projected_properties)
134 .field(
135 "vid_list_filter_len",
136 &self.vid_list_filter.as_ref().map(Vec::len),
137 )
138 .field("is_edge_scan", &self.is_edge_scan)
139 .finish()
140 }
141}
142
143impl GraphScanExec {
144 pub fn with_vid_list_filter(mut self, vids: Vec<u64>) -> Self {
149 self.vid_list_filter = Some(vids);
150 self
151 }
152
153 pub fn with_extra_lance_filter(mut self, filter: String) -> Self {
157 self.extra_lance_filter = Some(filter);
158 self
159 }
160
161 pub fn with_extra_runtime_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
165 self.extra_runtime_filter = Some(filter);
166 self
167 }
168
169 pub fn has_extra_lance_filter(&self) -> bool {
172 self.extra_lance_filter.is_some()
173 }
174
175 pub(crate) async fn execute_with_vid_filter(&self, vids: &[u64]) -> DFResult<RecordBatch> {
185 if self.is_edge_scan {
186 return Err(datafusion::error::DataFusionError::Plan(
187 "execute_with_vid_filter not supported for edge scans".into(),
188 ));
189 }
190 if self.is_schemaless {
191 columnar_scan_schemaless_vertex_batch_static(
192 &self.graph_ctx,
193 &self.label,
194 &self.variable,
195 &self.projected_properties,
196 &self.schema,
197 &self.filter,
198 Some(vids),
199 self.extra_lance_filter.as_deref(),
200 self.extra_runtime_filter.as_ref(),
201 )
202 .await
203 } else {
204 columnar_scan_vertex_batch_static(
205 &self.graph_ctx,
206 &self.label,
207 &self.variable,
208 &self.projected_properties,
209 &self.schema,
210 &self.filter,
211 Some(vids),
212 self.extra_lance_filter.as_deref(),
213 self.extra_runtime_filter.as_ref(),
214 )
215 .await
216 }
217 }
218}
219
220impl GraphScanExec {
221 pub fn new_vertex_scan(
226 graph_ctx: Arc<GraphExecutionContext>,
227 label: impl Into<String>,
228 variable: impl Into<String>,
229 projected_properties: Vec<String>,
230 filter: Option<Arc<dyn PhysicalExpr>>,
231 ) -> Self {
232 let label = label.into();
233 let variable = variable.into();
234
235 let uni_schema = graph_ctx.storage().schema_manager().schema();
237 let schema =
238 Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
239
240 let properties = compute_plan_properties(schema.clone());
241
242 Self {
243 graph_ctx,
244 label,
245 variable,
246 projected_properties,
247 filter,
248 vid_list_filter: None,
249 extra_lance_filter: None,
250 extra_runtime_filter: None,
251 is_edge_scan: false,
252 is_schemaless: false,
253 schema,
254 properties,
255 metrics: ExecutionPlanMetricsSet::new(),
256 }
257 }
258
259 pub fn new_schemaless_vertex_scan(
265 graph_ctx: Arc<GraphExecutionContext>,
266 label_name: impl Into<String>,
267 variable: impl Into<String>,
268 projected_properties: Vec<String>,
269 filter: Option<Arc<dyn PhysicalExpr>>,
270 ) -> Self {
271 let label = label_name.into();
272 let variable = variable.into();
273
274 let projected_properties: Vec<String> = projected_properties
279 .into_iter()
280 .filter(|p| p != "_vid" && p != "_labels")
281 .collect();
282
283 let uni_schema = graph_ctx.storage().schema_manager().schema();
284 let schema =
285 Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
286 let properties = compute_plan_properties(schema.clone());
287
288 Self {
289 graph_ctx,
290 label,
291 variable,
292 projected_properties,
293 filter,
294 vid_list_filter: None,
295 extra_lance_filter: None,
296 extra_runtime_filter: None,
297 is_edge_scan: false,
298 is_schemaless: true,
299 schema,
300 properties,
301 metrics: ExecutionPlanMetricsSet::new(),
302 }
303 }
304
305 pub fn new_multi_label_vertex_scan(
310 graph_ctx: Arc<GraphExecutionContext>,
311 labels: Vec<String>,
312 variable: impl Into<String>,
313 projected_properties: Vec<String>,
314 filter: Option<Arc<dyn PhysicalExpr>>,
315 ) -> Self {
316 let variable = variable.into();
317 let projected_properties: Vec<String> = projected_properties
318 .into_iter()
319 .filter(|p| p != "_vid" && p != "_labels")
320 .collect();
321 let uni_schema = graph_ctx.storage().schema_manager().schema();
322 let schema =
323 Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
324 let properties = compute_plan_properties(schema.clone());
325
326 let encoded_labels = labels.join(":");
328
329 Self {
330 graph_ctx,
331 label: encoded_labels,
332 variable,
333 projected_properties,
334 filter,
335 vid_list_filter: None,
336 extra_lance_filter: None,
337 extra_runtime_filter: None,
338 is_edge_scan: false,
339 is_schemaless: true,
340 schema,
341 properties,
342 metrics: ExecutionPlanMetricsSet::new(),
343 }
344 }
345
346 pub fn new_schemaless_all_scan(
352 graph_ctx: Arc<GraphExecutionContext>,
353 variable: impl Into<String>,
354 projected_properties: Vec<String>,
355 filter: Option<Arc<dyn PhysicalExpr>>,
356 ) -> Self {
357 let variable = variable.into();
358 let projected_properties: Vec<String> = projected_properties
359 .into_iter()
360 .filter(|p| p != "_vid" && p != "_labels")
361 .collect();
362
363 let uni_schema = graph_ctx.storage().schema_manager().schema();
364 let schema =
365 Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
366 let properties = compute_plan_properties(schema.clone());
367
368 Self {
369 graph_ctx,
370 label: String::new(), variable,
372 projected_properties,
373 filter,
374 vid_list_filter: None,
375 extra_lance_filter: None,
376 extra_runtime_filter: None,
377 is_edge_scan: false,
378 is_schemaless: true,
379 schema,
380 properties,
381 metrics: ExecutionPlanMetricsSet::new(),
382 }
383 }
384
385 fn build_schemaless_vertex_schema(
391 variable: &str,
392 properties: &[String],
393 uni_schema: &uni_common::core::schema::Schema,
394 ) -> SchemaRef {
395 let mut merged: std::collections::HashMap<&str, &uni_common::core::schema::PropertyMeta> =
397 std::collections::HashMap::new();
398 for label_props in uni_schema.properties.values() {
399 for (name, meta) in label_props {
400 merged.entry(name.as_str()).or_insert(meta);
401 }
402 }
403
404 let mut fields = vec![
405 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
406 Field::new(format!("{}._labels", variable), labels_data_type(), true),
407 ];
408
409 for prop in properties {
410 let col_name = format!("{}.{}", variable, prop);
411 let arrow_type = merged
412 .get(prop.as_str())
413 .map(|meta| meta.r#type.to_arrow())
414 .unwrap_or(DataType::LargeBinary);
415 fields.push(Field::new(&col_name, arrow_type, true));
416 }
417
418 Arc::new(Schema::new(fields))
419 }
420
421 pub fn new_edge_scan(
426 graph_ctx: Arc<GraphExecutionContext>,
427 edge_type: impl Into<String>,
428 variable: impl Into<String>,
429 projected_properties: Vec<String>,
430 filter: Option<Arc<dyn PhysicalExpr>>,
431 ) -> Self {
432 let label = edge_type.into();
433 let variable = variable.into();
434
435 let uni_schema = graph_ctx.storage().schema_manager().schema();
437 let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
438
439 let properties = compute_plan_properties(schema.clone());
440
441 Self {
442 graph_ctx,
443 label,
444 variable,
445 projected_properties,
446 filter,
447 vid_list_filter: None,
448 extra_lance_filter: None,
449 extra_runtime_filter: None,
450 is_edge_scan: true,
451 is_schemaless: false,
452 schema,
453 properties,
454 metrics: ExecutionPlanMetricsSet::new(),
455 }
456 }
457
458 fn build_vertex_schema(
460 variable: &str,
461 label: &str,
462 properties: &[String],
463 uni_schema: &UniSchema,
464 ) -> SchemaRef {
465 let mut fields = vec![
466 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
467 Field::new(format!("{}._labels", variable), labels_data_type(), true),
468 ];
469 let label_props = uni_schema.properties.get(label);
470 for prop in properties {
471 let col_name = format!("{}.{}", variable, prop);
472 let arrow_type = resolve_property_type(prop, label_props);
473 fields.push(Field::new(&col_name, arrow_type, true));
474 }
475 Arc::new(Schema::new(fields))
476 }
477
478 fn build_edge_schema(
480 variable: &str,
481 edge_type: &str,
482 properties: &[String],
483 uni_schema: &UniSchema,
484 ) -> SchemaRef {
485 let mut fields = vec![
486 Field::new(format!("{}._eid", variable), DataType::UInt64, false),
487 Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
488 Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
489 ];
490 let edge_props = uni_schema.properties.get(edge_type);
491 for prop in properties {
492 let col_name = format!("{}.{}", variable, prop);
493 let arrow_type = resolve_property_type(prop, edge_props);
494 fields.push(Field::new(&col_name, arrow_type, true));
495 }
496 Arc::new(Schema::new(fields))
497 }
498}
499
500impl DisplayAs for GraphScanExec {
501 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
502 let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
503 write!(
504 f,
505 "GraphScanExec: {}={}, properties={:?}",
506 scan_type, self.label, self.projected_properties
507 )?;
508 if self.filter.is_some() {
509 write!(f, ", filter=<pushed>")?;
510 }
511 Ok(())
512 }
513}
514
515impl ExecutionPlan for GraphScanExec {
516 fn name(&self) -> &str {
517 "GraphScanExec"
518 }
519
520 fn as_any(&self) -> &dyn Any {
521 self
522 }
523
524 fn schema(&self) -> SchemaRef {
525 self.schema.clone()
526 }
527
528 fn properties(&self) -> &Arc<PlanProperties> {
529 &self.properties
530 }
531
532 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
533 vec![]
534 }
535
536 fn with_new_children(
537 self: Arc<Self>,
538 children: Vec<Arc<dyn ExecutionPlan>>,
539 ) -> DFResult<Arc<dyn ExecutionPlan>> {
540 if children.is_empty() {
541 Ok(self)
542 } else {
543 Err(datafusion::error::DataFusionError::Plan(
544 "GraphScanExec does not accept children".to_string(),
545 ))
546 }
547 }
548
549 fn execute(
550 &self,
551 partition: usize,
552 _context: Arc<TaskContext>,
553 ) -> DFResult<SendableRecordBatchStream> {
554 let metrics = BaselineMetrics::new(&self.metrics, partition);
555
556 Ok(Box::pin(GraphScanStream::new(
557 self.graph_ctx.clone(),
558 self.label.clone(),
559 self.variable.clone(),
560 self.projected_properties.clone(),
561 self.is_edge_scan,
562 self.is_schemaless,
563 self.filter.clone(),
564 self.vid_list_filter.clone(),
565 self.extra_lance_filter.clone(),
566 self.extra_runtime_filter.clone(),
567 self.schema.clone(),
568 metrics,
569 )))
570 }
571
572 fn metrics(&self) -> Option<MetricsSet> {
573 Some(self.metrics.clone_inner())
574 }
575}
576
/// Execution state of a `GraphScanStream`.
enum GraphScanState {
    /// State a freshly constructed stream starts in.
    Init,
    /// Holds the boxed scan future, which resolves to an optional record batch
    /// or an error.
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Terminal state. Presumably no further batches are produced once here;
    /// the polling logic is outside this view — TODO confirm.
    Done,
}
586
/// Stream created by `GraphScanExec::execute`, carrying a copy of the
/// operator's scan configuration plus its own execution state.
struct GraphScanStream {
    /// Shared graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Vertex label or edge type to scan (copied from the operator).
    label: String,

    /// Query variable name (copied from the operator).
    variable: String,

    /// Projected property names (the operator's `projected_properties`).
    properties: Vec<String>,

    /// `true` when scanning edges.
    is_edge_scan: bool,

    /// `true` for schemaless vertex scans.
    is_schemaless: bool,

    /// Optional physical filter expression.
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// Optional list of vertex ids restricting the scan.
    vid_list_filter: Option<Vec<u64>>,

    /// Optional additional Lance filter string.
    extra_lance_filter: Option<String>,

    /// Optional additional runtime filter expression.
    extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,

    /// Output schema of the stream.
    schema: SchemaRef,

    /// Current execution state; starts as `GraphScanState::Init`.
    state: GraphScanState,

    /// Baseline metrics for the partition this stream serves.
    metrics: BaselineMetrics,
}
633
impl GraphScanStream {
    /// Creates a stream in the `Init` state from the given scan configuration.
    ///
    /// All arguments are stored as-is; no scanning work happens here.
    #[expect(clippy::too_many_arguments)]
    fn new(
        graph_ctx: Arc<GraphExecutionContext>,
        label: String,
        variable: String,
        properties: Vec<String>,
        is_edge_scan: bool,
        is_schemaless: bool,
        filter: Option<Arc<dyn PhysicalExpr>>,
        vid_list_filter: Option<Vec<u64>>,
        extra_lance_filter: Option<String>,
        extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        Self {
            graph_ctx,
            label,
            variable,
            properties,
            is_edge_scan,
            is_schemaless,
            filter,
            vid_list_filter,
            extra_lance_filter,
            extra_runtime_filter,
            schema,
            state: GraphScanState::Init,
            metrics,
        }
    }
}
668
669pub(crate) fn resolve_property_type(
674 prop: &str,
675 schema_props: Option<
676 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
677 >,
678) -> DataType {
679 if prop == "overflow_json" {
680 DataType::LargeBinary
681 } else if prop == "_created_at" || prop == "_updated_at" {
682 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
686 } else {
687 schema_props
688 .and_then(|props| props.get(prop))
689 .map(|meta| meta.r#type.to_arrow())
690 .unwrap_or(DataType::LargeBinary)
691 }
692}
693
/// Test-only convenience wrapper: MVCC-deduplicates `batch` keyed on the
/// `_vid` column by delegating to `mvcc_dedup_batch_by`.
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    mvcc_dedup_batch_by(batch, "_vid")
}
706
707fn mvcc_dedup_to_option(
712 batch: Option<RecordBatch>,
713 id_column: &str,
714) -> DFResult<Option<RecordBatch>> {
715 match batch {
716 Some(b) => {
717 let deduped = mvcc_dedup_batch_by(&b, id_column)?;
718 Ok(if deduped.num_rows() > 0 {
719 Some(deduped)
720 } else {
721 None
722 })
723 }
724 None => Ok(None),
725 }
726}
727
728fn merge_lance_and_l0(
732 lance_deduped: Option<RecordBatch>,
733 l0_batch: RecordBatch,
734 internal_schema: &SchemaRef,
735 id_column: &str,
736) -> DFResult<Option<RecordBatch>> {
737 let has_l0 = l0_batch.num_rows() > 0;
738 match (lance_deduped, has_l0) {
739 (Some(lance), true) => {
740 let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
741 .map_err(arrow_err)?;
742 Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
743 }
744 (Some(lance), false) => Ok(Some(lance)),
745 (None, true) => Ok(Some(l0_batch)),
746 (None, false) => Ok(None),
747 }
748}
749
/// Appends `col_name` to `columns` unless an equal entry is already present.
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    let already_present = columns
        .iter()
        .map(String::as_str)
        .any(|existing| existing == col_name);
    if already_present {
        return;
    }
    columns.push(col_name.to_owned());
}
759
760fn extract_from_overflow_blob(
765 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
766 row: usize,
767 prop: &str,
768) -> Option<Vec<u8>> {
769 let arr = overflow_arr?;
770 if arr.is_null(row) {
771 return None;
772 }
773 uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
774}
775
776fn build_overflow_property_column(
783 num_rows: usize,
784 vid_arr: &UInt64Array,
785 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
786 prop: &str,
787 l0_ctx: &crate::query::df_graph::L0Context,
788) -> ArrayRef {
789 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
790 for i in 0..num_rows {
791 let vid = Vid::from(vid_arr.value(i));
792
793 let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
795
796 if let Some(val_opt) = l0_val {
797 append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
798 } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
799 builder.append_value(&bytes);
800 } else {
801 builder.append_null();
802 }
803 }
804 Arc::new(builder.finish())
805}
806
807fn resolve_l0_property(
813 vid: &Vid,
814 prop: &str,
815 l0_ctx: &crate::query::df_graph::L0Context,
816) -> Option<Option<Value>> {
817 let mut result = None;
818 for l0 in l0_ctx.iter_l0_buffers() {
819 let guard = l0.read();
820 if let Some(props) = guard.vertex_properties.get(vid)
821 && let Some(val) = props.get(prop)
822 {
823 result = Some(Some(val.clone()));
824 }
825 }
826 result
827}
828
829fn append_value_as_cypher_binary(
834 builder: &mut arrow_array::builder::LargeBinaryBuilder,
835 val: Option<&Value>,
836) {
837 match val {
838 Some(v) if !v.is_null() => {
839 let json_val: serde_json::Value = v.clone().into();
840 match encode_cypher_value(&json_val) {
841 Ok(bytes) => builder.append_value(bytes),
842 Err(_) => builder.append_null(),
843 }
844 }
845 _ => builder.append_null(),
846 }
847}
848
849fn build_all_props_column_with_l0_overlay(
857 num_rows: usize,
858 vid_arr: &UInt64Array,
859 props_arr: Option<&arrow_array::LargeBinaryArray>,
860 l0_ctx: &crate::query::df_graph::L0Context,
861) -> ArrayRef {
862 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
863 for i in 0..num_rows {
864 let vid = Vid::from(vid_arr.value(i));
865
866 let mut merged_props = serde_json::Map::new();
868 if let Some(arr) = props_arr
869 && !arr.is_null(i)
870 && let Ok(uni_common::Value::Map(map)) =
871 uni_common::cypher_value_codec::decode(arr.value(i))
872 {
873 for (k, v) in map {
874 let json_val: serde_json::Value = v.into();
875 merged_props.insert(k, json_val);
876 }
877 }
878
879 for l0 in l0_ctx.iter_l0_buffers() {
881 let guard = l0.read();
882 if let Some(l0_props) = guard.vertex_properties.get(&vid) {
883 for (k, v) in l0_props {
884 let json_val: serde_json::Value = v.clone().into();
885 merged_props.insert(k.clone(), json_val);
886 }
887 }
888 }
889
890 if merged_props.is_empty() {
892 builder.append_null();
893 } else {
894 let json_obj = serde_json::Value::Object(merged_props);
895 match encode_cypher_value(&json_obj) {
896 Ok(bytes) => builder.append_value(bytes),
897 Err(_) => builder.append_null(),
898 }
899 }
900 }
901 Arc::new(builder.finish())
902}
903
904fn build_all_props_column_for_schema_scan(
909 batch: &RecordBatch,
910 vid_arr: &UInt64Array,
911 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
912 projected_properties: &[String],
913 l0_ctx: &crate::query::df_graph::L0Context,
914) -> ArrayRef {
915 let schema_props: Vec<&str> = projected_properties
917 .iter()
918 .filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
919 .map(String::as_str)
920 .collect();
921
922 let num_rows = batch.num_rows();
923 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
924 for i in 0..num_rows {
925 let vid = Vid::from(vid_arr.value(i));
926 let mut merged_props = serde_json::Map::new();
927
928 for &prop in &schema_props {
930 if let Some(col) = batch.column_by_name(prop) {
931 let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
932 if !val.is_null() {
933 let json_val: serde_json::Value = val.into();
934 merged_props.insert(prop.to_string(), json_val);
935 }
936 }
937 }
938
939 if let Some(arr) = overflow_arr
941 && !arr.is_null(i)
942 && let Ok(uni_common::Value::Map(map)) =
943 uni_common::cypher_value_codec::decode(arr.value(i))
944 {
945 for (k, v) in map {
946 let json_val: serde_json::Value = v.into();
947 merged_props.insert(k, json_val);
948 }
949 }
950
951 for l0 in l0_ctx.iter_l0_buffers() {
953 let guard = l0.read();
954 if let Some(l0_props) = guard.vertex_properties.get(&vid) {
955 for (k, v) in l0_props {
956 let json_val: serde_json::Value = v.clone().into();
957 merged_props.insert(k.clone(), json_val);
958 }
959 }
960 }
961
962 if merged_props.is_empty() {
963 builder.append_null();
964 } else {
965 let json_obj = serde_json::Value::Object(merged_props);
966 match encode_cypher_value(&json_obj) {
967 Ok(bytes) => builder.append_value(bytes),
968 Err(_) => builder.append_null(),
969 }
970 }
971 }
972 Arc::new(builder.finish())
973}
974
975fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
981 if batch.num_rows() == 0 {
982 return Ok(batch.clone());
983 }
984
985 let id_col = batch
986 .column_by_name(id_column)
987 .ok_or_else(|| {
988 datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
989 })?
990 .clone();
991 let version_col = batch
992 .column_by_name("_version")
993 .ok_or_else(|| {
994 datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
995 })?
996 .clone();
997
998 let sort_columns = vec![
1000 arrow::compute::SortColumn {
1001 values: id_col,
1002 options: Some(arrow::compute::SortOptions {
1003 descending: false,
1004 nulls_first: false,
1005 }),
1006 },
1007 arrow::compute::SortColumn {
1008 values: version_col,
1009 options: Some(arrow::compute::SortOptions {
1010 descending: true,
1011 nulls_first: false,
1012 }),
1013 },
1014 ];
1015 let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;
1016
1017 let sorted_columns: Vec<ArrayRef> = batch
1019 .columns()
1020 .iter()
1021 .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
1022 .collect::<Result<_, _>>()
1023 .map_err(arrow_err)?;
1024 let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;
1025
1026 let sorted_id = sorted
1028 .column_by_name(id_column)
1029 .unwrap()
1030 .as_any()
1031 .downcast_ref::<UInt64Array>()
1032 .unwrap();
1033
1034 let mut keep = vec![false; sorted.num_rows()];
1035 if !keep.is_empty() {
1036 keep[0] = true;
1037 for (i, flag) in keep.iter_mut().enumerate().skip(1) {
1038 if sorted_id.value(i) != sorted_id.value(i - 1) {
1039 *flag = true;
1040 }
1041 }
1042 }
1043
1044 let mask = arrow_array::BooleanArray::from(keep);
1045 arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
1046}
1047
1048fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
1050 if batch.num_rows() == 0 {
1051 return Ok(batch.clone());
1052 }
1053 let op_col = match batch.column_by_name("op") {
1054 Some(col) => col
1055 .as_any()
1056 .downcast_ref::<arrow_array::UInt8Array>()
1057 .unwrap(),
1058 None => return Ok(batch.clone()),
1059 };
1060 let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
1061 let mask = arrow_array::BooleanArray::from(keep);
1062 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1063}
1064
1065fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
1067 if batch.num_rows() == 0 {
1068 return Ok(batch.clone());
1069 }
1070 let deleted_col = match batch.column_by_name("_deleted") {
1071 Some(col) => col
1072 .as_any()
1073 .downcast_ref::<arrow_array::BooleanArray>()
1074 .unwrap(),
1075 None => return Ok(batch.clone()),
1076 };
1077 let keep: Vec<bool> = (0..deleted_col.len())
1078 .map(|i| !deleted_col.value(i))
1079 .collect();
1080 let mask = arrow_array::BooleanArray::from(keep);
1081 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1082}
1083
1084fn filter_l0_tombstones(
1086 batch: &RecordBatch,
1087 l0_ctx: &crate::query::df_graph::L0Context,
1088) -> DFResult<RecordBatch> {
1089 if batch.num_rows() == 0 {
1090 return Ok(batch.clone());
1091 }
1092
1093 let mut tombstones: HashSet<u64> = HashSet::new();
1094 for l0 in l0_ctx.iter_l0_buffers() {
1095 let guard = l0.read();
1096 for vid in guard.vertex_tombstones.iter() {
1097 tombstones.insert(vid.as_u64());
1098 }
1099 }
1100
1101 if tombstones.is_empty() {
1102 return Ok(batch.clone());
1103 }
1104
1105 let vid_col = batch
1106 .column_by_name("_vid")
1107 .ok_or_else(|| {
1108 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1109 })?
1110 .as_any()
1111 .downcast_ref::<UInt64Array>()
1112 .unwrap();
1113
1114 let keep: Vec<bool> = (0..vid_col.len())
1115 .map(|i| !tombstones.contains(&vid_col.value(i)))
1116 .collect();
1117 let mask = arrow_array::BooleanArray::from(keep);
1118 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1119}
1120
1121fn filter_l0_edge_tombstones(
1123 batch: &RecordBatch,
1124 l0_ctx: &crate::query::df_graph::L0Context,
1125) -> DFResult<RecordBatch> {
1126 if batch.num_rows() == 0 {
1127 return Ok(batch.clone());
1128 }
1129
1130 let mut tombstones: HashSet<u64> = HashSet::new();
1131 for l0 in l0_ctx.iter_l0_buffers() {
1132 let guard = l0.read();
1133 for eid in guard.tombstones.keys() {
1134 tombstones.insert(eid.as_u64());
1135 }
1136 }
1137
1138 if tombstones.is_empty() {
1139 return Ok(batch.clone());
1140 }
1141
1142 let eid_col = batch
1143 .column_by_name("eid")
1144 .ok_or_else(|| {
1145 datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1146 })?
1147 .as_any()
1148 .downcast_ref::<UInt64Array>()
1149 .unwrap();
1150
1151 let keep: Vec<bool> = (0..eid_col.len())
1152 .map(|i| !tombstones.contains(&eid_col.value(i)))
1153 .collect();
1154 let mask = arrow_array::BooleanArray::from(keep);
1155 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1156}
1157
1158fn extract_vid_from_physical_filter(filter: &Arc<dyn PhysicalExpr>) -> Option<u64> {
1167 use datafusion::logical_expr::Operator;
1168 use datafusion::physical_expr::expressions::BinaryExpr;
1169
1170 if let Some(bin) = filter.as_any().downcast_ref::<BinaryExpr>() {
1172 if bin.op() == &Operator::Eq {
1173 if let Some(vid) = try_extract_vid_eq(bin.left(), bin.right()) {
1175 return Some(vid);
1176 }
1177 if let Some(vid) = try_extract_vid_eq(bin.right(), bin.left()) {
1178 return Some(vid);
1179 }
1180 }
1181 if bin.op() == &Operator::And {
1183 if let Some(vid) = extract_vid_from_physical_filter(bin.left()) {
1184 return Some(vid);
1185 }
1186 return extract_vid_from_physical_filter(bin.right());
1187 }
1188 }
1189 None
1190}
1191
1192fn try_extract_vid_eq(
1196 col_side: &Arc<dyn PhysicalExpr>,
1197 val_side: &Arc<dyn PhysicalExpr>,
1198) -> Option<u64> {
1199 use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
1200
1201 let col = col_side.as_any().downcast_ref::<Column>()?;
1203 if col.name() != "_vid" && !col.name().ends_with("._vid") {
1204 return None;
1205 }
1206
1207 if let Some(lit) = val_side.as_any().downcast_ref::<Literal>() {
1209 return scalar_to_u64(lit.value());
1210 }
1211
1212 if let Some(cast) = val_side.as_any().downcast_ref::<CastExpr>()
1214 && let Some(lit) = cast.expr().as_any().downcast_ref::<Literal>()
1215 {
1216 return scalar_to_u64(lit.value());
1217 }
1218
1219 None
1220}
1221
/// Combines two optional filter strings.
///
/// With both present the result is `(vid_filter) AND (extra)`; with one
/// present that one is returned as-is; with neither the result is `None`.
fn combine_lance_filters(vid_filter: Option<&str>, extra: Option<&str>) -> Option<String> {
    let Some(a) = vid_filter else {
        return extra.map(str::to_string);
    };
    let combined = match extra {
        Some(b) => format!("({a}) AND ({b})"),
        None => a.to_string(),
    };
    Some(combined)
}
1232
/// Renders `vids` as the predicate `_vid IN (v1,v2,...)` with no spaces
/// between elements.
///
/// Callers are expected to pass a non-empty slice; this is checked only by a
/// debug assertion.
fn format_vid_in_list(vids: &[u64]) -> String {
    use std::fmt::Write;
    debug_assert!(!vids.is_empty());

    let mut out = String::with_capacity(vids.len() * 8 + 12);
    out += "_vid IN (";

    // First element without a separator, every later one preceded by a comma.
    let mut ids = vids.iter();
    if let Some(first) = ids.next() {
        let _ = write!(out, "{first}");
    }
    for id in ids {
        out.push(',');
        let _ = write!(out, "{id}");
    }

    out.push(')');
    out
}
1249
1250fn scalar_to_u64(sv: &datafusion::common::ScalarValue) -> Option<u64> {
1252 use datafusion::common::ScalarValue;
1253 match sv {
1254 ScalarValue::UInt64(Some(v)) => Some(*v),
1255 ScalarValue::Int64(Some(v)) if *v >= 0 => Some(*v as u64),
1256 ScalarValue::UInt32(Some(v)) => Some(*v as u64),
1257 ScalarValue::Int32(Some(v)) if *v >= 0 => Some(*v as u64),
1258 _ => None,
1259 }
1260}
1261
/// Builds a record batch of the vertices held in the L0 buffers, laid out
/// according to `lance_schema`.
///
/// * `label` selects the candidate vertices. With `target_vids`, an empty
///   label accepts any vertex that has properties in the buffer.
/// * `label_props` lists the schema-declared properties of the label;
///   properties not in it are packed into the `overflow_json` column.
/// * `target_vids`, when given, restricts the candidates to those ids.
///
/// Data for one vertex is merged across buffers in iteration order: later
/// buffers overwrite properties key by key, the highest version is kept, the
/// earliest `created_at` and the latest `updated_at` win. Vertices tombstoned
/// in any buffer are excluded. Rows are emitted in ascending vid order.
///
/// Returns an empty batch with `lance_schema` when no vertex qualifies.
/// Errors come from property-column construction or from
/// `RecordBatch::try_new` (mapped through `arrow_err`).
fn build_l0_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    lance_schema: &SchemaRef,
    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
    target_vids: Option<&[u64]>,
) -> DFResult<RecordBatch> {
    // vid -> (merged properties, highest version seen), and the set of
    // tombstoned vids accumulated across buffers.
    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new(); let mut tombstones: HashSet<u64> = HashSet::new();
    // vid -> earliest creation timestamp seen.
    let mut vid_created_at: HashMap<u64, i64> = HashMap::new();
    // vid -> latest update timestamp seen.
    let mut vid_updated_at: HashMap<u64, i64> = HashMap::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Record this buffer's tombstones before looking at its vertices.
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }
        // Candidates: either the requested vids that exist in this buffer and
        // match the label, or every vid the buffer reports for the label.
        let candidate_vids: Vec<Vid> = if let Some(tvs) = target_vids {
            let mut out = Vec::with_capacity(tvs.len());
            for &tv in tvs {
                let vid = Vid::from(tv);
                if guard.vertex_properties.contains_key(&vid)
                    && (label.is_empty()
                        || guard
                            .label_to_vids
                            .get(label)
                            .is_some_and(|s| s.contains(&vid)))
                {
                    out.push(vid);
                }
            }
            out
        } else {
            guard.vids_for_label(label)
        };
        for vid in candidate_vids {
            let vid_u64 = vid.as_u64();
            // Skip vids tombstoned in this or an earlier buffer.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            // A vertex without a recorded version counts as version 0.
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0));
            if let Some(props) = guard.vertex_properties.get(&vid) {
                // Later buffers overwrite earlier values for the same key.
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            if version > entry.1 {
                // Keep the highest version across buffers.
                entry.1 = version;
            }
            if let Some(&ts) = guard.vertex_created_at.get(&vid) {
                // Keep the earliest creation timestamp.
                vid_created_at
                    .entry(vid_u64)
                    .and_modify(|cur| {
                        if ts < *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
            if let Some(&ts) = guard.vertex_updated_at.get(&vid) {
                // Keep the latest update timestamp.
                vid_updated_at
                    .entry(vid_u64)
                    .and_modify(|cur| {
                        if ts > *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
        }
    }

    // A tombstone found in a later buffer must also remove data collected
    // from earlier buffers.
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(lance_schema.clone()));
    }

    // Fix a deterministic row order: ascending vid.
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());

    // Names of schema-declared properties; anything else goes to overflow.
    let schema_prop_names: HashSet<&str> = label_props
        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    // Build one column per schema field, in schema order.
    for field in lance_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "_deleted" => {
                // Tombstoned vertices were removed above, so every row is live.
                let vals = vec![false; num_rows];
                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "_created_at" => {
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for v in &vids {
                    match vid_created_at.get(v) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_updated_at" => {
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for v in &vids {
                    match vid_updated_at.get(v) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "overflow_json" => {
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                // Pack every property that is not schema-declared into one
                // encoded map per row.
                for vid_u64 in &vids {
                    let (props, _) = &vid_data[vid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        // `ext_id` and underscore-prefixed keys never go to overflow.
                        if k == "ext_id" || k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        // An encoding failure degrades to a null slot.
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Any other field is treated as a property column of the field's type.
                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
}
1457
1458fn build_l0_property_column(
1462 vids: &[u64],
1463 vid_data: &HashMap<u64, (Properties, u64)>,
1464 prop_name: &str,
1465 data_type: &DataType,
1466) -> DFResult<ArrayRef> {
1467 let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
1469 let props_map: HashMap<Vid, Properties> = vid_data
1470 .iter()
1471 .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1472 .collect();
1473
1474 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1475}
1476
1477fn build_l0_edge_batch(
1483 l0_ctx: &crate::query::df_graph::L0Context,
1484 edge_type: &str,
1485 internal_schema: &SchemaRef,
1486 type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1487) -> DFResult<RecordBatch> {
1488 let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
1491 let mut tombstones: HashSet<u64> = HashSet::new();
1492 let mut eid_created_at: HashMap<u64, i64> = HashMap::new();
1495 let mut eid_updated_at: HashMap<u64, i64> = HashMap::new();
1496
1497 for l0 in l0_ctx.iter_l0_buffers() {
1498 let guard = l0.read();
1499 for eid in guard.tombstones.keys() {
1501 tombstones.insert(eid.as_u64());
1502 }
1503 for eid in guard.eids_for_type(edge_type) {
1505 let eid_u64 = eid.as_u64();
1506 if tombstones.contains(&eid_u64) {
1507 continue;
1508 }
1509 let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
1510 Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
1511 None => continue,
1512 };
1513 let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
1514 let entry = eid_data
1515 .entry(eid_u64)
1516 .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
1517 if let Some(props) = guard.edge_properties.get(&eid) {
1519 for (k, v) in props {
1520 entry.2.insert(k.clone(), v.clone());
1521 }
1522 }
1523 entry.0 = src_vid;
1525 entry.1 = dst_vid;
1526 if version > entry.3 {
1528 entry.3 = version;
1529 }
1530 if let Some(&ts) = guard.edge_created_at.get(&eid) {
1532 eid_created_at
1533 .entry(eid_u64)
1534 .and_modify(|cur| {
1535 if ts < *cur {
1536 *cur = ts;
1537 }
1538 })
1539 .or_insert(ts);
1540 }
1541 if let Some(&ts) = guard.edge_updated_at.get(&eid) {
1542 eid_updated_at
1543 .entry(eid_u64)
1544 .and_modify(|cur| {
1545 if ts > *cur {
1546 *cur = ts;
1547 }
1548 })
1549 .or_insert(ts);
1550 }
1551 }
1552 }
1553
1554 for t in &tombstones {
1556 eid_data.remove(t);
1557 }
1558
1559 if eid_data.is_empty() {
1560 return Ok(RecordBatch::new_empty(internal_schema.clone()));
1561 }
1562
1563 let mut eids: Vec<u64> = eid_data.keys().copied().collect();
1565 eids.sort_unstable();
1566
1567 let num_rows = eids.len();
1568 let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());
1569
1570 let schema_prop_names: HashSet<&str> = type_props
1572 .map(|tp| tp.keys().map(|k| k.as_str()).collect())
1573 .unwrap_or_default();
1574
1575 for field in internal_schema.fields() {
1576 let col_name = field.name().as_str();
1577 match col_name {
1578 "eid" => {
1579 columns.push(Arc::new(UInt64Array::from(eids.clone())));
1580 }
1581 "src_vid" => {
1582 let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
1583 columns.push(Arc::new(UInt64Array::from(vals)));
1584 }
1585 "dst_vid" => {
1586 let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
1587 columns.push(Arc::new(UInt64Array::from(vals)));
1588 }
1589 "op" => {
1590 let vals = vec![0u8; num_rows];
1592 columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
1593 }
1594 "_version" => {
1595 let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
1596 columns.push(Arc::new(UInt64Array::from(vals)));
1597 }
1598 "_created_at" => {
1599 let mut builder =
1600 arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1601 for e in &eids {
1602 match eid_created_at.get(e) {
1603 Some(&ts) => builder.append_value(ts),
1604 None => builder.append_null(),
1605 }
1606 }
1607 columns.push(Arc::new(builder.finish()));
1608 }
1609 "_updated_at" => {
1610 let mut builder =
1611 arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1612 for e in &eids {
1613 match eid_updated_at.get(e) {
1614 Some(&ts) => builder.append_value(ts),
1615 None => builder.append_null(),
1616 }
1617 }
1618 columns.push(Arc::new(builder.finish()));
1619 }
1620 "overflow_json" => {
1621 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1623 for eid_u64 in &eids {
1624 let (_, _, props, _) = &eid_data[eid_u64];
1625 let mut overflow = serde_json::Map::new();
1626 for (k, v) in props {
1627 if k.starts_with('_') {
1628 continue;
1629 }
1630 if !schema_prop_names.contains(k.as_str()) {
1631 let json_val: serde_json::Value = v.clone().into();
1632 overflow.insert(k.clone(), json_val);
1633 }
1634 }
1635 if overflow.is_empty() {
1636 builder.append_null();
1637 } else {
1638 let json_val = serde_json::Value::Object(overflow);
1639 match encode_cypher_value(&json_val) {
1640 Ok(bytes) => builder.append_value(bytes),
1641 Err(_) => builder.append_null(),
1642 }
1643 }
1644 }
1645 columns.push(Arc::new(builder.finish()));
1646 }
1647 _ => {
1648 let col =
1650 build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
1651 columns.push(col);
1652 }
1653 }
1654 }
1655
1656 RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
1657}
1658
1659fn build_l0_edge_property_column(
1663 eids: &[u64],
1664 eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1665 prop_name: &str,
1666 data_type: &DataType,
1667) -> DFResult<ArrayRef> {
1668 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1670 let props_map: HashMap<Vid, Properties> = eid_data
1671 .iter()
1672 .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1673 .collect();
1674
1675 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1676}
1677
1678fn build_labels_column_for_known_label(
1684 vid_arr: &UInt64Array,
1685 label: &str,
1686 l0_ctx: &crate::query::df_graph::L0Context,
1687 batch_labels_col: Option<&arrow_array::ListArray>,
1688) -> DFResult<ArrayRef> {
1689 use uni_store::storage::arrow_convert::labels_from_list_array;
1690
1691 let mut labels_builder = ListBuilder::new(StringBuilder::new());
1692
1693 for i in 0..vid_arr.len() {
1694 let vid = Vid::from(vid_arr.value(i));
1695
1696 let mut labels = match batch_labels_col {
1698 Some(list_arr) => {
1699 let stored = labels_from_list_array(list_arr, i);
1700 if stored.is_empty() {
1701 vec![label.to_string()]
1702 } else {
1703 stored
1704 }
1705 }
1706 None => vec![label.to_string()],
1707 };
1708
1709 if !labels.iter().any(|l| l == label) {
1711 labels.push(label.to_string());
1712 }
1713
1714 for l0 in l0_ctx.iter_l0_buffers() {
1716 let guard = l0.read();
1717 if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
1718 for lbl in l0_labels {
1719 if !labels.contains(lbl) {
1720 labels.push(lbl.clone());
1721 }
1722 }
1723 }
1724 }
1725
1726 let values = labels_builder.values();
1727 for lbl in &labels {
1728 values.append_value(lbl);
1729 }
1730 labels_builder.append(true);
1731 }
1732
1733 Ok(Arc::new(labels_builder.finish()))
1734}
1735
1736fn map_to_output_schema(
1742 batch: &RecordBatch,
1743 label: &str,
1744 _variable: &str,
1745 projected_properties: &[String],
1746 output_schema: &SchemaRef,
1747 l0_ctx: &crate::query::df_graph::L0Context,
1748) -> DFResult<RecordBatch> {
1749 if batch.num_rows() == 0 {
1750 return Ok(RecordBatch::new_empty(output_schema.clone()));
1751 }
1752
1753 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1754
1755 let vid_col = batch
1757 .column_by_name("_vid")
1758 .ok_or_else(|| {
1759 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1760 })?
1761 .clone();
1762 let vid_arr = vid_col
1763 .as_any()
1764 .downcast_ref::<UInt64Array>()
1765 .ok_or_else(|| {
1766 datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
1767 })?;
1768
1769 let batch_labels_col = batch
1771 .column_by_name("_labels")
1772 .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
1773 let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
1774 columns.push(vid_col.clone());
1775 columns.push(labels_col);
1776
1777 let overflow_arr = batch
1780 .column_by_name("overflow_json")
1781 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1782
1783 for prop in projected_properties {
1784 if prop == "overflow_json" {
1785 match batch.column_by_name("overflow_json") {
1786 Some(col) => columns.push(col.clone()),
1787 None => {
1788 columns.push(arrow_array::new_null_array(
1790 &DataType::LargeBinary,
1791 batch.num_rows(),
1792 ));
1793 }
1794 }
1795 } else if prop == "_all_props" {
1796 let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
1800 let guard = l0.read();
1801 !guard.vertex_properties.is_empty()
1802 });
1803 let has_schema_cols = projected_properties
1805 .iter()
1806 .any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));
1807
1808 if !any_l0_has_vertex_props && !has_schema_cols {
1809 match batch.column_by_name("overflow_json") {
1811 Some(col) => columns.push(col.clone()),
1812 None => {
1813 columns.push(arrow_array::new_null_array(
1814 &DataType::LargeBinary,
1815 batch.num_rows(),
1816 ));
1817 }
1818 }
1819 } else {
1820 let col = build_all_props_column_for_schema_scan(
1822 batch,
1823 vid_arr,
1824 overflow_arr,
1825 projected_properties,
1826 l0_ctx,
1827 );
1828 columns.push(col);
1829 }
1830 } else {
1831 match batch.column_by_name(prop) {
1832 Some(col) => columns.push(col.clone()),
1833 None => {
1834 let col = build_overflow_property_column(
1837 batch.num_rows(),
1838 vid_arr,
1839 overflow_arr,
1840 prop,
1841 l0_ctx,
1842 );
1843 columns.push(col);
1844 }
1845 }
1846 }
1847 }
1848
1849 RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
1850}
1851
1852fn map_edge_to_output_schema(
1859 batch: &RecordBatch,
1860 variable: &str,
1861 projected_properties: &[String],
1862 output_schema: &SchemaRef,
1863) -> DFResult<RecordBatch> {
1864 if batch.num_rows() == 0 {
1865 return Ok(RecordBatch::new_empty(output_schema.clone()));
1866 }
1867
1868 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1869
1870 let eid_col = batch
1872 .column_by_name("eid")
1873 .ok_or_else(|| {
1874 datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1875 })?
1876 .clone();
1877 columns.push(eid_col);
1878
1879 let src_col = batch
1881 .column_by_name("src_vid")
1882 .ok_or_else(|| {
1883 datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
1884 })?
1885 .clone();
1886 columns.push(src_col);
1887
1888 let dst_col = batch
1890 .column_by_name("dst_vid")
1891 .ok_or_else(|| {
1892 datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
1893 })?
1894 .clone();
1895 columns.push(dst_col);
1896
1897 for prop in projected_properties {
1899 if prop == "overflow_json" {
1900 match batch.column_by_name("overflow_json") {
1901 Some(col) => columns.push(col.clone()),
1902 None => {
1903 columns.push(arrow_array::new_null_array(
1904 &DataType::LargeBinary,
1905 batch.num_rows(),
1906 ));
1907 }
1908 }
1909 } else {
1910 match batch.column_by_name(prop) {
1911 Some(col) => columns.push(col.clone()),
1912 None => {
1913 let overflow_arr = batch
1916 .column_by_name("overflow_json")
1917 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1918
1919 if let Some(arr) = overflow_arr {
1920 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1921 for i in 0..batch.num_rows() {
1922 if !arr.is_null(i) {
1923 let blob = arr.value(i);
1924 if let Some(sub_bytes) =
1926 uni_common::cypher_value_codec::extract_map_entry_raw(
1927 blob, prop,
1928 )
1929 {
1930 builder.append_value(&sub_bytes);
1931 } else {
1932 builder.append_null();
1933 }
1934 } else {
1935 builder.append_null();
1936 }
1937 }
1938 columns.push(Arc::new(builder.finish()));
1939 } else {
1940 let target_field = output_schema
1942 .fields()
1943 .iter()
1944 .find(|f| f.name() == &format!("{}.{}", variable, prop));
1945 let dt = target_field
1946 .map(|f| f.data_type().clone())
1947 .unwrap_or(DataType::LargeBinary);
1948 columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
1949 }
1950 }
1951 }
1952 }
1953 }
1954
1955 RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
1956}
1957
1958#[expect(clippy::too_many_arguments)]
1965async fn columnar_scan_vertex_batch_static(
1966 graph_ctx: &GraphExecutionContext,
1967 label: &str,
1968 variable: &str,
1969 projected_properties: &[String],
1970 output_schema: &SchemaRef,
1971 filter: &Option<Arc<dyn PhysicalExpr>>,
1972 vid_list_filter: Option<&[u64]>,
1973 extra_lance_filter: Option<&str>,
1974 extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
1975) -> DFResult<RecordBatch> {
1976 let storage = graph_ctx.storage();
1977 let l0_ctx = graph_ctx.l0_context();
1978 let uni_schema = storage.schema_manager().schema();
1979 let label_props = uni_schema.properties.get(label);
1980
1981 let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);
1986
1987 let mut lance_columns: Vec<String> = vec![
1989 "_vid".to_string(),
1990 "_deleted".to_string(),
1991 "_version".to_string(),
1992 ];
1993 for prop in projected_properties {
1994 if prop == "overflow_json" {
1995 push_column_if_absent(&mut lance_columns, "overflow_json");
1996 } else if prop == "_created_at" || prop == "_updated_at" {
1997 push_column_if_absent(&mut lance_columns, prop);
2000 } else {
2001 let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
2002 if exists_in_schema {
2003 push_column_if_absent(&mut lance_columns, prop);
2004 }
2005 }
2006 }
2007
2008 let needs_overflow = projected_properties.iter().any(|p| {
2011 p == "overflow_json"
2012 || (!matches!(p.as_str(), "_created_at" | "_updated_at")
2013 && !label_props.is_some_and(|lp| lp.contains_key(p)))
2014 });
2015 if needs_overflow {
2016 push_column_if_absent(&mut lance_columns, "overflow_json");
2017 }
2018
2019 let vid_part = match (vid_list_filter, target_vid) {
2024 (Some(vs), _) if !vs.is_empty() => Some(format_vid_in_list(vs)),
2025 (_, Some(v)) => Some(format!("_vid = {v}")),
2026 _ => None,
2027 };
2028 let combined_filter = combine_lance_filters(vid_part.as_deref(), extra_lance_filter);
2029 let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
2030
2031 let plugin_batch: Option<arrow::record_batch::RecordBatch> = match graph_ctx.plugin_registry() {
2037 Some(reg) => match reg.lookup_label_storage(label) {
2038 Some(plugin_storage) => {
2039 let mut stream = plugin_storage.read_batch(label, None).await.map_err(|e| {
2040 datafusion::error::DataFusionError::Execution(format!(
2041 "plugin Storage::read_batch({label}) failed: {} (code 0x{:x})",
2042 e.message, e.code
2043 ))
2044 })?;
2045 use futures::StreamExt;
2046 let mut batches: Vec<arrow::record_batch::RecordBatch> = Vec::new();
2047 let mut schema_ref: Option<SchemaRef> = None;
2048 while let Some(b) = stream.next().await {
2049 let b = b.map_err(|e| {
2050 datafusion::error::DataFusionError::Execution(format!(
2051 "plugin Storage stream({label}) errored: {e}"
2052 ))
2053 })?;
2054 if schema_ref.is_none() {
2055 schema_ref = Some(b.schema());
2056 }
2057 batches.push(b);
2058 }
2059 if let Some(s) = schema_ref {
2060 Some(arrow::compute::concat_batches(&s, &batches).map_err(|e| {
2061 datafusion::error::DataFusionError::Execution(format!(
2062 "plugin Storage concat({label}) failed: {e}"
2063 ))
2064 })?)
2065 } else {
2066 None
2067 }
2068 }
2069 None => None,
2070 },
2071 None => None,
2072 };
2073
2074 let lance_batch = match plugin_batch {
2075 Some(b) => Some(b),
2076 None => storage
2077 .scan_vertex_table(label, &lance_columns_refs, combined_filter.as_deref())
2078 .await
2079 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?,
2080 };
2081
2082 let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
2084
2085 let internal_schema = match &lance_deduped {
2088 Some(batch) => batch.schema(),
2089 None => {
2090 let mut fields = vec![
2091 Field::new("_vid", DataType::UInt64, false),
2092 Field::new("_deleted", DataType::Boolean, false),
2093 Field::new("_version", DataType::UInt64, false),
2094 ];
2095 for col in &lance_columns {
2096 if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
2097 continue;
2098 }
2099 if col == "overflow_json" {
2100 fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
2101 } else if col == "_created_at" || col == "_updated_at" {
2102 fields.push(Field::new(
2103 col,
2104 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
2105 true,
2106 ));
2107 } else {
2108 let arrow_type = label_props
2109 .and_then(|lp| lp.get(col.as_str()))
2110 .map(|meta| meta.r#type.to_arrow())
2111 .unwrap_or(DataType::LargeBinary);
2112 fields.push(Field::new(col, arrow_type, true));
2113 }
2114 }
2115 Arc::new(Schema::new(fields))
2116 }
2117 };
2118
2119 let single_vid_buf: [u64; 1];
2125 let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
2126 (Some(vs), _) if !vs.is_empty() => Some(vs),
2127 (_, Some(v)) => {
2128 single_vid_buf = [v];
2129 Some(&single_vid_buf)
2130 }
2131 _ => None,
2132 };
2133 let l0_batch =
2134 build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props, l0_target_vids)?;
2135
2136 let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
2138 else {
2139 return Ok(RecordBatch::new_empty(output_schema.clone()));
2140 };
2141
2142 let merged = filter_deleted_rows(&merged)?;
2144 if merged.num_rows() == 0 {
2145 return Ok(RecordBatch::new_empty(output_schema.clone()));
2146 }
2147
2148 let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
2150
2151 if filtered.num_rows() == 0 {
2152 return Ok(RecordBatch::new_empty(output_schema.clone()));
2153 }
2154
2155 let mapped = map_to_output_schema(
2157 &filtered,
2158 label,
2159 variable,
2160 projected_properties,
2161 output_schema,
2162 l0_ctx,
2163 )?;
2164
2165 apply_runtime_filter(mapped, extra_runtime_filter)
2169}
2170
2171fn apply_runtime_filter(
2176 batch: RecordBatch,
2177 runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2178) -> DFResult<RecordBatch> {
2179 let Some(filter) = runtime_filter else {
2180 return Ok(batch);
2181 };
2182 if batch.num_rows() == 0 {
2183 return Ok(batch);
2184 }
2185 let result = filter.evaluate(&batch)?;
2186 let array = result.into_array(batch.num_rows())?;
2187 let bools = array
2188 .as_any()
2189 .downcast_ref::<arrow_array::BooleanArray>()
2190 .ok_or_else(|| {
2191 datafusion::error::DataFusionError::Internal(
2192 "indexed-property runtime filter did not produce a BooleanArray".to_string(),
2193 )
2194 })?;
2195 arrow::compute::filter_record_batch(&batch, bools).map_err(arrow_err)
2196}
2197
2198async fn columnar_scan_edge_batch_static(
2205 graph_ctx: &GraphExecutionContext,
2206 edge_type: &str,
2207 variable: &str,
2208 projected_properties: &[String],
2209 output_schema: &SchemaRef,
2210) -> DFResult<RecordBatch> {
2211 let storage = graph_ctx.storage();
2212 let l0_ctx = graph_ctx.l0_context();
2213 let uni_schema = storage.schema_manager().schema();
2214 let type_props = uni_schema.properties.get(edge_type);
2215
2216 let mut lance_columns: Vec<String> = vec![
2218 "eid".to_string(),
2219 "src_vid".to_string(),
2220 "dst_vid".to_string(),
2221 "op".to_string(),
2222 "_version".to_string(),
2223 ];
2224 for prop in projected_properties {
2225 if prop == "overflow_json" {
2226 push_column_if_absent(&mut lance_columns, "overflow_json");
2227 } else if prop == "_created_at" || prop == "_updated_at" {
2228 push_column_if_absent(&mut lance_columns, prop);
2230 } else {
2231 let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
2232 if exists_in_schema {
2233 push_column_if_absent(&mut lance_columns, prop);
2234 }
2235 }
2236 }
2237
2238 let needs_overflow = projected_properties.iter().any(|p| {
2241 p == "overflow_json"
2242 || (!matches!(p.as_str(), "_created_at" | "_updated_at")
2243 && !type_props.is_some_and(|tp| tp.contains_key(p)))
2244 });
2245 if needs_overflow {
2246 push_column_if_absent(&mut lance_columns, "overflow_json");
2247 }
2248
2249 let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
2251 let lance_batch = storage
2252 .scan_delta_table(edge_type, "fwd", &lance_columns_refs, None)
2253 .await
2254 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2255
2256 let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;
2258
2259 let internal_schema = match &lance_deduped {
2262 Some(batch) => batch.schema(),
2263 None => {
2264 let mut fields = vec![
2265 Field::new("eid", DataType::UInt64, false),
2266 Field::new("src_vid", DataType::UInt64, false),
2267 Field::new("dst_vid", DataType::UInt64, false),
2268 Field::new("op", DataType::UInt8, false),
2269 Field::new("_version", DataType::UInt64, false),
2270 ];
2271 for col in &lance_columns {
2272 if matches!(
2273 col.as_str(),
2274 "eid" | "src_vid" | "dst_vid" | "op" | "_version"
2275 ) {
2276 continue;
2277 }
2278 if col == "overflow_json" {
2279 fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
2280 } else if col == "_created_at" || col == "_updated_at" {
2281 fields.push(Field::new(
2282 col,
2283 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
2284 true,
2285 ));
2286 } else {
2287 let arrow_type = type_props
2288 .and_then(|tp| tp.get(col.as_str()))
2289 .map(|meta| meta.r#type.to_arrow())
2290 .unwrap_or(DataType::LargeBinary);
2291 fields.push(Field::new(col, arrow_type, true));
2292 }
2293 }
2294 Arc::new(Schema::new(fields))
2295 }
2296 };
2297
2298 let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;
2300
2301 let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
2303 return Ok(RecordBatch::new_empty(output_schema.clone()));
2304 };
2305
2306 let merged = filter_deleted_edge_ops(&merged)?;
2308 if merged.num_rows() == 0 {
2309 return Ok(RecordBatch::new_empty(output_schema.clone()));
2310 }
2311
2312 let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;
2314
2315 if filtered.num_rows() == 0 {
2316 return Ok(RecordBatch::new_empty(output_schema.clone()));
2317 }
2318
2319 map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
2321}
2322
2323#[expect(clippy::too_many_arguments)]
2330async fn columnar_scan_schemaless_vertex_batch_static(
2331 graph_ctx: &GraphExecutionContext,
2332 label: &str,
2333 variable: &str,
2334 projected_properties: &[String],
2335 output_schema: &SchemaRef,
2336 filter: &Option<Arc<dyn PhysicalExpr>>,
2337 vid_list_filter: Option<&[u64]>,
2338 extra_lance_filter: Option<&str>,
2339 extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2340) -> DFResult<RecordBatch> {
2341 let storage = graph_ctx.storage();
2342 let l0_ctx = graph_ctx.l0_context();
2343
2344 let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);
2348
2349 let filter = {
2352 let mut parts = Vec::new();
2353
2354 match (vid_list_filter, target_vid) {
2357 (Some(vs), _) if !vs.is_empty() => parts.push(format_vid_in_list(vs)),
2358 (_, Some(vid)) => parts.push(format!("_vid = {vid}")),
2359 _ => {}
2360 }
2361
2362 if !label.is_empty() {
2364 if label.contains(':') {
2365 for lbl in label.split(':') {
2367 parts.push(format!("array_contains(labels, '{}')", lbl));
2368 }
2369 } else {
2370 parts.push(format!("array_contains(labels, '{}')", label));
2371 }
2372 }
2373
2374 if let Some(extra) = extra_lance_filter {
2376 parts.push(extra.to_string());
2377 }
2378
2379 if parts.is_empty() {
2380 None
2381 } else {
2382 Some(parts.join(" AND "))
2383 }
2384 };
2385
2386 let lance_batch = storage
2388 .scan_main_vertex_table(
2389 &["_vid", "_deleted", "labels", "props_json", "_version"],
2390 filter.as_deref(),
2391 )
2392 .await
2393 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2394
2395 let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
2397
2398 let internal_schema = match &lance_deduped {
2401 Some(batch) => batch.schema(),
2402 None => Arc::new(Schema::new(vec![
2403 Field::new("_vid", DataType::UInt64, false),
2404 Field::new("_deleted", DataType::Boolean, false),
2405 Field::new("labels", labels_data_type(), false),
2406 Field::new("props_json", DataType::LargeBinary, true),
2407 Field::new("_version", DataType::UInt64, false),
2408 ])),
2409 };
2410
2411 let single_vid_buf: [u64; 1];
2415 let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
2416 (Some(vs), _) if !vs.is_empty() => Some(vs),
2417 (_, Some(v)) => {
2418 single_vid_buf = [v];
2419 Some(&single_vid_buf)
2420 }
2421 _ => None,
2422 };
2423 let l0_batch =
2424 build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema, l0_target_vids)?;
2425
2426 let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
2428 else {
2429 return Ok(RecordBatch::new_empty(output_schema.clone()));
2430 };
2431
2432 let merged = filter_deleted_rows(&merged)?;
2434 if merged.num_rows() == 0 {
2435 return Ok(RecordBatch::new_empty(output_schema.clone()));
2436 }
2437
2438 let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
2440
2441 if filtered.num_rows() == 0 {
2442 return Ok(RecordBatch::new_empty(output_schema.clone()));
2443 }
2444
2445 let mapped = map_to_schemaless_output_schema(
2447 &filtered,
2448 variable,
2449 projected_properties,
2450 output_schema,
2451 l0_ctx,
2452 )?;
2453
2454 apply_runtime_filter(mapped, extra_runtime_filter)
2456}
2457
2458fn build_l0_schemaless_vertex_batch(
2464 l0_ctx: &crate::query::df_graph::L0Context,
2465 label: &str,
2466 internal_schema: &SchemaRef,
2467 target_vids: Option<&[u64]>,
2468) -> DFResult<RecordBatch> {
2469 let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
2472 let mut tombstones: HashSet<u64> = HashSet::new();
2473
2474 let label_filter: Vec<&str> = if label.is_empty() {
2476 vec![]
2477 } else if label.contains(':') {
2478 label.split(':').collect()
2479 } else {
2480 vec![label]
2481 };
2482
2483 for l0 in l0_ctx.iter_l0_buffers() {
2484 let guard = l0.read();
2485
2486 for vid in guard.vertex_tombstones.iter() {
2488 tombstones.insert(vid.as_u64());
2489 }
2490
2491 let vids: Vec<Vid> = if let Some(tvs) = target_vids {
2494 let mut out = Vec::with_capacity(tvs.len());
2495 for &tv in tvs {
2496 let vid = Vid::from(tv);
2497 if !guard.vertex_properties.contains_key(&vid) {
2498 continue;
2499 }
2500 let label_ok = if label_filter.is_empty() {
2501 true
2502 } else if let Some(labels) = guard.vertex_labels.get(&vid) {
2503 label_filter
2504 .iter()
2505 .all(|lf| labels.contains(&lf.to_string()))
2506 } else {
2507 false
2508 };
2509 if label_ok {
2510 out.push(vid);
2511 }
2512 }
2513 out
2514 } else if label_filter.is_empty() {
2515 guard.all_vertex_vids()
2516 } else if label_filter.len() == 1 {
2517 guard.vids_for_label(label_filter[0])
2518 } else {
2519 guard.vids_with_all_labels(&label_filter)
2520 };
2521
2522 for vid in vids {
2523 let vid_u64 = vid.as_u64();
2524 if tombstones.contains(&vid_u64) {
2525 continue;
2526 }
2527 let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
2528 let entry = vid_data
2529 .entry(vid_u64)
2530 .or_insert_with(|| (Properties::new(), 0, Vec::new()));
2531
2532 if let Some(props) = guard.vertex_properties.get(&vid) {
2534 for (k, v) in props {
2535 entry.0.insert(k.clone(), v.clone());
2536 }
2537 }
2538 if version > entry.1 {
2540 entry.1 = version;
2541 }
2542 if let Some(labels) = guard.vertex_labels.get(&vid) {
2544 entry.2 = labels.clone();
2545 }
2546 }
2547 }
2548
2549 for t in &tombstones {
2551 vid_data.remove(t);
2552 }
2553
2554 if vid_data.is_empty() {
2555 return Ok(RecordBatch::new_empty(internal_schema.clone()));
2556 }
2557
2558 let mut vids: Vec<u64> = vid_data.keys().copied().collect();
2560 vids.sort_unstable();
2561
2562 let num_rows = vids.len();
2563 let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());
2564
2565 for field in internal_schema.fields() {
2566 match field.name().as_str() {
2567 "_vid" => {
2568 columns.push(Arc::new(UInt64Array::from(vids.clone())));
2569 }
2570 "labels" => {
2571 let mut labels_builder = ListBuilder::new(StringBuilder::new());
2572 for vid_u64 in &vids {
2573 let (_, _, labels) = &vid_data[vid_u64];
2574 let values = labels_builder.values();
2575 for lbl in labels {
2576 values.append_value(lbl);
2577 }
2578 labels_builder.append(true);
2579 }
2580 columns.push(Arc::new(labels_builder.finish()));
2581 }
2582 "props_json" => {
2583 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2584 for vid_u64 in &vids {
2585 let (props, _, _) = &vid_data[vid_u64];
2586 if props.is_empty() {
2587 builder.append_null();
2588 } else {
2589 let json_obj: serde_json::Value = {
2591 let mut map = serde_json::Map::new();
2592 for (k, v) in props {
2593 let json_val: serde_json::Value = v.clone().into();
2594 map.insert(k.clone(), json_val);
2595 }
2596 serde_json::Value::Object(map)
2597 };
2598 match encode_cypher_value(&json_obj) {
2599 Ok(bytes) => builder.append_value(bytes),
2600 Err(_) => builder.append_null(),
2601 }
2602 }
2603 }
2604 columns.push(Arc::new(builder.finish()));
2605 }
2606 "_deleted" => {
2607 columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
2609 false;
2610 num_rows
2611 ])));
2612 }
2613 "_version" => {
2614 let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
2615 columns.push(Arc::new(UInt64Array::from(vals)));
2616 }
2617 _ => {
2618 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
2620 }
2621 }
2622 }
2623
2624 RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
2625}
2626
2627fn map_to_schemaless_output_schema(
2634 batch: &RecordBatch,
2635 _variable: &str,
2636 projected_properties: &[String],
2637 output_schema: &SchemaRef,
2638 l0_ctx: &crate::query::df_graph::L0Context,
2639) -> DFResult<RecordBatch> {
2640 if batch.num_rows() == 0 {
2641 return Ok(RecordBatch::new_empty(output_schema.clone()));
2642 }
2643
2644 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
2645
2646 let vid_col = batch
2648 .column_by_name("_vid")
2649 .ok_or_else(|| {
2650 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
2651 })?
2652 .clone();
2653 let vid_arr = vid_col
2654 .as_any()
2655 .downcast_ref::<UInt64Array>()
2656 .ok_or_else(|| {
2657 datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
2658 })?;
2659 columns.push(vid_col.clone());
2660
2661 let labels_col = batch.column_by_name("labels");
2663 let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
2664
2665 let mut labels_builder = ListBuilder::new(StringBuilder::new());
2666 for i in 0..vid_arr.len() {
2667 let vid_u64 = vid_arr.value(i);
2668 let vid = Vid::from(vid_u64);
2669
2670 let mut row_labels: Vec<String> = Vec::new();
2672 if let Some(arr) = labels_arr
2673 && !arr.is_null(i)
2674 {
2675 let list_val = arr.value(i);
2676 if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
2677 for j in 0..str_arr.len() {
2678 if !str_arr.is_null(j) {
2679 row_labels.push(str_arr.value(j).to_string());
2680 }
2681 }
2682 }
2683 }
2684
2685 for l0 in l0_ctx.iter_l0_buffers() {
2687 let guard = l0.read();
2688 if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
2689 for lbl in l0_labels {
2690 if !row_labels.contains(lbl) {
2691 row_labels.push(lbl.clone());
2692 }
2693 }
2694 }
2695 }
2696
2697 let values = labels_builder.values();
2698 for lbl in &row_labels {
2699 values.append_value(lbl);
2700 }
2701 labels_builder.append(true);
2702 }
2703 columns.push(Arc::new(labels_builder.finish()));
2704
2705 let props_col = batch.column_by_name("props_json");
2707 let props_arr =
2708 props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2709
2710 for prop in projected_properties {
2711 if prop == "_all_props" {
2712 let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
2715 let guard = l0.read();
2716 !guard.vertex_properties.is_empty()
2717 });
2718 if !any_l0_has_vertex_props {
2719 match props_col {
2720 Some(col) => columns.push(col.clone()),
2721 None => {
2722 columns.push(arrow_array::new_null_array(
2723 &DataType::LargeBinary,
2724 batch.num_rows(),
2725 ));
2726 }
2727 }
2728 } else {
2729 let col = build_all_props_column_with_l0_overlay(
2730 batch.num_rows(),
2731 vid_arr,
2732 props_arr,
2733 l0_ctx,
2734 );
2735 columns.push(col);
2736 }
2737 } else {
2738 let expected_type = output_schema
2743 .field_with_name(&format!("{_variable}.{prop}"))
2744 .map(|f| f.data_type().clone())
2745 .unwrap_or(DataType::LargeBinary);
2746
2747 if expected_type == DataType::LargeBinary {
2748 let col = build_overflow_property_column(
2749 batch.num_rows(),
2750 vid_arr,
2751 props_arr,
2752 prop,
2753 l0_ctx,
2754 );
2755 columns.push(col);
2756 } else {
2757 let mut prop_values: HashMap<Vid, Properties> = HashMap::new();
2759 for i in 0..batch.num_rows() {
2760 let vid = Vid::from(vid_arr.value(i));
2761 let resolved =
2762 resolve_l0_property(&vid, prop, l0_ctx)
2763 .flatten()
2764 .or_else(|| {
2765 extract_from_overflow_blob(props_arr, i, prop).and_then(|bytes| {
2766 uni_common::cypher_value_codec::decode(&bytes).ok()
2767 })
2768 });
2769 if let Some(val) = resolved {
2770 prop_values.insert(vid, HashMap::from([(prop.to_string(), val)]));
2771 }
2772 }
2773 let vids: Vec<Vid> = (0..batch.num_rows())
2774 .map(|i| Vid::from(vid_arr.value(i)))
2775 .collect();
2776 let col = build_property_column_static(&vids, &prop_values, prop, &expected_type)
2777 .unwrap_or_else(|_| {
2778 arrow_array::new_null_array(&expected_type, batch.num_rows())
2779 });
2780 columns.push(col);
2781 }
2782 }
2783 }
2784
2785 RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2786}
2787
2788pub(crate) fn get_property_value(
2790 vid: &Vid,
2791 props_map: &HashMap<Vid, Properties>,
2792 prop_name: &str,
2793) -> Option<Value> {
2794 if prop_name == "_all_props" {
2795 return props_map.get(vid).map(|p| {
2796 let map: HashMap<String, Value> =
2797 p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2798 Value::Map(map)
2799 });
2800 }
2801 props_map
2802 .get(vid)
2803 .and_then(|props| props.get(prop_name))
2804 .cloned()
2805}
2806
2807pub(crate) fn encode_cypher_value(val: &serde_json::Value) -> Result<Vec<u8>, String> {
2811 let uni_val: uni_common::Value = val.clone().into();
2812 Ok(uni_common::cypher_value_codec::encode(&uni_val))
2813}
2814
/// Expands to the loop that builds a primitive Arrow column for one property.
///
/// For every vertex in `$vids` the property is looked up with
/// `get_property_value`, passed through `$extractor` (a `&Value -> Option<T>`
/// callable) and then through `$cast`. A missing property, or an extractor
/// returning `None`, produces a null slot. The expansion evaluates to
/// `Ok(ArrayRef)`.
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut column = <$builder_ty>::new();
        for vertex in $vids {
            let slot = get_property_value(vertex, $props_map, $prop_name)
                .as_ref()
                .and_then($extractor)
                .map($cast);
            column.append_option(slot);
        }
        Ok(Arc::new(column.finish()) as ArrayRef)
    }};
}
2834
2835pub(crate) fn build_property_column_static(
2837 vids: &[Vid],
2838 props_map: &HashMap<Vid, Properties>,
2839 prop_name: &str,
2840 data_type: &DataType,
2841) -> DFResult<ArrayRef> {
2842 match data_type {
2843 DataType::LargeBinary => {
2844 use arrow_array::builder::LargeBinaryBuilder;
2846 let mut builder = LargeBinaryBuilder::new();
2847
2848 for vid in vids {
2849 match get_property_value(vid, props_map, prop_name) {
2850 Some(Value::Null) | None => builder.append_null(),
2851 Some(Value::Bytes(bytes)) => {
2852 builder.append_value(&bytes);
2853 }
2854 Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
2855 let bytes: Vec<u8> = arr
2858 .iter()
2859 .filter_map(|v| v.as_u64().map(|n| n as u8))
2860 .collect();
2861 if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
2862 builder.append_value(&bytes);
2863 } else {
2864 let json_val: serde_json::Value = Value::List(arr).into();
2865 match encode_cypher_value(&json_val) {
2866 Ok(encoded) => builder.append_value(encoded),
2867 Err(_) => builder.append_null(),
2868 }
2869 }
2870 }
2871 Some(val @ Value::Temporal(_)) => {
2872 builder.append_value(uni_common::cypher_value_codec::encode(&val));
2875 }
2876 Some(val) => {
2877 let json_val: serde_json::Value = val.into();
2879 match encode_cypher_value(&json_val) {
2880 Ok(bytes) => builder.append_value(bytes),
2881 Err(_) => builder.append_null(),
2882 }
2883 }
2884 }
2885 }
2886 Ok(Arc::new(builder.finish()))
2887 }
2888 DataType::Binary => {
2889 let mut builder = BinaryBuilder::new();
2891 for vid in vids {
2892 let bytes = get_property_value(vid, props_map, prop_name)
2893 .filter(|v| !v.is_null())
2894 .and_then(|v| {
2895 let json_val: serde_json::Value = v.into();
2896 serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
2897 })
2898 .and_then(|crdt| crdt.to_msgpack().ok());
2899 match bytes {
2900 Some(b) => builder.append_value(&b),
2901 None => builder.append_null(),
2902 }
2903 }
2904 Ok(Arc::new(builder.finish()))
2905 }
2906 DataType::Utf8 => {
2907 let mut builder = StringBuilder::new();
2908 for vid in vids {
2909 match get_property_value(vid, props_map, prop_name) {
2910 Some(Value::String(s)) => builder.append_value(s),
2911 Some(Value::Null) | None => builder.append_null(),
2912 Some(other) => builder.append_value(other.to_string()),
2913 }
2914 }
2915 Ok(Arc::new(builder.finish()))
2916 }
2917 DataType::Int64 => {
2918 build_numeric_column!(
2919 vids,
2920 props_map,
2921 prop_name,
2922 Int64Builder,
2923 |v: &Value| v.as_i64(),
2924 |v| v
2925 )
2926 }
2927 DataType::Int32 => {
2928 build_numeric_column!(
2929 vids,
2930 props_map,
2931 prop_name,
2932 Int32Builder,
2933 |v: &Value| v.as_i64(),
2934 |v: i64| v as i32
2935 )
2936 }
2937 DataType::Float64 => {
2938 build_numeric_column!(
2939 vids,
2940 props_map,
2941 prop_name,
2942 Float64Builder,
2943 |v: &Value| v.as_f64(),
2944 |v| v
2945 )
2946 }
2947 DataType::Float32 => {
2948 build_numeric_column!(
2949 vids,
2950 props_map,
2951 prop_name,
2952 Float32Builder,
2953 |v: &Value| v.as_f64(),
2954 |v: f64| v as f32
2955 )
2956 }
2957 DataType::Boolean => {
2958 let mut builder = BooleanBuilder::new();
2959 for vid in vids {
2960 match get_property_value(vid, props_map, prop_name) {
2961 Some(Value::Bool(b)) => builder.append_value(b),
2962 _ => builder.append_null(),
2963 }
2964 }
2965 Ok(Arc::new(builder.finish()))
2966 }
2967 DataType::UInt64 => {
2968 build_numeric_column!(
2969 vids,
2970 props_map,
2971 prop_name,
2972 UInt64Builder,
2973 |v: &Value| v.as_u64(),
2974 |v| v
2975 )
2976 }
2977 DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
2978 let values_builder = Float32Builder::new();
2980 let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
2981 for vid in vids {
2982 match get_property_value(vid, props_map, prop_name) {
2983 Some(Value::Vector(v)) => {
2984 for val in v {
2985 list_builder.values().append_value(val);
2986 }
2987 list_builder.append(true);
2988 }
2989 Some(Value::List(arr)) => {
2990 for v in arr {
2991 list_builder
2992 .values()
2993 .append_value(v.as_f64().unwrap_or(0.0) as f32);
2994 }
2995 list_builder.append(true);
2996 }
2997 _ => {
2998 for _ in 0..*dim {
3000 list_builder.values().append_null();
3001 }
3002 list_builder.append(false);
3003 }
3004 }
3005 }
3006 Ok(Arc::new(list_builder.finish()))
3007 }
3008 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
3009 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
3011 for vid in vids {
3012 match get_property_value(vid, props_map, prop_name) {
3013 Some(Value::Temporal(tv)) => match tv {
3014 uni_common::TemporalValue::DateTime {
3015 nanos_since_epoch, ..
3016 }
3017 | uni_common::TemporalValue::LocalDateTime {
3018 nanos_since_epoch, ..
3019 } => {
3020 builder.append_value(nanos_since_epoch);
3021 }
3022 uni_common::TemporalValue::Date { days_since_epoch } => {
3023 builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
3024 }
3025 _ => builder.append_null(),
3026 },
3027 Some(Value::String(s)) => match parse_datetime_utc(&s) {
3028 Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
3029 Err(_) => builder.append_null(),
3030 },
3031 Some(Value::Int(n)) => {
3032 builder.append_value(n);
3033 }
3034 _ => builder.append_null(),
3035 }
3036 }
3037 Ok(Arc::new(builder.finish()))
3038 }
3039 DataType::Date32 => {
3040 let mut builder = Date32Builder::new();
3041 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
3042 for vid in vids {
3043 match get_property_value(vid, props_map, prop_name) {
3044 Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
3045 builder.append_value(days_since_epoch);
3046 }
3047 Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
3048 Ok(d) => builder.append_value((d - epoch).num_days() as i32),
3049 Err(_) => builder.append_null(),
3050 },
3051 Some(Value::Int(n)) => {
3052 builder.append_value(n as i32);
3053 }
3054 _ => builder.append_null(),
3055 }
3056 }
3057 Ok(Arc::new(builder.finish()))
3058 }
3059 DataType::Time64(TimeUnit::Nanosecond) => {
3060 let mut builder = Time64NanosecondBuilder::new();
3061 for vid in vids {
3062 match get_property_value(vid, props_map, prop_name) {
3063 Some(Value::Temporal(
3064 uni_common::TemporalValue::LocalTime {
3065 nanos_since_midnight,
3066 }
3067 | uni_common::TemporalValue::Time {
3068 nanos_since_midnight,
3069 ..
3070 },
3071 )) => {
3072 builder.append_value(nanos_since_midnight);
3073 }
3074 Some(Value::Temporal(_)) => builder.append_null(),
3075 Some(Value::String(s)) => {
3076 match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
3077 .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
3078 {
3079 Ok(t) => {
3080 let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
3081 + t.nanosecond() as i64;
3082 builder.append_value(nanos);
3083 }
3084 Err(_) => builder.append_null(),
3085 }
3086 }
3087 Some(Value::Int(n)) => {
3088 builder.append_value(n);
3089 }
3090 _ => builder.append_null(),
3091 }
3092 }
3093 Ok(Arc::new(builder.finish()))
3094 }
3095 DataType::Interval(IntervalUnit::MonthDayNano) => {
3096 let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
3097 Vec::with_capacity(vids.len());
3098 for vid in vids {
3099 match get_property_value(vid, props_map, prop_name) {
3100 Some(Value::Temporal(uni_common::TemporalValue::Duration {
3101 months,
3102 days,
3103 nanos,
3104 })) => {
3105 values.push(Some(arrow::datatypes::IntervalMonthDayNano {
3106 months: months as i32,
3107 days: days as i32,
3108 nanoseconds: nanos,
3109 }));
3110 }
3111 Some(Value::Int(_n)) => {
3112 values.push(None);
3113 }
3114 _ => values.push(None),
3115 }
3116 }
3117 let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
3118 Ok(Arc::new(arr))
3119 }
3120 DataType::List(inner_field) => {
3121 build_list_property_column(vids, props_map, prop_name, inner_field)
3122 }
3123 DataType::Struct(fields) => {
3124 build_struct_property_column(vids, props_map, prop_name, fields)
3125 }
3126 DataType::FixedSizeBinary(24) => {
3127 use arrow_array::builder::FixedSizeBinaryBuilder;
3129 const BTIC_LEN: i32 = 24;
3130 let mut builder = FixedSizeBinaryBuilder::with_capacity(vids.len(), BTIC_LEN);
3131 for vid in vids {
3132 match get_property_value(vid, props_map, prop_name) {
3133 Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => {
3134 match uni_btic::Btic::new(lo, hi, meta) {
3135 Ok(b) => {
3136 builder
3137 .append_value(uni_btic::encode::encode(&b))
3138 .map_err(arrow_err)?;
3139 }
3140 Err(e) => {
3141 tracing::warn!(
3142 "BTIC coercion failed for property '{}': invalid value (lo={}, hi={}, meta={:#x}): {}",
3143 prop_name,
3144 lo,
3145 hi,
3146 meta,
3147 e
3148 );
3149 builder.append_null()
3150 }
3151 }
3152 }
3153 Some(Value::String(s)) => match uni_btic::parse::parse_btic_literal(&s) {
3154 Ok(b) => {
3155 builder
3156 .append_value(uni_btic::encode::encode(&b))
3157 .map_err(arrow_err)?;
3158 }
3159 Err(e) => {
3160 tracing::warn!(
3161 "BTIC coercion failed for property '{}': '{}' is not a valid BTIC literal: {}",
3162 prop_name,
3163 s,
3164 e
3165 );
3166 builder.append_null()
3167 }
3168 },
3169 _ => builder.append_null(),
3170 }
3171 }
3172 Ok(Arc::new(builder.finish()))
3173 }
3174 _ => {
3176 let mut builder = StringBuilder::new();
3177 for vid in vids {
3178 match get_property_value(vid, props_map, prop_name) {
3179 Some(Value::Null) | None => builder.append_null(),
3180 Some(other) => builder.append_value(other.to_string()),
3181 }
3182 }
3183 Ok(Arc::new(builder.finish()))
3184 }
3185 }
3186}
3187
3188fn build_list_property_column(
3190 vids: &[Vid],
3191 props_map: &HashMap<Vid, Properties>,
3192 prop_name: &str,
3193 inner_field: &Arc<Field>,
3194) -> DFResult<ArrayRef> {
3195 match inner_field.data_type() {
3196 DataType::Utf8 => {
3197 let mut builder = ListBuilder::new(StringBuilder::new());
3198 for vid in vids {
3199 match get_property_value(vid, props_map, prop_name) {
3200 Some(Value::List(arr)) => {
3201 for v in arr {
3202 match v {
3203 Value::String(s) => builder.values().append_value(s),
3204 Value::Null => builder.values().append_null(),
3205 other => builder.values().append_value(format!("{other:?}")),
3206 }
3207 }
3208 builder.append(true);
3209 }
3210 _ => builder.append(false),
3211 }
3212 }
3213 Ok(Arc::new(builder.finish()))
3214 }
3215 DataType::Int64 => {
3216 let mut builder = ListBuilder::new(Int64Builder::new());
3217 for vid in vids {
3218 match get_property_value(vid, props_map, prop_name) {
3219 Some(Value::List(arr)) => {
3220 for v in arr {
3221 match v.as_i64() {
3222 Some(n) => builder.values().append_value(n),
3223 None => builder.values().append_null(),
3224 }
3225 }
3226 builder.append(true);
3227 }
3228 _ => builder.append(false),
3229 }
3230 }
3231 Ok(Arc::new(builder.finish()))
3232 }
3233 DataType::Float64 => {
3234 let mut builder = ListBuilder::new(Float64Builder::new());
3235 for vid in vids {
3236 match get_property_value(vid, props_map, prop_name) {
3237 Some(Value::List(arr)) => {
3238 for v in arr {
3239 match v.as_f64() {
3240 Some(n) => builder.values().append_value(n),
3241 None => builder.values().append_null(),
3242 }
3243 }
3244 builder.append(true);
3245 }
3246 _ => builder.append(false),
3247 }
3248 }
3249 Ok(Arc::new(builder.finish()))
3250 }
3251 DataType::Boolean => {
3252 let mut builder = ListBuilder::new(BooleanBuilder::new());
3253 for vid in vids {
3254 match get_property_value(vid, props_map, prop_name) {
3255 Some(Value::List(arr)) => {
3256 for v in arr {
3257 match v.as_bool() {
3258 Some(b) => builder.values().append_value(b),
3259 None => builder.values().append_null(),
3260 }
3261 }
3262 builder.append(true);
3263 }
3264 _ => builder.append(false),
3265 }
3266 }
3267 Ok(Arc::new(builder.finish()))
3268 }
3269 DataType::Struct(fields) => {
3270 build_list_of_structs_column(vids, props_map, prop_name, fields)
3272 }
3273 _ => {
3275 let mut builder = ListBuilder::new(StringBuilder::new());
3276 for vid in vids {
3277 match get_property_value(vid, props_map, prop_name) {
3278 Some(Value::List(arr)) => {
3279 for v in arr {
3280 match v {
3281 Value::Null => builder.values().append_null(),
3282 other => builder.values().append_value(format!("{other:?}")),
3283 }
3284 }
3285 builder.append(true);
3286 }
3287 _ => builder.append(false),
3288 }
3289 }
3290 Ok(Arc::new(builder.finish()))
3291 }
3292 }
3293}
3294
3295fn build_list_of_structs_column(
3301 vids: &[Vid],
3302 props_map: &HashMap<Vid, Properties>,
3303 prop_name: &str,
3304 fields: &Fields,
3305) -> DFResult<ArrayRef> {
3306 use arrow_array::StructArray;
3307
3308 let values: Vec<Option<Value>> = vids
3309 .iter()
3310 .map(|vid| get_property_value(vid, props_map, prop_name))
3311 .collect();
3312
3313 let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
3316 .iter()
3317 .map(|val| match val {
3318 Some(Value::List(arr)) => {
3319 let objs: Vec<HashMap<String, Value>> = arr
3320 .iter()
3321 .filter_map(|v| {
3322 if let Value::Map(m) = v {
3323 Some(m.clone())
3324 } else {
3325 None
3326 }
3327 })
3328 .collect();
3329 if objs.is_empty() { None } else { Some(objs) }
3330 }
3331 Some(Value::Map(obj)) => {
3332 let kv_pairs: Vec<HashMap<String, Value>> = obj
3334 .iter()
3335 .map(|(k, v)| {
3336 let mut m = HashMap::new();
3337 m.insert("key".to_string(), Value::String(k.clone()));
3338 m.insert("value".to_string(), v.clone());
3339 m
3340 })
3341 .collect();
3342 Some(kv_pairs)
3343 }
3344 _ => None,
3345 })
3346 .collect();
3347
3348 let total_items: usize = rows
3349 .iter()
3350 .filter_map(|r| r.as_ref())
3351 .map(|v| v.len())
3352 .sum();
3353
3354 let child_arrays: Vec<ArrayRef> = fields
3356 .iter()
3357 .map(|field| {
3358 let field_name = field.name();
3359 match field.data_type() {
3360 DataType::Utf8 => {
3361 let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3362 for obj in rows.iter().flatten().flatten() {
3363 match obj.get(field_name) {
3364 Some(Value::String(s)) => builder.append_value(s),
3365 Some(Value::Null) | None => builder.append_null(),
3366 Some(other) => builder.append_value(format!("{other:?}")),
3367 }
3368 }
3369 Arc::new(builder.finish()) as ArrayRef
3370 }
3371 DataType::Int64 => {
3372 let mut builder = Int64Builder::with_capacity(total_items);
3373 for obj in rows.iter().flatten().flatten() {
3374 match obj.get(field_name).and_then(|v| v.as_i64()) {
3375 Some(n) => builder.append_value(n),
3376 None => builder.append_null(),
3377 }
3378 }
3379 Arc::new(builder.finish()) as ArrayRef
3380 }
3381 DataType::Float64 => {
3382 let mut builder = Float64Builder::with_capacity(total_items);
3383 for obj in rows.iter().flatten().flatten() {
3384 match obj.get(field_name).and_then(|v| v.as_f64()) {
3385 Some(n) => builder.append_value(n),
3386 None => builder.append_null(),
3387 }
3388 }
3389 Arc::new(builder.finish()) as ArrayRef
3390 }
3391 _ => {
3393 let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3394 for obj in rows.iter().flatten().flatten() {
3395 match obj.get(field_name) {
3396 Some(Value::Null) | None => builder.append_null(),
3397 Some(other) => builder.append_value(format!("{other:?}")),
3398 }
3399 }
3400 Arc::new(builder.finish()) as ArrayRef
3401 }
3402 }
3403 })
3404 .collect();
3405
3406 let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
3408 .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3409
3410 let mut offsets = Vec::with_capacity(vids.len() + 1);
3412 let mut nulls = Vec::with_capacity(vids.len());
3413 let mut offset = 0i32;
3414 offsets.push(offset);
3415 for row in &rows {
3416 match row {
3417 Some(objs) => {
3418 offset += objs.len() as i32;
3419 offsets.push(offset);
3420 nulls.push(true);
3421 }
3422 None => {
3423 offsets.push(offset);
3424 nulls.push(false);
3425 }
3426 }
3427 }
3428
3429 let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
3430 let list_array = arrow_array::ListArray::try_new(
3431 list_field,
3432 arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
3433 Arc::new(struct_array),
3434 Some(arrow::buffer::NullBuffer::from(nulls)),
3435 )
3436 .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3437
3438 Ok(Arc::new(list_array))
3439}
3440
3441fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
3444 use uni_common::value::TemporalValue;
3445 let mut m = HashMap::new();
3446 match tv {
3447 TemporalValue::DateTime {
3448 nanos_since_epoch,
3449 offset_seconds,
3450 timezone_name,
3451 } => {
3452 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3453 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3454 if let Some(tz) = timezone_name {
3455 m.insert("timezone_name".into(), Value::String(tz.clone()));
3456 }
3457 }
3458 TemporalValue::LocalDateTime { nanos_since_epoch } => {
3459 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3460 }
3461 TemporalValue::Time {
3462 nanos_since_midnight,
3463 offset_seconds,
3464 } => {
3465 m.insert(
3466 "nanos_since_midnight".into(),
3467 Value::Int(*nanos_since_midnight),
3468 );
3469 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3470 }
3471 TemporalValue::LocalTime {
3472 nanos_since_midnight,
3473 } => {
3474 m.insert(
3475 "nanos_since_midnight".into(),
3476 Value::Int(*nanos_since_midnight),
3477 );
3478 }
3479 TemporalValue::Date { days_since_epoch } => {
3480 m.insert(
3481 "days_since_epoch".into(),
3482 Value::Int(*days_since_epoch as i64),
3483 );
3484 }
3485 TemporalValue::Duration {
3486 months,
3487 days,
3488 nanos,
3489 } => {
3490 m.insert("months".into(), Value::Int(*months));
3491 m.insert("days".into(), Value::Int(*days));
3492 m.insert("nanos".into(), Value::Int(*nanos));
3493 }
3494 TemporalValue::Btic { lo, hi, meta } => {
3495 m.insert("lo".into(), Value::Int(*lo));
3496 m.insert("hi".into(), Value::Int(*hi));
3497 m.insert("meta".into(), Value::Int(*meta as i64));
3498 }
3499 }
3500 m
3501}
3502
3503fn build_struct_property_column(
3505 vids: &[Vid],
3506 props_map: &HashMap<Vid, Properties>,
3507 prop_name: &str,
3508 fields: &Fields,
3509) -> DFResult<ArrayRef> {
3510 use arrow_array::StructArray;
3511
3512 let values: Vec<Option<Value>> = vids
3515 .iter()
3516 .map(|vid| {
3517 let val = get_property_value(vid, props_map, prop_name);
3518 match val {
3519 Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
3520 other => other,
3521 }
3522 })
3523 .collect();
3524
3525 let child_arrays: Vec<ArrayRef> = fields
3526 .iter()
3527 .map(|field| {
3528 let field_name = field.name();
3529 match field.data_type() {
3530 DataType::Float64 => {
3531 let mut builder = Float64Builder::with_capacity(vids.len());
3532 for val in &values {
3533 match val {
3534 Some(Value::Map(obj)) => {
3535 match obj.get(field_name).and_then(|v| v.as_f64()) {
3536 Some(n) => builder.append_value(n),
3537 None => builder.append_null(),
3538 }
3539 }
3540 _ => builder.append_null(),
3541 }
3542 }
3543 Arc::new(builder.finish()) as ArrayRef
3544 }
3545 DataType::Utf8 => {
3546 let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3547 for val in &values {
3548 match val {
3549 Some(Value::Map(obj)) => match obj.get(field_name) {
3550 Some(Value::String(s)) => builder.append_value(s),
3551 Some(Value::Null) | None => builder.append_null(),
3552 Some(other) => builder.append_value(format!("{other:?}")),
3553 },
3554 _ => builder.append_null(),
3555 }
3556 }
3557 Arc::new(builder.finish()) as ArrayRef
3558 }
3559 DataType::Int64 => {
3560 let mut builder = Int64Builder::with_capacity(vids.len());
3561 for val in &values {
3562 match val {
3563 Some(Value::Map(obj)) => {
3564 match obj.get(field_name).and_then(|v| v.as_i64()) {
3565 Some(n) => builder.append_value(n),
3566 None => builder.append_null(),
3567 }
3568 }
3569 _ => builder.append_null(),
3570 }
3571 }
3572 Arc::new(builder.finish()) as ArrayRef
3573 }
3574 DataType::Timestamp(_, _) => {
3575 let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
3576 for val in &values {
3577 match val {
3578 Some(Value::Map(obj)) => {
3579 match obj.get(field_name).and_then(|v| v.as_i64()) {
3580 Some(n) => builder.append_value(n),
3581 None => builder.append_null(),
3582 }
3583 }
3584 _ => builder.append_null(),
3585 }
3586 }
3587 Arc::new(builder.finish()) as ArrayRef
3588 }
3589 DataType::Int32 => {
3590 let mut builder = Int32Builder::with_capacity(vids.len());
3591 for val in &values {
3592 match val {
3593 Some(Value::Map(obj)) => {
3594 match obj.get(field_name).and_then(|v| v.as_i64()) {
3595 Some(n) => builder.append_value(n as i32),
3596 None => builder.append_null(),
3597 }
3598 }
3599 _ => builder.append_null(),
3600 }
3601 }
3602 Arc::new(builder.finish()) as ArrayRef
3603 }
3604 DataType::Time64(_) => {
3605 let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
3606 for val in &values {
3607 match val {
3608 Some(Value::Map(obj)) => {
3609 match obj.get(field_name).and_then(|v| v.as_i64()) {
3610 Some(n) => builder.append_value(n),
3611 None => builder.append_null(),
3612 }
3613 }
3614 _ => builder.append_null(),
3615 }
3616 }
3617 Arc::new(builder.finish()) as ArrayRef
3618 }
3619 _ => {
3621 let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3622 for val in &values {
3623 match val {
3624 Some(Value::Map(obj)) => match obj.get(field_name) {
3625 Some(Value::Null) | None => builder.append_null(),
3626 Some(other) => builder.append_value(format!("{other:?}")),
3627 },
3628 _ => builder.append_null(),
3629 }
3630 }
3631 Arc::new(builder.finish()) as ArrayRef
3632 }
3633 }
3634 })
3635 .collect();
3636
3637 let nulls: Vec<bool> = values
3639 .iter()
3640 .map(|v| matches!(v, Some(Value::Map(_))))
3641 .collect();
3642
3643 let struct_array = StructArray::try_new(
3644 fields.clone(),
3645 child_arrays,
3646 Some(arrow::buffer::NullBuffer::from(nulls)),
3647 )
3648 .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3649
3650 Ok(Arc::new(struct_array))
3651}
3652
3653impl Stream for GraphScanStream {
3654 type Item = DFResult<RecordBatch>;
3655
3656 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3657 let metrics = self.metrics.clone();
3658 let _timer = metrics.elapsed_compute().timer();
3659 loop {
3660 let state = std::mem::replace(&mut self.state, GraphScanState::Done);
3662
3663 match state {
3664 GraphScanState::Init => {
3665 let graph_ctx = self.graph_ctx.clone();
3667 let label = self.label.clone();
3668 let variable = self.variable.clone();
3669 let properties = self.properties.clone();
3670 let is_edge_scan = self.is_edge_scan;
3671 let is_schemaless = self.is_schemaless;
3672 let filter = self.filter.clone();
3673 let vid_list_filter = self.vid_list_filter.clone();
3674 let extra_lance_filter = self.extra_lance_filter.clone();
3675 let extra_runtime_filter = self.extra_runtime_filter.clone();
3676 let schema = self.schema.clone();
3677
3678 let fut = async move {
3679 graph_ctx.check_timeout().map_err(|e| {
3680 datafusion::error::DataFusionError::Execution(e.to_string())
3681 })?;
3682
3683 let batch = if is_edge_scan {
3684 columnar_scan_edge_batch_static(
3685 &graph_ctx,
3686 &label,
3687 &variable,
3688 &properties,
3689 &schema,
3690 )
3691 .await?
3692 } else if is_schemaless {
3693 columnar_scan_schemaless_vertex_batch_static(
3694 &graph_ctx,
3695 &label,
3696 &variable,
3697 &properties,
3698 &schema,
3699 &filter,
3700 vid_list_filter.as_deref(),
3701 extra_lance_filter.as_deref(),
3702 extra_runtime_filter.as_ref(),
3703 )
3704 .await?
3705 } else {
3706 columnar_scan_vertex_batch_static(
3707 &graph_ctx,
3708 &label,
3709 &variable,
3710 &properties,
3711 &schema,
3712 &filter,
3713 vid_list_filter.as_deref(),
3714 extra_lance_filter.as_deref(),
3715 extra_runtime_filter.as_ref(),
3716 )
3717 .await?
3718 };
3719 Ok(Some(batch))
3720 };
3721
3722 self.state = GraphScanState::Executing(Box::pin(fut));
3723 }
3725 GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
3726 Poll::Ready(Ok(batch)) => {
3727 self.state = GraphScanState::Done;
3728 self.metrics
3729 .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
3730 return Poll::Ready(batch.map(Ok));
3731 }
3732 Poll::Ready(Err(e)) => {
3733 self.state = GraphScanState::Done;
3734 return Poll::Ready(Some(Err(e)));
3735 }
3736 Poll::Pending => {
3737 self.state = GraphScanState::Executing(fut);
3738 return Poll::Pending;
3739 }
3740 },
3741 GraphScanState::Done => {
3742 return Poll::Ready(None);
3743 }
3744 }
3745 }
3746 }
3747}
3748
3749impl RecordBatchStream for GraphScanStream {
3750 fn schema(&self) -> SchemaRef {
3751 self.schema.clone()
3752 }
3753}
3754
3755#[cfg(test)]
3756mod tests {
3757 use super::*;
3758
3759 #[test]
3760 fn test_build_vertex_schema() {
3761 let uni_schema = UniSchema::default();
3762 let schema = GraphScanExec::build_vertex_schema(
3763 "n",
3764 "Person",
3765 &["name".to_string(), "age".to_string()],
3766 &uni_schema,
3767 );
3768
3769 assert_eq!(schema.fields().len(), 4);
3770 assert_eq!(schema.field(0).name(), "n._vid");
3771 assert_eq!(schema.field(1).name(), "n._labels");
3772 assert_eq!(schema.field(2).name(), "n.name");
3773 assert_eq!(schema.field(3).name(), "n.age");
3774 }
3775
3776 #[test]
3777 fn test_build_edge_schema() {
3778 let uni_schema = UniSchema::default();
3779 let schema =
3780 GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);
3781
3782 assert_eq!(schema.fields().len(), 4);
3783 assert_eq!(schema.field(0).name(), "r._eid");
3784 assert_eq!(schema.field(1).name(), "r._src_vid");
3785 assert_eq!(schema.field(2).name(), "r._dst_vid");
3786 assert_eq!(schema.field(3).name(), "r.weight");
3787 }
3788
3789 #[test]
3790 fn test_build_schemaless_vertex_schema() {
3791 let empty_schema = uni_common::core::schema::Schema::default();
3792 let schema = GraphScanExec::build_schemaless_vertex_schema(
3793 "n",
3794 &["name".to_string(), "age".to_string()],
3795 &empty_schema,
3796 );
3797
3798 assert_eq!(schema.fields().len(), 4);
3799 assert_eq!(schema.field(0).name(), "n._vid");
3800 assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
3801 assert_eq!(schema.field(1).name(), "n._labels");
3802 assert_eq!(schema.field(2).name(), "n.name");
3803 assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
3805 assert_eq!(schema.field(3).name(), "n.age");
3806 assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
3807 }
3808
3809 #[test]
3810 fn test_schemaless_all_scan_has_empty_label() {
3811 let empty_schema = uni_common::core::schema::Schema::default();
3812 let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[], &empty_schema);
3813
3814 assert_eq!(schema.fields().len(), 2);
3816 assert_eq!(schema.field(0).name(), "n._vid");
3817 assert_eq!(schema.field(1).name(), "n._labels");
3818 }
3819
3820 #[test]
3821 fn test_cypher_value_all_props_extraction() {
3822 let json_obj = serde_json::json!({"age": 30, "name": "Alice"});
3824 let cv_bytes = encode_cypher_value(&json_obj).unwrap();
3825
3826 let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
3828 match decoded {
3829 uni_common::Value::Map(map) => {
3830 let age_val = map.get("age").unwrap();
3831 assert_eq!(age_val, &uni_common::Value::Int(30));
3832 }
3833 _ => panic!("Expected Map"),
3834 }
3835
3836 let single_val = serde_json::json!(30);
3838 let single_bytes = encode_cypher_value(&single_val).unwrap();
3839 let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
3840 assert_eq!(single_decoded, uni_common::Value::Int(30));
3841 }
3842
3843 fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
3845 let schema = Arc::new(Schema::new(vec![
3846 Field::new("_vid", DataType::UInt64, false),
3847 Field::new("_deleted", DataType::Boolean, false),
3848 Field::new("_version", DataType::UInt64, false),
3849 Field::new("name", DataType::Utf8, true),
3850 ]));
3851 let names: Vec<String> = vids
3853 .iter()
3854 .zip(versions.iter())
3855 .map(|(v, ver)| format!("v{}_ver{}", v, ver))
3856 .collect();
3857 let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();
3858
3859 RecordBatch::try_new(
3860 schema,
3861 vec![
3862 Arc::new(UInt64Array::from(vids.to_vec())),
3863 Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
3864 Arc::new(UInt64Array::from(versions.to_vec())),
3865 Arc::new(name_arr),
3866 ],
3867 )
3868 .unwrap()
3869 }
3870
3871 #[test]
3872 fn test_mvcc_dedup_multiple_versions() {
3873 let batch = make_mvcc_batch(
3876 &[1, 1, 1, 2, 2],
3877 &[3, 1, 5, 2, 4],
3878 &[false, false, false, false, false],
3879 );
3880
3881 let result = mvcc_dedup_batch(&batch).unwrap();
3882 assert_eq!(result.num_rows(), 2);
3883
3884 let vid_col = result
3885 .column_by_name("_vid")
3886 .unwrap()
3887 .as_any()
3888 .downcast_ref::<UInt64Array>()
3889 .unwrap();
3890 let ver_col = result
3891 .column_by_name("_version")
3892 .unwrap()
3893 .as_any()
3894 .downcast_ref::<UInt64Array>()
3895 .unwrap();
3896 let name_col = result
3897 .column_by_name("name")
3898 .unwrap()
3899 .as_any()
3900 .downcast_ref::<arrow_array::StringArray>()
3901 .unwrap();
3902
3903 assert_eq!(vid_col.value(0), 1);
3905 assert_eq!(ver_col.value(0), 5);
3906 assert_eq!(name_col.value(0), "v1_ver5");
3907
3908 assert_eq!(vid_col.value(1), 2);
3909 assert_eq!(ver_col.value(1), 4);
3910 assert_eq!(name_col.value(1), "v2_ver4");
3911 }
3912
3913 #[test]
3914 fn test_mvcc_dedup_single_rows() {
3915 let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
3917 let result = mvcc_dedup_batch(&batch).unwrap();
3918 assert_eq!(result.num_rows(), 3);
3919 }
3920
3921 #[test]
3922 fn test_mvcc_dedup_empty() {
3923 let batch = make_mvcc_batch(&[], &[], &[]);
3924 let result = mvcc_dedup_batch(&batch).unwrap();
3925 assert_eq!(result.num_rows(), 0);
3926 }
3927
3928 #[test]
3929 fn test_filter_l0_tombstones_removes_tombstoned() {
3930 use crate::query::df_graph::L0Context;
3931
3932 let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
3934
3935 let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
3937 {
3938 }
3942 let l0_buf = std::sync::Arc::new(parking_lot::RwLock::new(l0));
3943 l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));
3944
3945 let l0_ctx = L0Context {
3946 current_l0: Some(l0_buf),
3947 transaction_l0: None,
3948 pending_flush_l0s: vec![],
3949 };
3950
3951 let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
3952 assert_eq!(result.num_rows(), 2);
3953
3954 let vid_col = result
3955 .column_by_name("_vid")
3956 .unwrap()
3957 .as_any()
3958 .downcast_ref::<UInt64Array>()
3959 .unwrap();
3960 assert_eq!(vid_col.value(0), 1);
3961 assert_eq!(vid_col.value(1), 3);
3962 }
3963
3964 #[test]
3965 fn test_filter_l0_tombstones_none() {
3966 use crate::query::df_graph::L0Context;
3967
3968 let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
3969 let l0_ctx = L0Context::default();
3970
3971 let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
3972 assert_eq!(result.num_rows(), 3);
3973 }
3974
3975 #[test]
3976 fn test_map_to_output_schema_basic() {
3977 use crate::query::df_graph::L0Context;
3978
3979 let lance_schema = Arc::new(Schema::new(vec![
3981 Field::new("_vid", DataType::UInt64, false),
3982 Field::new("_deleted", DataType::Boolean, false),
3983 Field::new("_version", DataType::UInt64, false),
3984 Field::new("name", DataType::Utf8, true),
3985 ]));
3986 let name_arr: arrow_array::StringArray =
3987 vec![Some("Alice"), Some("Bob")].into_iter().collect();
3988 let batch = RecordBatch::try_new(
3989 lance_schema,
3990 vec![
3991 Arc::new(UInt64Array::from(vec![1u64, 2])),
3992 Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
3993 Arc::new(UInt64Array::from(vec![1u64, 1])),
3994 Arc::new(name_arr),
3995 ],
3996 )
3997 .unwrap();
3998
3999 let output_schema = Arc::new(Schema::new(vec![
4001 Field::new("n._vid", DataType::UInt64, false),
4002 Field::new("n._labels", labels_data_type(), true),
4003 Field::new("n.name", DataType::Utf8, true),
4004 ]));
4005
4006 let l0_ctx = L0Context::default();
4007 let result = map_to_output_schema(
4008 &batch,
4009 "Person",
4010 "n",
4011 &["name".to_string()],
4012 &output_schema,
4013 &l0_ctx,
4014 )
4015 .unwrap();
4016
4017 assert_eq!(result.num_rows(), 2);
4018 assert_eq!(result.schema().fields().len(), 3);
4019 assert_eq!(result.schema().field(0).name(), "n._vid");
4020 assert_eq!(result.schema().field(1).name(), "n._labels");
4021 assert_eq!(result.schema().field(2).name(), "n.name");
4022
4023 let name_col = result
4025 .column(2)
4026 .as_any()
4027 .downcast_ref::<arrow_array::StringArray>()
4028 .unwrap();
4029 assert_eq!(name_col.value(0), "Alice");
4030 assert_eq!(name_col.value(1), "Bob");
4031 }
4032}