1use std::sync::{Arc, OnceLock};
19use std::time::SystemTime;
20
21use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, StringArray};
22use arrow_schema::{DataType, Field, Schema, SchemaRef};
23use datafusion::execution::SendableRecordBatchStream;
24use datafusion::logical_expr::ColumnarValue;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use datafusion::scalar::ScalarValue;
27use uni_algo::{ProjectionInput, parse_graph_ref};
28use uni_plugin::traits::procedure::{
29 NamedArgType, ProcedureContext, ProcedureMode, ProcedurePlugin, ProcedureSignature,
30};
31use uni_plugin::traits::scalar::ArgType;
32use uni_plugin::{FnError, PluginError, PluginRegistrar, QName, SideEffects};
33
34use crate::projection_store::{ProjectionEntry, ProjectionSourceKind, estimate_bytes, for_storage};
35use crate::query::executor::procedure_host::QueryProcedureHost;
36
37pub fn register_into(r: &mut PluginRegistrar<'_>) -> Result<(), PluginError> {
46 r.procedure(
47 QName::new("uni", "graph.project"),
48 ProjectProcedure::signature_static().clone(),
49 Arc::new(ProjectProcedure),
50 )?;
51 r.procedure(
52 QName::new("uni", "graph.drop"),
53 DropProcedure::signature_static().clone(),
54 Arc::new(DropProcedure),
55 )?;
56 r.procedure(
57 QName::new("uni", "graph.list"),
58 ListProcedure::signature_static().clone(),
59 Arc::new(ListProcedure),
60 )?;
61 r.procedure(
62 QName::new("uni", "graph.exists"),
63 ExistsProcedure::signature_static().clone(),
64 Arc::new(ExistsProcedure),
65 )?;
66 Ok(())
67}
68
69fn require_host<'a>(ctx: &ProcedureContext<'a>) -> Result<&'a QueryProcedureHost, FnError> {
72 ctx.host
73 .and_then(|h| h.as_any().downcast_ref::<QueryProcedureHost>())
74 .ok_or_else(|| FnError::new(0x701, "uni.graph.*: requires QueryProcedureHost"))
75}
76
77fn arg_to_json(cv: &ColumnarValue) -> serde_json::Value {
81 match cv {
82 ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(b)))
83 | ColumnarValue::Scalar(ScalarValue::Binary(Some(b))) => {
84 serde_json::from_slice::<serde_json::Value>(b).unwrap_or(serde_json::Value::Null)
85 }
86 ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)))
87 | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => {
88 serde_json::Value::String(s.clone())
89 }
90 ColumnarValue::Scalar(ScalarValue::Boolean(Some(b))) => serde_json::Value::Bool(*b),
91 ColumnarValue::Scalar(ScalarValue::Int64(Some(i))) => {
92 serde_json::Value::Number((*i).into())
93 }
94 _ => serde_json::Value::Null,
95 }
96}
97
98fn arg_as_string(cv: &ColumnarValue) -> Option<String> {
99 match cv {
100 ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)))
101 | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => Some(s.clone()),
102 _ => None,
103 }
104}
105
106fn one_row_stream(
107 schema: SchemaRef,
108 cols: Vec<ArrayRef>,
109) -> Result<SendableRecordBatchStream, FnError> {
110 let batch = RecordBatch::try_new(Arc::clone(&schema), cols)
111 .map_err(|e| FnError::new(0x830, format!("RecordBatch build: {e}")))?;
112 let stream =
113 futures::stream::once(async move { Ok::<_, datafusion::error::DataFusionError>(batch) });
114 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
115}
116
117#[derive(Debug)]
127pub struct ProjectProcedure;
128
129impl ProjectProcedure {
130 fn signature_static() -> &'static ProcedureSignature {
131 static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
132 SIG.get_or_init(|| ProcedureSignature {
133 args: vec![
134 NamedArgType {
135 name: smol_str::SmolStr::new("name"),
136 ty: ArgType::Primitive(DataType::Utf8),
137 default: None,
138 doc: "Name to register the materialised projection under.".to_owned(),
139 },
140 NamedArgType {
141 name: smol_str::SmolStr::new("graphRef"),
142 ty: ArgType::Primitive(DataType::LargeBinary),
143 default: None,
144 doc: "Native or Cypher projection descriptor (Map).".to_owned(),
145 },
146 NamedArgType {
147 name: smol_str::SmolStr::new("config"),
148 ty: ArgType::Primitive(DataType::LargeBinary),
149 default: Some(ScalarValue::LargeBinary(Some(b"{}".to_vec()))),
150 doc: "Materialisation options (currently unused).".to_owned(),
151 },
152 ],
153 yields: vec![
154 Field::new("name", DataType::Utf8, false),
155 Field::new("node_count", DataType::Int64, false),
156 Field::new("edge_count", DataType::Int64, false),
157 Field::new("bytes", DataType::Int64, false),
158 ],
159 mode: ProcedureMode::Read, side_effects: SideEffects::ReadOnly,
161 retry_contract: None,
162 batch_input: None,
163 docs: "uni.graph.project(name, graphRef, config) — materialise \
164 a named graph projection from a Native or Cypher graphRef \
165 (no Named-of-Named). v1: in-memory, restart-clears."
166 .to_owned(),
167 })
168 }
169}
170
171impl ProcedurePlugin for ProjectProcedure {
172 fn signature(&self) -> &ProcedureSignature {
173 Self::signature_static()
174 }
175
176 fn invoke(
177 &self,
178 ctx: ProcedureContext<'_>,
179 args: &[ColumnarValue],
180 ) -> Result<SendableRecordBatchStream, FnError> {
181 let host = require_host(&ctx)?.clone();
182 let name = args
183 .first()
184 .and_then(arg_as_string)
185 .ok_or_else(|| FnError::new(0x824, "uni.graph.project: name (String) required"))?;
186 let graph_ref = args
187 .get(1)
188 .map(arg_to_json)
189 .ok_or_else(|| FnError::new(0x824, "uni.graph.project: graphRef (Map) required"))?;
190 let projection_input = parse_graph_ref(&graph_ref)
191 .map_err(|e| FnError::new(0x820, format!("graphRef parse: {e}")))?;
192
193 let store = for_storage(host.storage());
198 if store.contains(&name) {
199 return Err(FnError::new(
200 0x824,
201 format!("uni.graph.project: projection `{name}` already exists; drop first"),
202 ));
203 }
204 if let ProjectionInput::Named { .. } = &projection_input {
205 return Err(FnError::new(
206 0x824,
207 "uni.graph.project: graphRef cannot itself be Named \
208 (no projection-of-a-projection in v1)",
209 ));
210 }
211
212 let schema: SchemaRef = Arc::new(Schema::new(Self::signature_static().yields.clone()));
213 let name_for_async = name.clone();
214 let store_for_async = Arc::clone(&store);
215
216 let schema_in_fut = Arc::clone(&schema);
217 let fut = async move {
218 let (projection, source_kind) = match projection_input {
219 ProjectionInput::Native {
220 node_labels,
221 edge_types,
222 weight_property,
223 include_reverse,
224 } => {
225 let storage = Arc::clone(host.storage());
226 let l0 = build_l0_manager(&host);
227 let mut builder = uni_algo::ProjectionBuilder::new(storage)
228 .l0_manager(l0)
229 .node_labels(&node_labels.iter().map(String::as_str).collect::<Vec<_>>())
230 .edge_types(&edge_types.iter().map(String::as_str).collect::<Vec<_>>())
231 .include_reverse(include_reverse);
232 if let Some(wp) = weight_property {
233 builder = builder.weight_property(&wp);
234 }
235 let projection = builder.build().await.map_err(|e| {
236 datafusion::error::DataFusionError::Execution(format!(
237 "uni.graph.project (Native): {e}"
238 ))
239 })?;
240 (projection, ProjectionSourceKind::Native)
241 }
242 ProjectionInput::Cypher {
243 node_query,
244 edge_query,
245 weight_column,
246 include_reverse,
247 } => {
248 let inner_params = std::collections::HashMap::new();
249 let node_rows = host
250 .execute_inner_query(
251 &node_query,
252 &inner_params,
253 uni_plugin::traits::procedure::ProcedureMode::Read,
254 )
255 .await
256 .map_err(|e| {
257 datafusion::error::DataFusionError::Execution(format!(
258 "uni.graph.project node query: {e}"
259 ))
260 })?;
261 let edge_rows = host
262 .execute_inner_query(
263 &edge_query,
264 &inner_params,
265 uni_plugin::traits::procedure::ProcedureMode::Read,
266 )
267 .await
268 .map_err(|e| {
269 datafusion::error::DataFusionError::Execution(format!(
270 "uni.graph.project edge query: {e}"
271 ))
272 })?;
273 let projection = uni_algo::algo::projection::GraphProjection::from_rows(
274 &node_rows,
275 &edge_rows,
276 weight_column.as_deref(),
277 include_reverse,
278 )
279 .map_err(|e| {
280 datafusion::error::DataFusionError::Execution(format!(
281 "uni.graph.project (Cypher): {e}"
282 ))
283 })?;
284 (projection, ProjectionSourceKind::Cypher)
285 }
286 ProjectionInput::Named { .. } => unreachable!("filtered above"),
287 };
288 let node_count = projection.vertex_count();
289 let edge_count = projection.edge_count();
290 let bytes = estimate_bytes(&projection);
291 let entry = ProjectionEntry {
292 projection: Arc::new(projection),
293 node_count,
294 edge_count,
295 bytes,
296 created_at: SystemTime::now(),
297 source_kind,
298 };
299 store_for_async
300 .insert(name_for_async.clone(), entry)
301 .map_err(|n| {
302 datafusion::error::DataFusionError::Execution(format!(
303 "uni.graph.project: projection `{n}` already exists"
304 ))
305 })?;
306
307 let cols: Vec<ArrayRef> = vec![
308 Arc::new(StringArray::from(vec![name_for_async])),
309 Arc::new(Int64Array::from(vec![node_count as i64])),
310 Arc::new(Int64Array::from(vec![edge_count as i64])),
311 Arc::new(Int64Array::from(vec![bytes as i64])),
312 ];
313 RecordBatch::try_new(schema_in_fut, cols).map_err(|e| {
314 datafusion::error::DataFusionError::Execution(format!("RecordBatch: {e}"))
315 })
316 };
317 let stream = futures::stream::once(fut);
318 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
319 }
320}
321
322#[derive(Debug)]
327pub struct DropProcedure;
328
329impl DropProcedure {
330 fn signature_static() -> &'static ProcedureSignature {
331 static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
332 SIG.get_or_init(|| ProcedureSignature {
333 args: vec![NamedArgType {
334 name: smol_str::SmolStr::new("name"),
335 ty: ArgType::Primitive(DataType::Utf8),
336 default: None,
337 doc: "Projection name to evict from the store.".to_owned(),
338 }],
339 yields: vec![Field::new("dropped", DataType::Boolean, false)],
340 mode: ProcedureMode::Read,
341 side_effects: SideEffects::ReadOnly,
342 retry_contract: None,
343 batch_input: None,
344 docs: "uni.graph.drop(name) — remove a named projection. Returns \
345 `false` if no projection by that name existed."
346 .to_owned(),
347 })
348 }
349}
350
351impl ProcedurePlugin for DropProcedure {
352 fn signature(&self) -> &ProcedureSignature {
353 Self::signature_static()
354 }
355
356 fn invoke(
357 &self,
358 ctx: ProcedureContext<'_>,
359 args: &[ColumnarValue],
360 ) -> Result<SendableRecordBatchStream, FnError> {
361 let host = require_host(&ctx)?;
362 let name = args
363 .first()
364 .and_then(arg_as_string)
365 .ok_or_else(|| FnError::new(0x824, "uni.graph.drop: name (String) required"))?;
366 let dropped = for_storage(host.storage()).drop_by_name(&name);
367 let schema: SchemaRef = Arc::new(Schema::new(Self::signature_static().yields.clone()));
368 let cols: Vec<ArrayRef> = vec![Arc::new(BooleanArray::from(vec![dropped]))];
369 one_row_stream(schema, cols)
370 }
371}
372
373#[derive(Debug)]
378pub struct ListProcedure;
379
380impl ListProcedure {
381 fn signature_static() -> &'static ProcedureSignature {
382 static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
383 SIG.get_or_init(|| ProcedureSignature {
384 args: vec![],
385 yields: vec![
386 Field::new("name", DataType::Utf8, false),
387 Field::new("node_count", DataType::Int64, false),
388 Field::new("edge_count", DataType::Int64, false),
389 Field::new("bytes", DataType::Int64, false),
390 Field::new("created_at_ms", DataType::Int64, false),
395 Field::new("source_kind", DataType::Utf8, false),
396 ],
397 mode: ProcedureMode::Read,
398 side_effects: SideEffects::ReadOnly,
399 retry_contract: None,
400 batch_input: None,
401 docs: "uni.graph.list — one row per stored projection. \
402 `source_kind` is `Native` or `Cypher`."
403 .to_owned(),
404 })
405 }
406}
407
408impl ProcedurePlugin for ListProcedure {
409 fn signature(&self) -> &ProcedureSignature {
410 Self::signature_static()
411 }
412
413 fn invoke(
414 &self,
415 ctx: ProcedureContext<'_>,
416 _args: &[ColumnarValue],
417 ) -> Result<SendableRecordBatchStream, FnError> {
418 let host = require_host(&ctx)?;
419 let entries = for_storage(host.storage()).list();
420 let mut names = Vec::with_capacity(entries.len());
421 let mut nodes = Vec::with_capacity(entries.len());
422 let mut edges = Vec::with_capacity(entries.len());
423 let mut bytes = Vec::with_capacity(entries.len());
424 let mut created = Vec::with_capacity(entries.len());
425 let mut kinds = Vec::with_capacity(entries.len());
426 for (name, e) in entries {
427 names.push(name);
428 nodes.push(e.node_count as i64);
429 edges.push(e.edge_count as i64);
430 bytes.push(e.bytes as i64);
431 let ms = e
432 .created_at
433 .duration_since(std::time::UNIX_EPOCH)
434 .map(|d| d.as_millis() as i64)
435 .unwrap_or(0);
436 created.push(ms);
437 kinds.push(e.source_kind.as_str().to_owned());
438 }
439 let schema: SchemaRef = Arc::new(Schema::new(Self::signature_static().yields.clone()));
440 let cols: Vec<ArrayRef> = vec![
441 Arc::new(StringArray::from(names)),
442 Arc::new(Int64Array::from(nodes)),
443 Arc::new(Int64Array::from(edges)),
444 Arc::new(Int64Array::from(bytes)),
445 Arc::new(Int64Array::from(created)),
446 Arc::new(StringArray::from(kinds)),
447 ];
448 let batch = RecordBatch::try_new(Arc::clone(&schema), cols)
449 .map_err(|e| FnError::new(0x830, format!("RecordBatch build: {e}")))?;
450 let stream =
451 futures::stream::once(
452 async move { Ok::<_, datafusion::error::DataFusionError>(batch) },
453 );
454 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
455 }
456}
457
458#[derive(Debug)]
463pub struct ExistsProcedure;
464
465impl ExistsProcedure {
466 fn signature_static() -> &'static ProcedureSignature {
467 static SIG: OnceLock<ProcedureSignature> = OnceLock::new();
468 SIG.get_or_init(|| ProcedureSignature {
469 args: vec![NamedArgType {
470 name: smol_str::SmolStr::new("name"),
471 ty: ArgType::Primitive(DataType::Utf8),
472 default: None,
473 doc: "Projection name to probe.".to_owned(),
474 }],
475 yields: vec![Field::new("exists", DataType::Boolean, false)],
476 mode: ProcedureMode::Read,
477 side_effects: SideEffects::ReadOnly,
478 retry_contract: None,
479 batch_input: None,
480 docs: "uni.graph.exists(name) — `true` iff a projection by that \
481 name is currently in the store."
482 .to_owned(),
483 })
484 }
485}
486
487impl ProcedurePlugin for ExistsProcedure {
488 fn signature(&self) -> &ProcedureSignature {
489 Self::signature_static()
490 }
491
492 fn invoke(
493 &self,
494 ctx: ProcedureContext<'_>,
495 args: &[ColumnarValue],
496 ) -> Result<SendableRecordBatchStream, FnError> {
497 let host = require_host(&ctx)?;
498 let name = args
499 .first()
500 .and_then(arg_as_string)
501 .ok_or_else(|| FnError::new(0x824, "uni.graph.exists: name (String) required"))?;
502 let exists = for_storage(host.storage()).contains(&name);
503 let schema: SchemaRef = Arc::new(Schema::new(Self::signature_static().yields.clone()));
504 let cols: Vec<ArrayRef> = vec![Arc::new(BooleanArray::from(vec![exists]))];
505 one_row_stream(schema, cols)
506 }
507}
508
509fn build_l0_manager(host: &QueryProcedureHost) -> Option<Arc<uni_store::runtime::L0Manager>> {
515 use uni_store::runtime::L0Manager;
516 let l0_ctx = host.l0_context();
517 l0_ctx.current_l0.as_ref().map(|current| {
518 let mut pending = l0_ctx.pending_flush_l0s.clone();
519 if let Some(tx_l0) = &l0_ctx.transaction_l0 {
520 pending.push(tx_l0.clone());
521 }
522 Arc::new(L0Manager::from_snapshot(current.clone(), pending))
523 })
524}