1use crate::query::datetime::parse_datetime_utc;
27use crate::query::df_graph::GraphExecutionContext;
28use crate::query::df_graph::common::{compute_plan_properties, labels_data_type};
29use arrow_array::builder::{
30 BinaryBuilder, BooleanBuilder, Date32Builder, FixedSizeListBuilder, Float32Builder,
31 Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
32 Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt64Builder,
33};
34use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
35use arrow_schema::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit};
36use chrono::{NaiveDate, NaiveTime, Timelike};
37use datafusion::common::Result as DFResult;
38use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
39use datafusion::physical_expr::PhysicalExpr;
40use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
41use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
42use futures::Stream;
43use std::any::Any;
44use std::collections::{HashMap, HashSet};
45use std::fmt;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use uni_common::Properties;
50use uni_common::Value;
51use uni_common::core::id::Vid;
52use uni_common::core::schema::Schema as UniSchema;
53
/// Physical plan node that scans vertices or edges out of the graph store.
///
/// Built by the planner for `MATCH` patterns; the actual row production is
/// delegated to `GraphScanStream` in `execute`.
pub struct GraphScanExec {
    // Shared execution context: storage handle plus L0 write buffers.
    graph_ctx: Arc<GraphExecutionContext>,

    // Vertex label or edge type to scan. Multi-label scans store the labels
    // joined with ':'; an empty string means "all vertices".
    label: String,

    // Query variable the scanned entity is bound to; used as the output
    // column prefix (e.g. "n._vid").
    variable: String,

    // Property names projected into the output schema.
    projected_properties: Vec<String>,

    // Pushed-down predicate. NOTE(review): `execute` does not hand this to
    // the stream — presumably it is applied elsewhere; confirm with callers.
    filter: Option<Arc<dyn PhysicalExpr>>,

    // True when scanning edges rather than vertices.
    is_edge_scan: bool,

    // True when properties have no declared types (all LargeBinary).
    is_schemaless: bool,

    // Arrow schema of the batches this node emits.
    schema: SchemaRef,

    // Cached DataFusion plan properties (partitioning/ordering metadata).
    properties: PlanProperties,

    // Metric set shared with every stream produced by `execute`.
    metrics: ExecutionPlanMetricsSet,
}
106
107impl fmt::Debug for GraphScanExec {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 f.debug_struct("GraphScanExec")
110 .field("label", &self.label)
111 .field("variable", &self.variable)
112 .field("projected_properties", &self.projected_properties)
113 .field("is_edge_scan", &self.is_edge_scan)
114 .finish()
115 }
116}
117
118impl GraphScanExec {
119 pub fn new_vertex_scan(
124 graph_ctx: Arc<GraphExecutionContext>,
125 label: impl Into<String>,
126 variable: impl Into<String>,
127 projected_properties: Vec<String>,
128 filter: Option<Arc<dyn PhysicalExpr>>,
129 ) -> Self {
130 let label = label.into();
131 let variable = variable.into();
132
133 let uni_schema = graph_ctx.storage().schema_manager().schema();
135 let schema =
136 Self::build_vertex_schema(&variable, &label, &projected_properties, &uni_schema);
137
138 let properties = compute_plan_properties(schema.clone());
139
140 Self {
141 graph_ctx,
142 label,
143 variable,
144 projected_properties,
145 filter,
146 is_edge_scan: false,
147 is_schemaless: false,
148 schema,
149 properties,
150 metrics: ExecutionPlanMetricsSet::new(),
151 }
152 }
153
154 pub fn new_schemaless_vertex_scan(
160 graph_ctx: Arc<GraphExecutionContext>,
161 label_name: impl Into<String>,
162 variable: impl Into<String>,
163 projected_properties: Vec<String>,
164 filter: Option<Arc<dyn PhysicalExpr>>,
165 ) -> Self {
166 let label = label_name.into();
167 let variable = variable.into();
168
169 let projected_properties: Vec<String> = projected_properties
174 .into_iter()
175 .filter(|p| p != "_vid" && p != "_labels")
176 .collect();
177
178 let schema = Self::build_schemaless_vertex_schema(&variable, &projected_properties);
180 let properties = compute_plan_properties(schema.clone());
181
182 Self {
183 graph_ctx,
184 label,
185 variable,
186 projected_properties,
187 filter,
188 is_edge_scan: false,
189 is_schemaless: true,
190 schema,
191 properties,
192 metrics: ExecutionPlanMetricsSet::new(),
193 }
194 }
195
196 pub fn new_multi_label_vertex_scan(
201 graph_ctx: Arc<GraphExecutionContext>,
202 labels: Vec<String>,
203 variable: impl Into<String>,
204 projected_properties: Vec<String>,
205 filter: Option<Arc<dyn PhysicalExpr>>,
206 ) -> Self {
207 let variable = variable.into();
208 let projected_properties: Vec<String> = projected_properties
209 .into_iter()
210 .filter(|p| p != "_vid" && p != "_labels")
211 .collect();
212 let schema = Self::build_schemaless_vertex_schema(&variable, &projected_properties);
213 let properties = compute_plan_properties(schema.clone());
214
215 let encoded_labels = labels.join(":");
217
218 Self {
219 graph_ctx,
220 label: encoded_labels,
221 variable,
222 projected_properties,
223 filter,
224 is_edge_scan: false,
225 is_schemaless: true,
226 schema,
227 properties,
228 metrics: ExecutionPlanMetricsSet::new(),
229 }
230 }
231
232 pub fn new_schemaless_all_scan(
238 graph_ctx: Arc<GraphExecutionContext>,
239 variable: impl Into<String>,
240 projected_properties: Vec<String>,
241 filter: Option<Arc<dyn PhysicalExpr>>,
242 ) -> Self {
243 let variable = variable.into();
244 let projected_properties: Vec<String> = projected_properties
245 .into_iter()
246 .filter(|p| p != "_vid" && p != "_labels")
247 .collect();
248
249 let schema = Self::build_schemaless_vertex_schema(&variable, &projected_properties);
251 let properties = compute_plan_properties(schema.clone());
252
253 Self {
254 graph_ctx,
255 label: String::new(), variable,
257 projected_properties,
258 filter,
259 is_edge_scan: false,
260 is_schemaless: true,
261 schema,
262 properties,
263 metrics: ExecutionPlanMetricsSet::new(),
264 }
265 }
266
267 fn build_schemaless_vertex_schema(variable: &str, properties: &[String]) -> SchemaRef {
269 let mut fields = vec![
270 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
271 Field::new(format!("{}._labels", variable), labels_data_type(), true),
272 ];
273
274 for prop in properties {
275 let col_name = format!("{}.{}", variable, prop);
276 fields.push(Field::new(&col_name, DataType::LargeBinary, true));
278 }
279
280 Arc::new(Schema::new(fields))
281 }
282
283 pub fn new_edge_scan(
288 graph_ctx: Arc<GraphExecutionContext>,
289 edge_type: impl Into<String>,
290 variable: impl Into<String>,
291 projected_properties: Vec<String>,
292 filter: Option<Arc<dyn PhysicalExpr>>,
293 ) -> Self {
294 let label = edge_type.into();
295 let variable = variable.into();
296
297 let uni_schema = graph_ctx.storage().schema_manager().schema();
299 let schema = Self::build_edge_schema(&variable, &label, &projected_properties, &uni_schema);
300
301 let properties = compute_plan_properties(schema.clone());
302
303 Self {
304 graph_ctx,
305 label,
306 variable,
307 projected_properties,
308 filter,
309 is_edge_scan: true,
310 is_schemaless: false,
311 schema,
312 properties,
313 metrics: ExecutionPlanMetricsSet::new(),
314 }
315 }
316
317 fn build_vertex_schema(
319 variable: &str,
320 label: &str,
321 properties: &[String],
322 uni_schema: &UniSchema,
323 ) -> SchemaRef {
324 let mut fields = vec![
325 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
326 Field::new(format!("{}._labels", variable), labels_data_type(), true),
327 ];
328 let label_props = uni_schema.properties.get(label);
329 for prop in properties {
330 let col_name = format!("{}.{}", variable, prop);
331 let arrow_type = resolve_property_type(prop, label_props);
332 fields.push(Field::new(&col_name, arrow_type, true));
333 }
334 Arc::new(Schema::new(fields))
335 }
336
337 fn build_edge_schema(
339 variable: &str,
340 edge_type: &str,
341 properties: &[String],
342 uni_schema: &UniSchema,
343 ) -> SchemaRef {
344 let mut fields = vec![
345 Field::new(format!("{}._eid", variable), DataType::UInt64, false),
346 Field::new(format!("{}._src_vid", variable), DataType::UInt64, false),
347 Field::new(format!("{}._dst_vid", variable), DataType::UInt64, false),
348 ];
349 let edge_props = uni_schema.properties.get(edge_type);
350 for prop in properties {
351 let col_name = format!("{}.{}", variable, prop);
352 let arrow_type = resolve_property_type(prop, edge_props);
353 fields.push(Field::new(&col_name, arrow_type, true));
354 }
355 Arc::new(Schema::new(fields))
356 }
357}
358
359impl DisplayAs for GraphScanExec {
360 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361 let scan_type = if self.is_edge_scan { "Edge" } else { "Vertex" };
362 write!(
363 f,
364 "GraphScanExec: {}={}, properties={:?}",
365 scan_type, self.label, self.projected_properties
366 )?;
367 if self.filter.is_some() {
368 write!(f, ", filter=<pushed>")?;
369 }
370 Ok(())
371 }
372}
373
impl ExecutionPlan for GraphScanExec {
    /// Static node name shown in plan displays.
    fn name(&self) -> &str {
        "GraphScanExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    /// Scans are leaf nodes: no inputs.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    /// Leaf node: only the empty child list is accepted; anything else is a
    /// planner error.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.is_empty() {
            Ok(self)
        } else {
            Err(datafusion::error::DataFusionError::Plan(
                "GraphScanExec does not accept children".to_string(),
            ))
        }
    }

    /// Starts the scan for one partition, returning a lazily-driven stream.
    ///
    /// NOTE(review): `self.filter` is not forwarded to the stream here —
    /// presumably the predicate is evaluated downstream; confirm before
    /// relying on it being applied inside the scan.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let metrics = BaselineMetrics::new(&self.metrics, partition);

        Ok(Box::pin(GraphScanStream::new(
            self.graph_ctx.clone(),
            self.label.clone(),
            self.variable.clone(),
            self.projected_properties.clone(),
            self.is_edge_scan,
            self.is_schemaless,
            self.schema.clone(),
            metrics,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
431
/// State machine driving `GraphScanStream`.
enum GraphScanState {
    /// Nothing started yet; the scan future has not been created.
    Init,
    /// Scan future in flight; resolves to at most one `RecordBatch`.
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Scan finished; further polls yield `None`.
    Done,
}
441
/// Record-batch stream backing `GraphScanExec::execute`.
///
/// Carries a copy of the scan parameters plus a state machine; the fields
/// mirror those on `GraphScanExec` (see that struct for their meaning).
struct GraphScanStream {
    // Storage + L0 buffer access.
    graph_ctx: Arc<GraphExecutionContext>,

    // Label / edge type being scanned (see GraphScanExec::label encoding).
    label: String,

    // Bound query variable, used as output column prefix.
    variable: String,

    // Projected property names.
    properties: Vec<String>,

    // Edge vs. vertex scan.
    is_edge_scan: bool,

    // Schemaless (LargeBinary-typed) vs. catalog-typed scan.
    is_schemaless: bool,

    // Output schema of the produced batches.
    schema: SchemaRef,

    // Current position in the Init -> Executing -> Done state machine.
    state: GraphScanState,

    // Baseline metrics recorded while streaming.
    metrics: BaselineMetrics,
}
475
impl GraphScanStream {
    /// Creates a stream in the `Init` state; the actual scan future is built
    /// lazily on first poll.
    // Argument count mirrors the scan parameters 1:1; a params struct was
    // judged not worth it here.
    #[expect(clippy::too_many_arguments)]
    fn new(
        graph_ctx: Arc<GraphExecutionContext>,
        label: String,
        variable: String,
        properties: Vec<String>,
        is_edge_scan: bool,
        is_schemaless: bool,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        Self {
            graph_ctx,
            label,
            variable,
            properties,
            is_edge_scan,
            is_schemaless,
            schema,
            state: GraphScanState::Init,
            metrics,
        }
    }
}
502
503pub(crate) fn resolve_property_type(
508 prop: &str,
509 schema_props: Option<
510 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
511 >,
512) -> DataType {
513 if prop == "overflow_json" {
514 DataType::LargeBinary
515 } else {
516 schema_props
517 .and_then(|props| props.get(prop))
518 .map(|meta| meta.r#type.to_arrow())
519 .unwrap_or(DataType::LargeBinary)
520 }
521}
522
/// Test-only convenience: MVCC-dedup a batch keyed on the `_vid` column.
#[cfg(test)]
fn mvcc_dedup_batch(batch: &RecordBatch) -> DFResult<RecordBatch> {
    mvcc_dedup_batch_by(batch, "_vid")
}
535
536fn mvcc_dedup_to_option(
541 batch: Option<RecordBatch>,
542 id_column: &str,
543) -> DFResult<Option<RecordBatch>> {
544 match batch {
545 Some(b) => {
546 let deduped = mvcc_dedup_batch_by(&b, id_column)?;
547 Ok(if deduped.num_rows() > 0 {
548 Some(deduped)
549 } else {
550 None
551 })
552 }
553 None => Ok(None),
554 }
555}
556
557fn merge_lance_and_l0(
561 lance_deduped: Option<RecordBatch>,
562 l0_batch: RecordBatch,
563 internal_schema: &SchemaRef,
564 id_column: &str,
565) -> DFResult<Option<RecordBatch>> {
566 let has_l0 = l0_batch.num_rows() > 0;
567 match (lance_deduped, has_l0) {
568 (Some(lance), true) => {
569 let combined = arrow::compute::concat_batches(internal_schema, &[lance, l0_batch])
570 .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;
571 Ok(Some(mvcc_dedup_batch_by(&combined, id_column)?))
572 }
573 (Some(lance), false) => Ok(Some(lance)),
574 (None, true) => Ok(Some(l0_batch)),
575 (None, false) => Ok(None),
576 }
577}
578
/// Appends `col_name` to `columns` unless an equal entry already exists,
/// preserving the existing order (no sorting, no dedup of prior entries).
fn push_column_if_absent(columns: &mut Vec<String>, col_name: &str) {
    let already_present = columns.iter().any(|existing| existing == col_name);
    if !already_present {
        columns.push(col_name.to_owned());
    }
}
588
589fn extract_from_overflow_blob(
594 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
595 row: usize,
596 prop: &str,
597) -> Option<Vec<u8>> {
598 let arr = overflow_arr?;
599 if arr.is_null(row) {
600 return None;
601 }
602 uni_common::cypher_value_codec::extract_map_entry_raw(arr.value(row), prop)
603}
604
605fn build_overflow_property_column(
612 num_rows: usize,
613 vid_arr: &UInt64Array,
614 overflow_arr: Option<&arrow_array::LargeBinaryArray>,
615 prop: &str,
616 l0_ctx: &crate::query::df_graph::L0Context,
617) -> ArrayRef {
618 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
619 for i in 0..num_rows {
620 let vid = Vid::from(vid_arr.value(i));
621
622 let l0_val = resolve_l0_property(&vid, prop, l0_ctx);
624
625 if let Some(val_opt) = l0_val {
626 append_value_as_cypher_binary(&mut builder, val_opt.as_ref());
627 } else if let Some(bytes) = extract_from_overflow_blob(overflow_arr, i, prop) {
628 builder.append_value(&bytes);
629 } else {
630 builder.append_null();
631 }
632 }
633 Arc::new(builder.finish())
634}
635
636fn resolve_l0_property(
642 vid: &Vid,
643 prop: &str,
644 l0_ctx: &crate::query::df_graph::L0Context,
645) -> Option<Option<Value>> {
646 let mut result = None;
647 for l0 in l0_ctx.iter_l0_buffers() {
648 let guard = l0.read();
649 if let Some(props) = guard.vertex_properties.get(vid)
650 && let Some(val) = props.get(prop)
651 {
652 result = Some(Some(val.clone()));
653 }
654 }
655 result
656}
657
658fn append_value_as_cypher_binary(
663 builder: &mut arrow_array::builder::LargeBinaryBuilder,
664 val: Option<&Value>,
665) {
666 match val {
667 Some(v) if !v.is_null() => {
668 let json_val: serde_json::Value = v.clone().into();
669 match encode_cypher_value(&json_val) {
670 Ok(bytes) => builder.append_value(bytes),
671 Err(_) => builder.append_null(),
672 }
673 }
674 _ => builder.append_null(),
675 }
676}
677
/// Collapses MVCC duplicates: for each distinct id in `id_column`, keeps
/// only the row with the highest `_version`.
///
/// Implementation: lexsort by (id ASC, `_version` DESC), then keep the
/// first row of each equal-id run and filter out the rest. Errors when
/// either required column is missing.
fn mvcc_dedup_batch_by(batch: &RecordBatch, id_column: &str) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(batch.clone());
    }

    let id_col = batch
        .column_by_name(id_column)
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal(format!("Missing {} column", id_column))
        })?
        .clone();
    let version_col = batch
        .column_by_name("_version")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _version column".to_string())
        })?
        .clone();

    // Sort ids ascending and versions descending so the newest version of
    // each id lands first within its run.
    let sort_columns = vec![
        arrow::compute::SortColumn {
            values: id_col,
            options: Some(arrow::compute::SortOptions {
                descending: false,
                nulls_first: false,
            }),
        },
        arrow::compute::SortColumn {
            values: version_col,
            options: Some(arrow::compute::SortOptions {
                descending: true,
                nulls_first: false,
            }),
        },
    ];
    let indices = arrow::compute::lexsort_to_indices(&sort_columns, None)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;

    // Reorder every column by the sorted permutation.
    let sorted_columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .map(|col| arrow::compute::take(col.as_ref(), &indices, None))
        .collect::<Result<_, _>>()
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;
    let sorted = RecordBatch::try_new(batch.schema(), sorted_columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;

    // unwraps: the column was present above and ids are UInt64 by schema
    // construction elsewhere in this file.
    let sorted_id = sorted
        .column_by_name(id_column)
        .unwrap()
        .as_any()
        .downcast_ref::<UInt64Array>()
        .unwrap();

    // Keep the first row of each id-run (which, by the sort above, carries
    // the highest version).
    let mut keep = vec![false; sorted.num_rows()];
    if !keep.is_empty() {
        keep[0] = true;
        for (i, flag) in keep.iter_mut().enumerate().skip(1) {
            if sorted_id.value(i) != sorted_id.value(i - 1) {
                *flag = true;
            }
        }
    }

    let mask = arrow_array::BooleanArray::from(keep);
    arrow::compute::filter_record_batch(&sorted, &mask)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
}
753
754fn filter_deleted_edge_ops(batch: &RecordBatch) -> DFResult<RecordBatch> {
756 if batch.num_rows() == 0 {
757 return Ok(batch.clone());
758 }
759 let op_col = match batch.column_by_name("op") {
760 Some(col) => col
761 .as_any()
762 .downcast_ref::<arrow_array::UInt8Array>()
763 .unwrap(),
764 None => return Ok(batch.clone()),
765 };
766 let keep: Vec<bool> = (0..op_col.len()).map(|i| op_col.value(i) == 0).collect();
767 let mask = arrow_array::BooleanArray::from(keep);
768 arrow::compute::filter_record_batch(batch, &mask)
769 .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
770}
771
772fn filter_deleted_rows(batch: &RecordBatch) -> DFResult<RecordBatch> {
774 if batch.num_rows() == 0 {
775 return Ok(batch.clone());
776 }
777 let deleted_col = match batch.column_by_name("_deleted") {
778 Some(col) => col
779 .as_any()
780 .downcast_ref::<arrow_array::BooleanArray>()
781 .unwrap(),
782 None => return Ok(batch.clone()),
783 };
784 let keep: Vec<bool> = (0..deleted_col.len())
785 .map(|i| !deleted_col.value(i))
786 .collect();
787 let mask = arrow_array::BooleanArray::from(keep);
788 arrow::compute::filter_record_batch(batch, &mask)
789 .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
790}
791
792fn filter_l0_tombstones(
794 batch: &RecordBatch,
795 l0_ctx: &crate::query::df_graph::L0Context,
796) -> DFResult<RecordBatch> {
797 if batch.num_rows() == 0 {
798 return Ok(batch.clone());
799 }
800
801 let mut tombstones: HashSet<u64> = HashSet::new();
802 for l0 in l0_ctx.iter_l0_buffers() {
803 let guard = l0.read();
804 for vid in guard.vertex_tombstones.iter() {
805 tombstones.insert(vid.as_u64());
806 }
807 }
808
809 if tombstones.is_empty() {
810 return Ok(batch.clone());
811 }
812
813 let vid_col = batch
814 .column_by_name("_vid")
815 .ok_or_else(|| {
816 datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
817 })?
818 .as_any()
819 .downcast_ref::<UInt64Array>()
820 .unwrap();
821
822 let keep: Vec<bool> = (0..vid_col.len())
823 .map(|i| !tombstones.contains(&vid_col.value(i)))
824 .collect();
825 let mask = arrow_array::BooleanArray::from(keep);
826 arrow::compute::filter_record_batch(batch, &mask)
827 .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
828}
829
830fn filter_l0_edge_tombstones(
832 batch: &RecordBatch,
833 l0_ctx: &crate::query::df_graph::L0Context,
834) -> DFResult<RecordBatch> {
835 if batch.num_rows() == 0 {
836 return Ok(batch.clone());
837 }
838
839 let mut tombstones: HashSet<u64> = HashSet::new();
840 for l0 in l0_ctx.iter_l0_buffers() {
841 let guard = l0.read();
842 for eid in guard.tombstones.keys() {
843 tombstones.insert(eid.as_u64());
844 }
845 }
846
847 if tombstones.is_empty() {
848 return Ok(batch.clone());
849 }
850
851 let eid_col = batch
852 .column_by_name("eid")
853 .ok_or_else(|| {
854 datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
855 })?
856 .as_any()
857 .downcast_ref::<UInt64Array>()
858 .unwrap();
859
860 let keep: Vec<bool> = (0..eid_col.len())
861 .map(|i| !tombstones.contains(&eid_col.value(i)))
862 .collect();
863 let mask = arrow_array::BooleanArray::from(keep);
864 arrow::compute::filter_record_batch(batch, &mask)
865 .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
866}
867
/// Materializes a `RecordBatch` (in the lance/internal schema) from the
/// in-memory L0 write buffers for one vertex label.
///
/// Buffers are visited in `iter_l0_buffers` order, so later buffers override
/// earlier ones: properties are merged per vid and the highest observed
/// version wins. Tombstoned vids are dropped, including vids written by an
/// earlier buffer and deleted by a later one.
fn build_l0_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    lance_schema: &SchemaRef,
    label_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // vid -> (merged properties, max version seen so far)
    let mut vid_data: HashMap<u64, (Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }
        for vid in guard.vids_for_label(label) {
            let vid_u64 = vid.as_u64();
            // Skip vids already tombstoned by this or an earlier buffer.
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0));
            // Merge: later buffers overwrite individual property values.
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            if version > entry.1 {
                entry.1 = version;
            }
        }
    }

    // Final sweep: a vid inserted by an early buffer may have been
    // tombstoned by a later buffer after its row was already accumulated.
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(lance_schema.clone()));
    }

    // Deterministic row order by vid.
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(lance_schema.fields().len());

    // Property names declared in the label schema; anything else is routed
    // into the overflow_json blob.
    let schema_prop_names: HashSet<&str> = label_props
        .map(|lp| lp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    for field in lance_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "_deleted" => {
                // Tombstoned vids were removed above, so nothing remaining
                // is deleted.
                let vals = vec![false; num_rows];
                columns.push(Arc::new(arrow_array::BooleanArray::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // All non-schema, non-reserved properties collapse into one
                // Cypher-encoded map blob per row; NULL when there are none.
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _) = &vid_data[vid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        // ext_id and underscore-prefixed keys are reserved.
                        if k == "ext_id" || k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            // Encoding failure degrades to NULL rather than
                            // failing the whole scan.
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Schema-declared property: build a typed column.
                let col = build_l0_property_column(&vids, &vid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(lance_schema.clone(), columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
}
986
987fn build_l0_property_column(
991 vids: &[u64],
992 vid_data: &HashMap<u64, (Properties, u64)>,
993 prop_name: &str,
994 data_type: &DataType,
995) -> DFResult<ArrayRef> {
996 let vid_keys: Vec<Vid> = vids.iter().map(|v| Vid::from(*v)).collect();
998 let props_map: HashMap<Vid, Properties> = vid_data
999 .iter()
1000 .map(|(k, (props, _))| (Vid::from(*k), props.clone()))
1001 .collect();
1002
1003 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1004}
1005
/// Materializes a `RecordBatch` (in the internal edge schema) from the
/// in-memory L0 write buffers for one edge type.
///
/// Mirrors `build_l0_vertex_batch`: buffers are visited in order, later
/// buffers override earlier ones (endpoints, individual properties, max
/// version), and tombstoned eids are removed at the end.
fn build_l0_edge_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    edge_type: &str,
    internal_schema: &SchemaRef,
    type_props: Option<&HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<RecordBatch> {
    // eid -> (src_vid, dst_vid, merged properties, max version seen)
    let mut eid_data: HashMap<u64, (u64, u64, Properties, u64)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();
        for eid in guard.tombstones.keys() {
            tombstones.insert(eid.as_u64());
        }
        for eid in guard.eids_for_type(edge_type) {
            let eid_u64 = eid.as_u64();
            if tombstones.contains(&eid_u64) {
                continue;
            }
            // Edges without resolvable endpoints are skipped entirely.
            let (src_vid, dst_vid) = match guard.get_edge_endpoints(eid) {
                Some(endpoints) => (endpoints.0.as_u64(), endpoints.1.as_u64()),
                None => continue,
            };
            let version = guard.edge_versions.get(&eid).copied().unwrap_or(0);
            let entry = eid_data
                .entry(eid_u64)
                .or_insert_with(|| (src_vid, dst_vid, Properties::new(), 0));
            // Merge: later buffers overwrite individual property values.
            if let Some(props) = guard.edge_properties.get(&eid) {
                for (k, v) in props {
                    entry.2.insert(k.clone(), v.clone());
                }
            }
            // Later buffers also win on endpoints.
            entry.0 = src_vid;
            entry.1 = dst_vid;
            if version > entry.3 {
                entry.3 = version;
            }
        }
    }

    // Final sweep: drop eids tombstoned by any (possibly later) buffer.
    for t in &tombstones {
        eid_data.remove(t);
    }

    if eid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Deterministic row order by eid.
    let mut eids: Vec<u64> = eid_data.keys().copied().collect();
    eids.sort_unstable();

    let num_rows = eids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Property names declared in the edge-type schema; anything else is
    // routed into the overflow_json blob.
    let schema_prop_names: HashSet<&str> = type_props
        .map(|tp| tp.keys().map(|k| k.as_str()).collect())
        .unwrap_or_default();

    for field in internal_schema.fields() {
        let col_name = field.name().as_str();
        match col_name {
            "eid" => {
                columns.push(Arc::new(UInt64Array::from(eids.clone())));
            }
            "src_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].0).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "dst_vid" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "op" => {
                // Surviving rows are inserts (op == 0); tombstoned ops were
                // removed above.
                let vals = vec![0u8; num_rows];
                columns.push(Arc::new(arrow_array::UInt8Array::from(vals)));
            }
            "_version" => {
                let vals: Vec<u64> = eids.iter().map(|e| eid_data[e].3).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            "overflow_json" => {
                // Non-schema, non-reserved properties collapse into one
                // Cypher-encoded map blob per row; NULL when empty.
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for eid_u64 in &eids {
                    let (_, _, props, _) = &eid_data[eid_u64];
                    let mut overflow = serde_json::Map::new();
                    for (k, v) in props {
                        // Underscore-prefixed keys are reserved.
                        if k.starts_with('_') {
                            continue;
                        }
                        if !schema_prop_names.contains(k.as_str()) {
                            let json_val: serde_json::Value = v.clone().into();
                            overflow.insert(k.clone(), json_val);
                        }
                    }
                    if overflow.is_empty() {
                        builder.append_null();
                    } else {
                        let json_val = serde_json::Value::Object(overflow);
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            // Encoding failure degrades to NULL.
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            _ => {
                // Schema-declared property: build a typed column.
                let col =
                    build_l0_edge_property_column(&eids, &eid_data, col_name, field.data_type())?;
                columns.push(col);
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
}
1141
1142fn build_l0_edge_property_column(
1146 eids: &[u64],
1147 eid_data: &HashMap<u64, (u64, u64, Properties, u64)>,
1148 prop_name: &str,
1149 data_type: &DataType,
1150) -> DFResult<ArrayRef> {
1151 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(*e)).collect();
1153 let props_map: HashMap<Vid, Properties> = eid_data
1154 .iter()
1155 .map(|(k, (_, _, props, _))| (Vid::from(*k), props.clone()))
1156 .collect();
1157
1158 build_property_column_static(&vid_keys, &props_map, prop_name, data_type)
1159}
1160
/// Builds the `_labels` list column for a scan over a known label.
///
/// For each row: start from the labels stored in the batch (falling back to
/// the scanned label when none are stored), guarantee the scanned label is
/// present, then union in any labels recorded for that vid in the L0 write
/// buffers. Order: stored labels first, then appended ones.
fn build_labels_column_for_known_label(
    vid_arr: &UInt64Array,
    label: &str,
    l0_ctx: &crate::query::df_graph::L0Context,
    batch_labels_col: Option<&arrow_array::ListArray>,
) -> DFResult<ArrayRef> {
    use uni_store::storage::arrow_convert::labels_from_list_array;

    let mut labels_builder = ListBuilder::new(StringBuilder::new());

    for i in 0..vid_arr.len() {
        let vid = Vid::from(vid_arr.value(i));

        // Base label set: stored labels if any, else just the scanned label.
        let mut labels = match batch_labels_col {
            Some(list_arr) => {
                let stored = labels_from_list_array(list_arr, i);
                if stored.is_empty() {
                    vec![label.to_string()]
                } else {
                    stored
                }
            }
            None => vec![label.to_string()],
        };

        // The scanned label must always appear, even if storage lacks it.
        if !labels.iter().any(|l| l == label) {
            labels.push(label.to_string());
        }

        // Union in labels written to the L0 buffers for this vid.
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
                for lbl in l0_labels {
                    if !labels.contains(lbl) {
                        labels.push(lbl.clone());
                    }
                }
            }
        }

        let values = labels_builder.values();
        for lbl in &labels {
            values.append_value(lbl);
        }
        labels_builder.append(true);
    }

    Ok(Arc::new(labels_builder.finish()))
}
1218
/// Maps an internal vertex batch onto the scan's output schema.
///
/// Emits `_vid`, the computed `_labels` column, then one column per
/// projected property: the stored column when it exists, otherwise a
/// `LargeBinary` column sourced from the L0 buffers / overflow blob
/// (see `build_overflow_property_column`).
fn map_to_output_schema(
    batch: &RecordBatch,
    label: &str,
    _variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    // `_vid` is mandatory in the internal batch and must be UInt64.
    let vid_col = batch
        .column_by_name("_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
        })?
        .clone();
    let vid_arr = vid_col
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
        })?;

    // Stored labels are optional; the label column is recomputed to include
    // the scanned label and any L0 additions.
    let batch_labels_col = batch
        .column_by_name("_labels")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());
    let labels_col = build_labels_column_for_known_label(vid_arr, label, l0_ctx, batch_labels_col)?;
    columns.push(vid_col.clone());
    columns.push(labels_col);

    // Shared overflow blob lookup for all properties lacking a column.
    let overflow_arr = batch
        .column_by_name("overflow_json")
        .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

    for prop in projected_properties {
        if prop == "overflow_json" {
            // The raw blob column is passed through as-is (or NULLs when the
            // batch lacks it).
            match batch.column_by_name("overflow_json") {
                Some(col) => columns.push(col.clone()),
                None => {
                    columns.push(arrow_array::new_null_array(
                        &DataType::LargeBinary,
                        batch.num_rows(),
                    ));
                }
            }
        } else {
            match batch.column_by_name(prop) {
                Some(col) => columns.push(col.clone()),
                None => {
                    // No dedicated column: resolve per row from L0 buffers,
                    // then the overflow blob, then NULL.
                    let col = build_overflow_property_column(
                        batch.num_rows(),
                        vid_arr,
                        overflow_arr,
                        prop,
                        l0_ctx,
                    );
                    columns.push(col);
                }
            }
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
}
1300
1301fn map_edge_to_output_schema(
1308 batch: &RecordBatch,
1309 variable: &str,
1310 projected_properties: &[String],
1311 output_schema: &SchemaRef,
1312) -> DFResult<RecordBatch> {
1313 if batch.num_rows() == 0 {
1314 return Ok(RecordBatch::new_empty(output_schema.clone()));
1315 }
1316
1317 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
1318
1319 let eid_col = batch
1321 .column_by_name("eid")
1322 .ok_or_else(|| {
1323 datafusion::error::DataFusionError::Internal("Missing eid column".to_string())
1324 })?
1325 .clone();
1326 columns.push(eid_col);
1327
1328 let src_col = batch
1330 .column_by_name("src_vid")
1331 .ok_or_else(|| {
1332 datafusion::error::DataFusionError::Internal("Missing src_vid column".to_string())
1333 })?
1334 .clone();
1335 columns.push(src_col);
1336
1337 let dst_col = batch
1339 .column_by_name("dst_vid")
1340 .ok_or_else(|| {
1341 datafusion::error::DataFusionError::Internal("Missing dst_vid column".to_string())
1342 })?
1343 .clone();
1344 columns.push(dst_col);
1345
1346 for prop in projected_properties {
1348 if prop == "overflow_json" {
1349 match batch.column_by_name("overflow_json") {
1350 Some(col) => columns.push(col.clone()),
1351 None => {
1352 columns.push(arrow_array::new_null_array(
1353 &DataType::LargeBinary,
1354 batch.num_rows(),
1355 ));
1356 }
1357 }
1358 } else {
1359 match batch.column_by_name(prop) {
1360 Some(col) => columns.push(col.clone()),
1361 None => {
1362 let overflow_arr = batch
1365 .column_by_name("overflow_json")
1366 .and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());
1367
1368 if let Some(arr) = overflow_arr {
1369 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
1370 for i in 0..batch.num_rows() {
1371 if !arr.is_null(i) {
1372 let blob = arr.value(i);
1373 if let Some(sub_bytes) =
1375 uni_common::cypher_value_codec::extract_map_entry_raw(
1376 blob, prop,
1377 )
1378 {
1379 builder.append_value(&sub_bytes);
1380 } else {
1381 builder.append_null();
1382 }
1383 } else {
1384 builder.append_null();
1385 }
1386 }
1387 columns.push(Arc::new(builder.finish()));
1388 } else {
1389 let target_field = output_schema
1391 .fields()
1392 .iter()
1393 .find(|f| f.name() == &format!("{}.{}", variable, prop));
1394 let dt = target_field
1395 .map(|f| f.data_type().clone())
1396 .unwrap_or(DataType::LargeBinary);
1397 columns.push(arrow_array::new_null_array(&dt, batch.num_rows()));
1398 }
1399 }
1400 }
1401 }
1402 }
1403
1404 RecordBatch::try_new(output_schema.clone(), columns)
1405 .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
1406}
1407
/// Scan all vertices of `label` from Lance storage merged with the in-memory
/// L0 write buffers, projected onto `output_schema`.
///
/// Pipeline:
///   1. Choose which Lance columns to read: the system columns
///      (`_vid`/`_deleted`/`_version`), every projected property that exists
///      in the label's declared schema, and `overflow_json` whenever any
///      projected property is outside the schema (or explicitly requested).
///   2. Read the Lance table, restricted to the MVCC high-water mark when one
///      is set, and dedup by `_vid` (via `mvcc_dedup_to_option`).
///   3. Build a matching L0 batch and merge it with the Lance result.
///   4. Drop `_deleted` rows and rows tombstoned in L0.
///   5. Map the survivors onto `output_schema`.
///
/// Failures to open or query the Lance table are treated as "no persisted
/// data" (best-effort read); only Arrow/DataFusion-level errors abort.
async fn columnar_scan_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    // Per-label property metadata; `None` when the label is undeclared.
    let label_props = uni_schema.properties.get(label);

    // System columns are always read; property columns only when declared.
    let mut lance_columns: Vec<String> = vec![
        "_vid".to_string(),
        "_deleted".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else {
            let exists_in_schema = label_props.is_some_and(|lp| lp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Any out-of-schema property must be materialized from the overflow blob.
    let needs_overflow = projected_properties
        .iter()
        .any(|p| p == "overflow_json" || !label_props.is_some_and(|lp| lp.contains_key(p)));
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    let ds = storage
        .vertex_dataset(label)
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
    let lancedb_store = storage.lancedb_store();

    // Best-effort read from Lance: open/query errors degrade to `None`
    // (no persisted data) rather than failing the scan.
    let lance_batch = match ds.open_lancedb(lancedb_store).await {
        Ok(table) => {
            use lancedb::query::{ExecutableQuery, QueryBase, Select};

            let table_schema = table
                .schema()
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
            let table_field_names: HashSet<&str> = table_schema
                .fields()
                .iter()
                .map(|f| f.name().as_str())
                .collect();

            // Only select columns that actually exist in the physical table
            // (the table may lag behind the logical schema).
            let actual_columns: Vec<&str> = lance_columns
                .iter()
                .filter(|c| table_field_names.contains(c.as_str()))
                .map(|c| c.as_str())
                .collect();

            let query = table.query().select(Select::columns(&actual_columns));
            // MVCC snapshot bound: hide rows written after the high-water mark.
            let query = match storage.version_high_water_mark() {
                Some(hwm) => query.only_if(format!("_version <= {}", hwm)),
                None => query,
            };

            match query.execute().await {
                Ok(stream) => {
                    use futures::TryStreamExt;
                    // NOTE(review): stream errors are silently swallowed here
                    // and become an empty result — confirm this best-effort
                    // behavior is intended rather than propagating the error.
                    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
                    if batches.is_empty() {
                        None
                    } else {
                        Some(
                            arrow::compute::concat_batches(&batches[0].schema(), &batches)
                                .map_err(|e| {
                                    datafusion::error::DataFusionError::ArrowError(
                                        Box::new(e),
                                        None,
                                    )
                                })?,
                        )
                    }
                }
                Err(_) => None,
            }
        }
        Err(_) => None,
    };

    // Presumably keeps the newest `_version` per `_vid` — see helper.
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    // Internal schema: reuse the Lance batch schema when data was read,
    // otherwise synthesize one from the requested columns so the L0 batch
    // and the merge step have a concrete shape to target.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("_vid", DataType::UInt64, false),
                Field::new("_deleted", DataType::Boolean, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(col.as_str(), "_vid" | "_deleted" | "_version") {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else {
                    // Undeclared properties default to LargeBinary blobs.
                    let arrow_type = label_props
                        .and_then(|lp| lp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    let l0_batch = build_l0_vertex_batch(l0_ctx, label, &internal_schema, label_props)?;

    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    // Drop rows flagged `_deleted` in storage.
    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Drop rows deleted in the (not-yet-flushed) L0 buffers.
    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    map_to_output_schema(
        &filtered,
        label,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )
}
1574
/// Scan all edges of `edge_type` from the forward delta dataset merged with
/// the L0 write buffers, projected onto `output_schema`.
///
/// Mirrors `columnar_scan_vertex_batch_static`, with edge-specific system
/// columns (`eid`/`src_vid`/`dst_vid`/`op`/`_version`), dedup keyed on `eid`,
/// and `op`-based delete filtering instead of a `_deleted` flag.
///
/// Failures to open or query the Lance table are treated as "no persisted
/// data" (best-effort read); only Arrow/DataFusion-level errors abort.
async fn columnar_scan_edge_batch_static(
    graph_ctx: &GraphExecutionContext,
    edge_type: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let uni_schema = storage.schema_manager().schema();
    // Per-edge-type property metadata; `None` when the type is undeclared.
    let type_props = uni_schema.properties.get(edge_type);

    // System columns are always read; property columns only when declared.
    let mut lance_columns: Vec<String> = vec![
        "eid".to_string(),
        "src_vid".to_string(),
        "dst_vid".to_string(),
        "op".to_string(),
        "_version".to_string(),
    ];
    for prop in projected_properties {
        if prop == "overflow_json" {
            push_column_if_absent(&mut lance_columns, "overflow_json");
        } else {
            let exists_in_schema = type_props.is_some_and(|tp| tp.contains_key(prop));
            if exists_in_schema {
                push_column_if_absent(&mut lance_columns, prop);
            }
        }
    }

    // Any out-of-schema property must be materialized from the overflow blob.
    let needs_overflow = projected_properties
        .iter()
        .any(|p| p == "overflow_json" || !type_props.is_some_and(|tp| tp.contains_key(p)));
    if needs_overflow {
        push_column_if_absent(&mut lance_columns, "overflow_json");
    }

    // Edges are read from the forward ("fwd") delta dataset.
    let ds = storage
        .delta_dataset(edge_type, "fwd")
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
    let lancedb_store = storage.lancedb_store();

    // Best-effort read from Lance: open/query errors degrade to `None`.
    let lance_batch = match ds.open_lancedb(lancedb_store).await {
        Ok(table) => {
            use lancedb::query::{ExecutableQuery, QueryBase, Select};

            let table_schema = table
                .schema()
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
            let table_field_names: HashSet<&str> = table_schema
                .fields()
                .iter()
                .map(|f| f.name().as_str())
                .collect();

            // Only select columns that physically exist in the table.
            let actual_columns: Vec<&str> = lance_columns
                .iter()
                .filter(|c| table_field_names.contains(c.as_str()))
                .map(|c| c.as_str())
                .collect();

            let query = table.query().select(Select::columns(&actual_columns));
            // MVCC snapshot bound: hide rows written after the high-water mark.
            let query = match storage.version_high_water_mark() {
                Some(hwm) => query.only_if(format!("_version <= {}", hwm)),
                None => query,
            };

            match query.execute().await {
                Ok(stream) => {
                    use futures::TryStreamExt;
                    // NOTE(review): stream errors become an empty result here —
                    // confirm best-effort intent vs. propagating the error.
                    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
                    if batches.is_empty() {
                        None
                    } else {
                        Some(
                            arrow::compute::concat_batches(&batches[0].schema(), &batches)
                                .map_err(|e| {
                                    datafusion::error::DataFusionError::ArrowError(
                                        Box::new(e),
                                        None,
                                    )
                                })?,
                        )
                    }
                }
                Err(_) => None,
            }
        }
        Err(_) => None,
    };

    // Presumably keeps the newest `_version` per `eid` — see helper.
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "eid")?;

    // Reuse the Lance schema when data exists; otherwise synthesize one so the
    // L0 batch and the merge step have a concrete shape to target.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => {
            let mut fields = vec![
                Field::new("eid", DataType::UInt64, false),
                Field::new("src_vid", DataType::UInt64, false),
                Field::new("dst_vid", DataType::UInt64, false),
                Field::new("op", DataType::UInt8, false),
                Field::new("_version", DataType::UInt64, false),
            ];
            for col in &lance_columns {
                if matches!(
                    col.as_str(),
                    "eid" | "src_vid" | "dst_vid" | "op" | "_version"
                ) {
                    continue;
                }
                if col == "overflow_json" {
                    fields.push(Field::new("overflow_json", DataType::LargeBinary, true));
                } else {
                    // Undeclared properties default to LargeBinary blobs.
                    let arrow_type = type_props
                        .and_then(|tp| tp.get(col.as_str()))
                        .map(|meta| meta.r#type.to_arrow())
                        .unwrap_or(DataType::LargeBinary);
                    fields.push(Field::new(col, arrow_type, true));
                }
            }
            Arc::new(Schema::new(fields))
        }
    };

    let l0_batch = build_l0_edge_batch(l0_ctx, edge_type, &internal_schema, type_props)?;

    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "eid")? else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    // Drop rows whose `op` marks a delete.
    let merged = filter_deleted_edge_ops(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Drop edges deleted in the (not-yet-flushed) L0 buffers.
    let filtered = filter_l0_edge_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    map_edge_to_output_schema(&filtered, variable, projected_properties, output_schema)
}
1740
/// Scan schemaless vertices (single main table with a `labels` list column and
/// a `props_json` blob) matching `label`, merged with the L0 write buffers and
/// projected onto `output_schema`.
///
/// `label` may be empty (match everything) or a colon-separated conjunction
/// (`"A:B"` requires all listed labels). Label matching is pushed down to
/// Lance via `array_contains` predicates, ANDed with the MVCC high-water-mark
/// bound when one is set.
///
/// Failures to open or query the Lance table are treated as "no persisted
/// data" (best-effort read); only Arrow/DataFusion-level errors abort.
async fn columnar_scan_schemaless_vertex_batch_static(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    use uni_store::storage::main_vertex::MainVertexDataset;

    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let lancedb_store = storage.lancedb_store();

    // Build the pushdown predicate: one `array_contains` per required label,
    // plus the MVCC version bound; `None` when there is nothing to filter.
    let filter = {
        let mut parts = Vec::new();

        if !label.is_empty() {
            if label.contains(':') {
                for lbl in label.split(':') {
                    parts.push(format!("array_contains(labels, '{}')", lbl));
                }
            } else {
                parts.push(format!("array_contains(labels, '{}')", label));
            }
        }

        if let Some(hwm) = storage.version_high_water_mark() {
            parts.push(format!("_version <= {}", hwm));
        }

        if parts.is_empty() {
            None
        } else {
            Some(parts.join(" AND "))
        }
    };

    // Best-effort read from the main vertex table; errors degrade to `None`.
    let lance_batch = match MainVertexDataset::open_table(lancedb_store).await {
        Ok(table) => {
            use lancedb::query::{ExecutableQuery, QueryBase, Select};

            let query = table.query().select(Select::columns(&[
                "_vid",
                "_deleted",
                "labels",
                "props_json",
                "_version",
            ]));
            let query = match filter {
                Some(f) => query.only_if(f),
                None => query,
            };

            match query.execute().await {
                Ok(stream) => {
                    use futures::TryStreamExt;
                    // NOTE(review): stream errors become an empty result here —
                    // confirm best-effort intent vs. propagating the error.
                    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap_or_default();
                    if batches.is_empty() {
                        None
                    } else {
                        Some(
                            arrow::compute::concat_batches(&batches[0].schema(), &batches)
                                .map_err(|e| {
                                    datafusion::error::DataFusionError::ArrowError(
                                        Box::new(e),
                                        None,
                                    )
                                })?,
                        )
                    }
                }
                Err(_) => None,
            }
        }
        Err(_) => None,
    };

    // Presumably keeps the newest `_version` per `_vid` — see helper.
    let lance_deduped = mvcc_dedup_to_option(lance_batch, "_vid")?;

    // Fixed five-column internal schema; synthesized when Lance had no data.
    let internal_schema = match &lance_deduped {
        Some(batch) => batch.schema(),
        None => Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("labels", labels_data_type(), false),
            Field::new("props_json", DataType::LargeBinary, true),
            Field::new("_version", DataType::UInt64, false),
        ])),
    };

    let l0_batch = build_l0_schemaless_vertex_batch(l0_ctx, label, &internal_schema)?;

    let Some(merged) = merge_lance_and_l0(lance_deduped, l0_batch, &internal_schema, "_vid")?
    else {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    };

    // Drop rows flagged `_deleted` in storage.
    let merged = filter_deleted_rows(&merged)?;
    if merged.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    // Drop rows deleted in the (not-yet-flushed) L0 buffers.
    let filtered = filter_l0_tombstones(&merged, l0_ctx)?;

    if filtered.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    map_to_schemaless_output_schema(
        &filtered,
        variable,
        projected_properties,
        output_schema,
        l0_ctx,
    )
}
1877
/// Build a RecordBatch (shaped like `internal_schema`) from the schemaless
/// vertex data held in the L0 write buffers, restricted to vertices matching
/// `label` (empty = all; colon-separated = all listed labels required).
///
/// Buffers are folded in iteration order: later buffers overwrite earlier
/// property values key-by-key, the highest seen version wins, and the most
/// recent label set replaces prior ones. Tombstoned vids are removed from the
/// final result regardless of which buffer the tombstone came from.
///
/// NOTE(review): a vertex deleted in one buffer and re-created in a later one
/// would still be dropped by the final tombstone sweep — confirm whether L0
/// buffer ordering makes that case impossible.
fn build_l0_schemaless_vertex_batch(
    l0_ctx: &crate::query::df_graph::L0Context,
    label: &str,
    internal_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    // vid -> (merged properties, max version, latest label set)
    let mut vid_data: HashMap<u64, (Properties, u64, Vec<String>)> = HashMap::new();
    let mut tombstones: HashSet<u64> = HashSet::new();

    let label_filter: Vec<&str> = if label.is_empty() {
        vec![]
    } else if label.contains(':') {
        label.split(':').collect()
    } else {
        vec![label]
    };

    for l0 in l0_ctx.iter_l0_buffers() {
        let guard = l0.read();

        for vid in guard.vertex_tombstones.iter() {
            tombstones.insert(vid.as_u64());
        }

        // Pick the cheapest lookup available for the label predicate.
        let vids: Vec<Vid> = if label_filter.is_empty() {
            guard.all_vertex_vids()
        } else if label_filter.len() == 1 {
            guard.vids_for_label(label_filter[0])
        } else {
            guard.vids_with_all_labels(&label_filter)
        };

        for vid in vids {
            let vid_u64 = vid.as_u64();
            // Skip vids already known to be deleted (final sweep below also
            // removes any tombstoned vid collected from a later buffer).
            if tombstones.contains(&vid_u64) {
                continue;
            }
            let version = guard.vertex_versions.get(&vid).copied().unwrap_or(0);
            let entry = vid_data
                .entry(vid_u64)
                .or_insert_with(|| (Properties::new(), 0, Vec::new()));

            // Merge key-by-key: later buffers override earlier values.
            if let Some(props) = guard.vertex_properties.get(&vid) {
                for (k, v) in props {
                    entry.0.insert(k.clone(), v.clone());
                }
            }
            if version > entry.1 {
                entry.1 = version;
            }
            // Labels are replaced wholesale, not merged.
            if let Some(labels) = guard.vertex_labels.get(&vid) {
                entry.2 = labels.clone();
            }
        }
    }

    // Final sweep: drop every tombstoned vid, including ones inserted before
    // their tombstone was seen.
    for t in &tombstones {
        vid_data.remove(t);
    }

    if vid_data.is_empty() {
        return Ok(RecordBatch::new_empty(internal_schema.clone()));
    }

    // Deterministic row order by vid.
    let mut vids: Vec<u64> = vid_data.keys().copied().collect();
    vids.sort_unstable();

    let num_rows = vids.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(internal_schema.fields().len());

    // Populate columns in the order `internal_schema` dictates.
    for field in internal_schema.fields() {
        match field.name().as_str() {
            "_vid" => {
                columns.push(Arc::new(UInt64Array::from(vids.clone())));
            }
            "labels" => {
                let mut labels_builder = ListBuilder::new(StringBuilder::new());
                for vid_u64 in &vids {
                    let (_, _, labels) = &vid_data[vid_u64];
                    let values = labels_builder.values();
                    for lbl in labels {
                        values.append_value(lbl);
                    }
                    labels_builder.append(true);
                }
                columns.push(Arc::new(labels_builder.finish()));
            }
            "props_json" => {
                // Encode the merged property bag as a cypher-value blob;
                // empty bags and encoding failures become nulls.
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                for vid_u64 in &vids {
                    let (props, _, _) = &vid_data[vid_u64];
                    if props.is_empty() {
                        builder.append_null();
                    } else {
                        let json_obj: serde_json::Value = {
                            let mut map = serde_json::Map::new();
                            for (k, v) in props {
                                let json_val: serde_json::Value = v.clone().into();
                                map.insert(k.clone(), json_val);
                            }
                            serde_json::Value::Object(map)
                        };
                        match encode_cypher_value(&json_obj) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            "_deleted" => {
                // L0 rows are live by construction (tombstones already removed).
                columns.push(Arc::new(arrow_array::BooleanArray::from(vec![
                    false;
                    num_rows
                ])));
            }
            "_version" => {
                let vals: Vec<u64> = vids.iter().map(|v| vid_data[v].1).collect();
                columns.push(Arc::new(UInt64Array::from(vals)));
            }
            _ => {
                // Unknown fields get all-null columns of the declared type.
                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
            }
        }
    }

    RecordBatch::try_new(internal_schema.clone(), columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
}
2024
/// Project a merged schemaless vertex batch onto `output_schema`.
///
/// Output layout (in order): the `_vid` column, a rebuilt `labels` column
/// (storage labels unioned with any labels recorded in the L0 buffers),
/// then one column per projected property — the pseudo-property `_all_props`
/// passes the raw `props_json` blob through, every other name is extracted
/// per row via `build_overflow_property_column`.
///
/// NOTE(review): the label union acquires every L0 buffer's read lock once
/// per row — O(rows × buffers) locking; consider snapshotting the label maps
/// once if this shows up in profiles.
fn map_to_schemaless_output_schema(
    batch: &RecordBatch,
    _variable: &str,
    projected_properties: &[String],
    output_schema: &SchemaRef,
    l0_ctx: &crate::query::df_graph::L0Context,
) -> DFResult<RecordBatch> {
    if batch.num_rows() == 0 {
        return Ok(RecordBatch::new_empty(output_schema.clone()));
    }

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());

    // `_vid` is mandatory; its absence is an internal invariant violation.
    let vid_col = batch
        .column_by_name("_vid")
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("Missing _vid column".to_string())
        })?
        .clone();
    let vid_arr = vid_col
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Internal("_vid not UInt64".to_string())
        })?;
    columns.push(vid_col.clone());

    let labels_col = batch.column_by_name("labels");
    let labels_arr = labels_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::ListArray>());

    // Rebuild the labels column row by row: storage labels first, then any
    // L0-only labels appended (deduplicated, order preserved).
    let mut labels_builder = ListBuilder::new(StringBuilder::new());
    for i in 0..vid_arr.len() {
        let vid_u64 = vid_arr.value(i);
        let vid = Vid::from(vid_u64);

        let mut row_labels: Vec<String> = Vec::new();
        if let Some(arr) = labels_arr
            && !arr.is_null(i)
        {
            let list_val = arr.value(i);
            if let Some(str_arr) = list_val.as_any().downcast_ref::<arrow_array::StringArray>() {
                for j in 0..str_arr.len() {
                    if !str_arr.is_null(j) {
                        row_labels.push(str_arr.value(j).to_string());
                    }
                }
            }
        }

        // Union in labels recorded only in the L0 write buffers.
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_labels) = guard.vertex_labels.get(&vid) {
                for lbl in l0_labels {
                    if !row_labels.contains(lbl) {
                        row_labels.push(lbl.clone());
                    }
                }
            }
        }

        let values = labels_builder.values();
        for lbl in &row_labels {
            values.append_value(lbl);
        }
        labels_builder.append(true);
    }
    columns.push(Arc::new(labels_builder.finish()));

    let props_col = batch.column_by_name("props_json");
    let props_arr =
        props_col.and_then(|c| c.as_any().downcast_ref::<arrow_array::LargeBinaryArray>());

    for prop in projected_properties {
        if prop == "_all_props" {
            // Pass the whole property blob through unchanged.
            match props_col {
                Some(col) => columns.push(col.clone()),
                None => {
                    columns.push(arrow_array::new_null_array(
                        &DataType::LargeBinary,
                        batch.num_rows(),
                    ));
                }
            }
        } else {
            // Extract a single property per row from the blob / L0 buffers.
            let col =
                build_overflow_property_column(batch.num_rows(), vid_arr, props_arr, prop, l0_ctx);
            columns.push(col);
        }
    }

    RecordBatch::try_new(output_schema.clone(), columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
}
2131
2132pub(crate) fn get_property_value(
2134 vid: &Vid,
2135 props_map: &HashMap<Vid, Properties>,
2136 prop_name: &str,
2137) -> Option<Value> {
2138 if prop_name == "_all_props" {
2139 return props_map.get(vid).map(|p| {
2140 let map: HashMap<String, Value> =
2141 p.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2142 Value::Map(map)
2143 });
2144 }
2145 props_map
2146 .get(vid)
2147 .and_then(|props| props.get(prop_name))
2148 .cloned()
2149}
2150
2151pub(crate) fn encode_cypher_value(val: &serde_json::Value) -> Result<Vec<u8>, String> {
2155 let uni_val: uni_common::Value = val.clone().into();
2156 Ok(uni_common::cypher_value_codec::encode(&uni_val))
2157}
2158
/// Builds a primitive Arrow column for `$prop_name` across `$vids`.
///
/// `$builder_ty` is the Arrow builder type, `$extractor` pulls an optional
/// native value out of a `&Value`, and `$cast` converts the extracted value
/// into the builder's item type. Missing properties and failed extractions
/// both become nulls. Expands to an expression of type `DFResult<ArrayRef>`
/// (the `Ok(...)` wrapping matches the callers' return type).
macro_rules! build_numeric_column {
    ($vids:expr, $props_map:expr, $prop_name:expr, $builder_ty:ty, $extractor:expr, $cast:expr) => {{
        let mut builder = <$builder_ty>::new();
        for vid in $vids {
            match get_property_value(vid, $props_map, $prop_name) {
                Some(ref v) => {
                    if let Some(val) = $extractor(v) {
                        builder.append_value($cast(val));
                    } else {
                        builder.append_null();
                    }
                }
                None => builder.append_null(),
            }
        }
        Ok(Arc::new(builder.finish()) as ArrayRef)
    }};
}
2178
/// Build an Arrow column of `data_type` holding property `prop_name` for each
/// vid in `vids`, sourced from `props_map`.
///
/// One row is emitted per vid, in order. Each `data_type` arm defines its own
/// coercion rules from the dynamic `Value`; values that cannot be coerced
/// become nulls rather than errors. Unhandled data types fall back to a
/// Display-formatted Utf8 column.
pub(crate) fn build_property_column_static(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    data_type: &DataType,
) -> DFResult<ArrayRef> {
    match data_type {
        DataType::LargeBinary => {
            use arrow_array::builder::LargeBinaryBuilder;
            let mut builder = LargeBinaryBuilder::new();

            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Null) | None => builder.append_null(),
                    // Raw bytes pass through unchanged.
                    Some(Value::Bytes(bytes)) => {
                        builder.append_value(&bytes);
                    }
                    // A list of small integers may actually be an
                    // already-encoded cypher-value blob stored as a list;
                    // accept it verbatim if the codec can decode it,
                    // otherwise re-encode the list as a value.
                    Some(Value::List(arr)) if arr.iter().all(|v| v.as_u64().is_some()) => {
                        // NOTE(review): `n as u8` silently truncates values
                        // > 255; the all-`as_u64` guard does not bound the
                        // range — confirm inputs are byte-sized.
                        let bytes: Vec<u8> = arr
                            .iter()
                            .filter_map(|v| v.as_u64().map(|n| n as u8))
                            .collect();
                        if uni_common::cypher_value_codec::decode(&bytes).is_ok() {
                            builder.append_value(&bytes);
                        } else {
                            let json_val: serde_json::Value = Value::List(arr).into();
                            match encode_cypher_value(&json_val) {
                                Ok(encoded) => builder.append_value(encoded),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                    // Any other value: encode through the cypher-value codec.
                    Some(val) => {
                        let json_val: serde_json::Value = val.into();
                        match encode_cypher_value(&json_val) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Binary => {
            // Binary columns hold CRDT payloads: the value is interpreted as
            // a serialized `uni_crdt::Crdt` and re-emitted as msgpack; any
            // failure along the way yields null.
            let mut builder = BinaryBuilder::new();
            for vid in vids {
                let bytes = get_property_value(vid, props_map, prop_name)
                    .filter(|v| !v.is_null())
                    .and_then(|v| {
                        let json_val: serde_json::Value = v.into();
                        serde_json::from_value::<uni_crdt::Crdt>(json_val).ok()
                    })
                    .and_then(|crdt| crdt.to_msgpack().ok());
                match bytes {
                    Some(b) => builder.append_value(&b),
                    None => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Utf8 => {
            // Non-string values are stringified via Display, not dropped.
            let mut builder = StringBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::String(s)) => builder.append_value(s),
                    Some(Value::Null) | None => builder.append_null(),
                    Some(other) => builder.append_value(other.to_string()),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Int64 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Int64Builder,
                |v: &Value| v.as_i64(),
                |v| v
            )
        }
        DataType::Int32 => {
            // NOTE(review): `v as i32` wraps out-of-range i64 values — confirm
            // values are range-checked upstream.
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Int32Builder,
                |v: &Value| v.as_i64(),
                |v: i64| v as i32
            )
        }
        DataType::Float64 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Float64Builder,
                |v: &Value| v.as_f64(),
                |v| v
            )
        }
        DataType::Float32 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                Float32Builder,
                |v: &Value| v.as_f64(),
                |v: f64| v as f32
            )
        }
        DataType::Boolean => {
            let mut builder = BooleanBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Bool(b)) => builder.append_value(b),
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::UInt64 => {
            build_numeric_column!(
                vids,
                props_map,
                prop_name,
                UInt64Builder,
                |v: &Value| v.as_u64(),
                |v| v
            )
        }
        // Float32 vectors (embeddings). Accepts either a native vector value
        // or a generic list coerced element-wise (non-numeric elements → 0.0).
        DataType::FixedSizeList(inner, dim) if *inner.data_type() == DataType::Float32 => {
            let values_builder = Float32Builder::new();
            let mut list_builder = FixedSizeListBuilder::new(values_builder, *dim);
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Vector(v)) => {
                        for val in v {
                            list_builder.values().append_value(val);
                        }
                        list_builder.append(true);
                    }
                    Some(Value::List(arr)) => {
                        for v in arr {
                            list_builder
                                .values()
                                .append_value(v.as_f64().unwrap_or(0.0) as f32);
                        }
                        list_builder.append(true);
                    }
                    _ => {
                        // Fixed-size lists require `dim` child slots even for
                        // a null row.
                        for _ in 0..*dim {
                            list_builder.values().append_null();
                        }
                        list_builder.append(false);
                    }
                }
            }
            Ok(Arc::new(list_builder.finish()))
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            // Builder is tagged UTC regardless of the schema's timezone —
            // NOTE(review): confirm schemas never declare a non-UTC zone here.
            let mut builder = TimestampNanosecondBuilder::new().with_timezone("UTC");
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(tv)) => match tv {
                        uni_common::TemporalValue::DateTime {
                            nanos_since_epoch, ..
                        }
                        | uni_common::TemporalValue::LocalDateTime {
                            nanos_since_epoch, ..
                        } => {
                            builder.append_value(nanos_since_epoch);
                        }
                        // Dates widen to midnight UTC of that day.
                        uni_common::TemporalValue::Date { days_since_epoch } => {
                            builder.append_value(days_since_epoch as i64 * 86_400_000_000_000);
                        }
                        _ => builder.append_null(),
                    },
                    Some(Value::String(s)) => match parse_datetime_utc(&s) {
                        Ok(dt) => builder.append_value(dt.timestamp_nanos_opt().unwrap_or(0)),
                        Err(_) => builder.append_null(),
                    },
                    // Bare integers are taken as nanoseconds since epoch.
                    Some(Value::Int(n)) => {
                        builder.append_value(n);
                    }
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Date32 => {
            let mut builder = Date32Builder::new();
            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(uni_common::TemporalValue::Date { days_since_epoch })) => {
                        builder.append_value(days_since_epoch);
                    }
                    Some(Value::String(s)) => match NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
                        Ok(d) => builder.append_value((d - epoch).num_days() as i32),
                        Err(_) => builder.append_null(),
                    },
                    // Bare integers are taken as days since epoch.
                    // NOTE(review): `n as i32` wraps out-of-range i64 values —
                    // confirm the range is bounded upstream.
                    Some(Value::Int(n)) => {
                        builder.append_value(n as i32);
                    }
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Time64(TimeUnit::Nanosecond) => {
            let mut builder = Time64NanosecondBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(
                        uni_common::TemporalValue::LocalTime {
                            nanos_since_midnight,
                        }
                        | uni_common::TemporalValue::Time {
                            nanos_since_midnight,
                            ..
                        },
                    )) => {
                        builder.append_value(nanos_since_midnight);
                    }
                    Some(Value::Temporal(_)) => builder.append_null(),
                    // Strings parsed as HH:MM:SS with optional fractional part.
                    Some(Value::String(s)) => {
                        match NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")
                            .or_else(|_| NaiveTime::parse_from_str(&s, "%H:%M:%S"))
                        {
                            Ok(t) => {
                                let nanos = t.num_seconds_from_midnight() as i64 * 1_000_000_000
                                    + t.nanosecond() as i64;
                                builder.append_value(nanos);
                            }
                            Err(_) => builder.append_null(),
                        }
                    }
                    // Bare integers are taken as nanoseconds since midnight.
                    Some(Value::Int(n)) => {
                        builder.append_value(n);
                    }
                    _ => builder.append_null(),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Interval(IntervalUnit::MonthDayNano) => {
            let mut values: Vec<Option<arrow::datatypes::IntervalMonthDayNano>> =
                Vec::with_capacity(vids.len());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Temporal(uni_common::TemporalValue::Duration {
                        months,
                        days,
                        nanos,
                    })) => {
                        values.push(Some(arrow::datatypes::IntervalMonthDayNano {
                            months: months as i32,
                            days: days as i32,
                            nanoseconds: nanos,
                        }));
                    }
                    // Bare integers are deliberately not interpreted as a
                    // duration here.
                    Some(Value::Int(_n)) => {
                        values.push(None);
                    }
                    _ => values.push(None),
                }
            }
            let arr: arrow_array::IntervalMonthDayNanoArray = values.into_iter().collect();
            Ok(Arc::new(arr))
        }
        DataType::List(inner_field) => {
            build_list_property_column(vids, props_map, prop_name, inner_field)
        }
        DataType::Struct(fields) => {
            build_struct_property_column(vids, props_map, prop_name, fields)
        }
        _ => {
            // Fallback: stringify anything else via Display.
            let mut builder = StringBuilder::new();
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::Null) | None => builder.append_null(),
                    Some(other) => builder.append_value(other.to_string()),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
    }
}
2478
/// Build an Arrow `List` column of `inner_field`'s type for property
/// `prop_name` across `vids`.
///
/// Per row: a `Value::List` property is converted element-wise (elements that
/// fail the inner coercion become null list entries); any other value — or a
/// missing property — produces a null *list* (`append(false)`), not an empty
/// one. Struct-element lists are delegated to `build_list_of_structs_column`.
///
/// NOTE(review): mismatched elements in the Utf8 arm are Debug-formatted
/// (`{other:?}`), while the scalar Utf8 path in `build_property_column_static`
/// uses Display (`to_string()`) — confirm whether this divergence is intended.
fn build_list_property_column(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    inner_field: &Arc<Field>,
) -> DFResult<ArrayRef> {
    match inner_field.data_type() {
        DataType::Utf8 => {
            let mut builder = ListBuilder::new(StringBuilder::new());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::List(arr)) => {
                        for v in arr {
                            match v {
                                Value::String(s) => builder.values().append_value(s),
                                Value::Null => builder.values().append_null(),
                                // Non-string elements are Debug-formatted.
                                other => builder.values().append_value(format!("{other:?}")),
                            }
                        }
                        builder.append(true);
                    }
                    // Non-list or absent property → null list entry.
                    _ => builder.append(false),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Int64 => {
            let mut builder = ListBuilder::new(Int64Builder::new());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::List(arr)) => {
                        for v in arr {
                            match v.as_i64() {
                                Some(n) => builder.values().append_value(n),
                                None => builder.values().append_null(),
                            }
                        }
                        builder.append(true);
                    }
                    _ => builder.append(false),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Float64 => {
            let mut builder = ListBuilder::new(Float64Builder::new());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::List(arr)) => {
                        for v in arr {
                            match v.as_f64() {
                                Some(n) => builder.values().append_value(n),
                                None => builder.values().append_null(),
                            }
                        }
                        builder.append(true);
                    }
                    _ => builder.append(false),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Boolean => {
            let mut builder = ListBuilder::new(BooleanBuilder::new());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::List(arr)) => {
                        for v in arr {
                            match v.as_bool() {
                                Some(b) => builder.values().append_value(b),
                                None => builder.values().append_null(),
                            }
                        }
                        builder.append(true);
                    }
                    _ => builder.append(false),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Struct(fields) => {
            build_list_of_structs_column(vids, props_map, prop_name, fields)
        }
        _ => {
            // Fallback: Debug-format every non-null element into a string list.
            let mut builder = ListBuilder::new(StringBuilder::new());
            for vid in vids {
                match get_property_value(vid, props_map, prop_name) {
                    Some(Value::List(arr)) => {
                        for v in arr {
                            match v {
                                Value::Null => builder.values().append_null(),
                                other => builder.values().append_value(format!("{other:?}")),
                            }
                        }
                        builder.append(true);
                    }
                    _ => builder.append(false),
                }
            }
            Ok(Arc::new(builder.finish()))
        }
    }
}
2585
/// Builds an Arrow `List<Struct>` column for `prop_name` across `vids`.
///
/// Per-row conversion:
/// - `Value::List` whose elements are `Value::Map` -> one struct entry per
///   map; non-map list elements are silently dropped.
/// - A bare `Value::Map` -> a list of `{key, value}` structs, one per entry.
/// - Anything else (including a missing property) -> a null list row.
///
/// NOTE(review): an empty list — or a list containing no maps — is emitted
/// as a NULL row rather than an empty list; confirm downstream consumers
/// expect that.
fn build_list_of_structs_column(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    fields: &Fields,
) -> DFResult<ArrayRef> {
    use arrow_array::StructArray;

    // Raw property value for each vid, in row order.
    let values: Vec<Option<Value>> = vids
        .iter()
        .map(|vid| get_property_value(vid, props_map, prop_name))
        .collect();

    // Normalize every row to `Option<Vec<map>>`; `None` becomes a null row.
    let rows: Vec<Option<Vec<HashMap<String, Value>>>> = values
        .iter()
        .map(|val| match val {
            Some(Value::List(arr)) => {
                // Keep only the map elements; scalars inside the list are dropped.
                let objs: Vec<HashMap<String, Value>> = arr
                    .iter()
                    .filter_map(|v| {
                        if let Value::Map(m) = v {
                            Some(m.clone())
                        } else {
                            None
                        }
                    })
                    .collect();
                if objs.is_empty() { None } else { Some(objs) }
            }
            Some(Value::Map(obj)) => {
                // A bare map is exploded into {key, value} pairs so it can
                // still be represented as a list of structs.
                let kv_pairs: Vec<HashMap<String, Value>> = obj
                    .iter()
                    .map(|(k, v)| {
                        let mut m = HashMap::new();
                        m.insert("key".to_string(), Value::String(k.clone()));
                        m.insert("value".to_string(), v.clone());
                        m
                    })
                    .collect();
                Some(kv_pairs)
            }
            _ => None,
        })
        .collect();

    // Total struct children across all rows; used to pre-size builders.
    let total_items: usize = rows
        .iter()
        .filter_map(|r| r.as_ref())
        .map(|v| v.len())
        .sum();

    // One flat child array per struct field. Rows are visited via
    // `rows.iter().flatten().flatten()` — the same order the list offsets
    // below are computed in, which keeps children and offsets aligned.
    let child_arrays: Vec<ArrayRef> = fields
        .iter()
        .map(|field| {
            let field_name = field.name();
            match field.data_type() {
                DataType::Utf8 => {
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::String(s)) => builder.append_value(s),
                            Some(Value::Null) | None => builder.append_null(),
                            // Non-string values are debug-formatted, not dropped.
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int64 => {
                    let mut builder = Int64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_i64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Float64 => {
                    let mut builder = Float64Builder::with_capacity(total_items);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name).and_then(|v| v.as_f64()) {
                            Some(n) => builder.append_value(n),
                            None => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                _ => {
                    // Fallback: render unhandled field types as debug strings.
                    let mut builder = StringBuilder::with_capacity(total_items, total_items * 16);
                    for obj in rows.iter().flatten().flatten() {
                        match obj.get(field_name) {
                            Some(Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(format!("{other:?}")),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
            }
        })
        .collect();

    let struct_array = StructArray::try_new(fields.clone(), child_arrays, None)
        .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    // List offsets + validity: one step per row; null rows repeat the
    // previous offset and are marked invalid in the null buffer.
    let mut offsets = Vec::with_capacity(vids.len() + 1);
    let mut nulls = Vec::with_capacity(vids.len());
    let mut offset = 0i32;
    offsets.push(offset);
    for row in &rows {
        match row {
            Some(objs) => {
                offset += objs.len() as i32;
                offsets.push(offset);
                nulls.push(true);
            }
            None => {
                offsets.push(offset);
                nulls.push(false);
            }
        }
    }

    let list_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
    let list_array = arrow_array::ListArray::try_new(
        list_field,
        arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets)),
        Arc::new(struct_array),
        Some(arrow::buffer::NullBuffer::from(nulls)),
    )
    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    Ok(Arc::new(list_array))
}
2731
2732fn temporal_to_struct_map(tv: &uni_common::value::TemporalValue) -> HashMap<String, Value> {
2735 use uni_common::value::TemporalValue;
2736 let mut m = HashMap::new();
2737 match tv {
2738 TemporalValue::DateTime {
2739 nanos_since_epoch,
2740 offset_seconds,
2741 timezone_name,
2742 } => {
2743 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2744 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2745 if let Some(tz) = timezone_name {
2746 m.insert("timezone_name".into(), Value::String(tz.clone()));
2747 }
2748 }
2749 TemporalValue::LocalDateTime { nanos_since_epoch } => {
2750 m.insert("nanos_since_epoch".into(), Value::Int(*nanos_since_epoch));
2751 }
2752 TemporalValue::Time {
2753 nanos_since_midnight,
2754 offset_seconds,
2755 } => {
2756 m.insert(
2757 "nanos_since_midnight".into(),
2758 Value::Int(*nanos_since_midnight),
2759 );
2760 m.insert("offset_seconds".into(), Value::Int(*offset_seconds as i64));
2761 }
2762 TemporalValue::LocalTime {
2763 nanos_since_midnight,
2764 } => {
2765 m.insert(
2766 "nanos_since_midnight".into(),
2767 Value::Int(*nanos_since_midnight),
2768 );
2769 }
2770 TemporalValue::Date { days_since_epoch } => {
2771 m.insert(
2772 "days_since_epoch".into(),
2773 Value::Int(*days_since_epoch as i64),
2774 );
2775 }
2776 TemporalValue::Duration {
2777 months,
2778 days,
2779 nanos,
2780 } => {
2781 m.insert("months".into(), Value::Int(*months));
2782 m.insert("days".into(), Value::Int(*days));
2783 m.insert("nanos".into(), Value::Int(*nanos));
2784 }
2785 }
2786 m
2787}
2788
/// Builds an Arrow struct column for `prop_name` across `vids`.
///
/// Temporal property values are first flattened into component maps via
/// `temporal_to_struct_map`; any other non-map value (or a missing property)
/// yields a null struct row. Each requested struct field is looked up in the
/// row's map by name and coerced to the field's Arrow type, falling back to a
/// debug-formatted string for unhandled types.
fn build_struct_property_column(
    vids: &[Vid],
    props_map: &HashMap<Vid, Properties>,
    prop_name: &str,
    fields: &Fields,
) -> DFResult<ArrayRef> {
    use arrow_array::StructArray;

    // Per-row property values, with temporal values normalized to maps so the
    // per-field extraction below only has to handle `Value::Map`.
    let values: Vec<Option<Value>> = vids
        .iter()
        .map(|vid| {
            let val = get_property_value(vid, props_map, prop_name);
            match val {
                Some(Value::Temporal(ref tv)) => Some(Value::Map(temporal_to_struct_map(tv))),
                other => other,
            }
        })
        .collect();

    // One child array per struct field, each with exactly one slot per row.
    // Non-map rows get a null child slot here AND an invalid struct slot via
    // the validity vector computed after this loop.
    let child_arrays: Vec<ArrayRef> = fields
        .iter()
        .map(|field| {
            let field_name = field.name();
            match field.data_type() {
                DataType::Float64 => {
                    let mut builder = Float64Builder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_f64()) {
                                    Some(n) => builder.append_value(n),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Utf8 => {
                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => match obj.get(field_name) {
                                Some(Value::String(s)) => builder.append_value(s),
                                Some(Value::Null) | None => builder.append_null(),
                                // Non-string values are debug-formatted, not dropped.
                                Some(other) => builder.append_value(format!("{other:?}")),
                            },
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int64 => {
                    let mut builder = Int64Builder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_i64()) {
                                    Some(n) => builder.append_value(n),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Timestamp(_, _) => {
                    // The map stores raw i64 nanoseconds (see temporal_to_struct_map),
                    // so the value is appended without conversion.
                    let mut builder = TimestampNanosecondBuilder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_i64()) {
                                    Some(n) => builder.append_value(n),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Int32 => {
                    let mut builder = Int32Builder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_i64()) {
                                    // NOTE(review): `as i32` truncates silently if the
                                    // stored value exceeds i32 range — presumably these
                                    // fields hold small components (e.g. offsets); confirm.
                                    Some(n) => builder.append_value(n as i32),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                DataType::Time64(_) => {
                    let mut builder = Time64NanosecondBuilder::with_capacity(vids.len());
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => {
                                match obj.get(field_name).and_then(|v| v.as_i64()) {
                                    Some(n) => builder.append_value(n),
                                    None => builder.append_null(),
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
                _ => {
                    // Fallback: render unhandled field types as debug strings.
                    let mut builder = StringBuilder::with_capacity(vids.len(), vids.len() * 16);
                    for val in &values {
                        match val {
                            Some(Value::Map(obj)) => match obj.get(field_name) {
                                Some(Value::Null) | None => builder.append_null(),
                                Some(other) => builder.append_value(format!("{other:?}")),
                            },
                            _ => builder.append_null(),
                        }
                    }
                    Arc::new(builder.finish()) as ArrayRef
                }
            }
        })
        .collect();

    // Struct-level validity: a row is valid only when its value was a map
    // (which includes temporal values normalized above).
    let nulls: Vec<bool> = values
        .iter()
        .map(|v| matches!(v, Some(Value::Map(_))))
        .collect();

    let struct_array = StructArray::try_new(
        fields.clone(),
        child_arrays,
        Some(arrow::buffer::NullBuffer::from(nulls)),
    )
    .map_err(|e| datafusion::common::DataFusionError::ArrowError(Box::new(e), None))?;

    Ok(Arc::new(struct_array))
}
2938
impl Stream for GraphScanStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the scan state machine: `Init` builds the async scan future,
    /// `Executing` polls it to completion, and `Done` ends the stream.
    /// The whole scan is produced as a single batch (at most one item).
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take ownership of the state, leaving `Done` in place as a
            // placeholder; every branch below either returns or reinstates
            // the correct state before yielding.
            let state = std::mem::replace(&mut self.state, GraphScanState::Done);

            match state {
                GraphScanState::Init => {
                    // Clone everything the future captures so it can own its
                    // inputs independently of `self`.
                    let graph_ctx = self.graph_ctx.clone();
                    let label = self.label.clone();
                    let variable = self.variable.clone();
                    let properties = self.properties.clone();
                    let is_edge_scan = self.is_edge_scan;
                    let is_schemaless = self.is_schemaless;
                    let schema = self.schema.clone();

                    let fut = async move {
                        // Abort early if the query deadline has passed.
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;

                        // Dispatch to the edge / schemaless-vertex / vertex
                        // scan path; each returns the full result batch.
                        let batch = if is_edge_scan {
                            columnar_scan_edge_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else if is_schemaless {
                            columnar_scan_schemaless_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        } else {
                            columnar_scan_vertex_batch_static(
                                &graph_ctx,
                                &label,
                                &variable,
                                &properties,
                                &schema,
                            )
                            .await?
                        };
                        Ok(Some(batch))
                    };

                    // Loop around to poll the freshly created future.
                    self.state = GraphScanState::Executing(Box::pin(fut));
                }
                GraphScanState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = GraphScanState::Done;
                        // Record output row count (0 when the scan yields no batch).
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphScanState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the in-flight future back so it is polled again.
                        self.state = GraphScanState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                GraphScanState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
3020
3021impl RecordBatchStream for GraphScanStream {
3022 fn schema(&self) -> SchemaRef {
3023 self.schema.clone()
3024 }
3025}
3026
#[cfg(test)]
mod tests {
    use super::*;

    /// Vertex schema: `_vid` and `_labels` columns precede projected properties.
    #[test]
    fn test_build_vertex_schema() {
        let uni_schema = UniSchema::default();
        let schema = GraphScanExec::build_vertex_schema(
            "n",
            "Person",
            &["name".to_string(), "age".to_string()],
            &uni_schema,
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(3).name(), "n.age");
    }

    /// Edge schema: `_eid`, `_src_vid`, `_dst_vid` precede projected properties.
    #[test]
    fn test_build_edge_schema() {
        let uni_schema = UniSchema::default();
        let schema =
            GraphScanExec::build_edge_schema("r", "KNOWS", &["weight".to_string()], &uni_schema);

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "r._eid");
        assert_eq!(schema.field(1).name(), "r._src_vid");
        assert_eq!(schema.field(2).name(), "r._dst_vid");
        assert_eq!(schema.field(3).name(), "r.weight");
    }

    /// Schemaless vertices expose projected properties as `LargeBinary`.
    #[test]
    fn test_build_schemaless_vertex_schema() {
        let schema = GraphScanExec::build_schemaless_vertex_schema(
            "n",
            &["name".to_string(), "age".to_string()],
        );

        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
        assert_eq!(schema.field(1).name(), "n._labels");
        assert_eq!(schema.field(2).name(), "n.name");
        assert_eq!(schema.field(2).data_type(), &DataType::LargeBinary);
        assert_eq!(schema.field(3).name(), "n.age");
        assert_eq!(schema.field(3).data_type(), &DataType::LargeBinary);
    }

    /// An all-vertex schemaless scan with no projection keeps only id + labels.
    #[test]
    fn test_schemaless_all_scan_has_empty_label() {
        let schema = GraphScanExec::build_schemaless_vertex_schema("n", &[]);

        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "n._vid");
        assert_eq!(schema.field(1).name(), "n._labels");
    }

    /// Round-trips JSON through the cypher-value codec for both maps and scalars.
    #[test]
    fn test_cypher_value_all_props_extraction() {
        let json_obj = serde_json::json!({"age": 30, "name": "Alice"});
        let cv_bytes = encode_cypher_value(&json_obj).unwrap();

        let decoded = uni_common::cypher_value_codec::decode(&cv_bytes).unwrap();
        match decoded {
            uni_common::Value::Map(map) => {
                let age_val = map.get("age").unwrap();
                assert_eq!(age_val, &uni_common::Value::Int(30));
            }
            _ => panic!("Expected Map"),
        }

        let single_val = serde_json::json!(30);
        let single_bytes = encode_cypher_value(&single_val).unwrap();
        let single_decoded = uni_common::cypher_value_codec::decode(&single_bytes).unwrap();
        assert_eq!(single_decoded, uni_common::Value::Int(30));
    }

    /// Builds a batch with the MVCC bookkeeping columns (`_vid`, `_deleted`,
    /// `_version`) plus a `name` column encoding vid/version for assertions.
    fn make_mvcc_batch(vids: &[u64], versions: &[u64], deleted: &[bool]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let names: Vec<String> = vids
            .iter()
            .zip(versions.iter())
            .map(|(v, ver)| format!("v{}_ver{}", v, ver))
            .collect();
        let name_arr: arrow_array::StringArray = names.iter().map(|s| Some(s.as_str())).collect();

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(vids.to_vec())),
                Arc::new(arrow_array::BooleanArray::from(deleted.to_vec())),
                Arc::new(UInt64Array::from(versions.to_vec())),
                Arc::new(name_arr),
            ],
        )
        .unwrap()
    }

    /// Dedup keeps only the highest version per vid, carrying its payload along.
    #[test]
    fn test_mvcc_dedup_multiple_versions() {
        let batch = make_mvcc_batch(
            &[1, 1, 1, 2, 2],
            &[3, 1, 5, 2, 4],
            &[false, false, false, false, false],
        );

        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let ver_col = result
            .column_by_name("_version")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let name_col = result
            .column_by_name("name")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();

        assert_eq!(vid_col.value(0), 1);
        assert_eq!(ver_col.value(0), 5);
        assert_eq!(name_col.value(0), "v1_ver5");

        assert_eq!(vid_col.value(1), 2);
        assert_eq!(ver_col.value(1), 4);
        assert_eq!(name_col.value(1), "v2_ver4");
    }

    /// With one version per vid, dedup is a no-op.
    #[test]
    fn test_mvcc_dedup_single_rows() {
        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    /// Dedup on an empty batch yields an empty batch.
    #[test]
    fn test_mvcc_dedup_empty() {
        let batch = make_mvcc_batch(&[], &[], &[]);
        let result = mvcc_dedup_batch(&batch).unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    /// Rows whose vid appears in the L0 vertex tombstone set are filtered out.
    #[test]
    fn test_filter_l0_tombstones_removes_tombstoned() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);

        // Tombstone vid 2 in a fresh L0 buffer.
        let l0 = uni_store::runtime::l0::L0Buffer::new(1, None);
        let l0_buf = std::sync::Arc::new(parking_lot::RwLock::new(l0));
        l0_buf.write().vertex_tombstones.insert(Vid::from(2u64));

        let l0_ctx = L0Context {
            current_l0: Some(l0_buf),
            transaction_l0: None,
            pending_flush_l0s: vec![],
        };

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 2);

        let vid_col = result
            .column_by_name("_vid")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(vid_col.value(0), 1);
        assert_eq!(vid_col.value(1), 3);
    }

    /// An empty L0 context leaves the batch untouched.
    #[test]
    fn test_filter_l0_tombstones_none() {
        use crate::query::df_graph::L0Context;

        let batch = make_mvcc_batch(&[1, 2, 3], &[1, 1, 1], &[false, false, false]);
        let l0_ctx = L0Context::default();

        let result = filter_l0_tombstones(&batch, &l0_ctx).unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    /// Mapping a Lance batch to the query output schema renames columns to the
    /// `var.prop` convention and synthesizes the `_labels` column.
    #[test]
    fn test_map_to_output_schema_basic() {
        use crate::query::df_graph::L0Context;

        let lance_schema = Arc::new(Schema::new(vec![
            Field::new("_vid", DataType::UInt64, false),
            Field::new("_deleted", DataType::Boolean, false),
            Field::new("_version", DataType::UInt64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let name_arr: arrow_array::StringArray =
            vec![Some("Alice"), Some("Bob")].into_iter().collect();
        let batch = RecordBatch::try_new(
            lance_schema,
            vec![
                Arc::new(UInt64Array::from(vec![1u64, 2])),
                Arc::new(arrow_array::BooleanArray::from(vec![false, false])),
                Arc::new(UInt64Array::from(vec![1u64, 1])),
                Arc::new(name_arr),
            ],
        )
        .unwrap();

        let output_schema = Arc::new(Schema::new(vec![
            Field::new("n._vid", DataType::UInt64, false),
            Field::new("n._labels", labels_data_type(), true),
            Field::new("n.name", DataType::Utf8, true),
        ]));

        let l0_ctx = L0Context::default();
        let result = map_to_output_schema(
            &batch,
            "Person",
            "n",
            &["name".to_string()],
            &output_schema,
            &l0_ctx,
        )
        .unwrap();

        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.schema().fields().len(), 3);
        assert_eq!(result.schema().field(0).name(), "n._vid");
        assert_eq!(result.schema().field(1).name(), "n._labels");
        assert_eq!(result.schema().field(2).name(), "n.name");

        let name_col = result
            .column(2)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
    }
}