1use super::*;
20use async_trait::async_trait;
21use std::collections::{HashMap, HashSet};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::RwLock;
25
/// Trust engine that derives global per-node trust scores from pairwise
/// local trust observations and per-node statistics.
///
/// All mutable state sits behind async `RwLock`s so the engine can be shared
/// through an `Arc` between tasks.
#[derive(Debug)]
pub struct EigenTrustEngine {
    /// Local trust observations, keyed by the `(from, to)` node pair.
    local_trust: Arc<RwLock<HashMap<(NodeId, NodeId), LocalTrustData>>>,

    /// Global trust score per node, written by the global trust computation.
    global_trust: Arc<RwLock<HashMap<NodeId, f64>>>,

    /// Nodes that receive the `alpha` share of trust on every iteration of
    /// the global computation.
    pre_trusted_nodes: Arc<RwLock<HashSet<NodeId>>>,

    /// Accumulated behaviour and contribution counters per node.
    node_stats: Arc<RwLock<HashMap<NodeId, NodeStatistics>>>,

    /// Weight given to the pre-trusted (or uniform) distribution in each
    /// iteration; propagated trust is weighted by `1 - alpha`.
    alpha: f64,

    /// Per-hour decay base: scores are multiplied by
    /// `decay_rate.powf(hours_since_last_update)`.
    decay_rate: f64,

    /// Instant at which the last global computation finished.
    last_update: RwLock<Instant>,

    /// Pause between recomputations performed by the background task.
    update_interval: Duration,

    /// Scores served to readers; seeded with `0.9` for pre-trusted nodes and
    /// refreshed by every global computation.
    trust_cache: Arc<RwLock<HashMap<NodeId, f64>>>,
}
56
/// One directed local trust observation (`from` rates `to`).
#[derive(Debug, Clone)]
struct LocalTrustData {
    /// Smoothed outcome: starts at `1.0`/`0.0` for the first success/failure
    /// and is then updated as `0.9 * old + 0.1 * new`.
    value: f64,
    /// Number of interactions folded into `value`.
    interactions: u64,
    /// When the most recent interaction was recorded.
    last_interaction: Instant,
}
67
/// Accumulated per-node counters feeding the multi-factor trust adjustment.
#[derive(Debug, Clone, Default)]
pub struct NodeStatistics {
    /// Total reported uptime in seconds (one day, 86400 s, saturates the
    /// uptime factor).
    pub uptime: u64,
    /// Number of correct responses observed.
    pub correct_responses: u64,
    /// Weighted failure count: plain failures and unavailable data add 1,
    /// corrupted data and protocol violations add 2.
    pub failed_responses: u64,
    /// Total storage contributed (presumably gigabytes, going by the `gb`
    /// binding at the update site — confirm with callers).
    pub storage_contributed: u64,
    /// Total bandwidth contributed (presumably gigabytes — confirm with callers).
    pub bandwidth_contributed: u64,
    /// Total compute contributed (bound as `cycles` at the update site).
    pub compute_contributed: u64,
}
84
/// A single increment applied to a node's [`NodeStatistics`].
#[derive(Debug, Clone)]
pub enum NodeStatisticsUpdate {
    /// Adds the given number of seconds to the node's uptime.
    Uptime(u64),
    /// Records one correct response.
    CorrectResponse,
    /// Records one failed response.
    FailedResponse,
    /// Requested data was unavailable; counted as one failed response.
    DataUnavailable,
    /// Node returned corrupted data; counted as two failed responses.
    CorruptedData,
    /// Node violated the protocol; counted as two failed responses.
    ProtocolViolation,
    /// Adds to the node's contributed storage.
    StorageContributed(u64),
    /// Adds to the node's contributed bandwidth.
    BandwidthContributed(u64),
    /// Adds to the node's contributed compute.
    ComputeContributed(u64),
}
109
110impl EigenTrustEngine {
111 pub fn new(pre_trusted_nodes: HashSet<NodeId>) -> Self {
113 let mut initial_cache = HashMap::new();
114 for node in &pre_trusted_nodes {
116 initial_cache.insert(node.clone(), 0.9);
117 }
118
119 Self {
120 local_trust: Arc::new(RwLock::new(HashMap::new())),
121 global_trust: Arc::new(RwLock::new(HashMap::new())),
122 pre_trusted_nodes: Arc::new(RwLock::new(pre_trusted_nodes)),
123 node_stats: Arc::new(RwLock::new(HashMap::new())),
124 alpha: 0.4, decay_rate: 0.99,
126 last_update: RwLock::new(Instant::now()),
127 update_interval: Duration::from_secs(300), trust_cache: Arc::new(RwLock::new(initial_cache)),
129 }
130 }
131
132 pub fn start_background_updates(self: Arc<Self>) {
134 tokio::spawn(async move {
135 loop {
136 tokio::time::sleep(self.update_interval).await;
137 let _ = self.compute_global_trust().await;
138 }
139 });
140 }
141
142 pub async fn update_local_trust(&self, from: &NodeId, to: &NodeId, success: bool) {
144 let key = (from.clone(), to.clone());
145 let new_value = if success { 1.0 } else { 0.0 };
146
147 let mut trust_map = self.local_trust.write().await;
148 trust_map
149 .entry(key)
150 .and_modify(|data| {
151 data.value = 0.9 * data.value + 0.1 * new_value;
153 data.interactions += 1;
154 data.last_interaction = Instant::now();
155 })
156 .or_insert(LocalTrustData {
157 value: new_value,
158 interactions: 1,
159 last_interaction: Instant::now(),
160 });
161 }
162
163 pub async fn update_node_stats(&self, node_id: &NodeId, stats_update: NodeStatisticsUpdate) {
165 let mut stats = self.node_stats.write().await;
166 let node_stats = stats.entry(node_id.clone()).or_default();
167
168 match stats_update {
169 NodeStatisticsUpdate::Uptime(seconds) => node_stats.uptime += seconds,
170 NodeStatisticsUpdate::CorrectResponse => node_stats.correct_responses += 1,
171 NodeStatisticsUpdate::FailedResponse => node_stats.failed_responses += 1,
172 NodeStatisticsUpdate::DataUnavailable => node_stats.failed_responses += 1,
173 NodeStatisticsUpdate::CorruptedData => {
174 node_stats.failed_responses += 2;
176 }
177 NodeStatisticsUpdate::ProtocolViolation => {
178 node_stats.failed_responses += 2;
180 }
181 NodeStatisticsUpdate::StorageContributed(gb) => node_stats.storage_contributed += gb,
182 NodeStatisticsUpdate::BandwidthContributed(gb) => {
183 node_stats.bandwidth_contributed += gb
184 }
185 NodeStatisticsUpdate::ComputeContributed(cycles) => {
186 node_stats.compute_contributed += cycles
187 }
188 }
189 }
190
191 pub async fn compute_global_trust(&self) -> HashMap<NodeId, f64> {
193 let result = tokio::time::timeout(
196 std::time::Duration::from_secs(2),
197 self.compute_global_trust_internal(),
198 )
199 .await;
200
201 match result {
202 Ok(trust_map) => trust_map,
203 Err(_) => {
204 self.trust_cache.read().await.clone()
206 }
207 }
208 }
209
210 async fn compute_global_trust_internal(&self) -> HashMap<NodeId, f64> {
211 let local_trust = self.local_trust.read().await;
213 let node_stats = self.node_stats.read().await;
214 let pre_trusted = self.pre_trusted_nodes.read().await;
215
216 let mut node_set = HashSet::new();
218 for ((from, to), _) in local_trust.iter() {
219 node_set.insert(from.clone());
220 node_set.insert(to.clone());
221 }
222 for node in node_stats.keys() {
223 node_set.insert(node.clone());
224 }
225
226 if node_set.is_empty() {
227 return HashMap::new();
228 }
229
230 let n = node_set.len();
231
232 let mut incoming_edges: HashMap<NodeId, Vec<(NodeId, f64)>> = HashMap::new();
235 let mut outgoing_sums: HashMap<NodeId, f64> = HashMap::new();
236
237 for ((from, _), data) in local_trust.iter() {
239 if data.value > 0.0 {
240 *outgoing_sums.entry(from.clone()).or_insert(0.0) += data.value;
241 }
242 }
243
244 for ((from, to), data) in local_trust.iter() {
246 if data.value <= 0.0 {
247 continue;
248 }
249
250 let Some(sum) = outgoing_sums.get(from) else {
251 continue;
252 };
253 if *sum <= 0.0 {
254 continue;
255 }
256
257 let normalized_value = data.value / sum;
258 incoming_edges
259 .entry(to.clone())
260 .or_default()
261 .push((from.clone(), normalized_value));
262 }
263
264 let mut trust_vector: HashMap<NodeId, f64> = HashMap::new();
266 let initial_trust = 1.0 / n as f64;
267 for node in &node_set {
268 trust_vector.insert(node.clone(), initial_trust);
269 }
270
271 let pre_trust_value = if !pre_trusted.is_empty() {
274 1.0 / pre_trusted.len() as f64
275 } else {
276 0.0
277 };
278
279 const MAX_ITERATIONS: usize = 50; const CONVERGENCE_THRESHOLD: f64 = 0.0001; for iteration in 0..MAX_ITERATIONS {
284 let mut new_trust: HashMap<NodeId, f64> = HashMap::new();
285
286 for node in &node_set {
288 let mut trust_sum = 0.0;
289
290 if let Some(edges) = incoming_edges.get(node) {
292 for (from_node, weight) in edges {
293 if let Some(from_trust) = trust_vector.get(from_node) {
294 trust_sum += weight * from_trust;
295 }
296 }
297 }
298
299 new_trust.insert(node.clone(), (1.0 - self.alpha) * trust_sum);
301 }
302
303 if !pre_trusted.is_empty() {
305 for pre_node in pre_trusted.iter() {
308 let current = new_trust.entry(pre_node.clone()).or_insert(0.0);
309 *current += self.alpha * pre_trust_value;
310 }
311 } else {
312 let uniform_value = self.alpha / n as f64;
314 for node in &node_set {
315 let current = new_trust.entry(node.clone()).or_insert(0.0);
316 *current += uniform_value;
317 }
318 }
319
320 let sum: f64 = new_trust.values().sum();
322 if sum > 0.0 {
323 for trust in new_trust.values_mut() {
324 *trust /= sum;
325 }
326 }
327
328 let mut diff = 0.0;
330 for node in &node_set {
331 let old = trust_vector.get(node).unwrap_or(&0.0);
332 let new = new_trust.get(node).unwrap_or(&0.0);
333 diff += (old - new).abs();
334 }
335
336 trust_vector = new_trust;
337
338 if diff < CONVERGENCE_THRESHOLD {
340 break;
341 }
342
343 if n > 100 && iteration > 5 {
345 break;
346 }
347 if n > 500 && iteration > 2 {
348 break;
349 }
350 }
351
352 for (node, trust) in trust_vector.iter_mut() {
354 if let Some(stats) = node_stats.get(node) {
355 let factor = self.compute_multi_factor_adjustment(stats);
356 *trust *= factor;
357 }
358 }
359
360 let last_update = self.last_update.read().await;
362 let elapsed = last_update.elapsed().as_secs() as f64 / 3600.0; for (_, trust) in trust_vector.iter_mut() {
365 *trust *= self.decay_rate.powf(elapsed);
366 }
367
368 let total_trust: f64 = trust_vector.values().sum();
370 if total_trust > 0.0 {
371 for (_, trust) in trust_vector.iter_mut() {
372 *trust /= total_trust;
373 }
374 }
375
376 let mut global_trust = self.global_trust.write().await;
378 let mut trust_cache = self.trust_cache.write().await;
379
380 for (node, trust) in &trust_vector {
381 global_trust.insert(node.clone(), *trust);
382 trust_cache.insert(node.clone(), *trust);
383 }
384
385 *self.last_update.write().await = Instant::now();
387
388 trust_vector
389 }
390
391 fn compute_multi_factor_adjustment(&self, stats: &NodeStatistics) -> f64 {
393 let response_rate = if stats.correct_responses + stats.failed_responses > 0 {
394 stats.correct_responses as f64
395 / (stats.correct_responses + stats.failed_responses) as f64
396 } else {
397 0.5
398 };
399
400 let storage_factor = (1.0 + stats.storage_contributed as f64).ln() / 10.0;
402 let bandwidth_factor = (1.0 + stats.bandwidth_contributed as f64).ln() / 10.0;
403 let compute_factor = (1.0 + stats.compute_contributed as f64).ln() / 10.0;
404 let uptime_factor = (stats.uptime as f64 / 86400.0).min(1.0); 0.4 * response_rate
408 + 0.2 * uptime_factor
409 + 0.15 * storage_factor
410 + 0.15 * bandwidth_factor
411 + 0.1 * compute_factor
412 }
413
414 pub async fn add_pre_trusted(&self, node_id: NodeId) {
416 let mut pre_trusted = self.pre_trusted_nodes.write().await;
417 pre_trusted.insert(node_id.clone());
418
419 let mut cache = self.trust_cache.write().await;
421 cache.insert(node_id, 0.9);
422 }
423
424 pub async fn remove_pre_trusted(&self, node_id: &NodeId) {
426 let mut pre_trusted = self.pre_trusted_nodes.write().await;
427 pre_trusted.remove(node_id);
428 }
429
430 pub async fn get_trust_async(&self, node_id: &NodeId) -> f64 {
432 let cache = self.trust_cache.read().await;
433 cache.get(node_id).copied().unwrap_or(0.5)
434 }
435}
436
437impl TrustProvider for EigenTrustEngine {
438 fn get_trust(&self, node: &NodeId) -> f64 {
439 if let Ok(cache) = self.trust_cache.try_read() {
442 cache.get(node).copied().unwrap_or(0.0) } else {
444 0.0 }
447 }
448
449 fn update_trust(&self, from: &NodeId, to: &NodeId, success: bool) {
450 let local_trust = self.local_trust.clone();
452 let from = from.clone();
453 let to = to.clone();
454
455 tokio::spawn(async move {
456 let key = (from, to);
457 let new_value = if success { 1.0 } else { 0.0 };
458
459 let mut trust_map = local_trust.write().await;
460 trust_map
461 .entry(key)
462 .and_modify(|data| {
463 data.value = 0.9 * data.value + 0.1 * new_value;
464 data.interactions += 1;
465 data.last_interaction = Instant::now();
466 })
467 .or_insert(LocalTrustData {
468 value: new_value,
469 interactions: 1,
470 last_interaction: Instant::now(),
471 });
472 });
473 }
474
475 fn get_global_trust(&self) -> HashMap<NodeId, f64> {
476 if let Ok(cache) = self.trust_cache.try_read() {
478 cache.clone()
479 } else {
480 HashMap::new()
481 }
482 }
483
484 fn remove_node(&self, node: &NodeId) {
485 let node_id = node.clone();
487 let local_trust = self.local_trust.clone();
488 let trust_cache = self.trust_cache.clone();
489
490 tokio::spawn(async move {
491 let mut trust_map = local_trust.write().await;
493 trust_map.retain(|(from, to), _| from != &node_id && to != &node_id);
494
495 let mut cache = trust_cache.write().await;
497 cache.remove(&node_id);
498 });
499 }
500}
501
/// Tunables for trust-based path selection.
#[derive(Debug, Clone)]
pub struct TrustRoutingConfig {
    /// Minimum trust score a node needs to be considered as an
    /// intermediate hop.
    pub min_trust_threshold: f64,
    /// Maximum number of intermediate nodes placed in front of the target.
    pub max_intermediate_hops: usize,
}

impl Default for TrustRoutingConfig {
    /// Threshold `0.15`, at most 3 intermediate hops.
    fn default() -> Self {
        const DEFAULT_MIN_TRUST: f64 = 0.15;
        const DEFAULT_MAX_HOPS: usize = 3;

        Self {
            min_trust_threshold: DEFAULT_MIN_TRUST,
            max_intermediate_hops: DEFAULT_MAX_HOPS,
        }
    }
}

impl TrustRoutingConfig {
    /// Default configuration with only the trust threshold overridden.
    pub fn with_min_trust(min_trust_threshold: f64) -> Self {
        let Self {
            max_intermediate_hops,
            ..
        } = Self::default();

        Self {
            min_trust_threshold,
            max_intermediate_hops,
        }
    }
}
531
532pub struct TrustBasedRoutingStrategy {
534 trust_engine: Arc<EigenTrustEngine>,
536
537 local_id: NodeId,
539
540 config: TrustRoutingConfig,
542}
543
544impl TrustBasedRoutingStrategy {
545 pub fn new(trust_engine: Arc<EigenTrustEngine>, local_id: NodeId) -> Self {
547 Self::with_config(trust_engine, local_id, TrustRoutingConfig::default())
548 }
549
550 pub fn with_config(
552 trust_engine: Arc<EigenTrustEngine>,
553 local_id: NodeId,
554 config: TrustRoutingConfig,
555 ) -> Self {
556 Self {
557 trust_engine,
558 local_id,
559 config,
560 }
561 }
562
563 pub fn min_trust_threshold(&self) -> f64 {
565 self.config.min_trust_threshold
566 }
567}
568
569#[async_trait]
570impl RoutingStrategy for TrustBasedRoutingStrategy {
571 async fn find_path(&self, target: &NodeId) -> Result<Vec<NodeId>> {
572 let trust_scores = self.trust_engine.get_global_trust();
574
575 let mut trusted_nodes: Vec<(NodeId, f64)> = trust_scores
577 .into_iter()
578 .filter(|(id, trust)| {
579 id != &self.local_id && id != target && *trust >= self.config.min_trust_threshold
580 })
581 .collect();
582
583 trusted_nodes.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
585
586 let path: Vec<NodeId> = trusted_nodes
588 .into_iter()
589 .take(self.config.max_intermediate_hops)
590 .map(|(id, _)| id)
591 .chain(std::iter::once(target.clone()))
592 .collect();
593
594 if path.len() == 1 {
595 Err(AdaptiveNetworkError::Routing(
597 "No trusted path found".to_string(),
598 ))
599 } else {
600 Ok(path)
601 }
602 }
603
604 fn route_score(&self, neighbor: &NodeId, _target: &NodeId) -> f64 {
605 self.trust_engine.get_trust(neighbor)
606 }
607
608 fn update_metrics(&self, path: &[NodeId], success: bool) {
609 if path.len() >= 2 {
611 for window in path.windows(2) {
612 self.trust_engine
613 .update_trust(&window[0], &window[1], success);
614 }
615 }
616 }
617}
618
619pub struct MockTrustProvider {
621 trust_scores: Arc<RwLock<HashMap<NodeId, f64>>>,
622}
623
624impl Default for MockTrustProvider {
625 fn default() -> Self {
626 Self::new()
627 }
628}
629
630impl MockTrustProvider {
631 pub fn new() -> Self {
632 Self {
633 trust_scores: Arc::new(RwLock::new(HashMap::new())),
634 }
635 }
636}
637
638impl TrustProvider for MockTrustProvider {
639 fn get_trust(&self, node: &NodeId) -> f64 {
640 self.trust_scores
641 .blocking_read()
642 .get(node)
643 .copied()
644 .unwrap_or(0.0) }
646
647 fn update_trust(&self, _from: &NodeId, to: &NodeId, success: bool) {
648 let mut scores = self.trust_scores.blocking_write();
649 let current = scores.get(to).copied().unwrap_or(0.5);
650 let new_score = if success {
651 (current + 0.1).min(1.0)
652 } else {
653 (current - 0.1).max(0.0)
654 };
655 scores.insert(to.clone(), new_score);
656 }
657
658 fn get_global_trust(&self) -> HashMap<NodeId, f64> {
659 self.trust_scores.blocking_read().clone()
660 }
661
662 fn remove_node(&self, node: &NodeId) {
663 self.trust_scores.blocking_write().remove(node);
664 }
665}
666
#[cfg(test)]
mod tests {
    use super::*;
    use rand::RngCore;
    use std::collections::HashMap;

    /// Upper bound the tests place on a single awaited engine operation.
    const COMPUTE_BUDGET: Duration = Duration::from_secs(2);

    /// Returns 32 freshly generated random bytes for building a node id.
    fn random_hash() -> [u8; 32] {
        let mut bytes = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut bytes);
        bytes
    }

    /// A pre-trusted node must rank above a node that only received trust.
    ///
    /// The scores are read from the cache; no global computation is
    /// triggered in this test.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_eigentrust_basic() {
        let pre_trusted = HashSet::from([NodeId::from_bytes(random_hash())]);
        let engine = EigenTrustEngine::new(pre_trusted.clone());

        let node1 = NodeId {
            hash: random_hash(),
        };
        let node2 = NodeId {
            hash: random_hash(),
        };
        let pre_trusted_node = pre_trusted.iter().next().unwrap();

        engine
            .update_local_trust(pre_trusted_node, &node1, true)
            .await;
        engine.update_local_trust(&node1, &node2, true).await;
        engine.update_local_trust(&node2, &node1, false).await;

        let global_trust = engine.get_global_trust();
        let pre_trust = global_trust.get(pre_trusted_node).copied().unwrap_or(0.0);
        let node1_trust = global_trust.get(&node1).copied().unwrap_or(0.0);

        assert!(pre_trust > node1_trust);
    }

    /// Two nodes trusted equally by the same source end up with (nearly)
    /// equal scores.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_trust_normalization() {
        let engine = EigenTrustEngine::new(HashSet::new());

        let node1 = NodeId {
            hash: random_hash(),
        };
        let node2 = NodeId {
            hash: random_hash(),
        };
        let node3 = NodeId {
            hash: random_hash(),
        };

        engine.update_local_trust(&node1, &node2, true).await;
        engine.update_local_trust(&node1, &node3, true).await;

        let global_trust = tokio::time::timeout(COMPUTE_BUDGET, engine.compute_global_trust())
            .await
            .unwrap_or_else(|_| HashMap::new());

        let score_of = |id: &NodeId| global_trust.get(id).copied().unwrap_or(0.0);
        let (trust2, trust3) = (score_of(&node2), score_of(&node3));

        // Only meaningful when both nodes actually received a score.
        if trust2 > 0.0 && trust3 > 0.0 {
            assert!((trust2 - trust3).abs() < 0.01);
        }
    }

    /// Statistics feed into the score without ever making it negative.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_multi_factor_trust() {
        let engine = Arc::new(EigenTrustEngine::new(HashSet::new()));

        let node = NodeId {
            hash: random_hash(),
        };

        let updates = [
            NodeStatisticsUpdate::Uptime(3600),
            NodeStatisticsUpdate::CorrectResponse,
            NodeStatisticsUpdate::CorrectResponse,
            NodeStatisticsUpdate::FailedResponse,
            NodeStatisticsUpdate::StorageContributed(100),
        ];
        for update in updates {
            engine.update_node_stats(&node, update).await;
        }

        let other = NodeId {
            hash: random_hash(),
        };
        engine.update_local_trust(&other, &node, true).await;

        let compute_ok = tokio::time::timeout(COMPUTE_BUDGET, engine.compute_global_trust())
            .await
            .is_ok();

        let trust_value = if compute_ok {
            engine
                .get_global_trust()
                .get(&node)
                .copied()
                .unwrap_or(0.0)
        } else {
            engine.get_trust_async(&node).await
        };

        assert!(trust_value >= 0.0);
    }

    /// A score computed after an hour has passed must not exceed the score
    /// computed before.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_trust_decay() {
        let mut engine = EigenTrustEngine::new(HashSet::new());
        engine.decay_rate = 0.5;

        let node1 = NodeId {
            hash: random_hash(),
        };
        let node2 = NodeId {
            hash: random_hash(),
        };

        engine.update_local_trust(&node1, &node2, true).await;

        let _ = tokio::time::timeout(COMPUTE_BUDGET, engine.compute_global_trust()).await;
        let initial_trust = engine
            .get_global_trust()
            .get(&node2)
            .copied()
            .unwrap_or(0.0);

        // Pretend the last update happened an hour ago, when the clock
        // allows going that far back.
        if let Some(past_time) = Instant::now().checked_sub(Duration::from_secs(3600)) {
            *engine.last_update.write().await = past_time;
        }

        let _ = tokio::time::timeout(COMPUTE_BUDGET, engine.compute_global_trust()).await;
        let decayed_trust = engine
            .get_global_trust()
            .get(&node2)
            .copied()
            .unwrap_or(0.0);

        if initial_trust > 0.0 && decayed_trust > 0.0 {
            assert!(decayed_trust <= initial_trust);
        }
    }

    /// Path finding returns a path that ends at the requested target.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_trust_based_routing() {
        let pre_trusted_id = NodeId::from_bytes(random_hash());
        let engine = Arc::new(EigenTrustEngine::new(HashSet::from([
            pre_trusted_id.clone()
        ])));

        let local_id = NodeId::from_bytes(random_hash());
        let target_id = NodeId::from_bytes(random_hash());

        engine
            .update_local_trust(&pre_trusted_id, &local_id, true)
            .await;
        engine.update_local_trust(&local_id, &target_id, true).await;

        let _ = engine.get_global_trust();

        let strategy = TrustBasedRoutingStrategy::new(engine.clone(), local_id);

        let result = tokio::time::timeout(COMPUTE_BUDGET, strategy.find_path(&target_id))
            .await
            .expect("find_path timed out");

        assert!(result.is_ok());
        let path = result.unwrap();
        assert!(path.contains(&target_id));
    }
}