use chrono::{DateTime, Utc};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use crate::{
application::services::projection::Projection, domain::entities::Event, error::Result,
prime::types::event_types,
};
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NodeEntry {
node_type: String,
properties: Value,
domain: Option<String>,
labels: Vec<String>,
deleted: bool,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
}
pub struct NodeStateProjection {
name: String,
nodes: Arc<DashMap<String, NodeEntry>>,
}
impl NodeStateProjection {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
nodes: Arc::new(DashMap::new()),
}
}
pub fn len(&self) -> usize {
self.nodes.len()
}
pub fn is_empty(&self) -> bool {
self.nodes.is_empty()
}
pub fn get_node(&self, entity_id: &str) -> Option<crate::prime::types::Node> {
let entry = self.nodes.get(entity_id)?;
let e = entry.value();
let id = crate::prime::types::EntityId::parse(entity_id)
.map_or_else(|| entity_id.to_string(), |eid| eid.short_id().to_string());
Some(crate::prime::types::Node {
id: crate::prime::types::NodeId::new(id),
node_type: e.node_type.clone(),
properties: e.properties.clone(),
domain: e.domain.clone(),
labels: e.labels.clone(),
deleted: e.deleted,
created_at: e.created_at,
updated_at: e.updated_at,
})
}
pub fn all_nodes(&self) -> Vec<crate::prime::types::Node> {
self.nodes
.iter()
.filter(|entry| !entry.value().deleted)
.map(|entry| {
let e = entry.value();
let id = crate::prime::types::EntityId::parse(entry.key())
.map_or_else(|| entry.key().clone(), |eid| eid.short_id().to_string());
crate::prime::types::Node {
id: crate::prime::types::NodeId::new(id),
node_type: e.node_type.clone(),
properties: e.properties.clone(),
domain: e.domain.clone(),
labels: e.labels.clone(),
deleted: false,
created_at: e.created_at,
updated_at: e.updated_at,
}
})
.collect()
}
pub fn is_live(&self, entity_id: &str) -> bool {
self.nodes.get(entity_id).is_some_and(|e| !e.deleted)
}
pub fn is_deleted(&self, entity_id: &str) -> bool {
self.nodes.get(entity_id).is_some_and(|e| e.deleted)
}
}
impl Projection for NodeStateProjection {
fn name(&self) -> &str {
&self.name
}
fn process(&self, event: &Event) -> Result<()> {
let event_type = event.event_type_str();
let entity_id = event.entity_id_str().to_string();
let payload = &event.payload;
let timestamp = event.timestamp;
match event_type {
event_types::NODE_CREATED => {
let node_type = payload
.get("node_type")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string();
let properties = payload
.get("properties")
.cloned()
.unwrap_or(Value::Object(Default::default()));
let domain = payload
.get("domain")
.and_then(|v| v.as_str())
.map(String::from);
let labels = payload
.get("labels")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
self.nodes.insert(
entity_id,
NodeEntry {
node_type,
properties,
domain,
labels,
deleted: false,
created_at: timestamp,
updated_at: timestamp,
},
);
}
event_types::NODE_UPDATED => {
if let Some(mut entry) = self.nodes.get_mut(&entity_id) {
if let (Some(Value::Object(updates)), Value::Object(existing)) =
(payload.get("properties"), &mut entry.properties)
{
for (key, value) in updates {
existing.insert(key.clone(), value.clone());
}
}
if let Some(domain) = payload.get("domain") {
entry.domain = domain.as_str().map(String::from);
}
if let Some(labels) =
payload.get("labels").and_then(serde_json::Value::as_array)
{
entry.labels = labels
.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect();
}
entry.updated_at = timestamp;
}
}
event_types::NODE_DELETED => {
if let Some(mut entry) = self.nodes.get_mut(&entity_id) {
entry.deleted = true;
entry.updated_at = timestamp;
}
}
_ => {} }
Ok(())
}
fn get_state(&self, entity_id: &str) -> Option<Value> {
self.nodes
.get(entity_id)
.map(|entry| serde_json::to_value(entry.value()).unwrap_or(Value::Null))
}
fn clear(&self) {
self.nodes.clear();
}
fn snapshot(&self) -> Option<Value> {
let entries: Vec<(String, NodeEntry)> = self
.nodes
.iter()
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect();
serde_json::to_value(entries).ok()
}
fn restore(&self, snapshot: &Value) -> Result<()> {
let entries: Vec<(String, NodeEntry)> = serde_json::from_value(snapshot.clone())
.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
self.nodes.clear();
for (key, entry) in entries {
self.nodes.insert(key, entry);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use uuid::Uuid;
fn make_event(entity_id: &str, event_type: &str, payload: Value) -> Event {
Event::reconstruct_from_strings(
Uuid::new_v4(),
event_type.to_string(),
entity_id.to_string(),
"default".to_string(),
payload,
Utc::now(),
None,
1,
)
}
#[test]
fn test_create_node_and_get_state() {
let proj = NodeStateProjection::new("node_state");
let event = make_event(
"node:person:alice",
event_types::NODE_CREATED,
serde_json::json!({
"node_type": "person",
"properties": {"name": "Alice", "role": "engineer"},
"domain": "engineering",
"labels": ["active"],
}),
);
proj.process(&event).unwrap();
let state = proj.get_state("node:person:alice").unwrap();
assert_eq!(state["node_type"], "person");
assert_eq!(state["properties"]["name"], "Alice");
assert_eq!(state["properties"]["role"], "engineer");
assert_eq!(state["domain"], "engineering");
assert_eq!(state["deleted"], false);
assert_eq!(state["labels"][0], "active");
}
#[test]
fn test_update_node_deep_merges_properties() {
let proj = NodeStateProjection::new("node_state");
proj.process(&make_event(
"node:person:bob",
event_types::NODE_CREATED,
serde_json::json!({
"node_type": "person",
"properties": {"name": "Bob", "age": 30},
}),
))
.unwrap();
proj.process(&make_event(
"node:person:bob",
event_types::NODE_UPDATED,
serde_json::json!({
"properties": {"role": "manager", "age": 31},
}),
))
.unwrap();
let state = proj.get_state("node:person:bob").unwrap();
assert_eq!(state["properties"]["name"], "Bob"); assert_eq!(state["properties"]["role"], "manager"); assert_eq!(state["properties"]["age"], 31); }
#[test]
fn test_delete_node_soft_deletes() {
let proj = NodeStateProjection::new("node_state");
proj.process(&make_event(
"node:person:carol",
event_types::NODE_CREATED,
serde_json::json!({
"node_type": "person",
"properties": {"name": "Carol"},
}),
))
.unwrap();
proj.process(&make_event(
"node:person:carol",
event_types::NODE_DELETED,
serde_json::json!({}),
))
.unwrap();
let state = proj.get_state("node:person:carol").unwrap();
assert_eq!(state["deleted"], true);
assert_eq!(state["properties"]["name"], "Carol");
}
#[test]
fn test_snapshot_and_restore() {
let proj = NodeStateProjection::new("node_state");
proj.process(&make_event(
"node:person:dave",
event_types::NODE_CREATED,
serde_json::json!({
"node_type": "person",
"properties": {"name": "Dave"},
"domain": "sales",
}),
))
.unwrap();
let snap = proj.snapshot().expect("snapshot should be Some");
proj.clear();
assert!(proj.get_state("node:person:dave").is_none());
proj.restore(&snap).unwrap();
let state = proj.get_state("node:person:dave").unwrap();
assert_eq!(state["node_type"], "person");
assert_eq!(state["properties"]["name"], "Dave");
assert_eq!(state["domain"], "sales");
}
#[test]
fn test_ignores_non_node_events() {
let proj = NodeStateProjection::new("node_state");
proj.process(&make_event(
"edge:e-1",
event_types::EDGE_CREATED,
serde_json::json!({"source": "a", "target": "b"}),
))
.unwrap();
assert!(proj.is_empty());
}
#[test]
fn test_update_nonexistent_node_is_noop() {
let proj = NodeStateProjection::new("node_state");
proj.process(&make_event(
"node:person:ghost",
event_types::NODE_UPDATED,
serde_json::json!({"properties": {"name": "Ghost"}}),
))
.unwrap();
assert!(proj.get_state("node:person:ghost").is_none());
}
}