1use super::*;
20use async_trait::async_trait;
21use std::collections::{HashMap, HashSet};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::RwLock;
25
26#[derive(Debug)]
28pub struct EigenTrustEngine {
29 local_trust: Arc<RwLock<HashMap<(NodeId, NodeId), LocalTrustData>>>,
31
32 global_trust: Arc<RwLock<HashMap<NodeId, f64>>>,
34
35 pre_trusted_nodes: Arc<RwLock<HashSet<NodeId>>>,
37
38 node_stats: Arc<RwLock<HashMap<NodeId, NodeStatistics>>>,
40
41 alpha: f64,
43
44 decay_rate: f64,
46
47 last_update: RwLock<Instant>,
49
50 update_interval: Duration,
52
53 trust_cache: Arc<RwLock<HashMap<NodeId, f64>>>,
55}
56
/// One node's running opinion of another node.
#[derive(Debug, Clone)]
struct LocalTrustData {
    /// Exponentially smoothed success indicator (1.0 = success, 0.0 = failure).
    value: f64,
    /// Number of interactions folded into `value`.
    interactions: u64,
    /// When the most recent interaction was recorded.
    last_interaction: Instant,
}
67
/// Cumulative behaviour and resource-contribution counters for one node.
///
/// All counters start at zero via `Default`.
#[derive(Debug, Clone, Default)]
pub struct NodeStatistics {
    /// Accumulated uptime; the engine scores it against 86 400 (one day of
    /// seconds), so it is treated as seconds.
    pub uptime: u64,
    /// Number of responses judged correct.
    pub correct_responses: u64,
    /// Number of responses judged failed.
    pub failed_responses: u64,
    /// Total storage contributed (unit is chosen by the caller).
    pub storage_contributed: u64,
    /// Total bandwidth contributed (unit is chosen by the caller).
    pub bandwidth_contributed: u64,
    /// Total compute contributed (unit is chosen by the caller).
    pub compute_contributed: u64,
}
84
/// A single incremental change to a node's [`NodeStatistics`].
#[derive(Debug, Clone)]
pub enum NodeStatisticsUpdate {
    /// Add this much uptime.
    Uptime(u64),
    /// Record one correct response.
    CorrectResponse,
    /// Record one failed response.
    FailedResponse,
    /// Add this much contributed storage.
    StorageContributed(u64),
    /// Add this much contributed bandwidth.
    BandwidthContributed(u64),
    /// Add this much contributed compute.
    ComputeContributed(u64),
}
95
96impl EigenTrustEngine {
97 pub fn new(pre_trusted_nodes: HashSet<NodeId>) -> Self {
99 let mut initial_cache = HashMap::new();
100 for node in &pre_trusted_nodes {
102 initial_cache.insert(node.clone(), 0.9);
103 }
104
105 Self {
106 local_trust: Arc::new(RwLock::new(HashMap::new())),
107 global_trust: Arc::new(RwLock::new(HashMap::new())),
108 pre_trusted_nodes: Arc::new(RwLock::new(pre_trusted_nodes)),
109 node_stats: Arc::new(RwLock::new(HashMap::new())),
110 alpha: 0.4, decay_rate: 0.99,
112 last_update: RwLock::new(Instant::now()),
113 update_interval: Duration::from_secs(300), trust_cache: Arc::new(RwLock::new(initial_cache)),
115 }
116 }
117
118 pub fn start_background_updates(self: Arc<Self>) {
120 tokio::spawn(async move {
121 loop {
122 tokio::time::sleep(self.update_interval).await;
123 let _ = self.compute_global_trust().await;
124 }
125 });
126 }
127
128 pub async fn update_local_trust(&self, from: &NodeId, to: &NodeId, success: bool) {
130 let key = (from.clone(), to.clone());
131 let new_value = if success { 1.0 } else { 0.0 };
132
133 let mut trust_map = self.local_trust.write().await;
134 trust_map
135 .entry(key)
136 .and_modify(|data| {
137 data.value = 0.9 * data.value + 0.1 * new_value;
139 data.interactions += 1;
140 data.last_interaction = Instant::now();
141 })
142 .or_insert(LocalTrustData {
143 value: new_value,
144 interactions: 1,
145 last_interaction: Instant::now(),
146 });
147 }
148
149 pub async fn update_node_stats(&self, node_id: &NodeId, stats_update: NodeStatisticsUpdate) {
151 let mut stats = self.node_stats.write().await;
152 let node_stats = stats.entry(node_id.clone()).or_default();
153
154 match stats_update {
155 NodeStatisticsUpdate::Uptime(seconds) => node_stats.uptime += seconds,
156 NodeStatisticsUpdate::CorrectResponse => node_stats.correct_responses += 1,
157 NodeStatisticsUpdate::FailedResponse => node_stats.failed_responses += 1,
158 NodeStatisticsUpdate::StorageContributed(gb) => node_stats.storage_contributed += gb,
159 NodeStatisticsUpdate::BandwidthContributed(gb) => {
160 node_stats.bandwidth_contributed += gb
161 }
162 NodeStatisticsUpdate::ComputeContributed(cycles) => {
163 node_stats.compute_contributed += cycles
164 }
165 }
166 }
167
168 pub async fn compute_global_trust(&self) -> HashMap<NodeId, f64> {
170 let result = tokio::time::timeout(
173 std::time::Duration::from_secs(2),
174 self.compute_global_trust_internal(),
175 )
176 .await;
177
178 match result {
179 Ok(trust_map) => trust_map,
180 Err(_) => {
181 self.trust_cache.read().await.clone()
183 }
184 }
185 }
186
187 async fn compute_global_trust_internal(&self) -> HashMap<NodeId, f64> {
188 let local_trust = self.local_trust.read().await;
190 let node_stats = self.node_stats.read().await;
191 let pre_trusted = self.pre_trusted_nodes.read().await;
192
193 let mut node_set = HashSet::new();
195 for ((from, to), _) in local_trust.iter() {
196 node_set.insert(from.clone());
197 node_set.insert(to.clone());
198 }
199 for node in node_stats.keys() {
200 node_set.insert(node.clone());
201 }
202
203 if node_set.is_empty() {
204 return HashMap::new();
205 }
206
207 let n = node_set.len();
208
209 let mut incoming_edges: HashMap<NodeId, Vec<(NodeId, f64)>> = HashMap::new();
212 let mut outgoing_sums: HashMap<NodeId, f64> = HashMap::new();
213
214 for ((from, _), data) in local_trust.iter() {
216 if data.value > 0.0 {
217 *outgoing_sums.entry(from.clone()).or_insert(0.0) += data.value;
218 }
219 }
220
221 for ((from, to), data) in local_trust.iter() {
223 if data.value <= 0.0 {
224 continue;
225 }
226
227 let Some(sum) = outgoing_sums.get(from) else {
228 continue;
229 };
230 if *sum <= 0.0 {
231 continue;
232 }
233
234 let normalized_value = data.value / sum;
235 incoming_edges
236 .entry(to.clone())
237 .or_default()
238 .push((from.clone(), normalized_value));
239 }
240
241 let mut trust_vector: HashMap<NodeId, f64> = HashMap::new();
243 let initial_trust = 1.0 / n as f64;
244 for node in &node_set {
245 trust_vector.insert(node.clone(), initial_trust);
246 }
247
248 let pre_trust_value = if !pre_trusted.is_empty() {
251 1.0 / pre_trusted.len() as f64
252 } else {
253 0.0
254 };
255
256 const MAX_ITERATIONS: usize = 50; const CONVERGENCE_THRESHOLD: f64 = 0.0001; for iteration in 0..MAX_ITERATIONS {
261 let mut new_trust: HashMap<NodeId, f64> = HashMap::new();
262
263 for node in &node_set {
265 let mut trust_sum = 0.0;
266
267 if let Some(edges) = incoming_edges.get(node) {
269 for (from_node, weight) in edges {
270 if let Some(from_trust) = trust_vector.get(from_node) {
271 trust_sum += weight * from_trust;
272 }
273 }
274 }
275
276 new_trust.insert(node.clone(), (1.0 - self.alpha) * trust_sum);
278 }
279
280 if !pre_trusted.is_empty() {
282 for pre_node in pre_trusted.iter() {
285 let current = new_trust.entry(pre_node.clone()).or_insert(0.0);
286 *current += self.alpha * pre_trust_value;
287 }
288 } else {
289 let uniform_value = self.alpha / n as f64;
291 for node in &node_set {
292 let current = new_trust.entry(node.clone()).or_insert(0.0);
293 *current += uniform_value;
294 }
295 }
296
297 let sum: f64 = new_trust.values().sum();
299 if sum > 0.0 {
300 for trust in new_trust.values_mut() {
301 *trust /= sum;
302 }
303 }
304
305 let mut diff = 0.0;
307 for node in &node_set {
308 let old = trust_vector.get(node).unwrap_or(&0.0);
309 let new = new_trust.get(node).unwrap_or(&0.0);
310 diff += (old - new).abs();
311 }
312
313 trust_vector = new_trust;
314
315 if diff < CONVERGENCE_THRESHOLD {
317 break;
318 }
319
320 if n > 100 && iteration > 5 {
322 break;
323 }
324 if n > 500 && iteration > 2 {
325 break;
326 }
327 }
328
329 for (node, trust) in trust_vector.iter_mut() {
331 if let Some(stats) = node_stats.get(node) {
332 let factor = self.compute_multi_factor_adjustment(stats);
333 *trust *= factor;
334 }
335 }
336
337 let last_update = self.last_update.read().await;
339 let elapsed = last_update.elapsed().as_secs() as f64 / 3600.0; for (_, trust) in trust_vector.iter_mut() {
342 *trust *= self.decay_rate.powf(elapsed);
343 }
344
345 let total_trust: f64 = trust_vector.values().sum();
347 if total_trust > 0.0 {
348 for (_, trust) in trust_vector.iter_mut() {
349 *trust /= total_trust;
350 }
351 }
352
353 let mut global_trust = self.global_trust.write().await;
355 let mut trust_cache = self.trust_cache.write().await;
356
357 for (node, trust) in &trust_vector {
358 global_trust.insert(node.clone(), *trust);
359 trust_cache.insert(node.clone(), *trust);
360 }
361
362 *self.last_update.write().await = Instant::now();
364
365 trust_vector
366 }
367
368 fn compute_multi_factor_adjustment(&self, stats: &NodeStatistics) -> f64 {
370 let response_rate = if stats.correct_responses + stats.failed_responses > 0 {
371 stats.correct_responses as f64
372 / (stats.correct_responses + stats.failed_responses) as f64
373 } else {
374 0.5
375 };
376
377 let storage_factor = (1.0 + stats.storage_contributed as f64).ln() / 10.0;
379 let bandwidth_factor = (1.0 + stats.bandwidth_contributed as f64).ln() / 10.0;
380 let compute_factor = (1.0 + stats.compute_contributed as f64).ln() / 10.0;
381 let uptime_factor = (stats.uptime as f64 / 86400.0).min(1.0); 0.4 * response_rate
385 + 0.2 * uptime_factor
386 + 0.15 * storage_factor
387 + 0.15 * bandwidth_factor
388 + 0.1 * compute_factor
389 }
390
391 pub async fn add_pre_trusted(&self, node_id: NodeId) {
393 let mut pre_trusted = self.pre_trusted_nodes.write().await;
394 pre_trusted.insert(node_id.clone());
395
396 let mut cache = self.trust_cache.write().await;
398 cache.insert(node_id, 0.9);
399 }
400
401 pub async fn remove_pre_trusted(&self, node_id: &NodeId) {
403 let mut pre_trusted = self.pre_trusted_nodes.write().await;
404 pre_trusted.remove(node_id);
405 }
406
407 pub async fn get_trust_async(&self, node_id: &NodeId) -> f64 {
409 let cache = self.trust_cache.read().await;
410 cache.get(node_id).copied().unwrap_or(0.5)
411 }
412}
413
414impl TrustProvider for EigenTrustEngine {
415 fn get_trust(&self, node: &NodeId) -> f64 {
416 if let Ok(cache) = self.trust_cache.try_read() {
419 cache.get(node).copied().unwrap_or(0.0) } else {
421 0.0 }
424 }
425
426 fn update_trust(&self, from: &NodeId, to: &NodeId, success: bool) {
427 let local_trust = self.local_trust.clone();
429 let from = from.clone();
430 let to = to.clone();
431
432 tokio::spawn(async move {
433 let key = (from, to);
434 let new_value = if success { 1.0 } else { 0.0 };
435
436 let mut trust_map = local_trust.write().await;
437 trust_map
438 .entry(key)
439 .and_modify(|data| {
440 data.value = 0.9 * data.value + 0.1 * new_value;
441 data.interactions += 1;
442 data.last_interaction = Instant::now();
443 })
444 .or_insert(LocalTrustData {
445 value: new_value,
446 interactions: 1,
447 last_interaction: Instant::now(),
448 });
449 });
450 }
451
452 fn get_global_trust(&self) -> HashMap<NodeId, f64> {
453 if let Ok(cache) = self.trust_cache.try_read() {
455 cache.clone()
456 } else {
457 HashMap::new()
458 }
459 }
460
461 fn remove_node(&self, node: &NodeId) {
462 let node_id = node.clone();
464 let local_trust = self.local_trust.clone();
465 let trust_cache = self.trust_cache.clone();
466
467 tokio::spawn(async move {
468 let mut trust_map = local_trust.write().await;
470 trust_map.retain(|(from, to), _| from != &node_id && to != &node_id);
471
472 let mut cache = trust_cache.write().await;
474 cache.remove(&node_id);
475 });
476 }
477}
478
/// Tunables for trust-based path selection.
#[derive(Debug, Clone)]
pub struct TrustRoutingConfig {
    /// Minimum trust score a node needs to be used as an intermediate hop.
    pub min_trust_threshold: f64,
    /// Maximum number of intermediate hops placed before the target.
    pub max_intermediate_hops: usize,
}

impl Default for TrustRoutingConfig {
    /// Threshold `0.15`, at most `3` intermediate hops.
    fn default() -> Self {
        TrustRoutingConfig {
            min_trust_threshold: 0.15,
            max_intermediate_hops: 3,
        }
    }
}

impl TrustRoutingConfig {
    /// Builds the default configuration with a custom trust threshold.
    pub fn with_min_trust(min_trust_threshold: f64) -> Self {
        let mut config = Self::default();
        config.min_trust_threshold = min_trust_threshold;
        config
    }
}
508
509pub struct TrustBasedRoutingStrategy {
511 trust_engine: Arc<EigenTrustEngine>,
513
514 local_id: NodeId,
516
517 config: TrustRoutingConfig,
519}
520
521impl TrustBasedRoutingStrategy {
522 pub fn new(trust_engine: Arc<EigenTrustEngine>, local_id: NodeId) -> Self {
524 Self::with_config(trust_engine, local_id, TrustRoutingConfig::default())
525 }
526
527 pub fn with_config(
529 trust_engine: Arc<EigenTrustEngine>,
530 local_id: NodeId,
531 config: TrustRoutingConfig,
532 ) -> Self {
533 Self {
534 trust_engine,
535 local_id,
536 config,
537 }
538 }
539
540 pub fn min_trust_threshold(&self) -> f64 {
542 self.config.min_trust_threshold
543 }
544}
545
546#[async_trait]
547impl RoutingStrategy for TrustBasedRoutingStrategy {
548 async fn find_path(&self, target: &NodeId) -> Result<Vec<NodeId>> {
549 let trust_scores = self.trust_engine.get_global_trust();
551
552 let mut trusted_nodes: Vec<(NodeId, f64)> = trust_scores
554 .into_iter()
555 .filter(|(id, trust)| {
556 id != &self.local_id && id != target && *trust >= self.config.min_trust_threshold
557 })
558 .collect();
559
560 trusted_nodes.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
562
563 let path: Vec<NodeId> = trusted_nodes
565 .into_iter()
566 .take(self.config.max_intermediate_hops)
567 .map(|(id, _)| id)
568 .chain(std::iter::once(target.clone()))
569 .collect();
570
571 if path.len() == 1 {
572 Err(AdaptiveNetworkError::Routing(
574 "No trusted path found".to_string(),
575 ))
576 } else {
577 Ok(path)
578 }
579 }
580
581 fn route_score(&self, neighbor: &NodeId, _target: &NodeId) -> f64 {
582 self.trust_engine.get_trust(neighbor)
583 }
584
585 fn update_metrics(&mut self, path: &[NodeId], success: bool) {
586 if path.len() >= 2 {
588 for window in path.windows(2) {
589 self.trust_engine
590 .update_trust(&window[0], &window[1], success);
591 }
592 }
593 }
594}
595
596pub struct MockTrustProvider {
598 trust_scores: Arc<RwLock<HashMap<NodeId, f64>>>,
599}
600
601impl Default for MockTrustProvider {
602 fn default() -> Self {
603 Self::new()
604 }
605}
606
607impl MockTrustProvider {
608 pub fn new() -> Self {
609 Self {
610 trust_scores: Arc::new(RwLock::new(HashMap::new())),
611 }
612 }
613}
614
615impl TrustProvider for MockTrustProvider {
616 fn get_trust(&self, node: &NodeId) -> f64 {
617 self.trust_scores
618 .blocking_read()
619 .get(node)
620 .copied()
621 .unwrap_or(0.0) }
623
624 fn update_trust(&self, _from: &NodeId, to: &NodeId, success: bool) {
625 let mut scores = self.trust_scores.blocking_write();
626 let current = scores.get(to).copied().unwrap_or(0.5);
627 let new_score = if success {
628 (current + 0.1).min(1.0)
629 } else {
630 (current - 0.1).max(0.0)
631 };
632 scores.insert(to.clone(), new_score);
633 }
634
635 fn get_global_trust(&self) -> HashMap<NodeId, f64> {
636 self.trust_scores.blocking_read().clone()
637 }
638
639 fn remove_node(&self, node: &NodeId) {
640 self.trust_scores.blocking_write().remove(node);
641 }
642}
643
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Upper bound placed around engine calls so a stalled computation
    /// cannot hang the test run.
    const CALL_TIMEOUT: Duration = Duration::from_secs(2);

    /// Produces 32 random bytes to build a fresh node identity from.
    fn random_hash() -> [u8; 32] {
        use rand::RngCore;

        let mut bytes = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut bytes);
        bytes
    }

    /// A pre-trusted node must outrank a node with no cached score.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_eigentrust_basic() {
        let pre_trusted = HashSet::from([NodeId::from_bytes(random_hash())]);
        let engine = EigenTrustEngine::new(pre_trusted.clone());

        let node1 = NodeId {
            hash: random_hash(),
        };
        let node2 = NodeId {
            hash: random_hash(),
        };
        let pre_trusted_node = pre_trusted.iter().next().unwrap();

        engine
            .update_local_trust(pre_trusted_node, &node1, true)
            .await;
        engine.update_local_trust(&node1, &node2, true).await;
        engine.update_local_trust(&node2, &node1, false).await;

        // Reads the cache only; no recomputation is triggered here.
        let scores = engine.get_global_trust();
        let pre_trust = scores.get(pre_trusted_node).copied().unwrap_or(0.0);
        let node1_trust = scores.get(&node1).copied().unwrap_or(0.0);

        assert!(pre_trust > node1_trust);
    }

    /// Two nodes trusted equally by the same source end up with near-equal scores.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_trust_normalization() {
        let engine = EigenTrustEngine::new(HashSet::new());

        let node1 = NodeId {
            hash: random_hash(),
        };
        let node2 = NodeId {
            hash: random_hash(),
        };
        let node3 = NodeId {
            hash: random_hash(),
        };

        engine.update_local_trust(&node1, &node2, true).await;
        engine.update_local_trust(&node1, &node3, true).await;

        let scores = tokio::time::timeout(CALL_TIMEOUT, engine.compute_global_trust())
            .await
            .unwrap_or_else(|_| HashMap::new());

        let trust2 = scores.get(&node2).copied().unwrap_or(0.0);
        let trust3 = scores.get(&node3).copied().unwrap_or(0.0);

        // Only meaningful when both nodes actually received a score.
        if trust2 > 0.0 && trust3 > 0.0 {
            assert!((trust2 - trust3).abs() < 0.01);
        }
    }

    /// Statistics-adjusted trust is never negative.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_multi_factor_trust() {
        let engine = Arc::new(EigenTrustEngine::new(HashSet::new()));

        let node = NodeId {
            hash: random_hash(),
        };

        let updates = [
            NodeStatisticsUpdate::Uptime(3600),
            NodeStatisticsUpdate::CorrectResponse,
            NodeStatisticsUpdate::CorrectResponse,
            NodeStatisticsUpdate::FailedResponse,
            NodeStatisticsUpdate::StorageContributed(100),
        ];
        for update in updates {
            engine.update_node_stats(&node, update).await;
        }

        let other = NodeId {
            hash: random_hash(),
        };
        engine.update_local_trust(&other, &node, true).await;

        let computed = tokio::time::timeout(CALL_TIMEOUT, engine.compute_global_trust())
            .await
            .is_ok();

        let trust_value = if computed {
            engine
                .get_global_trust()
                .get(&node)
                .copied()
                .unwrap_or(0.0)
        } else {
            engine.get_trust_async(&node).await
        };

        assert!(trust_value >= 0.0);
    }

    /// Recomputing after an hour of simulated inactivity must not raise a score.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_trust_decay() {
        let mut engine = EigenTrustEngine::new(HashSet::new());
        engine.decay_rate = 0.5;

        let node1 = NodeId {
            hash: random_hash(),
        };
        let node2 = NodeId {
            hash: random_hash(),
        };

        engine.update_local_trust(&node1, &node2, true).await;

        let _ = tokio::time::timeout(CALL_TIMEOUT, engine.compute_global_trust()).await;
        let initial_trust = engine
            .get_global_trust()
            .get(&node2)
            .copied()
            .unwrap_or(0.0);

        // Pretend the last update happened an hour ago, if the clock allows it.
        if let Some(past_time) = Instant::now().checked_sub(Duration::from_secs(3600)) {
            *engine.last_update.write().await = past_time;
        }

        let _ = tokio::time::timeout(CALL_TIMEOUT, engine.compute_global_trust()).await;
        let decayed_trust = engine
            .get_global_trust()
            .get(&node2)
            .copied()
            .unwrap_or(0.0);

        if initial_trust > 0.0 && decayed_trust > 0.0 {
            assert!(decayed_trust <= initial_trust);
        }
    }

    /// A path through the pre-trusted node reaches the target.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_trust_based_routing() {
        let pre_trusted_id = NodeId::from_bytes(random_hash());

        let engine = Arc::new(EigenTrustEngine::new(HashSet::from([
            pre_trusted_id.clone()
        ])));

        let local_id = NodeId::from_bytes(random_hash());
        let target_id = NodeId::from_bytes(random_hash());

        engine
            .update_local_trust(&pre_trusted_id, &local_id, true)
            .await;
        engine.update_local_trust(&local_id, &target_id, true).await;

        let _ = engine.get_global_trust();

        let strategy = TrustBasedRoutingStrategy::new(engine.clone(), local_id);

        let result = tokio::time::timeout(CALL_TIMEOUT, strategy.find_path(&target_id))
            .await
            .expect("find_path timed out");

        assert!(result.is_ok());
        let path = result.unwrap();
        assert!(path.contains(&target_id));
    }
}