1use arrow_array::builder::{
17 BooleanBuilder, Float32Builder, Float64Builder, Int64Builder, StringBuilder, UInt64Builder,
18};
19use arrow_array::{ArrayRef, RecordBatch};
20use arrow_schema::{DataType, Field, Schema, SchemaRef};
21use datafusion::common::Result as DFResult;
22use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
23use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
24use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
25use futures::Stream;
26use std::any::Any;
27use std::collections::HashMap;
28use std::fmt;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32use uni_common::Value;
33use uni_common::core::id::Vid;
34use uni_common::core::schema::DistanceMetric;
35use uni_cypher::ast::Expr;
36
37use crate::query::df_graph::GraphExecutionContext;
38use crate::query::df_graph::common::{
39 arrow_err, calculate_score, compute_plan_properties, evaluate_simple_expr, labels_data_type,
40};
41use crate::query::df_graph::scan::resolve_property_type;
42
/// Normalizes a YIELD item name (case-insensitively) to the canonical key
/// used by the search procedures.
///
/// Known aliases map to `vid` / `distance` / `score` / `vector_score` /
/// `fts_score` / `raw_score`; anything else is treated as a node variable
/// and maps to `node`.
pub(crate) fn map_yield_to_canonical(yield_name: &str) -> String {
    let lowered = yield_name.to_lowercase();
    let canonical = match lowered.as_str() {
        "distance" | "dist" | "_distance" => "distance",
        "vid" | "_vid" => "vid",
        "score" | "_score" => "score",
        "fts_score" => "fts_score",
        "vector_score" => "vector_score",
        "raw_score" => "raw_score",
        _ => "node",
    };
    canonical.to_string()
}
61
/// Physical operator that executes a Cypher `CALL …` procedure and emits the
/// YIELDed columns as (at most) one record batch.
pub struct GraphProcedureCallExec {
    /// Shared graph execution state (storage, registries, timeout checks).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Fully-qualified procedure name, e.g. `uni.schema.labels`.
    procedure_name: String,

    /// Unevaluated argument expressions from the CALL clause; evaluated
    /// against `params` at execute() time.
    arguments: Vec<Expr>,

    /// YIELD items as (name, optional alias) pairs; output column order
    /// follows this list.
    yield_items: Vec<(String, Option<String>)>,

    /// Bound query parameters used when evaluating `arguments`.
    params: HashMap<String, Value>,

    /// For search procedures: properties to project, keyed by output
    /// variable name.
    target_properties: HashMap<String, Vec<String>>,

    /// Pre-computed output schema (see `build_schema`).
    schema: SchemaRef,

    /// Cached DataFusion plan properties derived from `schema`.
    properties: PlanProperties,

    /// Execution metrics shared with the streams spawned by execute().
    metrics: ExecutionPlanMetricsSet,
}
94
impl fmt::Debug for GraphProcedureCallExec {
    // Manual impl showing only the identifying fields — presumably because
    // some fields (context, plan properties) do not derive Debug; confirm
    // before switching to #[derive(Debug)].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("GraphProcedureCallExec")
            .field("procedure_name", &self.procedure_name)
            .field("yield_items", &self.yield_items)
            .finish()
    }
}
103
impl GraphProcedureCallExec {
    /// Creates the operator, pre-computing the output Arrow schema and plan
    /// properties from the procedure name, the YIELD list, and (for search
    /// procedures) the requested per-variable properties.
    pub fn new(
        graph_ctx: Arc<GraphExecutionContext>,
        procedure_name: String,
        arguments: Vec<Expr>,
        yield_items: Vec<(String, Option<String>)>,
        params: HashMap<String, Value>,
        target_properties: HashMap<String, Vec<String>>,
    ) -> Self {
        let schema = Self::build_schema(
            &procedure_name,
            &yield_items,
            &target_properties,
            &graph_ctx,
        );
        let properties = compute_plan_properties(schema.clone());

        Self {
            graph_ctx,
            procedure_name,
            arguments,
            yield_items,
            params,
            target_properties,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Derives the Arrow output schema for a procedure call.
    ///
    /// Columns are emitted in `yield_items` order and named after the alias
    /// when present, otherwise the YIELD name. Unknown yields fall back to
    /// nullable UTF-8 columns.
    fn build_schema(
        procedure_name: &str,
        yield_items: &[(String, Option<String>)],
        target_properties: &HashMap<String, Vec<String>>,
        graph_ctx: &GraphExecutionContext,
    ) -> SchemaRef {
        let mut fields = Vec::new();

        match procedure_name {
            "uni.schema.labels" => {
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "label" => DataType::Utf8,
                        "propertyCount" | "nodeCount" | "indexCount" => DataType::Int64,
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.schema.edgeTypes" | "uni.schema.relationshipTypes" => {
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "type" | "relationshipType" => DataType::Utf8,
                        "propertyCount" => DataType::Int64,
                        "sourceLabels" | "targetLabels" => DataType::Utf8,
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.schema.indexes" => {
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "name" | "type" | "label" | "state" | "properties" => DataType::Utf8,
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.schema.constraints" => {
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "enabled" => DataType::Boolean,
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.schema.labelInfo" => {
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "property" | "dataType" => DataType::Utf8,
                        "nullable" | "indexed" | "unique" => DataType::Boolean,
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.vector.query" | "uni.fts.query" | "uni.search" => {
                for (name, alias) in yield_items {
                    let output_name = alias.as_ref().unwrap_or(name);
                    let canonical = map_yield_to_canonical(name);

                    match canonical.as_str() {
                        "node" => {
                            // A node yield expands to three fixed columns:
                            // `<var>._vid`, a display column named `<var>`,
                            // and `<var>._labels` — plus one column per
                            // requested property.
                            fields.push(Field::new(
                                format!("{}._vid", output_name),
                                DataType::UInt64,
                                false,
                            ));
                            fields.push(Field::new(output_name, DataType::Utf8, false));
                            fields.push(Field::new(
                                format!("{}._labels", output_name),
                                labels_data_type(),
                                true,
                            ));

                            if let Some(props) = target_properties.get(output_name.as_str()) {
                                let uni_schema = graph_ctx.storage().schema_manager().schema();
                                for prop_name in props {
                                    let col_name = format!("{}.{}", output_name, prop_name);
                                    // Fallback type when the property is not
                                    // declared under any label.
                                    let arrow_type = resolve_property_type(prop_name, None);
                                    // Search every label's property map and
                                    // take the first one that declares this
                                    // property. NOTE(review): if two labels
                                    // declare it with different types, the
                                    // winner depends on map iteration order —
                                    // confirm this is acceptable.
                                    let resolved_type = uni_schema
                                        .properties
                                        .values()
                                        .find_map(|label_props| {
                                            label_props.get(prop_name.as_str()).map(|_| {
                                                resolve_property_type(prop_name, Some(label_props))
                                            })
                                        })
                                        .unwrap_or(arrow_type);
                                    fields.push(Field::new(&col_name, resolved_type, true));
                                }
                            }
                        }
                        "distance" => {
                            fields.push(Field::new(output_name, DataType::Float64, true));
                        }
                        "score" | "vector_score" | "fts_score" | "raw_score" => {
                            fields.push(Field::new(output_name, DataType::Float32, true));
                        }
                        "vid" => {
                            fields.push(Field::new(output_name, DataType::Int64, true));
                        }
                        _ => {
                            fields.push(Field::new(output_name, DataType::Utf8, true));
                        }
                    }
                }
            }
            name if name.starts_with("uni.algo.") => {
                if let Some(registry) = graph_ctx.algo_registry()
                    && let Some(procedure) = registry.get(name)
                {
                    // Column types come from the algorithm's declared yields;
                    // yields missing from the signature default to UTF-8.
                    let sig = procedure.signature();
                    for (yield_name, alias) in yield_items {
                        let col_name = alias.as_ref().unwrap_or(yield_name);
                        let yield_vt = sig.yields.iter().find(|(n, _)| *n == yield_name.as_str());
                        let data_type = yield_vt
                            .map(|(_, vt)| value_type_to_arrow(vt))
                            .unwrap_or(DataType::Utf8);
                        let mut field = Field::new(col_name, data_type, true);
                        // Complex values are carried as encoded strings; the
                        // `cv_encoded` metadata flags them — presumably for a
                        // downstream decoder to pick up.
                        if yield_vt.is_some_and(|(_, vt)| is_complex_value_type(vt)) {
                            let mut metadata = std::collections::HashMap::new();
                            metadata.insert("cv_encoded".to_string(), "true".to_string());
                            field = field.with_metadata(metadata);
                        }
                        fields.push(field);
                    }
                } else {
                    // No registry (or unknown algorithm): degrade to UTF-8
                    // columns so planning still succeeds.
                    for (name, alias) in yield_items {
                        let col_name = alias.as_ref().unwrap_or(name);
                        fields.push(Field::new(col_name, DataType::Utf8, true));
                    }
                }
            }
            _ => {
                if let Some(registry) = graph_ctx.procedure_registry()
                    && let Some(proc_def) = registry.get(procedure_name)
                {
                    // Registered user procedure: types come from its declared
                    // outputs, defaulting to UTF-8 for undeclared yields.
                    for (name, alias) in yield_items {
                        let col_name = alias.as_ref().unwrap_or(name);
                        let data_type = proc_def
                            .outputs
                            .iter()
                            .find(|o| o.name == *name)
                            .map(|o| procedure_value_type_to_arrow(&o.output_type))
                            .unwrap_or(DataType::Utf8);
                        fields.push(Field::new(col_name, data_type, true));
                    }
                } else if yield_items.is_empty() {
                    // Unknown procedure with no YIELD: empty schema; the
                    // execution path reports the real error later.
                } else {
                    for (name, alias) in yield_items {
                        let col_name = alias.as_ref().unwrap_or(name);
                        fields.push(Field::new(col_name, DataType::Utf8, true));
                    }
                }
            }
        }

        Arc::new(Schema::new(fields))
    }
}
318
319fn value_type_to_arrow(vt: &uni_algo::algo::procedures::ValueType) -> DataType {
321 use uni_algo::algo::procedures::ValueType;
322 match vt {
323 ValueType::Int => DataType::Int64,
324 ValueType::Float => DataType::Float64,
325 ValueType::String => DataType::Utf8,
326 ValueType::Bool => DataType::Boolean,
327 ValueType::List
328 | ValueType::Map
329 | ValueType::Node
330 | ValueType::Relationship
331 | ValueType::Path
332 | ValueType::Any => DataType::Utf8,
333 }
334}
335
336fn is_complex_value_type(vt: &uni_algo::algo::procedures::ValueType) -> bool {
339 use uni_algo::algo::procedures::ValueType;
340 matches!(
341 vt,
342 ValueType::List
343 | ValueType::Map
344 | ValueType::Node
345 | ValueType::Relationship
346 | ValueType::Path
347 )
348}
349
350fn procedure_value_type_to_arrow(
352 vt: &crate::query::executor::procedure::ProcedureValueType,
353) -> DataType {
354 use crate::query::executor::procedure::ProcedureValueType;
355 match vt {
356 ProcedureValueType::Integer => DataType::Int64,
357 ProcedureValueType::Float | ProcedureValueType::Number => DataType::Float64,
358 ProcedureValueType::Boolean => DataType::Boolean,
359 ProcedureValueType::String | ProcedureValueType::Any => DataType::Utf8,
360 }
361}
362
impl DisplayAs for GraphProcedureCallExec {
    // EXPLAIN rendering; the same single-line text is used for every
    // display format.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "GraphProcedureCallExec: procedure={}",
            self.procedure_name
        )
    }
}
372
impl ExecutionPlan for GraphProcedureCallExec {
    fn name(&self) -> &str {
        "GraphProcedureCallExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    // Leaf node: procedure calls read from storage/registries directly.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Attaching children to a leaf is a planner bug, hence Internal.
        if !children.is_empty() {
            return Err(datafusion::error::DataFusionError::Internal(
                "GraphProcedureCallExec has no children".to_string(),
            ));
        }
        Ok(self)
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let metrics = BaselineMetrics::new(&self.metrics, partition);

        // Arguments are evaluated eagerly against the bound parameters so
        // that invalid expressions fail here rather than mid-stream.
        let mut evaluated_args = Vec::with_capacity(self.arguments.len());
        for arg in &self.arguments {
            evaluated_args.push(evaluate_simple_expr(arg, &self.params)?);
        }

        Ok(Box::pin(ProcedureCallStream::new(
            self.graph_ctx.clone(),
            self.procedure_name.clone(),
            evaluated_args,
            self.yield_items.clone(),
            self.target_properties.clone(),
            self.schema.clone(),
            metrics,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
434
/// State machine for `ProcedureCallStream`.
enum ProcedureCallState {
    /// Execution has not started yet; the first poll builds the future.
    Init,
    /// The procedure future is in flight; it resolves to at most one batch.
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// The single result (or error) has been emitted; stream is exhausted.
    Done,
}
448
/// Single-shot record-batch stream that runs one procedure call and yields
/// its result as at most one batch.
struct ProcedureCallStream {
    // Inputs captured from the exec node; cloned into the future on first poll.
    graph_ctx: Arc<GraphExecutionContext>,
    procedure_name: String,
    evaluated_args: Vec<Value>,
    yield_items: Vec<(String, Option<String>)>,
    target_properties: HashMap<String, Vec<String>>,
    // Output schema reported via RecordBatchStream.
    schema: SchemaRef,
    // Current position in the Init -> Executing -> Done state machine.
    state: ProcedureCallState,
    metrics: BaselineMetrics,
}
460
461impl ProcedureCallStream {
462 fn new(
463 graph_ctx: Arc<GraphExecutionContext>,
464 procedure_name: String,
465 evaluated_args: Vec<Value>,
466 yield_items: Vec<(String, Option<String>)>,
467 target_properties: HashMap<String, Vec<String>>,
468 schema: SchemaRef,
469 metrics: BaselineMetrics,
470 ) -> Self {
471 Self {
472 graph_ctx,
473 procedure_name,
474 evaluated_args,
475 yield_items,
476 target_properties,
477 schema,
478 state: ProcedureCallState::Init,
479 metrics,
480 }
481 }
482}
483
impl Stream for ProcedureCallStream {
    type Item = DFResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Move the state out, leaving `Done` behind; every arm below
            // either returns or installs the next state before looping again.
            let state = std::mem::replace(&mut self.state, ProcedureCallState::Done);

            match state {
                ProcedureCallState::Init => {
                    // Clone the inputs into an owned future so it does not
                    // borrow from `self` across polls.
                    let graph_ctx = self.graph_ctx.clone();
                    let procedure_name = self.procedure_name.clone();
                    let evaluated_args = self.evaluated_args.clone();
                    let yield_items = self.yield_items.clone();
                    let target_properties = self.target_properties.clone();
                    let schema = self.schema.clone();

                    let fut = async move {
                        // Bail out before doing any work if the query
                        // deadline has already passed.
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;

                        execute_procedure(
                            &graph_ctx,
                            &procedure_name,
                            &evaluated_args,
                            &yield_items,
                            &target_properties,
                            &schema,
                        )
                        .await
                    };

                    self.state = ProcedureCallState::Executing(Box::pin(fut));
                }
                ProcedureCallState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        // Single-shot stream: emit the one batch (if any),
                        // record its row count, then stay Done.
                        self.state = ProcedureCallState::Done;
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = ProcedureCallState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so the next wakeup resumes it.
                        self.state = ProcedureCallState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                ProcedureCallState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
541
impl RecordBatchStream for ProcedureCallStream {
    // Schema of every batch this stream yields.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}
547
/// Dispatches a procedure call by name to its concrete implementation.
///
/// Built-in `uni.schema.*`, search (`uni.vector.query` / `uni.fts.query` /
/// `uni.search`) and `uni.algo.*` procedures are handled directly; anything
/// else is resolved through the user procedure registry. Each implementation
/// returns at most one batch containing all result rows.
async fn execute_procedure(
    graph_ctx: &GraphExecutionContext,
    procedure_name: &str,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    match procedure_name {
        "uni.schema.labels" => execute_schema_labels(graph_ctx, yield_items, schema).await,
        "uni.schema.edgeTypes" | "uni.schema.relationshipTypes" => {
            execute_schema_edge_types(graph_ctx, yield_items, schema).await
        }
        "uni.schema.indexes" => execute_schema_indexes(graph_ctx, yield_items, schema).await,
        "uni.schema.constraints" => {
            execute_schema_constraints(graph_ctx, yield_items, schema).await
        }
        "uni.schema.labelInfo" => {
            execute_schema_label_info(graph_ctx, args, yield_items, schema).await
        }
        "uni.vector.query" => {
            execute_vector_query(graph_ctx, args, yield_items, target_properties, schema).await
        }
        "uni.fts.query" => {
            execute_fts_query(graph_ctx, args, yield_items, target_properties, schema).await
        }
        "uni.search" => {
            execute_hybrid_search(graph_ctx, args, yield_items, target_properties, schema).await
        }
        name if name.starts_with("uni.algo.") => {
            execute_algo_procedure(graph_ctx, name, args, yield_items, schema).await
        }
        _ => {
            execute_registered_procedure(graph_ctx, procedure_name, args, yield_items, schema).await
        }
    }
}
590
/// `uni.schema.labels`: one row per vertex label with its property count,
/// (best-effort) node count, and index count.
async fn execute_schema_labels(
    graph_ctx: &GraphExecutionContext,
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let uni_schema = graph_ctx.storage().schema_manager().schema();
    let storage = graph_ctx.storage();

    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
    for label_name in uni_schema.labels.keys() {
        let mut row = HashMap::new();
        row.insert("label".to_string(), Value::String(label_name.clone()));

        let prop_count = uni_schema
            .properties
            .get(label_name)
            .map(|p| p.len())
            .unwrap_or(0);
        row.insert("propertyCount".to_string(), Value::Int(prop_count as i64));

        // Node count is best-effort: any failure opening or counting the
        // dataset degrades to 0 rather than failing the whole call.
        let node_count = if let Ok(ds) = storage.vertex_dataset(label_name) {
            if let Ok(raw) = ds.open_raw().await {
                raw.count_rows(None).await.unwrap_or(0)
            } else {
                0
            }
        } else {
            0
        };
        row.insert("nodeCount".to_string(), Value::Int(node_count as i64));

        // Count every index (of any kind) attached to this label.
        let idx_count = uni_schema
            .indexes
            .iter()
            .filter(|i| i.label() == label_name)
            .count();
        row.insert("indexCount".to_string(), Value::Int(idx_count as i64));

        rows.push(row);
    }

    build_scalar_batch(&rows, yield_items, schema)
}
639
/// `uni.schema.edgeTypes` / `uni.schema.relationshipTypes`: one row per edge
/// type. Both `type` and `relationshipType` carry the same name so either
/// YIELD spelling works.
async fn execute_schema_edge_types(
    graph_ctx: &GraphExecutionContext,
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let uni_schema = graph_ctx.storage().schema_manager().schema();

    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
    for (type_name, meta) in &uni_schema.edge_types {
        let mut row = HashMap::new();
        row.insert("type".to_string(), Value::String(type_name.clone()));
        row.insert(
            "relationshipType".to_string(),
            Value::String(type_name.clone()),
        );
        // NOTE(review): label lists are rendered with Debug formatting
        // (e.g. `["A", "B"]`) — confirm consumers expect this rather than a
        // JSON array.
        row.insert(
            "sourceLabels".to_string(),
            Value::String(format!("{:?}", meta.src_labels)),
        );
        row.insert(
            "targetLabels".to_string(),
            Value::String(format!("{:?}", meta.dst_labels)),
        );

        let prop_count = uni_schema
            .properties
            .get(type_name)
            .map(|p| p.len())
            .unwrap_or(0);
        row.insert("propertyCount".to_string(), Value::Int(prop_count as i64));

        rows.push(row);
    }

    build_scalar_batch(&rows, yield_items, schema)
}
676
/// `uni.schema.indexes`: one row per index definition with its kind, label,
/// indexed properties (JSON array string), and a fixed "ONLINE" state.
async fn execute_schema_indexes(
    graph_ctx: &GraphExecutionContext,
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let uni_schema = graph_ctx.storage().schema_manager().schema();

    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
    for idx in &uni_schema.indexes {
        use uni_common::core::schema::IndexDefinition;

        // Map each index variant to a display kind plus a JSON-encoded list
        // of the indexed properties/columns (empty string on encode failure).
        let (type_name, properties_json) = match &idx {
            IndexDefinition::Vector(v) => (
                "VECTOR",
                serde_json::to_string(&[&v.property]).unwrap_or_default(),
            ),
            IndexDefinition::FullText(f) => (
                "FULLTEXT",
                serde_json::to_string(&f.properties).unwrap_or_default(),
            ),
            IndexDefinition::Scalar(s) => (
                "SCALAR",
                serde_json::to_string(&s.properties).unwrap_or_default(),
            ),
            IndexDefinition::JsonFullText(j) => (
                "JSON_FTS",
                serde_json::to_string(&[&j.column]).unwrap_or_default(),
            ),
            IndexDefinition::Inverted(inv) => (
                "INVERTED",
                serde_json::to_string(&[&inv.property]).unwrap_or_default(),
            ),
            _ => ("UNKNOWN", String::new()),
        };

        // State is hard-coded: index builds are not tracked here.
        let row = HashMap::from([
            ("state".to_string(), Value::String("ONLINE".to_string())),
            ("name".to_string(), Value::String(idx.name().to_string())),
            ("type".to_string(), Value::String(type_name.to_string())),
            ("label".to_string(), Value::String(idx.label().to_string())),
            ("properties".to_string(), Value::String(properties_json)),
        ]);
        rows.push(row);
    }

    build_scalar_batch(&rows, yield_items, schema)
}
725
/// `uni.schema.constraints`: one row per constraint with its kind-specific
/// columns (properties for UNIQUE/EXISTS, expression for CHECK) and the
/// target label or relationship type.
async fn execute_schema_constraints(
    graph_ctx: &GraphExecutionContext,
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let uni_schema = graph_ctx.storage().schema_manager().schema();

    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
    for c in &uni_schema.constraints {
        let mut row = HashMap::new();
        row.insert("name".to_string(), Value::String(c.name.clone()));
        row.insert("enabled".to_string(), Value::Bool(c.enabled));

        // Kind-specific columns; properties are JSON-encoded arrays.
        match &c.constraint_type {
            uni_common::core::schema::ConstraintType::Unique { properties } => {
                row.insert("type".to_string(), Value::String("UNIQUE".to_string()));
                row.insert(
                    "properties".to_string(),
                    Value::String(serde_json::to_string(&properties).unwrap_or_default()),
                );
            }
            uni_common::core::schema::ConstraintType::Exists { property } => {
                row.insert("type".to_string(), Value::String("EXISTS".to_string()));
                row.insert(
                    "properties".to_string(),
                    Value::String(serde_json::to_string(&[&property]).unwrap_or_default()),
                );
            }
            uni_common::core::schema::ConstraintType::Check { expression } => {
                row.insert("type".to_string(), Value::String("CHECK".to_string()));
                row.insert("expression".to_string(), Value::String(expression.clone()));
            }
            _ => {
                row.insert("type".to_string(), Value::String("UNKNOWN".to_string()));
            }
        }

        // The target lands in `label` or `relationshipType`, never both.
        match &c.target {
            uni_common::core::schema::ConstraintTarget::Label(l) => {
                row.insert("label".to_string(), Value::String(l.clone()));
            }
            uni_common::core::schema::ConstraintTarget::EdgeType(t) => {
                row.insert("relationshipType".to_string(), Value::String(t.clone()));
            }
            _ => {
                row.insert("target".to_string(), Value::String("UNKNOWN".to_string()));
            }
        }

        rows.push(row);
    }

    build_scalar_batch(&rows, yield_items, schema)
}
780
/// `uni.schema.labelInfo(label)`: one row per property of the given label
/// with its type, nullability, and whether it is indexed or covered by an
/// enabled UNIQUE constraint. Unknown labels yield an empty result.
async fn execute_schema_label_info(
    graph_ctx: &GraphExecutionContext,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let label_name = require_string_arg(args, 0, "uni.schema.labelInfo: first argument (label)")?;

    let uni_schema = graph_ctx.storage().schema_manager().schema();

    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
    if let Some(props) = uni_schema.properties.get(&label_name) {
        for (prop_name, prop_meta) in props {
            let mut row = HashMap::new();
            row.insert("property".to_string(), Value::String(prop_name.clone()));
            // Data type is rendered via Debug formatting of the schema type.
            row.insert(
                "dataType".to_string(),
                Value::String(format!("{:?}", prop_meta.r#type)),
            );
            row.insert("nullable".to_string(), Value::Bool(prop_meta.nullable));

            // A property counts as indexed if any index on this label covers
            // it. Note: a JsonFullText index marks every property of the
            // label as indexed — presumably intentional since it indexes the
            // whole document; confirm.
            let is_indexed = uni_schema.indexes.iter().any(|idx| match idx {
                uni_common::core::schema::IndexDefinition::Vector(v) => {
                    v.label == label_name && v.property == *prop_name
                }
                uni_common::core::schema::IndexDefinition::Scalar(s) => {
                    s.label == label_name && s.properties.contains(prop_name)
                }
                uni_common::core::schema::IndexDefinition::FullText(f) => {
                    f.label == label_name && f.properties.contains(prop_name)
                }
                uni_common::core::schema::IndexDefinition::Inverted(inv) => {
                    inv.label == label_name && inv.property == *prop_name
                }
                uni_common::core::schema::IndexDefinition::JsonFullText(j) => j.label == label_name,
                _ => false,
            });
            row.insert("indexed".to_string(), Value::Bool(is_indexed));

            // Unique only if an *enabled* UNIQUE constraint on this label
            // includes the property.
            let unique = uni_schema.constraints.iter().any(|c| {
                if let uni_common::core::schema::ConstraintTarget::Label(l) = &c.target
                    && l == &label_name
                    && c.enabled
                    && let uni_common::core::schema::ConstraintType::Unique { properties } =
                        &c.constraint_type
                {
                    return properties.contains(prop_name);
                }
                false
            });
            row.insert("unique".to_string(), Value::Bool(unique));

            rows.push(row);
        }
    }

    build_scalar_batch(&rows, yield_items, schema)
}
839
840fn build_typed_column<'a>(
845 values: impl Iterator<Item = Option<&'a Value>>,
846 num_rows: usize,
847 data_type: &DataType,
848) -> ArrayRef {
849 match data_type {
850 DataType::Int64 => {
851 let mut builder = Int64Builder::with_capacity(num_rows);
852 for val in values {
853 match val.and_then(|v| v.as_i64()) {
854 Some(i) => builder.append_value(i),
855 None => builder.append_null(),
856 }
857 }
858 Arc::new(builder.finish())
859 }
860 DataType::Float64 => {
861 let mut builder = Float64Builder::with_capacity(num_rows);
862 for val in values {
863 match val.and_then(|v| v.as_f64()) {
864 Some(f) => builder.append_value(f),
865 None => builder.append_null(),
866 }
867 }
868 Arc::new(builder.finish())
869 }
870 DataType::Boolean => {
871 let mut builder = BooleanBuilder::with_capacity(num_rows);
872 for val in values {
873 match val.and_then(|v| v.as_bool()) {
874 Some(b) => builder.append_value(b),
875 None => builder.append_null(),
876 }
877 }
878 Arc::new(builder.finish())
879 }
880 _ => {
881 let mut builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
883 for val in values {
884 match val {
885 Some(Value::String(s)) => builder.append_value(s),
886 Some(v) => builder.append_value(format!("{v}")),
887 None => builder.append_null(),
888 }
889 }
890 Arc::new(builder.finish())
891 }
892 }
893}
894
895fn create_empty_batch(schema: SchemaRef) -> DFResult<RecordBatch> {
901 if schema.fields().is_empty() {
902 let options = arrow_array::RecordBatchOptions::new().with_row_count(Some(0));
903 RecordBatch::try_new_with_options(schema, vec![], &options).map_err(arrow_err)
904 } else {
905 Ok(RecordBatch::new_empty(schema))
906 }
907}
908
/// Converts name→value rows into a `RecordBatch`, one column per YIELD item.
///
/// Assumes `schema` fields are positionally aligned with `yield_items`,
/// which is how the scalar-procedure schemas are constructed in
/// `build_schema`. Keys missing from a row become nulls.
fn build_scalar_batch(
    rows: &[HashMap<String, Value>],
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    if rows.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    let num_rows = rows.len();
    let mut columns: Vec<ArrayRef> = Vec::new();

    for (idx, (name, _alias)) in yield_items.iter().enumerate() {
        let field = schema.field(idx);
        let values = rows.iter().map(|row| row.get(name));
        columns.push(build_typed_column(values, num_rows, field.data_type()));
    }

    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
    Ok(Some(batch))
}
931
/// Executes a user-registered (table-backed) procedure.
///
/// Validates arity and argument types against the registered definition,
/// then filters the procedure's stored rows: a row matches when every
/// parameter-named column it defines equals the corresponding argument
/// (null arguments act as wildcards via `proc_values_match`).
async fn execute_registered_procedure(
    graph_ctx: &GraphExecutionContext,
    procedure_name: &str,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let registry = graph_ctx.procedure_registry().ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(format!(
            "Procedure '{}' not supported in DataFusion engine (no procedure registry)",
            procedure_name
        ))
    })?;

    let proc_def = registry.get(procedure_name).ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(format!(
            "ProcedureNotFound: Unknown procedure '{}'",
            procedure_name
        ))
    })?;

    // Arity must match exactly; there are no optional parameters here.
    if args.len() != proc_def.params.len() {
        return Err(datafusion::error::DataFusionError::Execution(format!(
            "InvalidNumberOfArguments: Procedure '{}' expects {} argument(s), got {}",
            proc_def.name,
            proc_def.params.len(),
            args.len()
        )));
    }

    // Null arguments skip the type check (they match any row below).
    for (i, (arg_val, param)) in args.iter().zip(&proc_def.params).enumerate() {
        if !arg_val.is_null() && !check_proc_type_compatible(arg_val, &param.param_type) {
            return Err(datafusion::error::DataFusionError::Execution(format!(
                "InvalidArgumentType: Argument {} ('{}') of procedure '{}' has incompatible type",
                i, param.name, proc_def.name
            )));
        }
    }

    // Keep only the data rows whose parameter-named columns match the
    // provided arguments; rows lacking a parameter column always pass.
    let filtered: Vec<&HashMap<String, Value>> = proc_def
        .data
        .iter()
        .filter(|row| {
            for (param, arg_val) in proc_def.params.iter().zip(args) {
                if let Some(row_val) = row.get(&param.name)
                    && !proc_values_match(row_val, arg_val)
                {
                    return false;
                }
            }
            true
        })
        .collect();

    // CALL without YIELD produces no columns (and therefore no rows).
    if yield_items.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    if filtered.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    let num_rows = filtered.len();
    let mut columns: Vec<ArrayRef> = Vec::new();

    // Schema fields are positionally aligned with yield_items (see
    // build_schema), so indexing by position is safe here.
    for (idx, (name, _alias)) in yield_items.iter().enumerate() {
        let field = schema.field(idx);
        let values = filtered.iter().map(|row| row.get(name.as_str()));
        columns.push(build_typed_column(values, num_rows, field.data_type()));
    }

    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
    Ok(Some(batch))
}
1020
1021fn check_proc_type_compatible(
1023 val: &Value,
1024 expected: &crate::query::executor::procedure::ProcedureValueType,
1025) -> bool {
1026 use crate::query::executor::procedure::ProcedureValueType;
1027 match expected {
1028 ProcedureValueType::Any => true,
1029 ProcedureValueType::String => val.is_string(),
1030 ProcedureValueType::Boolean => val.is_bool(),
1031 ProcedureValueType::Integer => val.is_i64(),
1032 ProcedureValueType::Float => val.is_f64() || val.is_i64(),
1033 ProcedureValueType::Number => val.is_number(),
1034 }
1035}
1036
1037fn proc_values_match(row_val: &Value, arg_val: &Value) -> bool {
1039 if arg_val.is_null() || row_val.is_null() {
1040 return arg_val.is_null() && row_val.is_null();
1041 }
1042 if let (Some(a), Some(b)) = (row_val.as_f64(), arg_val.as_f64()) {
1044 return (a - b).abs() < f64::EPSILON;
1045 }
1046 row_val == arg_val
1047}
1048
/// Executes a `uni.algo.*` procedure through the algorithm registry and
/// collects its row stream into a single batch.
async fn execute_algo_procedure(
    graph_ctx: &GraphExecutionContext,
    procedure_name: &str,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    use futures::StreamExt;
    use uni_algo::algo::procedures::AlgoContext;

    let registry = graph_ctx.algo_registry().ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(
            "Algorithm registry not available".to_string(),
        )
    })?;

    let procedure = registry.get(procedure_name).ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(format!(
            "Unknown algorithm: {}",
            procedure_name
        ))
    })?;

    let signature = procedure.signature();

    // Algorithm procedures take JSON arguments.
    let serde_args: Vec<serde_json::Value> = args.iter().cloned().map(|v| v.into()).collect();

    // Build an L0 snapshot so the algorithm sees unflushed writes: the
    // current L0 plus pending-flush segments, with any in-transaction L0
    // appended last. None when there is no current L0.
    // NOTE(review): assumes from_snapshot treats the pending list as ordered
    // oldest-to-newest — confirm against uni_store.
    let l0_mgr = {
        let l0_ctx = graph_ctx.l0_context();
        l0_ctx.current_l0.as_ref().map(|current| {
            let mut pending = l0_ctx.pending_flush_l0s.clone();
            if let Some(tx_l0) = &l0_ctx.transaction_l0 {
                pending.push(tx_l0.clone());
            }
            Arc::new(uni_store::runtime::l0_manager::L0Manager::from_snapshot(
                current.clone(),
                pending,
            ))
        })
    };
    let algo_ctx = AlgoContext::new(graph_ctx.storage().clone(), l0_mgr);

    let mut stream = procedure.execute(algo_ctx, serde_args);
    let mut rows = Vec::new();
    while let Some(row_res) = stream.next().await {
        // Re-check the query timeout every 1000 collected rows (including
        // before the first row, since 0 % 1000 == 0).
        if rows.len() % 1000 == 0 {
            graph_ctx
                .check_timeout()
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
        }
        let row =
            row_res.map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
        rows.push(row);
    }

    build_algo_batch(&rows, &signature, yield_items, schema)
}
1116
1117fn json_to_value(jv: &serde_json::Value) -> Value {
1119 match jv {
1120 serde_json::Value::Null => Value::Null,
1121 serde_json::Value::Bool(b) => Value::Bool(*b),
1122 serde_json::Value::Number(n) => {
1123 if let Some(i) = n.as_i64() {
1124 Value::Int(i)
1125 } else if let Some(f) = n.as_f64() {
1126 Value::Float(f)
1127 } else {
1128 Value::Null
1129 }
1130 }
1131 serde_json::Value::String(s) => Value::String(s.clone()),
1132 other => Value::String(other.to_string()),
1133 }
1134}
1135
/// Converts algorithm result rows into a `RecordBatch`.
///
/// Each YIELD item is looked up by name in the procedure signature to find
/// its positional index into the row's `values`; yields absent from the
/// signature produce all-null columns. Schema fields are positionally
/// aligned with `yield_items` (see `build_schema`).
fn build_algo_batch(
    rows: &[uni_algo::algo::procedures::AlgoResultRow],
    signature: &uni_algo::algo::procedures::ProcedureSignature,
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    if rows.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    let num_rows = rows.len();
    let mut columns: Vec<ArrayRef> = Vec::new();

    for (idx, (yield_name, _alias)) in yield_items.iter().enumerate() {
        let sig_idx = signature
            .yields
            .iter()
            .position(|(n, _)| *n == yield_name.as_str());

        // Materialize owned values first so build_typed_column can borrow
        // them for the column's lifetime.
        let uni_values: Vec<Value> = rows
            .iter()
            .map(|row| match sig_idx {
                Some(si) => json_to_value(&row.values[si]),
                None => Value::Null,
            })
            .collect();

        let field = schema.field(idx);
        let values = uni_values.iter().map(Some);
        columns.push(build_typed_column(values, num_rows, field.data_type()));
    }

    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
    Ok(Some(batch))
}
1173
1174fn require_string_arg(args: &[Value], index: usize, description: &str) -> DFResult<String> {
1180 args.get(index)
1181 .and_then(|v| v.as_str())
1182 .map(|s| s.to_string())
1183 .ok_or_else(|| {
1184 datafusion::error::DataFusionError::Execution(format!("{description} must be a string"))
1185 })
1186}
1187
1188fn extract_optional_filter(args: &[Value], index: usize) -> Option<String> {
1191 args.get(index).and_then(|v| {
1192 if v.is_null() {
1193 None
1194 } else {
1195 v.as_str().map(|s| s.to_string())
1196 }
1197 })
1198}
1199
1200fn extract_optional_threshold(args: &[Value], index: usize) -> Option<f64> {
1203 args.get(index)
1204 .and_then(|v| if v.is_null() { None } else { v.as_f64() })
1205}
1206
1207fn require_int_arg(args: &[Value], index: usize, description: &str) -> DFResult<usize> {
1209 args.get(index)
1210 .and_then(|v| v.as_u64())
1211 .map(|v| v as usize)
1212 .ok_or_else(|| {
1213 datafusion::error::DataFusionError::Execution(format!(
1214 "{description} must be an integer"
1215 ))
1216 })
1217}
1218
/// Embeds `query_text` into a query vector using the embedding service
/// configured on the vector index for `label`.`property`.
///
/// # Errors
/// - The index has no `embedding_config` (caller must then supply a
///   pre-computed vector instead).
/// - The Uni-Xervo runtime is not configured on the execution context.
/// - The embedding call itself fails, or returns no embedding.
async fn auto_embed_text(
    graph_ctx: &GraphExecutionContext,
    label: &str,
    property: &str,
    query_text: &str,
) -> DFResult<Vec<f32>> {
    let storage = graph_ctx.storage();
    let uni_schema = storage.schema_manager().schema();
    // Auto-embedding is only possible when the index was created with
    // embedding options; otherwise the error below tells the user to pass
    // a raw vector.
    let index_config = uni_schema.vector_index_for_property(label, property);

    let embedding_config = index_config
        .and_then(|cfg| cfg.embedding_config.as_ref())
        .ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Cannot auto-embed: vector index for {label}.{property} has no embedding_config. \
                Either provide a pre-computed vector or create the index with embedding options."
            ))
        })?;

    let runtime = graph_ctx.xervo_runtime().ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(
            "Cannot auto-embed: Uni-Xervo runtime not configured".to_string(),
        )
    })?;

    let embedder = runtime
        .embedding(&embedding_config.alias)
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
    // A single text is submitted, so exactly one embedding is expected back;
    // take the first and treat an empty response as an error.
    let embeddings = embedder
        .embed(vec![query_text])
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
    embeddings.into_iter().next().ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(
            "Embedding service returned no results".to_string(),
        )
    })
}
1266
/// Executes the `uni.vector.query` procedure: k-nearest-neighbor vector
/// search over `label`.`property`.
///
/// Positional arguments: label, property, query (a text string to
/// auto-embed, or a numeric vector), k, optional filter string, optional
/// distance threshold (maximum distance — lower is closer).
async fn execute_vector_query(
    graph_ctx: &GraphExecutionContext,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let label = require_string_arg(args, 0, "uni.vector.query: first argument (label)")?;
    let property = require_string_arg(args, 1, "uni.vector.query: second argument (property)")?;

    let query_val = args.get(2).ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(
            "uni.vector.query: third argument (query) is required".to_string(),
        )
    })?;

    let storage = graph_ctx.storage();

    // A string query is embedded via the index's embedding config; any
    // other value must already be a numeric vector.
    let query_vector: Vec<f32> = if let Some(query_text) = query_val.as_str() {
        auto_embed_text(graph_ctx, &label, &property, query_text).await?
    } else {
        extract_vector(query_val)?
    };

    let k = require_int_arg(args, 3, "uni.vector.query: fourth argument (k)")?;
    let filter = extract_optional_filter(args, 4);
    let threshold = extract_optional_threshold(args, 5);
    let query_ctx = graph_ctx.query_context();

    let mut results = storage
        .vector_search(
            &label,
            &property,
            &query_vector,
            k,
            filter.as_deref(),
            Some(&query_ctx),
        )
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    // The threshold prunes results by maximum distance (applied after the
    // k-NN search, so it can only shrink the result set).
    if let Some(max_dist) = threshold {
        results.retain(|(_, dist)| *dist <= max_dist as f32);
    }

    if results.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    // The index's configured metric determines how raw distances are later
    // converted into scores; default to L2 when no index config is found.
    let schema_manager = storage.schema_manager();
    let uni_schema = schema_manager.schema();
    let metric = uni_schema
        .vector_index_for_property(&label, &property)
        .map(|config| config.metric.clone())
        .unwrap_or(DistanceMetric::L2);

    build_search_result_batch(
        &results,
        &label,
        &metric,
        yield_items,
        target_properties,
        graph_ctx,
        schema,
    )
    .await
}
1336
/// Executes the `uni.fts.query` procedure: full-text search over
/// `label`.`property`.
///
/// Positional arguments: label, property, search_term, k, optional filter
/// string, optional score threshold (minimum score — higher is better,
/// the opposite sense of the vector-query threshold).
async fn execute_fts_query(
    graph_ctx: &GraphExecutionContext,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let label = require_string_arg(args, 0, "uni.fts.query: first argument (label)")?;
    let property = require_string_arg(args, 1, "uni.fts.query: second argument (property)")?;
    let search_term = require_string_arg(args, 2, "uni.fts.query: third argument (search_term)")?;
    let k = require_int_arg(args, 3, "uni.fts.query: fourth argument (k)")?;
    let filter = extract_optional_filter(args, 4);
    let threshold = extract_optional_threshold(args, 5);

    let storage = graph_ctx.storage();
    let query_ctx = graph_ctx.query_context();

    let mut results = storage
        .fts_search(
            &label,
            &property,
            &search_term,
            k,
            filter.as_deref(),
            Some(&query_ctx),
        )
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    // FTS scores: keep only hits at or above the minimum score.
    if let Some(min_score) = threshold {
        results.retain(|(_, score)| *score as f64 >= min_score);
    }

    if results.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    // NOTE(review): DistanceMetric::L2 here looks like a placeholder — FTS
    // results carry relevance scores, not distances; the shared batch
    // builder just needs *some* metric. Confirm the score conversion is a
    // no-op / acceptable for FTS yields.
    build_search_result_batch(
        &results,
        &label,
        &DistanceMetric::L2,
        yield_items,
        target_properties,
        graph_ctx,
        schema,
    )
    .await
}
1391
/// Executes the `uni.search` procedure: hybrid vector + full-text search
/// with rank fusion.
///
/// Positional arguments: label, properties (object `{vector: ..., fts: ...}`
/// or a single property name used for both modalities), query_text,
/// optional pre-computed query vector, k, optional filter string, optional
/// options object (`method`, `alpha`, `over_fetch`, `rrf_k`).
///
/// Both modalities over-fetch (`k * over_fetch`) so fusion has enough
/// candidates, then the fused list is truncated back to `k`.
async fn execute_hybrid_search(
    graph_ctx: &GraphExecutionContext,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let label = require_string_arg(args, 0, "uni.search: first argument (label)")?;

    let properties_val = args.get(1).ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(
            "uni.search: second argument (properties) is required".to_string(),
        )
    })?;

    // Either modality may be omitted by leaving its key out of the object;
    // a bare string enables both modalities on the same property.
    let (vector_prop, fts_prop) = if let Some(obj) = properties_val.as_object() {
        let vec_prop = obj
            .get("vector")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string());
        let fts_prop = obj
            .get("fts")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string());
        (vec_prop, fts_prop)
    } else if let Some(prop) = properties_val.as_str() {
        (Some(prop.to_string()), Some(prop.to_string()))
    } else {
        return Err(datafusion::error::DataFusionError::Execution(
            "Properties must be an object {vector: '...', fts: '...'} or a string".to_string(),
        ));
    };

    let query_text = require_string_arg(args, 2, "uni.search: third argument (query_text)")?;

    // Optional pre-computed vector; non-numeric list elements are silently
    // dropped rather than rejected.
    let query_vector: Option<Vec<f32>> = args.get(3).and_then(|v| {
        if v.is_null() {
            return None;
        }
        v.as_array().map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_f64().map(|f| f as f32))
                .collect()
        })
    });

    let k = require_int_arg(args, 4, "uni.search: fifth argument (k)")?;
    let filter = extract_optional_filter(args, 5);

    // Fusion tuning options with sensible defaults (RRF, alpha 0.5,
    // 2x over-fetch, rrf_k 60).
    let options_val = args.get(6);
    let options_map = options_val.and_then(|v| v.as_object());
    let fusion_method = options_map
        .and_then(|m| m.get("method"))
        .and_then(|v| v.as_str())
        .unwrap_or("rrf")
        .to_string();
    let alpha = options_map
        .and_then(|m| m.get("alpha"))
        .and_then(|v| v.as_f64())
        .unwrap_or(0.5) as f32;
    let over_fetch_factor = options_map
        .and_then(|m| m.get("over_fetch"))
        .and_then(|v| v.as_f64())
        .unwrap_or(2.0) as f32;
    let rrf_k = options_map
        .and_then(|m| m.get("rrf_k"))
        .and_then(|v| v.as_u64())
        .unwrap_or(60) as usize;

    let over_fetch_k = (k as f32 * over_fetch_factor).ceil() as usize;

    let storage = graph_ctx.storage();
    let query_ctx = graph_ctx.query_context();

    let mut vector_results: Vec<(Vid, f32)> = Vec::new();
    if let Some(ref vec_prop) = vector_prop {
        // Deliberate best-effort: if auto-embedding fails, qvec stays empty
        // and the vector leg is skipped — the FTS leg can still produce
        // results. The embedding error itself is swallowed here.
        let qvec = if let Some(ref v) = query_vector {
            v.clone()
        } else {
            auto_embed_text(graph_ctx, &label, vec_prop, &query_text)
                .await
                .unwrap_or_default()
        };

        if !qvec.is_empty() {
            vector_results = storage
                .vector_search(
                    &label,
                    vec_prop,
                    &qvec,
                    over_fetch_k,
                    filter.as_deref(),
                    Some(&query_ctx),
                )
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
        }
    }

    let mut fts_results: Vec<(Vid, f32)> = Vec::new();
    if let Some(ref fts_prop) = fts_prop {
        fts_results = storage
            .fts_search(
                &label,
                fts_prop,
                &query_text,
                over_fetch_k,
                filter.as_deref(),
                Some(&query_ctx),
            )
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
    }

    // Unknown method strings fall back to RRF rather than erroring.
    let fused_results = match fusion_method.as_str() {
        "weighted" => fuse_weighted(&vector_results, &fts_results, alpha),
        _ => fuse_rrf(&vector_results, &fts_results, rrf_k),
    };

    let final_results: Vec<_> = fused_results.into_iter().take(k).collect();

    if final_results.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    // Per-modality raw scores keyed by vid, for the vector_score/fts_score
    // yields; fts_max normalizes FTS scores into [0, 1].
    let vec_score_map: HashMap<Vid, f32> = vector_results.iter().cloned().collect();
    let fts_score_map: HashMap<Vid, f32> = fts_results.iter().cloned().collect();
    let fts_max = fts_results.iter().map(|(_, s)| *s).fold(0.0f32, f32::max);

    let uni_schema = storage.schema_manager().schema();
    let metric = vector_prop
        .as_ref()
        .and_then(|vp| {
            uni_schema
                .vector_index_for_property(&label, vp)
                .map(|config| config.metric.clone())
        })
        .unwrap_or(DistanceMetric::L2);

    let score_ctx = HybridScoreContext {
        vec_score_map: &vec_score_map,
        fts_score_map: &fts_score_map,
        fts_max,
        metric: &metric,
    };

    build_hybrid_search_batch(
        &final_results,
        &score_ctx,
        &label,
        yield_items,
        target_properties,
        graph_ctx,
        schema,
    )
    .await
}
1565
/// Reciprocal-rank fusion of vector and FTS result lists; thin wrapper
/// delegating to the shared implementation in `crate::query::fusion`.
fn fuse_rrf(vec_results: &[(Vid, f32)], fts_results: &[(Vid, f32)], k: usize) -> Vec<(Vid, f32)> {
    crate::query::fusion::fuse_rrf(vec_results, fts_results, k)
}
1571
/// Weighted-sum fusion of vector and FTS result lists (`alpha` balances the
/// two modalities); thin wrapper delegating to `crate::query::fusion`.
fn fuse_weighted(
    vec_results: &[(Vid, f32)],
    fts_results: &[(Vid, f32)],
    alpha: f32,
) -> Vec<(Vid, f32)> {
    crate::query::fusion::fuse_weighted(vec_results, fts_results, alpha)
}
1581
/// Per-modality score lookups used when materializing hybrid-search yields.
struct HybridScoreContext<'a> {
    // Raw vector distances keyed by vertex id (absent vid => null yield).
    vec_score_map: &'a HashMap<Vid, f32>,
    // Raw FTS relevance scores keyed by vertex id.
    fts_score_map: &'a HashMap<Vid, f32>,
    // Maximum FTS score across results; used to normalize `fts_score`.
    fts_max: f32,
    // Vector-index metric; converts raw distance into a similarity score.
    metric: &'a DistanceMetric,
}
1589
/// Materializes hybrid-search results into an Arrow `RecordBatch`, one
/// column group per yield item.
///
/// Node properties are batch-fetched only when some yield maps to "node".
/// Per-modality yields (`vector_score`, `fts_score`, `distance`) emit null
/// for vids that only appeared in the other modality's result list.
/// Unrecognized yields become all-null string columns.
async fn build_hybrid_search_batch(
    results: &[(Vid, f32)],
    scores: &HybridScoreContext<'_>,
    label: &str,
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    graph_ctx: &GraphExecutionContext,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let num_rows = results.len();
    let vids: Vec<Vid> = results.iter().map(|(vid, _)| *vid).collect();
    let fused_scores: Vec<f32> = results.iter().map(|(_, s)| *s).collect();

    let property_manager = graph_ctx.property_manager();
    let query_ctx = graph_ctx.query_context();
    let uni_schema = graph_ctx.storage().schema_manager().schema();
    let label_props = uni_schema.properties.get(label);

    // Fetch vertex properties only when a "node" yield actually needs them.
    let has_node_yield = yield_items
        .iter()
        .any(|(name, _)| map_yield_to_canonical(name) == "node");

    let props_map = if has_node_yield {
        property_manager
            .get_batch_vertex_props_for_label(&vids, label, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
    } else {
        HashMap::new()
    };

    let mut columns: Vec<ArrayRef> = Vec::new();

    for (name, alias) in yield_items {
        let output_name = alias.as_ref().unwrap_or(name);
        let canonical = map_yield_to_canonical(name);

        match canonical.as_str() {
            // A node yield expands into several columns (vid, display
            // string, labels, plus any requested properties).
            "node" => {
                columns.extend(build_node_yield_columns(
                    &vids,
                    label,
                    output_name,
                    target_properties,
                    &props_map,
                    label_props,
                )?);
            }
            "vid" => {
                let mut builder = Int64Builder::with_capacity(num_rows);
                for vid in &vids {
                    builder.append_value(vid.as_u64() as i64);
                }
                columns.push(Arc::new(builder.finish()));
            }
            // "score" is the fused score straight from rank fusion.
            "score" => {
                let mut builder = Float32Builder::with_capacity(num_rows);
                for score in &fused_scores {
                    builder.append_value(*score);
                }
                columns.push(Arc::new(builder.finish()));
            }
            // Vector distance converted to a similarity score via the
            // index metric; null when the vid had no vector hit.
            "vector_score" => {
                let mut builder = Float32Builder::with_capacity(num_rows);
                for vid in &vids {
                    if let Some(&dist) = scores.vec_score_map.get(vid) {
                        let score = calculate_score(dist, scores.metric);
                        builder.append_value(score);
                    } else {
                        builder.append_null();
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            // FTS score normalized by the max score (0.0 if all scores
            // were zero); null when the vid had no FTS hit.
            "fts_score" => {
                let mut builder = Float32Builder::with_capacity(num_rows);
                for vid in &vids {
                    if let Some(&raw_score) = scores.fts_score_map.get(vid) {
                        let norm = if scores.fts_max > 0.0 {
                            raw_score / scores.fts_max
                        } else {
                            0.0
                        };
                        builder.append_value(norm);
                    } else {
                        builder.append_null();
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            // Raw vector distance; null when the vid had no vector hit.
            "distance" => {
                let mut builder = Float64Builder::with_capacity(num_rows);
                for vid in &vids {
                    if let Some(&dist) = scores.vec_score_map.get(vid) {
                        builder.append_value(dist as f64);
                    } else {
                        builder.append_null();
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
            // Unknown yield names still get a column (all nulls) so the
            // column count matches the schema.
            _ => {
                let mut builder = StringBuilder::with_capacity(num_rows, 0);
                for _ in 0..num_rows {
                    builder.append_null();
                }
                columns.push(Arc::new(builder.finish()));
            }
        }
    }

    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
    Ok(Some(batch))
}
1707
/// Materializes single-modality (vector or FTS) search results into an
/// Arrow `RecordBatch`, one column group per yield item.
///
/// For vector search the second tuple element is a distance; `metric`
/// converts it into a score. For FTS the element is already a score and
/// the caller passes a placeholder metric. Node properties are batch-
/// fetched only when some yield maps to "node"; unrecognized yields become
/// all-null string columns.
async fn build_search_result_batch(
    results: &[(Vid, f32)],
    label: &str,
    metric: &DistanceMetric,
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    graph_ctx: &GraphExecutionContext,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let num_rows = results.len();
    let vids: Vec<Vid> = results.iter().map(|(vid, _)| *vid).collect();
    let distances: Vec<f32> = results.iter().map(|(_, d)| *d).collect();

    // Precompute scores once; both "score" columns and distances reuse
    // these vectors below.
    let scores: Vec<f32> = distances
        .iter()
        .map(|dist| calculate_score(*dist, metric))
        .collect();

    let property_manager = graph_ctx.property_manager();
    let query_ctx = graph_ctx.query_context();
    let uni_schema = graph_ctx.storage().schema_manager().schema();
    let label_props = uni_schema.properties.get(label);

    // Fetch vertex properties only when a "node" yield actually needs them.
    let has_node_yield = yield_items
        .iter()
        .any(|(name, _)| map_yield_to_canonical(name) == "node");

    let props_map = if has_node_yield {
        property_manager
            .get_batch_vertex_props_for_label(&vids, label, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
    } else {
        HashMap::new()
    };

    let mut columns: Vec<ArrayRef> = Vec::new();

    for (name, alias) in yield_items {
        let output_name = alias.as_ref().unwrap_or(name);
        let canonical = map_yield_to_canonical(name);

        match canonical.as_str() {
            // A node yield expands into several columns (vid, display
            // string, labels, plus any requested properties).
            "node" => {
                columns.extend(build_node_yield_columns(
                    &vids,
                    label,
                    output_name,
                    target_properties,
                    &props_map,
                    label_props,
                )?);
            }
            "distance" => {
                let mut builder = Float64Builder::with_capacity(num_rows);
                for dist in &distances {
                    builder.append_value(*dist as f64);
                }
                columns.push(Arc::new(builder.finish()));
            }
            "score" => {
                let mut builder = Float32Builder::with_capacity(num_rows);
                for score in &scores {
                    builder.append_value(*score);
                }
                columns.push(Arc::new(builder.finish()));
            }
            "vid" => {
                let mut builder = Int64Builder::with_capacity(num_rows);
                for vid in &vids {
                    builder.append_value(vid.as_u64() as i64);
                }
                columns.push(Arc::new(builder.finish()));
            }
            // Unknown yield names still get a column (all nulls) so the
            // column count matches the schema.
            _ => {
                let mut builder = StringBuilder::with_capacity(num_rows, 0);
                for _ in 0..num_rows {
                    builder.append_null();
                }
                columns.push(Arc::new(builder.finish()));
            }
        }
    }

    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
    Ok(Some(batch))
}
1806
/// Builds the column group for a "node" yield: a u64 vid column, a string
/// display column, a labels list column, and one column per property
/// requested for `output_name` in `target_properties`.
///
/// Column order must match the node layout in the exec's output schema —
/// the caller extends its column vector with these in sequence.
fn build_node_yield_columns(
    vids: &[Vid],
    label: &str,
    output_name: &str,
    target_properties: &HashMap<String, Vec<String>>,
    props_map: &HashMap<Vid, uni_common::Properties>,
    label_props: Option<&std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>>,
) -> DFResult<Vec<ArrayRef>> {
    let num_rows = vids.len();
    let mut columns = Vec::new();

    // Raw vertex id column.
    let mut vid_builder = UInt64Builder::with_capacity(num_rows);
    for vid in vids {
        vid_builder.append_value(vid.as_u64());
    }
    columns.push(Arc::new(vid_builder.finish()) as ArrayRef);

    // Display/variable column: the vid's string form.
    let mut var_builder = StringBuilder::with_capacity(num_rows, num_rows * 20);
    for vid in vids {
        var_builder.append_value(vid.to_string());
    }
    columns.push(Arc::new(var_builder.finish()) as ArrayRef);

    // Labels column: every row carries the single search label.
    let mut labels_builder = arrow_array::builder::ListBuilder::new(StringBuilder::new());
    for _ in 0..num_rows {
        labels_builder.values().append_value(label);
        labels_builder.append(true);
    }
    columns.push(Arc::new(labels_builder.finish()) as ArrayRef);

    // One typed column per property the query projects from this node.
    if let Some(props) = target_properties.get(output_name) {
        for prop_name in props {
            let data_type = resolve_property_type(prop_name, label_props);
            let column = crate::query::df_graph::scan::build_property_column_static(
                vids, props_map, prop_name, &data_type,
            )?;
            columns.push(column);
        }
    }

    Ok(columns)
}
1859
1860fn extract_vector(val: &Value) -> DFResult<Vec<f32>> {
1862 match val {
1863 Value::Vector(vec) => Ok(vec.clone()),
1864 Value::List(arr) => {
1865 let mut vec = Vec::with_capacity(arr.len());
1866 for v in arr {
1867 if let Some(f) = v.as_f64() {
1868 vec.push(f as f32);
1869 } else {
1870 return Err(datafusion::error::DataFusionError::Execution(
1871 "Query vector must contain numbers".to_string(),
1872 ));
1873 }
1874 }
1875 Ok(vec)
1876 }
1877 _ => Err(datafusion::error::DataFusionError::Execution(
1878 "Query vector must be a list or vector".to_string(),
1879 )),
1880 }
1881}