qudag_dag/
graph.rs

1use blake3::Hash;
2use dashmap::DashMap;
3use lru::LruCache;
4use parking_lot::RwLock;
5use rayon::prelude::*;
6use std::collections::{HashSet, VecDeque};
7use std::num::NonZeroUsize;
8use std::time::Instant;
9use tracing::info;
10
11use crate::{DagError, Edge, Node, Result};
12
/// Snapshot of graph performance counters.
///
/// Cloneable and comparable so callers can keep and diff snapshots.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct GraphMetrics {
    /// Running mean of successful `add_node` wall-clock time, in nanoseconds
    pub avg_vertex_time_ns: u64,
    /// Number of vertices successfully added through `add_node`
    pub vertices_processed: u64,
    /// Fraction of counted vertex lookups served by the LRU cache (0.0..=1.0)
    pub cache_hit_rate: f64,
    /// Estimated memory used by stored vertices, in bytes (rough per-vertex estimate)
    pub memory_usage_bytes: usize,
    /// Number of vertices removed by explicit pruning
    pub pruned_vertices: u64,
}
27
/// Storage limits and cache sizing for the DAG.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageConfig {
    /// Vertex count at which an insert first triggers pruning; also the
    /// initial capacity of the vertex map. This is a soft limit: if nothing
    /// is prunable, inserts still succeed.
    pub max_vertices: usize,
    /// Initial capacity hint for the edge map (not an enforced limit).
    pub max_edges: usize,
    /// Vertex count that pruning tries to shrink storage down to. Only
    /// vertices in the `Final` state are removed.
    pub pruning_threshold: usize,
    /// Capacity, in entries, of the LRU vertex cache. Should be at least 1.
    pub cache_depth: usize,
}
40
41impl Default for StorageConfig {
42    fn default() -> Self {
43        Self {
44            max_vertices: 100_000,
45            max_edges: 500_000,
46            pruning_threshold: 10_000,
47            cache_depth: 1000,
48        }
49    }
50}
51
52/// Vertex storage with efficient caching and pruning
53struct VertexStorage {
54    /// Primary storage for all vertices
55    vertices: DashMap<Hash, Node>,
56    /// Fast access cache for recent vertices
57    cache: RwLock<LruCache<Hash, Node>>,
58    /// Pruning queue for old vertices
59    pruning_queue: RwLock<VecDeque<Hash>>,
60    /// Configuration
61    config: StorageConfig,
62    /// Cache statistics
63    cache_hits: std::sync::atomic::AtomicU64,
64    cache_misses: std::sync::atomic::AtomicU64,
65}
66
67impl VertexStorage {
68    fn new(config: StorageConfig) -> Self {
69        let cache_size = NonZeroUsize::new(config.cache_depth).unwrap();
70        Self {
71            vertices: DashMap::with_capacity(config.max_vertices),
72            cache: RwLock::new(LruCache::new(cache_size)),
73            pruning_queue: RwLock::new(VecDeque::new()),
74            config,
75            cache_hits: std::sync::atomic::AtomicU64::new(0),
76            cache_misses: std::sync::atomic::AtomicU64::new(0),
77        }
78    }
79
80    fn get(&self, hash: &Hash) -> Option<Node> {
81        // Try cache first
82        if let Some(node) = self.cache.write().get(hash) {
83            self.cache_hits
84                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
85            return Some(node.clone());
86        }
87
88        // Try main storage
89        if let Some(node) = self.vertices.get(hash) {
90            let node = node.clone();
91            // Update cache
92            self.cache.write().put(*hash, node.clone());
93            self.cache_misses
94                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
95            Some(node)
96        } else {
97            None
98        }
99    }
100
101    fn insert(&self, hash: Hash, node: Node) -> Result<()> {
102        // Check capacity
103        if self.vertices.len() >= self.config.max_vertices {
104            self.prune_old_vertices()?;
105        }
106
107        // Insert into main storage
108        self.vertices.insert(hash, node.clone());
109
110        // Update cache
111        self.cache.write().put(hash, node);
112
113        // Add to pruning queue
114        self.pruning_queue.write().push_back(hash);
115
116        Ok(())
117    }
118
119    fn prune_old_vertices(&self) -> Result<()> {
120        let mut pruning_queue = self.pruning_queue.write();
121        let target_size = self.config.pruning_threshold;
122        let current_size = self.vertices.len();
123
124        if current_size <= target_size {
125            return Ok(());
126        }
127
128        let to_remove = current_size - target_size;
129        let mut removed = 0;
130
131        while removed < to_remove && !pruning_queue.is_empty() {
132            if let Some(hash) = pruning_queue.pop_front() {
133                // Only remove if vertex is in Final state
134                if let Some(node) = self.vertices.get(&hash) {
135                    if matches!(node.state(), crate::NodeState::Final) {
136                        self.vertices.remove(&hash);
137                        removed += 1;
138                    }
139                }
140            }
141        }
142
143        info!("Pruned {} vertices from storage", removed);
144        Ok(())
145    }
146
147    fn cache_hit_rate(&self) -> f64 {
148        let hits = self.cache_hits.load(std::sync::atomic::Ordering::Relaxed);
149        let misses = self.cache_misses.load(std::sync::atomic::Ordering::Relaxed);
150        let total = hits + misses;
151        if total == 0 {
152            0.0
153        } else {
154            hits as f64 / total as f64
155        }
156    }
157
158    fn memory_usage(&self) -> usize {
159        // Rough estimate: each node ~256 bytes + hash overhead
160        self.vertices.len() * 256
161    }
162}
163
164/// Represents the DAG data structure with high-performance concurrent access
165pub struct Graph {
166    /// Efficient vertex storage with caching
167    storage: VertexStorage,
168    /// Edges in the DAG with concurrent access
169    edges: DashMap<Hash, HashSet<Edge>>,
170    /// Performance metrics
171    metrics: RwLock<GraphMetrics>,
172}
173
174impl Default for Graph {
175    fn default() -> Self {
176        Self::new()
177    }
178}
179
180impl Graph {
181    /// Creates a new empty DAG
182    pub fn new() -> Self {
183        Self::with_config(StorageConfig::default())
184    }
185
186    /// Creates a new Graph with specified initial capacity
187    pub fn with_capacity(capacity: usize) -> Self {
188        let config = StorageConfig {
189            max_vertices: capacity,
190            max_edges: capacity * 5,
191            ..StorageConfig::default()
192        };
193        Self::with_config(config)
194    }
195
196    /// Creates a new Graph with custom storage configuration
197    pub fn with_config(config: StorageConfig) -> Self {
198        Self {
199            storage: VertexStorage::new(config.clone()),
200            edges: DashMap::with_capacity(config.max_edges),
201            metrics: RwLock::new(GraphMetrics::default()),
202        }
203    }
204
205    /// Returns true if the DAG contains no nodes
206    pub fn is_empty(&self) -> bool {
207        self.storage.vertices.is_empty()
208    }
209
210    /// Returns the number of nodes in the DAG
211    pub fn len(&self) -> usize {
212        self.storage.vertices.len()
213    }
214
215    /// Adds a new node to the DAG
216    pub fn add_node(&self, node: Node) -> Result<()> {
217        let start = Instant::now();
218        let node_hash = node.hash();
219
220        // Check if node already exists
221        if self.storage.get(&node_hash).is_some() {
222            return Err(DagError::NodeExists(format!("{:?}", node_hash)));
223        }
224
225        // Verify all parents exist concurrently
226        let parents = node.parents();
227        let missing_parent = parents
228            .par_iter()
229            .find_first(|parent| self.storage.get(parent).is_none());
230
231        if let Some(parent) = missing_parent {
232            return Err(DagError::MissingParent(format!("{:?}", parent)));
233        }
234
235        // Add node to storage
236        self.storage.insert(node_hash, node)?;
237
238        // Initialize edge set
239        self.edges.entry(node_hash).or_default();
240
241        // Add edges from parents in parallel
242        if let Some(node) = self.storage.get(&node_hash) {
243            let parents = node.parents();
244            parents.par_iter().for_each(|parent| {
245                let edge = Edge::new(*parent, node_hash);
246                if let Some(mut parent_edges) = self.edges.get_mut(parent) {
247                    parent_edges.insert(edge);
248                }
249            });
250        }
251
252        // Update metrics
253        let elapsed = start.elapsed().as_nanos() as u64;
254        let mut metrics = self.metrics.write();
255        metrics.vertices_processed += 1;
256        metrics.avg_vertex_time_ns =
257            (metrics.avg_vertex_time_ns * (metrics.vertices_processed - 1) + elapsed)
258                / metrics.vertices_processed;
259        metrics.cache_hit_rate = self.storage.cache_hit_rate();
260        metrics.memory_usage_bytes = self.storage.memory_usage();
261
262        Ok(())
263    }
264
265    /// Returns a reference to a node by its hash
266    pub fn get_node(&self, hash: &Hash) -> Option<Node> {
267        self.storage.get(hash)
268    }
269
270    /// Returns all edges connected to a node
271    pub fn get_edges(&self, hash: &Hash) -> Option<HashSet<Edge>> {
272        // Fast concurrent lookup
273        self.edges.get(hash).map(|edges| edges.clone())
274    }
275
276    /// Updates the state of a node
277    pub fn update_node_state(&self, hash: &Hash, new_state: crate::node::NodeState) -> Result<()> {
278        // Get node from storage
279        let mut node = self
280            .storage
281            .get(hash)
282            .ok_or_else(|| DagError::NodeNotFound(format!("{:?}", hash)))?;
283
284        // Update state
285        node.update_state(new_state)?;
286
287        // Store updated node
288        self.storage.insert(*hash, node)?;
289
290        Ok(())
291    }
292
293    /// Checks if adding an edge would create a cycle
294    #[allow(dead_code)]
295    fn would_create_cycle(&self, from: &Hash, to: &Hash, visited: &mut HashSet<Hash>) -> bool {
296        if from == to {
297            return true;
298        }
299
300        if !visited.insert(*from) {
301            return false;
302        }
303
304        if let Some(edges) = self.edges.get(from) {
305            for edge in edges.iter() {
306                let edge_to = edge.to();
307                if self.would_create_cycle(&edge_to, to, visited) {
308                    return true;
309                }
310            }
311        }
312
313        false
314    }
315
316    /// Triggers pruning of old vertices
317    pub fn prune(&self) -> Result<()> {
318        self.storage.prune_old_vertices()?;
319
320        // Update metrics
321        let mut metrics = self.metrics.write();
322        metrics.pruned_vertices += 1;
323        metrics.memory_usage_bytes = self.storage.memory_usage();
324
325        Ok(())
326    }
327
328    /// Gets current performance metrics
329    pub fn metrics(&self) -> GraphMetrics {
330        let metrics_guard = self.metrics.read();
331        GraphMetrics {
332            avg_vertex_time_ns: metrics_guard.avg_vertex_time_ns,
333            vertices_processed: metrics_guard.vertices_processed,
334            cache_hit_rate: self.storage.cache_hit_rate(),
335            memory_usage_bytes: self.storage.memory_usage(),
336            pruned_vertices: metrics_guard.pruned_vertices,
337        }
338    }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::NodeState;

    #[test]
    fn test_graph_basic_operations() {
        let graph = Graph::new();
        assert!(graph.is_empty());
        assert_eq!(graph.len(), 0);

        // Create root node
        let root = Node::new(vec![1], vec![]);
        let root_hash = root.hash();
        assert!(graph.add_node(root).is_ok());
        assert!(!graph.is_empty());
        assert_eq!(graph.len(), 1);

        // Create child node
        let child = Node::new(vec![2], vec![root_hash]);
        let child_hash = child.hash();
        assert!(graph.add_node(child).is_ok());
        assert_eq!(graph.len(), 2);

        // Verify edges
        let root_edges = graph.get_edges(&root_hash).unwrap();
        assert_eq!(root_edges.len(), 1);
        assert!(root_edges.iter().any(|e| e.to() == child_hash));
    }

    #[test]
    fn test_duplicate_node_rejected() {
        let graph = Graph::new();
        let node = Node::new(vec![1], vec![]);

        graph.add_node(node.clone()).unwrap();

        assert!(matches!(
            graph.add_node(node),
            Err(DagError::NodeExists(_))
        ));
        assert_eq!(graph.len(), 1);
    }

    #[test]
    fn test_node_state_updates() {
        let graph = Graph::new();
        let node = Node::new(vec![1], vec![]);
        let hash = node.hash();

        graph.add_node(node).unwrap();

        // Valid transition
        assert!(graph.update_node_state(&hash, NodeState::Verified).is_ok());

        let node = graph.get_node(&hash).unwrap();
        assert_eq!(node.state(), NodeState::Verified);

        // Invalid transition
        assert!(graph.update_node_state(&hash, NodeState::Pending).is_err());
    }

    #[test]
    fn test_cycle_prevention() {
        let graph = Graph::new();

        // Create nodes a -> b -> c
        let a = Node::new(vec![1], vec![]);
        let a_hash = a.hash();
        graph.add_node(a).unwrap();

        let b = Node::new(vec![2], vec![a_hash]);
        let b_hash = b.hash();
        graph.add_node(b).unwrap();

        let c = Node::new(vec![3], vec![b_hash]);
        let c_hash = c.hash();
        graph.add_node(c).unwrap();

        // A node with parents c and a is a valid join (edges c -> d and
        // a -> d), not a cycle.
        let join = Node::new(vec![4], vec![c_hash, a_hash]);
        assert!(graph.add_node(join).is_ok());

        // c is reachable from a, so adding an edge c -> a would close a cycle.
        assert!(graph.would_create_cycle(&a_hash, &c_hash, &mut HashSet::new()));
        // a is not reachable from c, so an edge a -> c would not.
        assert!(!graph.would_create_cycle(&c_hash, &a_hash, &mut HashSet::new()));
    }

    #[test]
    fn test_missing_parent() {
        let graph = Graph::new();
        let missing_hash = blake3::hash(b"missing");
        let node = Node::new(vec![1], vec![missing_hash]);

        assert!(matches!(
            graph.add_node(node),
            Err(DagError::MissingParent(_))
        ));
    }
}
422}