1use blake3::Hash;
2use dashmap::DashMap;
3use lru::LruCache;
4use parking_lot::RwLock;
5use rayon::prelude::*;
6use std::collections::{HashSet, VecDeque};
7use std::num::NonZeroUsize;
8use std::time::Instant;
9use tracing::info;
10
11use crate::{DagError, Edge, Node, Result};
12
/// Aggregate performance counters for a [`Graph`].
///
/// A snapshot is obtained through [`Graph::metrics`].
#[derive(Debug, Default, Clone, PartialEq)]
pub struct GraphMetrics {
    /// Running mean wall-clock duration of a successful `Graph::add_node` call, in nanoseconds.
    pub avg_vertex_time_ns: u64,
    /// Number of vertices successfully added through `Graph::add_node`.
    pub vertices_processed: u64,
    /// Fraction of storage lookups served from the LRU cache, in `[0.0, 1.0]`
    /// (`0.0` when no lookups have been recorded).
    pub cache_hit_rate: f64,
    /// Estimated memory held by stored vertices, in bytes (a rough per-vertex estimate).
    pub memory_usage_bytes: usize,
    /// Pruning counter maintained by `Graph::prune`.
    pub pruned_vertices: u64,
}
27
/// Sizing and pruning limits for a [`Graph`]'s vertex storage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageConfig {
    /// Vertex count at which an insert first triggers a pruning pass; also the
    /// initial capacity of the vertex map.
    pub max_vertices: usize,
    /// Initial capacity hint for the edge map (not enforced as a hard limit in
    /// the code visible here).
    pub max_edges: usize,
    /// Vertex count a pruning pass tries to shrink storage down to; pruning is a
    /// no-op while storage holds this many vertices or fewer.
    pub pruning_threshold: usize,
    /// Capacity, in nodes, of the LRU read cache (should be non-zero).
    pub cache_depth: usize,
}
40
41impl Default for StorageConfig {
42 fn default() -> Self {
43 Self {
44 max_vertices: 100_000,
45 max_edges: 500_000,
46 pruning_threshold: 10_000,
47 cache_depth: 1000,
48 }
49 }
50}
51
52struct VertexStorage {
54 vertices: DashMap<Hash, Node>,
56 cache: RwLock<LruCache<Hash, Node>>,
58 pruning_queue: RwLock<VecDeque<Hash>>,
60 config: StorageConfig,
62 cache_hits: std::sync::atomic::AtomicU64,
64 cache_misses: std::sync::atomic::AtomicU64,
65}
66
67impl VertexStorage {
68 fn new(config: StorageConfig) -> Self {
69 let cache_size = NonZeroUsize::new(config.cache_depth).unwrap();
70 Self {
71 vertices: DashMap::with_capacity(config.max_vertices),
72 cache: RwLock::new(LruCache::new(cache_size)),
73 pruning_queue: RwLock::new(VecDeque::new()),
74 config,
75 cache_hits: std::sync::atomic::AtomicU64::new(0),
76 cache_misses: std::sync::atomic::AtomicU64::new(0),
77 }
78 }
79
80 fn get(&self, hash: &Hash) -> Option<Node> {
81 if let Some(node) = self.cache.write().get(hash) {
83 self.cache_hits
84 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
85 return Some(node.clone());
86 }
87
88 if let Some(node) = self.vertices.get(hash) {
90 let node = node.clone();
91 self.cache.write().put(*hash, node.clone());
93 self.cache_misses
94 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
95 Some(node)
96 } else {
97 None
98 }
99 }
100
101 fn insert(&self, hash: Hash, node: Node) -> Result<()> {
102 if self.vertices.len() >= self.config.max_vertices {
104 self.prune_old_vertices()?;
105 }
106
107 self.vertices.insert(hash, node.clone());
109
110 self.cache.write().put(hash, node);
112
113 self.pruning_queue.write().push_back(hash);
115
116 Ok(())
117 }
118
119 fn prune_old_vertices(&self) -> Result<()> {
120 let mut pruning_queue = self.pruning_queue.write();
121 let target_size = self.config.pruning_threshold;
122 let current_size = self.vertices.len();
123
124 if current_size <= target_size {
125 return Ok(());
126 }
127
128 let to_remove = current_size - target_size;
129 let mut removed = 0;
130
131 while removed < to_remove && !pruning_queue.is_empty() {
132 if let Some(hash) = pruning_queue.pop_front() {
133 if let Some(node) = self.vertices.get(&hash) {
135 if matches!(node.state(), crate::NodeState::Final) {
136 self.vertices.remove(&hash);
137 removed += 1;
138 }
139 }
140 }
141 }
142
143 info!("Pruned {} vertices from storage", removed);
144 Ok(())
145 }
146
147 fn cache_hit_rate(&self) -> f64 {
148 let hits = self.cache_hits.load(std::sync::atomic::Ordering::Relaxed);
149 let misses = self.cache_misses.load(std::sync::atomic::Ordering::Relaxed);
150 let total = hits + misses;
151 if total == 0 {
152 0.0
153 } else {
154 hits as f64 / total as f64
155 }
156 }
157
158 fn memory_usage(&self) -> usize {
159 self.vertices.len() * 256
161 }
162}
163
164pub struct Graph {
166 storage: VertexStorage,
168 edges: DashMap<Hash, HashSet<Edge>>,
170 metrics: RwLock<GraphMetrics>,
172}
173
174impl Default for Graph {
175 fn default() -> Self {
176 Self::new()
177 }
178}
179
180impl Graph {
181 pub fn new() -> Self {
183 Self::with_config(StorageConfig::default())
184 }
185
186 pub fn with_capacity(capacity: usize) -> Self {
188 let config = StorageConfig {
189 max_vertices: capacity,
190 max_edges: capacity * 5,
191 ..StorageConfig::default()
192 };
193 Self::with_config(config)
194 }
195
196 pub fn with_config(config: StorageConfig) -> Self {
198 Self {
199 storage: VertexStorage::new(config.clone()),
200 edges: DashMap::with_capacity(config.max_edges),
201 metrics: RwLock::new(GraphMetrics::default()),
202 }
203 }
204
205 pub fn is_empty(&self) -> bool {
207 self.storage.vertices.is_empty()
208 }
209
210 pub fn len(&self) -> usize {
212 self.storage.vertices.len()
213 }
214
215 pub fn add_node(&self, node: Node) -> Result<()> {
217 let start = Instant::now();
218 let node_hash = node.hash();
219
220 if self.storage.get(&node_hash).is_some() {
222 return Err(DagError::NodeExists(format!("{:?}", node_hash)));
223 }
224
225 let parents = node.parents();
227 let missing_parent = parents
228 .par_iter()
229 .find_first(|parent| self.storage.get(parent).is_none());
230
231 if let Some(parent) = missing_parent {
232 return Err(DagError::MissingParent(format!("{:?}", parent)));
233 }
234
235 self.storage.insert(node_hash, node)?;
237
238 self.edges.entry(node_hash).or_default();
240
241 if let Some(node) = self.storage.get(&node_hash) {
243 let parents = node.parents();
244 parents.par_iter().for_each(|parent| {
245 let edge = Edge::new(*parent, node_hash);
246 if let Some(mut parent_edges) = self.edges.get_mut(parent) {
247 parent_edges.insert(edge);
248 }
249 });
250 }
251
252 let elapsed = start.elapsed().as_nanos() as u64;
254 let mut metrics = self.metrics.write();
255 metrics.vertices_processed += 1;
256 metrics.avg_vertex_time_ns =
257 (metrics.avg_vertex_time_ns * (metrics.vertices_processed - 1) + elapsed)
258 / metrics.vertices_processed;
259 metrics.cache_hit_rate = self.storage.cache_hit_rate();
260 metrics.memory_usage_bytes = self.storage.memory_usage();
261
262 Ok(())
263 }
264
265 pub fn get_node(&self, hash: &Hash) -> Option<Node> {
267 self.storage.get(hash)
268 }
269
270 pub fn get_edges(&self, hash: &Hash) -> Option<HashSet<Edge>> {
272 self.edges.get(hash).map(|edges| edges.clone())
274 }
275
276 pub fn update_node_state(&self, hash: &Hash, new_state: crate::node::NodeState) -> Result<()> {
278 let mut node = self
280 .storage
281 .get(hash)
282 .ok_or_else(|| DagError::NodeNotFound(format!("{:?}", hash)))?;
283
284 node.update_state(new_state)?;
286
287 self.storage.insert(*hash, node)?;
289
290 Ok(())
291 }
292
293 #[allow(dead_code)]
295 fn would_create_cycle(&self, from: &Hash, to: &Hash, visited: &mut HashSet<Hash>) -> bool {
296 if from == to {
297 return true;
298 }
299
300 if !visited.insert(*from) {
301 return false;
302 }
303
304 if let Some(edges) = self.edges.get(from) {
305 for edge in edges.iter() {
306 let edge_to = edge.to();
307 if self.would_create_cycle(&edge_to, to, visited) {
308 return true;
309 }
310 }
311 }
312
313 false
314 }
315
316 pub fn prune(&self) -> Result<()> {
318 self.storage.prune_old_vertices()?;
319
320 let mut metrics = self.metrics.write();
322 metrics.pruned_vertices += 1;
323 metrics.memory_usage_bytes = self.storage.memory_usage();
324
325 Ok(())
326 }
327
328 pub fn metrics(&self) -> GraphMetrics {
330 let metrics_guard = self.metrics.read();
331 GraphMetrics {
332 avg_vertex_time_ns: metrics_guard.avg_vertex_time_ns,
333 vertices_processed: metrics_guard.vertices_processed,
334 cache_hit_rate: self.storage.cache_hit_rate(),
335 memory_usage_bytes: self.storage.memory_usage(),
336 pruned_vertices: metrics_guard.pruned_vertices,
337 }
338 }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::NodeState;

    /// Adding a root and a child updates length and links root → child.
    #[test]
    fn test_graph_basic_operations() {
        let graph = Graph::new();
        assert!(graph.is_empty());
        assert_eq!(graph.len(), 0);

        let root = Node::new(vec![1], vec![]);
        let root_hash = root.hash();
        assert!(graph.add_node(root).is_ok());
        assert!(!graph.is_empty());
        assert_eq!(graph.len(), 1);

        let child = Node::new(vec![2], vec![root_hash]);
        let child_hash = child.hash();
        assert!(graph.add_node(child).is_ok());
        assert_eq!(graph.len(), 2);

        let root_edges = graph.get_edges(&root_hash).unwrap();
        assert_eq!(root_edges.len(), 1);
        assert!(root_edges.iter().any(|e| e.to() == child_hash));
    }

    /// State updates persist, and a backwards transition is rejected.
    #[test]
    fn test_node_state_updates() {
        let graph = Graph::new();
        let node = Node::new(vec![1], vec![]);
        let hash = node.hash();

        graph.add_node(node).unwrap();

        assert!(graph.update_node_state(&hash, NodeState::Verified).is_ok());

        let node = graph.get_node(&hash).unwrap();
        assert_eq!(node.state(), NodeState::Verified);

        assert!(graph.update_node_state(&hash, NodeState::Pending).is_err());
    }

    /// A node whose parents include both a direct and a transitive ancestor is
    /// accepted. Edges only run parent → child, so no cycle forms, and each
    /// parent gains an edge to the new node.
    #[test]
    fn test_cycle_prevention() {
        let graph = Graph::new();

        let a = Node::new(vec![1], vec![]);
        let a_hash = a.hash();
        graph.add_node(a).unwrap();

        let b = Node::new(vec![2], vec![a_hash]);
        let b_hash = b.hash();
        graph.add_node(b).unwrap();

        let c = Node::new(vec![3], vec![b_hash]);
        let c_hash = c.hash();
        graph.add_node(c).unwrap();

        let cycle_node = Node::new(vec![4], vec![c_hash, a_hash]);
        let d_hash = cycle_node.hash();
        assert!(graph.add_node(cycle_node).is_ok());

        assert!(graph
            .get_edges(&a_hash)
            .unwrap()
            .iter()
            .any(|e| e.to() == d_hash));
        assert!(graph
            .get_edges(&c_hash)
            .unwrap()
            .iter()
            .any(|e| e.to() == d_hash));
        assert!(graph.get_edges(&d_hash).unwrap().is_empty());
    }

    /// A node referencing an unknown parent is rejected.
    #[test]
    fn test_missing_parent() {
        let graph = Graph::new();
        let missing_hash = blake3::hash(b"missing");
        let node = Node::new(vec![1], vec![missing_hash]);

        assert!(matches!(
            graph.add_node(node),
            Err(DagError::MissingParent(_))
        ));
        assert!(graph.is_empty());
    }

    /// Re-adding an already stored node is rejected and leaves the graph unchanged.
    #[test]
    fn test_duplicate_node_rejected() {
        let graph = Graph::new();
        let node = Node::new(vec![1], vec![]);

        graph.add_node(node.clone()).unwrap();
        assert!(matches!(
            graph.add_node(node),
            Err(DagError::NodeExists(_))
        ));
        assert_eq!(graph.len(), 1);
    }

    /// Lookups for unknown hashes return `None`.
    #[test]
    fn test_unknown_lookups() {
        let graph = Graph::new();
        let unknown = blake3::hash(b"unknown");
        assert!(graph.get_node(&unknown).is_none());
        assert!(graph.get_edges(&unknown).is_none());
    }

    /// Every successful `add_node` is reflected in the metrics snapshot.
    #[test]
    fn test_metrics_count_processed_vertices() {
        let graph = Graph::new();
        let root = Node::new(vec![1], vec![]);
        let root_hash = root.hash();
        graph.add_node(root).unwrap();
        graph.add_node(Node::new(vec![2], vec![root_hash])).unwrap();

        let metrics = graph.metrics();
        assert_eq!(metrics.vertices_processed, 2);
        assert!(metrics.memory_usage_bytes > 0);
        assert!((0.0..=1.0).contains(&metrics.cache_hit_rate));
    }
}