ruvector_cluster/
consensus.rs

1//! DAG-based consensus protocol inspired by QuDAG
2//!
3//! Implements a directed acyclic graph for transaction ordering and consensus.
4
5use chrono::{DateTime, Utc};
6use dashmap::DashMap;
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::sync::Arc;
11use tracing::{debug, info, warn};
12use uuid::Uuid;
13
14use crate::{ClusterError, Result};
15
/// A vertex in the consensus DAG.
///
/// A vertex carries a single [`Transaction`], the IDs of the vertices it was
/// built on, and a vector clock supplied by whoever constructed it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DagVertex {
    /// Unique vertex ID (`DagVertex::new` assigns a random UUID v4 string)
    pub id: String,
    /// Node that created this vertex
    pub node_id: String,
    /// Transaction data
    pub transaction: Transaction,
    /// Parent vertices (edges in the DAG), referenced by vertex ID
    pub parents: Vec<String>,
    /// Timestamp when vertex was created (`DagVertex::new` uses `Utc::now()`)
    pub timestamp: DateTime<Utc>,
    /// Vector clock for causality tracking (node ID -> counter)
    pub vector_clock: HashMap<String, u64>,
    /// Signature (in production, this would be cryptographic);
    /// `DagVertex::new` currently leaves it empty
    pub signature: String,
}
34
35impl DagVertex {
36    /// Create a new DAG vertex
37    pub fn new(
38        node_id: String,
39        transaction: Transaction,
40        parents: Vec<String>,
41        vector_clock: HashMap<String, u64>,
42    ) -> Self {
43        Self {
44            id: Uuid::new_v4().to_string(),
45            node_id,
46            transaction,
47            parents,
48            timestamp: Utc::now(),
49            vector_clock,
50            signature: String::new(), // Would be computed cryptographically
51        }
52    }
53
54    /// Verify the vertex signature
55    pub fn verify_signature(&self) -> bool {
56        // In production, verify cryptographic signature
57        true
58    }
59}
60
/// A transaction in the consensus system.
///
/// Transactions are created by `DagConsensus::submit_transaction` and later
/// wrapped in a `DagVertex` by `DagConsensus::create_vertex`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Transaction {
    /// Transaction ID (`submit_transaction` assigns a random UUID v4 string)
    pub id: String,
    /// Transaction type
    pub tx_type: TransactionType,
    /// Transaction data (opaque bytes; not interpreted anywhere in this file)
    pub data: Vec<u8>,
    /// Nonce for ordering; `submit_transaction` takes it from a per-engine
    /// counter that is incremented before use, so the first value is 1
    pub nonce: u64,
}
73
/// Type of transaction.
///
/// Within this file only `Write` and `Delete` influence behaviour: they are
/// the variants `DagConsensus::detect_conflicts` treats as conflicting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TransactionType {
    /// Write operation
    Write,
    /// Read operation
    Read,
    /// Delete operation
    Delete,
    /// Batch operation
    Batch,
    /// System operation
    System,
}
88
/// DAG-based consensus engine.
///
/// All mutable state sits behind `Arc`-wrapped concurrent containers, so the
/// public methods only need `&self`.
pub struct DagConsensus {
    /// Node ID (used as the author of locally created vertices)
    node_id: String,
    /// DAG vertices (vertex_id -> vertex)
    vertices: Arc<DashMap<String, DagVertex>>,
    /// Finalized vertices (IDs only; entries are never removed in this file,
    /// not even when the vertex itself is pruned)
    finalized: Arc<RwLock<HashSet<String>>>,
    /// Vector clock for this node (node ID -> highest counter seen)
    vector_clock: Arc<RwLock<HashMap<String, u64>>>,
    /// Pending transactions (FIFO; `create_vertex` consumes from the front)
    pending_txs: Arc<RwLock<VecDeque<Transaction>>>,
    /// Minimum quorum size: number of distinct confirming nodes a vertex
    /// needs before `finalize_vertices` finalizes it
    min_quorum_size: usize,
    /// Transaction nonce counter (incremented once per submitted transaction)
    nonce_counter: Arc<RwLock<u64>>,
}
106
107impl DagConsensus {
108    /// Create a new DAG consensus engine
109    pub fn new(node_id: String, min_quorum_size: usize) -> Self {
110        let mut vector_clock = HashMap::new();
111        vector_clock.insert(node_id.clone(), 0);
112
113        Self {
114            node_id,
115            vertices: Arc::new(DashMap::new()),
116            finalized: Arc::new(RwLock::new(HashSet::new())),
117            vector_clock: Arc::new(RwLock::new(vector_clock)),
118            pending_txs: Arc::new(RwLock::new(VecDeque::new())),
119            min_quorum_size,
120            nonce_counter: Arc::new(RwLock::new(0)),
121        }
122    }
123
124    /// Submit a transaction to the consensus system
125    pub fn submit_transaction(&self, tx_type: TransactionType, data: Vec<u8>) -> Result<String> {
126        let mut nonce = self.nonce_counter.write();
127        *nonce += 1;
128
129        let transaction = Transaction {
130            id: Uuid::new_v4().to_string(),
131            tx_type,
132            data,
133            nonce: *nonce,
134        };
135
136        let tx_id = transaction.id.clone();
137
138        let mut pending = self.pending_txs.write();
139        pending.push_back(transaction);
140
141        debug!("Transaction {} submitted to consensus", tx_id);
142        Ok(tx_id)
143    }
144
145    /// Create a new vertex for pending transactions
146    pub fn create_vertex(&self) -> Result<Option<DagVertex>> {
147        let mut pending = self.pending_txs.write();
148
149        if pending.is_empty() {
150            return Ok(None);
151        }
152
153        // Take the next transaction
154        let transaction = pending.pop_front().unwrap();
155
156        // Find parent vertices (tips of the DAG)
157        let parents = self.find_tips();
158
159        // Update vector clock
160        let mut clock = self.vector_clock.write();
161        let count = clock.entry(self.node_id.clone()).or_insert(0);
162        *count += 1;
163
164        let vertex = DagVertex::new(self.node_id.clone(), transaction, parents, clock.clone());
165
166        let vertex_id = vertex.id.clone();
167        self.vertices.insert(vertex_id.clone(), vertex.clone());
168
169        debug!(
170            "Created vertex {} for transaction {}",
171            vertex_id, vertex.transaction.id
172        );
173        Ok(Some(vertex))
174    }
175
176    /// Find tip vertices (vertices with no children)
177    fn find_tips(&self) -> Vec<String> {
178        let mut has_children = HashSet::new();
179
180        // Mark all vertices that have children
181        for entry in self.vertices.iter() {
182            for parent in &entry.value().parents {
183                has_children.insert(parent.clone());
184            }
185        }
186
187        // Find vertices without children
188        self.vertices
189            .iter()
190            .filter(|entry| !has_children.contains(entry.key()))
191            .map(|entry| entry.key().clone())
192            .collect()
193    }
194
195    /// Add a vertex from another node
196    pub fn add_vertex(&self, vertex: DagVertex) -> Result<()> {
197        // Verify signature
198        if !vertex.verify_signature() {
199            return Err(ClusterError::ConsensusError(
200                "Invalid vertex signature".to_string(),
201            ));
202        }
203
204        // Verify parents exist
205        for parent_id in &vertex.parents {
206            if !self.vertices.contains_key(parent_id) && !self.is_finalized(parent_id) {
207                return Err(ClusterError::ConsensusError(format!(
208                    "Parent vertex {} not found",
209                    parent_id
210                )));
211            }
212        }
213
214        // Merge vector clock
215        let mut clock = self.vector_clock.write();
216        for (node, count) in &vertex.vector_clock {
217            let existing = clock.entry(node.clone()).or_insert(0);
218            *existing = (*existing).max(*count);
219        }
220
221        self.vertices.insert(vertex.id.clone(), vertex);
222        Ok(())
223    }
224
225    /// Check if a vertex is finalized
226    pub fn is_finalized(&self, vertex_id: &str) -> bool {
227        let finalized = self.finalized.read();
228        finalized.contains(vertex_id)
229    }
230
231    /// Finalize vertices using the wave algorithm
232    pub fn finalize_vertices(&self) -> Result<Vec<String>> {
233        let mut finalized_ids = Vec::new();
234
235        // Find vertices that can be finalized
236        // A vertex is finalized if it has enough confirmations from different nodes
237        let mut confirmations: HashMap<String, HashSet<String>> = HashMap::new();
238
239        for entry in self.vertices.iter() {
240            let vertex = entry.value();
241
242            // Count confirmations (vertices that reference this one)
243            for other_entry in self.vertices.iter() {
244                if other_entry.value().parents.contains(&vertex.id) {
245                    confirmations
246                        .entry(vertex.id.clone())
247                        .or_insert_with(HashSet::new)
248                        .insert(other_entry.value().node_id.clone());
249                }
250            }
251        }
252
253        // Finalize vertices with enough confirmations
254        let mut finalized = self.finalized.write();
255
256        for (vertex_id, confirming_nodes) in confirmations {
257            if confirming_nodes.len() >= self.min_quorum_size && !finalized.contains(&vertex_id) {
258                finalized.insert(vertex_id.clone());
259                finalized_ids.push(vertex_id.clone());
260                info!("Finalized vertex {}", vertex_id);
261            }
262        }
263
264        Ok(finalized_ids)
265    }
266
267    /// Get the total order of finalized transactions
268    pub fn get_finalized_order(&self) -> Vec<Transaction> {
269        let finalized = self.finalized.read();
270        let mut ordered_txs = Vec::new();
271
272        // Topological sort of finalized vertices
273        let finalized_vertices: Vec<_> = self
274            .vertices
275            .iter()
276            .filter(|entry| finalized.contains(entry.key()))
277            .map(|entry| entry.value().clone())
278            .collect();
279
280        // Sort by vector clock and timestamp
281        let mut sorted = finalized_vertices;
282        sorted.sort_by(|a, b| {
283            // First by vector clock dominance
284            let a_dominates = Self::vector_clock_dominates(&a.vector_clock, &b.vector_clock);
285            let b_dominates = Self::vector_clock_dominates(&b.vector_clock, &a.vector_clock);
286
287            if a_dominates && !b_dominates {
288                std::cmp::Ordering::Less
289            } else if b_dominates && !a_dominates {
290                std::cmp::Ordering::Greater
291            } else {
292                // Fall back to timestamp
293                a.timestamp.cmp(&b.timestamp)
294            }
295        });
296
297        for vertex in sorted {
298            ordered_txs.push(vertex.transaction);
299        }
300
301        ordered_txs
302    }
303
304    /// Check if vector clock a dominates vector clock b
305    fn vector_clock_dominates(a: &HashMap<String, u64>, b: &HashMap<String, u64>) -> bool {
306        let mut dominates = false;
307
308        for (node, &a_count) in a {
309            let b_count = b.get(node).copied().unwrap_or(0);
310            if a_count < b_count {
311                return false;
312            }
313            if a_count > b_count {
314                dominates = true;
315            }
316        }
317
318        dominates
319    }
320
321    /// Detect conflicts between transactions
322    pub fn detect_conflicts(&self, tx1: &Transaction, tx2: &Transaction) -> bool {
323        // In a real implementation, this would analyze transaction data
324        // For now, conservatively assume all writes conflict
325        matches!(
326            (&tx1.tx_type, &tx2.tx_type),
327            (TransactionType::Write, TransactionType::Write)
328                | (TransactionType::Delete, TransactionType::Write)
329                | (TransactionType::Write, TransactionType::Delete)
330        )
331    }
332
333    /// Get consensus statistics
334    pub fn get_stats(&self) -> ConsensusStats {
335        let finalized = self.finalized.read();
336        let pending = self.pending_txs.read();
337
338        ConsensusStats {
339            total_vertices: self.vertices.len(),
340            finalized_vertices: finalized.len(),
341            pending_transactions: pending.len(),
342            tips: self.find_tips().len(),
343        }
344    }
345
346    /// Prune old finalized vertices to save memory
347    pub fn prune_old_vertices(&self, keep_count: usize) {
348        let finalized = self.finalized.read();
349
350        if finalized.len() <= keep_count {
351            return;
352        }
353
354        // Remove oldest finalized vertices
355        let mut vertices_to_remove = Vec::new();
356
357        for vertex_id in finalized.iter() {
358            if let Some(vertex) = self.vertices.get(vertex_id) {
359                vertices_to_remove.push((vertex_id.clone(), vertex.timestamp));
360            }
361        }
362
363        vertices_to_remove.sort_by_key(|(_, ts)| *ts);
364
365        let to_remove = vertices_to_remove.len().saturating_sub(keep_count);
366        for (vertex_id, _) in vertices_to_remove.iter().take(to_remove) {
367            self.vertices.remove(vertex_id);
368        }
369
370        debug!("Pruned {} old vertices", to_remove);
371    }
372}
373
/// Consensus statistics, as reported by `DagConsensus::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsensusStats {
    /// Number of vertices currently stored in the DAG
    pub total_vertices: usize,
    /// Number of IDs in the finalized set
    pub finalized_vertices: usize,
    /// Number of transactions waiting in the pending queue
    pub pending_transactions: usize,
    /// Number of stored vertices that no other stored vertex lists as a parent
    pub tips: usize,
}
382
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly built engine holds no vertices and no pending transactions.
    #[test]
    fn test_consensus_creation() {
        let consensus = DagConsensus::new("node1".to_string(), 2);
        let stats = consensus.get_stats();

        assert_eq!(stats.total_vertices, 0);
        assert_eq!(stats.pending_transactions, 0);
    }

    // Submitting returns a non-empty ID and queues exactly one transaction.
    #[test]
    fn test_submit_transaction() {
        let consensus = DagConsensus::new("node1".to_string(), 2);

        let tx_id = consensus
            .submit_transaction(TransactionType::Write, vec![1, 2, 3])
            .unwrap();

        assert!(!tx_id.is_empty());

        let stats = consensus.get_stats();
        assert_eq!(stats.pending_transactions, 1);
    }

    // Creating a vertex moves the transaction from the queue into the DAG.
    #[test]
    fn test_create_vertex() {
        let consensus = DagConsensus::new("node1".to_string(), 2);

        consensus
            .submit_transaction(TransactionType::Write, vec![1, 2, 3])
            .unwrap();

        let vertex = consensus.create_vertex().unwrap();
        assert!(vertex.is_some());

        let stats = consensus.get_stats();
        assert_eq!(stats.total_vertices, 1);
        assert_eq!(stats.pending_transactions, 0);
    }

    // A clock that is ahead on one node and equal elsewhere dominates;
    // the reverse direction does not.
    #[test]
    fn test_vector_clock_dominance() {
        let mut clock1 = HashMap::new();
        clock1.insert("node1".to_string(), 2);
        clock1.insert("node2".to_string(), 1);

        let mut clock2 = HashMap::new();
        clock2.insert("node1".to_string(), 1);
        clock2.insert("node2".to_string(), 1);

        assert!(DagConsensus::vector_clock_dominates(&clock1, &clock2));
        assert!(!DagConsensus::vector_clock_dominates(&clock2, &clock1));
    }

    // Two writes are reported as conflicting regardless of their payload.
    #[test]
    fn test_conflict_detection() {
        let consensus = DagConsensus::new("node1".to_string(), 2);

        let tx1 = Transaction {
            id: "1".to_string(),
            tx_type: TransactionType::Write,
            data: vec![1],
            nonce: 1,
        };

        let tx2 = Transaction {
            id: "2".to_string(),
            tx_type: TransactionType::Write,
            data: vec![2],
            nonce: 2,
        };

        assert!(consensus.detect_conflicts(&tx1, &tx2));
    }

    // With a quorum of 2, vertices authored by a single node can gather at
    // most one confirming node each, so nothing is finalized.
    #[test]
    fn test_finalization() {
        let consensus = DagConsensus::new("node1".to_string(), 2);

        // Create some vertices
        for i in 0..5 {
            consensus
                .submit_transaction(TransactionType::Write, vec![i])
                .unwrap();
            consensus.create_vertex().unwrap();
        }

        // Try to finalize
        let finalized = consensus.finalize_vertices().unwrap();

        // Without enough confirmations, nothing should be finalized yet
        // (would need vertices from other nodes)
        assert_eq!(finalized.len(), 0);
    }
}
480}