1use super::core::*;
5use anyhow::{Result, anyhow};
6use std::collections::HashMap;
7use std::sync::Arc;
8use uni_common::Value;
9use uni_cypher::ast::Expr;
10use uni_store::QueryContext;
11use uni_store::runtime::property_manager::PropertyManager;
12
13fn success_result(success: bool) -> Result<Vec<HashMap<String, Value>>> {
14 Ok(vec![HashMap::from([(
15 "success".to_string(),
16 Value::Bool(success),
17 )])])
18}
19
/// Declared type of a procedure parameter or output column.
///
/// Argument values are checked against it by `check_type_compatible`, where
/// `Float` also accepts integers and `Any` accepts every value.
#[derive(Clone, Debug, PartialEq)]
pub enum ProcedureValueType {
    /// A string value.
    String,
    /// An integer value.
    Integer,
    /// A floating-point value; integers are accepted too.
    Float,
    /// Any numeric value.
    Number,
    /// A boolean value.
    Boolean,
    /// Any value at all (no type check).
    Any,
}
36
37#[derive(Debug, Clone)]
39pub struct ProcedureParam {
40 pub name: String,
42 pub param_type: ProcedureValueType,
44}
45
46#[derive(Debug, Clone)]
48pub struct ProcedureOutput {
49 pub name: String,
51 pub output_type: ProcedureValueType,
53}
54
55#[derive(Debug, Clone)]
60pub struct RegisteredProcedure {
61 pub name: String,
63 pub params: Vec<ProcedureParam>,
65 pub outputs: Vec<ProcedureOutput>,
67 pub data: Vec<HashMap<String, Value>>,
69}
70
71#[derive(Debug, Default)]
80pub struct ProcedureRegistry {
81 procedures: std::sync::RwLock<HashMap<String, RegisteredProcedure>>,
82 plugin_registry: std::sync::RwLock<Option<Arc<uni_plugin::PluginRegistry>>>,
83}
84
85impl ProcedureRegistry {
86 pub fn new() -> Self {
88 Self::default()
89 }
90
91 pub fn register(&self, proc_def: RegisteredProcedure) {
93 self.procedures
94 .write()
95 .expect("ProcedureRegistry lock poisoned")
96 .insert(proc_def.name.clone(), proc_def);
97 }
98
99 pub fn get(&self, name: &str) -> Option<RegisteredProcedure> {
101 self.procedures
102 .read()
103 .expect("ProcedureRegistry lock poisoned")
104 .get(name)
105 .cloned()
106 }
107
108 pub fn clear(&self) {
110 self.procedures
111 .write()
112 .expect("ProcedureRegistry lock poisoned")
113 .clear();
114 }
115
116 pub fn set_plugin_registry(&self, pr: Arc<uni_plugin::PluginRegistry>) {
122 *self
123 .plugin_registry
124 .write()
125 .expect("ProcedureRegistry plugin-registry lock poisoned") = Some(pr);
126 }
127
128 pub fn plugin_registry(&self) -> Option<Arc<uni_plugin::PluginRegistry>> {
136 self.plugin_registry
137 .read()
138 .expect("ProcedureRegistry plugin-registry lock poisoned")
139 .clone()
140 }
141
142 pub fn get_plugin(
150 &self,
151 qname: &uni_plugin::QName,
152 ) -> Option<std::sync::Arc<uni_plugin::registry::ProcedureEntry>> {
153 if let Some(session_pr) = crate::current_session_plugin_registry()
155 && let Some(entry) = session_pr.procedure(qname)
156 {
157 return Some(entry);
158 }
159 self.plugin_registry
160 .read()
161 .expect("ProcedureRegistry plugin-registry lock poisoned")
162 .as_ref()
163 .and_then(|pr| pr.procedure(qname))
164 }
165
166 pub fn resolve_user_procedure(
182 &self,
183 user_qname: &str,
184 ) -> Option<std::sync::Arc<uni_plugin::registry::ProcedureEntry>> {
185 if let Some((ns, local)) = user_qname.split_once('.')
187 && let Some(p) = self.get_plugin(&uni_plugin::QName::new(ns, local))
188 {
189 return Some(p);
190 }
191 let stripped = user_qname.strip_prefix("uni.").unwrap_or(user_qname);
195 for plugin_id in ["uni", "builtin", "apoc-core", "custom"] {
196 if let Some(p) = self.get_plugin(&uni_plugin::QName::new(plugin_id, stripped)) {
197 return Some(p);
198 }
199 }
200 None
201 }
202}
203
204use crate::query::df_graph::procedure_call::value_to_columnar;
205
206fn arrow_scalar_to_value(
218 arr: &dyn arrow_array::Array,
219 row_idx: usize,
220) -> std::result::Result<Value, String> {
221 use arrow_array::cast::AsArray;
222 use arrow_schema::DataType as Dt;
223
224 if arr.is_null(row_idx) {
225 return Ok(Value::Null);
226 }
227 match arr.data_type() {
228 Dt::Boolean => Ok(Value::Bool(arr.as_boolean().value(row_idx))),
229 Dt::Int64 => Ok(Value::Int(
230 arr.as_primitive::<arrow_array::types::Int64Type>()
231 .value(row_idx),
232 )),
233 Dt::Int32 => Ok(Value::Int(
234 arr.as_primitive::<arrow_array::types::Int32Type>()
235 .value(row_idx) as i64,
236 )),
237 Dt::UInt64 => Ok(Value::Int(
238 arr.as_primitive::<arrow_array::types::UInt64Type>()
239 .value(row_idx) as i64,
240 )),
241 Dt::Float64 => Ok(Value::Float(
242 arr.as_primitive::<arrow_array::types::Float64Type>()
243 .value(row_idx),
244 )),
245 Dt::Float32 => Ok(Value::Float(
246 arr.as_primitive::<arrow_array::types::Float32Type>()
247 .value(row_idx) as f64,
248 )),
249 Dt::Utf8 => Ok(Value::String(
250 arr.as_string::<i32>().value(row_idx).to_string(),
251 )),
252 Dt::LargeUtf8 => Ok(Value::String(
253 arr.as_string::<i64>().value(row_idx).to_string(),
254 )),
255 Dt::Binary => Ok(Value::Bytes(arr.as_binary::<i32>().value(row_idx).to_vec())),
256 Dt::LargeBinary => Ok(Value::Bytes(arr.as_binary::<i64>().value(row_idx).to_vec())),
257 other => Err(format!(
258 "unsupported Arrow type in plugin procedure output: {other:?}"
259 )),
260 }
261}
262
/// Projects a procedure result row onto the names listed in `YIELD`.
///
/// An empty `yield_items` means "no projection", so `full_result` is returned
/// unchanged. Otherwise the result contains each yielded name that exists in
/// `full_result`, paired with its value. Yielded names missing from the row are
/// skipped. Duplicate names produce a single entry.
///
/// The row is taken by value, so values are moved out rather than cloned.
fn filter_yield_items<V>(
    mut full_result: HashMap<String, V>,
    yield_items: &[String],
) -> HashMap<String, V> {
    if yield_items.is_empty() {
        return full_result;
    }
    yield_items
        .iter()
        .filter_map(|name| full_result.remove(name).map(|val| (name.clone(), val)))
        .collect()
}
277
278impl Executor {
279 async fn eval_string_arg<'a>(
281 &'a self,
282 arg: &Expr,
283 description: &str,
284 prop_manager: &'a PropertyManager,
285 params: &'a HashMap<String, Value>,
286 ctx: Option<&'a QueryContext>,
287 ) -> Result<String> {
288 let empty_row = HashMap::new();
289 self.evaluate_expr(arg, &empty_row, prop_manager, params, ctx)
290 .await?
291 .as_str()
292 .ok_or_else(|| anyhow!("{} must be string", description))
293 .map(|s| s.to_string())
294 }
295
296 pub(crate) async fn execute_procedure<'a>(
297 &'a self,
298 name: &str,
299 args: &[Expr],
300 yield_items: &[String],
301 prop_manager: &'a PropertyManager,
302 params: &'a HashMap<String, Value>,
303 ctx: Option<&'a QueryContext>,
304 ) -> Result<Vec<HashMap<String, Value>>> {
305 match name {
306 "uni.admin.compact" => {
307 let stats = self.storage.compact().await?;
308 let full_result = HashMap::from([
309 (
310 "files_compacted".to_string(),
311 Value::Int(stats.files_compacted as i64),
312 ),
313 (
314 "bytes_before".to_string(),
315 Value::Int(stats.bytes_before as i64),
316 ),
317 (
318 "bytes_after".to_string(),
319 Value::Int(stats.bytes_after as i64),
320 ),
321 (
322 "duration_ms".to_string(),
323 Value::Int(stats.duration.as_millis() as i64),
324 ),
325 ]);
326
327 Ok(vec![filter_yield_items(full_result, yield_items)])
328 }
329 "uni.admin.compactionStatus" => {
330 let status = self
331 .storage
332 .compaction_status()
333 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
334 let full_result = HashMap::from([
335 ("l1_runs".to_string(), Value::Int(status.l1_runs as i64)),
336 (
337 "l1_size_bytes".to_string(),
338 Value::Int(status.l1_size_bytes as i64),
339 ),
340 (
341 "in_progress".to_string(),
342 Value::Bool(status.compaction_in_progress),
343 ),
344 (
345 "pending".to_string(),
346 Value::Int(status.compaction_pending as i64),
347 ),
348 (
349 "total_compactions".to_string(),
350 Value::Int(status.total_compactions as i64),
351 ),
352 (
353 "total_bytes_compacted".to_string(),
354 Value::Int(status.total_bytes_compacted as i64),
355 ),
356 ]);
357
358 Ok(vec![filter_yield_items(full_result, yield_items)])
359 }
360 "uni.admin.snapshot.create" => {
361 let name = if !args.is_empty() {
362 Some(
363 self.eval_string_arg(&args[0], "Snapshot name", prop_manager, params, ctx)
364 .await?,
365 )
366 } else {
367 None
368 };
369
370 let writer_arc = self
371 .writer
372 .as_ref()
373 .ok_or_else(|| anyhow!("Database is in read-only mode"))?;
374 let writer: &uni_store::Writer = writer_arc.as_ref();
375 let snapshot_id = writer.flush_to_l1(name).await?;
376
377 Ok(vec![HashMap::from([(
378 "snapshot_id".to_string(),
379 Value::String(snapshot_id),
380 )])])
381 }
382 "uni.admin.snapshot.list" => {
383 let sm = self.storage.snapshot_manager();
384 let ids = sm.list_snapshots().await?;
385 let mut results = Vec::new();
386 for id in ids {
387 if let Ok(m) = sm.load_snapshot(&id).await {
388 results.push(HashMap::from([
389 ("snapshot_id".to_string(), Value::String(m.snapshot_id)),
390 (
391 "name".to_string(),
392 m.name.map(Value::String).unwrap_or(Value::Null),
393 ),
394 (
395 "created_at".to_string(),
396 Value::String(m.created_at.to_rfc3339()),
397 ),
398 (
399 "version_hwm".to_string(),
400 Value::Int(m.version_high_water_mark as i64),
401 ),
402 ]));
403 }
404 }
405 Ok(results)
406 }
407 "uni.admin.snapshot.restore" => {
408 let id = self
409 .eval_string_arg(&args[0], "Snapshot ID", prop_manager, params, ctx)
410 .await?;
411
412 self.storage
413 .snapshot_manager()
414 .set_latest_snapshot(&id)
415 .await?;
416 Ok(vec![HashMap::from([(
417 "status".to_string(),
418 Value::String("Restored".to_string()),
419 )])])
420 }
421 "uni.schema.createLabel" => {
423 let empty_row = HashMap::new();
424 let name = self
425 .eval_string_arg(&args[0], "Label name", prop_manager, params, ctx)
426 .await?;
427 let config = self
428 .evaluate_expr(&args[1], &empty_row, prop_manager, params, ctx)
429 .await?;
430
431 let success =
432 super::ddl_procedures::create_label(&self.storage, &name, &config).await?;
433 success_result(success)
434 }
435 "uni.schema.createEdgeType" => {
436 let empty_row = HashMap::new();
437 let name = self
438 .eval_string_arg(&args[0], "Edge type name", prop_manager, params, ctx)
439 .await?;
440 let src_val = self
441 .evaluate_expr(&args[1], &empty_row, prop_manager, params, ctx)
442 .await?;
443 let dst_val = self
444 .evaluate_expr(&args[2], &empty_row, prop_manager, params, ctx)
445 .await?;
446 let config = self
447 .evaluate_expr(&args[3], &empty_row, prop_manager, params, ctx)
448 .await?;
449
450 let src_labels = src_val
452 .as_array()
453 .ok_or(anyhow!("Source labels must be a list"))?
454 .iter()
455 .map(|v| {
456 v.as_str()
457 .map(|s| s.to_string())
458 .ok_or(anyhow!("Label must be string"))
459 })
460 .collect::<Result<Vec<_>>>()?;
461 let dst_labels = dst_val
462 .as_array()
463 .ok_or(anyhow!("Target labels must be a list"))?
464 .iter()
465 .map(|v| {
466 v.as_str()
467 .map(|s| s.to_string())
468 .ok_or(anyhow!("Label must be string"))
469 })
470 .collect::<Result<Vec<_>>>()?;
471
472 let success = super::ddl_procedures::create_edge_type(
473 &self.storage,
474 &name,
475 src_labels,
476 dst_labels,
477 &config,
478 )
479 .await?;
480 success_result(success)
481 }
482 "uni.schema.createIndex" => {
483 let empty_row = HashMap::new();
484 let label = self
485 .eval_string_arg(&args[0], "Label", prop_manager, params, ctx)
486 .await?;
487 let property = self
488 .eval_string_arg(&args[1], "Property", prop_manager, params, ctx)
489 .await?;
490 let config = self
491 .evaluate_expr(&args[2], &empty_row, prop_manager, params, ctx)
492 .await?;
493
494 let success =
495 super::ddl_procedures::create_index(&self.storage, &label, &property, &config)
496 .await?;
497 success_result(success)
498 }
499 "uni.schema.createConstraint" => {
500 let label = self
501 .eval_string_arg(&args[0], "Label", prop_manager, params, ctx)
502 .await?;
503 let c_type = self
504 .eval_string_arg(&args[1], "Constraint type", prop_manager, params, ctx)
505 .await?;
506 let empty_row = HashMap::new();
507 let props_val = self
508 .evaluate_expr(&args[2], &empty_row, prop_manager, params, ctx)
509 .await?;
510
511 let properties = props_val
512 .as_array()
513 .ok_or(anyhow!("Properties must be a list"))?
514 .iter()
515 .map(|v| {
516 v.as_str()
517 .map(|s| s.to_string())
518 .ok_or(anyhow!("Property must be string"))
519 })
520 .collect::<Result<Vec<_>>>()?;
521
522 let success = super::ddl_procedures::create_constraint(
523 &self.storage,
524 &label,
525 &c_type,
526 properties,
527 )
528 .await?;
529 success_result(success)
530 }
531 "uni.schema.dropLabel"
536 | "uni.schema.dropEdgeType"
537 | "uni.schema.dropIndex"
538 | "uni.schema.dropConstraint" => {
539 let description = match name {
540 "uni.schema.dropLabel" => "Label name",
541 "uni.schema.dropEdgeType" => "Edge type name",
542 "uni.schema.dropIndex" => "Index name",
543 _ => "Constraint name",
544 };
545 let target = self
546 .eval_string_arg(&args[0], description, prop_manager, params, ctx)
547 .await?;
548 let success = match name {
549 "uni.schema.dropLabel" => {
550 super::ddl_procedures::drop_label(&self.storage, &target).await?
551 }
552 "uni.schema.dropEdgeType" => {
553 super::ddl_procedures::drop_edge_type(&self.storage, &target).await?
554 }
555 "uni.schema.dropIndex" => {
556 super::ddl_procedures::drop_index(&self.storage, &target).await?
557 }
558 _ => super::ddl_procedures::drop_constraint(&self.storage, &target).await?,
559 };
560 success_result(success)
561 }
562 _ => {
563 if let Some(registry) = &self.procedure_registry
566 && let Some(entry) = registry.resolve_user_procedure(name)
567 {
568 return self
569 .execute_plugin_procedure(
570 name,
571 &entry,
572 args,
573 yield_items,
574 prop_manager,
575 params,
576 ctx,
577 )
578 .await;
579 }
580
581 if let Some(registry) = &self.procedure_registry
583 && let Some(proc_def) = registry.get(name)
584 {
585 return self
586 .execute_registered_procedure(
587 &proc_def,
588 args,
589 yield_items,
590 prop_manager,
591 params,
592 ctx,
593 )
594 .await;
595 }
596 Err(anyhow!("ProcedureNotFound: Unknown procedure '{}'", name))
597 }
598 }
599 }
600
601 #[allow(clippy::too_many_arguments)] async fn execute_plugin_procedure<'a>(
610 &'a self,
611 name: &str,
612 entry: &uni_plugin::registry::ProcedureEntry,
613 args: &[Expr],
614 yield_items: &[String],
615 prop_manager: &'a PropertyManager,
616 params: &'a HashMap<String, Value>,
617 ctx: Option<&'a QueryContext>,
618 ) -> Result<Vec<HashMap<String, Value>>> {
619 use datafusion::logical_expr::ColumnarValue;
620 use futures::StreamExt;
621
622 let empty_row: HashMap<String, Value> = HashMap::new();
625 let mut columnar_args: Vec<ColumnarValue> = Vec::with_capacity(args.len());
626 for arg in args {
627 let v = self
628 .evaluate_expr(arg, &empty_row, prop_manager, params, ctx)
629 .await?;
630 columnar_args.push(
631 value_to_columnar(&v)
632 .map_err(|e| anyhow!("Procedure '{name}': argument conversion failed: {e}"))?,
633 );
634 }
635
636 let mut host = crate::query::executor::procedure_host::QueryProcedureHost::from_components(
637 Arc::clone(&self.storage),
638 Some(Arc::clone(&self.algo_registry)),
639 self.procedure_registry.clone(),
640 );
641 if let Some(writer) = &self.writer {
649 host = host.with_writer(Arc::clone(writer));
650 }
651 let principal = crate::current_principal();
658 let pctx = uni_plugin::host::build_procedure_context(&host, principal.as_deref());
659 let mut stream = entry
660 .procedure
661 .invoke(pctx, &columnar_args)
662 .map_err(|e| anyhow!("Procedure '{name}': {e}"))?;
663
664 let mut rows: Vec<HashMap<String, Value>> = Vec::new();
667 while let Some(item) = stream.next().await {
668 let batch = item.map_err(|e| anyhow!("Procedure '{name}' stream error: {e}"))?;
669 for row_idx in 0..batch.num_rows() {
670 let mut row: HashMap<String, Value> = HashMap::new();
671 let schema = batch.schema();
672 for col_idx in 0..batch.num_columns() {
673 let field = schema.field(col_idx);
674 let arr = batch.column(col_idx);
675 let v = arrow_scalar_to_value(arr.as_ref(), row_idx)
676 .map_err(|e| anyhow!("Procedure '{name}': output decode: {e}"))?;
677 row.insert(field.name().clone(), v);
678 }
679 rows.push(filter_yield_items(row, yield_items));
680 }
681 }
682 Ok(rows)
683 }
684
685 async fn execute_registered_procedure<'a>(
696 &'a self,
697 proc_def: &RegisteredProcedure,
698 args: &[Expr],
699 yield_items: &[String],
700 prop_manager: &'a PropertyManager,
701 params: &'a HashMap<String, Value>,
702 ctx: Option<&'a QueryContext>,
703 ) -> Result<Vec<HashMap<String, Value>>> {
704 let empty_row = HashMap::new();
705
706 let mut evaluated_args = Vec::with_capacity(args.len());
708 for arg in args {
709 evaluated_args.push(
710 self.evaluate_expr(arg, &empty_row, prop_manager, params, ctx)
711 .await?,
712 );
713 }
714
715 if evaluated_args.len() != proc_def.params.len() {
717 if evaluated_args.is_empty() && !proc_def.params.is_empty() {
718 if yield_items.is_empty() {
719 let mut resolved = Vec::with_capacity(proc_def.params.len());
721 for param in &proc_def.params {
722 if let Some(val) = params.get(¶m.name) {
723 resolved.push(val.clone());
724 } else {
725 return Err(anyhow!(
726 "MissingParameter: Procedure '{}' requires implicit argument '{}' \
727 but it was not provided as a query parameter",
728 proc_def.name,
729 param.name
730 ));
731 }
732 }
733 evaluated_args = resolved;
734 } else {
735 return Err(anyhow!(
737 "InvalidArgumentPassingMode: Procedure '{}' requires explicit argument passing in in-query CALL",
738 proc_def.name
739 ));
740 }
741 } else {
742 return Err(anyhow!(
743 "InvalidNumberOfArguments: Procedure '{}' expects {} argument(s), got {}",
744 proc_def.name,
745 proc_def.params.len(),
746 evaluated_args.len()
747 ));
748 }
749 }
750
751 for (i, (arg_val, param)) in evaluated_args.iter().zip(&proc_def.params).enumerate() {
753 if !arg_val.is_null() && !check_type_compatible(arg_val, ¶m.param_type) {
754 return Err(anyhow!(
755 "InvalidArgumentType: Argument {} ('{}') of procedure '{}' has incompatible type",
756 i,
757 param.name,
758 proc_def.name
759 ));
760 }
761 }
762
763 let filtered: Vec<&HashMap<String, Value>> = proc_def
765 .data
766 .iter()
767 .filter(|row| {
768 for (param, arg_val) in proc_def.params.iter().zip(&evaluated_args) {
769 if let Some(row_val) = row.get(¶m.name)
770 && !values_match(row_val, arg_val)
771 {
772 return false;
773 }
774 }
775 true
776 })
777 .collect();
778
779 let output_names: Vec<&str> = proc_def.outputs.iter().map(|o| o.name.as_str()).collect();
781
782 let results = filtered
786 .into_iter()
787 .map(|row| {
788 if yield_items.is_empty() {
789 output_names
790 .iter()
791 .filter_map(|name| {
792 row.get(*name).map(|val| ((*name).to_string(), val.clone()))
793 })
794 .collect()
795 } else {
796 filter_yield_items(row.clone(), yield_items)
797 }
798 })
799 .collect();
800
801 Ok(results)
802 }
803}
804
805fn check_type_compatible(val: &Value, expected: &ProcedureValueType) -> bool {
807 match expected {
808 ProcedureValueType::Any => true,
809 ProcedureValueType::String => val.is_string(),
810 ProcedureValueType::Boolean => val.is_bool(),
811 ProcedureValueType::Integer => val.is_i64(),
812 ProcedureValueType::Float => val.is_f64() || val.is_i64(),
813 ProcedureValueType::Number => val.is_number(),
814 }
815}
816
817fn values_match(row_val: &Value, arg_val: &Value) -> bool {
819 if arg_val.is_null() || row_val.is_null() {
820 return arg_val.is_null() && row_val.is_null();
821 }
822 if let (Some(a), Some(b)) = (row_val.as_f64(), arg_val.as_f64()) {
824 return (a - b).abs() < f64::EPSILON;
825 }
826 row_val == arg_val
827}