Skip to main content

uni_query/query/executor/
procedure.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use super::core::*;
5use anyhow::{Result, anyhow};
6use std::collections::HashMap;
7use std::sync::Arc;
8use uni_common::Value;
9use uni_cypher::ast::Expr;
10use uni_store::QueryContext;
11use uni_store::runtime::property_manager::PropertyManager;
12
13fn success_result(success: bool) -> Result<Vec<HashMap<String, Value>>> {
14    Ok(vec![HashMap::from([(
15        "success".to_string(),
16        Value::Bool(success),
17    )])])
18}
19
/// Value type for procedure parameters and outputs.
///
/// Every variant is a unit variant, so the type also derives `Eq` and `Hash`
/// and can be used directly as a `HashMap`/`HashSet` key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ProcedureValueType {
    /// Cypher STRING type.
    String,
    /// Cypher INTEGER type.
    Integer,
    /// Cypher FLOAT type.
    Float,
    /// Cypher NUMBER type (accepts both INTEGER and FLOAT).
    Number,
    /// Cypher BOOLEAN type.
    Boolean,
    /// Accepts any value type.
    Any,
}
36
37/// Single parameter declaration for a registered procedure.
38#[derive(Debug, Clone)]
39pub struct ProcedureParam {
40    /// Parameter name as declared in the procedure signature.
41    pub name: String,
42    /// Expected type for this parameter.
43    pub param_type: ProcedureValueType,
44}
45
46/// Single output column declaration for a registered procedure.
47#[derive(Debug, Clone)]
48pub struct ProcedureOutput {
49    /// Output column name as declared in the procedure signature.
50    pub name: String,
51    /// Type of the output column.
52    pub output_type: ProcedureValueType,
53}
54
55/// A procedure registered at runtime with static mock data.
56///
57/// Used by the TCK harness to define test procedures that the query
58/// engine can call via `CALL proc.name(args) YIELD columns`.
59#[derive(Debug, Clone)]
60pub struct RegisteredProcedure {
61    /// Fully qualified procedure name (e.g. `test.my.proc`).
62    pub name: String,
63    /// Declared input parameters.
64    pub params: Vec<ProcedureParam>,
65    /// Declared output columns.
66    pub outputs: Vec<ProcedureOutput>,
67    /// Mock data rows keyed by column name.
68    pub data: Vec<HashMap<String, Value>>,
69}
70
71/// Thread-safe registry of procedures.
72///
73/// **M4 bridge:** The legacy `procedures` hashmap holds test-only
74/// `RegisteredProcedure` mock rows used by TCK step definitions. The new
75/// `plugin_registry` field holds an `Arc<uni_plugin::PluginRegistry>`
76/// that the M4 cutover commits route real procedure dispatch through.
77/// Both coexist during the M4 coexistence window; once every consumer
78/// switches to plugin-path dispatch, the legacy hashmap is removed.
79#[derive(Debug, Default)]
80pub struct ProcedureRegistry {
81    procedures: std::sync::RwLock<HashMap<String, RegisteredProcedure>>,
82    plugin_registry: std::sync::RwLock<Option<Arc<uni_plugin::PluginRegistry>>>,
83}
84
85impl ProcedureRegistry {
86    /// Creates an empty registry.
87    pub fn new() -> Self {
88        Self::default()
89    }
90
91    /// Registers a procedure, replacing any existing one with the same name.
92    pub fn register(&self, proc_def: RegisteredProcedure) {
93        self.procedures
94            .write()
95            .expect("ProcedureRegistry lock poisoned")
96            .insert(proc_def.name.clone(), proc_def);
97    }
98
99    /// Looks up a procedure by fully qualified name (legacy path).
100    pub fn get(&self, name: &str) -> Option<RegisteredProcedure> {
101        self.procedures
102            .read()
103            .expect("ProcedureRegistry lock poisoned")
104            .get(name)
105            .cloned()
106    }
107
108    /// Removes all registered procedures.
109    pub fn clear(&self) {
110        self.procedures
111            .write()
112            .expect("ProcedureRegistry lock poisoned")
113            .clear();
114    }
115
116    /// Attach an [`uni_plugin::PluginRegistry`] for plugin-path dispatch.
117    ///
118    /// M4 bridge: callers configure this once at executor construction,
119    /// and the procedure dispatch site consults the plugin registry for
120    /// any qname not present in the legacy `procedures` hashmap.
121    pub fn set_plugin_registry(&self, pr: Arc<uni_plugin::PluginRegistry>) {
122        *self
123            .plugin_registry
124            .write()
125            .expect("ProcedureRegistry plugin-registry lock poisoned") = Some(pr);
126    }
127
128    /// Snapshot of the currently attached plugin registry, if any.
129    ///
130    /// Used by the read executor to thread the host's `PluginRegistry`
131    /// into the physical planner so consultation sites like
132    /// `plan_vector_knn` can look up registered `IndexHandle`s. Returns
133    /// `None` if `set_plugin_registry` was never called (e.g., low-level
134    /// test setups that bypass `Uni::build`).
135    pub fn plugin_registry(&self) -> Option<Arc<uni_plugin::PluginRegistry>> {
136        self.plugin_registry
137            .read()
138            .expect("ProcedureRegistry plugin-registry lock poisoned")
139            .clone()
140    }
141
142    /// Look up a procedure through the attached `PluginRegistry`, if any.
143    ///
144    /// Dual-consult (M8.6): checks the per-task session-local plugin
145    /// registry first (set by host crates via
146    /// [`crate::scoped_with_session_plugin_registry`]) and falls back
147    /// to the executor's instance-attached plugin registry. Returns
148    /// `None` if neither has `qname`.
149    pub fn get_plugin(
150        &self,
151        qname: &uni_plugin::QName,
152    ) -> Option<std::sync::Arc<uni_plugin::registry::ProcedureEntry>> {
153        // Session-local first.
154        if let Some(session_pr) = crate::current_session_plugin_registry()
155            && let Some(entry) = session_pr.procedure(qname)
156        {
157            return Some(entry);
158        }
159        self.plugin_registry
160            .read()
161            .expect("ProcedureRegistry plugin-registry lock poisoned")
162            .as_ref()
163            .and_then(|pr| pr.procedure(qname))
164    }
165
166    /// Resolve a user-facing procedure name (as written in `CALL X.Y.Z(...)`)
167    /// to a registered plugin entry, applying the framework's namespace
168    /// resolution rules:
169    ///
170    /// 1. If `user_qname` parses as `<namespace>.<local>`, try that exact
171    ///    qname against the plugin registry.
172    /// 2. Strip a leading `uni.` prefix if present, then try each known
173    ///    built-in plugin namespace (`builtin`, `apoc-core`) with the
174    ///    stripped local name. This lets user-facing names like
175    ///    `uni.bitwise.and` route to plugins that registered under the
176    ///    `apoc-core` namespace as `apoc-core.bitwise.and`.
177    ///
178    /// Future user plugins that want their qnames reachable as `uni.*`
179    /// can declare their own namespace; the resolver will try the
180    /// declared namespace before falling through.
181    pub fn resolve_user_procedure(
182        &self,
183        user_qname: &str,
184    ) -> Option<std::sync::Arc<uni_plugin::registry::ProcedureEntry>> {
185        // Exact namespace.local match first, trying every split point
186        // (first-dot → last-dot) so plugin ids that themselves contain dots
187        // (e.g. `ai.example`) resolve alongside single-segment ids and the
188        // first-dot M9/builtin convention. See `QName::candidate_splits`.
189        for q in uni_plugin::QName::candidate_splits(user_qname) {
190            if let Some(p) = self.get_plugin(&q) {
191                return Some(p);
192            }
193        }
194        // Strip `uni.` prefix and try each known built-in plugin namespace.
195        // The `uni` namespace itself is reserved for host-coupled procedures
196        // registered from `uni-query::procedures_plugin` (M4).
197        let stripped = user_qname.strip_prefix("uni.").unwrap_or(user_qname);
198        for plugin_id in ["uni", "builtin", "apoc-core", "custom"] {
199            if let Some(p) = self.get_plugin(&uni_plugin::QName::new(plugin_id, stripped)) {
200                return Some(p);
201            }
202        }
203        None
204    }
205}
206
207use crate::query::df_graph::procedure_call::value_to_columnar;
208
209/// Convert one row of an Arrow array column into a [`uni_common::Value`].
210/// Used when draining a plugin's output `RecordBatch` back to the legacy
211/// row-shaped `Vec<HashMap<String, Value>>` the Executor returns.
212///
213/// This intentionally does **not** delegate to
214/// `uni_store::storage::arrow_convert::arrow_to_value`: that helper is
215/// driven by uni's logical `DataType` (which the plugin output schema
216/// does not carry here) and degrades to `Value::Null` with a `log::warn!`
217/// for shapes it cannot decode. The plugin-output contract instead
218/// requires a hard error on any unexpected Arrow type so the failure
219/// surfaces to the `CALL` site rather than silently producing nulls.
220fn arrow_scalar_to_value(
221    arr: &dyn arrow_array::Array,
222    row_idx: usize,
223) -> std::result::Result<Value, String> {
224    use arrow_array::cast::AsArray;
225    use arrow_schema::DataType as Dt;
226
227    if arr.is_null(row_idx) {
228        return Ok(Value::Null);
229    }
230    match arr.data_type() {
231        Dt::Boolean => Ok(Value::Bool(arr.as_boolean().value(row_idx))),
232        Dt::Int64 => Ok(Value::Int(
233            arr.as_primitive::<arrow_array::types::Int64Type>()
234                .value(row_idx),
235        )),
236        Dt::Int32 => Ok(Value::Int(
237            arr.as_primitive::<arrow_array::types::Int32Type>()
238                .value(row_idx) as i64,
239        )),
240        Dt::UInt64 => Ok(Value::Int(
241            arr.as_primitive::<arrow_array::types::UInt64Type>()
242                .value(row_idx) as i64,
243        )),
244        Dt::Float64 => Ok(Value::Float(
245            arr.as_primitive::<arrow_array::types::Float64Type>()
246                .value(row_idx),
247        )),
248        Dt::Float32 => Ok(Value::Float(
249            arr.as_primitive::<arrow_array::types::Float32Type>()
250                .value(row_idx) as f64,
251        )),
252        Dt::Utf8 => Ok(Value::String(
253            arr.as_string::<i32>().value(row_idx).to_string(),
254        )),
255        Dt::LargeUtf8 => Ok(Value::String(
256            arr.as_string::<i64>().value(row_idx).to_string(),
257        )),
258        Dt::Binary => Ok(Value::Bytes(arr.as_binary::<i32>().value(row_idx).to_vec())),
259        Dt::LargeBinary => Ok(Value::Bytes(arr.as_binary::<i64>().value(row_idx).to_vec())),
260        other => Err(format!(
261            "unsupported Arrow type in plugin procedure output: {other:?}"
262        )),
263    }
264}
265
/// Projects a result row down to the requested `YIELD` columns.
///
/// If `yield_items` is empty (e.g. a standalone `CALL` with no `YIELD`),
/// `full_result` is returned unchanged. Otherwise only the named columns
/// present in `full_result` are kept. Requested names missing from the row
/// are skipped, and duplicate names produce a single entry.
///
/// Values are moved out of `full_result` instead of being cloned, so the
/// value type needs no `Clone` bound.
fn filter_yield_items<V>(
    mut full_result: HashMap<String, V>,
    yield_items: &[String],
) -> HashMap<String, V> {
    if yield_items.is_empty() {
        return full_result;
    }
    // `remove` hands back the owned value. A repeated yield name finds
    // nothing the second time, which leaves the same single entry a
    // clone-based projection would have produced.
    yield_items
        .iter()
        .filter_map(|name| full_result.remove(name).map(|val| (name.clone(), val)))
        .collect()
}
280
281impl Executor {
282    /// Evaluate a procedure argument as a string, returning an error with the given description.
283    async fn eval_string_arg<'a>(
284        &'a self,
285        arg: &Expr,
286        description: &str,
287        prop_manager: &'a PropertyManager,
288        params: &'a HashMap<String, Value>,
289        ctx: Option<&'a QueryContext>,
290    ) -> Result<String> {
291        let empty_row = HashMap::new();
292        self.evaluate_expr(arg, &empty_row, prop_manager, params, ctx)
293            .await?
294            .as_str()
295            .ok_or_else(|| anyhow!("{} must be string", description))
296            .map(|s| s.to_string())
297    }
298
299    pub(crate) async fn execute_procedure<'a>(
300        &'a self,
301        name: &str,
302        args: &[Expr],
303        yield_items: &[String],
304        prop_manager: &'a PropertyManager,
305        params: &'a HashMap<String, Value>,
306        ctx: Option<&'a QueryContext>,
307    ) -> Result<Vec<HashMap<String, Value>>> {
308        match name {
309            "uni.admin.compact" => {
310                let stats = self.storage.compact().await?;
311                let full_result = HashMap::from([
312                    (
313                        "files_compacted".to_string(),
314                        Value::Int(stats.files_compacted as i64),
315                    ),
316                    (
317                        "bytes_before".to_string(),
318                        Value::Int(stats.bytes_before as i64),
319                    ),
320                    (
321                        "bytes_after".to_string(),
322                        Value::Int(stats.bytes_after as i64),
323                    ),
324                    (
325                        "duration_ms".to_string(),
326                        Value::Int(stats.duration.as_millis() as i64),
327                    ),
328                ]);
329
330                Ok(vec![filter_yield_items(full_result, yield_items)])
331            }
332            "uni.admin.compactionStatus" => {
333                let status = self
334                    .storage
335                    .compaction_status()
336                    .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
337                let full_result = HashMap::from([
338                    ("l1_runs".to_string(), Value::Int(status.l1_runs as i64)),
339                    (
340                        "l1_size_bytes".to_string(),
341                        Value::Int(status.l1_size_bytes as i64),
342                    ),
343                    (
344                        "in_progress".to_string(),
345                        Value::Bool(status.compaction_in_progress),
346                    ),
347                    (
348                        "pending".to_string(),
349                        Value::Int(status.compaction_pending as i64),
350                    ),
351                    (
352                        "total_compactions".to_string(),
353                        Value::Int(status.total_compactions as i64),
354                    ),
355                    (
356                        "total_bytes_compacted".to_string(),
357                        Value::Int(status.total_bytes_compacted as i64),
358                    ),
359                ]);
360
361                Ok(vec![filter_yield_items(full_result, yield_items)])
362            }
363            "uni.admin.snapshot.create" => {
364                let name = if !args.is_empty() {
365                    Some(
366                        self.eval_string_arg(&args[0], "Snapshot name", prop_manager, params, ctx)
367                            .await?,
368                    )
369                } else {
370                    None
371                };
372
373                let writer_arc = self
374                    .writer
375                    .as_ref()
376                    .ok_or_else(|| anyhow!("Database is in read-only mode"))?;
377                let writer: &uni_store::Writer = writer_arc.as_ref();
378                let snapshot_id = writer.flush_to_l1(name).await?;
379
380                Ok(vec![HashMap::from([(
381                    "snapshot_id".to_string(),
382                    Value::String(snapshot_id),
383                )])])
384            }
385            "uni.admin.snapshot.list" => {
386                let sm = self.storage.snapshot_manager();
387                let ids = sm.list_snapshots().await?;
388                let mut results = Vec::new();
389                for id in ids {
390                    if let Ok(m) = sm.load_snapshot(&id).await {
391                        results.push(HashMap::from([
392                            ("snapshot_id".to_string(), Value::String(m.snapshot_id)),
393                            (
394                                "name".to_string(),
395                                m.name.map(Value::String).unwrap_or(Value::Null),
396                            ),
397                            (
398                                "created_at".to_string(),
399                                Value::String(m.created_at.to_rfc3339()),
400                            ),
401                            (
402                                "version_hwm".to_string(),
403                                Value::Int(m.version_high_water_mark as i64),
404                            ),
405                        ]));
406                    }
407                }
408                Ok(results)
409            }
410            "uni.admin.snapshot.restore" => {
411                let id = self
412                    .eval_string_arg(&args[0], "Snapshot ID", prop_manager, params, ctx)
413                    .await?;
414
415                self.storage
416                    .snapshot_manager()
417                    .set_latest_snapshot(&id)
418                    .await?;
419                Ok(vec![HashMap::from([(
420                    "status".to_string(),
421                    Value::String("Restored".to_string()),
422                )])])
423            }
424            // DDL Procedures
425            "uni.schema.createLabel" => {
426                let empty_row = HashMap::new();
427                let name = self
428                    .eval_string_arg(&args[0], "Label name", prop_manager, params, ctx)
429                    .await?;
430                let config = self
431                    .evaluate_expr(&args[1], &empty_row, prop_manager, params, ctx)
432                    .await?;
433
434                let success =
435                    super::ddl_procedures::create_label(&self.storage, &name, &config).await?;
436                success_result(success)
437            }
438            "uni.schema.createEdgeType" => {
439                let empty_row = HashMap::new();
440                let name = self
441                    .eval_string_arg(&args[0], "Edge type name", prop_manager, params, ctx)
442                    .await?;
443                let src_val = self
444                    .evaluate_expr(&args[1], &empty_row, prop_manager, params, ctx)
445                    .await?;
446                let dst_val = self
447                    .evaluate_expr(&args[2], &empty_row, prop_manager, params, ctx)
448                    .await?;
449                let config = self
450                    .evaluate_expr(&args[3], &empty_row, prop_manager, params, ctx)
451                    .await?;
452
453                // Convert src/dst to Vec<String>
454                let src_labels = src_val
455                    .as_array()
456                    .ok_or(anyhow!("Source labels must be a list"))?
457                    .iter()
458                    .map(|v| {
459                        v.as_str()
460                            .map(|s| s.to_string())
461                            .ok_or(anyhow!("Label must be string"))
462                    })
463                    .collect::<Result<Vec<_>>>()?;
464                let dst_labels = dst_val
465                    .as_array()
466                    .ok_or(anyhow!("Target labels must be a list"))?
467                    .iter()
468                    .map(|v| {
469                        v.as_str()
470                            .map(|s| s.to_string())
471                            .ok_or(anyhow!("Label must be string"))
472                    })
473                    .collect::<Result<Vec<_>>>()?;
474
475                let success = super::ddl_procedures::create_edge_type(
476                    &self.storage,
477                    &name,
478                    src_labels,
479                    dst_labels,
480                    &config,
481                )
482                .await?;
483                success_result(success)
484            }
485            "uni.schema.createIndex" => {
486                let empty_row = HashMap::new();
487                let label = self
488                    .eval_string_arg(&args[0], "Label", prop_manager, params, ctx)
489                    .await?;
490                let property = self
491                    .eval_string_arg(&args[1], "Property", prop_manager, params, ctx)
492                    .await?;
493                let config = self
494                    .evaluate_expr(&args[2], &empty_row, prop_manager, params, ctx)
495                    .await?;
496
497                let success =
498                    super::ddl_procedures::create_index(&self.storage, &label, &property, &config)
499                        .await?;
500                success_result(success)
501            }
502            "uni.schema.createConstraint" => {
503                let label = self
504                    .eval_string_arg(&args[0], "Label", prop_manager, params, ctx)
505                    .await?;
506                let c_type = self
507                    .eval_string_arg(&args[1], "Constraint type", prop_manager, params, ctx)
508                    .await?;
509                let empty_row = HashMap::new();
510                let props_val = self
511                    .evaluate_expr(&args[2], &empty_row, prop_manager, params, ctx)
512                    .await?;
513
514                let properties = props_val
515                    .as_array()
516                    .ok_or(anyhow!("Properties must be a list"))?
517                    .iter()
518                    .map(|v| {
519                        v.as_str()
520                            .map(|s| s.to_string())
521                            .ok_or(anyhow!("Property must be string"))
522                    })
523                    .collect::<Result<Vec<_>>>()?;
524
525                let success = super::ddl_procedures::create_constraint(
526                    &self.storage,
527                    &label,
528                    &c_type,
529                    properties,
530                )
531                .await?;
532                success_result(success)
533            }
534            // The four `drop*` procedures share one shape: evaluate the
535            // single string argument, dispatch to the matching DDL helper,
536            // and report success. Only the argument label and the helper
537            // differ.
538            "uni.schema.dropLabel"
539            | "uni.schema.dropEdgeType"
540            | "uni.schema.dropIndex"
541            | "uni.schema.dropConstraint" => {
542                let description = match name {
543                    "uni.schema.dropLabel" => "Label name",
544                    "uni.schema.dropEdgeType" => "Edge type name",
545                    "uni.schema.dropIndex" => "Index name",
546                    _ => "Constraint name",
547                };
548                let target = self
549                    .eval_string_arg(&args[0], description, prop_manager, params, ctx)
550                    .await?;
551                let success = match name {
552                    "uni.schema.dropLabel" => {
553                        super::ddl_procedures::drop_label(&self.storage, &target).await?
554                    }
555                    "uni.schema.dropEdgeType" => {
556                        super::ddl_procedures::drop_edge_type(&self.storage, &target).await?
557                    }
558                    "uni.schema.dropIndex" => {
559                        super::ddl_procedures::drop_index(&self.storage, &target).await?
560                    }
561                    _ => super::ddl_procedures::drop_constraint(&self.storage, &target).await?,
562                };
563                success_result(success)
564            }
565            _ => {
566                // M4: Plugin path — consult the framework PluginRegistry
567                // before falling back to the legacy TCK mock registry.
568                if let Some(registry) = &self.procedure_registry
569                    && let Some(entry) = registry.resolve_user_procedure(name)
570                {
571                    return self
572                        .execute_plugin_procedure(
573                            name,
574                            &entry,
575                            args,
576                            yield_items,
577                            prop_manager,
578                            params,
579                            ctx,
580                        )
581                        .await;
582                }
583
584                // Legacy TCK mock-procedure registry.
585                if let Some(registry) = &self.procedure_registry
586                    && let Some(proc_def) = registry.get(name)
587                {
588                    return self
589                        .execute_registered_procedure(
590                            &proc_def,
591                            args,
592                            yield_items,
593                            prop_manager,
594                            params,
595                            ctx,
596                        )
597                        .await;
598                }
599                Err(anyhow!("ProcedureNotFound: Unknown procedure '{}'", name))
600            }
601        }
602    }
603
604    /// Executes a procedure registered through the plugin framework.
605    ///
606    /// Evaluates argument `Expr`s to Values, converts them to
607    /// `ColumnarValue` scalars, calls the plugin's `invoke()` to obtain
608    /// a `SendableRecordBatchStream`, drains the stream, and converts the
609    /// resulting Arrow batches to the legacy `Vec<HashMap<String, Value>>`
610    /// shape the Executor expects.
611    #[allow(clippy::too_many_arguments)] // mirrors the legacy execute_procedure signature
612    async fn execute_plugin_procedure<'a>(
613        &'a self,
614        name: &str,
615        entry: &uni_plugin::registry::ProcedureEntry,
616        args: &[Expr],
617        yield_items: &[String],
618        prop_manager: &'a PropertyManager,
619        params: &'a HashMap<String, Value>,
620        ctx: Option<&'a QueryContext>,
621    ) -> Result<Vec<HashMap<String, Value>>> {
622        use datafusion::logical_expr::ColumnarValue;
623        use futures::StreamExt;
624
625        // Evaluate each arg expression to a Value, then map to a
626        // ColumnarValue scalar for the plugin's invoke signature.
627        let empty_row: HashMap<String, Value> = HashMap::new();
628        let mut columnar_args: Vec<ColumnarValue> = Vec::with_capacity(args.len());
629        for arg in args {
630            let v = self
631                .evaluate_expr(arg, &empty_row, prop_manager, params, ctx)
632                .await?;
633            columnar_args.push(
634                value_to_columnar(&v)
635                    .map_err(|e| anyhow!("Procedure '{name}': argument conversion failed: {e}"))?,
636            );
637        }
638
639        let mut host = crate::query::executor::procedure_host::QueryProcedureHost::from_components(
640            Arc::clone(&self.storage),
641            Some(Arc::clone(&self.algo_registry)),
642            self.procedure_registry.clone(),
643        );
644        // FU-1 / M11 #6: attach the outer executor's writer handle so
645        // declared `WRITE`-mode procedures synthesized by
646        // `CypherProcedureSynthesizer` can mutate via the write-enabled
647        // inner-query host. The simple-Executor path
648        // (`from_components`) is what the procedure_call -> stream
649        // pipeline lands on for top-level `CALL <declared.qname>()`
650        // invocations.
651        if let Some(writer) = &self.writer {
652            host = host.with_writer(Arc::clone(writer));
653        }
654        // FU-1: propagate the in-flight principal so capability gates
655        // (e.g., `Capability::ProcedureWrites` on
656        // `uni.plugin.declareProcedure WRITE`) see the session's
657        // authenticated user, not an anonymous default. The
658        // host + principal -> ProcedureContext construction is
659        // consolidated in `uni_plugin::host::build_procedure_context`.
660        let principal = crate::current_principal();
661        let pctx = uni_plugin::host::build_procedure_context(&host, principal.as_deref());
662        let mut stream = entry
663            .procedure
664            .invoke(pctx, &columnar_args)
665            .map_err(|e| anyhow!("Procedure '{name}': {e}"))?;
666
667        // Collect every batch the plugin yields and convert to row-shaped
668        // Value maps. Schema comes from the plugin signature's yields.
669        let mut rows: Vec<HashMap<String, Value>> = Vec::new();
670        while let Some(item) = stream.next().await {
671            let batch = item.map_err(|e| anyhow!("Procedure '{name}' stream error: {e}"))?;
672            for row_idx in 0..batch.num_rows() {
673                let mut row: HashMap<String, Value> = HashMap::new();
674                let schema = batch.schema();
675                for col_idx in 0..batch.num_columns() {
676                    let field = schema.field(col_idx);
677                    let arr = batch.column(col_idx);
678                    let v = arrow_scalar_to_value(arr.as_ref(), row_idx)
679                        .map_err(|e| anyhow!("Procedure '{name}': output decode: {e}"))?;
680                    row.insert(field.name().clone(), v);
681                }
682                rows.push(filter_yield_items(row, yield_items));
683            }
684        }
685        Ok(rows)
686    }
687
688    /// Executes a procedure from the external registry.
689    ///
690    /// Evaluates arguments, validates count and types against the procedure
691    /// declaration, filters data rows by matching input columns, and projects
692    /// the requested output columns.
693    ///
694    /// # Errors
695    ///
696    /// Returns `InvalidNumberOfArguments` if the argument count is wrong,
697    /// or `InvalidArgumentType` if an argument has an incompatible type.
698    async fn execute_registered_procedure<'a>(
699        &'a self,
700        proc_def: &RegisteredProcedure,
701        args: &[Expr],
702        yield_items: &[String],
703        prop_manager: &'a PropertyManager,
704        params: &'a HashMap<String, Value>,
705        ctx: Option<&'a QueryContext>,
706    ) -> Result<Vec<HashMap<String, Value>>> {
707        let empty_row = HashMap::new();
708
709        // Evaluate arguments
710        let mut evaluated_args = Vec::with_capacity(args.len());
711        for arg in args {
712            evaluated_args.push(
713                self.evaluate_expr(arg, &empty_row, prop_manager, params, ctx)
714                    .await?,
715            );
716        }
717
718        // Validate argument count
719        if evaluated_args.len() != proc_def.params.len() {
720            if evaluated_args.is_empty() && !proc_def.params.is_empty() {
721                if yield_items.is_empty() {
722                    // Standalone CALL — resolve implicit arguments from query parameters
723                    let mut resolved = Vec::with_capacity(proc_def.params.len());
724                    for param in &proc_def.params {
725                        if let Some(val) = params.get(&param.name) {
726                            resolved.push(val.clone());
727                        } else {
728                            return Err(anyhow!(
729                                "MissingParameter: Procedure '{}' requires implicit argument '{}' \
730                                 but it was not provided as a query parameter",
731                                proc_def.name,
732                                param.name
733                            ));
734                        }
735                    }
736                    evaluated_args = resolved;
737                } else {
738                    // In-query CALL with YIELD cannot use implicit arguments
739                    return Err(anyhow!(
740                        "InvalidArgumentPassingMode: Procedure '{}' requires explicit argument passing in in-query CALL",
741                        proc_def.name
742                    ));
743                }
744            } else {
745                return Err(anyhow!(
746                    "InvalidNumberOfArguments: Procedure '{}' expects {} argument(s), got {}",
747                    proc_def.name,
748                    proc_def.params.len(),
749                    evaluated_args.len()
750                ));
751            }
752        }
753
754        // Validate argument types
755        for (i, (arg_val, param)) in evaluated_args.iter().zip(&proc_def.params).enumerate() {
756            if !arg_val.is_null() && !check_type_compatible(arg_val, &param.param_type) {
757                return Err(anyhow!(
758                    "InvalidArgumentType: Argument {} ('{}') of procedure '{}' has incompatible type",
759                    i,
760                    param.name,
761                    proc_def.name
762                ));
763            }
764        }
765
766        // Filter data rows: keep rows where input columns match the provided args
767        let filtered: Vec<&HashMap<String, Value>> = proc_def
768            .data
769            .iter()
770            .filter(|row| {
771                for (param, arg_val) in proc_def.params.iter().zip(&evaluated_args) {
772                    if let Some(row_val) = row.get(&param.name)
773                        && !values_match(row_val, arg_val)
774                    {
775                        return false;
776                    }
777                }
778                true
779            })
780            .collect();
781
782        // Collect output column names
783        let output_names: Vec<&str> = proc_def.outputs.iter().map(|o| o.name.as_str()).collect();
784
785        // Project output columns, applying yield_items filtering. With no
786        // yield list, return every declared output column; otherwise route
787        // through `filter_yield_items` over the data row.
788        let results = filtered
789            .into_iter()
790            .map(|row| {
791                if yield_items.is_empty() {
792                    output_names
793                        .iter()
794                        .filter_map(|name| {
795                            row.get(*name).map(|val| ((*name).to_string(), val.clone()))
796                        })
797                        .collect()
798                } else {
799                    filter_yield_items(row.clone(), yield_items)
800                }
801            })
802            .collect();
803
804        Ok(results)
805    }
806}
807
808/// Checks whether a value is compatible with a procedure type.
809fn check_type_compatible(val: &Value, expected: &ProcedureValueType) -> bool {
810    match expected {
811        ProcedureValueType::Any => true,
812        ProcedureValueType::String => val.is_string(),
813        ProcedureValueType::Boolean => val.is_bool(),
814        ProcedureValueType::Integer => val.is_i64(),
815        ProcedureValueType::Float => val.is_f64() || val.is_i64(),
816        ProcedureValueType::Number => val.is_number(),
817    }
818}
819
820/// Checks whether two values match for input-column filtering.
821fn values_match(row_val: &Value, arg_val: &Value) -> bool {
822    if arg_val.is_null() || row_val.is_null() {
823        return arg_val.is_null() && row_val.is_null();
824    }
825    // Compare numbers by f64 to handle int/float cross-comparison
826    if let (Some(a), Some(b)) = (row_val.as_f64(), arg_val.as_f64()) {
827        return (a - b).abs() < f64::EPSILON;
828    }
829    row_val == arg_val
830}