saorsa-core 0.20.0

Saorsa - Core P2P networking library with DHT, QUIC transport, and post-quantum cryptography
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
// Copyright 2024 Saorsa Labs Limited
//
// This software is dual-licensed under:
// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
// - Commercial License
//
// For AGPL-3.0 license, see LICENSE-AGPL-3.0
// For commercial licensing, contact: david@saorsalabs.com
//
// Unless required by applicable law or agreed to in writing, software
// distributed under these licenses is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

//! Local trust scoring based on direct peer interactions.
//!
//! Scores use an exponential moving average (EMA) that blends each new
//! observation and decays toward neutral when idle. No background task
//! needed — decay is applied lazily on each read or write.
//!
//! Future: full EigenTrust with peer-to-peer trust gossip.

use crate::PeerId;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};

/// Default trust score for unknown peers
pub const DEFAULT_NEUTRAL_TRUST: f64 = 0.5;

/// Minimum trust score a peer can reach
const MIN_TRUST_SCORE: f64 = 0.0;

/// Maximum trust score a peer can reach
const MAX_TRUST_SCORE: f64 = 1.0;

/// EMA weight for each new observation (higher = faster response to events)
const EMA_WEIGHT: f64 = 0.1;

/// Decay constant (per-second).
///
/// Tuned so that the worst possible score (0.0) takes 3 days of idle time
/// to decay back above the block threshold (0.15).
///
/// Derivation: 0.15 = 0.5 - 0.5 * e^(-λ * 259200)  →  λ = -ln(0.7) / 259200
const DECAY_LAMBDA: f64 = 1.3761e-6;

/// Per-node trust state
#[derive(Debug, Clone)]
struct PeerTrust {
    /// Current trust score (between MIN and MAX)
    score: f64,
    /// When the score was last updated (for decay calculation)
    last_updated: Instant,
}

impl PeerTrust {
    fn new() -> Self {
        Self {
            score: DEFAULT_NEUTRAL_TRUST,
            last_updated: Instant::now(),
        }
    }

    /// Apply time-based decay toward neutral, then clamp to bounds.
    ///
    /// Uses exponential decay: `score = neutral + (score - neutral) * e^(-λt)`
    /// This smoothly pulls the score back toward 0.5 over time.
    fn apply_decay(&mut self) {
        let elapsed_secs = self.last_updated.elapsed().as_secs_f64();
        self.apply_decay_secs(elapsed_secs);
    }

    /// Apply decay for an explicit number of elapsed seconds.
    ///
    /// Factored out so tests can call this directly without manipulating
    /// `Instant` (which can overflow on Windows if uptime < the duration).
    fn apply_decay_secs(&mut self, elapsed_secs: f64) {
        if elapsed_secs > 0.0 {
            let decay_factor = (-DECAY_LAMBDA * elapsed_secs).exp();
            self.score =
                DEFAULT_NEUTRAL_TRUST + (self.score - DEFAULT_NEUTRAL_TRUST) * decay_factor;
            self.score = self.score.clamp(MIN_TRUST_SCORE, MAX_TRUST_SCORE);
            self.last_updated = Instant::now();
        }
    }

    /// Apply a new observation via weighted EMA, after first applying decay.
    ///
    /// The weight controls how heavily this observation influences the score.
    /// `(1-α)^W * score + (1-(1-α)^W) * observation` generalizes the unit-weight
    /// formula and is equivalent to applying `W` consecutive unit-weight updates
    /// for integer W.
    fn record_weighted(&mut self, observation: f64, weight: f64) {
        if !weight.is_finite() || weight <= 0.0 {
            return;
        }
        self.apply_decay();
        let alpha_w = 1.0 - (1.0 - EMA_WEIGHT).powf(weight);
        self.score = (1.0 - alpha_w) * self.score + alpha_w * observation;
        self.score = self.score.clamp(MIN_TRUST_SCORE, MAX_TRUST_SCORE);
        self.last_updated = Instant::now();
    }

    /// Apply a new observation via EMA with unit weight, after first applying decay.
    #[allow(dead_code)] // design API: retained as convenience wrapper for record_weighted
    fn record(&mut self, observation: f64) {
        self.record_weighted(observation, 1.0);
    }

    /// Get the current score with decay applied (does not mutate).
    fn decayed_score(&self) -> f64 {
        Self::decay_score(self.score, self.last_updated.elapsed().as_secs_f64())
    }

    /// Pure function: compute what a score would be after `elapsed_secs` of decay.
    fn decay_score(score: f64, elapsed_secs: f64) -> f64 {
        if elapsed_secs > 0.0 {
            let decay_factor = (-DECAY_LAMBDA * elapsed_secs).exp();
            let decayed = DEFAULT_NEUTRAL_TRUST + (score - DEFAULT_NEUTRAL_TRUST) * decay_factor;
            decayed.clamp(MIN_TRUST_SCORE, MAX_TRUST_SCORE)
        } else {
            score
        }
    }
}

/// Observation value for a successful interaction
const SUCCESS_OBSERVATION: f64 = 1.0;

/// Observation value for a failed interaction
const FAILURE_OBSERVATION: f64 = 0.0;

/// Statistics update type for recording peer interaction outcomes
#[derive(Debug, Clone)]
pub enum NodeStatisticsUpdate {
    /// Peer provided a correct response
    CorrectResponse,
    /// Peer failed to provide a response
    FailedResponse,
}

/// Serializable trust snapshot for persistence across restarts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrustSnapshot {
    /// Peer trust scores with timestamps.
    /// The timestamp is seconds since UNIX epoch when the score was last updated.
    pub peers: HashMap<PeerId, TrustRecord>,
}

/// A single peer's trust record for serialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrustRecord {
    /// Trust score [0.0, 1.0]
    pub score: f64,
    /// When the score was last updated (seconds since UNIX epoch)
    pub last_updated_epoch_secs: u64,
}

/// Local trust engine based on direct peer observations.
///
/// Scores are an exponential moving average of success/failure observations
/// that decays toward neutral (0.5) when idle. Bounded by `MIN_TRUST_SCORE`
/// and `MAX_TRUST_SCORE`.
///
/// This is the **sole authority** on peer trust scores in the system.
#[derive(Debug)]
pub struct TrustEngine {
    /// Per-node trust state
    peers: Arc<RwLock<HashMap<PeerId, PeerTrust>>>,
}

impl TrustEngine {
    /// Create a new TrustEngine
    pub fn new() -> Self {
        Self {
            peers: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Record a peer interaction outcome
    pub fn update_node_stats(&self, node_id: &PeerId, update: NodeStatisticsUpdate) {
        self.update_node_stats_weighted(node_id, update, 1.0);
    }

    /// Record a peer interaction outcome with an explicit weight.
    ///
    /// Weight `1.0` is equivalent to a single internal event. Higher weights
    /// amplify the observation's influence on the EMA. The caller is responsible
    /// for validating/clamping the weight before calling this method.
    pub fn update_node_stats_weighted(
        &self,
        node_id: &PeerId,
        update: NodeStatisticsUpdate,
        weight: f64,
    ) {
        let mut peers = self.peers.write();
        let entry = peers.entry(*node_id).or_insert_with(PeerTrust::new);

        let observation = match update {
            NodeStatisticsUpdate::CorrectResponse => SUCCESS_OBSERVATION,
            NodeStatisticsUpdate::FailedResponse => FAILURE_OBSERVATION,
        };

        entry.record_weighted(observation, weight);
    }

    /// Get current trust score for a peer (synchronous).
    ///
    /// Applies time decay lazily — no background task needed.
    /// Returns `DEFAULT_NEUTRAL_TRUST` (0.5) for unknown peers.
    ///
    /// Uses `parking_lot::RwLock` so this never falls back to a stale
    /// neutral value during write contention — it briefly blocks until
    /// the writer releases.
    pub fn score(&self, node_id: &PeerId) -> f64 {
        let peers = self.peers.read();
        peers
            .get(node_id)
            .map(|p| p.decayed_score())
            .unwrap_or(DEFAULT_NEUTRAL_TRUST)
    }

    /// Remove a peer from the trust system entirely
    pub fn remove_node(&self, node_id: &PeerId) {
        let mut peers = self.peers.write();
        peers.remove(node_id);
    }

    /// Export current trust state as a serializable snapshot.
    ///
    /// Applies decay to all scores before exporting so the snapshot
    /// reflects the current effective scores.
    pub fn export_snapshot(&self) -> TrustSnapshot {
        let peers_guard = self.peers.read();
        let now_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        let peers = peers_guard
            .iter()
            .map(|(peer_id, peer_trust)| {
                let record = TrustRecord {
                    score: peer_trust.decayed_score(),
                    last_updated_epoch_secs: now_epoch,
                };
                (*peer_id, record)
            })
            .collect();

        TrustSnapshot { peers }
    }

    /// Import trust state from a persisted snapshot.
    ///
    /// Scores are restored as-is with `last_updated` set to now.  Decay does
    /// not run while our node is offline — we can't observe peer behavior
    /// during downtime, so penalising peers for our absence would be wrong.
    /// Decay resumes naturally from the moment the node restarts.
    pub fn import_snapshot(&self, snapshot: &TrustSnapshot) {
        let mut peers_guard = self.peers.write();

        for (peer_id, record) in &snapshot.peers {
            // Guard against NaN/Infinity from corrupted or malicious snapshots —
            // non-finite values would propagate through all EMA/decay calculations.
            let score = if record.score.is_finite() {
                record.score.clamp(MIN_TRUST_SCORE, MAX_TRUST_SCORE)
            } else {
                DEFAULT_NEUTRAL_TRUST
            };
            let peer_trust = PeerTrust {
                score,
                last_updated: Instant::now(),
            };
            peers_guard.insert(*peer_id, peer_trust);
        }
    }

    /// Simulate time passing for a peer (test only).
    ///
    /// Applies decay as if `elapsed` time had passed since the last update.
    /// Uses `apply_decay_secs` directly to avoid `Instant` subtraction,
    /// which panics on Windows when system uptime < `elapsed`.
    #[cfg(test)]
    pub async fn simulate_elapsed(&self, node_id: &PeerId, elapsed: std::time::Duration) {
        let mut peers = self.peers.write();
        if let Some(trust) = peers.get_mut(node_id) {
            trust.apply_decay_secs(elapsed.as_secs_f64());
        }
    }
}

impl Default for TrustEngine {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_unknown_peer_returns_neutral() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();
        assert!((engine.score(&peer) - DEFAULT_NEUTRAL_TRUST).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn test_successes_increase_score() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        for _ in 0..50 {
            engine.update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse);
        }

        let score = engine.score(&peer);
        assert!(
            score > DEFAULT_NEUTRAL_TRUST,
            "Score {score} should be above neutral"
        );
        assert!(score <= MAX_TRUST_SCORE, "Score {score} should be <= max");
    }

    #[tokio::test]
    async fn test_failures_decrease_score() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        for _ in 0..50 {
            engine.update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse);
        }

        let score = engine.score(&peer);
        assert!(
            score < DEFAULT_NEUTRAL_TRUST,
            "Score {score} should be below neutral"
        );
        assert!(score >= MIN_TRUST_SCORE, "Score {score} should be >= min");
    }

    #[tokio::test]
    async fn test_scores_clamped_to_bounds() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        // Many successes — should not exceed MAX
        for _ in 0..1000 {
            engine.update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse);
        }
        let score = engine.score(&peer);
        assert!(score >= MIN_TRUST_SCORE, "Score {score} below min");
        assert!(score <= MAX_TRUST_SCORE, "Score {score} above max");

        // Many failures — should not go below MIN
        for _ in 0..2000 {
            engine.update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse);
        }
        let score = engine.score(&peer);
        assert!(score >= MIN_TRUST_SCORE, "Score {score} below min");
        assert!(score <= MAX_TRUST_SCORE, "Score {score} above max");
    }

    #[tokio::test]
    async fn test_remove_node_resets_to_neutral() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        engine.update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse);
        assert!(engine.score(&peer) < DEFAULT_NEUTRAL_TRUST);

        engine.remove_node(&peer);
        assert!((engine.score(&peer) - DEFAULT_NEUTRAL_TRUST).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn test_ema_blends_observations() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        // First failure moves score below neutral
        engine.update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse);
        let after_fail = engine.score(&peer);
        assert!(after_fail < DEFAULT_NEUTRAL_TRUST);

        // A success moves it back up (but not all the way to neutral)
        engine.update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse);
        let after_success = engine.score(&peer);
        assert!(after_success > after_fail, "Success should increase score");
    }

    /// 3 days of idle time from worst score (0.0) should cross the block threshold (0.15).
    ///
    /// Uses the pure `decay_score` function to avoid `Instant` subtraction,
    /// which panics on Windows if system uptime < the simulated duration.
    #[test]
    fn test_worst_score_unblocks_after_3_days() {
        let three_days_secs = (3 * 24 * 3600) as f64;
        let score = PeerTrust::decay_score(MIN_TRUST_SCORE, three_days_secs);

        assert!(
            score >= 0.15,
            "After 3 days, score {score} should be >= block threshold 0.15",
        );
    }

    /// Just under 3 days should NOT be enough to unblock
    #[test]
    fn test_worst_score_still_blocked_before_3_days() {
        let just_under_3_days = (3 * 24 * 3600 - 3600) as f64; // 3 days minus 1 hour
        let score = PeerTrust::decay_score(MIN_TRUST_SCORE, just_under_3_days);

        assert!(
            score < 0.15,
            "Before 3 days, score {score} should still be < block threshold 0.15",
        );
    }

    #[test]
    fn test_decay_from_high_score_moves_down() {
        let one_week_secs = (7 * 24 * 3600) as f64;
        let score = PeerTrust::decay_score(0.95, one_week_secs);

        assert!(score < 0.95, "Score should have decayed from 0.95");
        assert!(
            score > DEFAULT_NEUTRAL_TRUST,
            "Score should still be above neutral after 1 week"
        );
    }

    #[test]
    fn test_decay_from_low_score_moves_up() {
        let one_week_secs = (7 * 24 * 3600) as f64;
        let score = PeerTrust::decay_score(0.1, one_week_secs);

        assert!(score > 0.1, "Low score should decay upward toward neutral");
    }

    #[tokio::test]
    async fn test_export_import_roundtrip() {
        let engine = TrustEngine::new();
        let peer1 = PeerId::random();
        let peer2 = PeerId::random();

        // Build up some trust
        for _ in 0..20 {
            engine.update_node_stats(&peer1, NodeStatisticsUpdate::CorrectResponse);
        }
        for _ in 0..10 {
            engine.update_node_stats(&peer2, NodeStatisticsUpdate::FailedResponse);
        }

        let score1_before = engine.score(&peer1);
        let score2_before = engine.score(&peer2);

        // Export
        let snapshot = engine.export_snapshot();
        assert_eq!(snapshot.peers.len(), 2);

        // Import into fresh engine
        let engine2 = TrustEngine::new();
        engine2.import_snapshot(&snapshot);

        let score1_after = engine2.score(&peer1);
        let score2_after = engine2.score(&peer2);

        // Scores should be approximately equal (small time drift from test execution)
        assert!(
            (score1_before - score1_after).abs() < 0.01,
            "peer1 score drifted: before={score1_before}, after={score1_after}"
        );
        assert!(
            (score2_before - score2_after).abs() < 0.01,
            "peer2 score drifted: before={score2_before}, after={score2_after}"
        );
    }

    #[tokio::test]
    async fn test_import_preserves_scores_without_decay() {
        // Create a snapshot with a timestamp 1 day in the past.
        // Scores should be restored as-is — no decay for offline time.
        let peer = PeerId::random();
        let one_day_secs: u64 = 86_400;
        let one_day_ago = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs()
            - one_day_secs;

        let snapshot = TrustSnapshot {
            peers: HashMap::from([(
                peer,
                TrustRecord {
                    score: 0.9,
                    last_updated_epoch_secs: one_day_ago,
                },
            )]),
        };

        let engine = TrustEngine::new();
        engine.import_snapshot(&snapshot);

        let score = engine.score(&peer);
        // Score should be restored at 0.9 — offline time doesn't decay
        assert!(
            (score - 0.9).abs() < 0.01,
            "Score {score} should be ~0.9 (no offline decay)"
        );
    }

    #[tokio::test]
    async fn test_import_nan_score_falls_back_to_neutral() {
        let peer = PeerId::random();
        let snapshot = TrustSnapshot {
            peers: HashMap::from([(
                peer,
                TrustRecord {
                    score: f64::NAN,
                    last_updated_epoch_secs: 1_000_000,
                },
            )]),
        };

        let engine = TrustEngine::new();
        engine.import_snapshot(&snapshot);

        let score = engine.score(&peer);
        assert!(
            score.is_finite(),
            "NaN score should have been replaced with a finite value"
        );
        assert!(
            (score - DEFAULT_NEUTRAL_TRUST).abs() < f64::EPSILON,
            "NaN score should fall back to neutral, got {score}"
        );
    }

    #[tokio::test]
    async fn test_import_infinity_score_falls_back_to_neutral() {
        let peer = PeerId::random();
        let snapshot = TrustSnapshot {
            peers: HashMap::from([(
                peer,
                TrustRecord {
                    score: f64::INFINITY,
                    last_updated_epoch_secs: 1_000_000,
                },
            )]),
        };

        let engine = TrustEngine::new();
        engine.import_snapshot(&snapshot);

        let score = engine.score(&peer);
        assert!(
            score.is_finite(),
            "Infinity score should have been replaced with a finite value"
        );
        assert!(
            (score - DEFAULT_NEUTRAL_TRUST).abs() < f64::EPSILON,
            "Infinity score should fall back to neutral, got {score}"
        );
    }

    /// Test: negative weights are rejected and do not corrupt the trust score.
    ///
    /// The `record_weighted` guard (`weight <= 0.0`) prevents negative weights
    /// from reversing the observation direction. This test confirms that
    /// calling `update_node_stats_weighted` with a negative weight is a no-op.
    #[tokio::test]
    async fn test_negative_weight_is_noop() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        let before = engine.score(&peer);

        // Attempt a failure with negative weight — should be rejected
        engine.update_node_stats_weighted(&peer, NodeStatisticsUpdate::FailedResponse, -5.0);
        let after_negative = engine.score(&peer);
        assert!(
            (before - after_negative).abs() < f64::EPSILON,
            "negative weight should be a no-op: before={before}, after={after_negative}"
        );

        // Attempt a success with negative weight — also a no-op
        engine.update_node_stats_weighted(&peer, NodeStatisticsUpdate::CorrectResponse, -1.0);
        let after_negative_success = engine.score(&peer);
        assert!(
            (before - after_negative_success).abs() < f64::EPSILON,
            "negative weight success should be a no-op: before={before}, after={after_negative_success}"
        );

        // Confirm normal weight still works after negative attempts
        engine.update_node_stats_weighted(&peer, NodeStatisticsUpdate::FailedResponse, 1.0);
        let after_valid = engine.score(&peer);
        assert!(
            after_valid < before,
            "valid weight-1 failure should reduce score: before={before}, after={after_valid}"
        );
    }

    /// Test: weighted EMA has larger impact than unit weight
    #[tokio::test]
    async fn test_weighted_ema_larger_impact() {
        let engine = TrustEngine::new();
        let peer_a = PeerId::random();
        let peer_b = PeerId::random();

        // Unit-weight failure for peer A
        engine.update_node_stats_weighted(&peer_a, NodeStatisticsUpdate::FailedResponse, 1.0);
        let score_a = engine.score(&peer_a);

        // Weight-5 failure for peer B
        engine.update_node_stats_weighted(&peer_b, NodeStatisticsUpdate::FailedResponse, 5.0);
        let score_b = engine.score(&peer_b);

        assert!(
            score_b < score_a,
            "weight-5 failure ({score_b}) should produce lower score than weight-1 ({score_a})"
        );
    }

    /// Test: weight-1 weighted path is equivalent to the original unit-weight path
    #[tokio::test]
    async fn test_unit_weight_equivalence() {
        let engine1 = TrustEngine::new();
        let engine2 = TrustEngine::new();
        let peer = PeerId::random();

        engine1.update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse);
        engine2.update_node_stats_weighted(&peer, NodeStatisticsUpdate::FailedResponse, 1.0);

        let diff = (engine1.score(&peer) - engine2.score(&peer)).abs();
        assert!(
            diff < 1e-10,
            "unit-weight paths should be equivalent, diff={diff}"
        );
    }

    // =======================================================================
    // Phase 8: Integration test matrix — missing coverage
    // =======================================================================

    // -----------------------------------------------------------------------
    // Test 54: Consumer penalty degrades trust to blocking
    // -----------------------------------------------------------------------

    /// Repeated high-weight failures should push a peer's trust score below
    /// the block threshold (0.15), eventually making it eligible for eviction.
    #[tokio::test]
    async fn test_consumer_penalty_degrades_to_blocking() {
        /// Block threshold matching the value in adaptive/dht.rs
        const BLOCK_THRESHOLD: f64 = 0.15;

        let engine = TrustEngine::new();
        let peer = PeerId::random();

        // Repeated weight-3 failures from neutral (0.5) should push well below 0.15.
        let failure_count = 10;
        for _ in 0..failure_count {
            engine.update_node_stats_weighted(&peer, NodeStatisticsUpdate::FailedResponse, 3.0);
        }

        let score = engine.score(&peer);
        assert!(
            score < BLOCK_THRESHOLD,
            "after {failure_count} weight-3 failures, score {score} should be below block threshold {BLOCK_THRESHOLD}"
        );
    }

    // -----------------------------------------------------------------------
    // Test 58: Consumer and internal events combine in same EMA
    // -----------------------------------------------------------------------

    /// Internal (weight-1) and consumer-reported (weight-3) events feed the
    /// same EMA. A heavier failure should outweigh a lighter success.
    #[tokio::test]
    async fn test_consumer_and_internal_events_combine() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        // Internal success (unit weight)
        engine.update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse);
        let after_success = engine.score(&peer);
        assert!(
            after_success > DEFAULT_NEUTRAL_TRUST,
            "single success should raise above neutral"
        );

        // Consumer failure with weight 3 — should outweigh the single success
        engine.update_node_stats_weighted(&peer, NodeStatisticsUpdate::FailedResponse, 3.0);
        let after_failure = engine.score(&peer);

        assert!(
            after_failure < after_success,
            "weight-3 failure ({after_failure}) should outweigh weight-1 success ({after_success})"
        );
        assert!(
            after_failure < DEFAULT_NEUTRAL_TRUST,
            "net effect ({after_failure}) should be below neutral ({DEFAULT_NEUTRAL_TRUST})"
        );
    }

    // -----------------------------------------------------------------------
    // Test 59: Consumer trust query reflects all event sources
    // -----------------------------------------------------------------------

    /// `score()` returns a single EMA value shaped by a mix of internal and
    /// consumer-reported events — there is no separate "consumer score."
    #[tokio::test]
    async fn test_trust_query_reflects_all_event_sources() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        // Mix of internal and consumer events
        engine.update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse);
        engine.update_node_stats_weighted(&peer, NodeStatisticsUpdate::CorrectResponse, 2.0);
        engine.update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse);

        // Score should reflect the combined influence, not just internal events.
        let score = engine.score(&peer);
        // With 1 unit-success + 1 weight-2-success + 1 unit-failure, the net
        // effect is positive (3 success-units vs 1 failure-unit).
        assert!(
            score > DEFAULT_NEUTRAL_TRUST,
            "combined score {score} should be above neutral (net positive events)"
        );
    }

    // -----------------------------------------------------------------------
    // Test 63: Time decay applies to consumer events
    // -----------------------------------------------------------------------

    /// Consumer-reported events are subject to the same time decay as internal
    /// events. After enough idle time, the score should decay back toward
    /// neutral (0.5).
    #[tokio::test]
    async fn test_time_decay_applies_to_consumer_events() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        // Apply a consumer failure with weight 3
        engine.update_node_stats_weighted(&peer, NodeStatisticsUpdate::FailedResponse, 3.0);
        let after_failure = engine.score(&peer);
        assert!(
            after_failure < DEFAULT_NEUTRAL_TRUST,
            "after failure, score {after_failure} should be below neutral"
        );

        // Simulate 3+ days of idle time
        let three_days = std::time::Duration::from_secs(3 * 24 * 3600);
        engine.simulate_elapsed(&peer, three_days).await;

        let after_decay = engine.score(&peer);
        assert!(
            after_decay > after_failure,
            "score should decay toward neutral: {after_failure} -> {after_decay}"
        );
        // After 3 days from a moderate failure, the score should be close to neutral.
        let distance_from_neutral = (after_decay - DEFAULT_NEUTRAL_TRUST).abs();
        assert!(
            distance_from_neutral < 0.15,
            "after 3 days, score {after_decay} should be near neutral (distance {distance_from_neutral})"
        );
    }

    // -----------------------------------------------------------------------
    // Test 57: Consumer rewards restore trust protection
    // -----------------------------------------------------------------------

    /// A peer with trust below TRUST_PROTECTION_THRESHOLD (0.7) can be
    /// restored above that threshold by enough consumer success events.
    #[tokio::test]
    async fn test_consumer_rewards_restore_trust_protection() {
        /// Trust protection threshold from core_engine.rs
        const TRUST_PROTECTION_THRESHOLD: f64 = 0.7;

        let engine = TrustEngine::new();
        let peer = PeerId::random();

        // Start below trust protection with some failures
        for _ in 0..5 {
            engine.update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse);
        }
        let low_score = engine.score(&peer);
        assert!(
            low_score < TRUST_PROTECTION_THRESHOLD,
            "peer should start below trust protection: {low_score}"
        );

        // Consumer-reported successes with weight 3 should lift the score
        let success_rounds = 30;
        for _ in 0..success_rounds {
            engine.update_node_stats_weighted(&peer, NodeStatisticsUpdate::CorrectResponse, 3.0);
        }
        let restored_score = engine.score(&peer);
        assert!(
            restored_score >= TRUST_PROTECTION_THRESHOLD,
            "after {success_rounds} weight-3 successes, score {restored_score} should be >= {TRUST_PROTECTION_THRESHOLD}"
        );
    }
}