1use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30 BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31 Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32 Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
/// Leaf `ExecutionPlan` that scans vertices or edges out of graph storage
/// for a single label / edge type (or all labels, for schemaless scans).
pub struct GraphScanExec {
    /// Shared handle to storage, the schema manager, and L0 write buffers.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Vertex label or edge type to scan. Empty for an all-vertex scan;
    /// a multi-label scan encodes its labels as one ":"-joined string.
    label: String,

    /// Query variable bound by this scan; prefixes output column names.
    variable: String,

    /// Property names materialized as output columns.
    projected_properties: Vec<String>,

    /// Pushed-down filter. NOTE(review): it is stored and rendered in
    /// `DisplayAs`, but `execute` does not forward it to the stream —
    /// confirm it is enforced elsewhere (e.g. a wrapping FilterExec).
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// True when scanning edges rather than vertices.
    is_edge_scan: bool,

    /// True for schemaless scans, where property types are merged across
    /// all labels and unknown properties fall back to `LargeBinary`.
    is_schemaless: bool,

    /// Arrow schema of the batches this scan produces.
    schema: SchemaRef,

    /// Cached DataFusion plan properties computed from `schema`.
    properties: PlanProperties,

    /// Metrics registry for this operator.
    metrics: ExecutionPlanMetricsSet,
}
106
107impl fmt::Debug for GraphScanExec {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 f.debug_struct("GraphScanExec")
110 .field("label", &self.label)
111 .field("variable", &self.variable)
112 .field("projected_properties", &self.projected_properties)
113 .field("is_edge_scan", &self.is_edge_scan)
114 .finish()
115 }
116}
117
impl GraphScanExec {
    /// Vertex scan over a single, schema-known `label`.
    ///
    /// Output columns: `<var>._vid`, `<var>._labels`, then one nullable
    /// column per projected property, typed from the label's schema entry
    /// (unknown properties and `overflow_json` fall back to `LargeBinary`;
    /// see `resolve_property_type`).
    pub fn new_vertex_scan(
        graph_ctx: Arc<GraphExecutionContext>,
        label: impl Into<String>,
        variable: impl Into<String>,
        projected_properties: Vec<String>,
        filter: Option<Arc<dyn PhysicalExpr>>,
    ) -> Self {
        let label = label.into();
        let variable = variable.into();

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let schema =
            Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);

        let properties = compute_plan_properties(schema.clone());

        Self {
            graph_ctx,
            label,
            variable,
            projected_properties,
            filter,
            is_edge_scan: false,
            is_schemaless: false,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Schemaless vertex scan for one label: property column types are
    /// resolved by merging the property metadata of *all* labels.
    pub fn new_schemaless_vertex_scan(
        graph_ctx: Arc<GraphExecutionContext>,
        label_name: impl Into<String>,
        variable: impl Into<String>,
        projected_properties: Vec<String>,
        filter: Option<Arc<dyn PhysicalExpr>>,
    ) -> Self {
        let label = label_name.into();
        let variable = variable.into();

        // `_vid` / `_labels` are always emitted as the first two columns,
        // so drop them from the projection to avoid duplicate columns.
        let projected_properties: Vec<String> = projected_properties
            .into_iter()
            .filter(|p| p != "_vid" && p != "_labels")
            .collect();

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let schema =
            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
        let properties = compute_plan_properties(schema.clone());

        Self {
            graph_ctx,
            label,
            variable,
            projected_properties,
            filter,
            is_edge_scan: false,
            is_schemaless: true,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Scan over vertices that must carry several labels at once.
    ///
    /// The label set is flattened into the single `label` slot as a
    /// ":"-joined string — presumably split again by the scan
    /// implementation; confirm downstream decoding.
    pub fn new_multi_label_vertex_scan(
        graph_ctx: Arc<GraphExecutionContext>,
        labels: Vec<String>,
        variable: impl Into<String>,
        projected_properties: Vec<String>,
        filter: Option<Arc<dyn PhysicalExpr>>,
    ) -> Self {
        let variable = variable.into();
        // Same dedup as the other schemaless constructors: `_vid` and
        // `_labels` are always present.
        let projected_properties: Vec<String> = projected_properties
            .into_iter()
            .filter(|p| p != "_vid" && p != "_labels")
            .collect();
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let schema =
            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
        let properties = compute_plan_properties(schema.clone());

        // Encode the whole label set into one string.
        let encoded_labels = labels.join(":");

        Self {
            graph_ctx,
            label: encoded_labels,
            variable,
            projected_properties,
            filter,
            is_edge_scan: false,
            is_schemaless: true,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Scan over *all* vertices regardless of label.
    ///
    /// `label` is left empty to signal "no label constraint".
    pub fn new_schemaless_all_scan(
        graph_ctx: Arc<GraphExecutionContext>,
        variable: impl Into<String>,
        projected_properties: Vec<String>,
        filter: Option<Arc<dyn PhysicalExpr>>,
    ) -> Self {
        let variable = variable.into();
        // `_vid` / `_labels` are always emitted; drop them here too.
        let projected_properties: Vec<String> = projected_properties
            .into_iter()
            .filter(|p| p != "_vid" && p != "_labels")
            .collect();

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let schema =
            Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
        let properties = compute_plan_properties(schema.clone());

        Self {
            graph_ctx,
            // Empty label = scan every vertex.
            label: String::new(),
            variable,
            projected_properties,
            filter,
            is_edge_scan: false,
            is_schemaless: true,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Arrow schema for schemaless scans: `_vid`, `_labels`, then one
    /// nullable column per projected property.
    ///
    /// Property types come from a map merged across every label's schema
    /// entry (first label seen wins per name); properties unknown to any
    /// label default to `LargeBinary`.
    ///
    /// NOTE(review): the merge iterates map values, so if two labels
    /// declare the same property with different types the winner is
    /// map-order dependent — confirm that is acceptable.
    fn build_schemaless_vertex_schema(
        variable: &str,
        properties: &[String],
        uni_schema: &uni_common::core::schema::Schema,
    ) -> SchemaRef {
        let mut merged: std::collections::HashMap<&str, &uni_common::core::schema::PropertyMeta> =
            std::collections::HashMap::new();
        for label_props in uni_schema.properties.values() {
            for (name, meta) in label_props {
                merged.entry(name.as_str()).or_insert(meta);
            }
        }

        let mut fields = vec![
            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
            Field::new(format!("{}._labels", variable), labels_data_type(), true),
        ];

        for prop in properties {
            let col_name = format!("{}.{}", variable, prop);
            let arrow_type = merged
                .get(prop.as_str())
                .map(|meta| meta.r#type.to_arrow())
                .unwrap_or(DataType::LargeBinary);
            fields.push(Field::new(&col_name, arrow_type, true));
        }

        Arc::new(Schema::new(fields))
    }

    /// Edge scan over a single `edge_type`, typed from that type's schema
    /// entry.
    pub fn new_edge_scan(
        graph_ctx: Arc<GraphExecutionContext>,
        edge_type: impl Into<String>,
        variable: impl Into<String>,
        projected_properties: Vec<String>,
        filter: Option<Arc<dyn PhysicalExpr>>,
    ) -> Self {
        let label = edge_type.into();
        let variable = variable.into();

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);

        let properties = compute_plan_properties(schema.clone());

        Self {
            graph_ctx,
            label,
            variable,
            projected_properties,
            filter,
            is_edge_scan: true,
            is_schemaless: false,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Arrow schema for a schema-known vertex scan: `_vid`, `_labels`,
    /// then one nullable column per projected property.
    fn build_vertex_schema(
        variable: &str,
        label: &str,
        properties: &[String],
        uni_schema: &UniSchema,
    ) -> SchemaRef {
        let mut fields = vec![
            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
            Field::new(format!("{}._labels", variable), labels_data_type(), true),
        ];
        let label_props = uni_schema.properties.get(label);
        for prop in properties {
            let col_name = format!("{}.{}", variable, prop);
            let arrow_type = resolve_property_type(prop, label_props);
            fields.push(Field::new(&col_name, arrow_type, true));
        }
        Arc::new(Schema::new(fields))
    }

    /// Arrow schema for an edge scan: `_eid`, `_src_vid`, `_dst_vid`,
    /// then one nullable column per projected property.
    fn build_edge_schema(
        variable: &str,
        edge_type: &str,
        properties: &[String],
        uni_schema: &UniSchema,
    ) -> SchemaRef {
        let mut fields = vec![
            Field::new(format!("{}._eid", variable), DataType::UInt64, false),
            Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
            Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
        ];
        let edge_props = uni_schema.properties.get(edge_type);
        for prop in properties {
            let col_name = format!("{}.{}", variable, prop);
            let arrow_type = resolve_property_type(prop, edge_props);
            fields.push(Field::new(&col_name, arrow_type, true));
        }
        Arc::new(Schema::new(fields))
    }
}
382
383impl DisplayAs for GraphScanExec {
384 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
385 let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
386 write!(
387 f,
388 "GraphScanExec: {}={}, properties={:?}",
389 scan_type, self.label, self.projected_properties
390 )?;
391 if self.filter.is_some() {
392 write!(f, ", filter=<pushed>")?;
393 }
394 Ok(())
395 }
396}
397
398impl ExecutionPlan for GraphScanExec {
399 fn name(&self) -> &str {
400 "GraphScanExec"
401 }
402
403 fn as_any(&self) -> &dyn Any {
404 self
405 }
406
407 fn schema(&self) -> SchemaRef {
408 self.schema.clone()
409 }
410
411 fn properties(&self) -> &PlanProperties {
412 &self.properties
413 }
414
415 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
416 vec![]
417 }
418
419 fn with_new_children(
420 self: Arc<Self>,
421 children: Vec<Arc<dyn ExecutionPlan>>,
422 ) -> DFResult<Arc<dyn ExecutionPlan>> {
423 if children.is_empty() {
424 Ok(self)
425 } else {
426 Err(datafusion::error::DataFusionError::Plan(
427 "GraphScanExec does not accept children".to_string(),
428 ))
429 }
430 }
431
432 fn execute(
433 &self,
434 partition: usize,
435 _context: Arc<TaskContext>,
436 ) -> DFResult<SendableRecordBatchStream> {
437 let metrics = BaselineMetrics::new(&self.metrics, partition);
438
439 Ok(Box::pin(GraphScanStream::new(
440 self.graph_ctx.clone(),
441 self.label.clone(),
442 self.variable.clone(),
443 self.projected_properties.clone(),
444 self.is_edge_scan,
445 self.is_schemaless,
446 self.schema.clone(),
447 metrics,
448 )))
449 }
450
451 fn metrics(&self) -> Option<MetricsSet> {
452 Some(self.metrics.clone_inner())
453 }
454}
455
/// Polling state machine for `GraphScanStream`.
/// (The `Stream` impl that drives these transitions is outside this view.)
enum GraphScanState {
    /// Nothing started yet.
    Init,
    /// Scan future in flight; resolves to `Ok(Some(batch))` or `Ok(None)`.
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Terminal state.
    Done,
}
465
/// Record-batch stream returned by `GraphScanExec::execute`.
///
/// Carries the scan parameters plus the polling state; the `Stream` /
/// `RecordBatchStream` implementations are outside this view.
struct GraphScanStream {
    /// Shared storage / schema / L0 context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label or edge type (may be ":"-joined, or empty for all-label scans).
    label: String,

    /// Query variable prefixing output column names.
    variable: String,

    /// Property names to project.
    properties: Vec<String>,

    /// True when this is an edge scan.
    is_edge_scan: bool,

    /// True for schemaless scans.
    is_schemaless: bool,

    /// Output Arrow schema.
    schema: SchemaRef,

    /// Current polling state; starts at `Init`.
    state: GraphScanState,

    /// Baseline execution metrics for this partition.
    metrics: BaselineMetrics,
}
499
impl GraphScanStream {
    /// Pure wiring constructor: stores the scan parameters and starts the
    /// state machine at `Init`. No storage access happens here.
    #[expect(clippy::too_many_arguments)]
    fn new(
        graph_ctx: Arc<GraphExecutionContext>,
        label: String,
        variable: String,
        properties: Vec<String>,
        is_edge_scan: bool,
        is_schemaless: bool,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        Self {
            graph_ctx,
            label,
            variable,
            properties,
            is_edge_scan,
            is_schemaless,
            schema,
            state: GraphScanState::Init,
            metrics,
        }
    }
}
526
527pub(crate) fn resolve_property_type(
532 prop: &str,
533 schema_props: Option<
534 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
535 >,
536) -> DataType {
537 if prop == "overflow_json" {
538 DataType::LargeBinary
539 } else {
540 schema_props
541 .and_then(|props| props.get(prop))
542 .map(|meta| meta.r#type.to_arrow())
543 .unwrap_or(DataType::LargeBinary)
544 }
545}
546
/// Test-only convenience wrapper: MVCC-dedup a vertex batch keyed by `_vid`.
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    mvcc_dedup_batch_by(batch, "_vid")
}
559
560fn mvcc_dedup_to_option(
565 batch: Option<RecordBatch>,
566 id_column: &str,
567) -> DFResult<Option<RecordBatch>> {
568 match batch {
569 Some(b) => {
570 let deduped = mvcc_dedup_batch_by(&b, id_column)?;
571 Ok(if deduped.num_rows() > 0 {
572 Some(deduped)
573 } else {
574 None
575 })
576 }
577 None => Ok(None),
578 }
579}
580
581fn merge_lance_and_l0(
585 lance_deduped: Option<RecordBatch>,
586 l0_batch: RecordBatch,
587 internal_schema: &SchemaRef,
588 id_column: &str,
589) -> DFResult<Option<RecordBatch>> {
590 let has_l0 = l0_batch.num_rows() > 0;
591 match (lance_deduped, has_l0) {
592 (Some(lance), true) => {
593 let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
594 .map_err(arrow_err)?;
595 Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
596 }
597 (Some(lance), false) => Ok(Some(lance)),
598 (None, true) => Ok(Some(l0_batch)),
599 (None, false) => Ok(None),
600 }
601}
602
/// Appends `col_name` to `columns` unless an equal entry already exists
/// (linear scan; order of existing entries is preserved).
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    let already_present = columns.iter().any(|existing| existing == col_name);
    if !already_present {
        columns.push(col_name.to_owned());
    }
}
612
613fn extract_from_overflow_blob(
618 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
619 row: usize,
620 prop: &str,
621) -> Option<Vec<u8>> {
622 let arr = overflow_arr?;
623 if arr.is_null(row) {
624 return None;
625 }
626 uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
627}
628
629fn build_overflow_property_column(
636 num_rows: usize,
637 vid_arr: &UInt64Array,
638 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
639 prop: &str,
640 l0_ctx: &crate::query::df_graph::L0Context,
641) -> ArrayRef {
642 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
643 for i in 0..num_rows {
644 let vid = Vid::from(vid_arr.value(i));
645
646 let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
648
649 if let Some(val_opt) = l0_val {
650 append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
651 } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
652 builder.append_value(&bytes);
653 } else {
654 builder.append_null();
655 }
656 }
657 Arc::new(builder.finish())
658}
659
660fn resolve_l0_property(
666 vid: &Vid,
667 prop: &str,
668 l0_ctx: &crate::query::df_graph::L0Context,
669) -> Option<Option<Value>> {
670 let mut result = None;
671 for l0 in l0_ctx.iter_l0_buffers() {
672 let guard = l0.read();
673 if let Some(props) = guard.vertex_properties.get(vid)
674 && let Some(val) = props.get(prop)
675 {
676 result = Some(Some(val.clone()));
677 }
678 }
679 result
680}
681
682fn append_value_as_cypher_binary(
687 builder: &mut arrow_array::builder::LargeBinaryBuilder,
688 val: Option<&Value>,
689) {
690 match val {
691 Some(v) if !v.is_null() => {
692 let json_val: serde_json::Value = v.clone().into();
693 match encode_cypher_value(&json_val) {
694 Ok(bytes) => builder.append_value(bytes),
695 Err(_) => builder.append_null(),
696 }
697 }
698 _ => builder.append_null(),
699 }
700}
701
/// Builds a `LargeBinary` column holding, per row, the vertex's complete
/// property map encoded via `encode_cypher_value`.
///
/// Base values come from the stored `props_arr` blob (decoded as a cypher
/// map); anything present in L0 buffers overlays (overwrites) the stored
/// values. Rows ending up with no properties become null.
fn build_all_props_column_with_l0_overlay(
    num_rows: usize,
    vid_arr: &UInt64Array,
    props_arr: Option<&arrow_array::LargeBinaryArray>,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> ArrayRef {
    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
    for i in 0..num_rows {
        let vid = Vid::from(vid_arr.value(i));

        // Start from the persisted blob, if present and decodable as a map.
        let mut merged_props = serde_json::Map::new();
        if let Some(arr) = props_arr
            && !arr.is_null(i)
            && let Ok(uni_common::Value::Map(map)) =
                uni_common::cypher_value_codec::decode(arr.value(i))
        {
            for (k, v) in map {
                let json_val: serde_json::Value = v.into();
                merged_props.insert(k, json_val);
            }
        }

        // L0 buffers overwrite stored values; later buffers overwrite
        // earlier ones.
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
                for (k, v) in l0_props {
                    let json_val: serde_json::Value = v.clone().into();
                    merged_props.insert(k.clone(), json_val);
                }
            }
        }

        // Empty map -> null; otherwise encode the merged object.
        if merged_props.is_empty() {
            builder.append_null();
        } else {
            let json_obj = serde_json::Value::Object(merged_props);
            match encode_cypher_value(&json_obj) {
                Ok(bytes) => builder.append_value(bytes),
                // Encoding failure silently degrades to null.
                Err(_) => builder.append_null(),
            }
        }
    }
    Arc::new(builder.finish())
}
756
/// Builds the `_all_props` column for a schema-backed scan.
///
/// Per row, properties are merged in increasing precedence:
/// 1. typed schema columns from `batch`,
/// 2. the persisted `overflow_json` blob,
/// 3. L0 buffer values (newest wins).
/// Rows with no properties at all become null.
fn build_all_props_column_for_schema_scan(
    batch: &RecordBatch,
    vid_arr: &UInt64Array,
    overflow_arr: Option<&arrow_array::LargeBinaryArray>,
    projected_properties: &[String],
    l0_ctx: &crate::query::df_graph::L0Context,
) -> ArrayRef {
    // Plain schema properties only: skip the overflow blob, the synthetic
    // `_all_props` marker, and internal `_`-prefixed columns.
    let schema_props: Vec<&str> = projected_properties
        .iter()
        .filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
        .map(String::as_str)
        .collect();

    let num_rows = batch.num_rows();
    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
    for i in 0..num_rows {
        let vid = Vid::from(vid_arr.value(i));
        let mut merged_props = serde_json::Map::new();

        // 1) Typed columns from storage; null cells are skipped.
        for &prop in &schema_props {
            if let Some(col) = batch.column_by_name(prop) {
                let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
                if !val.is_null() {
                    let json_val: serde_json::Value = val.into();
                    merged_props.insert(prop.to_string(), json_val);
                }
            }
        }

        // 2) Overflow blob: non-schema properties persisted to storage.
        if let Some(arr) = overflow_arr
            && !arr.is_null(i)
            && let Ok(uni_common::Value::Map(map)) =
                uni_common::cypher_value_codec::decode(arr.value(i))
        {
            for (k, v) in map {
                let json_val: serde_json::Value = v.into();
                merged_props.insert(k, json_val);
            }
        }

        // 3) L0 buffers are the newest data: overwrite everything above.
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_props) = guard.vertex_properties.get(&vid) {
                for (k, v) in l0_props {
                    let json_val: serde_json::Value = v.clone().into();
                    merged_props.insert(k.clone(), json_val);
                }
            }
        }

        if merged_props.is_empty() {
            builder.append_null();
        } else {
            let json_obj = serde_json::Value::Object(merged_props);
            match encode_cypher_value(&json_obj) {
                Ok(bytes) => builder.append_value(bytes),
                // Encoding failure silently degrades to null.
                Err(_) => builder.append_null(),
            }
        }
    }
    Arc::new(builder.finish())
}
827
/// MVCC de-duplication: keeps, for each distinct value of `id_column`,
/// only the row with the highest `_version`.
///
/// Implementation: lexsort by (id ASC, `_version` DESC), then keep the
/// first row of every id run. Errors if either column is missing; empty
/// batches are returned unchanged. Note the output row order is the
/// sorted order, not the input order.
fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(batch.clone());
    }

    let id_col = batch
        .column_by_name(id_column)
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
        })?
        .clone();
    let version_col = batch
        .column_by_name("_version")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
        })?
        .clone();

    // Sort ids ascending and, within each id, versions descending so the
    // newest version of an entity is the first row of its run.
    let sort_columns = vec![
        arrow::compute::SortColumn {
            values: id_col,
            options: Some(arrow::compute::SortOptions {
                descending: false,
                nulls_first: false,
            }),
        },
        arrow::compute::SortColumn {
            values: version_col,
            options: Some(arrow::compute::SortOptions {
                descending: true,
                nulls_first: false,
            }),
        },
    ];
    let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;

    // Materialize the sorted batch by taking every column in sorted order.
    let sorted_columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
        .collect::<Result<_, _>>()
        .map_err(arrow_err)?;
    let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;

    // Unwraps are safe: the column's existence was checked above and
    // sorting preserves the schema. NOTE(review): the UInt64 downcast
    // assumes the id column is always UInt64 — confirm for all callers.
    let sorted_id = sorted
        .column_by_name(id_column)
        .unwrap()
        .as_any()
        .downcast_ref::<UInt64Array>()
        .unwrap();

    // Keep only the first row of each id run (= max `_version` per id).
    let mut keep = vec![false; sorted.num_rows()];
    if !keep.is_empty() {
        keep[0] = true;
        for (i, flag) in keep.iter_mut().enumerate().skip(1) {
            if sorted_id.value(i) != sorted_id.value(i - 1) {
                *flag = true;
            }
        }
    }

    let mask = arrow_array::BooleanArray::from(keep);
    arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
}
900
901fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
903 if batch.num_rows() == 0 {
904 return Ok(batch.clone());
905 }
906 let op_col = match batch.column_by_name("op") {
907 Some(col) => col
908 .as_any()
909 .downcast_ref::<arrow_array::UInt8Array>()
910 .unwrap(),
911 None => return Ok(batch.clone()),
912 };
913 let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
914 let mask = arrow_array::BooleanArray::from(keep);
915 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
916}
917
918fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
920 if batch.num_rows() == 0 {
921 return Ok(batch.clone());
922 }
923 let deleted_col = match batch.column_by_name("_deleted") {
924 Some(col) => col
925 .as_any()
926 .downcast_ref::<arrow_array::BooleanArray>()
927 .unwrap(),
928 None => return Ok(batch.clone()),
929 };
930 let keep: Vec<bool> = (0..deleted_col.len())
931 .map(|i| !deleted_col.value(i))
932 .collect();
933 let mask = arrow_array::BooleanArray::from(keep);
934 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
935}
936
937fn filter_l0_tombstones(
939 batch: &RecordBatch,
940 l0_ctx: &crate::query::df_graph::L0Context,
941) -> DFResult<RecordBatch> {
942 if batch.num_rows() == 0 {
943 return Ok(batch.clone());
944 }
945
946 let mut tombstones: HashSet<u64> = HashSet::new();
947 for l0 in l0_ctx.iter_l0_buffers() {
948 let guard = l0.read();
949 for vid in guard.vertex_tombstones.iter() {
950 tombstones.insert(vid.as_u64());
951 }
952 }
953
954 if tombstones.is_empty() {
955 return Ok(batch.clone());
956 }
957
958 let vid_col = batch
959 .column_by_name("_vid")
960 .ok_or_else(|| {
961 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
962 })?
963 .as_any()
964 .downcast_ref::<UInt64Array>()
965 .unwrap();
966
967 let keep: Vec<bool> = (0..vid_col.len())
968 .map(|i| !tombstones.contains(&vid_col.value(i)))
969 .collect();
970 let mask = arrow_array::BooleanArray::from(keep);
971 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
972}
973
974fn filter_l0_edge_tombstones(
976 batch: &RecordBatch,
977 l0_ctx: &crate::query::df_graph::L0Context,
978) -> DFResult<RecordBatch> {
979 if batch.num_rows() == 0 {
980 return Ok(batch.clone());
981 }
982
983 let mut tombstones: HashSet<u64> = HashSet::new();
984 for l0 in l0_ctx.iter_l0_buffers() {
985 let guard = l0.read();
986 for eid in guard.tombstones.keys() {
987 tombstones.insert(eid.as_u64());
988 }
989 }
990
991 if tombstones.is_empty() {
992 return Ok(batch.clone());
993 }
994
995 let eid_col = batch
996 .column_by_name("eid")
997 .ok_or_else(|| {
998 datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
999 })?
1000 .as_any()
1001 .downcast_ref::<UInt64Array>()
1002 .unwrap();
1003
1004 let keep: Vec<bool> = (0..eid_col.len())
1005 .map(|i| !tombstones.contains(&eid_col.value(i)))
1006 .collect();
1007 let mask = arrow_array::BooleanArray::from(keep);
1008 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1009}
1010
/// Materializes a `RecordBatch` in `lance_schema` layout from the
/// vertices currently buffered in L0 for `label`.
///
/// Per vid, properties are merged across all buffers (later buffers
/// overwrite earlier ones) and `_version` is the max seen. Tombstoned
/// vids are excluded. Synthetic columns: `_deleted` = false for all rows,
/// `overflow_json` = cypher-encoded map of properties not declared by the
/// label schema. Rows are ordered by ascending vid.
fn build_l0_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    lance_schema: &SchemaRef,
    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // vid -> (merged properties, max version seen).
    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }
        for vid in guard.vids_for_label(label) {
            let vid_u64 = vid.as_u64();
            // Skip vids tombstoned by this or an earlier buffer.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0));
            // Later buffers overwrite earlier property values.
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            if version > entry.1 {
                entry.1 = version;
            }
        }
    }

    // Final sweep: drop anything tombstoned by *any* buffer.
    // NOTE(review): a vid deleted in one buffer but re-created in a later
    // buffer is also dropped here — confirm this is the intended semantics.
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(lance_schema.clone()));
    }

    // Deterministic row order: ascending vid.
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());

    // Property names declared by the label schema; anything else goes to
    // the overflow blob.
    let schema_prop_names: HashSet<&str> = label_props
        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    for field in lance_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "_deleted" => {
                // Everything that survived the tombstone sweep is live.
                let vals = vec![false; num_rows];
                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // Non-schema, non-internal properties get packed into one
                // cypher-encoded map blob per row (null when empty).
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _) = &vid_data[vid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        // `ext_id` and `_`-prefixed keys are internal.
                        if k == "ext_id" || k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            // Encoding failure silently degrades to null.
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Regular schema property: build a typed column from the
                // merged property maps.
                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
}
1128
1129fn build_l0_property_column(
1133 vids: &[u64],
1134 vid_data: &HashMap<u64, (Properties, u64)>,
1135 prop_name: &str,
1136 data_type: &DataType,
1137) -> DFResult<ArrayRef> {
1138 let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
1140 let props_map: HashMap<Vid, Properties> = vid_data
1141 .iter()
1142 .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1143 .collect();
1144
1145 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1146}
1147
/// Materializes a `RecordBatch` in `internal_schema` layout from the
/// edges currently buffered in L0 for `edge_type`.
///
/// Per eid, properties are merged across all buffers (later buffers
/// overwrite earlier ones), endpoints come from the latest buffer, and
/// `_version` is the max seen. Tombstoned eids are excluded. Synthetic
/// columns: `op` = 0 for all rows, `overflow_json` = cypher-encoded map
/// of properties not declared by the edge-type schema. Rows are ordered
/// by ascending eid.
fn build_l0_edge_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    edge_type: &str,
    internal_schema: &SchemaRef,
    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // eid -> (src vid, dst vid, merged properties, max version seen).
    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        for eid in guard.tombstones.keys() {
            tombstones.insert(eid.as_u64());
        }
        for eid in guard.eids_for_type(edge_type) {
            let eid_u64 = eid.as_u64();
            // Skip eids tombstoned by this or an earlier buffer.
            if tombstones.contains(&eid_u64) {
                continue;
            }
            // Edges without resolvable endpoints are skipped entirely.
            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
                None => continue,
            };
            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
            let entry = eid_data
                .entry(eid_u64)
                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
            // Later buffers overwrite earlier property values.
            if let Some(props) = guard.edge_properties.get(&eid) {
                for (k, v) in props {
                    entry.2.insert(k.clone(), v.clone());
                }
            }
            // Endpoints from the latest buffer win unconditionally.
            entry.0 = src_vid;
            entry.1 = dst_vid;
            if version > entry.3 {
                entry.3 = version;
            }
        }
    }

    // Final sweep: drop anything tombstoned by *any* buffer.
    // NOTE(review): as with vertices, an edge deleted in one buffer but
    // re-created in a later buffer is dropped here — confirm intended.
    for t in &tombstones {
        eid_data.remove(t);
    }

    if eid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Deterministic row order: ascending eid.
    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
    eids.sort_unstable();

    let num_rows = eids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Property names declared by the edge-type schema; anything else goes
    // to the overflow blob.
    let schema_prop_names: HashSet<&str> = type_props
        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    for field in internal_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "eid" => {
                columns.push(Arc::new(UInt64Array::from(eids.clone())));
            }
            "src_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "dst_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "op" => {
                // Everything that survived the tombstone sweep is a live
                // edge, so op is uniformly 0.
                let vals = vec![0u8; num_rows];
                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // Non-schema, non-internal properties get packed into one
                // cypher-encoded map blob per row (null when empty).
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for eid_u64 in &eids {
                    let (_, _, props, _) = &eid_data[eid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        // `_`-prefixed keys are internal.
                        if k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            // Encoding failure silently degrades to null.
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Regular schema property: typed column from merged props.
                let col =
                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
1282
1283fn build_l0_edge_property_column(
1287 eids: &[u64],
1288 eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1289 prop_name: &str,
1290 data_type: &DataType,
1291) -> DFResult<ArrayRef> {
1292 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1294 let props_map: HashMap<Vid, Properties> = eid_data
1295 .iter()
1296 .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1297 .collect();
1298
1299 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1300}
1301
/// Builds the `_labels` list column for rows scanned under a known `label`.
///
/// Per row: start from the labels stored in the batch (falling back to
/// `[label]` when the stored list is missing or empty), guarantee the
/// scanned label itself is present, then append any extra labels recorded
/// in L0 buffers, skipping duplicates.
fn build_labels_column_for_known_label(
    vid_arr: &UInt64Array,
    label: &str,
    l0_ctx: &crate::query::df_graph::L0Context,
    batch_labels_col: Option<&arrow_array::ListArray>,
) -> DFResult<ArrayRef> {
    use uni_store::storage::arrow_convert::labels_from_list_array;

    let mut labels_builder = ListBuilder::new(StringBuilder::new());

    for i in 0..vid_arr.len() {
        let vid = Vid::from(vid_arr.value(i));

        // Stored labels, defaulting to just the scanned label.
        let mut labels = match batch_labels_col {
            Some(list_arr) => {
                let stored = labels_from_list_array(list_arr, i);
                if stored.is_empty() {
                    vec![label.to_string()]
                } else {
                    stored
                }
            }
            None => vec![label.to_string()],
        };

        // The scanned label must always appear exactly once.
        if !labels.iter().any(|l| l == label) {
            labels.push(label.to_string());
        }

        // Merge in labels added through L0 buffers, skipping duplicates.
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
                for lbl in l0_labels {
                    if !labels.contains(lbl) {
                        labels.push(lbl.clone());
                    }
                }
            }
        }

        let values = labels_builder.values();
        for lbl in &labels {
            values.append_value(lbl);
        }
        labels_builder.append(true);
    }

    Ok(Arc::new(labels_builder.finish()))
}
1359
/// Maps a merged (Lance + L0) vertex batch onto `output_schema`.
///
/// Column order: `_vid`, the labels column, then one column per entry of
/// `projected_properties`. Properties present as batch columns pass through;
/// `"overflow_json"` passes the raw blob (or nulls); `"_all_props"` either
/// reuses the overflow blob directly or rebuilds a combined blob; anything
/// else is extracted from the overflow blob / L0 overlay.
fn map_to_output_schema(
    batch: &RecordBatch,
    label: &str,
    _variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Columns are pushed in output_schema field order; RecordBatch::try_new
    // at the end validates the correspondence.
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    let vid_col = batch
        .column_by_name("_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
        })?
        .clone();
    let vid_arr = vid_col
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
        })?;

    // Stored labels (if the batch carries them) are merged with the scanned
    // label and L0 labels by the helper.
    let batch_labels_col = batch
        .column_by_name("_labels")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
    let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
    columns.push(vid_col.clone());
    columns.push(labels_col);

    // Shared overflow blob array used by the per-property fallbacks below.
    let overflow_arr = batch
        .column_by_name("overflow_json")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

    for prop in projected_properties {
        if prop == "overflow_json" {
            // Raw blob requested: pass through, or nulls when absent.
            match batch.column_by_name("overflow_json") {
                Some(col) => columns.push(col.clone()),
                None => {
                    columns.push(arrow_array::new_null_array(
                        &DataType::LargeBinary,
                        batch.num_rows(),
                    ));
                }
            }
        } else if prop == "_all_props" {
            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
                let guard = l0.read();
                !guard.vertex_properties.is_empty()
            });
            // Schema columns (non-underscore, non-special) would have to be
            // folded into the blob, so the pass-through shortcut only applies
            // when there are none and no L0 properties exist.
            let has_schema_cols = projected_properties
                .iter()
                .any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));

            if !any_l0_has_vertex_props && !has_schema_cols {
                // Fast path: the overflow blob already IS the full property map.
                match batch.column_by_name("overflow_json") {
                    Some(col) => columns.push(col.clone()),
                    None => {
                        columns.push(arrow_array::new_null_array(
                            &DataType::LargeBinary,
                            batch.num_rows(),
                        ));
                    }
                }
            } else {
                // Slow path: rebuild the combined blob from schema columns,
                // overflow blob, and L0 overlay.
                let col = build_all_props_column_for_schema_scan(
                    batch,
                    vid_arr,
                    overflow_arr,
                    projected_properties,
                    l0_ctx,
                );
                columns.push(col);
            }
        } else {
            match batch.column_by_name(prop) {
                Some(col) => columns.push(col.clone()),
                None => {
                    // Not a materialized column: resolve from overflow/L0.
                    let col = build_overflow_property_column(
                        batch.num_rows(),
                        vid_arr,
                        overflow_arr,
                        prop,
                        l0_ctx,
                    );
                    columns.push(col);
                }
            }
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}
1475
/// Maps a merged edge batch onto `output_schema`.
///
/// Column order: `eid`, `src_vid`, `dst_vid`, then one column per projected
/// property. Properties absent as batch columns are extracted (still
/// encoded) from the `overflow_json` blob; when no blob exists either, a
/// null column typed like the `"<variable>.<prop>"` output field is emitted.
fn map_edge_to_output_schema(
    batch: &RecordBatch,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    // The three id columns are mandatory; missing ones are internal errors
    // because the internal scan schema always includes them.
    let eid_col = batch
        .column_by_name("eid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
        })?
        .clone();
    columns.push(eid_col);

    let src_col = batch
        .column_by_name("src_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
        })?
        .clone();
    columns.push(src_col);

    let dst_col = batch
        .column_by_name("dst_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
        })?
        .clone();
    columns.push(dst_col);

    for prop in projected_properties {
        if prop == "overflow_json" {
            // Raw blob requested: pass through, or nulls when absent.
            match batch.column_by_name("overflow_json") {
                Some(col) => columns.push(col.clone()),
                None => {
                    columns.push(arrow_array::new_null_array(
                        &DataType::LargeBinary,
                        batch.num_rows(),
                    ));
                }
            }
        } else {
            match batch.column_by_name(prop) {
                Some(col) => columns.push(col.clone()),
                None => {
                    // Not a materialized column: try the per-row overflow blob.
                    let overflow_arr = batch
                        .column_by_name("overflow_json")
                        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

                    if let Some(arr) = overflow_arr {
                        let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                        for i in 0..batch.num_rows() {
                            if !arr.is_null(i) {
                                let blob = arr.value(i);
                                // Extract the single map entry still encoded,
                                // without decoding the whole blob.
                                if let Some(sub_bytes) =
                                    uni_common::cypher_value_codec::extract_map_entry_raw(
                                        blob, prop,
                                    )
                                {
                                    builder.append_value(&sub_bytes);
                                } else {
                                    builder.append_null();
                                }
                            } else {
                                builder.append_null();
                            }
                        }
                        columns.push(Arc::new(builder.finish()));
                    } else {
                        // No overflow blob at all: emit nulls with the output
                        // field's declared type (LargeBinary when the field is
                        // not present in output_schema).
                        let target_field = output_schema
                            .fields()
                            .iter()
                            .find(|f| f.name() == &format!("{}.{}", variable, prop));
                        let dt = target_field
                            .map(|f| f.data_type().clone())
                            .unwrap_or(DataType::LargeBinary);
                        columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
                    }
                }
            }
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}
1581
/// Scans one schema-declared vertex label into a single `RecordBatch`.
///
/// Pipeline: project the needed columns from the label's Lance table,
/// MVCC-dedup by `_vid`, overlay unflushed L0 buffer rows, drop deleted and
/// tombstoned rows, then map the survivors onto `output_schema`.
async fn columnar_scan_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    let label_props = uni_schema.properties.get(label);

    // Always fetch the MVCC bookkeeping columns; property columns are added
    // only when the schema declares them for this label.
    let mut lance_columns: Vec<String> = vec![
        "_vid".to_string(),
        "_deleted".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else {
            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Any projected property not declared in the schema must be resolved
    // from the overflow blob, so fetch that column too.
    let needs_overflow = projected_properties
        .iter()
        .any(|p| p == "overflow_json" || !label_props.is_some_and(|lp| lp.contains_key(p)));
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
    let lance_batch = storage
        .scan_vertex_table(label, &lance_columns_refs, None)
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    // Collapse multiple versions of the same _vid into one row (MVCC dedup;
    // helper not visible here — presumably keeps the newest _version).
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    // If Lance returned nothing we still need a schema for the L0 overlay:
    // synthesize one from the requested columns and the declared types.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("_vid", DataType::UInt64, false),
                Field::new("_deleted", DataType::Boolean, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else {
                    // Fall back to LargeBinary when the type is unknown.
                    let arrow_type = label_props
                        .and_then(|lp| lp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    let l0_batch = build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props)?;

    // None means neither source produced rows.
    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    // Drop rows flagged _deleted, then rows tombstoned in L0 buffers.
    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    map_to_output_schema(
        &filtered,
        label,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )
}
1695
/// Scans one schema-declared edge type into a single `RecordBatch`.
///
/// Mirrors `columnar_scan_vertex_batch_static` but reads the forward
/// ("fwd") delta table keyed by `eid`, with `op`-based delete filtering.
async fn columnar_scan_edge_batch_static(
    graph_ctx: &GraphExecutionContext,
    edge_type: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    let type_props = uni_schema.properties.get(edge_type);

    // Always fetch the edge identity/bookkeeping columns; property columns
    // are added only when the schema declares them for this edge type.
    let mut lance_columns: Vec<String> = vec![
        "eid".to_string(),
        "src_vid".to_string(),
        "dst_vid".to_string(),
        "op".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else {
            let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Any projected property not declared in the schema must be resolved
    // from the overflow blob, so fetch that column too.
    let needs_overflow = projected_properties
        .iter()
        .any(|p| p == "overflow_json" || !type_props.is_some_and(|tp| tp.contains_key(p)));
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
    let lance_batch = storage
        .scan_delta_table(edge_type, "fwd", &lance_columns_refs, None)
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    // Collapse multiple versions of the same eid into one row (MVCC dedup;
    // helper not visible here — presumably keeps the newest _version).
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;

    // If Lance returned nothing we still need a schema for the L0 overlay:
    // synthesize one from the requested columns and the declared types.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("eid", DataType::UInt64, false),
                Field::new("src_vid", DataType::UInt64, false),
                Field::new("dst_vid", DataType::UInt64, false),
                Field::new("op", DataType::UInt8, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(
                    col.as_str(),
                    "eid" | "src_vid" | "dst_vid" | "op" | "_version"
                ) {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else {
                    // Fall back to LargeBinary when the type is unknown.
                    let arrow_type = type_props
                        .and_then(|tp| tp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;

    // None means neither source produced rows.
    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    // Drop rows whose op marks a delete, then rows tombstoned in L0 buffers.
    let merged = filter_deleted_edge_ops(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
}
1808
1809async fn columnar_scan_schemaless_vertex_batch_static(
1816 graph_ctx: &GraphExecutionContext,
1817 label: &str,
1818 variable: &str,
1819 projected_properties: &[String],
1820 output_schema: &SchemaRef,
1821) -> DFResult<RecordBatch> {
1822 let storage = graph_ctx.storage();
1823 let l0_ctx = graph_ctx.l0_context();
1824
1825 let filter = {
1828 let mut parts = Vec::new();
1829
1830 if !label.is_empty() {
1832 if label.contains(':') {
1833 for lbl in label.split(':') {
1835 parts.push(format!("array_contains(labels, '{}')", lbl));
1836 }
1837 } else {
1838 parts.push(format!("array_contains(labels, '{}')", label));
1839 }
1840 }
1841
1842 if parts.is_empty() {
1843 None
1844 } else {
1845 Some(parts.join(" AND "))
1846 }
1847 };
1848
1849 let lance_batch = storage
1851 .scan_main_vertex_table(
1852 &["_vid", "_deleted", "labels", "props_json", "_version"],
1853 filter.as_deref(),
1854 )
1855 .await
1856 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
1857
1858 let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
1860
1861 let internal_schema = match &lance_deduped {
1864 Some(batch) => batch.schema(),
1865 None => Arc::new(Schema::new(vec![
1866 Field::new("_vid", DataType::UInt64, false),
1867 Field::new("_deleted", DataType::Boolean, false),
1868 Field::new("labels", labels_data_type(), false),
1869 Field::new("props_json", DataType::LargeBinary, true),
1870 Field::new("_version", DataType::UInt64, false),
1871 ])),
1872 };
1873
1874 let l0_batch = build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema)?;
1876
1877 let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
1879 else {
1880 return Ok(RecordBatch::new_empty(output_schema.clone()));
1881 };
1882
1883 let merged = filter_deleted_rows(&merged)?;
1885 if merged.num_rows() == 0 {
1886 return Ok(RecordBatch::new_empty(output_schema.clone()));
1887 }
1888
1889 let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
1891
1892 if filtered.num_rows() == 0 {
1893 return Ok(RecordBatch::new_empty(output_schema.clone()));
1894 }
1895
1896 map_to_schemaless_output_schema(
1898 &filtered,
1899 variable,
1900 projected_properties,
1901 output_schema,
1902 l0_ctx,
1903 )
1904}
1905
/// Builds a `RecordBatch` (matching `internal_schema`) for schemaless
/// vertices that live only in the unflushed L0 buffers.
///
/// Rows are merged across all L0 buffers in iteration order: later buffers
/// overwrite properties key-by-key, take the max version, and replace the
/// label set wholesale. Tombstoned vids are excluded.
fn build_l0_schemaless_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    internal_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    // Per-vid accumulated state: (merged properties, max version, labels).
    let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    // "A:B" requires all listed labels; empty string means no label filter.
    let label_filter: Vec<&str> = if label.is_empty() {
        vec![]
    } else if label.contains(':') {
        label.split(':').collect()
    } else {
        vec![label]
    };

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();

        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }

        let vids: Vec<Vid> = if label_filter.is_empty() {
            guard.all_vertex_vids()
        } else if label_filter.len() == 1 {
            guard.vids_for_label(label_filter[0])
        } else {
            guard.vids_with_all_labels(&label_filter)
        };

        for vid in vids {
            let vid_u64 = vid.as_u64();
            // Skip vids already known to be tombstoned.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0, Vec::new()));

            // Merge properties key-by-key; later buffers win per key.
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            // Keep the highest version seen across buffers.
            if version > entry.1 {
                entry.1 = version;
            }
            // Labels are replaced wholesale by the latest buffer carrying them.
            if let Some(labels) = guard.vertex_labels.get(&vid) {
                entry.2 = labels.clone();
            }
        }
    }

    // A tombstone in a later buffer can target a vid collected from an
    // earlier buffer, so purge once more after the full pass.
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Sort for a deterministic row order.
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    for field in internal_schema.fields() {
        match field.name().as_str() {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "labels" => {
                let mut labels_builder = ListBuilder::new(StringBuilder::new());
                for vid_u64 in &vids {
                    let (_, _, labels) = &vid_data[vid_u64];
                    let values = labels_builder.values();
                    for lbl in labels {
                        values.append_value(lbl);
                    }
                    labels_builder.append(true);
                }
                columns.push(Arc::new(labels_builder.finish()));
            }
            "props_json" => {
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _, _) = &vid_data[vid_u64];
                    if props.is_empty() {
                        builder.append_null();
                    } else {
                        // Render the property bag as a JSON object, then
                        // re-encode it into the cypher binary value format.
                        let json_obj: serde_json::Value = {
                            let mut map = serde_json::Map::new();
                            for (k, v) in props {
                                let json_val: serde_json::Value = v.clone().into();
                                map.insert(k.clone(), json_val);
                            }
                            serde_json::Value::Object(map)
                        };
                        match encode_cypher_value(&json_obj) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_deleted" => {
                // Tombstoned vids were removed above, so every row is live.
                columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
                    false;
                    num_rows
                ])));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            _ => {
                // Unknown schema columns are filled with nulls.
                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
2051
2052fn map_to_schemaless_output_schema(
2059 batch: &RecordBatch,
2060 _variable: &str,
2061 projected_properties: &[String],
2062 output_schema: &SchemaRef,
2063 l0_ctx: &crate::query::df_graph::L0Context,
2064) -> DFResult<RecordBatch> {
2065 if batch.num_rows() == 0 {
2066 return Ok(RecordBatch::new_empty(output_schema.clone()));
2067 }
2068
2069 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
2070
2071 let vid_col = batch
2073 .column_by_name("_vid")
2074 .ok_or_else(|| {
2075 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
2076 })?
2077 .clone();
2078 let vid_arr = vid_col
2079 .as_any()
2080 .downcast_ref::<UInt64Array>()
2081 .ok_or_else(|| {
2082 datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
2083 })?;
2084 columns.push(vid_col.clone());
2085
2086 let labels_col = batch.column_by_name("labels");
2088 let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
2089
2090 let mut labels_builder = ListBuilder::new(StringBuilder::new());
2091 for i in 0..vid_arr.len() {
2092 let vid_u64 = vid_arr.value(i);
2093 let vid = Vid::from(vid_u64);
2094
2095 let mut row_labels: Vec<String> = Vec::new();
2097 if let Some(arr) = labels_arr
2098 && !arr.is_null(i)
2099 {
2100 let list_val = arr.value(i);
2101 if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
2102 for j in 0..str_arr.len() {
2103 if !str_arr.is_null(j) {
2104 row_labels.push(str_arr.value(j).to_string());
2105 }
2106 }
2107 }
2108 }
2109
2110 for l0 in l0_ctx.iter_l0_buffers() {
2112 let guard = l0.read();
2113 if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
2114 for lbl in l0_labels {
2115 if !row_labels.contains(lbl) {
2116 row_labels.push(lbl.clone());
2117 }
2118 }
2119 }
2120 }
2121
2122 let values = labels_builder.values();
2123 for lbl in &row_labels {
2124 values.append_value(lbl);
2125 }
2126 labels_builder.append(true);
2127 }
2128 columns.push(Arc::new(labels_builder.finish()));
2129
2130 let props_col = batch.column_by_name("props_json");
2132 let props_arr =
2133 props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2134
2135 for prop in projected_properties {
2136 if prop == "_all_props" {
2137 let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
2140 let guard = l0.read();
2141 !guard.vertex_properties.is_empty()
2142 });
2143 if !any_l0_has_vertex_props {
2144 match props_col {
2145 Some(col) => columns.push(col.clone()),
2146 None => {
2147 columns.push(arrow_array::new_null_array(
2148 &DataType::LargeBinary,
2149 batch.num_rows(),
2150 ));
2151 }
2152 }
2153 } else {
2154 let col = build_all_props_column_with_l0_overlay(
2155 batch.num_rows(),
2156 vid_arr,
2157 props_arr,
2158 l0_ctx,
2159 );
2160 columns.push(col);
2161 }
2162 } else {
2163 let expected_type = output_schema
2168 .field_with_name(&format!("{_variable}.{prop}"))
2169 .map(|f| f.data_type().clone())
2170 .unwrap_or(DataType::LargeBinary);
2171
2172 if expected_type == DataType::LargeBinary {
2173 let col = build_overflow_property_column(
2174 batch.num_rows(),
2175 vid_arr,
2176 props_arr,
2177 prop,
2178 l0_ctx,
2179 );
2180 columns.push(col);
2181 } else {
2182 let mut prop_values: HashMap<Vid, Properties> = HashMap::new();
2184 for i in 0..batch.num_rows() {
2185 let vid = Vid::from(vid_arr.value(i));
2186 let resolved =
2187 resolve_l0_property(&vid, prop, l0_ctx)
2188 .flatten()
2189 .or_else(|| {
2190 extract_from_overflow_blob(props_arr, i, prop).and_then(|bytes| {
2191 uni_common::cypher_value_codec::decode(&bytes).ok()
2192 })
2193 });
2194 if let Some(val) = resolved {
2195 prop_values.insert(vid, HashMap::from([(prop.to_string(), val)]));
2196 }
2197 }
2198 let vids: Vec<Vid> = (0..batch.num_rows())
2199 .map(|i| Vid::from(vid_arr.value(i)))
2200 .collect();
2201 let col = build_property_column_static(&vids, &prop_values, prop, &expected_type)
2202 .unwrap_or_else(|_| {
2203 arrow_array::new_null_array(&expected_type, batch.num_rows())
2204 });
2205 columns.push(col);
2206 }
2207 }
2208 }
2209
2210 RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2211}
2212
2213pub(crate) fn get_property_value(
2215 vid: &Vid,
2216 props_map: &HashMap<Vid, Properties>,
2217 prop_name: &str,
2218) -> Option<Value> {
2219 if prop_name == "_all_props" {
2220 return props_map.get(vid).map(|p| {
2221 let map: HashMap<String, Value> =
2222 p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2223 Value::Map(map)
2224 });
2225 }
2226 props_map
2227 .get(vid)
2228 .and_then(|props| props.get(prop_name))
2229 .cloned()
2230}
2231
2232pub(crate) fn encode_cypher_value(val: &serde_json::Value) -> Result<Vec<u8>, String> {
2236 let uni_val: uni_common::Value = val.clone().into();
2237 Ok(uni_common::cypher_value_codec::encode(&uni_val))
2238}
2239
/// Builds a primitive Arrow column for one property across `$vids`.
///
/// `$extractor` pulls the native value out of a `&Value` (returning
/// `Option`), `$cast` converts it to the builder's native type. Missing
/// properties and failed extractions become nulls. Evaluates to
/// `Ok(ArrayRef)` so it can be used as a match-arm expression.
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut builder = <$builder_ty>::new();
        for vid in $vids {
            match get_property_value(vid, $props_map, $prop_name) {
                Some(ref v) => {
                    if let Some(val) = $extractor(v) {
                        builder.append_value($cast(val));
                    } else {
                        builder.append_null();
                    }
                }
                None => builder.append_null(),
            }
        }
        Ok(Arc::new(builder.finish()) as ArrayRef)
    }};
}
2259
/// Builds an Arrow column of `data_type` for `prop_name` across `vids`,
/// coercing `Value`s into the target type.
///
/// One match arm per supported Arrow type; unsupported combinations fall
/// through to a stringified `Utf8` column. Missing properties and failed
/// coercions become nulls (BTIC failures are additionally logged).
pub(crate) fn build_property_column_static(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    data_type: &DataType,
) -> DFResult<ArrayRef> {
    match data_type {
        DataType::LargeBinary => {
            use arrow_array::builder::LargeBinaryBuilder;
            let mut builder = LargeBinaryBuilder::new();

            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Null) | None => builder.append_null(),
                    Some(Value::Bytes(bytes)) => {
                        builder.append_value(&bytes);
                    }
                    // A list whose elements are all u64-convertible may be a
                    // byte blob round-tripped through a JSON array (an empty
                    // list also matches this guard).
                    Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
                        let bytes: Vec<u8> = arr
                            .iter()
                            .filter_map(|v| v.as_u64().map(|n| n as u8))
                            .collect();
                        // Only treat it as a blob if it decodes as a valid
                        // cypher value; otherwise encode it as a real list.
                        if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
                            builder.append_value(&bytes);
                        } else {
                            let json_val: serde_json::Value = Value::List(arr).into();
                            match encode_cypher_value(&json_val) {
                                Ok(encoded) => builder.append_value(encoded),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                    // Temporal values are encoded directly (no JSON detour)
                    // to preserve their full precision.
                    Some(val @ Value::Temporal(_)) => {
                        builder.append_value(uni_common::cypher_value_codec::encode(&val));
                    }
                    Some(val) => {
                        let json_val: serde_json::Value = val.into();
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        // Binary columns hold msgpack-serialized CRDTs parsed back out of
        // the value's JSON form; anything unparsable becomes null.
        DataType::Binary => {
            let mut builder = BinaryBuilder::new();
            for vid in vids {
                let bytes = get_property_value(vid, props_map, prop_name)
                    .filter(|v| !v.is_null())
                    .and_then(|v| {
                        let json_val: serde_json::Value = v.into();
                        serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
                    })
                    .and_then(|crdt| crdt.to_msgpack().ok());
                match bytes {
                    Some(b) => builder.append_value(&b),
                    None => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Utf8 => {
            let mut builder = StringBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::String(s)) => builder.append_value(s),
                    Some(Value::Null) | None => builder.append_null(),
                    // Non-string values are stringified via Display.
                    Some(other) => builder.append_value(other.to_string()),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Int64 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Int64Builder,
                |v: &Value| v.as_i64(),
                |v| v
            )
        }
        DataType::Int32 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Int32Builder,
                |v: &Value| v.as_i64(),
                |v: i64| v as i32
            )
        }
        DataType::Float64 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Float64Builder,
                |v: &Value| v.as_f64(),
                |v| v
            )
        }
        DataType::Float32 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Float32Builder,
                |v: &Value| v.as_f64(),
                |v: f64| v as f32
            )
        }
        DataType::Boolean => {
            let mut builder = BooleanBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Bool(b)) => builder.append_value(b),
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::UInt64 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                UInt64Builder,
                |v: &Value| v.as_u64(),
                |v| v
            )
        }
        // Float32 vectors (embeddings). NOTE(review): this assumes the value
        // carries exactly `dim` elements — no length check is performed here.
        DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
            let values_builder = Float32Builder::new();
            let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Vector(v)) => {
                        for val in v {
                            list_builder.values().append_value(val);
                        }
                        list_builder.append(true);
                    }
                    Some(Value::List(arr)) => {
                        for v in arr {
                            list_builder
                                .values()
                                .append_value(v.as_f64().unwrap_or(0.0) as f32);
                        }
                        list_builder.append(true);
                    }
                    _ => {
                        // FixedSizeList still needs `dim` child slots for a
                        // null entry.
                        for _ in 0..*dim {
                            list_builder.values().append_null();
                        }
                        list_builder.append(false);
                    }
                }
            }
            Ok(Arc::new(list_builder.finish()))
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(tv)) => match tv {
                        uni_common::TemporalValue::DateTime {
                            nanos_since_epoch, ..
                        }
                        | uni_common::TemporalValue::LocalDateTime {
                            nanos_since_epoch, ..
                        } => {
                            builder.append_value(nanos_since_epoch);
                        }
                        // Dates coerce to midnight of that day.
                        uni_common::TemporalValue::Date { days_since_epoch } => {
                            builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
                        }
                        _ => builder.append_null(),
                    },
                    Some(Value::String(s)) => match parse_datetime_utc(&s) {
                        Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
                        Err(_) => builder.append_null(),
                    },
                    // Bare ints are taken as nanoseconds since epoch.
                    Some(Value::Int(n)) => {
                        builder.append_value(n);
                    }
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Date32 => {
            let mut builder = Date32Builder::new();
            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
                        builder.append_value(days_since_epoch);
                    }
                    Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
                        Ok(d) => builder.append_value((d - epoch).num_days() as i32),
                        Err(_) => builder.append_null(),
                    },
                    // Bare ints are taken as days since epoch.
                    Some(Value::Int(n)) => {
                        builder.append_value(n as i32);
                    }
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Time64(TimeUnit::Nanosecond) => {
            let mut builder = Time64NanosecondBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(
                        uni_common::TemporalValue::LocalTime {
                            nanos_since_midnight,
                        }
                        | uni_common::TemporalValue::Time {
                            nanos_since_midnight,
                            ..
                        },
                    )) => {
                        builder.append_value(nanos_since_midnight);
                    }
                    Some(Value::Temporal(_)) => builder.append_null(),
                    Some(Value::String(s)) => {
                        // Accept "HH:MM:SS.fff" first, then plain "HH:MM:SS".
                        match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
                            .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
                        {
                            Ok(t) => {
                                let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
                                    + t.nanosecond() as i64;
                                builder.append_value(nanos);
                            }
                            Err(_) => builder.append_null(),
                        }
                    }
                    // Bare ints are taken as nanoseconds since midnight.
                    Some(Value::Int(n)) => {
                        builder.append_value(n);
                    }
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Interval(IntervalUnit::MonthDayNano) => {
            let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
                Vec::with_capacity(vids.len());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(uni_common::TemporalValue::Duration {
                        months,
                        days,
                        nanos,
                    })) => {
                        values.push(Some(arrow::datatypes::IntervalMonthDayNano {
                            months: months as i32,
                            days: days as i32,
                            nanoseconds: nanos,
                        }));
                    }
                    // Bare ints have no defined unit here and are dropped.
                    Some(Value::Int(_n)) => {
                        values.push(None);
                    }
                    _ => values.push(None),
                }
            }
            let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
            Ok(Arc::new(arr))
        }
        DataType::List(inner_field) => {
            build_list_property_column(vids, props_map, prop_name, inner_field)
        }
        DataType::Struct(fields) => {
            build_struct_property_column(vids, props_map, prop_name, fields)
        }
        // 24-byte fixed binary is the encoded BTIC temporal interval format.
        DataType::FixedSizeBinary(24) => {
            use arrow_array::builder::FixedSizeBinaryBuilder;
            const BTIC_LEN: i32 = 24;
            let mut builder = FixedSizeBinaryBuilder::with_capacity(vids.len(), BTIC_LEN);
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => {
                        match uni_btic::Btic::new(lo, hi, meta) {
                            Ok(b) => {
                                builder
                                    .append_value(uni_btic::encode::encode(&b))
                                    .map_err(arrow_err)?;
                            }
                            Err(e) => {
                                // Invalid BTIC triples are logged and nulled
                                // rather than failing the whole column.
                                tracing::warn!(
                                    "BTIC coercion failed for property '{}': invalid value (lo={}, hi={}, meta={:#x}): {}",
                                    prop_name,
                                    lo,
                                    hi,
                                    meta,
                                    e
                                );
                                builder.append_null()
                            }
                        }
                    }
                    Some(Value::String(s)) => match uni_btic::parse::parse_btic_literal(&s) {
                        Ok(b) => {
                            builder
                                .append_value(uni_btic::encode::encode(&b))
                                .map_err(arrow_err)?;
                        }
                        Err(e) => {
                            tracing::warn!(
                                "BTIC coercion failed for property '{}': '{}' is not a valid BTIC literal: {}",
                                prop_name,
                                s,
                                e
                            );
                            builder.append_null()
                        }
                    },
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        _ => {
            // Fallback for unsupported target types: stringify via Display.
            let mut builder = StringBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Null) | None => builder.append_null(),
                    Some(other) => builder.append_value(other.to_string()),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
    }
}
2612
2613fn build_list_property_column(
2615 vids: &[Vid],
2616 props_map: &HashMap<Vid, Properties>,
2617 prop_name: &str,
2618 inner_field: &Arc<Field>,
2619) -> DFResult<ArrayRef> {
2620 match inner_field.data_type() {
2621 DataType::Utf8 => {
2622 let mut builder = ListBuilder::new(StringBuilder::new());
2623 for vid in vids {
2624 match get_property_value(vid, props_map, prop_name) {
2625 Some(Value::List(arr)) => {
2626 for v in arr {
2627 match v {
2628 Value::String(s) => builder.values().append_value(s),
2629 Value::Null => builder.values().append_null(),
2630 other => builder.values().append_value(format!("{other:?}")),
2631 }
2632 }
2633 builder.append(true);
2634 }
2635 _ => builder.append(false),
2636 }
2637 }
2638 Ok(Arc::new(builder.finish()))
2639 }
2640 DataType::Int64 => {
2641 let mut builder = ListBuilder::new(Int64Builder::new());
2642 for vid in vids {
2643 match get_property_value(vid, props_map, prop_name) {
2644 Some(Value::List(arr)) => {
2645 for v in arr {
2646 match v.as_i64() {
2647 Some(n) => builder.values().append_value(n),
2648 None => builder.values().append_null(),
2649 }
2650 }
2651 builder.append(true);
2652 }
2653 _ => builder.append(false),
2654 }
2655 }
2656 Ok(Arc::new(builder.finish()))
2657 }
2658 DataType::Float64 => {
2659 let mut builder = ListBuilder::new(Float64Builder::new());
2660 for vid in vids {
2661 match get_property_value(vid, props_map, prop_name) {
2662 Some(Value::List(arr)) => {
2663 for v in arr {
2664 match v.as_f64() {
2665 Some(n) => builder.values().append_value(n),
2666 None => builder.values().append_null(),
2667 }
2668 }
2669 builder.append(true);
2670 }
2671 _ => builder.append(false),
2672 }
2673 }
2674 Ok(Arc::new(builder.finish()))
2675 }
2676 DataType::Boolean => {
2677 let mut builder = ListBuilder::new(BooleanBuilder::new());
2678 for vid in vids {
2679 match get_property_value(vid, props_map, prop_name) {
2680 Some(Value::List(arr)) => {
2681 for v in arr {
2682 match v.as_bool() {
2683 Some(b) => builder.values().append_value(b),
2684 None => builder.values().append_null(),
2685 }
2686 }
2687 builder.append(true);
2688 }
2689 _ => builder.append(false),
2690 }
2691 }
2692 Ok(Arc::new(builder.finish()))
2693 }
2694 DataType::Struct(fields) => {
2695 build_list_of_structs_column(vids, props_map, prop_name, fields)
2697 }
2698 _ => {
2700 let mut builder = ListBuilder::new(StringBuilder::new());
2701 for vid in vids {
2702 match get_property_value(vid, props_map, prop_name) {
2703 Some(Value::List(arr)) => {
2704 for v in arr {
2705 match v {
2706 Value::Null => builder.values().append_null(),
2707 other => builder.values().append_value(format!("{other:?}")),
2708 }
2709 }
2710 builder.append(true);
2711 }
2712 _ => builder.append(false),
2713 }
2714 }
2715 Ok(Arc::new(builder.finish()))
2716 }
2717 }
2718}
2719
/// Builds an Arrow `List<Struct>` column for `prop_name` across `vids`.
///
/// Two input shapes become list rows:
/// - `Value::List(..)` — map elements become struct items; non-map elements
///   are silently dropped. A list containing no map elements yields a NULL
///   row rather than an empty list — NOTE(review): confirm callers never
///   need to distinguish `[]` from NULL here.
/// - `Value::Map(..)` — the map is exploded into `{key, value}` structs,
///   one item per entry.
///
/// Any other value (or a missing property) yields a NULL row.
fn build_list_of_structs_column(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    fields: &Fields,
) -> DFResult<ArrayRef> {
    use arrow_array::StructArray;

    // Raw property values, one per vid, in vid order.
    let values: Vec<Option<Value>> = vids
        .iter()
        .map(|vid| get_property_value(vid, props_map, prop_name))
        .collect();

    // Normalize each row to `Option<Vec<object>>` (None == NULL list row).
    let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
        .iter()
        .map(|val| match val {
            Some(Value::List(arr)) => {
                // Keep only map elements; anything else is dropped.
                let objs: Vec<HashMap<String, Value>> = arr
                    .iter()
                    .filter_map(|v| {
                        if let Value::Map(m) = v {
                            Some(m.clone())
                        } else {
                            None
                        }
                    })
                    .collect();
                if objs.is_empty() { None } else { Some(objs) }
            }
            Some(Value::Map(obj)) => {
                // A bare map is exploded into `{key, value}` pair structs.
                let kv_pairs: Vec<HashMap<String, Value>> = obj
                    .iter()
                    .map(|(k, v)| {
                        let mut m = HashMap::new();
                        m.insert("key".to_string(), Value::String(k.clone()));
                        m.insert("value".to_string(), v.clone());
                        m
                    })
                    .collect();
                Some(kv_pairs)
            }
            _ => None,
        })
        .collect();

    // Total struct items across all non-null rows; pre-sizes child builders.
    let total_items: usize = rows
        .iter()
        .filter_map(|r| r.as_ref())
        .map(|v| v.len())
        .sum();

    // One flattened child array per struct field. Iteration order
    // (rows in vid order, items in list order) must match the offsets
    // computed below — keep these two loops in lockstep.
    let child_arrays: Vec<ArrayRef> = fields
        .iter()
        .map(|field| {
            let field_name = field.name();
            match field.data_type() {
                DataType::Utf8 => {
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::String(s)) => builder.append_value(s),
                            Some(Value::Null) | None => builder.append_null(),
                            // Non-string values fall back to Debug formatting.
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int64 => {
                    let mut builder = Int64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_i64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Float64 => {
                    let mut builder = Float64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_f64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                _ => {
                    // Fallback for unsupported field types: Debug-stringify.
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
            }
        })
        .collect();

    let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
        .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    // List offsets + validity: NULL rows contribute zero-length slices, so
    // the running offset is unchanged for them.
    let mut offsets = Vec::with_capacity(vids.len() + 1);
    let mut nulls = Vec::with_capacity(vids.len());
    let mut offset = 0i32;
    offsets.push(offset);
    for row in &rows {
        match row {
            Some(objs) => {
                offset += objs.len() as i32;
                offsets.push(offset);
                nulls.push(true);
            }
            None => {
                offsets.push(offset);
                nulls.push(false);
            }
        }
    }

    let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
    let list_array = arrow_array::ListArray::try_new(
        list_field,
        arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
        Arc::new(struct_array),
        Some(arrow::buffer::NullBuffer::from(nulls)),
    )
    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    Ok(Arc::new(list_array))
}
2865
2866fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
2869 use uni_common::value::TemporalValue;
2870 let mut m = HashMap::new();
2871 match tv {
2872 TemporalValue::DateTime {
2873 nanos_since_epoch,
2874 offset_seconds,
2875 timezone_name,
2876 } => {
2877 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2878 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2879 if let Some(tz) = timezone_name {
2880 m.insert("timezone_name".into(), Value::String(tz.clone()));
2881 }
2882 }
2883 TemporalValue::LocalDateTime { nanos_since_epoch } => {
2884 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2885 }
2886 TemporalValue::Time {
2887 nanos_since_midnight,
2888 offset_seconds,
2889 } => {
2890 m.insert(
2891 "nanos_since_midnight".into(),
2892 Value::Int(*nanos_since_midnight),
2893 );
2894 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2895 }
2896 TemporalValue::LocalTime {
2897 nanos_since_midnight,
2898 } => {
2899 m.insert(
2900 "nanos_since_midnight".into(),
2901 Value::Int(*nanos_since_midnight),
2902 );
2903 }
2904 TemporalValue::Date { days_since_epoch } => {
2905 m.insert(
2906 "days_since_epoch".into(),
2907 Value::Int(*days_since_epoch as i64),
2908 );
2909 }
2910 TemporalValue::Duration {
2911 months,
2912 days,
2913 nanos,
2914 } => {
2915 m.insert("months".into(), Value::Int(*months));
2916 m.insert("days".into(), Value::Int(*days));
2917 m.insert("nanos".into(), Value::Int(*nanos));
2918 }
2919 TemporalValue::Btic { lo, hi, meta } => {
2920 m.insert("lo".into(), Value::Int(*lo));
2921 m.insert("hi".into(), Value::Int(*hi));
2922 m.insert("meta".into(), Value::Int(*meta as i64));
2923 }
2924 }
2925 m
2926}
2927
/// Builds an Arrow `Struct` column for `prop_name` across `vids`.
///
/// Temporal values are first flattened into field maps via
/// `temporal_to_struct_map`; after that, only `Value::Map` rows become valid
/// struct rows — everything else (missing, scalar, list) is NULL. Child
/// arrays are built per declared field type, with unsupported types falling
/// back to Debug-formatted strings. The child array types must match
/// `fields` exactly or `StructArray::try_new` below fails.
fn build_struct_property_column(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    fields: &Fields,
) -> DFResult<ArrayRef> {
    use arrow_array::StructArray;

    // One normalized value per vid: temporal values become maps, all other
    // values pass through unchanged.
    let values: Vec<Option<Value>> = vids
        .iter()
        .map(|vid| {
            let val = get_property_value(vid, props_map, prop_name);
            match val {
                Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
                other => other,
            }
        })
        .collect();

    // Build one child array per declared struct field; every child has
    // exactly `vids.len()` entries (nulls where the row isn't a map or the
    // field is absent/non-coercible).
    let child_arrays: Vec<ArrayRef> = fields
        .iter()
        .map(|field| {
            let field_name = field.name();
            match field.data_type() {
                DataType::Float64 => {
                    let mut builder = Float64Builder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_f64()) {
                                    Some(n) => builder.append_value(n),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Utf8 => {
                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => match obj.get(field_name) {
                                Some(Value::String(s)) => builder.append_value(s),
                                Some(Value::Null) | None => builder.append_null(),
                                // Non-string values fall back to Debug formatting.
                                Some(other) => builder.append_value(format!("{other:?}")),
                            },
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int64 => {
                    let mut builder = Int64Builder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_i64()) {
                                    Some(n) => builder.append_value(n),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Timestamp(_, _) => {
                    // NOTE(review): the stored i64 is appended as nanoseconds
                    // regardless of the declared time unit — confirm schemas
                    // only ever declare nanosecond timestamps here, otherwise
                    // StructArray::try_new will reject the mismatched type.
                    let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_i64()) {
                                    Some(n) => builder.append_value(n),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int32 => {
                    let mut builder = Int32Builder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_i64()) {
                                    // Truncating cast: values outside i32
                                    // range wrap silently.
                                    Some(n) => builder.append_value(n as i32),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Time64(_) => {
                    let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_i64()) {
                                    Some(n) => builder.append_value(n),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                _ => {
                    // Fallback for unsupported field types: Debug-stringify.
                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => match obj.get(field_name) {
                                Some(Value::Null) | None => builder.append_null(),
                                Some(other) => builder.append_value(format!("{other:?}")),
                            },
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
            }
        })
        .collect();

    // Row-level validity: only map-shaped rows are valid struct rows.
    let nulls: Vec<bool> = values
        .iter()
        .map(|v| matches!(v, Some(Value::Map(_))))
        .collect();

    let struct_array = StructArray::try_new(
        fields.clone(),
        child_arrays,
        Some(arrow::buffer::NullBuffer::from(nulls)),
    )
    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    Ok(Arc::new(struct_array))
}
3077
impl Stream for GraphScanStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the scan state machine: `Init` builds the one-shot scan future,
    /// `Executing` polls it, `Done` terminates the stream. The whole scan is
    /// materialized as a single `RecordBatch`, so at most one item is yielded.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Move the state out, parking `Done` in its place so the stream
            // stays terminal if this poll is interrupted (e.g. by a panic).
            let state = std::mem::replace(&mut self.state, GraphScanState::Done);

            match state {
                GraphScanState::Init => {
                    // Clone everything the async block needs so the future
                    // owns its inputs and does not borrow from `self`.
                    let graph_ctx = self.graph_ctx.clone();
                    let label = self.label.clone();
                    let variable = self.variable.clone();
                    let properties = self.properties.clone();
                    let is_edge_scan = self.is_edge_scan;
                    let is_schemaless = self.is_schemaless;
                    let schema = self.schema.clone();

                    let fut = async move {
                        // Abort early if the query deadline has passed.
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;

                        // Dispatch to the scan flavor this exec was built for.
                        let batch = if is_edge_scan {
                            columnar_scan_edge_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else if is_schemaless {
                            columnar_scan_schemaless_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else {
                            columnar_scan_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        };
                        Ok(Some(batch))
                    };

                    // Loop again to poll the freshly created future.
                    self.state = GraphScanState::Executing(Box::pin(fut));
                }
                GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = GraphScanState::Done;
                        // Record output row count (0 when the scan was empty).
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphScanState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back; the waker in `cx` re-polls us.
                        self.state = GraphScanState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                GraphScanState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
3159
3160impl RecordBatchStream for GraphScanStream {
3161 fn schema(&self) -> SchemaRef {
3162 self.schema.clone()
3163 }
3164}
3165
/// Unit tests covering schema construction, cypher-value codec round-trips,
/// MVCC de-duplication, L0 tombstone filtering, and output-schema mapping.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_build_vertex_schema() {
        let uni_schema = UniSchema::default();
        let schema = GraphScanExec::build_vertex_schema(
            "n",
            "Person",
            &["name".to_string(), "age".to_string()],
            &uni_schema,
        );

        // _vid + _labels + the two requested properties.
        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(3).name(), "n.age");
    }

    #[test]
    fn test_build_edge_schema() {
        let uni_schema = UniSchema::default();
        let schema =
            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);

        // _eid + both endpoints + the requested property.
        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "r._eid");
        assert_eq!(schema.field(1).name(), "r._src_vid");
        assert_eq!(schema.field(2).name(), "r._dst_vid");
        assert_eq!(schema.field(3).name(), "r.weight");
    }

    #[test]
    fn test_build_schemaless_vertex_schema() {
        let empty_schema = uni_common::core::schema::Schema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema(
            "n",
            &["name".to_string(), "age".to_string()],
            &empty_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
        assert_eq!(schema.field(1).name(), "n._labels");
        // Schemaless property columns carry raw encoded bytes.
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(schema.field(3).name(), "n.age");
        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
    }

    #[test]
    fn test_schemaless_all_scan_has_empty_label() {
        let empty_schema = uni_common::core::schema::Schema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[], &empty_schema);

        // With no properties requested, only the identity columns remain.
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
    }

    #[test]
    fn test_cypher_value_all_props_extraction() {
        // A JSON object round-trips through the cypher-value codec as a map.
        let json_obj = serde_json::json!({"age": 30, "name": "Alice"});
        let cv_bytes = encode_cypher_value(&json_obj).unwrap();

        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
        match decoded {
            uni_common::Value::Map(map) => {
                let age_val = map.get("age").unwrap();
                assert_eq!(age_val, &uni_common::Value::Int(30));
            }
            _ => panic!("Expected Map"),
        }

        // Scalars round-trip as well.
        let single_val = serde_json::json!(30);
        let single_bytes = encode_cypher_value(&single_val).unwrap();
        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
        assert_eq!(single_decoded, uni_common::Value::Int(30));
    }

    /// Builds a batch in the MVCC storage layout (_vid/_deleted/_version)
    /// plus a `name` payload column encoding "v{vid}_ver{version}" so tests
    /// can tell which physical row survived de-duplication.
    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let names: Vec<String> = vids
            .iter()
            .zip(versions.iter())
            .map(|(v, ver)| format!("v{}_ver{}", v, ver))
            .collect();
        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vids.to_vec())),
                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
                Arc::new(UInt64Array::from(versions.to_vec())),
                Arc::new(name_arr),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_mvcc_dedup_multiple_versions() {
        // vid 1 appears with versions 3, 1, 5 and vid 2 with versions 2, 4;
        // only the highest version of each vid must survive.
        let batch = make_mvcc_batch(
            &[1, 1, 1, 2, 2],
            &[3, 1, 5, 2, 4],
            &[false, false, false, false, false],
        );

        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let ver_col = result
            .column_by_name("_version")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let name_col = result
            .column_by_name("name")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();

        // Payload columns must come from the winning (max-version) rows.
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(ver_col.value(0), 5);
        assert_eq!(name_col.value(0), "v1_ver5");

        assert_eq!(vid_col.value(1), 2);
        assert_eq!(ver_col.value(1), 4);
        assert_eq!(name_col.value(1), "v2_ver4");
    }

    #[test]
    fn test_mvcc_dedup_single_rows() {
        // Already-unique vids pass through untouched.
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    #[test]
    fn test_mvcc_dedup_empty() {
        // An empty batch must dedup to an empty batch, not error.
        let batch = make_mvcc_batch(&[], &[], &[]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    #[test]
    fn test_filter_l0_tombstones_removes_tombstoned() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);

        // Tombstone vid 2 in the current L0 buffer.
        let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
        let l0_buf = std::sync::Arc::new(parking_lot::RwLock::new(l0));
        l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));

        let l0_ctx = L0Context {
            current_l0: Some(l0_buf),
            transaction_l0: None,
            pending_flush_l0s: vec![],
        };

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 2);

        // Only the non-tombstoned vids remain, order preserved.
        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(vid_col.value(1), 3);
    }

    #[test]
    fn test_filter_l0_tombstones_none() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let l0_ctx = L0Context::default();

        // With no tombstones the batch passes through unchanged.
        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    #[test]
    fn test_map_to_output_schema_basic() {
        use crate::query::df_graph::L0Context;

        // Storage-side batch with internal columns plus one property.
        let lance_schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let name_arr: arrow_array::StringArray =
            vec![Some("Alice"), Some("Bob")].into_iter().collect();
        let batch = RecordBatch::try_new(
            lance_schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2])),
                Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
                Arc::new(UInt64Array::from(vec![1u64, 1])),
                Arc::new(name_arr),
            ],
        )
        .unwrap();

        // Desired variable-prefixed output schema (internal columns dropped).
        let output_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n._labels", labels_data_type(), true),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        let l0_ctx = L0Context::default();
        let result = map_to_output_schema(
            &batch,
            "Person",
            "n",
            &["name".to_string()],
            &output_schema,
            &l0_ctx,
        )
        .unwrap();

        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.schema().fields().len(), 3);
        assert_eq!(result.schema().field(0).name(), "n._vid");
        assert_eq!(result.schema().field(1).name(), "n._labels");
        assert_eq!(result.schema().field(2).name(), "n.name");

        let name_col = result
            .column(2)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
    }
}