1use std::any::Any;
48use std::collections::HashMap;
49use std::fmt;
50use std::pin::Pin;
51use std::sync::Arc;
52use std::sync::atomic::{AtomicU64, Ordering};
53use std::task::{Context, Poll};
54
55use arrow_array::builder::ListBuilder;
56use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt64Array};
57use arrow_schema::{DataType, Field, Schema, SchemaRef};
58use datafusion::common::Result as DFResult;
59use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
60use datafusion::logical_expr::Expr as DfExpr;
61use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
62use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
63use futures::Stream;
64use uni_plugin::traits::catalog::CatalogTable;
65
66use crate::query::df_graph::common::{compute_plan_properties, labels_data_type};
67
/// Returns the base vertex id for a virtual (catalog-backed) label.
///
/// The label id is placed in the top 16 bits of the 64-bit id; callers OR a
/// row offset into the remaining low 48 bits.
#[inline]
fn virtual_vid_base(virtual_label_id: u16) -> u64 {
    // Lossless widening; a u16 shifted left by 48 always fits in a u64.
    u64::from(virtual_label_id) << 48
}
73
74fn check_no_reserved_columns(schema: &SchemaRef) -> Result<(), String> {
77 for field in schema.fields() {
78 if field.name().starts_with('_') {
79 return Err(field.name().clone());
80 }
81 }
82 Ok(())
83}
84
85pub struct CatalogVertexScanExec {
90 table: Arc<dyn CatalogTable>,
91 virtual_label_id: u16,
92 label_name: String,
93 variable: String,
94 properties: Vec<String>,
98 pushdown_filters: Vec<DfExpr>,
102 pushdown_limit: Option<usize>,
104 schema: SchemaRef,
106 properties_plan: Arc<PlanProperties>,
107 metrics: ExecutionPlanMetricsSet,
108}
109
110impl fmt::Debug for CatalogVertexScanExec {
111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112 f.debug_struct("CatalogVertexScanExec")
113 .field("label_name", &self.label_name)
114 .field(
115 "virtual_label_id",
116 &format_args!("{:#x}", self.virtual_label_id),
117 )
118 .field("variable", &self.variable)
119 .field("properties", &self.properties)
120 .field("pushdown_filters", &self.pushdown_filters.len())
121 .field("pushdown_limit", &self.pushdown_limit)
122 .finish()
123 }
124}
125
126impl CatalogVertexScanExec {
127 pub fn try_new(
135 table: Arc<dyn CatalogTable>,
136 virtual_label_id: u16,
137 label_name: impl Into<String>,
138 variable: impl Into<String>,
139 properties: Vec<String>,
140 pushdown_filters: Vec<DfExpr>,
141 pushdown_limit: Option<usize>,
142 ) -> anyhow::Result<Self> {
143 let label_name = label_name.into();
144 let variable = variable.into();
145 let table_schema = table.schema();
146 if let Err(bad) = check_no_reserved_columns(&table_schema) {
147 return Err(anyhow::anyhow!(
148 "CatalogTable for label `{label_name}` declares reserved column \
149 `{bad}` (names starting with `_` are synthesized by the graph-row \
150 adapter — rename it in the underlying table)"
151 ));
152 }
153 let schema = Self::build_output_schema(&variable, &properties, &table_schema);
154 let properties_plan = compute_plan_properties(schema.clone());
155 Ok(Self {
156 table,
157 virtual_label_id,
158 label_name,
159 variable,
160 properties,
161 pushdown_filters,
162 pushdown_limit,
163 schema,
164 properties_plan,
165 metrics: ExecutionPlanMetricsSet::new(),
166 })
167 }
168
169 fn build_output_schema(
170 variable: &str,
171 properties: &[String],
172 table_schema: &SchemaRef,
173 ) -> SchemaRef {
174 let mut fields = vec![
175 Field::new(format!("{variable}._vid"), DataType::UInt64, false),
176 Field::new(format!("{variable}._labels"), labels_data_type(), false),
177 ];
178 let table_by_name: HashMap<&str, &Field> = table_schema
179 .fields()
180 .iter()
181 .map(|f| (f.name().as_str(), f.as_ref()))
182 .collect();
183 for prop in properties {
184 let col_name = format!("{variable}.{prop}");
185 let (dtype, nullable) = match table_by_name.get(prop.as_str()) {
186 Some(f) => (f.data_type().clone(), true),
187 None => (DataType::Utf8, true),
188 };
189 fields.push(Field::new(&col_name, dtype, nullable));
190 }
191 Arc::new(Schema::new(fields))
192 }
193}
194
195impl DisplayAs for CatalogVertexScanExec {
196 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197 write!(
198 f,
199 "CatalogVertexScanExec: label={}, virtual_id={:#x}, variable={}, props={:?}",
200 self.label_name, self.virtual_label_id, self.variable, self.properties
201 )?;
202 if !self.pushdown_filters.is_empty() {
203 write!(f, ", filters={}", self.pushdown_filters.len())?;
204 }
205 if let Some(lim) = self.pushdown_limit {
206 write!(f, ", limit={lim}")?;
207 }
208 Ok(())
209 }
210}
211
212impl ExecutionPlan for CatalogVertexScanExec {
213 fn name(&self) -> &str {
214 "CatalogVertexScanExec"
215 }
216
217 fn as_any(&self) -> &dyn Any {
218 self
219 }
220
221 fn schema(&self) -> SchemaRef {
222 self.schema.clone()
223 }
224
225 fn properties(&self) -> &Arc<PlanProperties> {
226 &self.properties_plan
227 }
228
229 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
230 vec![]
231 }
232
233 fn with_new_children(
234 self: Arc<Self>,
235 children: Vec<Arc<dyn ExecutionPlan>>,
236 ) -> DFResult<Arc<dyn ExecutionPlan>> {
237 if !children.is_empty() {
238 return Err(datafusion::error::DataFusionError::Plan(
239 "CatalogVertexScanExec has no children".into(),
240 ));
241 }
242 Ok(self)
243 }
244
245 fn execute(
246 &self,
247 partition: usize,
248 _context: Arc<TaskContext>,
249 ) -> DFResult<SendableRecordBatchStream> {
250 let metrics = BaselineMetrics::new(&self.metrics, partition);
251 let table_schema = self.table.schema();
256 let projection: Vec<usize> = self
257 .properties
258 .iter()
259 .filter_map(|p| table_schema.index_of(p).ok())
260 .collect();
261 let projection_opt = if projection.is_empty() {
262 None
263 } else {
264 Some(projection.as_slice())
265 };
266 let stream = self
267 .table
268 .scan(projection_opt, &self.pushdown_filters, self.pushdown_limit)
269 .map_err(|e| {
270 datafusion::error::DataFusionError::Execution(format!(
271 "CatalogTable::scan failed: {e}"
272 ))
273 })?;
274 Ok(Box::pin(VertexAdapterStream {
275 inner: stream,
276 output_schema: self.schema.clone(),
277 virtual_label_id: self.virtual_label_id,
278 label_name: self.label_name.clone(),
279 variable: self.variable.clone(),
280 properties: self.properties.clone(),
281 next_offset: AtomicU64::new(0),
282 metrics,
283 }))
284 }
285
286 fn metrics(&self) -> Option<MetricsSet> {
287 Some(self.metrics.clone_inner())
288 }
289}
290
291struct VertexAdapterStream {
292 inner: SendableRecordBatchStream,
293 output_schema: SchemaRef,
294 virtual_label_id: u16,
295 label_name: String,
296 variable: String,
297 properties: Vec<String>,
298 next_offset: AtomicU64,
299 metrics: BaselineMetrics,
300}
301
302impl RecordBatchStream for VertexAdapterStream {
303 fn schema(&self) -> SchemaRef {
304 self.output_schema.clone()
305 }
306}
307
308impl Stream for VertexAdapterStream {
309 type Item = DFResult<RecordBatch>;
310
311 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
312 match Pin::new(&mut self.inner).poll_next(cx) {
313 Poll::Pending => Poll::Pending,
314 Poll::Ready(None) => Poll::Ready(None),
315 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
316 Poll::Ready(Some(Ok(batch))) => {
317 let row_count = batch.num_rows();
318 let base = virtual_vid_base(self.virtual_label_id)
319 | self
320 .next_offset
321 .fetch_add(row_count as u64, Ordering::SeqCst);
322 let adapted = adapt_vertex_batch(
323 &batch,
324 &self.output_schema,
325 base,
326 &self.label_name,
327 &self.variable,
328 &self.properties,
329 );
330 self.metrics.record_output(row_count);
331 Poll::Ready(Some(adapted))
332 }
333 }
334 }
335}
336
337fn adapt_vertex_batch(
340 in_batch: &RecordBatch,
341 output_schema: &SchemaRef,
342 vid_start: u64,
343 label_name: &str,
344 variable: &str,
345 properties: &[String],
346) -> DFResult<RecordBatch> {
347 let n = in_batch.num_rows();
348 let vid_array: ArrayRef = Arc::new(UInt64Array::from_iter_values(
349 (0..n as u64).map(|i| vid_start + i),
350 ));
351 let labels_array: ArrayRef = {
352 let mut b = ListBuilder::new(arrow_array::builder::StringBuilder::new());
353 for _ in 0..n {
354 b.values().append_value(label_name);
355 b.append(true);
356 }
357 Arc::new(b.finish())
358 };
359 let in_schema = in_batch.schema();
360 let in_by_name: HashMap<&str, ArrayRef> = in_schema
361 .fields()
362 .iter()
363 .enumerate()
364 .map(|(i, f)| (f.name().as_str(), in_batch.column(i).clone()))
365 .collect();
366 let _ = variable; let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
368 columns.push(vid_array);
369 columns.push(labels_array);
370 for prop in properties {
371 let col = in_by_name
372 .get(prop.as_str())
373 .cloned()
374 .unwrap_or_else(|| Arc::new(StringArray::new_null(n)));
375 columns.push(col);
376 }
377 RecordBatch::try_new(output_schema.clone(), columns).map_err(|e| {
378 datafusion::error::DataFusionError::Execution(format!(
379 "CatalogVertexScanExec: failed to assemble adapted batch: {e}"
380 ))
381 })
382}
383
384pub struct CatalogEdgeScanExec {
390 table: Arc<dyn CatalogTable>,
391 virtual_type_id: u32,
392 type_name: String,
393 variable: String,
394 properties: Vec<String>,
395 pushdown_filters: Vec<DfExpr>,
396 pushdown_limit: Option<usize>,
397 schema: SchemaRef,
398 properties_plan: Arc<PlanProperties>,
399 metrics: ExecutionPlanMetricsSet,
400}
401
402impl fmt::Debug for CatalogEdgeScanExec {
403 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404 f.debug_struct("CatalogEdgeScanExec")
405 .field("type_name", &self.type_name)
406 .field(
407 "virtual_type_id",
408 &format_args!("{:#x}", self.virtual_type_id),
409 )
410 .field("variable", &self.variable)
411 .field("properties", &self.properties)
412 .finish()
413 }
414}
415
416impl CatalogEdgeScanExec {
417 pub fn try_new(
421 table: Arc<dyn CatalogTable>,
422 virtual_type_id: u32,
423 type_name: impl Into<String>,
424 variable: impl Into<String>,
425 properties: Vec<String>,
426 pushdown_filters: Vec<DfExpr>,
427 pushdown_limit: Option<usize>,
428 ) -> anyhow::Result<Self> {
429 let type_name = type_name.into();
430 let variable = variable.into();
431 let table_schema = table.schema();
432 if let Err(bad) = check_no_reserved_columns(&table_schema) {
433 return Err(anyhow::anyhow!(
434 "CatalogTable for edge type `{type_name}` declares reserved column \
435 `{bad}` (names starting with `_` are synthesized by the graph-row adapter)"
436 ));
437 }
438 for required in ["src_id", "dst_id"] {
439 if table_schema.index_of(required).is_err() {
440 return Err(anyhow::anyhow!(
441 "CatalogTable for edge type `{type_name}` must declare a \
442 `{required}` column (mapped to `_{}_vid` in the graph-row \
443 adapter)",
444 if required == "src_id" { "src" } else { "dst" }
445 ));
446 }
447 }
448 let schema = Self::build_output_schema(&variable, &properties, &table_schema);
449 let properties_plan = compute_plan_properties(schema.clone());
450 Ok(Self {
451 table,
452 virtual_type_id,
453 type_name,
454 variable,
455 properties,
456 pushdown_filters,
457 pushdown_limit,
458 schema,
459 properties_plan,
460 metrics: ExecutionPlanMetricsSet::new(),
461 })
462 }
463
464 fn build_output_schema(
465 variable: &str,
466 properties: &[String],
467 table_schema: &SchemaRef,
468 ) -> SchemaRef {
469 let mut fields = vec![
470 Field::new(format!("{variable}._eid"), DataType::UInt64, false),
471 Field::new(format!("{variable}._src_vid"), DataType::UInt64, false),
472 Field::new(format!("{variable}._dst_vid"), DataType::UInt64, false),
473 ];
474 let table_by_name: HashMap<&str, &Field> = table_schema
475 .fields()
476 .iter()
477 .map(|f| (f.name().as_str(), f.as_ref()))
478 .collect();
479 for prop in properties {
480 if prop == "src_id" || prop == "dst_id" {
481 continue;
484 }
485 let col_name = format!("{variable}.{prop}");
486 let (dtype, nullable) = match table_by_name.get(prop.as_str()) {
487 Some(f) => (f.data_type().clone(), true),
488 None => (DataType::Utf8, true),
489 };
490 fields.push(Field::new(&col_name, dtype, nullable));
491 }
492 Arc::new(Schema::new(fields))
493 }
494}
495
496impl DisplayAs for CatalogEdgeScanExec {
497 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
498 write!(
499 f,
500 "CatalogEdgeScanExec: type={}, virtual_id={:#x}, variable={}, props={:?}",
501 self.type_name, self.virtual_type_id, self.variable, self.properties
502 )
503 }
504}
505
506impl ExecutionPlan for CatalogEdgeScanExec {
507 fn name(&self) -> &str {
508 "CatalogEdgeScanExec"
509 }
510 fn as_any(&self) -> &dyn Any {
511 self
512 }
513 fn schema(&self) -> SchemaRef {
514 self.schema.clone()
515 }
516 fn properties(&self) -> &Arc<PlanProperties> {
517 &self.properties_plan
518 }
519 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
520 vec![]
521 }
522 fn with_new_children(
523 self: Arc<Self>,
524 children: Vec<Arc<dyn ExecutionPlan>>,
525 ) -> DFResult<Arc<dyn ExecutionPlan>> {
526 if !children.is_empty() {
527 return Err(datafusion::error::DataFusionError::Plan(
528 "CatalogEdgeScanExec has no children".into(),
529 ));
530 }
531 Ok(self)
532 }
533 fn execute(
534 &self,
535 partition: usize,
536 _context: Arc<TaskContext>,
537 ) -> DFResult<SendableRecordBatchStream> {
538 let metrics = BaselineMetrics::new(&self.metrics, partition);
539 let table_schema = self.table.schema();
540 let mut wanted: Vec<&str> = vec!["src_id", "dst_id"];
544 for p in &self.properties {
545 if p != "src_id" && p != "dst_id" {
546 wanted.push(p.as_str());
547 }
548 }
549 let projection: Vec<usize> = wanted
550 .iter()
551 .filter_map(|p| table_schema.index_of(p).ok())
552 .collect();
553 let projection_opt = if projection.is_empty() {
554 None
555 } else {
556 Some(projection.as_slice())
557 };
558 let stream = self
559 .table
560 .scan(projection_opt, &self.pushdown_filters, self.pushdown_limit)
561 .map_err(|e| {
562 datafusion::error::DataFusionError::Execution(format!(
563 "CatalogTable::scan failed: {e}"
564 ))
565 })?;
566 Ok(Box::pin(EdgeAdapterStream {
567 inner: stream,
568 output_schema: self.schema.clone(),
569 virtual_type_id: self.virtual_type_id,
570 variable: self.variable.clone(),
571 properties: self.properties.clone(),
572 next_offset: AtomicU64::new(0),
573 metrics,
574 }))
575 }
576 fn metrics(&self) -> Option<MetricsSet> {
577 Some(self.metrics.clone_inner())
578 }
579}
580
581struct EdgeAdapterStream {
582 inner: SendableRecordBatchStream,
583 output_schema: SchemaRef,
584 virtual_type_id: u32,
585 variable: String,
586 properties: Vec<String>,
587 next_offset: AtomicU64,
588 metrics: BaselineMetrics,
589}
590
591impl RecordBatchStream for EdgeAdapterStream {
592 fn schema(&self) -> SchemaRef {
593 self.output_schema.clone()
594 }
595}
596
597impl Stream for EdgeAdapterStream {
598 type Item = DFResult<RecordBatch>;
599 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
600 match Pin::new(&mut self.inner).poll_next(cx) {
601 Poll::Pending => Poll::Pending,
602 Poll::Ready(None) => Poll::Ready(None),
603 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
604 Poll::Ready(Some(Ok(batch))) => {
605 let row_count = batch.num_rows();
606 let base = ((self.virtual_type_id as u64) << 32)
607 | self
608 .next_offset
609 .fetch_add(row_count as u64, Ordering::SeqCst);
610 let adapted = adapt_edge_batch(
611 &batch,
612 &self.output_schema,
613 base,
614 &self.variable,
615 &self.properties,
616 );
617 self.metrics.record_output(row_count);
618 Poll::Ready(Some(adapted))
619 }
620 }
621 }
622}
623
624fn adapt_edge_batch(
625 in_batch: &RecordBatch,
626 output_schema: &SchemaRef,
627 eid_start: u64,
628 variable: &str,
629 properties: &[String],
630) -> DFResult<RecordBatch> {
631 use arrow_array::cast::AsArray;
632 use arrow_array::types::Int64Type;
633 let n = in_batch.num_rows();
634 let eid: ArrayRef = Arc::new(UInt64Array::from_iter_values(
635 (0..n as u64).map(|i| eid_start + i),
636 ));
637 let in_schema = in_batch.schema();
638 let in_by_name: HashMap<&str, ArrayRef> = in_schema
639 .fields()
640 .iter()
641 .enumerate()
642 .map(|(i, f)| (f.name().as_str(), in_batch.column(i).clone()))
643 .collect();
644 let to_u64 = |arr: &ArrayRef| -> DFResult<ArrayRef> {
645 match arr.data_type() {
646 DataType::UInt64 => Ok(arr.clone()),
647 DataType::Int64 => {
648 let a = arr.as_primitive::<Int64Type>();
649 Ok(Arc::new(UInt64Array::from_iter_values(
650 (0..a.len()).map(|i| a.value(i) as u64),
651 )))
652 }
653 DataType::UInt32 => {
654 let a = arr.as_primitive::<arrow_array::types::UInt32Type>();
655 Ok(Arc::new(UInt64Array::from_iter_values(
656 (0..a.len()).map(|i| u64::from(a.value(i))),
657 )))
658 }
659 other => Err(datafusion::error::DataFusionError::Execution(format!(
660 "CatalogEdgeScanExec: src_id/dst_id must be Int64/UInt64/UInt32, got {other:?}"
661 ))),
662 }
663 };
664 let src_arr = in_by_name.get("src_id").ok_or_else(|| {
665 datafusion::error::DataFusionError::Execution("missing src_id column".into())
666 })?;
667 let dst_arr = in_by_name.get("dst_id").ok_or_else(|| {
668 datafusion::error::DataFusionError::Execution("missing dst_id column".into())
669 })?;
670 let src_vid = to_u64(src_arr)?;
671 let dst_vid = to_u64(dst_arr)?;
672
673 let _ = variable;
674 let mut columns: Vec<ArrayRef> = Vec::with_capacity(output_schema.fields().len());
675 columns.push(eid);
676 columns.push(src_vid);
677 columns.push(dst_vid);
678 for prop in properties {
679 if prop == "src_id" || prop == "dst_id" {
680 continue;
681 }
682 let col = in_by_name
683 .get(prop.as_str())
684 .cloned()
685 .unwrap_or_else(|| Arc::new(StringArray::new_null(n)));
686 columns.push(col);
687 }
688 RecordBatch::try_new(output_schema.clone(), columns).map_err(|e| {
689 datafusion::error::DataFusionError::Execution(format!(
690 "CatalogEdgeScanExec: failed to assemble adapted batch: {e}"
691 ))
692 })
693}