uni_plugin_host/persistence.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! System-label persistence backends for `CustomPlugin`.
5//!
6//! Per proposal §9.7, declarations made via `uni.plugin.declareFunction`
7//! / `declareProcedure` / `declareAggregate` / `declareTrigger`
8//! survive restart by living in a host-owned system label
9//! `_DeclaredPlugin`.
10//!
11//! This module ships [`SystemLabelPersistence`] — the
12//! [`uni_plugin_custom::Persistence`] backend that the host's
13//! `Uni::build` flow installs by default. The backend persists
14//! declarations under `<data_path>/_system/declared_plugins.json`
15//! using the atomic write-then-rename pattern from
16//! [`uni_plugin_custom::JsonFilePersistence`]. The path is reserved
17//! under the database's directory tree so backup / restore tooling
18//! picks up declarations alongside graph data, and the
19//! [`uni_plugin_custom::DeclaredPlugin`] serde shape is identical to
20//! the `_DeclaredPlugin` system-label record (proposal §9.7), so the
21//! eventual cutover to Cypher-MERGE-through-`execute_inner_query`
22//! (M11 deliverable #8 follow-up) is a backend swap rather than a
23//! schema migration.
24//!
25//! The name and module placement (`uni-db`, not `uni-plugin-custom`)
26//! mark the layering: `SystemLabelPersistence` belongs at the host
27//! layer because the cutover-eventual implementation needs
28//! `QueryProcedureHost` (from `uni-query`), which `uni-plugin-custom`
29//! does not depend on.
30
31// Rust guideline compliant
32
33use std::path::{Path, PathBuf};
34use std::sync::{Arc, OnceLock};
35
36use uni_plugin_custom::{DeclaredPlugin, JsonFilePersistence, Persistence, PersistenceError};
37
38use crate::host::HostCypherExecutor;
39
40/// Lazy bridge from a `Persistence` backend to the host's write-mode
41/// Cypher executor.
42///
43/// Backends constructed at `register_builtin_plugins` time hold one of
44/// these. The host's `Uni::build` flow calls [`Self::set_host_executor`]
45/// **after** the host is fully constructed, at which point
46/// [`Self::try_write_cypher`] runs the supplied body via the host.
47///
48/// Holds the executor as a strong `Arc<dyn HostCypherExecutor>`; the
49/// executor itself only weakly references the host, so the
50/// persistence ↔ host cycle doesn't leak. Pre-wire `try_write_cypher`
51/// returns `Err(...)` — callers should treat that as "best-effort
52/// failed; record state via the durable sidecar instead."
53#[derive(Debug, Default)]
54pub struct LazyCypherSink {
55 host_executor: OnceLock<Arc<dyn HostCypherExecutor>>,
56}
57
58impl LazyCypherSink {
59 /// Construct an unwired sink.
60 #[must_use]
61 pub fn new() -> Self {
62 Self::default()
63 }
64
65 /// Wire the host-side Cypher executor. Idempotent.
66 pub fn set_host_executor(&self, exec: Arc<dyn HostCypherExecutor>) {
67 let _ = self.host_executor.set(exec);
68 }
69
70 /// Best-effort write-mode Cypher execution. Returns an error
71 /// string when the sink isn't yet wired or the statement fails;
72 /// the caller decides whether to retry or fall back to a durable
73 /// sidecar write.
74 ///
75 /// # Errors
76 ///
77 /// Returns a free-form error string when the sink isn't yet
78 /// wired, the host has been dropped, or the Cypher statement
79 /// fails to parse / commit. (The current-thread-runtime guard and
80 /// `block_in_place` handling live in the host's implementation.)
81 pub fn try_write_cypher(&self, cypher: &str) -> Result<(), String> {
82 let exec = self
83 .host_executor
84 .get()
85 .ok_or_else(|| "LazyCypherSink: host executor not wired".to_owned())?;
86 exec.execute_write_cypher(cypher)
87 }
88}
89
90/// Persistence backend for the host's declared-plugin records.
91///
92/// Today's implementation: a JSON sidecar at
93/// `<data_path>/_system/declared_plugins.json` (atomic
94/// write-then-rename; the same mechanism as
95/// [`JsonFilePersistence`]).
96///
97/// Cutover target: Cypher `MERGE (:_DeclaredPlugin {...})` through
98/// the write-enabled `QueryProcedureHost::execute_inner_query` shipped
99/// by A.1. The cutover preserves the [`DeclaredPlugin`] serde shape
100/// (per proposal §9.7), so swapping the backend is a no-op at the
101/// `Persistence` trait surface.
102#[derive(Debug)]
103pub struct SystemLabelPersistence {
104 inner: JsonFilePersistence,
105 sidecar_path: PathBuf,
106 /// Lazy sink to the write-mode Cypher executor; populated after
107 /// `Uni::build` returns. When wired, every `save` / `delete`
108 /// dual-writes to a `_DeclaredPlugin` graph node so the
109 /// declaration is visible to user `MATCH` queries
110 /// (`MATCH (p:_DeclaredPlugin) RETURN p`). The JSON sidecar
111 /// remains the source-of-truth for `load_all` at startup since
112 /// the cypher sink is unavailable before `Uni::build` finishes.
113 cypher_sink: Arc<LazyCypherSink>,
114}
115
116impl SystemLabelPersistence {
117 /// Construct rooted at `data_path/_system/declared_plugins.json`.
118 ///
119 /// `data_path` is the database's filesystem root (the URI passed
120 /// to `Uni::open` for local-disk instances).
121 /// For in-memory / object-store instances where no local root
122 /// exists, callers fall back to
123 /// [`uni_plugin_custom::NullPersistence`].
124 ///
125 /// # Examples
126 ///
127 /// ```
128 /// # use std::path::PathBuf;
129 /// # use uni_plugin_host::persistence::SystemLabelPersistence;
130 /// let p = SystemLabelPersistence::new(PathBuf::from("/tmp/mydb"));
131 /// assert!(p.sidecar_path().ends_with("declared_plugins.json"));
132 /// ```
133 #[must_use]
134 pub fn new(data_path: impl Into<PathBuf>) -> Self {
135 let mut sidecar_path = data_path.into();
136 sidecar_path.push("_system");
137 sidecar_path.push("declared_plugins.json");
138 let inner = JsonFilePersistence::new(sidecar_path.clone());
139 Self {
140 inner,
141 sidecar_path,
142 cypher_sink: Arc::new(LazyCypherSink::new()),
143 }
144 }
145
146 /// Borrow the underlying sidecar path (for diagnostics + tests).
147 #[must_use]
148 pub fn sidecar_path(&self) -> &Path {
149 &self.sidecar_path
150 }
151
152 /// Borrow the lazy cypher sink so the host can wire it after
153 /// `Uni::build` completes.
154 #[must_use]
155 pub fn cypher_sink(&self) -> &Arc<LazyCypherSink> {
156 &self.cypher_sink
157 }
158}
159
160/// Build the Cypher `MERGE` body that mirrors a [`DeclaredPlugin`]
161/// into a `_DeclaredPlugin` graph node. The qname is the natural key.
162fn merge_cypher(plugin: &DeclaredPlugin) -> String {
163 // Escape any single-quotes by doubling them — minimal escaping
164 // for the v1 dual-write path. Production callers should bind via
165 // parameters once `Session::tx().execute_with(...).bind(...)` is
166 // stabilized through the Tx API.
167 fn esc(s: &str) -> String {
168 s.replace('\'', "''")
169 }
170 let deps = plugin
171 .dependencies
172 .iter()
173 .map(|d| format!("'{}'", esc(d)))
174 .collect::<Vec<_>>()
175 .join(", ");
176 format!(
177 "MERGE (p:_DeclaredPlugin {{qname: '{q}'}}) \
178 SET p.kind = '{k}', \
179 p.body = '{b}', \
180 p.signature_json = '{s}', \
181 p.dependencies = [{d}], \
182 p.declared_by = '{db}', \
183 p.active = {a}",
184 q = esc(&plugin.qname),
185 k = esc(&plugin.kind),
186 b = esc(&plugin.body),
187 s = esc(&plugin.signature_json),
188 d = deps,
189 db = esc(&plugin.declared_by),
190 a = plugin.active,
191 )
192}
193
194/// Build the Cypher `MATCH ... DETACH DELETE` body that removes the
195/// `_DeclaredPlugin` graph node for a given qname.
196fn delete_cypher(qname: &str) -> String {
197 let q = qname.replace('\'', "''");
198 format!("MATCH (p:_DeclaredPlugin {{qname: '{q}'}}) DETACH DELETE p")
199}
200
201impl Persistence for SystemLabelPersistence {
202 fn save(&self, plugin: &DeclaredPlugin) -> Result<(), PersistenceError> {
203 // Source of truth: JSON sidecar. Always durable, available
204 // even before `Uni::build` finishes.
205 self.inner.save(plugin)?;
206 // Best-effort: mirror into the `_DeclaredPlugin` graph
207 // label. Failure is logged, not propagated, since the sidecar
208 // already committed the durable record.
209 if let Err(e) = self.cypher_sink.try_write_cypher(&merge_cypher(plugin)) {
210 tracing::debug!(
211 qname = %plugin.qname,
212 error = %e,
213 "SystemLabelPersistence: cypher mirror skipped",
214 );
215 }
216 Ok(())
217 }
218
219 fn delete(&self, qname: &str) -> Result<(), PersistenceError> {
220 self.inner.delete(qname)?;
221 if let Err(e) = self.cypher_sink.try_write_cypher(&delete_cypher(qname)) {
222 tracing::debug!(
223 qname = %qname,
224 error = %e,
225 "SystemLabelPersistence: cypher mirror delete skipped",
226 );
227 }
228 Ok(())
229 }
230
231 fn load_all(&self) -> Result<Vec<DeclaredPlugin>, PersistenceError> {
232 // JSON is the source-of-truth at startup; the graph label is
233 // a projection updated on subsequent writes.
234 self.inner.load_all()
235 }
236}
237
238/// Choose the appropriate persistence backend for a `Uni` instance.
239///
240/// Returns [`SystemLabelPersistence`] when `data_path` is a local
241/// filesystem path; falls back to [`uni_plugin_custom::NullPersistence`]
242/// when the instance is in-memory / object-store-backed (no local
243/// root for the JSON sidecar).
244///
245/// The optional second tuple element is the [`LazyCypherSink`] held by
246/// the `SystemLabelPersistence` (none for the null backend). The
247/// host's `Uni::build` flow stashes it and calls
248/// [`LazyCypherSink::set_host_executor`] after the host is constructed,
249/// at which point subsequent declarations dual-write into the
250/// `_DeclaredPlugin` graph label.
251#[must_use]
252pub fn persistence_for_data_path(
253 data_path: Option<&Path>,
254) -> (Arc<dyn Persistence>, Option<Arc<LazyCypherSink>>) {
255 match data_path {
256 Some(path) => {
257 let p = Arc::new(SystemLabelPersistence::new(path.to_owned()));
258 let sink = Arc::clone(p.cypher_sink());
259 (p as Arc<dyn Persistence>, Some(sink))
260 }
261 None => (Arc::new(uni_plugin_custom::NullPersistence), None),
262 }
263}
264
265#[cfg(test)]
266mod tests {
267 use super::*;
268 use tempfile::TempDir;
269
270 fn fixture_plugin() -> DeclaredPlugin {
271 DeclaredPlugin {
272 qname: "mycorp.fullName".to_owned(),
273 kind: "function".to_owned(),
274 body: "$first + ' ' + $last".to_owned(),
275 signature_json: "{}".to_owned(),
276 dependencies: vec![],
277 declared_by: "alice".to_owned(),
278 active: true,
279 }
280 }
281
282 #[test]
283 fn sidecar_lives_under_system_subdir() {
284 let p = SystemLabelPersistence::new("/tmp/mydb");
285 assert!(p.sidecar_path().to_string_lossy().contains("/_system/"));
286 assert!(p.sidecar_path().ends_with("declared_plugins.json"));
287 }
288
289 #[test]
290 fn save_and_load_round_trip() {
291 let tmp = TempDir::new().unwrap();
292 let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
293 let plugin = fixture_plugin();
294 p.save(&plugin).expect("save");
295 let loaded = p.load_all().expect("load_all");
296 assert_eq!(loaded.len(), 1);
297 assert_eq!(loaded[0], plugin);
298 }
299
300 #[test]
301 fn delete_removes_the_record() {
302 let tmp = TempDir::new().unwrap();
303 let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
304 let plugin = fixture_plugin();
305 p.save(&plugin).expect("save");
306 p.delete(&plugin.qname).expect("delete");
307 let loaded = p.load_all().expect("load_all");
308 assert!(loaded.is_empty());
309 }
310
311 #[test]
312 fn save_then_close_reopen_survives() {
313 let tmp = TempDir::new().unwrap();
314 {
315 let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
316 p.save(&fixture_plugin()).expect("save");
317 }
318 // New instance pointing at the same root.
319 let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
320 let loaded = p.load_all().expect("load_all");
321 assert_eq!(loaded.len(), 1, "declaration must survive close+reopen");
322 }
323
324 #[test]
325 fn persistence_for_in_memory_returns_null() {
326 let (p, sink) = persistence_for_data_path(None);
327 assert!(sink.is_none(), "NullPersistence has no cypher sink");
328 assert!(p.load_all().expect("load_all").is_empty());
329 p.save(&fixture_plugin()).expect("null save is ok");
330 assert!(
331 p.load_all().expect("load_all").is_empty(),
332 "NullPersistence drops on the floor"
333 );
334 }
335
336 #[test]
337 fn persistence_for_local_path_returns_sink() {
338 let tmp = TempDir::new().unwrap();
339 let (_p, sink) = persistence_for_data_path(Some(tmp.path()));
340 assert!(
341 sink.is_some(),
342 "local-path persistence must expose a cypher sink"
343 );
344 }
345
346 #[test]
347 fn cypher_sink_pre_wire_returns_err() {
348 let sink = LazyCypherSink::new();
349 let result = sink.try_write_cypher("MATCH (n) RETURN n");
350 assert!(result.is_err(), "pre-wire try_write_cypher must error");
351 }
352
353 #[tokio::test]
354 async fn cypher_sink_current_thread_runtime_degrades_to_err() {
355 // `#[tokio::test]` defaults to current_thread, which cannot
356 // host `block_in_place`. The sink must return an Err instead
357 // of panicking so the dual-write `save()` path stays
358 // best-effort.
359 let sink = LazyCypherSink::new();
360 // No need to actually wire a UniInner — the pre-wire branch
361 // also exercises the no-panic invariant, but we want to
362 // explicitly assert the flavor-check Err shape when the wire
363 // *is* present without paying the full Uni build cost. The
364 // pre-wire path errors first; that's fine — the panic only
365 // surfaced once a UniInner had been wired. Here we just
366 // confirm the function returns Err without panicking.
367 let result = sink.try_write_cypher("MATCH (n) RETURN n");
368 assert!(result.is_err());
369 }
370
371 #[test]
372 fn save_succeeds_when_cypher_mirror_is_unwired() {
373 // The Cypher sink isn't wired in this fixture, so the
374 // dual-write path's MERGE call returns Err — but save() must
375 // still succeed because the JSON sidecar IS the source of
376 // truth.
377 let tmp = TempDir::new().unwrap();
378 let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
379 p.save(&fixture_plugin())
380 .expect("JSON sidecar save must succeed even without sink");
381 let loaded = p.load_all().expect("load_all");
382 assert_eq!(loaded.len(), 1);
383 }
384}