Skip to main content

meshdb_executor/
procedures.rs

1#[cfg(any(
2    feature = "apoc-create",
3    feature = "apoc-refactor",
4    feature = "apoc-cypher"
5))]
6use crate::error::Error;
7use crate::error::Result;
8use crate::reader::GraphReader;
9use crate::value::Value;
10#[cfg(any(
11    feature = "apoc-create",
12    feature = "apoc-refactor",
13    feature = "apoc-cypher"
14))]
15use crate::writer::GraphWriter;
16#[cfg(any(feature = "apoc-create", feature = "apoc-refactor"))]
17use meshdb_core::Edge;
18#[cfg(feature = "apoc-create")]
19use meshdb_core::Node;
20use meshdb_core::Property;
21use std::collections::{BTreeSet, HashMap};
22
/// Declared argument / output type for a procedure signature.
/// Mirrors the openCypher type names the TCK uses (`STRING?`,
/// `INTEGER?`, `FLOAT?`, `NUMBER?`, `BOOLEAN?`, `ANY?`). Nullability
/// is not tracked separately — every TCK type in practice is nullable
/// (`?`) and the match logic treats nulls uniformly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcType {
    /// `STRING` — matches string property values.
    String,
    /// `INTEGER` (alias `INT`) — matches 64-bit integer property values.
    Integer,
    /// `FLOAT` — matches float property values and, via coercion,
    /// integer property values.
    Float,
    /// `NUMBER` (alias `NUMERIC`) — matches either numeric kind.
    Number,
    /// `BOOLEAN` (alias `BOOL`) — matches boolean property values.
    Boolean,
    /// `ANY` — matches every value. Also the fallback for type
    /// names the parser does not recognise.
    Any,
}
37
38impl ProcType {
39    pub fn parse(s: &str) -> Self {
40        let trimmed = s.trim().trim_end_matches('?').trim();
41        match trimmed.to_ascii_uppercase().as_str() {
42            "STRING" => ProcType::String,
43            "INTEGER" | "INT" => ProcType::Integer,
44            "FLOAT" => ProcType::Float,
45            "NUMBER" | "NUMERIC" => ProcType::Number,
46            "BOOLEAN" | "BOOL" => ProcType::Boolean,
47            _ => ProcType::Any,
48        }
49    }
50
51    /// True when `value` is acceptable as a procedure argument of
52    /// this declared type. Follows Neo4j's assignable-type rules:
53    /// `FLOAT` accepts integers (coerced), `NUMBER` accepts both
54    /// numeric kinds, `ANY` accepts everything.
55    pub fn accepts(&self, value: &Value) -> bool {
56        if matches!(value, Value::Null) {
57            return true;
58        }
59        match (self, value) {
60            (ProcType::Any, _) => true,
61            (ProcType::String, Value::Property(Property::String(_))) => true,
62            (ProcType::Integer, Value::Property(Property::Int64(_))) => true,
63            (ProcType::Float, Value::Property(Property::Float64(_))) => true,
64            (ProcType::Float, Value::Property(Property::Int64(_))) => true,
65            (ProcType::Number, Value::Property(Property::Int64(_))) => true,
66            (ProcType::Number, Value::Property(Property::Float64(_))) => true,
67            (ProcType::Boolean, Value::Property(Property::Bool(_))) => true,
68            _ => false,
69        }
70    }
71}
72
/// One declared input argument of a procedure signature: the
/// column name plus its declared type.
#[derive(Debug, Clone)]
pub struct ProcArgSpec {
    /// Declared argument name. Also the key under which the
    /// argument's cell is looked up in a [`ProcRow`] when matching
    /// call arguments against rows.
    pub name: String,
    /// Declared type of the argument.
    pub ty: ProcType,
}
78
/// One declared output column of a procedure signature: the
/// column name plus its declared type.
#[derive(Debug, Clone)]
pub struct ProcOutSpec {
    /// Declared output column name.
    pub name: String,
    /// Declared type of the output column.
    pub ty: ProcType,
}
84
/// A procedure registered with a [`ProcedureRegistry`]. The TCK
/// harness builds one per `And there exists a procedure ...` step by
/// collating the signature and the gherkin data table: each data row
/// contributes one entry to `rows` where the leading cells are the
/// input-column values (matched against call arguments) and the
/// trailing cells are the output-column values (projected by
/// YIELD).
///
/// Built-in procedures — `db.labels()` and friends — leave `rows`
/// empty and set `builtin` so the executor materialises the row set
/// live from the current graph via [`Procedure::resolve_rows`].
#[derive(Debug, Clone)]
pub struct Procedure {
    /// Name of the procedure as a list of segments.
    /// NOTE(review): presumably the dot-separated namespace parts
    /// (e.g. `["db", "labels"]`) — confirm against the registry.
    pub qualified_name: Vec<String>,
    /// Declared input arguments, in call order. Paired positionally
    /// with the call arguments by [`Procedure::row_matches`].
    pub inputs: Vec<ProcArgSpec>,
    /// Declared output columns.
    pub outputs: Vec<ProcOutSpec>,
    /// Pre-populated rows. Handed back unchanged by
    /// [`Procedure::resolve_rows`] when `builtin` is `None`.
    pub rows: Vec<ProcRow>,
    /// When `Some`, the rows are produced at call time by the
    /// identified built-in instead of being read from `rows`.
    pub builtin: Option<BuiltinProc>,
}
104
/// Identifies a procedure whose rows are derived from the live graph
/// at call time rather than pre-populated in [`Procedure::rows`].
/// Keeps the procedure surface uniform — the executor still iterates
/// `ProcRow`s; the only difference is who produced them.
///
/// Only the `db.*` introspection variants are compiled
/// unconditionally; every `apoc.*` variant is gated behind its own
/// `apoc-*` cargo feature, so the set of variants depends on the
/// enabled features.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BuiltinProc {
    /// `db.labels()` — yields one row per distinct node label.
    DbLabels,
    /// `db.relationshipTypes()` — yields one row per distinct edge type.
    DbRelationshipTypes,
    /// `db.propertyKeys()` — yields one row per distinct property key
    /// observed on any node or edge.
    DbPropertyKeys,
    /// `db.constraints()` — yields one row per registered constraint,
    /// carrying `name`, `label`, `property`, and `type` columns.
    /// Mirrors the `SHOW CONSTRAINTS` surface.
    DbConstraints,
    /// `apoc.create.node(labels, props)` — write builtin that
    /// creates a node with the given labels and properties and
    /// yields it as `node`. The first builtin that mutates the
    /// store, so it goes through [`Procedure::resolve_write_rows`]
    /// rather than [`Procedure::resolve_rows`] — the args drive
    /// the row directly, no candidate-row filtering needed.
    #[cfg(feature = "apoc-create")]
    ApocCreateNode,
    /// `apoc.create.relationship(from, relType, props, to)` —
    /// write builtin that creates an edge between two existing
    /// nodes and yields it as `rel`. Argument order matches
    /// Neo4j's APOC (relType + props between the two endpoints,
    /// not at the end). Both endpoints must already exist; this
    /// procedure does not auto-create missing nodes.
    #[cfg(feature = "apoc-create")]
    ApocCreateRelationship,
    /// `apoc.create.addLabels(node|nodes, labels)` — adds the
    /// given labels to the supplied nodes (no-op for labels
    /// already present) and yields each updated node back. The
    /// first arg accepts either a single Node or a list of Nodes
    /// to match Neo4j APOC's variadic input convention.
    #[cfg(feature = "apoc-create")]
    ApocCreateAddLabels,
    /// `apoc.create.removeLabels(node|nodes, labels)` — removes
    /// the given labels from the supplied nodes (no-op for
    /// labels not present) and yields each updated node back.
    #[cfg(feature = "apoc-create")]
    ApocCreateRemoveLabels,
    /// `apoc.create.setLabels(node|nodes, labels)` — replaces
    /// the entire label set on each supplied node with the given
    /// list and yields the result back.
    #[cfg(feature = "apoc-create")]
    ApocCreateSetLabels,
    /// `apoc.create.setProperty(node|nodes, key, value)` — sets
    /// (or, if value is null, clears) a single property on each
    /// supplied node and yields the result. Mirrors Neo4j APOC's
    /// signature; for relationships use
    /// [`Self::ApocCreateSetRelProperty`].
    #[cfg(feature = "apoc-create")]
    ApocCreateSetProperty,
    /// `apoc.create.setRelProperty(rel|rels, key, value)` —
    /// relationship-scope counterpart to
    /// [`Self::ApocCreateSetProperty`].
    #[cfg(feature = "apoc-create")]
    ApocCreateSetRelProperty,
    /// `apoc.refactor.setType(rel, newType)` — change a
    /// relationship's type. Implementation deletes the old edge
    /// and creates a new one with the supplied type, preserving
    /// endpoints and properties; the new edge has a fresh
    /// EdgeId. Yields the new edge as `rel`.
    #[cfg(feature = "apoc-refactor")]
    ApocRefactorSetType,
    /// `apoc.meta.schema()` — read-only introspection that walks
    /// the live graph and produces a single-row Map keyed by
    /// label and relationship-type names. Each entry carries a
    /// `count`, a `type` discriminator (`"node"` or
    /// `"relationship"`), and a `properties` map describing each
    /// observed property's APOC type and (for nodes) whether
    /// it's covered by a property index. O(N) in the graph size,
    /// same complexity class as Neo4j APOC's equivalent.
    #[cfg(feature = "apoc-meta")]
    ApocMetaSchema,
    /// `apoc.path.expand(startNode, relationshipFilter,
    /// labelFilter, minLevel, maxLevel)` — BFS path traversal
    /// from `startNode`, yielding one `path` column per
    /// discovered path that satisfies the filter + level
    /// constraints. Streams rows via [`ProcRows::Streaming`] so
    /// a downstream `LIMIT` short-circuits enumeration.
    #[cfg(feature = "apoc-path")]
    ApocPathExpand,
    /// `apoc.path.expandConfig(startNode, config)` — config-map
    /// variant of [`Self::ApocPathExpand`]. Accepts `minLevel`,
    /// `maxLevel`, `relationshipFilter`, `labelFilter`,
    /// `uniqueness`, `filterStartNode`, `limit`, `endNodes`,
    /// `blacklistNodes`, `whitelistNodes`. Other Neo4j APOC
    /// config keys (`sequence`, `bfs`, `optional`,
    /// `terminatorNodes`) parse without error but aren't yet
    /// load-bearing — `bfs` is treated as always-true, the
    /// others as not-set.
    #[cfg(feature = "apoc-path")]
    ApocPathExpandConfig,
    /// `apoc.path.subgraphNodes(startNode, config)` — walks the
    /// reachable subgraph under NODE_GLOBAL uniqueness and
    /// yields one row per distinct node reached, under the
    /// `node` column. Same config surface as expandConfig, but
    /// `minLevel` is forced to 0 (start node is included) and
    /// `uniqueness` is forced to NODE_GLOBAL.
    #[cfg(feature = "apoc-path")]
    ApocPathSubgraphNodes,
    /// `apoc.path.subgraphAll(startNode, config)` — walks the
    /// reachable subgraph and yields a single row with two
    /// columns: `nodes` (list of distinct nodes) and
    /// `relationships` (list of distinct edges). Enumeration is
    /// eager internally since the output is a single aggregate.
    #[cfg(feature = "apoc-path")]
    ApocPathSubgraphAll,
    /// `apoc.path.spanningTree(startNode, config)` — walks under
    /// NODE_GLOBAL uniqueness and yields one row per reached
    /// node carrying its discovery path (the first BFS-order
    /// path that reached that node) under the `path` column.
    #[cfg(feature = "apoc-path")]
    ApocPathSpanningTree,
    /// `apoc.cypher.run(cypher, params)` — parse / plan / execute
    /// the supplied Cypher statement against the current reader
    /// and yield each result row as a single `value` column
    /// carrying a `Map<String, Value>`. Rejects plans containing
    /// write operators — callers who need writes route through
    /// [`Self::ApocCypherDoIt`]. Registered as a write builtin
    /// so the dispatch path has access to both reader and
    /// procedure registry; the read-only check fires after
    /// planning.
    #[cfg(feature = "apoc-cypher")]
    ApocCypherRun,
    /// `apoc.cypher.doIt(cypher, params)` — same shape as
    /// `apoc.cypher.run` but allows writes. The inner execution
    /// uses the outer query's writer, so mutations accumulate in
    /// the same buffer and commit atomically with the enclosing
    /// transaction.
    #[cfg(feature = "apoc-cypher")]
    ApocCypherDoIt,
    /// `apoc.load.json(urlOrPath [, path])` — stream the top-
    /// level elements of a JSON document fetched from a local
    /// file or an HTTP URL. Security gates live on the
    /// ImportConfig attached to the ProcedureRegistry; a
    /// missing / disabled config fails every call.
    #[cfg(feature = "apoc-load")]
    ApocLoadJson,
    /// `apoc.load.csv(urlOrPath [, config])` — stream CSV rows
    /// (lineNo, list, map) from the source. Same security
    /// model as [`Self::ApocLoadJson`].
    #[cfg(feature = "apoc-load")]
    ApocLoadCsv,
    /// `apoc.export.csv.all(file, config)` — serialise every
    /// node and relationship to a CSV file. Reuses
    /// ImportConfig's gates for file writes.
    #[cfg(feature = "apoc-export")]
    ApocExportCsvAll,
    /// `apoc.export.csv.query(query, file, config)` — execute
    /// the inner Cypher and serialise each result row as a CSV
    /// record. Inner query must be read-only.
    #[cfg(feature = "apoc-export")]
    ApocExportCsvQuery,
    /// `apoc.export.json.all(file, config)` — JSONL export of
    /// every node + relationship.
    #[cfg(feature = "apoc-export")]
    ApocExportJsonAll,
    /// `apoc.export.json.query(query, file, config)` — one
    /// JSONL object per result row.
    #[cfg(feature = "apoc-export")]
    ApocExportJsonQuery,
    /// `apoc.export.cypher.all(file, config)` — emit re-runnable
    /// Cypher reconstructing the graph.
    #[cfg(feature = "apoc-export")]
    ApocExportCypherAll,
    /// `apoc.export.cypher.query(query, file, config)` — one
    /// `RETURN {...} AS row;` statement per result row.
    #[cfg(feature = "apoc-export")]
    ApocExportCypherQuery,
    /// `apoc.trigger.install(databaseName, name, statement,
    /// selector, config)` — register an after-phase Cypher
    /// trigger persisted in the trigger CF.
    #[cfg(feature = "apoc-trigger")]
    ApocTriggerInstall,
    /// `apoc.trigger.drop(databaseName, name)` — remove a
    /// registered trigger.
    #[cfg(feature = "apoc-trigger")]
    ApocTriggerDrop,
    /// `apoc.trigger.list()` — yield one row per registered
    /// trigger. Unlike the other trigger procedures this one is
    /// served by the read path ([`Procedure::resolve_rows`]).
    #[cfg(feature = "apoc-trigger")]
    ApocTriggerList,
    /// `apoc.trigger.start(databaseName, name)` — un-pause a
    /// previously stopped trigger. The change replicates
    /// across the cluster like install/drop.
    #[cfg(feature = "apoc-trigger")]
    ApocTriggerStart,
    /// `apoc.trigger.stop(databaseName, name)` — pause a
    /// registered trigger. Paused triggers stay in the
    /// registry (and replicate) but are skipped on every commit.
    #[cfg(feature = "apoc-trigger")]
    ApocTriggerStop,
}
304
/// One data-table row. Columns are keyed by declared column name
/// so the registry can look up either the input side (for arg
/// matching) or the output side (for YIELD projection) without
/// recomputing offsets. Built-in procedures produce their rows in
/// this same shape.
pub type ProcRow = HashMap<String, Value>;
310
/// A read-procedure row source. Most built-ins materialize a small
/// finite result up front (`Eager`). Path-traversal procedures with
/// potentially-unbounded output return a `Streaming` cursor so the
/// executor pulls one row at a time — a downstream `LIMIT` stops
/// enumeration without materializing the full path set.
pub enum ProcRows {
    /// Fully materialized result set.
    Eager(Vec<ProcRow>),
    /// Lazy cursor; rows are produced one at a time through
    /// [`ProcCursor::advance`].
    Streaming(Box<dyn ProcCursor>),
}
320
321impl std::fmt::Debug for ProcRows {
322    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323        match self {
324            ProcRows::Eager(rows) => f
325                .debug_tuple("Eager")
326                .field(&format_args!("{} rows", rows.len()))
327                .finish(),
328            ProcRows::Streaming(_) => f.debug_struct("Streaming").finish(),
329        }
330    }
331}
332
/// Lazy row producer used by streaming procedures. Each `advance`
/// call receives a fresh `GraphReader` reference and pulls the next
/// output row, returning `None` when exhausted. State that needs to
/// live across calls (visited sets, frontier queues, the currently-
/// expanding path) is owned by the cursor itself; the reader is
/// borrowed afresh on each call so the executor retains the freedom
/// to switch reader implementations mid-query (e.g. for the
/// in-transaction overlay reader).
pub trait ProcCursor {
    /// Produce the next output row.
    ///
    /// Returns `Ok(Some(row))` for each row, `Ok(None)` once the
    /// cursor is exhausted, and `Err` if producing the row fails.
    fn advance(&mut self, reader: &dyn GraphReader) -> Result<Option<ProcRow>>;
}
344
345impl Procedure {
346    /// True when the call arguments match this row's input columns.
347    /// Applied per row during execution — rows whose input cells
348    /// differ from the supplied arg values are filtered out.
349    /// Argument-type coercion (`FLOAT` accepts an integer, etc.) is
350    /// handled by the caller converting the call arg to the declared
351    /// type before comparing here.
352    pub fn row_matches(&self, row: &ProcRow, args: &[Value]) -> bool {
353        for (spec, arg) in self.inputs.iter().zip(args.iter()) {
354            let cell = row.get(&spec.name).unwrap_or(&Value::Null);
355            if !values_equal_for_procedure(cell, arg) {
356                return false;
357            }
358        }
359        true
360    }
361
    /// True when this procedure mutates the store and therefore
    /// needs to be dispatched through [`Self::resolve_write_rows`]
    /// (which receives the writer and the already-evaluated args)
    /// rather than [`Self::resolve_rows`]. Read-only built-ins and
    /// pre-populated rows return `false`.
    ///
    /// Each group of write variants only exists when its cargo
    /// feature is enabled, hence the per-arm `cfg` attributes; with
    /// no such feature enabled the match collapses to the final
    /// `false` arm.
    pub fn is_write_builtin(&self) -> bool {
        match self.builtin {
            #[cfg(feature = "apoc-create")]
            Some(
                BuiltinProc::ApocCreateNode
                | BuiltinProc::ApocCreateRelationship
                | BuiltinProc::ApocCreateAddLabels
                | BuiltinProc::ApocCreateRemoveLabels
                | BuiltinProc::ApocCreateSetLabels
                | BuiltinProc::ApocCreateSetProperty
                | BuiltinProc::ApocCreateSetRelProperty,
            ) => true,
            #[cfg(feature = "apoc-refactor")]
            Some(BuiltinProc::ApocRefactorSetType) => true,
            // Both cypher.run and cypher.doIt go through the
            // write path — run uses it only for the plan-level
            // read-only check it performs *after* planning, doIt
            // uses it because it actually writes.
            #[cfg(feature = "apoc-cypher")]
            Some(BuiltinProc::ApocCypherRun | BuiltinProc::ApocCypherDoIt) => true,
            // `ApocTriggerList` is deliberately absent: it falls
            // through to the read path below.
            #[cfg(feature = "apoc-trigger")]
            Some(
                BuiltinProc::ApocTriggerInstall
                | BuiltinProc::ApocTriggerDrop
                | BuiltinProc::ApocTriggerStart
                | BuiltinProc::ApocTriggerStop,
            ) => true,
            // `None` (pre-populated rows) and every read-only builtin.
            _ => false,
        }
    }
397
398    /// Produce the row source the executor should iterate. Static
399    /// procedures simply hand back their pre-populated `rows` as
400    /// `Eager`; built-ins with bounded output derive their rows
401    /// from the live graph (still eager, but live). Streaming
402    /// built-ins — path traversals, subgraph walks, `apoc.load.*`
403    /// — hand back a cursor so the executor can pull lazily and
404    /// short-circuit on downstream `LIMIT`. `args` carries the
405    /// call arguments after type coercion; today's eager
406    /// builtins ignore them (their inputs are empty), but
407    /// streaming cursors use them to seed traversal state. The
408    /// procedure registry is threaded through so load cursors
409    /// can read their `ImportConfig` at construction time; most
410    /// built-ins ignore it.
411    pub fn resolve_rows(
412        &self,
413        reader: &dyn GraphReader,
414        args: &[Value],
415        procedures: &ProcedureRegistry,
416    ) -> Result<ProcRows> {
417        let _ = args;
418        let _ = procedures;
419        match self.builtin {
420            None => Ok(ProcRows::Eager(self.rows.clone())),
421            Some(BuiltinProc::DbLabels) => builtin_db_labels(reader).map(ProcRows::Eager),
422            Some(BuiltinProc::DbRelationshipTypes) => {
423                builtin_db_relationship_types(reader).map(ProcRows::Eager)
424            }
425            Some(BuiltinProc::DbPropertyKeys) => {
426                builtin_db_property_keys(reader).map(ProcRows::Eager)
427            }
428            Some(BuiltinProc::DbConstraints) => builtin_db_constraints(reader).map(ProcRows::Eager),
429            #[cfg(feature = "apoc-meta")]
430            Some(BuiltinProc::ApocMetaSchema) => {
431                builtin_apoc_meta_schema(reader).map(ProcRows::Eager)
432            }
433            #[cfg(feature = "apoc-path")]
434            Some(BuiltinProc::ApocPathExpand) => {
435                let cfg = crate::apoc_path::config_from_expand_args(args)?;
436                Ok(ProcRows::Streaming(Box::new(
437                    crate::apoc_path::ExpandCursor::new(cfg),
438                )))
439            }
440            #[cfg(feature = "apoc-path")]
441            Some(BuiltinProc::ApocPathExpandConfig) => {
442                let cfg = crate::apoc_path::config_from_expand_config_args(args)?;
443                Ok(ProcRows::Streaming(Box::new(
444                    crate::apoc_path::ExpandCursor::new(cfg),
445                )))
446            }
447            #[cfg(feature = "apoc-path")]
448            Some(BuiltinProc::ApocPathSubgraphNodes) => {
449                let cfg = crate::apoc_path::config_from_expand_config_args(args)?;
450                Ok(ProcRows::Streaming(Box::new(
451                    crate::apoc_path::SubgraphNodesCursor::new(cfg),
452                )))
453            }
454            #[cfg(feature = "apoc-path")]
455            Some(BuiltinProc::ApocPathSubgraphAll) => {
456                let cfg = crate::apoc_path::config_from_expand_config_args(args)?;
457                Ok(ProcRows::Streaming(Box::new(
458                    crate::apoc_path::SubgraphAllCursor::new(cfg),
459                )))
460            }
461            #[cfg(feature = "apoc-path")]
462            Some(BuiltinProc::ApocPathSpanningTree) => {
463                let cfg = crate::apoc_path::config_from_expand_config_args(args)?;
464                Ok(ProcRows::Streaming(Box::new(
465                    crate::apoc_path::SpanningTreeCursor::new(cfg),
466                )))
467            }
468            #[cfg(feature = "apoc-create")]
469            Some(BuiltinProc::ApocCreateNode) => Err(Error::Procedure(
470                "apoc.create.node is a write procedure — call via resolve_write_rows".into(),
471            )),
472            #[cfg(feature = "apoc-create")]
473            Some(BuiltinProc::ApocCreateRelationship) => Err(Error::Procedure(
474                "apoc.create.relationship is a write procedure — call via resolve_write_rows"
475                    .into(),
476            )),
477            #[cfg(feature = "apoc-create")]
478            Some(
479                BuiltinProc::ApocCreateAddLabels
480                | BuiltinProc::ApocCreateRemoveLabels
481                | BuiltinProc::ApocCreateSetLabels
482                | BuiltinProc::ApocCreateSetProperty
483                | BuiltinProc::ApocCreateSetRelProperty,
484            ) => Err(Error::Procedure(
485                "apoc.create.* mutator is a write procedure — call via resolve_write_rows".into(),
486            )),
487            #[cfg(feature = "apoc-refactor")]
488            Some(BuiltinProc::ApocRefactorSetType) => Err(Error::Procedure(
489                "apoc.refactor.setType is a write procedure — call via resolve_write_rows".into(),
490            )),
491            #[cfg(feature = "apoc-cypher")]
492            Some(BuiltinProc::ApocCypherRun | BuiltinProc::ApocCypherDoIt) => {
493                Err(Error::Procedure(
494                    "apoc.cypher.* is dispatched through resolve_write_rows — caller bug".into(),
495                ))
496            }
497            #[cfg(feature = "apoc-load")]
498            Some(BuiltinProc::ApocLoadJson) => {
499                let input =
500                    crate::apoc_load::expect_source_arg(&args[0], "first argument (urlOrPath)")?;
501                let pointer = if args.len() > 1 {
502                    crate::apoc_load::expect_optional_string(&args[1], "second argument (path)")?
503                } else {
504                    None
505                };
506                let cfg = crate::apoc_load::import_config_from_registry(procedures);
507                Ok(ProcRows::Streaming(Box::new(
508                    crate::apoc_load::LoadJsonCursor::new(cfg, input, pointer),
509                )))
510            }
511            #[cfg(feature = "apoc-load")]
512            Some(BuiltinProc::ApocLoadCsv) => {
513                let input =
514                    crate::apoc_load::expect_source_arg(&args[0], "first argument (urlOrPath)")?;
515                let csv_cfg = if args.len() > 1 {
516                    crate::apoc_load::expect_optional_config_map(&args[1])?
517                } else {
518                    None
519                };
520                let cfg = crate::apoc_load::import_config_from_registry(procedures);
521                Ok(ProcRows::Streaming(Box::new(
522                    crate::apoc_load::LoadCsvCursor::new(cfg, input, csv_cfg.as_ref()),
523                )))
524            }
525            #[cfg(feature = "apoc-export")]
526            Some(BuiltinProc::ApocExportCsvAll) => {
527                let file = crate::apoc_export::expect_all_args(args)?;
528                let cfg = crate::apoc_load::import_config_from_registry(procedures);
529                crate::apoc_export::export_csv_all(reader, &cfg, &file).map(ProcRows::Eager)
530            }
531            #[cfg(feature = "apoc-export")]
532            Some(BuiltinProc::ApocExportCsvQuery) => {
533                let (query, file) = crate::apoc_export::expect_query_args(args)?;
534                let cfg = crate::apoc_load::import_config_from_registry(procedures);
535                crate::apoc_export::export_csv_query(reader, &cfg, procedures, &query, &file)
536                    .map(ProcRows::Eager)
537            }
538            #[cfg(feature = "apoc-export")]
539            Some(BuiltinProc::ApocExportJsonAll) => {
540                let file = crate::apoc_export::expect_all_args(args)?;
541                let cfg = crate::apoc_load::import_config_from_registry(procedures);
542                crate::apoc_export::export_json_all(reader, &cfg, &file).map(ProcRows::Eager)
543            }
544            #[cfg(feature = "apoc-export")]
545            Some(BuiltinProc::ApocExportJsonQuery) => {
546                let (query, file) = crate::apoc_export::expect_query_args(args)?;
547                let cfg = crate::apoc_load::import_config_from_registry(procedures);
548                crate::apoc_export::export_json_query(reader, &cfg, procedures, &query, &file)
549                    .map(ProcRows::Eager)
550            }
551            #[cfg(feature = "apoc-export")]
552            Some(BuiltinProc::ApocExportCypherAll) => {
553                let file = crate::apoc_export::expect_all_args(args)?;
554                let cfg = crate::apoc_load::import_config_from_registry(procedures);
555                crate::apoc_export::export_cypher_all(reader, &cfg, &file).map(ProcRows::Eager)
556            }
557            #[cfg(feature = "apoc-export")]
558            Some(BuiltinProc::ApocExportCypherQuery) => {
559                let (query, file) = crate::apoc_export::expect_query_args(args)?;
560                let cfg = crate::apoc_load::import_config_from_registry(procedures);
561                crate::apoc_export::export_cypher_query(reader, &cfg, procedures, &query, &file)
562                    .map(ProcRows::Eager)
563            }
564            #[cfg(feature = "apoc-trigger")]
565            Some(
566                BuiltinProc::ApocTriggerInstall
567                | BuiltinProc::ApocTriggerDrop
568                | BuiltinProc::ApocTriggerStart
569                | BuiltinProc::ApocTriggerStop,
570            ) => Err(Error::Procedure(
571                "apoc.trigger.install/drop/start/stop are write builtins — must go through resolve_write_rows"
572                    .into(),
573            )),
574            #[cfg(feature = "apoc-trigger")]
575            Some(BuiltinProc::ApocTriggerList) => {
576                let registry = procedures.trigger_registry().ok_or_else(|| {
577                    Error::Procedure(
578                        "apoc.trigger.* not available — server did not attach a trigger registry"
579                            .into(),
580                    )
581                })?;
582                crate::apoc_trigger::list_call(registry).map(ProcRows::Eager)
583            }
584        }
585    }
586
    /// Write-procedure dispatch path. The args are already
    /// evaluated and type-checked; the row is produced as a side
    /// effect of the mutation (e.g. the newly-created node) so
    /// `row_matches` is skipped for these procedures. The
    /// procedure registry is threaded through for procedures
    /// (like `apoc.cypher.doIt`) that recurse into the executor
    /// and need to resolve nested `CALL` sites against the same
    /// set; most write built-ins ignore it.
    ///
    /// # Errors
    ///
    /// Returns an error when the selected write builtin fails, when
    /// a trigger start/stop call finds no trigger registry attached,
    /// or when the procedure is not a write builtin at all.
    ///
    /// NOTE(review): this method is only compiled under
    /// `apoc-create` / `apoc-refactor` / `apoc-cypher`, yet it holds
    /// the `apoc-trigger` arms. Presumably `apoc-trigger` implies one
    /// of those features in Cargo.toml — confirm, otherwise a
    /// trigger-only build has no write dispatch.
    #[cfg(any(
        feature = "apoc-create",
        feature = "apoc-refactor",
        feature = "apoc-cypher"
    ))]
    pub fn resolve_write_rows(
        &self,
        reader: &dyn GraphReader,
        writer: &dyn GraphWriter,
        args: &[Value],
        procedures: &ProcedureRegistry,
    ) -> Result<Vec<ProcRow>> {
        // Only the feature-gated cypher / trigger arms read the
        // registry; presumably this keeps other feature
        // combinations free of an unused-variable warning.
        let _ = procedures;
        match self.builtin {
            #[cfg(feature = "apoc-create")]
            Some(BuiltinProc::ApocCreateNode) => apoc_create_node(writer, args),
            #[cfg(feature = "apoc-create")]
            Some(BuiltinProc::ApocCreateRelationship) => apoc_create_relationship(writer, args),
            #[cfg(feature = "apoc-create")]
            Some(BuiltinProc::ApocCreateAddLabels) => {
                apoc_label_mutator(reader, writer, args, LabelMode::Add)
            }
            #[cfg(feature = "apoc-create")]
            Some(BuiltinProc::ApocCreateRemoveLabels) => {
                apoc_label_mutator(reader, writer, args, LabelMode::Remove)
            }
            #[cfg(feature = "apoc-create")]
            Some(BuiltinProc::ApocCreateSetLabels) => {
                apoc_label_mutator(reader, writer, args, LabelMode::Set)
            }
            #[cfg(feature = "apoc-create")]
            Some(BuiltinProc::ApocCreateSetProperty) => {
                apoc_set_node_property(reader, writer, args)
            }
            #[cfg(feature = "apoc-create")]
            Some(BuiltinProc::ApocCreateSetRelProperty) => {
                apoc_set_rel_property(reader, writer, args)
            }
            #[cfg(feature = "apoc-refactor")]
            Some(BuiltinProc::ApocRefactorSetType) => apoc_refactor_set_type(reader, writer, args),
            // run and doIt share one implementation; the trailing
            // flag is `false` for run and `true` for doIt.
            #[cfg(feature = "apoc-cypher")]
            Some(BuiltinProc::ApocCypherRun) => {
                crate::apoc_cypher::run_cypher(reader, writer, args, procedures, false)
            }
            #[cfg(feature = "apoc-cypher")]
            Some(BuiltinProc::ApocCypherDoIt) => {
                crate::apoc_cypher::run_cypher(reader, writer, args, procedures, true)
            }
            #[cfg(feature = "apoc-trigger")]
            Some(BuiltinProc::ApocTriggerInstall) => {
                crate::apoc_trigger::install_call(writer, args)
            }
            #[cfg(feature = "apoc-trigger")]
            Some(BuiltinProc::ApocTriggerDrop) => crate::apoc_trigger::drop_call(writer, args),
            // start / stop additionally need the trigger registry
            // and fail when none is attached.
            #[cfg(feature = "apoc-trigger")]
            Some(BuiltinProc::ApocTriggerStart) => {
                let registry = procedures.trigger_registry().ok_or_else(|| {
                    Error::Procedure("apoc.trigger.start: no trigger registry attached".into())
                })?;
                crate::apoc_trigger::start_call(registry, writer, args)
            }
            #[cfg(feature = "apoc-trigger")]
            Some(BuiltinProc::ApocTriggerStop) => {
                let registry = procedures.trigger_registry().ok_or_else(|| {
                    Error::Procedure("apoc.trigger.stop: no trigger registry attached".into())
                })?;
                crate::apoc_trigger::stop_call(registry, writer, args)
            }
            // Pre-populated procedures and read-only builtins.
            _ => Err(Error::Procedure("procedure is not a write builtin".into())),
        }
    }
666}
667
668fn str_row(column: &str, value: String) -> ProcRow {
669    let mut row = HashMap::new();
670    row.insert(column.to_string(), Value::Property(Property::String(value)));
671    row
672}
673
674fn builtin_db_labels(reader: &dyn GraphReader) -> Result<Vec<ProcRow>> {
675    let mut labels: BTreeSet<String> = BTreeSet::new();
676    for id in reader.all_node_ids()? {
677        if let Some(n) = reader.get_node(id)? {
678            for l in n.labels {
679                labels.insert(l);
680            }
681        }
682    }
683    Ok(labels.into_iter().map(|l| str_row("label", l)).collect())
684}
685
686fn builtin_db_relationship_types(reader: &dyn GraphReader) -> Result<Vec<ProcRow>> {
687    let mut types: BTreeSet<String> = BTreeSet::new();
688    for id in reader.all_node_ids()? {
689        for (edge_id, _) in reader.outgoing(id)? {
690            if let Some(e) = reader.get_edge(edge_id)? {
691                types.insert(e.edge_type);
692            }
693        }
694    }
695    Ok(types
696        .into_iter()
697        .map(|t| str_row("relationshipType", t))
698        .collect())
699}
700
701fn builtin_db_constraints(reader: &dyn GraphReader) -> Result<Vec<ProcRow>> {
702    let specs = reader.list_property_constraints()?;
703    Ok(specs
704        .into_iter()
705        .map(|spec| {
706            let mut row: ProcRow = HashMap::new();
707            row.insert("name".into(), Value::Property(Property::String(spec.name)));
708            let (scope_tag, target) = match spec.scope {
709                meshdb_storage::ConstraintScope::Node(l) => ("NODE", l),
710                meshdb_storage::ConstraintScope::Relationship(t) => ("RELATIONSHIP", t),
711            };
712            row.insert(
713                "scope".into(),
714                Value::Property(Property::String(scope_tag.into())),
715            );
716            row.insert("label".into(), Value::Property(Property::String(target)));
717            let props: Vec<Property> = spec.properties.into_iter().map(Property::String).collect();
718            row.insert("properties".into(), Value::Property(Property::List(props)));
719            row.insert(
720                "type".into(),
721                Value::Property(Property::String(spec.kind.as_string())),
722            );
723            row
724        })
725        .collect())
726}
727
728fn builtin_db_property_keys(reader: &dyn GraphReader) -> Result<Vec<ProcRow>> {
729    let mut keys: BTreeSet<String> = BTreeSet::new();
730    for id in reader.all_node_ids()? {
731        if let Some(n) = reader.get_node(id)? {
732            for k in n.properties.keys() {
733                keys.insert(k.clone());
734            }
735            for (edge_id, _) in reader.outgoing(id)? {
736                if let Some(e) = reader.get_edge(edge_id)? {
737                    for k in e.properties.keys() {
738                        keys.insert(k.clone());
739                    }
740                }
741            }
742        }
743    }
744    Ok(keys
745        .into_iter()
746        .map(|k| str_row("propertyKey", k))
747        .collect())
748}
749
/// Backs `apoc.meta.schema()`. Walks every node and each node's
/// outgoing edges once, tallying per-label and per-relationship-type
/// instance counts plus the value kinds seen for each property key,
/// then merges in the registered property indexes. The result is a
/// single Map keyed by label / type name:
///
/// ```text
/// {
///   <label>: {
///     type: "node",
///     count: <int>,
///     properties: {
///       <key>: { type: <APOC type name>, indexed: <bool> }
///     }
///   },
///   <edgeType>: {
///     type: "relationship",
///     count: <int>,
///     properties: {
///       <key>: { type: <APOC type name>, indexed: <bool> }
///     }
///   }
/// }
/// ```
///
/// A node carrying several labels counts once towards each of them;
/// a node without labels adds nothing to the label statistics.
/// Labels are inserted into the map first and relationship types
/// second, so a relationship type sharing its name with a label
/// replaces that label's entry. The map is returned as a single row
/// under the `value` column.
#[cfg(feature = "apoc-meta")]
fn builtin_apoc_meta_schema(reader: &dyn GraphReader) -> Result<Vec<ProcRow>> {
    use std::collections::HashSet;

    // Property key → names of the value kinds observed for it.
    type KindsByKey = HashMap<String, HashSet<&'static str>>;
    // Label / type name → (instance count, observed property kinds).
    type Stats = HashMap<String, (i64, KindsByKey)>;

    // Count one more instance of `name` and record its property kinds.
    fn tally<'a>(
        stats: &mut Stats,
        name: &str,
        properties: impl IntoIterator<Item = (&'a String, &'a Property)>,
    ) {
        let (count, kinds) = stats.entry(name.to_string()).or_default();
        *count += 1;
        for (key, value) in properties {
            kinds
                .entry(key.clone())
                .or_default()
                .insert(apoc_schema_type(value));
        }
    }

    let mut label_stats = Stats::new();
    let mut edge_stats = Stats::new();
    for node_id in reader.all_node_ids()? {
        // A node id that no longer resolves is skipped along with
        // its outgoing edges.
        if let Some(node) = reader.get_node(node_id)? {
            for label in &node.labels {
                tally(&mut label_stats, label, &node.properties);
            }
            for (edge_id, _) in reader.outgoing(node_id)? {
                if let Some(edge) = reader.get_edge(edge_id)? {
                    tally(&mut edge_stats, &edge.edge_type, &edge.properties);
                }
            }
        }
    }

    // A property is reported as `indexed: true` when any registered
    // index on its label (or relationship type) mentions it; every
    // column of a composite index counts.
    let mut node_indexed: HashMap<String, HashSet<String>> = HashMap::new();
    for (label, props) in reader.list_property_indexes()? {
        node_indexed.entry(label).or_default().extend(props);
    }
    let mut edge_indexed: HashMap<String, HashSet<String>> = HashMap::new();
    for (edge_type, props) in reader.list_edge_property_indexes()? {
        edge_indexed.entry(edge_type).or_default().extend(props);
    }

    let mut schema: HashMap<String, Property> = HashMap::new();
    for (label, (count, props)) in label_stats {
        let indexed = node_indexed.remove(&label).unwrap_or_default();
        let entry = schema_entry(count, "node", props, Some(&indexed));
        schema.insert(label, Property::Map(entry));
    }
    for (edge_type, (count, props)) in edge_stats {
        let indexed = edge_indexed.remove(&edge_type).unwrap_or_default();
        let entry = schema_entry(count, "relationship", props, Some(&indexed));
        schema.insert(edge_type, Property::Map(entry));
    }

    let row = HashMap::from([(
        "value".to_string(),
        Value::Property(Property::Map(schema)),
    )]);
    Ok(vec![row])
}
861
/// Assemble the schema-map entry for one label or relationship
/// type: `type` (the given `type_tag`, `"node"` or
/// `"relationship"`), `count`, and a `properties` map with one
/// `{ type, indexed }` record per observed key. When a key was seen
/// with several value kinds, the kind names are sorted
/// alphabetically and joined with `|` (e.g. `"INTEGER|STRING"`).
/// The `indexed` flag is only written when `indexed` is `Some`; it
/// is true when the set contains the key.
#[cfg(feature = "apoc-meta")]
fn schema_entry(
    count: i64,
    type_tag: &str,
    props: HashMap<String, std::collections::HashSet<&'static str>>,
    indexed: Option<&std::collections::HashSet<String>>,
) -> HashMap<String, Property> {
    let properties: HashMap<String, Property> = props
        .into_iter()
        .map(|(key, kinds)| {
            let mut names: Vec<&'static str> = kinds.into_iter().collect();
            names.sort_unstable();
            let mut info: HashMap<String, Property> =
                HashMap::from([("type".to_string(), Property::String(names.join("|")))]);
            if let Some(index_keys) = indexed {
                info.insert(
                    "indexed".to_string(),
                    Property::Bool(index_keys.contains(&key)),
                );
            }
            (key, Property::Map(info))
        })
        .collect();

    HashMap::from([
        ("type".to_string(), Property::String(type_tag.into())),
        ("count".to_string(), Property::Int64(count)),
        ("properties".to_string(), Property::Map(properties)),
    ])
}
892
/// Map a [`Property`] to the upper-snake-case kind name reported in
/// the schema map (e.g. `"STRING"`, `"DATE_TIME"`). The match is
/// deliberately exhaustive so that adding a `Property` variant
/// forces this table to be updated.
#[cfg(feature = "apoc-meta")]
fn apoc_schema_type(p: &Property) -> &'static str {
    use Property as P;
    match p {
        // Scalars
        P::Null => "NULL",
        P::String(_) => "STRING",
        P::Int64(_) => "INTEGER",
        P::Float64(_) => "FLOAT",
        P::Bool(_) => "BOOLEAN",
        // Containers
        P::List(_) => "LIST",
        P::Map(_) => "MAP",
        // Temporal and spatial kinds
        P::DateTime { .. } => "DATE_TIME",
        P::LocalDateTime(_) => "LOCAL_DATE_TIME",
        P::Date(_) => "DATE",
        P::Time { .. } => "TIME",
        P::Duration(_) => "DURATION",
        P::Point(_) => "POINT",
    }
}
916
/// Backs the `apoc.create.node(labels, props)` write builtin.
///
/// * `args[0]` — null (no labels) or a list of strings.
/// * `args[1]` — null (no properties) or a map; map values are
///   converted with [`value_to_storable_property`].
///
/// Null-valued properties are dropped rather than stored. The new
/// [`Node`] is persisted with `writer.put_node` and returned as a
/// single row under the `node` column. Any other argument shape is
/// reported as an `Error::Procedure`.
#[cfg(feature = "apoc-create")]
fn apoc_create_node(writer: &dyn GraphWriter, args: &[Value]) -> Result<Vec<ProcRow>> {
    // Shared error for a non-string element in either list flavour.
    let label_error = |got: &dyn std::fmt::Debug| {
        Error::Procedure(format!(
            "apoc.create.node: labels must be strings, got {got:?}"
        ))
    };

    let mut labels: Vec<String> = Vec::new();
    match &args[0] {
        Value::Null | Value::Property(Property::Null) => {}
        Value::List(items) => {
            for item in items.iter() {
                match item {
                    Value::Property(Property::String(name)) => labels.push(name.clone()),
                    other => return Err(label_error(other)),
                }
            }
        }
        Value::Property(Property::List(items)) => {
            for item in items.iter() {
                match item {
                    Property::String(name) => labels.push(name.clone()),
                    other => return Err(label_error(other)),
                }
            }
        }
        other => {
            return Err(Error::Procedure(format!(
                "apoc.create.node: first argument must be a list of strings, got {other:?}"
            )));
        }
    }

    let mut props: HashMap<String, Property> = HashMap::new();
    match &args[1] {
        Value::Null | Value::Property(Property::Null) => {}
        Value::Map(pairs) => {
            for (key, value) in pairs.iter() {
                props.insert(key.clone(), value_to_storable_property(value)?);
            }
        }
        Value::Property(Property::Map(entries)) => {
            for (key, stored) in entries.iter() {
                props.insert(key.clone(), stored.clone());
            }
        }
        other => {
            return Err(Error::Procedure(format!(
                "apoc.create.node: second argument must be a map, got {other:?}"
            )));
        }
    }

    let mut node = Node::new();
    node.labels = labels;
    // A null property value means "key absent", the same rule
    // `CREATE (n {k: null})` follows.
    props
        .into_iter()
        .filter(|(_, stored)| !matches!(stored, Property::Null))
        .for_each(|(key, stored)| {
            node.properties.insert(key, stored);
        });
    writer.put_node(&node)?;

    let row = HashMap::from([("node".to_string(), Value::Node(node))]);
    Ok(vec![row])
}
982
/// Backs the `apoc.create.relationship(from, type, props, to)`
/// write builtin (arguments in that order).
///
/// * `args[0]` / `args[3]` — the endpoints; each must be a
///   [`Value::Node`] (see [`expect_node_id`]). Missing endpoints are
///   not created on the fly.
/// * `args[1]` — the relationship type, a non-null string.
/// * `args[2]` — null (no properties) or a map.
///
/// Null-valued properties are dropped. The new [`Edge`] is persisted
/// with `writer.put_edge` and returned as a single row under the
/// `rel` column.
#[cfg(feature = "apoc-create")]
fn apoc_create_relationship(writer: &dyn GraphWriter, args: &[Value]) -> Result<Vec<ProcRow>> {
    let from = expect_node_id(&args[0], "first argument (from)")?;

    let rel_type: String = match &args[1] {
        Value::Property(Property::String(name)) => Ok(name.clone()),
        Value::Null | Value::Property(Property::Null) => Err(Error::Procedure(
            "apoc.create.relationship: relationship type must not be null".into(),
        )),
        other => Err(Error::Procedure(format!(
            "apoc.create.relationship: relationship type must be a string, got {other:?}"
        ))),
    }?;

    let mut props: HashMap<String, Property> = HashMap::new();
    match &args[2] {
        Value::Null | Value::Property(Property::Null) => {}
        Value::Map(pairs) => {
            for (key, value) in pairs.iter() {
                props.insert(key.clone(), value_to_storable_property(value)?);
            }
        }
        Value::Property(Property::Map(entries)) => {
            for (key, stored) in entries.iter() {
                props.insert(key.clone(), stored.clone());
            }
        }
        other => {
            return Err(Error::Procedure(format!(
                "apoc.create.relationship: third argument must be a map, got {other:?}"
            )));
        }
    }

    let to = expect_node_id(&args[3], "fourth argument (to)")?;

    let mut edge = Edge::new(rel_type, from, to);
    // Null property values are treated as "key absent".
    props
        .into_iter()
        .filter(|(_, stored)| !matches!(stored, Property::Null))
        .for_each(|(key, stored)| {
            edge.properties.insert(key, stored);
        });
    writer.put_edge(&edge)?;

    let row = HashMap::from([("rel".to_string(), Value::Edge(edge))]);
    Ok(vec![row])
}
1033
/// Implementation of `apoc.refactor.setType(rel, newType)`.
///
/// * `args[0]` — the relationship to retype; must be a
///   [`Value::Edge`] (null and every other shape is an error).
/// * `args[1]` — the new type; must be a non-null string.
///
/// The stored edge is reloaded through `reader` and the call fails
/// if it no longer exists. When the edge already has the requested
/// type it is returned unchanged, so the caller's edge id stays
/// valid. Otherwise the old edge is deleted and a new edge with the
/// same source, target and properties is written, so the returned
/// relationship carries the id assigned by `Edge::new`.
///
/// Yields a single row under the `rel` column.
#[cfg(feature = "apoc-refactor")]
fn apoc_refactor_set_type(
    reader: &dyn GraphReader,
    writer: &dyn GraphWriter,
    args: &[Value],
) -> Result<Vec<ProcRow>> {
    let old_id = match &args[0] {
        Value::Edge(e) => e.id,
        Value::Null | Value::Property(Property::Null) => {
            return Err(Error::Procedure(
                "apoc.refactor.setType: relationship argument must not be null".into(),
            ));
        }
        other => {
            return Err(Error::Procedure(format!(
                "apoc.refactor.setType: first argument must be a relationship, got {other:?}"
            )));
        }
    };
    let new_type = match &args[1] {
        Value::Property(Property::String(s)) => s.clone(),
        Value::Null | Value::Property(Property::Null) => {
            return Err(Error::Procedure(
                "apoc.refactor.setType: new type must not be null".into(),
            ));
        }
        other => {
            return Err(Error::Procedure(format!(
                "apoc.refactor.setType: new type must be a string, got {other:?}"
            )));
        }
    };
    // Work from the stored edge rather than the value captured in
    // the row, which may be stale.
    let old = reader
        .get_edge(old_id)?
        .ok_or_else(|| Error::Procedure(format!("edge {old_id:?} no longer exists")))?;
    // Same-type case is a no-op: hand the existing edge back
    // unchanged so the caller's EdgeId stays valid.
    if old.edge_type == new_type {
        let mut row = HashMap::new();
        row.insert("rel".to_string(), Value::Edge(old));
        return Ok(vec![row]);
    }
    let mut new_edge = Edge::new(new_type, old.source, old.target);
    // `old` is owned and not used again, so its property map is
    // moved into the replacement instead of being cloned.
    new_edge.properties = old.properties;
    writer.delete_edge(old_id)?;
    writer.put_edge(&new_edge)?;
    let mut row = HashMap::new();
    row.insert("rel".to_string(), Value::Edge(new_edge));
    Ok(vec![row])
}
1090
/// Selects how [`apoc_label_mutator`] combines a node's existing
/// labels with the supplied list. One variant per procedure:
/// `apoc.create.addLabels`, `removeLabels` and `setLabels`.
#[cfg(feature = "apoc-create")]
#[derive(Debug, Clone, Copy)]
enum LabelMode {
    /// Append each supplied label the node does not already carry.
    Add,
    /// Drop every existing label that appears in the supplied list.
    Remove,
    /// Replace the node's labels with the supplied list.
    Set,
}
1102
/// Shared body of `apoc.create.addLabels` / `removeLabels` /
/// `setLabels`.
///
/// * `args[0]` — a node or a list of nodes.
/// * `args[1]` — a list of label strings.
/// * `mode` — how the supplied labels are combined with the
///   existing ones (see [`LabelMode`]).
///
/// Every node is re-read through `reader` before it is changed, so
/// the mutation applies to the stored state rather than to the
/// possibly stale value captured in the row; a node that has
/// disappeared is an error. Each updated node is written back with
/// `writer.put_node` and yielded as one row under the `node` column.
#[cfg(feature = "apoc-create")]
fn apoc_label_mutator(
    reader: &dyn GraphReader,
    writer: &dyn GraphWriter,
    args: &[Value],
    mode: LabelMode,
) -> Result<Vec<ProcRow>> {
    let node_ids = expect_node_or_node_list(&args[0], "first argument")?;
    let labels = expect_string_list(&args[1], "second argument (labels)")?;

    node_ids
        .into_iter()
        .map(|nid| -> Result<ProcRow> {
            let mut node = match reader.get_node(nid)? {
                Some(node) => node,
                None => {
                    return Err(Error::Procedure(format!(
                        "node {nid:?} no longer exists"
                    )))
                }
            };
            match mode {
                LabelMode::Add => {
                    for label in &labels {
                        if !node.labels.contains(label) {
                            node.labels.push(label.clone());
                        }
                    }
                }
                LabelMode::Remove => node.labels.retain(|existing| !labels.contains(existing)),
                LabelMode::Set => node.labels = labels.clone(),
            }
            writer.put_node(&node)?;
            Ok(HashMap::from([("node".to_string(), Value::Node(node))]))
        })
        .collect()
}
1148
/// Backs `apoc.create.setProperty(nodes, key, value)`.
///
/// * `args[0]` — a node or a list of nodes.
/// * `args[1]` — the property key, a string.
/// * `args[2]` — the new value; a null value removes the key
///   instead of storing it.
///
/// Each node is re-read through `reader` before the change is
/// applied (a node that has disappeared is an error), written back
/// with `writer.put_node`, and yielded as one row under the `node`
/// column.
#[cfg(feature = "apoc-create")]
fn apoc_set_node_property(
    reader: &dyn GraphReader,
    writer: &dyn GraphWriter,
    args: &[Value],
) -> Result<Vec<ProcRow>> {
    let node_ids = expect_node_or_node_list(&args[0], "first argument")?;
    let key = expect_string(&args[1], "second argument (key)")?;
    let value = value_to_storable_property(&args[2])?;

    let mut rows: Vec<ProcRow> = Vec::with_capacity(node_ids.len());
    for nid in node_ids {
        let mut node = match reader.get_node(nid)? {
            Some(node) => node,
            None => {
                return Err(Error::Procedure(format!(
                    "node {nid:?} no longer exists"
                )))
            }
        };
        match &value {
            Property::Null => {
                node.properties.remove(&key);
            }
            stored => {
                node.properties.insert(key.clone(), stored.clone());
            }
        }
        writer.put_node(&node)?;
        rows.push(HashMap::from([("node".to_string(), Value::Node(node))]));
    }
    Ok(rows)
}
1181
/// Relationship counterpart of [`apoc_set_node_property`]:
/// `args[0]` is a relationship or a list of relationships, `args[1]`
/// the property key and `args[2]` the value (null removes the key).
/// Each edge is re-read through `reader`, updated, written back with
/// `writer.put_edge`, and yielded as one row under the `rel` column.
#[cfg(feature = "apoc-create")]
fn apoc_set_rel_property(
    reader: &dyn GraphReader,
    writer: &dyn GraphWriter,
    args: &[Value],
) -> Result<Vec<ProcRow>> {
    let edge_ids = expect_edge_or_edge_list(&args[0], "first argument")?;
    let key = expect_string(&args[1], "second argument (key)")?;
    let value = value_to_storable_property(&args[2])?;

    let mut rows: Vec<ProcRow> = Vec::with_capacity(edge_ids.len());
    for eid in edge_ids {
        let mut edge = match reader.get_edge(eid)? {
            Some(edge) => edge,
            None => {
                return Err(Error::Procedure(format!(
                    "edge {eid:?} no longer exists"
                )))
            }
        };
        match &value {
            Property::Null => {
                edge.properties.remove(&key);
            }
            stored => {
                edge.properties.insert(key.clone(), stored.clone());
            }
        }
        writer.put_edge(&edge)?;
        rows.push(HashMap::from([("rel".to_string(), Value::Edge(edge))]));
    }
    Ok(rows)
}
1209
/// Flatten the node argument of the label / set-property mutators
/// into a list of node ids. A single node and a (possibly nested)
/// list of nodes are accepted, and nulls are skipped. It is an error
/// if the argument contains anything else or resolves to no nodes
/// at all. `position` names the argument in error messages.
#[cfg(feature = "apoc-create")]
fn expect_node_or_node_list(v: &Value, position: &str) -> Result<Vec<meshdb_core::NodeId>> {
    let mut ids = Vec::new();
    collect_node_ids(v, position, &mut ids)?;
    if !ids.is_empty() {
        return Ok(ids);
    }
    Err(Error::Procedure(format!(
        "apoc.create.*: {position} resolved to zero nodes"
    )))
}
1225
/// Recursive worker for [`expect_node_or_node_list`]: appends the
/// id of every node found in `v` to `out`, descending into nested
/// lists in order. Nulls contribute nothing; any other value is a
/// type error naming `position`.
#[cfg(feature = "apoc-create")]
fn collect_node_ids(v: &Value, position: &str, out: &mut Vec<meshdb_core::NodeId>) -> Result<()> {
    match v {
        Value::Node(node) => out.push(node.id),
        Value::List(items) => {
            return items
                .iter()
                .try_for_each(|item| collect_node_ids(item, position, out));
        }
        Value::Null | Value::Property(Property::Null) => {}
        other => {
            return Err(Error::Procedure(format!(
                "apoc.create.*: {position} must be a node or list of nodes, got {other:?}"
            )));
        }
    }
    Ok(())
}
1245
/// Relationship counterpart of [`expect_node_or_node_list`]:
/// flattens a relationship or a (possibly nested) list of
/// relationships into edge ids, skipping nulls. It is an error if
/// nothing is left or the argument contains any other value.
#[cfg(feature = "apoc-create")]
fn expect_edge_or_edge_list(v: &Value, position: &str) -> Result<Vec<meshdb_core::EdgeId>> {
    let mut ids = Vec::new();
    collect_edge_ids(v, position, &mut ids)?;
    if !ids.is_empty() {
        return Ok(ids);
    }
    Err(Error::Procedure(format!(
        "apoc.create.*: {position} resolved to zero relationships"
    )))
}
1258
/// Recursive worker for [`expect_edge_or_edge_list`]: appends the
/// id of every relationship found in `v` to `out`, descending into
/// nested lists in order. Nulls contribute nothing; any other value
/// is a type error naming `position`.
#[cfg(feature = "apoc-create")]
fn collect_edge_ids(v: &Value, position: &str, out: &mut Vec<meshdb_core::EdgeId>) -> Result<()> {
    match v {
        Value::Edge(edge) => out.push(edge.id),
        Value::List(items) => {
            return items
                .iter()
                .try_for_each(|item| collect_edge_ids(item, position, out));
        }
        Value::Null | Value::Property(Property::Null) => {}
        other => {
            return Err(Error::Procedure(format!(
                "apoc.create.*: {position} must be a relationship or list of relationships, got {other:?}"
            )));
        }
    }
    Ok(())
}
1278
/// Read an argument that must be a list of strings. Null yields an
/// empty list; both a `Value::List` of string properties and a
/// stored `Property::List` of strings are accepted. A non-string
/// element or a non-list argument is a type error naming `position`.
#[cfg(feature = "apoc-create")]
fn expect_string_list(v: &Value, position: &str) -> Result<Vec<String>> {
    let mut strings = Vec::new();
    match v {
        Value::Null | Value::Property(Property::Null) => {}
        Value::List(items) => {
            for item in items.iter() {
                match item {
                    Value::Property(Property::String(s)) => strings.push(s.clone()),
                    other => {
                        return Err(Error::Procedure(format!(
                            "apoc.create.*: {position} must contain strings, got {other:?}"
                        )));
                    }
                }
            }
        }
        Value::Property(Property::List(items)) => {
            for item in items.iter() {
                match item {
                    Property::String(s) => strings.push(s.clone()),
                    other => {
                        return Err(Error::Procedure(format!(
                            "apoc.create.*: {position} must contain strings, got {other:?}"
                        )));
                    }
                }
            }
        }
        other => {
            return Err(Error::Procedure(format!(
                "apoc.create.*: {position} must be a list of strings, got {other:?}"
            )));
        }
    }
    Ok(strings)
}
1306
/// Read an argument that must be a string property and return an
/// owned copy. Every other value, null included, is a type error
/// naming `position`.
#[cfg(feature = "apoc-create")]
fn expect_string(v: &Value, position: &str) -> Result<String> {
    if let Value::Property(Property::String(s)) = v {
        return Ok(s.clone());
    }
    Err(Error::Procedure(format!(
        "apoc.create.*: {position} must be a string, got {v:?}"
    )))
}
1316
/// Resolve a relationship endpoint argument to its node id. Only a
/// materialised [`Value::Node`] is accepted; null and every other
/// value are type errors, since a relationship needs concrete
/// endpoints. `position` names the argument in the error message.
#[cfg(feature = "apoc-create")]
fn expect_node_id(v: &Value, position: &str) -> Result<meshdb_core::NodeId> {
    let message = match v {
        Value::Node(node) => return Ok(node.id),
        Value::Null | Value::Property(Property::Null) => {
            format!("apoc.create.relationship: {position} must be a node, got null")
        }
        other => {
            format!("apoc.create.relationship: {position} must be a node, got {other:?}")
        }
    };
    Err(Error::Procedure(message))
}
1333
/// Convert a [`Value`] supplied as a procedure argument into a
/// storable [`Property`]. Shared by the `apoc.create.*` write
/// builtins (node / relationship creation and the set-property
/// mutators), which is why the error message uses the generic
/// `apoc.create.*` prefix rather than naming a single procedure.
///
/// * Property values are cloned as-is and `Value::Null` becomes
///   `Property::Null`.
/// * Lists are converted element by element; the first element that
///   cannot be stored fails the whole conversion.
/// * Graph elements (node / edge / path) and `Value::Map` are
///   rejected with an `Error::Procedure` because they are not valid
///   as stored properties.
#[cfg(feature = "apoc-create")]
fn value_to_storable_property(v: &Value) -> Result<Property> {
    match v {
        Value::Property(p) => Ok(p.clone()),
        Value::Null => Ok(Property::Null),
        Value::List(items) => {
            let props = items
                .iter()
                .map(value_to_storable_property)
                .collect::<Result<Vec<_>>>()?;
            Ok(Property::List(props))
        }
        Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path { .. } => {
            Err(Error::Procedure(
                "apoc.create.*: property values can't be graph elements or graph-aware maps"
                    .into(),
            ))
        }
    }
}
1359
1360fn values_equal_for_procedure(a: &Value, b: &Value) -> bool {
1361    match (a, b) {
1362        (Value::Null, Value::Null) => true,
1363        (Value::Null, _) | (_, Value::Null) => false,
1364        (Value::Property(Property::Int64(x)), Value::Property(Property::Int64(y))) => x == y,
1365        (Value::Property(Property::Float64(x)), Value::Property(Property::Float64(y))) => x == y,
1366        (Value::Property(Property::Int64(i)), Value::Property(Property::Float64(f)))
1367        | (Value::Property(Property::Float64(f)), Value::Property(Property::Int64(i))) => {
1368            *f == (*i as f64)
1369        }
1370        (Value::Property(Property::String(x)), Value::Property(Property::String(y))) => x == y,
1371        (Value::Property(Property::Bool(x)), Value::Property(Property::Bool(y))) => x == y,
1372        _ => a == b,
1373    }
1374}
1375
1376/// Lookup table for registered procedures, keyed by fully qualified
1377/// name (`test.my.proc`). The executor consults this at run time;
1378/// callers (TCK harness, server startup) build an instance and pass
1379/// it to [`crate::execute_with_reader_and_procs`]. An empty registry
1380/// is the default, meaning no procedures are known and any CALL
1381/// raises `ProcedureNotFound`.
1382#[derive(Debug, Clone, Default)]
1383pub struct ProcedureRegistry {
1384    procs: HashMap<String, Procedure>,
1385    /// Runtime security config for `apoc.load.*` / (future)
1386    /// `apoc.export.*`. `None` means "strict default" — every
1387    /// load call fails with a message pointing at the server
1388    /// config. Callers opt in via [`Self::set_import_config`].
1389    #[cfg(feature = "apoc-load")]
1390    import_config: Option<crate::apoc_load::ImportConfig>,
1391    /// Active trigger registry. `None` means triggers are
1392    /// effectively disabled — install / drop / list will all
1393    /// fail with a clear error pointing at the missing
1394    /// attachment. meshdb-server attaches one at startup.
1395    #[cfg(feature = "apoc-trigger")]
1396    trigger_registry: Option<crate::apoc_trigger::TriggerRegistry>,
1397}
1398
1399impl ProcedureRegistry {
1400    pub fn new() -> Self {
1401        Self::default()
1402    }
1403
1404    pub fn register(&mut self, proc: Procedure) {
1405        let key = proc.qualified_name.join(".");
1406        self.procs.insert(key, proc);
1407    }
1408
1409    pub fn get(&self, qualified_name: &[String]) -> Option<&Procedure> {
1410        self.procs.get(&qualified_name.join("."))
1411    }
1412
1413    /// Attach the server's [`ImportConfig`] so `apoc.load.*`
1414    /// knows which file paths / URLs are allowed. Without a
1415    /// call to this method every load call refuses with the
1416    /// "apoc.import.enabled" message.
1417    #[cfg(feature = "apoc-load")]
1418    pub fn set_import_config(&mut self, cfg: crate::apoc_load::ImportConfig) {
1419        self.import_config = Some(cfg);
1420    }
1421
1422    /// The currently-attached import config, if any.
1423    #[cfg(feature = "apoc-load")]
1424    pub fn import_config(&self) -> Option<&crate::apoc_load::ImportConfig> {
1425        self.import_config.as_ref()
1426    }
1427
1428    /// Attach the runtime trigger registry. Required before
1429    /// `apoc.trigger.install` / `drop` / `list` will succeed.
1430    #[cfg(feature = "apoc-trigger")]
1431    pub fn set_trigger_registry(&mut self, registry: crate::apoc_trigger::TriggerRegistry) {
1432        self.trigger_registry = Some(registry);
1433    }
1434
1435    /// The currently-attached trigger registry, if any.
1436    #[cfg(feature = "apoc-trigger")]
1437    pub fn trigger_registry(&self) -> Option<&crate::apoc_trigger::TriggerRegistry> {
1438        self.trigger_registry.as_ref()
1439    }
1440
1441    /// Register the built-in `db.labels`, `db.relationshipTypes`, and
1442    /// `db.propertyKeys` procedures. Call once at server startup —
1443    /// the executor materialises each call's row set from the live
1444    /// graph, so no data needs to be recomputed here when new
1445    /// labels / types / keys appear.
1446    pub fn register_defaults(&mut self) {
1447        self.register(Procedure {
1448            qualified_name: vec!["db".into(), "labels".into()],
1449            inputs: Vec::new(),
1450            outputs: vec![ProcOutSpec {
1451                name: "label".into(),
1452                ty: ProcType::String,
1453            }],
1454            rows: Vec::new(),
1455            builtin: Some(BuiltinProc::DbLabels),
1456        });
1457        self.register(Procedure {
1458            qualified_name: vec!["db".into(), "relationshipTypes".into()],
1459            inputs: Vec::new(),
1460            outputs: vec![ProcOutSpec {
1461                name: "relationshipType".into(),
1462                ty: ProcType::String,
1463            }],
1464            rows: Vec::new(),
1465            builtin: Some(BuiltinProc::DbRelationshipTypes),
1466        });
1467        self.register(Procedure {
1468            qualified_name: vec!["db".into(), "propertyKeys".into()],
1469            inputs: Vec::new(),
1470            outputs: vec![ProcOutSpec {
1471                name: "propertyKey".into(),
1472                ty: ProcType::String,
1473            }],
1474            rows: Vec::new(),
1475            builtin: Some(BuiltinProc::DbPropertyKeys),
1476        });
1477        self.register(Procedure {
1478            qualified_name: vec!["db".into(), "constraints".into()],
1479            inputs: Vec::new(),
1480            outputs: vec![
1481                ProcOutSpec {
1482                    name: "name".into(),
1483                    ty: ProcType::String,
1484                },
1485                ProcOutSpec {
1486                    name: "scope".into(),
1487                    ty: ProcType::String,
1488                },
1489                ProcOutSpec {
1490                    name: "label".into(),
1491                    ty: ProcType::String,
1492                },
1493                ProcOutSpec {
1494                    name: "properties".into(),
1495                    // Returned as a list of strings; the type lattice
1496                    // doesn't have a dedicated `List<String>` variant
1497                    // so `ANY` is the closest fit — the executor
1498                    // preserves the actual List value at call time.
1499                    ty: ProcType::Any,
1500                },
1501                ProcOutSpec {
1502                    name: "type".into(),
1503                    ty: ProcType::String,
1504                },
1505            ],
1506            rows: Vec::new(),
1507            builtin: Some(BuiltinProc::DbConstraints),
1508        });
1509        #[cfg(feature = "apoc-create")]
1510        self.register(Procedure {
1511            qualified_name: vec!["apoc".into(), "create".into(), "node".into()],
1512            inputs: vec![
1513                ProcArgSpec {
1514                    name: "labels".into(),
1515                    // List<String> declared as ANY because the type
1516                    // lattice doesn't carry container parameters; the
1517                    // builtin validates structure at call time.
1518                    ty: ProcType::Any,
1519                },
1520                ProcArgSpec {
1521                    name: "props".into(),
1522                    ty: ProcType::Any,
1523                },
1524            ],
1525            outputs: vec![ProcOutSpec {
1526                name: "node".into(),
1527                ty: ProcType::Any,
1528            }],
1529            rows: Vec::new(),
1530            builtin: Some(BuiltinProc::ApocCreateNode),
1531        });
1532        #[cfg(feature = "apoc-create")]
1533        self.register(Procedure {
1534            qualified_name: vec!["apoc".into(), "create".into(), "relationship".into()],
1535            inputs: vec![
1536                // Argument order matches Neo4j's APOC: from, type,
1537                // props, to. Endpoints accept Value::Node directly;
1538                // ProcType::Any is the broadest match that lets a
1539                // node value flow through without coercion.
1540                ProcArgSpec {
1541                    name: "from".into(),
1542                    ty: ProcType::Any,
1543                },
1544                ProcArgSpec {
1545                    name: "relType".into(),
1546                    ty: ProcType::String,
1547                },
1548                ProcArgSpec {
1549                    name: "props".into(),
1550                    ty: ProcType::Any,
1551                },
1552                ProcArgSpec {
1553                    name: "to".into(),
1554                    ty: ProcType::Any,
1555                },
1556            ],
1557            outputs: vec![ProcOutSpec {
1558                name: "rel".into(),
1559                ty: ProcType::Any,
1560            }],
1561            rows: Vec::new(),
1562            builtin: Some(BuiltinProc::ApocCreateRelationship),
1563        });
1564        #[cfg(feature = "apoc-create")]
1565        for (name, builtin) in [
1566            ("addLabels", BuiltinProc::ApocCreateAddLabels),
1567            ("removeLabels", BuiltinProc::ApocCreateRemoveLabels),
1568            ("setLabels", BuiltinProc::ApocCreateSetLabels),
1569        ] {
1570            self.register(Procedure {
1571                qualified_name: vec!["apoc".into(), "create".into(), name.into()],
1572                inputs: vec![
1573                    // First arg accepts a Node or a list of Nodes;
1574                    // ProcType::Any covers both without coercion.
1575                    ProcArgSpec {
1576                        name: "nodes".into(),
1577                        ty: ProcType::Any,
1578                    },
1579                    ProcArgSpec {
1580                        name: "labels".into(),
1581                        ty: ProcType::Any,
1582                    },
1583                ],
1584                outputs: vec![ProcOutSpec {
1585                    name: "node".into(),
1586                    ty: ProcType::Any,
1587                }],
1588                rows: Vec::new(),
1589                builtin: Some(builtin),
1590            });
1591        }
1592        #[cfg(feature = "apoc-create")]
1593        self.register(Procedure {
1594            qualified_name: vec!["apoc".into(), "create".into(), "setProperty".into()],
1595            inputs: vec![
1596                ProcArgSpec {
1597                    name: "nodes".into(),
1598                    ty: ProcType::Any,
1599                },
1600                ProcArgSpec {
1601                    name: "key".into(),
1602                    ty: ProcType::String,
1603                },
1604                ProcArgSpec {
1605                    name: "value".into(),
1606                    ty: ProcType::Any,
1607                },
1608            ],
1609            outputs: vec![ProcOutSpec {
1610                name: "node".into(),
1611                ty: ProcType::Any,
1612            }],
1613            rows: Vec::new(),
1614            builtin: Some(BuiltinProc::ApocCreateSetProperty),
1615        });
1616        #[cfg(feature = "apoc-create")]
1617        self.register(Procedure {
1618            qualified_name: vec!["apoc".into(), "create".into(), "setRelProperty".into()],
1619            inputs: vec![
1620                ProcArgSpec {
1621                    name: "rels".into(),
1622                    ty: ProcType::Any,
1623                },
1624                ProcArgSpec {
1625                    name: "key".into(),
1626                    ty: ProcType::String,
1627                },
1628                ProcArgSpec {
1629                    name: "value".into(),
1630                    ty: ProcType::Any,
1631                },
1632            ],
1633            outputs: vec![ProcOutSpec {
1634                name: "rel".into(),
1635                ty: ProcType::Any,
1636            }],
1637            rows: Vec::new(),
1638            builtin: Some(BuiltinProc::ApocCreateSetRelProperty),
1639        });
1640        #[cfg(feature = "apoc-meta")]
1641        self.register(Procedure {
1642            qualified_name: vec!["apoc".into(), "meta".into(), "schema".into()],
1643            inputs: Vec::new(),
1644            outputs: vec![ProcOutSpec {
1645                name: "value".into(),
1646                ty: ProcType::Any,
1647            }],
1648            rows: Vec::new(),
1649            builtin: Some(BuiltinProc::ApocMetaSchema),
1650        });
1651        #[cfg(feature = "apoc-refactor")]
1652        self.register(Procedure {
1653            qualified_name: vec!["apoc".into(), "refactor".into(), "setType".into()],
1654            inputs: vec![
1655                ProcArgSpec {
1656                    name: "rel".into(),
1657                    ty: ProcType::Any,
1658                },
1659                ProcArgSpec {
1660                    name: "newType".into(),
1661                    ty: ProcType::String,
1662                },
1663            ],
1664            outputs: vec![ProcOutSpec {
1665                name: "rel".into(),
1666                ty: ProcType::Any,
1667            }],
1668            rows: Vec::new(),
1669            builtin: Some(BuiltinProc::ApocRefactorSetType),
1670        });
1671        #[cfg(feature = "apoc-path")]
1672        self.register(Procedure {
1673            qualified_name: vec!["apoc".into(), "path".into(), "expand".into()],
1674            inputs: vec![
1675                ProcArgSpec {
1676                    name: "startNode".into(),
1677                    ty: ProcType::Any,
1678                },
1679                ProcArgSpec {
1680                    name: "relationshipFilter".into(),
1681                    ty: ProcType::Any,
1682                },
1683                ProcArgSpec {
1684                    name: "labelFilter".into(),
1685                    ty: ProcType::Any,
1686                },
1687                ProcArgSpec {
1688                    name: "minLevel".into(),
1689                    ty: ProcType::Any,
1690                },
1691                ProcArgSpec {
1692                    name: "maxLevel".into(),
1693                    ty: ProcType::Any,
1694                },
1695            ],
1696            outputs: vec![ProcOutSpec {
1697                name: "path".into(),
1698                ty: ProcType::Any,
1699            }],
1700            rows: Vec::new(),
1701            builtin: Some(BuiltinProc::ApocPathExpand),
1702        });
1703        #[cfg(feature = "apoc-path")]
1704        self.register(Procedure {
1705            qualified_name: vec!["apoc".into(), "path".into(), "expandConfig".into()],
1706            inputs: vec![
1707                ProcArgSpec {
1708                    name: "startNode".into(),
1709                    ty: ProcType::Any,
1710                },
1711                ProcArgSpec {
1712                    name: "config".into(),
1713                    ty: ProcType::Any,
1714                },
1715            ],
1716            outputs: vec![ProcOutSpec {
1717                name: "path".into(),
1718                ty: ProcType::Any,
1719            }],
1720            rows: Vec::new(),
1721            builtin: Some(BuiltinProc::ApocPathExpandConfig),
1722        });
1723        #[cfg(feature = "apoc-path")]
1724        self.register(Procedure {
1725            qualified_name: vec!["apoc".into(), "path".into(), "subgraphNodes".into()],
1726            inputs: vec![
1727                ProcArgSpec {
1728                    name: "startNode".into(),
1729                    ty: ProcType::Any,
1730                },
1731                ProcArgSpec {
1732                    name: "config".into(),
1733                    ty: ProcType::Any,
1734                },
1735            ],
1736            outputs: vec![ProcOutSpec {
1737                name: "node".into(),
1738                ty: ProcType::Any,
1739            }],
1740            rows: Vec::new(),
1741            builtin: Some(BuiltinProc::ApocPathSubgraphNodes),
1742        });
1743        #[cfg(feature = "apoc-path")]
1744        self.register(Procedure {
1745            qualified_name: vec!["apoc".into(), "path".into(), "subgraphAll".into()],
1746            inputs: vec![
1747                ProcArgSpec {
1748                    name: "startNode".into(),
1749                    ty: ProcType::Any,
1750                },
1751                ProcArgSpec {
1752                    name: "config".into(),
1753                    ty: ProcType::Any,
1754                },
1755            ],
1756            outputs: vec![
1757                ProcOutSpec {
1758                    name: "nodes".into(),
1759                    ty: ProcType::Any,
1760                },
1761                ProcOutSpec {
1762                    name: "relationships".into(),
1763                    ty: ProcType::Any,
1764                },
1765            ],
1766            rows: Vec::new(),
1767            builtin: Some(BuiltinProc::ApocPathSubgraphAll),
1768        });
1769        #[cfg(feature = "apoc-path")]
1770        self.register(Procedure {
1771            qualified_name: vec!["apoc".into(), "path".into(), "spanningTree".into()],
1772            inputs: vec![
1773                ProcArgSpec {
1774                    name: "startNode".into(),
1775                    ty: ProcType::Any,
1776                },
1777                ProcArgSpec {
1778                    name: "config".into(),
1779                    ty: ProcType::Any,
1780                },
1781            ],
1782            outputs: vec![ProcOutSpec {
1783                name: "path".into(),
1784                ty: ProcType::Any,
1785            }],
1786            rows: Vec::new(),
1787            builtin: Some(BuiltinProc::ApocPathSpanningTree),
1788        });
1789        #[cfg(feature = "apoc-cypher")]
1790        self.register(Procedure {
1791            qualified_name: vec!["apoc".into(), "cypher".into(), "run".into()],
1792            inputs: vec![
1793                ProcArgSpec {
1794                    name: "cypher".into(),
1795                    ty: ProcType::String,
1796                },
1797                ProcArgSpec {
1798                    name: "params".into(),
1799                    ty: ProcType::Any,
1800                },
1801            ],
1802            outputs: vec![ProcOutSpec {
1803                name: "value".into(),
1804                ty: ProcType::Any,
1805            }],
1806            rows: Vec::new(),
1807            builtin: Some(BuiltinProc::ApocCypherRun),
1808        });
1809        #[cfg(feature = "apoc-cypher")]
1810        self.register(Procedure {
1811            qualified_name: vec!["apoc".into(), "cypher".into(), "doIt".into()],
1812            inputs: vec![
1813                ProcArgSpec {
1814                    name: "cypher".into(),
1815                    ty: ProcType::String,
1816                },
1817                ProcArgSpec {
1818                    name: "params".into(),
1819                    ty: ProcType::Any,
1820                },
1821            ],
1822            outputs: vec![ProcOutSpec {
1823                name: "value".into(),
1824                ty: ProcType::Any,
1825            }],
1826            rows: Vec::new(),
1827            builtin: Some(BuiltinProc::ApocCypherDoIt),
1828        });
1829        #[cfg(feature = "apoc-load")]
1830        self.register(Procedure {
1831            qualified_name: vec!["apoc".into(), "load".into(), "json".into()],
1832            inputs: vec![
1833                ProcArgSpec {
1834                    name: "urlOrPath".into(),
1835                    ty: ProcType::String,
1836                },
1837                ProcArgSpec {
1838                    name: "path".into(),
1839                    ty: ProcType::Any,
1840                },
1841            ],
1842            outputs: vec![ProcOutSpec {
1843                name: "value".into(),
1844                ty: ProcType::Any,
1845            }],
1846            rows: Vec::new(),
1847            builtin: Some(BuiltinProc::ApocLoadJson),
1848        });
1849        #[cfg(feature = "apoc-load")]
1850        self.register(Procedure {
1851            qualified_name: vec!["apoc".into(), "load".into(), "csv".into()],
1852            inputs: vec![
1853                ProcArgSpec {
1854                    name: "urlOrPath".into(),
1855                    ty: ProcType::String,
1856                },
1857                ProcArgSpec {
1858                    name: "config".into(),
1859                    ty: ProcType::Any,
1860                },
1861            ],
1862            outputs: vec![
1863                ProcOutSpec {
1864                    name: "lineNo".into(),
1865                    ty: ProcType::Integer,
1866                },
1867                ProcOutSpec {
1868                    name: "list".into(),
1869                    ty: ProcType::Any,
1870                },
1871                ProcOutSpec {
1872                    name: "map".into(),
1873                    ty: ProcType::Any,
1874                },
1875            ],
1876            rows: Vec::new(),
1877            builtin: Some(BuiltinProc::ApocLoadCsv),
1878        });
1879        #[cfg(feature = "apoc-export")]
1880        {
1881            let all_inputs = || {
1882                vec![
1883                    ProcArgSpec {
1884                        name: "file".into(),
1885                        ty: ProcType::String,
1886                    },
1887                    ProcArgSpec {
1888                        name: "config".into(),
1889                        ty: ProcType::Any,
1890                    },
1891                ]
1892            };
1893            let query_inputs = || {
1894                vec![
1895                    ProcArgSpec {
1896                        name: "query".into(),
1897                        ty: ProcType::String,
1898                    },
1899                    ProcArgSpec {
1900                        name: "file".into(),
1901                        ty: ProcType::String,
1902                    },
1903                    ProcArgSpec {
1904                        name: "config".into(),
1905                        ty: ProcType::Any,
1906                    },
1907                ]
1908            };
1909            let stats_outputs = || {
1910                vec![
1911                    ProcOutSpec {
1912                        name: "file".into(),
1913                        ty: ProcType::String,
1914                    },
1915                    ProcOutSpec {
1916                        name: "source".into(),
1917                        ty: ProcType::String,
1918                    },
1919                    ProcOutSpec {
1920                        name: "format".into(),
1921                        ty: ProcType::String,
1922                    },
1923                    ProcOutSpec {
1924                        name: "nodes".into(),
1925                        ty: ProcType::Integer,
1926                    },
1927                    ProcOutSpec {
1928                        name: "relationships".into(),
1929                        ty: ProcType::Integer,
1930                    },
1931                    ProcOutSpec {
1932                        name: "properties".into(),
1933                        ty: ProcType::Integer,
1934                    },
1935                    ProcOutSpec {
1936                        name: "rows".into(),
1937                        ty: ProcType::Integer,
1938                    },
1939                    ProcOutSpec {
1940                        name: "time".into(),
1941                        ty: ProcType::Integer,
1942                    },
1943                ]
1944            };
1945            for (ns, fn_name, is_query, variant) in [
1946                ("csv", "all", false, BuiltinProc::ApocExportCsvAll),
1947                ("csv", "query", true, BuiltinProc::ApocExportCsvQuery),
1948                ("json", "all", false, BuiltinProc::ApocExportJsonAll),
1949                ("json", "query", true, BuiltinProc::ApocExportJsonQuery),
1950                ("cypher", "all", false, BuiltinProc::ApocExportCypherAll),
1951                ("cypher", "query", true, BuiltinProc::ApocExportCypherQuery),
1952            ] {
1953                self.register(Procedure {
1954                    qualified_name: vec!["apoc".into(), "export".into(), ns.into(), fn_name.into()],
1955                    inputs: if is_query {
1956                        query_inputs()
1957                    } else {
1958                        all_inputs()
1959                    },
1960                    outputs: stats_outputs(),
1961                    rows: Vec::new(),
1962                    builtin: Some(variant),
1963                });
1964            }
1965        }
1966        #[cfg(feature = "apoc-trigger")]
1967        {
1968            self.register(Procedure {
1969                qualified_name: vec!["apoc".into(), "trigger".into(), "install".into()],
1970                inputs: vec![
1971                    ProcArgSpec {
1972                        name: "databaseName".into(),
1973                        ty: ProcType::String,
1974                    },
1975                    ProcArgSpec {
1976                        name: "name".into(),
1977                        ty: ProcType::String,
1978                    },
1979                    ProcArgSpec {
1980                        name: "statement".into(),
1981                        ty: ProcType::String,
1982                    },
1983                    ProcArgSpec {
1984                        name: "selector".into(),
1985                        ty: ProcType::Any,
1986                    },
1987                    ProcArgSpec {
1988                        name: "config".into(),
1989                        ty: ProcType::Any,
1990                    },
1991                ],
1992                outputs: vec![
1993                    ProcOutSpec {
1994                        name: "name".into(),
1995                        ty: ProcType::String,
1996                    },
1997                    ProcOutSpec {
1998                        name: "query".into(),
1999                        ty: ProcType::String,
2000                    },
2001                    ProcOutSpec {
2002                        name: "installed".into(),
2003                        ty: ProcType::Boolean,
2004                    },
2005                    ProcOutSpec {
2006                        name: "previous".into(),
2007                        ty: ProcType::Any,
2008                    },
2009                ],
2010                rows: Vec::new(),
2011                builtin: Some(BuiltinProc::ApocTriggerInstall),
2012            });
2013            self.register(Procedure {
2014                qualified_name: vec!["apoc".into(), "trigger".into(), "drop".into()],
2015                inputs: vec![
2016                    ProcArgSpec {
2017                        name: "databaseName".into(),
2018                        ty: ProcType::String,
2019                    },
2020                    ProcArgSpec {
2021                        name: "name".into(),
2022                        ty: ProcType::String,
2023                    },
2024                ],
2025                outputs: vec![
2026                    ProcOutSpec {
2027                        name: "name".into(),
2028                        ty: ProcType::String,
2029                    },
2030                    ProcOutSpec {
2031                        name: "removed".into(),
2032                        ty: ProcType::Boolean,
2033                    },
2034                ],
2035                rows: Vec::new(),
2036                builtin: Some(BuiltinProc::ApocTriggerDrop),
2037            });
2038            self.register(Procedure {
2039                qualified_name: vec!["apoc".into(), "trigger".into(), "list".into()],
2040                inputs: Vec::new(),
2041                outputs: vec![
2042                    ProcOutSpec {
2043                        name: "name".into(),
2044                        ty: ProcType::String,
2045                    },
2046                    ProcOutSpec {
2047                        name: "query".into(),
2048                        ty: ProcType::String,
2049                    },
2050                    ProcOutSpec {
2051                        name: "phase".into(),
2052                        ty: ProcType::String,
2053                    },
2054                    ProcOutSpec {
2055                        name: "installed_at".into(),
2056                        ty: ProcType::Integer,
2057                    },
2058                    ProcOutSpec {
2059                        name: "paused".into(),
2060                        ty: ProcType::Boolean,
2061                    },
2062                ],
2063                rows: Vec::new(),
2064                builtin: Some(BuiltinProc::ApocTriggerList),
2065            });
2066            for (fn_name, variant) in [
2067                ("start", BuiltinProc::ApocTriggerStart),
2068                ("stop", BuiltinProc::ApocTriggerStop),
2069            ] {
2070                self.register(Procedure {
2071                    qualified_name: vec!["apoc".into(), "trigger".into(), fn_name.into()],
2072                    inputs: vec![
2073                        ProcArgSpec {
2074                            name: "databaseName".into(),
2075                            ty: ProcType::String,
2076                        },
2077                        ProcArgSpec {
2078                            name: "name".into(),
2079                            ty: ProcType::String,
2080                        },
2081                    ],
2082                    outputs: vec![
2083                        ProcOutSpec {
2084                            name: "name".into(),
2085                            ty: ProcType::String,
2086                        },
2087                        ProcOutSpec {
2088                            name: "paused".into(),
2089                            ty: ProcType::Boolean,
2090                        },
2091                    ],
2092                    rows: Vec::new(),
2093                    builtin: Some(variant),
2094                });
2095            }
2096        }
2097    }
2098}