Skip to main content

uni_query/query/executor/
procedure.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use anyhow::{Result, anyhow};
6use std::collections::HashMap;
7use std::sync::Arc;
8use uni_common::Value;
9use uni_cypher::ast::Expr;
10use uni_store::QueryContext;
11use uni_store::runtime::property_manager::PropertyManager;
12
13fn success_result(success: bool) -> Result<Vec<HashMap<String, Value>>> {
14    Ok(vec![HashMap::from([(
15        "success".to_string(),
16        Value::Bool(success),
17    )])])
18}
19
/// Value type for procedure parameters and outputs.
///
/// Appears in [`ProcedureParam::param_type`] and
/// [`ProcedureOutput::output_type`]. When arguments of a legacy registered
/// procedure are validated, a null argument is accepted regardless of the
/// declared type.
#[derive(Debug, Clone, PartialEq)]
pub enum ProcedureValueType {
    /// Cypher STRING type.
    String,
    /// Cypher INTEGER type.
    Integer,
    /// Cypher FLOAT type.
    ///
    /// The legacy argument check in this file also accepts INTEGER values
    /// for a FLOAT parameter.
    Float,
    /// Cypher NUMBER type (accepts both INTEGER and FLOAT).
    Number,
    /// Cypher BOOLEAN type.
    Boolean,
    /// Accepts any value type.
    Any,
}
36
/// Single parameter declaration for a registered procedure.
///
/// Parameters are matched to call arguments by position. The name is also
/// used to look up implicit arguments among the query parameters and to
/// find the input column in the procedure's mock data rows.
#[derive(Debug, Clone)]
pub struct ProcedureParam {
    /// Parameter name as declared in the procedure signature.
    pub name: String,
    /// Expected type for this parameter.
    pub param_type: ProcedureValueType,
}
45
/// Single output column declaration for a registered procedure.
///
/// When a call has no `YIELD` list, the declared output names decide which
/// columns of each mock data row are returned.
#[derive(Debug, Clone)]
pub struct ProcedureOutput {
    /// Output column name as declared in the procedure signature.
    pub name: String,
    /// Type of the output column.
    ///
    /// NOTE(review): the dispatch code visible in this file reads only
    /// `name`; nothing here validates row values against this type.
    pub output_type: ProcedureValueType,
}
54
/// A procedure registered at runtime with static mock data.
///
/// Used by the TCK harness to define test procedures that the query
/// engine can call via `CALL proc.name(args) YIELD columns`.
#[derive(Debug, Clone)]
pub struct RegisteredProcedure {
    /// Fully qualified procedure name (e.g. `test.my.proc`).
    ///
    /// This is the key under which [`ProcedureRegistry::register`] stores
    /// the procedure.
    pub name: String,
    /// Declared input parameters, in positional order.
    pub params: Vec<ProcedureParam>,
    /// Declared output columns.
    pub outputs: Vec<ProcedureOutput>,
    /// Mock data rows keyed by column name.
    ///
    /// At call time a row is kept only if, for every parameter whose name
    /// is a column of the row, the row's value matches the supplied
    /// argument. Rows lacking a parameter column are not filtered on it.
    pub data: Vec<HashMap<String, Value>>,
}
70
/// Thread-safe registry of procedures.
///
/// **M4 bridge:** The legacy `procedures` hashmap holds test-only
/// `RegisteredProcedure` mock rows used by TCK step definitions. The new
/// `plugin_registry` field holds an `Arc<uni_plugin::PluginRegistry>`
/// that the M4 cutover commits route real procedure dispatch through.
/// Both coexist during the M4 coexistence window; once every consumer
/// switches to plugin-path dispatch, the legacy hashmap is removed.
///
/// Both fields sit behind their own `std::sync::RwLock`, so every method
/// takes `&self`.
#[derive(Debug, Default)]
pub struct ProcedureRegistry {
    // Legacy mock procedures, keyed by `RegisteredProcedure::name`.
    procedures: std::sync::RwLock<HashMap<String, RegisteredProcedure>>,
    // Plugin registry attached via `set_plugin_registry`; `None` until then.
    plugin_registry: std::sync::RwLock<Option<Arc<uni_plugin::PluginRegistry>>>,
}
84
85impl ProcedureRegistry {
86    /// Creates an empty registry.
87    pub fn new() -> Self {
88        Self::default()
89    }
90
91    /// Registers a procedure, replacing any existing one with the same name.
92    pub fn register(&self, proc_def: RegisteredProcedure) {
93        self.procedures
94            .write()
95            .expect("ProcedureRegistry lock poisoned")
96            .insert(proc_def.name.clone(), proc_def);
97    }
98
99    /// Looks up a procedure by fully qualified name (legacy path).
100    pub fn get(&self, name: &str) -> Option<RegisteredProcedure> {
101        self.procedures
102            .read()
103            .expect("ProcedureRegistry lock poisoned")
104            .get(name)
105            .cloned()
106    }
107
108    /// Removes all registered procedures.
109    pub fn clear(&self) {
110        self.procedures
111            .write()
112            .expect("ProcedureRegistry lock poisoned")
113            .clear();
114    }
115
116    /// Attach an [`uni_plugin::PluginRegistry`] for plugin-path dispatch.
117    ///
118    /// M4 bridge: callers configure this once at executor construction,
119    /// and the procedure dispatch site consults the plugin registry for
120    /// any qname not present in the legacy `procedures` hashmap.
121    pub fn set_plugin_registry(&self, pr: Arc<uni_plugin::PluginRegistry>) {
122        *self
123            .plugin_registry
124            .write()
125            .expect("ProcedureRegistry plugin-registry lock poisoned") = Some(pr);
126    }
127
128    /// Snapshot of the currently attached plugin registry, if any.
129    ///
130    /// Used by the read executor to thread the host's `PluginRegistry`
131    /// into the physical planner so consultation sites like
132    /// `plan_vector_knn` can look up registered `IndexHandle`s. Returns
133    /// `None` if `set_plugin_registry` was never called (e.g., low-level
134    /// test setups that bypass `Uni::build`).
135    pub fn plugin_registry(&self) -> Option<Arc<uni_plugin::PluginRegistry>> {
136        self.plugin_registry
137            .read()
138            .expect("ProcedureRegistry plugin-registry lock poisoned")
139            .clone()
140    }
141
142    /// Look up a procedure through the attached `PluginRegistry`, if any.
143    ///
144    /// Dual-consult (M8.6): checks the per-task session-local plugin
145    /// registry first (set by host crates via
146    /// [`crate::scoped_with_session_plugin_registry`]) and falls back
147    /// to the executor's instance-attached plugin registry. Returns
148    /// `None` if neither has `qname`.
149    pub fn get_plugin(
150        &self,
151        qname: &uni_plugin::QName,
152    ) -> Option<std::sync::Arc<uni_plugin::registry::ProcedureEntry>> {
153        // Session-local first.
154        if let Some(session_pr) = crate::current_session_plugin_registry()
155            && let Some(entry) = session_pr.procedure(qname)
156        {
157            return Some(entry);
158        }
159        self.plugin_registry
160            .read()
161            .expect("ProcedureRegistry plugin-registry lock poisoned")
162            .as_ref()
163            .and_then(|pr| pr.procedure(qname))
164    }
165
166    /// Resolve a user-facing procedure name (as written in `CALL X.Y.Z(...)`)
167    /// to a registered plugin entry, applying the framework's namespace
168    /// resolution rules:
169    ///
170    /// 1. If `user_qname` parses as `<namespace>.<local>`, try that exact
171    ///    qname against the plugin registry.
172    /// 2. Strip a leading `uni.` prefix if present, then try each known
173    ///    built-in plugin namespace (`builtin`, `apoc-core`) with the
174    ///    stripped local name. This lets user-facing names like
175    ///    `uni.bitwise.and` route to plugins that registered under the
176    ///    `apoc-core` namespace as `apoc-core.bitwise.and`.
177    ///
178    /// Future user plugins that want their qnames reachable as `uni.*`
179    /// can declare their own namespace; the resolver will try the
180    /// declared namespace before falling through.
181    pub fn resolve_user_procedure(
182        &self,
183        user_qname: &str,
184    ) -> Option<std::sync::Arc<uni_plugin::registry::ProcedureEntry>> {
185        // Exact namespace.local match first.
186        if let Some((ns, local)) = user_qname.split_once('.')
187            && let Some(p) = self.get_plugin(&uni_plugin::QName::new(ns, local))
188        {
189            return Some(p);
190        }
191        // Strip `uni.` prefix and try each known built-in plugin namespace.
192        // The `uni` namespace itself is reserved for host-coupled procedures
193        // registered from `uni-query::procedures_plugin` (M4).
194        let stripped = user_qname.strip_prefix("uni.").unwrap_or(user_qname);
195        for plugin_id in ["uni", "builtin", "apoc-core", "custom"] {
196            if let Some(p) = self.get_plugin(&uni_plugin::QName::new(plugin_id, stripped)) {
197                return Some(p);
198            }
199        }
200        None
201    }
202}
203
204use crate::query::df_graph::procedure_call::value_to_columnar;
205
206/// Convert one row of an Arrow array column into a [`uni_common::Value`].
207/// Used when draining a plugin's output `RecordBatch` back to the legacy
208/// row-shaped `Vec<HashMap<String, Value>>` the Executor returns.
209///
210/// This intentionally does **not** delegate to
211/// `uni_store::storage::arrow_convert::arrow_to_value`: that helper is
212/// driven by uni's logical `DataType` (which the plugin output schema
213/// does not carry here) and degrades to `Value::Null` with a `log::warn!`
214/// for shapes it cannot decode. The plugin-output contract instead
215/// requires a hard error on any unexpected Arrow type so the failure
216/// surfaces to the `CALL` site rather than silently producing nulls.
217fn arrow_scalar_to_value(
218    arr: &dyn arrow_array::Array,
219    row_idx: usize,
220) -> std::result::Result<Value, String> {
221    use arrow_array::cast::AsArray;
222    use arrow_schema::DataType as Dt;
223
224    if arr.is_null(row_idx) {
225        return Ok(Value::Null);
226    }
227    match arr.data_type() {
228        Dt::Boolean => Ok(Value::Bool(arr.as_boolean().value(row_idx))),
229        Dt::Int64 => Ok(Value::Int(
230            arr.as_primitive::<arrow_array::types::Int64Type>()
231                .value(row_idx),
232        )),
233        Dt::Int32 => Ok(Value::Int(
234            arr.as_primitive::<arrow_array::types::Int32Type>()
235                .value(row_idx) as i64,
236        )),
237        Dt::UInt64 => Ok(Value::Int(
238            arr.as_primitive::<arrow_array::types::UInt64Type>()
239                .value(row_idx) as i64,
240        )),
241        Dt::Float64 => Ok(Value::Float(
242            arr.as_primitive::<arrow_array::types::Float64Type>()
243                .value(row_idx),
244        )),
245        Dt::Float32 => Ok(Value::Float(
246            arr.as_primitive::<arrow_array::types::Float32Type>()
247                .value(row_idx) as f64,
248        )),
249        Dt::Utf8 => Ok(Value::String(
250            arr.as_string::<i32>().value(row_idx).to_string(),
251        )),
252        Dt::LargeUtf8 => Ok(Value::String(
253            arr.as_string::<i64>().value(row_idx).to_string(),
254        )),
255        Dt::Binary => Ok(Value::Bytes(arr.as_binary::<i32>().value(row_idx).to_vec())),
256        Dt::LargeBinary => Ok(Value::Bytes(arr.as_binary::<i64>().value(row_idx).to_vec())),
257        other => Err(format!(
258            "unsupported Arrow type in plugin procedure output: {other:?}"
259        )),
260    }
261}
262
263/// Filters a full result map to only the requested yield items.
264/// If `yield_items` is empty, returns the full result unchanged.
265fn filter_yield_items(
266    full_result: HashMap<String, Value>,
267    yield_items: &[String],
268) -> HashMap<String, Value> {
269    if yield_items.is_empty() {
270        return full_result;
271    }
272    yield_items
273        .iter()
274        .filter_map(|name| full_result.get(name).map(|val| (name.clone(), val.clone())))
275        .collect()
276}
277
278impl Executor {
279    /// Evaluate a procedure argument as a string, returning an error with the given description.
280    async fn eval_string_arg<'a>(
281        &'a self,
282        arg: &Expr,
283        description: &str,
284        prop_manager: &'a PropertyManager,
285        params: &'a HashMap<String, Value>,
286        ctx: Option<&'a QueryContext>,
287    ) -> Result<String> {
288        let empty_row = HashMap::new();
289        self.evaluate_expr(arg, &empty_row, prop_manager, params, ctx)
290            .await?
291            .as_str()
292            .ok_or_else(|| anyhow!("{} must be string", description))
293            .map(|s| s.to_string())
294    }
295
296    pub(crate) async fn execute_procedure<'a>(
297        &'a self,
298        name: &str,
299        args: &[Expr],
300        yield_items: &[String],
301        prop_manager: &'a PropertyManager,
302        params: &'a HashMap<String, Value>,
303        ctx: Option<&'a QueryContext>,
304    ) -> Result<Vec<HashMap<String, Value>>> {
305        match name {
306            "uni.admin.compact" => {
307                let stats = self.storage.compact().await?;
308                let full_result = HashMap::from([
309                    (
310                        "files_compacted".to_string(),
311                        Value::Int(stats.files_compacted as i64),
312                    ),
313                    (
314                        "bytes_before".to_string(),
315                        Value::Int(stats.bytes_before as i64),
316                    ),
317                    (
318                        "bytes_after".to_string(),
319                        Value::Int(stats.bytes_after as i64),
320                    ),
321                    (
322                        "duration_ms".to_string(),
323                        Value::Int(stats.duration.as_millis() as i64),
324                    ),
325                ]);
326
327                Ok(vec![filter_yield_items(full_result, yield_items)])
328            }
329            "uni.admin.compactionStatus" => {
330                let status = self
331                    .storage
332                    .compaction_status()
333                    .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
334                let full_result = HashMap::from([
335                    ("l1_runs".to_string(), Value::Int(status.l1_runs as i64)),
336                    (
337                        "l1_size_bytes".to_string(),
338                        Value::Int(status.l1_size_bytes as i64),
339                    ),
340                    (
341                        "in_progress".to_string(),
342                        Value::Bool(status.compaction_in_progress),
343                    ),
344                    (
345                        "pending".to_string(),
346                        Value::Int(status.compaction_pending as i64),
347                    ),
348                    (
349                        "total_compactions".to_string(),
350                        Value::Int(status.total_compactions as i64),
351                    ),
352                    (
353                        "total_bytes_compacted".to_string(),
354                        Value::Int(status.total_bytes_compacted as i64),
355                    ),
356                ]);
357
358                Ok(vec![filter_yield_items(full_result, yield_items)])
359            }
360            "uni.admin.snapshot.create" => {
361                let name = if !args.is_empty() {
362                    Some(
363                        self.eval_string_arg(&args[0], "Snapshot name", prop_manager, params, ctx)
364                            .await?,
365                    )
366                } else {
367                    None
368                };
369
370                let writer_arc = self
371                    .writer
372                    .as_ref()
373                    .ok_or_else(|| anyhow!("Database is in read-only mode"))?;
374                let writer: &uni_store::Writer = writer_arc.as_ref();
375                let snapshot_id = writer.flush_to_l1(name).await?;
376
377                Ok(vec![HashMap::from([(
378                    "snapshot_id".to_string(),
379                    Value::String(snapshot_id),
380                )])])
381            }
382            "uni.admin.snapshot.list" => {
383                let sm = self.storage.snapshot_manager();
384                let ids = sm.list_snapshots().await?;
385                let mut results = Vec::new();
386                for id in ids {
387                    if let Ok(m) = sm.load_snapshot(&id).await {
388                        results.push(HashMap::from([
389                            ("snapshot_id".to_string(), Value::String(m.snapshot_id)),
390                            (
391                                "name".to_string(),
392                                m.name.map(Value::String).unwrap_or(Value::Null),
393                            ),
394                            (
395                                "created_at".to_string(),
396                                Value::String(m.created_at.to_rfc3339()),
397                            ),
398                            (
399                                "version_hwm".to_string(),
400                                Value::Int(m.version_high_water_mark as i64),
401                            ),
402                        ]));
403                    }
404                }
405                Ok(results)
406            }
407            "uni.admin.snapshot.restore" => {
408                let id = self
409                    .eval_string_arg(&args[0], "Snapshot ID", prop_manager, params, ctx)
410                    .await?;
411
412                self.storage
413                    .snapshot_manager()
414                    .set_latest_snapshot(&id)
415                    .await?;
416                Ok(vec![HashMap::from([(
417                    "status".to_string(),
418                    Value::String("Restored".to_string()),
419                )])])
420            }
421            // DDL Procedures
422            "uni.schema.createLabel" => {
423                let empty_row = HashMap::new();
424                let name = self
425                    .eval_string_arg(&args[0], "Label name", prop_manager, params, ctx)
426                    .await?;
427                let config = self
428                    .evaluate_expr(&args[1], &empty_row, prop_manager, params, ctx)
429                    .await?;
430
431                let success =
432                    super::ddl_procedures::create_label(&self.storage, &name, &config).await?;
433                success_result(success)
434            }
435            "uni.schema.createEdgeType" => {
436                let empty_row = HashMap::new();
437                let name = self
438                    .eval_string_arg(&args[0], "Edge type name", prop_manager, params, ctx)
439                    .await?;
440                let src_val = self
441                    .evaluate_expr(&args[1], &empty_row, prop_manager, params, ctx)
442                    .await?;
443                let dst_val = self
444                    .evaluate_expr(&args[2], &empty_row, prop_manager, params, ctx)
445                    .await?;
446                let config = self
447                    .evaluate_expr(&args[3], &empty_row, prop_manager, params, ctx)
448                    .await?;
449
450                // Convert src/dst to Vec<String>
451                let src_labels = src_val
452                    .as_array()
453                    .ok_or(anyhow!("Source labels must be a list"))?
454                    .iter()
455                    .map(|v| {
456                        v.as_str()
457                            .map(|s| s.to_string())
458                            .ok_or(anyhow!("Label must be string"))
459                    })
460                    .collect::<Result<Vec<_>>>()?;
461                let dst_labels = dst_val
462                    .as_array()
463                    .ok_or(anyhow!("Target labels must be a list"))?
464                    .iter()
465                    .map(|v| {
466                        v.as_str()
467                            .map(|s| s.to_string())
468                            .ok_or(anyhow!("Label must be string"))
469                    })
470                    .collect::<Result<Vec<_>>>()?;
471
472                let success = super::ddl_procedures::create_edge_type(
473                    &self.storage,
474                    &name,
475                    src_labels,
476                    dst_labels,
477                    &config,
478                )
479                .await?;
480                success_result(success)
481            }
482            "uni.schema.createIndex" => {
483                let empty_row = HashMap::new();
484                let label = self
485                    .eval_string_arg(&args[0], "Label", prop_manager, params, ctx)
486                    .await?;
487                let property = self
488                    .eval_string_arg(&args[1], "Property", prop_manager, params, ctx)
489                    .await?;
490                let config = self
491                    .evaluate_expr(&args[2], &empty_row, prop_manager, params, ctx)
492                    .await?;
493
494                let success =
495                    super::ddl_procedures::create_index(&self.storage, &label, &property, &config)
496                        .await?;
497                success_result(success)
498            }
499            "uni.schema.createConstraint" => {
500                let label = self
501                    .eval_string_arg(&args[0], "Label", prop_manager, params, ctx)
502                    .await?;
503                let c_type = self
504                    .eval_string_arg(&args[1], "Constraint type", prop_manager, params, ctx)
505                    .await?;
506                let empty_row = HashMap::new();
507                let props_val = self
508                    .evaluate_expr(&args[2], &empty_row, prop_manager, params, ctx)
509                    .await?;
510
511                let properties = props_val
512                    .as_array()
513                    .ok_or(anyhow!("Properties must be a list"))?
514                    .iter()
515                    .map(|v| {
516                        v.as_str()
517                            .map(|s| s.to_string())
518                            .ok_or(anyhow!("Property must be string"))
519                    })
520                    .collect::<Result<Vec<_>>>()?;
521
522                let success = super::ddl_procedures::create_constraint(
523                    &self.storage,
524                    &label,
525                    &c_type,
526                    properties,
527                )
528                .await?;
529                success_result(success)
530            }
531            // The four `drop*` procedures share one shape: evaluate the
532            // single string argument, dispatch to the matching DDL helper,
533            // and report success. Only the argument label and the helper
534            // differ.
535            "uni.schema.dropLabel"
536            | "uni.schema.dropEdgeType"
537            | "uni.schema.dropIndex"
538            | "uni.schema.dropConstraint" => {
539                let description = match name {
540                    "uni.schema.dropLabel" => "Label name",
541                    "uni.schema.dropEdgeType" => "Edge type name",
542                    "uni.schema.dropIndex" => "Index name",
543                    _ => "Constraint name",
544                };
545                let target = self
546                    .eval_string_arg(&args[0], description, prop_manager, params, ctx)
547                    .await?;
548                let success = match name {
549                    "uni.schema.dropLabel" => {
550                        super::ddl_procedures::drop_label(&self.storage, &target).await?
551                    }
552                    "uni.schema.dropEdgeType" => {
553                        super::ddl_procedures::drop_edge_type(&self.storage, &target).await?
554                    }
555                    "uni.schema.dropIndex" => {
556                        super::ddl_procedures::drop_index(&self.storage, &target).await?
557                    }
558                    _ => super::ddl_procedures::drop_constraint(&self.storage, &target).await?,
559                };
560                success_result(success)
561            }
562            _ => {
563                // M4: Plugin path — consult the framework PluginRegistry
564                // before falling back to the legacy TCK mock registry.
565                if let Some(registry) = &self.procedure_registry
566                    && let Some(entry) = registry.resolve_user_procedure(name)
567                {
568                    return self
569                        .execute_plugin_procedure(
570                            name,
571                            &entry,
572                            args,
573                            yield_items,
574                            prop_manager,
575                            params,
576                            ctx,
577                        )
578                        .await;
579                }
580
581                // Legacy TCK mock-procedure registry.
582                if let Some(registry) = &self.procedure_registry
583                    && let Some(proc_def) = registry.get(name)
584                {
585                    return self
586                        .execute_registered_procedure(
587                            &proc_def,
588                            args,
589                            yield_items,
590                            prop_manager,
591                            params,
592                            ctx,
593                        )
594                        .await;
595                }
596                Err(anyhow!("ProcedureNotFound: Unknown procedure '{}'", name))
597            }
598        }
599    }
600
601    /// Executes a procedure registered through the plugin framework.
602    ///
603    /// Evaluates argument `Expr`s to Values, converts them to
604    /// `ColumnarValue` scalars, calls the plugin's `invoke()` to obtain
605    /// a `SendableRecordBatchStream`, drains the stream, and converts the
606    /// resulting Arrow batches to the legacy `Vec<HashMap<String, Value>>`
607    /// shape the Executor expects.
608    #[allow(clippy::too_many_arguments)] // mirrors the legacy execute_procedure signature
609    async fn execute_plugin_procedure<'a>(
610        &'a self,
611        name: &str,
612        entry: &uni_plugin::registry::ProcedureEntry,
613        args: &[Expr],
614        yield_items: &[String],
615        prop_manager: &'a PropertyManager,
616        params: &'a HashMap<String, Value>,
617        ctx: Option<&'a QueryContext>,
618    ) -> Result<Vec<HashMap<String, Value>>> {
619        use datafusion::logical_expr::ColumnarValue;
620        use futures::StreamExt;
621
622        // Evaluate each arg expression to a Value, then map to a
623        // ColumnarValue scalar for the plugin's invoke signature.
624        let empty_row: HashMap<String, Value> = HashMap::new();
625        let mut columnar_args: Vec<ColumnarValue> = Vec::with_capacity(args.len());
626        for arg in args {
627            let v = self
628                .evaluate_expr(arg, &empty_row, prop_manager, params, ctx)
629                .await?;
630            columnar_args.push(
631                value_to_columnar(&v)
632                    .map_err(|e| anyhow!("Procedure '{name}': argument conversion failed: {e}"))?,
633            );
634        }
635
636        let mut host = crate::query::executor::procedure_host::QueryProcedureHost::from_components(
637            Arc::clone(&self.storage),
638            Some(Arc::clone(&self.algo_registry)),
639            self.procedure_registry.clone(),
640        );
641        // FU-1 / M11 #6: attach the outer executor's writer handle so
642        // declared `WRITE`-mode procedures synthesized by
643        // `CypherProcedureSynthesizer` can mutate via the write-enabled
644        // inner-query host. The simple-Executor path
645        // (`from_components`) is what the procedure_call -> stream
646        // pipeline lands on for top-level `CALL <declared.qname>()`
647        // invocations.
648        if let Some(writer) = &self.writer {
649            host = host.with_writer(Arc::clone(writer));
650        }
651        // FU-1: propagate the in-flight principal so capability gates
652        // (e.g., `Capability::ProcedureWrites` on
653        // `uni.plugin.declareProcedure WRITE`) see the session's
654        // authenticated user, not an anonymous default. The
655        // host + principal -> ProcedureContext construction is
656        // consolidated in `uni_plugin::host::build_procedure_context`.
657        let principal = crate::current_principal();
658        let pctx = uni_plugin::host::build_procedure_context(&host, principal.as_deref());
659        let mut stream = entry
660            .procedure
661            .invoke(pctx, &columnar_args)
662            .map_err(|e| anyhow!("Procedure '{name}': {e}"))?;
663
664        // Collect every batch the plugin yields and convert to row-shaped
665        // Value maps. Schema comes from the plugin signature's yields.
666        let mut rows: Vec<HashMap<String, Value>> = Vec::new();
667        while let Some(item) = stream.next().await {
668            let batch = item.map_err(|e| anyhow!("Procedure '{name}' stream error: {e}"))?;
669            for row_idx in 0..batch.num_rows() {
670                let mut row: HashMap<String, Value> = HashMap::new();
671                let schema = batch.schema();
672                for col_idx in 0..batch.num_columns() {
673                    let field = schema.field(col_idx);
674                    let arr = batch.column(col_idx);
675                    let v = arrow_scalar_to_value(arr.as_ref(), row_idx)
676                        .map_err(|e| anyhow!("Procedure '{name}': output decode: {e}"))?;
677                    row.insert(field.name().clone(), v);
678                }
679                rows.push(filter_yield_items(row, yield_items));
680            }
681        }
682        Ok(rows)
683    }
684
685    /// Executes a procedure from the external registry.
686    ///
687    /// Evaluates arguments, validates count and types against the procedure
688    /// declaration, filters data rows by matching input columns, and projects
689    /// the requested output columns.
690    ///
691    /// # Errors
692    ///
693    /// Returns `InvalidNumberOfArguments` if the argument count is wrong,
694    /// or `InvalidArgumentType` if an argument has an incompatible type.
695    async fn execute_registered_procedure<'a>(
696        &'a self,
697        proc_def: &RegisteredProcedure,
698        args: &[Expr],
699        yield_items: &[String],
700        prop_manager: &'a PropertyManager,
701        params: &'a HashMap<String, Value>,
702        ctx: Option<&'a QueryContext>,
703    ) -> Result<Vec<HashMap<String, Value>>> {
704        let empty_row = HashMap::new();
705
706        // Evaluate arguments
707        let mut evaluated_args = Vec::with_capacity(args.len());
708        for arg in args {
709            evaluated_args.push(
710                self.evaluate_expr(arg, &empty_row, prop_manager, params, ctx)
711                    .await?,
712            );
713        }
714
715        // Validate argument count
716        if evaluated_args.len() != proc_def.params.len() {
717            if evaluated_args.is_empty() && !proc_def.params.is_empty() {
718                if yield_items.is_empty() {
719                    // Standalone CALL — resolve implicit arguments from query parameters
720                    let mut resolved = Vec::with_capacity(proc_def.params.len());
721                    for param in &proc_def.params {
722                        if let Some(val) = params.get(&param.name) {
723                            resolved.push(val.clone());
724                        } else {
725                            return Err(anyhow!(
726                                "MissingParameter: Procedure '{}' requires implicit argument '{}' \
727                                 but it was not provided as a query parameter",
728                                proc_def.name,
729                                param.name
730                            ));
731                        }
732                    }
733                    evaluated_args = resolved;
734                } else {
735                    // In-query CALL with YIELD cannot use implicit arguments
736                    return Err(anyhow!(
737                        "InvalidArgumentPassingMode: Procedure '{}' requires explicit argument passing in in-query CALL",
738                        proc_def.name
739                    ));
740                }
741            } else {
742                return Err(anyhow!(
743                    "InvalidNumberOfArguments: Procedure '{}' expects {} argument(s), got {}",
744                    proc_def.name,
745                    proc_def.params.len(),
746                    evaluated_args.len()
747                ));
748            }
749        }
750
751        // Validate argument types
752        for (i, (arg_val, param)) in evaluated_args.iter().zip(&proc_def.params).enumerate() {
753            if !arg_val.is_null() && !check_type_compatible(arg_val, &param.param_type) {
754                return Err(anyhow!(
755                    "InvalidArgumentType: Argument {} ('{}') of procedure '{}' has incompatible type",
756                    i,
757                    param.name,
758                    proc_def.name
759                ));
760            }
761        }
762
763        // Filter data rows: keep rows where input columns match the provided args
764        let filtered: Vec<&HashMap<String, Value>> = proc_def
765            .data
766            .iter()
767            .filter(|row| {
768                for (param, arg_val) in proc_def.params.iter().zip(&evaluated_args) {
769                    if let Some(row_val) = row.get(&param.name)
770                        && !values_match(row_val, arg_val)
771                    {
772                        return false;
773                    }
774                }
775                true
776            })
777            .collect();
778
779        // Collect output column names
780        let output_names: Vec<&str> = proc_def.outputs.iter().map(|o| o.name.as_str()).collect();
781
782        // Project output columns, applying yield_items filtering. With no
783        // yield list, return every declared output column; otherwise route
784        // through `filter_yield_items` over the data row.
785        let results = filtered
786            .into_iter()
787            .map(|row| {
788                if yield_items.is_empty() {
789                    output_names
790                        .iter()
791                        .filter_map(|name| {
792                            row.get(*name).map(|val| ((*name).to_string(), val.clone()))
793                        })
794                        .collect()
795                } else {
796                    filter_yield_items(row.clone(), yield_items)
797                }
798            })
799            .collect();
800
801        Ok(results)
802    }
803}
804
805/// Checks whether a value is compatible with a procedure type.
806fn check_type_compatible(val: &Value, expected: &ProcedureValueType) -> bool {
807    match expected {
808        ProcedureValueType::Any => true,
809        ProcedureValueType::String => val.is_string(),
810        ProcedureValueType::Boolean => val.is_bool(),
811        ProcedureValueType::Integer => val.is_i64(),
812        ProcedureValueType::Float => val.is_f64() || val.is_i64(),
813        ProcedureValueType::Number => val.is_number(),
814    }
815}
816
817/// Checks whether two values match for input-column filtering.
818fn values_match(row_val: &Value, arg_val: &Value) -> bool {
819    if arg_val.is_null() || row_val.is_null() {
820        return arg_val.is_null() && row_val.is_null();
821    }
822    // Compare numbers by f64 to handle int/float cross-comparison
823    if let (Some(a), Some(b)) = (row_val.as_f64(), arg_val.as_f64()) {
824        return (a - b).abs() < f64::EPSILON;
825    }
826    row_val == arg_val
827}