gun/
graph.rs

//! Graph storage and conflict resolution
//!
//! This module implements the in-memory graph storage for Gun. The graph stores all
//! nodes by their soul (unique identifier) and provides conflict resolution using
//! the HAM (Hypothetical Amnesia Machine) algorithm.
//!
//! Based on the Gun.js graph structure. The graph is thread-safe: cloning a `Graph`
//! yields another handle to the same underlying storage, so it can be shared
//! across threads either by cloning it or through an `Arc<Graph>`.
9
10use crate::error::GunResult;
11use crate::state::Node;
12use parking_lot::RwLock;
13use serde_json::Value;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17/// Graph storage for all nodes in the database
18///
19/// The graph is an in-memory hash map that stores nodes by their soul (unique identifier).
20/// It provides thread-safe access and automatic conflict resolution when merging updates.
21///
22/// # Thread Safety
23///
24/// `Graph` is thread-safe and uses `parking_lot::RwLock` for concurrent access.
25/// Multiple threads can read simultaneously, or a single thread can write exclusively.
26///
27/// # Conflict Resolution
28///
29/// When merging nodes, the graph uses the HAM algorithm:
30/// - Compare state timestamps for each property
31/// - Higher state wins
32/// - Merge non-conflicting properties
33///
34/// # Example
35///
36/// ```rust,no_run
37/// use gun::graph::Graph;
38/// use gun::state::Node;
39///
40/// let graph = Graph::new();
41/// let node = Node::with_soul("user_123".to_string());
42/// graph.put("user_123", node)?;
43///
44/// if let Some(loaded_node) = graph.get("user_123") {
45///     println!("Found node: {:?}", loaded_node);
46/// }
47/// ```
48#[derive(Clone)]
49pub struct Graph {
50    nodes: Arc<RwLock<HashMap<String, Node>>>,
51}
52
53impl Graph {
54    /// Create a new empty graph
55    pub fn new() -> Self {
56        Self {
57            nodes: Arc::new(RwLock::new(HashMap::new())),
58        }
59    }
60
61    /// Get a node by its soul (unique identifier)
62    ///
63    /// # Arguments
64    /// * `soul` - The unique identifier of the node
65    ///
66    /// # Returns
67    /// The node if found, or `None` if it doesn't exist.
68    pub fn get(&self, soul: &str) -> Option<Node> {
69        self.nodes.read().get(soul).cloned()
70    }
71
72    /// Store a node in the graph by its soul
73    ///
74    /// If a node with the same soul already exists, it will be overwritten.
75    /// For conflict resolution, use [`merge`](Self::merge) instead.
76    ///
77    /// # Arguments
78    /// * `soul` - The unique identifier for the node
79    /// * `node` - The node to store
80    ///
81    /// # Returns
82    /// `Ok(())` on success, or a `GunError` if something goes wrong.
83    pub fn put(&self, soul: &str, node: Node) -> GunResult<()> {
84        self.nodes.write().insert(soul.to_string(), node);
85        Ok(())
86    }
87
88    /// Check if a node with the given soul exists in the graph
89    ///
90    /// # Arguments
91    /// * `soul` - The unique identifier to check
92    ///
93    /// # Returns
94    /// `true` if the node exists, `false` otherwise.
95    pub fn has(&self, soul: &str) -> bool {
96        self.nodes.read().contains_key(soul)
97    }
98
99    /// Get a copy of all nodes in the graph (for debugging/testing)
100    ///
101    /// **Warning**: This clones all nodes, which can be expensive for large graphs.
102    /// Only use this for debugging or small datasets.
103    ///
104    /// # Returns
105    /// A `HashMap` mapping soul to node for all nodes in the graph.
106    pub fn all_nodes(&self) -> HashMap<String, Node> {
107        self.nodes.read().clone()
108    }
109
110    /// Merge a node into the graph with automatic conflict resolution
111    ///
112    /// This method implements the HAM (Hypothetical Amnesia Machine) algorithm:
113    /// - If the node doesn't exist, it's inserted
114    /// - If the node exists, properties are merged based on state timestamps
115    /// - Higher state always wins in conflicts
116    /// - Non-conflicting properties are preserved
117    ///
118    /// # Arguments
119    /// * `soul` - The unique identifier for the node
120    /// * `incoming` - The node to merge in
121    /// * `state_fn` - Function that generates the current state timestamp
122    ///
123    /// # Returns
124    /// The merged node after conflict resolution.
125    ///
126    /// # Example
127    ///
128    /// ```rust,no_run
129    /// use gun::graph::Graph;
130    /// use gun::state::{Node, State};
131    /// use serde_json::json;
132    ///
133    /// let graph = Graph::new();
134    /// let state = State::new();
135    ///
136    /// let mut node1 = Node::with_soul("user_123".to_string());
137    /// node1.data.insert("name".to_string(), json!("Alice"));
138    ///
139    /// let merged = graph.merge("user_123", &node1, || state.next())?;
140    /// ```
141    pub fn merge(
142        &self,
143        soul: &str,
144        incoming: &Node,
145        state_fn: impl Fn() -> f64,
146    ) -> GunResult<Node> {
147        let mut nodes = self.nodes.write();
148        let existing = nodes.get(soul);
149
150        if let Some(existing_node) = existing {
151            // Merge logic - resolve conflicts based on state timestamps
152            let merged = Self::merge_nodes(existing_node, incoming, state_fn)?;
153            nodes.insert(soul.to_string(), merged.clone());
154            Ok(merged)
155        } else {
156            nodes.insert(soul.to_string(), incoming.clone());
157            Ok(incoming.clone())
158        }
159    }
160
161    /// Merge two nodes resolving conflicts based on state
162    fn merge_nodes(
163        existing: &Node,
164        incoming: &Node,
165        state_fn: impl Fn() -> f64,
166    ) -> GunResult<Node> {
167        let mut merged = existing.clone();
168        let _current_state = state_fn();
169
170        // Get states from both nodes
171        let existing_states = existing
172            .meta
173            .get(">")
174            .and_then(|v| v.as_object())
175            .cloned()
176            .unwrap_or_else(serde_json::Map::new);
177
178        let incoming_states = incoming
179            .meta
180            .get(">")
181            .and_then(|v| v.as_object())
182            .cloned()
183            .unwrap_or_else(serde_json::Map::new);
184
185        // Merge data fields based on state comparison
186        for (key, incoming_value) in incoming.data.iter() {
187            let existing_state = existing_states
188                .get(key)
189                .and_then(|v| v.as_f64())
190                .unwrap_or(f64::NEG_INFINITY);
191
192            let incoming_state = incoming_states
193                .get(key)
194                .and_then(|v| v.as_f64())
195                .unwrap_or(f64::NEG_INFINITY);
196
197            if incoming_state >= existing_state {
198                merged.data.insert(key.clone(), incoming_value.clone());
199
200                // Update state
201                let states = merged
202                    .meta
203                    .entry(">".to_string())
204                    .or_insert_with(|| Value::Object(serde_json::Map::new()));
205
206                if let Value::Object(ref mut map) = states {
207                    // from_f64 can fail if incoming_state is NaN or Infinity
208                    // In practice, incoming_state should always be a valid finite number
209                    if let Some(number) = serde_json::Number::from_f64(incoming_state) {
210                        map.insert(key.clone(), Value::Number(number));
211                    } else {
212                        // Log error but continue - this should never happen in practice
213                        tracing::error!("Invalid incoming state number: {} (NaN or Infinity)", incoming_state);
214                    }
215                }
216            }
217        }
218
219        // Merge meta fields
220        for (key, value) in incoming.meta.iter() {
221            if key != ">" && key != "#" {
222                merged.meta.insert(key.clone(), value.clone());
223            }
224        }
225
226        Ok(merged)
227    }
228}
229
230impl Default for Graph {
231    fn default() -> Self {
232        Self::new()
233    }
234}