Skip to main content

uni_plugin_host/
persistence.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! System-label persistence backends for `CustomPlugin`.
5//!
6//! Per proposal §9.7, declarations made via `uni.plugin.declareFunction`
7//! / `declareProcedure` / `declareAggregate` / `declareTrigger`
8//! survive restart by living in a host-owned system label
9//! `_DeclaredPlugin`.
10//!
11//! This module ships [`SystemLabelPersistence`] — the
12//! [`uni_plugin_custom::Persistence`] backend that the host's
13//! `Uni::build` flow installs by default. The backend persists
14//! declarations under `<data_path>/_system/declared_plugins.json`
15//! using the atomic write-then-rename pattern from
16//! [`uni_plugin_custom::JsonFilePersistence`]. The path is reserved
17//! under the database's directory tree so backup / restore tooling
18//! picks up declarations alongside graph data, and the
19//! [`uni_plugin_custom::DeclaredPlugin`] serde shape is identical to
20//! the `_DeclaredPlugin` system-label record (proposal §9.7), so the
21//! eventual cutover to Cypher-MERGE-through-`execute_inner_query`
22//! (M11 deliverable #8 follow-up) is a backend swap rather than a
23//! schema migration.
24//!
25//! The name and module placement (`uni-db`, not `uni-plugin-custom`)
26//! mark the layering: `SystemLabelPersistence` belongs at the host
27//! layer because the cutover-eventual implementation needs
28//! `QueryProcedureHost` (from `uni-query`), which `uni-plugin-custom`
29//! does not depend on.
30
31// Rust guideline compliant
32
33use std::path::{Path, PathBuf};
34use std::sync::{Arc, OnceLock};
35
36use uni_plugin_custom::{DeclaredPlugin, JsonFilePersistence, Persistence, PersistenceError};
37
38use crate::host::HostCypherExecutor;
39
40/// Lazy bridge from a `Persistence` backend to the host's write-mode
41/// Cypher executor.
42///
43/// Backends constructed at `register_builtin_plugins` time hold one of
44/// these. The host's `Uni::build` flow calls [`Self::set_host_executor`]
45/// **after** the host is fully constructed, at which point
46/// [`Self::try_write_cypher`] runs the supplied body via the host.
47///
48/// Holds the executor as a strong `Arc<dyn HostCypherExecutor>`; the
49/// executor itself only weakly references the host, so the
50/// persistence ↔ host cycle doesn't leak. Pre-wire `try_write_cypher`
51/// returns `Err(...)` — callers should treat that as "best-effort
52/// failed; record state via the durable sidecar instead."
53#[derive(Debug, Default)]
54pub struct LazyCypherSink {
55    host_executor: OnceLock<Arc<dyn HostCypherExecutor>>,
56}
57
58impl LazyCypherSink {
59    /// Construct an unwired sink.
60    #[must_use]
61    pub fn new() -> Self {
62        Self::default()
63    }
64
65    /// Wire the host-side Cypher executor. Idempotent.
66    pub fn set_host_executor(&self, exec: Arc<dyn HostCypherExecutor>) {
67        let _ = self.host_executor.set(exec);
68    }
69
70    /// Best-effort write-mode Cypher execution. Returns an error
71    /// string when the sink isn't yet wired or the statement fails;
72    /// the caller decides whether to retry or fall back to a durable
73    /// sidecar write.
74    ///
75    /// # Errors
76    ///
77    /// Returns a free-form error string when the sink isn't yet
78    /// wired, the host has been dropped, or the Cypher statement
79    /// fails to parse / commit. (The current-thread-runtime guard and
80    /// `block_in_place` handling live in the host's implementation.)
81    pub fn try_write_cypher(&self, cypher: &str) -> Result<(), String> {
82        let exec = self
83            .host_executor
84            .get()
85            .ok_or_else(|| "LazyCypherSink: host executor not wired".to_owned())?;
86        exec.execute_write_cypher(cypher)
87    }
88}
89
90/// Persistence backend for the host's declared-plugin records.
91///
92/// Today's implementation: a JSON sidecar at
93/// `<data_path>/_system/declared_plugins.json` (atomic
94/// write-then-rename; the same mechanism as
95/// [`JsonFilePersistence`]).
96///
97/// Cutover target: Cypher `MERGE (:_DeclaredPlugin {...})` through
98/// the write-enabled `QueryProcedureHost::execute_inner_query` shipped
99/// by A.1. The cutover preserves the [`DeclaredPlugin`] serde shape
100/// (per proposal §9.7), so swapping the backend is a no-op at the
101/// `Persistence` trait surface.
102#[derive(Debug)]
103pub struct SystemLabelPersistence {
104    inner: JsonFilePersistence,
105    sidecar_path: PathBuf,
106    /// Lazy sink to the write-mode Cypher executor; populated after
107    /// `Uni::build` returns. When wired, every `save` / `delete`
108    /// dual-writes to a `_DeclaredPlugin` graph node so the
109    /// declaration is visible to user `MATCH` queries
110    /// (`MATCH (p:_DeclaredPlugin) RETURN p`). The JSON sidecar
111    /// remains the source-of-truth for `load_all` at startup since
112    /// the cypher sink is unavailable before `Uni::build` finishes.
113    cypher_sink: Arc<LazyCypherSink>,
114}
115
116impl SystemLabelPersistence {
117    /// Construct rooted at `data_path/_system/declared_plugins.json`.
118    ///
119    /// `data_path` is the database's filesystem root (the URI passed
120    /// to `Uni::open` for local-disk instances).
121    /// For in-memory / object-store instances where no local root
122    /// exists, callers fall back to
123    /// [`uni_plugin_custom::NullPersistence`].
124    ///
125    /// # Examples
126    ///
127    /// ```
128    /// # use std::path::PathBuf;
129    /// # use uni_plugin_host::persistence::SystemLabelPersistence;
130    /// let p = SystemLabelPersistence::new(PathBuf::from("/tmp/mydb"));
131    /// assert!(p.sidecar_path().ends_with("declared_plugins.json"));
132    /// ```
133    #[must_use]
134    pub fn new(data_path: impl Into<PathBuf>) -> Self {
135        let mut sidecar_path = data_path.into();
136        sidecar_path.push("_system");
137        sidecar_path.push("declared_plugins.json");
138        let inner = JsonFilePersistence::new(sidecar_path.clone());
139        Self {
140            inner,
141            sidecar_path,
142            cypher_sink: Arc::new(LazyCypherSink::new()),
143        }
144    }
145
146    /// Borrow the underlying sidecar path (for diagnostics + tests).
147    #[must_use]
148    pub fn sidecar_path(&self) -> &Path {
149        &self.sidecar_path
150    }
151
152    /// Borrow the lazy cypher sink so the host can wire it after
153    /// `Uni::build` completes.
154    #[must_use]
155    pub fn cypher_sink(&self) -> &Arc<LazyCypherSink> {
156        &self.cypher_sink
157    }
158}
159
160/// Build the Cypher `MERGE` body that mirrors a [`DeclaredPlugin`]
161/// into a `_DeclaredPlugin` graph node. The qname is the natural key.
162fn merge_cypher(plugin: &DeclaredPlugin) -> String {
163    // Escape any single-quotes by doubling them — minimal escaping
164    // for the v1 dual-write path. Production callers should bind via
165    // parameters once `Session::tx().execute_with(...).bind(...)` is
166    // stabilized through the Tx API.
167    fn esc(s: &str) -> String {
168        s.replace('\'', "''")
169    }
170    let deps = plugin
171        .dependencies
172        .iter()
173        .map(|d| format!("'{}'", esc(d)))
174        .collect::<Vec<_>>()
175        .join(", ");
176    format!(
177        "MERGE (p:_DeclaredPlugin {{qname: '{q}'}}) \
178         SET p.kind = '{k}', \
179             p.body = '{b}', \
180             p.signature_json = '{s}', \
181             p.dependencies = [{d}], \
182             p.declared_by = '{db}', \
183             p.active = {a}",
184        q = esc(&plugin.qname),
185        k = esc(&plugin.kind),
186        b = esc(&plugin.body),
187        s = esc(&plugin.signature_json),
188        d = deps,
189        db = esc(&plugin.declared_by),
190        a = plugin.active,
191    )
192}
193
194/// Build the Cypher `MATCH ... DETACH DELETE` body that removes the
195/// `_DeclaredPlugin` graph node for a given qname.
196fn delete_cypher(qname: &str) -> String {
197    let q = qname.replace('\'', "''");
198    format!("MATCH (p:_DeclaredPlugin {{qname: '{q}'}}) DETACH DELETE p")
199}
200
201impl Persistence for SystemLabelPersistence {
202    fn save(&self, plugin: &DeclaredPlugin) -> Result<(), PersistenceError> {
203        // Source of truth: JSON sidecar. Always durable, available
204        // even before `Uni::build` finishes.
205        self.inner.save(plugin)?;
206        // Best-effort: mirror into the `_DeclaredPlugin` graph
207        // label. Failure is logged, not propagated, since the sidecar
208        // already committed the durable record.
209        if let Err(e) = self.cypher_sink.try_write_cypher(&merge_cypher(plugin)) {
210            tracing::debug!(
211                qname = %plugin.qname,
212                error = %e,
213                "SystemLabelPersistence: cypher mirror skipped",
214            );
215        }
216        Ok(())
217    }
218
219    fn delete(&self, qname: &str) -> Result<(), PersistenceError> {
220        self.inner.delete(qname)?;
221        if let Err(e) = self.cypher_sink.try_write_cypher(&delete_cypher(qname)) {
222            tracing::debug!(
223                qname = %qname,
224                error = %e,
225                "SystemLabelPersistence: cypher mirror delete skipped",
226            );
227        }
228        Ok(())
229    }
230
231    fn load_all(&self) -> Result<Vec<DeclaredPlugin>, PersistenceError> {
232        // JSON is the source-of-truth at startup; the graph label is
233        // a projection updated on subsequent writes.
234        self.inner.load_all()
235    }
236}
237
238/// Choose the appropriate persistence backend for a `Uni` instance.
239///
240/// Returns [`SystemLabelPersistence`] when `data_path` is a local
241/// filesystem path; falls back to [`uni_plugin_custom::NullPersistence`]
242/// when the instance is in-memory / object-store-backed (no local
243/// root for the JSON sidecar).
244///
245/// The optional second tuple element is the [`LazyCypherSink`] held by
246/// the `SystemLabelPersistence` (none for the null backend). The
247/// host's `Uni::build` flow stashes it and calls
248/// [`LazyCypherSink::set_host_executor`] after the host is constructed,
249/// at which point subsequent declarations dual-write into the
250/// `_DeclaredPlugin` graph label.
251#[must_use]
252pub fn persistence_for_data_path(
253    data_path: Option<&Path>,
254) -> (Arc<dyn Persistence>, Option<Arc<LazyCypherSink>>) {
255    match data_path {
256        Some(path) => {
257            let p = Arc::new(SystemLabelPersistence::new(path.to_owned()));
258            let sink = Arc::clone(p.cypher_sink());
259            (p as Arc<dyn Persistence>, Some(sink))
260        }
261        None => (Arc::new(uni_plugin_custom::NullPersistence), None),
262    }
263}
264
265#[cfg(test)]
266mod tests {
267    use super::*;
268    use tempfile::TempDir;
269
270    fn fixture_plugin() -> DeclaredPlugin {
271        DeclaredPlugin {
272            qname: "mycorp.fullName".to_owned(),
273            kind: "function".to_owned(),
274            body: "$first + ' ' + $last".to_owned(),
275            signature_json: "{}".to_owned(),
276            dependencies: vec![],
277            declared_by: "alice".to_owned(),
278            active: true,
279        }
280    }
281
282    #[test]
283    fn sidecar_lives_under_system_subdir() {
284        let p = SystemLabelPersistence::new("/tmp/mydb");
285        assert!(p.sidecar_path().to_string_lossy().contains("/_system/"));
286        assert!(p.sidecar_path().ends_with("declared_plugins.json"));
287    }
288
289    #[test]
290    fn save_and_load_round_trip() {
291        let tmp = TempDir::new().unwrap();
292        let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
293        let plugin = fixture_plugin();
294        p.save(&plugin).expect("save");
295        let loaded = p.load_all().expect("load_all");
296        assert_eq!(loaded.len(), 1);
297        assert_eq!(loaded[0], plugin);
298    }
299
300    #[test]
301    fn delete_removes_the_record() {
302        let tmp = TempDir::new().unwrap();
303        let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
304        let plugin = fixture_plugin();
305        p.save(&plugin).expect("save");
306        p.delete(&plugin.qname).expect("delete");
307        let loaded = p.load_all().expect("load_all");
308        assert!(loaded.is_empty());
309    }
310
311    #[test]
312    fn save_then_close_reopen_survives() {
313        let tmp = TempDir::new().unwrap();
314        {
315            let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
316            p.save(&fixture_plugin()).expect("save");
317        }
318        // New instance pointing at the same root.
319        let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
320        let loaded = p.load_all().expect("load_all");
321        assert_eq!(loaded.len(), 1, "declaration must survive close+reopen");
322    }
323
324    #[test]
325    fn persistence_for_in_memory_returns_null() {
326        let (p, sink) = persistence_for_data_path(None);
327        assert!(sink.is_none(), "NullPersistence has no cypher sink");
328        assert!(p.load_all().expect("load_all").is_empty());
329        p.save(&fixture_plugin()).expect("null save is ok");
330        assert!(
331            p.load_all().expect("load_all").is_empty(),
332            "NullPersistence drops on the floor"
333        );
334    }
335
336    #[test]
337    fn persistence_for_local_path_returns_sink() {
338        let tmp = TempDir::new().unwrap();
339        let (_p, sink) = persistence_for_data_path(Some(tmp.path()));
340        assert!(
341            sink.is_some(),
342            "local-path persistence must expose a cypher sink"
343        );
344    }
345
346    #[test]
347    fn cypher_sink_pre_wire_returns_err() {
348        let sink = LazyCypherSink::new();
349        let result = sink.try_write_cypher("MATCH (n) RETURN n");
350        assert!(result.is_err(), "pre-wire try_write_cypher must error");
351    }
352
353    #[tokio::test]
354    async fn cypher_sink_current_thread_runtime_degrades_to_err() {
355        // `#[tokio::test]` defaults to current_thread, which cannot
356        // host `block_in_place`. The sink must return an Err instead
357        // of panicking so the dual-write `save()` path stays
358        // best-effort.
359        let sink = LazyCypherSink::new();
360        // No need to actually wire a UniInner — the pre-wire branch
361        // also exercises the no-panic invariant, but we want to
362        // explicitly assert the flavor-check Err shape when the wire
363        // *is* present without paying the full Uni build cost. The
364        // pre-wire path errors first; that's fine — the panic only
365        // surfaced once a UniInner had been wired. Here we just
366        // confirm the function returns Err without panicking.
367        let result = sink.try_write_cypher("MATCH (n) RETURN n");
368        assert!(result.is_err());
369    }
370
371    #[test]
372    fn save_succeeds_when_cypher_mirror_is_unwired() {
373        // The Cypher sink isn't wired in this fixture, so the
374        // dual-write path's MERGE call returns Err — but save() must
375        // still succeed because the JSON sidecar IS the source of
376        // truth.
377        let tmp = TempDir::new().unwrap();
378        let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
379        p.save(&fixture_plugin())
380            .expect("JSON sidecar save must succeed even without sink");
381        let loaded = p.load_all().expect("load_all");
382        assert_eq!(loaded.len(), 1);
383    }
384}