use std::path::{Path, PathBuf};
use std::sync::{Arc, OnceLock};
use uni_plugin_custom::{DeclaredPlugin, JsonFilePersistence, Persistence, PersistenceError};
use crate::host::HostCypherExecutor;
/// Cypher write sink whose executor is supplied after construction.
///
/// The executor slot is a [`OnceLock`]: it starts empty and can be filled at
/// most once. Until it is filled, every write attempt reports an error.
#[derive(Debug, Default)]
pub struct LazyCypherSink {
    /// Executor that write statements are forwarded to once it has been set.
    host_executor: OnceLock<Arc<dyn HostCypherExecutor>>,
}

impl LazyCypherSink {
    /// Creates a sink with no executor wired.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores `exec` as the executor for this sink.
    ///
    /// Only the first call has an effect: when an executor is already stored,
    /// the existing one is kept and `exec` is dropped without any signal to
    /// the caller.
    pub fn set_host_executor(&self, exec: Arc<dyn HostCypherExecutor>) {
        // `OnceLock::set` hands the value back as `Err` when the slot is
        // already occupied; that outcome is discarded here.
        drop(self.host_executor.set(exec));
    }

    /// Forwards `cypher` to the wired executor's `execute_write_cypher`.
    ///
    /// # Errors
    ///
    /// Returns a fixed message when no executor has been set yet; otherwise
    /// returns whatever the executor call returns.
    pub fn try_write_cypher(&self, cypher: &str) -> Result<(), String> {
        match self.host_executor.get() {
            Some(executor) => executor.execute_write_cypher(cypher),
            None => Err("LazyCypherSink: host executor not wired".to_owned()),
        }
    }
}
/// Persistence backend that stores declared plugins in a JSON sidecar file
/// and carries a [`LazyCypherSink`] alongside it.
#[derive(Debug)]
pub struct SystemLabelPersistence {
    /// JSON-file store constructed for `sidecar_path`.
    inner: JsonFilePersistence,
    /// `<data_path>/_system/declared_plugins.json`.
    sidecar_path: PathBuf,
    /// Sink created without an executor; exposed through [`Self::cypher_sink`].
    cypher_sink: Arc<LazyCypherSink>,
}

impl SystemLabelPersistence {
    /// Builds a persistence whose sidecar file is
    /// `<data_path>/_system/declared_plugins.json`.
    ///
    /// The accompanying Cypher sink starts with no executor wired.
    #[must_use]
    pub fn new(data_path: impl Into<PathBuf>) -> Self {
        let data_root: PathBuf = data_path.into();
        let sidecar_path = data_root.join("_system").join("declared_plugins.json");
        Self {
            // The store receives its own copy of the path; the original is
            // kept so `sidecar_path()` can report it.
            inner: JsonFilePersistence::new(sidecar_path.clone()),
            cypher_sink: Arc::new(LazyCypherSink::new()),
            sidecar_path,
        }
    }

    /// Path of the JSON sidecar file computed in [`Self::new`].
    #[must_use]
    pub fn sidecar_path(&self) -> &Path {
        self.sidecar_path.as_path()
    }

    /// Shared handle to the Cypher sink owned by this persistence.
    #[must_use]
    pub fn cypher_sink(&self) -> &Arc<LazyCypherSink> {
        &self.cypher_sink
    }
}
/// Renders the Cypher statement that upserts `plugin` as a `_DeclaredPlugin`
/// node keyed by `qname`.
///
/// The statement is a single `MERGE … SET …` that assigns `kind`, `body`,
/// `signature_json`, `dependencies`, `declared_by` and `active`, in that
/// order. String values are embedded as single-quoted literals.
fn merge_cypher(plugin: &DeclaredPlugin) -> String {
    /// Wraps `raw` in single quotes, doubling every single quote inside it.
    // NOTE(review): only `'` is escaped. Whether the target Cypher dialect
    // also needs backslashes handled is not visible from here — confirm.
    fn quoted(raw: &str) -> String {
        format!("'{}'", raw.replace('\'', "''"))
    }

    let dependency_items: Vec<String> = plugin
        .dependencies
        .iter()
        .map(|dep| quoted(dep))
        .collect();

    // One entry per `SET` assignment, joined with ", " below.
    let assignments = [
        format!("p.kind = {}", quoted(&plugin.kind)),
        format!("p.body = {}", quoted(&plugin.body)),
        format!("p.signature_json = {}", quoted(&plugin.signature_json)),
        format!("p.dependencies = [{}]", dependency_items.join(", ")),
        format!("p.declared_by = {}", quoted(&plugin.declared_by)),
        format!("p.active = {}", plugin.active),
    ];

    format!(
        "MERGE (p:_DeclaredPlugin {{qname: {}}}) SET {}",
        quoted(&plugin.qname),
        assignments.join(", "),
    )
}
/// Renders the Cypher statement that matches the `_DeclaredPlugin` node with
/// the given `qname` and detach-deletes it.
///
/// Single quotes inside `qname` are doubled before it is embedded in the
/// single-quoted literal.
fn delete_cypher(qname: &str) -> String {
    let escaped = qname.replace('\'', "''");
    [
        "MATCH (p:_DeclaredPlugin {qname: '",
        escaped.as_str(),
        "'}) DETACH DELETE p",
    ]
    .concat()
}
impl Persistence for SystemLabelPersistence {
    /// Saves `plugin` to the JSON store, then attempts to mirror it through
    /// the Cypher sink.
    ///
    /// # Errors
    ///
    /// Returns the JSON store's error if that save fails. A failed mirror
    /// write is only logged at debug level and does not affect the result.
    fn save(&self, plugin: &DeclaredPlugin) -> Result<(), PersistenceError> {
        self.inner.save(plugin)?;

        let statement = merge_cypher(plugin);
        let mirrored = self.cypher_sink.try_write_cypher(&statement);
        if let Err(e) = mirrored {
            tracing::debug!(
                qname = %plugin.qname,
                error = %e,
                "SystemLabelPersistence: cypher mirror skipped",
            );
        }

        Ok(())
    }

    /// Deletes the record for `qname` from the JSON store, then attempts the
    /// matching delete through the Cypher sink.
    ///
    /// # Errors
    ///
    /// Returns the JSON store's error if that delete fails. A failed mirror
    /// delete is only logged at debug level and does not affect the result.
    fn delete(&self, qname: &str) -> Result<(), PersistenceError> {
        self.inner.delete(qname)?;

        let statement = delete_cypher(qname);
        let mirrored = self.cypher_sink.try_write_cypher(&statement);
        if let Err(e) = mirrored {
            tracing::debug!(
                qname = %qname,
                error = %e,
                "SystemLabelPersistence: cypher mirror delete skipped",
            );
        }

        Ok(())
    }

    /// Loads every record from the JSON store; the Cypher sink is not
    /// consulted.
    fn load_all(&self) -> Result<Vec<DeclaredPlugin>, PersistenceError> {
        self.inner.load_all()
    }
}
/// Chooses a persistence backend based on whether a data path is available.
///
/// With `Some(path)`, returns a [`SystemLabelPersistence`] for that path
/// together with a handle to its Cypher sink. With `None`, returns
/// `uni_plugin_custom::NullPersistence` and no sink.
#[must_use]
pub fn persistence_for_data_path(
    data_path: Option<&Path>,
) -> (Arc<dyn Persistence>, Option<Arc<LazyCypherSink>>) {
    let Some(root) = data_path else {
        let null_backend: Arc<dyn Persistence> = Arc::new(uni_plugin_custom::NullPersistence);
        return (null_backend, None);
    };

    let concrete = Arc::new(SystemLabelPersistence::new(root.to_path_buf()));
    // Clone the sink handle before the concrete type is erased behind the
    // trait object, after which `cypher_sink()` is no longer reachable.
    let sink = Arc::clone(concrete.cypher_sink());
    let backend: Arc<dyn Persistence> = concrete;
    (backend, Some(sink))
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Sample declaration shared by the tests; its body contains single
    /// quotes, which the escaping test below relies on.
    fn fixture_plugin() -> DeclaredPlugin {
        DeclaredPlugin {
            qname: "mycorp.fullName".to_owned(),
            kind: "function".to_owned(),
            body: "$first + ' ' + $last".to_owned(),
            signature_json: "{}".to_owned(),
            dependencies: vec![],
            declared_by: "alice".to_owned(),
            active: true,
        }
    }

    #[test]
    fn sidecar_lives_under_system_subdir() {
        let p = SystemLabelPersistence::new("/tmp/mydb");
        // Compare path components instead of a rendered string so the check
        // does not depend on the platform's path separator.
        let expected_tail = Path::new("_system").join("declared_plugins.json");
        assert!(
            p.sidecar_path().ends_with(&expected_tail),
            "unexpected sidecar path: {}",
            p.sidecar_path().display()
        );
    }

    #[test]
    fn save_and_load_round_trip() {
        let tmp = TempDir::new().unwrap();
        let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
        let plugin = fixture_plugin();
        p.save(&plugin).expect("save");
        let loaded = p.load_all().expect("load_all");
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0], plugin);
    }

    #[test]
    fn delete_removes_the_record() {
        let tmp = TempDir::new().unwrap();
        let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
        let plugin = fixture_plugin();
        p.save(&plugin).expect("save");
        p.delete(&plugin.qname).expect("delete");
        let loaded = p.load_all().expect("load_all");
        assert!(loaded.is_empty());
    }

    #[test]
    fn save_then_close_reopen_survives() {
        let tmp = TempDir::new().unwrap();
        {
            // Scope the first instance so it is dropped before reopening.
            let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
            p.save(&fixture_plugin()).expect("save");
        }
        let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
        let loaded = p.load_all().expect("load_all");
        assert_eq!(loaded.len(), 1, "declaration must survive close+reopen");
    }

    #[test]
    fn persistence_for_in_memory_returns_null() {
        let (p, sink) = persistence_for_data_path(None);
        assert!(sink.is_none(), "NullPersistence has no cypher sink");
        assert!(p.load_all().expect("load_all").is_empty());
        p.save(&fixture_plugin()).expect("null save is ok");
        assert!(
            p.load_all().expect("load_all").is_empty(),
            "NullPersistence drops on the floor"
        );
    }

    #[test]
    fn persistence_for_local_path_returns_sink() {
        let tmp = TempDir::new().unwrap();
        let (_p, sink) = persistence_for_data_path(Some(tmp.path()));
        assert!(
            sink.is_some(),
            "local-path persistence must expose a cypher sink"
        );
    }

    #[test]
    fn cypher_sink_pre_wire_returns_err() {
        let sink = LazyCypherSink::new();
        let result = sink.try_write_cypher("MATCH (n) RETURN n");
        assert!(result.is_err(), "pre-wire try_write_cypher must error");
    }

    // No executor is wired here, so `try_write_cypher` returns its error
    // without calling any executor; this confirms the same holds when the
    // call is made from inside a tokio test runtime.
    #[tokio::test]
    async fn cypher_sink_current_thread_runtime_degrades_to_err() {
        let sink = LazyCypherSink::new();
        let result = sink.try_write_cypher("MATCH (n) RETURN n");
        assert!(result.is_err());
    }

    #[test]
    fn save_succeeds_when_cypher_mirror_is_unwired() {
        let tmp = TempDir::new().unwrap();
        let p = SystemLabelPersistence::new(tmp.path().to_path_buf());
        p.save(&fixture_plugin())
            .expect("JSON sidecar save must succeed even without sink");
        let loaded = p.load_all().expect("load_all");
        assert_eq!(loaded.len(), 1);
    }

    #[test]
    fn delete_cypher_doubles_single_quotes() {
        assert_eq!(
            delete_cypher("o'brien"),
            "MATCH (p:_DeclaredPlugin {qname: 'o''brien'}) DETACH DELETE p"
        );
    }

    #[test]
    fn merge_cypher_doubles_single_quotes_in_body() {
        let cypher = merge_cypher(&fixture_plugin());
        assert!(
            cypher.starts_with("MERGE (p:_DeclaredPlugin {qname: 'mycorp.fullName'}) SET "),
            "unexpected statement head: {cypher}"
        );
        assert!(
            cypher.contains("p.body = '$first + '' '' + $last'"),
            "body quotes must be doubled: {cypher}"
        );
        assert!(
            cypher.ends_with("p.active = true"),
            "unexpected statement tail: {cypher}"
        );
    }
}