1use arrow_array::builder::{
17 BooleanBuilder, Float32Builder, Float64Builder, Int64Builder, StringBuilder, UInt64Builder,
18};
19use arrow_array::{ArrayRef, RecordBatch};
20use arrow_schema::{DataType, Field, Schema, SchemaRef};
21use datafusion::common::Result as DFResult;
22use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
23use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
24use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
25use futures::Stream;
26use std::any::Any;
27use std::collections::HashMap;
28use std::fmt;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32use uni_common::Value;
33use uni_common::core::id::Vid;
34use uni_common::core::schema::DistanceMetric;
35use uni_cypher::ast::Expr;
36
37use crate::query::df_graph::GraphExecutionContext;
38use crate::query::df_graph::common::{
39 arrow_err, calculate_score, compute_plan_properties, evaluate_simple_expr, labels_data_type,
40};
41use crate::query::df_graph::scan::resolve_property_type;
42
/// Normalizes a `YIELD` column name from a search procedure call into one of
/// the canonical internal names: "vid", "distance", "score", "vector_score",
/// "fts_score", "raw_score". Matching is case-insensitive; any unrecognized
/// name is treated as the node column and maps to "node".
pub(crate) fn map_yield_to_canonical(yield_name: &str) -> String {
    let lowered = yield_name.to_lowercase();
    let canonical = match lowered.as_str() {
        "vid" | "_vid" => "vid",
        "distance" | "dist" | "_distance" => "distance",
        "score" | "_score" => "score",
        "vector_score" => "vector_score",
        "fts_score" => "fts_score",
        "raw_score" => "raw_score",
        _ => "node",
    };
    canonical.to_owned()
}
61
/// Physical operator for a Cypher `CALL <procedure>(...) YIELD ...` clause.
///
/// Dispatches at execution time to schema-introspection procedures
/// (`uni.schema.*`), vector/FTS/hybrid search (`uni.vector.query`,
/// `uni.fts.query`, `uni.search`), graph algorithms (`uni.algo.*`), or
/// user-registered procedures from the procedure registry.
pub struct GraphProcedureCallExec {
    // Shared execution context: storage, registries, timeout checks.
    graph_ctx: Arc<GraphExecutionContext>,

    // Fully-qualified procedure name, e.g. "uni.schema.labels".
    procedure_name: String,

    // Unevaluated argument expressions; evaluated in `execute()`.
    arguments: Vec<Expr>,

    // YIELD items as (yield name, optional output alias) pairs; their order
    // defines the output column order.
    yield_items: Vec<(String, Option<String>)>,

    // Query parameters available when evaluating argument expressions.
    params: HashMap<String, Value>,

    // Values bound in the enclosing scope, also used in argument evaluation.
    outer_values: HashMap<String, Value>,

    // For search procedures: properties requested per yielded node variable.
    target_properties: HashMap<String, Vec<String>>,

    // Output Arrow schema, derived from procedure name + yield items.
    schema: SchemaRef,

    // Cached DataFusion plan properties computed from `schema`.
    properties: PlanProperties,

    // Metrics set shared with the produced record-batch stream.
    metrics: ExecutionPlanMetricsSet,
}
97
// Manual Debug impl: prints only the identifying fields. The remaining
// members (context, schema, plan properties, metrics) are omitted —
// presumably because they are noisy or do not implement Debug; confirm
// before adding them.
impl fmt::Debug for GraphProcedureCallExec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("GraphProcedureCallExec")
            .field("procedure_name", &self.procedure_name)
            .field("yield_items", &self.yield_items)
            .finish()
    }
}
106
impl GraphProcedureCallExec {
    /// Builds the exec node. The output Arrow schema is derived eagerly from
    /// the procedure name and YIELD items so that plan properties are known
    /// at planning time.
    pub fn new(
        graph_ctx: Arc<GraphExecutionContext>,
        procedure_name: String,
        arguments: Vec<Expr>,
        yield_items: Vec<(String, Option<String>)>,
        params: HashMap<String, Value>,
        outer_values: HashMap<String, Value>,
        target_properties: HashMap<String, Vec<String>>,
    ) -> Self {
        let schema = Self::build_schema(
            &procedure_name,
            &yield_items,
            &target_properties,
            &graph_ctx,
        );
        let properties = compute_plan_properties(schema.clone());

        Self {
            graph_ctx,
            procedure_name,
            arguments,
            yield_items,
            params,
            outer_values,
            target_properties,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Derives the output schema for a procedure call.
    ///
    /// Columns follow `yield_items` order and are named by the alias when
    /// present, otherwise by the yield name. The per-procedure arms below
    /// pick Arrow types for the known yield columns; unknown names default
    /// to nullable Utf8. For the search procedures, a "node" yield expands
    /// into several columns (_vid, the node itself, _labels, plus any
    /// requested properties).
    fn build_schema(
        procedure_name: &str,
        yield_items: &[(String, Option<String>)],
        target_properties: &HashMap<String, Vec<String>>,
        graph_ctx: &GraphExecutionContext,
    ) -> SchemaRef {
        let mut fields = Vec::new();

        match procedure_name {
            "uni.schema.labels" => {
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "label" => DataType::Utf8,
                        "propertyCount" | "nodeCount" | "indexCount" => DataType::Int64,
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.schema.edgeTypes" | "uni.schema.relationshipTypes" => {
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "type" | "relationshipType" => DataType::Utf8,
                        "propertyCount" => DataType::Int64,
                        "sourceLabels" | "targetLabels" => DataType::Utf8, _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.schema.indexes" => {
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "name" | "type" | "label" | "state" | "properties" => DataType::Utf8,
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.schema.constraints" => {
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        // Only "enabled" is boolean; everything else is text.
                        "enabled" => DataType::Boolean,
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.schema.labelInfo" => {
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "property" | "dataType" => DataType::Utf8,
                        "nullable" | "indexed" | "unique" => DataType::Boolean,
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.vector.query" | "uni.fts.query" | "uni.search" => {
                for (name, alias) in yield_items {
                    let output_name = alias.as_ref().unwrap_or(name);
                    let canonical = map_yield_to_canonical(name);

                    match canonical.as_str() {
                        "node" => {
                            // A node yield expands to: "<name>._vid" (non-null
                            // UInt64), the node column itself (Utf8), and a
                            // nullable "<name>._labels" column.
                            fields.push(Field::new(
                                format!("{}._vid", output_name),
                                DataType::UInt64,
                                false,
                            ));
                            fields.push(Field::new(output_name, DataType::Utf8, false));
                            fields.push(Field::new(
                                format!("{}._labels", output_name),
                                labels_data_type(),
                                true,
                            ));

                            // Requested properties become "<name>.<prop>"
                            // columns. Each property type is resolved from the
                            // first label that declares it, falling back to
                            // the schema-less resolution otherwise.
                            if let Some(props) = target_properties.get(output_name.as_str()) {
                                let uni_schema = graph_ctx.storage().schema_manager().schema();
                                for prop_name in props {
                                    let col_name = format!("{}.{}", output_name, prop_name);
                                    let arrow_type = resolve_property_type(prop_name, None);
                                    let resolved_type = uni_schema
                                        .properties
                                        .values()
                                        .find_map(|label_props| {
                                            label_props.get(prop_name.as_str()).map(|_| {
                                                resolve_property_type(prop_name, Some(label_props))
                                            })
                                        })
                                        .unwrap_or(arrow_type);
                                    fields.push(Field::new(&col_name, resolved_type, true));
                                }
                            }
                        }
                        "distance" => {
                            fields.push(Field::new(output_name, DataType::Float64, true));
                        }
                        "score" | "vector_score" | "fts_score" | "raw_score" => {
                            fields.push(Field::new(output_name, DataType::Float32, true));
                        }
                        "vid" => {
                            fields.push(Field::new(output_name, DataType::Int64, true));
                        }
                        _ => {
                            fields.push(Field::new(output_name, DataType::Utf8, true));
                        }
                    }
                }
            }
            name if name.starts_with("uni.algo.") => {
                // Algorithm yields: ask the algo registry for the procedure's
                // signature and map each yield's declared value type to Arrow.
                if let Some(registry) = graph_ctx.algo_registry()
                    && let Some(procedure) = registry.get(name)
                {
                    let sig = procedure.signature();
                    for (yield_name, alias) in yield_items {
                        let col_name = alias.as_ref().unwrap_or(yield_name);
                        let yield_vt = sig.yields.iter().find(|(n, _)| *n == yield_name.as_str());
                        let data_type = yield_vt
                            .map(|(_, vt)| value_type_to_arrow(vt))
                            .unwrap_or(DataType::Utf8);
                        let mut field = Field::new(col_name, data_type, true);
                        // Complex values (lists/maps/graph entities) travel as
                        // encoded text; mark the field so consumers can decode.
                        if yield_vt.is_some_and(|(_, vt)| is_complex_value_type(vt)) {
                            let mut metadata = std::collections::HashMap::new();
                            metadata.insert("cv_encoded".to_string(), "true".to_string());
                            field = field.with_metadata(metadata);
                        }
                        fields.push(field);
                    }
                } else {
                    // No registry / unknown algorithm: fall back to Utf8
                    // columns so planning still succeeds.
                    for (name, alias) in yield_items {
                        let col_name = alias.as_ref().unwrap_or(name);
                        fields.push(Field::new(col_name, DataType::Utf8, true));
                    }
                }
            }
            _ => {
                // Registered (user-defined) procedures: map declared output
                // types when the registry knows the procedure.
                if let Some(registry) = graph_ctx.procedure_registry()
                    && let Some(proc_def) = registry.get(procedure_name)
                {
                    for (name, alias) in yield_items {
                        let col_name = alias.as_ref().unwrap_or(name);
                        let data_type = proc_def
                            .outputs
                            .iter()
                            .find(|o| o.name == *name)
                            .map(|o| procedure_value_type_to_arrow(&o.output_type))
                            .unwrap_or(DataType::Utf8);
                        fields.push(Field::new(col_name, data_type, true));
                    }
                } else if yield_items.is_empty() {
                    // No YIELD clause: deliberately emit an empty schema.
                } else {
                    for (name, alias) in yield_items {
                        let col_name = alias.as_ref().unwrap_or(name);
                        fields.push(Field::new(col_name, DataType::Utf8, true));
                    }
                }
            }
        }

        Arc::new(Schema::new(fields))
    }
}
323
324fn value_type_to_arrow(vt: &uni_algo::algo::procedures::ValueType) -> DataType {
326 use uni_algo::algo::procedures::ValueType;
327 match vt {
328 ValueType::Int => DataType::Int64,
329 ValueType::Float => DataType::Float64,
330 ValueType::String => DataType::Utf8,
331 ValueType::Bool => DataType::Boolean,
332 ValueType::List
333 | ValueType::Map
334 | ValueType::Node
335 | ValueType::Relationship
336 | ValueType::Path
337 | ValueType::Any => DataType::Utf8,
338 }
339}
340
341fn is_complex_value_type(vt: &uni_algo::algo::procedures::ValueType) -> bool {
344 use uni_algo::algo::procedures::ValueType;
345 matches!(
346 vt,
347 ValueType::List
348 | ValueType::Map
349 | ValueType::Node
350 | ValueType::Relationship
351 | ValueType::Path
352 )
353}
354
355fn procedure_value_type_to_arrow(
357 vt: &crate::query::executor::procedure::ProcedureValueType,
358) -> DataType {
359 use crate::query::executor::procedure::ProcedureValueType;
360 match vt {
361 ProcedureValueType::Integer => DataType::Int64,
362 ProcedureValueType::Float | ProcedureValueType::Number => DataType::Float64,
363 ProcedureValueType::Boolean => DataType::Boolean,
364 ProcedureValueType::String | ProcedureValueType::Any => DataType::Utf8,
365 }
366}
367
// Plan-tree display (EXPLAIN output): the same single-line form is used for
// every DisplayFormatType.
impl DisplayAs for GraphProcedureCallExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "GraphProcedureCallExec: procedure={}",
            self.procedure_name
        )
    }
}
377
impl ExecutionPlan for GraphProcedureCallExec {
    fn name(&self) -> &str {
        "GraphProcedureCallExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    // Leaf node: procedure calls have no input plans.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    // Leaf node, so only an empty child list is valid; anything else is an
    // internal planner error.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if !children.is_empty() {
            return Err(datafusion::error::DataFusionError::Internal(
                "GraphProcedureCallExec has no children".to_string(),
            ));
        }
        Ok(self)
    }

    /// Starts execution: argument expressions are evaluated eagerly here (so
    /// evaluation errors surface synchronously), then a lazy one-shot stream
    /// is returned that runs the procedure on first poll.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let metrics = BaselineMetrics::new(&self.metrics, partition);

        let mut evaluated_args = Vec::with_capacity(self.arguments.len());
        for arg in &self.arguments {
            evaluated_args.push(evaluate_simple_expr(arg, &self.params, &self.outer_values)?);
        }

        Ok(Box::pin(ProcedureCallStream::new(
            self.graph_ctx.clone(),
            self.procedure_name.clone(),
            evaluated_args,
            self.yield_items.clone(),
            self.target_properties.clone(),
            self.schema.clone(),
            metrics,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
439
/// States of the one-shot procedure execution stream.
enum ProcedureCallState {
    /// Not started; the execution future is created on the first poll.
    Init,
    /// The procedure future is in flight; it resolves to at most one batch.
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Terminal state: batch or error already emitted; further polls yield None.
    Done,
}
453
/// One-shot stream that runs a procedure call and emits at most one
/// `RecordBatch`. Inputs are captured at `execute()` time; the actual work
/// happens lazily inside `poll_next`.
struct ProcedureCallStream {
    graph_ctx: Arc<GraphExecutionContext>,
    procedure_name: String,
    // Arguments already evaluated to concrete values by the exec node.
    evaluated_args: Vec<Value>,
    yield_items: Vec<(String, Option<String>)>,
    target_properties: HashMap<String, Vec<String>>,
    schema: SchemaRef,
    // Init → Executing → Done state machine driven by poll_next.
    state: ProcedureCallState,
    metrics: BaselineMetrics,
}
465
466impl ProcedureCallStream {
467 fn new(
468 graph_ctx: Arc<GraphExecutionContext>,
469 procedure_name: String,
470 evaluated_args: Vec<Value>,
471 yield_items: Vec<(String, Option<String>)>,
472 target_properties: HashMap<String, Vec<String>>,
473 schema: SchemaRef,
474 metrics: BaselineMetrics,
475 ) -> Self {
476 Self {
477 graph_ctx,
478 procedure_name,
479 evaluated_args,
480 yield_items,
481 target_properties,
482 schema,
483 state: ProcedureCallState::Init,
484 metrics,
485 }
486 }
487}
488
impl Stream for ProcedureCallStream {
    type Item = DFResult<RecordBatch>;

    // State machine: Init builds the execution future, Executing drives it,
    // Done signals end-of-stream. The state is temporarily swapped out with
    // `Done` so the future can be moved/polled without a second mutable
    // borrow of self; every arm restores the correct state before returning.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let state = std::mem::replace(&mut self.state, ProcedureCallState::Done);

            match state {
                ProcedureCallState::Init => {
                    // Clone everything the 'static future needs to own.
                    let graph_ctx = self.graph_ctx.clone();
                    let procedure_name = self.procedure_name.clone();
                    let evaluated_args = self.evaluated_args.clone();
                    let yield_items = self.yield_items.clone();
                    let target_properties = self.target_properties.clone();
                    let schema = self.schema.clone();

                    let fut = async move {
                        // Fail fast if the query deadline already passed.
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;

                        execute_procedure(
                            &graph_ctx,
                            &procedure_name,
                            &evaluated_args,
                            &yield_items,
                            &target_properties,
                            &schema,
                        )
                        .await
                    };

                    // Loop again so the new future is polled immediately.
                    self.state = ProcedureCallState::Executing(Box::pin(fut));
                }
                ProcedureCallState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = ProcedureCallState::Done;
                        // Record 0 rows when the procedure produced no batch.
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = ProcedureCallState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the in-flight future back before yielding.
                        self.state = ProcedureCallState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                ProcedureCallState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
546
547impl RecordBatchStream for ProcedureCallStream {
548 fn schema(&self) -> SchemaRef {
549 self.schema.clone()
550 }
551}
552
/// Dispatches a procedure call to its implementation by name.
///
/// Built-in procedures (`uni.schema.*`, search, `uni.algo.*`) are matched
/// first; anything else is looked up in the user procedure registry. Returns
/// at most one batch (`None`/empty batch means no rows).
async fn execute_procedure(
    graph_ctx: &GraphExecutionContext,
    procedure_name: &str,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    match procedure_name {
        "uni.schema.labels" => execute_schema_labels(graph_ctx, yield_items, schema).await,
        "uni.schema.edgeTypes" | "uni.schema.relationshipTypes" => {
            execute_schema_edge_types(graph_ctx, yield_items, schema).await
        }
        "uni.schema.indexes" => execute_schema_indexes(graph_ctx, yield_items, schema).await,
        "uni.schema.constraints" => {
            execute_schema_constraints(graph_ctx, yield_items, schema).await
        }
        "uni.schema.labelInfo" => {
            execute_schema_label_info(graph_ctx, args, yield_items, schema).await
        }
        "uni.vector.query" => {
            execute_vector_query(graph_ctx, args, yield_items, target_properties, schema).await
        }
        "uni.fts.query" => {
            execute_fts_query(graph_ctx, args, yield_items, target_properties, schema).await
        }
        "uni.search" => {
            execute_hybrid_search(graph_ctx, args, yield_items, target_properties, schema).await
        }
        // Guard arm must stay before the catch-all registry fallback.
        name if name.starts_with("uni.algo.") => {
            execute_algo_procedure(graph_ctx, name, args, yield_items, schema).await
        }
        _ => {
            execute_registered_procedure(graph_ctx, procedure_name, args, yield_items, schema).await
        }
    }
}
595
/// Implements `uni.schema.labels`: one row per vertex label with its
/// property count, (best-effort) node count, and index count.
async fn execute_schema_labels(
    graph_ctx: &GraphExecutionContext,
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let uni_schema = graph_ctx.storage().schema_manager().schema();
    let storage = graph_ctx.storage();

    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
    for label_name in uni_schema.labels.keys() {
        let mut row = HashMap::new();
        row.insert("label".to_string(), Value::String(label_name.clone()));

        let prop_count = uni_schema
            .properties
            .get(label_name)
            .map(|p| p.len())
            .unwrap_or(0);
        row.insert("propertyCount".to_string(), Value::Int(prop_count as i64));

        // Best-effort row count: any failure opening or counting the label's
        // dataset is reported as 0 rather than failing the whole call.
        let node_count = if let Ok(ds) = storage.vertex_dataset(label_name) {
            if let Ok(raw) = ds.open_raw().await {
                raw.count_rows(None).await.unwrap_or(0)
            } else {
                0
            }
        } else {
            0
        };
        row.insert("nodeCount".to_string(), Value::Int(node_count as i64));

        let idx_count = uni_schema
            .indexes
            .iter()
            .filter(|i| i.label() == label_name)
            .count();
        row.insert("indexCount".to_string(), Value::Int(idx_count as i64));

        rows.push(row);
    }

    build_scalar_batch(&rows, yield_items, schema)
}
644
645async fn execute_schema_edge_types(
646 graph_ctx: &GraphExecutionContext,
647 yield_items: &[(String, Option<String>)],
648 schema: &SchemaRef,
649) -> DFResult<Option<RecordBatch>> {
650 let uni_schema = graph_ctx.storage().schema_manager().schema();
651
652 let mut rows: Vec<HashMap<String, Value>> = Vec::new();
653 for (type_name, meta) in &uni_schema.edge_types {
654 let mut row = HashMap::new();
655 row.insert("type".to_string(), Value::String(type_name.clone()));
656 row.insert(
657 "relationshipType".to_string(),
658 Value::String(type_name.clone()),
659 );
660 row.insert(
661 "sourceLabels".to_string(),
662 Value::String(format!("{:?}", meta.src_labels)),
663 );
664 row.insert(
665 "targetLabels".to_string(),
666 Value::String(format!("{:?}", meta.dst_labels)),
667 );
668
669 let prop_count = uni_schema
670 .properties
671 .get(type_name)
672 .map(|p| p.len())
673 .unwrap_or(0);
674 row.insert("propertyCount".to_string(), Value::Int(prop_count as i64));
675
676 rows.push(row);
677 }
678
679 build_scalar_batch(&rows, yield_items, schema)
680}
681
/// Implements `uni.schema.indexes`: one row per index definition with its
/// name, kind, target label, JSON-encoded property list, and state (always
/// reported as "ONLINE" — no live state tracking is consulted here).
async fn execute_schema_indexes(
    graph_ctx: &GraphExecutionContext,
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let uni_schema = graph_ctx.storage().schema_manager().schema();

    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
    for idx in &uni_schema.indexes {
        use uni_common::core::schema::IndexDefinition;

        // Each index kind exposes its properties differently; serialize to a
        // JSON array string (empty string on serialization failure).
        let (type_name, properties_json) = match &idx {
            IndexDefinition::Vector(v) => (
                "VECTOR",
                serde_json::to_string(&[&v.property]).unwrap_or_default(),
            ),
            IndexDefinition::FullText(f) => (
                "FULLTEXT",
                serde_json::to_string(&f.properties).unwrap_or_default(),
            ),
            IndexDefinition::Scalar(s) => (
                "SCALAR",
                serde_json::to_string(&s.properties).unwrap_or_default(),
            ),
            IndexDefinition::JsonFullText(j) => (
                "JSON_FTS",
                serde_json::to_string(&[&j.column]).unwrap_or_default(),
            ),
            IndexDefinition::Inverted(inv) => (
                "INVERTED",
                serde_json::to_string(&[&inv.property]).unwrap_or_default(),
            ),
            _ => ("UNKNOWN", String::new()),
        };

        let row = HashMap::from([
            ("state".to_string(), Value::String("ONLINE".to_string())),
            ("name".to_string(), Value::String(idx.name().to_string())),
            ("type".to_string(), Value::String(type_name.to_string())),
            ("label".to_string(), Value::String(idx.label().to_string())),
            ("properties".to_string(), Value::String(properties_json)),
        ]);
        rows.push(row);
    }

    build_scalar_batch(&rows, yield_items, schema)
}
730
/// Implements `uni.schema.constraints`: one row per constraint with name,
/// enabled flag, kind-specific fields (properties / expression), and the
/// constrained target (label or relationship type).
async fn execute_schema_constraints(
    graph_ctx: &GraphExecutionContext,
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let uni_schema = graph_ctx.storage().schema_manager().schema();

    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
    for c in &uni_schema.constraints {
        let mut row = HashMap::new();
        row.insert("name".to_string(), Value::String(c.name.clone()));
        row.insert("enabled".to_string(), Value::Bool(c.enabled));

        // Kind-specific columns: Unique/Exists fill "properties" (as a JSON
        // array string), Check fills "expression".
        match &c.constraint_type {
            uni_common::core::schema::ConstraintType::Unique { properties } => {
                row.insert("type".to_string(), Value::String("UNIQUE".to_string()));
                row.insert(
                    "properties".to_string(),
                    Value::String(serde_json::to_string(&properties).unwrap_or_default()),
                );
            }
            uni_common::core::schema::ConstraintType::Exists { property } => {
                row.insert("type".to_string(), Value::String("EXISTS".to_string()));
                row.insert(
                    "properties".to_string(),
                    Value::String(serde_json::to_string(&[&property]).unwrap_or_default()),
                );
            }
            uni_common::core::schema::ConstraintType::Check { expression } => {
                row.insert("type".to_string(), Value::String("CHECK".to_string()));
                row.insert("expression".to_string(), Value::String(expression.clone()));
            }
            _ => {
                row.insert("type".to_string(), Value::String("UNKNOWN".to_string()));
            }
        }

        // Target: either a vertex label or an edge type.
        match &c.target {
            uni_common::core::schema::ConstraintTarget::Label(l) => {
                row.insert("label".to_string(), Value::String(l.clone()));
            }
            uni_common::core::schema::ConstraintTarget::EdgeType(t) => {
                row.insert("relationshipType".to_string(), Value::String(t.clone()));
            }
            _ => {
                row.insert("target".to_string(), Value::String("UNKNOWN".to_string()));
            }
        }

        rows.push(row);
    }

    build_scalar_batch(&rows, yield_items, schema)
}
785
/// Implements `uni.schema.labelInfo(label)`: one row per property of the
/// given label with its type, nullability, and whether it is covered by any
/// index or an enabled UNIQUE constraint. An unknown label yields zero rows.
async fn execute_schema_label_info(
    graph_ctx: &GraphExecutionContext,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let label_name = require_string_arg(args, 0, "uni.schema.labelInfo: first argument (label)")?;

    let uni_schema = graph_ctx.storage().schema_manager().schema();

    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
    if let Some(props) = uni_schema.properties.get(&label_name) {
        for (prop_name, prop_meta) in props {
            let mut row = HashMap::new();
            row.insert("property".to_string(), Value::String(prop_name.clone()));
            row.insert(
                "dataType".to_string(),
                Value::String(format!("{:?}", prop_meta.r#type)),
            );
            row.insert("nullable".to_string(), Value::Bool(prop_meta.nullable));

            // "indexed" = any index on this label covers this property.
            // Note: a JsonFullText index counts as covering every property
            // of its label (only the label is compared).
            let is_indexed = uni_schema.indexes.iter().any(|idx| match idx {
                uni_common::core::schema::IndexDefinition::Vector(v) => {
                    v.label == label_name && v.property == *prop_name
                }
                uni_common::core::schema::IndexDefinition::Scalar(s) => {
                    s.label == label_name && s.properties.contains(prop_name)
                }
                uni_common::core::schema::IndexDefinition::FullText(f) => {
                    f.label == label_name && f.properties.contains(prop_name)
                }
                uni_common::core::schema::IndexDefinition::Inverted(inv) => {
                    inv.label == label_name && inv.property == *prop_name
                }
                uni_common::core::schema::IndexDefinition::JsonFullText(j) => j.label == label_name,
                _ => false,
            });
            row.insert("indexed".to_string(), Value::Bool(is_indexed));

            // "unique" = some enabled UNIQUE constraint on this label lists
            // the property.
            let unique = uni_schema.constraints.iter().any(|c| {
                if let uni_common::core::schema::ConstraintTarget::Label(l) = &c.target
                    && l == &label_name
                    && c.enabled
                    && let uni_common::core::schema::ConstraintType::Unique { properties } =
                        &c.constraint_type
                {
                    return properties.contains(prop_name);
                }
                false
            });
            row.insert("unique".to_string(), Value::Bool(unique));

            rows.push(row);
        }
    }

    build_scalar_batch(&rows, yield_items, schema)
}
844
845fn build_typed_column<'a>(
850 values: impl Iterator<Item = Option<&'a Value>>,
851 num_rows: usize,
852 data_type: &DataType,
853) -> ArrayRef {
854 match data_type {
855 DataType::Int64 => {
856 let mut builder = Int64Builder::with_capacity(num_rows);
857 for val in values {
858 match val.and_then(|v| v.as_i64()) {
859 Some(i) => builder.append_value(i),
860 None => builder.append_null(),
861 }
862 }
863 Arc::new(builder.finish())
864 }
865 DataType::Float64 => {
866 let mut builder = Float64Builder::with_capacity(num_rows);
867 for val in values {
868 match val.and_then(|v| v.as_f64()) {
869 Some(f) => builder.append_value(f),
870 None => builder.append_null(),
871 }
872 }
873 Arc::new(builder.finish())
874 }
875 DataType::Boolean => {
876 let mut builder = BooleanBuilder::with_capacity(num_rows);
877 for val in values {
878 match val.and_then(|v| v.as_bool()) {
879 Some(b) => builder.append_value(b),
880 None => builder.append_null(),
881 }
882 }
883 Arc::new(builder.finish())
884 }
885 _ => {
886 let mut builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
888 for val in values {
889 match val {
890 Some(Value::String(s)) => builder.append_value(s),
891 Some(v) => builder.append_value(format!("{v}")),
892 None => builder.append_null(),
893 }
894 }
895 Arc::new(builder.finish())
896 }
897 }
898}
899
900fn create_empty_batch(schema: SchemaRef) -> DFResult<RecordBatch> {
906 if schema.fields().is_empty() {
907 let options = arrow_array::RecordBatchOptions::new().with_row_count(Some(0));
908 RecordBatch::try_new_with_options(schema, vec![], &options).map_err(arrow_err)
909 } else {
910 Ok(RecordBatch::new_empty(schema))
911 }
912}
913
914fn build_scalar_batch(
916 rows: &[HashMap<String, Value>],
917 yield_items: &[(String, Option<String>)],
918 schema: &SchemaRef,
919) -> DFResult<Option<RecordBatch>> {
920 if rows.is_empty() {
921 return Ok(Some(create_empty_batch(schema.clone())?));
922 }
923
924 let num_rows = rows.len();
925 let mut columns: Vec<ArrayRef> = Vec::new();
926
927 for (idx, (name, _alias)) in yield_items.iter().enumerate() {
928 let field = schema.field(idx);
929 let values = rows.iter().map(|row| row.get(name));
930 columns.push(build_typed_column(values, num_rows, field.data_type()));
931 }
932
933 let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
934 Ok(Some(batch))
935}
936
/// Executes a user-registered procedure from the procedure registry.
///
/// Validates argument count and (for non-null arguments) type compatibility
/// against the declared parameters, then filters the procedure's stored
/// data rows: a row matches when, for every parameter, either the row has
/// no value under that parameter's name or the row value equals the
/// supplied argument (`proc_values_match`). An empty YIELD list or an empty
/// match set both produce an empty batch.
async fn execute_registered_procedure(
    graph_ctx: &GraphExecutionContext,
    procedure_name: &str,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let registry = graph_ctx.procedure_registry().ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(format!(
            "Procedure '{}' not supported in DataFusion engine (no procedure registry)",
            procedure_name
        ))
    })?;

    let proc_def = registry.get(procedure_name).ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(format!(
            "ProcedureNotFound: Unknown procedure '{}'",
            procedure_name
        ))
    })?;

    // Arity check first, so type errors below always have a matching param.
    if args.len() != proc_def.params.len() {
        return Err(datafusion::error::DataFusionError::Execution(format!(
            "InvalidNumberOfArguments: Procedure '{}' expects {} argument(s), got {}",
            proc_def.name,
            proc_def.params.len(),
            args.len()
        )));
    }

    // Null arguments skip the type check (treated as "match anything").
    for (i, (arg_val, param)) in args.iter().zip(&proc_def.params).enumerate() {
        if !arg_val.is_null() && !check_proc_type_compatible(arg_val, &param.param_type) {
            return Err(datafusion::error::DataFusionError::Execution(format!(
                "InvalidArgumentType: Argument {} ('{}') of procedure '{}' has incompatible type",
                i, param.name, proc_def.name
            )));
        }
    }

    let filtered: Vec<&HashMap<String, Value>> = proc_def
        .data
        .iter()
        .filter(|row| {
            for (param, arg_val) in proc_def.params.iter().zip(args) {
                if let Some(row_val) = row.get(&param.name)
                    && !proc_values_match(row_val, arg_val)
                {
                    return false;
                }
            }
            true
        })
        .collect();

    if yield_items.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    if filtered.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    let num_rows = filtered.len();
    let mut columns: Vec<ArrayRef> = Vec::new();

    // One column per yield item, typed by the positionally-aligned schema
    // field; missing row values become nulls in build_typed_column.
    for (idx, (name, _alias)) in yield_items.iter().enumerate() {
        let field = schema.field(idx);
        let values = filtered.iter().map(|row| row.get(name.as_str()));
        columns.push(build_typed_column(values, num_rows, field.data_type()));
    }

    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
    Ok(Some(batch))
}
1025
1026fn check_proc_type_compatible(
1028 val: &Value,
1029 expected: &crate::query::executor::procedure::ProcedureValueType,
1030) -> bool {
1031 use crate::query::executor::procedure::ProcedureValueType;
1032 match expected {
1033 ProcedureValueType::Any => true,
1034 ProcedureValueType::String => val.is_string(),
1035 ProcedureValueType::Boolean => val.is_bool(),
1036 ProcedureValueType::Integer => val.is_i64(),
1037 ProcedureValueType::Float => val.is_f64() || val.is_i64(),
1038 ProcedureValueType::Number => val.is_number(),
1039 }
1040}
1041
/// Equality used when filtering stored procedure rows against call
/// arguments: nulls match only nulls; values that both convert to f64 are
/// compared numerically with an absolute tolerance of `f64::EPSILON`;
/// everything else uses structural equality.
///
/// NOTE(review): an absolute `f64::EPSILON` tolerance is effectively exact
/// comparison for non-tiny magnitudes — confirm a relative tolerance was
/// not intended here.
fn proc_values_match(row_val: &Value, arg_val: &Value) -> bool {
    if arg_val.is_null() || row_val.is_null() {
        return arg_val.is_null() && row_val.is_null();
    }
    if let (Some(a), Some(b)) = (row_val.as_f64(), arg_val.as_f64()) {
        return (a - b).abs() < f64::EPSILON;
    }
    row_val == arg_val
}
1053
/// Executes a `uni.algo.*` procedure through the algorithm registry,
/// collecting its full result stream into one batch.
async fn execute_algo_procedure(
    graph_ctx: &GraphExecutionContext,
    procedure_name: &str,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    use futures::StreamExt;
    use uni_algo::algo::procedures::AlgoContext;

    let registry = graph_ctx.algo_registry().ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(
            "Algorithm registry not available".to_string(),
        )
    })?;

    let procedure = registry.get(procedure_name).ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(format!(
            "Unknown algorithm: {}",
            procedure_name
        ))
    })?;

    let signature = procedure.signature();

    // Arguments cross the algo boundary as JSON values.
    let serde_args: Vec<serde_json::Value> = args.iter().cloned().map(|v| v.into()).collect();

    // Give the algorithm a consistent L0 view: the current snapshot plus any
    // pending-flush and in-transaction L0s. None when no L0 is active.
    let l0_mgr = {
        let l0_ctx = graph_ctx.l0_context();
        l0_ctx.current_l0.as_ref().map(|current| {
            let mut pending = l0_ctx.pending_flush_l0s.clone();
            if let Some(tx_l0) = &l0_ctx.transaction_l0 {
                pending.push(tx_l0.clone());
            }
            Arc::new(uni_store::runtime::l0_manager::L0Manager::from_snapshot(
                current.clone(),
                pending,
            ))
        })
    };
    let algo_ctx = AlgoContext::new(graph_ctx.storage().clone(), l0_mgr);

    let mut stream = procedure.execute(algo_ctx, serde_args);
    let mut rows = Vec::new();
    while let Some(row_res) = stream.next().await {
        // Re-check the query deadline every 1000 collected rows (and before
        // the first row, since 0 % 1000 == 0).
        if rows.len() % 1000 == 0 {
            graph_ctx
                .check_timeout()
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
        }
        let row =
            row_res.map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
        rows.push(row);
    }

    build_algo_batch(&rows, &signature, yield_items, schema)
}
1121
1122fn json_to_value(jv: &serde_json::Value) -> Value {
1124 match jv {
1125 serde_json::Value::Null => Value::Null,
1126 serde_json::Value::Bool(b) => Value::Bool(*b),
1127 serde_json::Value::Number(n) => {
1128 if let Some(i) = n.as_i64() {
1129 Value::Int(i)
1130 } else if let Some(f) = n.as_f64() {
1131 Value::Float(f)
1132 } else {
1133 Value::Null
1134 }
1135 }
1136 serde_json::Value::String(s) => Value::String(s.clone()),
1137 other => Value::String(other.to_string()),
1138 }
1139}
1140
/// Builds the output batch for an algorithm: one column per YIELD item,
/// where each cell is the row value at the yield's position in the
/// procedure signature. Yields not present in the signature become
/// all-null columns.
fn build_algo_batch(
    rows: &[uni_algo::algo::procedures::AlgoResultRow],
    signature: &uni_algo::algo::procedures::ProcedureSignature,
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    if rows.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    let num_rows = rows.len();
    let mut columns: Vec<ArrayRef> = Vec::new();

    for (idx, (yield_name, _alias)) in yield_items.iter().enumerate() {
        // Position of this yield in the signature; None → null column.
        let sig_idx = signature
            .yields
            .iter()
            .position(|(n, _)| *n == yield_name.as_str());

        // Materialized so the borrows live long enough for the iterator
        // handed to build_typed_column.
        // NOTE(review): `row.values[si]` panics if a result row is shorter
        // than the signature — assumed to be guaranteed by the procedure
        // implementation; confirm.
        let uni_values: Vec<Value> = rows
            .iter()
            .map(|row| match sig_idx {
                Some(si) => json_to_value(&row.values[si]),
                None => Value::Null,
            })
            .collect();

        let field = schema.field(idx);
        let values = uni_values.iter().map(Some);
        columns.push(build_typed_column(values, num_rows, field.data_type()));
    }

    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
    Ok(Some(batch))
}
1178
1179fn require_string_arg(args: &[Value], index: usize, description: &str) -> DFResult<String> {
1185 args.get(index)
1186 .and_then(|v| v.as_str())
1187 .map(|s| s.to_string())
1188 .ok_or_else(|| {
1189 datafusion::error::DataFusionError::Execution(format!("{description} must be a string"))
1190 })
1191}
1192
1193fn extract_optional_filter(args: &[Value], index: usize) -> Option<String> {
1196 args.get(index).and_then(|v| {
1197 if v.is_null() {
1198 None
1199 } else {
1200 v.as_str().map(|s| s.to_string())
1201 }
1202 })
1203}
1204
1205fn extract_optional_threshold(args: &[Value], index: usize) -> Option<f64> {
1208 args.get(index)
1209 .and_then(|v| if v.is_null() { None } else { v.as_f64() })
1210}
1211
1212fn require_int_arg(args: &[Value], index: usize, description: &str) -> DFResult<usize> {
1214 args.get(index)
1215 .and_then(|v| v.as_u64())
1216 .map(|v| v as usize)
1217 .ok_or_else(|| {
1218 datafusion::error::DataFusionError::Execution(format!(
1219 "{description} must be an integer"
1220 ))
1221 })
1222}
1223
1224async fn auto_embed_text(
1233 graph_ctx: &GraphExecutionContext,
1234 label: &str,
1235 property: &str,
1236 query_text: &str,
1237) -> DFResult<Vec<f32>> {
1238 let storage = graph_ctx.storage();
1239 let uni_schema = storage.schema_manager().schema();
1240 let index_config = uni_schema.vector_index_for_property(label, property);
1241
1242 let embedding_config = index_config
1243 .and_then(|cfg| cfg.embedding_config.as_ref())
1244 .ok_or_else(|| {
1245 datafusion::error::DataFusionError::Execution(format!(
1246 "Cannot auto-embed: vector index for {label}.{property} has no embedding_config. \
1247 Either provide a pre-computed vector or create the index with embedding options."
1248 ))
1249 })?;
1250
1251 let runtime = graph_ctx.xervo_runtime().ok_or_else(|| {
1252 datafusion::error::DataFusionError::Execution(
1253 "Cannot auto-embed: Uni-Xervo runtime not configured".to_string(),
1254 )
1255 })?;
1256
1257 let embedder = runtime
1258 .embedding(&embedding_config.alias)
1259 .await
1260 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
1261 let embeddings = embedder
1262 .embed(vec![query_text])
1263 .await
1264 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
1265 embeddings.into_iter().next().ok_or_else(|| {
1266 datafusion::error::DataFusionError::Execution(
1267 "Embedding service returned no results".to_string(),
1268 )
1269 })
1270}
1271
/// Implements the `uni.vector.query` procedure call.
///
/// Argument layout: 0 = node label, 1 = indexed property, 2 = query (a string
/// to auto-embed, or an already-computed numeric vector), 3 = k (result
/// count), 4 = optional filter expression, 5 = optional distance threshold.
///
/// Returns an empty batch when the search yields no hits; otherwise delegates
/// column construction to `build_search_result_batch`.
async fn execute_vector_query(
    graph_ctx: &GraphExecutionContext,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let label = require_string_arg(args, 0, "uni.vector.query: first argument (label)")?;
    let property = require_string_arg(args, 1, "uni.vector.query: second argument (property)")?;

    let query_val = args.get(2).ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(
            "uni.vector.query: third argument (query) is required".to_string(),
        )
    })?;

    let storage = graph_ctx.storage();

    // String queries are embedded through the index's configured model;
    // anything else must already be a numeric vector/list.
    let query_vector: Vec<f32> = if let Some(query_text) = query_val.as_str() {
        auto_embed_text(graph_ctx, &label, &property, query_text).await?
    } else {
        extract_vector(query_val)?
    };

    let k = require_int_arg(args, 3, "uni.vector.query: fourth argument (k)")?;
    let filter = extract_optional_filter(args, 4);
    let threshold = extract_optional_threshold(args, 5);
    let query_ctx = graph_ctx.query_context();

    let mut results = storage
        .vector_search(
            &label,
            &property,
            &query_vector,
            k,
            filter.as_deref(),
            Some(&query_ctx),
        )
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    // The threshold is a maximum distance: hits farther than `max_dist` are
    // dropped.
    if let Some(max_dist) = threshold {
        results.retain(|(_, dist)| *dist <= max_dist as f32);
    }

    if results.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    // The index's distance metric is needed downstream to convert distances
    // into scores; fall back to L2 when the index definition is not found.
    let schema_manager = storage.schema_manager();
    let uni_schema = schema_manager.schema();
    let metric = uni_schema
        .vector_index_for_property(&label, &property)
        .map(|config| config.metric.clone())
        .unwrap_or(DistanceMetric::L2);

    build_search_result_batch(
        &results,
        &label,
        &metric,
        yield_items,
        target_properties,
        graph_ctx,
        schema,
    )
    .await
}
1341
/// Implements the `uni.fts.query` procedure call.
///
/// Argument layout: 0 = node label, 1 = indexed property, 2 = search term,
/// 3 = k (result count), 4 = optional filter expression, 5 = optional minimum
/// score.
///
/// Returns an empty batch when the search yields no hits; otherwise delegates
/// column construction to `build_search_result_batch`.
async fn execute_fts_query(
    graph_ctx: &GraphExecutionContext,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let label = require_string_arg(args, 0, "uni.fts.query: first argument (label)")?;
    let property = require_string_arg(args, 1, "uni.fts.query: second argument (property)")?;
    let search_term = require_string_arg(args, 2, "uni.fts.query: third argument (search_term)")?;
    let k = require_int_arg(args, 3, "uni.fts.query: fourth argument (k)")?;
    let filter = extract_optional_filter(args, 4);
    let threshold = extract_optional_threshold(args, 5);

    let storage = graph_ctx.storage();
    let query_ctx = graph_ctx.query_context();

    let mut results = storage
        .fts_search(
            &label,
            &property,
            &search_term,
            k,
            filter.as_deref(),
            Some(&query_ctx),
        )
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    // Unlike the vector path, the threshold here is a minimum: keep only hits
    // scoring at least `min_score`.
    if let Some(min_score) = threshold {
        results.retain(|(_, score)| *score as f64 >= min_score);
    }

    if results.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    // NOTE(review): FTS hits carry relevance scores, not distances;
    // `DistanceMetric::L2` is presumably just a placeholder to satisfy the
    // shared batch builder — confirm that `score`/`distance` yields are
    // interpreted sensibly for FTS results.
    build_search_result_batch(
        &results,
        &label,
        &DistanceMetric::L2,
        yield_items,
        target_properties,
        graph_ctx,
        schema,
    )
    .await
}
1396
/// Implements the `uni.search` hybrid (vector + FTS) procedure call.
///
/// Argument layout: 0 = node label, 1 = properties (an object
/// `{vector: "...", fts: "..."}` with each leg optional, or a single property
/// name used for both legs), 2 = query text, 3 = optional pre-computed query
/// vector, 4 = k, 5 = optional filter expression, 6 = optional options object
/// supporting `method` ("rrf" default, or "weighted"), `alpha`, `over_fetch`,
/// and `rrf_k`.
async fn execute_hybrid_search(
    graph_ctx: &GraphExecutionContext,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let label = require_string_arg(args, 0, "uni.search: first argument (label)")?;

    let properties_val = args.get(1).ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(
            "uni.search: second argument (properties) is required".to_string(),
        )
    })?;

    // Resolve which property each search leg runs against; a bare string
    // means the same property serves both legs.
    let (vector_prop, fts_prop) = if let Some(obj) = properties_val.as_object() {
        let vec_prop = obj
            .get("vector")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string());
        let fts_prop = obj
            .get("fts")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string());
        (vec_prop, fts_prop)
    } else if let Some(prop) = properties_val.as_str() {
        (Some(prop.to_string()), Some(prop.to_string()))
    } else {
        return Err(datafusion::error::DataFusionError::Execution(
            "Properties must be an object {vector: '...', fts: '...'} or a string".to_string(),
        ));
    };

    let query_text = require_string_arg(args, 2, "uni.search: third argument (query_text)")?;

    // Optional pre-computed vector; non-numeric list entries are silently
    // skipped here (unlike `extract_vector`, which errors).
    let query_vector: Option<Vec<f32>> = args.get(3).and_then(|v| {
        if v.is_null() {
            return None;
        }
        v.as_array().map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_f64().map(|f| f as f32))
                .collect()
        })
    });

    let k = require_int_arg(args, 4, "uni.search: fifth argument (k)")?;
    let filter = extract_optional_filter(args, 5);

    // Fusion tuning knobs, all optional with defaults.
    let options_val = args.get(6);
    let options_map = options_val.and_then(|v| v.as_object());
    let fusion_method = options_map
        .and_then(|m| m.get("method"))
        .and_then(|v| v.as_str())
        .unwrap_or("rrf")
        .to_string();
    let alpha = options_map
        .and_then(|m| m.get("alpha"))
        .and_then(|v| v.as_f64())
        .unwrap_or(0.5) as f32;
    let over_fetch_factor = options_map
        .and_then(|m| m.get("over_fetch"))
        .and_then(|v| v.as_f64())
        .unwrap_or(2.0) as f32;
    let rrf_k = options_map
        .and_then(|m| m.get("rrf_k"))
        .and_then(|v| v.as_u64())
        .unwrap_or(60) as usize;

    // Each leg over-fetches so fusion has enough candidates to fill k after
    // the two result lists are merged.
    let over_fetch_k = (k as f32 * over_fetch_factor).ceil() as usize;

    let storage = graph_ctx.storage();
    let query_ctx = graph_ctx.query_context();

    // Vector leg: use the supplied vector, otherwise try to auto-embed.
    let mut vector_results: Vec<(Vid, f32)> = Vec::new();
    if let Some(ref vec_prop) = vector_prop {
        let qvec = if let Some(ref v) = query_vector {
            v.clone()
        } else {
            // NOTE(review): embedding failures are swallowed here — an empty
            // vector simply skips the vector leg, leaving FTS-only results.
            // Presumably intentional best-effort behavior; confirm.
            auto_embed_text(graph_ctx, &label, vec_prop, &query_text)
                .await
                .unwrap_or_default()
        };

        if !qvec.is_empty() {
            vector_results = storage
                .vector_search(
                    &label,
                    vec_prop,
                    &qvec,
                    over_fetch_k,
                    filter.as_deref(),
                    Some(&query_ctx),
                )
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
        }
    }

    // FTS leg.
    let mut fts_results: Vec<(Vid, f32)> = Vec::new();
    if let Some(ref fts_prop) = fts_prop {
        fts_results = storage
            .fts_search(
                &label,
                fts_prop,
                &query_text,
                over_fetch_k,
                filter.as_deref(),
                Some(&query_ctx),
            )
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
    }

    // Merge the two legs; any unrecognized method falls back to RRF.
    let fused_results = match fusion_method.as_str() {
        "weighted" => fuse_weighted(&vector_results, &fts_results, alpha),
        _ => fuse_rrf(&vector_results, &fts_results, rrf_k),
    };

    // Trim the over-fetched, fused list back down to the requested k.
    let final_results: Vec<_> = fused_results.into_iter().take(k).collect();

    if final_results.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    // Per-leg score lookups for the score/distance yield columns; fts_max is
    // used to normalize raw FTS scores into [0, 1].
    let vec_score_map: HashMap<Vid, f32> = vector_results.iter().cloned().collect();
    let fts_score_map: HashMap<Vid, f32> = fts_results.iter().cloned().collect();
    let fts_max = fts_results.iter().map(|(_, s)| *s).fold(0.0f32, f32::max);

    // Metric of the vector index (for distance -> score conversion); L2 when
    // there is no vector leg or the index is not found.
    let uni_schema = storage.schema_manager().schema();
    let metric = vector_prop
        .as_ref()
        .and_then(|vp| {
            uni_schema
                .vector_index_for_property(&label, vp)
                .map(|config| config.metric.clone())
        })
        .unwrap_or(DistanceMetric::L2);

    let score_ctx = HybridScoreContext {
        vec_score_map: &vec_score_map,
        fts_score_map: &fts_score_map,
        fts_max,
        metric: &metric,
    };

    build_hybrid_search_batch(
        &final_results,
        &score_ctx,
        &label,
        yield_items,
        target_properties,
        graph_ctx,
        schema,
    )
    .await
}
1570
/// Reciprocal-rank fusion of the vector and FTS result lists; `k` is the RRF
/// damping constant. Thin wrapper around the shared fusion implementation.
fn fuse_rrf(vec_results: &[(Vid, f32)], fts_results: &[(Vid, f32)], k: usize) -> Vec<(Vid, f32)> {
    crate::query::fusion::fuse_rrf(vec_results, fts_results, k)
}
1576
/// Weighted fusion of the vector and FTS result lists; `alpha` balances the
/// two legs (exact semantics defined by `crate::query::fusion::fuse_weighted`).
/// Thin wrapper around the shared fusion implementation.
fn fuse_weighted(
    vec_results: &[(Vid, f32)],
    fts_results: &[(Vid, f32)],
    alpha: f32,
) -> Vec<(Vid, f32)> {
    crate::query::fusion::fuse_weighted(vec_results, fts_results, alpha)
}
1586
/// Per-query score lookups shared by the hybrid-search column builders.
struct HybridScoreContext<'a> {
    /// Raw vector-search distances keyed by vertex id.
    vec_score_map: &'a HashMap<Vid, f32>,
    /// Raw FTS relevance scores keyed by vertex id.
    fts_score_map: &'a HashMap<Vid, f32>,
    /// Largest FTS score in this result set, used to normalize fts_score.
    fts_max: f32,
    /// Metric of the vector index, for distance -> score conversion.
    metric: &'a DistanceMetric,
}
1594
/// Builds the output `RecordBatch` for a hybrid (`uni.search`) result set.
///
/// For each yield item it appends the matching column(s): the fused score,
/// per-leg vector/FTS scores (null when a vertex only appeared in the other
/// leg), raw vector distance, vid, or the full node-column group. Unknown
/// yields become all-null string columns. Node properties are fetched in one
/// batch only when a `node` yield is present.
///
/// The order of `columns` must mirror the field order of `schema`, which is
/// constructed elsewhere from the same `yield_items`.
async fn build_hybrid_search_batch(
    results: &[(Vid, f32)],
    scores: &HybridScoreContext<'_>,
    label: &str,
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    graph_ctx: &GraphExecutionContext,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let num_rows = results.len();
    let vids: Vec<Vid> = results.iter().map(|(vid, _)| *vid).collect();
    let fused_scores: Vec<f32> = results.iter().map(|(_, s)| *s).collect();

    let property_manager = graph_ctx.property_manager();
    let query_ctx = graph_ctx.query_context();
    let uni_schema = graph_ctx.storage().schema_manager().schema();
    let label_props = uni_schema.properties.get(label);

    let has_node_yield = yield_items
        .iter()
        .any(|(name, _)| map_yield_to_canonical(name) == "node");

    // Property fetch is the expensive part; skip it entirely when no yield
    // needs node data.
    let props_map = if has_node_yield {
        property_manager
            .get_batch_vertex_props_for_label(&vids, label, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
    } else {
        HashMap::new()
    };

    let mut columns: Vec<ArrayRef> = Vec::new();

    for (name, alias) in yield_items {
        let output_name = alias.as_ref().unwrap_or(name);
        let canonical = map_yield_to_canonical(name);

        match canonical.as_str() {
            // A node yield expands into several columns (vid, display,
            // labels, requested properties).
            "node" => {
                columns.extend(build_node_yield_columns(
                    &vids,
                    label,
                    output_name,
                    target_properties,
                    &props_map,
                    label_props,
                )?);
            }
            "vid" => {
                let mut builder = Int64Builder::with_capacity(num_rows);
                for vid in &vids {
                    builder.append_value(vid.as_u64() as i64);
                }
                columns.push(Arc::new(builder.finish()));
            }
            // The combined score produced by RRF / weighted fusion.
            "score" => {
                let mut builder = Float32Builder::with_capacity(num_rows);
                for score in &fused_scores {
                    builder.append_value(*score);
                }
                columns.push(Arc::new(builder.finish()));
            }
            // Vector-leg score, derived from the raw distance via the index
            // metric; null when this vertex came only from the FTS leg.
            "vector_score" => {
                let mut builder = Float32Builder::with_capacity(num_rows);
                for vid in &vids {
                    if let Some(&dist) = scores.vec_score_map.get(vid) {
                        let score = calculate_score(dist, scores.metric);
                        builder.append_value(score);
                    } else {
                        builder.append_null();
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            // FTS-leg score normalized by the set's maximum; null when this
            // vertex came only from the vector leg.
            "fts_score" => {
                let mut builder = Float32Builder::with_capacity(num_rows);
                for vid in &vids {
                    if let Some(&raw_score) = scores.fts_score_map.get(vid) {
                        let norm = if scores.fts_max > 0.0 {
                            raw_score / scores.fts_max
                        } else {
                            0.0
                        };
                        builder.append_value(norm);
                    } else {
                        builder.append_null();
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            // Raw vector distance; null for FTS-only hits.
            "distance" => {
                let mut builder = Float64Builder::with_capacity(num_rows);
                for vid in &vids {
                    if let Some(&dist) = scores.vec_score_map.get(vid) {
                        builder.append_value(dist as f64);
                    } else {
                        builder.append_null();
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            // Unknown yield name: emit an all-null placeholder column so the
            // batch still lines up with the schema.
            _ => {
                let mut builder = StringBuilder::with_capacity(num_rows, 0);
                for _ in 0..num_rows {
                    builder.append_null();
                }
                columns.push(Arc::new(builder.finish()));
            }
        }
    }

    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
    Ok(Some(batch))
}
1712
/// Builds the output `RecordBatch` for a single-leg (vector or FTS) result
/// set of `(vid, distance)` pairs.
///
/// `distance` yields expose the raw f32 value as f64; `score` yields convert
/// each distance through `calculate_score` using `metric`. Unknown yields
/// become all-null string columns. Node properties are fetched in one batch
/// only when a `node` yield is present.
///
/// The order of `columns` must mirror the field order of `schema`, which is
/// constructed elsewhere from the same `yield_items`.
async fn build_search_result_batch(
    results: &[(Vid, f32)],
    label: &str,
    metric: &DistanceMetric,
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    graph_ctx: &GraphExecutionContext,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let num_rows = results.len();
    let vids: Vec<Vid> = results.iter().map(|(vid, _)| *vid).collect();
    let distances: Vec<f32> = results.iter().map(|(_, d)| *d).collect();

    // Pre-compute scores for every row; cheap relative to property fetch.
    let scores: Vec<f32> = distances
        .iter()
        .map(|dist| calculate_score(*dist, metric))
        .collect();

    let property_manager = graph_ctx.property_manager();
    let query_ctx = graph_ctx.query_context();
    let uni_schema = graph_ctx.storage().schema_manager().schema();
    let label_props = uni_schema.properties.get(label);

    let has_node_yield = yield_items
        .iter()
        .any(|(name, _)| map_yield_to_canonical(name) == "node");

    // Property fetch is the expensive part; skip it entirely when no yield
    // needs node data.
    let props_map = if has_node_yield {
        property_manager
            .get_batch_vertex_props_for_label(&vids, label, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
    } else {
        HashMap::new()
    };

    let mut columns: Vec<ArrayRef> = Vec::new();

    for (name, alias) in yield_items {
        let output_name = alias.as_ref().unwrap_or(name);
        let canonical = map_yield_to_canonical(name);

        match canonical.as_str() {
            // A node yield expands into several columns (vid, display,
            // labels, requested properties).
            "node" => {
                columns.extend(build_node_yield_columns(
                    &vids,
                    label,
                    output_name,
                    target_properties,
                    &props_map,
                    label_props,
                )?);
            }
            "distance" => {
                let mut builder = Float64Builder::with_capacity(num_rows);
                for dist in &distances {
                    builder.append_value(*dist as f64);
                }
                columns.push(Arc::new(builder.finish()));
            }
            "score" => {
                let mut builder = Float32Builder::with_capacity(num_rows);
                for score in &scores {
                    builder.append_value(*score);
                }
                columns.push(Arc::new(builder.finish()));
            }
            "vid" => {
                let mut builder = Int64Builder::with_capacity(num_rows);
                for vid in &vids {
                    builder.append_value(vid.as_u64() as i64);
                }
                columns.push(Arc::new(builder.finish()));
            }
            // Unknown yield name: emit an all-null placeholder column so the
            // batch still lines up with the schema.
            _ => {
                let mut builder = StringBuilder::with_capacity(num_rows, 0);
                for _ in 0..num_rows {
                    builder.append_null();
                }
                columns.push(Arc::new(builder.finish()));
            }
        }
    }

    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
    Ok(Some(batch))
}
1811
/// Builds the column group that a `node` yield expands into: the vertex id
/// (u64), a string rendering of the vid, a single-element label list, and one
/// column per property requested for `output_name` in `target_properties`.
///
/// The column order here must match the field order the schema builder emits
/// for node yields.
fn build_node_yield_columns(
    vids: &[Vid],
    label: &str,
    output_name: &str,
    target_properties: &HashMap<String, Vec<String>>,
    props_map: &HashMap<Vid, uni_common::Properties>,
    label_props: Option<&std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<Vec<ArrayRef>> {
    let num_rows = vids.len();
    let mut columns = Vec::new();

    // Vertex id column.
    let mut vid_builder = UInt64Builder::with_capacity(num_rows);
    for vid in vids {
        vid_builder.append_value(vid.as_u64());
    }
    columns.push(Arc::new(vid_builder.finish()) as ArrayRef);

    // NOTE(review): this column holds the vid's string form — presumably the
    // node's display/variable value expected by the schema; confirm against
    // the schema-construction side.
    let mut var_builder = StringBuilder::with_capacity(num_rows, num_rows * 20);
    for vid in vids {
        var_builder.append_value(vid.to_string());
    }
    columns.push(Arc::new(var_builder.finish()) as ArrayRef);

    // Labels column: every row gets a one-element list containing `label`.
    let mut labels_builder = arrow_array::builder::ListBuilder::new(StringBuilder::new());
    for _ in 0..num_rows {
        labels_builder.values().append_value(label);
        labels_builder.append(true);
    }
    columns.push(Arc::new(labels_builder.finish()) as ArrayRef);

    // One typed column per requested property, typed via the label's schema
    // metadata (when available).
    if let Some(props) = target_properties.get(output_name) {
        for prop_name in props {
            let data_type = resolve_property_type(prop_name, label_props);
            let column = crate::query::df_graph::scan::build_property_column_static(
                vids, props_map, prop_name, &data_type,
            )?;
            columns.push(column);
        }
    }

    Ok(columns)
}
1864
1865fn extract_vector(val: &Value) -> DFResult<Vec<f32>> {
1867 match val {
1868 Value::Vector(vec) => Ok(vec.clone()),
1869 Value::List(arr) => {
1870 let mut vec = Vec::with_capacity(arr.len());
1871 for v in arr {
1872 if let Some(f) = v.as_f64() {
1873 vec.push(f as f32);
1874 } else {
1875 return Err(datafusion::error::DataFusionError::Execution(
1876 "Query vector must contain numbers".to_string(),
1877 ));
1878 }
1879 }
1880 Ok(vec)
1881 }
1882 _ => Err(datafusion::error::DataFusionError::Execution(
1883 "Query vector must be a list or vector".to_string(),
1884 )),
1885 }
1886}