Skip to main content

uni_query/procedures_plugin/
schema.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! `uni.schema.*` read-only introspection procedures.
//!
//! Direct ports of `execute_schema_labels` / `execute_schema_edge_types`
//! / `execute_schema_indexes` / `execute_schema_constraints` /
//! `execute_schema_label_info` from `procedure_call.rs`. Each procedure
//! emits a `RecordBatch` containing every natively-produced column; the
//! plugin-path dispatcher in `execute_plugin_procedure` projects the
//! caller's `YIELD` subset.
12
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::sync::OnceLock;
16
17use arrow_array::{ArrayRef, RecordBatch};
18use arrow_schema::{DataType, Field, Schema, SchemaRef};
19use datafusion::execution::SendableRecordBatchStream;
20use datafusion::logical_expr::ColumnarValue;
21use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
22use futures::stream;
23use uni_common::Value;
24use uni_plugin::traits::procedure::{
25    NamedArgType, ProcedureContext, ProcedureMode, ProcedurePlugin, ProcedureSignature,
26};
27use uni_plugin::traits::scalar::ArgType;
28use uni_plugin::{FnError, PluginError, PluginRegistrar, QName, SideEffects};
29
30use crate::query::df_graph::procedure_call::build_typed_column;
31use crate::query::executor::procedure_host::QueryProcedureHost;
32
33// Rust guideline compliant
34
35// ---------------------------------------------------------------------------
36// Shared helpers
37// ---------------------------------------------------------------------------
38
39fn require_host<'a>(ctx: &'a ProcedureContext<'_>) -> Result<&'a QueryProcedureHost, FnError> {
40    ctx.host
41        .and_then(|h| h.as_any().downcast_ref::<QueryProcedureHost>())
42        .ok_or_else(|| {
43            FnError::new(
44                0x701,
45                "uni.schema.*: requires QueryProcedureHost (host not bound on ProcedureContext)",
46            )
47        })
48}
49
50fn require_string_arg(args: &[ColumnarValue], index: usize, name: &str) -> Result<String, FnError> {
51    use datafusion::scalar::ScalarValue;
52    match args.get(index) {
53        Some(ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)))) => Ok(s.clone()),
54        Some(ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s)))) => Ok(s.clone()),
55        _ => Err(FnError::new(
56            FnError::CODE_TYPE_COERCION,
57            format!("uni.schema.*: {name} (arg #{index}) must be a non-null string"),
58        )),
59    }
60}
61
62fn rows_to_batch(
63    rows: Vec<HashMap<String, Value>>,
64    schema: SchemaRef,
65) -> Result<RecordBatch, FnError> {
66    if rows.is_empty() {
67        return Ok(RecordBatch::new_empty(schema));
68    }
69    let num_rows = rows.len();
70    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
71    for field in schema.fields() {
72        let name = field.name();
73        let values_iter = rows.iter().map(|row| row.get(name));
74        columns.push(build_typed_column(values_iter, num_rows, field.data_type()));
75    }
76    RecordBatch::try_new(schema, columns)
77        .map_err(|e| FnError::new(0x600, format!("uni.schema.*: build batch: {e}")))
78}
79
80fn single_batch_stream(schema: SchemaRef, batch: RecordBatch) -> SendableRecordBatchStream {
81    Box::pin(RecordBatchStreamAdapter::new(
82        schema,
83        stream::iter(vec![Ok(batch)]),
84    ))
85}
86
87// ---------------------------------------------------------------------------
88// uni.schema.labels
89// ---------------------------------------------------------------------------
90
/// Stateless procedure type registered as `uni.schema.labels`.
#[derive(Debug)]
struct SchemaLabelsProc;
93
94fn schema_labels_signature() -> &'static ProcedureSignature {
95    static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
96    SIG.get_or_init(|| ProcedureSignature {
97        args: vec![],
98        yields: vec![
99            Field::new("label", DataType::Utf8, true),
100            Field::new("propertyCount", DataType::Int64, true),
101            Field::new("nodeCount", DataType::Int64, true),
102            Field::new("indexCount", DataType::Int64, true),
103        ],
104        mode: ProcedureMode::Read,
105        side_effects: SideEffects::ReadOnly,
106        retry_contract: None,
107        batch_input: None,
108        docs: "List every label with property / node / index counts.".to_owned(),
109    })
110}
111
112impl ProcedurePlugin for SchemaLabelsProc {
113    fn signature(&self) -> &ProcedureSignature {
114        schema_labels_signature()
115    }
116
117    fn invoke(
118        &self,
119        ctx: ProcedureContext<'_>,
120        _args: &[ColumnarValue],
121    ) -> Result<SendableRecordBatchStream, FnError> {
122        let host = require_host(&ctx)?;
123        let storage = Arc::clone(host.storage());
124        let stream = futures::stream::once(async move {
125            let uni_schema = storage.schema_manager().schema();
126            let mut rows: Vec<HashMap<String, Value>> = Vec::new();
127            for label_name in uni_schema.labels.keys() {
128                let prop_count = uni_schema
129                    .properties
130                    .get(label_name)
131                    .map(|p| p.len() as i64)
132                    .unwrap_or(0);
133                let node_count = if let Ok(ds) = storage.vertex_dataset(label_name) {
134                    if let Ok(raw) = ds.open_raw().await {
135                        raw.count_rows(None).await.unwrap_or(0) as i64
136                    } else {
137                        0
138                    }
139                } else {
140                    0
141                };
142                let idx_count = uni_schema
143                    .indexes
144                    .iter()
145                    .filter(|i| i.label() == label_name)
146                    .count() as i64;
147                rows.push(HashMap::from([
148                    ("label".to_owned(), Value::String(label_name.clone())),
149                    ("propertyCount".to_owned(), Value::Int(prop_count)),
150                    ("nodeCount".to_owned(), Value::Int(node_count)),
151                    ("indexCount".to_owned(), Value::Int(idx_count)),
152                ]));
153            }
154            let schema = Arc::new(Schema::new(schema_labels_signature().yields.clone()));
155            rows_to_batch(rows, schema)
156                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
157        });
158        let out_schema = Arc::new(Schema::new(schema_labels_signature().yields.clone()));
159        Ok(Box::pin(RecordBatchStreamAdapter::new(out_schema, stream)))
160    }
161}
162
163// ---------------------------------------------------------------------------
164// uni.schema.edgeTypes / uni.schema.relationshipTypes
165// ---------------------------------------------------------------------------
166
/// Stateless procedure type registered as both `uni.schema.edgeTypes` and
/// `uni.schema.relationshipTypes`.
#[derive(Debug)]
struct SchemaEdgeTypesProc;
169
170fn schema_edge_types_signature() -> &'static ProcedureSignature {
171    static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
172    SIG.get_or_init(|| ProcedureSignature {
173        args: vec![],
174        yields: vec![
175            Field::new("type", DataType::Utf8, true),
176            Field::new("relationshipType", DataType::Utf8, true),
177            Field::new("sourceLabels", DataType::Utf8, true),
178            Field::new("targetLabels", DataType::Utf8, true),
179            Field::new("propertyCount", DataType::Int64, true),
180        ],
181        mode: ProcedureMode::Read,
182        side_effects: SideEffects::ReadOnly,
183        retry_contract: None,
184        batch_input: None,
185        docs: "List every edge type with source / target labels and property count.".to_owned(),
186    })
187}
188
189impl ProcedurePlugin for SchemaEdgeTypesProc {
190    fn signature(&self) -> &ProcedureSignature {
191        schema_edge_types_signature()
192    }
193
194    fn invoke(
195        &self,
196        ctx: ProcedureContext<'_>,
197        _args: &[ColumnarValue],
198    ) -> Result<SendableRecordBatchStream, FnError> {
199        let host = require_host(&ctx)?;
200        let uni_schema = host.storage().schema_manager().schema();
201        let mut rows: Vec<HashMap<String, Value>> = Vec::new();
202        for (type_name, meta) in &uni_schema.edge_types {
203            let prop_count = uni_schema
204                .properties
205                .get(type_name)
206                .map(|p| p.len() as i64)
207                .unwrap_or(0);
208            rows.push(HashMap::from([
209                ("type".to_owned(), Value::String(type_name.clone())),
210                (
211                    "relationshipType".to_owned(),
212                    Value::String(type_name.clone()),
213                ),
214                (
215                    "sourceLabels".to_owned(),
216                    Value::String(format!("{:?}", meta.src_labels)),
217                ),
218                (
219                    "targetLabels".to_owned(),
220                    Value::String(format!("{:?}", meta.dst_labels)),
221                ),
222                ("propertyCount".to_owned(), Value::Int(prop_count)),
223            ]));
224        }
225        let schema = Arc::new(Schema::new(schema_edge_types_signature().yields.clone()));
226        let batch = rows_to_batch(rows, schema.clone())?;
227        Ok(single_batch_stream(schema, batch))
228    }
229}
230
231// ---------------------------------------------------------------------------
232// uni.schema.indexes
233// ---------------------------------------------------------------------------
234
/// Stateless procedure type registered as `uni.schema.indexes`.
#[derive(Debug)]
struct SchemaIndexesProc;
237
238fn schema_indexes_signature() -> &'static ProcedureSignature {
239    static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
240    SIG.get_or_init(|| ProcedureSignature {
241        args: vec![],
242        yields: vec![
243            Field::new("state", DataType::Utf8, true),
244            Field::new("name", DataType::Utf8, true),
245            Field::new("type", DataType::Utf8, true),
246            Field::new("label", DataType::Utf8, true),
247            Field::new("properties", DataType::Utf8, true),
248        ],
249        mode: ProcedureMode::Read,
250        side_effects: SideEffects::ReadOnly,
251        retry_contract: None,
252        batch_input: None,
253        docs: "List every index (Vector / FullText / Scalar / JsonFullText / Inverted).".to_owned(),
254    })
255}
256
257impl ProcedurePlugin for SchemaIndexesProc {
258    fn signature(&self) -> &ProcedureSignature {
259        schema_indexes_signature()
260    }
261
262    fn invoke(
263        &self,
264        ctx: ProcedureContext<'_>,
265        _args: &[ColumnarValue],
266    ) -> Result<SendableRecordBatchStream, FnError> {
267        use uni_common::core::schema::IndexDefinition;
268
269        let host = require_host(&ctx)?;
270        let uni_schema = host.storage().schema_manager().schema();
271        let mut rows: Vec<HashMap<String, Value>> = Vec::new();
272        for idx in &uni_schema.indexes {
273            let (type_name, properties_json) = match idx {
274                IndexDefinition::Vector(v) => (
275                    "VECTOR",
276                    serde_json::to_string(&[&v.property]).unwrap_or_default(),
277                ),
278                IndexDefinition::FullText(f) => (
279                    "FULLTEXT",
280                    serde_json::to_string(&f.properties).unwrap_or_default(),
281                ),
282                IndexDefinition::Scalar(s) => (
283                    "SCALAR",
284                    serde_json::to_string(&s.properties).unwrap_or_default(),
285                ),
286                IndexDefinition::JsonFullText(j) => (
287                    "JSON_FTS",
288                    serde_json::to_string(&[&j.column]).unwrap_or_default(),
289                ),
290                IndexDefinition::Inverted(inv) => (
291                    "INVERTED",
292                    serde_json::to_string(&[&inv.property]).unwrap_or_default(),
293                ),
294                _ => ("UNKNOWN", String::new()),
295            };
296            rows.push(HashMap::from([
297                ("state".to_owned(), Value::String("ONLINE".to_owned())),
298                ("name".to_owned(), Value::String(idx.name().to_owned())),
299                ("type".to_owned(), Value::String(type_name.to_owned())),
300                ("label".to_owned(), Value::String(idx.label().to_owned())),
301                ("properties".to_owned(), Value::String(properties_json)),
302            ]));
303        }
304        let schema = Arc::new(Schema::new(schema_indexes_signature().yields.clone()));
305        let batch = rows_to_batch(rows, schema.clone())?;
306        Ok(single_batch_stream(schema, batch))
307    }
308}
309
310// ---------------------------------------------------------------------------
311// uni.schema.constraints
312// ---------------------------------------------------------------------------
313
/// Stateless procedure type registered as `uni.schema.constraints`.
#[derive(Debug)]
struct SchemaConstraintsProc;
316
317fn schema_constraints_signature() -> &'static ProcedureSignature {
318    static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
319    SIG.get_or_init(|| ProcedureSignature {
320        args: vec![],
321        yields: vec![
322            Field::new("name", DataType::Utf8, true),
323            Field::new("enabled", DataType::Boolean, true),
324            Field::new("type", DataType::Utf8, true),
325            Field::new("properties", DataType::Utf8, true),
326            Field::new("expression", DataType::Utf8, true),
327            Field::new("label", DataType::Utf8, true),
328            Field::new("relationshipType", DataType::Utf8, true),
329            Field::new("target", DataType::Utf8, true),
330        ],
331        mode: ProcedureMode::Read,
332        side_effects: SideEffects::ReadOnly,
333        retry_contract: None,
334        batch_input: None,
335        docs: "List every constraint (Unique / Exists / Check) per label or edge type.".to_owned(),
336    })
337}
338
339impl ProcedurePlugin for SchemaConstraintsProc {
340    fn signature(&self) -> &ProcedureSignature {
341        schema_constraints_signature()
342    }
343
344    fn invoke(
345        &self,
346        ctx: ProcedureContext<'_>,
347        _args: &[ColumnarValue],
348    ) -> Result<SendableRecordBatchStream, FnError> {
349        use uni_common::core::schema::{ConstraintTarget, ConstraintType};
350
351        let host = require_host(&ctx)?;
352        let uni_schema = host.storage().schema_manager().schema();
353        let mut rows: Vec<HashMap<String, Value>> = Vec::new();
354        for c in &uni_schema.constraints {
355            let mut row: HashMap<String, Value> = HashMap::new();
356            row.insert("name".to_owned(), Value::String(c.name.clone()));
357            row.insert("enabled".to_owned(), Value::Bool(c.enabled));
358            match &c.constraint_type {
359                ConstraintType::Unique { properties } => {
360                    row.insert("type".to_owned(), Value::String("UNIQUE".to_owned()));
361                    row.insert(
362                        "properties".to_owned(),
363                        Value::String(serde_json::to_string(&properties).unwrap_or_default()),
364                    );
365                }
366                ConstraintType::Exists { property } => {
367                    row.insert("type".to_owned(), Value::String("EXISTS".to_owned()));
368                    row.insert(
369                        "properties".to_owned(),
370                        Value::String(serde_json::to_string(&[&property]).unwrap_or_default()),
371                    );
372                }
373                ConstraintType::Check { expression } => {
374                    row.insert("type".to_owned(), Value::String("CHECK".to_owned()));
375                    row.insert("expression".to_owned(), Value::String(expression.clone()));
376                }
377                _ => {
378                    row.insert("type".to_owned(), Value::String("UNKNOWN".to_owned()));
379                }
380            }
381            match &c.target {
382                ConstraintTarget::Label(l) => {
383                    row.insert("label".to_owned(), Value::String(l.clone()));
384                }
385                ConstraintTarget::EdgeType(t) => {
386                    row.insert("relationshipType".to_owned(), Value::String(t.clone()));
387                }
388                _ => {
389                    row.insert("target".to_owned(), Value::String("UNKNOWN".to_owned()));
390                }
391            }
392            rows.push(row);
393        }
394        let schema = Arc::new(Schema::new(schema_constraints_signature().yields.clone()));
395        let batch = rows_to_batch(rows, schema.clone())?;
396        Ok(single_batch_stream(schema, batch))
397    }
398}
399
400// ---------------------------------------------------------------------------
401// uni.schema.labelInfo
402// ---------------------------------------------------------------------------
403
/// Stateless procedure type registered as `uni.schema.labelInfo`.
#[derive(Debug)]
struct SchemaLabelInfoProc;
406
407fn schema_label_info_signature() -> &'static ProcedureSignature {
408    static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
409    SIG.get_or_init(|| ProcedureSignature {
410        args: vec![NamedArgType {
411            name: smol_str::SmolStr::new("label"),
412            ty: ArgType::Primitive(DataType::Utf8),
413            default: None,
414            doc: "Label name to introspect.".to_owned(),
415        }],
416        yields: vec![
417            Field::new("property", DataType::Utf8, true),
418            Field::new("dataType", DataType::Utf8, true),
419            Field::new("nullable", DataType::Boolean, true),
420            Field::new("indexed", DataType::Boolean, true),
421            Field::new("unique", DataType::Boolean, true),
422        ],
423        mode: ProcedureMode::Read,
424        side_effects: SideEffects::ReadOnly,
425        retry_contract: None,
426        batch_input: None,
427        docs: "Per-property metadata (type, nullable, indexed, unique) for a given label."
428            .to_owned(),
429    })
430}
431
432impl ProcedurePlugin for SchemaLabelInfoProc {
433    fn signature(&self) -> &ProcedureSignature {
434        schema_label_info_signature()
435    }
436
437    fn invoke(
438        &self,
439        ctx: ProcedureContext<'_>,
440        args: &[ColumnarValue],
441    ) -> Result<SendableRecordBatchStream, FnError> {
442        use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
443
444        let host = require_host(&ctx)?;
445        let label_name = require_string_arg(args, 0, "label")?;
446        let uni_schema = host.storage().schema_manager().schema();
447
448        let mut rows: Vec<HashMap<String, Value>> = Vec::new();
449        if let Some(props) = uni_schema.properties.get(&label_name) {
450            for (prop_name, prop_meta) in props {
451                let is_indexed = uni_schema.indexes.iter().any(|idx| match idx {
452                    IndexDefinition::Vector(v) => v.label == label_name && v.property == *prop_name,
453                    IndexDefinition::Scalar(s) => {
454                        s.label == label_name && s.properties.contains(prop_name)
455                    }
456                    IndexDefinition::FullText(f) => {
457                        f.label == label_name && f.properties.contains(prop_name)
458                    }
459                    IndexDefinition::Inverted(inv) => {
460                        inv.label == label_name && inv.property == *prop_name
461                    }
462                    IndexDefinition::JsonFullText(j) => j.label == label_name,
463                    _ => false,
464                });
465                let unique = uni_schema.constraints.iter().any(|c| {
466                    if let ConstraintTarget::Label(l) = &c.target
467                        && l == &label_name
468                        && c.enabled
469                        && let ConstraintType::Unique { properties } = &c.constraint_type
470                    {
471                        return properties.contains(prop_name);
472                    }
473                    false
474                });
475                rows.push(HashMap::from([
476                    ("property".to_owned(), Value::String(prop_name.clone())),
477                    (
478                        "dataType".to_owned(),
479                        Value::String(format!("{:?}", prop_meta.r#type)),
480                    ),
481                    ("nullable".to_owned(), Value::Bool(prop_meta.nullable)),
482                    ("indexed".to_owned(), Value::Bool(is_indexed)),
483                    ("unique".to_owned(), Value::Bool(unique)),
484                ]));
485            }
486        }
487        let schema = Arc::new(Schema::new(schema_label_info_signature().yields.clone()));
488        let batch = rows_to_batch(rows, schema.clone())?;
489        Ok(single_batch_stream(schema, batch))
490    }
491}
492
493// ---------------------------------------------------------------------------
494// Registration
495// ---------------------------------------------------------------------------
496
497/// Register every `uni.schema.*` procedure into `r`. Edge types
498/// register under both `uni.schema.edgeTypes` and
499/// `uni.schema.relationshipTypes` for backward compatibility.
500///
501/// # Errors
502///
503/// Returns [`PluginError::DuplicateRegistration`] if a qname is taken.
504pub fn register_into(r: &mut PluginRegistrar<'_>) -> Result<(), PluginError> {
505    r.procedure(
506        QName::new("uni", "schema.labels"),
507        schema_labels_signature().clone(),
508        Arc::new(SchemaLabelsProc),
509    )?;
510    let edge_types_impl: Arc<dyn ProcedurePlugin> = Arc::new(SchemaEdgeTypesProc);
511    r.procedure(
512        QName::new("uni", "schema.edgeTypes"),
513        schema_edge_types_signature().clone(),
514        Arc::clone(&edge_types_impl),
515    )?;
516    r.procedure(
517        QName::new("uni", "schema.relationshipTypes"),
518        schema_edge_types_signature().clone(),
519        edge_types_impl,
520    )?;
521    r.procedure(
522        QName::new("uni", "schema.indexes"),
523        schema_indexes_signature().clone(),
524        Arc::new(SchemaIndexesProc),
525    )?;
526    r.procedure(
527        QName::new("uni", "schema.constraints"),
528        schema_constraints_signature().clone(),
529        Arc::new(SchemaConstraintsProc),
530    )?;
531    r.procedure(
532        QName::new("uni", "schema.labelInfo"),
533        schema_label_info_signature().clone(),
534        Arc::new(SchemaLabelInfoProc),
535    )?;
536    Ok(())
537}