use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use crate::{
application::services::projection::Projection, domain::entities::Event, error::Result,
prime::types::event_types,
};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AdjEntry {
pub relation: String,
pub peer: String,
pub edge_id: String,
pub weight: Option<f64>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdjacencyDirection {
Forward,
Reverse,
}
pub struct DirectedAdjacencyProjection {
name: String,
direction: AdjacencyDirection,
adj: Arc<DashMap<String, Vec<AdjEntry>>>,
edge_index: Arc<DashMap<String, String>>,
}
impl DirectedAdjacencyProjection {
pub fn new(name: impl Into<String>, direction: AdjacencyDirection) -> Self {
Self {
name: name.into(),
direction,
adj: Arc::new(DashMap::new()),
edge_index: Arc::new(DashMap::new()),
}
}
pub fn forward(name: impl Into<String>) -> Self {
Self::new(name, AdjacencyDirection::Forward)
}
pub fn reverse(name: impl Into<String>) -> Self {
Self::new(name, AdjacencyDirection::Reverse)
}
pub fn entries(&self, key: &str) -> Vec<AdjEntry> {
self.adj.get(key).map(|v| v.clone()).unwrap_or_default()
}
pub fn outgoing(&self, source: &str) -> Vec<AdjEntry> {
self.entries(source)
}
pub fn incoming(&self, target: &str) -> Vec<AdjEntry> {
self.entries(target)
}
fn extract_key_peer(&self, payload: &Value) -> (String, String) {
let source = payload
.get("source")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let target = payload
.get("target")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
match self.direction {
AdjacencyDirection::Forward => (source, target),
AdjacencyDirection::Reverse => (target, source),
}
}
}
impl Projection for DirectedAdjacencyProjection {
fn name(&self) -> &str {
&self.name
}
fn process(&self, event: &Event) -> Result<()> {
let event_type = event.event_type_str();
let payload = &event.payload;
match event_type {
event_types::EDGE_CREATED => {
let (key, peer) = self.extract_key_peer(payload);
let relation = payload
.get("relation")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let edge_id = payload
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let weight = payload.get("weight").and_then(serde_json::Value::as_f64);
self.edge_index.insert(edge_id.clone(), key.clone());
self.adj.entry(key).or_default().push(AdjEntry {
relation,
peer,
edge_id,
weight,
});
}
event_types::EDGE_DELETED => {
let edge_id = payload.get("id").and_then(|v| v.as_str()).unwrap_or("");
if let Some((_, key)) = self.edge_index.remove(edge_id)
&& let Some(mut entries) = self.adj.get_mut(&key)
{
entries.retain(|e| e.edge_id != edge_id);
}
}
_ => {}
}
Ok(())
}
fn get_state(&self, key: &str) -> Option<Value> {
self.adj
.get(key)
.map(|v| serde_json::to_value(v.value()).unwrap_or(Value::Null))
}
fn clear(&self) {
self.adj.clear();
self.edge_index.clear();
}
fn snapshot(&self) -> Option<Value> {
let entries: Vec<(String, Vec<AdjEntry>)> = self
.adj
.iter()
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect();
serde_json::to_value(entries).ok()
}
fn restore(&self, snapshot: &Value) -> Result<()> {
let entries: Vec<(String, Vec<AdjEntry>)> = serde_json::from_value(snapshot.clone())
.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
self.adj.clear();
self.edge_index.clear();
for (key, adj_entries) in entries {
for entry in &adj_entries {
self.edge_index.insert(entry.edge_id.clone(), key.clone());
}
self.adj.insert(key, adj_entries);
}
Ok(())
}
}
pub type AdjacencyListProjection = DirectedAdjacencyProjection;
pub type ReverseIndexProjection = DirectedAdjacencyProjection;
impl AdjacencyListProjection {
pub fn new_forward(name: impl Into<String>) -> Self {
Self::forward(name)
}
}
impl ReverseIndexProjection {
pub fn new_reverse(name: impl Into<String>) -> Self {
Self::reverse(name)
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use uuid::Uuid;
fn make_event(entity_id: &str, event_type: &str, payload: Value) -> Event {
Event::reconstruct_from_strings(
Uuid::new_v4(),
event_type.to_string(),
entity_id.to_string(),
"default".to_string(),
payload,
Utc::now(),
None,
1,
)
}
fn edge_created(id: &str, source: &str, target: &str, relation: &str) -> Event {
make_event(
&format!("edge:{id}"),
event_types::EDGE_CREATED,
serde_json::json!({
"id": id,
"source": source,
"target": target,
"relation": relation,
}),
)
}
fn edge_deleted(id: &str) -> Event {
make_event(
&format!("edge:{id}"),
event_types::EDGE_DELETED,
serde_json::json!({"id": id}),
)
}
#[test]
fn test_forward_and_reverse() {
let fwd = DirectedAdjacencyProjection::forward("fwd");
let rev = DirectedAdjacencyProjection::reverse("rev");
let events = vec![
edge_created("e1", "A", "B", "knows"),
edge_created("e2", "A", "C", "knows"),
edge_created("e3", "D", "B", "works_with"),
];
for event in &events {
fwd.process(event).unwrap();
rev.process(event).unwrap();
}
let a_out = fwd.outgoing("A");
assert_eq!(a_out.len(), 2);
let peers: Vec<&str> = a_out.iter().map(|e| e.peer.as_str()).collect();
assert!(peers.contains(&"B"));
assert!(peers.contains(&"C"));
let b_in = rev.incoming("B");
assert_eq!(b_in.len(), 2);
let sources: Vec<&str> = b_in.iter().map(|e| e.peer.as_str()).collect();
assert!(sources.contains(&"A"));
assert!(sources.contains(&"D"));
}
#[test]
fn test_o1_edge_deletion() {
let fwd = DirectedAdjacencyProjection::forward("fwd");
let rev = DirectedAdjacencyProjection::reverse("rev");
let create = edge_created("e1", "A", "B", "knows");
fwd.process(&create).unwrap();
rev.process(&create).unwrap();
assert_eq!(fwd.outgoing("A").len(), 1);
assert_eq!(rev.incoming("B").len(), 1);
let delete = edge_deleted("e1");
fwd.process(&delete).unwrap();
rev.process(&delete).unwrap();
assert_eq!(fwd.outgoing("A").len(), 0);
assert_eq!(rev.incoming("B").len(), 0);
}
#[test]
fn test_snapshot_restore_preserves_edge_index() {
let fwd = DirectedAdjacencyProjection::forward("fwd");
fwd.process(&edge_created("e1", "X", "Y", "links")).unwrap();
fwd.process(&edge_created("e2", "X", "Z", "links")).unwrap();
let snap = fwd.snapshot().unwrap();
fwd.clear();
assert!(fwd.outgoing("X").is_empty());
fwd.restore(&snap).unwrap();
assert_eq!(fwd.outgoing("X").len(), 2);
fwd.process(&edge_deleted("e1")).unwrap();
assert_eq!(fwd.outgoing("X").len(), 1);
assert_eq!(fwd.outgoing("X")[0].peer, "Z");
}
#[test]
fn test_delete_nonexistent_edge_is_noop() {
let fwd = DirectedAdjacencyProjection::forward("fwd");
fwd.process(&edge_created("e1", "A", "B", "knows")).unwrap();
fwd.process(&edge_deleted("e-nonexistent")).unwrap();
assert_eq!(fwd.outgoing("A").len(), 1);
}
#[test]
fn test_backward_compat_type_aliases() {
let adj = AdjacencyListProjection::new_forward("adj");
let rev = ReverseIndexProjection::new_reverse("rev");
let create = edge_created("e1", "X", "Y", "links");
adj.process(&create).unwrap();
rev.process(&create).unwrap();
assert_eq!(adj.outgoing("X").len(), 1);
assert_eq!(rev.incoming("Y").len(), 1);
}
}