use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use crate::{
application::services::projection::Projection, domain::entities::Event, error::Result,
prime::types::event_types,
};
pub const PROJ_CROSS_DOMAIN: &str = "prime.cross_domain";
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossDomainLink {
pub domain_a: String,
pub domain_b: String,
pub edge_count: usize,
pub sample_relations: Vec<String>,
}
fn domain_pair_key(a: &str, b: &str) -> (String, String) {
if a <= b {
(a.to_string(), b.to_string())
} else {
(b.to_string(), a.to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CrossDomainEntry {
edges: Vec<String>, relations: Vec<String>, }
pub struct CrossDomainProjection {
name: String,
links: Arc<DashMap<(String, String), CrossDomainEntry>>,
node_domains: Arc<DashMap<String, String>>,
}
impl CrossDomainProjection {
pub fn new() -> Self {
Self {
name: PROJ_CROSS_DOMAIN.to_string(),
links: Arc::new(DashMap::new()),
node_domains: Arc::new(DashMap::new()),
}
}
pub fn cross_domain_links(&self) -> Vec<CrossDomainLink> {
self.links
.iter()
.map(|entry| {
let (domain_a, domain_b) = entry.key();
let val = entry.value();
CrossDomainLink {
domain_a: domain_a.clone(),
domain_b: domain_b.clone(),
edge_count: val.edges.len(),
sample_relations: val.relations.clone(),
}
})
.collect()
}
pub fn link_between(&self, domain_a: &str, domain_b: &str) -> Option<CrossDomainLink> {
let key = domain_pair_key(domain_a, domain_b);
self.links.get(&key).map(|entry| {
let val = entry.value();
CrossDomainLink {
domain_a: key.0.clone(),
domain_b: key.1.clone(),
edge_count: val.edges.len(),
sample_relations: val.relations.clone(),
}
})
}
pub fn connected_domains(&self) -> Vec<String> {
let mut domains = std::collections::HashSet::new();
for entry in self.links.iter() {
domains.insert(entry.key().0.clone());
domains.insert(entry.key().1.clone());
}
let mut result: Vec<String> = domains.into_iter().collect();
result.sort();
result
}
}
impl Default for CrossDomainProjection {
fn default() -> Self {
Self::new()
}
}
impl Projection for CrossDomainProjection {
fn name(&self) -> &str {
&self.name
}
fn process(&self, event: &Event) -> Result<()> {
let event_type = event.event_type_str();
match event_type {
event_types::NODE_CREATED | event_types::NODE_UPDATED => {
if let (Some(node_id), Some(domain)) = (
event.payload.get("node_id").and_then(Value::as_str),
event.payload.get("domain").and_then(Value::as_str),
) {
self.node_domains
.insert(node_id.to_string(), domain.to_string());
}
}
event_types::EDGE_CREATED => {
if let (Some(edge_id), Some(source_id), Some(target_id)) = (
event.payload.get("edge_id").and_then(Value::as_str),
event.payload.get("source").and_then(Value::as_str),
event.payload.get("target").and_then(Value::as_str),
) {
let source_domain = self.node_domains.get(source_id).map(|d| d.clone());
let target_domain = self.node_domains.get(target_id).map(|d| d.clone());
if let (Some(sd), Some(td)) = (source_domain, target_domain) {
if sd != td {
let key = domain_pair_key(&sd, &td);
let relation = event
.payload
.get("relation")
.and_then(Value::as_str)
.unwrap_or("unknown")
.to_string();
self.links
.entry(key)
.and_modify(|entry| {
if !entry.edges.contains(&edge_id.to_string()) {
entry.edges.push(edge_id.to_string());
}
if entry.relations.len() < 5
&& !entry.relations.contains(&relation)
{
entry.relations.push(relation.clone());
}
})
.or_insert_with(|| CrossDomainEntry {
edges: vec![edge_id.to_string()],
relations: vec![relation],
});
}
}
}
}
event_types::EDGE_DELETED => {
if let Some(edge_id) = event.payload.get("edge_id").and_then(Value::as_str) {
let edge_str = edge_id.to_string();
let mut empty_keys = Vec::new();
for mut entry in self.links.iter_mut() {
entry.value_mut().edges.retain(|e| e != &edge_str);
if entry.value().edges.is_empty() {
empty_keys.push(entry.key().clone());
}
}
for key in empty_keys {
self.links.remove(&key);
}
}
}
_ => {}
}
Ok(())
}
fn get_state(&self, domain_pair: &str) -> Option<Value> {
let parts: Vec<&str> = domain_pair.split(':').collect();
if parts.len() == 2 {
self.link_between(parts[0], parts[1])
.map(|link| serde_json::to_value(link).unwrap_or_default())
} else {
None
}
}
fn clear(&self) {
self.links.clear();
self.node_domains.clear();
}
fn snapshot(&self) -> Option<Value> {
let links: std::collections::HashMap<String, CrossDomainEntry> = self
.links
.iter()
.map(|e| {
let key = format!("{}:{}", e.key().0, e.key().1);
(key, e.value().clone())
})
.collect();
let domains: std::collections::HashMap<String, String> = self
.node_domains
.iter()
.map(|e| (e.key().clone(), e.value().clone()))
.collect();
Some(serde_json::json!({
"links": links,
"node_domains": domains,
}))
}
fn restore(&self, snapshot: &Value) -> Result<()> {
self.clear();
if let Some(Ok(data)) = snapshot.get("links").map(|v| {
serde_json::from_value::<std::collections::HashMap<String, CrossDomainEntry>>(v.clone())
}) {
for (key_str, entry) in data {
let parts: Vec<&str> = key_str.split(':').collect();
if parts.len() == 2 {
self.links
.insert((parts[0].to_string(), parts[1].to_string()), entry);
}
}
}
if let Some(Ok(data)) = snapshot
.get("node_domains")
.map(|v| serde_json::from_value::<std::collections::HashMap<String, String>>(v.clone()))
{
for (node_id, domain) in data {
self.node_domains.insert(node_id, domain);
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use uuid::Uuid;
fn make_node_event(node_id: &str, domain: &str) -> Event {
Event::reconstruct_from_strings(
Uuid::new_v4(),
event_types::NODE_CREATED.to_string(),
format!("node:concept:{node_id}"),
"default".to_string(),
serde_json::json!({
"node_id": node_id,
"node_type": "concept",
"domain": domain,
"properties": {}
}),
chrono::Utc::now(),
None,
1,
)
}
fn make_edge_event(
event_type: &str,
edge_id: &str,
source: &str,
target: &str,
relation: &str,
) -> Event {
Event::reconstruct_from_strings(
Uuid::new_v4(),
event_type.to_string(),
format!("edge:{edge_id}"),
"default".to_string(),
serde_json::json!({
"edge_id": edge_id,
"source": source,
"target": target,
"relation": relation,
}),
chrono::Utc::now(),
None,
1,
)
}
#[test]
fn test_cross_domain_link_recorded() {
let proj = CrossDomainProjection::new();
proj.process(&make_node_event("n1", "revenue")).unwrap();
proj.process(&make_node_event("n2", "engineering")).unwrap();
proj.process(&make_edge_event(
event_types::EDGE_CREATED,
"e1",
"n1",
"n2",
"impacts",
))
.unwrap();
let links = proj.cross_domain_links();
assert_eq!(links.len(), 1);
assert_eq!(links[0].edge_count, 1);
assert!(links[0].sample_relations.contains(&"impacts".to_string()));
}
#[test]
fn test_same_domain_not_recorded() {
let proj = CrossDomainProjection::new();
proj.process(&make_node_event("n1", "engineering")).unwrap();
proj.process(&make_node_event("n2", "engineering")).unwrap();
proj.process(&make_edge_event(
event_types::EDGE_CREATED,
"e1",
"n1",
"n2",
"collaborates",
))
.unwrap();
assert!(proj.cross_domain_links().is_empty());
}
#[test]
fn test_domain_pair_key_canonical() {
let k1 = domain_pair_key("revenue", "engineering");
let k2 = domain_pair_key("engineering", "revenue");
assert_eq!(k1, k2);
assert_eq!(k1.0, "engineering"); }
#[test]
fn test_multiple_edges_between_domains() {
let proj = CrossDomainProjection::new();
proj.process(&make_node_event("n1", "revenue")).unwrap();
proj.process(&make_node_event("n2", "engineering")).unwrap();
proj.process(&make_node_event("n3", "engineering")).unwrap();
proj.process(&make_edge_event(
event_types::EDGE_CREATED,
"e1",
"n1",
"n2",
"impacts",
))
.unwrap();
proj.process(&make_edge_event(
event_types::EDGE_CREATED,
"e2",
"n1",
"n3",
"depends_on",
))
.unwrap();
let links = proj.cross_domain_links();
assert_eq!(links.len(), 1);
assert_eq!(links[0].edge_count, 2);
assert_eq!(links[0].sample_relations.len(), 2);
}
#[test]
fn test_edge_deleted_removes_link() {
let proj = CrossDomainProjection::new();
proj.process(&make_node_event("n1", "revenue")).unwrap();
proj.process(&make_node_event("n2", "engineering")).unwrap();
proj.process(&make_edge_event(
event_types::EDGE_CREATED,
"e1",
"n1",
"n2",
"impacts",
))
.unwrap();
assert_eq!(proj.cross_domain_links().len(), 1);
proj.process(&make_edge_event(
event_types::EDGE_DELETED,
"e1",
"n1",
"n2",
"impacts",
))
.unwrap();
assert!(proj.cross_domain_links().is_empty());
}
#[test]
fn test_link_between() {
let proj = CrossDomainProjection::new();
proj.process(&make_node_event("n1", "revenue")).unwrap();
proj.process(&make_node_event("n2", "engineering")).unwrap();
proj.process(&make_edge_event(
event_types::EDGE_CREATED,
"e1",
"n1",
"n2",
"impacts",
))
.unwrap();
assert!(proj.link_between("revenue", "engineering").is_some());
assert!(proj.link_between("engineering", "revenue").is_some());
assert!(proj.link_between("revenue", "sales").is_none());
}
#[test]
fn test_connected_domains() {
let proj = CrossDomainProjection::new();
proj.process(&make_node_event("n1", "revenue")).unwrap();
proj.process(&make_node_event("n2", "engineering")).unwrap();
proj.process(&make_node_event("n3", "sales")).unwrap();
proj.process(&make_edge_event(
event_types::EDGE_CREATED,
"e1",
"n1",
"n2",
"impacts",
))
.unwrap();
let connected = proj.connected_domains();
assert_eq!(connected.len(), 2);
assert!(connected.contains(&"engineering".to_string()));
assert!(connected.contains(&"revenue".to_string()));
assert!(!connected.contains(&"sales".to_string()));
}
#[test]
fn test_snapshot_restore() {
let proj = CrossDomainProjection::new();
proj.process(&make_node_event("n1", "revenue")).unwrap();
proj.process(&make_node_event("n2", "engineering")).unwrap();
proj.process(&make_edge_event(
event_types::EDGE_CREATED,
"e1",
"n1",
"n2",
"impacts",
))
.unwrap();
let snapshot = proj.snapshot().unwrap();
proj.clear();
assert!(proj.cross_domain_links().is_empty());
proj.restore(&snapshot).unwrap();
assert_eq!(proj.cross_domain_links().len(), 1);
assert!(proj.link_between("revenue", "engineering").is_some());
}
#[test]
fn test_sample_relations_capped_at_5() {
let proj = CrossDomainProjection::new();
proj.process(&make_node_event("n1", "revenue")).unwrap();
proj.process(&make_node_event("n2", "engineering")).unwrap();
for i in 0..8 {
proj.process(&make_edge_event(
event_types::EDGE_CREATED,
&format!("e{i}"),
"n1",
"n2",
&format!("relation_{i}"),
))
.unwrap();
}
let links = proj.cross_domain_links();
assert_eq!(links[0].edge_count, 8);
assert_eq!(links[0].sample_relations.len(), 5); }
}