1use crate::PeerId;
24use crate::adaptive::trust::{NodeStatisticsUpdate, TrustEngine};
25use crate::dht_network_manager::{DhtNetworkConfig, DhtNetworkManager};
26
27use crate::error::P2pResult as Result;
28use serde::{Deserialize, Serialize};
29use std::sync::Arc;
30
31const DEFAULT_BLOCK_THRESHOLD: f64 = 0.15;
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36#[serde(default)]
37pub struct AdaptiveDhtConfig {
38 pub block_threshold: f64,
43}
44
45impl Default for AdaptiveDhtConfig {
46 fn default() -> Self {
47 Self {
48 block_threshold: DEFAULT_BLOCK_THRESHOLD,
49 }
50 }
51}
52
53impl AdaptiveDhtConfig {
54 pub fn validate(&self) -> crate::error::P2pResult<()> {
58 if !(0.0..=1.0).contains(&self.block_threshold) || self.block_threshold.is_nan() {
59 return Err(crate::error::P2PError::Validation(
60 format!(
61 "block_threshold must be in [0.0, 1.0], got {}",
62 self.block_threshold
63 )
64 .into(),
65 ));
66 }
67 Ok(())
68 }
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
78pub enum TrustEvent {
79 SuccessfulResponse,
82 SuccessfulConnection,
84
85 ConnectionFailed,
88 ConnectionTimeout,
90}
91
92impl TrustEvent {
93 fn to_stats_update(self) -> NodeStatisticsUpdate {
95 match self {
96 TrustEvent::SuccessfulResponse | TrustEvent::SuccessfulConnection => {
97 NodeStatisticsUpdate::CorrectResponse
98 }
99 TrustEvent::ConnectionFailed | TrustEvent::ConnectionTimeout => {
100 NodeStatisticsUpdate::FailedResponse
101 }
102 }
103 }
104}
105
106pub struct AdaptiveDHT {
112 dht_manager: Arc<DhtNetworkManager>,
114
115 trust_engine: Arc<TrustEngine>,
117
118 config: AdaptiveDhtConfig,
120}
121
122impl AdaptiveDHT {
123 pub async fn new(
135 transport: Arc<crate::transport_handle::TransportHandle>,
136 mut dht_config: DhtNetworkConfig,
137 adaptive_config: AdaptiveDhtConfig,
138 ) -> Result<Self> {
139 adaptive_config.validate()?;
140
141 dht_config.block_threshold = adaptive_config.block_threshold;
142
143 let trust_engine = Arc::new(TrustEngine::new());
144
145 let dht_manager = Arc::new(
146 DhtNetworkManager::new(transport, Some(trust_engine.clone()), dht_config).await?,
147 );
148
149 Ok(Self {
150 dht_manager,
151 trust_engine,
152 config: adaptive_config,
153 })
154 }
155
156 pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
166 self.trust_engine
167 .update_node_stats(peer_id, event.to_stats_update())
168 .await;
169 }
170
171 pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
175 self.trust_engine.score(peer_id)
176 }
177
178 pub fn trust_engine(&self) -> &Arc<TrustEngine> {
180 &self.trust_engine
181 }
182
183 pub fn config(&self) -> &AdaptiveDhtConfig {
185 &self.config
186 }
187
188 pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
198 &self.dht_manager
199 }
200
201 pub async fn start(&self) -> Result<()> {
207 Arc::clone(&self.dht_manager).start().await
208 }
209
210 pub async fn stop(&self) -> Result<()> {
212 self.dht_manager.stop().await
213 }
214}
215
216#[cfg(test)]
217mod tests {
218 use super::*;
219 use crate::adaptive::trust::DEFAULT_NEUTRAL_TRUST;
220
221 #[test]
222 fn test_trust_event_mapping() {
223 assert!(matches!(
225 TrustEvent::SuccessfulResponse.to_stats_update(),
226 NodeStatisticsUpdate::CorrectResponse
227 ));
228 assert!(matches!(
229 TrustEvent::SuccessfulConnection.to_stats_update(),
230 NodeStatisticsUpdate::CorrectResponse
231 ));
232
233 assert!(matches!(
235 TrustEvent::ConnectionFailed.to_stats_update(),
236 NodeStatisticsUpdate::FailedResponse
237 ));
238 assert!(matches!(
239 TrustEvent::ConnectionTimeout.to_stats_update(),
240 NodeStatisticsUpdate::FailedResponse
241 ));
242 }
243
244 #[test]
245 fn test_adaptive_dht_config_defaults() {
246 let config = AdaptiveDhtConfig::default();
247 assert!((config.block_threshold - DEFAULT_BLOCK_THRESHOLD).abs() < f64::EPSILON);
248 }
249
250 #[test]
251 fn test_block_threshold_validation_rejects_invalid() {
252 for &bad in &[-0.1, 1.1, f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
253 let config = AdaptiveDhtConfig {
254 block_threshold: bad,
255 };
256 assert!(
257 config.validate().is_err(),
258 "block_threshold {bad} should fail validation"
259 );
260 }
261 }
262
263 #[test]
264 fn test_block_threshold_validation_accepts_valid() {
265 for &good in &[0.0, 0.15, 0.5, 1.0] {
266 let config = AdaptiveDhtConfig {
267 block_threshold: good,
268 };
269 assert!(
270 config.validate().is_ok(),
271 "block_threshold {good} should pass validation"
272 );
273 }
274 }
275
276 #[tokio::test]
282 async fn test_trust_events_affect_scores() {
283 let engine = Arc::new(TrustEngine::new());
284 let peer = PeerId::random();
285
286 assert!((engine.score(&peer) - DEFAULT_NEUTRAL_TRUST).abs() < f64::EPSILON);
288
289 for _ in 0..10 {
291 engine
292 .update_node_stats(&peer, TrustEvent::SuccessfulResponse.to_stats_update())
293 .await;
294 }
295
296 assert!(engine.score(&peer) > DEFAULT_NEUTRAL_TRUST);
297 }
298
299 #[tokio::test]
301 async fn test_failures_reduce_trust_below_block_threshold() {
302 let engine = Arc::new(TrustEngine::new());
303 let bad_peer = PeerId::random();
304
305 for _ in 0..20 {
307 engine
308 .update_node_stats(&bad_peer, TrustEvent::ConnectionFailed.to_stats_update())
309 .await;
310 }
311
312 let trust = engine.score(&bad_peer);
313 assert!(
314 trust < DEFAULT_BLOCK_THRESHOLD,
315 "Bad peer trust {trust} should be below block threshold {DEFAULT_BLOCK_THRESHOLD}"
316 );
317 }
318
319 #[tokio::test]
321 async fn test_trust_scores_bounded() {
322 let engine = Arc::new(TrustEngine::new());
323 let peer = PeerId::random();
324
325 for _ in 0..100 {
326 engine
327 .update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse)
328 .await;
329 }
330
331 let score = engine.score(&peer);
332 assert!(score >= 0.0, "Score must be >= 0.0, got {score}");
333 assert!(score <= 1.0, "Score must be <= 1.0, got {score}");
334 }
335
336 #[test]
338 fn test_all_trust_events_produce_valid_updates() {
339 let events = [
340 TrustEvent::SuccessfulResponse,
341 TrustEvent::SuccessfulConnection,
342 TrustEvent::ConnectionFailed,
343 TrustEvent::ConnectionTimeout,
344 ];
345
346 for event in events {
347 let _update = event.to_stats_update();
349 }
350 }
351
352 #[tokio::test]
358 async fn test_peer_lifecycle_block_and_recovery() {
359 let engine = TrustEngine::new();
360 let peer = PeerId::random();
361
362 assert!(
364 engine.score(&peer) >= DEFAULT_BLOCK_THRESHOLD,
365 "New peer should not be blocked"
366 );
367
368 for _ in 0..20 {
370 engine
371 .update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse)
372 .await;
373 }
374 let good_score = engine.score(&peer);
375 assert!(
376 good_score > DEFAULT_NEUTRAL_TRUST,
377 "Trusted peer: {good_score}"
378 );
379
380 for _ in 0..200 {
382 engine
383 .update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse)
384 .await;
385 }
386 let bad_score = engine.score(&peer);
387 assert!(
388 bad_score < DEFAULT_BLOCK_THRESHOLD,
389 "After many failures, peer should be blocked: {bad_score}"
390 );
391
392 let three_days = std::time::Duration::from_secs(3 * 24 * 3600);
394 engine.simulate_elapsed(&peer, three_days).await;
395 let recovered_score = engine.score(&peer);
396 assert!(
397 recovered_score >= DEFAULT_BLOCK_THRESHOLD,
398 "After 3 days idle, peer should be unblocked: {recovered_score}"
399 );
400 }
401
402 #[tokio::test]
404 async fn test_block_threshold_is_binary() {
405 let engine = TrustEngine::new();
406 let threshold = DEFAULT_BLOCK_THRESHOLD;
407
408 let peer_above = PeerId::random();
409 let peer_below = PeerId::random();
410
411 for _ in 0..5 {
413 engine
414 .update_node_stats(&peer_above, NodeStatisticsUpdate::CorrectResponse)
415 .await;
416 }
417 assert!(
418 engine.score(&peer_above) >= threshold,
419 "Peer with successes should be above threshold"
420 );
421
422 for _ in 0..50 {
424 engine
425 .update_node_stats(&peer_below, NodeStatisticsUpdate::FailedResponse)
426 .await;
427 }
428 assert!(
429 engine.score(&peer_below) < threshold,
430 "Peer with only failures should be below threshold"
431 );
432
433 let unknown = PeerId::random();
435 assert!(
436 engine.score(&unknown) >= threshold,
437 "Unknown peer at neutral should not be blocked"
438 );
439 }
440
441 #[tokio::test]
443 async fn test_single_failure_does_not_block() {
444 let engine = TrustEngine::new();
445 let peer = PeerId::random();
446
447 engine
448 .update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse)
449 .await;
450
451 assert!(
453 engine.score(&peer) >= DEFAULT_BLOCK_THRESHOLD,
454 "One failure from neutral should not block: {}",
455 engine.score(&peer)
456 );
457 }
458
459 #[tokio::test]
461 async fn test_trusted_peer_resilient_to_occasional_failures() {
462 let engine = TrustEngine::new();
463 let peer = PeerId::random();
464
465 for _ in 0..50 {
467 engine
468 .update_node_stats(&peer, NodeStatisticsUpdate::CorrectResponse)
469 .await;
470 }
471 let trusted_score = engine.score(&peer);
472
473 for _ in 0..3 {
475 engine
476 .update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse)
477 .await;
478 }
479
480 assert!(
481 engine.score(&peer) >= DEFAULT_BLOCK_THRESHOLD,
482 "3 failures after 50 successes should not block: {}",
483 engine.score(&peer)
484 );
485 assert!(
486 engine.score(&peer) < trusted_score,
487 "Score should have decreased"
488 );
489 }
490
491 #[tokio::test]
493 async fn test_removed_peer_starts_fresh() {
494 let engine = TrustEngine::new();
495 let peer = PeerId::random();
496
497 for _ in 0..100 {
499 engine
500 .update_node_stats(&peer, NodeStatisticsUpdate::FailedResponse)
501 .await;
502 }
503 assert!(engine.score(&peer) < DEFAULT_BLOCK_THRESHOLD);
504
505 engine.remove_node(&peer).await;
507 assert!(
508 (engine.score(&peer) - DEFAULT_NEUTRAL_TRUST).abs() < f64::EPSILON,
509 "Removed peer should return to neutral"
510 );
511 }
512}