use dashmap::DashMap;
use serde_json::Value;
use std::sync::Arc;
use crate::{
application::services::projection::Projection,
domain::entities::Event,
error::Result,
prime::types::{NodeId, event_types},
};
pub const PROJ_DOMAIN_INDEX: &str = "prime.domain_index";
pub struct DomainIndexProjection {
name: String,
index: Arc<DashMap<String, Vec<NodeId>>>,
}
impl DomainIndexProjection {
pub fn new() -> Self {
Self {
name: PROJ_DOMAIN_INDEX.to_string(),
index: Arc::new(DashMap::new()),
}
}
pub fn domains(&self) -> Vec<String> {
self.index.iter().map(|e| e.key().clone()).collect()
}
pub fn nodes_in_domain(&self, domain: &str) -> Vec<NodeId> {
self.index
.get(domain)
.map(|entry| entry.value().clone())
.unwrap_or_default()
}
pub fn domain_counts(&self) -> Vec<(String, usize)> {
self.index
.iter()
.map(|e| (e.key().clone(), e.value().len()))
.collect()
}
}
impl Default for DomainIndexProjection {
fn default() -> Self {
Self::new()
}
}
impl Projection for DomainIndexProjection {
fn name(&self) -> &str {
&self.name
}
fn process(&self, event: &Event) -> Result<()> {
let event_type = event.event_type_str();
match event_type {
event_types::NODE_CREATED | event_types::NODE_UPDATED => {
if let (Some(domain), Some(node_id_str)) = (
event.payload.get("domain").and_then(Value::as_str),
event.payload.get("node_id").and_then(Value::as_str),
) {
let node_id = NodeId::new(node_id_str);
self.index
.entry(domain.to_string())
.and_modify(|nodes| {
if !nodes.contains(&node_id) {
nodes.push(node_id.clone());
}
})
.or_insert_with(|| vec![node_id]);
}
}
event_types::NODE_DELETED => {
if let Some(node_id) = event.payload.get("node_id").and_then(Value::as_str) {
let target = NodeId::new(node_id);
for mut entry in self.index.iter_mut() {
entry.value_mut().retain(|id| id != &target);
}
self.index.retain(|_, nodes| !nodes.is_empty());
}
}
_ => {}
}
Ok(())
}
fn get_state(&self, domain: &str) -> Option<Value> {
self.index.get(domain).map(|entry| {
let ids: Vec<&str> = entry.value().iter().map(NodeId::as_str).collect();
serde_json::json!({ "domain": domain, "node_ids": ids, "count": ids.len() })
})
}
fn clear(&self) {
self.index.clear();
}
fn snapshot(&self) -> Option<Value> {
let data: std::collections::HashMap<String, Vec<String>> = self
.index
.iter()
.map(|e| {
(
e.key().clone(),
e.value().iter().map(|id| id.0.clone()).collect(),
)
})
.collect();
Some(serde_json::to_value(data).unwrap_or_default())
}
fn restore(&self, snapshot: &Value) -> Result<()> {
if let Ok(data) = serde_json::from_value::<std::collections::HashMap<String, Vec<String>>>(
snapshot.clone(),
) {
self.index.clear();
for (domain, ids) in data {
self.index
.insert(domain, ids.into_iter().map(NodeId::new).collect());
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use uuid::Uuid;
fn make_node_event(event_type: &str, node_id: &str, domain: Option<&str>) -> Event {
let mut payload = serde_json::json!({
"node_id": node_id,
"node_type": "concept",
"properties": {}
});
if let Some(d) = domain {
payload["domain"] = serde_json::json!(d);
}
Event::reconstruct_from_strings(
Uuid::new_v4(),
event_type.to_string(),
format!("node:concept:{node_id}"),
"default".to_string(),
payload,
chrono::Utc::now(),
None,
1,
)
}
#[test]
fn test_domain_index_add_nodes() {
let proj = DomainIndexProjection::new();
proj.process(&make_node_event(
event_types::NODE_CREATED,
"n1",
Some("revenue"),
))
.unwrap();
proj.process(&make_node_event(
event_types::NODE_CREATED,
"n2",
Some("revenue"),
))
.unwrap();
proj.process(&make_node_event(
event_types::NODE_CREATED,
"n3",
Some("engineering"),
))
.unwrap();
let domains = proj.domains();
assert_eq!(domains.len(), 2);
assert_eq!(proj.nodes_in_domain("revenue").len(), 2);
assert_eq!(proj.nodes_in_domain("engineering").len(), 1);
assert_eq!(proj.nodes_in_domain("nonexistent").len(), 0);
}
#[test]
fn test_domain_index_no_duplicates() {
let proj = DomainIndexProjection::new();
proj.process(&make_node_event(
event_types::NODE_CREATED,
"n1",
Some("revenue"),
))
.unwrap();
proj.process(&make_node_event(
event_types::NODE_UPDATED,
"n1",
Some("revenue"),
))
.unwrap();
assert_eq!(proj.nodes_in_domain("revenue").len(), 1);
}
#[test]
fn test_domain_index_delete_node() {
let proj = DomainIndexProjection::new();
proj.process(&make_node_event(
event_types::NODE_CREATED,
"n1",
Some("revenue"),
))
.unwrap();
proj.process(&make_node_event(
event_types::NODE_CREATED,
"n2",
Some("revenue"),
))
.unwrap();
proj.process(&make_node_event(event_types::NODE_DELETED, "n1", None))
.unwrap();
assert_eq!(proj.nodes_in_domain("revenue").len(), 1);
assert_eq!(proj.nodes_in_domain("revenue")[0].as_str(), "n2");
}
#[test]
fn test_domain_index_node_without_domain_ignored() {
let proj = DomainIndexProjection::new();
proj.process(&make_node_event(event_types::NODE_CREATED, "n1", None))
.unwrap();
assert!(proj.domains().is_empty());
}
#[test]
fn test_domain_index_get_state() {
let proj = DomainIndexProjection::new();
proj.process(&make_node_event(
event_types::NODE_CREATED,
"n1",
Some("revenue"),
))
.unwrap();
let state = proj.get_state("revenue").unwrap();
assert_eq!(state["count"], 1);
assert_eq!(state["node_ids"][0], "n1");
assert!(proj.get_state("nonexistent").is_none());
}
#[test]
fn test_domain_index_snapshot_restore() {
let proj = DomainIndexProjection::new();
proj.process(&make_node_event(
event_types::NODE_CREATED,
"n1",
Some("revenue"),
))
.unwrap();
proj.process(&make_node_event(
event_types::NODE_CREATED,
"n2",
Some("engineering"),
))
.unwrap();
let snapshot = proj.snapshot().unwrap();
proj.clear();
assert!(proj.domains().is_empty());
proj.restore(&snapshot).unwrap();
assert_eq!(proj.domains().len(), 2);
assert_eq!(proj.nodes_in_domain("revenue").len(), 1);
assert_eq!(proj.nodes_in_domain("engineering").len(), 1);
}
#[test]
fn test_domain_counts() {
let proj = DomainIndexProjection::new();
proj.process(&make_node_event(
event_types::NODE_CREATED,
"n1",
Some("revenue"),
))
.unwrap();
proj.process(&make_node_event(
event_types::NODE_CREATED,
"n2",
Some("revenue"),
))
.unwrap();
proj.process(&make_node_event(
event_types::NODE_CREATED,
"n3",
Some("engineering"),
))
.unwrap();
let counts = proj.domain_counts();
assert_eq!(counts.len(), 2);
}
}