use crate::query::datetime::parse_datetime_utc;
use crate::query::df_graph::GraphExecutionContext;
use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
use arrow_array::builder::{
    BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
    Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
    Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
};
use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
use chrono::{NaiveDate, NaiveTime, Timelike};
use datafusion::common::Result as DFResult;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::Stream;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use uni_common::Properties;
use uni_common::Value;
use uni_common::core::id::Vid;
use uni_common::core::schema::Schema as UniSchema;

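/// Leaf `ExecutionPlan` that scans one vertex label or edge type out of graph
/// storage, overlaying unflushed L0 write-buffer state on top of the
/// persisted (Lance) data.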
pub struct GraphScanExec {
    graph_ctx: Arc<GraphExecutionContext>,

    label: String,

    variable: String,

    projected_properties: Vec<String>,

    filter: Option<Arc<dyn PhysicalExpr>>,

    is_edge_scan: bool,

    is_schemaless: bool,

    schema: SchemaRef,

    properties: PlanProperties,

    metrics: ExecutionPlanMetricsSet,
}

impl fmt::Debug for GraphScanExec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("GraphScanExec")
            .field("label", &self.label)
            .field("variable", &self.variable)
            .field("projected_properties", &self.projected_properties)
            .field("is_edge_scan", &self.is_edge_scan)
            .finish()
    }
}

impl GraphScanExec {
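    /// Builds a scan over vertices of a single, schema-known label; e.g. a
    /// `MATCH (n:Person)` projecting `n.name` would (roughly) correspond to
    /// `GraphScanExec::new_vertex_scan(ctx, "Person", "n", vec!["name".into()], None)`.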
    pub fn new_vertex_scan(
        graph_ctx: Arc<GraphExecutionContext>,
        label: impl Into<String>,
        variable: impl Into<String>,
        projected_properties: Vec<String>,
        filter: Option<Arc<dyn PhysicalExpr>>,
    ) -> Self {
        let label = label.into();
        let variable = variable.into();

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let schema =
            Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);

        let properties = compute_plan_properties(schema.clone());

        Self {
            graph_ctx,
            label,
            variable,
            projected_properties,
            filter,
            is_edge_scan: false,
            is_schemaless: false,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

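    /// Builds a scan over vertices of one label on the schemaless (main)
    /// vertex table. `_vid` and `_labels` are always emitted implicitly, so
    /// they are stripped from the projection list.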
    pub fn new_schemaless_vertex_scan(
        graph_ctx: Arc<GraphExecutionContext>,
        label_name: impl Into<String>,
        variable: impl Into<String>,
        projected_properties: Vec<String>,
        filter: Option<Arc<dyn PhysicalExpr>>,
    ) -> Self {
        let label = label_name.into();
        let variable = variable.into();

        let projected_properties: Vec<String> = projected_properties
            .into_iter()
            .filter(|p| p != "_vid" && p != "_labels")
            .collect();

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let schema =
            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
        let properties = compute_plan_properties(schema.clone());

        Self {
            graph_ctx,
            label,
            variable,
            projected_properties,
            filter,
            is_edge_scan: false,
            is_schemaless: true,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

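    /// Builds a schemaless scan over vertices that carry *all* of the given
    /// labels; the label set is encoded into `self.label` joined by `:`.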
    pub fn new_multi_label_vertex_scan(
        graph_ctx: Arc<GraphExecutionContext>,
        labels: Vec<String>,
        variable: impl Into<String>,
        projected_properties: Vec<String>,
        filter: Option<Arc<dyn PhysicalExpr>>,
    ) -> Self {
        let variable = variable.into();
        let projected_properties: Vec<String> = projected_properties
            .into_iter()
            .filter(|p| p != "_vid" && p != "_labels")
            .collect();
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let schema =
            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
        let properties = compute_plan_properties(schema.clone());

        let encoded_labels = labels.join(":");

        Self {
            graph_ctx,
            label: encoded_labels,
            variable,
            projected_properties,
            filter,
            is_edge_scan: false,
            is_schemaless: true,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

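    /// Builds a schemaless scan over every vertex regardless of label
    /// (`self.label` is left empty).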
    pub fn new_schemaless_all_scan(
        graph_ctx: Arc<GraphExecutionContext>,
        variable: impl Into<String>,
        projected_properties: Vec<String>,
        filter: Option<Arc<dyn PhysicalExpr>>,
    ) -> Self {
        let variable = variable.into();
        let projected_properties: Vec<String> = projected_properties
            .into_iter()
            .filter(|p| p != "_vid" && p != "_labels")
            .collect();

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let schema =
            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
        let properties = compute_plan_properties(schema.clone());

        Self {
            graph_ctx,
            label: String::new(),
            variable,
            projected_properties,
            filter,
            is_edge_scan: false,
            is_schemaless: true,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

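    /// Derives the output schema for a schemaless scan by merging the
    /// property metadata of every label, falling back to `LargeBinary` for
    /// properties the schema does not know about.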
    fn build_schemaless_vertex_schema(
        variable: &str,
        properties: &[String],
        uni_schema: &UniSchema,
    ) -> SchemaRef {
        let mut merged: HashMap<&str, &uni_common::core::schema::PropertyMeta> = HashMap::new();
        for label_props in uni_schema.properties.values() {
            for (name, meta) in label_props {
                merged.entry(name.as_str()).or_insert(meta);
            }
        }

        let mut fields = vec![
            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
            Field::new(format!("{}._labels", variable), labels_data_type(), true),
        ];

        for prop in properties {
            let col_name = format!("{}.{}", variable, prop);
            let arrow_type = merged
                .get(prop.as_str())
                .map(|meta| meta.r#type.to_arrow())
                .unwrap_or(DataType::LargeBinary);
            fields.push(Field::new(&col_name, arrow_type, true));
        }

        Arc::new(Schema::new(fields))
    }

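    /// Builds a scan over edges of a single edge type.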
    pub fn new_edge_scan(
        graph_ctx: Arc<GraphExecutionContext>,
        edge_type: impl Into<String>,
        variable: impl Into<String>,
        projected_properties: Vec<String>,
        filter: Option<Arc<dyn PhysicalExpr>>,
    ) -> Self {
        let label = edge_type.into();
        let variable = variable.into();

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);

        let properties = compute_plan_properties(schema.clone());

        Self {
            graph_ctx,
            label,
            variable,
            projected_properties,
            filter,
            is_edge_scan: true,
            is_schemaless: false,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

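    /// Derives the output schema for a single-label vertex scan:
    /// `<var>._vid`, `<var>._labels`, then one column per projected property.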
    fn build_vertex_schema(
        variable: &str,
        label: &str,
        properties: &[String],
        uni_schema: &UniSchema,
    ) -> SchemaRef {
        let mut fields = vec![
            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
            Field::new(format!("{}._labels", variable), labels_data_type(), true),
        ];
        let label_props = uni_schema.properties.get(label);
        for prop in properties {
            let col_name = format!("{}.{}", variable, prop);
            let arrow_type = resolve_property_type(prop, label_props);
            fields.push(Field::new(&col_name, arrow_type, true));
        }
        Arc::new(Schema::new(fields))
    }

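    /// Derives the output schema for an edge scan: `<var>._eid`,
    /// `<var>._src_vid`, `<var>._dst_vid`, then the projected properties.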
    fn build_edge_schema(
        variable: &str,
        edge_type: &str,
        properties: &[String],
        uni_schema: &UniSchema,
    ) -> SchemaRef {
        let mut fields = vec![
            Field::new(format!("{}._eid", variable), DataType::UInt64, false),
            Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
            Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
        ];
        let edge_props = uni_schema.properties.get(edge_type);
        for prop in properties {
            let col_name = format!("{}.{}", variable, prop);
            let arrow_type = resolve_property_type(prop, edge_props);
            fields.push(Field::new(&col_name, arrow_type, true));
        }
        Arc::new(Schema::new(fields))
    }
}

impl DisplayAs for GraphScanExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
        write!(
            f,
            "GraphScanExec: {}={}, properties={:?}",
            scan_type, self.label, self.projected_properties
        )?;
        if self.filter.is_some() {
            write!(f, ", filter=<pushed>")?;
        }
        Ok(())
    }
}

impl ExecutionPlan for GraphScanExec {
    fn name(&self) -> &str {
        "GraphScanExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.is_empty() {
            Ok(self)
        } else {
            Err(datafusion::error::DataFusionError::Plan(
                "GraphScanExec does not accept children".to_string(),
            ))
        }
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let metrics = BaselineMetrics::new(&self.metrics, partition);

        Ok(Box::pin(GraphScanStream::new(
            self.graph_ctx.clone(),
            self.label.clone(),
            self.variable.clone(),
            self.projected_properties.clone(),
            self.is_edge_scan,
            self.is_schemaless,
            self.schema.clone(),
            metrics,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}

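/// Driver state for `GraphScanStream`: not yet started, a scan future in
/// flight, or exhausted.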
enum GraphScanState {
    Init,
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    Done,
}

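/// Stream wrapper that drives the scan through `GraphScanState` as a single
/// async future producing an optional batch, exposed to DataFusion via the
/// `RecordBatchStream` interface (the `Stream` impl lives further down).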
struct GraphScanStream {
    graph_ctx: Arc<GraphExecutionContext>,

    label: String,

    variable: String,

    properties: Vec<String>,

    is_edge_scan: bool,

    is_schemaless: bool,

    schema: SchemaRef,

    state: GraphScanState,

    metrics: BaselineMetrics,
}

impl GraphScanStream {
    #[expect(clippy::too_many_arguments)]
    fn new(
        graph_ctx: Arc<GraphExecutionContext>,
        label: String,
        variable: String,
        properties: Vec<String>,
        is_edge_scan: bool,
        is_schemaless: bool,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        Self {
            graph_ctx,
            label,
            variable,
            properties,
            is_edge_scan,
            is_schemaless,
            schema,
            state: GraphScanState::Init,
            metrics,
        }
    }
}

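/// Maps a projected property to its Arrow type; `overflow_json` and any
/// property missing from the schema fall back to `LargeBinary`.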
pub(crate) fn resolve_property_type(
    prop: &str,
    schema_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DataType {
    if prop == "overflow_json" {
        DataType::LargeBinary
    } else {
        schema_props
            .and_then(|props| props.get(prop))
            .map(|meta| meta.r#type.to_arrow())
            .unwrap_or(DataType::LargeBinary)
    }
}

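/// Test-only convenience wrapper around [`mvcc_dedup_batch_by`] keyed on `_vid`.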
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    mvcc_dedup_batch_by(batch, "_vid")
}

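/// Dedups an optional batch, normalizing an empty result to `None`.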
fn mvcc_dedup_to_option(
    batch: Option<RecordBatch>,
    id_column: &str,
) -> DFResult<Option<RecordBatch>> {
    match batch {
        Some(b) => {
            let deduped = mvcc_dedup_batch_by(&b, id_column)?;
            Ok(if deduped.num_rows() > 0 {
                Some(deduped)
            } else {
                None
            })
        }
        None => Ok(None),
    }
}

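/// Concatenates the deduped Lance batch with the L0 overlay batch and
/// re-dedups by MVCC version so the newest copy of each row wins.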
fn merge_lance_and_l0(
    lance_deduped: Option<RecordBatch>,
    l0_batch: RecordBatch,
    internal_schema: &SchemaRef,
    id_column: &str,
) -> DFResult<Option<RecordBatch>> {
    let has_l0 = l0_batch.num_rows() > 0;
    match (lance_deduped, has_l0) {
        (Some(lance), true) => {
            let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
                .map_err(arrow_err)?;
            Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
        }
        (Some(lance), false) => Ok(Some(lance)),
        (None, true) => Ok(Some(l0_batch)),
        (None, false) => Ok(None),
    }
}

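/// Appends `col_name` to `columns` unless it is already present.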
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    if !columns.iter().any(|c| c == col_name) {
        columns.push(col_name.to_string());
    }
}

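/// Pulls the raw encoded bytes for one property out of a row's
/// `overflow_json` blob, if the row has one.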
fn extract_from_overflow_blob(
    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
    row: usize,
    prop: &str,
) -> Option<Vec<u8>> {
    let arr = overflow_arr?;
    if arr.is_null(row) {
        return None;
    }
    uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
}

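/// Builds a `LargeBinary` column for a property outside the fixed schema:
/// L0 values take precedence, then the `overflow_json` blob, else null.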
fn build_overflow_property_column(
    num_rows: usize,
    vid_arr: &UInt64Array,
    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
    prop: &str,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> ArrayRef {
    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
    for i in 0..num_rows {
        let vid = Vid::from(vid_arr.value(i));

        let l0_val = resolve_l0_property(&vid, prop, l0_ctx);

        if let Some(val_opt) = l0_val {
            append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
        } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
            builder.append_value(&bytes);
        } else {
            builder.append_null();
        }
    }
    Arc::new(builder.finish())
}

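/// Looks a property up across all L0 buffers, later buffers overriding
/// earlier ones; returns `None` if no buffer has a value for this vertex.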
fn resolve_l0_property(
    vid: &Vid,
    prop: &str,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> Option<Option<Value>> {
    let mut result = None;
    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        if let Some(props) = guard.vertex_properties.get(vid)
            && let Some(val) = props.get(prop)
        {
            result = Some(Some(val.clone()));
        }
    }
    result
}

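/// Appends a `Value` to a binary builder in the Cypher value encoding,
/// mapping nulls and encoding failures to a null entry.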
fn append_value_as_cypher_binary(
    builder: &mut arrow_array::builder::LargeBinaryBuilder,
    val: Option<&Value>,
) {
    match val {
        Some(v) if !v.is_null() => {
            let json_val: serde_json::Value = v.clone().into();
            match encode_cypher_value(&json_val) {
                Ok(bytes) => builder.append_value(bytes),
                Err(_) => builder.append_null(),
            }
        }
        _ => builder.append_null(),
    }
}

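/// Builds the `_all_props` column for schemaless scans: decodes the stored
/// `props_json` map per row, then overlays any L0 property writes.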
fn build_all_props_column_with_l0_overlay(
    num_rows: usize,
    vid_arr: &UInt64Array,
    props_arr: Option<&arrow_array::LargeBinaryArray>,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> ArrayRef {
    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
    for i in 0..num_rows {
        let vid = Vid::from(vid_arr.value(i));

        let mut merged_props = serde_json::Map::new();
        if let Some(arr) = props_arr
            && !arr.is_null(i)
            && let Ok(uni_common::Value::Map(map)) =
                uni_common::cypher_value_codec::decode(arr.value(i))
        {
            for (k, v) in map {
                let json_val: serde_json::Value = v.into();
                merged_props.insert(k, json_val);
            }
        }

        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
                for (k, v) in l0_props {
                    let json_val: serde_json::Value = v.clone().into();
                    merged_props.insert(k.clone(), json_val);
                }
            }
        }

        if merged_props.is_empty() {
            builder.append_null();
        } else {
            let json_obj = serde_json::Value::Object(merged_props);
            match encode_cypher_value(&json_obj) {
                Ok(bytes) => builder.append_value(bytes),
                Err(_) => builder.append_null(),
            }
        }
    }
    Arc::new(builder.finish())
}

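/// Builds the `_all_props` column for schema-backed scans by merging typed
/// schema columns, the `overflow_json` blob, and L0 overlays for each row.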
fn build_all_props_column_for_schema_scan(
    batch: &RecordBatch,
    vid_arr: &UInt64Array,
    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
    projected_properties: &[String],
    l0_ctx: &crate::query::df_graph::L0Context,
) -> ArrayRef {
    let schema_props: Vec<&str> = projected_properties
        .iter()
        .filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
        .map(String::as_str)
        .collect();

    let num_rows = batch.num_rows();
    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
    for i in 0..num_rows {
        let vid = Vid::from(vid_arr.value(i));
        let mut merged_props = serde_json::Map::new();

        for &prop in &schema_props {
            if let Some(col) = batch.column_by_name(prop) {
                let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
                if !val.is_null() {
                    let json_val: serde_json::Value = val.into();
                    merged_props.insert(prop.to_string(), json_val);
                }
            }
        }

        if let Some(arr) = overflow_arr
            && !arr.is_null(i)
            && let Ok(uni_common::Value::Map(map)) =
                uni_common::cypher_value_codec::decode(arr.value(i))
        {
            for (k, v) in map {
                let json_val: serde_json::Value = v.into();
                merged_props.insert(k, json_val);
            }
        }

        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
                for (k, v) in l0_props {
                    let json_val: serde_json::Value = v.clone().into();
                    merged_props.insert(k.clone(), json_val);
                }
            }
        }

        if merged_props.is_empty() {
            builder.append_null();
        } else {
            let json_obj = serde_json::Value::Object(merged_props);
            match encode_cypher_value(&json_obj) {
                Ok(bytes) => builder.append_value(bytes),
                Err(_) => builder.append_null(),
            }
        }
    }
    Arc::new(builder.finish())
}

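/// MVCC dedup: keeps only the highest `_version` row per `id_column` value.
/// Rows are lexsorted by (id asc, version desc) and the first row of each id
/// run is kept; e.g. [(vid 1, v2), (vid 1, v1), (vid 2, v1)] keeps
/// (vid 1, v2) and (vid 2, v1).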
fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(batch.clone());
    }

    let id_col = batch
        .column_by_name(id_column)
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
        })?
        .clone();
    let version_col = batch
        .column_by_name("_version")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
        })?
        .clone();

    let sort_columns = vec![
        arrow::compute::SortColumn {
            values: id_col,
            options: Some(arrow::compute::SortOptions {
                descending: false,
                nulls_first: false,
            }),
        },
        arrow::compute::SortColumn {
            values: version_col,
            options: Some(arrow::compute::SortOptions {
                descending: true,
                nulls_first: false,
            }),
        },
    ];
    let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;

    let sorted_columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
        .collect::<Result<_, _>>()
        .map_err(arrow_err)?;
    let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;

    let sorted_id = sorted
        .column_by_name(id_column)
        .unwrap()
        .as_any()
        .downcast_ref::<UInt64Array>()
        .unwrap();

    let mut keep = vec![false; sorted.num_rows()];
    if !keep.is_empty() {
        keep[0] = true;
        for (i, flag) in keep.iter_mut().enumerate().skip(1) {
            if sorted_id.value(i) != sorted_id.value(i - 1) {
                *flag = true;
            }
        }
    }

    let mask = arrow_array::BooleanArray::from(keep);
    arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
}

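/// Drops edge rows whose `op` is non-zero (treated as deletes here);
/// batches without an `op` column pass through unchanged.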
fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(batch.clone());
    }
    let op_col = match batch.column_by_name("op") {
        Some(col) => col
            .as_any()
            .downcast_ref::<arrow_array::UInt8Array>()
            .unwrap(),
        None => return Ok(batch.clone()),
    };
    let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
    let mask = arrow_array::BooleanArray::from(keep);
    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
}

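/// Drops vertex rows flagged by the `_deleted` column; batches without that
/// column pass through unchanged.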
fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(batch.clone());
    }
    let deleted_col = match batch.column_by_name("_deleted") {
        Some(col) => col
            .as_any()
            .downcast_ref::<arrow_array::BooleanArray>()
            .unwrap(),
        None => return Ok(batch.clone()),
    };
    let keep: Vec<bool> = (0..deleted_col.len())
        .map(|i| !deleted_col.value(i))
        .collect();
    let mask = arrow_array::BooleanArray::from(keep);
    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
}

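/// Drops vertex rows whose `_vid` appears in any L0 tombstone set.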
fn filter_l0_tombstones(
    batch: &RecordBatch,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(batch.clone());
    }

    let mut tombstones: HashSet<u64> = HashSet::new();
    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }
    }

    if tombstones.is_empty() {
        return Ok(batch.clone());
    }

    let vid_col = batch
        .column_by_name("_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
        })?
        .as_any()
        .downcast_ref::<UInt64Array>()
        .unwrap();

    let keep: Vec<bool> = (0..vid_col.len())
        .map(|i| !tombstones.contains(&vid_col.value(i)))
        .collect();
    let mask = arrow_array::BooleanArray::from(keep);
    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
}

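/// Drops edge rows whose `eid` appears in any L0 edge tombstone set.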
fn filter_l0_edge_tombstones(
    batch: &RecordBatch,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(batch.clone());
    }

    let mut tombstones: HashSet<u64> = HashSet::new();
    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        for eid in guard.tombstones.keys() {
            tombstones.insert(eid.as_u64());
        }
    }

    if tombstones.is_empty() {
        return Ok(batch.clone());
    }

    let eid_col = batch
        .column_by_name("eid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
        })?
        .as_any()
        .downcast_ref::<UInt64Array>()
        .unwrap();

    let keep: Vec<bool> = (0..eid_col.len())
        .map(|i| !tombstones.contains(&eid_col.value(i)))
        .collect();
    let mask = arrow_array::BooleanArray::from(keep);
    arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
}

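/// Materializes the unflushed L0 vertex writes for `label` as a batch with
/// the same layout as the Lance table, so the two can be concatenated.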
fn build_l0_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    lance_schema: &SchemaRef,
    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }
        for vid in guard.vids_for_label(label) {
            let vid_u64 = vid.as_u64();
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0));
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            if version > entry.1 {
                entry.1 = version;
            }
        }
    }

    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(lance_schema.clone()));
    }

    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());

    let schema_prop_names: HashSet<&str> = label_props
        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    for field in lance_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "_deleted" => {
                let vals = vec![false; num_rows];
                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _) = &vid_data[vid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        if k == "ext_id" || k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
}

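/// Builds one typed property column from L0 vertex data by adapting it to
/// the `Vid`-keyed map shape expected by [`build_property_column_static`].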
fn build_l0_property_column(
    vids: &[u64],
    vid_data: &HashMap<u64, (Properties, u64)>,
    prop_name: &str,
    data_type: &DataType,
) -> DFResult<ArrayRef> {
    let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
    let props_map: HashMap<Vid, Properties> = vid_data
        .iter()
        .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
        .collect();

    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
}

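/// Materializes the unflushed L0 edge writes for `edge_type` as a batch with
/// the same layout as the persisted edge table.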
fn build_l0_edge_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    edge_type: &str,
    internal_schema: &SchemaRef,
    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        for eid in guard.tombstones.keys() {
            tombstones.insert(eid.as_u64());
        }
        for eid in guard.eids_for_type(edge_type) {
            let eid_u64 = eid.as_u64();
            if tombstones.contains(&eid_u64) {
                continue;
            }
            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
                None => continue,
            };
            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
            let entry = eid_data
                .entry(eid_u64)
                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
            if let Some(props) = guard.edge_properties.get(&eid) {
                for (k, v) in props {
                    entry.2.insert(k.clone(), v.clone());
                }
            }
            entry.0 = src_vid;
            entry.1 = dst_vid;
            if version > entry.3 {
                entry.3 = version;
            }
        }
    }

    for t in &tombstones {
        eid_data.remove(t);
    }

    if eid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
    eids.sort_unstable();

    let num_rows = eids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    let schema_prop_names: HashSet<&str> = type_props
        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    for field in internal_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "eid" => {
                columns.push(Arc::new(UInt64Array::from(eids.clone())));
            }
            "src_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "dst_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "op" => {
                let vals = vec![0u8; num_rows];
                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for eid_u64 in &eids {
                    let (_, _, props, _) = &eid_data[eid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        if k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                let col =
                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}

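/// Edge-side counterpart of [`build_l0_property_column`]; edge ids are
/// wrapped in the `Vid` key type purely so the shared column builder can
/// index the map.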
fn build_l0_edge_property_column(
    eids: &[u64],
    eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
    prop_name: &str,
    data_type: &DataType,
) -> DFResult<ArrayRef> {
    let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
    let props_map: HashMap<Vid, Properties> = eid_data
        .iter()
        .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
        .collect();

    build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
}

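/// Builds the `_labels` list column for a known-label scan, merging stored
/// labels, the scanned label itself, and any labels added in L0.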
fn build_labels_column_for_known_label(
    vid_arr: &UInt64Array,
    label: &str,
    l0_ctx: &crate::query::df_graph::L0Context,
    batch_labels_col: Option<&arrow_array::ListArray>,
) -> DFResult<ArrayRef> {
    use uni_store::storage::arrow_convert::labels_from_list_array;

    let mut labels_builder = ListBuilder::new(StringBuilder::new());

    for i in 0..vid_arr.len() {
        let vid = Vid::from(vid_arr.value(i));

        let mut labels = match batch_labels_col {
            Some(list_arr) => {
                let stored = labels_from_list_array(list_arr, i);
                if stored.is_empty() {
                    vec![label.to_string()]
                } else {
                    stored
                }
            }
            None => vec![label.to_string()],
        };

        if !labels.iter().any(|l| l == label) {
            labels.push(label.to_string());
        }

        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
                for lbl in l0_labels {
                    if !labels.contains(lbl) {
                        labels.push(lbl.clone());
                    }
                }
            }
        }

        let values = labels_builder.values();
        for lbl in &labels {
            values.append_value(lbl);
        }
        labels_builder.append(true);
    }

    Ok(Arc::new(labels_builder.finish()))
}

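/// Projects an internal vertex batch into the scan's output schema, filling
/// non-schema properties from `overflow_json` and L0 overlays.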
fn map_to_output_schema(
    batch: &RecordBatch,
    label: &str,
    _variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    let vid_col = batch
        .column_by_name("_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
        })?
        .clone();
    let vid_arr = vid_col
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
        })?;

    let batch_labels_col = batch
        .column_by_name("_labels")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
    let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
    columns.push(vid_col.clone());
    columns.push(labels_col);

    let overflow_arr = batch
        .column_by_name("overflow_json")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

    for prop in projected_properties {
        if prop == "overflow_json" {
            match batch.column_by_name("overflow_json") {
                Some(col) => columns.push(col.clone()),
                None => {
                    columns.push(arrow_array::new_null_array(
                        &DataType::LargeBinary,
                        batch.num_rows(),
                    ));
                }
            }
        } else if prop == "_all_props" {
            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
                let guard = l0.read();
                !guard.vertex_properties.is_empty()
            });
            let has_schema_cols = projected_properties
                .iter()
                .any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));

            if !any_l0_has_vertex_props && !has_schema_cols {
                match batch.column_by_name("overflow_json") {
                    Some(col) => columns.push(col.clone()),
                    None => {
                        columns.push(arrow_array::new_null_array(
                            &DataType::LargeBinary,
                            batch.num_rows(),
                        ));
                    }
                }
            } else {
                let col = build_all_props_column_for_schema_scan(
                    batch,
                    vid_arr,
                    overflow_arr,
                    projected_properties,
                    l0_ctx,
                );
                columns.push(col);
            }
        } else {
            match batch.column_by_name(prop) {
                Some(col) => columns.push(col.clone()),
                None => {
                    let col = build_overflow_property_column(
                        batch.num_rows(),
                        vid_arr,
                        overflow_arr,
                        prop,
                        l0_ctx,
                    );
                    columns.push(col);
                }
            }
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}

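/// Projects an internal edge batch into the scan's output schema; properties
/// missing from both the batch and `overflow_json` become null columns.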
fn map_edge_to_output_schema(
    batch: &RecordBatch,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    let eid_col = batch
        .column_by_name("eid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
        })?
        .clone();
    columns.push(eid_col);

    let src_col = batch
        .column_by_name("src_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
        })?
        .clone();
    columns.push(src_col);

    let dst_col = batch
        .column_by_name("dst_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
        })?
        .clone();
    columns.push(dst_col);

    for prop in projected_properties {
        if prop == "overflow_json" {
            match batch.column_by_name("overflow_json") {
                Some(col) => columns.push(col.clone()),
                None => {
                    columns.push(arrow_array::new_null_array(
                        &DataType::LargeBinary,
                        batch.num_rows(),
                    ));
                }
            }
        } else {
            match batch.column_by_name(prop) {
                Some(col) => columns.push(col.clone()),
                None => {
                    let overflow_arr = batch
                        .column_by_name("overflow_json")
                        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

                    if let Some(arr) = overflow_arr {
                        let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                        for i in 0..batch.num_rows() {
                            if !arr.is_null(i) {
                                let blob = arr.value(i);
                                if let Some(sub_bytes) =
                                    uni_common::cypher_value_codec::extract_map_entry_raw(
                                        blob, prop,
                                    )
                                {
                                    builder.append_value(&sub_bytes);
                                } else {
                                    builder.append_null();
                                }
                            } else {
                                builder.append_null();
                            }
                        }
                        columns.push(Arc::new(builder.finish()));
                    } else {
                        let target_field = output_schema
                            .fields()
                            .iter()
                            .find(|f| f.name() == &format!("{}.{}", variable, prop));
                        let dt = target_field
                            .map(|f| f.data_type().clone())
                            .unwrap_or(DataType::LargeBinary);
                        columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
                    }
                }
            }
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}

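/// Full vertex scan pipeline for a schema-backed label: read Lance, MVCC
/// dedup, merge the L0 overlay, drop deleted/tombstoned rows, then map to
/// the output schema.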
async fn columnar_scan_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    let label_props = uni_schema.properties.get(label);

    let mut lance_columns: Vec<String> = vec![
        "_vid".to_string(),
        "_deleted".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else {
            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    let needs_overflow = projected_properties
        .iter()
        .any(|p| p == "overflow_json" || !label_props.is_some_and(|lp| lp.contains_key(p)));
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
    let lance_batch = storage
        .scan_vertex_table(label, &lance_columns_refs, None)
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("_vid", DataType::UInt64, false),
                Field::new("_deleted", DataType::Boolean, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else {
                    let arrow_type = label_props
                        .and_then(|lp| lp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    let l0_batch = build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props)?;

    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    map_to_output_schema(
        &filtered,
        label,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )
}

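/// Full edge scan pipeline, mirroring the vertex path but keyed on `eid`
/// and reading the forward ("fwd") delta table.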
async fn columnar_scan_edge_batch_static(
    graph_ctx: &GraphExecutionContext,
    edge_type: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    let type_props = uni_schema.properties.get(edge_type);

    let mut lance_columns: Vec<String> = vec![
        "eid".to_string(),
        "src_vid".to_string(),
        "dst_vid".to_string(),
        "op".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else {
            let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    let needs_overflow = projected_properties
        .iter()
        .any(|p| p == "overflow_json" || !type_props.is_some_and(|tp| tp.contains_key(p)));
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
    let lance_batch = storage
        .scan_delta_table(edge_type, "fwd", &lance_columns_refs, None)
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;

    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("eid", DataType::UInt64, false),
                Field::new("src_vid", DataType::UInt64, false),
                Field::new("dst_vid", DataType::UInt64, false),
                Field::new("op", DataType::UInt8, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(
                    col.as_str(),
                    "eid" | "src_vid" | "dst_vid" | "op" | "_version"
                ) {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else {
                    let arrow_type = type_props
                        .and_then(|tp| tp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;

    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    let merged = filter_deleted_edge_ops(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
}

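/// Full scan pipeline for the schemaless main vertex table; label predicates
/// are pushed down to storage as `array_contains(labels, ...)` filters.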
async fn columnar_scan_schemaless_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();

    let filter = {
        let mut parts = Vec::new();

        if !label.is_empty() {
            if label.contains(':') {
                for lbl in label.split(':') {
                    parts.push(format!("array_contains(labels, '{}')", lbl));
                }
            } else {
                parts.push(format!("array_contains(labels, '{}')", label));
            }
        }

        if parts.is_empty() {
            None
        } else {
            Some(parts.join(" AND "))
        }
    };

    let lance_batch = storage
        .scan_main_vertex_table(
            &["_vid", "_deleted", "labels", "props_json", "_version"],
            filter.as_deref(),
        )
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("labels", labels_data_type(), false),
            Field::new("props_json", DataType::LargeBinary, true),
            Field::new("_version", DataType::UInt64, false),
        ])),
    };

    let l0_batch = build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema)?;

    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    map_to_schemaless_output_schema(
        &filtered,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )
}

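/// Materializes unflushed L0 vertex writes for the schemaless table,
/// honoring single-label, multi-label (`a:b`), and match-all (empty) scans.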
fn build_l0_schemaless_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    internal_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    let label_filter: Vec<&str> = if label.is_empty() {
        vec![]
    } else if label.contains(':') {
        label.split(':').collect()
    } else {
        vec![label]
    };

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();

        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }

        let vids: Vec<Vid> = if label_filter.is_empty() {
            guard.all_vertex_vids()
        } else if label_filter.len() == 1 {
            guard.vids_for_label(label_filter[0])
        } else {
            guard.vids_with_all_labels(&label_filter)
        };

        for vid in vids {
            let vid_u64 = vid.as_u64();
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0, Vec::new()));

            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            if version > entry.1 {
                entry.1 = version;
            }
            if let Some(labels) = guard.vertex_labels.get(&vid) {
                entry.2 = labels.clone();
            }
        }
    }

    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    for field in internal_schema.fields() {
        match field.name().as_str() {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "labels" => {
                let mut labels_builder = ListBuilder::new(StringBuilder::new());
                for vid_u64 in &vids {
                    let (_, _, labels) = &vid_data[vid_u64];
                    let values = labels_builder.values();
                    for lbl in labels {
                        values.append_value(lbl);
                    }
                    labels_builder.append(true);
                }
                columns.push(Arc::new(labels_builder.finish()));
            }
            "props_json" => {
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _, _) = &vid_data[vid_u64];
                    if props.is_empty() {
                        builder.append_null();
                    } else {
                        let json_obj: serde_json::Value = {
                            let mut map = serde_json::Map::new();
                            for (k, v) in props {
                                let json_val: serde_json::Value = v.clone().into();
                                map.insert(k.clone(), json_val);
                            }
                            serde_json::Value::Object(map)
                        };
                        match encode_cypher_value(&json_obj) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_deleted" => {
                columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
                    false;
                    num_rows
                ])));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            _ => {
                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}

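/// Projects an internal schemaless batch into the output schema, rebuilding
/// `_labels` with L0 overlays and decoding properties out of `props_json`.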
fn map_to_schemaless_output_schema(
    batch: &RecordBatch,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    let vid_col = batch
        .column_by_name("_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
        })?
        .clone();
    let vid_arr = vid_col
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
        })?;
    columns.push(vid_col.clone());

    let labels_col = batch.column_by_name("labels");
    let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());

    let mut labels_builder = ListBuilder::new(StringBuilder::new());
    for i in 0..vid_arr.len() {
        let vid_u64 = vid_arr.value(i);
        let vid = Vid::from(vid_u64);

        let mut row_labels: Vec<String> = Vec::new();
        if let Some(arr) = labels_arr
            && !arr.is_null(i)
        {
            let list_val = arr.value(i);
            if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
                for j in 0..str_arr.len() {
                    if !str_arr.is_null(j) {
                        row_labels.push(str_arr.value(j).to_string());
                    }
                }
            }
        }

        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
                for lbl in l0_labels {
                    if !row_labels.contains(lbl) {
                        row_labels.push(lbl.clone());
                    }
                }
            }
        }

        let values = labels_builder.values();
        for lbl in &row_labels {
            values.append_value(lbl);
        }
        labels_builder.append(true);
    }
    columns.push(Arc::new(labels_builder.finish()));

    let props_col = batch.column_by_name("props_json");
    let props_arr =
        props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

    for prop in projected_properties {
        if prop == "_all_props" {
            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
                let guard = l0.read();
                !guard.vertex_properties.is_empty()
            });
            if !any_l0_has_vertex_props {
                match props_col {
                    Some(col) => columns.push(col.clone()),
                    None => {
                        columns.push(arrow_array::new_null_array(
                            &DataType::LargeBinary,
                            batch.num_rows(),
                        ));
                    }
                }
            } else {
                let col = build_all_props_column_with_l0_overlay(
                    batch.num_rows(),
                    vid_arr,
                    props_arr,
                    l0_ctx,
                );
                columns.push(col);
            }
        } else {
            let expected_type = output_schema
                .field_with_name(&format!("{variable}.{prop}"))
                .map(|f| f.data_type().clone())
                .unwrap_or(DataType::LargeBinary);

            if expected_type == DataType::LargeBinary {
                let col = build_overflow_property_column(
                    batch.num_rows(),
                    vid_arr,
                    props_arr,
                    prop,
                    l0_ctx,
                );
                columns.push(col);
            } else {
                let mut prop_values: HashMap<Vid, Properties> = HashMap::new();
                for i in 0..batch.num_rows() {
                    let vid = Vid::from(vid_arr.value(i));
                    let resolved = resolve_l0_property(&vid, prop, l0_ctx)
                        .flatten()
                        .or_else(|| {
                            extract_from_overflow_blob(props_arr, i, prop).and_then(|bytes| {
                                uni_common::cypher_value_codec::decode(&bytes).ok()
                            })
                        });
                    if let Some(val) = resolved {
                        prop_values.insert(vid, HashMap::from([(prop.to_string(), val)]));
                    }
                }
                let vids: Vec<Vid> = (0..batch.num_rows())
                    .map(|i| Vid::from(vid_arr.value(i)))
                    .collect();
                let col = build_property_column_static(&vids, &prop_values, prop, &expected_type)
                    .unwrap_or_else(|_| {
                        arrow_array::new_null_array(&expected_type, batch.num_rows())
                    });
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}

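/// Fetches one property for a vertex from the prepared map; `_all_props`
/// returns the whole property map as a `Value::Map`.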
pub(crate) fn get_property_value(
    vid: &Vid,
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
) -> Option<Value> {
    if prop_name == "_all_props" {
        return props_map.get(vid).map(|p| {
            let map: HashMap<String, Value> =
                p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
            Value::Map(map)
        });
    }
    props_map
        .get(vid)
        .and_then(|props| props.get(prop_name))
        .cloned()
}

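/// Encodes a JSON value into the Cypher binary value encoding used for
/// `LargeBinary` property columns. Currently infallible; the `Result` is
/// kept for call-site uniformity.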
pub(crate) fn encode_cypher_value(val: &serde_json::Value) -> Result<Vec<u8>, String> {
    let uni_val: uni_common::Value = val.clone().into();
    Ok(uni_common::cypher_value_codec::encode(&uni_val))
}

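/// Shared loop for numeric column builders: extract with `$extractor`,
/// convert with `$cast`, and append null when extraction fails.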
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut builder = <$builder_ty>::new();
        for vid in $vids {
            match get_property_value(vid, $props_map, $prop_name) {
                Some(ref v) => {
                    if let Some(val) = $extractor(v) {
                        builder.append_value($cast(val));
                    } else {
                        builder.append_null();
                    }
                }
                None => builder.append_null(),
            }
        }
        Ok(Arc::new(builder.finish()) as ArrayRef)
    }};
}

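/// Builds a typed Arrow column for `prop_name` over `vids`, converting the
/// dynamic `Value`s in `props_map` to the requested `DataType`.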
pub(crate) fn build_property_column_static(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    data_type: &DataType,
) -> DFResult<ArrayRef> {
    match data_type {
        DataType::LargeBinary => {
            use arrow_array::builder::LargeBinaryBuilder;
            let mut builder = LargeBinaryBuilder::new();

            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Null) | None => builder.append_null(),
                    Some(Value::Bytes(bytes)) => {
                        builder.append_value(&bytes);
                    }
                    Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
                        // A list of small integers may already be an encoded
                        // cypher-value blob; pass it through if it decodes.
                        let bytes: Vec<u8> = arr
                            .iter()
                            .filter_map(|v| v.as_u64().map(|n| n as u8))
                            .collect();
                        if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
                            builder.append_value(&bytes);
                        } else {
                            let json_val: serde_json::Value = Value::List(arr).into();
                            match encode_cypher_value(&json_val) {
                                Ok(encoded) => builder.append_value(encoded),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                    Some(val) => {
                        let json_val: serde_json::Value = val.into();
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Binary => {
            let mut builder = BinaryBuilder::new();
            for vid in vids {
                let bytes = get_property_value(vid, props_map, prop_name)
                    .filter(|v| !v.is_null())
                    .and_then(|v| {
                        let json_val: serde_json::Value = v.into();
                        serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
                    })
                    .and_then(|crdt| crdt.to_msgpack().ok());
                match bytes {
                    Some(b) => builder.append_value(&b),
                    None => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Utf8 => {
            let mut builder = StringBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::String(s)) => builder.append_value(s),
                    Some(Value::Null) | None => builder.append_null(),
                    Some(other) => builder.append_value(other.to_string()),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Int64 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Int64Builder,
                |v: &Value| v.as_i64(),
                |v| v
            )
        }
        DataType::Int32 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Int32Builder,
                |v: &Value| v.as_i64(),
                |v: i64| v as i32
            )
        }
        DataType::Float64 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Float64Builder,
                |v: &Value| v.as_f64(),
                |v| v
            )
        }
        DataType::Float32 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Float32Builder,
                |v: &Value| v.as_f64(),
                |v: f64| v as f32
            )
        }
        DataType::Boolean => {
            let mut builder = BooleanBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Bool(b)) => builder.append_value(b),
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::UInt64 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                UInt64Builder,
                |v: &Value| v.as_u64(),
                |v| v
            )
        }
        DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
            let values_builder = Float32Builder::new();
            let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Vector(v)) => {
                        for val in v {
                            list_builder.values().append_value(val);
                        }
                        list_builder.append(true);
                    }
                    Some(Value::List(arr)) => {
                        for v in arr {
                            list_builder
                                .values()
                                .append_value(v.as_f64().unwrap_or(0.0) as f32);
                        }
                        list_builder.append(true);
                    }
                    _ => {
                        // A fixed-size list slot must stay aligned, so pad a
                        // null entry with `dim` null child values.
                        for _ in 0..*dim {
                            list_builder.values().append_null();
                        }
                        list_builder.append(false);
                    }
                }
            }
            Ok(Arc::new(list_builder.finish()))
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(tv)) => match tv {
                        uni_common::TemporalValue::DateTime {
                            nanos_since_epoch, ..
                        }
                        | uni_common::TemporalValue::LocalDateTime {
                            nanos_since_epoch, ..
                        } => {
                            builder.append_value(nanos_since_epoch);
                        }
                        uni_common::TemporalValue::Date { days_since_epoch } => {
                            builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
                        }
                        _ => builder.append_null(),
                    },
                    Some(Value::String(s)) => match parse_datetime_utc(&s) {
                        Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
                        Err(_) => builder.append_null(),
                    },
                    Some(Value::Int(n)) => {
                        builder.append_value(n);
                    }
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Date32 => {
            let mut builder = Date32Builder::new();
            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
                        builder.append_value(days_since_epoch);
                    }
                    Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
                        Ok(d) => builder.append_value((d - epoch).num_days() as i32),
                        Err(_) => builder.append_null(),
                    },
                    Some(Value::Int(n)) => {
                        builder.append_value(n as i32);
                    }
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Time64(TimeUnit::Nanosecond) => {
            let mut builder = Time64NanosecondBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(
                        uni_common::TemporalValue::LocalTime {
                            nanos_since_midnight,
                        }
                        | uni_common::TemporalValue::Time {
                            nanos_since_midnight,
                            ..
                        },
                    )) => {
                        builder.append_value(nanos_since_midnight);
                    }
                    Some(Value::Temporal(_)) => builder.append_null(),
                    Some(Value::String(s)) => {
                        match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
                            .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
                        {
                            Ok(t) => {
                                let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
                                    + t.nanosecond() as i64;
                                builder.append_value(nanos);
                            }
                            Err(_) => builder.append_null(),
                        }
                    }
                    Some(Value::Int(n)) => {
                        builder.append_value(n);
                    }
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Interval(IntervalUnit::MonthDayNano) => {
            let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
                Vec::with_capacity(vids.len());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(uni_common::TemporalValue::Duration {
                        months,
                        days,
                        nanos,
                    })) => {
                        values.push(Some(arrow::datatypes::IntervalMonthDayNano {
                            months: months as i32,
                            days: days as i32,
                            nanoseconds: nanos,
                        }));
                    }
                    // Anything else (including bare integers, which carry no
                    // month/day split) cannot be mapped losslessly; emit null.
                    _ => values.push(None),
                }
            }
            let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
            Ok(Arc::new(arr))
        }
        DataType::List(inner_field) => {
            build_list_property_column(vids, props_map, prop_name, inner_field)
        }
        DataType::Struct(fields) => {
            build_struct_property_column(vids, props_map, prop_name, fields)
        }
        _ => {
            // Fallback: render any remaining type through its string form.
            let mut builder = StringBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Null) | None => builder.append_null(),
                    Some(other) => builder.append_value(other.to_string()),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
    }
}

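/// Builds a `List` column whose element type matches `inner_field`. Non-list
/// or missing values become null list entries; elements that fail to convert
/// become nulls inside the list.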
fn build_list_property_column(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    inner_field: &Arc<Field>,
) -> DFResult<ArrayRef> {
    match inner_field.data_type() {
        DataType::Utf8 => {
            let mut builder = ListBuilder::new(StringBuilder::new());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::List(arr)) => {
                        for v in arr {
                            match v {
                                Value::String(s) => builder.values().append_value(s),
                                Value::Null => builder.values().append_null(),
                                other => builder.values().append_value(format!("{other:?}")),
                            }
                        }
                        builder.append(true);
                    }
                    _ => builder.append(false),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Int64 => {
            let mut builder = ListBuilder::new(Int64Builder::new());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::List(arr)) => {
                        for v in arr {
                            match v.as_i64() {
                                Some(n) => builder.values().append_value(n),
                                None => builder.values().append_null(),
                            }
                        }
                        builder.append(true);
                    }
                    _ => builder.append(false),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Float64 => {
            let mut builder = ListBuilder::new(Float64Builder::new());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::List(arr)) => {
                        for v in arr {
                            match v.as_f64() {
                                Some(n) => builder.values().append_value(n),
                                None => builder.values().append_null(),
                            }
                        }
                        builder.append(true);
                    }
                    _ => builder.append(false),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Boolean => {
            let mut builder = ListBuilder::new(BooleanBuilder::new());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::List(arr)) => {
                        for v in arr {
                            match v.as_bool() {
                                Some(b) => builder.values().append_value(b),
                                None => builder.values().append_null(),
                            }
                        }
                        builder.append(true);
                    }
                    _ => builder.append(false),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Struct(fields) => {
            build_list_of_structs_column(vids, props_map, prop_name, fields)
        }
        _ => {
            let mut builder = ListBuilder::new(StringBuilder::new());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::List(arr)) => {
                        for v in arr {
                            match v {
                                Value::Null => builder.values().append_null(),
                                other => builder.values().append_value(format!("{other:?}")),
                            }
                        }
                        builder.append(true);
                    }
                    _ => builder.append(false),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
    }
}

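/// Builds a `List<Struct>` column by flattening every row's objects into
/// per-field child arrays and delimiting rows with an offsets buffer, per
/// Arrow's variable-size list layout. A bare map is encoded as a list of
/// `{key, value}` structs.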
fn build_list_of_structs_column(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    fields: &Fields,
) -> DFResult<ArrayRef> {
    use arrow_array::StructArray;

    let values: Vec<Option<Value>> = vids
        .iter()
        .map(|vid| get_property_value(vid, props_map, prop_name))
        .collect();

    // Normalize each row into a list of string-keyed objects: lists keep
    // their map elements, a bare map becomes a list of {key, value} pairs.
    let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
        .iter()
        .map(|val| match val {
            Some(Value::List(arr)) => {
                let objs: Vec<HashMap<String, Value>> = arr
                    .iter()
                    .filter_map(|v| {
                        if let Value::Map(m) = v {
                            Some(m.clone())
                        } else {
                            None
                        }
                    })
                    .collect();
                if objs.is_empty() { None } else { Some(objs) }
            }
            Some(Value::Map(obj)) => {
                let kv_pairs: Vec<HashMap<String, Value>> = obj
                    .iter()
                    .map(|(k, v)| {
                        let mut m = HashMap::new();
                        m.insert("key".to_string(), Value::String(k.clone()));
                        m.insert("value".to_string(), v.clone());
                        m
                    })
                    .collect();
                Some(kv_pairs)
            }
            _ => None,
        })
        .collect();

    let total_items: usize = rows
        .iter()
        .filter_map(|r| r.as_ref())
        .map(|v| v.len())
        .sum();

    // One flat child array per struct field, in row-major item order.
    let child_arrays: Vec<ArrayRef> = fields
        .iter()
        .map(|field| {
            let field_name = field.name();
            match field.data_type() {
                DataType::Utf8 => {
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::String(s)) => builder.append_value(s),
                            Some(Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int64 => {
                    let mut builder = Int64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_i64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Float64 => {
                    let mut builder = Float64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_f64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                _ => {
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
            }
        })
        .collect();

    let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
        .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    // Offsets delimit each row's slice of the flat child arrays; null rows
    // get a zero-length slice and a cleared validity bit.
    let mut offsets = Vec::with_capacity(vids.len() + 1);
    let mut nulls = Vec::with_capacity(vids.len());
    let mut offset = 0i32;
    offsets.push(offset);
    for row in &rows {
        match row {
            Some(objs) => {
                offset += objs.len() as i32;
                offsets.push(offset);
                nulls.push(true);
            }
            None => {
                offsets.push(offset);
                nulls.push(false);
            }
        }
    }

    let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
    let list_array = arrow_array::ListArray::try_new(
        list_field,
        arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
        Arc::new(struct_array),
        Some(arrow::buffer::NullBuffer::from(nulls)),
    )
    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    Ok(Arc::new(list_array))
}

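/// Flattens a `TemporalValue` into the string-keyed field map backing the
/// struct encoding of temporal properties (`nanos_since_epoch`,
/// `offset_seconds`, `days_since_epoch`, and so on).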
fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
    use uni_common::value::TemporalValue;
    let mut m = HashMap::new();
    match tv {
        TemporalValue::DateTime {
            nanos_since_epoch,
            offset_seconds,
            timezone_name,
        } => {
            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
            if let Some(tz) = timezone_name {
                m.insert("timezone_name".into(), Value::String(tz.clone()));
            }
        }
        TemporalValue::LocalDateTime { nanos_since_epoch } => {
            m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
        }
        TemporalValue::Time {
            nanos_since_midnight,
            offset_seconds,
        } => {
            m.insert(
                "nanos_since_midnight".into(),
                Value::Int(*nanos_since_midnight),
            );
            m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
        }
        TemporalValue::LocalTime {
            nanos_since_midnight,
        } => {
            m.insert(
                "nanos_since_midnight".into(),
                Value::Int(*nanos_since_midnight),
            );
        }
        TemporalValue::Date { days_since_epoch } => {
            m.insert(
                "days_since_epoch".into(),
                Value::Int(*days_since_epoch as i64),
            );
        }
        TemporalValue::Duration {
            months,
            days,
            nanos,
        } => {
            m.insert("months".into(), Value::Int(*months));
            m.insert("days".into(), Value::Int(*days));
            m.insert("nanos".into(), Value::Int(*nanos));
        }
    }
    m
}

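/// Builds a `Struct` column from map-valued properties. Temporal values are
/// first flattened via `temporal_to_struct_map`; rows that are not maps get a
/// null struct entry.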
fn build_struct_property_column(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    fields: &Fields,
) -> DFResult<ArrayRef> {
    use arrow_array::StructArray;

    let values: Vec<Option<Value>> = vids
        .iter()
        .map(|vid| {
            let val = get_property_value(vid, props_map, prop_name);
            match val {
                Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
                other => other,
            }
        })
        .collect();

    let child_arrays: Vec<ArrayRef> = fields
        .iter()
        .map(|field| {
            let field_name = field.name();
            match field.data_type() {
                DataType::Float64 => {
                    let mut builder = Float64Builder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_f64()) {
                                    Some(n) => builder.append_value(n),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Utf8 => {
                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => match obj.get(field_name) {
                                Some(Value::String(s)) => builder.append_value(s),
                                Some(Value::Null) | None => builder.append_null(),
                                Some(other) => builder.append_value(format!("{other:?}")),
                            },
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int64 => {
                    let mut builder = Int64Builder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_i64()) {
                                    Some(n) => builder.append_value(n),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Timestamp(_, _) => {
                    let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_i64()) {
                                    Some(n) => builder.append_value(n),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int32 => {
                    let mut builder = Int32Builder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_i64()) {
                                    Some(n) => builder.append_value(n as i32),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Time64(_) => {
                    let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_i64()) {
                                    Some(n) => builder.append_value(n),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                _ => {
                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => match obj.get(field_name) {
                                Some(Value::Null) | None => builder.append_null(),
                                Some(other) => builder.append_value(format!("{other:?}")),
                            },
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
            }
        })
        .collect();

    let nulls: Vec<bool> = values
        .iter()
        .map(|v| matches!(v, Some(Value::Map(_))))
        .collect();

    let struct_array = StructArray::try_new(
        fields.clone(),
        child_arrays,
        Some(arrow::buffer::NullBuffer::from(nulls)),
    )
    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    Ok(Arc::new(struct_array))
}

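// GraphScanStream drives the scan as a small state machine: `Init` builds the
// async scan future, `Executing` polls it to completion, and `Done` fuses the
// stream so later polls return `None`.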
impl Stream for GraphScanStream {
    type Item = DFResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take ownership of the state; every arm below writes a new state
            // back before returning or looping.
            let state = std::mem::replace(&mut self.state, GraphScanState::Done);

            match state {
                GraphScanState::Init => {
                    let graph_ctx = self.graph_ctx.clone();
                    let label = self.label.clone();
                    let variable = self.variable.clone();
                    let properties = self.properties.clone();
                    let is_edge_scan = self.is_edge_scan;
                    let is_schemaless = self.is_schemaless;
                    let schema = self.schema.clone();

                    let fut = async move {
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;

                        let batch = if is_edge_scan {
                            columnar_scan_edge_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else if is_schemaless {
                            columnar_scan_schemaless_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else {
                            columnar_scan_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        };
                        Ok(Some(batch))
                    };

                    self.state = GraphScanState::Executing(Box::pin(fut));
                }
                GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = GraphScanState::Done;
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphScanState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = GraphScanState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                GraphScanState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}

impl RecordBatchStream for GraphScanStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_build_vertex_schema() {
        let uni_schema = UniSchema::default();
        let schema = GraphScanExec::build_vertex_schema(
            "n",
            "Person",
            &["name".to_string(), "age".to_string()],
            &uni_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(3).name(), "n.age");
    }

    #[test]
    fn test_build_edge_schema() {
        let uni_schema = UniSchema::default();
        let schema =
            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "r._eid");
        assert_eq!(schema.field(1).name(), "r._src_vid");
        assert_eq!(schema.field(2).name(), "r._dst_vid");
        assert_eq!(schema.field(3).name(), "r.weight");
    }

    #[test]
    fn test_build_schemaless_vertex_schema() {
        let empty_schema = uni_common::core::schema::Schema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema(
            "n",
            &["name".to_string(), "age".to_string()],
            &empty_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(schema.field(3).name(), "n.age");
        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
    }

    #[test]
    fn test_schemaless_all_scan_has_empty_label() {
        let empty_schema = uni_common::core::schema::Schema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[], &empty_schema);

        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
    }

    #[test]
    fn test_cypher_value_all_props_extraction() {
        let json_obj = serde_json::json!({"age": 30, "name": "Alice"});
        let cv_bytes = encode_cypher_value(&json_obj).unwrap();

        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
        match decoded {
            uni_common::Value::Map(map) => {
                let age_val = map.get("age").unwrap();
                assert_eq!(age_val, &uni_common::Value::Int(30));
            }
            _ => panic!("Expected Map"),
        }

        let single_val = serde_json::json!(30);
        let single_bytes = encode_cypher_value(&single_val).unwrap();
        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
        assert_eq!(single_decoded, uni_common::Value::Int(30));
    }

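    /// Builds a batch shaped like a raw MVCC vertex fragment: `_vid`,
    /// `_deleted`, and `_version` system columns plus a `name` property whose
    /// values encode (vid, version) so tests can assert which row survived.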
    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let names: Vec<String> = vids
            .iter()
            .zip(versions.iter())
            .map(|(v, ver)| format!("v{}_ver{}", v, ver))
            .collect();
        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vids.to_vec())),
                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
                Arc::new(UInt64Array::from(versions.to_vec())),
                Arc::new(name_arr),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_mvcc_dedup_multiple_versions() {
        let batch = make_mvcc_batch(
            &[1, 1, 1, 2, 2],
            &[3, 1, 5, 2, 4],
            &[false, false, false, false, false],
        );

        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let ver_col = result
            .column_by_name("_version")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let name_col = result
            .column_by_name("name")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();

        assert_eq!(vid_col.value(0), 1);
        assert_eq!(ver_col.value(0), 5);
        assert_eq!(name_col.value(0), "v1_ver5");

        assert_eq!(vid_col.value(1), 2);
        assert_eq!(ver_col.value(1), 4);
        assert_eq!(name_col.value(1), "v2_ver4");
    }

    #[test]
    fn test_mvcc_dedup_single_rows() {
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    #[test]
    fn test_mvcc_dedup_empty() {
        let batch = make_mvcc_batch(&[], &[], &[]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    #[test]
    fn test_filter_l0_tombstones_removes_tombstoned() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);

        // Tombstone vid 2 directly in the L0 buffer.
        let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
        let l0_buf = std::sync::Arc::new(parking_lot::RwLock::new(l0));
        l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));
3297
3298 let l0_ctx = L0Context {
3299 current_l0: Some(l0_buf),
3300 transaction_l0: None,
3301 pending_flush_l0s: vec![],
3302 };
3303
3304 let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
3305 assert_eq!(result.num_rows(), 2);
3306
3307 let vid_col = result
3308 .column_by_name("_vid")
3309 .unwrap()
3310 .as_any()
3311 .downcast_ref::<UInt64Array>()
3312 .unwrap();
3313 assert_eq!(vid_col.value(0), 1);
3314 assert_eq!(vid_col.value(1), 3);
3315 }
3316
3317 #[test]
3318 fn test_filter_l0_tombstones_none() {
3319 use crate::query::df_graph::L0Context;
3320
3321 let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
3322 let l0_ctx = L0Context::default();
3323
3324 let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
3325 assert_eq!(result.num_rows(), 3);
3326 }
3327
3328 #[test]
3329 fn test_map_to_output_schema_basic() {
3330 use crate::query::df_graph::L0Context;
3331
3332 let lance_schema = Arc::new(Schema::new(vec![
3334 Field::new("_vid", DataType::UInt64, false),
3335 Field::new("_deleted", DataType::Boolean, false),
3336 Field::new("_version", DataType::UInt64, false),
3337 Field::new("name", DataType::Utf8, true),
3338 ]));
3339 let name_arr: arrow_array::StringArray =
3340 vec![Some("Alice"), Some("Bob")].into_iter().collect();
3341 let batch = RecordBatch::try_new(
3342 lance_schema,
3343 vec![
3344 Arc::new(UInt64Array::from(vec![1u64, 2])),
3345 Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
3346 Arc::new(UInt64Array::from(vec![1u64, 1])),
3347 Arc::new(name_arr),
3348 ],
3349 )
3350 .unwrap();
3351
3352 let output_schema = Arc::new(Schema::new(vec![
3354 Field::new("n._vid", DataType::UInt64, false),
3355 Field::new("n._labels", labels_data_type(), true),
3356 Field::new("n.name", DataType::Utf8, true),
3357 ]));
3358
3359 let l0_ctx = L0Context::default();
3360 let result = map_to_output_schema(
3361 &batch,
3362 "Person",
3363 "n",
3364 &["name".to_string()],
3365 &output_schema,
3366 &l0_ctx,
3367 )
3368 .unwrap();
3369
3370 assert_eq!(result.num_rows(), 2);
3371 assert_eq!(result.schema().fields().len(), 3);
3372 assert_eq!(result.schema().field(0).name(), "n._vid");
3373 assert_eq!(result.schema().field(1).name(), "n._labels");
3374 assert_eq!(result.schema().field(2).name(), "n.name");
3375
3376 let name_col = result
3378 .column(2)
3379 .as_any()
3380 .downcast_ref::<arrow_array::StringArray>()
3381 .unwrap();
3382 assert_eq!(name_col.value(0), "Alice");
3383 assert_eq!(name_col.value(1), "Bob");
3384 }
3385}