1use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30 BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31 Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32 Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
/// Physical plan node that scans vertices or edges of one label / edge type
/// straight out of graph storage, producing Arrow record batches.
pub struct GraphScanExec {
    /// Shared execution context providing storage access and L0 buffers.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Vertex label or edge type being scanned. Multi-label scans join the
    /// labels with ':'; an empty string means "all labels" (schemaless scan).
    label: String,

    /// Query variable this scan binds; used as the column-name prefix
    /// (columns are named "{variable}.{property}").
    variable: String,

    /// Property names to project into output columns.
    projected_properties: Vec<String>,

    /// Optional filter expression pushed down into the scan.
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// True for an edge scan, false for a vertex scan.
    is_edge_scan: bool,

    /// True when properties are untyped (encoded as LargeBinary) rather than
    /// resolved from the graph schema.
    is_schemaless: bool,

    /// Arrow output schema of this node.
    schema: SchemaRef,

    /// Cached plan properties (partitioning, ordering, etc.).
    properties: PlanProperties,

    /// Execution metrics collected per partition.
    metrics: ExecutionPlanMetricsSet,
}
106
107impl fmt::Debug for GraphScanExec {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 f.debug_struct("GraphScanExec")
110 .field("label", &self.label)
111 .field("variable", &self.variable)
112 .field("projected_properties", &self.projected_properties)
113 .field("is_edge_scan", &self.is_edge_scan)
114 .finish()
115 }
116}
117
118impl GraphScanExec {
119 pub fn new_vertex_scan(
124 graph_ctx: Arc<GraphExecutionContext>,
125 label: impl Into<String>,
126 variable: impl Into<String>,
127 projected_properties: Vec<String>,
128 filter: Option<Arc<dyn PhysicalExpr>>,
129 ) -> Self {
130 let label = label.into();
131 let variable = variable.into();
132
133 let uni_schema = graph_ctx.storage().schema_manager().schema();
135 let schema =
136 Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
137
138 let properties = compute_plan_properties(schema.clone());
139
140 Self {
141 graph_ctx,
142 label,
143 variable,
144 projected_properties,
145 filter,
146 is_edge_scan: false,
147 is_schemaless: false,
148 schema,
149 properties,
150 metrics: ExecutionPlanMetricsSet::new(),
151 }
152 }
153
154 pub fn new_schemaless_vertex_scan(
160 graph_ctx: Arc<GraphExecutionContext>,
161 label_name: impl Into<String>,
162 variable: impl Into<String>,
163 projected_properties: Vec<String>,
164 filter: Option<Arc<dyn PhysicalExpr>>,
165 ) -> Self {
166 let label = label_name.into();
167 let variable = variable.into();
168
169 let projected_properties: Vec<String> = projected_properties
174 .into_iter()
175 .filter(|p| p != "_vid" && p != "_labels")
176 .collect();
177
178 let schema = Self::build_schemaless_vertex_schema(&variable, &projected_properties);
180 let properties = compute_plan_properties(schema.clone());
181
182 Self {
183 graph_ctx,
184 label,
185 variable,
186 projected_properties,
187 filter,
188 is_edge_scan: false,
189 is_schemaless: true,
190 schema,
191 properties,
192 metrics: ExecutionPlanMetricsSet::new(),
193 }
194 }
195
196 pub fn new_multi_label_vertex_scan(
201 graph_ctx: Arc<GraphExecutionContext>,
202 labels: Vec<String>,
203 variable: impl Into<String>,
204 projected_properties: Vec<String>,
205 filter: Option<Arc<dyn PhysicalExpr>>,
206 ) -> Self {
207 let variable = variable.into();
208 let projected_properties: Vec<String> = projected_properties
209 .into_iter()
210 .filter(|p| p != "_vid" && p != "_labels")
211 .collect();
212 let schema = Self::build_schemaless_vertex_schema(&variable, &projected_properties);
213 let properties = compute_plan_properties(schema.clone());
214
215 let encoded_labels = labels.join(":");
217
218 Self {
219 graph_ctx,
220 label: encoded_labels,
221 variable,
222 projected_properties,
223 filter,
224 is_edge_scan: false,
225 is_schemaless: true,
226 schema,
227 properties,
228 metrics: ExecutionPlanMetricsSet::new(),
229 }
230 }
231
232 pub fn new_schemaless_all_scan(
238 graph_ctx: Arc<GraphExecutionContext>,
239 variable: impl Into<String>,
240 projected_properties: Vec<String>,
241 filter: Option<Arc<dyn PhysicalExpr>>,
242 ) -> Self {
243 let variable = variable.into();
244 let projected_properties: Vec<String> = projected_properties
245 .into_iter()
246 .filter(|p| p != "_vid" && p != "_labels")
247 .collect();
248
249 let schema = Self::build_schemaless_vertex_schema(&variable, &projected_properties);
251 let properties = compute_plan_properties(schema.clone());
252
253 Self {
254 graph_ctx,
255 label: String::new(), variable,
257 projected_properties,
258 filter,
259 is_edge_scan: false,
260 is_schemaless: true,
261 schema,
262 properties,
263 metrics: ExecutionPlanMetricsSet::new(),
264 }
265 }
266
267 fn build_schemaless_vertex_schema(variable: &str, properties: &[String]) -> SchemaRef {
269 let mut fields = vec![
270 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
271 Field::new(format!("{}._labels", variable), labels_data_type(), true),
272 ];
273
274 for prop in properties {
275 let col_name = format!("{}.{}", variable, prop);
276 fields.push(Field::new(&col_name, DataType::LargeBinary, true));
278 }
279
280 Arc::new(Schema::new(fields))
281 }
282
283 pub fn new_edge_scan(
288 graph_ctx: Arc<GraphExecutionContext>,
289 edge_type: impl Into<String>,
290 variable: impl Into<String>,
291 projected_properties: Vec<String>,
292 filter: Option<Arc<dyn PhysicalExpr>>,
293 ) -> Self {
294 let label = edge_type.into();
295 let variable = variable.into();
296
297 let uni_schema = graph_ctx.storage().schema_manager().schema();
299 let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
300
301 let properties = compute_plan_properties(schema.clone());
302
303 Self {
304 graph_ctx,
305 label,
306 variable,
307 projected_properties,
308 filter,
309 is_edge_scan: true,
310 is_schemaless: false,
311 schema,
312 properties,
313 metrics: ExecutionPlanMetricsSet::new(),
314 }
315 }
316
317 fn build_vertex_schema(
319 variable: &str,
320 label: &str,
321 properties: &[String],
322 uni_schema: &UniSchema,
323 ) -> SchemaRef {
324 let mut fields = vec![
325 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
326 Field::new(format!("{}._labels", variable), labels_data_type(), true),
327 ];
328 let label_props = uni_schema.properties.get(label);
329 for prop in properties {
330 let col_name = format!("{}.{}", variable, prop);
331 let arrow_type = resolve_property_type(prop, label_props);
332 fields.push(Field::new(&col_name, arrow_type, true));
333 }
334 Arc::new(Schema::new(fields))
335 }
336
337 fn build_edge_schema(
339 variable: &str,
340 edge_type: &str,
341 properties: &[String],
342 uni_schema: &UniSchema,
343 ) -> SchemaRef {
344 let mut fields = vec![
345 Field::new(format!("{}._eid", variable), DataType::UInt64, false),
346 Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
347 Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
348 ];
349 let edge_props = uni_schema.properties.get(edge_type);
350 for prop in properties {
351 let col_name = format!("{}.{}", variable, prop);
352 let arrow_type = resolve_property_type(prop, edge_props);
353 fields.push(Field::new(&col_name, arrow_type, true));
354 }
355 Arc::new(Schema::new(fields))
356 }
357}
358
359impl DisplayAs for GraphScanExec {
360 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361 let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
362 write!(
363 f,
364 "GraphScanExec: {}={}, properties={:?}",
365 scan_type, self.label, self.projected_properties
366 )?;
367 if self.filter.is_some() {
368 write!(f, ", filter=<pushed>")?;
369 }
370 Ok(())
371 }
372}
373
374impl ExecutionPlan for GraphScanExec {
375 fn name(&self) -> &str {
376 "GraphScanExec"
377 }
378
379 fn as_any(&self) -> &dyn Any {
380 self
381 }
382
383 fn schema(&self) -> SchemaRef {
384 self.schema.clone()
385 }
386
387 fn properties(&self) -> &PlanProperties {
388 &self.properties
389 }
390
391 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
392 vec![]
393 }
394
395 fn with_new_children(
396 self: Arc<Self>,
397 children: Vec<Arc<dyn ExecutionPlan>>,
398 ) -> DFResult<Arc<dyn ExecutionPlan>> {
399 if children.is_empty() {
400 Ok(self)
401 } else {
402 Err(datafusion::error::DataFusionError::Plan(
403 "GraphScanExec does not accept children".to_string(),
404 ))
405 }
406 }
407
408 fn execute(
409 &self,
410 partition: usize,
411 _context: Arc<TaskContext>,
412 ) -> DFResult<SendableRecordBatchStream> {
413 let metrics = BaselineMetrics::new(&self.metrics, partition);
414
415 Ok(Box::pin(GraphScanStream::new(
416 self.graph_ctx.clone(),
417 self.label.clone(),
418 self.variable.clone(),
419 self.projected_properties.clone(),
420 self.is_edge_scan,
421 self.is_schemaless,
422 self.schema.clone(),
423 metrics,
424 )))
425 }
426
427 fn metrics(&self) -> Option<MetricsSet> {
428 Some(self.metrics.clone_inner())
429 }
430}
431
/// Poll-state machine for `GraphScanStream`.
enum GraphScanState {
    /// Scan work has not started yet.
    Init,
    /// Scan future in flight; resolves to at most one optional batch.
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Scan is finished; no further batches will be produced.
    Done,
}
441
/// Record-batch stream backing `GraphScanExec::execute`.
struct GraphScanStream {
    /// Execution context shared with the plan node.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label / edge type being scanned (see `GraphScanExec::label`).
    label: String,

    /// Query variable the scan binds; used as the output column-name prefix.
    variable: String,

    /// Property names to project.
    properties: Vec<String>,

    /// True for an edge scan, false for a vertex scan.
    is_edge_scan: bool,

    /// True when properties are untyped LargeBinary-encoded columns.
    is_schemaless: bool,

    /// Arrow schema of the batches this stream yields.
    schema: SchemaRef,

    /// Current state of the scan's driver future.
    state: GraphScanState,

    /// Baseline runtime metrics for this partition.
    metrics: BaselineMetrics,
}
475
impl GraphScanStream {
    /// Creates the stream with all scan parameters captured and `state` set
    /// to `Init`; no storage access happens in the constructor itself.
    #[expect(clippy::too_many_arguments)]
    fn new(
        graph_ctx: Arc<GraphExecutionContext>,
        label: String,
        variable: String,
        properties: Vec<String>,
        is_edge_scan: bool,
        is_schemaless: bool,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        Self {
            graph_ctx,
            label,
            variable,
            properties,
            is_edge_scan,
            is_schemaless,
            schema,
            state: GraphScanState::Init,
            metrics,
        }
    }
}
502
503pub(crate) fn resolve_property_type(
508 prop: &str,
509 schema_props: Option<
510 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
511 >,
512) -> DataType {
513 if prop == "overflow_json" {
514 DataType::LargeBinary
515 } else {
516 schema_props
517 .and_then(|props| props.get(prop))
518 .map(|meta| meta.r#type.to_arrow())
519 .unwrap_or(DataType::LargeBinary)
520 }
521}
522
/// Test-only convenience wrapper: MVCC-dedup a vertex batch keyed by `_vid`.
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    mvcc_dedup_batch_by(batch, "_vid")
}
535
536fn mvcc_dedup_to_option(
541 batch: Option<RecordBatch>,
542 id_column: &str,
543) -> DFResult<Option<RecordBatch>> {
544 match batch {
545 Some(b) => {
546 let deduped = mvcc_dedup_batch_by(&b, id_column)?;
547 Ok(if deduped.num_rows() > 0 {
548 Some(deduped)
549 } else {
550 None
551 })
552 }
553 None => Ok(None),
554 }
555}
556
557fn merge_lance_and_l0(
561 lance_deduped: Option<RecordBatch>,
562 l0_batch: RecordBatch,
563 internal_schema: &SchemaRef,
564 id_column: &str,
565) -> DFResult<Option<RecordBatch>> {
566 let has_l0 = l0_batch.num_rows() > 0;
567 match (lance_deduped, has_l0) {
568 (Some(lance), true) => {
569 let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
570 .map_err(arrow_err)?;
571 Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
572 }
573 (Some(lance), false) => Ok(Some(lance)),
574 (None, true) => Ok(Some(l0_batch)),
575 (None, false) => Ok(None),
576 }
577}
578
/// Appends `col_name` to `columns` unless an equal entry is already present,
/// preserving the existing order.
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    let already_present = columns.iter().any(|existing| existing == col_name);
    if !already_present {
        columns.push(col_name.to_owned());
    }
}
588
589fn extract_from_overflow_blob(
594 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
595 row: usize,
596 prop: &str,
597) -> Option<Vec<u8>> {
598 let arr = overflow_arr?;
599 if arr.is_null(row) {
600 return None;
601 }
602 uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
603}
604
605fn build_overflow_property_column(
612 num_rows: usize,
613 vid_arr: &UInt64Array,
614 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
615 prop: &str,
616 l0_ctx: &crate::query::df_graph::L0Context,
617) -> ArrayRef {
618 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
619 for i in 0..num_rows {
620 let vid = Vid::from(vid_arr.value(i));
621
622 let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
624
625 if let Some(val_opt) = l0_val {
626 append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
627 } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
628 builder.append_value(&bytes);
629 } else {
630 builder.append_null();
631 }
632 }
633 Arc::new(builder.finish())
634}
635
636fn resolve_l0_property(
642 vid: &Vid,
643 prop: &str,
644 l0_ctx: &crate::query::df_graph::L0Context,
645) -> Option<Option<Value>> {
646 let mut result = None;
647 for l0 in l0_ctx.iter_l0_buffers() {
648 let guard = l0.read();
649 if let Some(props) = guard.vertex_properties.get(vid)
650 && let Some(val) = props.get(prop)
651 {
652 result = Some(Some(val.clone()));
653 }
654 }
655 result
656}
657
658fn append_value_as_cypher_binary(
663 builder: &mut arrow_array::builder::LargeBinaryBuilder,
664 val: Option<&Value>,
665) {
666 match val {
667 Some(v) if !v.is_null() => {
668 let json_val: serde_json::Value = v.clone().into();
669 match encode_cypher_value(&json_val) {
670 Ok(bytes) => builder.append_value(bytes),
671 Err(_) => builder.append_null(),
672 }
673 }
674 _ => builder.append_null(),
675 }
676}
677
/// Builds the `_all_props` column for a schemaless scan.
///
/// Per row: decode the persisted property blob (when present and decodable
/// as a Cypher map), then overlay pending L0 writes for the same vertex —
/// L0 values win on key collisions, and later L0 buffers win over earlier
/// ones. Rows that end up with no properties at all become NULL.
fn build_all_props_column_with_l0_overlay(
    num_rows: usize,
    vid_arr: &UInt64Array,
    props_arr: Option<&arrow_array::LargeBinaryArray>,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> ArrayRef {
    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
    for i in 0..num_rows {
        let vid = Vid::from(vid_arr.value(i));

        // Seed with the stored blob; silently skipped when absent, null, or
        // not decodable as a map.
        let mut merged_props = serde_json::Map::new();
        if let Some(arr) = props_arr
            && !arr.is_null(i)
            && let Ok(uni_common::Value::Map(map)) =
                uni_common::cypher_value_codec::decode(arr.value(i))
        {
            for (k, v) in map {
                let json_val: serde_json::Value = v.into();
                merged_props.insert(k, json_val);
            }
        }

        // Overlay L0 buffers in iteration order (later buffers override).
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
                for (k, v) in l0_props {
                    let json_val: serde_json::Value = v.clone().into();
                    merged_props.insert(k.clone(), json_val);
                }
            }
        }

        // Encode failures degrade to NULL rather than erroring the scan.
        if merged_props.is_empty() {
            builder.append_null();
        } else {
            let json_obj = serde_json::Value::Object(merged_props);
            match encode_cypher_value(&json_obj) {
                Ok(bytes) => builder.append_value(bytes),
                Err(_) => builder.append_null(),
            }
        }
    }
    Arc::new(builder.finish())
}
732
/// Builds the `_all_props` column for a schema-typed scan.
///
/// Per row, merges three sources in increasing precedence: (1) the projected
/// typed schema columns of the batch, (2) the persisted `overflow_json`
/// blob, (3) pending L0 writes. Internal columns (leading '_'), the blob
/// itself, and `_all_props` are excluded from source (1).
fn build_all_props_column_for_schema_scan(
    batch: &RecordBatch,
    vid_arr: &UInt64Array,
    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
    projected_properties: &[String],
    l0_ctx: &crate::query::df_graph::L0Context,
) -> ArrayRef {
    // Only real, user-visible schema properties participate.
    let schema_props: Vec<&str> = projected_properties
        .iter()
        .filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
        .map(String::as_str)
        .collect();

    let num_rows = batch.num_rows();
    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
    for i in 0..num_rows {
        let vid = Vid::from(vid_arr.value(i));
        let mut merged_props = serde_json::Map::new();

        // (1) Typed schema columns, converted to JSON values; nulls skipped.
        for &prop in &schema_props {
            if let Some(col) = batch.column_by_name(prop) {
                let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
                if !val.is_null() {
                    let json_val: serde_json::Value = val.into();
                    merged_props.insert(prop.to_string(), json_val);
                }
            }
        }

        // (2) Persisted overflow blob, when decodable as a map.
        if let Some(arr) = overflow_arr
            && !arr.is_null(i)
            && let Ok(uni_common::Value::Map(map)) =
                uni_common::cypher_value_codec::decode(arr.value(i))
        {
            for (k, v) in map {
                let json_val: serde_json::Value = v.into();
                merged_props.insert(k, json_val);
            }
        }

        // (3) L0 writes override everything; later buffers win.
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
                for (k, v) in l0_props {
                    let json_val: serde_json::Value = v.clone().into();
                    merged_props.insert(k.clone(), json_val);
                }
            }
        }

        // Empty result or encode failure → NULL slot.
        if merged_props.is_empty() {
            builder.append_null();
        } else {
            let json_obj = serde_json::Value::Object(merged_props);
            match encode_cypher_value(&json_obj) {
                Ok(bytes) => builder.append_value(bytes),
                Err(_) => builder.append_null(),
            }
        }
    }
    Arc::new(builder.finish())
}
803
/// MVCC deduplication: for every distinct `id_column` value, keeps only the
/// row with the highest `_version`.
///
/// Implemented by lexsorting on (id ASC, `_version` DESC) and then keeping
/// the first row of each id run. Requires both `id_column` and `_version`
/// columns to be present; `id_column` must be `UInt64`.
fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(batch.clone());
    }

    let id_col = batch
        .column_by_name(id_column)
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
        })?
        .clone();
    let version_col = batch
        .column_by_name("_version")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
        })?
        .clone();

    // Sort ids ascending, versions descending, so the newest version of each
    // id lands first within its run.
    let sort_columns = vec![
        arrow::compute::SortColumn {
            values: id_col,
            options: Some(arrow::compute::SortOptions {
                descending: false,
                nulls_first: false,
            }),
        },
        arrow::compute::SortColumn {
            values: version_col,
            options: Some(arrow::compute::SortOptions {
                descending: true,
                nulls_first: false,
            }),
        },
    ];
    let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;

    // Materialize the sorted batch via `take`.
    let sorted_columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
        .collect::<Result<_, _>>()
        .map_err(arrow_err)?;
    let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;

    // Safe to unwrap: the column existed above and `take` preserves types.
    let sorted_id = sorted
        .column_by_name(id_column)
        .unwrap()
        .as_any()
        .downcast_ref::<UInt64Array>()
        .unwrap();

    // Keep the first row of each id run — the newest version thanks to the
    // descending version sort.
    let mut keep = vec![false; sorted.num_rows()];
    if !keep.is_empty() {
        keep[0] = true;
        for (i, flag) in keep.iter_mut().enumerate().skip(1) {
            if sorted_id.value(i) != sorted_id.value(i - 1) {
                *flag = true;
            }
        }
    }

    let mask = arrow_array::BooleanArray::from(keep);
    arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
}
876
877fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
879 if batch.num_rows() == 0 {
880 return Ok(batch.clone());
881 }
882 let op_col = match batch.column_by_name("op") {
883 Some(col) => col
884 .as_any()
885 .downcast_ref::<arrow_array::UInt8Array>()
886 .unwrap(),
887 None => return Ok(batch.clone()),
888 };
889 let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
890 let mask = arrow_array::BooleanArray::from(keep);
891 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
892}
893
894fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
896 if batch.num_rows() == 0 {
897 return Ok(batch.clone());
898 }
899 let deleted_col = match batch.column_by_name("_deleted") {
900 Some(col) => col
901 .as_any()
902 .downcast_ref::<arrow_array::BooleanArray>()
903 .unwrap(),
904 None => return Ok(batch.clone()),
905 };
906 let keep: Vec<bool> = (0..deleted_col.len())
907 .map(|i| !deleted_col.value(i))
908 .collect();
909 let mask = arrow_array::BooleanArray::from(keep);
910 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
911}
912
913fn filter_l0_tombstones(
915 batch: &RecordBatch,
916 l0_ctx: &crate::query::df_graph::L0Context,
917) -> DFResult<RecordBatch> {
918 if batch.num_rows() == 0 {
919 return Ok(batch.clone());
920 }
921
922 let mut tombstones: HashSet<u64> = HashSet::new();
923 for l0 in l0_ctx.iter_l0_buffers() {
924 let guard = l0.read();
925 for vid in guard.vertex_tombstones.iter() {
926 tombstones.insert(vid.as_u64());
927 }
928 }
929
930 if tombstones.is_empty() {
931 return Ok(batch.clone());
932 }
933
934 let vid_col = batch
935 .column_by_name("_vid")
936 .ok_or_else(|| {
937 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
938 })?
939 .as_any()
940 .downcast_ref::<UInt64Array>()
941 .unwrap();
942
943 let keep: Vec<bool> = (0..vid_col.len())
944 .map(|i| !tombstones.contains(&vid_col.value(i)))
945 .collect();
946 let mask = arrow_array::BooleanArray::from(keep);
947 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
948}
949
950fn filter_l0_edge_tombstones(
952 batch: &RecordBatch,
953 l0_ctx: &crate::query::df_graph::L0Context,
954) -> DFResult<RecordBatch> {
955 if batch.num_rows() == 0 {
956 return Ok(batch.clone());
957 }
958
959 let mut tombstones: HashSet<u64> = HashSet::new();
960 for l0 in l0_ctx.iter_l0_buffers() {
961 let guard = l0.read();
962 for eid in guard.tombstones.keys() {
963 tombstones.insert(eid.as_u64());
964 }
965 }
966
967 if tombstones.is_empty() {
968 return Ok(batch.clone());
969 }
970
971 let eid_col = batch
972 .column_by_name("eid")
973 .ok_or_else(|| {
974 datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
975 })?
976 .as_any()
977 .downcast_ref::<UInt64Array>()
978 .unwrap();
979
980 let keep: Vec<bool> = (0..eid_col.len())
981 .map(|i| !tombstones.contains(&eid_col.value(i)))
982 .collect();
983 let mask = arrow_array::BooleanArray::from(keep);
984 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
985}
986
/// Materializes pending L0 vertex state for `label` into a record batch
/// shaped like `lance_schema`, so it can be merged with the persisted data.
///
/// Buffers are visited in order: later buffers overwrite per-property values
/// and the highest observed version wins. Vids tombstoned in ANY buffer are
/// removed by the final sweep (the in-loop `contains` check is only an early
/// skip for tombstones already seen). Properties not declared in the label
/// schema are packed into the `overflow_json` blob.
fn build_l0_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    lance_schema: &SchemaRef,
    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new(); let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }
        for vid in guard.vids_for_label(label) {
            let vid_u64 = vid.as_u64();
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0));
            // Merge properties; later buffers override earlier ones.
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            // Track the maximum version seen across buffers.
            if version > entry.1 {
                entry.1 = version;
            }
        }
    }

    // Final sweep: a tombstone in any buffer deletes the vid outright, even
    // if the row was collected before the tombstone was encountered.
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(lance_schema.clone()));
    }

    // Deterministic row order: sorted by vid.
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());

    // Names declared in the label schema; anything else is "overflow".
    let schema_prop_names: HashSet<&str> = label_props
        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    for field in lance_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "_deleted" => {
                // Always false: tombstoned vids were removed above.
                let vals = vec![false; num_rows];
                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // Pack schema-unknown properties (excluding ext_id and any
                // internal '_'-prefixed keys) into an encoded map per row.
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _) = &vid_data[vid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        if k == "ext_id" || k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Regular schema property: build a typed column from L0 data.
                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
}
1104
1105fn build_l0_property_column(
1109 vids: &[u64],
1110 vid_data: &HashMap<u64, (Properties, u64)>,
1111 prop_name: &str,
1112 data_type: &DataType,
1113) -> DFResult<ArrayRef> {
1114 let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
1116 let props_map: HashMap<Vid, Properties> = vid_data
1117 .iter()
1118 .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1119 .collect();
1120
1121 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1122}
1123
/// Materializes pending L0 edge state for `edge_type` into a record batch
/// shaped like `internal_schema`, so it can be merged with persisted data.
///
/// Buffers are visited in order: later buffers overwrite endpoints and
/// per-property values, and the highest observed version wins. Eids
/// tombstoned in ANY buffer are removed by the final sweep. Properties not
/// declared in the edge-type schema go into the `overflow_json` blob.
fn build_l0_edge_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    edge_type: &str,
    internal_schema: &SchemaRef,
    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // eid -> (src_vid, dst_vid, merged properties, max version)
    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        for eid in guard.tombstones.keys() {
            tombstones.insert(eid.as_u64());
        }
        for eid in guard.eids_for_type(edge_type) {
            let eid_u64 = eid.as_u64();
            if tombstones.contains(&eid_u64) {
                continue;
            }
            // Edges with unknown endpoints are skipped entirely.
            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
                None => continue,
            };
            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
            let entry = eid_data
                .entry(eid_u64)
                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
            // Merge properties; later buffers override earlier ones.
            if let Some(props) = guard.edge_properties.get(&eid) {
                for (k, v) in props {
                    entry.2.insert(k.clone(), v.clone());
                }
            }
            // Later buffers also refresh the endpoints.
            entry.0 = src_vid;
            entry.1 = dst_vid;
            if version > entry.3 {
                entry.3 = version;
            }
        }
    }

    // Final sweep: a tombstone in any buffer deletes the eid outright.
    for t in &tombstones {
        eid_data.remove(t);
    }

    if eid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Deterministic row order: sorted by eid.
    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
    eids.sort_unstable();

    let num_rows = eids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Names declared in the edge-type schema; anything else is "overflow".
    let schema_prop_names: HashSet<&str> = type_props
        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    for field in internal_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "eid" => {
                columns.push(Arc::new(UInt64Array::from(eids.clone())));
            }
            "src_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "dst_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "op" => {
                // Zero op code for all rows: tombstoned edges were removed.
                let vals = vec![0u8; num_rows];
                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // Pack schema-unknown properties (excluding internal '_'
                // keys) into an encoded map per row.
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for eid_u64 in &eids {
                    let (_, _, props, _) = &eid_data[eid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        if k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Regular schema property: build a typed column from L0 data.
                let col =
                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
1258
1259fn build_l0_edge_property_column(
1263 eids: &[u64],
1264 eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1265 prop_name: &str,
1266 data_type: &DataType,
1267) -> DFResult<ArrayRef> {
1268 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1270 let props_map: HashMap<Vid, Properties> = eid_data
1271 .iter()
1272 .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1273 .collect();
1274
1275 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1276}
1277
/// Builds the `_labels` list column for rows scanned under a known `label`.
///
/// Per row: start from the labels stored in the batch (falling back to the
/// scan label when none are stored), guarantee the scan label itself is
/// present, then append any extra labels recorded for the vid in L0 buffers,
/// skipping duplicates.
fn build_labels_column_for_known_label(
    vid_arr: &UInt64Array,
    label: &str,
    l0_ctx: &crate::query::df_graph::L0Context,
    batch_labels_col: Option<&arrow_array::ListArray>,
) -> DFResult<ArrayRef> {
    use uni_store::storage::arrow_convert::labels_from_list_array;

    let mut labels_builder = ListBuilder::new(StringBuilder::new());

    for i in 0..vid_arr.len() {
        let vid = Vid::from(vid_arr.value(i));

        // Stored labels, or just the scan label when none are available.
        let mut labels = match batch_labels_col {
            Some(list_arr) => {
                let stored = labels_from_list_array(list_arr, i);
                if stored.is_empty() {
                    vec![label.to_string()]
                } else {
                    stored
                }
            }
            None => vec![label.to_string()],
        };

        // The scan label is always part of the result.
        if !labels.iter().any(|l| l == label) {
            labels.push(label.to_string());
        }

        // Append L0-recorded labels not already present.
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
                for lbl in l0_labels {
                    if !labels.contains(lbl) {
                        labels.push(lbl.clone());
                    }
                }
            }
        }

        let values = labels_builder.values();
        for lbl in &labels {
            values.append_value(lbl);
        }
        labels_builder.append(true);
    }

    Ok(Arc::new(labels_builder.finish()))
}
1335
/// Maps an internal scan batch for a schema'd vertex label onto the
/// caller-supplied `output_schema`.
///
/// Column order produced: `_vid`, then the merged labels list, then one
/// column per entry of `projected_properties` (in order). Properties that
/// have no dedicated column in `batch` are served from the `overflow_json`
/// blob column. Errors with `DataFusionError::Internal` when the required
/// `_vid` column is missing or not UInt64, and with an Arrow error when the
/// assembled columns do not match `output_schema`.
fn map_to_output_schema(
    batch: &RecordBatch,
    label: &str,
    _variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    // `_vid` is mandatory: it both becomes the first output column and keys
    // all L0-buffer lookups below.
    let vid_col = batch
        .column_by_name("_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
        })?
        .clone();
    let vid_arr = vid_col
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
        })?;

    // Stored labels are optional; the labels column is rebuilt per row,
    // merging stored labels, the scanned label, and L0-buffer labels.
    let batch_labels_col = batch
        .column_by_name("_labels")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
    let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
    columns.push(vid_col.clone());
    columns.push(labels_col);

    // Hoisted once: the overflow blob column is reused for every projected
    // property that lacks a dedicated column.
    let overflow_arr = batch
        .column_by_name("overflow_json")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

    for prop in projected_properties {
        if prop == "overflow_json" {
            // Pass the raw blob column through, or all-null if absent.
            match batch.column_by_name("overflow_json") {
                Some(col) => columns.push(col.clone()),
                None => {
                    columns.push(arrow_array::new_null_array(
                        &DataType::LargeBinary,
                        batch.num_rows(),
                    ));
                }
            }
        } else if prop == "_all_props" {
            // Fast path: when no L0 buffer holds vertex properties and no
            // schema-backed columns are projected, the overflow blob already
            // IS the full property map, so reuse it directly.
            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
                let guard = l0.read();
                !guard.vertex_properties.is_empty()
            });
            let has_schema_cols = projected_properties
                .iter()
                .any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));

            if !any_l0_has_vertex_props && !has_schema_cols {
                match batch.column_by_name("overflow_json") {
                    Some(col) => columns.push(col.clone()),
                    None => {
                        columns.push(arrow_array::new_null_array(
                            &DataType::LargeBinary,
                            batch.num_rows(),
                        ));
                    }
                }
            } else {
                // Slow path: merge schema columns, overflow blob, and L0
                // properties into a single encoded map per row (helper is
                // defined elsewhere in this file).
                let col = build_all_props_column_for_schema_scan(
                    batch,
                    vid_arr,
                    overflow_arr,
                    projected_properties,
                    l0_ctx,
                );
                columns.push(col);
            }
        } else {
            match batch.column_by_name(prop) {
                // Property has its own physical column: pass it through.
                Some(col) => columns.push(col.clone()),
                None => {
                    // Not a schema column: extract it per row from the
                    // overflow blob / L0 buffers (helper defined elsewhere).
                    let col = build_overflow_property_column(
                        batch.num_rows(),
                        vid_arr,
                        overflow_arr,
                        prop,
                        l0_ctx,
                    );
                    columns.push(col);
                }
            }
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}
1451
1452fn map_edge_to_output_schema(
1459 batch: &RecordBatch,
1460 variable: &str,
1461 projected_properties: &[String],
1462 output_schema: &SchemaRef,
1463) -> DFResult<RecordBatch> {
1464 if batch.num_rows() == 0 {
1465 return Ok(RecordBatch::new_empty(output_schema.clone()));
1466 }
1467
1468 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1469
1470 let eid_col = batch
1472 .column_by_name("eid")
1473 .ok_or_else(|| {
1474 datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1475 })?
1476 .clone();
1477 columns.push(eid_col);
1478
1479 let src_col = batch
1481 .column_by_name("src_vid")
1482 .ok_or_else(|| {
1483 datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
1484 })?
1485 .clone();
1486 columns.push(src_col);
1487
1488 let dst_col = batch
1490 .column_by_name("dst_vid")
1491 .ok_or_else(|| {
1492 datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
1493 })?
1494 .clone();
1495 columns.push(dst_col);
1496
1497 for prop in projected_properties {
1499 if prop == "overflow_json" {
1500 match batch.column_by_name("overflow_json") {
1501 Some(col) => columns.push(col.clone()),
1502 None => {
1503 columns.push(arrow_array::new_null_array(
1504 &DataType::LargeBinary,
1505 batch.num_rows(),
1506 ));
1507 }
1508 }
1509 } else {
1510 match batch.column_by_name(prop) {
1511 Some(col) => columns.push(col.clone()),
1512 None => {
1513 let overflow_arr = batch
1516 .column_by_name("overflow_json")
1517 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1518
1519 if let Some(arr) = overflow_arr {
1520 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1521 for i in 0..batch.num_rows() {
1522 if !arr.is_null(i) {
1523 let blob = arr.value(i);
1524 if let Some(sub_bytes) =
1526 uni_common::cypher_value_codec::extract_map_entry_raw(
1527 blob, prop,
1528 )
1529 {
1530 builder.append_value(&sub_bytes);
1531 } else {
1532 builder.append_null();
1533 }
1534 } else {
1535 builder.append_null();
1536 }
1537 }
1538 columns.push(Arc::new(builder.finish()));
1539 } else {
1540 let target_field = output_schema
1542 .fields()
1543 .iter()
1544 .find(|f| f.name() == &format!("{}.{}", variable, prop));
1545 let dt = target_field
1546 .map(|f| f.data_type().clone())
1547 .unwrap_or(DataType::LargeBinary);
1548 columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
1549 }
1550 }
1551 }
1552 }
1553 }
1554
1555 RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
1556}
1557
/// Scans all vertices of a schema-declared `label` and returns a single
/// `RecordBatch` shaped to `output_schema`.
///
/// Pipeline:
/// 1. Decide which physical columns to read from Lance: the bookkeeping
///    columns (`_vid`, `_deleted`, `_version`), each projected property that
///    exists in the declared schema, and `overflow_json` when any projected
///    property must be served from the blob.
/// 2. Read from LanceDB, restricted to `_version <= high-water-mark` when a
///    snapshot HWM is set; open/query failures degrade to "no stored data"
///    rather than erroring (best-effort by design, see Err(_) arms).
/// 3. MVCC-dedup stored rows by `_vid`, overlay unflushed L0 rows, drop
///    `_deleted` rows and L0 tombstones (helpers defined elsewhere in file).
/// 4. Map the surviving rows onto `output_schema`.
async fn columnar_scan_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    // Declared per-label property metadata; None for unknown labels.
    let label_props = uni_schema.properties.get(label);

    // Bookkeeping columns are always fetched.
    let mut lance_columns: Vec<String> = vec![
        "_vid".to_string(),
        "_deleted".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else {
            // Only schema-declared properties have dedicated Lance columns.
            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Any non-schema property (including pseudo-columns like `_all_props`)
    // is served from the overflow blob, so fetch it when needed.
    let needs_overflow = projected_properties
        .iter()
        .any(|p| p == "overflow_json" || !label_props.is_some_and(|lp| lp.contains_key(p)));
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    let ds = storage
        .vertex_dataset(label)
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
    let lancedb_store = storage.lancedb_store();

    // `None` here means "no stored data" — either the table can't be opened,
    // the query fails, or it returns no batches.
    let lance_batch = match ds.open_lancedb(lancedb_store).await {
        Ok(table) => {
            use lancedb::query::{ExecutableQuery, QueryBase, Select};

            let table_schema = table
                .schema()
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
            let table_field_names: HashSet<&str> = table_schema
                .fields()
                .iter()
                .map(|f| f.name().as_str())
                .collect();

            // Only select columns the physical table actually has (older
            // tables may predate newer schema properties).
            let actual_columns: Vec<&str> = lance_columns
                .iter()
                .filter(|c| table_field_names.contains(c.as_str()))
                .map(|c| c.as_str())
                .collect();

            let query = table.query().select(Select::columns(&actual_columns));
            // Snapshot isolation: hide rows written after the high-water mark.
            let query = match storage.version_high_water_mark() {
                Some(hwm) => query.only_if(format!("_version <= {}", hwm)),
                None => query,
            };

            match query.execute().await {
                Ok(stream) => {
                    use futures::TryStreamExt;
                    // Stream errors are deliberately swallowed (best-effort),
                    // consistent with the Err(_) => None arms below.
                    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
                    if batches.is_empty() {
                        None
                    } else {
                        Some(
                            arrow::compute::concat_batches(&batches[0].schema(), &batches)
                                .map_err(|e| {
                                    datafusion::error::DataFusionError::ArrowError(
                                        Box::new(e),
                                        None,
                                    )
                                })?,
                        )
                    }
                }
                Err(_) => None,
            }
        }
        Err(_) => None, };

    // Keep only the newest version per _vid (helper defined elsewhere).
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    // Internal working schema: taken from the stored batch when present,
    // otherwise synthesized from the declared property types so the L0-only
    // path still produces correctly typed columns.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("_vid", DataType::UInt64, false),
                Field::new("_deleted", DataType::Boolean, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else {
                    // Unknown columns default to LargeBinary (blob-encoded).
                    let arrow_type = label_props
                        .and_then(|lp| lp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    let l0_batch = build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props)?;

    // No stored rows and no L0 rows: empty result.
    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Drop rows deleted in the (unflushed) L0 buffers as well.
    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    map_to_output_schema(
        &filtered,
        label,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )
}
1724
/// Scans all edges of `edge_type` and returns a single `RecordBatch` shaped
/// to `output_schema`.
///
/// Mirrors `columnar_scan_vertex_batch_static` but reads the forward edge
/// delta dataset, keys MVCC dedup by `eid`, and filters deletions via edge
/// `op` codes instead of a `_deleted` flag. Open/query failures degrade to
/// "no stored data" (best-effort by design, see Err(_) arms).
async fn columnar_scan_edge_batch_static(
    graph_ctx: &GraphExecutionContext,
    edge_type: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    // Declared per-edge-type property metadata; None for unknown types.
    let type_props = uni_schema.properties.get(edge_type);

    // Bookkeeping columns are always fetched.
    let mut lance_columns: Vec<String> = vec![
        "eid".to_string(),
        "src_vid".to_string(),
        "dst_vid".to_string(),
        "op".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else {
            // Only schema-declared properties have dedicated Lance columns.
            let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Any non-schema property is served from the overflow blob.
    let needs_overflow = projected_properties
        .iter()
        .any(|p| p == "overflow_json" || !type_props.is_some_and(|tp| tp.contains_key(p)));
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    // Forward-direction edge delta dataset for this type.
    let ds = storage
        .delta_dataset(edge_type, "fwd")
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
    let lancedb_store = storage.lancedb_store();

    // `None` means "no stored data" (open/query failure or zero batches).
    let lance_batch = match ds.open_lancedb(lancedb_store).await {
        Ok(table) => {
            use lancedb::query::{ExecutableQuery, QueryBase, Select};

            let table_schema = table
                .schema()
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
            let table_field_names: HashSet<&str> = table_schema
                .fields()
                .iter()
                .map(|f| f.name().as_str())
                .collect();

            // Only select columns the physical table actually has.
            let actual_columns: Vec<&str> = lance_columns
                .iter()
                .filter(|c| table_field_names.contains(c.as_str()))
                .map(|c| c.as_str())
                .collect();

            let query = table.query().select(Select::columns(&actual_columns));
            // Snapshot isolation: hide rows written after the high-water mark.
            let query = match storage.version_high_water_mark() {
                Some(hwm) => query.only_if(format!("_version <= {}", hwm)),
                None => query,
            };

            match query.execute().await {
                Ok(stream) => {
                    use futures::TryStreamExt;
                    // Stream errors deliberately swallowed (best-effort).
                    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
                    if batches.is_empty() {
                        None
                    } else {
                        Some(
                            arrow::compute::concat_batches(&batches[0].schema(), &batches)
                                .map_err(|e| {
                                    datafusion::error::DataFusionError::ArrowError(
                                        Box::new(e),
                                        None,
                                    )
                                })?,
                        )
                    }
                }
                Err(_) => None,
            }
        }
        Err(_) => None, };

    // Keep only the newest version per eid (helper defined elsewhere).
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;

    // Working schema: from the stored batch when present, otherwise
    // synthesized from declared property types for the L0-only path.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("eid", DataType::UInt64, false),
                Field::new("src_vid", DataType::UInt64, false),
                Field::new("dst_vid", DataType::UInt64, false),
                Field::new("op", DataType::UInt8, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(
                    col.as_str(),
                    "eid" | "src_vid" | "dst_vid" | "op" | "_version"
                ) {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else {
                    // Unknown columns default to LargeBinary (blob-encoded).
                    let arrow_type = type_props
                        .and_then(|tp| tp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;

    // No stored rows and no L0 rows: empty result.
    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    // Drop rows whose `op` marks a deletion (helper defined elsewhere).
    let merged = filter_deleted_edge_ops(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Drop edges deleted in the (unflushed) L0 buffers as well.
    let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
}
1890
/// Scans schemaless vertices (stored in the shared main vertex table with a
/// `labels` list and a `props_json` blob) and returns one `RecordBatch`
/// shaped to `output_schema`.
///
/// `label` may be empty (scan everything), a single label, or several labels
/// joined with ':' — multi-label scans require ALL listed labels to match.
/// Label filtering is pushed down to LanceDB via `array_contains`, combined
/// with the `_version` high-water-mark predicate when set.
async fn columnar_scan_schemaless_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    use uni_store::storage::main_vertex::MainVertexDataset;

    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let lancedb_store = storage.lancedb_store();

    // Build the pushed-down SQL-ish predicate: one array_contains per
    // required label, AND-ed with the snapshot version bound.
    let filter = {
        let mut parts = Vec::new();

        if !label.is_empty() {
            if label.contains(':') {
                // 'A:B' means the vertex must carry both labels.
                for lbl in label.split(':') {
                    parts.push(format!("array_contains(labels, '{}')", lbl));
                }
            } else {
                parts.push(format!("array_contains(labels, '{}')", label));
            }
        }

        if let Some(hwm) = storage.version_high_water_mark() {
            parts.push(format!("_version <= {}", hwm));
        }

        if parts.is_empty() {
            None
        } else {
            Some(parts.join(" AND "))
        }
    };

    // `None` means "no stored data" — open/query failures degrade to an
    // empty stored side rather than erroring (best-effort by design).
    let lance_batch = match MainVertexDataset::open_table(lancedb_store).await {
        Ok(table) => {
            use lancedb::query::{ExecutableQuery, QueryBase, Select};

            let query = table.query().select(Select::columns(&[
                "_vid",
                "_deleted",
                "labels",
                "props_json",
                "_version",
            ]));
            let query = match filter {
                Some(f) => query.only_if(f),
                None => query,
            };

            match query.execute().await {
                Ok(stream) => {
                    use futures::TryStreamExt;
                    // Stream errors deliberately swallowed (best-effort).
                    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
                    if batches.is_empty() {
                        None
                    } else {
                        Some(
                            arrow::compute::concat_batches(&batches[0].schema(), &batches)
                                .map_err(|e| {
                                    datafusion::error::DataFusionError::ArrowError(
                                        Box::new(e),
                                        None,
                                    )
                                })?,
                        )
                    }
                }
                Err(_) => None,
            }
        }
        Err(_) => None, };

    // Keep only the newest version per _vid (helper defined elsewhere).
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    // Working schema: from the stored batch when present, otherwise the
    // fixed schemaless layout so the L0-only path stays well-typed.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("labels", labels_data_type(), false),
            Field::new("props_json", DataType::LargeBinary, true),
            Field::new("_version", DataType::UInt64, false),
        ])),
    };

    let l0_batch = build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema)?;

    // No stored rows and no L0 rows: empty result.
    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Drop rows deleted in the (unflushed) L0 buffers as well.
    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    map_to_schemaless_output_schema(
        &filtered,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )
}
2027
/// Materializes the unflushed L0 write-buffer state for schemaless vertices
/// as a `RecordBatch` in `internal_schema`, filtered by `label`.
///
/// Rows are emitted sorted by vid. `label` follows the same convention as
/// the scan: empty = all vertices, 'A:B' = must carry every listed label.
/// Returns an empty batch when nothing survives tombstoning.
fn build_l0_schemaless_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    internal_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    // Per-vid accumulated state: (merged properties, max version, labels).
    let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    let label_filter: Vec<&str> = if label.is_empty() {
        vec![]
    } else if label.contains(':') {
        label.split(':').collect()
    } else {
        vec![label]
    };

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();

        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }

        let vids: Vec<Vid> = if label_filter.is_empty() {
            guard.all_vertex_vids()
        } else if label_filter.len() == 1 {
            guard.vids_for_label(label_filter[0])
        } else {
            guard.vids_with_all_labels(&label_filter)
        };

        for vid in vids {
            let vid_u64 = vid.as_u64();
            // Skip vids already tombstoned by this or an earlier buffer.
            // NOTE(review): a vid tombstoned in an earlier buffer and
            // re-added in a later one is dropped here (and again by the
            // final sweep below) — confirm delete-then-recreate is meant to
            // resolve to "deleted" across buffers.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0, Vec::new()));

            // Later buffers overwrite individual property keys (merge).
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            // Track the maximum version seen for this vid.
            if version > entry.1 {
                entry.1 = version;
            }
            // Labels are replaced wholesale (latest buffer wins), not merged.
            if let Some(labels) = guard.vertex_labels.get(&vid) {
                entry.2 = labels.clone();
            }
        }
    }

    // Final sweep: remove anything tombstoned by ANY buffer, including vids
    // inserted before their tombstone was encountered.
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Deterministic row order: ascending vid.
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Build each column the internal schema asks for; unknown fields get an
    // all-null column so the batch always matches `internal_schema`.
    for field in internal_schema.fields() {
        match field.name().as_str() {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "labels" => {
                let mut labels_builder = ListBuilder::new(StringBuilder::new());
                for vid_u64 in &vids {
                    let (_, _, labels) = &vid_data[vid_u64];
                    let values = labels_builder.values();
                    for lbl in labels {
                        values.append_value(lbl);
                    }
                    labels_builder.append(true);
                }
                columns.push(Arc::new(labels_builder.finish()));
            }
            "props_json" => {
                // Encode the merged property map with the cypher value codec;
                // vertices with no properties get a null blob.
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _, _) = &vid_data[vid_u64];
                    if props.is_empty() {
                        builder.append_null();
                    } else {
                        let json_obj: serde_json::Value = {
                            let mut map = serde_json::Map::new();
                            for (k, v) in props {
                                let json_val: serde_json::Value = v.clone().into();
                                map.insert(k.clone(), json_val);
                            }
                            serde_json::Value::Object(map)
                        };
                        match encode_cypher_value(&json_obj) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_deleted" => {
                // Tombstoned vids were removed above, so nothing here is
                // deleted.
                columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
                    false;
                    num_rows
                ])));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            _ => {
                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
2173
/// Maps an internal schemaless vertex batch onto the caller-supplied
/// `output_schema`.
///
/// Column order produced: `_vid`, the merged labels list (stored labels plus
/// any L0-buffer labels, de-duplicated), then one column per entry of
/// `projected_properties`. All property values are served from the
/// `props_json` blob with the L0 buffers overlaid.
fn map_to_schemaless_output_schema(
    batch: &RecordBatch,
    _variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    // `_vid` is mandatory: first output column and the key for L0 lookups.
    let vid_col = batch
        .column_by_name("_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
        })?
        .clone();
    let vid_arr = vid_col
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
        })?;
    columns.push(vid_col.clone());

    let labels_col = batch.column_by_name("labels");
    let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());

    // Rebuild the labels column row by row: stored labels first, then any
    // labels the L0 buffers recorded for that vid (skipping duplicates).
    let mut labels_builder = ListBuilder::new(StringBuilder::new());
    for i in 0..vid_arr.len() {
        let vid_u64 = vid_arr.value(i);
        let vid = Vid::from(vid_u64);

        let mut row_labels: Vec<String> = Vec::new();
        if let Some(arr) = labels_arr
            && !arr.is_null(i)
        {
            let list_val = arr.value(i);
            if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
                for j in 0..str_arr.len() {
                    if !str_arr.is_null(j) {
                        row_labels.push(str_arr.value(j).to_string());
                    }
                }
            }
        }

        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
                for lbl in l0_labels {
                    if !row_labels.contains(lbl) {
                        row_labels.push(lbl.clone());
                    }
                }
            }
        }

        let values = labels_builder.values();
        for lbl in &row_labels {
            values.append_value(lbl);
        }
        labels_builder.append(true);
    }
    columns.push(Arc::new(labels_builder.finish()));

    let props_col = batch.column_by_name("props_json");
    let props_arr =
        props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

    for prop in projected_properties {
        if prop == "_all_props" {
            // Fast path: if no L0 buffer holds vertex properties, the stored
            // blob already IS the full property map — reuse it directly.
            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
                let guard = l0.read();
                !guard.vertex_properties.is_empty()
            });
            if !any_l0_has_vertex_props {
                match props_col {
                    Some(col) => columns.push(col.clone()),
                    None => {
                        columns.push(arrow_array::new_null_array(
                            &DataType::LargeBinary,
                            batch.num_rows(),
                        ));
                    }
                }
            } else {
                // Otherwise merge blob + L0 properties per row (helper
                // defined elsewhere in this file).
                let col = build_all_props_column_with_l0_overlay(
                    batch.num_rows(),
                    vid_arr,
                    props_arr,
                    l0_ctx,
                );
                columns.push(col);
            }
        } else {
            // Single property: extract from the blob / L0 overlay per row.
            let col =
                build_overflow_property_column(batch.num_rows(), vid_arr, props_arr, prop, l0_ctx);
            columns.push(col);
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}
2294
2295pub(crate) fn get_property_value(
2297 vid: &Vid,
2298 props_map: &HashMap<Vid, Properties>,
2299 prop_name: &str,
2300) -> Option<Value> {
2301 if prop_name == "_all_props" {
2302 return props_map.get(vid).map(|p| {
2303 let map: HashMap<String, Value> =
2304 p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2305 Value::Map(map)
2306 });
2307 }
2308 props_map
2309 .get(vid)
2310 .and_then(|props| props.get(prop_name))
2311 .cloned()
2312}
2313
2314pub(crate) fn encode_cypher_value(val: &serde_json::Value) -> Result<Vec<u8>, String> {
2318 let uni_val: uni_common::Value = val.clone().into();
2319 Ok(uni_common::cypher_value_codec::encode(&uni_val))
2320}
2321
/// Builds a primitive Arrow column from per-vid property values.
///
/// `$extractor` pulls the raw numeric out of a `&Value` (returning `None`
/// on type mismatch) and `$cast` narrows/widens it to the builder's native
/// type. Missing properties and failed extractions both become nulls.
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut builder = <$builder_ty>::new();
        for vid in $vids {
            let value = get_property_value(vid, $props_map, $prop_name);
            // Extraction failure (wrong type) is treated the same as absence.
            match value.as_ref().and_then($extractor) {
                Some(raw) => builder.append_value($cast(raw)),
                None => builder.append_null(),
            }
        }
        Ok(Arc::new(builder.finish()) as ArrayRef)
    }};
}
2341
/// Builds an Arrow column of `data_type` for `prop_name` across `vids`,
/// reading values from `props_map`.
///
/// One entry is produced per vid, in order. Missing properties, explicit
/// nulls, and values that cannot be coerced to `data_type` become nulls
/// (except the catch-all arms noted below, which stringify). The supported
/// target types mirror what the schema's `to_arrow()` mapping can produce.
pub(crate) fn build_property_column_static(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    data_type: &DataType,
) -> DFResult<ArrayRef> {
    match data_type {
        DataType::LargeBinary => {
            use arrow_array::builder::LargeBinaryBuilder;
            let mut builder = LargeBinaryBuilder::new();

            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Null) | None => builder.append_null(),
                    // Raw bytes pass through untouched.
                    Some(Value::Bytes(bytes)) => {
                        builder.append_value(&bytes);
                    }
                    // A list of unsigned ints may actually be an
                    // already-encoded codec blob that round-tripped through
                    // JSON; probe by decoding before re-encoding.
                    Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
                        // NOTE(review): `n as u8` truncates values > 255;
                        // the guard only checks as_u64().is_some(), not the
                        // range — confirm lists here are byte-valued.
                        let bytes: Vec<u8> = arr
                            .iter()
                            .filter_map(|v| v.as_u64().map(|n| n as u8))
                            .collect();
                        if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
                            builder.append_value(&bytes);
                        } else {
                            // Not a valid blob: encode the list as a value.
                            let json_val: serde_json::Value = Value::List(arr).into();
                            match encode_cypher_value(&json_val) {
                                Ok(encoded) => builder.append_value(encoded),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                    // Everything else is codec-encoded via JSON.
                    Some(val) => {
                        let json_val: serde_json::Value = val.into();
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Binary => {
            // Binary columns hold CRDT payloads: the stored value is
            // deserialized as a Crdt (via JSON) and re-serialized to
            // msgpack; anything that fails either step becomes null.
            let mut builder = BinaryBuilder::new();
            for vid in vids {
                let bytes = get_property_value(vid, props_map, prop_name)
                    .filter(|v| !v.is_null())
                    .and_then(|v| {
                        let json_val: serde_json::Value = v.into();
                        serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
                    })
                    .and_then(|crdt| crdt.to_msgpack().ok());
                match bytes {
                    Some(b) => builder.append_value(&b),
                    None => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Utf8 => {
            let mut builder = StringBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::String(s)) => builder.append_value(s),
                    Some(Value::Null) | None => builder.append_null(),
                    // Non-string values are stringified, not nulled.
                    Some(other) => builder.append_value(other.to_string()),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Int64 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Int64Builder,
                |v: &Value| v.as_i64(),
                |v| v
            )
        }
        DataType::Int32 => {
            // NOTE(review): `as i32` wraps for out-of-range i64s.
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Int32Builder,
                |v: &Value| v.as_i64(),
                |v: i64| v as i32
            )
        }
        DataType::Float64 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Float64Builder,
                |v: &Value| v.as_f64(),
                |v| v
            )
        }
        DataType::Float32 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Float32Builder,
                |v: &Value| v.as_f64(),
                |v: f64| v as f32
            )
        }
        DataType::Boolean => {
            let mut builder = BooleanBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Bool(b)) => builder.append_value(b),
                    // No coercion for booleans: non-bool values become null.
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::UInt64 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                UInt64Builder,
                |v: &Value| v.as_u64(),
                |v| v
            )
        }
        // Fixed-size float vectors (embeddings).
        DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
            let values_builder = Float32Builder::new();
            let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Vector(v)) => {
                        // NOTE(review): no length check against `dim` here —
                        // presumably upstream guarantees it; a mismatched
                        // vector would corrupt the fixed-size layout.
                        for val in v {
                            list_builder.values().append_value(val);
                        }
                        list_builder.append(true);
                    }
                    // Generic lists are coerced element-wise; non-numeric
                    // elements degrade to 0.0.
                    Some(Value::List(arr)) => {
                        for v in arr {
                            list_builder
                                .values()
                                .append_value(v.as_f64().unwrap_or(0.0) as f32);
                        }
                        list_builder.append(true);
                    }
                    _ => {
                        // Fixed-size lists still need `dim` child slots even
                        // for a null entry.
                        for _ in 0..*dim {
                            list_builder.values().append_null();
                        }
                        list_builder.append(false);
                    }
                }
            }
            Ok(Arc::new(list_builder.finish()))
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(tv)) => match tv {
                        uni_common::TemporalValue::DateTime {
                            nanos_since_epoch, ..
                        }
                        | uni_common::TemporalValue::LocalDateTime {
                            nanos_since_epoch, ..
                        } => {
                            builder.append_value(nanos_since_epoch);
                        }
                        // Dates become midnight UTC (86_400e9 ns per day).
                        uni_common::TemporalValue::Date { days_since_epoch } => {
                            builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
                        }
                        _ => builder.append_null(),
                    },
                    Some(Value::String(s)) => match parse_datetime_utc(&s) {
                        Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
                        Err(_) => builder.append_null(),
                    },
                    // Bare ints are taken as nanoseconds since epoch.
                    Some(Value::Int(n)) => {
                        builder.append_value(n);
                    }
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Date32 => {
            let mut builder = Date32Builder::new();
            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
                        builder.append_value(days_since_epoch);
                    }
                    Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
                        Ok(d) => builder.append_value((d - epoch).num_days() as i32),
                        Err(_) => builder.append_null(),
                    },
                    // Bare ints are taken as days since epoch.
                    Some(Value::Int(n)) => {
                        builder.append_value(n as i32);
                    }
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Time64(TimeUnit::Nanosecond) => {
            let mut builder = Time64NanosecondBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(
                        uni_common::TemporalValue::LocalTime {
                            nanos_since_midnight,
                        }
                        | uni_common::TemporalValue::Time {
                            nanos_since_midnight,
                            ..
                        },
                    )) => {
                        builder.append_value(nanos_since_midnight);
                    }
                    // Non-time temporals cannot be represented here.
                    Some(Value::Temporal(_)) => builder.append_null(),
                    Some(Value::String(s)) => {
                        // Accept "HH:MM:SS.fff" first, then plain "HH:MM:SS".
                        match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
                            .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
                        {
                            Ok(t) => {
                                let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
                                    + t.nanosecond() as i64;
                                builder.append_value(nanos);
                            }
                            Err(_) => builder.append_null(),
                        }
                    }
                    // Bare ints are taken as nanoseconds since midnight.
                    Some(Value::Int(n)) => {
                        builder.append_value(n);
                    }
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Interval(IntervalUnit::MonthDayNano) => {
            let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
                Vec::with_capacity(vids.len());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(uni_common::TemporalValue::Duration {
                        months,
                        days,
                        nanos,
                    })) => {
                        values.push(Some(arrow::datatypes::IntervalMonthDayNano {
                            months: months as i32,
                            days: days as i32,
                            nanoseconds: nanos,
                        }));
                    }
                    // Bare ints carry no month/day split, so they are nulled.
                    Some(Value::Int(_n)) => {
                        values.push(None);
                    }
                    _ => values.push(None),
                }
            }
            let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
            Ok(Arc::new(arr))
        }
        // Nested types delegate to dedicated builders (defined below).
        DataType::List(inner_field) => {
            build_list_property_column(vids, props_map, prop_name, inner_field)
        }
        DataType::Struct(fields) => {
            build_struct_property_column(vids, props_map, prop_name, fields)
        }
        _ => {
            // Catch-all: render any remaining type as its string form.
            let mut builder = StringBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Null) | None => builder.append_null(),
                    Some(other) => builder.append_value(other.to_string()),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
    }
}
2641
2642fn build_list_property_column(
2644 vids: &[Vid],
2645 props_map: &HashMap<Vid, Properties>,
2646 prop_name: &str,
2647 inner_field: &Arc<Field>,
2648) -> DFResult<ArrayRef> {
2649 match inner_field.data_type() {
2650 DataType::Utf8 => {
2651 let mut builder = ListBuilder::new(StringBuilder::new());
2652 for vid in vids {
2653 match get_property_value(vid, props_map, prop_name) {
2654 Some(Value::List(arr)) => {
2655 for v in arr {
2656 match v {
2657 Value::String(s) => builder.values().append_value(s),
2658 Value::Null => builder.values().append_null(),
2659 other => builder.values().append_value(format!("{other:?}")),
2660 }
2661 }
2662 builder.append(true);
2663 }
2664 _ => builder.append(false),
2665 }
2666 }
2667 Ok(Arc::new(builder.finish()))
2668 }
2669 DataType::Int64 => {
2670 let mut builder = ListBuilder::new(Int64Builder::new());
2671 for vid in vids {
2672 match get_property_value(vid, props_map, prop_name) {
2673 Some(Value::List(arr)) => {
2674 for v in arr {
2675 match v.as_i64() {
2676 Some(n) => builder.values().append_value(n),
2677 None => builder.values().append_null(),
2678 }
2679 }
2680 builder.append(true);
2681 }
2682 _ => builder.append(false),
2683 }
2684 }
2685 Ok(Arc::new(builder.finish()))
2686 }
2687 DataType::Float64 => {
2688 let mut builder = ListBuilder::new(Float64Builder::new());
2689 for vid in vids {
2690 match get_property_value(vid, props_map, prop_name) {
2691 Some(Value::List(arr)) => {
2692 for v in arr {
2693 match v.as_f64() {
2694 Some(n) => builder.values().append_value(n),
2695 None => builder.values().append_null(),
2696 }
2697 }
2698 builder.append(true);
2699 }
2700 _ => builder.append(false),
2701 }
2702 }
2703 Ok(Arc::new(builder.finish()))
2704 }
2705 DataType::Boolean => {
2706 let mut builder = ListBuilder::new(BooleanBuilder::new());
2707 for vid in vids {
2708 match get_property_value(vid, props_map, prop_name) {
2709 Some(Value::List(arr)) => {
2710 for v in arr {
2711 match v.as_bool() {
2712 Some(b) => builder.values().append_value(b),
2713 None => builder.values().append_null(),
2714 }
2715 }
2716 builder.append(true);
2717 }
2718 _ => builder.append(false),
2719 }
2720 }
2721 Ok(Arc::new(builder.finish()))
2722 }
2723 DataType::Struct(fields) => {
2724 build_list_of_structs_column(vids, props_map, prop_name, fields)
2726 }
2727 _ => {
2729 let mut builder = ListBuilder::new(StringBuilder::new());
2730 for vid in vids {
2731 match get_property_value(vid, props_map, prop_name) {
2732 Some(Value::List(arr)) => {
2733 for v in arr {
2734 match v {
2735 Value::Null => builder.values().append_null(),
2736 other => builder.values().append_value(format!("{other:?}")),
2737 }
2738 }
2739 builder.append(true);
2740 }
2741 _ => builder.append(false),
2742 }
2743 }
2744 Ok(Arc::new(builder.finish()))
2745 }
2746 }
2747}
2748
/// Builds an Arrow `List<Struct>` column for property `prop_name` across `vids`.
///
/// Accepted shapes per row:
/// - `Value::List` containing `Value::Map` elements → one struct per map;
///   non-map elements inside the list are silently dropped,
/// - a bare `Value::Map` → exploded into `{key, value}` structs, one per entry,
/// - anything else (including a list with no map elements) → a null list entry.
fn build_list_of_structs_column(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    fields: &Fields,
) -> DFResult<ArrayRef> {
    use arrow_array::StructArray;

    // Raw property value per vid, in output row order.
    let values: Vec<Option<Value>> = vids
        .iter()
        .map(|vid| get_property_value(vid, props_map, prop_name))
        .collect();

    // Normalize each row to an optional list of field maps (None => null row).
    let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
        .iter()
        .map(|val| match val {
            Some(Value::List(arr)) => {
                // Keep only the map elements of the list.
                let objs: Vec<HashMap<String, Value>> = arr
                    .iter()
                    .filter_map(|v| {
                        if let Value::Map(m) = v {
                            Some(m.clone())
                        } else {
                            None
                        }
                    })
                    .collect();
                // A list with no map elements is treated as null, not as an
                // empty list.
                if objs.is_empty() { None } else { Some(objs) }
            }
            Some(Value::Map(obj)) => {
                // A bare map becomes a list of {key, value} structs.
                // NOTE(review): assumes `fields` declares "key"/"value" fields
                // in this case — entries under other names are ignored below.
                let kv_pairs: Vec<HashMap<String, Value>> = obj
                    .iter()
                    .map(|(k, v)| {
                        let mut m = HashMap::new();
                        m.insert("key".to_string(), Value::String(k.clone()));
                        m.insert("value".to_string(), v.clone());
                        m
                    })
                    .collect();
                Some(kv_pairs)
            }
            _ => None,
        })
        .collect();

    // Total number of struct elements across all non-null rows; used to size
    // the child builders.
    let total_items: usize = rows
        .iter()
        .filter_map(|r| r.as_ref())
        .map(|v| v.len())
        .sum();

    // One flat child array per struct field. Elements are visited in row order
    // (`rows.iter().flatten().flatten()`), which must match the offsets
    // computed further down.
    let child_arrays: Vec<ArrayRef> = fields
        .iter()
        .map(|field| {
            let field_name = field.name();
            match field.data_type() {
                DataType::Utf8 => {
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::String(s)) => builder.append_value(s),
                            Some(Value::Null) | None => builder.append_null(),
                            // Non-string values are Debug-formatted, not dropped.
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int64 => {
                    let mut builder = Int64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_i64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Float64 => {
                    let mut builder = Float64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_f64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                _ => {
                    // Fallback: stringify any other field type.
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
            }
        })
        .collect();

    // Children are dense: struct-level validity is expressed by the outer
    // list's null buffer instead, so no struct null buffer here.
    let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
        .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    // Build the i32 list offsets and per-row validity from the normalized rows.
    let mut offsets = Vec::with_capacity(vids.len() + 1);
    let mut nulls = Vec::with_capacity(vids.len());
    let mut offset = 0i32;
    offsets.push(offset);
    for row in &rows {
        match row {
            Some(objs) => {
                offset += objs.len() as i32;
                offsets.push(offset);
                nulls.push(true);
            }
            None => {
                // Null rows get a zero-length slot (offset repeated).
                offsets.push(offset);
                nulls.push(false);
            }
        }
    }

    let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
    let list_array = arrow_array::ListArray::try_new(
        list_field,
        arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
        Arc::new(struct_array),
        Some(arrow::buffer::NullBuffer::from(nulls)),
    )
    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    Ok(Arc::new(list_array))
}
2894
2895fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
2898 use uni_common::value::TemporalValue;
2899 let mut m = HashMap::new();
2900 match tv {
2901 TemporalValue::DateTime {
2902 nanos_since_epoch,
2903 offset_seconds,
2904 timezone_name,
2905 } => {
2906 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2907 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2908 if let Some(tz) = timezone_name {
2909 m.insert("timezone_name".into(), Value::String(tz.clone()));
2910 }
2911 }
2912 TemporalValue::LocalDateTime { nanos_since_epoch } => {
2913 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2914 }
2915 TemporalValue::Time {
2916 nanos_since_midnight,
2917 offset_seconds,
2918 } => {
2919 m.insert(
2920 "nanos_since_midnight".into(),
2921 Value::Int(*nanos_since_midnight),
2922 );
2923 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2924 }
2925 TemporalValue::LocalTime {
2926 nanos_since_midnight,
2927 } => {
2928 m.insert(
2929 "nanos_since_midnight".into(),
2930 Value::Int(*nanos_since_midnight),
2931 );
2932 }
2933 TemporalValue::Date { days_since_epoch } => {
2934 m.insert(
2935 "days_since_epoch".into(),
2936 Value::Int(*days_since_epoch as i64),
2937 );
2938 }
2939 TemporalValue::Duration {
2940 months,
2941 days,
2942 nanos,
2943 } => {
2944 m.insert("months".into(), Value::Int(*months));
2945 m.insert("days".into(), Value::Int(*days));
2946 m.insert("nanos".into(), Value::Int(*nanos));
2947 }
2948 }
2949 m
2950}
2951
2952fn build_struct_property_column(
2954 vids: &[Vid],
2955 props_map: &HashMap<Vid, Properties>,
2956 prop_name: &str,
2957 fields: &Fields,
2958) -> DFResult<ArrayRef> {
2959 use arrow_array::StructArray;
2960
2961 let values: Vec<Option<Value>> = vids
2964 .iter()
2965 .map(|vid| {
2966 let val = get_property_value(vid, props_map, prop_name);
2967 match val {
2968 Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
2969 other => other,
2970 }
2971 })
2972 .collect();
2973
2974 let child_arrays: Vec<ArrayRef> = fields
2975 .iter()
2976 .map(|field| {
2977 let field_name = field.name();
2978 match field.data_type() {
2979 DataType::Float64 => {
2980 let mut builder = Float64Builder::with_capacity(vids.len());
2981 for val in &values {
2982 match val {
2983 Some(Value::Map(obj)) => {
2984 match obj.get(field_name).and_then(|v| v.as_f64()) {
2985 Some(n) => builder.append_value(n),
2986 None => builder.append_null(),
2987 }
2988 }
2989 _ => builder.append_null(),
2990 }
2991 }
2992 Arc::new(builder.finish()) as ArrayRef
2993 }
2994 DataType::Utf8 => {
2995 let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
2996 for val in &values {
2997 match val {
2998 Some(Value::Map(obj)) => match obj.get(field_name) {
2999 Some(Value::String(s)) => builder.append_value(s),
3000 Some(Value::Null) | None => builder.append_null(),
3001 Some(other) => builder.append_value(format!("{other:?}")),
3002 },
3003 _ => builder.append_null(),
3004 }
3005 }
3006 Arc::new(builder.finish()) as ArrayRef
3007 }
3008 DataType::Int64 => {
3009 let mut builder = Int64Builder::with_capacity(vids.len());
3010 for val in &values {
3011 match val {
3012 Some(Value::Map(obj)) => {
3013 match obj.get(field_name).and_then(|v| v.as_i64()) {
3014 Some(n) => builder.append_value(n),
3015 None => builder.append_null(),
3016 }
3017 }
3018 _ => builder.append_null(),
3019 }
3020 }
3021 Arc::new(builder.finish()) as ArrayRef
3022 }
3023 DataType::Timestamp(_, _) => {
3024 let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
3025 for val in &values {
3026 match val {
3027 Some(Value::Map(obj)) => {
3028 match obj.get(field_name).and_then(|v| v.as_i64()) {
3029 Some(n) => builder.append_value(n),
3030 None => builder.append_null(),
3031 }
3032 }
3033 _ => builder.append_null(),
3034 }
3035 }
3036 Arc::new(builder.finish()) as ArrayRef
3037 }
3038 DataType::Int32 => {
3039 let mut builder = Int32Builder::with_capacity(vids.len());
3040 for val in &values {
3041 match val {
3042 Some(Value::Map(obj)) => {
3043 match obj.get(field_name).and_then(|v| v.as_i64()) {
3044 Some(n) => builder.append_value(n as i32),
3045 None => builder.append_null(),
3046 }
3047 }
3048 _ => builder.append_null(),
3049 }
3050 }
3051 Arc::new(builder.finish()) as ArrayRef
3052 }
3053 DataType::Time64(_) => {
3054 let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
3055 for val in &values {
3056 match val {
3057 Some(Value::Map(obj)) => {
3058 match obj.get(field_name).and_then(|v| v.as_i64()) {
3059 Some(n) => builder.append_value(n),
3060 None => builder.append_null(),
3061 }
3062 }
3063 _ => builder.append_null(),
3064 }
3065 }
3066 Arc::new(builder.finish()) as ArrayRef
3067 }
3068 _ => {
3070 let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3071 for val in &values {
3072 match val {
3073 Some(Value::Map(obj)) => match obj.get(field_name) {
3074 Some(Value::Null) | None => builder.append_null(),
3075 Some(other) => builder.append_value(format!("{other:?}")),
3076 },
3077 _ => builder.append_null(),
3078 }
3079 }
3080 Arc::new(builder.finish()) as ArrayRef
3081 }
3082 }
3083 })
3084 .collect();
3085
3086 let nulls: Vec<bool> = values
3088 .iter()
3089 .map(|v| matches!(v, Some(Value::Map(_))))
3090 .collect();
3091
3092 let struct_array = StructArray::try_new(
3093 fields.clone(),
3094 child_arrays,
3095 Some(arrow::buffer::NullBuffer::from(nulls)),
3096 )
3097 .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3098
3099 Ok(Arc::new(struct_array))
3100}
3101
impl Stream for GraphScanStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the scan as a three-state machine: `Init` builds the async scan
    /// future, `Executing` polls it, `Done` terminates the stream. The whole
    /// scan materializes as at most one `RecordBatch`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take the state by value so the future inside `Executing` can be
            // moved/polled; `Done` is the placeholder and is overwritten on
            // every path before returning.
            let state = std::mem::replace(&mut self.state, GraphScanState::Done);

            match state {
                GraphScanState::Init => {
                    // Clone everything the future needs so it does not borrow
                    // from `self`.
                    let graph_ctx = self.graph_ctx.clone();
                    let label = self.label.clone();
                    let variable = self.variable.clone();
                    let properties = self.properties.clone();
                    let is_edge_scan = self.is_edge_scan;
                    let is_schemaless = self.is_schemaless;
                    let schema = self.schema.clone();

                    let fut = async move {
                        // Abort early if the query deadline has already passed.
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;

                        // Dispatch to the scan flavor selected at plan time.
                        let batch = if is_edge_scan {
                            columnar_scan_edge_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else if is_schemaless {
                            columnar_scan_schemaless_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else {
                            columnar_scan_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        };
                        Ok(Some(batch))
                    };

                    // Loop around to poll the freshly created future.
                    self.state = GraphScanState::Executing(Box::pin(fut));
                }
                GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        // Single-shot: after yielding the batch (or None for an
                        // empty scan) the stream is exhausted.
                        self.state = GraphScanState::Done;
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        // Surface the error once, then terminate.
                        self.state = GraphScanState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so the next poll resumes it.
                        self.state = GraphScanState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                GraphScanState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
3183
3184impl RecordBatchStream for GraphScanStream {
3185 fn schema(&self) -> SchemaRef {
3186 self.schema.clone()
3187 }
3188}
3189
#[cfg(test)]
mod tests {
    use super::*;

    // Vertex scan schema layout: `_vid`, `_labels`, then projected properties,
    // all prefixed with the variable name.
    #[test]
    fn test_build_vertex_schema() {
        let uni_schema = UniSchema::default();
        let schema = GraphScanExec::build_vertex_schema(
            "n",
            "Person",
            &["name".to_string(), "age".to_string()],
            &uni_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(3).name(), "n.age");
    }

    // Edge scan schema layout: `_eid`, `_src_vid`, `_dst_vid`, then properties.
    #[test]
    fn test_build_edge_schema() {
        let uni_schema = UniSchema::default();
        let schema =
            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "r._eid");
        assert_eq!(schema.field(1).name(), "r._src_vid");
        assert_eq!(schema.field(2).name(), "r._dst_vid");
        assert_eq!(schema.field(3).name(), "r.weight");
    }

    // Schemaless properties have no declared type, so they surface as
    // LargeBinary-encoded values.
    #[test]
    fn test_build_schemaless_vertex_schema() {
        let schema = GraphScanExec::build_schemaless_vertex_schema(
            "n",
            &["name".to_string(), "age".to_string()],
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(schema.field(3).name(), "n.age");
        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
    }

    // A schemaless all-vertex scan with no projected properties still carries
    // the two identity columns.
    #[test]
    fn test_schemaless_all_scan_has_empty_label() {
        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[]);

        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
    }

    // Round-trips JSON through the cypher-value codec: objects decode to maps,
    // scalars decode to the corresponding scalar value.
    #[test]
    fn test_cypher_value_all_props_extraction() {
        let json_obj = serde_json::json!({"age": 30, "name": "Alice"});
        let cv_bytes = encode_cypher_value(&json_obj).unwrap();

        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
        match decoded {
            uni_common::Value::Map(map) => {
                let age_val = map.get("age").unwrap();
                assert_eq!(age_val, &uni_common::Value::Int(30));
            }
            _ => panic!("Expected Map"),
        }

        let single_val = serde_json::json!(30);
        let single_bytes = encode_cypher_value(&single_val).unwrap();
        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
        assert_eq!(single_decoded, uni_common::Value::Int(30));
    }

    // Builds an MVCC-shaped batch (`_vid`, `_deleted`, `_version`, `name`),
    // where each name encodes its (vid, version) pair for easy assertions.
    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let names: Vec<String> = vids
            .iter()
            .zip(versions.iter())
            .map(|(v, ver)| format!("v{}_ver{}", v, ver))
            .collect();
        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vids.to_vec())),
                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
                Arc::new(UInt64Array::from(versions.to_vec())),
                Arc::new(name_arr),
            ],
        )
        .unwrap()
    }

    // Dedup keeps only the highest version per vid, including the whole row's
    // payload columns.
    #[test]
    fn test_mvcc_dedup_multiple_versions() {
        let batch = make_mvcc_batch(
            &[1, 1, 1, 2, 2],
            &[3, 1, 5, 2, 4],
            &[false, false, false, false, false],
        );

        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let ver_col = result
            .column_by_name("_version")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let name_col = result
            .column_by_name("name")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();

        // vid 1 should survive with its max version (5) and matching payload.
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(ver_col.value(0), 5);
        assert_eq!(name_col.value(0), "v1_ver5");

        assert_eq!(vid_col.value(1), 2);
        assert_eq!(ver_col.value(1), 4);
        assert_eq!(name_col.value(1), "v2_ver4");
    }

    // One version per vid: dedup is a no-op.
    #[test]
    fn test_mvcc_dedup_single_rows() {
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    // Empty input must not panic and yields an empty batch.
    #[test]
    fn test_mvcc_dedup_empty() {
        let batch = make_mvcc_batch(&[], &[], &[]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    // Rows whose vid appears in the L0 vertex tombstone set are dropped.
    #[test]
    fn test_filter_l0_tombstones_removes_tombstoned() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);

        let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
        {
        }
        let l0_buf = std::sync::Arc::new(parking_lot::RwLock::new(l0));
        // Tombstone vid 2 only.
        l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));

        let l0_ctx = L0Context {
            current_l0: Some(l0_buf),
            transaction_l0: None,
            pending_flush_l0s: vec![],
        };

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(vid_col.value(1), 3);
    }

    // No L0 buffers at all: the batch passes through untouched.
    #[test]
    fn test_filter_l0_tombstones_none() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let l0_ctx = L0Context::default();

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    // Maps a storage-layer batch (with MVCC bookkeeping columns) onto the
    // variable-prefixed output schema, injecting the synthesized `_labels`
    // column and dropping `_deleted`/`_version`.
    #[test]
    fn test_map_to_output_schema_basic() {
        use crate::query::df_graph::L0Context;

        let lance_schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let name_arr: arrow_array::StringArray =
            vec![Some("Alice"), Some("Bob")].into_iter().collect();
        let batch = RecordBatch::try_new(
            lance_schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2])),
                Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
                Arc::new(UInt64Array::from(vec![1u64, 1])),
                Arc::new(name_arr),
            ],
        )
        .unwrap();

        let output_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n._labels", labels_data_type(), true),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        let l0_ctx = L0Context::default();
        let result = map_to_output_schema(
            &batch,
            "Person",
            "n",
            &["name".to_string()],
            &output_schema,
            &l0_ctx,
        )
        .unwrap();

        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.schema().fields().len(), 3);
        assert_eq!(result.schema().field(0).name(), "n._vid");
        assert_eq!(result.schema().field(1).name(), "n._labels");
        assert_eq!(result.schema().field(2).name(), "n.name");

        let name_col = result
            .column(2)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
    }
}