1use chrono::{DateTime, Utc};
6use dashmap::DashMap;
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::sync::Arc;
11use tracing::{debug, info, warn};
12use uuid::Uuid;
13
14use crate::{ClusterError, Result};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct DagVertex {
19 pub id: String,
21 pub node_id: String,
23 pub transaction: Transaction,
25 pub parents: Vec<String>,
27 pub timestamp: DateTime<Utc>,
29 pub vector_clock: HashMap<String, u64>,
31 pub signature: String,
33}
34
35impl DagVertex {
36 pub fn new(
38 node_id: String,
39 transaction: Transaction,
40 parents: Vec<String>,
41 vector_clock: HashMap<String, u64>,
42 ) -> Self {
43 Self {
44 id: Uuid::new_v4().to_string(),
45 node_id,
46 transaction,
47 parents,
48 timestamp: Utc::now(),
49 vector_clock,
50 signature: String::new(), }
52 }
53
54 pub fn verify_signature(&self) -> bool {
56 true
58 }
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct Transaction {
64 pub id: String,
66 pub tx_type: TransactionType,
68 pub data: Vec<u8>,
70 pub nonce: u64,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
76pub enum TransactionType {
77 Write,
79 Read,
81 Delete,
83 Batch,
85 System,
87}
88
89pub struct DagConsensus {
91 node_id: String,
93 vertices: Arc<DashMap<String, DagVertex>>,
95 finalized: Arc<RwLock<HashSet<String>>>,
97 vector_clock: Arc<RwLock<HashMap<String, u64>>>,
99 pending_txs: Arc<RwLock<VecDeque<Transaction>>>,
101 min_quorum_size: usize,
103 nonce_counter: Arc<RwLock<u64>>,
105}
106
107impl DagConsensus {
108 pub fn new(node_id: String, min_quorum_size: usize) -> Self {
110 let mut vector_clock = HashMap::new();
111 vector_clock.insert(node_id.clone(), 0);
112
113 Self {
114 node_id,
115 vertices: Arc::new(DashMap::new()),
116 finalized: Arc::new(RwLock::new(HashSet::new())),
117 vector_clock: Arc::new(RwLock::new(vector_clock)),
118 pending_txs: Arc::new(RwLock::new(VecDeque::new())),
119 min_quorum_size,
120 nonce_counter: Arc::new(RwLock::new(0)),
121 }
122 }
123
124 pub fn submit_transaction(&self, tx_type: TransactionType, data: Vec<u8>) -> Result<String> {
126 let mut nonce = self.nonce_counter.write();
127 *nonce += 1;
128
129 let transaction = Transaction {
130 id: Uuid::new_v4().to_string(),
131 tx_type,
132 data,
133 nonce: *nonce,
134 };
135
136 let tx_id = transaction.id.clone();
137
138 let mut pending = self.pending_txs.write();
139 pending.push_back(transaction);
140
141 debug!("Transaction {} submitted to consensus", tx_id);
142 Ok(tx_id)
143 }
144
145 pub fn create_vertex(&self) -> Result<Option<DagVertex>> {
147 let mut pending = self.pending_txs.write();
148
149 if pending.is_empty() {
150 return Ok(None);
151 }
152
153 let transaction = pending.pop_front().unwrap();
155
156 let parents = self.find_tips();
158
159 let mut clock = self.vector_clock.write();
161 let count = clock.entry(self.node_id.clone()).or_insert(0);
162 *count += 1;
163
164 let vertex = DagVertex::new(self.node_id.clone(), transaction, parents, clock.clone());
165
166 let vertex_id = vertex.id.clone();
167 self.vertices.insert(vertex_id.clone(), vertex.clone());
168
169 debug!(
170 "Created vertex {} for transaction {}",
171 vertex_id, vertex.transaction.id
172 );
173 Ok(Some(vertex))
174 }
175
176 fn find_tips(&self) -> Vec<String> {
178 let mut has_children = HashSet::new();
179
180 for entry in self.vertices.iter() {
182 for parent in &entry.value().parents {
183 has_children.insert(parent.clone());
184 }
185 }
186
187 self.vertices
189 .iter()
190 .filter(|entry| !has_children.contains(entry.key()))
191 .map(|entry| entry.key().clone())
192 .collect()
193 }
194
195 pub fn add_vertex(&self, vertex: DagVertex) -> Result<()> {
197 if !vertex.verify_signature() {
199 return Err(ClusterError::ConsensusError(
200 "Invalid vertex signature".to_string(),
201 ));
202 }
203
204 for parent_id in &vertex.parents {
206 if !self.vertices.contains_key(parent_id) && !self.is_finalized(parent_id) {
207 return Err(ClusterError::ConsensusError(format!(
208 "Parent vertex {} not found",
209 parent_id
210 )));
211 }
212 }
213
214 let mut clock = self.vector_clock.write();
216 for (node, count) in &vertex.vector_clock {
217 let existing = clock.entry(node.clone()).or_insert(0);
218 *existing = (*existing).max(*count);
219 }
220
221 self.vertices.insert(vertex.id.clone(), vertex);
222 Ok(())
223 }
224
225 pub fn is_finalized(&self, vertex_id: &str) -> bool {
227 let finalized = self.finalized.read();
228 finalized.contains(vertex_id)
229 }
230
231 pub fn finalize_vertices(&self) -> Result<Vec<String>> {
233 let mut finalized_ids = Vec::new();
234
235 let mut confirmations: HashMap<String, HashSet<String>> = HashMap::new();
238
239 for entry in self.vertices.iter() {
240 let vertex = entry.value();
241
242 for other_entry in self.vertices.iter() {
244 if other_entry.value().parents.contains(&vertex.id) {
245 confirmations
246 .entry(vertex.id.clone())
247 .or_insert_with(HashSet::new)
248 .insert(other_entry.value().node_id.clone());
249 }
250 }
251 }
252
253 let mut finalized = self.finalized.write();
255
256 for (vertex_id, confirming_nodes) in confirmations {
257 if confirming_nodes.len() >= self.min_quorum_size && !finalized.contains(&vertex_id) {
258 finalized.insert(vertex_id.clone());
259 finalized_ids.push(vertex_id.clone());
260 info!("Finalized vertex {}", vertex_id);
261 }
262 }
263
264 Ok(finalized_ids)
265 }
266
267 pub fn get_finalized_order(&self) -> Vec<Transaction> {
269 let finalized = self.finalized.read();
270 let mut ordered_txs = Vec::new();
271
272 let finalized_vertices: Vec<_> = self
274 .vertices
275 .iter()
276 .filter(|entry| finalized.contains(entry.key()))
277 .map(|entry| entry.value().clone())
278 .collect();
279
280 let mut sorted = finalized_vertices;
282 sorted.sort_by(|a, b| {
283 let a_dominates = Self::vector_clock_dominates(&a.vector_clock, &b.vector_clock);
285 let b_dominates = Self::vector_clock_dominates(&b.vector_clock, &a.vector_clock);
286
287 if a_dominates && !b_dominates {
288 std::cmp::Ordering::Less
289 } else if b_dominates && !a_dominates {
290 std::cmp::Ordering::Greater
291 } else {
292 a.timestamp.cmp(&b.timestamp)
294 }
295 });
296
297 for vertex in sorted {
298 ordered_txs.push(vertex.transaction);
299 }
300
301 ordered_txs
302 }
303
304 fn vector_clock_dominates(a: &HashMap<String, u64>, b: &HashMap<String, u64>) -> bool {
306 let mut dominates = false;
307
308 for (node, &a_count) in a {
309 let b_count = b.get(node).copied().unwrap_or(0);
310 if a_count < b_count {
311 return false;
312 }
313 if a_count > b_count {
314 dominates = true;
315 }
316 }
317
318 dominates
319 }
320
321 pub fn detect_conflicts(&self, tx1: &Transaction, tx2: &Transaction) -> bool {
323 matches!(
326 (&tx1.tx_type, &tx2.tx_type),
327 (TransactionType::Write, TransactionType::Write)
328 | (TransactionType::Delete, TransactionType::Write)
329 | (TransactionType::Write, TransactionType::Delete)
330 )
331 }
332
333 pub fn get_stats(&self) -> ConsensusStats {
335 let finalized = self.finalized.read();
336 let pending = self.pending_txs.read();
337
338 ConsensusStats {
339 total_vertices: self.vertices.len(),
340 finalized_vertices: finalized.len(),
341 pending_transactions: pending.len(),
342 tips: self.find_tips().len(),
343 }
344 }
345
346 pub fn prune_old_vertices(&self, keep_count: usize) {
348 let finalized = self.finalized.read();
349
350 if finalized.len() <= keep_count {
351 return;
352 }
353
354 let mut vertices_to_remove = Vec::new();
356
357 for vertex_id in finalized.iter() {
358 if let Some(vertex) = self.vertices.get(vertex_id) {
359 vertices_to_remove.push((vertex_id.clone(), vertex.timestamp));
360 }
361 }
362
363 vertices_to_remove.sort_by_key(|(_, ts)| *ts);
364
365 let to_remove = vertices_to_remove.len().saturating_sub(keep_count);
366 for (vertex_id, _) in vertices_to_remove.iter().take(to_remove) {
367 self.vertices.remove(vertex_id);
368 }
369
370 debug!("Pruned {} old vertices", to_remove);
371 }
372}
373
374#[derive(Debug, Clone, Serialize, Deserialize)]
376pub struct ConsensusStats {
377 pub total_vertices: usize,
378 pub finalized_vertices: usize,
379 pub pending_transactions: usize,
380 pub tips: usize,
381}
382
#[cfg(test)]
mod tests {
    use super::*;

    /// Consensus state for "node1" with a quorum of two, shared by the tests.
    fn engine() -> DagConsensus {
        DagConsensus::new("node1".to_string(), 2)
    }

    /// Builds a write transaction with a single payload byte.
    fn write_tx(id: &str, byte: u8, nonce: u64) -> Transaction {
        Transaction {
            id: id.to_string(),
            tx_type: TransactionType::Write,
            data: vec![byte],
            nonce,
        }
    }

    /// Builds a vector clock from `(node, counter)` pairs.
    fn clock_of(entries: &[(&str, u64)]) -> HashMap<String, u64> {
        entries
            .iter()
            .map(|(node, count)| (node.to_string(), *count))
            .collect()
    }

    // A fresh engine holds no vertices and no pending transactions.
    #[test]
    fn test_consensus_creation() {
        let stats = engine().get_stats();

        assert_eq!(stats.total_vertices, 0);
        assert_eq!(stats.pending_transactions, 0);
    }

    // Submitting returns a non-empty id and queues exactly one transaction.
    #[test]
    fn test_submit_transaction() {
        let dag = engine();

        let tx_id = dag
            .submit_transaction(TransactionType::Write, vec![1, 2, 3])
            .unwrap();
        assert!(!tx_id.is_empty());

        assert_eq!(dag.get_stats().pending_transactions, 1);
    }

    // Creating a vertex moves the transaction from the queue into the DAG.
    #[test]
    fn test_create_vertex() {
        let dag = engine();
        dag.submit_transaction(TransactionType::Write, vec![1, 2, 3])
            .unwrap();

        let created = dag.create_vertex().unwrap();
        assert!(created.is_some());

        let stats = dag.get_stats();
        assert_eq!(stats.total_vertices, 1);
        assert_eq!(stats.pending_transactions, 0);
    }

    // A clock that is ahead on one node and equal elsewhere dominates.
    #[test]
    fn test_vector_clock_dominance() {
        let ahead = clock_of(&[("node1", 2), ("node2", 1)]);
        let behind = clock_of(&[("node1", 1), ("node2", 1)]);

        assert!(DagConsensus::vector_clock_dominates(&ahead, &behind));
        assert!(!DagConsensus::vector_clock_dominates(&behind, &ahead));
    }

    // Two writes conflict.
    #[test]
    fn test_conflict_detection() {
        let dag = engine();

        let first = write_tx("1", 1, 1);
        let second = write_tx("2", 2, 2);

        assert!(dag.detect_conflicts(&first, &second));
    }

    // Vertices confirmed by a single node never reach a quorum of two.
    #[test]
    fn test_finalization() {
        let dag = engine();

        for payload in 0..5 {
            dag.submit_transaction(TransactionType::Write, vec![payload])
                .unwrap();
            dag.create_vertex().unwrap();
        }

        let newly_finalized = dag.finalize_vertices().unwrap();

        assert_eq!(newly_finalized.len(), 0);
    }
}