1use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::BTreeMap;
14use std::sync::Arc;
15use tokio::sync::RwLock;
16
17use std::sync::atomic::{AtomicU64, Ordering};
19use std::time::Instant;
20
/// A 32-byte digest. Every hash in this module is produced with SHA-256.
pub type MerkleHash = [u8; 32];

/// Renders `hash` as a 64-character lowercase hexadecimal string.
// The helper appears to be unused by the code visible in this file, hence the
// lint allowance; it is kept for debugging and logging.
#[allow(dead_code)]
fn hash_to_hex(hash: &MerkleHash) -> String {
    use std::fmt::Write as _;

    let mut hex = String::with_capacity(hash.len() * 2);
    for byte in hash {
        // Formatting into a `String` cannot fail, so the result is ignored.
        let _ = write!(hex, "{:02x}", byte);
    }
    hex
}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub enum MerkleNode {
33 Leaf { hash: MerkleHash, data_key: String },
35 Internal {
37 hash: MerkleHash,
38 left: Box<MerkleNode>,
39 right: Box<MerkleNode>,
40 },
41}
42
43impl MerkleNode {
44 pub fn hash(&self) -> &MerkleHash {
46 match self {
47 MerkleNode::Leaf { hash, .. } => hash,
48 MerkleNode::Internal { hash, .. } => hash,
49 }
50 }
51
52 pub fn is_leaf(&self) -> bool {
54 matches!(self, MerkleNode::Leaf { .. })
55 }
56
57 pub fn depth(&self) -> usize {
59 match self {
60 MerkleNode::Leaf { .. } => 0,
61 MerkleNode::Internal { left, right, .. } => {
62 1 + std::cmp::max(left.depth(), right.depth())
63 }
64 }
65 }
66}
67
68#[derive(Debug, Clone)]
70pub struct MerkleTree {
71 root: Arc<RwLock<Option<MerkleNode>>>,
72 leaves: Arc<RwLock<BTreeMap<String, MerkleHash>>>,
73 stats: Arc<RwLock<MerkleTreeStats>>,
74 hash_counter: Arc<AtomicU64>,
76 rebuild_time_ns: Arc<AtomicU64>,
78}
79
80#[derive(Debug, Clone, Default, Serialize, Deserialize)]
82pub struct MerkleTreeStats {
83 pub leaf_count: usize,
85 pub depth: usize,
87 pub total_verifications: u64,
89 pub successful_verifications: u64,
91 pub failed_verifications: u64,
93 pub total_rebuilds: u64,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct MerkleProof {
100 pub data_key: String,
102 pub leaf_hash: MerkleHash,
104 pub path: Vec<(MerkleHash, bool)>,
106 pub root_hash: MerkleHash,
108}
109
110impl MerkleTree {
111 pub fn new() -> Self {
113 Self {
114 root: Arc::new(RwLock::new(None)),
115 leaves: Arc::new(RwLock::new(BTreeMap::new())),
116 stats: Arc::new(RwLock::new(MerkleTreeStats::default())),
117 hash_counter: Arc::new(AtomicU64::new(0)),
118 rebuild_time_ns: Arc::new(AtomicU64::new(0)),
119 }
120 }
121
122 pub fn hash_operations(&self) -> u64 {
124 self.hash_counter.load(Ordering::Relaxed)
125 }
126
127 pub async fn average_rebuild_time_us(&self) -> f64 {
129 let stats = self.stats.read().await;
130 if stats.total_rebuilds == 0 {
131 return 0.0;
132 }
133 let total_ns = self.rebuild_time_ns.load(Ordering::Relaxed);
134 (total_ns as f64) / (stats.total_rebuilds as f64) / 1000.0
135 }
136
137 fn hash_data(&self, data: &str) -> MerkleHash {
139 self.hash_counter.fetch_add(1, Ordering::Relaxed);
140 let mut hasher = Sha256::new();
141 hasher.update(data.as_bytes());
142 hasher.finalize().into()
143 }
144
145 fn hash_nodes(&self, left: &MerkleHash, right: &MerkleHash) -> MerkleHash {
147 self.hash_counter.fetch_add(1, Ordering::Relaxed);
148 let mut hasher = Sha256::new();
149 hasher.update(left);
150 hasher.update(right);
151 hasher.finalize().into()
152 }
153
154 fn batch_hash_data(&self, items: &[(String, String)]) -> Vec<(String, MerkleHash)> {
165 use rayon::prelude::*;
166
167 items
169 .par_iter()
170 .map(|(key, data)| {
171 self.hash_counter.fetch_add(1, Ordering::Relaxed);
172 let mut hasher = Sha256::new();
173 hasher.update(data.as_bytes());
174 let hash = hasher.finalize().into();
175 (key.clone(), hash)
176 })
177 .collect()
178 }
179
180 pub async fn insert(&self, key: String, data: &str) {
182 let hash = self.hash_data(data);
183
184 let mut leaves = self.leaves.write().await;
185 leaves.insert(key, hash);
186
187 drop(leaves);
188
189 self.rebuild().await;
191 }
192
193 pub async fn insert_batch(&self, items: Vec<(String, String)>) {
211 if items.is_empty() {
212 return;
213 }
214
215 let hashed_items = self.batch_hash_data(&items);
217
218 let mut leaves = self.leaves.write().await;
220 for (key, hash) in hashed_items {
221 leaves.insert(key, hash);
222 }
223 drop(leaves);
224
225 self.rebuild().await;
227 }
228
229 pub async fn remove(&self, key: &str) {
231 let mut leaves = self.leaves.write().await;
232 leaves.remove(key);
233
234 drop(leaves);
235
236 self.rebuild().await;
238 }
239
240 async fn rebuild(&self) {
242 let start = Instant::now();
243
244 let leaves = self.leaves.read().await;
245
246 if leaves.is_empty() {
247 *self.root.write().await = None;
248
249 let mut stats = self.stats.write().await;
250 stats.leaf_count = 0;
251 stats.depth = 0;
252 stats.total_rebuilds += 1;
253
254 let elapsed_ns = start.elapsed().as_nanos() as u64;
256 self.rebuild_time_ns
257 .fetch_add(elapsed_ns, Ordering::Relaxed);
258
259 return;
260 }
261
262 let mut nodes: Vec<MerkleNode> = leaves
264 .iter()
265 .map(|(key, hash)| MerkleNode::Leaf {
266 hash: *hash,
267 data_key: key.clone(),
268 })
269 .collect();
270
271 while nodes.len() > 1 {
273 let mut next_level = Vec::new();
274
275 for chunk in nodes.chunks(2) {
276 if chunk.len() == 2 {
277 let hash = self.hash_nodes(chunk[0].hash(), chunk[1].hash());
279 next_level.push(MerkleNode::Internal {
280 hash,
281 left: Box::new(chunk[0].clone()),
282 right: Box::new(chunk[1].clone()),
283 });
284 } else {
285 next_level.push(chunk[0].clone());
287 }
288 }
289
290 nodes = next_level;
291 }
292
293 let root_node = nodes.into_iter().next();
294
295 let depth = root_node.as_ref().map(|n| n.depth()).unwrap_or(0);
296
297 *self.root.write().await = root_node;
298
299 let mut stats = self.stats.write().await;
300 stats.leaf_count = leaves.len();
301 stats.depth = depth;
302 stats.total_rebuilds += 1;
303
304 let elapsed_ns = start.elapsed().as_nanos() as u64;
306 self.rebuild_time_ns
307 .fetch_add(elapsed_ns, Ordering::Relaxed);
308 }
309
310 pub async fn root_hash(&self) -> Option<MerkleHash> {
312 self.root.read().await.as_ref().map(|node| *node.hash())
313 }
314
315 pub async fn verify(&self, key: &str, data: &str) -> bool {
317 let hash = self.hash_data(data);
318
319 let leaves = self.leaves.read().await;
320 let result = leaves
321 .get(key)
322 .map(|stored_hash| *stored_hash == hash)
323 .unwrap_or(false);
324
325 let mut stats = self.stats.write().await;
326 stats.total_verifications += 1;
327
328 if result {
329 stats.successful_verifications += 1;
330 } else {
331 stats.failed_verifications += 1;
332 }
333
334 result
335 }
336
337 pub async fn generate_proof(&self, key: &str) -> Option<MerkleProof> {
339 let leaves = self.leaves.read().await;
340 let leaf_hash = *leaves.get(key)?;
341
342 let root = self.root.read().await;
343 let root_node = root.as_ref()?;
344 let root_hash = *root_node.hash();
345
346 let path = self.find_proof_path(root_node, key);
348
349 Some(MerkleProof {
350 data_key: key.to_string(),
351 leaf_hash,
352 path,
353 root_hash,
354 })
355 }
356
357 fn find_proof_path(&self, node: &MerkleNode, key: &str) -> Vec<(MerkleHash, bool)> {
359 match node {
360 MerkleNode::Leaf { data_key, .. } => {
361 if data_key == key {
362 Vec::new()
363 } else {
364 Vec::new()
365 }
366 }
367 MerkleNode::Internal { left, right, .. } => {
368 if self.contains_key(left, key) {
370 let mut path = self.find_proof_path(left, key);
371 path.push((*right.hash(), false));
373 path
374 } else {
375 let mut path = self.find_proof_path(right, key);
376 path.push((*left.hash(), true));
378 path
379 }
380 }
381 }
382 }
383
384 fn contains_key(&self, node: &MerkleNode, key: &str) -> bool {
386 match node {
387 MerkleNode::Leaf { data_key, .. } => data_key == key,
388 MerkleNode::Internal { left, right, .. } => {
389 self.contains_key(left, key) || self.contains_key(right, key)
390 }
391 }
392 }
393
394 pub fn verify_proof(&self, proof: &MerkleProof, data: &str) -> bool {
396 let computed_hash = self.hash_data(data);
397
398 if computed_hash != proof.leaf_hash {
399 return false;
400 }
401
402 let mut current_hash = proof.leaf_hash;
404
405 for (sibling_hash, is_left_sibling) in &proof.path {
406 current_hash = if *is_left_sibling {
407 self.hash_nodes(sibling_hash, ¤t_hash)
409 } else {
410 self.hash_nodes(¤t_hash, sibling_hash)
412 };
413 }
414
415 current_hash == proof.root_hash
416 }
417
418 pub async fn compare(&self, other: &MerkleTree) -> MerkleComparison {
420 let our_root = self.root_hash().await;
421 let their_root = other.root_hash().await;
422
423 if our_root == their_root {
424 return MerkleComparison::Identical;
425 }
426
427 let our_leaves = self.leaves.read().await;
429 let their_leaves = other.leaves.read().await;
430
431 let mut missing_keys = Vec::new();
432 let mut extra_keys = Vec::new();
433 let mut conflicting_keys = Vec::new();
434
435 for key in our_leaves.keys() {
437 if !their_leaves.contains_key(key) {
438 extra_keys.push(key.clone());
439 }
440 }
441
442 for (key, their_hash) in their_leaves.iter() {
444 if let Some(our_hash) = our_leaves.get(key) {
445 if our_hash != their_hash {
446 conflicting_keys.push(key.clone());
447 }
448 } else {
449 missing_keys.push(key.clone());
450 }
451 }
452
453 MerkleComparison::Different {
454 missing_keys,
455 extra_keys,
456 conflicting_keys,
457 }
458 }
459
460 pub async fn get_stats(&self) -> MerkleTreeStats {
462 self.stats.read().await.clone()
463 }
464
465 pub async fn get_keys(&self) -> Vec<String> {
467 self.leaves.read().await.keys().cloned().collect()
468 }
469
470 pub async fn len(&self) -> usize {
472 self.leaves.read().await.len()
473 }
474
475 pub async fn is_empty(&self) -> bool {
477 self.leaves.read().await.is_empty()
478 }
479
480 pub async fn clear(&self) {
482 self.leaves.write().await.clear();
483 *self.root.write().await = None;
484
485 let mut stats = self.stats.write().await;
486 stats.leaf_count = 0;
487 stats.depth = 0;
488 }
489}
490
491impl Default for MerkleTree {
492 fn default() -> Self {
493 Self::new()
494 }
495}
496
497#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
499pub enum MerkleComparison {
500 Identical,
502 Different {
504 missing_keys: Vec<String>,
506 extra_keys: Vec<String>,
508 conflicting_keys: Vec<String>,
510 },
511}
512
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a tree and inserts `entries` one at a time, in the given order.
    async fn tree_with(entries: &[(&str, &str)]) -> MerkleTree {
        let tree = MerkleTree::new();
        for &(key, value) in entries {
            tree.insert(key.to_string(), value).await;
        }
        tree
    }

    /// Produces `count` pairs shaped like `batch_key{i}` / `batch_value{i}`.
    fn batch_items(count: usize) -> Vec<(String, String)> {
        (0..count)
            .map(|i| (format!("batch_key{}", i), format!("batch_value{}", i)))
            .collect()
    }

    #[tokio::test]
    async fn test_merkle_tree_creation() {
        let tree = MerkleTree::new();

        assert!(tree.is_empty().await);
        assert_eq!(tree.len().await, 0);
        assert!(tree.root_hash().await.is_none());
    }

    #[tokio::test]
    async fn test_insert_and_verify() {
        let tree = tree_with(&[("key1", "value1"), ("key2", "value2")]).await;

        assert_eq!(tree.len().await, 2);
        assert!(tree.root_hash().await.is_some());

        assert!(tree.verify("key1", "value1").await);
        assert!(tree.verify("key2", "value2").await);
        assert!(!tree.verify("key1", "wrong_value").await);
    }

    #[tokio::test]
    async fn test_remove() {
        let tree = tree_with(&[("key1", "value1"), ("key2", "value2")]).await;
        assert_eq!(tree.len().await, 2);

        tree.remove("key1").await;

        assert_eq!(tree.len().await, 1);
        assert!(!tree.verify("key1", "value1").await);
        assert!(tree.verify("key2", "value2").await);
    }

    #[tokio::test]
    async fn test_root_hash_changes() {
        let tree = tree_with(&[("key1", "value1")]).await;
        let before = tree.root_hash().await;

        tree.insert("key2".to_string(), "value2").await;
        let after = tree.root_hash().await;

        assert_ne!(before, after);
    }

    #[tokio::test]
    async fn test_merkle_proof() {
        let tree = tree_with(&[("key1", "value1"), ("key2", "value2"), ("key3", "value3")]).await;

        let maybe_proof = tree.generate_proof("key2").await;
        assert!(maybe_proof.is_some());

        let proof = maybe_proof.unwrap();
        assert_eq!(proof.data_key, "key2");

        assert!(tree.verify_proof(&proof, "value2"));
        assert!(!tree.verify_proof(&proof, "wrong_value"));
    }

    #[tokio::test]
    async fn test_compare_identical_trees() {
        let entries = [("key1", "value1"), ("key2", "value2")];
        let tree1 = tree_with(&entries).await;
        let tree2 = tree_with(&entries).await;

        assert_eq!(tree1.compare(&tree2).await, MerkleComparison::Identical);
    }

    #[tokio::test]
    async fn test_compare_different_trees() {
        let tree1 = tree_with(&[("key1", "value1"), ("key2", "value2")]).await;
        let tree2 = tree_with(&[("key2", "value2"), ("key3", "value3")]).await;

        match tree1.compare(&tree2).await {
            MerkleComparison::Different {
                missing_keys,
                extra_keys,
                conflicting_keys,
            } => {
                assert_eq!(missing_keys, vec!["key3"]);
                assert_eq!(extra_keys, vec!["key1"]);
                assert!(conflicting_keys.is_empty());
            }
            MerkleComparison::Identical => panic!("Expected different trees"),
        }
    }

    #[tokio::test]
    async fn test_compare_conflicting_trees() {
        let tree1 = tree_with(&[("key1", "value1")]).await;
        let tree2 = tree_with(&[("key1", "different_value")]).await;

        match tree1.compare(&tree2).await {
            MerkleComparison::Different {
                missing_keys,
                extra_keys,
                conflicting_keys,
            } => {
                assert!(missing_keys.is_empty());
                assert!(extra_keys.is_empty());
                assert_eq!(conflicting_keys, vec!["key1"]);
            }
            MerkleComparison::Identical => panic!("Expected different trees"),
        }
    }

    #[tokio::test]
    async fn test_stats_tracking() {
        let tree = tree_with(&[("key1", "value1"), ("key2", "value2")]).await;

        // One matching and one mismatching verification.
        tree.verify("key1", "value1").await;
        tree.verify("key2", "wrong_value").await;

        let stats = tree.get_stats().await;
        assert_eq!(stats.leaf_count, 2);
        assert_eq!(stats.total_verifications, 2);
        assert_eq!(stats.successful_verifications, 1);
        assert_eq!(stats.failed_verifications, 1);
        assert!(stats.total_rebuilds > 0);
    }

    #[tokio::test]
    async fn test_clear() {
        let tree = tree_with(&[("key1", "value1"), ("key2", "value2")]).await;
        assert_eq!(tree.len().await, 2);

        tree.clear().await;

        assert_eq!(tree.len().await, 0);
        assert!(tree.is_empty().await);
        assert!(tree.root_hash().await.is_none());
    }

    #[tokio::test]
    async fn test_large_tree() {
        let tree = MerkleTree::new();

        for i in 0..100 {
            let (key, value) = (format!("key{}", i), format!("value{}", i));
            tree.insert(key, &value).await;
        }

        assert_eq!(tree.len().await, 100);

        let stats = tree.get_stats().await;
        assert_eq!(stats.leaf_count, 100);
        assert!(stats.depth > 0);

        for i in 0..100 {
            let (key, value) = (format!("key{}", i), format!("value{}", i));
            assert!(tree.verify(&key, &value).await);
        }
    }

    #[tokio::test]
    async fn test_batch_insert() {
        let tree = MerkleTree::new();

        tree.insert_batch(batch_items(50)).await;

        assert_eq!(tree.len().await, 50);

        for i in 0..50 {
            let (key, value) = (format!("batch_key{}", i), format!("batch_value{}", i));
            assert!(tree.verify(&key, &value).await);
        }
    }

    #[tokio::test]
    async fn test_hash_operation_metrics() {
        let tree = MerkleTree::new();
        assert_eq!(tree.hash_operations(), 0);

        for (key, value) in [("key1", "value1"), ("key2", "value2"), ("key3", "value3")] {
            tree.insert(key.to_string(), value).await;
        }

        let after_inserts = tree.hash_operations();
        assert!(after_inserts > 0, "Hash operations should be tracked");

        tree.verify("key1", "value1").await;
        assert!(tree.hash_operations() > after_inserts);
    }

    #[tokio::test]
    async fn test_rebuild_time_metrics() {
        let tree = MerkleTree::new();

        for i in 0..10 {
            let (key, value) = (format!("key{}", i), format!("value{}", i));
            tree.insert(key, &value).await;
        }

        let average_us = tree.average_rebuild_time_us().await;
        assert!(average_us > 0.0, "Rebuild time should be tracked");

        let stats = tree.get_stats().await;
        assert!(stats.total_rebuilds > 0);
    }

    #[tokio::test]
    async fn test_batch_vs_sequential_performance() {
        use std::time::Instant;

        // Sequential path: one rebuild per insert.
        let tree_seq = MerkleTree::new();
        let seq_timer = Instant::now();
        for i in 0..100 {
            let (key, value) = (format!("seq_key{}", i), format!("seq_value{}", i));
            tree_seq.insert(key, &value).await;
        }
        let seq_duration = seq_timer.elapsed();

        // Batch path: a single rebuild for all items.
        let tree_batch = MerkleTree::new();
        let items = batch_items(100);

        let batch_timer = Instant::now();
        tree_batch.insert_batch(items).await;
        let batch_duration = batch_timer.elapsed();

        assert_eq!(tree_seq.len().await, 100);
        assert_eq!(tree_batch.len().await, 100);

        let speedup = seq_duration.as_secs_f64() / batch_duration.as_secs_f64();
        println!(
            "Sequential: {:?}, Batch: {:?}, Speedup: {:.2}x",
            seq_duration, batch_duration, speedup
        );
    }
}