1use super::core::*;
5use anyhow::{Result, anyhow};
6use std::collections::HashMap;
7use std::sync::Arc;
8use uni_common::Value;
9use uni_cypher::ast::Expr;
10use uni_store::QueryContext;
11use uni_store::runtime::property_manager::PropertyManager;
12
13fn success_result(success: bool) -> Result<Vec<HashMap<String, Value>>> {
14 Ok(vec![HashMap::from([(
15 "success".to_string(),
16 Value::Bool(success),
17 )])])
18}
19
20#[derive(Debug, Clone, PartialEq)]
22pub enum ProcedureValueType {
23 String,
25 Integer,
27 Float,
29 Number,
31 Boolean,
33 Any,
35}
36
37#[derive(Debug, Clone)]
39pub struct ProcedureParam {
40 pub name: String,
42 pub param_type: ProcedureValueType,
44}
45
46#[derive(Debug, Clone)]
48pub struct ProcedureOutput {
49 pub name: String,
51 pub output_type: ProcedureValueType,
53}
54
55#[derive(Debug, Clone)]
60pub struct RegisteredProcedure {
61 pub name: String,
63 pub params: Vec<ProcedureParam>,
65 pub outputs: Vec<ProcedureOutput>,
67 pub data: Vec<HashMap<String, Value>>,
69}
70
71#[derive(Debug, Default)]
80pub struct ProcedureRegistry {
81 procedures: std::sync::RwLock<HashMap<String, RegisteredProcedure>>,
82 plugin_registry: std::sync::RwLock<Option<Arc<uni_plugin::PluginRegistry>>>,
83}
84
85impl ProcedureRegistry {
86 pub fn new() -> Self {
88 Self::default()
89 }
90
91 pub fn register(&self, proc_def: RegisteredProcedure) {
93 self.procedures
94 .write()
95 .expect("ProcedureRegistry lock poisoned")
96 .insert(proc_def.name.clone(), proc_def);
97 }
98
99 pub fn get(&self, name: &str) -> Option<RegisteredProcedure> {
101 self.procedures
102 .read()
103 .expect("ProcedureRegistry lock poisoned")
104 .get(name)
105 .cloned()
106 }
107
108 pub fn clear(&self) {
110 self.procedures
111 .write()
112 .expect("ProcedureRegistry lock poisoned")
113 .clear();
114 }
115
116 pub fn set_plugin_registry(&self, pr: Arc<uni_plugin::PluginRegistry>) {
122 *self
123 .plugin_registry
124 .write()
125 .expect("ProcedureRegistry plugin-registry lock poisoned") = Some(pr);
126 }
127
128 pub fn plugin_registry(&self) -> Option<Arc<uni_plugin::PluginRegistry>> {
136 self.plugin_registry
137 .read()
138 .expect("ProcedureRegistry plugin-registry lock poisoned")
139 .clone()
140 }
141
142 pub fn get_plugin(
150 &self,
151 qname: &uni_plugin::QName,
152 ) -> Option<std::sync::Arc<uni_plugin::registry::ProcedureEntry>> {
153 if let Some(session_pr) = crate::current_session_plugin_registry()
155 && let Some(entry) = session_pr.procedure(qname)
156 {
157 return Some(entry);
158 }
159 self.plugin_registry
160 .read()
161 .expect("ProcedureRegistry plugin-registry lock poisoned")
162 .as_ref()
163 .and_then(|pr| pr.procedure(qname))
164 }
165
166 pub fn resolve_user_procedure(
182 &self,
183 user_qname: &str,
184 ) -> Option<std::sync::Arc<uni_plugin::registry::ProcedureEntry>> {
185 for q in uni_plugin::QName::candidate_splits(user_qname) {
190 if let Some(p) = self.get_plugin(&q) {
191 return Some(p);
192 }
193 }
194 let stripped = user_qname.strip_prefix("uni.").unwrap_or(user_qname);
198 for plugin_id in ["uni", "builtin", "apoc-core", "custom"] {
199 if let Some(p) = self.get_plugin(&uni_plugin::QName::new(plugin_id, stripped)) {
200 return Some(p);
201 }
202 }
203 None
204 }
205}
206
207use crate::query::df_graph::procedure_call::value_to_columnar;
208
209fn arrow_scalar_to_value(
221 arr: &dyn arrow_array::Array,
222 row_idx: usize,
223) -> std::result::Result<Value, String> {
224 use arrow_array::cast::AsArray;
225 use arrow_schema::DataType as Dt;
226
227 if arr.is_null(row_idx) {
228 return Ok(Value::Null);
229 }
230 match arr.data_type() {
231 Dt::Boolean => Ok(Value::Bool(arr.as_boolean().value(row_idx))),
232 Dt::Int64 => Ok(Value::Int(
233 arr.as_primitive::<arrow_array::types::Int64Type>()
234 .value(row_idx),
235 )),
236 Dt::Int32 => Ok(Value::Int(
237 arr.as_primitive::<arrow_array::types::Int32Type>()
238 .value(row_idx) as i64,
239 )),
240 Dt::UInt64 => Ok(Value::Int(
241 arr.as_primitive::<arrow_array::types::UInt64Type>()
242 .value(row_idx) as i64,
243 )),
244 Dt::Float64 => Ok(Value::Float(
245 arr.as_primitive::<arrow_array::types::Float64Type>()
246 .value(row_idx),
247 )),
248 Dt::Float32 => Ok(Value::Float(
249 arr.as_primitive::<arrow_array::types::Float32Type>()
250 .value(row_idx) as f64,
251 )),
252 Dt::Utf8 => Ok(Value::String(
253 arr.as_string::<i32>().value(row_idx).to_string(),
254 )),
255 Dt::LargeUtf8 => Ok(Value::String(
256 arr.as_string::<i64>().value(row_idx).to_string(),
257 )),
258 Dt::Binary => Ok(Value::Bytes(arr.as_binary::<i32>().value(row_idx).to_vec())),
259 Dt::LargeBinary => Ok(Value::Bytes(arr.as_binary::<i64>().value(row_idx).to_vec())),
260 other => Err(format!(
261 "unsupported Arrow type in plugin procedure output: {other:?}"
262 )),
263 }
264}
265
266fn filter_yield_items(
269 full_result: HashMap<String, Value>,
270 yield_items: &[String],
271) -> HashMap<String, Value> {
272 if yield_items.is_empty() {
273 return full_result;
274 }
275 yield_items
276 .iter()
277 .filter_map(|name| full_result.get(name).map(|val| (name.clone(), val.clone())))
278 .collect()
279}
280
281impl Executor {
282 async fn eval_string_arg<'a>(
284 &'a self,
285 arg: &Expr,
286 description: &str,
287 prop_manager: &'a PropertyManager,
288 params: &'a HashMap<String, Value>,
289 ctx: Option<&'a QueryContext>,
290 ) -> Result<String> {
291 let empty_row = HashMap::new();
292 self.evaluate_expr(arg, &empty_row, prop_manager, params, ctx)
293 .await?
294 .as_str()
295 .ok_or_else(|| anyhow!("{} must be string", description))
296 .map(|s| s.to_string())
297 }
298
299 pub(crate) async fn execute_procedure<'a>(
300 &'a self,
301 name: &str,
302 args: &[Expr],
303 yield_items: &[String],
304 prop_manager: &'a PropertyManager,
305 params: &'a HashMap<String, Value>,
306 ctx: Option<&'a QueryContext>,
307 ) -> Result<Vec<HashMap<String, Value>>> {
308 match name {
309 "uni.admin.compact" => {
310 let stats = self.storage.compact().await?;
311 let full_result = HashMap::from([
312 (
313 "files_compacted".to_string(),
314 Value::Int(stats.files_compacted as i64),
315 ),
316 (
317 "bytes_before".to_string(),
318 Value::Int(stats.bytes_before as i64),
319 ),
320 (
321 "bytes_after".to_string(),
322 Value::Int(stats.bytes_after as i64),
323 ),
324 (
325 "duration_ms".to_string(),
326 Value::Int(stats.duration.as_millis() as i64),
327 ),
328 ]);
329
330 Ok(vec![filter_yield_items(full_result, yield_items)])
331 }
332 "uni.admin.compactionStatus" => {
333 let status = self
334 .storage
335 .compaction_status()
336 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
337 let full_result = HashMap::from([
338 ("l1_runs".to_string(), Value::Int(status.l1_runs as i64)),
339 (
340 "l1_size_bytes".to_string(),
341 Value::Int(status.l1_size_bytes as i64),
342 ),
343 (
344 "in_progress".to_string(),
345 Value::Bool(status.compaction_in_progress),
346 ),
347 (
348 "pending".to_string(),
349 Value::Int(status.compaction_pending as i64),
350 ),
351 (
352 "total_compactions".to_string(),
353 Value::Int(status.total_compactions as i64),
354 ),
355 (
356 "total_bytes_compacted".to_string(),
357 Value::Int(status.total_bytes_compacted as i64),
358 ),
359 ]);
360
361 Ok(vec![filter_yield_items(full_result, yield_items)])
362 }
363 "uni.admin.snapshot.create" => {
364 let name = if !args.is_empty() {
365 Some(
366 self.eval_string_arg(&args[0], "Snapshot name", prop_manager, params, ctx)
367 .await?,
368 )
369 } else {
370 None
371 };
372
373 let writer_arc = self
374 .writer
375 .as_ref()
376 .ok_or_else(|| anyhow!("Database is in read-only mode"))?;
377 let writer: &uni_store::Writer = writer_arc.as_ref();
378 let snapshot_id = writer.flush_to_l1(name).await?;
379
380 Ok(vec![HashMap::from([(
381 "snapshot_id".to_string(),
382 Value::String(snapshot_id),
383 )])])
384 }
385 "uni.admin.snapshot.list" => {
386 let sm = self.storage.snapshot_manager();
387 let ids = sm.list_snapshots().await?;
388 let mut results = Vec::new();
389 for id in ids {
390 if let Ok(m) = sm.load_snapshot(&id).await {
391 results.push(HashMap::from([
392 ("snapshot_id".to_string(), Value::String(m.snapshot_id)),
393 (
394 "name".to_string(),
395 m.name.map(Value::String).unwrap_or(Value::Null),
396 ),
397 (
398 "created_at".to_string(),
399 Value::String(m.created_at.to_rfc3339()),
400 ),
401 (
402 "version_hwm".to_string(),
403 Value::Int(m.version_high_water_mark as i64),
404 ),
405 ]));
406 }
407 }
408 Ok(results)
409 }
410 "uni.admin.snapshot.restore" => {
411 let id = self
412 .eval_string_arg(&args[0], "Snapshot ID", prop_manager, params, ctx)
413 .await?;
414
415 self.storage
416 .snapshot_manager()
417 .set_latest_snapshot(&id)
418 .await?;
419 Ok(vec![HashMap::from([(
420 "status".to_string(),
421 Value::String("Restored".to_string()),
422 )])])
423 }
424 "uni.schema.createLabel" => {
426 let empty_row = HashMap::new();
427 let name = self
428 .eval_string_arg(&args[0], "Label name", prop_manager, params, ctx)
429 .await?;
430 let config = self
431 .evaluate_expr(&args[1], &empty_row, prop_manager, params, ctx)
432 .await?;
433
434 let success =
435 super::ddl_procedures::create_label(&self.storage, &name, &config).await?;
436 success_result(success)
437 }
438 "uni.schema.createEdgeType" => {
439 let empty_row = HashMap::new();
440 let name = self
441 .eval_string_arg(&args[0], "Edge type name", prop_manager, params, ctx)
442 .await?;
443 let src_val = self
444 .evaluate_expr(&args[1], &empty_row, prop_manager, params, ctx)
445 .await?;
446 let dst_val = self
447 .evaluate_expr(&args[2], &empty_row, prop_manager, params, ctx)
448 .await?;
449 let config = self
450 .evaluate_expr(&args[3], &empty_row, prop_manager, params, ctx)
451 .await?;
452
453 let src_labels = src_val
455 .as_array()
456 .ok_or(anyhow!("Source labels must be a list"))?
457 .iter()
458 .map(|v| {
459 v.as_str()
460 .map(|s| s.to_string())
461 .ok_or(anyhow!("Label must be string"))
462 })
463 .collect::<Result<Vec<_>>>()?;
464 let dst_labels = dst_val
465 .as_array()
466 .ok_or(anyhow!("Target labels must be a list"))?
467 .iter()
468 .map(|v| {
469 v.as_str()
470 .map(|s| s.to_string())
471 .ok_or(anyhow!("Label must be string"))
472 })
473 .collect::<Result<Vec<_>>>()?;
474
475 let success = super::ddl_procedures::create_edge_type(
476 &self.storage,
477 &name,
478 src_labels,
479 dst_labels,
480 &config,
481 )
482 .await?;
483 success_result(success)
484 }
485 "uni.schema.createIndex" => {
486 let empty_row = HashMap::new();
487 let label = self
488 .eval_string_arg(&args[0], "Label", prop_manager, params, ctx)
489 .await?;
490 let property = self
491 .eval_string_arg(&args[1], "Property", prop_manager, params, ctx)
492 .await?;
493 let config = self
494 .evaluate_expr(&args[2], &empty_row, prop_manager, params, ctx)
495 .await?;
496
497 let success =
498 super::ddl_procedures::create_index(&self.storage, &label, &property, &config)
499 .await?;
500 success_result(success)
501 }
502 "uni.schema.createConstraint" => {
503 let label = self
504 .eval_string_arg(&args[0], "Label", prop_manager, params, ctx)
505 .await?;
506 let c_type = self
507 .eval_string_arg(&args[1], "Constraint type", prop_manager, params, ctx)
508 .await?;
509 let empty_row = HashMap::new();
510 let props_val = self
511 .evaluate_expr(&args[2], &empty_row, prop_manager, params, ctx)
512 .await?;
513
514 let properties = props_val
515 .as_array()
516 .ok_or(anyhow!("Properties must be a list"))?
517 .iter()
518 .map(|v| {
519 v.as_str()
520 .map(|s| s.to_string())
521 .ok_or(anyhow!("Property must be string"))
522 })
523 .collect::<Result<Vec<_>>>()?;
524
525 let success = super::ddl_procedures::create_constraint(
526 &self.storage,
527 &label,
528 &c_type,
529 properties,
530 )
531 .await?;
532 success_result(success)
533 }
534 "uni.schema.dropLabel"
539 | "uni.schema.dropEdgeType"
540 | "uni.schema.dropIndex"
541 | "uni.schema.dropConstraint" => {
542 let description = match name {
543 "uni.schema.dropLabel" => "Label name",
544 "uni.schema.dropEdgeType" => "Edge type name",
545 "uni.schema.dropIndex" => "Index name",
546 _ => "Constraint name",
547 };
548 let target = self
549 .eval_string_arg(&args[0], description, prop_manager, params, ctx)
550 .await?;
551 let success = match name {
552 "uni.schema.dropLabel" => {
553 super::ddl_procedures::drop_label(&self.storage, &target).await?
554 }
555 "uni.schema.dropEdgeType" => {
556 super::ddl_procedures::drop_edge_type(&self.storage, &target).await?
557 }
558 "uni.schema.dropIndex" => {
559 super::ddl_procedures::drop_index(&self.storage, &target).await?
560 }
561 _ => super::ddl_procedures::drop_constraint(&self.storage, &target).await?,
562 };
563 success_result(success)
564 }
565 _ => {
566 if let Some(registry) = &self.procedure_registry
569 && let Some(entry) = registry.resolve_user_procedure(name)
570 {
571 return self
572 .execute_plugin_procedure(
573 name,
574 &entry,
575 args,
576 yield_items,
577 prop_manager,
578 params,
579 ctx,
580 )
581 .await;
582 }
583
584 if let Some(registry) = &self.procedure_registry
586 && let Some(proc_def) = registry.get(name)
587 {
588 return self
589 .execute_registered_procedure(
590 &proc_def,
591 args,
592 yield_items,
593 prop_manager,
594 params,
595 ctx,
596 )
597 .await;
598 }
599 Err(anyhow!("ProcedureNotFound: Unknown procedure '{}'", name))
600 }
601 }
602 }
603
604 #[allow(clippy::too_many_arguments)] async fn execute_plugin_procedure<'a>(
613 &'a self,
614 name: &str,
615 entry: &uni_plugin::registry::ProcedureEntry,
616 args: &[Expr],
617 yield_items: &[String],
618 prop_manager: &'a PropertyManager,
619 params: &'a HashMap<String, Value>,
620 ctx: Option<&'a QueryContext>,
621 ) -> Result<Vec<HashMap<String, Value>>> {
622 use datafusion::logical_expr::ColumnarValue;
623 use futures::StreamExt;
624
625 let empty_row: HashMap<String, Value> = HashMap::new();
628 let mut columnar_args: Vec<ColumnarValue> = Vec::with_capacity(args.len());
629 for arg in args {
630 let v = self
631 .evaluate_expr(arg, &empty_row, prop_manager, params, ctx)
632 .await?;
633 columnar_args.push(
634 value_to_columnar(&v)
635 .map_err(|e| anyhow!("Procedure '{name}': argument conversion failed: {e}"))?,
636 );
637 }
638
639 let mut host = crate::query::executor::procedure_host::QueryProcedureHost::from_components(
640 Arc::clone(&self.storage),
641 Some(Arc::clone(&self.algo_registry)),
642 self.procedure_registry.clone(),
643 );
644 if let Some(writer) = &self.writer {
652 host = host.with_writer(Arc::clone(writer));
653 }
654 let principal = crate::current_principal();
661 let pctx = uni_plugin::host::build_procedure_context(&host, principal.as_deref());
662 let mut stream = entry
663 .procedure
664 .invoke(pctx, &columnar_args)
665 .map_err(|e| anyhow!("Procedure '{name}': {e}"))?;
666
667 let mut rows: Vec<HashMap<String, Value>> = Vec::new();
670 while let Some(item) = stream.next().await {
671 let batch = item.map_err(|e| anyhow!("Procedure '{name}' stream error: {e}"))?;
672 for row_idx in 0..batch.num_rows() {
673 let mut row: HashMap<String, Value> = HashMap::new();
674 let schema = batch.schema();
675 for col_idx in 0..batch.num_columns() {
676 let field = schema.field(col_idx);
677 let arr = batch.column(col_idx);
678 let v = arrow_scalar_to_value(arr.as_ref(), row_idx)
679 .map_err(|e| anyhow!("Procedure '{name}': output decode: {e}"))?;
680 row.insert(field.name().clone(), v);
681 }
682 rows.push(filter_yield_items(row, yield_items));
683 }
684 }
685 Ok(rows)
686 }
687
688 async fn execute_registered_procedure<'a>(
699 &'a self,
700 proc_def: &RegisteredProcedure,
701 args: &[Expr],
702 yield_items: &[String],
703 prop_manager: &'a PropertyManager,
704 params: &'a HashMap<String, Value>,
705 ctx: Option<&'a QueryContext>,
706 ) -> Result<Vec<HashMap<String, Value>>> {
707 let empty_row = HashMap::new();
708
709 let mut evaluated_args = Vec::with_capacity(args.len());
711 for arg in args {
712 evaluated_args.push(
713 self.evaluate_expr(arg, &empty_row, prop_manager, params, ctx)
714 .await?,
715 );
716 }
717
718 if evaluated_args.len() != proc_def.params.len() {
720 if evaluated_args.is_empty() && !proc_def.params.is_empty() {
721 if yield_items.is_empty() {
722 let mut resolved = Vec::with_capacity(proc_def.params.len());
724 for param in &proc_def.params {
725 if let Some(val) = params.get(¶m.name) {
726 resolved.push(val.clone());
727 } else {
728 return Err(anyhow!(
729 "MissingParameter: Procedure '{}' requires implicit argument '{}' \
730 but it was not provided as a query parameter",
731 proc_def.name,
732 param.name
733 ));
734 }
735 }
736 evaluated_args = resolved;
737 } else {
738 return Err(anyhow!(
740 "InvalidArgumentPassingMode: Procedure '{}' requires explicit argument passing in in-query CALL",
741 proc_def.name
742 ));
743 }
744 } else {
745 return Err(anyhow!(
746 "InvalidNumberOfArguments: Procedure '{}' expects {} argument(s), got {}",
747 proc_def.name,
748 proc_def.params.len(),
749 evaluated_args.len()
750 ));
751 }
752 }
753
754 for (i, (arg_val, param)) in evaluated_args.iter().zip(&proc_def.params).enumerate() {
756 if !arg_val.is_null() && !check_type_compatible(arg_val, ¶m.param_type) {
757 return Err(anyhow!(
758 "InvalidArgumentType: Argument {} ('{}') of procedure '{}' has incompatible type",
759 i,
760 param.name,
761 proc_def.name
762 ));
763 }
764 }
765
766 let filtered: Vec<&HashMap<String, Value>> = proc_def
768 .data
769 .iter()
770 .filter(|row| {
771 for (param, arg_val) in proc_def.params.iter().zip(&evaluated_args) {
772 if let Some(row_val) = row.get(¶m.name)
773 && !values_match(row_val, arg_val)
774 {
775 return false;
776 }
777 }
778 true
779 })
780 .collect();
781
782 let output_names: Vec<&str> = proc_def.outputs.iter().map(|o| o.name.as_str()).collect();
784
785 let results = filtered
789 .into_iter()
790 .map(|row| {
791 if yield_items.is_empty() {
792 output_names
793 .iter()
794 .filter_map(|name| {
795 row.get(*name).map(|val| ((*name).to_string(), val.clone()))
796 })
797 .collect()
798 } else {
799 filter_yield_items(row.clone(), yield_items)
800 }
801 })
802 .collect();
803
804 Ok(results)
805 }
806}
807
808fn check_type_compatible(val: &Value, expected: &ProcedureValueType) -> bool {
810 match expected {
811 ProcedureValueType::Any => true,
812 ProcedureValueType::String => val.is_string(),
813 ProcedureValueType::Boolean => val.is_bool(),
814 ProcedureValueType::Integer => val.is_i64(),
815 ProcedureValueType::Float => val.is_f64() || val.is_i64(),
816 ProcedureValueType::Number => val.is_number(),
817 }
818}
819
820fn values_match(row_val: &Value, arg_val: &Value) -> bool {
822 if arg_val.is_null() || row_val.is_null() {
823 return arg_val.is_null() && row_val.is_null();
824 }
825 if let (Some(a), Some(b)) = (row_val.as_f64(), arg_val.as_f64()) {
827 return (a - b).abs() < f64::EPSILON;
828 }
829 row_val == arg_val
830}