Skip to main content

uni_query/query/df_graph/
procedure_call.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Procedure call execution plan for DataFusion.
5//!
6//! This module provides [`GraphProcedureCallExec`], a DataFusion [`ExecutionPlan`] that
7//! executes Cypher `CALL` procedures natively within the DataFusion engine.
8//!
9//! Used for composite queries where a `CALL` is followed by `MATCH`, e.g.:
10//! ```text
11//! CALL uni.schema.labels() YIELD label
12//! MATCH (n:Person) WHERE label = 'Person'
13//! RETURN n.name, label
14//! ```
15
16use arrow_array::builder::{
17    BooleanBuilder, Float32Builder, Float64Builder, Int64Builder, StringBuilder, UInt64Builder,
18};
19use arrow_array::{ArrayRef, RecordBatch};
20use arrow_schema::{DataType, Field, Schema, SchemaRef};
21use datafusion::common::Result as DFResult;
22use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
23use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
24use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
25use futures::Stream;
26use std::any::Any;
27use std::collections::HashMap;
28use std::fmt;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32use uni_common::Value;
33use uni_common::core::id::Vid;
34use uni_common::core::schema::DistanceMetric;
35use uni_cypher::ast::Expr;
36
37use crate::query::df_graph::GraphExecutionContext;
38use crate::query::df_graph::common::{
39    arrow_err, calculate_score, compute_plan_properties, evaluate_simple_expr, labels_data_type,
40};
41use crate::query::df_graph::scan::resolve_property_type;
42
/// Maps a user-provided yield name to a canonical name.
///
/// Matching is case-insensitive:
/// - "vid", "_vid" → "vid"
/// - "distance", "dist", "_distance" → "distance"
/// - "score", "_score" → "score"
/// - "vector_score" / "fts_score" / "raw_score" pass through unchanged
/// - anything else → "node" (treated as a node variable)
pub(crate) fn map_yield_to_canonical(yield_name: &str) -> String {
    let lowered = yield_name.to_lowercase();
    let canonical = match lowered.as_str() {
        "vid" | "_vid" => "vid",
        "distance" | "dist" | "_distance" => "distance",
        "score" | "_score" => "score",
        "vector_score" => "vector_score",
        "fts_score" => "fts_score",
        "raw_score" => "raw_score",
        _ => "node",
    };
    canonical.to_string()
}
61
/// Procedure call execution plan for DataFusion.
///
/// Executes Cypher CALL procedures (schema introspection, vector search, FTS, etc.)
/// and emits results as Arrow RecordBatches.
///
/// The output schema is fixed at construction time from the procedure name and
/// yield items; argument expressions are kept unevaluated until `execute`.
pub struct GraphProcedureCallExec {
    /// Graph execution context for storage access.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Fully qualified procedure name (e.g. "uni.schema.labels").
    procedure_name: String,

    /// Argument expressions from the CALL clause (evaluated lazily in `execute`).
    arguments: Vec<Expr>,

    /// Yield items: (original_name, optional_alias).
    yield_items: Vec<(String, Option<String>)>,

    /// Query parameters for expression evaluation.
    params: HashMap<String, Value>,

    /// Outer values from correlated context (e.g. MATCH variables).
    outer_values: HashMap<String, Value>,

    /// Target properties per variable (for node-like yields).
    target_properties: HashMap<String, Vec<String>>,

    /// Output schema, derived from procedure name + yield items.
    schema: SchemaRef,

    /// Plan properties (partitioning/ordering metadata for DataFusion).
    properties: PlanProperties,

    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
97
98impl fmt::Debug for GraphProcedureCallExec {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        f.debug_struct("GraphProcedureCallExec")
101            .field("procedure_name", &self.procedure_name)
102            .field("yield_items", &self.yield_items)
103            .finish()
104    }
105}
106
impl GraphProcedureCallExec {
    /// Create a new procedure call execution plan.
    ///
    /// The output schema and plan properties are computed eagerly here so the
    /// plan can report them to DataFusion; `arguments` stay as unevaluated AST
    /// expressions until `execute`.
    pub fn new(
        graph_ctx: Arc<GraphExecutionContext>,
        procedure_name: String,
        arguments: Vec<Expr>,
        yield_items: Vec<(String, Option<String>)>,
        params: HashMap<String, Value>,
        outer_values: HashMap<String, Value>,
        target_properties: HashMap<String, Vec<String>>,
    ) -> Self {
        let schema = Self::build_schema(
            &procedure_name,
            &yield_items,
            &target_properties,
            &graph_ctx,
        );
        let properties = compute_plan_properties(schema.clone());

        Self {
            graph_ctx,
            procedure_name,
            arguments,
            yield_items,
            params,
            outer_values,
            target_properties,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Build the output schema based on the procedure name and yield items.
    ///
    /// Column names come from the yield alias when present, otherwise from the
    /// yield name itself. Unknown yield names and unknown procedures fall back
    /// to nullable Utf8 columns.
    fn build_schema(
        procedure_name: &str,
        yield_items: &[(String, Option<String>)],
        target_properties: &HashMap<String, Vec<String>>,
        graph_ctx: &GraphExecutionContext,
    ) -> SchemaRef {
        let mut fields = Vec::new();

        match procedure_name {
            "uni.schema.labels" => {
                // Schema procedure yields scalar columns
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "label" => DataType::Utf8,
                        "propertyCount" | "nodeCount" | "indexCount" => DataType::Int64,
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.schema.edgeTypes" | "uni.schema.relationshipTypes" => {
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "type" | "relationshipType" => DataType::Utf8,
                        "propertyCount" => DataType::Int64,
                        "sourceLabels" | "targetLabels" => DataType::Utf8, // JSON string
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.schema.indexes" => {
                // All index columns are strings ("properties" is a JSON array string).
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "name" | "type" | "label" | "state" | "properties" => DataType::Utf8,
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.schema.constraints" => {
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "enabled" => DataType::Boolean,
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.schema.labelInfo" => {
                for (name, alias) in yield_items {
                    let col_name = alias.as_ref().unwrap_or(name);
                    let data_type = match name.as_str() {
                        "property" | "dataType" => DataType::Utf8,
                        "nullable" | "indexed" | "unique" => DataType::Boolean,
                        _ => DataType::Utf8,
                    };
                    fields.push(Field::new(col_name, data_type, true));
                }
            }
            "uni.vector.query" | "uni.fts.query" | "uni.search" => {
                // Search procedures yield node-like and scalar columns
                for (name, alias) in yield_items {
                    let output_name = alias.as_ref().unwrap_or(name);
                    let canonical = map_yield_to_canonical(name);

                    match canonical.as_str() {
                        "node" => {
                            // Node-like yield: emit _vid, variable, _label, and properties
                            fields.push(Field::new(
                                format!("{}._vid", output_name),
                                DataType::UInt64,
                                false,
                            ));
                            fields.push(Field::new(output_name, DataType::Utf8, false));
                            fields.push(Field::new(
                                format!("{}._labels", output_name),
                                labels_data_type(),
                                true,
                            ));

                            // Add property columns
                            // NOTE(review): properties are looked up by the aliased
                            // output name — confirm target_properties is keyed that way.
                            if let Some(props) = target_properties.get(output_name.as_str()) {
                                let uni_schema = graph_ctx.storage().schema_manager().schema();
                                // We don't know the exact label yet at planning time,
                                // but we can try to resolve property types from any label
                                for prop_name in props {
                                    let col_name = format!("{}.{}", output_name, prop_name);
                                    // Fallback type when no label declares this property.
                                    let arrow_type = resolve_property_type(prop_name, None);
                                    // Try to resolve from all labels in the schema
                                    let resolved_type = uni_schema
                                        .properties
                                        .values()
                                        .find_map(|label_props| {
                                            label_props.get(prop_name.as_str()).map(|_| {
                                                resolve_property_type(prop_name, Some(label_props))
                                            })
                                        })
                                        .unwrap_or(arrow_type);
                                    fields.push(Field::new(&col_name, resolved_type, true));
                                }
                            }
                        }
                        "distance" => {
                            // Distances are f64; scores below are f32.
                            fields.push(Field::new(output_name, DataType::Float64, true));
                        }
                        "score" | "vector_score" | "fts_score" | "raw_score" => {
                            fields.push(Field::new(output_name, DataType::Float32, true));
                        }
                        "vid" => {
                            fields.push(Field::new(output_name, DataType::Int64, true));
                        }
                        _ => {
                            fields.push(Field::new(output_name, DataType::Utf8, true));
                        }
                    }
                }
            }
            name if name.starts_with("uni.algo.") => {
                // Graph-algorithm procedures: resolve yield types from the
                // algorithm registry's signature when available.
                if let Some(registry) = graph_ctx.algo_registry()
                    && let Some(procedure) = registry.get(name)
                {
                    let sig = procedure.signature();
                    for (yield_name, alias) in yield_items {
                        let col_name = alias.as_ref().unwrap_or(yield_name);
                        let yield_vt = sig.yields.iter().find(|(n, _)| *n == yield_name.as_str());
                        let data_type = yield_vt
                            .map(|(_, vt)| value_type_to_arrow(vt))
                            .unwrap_or(DataType::Utf8);
                        let mut field = Field::new(col_name, data_type, true);
                        // Tag complex types (List, Map, etc.) so record_batches_to_rows
                        // can parse the JSON string back to the original type.
                        if yield_vt.is_some_and(|(_, vt)| is_complex_value_type(vt)) {
                            let mut metadata = std::collections::HashMap::new();
                            metadata.insert("cv_encoded".to_string(), "true".to_string());
                            field = field.with_metadata(metadata);
                        }
                        fields.push(field);
                    }
                } else {
                    // Unknown algo or no registry: fallback to Utf8
                    for (name, alias) in yield_items {
                        let col_name = alias.as_ref().unwrap_or(name);
                        fields.push(Field::new(col_name, DataType::Utf8, true));
                    }
                }
            }
            _ => {
                // Check external procedure registry for type information
                if let Some(registry) = graph_ctx.procedure_registry()
                    && let Some(proc_def) = registry.get(procedure_name)
                {
                    for (name, alias) in yield_items {
                        let col_name = alias.as_ref().unwrap_or(name);
                        // Find the output type from the procedure definition
                        let data_type = proc_def
                            .outputs
                            .iter()
                            .find(|o| o.name == *name)
                            .map(|o| procedure_value_type_to_arrow(&o.output_type))
                            .unwrap_or(DataType::Utf8);
                        fields.push(Field::new(col_name, data_type, true));
                    }
                } else if yield_items.is_empty() {
                    // Void procedure (no YIELD) — no output columns
                } else {
                    // Unknown procedure without registry: fallback to Utf8
                    for (name, alias) in yield_items {
                        let col_name = alias.as_ref().unwrap_or(name);
                        fields.push(Field::new(col_name, DataType::Utf8, true));
                    }
                }
            }
        }

        Arc::new(Schema::new(fields))
    }
}
323
324/// Convert an algorithm `ValueType` to an Arrow `DataType`.
325fn value_type_to_arrow(vt: &uni_algo::algo::procedures::ValueType) -> DataType {
326    use uni_algo::algo::procedures::ValueType;
327    match vt {
328        ValueType::Int => DataType::Int64,
329        ValueType::Float => DataType::Float64,
330        ValueType::String => DataType::Utf8,
331        ValueType::Bool => DataType::Boolean,
332        ValueType::List
333        | ValueType::Map
334        | ValueType::Node
335        | ValueType::Relationship
336        | ValueType::Path
337        | ValueType::Any => DataType::Utf8,
338    }
339}
340
341/// Returns true if the ValueType is a complex type that should be JSON-encoded as Utf8
342/// and tagged with `cv_encoded=true` metadata for downstream parsing.
343fn is_complex_value_type(vt: &uni_algo::algo::procedures::ValueType) -> bool {
344    use uni_algo::algo::procedures::ValueType;
345    matches!(
346        vt,
347        ValueType::List
348            | ValueType::Map
349            | ValueType::Node
350            | ValueType::Relationship
351            | ValueType::Path
352    )
353}
354
355/// Convert a `ProcedureValueType` to an Arrow `DataType`.
356fn procedure_value_type_to_arrow(
357    vt: &crate::query::executor::procedure::ProcedureValueType,
358) -> DataType {
359    use crate::query::executor::procedure::ProcedureValueType;
360    match vt {
361        ProcedureValueType::Integer => DataType::Int64,
362        ProcedureValueType::Float | ProcedureValueType::Number => DataType::Float64,
363        ProcedureValueType::Boolean => DataType::Boolean,
364        ProcedureValueType::String | ProcedureValueType::Any => DataType::Utf8,
365    }
366}
367
368impl DisplayAs for GraphProcedureCallExec {
369    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370        write!(
371            f,
372            "GraphProcedureCallExec: procedure={}",
373            self.procedure_name
374        )
375    }
376}
377
impl ExecutionPlan for GraphProcedureCallExec {
    fn name(&self) -> &str {
        "GraphProcedureCallExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    /// Leaf node: procedure calls read from storage, not from child plans.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // As a leaf node, attaching children is an internal planner error.
        if !children.is_empty() {
            return Err(datafusion::error::DataFusionError::Internal(
                "GraphProcedureCallExec has no children".to_string(),
            ));
        }
        Ok(self)
    }

    /// Start execution for one partition, returning a stream that yields at
    /// most one RecordBatch with the procedure's results.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let metrics = BaselineMetrics::new(&self.metrics, partition);

        // Evaluate arguments upfront (outer_values provides MATCH-bound variables)
        // so the stream only has to deal with concrete `Value`s.
        let mut evaluated_args = Vec::with_capacity(self.arguments.len());
        for arg in &self.arguments {
            evaluated_args.push(evaluate_simple_expr(arg, &self.params, &self.outer_values)?);
        }

        Ok(Box::pin(ProcedureCallStream::new(
            self.graph_ctx.clone(),
            self.procedure_name.clone(),
            evaluated_args,
            self.yield_items.clone(),
            self.target_properties.clone(),
            self.schema.clone(),
            metrics,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
439
440// ---------------------------------------------------------------------------
441// Stream implementation
442// ---------------------------------------------------------------------------
443
/// State machine for `ProcedureCallStream`.
enum ProcedureCallState {
    /// Initial state, ready to start execution on first poll.
    Init,
    /// Executing the async procedure; holds the in-flight future, which
    /// resolves to at most one RecordBatch.
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Stream is done (batch emitted, error returned, or empty result).
    Done,
}
453
/// Stream that executes a procedure call and yields at most one RecordBatch.
struct ProcedureCallStream {
    /// Graph execution context for storage access.
    graph_ctx: Arc<GraphExecutionContext>,
    /// Fully qualified procedure name to dispatch on.
    procedure_name: String,
    /// Arguments already evaluated to concrete values by `execute`.
    evaluated_args: Vec<Value>,
    /// Yield items: (original_name, optional_alias).
    yield_items: Vec<(String, Option<String>)>,
    /// Target properties per variable (for node-like yields).
    target_properties: HashMap<String, Vec<String>>,
    /// Output schema; batches produced must match it.
    schema: SchemaRef,
    /// Current state of the Init → Executing → Done state machine.
    state: ProcedureCallState,
    /// Baseline metrics (output row counts etc.).
    metrics: BaselineMetrics,
}
465
impl ProcedureCallStream {
    /// Create a stream in the `Init` state; no work happens until first poll.
    fn new(
        graph_ctx: Arc<GraphExecutionContext>,
        procedure_name: String,
        evaluated_args: Vec<Value>,
        yield_items: Vec<(String, Option<String>)>,
        target_properties: HashMap<String, Vec<String>>,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        Self {
            graph_ctx,
            procedure_name,
            evaluated_args,
            yield_items,
            target_properties,
            schema,
            state: ProcedureCallState::Init,
            metrics,
        }
    }
}
488
impl Stream for ProcedureCallStream {
    type Item = DFResult<RecordBatch>;

    /// Drive the Init → Executing → Done state machine.
    ///
    /// The procedure runs as a single boxed future that produces at most one
    /// batch, so the stream yields zero or one items and then terminates.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take ownership of the state; `Done` is a safe placeholder while
            // the taken state is being processed.
            let state = std::mem::replace(&mut self.state, ProcedureCallState::Done);

            match state {
                ProcedureCallState::Init => {
                    // Clone everything the future captures so it owns its data.
                    let graph_ctx = self.graph_ctx.clone();
                    let procedure_name = self.procedure_name.clone();
                    let evaluated_args = self.evaluated_args.clone();
                    let yield_items = self.yield_items.clone();
                    let target_properties = self.target_properties.clone();
                    let schema = self.schema.clone();

                    let fut = async move {
                        // Abort early if the query deadline has already passed.
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;

                        execute_procedure(
                            &graph_ctx,
                            &procedure_name,
                            &evaluated_args,
                            &yield_items,
                            &target_properties,
                            &schema,
                        )
                        .await
                    };

                    // Loop again so the fresh future is polled immediately.
                    self.state = ProcedureCallState::Executing(Box::pin(fut));
                }
                ProcedureCallState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = ProcedureCallState::Done;
                        // Record output rows (0 when the procedure produced no batch).
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = ProcedureCallState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so it is polled again on wakeup.
                        self.state = ProcedureCallState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                ProcedureCallState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
546
547impl RecordBatchStream for ProcedureCallStream {
548    fn schema(&self) -> SchemaRef {
549        self.schema.clone()
550    }
551}
552
553// ---------------------------------------------------------------------------
554// Procedure execution dispatch
555// ---------------------------------------------------------------------------
556
/// Execute a procedure and build a RecordBatch result.
///
/// Dispatches on the fully qualified procedure name: built-in schema
/// introspection and search procedures are handled directly; `uni.algo.*`
/// goes through the algorithm registry; everything else falls through to the
/// external procedure registry.
async fn execute_procedure(
    graph_ctx: &GraphExecutionContext,
    procedure_name: &str,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    match procedure_name {
        // Schema introspection procedures (no arguments except labelInfo).
        "uni.schema.labels" => execute_schema_labels(graph_ctx, yield_items, schema).await,
        "uni.schema.edgeTypes" | "uni.schema.relationshipTypes" => {
            execute_schema_edge_types(graph_ctx, yield_items, schema).await
        }
        "uni.schema.indexes" => execute_schema_indexes(graph_ctx, yield_items, schema).await,
        "uni.schema.constraints" => {
            execute_schema_constraints(graph_ctx, yield_items, schema).await
        }
        "uni.schema.labelInfo" => {
            execute_schema_label_info(graph_ctx, args, yield_items, schema).await
        }
        // Search procedures: may emit node-like columns from target_properties.
        "uni.vector.query" => {
            execute_vector_query(graph_ctx, args, yield_items, target_properties, schema).await
        }
        "uni.fts.query" => {
            execute_fts_query(graph_ctx, args, yield_items, target_properties, schema).await
        }
        "uni.search" => {
            execute_hybrid_search(graph_ctx, args, yield_items, target_properties, schema).await
        }
        // Graph algorithms resolved through the algorithm registry.
        name if name.starts_with("uni.algo.") => {
            execute_algo_procedure(graph_ctx, name, args, yield_items, schema).await
        }
        // Anything else is looked up in the external procedure registry.
        _ => {
            execute_registered_procedure(graph_ctx, procedure_name, args, yield_items, schema).await
        }
    }
}
595
596// ---------------------------------------------------------------------------
597// Schema procedures
598// ---------------------------------------------------------------------------
599
600async fn execute_schema_labels(
601    graph_ctx: &GraphExecutionContext,
602    yield_items: &[(String, Option<String>)],
603    schema: &SchemaRef,
604) -> DFResult<Option<RecordBatch>> {
605    let uni_schema = graph_ctx.storage().schema_manager().schema();
606    let storage = graph_ctx.storage();
607
608    // Collect rows: one per label
609    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
610    for label_name in uni_schema.labels.keys() {
611        let mut row = HashMap::new();
612        row.insert("label".to_string(), Value::String(label_name.clone()));
613
614        let prop_count = uni_schema
615            .properties
616            .get(label_name)
617            .map(|p| p.len())
618            .unwrap_or(0);
619        row.insert("propertyCount".to_string(), Value::Int(prop_count as i64));
620
621        let node_count = if let Ok(ds) = storage.vertex_dataset(label_name) {
622            if let Ok(raw) = ds.open_raw().await {
623                raw.count_rows(None).await.unwrap_or(0)
624            } else {
625                0
626            }
627        } else {
628            0
629        };
630        row.insert("nodeCount".to_string(), Value::Int(node_count as i64));
631
632        let idx_count = uni_schema
633            .indexes
634            .iter()
635            .filter(|i| i.label() == label_name)
636            .count();
637        row.insert("indexCount".to_string(), Value::Int(idx_count as i64));
638
639        rows.push(row);
640    }
641
642    build_scalar_batch(&rows, yield_items, schema)
643}
644
645async fn execute_schema_edge_types(
646    graph_ctx: &GraphExecutionContext,
647    yield_items: &[(String, Option<String>)],
648    schema: &SchemaRef,
649) -> DFResult<Option<RecordBatch>> {
650    let uni_schema = graph_ctx.storage().schema_manager().schema();
651
652    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
653    for (type_name, meta) in &uni_schema.edge_types {
654        let mut row = HashMap::new();
655        row.insert("type".to_string(), Value::String(type_name.clone()));
656        row.insert(
657            "relationshipType".to_string(),
658            Value::String(type_name.clone()),
659        );
660        row.insert(
661            "sourceLabels".to_string(),
662            Value::String(format!("{:?}", meta.src_labels)),
663        );
664        row.insert(
665            "targetLabels".to_string(),
666            Value::String(format!("{:?}", meta.dst_labels)),
667        );
668
669        let prop_count = uni_schema
670            .properties
671            .get(type_name)
672            .map(|p| p.len())
673            .unwrap_or(0);
674        row.insert("propertyCount".to_string(), Value::Int(prop_count as i64));
675
676        rows.push(row);
677    }
678
679    build_scalar_batch(&rows, yield_items, schema)
680}
681
682async fn execute_schema_indexes(
683    graph_ctx: &GraphExecutionContext,
684    yield_items: &[(String, Option<String>)],
685    schema: &SchemaRef,
686) -> DFResult<Option<RecordBatch>> {
687    let uni_schema = graph_ctx.storage().schema_manager().schema();
688
689    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
690    for idx in &uni_schema.indexes {
691        use uni_common::core::schema::IndexDefinition;
692
693        // Extract type name and properties JSON per variant
694        let (type_name, properties_json) = match &idx {
695            IndexDefinition::Vector(v) => (
696                "VECTOR",
697                serde_json::to_string(&[&v.property]).unwrap_or_default(),
698            ),
699            IndexDefinition::FullText(f) => (
700                "FULLTEXT",
701                serde_json::to_string(&f.properties).unwrap_or_default(),
702            ),
703            IndexDefinition::Scalar(s) => (
704                "SCALAR",
705                serde_json::to_string(&s.properties).unwrap_or_default(),
706            ),
707            IndexDefinition::JsonFullText(j) => (
708                "JSON_FTS",
709                serde_json::to_string(&[&j.column]).unwrap_or_default(),
710            ),
711            IndexDefinition::Inverted(inv) => (
712                "INVERTED",
713                serde_json::to_string(&[&inv.property]).unwrap_or_default(),
714            ),
715            _ => ("UNKNOWN", String::new()),
716        };
717
718        let row = HashMap::from([
719            ("state".to_string(), Value::String("ONLINE".to_string())),
720            ("name".to_string(), Value::String(idx.name().to_string())),
721            ("type".to_string(), Value::String(type_name.to_string())),
722            ("label".to_string(), Value::String(idx.label().to_string())),
723            ("properties".to_string(), Value::String(properties_json)),
724        ]);
725        rows.push(row);
726    }
727
728    build_scalar_batch(&rows, yield_items, schema)
729}
730
731async fn execute_schema_constraints(
732    graph_ctx: &GraphExecutionContext,
733    yield_items: &[(String, Option<String>)],
734    schema: &SchemaRef,
735) -> DFResult<Option<RecordBatch>> {
736    let uni_schema = graph_ctx.storage().schema_manager().schema();
737
738    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
739    for c in &uni_schema.constraints {
740        let mut row = HashMap::new();
741        row.insert("name".to_string(), Value::String(c.name.clone()));
742        row.insert("enabled".to_string(), Value::Bool(c.enabled));
743
744        match &c.constraint_type {
745            uni_common::core::schema::ConstraintType::Unique { properties } => {
746                row.insert("type".to_string(), Value::String("UNIQUE".to_string()));
747                row.insert(
748                    "properties".to_string(),
749                    Value::String(serde_json::to_string(&properties).unwrap_or_default()),
750                );
751            }
752            uni_common::core::schema::ConstraintType::Exists { property } => {
753                row.insert("type".to_string(), Value::String("EXISTS".to_string()));
754                row.insert(
755                    "properties".to_string(),
756                    Value::String(serde_json::to_string(&[&property]).unwrap_or_default()),
757                );
758            }
759            uni_common::core::schema::ConstraintType::Check { expression } => {
760                row.insert("type".to_string(), Value::String("CHECK".to_string()));
761                row.insert("expression".to_string(), Value::String(expression.clone()));
762            }
763            _ => {
764                row.insert("type".to_string(), Value::String("UNKNOWN".to_string()));
765            }
766        }
767
768        match &c.target {
769            uni_common::core::schema::ConstraintTarget::Label(l) => {
770                row.insert("label".to_string(), Value::String(l.clone()));
771            }
772            uni_common::core::schema::ConstraintTarget::EdgeType(t) => {
773                row.insert("relationshipType".to_string(), Value::String(t.clone()));
774            }
775            _ => {
776                row.insert("target".to_string(), Value::String("UNKNOWN".to_string()));
777            }
778        }
779
780        rows.push(row);
781    }
782
783    build_scalar_batch(&rows, yield_items, schema)
784}
785
786async fn execute_schema_label_info(
787    graph_ctx: &GraphExecutionContext,
788    args: &[Value],
789    yield_items: &[(String, Option<String>)],
790    schema: &SchemaRef,
791) -> DFResult<Option<RecordBatch>> {
792    let label_name = require_string_arg(args, 0, "uni.schema.labelInfo: first argument (label)")?;
793
794    let uni_schema = graph_ctx.storage().schema_manager().schema();
795
796    let mut rows: Vec<HashMap<String, Value>> = Vec::new();
797    if let Some(props) = uni_schema.properties.get(&label_name) {
798        for (prop_name, prop_meta) in props {
799            let mut row = HashMap::new();
800            row.insert("property".to_string(), Value::String(prop_name.clone()));
801            row.insert(
802                "dataType".to_string(),
803                Value::String(format!("{:?}", prop_meta.r#type)),
804            );
805            row.insert("nullable".to_string(), Value::Bool(prop_meta.nullable));
806
807            let is_indexed = uni_schema.indexes.iter().any(|idx| match idx {
808                uni_common::core::schema::IndexDefinition::Vector(v) => {
809                    v.label == label_name && v.property == *prop_name
810                }
811                uni_common::core::schema::IndexDefinition::Scalar(s) => {
812                    s.label == label_name && s.properties.contains(prop_name)
813                }
814                uni_common::core::schema::IndexDefinition::FullText(f) => {
815                    f.label == label_name && f.properties.contains(prop_name)
816                }
817                uni_common::core::schema::IndexDefinition::Inverted(inv) => {
818                    inv.label == label_name && inv.property == *prop_name
819                }
820                uni_common::core::schema::IndexDefinition::JsonFullText(j) => j.label == label_name,
821                _ => false,
822            });
823            row.insert("indexed".to_string(), Value::Bool(is_indexed));
824
825            let unique = uni_schema.constraints.iter().any(|c| {
826                if let uni_common::core::schema::ConstraintTarget::Label(l) = &c.target
827                    && l == &label_name
828                    && c.enabled
829                    && let uni_common::core::schema::ConstraintType::Unique { properties } =
830                        &c.constraint_type
831                {
832                    return properties.contains(prop_name);
833                }
834                false
835            });
836            row.insert("unique".to_string(), Value::Bool(unique));
837
838            rows.push(row);
839        }
840    }
841
842    build_scalar_batch(&rows, yield_items, schema)
843}
844
845/// Build a typed Arrow column from an iterator of optional `Value`s.
846///
847/// Dispatches on `data_type` to build the appropriate Arrow array. For types
848/// not explicitly handled (Utf8 fallback), values are stringified.
849fn build_typed_column<'a>(
850    values: impl Iterator<Item = Option<&'a Value>>,
851    num_rows: usize,
852    data_type: &DataType,
853) -> ArrayRef {
854    match data_type {
855        DataType::Int64 => {
856            let mut builder = Int64Builder::with_capacity(num_rows);
857            for val in values {
858                match val.and_then(|v| v.as_i64()) {
859                    Some(i) => builder.append_value(i),
860                    None => builder.append_null(),
861                }
862            }
863            Arc::new(builder.finish())
864        }
865        DataType::Float64 => {
866            let mut builder = Float64Builder::with_capacity(num_rows);
867            for val in values {
868                match val.and_then(|v| v.as_f64()) {
869                    Some(f) => builder.append_value(f),
870                    None => builder.append_null(),
871                }
872            }
873            Arc::new(builder.finish())
874        }
875        DataType::Boolean => {
876            let mut builder = BooleanBuilder::with_capacity(num_rows);
877            for val in values {
878                match val.and_then(|v| v.as_bool()) {
879                    Some(b) => builder.append_value(b),
880                    None => builder.append_null(),
881                }
882            }
883            Arc::new(builder.finish())
884        }
885        _ => {
886            // Utf8 fallback: stringify values
887            let mut builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
888            for val in values {
889                match val {
890                    Some(Value::String(s)) => builder.append_value(s),
891                    Some(v) => builder.append_value(format!("{v}")),
892                    None => builder.append_null(),
893                }
894            }
895            Arc::new(builder.finish())
896        }
897    }
898}
899
900/// Create an empty RecordBatch for the given schema.
901///
902/// When a schema has zero fields, `RecordBatch::new_empty()` panics because it
903/// cannot determine the row count from an empty array. This helper handles that
904/// edge case by using `RecordBatchOptions::with_row_count(0)`.
905fn create_empty_batch(schema: SchemaRef) -> DFResult<RecordBatch> {
906    if schema.fields().is_empty() {
907        let options = arrow_array::RecordBatchOptions::new().with_row_count(Some(0));
908        RecordBatch::try_new_with_options(schema, vec![], &options).map_err(arrow_err)
909    } else {
910        Ok(RecordBatch::new_empty(schema))
911    }
912}
913
914/// Build a RecordBatch from scalar-valued rows for schema procedures.
915fn build_scalar_batch(
916    rows: &[HashMap<String, Value>],
917    yield_items: &[(String, Option<String>)],
918    schema: &SchemaRef,
919) -> DFResult<Option<RecordBatch>> {
920    if rows.is_empty() {
921        return Ok(Some(create_empty_batch(schema.clone())?));
922    }
923
924    let num_rows = rows.len();
925    let mut columns: Vec<ArrayRef> = Vec::new();
926
927    for (idx, (name, _alias)) in yield_items.iter().enumerate() {
928        let field = schema.field(idx);
929        let values = rows.iter().map(|row| row.get(name));
930        columns.push(build_typed_column(values, num_rows, field.data_type()));
931    }
932
933    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
934    Ok(Some(batch))
935}
936
937// ---------------------------------------------------------------------------
938// External/registered procedures
939// ---------------------------------------------------------------------------
940
/// Execute an externally registered procedure (e.g., TCK test procedures).
///
/// Looks up the procedure in the `ProcedureRegistry`, evaluates arguments,
/// filters data rows by matching input columns, and projects output columns.
///
/// Registered procedures are table-driven: `proc_def.data` holds the full
/// result table, and `args` act as an equality filter over the columns named
/// by `proc_def.params`. Error messages use TCK-style prefixes
/// (`ProcedureNotFound:`, `InvalidNumberOfArguments:`, `InvalidArgumentType:`)
/// so conformance tests can match on them — do not reword them casually.
async fn execute_registered_procedure(
    graph_ctx: &GraphExecutionContext,
    procedure_name: &str,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let registry = graph_ctx.procedure_registry().ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(format!(
            "Procedure '{}' not supported in DataFusion engine (no procedure registry)",
            procedure_name
        ))
    })?;

    let proc_def = registry.get(procedure_name).ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(format!(
            "ProcedureNotFound: Unknown procedure '{}'",
            procedure_name
        ))
    })?;

    // Validate argument count (exact match; no optional parameters here).
    if args.len() != proc_def.params.len() {
        return Err(datafusion::error::DataFusionError::Execution(format!(
            "InvalidNumberOfArguments: Procedure '{}' expects {} argument(s), got {}",
            proc_def.name,
            proc_def.params.len(),
            args.len()
        )));
    }

    // Validate argument types. A null argument skips the check: null is
    // treated as compatible with every declared parameter type.
    for (i, (arg_val, param)) in args.iter().zip(&proc_def.params).enumerate() {
        if !arg_val.is_null() && !check_proc_type_compatible(arg_val, &param.param_type) {
            return Err(datafusion::error::DataFusionError::Execution(format!(
                "InvalidArgumentType: Argument {} ('{}') of procedure '{}' has incompatible type",
                i, param.name, proc_def.name
            )));
        }
    }

    // Filter data rows: keep rows where input columns match the provided args.
    // A row that lacks an input column entirely is NOT filtered out — only a
    // present-but-mismatching value rejects the row.
    let filtered: Vec<&HashMap<String, Value>> = proc_def
        .data
        .iter()
        .filter(|row| {
            for (param, arg_val) in proc_def.params.iter().zip(args) {
                if let Some(row_val) = row.get(&param.name)
                    && !proc_values_match(row_val, arg_val)
                {
                    return false;
                }
            }
            true
        })
        .collect();

    // If the procedure has no yield items (void procedure), return empty batch
    if yield_items.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    if filtered.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    // Project output columns based on yield items.
    // Yield item `idx` is looked up by its (pre-alias) name in each data row
    // and typed according to `schema.field(idx)`; missing keys become nulls.
    let num_rows = filtered.len();
    let mut columns: Vec<ArrayRef> = Vec::new();

    for (idx, (name, _alias)) in yield_items.iter().enumerate() {
        let field = schema.field(idx);
        let values = filtered.iter().map(|row| row.get(name.as_str()));
        columns.push(build_typed_column(values, num_rows, field.data_type()));
    }

    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
    Ok(Some(batch))
}
1025
1026/// Checks whether a value is compatible with a procedure type (DF engine version).
1027fn check_proc_type_compatible(
1028    val: &Value,
1029    expected: &crate::query::executor::procedure::ProcedureValueType,
1030) -> bool {
1031    use crate::query::executor::procedure::ProcedureValueType;
1032    match expected {
1033        ProcedureValueType::Any => true,
1034        ProcedureValueType::String => val.is_string(),
1035        ProcedureValueType::Boolean => val.is_bool(),
1036        ProcedureValueType::Integer => val.is_i64(),
1037        ProcedureValueType::Float => val.is_f64() || val.is_i64(),
1038        ProcedureValueType::Number => val.is_number(),
1039    }
1040}
1041
1042/// Checks whether two values match for input-column filtering (DF engine version).
1043fn proc_values_match(row_val: &Value, arg_val: &Value) -> bool {
1044    if arg_val.is_null() || row_val.is_null() {
1045        return arg_val.is_null() && row_val.is_null();
1046    }
1047    // Compare numbers by f64 to handle int/float cross-comparison
1048    if let (Some(a), Some(b)) = (row_val.as_f64(), arg_val.as_f64()) {
1049        return (a - b).abs() < f64::EPSILON;
1050    }
1051    row_val == arg_val
1052}
1053
1054// ---------------------------------------------------------------------------
1055// Algorithm procedures
1056// ---------------------------------------------------------------------------
1057
/// Execute a graph-algorithm procedure from the algorithm registry.
///
/// Resolves the procedure by name, converts arguments to `serde_json::Value`,
/// builds an [`AlgoContext`] with L0 visibility, streams the result rows
/// (checking the query timeout periodically), and shapes them into a
/// RecordBatch via [`build_algo_batch`].
async fn execute_algo_procedure(
    graph_ctx: &GraphExecutionContext,
    procedure_name: &str,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    use futures::StreamExt;
    use uni_algo::algo::procedures::AlgoContext;

    let registry = graph_ctx.algo_registry().ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(
            "Algorithm registry not available".to_string(),
        )
    })?;

    let procedure = registry.get(procedure_name).ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(format!(
            "Unknown algorithm: {}",
            procedure_name
        ))
    })?;

    // Signature is needed later to map yield names to output positions.
    let signature = procedure.signature();

    // Convert uni_common::Value args to serde_json::Value for algo crate.
    // Note: do NOT call validate_args here — the procedure's own execute()
    // already validates and fills defaults internally.
    let serde_args: Vec<serde_json::Value> = args.iter().cloned().map(|v| v.into()).collect();

    // Build AlgoContext with L0 visibility so algorithms see uncommitted-but-committed data.
    // The snapshot combines the current L0 with any pending-flush L0s, plus the
    // transaction-local L0 when one exists; absent a current L0, no manager is passed.
    let l0_mgr = {
        let l0_ctx = graph_ctx.l0_context();
        l0_ctx.current_l0.as_ref().map(|current| {
            let mut pending = l0_ctx.pending_flush_l0s.clone();
            if let Some(tx_l0) = &l0_ctx.transaction_l0 {
                pending.push(tx_l0.clone());
            }
            Arc::new(uni_store::runtime::l0_manager::L0Manager::from_snapshot(
                current.clone(),
                pending,
            ))
        })
    };
    let algo_ctx = AlgoContext::new(graph_ctx.storage().clone(), l0_mgr);

    // Execute and collect stream
    let mut stream = procedure.execute(algo_ctx, serde_args);
    let mut rows = Vec::new();
    while let Some(row_res) = stream.next().await {
        // Check timeout periodically — every 1000 collected rows (including
        // once before the first row is pushed).
        if rows.len() % 1000 == 0 {
            graph_ctx
                .check_timeout()
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
        }
        let row =
            row_res.map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
        rows.push(row);
    }

    build_algo_batch(&rows, &signature, yield_items, schema)
}
1121
1122/// Convert a `serde_json::Value` to a `uni_common::Value` for column building.
1123fn json_to_value(jv: &serde_json::Value) -> Value {
1124    match jv {
1125        serde_json::Value::Null => Value::Null,
1126        serde_json::Value::Bool(b) => Value::Bool(*b),
1127        serde_json::Value::Number(n) => {
1128            if let Some(i) = n.as_i64() {
1129                Value::Int(i)
1130            } else if let Some(f) = n.as_f64() {
1131                Value::Float(f)
1132            } else {
1133                Value::Null
1134            }
1135        }
1136        serde_json::Value::String(s) => Value::String(s.clone()),
1137        other => Value::String(other.to_string()),
1138    }
1139}
1140
1141/// Build a RecordBatch from algorithm result rows.
1142fn build_algo_batch(
1143    rows: &[uni_algo::algo::procedures::AlgoResultRow],
1144    signature: &uni_algo::algo::procedures::ProcedureSignature,
1145    yield_items: &[(String, Option<String>)],
1146    schema: &SchemaRef,
1147) -> DFResult<Option<RecordBatch>> {
1148    if rows.is_empty() {
1149        return Ok(Some(create_empty_batch(schema.clone())?));
1150    }
1151
1152    let num_rows = rows.len();
1153    let mut columns: Vec<ArrayRef> = Vec::new();
1154
1155    for (idx, (yield_name, _alias)) in yield_items.iter().enumerate() {
1156        let sig_idx = signature
1157            .yields
1158            .iter()
1159            .position(|(n, _)| *n == yield_name.as_str());
1160
1161        // Convert serde_json values to uni_common::Value for the shared column builder
1162        let uni_values: Vec<Value> = rows
1163            .iter()
1164            .map(|row| match sig_idx {
1165                Some(si) => json_to_value(&row.values[si]),
1166                None => Value::Null,
1167            })
1168            .collect();
1169
1170        let field = schema.field(idx);
1171        let values = uni_values.iter().map(Some);
1172        columns.push(build_typed_column(values, num_rows, field.data_type()));
1173    }
1174
1175    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1176    Ok(Some(batch))
1177}
1178
1179// ---------------------------------------------------------------------------
1180// Shared search argument helpers
1181// ---------------------------------------------------------------------------
1182
1183/// Extract a required string argument from the argument list at a given position.
1184fn require_string_arg(args: &[Value], index: usize, description: &str) -> DFResult<String> {
1185    args.get(index)
1186        .and_then(|v| v.as_str())
1187        .map(|s| s.to_string())
1188        .ok_or_else(|| {
1189            datafusion::error::DataFusionError::Execution(format!("{description} must be a string"))
1190        })
1191}
1192
1193/// Extract an optional filter string from the argument list.
1194/// Returns `None` if the argument is missing, null, or not a string.
1195fn extract_optional_filter(args: &[Value], index: usize) -> Option<String> {
1196    args.get(index).and_then(|v| {
1197        if v.is_null() {
1198            None
1199        } else {
1200            v.as_str().map(|s| s.to_string())
1201        }
1202    })
1203}
1204
1205/// Extract an optional float threshold from the argument list.
1206/// Returns `None` if the argument is missing or null.
1207fn extract_optional_threshold(args: &[Value], index: usize) -> Option<f64> {
1208    args.get(index)
1209        .and_then(|v| if v.is_null() { None } else { v.as_f64() })
1210}
1211
1212/// Extract a required integer argument from the argument list at a given position.
1213fn require_int_arg(args: &[Value], index: usize, description: &str) -> DFResult<usize> {
1214    args.get(index)
1215        .and_then(|v| v.as_u64())
1216        .map(|v| v as usize)
1217        .ok_or_else(|| {
1218            datafusion::error::DataFusionError::Execution(format!(
1219                "{description} must be an integer"
1220            ))
1221        })
1222}
1223
1224// ---------------------------------------------------------------------------
1225// Vector/FTS/Hybrid search procedures
1226// ---------------------------------------------------------------------------
1227
1228/// Auto-embed a text query using the vector index's embedding configuration.
1229///
1230/// Looks up the embedding config from the index on `label.property` and uses
1231/// it to embed the provided text query into a vector.
1232async fn auto_embed_text(
1233    graph_ctx: &GraphExecutionContext,
1234    label: &str,
1235    property: &str,
1236    query_text: &str,
1237) -> DFResult<Vec<f32>> {
1238    let storage = graph_ctx.storage();
1239    let uni_schema = storage.schema_manager().schema();
1240    let index_config = uni_schema.vector_index_for_property(label, property);
1241
1242    let embedding_config = index_config
1243        .and_then(|cfg| cfg.embedding_config.as_ref())
1244        .ok_or_else(|| {
1245            datafusion::error::DataFusionError::Execution(format!(
1246                "Cannot auto-embed: vector index for {label}.{property} has no embedding_config. \
1247                 Either provide a pre-computed vector or create the index with embedding options."
1248            ))
1249        })?;
1250
1251    let runtime = graph_ctx.xervo_runtime().ok_or_else(|| {
1252        datafusion::error::DataFusionError::Execution(
1253            "Cannot auto-embed: Uni-Xervo runtime not configured".to_string(),
1254        )
1255    })?;
1256
1257    let embedder = runtime
1258        .embedding(&embedding_config.alias)
1259        .await
1260        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
1261    let embeddings = embedder
1262        .embed(vec![query_text])
1263        .await
1264        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
1265    embeddings.into_iter().next().ok_or_else(|| {
1266        datafusion::error::DataFusionError::Execution(
1267            "Embedding service returned no results".to_string(),
1268        )
1269    })
1270}
1271
/// Execute `uni.vector.query(label, property, query, k[, filter[, threshold]])`.
///
/// `query` may be a text string — in which case it is auto-embedded via the
/// index's embedding config — or a pre-computed vector literal. Results are
/// optionally post-filtered by a maximum distance, then shaped into a
/// RecordBatch with scores derived from the index's distance metric.
async fn execute_vector_query(
    graph_ctx: &GraphExecutionContext,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let label = require_string_arg(args, 0, "uni.vector.query: first argument (label)")?;
    let property = require_string_arg(args, 1, "uni.vector.query: second argument (property)")?;

    let query_val = args.get(2).ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(
            "uni.vector.query: third argument (query) is required".to_string(),
        )
    })?;

    let storage = graph_ctx.storage();

    // String query => embed server-side; anything else must be a vector literal.
    let query_vector: Vec<f32> = if let Some(query_text) = query_val.as_str() {
        auto_embed_text(graph_ctx, &label, &property, query_text).await?
    } else {
        extract_vector(query_val)?
    };

    let k = require_int_arg(args, 3, "uni.vector.query: fourth argument (k)")?;
    let filter = extract_optional_filter(args, 4);
    let threshold = extract_optional_threshold(args, 5);
    let query_ctx = graph_ctx.query_context();

    let mut results = storage
        .vector_search(
            &label,
            &property,
            &query_vector,
            k,
            filter.as_deref(),
            Some(&query_ctx),
        )
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    // Apply threshold post-filter (on distance): keep hits at or below max_dist.
    if let Some(max_dist) = threshold {
        results.retain(|(_, dist)| *dist <= max_dist as f32);
    }

    if results.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    // Calculate scores using the same logic as the old executor.
    // L2 is the fallback metric when the index definition cannot be found.
    let schema_manager = storage.schema_manager();
    let uni_schema = schema_manager.schema();
    let metric = uni_schema
        .vector_index_for_property(&label, &property)
        .map(|config| config.metric.clone())
        .unwrap_or(DistanceMetric::L2);

    build_search_result_batch(
        &results,
        &label,
        &metric,
        yield_items,
        target_properties,
        graph_ctx,
        schema,
    )
    .await
}
1341
1342// ---------------------------------------------------------------------------
1343// FTS search procedure
1344// ---------------------------------------------------------------------------
1345
1346async fn execute_fts_query(
1347    graph_ctx: &GraphExecutionContext,
1348    args: &[Value],
1349    yield_items: &[(String, Option<String>)],
1350    target_properties: &HashMap<String, Vec<String>>,
1351    schema: &SchemaRef,
1352) -> DFResult<Option<RecordBatch>> {
1353    let label = require_string_arg(args, 0, "uni.fts.query: first argument (label)")?;
1354    let property = require_string_arg(args, 1, "uni.fts.query: second argument (property)")?;
1355    let search_term = require_string_arg(args, 2, "uni.fts.query: third argument (search_term)")?;
1356    let k = require_int_arg(args, 3, "uni.fts.query: fourth argument (k)")?;
1357    let filter = extract_optional_filter(args, 4);
1358    let threshold = extract_optional_threshold(args, 5);
1359
1360    let storage = graph_ctx.storage();
1361    let query_ctx = graph_ctx.query_context();
1362
1363    let mut results = storage
1364        .fts_search(
1365            &label,
1366            &property,
1367            &search_term,
1368            k,
1369            filter.as_deref(),
1370            Some(&query_ctx),
1371        )
1372        .await
1373        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
1374
1375    if let Some(min_score) = threshold {
1376        results.retain(|(_, score)| *score as f64 >= min_score);
1377    }
1378
1379    if results.is_empty() {
1380        return Ok(Some(create_empty_batch(schema.clone())?));
1381    }
1382
1383    // FTS uses a "fake" L2 metric for the batch builder — scores are already BM25
1384    // We use L2 as a placeholder; the actual score column is built differently.
1385    build_search_result_batch(
1386        &results,
1387        &label,
1388        &DistanceMetric::L2,
1389        yield_items,
1390        target_properties,
1391        graph_ctx,
1392        schema,
1393    )
1394    .await
1395}
1396
1397// ---------------------------------------------------------------------------
1398// Hybrid search procedure
1399// ---------------------------------------------------------------------------
1400
/// Execute `uni.search(label, properties, query_text[, query_vector, k, filter, options])`:
/// hybrid vector + full-text search with result fusion.
///
/// `properties` is either `{vector: '...', fts: '...'}` selecting a property
/// per modality, or a single string used for both. Each configured modality is
/// over-fetched (`k * over_fetch`, default 2.0), then the two hit lists are
/// fused — RRF by default (`rrf_k` default 60), or weighted-sum when
/// `options.method == "weighted"` (`alpha` default 0.5) — and truncated to `k`.
async fn execute_hybrid_search(
    graph_ctx: &GraphExecutionContext,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let label = require_string_arg(args, 0, "uni.search: first argument (label)")?;

    // Parse properties: {vector: '...', fts: '...'} or just a string
    let properties_val = args.get(1).ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(
            "uni.search: second argument (properties) is required".to_string(),
        )
    })?;

    let (vector_prop, fts_prop) = if let Some(obj) = properties_val.as_object() {
        // Object form: either key may be absent, disabling that modality.
        let vec_prop = obj
            .get("vector")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string());
        let fts_prop = obj
            .get("fts")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string());
        (vec_prop, fts_prop)
    } else if let Some(prop) = properties_val.as_str() {
        // Shorthand: just property name means both vector and FTS
        (Some(prop.to_string()), Some(prop.to_string()))
    } else {
        return Err(datafusion::error::DataFusionError::Execution(
            "Properties must be an object {vector: '...', fts: '...'} or a string".to_string(),
        ));
    };

    let query_text = require_string_arg(args, 2, "uni.search: third argument (query_text)")?;

    // Arg 3: query vector (optional, can be null).
    // Non-numeric array elements are silently dropped during conversion.
    let query_vector: Option<Vec<f32>> = args.get(3).and_then(|v| {
        if v.is_null() {
            return None;
        }
        v.as_array().map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_f64().map(|f| f as f32))
                .collect()
        })
    });

    let k = require_int_arg(args, 4, "uni.search: fifth argument (k)")?;
    let filter = extract_optional_filter(args, 5);

    // Arg 6: options (optional) — fusion method and its tuning parameters.
    let options_val = args.get(6);
    let options_map = options_val.and_then(|v| v.as_object());
    let fusion_method = options_map
        .and_then(|m| m.get("method"))
        .and_then(|v| v.as_str())
        .unwrap_or("rrf")
        .to_string();
    let alpha = options_map
        .and_then(|m| m.get("alpha"))
        .and_then(|v| v.as_f64())
        .unwrap_or(0.5) as f32;
    let over_fetch_factor = options_map
        .and_then(|m| m.get("over_fetch"))
        .and_then(|v| v.as_f64())
        .unwrap_or(2.0) as f32;
    let rrf_k = options_map
        .and_then(|m| m.get("rrf_k"))
        .and_then(|v| v.as_u64())
        .unwrap_or(60) as usize;

    // Each modality fetches more than k so fusion has candidates to rerank.
    let over_fetch_k = (k as f32 * over_fetch_factor).ceil() as usize;

    let storage = graph_ctx.storage();
    let query_ctx = graph_ctx.query_context();

    // Execute vector search if configured
    let mut vector_results: Vec<(Vid, f32)> = Vec::new();
    if let Some(ref vec_prop) = vector_prop {
        // Get or generate query vector
        let qvec = if let Some(ref v) = query_vector {
            v.clone()
        } else {
            // Auto-embed the query text if embedding config exists.
            // Best-effort: embedding failure yields an empty vector, which
            // simply skips the vector leg rather than failing the query.
            auto_embed_text(graph_ctx, &label, vec_prop, &query_text)
                .await
                .unwrap_or_default()
        };

        if !qvec.is_empty() {
            vector_results = storage
                .vector_search(
                    &label,
                    vec_prop,
                    &qvec,
                    over_fetch_k,
                    filter.as_deref(),
                    Some(&query_ctx),
                )
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
        }
    }

    // Execute FTS search if configured
    let mut fts_results: Vec<(Vid, f32)> = Vec::new();
    if let Some(ref fts_prop) = fts_prop {
        fts_results = storage
            .fts_search(
                &label,
                fts_prop,
                &query_text,
                over_fetch_k,
                filter.as_deref(),
                Some(&query_ctx),
            )
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
    }

    // Fuse results: any unrecognized method falls back to RRF.
    let fused_results = match fusion_method.as_str() {
        "weighted" => fuse_weighted(&vector_results, &fts_results, alpha),
        _ => fuse_rrf(&vector_results, &fts_results, rrf_k),
    };

    // Limit to k results
    let final_results: Vec<_> = fused_results.into_iter().take(k).collect();

    if final_results.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }

    // Build lookup maps for original scores
    let vec_score_map: HashMap<Vid, f32> = vector_results.iter().cloned().collect();
    let fts_score_map: HashMap<Vid, f32> = fts_results.iter().cloned().collect();
    // Max FTS score observed; 0.0 when there were no FTS hits.
    let fts_max = fts_results.iter().map(|(_, s)| *s).fold(0.0f32, f32::max);

    // Get distance metric for vector score normalization (L2 fallback when
    // the vector modality is disabled or the index definition is missing).
    let uni_schema = storage.schema_manager().schema();
    let metric = vector_prop
        .as_ref()
        .and_then(|vp| {
            uni_schema
                .vector_index_for_property(&label, vp)
                .map(|config| config.metric.clone())
        })
        .unwrap_or(DistanceMetric::L2);

    let score_ctx = HybridScoreContext {
        vec_score_map: &vec_score_map,
        fts_score_map: &fts_score_map,
        fts_max,
        metric: &metric,
    };

    build_hybrid_search_batch(
        &final_results,
        &score_ctx,
        &label,
        yield_items,
        target_properties,
        graph_ctx,
        schema,
    )
    .await
}
1570
1571/// Reciprocal Rank Fusion (RRF) for combining search results.
1572/// Delegates to the shared `fusion` module.
1573fn fuse_rrf(vec_results: &[(Vid, f32)], fts_results: &[(Vid, f32)], k: usize) -> Vec<(Vid, f32)> {
1574    crate::query::fusion::fuse_rrf(vec_results, fts_results, k)
1575}
1576
1577/// Weighted fusion: alpha * vec_score + (1 - alpha) * fts_score.
1578/// Delegates to the shared `fusion` module.
1579fn fuse_weighted(
1580    vec_results: &[(Vid, f32)],
1581    fts_results: &[(Vid, f32)],
1582    alpha: f32,
1583) -> Vec<(Vid, f32)> {
1584    crate::query::fusion::fuse_weighted(vec_results, fts_results, alpha)
1585}
1586
/// Precomputed score context for hybrid search batch building.
///
/// Borrows the per-source score lookups so `build_hybrid_search_batch` can
/// emit the `vector_score`, `fts_score`, and `distance` yield columns without
/// re-deriving them per row.
struct HybridScoreContext<'a> {
    /// Per-vertex vector results; values are raw distances that get converted
    /// to similarity scores via `calculate_score` with `metric`.
    vec_score_map: &'a HashMap<Vid, f32>,
    /// Per-vertex raw full-text search scores.
    fts_score_map: &'a HashMap<Vid, f32>,
    /// Maximum observed FTS score, used to normalize `fts_score` output
    /// (0.0 when there were no FTS hits, in which case normalized scores are 0).
    fts_max: f32,
    /// Distance metric of the vector index, used for distance→score conversion.
    metric: &'a DistanceMetric,
}
1594
1595/// Build a RecordBatch for hybrid search results with fused, vector, and FTS scores.
1596async fn build_hybrid_search_batch(
1597    results: &[(Vid, f32)],
1598    scores: &HybridScoreContext<'_>,
1599    label: &str,
1600    yield_items: &[(String, Option<String>)],
1601    target_properties: &HashMap<String, Vec<String>>,
1602    graph_ctx: &GraphExecutionContext,
1603    schema: &SchemaRef,
1604) -> DFResult<Option<RecordBatch>> {
1605    let num_rows = results.len();
1606    let vids: Vec<Vid> = results.iter().map(|(vid, _)| *vid).collect();
1607    let fused_scores: Vec<f32> = results.iter().map(|(_, s)| *s).collect();
1608
1609    // Pre-load properties for node-like yields
1610    let property_manager = graph_ctx.property_manager();
1611    let query_ctx = graph_ctx.query_context();
1612    let uni_schema = graph_ctx.storage().schema_manager().schema();
1613    let label_props = uni_schema.properties.get(label);
1614
1615    let has_node_yield = yield_items
1616        .iter()
1617        .any(|(name, _)| map_yield_to_canonical(name) == "node");
1618
1619    let props_map = if has_node_yield {
1620        property_manager
1621            .get_batch_vertex_props_for_label(&vids, label, Some(&query_ctx))
1622            .await
1623            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
1624    } else {
1625        HashMap::new()
1626    };
1627
1628    let mut columns: Vec<ArrayRef> = Vec::new();
1629
1630    for (name, alias) in yield_items {
1631        let output_name = alias.as_ref().unwrap_or(name);
1632        let canonical = map_yield_to_canonical(name);
1633
1634        match canonical.as_str() {
1635            "node" => {
1636                columns.extend(build_node_yield_columns(
1637                    &vids,
1638                    label,
1639                    output_name,
1640                    target_properties,
1641                    &props_map,
1642                    label_props,
1643                )?);
1644            }
1645            "vid" => {
1646                let mut builder = Int64Builder::with_capacity(num_rows);
1647                for vid in &vids {
1648                    builder.append_value(vid.as_u64() as i64);
1649                }
1650                columns.push(Arc::new(builder.finish()));
1651            }
1652            "score" => {
1653                let mut builder = Float32Builder::with_capacity(num_rows);
1654                for score in &fused_scores {
1655                    builder.append_value(*score);
1656                }
1657                columns.push(Arc::new(builder.finish()));
1658            }
1659            "vector_score" => {
1660                let mut builder = Float32Builder::with_capacity(num_rows);
1661                for vid in &vids {
1662                    if let Some(&dist) = scores.vec_score_map.get(vid) {
1663                        let score = calculate_score(dist, scores.metric);
1664                        builder.append_value(score);
1665                    } else {
1666                        builder.append_null();
1667                    }
1668                }
1669                columns.push(Arc::new(builder.finish()));
1670            }
1671            "fts_score" => {
1672                let mut builder = Float32Builder::with_capacity(num_rows);
1673                for vid in &vids {
1674                    if let Some(&raw_score) = scores.fts_score_map.get(vid) {
1675                        let norm = if scores.fts_max > 0.0 {
1676                            raw_score / scores.fts_max
1677                        } else {
1678                            0.0
1679                        };
1680                        builder.append_value(norm);
1681                    } else {
1682                        builder.append_null();
1683                    }
1684                }
1685                columns.push(Arc::new(builder.finish()));
1686            }
1687            "distance" => {
1688                // For hybrid search, distance is the vector distance if available
1689                let mut builder = Float64Builder::with_capacity(num_rows);
1690                for vid in &vids {
1691                    if let Some(&dist) = scores.vec_score_map.get(vid) {
1692                        builder.append_value(dist as f64);
1693                    } else {
1694                        builder.append_null();
1695                    }
1696                }
1697                columns.push(Arc::new(builder.finish()));
1698            }
1699            _ => {
1700                let mut builder = StringBuilder::with_capacity(num_rows, 0);
1701                for _ in 0..num_rows {
1702                    builder.append_null();
1703                }
1704                columns.push(Arc::new(builder.finish()));
1705            }
1706        }
1707    }
1708
1709    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1710    Ok(Some(batch))
1711}
1712
1713// ---------------------------------------------------------------------------
1714// Shared search result batch builder
1715// ---------------------------------------------------------------------------
1716
1717/// Build a RecordBatch for search procedures (vector, FTS) that yield
1718/// both node-like and scalar columns.
1719async fn build_search_result_batch(
1720    results: &[(Vid, f32)],
1721    label: &str,
1722    metric: &DistanceMetric,
1723    yield_items: &[(String, Option<String>)],
1724    target_properties: &HashMap<String, Vec<String>>,
1725    graph_ctx: &GraphExecutionContext,
1726    schema: &SchemaRef,
1727) -> DFResult<Option<RecordBatch>> {
1728    let num_rows = results.len();
1729    let vids: Vec<Vid> = results.iter().map(|(vid, _)| *vid).collect();
1730    let distances: Vec<f32> = results.iter().map(|(_, d)| *d).collect();
1731
1732    // Pre-compute scores
1733    let scores: Vec<f32> = distances
1734        .iter()
1735        .map(|dist| calculate_score(*dist, metric))
1736        .collect();
1737
1738    // Pre-load properties for all node-like yields
1739    let property_manager = graph_ctx.property_manager();
1740    let query_ctx = graph_ctx.query_context();
1741    let uni_schema = graph_ctx.storage().schema_manager().schema();
1742    let label_props = uni_schema.properties.get(label);
1743
1744    // Load properties if any node-like yield needs them
1745    let has_node_yield = yield_items
1746        .iter()
1747        .any(|(name, _)| map_yield_to_canonical(name) == "node");
1748
1749    let props_map = if has_node_yield {
1750        property_manager
1751            .get_batch_vertex_props_for_label(&vids, label, Some(&query_ctx))
1752            .await
1753            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
1754    } else {
1755        HashMap::new()
1756    };
1757
1758    // Build columns in schema order
1759    let mut columns: Vec<ArrayRef> = Vec::new();
1760
1761    for (name, alias) in yield_items {
1762        let output_name = alias.as_ref().unwrap_or(name);
1763        let canonical = map_yield_to_canonical(name);
1764
1765        match canonical.as_str() {
1766            "node" => {
1767                columns.extend(build_node_yield_columns(
1768                    &vids,
1769                    label,
1770                    output_name,
1771                    target_properties,
1772                    &props_map,
1773                    label_props,
1774                )?);
1775            }
1776            "distance" => {
1777                let mut builder = Float64Builder::with_capacity(num_rows);
1778                for dist in &distances {
1779                    builder.append_value(*dist as f64);
1780                }
1781                columns.push(Arc::new(builder.finish()));
1782            }
1783            "score" => {
1784                let mut builder = Float32Builder::with_capacity(num_rows);
1785                for score in &scores {
1786                    builder.append_value(*score);
1787                }
1788                columns.push(Arc::new(builder.finish()));
1789            }
1790            "vid" => {
1791                let mut builder = Int64Builder::with_capacity(num_rows);
1792                for vid in &vids {
1793                    builder.append_value(vid.as_u64() as i64);
1794                }
1795                columns.push(Arc::new(builder.finish()));
1796            }
1797            _ => {
1798                // Unknown yield — emit nulls
1799                let mut builder = StringBuilder::with_capacity(num_rows, 0);
1800                for _ in 0..num_rows {
1801                    builder.append_null();
1802                }
1803                columns.push(Arc::new(builder.finish()));
1804            }
1805        }
1806    }
1807
1808    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1809    Ok(Some(batch))
1810}
1811
1812// ---------------------------------------------------------------------------
1813// Helpers
1814// ---------------------------------------------------------------------------
1815
1816/// Build the node-yield columns (_vid, variable, _labels, property columns) shared by
1817/// search result batch builders. Returns the columns to append.
1818fn build_node_yield_columns(
1819    vids: &[Vid],
1820    label: &str,
1821    output_name: &str,
1822    target_properties: &HashMap<String, Vec<String>>,
1823    props_map: &HashMap<Vid, uni_common::Properties>,
1824    label_props: Option<&std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>>,
1825) -> DFResult<Vec<ArrayRef>> {
1826    let num_rows = vids.len();
1827    let mut columns = Vec::new();
1828
1829    // _vid column
1830    let mut vid_builder = UInt64Builder::with_capacity(num_rows);
1831    for vid in vids {
1832        vid_builder.append_value(vid.as_u64());
1833    }
1834    columns.push(Arc::new(vid_builder.finish()) as ArrayRef);
1835
1836    // variable column (VID as string)
1837    let mut var_builder = StringBuilder::with_capacity(num_rows, num_rows * 20);
1838    for vid in vids {
1839        var_builder.append_value(vid.to_string());
1840    }
1841    columns.push(Arc::new(var_builder.finish()) as ArrayRef);
1842
1843    // _labels column
1844    let mut labels_builder = arrow_array::builder::ListBuilder::new(StringBuilder::new());
1845    for _ in 0..num_rows {
1846        labels_builder.values().append_value(label);
1847        labels_builder.append(true);
1848    }
1849    columns.push(Arc::new(labels_builder.finish()) as ArrayRef);
1850
1851    // Property columns
1852    if let Some(props) = target_properties.get(output_name) {
1853        for prop_name in props {
1854            let data_type = resolve_property_type(prop_name, label_props);
1855            let column = crate::query::df_graph::scan::build_property_column_static(
1856                vids, props_map, prop_name, &data_type,
1857            )?;
1858            columns.push(column);
1859        }
1860    }
1861
1862    Ok(columns)
1863}
1864
1865/// Extract a vector from a Value.
1866fn extract_vector(val: &Value) -> DFResult<Vec<f32>> {
1867    match val {
1868        Value::Vector(vec) => Ok(vec.clone()),
1869        Value::List(arr) => {
1870            let mut vec = Vec::with_capacity(arr.len());
1871            for v in arr {
1872                if let Some(f) = v.as_f64() {
1873                    vec.push(f as f32);
1874                } else {
1875                    return Err(datafusion::error::DataFusionError::Execution(
1876                        "Query vector must contain numbers".to_string(),
1877                    ));
1878                }
1879            }
1880            Ok(vec)
1881        }
1882        _ => Err(datafusion::error::DataFusionError::Execution(
1883            "Query vector must be a list or vector".to_string(),
1884        )),
1885    }
1886}