// saorsa_core/adaptive/dht.rs

// Copyright 2024 Saorsa Labs Limited
//
// This software is dual-licensed under:
// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
// - Commercial License
//
// For AGPL-3.0 license, see LICENSE-AGPL-3.0
// For commercial licensing, contact: david@saorsalabs.com
//
// Unless required by applicable law or agreed to in writing, software
// distributed under these licenses is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

//! AdaptiveDHT — the trust boundary for all DHT operations.
//!
//! `AdaptiveDHT` is the **sole component** that creates and owns the [`TrustEngine`].
//! All DHT operations flow through it, and all trust signals originate from it.
//!
//! Internal DHT operations (iterative lookups) record trust via the `TrustEngine`
//! reference passed to `DhtNetworkManager`. External callers report additional
//! trust signals through [`AdaptiveDHT::report_trust_event`].

use crate::PeerId;
use crate::adaptive::trust::{NodeStatisticsUpdate, TrustEngine};
use crate::dht_network_manager::{DhtNetworkConfig, DhtNetworkManager};

use crate::error::P2pResult as Result;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

/// Default trust score threshold below which a peer is evicted and blocked.
///
/// Used as the default value of `AdaptiveDhtConfig::block_threshold`.
const DEFAULT_BLOCK_THRESHOLD: f64 = 0.15;

34/// Configuration for the AdaptiveDHT layer
35#[derive(Debug, Clone, Serialize, Deserialize)]
36#[serde(default)]
37pub struct AdaptiveDhtConfig {
38    /// Trust score below which a peer is evicted from the routing table
39    /// and blocked from sending DHT messages or being re-added to the RT.
40    /// Eviction is immediate when a peer's score crosses this threshold.
41    /// Default: 0.15
42    pub block_threshold: f64,
43}
44
45impl Default for AdaptiveDhtConfig {
46    fn default() -> Self {
47        Self {
48            block_threshold: DEFAULT_BLOCK_THRESHOLD,
49        }
50    }
51}
52
53impl AdaptiveDhtConfig {
54    /// Validate that all config values are within acceptable ranges.
55    ///
56    /// Returns `Err` if `block_threshold` is outside `[0.0, 1.0]` or is NaN.
57    pub fn validate(&self) -> crate::error::P2pResult<()> {
58        if !(0.0..=1.0).contains(&self.block_threshold) || self.block_threshold.is_nan() {
59            return Err(crate::error::P2PError::Validation(
60                format!(
61                    "block_threshold must be in [0.0, 1.0], got {}",
62                    self.block_threshold
63                )
64                .into(),
65            ));
66        }
67        Ok(())
68    }
69}
70
/// Trust-relevant events observable by the saorsa-core network layer.
///
/// Each variant maps to an internal [`NodeStatisticsUpdate`] with appropriate severity.
/// Only events that saorsa-core can directly observe are included here.
/// Application-level events (data verification, storage checks) belong in
/// the consuming application and should be added when that layer exists.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TrustEvent {
    // === Positive signals ===
    /// Peer provided a correct response to a request.
    SuccessfulResponse,
    /// Peer connection was established and authenticated.
    SuccessfulConnection,

    // === Negative signals ===
    /// Could not establish a connection to the peer.
    ConnectionFailed,
    /// Connection attempt timed out.
    ConnectionTimeout,
}

92impl TrustEvent {
93    /// Convert a TrustEvent to the internal NodeStatisticsUpdate
94    fn to_stats_update(self) -> NodeStatisticsUpdate {
95        match self {
96            TrustEvent::SuccessfulResponse | TrustEvent::SuccessfulConnection => {
97                NodeStatisticsUpdate::CorrectResponse
98            }
99            TrustEvent::ConnectionFailed | TrustEvent::ConnectionTimeout => {
100                NodeStatisticsUpdate::FailedResponse
101            }
102        }
103    }
104}
105
106/// AdaptiveDHT — the trust boundary for all DHT operations.
107///
108/// Owns the `TrustEngine` and `DhtNetworkManager`. All DHT operations
109/// should go through this component. Application-level trust signals
110/// are reported via [`report_trust_event`](Self::report_trust_event).
111pub struct AdaptiveDHT {
112    /// The underlying DHT network manager (handles raw DHT operations)
113    dht_manager: Arc<DhtNetworkManager>,
114
115    /// The trust engine — sole authority on peer trust scores
116    trust_engine: Arc<TrustEngine>,
117
118    /// Configuration for trust-weighted behavior
119    config: AdaptiveDhtConfig,
120}
121
122impl AdaptiveDHT {
123    /// Create a new AdaptiveDHT instance.
124    ///
125    /// This creates the `TrustEngine` and the `DhtNetworkManager` with the
126    /// trust engine injected. Call [`start`](Self::start) to begin DHT
127    /// operations. Trust scores are computed live — peers are evicted
128    /// immediately when their score crosses the block threshold.
129    ///
130    /// # Errors
131    ///
132    /// Returns an error if `block_threshold` is not in `[0.0, 1.0]` or if
133    /// the underlying `DhtNetworkManager` fails to initialise.
134    pub async fn new(
135        transport: Arc<crate::transport_handle::TransportHandle>,
136        mut dht_config: DhtNetworkConfig,
137        adaptive_config: AdaptiveDhtConfig,
138    ) -> Result<Self> {
139        adaptive_config.validate()?;
140
141        dht_config.block_threshold = adaptive_config.block_threshold;
142
143        let trust_engine = Arc::new(TrustEngine::new());
144
145        let dht_manager = Arc::new(
146            DhtNetworkManager::new(transport, Some(trust_engine.clone()), dht_config).await?,
147        );
148
149        Ok(Self {
150            dht_manager,
151            trust_engine,
152            config: adaptive_config,
153        })
154    }
155
156    // =========================================================================
157    // Trust API — the only place where external callers record trust events
158    // =========================================================================
159
160    /// Report a trust event for a peer.
161    ///
162    /// Records a network-observable outcome (connection success/failure)
163    /// that the DHT layer did not record automatically. See [`TrustEvent`]
164    /// for the supported variants.
165    pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
166        self.trust_engine
167            .update_node_stats(peer_id, event.to_stats_update())
168            .await;
169    }
170
171    /// Get the current trust score for a peer (synchronous).
172    ///
173    /// Returns `DEFAULT_NEUTRAL_TRUST` (0.5) for unknown peers.
174    pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
175        self.trust_engine.score(peer_id)
176    }
177
178    /// Get a reference to the underlying trust engine for advanced use cases.
179    pub fn trust_engine(&self) -> &Arc<TrustEngine> {
180        &self.trust_engine
181    }
182
183    /// Get the adaptive DHT configuration.
184    pub fn config(&self) -> &AdaptiveDhtConfig {
185        &self.config
186    }
187
188    // =========================================================================
189    // DHT operations — delegates to DhtNetworkManager
190    // =========================================================================
191
192    /// Get the underlying DHT network manager.
193    ///
194    /// All DHT operations are accessible through this reference.
195    /// The DHT manager records trust internally for per-peer outcomes
196    /// during iterative lookups.
197    pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
198        &self.dht_manager
199    }
200
201    /// Start the DHT manager.
202    ///
203    /// Trust scores are computed live — no background tasks needed.
204    /// Peers are evicted from the routing table immediately when their
205    /// trust drops below the block threshold.
206    pub async fn start(&self) -> Result<()> {
207        Arc::clone(&self.dht_manager).start().await
208    }
209
210    /// Stop the DHT manager gracefully.
211    pub async fn stop(&self) -> Result<()> {
212        self.dht_manager.stop().await
213    }
214}
215
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adaptive::trust::DEFAULT_NEUTRAL_TRUST;

    #[test]
    fn test_trust_event_mapping() {
        // Positive events map to CorrectResponse
        assert!(matches!(
            TrustEvent::SuccessfulResponse.to_stats_update(),
            NodeStatisticsUpdate::CorrectResponse
        ));
        assert!(matches!(
            TrustEvent::SuccessfulConnection.to_stats_update(),
            NodeStatisticsUpdate::CorrectResponse
        ));

        // Failure events map to FailedResponse
        assert!(matches!(
            TrustEvent::ConnectionFailed.to_stats_update(),
            NodeStatisticsUpdate::FailedResponse
        ));
        assert!(matches!(
            TrustEvent::ConnectionTimeout.to_stats_update(),
            NodeStatisticsUpdate::FailedResponse
        ));
    }

    #[test]
    fn test_adaptive_dht_config_defaults() {
        let config = AdaptiveDhtConfig::default();
        assert!((config.block_threshold - DEFAULT_BLOCK_THRESHOLD).abs() < f64::EPSILON);
    }

    #[test]
    fn test_block_threshold_validation_rejects_invalid() {
        for &bad in &[-0.1, 1.1, f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
            let config = AdaptiveDhtConfig {
                block_threshold: bad,
            };
            assert!(
                config.validate().is_err(),
                "block_threshold {bad} should fail validation"
            );
        }
    }

    #[test]
    fn test_block_threshold_validation_accepts_valid() {
        for &good in &[0.0, 0.15, 0.5, 1.0] {
            let config = AdaptiveDhtConfig {
                block_threshold: good,
            };
            assert!(
                config.validate().is_ok(),
                "block_threshold {good} should pass validation"
            );
        }
    }

    // =========================================================================
    // Integration tests: full trust signal flow
    // =========================================================================

    /// Test: trust events flow through to TrustEngine and change scores immediately
    #[tokio::test]
    async fn test_trust_events_affect_scores() {
        let engine = Arc::new(TrustEngine::new());
        let peer = PeerId::random();

        // Unknown peer starts at neutral trust
        assert!((engine.score(&peer) - DEFAULT_NEUTRAL_TRUST).abs() < f64::EPSILON);

        // Record successes — score should rise above neutral
        for _ in 0..10 {
            engine
                .update_node_stats(&peer, TrustEvent::SuccessfulResponse.to_stats_update())
                .await;
        }

        assert!(engine.score(&peer) > DEFAULT_NEUTRAL_TRUST);
    }

    /// Test: failures reduce trust below block threshold
    #[tokio::test]
    async fn test_failures_reduce_trust_below_block_threshold() {
        let engine = Arc::new(TrustEngine::new());
        let bad_peer = PeerId::random();

        // Record only failures — score should fall below the block threshold
        for _ in 0..20 {
            engine
                .update_node_stats(&bad_peer, TrustEvent::ConnectionFailed.to_stats_update())
                .await;
        }

        let trust = engine.score(&bad_peer);
        assert!(
            trust < DEFAULT_BLOCK_THRESHOLD,
            "Bad peer trust {trust} should be below block threshold {DEFAULT_BLOCK_THRESHOLD}"
        );
    }

    /// Test: TrustEngine scores are bounded 0.0-1.0
    #[tokio::test]
    async fn test_trust_scores_bounded() {
        let engine = Arc::new(TrustEngine::new());
        let peer = PeerId::random();

        for _ in 0..100 {
            engine
                .update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse)
                .await;
        }

        let score = engine.score(&peer);
        assert!(score >= 0.0, "Score must be >= 0.0, got {score}");
        assert!(score <= 1.0, "Score must be <= 1.0, got {score}");
    }

    /// Test: all TrustEvent variants produce valid stats updates
    #[test]
    fn test_all_trust_events_produce_valid_updates() {
        let events = [
            TrustEvent::SuccessfulResponse,
            TrustEvent::SuccessfulConnection,
            TrustEvent::ConnectionFailed,
            TrustEvent::ConnectionTimeout,
        ];

        for event in events {
            // Should not panic
            let _update = event.to_stats_update();
        }
    }

    // =========================================================================
    // End-to-end: peer lifecycle from trusted to blocked to unblocked
    // =========================================================================

    /// Full lifecycle: good peer → fails → blocked → time passes → unblocked
    #[tokio::test]
    async fn test_peer_lifecycle_block_and_recovery() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        // Phase 1: Peer starts at neutral
        assert!(
            engine.score(&peer) >= DEFAULT_BLOCK_THRESHOLD,
            "New peer should not be blocked"
        );

        // Phase 2: Some successes — peer is trusted
        for _ in 0..20 {
            engine
                .update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse)
                .await;
        }
        let good_score = engine.score(&peer);
        assert!(
            good_score > DEFAULT_NEUTRAL_TRUST,
            "Trusted peer: {good_score}"
        );

        // Phase 3: Peer starts failing — score drops
        for _ in 0..200 {
            engine
                .update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse)
                .await;
        }
        let bad_score = engine.score(&peer);
        assert!(
            bad_score < DEFAULT_BLOCK_THRESHOLD,
            "After many failures, peer should be blocked: {bad_score}"
        );

        // Phase 4: Time passes (3 days simulated) — score decays back toward neutral
        let three_days = std::time::Duration::from_secs(3 * 24 * 3600);
        engine.simulate_elapsed(&peer, three_days).await;
        let recovered_score = engine.score(&peer);
        assert!(
            recovered_score >= DEFAULT_BLOCK_THRESHOLD,
            "After 3 days idle, peer should be unblocked: {recovered_score}"
        );
    }

    /// Verify the block threshold works as a binary gate
    #[tokio::test]
    async fn test_block_threshold_is_binary() {
        let engine = TrustEngine::new();
        let threshold = DEFAULT_BLOCK_THRESHOLD;

        let peer_above = PeerId::random();
        let peer_below = PeerId::random();

        // Peer with some successes — above threshold
        for _ in 0..5 {
            engine
                .update_node_stats(&peer_above, NodeStatisticsUpdate::CorrectResponse)
                .await;
        }
        assert!(
            engine.score(&peer_above) >= threshold,
            "Peer with successes should be above threshold"
        );

        // Peer with only failures — below threshold
        for _ in 0..50 {
            engine
                .update_node_stats(&peer_below, NodeStatisticsUpdate::FailedResponse)
                .await;
        }
        assert!(
            engine.score(&peer_below) < threshold,
            "Peer with only failures should be below threshold"
        );

        // Unknown peer — at neutral, which is above threshold
        let unknown = PeerId::random();
        assert!(
            engine.score(&unknown) >= threshold,
            "Unknown peer at neutral should not be blocked"
        );
    }

    /// Verify that a single failure doesn't immediately block a peer
    #[tokio::test]
    async fn test_single_failure_does_not_block() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        engine
            .update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse)
            .await;

        // A single failure from neutral (0.5) should give ~0.45, still above 0.15
        assert!(
            engine.score(&peer) >= DEFAULT_BLOCK_THRESHOLD,
            "One failure from neutral should not block: {}",
            engine.score(&peer)
        );
    }

    /// Verify that a previously-trusted peer needs many failures to get blocked
    #[tokio::test]
    async fn test_trusted_peer_resilient_to_occasional_failures() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        // Build up trust
        for _ in 0..50 {
            engine
                .update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse)
                .await;
        }
        let trusted_score = engine.score(&peer);

        // A few failures shouldn't block
        for _ in 0..3 {
            engine
                .update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse)
                .await;
        }

        assert!(
            engine.score(&peer) >= DEFAULT_BLOCK_THRESHOLD,
            "3 failures after 50 successes should not block: {}",
            engine.score(&peer)
        );
        assert!(
            engine.score(&peer) < trusted_score,
            "Score should have decreased"
        );
    }

    /// Verify removing a peer resets their state completely
    #[tokio::test]
    async fn test_removed_peer_starts_fresh() {
        let engine = TrustEngine::new();
        let peer = PeerId::random();

        // Block the peer
        for _ in 0..100 {
            engine
                .update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse)
                .await;
        }
        assert!(engine.score(&peer) < DEFAULT_BLOCK_THRESHOLD);

        // Remove and check — should be back to neutral
        engine.remove_node(&peer).await;
        assert!(
            (engine.score(&peer) - DEFAULT_NEUTRAL_TRUST).abs() < f64::EPSILON,
            "Removed peer should return to neutral"
        );
    }
}