1use std::collections::HashMap;
14use std::sync::Arc;
15use std::sync::OnceLock;
16
17use arrow_array::{ArrayRef, RecordBatch};
18use arrow_schema::{DataType, Field, Schema, SchemaRef};
19use datafusion::execution::SendableRecordBatchStream;
20use datafusion::logical_expr::ColumnarValue;
21use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
22use futures::stream;
23use uni_common::Value;
24use uni_plugin::traits::procedure::{
25 NamedArgType, ProcedureContext, ProcedureMode, ProcedurePlugin, ProcedureSignature,
26};
27use uni_plugin::traits::scalar::ArgType;
28use uni_plugin::{FnError, PluginError, PluginRegistrar, QName, SideEffects};
29
30use crate::query::df_graph::procedure_call::build_typed_column;
31use crate::query::executor::procedure_host::QueryProcedureHost;
32
33fn require_host<'a>(ctx: &'a ProcedureContext<'_>) -> Result<&'a QueryProcedureHost, FnError> {
40 ctx.host
41 .and_then(|h| h.as_any().downcast_ref::<QueryProcedureHost>())
42 .ok_or_else(|| {
43 FnError::new(
44 0x701,
45 "uni.schema.*: requires QueryProcedureHost (host not bound on ProcedureContext)",
46 )
47 })
48}
49
50fn require_string_arg(args: &[ColumnarValue], index: usize, name: &str) -> Result<String, FnError> {
51 use datafusion::scalar::ScalarValue;
52 match args.get(index) {
53 Some(ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)))) => Ok(s.clone()),
54 Some(ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s)))) => Ok(s.clone()),
55 _ => Err(FnError::new(
56 FnError::CODE_TYPE_COERCION,
57 format!("uni.schema.*: {name} (arg #{index}) must be a non-null string"),
58 )),
59 }
60}
61
62fn rows_to_batch(
63 rows: Vec<HashMap<String, Value>>,
64 schema: SchemaRef,
65) -> Result<RecordBatch, FnError> {
66 if rows.is_empty() {
67 return Ok(RecordBatch::new_empty(schema));
68 }
69 let num_rows = rows.len();
70 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
71 for field in schema.fields() {
72 let name = field.name();
73 let values_iter = rows.iter().map(|row| row.get(name));
74 columns.push(build_typed_column(values_iter, num_rows, field.data_type()));
75 }
76 RecordBatch::try_new(schema, columns)
77 .map_err(|e| FnError::new(0x600, format!("uni.schema.*: build batch: {e}")))
78}
79
80fn single_batch_stream(schema: SchemaRef, batch: RecordBatch) -> SendableRecordBatchStream {
81 Box::pin(RecordBatchStreamAdapter::new(
82 schema,
83 stream::iter(vec![Ok(batch)]),
84 ))
85}
86
87#[derive(Debug)]
92struct SchemaLabelsProc;
93
94fn schema_labels_signature() -> &'static ProcedureSignature {
95 static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
96 SIG.get_or_init(|| ProcedureSignature {
97 args: vec![],
98 yields: vec![
99 Field::new("label", DataType::Utf8, true),
100 Field::new("propertyCount", DataType::Int64, true),
101 Field::new("nodeCount", DataType::Int64, true),
102 Field::new("indexCount", DataType::Int64, true),
103 ],
104 mode: ProcedureMode::Read,
105 side_effects: SideEffects::ReadOnly,
106 retry_contract: None,
107 batch_input: None,
108 docs: "List every label with property / node / index counts.".to_owned(),
109 })
110}
111
112impl ProcedurePlugin for SchemaLabelsProc {
113 fn signature(&self) -> &ProcedureSignature {
114 schema_labels_signature()
115 }
116
117 fn invoke(
118 &self,
119 ctx: ProcedureContext<'_>,
120 _args: &[ColumnarValue],
121 ) -> Result<SendableRecordBatchStream, FnError> {
122 let host = require_host(&ctx)?;
123 let storage = Arc::clone(host.storage());
124 let stream = futures::stream::once(async move {
125 let uni_schema = storage.schema_manager().schema();
126 let mut rows: Vec<HashMap<String, Value>> = Vec::new();
127 for label_name in uni_schema.labels.keys() {
128 let prop_count = uni_schema
129 .properties
130 .get(label_name)
131 .map(|p| p.len() as i64)
132 .unwrap_or(0);
133 let node_count = if let Ok(ds) = storage.vertex_dataset(label_name) {
134 if let Ok(raw) = ds.open_raw().await {
135 raw.count_rows(None).await.unwrap_or(0) as i64
136 } else {
137 0
138 }
139 } else {
140 0
141 };
142 let idx_count = uni_schema
143 .indexes
144 .iter()
145 .filter(|i| i.label() == label_name)
146 .count() as i64;
147 rows.push(HashMap::from([
148 ("label".to_owned(), Value::String(label_name.clone())),
149 ("propertyCount".to_owned(), Value::Int(prop_count)),
150 ("nodeCount".to_owned(), Value::Int(node_count)),
151 ("indexCount".to_owned(), Value::Int(idx_count)),
152 ]));
153 }
154 let schema = Arc::new(Schema::new(schema_labels_signature().yields.clone()));
155 rows_to_batch(rows, schema)
156 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))
157 });
158 let out_schema = Arc::new(Schema::new(schema_labels_signature().yields.clone()));
159 Ok(Box::pin(RecordBatchStreamAdapter::new(out_schema, stream)))
160 }
161}
162
163#[derive(Debug)]
168struct SchemaEdgeTypesProc;
169
170fn schema_edge_types_signature() -> &'static ProcedureSignature {
171 static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
172 SIG.get_or_init(|| ProcedureSignature {
173 args: vec![],
174 yields: vec![
175 Field::new("type", DataType::Utf8, true),
176 Field::new("relationshipType", DataType::Utf8, true),
177 Field::new("sourceLabels", DataType::Utf8, true),
178 Field::new("targetLabels", DataType::Utf8, true),
179 Field::new("propertyCount", DataType::Int64, true),
180 ],
181 mode: ProcedureMode::Read,
182 side_effects: SideEffects::ReadOnly,
183 retry_contract: None,
184 batch_input: None,
185 docs: "List every edge type with source / target labels and property count.".to_owned(),
186 })
187}
188
189impl ProcedurePlugin for SchemaEdgeTypesProc {
190 fn signature(&self) -> &ProcedureSignature {
191 schema_edge_types_signature()
192 }
193
194 fn invoke(
195 &self,
196 ctx: ProcedureContext<'_>,
197 _args: &[ColumnarValue],
198 ) -> Result<SendableRecordBatchStream, FnError> {
199 let host = require_host(&ctx)?;
200 let uni_schema = host.storage().schema_manager().schema();
201 let mut rows: Vec<HashMap<String, Value>> = Vec::new();
202 for (type_name, meta) in &uni_schema.edge_types {
203 let prop_count = uni_schema
204 .properties
205 .get(type_name)
206 .map(|p| p.len() as i64)
207 .unwrap_or(0);
208 rows.push(HashMap::from([
209 ("type".to_owned(), Value::String(type_name.clone())),
210 (
211 "relationshipType".to_owned(),
212 Value::String(type_name.clone()),
213 ),
214 (
215 "sourceLabels".to_owned(),
216 Value::String(format!("{:?}", meta.src_labels)),
217 ),
218 (
219 "targetLabels".to_owned(),
220 Value::String(format!("{:?}", meta.dst_labels)),
221 ),
222 ("propertyCount".to_owned(), Value::Int(prop_count)),
223 ]));
224 }
225 let schema = Arc::new(Schema::new(schema_edge_types_signature().yields.clone()));
226 let batch = rows_to_batch(rows, schema.clone())?;
227 Ok(single_batch_stream(schema, batch))
228 }
229}
230
231#[derive(Debug)]
236struct SchemaIndexesProc;
237
238fn schema_indexes_signature() -> &'static ProcedureSignature {
239 static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
240 SIG.get_or_init(|| ProcedureSignature {
241 args: vec![],
242 yields: vec![
243 Field::new("state", DataType::Utf8, true),
244 Field::new("name", DataType::Utf8, true),
245 Field::new("type", DataType::Utf8, true),
246 Field::new("label", DataType::Utf8, true),
247 Field::new("properties", DataType::Utf8, true),
248 ],
249 mode: ProcedureMode::Read,
250 side_effects: SideEffects::ReadOnly,
251 retry_contract: None,
252 batch_input: None,
253 docs: "List every index (Vector / FullText / Scalar / JsonFullText / Inverted).".to_owned(),
254 })
255}
256
257impl ProcedurePlugin for SchemaIndexesProc {
258 fn signature(&self) -> &ProcedureSignature {
259 schema_indexes_signature()
260 }
261
262 fn invoke(
263 &self,
264 ctx: ProcedureContext<'_>,
265 _args: &[ColumnarValue],
266 ) -> Result<SendableRecordBatchStream, FnError> {
267 use uni_common::core::schema::IndexDefinition;
268
269 let host = require_host(&ctx)?;
270 let uni_schema = host.storage().schema_manager().schema();
271 let mut rows: Vec<HashMap<String, Value>> = Vec::new();
272 for idx in &uni_schema.indexes {
273 let (type_name, properties_json) = match idx {
274 IndexDefinition::Vector(v) => (
275 "VECTOR",
276 serde_json::to_string(&[&v.property]).unwrap_or_default(),
277 ),
278 IndexDefinition::FullText(f) => (
279 "FULLTEXT",
280 serde_json::to_string(&f.properties).unwrap_or_default(),
281 ),
282 IndexDefinition::Scalar(s) => (
283 "SCALAR",
284 serde_json::to_string(&s.properties).unwrap_or_default(),
285 ),
286 IndexDefinition::JsonFullText(j) => (
287 "JSON_FTS",
288 serde_json::to_string(&[&j.column]).unwrap_or_default(),
289 ),
290 IndexDefinition::Inverted(inv) => (
291 "INVERTED",
292 serde_json::to_string(&[&inv.property]).unwrap_or_default(),
293 ),
294 _ => ("UNKNOWN", String::new()),
295 };
296 rows.push(HashMap::from([
297 ("state".to_owned(), Value::String("ONLINE".to_owned())),
298 ("name".to_owned(), Value::String(idx.name().to_owned())),
299 ("type".to_owned(), Value::String(type_name.to_owned())),
300 ("label".to_owned(), Value::String(idx.label().to_owned())),
301 ("properties".to_owned(), Value::String(properties_json)),
302 ]));
303 }
304 let schema = Arc::new(Schema::new(schema_indexes_signature().yields.clone()));
305 let batch = rows_to_batch(rows, schema.clone())?;
306 Ok(single_batch_stream(schema, batch))
307 }
308}
309
310#[derive(Debug)]
315struct SchemaConstraintsProc;
316
317fn schema_constraints_signature() -> &'static ProcedureSignature {
318 static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
319 SIG.get_or_init(|| ProcedureSignature {
320 args: vec![],
321 yields: vec![
322 Field::new("name", DataType::Utf8, true),
323 Field::new("enabled", DataType::Boolean, true),
324 Field::new("type", DataType::Utf8, true),
325 Field::new("properties", DataType::Utf8, true),
326 Field::new("expression", DataType::Utf8, true),
327 Field::new("label", DataType::Utf8, true),
328 Field::new("relationshipType", DataType::Utf8, true),
329 Field::new("target", DataType::Utf8, true),
330 ],
331 mode: ProcedureMode::Read,
332 side_effects: SideEffects::ReadOnly,
333 retry_contract: None,
334 batch_input: None,
335 docs: "List every constraint (Unique / Exists / Check) per label or edge type.".to_owned(),
336 })
337}
338
339impl ProcedurePlugin for SchemaConstraintsProc {
340 fn signature(&self) -> &ProcedureSignature {
341 schema_constraints_signature()
342 }
343
344 fn invoke(
345 &self,
346 ctx: ProcedureContext<'_>,
347 _args: &[ColumnarValue],
348 ) -> Result<SendableRecordBatchStream, FnError> {
349 use uni_common::core::schema::{ConstraintTarget, ConstraintType};
350
351 let host = require_host(&ctx)?;
352 let uni_schema = host.storage().schema_manager().schema();
353 let mut rows: Vec<HashMap<String, Value>> = Vec::new();
354 for c in &uni_schema.constraints {
355 let mut row: HashMap<String, Value> = HashMap::new();
356 row.insert("name".to_owned(), Value::String(c.name.clone()));
357 row.insert("enabled".to_owned(), Value::Bool(c.enabled));
358 match &c.constraint_type {
359 ConstraintType::Unique { properties } => {
360 row.insert("type".to_owned(), Value::String("UNIQUE".to_owned()));
361 row.insert(
362 "properties".to_owned(),
363 Value::String(serde_json::to_string(&properties).unwrap_or_default()),
364 );
365 }
366 ConstraintType::Exists { property } => {
367 row.insert("type".to_owned(), Value::String("EXISTS".to_owned()));
368 row.insert(
369 "properties".to_owned(),
370 Value::String(serde_json::to_string(&[&property]).unwrap_or_default()),
371 );
372 }
373 ConstraintType::Check { expression } => {
374 row.insert("type".to_owned(), Value::String("CHECK".to_owned()));
375 row.insert("expression".to_owned(), Value::String(expression.clone()));
376 }
377 _ => {
378 row.insert("type".to_owned(), Value::String("UNKNOWN".to_owned()));
379 }
380 }
381 match &c.target {
382 ConstraintTarget::Label(l) => {
383 row.insert("label".to_owned(), Value::String(l.clone()));
384 }
385 ConstraintTarget::EdgeType(t) => {
386 row.insert("relationshipType".to_owned(), Value::String(t.clone()));
387 }
388 _ => {
389 row.insert("target".to_owned(), Value::String("UNKNOWN".to_owned()));
390 }
391 }
392 rows.push(row);
393 }
394 let schema = Arc::new(Schema::new(schema_constraints_signature().yields.clone()));
395 let batch = rows_to_batch(rows, schema.clone())?;
396 Ok(single_batch_stream(schema, batch))
397 }
398}
399
400#[derive(Debug)]
405struct SchemaLabelInfoProc;
406
407fn schema_label_info_signature() -> &'static ProcedureSignature {
408 static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
409 SIG.get_or_init(|| ProcedureSignature {
410 args: vec![NamedArgType {
411 name: smol_str::SmolStr::new("label"),
412 ty: ArgType::Primitive(DataType::Utf8),
413 default: None,
414 doc: "Label name to introspect.".to_owned(),
415 }],
416 yields: vec![
417 Field::new("property", DataType::Utf8, true),
418 Field::new("dataType", DataType::Utf8, true),
419 Field::new("nullable", DataType::Boolean, true),
420 Field::new("indexed", DataType::Boolean, true),
421 Field::new("unique", DataType::Boolean, true),
422 ],
423 mode: ProcedureMode::Read,
424 side_effects: SideEffects::ReadOnly,
425 retry_contract: None,
426 batch_input: None,
427 docs: "Per-property metadata (type, nullable, indexed, unique) for a given label."
428 .to_owned(),
429 })
430}
431
432impl ProcedurePlugin for SchemaLabelInfoProc {
433 fn signature(&self) -> &ProcedureSignature {
434 schema_label_info_signature()
435 }
436
437 fn invoke(
438 &self,
439 ctx: ProcedureContext<'_>,
440 args: &[ColumnarValue],
441 ) -> Result<SendableRecordBatchStream, FnError> {
442 use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
443
444 let host = require_host(&ctx)?;
445 let label_name = require_string_arg(args, 0, "label")?;
446 let uni_schema = host.storage().schema_manager().schema();
447
448 let mut rows: Vec<HashMap<String, Value>> = Vec::new();
449 if let Some(props) = uni_schema.properties.get(&label_name) {
450 for (prop_name, prop_meta) in props {
451 let is_indexed = uni_schema.indexes.iter().any(|idx| match idx {
452 IndexDefinition::Vector(v) => v.label == label_name && v.property == *prop_name,
453 IndexDefinition::Scalar(s) => {
454 s.label == label_name && s.properties.contains(prop_name)
455 }
456 IndexDefinition::FullText(f) => {
457 f.label == label_name && f.properties.contains(prop_name)
458 }
459 IndexDefinition::Inverted(inv) => {
460 inv.label == label_name && inv.property == *prop_name
461 }
462 IndexDefinition::JsonFullText(j) => j.label == label_name,
463 _ => false,
464 });
465 let unique = uni_schema.constraints.iter().any(|c| {
466 if let ConstraintTarget::Label(l) = &c.target
467 && l == &label_name
468 && c.enabled
469 && let ConstraintType::Unique { properties } = &c.constraint_type
470 {
471 return properties.contains(prop_name);
472 }
473 false
474 });
475 rows.push(HashMap::from([
476 ("property".to_owned(), Value::String(prop_name.clone())),
477 (
478 "dataType".to_owned(),
479 Value::String(format!("{:?}", prop_meta.r#type)),
480 ),
481 ("nullable".to_owned(), Value::Bool(prop_meta.nullable)),
482 ("indexed".to_owned(), Value::Bool(is_indexed)),
483 ("unique".to_owned(), Value::Bool(unique)),
484 ]));
485 }
486 }
487 let schema = Arc::new(Schema::new(schema_label_info_signature().yields.clone()));
488 let batch = rows_to_batch(rows, schema.clone())?;
489 Ok(single_batch_stream(schema, batch))
490 }
491}
492
493pub fn register_into(r: &mut PluginRegistrar<'_>) -> Result<(), PluginError> {
505 r.procedure(
506 QName::new("uni", "schema.labels"),
507 schema_labels_signature().clone(),
508 Arc::new(SchemaLabelsProc),
509 )?;
510 let edge_types_impl: Arc<dyn ProcedurePlugin> = Arc::new(SchemaEdgeTypesProc);
511 r.procedure(
512 QName::new("uni", "schema.edgeTypes"),
513 schema_edge_types_signature().clone(),
514 Arc::clone(&edge_types_impl),
515 )?;
516 r.procedure(
517 QName::new("uni", "schema.relationshipTypes"),
518 schema_edge_types_signature().clone(),
519 edge_types_impl,
520 )?;
521 r.procedure(
522 QName::new("uni", "schema.indexes"),
523 schema_indexes_signature().clone(),
524 Arc::new(SchemaIndexesProc),
525 )?;
526 r.procedure(
527 QName::new("uni", "schema.constraints"),
528 schema_constraints_signature().clone(),
529 Arc::new(SchemaConstraintsProc),
530 )?;
531 r.procedure(
532 QName::new("uni", "schema.labelInfo"),
533 schema_label_info_signature().clone(),
534 Arc::new(SchemaLabelInfoProc),
535 )?;
536 Ok(())
537}