1use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{arrow_err, compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30 BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31 Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32 Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
/// Leaf DataFusion [`ExecutionPlan`] that scans vertices or edges from graph storage.
///
/// Built with one of the `new_*_scan` constructors and optionally refined with the
/// `with_*` builder methods; [`ExecutionPlan::execute`] copies this configuration into
/// a `GraphScanStream`.
pub struct GraphScanExec {
    /// Graph execution context; the constructors read the storage schema through it
    /// (`storage().schema_manager().schema()`) and it is cloned into each stream.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Vertex label or edge type being scanned. Empty for `new_schemaless_all_scan`;
    /// for `new_multi_label_vertex_scan` it holds the labels joined with `:`.
    label: String,

    /// Query variable; every output column name is prefixed with `{variable}.`.
    variable: String,

    /// Property names projected as output columns (the schemaless constructors drop
    /// `_vid` / `_labels` from this list because those columns are always emitted).
    projected_properties: Vec<String>,

    /// Optional predicate supplied at construction and passed through to the scan.
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// Optional set of VIDs to restrict the scan to; set by `with_vid_list_filter`.
    vid_list_filter: Option<Vec<u64>>,

    /// Optional extra filter string; set by `with_extra_lance_filter`.
    /// NOTE(review): by its name this is a Lance SQL predicate applied at the storage
    /// layer — confirm against the scan functions.
    extra_lance_filter: Option<String>,

    /// Optional extra physical predicate; set by `with_extra_runtime_filter`.
    extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,

    /// `true` only for plans built by `new_edge_scan`.
    is_edge_scan: bool,

    /// `true` for the schemaless, multi-label and all-vertex scan variants.
    is_schemaless: bool,

    /// Arrow schema of the batches this plan reports.
    schema: SchemaRef,

    /// Plan properties computed from `schema` via `compute_plan_properties`.
    properties: Arc<PlanProperties>,

    /// Metrics set; each `execute` call registers `BaselineMetrics` for its partition.
    metrics: ExecutionPlanMetricsSet,
}
127
128impl fmt::Debug for GraphScanExec {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 f.debug_struct("GraphScanExec")
131 .field("label", &self.label)
132 .field("variable", &self.variable)
133 .field("projected_properties", &self.projected_properties)
134 .field(
135 "vid_list_filter_len",
136 &self.vid_list_filter.as_ref().map(Vec::len),
137 )
138 .field("is_edge_scan", &self.is_edge_scan)
139 .finish()
140 }
141}
142
143impl GraphScanExec {
144 pub fn with_vid_list_filter(mut self, vids: Vec<u64>) -> Self {
149 self.vid_list_filter = Some(vids);
150 self
151 }
152
153 pub fn with_extra_lance_filter(mut self, filter: String) -> Self {
157 self.extra_lance_filter = Some(filter);
158 self
159 }
160
161 pub fn with_extra_runtime_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
165 self.extra_runtime_filter = Some(filter);
166 self
167 }
168
169 pub fn has_extra_lance_filter(&self) -> bool {
172 self.extra_lance_filter.is_some()
173 }
174
175 pub(crate) async fn execute_with_vid_filter(&self, vids: &[u64]) -> DFResult<RecordBatch> {
185 if self.is_edge_scan {
186 return Err(datafusion::error::DataFusionError::Plan(
187 "execute_with_vid_filter not supported for edge scans".into(),
188 ));
189 }
190 if self.is_schemaless {
191 columnar_scan_schemaless_vertex_batch_static(
192 &self.graph_ctx,
193 &self.label,
194 &self.variable,
195 &self.projected_properties,
196 &self.schema,
197 &self.filter,
198 Some(vids),
199 self.extra_lance_filter.as_deref(),
200 self.extra_runtime_filter.as_ref(),
201 )
202 .await
203 } else {
204 columnar_scan_vertex_batch_static(
205 &self.graph_ctx,
206 &self.label,
207 &self.variable,
208 &self.projected_properties,
209 &self.schema,
210 &self.filter,
211 Some(vids),
212 self.extra_lance_filter.as_deref(),
213 self.extra_runtime_filter.as_ref(),
214 )
215 .await
216 }
217 }
218}
219
220impl GraphScanExec {
221 pub fn new_vertex_scan(
226 graph_ctx: Arc<GraphExecutionContext>,
227 label: impl Into<String>,
228 variable: impl Into<String>,
229 projected_properties: Vec<String>,
230 filter: Option<Arc<dyn PhysicalExpr>>,
231 ) -> Self {
232 let label = label.into();
233 let variable = variable.into();
234
235 let uni_schema = graph_ctx.storage().schema_manager().schema();
237 let schema =
238 Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
239
240 let properties = compute_plan_properties(schema.clone());
241
242 Self {
243 graph_ctx,
244 label,
245 variable,
246 projected_properties,
247 filter,
248 vid_list_filter: None,
249 extra_lance_filter: None,
250 extra_runtime_filter: None,
251 is_edge_scan: false,
252 is_schemaless: false,
253 schema,
254 properties,
255 metrics: ExecutionPlanMetricsSet::new(),
256 }
257 }
258
259 pub fn new_schemaless_vertex_scan(
265 graph_ctx: Arc<GraphExecutionContext>,
266 label_name: impl Into<String>,
267 variable: impl Into<String>,
268 projected_properties: Vec<String>,
269 filter: Option<Arc<dyn PhysicalExpr>>,
270 ) -> Self {
271 let label = label_name.into();
272 let variable = variable.into();
273
274 let projected_properties: Vec<String> = projected_properties
279 .into_iter()
280 .filter(|p| p != "_vid" && p != "_labels")
281 .collect();
282
283 let uni_schema = graph_ctx.storage().schema_manager().schema();
284 let schema =
285 Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
286 let properties = compute_plan_properties(schema.clone());
287
288 Self {
289 graph_ctx,
290 label,
291 variable,
292 projected_properties,
293 filter,
294 vid_list_filter: None,
295 extra_lance_filter: None,
296 extra_runtime_filter: None,
297 is_edge_scan: false,
298 is_schemaless: true,
299 schema,
300 properties,
301 metrics: ExecutionPlanMetricsSet::new(),
302 }
303 }
304
305 pub fn new_multi_label_vertex_scan(
310 graph_ctx: Arc<GraphExecutionContext>,
311 labels: Vec<String>,
312 variable: impl Into<String>,
313 projected_properties: Vec<String>,
314 filter: Option<Arc<dyn PhysicalExpr>>,
315 ) -> Self {
316 let variable = variable.into();
317 let projected_properties: Vec<String> = projected_properties
318 .into_iter()
319 .filter(|p| p != "_vid" && p != "_labels")
320 .collect();
321 let uni_schema = graph_ctx.storage().schema_manager().schema();
322 let schema =
323 Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
324 let properties = compute_plan_properties(schema.clone());
325
326 let encoded_labels = labels.join(":");
328
329 Self {
330 graph_ctx,
331 label: encoded_labels,
332 variable,
333 projected_properties,
334 filter,
335 vid_list_filter: None,
336 extra_lance_filter: None,
337 extra_runtime_filter: None,
338 is_edge_scan: false,
339 is_schemaless: true,
340 schema,
341 properties,
342 metrics: ExecutionPlanMetricsSet::new(),
343 }
344 }
345
346 pub fn new_schemaless_all_scan(
352 graph_ctx: Arc<GraphExecutionContext>,
353 variable: impl Into<String>,
354 projected_properties: Vec<String>,
355 filter: Option<Arc<dyn PhysicalExpr>>,
356 ) -> Self {
357 let variable = variable.into();
358 let projected_properties: Vec<String> = projected_properties
359 .into_iter()
360 .filter(|p| p != "_vid" && p != "_labels")
361 .collect();
362
363 let uni_schema = graph_ctx.storage().schema_manager().schema();
364 let schema =
365 Self::build_schemaless_vertex_schema(&variable, &projected_properties, &uni_schema);
366 let properties = compute_plan_properties(schema.clone());
367
368 Self {
369 graph_ctx,
370 label: String::new(), variable,
372 projected_properties,
373 filter,
374 vid_list_filter: None,
375 extra_lance_filter: None,
376 extra_runtime_filter: None,
377 is_edge_scan: false,
378 is_schemaless: true,
379 schema,
380 properties,
381 metrics: ExecutionPlanMetricsSet::new(),
382 }
383 }
384
385 fn build_schemaless_vertex_schema(
391 variable: &str,
392 properties: &[String],
393 uni_schema: &uni_common::core::schema::Schema,
394 ) -> SchemaRef {
395 let mut merged: std::collections::HashMap<&str, &uni_common::core::schema::PropertyMeta> =
397 std::collections::HashMap::new();
398 for label_props in uni_schema.properties.values() {
399 for (name, meta) in label_props {
400 merged.entry(name.as_str()).or_insert(meta);
401 }
402 }
403
404 let mut fields = vec![
405 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
406 Field::new(format!("{}._labels", variable), labels_data_type(), true),
407 ];
408
409 for prop in properties {
410 let col_name = format!("{}.{}", variable, prop);
411 let arrow_type = merged
412 .get(prop.as_str())
413 .map(|meta| meta.r#type.to_arrow())
414 .unwrap_or(DataType::LargeBinary);
415 fields.push(Field::new(&col_name, arrow_type, true));
416 }
417
418 Arc::new(Schema::new(fields))
419 }
420
421 pub fn new_edge_scan(
426 graph_ctx: Arc<GraphExecutionContext>,
427 edge_type: impl Into<String>,
428 variable: impl Into<String>,
429 projected_properties: Vec<String>,
430 filter: Option<Arc<dyn PhysicalExpr>>,
431 ) -> Self {
432 let label = edge_type.into();
433 let variable = variable.into();
434
435 let uni_schema = graph_ctx.storage().schema_manager().schema();
437 let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
438
439 let properties = compute_plan_properties(schema.clone());
440
441 Self {
442 graph_ctx,
443 label,
444 variable,
445 projected_properties,
446 filter,
447 vid_list_filter: None,
448 extra_lance_filter: None,
449 extra_runtime_filter: None,
450 is_edge_scan: true,
451 is_schemaless: false,
452 schema,
453 properties,
454 metrics: ExecutionPlanMetricsSet::new(),
455 }
456 }
457
458 fn build_vertex_schema(
460 variable: &str,
461 label: &str,
462 properties: &[String],
463 uni_schema: &UniSchema,
464 ) -> SchemaRef {
465 let mut fields = vec![
466 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
467 Field::new(format!("{}._labels", variable), labels_data_type(), true),
468 ];
469 let label_props = uni_schema.properties.get(label);
470 for prop in properties {
471 let col_name = format!("{}.{}", variable, prop);
472 let arrow_type = resolve_property_type(prop, label_props);
473 fields.push(Field::new(&col_name, arrow_type, true));
474 }
475 Arc::new(Schema::new(fields))
476 }
477
478 fn build_edge_schema(
480 variable: &str,
481 edge_type: &str,
482 properties: &[String],
483 uni_schema: &UniSchema,
484 ) -> SchemaRef {
485 let mut fields = vec![
486 Field::new(format!("{}._eid", variable), DataType::UInt64, false),
487 Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
488 Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
489 ];
490 let edge_props = uni_schema.properties.get(edge_type);
491 for prop in properties {
492 let col_name = format!("{}.{}", variable, prop);
493 let arrow_type = resolve_property_type(prop, edge_props);
494 fields.push(Field::new(&col_name, arrow_type, true));
495 }
496 Arc::new(Schema::new(fields))
497 }
498}
499
500impl DisplayAs for GraphScanExec {
501 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
502 let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
503 write!(
504 f,
505 "GraphScanExec: {}={}, properties={:?}",
506 scan_type, self.label, self.projected_properties
507 )?;
508 if self.filter.is_some() {
509 write!(f, ", filter=<pushed>")?;
510 }
511 Ok(())
512 }
513}
514
515impl ExecutionPlan for GraphScanExec {
516 fn name(&self) -> &str {
517 "GraphScanExec"
518 }
519
520 fn as_any(&self) -> &dyn Any {
521 self
522 }
523
524 fn schema(&self) -> SchemaRef {
525 self.schema.clone()
526 }
527
528 fn properties(&self) -> &Arc<PlanProperties> {
529 &self.properties
530 }
531
532 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
533 vec![]
534 }
535
536 fn with_new_children(
537 self: Arc<Self>,
538 children: Vec<Arc<dyn ExecutionPlan>>,
539 ) -> DFResult<Arc<dyn ExecutionPlan>> {
540 if children.is_empty() {
541 Ok(self)
542 } else {
543 Err(datafusion::error::DataFusionError::Plan(
544 "GraphScanExec does not accept children".to_string(),
545 ))
546 }
547 }
548
549 fn execute(
550 &self,
551 partition: usize,
552 _context: Arc<TaskContext>,
553 ) -> DFResult<SendableRecordBatchStream> {
554 let metrics = BaselineMetrics::new(&self.metrics, partition);
555
556 Ok(Box::pin(GraphScanStream::new(
557 self.graph_ctx.clone(),
558 self.label.clone(),
559 self.variable.clone(),
560 self.projected_properties.clone(),
561 self.is_edge_scan,
562 self.is_schemaless,
563 self.filter.clone(),
564 self.vid_list_filter.clone(),
565 self.extra_lance_filter.clone(),
566 self.extra_runtime_filter.clone(),
567 self.schema.clone(),
568 metrics,
569 )))
570 }
571
572 fn metrics(&self) -> Option<MetricsSet> {
573 Some(self.metrics.clone_inner())
574 }
575}
576
/// Lifecycle state of a [`GraphScanStream`].
enum GraphScanState {
    /// No work started yet; `GraphScanStream::new` creates the stream in this state.
    Init,
    /// A boxed, `Send` future is in flight and will resolve to an optional batch.
    /// NOTE(review): presumably `Ok(None)` means the scan produced no rows — confirm in
    /// the `Stream` impl, which is not visible here.
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Terminal state; presumably the stream yields nothing further once it is reached.
    Done,
}
586
/// Record-batch stream created by [`GraphScanExec::execute`]. It holds a copy of the
/// plan's scan configuration plus the current [`GraphScanState`].
struct GraphScanStream {
    /// Graph execution context cloned from the plan.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Label / edge type, with the same encoding as `GraphScanExec::label`.
    label: String,

    /// Query variable used as the output column prefix.
    variable: String,

    /// Projected property names (the plan's `projected_properties`).
    properties: Vec<String>,

    /// Whether this stream scans edges rather than vertices.
    is_edge_scan: bool,

    /// Whether the schemaless scan path is used.
    is_schemaless: bool,

    /// Predicate supplied to the plan at construction.
    filter: Option<Arc<dyn PhysicalExpr>>,

    /// Optional VID restriction copied from the plan.
    vid_list_filter: Option<Vec<u64>>,

    /// Optional extra filter string copied from the plan.
    extra_lance_filter: Option<String>,

    /// Optional extra physical predicate copied from the plan.
    extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,

    /// Output schema passed in from the plan.
    schema: SchemaRef,

    /// Current lifecycle state; starts as `GraphScanState::Init`.
    state: GraphScanState,

    /// Baseline metrics registered for this stream's partition.
    metrics: BaselineMetrics,
}
633
634impl GraphScanStream {
635 #[expect(clippy::too_many_arguments)]
637 fn new(
638 graph_ctx: Arc<GraphExecutionContext>,
639 label: String,
640 variable: String,
641 properties: Vec<String>,
642 is_edge_scan: bool,
643 is_schemaless: bool,
644 filter: Option<Arc<dyn PhysicalExpr>>,
645 vid_list_filter: Option<Vec<u64>>,
646 extra_lance_filter: Option<String>,
647 extra_runtime_filter: Option<Arc<dyn PhysicalExpr>>,
648 schema: SchemaRef,
649 metrics: BaselineMetrics,
650 ) -> Self {
651 Self {
652 graph_ctx,
653 label,
654 variable,
655 properties,
656 is_edge_scan,
657 is_schemaless,
658 filter,
659 vid_list_filter,
660 extra_lance_filter,
661 extra_runtime_filter,
662 schema,
663 state: GraphScanState::Init,
664 metrics,
665 }
666 }
667}
668
669pub(crate) fn resolve_property_type(
674 prop: &str,
675 schema_props: Option<
676 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
677 >,
678) -> DataType {
679 if prop == "overflow_json" {
680 DataType::LargeBinary
681 } else if prop == "_created_at" || prop == "_updated_at" {
682 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
686 } else {
687 schema_props
688 .and_then(|props| props.get(prop))
689 .map(|meta| meta.r#type.to_arrow())
690 .unwrap_or(DataType::LargeBinary)
691 }
692}
693
#[cfg(test)]
/// Test-only shorthand: MVCC-deduplicates `batch` keyed on the `_vid` column by
/// delegating to [`mvcc_dedup_batch_by`].
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    mvcc_dedup_batch_by(batch, "_vid")
}
706
707fn mvcc_dedup_to_option(
712 batch: Option<RecordBatch>,
713 id_column: &str,
714) -> DFResult<Option<RecordBatch>> {
715 match batch {
716 Some(b) => {
717 let deduped = mvcc_dedup_batch_by(&b, id_column)?;
718 Ok(if deduped.num_rows() > 0 {
719 Some(deduped)
720 } else {
721 None
722 })
723 }
724 None => Ok(None),
725 }
726}
727
728fn merge_lance_and_l0(
732 lance_deduped: Option<RecordBatch>,
733 l0_batch: RecordBatch,
734 internal_schema: &SchemaRef,
735 id_column: &str,
736) -> DFResult<Option<RecordBatch>> {
737 let has_l0 = l0_batch.num_rows() > 0;
738 match (lance_deduped, has_l0) {
739 (Some(lance), true) => {
740 let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
741 .map_err(arrow_err)?;
742 Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
743 }
744 (Some(lance), false) => Ok(Some(lance)),
745 (None, true) => Ok(Some(l0_batch)),
746 (None, false) => Ok(None),
747 }
748}
749
750async fn drop_superseded_pushdown_rows(
763 storage: &Arc<uni_store::storage::manager::StorageManager>,
764 label_table: Option<&str>,
765 batch: RecordBatch,
766) -> DFResult<RecordBatch> {
767 if batch.num_rows() == 0 {
768 return Ok(batch);
769 }
770 let (Some(vid_col), Some(ver_col)) = (
771 batch
772 .column_by_name("_vid")
773 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
774 batch
775 .column_by_name("_version")
776 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
777 ) else {
778 return Err(datafusion::error::DataFusionError::Execution(
779 "pushdown version verification: scan batch missing _vid/_version".to_string(),
780 ));
781 };
782
783 let mut candidates: Vec<u64> = Vec::new();
784 let mut seen: HashSet<u64> = HashSet::new();
785 for i in 0..vid_col.len() {
786 let vid = vid_col.value(i);
787 if seen.insert(vid) {
788 candidates.push(vid);
789 }
790 }
791
792 const VERIFY_CHUNK: usize = 1000;
796 let mut max_ver: HashMap<u64, u64> = HashMap::with_capacity(candidates.len());
797 for chunk in candidates.chunks(VERIFY_CHUNK) {
798 let filter = format_vid_in_list(chunk);
799 let scanned = match label_table {
800 Some(label) => {
801 storage
802 .scan_vertex_table(label, &["_vid", "_version"], Some(&filter))
803 .await
804 }
805 None => {
806 storage
807 .scan_main_vertex_table(&["_vid", "_version"], Some(&filter))
808 .await
809 }
810 }
811 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
812 let Some(vbatch) = scanned else { continue };
813 let (Some(v_vid), Some(v_ver)) = (
814 vbatch
815 .column_by_name("_vid")
816 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
817 vbatch
818 .column_by_name("_version")
819 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>()),
820 ) else {
821 return Err(datafusion::error::DataFusionError::Execution(
822 "pushdown version verification: rescan missing _vid/_version".to_string(),
823 ));
824 };
825 for i in 0..v_vid.len() {
826 let entry = max_ver.entry(v_vid.value(i)).or_insert(0);
827 *entry = (*entry).max(v_ver.value(i));
828 }
829 }
830
831 let keep: arrow_array::BooleanArray = (0..batch.num_rows())
832 .map(|i| {
833 Some(
834 max_ver
835 .get(&vid_col.value(i))
836 .is_none_or(|&max| ver_col.value(i) >= max),
837 )
838 })
839 .collect();
840 arrow::compute::filter_record_batch(&batch, &keep).map_err(arrow_err)
841}
842
/// Appends `col_name` to `columns` unless an equal entry is already present, so the
/// list behaves like an insertion-ordered set.
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    let already_present = columns.iter().any(|existing| existing == col_name);
    if already_present {
        return;
    }
    columns.push(col_name.to_owned());
}
852
853fn extract_from_overflow_blob(
858 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
859 row: usize,
860 prop: &str,
861) -> Option<Vec<u8>> {
862 let arr = overflow_arr?;
863 if arr.is_null(row) {
864 return None;
865 }
866 uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
867}
868
869fn build_overflow_property_column(
876 num_rows: usize,
877 vid_arr: &UInt64Array,
878 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
879 prop: &str,
880 l0_ctx: &crate::query::df_graph::L0Context,
881) -> ArrayRef {
882 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
883 for i in 0..num_rows {
884 let vid = Vid::from(vid_arr.value(i));
885
886 let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
888
889 if let Some(val_opt) = l0_val {
890 append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
891 } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
892 builder.append_value(&bytes);
893 } else {
894 builder.append_null();
895 }
896 }
897 Arc::new(builder.finish())
898}
899
900fn resolve_l0_property(
906 vid: &Vid,
907 prop: &str,
908 l0_ctx: &crate::query::df_graph::L0Context,
909) -> Option<Option<Value>> {
910 let mut result = None;
911 for l0 in l0_ctx.iter_l0_buffers() {
912 let guard = l0.read();
913 if let Some(props) = guard.vertex_properties.get(vid)
914 && let Some(val) = props.get(prop)
915 {
916 result = Some(Some(val.clone()));
917 }
918 }
919 result
920}
921
922fn append_value_as_cypher_binary(
927 builder: &mut arrow_array::builder::LargeBinaryBuilder,
928 val: Option<&Value>,
929) {
930 match val {
931 Some(v) if !v.is_null() => {
932 builder.append_value(uni_common::cypher_value_codec::encode(v));
933 }
934 _ => builder.append_null(),
935 }
936}
937
938fn build_all_props_column_with_l0_overlay(
946 num_rows: usize,
947 vid_arr: &UInt64Array,
948 props_arr: Option<&arrow_array::LargeBinaryArray>,
949 l0_ctx: &crate::query::df_graph::L0Context,
950) -> ArrayRef {
951 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
952 for i in 0..num_rows {
953 let vid = Vid::from(vid_arr.value(i));
954
955 let mut merged_props: HashMap<String, Value> = HashMap::new();
958 if let Some(arr) = props_arr
959 && !arr.is_null(i)
960 && let Ok(uni_common::Value::Map(map)) =
961 uni_common::cypher_value_codec::decode(arr.value(i))
962 {
963 merged_props.extend(map);
964 }
965
966 for l0 in l0_ctx.iter_l0_buffers() {
968 let guard = l0.read();
969 if let Some(l0_props) = guard.vertex_properties.get(&vid) {
970 for (k, v) in l0_props {
971 merged_props.insert(k.clone(), v.clone());
972 }
973 }
974 }
975
976 if merged_props.is_empty() {
978 builder.append_null();
979 } else {
980 builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
981 merged_props,
982 )));
983 }
984 }
985 Arc::new(builder.finish())
986}
987
988fn build_all_props_column_for_schema_scan(
993 batch: &RecordBatch,
994 vid_arr: &UInt64Array,
995 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
996 projected_properties: &[String],
997 l0_ctx: &crate::query::df_graph::L0Context,
998) -> ArrayRef {
999 let schema_props: Vec<&str> = projected_properties
1001 .iter()
1002 .filter(|p| *p != "overflow_json" && *p != "_all_props" && !p.starts_with('_'))
1003 .map(String::as_str)
1004 .collect();
1005
1006 let num_rows = batch.num_rows();
1007 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1008 for i in 0..num_rows {
1009 let vid = Vid::from(vid_arr.value(i));
1010 let mut merged_props: HashMap<String, Value> = HashMap::new();
1013
1014 for &prop in &schema_props {
1016 if let Some(col) = batch.column_by_name(prop) {
1017 let val = uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), i, None);
1018 if !val.is_null() {
1019 merged_props.insert(prop.to_string(), val);
1020 }
1021 }
1022 }
1023
1024 if let Some(arr) = overflow_arr
1026 && !arr.is_null(i)
1027 && let Ok(uni_common::Value::Map(map)) =
1028 uni_common::cypher_value_codec::decode(arr.value(i))
1029 {
1030 merged_props.extend(map);
1031 }
1032
1033 for l0 in l0_ctx.iter_l0_buffers() {
1035 let guard = l0.read();
1036 if let Some(l0_props) = guard.vertex_properties.get(&vid) {
1037 for (k, v) in l0_props {
1038 merged_props.insert(k.clone(), v.clone());
1039 }
1040 }
1041 }
1042
1043 if merged_props.is_empty() {
1044 builder.append_null();
1045 } else {
1046 builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
1047 merged_props,
1048 )));
1049 }
1050 }
1051 Arc::new(builder.finish())
1052}
1053
1054fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
1060 if batch.num_rows() == 0 {
1061 return Ok(batch.clone());
1062 }
1063
1064 let id_col = batch
1065 .column_by_name(id_column)
1066 .ok_or_else(|| {
1067 datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
1068 })?
1069 .clone();
1070 let version_col = batch
1071 .column_by_name("_version")
1072 .ok_or_else(|| {
1073 datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
1074 })?
1075 .clone();
1076
1077 let sort_columns = vec![
1079 arrow::compute::SortColumn {
1080 values: id_col,
1081 options: Some(arrow::compute::SortOptions {
1082 descending: false,
1083 nulls_first: false,
1084 }),
1085 },
1086 arrow::compute::SortColumn {
1087 values: version_col,
1088 options: Some(arrow::compute::SortOptions {
1089 descending: true,
1090 nulls_first: false,
1091 }),
1092 },
1093 ];
1094 let indices = arrow::compute::lexsort_to_indices(&sort_columns, None).map_err(arrow_err)?;
1095
1096 let sorted_columns: Vec<ArrayRef> = batch
1098 .columns()
1099 .iter()
1100 .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
1101 .collect::<Result<_, _>>()
1102 .map_err(arrow_err)?;
1103 let sorted = RecordBatch::try_new(batch.schema(), sorted_columns).map_err(arrow_err)?;
1104
1105 let sorted_id = sorted
1107 .column_by_name(id_column)
1108 .unwrap()
1109 .as_any()
1110 .downcast_ref::<UInt64Array>()
1111 .unwrap();
1112
1113 let mut keep = vec![false; sorted.num_rows()];
1114 if !keep.is_empty() {
1115 keep[0] = true;
1116 for (i, flag) in keep.iter_mut().enumerate().skip(1) {
1117 if sorted_id.value(i) != sorted_id.value(i - 1) {
1118 *flag = true;
1119 }
1120 }
1121 }
1122
1123 let mask = arrow_array::BooleanArray::from(keep);
1124 arrow::compute::filter_record_batch(&sorted, &mask).map_err(arrow_err)
1125}
1126
1127fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
1129 if batch.num_rows() == 0 {
1130 return Ok(batch.clone());
1131 }
1132 let op_col = match batch.column_by_name("op") {
1133 Some(col) => col
1134 .as_any()
1135 .downcast_ref::<arrow_array::UInt8Array>()
1136 .unwrap(),
1137 None => return Ok(batch.clone()),
1138 };
1139 let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
1140 let mask = arrow_array::BooleanArray::from(keep);
1141 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1142}
1143
1144fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
1146 if batch.num_rows() == 0 {
1147 return Ok(batch.clone());
1148 }
1149 let deleted_col = match batch.column_by_name("_deleted") {
1150 Some(col) => col
1151 .as_any()
1152 .downcast_ref::<arrow_array::BooleanArray>()
1153 .unwrap(),
1154 None => return Ok(batch.clone()),
1155 };
1156 let keep: Vec<bool> = (0..deleted_col.len())
1157 .map(|i| !deleted_col.value(i))
1158 .collect();
1159 let mask = arrow_array::BooleanArray::from(keep);
1160 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1161}
1162
1163fn filter_l0_tombstones(
1165 batch: &RecordBatch,
1166 l0_ctx: &crate::query::df_graph::L0Context,
1167) -> DFResult<RecordBatch> {
1168 if batch.num_rows() == 0 {
1169 return Ok(batch.clone());
1170 }
1171
1172 let mut tombstones: HashSet<u64> = HashSet::new();
1173 for l0 in l0_ctx.iter_l0_buffers() {
1174 let guard = l0.read();
1175 for vid in guard.vertex_tombstones.iter() {
1176 tombstones.insert(vid.as_u64());
1177 }
1178 }
1179
1180 if tombstones.is_empty() {
1181 return Ok(batch.clone());
1182 }
1183
1184 let vid_col = batch
1185 .column_by_name("_vid")
1186 .ok_or_else(|| {
1187 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1188 })?
1189 .as_any()
1190 .downcast_ref::<UInt64Array>()
1191 .unwrap();
1192
1193 let keep: Vec<bool> = (0..vid_col.len())
1194 .map(|i| !tombstones.contains(&vid_col.value(i)))
1195 .collect();
1196 let mask = arrow_array::BooleanArray::from(keep);
1197 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1198}
1199
1200fn filter_l0_edge_tombstones(
1202 batch: &RecordBatch,
1203 l0_ctx: &crate::query::df_graph::L0Context,
1204) -> DFResult<RecordBatch> {
1205 if batch.num_rows() == 0 {
1206 return Ok(batch.clone());
1207 }
1208
1209 let mut tombstones: HashSet<u64> = HashSet::new();
1210 for l0 in l0_ctx.iter_l0_buffers() {
1211 let guard = l0.read();
1212 for eid in guard.tombstones.keys() {
1213 tombstones.insert(eid.as_u64());
1214 }
1215 }
1216
1217 if tombstones.is_empty() {
1218 return Ok(batch.clone());
1219 }
1220
1221 let eid_col = batch
1222 .column_by_name("eid")
1223 .ok_or_else(|| {
1224 datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1225 })?
1226 .as_any()
1227 .downcast_ref::<UInt64Array>()
1228 .unwrap();
1229
1230 let keep: Vec<bool> = (0..eid_col.len())
1231 .map(|i| !tombstones.contains(&eid_col.value(i)))
1232 .collect();
1233 let mask = arrow_array::BooleanArray::from(keep);
1234 arrow::compute::filter_record_batch(batch, &mask).map_err(arrow_err)
1235}
1236
1237fn extract_vid_from_physical_filter(filter: &Arc<dyn PhysicalExpr>) -> Option<u64> {
1246 use datafusion::logical_expr::Operator;
1247 use datafusion::physical_expr::expressions::BinaryExpr;
1248
1249 if let Some(bin) = filter.as_any().downcast_ref::<BinaryExpr>() {
1251 if bin.op() == &Operator::Eq {
1252 if let Some(vid) = try_extract_vid_eq(bin.left(), bin.right()) {
1254 return Some(vid);
1255 }
1256 if let Some(vid) = try_extract_vid_eq(bin.right(), bin.left()) {
1257 return Some(vid);
1258 }
1259 }
1260 if bin.op() == &Operator::And {
1262 if let Some(vid) = extract_vid_from_physical_filter(bin.left()) {
1263 return Some(vid);
1264 }
1265 return extract_vid_from_physical_filter(bin.right());
1266 }
1267 }
1268 None
1269}
1270
1271fn try_extract_vid_eq(
1275 col_side: &Arc<dyn PhysicalExpr>,
1276 val_side: &Arc<dyn PhysicalExpr>,
1277) -> Option<u64> {
1278 use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
1279
1280 let col = col_side.as_any().downcast_ref::<Column>()?;
1282 if col.name() != "_vid" && !col.name().ends_with("._vid") {
1283 return None;
1284 }
1285
1286 if let Some(lit) = val_side.as_any().downcast_ref::<Literal>() {
1288 return scalar_to_u64(lit.value());
1289 }
1290
1291 if let Some(cast) = val_side.as_any().downcast_ref::<CastExpr>()
1293 && let Some(lit) = cast.expr().as_any().downcast_ref::<Literal>()
1294 {
1295 return scalar_to_u64(lit.value());
1296 }
1297
1298 None
1299}
1300
/// Joins two optional filter strings with `AND`, wrapping each side in parentheses.
/// If only one filter is present it is returned unchanged; with neither, `None`.
fn combine_lance_filters(vid_filter: Option<&str>, extra: Option<&str>) -> Option<String> {
    if let (Some(a), Some(b)) = (vid_filter, extra) {
        return Some(format!("({a}) AND ({b})"));
    }
    vid_filter.or(extra).map(String::from)
}
1311
/// Renders `vids` as a `_vid IN (a,b,...)` filter string.
///
/// `vids` must be non-empty; this is debug-asserted. An empty slice would render the
/// invalid `_vid IN ()`.
fn format_vid_in_list(vids: &[u64]) -> String {
    use std::fmt::Write;
    debug_assert!(!vids.is_empty());
    let mut out = String::with_capacity(vids.len() * 8 + 12);
    out.push_str("_vid IN (");
    let mut rest = vids.iter();
    if let Some(v) = rest.next() {
        let _ = write!(out, "{v}");
    }
    for v in rest {
        out.push(',');
        let _ = write!(out, "{v}");
    }
    out.push(')');
    out
}
1328
1329fn scalar_to_u64(sv: &datafusion::common::ScalarValue) -> Option<u64> {
1331 use datafusion::common::ScalarValue;
1332 match sv {
1333 ScalarValue::UInt64(Some(v)) => Some(*v),
1334 ScalarValue::Int64(Some(v)) if *v >= 0 => Some(*v as u64),
1335 ScalarValue::UInt32(Some(v)) => Some(*v as u64),
1336 ScalarValue::Int32(Some(v)) if *v >= 0 => Some(*v as u64),
1337 _ => None,
1338 }
1339}
1340
1341fn build_l0_vertex_batch(
1352 l0_ctx: &crate::query::df_graph::L0Context,
1353 label: &str,
1354 lance_schema: &SchemaRef,
1355 label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
1356 target_vids: Option<&[u64]>,
1357) -> DFResult<RecordBatch> {
1358 let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new(); let mut tombstones: HashSet<u64> = HashSet::new();
1361 let mut vid_created_at: HashMap<u64, i64> = HashMap::new();
1367 let mut vid_updated_at: HashMap<u64, i64> = HashMap::new();
1368
1369 for l0 in l0_ctx.iter_l0_buffers() {
1370 let guard = l0.read();
1371 for vid in guard.vertex_tombstones.iter() {
1373 tombstones.insert(vid.as_u64());
1374 }
1375 let candidate_vids: Vec<Vid> = if let Some(tvs) = target_vids {
1381 let mut out = Vec::with_capacity(tvs.len());
1382 for &tv in tvs {
1383 let vid = Vid::from(tv);
1384 if guard.vertex_properties.contains_key(&vid)
1385 && (label.is_empty()
1386 || guard
1387 .label_to_vids
1388 .get(label)
1389 .is_some_and(|s| s.contains(&vid)))
1390 {
1391 out.push(vid);
1392 }
1393 }
1394 out
1395 } else {
1396 guard.vids_for_label(label)
1397 };
1398 for vid in candidate_vids {
1399 let vid_u64 = vid.as_u64();
1400 if tombstones.contains(&vid_u64) {
1401 continue;
1402 }
1403 let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
1404 let entry = vid_data
1405 .entry(vid_u64)
1406 .or_insert_with(|| (Properties::new(), 0));
1407 if let Some(props) = guard.vertex_properties.get(&vid) {
1409 for (k, v) in props {
1410 entry.0.insert(k.clone(), v.clone());
1411 }
1412 }
1413 if version > entry.1 {
1415 entry.1 = version;
1416 }
1417 if let Some(&ts) = guard.vertex_created_at.get(&vid) {
1419 vid_created_at
1420 .entry(vid_u64)
1421 .and_modify(|cur| {
1422 if ts < *cur {
1423 *cur = ts;
1424 }
1425 })
1426 .or_insert(ts);
1427 }
1428 if let Some(&ts) = guard.vertex_updated_at.get(&vid) {
1429 vid_updated_at
1430 .entry(vid_u64)
1431 .and_modify(|cur| {
1432 if ts > *cur {
1433 *cur = ts;
1434 }
1435 })
1436 .or_insert(ts);
1437 }
1438 }
1439 }
1440
1441 for t in &tombstones {
1443 vid_data.remove(t);
1444 }
1445
1446 if vid_data.is_empty() {
1447 return Ok(RecordBatch::new_empty(lance_schema.clone()));
1448 }
1449
1450 let mut vids: Vec<u64> = vid_data.keys().copied().collect();
1452 vids.sort_unstable();
1453
1454 let num_rows = vids.len();
1455 let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());
1456
1457 let schema_prop_names: HashSet<&str> = label_props
1459 .map(|lp| lp.keys().map(|k| k.as_str()).collect())
1460 .unwrap_or_default();
1461
1462 for field in lance_schema.fields() {
1463 let col_name = field.name().as_str();
1464 match col_name {
1465 "_vid" => {
1466 columns.push(Arc::new(UInt64Array::from(vids.clone())));
1467 }
1468 "_deleted" => {
1469 let vals = vec![false; num_rows];
1471 columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
1472 }
1473 "_version" => {
1474 let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
1475 columns.push(Arc::new(UInt64Array::from(vals)));
1476 }
1477 "_created_at" => {
1478 let mut builder =
1479 arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1480 for v in &vids {
1481 match vid_created_at.get(v) {
1482 Some(&ts) => builder.append_value(ts),
1483 None => builder.append_null(),
1484 }
1485 }
1486 columns.push(Arc::new(builder.finish()));
1487 }
1488 "_updated_at" => {
1489 let mut builder =
1490 arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
1491 for v in &vids {
1492 match vid_updated_at.get(v) {
1493 Some(&ts) => builder.append_value(ts),
1494 None => builder.append_null(),
1495 }
1496 }
1497 columns.push(Arc::new(builder.finish()));
1498 }
1499 "overflow_json" => {
1500 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1502 for vid_u64 in &vids {
1503 let (props, _) = &vid_data[vid_u64];
1504 let mut overflow: HashMap<String, Value> = HashMap::new();
1505 for (k, v) in props {
1506 if k == "ext_id" || k.starts_with('_') {
1507 continue;
1508 }
1509 if !schema_prop_names.contains(k.as_str()) {
1510 overflow.insert(k.clone(), v.clone());
1511 }
1512 }
1513 if overflow.is_empty() {
1514 builder.append_null();
1515 } else {
1516 builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
1517 overflow,
1518 )));
1519 }
1520 }
1521 columns.push(Arc::new(builder.finish()));
1522 }
1523 _ => {
1524 let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
1526 columns.push(col);
1527 }
1528 }
1529 }
1530
1531 RecordBatch::try_new(lance_schema.clone(), columns).map_err(arrow_err)
1532}
1533
1534fn build_l0_property_column(
1538 vids: &[u64],
1539 vid_data: &HashMap<u64, (Properties, u64)>,
1540 prop_name: &str,
1541 data_type: &DataType,
1542) -> DFResult<ArrayRef> {
1543 let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
1545 let props_map: HashMap<Vid, Properties> = vid_data
1546 .iter()
1547 .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1548 .collect();
1549
1550 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1551}
1552
/// Materializes the L0 edges of `edge_type` as a batch shaped like
/// `internal_schema`, so they can be merged with the Lance delta-table scan.
///
/// Edges are merged across the L0 buffers of `l0_ctx` in iteration order:
/// - property maps are merged key-by-key, with later buffers overwriting
///   earlier values;
/// - endpoints are taken from the last buffer that reports them;
/// - the highest version wins;
/// - the earliest `_created_at` and latest `_updated_at` are kept.
///
/// An edge whose endpoints cannot be resolved in a buffer is skipped for that
/// buffer. Any edge tombstoned in any buffer is removed. Rows are sorted by
/// `eid`.
///
/// `type_props` lists the schema-declared properties of the edge type. Other
/// properties that do not start with `_` are encoded into `overflow_json`.
///
/// Returns an empty batch with `internal_schema` when no L0 edge qualifies.
fn build_l0_edge_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    edge_type: &str,
    internal_schema: &SchemaRef,
    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // eid -> (src_vid, dst_vid, merged properties, highest version seen)
    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();
    let mut eid_created_at: HashMap<u64, i64> = HashMap::new();
    let mut eid_updated_at: HashMap<u64, i64> = HashMap::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        for eid in guard.tombstones.keys() {
            tombstones.insert(eid.as_u64());
        }
        for eid in guard.eids_for_type(edge_type) {
            let eid_u64 = eid.as_u64();
            // Skip edges already tombstoned by this or an earlier buffer.
            if tombstones.contains(&eid_u64) {
                continue;
            }
            // Edges without resolvable endpoints in this buffer are ignored.
            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
                None => continue,
            };
            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
            let entry = eid_data
                .entry(eid_u64)
                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
            if let Some(props) = guard.edge_properties.get(&eid) {
                // Later buffers overwrite earlier values key-by-key.
                for (k, v) in props {
                    entry.2.insert(k.clone(), v.clone());
                }
            }
            // The most recent buffer's endpoints win.
            entry.0 = src_vid;
            entry.1 = dst_vid;
            if version > entry.3 {
                entry.3 = version;
            }
            // Keep the earliest creation timestamp seen across buffers.
            if let Some(&ts) = guard.edge_created_at.get(&eid) {
                eid_created_at
                    .entry(eid_u64)
                    .and_modify(|cur| {
                        if ts < *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
            // Keep the latest update timestamp seen across buffers.
            if let Some(&ts) = guard.edge_updated_at.get(&eid) {
                eid_updated_at
                    .entry(eid_u64)
                    .and_modify(|cur| {
                        if ts > *cur {
                            *cur = ts;
                        }
                    })
                    .or_insert(ts);
            }
        }
    }

    // A tombstone in a later buffer also removes data gathered from earlier ones.
    for t in &tombstones {
        eid_data.remove(t);
    }

    if eid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
    eids.sort_unstable();

    let num_rows = eids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    let schema_prop_names: HashSet<&str> = type_props
        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    for field in internal_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "eid" => {
                columns.push(Arc::new(UInt64Array::from(eids.clone())));
            }
            "src_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "dst_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "op" => {
                // Every L0 row gets op 0. NOTE(review): presumably the
                // insert/live op code checked by `filter_deleted_edge_ops`; confirm.
                let vals = vec![0u8; num_rows];
                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "_created_at" => {
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for e in &eids {
                    match eid_created_at.get(e) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_updated_at" => {
                let mut builder =
                    arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
                for e in &eids {
                    match eid_updated_at.get(e) {
                        Some(&ts) => builder.append_value(ts),
                        None => builder.append_null(),
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "overflow_json" => {
                // Encode every non-schema, non-system property as a cypher map.
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for eid_u64 in &eids {
                    let (_, _, props, _) = &eid_data[eid_u64];
                    let mut overflow: HashMap<String, Value> = HashMap::new();
                    for (k, v) in props {
                        if k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            overflow.insert(k.clone(), v.clone());
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        builder.append_value(uni_common::cypher_value_codec::encode(&Value::Map(
                            overflow,
                        )));
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                let col =
                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
}
1731
1732fn build_l0_edge_property_column(
1736 eids: &[u64],
1737 eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1738 prop_name: &str,
1739 data_type: &DataType,
1740) -> DFResult<ArrayRef> {
1741 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1743 let props_map: HashMap<Vid, Properties> = eid_data
1744 .iter()
1745 .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1746 .collect();
1747
1748 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1749}
1750
1751fn build_labels_column_for_known_label(
1757 vid_arr: &UInt64Array,
1758 label: &str,
1759 l0_ctx: &crate::query::df_graph::L0Context,
1760 batch_labels_col: Option<&arrow_array::ListArray>,
1761) -> DFResult<ArrayRef> {
1762 use uni_store::storage::arrow_convert::labels_from_list_array;
1763
1764 let mut labels_builder = ListBuilder::new(StringBuilder::new());
1765
1766 for i in 0..vid_arr.len() {
1767 let vid = Vid::from(vid_arr.value(i));
1768
1769 let mut labels = match batch_labels_col {
1771 Some(list_arr) => {
1772 let stored = labels_from_list_array(list_arr, i);
1773 if stored.is_empty() {
1774 vec![label.to_string()]
1775 } else {
1776 stored
1777 }
1778 }
1779 None => vec![label.to_string()],
1780 };
1781
1782 if !labels.iter().any(|l| l == label) {
1784 labels.push(label.to_string());
1785 }
1786
1787 for l0 in l0_ctx.iter_l0_buffers() {
1789 let guard = l0.read();
1790 if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
1791 for lbl in l0_labels {
1792 if !labels.contains(lbl) {
1793 labels.push(lbl.clone());
1794 }
1795 }
1796 }
1797 }
1798
1799 let values = labels_builder.values();
1800 for lbl in &labels {
1801 values.append_value(lbl);
1802 }
1803 labels_builder.append(true);
1804 }
1805
1806 Ok(Arc::new(labels_builder.finish()))
1807}
1808
1809fn map_to_output_schema(
1815 batch: &RecordBatch,
1816 label: &str,
1817 _variable: &str,
1818 projected_properties: &[String],
1819 output_schema: &SchemaRef,
1820 l0_ctx: &crate::query::df_graph::L0Context,
1821) -> DFResult<RecordBatch> {
1822 if batch.num_rows() == 0 {
1823 return Ok(RecordBatch::new_empty(output_schema.clone()));
1824 }
1825
1826 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1827
1828 let vid_col = batch
1830 .column_by_name("_vid")
1831 .ok_or_else(|| {
1832 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
1833 })?
1834 .clone();
1835 let vid_arr = vid_col
1836 .as_any()
1837 .downcast_ref::<UInt64Array>()
1838 .ok_or_else(|| {
1839 datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
1840 })?;
1841
1842 let batch_labels_col = batch
1844 .column_by_name("_labels")
1845 .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
1846 let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
1847 columns.push(vid_col.clone());
1848 columns.push(labels_col);
1849
1850 let overflow_arr = batch
1853 .column_by_name("overflow_json")
1854 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1855
1856 for prop in projected_properties {
1857 if prop == "overflow_json" {
1858 match batch.column_by_name("overflow_json") {
1859 Some(col) => columns.push(col.clone()),
1860 None => {
1861 columns.push(arrow_array::new_null_array(
1863 &DataType::LargeBinary,
1864 batch.num_rows(),
1865 ));
1866 }
1867 }
1868 } else if prop == "_all_props" {
1869 let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
1873 let guard = l0.read();
1874 !guard.vertex_properties.is_empty()
1875 });
1876 let has_schema_cols = projected_properties
1878 .iter()
1879 .any(|p| p != "overflow_json" && p != "_all_props" && !p.starts_with('_'));
1880
1881 if !any_l0_has_vertex_props && !has_schema_cols {
1882 match batch.column_by_name("overflow_json") {
1884 Some(col) => columns.push(col.clone()),
1885 None => {
1886 columns.push(arrow_array::new_null_array(
1887 &DataType::LargeBinary,
1888 batch.num_rows(),
1889 ));
1890 }
1891 }
1892 } else {
1893 let col = build_all_props_column_for_schema_scan(
1895 batch,
1896 vid_arr,
1897 overflow_arr,
1898 projected_properties,
1899 l0_ctx,
1900 );
1901 columns.push(col);
1902 }
1903 } else {
1904 match batch.column_by_name(prop) {
1905 Some(col) => columns.push(col.clone()),
1906 None => {
1907 let col = build_overflow_property_column(
1910 batch.num_rows(),
1911 vid_arr,
1912 overflow_arr,
1913 prop,
1914 l0_ctx,
1915 );
1916 columns.push(col);
1917 }
1918 }
1919 }
1920 }
1921
1922 RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
1923}
1924
1925fn map_edge_to_output_schema(
1932 batch: &RecordBatch,
1933 variable: &str,
1934 projected_properties: &[String],
1935 output_schema: &SchemaRef,
1936) -> DFResult<RecordBatch> {
1937 if batch.num_rows() == 0 {
1938 return Ok(RecordBatch::new_empty(output_schema.clone()));
1939 }
1940
1941 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1942
1943 let eid_col = batch
1945 .column_by_name("eid")
1946 .ok_or_else(|| {
1947 datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1948 })?
1949 .clone();
1950 columns.push(eid_col);
1951
1952 let src_col = batch
1954 .column_by_name("src_vid")
1955 .ok_or_else(|| {
1956 datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
1957 })?
1958 .clone();
1959 columns.push(src_col);
1960
1961 let dst_col = batch
1963 .column_by_name("dst_vid")
1964 .ok_or_else(|| {
1965 datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
1966 })?
1967 .clone();
1968 columns.push(dst_col);
1969
1970 for prop in projected_properties {
1972 if prop == "overflow_json" {
1973 match batch.column_by_name("overflow_json") {
1974 Some(col) => columns.push(col.clone()),
1975 None => {
1976 columns.push(arrow_array::new_null_array(
1977 &DataType::LargeBinary,
1978 batch.num_rows(),
1979 ));
1980 }
1981 }
1982 } else {
1983 match batch.column_by_name(prop) {
1984 Some(col) => columns.push(col.clone()),
1985 None => {
1986 let overflow_arr = batch
1989 .column_by_name("overflow_json")
1990 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1991
1992 if let Some(arr) = overflow_arr {
1993 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1994 for i in 0..batch.num_rows() {
1995 if !arr.is_null(i) {
1996 let blob = arr.value(i);
1997 if let Some(sub_bytes) =
1999 uni_common::cypher_value_codec::extract_map_entry_raw(
2000 blob, prop,
2001 )
2002 {
2003 builder.append_value(&sub_bytes);
2004 } else {
2005 builder.append_null();
2006 }
2007 } else {
2008 builder.append_null();
2009 }
2010 }
2011 columns.push(Arc::new(builder.finish()));
2012 } else {
2013 let target_field = output_schema
2015 .fields()
2016 .iter()
2017 .find(|f| f.name() == &format!("{}.{}", variable, prop));
2018 let dt = target_field
2019 .map(|f| f.data_type().clone())
2020 .unwrap_or(DataType::LargeBinary);
2021 columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
2022 }
2023 }
2024 }
2025 }
2026 }
2027
2028 RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2029}
2030
/// Scans vertices with a known `label` and returns them shaped like `output_schema`.
///
/// Pipeline:
/// 1. Choose the stored columns to read: `_vid`/`_deleted`/`_version`, the
///    projected schema properties, `_created_at`/`_updated_at`, and
///    `overflow_json` whenever a projected property is not declared in the
///    label's schema.
/// 2. Read the base rows. If a plugin storage is registered for `label`, it is
///    read in full, with no column or filter pushdown. Otherwise the label's
///    vertex table is scanned with the vid and `extra_lance_filter` predicates
///    pushed down.
/// 3. Deduplicate by `_vid` (`mvcc_dedup_to_option`), merge with L0 rows, then
///    drop deleted rows and L0 tombstones.
/// 4. Project to `output_schema` and apply `extra_runtime_filter`.
///
/// `filter` is only inspected for a single-vid predicate (via
/// `extract_vid_from_physical_filter`); it is not otherwise evaluated here.
/// A non-empty `vid_list_filter` takes precedence over that vid.
///
/// # Errors
/// Plugin and storage failures are reported as `DataFusionError::Execution`.
/// Errors from the merge, filter and projection helpers are propagated.
#[expect(clippy::too_many_arguments)]
async fn columnar_scan_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    filter: &Option<Arc<dyn PhysicalExpr>>,
    vid_list_filter: Option<&[u64]>,
    extra_lance_filter: Option<&str>,
    extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    let label_props = uni_schema.properties.get(label);

    // Single-vid point lookup recognised in the planner filter, if any.
    let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);

    // System columns first, then projected columns that physically exist.
    let mut lance_columns: Vec<String> = vec![
        "_vid".to_string(),
        "_deleted".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else if prop == "_created_at" || prop == "_updated_at" {
            push_column_if_absent(&mut lance_columns, prop);
        } else {
            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Projected properties that are neither schema columns nor the timestamp
    // columns must be recovered from `overflow_json`.
    let needs_overflow = projected_properties.iter().any(|p| {
        p == "overflow_json"
            || (!matches!(p.as_str(), "_created_at" | "_updated_at")
                && !label_props.is_some_and(|lp| lp.contains_key(p)))
    });
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    // Vid pushdown: a non-empty vid list wins over a single extracted vid.
    let vid_part = match (vid_list_filter, target_vid) {
        (Some(vs), _) if !vs.is_empty() => Some(format_vid_in_list(vs)),
        (_, Some(v)) => Some(format!("_vid = {v}")),
        _ => None,
    };
    let combined_filter = combine_lance_filters(vid_part.as_deref(), extra_lance_filter);
    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();

    // A plugin storage registered for this label replaces the Lance table. It is
    // read in full (no columns or filter passed) and its stream is concatenated.
    let plugin_batch: Option<arrow::record_batch::RecordBatch> = match graph_ctx.plugin_registry() {
        Some(reg) => match reg.lookup_label_storage(label) {
            Some(plugin_storage) => {
                let mut stream = plugin_storage.read_batch(label, None).await.map_err(|e| {
                    datafusion::error::DataFusionError::Execution(format!(
                        "plugin Storage::read_batch({label}) failed: {} (code 0x{:x})",
                        e.message, e.code
                    ))
                })?;
                use futures::StreamExt;
                let mut batches: Vec<arrow::record_batch::RecordBatch> = Vec::new();
                let mut schema_ref: Option<SchemaRef> = None;
                while let Some(b) = stream.next().await {
                    let b = b.map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "plugin Storage stream({label}) errored: {e}"
                        ))
                    })?;
                    // The first batch's schema is used for concatenation.
                    if schema_ref.is_none() {
                        schema_ref = Some(b.schema());
                    }
                    batches.push(b);
                }
                // An empty stream yields `None`, treated like an empty table.
                if let Some(s) = schema_ref {
                    Some(arrow::compute::concat_batches(&s, &batches).map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "plugin Storage concat({label}) failed: {e}"
                        ))
                    })?)
                } else {
                    None
                }
            }
            None => None,
        },
        None => None,
    };

    // `pushdown_filtered` is true only when `extra_lance_filter` was pushed
    // into the Lance scan; the plugin path never applies it.
    let (lance_batch, pushdown_filtered) = match plugin_batch {
        Some(b) => (Some(b), false),
        None => (
            storage
                .scan_vertex_table(label, &lance_columns_refs, combined_filter.as_deref())
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?,
            extra_lance_filter.is_some(),
        ),
    };

    // NOTE(review): presumably removes rows that a newer, filtered-out version
    // supersedes (which MVCC dedup alone could not see) — confirm against
    // `drop_superseded_pushdown_rows`.
    let lance_batch = match (lance_batch, pushdown_filtered) {
        (Some(b), true) => Some(drop_superseded_pushdown_rows(storage, Some(label), b).await?),
        (b, _) => b,
    };

    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    // With no stored rows, synthesize the schema the scan would have produced
    // so the L0 batch has a shape to follow.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("_vid", DataType::UInt64, false),
                Field::new("_deleted", DataType::Boolean, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else if col == "_created_at" || col == "_updated_at" {
                    fields.push(Field::new(
                        col,
                        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                        true,
                    ));
                } else {
                    let arrow_type = label_props
                        .and_then(|lp| lp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    // Declared outside the match so `Some(&single_vid_buf)` can outlive it.
    let single_vid_buf: [u64; 1];
    let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
        (Some(vs), _) if !vs.is_empty() => Some(vs),
        (_, Some(v)) => {
            single_vid_buf = [v];
            Some(&single_vid_buf)
        }
        _ => None,
    };
    let l0_batch =
        build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props, l0_target_vids)?;

    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mapped = map_to_output_schema(
        &filtered,
        label,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )?;

    apply_runtime_filter(mapped, extra_runtime_filter)
}
2257
2258fn apply_runtime_filter(
2263 batch: RecordBatch,
2264 runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2265) -> DFResult<RecordBatch> {
2266 let Some(filter) = runtime_filter else {
2267 return Ok(batch);
2268 };
2269 if batch.num_rows() == 0 {
2270 return Ok(batch);
2271 }
2272 let result = filter.evaluate(&batch)?;
2273 let array = result.into_array(batch.num_rows())?;
2274 let bools = array
2275 .as_any()
2276 .downcast_ref::<arrow_array::BooleanArray>()
2277 .ok_or_else(|| {
2278 datafusion::error::DataFusionError::Internal(
2279 "indexed-property runtime filter did not produce a BooleanArray".to_string(),
2280 )
2281 })?;
2282 arrow::compute::filter_record_batch(&batch, bools).map_err(arrow_err)
2283}
2284
/// Scans all edges of `edge_type` and returns them shaped like `output_schema`.
///
/// Steps:
/// 1. Read the forward (`"fwd"`) delta table with no filter pushdown.
/// 2. Deduplicate by `eid` and merge with the L0 edges.
/// 3. Drop rows removed by `filter_deleted_edge_ops` and by L0 edge
///    tombstones.
/// 4. Project via `map_edge_to_output_schema`.
///
/// Unlike the vertex scan, this path has no id pushdown, plugin storage lookup
/// or runtime filter.
///
/// # Errors
/// Storage failures are reported as `DataFusionError::Execution`. Errors from
/// the merge, filter and projection helpers are propagated.
async fn columnar_scan_edge_batch_static(
    graph_ctx: &GraphExecutionContext,
    edge_type: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    let type_props = uni_schema.properties.get(edge_type);

    // Delta-table system columns first, then projected columns that physically exist.
    let mut lance_columns: Vec<String> = vec![
        "eid".to_string(),
        "src_vid".to_string(),
        "dst_vid".to_string(),
        "op".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else if prop == "_created_at" || prop == "_updated_at" {
            push_column_if_absent(&mut lance_columns, prop);
        } else {
            let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Properties outside the schema (other than timestamps) come from `overflow_json`.
    let needs_overflow = projected_properties.iter().any(|p| {
        p == "overflow_json"
            || (!matches!(p.as_str(), "_created_at" | "_updated_at")
                && !type_props.is_some_and(|tp| tp.contains_key(p)))
    });
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    let lance_columns_refs: Vec<&str> = lance_columns.iter().map(|s| s.as_str()).collect();
    let lance_batch = storage
        .scan_delta_table(edge_type, "fwd", &lance_columns_refs, None)
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;

    // With no stored rows, synthesize the schema the delta scan would have
    // produced so the L0 batch has a shape to follow.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("eid", DataType::UInt64, false),
                Field::new("src_vid", DataType::UInt64, false),
                Field::new("dst_vid", DataType::UInt64, false),
                Field::new("op", DataType::UInt8, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(
                    col.as_str(),
                    "eid" | "src_vid" | "dst_vid" | "op" | "_version"
                ) {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else if col == "_created_at" || col == "_updated_at" {
                    fields.push(Field::new(
                        col,
                        DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                        true,
                    ));
                } else {
                    let arrow_type = type_props
                        .and_then(|tp| tp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;

    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    let merged = filter_deleted_edge_ops(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
}
2409
2410#[expect(clippy::too_many_arguments)]
2417async fn columnar_scan_schemaless_vertex_batch_static(
2418 graph_ctx: &GraphExecutionContext,
2419 label: &str,
2420 variable: &str,
2421 projected_properties: &[String],
2422 output_schema: &SchemaRef,
2423 filter: &Option<Arc<dyn PhysicalExpr>>,
2424 vid_list_filter: Option<&[u64]>,
2425 extra_lance_filter: Option<&str>,
2426 extra_runtime_filter: Option<&Arc<dyn PhysicalExpr>>,
2427) -> DFResult<RecordBatch> {
2428 let storage = graph_ctx.storage();
2429 let l0_ctx = graph_ctx.l0_context();
2430
2431 let target_vid = filter.as_ref().and_then(extract_vid_from_physical_filter);
2435
2436 let filter = {
2439 let mut parts = Vec::new();
2440
2441 match (vid_list_filter, target_vid) {
2444 (Some(vs), _) if !vs.is_empty() => parts.push(format_vid_in_list(vs)),
2445 (_, Some(vid)) => parts.push(format!("_vid = {vid}")),
2446 _ => {}
2447 }
2448
2449 if !label.is_empty() {
2451 if label.contains(':') {
2452 for lbl in label.split(':') {
2454 parts.push(format!("array_contains(labels, '{}')", lbl));
2455 }
2456 } else {
2457 parts.push(format!("array_contains(labels, '{}')", label));
2458 }
2459 }
2460
2461 if let Some(extra) = extra_lance_filter {
2463 parts.push(extra.to_string());
2464 }
2465
2466 if parts.is_empty() {
2467 None
2468 } else {
2469 Some(parts.join(" AND "))
2470 }
2471 };
2472
2473 let lance_batch = storage
2475 .scan_main_vertex_table(
2476 &["_vid", "_deleted", "labels", "props_json", "_version"],
2477 filter.as_deref(),
2478 )
2479 .await
2480 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2481
2482 let lance_batch = match (lance_batch, extra_lance_filter.is_some()) {
2486 (Some(b), true) => Some(drop_superseded_pushdown_rows(storage, None, b).await?),
2487 (b, _) => b,
2488 };
2489
2490 let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;
2492
2493 let internal_schema = match &lance_deduped {
2496 Some(batch) => batch.schema(),
2497 None => Arc::new(Schema::new(vec![
2498 Field::new("_vid", DataType::UInt64, false),
2499 Field::new("_deleted", DataType::Boolean, false),
2500 Field::new("labels", labels_data_type(), false),
2501 Field::new("props_json", DataType::LargeBinary, true),
2502 Field::new("_version", DataType::UInt64, false),
2503 ])),
2504 };
2505
2506 let single_vid_buf: [u64; 1];
2510 let l0_target_vids: Option<&[u64]> = match (vid_list_filter, target_vid) {
2511 (Some(vs), _) if !vs.is_empty() => Some(vs),
2512 (_, Some(v)) => {
2513 single_vid_buf = [v];
2514 Some(&single_vid_buf)
2515 }
2516 _ => None,
2517 };
2518 let l0_batch =
2519 build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema, l0_target_vids)?;
2520
2521 let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
2523 else {
2524 return Ok(RecordBatch::new_empty(output_schema.clone()));
2525 };
2526
2527 let merged = filter_deleted_rows(&merged)?;
2529 if merged.num_rows() == 0 {
2530 return Ok(RecordBatch::new_empty(output_schema.clone()));
2531 }
2532
2533 let filtered = filter_l0_tombstones(&merged, l0_ctx)?;
2535
2536 if filtered.num_rows() == 0 {
2537 return Ok(RecordBatch::new_empty(output_schema.clone()));
2538 }
2539
2540 let mapped = map_to_schemaless_output_schema(
2542 &filtered,
2543 variable,
2544 projected_properties,
2545 output_schema,
2546 l0_ctx,
2547 )?;
2548
2549 apply_runtime_filter(mapped, extra_runtime_filter)
2551}
2552
2553fn build_l0_schemaless_vertex_batch(
2559 l0_ctx: &crate::query::df_graph::L0Context,
2560 label: &str,
2561 internal_schema: &SchemaRef,
2562 target_vids: Option<&[u64]>,
2563) -> DFResult<RecordBatch> {
2564 let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
2567 let mut tombstones: HashSet<u64> = HashSet::new();
2568
2569 let label_filter: Vec<&str> = if label.is_empty() {
2571 vec![]
2572 } else if label.contains(':') {
2573 label.split(':').collect()
2574 } else {
2575 vec![label]
2576 };
2577
2578 for l0 in l0_ctx.iter_l0_buffers() {
2579 let guard = l0.read();
2580
2581 for vid in guard.vertex_tombstones.iter() {
2583 tombstones.insert(vid.as_u64());
2584 }
2585
2586 let vids: Vec<Vid> = if let Some(tvs) = target_vids {
2589 let mut out = Vec::with_capacity(tvs.len());
2590 for &tv in tvs {
2591 let vid = Vid::from(tv);
2592 if !guard.vertex_properties.contains_key(&vid) {
2593 continue;
2594 }
2595 let label_ok = if label_filter.is_empty() {
2596 true
2597 } else if let Some(labels) = guard.vertex_labels.get(&vid) {
2598 label_filter
2599 .iter()
2600 .all(|lf| labels.contains(&lf.to_string()))
2601 } else {
2602 false
2603 };
2604 if label_ok {
2605 out.push(vid);
2606 }
2607 }
2608 out
2609 } else if label_filter.is_empty() {
2610 guard.all_vertex_vids()
2611 } else if label_filter.len() == 1 {
2612 guard.vids_for_label(label_filter[0])
2613 } else {
2614 guard.vids_with_all_labels(&label_filter)
2615 };
2616
2617 for vid in vids {
2618 let vid_u64 = vid.as_u64();
2619 if tombstones.contains(&vid_u64) {
2620 continue;
2621 }
2622 let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
2623 let entry = vid_data
2624 .entry(vid_u64)
2625 .or_insert_with(|| (Properties::new(), 0, Vec::new()));
2626
2627 if let Some(props) = guard.vertex_properties.get(&vid) {
2629 for (k, v) in props {
2630 entry.0.insert(k.clone(), v.clone());
2631 }
2632 }
2633 if version > entry.1 {
2635 entry.1 = version;
2636 }
2637 if let Some(labels) = guard.vertex_labels.get(&vid) {
2639 entry.2 = labels.clone();
2640 }
2641 }
2642 }
2643
2644 for t in &tombstones {
2646 vid_data.remove(t);
2647 }
2648
2649 if vid_data.is_empty() {
2650 return Ok(RecordBatch::new_empty(internal_schema.clone()));
2651 }
2652
2653 let mut vids: Vec<u64> = vid_data.keys().copied().collect();
2655 vids.sort_unstable();
2656
2657 let num_rows = vids.len();
2658 let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());
2659
2660 for field in internal_schema.fields() {
2661 match field.name().as_str() {
2662 "_vid" => {
2663 columns.push(Arc::new(UInt64Array::from(vids.clone())));
2664 }
2665 "labels" => {
2666 let mut labels_builder = ListBuilder::new(StringBuilder::new());
2667 for vid_u64 in &vids {
2668 let (_, _, labels) = &vid_data[vid_u64];
2669 let values = labels_builder.values();
2670 for lbl in labels {
2671 values.append_value(lbl);
2672 }
2673 labels_builder.append(true);
2674 }
2675 columns.push(Arc::new(labels_builder.finish()));
2676 }
2677 "props_json" => {
2678 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2679 for vid_u64 in &vids {
2680 let (props, _, _) = &vid_data[vid_u64];
2681 if props.is_empty() {
2682 builder.append_null();
2683 } else {
2684 let map: HashMap<String, Value> =
2687 props.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2688 builder
2689 .append_value(uni_common::cypher_value_codec::encode(&Value::Map(map)));
2690 }
2691 }
2692 columns.push(Arc::new(builder.finish()));
2693 }
2694 "_deleted" => {
2695 columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
2697 false;
2698 num_rows
2699 ])));
2700 }
2701 "_version" => {
2702 let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
2703 columns.push(Arc::new(UInt64Array::from(vals)));
2704 }
2705 _ => {
2706 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
2708 }
2709 }
2710 }
2711
2712 RecordBatch::try_new(internal_schema.clone(), columns).map_err(arrow_err)
2713}
2714
2715fn map_to_schemaless_output_schema(
2722 batch: &RecordBatch,
2723 _variable: &str,
2724 projected_properties: &[String],
2725 output_schema: &SchemaRef,
2726 l0_ctx: &crate::query::df_graph::L0Context,
2727) -> DFResult<RecordBatch> {
2728 if batch.num_rows() == 0 {
2729 return Ok(RecordBatch::new_empty(output_schema.clone()));
2730 }
2731
2732 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
2733
2734 let vid_col = batch
2736 .column_by_name("_vid")
2737 .ok_or_else(|| {
2738 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
2739 })?
2740 .clone();
2741 let vid_arr = vid_col
2742 .as_any()
2743 .downcast_ref::<UInt64Array>()
2744 .ok_or_else(|| {
2745 datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
2746 })?;
2747 columns.push(vid_col.clone());
2748
2749 let labels_col = batch.column_by_name("labels");
2751 let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
2752
2753 let mut labels_builder = ListBuilder::new(StringBuilder::new());
2754 for i in 0..vid_arr.len() {
2755 let vid_u64 = vid_arr.value(i);
2756 let vid = Vid::from(vid_u64);
2757
2758 let mut row_labels: Vec<String> = Vec::new();
2760 if let Some(arr) = labels_arr
2761 && !arr.is_null(i)
2762 {
2763 let list_val = arr.value(i);
2764 if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
2765 for j in 0..str_arr.len() {
2766 if !str_arr.is_null(j) {
2767 row_labels.push(str_arr.value(j).to_string());
2768 }
2769 }
2770 }
2771 }
2772
2773 for l0 in l0_ctx.iter_l0_buffers() {
2775 let guard = l0.read();
2776 if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
2777 for lbl in l0_labels {
2778 if !row_labels.contains(lbl) {
2779 row_labels.push(lbl.clone());
2780 }
2781 }
2782 }
2783 }
2784
2785 let values = labels_builder.values();
2786 for lbl in &row_labels {
2787 values.append_value(lbl);
2788 }
2789 labels_builder.append(true);
2790 }
2791 columns.push(Arc::new(labels_builder.finish()));
2792
2793 let props_col = batch.column_by_name("props_json");
2795 let props_arr =
2796 props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
2797
2798 for prop in projected_properties {
2799 if prop == "_all_props" {
2800 let any_l0_has_vertex_props = l0_ctx.iter_l0_buffers().any(|l0| {
2803 let guard = l0.read();
2804 !guard.vertex_properties.is_empty()
2805 });
2806 if !any_l0_has_vertex_props {
2807 match props_col {
2808 Some(col) => columns.push(col.clone()),
2809 None => {
2810 columns.push(arrow_array::new_null_array(
2811 &DataType::LargeBinary,
2812 batch.num_rows(),
2813 ));
2814 }
2815 }
2816 } else {
2817 let col = build_all_props_column_with_l0_overlay(
2818 batch.num_rows(),
2819 vid_arr,
2820 props_arr,
2821 l0_ctx,
2822 );
2823 columns.push(col);
2824 }
2825 } else {
2826 let expected_type = output_schema
2831 .field_with_name(&format!("{_variable}.{prop}"))
2832 .map(|f| f.data_type().clone())
2833 .unwrap_or(DataType::LargeBinary);
2834
2835 if expected_type == DataType::LargeBinary {
2836 let col = build_overflow_property_column(
2837 batch.num_rows(),
2838 vid_arr,
2839 props_arr,
2840 prop,
2841 l0_ctx,
2842 );
2843 columns.push(col);
2844 } else {
2845 let mut prop_values: HashMap<Vid, Properties> = HashMap::new();
2847 for i in 0..batch.num_rows() {
2848 let vid = Vid::from(vid_arr.value(i));
2849 let resolved =
2850 resolve_l0_property(&vid, prop, l0_ctx)
2851 .flatten()
2852 .or_else(|| {
2853 extract_from_overflow_blob(props_arr, i, prop).and_then(|bytes| {
2854 uni_common::cypher_value_codec::decode(&bytes).ok()
2855 })
2856 });
2857 if let Some(val) = resolved {
2858 prop_values.insert(vid, HashMap::from([(prop.to_string(), val)]));
2859 }
2860 }
2861 let vids: Vec<Vid> = (0..batch.num_rows())
2862 .map(|i| Vid::from(vid_arr.value(i)))
2863 .collect();
2864 let col = build_property_column_static(&vids, &prop_values, prop, &expected_type)
2865 .unwrap_or_else(|_| {
2866 arrow_array::new_null_array(&expected_type, batch.num_rows())
2867 });
2868 columns.push(col);
2869 }
2870 }
2871 }
2872
2873 RecordBatch::try_new(output_schema.clone(), columns).map_err(arrow_err)
2874}
2875
2876pub(crate) fn get_property_value(
2878 vid: &Vid,
2879 props_map: &HashMap<Vid, Properties>,
2880 prop_name: &str,
2881) -> Option<Value> {
2882 if prop_name == "_all_props" {
2883 return props_map.get(vid).map(|p| {
2884 let map: HashMap<String, Value> =
2885 p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2886 Value::Map(map)
2887 });
2888 }
2889 props_map
2890 .get(vid)
2891 .and_then(|props| props.get(prop_name))
2892 .cloned()
2893}
2894
/// Builds a primitive Arrow column for `$prop_name` across `$vids`.
///
/// `$extractor` maps a `&Value` to an optional intermediate value, and `$cast`
/// converts that into the builder's native type. Missing properties, and
/// values the extractor rejects, become nulls. Evaluates to
/// `Ok(ArrayRef)`.
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut column_builder = <$builder_ty>::new();
        for vid in $vids {
            let native = get_property_value(vid, $props_map, $prop_name)
                .as_ref()
                .and_then(|value| $extractor(value))
                .map($cast);
            column_builder.append_option(native);
        }
        Ok(Arc::new(column_builder.finish()) as ArrayRef)
    }};
}
2914
2915pub(crate) fn build_property_column_static(
2917 vids: &[Vid],
2918 props_map: &HashMap<Vid, Properties>,
2919 prop_name: &str,
2920 data_type: &DataType,
2921) -> DFResult<ArrayRef> {
2922 match data_type {
2923 DataType::LargeBinary => {
2924 use arrow_array::builder::LargeBinaryBuilder;
2926 let mut builder = LargeBinaryBuilder::new();
2927
2928 for vid in vids {
2929 match get_property_value(vid, props_map, prop_name) {
2930 Some(Value::Null) | None => builder.append_null(),
2931 Some(Value::Bytes(bytes)) => {
2932 builder.append_value(&bytes);
2933 }
2934 Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
2935 let bytes: Vec<u8> = arr
2938 .iter()
2939 .filter_map(|v| v.as_u64().map(|n| n as u8))
2940 .collect();
2941 if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
2942 builder.append_value(&bytes);
2943 } else {
2944 builder.append_value(uni_common::cypher_value_codec::encode(
2945 &Value::List(arr),
2946 ));
2947 }
2948 }
2949 Some(val) => {
2950 builder.append_value(uni_common::cypher_value_codec::encode(&val));
2954 }
2955 }
2956 }
2957 Ok(Arc::new(builder.finish()))
2958 }
2959 DataType::Binary => {
2960 let mut builder = BinaryBuilder::new();
2962 for vid in vids {
2963 let bytes = get_property_value(vid, props_map, prop_name)
2964 .filter(|v| !v.is_null())
2965 .and_then(|v| {
2966 let json_val: serde_json::Value = v.into();
2967 serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
2968 })
2969 .and_then(|crdt| crdt.to_msgpack().ok());
2970 match bytes {
2971 Some(b) => builder.append_value(&b),
2972 None => builder.append_null(),
2973 }
2974 }
2975 Ok(Arc::new(builder.finish()))
2976 }
2977 DataType::Utf8 => {
2978 let mut builder = StringBuilder::new();
2979 for vid in vids {
2980 match get_property_value(vid, props_map, prop_name) {
2981 Some(Value::String(s)) => builder.append_value(s),
2982 Some(Value::Null) | None => builder.append_null(),
2983 Some(other) => builder.append_value(other.to_string()),
2984 }
2985 }
2986 Ok(Arc::new(builder.finish()))
2987 }
2988 DataType::Int64 => {
2989 build_numeric_column!(
2990 vids,
2991 props_map,
2992 prop_name,
2993 Int64Builder,
2994 |v: &Value| v.as_i64(),
2995 |v| v
2996 )
2997 }
2998 DataType::Int32 => {
2999 build_numeric_column!(
3000 vids,
3001 props_map,
3002 prop_name,
3003 Int32Builder,
3004 |v: &Value| v.as_i64(),
3005 |v: i64| v as i32
3006 )
3007 }
3008 DataType::Float64 => {
3009 build_numeric_column!(
3010 vids,
3011 props_map,
3012 prop_name,
3013 Float64Builder,
3014 |v: &Value| v.as_f64(),
3015 |v| v
3016 )
3017 }
3018 DataType::Float32 => {
3019 build_numeric_column!(
3020 vids,
3021 props_map,
3022 prop_name,
3023 Float32Builder,
3024 |v: &Value| v.as_f64(),
3025 |v: f64| v as f32
3026 )
3027 }
3028 DataType::Boolean => {
3029 let mut builder = BooleanBuilder::new();
3030 for vid in vids {
3031 match get_property_value(vid, props_map, prop_name) {
3032 Some(Value::Bool(b)) => builder.append_value(b),
3033 _ => builder.append_null(),
3034 }
3035 }
3036 Ok(Arc::new(builder.finish()))
3037 }
3038 DataType::UInt64 => {
3039 build_numeric_column!(
3040 vids,
3041 props_map,
3042 prop_name,
3043 UInt64Builder,
3044 |v: &Value| v.as_u64(),
3045 |v| v
3046 )
3047 }
3048 DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
3049 let values_builder = Float32Builder::new();
3051 let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
3052 for vid in vids {
3053 match get_property_value(vid, props_map, prop_name) {
3054 Some(Value::Vector(v)) => {
3055 for val in v {
3056 list_builder.values().append_value(val);
3057 }
3058 list_builder.append(true);
3059 }
3060 Some(Value::List(arr)) => {
3061 for v in arr {
3062 list_builder
3063 .values()
3064 .append_value(v.as_f64().unwrap_or(0.0) as f32);
3065 }
3066 list_builder.append(true);
3067 }
3068 _ => {
3069 for _ in 0..*dim {
3071 list_builder.values().append_null();
3072 }
3073 list_builder.append(false);
3074 }
3075 }
3076 }
3077 Ok(Arc::new(list_builder.finish()))
3078 }
3079 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
3080 let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
3082 for vid in vids {
3083 match get_property_value(vid, props_map, prop_name) {
3084 Some(Value::Temporal(tv)) => match tv {
3085 uni_common::TemporalValue::DateTime {
3086 nanos_since_epoch, ..
3087 }
3088 | uni_common::TemporalValue::LocalDateTime {
3089 nanos_since_epoch, ..
3090 } => {
3091 builder.append_value(nanos_since_epoch);
3092 }
3093 uni_common::TemporalValue::Date { days_since_epoch } => {
3094 builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
3095 }
3096 _ => builder.append_null(),
3097 },
3098 Some(Value::String(s)) => match parse_datetime_utc(&s) {
3099 Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
3100 Err(_) => builder.append_null(),
3101 },
3102 Some(Value::Int(n)) => {
3103 builder.append_value(n);
3104 }
3105 _ => builder.append_null(),
3106 }
3107 }
3108 Ok(Arc::new(builder.finish()))
3109 }
3110 DataType::Date32 => {
3111 let mut builder = Date32Builder::new();
3112 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
3113 for vid in vids {
3114 match get_property_value(vid, props_map, prop_name) {
3115 Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
3116 builder.append_value(days_since_epoch);
3117 }
3118 Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
3119 Ok(d) => builder.append_value((d - epoch).num_days() as i32),
3120 Err(_) => builder.append_null(),
3121 },
3122 Some(Value::Int(n)) => {
3123 builder.append_value(n as i32);
3124 }
3125 _ => builder.append_null(),
3126 }
3127 }
3128 Ok(Arc::new(builder.finish()))
3129 }
3130 DataType::Time64(TimeUnit::Nanosecond) => {
3131 let mut builder = Time64NanosecondBuilder::new();
3132 for vid in vids {
3133 match get_property_value(vid, props_map, prop_name) {
3134 Some(Value::Temporal(
3135 uni_common::TemporalValue::LocalTime {
3136 nanos_since_midnight,
3137 }
3138 | uni_common::TemporalValue::Time {
3139 nanos_since_midnight,
3140 ..
3141 },
3142 )) => {
3143 builder.append_value(nanos_since_midnight);
3144 }
3145 Some(Value::Temporal(_)) => builder.append_null(),
3146 Some(Value::String(s)) => {
3147 match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
3148 .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
3149 {
3150 Ok(t) => {
3151 let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
3152 + t.nanosecond() as i64;
3153 builder.append_value(nanos);
3154 }
3155 Err(_) => builder.append_null(),
3156 }
3157 }
3158 Some(Value::Int(n)) => {
3159 builder.append_value(n);
3160 }
3161 _ => builder.append_null(),
3162 }
3163 }
3164 Ok(Arc::new(builder.finish()))
3165 }
3166 DataType::Interval(IntervalUnit::MonthDayNano) => {
3167 let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
3168 Vec::with_capacity(vids.len());
3169 for vid in vids {
3170 match get_property_value(vid, props_map, prop_name) {
3171 Some(Value::Temporal(uni_common::TemporalValue::Duration {
3172 months,
3173 days,
3174 nanos,
3175 })) => {
3176 values.push(Some(arrow::datatypes::IntervalMonthDayNano {
3177 months: months as i32,
3178 days: days as i32,
3179 nanoseconds: nanos,
3180 }));
3181 }
3182 Some(Value::Int(_n)) => {
3183 values.push(None);
3184 }
3185 _ => values.push(None),
3186 }
3187 }
3188 let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
3189 Ok(Arc::new(arr))
3190 }
3191 DataType::List(inner_field) => {
3192 build_list_property_column(vids, props_map, prop_name, inner_field)
3193 }
3194 DataType::Struct(fields) => {
3195 build_struct_property_column(vids, props_map, prop_name, fields)
3196 }
3197 DataType::FixedSizeBinary(24) => {
3198 use arrow_array::builder::FixedSizeBinaryBuilder;
3200 const BTIC_LEN: i32 = 24;
3201 let mut builder = FixedSizeBinaryBuilder::with_capacity(vids.len(), BTIC_LEN);
3202 for vid in vids {
3203 match get_property_value(vid, props_map, prop_name) {
3204 Some(Value::Temporal(uni_common::TemporalValue::Btic { lo, hi, meta })) => {
3205 match uni_btic::Btic::new(lo, hi, meta) {
3206 Ok(b) => {
3207 builder
3208 .append_value(uni_btic::encode::encode(&b))
3209 .map_err(arrow_err)?;
3210 }
3211 Err(e) => {
3212 tracing::warn!(
3213 "BTIC coercion failed for property '{}': invalid value (lo={}, hi={}, meta={:#x}): {}",
3214 prop_name,
3215 lo,
3216 hi,
3217 meta,
3218 e
3219 );
3220 builder.append_null()
3221 }
3222 }
3223 }
3224 Some(Value::String(s)) => match uni_btic::parse::parse_btic_literal(&s) {
3225 Ok(b) => {
3226 builder
3227 .append_value(uni_btic::encode::encode(&b))
3228 .map_err(arrow_err)?;
3229 }
3230 Err(e) => {
3231 tracing::warn!(
3232 "BTIC coercion failed for property '{}': '{}' is not a valid BTIC literal: {}",
3233 prop_name,
3234 s,
3235 e
3236 );
3237 builder.append_null()
3238 }
3239 },
3240 _ => builder.append_null(),
3241 }
3242 }
3243 Ok(Arc::new(builder.finish()))
3244 }
3245 _ => {
3247 let mut builder = StringBuilder::new();
3248 for vid in vids {
3249 match get_property_value(vid, props_map, prop_name) {
3250 Some(Value::Null) | None => builder.append_null(),
3251 Some(other) => builder.append_value(other.to_string()),
3252 }
3253 }
3254 Ok(Arc::new(builder.finish()))
3255 }
3256 }
3257}
3258
3259fn build_list_property_column(
3261 vids: &[Vid],
3262 props_map: &HashMap<Vid, Properties>,
3263 prop_name: &str,
3264 inner_field: &Arc<Field>,
3265) -> DFResult<ArrayRef> {
3266 match inner_field.data_type() {
3267 DataType::Utf8 => {
3268 let mut builder = ListBuilder::new(StringBuilder::new());
3269 for vid in vids {
3270 match get_property_value(vid, props_map, prop_name) {
3271 Some(Value::List(arr)) => {
3272 for v in arr {
3273 match v {
3274 Value::String(s) => builder.values().append_value(s),
3275 Value::Null => builder.values().append_null(),
3276 other => builder.values().append_value(format!("{other:?}")),
3277 }
3278 }
3279 builder.append(true);
3280 }
3281 _ => builder.append(false),
3282 }
3283 }
3284 Ok(Arc::new(builder.finish()))
3285 }
3286 DataType::Int64 => {
3287 let mut builder = ListBuilder::new(Int64Builder::new());
3288 for vid in vids {
3289 match get_property_value(vid, props_map, prop_name) {
3290 Some(Value::List(arr)) => {
3291 for v in arr {
3292 match v.as_i64() {
3293 Some(n) => builder.values().append_value(n),
3294 None => builder.values().append_null(),
3295 }
3296 }
3297 builder.append(true);
3298 }
3299 _ => builder.append(false),
3300 }
3301 }
3302 Ok(Arc::new(builder.finish()))
3303 }
3304 DataType::Float64 => {
3305 let mut builder = ListBuilder::new(Float64Builder::new());
3306 for vid in vids {
3307 match get_property_value(vid, props_map, prop_name) {
3308 Some(Value::List(arr)) => {
3309 for v in arr {
3310 match v.as_f64() {
3311 Some(n) => builder.values().append_value(n),
3312 None => builder.values().append_null(),
3313 }
3314 }
3315 builder.append(true);
3316 }
3317 _ => builder.append(false),
3318 }
3319 }
3320 Ok(Arc::new(builder.finish()))
3321 }
3322 DataType::Boolean => {
3323 let mut builder = ListBuilder::new(BooleanBuilder::new());
3324 for vid in vids {
3325 match get_property_value(vid, props_map, prop_name) {
3326 Some(Value::List(arr)) => {
3327 for v in arr {
3328 match v.as_bool() {
3329 Some(b) => builder.values().append_value(b),
3330 None => builder.values().append_null(),
3331 }
3332 }
3333 builder.append(true);
3334 }
3335 _ => builder.append(false),
3336 }
3337 }
3338 Ok(Arc::new(builder.finish()))
3339 }
3340 DataType::Struct(fields) => {
3341 build_list_of_structs_column(vids, props_map, prop_name, fields)
3343 }
3344 _ => {
3346 let mut builder = ListBuilder::new(StringBuilder::new());
3347 for vid in vids {
3348 match get_property_value(vid, props_map, prop_name) {
3349 Some(Value::List(arr)) => {
3350 for v in arr {
3351 match v {
3352 Value::Null => builder.values().append_null(),
3353 other => builder.values().append_value(format!("{other:?}")),
3354 }
3355 }
3356 builder.append(true);
3357 }
3358 _ => builder.append(false),
3359 }
3360 }
3361 Ok(Arc::new(builder.finish()))
3362 }
3363 }
3364}
3365
3366fn build_list_of_structs_column(
3372 vids: &[Vid],
3373 props_map: &HashMap<Vid, Properties>,
3374 prop_name: &str,
3375 fields: &Fields,
3376) -> DFResult<ArrayRef> {
3377 use arrow_array::StructArray;
3378
3379 let values: Vec<Option<Value>> = vids
3380 .iter()
3381 .map(|vid| get_property_value(vid, props_map, prop_name))
3382 .collect();
3383
3384 let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
3387 .iter()
3388 .map(|val| match val {
3389 Some(Value::List(arr)) => {
3390 let objs: Vec<HashMap<String, Value>> = arr
3391 .iter()
3392 .filter_map(|v| {
3393 if let Value::Map(m) = v {
3394 Some(m.clone())
3395 } else {
3396 None
3397 }
3398 })
3399 .collect();
3400 if objs.is_empty() { None } else { Some(objs) }
3401 }
3402 Some(Value::Map(obj)) => {
3403 let kv_pairs: Vec<HashMap<String, Value>> = obj
3405 .iter()
3406 .map(|(k, v)| {
3407 let mut m = HashMap::new();
3408 m.insert("key".to_string(), Value::String(k.clone()));
3409 m.insert("value".to_string(), v.clone());
3410 m
3411 })
3412 .collect();
3413 Some(kv_pairs)
3414 }
3415 _ => None,
3416 })
3417 .collect();
3418
3419 let total_items: usize = rows
3420 .iter()
3421 .filter_map(|r| r.as_ref())
3422 .map(|v| v.len())
3423 .sum();
3424
3425 let child_arrays: Vec<ArrayRef> = fields
3427 .iter()
3428 .map(|field| {
3429 let field_name = field.name();
3430 match field.data_type() {
3431 DataType::Utf8 => {
3432 let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3433 for obj in rows.iter().flatten().flatten() {
3434 match obj.get(field_name) {
3435 Some(Value::String(s)) => builder.append_value(s),
3436 Some(Value::Null) | None => builder.append_null(),
3437 Some(other) => builder.append_value(format!("{other:?}")),
3438 }
3439 }
3440 Arc::new(builder.finish()) as ArrayRef
3441 }
3442 DataType::Int64 => {
3443 let mut builder = Int64Builder::with_capacity(total_items);
3444 for obj in rows.iter().flatten().flatten() {
3445 match obj.get(field_name).and_then(|v| v.as_i64()) {
3446 Some(n) => builder.append_value(n),
3447 None => builder.append_null(),
3448 }
3449 }
3450 Arc::new(builder.finish()) as ArrayRef
3451 }
3452 DataType::Float64 => {
3453 let mut builder = Float64Builder::with_capacity(total_items);
3454 for obj in rows.iter().flatten().flatten() {
3455 match obj.get(field_name).and_then(|v| v.as_f64()) {
3456 Some(n) => builder.append_value(n),
3457 None => builder.append_null(),
3458 }
3459 }
3460 Arc::new(builder.finish()) as ArrayRef
3461 }
3462 _ => {
3464 let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
3465 for obj in rows.iter().flatten().flatten() {
3466 match obj.get(field_name) {
3467 Some(Value::Null) | None => builder.append_null(),
3468 Some(other) => builder.append_value(format!("{other:?}")),
3469 }
3470 }
3471 Arc::new(builder.finish()) as ArrayRef
3472 }
3473 }
3474 })
3475 .collect();
3476
3477 let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
3479 .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3480
3481 let mut offsets = Vec::with_capacity(vids.len() + 1);
3483 let mut nulls = Vec::with_capacity(vids.len());
3484 let mut offset = 0i32;
3485 offsets.push(offset);
3486 for row in &rows {
3487 match row {
3488 Some(objs) => {
3489 offset += objs.len() as i32;
3490 offsets.push(offset);
3491 nulls.push(true);
3492 }
3493 None => {
3494 offsets.push(offset);
3495 nulls.push(false);
3496 }
3497 }
3498 }
3499
3500 let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
3501 let list_array = arrow_array::ListArray::try_new(
3502 list_field,
3503 arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
3504 Arc::new(struct_array),
3505 Some(arrow::buffer::NullBuffer::from(nulls)),
3506 )
3507 .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3508
3509 Ok(Arc::new(list_array))
3510}
3511
3512fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
3515 use uni_common::value::TemporalValue;
3516 let mut m = HashMap::new();
3517 match tv {
3518 TemporalValue::DateTime {
3519 nanos_since_epoch,
3520 offset_seconds,
3521 timezone_name,
3522 } => {
3523 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3524 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3525 if let Some(tz) = timezone_name {
3526 m.insert("timezone_name".into(), Value::String(tz.clone()));
3527 }
3528 }
3529 TemporalValue::LocalDateTime { nanos_since_epoch } => {
3530 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
3531 }
3532 TemporalValue::Time {
3533 nanos_since_midnight,
3534 offset_seconds,
3535 } => {
3536 m.insert(
3537 "nanos_since_midnight".into(),
3538 Value::Int(*nanos_since_midnight),
3539 );
3540 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
3541 }
3542 TemporalValue::LocalTime {
3543 nanos_since_midnight,
3544 } => {
3545 m.insert(
3546 "nanos_since_midnight".into(),
3547 Value::Int(*nanos_since_midnight),
3548 );
3549 }
3550 TemporalValue::Date { days_since_epoch } => {
3551 m.insert(
3552 "days_since_epoch".into(),
3553 Value::Int(*days_since_epoch as i64),
3554 );
3555 }
3556 TemporalValue::Duration {
3557 months,
3558 days,
3559 nanos,
3560 } => {
3561 m.insert("months".into(), Value::Int(*months));
3562 m.insert("days".into(), Value::Int(*days));
3563 m.insert("nanos".into(), Value::Int(*nanos));
3564 }
3565 TemporalValue::Btic { lo, hi, meta } => {
3566 m.insert("lo".into(), Value::Int(*lo));
3567 m.insert("hi".into(), Value::Int(*hi));
3568 m.insert("meta".into(), Value::Int(*meta as i64));
3569 }
3570 }
3571 m
3572}
3573
3574fn build_struct_property_column(
3576 vids: &[Vid],
3577 props_map: &HashMap<Vid, Properties>,
3578 prop_name: &str,
3579 fields: &Fields,
3580) -> DFResult<ArrayRef> {
3581 use arrow_array::StructArray;
3582
3583 let values: Vec<Option<Value>> = vids
3586 .iter()
3587 .map(|vid| {
3588 let val = get_property_value(vid, props_map, prop_name);
3589 match val {
3590 Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
3591 other => other,
3592 }
3593 })
3594 .collect();
3595
3596 let child_arrays: Vec<ArrayRef> = fields
3597 .iter()
3598 .map(|field| {
3599 let field_name = field.name();
3600 match field.data_type() {
3601 DataType::Float64 => {
3602 let mut builder = Float64Builder::with_capacity(vids.len());
3603 for val in &values {
3604 match val {
3605 Some(Value::Map(obj)) => {
3606 match obj.get(field_name).and_then(|v| v.as_f64()) {
3607 Some(n) => builder.append_value(n),
3608 None => builder.append_null(),
3609 }
3610 }
3611 _ => builder.append_null(),
3612 }
3613 }
3614 Arc::new(builder.finish()) as ArrayRef
3615 }
3616 DataType::Utf8 => {
3617 let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3618 for val in &values {
3619 match val {
3620 Some(Value::Map(obj)) => match obj.get(field_name) {
3621 Some(Value::String(s)) => builder.append_value(s),
3622 Some(Value::Null) | None => builder.append_null(),
3623 Some(other) => builder.append_value(format!("{other:?}")),
3624 },
3625 _ => builder.append_null(),
3626 }
3627 }
3628 Arc::new(builder.finish()) as ArrayRef
3629 }
3630 DataType::Int64 => {
3631 let mut builder = Int64Builder::with_capacity(vids.len());
3632 for val in &values {
3633 match val {
3634 Some(Value::Map(obj)) => {
3635 match obj.get(field_name).and_then(|v| v.as_i64()) {
3636 Some(n) => builder.append_value(n),
3637 None => builder.append_null(),
3638 }
3639 }
3640 _ => builder.append_null(),
3641 }
3642 }
3643 Arc::new(builder.finish()) as ArrayRef
3644 }
3645 DataType::Timestamp(_, _) => {
3646 let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
3647 for val in &values {
3648 match val {
3649 Some(Value::Map(obj)) => {
3650 match obj.get(field_name).and_then(|v| v.as_i64()) {
3651 Some(n) => builder.append_value(n),
3652 None => builder.append_null(),
3653 }
3654 }
3655 _ => builder.append_null(),
3656 }
3657 }
3658 Arc::new(builder.finish()) as ArrayRef
3659 }
3660 DataType::Int32 => {
3661 let mut builder = Int32Builder::with_capacity(vids.len());
3662 for val in &values {
3663 match val {
3664 Some(Value::Map(obj)) => {
3665 match obj.get(field_name).and_then(|v| v.as_i64()) {
3666 Some(n) => builder.append_value(n as i32),
3667 None => builder.append_null(),
3668 }
3669 }
3670 _ => builder.append_null(),
3671 }
3672 }
3673 Arc::new(builder.finish()) as ArrayRef
3674 }
3675 DataType::Time64(_) => {
3676 let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
3677 for val in &values {
3678 match val {
3679 Some(Value::Map(obj)) => {
3680 match obj.get(field_name).and_then(|v| v.as_i64()) {
3681 Some(n) => builder.append_value(n),
3682 None => builder.append_null(),
3683 }
3684 }
3685 _ => builder.append_null(),
3686 }
3687 }
3688 Arc::new(builder.finish()) as ArrayRef
3689 }
3690 _ => {
3692 let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
3693 for val in &values {
3694 match val {
3695 Some(Value::Map(obj)) => match obj.get(field_name) {
3696 Some(Value::Null) | None => builder.append_null(),
3697 Some(other) => builder.append_value(format!("{other:?}")),
3698 },
3699 _ => builder.append_null(),
3700 }
3701 }
3702 Arc::new(builder.finish()) as ArrayRef
3703 }
3704 }
3705 })
3706 .collect();
3707
3708 let nulls: Vec<bool> = values
3710 .iter()
3711 .map(|v| matches!(v, Some(Value::Map(_))))
3712 .collect();
3713
3714 let struct_array = StructArray::try_new(
3715 fields.clone(),
3716 child_arrays,
3717 Some(arrow::buffer::NullBuffer::from(nulls)),
3718 )
3719 .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;
3720
3721 Ok(Arc::new(struct_array))
3722}
3723
3724impl Stream for GraphScanStream {
3725 type Item = DFResult<RecordBatch>;
3726
3727 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3728 let metrics = self.metrics.clone();
3729 let _timer = metrics.elapsed_compute().timer();
3730 loop {
3731 let state = std::mem::replace(&mut self.state, GraphScanState::Done);
3733
3734 match state {
3735 GraphScanState::Init => {
3736 let graph_ctx = self.graph_ctx.clone();
3738 let label = self.label.clone();
3739 let variable = self.variable.clone();
3740 let properties = self.properties.clone();
3741 let is_edge_scan = self.is_edge_scan;
3742 let is_schemaless = self.is_schemaless;
3743 let filter = self.filter.clone();
3744 let vid_list_filter = self.vid_list_filter.clone();
3745 let extra_lance_filter = self.extra_lance_filter.clone();
3746 let extra_runtime_filter = self.extra_runtime_filter.clone();
3747 let schema = self.schema.clone();
3748
3749 let fut = async move {
3750 graph_ctx.check_timeout().map_err(|e| {
3751 datafusion::error::DataFusionError::Execution(e.to_string())
3752 })?;
3753
3754 let batch = if is_edge_scan {
3755 columnar_scan_edge_batch_static(
3756 &graph_ctx,
3757 &label,
3758 &variable,
3759 &properties,
3760 &schema,
3761 )
3762 .await?
3763 } else if is_schemaless {
3764 columnar_scan_schemaless_vertex_batch_static(
3765 &graph_ctx,
3766 &label,
3767 &variable,
3768 &properties,
3769 &schema,
3770 &filter,
3771 vid_list_filter.as_deref(),
3772 extra_lance_filter.as_deref(),
3773 extra_runtime_filter.as_ref(),
3774 )
3775 .await?
3776 } else {
3777 columnar_scan_vertex_batch_static(
3778 &graph_ctx,
3779 &label,
3780 &variable,
3781 &properties,
3782 &schema,
3783 &filter,
3784 vid_list_filter.as_deref(),
3785 extra_lance_filter.as_deref(),
3786 extra_runtime_filter.as_ref(),
3787 )
3788 .await?
3789 };
3790 Ok(Some(batch))
3791 };
3792
3793 self.state = GraphScanState::Executing(Box::pin(fut));
3794 }
3796 GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
3797 Poll::Ready(Ok(batch)) => {
3798 self.state = GraphScanState::Done;
3799 self.metrics
3800 .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
3801 return Poll::Ready(batch.map(Ok));
3802 }
3803 Poll::Ready(Err(e)) => {
3804 self.state = GraphScanState::Done;
3805 return Poll::Ready(Some(Err(e)));
3806 }
3807 Poll::Pending => {
3808 self.state = GraphScanState::Executing(fut);
3809 return Poll::Pending;
3810 }
3811 },
3812 GraphScanState::Done => {
3813 return Poll::Ready(None);
3814 }
3815 }
3816 }
3817 }
3818}
3819
3820impl RecordBatchStream for GraphScanStream {
3821 fn schema(&self) -> SchemaRef {
3822 self.schema.clone()
3823 }
3824}
3825
#[cfg(test)]
mod tests {
    // Unit tests for scan-schema construction, CypherValue decoding, MVCC
    // de-duplication, L0 tombstone filtering and output-schema mapping.
    use super::*;

    /// A labelled vertex scan exposes `_vid`, `_labels`, then each projected
    /// property in request order, all prefixed with the variable name.
    #[test]
    fn test_build_vertex_schema() {
        let uni_schema = UniSchema::default();
        let schema = GraphScanExec::build_vertex_schema(
            "n",
            "Person",
            &["name".to_string(), "age".to_string()],
            &uni_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(3).name(), "n.age");
    }

    /// An edge scan exposes `_eid`, `_src_vid`, `_dst_vid`, then the
    /// projected properties, all prefixed with the variable name.
    #[test]
    fn test_build_edge_schema() {
        let uni_schema = UniSchema::default();
        let schema =
            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "r._eid");
        assert_eq!(schema.field(1).name(), "r._src_vid");
        assert_eq!(schema.field(2).name(), "r._dst_vid");
        assert_eq!(schema.field(3).name(), "r.weight");
    }

    /// With no declared schema, projected properties are typed as
    /// `LargeBinary`, while `_vid` stays `UInt64`.
    #[test]
    fn test_build_schemaless_vertex_schema() {
        let empty_schema = UniSchema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema(
            "n",
            &["name".to_string(), "age".to_string()],
            &empty_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(schema.field(3).name(), "n.age");
        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
    }

    /// A schemaless scan with no projected properties yields only the
    /// `_vid` and `_labels` columns.
    #[test]
    fn test_schemaless_all_scan_has_empty_label() {
        let empty_schema = UniSchema::default();
        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[], &empty_schema);

        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
    }

    /// Both a property map and a single scalar must round-trip through the
    /// CypherValue codec.
    #[test]
    fn test_cypher_value_all_props_extraction() {
        let map: HashMap<String, Value> = [
            ("age".to_string(), Value::Int(30)),
            ("name".to_string(), Value::String("Alice".to_string())),
        ]
        .into_iter()
        .collect();
        let cv_bytes = uni_common::cypher_value_codec::encode(&Value::Map(map));

        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
        let Value::Map(props) = decoded else {
            panic!("Expected Map");
        };
        // Check every encoded property, not just one of them.
        assert_eq!(props.get("age"), Some(&Value::Int(30)));
        assert_eq!(
            props.get("name"),
            Some(&Value::String("Alice".to_string()))
        );

        let single_bytes = uni_common::cypher_value_codec::encode(&Value::Int(30));
        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
        assert_eq!(single_decoded, Value::Int(30));
    }

    /// Builds a batch with columns `_vid`, `_deleted`, `_version` and `name`.
    ///
    /// Each `name` value is `v{vid}_ver{version}`, so a test can tell which
    /// physical row survived de-duplication or filtering. All three slices
    /// must be the same length.
    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let names: Vec<String> = vids
            .iter()
            .zip(versions.iter())
            .map(|(v, ver)| format!("v{}_ver{}", v, ver))
            .collect();
        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vids.to_vec())),
                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
                Arc::new(UInt64Array::from(versions.to_vec())),
                Arc::new(name_arr),
            ],
        )
        .unwrap()
    }

    /// With several versions per vid, only the highest `_version` row is kept
    /// for each vid. The output is checked in ascending vid order.
    #[test]
    fn test_mvcc_dedup_multiple_versions() {
        let batch = make_mvcc_batch(
            &[1, 1, 1, 2, 2],
            &[3, 1, 5, 2, 4],
            &[false, false, false, false, false],
        );

        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let ver_col = result
            .column_by_name("_version")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let name_col = result
            .column_by_name("name")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();

        // vid 1: versions 3, 1, 5 -> version 5 wins.
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(ver_col.value(0), 5);
        assert_eq!(name_col.value(0), "v1_ver5");

        // vid 2: versions 2, 4 -> version 4 wins.
        assert_eq!(vid_col.value(1), 2);
        assert_eq!(ver_col.value(1), 4);
        assert_eq!(name_col.value(1), "v2_ver4");
    }

    /// Rows with distinct vids are all retained.
    #[test]
    fn test_mvcc_dedup_single_rows() {
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    /// De-duplicating an empty batch yields an empty batch.
    #[test]
    fn test_mvcc_dedup_empty() {
        let batch = make_mvcc_batch(&[], &[], &[]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    /// A vid tombstoned in the current L0 buffer is removed; the other rows
    /// keep their relative order.
    #[test]
    fn test_filter_l0_tombstones_removes_tombstoned() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);

        // Tombstone vid 2 in the current L0 buffer.
        let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
        let l0_buf = Arc::new(parking_lot::RwLock::new(l0));
        l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));

        let l0_ctx = L0Context {
            current_l0: Some(l0_buf),
            transaction_l0: None,
            pending_flush_l0s: vec![],
        };

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(vid_col.value(1), 3);
    }

    /// With a default (empty) L0 context, no rows are filtered.
    #[test]
    fn test_filter_l0_tombstones_none() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let l0_ctx = L0Context::default();

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    /// A storage-level batch is mapped onto the variable-prefixed output
    /// schema. `n._labels` is synthesized and `name` values carry over
    /// unchanged.
    #[test]
    fn test_map_to_output_schema_basic() {
        use crate::query::df_graph::L0Context;

        // Storage-side layout: internal MVCC columns plus one property.
        let lance_schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let name_arr: arrow_array::StringArray =
            vec![Some("Alice"), Some("Bob")].into_iter().collect();
        let batch = RecordBatch::try_new(
            lance_schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2])),
                Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
                Arc::new(UInt64Array::from(vec![1u64, 1])),
                Arc::new(name_arr),
            ],
        )
        .unwrap();

        // Query-side layout: variable-prefixed columns, no MVCC internals.
        let output_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n._labels", labels_data_type(), true),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        let l0_ctx = L0Context::default();
        let result = map_to_output_schema(
            &batch,
            "Person",
            "n",
            &["name".to_string()],
            &output_schema,
            &l0_ctx,
        )
        .unwrap();

        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.schema().fields().len(), 3);
        assert_eq!(result.schema().field(0).name(), "n._vid");
        assert_eq!(result.schema().field(1).name(), "n._labels");
        assert_eq!(result.schema().field(2).name(), "n.name");

        let name_col = result
            .column(2)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
    }
}