1use crate::adaptive::trust::{NodeStatisticsUpdate, TrustEngine};
24use crate::dht_network_manager::{DhtNetworkConfig, DhtNetworkManager};
25use crate::{MultiAddr, PeerId};
26
27use crate::error::P2pResult as Result;
28use serde::{Deserialize, Serialize};
29use std::sync::Arc;
30
31const DEFAULT_BLOCK_THRESHOLD: f64 = 0.15;
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36#[serde(default)]
37pub struct AdaptiveDhtConfig {
38 pub block_threshold: f64,
43}
44
45impl Default for AdaptiveDhtConfig {
46 fn default() -> Self {
47 Self {
48 block_threshold: DEFAULT_BLOCK_THRESHOLD,
49 }
50 }
51}
52
53impl AdaptiveDhtConfig {
54 pub fn validate(&self) -> crate::error::P2pResult<()> {
58 if !(0.0..=1.0).contains(&self.block_threshold) || self.block_threshold.is_nan() {
59 return Err(crate::error::P2PError::Validation(
60 format!(
61 "block_threshold must be in [0.0, 1.0], got {}",
62 self.block_threshold
63 )
64 .into(),
65 ));
66 }
67 Ok(())
68 }
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
78pub enum TrustEvent {
79 SuccessfulResponse,
82 SuccessfulConnection,
84
85 ConnectionFailed,
88 ConnectionTimeout,
90}
91
92impl TrustEvent {
93 fn to_stats_update(self) -> NodeStatisticsUpdate {
95 match self {
96 TrustEvent::SuccessfulResponse | TrustEvent::SuccessfulConnection => {
97 NodeStatisticsUpdate::CorrectResponse
98 }
99 TrustEvent::ConnectionFailed | TrustEvent::ConnectionTimeout => {
100 NodeStatisticsUpdate::FailedResponse
101 }
102 }
103 }
104}
105
106pub struct AdaptiveDHT {
112 dht_manager: Arc<DhtNetworkManager>,
114
115 trust_engine: Arc<TrustEngine>,
117
118 config: AdaptiveDhtConfig,
120}
121
122impl AdaptiveDHT {
123 pub async fn new(
135 transport: Arc<crate::transport_handle::TransportHandle>,
136 mut dht_config: DhtNetworkConfig,
137 adaptive_config: AdaptiveDhtConfig,
138 ) -> Result<Self> {
139 adaptive_config.validate()?;
140
141 dht_config.block_threshold = adaptive_config.block_threshold;
142
143 let trust_engine = Arc::new(TrustEngine::new());
144
145 let dht_manager = Arc::new(
146 DhtNetworkManager::new(transport, Some(trust_engine.clone()), dht_config).await?,
147 );
148
149 Ok(Self {
150 dht_manager,
151 trust_engine,
152 config: adaptive_config,
153 })
154 }
155
156 pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
166 self.trust_engine
167 .update_node_stats(peer_id, event.to_stats_update())
168 .await;
169 }
170
171 pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
175 self.trust_engine.score(peer_id)
176 }
177
178 pub fn trust_engine(&self) -> &Arc<TrustEngine> {
180 &self.trust_engine
181 }
182
183 pub fn config(&self) -> &AdaptiveDhtConfig {
185 &self.config
186 }
187
188 pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
198 &self.dht_manager
199 }
200
201 pub async fn start(&self) -> Result<()> {
207 Arc::clone(&self.dht_manager).start().await
208 }
209
210 pub async fn stop(&self) -> Result<()> {
212 self.dht_manager.stop().await
213 }
214
215 pub(crate) async fn peer_addresses_for_dial(&self, peer_id: &PeerId) -> Vec<MultiAddr> {
221 self.dht_manager.peer_addresses_for_dial(peer_id).await
222 }
223}
224
225#[cfg(test)]
226mod tests {
227 use super::*;
228 use crate::adaptive::trust::DEFAULT_NEUTRAL_TRUST;
229
230 #[test]
231 fn test_trust_event_mapping() {
232 assert!(matches!(
234 TrustEvent::SuccessfulResponse.to_stats_update(),
235 NodeStatisticsUpdate::CorrectResponse
236 ));
237 assert!(matches!(
238 TrustEvent::SuccessfulConnection.to_stats_update(),
239 NodeStatisticsUpdate::CorrectResponse
240 ));
241
242 assert!(matches!(
244 TrustEvent::ConnectionFailed.to_stats_update(),
245 NodeStatisticsUpdate::FailedResponse
246 ));
247 assert!(matches!(
248 TrustEvent::ConnectionTimeout.to_stats_update(),
249 NodeStatisticsUpdate::FailedResponse
250 ));
251 }
252
253 #[test]
254 fn test_adaptive_dht_config_defaults() {
255 let config = AdaptiveDhtConfig::default();
256 assert!((config.block_threshold - DEFAULT_BLOCK_THRESHOLD).abs() < f64::EPSILON);
257 }
258
259 #[test]
260 fn test_block_threshold_validation_rejects_invalid() {
261 for &bad in &[-0.1, 1.1, f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
262 let config = AdaptiveDhtConfig {
263 block_threshold: bad,
264 };
265 assert!(
266 config.validate().is_err(),
267 "block_threshold {bad} should fail validation"
268 );
269 }
270 }
271
272 #[test]
273 fn test_block_threshold_validation_accepts_valid() {
274 for &good in &[0.0, 0.15, 0.5, 1.0] {
275 let config = AdaptiveDhtConfig {
276 block_threshold: good,
277 };
278 assert!(
279 config.validate().is_ok(),
280 "block_threshold {good} should pass validation"
281 );
282 }
283 }
284
285 #[tokio::test]
291 async fn test_trust_events_affect_scores() {
292 let engine = Arc::new(TrustEngine::new());
293 let peer = PeerId::random();
294
295 assert!((engine.score(&peer) - DEFAULT_NEUTRAL_TRUST).abs() < f64::EPSILON);
297
298 for _ in 0..10 {
300 engine
301 .update_node_stats(&peer, TrustEvent::SuccessfulResponse.to_stats_update())
302 .await;
303 }
304
305 assert!(engine.score(&peer) > DEFAULT_NEUTRAL_TRUST);
306 }
307
308 #[tokio::test]
310 async fn test_failures_reduce_trust_below_block_threshold() {
311 let engine = Arc::new(TrustEngine::new());
312 let bad_peer = PeerId::random();
313
314 for _ in 0..20 {
316 engine
317 .update_node_stats(&bad_peer, TrustEvent::ConnectionFailed.to_stats_update())
318 .await;
319 }
320
321 let trust = engine.score(&bad_peer);
322 assert!(
323 trust < DEFAULT_BLOCK_THRESHOLD,
324 "Bad peer trust {trust} should be below block threshold {DEFAULT_BLOCK_THRESHOLD}"
325 );
326 }
327
328 #[tokio::test]
330 async fn test_trust_scores_bounded() {
331 let engine = Arc::new(TrustEngine::new());
332 let peer = PeerId::random();
333
334 for _ in 0..100 {
335 engine
336 .update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse)
337 .await;
338 }
339
340 let score = engine.score(&peer);
341 assert!(score >= 0.0, "Score must be >= 0.0, got {score}");
342 assert!(score <= 1.0, "Score must be <= 1.0, got {score}");
343 }
344
345 #[test]
347 fn test_all_trust_events_produce_valid_updates() {
348 let events = [
349 TrustEvent::SuccessfulResponse,
350 TrustEvent::SuccessfulConnection,
351 TrustEvent::ConnectionFailed,
352 TrustEvent::ConnectionTimeout,
353 ];
354
355 for event in events {
356 let _update = event.to_stats_update();
358 }
359 }
360
361 #[tokio::test]
367 async fn test_peer_lifecycle_block_and_recovery() {
368 let engine = TrustEngine::new();
369 let peer = PeerId::random();
370
371 assert!(
373 engine.score(&peer) >= DEFAULT_BLOCK_THRESHOLD,
374 "New peer should not be blocked"
375 );
376
377 for _ in 0..20 {
379 engine
380 .update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse)
381 .await;
382 }
383 let good_score = engine.score(&peer);
384 assert!(
385 good_score > DEFAULT_NEUTRAL_TRUST,
386 "Trusted peer: {good_score}"
387 );
388
389 for _ in 0..200 {
391 engine
392 .update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse)
393 .await;
394 }
395 let bad_score = engine.score(&peer);
396 assert!(
397 bad_score < DEFAULT_BLOCK_THRESHOLD,
398 "After many failures, peer should be blocked: {bad_score}"
399 );
400
401 let three_days = std::time::Duration::from_secs(3 * 24 * 3600);
403 engine.simulate_elapsed(&peer, three_days).await;
404 let recovered_score = engine.score(&peer);
405 assert!(
406 recovered_score >= DEFAULT_BLOCK_THRESHOLD,
407 "After 3 days idle, peer should be unblocked: {recovered_score}"
408 );
409 }
410
411 #[tokio::test]
413 async fn test_block_threshold_is_binary() {
414 let engine = TrustEngine::new();
415 let threshold = DEFAULT_BLOCK_THRESHOLD;
416
417 let peer_above = PeerId::random();
418 let peer_below = PeerId::random();
419
420 for _ in 0..5 {
422 engine
423 .update_node_stats(&peer_above, NodeStatisticsUpdate::CorrectResponse)
424 .await;
425 }
426 assert!(
427 engine.score(&peer_above) >= threshold,
428 "Peer with successes should be above threshold"
429 );
430
431 for _ in 0..50 {
433 engine
434 .update_node_stats(&peer_below, NodeStatisticsUpdate::FailedResponse)
435 .await;
436 }
437 assert!(
438 engine.score(&peer_below) < threshold,
439 "Peer with only failures should be below threshold"
440 );
441
442 let unknown = PeerId::random();
444 assert!(
445 engine.score(&unknown) >= threshold,
446 "Unknown peer at neutral should not be blocked"
447 );
448 }
449
450 #[tokio::test]
452 async fn test_single_failure_does_not_block() {
453 let engine = TrustEngine::new();
454 let peer = PeerId::random();
455
456 engine
457 .update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse)
458 .await;
459
460 assert!(
462 engine.score(&peer) >= DEFAULT_BLOCK_THRESHOLD,
463 "One failure from neutral should not block: {}",
464 engine.score(&peer)
465 );
466 }
467
468 #[tokio::test]
470 async fn test_trusted_peer_resilient_to_occasional_failures() {
471 let engine = TrustEngine::new();
472 let peer = PeerId::random();
473
474 for _ in 0..50 {
476 engine
477 .update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse)
478 .await;
479 }
480 let trusted_score = engine.score(&peer);
481
482 for _ in 0..3 {
484 engine
485 .update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse)
486 .await;
487 }
488
489 assert!(
490 engine.score(&peer) >= DEFAULT_BLOCK_THRESHOLD,
491 "3 failures after 50 successes should not block: {}",
492 engine.score(&peer)
493 );
494 assert!(
495 engine.score(&peer) < trusted_score,
496 "Score should have decreased"
497 );
498 }
499
500 #[tokio::test]
502 async fn test_removed_peer_starts_fresh() {
503 let engine = TrustEngine::new();
504 let peer = PeerId::random();
505
506 for _ in 0..100 {
508 engine
509 .update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse)
510 .await;
511 }
512 assert!(engine.score(&peer) < DEFAULT_BLOCK_THRESHOLD);
513
514 engine.remove_node(&peer).await;
516 assert!(
517 (engine.score(&peer) - DEFAULT_NEUTRAL_TRUST).abs() < f64::EPSILON,
518 "Removed peer should return to neutral"
519 );
520 }
521}