use super::{Delta, DeltaListener, DeltaOp};
use serde_json::{json, Value};
use std::sync::RwLock;
#[derive(Debug, Clone)]
pub struct CypherDelta {
pub statement: String,
pub params: Value,
}
pub struct GraphProjection {
buffer: RwLock<Vec<CypherDelta>>,
}
impl Default for GraphProjection {
fn default() -> Self {
Self::new()
}
}
impl GraphProjection {
pub fn new() -> Self {
Self {
buffer: RwLock::new(Vec::new()),
}
}
pub fn schema_ddl() -> Vec<&'static str> {
vec![
"CREATE NODE TABLE IF NOT EXISTS Entity (name STRING, created_at STRING, PRIMARY KEY (name))",
"CREATE NODE TABLE IF NOT EXISTS Component (id STRING, type STRING, entity STRING, data STRING, updated STRING, PRIMARY KEY (id))",
"CREATE REL TABLE IF NOT EXISTS HAS_COMPONENT (FROM Entity TO Component)",
"CREATE REL TABLE IF NOT EXISTS REFERENCES (FROM Entity TO Entity, component STRING, field STRING)",
"CREATE REL TABLE IF NOT EXISTS SPAWNED_FROM (FROM Entity TO Entity)",
"CREATE NODE TABLE IF NOT EXISTS Actor (name STRING, template STRING, config STRING, PRIMARY KEY (name))",
"CREATE NODE TABLE IF NOT EXISTS ActorTemplate (id STRING, name STRING, inports STRING, outports STRING, description STRING, category STRING, PRIMARY KEY (id))",
"CREATE REL TABLE IF NOT EXISTS CONNECTS_TO (FROM Actor TO Actor, from_port STRING, to_port STRING)",
"CREATE REL TABLE IF NOT EXISTS TARGETS (FROM Actor TO Entity)",
"CREATE REL TABLE IF NOT EXISTS INSTANCE_OF (FROM Actor TO ActorTemplate)",
"CREATE NODE TABLE IF NOT EXISTS WiringPattern (id STRING, name STRING, description STRING, actors STRING, connections STRING, PRIMARY KEY (id))",
"CREATE REL TABLE IF NOT EXISTS USES_TEMPLATE (FROM WiringPattern TO ActorTemplate)",
"CREATE NODE TABLE IF NOT EXISTS RenderResult (id STRING, entity STRING, timestamp STRING, image_path STRING, width INT64, height INT64, PRIMARY KEY (id))",
"CREATE NODE TABLE IF NOT EXISTS QualityScore (id STRING, render_id STRING, score DOUBLE, analysis STRING, timestamp STRING, PRIMARY KEY (id))",
"CREATE REL TABLE IF NOT EXISTS RENDERED_BY (FROM RenderResult TO Entity)",
"CREATE REL TABLE IF NOT EXISTS SCORED (FROM QualityScore TO RenderResult)",
"CREATE NODE TABLE IF NOT EXISTS Trace (id STRING, actor STRING, timestamp STRING, duration_ms DOUBLE, input_keys STRING, output_keys STRING, PRIMARY KEY (id))",
"CREATE NODE TABLE IF NOT EXISTS Warning (id STRING, entity STRING, type STRING, message STRING, timestamp STRING, PRIMARY KEY (id))",
"CREATE REL TABLE IF NOT EXISTS TRACED_BY (FROM Trace TO Actor)",
"CREATE REL TABLE IF NOT EXISTS WARNS_ABOUT (FROM Warning TO Entity)",
]
}
pub fn project_catalog(
&self,
templates: &[(String, String, Vec<String>, Vec<String>)], ) {
self.push(CypherDelta {
statement: "CREATE NODE TABLE IF NOT EXISTS ActorTemplate (id STRING, name STRING, inports STRING, outports STRING, PRIMARY KEY (id))".into(),
params: json!({}),
});
self.push(CypherDelta {
statement: "CREATE REL TABLE IF NOT EXISTS INSTANCE_OF (FROM Actor TO ActorTemplate)"
.into(),
params: json!({}),
});
for (template_id, actor_name, inports, outports) in templates {
self.push(CypherDelta {
statement: concat!(
"MERGE (t:ActorTemplate {id: $id}) ",
"SET t.name = $name, t.inports = $inports, t.outports = $outports"
)
.into(),
params: json!({
"id": template_id,
"name": actor_name,
"inports": serde_json::to_string(inports).unwrap_or_default(),
"outports": serde_json::to_string(outports).unwrap_or_default(),
}),
});
}
}
pub fn drain(&self) -> Vec<CypherDelta> {
let mut buf = self.buffer.write().unwrap_or_else(|e| e.into_inner());
std::mem::take(&mut *buf)
}
pub fn project_actor_added(&self, name: &str, template: &str, config: &Value) {
self.push(CypherDelta {
statement:
"MERGE (a:Actor {name: $name}) SET a.template = $template, a.config = $config"
.into(),
params: json!({"name": name, "template": template, "config": config}),
});
self.push(CypherDelta {
statement: concat!(
"MATCH (a:Actor {name: $name}), (t:ActorTemplate {id: $template}) ",
"MERGE (a)-[:INSTANCE_OF]->(t)"
)
.into(),
params: json!({"name": name, "template": template}),
});
}
pub fn project_actor_removed(&self, name: &str) {
self.push(CypherDelta {
statement: "MATCH (a:Actor {name: $name}) DETACH DELETE a".into(),
params: json!({"name": name}),
});
}
pub fn project_connection_added(
&self,
from_actor: &str,
from_port: &str,
to_actor: &str,
to_port: &str,
) {
self.push(CypherDelta {
statement: concat!(
"MATCH (a:Actor {name: $from}), (b:Actor {name: $to}) ",
"MERGE (a)-[:CONNECTS_TO {from_port: $fp, to_port: $tp}]->(b)"
)
.into(),
params: json!({
"from": from_actor, "fp": from_port,
"to": to_actor, "tp": to_port,
}),
});
}
pub fn project_connection_removed(
&self,
from_actor: &str,
from_port: &str,
to_actor: &str,
to_port: &str,
) {
self.push(CypherDelta {
statement: concat!(
"MATCH (a:Actor {name: $from})-[r:CONNECTS_TO {from_port: $fp, to_port: $tp}]->(b:Actor {name: $to}) ",
"DELETE r"
).into(),
params: json!({
"from": from_actor, "fp": from_port,
"to": to_actor, "tp": to_port,
}),
});
}
pub fn project_render_result(&self, entity: &str, image_path: &str, width: u32, height: u32) {
let id = format!("render_{}_{}", entity, now_iso_compact());
self.push(CypherDelta {
statement: concat!(
"CREATE (r:RenderResult {id: $id, entity: $entity, timestamp: $ts, ",
"image_path: $path, width: $w, height: $h}) ",
"WITH r MATCH (e:Entity {name: $entity}) ",
"CREATE (r)-[:RENDERED_BY]->(e)"
)
.into(),
params: json!({
"id": id,
"entity": entity,
"ts": chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
"path": image_path,
"w": width,
"h": height,
}),
});
}
pub fn project_quality_score(&self, render_id: &str, score: f64, analysis: &str) {
let id = format!("score_{}", now_iso_compact());
self.push(CypherDelta {
statement: concat!(
"CREATE (q:QualityScore {id: $id, render_id: $rid, score: $score, ",
"analysis: $analysis, timestamp: $ts}) ",
"WITH q MATCH (r:RenderResult {id: $rid}) ",
"CREATE (q)-[:SCORED]->(r)"
)
.into(),
params: json!({
"id": id,
"rid": render_id,
"score": score,
"analysis": analysis,
"ts": chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
}),
});
}
pub fn project_trace(
&self,
actor: &str,
duration_ms: f64,
input_keys: &[String],
output_keys: &[String],
) {
let id = format!("trace_{}_{}", actor, now_iso_compact());
self.push(CypherDelta {
statement: concat!(
"CREATE (t:Trace {id: $id, actor: $actor, timestamp: $ts, ",
"duration_ms: $dur, input_keys: $ik, output_keys: $ok}) ",
"WITH t MATCH (a:Actor {name: $actor}) ",
"CREATE (t)-[:TRACED_BY]->(a)"
)
.into(),
params: json!({
"id": id,
"actor": actor,
"ts": chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
"dur": duration_ms,
"ik": serde_json::to_string(input_keys).unwrap_or_default(),
"ok": serde_json::to_string(output_keys).unwrap_or_default(),
}),
});
}
pub fn project_warning(&self, entity: &str, warning_type: &str, message: &str) {
let id = format!("warn_{}_{}", entity, now_iso_compact());
self.push(CypherDelta {
statement: concat!(
"CREATE (w:Warning {id: $id, entity: $entity, type: $wtype, ",
"message: $msg, timestamp: $ts}) ",
"WITH w MATCH (e:Entity {name: $entity}) ",
"CREATE (w)-[:WARNS_ABOUT]->(e)"
)
.into(),
params: json!({
"id": id,
"entity": entity,
"wtype": warning_type,
"msg": message,
"ts": chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
}),
});
}
pub fn project_wiring_pattern(
&self,
id: &str,
name: &str,
description: &str,
actors: &[(&str, &str)], connections: &[(&str, &str, &str, &str)], ) {
self.push(CypherDelta {
statement: concat!(
"MERGE (p:WiringPattern {id: $id}) ",
"SET p.name = $name, p.description = $desc, ",
"p.actors = $actors, p.connections = $conns"
)
.into(),
params: json!({
"id": id,
"name": name,
"desc": description,
"actors": serde_json::to_string(actors).unwrap_or_default(),
"conns": serde_json::to_string(connections).unwrap_or_default(),
}),
});
for (_, template) in actors {
self.push(CypherDelta {
statement: concat!(
"MATCH (p:WiringPattern {id: $pid}), (t:ActorTemplate {id: $tid}) ",
"MERGE (p)-[:USES_TEMPLATE]->(t)"
)
.into(),
params: json!({"pid": id, "tid": template}),
});
}
}
fn push(&self, delta: CypherDelta) {
if let Ok(mut buf) = self.buffer.write() {
buf.push(delta);
}
}
}
impl DeltaListener for GraphProjection {
fn on_delta(&self, delta: &Delta) {
match delta.op {
DeltaOp::Put | DeltaOp::Merge => {
self.push(CypherDelta {
statement: "MERGE (e:Entity {name: $entity})".into(),
params: json!({"entity": &delta.entity}),
});
if !delta.component.is_empty() {
self.push(CypherDelta {
statement: concat!(
"MERGE (c:Component {id: $id}) ",
"SET c.type = $type, c.entity = $entity, c.data = $data, c.updated = $ts ",
"WITH c ",
"MATCH (e:Entity {name: $entity}) ",
"MERGE (e)-[:HAS_COMPONENT]->(c)"
).into(),
params: json!({
"id": &delta.id,
"type": &delta.component,
"entity": &delta.entity,
"data": delta.data.as_ref().unwrap_or(&Value::Null),
"ts": &delta.timestamp,
}),
});
if let Some(ref data) = delta.data {
extract_references(&delta.entity, &delta.component, data, self);
}
}
}
DeltaOp::Delete => {
if !delta.component.is_empty() {
self.push(CypherDelta {
statement: "MATCH (c:Component {id: $id}) DETACH DELETE c".into(),
params: json!({"id": &delta.id}),
});
} else {
self.push(CypherDelta {
statement: concat!(
"MATCH (e:Entity {name: $name}) ",
"OPTIONAL MATCH (e)-[:HAS_COMPONENT]->(c:Component) ",
"DETACH DELETE e, c"
)
.into(),
params: json!({"name": &delta.entity}),
});
}
}
DeltaOp::Tag => {
if let Some(ref tags) = delta.data {
self.push(CypherDelta {
statement: concat!("MATCH (n {id: $id}) ", "SET n.tags = $tags").into(),
params: json!({"id": &delta.id, "tags": tags}),
});
}
}
DeltaOp::Spawn => {
if let Some(ref data) = delta.data {
let template = data.get("template").and_then(|v| v.as_str()).unwrap_or("");
self.push(CypherDelta {
statement: concat!(
"MATCH (tpl:Entity {name: $template}) ",
"MERGE (e:Entity {name: $entity}) ",
"MERGE (e)-[:SPAWNED_FROM]->(tpl)"
)
.into(),
params: json!({"template": template, "entity": &delta.entity}),
});
}
}
DeltaOp::Destroy => {
self.push(CypherDelta {
statement: concat!(
"MATCH (e:Entity {name: $name}) ",
"OPTIONAL MATCH (e)-[:HAS_COMPONENT]->(c:Component) ",
"DETACH DELETE e, c"
)
.into(),
params: json!({"name": &delta.entity}),
});
}
}
}
}
fn now_iso_compact() -> String {
chrono::Utc::now().format("%Y%m%d%H%M%S%3f").to_string()
}
fn extract_references(entity: &str, component: &str, data: &Value, projection: &GraphProjection) {
if let Value::Object(map) = data {
for (field, val) in map {
extract_refs_recursive(entity, component, field, val, projection);
}
}
}
fn extract_refs_recursive(
entity: &str,
component: &str,
field: &str,
value: &Value,
projection: &GraphProjection,
) {
match value {
Value::String(s) => {
let s = s.trim();
if !s.is_empty()
&& !s.contains(' ')
&& !s.starts_with('/')
&& !s.starts_with("http")
&& !s.starts_with('@')
&& !s.starts_with('$')
&& !s.contains('(')
&& s.chars()
.all(|c| c.is_alphanumeric() || c == '_' || c == '-' || c == ':' || c == '/')
&& s != entity
{
let target_entity = if let Some(colon) = s.rfind(':') {
&s[..colon]
} else {
s
};
projection.push(CypherDelta {
statement: concat!(
"MATCH (src:Entity {name: $src}), (dst:Entity {name: $dst}) ",
"MERGE (src)-[:REFERENCES {component: $comp, field: $field}]->(dst)"
)
.into(),
params: json!({
"src": entity,
"dst": target_entity,
"comp": component,
"field": field,
}),
});
}
}
Value::Object(map) => {
for (k, v) in map {
extract_refs_recursive(
entity,
component,
&format!("{}.{}", field, k),
v,
projection,
);
}
}
Value::Array(arr) => {
for (i, v) in arr.iter().enumerate() {
extract_refs_recursive(
entity,
component,
&format!("{}[{}]", field, i),
v,
projection,
);
}
}
_ => {}
}
}