1use std::collections::HashMap;
37use std::sync::Arc;
38use std::sync::OnceLock;
39
40use arrow_array::builder::{ListBuilder, StringBuilder, UInt64Builder};
41use arrow_array::{ArrayRef, Int64Array, LargeBinaryArray, RecordBatch, StringArray, StructArray};
42use arrow_schema::{DataType, Field, Schema, SchemaRef};
43use datafusion::execution::SendableRecordBatchStream;
44use datafusion::logical_expr::ColumnarValue;
45use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
46use datafusion::scalar::ScalarValue;
47use uni_common::Properties;
48use uni_common::core::id::{Eid, Vid};
49use uni_plugin::traits::procedure::{
50 NamedArgType, ProcedureContext, ProcedureMode, ProcedurePlugin, ProcedureSignature,
51};
52use uni_plugin::traits::scalar::ArgType;
53use uni_plugin::{FnError, PluginError, PluginRegistrar, QName, SideEffects};
54
55use crate::query::df_graph::common::edge_struct_fields;
56use crate::query::df_graph::procedure_call::map_yield_to_canonical;
57use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
58use crate::query::executor::procedure_host::QueryProcedureHost;
59
60fn require_host<'a>(ctx: &ProcedureContext<'a>) -> Result<&'a QueryProcedureHost, FnError> {
67 ctx.host
68 .and_then(|h| h.as_any().downcast_ref::<QueryProcedureHost>())
69 .ok_or_else(|| FnError::new(0x701, "uni.create.*: requires QueryProcedureHost"))
70}
71
72fn arg_to_json(cv: &ColumnarValue) -> serde_json::Value {
76 match cv {
77 ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(b)))
78 | ColumnarValue::Scalar(ScalarValue::Binary(Some(b))) => {
79 serde_json::from_slice::<serde_json::Value>(b).unwrap_or(serde_json::Value::Null)
80 }
81 ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)))
82 | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => {
83 serde_json::Value::String(s.clone())
84 }
85 ColumnarValue::Scalar(ScalarValue::Boolean(Some(b))) => serde_json::Value::Bool(*b),
86 ColumnarValue::Scalar(ScalarValue::Int64(Some(i))) => {
87 serde_json::Value::Number((*i).into())
88 }
89 _ => serde_json::Value::Null,
90 }
91}
92
93fn arg_as_i64(cv: &ColumnarValue) -> Option<i64> {
94 match cv {
95 ColumnarValue::Scalar(ScalarValue::Int64(Some(i))) => Some(*i),
96 ColumnarValue::Scalar(ScalarValue::Int32(Some(i))) => Some(i64::from(*i)),
97 ColumnarValue::Scalar(ScalarValue::UInt64(Some(u))) => i64::try_from(*u).ok(),
98 _ => None,
99 }
100}
101
102fn arg_as_string(cv: &ColumnarValue) -> Option<String> {
103 match cv {
104 ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)))
105 | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => Some(s.clone()),
106 _ => None,
107 }
108}
109
110fn labels_from_json(jv: &serde_json::Value) -> Vec<String> {
111 match jv {
112 serde_json::Value::Array(arr) => arr
113 .iter()
114 .filter_map(|v| v.as_str().map(str::to_owned))
115 .collect(),
116 serde_json::Value::String(s) => vec![s.clone()],
117 _ => Vec::new(),
118 }
119}
120
121fn properties_from_json(jv: &serde_json::Value) -> Properties {
123 match jv {
124 serde_json::Value::Object(obj) => obj
125 .iter()
126 .map(|(k, v)| (k.clone(), uni_common::Value::from(v.clone())))
127 .collect(),
128 _ => Properties::new(),
129 }
130}
131
132fn one_batch_stream(schema: SchemaRef, batch: RecordBatch) -> SendableRecordBatchStream {
133 let stream =
134 futures::stream::once(async move { Ok::<_, datafusion::error::DataFusionError>(batch) });
135 Box::pin(RecordBatchStreamAdapter::new(schema, stream))
136}
137
138fn vid_node_yield_field() -> Field {
147 let mut md = HashMap::new();
148 md.insert("_yield_kind".to_owned(), "node_vid_source".to_owned());
149 Field::new("vid", DataType::Int64, false).with_metadata(md)
150}
151
/// Procedure plugin registered under the name `uni.create.vNode`.
///
/// Stateless unit type: its signature and invocation logic live in the
/// accompanying `impl` blocks.
#[derive(Debug)]
pub struct VNodeProcedure;
154
155impl VNodeProcedure {
156 fn signature_static() -> &'static ProcedureSignature {
157 static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
158 SIG.get_or_init(|| ProcedureSignature {
159 args: vec![
160 NamedArgType {
161 name: smol_str::SmolStr::new("labels"),
162 ty: ArgType::Primitive(DataType::LargeBinary),
163 default: Some(ScalarValue::LargeBinary(Some(b"[]".to_vec()))),
164 doc: "List of label names (JSON-encoded array).".to_owned(),
165 },
166 NamedArgType {
167 name: smol_str::SmolStr::new("props"),
168 ty: ArgType::Primitive(DataType::LargeBinary),
169 default: Some(ScalarValue::LargeBinary(Some(b"{}".to_vec()))),
170 doc: "Property map (JSON-encoded object).".to_owned(),
171 },
172 ],
173 yields: vec![vid_node_yield_field()],
174 mode: ProcedureMode::Read,
175 side_effects: SideEffects::ReadOnly,
176 retry_contract: None,
177 batch_input: None,
178 docs: "uni.create.vNode(labels, props) — mint a transient, \
179 in-query ephemeral node. Yields a single canonical \
180 Node column; when the caller writes `YIELD node` the \
181 planner expands it to the standard \
182 `<n>._vid + <n> + <n>._labels + <n>.<prop>` tuple. \
183 The returned vid has the `EPHEMERAL_BIT` (high bit) \
184 set; writes against it fail with \
185 `EphemeralWriteAttempt`. Not visible to subsequent \
186 MATCH."
187 .to_owned(),
188 })
189 }
190}
191
192impl ProcedurePlugin for VNodeProcedure {
193 fn signature(&self) -> &ProcedureSignature {
194 Self::signature_static()
195 }
196
197 fn invoke(
198 &self,
199 ctx: ProcedureContext<'_>,
200 args: &[ColumnarValue],
201 ) -> Result<SendableRecordBatchStream, FnError> {
202 let host = require_host(&ctx)?;
203 let labels_json = args
204 .first()
205 .map(arg_to_json)
206 .unwrap_or(serde_json::Value::Null);
207 let props_json = args
208 .get(1)
209 .map(arg_to_json)
210 .unwrap_or(serde_json::Value::Null);
211 let labels = labels_from_json(&labels_json);
212 let props = properties_from_json(&props_json);
213
214 let vid = Vid::ephemeral(host.allocate_transient_id());
215
216 let host_yields = host.yield_items();
219 if host_yields.is_empty() {
220 let schema: SchemaRef = Arc::new(Schema::new(vec![vid_node_yield_field()]));
221 #[allow(clippy::cast_possible_wrap)]
222 let cols: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![vid.as_u64() as i64]))];
223 let batch = RecordBatch::try_new(Arc::clone(&schema), cols)
224 .map_err(|e| FnError::new(0x830, format!("vNode RecordBatch build: {e}")))?;
225 return Ok(one_batch_stream(schema, batch));
226 }
227
228 let expected_schema = host.expected_schema().cloned().ok_or_else(|| {
231 FnError::new(0x830, "vNode: host yield_items set without expected_schema")
232 })?;
233 let target_properties = host.target_properties();
234
235 let cols = build_vnode_columns(
236 host_yields,
237 target_properties,
238 &expected_schema,
239 vid,
240 &labels,
241 &props,
242 )?;
243 let batch = RecordBatch::try_new(Arc::clone(&expected_schema), cols)
244 .map_err(|e| FnError::new(0x830, format!("vNode RecordBatch build: {e}")))?;
245 Ok(one_batch_stream(expected_schema, batch))
246 }
247}
248
249fn build_vnode_columns(
254 yield_items: &[(String, Option<String>)],
255 target_properties: &HashMap<String, Vec<String>>,
256 expected_schema: &SchemaRef,
257 vid: Vid,
258 labels: &[String],
259 props: &Properties,
260) -> Result<Vec<ArrayRef>, FnError> {
261 let mut cols: Vec<ArrayRef> = Vec::with_capacity(expected_schema.fields().len());
262 let vids = [vid];
263 let mut props_map: HashMap<Vid, Properties> = HashMap::new();
264 props_map.insert(vid, props.clone());
265
266 for (yield_name, alias) in yield_items {
267 let output_name = alias.as_ref().unwrap_or(yield_name);
268 let canonical = map_yield_to_canonical(yield_name);
269
270 match canonical {
271 "node" => {
272 let mut vid_builder = UInt64Builder::with_capacity(1);
274 vid_builder.append_value(vid.as_u64());
275 cols.push(Arc::new(vid_builder.finish()));
276
277 let mut var_builder = StringBuilder::new();
279 var_builder.append_value(vid.to_string());
280 cols.push(Arc::new(var_builder.finish()));
281
282 let mut labels_builder = ListBuilder::new(StringBuilder::new());
284 for l in labels {
285 labels_builder.values().append_value(l);
286 }
287 labels_builder.append(true);
288 cols.push(Arc::new(labels_builder.finish()));
289
290 if let Some(prop_names) = target_properties.get(output_name) {
292 for prop_name in prop_names {
293 let col_name = format!("{}.{}", output_name, prop_name);
294 let data_type = expected_schema
297 .field_with_name(&col_name)
298 .map(|f| f.data_type().clone())
299 .unwrap_or_else(|_| resolve_property_type(prop_name, None));
300 let col =
301 build_property_column_static(&vids, &props_map, prop_name, &data_type)
302 .map_err(|e| {
303 FnError::new(
304 0x830,
305 format!("vNode property column `{prop_name}`: {e}"),
306 )
307 })?;
308 cols.push(col);
309 }
310 }
311 }
312 "vid" => {
313 #[allow(clippy::cast_possible_wrap)]
314 let arr = Int64Array::from(vec![vid.as_u64() as i64]);
315 cols.push(Arc::new(arr));
316 }
317 other => {
318 return Err(FnError::new(
319 0x830,
320 format!("vNode: unexpected canonical yield `{other}` for `{yield_name}`"),
321 ));
322 }
323 }
324 }
325
326 Ok(cols)
327}
328
329fn edge_yield_field() -> Field {
338 Field::new("edge", DataType::Struct(edge_struct_fields()), false)
339}
340
/// Procedure plugin registered under the name `uni.create.vEdge`.
///
/// Stateless unit type: its signature and invocation logic live in the
/// accompanying `impl` blocks.
#[derive(Debug)]
pub struct VEdgeProcedure;
343
344impl VEdgeProcedure {
345 fn signature_static() -> &'static ProcedureSignature {
346 static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
347 SIG.get_or_init(|| ProcedureSignature {
348 args: vec![
349 NamedArgType {
350 name: smol_str::SmolStr::new("src"),
351 ty: ArgType::Primitive(DataType::Int64),
352 default: None,
353 doc: "Source vid (stored or ephemeral).".to_owned(),
354 },
355 NamedArgType {
356 name: smol_str::SmolStr::new("type"),
357 ty: ArgType::Primitive(DataType::Utf8),
358 default: None,
359 doc: "Edge type name.".to_owned(),
360 },
361 NamedArgType {
362 name: smol_str::SmolStr::new("props"),
363 ty: ArgType::Primitive(DataType::LargeBinary),
364 default: Some(ScalarValue::LargeBinary(Some(b"{}".to_vec()))),
365 doc: "Property map (JSON-encoded object).".to_owned(),
366 },
367 NamedArgType {
368 name: smol_str::SmolStr::new("dst"),
369 ty: ArgType::Primitive(DataType::Int64),
370 default: None,
371 doc: "Destination vid (stored or ephemeral).".to_owned(),
372 },
373 ],
374 yields: vec![edge_yield_field()],
375 mode: ProcedureMode::Read,
376 side_effects: SideEffects::ReadOnly,
377 retry_contract: None,
378 batch_input: None,
379 docs: "uni.create.vEdge(src, type, props, dst) — mint a \
380 transient, in-query ephemeral edge between two \
381 (stored or ephemeral) vids. Yields a single \
382 canonical Edge struct column. The returned `eid` has \
383 the `EPHEMERAL_BIT` set; writes against it fail \
384 with `EphemeralWriteAttempt`."
385 .to_owned(),
386 })
387 }
388}
389
390impl ProcedurePlugin for VEdgeProcedure {
391 fn signature(&self) -> &ProcedureSignature {
392 Self::signature_static()
393 }
394
395 fn invoke(
396 &self,
397 ctx: ProcedureContext<'_>,
398 args: &[ColumnarValue],
399 ) -> Result<SendableRecordBatchStream, FnError> {
400 let host = require_host(&ctx)?;
401 let src = args
402 .first()
403 .and_then(arg_as_i64)
404 .ok_or_else(|| FnError::new(0x824, "uni.create.vEdge: src (Int) required"))?;
405 let edge_type = args
406 .get(1)
407 .and_then(arg_as_string)
408 .ok_or_else(|| FnError::new(0x824, "uni.create.vEdge: type (String) required"))?;
409 let props_json = args
410 .get(2)
411 .map(arg_to_json)
412 .unwrap_or(serde_json::Value::Null);
413 let dst = args
414 .get(3)
415 .and_then(arg_as_i64)
416 .ok_or_else(|| FnError::new(0x824, "uni.create.vEdge: dst (Int) required"))?;
417 let props_value = uni_common::Value::Map(properties_from_json(&props_json));
418 let props_bytes = uni_common::cypher_value_codec::encode(&props_value);
419
420 let eid = Eid::ephemeral(host.allocate_transient_id());
421
422 #[allow(clippy::cast_sign_loss)]
424 let src_u64 = src as u64;
425 #[allow(clippy::cast_sign_loss)]
426 let dst_u64 = dst as u64;
427
428 let edge_struct =
429 build_edge_struct_array(eid.as_u64(), &edge_type, src_u64, dst_u64, &props_bytes)
430 .map_err(|e| FnError::new(0x830, format!("vEdge struct build: {e}")))?;
431
432 let schema: SchemaRef = Arc::new(Schema::new(vec![edge_yield_field()]));
433 let cols: Vec<ArrayRef> = vec![Arc::new(edge_struct)];
434 let batch = RecordBatch::try_new(Arc::clone(&schema), cols)
435 .map_err(|e| FnError::new(0x830, format!("vEdge RecordBatch build: {e}")))?;
436 Ok(one_batch_stream(schema, batch))
437 }
438}
439
440fn build_edge_struct_array(
443 eid: u64,
444 type_name: &str,
445 src: u64,
446 dst: u64,
447 props_bytes: &[u8],
448) -> Result<StructArray, arrow_schema::ArrowError> {
449 let fields = edge_struct_fields();
450
451 let eid_arr: ArrayRef = Arc::new(arrow_array::UInt64Array::from(vec![eid]));
452 let type_arr: ArrayRef = Arc::new(StringArray::from(vec![type_name.to_owned()]));
453 let src_arr: ArrayRef = Arc::new(arrow_array::UInt64Array::from(vec![src]));
454 let dst_arr: ArrayRef = Arc::new(arrow_array::UInt64Array::from(vec![dst]));
455 let props_arr: ArrayRef = Arc::new(LargeBinaryArray::from(vec![Some(props_bytes)]));
456
457 StructArray::try_new(
458 fields,
459 vec![eid_arr, type_arr, src_arr, dst_arr, props_arr],
460 None,
461 )
462}
463
464pub fn register_into(r: &mut PluginRegistrar<'_>) -> Result<(), PluginError> {
474 r.procedure(
475 QName::new("uni", "create.vNode"),
476 VNodeProcedure::signature_static().clone(),
477 Arc::new(VNodeProcedure),
478 )?;
479 r.procedure(
480 QName::new("uni", "create.vEdge"),
481 VEdgeProcedure::signature_static().clone(),
482 Arc::new(VEdgeProcedure),
483 )?;
484 Ok(())
485}