use std::collections::{BTreeMap, HashSet, VecDeque};
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use entelix_core::{Error, ExecutionContext, Result};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::namespace::Namespace;
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
pub struct NodeId(String);
impl NodeId {
#[must_use]
pub fn new() -> Self {
Self(Uuid::now_v7().to_string())
}
#[must_use]
pub fn from_string(s: impl Into<String>) -> Self {
Self(s.into())
}
#[must_use]
pub fn as_str(&self) -> &str {
&self.0
}
}
impl Default for NodeId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for NodeId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
pub struct EdgeId(String);
impl EdgeId {
#[must_use]
pub fn new() -> Self {
Self(Uuid::now_v7().to_string())
}
#[must_use]
pub fn from_string(s: impl Into<String>) -> Self {
Self(s.into())
}
#[must_use]
pub fn as_str(&self) -> &str {
&self.0
}
}
impl Default for EdgeId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for EdgeId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
#[non_exhaustive]
pub enum Direction {
Outgoing,
Incoming,
Both,
}
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct GraphHop<E> {
pub edge_id: EdgeId,
pub from: NodeId,
pub to: NodeId,
pub edge: E,
pub timestamp: DateTime<Utc>,
}
impl<E> GraphHop<E> {
#[must_use]
pub const fn new(
edge_id: EdgeId,
from: NodeId,
to: NodeId,
edge: E,
timestamp: DateTime<Utc>,
) -> Self {
Self {
edge_id,
from,
to,
edge,
timestamp,
}
}
}
#[async_trait]
pub trait GraphMemory<N, E>: Send + Sync + 'static
where
N: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
async fn add_node(&self, ctx: &ExecutionContext, ns: &Namespace, node: N) -> Result<NodeId>;
async fn add_edge(
&self,
ctx: &ExecutionContext,
ns: &Namespace,
from: &NodeId,
to: &NodeId,
edge: E,
timestamp: DateTime<Utc>,
) -> Result<EdgeId>;
async fn add_edges_batch(
&self,
ctx: &ExecutionContext,
ns: &Namespace,
edges: Vec<(NodeId, NodeId, E, DateTime<Utc>)>,
) -> Result<Vec<EdgeId>> {
let mut ids = Vec::with_capacity(edges.len());
for (from, to, edge, timestamp) in edges {
ids.push(self.add_edge(ctx, ns, &from, &to, edge, timestamp).await?);
}
Ok(ids)
}
async fn get_node(
&self,
ctx: &ExecutionContext,
ns: &Namespace,
id: &NodeId,
) -> Result<Option<N>>;
async fn get_edge(
&self,
_ctx: &ExecutionContext,
_ns: &Namespace,
_edge_id: &EdgeId,
) -> Result<Option<GraphHop<E>>> {
Ok(None)
}
async fn neighbors(
&self,
ctx: &ExecutionContext,
ns: &Namespace,
node: &NodeId,
direction: Direction,
) -> Result<Vec<(EdgeId, NodeId, E)>>;
async fn traverse(
&self,
ctx: &ExecutionContext,
ns: &Namespace,
start: &NodeId,
direction: Direction,
max_depth: usize,
) -> Result<Vec<GraphHop<E>>>;
async fn find_path(
&self,
ctx: &ExecutionContext,
ns: &Namespace,
from: &NodeId,
to: &NodeId,
direction: Direction,
max_depth: usize,
) -> Result<Option<Vec<GraphHop<E>>>>;
async fn temporal_filter(
&self,
ctx: &ExecutionContext,
ns: &Namespace,
from: DateTime<Utc>,
to: DateTime<Utc>,
) -> Result<Vec<GraphHop<E>>>;
async fn node_count(&self, _ctx: &ExecutionContext, _ns: &Namespace) -> Result<usize> {
Ok(0)
}
async fn edge_count(&self, _ctx: &ExecutionContext, _ns: &Namespace) -> Result<usize> {
Ok(0)
}
async fn delete_edge(
&self,
ctx: &ExecutionContext,
ns: &Namespace,
edge_id: &EdgeId,
) -> Result<()>;
async fn delete_node(
&self,
ctx: &ExecutionContext,
ns: &Namespace,
node_id: &NodeId,
) -> Result<usize>;
async fn prune_older_than(
&self,
_ctx: &ExecutionContext,
_ns: &Namespace,
_ttl: std::time::Duration,
) -> Result<usize> {
Ok(0)
}
}
#[derive(Clone, Debug)]
struct StoredEdge<E> {
id: EdgeId,
from: NodeId,
to: NodeId,
payload: E,
timestamp: DateTime<Utc>,
}
#[derive(Default)]
struct GraphTable<N, E> {
nodes: BTreeMap<NodeId, N>,
edges: BTreeMap<EdgeId, StoredEdge<E>>,
out_adj: BTreeMap<NodeId, Vec<EdgeId>>,
in_adj: BTreeMap<NodeId, Vec<EdgeId>>,
}
impl<N, E> GraphTable<N, E> {
const fn new() -> Self {
Self {
nodes: BTreeMap::new(),
edges: BTreeMap::new(),
out_adj: BTreeMap::new(),
in_adj: BTreeMap::new(),
}
}
}
type NamespaceTable<N, E> = Arc<RwLock<GraphTable<N, E>>>;
type ShardedNamespaceMap<N, E> = Arc<DashMap<String, NamespaceTable<N, E>>>;
pub struct InMemoryGraphMemory<N, E>
where
N: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
inner: ShardedNamespaceMap<N, E>,
}
impl<N, E> InMemoryGraphMemory<N, E>
where
N: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
#[must_use]
pub fn new() -> Self {
Self {
inner: Arc::new(DashMap::new()),
}
}
#[must_use]
pub fn total_nodes(&self) -> usize {
self.inner
.iter()
.map(|entry| entry.value().read().nodes.len())
.sum()
}
#[must_use]
pub fn total_edges(&self) -> usize {
self.inner
.iter()
.map(|entry| entry.value().read().edges.len())
.sum()
}
fn table_for(&self, key: &str) -> Option<NamespaceTable<N, E>> {
self.inner.get(key).map(|r| Arc::clone(r.value()))
}
fn table_for_write(&self, key: String) -> NamespaceTable<N, E> {
self.inner
.entry(key)
.or_insert_with(|| Arc::new(RwLock::new(GraphTable::new())))
.clone()
}
}
impl<N, E> Default for InMemoryGraphMemory<N, E>
where
N: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<N, E> Clone for InMemoryGraphMemory<N, E>
where
N: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
#[async_trait]
impl<N, E> GraphMemory<N, E> for InMemoryGraphMemory<N, E>
where
N: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
async fn add_node(&self, _ctx: &ExecutionContext, ns: &Namespace, node: N) -> Result<NodeId> {
let id = NodeId::new();
let table = self.table_for_write(ns.render());
table.write().nodes.insert(id.clone(), node);
Ok(id)
}
async fn add_edge(
&self,
_ctx: &ExecutionContext,
ns: &Namespace,
from: &NodeId,
to: &NodeId,
edge: E,
timestamp: DateTime<Utc>,
) -> Result<EdgeId> {
let id = EdgeId::new();
let table = self.table_for_write(ns.render());
let mut guard = table.write();
if !guard.nodes.contains_key(from) {
return Err(entelix_core::Error::invalid_request(format!(
"GraphMemory::add_edge: source node {from} does not exist"
)));
}
if !guard.nodes.contains_key(to) {
return Err(entelix_core::Error::invalid_request(format!(
"GraphMemory::add_edge: target node {to} does not exist"
)));
}
let stored = StoredEdge {
id: id.clone(),
from: from.clone(),
to: to.clone(),
payload: edge,
timestamp,
};
guard.edges.insert(id.clone(), stored);
guard
.out_adj
.entry(from.clone())
.or_default()
.push(id.clone());
guard.in_adj.entry(to.clone()).or_default().push(id.clone());
Ok(id)
}
async fn add_edges_batch(
&self,
_ctx: &ExecutionContext,
ns: &Namespace,
edges: Vec<(NodeId, NodeId, E, DateTime<Utc>)>,
) -> Result<Vec<EdgeId>> {
if edges.is_empty() {
return Ok(Vec::new());
}
let table = self.table_for_write(ns.render());
let mut guard = table.write();
for (from, to, _, _) in &edges {
if !guard.nodes.contains_key(from) {
return Err(entelix_core::Error::invalid_request(format!(
"GraphMemory::add_edges_batch: source node {from} does not exist"
)));
}
if !guard.nodes.contains_key(to) {
return Err(entelix_core::Error::invalid_request(format!(
"GraphMemory::add_edges_batch: target node {to} does not exist"
)));
}
}
let mut ids = Vec::with_capacity(edges.len());
for (from, to, payload, timestamp) in edges {
let id = EdgeId::new();
let stored = StoredEdge {
id: id.clone(),
from: from.clone(),
to: to.clone(),
payload,
timestamp,
};
guard.edges.insert(id.clone(), stored);
guard.out_adj.entry(from).or_default().push(id.clone());
guard.in_adj.entry(to).or_default().push(id.clone());
ids.push(id);
}
Ok(ids)
}
async fn get_node(
&self,
_ctx: &ExecutionContext,
ns: &Namespace,
id: &NodeId,
) -> Result<Option<N>> {
let Some(table) = self.table_for(&ns.render()) else {
return Ok(None);
};
Ok(table.read().nodes.get(id).cloned())
}
async fn get_edge(
&self,
_ctx: &ExecutionContext,
ns: &Namespace,
edge_id: &EdgeId,
) -> Result<Option<GraphHop<E>>> {
let Some(table) = self.table_for(&ns.render()) else {
return Ok(None);
};
Ok(table.read().edges.get(edge_id).map(|e| GraphHop {
edge_id: e.id.clone(),
from: e.from.clone(),
to: e.to.clone(),
edge: e.payload.clone(),
timestamp: e.timestamp,
}))
}
async fn neighbors(
&self,
_ctx: &ExecutionContext,
ns: &Namespace,
node: &NodeId,
direction: Direction,
) -> Result<Vec<(EdgeId, NodeId, E)>> {
let Some(table) = self.table_for(&ns.render()) else {
return Ok(Vec::new());
};
let guard = table.read();
let mut out = Vec::new();
let mut collect = |edge_ids: &[EdgeId], pick_far: fn(&StoredEdge<E>) -> &NodeId| {
for eid in edge_ids {
if let Some(stored) = guard.edges.get(eid) {
out.push((
eid.clone(),
pick_far(stored).clone(),
stored.payload.clone(),
));
}
}
};
if matches!(direction, Direction::Outgoing | Direction::Both)
&& let Some(ids) = guard.out_adj.get(node)
{
collect(ids, |s| &s.to);
}
if matches!(direction, Direction::Incoming | Direction::Both)
&& let Some(ids) = guard.in_adj.get(node)
{
collect(ids, |s| &s.from);
}
Ok(out)
}
async fn traverse(
&self,
ctx: &ExecutionContext,
ns: &Namespace,
start: &NodeId,
direction: Direction,
max_depth: usize,
) -> Result<Vec<GraphHop<E>>> {
if max_depth == 0 {
return Ok(Vec::new());
}
let Some(table) = self.table_for(&ns.render()) else {
return Ok(Vec::new());
};
let guard = table.read();
let mut visited: HashSet<NodeId> = HashSet::new();
visited.insert(start.clone());
let mut frontier: VecDeque<(NodeId, usize)> = VecDeque::new();
frontier.push_back((start.clone(), 0));
let mut out = Vec::new();
while let Some((current, depth)) = frontier.pop_front() {
if ctx.is_cancelled() {
return Err(Error::Cancelled);
}
if depth >= max_depth {
continue;
}
for stored in directional_edges(&guard, ¤t, direction) {
let neighbour = stored
.other_endpoint_of(¤t)
.cloned()
.unwrap_or_else(|| stored.to.clone());
if visited.insert(neighbour.clone()) {
out.push(GraphHop {
edge_id: stored.id.clone(),
from: stored.from.clone(),
to: stored.to.clone(),
edge: stored.payload.clone(),
timestamp: stored.timestamp,
});
frontier.push_back((neighbour, depth + 1));
}
}
}
Ok(out)
}
async fn find_path(
&self,
ctx: &ExecutionContext,
ns: &Namespace,
from: &NodeId,
to: &NodeId,
direction: Direction,
max_depth: usize,
) -> Result<Option<Vec<GraphHop<E>>>> {
if from == to {
return Ok(Some(Vec::new()));
}
if max_depth == 0 {
return Ok(None);
}
let Some(table) = self.table_for(&ns.render()) else {
return Ok(None);
};
let guard = table.read();
let mut parents: BTreeMap<NodeId, (EdgeId, NodeId)> = BTreeMap::new();
let mut depths: BTreeMap<NodeId, usize> = BTreeMap::new();
depths.insert(from.clone(), 0);
let mut frontier: VecDeque<NodeId> = VecDeque::new();
frontier.push_back(from.clone());
while let Some(current) = frontier.pop_front() {
if ctx.is_cancelled() {
return Err(Error::Cancelled);
}
let depth = *depths.get(¤t).unwrap_or(&0);
if depth >= max_depth {
continue;
}
for stored in directional_edges(&guard, ¤t, direction) {
let neighbour = stored
.other_endpoint_of(¤t)
.cloned()
.unwrap_or_else(|| stored.to.clone());
if depths.contains_key(&neighbour) {
continue;
}
depths.insert(neighbour.clone(), depth + 1);
parents.insert(neighbour.clone(), (stored.id.clone(), current.clone()));
if &neighbour == to {
let mut hops: Vec<GraphHop<E>> = Vec::new();
let mut cursor = to.clone();
while let Some((eid, prev)) = parents.get(&cursor).cloned() {
if let Some(stored) = guard.edges.get(&eid) {
hops.push(GraphHop {
edge_id: stored.id.clone(),
from: stored.from.clone(),
to: stored.to.clone(),
edge: stored.payload.clone(),
timestamp: stored.timestamp,
});
}
cursor = prev;
}
hops.reverse();
return Ok(Some(hops));
}
frontier.push_back(neighbour);
}
}
Ok(None)
}
async fn temporal_filter(
&self,
_ctx: &ExecutionContext,
ns: &Namespace,
from: DateTime<Utc>,
to: DateTime<Utc>,
) -> Result<Vec<GraphHop<E>>> {
let Some(table) = self.table_for(&ns.render()) else {
return Ok(Vec::new());
};
let guard = table.read();
let mut out: Vec<GraphHop<E>> = guard
.edges
.values()
.filter(|e| e.timestamp >= from && e.timestamp < to)
.map(|e| GraphHop {
edge_id: e.id.clone(),
from: e.from.clone(),
to: e.to.clone(),
edge: e.payload.clone(),
timestamp: e.timestamp,
})
.collect();
out.sort_by_key(|hop| hop.timestamp);
Ok(out)
}
async fn node_count(&self, _ctx: &ExecutionContext, ns: &Namespace) -> Result<usize> {
let Some(table) = self.table_for(&ns.render()) else {
return Ok(0);
};
Ok(table.read().nodes.len())
}
async fn edge_count(&self, _ctx: &ExecutionContext, ns: &Namespace) -> Result<usize> {
let Some(table) = self.table_for(&ns.render()) else {
return Ok(0);
};
Ok(table.read().edges.len())
}
async fn delete_edge(
&self,
_ctx: &ExecutionContext,
ns: &Namespace,
edge_id: &EdgeId,
) -> Result<()> {
let Some(table) = self.table_for(&ns.render()) else {
return Ok(());
};
let mut guard = table.write();
if let Some(edge) = guard.edges.remove(edge_id) {
if let Some(out_list) = guard.out_adj.get_mut(&edge.from) {
out_list.retain(|e| e != &edge.id);
}
if let Some(in_list) = guard.in_adj.get_mut(&edge.to) {
in_list.retain(|e| e != &edge.id);
}
}
Ok(())
}
async fn delete_node(
&self,
_ctx: &ExecutionContext,
ns: &Namespace,
node_id: &NodeId,
) -> Result<usize> {
let Some(table) = self.table_for(&ns.render()) else {
return Ok(0);
};
let mut guard = table.write();
let mut incident: HashSet<EdgeId> = HashSet::new();
if let Some(out_list) = guard.out_adj.get(node_id) {
for id in out_list {
incident.insert(id.clone());
}
}
if let Some(in_list) = guard.in_adj.get(node_id) {
for id in in_list {
incident.insert(id.clone());
}
}
let removed = incident.len();
for edge_id in incident {
if let Some(edge) = guard.edges.remove(&edge_id) {
if let Some(out_list) = guard.out_adj.get_mut(&edge.from) {
out_list.retain(|e| e != &edge.id);
}
if let Some(in_list) = guard.in_adj.get_mut(&edge.to) {
in_list.retain(|e| e != &edge.id);
}
}
}
guard.nodes.remove(node_id);
guard.out_adj.remove(node_id);
guard.in_adj.remove(node_id);
Ok(removed)
}
async fn prune_older_than(
&self,
_ctx: &ExecutionContext,
ns: &Namespace,
ttl: std::time::Duration,
) -> Result<usize> {
let Some(table) = self.table_for(&ns.render()) else {
return Ok(0);
};
let cutoff = Utc::now() - chrono::Duration::from_std(ttl).unwrap_or(chrono::Duration::MAX);
let mut guard = table.write();
let stale: Vec<EdgeId> = guard
.edges
.iter()
.filter(|(_, e)| e.timestamp < cutoff)
.map(|(id, _)| id.clone())
.collect();
let removed = stale.len();
for id in stale {
if let Some(edge) = guard.edges.remove(&id) {
if let Some(out_list) = guard.out_adj.get_mut(&edge.from) {
out_list.retain(|e| e != &edge.id);
}
if let Some(in_list) = guard.in_adj.get_mut(&edge.to) {
in_list.retain(|e| e != &edge.id);
}
}
}
Ok(removed)
}
}
impl<E> StoredEdge<E> {
fn other_endpoint_of(&self, node: &NodeId) -> Option<&NodeId> {
if &self.from == node {
Some(&self.to)
} else if &self.to == node {
Some(&self.from)
} else {
None
}
}
}
fn directional_edges<'a, N, E>(
table: &'a GraphTable<N, E>,
node: &NodeId,
direction: Direction,
) -> Vec<&'a StoredEdge<E>> {
let mut out: Vec<&'a StoredEdge<E>> = Vec::new();
if matches!(direction, Direction::Outgoing | Direction::Both)
&& let Some(ids) = table.out_adj.get(node)
{
for eid in ids {
if let Some(stored) = table.edges.get(eid) {
out.push(stored);
}
}
}
if matches!(direction, Direction::Incoming | Direction::Both)
&& let Some(ids) = table.in_adj.get(node)
{
for eid in ids {
if let Some(stored) = table.edges.get(eid) {
out.push(stored);
}
}
}
out
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::indexing_slicing,
clippy::many_single_char_names
)]
mod tests {
use super::*;
use entelix_core::TenantId;
fn ns() -> Namespace {
Namespace::new(TenantId::new("tenant")).with_scope("graph")
}
#[tokio::test]
async fn add_and_lookup_node() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let id = g.add_node(&ctx, &ns(), "alice").await.unwrap();
let fetched = g.get_node(&ctx, &ns(), &id).await.unwrap();
assert_eq!(fetched, Some("alice"));
}
#[tokio::test]
async fn add_edges_batch_inserts_all_atomically() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let alice = g.add_node(&ctx, &ns(), "alice").await.unwrap();
let bob = g.add_node(&ctx, &ns(), "bob").await.unwrap();
let carol = g.add_node(&ctx, &ns(), "carol").await.unwrap();
let now = Utc::now();
let ids = g
.add_edges_batch(
&ctx,
&ns(),
vec![
(alice.clone(), bob.clone(), "knows", now),
(bob.clone(), carol.clone(), "knows", now),
(alice.clone(), carol.clone(), "knows", now),
],
)
.await
.unwrap();
assert_eq!(ids.len(), 3, "returns one EdgeId per input");
for id in &ids {
let hop = g.get_edge(&ctx, &ns(), id).await.unwrap();
assert!(hop.is_some(), "edge {id} must be retrievable");
}
}
#[tokio::test]
async fn add_edges_batch_rejects_unknown_endpoint_without_partial_writes() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let alice = g.add_node(&ctx, &ns(), "alice").await.unwrap();
let bob = g.add_node(&ctx, &ns(), "bob").await.unwrap();
let ghost = NodeId::new();
let now = Utc::now();
let err = g
.add_edges_batch(
&ctx,
&ns(),
vec![
(alice.clone(), bob.clone(), "knows", now),
(alice.clone(), ghost, "knows", now), ],
)
.await;
assert!(err.is_err(), "batch with unknown endpoint must fail");
assert_eq!(g.edge_count(&ctx, &ns()).await.unwrap(), 0);
}
#[tokio::test]
async fn add_edges_batch_empty_input_is_a_noop() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let ids = g.add_edges_batch(&ctx, &ns(), Vec::new()).await.unwrap();
assert!(ids.is_empty());
}
#[tokio::test]
async fn add_edge_requires_existing_endpoints() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let alice = g.add_node(&ctx, &ns(), "alice").await.unwrap();
let ghost = NodeId::new();
let err = g
.add_edge(&ctx, &ns(), &alice, &ghost, "knows", Utc::now())
.await;
assert!(err.is_err());
}
#[tokio::test]
async fn neighbors_split_by_direction() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let alice = g.add_node(&ctx, &ns(), "alice").await.unwrap();
let bob = g.add_node(&ctx, &ns(), "bob").await.unwrap();
let _eid = g
.add_edge(&ctx, &ns(), &alice, &bob, "knows", Utc::now())
.await
.unwrap();
let outgoing = g
.neighbors(&ctx, &ns(), &alice, Direction::Outgoing)
.await
.unwrap();
assert_eq!(outgoing.len(), 1);
let incoming = g
.neighbors(&ctx, &ns(), &alice, Direction::Incoming)
.await
.unwrap();
assert!(incoming.is_empty());
}
#[tokio::test]
async fn traverse_respects_max_depth() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
let d = g.add_node(&ctx, &ns(), "d").await.unwrap();
let now = Utc::now();
g.add_edge(&ctx, &ns(), &a, &b, "->", now).await.unwrap();
g.add_edge(&ctx, &ns(), &b, &c, "->", now).await.unwrap();
g.add_edge(&ctx, &ns(), &c, &d, "->", now).await.unwrap();
let two = g
.traverse(&ctx, &ns(), &a, Direction::Outgoing, 2)
.await
.unwrap();
assert_eq!(two.len(), 2);
let three = g
.traverse(&ctx, &ns(), &a, Direction::Outgoing, 3)
.await
.unwrap();
assert_eq!(three.len(), 3);
}
#[tokio::test]
async fn traverse_with_direction_both_walks_inverse_edges() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
let now = Utc::now();
g.add_edge(&ctx, &ns(), &a, &b, "->", now).await.unwrap();
g.add_edge(&ctx, &ns(), &c, &b, "->", now).await.unwrap();
let from_b = g
.traverse(&ctx, &ns(), &b, Direction::Both, 1)
.await
.unwrap();
assert_eq!(from_b.len(), 2);
}
#[tokio::test]
async fn find_path_returns_shortest() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
let now = Utc::now();
g.add_edge(&ctx, &ns(), &a, &b, "ab", now).await.unwrap();
g.add_edge(&ctx, &ns(), &b, &c, "bc", now).await.unwrap();
let path = g
.find_path(&ctx, &ns(), &a, &c, Direction::Outgoing, 5)
.await
.unwrap();
let hops = path.unwrap();
assert_eq!(hops.len(), 2);
assert_eq!(hops[0].from, a);
assert_eq!(hops[1].to, c);
}
#[tokio::test]
async fn temporal_filter_picks_window() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
let early = Utc::now() - chrono::Duration::hours(2);
let late = Utc::now();
g.add_edge(&ctx, &ns(), &a, &b, "early", early)
.await
.unwrap();
g.add_edge(&ctx, &ns(), &a, &b, "late", late).await.unwrap();
let window = g
.temporal_filter(
&ctx,
&ns(),
Utc::now() - chrono::Duration::hours(1),
Utc::now() + chrono::Duration::hours(1),
)
.await
.unwrap();
assert_eq!(window.len(), 1);
assert_eq!(window[0].edge, "late");
}
#[tokio::test]
async fn node_count_and_edge_count_track_inserts() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
assert_eq!(g.node_count(&ctx, &ns()).await.unwrap(), 0);
assert_eq!(g.edge_count(&ctx, &ns()).await.unwrap(), 0);
let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
assert_eq!(g.node_count(&ctx, &ns()).await.unwrap(), 2);
assert_eq!(g.edge_count(&ctx, &ns()).await.unwrap(), 0);
let _ = g
.add_edge(&ctx, &ns(), &a, &b, "ab", Utc::now())
.await
.unwrap();
assert_eq!(g.edge_count(&ctx, &ns()).await.unwrap(), 1);
}
#[tokio::test]
async fn count_methods_respect_namespace_isolation() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let alpha = Namespace::new(TenantId::new("tenant")).with_scope("alpha");
let beta = Namespace::new(TenantId::new("tenant")).with_scope("beta");
let _ = g.add_node(&ctx, &alpha, "n").await.unwrap();
assert_eq!(g.node_count(&ctx, &alpha).await.unwrap(), 1);
assert_eq!(g.node_count(&ctx, &beta).await.unwrap(), 0);
}
#[tokio::test]
async fn delete_edge_is_idempotent_and_dedups_adjacency() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
let now = Utc::now();
let id = g.add_edge(&ctx, &ns(), &a, &b, "ab", now).await.unwrap();
g.delete_edge(&ctx, &ns(), &id).await.unwrap();
g.delete_edge(&ctx, &ns(), &id).await.unwrap();
let outgoing = g
.neighbors(&ctx, &ns(), &a, Direction::Outgoing)
.await
.unwrap();
assert!(outgoing.is_empty());
let incoming = g
.neighbors(&ctx, &ns(), &b, Direction::Incoming)
.await
.unwrap();
assert!(incoming.is_empty());
}
#[tokio::test]
async fn delete_node_cascades_to_incident_edges() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
let now = Utc::now();
let _ = g.add_edge(&ctx, &ns(), &a, &b, "ab", now).await.unwrap();
let _ = g.add_edge(&ctx, &ns(), &a, &c, "ac", now).await.unwrap();
let _ = g.add_edge(&ctx, &ns(), &b, &a, "ba", now).await.unwrap();
let removed = g.delete_node(&ctx, &ns(), &a).await.unwrap();
assert_eq!(removed, 3);
assert!(g.get_node(&ctx, &ns(), &a).await.unwrap().is_none());
assert!(g.get_node(&ctx, &ns(), &b).await.unwrap().is_some());
assert!(g.get_node(&ctx, &ns(), &c).await.unwrap().is_some());
let b_in = g
.neighbors(&ctx, &ns(), &b, Direction::Incoming)
.await
.unwrap();
assert!(b_in.is_empty());
}
#[tokio::test]
async fn delete_node_with_self_loop_dedups_count() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
let _ = g
.add_edge(&ctx, &ns(), &a, &a, "self", Utc::now())
.await
.unwrap();
assert_eq!(g.delete_node(&ctx, &ns(), &a).await.unwrap(), 1);
}
#[tokio::test]
async fn delete_node_on_absent_node_is_zero_noop() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let phantom = NodeId::from_string("does-not-exist");
assert_eq!(g.delete_node(&ctx, &ns(), &phantom).await.unwrap(), 0);
}
#[tokio::test]
async fn prune_older_than_drops_stale_edges_only() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
let now = Utc::now();
let _old = g
.add_edge(
&ctx,
&ns(),
&a,
&b,
"old",
now - chrono::Duration::seconds(120),
)
.await
.unwrap();
let _fresh = g
.add_edge(
&ctx,
&ns(),
&a,
&b,
"fresh",
now - chrono::Duration::seconds(5),
)
.await
.unwrap();
let removed = g
.prune_older_than(&ctx, &ns(), std::time::Duration::from_mins(1))
.await
.unwrap();
assert_eq!(removed, 1);
assert!(g.get_node(&ctx, &ns(), &a).await.unwrap().is_some());
assert!(g.get_node(&ctx, &ns(), &b).await.unwrap().is_some());
let outgoing = g
.neighbors(&ctx, &ns(), &a, Direction::Outgoing)
.await
.unwrap();
assert_eq!(outgoing.len(), 1);
assert_eq!(outgoing[0].2, "fresh");
}
#[tokio::test]
async fn prune_older_than_on_empty_namespace_is_noop() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let removed = g
.prune_older_than(&ctx, &ns(), std::time::Duration::from_secs(0))
.await
.unwrap();
assert_eq!(removed, 0);
}
#[tokio::test]
async fn namespaces_are_isolated() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let alpha = Namespace::new(TenantId::new("tenant")).with_scope("alpha");
let beta = Namespace::new(TenantId::new("tenant")).with_scope("beta");
let _ = g.add_node(&ctx, &alpha, "a-node").await.unwrap();
let _ = g.add_node(&ctx, &beta, "b-node").await.unwrap();
assert_eq!(g.total_nodes(), 2);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn distinct_namespaces_write_concurrently() {
let g: InMemoryGraphMemory<String, String> = InMemoryGraphMemory::new();
let mut handles = Vec::new();
for tenant in 0..8 {
let g = g.clone();
handles.push(tokio::spawn(async move {
let ctx = ExecutionContext::new();
let ns = Namespace::new(TenantId::new(format!("tenant-{tenant}")));
let mut ids = Vec::new();
for i in 0..50 {
let id = g
.add_node(&ctx, &ns, format!("t{tenant}-n{i}"))
.await
.unwrap();
ids.push(id);
}
let now = Utc::now();
for window in ids.windows(2) {
g.add_edge(
&ctx,
&ns,
&window[0],
&window[1],
format!("t{tenant}-edge"),
now,
)
.await
.unwrap();
}
}));
}
for h in handles {
h.await.unwrap();
}
assert_eq!(g.total_nodes(), 8 * 50);
assert_eq!(g.total_edges(), 8 * 49);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn read_during_write_on_other_namespace_does_not_block() {
let g: InMemoryGraphMemory<String, String> = InMemoryGraphMemory::new();
let alpha = Namespace::new(TenantId::new("alpha"));
let beta = Namespace::new(TenantId::new("beta"));
let ctx = ExecutionContext::new();
let beta_node_id = g
.add_node(&ctx, &beta, "beta-fixture".to_owned())
.await
.unwrap();
let g_writer = g.clone();
let alpha_writer = alpha.clone();
let writer = tokio::spawn(async move {
let ctx = ExecutionContext::new();
for i in 0..200 {
g_writer
.add_node(&ctx, &alpha_writer, format!("alpha-{i}"))
.await
.unwrap();
}
});
let mut reads = Vec::new();
for _ in 0..200 {
let g_reader = g.clone();
let beta_reader = beta.clone();
let id_reader = beta_node_id.clone();
reads.push(tokio::spawn(async move {
let ctx = ExecutionContext::new();
g_reader
.get_node(&ctx, &beta_reader, &id_reader)
.await
.unwrap()
}));
}
for r in reads {
assert_eq!(r.await.unwrap().as_deref(), Some("beta-fixture"));
}
writer.await.unwrap();
assert_eq!(g.total_nodes(), 1 + 200);
}
#[tokio::test]
async fn traverse_short_circuits_on_cancellation() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
let now = Utc::now();
g.add_edge(&ctx, &ns(), &a, &b, "knows", now).await.unwrap();
g.add_edge(&ctx, &ns(), &b, &c, "knows", now).await.unwrap();
ctx.cancellation().cancel();
let err = g
.traverse(&ctx, &ns(), &a, Direction::Outgoing, 5)
.await
.unwrap_err();
assert!(matches!(err, Error::Cancelled), "got {err:?}");
}
#[tokio::test]
async fn find_path_short_circuits_on_cancellation() {
let g = InMemoryGraphMemory::<&str, &str>::new();
let ctx = ExecutionContext::new();
let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
let now = Utc::now();
g.add_edge(&ctx, &ns(), &a, &b, "e", now).await.unwrap();
g.add_edge(&ctx, &ns(), &b, &c, "e", now).await.unwrap();
ctx.cancellation().cancel();
let err = g
.find_path(&ctx, &ns(), &a, &c, Direction::Outgoing, 5)
.await
.unwrap_err();
assert!(matches!(err, Error::Cancelled), "got {err:?}");
}
}