Skip to main content

uni_query/procedures_plugin/
graph.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! `uni.graph.{project, drop, list, exists}` procedures — named
5//! graph-projection lifecycle (M5c.4 / proposal §4.10.3).
6//!
7//! Projections live in the per-`StorageManager` [`crate::projection_store::ProjectionStore`]
8//! (see `crates/uni-query/src/projection_store.rs`). v1 is in-memory
9//! only — restart clears every projection — and the only eviction
10//! mechanism is `uni.graph.drop`.
11//!
12//! The procedures live in `uni-query` (not `uni-plugin-builtin`)
13//! because they need to call `QueryProcedureHost::execute_inner_query`
//! and `crate::projection_store::for_storage`, both of which are
//! `uni-query` items that `uni-plugin-builtin` cannot reach without an
//! inverted dependency.
17
18use std::sync::{Arc, OnceLock};
19use std::time::SystemTime;
20
21use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, StringArray};
22use arrow_schema::{DataType, Field, Schema, SchemaRef};
23use datafusion::execution::SendableRecordBatchStream;
24use datafusion::logical_expr::ColumnarValue;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use datafusion::scalar::ScalarValue;
27use uni_algo::{ProjectionInput, parse_graph_ref};
28use uni_plugin::traits::procedure::{
29    NamedArgType, ProcedureContext, ProcedureMode, ProcedurePlugin, ProcedureSignature,
30};
31use uni_plugin::traits::scalar::ArgType;
32use uni_plugin::{FnError, PluginError, PluginRegistrar, QName, SideEffects};
33
34use crate::projection_store::{ProjectionEntry, ProjectionSourceKind, estimate_bytes, for_storage};
35use crate::query::executor::procedure_host::QueryProcedureHost;
36
37// Rust guideline compliant
38
39/// Register every `uni.graph.*` procedure into `r`.
40///
41/// # Errors
42///
43/// Propagates [`PluginError::DuplicateRegistration`] if any qname is
44/// already taken in the underlying plugin registry.
45pub fn register_into(r: &mut PluginRegistrar<'_>) -> Result<(), PluginError> {
46    r.procedure(
47        QName::new("uni", "graph.project"),
48        ProjectProcedure::signature_static().clone(),
49        Arc::new(ProjectProcedure),
50    )?;
51    r.procedure(
52        QName::new("uni", "graph.drop"),
53        DropProcedure::signature_static().clone(),
54        Arc::new(DropProcedure),
55    )?;
56    r.procedure(
57        QName::new("uni", "graph.list"),
58        ListProcedure::signature_static().clone(),
59        Arc::new(ListProcedure),
60    )?;
61    r.procedure(
62        QName::new("uni", "graph.exists"),
63        ExistsProcedure::signature_static().clone(),
64        Arc::new(ExistsProcedure),
65    )?;
66    Ok(())
67}
68
69// ─────────────────────────── helpers ──────────────────────────────
70
71fn require_host<'a>(ctx: &ProcedureContext<'a>) -> Result<&'a QueryProcedureHost, FnError> {
72    ctx.host
73        .and_then(|h| h.as_any().downcast_ref::<QueryProcedureHost>())
74        .ok_or_else(|| FnError::new(0x701, "uni.graph.*: requires QueryProcedureHost"))
75}
76
77/// Decode a positional arg into a `serde_json::Value`. Mirrors the
78/// algo adapter's decoder (LargeBinary → JSON for Map / List; scalars
79/// pass through).
80fn arg_to_json(cv: &ColumnarValue) -> serde_json::Value {
81    match cv {
82        ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(b)))
83        | ColumnarValue::Scalar(ScalarValue::Binary(Some(b))) => {
84            serde_json::from_slice::<serde_json::Value>(b).unwrap_or(serde_json::Value::Null)
85        }
86        ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)))
87        | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => {
88            serde_json::Value::String(s.clone())
89        }
90        ColumnarValue::Scalar(ScalarValue::Boolean(Some(b))) => serde_json::Value::Bool(*b),
91        ColumnarValue::Scalar(ScalarValue::Int64(Some(i))) => {
92            serde_json::Value::Number((*i).into())
93        }
94        _ => serde_json::Value::Null,
95    }
96}
97
98fn arg_as_string(cv: &ColumnarValue) -> Option<String> {
99    match cv {
100        ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)))
101        | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => Some(s.clone()),
102        _ => None,
103    }
104}
105
106fn one_row_stream(
107    schema: SchemaRef,
108    cols: Vec<ArrayRef>,
109) -> Result<SendableRecordBatchStream, FnError> {
110    let batch = RecordBatch::try_new(Arc::clone(&schema), cols)
111        .map_err(|e| FnError::new(0x830, format!("RecordBatch build: {e}")))?;
112    let stream =
113        futures::stream::once(async move { Ok::<_, datafusion::error::DataFusionError>(batch) });
114    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
115}
116
117// ─────────────────────────── uni.graph.project ──────────────────────────────
118
119/// `uni.graph.project(name, graphRef, config) -> (name, node_count,
120/// edge_count, bytes)`.
121///
122/// Materialises the projection described by `graphRef` (Native or
123/// Cypher; Named is rejected — no projection-of-a-projection) and
124/// stores it under `name` in the per-`StorageManager` projection
125/// store. Duplicate names error with `FnError 0x824`.
126#[derive(Debug)]
127pub struct ProjectProcedure;
128
129impl ProjectProcedure {
130    fn signature_static() -> &'static ProcedureSignature {
131        static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
132        SIG.get_or_init(|| ProcedureSignature {
133            args: vec![
134                NamedArgType {
135                    name: smol_str::SmolStr::new("name"),
136                    ty: ArgType::Primitive(DataType::Utf8),
137                    default: None,
138                    doc: "Name to register the materialised projection under.".to_owned(),
139                },
140                NamedArgType {
141                    name: smol_str::SmolStr::new("graphRef"),
142                    ty: ArgType::Primitive(DataType::LargeBinary),
143                    default: None,
144                    doc: "Native or Cypher projection descriptor (Map).".to_owned(),
145                },
146                NamedArgType {
147                    name: smol_str::SmolStr::new("config"),
148                    ty: ArgType::Primitive(DataType::LargeBinary),
149                    default: Some(ScalarValue::LargeBinary(Some(b"{}".to_vec()))),
150                    doc: "Materialisation options (currently unused).".to_owned(),
151                },
152            ],
153            yields: vec![
154                Field::new("name", DataType::Utf8, false),
155                Field::new("node_count", DataType::Int64, false),
156                Field::new("edge_count", DataType::Int64, false),
157                Field::new("bytes", DataType::Int64, false),
158            ],
159            mode: ProcedureMode::Read, // store mutation is in-memory, no graph-write
160            side_effects: SideEffects::ReadOnly,
161            retry_contract: None,
162            batch_input: None,
163            docs: "uni.graph.project(name, graphRef, config) — materialise \
164                   a named graph projection from a Native or Cypher graphRef \
165                   (no Named-of-Named). v1: in-memory, restart-clears."
166                .to_owned(),
167        })
168    }
169}
170
171impl ProcedurePlugin for ProjectProcedure {
172    fn signature(&self) -> &ProcedureSignature {
173        Self::signature_static()
174    }
175
176    fn invoke(
177        &self,
178        ctx: ProcedureContext<'_>,
179        args: &[ColumnarValue],
180    ) -> Result<SendableRecordBatchStream, FnError> {
181        let host = require_host(&ctx)?.clone();
182        let name = args
183            .first()
184            .and_then(arg_as_string)
185            .ok_or_else(|| FnError::new(0x824, "uni.graph.project: name (String) required"))?;
186        let graph_ref = args
187            .get(1)
188            .map(arg_to_json)
189            .ok_or_else(|| FnError::new(0x824, "uni.graph.project: graphRef (Map) required"))?;
190        let projection_input = parse_graph_ref(&graph_ref)
191            .map_err(|e| FnError::new(0x820, format!("graphRef parse: {e}")))?;
192
193        // Pre-check duplicates eagerly so the caller sees the error
194        // synchronously (matches `uni.graph.exists` ordering); the
195        // actual materialisation work is async and runs inside the
196        // result stream.
197        let store = for_storage(host.storage());
198        if store.contains(&name) {
199            return Err(FnError::new(
200                0x824,
201                format!("uni.graph.project: projection `{name}` already exists; drop first"),
202            ));
203        }
204        if let ProjectionInput::Named { .. } = &projection_input {
205            return Err(FnError::new(
206                0x824,
207                "uni.graph.project: graphRef cannot itself be Named \
208                 (no projection-of-a-projection in v1)",
209            ));
210        }
211
212        let schema: SchemaRef = Arc::new(Schema::new(Self::signature_static().yields.clone()));
213        let name_for_async = name.clone();
214        let store_for_async = Arc::clone(&store);
215
216        let schema_in_fut = Arc::clone(&schema);
217        let fut = async move {
218            let (projection, source_kind) = match projection_input {
219                ProjectionInput::Native {
220                    node_labels,
221                    edge_types,
222                    weight_property,
223                    include_reverse,
224                } => {
225                    let storage = Arc::clone(host.storage());
226                    let l0 = build_l0_manager(&host);
227                    let mut builder = uni_algo::ProjectionBuilder::new(storage)
228                        .l0_manager(l0)
229                        .node_labels(&node_labels.iter().map(String::as_str).collect::<Vec<_>>())
230                        .edge_types(&edge_types.iter().map(String::as_str).collect::<Vec<_>>())
231                        .include_reverse(include_reverse);
232                    if let Some(wp) = weight_property {
233                        builder = builder.weight_property(&wp);
234                    }
235                    let projection = builder.build().await.map_err(|e| {
236                        datafusion::error::DataFusionError::Execution(format!(
237                            "uni.graph.project (Native): {e}"
238                        ))
239                    })?;
240                    (projection, ProjectionSourceKind::Native)
241                }
242                ProjectionInput::Cypher {
243                    node_query,
244                    edge_query,
245                    weight_column,
246                    include_reverse,
247                } => {
248                    let inner_params = std::collections::HashMap::new();
249                    let node_rows = host
250                        .execute_inner_query(
251                            &node_query,
252                            &inner_params,
253                            uni_plugin::traits::procedure::ProcedureMode::Read,
254                        )
255                        .await
256                        .map_err(|e| {
257                            datafusion::error::DataFusionError::Execution(format!(
258                                "uni.graph.project node query: {e}"
259                            ))
260                        })?;
261                    let edge_rows = host
262                        .execute_inner_query(
263                            &edge_query,
264                            &inner_params,
265                            uni_plugin::traits::procedure::ProcedureMode::Read,
266                        )
267                        .await
268                        .map_err(|e| {
269                            datafusion::error::DataFusionError::Execution(format!(
270                                "uni.graph.project edge query: {e}"
271                            ))
272                        })?;
273                    let projection = uni_algo::algo::projection::GraphProjection::from_rows(
274                        &node_rows,
275                        &edge_rows,
276                        weight_column.as_deref(),
277                        include_reverse,
278                    )
279                    .map_err(|e| {
280                        datafusion::error::DataFusionError::Execution(format!(
281                            "uni.graph.project (Cypher): {e}"
282                        ))
283                    })?;
284                    (projection, ProjectionSourceKind::Cypher)
285                }
286                ProjectionInput::Named { .. } => unreachable!("filtered above"),
287            };
288            let node_count = projection.vertex_count();
289            let edge_count = projection.edge_count();
290            let bytes = estimate_bytes(&projection);
291            let entry = ProjectionEntry {
292                projection: Arc::new(projection),
293                node_count,
294                edge_count,
295                bytes,
296                created_at: SystemTime::now(),
297                source_kind,
298            };
299            store_for_async
300                .insert(name_for_async.clone(), entry)
301                .map_err(|n| {
302                    datafusion::error::DataFusionError::Execution(format!(
303                        "uni.graph.project: projection `{n}` already exists"
304                    ))
305                })?;
306
307            let cols: Vec<ArrayRef> = vec![
308                Arc::new(StringArray::from(vec![name_for_async])),
309                Arc::new(Int64Array::from(vec![node_count as i64])),
310                Arc::new(Int64Array::from(vec![edge_count as i64])),
311                Arc::new(Int64Array::from(vec![bytes as i64])),
312            ];
313            RecordBatch::try_new(schema_in_fut, cols).map_err(|e| {
314                datafusion::error::DataFusionError::Execution(format!("RecordBatch: {e}"))
315            })
316        };
317        let stream = futures::stream::once(fut);
318        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
319    }
320}
321
322// ─────────────────────────── uni.graph.drop ──────────────────────────────
323
324/// `uni.graph.drop(name) -> (dropped)`. Returns `false` when no
325/// projection by that name existed (not an error).
326#[derive(Debug)]
327pub struct DropProcedure;
328
329impl DropProcedure {
330    fn signature_static() -> &'static ProcedureSignature {
331        static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
332        SIG.get_or_init(|| ProcedureSignature {
333            args: vec![NamedArgType {
334                name: smol_str::SmolStr::new("name"),
335                ty: ArgType::Primitive(DataType::Utf8),
336                default: None,
337                doc: "Projection name to evict from the store.".to_owned(),
338            }],
339            yields: vec![Field::new("dropped", DataType::Boolean, false)],
340            mode: ProcedureMode::Read,
341            side_effects: SideEffects::ReadOnly,
342            retry_contract: None,
343            batch_input: None,
344            docs: "uni.graph.drop(name) — remove a named projection. Returns \
345                   `false` if no projection by that name existed."
346                .to_owned(),
347        })
348    }
349}
350
351impl ProcedurePlugin for DropProcedure {
352    fn signature(&self) -> &ProcedureSignature {
353        Self::signature_static()
354    }
355
356    fn invoke(
357        &self,
358        ctx: ProcedureContext<'_>,
359        args: &[ColumnarValue],
360    ) -> Result<SendableRecordBatchStream, FnError> {
361        let host = require_host(&ctx)?;
362        let name = args
363            .first()
364            .and_then(arg_as_string)
365            .ok_or_else(|| FnError::new(0x824, "uni.graph.drop: name (String) required"))?;
366        let dropped = for_storage(host.storage()).drop_by_name(&name);
367        let schema: SchemaRef = Arc::new(Schema::new(Self::signature_static().yields.clone()));
368        let cols: Vec<ArrayRef> = vec![Arc::new(BooleanArray::from(vec![dropped]))];
369        one_row_stream(schema, cols)
370    }
371}
372
373// ─────────────────────────── uni.graph.list ──────────────────────────────
374
375/// `uni.graph.list() -> (name, node_count, edge_count, bytes,
376/// created_at_ms, source_kind)`.
377#[derive(Debug)]
378pub struct ListProcedure;
379
380impl ListProcedure {
381    fn signature_static() -> &'static ProcedureSignature {
382        static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
383        SIG.get_or_init(|| ProcedureSignature {
384            args: vec![],
385            yields: vec![
386                Field::new("name", DataType::Utf8, false),
387                Field::new("node_count", DataType::Int64, false),
388                Field::new("edge_count", DataType::Int64, false),
389                Field::new("bytes", DataType::Int64, false),
390                // Wall-clock instant the projection was materialised,
391                // as a plain Int64 millisecond count (more portable
392                // through Cypher than `DataType::Timestamp`, which the
393                // simple-executor scalar decoder doesn't translate).
394                Field::new("created_at_ms", DataType::Int64, false),
395                Field::new("source_kind", DataType::Utf8, false),
396            ],
397            mode: ProcedureMode::Read,
398            side_effects: SideEffects::ReadOnly,
399            retry_contract: None,
400            batch_input: None,
401            docs: "uni.graph.list — one row per stored projection. \
402                   `source_kind` is `Native` or `Cypher`."
403                .to_owned(),
404        })
405    }
406}
407
408impl ProcedurePlugin for ListProcedure {
409    fn signature(&self) -> &ProcedureSignature {
410        Self::signature_static()
411    }
412
413    fn invoke(
414        &self,
415        ctx: ProcedureContext<'_>,
416        _args: &[ColumnarValue],
417    ) -> Result<SendableRecordBatchStream, FnError> {
418        let host = require_host(&ctx)?;
419        let entries = for_storage(host.storage()).list();
420        let mut names = Vec::with_capacity(entries.len());
421        let mut nodes = Vec::with_capacity(entries.len());
422        let mut edges = Vec::with_capacity(entries.len());
423        let mut bytes = Vec::with_capacity(entries.len());
424        let mut created = Vec::with_capacity(entries.len());
425        let mut kinds = Vec::with_capacity(entries.len());
426        for (name, e) in entries {
427            names.push(name);
428            nodes.push(e.node_count as i64);
429            edges.push(e.edge_count as i64);
430            bytes.push(e.bytes as i64);
431            let ms = e
432                .created_at
433                .duration_since(std::time::UNIX_EPOCH)
434                .map(|d| d.as_millis() as i64)
435                .unwrap_or(0);
436            created.push(ms);
437            kinds.push(e.source_kind.as_str().to_owned());
438        }
439        let schema: SchemaRef = Arc::new(Schema::new(Self::signature_static().yields.clone()));
440        let cols: Vec<ArrayRef> = vec![
441            Arc::new(StringArray::from(names)),
442            Arc::new(Int64Array::from(nodes)),
443            Arc::new(Int64Array::from(edges)),
444            Arc::new(Int64Array::from(bytes)),
445            Arc::new(Int64Array::from(created)),
446            Arc::new(StringArray::from(kinds)),
447        ];
448        let batch = RecordBatch::try_new(Arc::clone(&schema), cols)
449            .map_err(|e| FnError::new(0x830, format!("RecordBatch build: {e}")))?;
450        let stream =
451            futures::stream::once(
452                async move { Ok::<_, datafusion::error::DataFusionError>(batch) },
453            );
454        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
455    }
456}
457
458// ─────────────────────────── uni.graph.exists ──────────────────────────────
459
460/// `uni.graph.exists(name) -> (exists)`. Pure read; never errors on
461/// missing names.
462#[derive(Debug)]
463pub struct ExistsProcedure;
464
465impl ExistsProcedure {
466    fn signature_static() -> &'static ProcedureSignature {
467        static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
468        SIG.get_or_init(|| ProcedureSignature {
469            args: vec![NamedArgType {
470                name: smol_str::SmolStr::new("name"),
471                ty: ArgType::Primitive(DataType::Utf8),
472                default: None,
473                doc: "Projection name to probe.".to_owned(),
474            }],
475            yields: vec![Field::new("exists", DataType::Boolean, false)],
476            mode: ProcedureMode::Read,
477            side_effects: SideEffects::ReadOnly,
478            retry_contract: None,
479            batch_input: None,
480            docs: "uni.graph.exists(name) — `true` iff a projection by that \
481                   name is currently in the store."
482                .to_owned(),
483        })
484    }
485}
486
487impl ProcedurePlugin for ExistsProcedure {
488    fn signature(&self) -> &ProcedureSignature {
489        Self::signature_static()
490    }
491
492    fn invoke(
493        &self,
494        ctx: ProcedureContext<'_>,
495        args: &[ColumnarValue],
496    ) -> Result<SendableRecordBatchStream, FnError> {
497        let host = require_host(&ctx)?;
498        let name = args
499            .first()
500            .and_then(arg_as_string)
501            .ok_or_else(|| FnError::new(0x824, "uni.graph.exists: name (String) required"))?;
502        let exists = for_storage(host.storage()).contains(&name);
503        let schema: SchemaRef = Arc::new(Schema::new(Self::signature_static().yields.clone()));
504        let cols: Vec<ArrayRef> = vec![Arc::new(BooleanArray::from(vec![exists]))];
505        one_row_stream(schema, cols)
506    }
507}
508
509// ─────────────────────────── shared helpers ──────────────────────────────
510
511/// Build an `L0Manager` snapshot mirroring the host's L0 visibility so
512/// `uni.graph.project` (Native) sees the same recently-written rows
513/// the outer query would.
514fn build_l0_manager(host: &QueryProcedureHost) -> Option<Arc<uni_store::runtime::L0Manager>> {
515    use uni_store::runtime::L0Manager;
516    let l0_ctx = host.l0_context();
517    l0_ctx.current_l0.as_ref().map(|current| {
518        let mut pending = l0_ctx.pending_flush_l0s.clone();
519        if let Some(tx_l0) = &l0_ctx.transaction_l0 {
520            pending.push(tx_l0.clone());
521        }
522        Arc::new(L0Manager::from_snapshot(current.clone(), pending))
523    })
524}