1use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30 BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31 Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32 Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
54pub struct GraphScanExec {
76 graph_ctx: Arc<GraphExecutionContext>,
78
79 label: String,
81
82 variable: String,
84
85 projected_properties: Vec<String>,
87
88 filter: Option<Arc<dyn PhysicalExpr>>,
92
93 vid_list_filter: Option<Vec<u64>>,
98
99 extra_lance_filter: Option<String>,
105
106 extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,
111
112 is_edge_scan: bool,
114
115 is_schemaless: bool,
117
118 schema: SchemaRef,
120
121 properties: Arc<PlanProperties>,
123
124 metrics: ExecutionPlanMetricsSet,
126}
127
128impl fmt::Debug for GraphScanExec {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 f.debug_struct("GraphScanExec")
131 .field("label", &self.label)
132 .field("variable", &self.variable)
133 .field("projected_properties", &self.projected_properties)
134 .field(
135 "vid_list_filter_len",
136 &self.vid_list_filter.as_ref().map(Vec::len),
137 )
138 .field("is_edge_scan", &self.is_edge_scan)
139 .finish()
140 }
141}
142
143impl GraphScanExec {
144 pub fn with_vid_list_filter(mut self, vids: Vec<u64>) -> Self {
149 self.vid_list_filter = Some(vids);
150 self
151 }
152
153 pub fn with_extra_lance_filter(mut self, filter: String) -> Self {
157 self.extra_lance_filter = Some(filter);
158 self
159 }
160
161 pub fn with_extra_runtime_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
165 self.extra_runtime_filter = Some(filter);
166 self
167 }
168
169 pub fn has_extra_lance_filter(&self) -> bool {
172 self.extra_lance_filter.is_some()
173 }
174
175 pub(crate) async fn execute_with_vid_filter(&self, vids: &[u64]) -> DFResult<RecordBatch> {
185 if self.is_edge_scan {
186 return Err(datafusion::error::DataFusionError::Plan(
187 "execute_with_vid_filter not supported for edge scans".into(),
188 ));
189 }
190 if self.is_schemaless {
191 columnar_scan_schemaless_vertex_batch_static(
192 &self.graph_ctx,
193 &self.label,
194 &self.variable,
195 &self.projected_properties,
196 &self.schema,
197 &self.filter,
198 Some(vids),
199 self.extra_lance_filter.as_deref(),
200 self.extra_runtime_filter.as_ref(),
201 )
202 .await
203 } else {
204 columnar_scan_vertex_batch_static(
205 &self.graph_ctx,
206 &self.label,
207 &self.variable,
208 &self.projected_properties,
209 &self.schema,
210 &self.filter,
211 Some(vids),
212 self.extra_lance_filter.as_deref(),
213 self.extra_runtime_filter.as_ref(),
214 )
215 .await
216 }
217 }
218}
219
220impl GraphScanExec {
221 pub fn new_vertex_scan(
226 graph_ctx: Arc<GraphExecutionContext>,
227 label: impl Into<String>,
228 variable: impl Into<String>,
229 projected_properties: Vec<String>,
230 filter: Option<Arc<dyn PhysicalExpr>>,
231 ) -> Self {
232 let label = label.into();
233 let variable = variable.into();
234
235 let uni_schema = graph_ctx.storage().schema_manager().schema();
237 let schema =
238 Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
239
240 let properties = compute_plan_properties(schema.clone());
241
242 Self {
243 graph_ctx,
244 label,
245 variable,
246 projected_properties,
247 filter,
248 vid_list_filter: None,
249 extra_lance_filter: None,
250 extra_runtime_filter: None,
251 is_edge_scan: false,
252 is_schemaless: false,
253 schema,
254 properties,
255 metrics: ExecutionPlanMetricsSet::new(),
256 }
257 }
258
259 pub fn new_schemaless_vertex_scan(
265 graph_ctx: Arc<GraphExecutionContext>,
266 label_name: impl Into<String>,
267 variable: impl Into<String>,
268 projected_properties: Vec<String>,
269 filter: Option<Arc<dyn PhysicalExpr>>,
270 ) -> Self {
271 let label = label_name.into();
272 let variable = variable.into();
273
274 let projected_properties: Vec<String> = projected_properties
279 .into_iter()
280 .filter(|p| p != "_vid" && p != "_labels")
281 .collect();
282
283 let uni_schema = graph_ctx.storage().schema_manager().schema();
284 let schema =
285 Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
286 let properties = compute_plan_properties(schema.clone());
287
288 Self {
289 graph_ctx,
290 label,
291 variable,
292 projected_properties,
293 filter,
294 vid_list_filter: None,
295 extra_lance_filter: None,
296 extra_runtime_filter: None,
297 is_edge_scan: false,
298 is_schemaless: true,
299 schema,
300 properties,
301 metrics: ExecutionPlanMetricsSet::new(),
302 }
303 }
304
305 pub fn new_multi_label_vertex_scan(
310 graph_ctx: Arc<GraphExecutionContext>,
311 labels: Vec<String>,
312 variable: impl Into<String>,
313 projected_properties: Vec<String>,
314 filter: Option<Arc<dyn PhysicalExpr>>,
315 ) -> Self {
316 let variable = variable.into();
317 let projected_properties: Vec<String> = projected_properties
318 .into_iter()
319 .filter(|p| p != "_vid" && p != "_labels")
320 .collect();
321 let uni_schema = graph_ctx.storage().schema_manager().schema();
322 let schema =
323 Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
324 let properties = compute_plan_properties(schema.clone());
325
326 let encoded_labels = labels.join(":");
328
329 Self {
330 graph_ctx,
331 label: encoded_labels,
332 variable,
333 projected_properties,
334 filter,
335 vid_list_filter: None,
336 extra_lance_filter: None,
337 extra_runtime_filter: None,
338 is_edge_scan: false,
339 is_schemaless: true,
340 schema,
341 properties,
342 metrics: ExecutionPlanMetricsSet::new(),
343 }
344 }
345
346 pub fn new_schemaless_all_scan(
352 graph_ctx: Arc<GraphExecutionContext>,
353 variable: impl Into<String>,
354 projected_properties: Vec<String>,
355 filter: Option<Arc<dyn PhysicalExpr>>,
356 ) -> Self {
357 let variable = variable.into();
358 let projected_properties: Vec<String> = projected_properties
359 .into_iter()
360 .filter(|p| p != "_vid" && p != "_labels")
361 .collect();
362
363 let uni_schema = graph_ctx.storage().schema_manager().schema();
364 let schema =
365 Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
366 let properties = compute_plan_properties(schema.clone());
367
368 Self {
369 graph_ctx,
370 label: String::new(), variable,
372 projected_properties,
373 filter,
374 vid_list_filter: None,
375 extra_lance_filter: None,
376 extra_runtime_filter: None,
377 is_edge_scan: false,
378 is_schemaless: true,
379 schema,
380 properties,
381 metrics: ExecutionPlanMetricsSet::new(),
382 }
383 }
384
385 fn build_schemaless_vertex_schema(
391 variable: &str,
392 properties: &[String],
393 uni_schema: &uni_common::core::schema::Schema,
394 ) -> SchemaRef {
395 let mut merged: std::collections::HashMap<&str, &uni_common::core::schema::PropertyMeta> =
397 std::collections::HashMap::new();
398 for label_props in uni_schema.properties.values() {
399 for (name, meta) in label_props {
400 merged.entry(name.as_str()).or_insert(meta);
401 }
402 }
403
404 let mut fields = vec![
405 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
406 Field::new(format!("{}._labels", variable), labels_data_type(), true),
407 ];
408
409 for prop in properties {
410 let col_name = format!("{}.{}", variable, prop);
411 let uni_type = merged.get(prop.as_str()).map(|meta| &meta.r#type);
412 let arrow_type = uni_type
413 .map(|t| t.to_arrow())
414 .unwrap_or(DataType::LargeBinary);
415 fields.push(property_field(&col_name, arrow_type, uni_type));
416 }
417
418 Arc::new(Schema::new(fields))
419 }
420
421 pub fn new_edge_scan(
426 graph_ctx: Arc<GraphExecutionContext>,
427 edge_type: impl Into<String>,
428 variable: impl Into<String>,
429 projected_properties: Vec<String>,
430 filter: Option<Arc<dyn PhysicalExpr>>,
431 ) -> Self {
432 let label = edge_type.into();
433 let variable = variable.into();
434
435 let uni_schema = graph_ctx.storage().schema_manager().schema();
437 let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
438
439 let properties = compute_plan_properties(schema.clone());
440
441 Self {
442 graph_ctx,
443 label,
444 variable,
445 projected_properties,
446 filter,
447 vid_list_filter: None,
448 extra_lance_filter: None,
449 extra_runtime_filter: None,
450 is_edge_scan: true,
451 is_schemaless: false,
452 schema,
453 properties,
454 metrics: ExecutionPlanMetricsSet::new(),
455 }
456 }
457
458 fn build_vertex_schema(
460 variable: &str,
461 label: &str,
462 properties: &[String],
463 uni_schema: &UniSchema,
464 ) -> SchemaRef {
465 let mut fields = vec![
466 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
467 Field::new(format!("{}._labels", variable), labels_data_type(), true),
468 ];
469 let label_props = uni_schema.properties.get(label);
470 for prop in properties {
471 let col_name = format!("{}.{}", variable, prop);
472 let arrow_type = resolve_property_type(prop, label_props);
473 let uni_type = label_props
474 .and_then(|props| props.get(prop))
475 .map(|m| &m.r#type);
476 fields.push(property_field(&col_name, arrow_type, uni_type));
477 }
478 Arc::new(Schema::new(fields))
479 }
480
481 fn build_edge_schema(
483 variable: &str,
484 edge_type: &str,
485 properties: &[String],
486 uni_schema: &UniSchema,
487 ) -> SchemaRef {
488 let mut fields = vec![
489 Field::new(format!("{}._eid", variable), DataType::UInt64, false),
490 Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
491 Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
492 ];
493 let edge_props = uni_schema.properties.get(edge_type);
494 for prop in properties {
495 let col_name = format!("{}.{}", variable, prop);
496 let arrow_type = resolve_property_type(prop, edge_props);
497 let uni_type = edge_props
498 .and_then(|props| props.get(prop))
499 .map(|m| &m.r#type);
500 fields.push(property_field(&col_name, arrow_type, uni_type));
501 }
502 Arc::new(Schema::new(fields))
503 }
504}
505
506impl DisplayAs for GraphScanExec {
507 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
508 let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
509 write!(
510 f,
511 "GraphScanExec: {}={}, properties={:?}",
512 scan_type, self.label, self.projected_properties
513 )?;
514 if self.filter.is_some() {
515 write!(f, ", filter=<pushed>")?;
516 }
517 Ok(())
518 }
519}
520
521impl ExecutionPlan for GraphScanExec {
522 fn name(&self) -> &str {
523 "GraphScanExec"
524 }
525
526 fn as_any(&self) -> &dyn Any {
527 self
528 }
529
530 fn schema(&self) -> SchemaRef {
531 self.schema.clone()
532 }
533
534 fn properties(&self) -> &Arc<PlanProperties> {
535 &self.properties
536 }
537
538 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
539 vec![]
540 }
541
542 fn with_new_children(
543 self: Arc<Self>,
544 children: Vec<Arc<dyn ExecutionPlan>>,
545 ) -> DFResult<Arc<dyn ExecutionPlan>> {
546 if children.is_empty() {
547 Ok(self)
548 } else {
549 Err(datafusion::error::DataFusionError::Plan(
550 "GraphScanExec does not accept children".to_string(),
551 ))
552 }
553 }
554
555 fn execute(
556 &self,
557 partition: usize,
558 _context: Arc<TaskContext>,
559 ) -> DFResult<SendableRecordBatchStream> {
560 let metrics = BaselineMetrics::new(&self.metrics, partition);
561
562 Ok(Box::pin(GraphScanStream::new(
563 self.graph_ctx.clone(),
564 self.label.clone(),
565 self.variable.clone(),
566 self.projected_properties.clone(),
567 self.is_edge_scan,
568 self.is_schemaless,
569 self.filter.clone(),
570 self.vid_list_filter.clone(),
571 self.extra_lance_filter.clone(),
572 self.extra_runtime_filter.clone(),
573 self.schema.clone(),
574 metrics,
575 )))
576 }
577
578 fn metrics(&self) -> Option<MetricsSet> {
579 Some(self.metrics.clone_inner())
580 }
581}
582
583enum GraphScanState {
585 Init,
587 Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
589 Done,
591}
592
593struct GraphScanStream {
599 graph_ctx: Arc<GraphExecutionContext>,
601
602 label: String,
604
605 variable: String,
607
608 properties: Vec<String>,
610
611 is_edge_scan: bool,
613
614 is_schemaless: bool,
616
617 filter: Option<Arc<dyn PhysicalExpr>>,
619
620 vid_list_filter: Option<Vec<u64>>,
622
623 extra_lance_filter: Option<String>,
626
627 extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,
629
630 schema: SchemaRef,
632
633 state: GraphScanState,
635
636 metrics: BaselineMetrics,
638}
639
640impl GraphScanStream {
641 #[expect(clippy::too_many_arguments)]
643 fn new(
644 graph_ctx: Arc<GraphExecutionContext>,
645 label: String,
646 variable: String,
647 properties: Vec<String>,
648 is_edge_scan: bool,
649 is_schemaless: bool,
650 filter: Option<Arc<dyn PhysicalExpr>>,
651 vid_list_filter: Option<Vec<u64>>,
652 extra_lance_filter: Option<String>,
653 extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,
654 schema: SchemaRef,
655 metrics: BaselineMetrics,
656 ) -> Self {
657 Self {
658 graph_ctx,
659 label,
660 variable,
661 properties,
662 is_edge_scan,
663 is_schemaless,
664 filter,
665 vid_list_filter,
666 extra_lance_filter,
667 extra_runtime_filter,
668 schema,
669 state: GraphScanState::Init,
670 metrics,
671 }
672 }
673}
674
675pub(crate) fn resolve_property_type(
680 prop: &str,
681 schema_props: Option<
682 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
683 >,
684) -> DataType {
685 if prop == "overflow_json" {
686 DataType::LargeBinary
687 } else if prop == "_created_at" || prop == "_updated_at" {
688 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
692 } else {
693 schema_props
694 .and_then(|props| props.get(prop))
695 .map(|meta| meta.r#type.to_arrow())
696 .unwrap_or(DataType::LargeBinary)
697 }
698}
699
700pub(crate) fn property_field(
709 col_name: &str,
710 arrow_type: DataType,
711 uni_type: Option<&uni_common::DataType>,
712) -> Field {
713 let field = Field::new(col_name, arrow_type, true);
714 if matches!(uni_type, Some(uni_common::DataType::Bytes)) {
715 field.with_metadata(std::collections::HashMap::from([(
716 "uni_raw_bytes".to_string(),
717 "true".to_string(),
718 )]))
719 } else {
720 field
721 }
722}
723
#[cfg(test)]
/// Test-only shorthand for [`mvcc_dedup_batch_by`] keyed on the vertex id
/// column `_vid`.
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    const VERTEX_ID_COLUMN: &str = "_vid";
    mvcc_dedup_batch_by(batch, VERTEX_ID_COLUMN)
}
736
737fn mvcc_dedup_to_option(
742 batch: Option<RecordBatch>,
743 id_column: &str,
744) -> DFResult<Option<RecordBatch>> {
745 match batch {
746 Some(b) => {
747 let deduped = mvcc_dedup_batch_by(&b, id_column)?;
748 Ok(if deduped.num_rows() > 0 {
749 Some(deduped)
750 } else {
751 None
752 })
753 }
754 None => Ok(None),
755 }
756}
757
758fn merge_lance_and_l0(
762 lance_deduped: Option<RecordBatch>,
763 l0_batch: RecordBatch,
764 internal_schema: &SchemaRef,
765 id_column: &str,
766) -> DFResult<Option<RecordBatch>> {
767 let has_l0 = l0_batch.num_rows() > 0;
768 match (lance_deduped, has_l0) {
769 (Some(lance), true) => {
770 let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
771 .map_err(arrow_err)?;
772 Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
773 }
774 (Some(lance), false) => Ok(Some(lance)),
775 (None, true) => Ok(Some(l0_batch)),
776 (None, false) => Ok(None),
777 }
778}
779
780async fn drop_superseded_pushdown_rows(
793 storage: &Arc<uni_store::storage::manager::StorageManager>,
794 label_table: Option<&str>,
795 batch: RecordBatch,
796) -> DFResult<RecordBatch> {
797 if batch.num_rows() == 0 {
798 return Ok(batch);
799 }
800 let (Some(vid_col), Some(ver_col)) = (
801 batch
802 .column_by_name("_vid")
803 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
804 batch
805 .column_by_name("_version")
806 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
807 ) else {
808 return Err(datafusion::error::DataFusionError::Execution(
809 "pushdown version verification: scan batch missing _vid/_version".to_string(),
810 ));
811 };
812
813 let mut candidates: Vec<u64> = Vec::new();
814 let mut seen: HashSet<u64> = HashSet::new();
815 for i in 0..vid_col.len() {
816 let vid = vid_col.value(i);
817 if seen.insert(vid) {
818 candidates.push(vid);
819 }
820 }
821
822 const VERIFY_CHUNK: usize = 1000;
826 let mut max_ver: HashMap<u64, u64> = HashMap::with_capacity(candidates.len());
827 for chunk in candidates.chunks(VERIFY_CHUNK) {
828 let filter = format_vid_in_list(chunk);
829 let scanned = match label_table {
830 Some(label) => {
831 storage
832 .scan_vertex_table(label, &["_vid", "_version"], Some(&filter))
833 .await
834 }
835 None => {
836 storage
837 .scan_main_vertex_table(&["_vid", "_version"], Some(&filter))
838 .await
839 }
840 }
841 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
842 let Some(vbatch) = scanned else { continue };
843 let (Some(v_vid), Some(v_ver)) = (
844 vbatch
845 .column_by_name("_vid")
846 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
847 vbatch
848 .column_by_name("_version")
849 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
850 ) else {
851 return Err(datafusion::error::DataFusionError::Execution(
852 "pushdown version verification: rescan missing _vid/_version".to_string(),
853 ));
854 };
855 for i in 0..v_vid.len() {
856 let entry = max_ver.entry(v_vid.value(i)).or_insert(0);
857 *entry = (*entry).max(v_ver.value(i));
858 }
859 }
860
861 let keep: arrow_array::BooleanArray = (0..batch.num_rows())
862 .map(|i| {
863 Some(
864 max_ver
865 .get(&vid_col.value(i))
866 .is_none_or(|&max| ver_col.value(i) >= max),
867 )
868 })
869 .collect();
870 arrow::compute::filter_record_batch(&batch, &keep).map_err(arrow_err)
871}
872
/// Appends `col_name` to `columns` unless an equal entry is already present.
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    let already_present = columns.iter().any(|existing| existing.as_str() == col_name);
    if already_present {
        return;
    }
    columns.push(col_name.to_owned());
}
882
883fn extract_from_overflow_blob(
888 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
889 row: usize,
890 prop: &str,
891) -> Option<Vec<u8>> {
892 let arr = overflow_arr?;
893 if arr.is_null(row) {
894 return None;
895 }
896 uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
897}
898
899fn build_overflow_property_column(
906 num_rows: usize,
907 vid_arr: &UInt64Array,
908 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
909 prop: &str,
910 l0_ctx: &crate::query::df_graph::L0Context,
911) -> ArrayRef {
912 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
913 for i in 0..num_rows {
914 let vid = Vid::from(vid_arr.value(i));
915
916 let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
918
919 if let Some(val_opt) = l0_val {
920 append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
921 } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
922 builder.append_value(&bytes);
923 } else {
924 builder.append_null();
925 }
926 }
927 Arc::new(builder.finish())
928}
929
930fn resolve_l0_property(
936 vid: &Vid,
937 prop: &str,
938 l0_ctx: &crate::query::df_graph::L0Context,
939) -> Option<Option<Value>> {
940 let mut result = None;
941 for l0 in l0_ctx.iter_l0_buffers() {
942 let guard = l0.read();
943 if let Some(props) = guard.vertex_properties.get(vid)
944 && let Some(val) = props.get(prop)
945 {
946 result = Some(Some(val.clone()));
947 }
948 }
949 result
950}
951
952fn append_value_as_cypher_binary(
957 builder: &mut arrow_array::builder::LargeBinaryBuilder,
958 val: Option<&Value>,
959) {
960 match val {
961 Some(v) if !v.is_null() => {
962 builder.append_value(uni_common::cypher_value_codec::encode(v));
963 }
964 _ => builder.append_null(),
965 }
966}
967
968fn build_all_props_column_with_l0_overlay(
976 num_rows: usize,
977 vid_arr: &UInt64Array,
978 props_arr: Option<&arrow_array::LargeBinaryArray>,
979 l0_ctx: &crate::query::df_graph::L0Context,
980) -> ArrayRef {
981 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
982 for i in 0..num_rows {
983 let vid = Vid::from(vid_arr.value(i));
984
985 let mut merged_props: HashMap<String, Value> = HashMap::new();
988 if let Some(arr) = props_arr
989 && !arr.is_null(i)
990 && let Ok(uni_common::Value::Map(map)) =
991 uni_common::cypher_value_codec::decode(arr.value(i))
992 {
993 merged_props.extend(map);
994 }
995
996 for l0 in l0_ctx.iter_l0_buffers() {
998 let guard = l0.read();
999 if let Some(l0_props) = guard.vertex_properties.get(&vid) {
1000 for (k, v) in l0_props {
1001 merged_props.insert(k.clone(), v.clone());
1002 }
1003 }
1004 }
1005
1006 if merged_props.is_empty() {
1008 builder.append_null();
1009 } else {
1010 builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
1011 merged_props,
1012 )));
1013 }
1014 }
1015 Arc::new(builder.finish())
1016}
1017
1018fn build_all_props_column_for_schema_scan(
1023 batch: &RecordBatch,
1024 vid_arr: &UInt64Array,
1025 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
1026 projected_properties: &[String],
1027 l0_ctx: &crate::query::df_graph::L0Context,
1028) -> ArrayRef {
1029 let schema_props: Vec<&str> = projected_properties
1031 .iter()
1032 .filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
1033 .map(String::as_str)
1034 .collect();
1035
1036 let num_rows = batch.num_rows();
1037 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1038 for i in 0..num_rows {
1039 let vid = Vid::from(vid_arr.value(i));
1040 let mut merged_props: HashMap<String, Value> = HashMap::new();
1043
1044 for &prop in &schema_props {
1046 if let Some(col) = batch.column_by_name(prop) {
1047 let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
1048 if !val.is_null() {
1049 merged_props.insert(prop.to_string(), val);
1050 }
1051 }
1052 }
1053
1054 if let Some(arr) = overflow_arr
1056 && !arr.is_null(i)
1057 && let Ok(uni_common::Value::Map(map)) =
1058 uni_common::cypher_value_codec::decode(arr.value(i))
1059 {
1060 merged_props.extend(map);
1061 }
1062
1063 for l0 in l0_ctx.iter_l0_buffers() {
1065 let guard = l0.read();
1066 if let Some(l0_props) = guard.vertex_properties.get(&vid) {
1067 for (k, v) in l0_props {
1068 merged_props.insert(k.clone(), v.clone());
1069 }
1070 }
1071 }
1072
1073 if merged_props.is_empty() {
1074 builder.append_null();
1075 } else {
1076 builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
1077 merged_props,
1078 )));
1079 }
1080 }
1081 Arc::new(builder.finish())
1082}
1083
1084fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
1090 if batch.num_rows() == 0 {
1091 return Ok(batch.clone());
1092 }
1093
1094 let id_col = batch
1095 .column_by_name(id_column)
1096 .ok_or_else(|| {
1097 datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
1098 })?
1099 .clone();
1100 let version_col = batch
1101 .column_by_name("_version")
1102 .ok_or_else(|| {
1103 datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
1104 })?
1105 .clone();
1106
1107 let sort_columns = vec![
1109 arrow::compute::SortColumn {
1110 values: id_col,
1111 options: Some(arrow::compute::SortOptions {
1112 descending: false,
1113 nulls_first: false,
1114 }),
1115 },
1116 arrow::compute::SortColumn {
1117 values: version_col,
1118 options: Some(arrow::compute::SortOptions {
1119 descending: true,
1120 nulls_first: false,
1121 }),
1122 },
1123 ];
1124 let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;
1125
1126 let sorted_columns: Vec<ArrayRef> = batch
1128 .columns()
1129 .iter()
1130 .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
1131 .collect::<Result<_, _>>()
1132 .map_err(arrow_err)?;
1133 let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;
1134
1135 let sorted_id = sorted
1137 .column_by_name(id_column)
1138 .unwrap()
1139 .as_any()
1140 .downcast_ref::<UInt64Array>()
1141 .unwrap();
1142
1143 let mut keep = vec![false; sorted.num_rows()];
1144 if !keep.is_empty() {
1145 keep[0] = true;
1146 for (i, flag) in keep.iter_mut().enumerate().skip(1) {
1147 if sorted_id.value(i) != sorted_id.value(i - 1) {
1148 *flag = true;
1149 }
1150 }
1151 }
1152
1153 let mask = arrow_array::BooleanArray::from(keep);
1154 arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
1155}
1156
1157fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
1159 if batch.num_rows() == 0 {
1160 return Ok(batch.clone());
1161 }
1162 let op_col = match batch.column_by_name("op") {
1163 Some(col) => col
1164 .as_any()
1165 .downcast_ref::<arrow_array::UInt8Array>()
1166 .unwrap(),
1167 None => return Ok(batch.clone()),
1168 };
1169 let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
1170 let mask = arrow_array::BooleanArray::from(keep);
1171 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1172}
1173
1174fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
1176 if batch.num_rows() == 0 {
1177 return Ok(batch.clone());
1178 }
1179 let deleted_col = match batch.column_by_name("_deleted") {
1180 Some(col) => col
1181 .as_any()
1182 .downcast_ref::<arrow_array::BooleanArray>()
1183 .unwrap(),
1184 None => return Ok(batch.clone()),
1185 };
1186 let keep: Vec<bool> = (0..deleted_col.len())
1187 .map(|i| !deleted_col.value(i))
1188 .collect();
1189 let mask = arrow_array::BooleanArray::from(keep);
1190 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1191}
1192
1193fn filter_l0_tombstones(
1195 batch: &RecordBatch,
1196 l0_ctx: &crate::query::df_graph::L0Context,
1197) -> DFResult<RecordBatch> {
1198 if batch.num_rows() == 0 {
1199 return Ok(batch.clone());
1200 }
1201
1202 let mut tombstones: HashSet<u64> = HashSet::new();
1203 for l0 in l0_ctx.iter_l0_buffers() {
1204 let guard = l0.read();
1205 for vid in guard.vertex_tombstones.iter() {
1206 tombstones.insert(vid.as_u64());
1207 }
1208 }
1209
1210 if tombstones.is_empty() {
1211 return Ok(batch.clone());
1212 }
1213
1214 let vid_col = batch
1215 .column_by_name("_vid")
1216 .ok_or_else(|| {
1217 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1218 })?
1219 .as_any()
1220 .downcast_ref::<UInt64Array>()
1221 .unwrap();
1222
1223 let keep: Vec<bool> = (0..vid_col.len())
1224 .map(|i| !tombstones.contains(&vid_col.value(i)))
1225 .collect();
1226 let mask = arrow_array::BooleanArray::from(keep);
1227 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1228}
1229
1230fn filter_l0_edge_tombstones(
1232 batch: &RecordBatch,
1233 l0_ctx: &crate::query::df_graph::L0Context,
1234) -> DFResult<RecordBatch> {
1235 if batch.num_rows() == 0 {
1236 return Ok(batch.clone());
1237 }
1238
1239 let mut tombstones: HashSet<u64> = HashSet::new();
1240 for l0 in l0_ctx.iter_l0_buffers() {
1241 let guard = l0.read();
1242 for eid in guard.tombstones.keys() {
1243 tombstones.insert(eid.as_u64());
1244 }
1245 }
1246
1247 if tombstones.is_empty() {
1248 return Ok(batch.clone());
1249 }
1250
1251 let eid_col = batch
1252 .column_by_name("eid")
1253 .ok_or_else(|| {
1254 datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1255 })?
1256 .as_any()
1257 .downcast_ref::<UInt64Array>()
1258 .unwrap();
1259
1260 let keep: Vec<bool> = (0..eid_col.len())
1261 .map(|i| !tombstones.contains(&eid_col.value(i)))
1262 .collect();
1263 let mask = arrow_array::BooleanArray::from(keep);
1264 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1265}
1266
1267fn extract_vid_from_physical_filter(filter: &Arc<dyn PhysicalExpr>) -> Option<u64> {
1276 use datafusion::logical_expr::Operator;
1277 use datafusion::physical_expr::expressions::BinaryExpr;
1278
1279 if let Some(bin) = filter.as_any().downcast_ref::<BinaryExpr>() {
1281 if bin.op() == &Operator::Eq {
1282 if let Some(vid) = try_extract_vid_eq(bin.left(), bin.right()) {
1284 return Some(vid);
1285 }
1286 if let Some(vid) = try_extract_vid_eq(bin.right(), bin.left()) {
1287 return Some(vid);
1288 }
1289 }
1290 if bin.op() == &Operator::And {
1292 if let Some(vid) = extract_vid_from_physical_filter(bin.left()) {
1293 return Some(vid);
1294 }
1295 return extract_vid_from_physical_filter(bin.right());
1296 }
1297 }
1298 None
1299}
1300
1301fn try_extract_vid_eq(
1305 col_side: &Arc<dyn PhysicalExpr>,
1306 val_side: &Arc<dyn PhysicalExpr>,
1307) -> Option<u64> {
1308 use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
1309
1310 let col = col_side.as_any().downcast_ref::<Column>()?;
1312 if col.name() != "_vid" && !col.name().ends_with("._vid") {
1313 return None;
1314 }
1315
1316 if let Some(lit) = val_side.as_any().downcast_ref::<Literal>() {
1318 return scalar_to_u64(lit.value());
1319 }
1320
1321 if let Some(cast) = val_side.as_any().downcast_ref::<CastExpr>()
1323 && let Some(lit) = cast.expr().as_any().downcast_ref::<Literal>()
1324 {
1325 return scalar_to_u64(lit.value());
1326 }
1327
1328 None
1329}
1330
/// Combines two optional Lance filter strings.
///
/// Both present gives `"(a) AND (b)"`; exactly one present is returned
/// as-is; neither gives `None`.
fn combine_lance_filters(vid_filter: Option<&str>, extra: Option<&str>) -> Option<String> {
    if let (Some(a), Some(b)) = (vid_filter, extra) {
        return Some(format!("({a}) AND ({b})"));
    }
    vid_filter.or(extra).map(str::to_string)
}
1341
/// Renders `vids` as a Lance predicate of the form `_vid IN (1,2,3)`.
///
/// Callers must pass a non-empty slice (checked only in debug builds).
fn format_vid_in_list(vids: &[u64]) -> String {
    use std::fmt::Write;

    debug_assert!(!vids.is_empty());
    let mut sql = String::with_capacity(vids.len() * 8 + 12);
    sql.push_str("_vid IN (");
    let mut rest = vids.iter();
    if let Some(first) = rest.next() {
        let _ = write!(sql, "{first}");
        for vid in rest {
            let _ = write!(sql, ",{vid}");
        }
    }
    sql.push(')');
    sql
}
1358
1359fn scalar_to_u64(sv: &datafusion::common::ScalarValue) -> Option<u64> {
1361 use datafusion::common::ScalarValue;
1362 match sv {
1363 ScalarValue::UInt64(Some(v)) => Some(*v),
1364 ScalarValue::Int64(Some(v)) if *v >= 0 => Some(*v as u64),
1365 ScalarValue::UInt32(Some(v)) => Some(*v as u64),
1366 ScalarValue::Int32(Some(v)) if *v >= 0 => Some(*v as u64),
1367 _ => None,
1368 }
1369}
1370
/// Builds a `RecordBatch` for the vertices of `label` held in the L0 buffers,
/// shaped exactly like `lance_schema` so it can be merged with rows read from
/// the Lance vertex table.
///
/// Every buffer yielded by `l0_ctx.iter_l0_buffers()` is visited in iteration
/// order, and its data is folded into one entry per VID:
/// - property maps are merged key by key, later buffers overwriting earlier ones;
/// - `_version` keeps the largest version seen (0 when a buffer records none);
/// - `_created_at` keeps the smallest timestamp, `_updated_at` the largest;
/// - a VID listed in any buffer's `vertex_tombstones` is dropped entirely.
///
/// When `target_vids` is `Some`, only those VIDs are candidates. Each must have
/// properties in the buffer and, unless `label` is empty, appear in the
/// buffer's `label_to_vids[label]`. Otherwise `guard.vids_for_label(label)`
/// supplies the candidates.
///
/// Output rows are sorted by VID. Columns are populated by name:
/// - `_vid`;
/// - `_deleted`, always `false` because tombstoned VIDs were removed;
/// - `_version`;
/// - `_created_at` / `_updated_at`, UTC nanosecond timestamps, null when unknown;
/// - `overflow_json`: properties not declared in `label_props`, minus `ext_id`
///   and `_`-prefixed keys, encoded with `cypher_value_codec`; null when empty;
/// - any other column, built by `build_l0_property_column`.
///
/// # Errors
/// Returns an error if a property column cannot be built or if the assembled
/// columns are rejected by `RecordBatch::try_new`.
fn build_l0_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    lance_schema: &SchemaRef,
    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
    target_vids: Option<&[u64]>,
) -> DFResult<RecordBatch> {
    // Per-VID accumulator: (merged properties, highest version seen).
    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new();
    // Union of tombstones across all buffers visited so far.
    let mut tombstones: HashSet<u64> = HashSet::new();
    let mut vid_created_at: HashMap<u64, i64> = HashMap::new();
    let mut vid_updated_at: HashMap<u64, i64> = HashMap::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Record this buffer's tombstones before its live vertices are
        // visited, so a VID deleted here is skipped below.
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }
        let candidate_vids: Vec<Vid> = if let Some(tvs) = target_vids {
            // Point lookup: keep only requested VIDs that this buffer holds
            // properties for and that carry `label` (any label if empty).
            let mut out = Vec::with_capacity(tvs.len());
            for &tv in tvs {
                let vid = Vid::from(tv);
                if guard.vertex_properties.contains_key(&vid)
                    && (label.is_empty()
                        || guard
                            .label_to_vids
                            .get(label)
                            .is_some_and(|s| s.contains(&vid)))
                {
                    out.push(vid);
                }
            }
            out
        } else {
            guard.vids_for_label(label)
        };
        for vid in candidate_vids {
            let vid_u64 = vid.as_u64();
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0));
            // Later buffers overwrite keys written by earlier ones.
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            // Keep the highest version observed across buffers.
            if version > entry.1 {
                entry.1 = version;
            }
            // Creation time: keep the earliest value seen.
            if let Some(&ts) = guard.vertex_created_at.get(&vid) {
                vid_created_at
                    .entry(vid_u64)
                    .and_modify(|cur| {
                        if ts < *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
            // Update time: keep the latest value seen.
            if let Some(&ts) = guard.vertex_updated_at.get(&vid) {
                vid_updated_at
                    .entry(vid_u64)
                    .and_modify(|cur| {
                        if ts > *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
        }
    }

    // A VID tombstoned in a buffer visited after the one that inserted it
    // was not caught by the in-loop check; remove it now.
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(lance_schema.clone()));
    }

    // Deterministic row order: ascending VID.
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());

    // Declared property names; anything else is routed to `overflow_json`.
    let schema_prop_names: HashSet<&str> = label_props
        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    for field in lance_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "_deleted" => {
                // Tombstoned VIDs were removed above, so every row is live.
                let vals = vec![false; num_rows];
                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "_created_at" => {
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for v in &vids {
                    match vid_created_at.get(v) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_updated_at" => {
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for v in &vids {
                    match vid_updated_at.get(v) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "overflow_json" => {
                // Encode undeclared properties as one map blob per row;
                // `ext_id` and `_`-prefixed keys are never put in overflow.
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _) = &vid_data[vid_u64];
                    let mut overflow: HashMap<String, Value> = HashMap::new();
                    for (k, v) in props {
                        if k == "ext_id" || k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            overflow.insert(k.clone(), v.clone());
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
                            overflow,
                        )));
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Declared property column, typed from the schema field.
                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
}
1563
1564fn build_l0_property_column(
1568 vids: &[u64],
1569 vid_data: &HashMap<u64, (Properties, u64)>,
1570 prop_name: &str,
1571 data_type: &DataType,
1572) -> DFResult<ArrayRef> {
1573 let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
1575 let props_map: HashMap<Vid, Properties> = vid_data
1576 .iter()
1577 .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1578 .collect();
1579
1580 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1581}
1582
/// Builds a `RecordBatch` for the edges of `edge_type` held in the L0 buffers,
/// shaped exactly like `internal_schema` so it can be merged with rows read
/// from the Lance delta table.
///
/// Every buffer from `l0_ctx.iter_l0_buffers()` is visited in iteration order,
/// and its data is folded into one entry per EID:
/// - endpoints are taken from the most recent buffer that reports them; an edge
///   whose endpoints a buffer cannot resolve is skipped for that buffer;
/// - property maps are merged key by key, later buffers overwriting earlier ones;
/// - `_version` keeps the largest version seen (0 when a buffer records none);
/// - `_created_at` keeps the smallest timestamp, `_updated_at` the largest;
/// - an EID present in any buffer's `tombstones` is dropped entirely.
///
/// Output rows are sorted by EID. Columns are populated by name:
/// - `eid`, `src_vid`, `dst_vid`;
/// - `op`, always `0` (NOTE(review): presumably the "insert/live" op code —
///   confirm against the delta-table encoding);
/// - `_version`;
/// - `_created_at` / `_updated_at`, UTC nanosecond timestamps, null when unknown;
/// - `overflow_json`: properties not declared in `type_props`, minus
///   `_`-prefixed keys, encoded with `cypher_value_codec`; null when empty;
/// - any other column, built by `build_l0_edge_property_column`.
///
/// # Errors
/// Returns an error if a property column cannot be built or if the assembled
/// columns are rejected by `RecordBatch::try_new`.
fn build_l0_edge_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    edge_type: &str,
    internal_schema: &SchemaRef,
    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // Per-EID accumulator: (src_vid, dst_vid, merged properties, highest version).
    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
    // Union of edge tombstones across all buffers visited so far.
    let mut tombstones: HashSet<u64> = HashSet::new();
    let mut eid_created_at: HashMap<u64, i64> = HashMap::new();
    let mut eid_updated_at: HashMap<u64, i64> = HashMap::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        // Record this buffer's tombstones before its live edges are visited.
        for eid in guard.tombstones.keys() {
            tombstones.insert(eid.as_u64());
        }
        for eid in guard.eids_for_type(edge_type) {
            let eid_u64 = eid.as_u64();
            if tombstones.contains(&eid_u64) {
                continue;
            }
            // Skip edges whose endpoints this buffer cannot resolve.
            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
                None => continue,
            };
            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
            let entry = eid_data
                .entry(eid_u64)
                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
            // Later buffers overwrite keys written by earlier ones.
            if let Some(props) = guard.edge_properties.get(&eid) {
                for (k, v) in props {
                    entry.2.insert(k.clone(), v.clone());
                }
            }
            // Endpoints always reflect the most recently visited buffer.
            entry.0 = src_vid;
            entry.1 = dst_vid;
            // Keep the highest version observed across buffers.
            if version > entry.3 {
                entry.3 = version;
            }
            // Creation time: keep the earliest value seen.
            if let Some(&ts) = guard.edge_created_at.get(&eid) {
                eid_created_at
                    .entry(eid_u64)
                    .and_modify(|cur| {
                        if ts < *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
            // Update time: keep the latest value seen.
            if let Some(&ts) = guard.edge_updated_at.get(&eid) {
                eid_updated_at
                    .entry(eid_u64)
                    .and_modify(|cur| {
                        if ts > *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
        }
    }

    // An EID tombstoned in a buffer visited after the one that inserted it
    // was not caught by the in-loop check; remove it now.
    for t in &tombstones {
        eid_data.remove(t);
    }

    if eid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Deterministic row order: ascending EID.
    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
    eids.sort_unstable();

    let num_rows = eids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Declared property names; anything else is routed to `overflow_json`.
    let schema_prop_names: HashSet<&str> = type_props
        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    for field in internal_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "eid" => {
                columns.push(Arc::new(UInt64Array::from(eids.clone())));
            }
            "src_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "dst_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "op" => {
                let vals = vec![0u8; num_rows];
                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "_created_at" => {
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for e in &eids {
                    match eid_created_at.get(e) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_updated_at" => {
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for e in &eids {
                    match eid_updated_at.get(e) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "overflow_json" => {
                // Encode undeclared properties as one map blob per row;
                // `_`-prefixed keys are never put in overflow.
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for eid_u64 in &eids {
                    let (_, _, props, _) = &eid_data[eid_u64];
                    let mut overflow: HashMap<String, Value> = HashMap::new();
                    for (k, v) in props {
                        if k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            overflow.insert(k.clone(), v.clone());
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
                            overflow,
                        )));
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Declared property column, typed from the schema field.
                let col =
                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
1761
1762fn build_l0_edge_property_column(
1766 eids: &[u64],
1767 eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1768 prop_name: &str,
1769 data_type: &DataType,
1770) -> DFResult<ArrayRef> {
1771 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1773 let props_map: HashMap<Vid, Properties> = eid_data
1774 .iter()
1775 .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1776 .collect();
1777
1778 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1779}
1780
1781fn build_labels_column_for_known_label(
1787 vid_arr: &UInt64Array,
1788 label: &str,
1789 l0_ctx: &crate::query::df_graph::L0Context,
1790 batch_labels_col: Option<&arrow_array::ListArray>,
1791) -> DFResult<ArrayRef> {
1792 use uni_store::storage::arrow_convert::labels_from_list_array;
1793
1794 let mut labels_builder = ListBuilder::new(StringBuilder::new());
1795
1796 for i in 0..vid_arr.len() {
1797 let vid = Vid::from(vid_arr.value(i));
1798
1799 let mut labels = match batch_labels_col {
1801 Some(list_arr) => {
1802 let stored = labels_from_list_array(list_arr, i);
1803 if stored.is_empty() {
1804 vec![label.to_string()]
1805 } else {
1806 stored
1807 }
1808 }
1809 None => vec![label.to_string()],
1810 };
1811
1812 if !labels.iter().any(|l| l == label) {
1814 labels.push(label.to_string());
1815 }
1816
1817 for l0 in l0_ctx.iter_l0_buffers() {
1819 let guard = l0.read();
1820 if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
1821 for lbl in l0_labels {
1822 if !labels.contains(lbl) {
1823 labels.push(lbl.clone());
1824 }
1825 }
1826 }
1827 }
1828
1829 let values = labels_builder.values();
1830 for lbl in &labels {
1831 values.append_value(lbl);
1832 }
1833 labels_builder.append(true);
1834 }
1835
1836 Ok(Arc::new(labels_builder.finish()))
1837}
1838
1839fn map_to_output_schema(
1845 batch: &RecordBatch,
1846 label: &str,
1847 _variable: &str,
1848 projected_properties: &[String],
1849 output_schema: &SchemaRef,
1850 l0_ctx: &crate::query::df_graph::L0Context,
1851) -> DFResult<RecordBatch> {
1852 if batch.num_rows() == 0 {
1853 return Ok(RecordBatch::new_empty(output_schema.clone()));
1854 }
1855
1856 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1857
1858 let vid_col = batch
1860 .column_by_name("_vid")
1861 .ok_or_else(|| {
1862 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1863 })?
1864 .clone();
1865 let vid_arr = vid_col
1866 .as_any()
1867 .downcast_ref::<UInt64Array>()
1868 .ok_or_else(|| {
1869 datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
1870 })?;
1871
1872 let batch_labels_col = batch
1874 .column_by_name("_labels")
1875 .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
1876 let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
1877 columns.push(vid_col.clone());
1878 columns.push(labels_col);
1879
1880 let overflow_arr = batch
1883 .column_by_name("overflow_json")
1884 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1885
1886 for prop in projected_properties {
1887 if prop == "overflow_json" {
1888 match batch.column_by_name("overflow_json") {
1889 Some(col) => columns.push(col.clone()),
1890 None => {
1891 columns.push(arrow_array::new_null_array(
1893 &DataType::LargeBinary,
1894 batch.num_rows(),
1895 ));
1896 }
1897 }
1898 } else if prop == "_all_props" {
1899 let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
1903 let guard = l0.read();
1904 !guard.vertex_properties.is_empty()
1905 });
1906 let has_schema_cols = projected_properties
1908 .iter()
1909 .any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));
1910
1911 if !any_l0_has_vertex_props && !has_schema_cols {
1912 match batch.column_by_name("overflow_json") {
1914 Some(col) => columns.push(col.clone()),
1915 None => {
1916 columns.push(arrow_array::new_null_array(
1917 &DataType::LargeBinary,
1918 batch.num_rows(),
1919 ));
1920 }
1921 }
1922 } else {
1923 let col = build_all_props_column_for_schema_scan(
1925 batch,
1926 vid_arr,
1927 overflow_arr,
1928 projected_properties,
1929 l0_ctx,
1930 );
1931 columns.push(col);
1932 }
1933 } else {
1934 match batch.column_by_name(prop) {
1935 Some(col) => columns.push(col.clone()),
1936 None => {
1937 let col = build_overflow_property_column(
1940 batch.num_rows(),
1941 vid_arr,
1942 overflow_arr,
1943 prop,
1944 l0_ctx,
1945 );
1946 columns.push(col);
1947 }
1948 }
1949 }
1950 }
1951
1952 RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
1953}
1954
1955fn map_edge_to_output_schema(
1962 batch: &RecordBatch,
1963 variable: &str,
1964 projected_properties: &[String],
1965 output_schema: &SchemaRef,
1966) -> DFResult<RecordBatch> {
1967 if batch.num_rows() == 0 {
1968 return Ok(RecordBatch::new_empty(output_schema.clone()));
1969 }
1970
1971 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1972
1973 let eid_col = batch
1975 .column_by_name("eid")
1976 .ok_or_else(|| {
1977 datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1978 })?
1979 .clone();
1980 columns.push(eid_col);
1981
1982 let src_col = batch
1984 .column_by_name("src_vid")
1985 .ok_or_else(|| {
1986 datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
1987 })?
1988 .clone();
1989 columns.push(src_col);
1990
1991 let dst_col = batch
1993 .column_by_name("dst_vid")
1994 .ok_or_else(|| {
1995 datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
1996 })?
1997 .clone();
1998 columns.push(dst_col);
1999
2000 for prop in projected_properties {
2002 if prop == "overflow_json" {
2003 match batch.column_by_name("overflow_json") {
2004 Some(col) => columns.push(col.clone()),
2005 None => {
2006 columns.push(arrow_array::new_null_array(
2007 &DataType::LargeBinary,
2008 batch.num_rows(),
2009 ));
2010 }
2011 }
2012 } else {
2013 match batch.column_by_name(prop) {
2014 Some(col) => columns.push(col.clone()),
2015 None => {
2016 let overflow_arr = batch
2019 .column_by_name("overflow_json")
2020 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2021
2022 if let Some(arr) = overflow_arr {
2023 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2024 for i in 0..batch.num_rows() {
2025 if !arr.is_null(i) {
2026 let blob = arr.value(i);
2027 if let Some(sub_bytes) =
2029 uni_common::cypher_value_codec::extract_map_entry_raw(
2030 blob, prop,
2031 )
2032 {
2033 builder.append_value(&sub_bytes);
2034 } else {
2035 builder.append_null();
2036 }
2037 } else {
2038 builder.append_null();
2039 }
2040 }
2041 columns.push(Arc::new(builder.finish()));
2042 } else {
2043 let target_field = output_schema
2045 .fields()
2046 .iter()
2047 .find(|f| f.name() == &format!("{}.{}", variable, prop));
2048 let dt = target_field
2049 .map(|f| f.data_type().clone())
2050 .unwrap_or(DataType::LargeBinary);
2051 columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
2052 }
2053 }
2054 }
2055 }
2056 }
2057
2058 RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2059}
2060
/// Scans the vertices of a schema-defined `label` and returns them projected
/// onto `output_schema`.
///
/// Pipeline:
/// 1. Derive an optional single target VID from `filter` via
///    `extract_vid_from_physical_filter`.
/// 2. Choose the base columns to read:
///    - `_vid`, `_deleted` and `_version`;
///    - every projected property declared in the label schema;
///    - `_created_at` / `_updated_at` when they are projected;
///    - `overflow_json` whenever a projected name is not a declared property
///      (or is `overflow_json` itself).
/// 3. Build a pushdown predicate from `vid_list_filter` (used when non-empty)
///    or the single target VID, combined with `extra_lance_filter`.
/// 4. Read the base rows. If a plugin storage is registered for `label`, read
///    the whole label from it; otherwise scan the Lance vertex table with the
///    predicate pushed down.
/// 5. If `extra_lance_filter` was pushed down to Lance, post-process with
///    `drop_superseded_pushdown_rows`. Then dedupe on `_vid` via
///    `mvcc_dedup_to_option`.
/// 6. Build the L0 batch with the same VID restriction and merge it with the
///    base rows. Drop `_deleted` rows and L0 tombstones.
/// 7. Map onto `output_schema` and apply `extra_runtime_filter`.
///
/// Returns an empty batch with `output_schema` whenever a stage leaves no rows.
///
/// # Errors
/// Returns `DataFusionError::Execution` for plugin/storage failures and
/// propagates errors from the merge, filter and mapping helpers.
#[expect(clippy::too_many_arguments)]
async fn columnar_scan_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    filter: &Option<Arc<dyn PhysicalExpr>>,
    vid_list_filter: Option<&[u64]>,
    extra_lance_filter: Option<&str>,
    extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    let label_props = uni_schema.properties.get(label);

    // Single-VID point lookup, if the physical filter pins `_vid`.
    let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);

    // System columns are always read; projected columns are added below.
    let mut lance_columns: Vec<String> = vec![
        "_vid".to_string(),
        "_deleted".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else if prop == "_created_at" || prop == "_updated_at" {
            push_column_if_absent(&mut lance_columns, prop);
        } else {
            // Only declared properties exist as physical columns.
            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Undeclared properties (including `_all_props`) are served from the
    // overflow blob, so it must be read whenever one is projected.
    let needs_overflow = projected_properties.iter().any(|p| {
        p == "overflow_json"
            || (!matches!(p.as_str(), "_created_at" | "_updated_at")
                && !label_props.is_some_and(|lp| lp.contains_key(p)))
    });
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    // A non-empty VID list takes precedence over a single target VID; an
    // empty list is treated like no list at all.
    let vid_part = match (vid_list_filter, target_vid) {
        (Some(vs), _) if !vs.is_empty() => Some(format_vid_in_list(vs)),
        (_, Some(v)) => Some(format!("_vid = {v}")),
        _ => None,
    };
    let combined_filter = combine_lance_filters(vid_part.as_deref(), extra_lance_filter);
    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();

    // NOTE(review): the plugin path reads the whole label with no predicate;
    // `combined_filter` and `lance_columns` are not passed to it. Confirm the
    // VID / extra-filter restrictions are re-applied downstream and that the
    // plugin batch's columns match what the merge expects.
    let plugin_batch: Option<arrow::record_batch::RecordBatch> = match graph_ctx.plugin_registry() {
        Some(reg) => match reg.lookup_label_storage(label) {
            Some(plugin_storage) => {
                let mut stream = plugin_storage.read_batch(label, None).await.map_err(|e| {
                    datafusion::error::DataFusionError::Execution(format!(
                        "plugin Storage::read_batch({label}) failed: {} (code 0x{:x})",
                        e.message, e.code
                    ))
                })?;
                use futures::StreamExt;
                // Drain the stream and concatenate into a single batch using
                // the first batch's schema.
                let mut batches: Vec<arrow::record_batch::RecordBatch> = Vec::new();
                let mut schema_ref: Option<SchemaRef> = None;
                while let Some(b) = stream.next().await {
                    let b = b.map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "plugin Storage stream({label}) errored: {e}"
                        ))
                    })?;
                    if schema_ref.is_none() {
                        schema_ref = Some(b.schema());
                    }
                    batches.push(b);
                }
                if let Some(s) = schema_ref {
                    Some(arrow::compute::concat_batches(&s, &batches).map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "plugin Storage concat({label}) failed: {e}"
                        ))
                    })?)
                } else {
                    None
                }
            }
            None => None,
        },
        None => None,
    };

    // `pushdown_filtered` records whether `extra_lance_filter` was actually
    // pushed into the storage scan (never true for the plugin path).
    let (lance_batch, pushdown_filtered) = match plugin_batch {
        Some(b) => (Some(b), false),
        None => (
            storage
                .scan_vertex_table(label, &lance_columns_refs, combined_filter.as_deref())
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?,
            extra_lance_filter.is_some(),
        ),
    };

    let lance_batch = match (lance_batch, pushdown_filtered) {
        (Some(b), true) => Some(drop_superseded_pushdown_rows(storage, Some(label), b).await?),
        (b, _) => b,
    };

    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    // Use the scanned schema when rows exist; otherwise synthesise one from
    // the requested columns so the L0 batch can be built with a matching shape.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("_vid", DataType::UInt64, false),
                Field::new("_deleted", DataType::Boolean, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else if col == "_created_at" || col == "_updated_at" {
                    fields.push(Field::new(
                        col,
                        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                        true,
                    ));
                } else {
                    let arrow_type = label_props
                        .and_then(|lp| lp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    // Declared outside the match so the one-element slice borrowed from it
    // outlives the match expression.
    let single_vid_buf: [u64; 1];
    let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
        (Some(vs), _) if !vs.is_empty() => Some(vs),
        (_, Some(v)) => {
            single_vid_buf = [v];
            Some(&single_vid_buf)
        }
        _ => None,
    };
    let l0_batch =
        build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props, l0_target_vids)?;

    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mapped = map_to_output_schema(
        &filtered,
        label,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )?;

    apply_runtime_filter(mapped, extra_runtime_filter)
}
2287
2288fn apply_runtime_filter(
2293 batch: RecordBatch,
2294 runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2295) -> DFResult<RecordBatch> {
2296 let Some(filter) = runtime_filter else {
2297 return Ok(batch);
2298 };
2299 if batch.num_rows() == 0 {
2300 return Ok(batch);
2301 }
2302 let result = filter.evaluate(&batch)?;
2303 let array = result.into_array(batch.num_rows())?;
2304 let bools = array
2305 .as_any()
2306 .downcast_ref::<arrow_array::BooleanArray>()
2307 .ok_or_else(|| {
2308 datafusion::error::DataFusionError::Internal(
2309 "indexed-property runtime filter did not produce a BooleanArray".to_string(),
2310 )
2311 })?;
2312 arrow::compute::filter_record_batch(&batch, bools).map_err(arrow_err)
2313}
2314
/// Scans every edge of `edge_type` and returns them projected onto
/// `output_schema`.
///
/// Pipeline:
/// 1. Choose the columns to read from the `"fwd"` delta table:
///    - `eid`, `src_vid`, `dst_vid`, `op` and `_version`;
///    - declared projected properties;
///    - `_created_at` / `_updated_at` when they are projected;
///    - `overflow_json` whenever an undeclared property is projected.
/// 2. Scan with no predicate, then dedupe on `eid` via `mvcc_dedup_to_option`.
/// 3. Build the L0 edge batch and merge it with the scanned rows.
/// 4. Drop deleted ops (`filter_deleted_edge_ops`) and L0 edge tombstones.
/// 5. Map onto `output_schema`.
///
/// Returns an empty batch with `output_schema` whenever a stage leaves no rows.
///
/// # Errors
/// Returns `DataFusionError::Execution` for storage failures and propagates
/// errors from the merge, filter and mapping helpers.
async fn columnar_scan_edge_batch_static(
    graph_ctx: &GraphExecutionContext,
    edge_type: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    let type_props = uni_schema.properties.get(edge_type);

    // Identity, endpoint, op and version columns are always read.
    let mut lance_columns: Vec<String> = vec![
        "eid".to_string(),
        "src_vid".to_string(),
        "dst_vid".to_string(),
        "op".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else if prop == "_created_at" || prop == "_updated_at" {
            push_column_if_absent(&mut lance_columns, prop);
        } else {
            // Only declared properties exist as physical columns.
            let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Undeclared properties are served from the overflow blob.
    let needs_overflow = projected_properties.iter().any(|p| {
        p == "overflow_json"
            || (!matches!(p.as_str(), "_created_at" | "_updated_at")
                && !type_props.is_some_and(|tp| tp.contains_key(p)))
    });
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
    // Full scan of the "fwd" delta table (no predicate is pushed down here).
    let lance_batch = storage
        .scan_delta_table(edge_type, "fwd", &lance_columns_refs, None)
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;

    // Use the scanned schema when rows exist; otherwise synthesise one from
    // the requested columns so the L0 batch can be built with a matching shape.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("eid", DataType::UInt64, false),
                Field::new("src_vid", DataType::UInt64, false),
                Field::new("dst_vid", DataType::UInt64, false),
                Field::new("op", DataType::UInt8, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(
                    col.as_str(),
                    "eid" | "src_vid" | "dst_vid" | "op" | "_version"
                ) {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else if col == "_created_at" || col == "_updated_at" {
                    fields.push(Field::new(
                        col,
                        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                        true,
                    ));
                } else {
                    let arrow_type = type_props
                        .and_then(|tp| tp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;

    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    let merged = filter_deleted_edge_ops(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
}
2439
2440#[expect(clippy::too_many_arguments)]
2447async fn columnar_scan_schemaless_vertex_batch_static(
2448 graph_ctx: &GraphExecutionContext,
2449 label: &str,
2450 variable: &str,
2451 projected_properties: &[String],
2452 output_schema: &SchemaRef,
2453 filter: &Option<Arc<dyn PhysicalExpr>>,
2454 vid_list_filter: Option<&[u64]>,
2455 extra_lance_filter: Option<&str>,
2456 extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2457) -> DFResult<RecordBatch> {
2458 let storage = graph_ctx.storage();
2459 let l0_ctx = graph_ctx.l0_context();
2460
2461 let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);
2465
2466 let filter = {
2469 let mut parts = Vec::new();
2470
2471 match (vid_list_filter, target_vid) {
2474 (Some(vs), _) if !vs.is_empty() => parts.push(format_vid_in_list(vs)),
2475 (_, Some(vid)) => parts.push(format!("_vid = {vid}")),
2476 _ => {}
2477 }
2478
2479 if !label.is_empty() {
2481 if label.contains(':') {
2482 for lbl in label.split(':') {
2484 parts.push(format!("array_contains(labels, '{}')", lbl));
2485 }
2486 } else {
2487 parts.push(format!("array_contains(labels, '{}')", label));
2488 }
2489 }
2490
2491 if let Some(extra) = extra_lance_filter {
2493 parts.push(extra.to_string());
2494 }
2495
2496 if parts.is_empty() {
2497 None
2498 } else {
2499 Some(parts.join(" AND "))
2500 }
2501 };
2502
2503 let lance_batch = storage
2505 .scan_main_vertex_table(
2506 &["_vid", "_deleted", "labels", "props_json", "_version"],
2507 filter.as_deref(),
2508 )
2509 .await
2510 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2511
2512 let lance_batch = match (lance_batch, extra_lance_filter.is_some()) {
2516 (Some(b), true) => Some(drop_superseded_pushdown_rows(storage, None, b).await?),
2517 (b, _) => b,
2518 };
2519
2520 let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
2522
2523 let internal_schema = match &lance_deduped {
2526 Some(batch) => batch.schema(),
2527 None => Arc::new(Schema::new(vec![
2528 Field::new("_vid", DataType::UInt64, false),
2529 Field::new("_deleted", DataType::Boolean, false),
2530 Field::new("labels", labels_data_type(), false),
2531 Field::new("props_json", DataType::LargeBinary, true),
2532 Field::new("_version", DataType::UInt64, false),
2533 ])),
2534 };
2535
2536 let single_vid_buf: [u64; 1];
2540 let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
2541 (Some(vs), _) if !vs.is_empty() => Some(vs),
2542 (_, Some(v)) => {
2543 single_vid_buf = [v];
2544 Some(&single_vid_buf)
2545 }
2546 _ => None,
2547 };
2548 let l0_batch =
2549 build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema, l0_target_vids)?;
2550
2551 let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
2553 else {
2554 return Ok(RecordBatch::new_empty(output_schema.clone()));
2555 };
2556
2557 let merged = filter_deleted_rows(&merged)?;
2559 if merged.num_rows() == 0 {
2560 return Ok(RecordBatch::new_empty(output_schema.clone()));
2561 }
2562
2563 let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
2565
2566 if filtered.num_rows() == 0 {
2567 return Ok(RecordBatch::new_empty(output_schema.clone()));
2568 }
2569
2570 let mapped = map_to_schemaless_output_schema(
2572 &filtered,
2573 variable,
2574 projected_properties,
2575 output_schema,
2576 l0_ctx,
2577 )?;
2578
2579 apply_runtime_filter(mapped, extra_runtime_filter)
2581}
2582
2583fn build_l0_schemaless_vertex_batch(
2589 l0_ctx: &crate::query::df_graph::L0Context,
2590 label: &str,
2591 internal_schema: &SchemaRef,
2592 target_vids: Option<&[u64]>,
2593) -> DFResult<RecordBatch> {
2594 let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
2597 let mut tombstones: HashSet<u64> = HashSet::new();
2598
2599 let label_filter: Vec<&str> = if label.is_empty() {
2601 vec![]
2602 } else if label.contains(':') {
2603 label.split(':').collect()
2604 } else {
2605 vec![label]
2606 };
2607
2608 for l0 in l0_ctx.iter_l0_buffers() {
2609 let guard = l0.read();
2610
2611 for vid in guard.vertex_tombstones.iter() {
2613 tombstones.insert(vid.as_u64());
2614 }
2615
2616 let vids: Vec<Vid> = if let Some(tvs) = target_vids {
2619 let mut out = Vec::with_capacity(tvs.len());
2620 for &tv in tvs {
2621 let vid = Vid::from(tv);
2622 if !guard.vertex_properties.contains_key(&vid) {
2623 continue;
2624 }
2625 let label_ok = if label_filter.is_empty() {
2626 true
2627 } else if let Some(labels) = guard.vertex_labels.get(&vid) {
2628 label_filter
2629 .iter()
2630 .all(|lf| labels.contains(&lf.to_string()))
2631 } else {
2632 false
2633 };
2634 if label_ok {
2635 out.push(vid);
2636 }
2637 }
2638 out
2639 } else if label_filter.is_empty() {
2640 guard.all_vertex_vids()
2641 } else if label_filter.len() == 1 {
2642 guard.vids_for_label(label_filter[0])
2643 } else {
2644 guard.vids_with_all_labels(&label_filter)
2645 };
2646
2647 for vid in vids {
2648 let vid_u64 = vid.as_u64();
2649 if tombstones.contains(&vid_u64) {
2650 continue;
2651 }
2652 let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
2653 let entry = vid_data
2654 .entry(vid_u64)
2655 .or_insert_with(|| (Properties::new(), 0, Vec::new()));
2656
2657 if let Some(props) = guard.vertex_properties.get(&vid) {
2659 for (k, v) in props {
2660 entry.0.insert(k.clone(), v.clone());
2661 }
2662 }
2663 if version > entry.1 {
2665 entry.1 = version;
2666 }
2667 if let Some(labels) = guard.vertex_labels.get(&vid) {
2669 entry.2 = labels.clone();
2670 }
2671 }
2672 }
2673
2674 for t in &tombstones {
2676 vid_data.remove(t);
2677 }
2678
2679 if vid_data.is_empty() {
2680 return Ok(RecordBatch::new_empty(internal_schema.clone()));
2681 }
2682
2683 let mut vids: Vec<u64> = vid_data.keys().copied().collect();
2685 vids.sort_unstable();
2686
2687 let num_rows = vids.len();
2688 let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());
2689
2690 for field in internal_schema.fields() {
2691 match field.name().as_str() {
2692 "_vid" => {
2693 columns.push(Arc::new(UInt64Array::from(vids.clone())));
2694 }
2695 "labels" => {
2696 let mut labels_builder = ListBuilder::new(StringBuilder::new());
2697 for vid_u64 in &vids {
2698 let (_, _, labels) = &vid_data[vid_u64];
2699 let values = labels_builder.values();
2700 for lbl in labels {
2701 values.append_value(lbl);
2702 }
2703 labels_builder.append(true);
2704 }
2705 columns.push(Arc::new(labels_builder.finish()));
2706 }
2707 "props_json" => {
2708 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2709 for vid_u64 in &vids {
2710 let (props, _, _) = &vid_data[vid_u64];
2711 if props.is_empty() {
2712 builder.append_null();
2713 } else {
2714 let map: HashMap<String, Value> =
2717 props.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2718 builder
2719 .append_value(uni_common::cypher_value_codec::encode(&Value::Map(map)));
2720 }
2721 }
2722 columns.push(Arc::new(builder.finish()));
2723 }
2724 "_deleted" => {
2725 columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
2727 false;
2728 num_rows
2729 ])));
2730 }
2731 "_version" => {
2732 let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
2733 columns.push(Arc::new(UInt64Array::from(vals)));
2734 }
2735 _ => {
2736 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
2738 }
2739 }
2740 }
2741
2742 RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
2743}
2744
/// Projects an internal schemaless vertex batch onto `output_schema`, overlaying
/// state held in the L0 buffers of `l0_ctx`.
///
/// Output columns are pushed in this order. NOTE(review): this assumes
/// `output_schema` is laid out the same way; confirm with the schema builder.
/// 1. The batch's `_vid` column, unchanged.
/// 2. One label list per row. It starts with the labels from the batch's
///    `labels` column (only when that column is a `ListArray` of `StringArray`),
///    followed by any labels recorded for the vid in the L0 buffers that are not
///    already present.
/// 3. One column per entry of `projected_properties`.
///
/// `_all_props` handling:
/// - When no L0 buffer holds any vertex properties, `props_json` is passed
///   through as-is (or is all-null if absent).
/// - Otherwise it is rebuilt by `build_all_props_column_with_l0_overlay`.
///
/// Any other property is handled as follows:
/// - Its type is looked up in `output_schema` as `"{variable}.{prop}"`,
///   defaulting to `LargeBinary`.
/// - `LargeBinary` columns come from `build_overflow_property_column`.
/// - Typed columns take the value from `resolve_l0_property` when it yields
///   one, and otherwise decode it from the row's `props_json` blob.
///
/// An empty input batch yields an empty batch of `output_schema`.
///
/// # Errors
/// Returns an internal error if `_vid` is missing or not `UInt64`, or an Arrow
/// error if the assembled columns don't match `output_schema`.
fn map_to_schemaless_output_schema(
    batch: &RecordBatch,
    _variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    // Column 1: the vid column is required and passed through unchanged.
    let vid_col = batch
        .column_by_name("_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
        })?
        .clone();
    let vid_arr = vid_col
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
        })?;
    columns.push(vid_col.clone());

    // Column 2: labels. A missing or differently-typed `labels` column simply
    // contributes no stored labels.
    let labels_col = batch.column_by_name("labels");
    let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());

    let mut labels_builder = ListBuilder::new(StringBuilder::new());
    for i in 0..vid_arr.len() {
        let vid_u64 = vid_arr.value(i);
        let vid = Vid::from(vid_u64);

        // Start from the labels stored in the batch row (null entries skipped).
        let mut row_labels: Vec<String> = Vec::new();
        if let Some(arr) = labels_arr
            && !arr.is_null(i)
        {
            let list_val = arr.value(i);
            if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
                for j in 0..str_arr.len() {
                    if !str_arr.is_null(j) {
                        row_labels.push(str_arr.value(j).to_string());
                    }
                }
            }
        }

        // Append labels known to the L0 buffers, skipping duplicates.
        // NOTE(review): this takes each L0 read lock once per row; consider
        // hoisting if it shows up in profiles.
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
                for lbl in l0_labels {
                    if !row_labels.contains(lbl) {
                        row_labels.push(lbl.clone());
                    }
                }
            }
        }

        let values = labels_builder.values();
        for lbl in &row_labels {
            values.append_value(lbl);
        }
        labels_builder.append(true);
    }
    columns.push(Arc::new(labels_builder.finish()));

    // Columns 3..: projected properties, read from the `props_json` overflow
    // blob when present.
    let props_col = batch.column_by_name("props_json");
    let props_arr =
        props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

    for prop in projected_properties {
        if prop == "_all_props" {
            // Fast path: with no L0 vertex properties there is nothing to overlay.
            let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
                let guard = l0.read();
                !guard.vertex_properties.is_empty()
            });
            if !any_l0_has_vertex_props {
                match props_col {
                    Some(col) => columns.push(col.clone()),
                    None => {
                        columns.push(arrow_array::new_null_array(
                            &DataType::LargeBinary,
                            batch.num_rows(),
                        ));
                    }
                }
            } else {
                let col = build_all_props_column_with_l0_overlay(
                    batch.num_rows(),
                    vid_arr,
                    props_arr,
                    l0_ctx,
                );
                columns.push(col);
            }
        } else {
            // Target type as declared by the output schema; LargeBinary if unknown.
            let expected_type = output_schema
                .field_with_name(&format!("{_variable}.{prop}"))
                .map(|f| f.data_type().clone())
                .unwrap_or(DataType::LargeBinary);

            if expected_type == DataType::LargeBinary {
                let col = build_overflow_property_column(
                    batch.num_rows(),
                    vid_arr,
                    props_arr,
                    prop,
                    l0_ctx,
                );
                columns.push(col);
            } else {
                // Resolve each row's value: the L0 result from `resolve_l0_property`
                // when it yields a value, otherwise decode it from the overflow blob.
                let mut prop_values: HashMap<Vid, Properties> = HashMap::new();
                for i in 0..batch.num_rows() {
                    let vid = Vid::from(vid_arr.value(i));
                    let resolved =
                        resolve_l0_property(&vid, prop, l0_ctx)
                            .flatten()
                            .or_else(|| {
                                extract_from_overflow_blob(props_arr, i, prop).and_then(|bytes| {
                                    uni_common::cypher_value_codec::decode(&bytes).ok()
                                })
                            });
                    if let Some(val) = resolved {
                        prop_values.insert(vid, HashMap::from([(prop.to_string(), val)]));
                    }
                }
                let vids: Vec<Vid> = (0..batch.num_rows())
                    .map(|i| Vid::from(vid_arr.value(i)))
                    .collect();
                // Best effort: a conversion error yields an all-null column
                // rather than failing the whole batch.
                let col = build_property_column_static(&vids, &prop_values, prop, &expected_type)
                    .unwrap_or_else(|_| {
                        arrow_array::new_null_array(&expected_type, batch.num_rows())
                    });
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
}
2905
2906pub(crate) fn get_property_value(
2908 vid: &Vid,
2909 props_map: &HashMap<Vid, Properties>,
2910 prop_name: &str,
2911) -> Option<Value> {
2912 if prop_name == "_all_props" {
2913 return props_map.get(vid).map(|p| {
2914 let map: HashMap<String, Value> =
2915 p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2916 Value::Map(map)
2917 });
2918 }
2919 props_map
2920 .get(vid)
2921 .and_then(|props| props.get(prop_name))
2922 .cloned()
2923}
2924
/// Expands to a `DFResult<ArrayRef>` holding one primitive value per vid in `$vids`.
///
/// For each vid, `$prop_name` is fetched with `get_property_value`, narrowed by
/// `$extractor` (`&Value -> Option<_>`) and converted by `$cast`, then appended
/// to a fresh `$builder_ty`. Missing properties and values rejected by the
/// extractor become nulls.
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut column_builder = <$builder_ty>::new();
        for vid in $vids {
            let cell = get_property_value(vid, $props_map, $prop_name)
                .as_ref()
                .and_then($extractor)
                .map($cast);
            column_builder.append_option(cell);
        }
        Ok(Arc::new(column_builder.finish()) as ArrayRef)
    }};
}
2944
2945pub(crate) fn build_property_column_static(
2947 vids: &[Vid],
2948 props_map: &HashMap<Vid, Properties>,
2949 prop_name: &str,
2950 data_type: &DataType,
2951) -> DFResult<ArrayRef> {
2952 match data_type {
2953 DataType::LargeBinary => {
2954 use arrow_array::builder::LargeBinaryBuilder;
2956 let mut builder = LargeBinaryBuilder::new();
2957
2958 for vid in vids {
2959 match get_property_value(vid, props_map, prop_name) {
2960 Some(Value::Null) | None => builder.append_null(),
2961 Some(Value::Bytes(bytes)) => {
2962 builder.append_value(&bytes);
2963 }
2964 Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
2965 let bytes: Vec<u8> = arr
2968 .iter()
2969 .filter_map(|v| v.as_u64().map(|n| n as u8))
2970 .collect();
2971 if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
2972 builder.append_value(&bytes);
2973 } else {
2974 builder.append_value(uni_common::cypher_value_codec::encode(
2975 &Value::List(arr),
2976 ));
2977 }
2978 }
2979 Some(val) => {
2980 builder.append_value(uni_common::cypher_value_codec::encode(&val));
2984 }
2985 }
2986 }
2987 Ok(Arc::new(builder.finish()))
2988 }
2989 DataType::Binary => {
2990 let mut builder = BinaryBuilder::new();
2992 for vid in vids {
2993 let bytes = get_property_value(vid, props_map, prop_name)
2994 .filter(|v| !v.is_null())
2995 .and_then(|v| {
2996 let json_val: serde_json::Value = v.into();
2997 serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
2998 })
2999 .and_then(|crdt| crdt.to_msgpack().ok());
3000 match bytes {
3001 Some(b) => builder.append_value(&b),
3002 None => builder.append_null(),
3003 }
3004 }
3005 Ok(Arc::new(builder.finish()))
3006 }
3007 DataType::Utf8 => {
3008 let mut builder = StringBuilder::new();
3009 for vid in vids {
3010 match get_property_value(vid, props_map, prop_name) {
3011 Some(Value::String(s)) => builder.append_value(s),
3012 Some(Value::Null) | None => builder.append_null(),
3013 Some(other) => builder.append_value(other.to_string()),
3014 }
3015 }
3016 Ok(Arc::new(builder.finish()))
3017 }
3018 DataType::Int64 => {
3019 build_numeric_column!(
3020 vids,
3021 props_map,
3022 prop_name,
3023 Int64Builder,
3024 |v: &Value| v.as_i64(),
3025 |v| v
3026 )
3027 }
3028 DataType::Int32 => {
3029 build_numeric_column!(
3030 vids,
3031 props_map,
3032 prop_name,
3033 Int32Builder,
3034 |v: &Value| v.as_i64(),
3035 |v: i64| v as i32
3036 )
3037 }
3038 DataType::Float64 => {
3039 build_numeric_column!(
3040 vids,
3041 props_map,
3042 prop_name,
3043 Float64Builder,
3044 |v: &Value| v.as_f64(),
3045 |v| v
3046 )
3047 }
3048 DataType::Float32 => {
3049 build_numeric_column!(
3050 vids,
3051 props_map,
3052 prop_name,
3053 Float32Builder,
3054 |v: &Value| v.as_f64(),
3055 |v: f64| v as f32
3056 )
3057 }
3058 DataType::Boolean => {
3059 let mut builder = BooleanBuilder::new();
3060 for vid in vids {
3061 match get_property_value(vid, props_map, prop_name) {
3062 Some(Value::Bool(b)) => builder.append_value(b),
3063 _ => builder.append_null(),
3064 }
3065 }
3066 Ok(Arc::new(builder.finish()))
3067 }
3068 DataType::UInt64 => {
3069 build_numeric_column!(
3070 vids,
3071 props_map,
3072 prop_name,
3073 UInt64Builder,
3074 |v: &Value| v.as_u64(),
3075 |v| v
3076 )
3077 }
3078 DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
3079 let values_builder = Float32Builder::new();
3081 let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
3082 for vid in vids {
3083 match get_property_value(vid, props_map, prop_name) {
3084 Some(Value::Vector(v)) => {
3085 for val in v {
3086 list_builder.values().append_value(val);
3087 }
3088 list_builder.append(true);
3089 }
3090 Some(Value::List(arr)) => {
3091 for v in arr {
3092 list_builder
3093 .values()
3094 .append_value(v.as_f64().unwrap_or(0.0) as f32);
3095 }
3096 list_builder.append(true);
3097 }
3098 _ => {
3099 for _ in 0..*dim {
3101 list_builder.values().append_null();
3102 }
3103 list_builder.append(false);
3104 }
3105 }
3106 }
3107 Ok(Arc::new(list_builder.finish()))
3108 }
3109 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
3110 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
3112 for vid in vids {
3113 match get_property_value(vid, props_map, prop_name) {
3114 Some(Value::Temporal(tv)) => match tv {
3115 uni_common::TemporalValue::DateTime {
3116 nanos_since_epoch, ..
3117 }
3118 | uni_common::TemporalValue::LocalDateTime {
3119 nanos_since_epoch, ..
3120 } => {
3121 builder.append_value(nanos_since_epoch);
3122 }
3123 uni_common::TemporalValue::Date { days_since_epoch } => {
3124 builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
3125 }
3126 _ => builder.append_null(),
3127 },
3128 Some(Value::String(s)) => match parse_datetime_utc(&s) {
3129 Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
3130 Err(_) => builder.append_null(),
3131 },
3132 Some(Value::Int(n)) => {
3133 builder.append_value(n);
3134 }
3135 _ => builder.append_null(),
3136 }
3137 }
3138 Ok(Arc::new(builder.finish()))
3139 }
3140 DataType::Date32 => {
3141 let mut builder = Date32Builder::new();
3142 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
3143 for vid in vids {
3144 match get_property_value(vid, props_map, prop_name) {
3145 Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
3146 builder.append_value(days_since_epoch);
3147 }
3148 Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
3149 Ok(d) => builder.append_value((d - epoch).num_days() as i32),
3150 Err(_) => builder.append_null(),
3151 },
3152 Some(Value::Int(n)) => {
3153 builder.append_value(n as i32);
3154 }
3155 _ => builder.append_null(),
3156 }
3157 }
3158 Ok(Arc::new(builder.finish()))
3159 }
3160 DataType::Time64(TimeUnit::Nanosecond) => {
3161 let mut builder = Time64NanosecondBuilder::new();
3162 for vid in vids {
3163 match get_property_value(vid, props_map, prop_name) {
3164 Some(Value::Temporal(
3165 uni_common::TemporalValue::LocalTime {
3166 nanos_since_midnight,
3167 }
3168 | uni_common::TemporalValue::Time {
3169 nanos_since_midnight,
3170 ..
3171 },
3172 )) => {
3173 builder.append_value(nanos_since_midnight);
3174 }
3175 Some(Value::Temporal(_)) => builder.append_null(),
3176 Some(Value::String(s)) => {
3177 match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
3178 .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
3179 {
3180 Ok(t) => {
3181 let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
3182 + t.nanosecond() as i64;
3183 builder.append_value(nanos);
3184 }
3185 Err(_) => builder.append_null(),
3186 }
3187 }
3188 Some(Value::Int(n)) => {
3189 builder.append_value(n);
3190 }
3191 _ => builder.append_null(),
3192 }
3193 }
3194 Ok(Arc::new(builder.finish()))
3195 }
3196 DataType::Interval(IntervalUnit::MonthDayNano) => {
3197 let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
3198 Vec::with_capacity(vids.len());
3199 for vid in vids {
3200 match get_property_value(vid, props_map, prop_name) {
3201 Some(Value::Temporal(uni_common::TemporalValue::Duration {
3202 months,
3203 days,
3204 nanos,
3205 })) => {
3206 values.push(Some(arrow::datatypes::IntervalMonthDayNano {
3207 months: months as i32,
3208 days: days as i32,
3209 nanoseconds: nanos,
3210 }));
3211 }
3212 Some(Value::Int(_n)) => {
3213 values.push(None);
3214 }
3215 _ => values.push(None),
3216 }
3217 }
3218 let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
3219 Ok(Arc::new(arr))
3220 }
3221 DataType::List(inner_field) => {
3222 build_list_property_column(vids, props_map, prop_name, inner_field)
3223 }
3224 DataType::Struct(fields) => {
3225 build_struct_property_column(vids, props_map, prop_name, fields)
3226 }
3227 DataType::FixedSizeBinary(24) => {
3228 use arrow_array::builder::FixedSizeBinaryBuilder;
3230 const BTIC_LEN: i32 = 24;
3231 let mut builder = FixedSizeBinaryBuilder::with_capacity(vids.len(), BTIC_LEN);
3232 for vid in vids {
3233 match get_property_value(vid, props_map, prop_name) {
3234 Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => {
3235 match uni_btic::Btic::new(lo, hi, meta) {
3236 Ok(b) => {
3237 builder
3238 .append_value(uni_btic::encode::encode(&b))
3239 .map_err(arrow_err)?;
3240 }
3241 Err(e) => {
3242 tracing::warn!(
3243 "BTIC coercion failed for property '{}': invalid value (lo={}, hi={}, meta={:#x}): {}",
3244 prop_name,
3245 lo,
3246 hi,
3247 meta,
3248 e
3249 );
3250 builder.append_null()
3251 }
3252 }
3253 }
3254 Some(Value::String(s)) => match uni_btic::parse::parse_btic_literal(&s) {
3255 Ok(b) => {
3256 builder
3257 .append_value(uni_btic::encode::encode(&b))
3258 .map_err(arrow_err)?;
3259 }
3260 Err(e) => {
3261 tracing::warn!(
3262 "BTIC coercion failed for property '{}': '{}' is not a valid BTIC literal: {}",
3263 prop_name,
3264 s,
3265 e
3266 );
3267 builder.append_null()
3268 }
3269 },
3270 _ => builder.append_null(),
3271 }
3272 }
3273 Ok(Arc::new(builder.finish()))
3274 }
3275 _ => {
3277 let mut builder = StringBuilder::new();
3278 for vid in vids {
3279 match get_property_value(vid, props_map, prop_name) {
3280 Some(Value::Null) | None => builder.append_null(),
3281 Some(other) => builder.append_value(other.to_string()),
3282 }
3283 }
3284 Ok(Arc::new(builder.finish()))
3285 }
3286 }
3287}
3288
3289fn build_list_property_column(
3291 vids: &[Vid],
3292 props_map: &HashMap<Vid, Properties>,
3293 prop_name: &str,
3294 inner_field: &Arc<Field>,
3295) -> DFResult<ArrayRef> {
3296 match inner_field.data_type() {
3297 DataType::Utf8 => {
3298 let mut builder = ListBuilder::new(StringBuilder::new());
3299 for vid in vids {
3300 match get_property_value(vid, props_map, prop_name) {
3301 Some(Value::List(arr)) => {
3302 for v in arr {
3303 match v {
3304 Value::String(s) => builder.values().append_value(s),
3305 Value::Null => builder.values().append_null(),
3306 other => builder.values().append_value(format!("{other:?}")),
3307 }
3308 }
3309 builder.append(true);
3310 }
3311 _ => builder.append(false),
3312 }
3313 }
3314 Ok(Arc::new(builder.finish()))
3315 }
3316 DataType::Int64 => {
3317 let mut builder = ListBuilder::new(Int64Builder::new());
3318 for vid in vids {
3319 match get_property_value(vid, props_map, prop_name) {
3320 Some(Value::List(arr)) => {
3321 for v in arr {
3322 match v.as_i64() {
3323 Some(n) => builder.values().append_value(n),
3324 None => builder.values().append_null(),
3325 }
3326 }
3327 builder.append(true);
3328 }
3329 _ => builder.append(false),
3330 }
3331 }
3332 Ok(Arc::new(builder.finish()))
3333 }
3334 DataType::Float64 => {
3335 let mut builder = ListBuilder::new(Float64Builder::new());
3336 for vid in vids {
3337 match get_property_value(vid, props_map, prop_name) {
3338 Some(Value::List(arr)) => {
3339 for v in arr {
3340 match v.as_f64() {
3341 Some(n) => builder.values().append_value(n),
3342 None => builder.values().append_null(),
3343 }
3344 }
3345 builder.append(true);
3346 }
3347 _ => builder.append(false),
3348 }
3349 }
3350 Ok(Arc::new(builder.finish()))
3351 }
3352 DataType::Boolean => {
3353 let mut builder = ListBuilder::new(BooleanBuilder::new());
3354 for vid in vids {
3355 match get_property_value(vid, props_map, prop_name) {
3356 Some(Value::List(arr)) => {
3357 for v in arr {
3358 match v.as_bool() {
3359 Some(b) => builder.values().append_value(b),
3360 None => builder.values().append_null(),
3361 }
3362 }
3363 builder.append(true);
3364 }
3365 _ => builder.append(false),
3366 }
3367 }
3368 Ok(Arc::new(builder.finish()))
3369 }
3370 DataType::Struct(fields) => {
3371 build_list_of_structs_column(vids, props_map, prop_name, fields)
3373 }
3374 _ => {
3376 let mut builder = ListBuilder::new(StringBuilder::new());
3377 for vid in vids {
3378 match get_property_value(vid, props_map, prop_name) {
3379 Some(Value::List(arr)) => {
3380 for v in arr {
3381 match v {
3382 Value::Null => builder.values().append_null(),
3383 other => builder.values().append_value(format!("{other:?}")),
3384 }
3385 }
3386 builder.append(true);
3387 }
3388 _ => builder.append(false),
3389 }
3390 }
3391 Ok(Arc::new(builder.finish()))
3392 }
3393 }
3394}
3395
3396fn build_list_of_structs_column(
3402 vids: &[Vid],
3403 props_map: &HashMap<Vid, Properties>,
3404 prop_name: &str,
3405 fields: &Fields,
3406) -> DFResult<ArrayRef> {
3407 use arrow_array::StructArray;
3408
3409 let values: Vec<Option<Value>> = vids
3410 .iter()
3411 .map(|vid| get_property_value(vid, props_map, prop_name))
3412 .collect();
3413
3414 let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
3417 .iter()
3418 .map(|val| match val {
3419 Some(Value::List(arr)) => {
3420 let objs: Vec<HashMap<String, Value>> = arr
3421 .iter()
3422 .filter_map(|v| {
3423 if let Value::Map(m) = v {
3424 Some(m.clone())
3425 } else {
3426 None
3427 }
3428 })
3429 .collect();
3430 if objs.is_empty() { None } else { Some(objs) }
3431 }
3432 Some(Value::Map(obj)) => {
3433 let kv_pairs: Vec<HashMap<String, Value>> = obj
3435 .iter()
3436 .map(|(k, v)| {
3437 let mut m = HashMap::new();
3438 m.insert("key".to_string(), Value::String(k.clone()));
3439 m.insert("value".to_string(), v.clone());
3440 m
3441 })
3442 .collect();
3443 Some(kv_pairs)
3444 }
3445 _ => None,
3446 })
3447 .collect();
3448
3449 let total_items: usize = rows
3450 .iter()
3451 .filter_map(|r| r.as_ref())
3452 .map(|v| v.len())
3453 .sum();
3454
3455 let child_arrays: Vec<ArrayRef> = fields
3457 .iter()
3458 .map(|field| {
3459 let field_name = field.name();
3460 match field.data_type() {
3461 DataType::Utf8 => {
3462 let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3463 for obj in rows.iter().flatten().flatten() {
3464 match obj.get(field_name) {
3465 Some(Value::String(s)) => builder.append_value(s),
3466 Some(Value::Null) | None => builder.append_null(),
3467 Some(other) => builder.append_value(format!("{other:?}")),
3468 }
3469 }
3470 Arc::new(builder.finish()) as ArrayRef
3471 }
3472 DataType::Int64 => {
3473 let mut builder = Int64Builder::with_capacity(total_items);
3474 for obj in rows.iter().flatten().flatten() {
3475 match obj.get(field_name).and_then(|v| v.as_i64()) {
3476 Some(n) => builder.append_value(n),
3477 None => builder.append_null(),
3478 }
3479 }
3480 Arc::new(builder.finish()) as ArrayRef
3481 }
3482 DataType::Float64 => {
3483 let mut builder = Float64Builder::with_capacity(total_items);
3484 for obj in rows.iter().flatten().flatten() {
3485 match obj.get(field_name).and_then(|v| v.as_f64()) {
3486 Some(n) => builder.append_value(n),
3487 None => builder.append_null(),
3488 }
3489 }
3490 Arc::new(builder.finish()) as ArrayRef
3491 }
3492 _ => {
3494 let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3495 for obj in rows.iter().flatten().flatten() {
3496 match obj.get(field_name) {
3497 Some(Value::Null) | None => builder.append_null(),
3498 Some(other) => builder.append_value(format!("{other:?}")),
3499 }
3500 }
3501 Arc::new(builder.finish()) as ArrayRef
3502 }
3503 }
3504 })
3505 .collect();
3506
3507 let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
3509 .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3510
3511 let mut offsets = Vec::with_capacity(vids.len() + 1);
3513 let mut nulls = Vec::with_capacity(vids.len());
3514 let mut offset = 0i32;
3515 offsets.push(offset);
3516 for row in &rows {
3517 match row {
3518 Some(objs) => {
3519 offset += objs.len() as i32;
3520 offsets.push(offset);
3521 nulls.push(true);
3522 }
3523 None => {
3524 offsets.push(offset);
3525 nulls.push(false);
3526 }
3527 }
3528 }
3529
3530 let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
3531 let list_array = arrow_array::ListArray::try_new(
3532 list_field,
3533 arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
3534 Arc::new(struct_array),
3535 Some(arrow::buffer::NullBuffer::from(nulls)),
3536 )
3537 .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3538
3539 Ok(Arc::new(list_array))
3540}
3541
3542fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
3545 use uni_common::value::TemporalValue;
3546 let mut m = HashMap::new();
3547 match tv {
3548 TemporalValue::DateTime {
3549 nanos_since_epoch,
3550 offset_seconds,
3551 timezone_name,
3552 } => {
3553 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3554 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3555 if let Some(tz) = timezone_name {
3556 m.insert("timezone_name".into(), Value::String(tz.clone()));
3557 }
3558 }
3559 TemporalValue::LocalDateTime { nanos_since_epoch } => {
3560 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3561 }
3562 TemporalValue::Time {
3563 nanos_since_midnight,
3564 offset_seconds,
3565 } => {
3566 m.insert(
3567 "nanos_since_midnight".into(),
3568 Value::Int(*nanos_since_midnight),
3569 );
3570 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3571 }
3572 TemporalValue::LocalTime {
3573 nanos_since_midnight,
3574 } => {
3575 m.insert(
3576 "nanos_since_midnight".into(),
3577 Value::Int(*nanos_since_midnight),
3578 );
3579 }
3580 TemporalValue::Date { days_since_epoch } => {
3581 m.insert(
3582 "days_since_epoch".into(),
3583 Value::Int(*days_since_epoch as i64),
3584 );
3585 }
3586 TemporalValue::Duration {
3587 months,
3588 days,
3589 nanos,
3590 } => {
3591 m.insert("months".into(), Value::Int(*months));
3592 m.insert("days".into(), Value::Int(*days));
3593 m.insert("nanos".into(), Value::Int(*nanos));
3594 }
3595 TemporalValue::Btic { lo, hi, meta } => {
3596 m.insert("lo".into(), Value::Int(*lo));
3597 m.insert("hi".into(), Value::Int(*hi));
3598 m.insert("meta".into(), Value::Int(*meta as i64));
3599 }
3600 }
3601 m
3602}
3603
3604fn build_struct_property_column(
3606 vids: &[Vid],
3607 props_map: &HashMap<Vid, Properties>,
3608 prop_name: &str,
3609 fields: &Fields,
3610) -> DFResult<ArrayRef> {
3611 use arrow_array::StructArray;
3612
3613 let values: Vec<Option<Value>> = vids
3616 .iter()
3617 .map(|vid| {
3618 let val = get_property_value(vid, props_map, prop_name);
3619 match val {
3620 Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
3621 other => other,
3622 }
3623 })
3624 .collect();
3625
3626 let child_arrays: Vec<ArrayRef> = fields
3627 .iter()
3628 .map(|field| {
3629 let field_name = field.name();
3630 match field.data_type() {
3631 DataType::Float64 => {
3632 let mut builder = Float64Builder::with_capacity(vids.len());
3633 for val in &values {
3634 match val {
3635 Some(Value::Map(obj)) => {
3636 match obj.get(field_name).and_then(|v| v.as_f64()) {
3637 Some(n) => builder.append_value(n),
3638 None => builder.append_null(),
3639 }
3640 }
3641 _ => builder.append_null(),
3642 }
3643 }
3644 Arc::new(builder.finish()) as ArrayRef
3645 }
3646 DataType::Utf8 => {
3647 let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3648 for val in &values {
3649 match val {
3650 Some(Value::Map(obj)) => match obj.get(field_name) {
3651 Some(Value::String(s)) => builder.append_value(s),
3652 Some(Value::Null) | None => builder.append_null(),
3653 Some(other) => builder.append_value(format!("{other:?}")),
3654 },
3655 _ => builder.append_null(),
3656 }
3657 }
3658 Arc::new(builder.finish()) as ArrayRef
3659 }
3660 DataType::Int64 => {
3661 let mut builder = Int64Builder::with_capacity(vids.len());
3662 for val in &values {
3663 match val {
3664 Some(Value::Map(obj)) => {
3665 match obj.get(field_name).and_then(|v| v.as_i64()) {
3666 Some(n) => builder.append_value(n),
3667 None => builder.append_null(),
3668 }
3669 }
3670 _ => builder.append_null(),
3671 }
3672 }
3673 Arc::new(builder.finish()) as ArrayRef
3674 }
3675 DataType::Timestamp(_, _) => {
3676 let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
3677 for val in &values {
3678 match val {
3679 Some(Value::Map(obj)) => {
3680 match obj.get(field_name).and_then(|v| v.as_i64()) {
3681 Some(n) => builder.append_value(n),
3682 None => builder.append_null(),
3683 }
3684 }
3685 _ => builder.append_null(),
3686 }
3687 }
3688 Arc::new(builder.finish()) as ArrayRef
3689 }
3690 DataType::Int32 => {
3691 let mut builder = Int32Builder::with_capacity(vids.len());
3692 for val in &values {
3693 match val {
3694 Some(Value::Map(obj)) => {
3695 match obj.get(field_name).and_then(|v| v.as_i64()) {
3696 Some(n) => builder.append_value(n as i32),
3697 None => builder.append_null(),
3698 }
3699 }
3700 _ => builder.append_null(),
3701 }
3702 }
3703 Arc::new(builder.finish()) as ArrayRef
3704 }
3705 DataType::Time64(_) => {
3706 let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
3707 for val in &values {
3708 match val {
3709 Some(Value::Map(obj)) => {
3710 match obj.get(field_name).and_then(|v| v.as_i64()) {
3711 Some(n) => builder.append_value(n),
3712 None => builder.append_null(),
3713 }
3714 }
3715 _ => builder.append_null(),
3716 }
3717 }
3718 Arc::new(builder.finish()) as ArrayRef
3719 }
3720 _ => {
3722 let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3723 for val in &values {
3724 match val {
3725 Some(Value::Map(obj)) => match obj.get(field_name) {
3726 Some(Value::Null) | None => builder.append_null(),
3727 Some(other) => builder.append_value(format!("{other:?}")),
3728 },
3729 _ => builder.append_null(),
3730 }
3731 }
3732 Arc::new(builder.finish()) as ArrayRef
3733 }
3734 }
3735 })
3736 .collect();
3737
3738 let nulls: Vec<bool> = values
3740 .iter()
3741 .map(|v| matches!(v, Some(Value::Map(_))))
3742 .collect();
3743
3744 let struct_array = StructArray::try_new(
3745 fields.clone(),
3746 child_arrays,
3747 Some(arrow::buffer::NullBuffer::from(nulls)),
3748 )
3749 .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3750
3751 Ok(Arc::new(struct_array))
3752}
3753
/// Single-shot stream: the first poll builds a future that runs the whole
/// scan. On success the stream yields that one batch and then ends; on error
/// it yields the error and then ends.
impl Stream for GraphScanStream {
    type Item = DFResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let metrics = self.metrics.clone();
        // Guard that records the time spent in this poll into `elapsed_compute`.
        let _timer = metrics.elapsed_compute().timer();
        loop {
            // Take ownership of the current state. It stays `Done` unless a
            // branch below stores a new state.
            let state = std::mem::replace(&mut self.state, GraphScanState::Done);

            match state {
                GraphScanState::Init => {
                    // Clone everything the scan needs so the `async move` future
                    // owns its inputs and does not borrow `self`.
                    let graph_ctx = self.graph_ctx.clone();
                    let label = self.label.clone();
                    let variable = self.variable.clone();
                    let properties = self.properties.clone();
                    let is_edge_scan = self.is_edge_scan;
                    let is_schemaless = self.is_schemaless;
                    let filter = self.filter.clone();
                    let vid_list_filter = self.vid_list_filter.clone();
                    let extra_lance_filter = self.extra_lance_filter.clone();
                    let extra_runtime_filter = self.extra_runtime_filter.clone();
                    let schema = self.schema.clone();

                    let fut = async move {
                        // Fail fast if the query has already exceeded its deadline.
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;

                        // Dispatch on scan kind: edge, schemaless vertex, or typed vertex.
                        let batch = if is_edge_scan {
                            columnar_scan_edge_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else if is_schemaless {
                            columnar_scan_schemaless_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                                &filter,
                                vid_list_filter.as_deref(),
                                extra_lance_filter.as_deref(),
                                extra_runtime_filter.as_ref(),
                            )
                            .await?
                        } else {
                            columnar_scan_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                                &filter,
                                vid_list_filter.as_deref(),
                                extra_lance_filter.as_deref(),
                                extra_runtime_filter.as_ref(),
                            )
                            .await?
                        };
                        Ok(Some(batch))
                    };

                    // Loop again so the new future is polled immediately.
                    self.state = GraphScanState::Executing(Box::pin(fut));
                }

                GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = GraphScanState::Done;
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphScanState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so the next poll resumes it.
                        self.state = GraphScanState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                GraphScanState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
3849
3850impl RecordBatchStream for GraphScanStream {
3851 fn schema(&self) -> SchemaRef {
3852 self.schema.clone()
3853 }
3854}
3855
#[cfg(test)]
mod tests {
    use super::*;

    /// A labeled vertex scan schema is `_vid`, `_labels`, then the projected
    /// properties in request order, each prefixed with the variable name.
    #[test]
    fn test_build_vertex_schema() {
        let uni_schema = UniSchema::default();
        let schema = GraphScanExec::build_vertex_schema(
            "n",
            "Person",
            &["name".to_string(), "age".to_string()],
            &uni_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(3).name(), "n.age");
    }

    /// An edge scan schema is `_eid`, `_src_vid`, `_dst_vid`, then the projected
    /// properties, each prefixed with the variable name.
    #[test]
    fn test_build_edge_schema() {
        let uni_schema = UniSchema::default();
        let schema =
            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "r._eid");
        assert_eq!(schema.field(1).name(), "r._src_vid");
        assert_eq!(schema.field(2).name(), "r._dst_vid");
        assert_eq!(schema.field(3).name(), "r.weight");
    }

    /// With an empty uni schema, schemaless vertex properties are typed as
    /// `LargeBinary`, while `_vid` stays `UInt64`.
    #[test]
    fn test_build_schemaless_vertex_schema() {
        let empty_schema = UniSchema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema(
            "n",
            &["name".to_string(), "age".to_string()],
            &empty_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(schema.field(3).name(), "n.age");
        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
    }

    /// With no projected properties, a schemaless scan yields only the
    /// `_vid` and `_labels` columns.
    #[test]
    fn test_schemaless_all_scan_has_empty_label() {
        let empty_schema = UniSchema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[], &empty_schema);

        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
    }

    /// Round-trips both a property map and a scalar through the CypherValue
    /// codec and checks the decoded values.
    #[test]
    fn test_cypher_value_all_props_extraction() {
        let map: HashMap<String, Value> = [
            ("age".to_string(), Value::Int(30)),
            ("name".to_string(), Value::String("Alice".to_string())),
        ]
        .into_iter()
        .collect();
        let cv_bytes = uni_common::cypher_value_codec::encode(&Value::Map(map));

        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
        match decoded {
            Value::Map(map) => {
                let age_val = map.get("age").unwrap();
                assert_eq!(age_val, &Value::Int(30));
            }
            _ => panic!("Expected Map"),
        }

        let single_bytes = uni_common::cypher_value_codec::encode(&Value::Int(30));
        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
        assert_eq!(single_decoded, Value::Int(30));
    }

    /// Builds a batch with `_vid`, `_deleted`, `_version` and a `name` column.
    ///
    /// Each row's `name` is `v{vid}_ver{version}`, so a test can tell which
    /// version of a vertex survived. The three slices must have equal length
    /// (otherwise `RecordBatch::try_new` fails and this helper panics).
    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let names: Vec<String> = vids
            .iter()
            .zip(versions.iter())
            .map(|(v, ver)| format!("v{}_ver{}", v, ver))
            .collect();
        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vids.to_vec())),
                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
                Arc::new(UInt64Array::from(versions.to_vec())),
                Arc::new(name_arr),
            ],
        )
        .unwrap()
    }

    /// Dedup keeps one row per `_vid`, namely the one with the highest
    /// `_version`. Its other columns come from that same row. The asserts
    /// also pin the output order (vid 1, then vid 2).
    #[test]
    fn test_mvcc_dedup_multiple_versions() {
        let batch = make_mvcc_batch(
            &[1, 1, 1, 2, 2],
            &[3, 1, 5, 2, 4],
            &[false, false, false, false, false],
        );

        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let ver_col = result
            .column_by_name("_version")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let name_col = result
            .column_by_name("name")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();

        assert_eq!(vid_col.value(0), 1);
        assert_eq!(ver_col.value(0), 5);
        assert_eq!(name_col.value(0), "v1_ver5");

        assert_eq!(vid_col.value(1), 2);
        assert_eq!(ver_col.value(1), 4);
        assert_eq!(name_col.value(1), "v2_ver4");
    }

    /// Rows with distinct `_vid`s all survive dedup.
    #[test]
    fn test_mvcc_dedup_single_rows() {
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    /// Dedup of an empty batch yields an empty batch.
    #[test]
    fn test_mvcc_dedup_empty() {
        let batch = make_mvcc_batch(&[], &[], &[]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    /// A vertex tombstoned in the current L0 buffer is removed from the batch.
    /// The remaining rows keep their relative order.
    #[test]
    fn test_filter_l0_tombstones_removes_tombstoned() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);

        // NOTE(review): the meaning of `new(1, None)` arguments is defined in
        // uni_store; only the tombstone set matters for this test.
        let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
        let l0_buf = Arc::new(parking_lot::RwLock::new(l0));
        l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));

        let l0_ctx = L0Context {
            current_l0: Some(l0_buf),
            transaction_l0: None,
            pending_flush_l0s: vec![],
        };

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(vid_col.value(1), 3);
    }

    /// With a default (empty) L0 context, no rows are filtered.
    #[test]
    fn test_filter_l0_tombstones_none() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let l0_ctx = L0Context::default();

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    /// Mapping a raw storage batch (with `_deleted`/`_version` columns) onto the
    /// `n.`-prefixed output schema produces exactly the output columns. The
    /// property values are preserved.
    #[test]
    fn test_map_to_output_schema_basic() {
        use crate::query::df_graph::L0Context;

        let lance_schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let name_arr: arrow_array::StringArray =
            vec![Some("Alice"), Some("Bob")].into_iter().collect();
        let batch = RecordBatch::try_new(
            lance_schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2])),
                Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
                Arc::new(UInt64Array::from(vec![1u64, 1])),
                Arc::new(name_arr),
            ],
        )
        .unwrap();

        let output_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n._labels", labels_data_type(), true),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        let l0_ctx = L0Context::default();
        let result = map_to_output_schema(
            &batch,
            "Person",
            "n",
            &["name".to_string()],
            &output_schema,
            &l0_ctx,
        )
        .unwrap();

        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.schema().fields().len(), 3);
        assert_eq!(result.schema().field(0).name(), "n._vid");
        assert_eq!(result.schema().field(1).name(), "n._labels");
        assert_eq!(result.schema().field(2).name(), "n.name");

        let name_col = result
            .column(2)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
    }
}