Skip to main content

saorsa_core/adaptive/
dht.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! AdaptiveDHT — the trust boundary for all DHT operations.
15//!
16//! `AdaptiveDHT` is the **sole component** that creates and owns the [`TrustEngine`].
17//! All DHT operations flow through it, and all trust signals originate from it.
18//!
19//! Internal DHT operations (iterative lookups) record trust via the `TrustEngine`
20//! reference passed to `DhtNetworkManager`. External callers report additional
21//! trust signals through [`AdaptiveDHT::report_trust_event`].
22
23use crate::adaptive::trust::{NodeStatisticsUpdate, TrustEngine};
24use crate::dht_network_manager::{DhtNetworkConfig, DhtNetworkManager};
25use crate::{MultiAddr, PeerId};
26
27use crate::error::P2pResult as Result;
28use serde::{Deserialize, Serialize};
29use std::sync::Arc;
30
/// Default trust score threshold below which a peer is evicted and blocked.
///
/// This is the `block_threshold` value produced by `AdaptiveDhtConfig::default()`.
const DEFAULT_BLOCK_THRESHOLD: f64 = 0.15;
33
/// Configuration for the AdaptiveDHT layer.
///
/// The container-level `#[serde(default)]` lets serialized input omit fields;
/// any omitted field takes its value from the `Default` implementation.
/// Deserialization performs no range checking on its own — call
/// [`AdaptiveDhtConfig::validate`] to check the values.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AdaptiveDhtConfig {
    /// Trust score below which a peer is evicted from the routing table
    /// and blocked from sending DHT messages or being re-added to the RT.
    /// Eviction is immediate when a peer's score crosses this threshold.
    ///
    /// Must lie in `[0.0, 1.0]` to pass validation.
    /// Default: 0.15
    pub block_threshold: f64,
}
44
45impl Default for AdaptiveDhtConfig {
46    fn default() -> Self {
47        Self {
48            block_threshold: DEFAULT_BLOCK_THRESHOLD,
49        }
50    }
51}
52
53impl AdaptiveDhtConfig {
54    /// Validate that all config values are within acceptable ranges.
55    ///
56    /// Returns `Err` if `block_threshold` is outside `[0.0, 1.0]` or is NaN.
57    pub fn validate(&self) -> crate::error::P2pResult<()> {
58        if !(0.0..=1.0).contains(&self.block_threshold) || self.block_threshold.is_nan() {
59            return Err(crate::error::P2PError::Validation(
60                format!(
61                    "block_threshold must be in [0.0, 1.0], got {}",
62                    self.block_threshold
63                )
64                .into(),
65            ));
66        }
67        Ok(())
68    }
69}
70
/// Trust-relevant events observable by the saorsa-core network layer.
///
/// Each variant maps to an internal [`NodeStatisticsUpdate`] with appropriate severity.
/// Only events that saorsa-core can directly observe are included here.
/// Application-level events (data verification, storage checks) belong in
/// the consuming application and should be added when that layer exists.
///
/// The current mapping is coarse: both positive variants become
/// `NodeStatisticsUpdate::CorrectResponse` and both negative variants become
/// `NodeStatisticsUpdate::FailedResponse`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TrustEvent {
    // === Positive signals ===
    /// Peer provided a correct response to a request
    /// (recorded as `CorrectResponse`).
    SuccessfulResponse,
    /// Peer connection was established and authenticated
    /// (recorded as `CorrectResponse`).
    SuccessfulConnection,

    // === Negative signals ===
    /// Could not establish a connection to the peer
    /// (recorded as `FailedResponse`).
    ConnectionFailed,
    /// Connection attempt timed out
    /// (recorded as `FailedResponse`).
    ConnectionTimeout,
}
91
92impl TrustEvent {
93    /// Convert a TrustEvent to the internal NodeStatisticsUpdate
94    fn to_stats_update(self) -> NodeStatisticsUpdate {
95        match self {
96            TrustEvent::SuccessfulResponse | TrustEvent::SuccessfulConnection => {
97                NodeStatisticsUpdate::CorrectResponse
98            }
99            TrustEvent::ConnectionFailed | TrustEvent::ConnectionTimeout => {
100                NodeStatisticsUpdate::FailedResponse
101            }
102        }
103    }
104}
105
/// AdaptiveDHT — the trust boundary for all DHT operations.
///
/// Owns the `TrustEngine` and `DhtNetworkManager`. All DHT operations
/// should go through this component. Application-level trust signals
/// are reported via [`report_trust_event`](Self::report_trust_event).
///
/// Both components are held behind `Arc`, and the accessor methods hand out
/// references to those `Arc`s so callers can clone a handle if they need one.
pub struct AdaptiveDHT {
    /// The underlying DHT network manager (handles raw DHT operations)
    dht_manager: Arc<DhtNetworkManager>,

    /// The trust engine — sole authority on peer trust scores.
    /// The same engine instance is also handed to the DHT manager at construction.
    trust_engine: Arc<TrustEngine>,

    /// Configuration for trust-weighted behavior (validated in `new`)
    config: AdaptiveDhtConfig,
}
121
122impl AdaptiveDHT {
123    /// Create a new AdaptiveDHT instance.
124    ///
125    /// This creates the `TrustEngine` and the `DhtNetworkManager` with the
126    /// trust engine injected. Call [`start`](Self::start) to begin DHT
127    /// operations. Trust scores are computed live — peers are evicted
128    /// immediately when their score crosses the block threshold.
129    ///
130    /// # Errors
131    ///
132    /// Returns an error if `block_threshold` is not in `[0.0, 1.0]` or if
133    /// the underlying `DhtNetworkManager` fails to initialise.
134    pub async fn new(
135        transport: Arc<crate::transport_handle::TransportHandle>,
136        mut dht_config: DhtNetworkConfig,
137        adaptive_config: AdaptiveDhtConfig,
138    ) -> Result<Self> {
139        adaptive_config.validate()?;
140
141        dht_config.block_threshold = adaptive_config.block_threshold;
142
143        let trust_engine = Arc::new(TrustEngine::new());
144
145        let dht_manager = Arc::new(
146            DhtNetworkManager::new(transport, Some(trust_engine.clone()), dht_config).await?,
147        );
148
149        Ok(Self {
150            dht_manager,
151            trust_engine,
152            config: adaptive_config,
153        })
154    }
155
156    // =========================================================================
157    // Trust API — the only place where external callers record trust events
158    // =========================================================================
159
160    /// Report a trust event for a peer.
161    ///
162    /// Records a network-observable outcome (connection success/failure)
163    /// that the DHT layer did not record automatically. See [`TrustEvent`]
164    /// for the supported variants.
165    pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
166        self.trust_engine
167            .update_node_stats(peer_id, event.to_stats_update())
168            .await;
169    }
170
171    /// Get the current trust score for a peer (synchronous).
172    ///
173    /// Returns `DEFAULT_NEUTRAL_TRUST` (0.5) for unknown peers.
174    pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
175        self.trust_engine.score(peer_id)
176    }
177
178    /// Get a reference to the underlying trust engine for advanced use cases.
179    pub fn trust_engine(&self) -> &Arc<TrustEngine> {
180        &self.trust_engine
181    }
182
183    /// Get the adaptive DHT configuration.
184    pub fn config(&self) -> &AdaptiveDhtConfig {
185        &self.config
186    }
187
188    // =========================================================================
189    // DHT operations — delegates to DhtNetworkManager
190    // =========================================================================
191
192    /// Get the underlying DHT network manager.
193    ///
194    /// All DHT operations are accessible through this reference.
195    /// The DHT manager records trust internally for per-peer outcomes
196    /// during iterative lookups.
197    pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
198        &self.dht_manager
199    }
200
201    /// Start the DHT manager.
202    ///
203    /// Trust scores are computed live — no background tasks needed.
204    /// Peers are evicted from the routing table immediately when their
205    /// trust drops below the block threshold.
206    pub async fn start(&self) -> Result<()> {
207        Arc::clone(&self.dht_manager).start().await
208    }
209
210    /// Stop the DHT manager gracefully.
211    pub async fn stop(&self) -> Result<()> {
212        self.dht_manager.stop().await
213    }
214
215    /// Look up connectable addresses for a peer.
216    ///
217    /// Checks the DHT routing table first, then falls back to the transport
218    /// layer. Returns an empty vec when the peer is unknown or has no dialable
219    /// addresses.
220    pub(crate) async fn peer_addresses_for_dial(&self, peer_id: &PeerId) -> Vec<MultiAddr> {
221        self.dht_manager.peer_addresses_for_dial(peer_id).await
222    }
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adaptive::trust::DEFAULT_NEUTRAL_TRUST;

    /// Feeds `count` updates built by `make_update` into `engine` for `peer`,
    /// awaiting each one before issuing the next.
    async fn record_repeated(
        engine: &TrustEngine,
        peer: &PeerId,
        count: usize,
        make_update: impl Fn() -> NodeStatisticsUpdate,
    ) {
        for _ in 0..count {
            engine.update_node_stats(peer, make_update()).await;
        }
    }

    #[test]
    fn test_trust_event_mapping() {
        // Both success variants collapse onto CorrectResponse.
        for event in [TrustEvent::SuccessfulResponse, TrustEvent::SuccessfulConnection] {
            assert!(matches!(event.to_stats_update(), NodeStatisticsUpdate::CorrectResponse));
        }

        // Both failure variants collapse onto FailedResponse.
        for event in [TrustEvent::ConnectionFailed, TrustEvent::ConnectionTimeout] {
            assert!(matches!(event.to_stats_update(), NodeStatisticsUpdate::FailedResponse));
        }
    }

    #[test]
    fn test_adaptive_dht_config_defaults() {
        let AdaptiveDhtConfig { block_threshold } = AdaptiveDhtConfig::default();
        let delta = (block_threshold - DEFAULT_BLOCK_THRESHOLD).abs();
        assert!(delta < f64::EPSILON);
    }

    #[test]
    fn test_block_threshold_validation_rejects_invalid() {
        // Out-of-range values on both sides, plus every non-finite value.
        let rejected = [-0.1, 1.1, f64::NAN, f64::INFINITY, f64::NEG_INFINITY];
        for bad in rejected {
            let outcome = AdaptiveDhtConfig {
                block_threshold: bad,
            }
            .validate();
            assert!(
                outcome.is_err(),
                "block_threshold {bad} should fail validation"
            );
        }
    }

    #[test]
    fn test_block_threshold_validation_accepts_valid() {
        // Both interval endpoints are included, along with interior values.
        let accepted = [0.0, 0.15, 0.5, 1.0];
        for good in accepted {
            let outcome = AdaptiveDhtConfig {
                block_threshold: good,
            }
            .validate();
            assert!(
                outcome.is_ok(),
                "block_threshold {good} should pass validation"
            );
        }
    }

    // =========================================================================
    // Integration tests: full trust signal flow
    // =========================================================================

    /// Trust events converted via `to_stats_update` reach the TrustEngine and
    /// move the score right away.
    #[tokio::test]
    async fn test_trust_events_affect_scores() {
        let engine = Arc::new(TrustEngine::new());
        let peer = PeerId::random();

        // A peer the engine has never seen sits at neutral trust.
        let initial = engine.score(&peer);
        assert!((initial - DEFAULT_NEUTRAL_TRUST).abs() < f64::EPSILON);

        // Ten successes should lift the score above neutral.
        record_repeated(&engine, &peer, 10, || {
            TrustEvent::SuccessfulResponse.to_stats_update()
        })
        .await;

        assert!(engine.score(&peer) > DEFAULT_NEUTRAL_TRUST);
    }

    /// A run of failures pushes trust under the block threshold.
    #[tokio::test]
    async fn test_failures_reduce_trust_below_block_threshold() {
        let engine = Arc::new(TrustEngine::new());
        let bad_peer = PeerId::random();

        // Nothing but failures is recorded for this peer.
        record_repeated(&engine, &bad_peer, 20, || {
            TrustEvent::ConnectionFailed.to_stats_update()
        })
        .await;

        let trust = engine.score(&bad_peer);
        assert!(
            trust < DEFAULT_BLOCK_THRESHOLD,
            "Bad peer trust {trust} should be below block threshold {DEFAULT_BLOCK_THRESHOLD}"
        );
    }

    /// TrustEngine scores stay inside 0.0-1.0 even after many successes.
    #[tokio::test]
    async fn test_trust_scores_bounded() {
        let engine = Arc::new(TrustEngine::new());
        let peer = PeerId::random();

        record_repeated(&engine, &peer, 100, || NodeStatisticsUpdate::CorrectResponse).await;

        let score = engine.score(&peer);
        assert!(score >= 0.0, "Score must be >= 0.0, got {score}");
        assert!(score <= 1.0, "Score must be <= 1.0, got {score}");
    }

    /// Every TrustEvent variant converts to a stats update without panicking.
    #[test]
    fn test_all_trust_events_produce_valid_updates() {
        for event in [
            TrustEvent::SuccessfulResponse,
            TrustEvent::SuccessfulConnection,
            TrustEvent::ConnectionFailed,
            TrustEvent::ConnectionTimeout,
        ] {
            // The conversion merely has to complete.
            let _update = event.to_stats_update();
        }
    }

    // =========================================================================
    // End-to-end: peer lifecycle from trusted to blocked to unblocked
    // =========================================================================

    /// Full lifecycle: good peer → fails → blocked → time passes → unblocked
    #[tokio::test]
    async fn test_peer_lifecycle_block_and_recovery() {
        const THREE_DAYS_SECS: u64 = 3 * 24 * 3600;

        let engine = TrustEngine::new();
        let peer = PeerId::random();

        // Phase 1: a brand-new peer must not already be blocked.
        assert!(
            engine.score(&peer) >= DEFAULT_BLOCK_THRESHOLD,
            "New peer should not be blocked"
        );

        // Phase 2: a batch of successes earns trust above neutral.
        record_repeated(&engine, &peer, 20, || NodeStatisticsUpdate::CorrectResponse).await;
        let good_score = engine.score(&peer);
        assert!(
            good_score > DEFAULT_NEUTRAL_TRUST,
            "Trusted peer: {good_score}"
        );

        // Phase 3: sustained failures drag the score under the threshold.
        record_repeated(&engine, &peer, 200, || NodeStatisticsUpdate::FailedResponse).await;
        let bad_score = engine.score(&peer);
        assert!(
            bad_score < DEFAULT_BLOCK_THRESHOLD,
            "After many failures, peer should be blocked: {bad_score}"
        );

        // Phase 4: after three simulated idle days the peer is expected to
        // be back at or above the threshold.
        let three_days = std::time::Duration::from_secs(THREE_DAYS_SECS);
        engine.simulate_elapsed(&peer, three_days).await;
        let recovered_score = engine.score(&peer);
        assert!(
            recovered_score >= DEFAULT_BLOCK_THRESHOLD,
            "After 3 days idle, peer should be unblocked: {recovered_score}"
        );
    }

    /// The block threshold acts as a binary gate: at/above passes, below blocks.
    #[tokio::test]
    async fn test_block_threshold_is_binary() {
        let engine = TrustEngine::new();
        let threshold = DEFAULT_BLOCK_THRESHOLD;

        let peer_above = PeerId::random();
        let peer_below = PeerId::random();

        // A handful of successes keeps a peer on the allowed side.
        record_repeated(&engine, &peer_above, 5, || {
            NodeStatisticsUpdate::CorrectResponse
        })
        .await;
        assert!(
            engine.score(&peer_above) >= threshold,
            "Peer with successes should be above threshold"
        );

        // Nothing but failures lands a peer on the blocked side.
        record_repeated(&engine, &peer_below, 50, || {
            NodeStatisticsUpdate::FailedResponse
        })
        .await;
        assert!(
            engine.score(&peer_below) < threshold,
            "Peer with only failures should be below threshold"
        );

        // A never-seen peer is neutral, which is on the allowed side.
        let unknown = PeerId::random();
        assert!(
            engine.score(&unknown) >= threshold,
            "Unknown peer at neutral should not be blocked"
        );
    }

    /// One isolated failure must not be enough to block a peer.
    #[tokio::test]
    async fn test_single_failure_does_not_block() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        record_repeated(&engine, &peer, 1, || NodeStatisticsUpdate::FailedResponse).await;

        // Starting from neutral (0.5), a single failure is expected to leave
        // the score near 0.45 — comfortably above 0.15.
        assert!(
            engine.score(&peer) >= DEFAULT_BLOCK_THRESHOLD,
            "One failure from neutral should not block: {}",
            engine.score(&peer)
        );
    }

    /// A peer with an established good record tolerates a few failures.
    #[tokio::test]
    async fn test_trusted_peer_resilient_to_occasional_failures() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        // Establish a solid track record first.
        record_repeated(&engine, &peer, 50, || NodeStatisticsUpdate::CorrectResponse).await;
        let trusted_score = engine.score(&peer);

        // Then introduce a small number of failures.
        record_repeated(&engine, &peer, 3, || NodeStatisticsUpdate::FailedResponse).await;

        assert!(
            engine.score(&peer) >= DEFAULT_BLOCK_THRESHOLD,
            "3 failures after 50 successes should not block: {}",
            engine.score(&peer)
        );
        assert!(
            engine.score(&peer) < trusted_score,
            "Score should have decreased"
        );
    }

    /// Removing a peer wipes its history, so it reads as neutral again.
    #[tokio::test]
    async fn test_removed_peer_starts_fresh() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        // Drive the peer under the block threshold.
        record_repeated(&engine, &peer, 100, || NodeStatisticsUpdate::FailedResponse).await;
        assert!(engine.score(&peer) < DEFAULT_BLOCK_THRESHOLD);

        // After removal the engine should report neutral trust for it.
        engine.remove_node(&peer).await;
        let delta = (engine.score(&peer) - DEFAULT_NEUTRAL_TRUST).abs();
        assert!(delta < f64::EPSILON, "Removed peer should return to neutral");
    }
}