1use std::collections::{BTreeMap, HashSet, VecDeque};
33use std::sync::Arc;
34
35use async_trait::async_trait;
36use chrono::{DateTime, Utc};
37use dashmap::DashMap;
38use entelix_core::{Error, ExecutionContext, Result};
39use parking_lot::RwLock;
40use serde::{Deserialize, Serialize};
41use uuid::Uuid;
42
43use crate::namespace::Namespace;
44
45#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
47pub struct NodeId(String);
48
49impl NodeId {
50 #[must_use]
52 pub fn new() -> Self {
53 Self(Uuid::now_v7().to_string())
54 }
55
56 #[must_use]
60 pub fn from_string(s: impl Into<String>) -> Self {
61 Self(s.into())
62 }
63
64 #[must_use]
66 pub fn as_str(&self) -> &str {
67 &self.0
68 }
69}
70
71impl Default for NodeId {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77impl std::fmt::Display for NodeId {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 f.write_str(&self.0)
80 }
81}
82
83#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
85pub struct EdgeId(String);
86
87impl EdgeId {
88 #[must_use]
90 pub fn new() -> Self {
91 Self(Uuid::now_v7().to_string())
92 }
93
94 #[must_use]
96 pub fn from_string(s: impl Into<String>) -> Self {
97 Self(s.into())
98 }
99
100 #[must_use]
102 pub fn as_str(&self) -> &str {
103 &self.0
104 }
105}
106
107impl Default for EdgeId {
108 fn default() -> Self {
109 Self::new()
110 }
111}
112
113impl std::fmt::Display for EdgeId {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 f.write_str(&self.0)
116 }
117}
118
119#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
122#[non_exhaustive]
123pub enum Direction {
124 Outgoing,
126 Incoming,
128 Both,
130}
131
132#[derive(Clone, Debug)]
135#[non_exhaustive]
136pub struct GraphHop<E> {
137 pub edge_id: EdgeId,
139 pub from: NodeId,
141 pub to: NodeId,
143 pub edge: E,
145 pub timestamp: DateTime<Utc>,
149}
150
151impl<E> GraphHop<E> {
152 #[must_use]
157 pub const fn new(
158 edge_id: EdgeId,
159 from: NodeId,
160 to: NodeId,
161 edge: E,
162 timestamp: DateTime<Utc>,
163 ) -> Self {
164 Self {
165 edge_id,
166 from,
167 to,
168 edge,
169 timestamp,
170 }
171 }
172}
173
174#[async_trait]
179pub trait GraphMemory<N, E>: Send + Sync + 'static
180where
181 N: Clone + Send + Sync + 'static,
182 E: Clone + Send + Sync + 'static,
183{
184 async fn add_node(&self, ctx: &ExecutionContext, ns: &Namespace, node: N) -> Result<NodeId>;
186
187 async fn add_edge(
191 &self,
192 ctx: &ExecutionContext,
193 ns: &Namespace,
194 from: &NodeId,
195 to: &NodeId,
196 edge: E,
197 timestamp: DateTime<Utc>,
198 ) -> Result<EdgeId>;
199
200 async fn add_edges_batch(
212 &self,
213 ctx: &ExecutionContext,
214 ns: &Namespace,
215 edges: Vec<(NodeId, NodeId, E, DateTime<Utc>)>,
216 ) -> Result<Vec<EdgeId>> {
217 let mut ids = Vec::with_capacity(edges.len());
218 for (from, to, edge, timestamp) in edges {
219 ids.push(self.add_edge(ctx, ns, &from, &to, edge, timestamp).await?);
220 }
221 Ok(ids)
222 }
223
224 async fn get_node(
228 &self,
229 ctx: &ExecutionContext,
230 ns: &Namespace,
231 id: &NodeId,
232 ) -> Result<Option<N>>;
233
234 async fn get_edge(
249 &self,
250 _ctx: &ExecutionContext,
251 _ns: &Namespace,
252 _edge_id: &EdgeId,
253 ) -> Result<Option<GraphHop<E>>> {
254 Ok(None)
255 }
256
257 async fn neighbors(
260 &self,
261 ctx: &ExecutionContext,
262 ns: &Namespace,
263 node: &NodeId,
264 direction: Direction,
265 ) -> Result<Vec<(EdgeId, NodeId, E)>>;
266
267 async fn traverse(
275 &self,
276 ctx: &ExecutionContext,
277 ns: &Namespace,
278 start: &NodeId,
279 direction: Direction,
280 max_depth: usize,
281 ) -> Result<Vec<GraphHop<E>>>;
282
283 async fn find_path(
289 &self,
290 ctx: &ExecutionContext,
291 ns: &Namespace,
292 from: &NodeId,
293 to: &NodeId,
294 direction: Direction,
295 max_depth: usize,
296 ) -> Result<Option<Vec<GraphHop<E>>>>;
297
298 async fn temporal_filter(
302 &self,
303 ctx: &ExecutionContext,
304 ns: &Namespace,
305 from: DateTime<Utc>,
306 to: DateTime<Utc>,
307 ) -> Result<Vec<GraphHop<E>>>;
308
309 async fn node_count(&self, _ctx: &ExecutionContext, _ns: &Namespace) -> Result<usize> {
314 Ok(0)
315 }
316
317 async fn edge_count(&self, _ctx: &ExecutionContext, _ns: &Namespace) -> Result<usize> {
321 Ok(0)
322 }
323
324 async fn delete_edge(
329 &self,
330 ctx: &ExecutionContext,
331 ns: &Namespace,
332 edge_id: &EdgeId,
333 ) -> Result<()>;
334
335 async fn delete_node(
350 &self,
351 ctx: &ExecutionContext,
352 ns: &Namespace,
353 node_id: &NodeId,
354 ) -> Result<usize>;
355
356 async fn prune_older_than(
374 &self,
375 _ctx: &ExecutionContext,
376 _ns: &Namespace,
377 _ttl: std::time::Duration,
378 ) -> Result<usize> {
379 Ok(0)
380 }
381}
382
383#[derive(Clone, Debug)]
388struct StoredEdge<E> {
389 id: EdgeId,
390 from: NodeId,
391 to: NodeId,
392 payload: E,
393 timestamp: DateTime<Utc>,
394}
395
396#[derive(Default)]
398struct GraphTable<N, E> {
399 nodes: BTreeMap<NodeId, N>,
400 edges: BTreeMap<EdgeId, StoredEdge<E>>,
401 out_adj: BTreeMap<NodeId, Vec<EdgeId>>,
402 in_adj: BTreeMap<NodeId, Vec<EdgeId>>,
403}
404
405impl<N, E> GraphTable<N, E> {
406 const fn new() -> Self {
407 Self {
408 nodes: BTreeMap::new(),
409 edges: BTreeMap::new(),
410 out_adj: BTreeMap::new(),
411 in_adj: BTreeMap::new(),
412 }
413 }
414}
415
416type NamespaceTable<N, E> = Arc<RwLock<GraphTable<N, E>>>;
420
421type ShardedNamespaceMap<N, E> = Arc<DashMap<String, NamespaceTable<N, E>>>;
425
426pub struct InMemoryGraphMemory<N, E>
444where
445 N: Clone + Send + Sync + 'static,
446 E: Clone + Send + Sync + 'static,
447{
448 inner: ShardedNamespaceMap<N, E>,
449}
450
451impl<N, E> InMemoryGraphMemory<N, E>
452where
453 N: Clone + Send + Sync + 'static,
454 E: Clone + Send + Sync + 'static,
455{
456 #[must_use]
458 pub fn new() -> Self {
459 Self {
460 inner: Arc::new(DashMap::new()),
461 }
462 }
463
464 #[must_use]
468 pub fn total_nodes(&self) -> usize {
469 self.inner
470 .iter()
471 .map(|entry| entry.value().read().nodes.len())
472 .sum()
473 }
474
475 #[must_use]
477 pub fn total_edges(&self) -> usize {
478 self.inner
479 .iter()
480 .map(|entry| entry.value().read().edges.len())
481 .sum()
482 }
483
484 fn table_for(&self, key: &str) -> Option<NamespaceTable<N, E>> {
487 self.inner.get(key).map(|r| Arc::clone(r.value()))
488 }
489
490 fn table_for_write(&self, key: String) -> NamespaceTable<N, E> {
494 self.inner
495 .entry(key)
496 .or_insert_with(|| Arc::new(RwLock::new(GraphTable::new())))
497 .clone()
498 }
499}
500
501impl<N, E> Default for InMemoryGraphMemory<N, E>
502where
503 N: Clone + Send + Sync + 'static,
504 E: Clone + Send + Sync + 'static,
505{
506 fn default() -> Self {
507 Self::new()
508 }
509}
510
511impl<N, E> Clone for InMemoryGraphMemory<N, E>
512where
513 N: Clone + Send + Sync + 'static,
514 E: Clone + Send + Sync + 'static,
515{
516 fn clone(&self) -> Self {
517 Self {
518 inner: Arc::clone(&self.inner),
519 }
520 }
521}
522
523#[async_trait]
524impl<N, E> GraphMemory<N, E> for InMemoryGraphMemory<N, E>
525where
526 N: Clone + Send + Sync + 'static,
527 E: Clone + Send + Sync + 'static,
528{
529 async fn add_node(&self, _ctx: &ExecutionContext, ns: &Namespace, node: N) -> Result<NodeId> {
530 let id = NodeId::new();
531 let table = self.table_for_write(ns.render());
532 table.write().nodes.insert(id.clone(), node);
533 Ok(id)
534 }
535
536 async fn add_edge(
537 &self,
538 _ctx: &ExecutionContext,
539 ns: &Namespace,
540 from: &NodeId,
541 to: &NodeId,
542 edge: E,
543 timestamp: DateTime<Utc>,
544 ) -> Result<EdgeId> {
545 let id = EdgeId::new();
546 let table = self.table_for_write(ns.render());
547 let mut guard = table.write();
548 if !guard.nodes.contains_key(from) {
549 return Err(entelix_core::Error::invalid_request(format!(
550 "GraphMemory::add_edge: source node {from} does not exist"
551 )));
552 }
553 if !guard.nodes.contains_key(to) {
554 return Err(entelix_core::Error::invalid_request(format!(
555 "GraphMemory::add_edge: target node {to} does not exist"
556 )));
557 }
558 let stored = StoredEdge {
559 id: id.clone(),
560 from: from.clone(),
561 to: to.clone(),
562 payload: edge,
563 timestamp,
564 };
565 guard.edges.insert(id.clone(), stored);
566 guard
567 .out_adj
568 .entry(from.clone())
569 .or_default()
570 .push(id.clone());
571 guard.in_adj.entry(to.clone()).or_default().push(id.clone());
572 Ok(id)
573 }
574
575 async fn add_edges_batch(
576 &self,
577 _ctx: &ExecutionContext,
578 ns: &Namespace,
579 edges: Vec<(NodeId, NodeId, E, DateTime<Utc>)>,
580 ) -> Result<Vec<EdgeId>> {
581 if edges.is_empty() {
582 return Ok(Vec::new());
583 }
584 let table = self.table_for_write(ns.render());
585 let mut guard = table.write();
586 for (from, to, _, _) in &edges {
590 if !guard.nodes.contains_key(from) {
591 return Err(entelix_core::Error::invalid_request(format!(
592 "GraphMemory::add_edges_batch: source node {from} does not exist"
593 )));
594 }
595 if !guard.nodes.contains_key(to) {
596 return Err(entelix_core::Error::invalid_request(format!(
597 "GraphMemory::add_edges_batch: target node {to} does not exist"
598 )));
599 }
600 }
601 let mut ids = Vec::with_capacity(edges.len());
603 for (from, to, payload, timestamp) in edges {
604 let id = EdgeId::new();
605 let stored = StoredEdge {
606 id: id.clone(),
607 from: from.clone(),
608 to: to.clone(),
609 payload,
610 timestamp,
611 };
612 guard.edges.insert(id.clone(), stored);
613 guard.out_adj.entry(from).or_default().push(id.clone());
614 guard.in_adj.entry(to).or_default().push(id.clone());
615 ids.push(id);
616 }
617 Ok(ids)
618 }
619
620 async fn get_node(
621 &self,
622 _ctx: &ExecutionContext,
623 ns: &Namespace,
624 id: &NodeId,
625 ) -> Result<Option<N>> {
626 let Some(table) = self.table_for(&ns.render()) else {
627 return Ok(None);
628 };
629 Ok(table.read().nodes.get(id).cloned())
630 }
631
632 async fn get_edge(
633 &self,
634 _ctx: &ExecutionContext,
635 ns: &Namespace,
636 edge_id: &EdgeId,
637 ) -> Result<Option<GraphHop<E>>> {
638 let Some(table) = self.table_for(&ns.render()) else {
639 return Ok(None);
640 };
641 Ok(table.read().edges.get(edge_id).map(|e| GraphHop {
642 edge_id: e.id.clone(),
643 from: e.from.clone(),
644 to: e.to.clone(),
645 edge: e.payload.clone(),
646 timestamp: e.timestamp,
647 }))
648 }
649
650 async fn neighbors(
651 &self,
652 _ctx: &ExecutionContext,
653 ns: &Namespace,
654 node: &NodeId,
655 direction: Direction,
656 ) -> Result<Vec<(EdgeId, NodeId, E)>> {
657 let Some(table) = self.table_for(&ns.render()) else {
658 return Ok(Vec::new());
659 };
660 let guard = table.read();
661 let mut out = Vec::new();
662 let mut collect = |edge_ids: &[EdgeId], pick_far: fn(&StoredEdge<E>) -> &NodeId| {
663 for eid in edge_ids {
664 if let Some(stored) = guard.edges.get(eid) {
665 out.push((
666 eid.clone(),
667 pick_far(stored).clone(),
668 stored.payload.clone(),
669 ));
670 }
671 }
672 };
673 if matches!(direction, Direction::Outgoing | Direction::Both)
674 && let Some(ids) = guard.out_adj.get(node)
675 {
676 collect(ids, |s| &s.to);
677 }
678 if matches!(direction, Direction::Incoming | Direction::Both)
679 && let Some(ids) = guard.in_adj.get(node)
680 {
681 collect(ids, |s| &s.from);
682 }
683 Ok(out)
684 }
685
686 async fn traverse(
687 &self,
688 ctx: &ExecutionContext,
689 ns: &Namespace,
690 start: &NodeId,
691 direction: Direction,
692 max_depth: usize,
693 ) -> Result<Vec<GraphHop<E>>> {
694 if max_depth == 0 {
695 return Ok(Vec::new());
696 }
697 let Some(table) = self.table_for(&ns.render()) else {
698 return Ok(Vec::new());
699 };
700 let guard = table.read();
701 let mut visited: HashSet<NodeId> = HashSet::new();
702 visited.insert(start.clone());
703 let mut frontier: VecDeque<(NodeId, usize)> = VecDeque::new();
704 frontier.push_back((start.clone(), 0));
705 let mut out = Vec::new();
706 while let Some((current, depth)) = frontier.pop_front() {
707 if ctx.is_cancelled() {
708 return Err(Error::Cancelled);
709 }
710 if depth >= max_depth {
711 continue;
712 }
713 for stored in directional_edges(&guard, ¤t, direction) {
714 let neighbour = stored
715 .other_endpoint_of(¤t)
716 .cloned()
717 .unwrap_or_else(|| stored.to.clone());
718 if visited.insert(neighbour.clone()) {
719 out.push(GraphHop {
720 edge_id: stored.id.clone(),
721 from: stored.from.clone(),
722 to: stored.to.clone(),
723 edge: stored.payload.clone(),
724 timestamp: stored.timestamp,
725 });
726 frontier.push_back((neighbour, depth + 1));
727 }
728 }
729 }
730 Ok(out)
731 }
732
733 async fn find_path(
734 &self,
735 ctx: &ExecutionContext,
736 ns: &Namespace,
737 from: &NodeId,
738 to: &NodeId,
739 direction: Direction,
740 max_depth: usize,
741 ) -> Result<Option<Vec<GraphHop<E>>>> {
742 if from == to {
743 return Ok(Some(Vec::new()));
744 }
745 if max_depth == 0 {
746 return Ok(None);
747 }
748 let Some(table) = self.table_for(&ns.render()) else {
749 return Ok(None);
750 };
751 let guard = table.read();
752 let mut parents: BTreeMap<NodeId, (EdgeId, NodeId)> = BTreeMap::new();
753 let mut depths: BTreeMap<NodeId, usize> = BTreeMap::new();
754 depths.insert(from.clone(), 0);
755 let mut frontier: VecDeque<NodeId> = VecDeque::new();
756 frontier.push_back(from.clone());
757 while let Some(current) = frontier.pop_front() {
758 if ctx.is_cancelled() {
759 return Err(Error::Cancelled);
760 }
761 let depth = *depths.get(¤t).unwrap_or(&0);
762 if depth >= max_depth {
763 continue;
764 }
765 for stored in directional_edges(&guard, ¤t, direction) {
766 let neighbour = stored
767 .other_endpoint_of(¤t)
768 .cloned()
769 .unwrap_or_else(|| stored.to.clone());
770 if depths.contains_key(&neighbour) {
771 continue;
772 }
773 depths.insert(neighbour.clone(), depth + 1);
774 parents.insert(neighbour.clone(), (stored.id.clone(), current.clone()));
775 if &neighbour == to {
776 let mut hops: Vec<GraphHop<E>> = Vec::new();
777 let mut cursor = to.clone();
778 while let Some((eid, prev)) = parents.get(&cursor).cloned() {
779 if let Some(stored) = guard.edges.get(&eid) {
780 hops.push(GraphHop {
781 edge_id: stored.id.clone(),
782 from: stored.from.clone(),
783 to: stored.to.clone(),
784 edge: stored.payload.clone(),
785 timestamp: stored.timestamp,
786 });
787 }
788 cursor = prev;
789 }
790 hops.reverse();
791 return Ok(Some(hops));
792 }
793 frontier.push_back(neighbour);
794 }
795 }
796 Ok(None)
797 }
798
799 async fn temporal_filter(
800 &self,
801 _ctx: &ExecutionContext,
802 ns: &Namespace,
803 from: DateTime<Utc>,
804 to: DateTime<Utc>,
805 ) -> Result<Vec<GraphHop<E>>> {
806 let Some(table) = self.table_for(&ns.render()) else {
807 return Ok(Vec::new());
808 };
809 let guard = table.read();
810 let mut out: Vec<GraphHop<E>> = guard
811 .edges
812 .values()
813 .filter(|e| e.timestamp >= from && e.timestamp < to)
814 .map(|e| GraphHop {
815 edge_id: e.id.clone(),
816 from: e.from.clone(),
817 to: e.to.clone(),
818 edge: e.payload.clone(),
819 timestamp: e.timestamp,
820 })
821 .collect();
822 out.sort_by_key(|hop| hop.timestamp);
823 Ok(out)
824 }
825
826 async fn node_count(&self, _ctx: &ExecutionContext, ns: &Namespace) -> Result<usize> {
827 let Some(table) = self.table_for(&ns.render()) else {
828 return Ok(0);
829 };
830 Ok(table.read().nodes.len())
831 }
832
833 async fn edge_count(&self, _ctx: &ExecutionContext, ns: &Namespace) -> Result<usize> {
834 let Some(table) = self.table_for(&ns.render()) else {
835 return Ok(0);
836 };
837 Ok(table.read().edges.len())
838 }
839
840 async fn delete_edge(
841 &self,
842 _ctx: &ExecutionContext,
843 ns: &Namespace,
844 edge_id: &EdgeId,
845 ) -> Result<()> {
846 let Some(table) = self.table_for(&ns.render()) else {
847 return Ok(());
848 };
849 let mut guard = table.write();
850 if let Some(edge) = guard.edges.remove(edge_id) {
851 if let Some(out_list) = guard.out_adj.get_mut(&edge.from) {
852 out_list.retain(|e| e != &edge.id);
853 }
854 if let Some(in_list) = guard.in_adj.get_mut(&edge.to) {
855 in_list.retain(|e| e != &edge.id);
856 }
857 }
858 Ok(())
859 }
860
861 async fn delete_node(
862 &self,
863 _ctx: &ExecutionContext,
864 ns: &Namespace,
865 node_id: &NodeId,
866 ) -> Result<usize> {
867 let Some(table) = self.table_for(&ns.render()) else {
868 return Ok(0);
869 };
870 let mut guard = table.write();
871 let mut incident: HashSet<EdgeId> = HashSet::new();
875 if let Some(out_list) = guard.out_adj.get(node_id) {
876 for id in out_list {
877 incident.insert(id.clone());
878 }
879 }
880 if let Some(in_list) = guard.in_adj.get(node_id) {
881 for id in in_list {
882 incident.insert(id.clone());
883 }
884 }
885 let removed = incident.len();
886 for edge_id in incident {
887 if let Some(edge) = guard.edges.remove(&edge_id) {
888 if let Some(out_list) = guard.out_adj.get_mut(&edge.from) {
889 out_list.retain(|e| e != &edge.id);
890 }
891 if let Some(in_list) = guard.in_adj.get_mut(&edge.to) {
892 in_list.retain(|e| e != &edge.id);
893 }
894 }
895 }
896 guard.nodes.remove(node_id);
897 guard.out_adj.remove(node_id);
898 guard.in_adj.remove(node_id);
899 Ok(removed)
900 }
901
902 async fn prune_older_than(
903 &self,
904 _ctx: &ExecutionContext,
905 ns: &Namespace,
906 ttl: std::time::Duration,
907 ) -> Result<usize> {
908 let Some(table) = self.table_for(&ns.render()) else {
909 return Ok(0);
910 };
911 let cutoff = Utc::now() - chrono::Duration::from_std(ttl).unwrap_or(chrono::Duration::MAX);
915 let mut guard = table.write();
916 let stale: Vec<EdgeId> = guard
917 .edges
918 .iter()
919 .filter(|(_, e)| e.timestamp < cutoff)
920 .map(|(id, _)| id.clone())
921 .collect();
922 let removed = stale.len();
923 for id in stale {
924 if let Some(edge) = guard.edges.remove(&id) {
925 if let Some(out_list) = guard.out_adj.get_mut(&edge.from) {
926 out_list.retain(|e| e != &edge.id);
927 }
928 if let Some(in_list) = guard.in_adj.get_mut(&edge.to) {
929 in_list.retain(|e| e != &edge.id);
930 }
931 }
932 }
933 Ok(removed)
934 }
935}
936
937impl<E> StoredEdge<E> {
938 fn other_endpoint_of(&self, node: &NodeId) -> Option<&NodeId> {
939 if &self.from == node {
940 Some(&self.to)
941 } else if &self.to == node {
942 Some(&self.from)
943 } else {
944 None
945 }
946 }
947}
948
949fn directional_edges<'a, N, E>(
950 table: &'a GraphTable<N, E>,
951 node: &NodeId,
952 direction: Direction,
953) -> Vec<&'a StoredEdge<E>> {
954 let mut out: Vec<&'a StoredEdge<E>> = Vec::new();
955 if matches!(direction, Direction::Outgoing | Direction::Both)
956 && let Some(ids) = table.out_adj.get(node)
957 {
958 for eid in ids {
959 if let Some(stored) = table.edges.get(eid) {
960 out.push(stored);
961 }
962 }
963 }
964 if matches!(direction, Direction::Incoming | Direction::Both)
965 && let Some(ids) = table.in_adj.get(node)
966 {
967 for eid in ids {
968 if let Some(stored) = table.edges.get(eid) {
969 out.push(stored);
970 }
971 }
972 }
973 out
974}
975
976#[cfg(test)]
977#[allow(
978 clippy::unwrap_used,
979 clippy::indexing_slicing,
980 clippy::many_single_char_names
981)]
982mod tests {
983 use super::*;
984 use entelix_core::TenantId;
985
986 fn ns() -> Namespace {
987 Namespace::new(TenantId::new("tenant")).with_scope("graph")
988 }
989
990 #[tokio::test]
991 async fn add_and_lookup_node() {
992 let g = InMemoryGraphMemory::<&str, &str>::new();
993 let ctx = ExecutionContext::new();
994 let id = g.add_node(&ctx, &ns(), "alice").await.unwrap();
995 let fetched = g.get_node(&ctx, &ns(), &id).await.unwrap();
996 assert_eq!(fetched, Some("alice"));
997 }
998
999 #[tokio::test]
1000 async fn add_edges_batch_inserts_all_atomically() {
1001 let g = InMemoryGraphMemory::<&str, &str>::new();
1002 let ctx = ExecutionContext::new();
1003 let alice = g.add_node(&ctx, &ns(), "alice").await.unwrap();
1004 let bob = g.add_node(&ctx, &ns(), "bob").await.unwrap();
1005 let carol = g.add_node(&ctx, &ns(), "carol").await.unwrap();
1006 let now = Utc::now();
1007 let ids = g
1008 .add_edges_batch(
1009 &ctx,
1010 &ns(),
1011 vec![
1012 (alice.clone(), bob.clone(), "knows", now),
1013 (bob.clone(), carol.clone(), "knows", now),
1014 (alice.clone(), carol.clone(), "knows", now),
1015 ],
1016 )
1017 .await
1018 .unwrap();
1019 assert_eq!(ids.len(), 3, "returns one EdgeId per input");
1020 for id in &ids {
1022 let hop = g.get_edge(&ctx, &ns(), id).await.unwrap();
1023 assert!(hop.is_some(), "edge {id} must be retrievable");
1024 }
1025 }
1026
1027 #[tokio::test]
1028 async fn add_edges_batch_rejects_unknown_endpoint_without_partial_writes() {
1029 let g = InMemoryGraphMemory::<&str, &str>::new();
1032 let ctx = ExecutionContext::new();
1033 let alice = g.add_node(&ctx, &ns(), "alice").await.unwrap();
1034 let bob = g.add_node(&ctx, &ns(), "bob").await.unwrap();
1035 let ghost = NodeId::new();
1036 let now = Utc::now();
1037 let err = g
1038 .add_edges_batch(
1039 &ctx,
1040 &ns(),
1041 vec![
1042 (alice.clone(), bob.clone(), "knows", now),
1043 (alice.clone(), ghost, "knows", now), ],
1045 )
1046 .await;
1047 assert!(err.is_err(), "batch with unknown endpoint must fail");
1048 assert_eq!(g.edge_count(&ctx, &ns()).await.unwrap(), 0);
1051 }
1052
1053 #[tokio::test]
1054 async fn add_edges_batch_empty_input_is_a_noop() {
1055 let g = InMemoryGraphMemory::<&str, &str>::new();
1056 let ctx = ExecutionContext::new();
1057 let ids = g.add_edges_batch(&ctx, &ns(), Vec::new()).await.unwrap();
1058 assert!(ids.is_empty());
1059 }
1060
1061 #[tokio::test]
1062 async fn add_edge_requires_existing_endpoints() {
1063 let g = InMemoryGraphMemory::<&str, &str>::new();
1064 let ctx = ExecutionContext::new();
1065 let alice = g.add_node(&ctx, &ns(), "alice").await.unwrap();
1066 let ghost = NodeId::new();
1067 let err = g
1068 .add_edge(&ctx, &ns(), &alice, &ghost, "knows", Utc::now())
1069 .await;
1070 assert!(err.is_err());
1071 }
1072
1073 #[tokio::test]
1074 async fn neighbors_split_by_direction() {
1075 let g = InMemoryGraphMemory::<&str, &str>::new();
1076 let ctx = ExecutionContext::new();
1077 let alice = g.add_node(&ctx, &ns(), "alice").await.unwrap();
1078 let bob = g.add_node(&ctx, &ns(), "bob").await.unwrap();
1079 let _eid = g
1080 .add_edge(&ctx, &ns(), &alice, &bob, "knows", Utc::now())
1081 .await
1082 .unwrap();
1083 let outgoing = g
1084 .neighbors(&ctx, &ns(), &alice, Direction::Outgoing)
1085 .await
1086 .unwrap();
1087 assert_eq!(outgoing.len(), 1);
1088 let incoming = g
1089 .neighbors(&ctx, &ns(), &alice, Direction::Incoming)
1090 .await
1091 .unwrap();
1092 assert!(incoming.is_empty());
1093 }
1094
1095 #[tokio::test]
1096 async fn traverse_respects_max_depth() {
1097 let g = InMemoryGraphMemory::<&str, &str>::new();
1098 let ctx = ExecutionContext::new();
1099 let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1100 let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1101 let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
1102 let d = g.add_node(&ctx, &ns(), "d").await.unwrap();
1103 let now = Utc::now();
1104 g.add_edge(&ctx, &ns(), &a, &b, "->", now).await.unwrap();
1105 g.add_edge(&ctx, &ns(), &b, &c, "->", now).await.unwrap();
1106 g.add_edge(&ctx, &ns(), &c, &d, "->", now).await.unwrap();
1107 let two = g
1108 .traverse(&ctx, &ns(), &a, Direction::Outgoing, 2)
1109 .await
1110 .unwrap();
1111 assert_eq!(two.len(), 2);
1112 let three = g
1113 .traverse(&ctx, &ns(), &a, Direction::Outgoing, 3)
1114 .await
1115 .unwrap();
1116 assert_eq!(three.len(), 3);
1117 }
1118
1119 #[tokio::test]
1120 async fn traverse_with_direction_both_walks_inverse_edges() {
1121 let g = InMemoryGraphMemory::<&str, &str>::new();
1122 let ctx = ExecutionContext::new();
1123 let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1124 let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1125 let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
1126 let now = Utc::now();
1127 g.add_edge(&ctx, &ns(), &a, &b, "->", now).await.unwrap();
1130 g.add_edge(&ctx, &ns(), &c, &b, "->", now).await.unwrap();
1131 let from_b = g
1132 .traverse(&ctx, &ns(), &b, Direction::Both, 1)
1133 .await
1134 .unwrap();
1135 assert_eq!(from_b.len(), 2);
1136 }
1137
1138 #[tokio::test]
1139 async fn find_path_returns_shortest() {
1140 let g = InMemoryGraphMemory::<&str, &str>::new();
1141 let ctx = ExecutionContext::new();
1142 let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1143 let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1144 let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
1145 let now = Utc::now();
1146 g.add_edge(&ctx, &ns(), &a, &b, "ab", now).await.unwrap();
1147 g.add_edge(&ctx, &ns(), &b, &c, "bc", now).await.unwrap();
1148 let path = g
1149 .find_path(&ctx, &ns(), &a, &c, Direction::Outgoing, 5)
1150 .await
1151 .unwrap();
1152 let hops = path.unwrap();
1153 assert_eq!(hops.len(), 2);
1154 assert_eq!(hops[0].from, a);
1155 assert_eq!(hops[1].to, c);
1156 }
1157
1158 #[tokio::test]
1159 async fn temporal_filter_picks_window() {
1160 let g = InMemoryGraphMemory::<&str, &str>::new();
1161 let ctx = ExecutionContext::new();
1162 let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1163 let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1164 let early = Utc::now() - chrono::Duration::hours(2);
1165 let late = Utc::now();
1166 g.add_edge(&ctx, &ns(), &a, &b, "early", early)
1167 .await
1168 .unwrap();
1169 g.add_edge(&ctx, &ns(), &a, &b, "late", late).await.unwrap();
1170 let window = g
1171 .temporal_filter(
1172 &ctx,
1173 &ns(),
1174 Utc::now() - chrono::Duration::hours(1),
1175 Utc::now() + chrono::Duration::hours(1),
1176 )
1177 .await
1178 .unwrap();
1179 assert_eq!(window.len(), 1);
1180 assert_eq!(window[0].edge, "late");
1181 }
1182
1183 #[tokio::test]
1184 async fn node_count_and_edge_count_track_inserts() {
1185 let g = InMemoryGraphMemory::<&str, &str>::new();
1186 let ctx = ExecutionContext::new();
1187 assert_eq!(g.node_count(&ctx, &ns()).await.unwrap(), 0);
1189 assert_eq!(g.edge_count(&ctx, &ns()).await.unwrap(), 0);
1190 let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1191 let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1192 assert_eq!(g.node_count(&ctx, &ns()).await.unwrap(), 2);
1193 assert_eq!(g.edge_count(&ctx, &ns()).await.unwrap(), 0);
1194 let _ = g
1195 .add_edge(&ctx, &ns(), &a, &b, "ab", Utc::now())
1196 .await
1197 .unwrap();
1198 assert_eq!(g.edge_count(&ctx, &ns()).await.unwrap(), 1);
1199 }
1200
1201 #[tokio::test]
1202 async fn count_methods_respect_namespace_isolation() {
1203 let g = InMemoryGraphMemory::<&str, &str>::new();
1204 let ctx = ExecutionContext::new();
1205 let alpha = Namespace::new(TenantId::new("tenant")).with_scope("alpha");
1206 let beta = Namespace::new(TenantId::new("tenant")).with_scope("beta");
1207 let _ = g.add_node(&ctx, &alpha, "n").await.unwrap();
1208 assert_eq!(g.node_count(&ctx, &alpha).await.unwrap(), 1);
1209 assert_eq!(g.node_count(&ctx, &beta).await.unwrap(), 0);
1210 }
1211
1212 #[tokio::test]
1213 async fn delete_edge_is_idempotent_and_dedups_adjacency() {
1214 let g = InMemoryGraphMemory::<&str, &str>::new();
1215 let ctx = ExecutionContext::new();
1216 let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1217 let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1218 let now = Utc::now();
1219 let id = g.add_edge(&ctx, &ns(), &a, &b, "ab", now).await.unwrap();
1220 g.delete_edge(&ctx, &ns(), &id).await.unwrap();
1222 g.delete_edge(&ctx, &ns(), &id).await.unwrap();
1223 let outgoing = g
1225 .neighbors(&ctx, &ns(), &a, Direction::Outgoing)
1226 .await
1227 .unwrap();
1228 assert!(outgoing.is_empty());
1229 let incoming = g
1230 .neighbors(&ctx, &ns(), &b, Direction::Incoming)
1231 .await
1232 .unwrap();
1233 assert!(incoming.is_empty());
1234 }
1235
1236 #[tokio::test]
1237 async fn delete_node_cascades_to_incident_edges() {
1238 let g = InMemoryGraphMemory::<&str, &str>::new();
1239 let ctx = ExecutionContext::new();
1240 let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1241 let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1242 let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
1243 let now = Utc::now();
1244 let _ = g.add_edge(&ctx, &ns(), &a, &b, "ab", now).await.unwrap();
1246 let _ = g.add_edge(&ctx, &ns(), &a, &c, "ac", now).await.unwrap();
1247 let _ = g.add_edge(&ctx, &ns(), &b, &a, "ba", now).await.unwrap();
1248 let removed = g.delete_node(&ctx, &ns(), &a).await.unwrap();
1249 assert_eq!(removed, 3);
1250 assert!(g.get_node(&ctx, &ns(), &a).await.unwrap().is_none());
1251 assert!(g.get_node(&ctx, &ns(), &b).await.unwrap().is_some());
1253 assert!(g.get_node(&ctx, &ns(), &c).await.unwrap().is_some());
1254 let b_in = g
1257 .neighbors(&ctx, &ns(), &b, Direction::Incoming)
1258 .await
1259 .unwrap();
1260 assert!(b_in.is_empty());
1261 }
1262
1263 #[tokio::test]
1264 async fn delete_node_with_self_loop_dedups_count() {
1265 let g = InMemoryGraphMemory::<&str, &str>::new();
1266 let ctx = ExecutionContext::new();
1267 let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1268 let _ = g
1271 .add_edge(&ctx, &ns(), &a, &a, "self", Utc::now())
1272 .await
1273 .unwrap();
1274 assert_eq!(g.delete_node(&ctx, &ns(), &a).await.unwrap(), 1);
1275 }
1276
1277 #[tokio::test]
1278 async fn delete_node_on_absent_node_is_zero_noop() {
1279 let g = InMemoryGraphMemory::<&str, &str>::new();
1280 let ctx = ExecutionContext::new();
1281 let phantom = NodeId::from_string("does-not-exist");
1282 assert_eq!(g.delete_node(&ctx, &ns(), &phantom).await.unwrap(), 0);
1283 }
1284
1285 #[tokio::test]
1286 async fn prune_older_than_drops_stale_edges_only() {
1287 let g = InMemoryGraphMemory::<&str, &str>::new();
1288 let ctx = ExecutionContext::new();
1289 let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1290 let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1291 let now = Utc::now();
1292 let _old = g
1293 .add_edge(
1294 &ctx,
1295 &ns(),
1296 &a,
1297 &b,
1298 "old",
1299 now - chrono::Duration::seconds(120),
1300 )
1301 .await
1302 .unwrap();
1303 let _fresh = g
1304 .add_edge(
1305 &ctx,
1306 &ns(),
1307 &a,
1308 &b,
1309 "fresh",
1310 now - chrono::Duration::seconds(5),
1311 )
1312 .await
1313 .unwrap();
1314 let removed = g
1315 .prune_older_than(&ctx, &ns(), std::time::Duration::from_mins(1))
1316 .await
1317 .unwrap();
1318 assert_eq!(removed, 1);
1319 assert!(g.get_node(&ctx, &ns(), &a).await.unwrap().is_some());
1321 assert!(g.get_node(&ctx, &ns(), &b).await.unwrap().is_some());
1322 let outgoing = g
1324 .neighbors(&ctx, &ns(), &a, Direction::Outgoing)
1325 .await
1326 .unwrap();
1327 assert_eq!(outgoing.len(), 1);
1328 assert_eq!(outgoing[0].2, "fresh");
1329 }
1330
1331 #[tokio::test]
1332 async fn prune_older_than_on_empty_namespace_is_noop() {
1333 let g = InMemoryGraphMemory::<&str, &str>::new();
1334 let ctx = ExecutionContext::new();
1335 let removed = g
1336 .prune_older_than(&ctx, &ns(), std::time::Duration::from_secs(0))
1337 .await
1338 .unwrap();
1339 assert_eq!(removed, 0);
1340 }
1341
1342 #[tokio::test]
1343 async fn namespaces_are_isolated() {
1344 let g = InMemoryGraphMemory::<&str, &str>::new();
1345 let ctx = ExecutionContext::new();
1346 let alpha = Namespace::new(TenantId::new("tenant")).with_scope("alpha");
1347 let beta = Namespace::new(TenantId::new("tenant")).with_scope("beta");
1348 let _ = g.add_node(&ctx, &alpha, "a-node").await.unwrap();
1349 let _ = g.add_node(&ctx, &beta, "b-node").await.unwrap();
1350 assert_eq!(g.total_nodes(), 2);
1353 }
1354
1355 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1356 async fn distinct_namespaces_write_concurrently() {
1357 let g: InMemoryGraphMemory<String, String> = InMemoryGraphMemory::new();
1365 let mut handles = Vec::new();
1366 for tenant in 0..8 {
1367 let g = g.clone();
1368 handles.push(tokio::spawn(async move {
1369 let ctx = ExecutionContext::new();
1370 let ns = Namespace::new(TenantId::new(format!("tenant-{tenant}")));
1371 let mut ids = Vec::new();
1372 for i in 0..50 {
1373 let id = g
1374 .add_node(&ctx, &ns, format!("t{tenant}-n{i}"))
1375 .await
1376 .unwrap();
1377 ids.push(id);
1378 }
1379 let now = Utc::now();
1382 for window in ids.windows(2) {
1383 g.add_edge(
1384 &ctx,
1385 &ns,
1386 &window[0],
1387 &window[1],
1388 format!("t{tenant}-edge"),
1389 now,
1390 )
1391 .await
1392 .unwrap();
1393 }
1394 }));
1395 }
1396 for h in handles {
1397 h.await.unwrap();
1398 }
1399 assert_eq!(g.total_nodes(), 8 * 50);
1402 assert_eq!(g.total_edges(), 8 * 49);
1403 }
1404
1405 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1406 async fn read_during_write_on_other_namespace_does_not_block() {
1407 let g: InMemoryGraphMemory<String, String> = InMemoryGraphMemory::new();
1417 let alpha = Namespace::new(TenantId::new("alpha"));
1418 let beta = Namespace::new(TenantId::new("beta"));
1419 let ctx = ExecutionContext::new();
1420 let beta_node_id = g
1421 .add_node(&ctx, &beta, "beta-fixture".to_owned())
1422 .await
1423 .unwrap();
1424
1425 let g_writer = g.clone();
1429 let alpha_writer = alpha.clone();
1430 let writer = tokio::spawn(async move {
1431 let ctx = ExecutionContext::new();
1432 for i in 0..200 {
1433 g_writer
1434 .add_node(&ctx, &alpha_writer, format!("alpha-{i}"))
1435 .await
1436 .unwrap();
1437 }
1438 });
1439 let mut reads = Vec::new();
1440 for _ in 0..200 {
1441 let g_reader = g.clone();
1442 let beta_reader = beta.clone();
1443 let id_reader = beta_node_id.clone();
1444 reads.push(tokio::spawn(async move {
1445 let ctx = ExecutionContext::new();
1446 g_reader
1447 .get_node(&ctx, &beta_reader, &id_reader)
1448 .await
1449 .unwrap()
1450 }));
1451 }
1452 for r in reads {
1453 assert_eq!(r.await.unwrap().as_deref(), Some("beta-fixture"));
1454 }
1455 writer.await.unwrap();
1456 assert_eq!(g.total_nodes(), 1 + 200);
1457 }
1458
1459 #[tokio::test]
1460 async fn traverse_short_circuits_on_cancellation() {
1461 let g = InMemoryGraphMemory::<&str, &str>::new();
1462 let ctx = ExecutionContext::new();
1463 let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1464 let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1465 let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
1466 let now = Utc::now();
1467 g.add_edge(&ctx, &ns(), &a, &b, "knows", now).await.unwrap();
1468 g.add_edge(&ctx, &ns(), &b, &c, "knows", now).await.unwrap();
1469 ctx.cancellation().cancel();
1470 let err = g
1471 .traverse(&ctx, &ns(), &a, Direction::Outgoing, 5)
1472 .await
1473 .unwrap_err();
1474 assert!(matches!(err, Error::Cancelled), "got {err:?}");
1475 }
1476
1477 #[tokio::test]
1478 async fn find_path_short_circuits_on_cancellation() {
1479 let g = InMemoryGraphMemory::<&str, &str>::new();
1480 let ctx = ExecutionContext::new();
1481 let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1482 let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1483 let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
1484 let now = Utc::now();
1485 g.add_edge(&ctx, &ns(), &a, &b, "e", now).await.unwrap();
1486 g.add_edge(&ctx, &ns(), &b, &c, "e", now).await.unwrap();
1487 ctx.cancellation().cancel();
1488 let err = g
1489 .find_path(&ctx, &ns(), &a, &c, Direction::Outgoing, 5)
1490 .await
1491 .unwrap_err();
1492 assert!(matches!(err, Error::Cancelled), "got {err:?}");
1493 }
1494}