Skip to main content

uni_query/procedures_plugin/
create.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! `uni.create.vNode` / `uni.create.vEdge` — ephemeral (transient,
5//! in-query) graph entities. The APOC `apoc.create.vNode` /
6//! `apoc.create.vRelationship` analogues from proposal §4.13.1.
7//!
8//! Ephemeral identities are minted from
9//! `QueryProcedureHost::allocate_transient_id()` and wrapped in
10//! `Vid::ephemeral` / `Eid::ephemeral` so their high bit (M5g
11//! `EPHEMERAL_BIT`) is set. Storage write entry points
12//! (`execute_set_items_locked`, `execute_delete_vertex`,
13//! `execute_delete_edge_from_map`) refuse any id with that bit set,
14//! surfacing `UniError::EphemeralWriteAttempt`.
15//!
16//! **Yield shape (M5g):**
17//!
18//! * `uni.create.vNode` declares a single canonical `vid` Int64 field
19//!   on its signature, tagged with `_yield_kind = node_vid_source`.
20//!   That metadata tag opts the procedure into the planner's
21//!   node-shaped YIELD expansion: when the caller writes
22//!   `YIELD node`, the planner rewrites the column projection to the
23//!   canonical Node tuple (`<n>._vid`, `<n>`, `<n>._labels`,
24//!   `<n>.<prop>` ...). This is the same surface area as
25//!   `uni.vector.query` and friends — downstream Cypher `WITH node ...
26//!   node.foo` access works out of the box because the property
27//!   columns are physically present on the row.
28//! * `uni.create.vEdge` declares a single `edge` field whose Arrow
29//!   type is `Struct(_eid, _type_name, _src, _dst, properties)` — the
30//!   canonical edge-struct shape used by path materialization
31//!   (`df_graph::common::edge_struct_fields`). Unit tests assert the
32//!   struct directly; downstream edge-property access is out of
33//!   scope for M5g (the round-trip target was `node.foo`, not
34//!   `rel.foo`).
35
36use std::collections::HashMap;
37use std::sync::Arc;
38use std::sync::OnceLock;
39
40use arrow_array::builder::{ListBuilder, StringBuilder, UInt64Builder};
41use arrow_array::{ArrayRef, Int64Array, LargeBinaryArray, RecordBatch, StringArray, StructArray};
42use arrow_schema::{DataType, Field, Schema, SchemaRef};
43use datafusion::execution::SendableRecordBatchStream;
44use datafusion::logical_expr::ColumnarValue;
45use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
46use datafusion::scalar::ScalarValue;
47use uni_common::Properties;
48use uni_common::core::id::{Eid, Vid};
49use uni_plugin::traits::procedure::{
50    NamedArgType, ProcedureContext, ProcedureMode, ProcedurePlugin, ProcedureSignature,
51};
52use uni_plugin::traits::scalar::ArgType;
53use uni_plugin::{FnError, PluginError, PluginRegistrar, QName, SideEffects};
54
55use crate::query::df_graph::common::edge_struct_fields;
56use crate::query::df_graph::procedure_call::map_yield_to_canonical;
57use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
58use crate::query::executor::procedure_host::QueryProcedureHost;
59
60// Rust guideline compliant
61
62// ---------------------------------------------------------------------------
63// Shared helpers
64// ---------------------------------------------------------------------------
65
66fn require_host<'a>(ctx: &ProcedureContext<'a>) -> Result<&'a QueryProcedureHost, FnError> {
67    ctx.host
68        .and_then(|h| h.as_any().downcast_ref::<QueryProcedureHost>())
69        .ok_or_else(|| FnError::new(0x701, "uni.create.*: requires QueryProcedureHost"))
70}
71
72/// Decode a positional arg as JSON (LargeBinary-encoded by the
73/// dispatcher) or string/scalar fallback. Mirrors
74/// `graph.rs::arg_to_json`.
75fn arg_to_json(cv: &ColumnarValue) -> serde_json::Value {
76    match cv {
77        ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(b)))
78        | ColumnarValue::Scalar(ScalarValue::Binary(Some(b))) => {
79            serde_json::from_slice::<serde_json::Value>(b).unwrap_or(serde_json::Value::Null)
80        }
81        ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)))
82        | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => {
83            serde_json::Value::String(s.clone())
84        }
85        ColumnarValue::Scalar(ScalarValue::Boolean(Some(b))) => serde_json::Value::Bool(*b),
86        ColumnarValue::Scalar(ScalarValue::Int64(Some(i))) => {
87            serde_json::Value::Number((*i).into())
88        }
89        _ => serde_json::Value::Null,
90    }
91}
92
93fn arg_as_i64(cv: &ColumnarValue) -> Option<i64> {
94    match cv {
95        ColumnarValue::Scalar(ScalarValue::Int64(Some(i))) => Some(*i),
96        ColumnarValue::Scalar(ScalarValue::Int32(Some(i))) => Some(i64::from(*i)),
97        ColumnarValue::Scalar(ScalarValue::UInt64(Some(u))) => i64::try_from(*u).ok(),
98        _ => None,
99    }
100}
101
102fn arg_as_string(cv: &ColumnarValue) -> Option<String> {
103    match cv {
104        ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)))
105        | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => Some(s.clone()),
106        _ => None,
107    }
108}
109
110fn labels_from_json(jv: &serde_json::Value) -> Vec<String> {
111    match jv {
112        serde_json::Value::Array(arr) => arr
113            .iter()
114            .filter_map(|v| v.as_str().map(str::to_owned))
115            .collect(),
116        serde_json::Value::String(s) => vec![s.clone()],
117        _ => Vec::new(),
118    }
119}
120
121/// Convert a JSON object into a `Properties` map (HashMap<String, Value>).
122fn properties_from_json(jv: &serde_json::Value) -> Properties {
123    match jv {
124        serde_json::Value::Object(obj) => obj
125            .iter()
126            .map(|(k, v)| (k.clone(), uni_common::Value::from(v.clone())))
127            .collect(),
128        _ => Properties::new(),
129    }
130}
131
132fn one_batch_stream(schema: SchemaRef, batch: RecordBatch) -> SendableRecordBatchStream {
133    let stream =
134        futures::stream::once(async move { Ok::<_, datafusion::error::DataFusionError>(batch) });
135    Box::pin(RecordBatchStreamAdapter::new(schema, stream))
136}
137
138// ---------------------------------------------------------------------------
139// uni.create.vNode(labels, props) — typed Node yield (M5g)
140// ---------------------------------------------------------------------------
141
142/// Build the canonical `vid` field with `_yield_kind = node_vid_source`
143/// metadata. This is the seam that opts the procedure into the
144/// planner's node-shaped YIELD expansion (see
145/// `procedure_call::expand_node_yield_fields`).
146fn vid_node_yield_field() -> Field {
147    let mut md = HashMap::new();
148    md.insert("_yield_kind".to_owned(), "node_vid_source".to_owned());
149    Field::new("vid", DataType::Int64, false).with_metadata(md)
150}
151
152#[derive(Debug)]
153pub struct VNodeProcedure;
154
155impl VNodeProcedure {
156    fn signature_static() -> &'static ProcedureSignature {
157        static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
158        SIG.get_or_init(|| ProcedureSignature {
159            args: vec![
160                NamedArgType {
161                    name: smol_str::SmolStr::new("labels"),
162                    ty: ArgType::Primitive(DataType::LargeBinary),
163                    default: Some(ScalarValue::LargeBinary(Some(b"[]".to_vec()))),
164                    doc: "List of label names (JSON-encoded array).".to_owned(),
165                },
166                NamedArgType {
167                    name: smol_str::SmolStr::new("props"),
168                    ty: ArgType::Primitive(DataType::LargeBinary),
169                    default: Some(ScalarValue::LargeBinary(Some(b"{}".to_vec()))),
170                    doc: "Property map (JSON-encoded object).".to_owned(),
171                },
172            ],
173            yields: vec![vid_node_yield_field()],
174            mode: ProcedureMode::Read,
175            side_effects: SideEffects::ReadOnly,
176            retry_contract: None,
177            batch_input: None,
178            docs: "uni.create.vNode(labels, props) — mint a transient, \
179                   in-query ephemeral node. Yields a single canonical \
180                   Node column; when the caller writes `YIELD node` the \
181                   planner expands it to the standard \
182                   `<n>._vid + <n> + <n>._labels + <n>.<prop>` tuple. \
183                   The returned vid has the `EPHEMERAL_BIT` (high bit) \
184                   set; writes against it fail with \
185                   `EphemeralWriteAttempt`. Not visible to subsequent \
186                   MATCH."
187                .to_owned(),
188        })
189    }
190}
191
192impl ProcedurePlugin for VNodeProcedure {
193    fn signature(&self) -> &ProcedureSignature {
194        Self::signature_static()
195    }
196
197    fn invoke(
198        &self,
199        ctx: ProcedureContext<'_>,
200        args: &[ColumnarValue],
201    ) -> Result<SendableRecordBatchStream, FnError> {
202        let host = require_host(&ctx)?;
203        let labels_json = args
204            .first()
205            .map(arg_to_json)
206            .unwrap_or(serde_json::Value::Null);
207        let props_json = args
208            .get(1)
209            .map(arg_to_json)
210            .unwrap_or(serde_json::Value::Null);
211        let labels = labels_from_json(&labels_json);
212        let props = properties_from_json(&props_json);
213
214        let vid = Vid::ephemeral(host.allocate_transient_id());
215
216        // Decide output shape: planner-driven (host has yield_items) or
217        // fallback (signature schema: single `vid` Int64).
218        let host_yields = host.yield_items();
219        if host_yields.is_empty() {
220            let schema: SchemaRef = Arc::new(Schema::new(vec![vid_node_yield_field()]));
221            #[allow(clippy::cast_possible_wrap)]
222            let cols: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![vid.as_u64() as i64]))];
223            let batch = RecordBatch::try_new(Arc::clone(&schema), cols)
224                .map_err(|e| FnError::new(0x830, format!("vNode RecordBatch build: {e}")))?;
225            return Ok(one_batch_stream(schema, batch));
226        }
227
228        // Planner-driven path: emit columns matching expected_schema by
229        // dispatching each yield to its canonical column shape.
230        let expected_schema = host.expected_schema().cloned().ok_or_else(|| {
231            FnError::new(0x830, "vNode: host yield_items set without expected_schema")
232        })?;
233        let target_properties = host.target_properties();
234
235        let cols = build_vnode_columns(
236            host_yields,
237            target_properties,
238            &expected_schema,
239            vid,
240            &labels,
241            &props,
242        )?;
243        let batch = RecordBatch::try_new(Arc::clone(&expected_schema), cols)
244            .map_err(|e| FnError::new(0x830, format!("vNode RecordBatch build: {e}")))?;
245        Ok(one_batch_stream(expected_schema, batch))
246    }
247}
248
249/// Build the per-yield columns for `uni.create.vNode`, matching the
250/// planner-supplied expected schema. The expected schema's column names
251/// drive property resolution (a column named `<n>.foo` pulls `foo`
252/// from the props map).
253fn build_vnode_columns(
254    yield_items: &[(String, Option<String>)],
255    target_properties: &HashMap<String, Vec<String>>,
256    expected_schema: &SchemaRef,
257    vid: Vid,
258    labels: &[String],
259    props: &Properties,
260) -> Result<Vec<ArrayRef>, FnError> {
261    let mut cols: Vec<ArrayRef> = Vec::with_capacity(expected_schema.fields().len());
262    let vids = [vid];
263    let mut props_map: HashMap<Vid, Properties> = HashMap::new();
264    props_map.insert(vid, props.clone());
265
266    for (yield_name, alias) in yield_items {
267        let output_name = alias.as_ref().unwrap_or(yield_name);
268        let canonical = map_yield_to_canonical(yield_name);
269
270        match canonical {
271            "node" => {
272                // `<n>._vid` UInt64
273                let mut vid_builder = UInt64Builder::with_capacity(1);
274                vid_builder.append_value(vid.as_u64());
275                cols.push(Arc::new(vid_builder.finish()));
276
277                // `<n>` Utf8 (variable column — vid string)
278                let mut var_builder = StringBuilder::new();
279                var_builder.append_value(vid.to_string());
280                cols.push(Arc::new(var_builder.finish()));
281
282                // `<n>._labels` List<Utf8>
283                let mut labels_builder = ListBuilder::new(StringBuilder::new());
284                for l in labels {
285                    labels_builder.values().append_value(l);
286                }
287                labels_builder.append(true);
288                cols.push(Arc::new(labels_builder.finish()));
289
290                // `<n>.<prop>` columns from target_properties.
291                if let Some(prop_names) = target_properties.get(output_name) {
292                    for prop_name in prop_names {
293                        let col_name = format!("{}.{}", output_name, prop_name);
294                        // Use the expected_schema's declared type so we
295                        // emit the exact Arrow type the planner expects.
296                        let data_type = expected_schema
297                            .field_with_name(&col_name)
298                            .map(|f| f.data_type().clone())
299                            .unwrap_or_else(|_| resolve_property_type(prop_name, None));
300                        let col =
301                            build_property_column_static(&vids, &props_map, prop_name, &data_type)
302                                .map_err(|e| {
303                                    FnError::new(
304                                        0x830,
305                                        format!("vNode property column `{prop_name}`: {e}"),
306                                    )
307                                })?;
308                        cols.push(col);
309                    }
310                }
311            }
312            "vid" => {
313                #[allow(clippy::cast_possible_wrap)]
314                let arr = Int64Array::from(vec![vid.as_u64() as i64]);
315                cols.push(Arc::new(arr));
316            }
317            other => {
318                return Err(FnError::new(
319                    0x830,
320                    format!("vNode: unexpected canonical yield `{other}` for `{yield_name}`"),
321                ));
322            }
323        }
324    }
325
326    Ok(cols)
327}
328
329// ---------------------------------------------------------------------------
330// uni.create.vEdge(src, type, props, dst) — typed Edge yield (M5g)
331// ---------------------------------------------------------------------------
332
333/// Build the `edge` Struct field — canonical edge-struct shape
334/// (`_eid`, `_type_name`, `_src`, `_dst`, `properties`), matching
335/// `df_graph::common::edge_struct_fields()` used by path
336/// materialization.
337fn edge_yield_field() -> Field {
338    Field::new("edge", DataType::Struct(edge_struct_fields()), false)
339}
340
341#[derive(Debug)]
342pub struct VEdgeProcedure;
343
344impl VEdgeProcedure {
345    fn signature_static() -> &'static ProcedureSignature {
346        static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
347        SIG.get_or_init(|| ProcedureSignature {
348            args: vec![
349                NamedArgType {
350                    name: smol_str::SmolStr::new("src"),
351                    ty: ArgType::Primitive(DataType::Int64),
352                    default: None,
353                    doc: "Source vid (stored or ephemeral).".to_owned(),
354                },
355                NamedArgType {
356                    name: smol_str::SmolStr::new("type"),
357                    ty: ArgType::Primitive(DataType::Utf8),
358                    default: None,
359                    doc: "Edge type name.".to_owned(),
360                },
361                NamedArgType {
362                    name: smol_str::SmolStr::new("props"),
363                    ty: ArgType::Primitive(DataType::LargeBinary),
364                    default: Some(ScalarValue::LargeBinary(Some(b"{}".to_vec()))),
365                    doc: "Property map (JSON-encoded object).".to_owned(),
366                },
367                NamedArgType {
368                    name: smol_str::SmolStr::new("dst"),
369                    ty: ArgType::Primitive(DataType::Int64),
370                    default: None,
371                    doc: "Destination vid (stored or ephemeral).".to_owned(),
372                },
373            ],
374            yields: vec![edge_yield_field()],
375            mode: ProcedureMode::Read,
376            side_effects: SideEffects::ReadOnly,
377            retry_contract: None,
378            batch_input: None,
379            docs: "uni.create.vEdge(src, type, props, dst) — mint a \
380                   transient, in-query ephemeral edge between two \
381                   (stored or ephemeral) vids. Yields a single \
382                   canonical Edge struct column. The returned `eid` has \
383                   the `EPHEMERAL_BIT` set; writes against it fail \
384                   with `EphemeralWriteAttempt`."
385                .to_owned(),
386        })
387    }
388}
389
390impl ProcedurePlugin for VEdgeProcedure {
391    fn signature(&self) -> &ProcedureSignature {
392        Self::signature_static()
393    }
394
395    fn invoke(
396        &self,
397        ctx: ProcedureContext<'_>,
398        args: &[ColumnarValue],
399    ) -> Result<SendableRecordBatchStream, FnError> {
400        let host = require_host(&ctx)?;
401        let src = args
402            .first()
403            .and_then(arg_as_i64)
404            .ok_or_else(|| FnError::new(0x824, "uni.create.vEdge: src (Int) required"))?;
405        let edge_type = args
406            .get(1)
407            .and_then(arg_as_string)
408            .ok_or_else(|| FnError::new(0x824, "uni.create.vEdge: type (String) required"))?;
409        let props_json = args
410            .get(2)
411            .map(arg_to_json)
412            .unwrap_or(serde_json::Value::Null);
413        let dst = args
414            .get(3)
415            .and_then(arg_as_i64)
416            .ok_or_else(|| FnError::new(0x824, "uni.create.vEdge: dst (Int) required"))?;
417        let props_value = uni_common::Value::Map(properties_from_json(&props_json));
418        let props_bytes = uni_common::cypher_value_codec::encode(&props_value);
419
420        let eid = Eid::ephemeral(host.allocate_transient_id());
421
422        // Build the canonical edge Struct column.
423        #[allow(clippy::cast_sign_loss)]
424        let src_u64 = src as u64;
425        #[allow(clippy::cast_sign_loss)]
426        let dst_u64 = dst as u64;
427
428        let edge_struct =
429            build_edge_struct_array(eid.as_u64(), &edge_type, src_u64, dst_u64, &props_bytes)
430                .map_err(|e| FnError::new(0x830, format!("vEdge struct build: {e}")))?;
431
432        let schema: SchemaRef = Arc::new(Schema::new(vec![edge_yield_field()]));
433        let cols: Vec<ArrayRef> = vec![Arc::new(edge_struct)];
434        let batch = RecordBatch::try_new(Arc::clone(&schema), cols)
435            .map_err(|e| FnError::new(0x830, format!("vEdge RecordBatch build: {e}")))?;
436        Ok(one_batch_stream(schema, batch))
437    }
438}
439
440/// Build a single-row StructArray matching `edge_struct_fields()`:
441/// `(_eid, _type_name, _src, _dst, properties)`.
442fn build_edge_struct_array(
443    eid: u64,
444    type_name: &str,
445    src: u64,
446    dst: u64,
447    props_bytes: &[u8],
448) -> Result<StructArray, arrow_schema::ArrowError> {
449    let fields = edge_struct_fields();
450
451    let eid_arr: ArrayRef = Arc::new(arrow_array::UInt64Array::from(vec![eid]));
452    let type_arr: ArrayRef = Arc::new(StringArray::from(vec![type_name.to_owned()]));
453    let src_arr: ArrayRef = Arc::new(arrow_array::UInt64Array::from(vec![src]));
454    let dst_arr: ArrayRef = Arc::new(arrow_array::UInt64Array::from(vec![dst]));
455    let props_arr: ArrayRef = Arc::new(LargeBinaryArray::from(vec![Some(props_bytes)]));
456
457    StructArray::try_new(
458        fields,
459        vec![eid_arr, type_arr, src_arr, dst_arr, props_arr],
460        None,
461    )
462}
463
464// ---------------------------------------------------------------------------
465// Registration
466// ---------------------------------------------------------------------------
467
468/// Register every `uni.create.v*` procedure into `r`.
469///
470/// # Errors
471///
472/// Propagates [`PluginError::DuplicateRegistration`] if a qname is taken.
473pub fn register_into(r: &mut PluginRegistrar<'_>) -> Result<(), PluginError> {
474    r.procedure(
475        QName::new("uni", "create.vNode"),
476        VNodeProcedure::signature_static().clone(),
477        Arc::new(VNodeProcedure),
478    )?;
479    r.procedure(
480        QName::new("uni", "create.vEdge"),
481        VEdgeProcedure::signature_static().clone(),
482        Arc::new(VEdgeProcedure),
483    )?;
484    Ok(())
485}