1use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30 BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31 Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32 Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
/// DataFusion leaf operator that scans graph vertices or edges from storage.
///
/// Built through the `new_*_scan` constructors and refined with the `with_*`
/// builder methods; `execute` copies these fields into a `GraphScanStream`.
pub struct GraphScanExec {
    /// Shared execution context; constructors read the storage schema from it.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label (vertex scans) or edge type (edge scans). Multi-label scans store
    /// the labels joined with `:`; the schemaless all-vertex scan stores `""`.
    label: String,

    /// Query variable; used as the prefix of every output column name.
    variable: String,

    /// Property names projected into the output, one column each.
    projected_properties: Vec<String>,

    /// Predicate pushed into the scan (rendered as `filter=<pushed>`).
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// Optional explicit vertex-id list, set via `with_vid_list_filter`.
    vid_list_filter: Option<Vec<u64>>,

    /// Extra storage-level filter string, set via `with_extra_lance_filter`.
    /// NOTE(review): presumably a Lance SQL predicate — confirm at call sites.
    extra_lance_filter: Option<String>,

    /// Extra predicate set via `with_extra_runtime_filter`; forwarded to the stream.
    extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,

    /// `true` for edge scans (output starts with `_eid/_src_vid/_dst_vid`).
    is_edge_scan: bool,

    /// `true` for the schemaless / multi-label / all-vertex scan variants.
    is_schemaless: bool,

    /// Arrow output schema computed by the constructor.
    schema: SchemaRef,

    /// Plan properties derived from `schema` via `compute_plan_properties`.
    properties: Arc<PlanProperties>,

    /// Per-partition execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
127
128impl fmt::Debug for GraphScanExec {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 f.debug_struct("GraphScanExec")
131 .field("label", &self.label)
132 .field("variable", &self.variable)
133 .field("projected_properties", &self.projected_properties)
134 .field(
135 "vid_list_filter_len",
136 &self.vid_list_filter.as_ref().map(Vec::len),
137 )
138 .field("is_edge_scan", &self.is_edge_scan)
139 .finish()
140 }
141}
142
143impl GraphScanExec {
144 pub fn with_vid_list_filter(mut self, vids: Vec<u64>) -> Self {
149 self.vid_list_filter = Some(vids);
150 self
151 }
152
153 pub fn with_extra_lance_filter(mut self, filter: String) -> Self {
157 self.extra_lance_filter = Some(filter);
158 self
159 }
160
161 pub fn with_extra_runtime_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
165 self.extra_runtime_filter = Some(filter);
166 self
167 }
168
169 pub fn has_extra_lance_filter(&self) -> bool {
172 self.extra_lance_filter.is_some()
173 }
174
175 pub(crate) async fn execute_with_vid_filter(&self, vids: &[u64]) -> DFResult<RecordBatch> {
185 if self.is_edge_scan {
186 return Err(datafusion::error::DataFusionError::Plan(
187 "execute_with_vid_filter not supported for edge scans".into(),
188 ));
189 }
190 if self.is_schemaless {
191 columnar_scan_schemaless_vertex_batch_static(
192 &self.graph_ctx,
193 &self.label,
194 &self.variable,
195 &self.projected_properties,
196 &self.schema,
197 &self.filter,
198 Some(vids),
199 self.extra_lance_filter.as_deref(),
200 self.extra_runtime_filter.as_ref(),
201 )
202 .await
203 } else {
204 columnar_scan_vertex_batch_static(
205 &self.graph_ctx,
206 &self.label,
207 &self.variable,
208 &self.projected_properties,
209 &self.schema,
210 &self.filter,
211 Some(vids),
212 self.extra_lance_filter.as_deref(),
213 self.extra_runtime_filter.as_ref(),
214 )
215 .await
216 }
217 }
218}
219
220impl GraphScanExec {
221 pub fn new_vertex_scan(
226 graph_ctx: Arc<GraphExecutionContext>,
227 label: impl Into<String>,
228 variable: impl Into<String>,
229 projected_properties: Vec<String>,
230 filter: Option<Arc<dyn PhysicalExpr>>,
231 ) -> Self {
232 let label = label.into();
233 let variable = variable.into();
234
235 let uni_schema = graph_ctx.storage().schema_manager().schema();
237 let schema =
238 Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
239
240 let properties = compute_plan_properties(schema.clone());
241
242 Self {
243 graph_ctx,
244 label,
245 variable,
246 projected_properties,
247 filter,
248 vid_list_filter: None,
249 extra_lance_filter: None,
250 extra_runtime_filter: None,
251 is_edge_scan: false,
252 is_schemaless: false,
253 schema,
254 properties,
255 metrics: ExecutionPlanMetricsSet::new(),
256 }
257 }
258
259 pub fn new_schemaless_vertex_scan(
265 graph_ctx: Arc<GraphExecutionContext>,
266 label_name: impl Into<String>,
267 variable: impl Into<String>,
268 projected_properties: Vec<String>,
269 filter: Option<Arc<dyn PhysicalExpr>>,
270 ) -> Self {
271 let label = label_name.into();
272 let variable = variable.into();
273
274 let projected_properties: Vec<String> = projected_properties
279 .into_iter()
280 .filter(|p| p != "_vid" && p != "_labels")
281 .collect();
282
283 let uni_schema = graph_ctx.storage().schema_manager().schema();
284 let schema =
285 Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
286 let properties = compute_plan_properties(schema.clone());
287
288 Self {
289 graph_ctx,
290 label,
291 variable,
292 projected_properties,
293 filter,
294 vid_list_filter: None,
295 extra_lance_filter: None,
296 extra_runtime_filter: None,
297 is_edge_scan: false,
298 is_schemaless: true,
299 schema,
300 properties,
301 metrics: ExecutionPlanMetricsSet::new(),
302 }
303 }
304
305 pub fn new_multi_label_vertex_scan(
310 graph_ctx: Arc<GraphExecutionContext>,
311 labels: Vec<String>,
312 variable: impl Into<String>,
313 projected_properties: Vec<String>,
314 filter: Option<Arc<dyn PhysicalExpr>>,
315 ) -> Self {
316 let variable = variable.into();
317 let projected_properties: Vec<String> = projected_properties
318 .into_iter()
319 .filter(|p| p != "_vid" && p != "_labels")
320 .collect();
321 let uni_schema = graph_ctx.storage().schema_manager().schema();
322 let schema =
323 Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
324 let properties = compute_plan_properties(schema.clone());
325
326 let encoded_labels = labels.join(":");
328
329 Self {
330 graph_ctx,
331 label: encoded_labels,
332 variable,
333 projected_properties,
334 filter,
335 vid_list_filter: None,
336 extra_lance_filter: None,
337 extra_runtime_filter: None,
338 is_edge_scan: false,
339 is_schemaless: true,
340 schema,
341 properties,
342 metrics: ExecutionPlanMetricsSet::new(),
343 }
344 }
345
346 pub fn new_schemaless_all_scan(
352 graph_ctx: Arc<GraphExecutionContext>,
353 variable: impl Into<String>,
354 projected_properties: Vec<String>,
355 filter: Option<Arc<dyn PhysicalExpr>>,
356 ) -> Self {
357 let variable = variable.into();
358 let projected_properties: Vec<String> = projected_properties
359 .into_iter()
360 .filter(|p| p != "_vid" && p != "_labels")
361 .collect();
362
363 let uni_schema = graph_ctx.storage().schema_manager().schema();
364 let schema =
365 Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
366 let properties = compute_plan_properties(schema.clone());
367
368 Self {
369 graph_ctx,
370 label: String::new(), variable,
372 projected_properties,
373 filter,
374 vid_list_filter: None,
375 extra_lance_filter: None,
376 extra_runtime_filter: None,
377 is_edge_scan: false,
378 is_schemaless: true,
379 schema,
380 properties,
381 metrics: ExecutionPlanMetricsSet::new(),
382 }
383 }
384
385 fn build_schemaless_vertex_schema(
391 variable: &str,
392 properties: &[String],
393 uni_schema: &uni_common::core::schema::Schema,
394 ) -> SchemaRef {
395 let mut merged: std::collections::HashMap<&str, &uni_common::core::schema::PropertyMeta> =
397 std::collections::HashMap::new();
398 for label_props in uni_schema.properties.values() {
399 for (name, meta) in label_props {
400 merged.entry(name.as_str()).or_insert(meta);
401 }
402 }
403
404 let mut fields = vec![
405 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
406 Field::new(format!("{}._labels", variable), labels_data_type(), true),
407 ];
408
409 for prop in properties {
410 let col_name = format!("{}.{}", variable, prop);
411 let uni_type = merged.get(prop.as_str()).map(|meta| &meta.r#type);
412 let arrow_type = uni_type
413 .map(|t| t.to_arrow())
414 .unwrap_or(DataType::LargeBinary);
415 fields.push(property_field(&col_name, arrow_type, uni_type));
416 }
417
418 Arc::new(Schema::new(fields))
419 }
420
421 pub fn new_edge_scan(
426 graph_ctx: Arc<GraphExecutionContext>,
427 edge_type: impl Into<String>,
428 variable: impl Into<String>,
429 projected_properties: Vec<String>,
430 filter: Option<Arc<dyn PhysicalExpr>>,
431 ) -> Self {
432 let label = edge_type.into();
433 let variable = variable.into();
434
435 let uni_schema = graph_ctx.storage().schema_manager().schema();
437 let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
438
439 let properties = compute_plan_properties(schema.clone());
440
441 Self {
442 graph_ctx,
443 label,
444 variable,
445 projected_properties,
446 filter,
447 vid_list_filter: None,
448 extra_lance_filter: None,
449 extra_runtime_filter: None,
450 is_edge_scan: true,
451 is_schemaless: false,
452 schema,
453 properties,
454 metrics: ExecutionPlanMetricsSet::new(),
455 }
456 }
457
458 fn build_vertex_schema(
460 variable: &str,
461 label: &str,
462 properties: &[String],
463 uni_schema: &UniSchema,
464 ) -> SchemaRef {
465 let mut fields = vec![
466 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
467 Field::new(format!("{}._labels", variable), labels_data_type(), true),
468 ];
469 let label_props = uni_schema.properties.get(label);
470 for prop in properties {
471 let col_name = format!("{}.{}", variable, prop);
472 let arrow_type = resolve_property_type(prop, label_props);
473 let uni_type = label_props
474 .and_then(|props| props.get(prop))
475 .map(|m| &m.r#type);
476 fields.push(property_field(&col_name, arrow_type, uni_type));
477 }
478 Arc::new(Schema::new(fields))
479 }
480
481 fn build_edge_schema(
483 variable: &str,
484 edge_type: &str,
485 properties: &[String],
486 uni_schema: &UniSchema,
487 ) -> SchemaRef {
488 let mut fields = vec![
489 Field::new(format!("{}._eid", variable), DataType::UInt64, false),
490 Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
491 Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
492 ];
493 let edge_props = uni_schema.properties.get(edge_type);
494 for prop in properties {
495 let col_name = format!("{}.{}", variable, prop);
496 let arrow_type = resolve_property_type(prop, edge_props);
497 let uni_type = edge_props
498 .and_then(|props| props.get(prop))
499 .map(|m| &m.r#type);
500 fields.push(property_field(&col_name, arrow_type, uni_type));
501 }
502 Arc::new(Schema::new(fields))
503 }
504}
505
506impl DisplayAs for GraphScanExec {
507 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
508 let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
509 write!(
510 f,
511 "GraphScanExec: {}={}, properties={:?}",
512 scan_type, self.label, self.projected_properties
513 )?;
514 if self.filter.is_some() {
515 write!(f, ", filter=<pushed>")?;
516 }
517 Ok(())
518 }
519}
520
521impl ExecutionPlan for GraphScanExec {
522 fn name(&self) -> &str {
523 "GraphScanExec"
524 }
525
526 fn as_any(&self) -> &dyn Any {
527 self
528 }
529
530 fn schema(&self) -> SchemaRef {
531 self.schema.clone()
532 }
533
534 fn properties(&self) -> &Arc<PlanProperties> {
535 &self.properties
536 }
537
538 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
539 vec![]
540 }
541
542 fn with_new_children(
543 self: Arc<Self>,
544 children: Vec<Arc<dyn ExecutionPlan>>,
545 ) -> DFResult<Arc<dyn ExecutionPlan>> {
546 if children.is_empty() {
547 Ok(self)
548 } else {
549 Err(datafusion::error::DataFusionError::Plan(
550 "GraphScanExec does not accept children".to_string(),
551 ))
552 }
553 }
554
555 fn execute(
556 &self,
557 partition: usize,
558 _context: Arc<TaskContext>,
559 ) -> DFResult<SendableRecordBatchStream> {
560 let metrics = BaselineMetrics::new(&self.metrics, partition);
561
562 Ok(Box::pin(GraphScanStream::new(
563 self.graph_ctx.clone(),
564 self.label.clone(),
565 self.variable.clone(),
566 self.projected_properties.clone(),
567 self.is_edge_scan,
568 self.is_schemaless,
569 self.filter.clone(),
570 self.vid_list_filter.clone(),
571 self.extra_lance_filter.clone(),
572 self.extra_runtime_filter.clone(),
573 self.schema.clone(),
574 metrics,
575 )))
576 }
577
578 fn metrics(&self) -> Option<MetricsSet> {
579 Some(self.metrics.clone_inner())
580 }
581}
582
/// Lifecycle state of a [`GraphScanStream`].
enum GraphScanState {
    /// Initial state, assigned by `GraphScanStream::new`.
    Init,
    /// A scan future is in flight; it resolves to an optional batch.
    /// NOTE(review): the meaning of `Ok(None)` is defined by the poll
    /// implementation, which is outside this view — confirm.
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Terminal state.
    Done,
}
592
/// Record-batch stream produced by `GraphScanExec::execute`.
///
/// Holds an owned copy of the plan's scan configuration plus the
/// [`GraphScanState`] machine.
struct GraphScanStream {
    /// Shared execution context (storage, L0 buffers, ...).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label / edge type / `:`-joined labels / `""`, copied from the plan.
    label: String,

    /// Query variable used as the output column prefix.
    variable: String,

    /// Projected property names (the plan's `projected_properties`).
    properties: Vec<String>,

    /// Whether this streams edges rather than vertices.
    is_edge_scan: bool,

    /// Whether the schemaless vertex path is used.
    is_schemaless: bool,

    /// Predicate pushed into the scan.
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// Optional explicit vertex-id restriction.
    vid_list_filter: Option<Vec<u64>>,

    /// Extra storage-level filter string.
    extra_lance_filter: Option<String>,

    /// Extra runtime predicate.
    extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,

    /// Output schema (the plan's schema).
    schema: SchemaRef,

    /// Current position in the scan lifecycle.
    state: GraphScanState,

    /// Baseline metrics for this partition.
    metrics: BaselineMetrics,
}
639
640impl GraphScanStream {
641 #[expect(clippy::too_many_arguments)]
643 fn new(
644 graph_ctx: Arc<GraphExecutionContext>,
645 label: String,
646 variable: String,
647 properties: Vec<String>,
648 is_edge_scan: bool,
649 is_schemaless: bool,
650 filter: Option<Arc<dyn PhysicalExpr>>,
651 vid_list_filter: Option<Vec<u64>>,
652 extra_lance_filter: Option<String>,
653 extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,
654 schema: SchemaRef,
655 metrics: BaselineMetrics,
656 ) -> Self {
657 Self {
658 graph_ctx,
659 label,
660 variable,
661 properties,
662 is_edge_scan,
663 is_schemaless,
664 filter,
665 vid_list_filter,
666 extra_lance_filter,
667 extra_runtime_filter,
668 schema,
669 state: GraphScanState::Init,
670 metrics,
671 }
672 }
673}
674
675pub(crate) fn resolve_property_type(
680 prop: &str,
681 schema_props: Option<
682 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
683 >,
684) -> DataType {
685 if prop == "overflow_json" {
686 DataType::LargeBinary
687 } else if prop == "_created_at" || prop == "_updated_at" {
688 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
692 } else {
693 schema_props
694 .and_then(|props| props.get(prop))
695 .map(|meta| meta.r#type.to_arrow())
696 .unwrap_or(DataType::LargeBinary)
697 }
698}
699
700pub(crate) fn property_field(
709 col_name: &str,
710 arrow_type: DataType,
711 uni_type: Option<&uni_common::DataType>,
712) -> Field {
713 let field = Field::new(col_name, arrow_type, true);
714 if matches!(uni_type, Some(uni_common::DataType::Bytes)) {
715 field.with_metadata(std::collections::HashMap::from([(
716 "uni_raw_bytes".to_string(),
717 "true".to_string(),
718 )]))
719 } else {
720 field
721 }
722}
723
/// Test helper: MVCC de-duplication keyed on the `_vid` column.
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    const VID_COLUMN: &str = "_vid";
    mvcc_dedup_batch_by(batch, VID_COLUMN)
}
736
737fn mvcc_dedup_to_option(
742 batch: Option<RecordBatch>,
743 id_column: &str,
744) -> DFResult<Option<RecordBatch>> {
745 match batch {
746 Some(b) => {
747 let deduped = mvcc_dedup_batch_by(&b, id_column)?;
748 Ok(if deduped.num_rows() > 0 {
749 Some(deduped)
750 } else {
751 None
752 })
753 }
754 None => Ok(None),
755 }
756}
757
758fn merge_lance_and_l0(
762 lance_deduped: Option<RecordBatch>,
763 l0_batch: RecordBatch,
764 internal_schema: &SchemaRef,
765 id_column: &str,
766) -> DFResult<Option<RecordBatch>> {
767 let has_l0 = l0_batch.num_rows() > 0;
768 match (lance_deduped, has_l0) {
769 (Some(lance), true) => {
770 let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
771 .map_err(arrow_err)?;
772 Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
773 }
774 (Some(lance), false) => Ok(Some(lance)),
775 (None, true) => Ok(Some(l0_batch)),
776 (None, false) => Ok(None),
777 }
778}
779
/// Drops rows whose `_version` is lower than the highest `_version` stored for
/// the same `_vid`.
///
/// Every distinct `_vid` in `batch` is rescanned (only `_vid`/`_version`
/// columns) from `label_table` via `scan_vertex_table`, or via
/// `scan_main_vertex_table` when `label_table` is `None`. A row is kept when
/// its vid did not appear in the rescan, or its version is >= the maximum seen.
/// NOTE(review): presumably guards against a pushed-down predicate matching a
/// stale version of a vertex — confirm with the caller.
///
/// # Errors
/// `DataFusionError::Execution` if either the input batch or a rescan batch
/// lacks UInt64 `_vid`/`_version` columns, or if a storage scan fails.
async fn drop_superseded_pushdown_rows(
    storage: &Arc<uni_store::storage::manager::StorageManager>,
    label_table: Option<&str>,
    batch: RecordBatch,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(batch);
    }
    let (Some(vid_col), Some(ver_col)) = (
        batch
            .column_by_name("_vid")
            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
        batch
            .column_by_name("_version")
            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
    ) else {
        return Err(datafusion::error::DataFusionError::Execution(
            "pushdown version verification: scan batch missing _vid/_version".to_string(),
        ));
    };

    // Distinct vids in first-seen order.
    let mut candidates: Vec<u64> = Vec::new();
    let mut seen: HashSet<u64> = HashSet::new();
    for i in 0..vid_col.len() {
        let vid = vid_col.value(i);
        if seen.insert(vid) {
            candidates.push(vid);
        }
    }

    // Caps how many ids go into each `_vid IN (...)` rescan filter.
    const VERIFY_CHUNK: usize = 1000;
    let mut max_ver: HashMap<u64, u64> = HashMap::with_capacity(candidates.len());
    for chunk in candidates.chunks(VERIFY_CHUNK) {
        let filter = format_vid_in_list(chunk);
        let scanned = match label_table {
            Some(label) => {
                storage
                    .scan_vertex_table(label, &["_vid", "_version"], Some(&filter))
                    .await
            }
            None => {
                storage
                    .scan_main_vertex_table(&["_vid", "_version"], Some(&filter))
                    .await
            }
        }
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
        // No stored rows for this chunk: nothing to compare against.
        let Some(vbatch) = scanned else { continue };
        let (Some(v_vid), Some(v_ver)) = (
            vbatch
                .column_by_name("_vid")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
            vbatch
                .column_by_name("_version")
                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
        ) else {
            return Err(datafusion::error::DataFusionError::Execution(
                "pushdown version verification: rescan missing _vid/_version".to_string(),
            ));
        };
        // Track the highest stored version per vid.
        for i in 0..v_vid.len() {
            let entry = max_ver.entry(v_vid.value(i)).or_insert(0);
            *entry = (*entry).max(v_ver.value(i));
        }
    }

    // Keep rows that are unknown to the rescan or already at the newest version.
    let keep: arrow_array::BooleanArray = (0..batch.num_rows())
        .map(|i| {
            Some(
                max_ver
                    .get(&vid_col.value(i))
                    .is_none_or(|&max| ver_col.value(i) >= max),
            )
        })
        .collect();
    arrow::compute::filter_record_batch(&batch, &keep).map_err(arrow_err)
}
872
/// Appends `col_name` to `columns` unless an equal entry is already present.
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    let already_present = columns.iter().any(|existing| existing.as_str() == col_name);
    if already_present {
        return;
    }
    columns.push(col_name.to_owned());
}
882
883fn extract_from_overflow_blob(
888 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
889 row: usize,
890 prop: &str,
891) -> Option<Vec<u8>> {
892 let arr = overflow_arr?;
893 if arr.is_null(row) {
894 return None;
895 }
896 uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
897}
898
899fn build_overflow_property_column(
906 num_rows: usize,
907 vid_arr: &UInt64Array,
908 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
909 prop: &str,
910 l0_ctx: &crate::query::df_graph::L0Context,
911) -> ArrayRef {
912 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
913 for i in 0..num_rows {
914 let vid = Vid::from(vid_arr.value(i));
915
916 let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
918
919 if let Some(val_opt) = l0_val {
920 append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
921 } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
922 builder.append_value(&bytes);
923 } else {
924 builder.append_null();
925 }
926 }
927 Arc::new(builder.finish())
928}
929
930fn resolve_l0_property(
936 vid: &Vid,
937 prop: &str,
938 l0_ctx: &crate::query::df_graph::L0Context,
939) -> Option<Option<Value>> {
940 let mut result = None;
941 for l0 in l0_ctx.iter_l0_buffers() {
942 let guard = l0.read();
943 if let Some(props) = guard.vertex_properties.get(vid)
944 && let Some(val) = props.get(prop)
945 {
946 result = Some(Some(val.clone()));
947 }
948 }
949 result
950}
951
952fn append_value_as_cypher_binary(
957 builder: &mut arrow_array::builder::LargeBinaryBuilder,
958 val: Option<&Value>,
959) {
960 match val {
961 Some(v) if !v.is_null() => {
962 builder.append_value(uni_common::cypher_value_codec::encode(v));
963 }
964 _ => builder.append_null(),
965 }
966}
967
968fn build_all_props_column_with_l0_overlay(
976 num_rows: usize,
977 vid_arr: &UInt64Array,
978 props_arr: Option<&arrow_array::LargeBinaryArray>,
979 l0_ctx: &crate::query::df_graph::L0Context,
980) -> ArrayRef {
981 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
982 for i in 0..num_rows {
983 let vid = Vid::from(vid_arr.value(i));
984
985 let mut merged_props: HashMap<String, Value> = HashMap::new();
988 if let Some(arr) = props_arr
989 && !arr.is_null(i)
990 && let Ok(uni_common::Value::Map(map)) =
991 uni_common::cypher_value_codec::decode(arr.value(i))
992 {
993 merged_props.extend(map);
994 }
995
996 for l0 in l0_ctx.iter_l0_buffers() {
998 let guard = l0.read();
999 if let Some(l0_props) = guard.vertex_properties.get(&vid) {
1000 for (k, v) in l0_props {
1001 merged_props.insert(k.clone(), v.clone());
1002 }
1003 }
1004 }
1005
1006 if merged_props.is_empty() {
1008 builder.append_null();
1009 } else {
1010 builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
1011 merged_props,
1012 )));
1013 }
1014 }
1015 Arc::new(builder.finish())
1016}
1017
1018fn build_all_props_column_for_schema_scan(
1023 batch: &RecordBatch,
1024 vid_arr: &UInt64Array,
1025 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
1026 projected_properties: &[String],
1027 l0_ctx: &crate::query::df_graph::L0Context,
1028) -> ArrayRef {
1029 let schema_props: Vec<&str> = projected_properties
1031 .iter()
1032 .filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
1033 .map(String::as_str)
1034 .collect();
1035
1036 let num_rows = batch.num_rows();
1037 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1038 for i in 0..num_rows {
1039 let vid = Vid::from(vid_arr.value(i));
1040 let mut merged_props: HashMap<String, Value> = HashMap::new();
1043
1044 for &prop in &schema_props {
1046 if let Some(col) = batch.column_by_name(prop) {
1047 let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
1048 if !val.is_null() {
1049 merged_props.insert(prop.to_string(), val);
1050 }
1051 }
1052 }
1053
1054 if let Some(arr) = overflow_arr
1056 && !arr.is_null(i)
1057 && let Ok(uni_common::Value::Map(map)) =
1058 uni_common::cypher_value_codec::decode(arr.value(i))
1059 {
1060 merged_props.extend(map);
1061 }
1062
1063 for l0 in l0_ctx.iter_l0_buffers() {
1065 let guard = l0.read();
1066 if let Some(l0_props) = guard.vertex_properties.get(&vid) {
1067 for (k, v) in l0_props {
1068 merged_props.insert(k.clone(), v.clone());
1069 }
1070 }
1071 }
1072
1073 if merged_props.is_empty() {
1074 builder.append_null();
1075 } else {
1076 builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
1077 merged_props,
1078 )));
1079 }
1080 }
1081 Arc::new(builder.finish())
1082}
1083
1084fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
1090 if batch.num_rows() == 0 {
1091 return Ok(batch.clone());
1092 }
1093
1094 let id_col = batch
1095 .column_by_name(id_column)
1096 .ok_or_else(|| {
1097 datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
1098 })?
1099 .clone();
1100 let version_col = batch
1101 .column_by_name("_version")
1102 .ok_or_else(|| {
1103 datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
1104 })?
1105 .clone();
1106
1107 let sort_columns = vec![
1109 arrow::compute::SortColumn {
1110 values: id_col,
1111 options: Some(arrow::compute::SortOptions {
1112 descending: false,
1113 nulls_first: false,
1114 }),
1115 },
1116 arrow::compute::SortColumn {
1117 values: version_col,
1118 options: Some(arrow::compute::SortOptions {
1119 descending: true,
1120 nulls_first: false,
1121 }),
1122 },
1123 ];
1124 let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;
1125
1126 let sorted_columns: Vec<ArrayRef> = batch
1128 .columns()
1129 .iter()
1130 .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
1131 .collect::<Result<_, _>>()
1132 .map_err(arrow_err)?;
1133 let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;
1134
1135 let sorted_id = sorted
1137 .column_by_name(id_column)
1138 .unwrap()
1139 .as_any()
1140 .downcast_ref::<UInt64Array>()
1141 .unwrap();
1142
1143 let mut keep = vec![false; sorted.num_rows()];
1144 if !keep.is_empty() {
1145 keep[0] = true;
1146 for (i, flag) in keep.iter_mut().enumerate().skip(1) {
1147 if sorted_id.value(i) != sorted_id.value(i - 1) {
1148 *flag = true;
1149 }
1150 }
1151 }
1152
1153 let mask = arrow_array::BooleanArray::from(keep);
1154 arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
1155}
1156
1157fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
1159 if batch.num_rows() == 0 {
1160 return Ok(batch.clone());
1161 }
1162 let op_col = match batch.column_by_name("op") {
1163 Some(col) => col
1164 .as_any()
1165 .downcast_ref::<arrow_array::UInt8Array>()
1166 .unwrap(),
1167 None => return Ok(batch.clone()),
1168 };
1169 let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
1170 let mask = arrow_array::BooleanArray::from(keep);
1171 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1172}
1173
1174fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
1176 if batch.num_rows() == 0 {
1177 return Ok(batch.clone());
1178 }
1179 let deleted_col = match batch.column_by_name("_deleted") {
1180 Some(col) => col
1181 .as_any()
1182 .downcast_ref::<arrow_array::BooleanArray>()
1183 .unwrap(),
1184 None => return Ok(batch.clone()),
1185 };
1186 let keep: Vec<bool> = (0..deleted_col.len())
1187 .map(|i| !deleted_col.value(i))
1188 .collect();
1189 let mask = arrow_array::BooleanArray::from(keep);
1190 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1191}
1192
1193fn filter_l0_tombstones(
1195 batch: &RecordBatch,
1196 l0_ctx: &crate::query::df_graph::L0Context,
1197) -> DFResult<RecordBatch> {
1198 if batch.num_rows() == 0 {
1199 return Ok(batch.clone());
1200 }
1201
1202 let mut tombstones: HashSet<u64> = HashSet::new();
1203 for l0 in l0_ctx.iter_l0_buffers() {
1204 let guard = l0.read();
1205 for vid in guard.vertex_tombstones.iter() {
1206 tombstones.insert(vid.as_u64());
1207 }
1208 }
1209
1210 if tombstones.is_empty() {
1211 return Ok(batch.clone());
1212 }
1213
1214 let vid_col = batch
1215 .column_by_name("_vid")
1216 .ok_or_else(|| {
1217 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1218 })?
1219 .as_any()
1220 .downcast_ref::<UInt64Array>()
1221 .unwrap();
1222
1223 let keep: Vec<bool> = (0..vid_col.len())
1224 .map(|i| !tombstones.contains(&vid_col.value(i)))
1225 .collect();
1226 let mask = arrow_array::BooleanArray::from(keep);
1227 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1228}
1229
1230fn filter_l0_edge_tombstones(
1232 batch: &RecordBatch,
1233 l0_ctx: &crate::query::df_graph::L0Context,
1234) -> DFResult<RecordBatch> {
1235 if batch.num_rows() == 0 {
1236 return Ok(batch.clone());
1237 }
1238
1239 let mut tombstones: HashSet<u64> = HashSet::new();
1240 for l0 in l0_ctx.iter_l0_buffers() {
1241 let guard = l0.read();
1242 for eid in guard.tombstones.keys() {
1243 tombstones.insert(eid.as_u64());
1244 }
1245 }
1246
1247 if tombstones.is_empty() {
1248 return Ok(batch.clone());
1249 }
1250
1251 let eid_col = batch
1252 .column_by_name("eid")
1253 .ok_or_else(|| {
1254 datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1255 })?
1256 .as_any()
1257 .downcast_ref::<UInt64Array>()
1258 .unwrap();
1259
1260 let keep: Vec<bool> = (0..eid_col.len())
1261 .map(|i| !tombstones.contains(&eid_col.value(i)))
1262 .collect();
1263 let mask = arrow_array::BooleanArray::from(keep);
1264 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1265}
1266
1267fn extract_vid_from_physical_filter(filter: &Arc<dyn PhysicalExpr>) -> Option<u64> {
1276 use datafusion::logical_expr::Operator;
1277 use datafusion::physical_expr::expressions::BinaryExpr;
1278
1279 if let Some(bin) = filter.as_any().downcast_ref::<BinaryExpr>() {
1281 if bin.op() == &Operator::Eq {
1282 if let Some(vid) = try_extract_vid_eq(bin.left(), bin.right()) {
1284 return Some(vid);
1285 }
1286 if let Some(vid) = try_extract_vid_eq(bin.right(), bin.left()) {
1287 return Some(vid);
1288 }
1289 }
1290 if bin.op() == &Operator::And {
1292 if let Some(vid) = extract_vid_from_physical_filter(bin.left()) {
1293 return Some(vid);
1294 }
1295 return extract_vid_from_physical_filter(bin.right());
1296 }
1297 }
1298 None
1299}
1300
1301fn try_extract_vid_eq(
1305 col_side: &Arc<dyn PhysicalExpr>,
1306 val_side: &Arc<dyn PhysicalExpr>,
1307) -> Option<u64> {
1308 use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
1309
1310 let col = col_side.as_any().downcast_ref::<Column>()?;
1312 if col.name() != "_vid" && !col.name().ends_with("._vid") {
1313 return None;
1314 }
1315
1316 if let Some(lit) = val_side.as_any().downcast_ref::<Literal>() {
1318 return scalar_to_u64(lit.value());
1319 }
1320
1321 if let Some(cast) = val_side.as_any().downcast_ref::<CastExpr>()
1323 && let Some(lit) = cast.expr().as_any().downcast_ref::<Literal>()
1324 {
1325 return scalar_to_u64(lit.value());
1326 }
1327
1328 None
1329}
1330
/// ANDs two optional storage filter strings, parenthesizing each side when
/// both are present.
fn combine_lance_filters(vid_filter: Option<&str>, extra: Option<&str>) -> Option<String> {
    match (vid_filter, extra) {
        (None, None) => None,
        (Some(only), None) | (None, Some(only)) => Some(only.to_string()),
        (Some(a), Some(b)) => Some(format!("({a}) AND ({b})")),
    }
}
1341
/// Renders `_vid IN (v1,v2,...)` for a storage filter.
///
/// Callers must pass a non-empty slice (checked in debug builds only); an
/// empty slice would render the invalid `_vid IN ()`.
fn format_vid_in_list(vids: &[u64]) -> String {
    use std::fmt::Write;
    debug_assert!(!vids.is_empty());
    let mut out = String::with_capacity(12 + 8 * vids.len());
    out.push_str("_vid IN (");
    let mut separator = "";
    for vid in vids {
        let _ = write!(out, "{separator}{vid}");
        separator = ",";
    }
    out.push(')');
    out
}
1358
1359fn scalar_to_u64(sv: &datafusion::common::ScalarValue) -> Option<u64> {
1361 use datafusion::common::ScalarValue;
1362 match sv {
1363 ScalarValue::UInt64(Some(v)) => Some(*v),
1364 ScalarValue::Int64(Some(v)) if *v >= 0 => Some(*v as u64),
1365 ScalarValue::UInt32(Some(v)) => Some(*v as u64),
1366 ScalarValue::Int32(Some(v)) if *v >= 0 => Some(*v as u64),
1367 _ => None,
1368 }
1369}
1370
/// Materializes the L0 (in-memory buffer) view of `label` vertices as a
/// `RecordBatch` shaped exactly like `lance_schema`, so it can be merged with
/// rows read from Lance.
///
/// * `l0_ctx` - source of the L0 buffers, visited in `iter_l0_buffers()` order.
/// * `label` - vertex label to collect. In the `target_vids` path an empty
///   label disables the label check.
/// * `lance_schema` - schema of the returned batch; one column per field.
/// * `label_props` - schema-declared properties of `label`; any other property
///   is folded into the `overflow_json` column.
/// * `target_vids` - when `Some`, only these vids are considered. Each must
///   have properties in the buffer and, unless `label` is empty, be listed in
///   that buffer's `label_to_vids[label]`.
///
/// Merge rules across buffers: property values from later-visited buffers
/// overwrite earlier ones key by key, `_version` is the maximum seen,
/// `_created_at` the minimum and `_updated_at` the maximum. A vid tombstoned in
/// any buffer is dropped entirely. Rows are sorted by vid ascending, and an
/// empty batch with `lance_schema` is returned when nothing survives.
///
/// # Errors
/// Propagates failures from `build_l0_property_column` and `RecordBatch::try_new`.
fn build_l0_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    lance_schema: &SchemaRef,
    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
    target_vids: Option<&[u64]>,
) -> DFResult<RecordBatch> {
    // vid -> (merged properties, highest version seen).
    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();
    // Earliest creation timestamp per vid across buffers.
    let mut vid_created_at: HashMap<u64, i64> = HashMap::new();
    // Latest update timestamp per vid across buffers.
    let mut vid_updated_at: HashMap<u64, i64> = HashMap::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Tombstones accumulate across buffers; vids already tombstoned are
        // skipped below, and every tombstoned vid is purged after the loop.
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }
        let candidate_vids: Vec<Vid> = if let Some(tvs) = target_vids {
            // Point lookup: keep only requested vids that this buffer holds
            // properties for (and that carry `label`, unless it is empty).
            let mut out = Vec::with_capacity(tvs.len());
            for &tv in tvs {
                let vid = Vid::from(tv);
                if guard.vertex_properties.contains_key(&vid)
                    && (label.is_empty()
                        || guard
                            .label_to_vids
                            .get(label)
                            .is_some_and(|s| s.contains(&vid)))
                {
                    out.push(vid);
                }
            }
            out
        } else {
            // No vid restriction: every vid this buffer indexes under `label`.
            guard.vids_for_label(label)
        };
        for vid in candidate_vids {
            let vid_u64 = vid.as_u64();
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0));
            if let Some(props) = guard.vertex_properties.get(&vid) {
                // Key-wise overwrite: later buffers win for the same property.
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            if version > entry.1 {
                entry.1 = version;
            }
            if let Some(&ts) = guard.vertex_created_at.get(&vid) {
                // Keep the minimum creation timestamp.
                vid_created_at
                    .entry(vid_u64)
                    .and_modify(|cur| {
                        if ts < *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
            if let Some(&ts) = guard.vertex_updated_at.get(&vid) {
                // Keep the maximum update timestamp.
                vid_updated_at
                    .entry(vid_u64)
                    .and_modify(|cur| {
                        if ts > *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
        }
    }

    // NOTE(review): a vid tombstoned in any buffer is removed here even if a
    // different buffer (re)inserted it; confirm vids are never reused after
    // deletion.
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(lance_schema.clone()));
    }

    // Deterministic row order: ascending vid.
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());

    let schema_prop_names: HashSet<&str> = label_props
        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    for field in lance_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "_deleted" => {
                // Tombstoned vids were purged above, so every L0 row is live.
                let vals = vec![false; num_rows];
                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "_created_at" => {
                // Null when no buffer recorded a creation timestamp.
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for v in &vids {
                    match vid_created_at.get(v) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_updated_at" => {
                // Null when no buffer recorded an update timestamp.
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for v in &vids {
                    match vid_updated_at.get(v) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "overflow_json" => {
                // Non-schema properties, encoded as a single Map value per row
                // (null when a row has none).
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _) = &vid_data[vid_u64];
                    let mut overflow: HashMap<String, Value> = HashMap::new();
                    for (k, v) in props {
                        // `ext_id` and underscore-prefixed keys never spill.
                        if k == "ext_id" || k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            overflow.insert(k.clone(), v.clone());
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
                            overflow,
                        )));
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Any other field is treated as a property column typed by
                // the Lance field.
                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
}
1563
1564fn build_l0_property_column(
1568 vids: &[u64],
1569 vid_data: &HashMap<u64, (Properties, u64)>,
1570 prop_name: &str,
1571 data_type: &DataType,
1572) -> DFResult<ArrayRef> {
1573 let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
1575 let props_map: HashMap<Vid, Properties> = vid_data
1576 .iter()
1577 .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1578 .collect();
1579
1580 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1581}
1582
/// Materializes the L0 (in-memory buffer) view of `edge_type` edges as a
/// `RecordBatch` shaped exactly like `internal_schema`, so it can be merged
/// with rows read from the Lance delta table.
///
/// * `l0_ctx` - source of the L0 buffers, visited in `iter_l0_buffers()` order.
/// * `edge_type` - edge type whose eids are collected (`eids_for_type`).
/// * `internal_schema` - schema of the returned batch; one column per field.
/// * `type_props` - schema-declared properties of the type; any other property
///   is folded into `overflow_json`.
///
/// Edges whose endpoints a buffer cannot resolve are skipped for that buffer.
/// Across buffers, later-visited buffers overwrite properties key by key and
/// also overwrite the endpoints. `_version` is the maximum seen, `_created_at`
/// the minimum and `_updated_at` the maximum. An eid tombstoned in any buffer
/// is dropped. Rows are sorted by eid, and an empty batch is returned when
/// nothing survives.
///
/// # Errors
/// Propagates failures from `build_l0_edge_property_column` and `RecordBatch::try_new`.
fn build_l0_edge_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    edge_type: &str,
    internal_schema: &SchemaRef,
    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // eid -> (src_vid, dst_vid, merged properties, highest version seen).
    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();
    // Earliest creation / latest update timestamp per eid across buffers.
    let mut eid_created_at: HashMap<u64, i64> = HashMap::new();
    let mut eid_updated_at: HashMap<u64, i64> = HashMap::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Tombstones accumulate across buffers; all are purged after the loop.
        for eid in guard.tombstones.keys() {
            tombstones.insert(eid.as_u64());
        }
        for eid in guard.eids_for_type(edge_type) {
            let eid_u64 = eid.as_u64();
            if tombstones.contains(&eid_u64) {
                continue;
            }
            // Without endpoints in this buffer the edge cannot be emitted.
            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
                None => continue,
            };
            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
            let entry = eid_data
                .entry(eid_u64)
                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
            if let Some(props) = guard.edge_properties.get(&eid) {
                // Key-wise overwrite: later buffers win for the same property.
                for (k, v) in props {
                    entry.2.insert(k.clone(), v.clone());
                }
            }
            // Endpoints always take the most recently visited buffer's values.
            entry.0 = src_vid;
            entry.1 = dst_vid;
            if version > entry.3 {
                entry.3 = version;
            }
            if let Some(&ts) = guard.edge_created_at.get(&eid) {
                // Keep the minimum creation timestamp.
                eid_created_at
                    .entry(eid_u64)
                    .and_modify(|cur| {
                        if ts < *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
            if let Some(&ts) = guard.edge_updated_at.get(&eid) {
                // Keep the maximum update timestamp.
                eid_updated_at
                    .entry(eid_u64)
                    .and_modify(|cur| {
                        if ts > *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
        }
    }

    // NOTE(review): an eid tombstoned in any buffer is removed even if another
    // buffer holds data for it; confirm eids are never reused after deletion.
    for t in &tombstones {
        eid_data.remove(t);
    }

    if eid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Deterministic row order: ascending eid.
    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
    eids.sort_unstable();

    let num_rows = eids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    let schema_prop_names: HashSet<&str> = type_props
        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    for field in internal_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "eid" => {
                columns.push(Arc::new(UInt64Array::from(eids.clone())));
            }
            "src_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "dst_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "op" => {
                // Every L0 row is emitted with op code 0; tombstoned eids were
                // purged above.
                let vals = vec![0u8; num_rows];
                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "_created_at" => {
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for e in &eids {
                    match eid_created_at.get(e) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_updated_at" => {
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for e in &eids {
                    match eid_updated_at.get(e) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "overflow_json" => {
                // Non-schema properties encoded as one Map per row (null when
                // none). Unlike the vertex path, `ext_id` is not excluded here.
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for eid_u64 in &eids {
                    let (_, _, props, _) = &eid_data[eid_u64];
                    let mut overflow: HashMap<String, Value> = HashMap::new();
                    for (k, v) in props {
                        if k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            overflow.insert(k.clone(), v.clone());
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
                            overflow,
                        )));
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Remaining fields are property columns typed by the schema field.
                let col =
                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
1761
1762fn build_l0_edge_property_column(
1766 eids: &[u64],
1767 eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1768 prop_name: &str,
1769 data_type: &DataType,
1770) -> DFResult<ArrayRef> {
1771 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1773 let props_map: HashMap<Vid, Properties> = eid_data
1774 .iter()
1775 .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1776 .collect();
1777
1778 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1779}
1780
1781fn build_labels_column_for_known_label(
1787 vid_arr: &UInt64Array,
1788 label: &str,
1789 l0_ctx: &crate::query::df_graph::L0Context,
1790 batch_labels_col: Option<&arrow_array::ListArray>,
1791) -> DFResult<ArrayRef> {
1792 use uni_store::storage::arrow_convert::labels_from_list_array;
1793
1794 let mut labels_builder = ListBuilder::new(StringBuilder::new());
1795
1796 for i in 0..vid_arr.len() {
1797 let vid = Vid::from(vid_arr.value(i));
1798
1799 let mut labels = match batch_labels_col {
1801 Some(list_arr) => {
1802 let stored = labels_from_list_array(list_arr, i);
1803 if stored.is_empty() {
1804 vec![label.to_string()]
1805 } else {
1806 stored
1807 }
1808 }
1809 None => vec![label.to_string()],
1810 };
1811
1812 if !labels.iter().any(|l| l == label) {
1814 labels.push(label.to_string());
1815 }
1816
1817 for l0 in l0_ctx.iter_l0_buffers() {
1819 let guard = l0.read();
1820 if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
1821 for lbl in l0_labels {
1822 if !labels.contains(lbl) {
1823 labels.push(lbl.clone());
1824 }
1825 }
1826 }
1827 }
1828
1829 let values = labels_builder.values();
1830 for lbl in &labels {
1831 values.append_value(lbl);
1832 }
1833 labels_builder.append(true);
1834 }
1835
1836 Ok(Arc::new(labels_builder.finish()))
1837}
1838
1839fn map_to_output_schema(
1845 batch: &RecordBatch,
1846 label: &str,
1847 _variable: &str,
1848 projected_properties: &[String],
1849 output_schema: &SchemaRef,
1850 l0_ctx: &crate::query::df_graph::L0Context,
1851) -> DFResult<RecordBatch> {
1852 if batch.num_rows() == 0 {
1853 return Ok(RecordBatch::new_empty(output_schema.clone()));
1854 }
1855
1856 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1857
1858 let vid_col = batch
1860 .column_by_name("_vid")
1861 .ok_or_else(|| {
1862 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1863 })?
1864 .clone();
1865 let vid_arr = vid_col
1866 .as_any()
1867 .downcast_ref::<UInt64Array>()
1868 .ok_or_else(|| {
1869 datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
1870 })?;
1871
1872 let batch_labels_col = batch
1874 .column_by_name("_labels")
1875 .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
1876 let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
1877 columns.push(vid_col.clone());
1878 columns.push(labels_col);
1879
1880 let overflow_arr = batch
1883 .column_by_name("overflow_json")
1884 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1885
1886 for prop in projected_properties {
1887 if prop == "overflow_json" {
1888 match batch.column_by_name("overflow_json") {
1889 Some(col) => columns.push(col.clone()),
1890 None => {
1891 columns.push(arrow_array::new_null_array(
1893 &DataType::LargeBinary,
1894 batch.num_rows(),
1895 ));
1896 }
1897 }
1898 } else if prop == "_all_props" {
1899 let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
1903 let guard = l0.read();
1904 !guard.vertex_properties.is_empty()
1905 });
1906 let has_schema_cols = projected_properties
1908 .iter()
1909 .any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));
1910
1911 if !any_l0_has_vertex_props && !has_schema_cols {
1912 match batch.column_by_name("overflow_json") {
1914 Some(col) => columns.push(col.clone()),
1915 None => {
1916 columns.push(arrow_array::new_null_array(
1917 &DataType::LargeBinary,
1918 batch.num_rows(),
1919 ));
1920 }
1921 }
1922 } else {
1923 let col = build_all_props_column_for_schema_scan(
1925 batch,
1926 vid_arr,
1927 overflow_arr,
1928 projected_properties,
1929 l0_ctx,
1930 );
1931 columns.push(col);
1932 }
1933 } else {
1934 match batch.column_by_name(prop) {
1935 Some(col) => columns.push(col.clone()),
1936 None => {
1937 let col = build_overflow_property_column(
1940 batch.num_rows(),
1941 vid_arr,
1942 overflow_arr,
1943 prop,
1944 l0_ctx,
1945 );
1946 columns.push(col);
1947 }
1948 }
1949 }
1950 }
1951
1952 RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
1953}
1954
1955fn map_edge_to_output_schema(
1962 batch: &RecordBatch,
1963 variable: &str,
1964 projected_properties: &[String],
1965 output_schema: &SchemaRef,
1966) -> DFResult<RecordBatch> {
1967 if batch.num_rows() == 0 {
1968 return Ok(RecordBatch::new_empty(output_schema.clone()));
1969 }
1970
1971 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1972
1973 let eid_col = batch
1975 .column_by_name("eid")
1976 .ok_or_else(|| {
1977 datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1978 })?
1979 .clone();
1980 columns.push(eid_col);
1981
1982 let src_col = batch
1984 .column_by_name("src_vid")
1985 .ok_or_else(|| {
1986 datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
1987 })?
1988 .clone();
1989 columns.push(src_col);
1990
1991 let dst_col = batch
1993 .column_by_name("dst_vid")
1994 .ok_or_else(|| {
1995 datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
1996 })?
1997 .clone();
1998 columns.push(dst_col);
1999
2000 for prop in projected_properties {
2002 if prop == "overflow_json" {
2003 match batch.column_by_name("overflow_json") {
2004 Some(col) => columns.push(col.clone()),
2005 None => {
2006 columns.push(arrow_array::new_null_array(
2007 &DataType::LargeBinary,
2008 batch.num_rows(),
2009 ));
2010 }
2011 }
2012 } else {
2013 match batch.column_by_name(prop) {
2014 Some(col) => columns.push(col.clone()),
2015 None => {
2016 let overflow_arr = batch
2019 .column_by_name("overflow_json")
2020 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2021
2022 if let Some(arr) = overflow_arr {
2023 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2024 for i in 0..batch.num_rows() {
2025 if !arr.is_null(i) {
2026 let blob = arr.value(i);
2027 if let Some(sub_bytes) =
2029 uni_common::cypher_value_codec::extract_map_entry_raw(
2030 blob, prop,
2031 )
2032 {
2033 builder.append_value(&sub_bytes);
2034 } else {
2035 builder.append_null();
2036 }
2037 } else {
2038 builder.append_null();
2039 }
2040 }
2041 columns.push(Arc::new(builder.finish()));
2042 } else {
2043 let target_field = output_schema
2045 .fields()
2046 .iter()
2047 .find(|f| f.name() == &format!("{}.{}", variable, prop));
2048 let dt = target_field
2049 .map(|f| f.data_type().clone())
2050 .unwrap_or(DataType::LargeBinary);
2051 columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
2052 }
2053 }
2054 }
2055 }
2056 }
2057
2058 RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2059}
2060
/// Scans all vertices of a schema-defined `label` and returns them mapped onto
/// `output_schema`, merging persisted rows with the in-memory L0 buffers.
///
/// Pipeline, in order:
/// 1. Decide which columns to read: `_vid`, `_deleted` and `_version`, plus
///    any projected timestamp or schema property. `overflow_json` is added
///    when any projected property is neither a timestamp nor schema-declared
///    (this includes `_all_props`).
/// 2. Build the pushdown predicate. A non-empty `vid_list_filter` wins over a
///    single vid extracted from `filter`, and the result is ANDed with
///    `extra_lance_filter`.
/// 3. Read the base rows, either from a plugin storage registered for `label`
///    or via `scan_vertex_table`.
/// 4. Apply MVCC dedup on `_vid`, merge with the L0 batch, drop deleted rows
///    and L0 tombstones, project via `map_to_output_schema`, then apply
///    `extra_runtime_filter`.
///
/// Returns an empty batch with `output_schema` whenever a stage leaves no rows.
///
/// # Errors
/// Plugin read/stream/concat failures and storage scan failures are reported
/// as `DataFusionError::Execution`. Errors from the helper stages propagate.
#[expect(clippy::too_many_arguments)]
async fn columnar_scan_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    filter: &Option<Arc<dyn PhysicalExpr>>,
    vid_list_filter: Option<&[u64]>,
    extra_lance_filter: Option<&str>,
    extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    let label_props = uni_schema.properties.get(label);

    // Single-vid point lookup extracted from the physical filter, if any.
    let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);

    // Structural columns are always read.
    let mut lance_columns: Vec<String> = vec![
        "_vid".to_string(),
        "_deleted".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else if prop == "_created_at" || prop == "_updated_at" {
            push_column_if_absent(&mut lance_columns, prop);
        } else {
            // Only schema-declared properties exist as physical columns.
            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Anything not physical (including `_all_props`) has to be read from the
    // overflow blob.
    let needs_overflow = projected_properties.iter().any(|p| {
        p == "overflow_json"
            || (!matches!(p.as_str(), "_created_at" | "_updated_at")
                && !label_props.is_some_and(|lp| lp.contains_key(p)))
    });
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    // NOTE(review): `Some(&[])` for `vid_list_filter` falls through to the
    // `target_vid`/no-restriction arms, i.e. it does not mean "match nothing";
    // confirm callers never rely on an empty list to return no rows.
    let vid_part = match (vid_list_filter, target_vid) {
        (Some(vs), _) if !vs.is_empty() => Some(format_vid_in_list(vs)),
        (_, Some(v)) => Some(format!("_vid = {v}")),
        _ => None,
    };
    let combined_filter = combine_lance_filters(vid_part.as_deref(), extra_lance_filter);
    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();

    // Plugin-backed labels: drain the whole stream and concatenate it into one
    // batch. NOTE(review): `read_batch` is called with `None` as its second
    // argument, and neither `combined_filter` nor `lance_columns` is passed to
    // it, so vid/extra pushdown is not applied to plugin rows in this
    // function; confirm downstream filtering covers this.
    let plugin_batch: Option<arrow::record_batch::RecordBatch> = match graph_ctx.plugin_registry() {
        Some(reg) => match reg.lookup_label_storage(label) {
            Some(plugin_storage) => {
                let mut stream = plugin_storage.read_batch(label, None).await.map_err(|e| {
                    datafusion::error::DataFusionError::Execution(format!(
                        "plugin Storage::read_batch({label}) failed: {} (code 0x{:x})",
                        e.message, e.code
                    ))
                })?;
                use futures::StreamExt;
                let mut batches: Vec<arrow::record_batch::RecordBatch> = Vec::new();
                let mut schema_ref: Option<SchemaRef> = None;
                while let Some(b) = stream.next().await {
                    let b = b.map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "plugin Storage stream({label}) errored: {e}"
                        ))
                    })?;
                    // The first batch's schema is used for concatenation.
                    if schema_ref.is_none() {
                        schema_ref = Some(b.schema());
                    }
                    batches.push(b);
                }
                // An empty stream yields no base batch at all.
                if let Some(s) = schema_ref {
                    Some(arrow::compute::concat_batches(&s, &batches).map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "plugin Storage concat({label}) failed: {e}"
                        ))
                    })?)
                } else {
                    None
                }
            }
            None => None,
        },
        None => None,
    };

    // `pushdown_filtered` is true only when Lance was scanned with an extra
    // predicate; plugin batches never set it.
    let (lance_batch, pushdown_filtered) = match plugin_batch {
        Some(b) => (Some(b), false),
        None => (
            storage
                .scan_vertex_table(label, &lance_columns_refs, combined_filter.as_deref())
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?,
            extra_lance_filter.is_some(),
        ),
    };

    // Presumably removes row versions that the extra predicate let through
    // but that are superseded by newer versions (see
    // `drop_superseded_pushdown_rows`); runs before MVCC dedup.
    let lance_batch = match (lance_batch, pushdown_filtered) {
        (Some(b), true) => Some(drop_superseded_pushdown_rows(storage, Some(label), b).await?),
        (b, _) => b,
    };

    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    // Schema shared by the base and L0 batches: taken from the base rows when
    // present, otherwise synthesized from the requested columns.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("_vid", DataType::UInt64, false),
                Field::new("_deleted", DataType::Boolean, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else if col == "_created_at" || col == "_updated_at" {
                    fields.push(Field::new(
                        col,
                        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                        true,
                    ));
                } else {
                    let arrow_type = label_props
                        .and_then(|lp| lp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    // Same vid restriction for L0 as for the pushdown; `single_vid_buf` gives
    // the single-vid case a slice that lives long enough.
    let single_vid_buf: [u64; 1];
    let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
        (Some(vs), _) if !vs.is_empty() => Some(vs),
        (_, Some(v)) => {
            single_vid_buf = [v];
            Some(&single_vid_buf)
        }
        _ => None,
    };
    let l0_batch =
        build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props, l0_target_vids)?;

    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mapped = map_to_output_schema(
        &filtered,
        label,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )?;

    apply_runtime_filter(mapped, extra_runtime_filter)
}
2287
2288fn apply_runtime_filter(
2293 batch: RecordBatch,
2294 runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2295) -> DFResult<RecordBatch> {
2296 let Some(filter) = runtime_filter else {
2297 return Ok(batch);
2298 };
2299 if batch.num_rows() == 0 {
2300 return Ok(batch);
2301 }
2302 let result = filter.evaluate(&batch)?;
2303 let array = result.into_array(batch.num_rows())?;
2304 let bools = array
2305 .as_any()
2306 .downcast_ref::<arrow_array::BooleanArray>()
2307 .ok_or_else(|| {
2308 datafusion::error::DataFusionError::Internal(
2309 "indexed-property runtime filter did not produce a BooleanArray".to_string(),
2310 )
2311 })?;
2312 arrow::compute::filter_record_batch(&batch, bools).map_err(arrow_err)
2313}
2314
2315async fn columnar_scan_edge_batch_static(
2322 graph_ctx: &GraphExecutionContext,
2323 edge_type: &str,
2324 variable: &str,
2325 projected_properties: &[String],
2326 output_schema: &SchemaRef,
2327) -> DFResult<RecordBatch> {
2328 let storage = graph_ctx.storage();
2329 let l0_ctx = graph_ctx.l0_context();
2330 let uni_schema = storage.schema_manager().schema();
2331 let type_props = uni_schema.properties.get(edge_type);
2332
2333 let mut lance_columns: Vec<String> = vec![
2335 "eid".to_string(),
2336 "src_vid".to_string(),
2337 "dst_vid".to_string(),
2338 "op".to_string(),
2339 "_version".to_string(),
2340 ];
2341 for prop in projected_properties {
2342 if prop == "overflow_json" {
2343 push_column_if_absent(&mut lance_columns, "overflow_json");
2344 } else if prop == "_created_at" || prop == "_updated_at" {
2345 push_column_if_absent(&mut lance_columns, prop);
2347 } else {
2348 let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
2349 if exists_in_schema {
2350 push_column_if_absent(&mut lance_columns, prop);
2351 }
2352 }
2353 }
2354
2355 let needs_overflow = projected_properties.iter().any(|p| {
2358 p == "overflow_json"
2359 || (!matches!(p.as_str(), "_created_at" | "_updated_at")
2360 && !type_props.is_some_and(|tp| tp.contains_key(p)))
2361 });
2362 if needs_overflow {
2363 push_column_if_absent(&mut lance_columns, "overflow_json");
2364 }
2365
2366 let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
2368 let lance_batch = storage
2369 .scan_delta_table(edge_type, "fwd", &lance_columns_refs, None)
2370 .await
2371 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2372
2373 let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;
2375
2376 let internal_schema = match &lance_deduped {
2379 Some(batch) => batch.schema(),
2380 None => {
2381 let mut fields = vec![
2382 Field::new("eid", DataType::UInt64, false),
2383 Field::new("src_vid", DataType::UInt64, false),
2384 Field::new("dst_vid", DataType::UInt64, false),
2385 Field::new("op", DataType::UInt8, false),
2386 Field::new("_version", DataType::UInt64, false),
2387 ];
2388 for col in &lance_columns {
2389 if matches!(
2390 col.as_str(),
2391 "eid" | "src_vid" | "dst_vid" | "op" | "_version"
2392 ) {
2393 continue;
2394 }
2395 if col == "overflow_json" {
2396 fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
2397 } else if col == "_created_at" || col == "_updated_at" {
2398 fields.push(Field::new(
2399 col,
2400 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
2401 true,
2402 ));
2403 } else {
2404 let arrow_type = type_props
2405 .and_then(|tp| tp.get(col.as_str()))
2406 .map(|meta| meta.r#type.to_arrow())
2407 .unwrap_or(DataType::LargeBinary);
2408 fields.push(Field::new(col, arrow_type, true));
2409 }
2410 }
2411 Arc::new(Schema::new(fields))
2412 }
2413 };
2414
2415 let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;
2417
2418 let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
2420 return Ok(RecordBatch::new_empty(output_schema.clone()));
2421 };
2422
2423 let merged = filter_deleted_edge_ops(&merged)?;
2425 if merged.num_rows() == 0 {
2426 return Ok(RecordBatch::new_empty(output_schema.clone()));
2427 }
2428
2429 let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;
2431
2432 if filtered.num_rows() == 0 {
2433 return Ok(RecordBatch::new_empty(output_schema.clone()));
2434 }
2435
2436 map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
2438}
2439
2440#[expect(clippy::too_many_arguments)]
2447async fn columnar_scan_schemaless_vertex_batch_static(
2448 graph_ctx: &GraphExecutionContext,
2449 label: &str,
2450 variable: &str,
2451 projected_properties: &[String],
2452 output_schema: &SchemaRef,
2453 filter: &Option<Arc<dyn PhysicalExpr>>,
2454 vid_list_filter: Option<&[u64]>,
2455 extra_lance_filter: Option<&str>,
2456 extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2457) -> DFResult<RecordBatch> {
2458 let storage = graph_ctx.storage();
2459 let l0_ctx = graph_ctx.l0_context();
2460
2461 let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);
2465
2466 let filter = {
2469 let mut parts = Vec::new();
2470
2471 match (vid_list_filter, target_vid) {
2474 (Some(vs), _) if !vs.is_empty() => parts.push(format_vid_in_list(vs)),
2475 (_, Some(vid)) => parts.push(format!("_vid = {vid}")),
2476 _ => {}
2477 }
2478
2479 if !label.is_empty() {
2481 if label.contains(':') {
2482 for lbl in label.split(':') {
2484 parts.push(format!("array_contains(labels, '{}')", lbl));
2485 }
2486 } else {
2487 parts.push(format!("array_contains(labels, '{}')", label));
2488 }
2489 }
2490
2491 if let Some(extra) = extra_lance_filter {
2493 parts.push(extra.to_string());
2494 }
2495
2496 if parts.is_empty() {
2497 None
2498 } else {
2499 Some(parts.join(" AND "))
2500 }
2501 };
2502
2503 let lance_batch = storage
2505 .scan_main_vertex_table(
2506 &["_vid", "_deleted", "labels", "props_json", "_version"],
2507 filter.as_deref(),
2508 )
2509 .await
2510 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2511
2512 let lance_batch = match (lance_batch, extra_lance_filter.is_some()) {
2516 (Some(b), true) => Some(drop_superseded_pushdown_rows(storage, None, b).await?),
2517 (b, _) => b,
2518 };
2519
2520 let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
2522
2523 let internal_schema = match &lance_deduped {
2526 Some(batch) => batch.schema(),
2527 None => Arc::new(Schema::new(vec![
2528 Field::new("_vid", DataType::UInt64, false),
2529 Field::new("_deleted", DataType::Boolean, false),
2530 Field::new("labels", labels_data_type(), false),
2531 Field::new("props_json", DataType::LargeBinary, true),
2532 Field::new("_version", DataType::UInt64, false),
2533 ])),
2534 };
2535
2536 let single_vid_buf: [u64; 1];
2540 let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
2541 (Some(vs), _) if !vs.is_empty() => Some(vs),
2542 (_, Some(v)) => {
2543 single_vid_buf = [v];
2544 Some(&single_vid_buf)
2545 }
2546 _ => None,
2547 };
2548 let l0_batch =
2549 build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema, l0_target_vids)?;
2550
2551 let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
2553 else {
2554 return Ok(RecordBatch::new_empty(output_schema.clone()));
2555 };
2556
2557 let merged = filter_deleted_rows(&merged)?;
2559 if merged.num_rows() == 0 {
2560 return Ok(RecordBatch::new_empty(output_schema.clone()));
2561 }
2562
2563 let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
2565
2566 if filtered.num_rows() == 0 {
2567 return Ok(RecordBatch::new_empty(output_schema.clone()));
2568 }
2569
2570 let mapped = map_to_schemaless_output_schema(
2572 &filtered,
2573 variable,
2574 projected_properties,
2575 output_schema,
2576 l0_ctx,
2577 )?;
2578
2579 apply_runtime_filter(mapped, extra_runtime_filter)
2581}
2582
2583fn build_l0_schemaless_vertex_batch(
2589 l0_ctx: &crate::query::df_graph::L0Context,
2590 label: &str,
2591 internal_schema: &SchemaRef,
2592 target_vids: Option<&[u64]>,
2593) -> DFResult<RecordBatch> {
2594 let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
2597 let mut tombstones: HashSet<u64> = HashSet::new();
2598
2599 let label_filter: Vec<&str> = if label.is_empty() {
2601 vec![]
2602 } else if label.contains(':') {
2603 label.split(':').collect()
2604 } else {
2605 vec![label]
2606 };
2607
2608 for l0 in l0_ctx.iter_l0_buffers() {
2609 let guard = l0.read();
2610
2611 for vid in guard.vertex_tombstones.iter() {
2613 tombstones.insert(vid.as_u64());
2614 }
2615
2616 let vids: Vec<Vid> = if let Some(tvs) = target_vids {
2619 let mut out = Vec::with_capacity(tvs.len());
2620 for &tv in tvs {
2621 let vid = Vid::from(tv);
2622 if !guard.vertex_properties.contains_key(&vid) {
2623 continue;
2624 }
2625 let label_ok = if label_filter.is_empty() {
2626 true
2627 } else if let Some(labels) = guard.vertex_labels.get(&vid) {
2628 label_filter
2629 .iter()
2630 .all(|lf| labels.contains(&lf.to_string()))
2631 } else {
2632 false
2633 };
2634 if label_ok {
2635 out.push(vid);
2636 }
2637 }
2638 out
2639 } else if label_filter.is_empty() {
2640 guard.all_vertex_vids()
2641 } else if label_filter.len() == 1 {
2642 guard.vids_for_label(label_filter[0])
2643 } else {
2644 guard.vids_with_all_labels(&label_filter)
2645 };
2646
2647 for vid in vids {
2648 let vid_u64 = vid.as_u64();
2649 if tombstones.contains(&vid_u64) {
2650 continue;
2651 }
2652 let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
2653 let entry = vid_data
2654 .entry(vid_u64)
2655 .or_insert_with(|| (Properties::new(), 0, Vec::new()));
2656
2657 if let Some(props) = guard.vertex_properties.get(&vid) {
2659 for (k, v) in props {
2660 entry.0.insert(k.clone(), v.clone());
2661 }
2662 }
2663 if version > entry.1 {
2665 entry.1 = version;
2666 }
2667 if let Some(labels) = guard.vertex_labels.get(&vid) {
2669 entry.2 = labels.clone();
2670 }
2671 }
2672 }
2673
2674 for t in &tombstones {
2676 vid_data.remove(t);
2677 }
2678
2679 if vid_data.is_empty() {
2680 return Ok(RecordBatch::new_empty(internal_schema.clone()));
2681 }
2682
2683 let mut vids: Vec<u64> = vid_data.keys().copied().collect();
2685 vids.sort_unstable();
2686
2687 let num_rows = vids.len();
2688 let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());
2689
2690 for field in internal_schema.fields() {
2691 match field.name().as_str() {
2692 "_vid" => {
2693 columns.push(Arc::new(UInt64Array::from(vids.clone())));
2694 }
2695 "labels" => {
2696 let mut labels_builder = ListBuilder::new(StringBuilder::new());
2697 for vid_u64 in &vids {
2698 let (_, _, labels) = &vid_data[vid_u64];
2699 let values = labels_builder.values();
2700 for lbl in labels {
2701 values.append_value(lbl);
2702 }
2703 labels_builder.append(true);
2704 }
2705 columns.push(Arc::new(labels_builder.finish()));
2706 }
2707 "props_json" => {
2708 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2709 for vid_u64 in &vids {
2710 let (props, _, _) = &vid_data[vid_u64];
2711 if props.is_empty() {
2712 builder.append_null();
2713 } else {
2714 let map: HashMap<String, Value> =
2717 props.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2718 builder
2719 .append_value(uni_common::cypher_value_codec::encode(&Value::Map(map)));
2720 }
2721 }
2722 columns.push(Arc::new(builder.finish()));
2723 }
2724 "_deleted" => {
2725 columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
2727 false;
2728 num_rows
2729 ])));
2730 }
2731 "_version" => {
2732 let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
2733 columns.push(Arc::new(UInt64Array::from(vals)));
2734 }
2735 _ => {
2736 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
2738 }
2739 }
2740 }
2741
2742 RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
2743}
2744
2745fn map_to_schemaless_output_schema(
2752 batch: &RecordBatch,
2753 _variable: &str,
2754 projected_properties: &[String],
2755 output_schema: &SchemaRef,
2756 l0_ctx: &crate::query::df_graph::L0Context,
2757) -> DFResult<RecordBatch> {
2758 if batch.num_rows() == 0 {
2759 return Ok(RecordBatch::new_empty(output_schema.clone()));
2760 }
2761
2762 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
2763
2764 let vid_col = batch
2766 .column_by_name("_vid")
2767 .ok_or_else(|| {
2768 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
2769 })?
2770 .clone();
2771 let vid_arr = vid_col
2772 .as_any()
2773 .downcast_ref::<UInt64Array>()
2774 .ok_or_else(|| {
2775 datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
2776 })?;
2777 columns.push(vid_col.clone());
2778
2779 let labels_col = batch.column_by_name("labels");
2781 let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
2782
2783 let mut labels_builder = ListBuilder::new(StringBuilder::new());
2784 for i in 0..vid_arr.len() {
2785 let vid_u64 = vid_arr.value(i);
2786 let vid = Vid::from(vid_u64);
2787
2788 let mut row_labels: Vec<String> = Vec::new();
2790 if let Some(arr) = labels_arr
2791 && !arr.is_null(i)
2792 {
2793 let list_val = arr.value(i);
2794 if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
2795 for j in 0..str_arr.len() {
2796 if !str_arr.is_null(j) {
2797 row_labels.push(str_arr.value(j).to_string());
2798 }
2799 }
2800 }
2801 }
2802
2803 for l0 in l0_ctx.iter_l0_buffers() {
2805 let guard = l0.read();
2806 if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
2807 for lbl in l0_labels {
2808 if !row_labels.contains(lbl) {
2809 row_labels.push(lbl.clone());
2810 }
2811 }
2812 }
2813 }
2814
2815 let values = labels_builder.values();
2816 for lbl in &row_labels {
2817 values.append_value(lbl);
2818 }
2819 labels_builder.append(true);
2820 }
2821 columns.push(Arc::new(labels_builder.finish()));
2822
2823 let props_col = batch.column_by_name("props_json");
2825 let props_arr =
2826 props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2827
2828 for prop in projected_properties {
2829 if prop == "_all_props" {
2830 let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
2833 let guard = l0.read();
2834 !guard.vertex_properties.is_empty()
2835 });
2836 if !any_l0_has_vertex_props {
2837 match props_col {
2838 Some(col) => columns.push(col.clone()),
2839 None => {
2840 columns.push(arrow_array::new_null_array(
2841 &DataType::LargeBinary,
2842 batch.num_rows(),
2843 ));
2844 }
2845 }
2846 } else {
2847 let col = build_all_props_column_with_l0_overlay(
2848 batch.num_rows(),
2849 vid_arr,
2850 props_arr,
2851 l0_ctx,
2852 );
2853 columns.push(col);
2854 }
2855 } else {
2856 let expected_type = output_schema
2861 .field_with_name(&format!("{_variable}.{prop}"))
2862 .map(|f| f.data_type().clone())
2863 .unwrap_or(DataType::LargeBinary);
2864
2865 if expected_type == DataType::LargeBinary {
2866 let col = build_overflow_property_column(
2867 batch.num_rows(),
2868 vid_arr,
2869 props_arr,
2870 prop,
2871 l0_ctx,
2872 );
2873 columns.push(col);
2874 } else {
2875 let mut prop_values: HashMap<Vid, Properties> = HashMap::new();
2877 for i in 0..batch.num_rows() {
2878 let vid = Vid::from(vid_arr.value(i));
2879 let resolved =
2880 resolve_l0_property(&vid, prop, l0_ctx)
2881 .flatten()
2882 .or_else(|| {
2883 extract_from_overflow_blob(props_arr, i, prop).and_then(|bytes| {
2884 uni_common::cypher_value_codec::decode(&bytes).ok()
2885 })
2886 });
2887 if let Some(val) = resolved {
2888 prop_values.insert(vid, HashMap::from([(prop.to_string(), val)]));
2889 }
2890 }
2891 let vids: Vec<Vid> = (0..batch.num_rows())
2892 .map(|i| Vid::from(vid_arr.value(i)))
2893 .collect();
2894 let col = build_property_column_static(&vids, &prop_values, prop, &expected_type)
2895 .unwrap_or_else(|_| {
2896 arrow_array::new_null_array(&expected_type, batch.num_rows())
2897 });
2898 columns.push(col);
2899 }
2900 }
2901 }
2902
2903 RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2904}
2905
2906pub(crate) fn get_property_value(
2908 vid: &Vid,
2909 props_map: &HashMap<Vid, Properties>,
2910 prop_name: &str,
2911) -> Option<Value> {
2912 if prop_name == "_all_props" {
2913 return props_map.get(vid).map(|p| {
2914 let map: HashMap<String, Value> =
2915 p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2916 Value::Map(map)
2917 });
2918 }
2919 props_map
2920 .get(vid)
2921 .and_then(|props| props.get(prop_name))
2922 .cloned()
2923}
2924
/// Builds a primitive Arrow column for `$prop_name` across `$vids`.
///
/// For each vertex, the value is fetched with `get_property_value`. `$extractor` maps the
/// `&Value` to an `Option` of an intermediate type, and `$cast` converts a successful
/// extraction into the builder's native type. A missing property or a failed extraction
/// becomes a null slot. The macro evaluates to `Ok(ArrayRef)`.
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut column_builder = <$builder_ty>::new();
        for row_vid in $vids {
            let cell = get_property_value(row_vid, $props_map, $prop_name)
                .as_ref()
                .and_then($extractor)
                .map($cast);
            column_builder.append_option(cell);
        }
        Ok(Arc::new(column_builder.finish()) as ArrayRef)
    }};
}
2944
2945pub(crate) fn build_property_column_static(
2947 vids: &[Vid],
2948 props_map: &HashMap<Vid, Properties>,
2949 prop_name: &str,
2950 data_type: &DataType,
2951) -> DFResult<ArrayRef> {
2952 match data_type {
2953 DataType::LargeBinary => {
2954 use arrow_array::builder::LargeBinaryBuilder;
2956 let mut builder = LargeBinaryBuilder::new();
2957
2958 for vid in vids {
2959 match get_property_value(vid, props_map, prop_name) {
2960 Some(Value::Null) | None => builder.append_null(),
2961 Some(Value::Bytes(bytes)) => {
2962 builder.append_value(&bytes);
2963 }
2964 Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
2965 let bytes: Vec<u8> = arr
2968 .iter()
2969 .filter_map(|v| v.as_u64().map(|n| n as u8))
2970 .collect();
2971 if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
2972 builder.append_value(&bytes);
2973 } else {
2974 builder.append_value(uni_common::cypher_value_codec::encode(
2975 &Value::List(arr),
2976 ));
2977 }
2978 }
2979 Some(val) => {
2980 builder.append_value(uni_common::cypher_value_codec::encode(&val));
2984 }
2985 }
2986 }
2987 Ok(Arc::new(builder.finish()))
2988 }
2989 DataType::Binary => {
2990 let mut builder = BinaryBuilder::new();
2992 for vid in vids {
2993 let bytes = get_property_value(vid, props_map, prop_name)
2994 .filter(|v| !v.is_null())
2995 .and_then(|v| {
2996 let json_val: serde_json::Value = v.into();
2997 serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
2998 })
2999 .and_then(|crdt| crdt.to_msgpack().ok());
3000 match bytes {
3001 Some(b) => builder.append_value(&b),
3002 None => builder.append_null(),
3003 }
3004 }
3005 Ok(Arc::new(builder.finish()))
3006 }
3007 DataType::Utf8 => {
3008 let mut builder = StringBuilder::new();
3009 for vid in vids {
3010 match get_property_value(vid, props_map, prop_name) {
3011 Some(Value::String(s)) => builder.append_value(s),
3012 Some(Value::Null) | None => builder.append_null(),
3013 Some(other) => builder.append_value(other.to_string()),
3014 }
3015 }
3016 Ok(Arc::new(builder.finish()))
3017 }
3018 DataType::Int64 => {
3019 build_numeric_column!(
3020 vids,
3021 props_map,
3022 prop_name,
3023 Int64Builder,
3024 |v: &Value| v.as_i64(),
3025 |v| v
3026 )
3027 }
3028 DataType::Int32 => {
3029 build_numeric_column!(
3030 vids,
3031 props_map,
3032 prop_name,
3033 Int32Builder,
3034 |v: &Value| v.as_i64(),
3035 |v: i64| v as i32
3036 )
3037 }
3038 DataType::Float64 => {
3039 build_numeric_column!(
3040 vids,
3041 props_map,
3042 prop_name,
3043 Float64Builder,
3044 |v: &Value| v.as_f64(),
3045 |v| v
3046 )
3047 }
3048 DataType::Float32 => {
3049 build_numeric_column!(
3050 vids,
3051 props_map,
3052 prop_name,
3053 Float32Builder,
3054 |v: &Value| v.as_f64(),
3055 |v: f64| v as f32
3056 )
3057 }
3058 DataType::Boolean => {
3059 let mut builder = BooleanBuilder::new();
3060 for vid in vids {
3061 match get_property_value(vid, props_map, prop_name) {
3062 Some(Value::Bool(b)) => builder.append_value(b),
3063 _ => builder.append_null(),
3064 }
3065 }
3066 Ok(Arc::new(builder.finish()))
3067 }
3068 DataType::UInt64 => {
3069 build_numeric_column!(
3070 vids,
3071 props_map,
3072 prop_name,
3073 UInt64Builder,
3074 |v: &Value| v.as_u64(),
3075 |v| v
3076 )
3077 }
3078 DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
3079 let values_builder = Float32Builder::new();
3081 let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
3082 for vid in vids {
3083 match get_property_value(vid, props_map, prop_name) {
3084 Some(Value::Vector(v)) => {
3085 for val in v {
3086 list_builder.values().append_value(val);
3087 }
3088 list_builder.append(true);
3089 }
3090 Some(Value::List(arr)) => {
3091 for v in arr {
3092 list_builder
3093 .values()
3094 .append_value(v.as_f64().unwrap_or(0.0) as f32);
3095 }
3096 list_builder.append(true);
3097 }
3098 _ => {
3099 for _ in 0..*dim {
3101 list_builder.values().append_null();
3102 }
3103 list_builder.append(false);
3104 }
3105 }
3106 }
3107 Ok(Arc::new(list_builder.finish()))
3108 }
3109 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
3110 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
3112 for vid in vids {
3113 match get_property_value(vid, props_map, prop_name) {
3114 Some(Value::Temporal(tv)) => match tv {
3115 uni_common::TemporalValue::DateTime {
3116 nanos_since_epoch, ..
3117 }
3118 | uni_common::TemporalValue::LocalDateTime {
3119 nanos_since_epoch, ..
3120 } => {
3121 builder.append_value(nanos_since_epoch);
3122 }
3123 uni_common::TemporalValue::Date { days_since_epoch } => {
3124 builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
3125 }
3126 _ => builder.append_null(),
3127 },
3128 Some(Value::String(s)) => match parse_datetime_utc(&s) {
3129 Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
3130 Err(_) => builder.append_null(),
3131 },
3132 Some(Value::Int(n)) => {
3133 builder.append_value(n);
3134 }
3135 _ => builder.append_null(),
3136 }
3137 }
3138 Ok(Arc::new(builder.finish()))
3139 }
3140 DataType::Date32 => {
3141 let mut builder = Date32Builder::new();
3142 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
3143 for vid in vids {
3144 match get_property_value(vid, props_map, prop_name) {
3145 Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
3146 builder.append_value(days_since_epoch);
3147 }
3148 Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
3149 Ok(d) => builder.append_value((d - epoch).num_days() as i32),
3150 Err(_) => builder.append_null(),
3151 },
3152 Some(Value::Int(n)) => {
3153 builder.append_value(n as i32);
3154 }
3155 _ => builder.append_null(),
3156 }
3157 }
3158 Ok(Arc::new(builder.finish()))
3159 }
3160 DataType::Time64(TimeUnit::Nanosecond) => {
3161 let mut builder = Time64NanosecondBuilder::new();
3162 for vid in vids {
3163 match get_property_value(vid, props_map, prop_name) {
3164 Some(Value::Temporal(
3165 uni_common::TemporalValue::LocalTime {
3166 nanos_since_midnight,
3167 }
3168 | uni_common::TemporalValue::Time {
3169 nanos_since_midnight,
3170 ..
3171 },
3172 )) => {
3173 builder.append_value(nanos_since_midnight);
3174 }
3175 Some(Value::Temporal(_)) => builder.append_null(),
3176 Some(Value::String(s)) => {
3177 match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
3178 .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
3179 {
3180 Ok(t) => {
3181 let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
3182 + t.nanosecond() as i64;
3183 builder.append_value(nanos);
3184 }
3185 Err(_) => builder.append_null(),
3186 }
3187 }
3188 Some(Value::Int(n)) => {
3189 builder.append_value(n);
3190 }
3191 _ => builder.append_null(),
3192 }
3193 }
3194 Ok(Arc::new(builder.finish()))
3195 }
3196 DataType::Interval(IntervalUnit::MonthDayNano) => {
3197 let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
3198 Vec::with_capacity(vids.len());
3199 for vid in vids {
3200 match get_property_value(vid, props_map, prop_name) {
3201 Some(Value::Temporal(uni_common::TemporalValue::Duration {
3202 months,
3203 days,
3204 nanos,
3205 })) => {
3206 values.push(Some(arrow::datatypes::IntervalMonthDayNano {
3207 months: months as i32,
3208 days: days as i32,
3209 nanoseconds: nanos,
3210 }));
3211 }
3212 Some(Value::Int(_n)) => {
3213 values.push(None);
3214 }
3215 _ => values.push(None),
3216 }
3217 }
3218 let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
3219 Ok(Arc::new(arr))
3220 }
3221 DataType::List(inner_field) => {
3222 build_list_property_column(vids, props_map, prop_name, inner_field)
3223 }
3224 DataType::Struct(fields) => {
3225 build_struct_property_column(vids, props_map, prop_name, fields)
3226 }
3227 DataType::FixedSizeBinary(24) => {
3228 use arrow_array::builder::FixedSizeBinaryBuilder;
3230 const BTIC_LEN: i32 = 24;
3231 let mut builder = FixedSizeBinaryBuilder::with_capacity(vids.len(), BTIC_LEN);
3232 for vid in vids {
3233 match get_property_value(vid, props_map, prop_name) {
3234 Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => {
3235 match uni_btic::Btic::new(lo, hi, meta) {
3236 Ok(b) => {
3237 builder
3238 .append_value(uni_btic::encode::encode(&b))
3239 .map_err(arrow_err)?;
3240 }
3241 Err(e) => {
3242 tracing::warn!(
3243 "BTIC coercion failed for property '{}': invalid value (lo={}, hi={}, meta={:#x}): {}",
3244 prop_name,
3245 lo,
3246 hi,
3247 meta,
3248 e
3249 );
3250 builder.append_null()
3251 }
3252 }
3253 }
3254 Some(Value::String(s)) => match uni_btic::parse::parse_btic_literal(&s) {
3255 Ok(b) => {
3256 builder
3257 .append_value(uni_btic::encode::encode(&b))
3258 .map_err(arrow_err)?;
3259 }
3260 Err(e) => {
3261 tracing::warn!(
3262 "BTIC coercion failed for property '{}': '{}' is not a valid BTIC literal: {}",
3263 prop_name,
3264 s,
3265 e
3266 );
3267 builder.append_null()
3268 }
3269 },
3270 _ => builder.append_null(),
3271 }
3272 }
3273 Ok(Arc::new(builder.finish()))
3274 }
3275 _ => {
3277 let mut builder = StringBuilder::new();
3278 for vid in vids {
3279 match get_property_value(vid, props_map, prop_name) {
3280 Some(Value::Null) | None => builder.append_null(),
3281 Some(other) => builder.append_value(other.to_string()),
3282 }
3283 }
3284 Ok(Arc::new(builder.finish()))
3285 }
3286 }
3287}
3288
3289fn build_list_property_column(
3291 vids: &[Vid],
3292 props_map: &HashMap<Vid, Properties>,
3293 prop_name: &str,
3294 inner_field: &Arc<Field>,
3295) -> DFResult<ArrayRef> {
3296 match inner_field.data_type() {
3297 DataType::Utf8 => {
3298 let mut builder = ListBuilder::new(StringBuilder::new());
3299 for vid in vids {
3300 match get_property_value(vid, props_map, prop_name) {
3301 Some(Value::List(arr)) => {
3302 for v in arr {
3303 match v {
3304 Value::String(s) => builder.values().append_value(s),
3305 Value::Null => builder.values().append_null(),
3306 other => builder.values().append_value(format!("{other:?}")),
3307 }
3308 }
3309 builder.append(true);
3310 }
3311 _ => builder.append(false),
3312 }
3313 }
3314 Ok(Arc::new(builder.finish()))
3315 }
3316 DataType::Int64 => {
3317 let mut builder = ListBuilder::new(Int64Builder::new());
3318 for vid in vids {
3319 match get_property_value(vid, props_map, prop_name) {
3320 Some(Value::List(arr)) => {
3321 for v in arr {
3322 match v.as_i64() {
3323 Some(n) => builder.values().append_value(n),
3324 None => builder.values().append_null(),
3325 }
3326 }
3327 builder.append(true);
3328 }
3329 _ => builder.append(false),
3330 }
3331 }
3332 Ok(Arc::new(builder.finish()))
3333 }
3334 DataType::Float64 => {
3335 let mut builder = ListBuilder::new(Float64Builder::new());
3336 for vid in vids {
3337 match get_property_value(vid, props_map, prop_name) {
3338 Some(Value::List(arr)) => {
3339 for v in arr {
3340 match v.as_f64() {
3341 Some(n) => builder.values().append_value(n),
3342 None => builder.values().append_null(),
3343 }
3344 }
3345 builder.append(true);
3346 }
3347 _ => builder.append(false),
3348 }
3349 }
3350 Ok(Arc::new(builder.finish()))
3351 }
3352 DataType::Boolean => {
3353 let mut builder = ListBuilder::new(BooleanBuilder::new());
3354 for vid in vids {
3355 match get_property_value(vid, props_map, prop_name) {
3356 Some(Value::List(arr)) => {
3357 for v in arr {
3358 match v.as_bool() {
3359 Some(b) => builder.values().append_value(b),
3360 None => builder.values().append_null(),
3361 }
3362 }
3363 builder.append(true);
3364 }
3365 _ => builder.append(false),
3366 }
3367 }
3368 Ok(Arc::new(builder.finish()))
3369 }
3370 DataType::Struct(fields) => {
3371 build_list_of_structs_column(vids, props_map, prop_name, fields)
3373 }
3374 DataType::LargeBinary
3375 if inner_field
3376 .metadata()
3377 .get("uni_raw_bytes")
3378 .is_some_and(|v| v == "true") =>
3379 {
3380 let mut builder = ListBuilder::new(arrow_array::builder::LargeBinaryBuilder::new())
3386 .with_field(inner_field.clone());
3387 for vid in vids {
3388 match get_property_value(vid, props_map, prop_name) {
3389 Some(Value::List(arr)) => {
3390 for v in arr {
3391 if let Value::Bytes(b) = v {
3392 builder.values().append_value(b);
3393 } else {
3394 builder.values().append_null();
3395 }
3396 }
3397 builder.append(true);
3398 }
3399 _ => builder.append(false),
3400 }
3401 }
3402 Ok(Arc::new(builder.finish()))
3403 }
3404 _ => {
3406 let mut builder = ListBuilder::new(StringBuilder::new());
3407 for vid in vids {
3408 match get_property_value(vid, props_map, prop_name) {
3409 Some(Value::List(arr)) => {
3410 for v in arr {
3411 match v {
3412 Value::Null => builder.values().append_null(),
3413 other => builder.values().append_value(format!("{other:?}")),
3414 }
3415 }
3416 builder.append(true);
3417 }
3418 _ => builder.append(false),
3419 }
3420 }
3421 Ok(Arc::new(builder.finish()))
3422 }
3423 }
3424}
3425
3426fn build_list_of_structs_column(
3432 vids: &[Vid],
3433 props_map: &HashMap<Vid, Properties>,
3434 prop_name: &str,
3435 fields: &Fields,
3436) -> DFResult<ArrayRef> {
3437 use arrow_array::StructArray;
3438
3439 let values: Vec<Option<Value>> = vids
3440 .iter()
3441 .map(|vid| get_property_value(vid, props_map, prop_name))
3442 .collect();
3443
3444 let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
3447 .iter()
3448 .map(|val| match val {
3449 Some(Value::List(arr)) => {
3450 let objs: Vec<HashMap<String, Value>> = arr
3451 .iter()
3452 .filter_map(|v| {
3453 if let Value::Map(m) = v {
3454 Some(m.clone())
3455 } else {
3456 None
3457 }
3458 })
3459 .collect();
3460 if objs.is_empty() { None } else { Some(objs) }
3461 }
3462 Some(Value::Map(obj)) => {
3463 let kv_pairs: Vec<HashMap<String, Value>> = obj
3465 .iter()
3466 .map(|(k, v)| {
3467 let mut m = HashMap::new();
3468 m.insert("key".to_string(), Value::String(k.clone()));
3469 m.insert("value".to_string(), v.clone());
3470 m
3471 })
3472 .collect();
3473 Some(kv_pairs)
3474 }
3475 _ => None,
3476 })
3477 .collect();
3478
3479 let total_items: usize = rows
3480 .iter()
3481 .filter_map(|r| r.as_ref())
3482 .map(|v| v.len())
3483 .sum();
3484
3485 let child_arrays: Vec<ArrayRef> = fields
3487 .iter()
3488 .map(|field| {
3489 let field_name = field.name();
3490 match field.data_type() {
3491 DataType::Utf8 => {
3492 let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3493 for obj in rows.iter().flatten().flatten() {
3494 match obj.get(field_name) {
3495 Some(Value::String(s)) => builder.append_value(s),
3496 Some(Value::Null) | None => builder.append_null(),
3497 Some(other) => builder.append_value(format!("{other:?}")),
3498 }
3499 }
3500 Arc::new(builder.finish()) as ArrayRef
3501 }
3502 DataType::Int64 => {
3503 let mut builder = Int64Builder::with_capacity(total_items);
3504 for obj in rows.iter().flatten().flatten() {
3505 match obj.get(field_name).and_then(|v| v.as_i64()) {
3506 Some(n) => builder.append_value(n),
3507 None => builder.append_null(),
3508 }
3509 }
3510 Arc::new(builder.finish()) as ArrayRef
3511 }
3512 DataType::Float64 => {
3513 let mut builder = Float64Builder::with_capacity(total_items);
3514 for obj in rows.iter().flatten().flatten() {
3515 match obj.get(field_name).and_then(|v| v.as_f64()) {
3516 Some(n) => builder.append_value(n),
3517 None => builder.append_null(),
3518 }
3519 }
3520 Arc::new(builder.finish()) as ArrayRef
3521 }
3522 DataType::LargeBinary
3527 if field
3528 .metadata()
3529 .get("uni_raw_bytes")
3530 .is_some_and(|v| v == "true") =>
3531 {
3532 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
3533 for obj in rows.iter().flatten().flatten() {
3534 match obj.get(field_name) {
3535 Some(Value::Bytes(b)) => builder.append_value(b),
3536 _ => builder.append_null(),
3537 }
3538 }
3539 Arc::new(builder.finish()) as ArrayRef
3540 }
3541 _ => {
3543 let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3544 for obj in rows.iter().flatten().flatten() {
3545 match obj.get(field_name) {
3546 Some(Value::Null) | None => builder.append_null(),
3547 Some(other) => builder.append_value(format!("{other:?}")),
3548 }
3549 }
3550 Arc::new(builder.finish()) as ArrayRef
3551 }
3552 }
3553 })
3554 .collect();
3555
3556 let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
3558 .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3559
3560 let mut offsets = Vec::with_capacity(vids.len() + 1);
3562 let mut nulls = Vec::with_capacity(vids.len());
3563 let mut offset = 0i32;
3564 offsets.push(offset);
3565 for row in &rows {
3566 match row {
3567 Some(objs) => {
3568 offset += objs.len() as i32;
3569 offsets.push(offset);
3570 nulls.push(true);
3571 }
3572 None => {
3573 offsets.push(offset);
3574 nulls.push(false);
3575 }
3576 }
3577 }
3578
3579 let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
3580 let list_array = arrow_array::ListArray::try_new(
3581 list_field,
3582 arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
3583 Arc::new(struct_array),
3584 Some(arrow::buffer::NullBuffer::from(nulls)),
3585 )
3586 .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3587
3588 Ok(Arc::new(list_array))
3589}
3590
3591fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
3594 use uni_common::value::TemporalValue;
3595 let mut m = HashMap::new();
3596 match tv {
3597 TemporalValue::DateTime {
3598 nanos_since_epoch,
3599 offset_seconds,
3600 timezone_name,
3601 } => {
3602 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3603 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3604 if let Some(tz) = timezone_name {
3605 m.insert("timezone_name".into(), Value::String(tz.clone()));
3606 }
3607 }
3608 TemporalValue::LocalDateTime { nanos_since_epoch } => {
3609 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3610 }
3611 TemporalValue::Time {
3612 nanos_since_midnight,
3613 offset_seconds,
3614 } => {
3615 m.insert(
3616 "nanos_since_midnight".into(),
3617 Value::Int(*nanos_since_midnight),
3618 );
3619 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3620 }
3621 TemporalValue::LocalTime {
3622 nanos_since_midnight,
3623 } => {
3624 m.insert(
3625 "nanos_since_midnight".into(),
3626 Value::Int(*nanos_since_midnight),
3627 );
3628 }
3629 TemporalValue::Date { days_since_epoch } => {
3630 m.insert(
3631 "days_since_epoch".into(),
3632 Value::Int(*days_since_epoch as i64),
3633 );
3634 }
3635 TemporalValue::Duration {
3636 months,
3637 days,
3638 nanos,
3639 } => {
3640 m.insert("months".into(), Value::Int(*months));
3641 m.insert("days".into(), Value::Int(*days));
3642 m.insert("nanos".into(), Value::Int(*nanos));
3643 }
3644 TemporalValue::Btic { lo, hi, meta } => {
3645 m.insert("lo".into(), Value::Int(*lo));
3646 m.insert("hi".into(), Value::Int(*hi));
3647 m.insert("meta".into(), Value::Int(*meta as i64));
3648 }
3649 }
3650 m
3651}
3652
3653fn build_struct_property_column(
3655 vids: &[Vid],
3656 props_map: &HashMap<Vid, Properties>,
3657 prop_name: &str,
3658 fields: &Fields,
3659) -> DFResult<ArrayRef> {
3660 use arrow_array::StructArray;
3661
3662 let values: Vec<Option<Value>> = vids
3665 .iter()
3666 .map(|vid| {
3667 let val = get_property_value(vid, props_map, prop_name);
3668 match val {
3669 Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
3670 other => other,
3671 }
3672 })
3673 .collect();
3674
3675 let child_arrays: Vec<ArrayRef> = fields
3676 .iter()
3677 .map(|field| {
3678 let field_name = field.name();
3679 match field.data_type() {
3680 DataType::Float64 => {
3681 let mut builder = Float64Builder::with_capacity(vids.len());
3682 for val in &values {
3683 match val {
3684 Some(Value::Map(obj)) => {
3685 match obj.get(field_name).and_then(|v| v.as_f64()) {
3686 Some(n) => builder.append_value(n),
3687 None => builder.append_null(),
3688 }
3689 }
3690 _ => builder.append_null(),
3691 }
3692 }
3693 Arc::new(builder.finish()) as ArrayRef
3694 }
3695 DataType::Utf8 => {
3696 let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3697 for val in &values {
3698 match val {
3699 Some(Value::Map(obj)) => match obj.get(field_name) {
3700 Some(Value::String(s)) => builder.append_value(s),
3701 Some(Value::Null) | None => builder.append_null(),
3702 Some(other) => builder.append_value(format!("{other:?}")),
3703 },
3704 _ => builder.append_null(),
3705 }
3706 }
3707 Arc::new(builder.finish()) as ArrayRef
3708 }
3709 DataType::Int64 => {
3710 let mut builder = Int64Builder::with_capacity(vids.len());
3711 for val in &values {
3712 match val {
3713 Some(Value::Map(obj)) => {
3714 match obj.get(field_name).and_then(|v| v.as_i64()) {
3715 Some(n) => builder.append_value(n),
3716 None => builder.append_null(),
3717 }
3718 }
3719 _ => builder.append_null(),
3720 }
3721 }
3722 Arc::new(builder.finish()) as ArrayRef
3723 }
3724 DataType::Timestamp(_, _) => {
3725 let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
3726 for val in &values {
3727 match val {
3728 Some(Value::Map(obj)) => {
3729 match obj.get(field_name).and_then(|v| v.as_i64()) {
3730 Some(n) => builder.append_value(n),
3731 None => builder.append_null(),
3732 }
3733 }
3734 _ => builder.append_null(),
3735 }
3736 }
3737 Arc::new(builder.finish()) as ArrayRef
3738 }
3739 DataType::Int32 => {
3740 let mut builder = Int32Builder::with_capacity(vids.len());
3741 for val in &values {
3742 match val {
3743 Some(Value::Map(obj)) => {
3744 match obj.get(field_name).and_then(|v| v.as_i64()) {
3745 Some(n) => builder.append_value(n as i32),
3746 None => builder.append_null(),
3747 }
3748 }
3749 _ => builder.append_null(),
3750 }
3751 }
3752 Arc::new(builder.finish()) as ArrayRef
3753 }
3754 DataType::Time64(_) => {
3755 let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
3756 for val in &values {
3757 match val {
3758 Some(Value::Map(obj)) => {
3759 match obj.get(field_name).and_then(|v| v.as_i64()) {
3760 Some(n) => builder.append_value(n),
3761 None => builder.append_null(),
3762 }
3763 }
3764 _ => builder.append_null(),
3765 }
3766 }
3767 Arc::new(builder.finish()) as ArrayRef
3768 }
3769 _ => {
3771 let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3772 for val in &values {
3773 match val {
3774 Some(Value::Map(obj)) => match obj.get(field_name) {
3775 Some(Value::Null) | None => builder.append_null(),
3776 Some(other) => builder.append_value(format!("{other:?}")),
3777 },
3778 _ => builder.append_null(),
3779 }
3780 }
3781 Arc::new(builder.finish()) as ArrayRef
3782 }
3783 }
3784 })
3785 .collect();
3786
3787 let nulls: Vec<bool> = values
3789 .iter()
3790 .map(|v| matches!(v, Some(Value::Map(_))))
3791 .collect();
3792
3793 let struct_array = StructArray::try_new(
3794 fields.clone(),
3795 child_arrays,
3796 Some(arrow::buffer::NullBuffer::from(nulls)),
3797 )
3798 .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3799
3800 Ok(Arc::new(struct_array))
3801}
3802
/// Drives a single graph scan and yields its result as at most one `RecordBatch`.
impl Stream for GraphScanStream {
    type Item = DFResult<RecordBatch>;

    /// Advances the scan state machine: `Init` -> `Executing` -> `Done`.
    ///
    /// Compute time spent inside this call is recorded in the `elapsed_compute` metric, and
    /// the output row count is recorded once the scan future completes.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let metrics = self.metrics.clone();
        let _timer = metrics.elapsed_compute().timer();
        loop {
            // Take ownership of the current state (leaving `Done` behind); each arm below
            // either stores the next state or returns.
            let state = std::mem::replace(&mut self.state, GraphScanState::Done);

            match state {
                GraphScanState::Init => {
                    // Copy the scan configuration into owned values so the `async move`
                    // future does not borrow `self`.
                    let graph_ctx = self.graph_ctx.clone();
                    let label = self.label.clone();
                    let variable = self.variable.clone();
                    let properties = self.properties.clone();
                    let is_edge_scan = self.is_edge_scan;
                    let is_schemaless = self.is_schemaless;
                    let filter = self.filter.clone();
                    let vid_list_filter = self.vid_list_filter.clone();
                    let extra_lance_filter = self.extra_lance_filter.clone();
                    let extra_runtime_filter = self.extra_runtime_filter.clone();
                    let schema = self.schema.clone();

                    let fut = async move {
                        // Fail fast if the query's timeout has already elapsed.
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;

                        // Dispatch on scan kind: edges, schemaless vertices, or typed vertices.
                        let batch = if is_edge_scan {
                            columnar_scan_edge_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else if is_schemaless {
                            columnar_scan_schemaless_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                                &filter,
                                vid_list_filter.as_deref(),
                                extra_lance_filter.as_deref(),
                                extra_runtime_filter.as_ref(),
                            )
                            .await?
                        } else {
                            columnar_scan_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                                &filter,
                                vid_list_filter.as_deref(),
                                extra_lance_filter.as_deref(),
                                extra_runtime_filter.as_ref(),
                            )
                            .await?
                        };
                        Ok(Some(batch))
                    };

                    // Loop again so the new future is polled immediately in this call.
                    self.state = GraphScanState::Executing(Box::pin(fut));
                }
                GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        // The scan produces a single result; the stream ends afterwards.
                        self.state = GraphScanState::Done;
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphScanState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the still-running future back for the next poll.
                        self.state = GraphScanState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                GraphScanState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
3898
3899impl RecordBatchStream for GraphScanStream {
3900 fn schema(&self) -> SchemaRef {
3901 self.schema.clone()
3902 }
3903}
3904
#[cfg(test)]
mod tests {
    use super::*;

    /// A schema-bound vertex schema is `<var>._vid`, `<var>._labels`, then one
    /// `<var>.<prop>` column per projected property, in projection order.
    #[test]
    fn test_build_vertex_schema() {
        let uni_schema = UniSchema::default();
        let schema = GraphScanExec::build_vertex_schema(
            "n",
            "Person",
            &["name".to_string(), "age".to_string()],
            &uni_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(3).name(), "n.age");
    }

    /// An edge schema is `<var>._eid`, `<var>._src_vid`, `<var>._dst_vid`,
    /// then the projected properties.
    #[test]
    fn test_build_edge_schema() {
        let uni_schema = UniSchema::default();
        let schema =
            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "r._eid");
        assert_eq!(schema.field(1).name(), "r._src_vid");
        assert_eq!(schema.field(2).name(), "r._dst_vid");
        assert_eq!(schema.field(3).name(), "r.weight");
    }

    /// With an empty catalog schema, schemaless vertex properties are expected
    /// to be typed `LargeBinary`, while `_vid` stays `UInt64`.
    #[test]
    fn test_build_schemaless_vertex_schema() {
        let empty_schema = UniSchema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema(
            "n",
            &["name".to_string(), "age".to_string()],
            &empty_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(schema.field(3).name(), "n.age");
        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
    }

    /// With no projected properties, only the `_vid` and `_labels` columns remain.
    #[test]
    fn test_schemaless_all_scan_has_empty_label() {
        let empty_schema = UniSchema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[], &empty_schema);

        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
    }

    /// Round-trips both a map value and a scalar value through the cypher value codec.
    #[test]
    fn test_cypher_value_all_props_extraction() {
        let map: HashMap<String, Value> = [
            ("age".to_string(), Value::Int(30)),
            ("name".to_string(), Value::String("Alice".to_string())),
        ]
        .into_iter()
        .collect();
        let cv_bytes = uni_common::cypher_value_codec::encode(&Value::Map(map));

        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
        match decoded {
            Value::Map(map) => {
                let age_val = map.get("age").unwrap();
                assert_eq!(age_val, &Value::Int(30));
            }
            _ => panic!("Expected Map"),
        }

        let single_bytes = uni_common::cypher_value_codec::encode(&Value::Int(30));
        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
        assert_eq!(single_decoded, Value::Int(30));
    }

    /// Builds a Lance-style batch with `_vid`, `_deleted` and `_version` columns,
    /// plus a `name` column of the form `v{vid}_ver{version}`. The `name` value
    /// makes it possible to tell which physical row survived.
    ///
    /// The three slices must have equal length; `RecordBatch::try_new` panics via
    /// `unwrap` otherwise.
    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let names: Vec<String> = vids
            .iter()
            .zip(versions.iter())
            .map(|(v, ver)| format!("v{}_ver{}", v, ver))
            .collect();
        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vids.to_vec())),
                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
                Arc::new(UInt64Array::from(versions.to_vec())),
                Arc::new(name_arr),
            ],
        )
        .unwrap()
    }

    /// Expects one row per `_vid`: the row with the highest `_version`, with
    /// the other columns taken from that same row. Output is in ascending
    /// `_vid` order.
    #[test]
    fn test_mvcc_dedup_multiple_versions() {
        let batch = make_mvcc_batch(
            &[1, 1, 1, 2, 2],
            &[3, 1, 5, 2, 4],
            &[false, false, false, false, false],
        );

        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let ver_col = result
            .column_by_name("_version")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let name_col = result
            .column_by_name("name")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();

        assert_eq!(vid_col.value(0), 1);
        assert_eq!(ver_col.value(0), 5);
        assert_eq!(name_col.value(0), "v1_ver5");

        assert_eq!(vid_col.value(1), 2);
        assert_eq!(ver_col.value(1), 4);
        assert_eq!(name_col.value(1), "v2_ver4");
    }

    /// Distinct `_vid`s are all kept.
    #[test]
    fn test_mvcc_dedup_single_rows() {
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    /// An empty batch deduplicates to an empty batch without error.
    #[test]
    fn test_mvcc_dedup_empty() {
        let batch = make_mvcc_batch(&[], &[], &[]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    /// A vertex tombstoned in the current L0 buffer is removed; the remaining
    /// rows keep their relative order.
    #[test]
    fn test_filter_l0_tombstones_removes_tombstoned() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);

        let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
        let l0_buf = Arc::new(parking_lot::RwLock::new(l0));
        l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));

        let l0_ctx = L0Context {
            current_l0: Some(l0_buf),
            transaction_l0: None,
            pending_flush_l0s: vec![],
        };

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(vid_col.value(1), 3);
    }

    /// With a default (empty) L0 context, no rows are filtered out.
    #[test]
    fn test_filter_l0_tombstones_none() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let l0_ctx = L0Context::default();

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    /// Lance storage columns are projected onto the variable-prefixed output
    /// schema. The MVCC bookkeeping columns (`_deleted`, `_version`) do not
    /// appear in the result, and property values are carried over unchanged.
    #[test]
    fn test_map_to_output_schema_basic() {
        use crate::query::df_graph::L0Context;

        let lance_schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let name_arr: arrow_array::StringArray =
            vec![Some("Alice"), Some("Bob")].into_iter().collect();
        let batch = RecordBatch::try_new(
            lance_schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2])),
                Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
                Arc::new(UInt64Array::from(vec![1u64, 1])),
                Arc::new(name_arr),
            ],
        )
        .unwrap();

        let output_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n._labels", labels_data_type(), true),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        let l0_ctx = L0Context::default();
        let result = map_to_output_schema(
            &batch,
            "Person",
            "n",
            &["name".to_string()],
            &output_schema,
            &l0_ctx,
        )
        .unwrap();

        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.schema().fields().len(), 3);
        assert_eq!(result.schema().field(0).name(), "n._vid");
        assert_eq!(result.schema().field(1).name(), "n._labels");
        assert_eq!(result.schema().field(2).name(), "n.name");

        let name_col = result
            .column(2)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
    }
}