gun/graph.rs
1//! Graph storage and conflict resolution
2//!
3//! This module implements the in-memory graph storage for Gun. The graph stores all
4//! nodes by their soul (unique identifier) and provides conflict resolution using
5//! the HAM (Hypothetical Amnesia Machine) algorithm.
6//!
7//! Based on Gun.js graph structure. The graph is thread-safe and can be shared
8//! across threads using `Arc<Graph>`.
9
10use crate::error::GunResult;
11use crate::state::Node;
12use parking_lot::RwLock;
13use serde_json::Value;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17/// Graph storage for all nodes in the database
18///
19/// The graph is an in-memory hash map that stores nodes by their soul (unique identifier).
20/// It provides thread-safe access and automatic conflict resolution when merging updates.
21///
22/// # Thread Safety
23///
24/// `Graph` is thread-safe and uses `parking_lot::RwLock` for concurrent access.
25/// Multiple threads can read simultaneously, or a single thread can write exclusively.
26///
27/// # Conflict Resolution
28///
29/// When merging nodes, the graph uses the HAM algorithm:
30/// - Compare state timestamps for each property
31/// - Higher state wins
32/// - Merge non-conflicting properties
33///
34/// # Example
35///
36/// ```rust,no_run
37/// use gun::graph::Graph;
38/// use gun::state::Node;
39///
40/// let graph = Graph::new();
41/// let node = Node::with_soul("user_123".to_string());
42/// graph.put("user_123", node)?;
43///
44/// if let Some(loaded_node) = graph.get("user_123") {
45/// println!("Found node: {:?}", loaded_node);
46/// }
47/// ```
48#[derive(Clone)]
49pub struct Graph {
50 nodes: Arc<RwLock<HashMap<String, Node>>>,
51}
52
53impl Graph {
54 /// Create a new empty graph
55 pub fn new() -> Self {
56 Self {
57 nodes: Arc::new(RwLock::new(HashMap::new())),
58 }
59 }
60
61 /// Get a node by its soul (unique identifier)
62 ///
63 /// # Arguments
64 /// * `soul` - The unique identifier of the node
65 ///
66 /// # Returns
67 /// The node if found, or `None` if it doesn't exist.
68 pub fn get(&self, soul: &str) -> Option<Node> {
69 self.nodes.read().get(soul).cloned()
70 }
71
72 /// Store a node in the graph by its soul
73 ///
74 /// If a node with the same soul already exists, it will be overwritten.
75 /// For conflict resolution, use [`merge`](Self::merge) instead.
76 ///
77 /// # Arguments
78 /// * `soul` - The unique identifier for the node
79 /// * `node` - The node to store
80 ///
81 /// # Returns
82 /// `Ok(())` on success, or a `GunError` if something goes wrong.
83 pub fn put(&self, soul: &str, node: Node) -> GunResult<()> {
84 self.nodes.write().insert(soul.to_string(), node);
85 Ok(())
86 }
87
88 /// Check if a node with the given soul exists in the graph
89 ///
90 /// # Arguments
91 /// * `soul` - The unique identifier to check
92 ///
93 /// # Returns
94 /// `true` if the node exists, `false` otherwise.
95 pub fn has(&self, soul: &str) -> bool {
96 self.nodes.read().contains_key(soul)
97 }
98
99 /// Get a copy of all nodes in the graph (for debugging/testing)
100 ///
101 /// **Warning**: This clones all nodes, which can be expensive for large graphs.
102 /// Only use this for debugging or small datasets.
103 ///
104 /// # Returns
105 /// A `HashMap` mapping soul to node for all nodes in the graph.
106 pub fn all_nodes(&self) -> HashMap<String, Node> {
107 self.nodes.read().clone()
108 }
109
110 /// Merge a node into the graph with automatic conflict resolution
111 ///
112 /// This method implements the HAM (Hypothetical Amnesia Machine) algorithm:
113 /// - If the node doesn't exist, it's inserted
114 /// - If the node exists, properties are merged based on state timestamps
115 /// - Higher state always wins in conflicts
116 /// - Non-conflicting properties are preserved
117 ///
118 /// # Arguments
119 /// * `soul` - The unique identifier for the node
120 /// * `incoming` - The node to merge in
121 /// * `state_fn` - Function that generates the current state timestamp
122 ///
123 /// # Returns
124 /// The merged node after conflict resolution.
125 ///
126 /// # Example
127 ///
128 /// ```rust,no_run
129 /// use gun::graph::Graph;
130 /// use gun::state::{Node, State};
131 /// use serde_json::json;
132 ///
133 /// let graph = Graph::new();
134 /// let state = State::new();
135 ///
136 /// let mut node1 = Node::with_soul("user_123".to_string());
137 /// node1.data.insert("name".to_string(), json!("Alice"));
138 ///
139 /// let merged = graph.merge("user_123", &node1, || state.next())?;
140 /// ```
141 pub fn merge(
142 &self,
143 soul: &str,
144 incoming: &Node,
145 state_fn: impl Fn() -> f64,
146 ) -> GunResult<Node> {
147 let mut nodes = self.nodes.write();
148 let existing = nodes.get(soul);
149
150 if let Some(existing_node) = existing {
151 // Merge logic - resolve conflicts based on state timestamps
152 let merged = Self::merge_nodes(existing_node, incoming, state_fn)?;
153 nodes.insert(soul.to_string(), merged.clone());
154 Ok(merged)
155 } else {
156 nodes.insert(soul.to_string(), incoming.clone());
157 Ok(incoming.clone())
158 }
159 }
160
161 /// Merge two nodes resolving conflicts based on state
162 fn merge_nodes(
163 existing: &Node,
164 incoming: &Node,
165 state_fn: impl Fn() -> f64,
166 ) -> GunResult<Node> {
167 let mut merged = existing.clone();
168 let _current_state = state_fn();
169
170 // Get states from both nodes
171 let existing_states = existing
172 .meta
173 .get(">")
174 .and_then(|v| v.as_object())
175 .cloned()
176 .unwrap_or_else(serde_json::Map::new);
177
178 let incoming_states = incoming
179 .meta
180 .get(">")
181 .and_then(|v| v.as_object())
182 .cloned()
183 .unwrap_or_else(serde_json::Map::new);
184
185 // Merge data fields based on state comparison
186 for (key, incoming_value) in incoming.data.iter() {
187 let existing_state = existing_states
188 .get(key)
189 .and_then(|v| v.as_f64())
190 .unwrap_or(f64::NEG_INFINITY);
191
192 let incoming_state = incoming_states
193 .get(key)
194 .and_then(|v| v.as_f64())
195 .unwrap_or(f64::NEG_INFINITY);
196
197 if incoming_state >= existing_state {
198 merged.data.insert(key.clone(), incoming_value.clone());
199
200 // Update state
201 let states = merged
202 .meta
203 .entry(">".to_string())
204 .or_insert_with(|| Value::Object(serde_json::Map::new()));
205
206 if let Value::Object(ref mut map) = states {
207 // from_f64 can fail if incoming_state is NaN or Infinity
208 // In practice, incoming_state should always be a valid finite number
209 if let Some(number) = serde_json::Number::from_f64(incoming_state) {
210 map.insert(key.clone(), Value::Number(number));
211 } else {
212 // Log error but continue - this should never happen in practice
213 tracing::error!("Invalid incoming state number: {} (NaN or Infinity)", incoming_state);
214 }
215 }
216 }
217 }
218
219 // Merge meta fields
220 for (key, value) in incoming.meta.iter() {
221 if key != ">" && key != "#" {
222 merged.meta.insert(key.clone(), value.clone());
223 }
224 }
225
226 Ok(merged)
227 }
228}
229
230impl Default for Graph {
231 fn default() -> Self {
232 Self::new()
233 }
234}