1use core::fmt;
2use std::collections::HashMap;
3use std::time::{Duration, Instant};
4
5use rand::distributions::weighted::WeightedIndex;
6use rand::distributions::Distribution;
7use rand::Rng;
8use tracing::debug;
9
10use malachitebft_peer::PeerId;
11
12pub mod ema;
13pub mod metrics;
14
15use metrics::Metrics;
16
/// Outcome of a single sync interaction with a peer.
///
/// Values of this type are handed to [`ScoringStrategy::update_score`] so the
/// strategy can derive the peer's next score.
#[derive(Copy, Clone, Debug)]
pub enum SyncResult {
    /// The interaction succeeded. The payload is a duration; the tests in this
    /// file treat it as the peer's response time.
    Success(Duration),

    /// The interaction timed out.
    Timeout,

    /// The interaction failed.
    Failure,
}
29
/// Numeric score of a peer.
///
/// [`PeerScorer::select_peer`] uses scores (clamped below at `0.0`) as sampling
/// weights, so a higher score means a higher chance of being selected.
pub type Score = f64;
31
/// Pluggable policy that decides how peer scores start out and evolve.
///
/// [`PeerScorer`] stores one boxed strategy and consults it for every peer.
/// Implementations must be `Send + Sync`.
pub trait ScoringStrategy: Send + Sync {
    /// Returns the score of a peer for which nothing has been recorded yet.
    ///
    /// [`PeerScorer`] calls this when a peer has no entry in its score table.
    fn initial_score(&self, peer_id: PeerId) -> Score;

    /// Computes the next score from `previous_score` and the latest `result`.
    ///
    /// Takes `&mut self`, so an implementation may keep internal state across
    /// updates. The peer id is not passed in; [`PeerScorer`] shares one strategy
    /// instance across all peers.
    fn update_score(&mut self, previous_score: Score, result: SyncResult) -> Score;
}
46
/// Identifier of a scoring strategy.
///
/// NOTE(review): this enum is not referenced anywhere else in this file;
/// presumably it is used by configuration code elsewhere — confirm.
#[derive(Copy, Clone, Debug, Default)]
pub enum Strategy {
    /// The default variant. Presumably maps to `ema::ExponentialMovingAverage`
    /// — TODO confirm against the code that consumes this enum.
    #[default]
    Ema,
}
53
54#[derive(Copy, Clone)]
55pub struct PeerScore {
56 score: Score,
57 last_update: Instant,
58}
59
60impl PeerScore {
61 pub fn new(score: Score) -> Self {
62 Self {
63 score,
64 last_update: Instant::now(),
65 }
66 }
67}
68
69impl fmt::Debug for PeerScore {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 fn round_score(score: Score) -> f64 {
73 (score * 1000.0).round() / 1000.0
74 }
75
76 f.debug_struct("PeerScore")
77 .field("score", &round_score(self.score))
78 .field("last_update", &self.last_update.elapsed())
79 .finish()
80 }
81}
82
/// Keeps a score per peer and selects peers at random, weighted by score.
pub struct PeerScorer {
    /// Recorded scores, keyed by peer. A peer without an entry is treated as
    /// having the strategy's initial score.
    scores: HashMap<PeerId, PeerScore>,
    /// Strategy used to produce initial scores and to compute score updates.
    strategy: Box<dyn ScoringStrategy>,
}
88
89impl PeerScorer {
90 pub fn new(strategy: impl ScoringStrategy + 'static) -> Self {
92 Self {
93 scores: HashMap::new(),
94 strategy: Box::new(strategy),
95 }
96 }
97
98 pub fn update_score_with_metrics(
101 &mut self,
102 peer_id: PeerId,
103 result: SyncResult,
104 metrics: &Metrics,
105 ) -> Score {
106 let new_score = self.update_score(peer_id, result);
107 metrics.observe_score(peer_id, new_score);
108 new_score
109 }
110
111 pub fn update_score(&mut self, peer_id: PeerId, result: SyncResult) -> Score {
114 let peer_score = self
115 .scores
116 .entry(peer_id)
117 .or_insert_with(|| PeerScore::new(self.strategy.initial_score(peer_id)));
118
119 let previous_score = peer_score.score;
120
121 debug!("Updating score for peer {peer_id}");
122 debug!(" Result = {result:?}");
123 debug!(" Prev = {previous_score}");
124
125 let new_score = self.strategy.update_score(previous_score, result);
126 debug!(" New = {new_score}");
127
128 peer_score.score = new_score;
129 peer_score.last_update = Instant::now();
130
131 new_score
132 }
133
134 pub fn get_score(&self, peer_id: &PeerId) -> Score {
136 self.scores
137 .get(peer_id)
138 .map(|p| p.score)
139 .unwrap_or(self.strategy.initial_score(*peer_id))
140 }
141
142 pub fn get_scores(&self) -> &HashMap<PeerId, PeerScore> {
144 &self.scores
145 }
146
147 pub fn select_peer<R: Rng>(&self, peers: &[PeerId], rng: &mut R) -> Option<PeerId> {
149 if peers.is_empty() {
150 return None;
151 }
152
153 let scores = peers.iter().map(|id| self.get_score(id).max(0.0));
154
155 let distr = WeightedIndex::new(scores).ok()?;
157 let index = distr.sample(rng);
158
159 assert!(index < peers.len(), "Index out of bounds");
160 Some(peers[index])
161 }
162
163 pub fn reset_inactive_peers_scores(&mut self, inactive_threshold: Duration) {
172 let now = Instant::now();
173
174 self.scores
175 .retain(|_, score| now.duration_since(score.last_update) < inactive_threshold);
176 }
177}
178
179impl Default for PeerScorer {
180 fn default() -> Self {
181 Self::new(ema::ExponentialMovingAverage::default())
182 }
183}
184
185#[cfg(test)]
186mod tests {
187 use super::*;
188 use arbtest::arbtest;
189 use rand::rngs::StdRng;
190 use rand::SeedableRng;
191 use std::collections::HashSet;
192 use std::ops::RangeInclusive;
193
194 use arbtest::arbitrary::{Result, Unstructured};
195
196 fn arb_response_time(u: &mut Unstructured) -> Result<Duration> {
197 u.int_in_range(10..=5000).map(Duration::from_millis)
198 }
199
200 fn arb_response_time_fast(u: &mut Unstructured, slow_threshold: Duration) -> Result<Duration> {
201 let max = slow_threshold.as_millis() as u64 - 10;
202 u.int_in_range(10..=max).map(Duration::from_millis)
203 }
204
205 fn arb_response_time_slow(u: &mut Unstructured, slow_threshold: Duration) -> Result<Duration> {
206 let min = slow_threshold.as_millis() as u64;
207 let max = slow_threshold.as_millis() as u64 * 5;
208 u.int_in_range(min..=max).map(Duration::from_millis)
209 }
210
211 fn arb_sync_result(u: &mut Unstructured) -> Result<SyncResult> {
212 let result_type = u.int_in_range(0..=2)?;
213
214 Ok(match result_type {
215 0 => SyncResult::Success(arb_response_time(u)?),
216 1 => SyncResult::Timeout,
217 2 => SyncResult::Failure,
218 _ => unreachable!(),
219 })
220 }
221
222 fn arb_sync_result_success_fast(
223 u: &mut Unstructured,
224 slow_threshold: Duration,
225 ) -> Result<SyncResult> {
226 Ok(SyncResult::Success(arb_response_time_fast(
227 u,
228 slow_threshold,
229 )?))
230 }
231
232 fn arb_sync_result_failure(u: &mut Unstructured) -> Result<SyncResult> {
233 let result_type = u.int_in_range(0..=1)?;
234 Ok(match result_type {
235 0 => SyncResult::Timeout,
236 1 => SyncResult::Failure,
237 _ => unreachable!(),
238 })
239 }
240
241 fn arb_strategy(u: &mut Unstructured) -> Result<ema::ExponentialMovingAverage> {
242 let alpha_success = u.choose(&[0.20, 0.25, 0.30])?;
243 let alpha_timeout = u.choose(&[0.10, 0.15, 0.20])?;
244 let alpha_failure = u.choose(&[0.10, 0.15, 0.20])?;
245 let slow_threshold = u.int_in_range(1000..=5000)?;
246
247 Ok(ema::ExponentialMovingAverage::new(
248 *alpha_success,
249 *alpha_timeout,
250 *alpha_failure,
251 Duration::from_millis(slow_threshold),
252 ))
253 }
254
255 fn arb_vec<T>(
256 u: &mut Unstructured,
257 f: impl Fn(&mut Unstructured) -> Result<T>,
258 size: RangeInclusive<usize>,
259 ) -> Result<Vec<T>> {
260 let size = u.int_in_range(size)?;
261 (0..size).map(|_| f(u)).collect::<Result<Vec<T>>>()
262 }
263
264 #[test]
266 fn scores_are_bounded() {
267 arbtest(|u| {
268 let strategy = arb_strategy(u)?;
269 let results = arb_vec(
270 u,
271 |u| arb_sync_result_success_fast(u, strategy.slow_threshold),
272 10..=100,
273 )?;
274
275 let mut scorer = PeerScorer::new(strategy);
276 let peer_id = PeerId::random();
277
278 let initial_score = scorer.get_score(&peer_id);
280 assert!((0.0..=1.0).contains(&initial_score));
281
282 for result in results {
284 scorer.update_score(peer_id, result);
285 let score = scorer.get_score(&peer_id);
286 assert!(
287 (0.0..=1.0).contains(&score),
288 "Score {score} is out of bounds after result {result:?}",
289 );
290 }
291
292 Ok(())
293 });
294
295 arbtest(|u| {
296 let strategy = arb_strategy(u)?;
297 let results = arb_vec(u, arb_sync_result_failure, 10..=100)?;
298
299 let mut scorer = PeerScorer::new(strategy);
300 let peer_id = PeerId::random();
301
302 let initial_score = scorer.get_score(&peer_id);
304 assert!((0.0..=1.0).contains(&initial_score));
305
306 for result in results {
308 scorer.update_score(peer_id, result);
309 let score = scorer.get_score(&peer_id);
310 assert!(
311 (0.0..=1.0).contains(&score),
312 "Score {score} is out of bounds after result {result:?}",
313 );
314 }
315
316 Ok(())
317 });
318 }
319
320 #[test]
322 fn fast_responses_improve_score() {
323 arbtest(|u| {
324 let strategy = arb_strategy(u)?;
325 let response_time = arb_response_time_fast(u, strategy.slow_threshold)?;
326
327 let mut scorer = PeerScorer::new(strategy);
328 let peer_id = PeerId::random();
329
330 let initial_score = scorer.get_score(&peer_id);
331 let update_score = scorer
332 .strategy
333 .update_score(initial_score, SyncResult::Success(response_time));
334
335 assert!(
336 update_score > initial_score,
337 "Fast response decreased score: {initial_score} -> {update_score}",
338 );
339
340 Ok(())
341 });
342 }
343
344 #[test]
346 fn slow_responses_decrease_high_score() {
347 arbtest(|u| {
348 let strategy = arb_strategy(u)?;
349 let response_time = arb_response_time_slow(u, strategy.slow_threshold)?;
350
351 let mut scorer = PeerScorer::new(strategy);
352
353 let initial_score = 1.0;
354 let update_score = scorer
355 .strategy
356 .update_score(initial_score, SyncResult::Success(response_time));
357
358 assert!(
359 update_score < initial_score,
360 "Slow response ({response_time:?}) should decrease score: {initial_score} -> {update_score}",
361 );
362
363 Ok(())
364 });
365 }
366
367 #[test]
369 fn failures_decrease_score() {
370 arbtest(|u| {
371 let strategy = arb_strategy(u)?;
372 let failure_type = u.choose(&[SyncResult::Timeout, SyncResult::Failure])?;
373
374 let mut scorer = PeerScorer::new(strategy);
375 let peer_id = PeerId::random();
376
377 let initial_score = scorer.get_score(&peer_id);
378 let update_score = scorer.strategy.update_score(initial_score, *failure_type);
379
380 assert!(
381 update_score < initial_score,
382 "Failure/timeout should decrease score: {initial_score} -> {update_score} for {failure_type:?}",
383 );
384
385 Ok(())
386 });
387 }
388
389 #[test]
391 fn peer_selection_is_deterministic() {
392 arbtest(|u| {
393 let peer_count = u.int_in_range(2usize..=10)?;
394 let seed = u.arbitrary()?;
395 let results = arb_vec(u, arb_sync_result, 0..=50)?;
396
397 let peers: Vec<_> = (0..peer_count).map(|_| PeerId::random()).collect();
398
399 let mut scorer1 = PeerScorer::default();
400 let mut scorer2 = PeerScorer::default();
401
402 for (i, result) in results.into_iter().enumerate() {
404 let peer_id = peers[i % peers.len()];
405 scorer1.update_score(peer_id, result);
406 scorer2.update_score(peer_id, result);
407 }
408
409 let mut rng1 = StdRng::seed_from_u64(seed);
411 let mut rng2 = StdRng::seed_from_u64(seed);
412
413 for _ in 0..10 {
414 let selection1 = scorer1.select_peer(&peers, &mut rng1);
415 let selection2 = scorer2.select_peer(&peers, &mut rng2);
416 assert_eq!(selection1, selection2);
417 }
418
419 Ok(())
420 });
421 }
422
423 #[test]
425 fn all_peers_selectable() {
426 arbtest(|u| {
427 let peer_count = u.int_in_range(2_usize..=6)?;
428 let results = arb_vec(u, arb_sync_result, 0..=20)?;
429
430 let peers: Vec<PeerId> = (0..peer_count).map(|_| PeerId::random()).collect();
431 let mut scorer = PeerScorer::default();
432
433 for (i, result) in results.iter().enumerate() {
435 let peer_id = peers[i % peers.len()];
436 scorer.update_score(peer_id, *result);
437 }
438
439 let mut rng = StdRng::seed_from_u64(42);
441 let mut selected_peers = HashSet::new();
442
443 for _ in 0..1000 {
444 if let Some(selected) = scorer.select_peer(&peers, &mut rng) {
445 selected_peers.insert(selected);
446 }
447 }
448
449 let selection_ratio = selected_peers.len() as f64 / peers.len() as f64;
452 assert!(
453 selection_ratio >= 0.8,
454 "Only {}/{} peers were selected",
455 selected_peers.len(),
456 peers.len()
457 );
458
459 Ok(())
460 });
461 }
462
463 #[test]
465 fn higher_scores_selected_more_frequently() {
466 arbtest(|u| {
467 let good_results = arb_vec(
468 u,
469 |u| {
470 u.choose_iter([
471 SyncResult::Success(Duration::from_millis(50)),
472 SyncResult::Success(Duration::from_millis(100)),
473 ])
474 },
475 5..=15,
476 )?;
477
478 let bad_results = arb_vec(
479 u,
480 |u| u.choose_iter([SyncResult::Timeout, SyncResult::Failure]),
481 5..=15,
482 )?;
483
484 let good_peer = PeerId::random();
485 let bad_peer = PeerId::random();
486 assert_ne!(good_peer, bad_peer, "Peers should be distinct");
487
488 let peers = vec![good_peer, bad_peer];
489
490 let mut scorer = PeerScorer::default();
491
492 for result in good_results {
494 scorer.update_score(good_peer, result);
495 }
496
497 for result in bad_results {
499 scorer.update_score(bad_peer, result);
500 }
501
502 let good_score = scorer.get_score(&good_peer);
503 let bad_score = scorer.get_score(&bad_peer);
504
505 if good_score > bad_score + 0.1 {
507 let mut rng = StdRng::seed_from_u64(123);
508 let mut good_selections = 0;
509 let mut bad_selections = 0;
510
511 for _ in 0..1000 {
512 match scorer.select_peer(&peers, &mut rng) {
513 Some(peer) if peer == good_peer => good_selections += 1,
514 Some(peer) if peer == bad_peer => bad_selections += 1,
515 _ => {}
516 }
517 }
518
519 assert!(
520 good_selections > bad_selections,
521 "Good peer (score: {good_score}) selected {good_selections} times, bad peer (score: {bad_score}) selected {bad_selections} times"
522 );
523 }
524
525 Ok(())
526 });
527 }
528
529 #[test]
532 #[ignore]
533 fn monotonic_score_updates() {
534 arbtest(|u| {
535 let strategy = arb_strategy(u)?;
536 let result = arb_sync_result(u)?;
537 let update_count = u.int_in_range(1_usize..=20)?;
538
539 let mut scorer = PeerScorer::new(strategy);
540 let mut current_score = scorer.strategy.initial_score(PeerId::random());
541 let mut scores = vec![current_score];
542
543 println!(
544 "Testing monotonicity for result {:?} over {} updates, threshold={:?}",
545 result, update_count, strategy.slow_threshold
546 );
547
548 for _ in 0..update_count {
549 current_score = scorer.strategy.update_score(current_score, result);
550 scores.push(current_score);
551 }
552
553 match result {
555 SyncResult::Success(rt) if rt < strategy.slow_threshold => {
556 for window in scores.windows(2) {
558 let diff = window[1] - window[0];
559 assert!(
560 diff >= 0.0,
561 "Fast response ({rt:?}) should improve score: {} -> {}",
562 window[0],
563 window[1]
564 );
565 }
566 }
567 SyncResult::Success(rt) => {
568 for window in scores.windows(2) {
570 assert!(
571 window[1] <= window[0],
572 "Slow response ({rt:?}) should decrease score: {} -> {}",
573 window[0],
574 window[1]
575 );
576 }
577 }
578 SyncResult::Timeout | SyncResult::Failure => {
579 for window in scores.windows(2) {
581 assert!(
582 window[1] <= window[0],
583 "Timeouts and failures should decrease score: {} -> {}",
584 window[0],
585 window[1]
586 );
587 }
588 }
589 }
590
591 Ok(())
592 });
593 }
594
595 #[test]
597 fn empty_peer_list_returns_none() {
598 arbtest(|u| {
599 let seed = u.arbitrary()?;
600
601 let scorer = PeerScorer::default();
602 let mut rng = StdRng::seed_from_u64(seed);
603 let result = scorer.select_peer(&[], &mut rng);
604 assert_eq!(result, None);
605 Ok(())
606 });
607 }
608
609 #[test]
611 fn single_peer_always_selected() {
612 arbtest(|u| {
613 let seed = u.arbitrary()?;
614 let results = arb_vec(u, arb_sync_result, 0..=10)?;
615
616 let peer = PeerId::random();
617 let peers = vec![peer];
618 let mut scorer = PeerScorer::default();
619
620 for result in results {
622 scorer.update_score(peer, result);
623 }
624
625 let mut rng = StdRng::seed_from_u64(seed);
626 for _ in 0..10 {
627 let selected = scorer.select_peer(&peers, &mut rng);
628 assert_eq!(selected, Some(peer));
629 }
630
631 Ok(())
632 });
633 }
634
635 #[test]
637 fn response_time_affects_success_score() {
638 arbtest(|u| {
639 let strategy = arb_strategy(u)?;
640 let fast_time = u.int_in_range(10_u64..=100)?;
641 let slow_time = u.int_in_range(1000_u64..=5000)?;
642
643 let mut scorer = PeerScorer::new(strategy);
644 let initial_score = scorer.strategy.initial_score(PeerId::random());
645
646 let fast_result = SyncResult::Success(Duration::from_millis(fast_time));
647 let slow_result = SyncResult::Success(Duration::from_millis(slow_time));
648
649 let fast_score = scorer.strategy.update_score(initial_score, fast_result);
650 let slow_score = scorer.strategy.update_score(initial_score, slow_result);
651
652 assert!(
653 fast_score >= slow_score,
654 "Fast response ({fast_time} ms) should score >= slow response ({slow_time} ms): {fast_score} vs {slow_score}"
655 );
656
657 Ok(())
658 });
659 }
660
661 #[test]
663 fn updating_one_peer_does_not_affect_others() {
664 arbtest(|u| {
665 let strategy = arb_strategy(u)?;
666 let results = arb_vec(u, arb_sync_result, 0..=10)?;
667
668 let mut scorer = PeerScorer::new(strategy);
669 let peer1 = PeerId::random();
670 let peer2 = PeerId::random();
671
672 for result in &results {
674 scorer.update_score(peer1, *result);
675 }
676
677 let initial_score_peer2 = scorer.get_score(&peer2);
679
680 for result in &results {
682 scorer.update_score(peer1, *result);
683 }
684
685 let final_score_peer2 = scorer.get_score(&peer2);
687 assert_eq!(initial_score_peer2, final_score_peer2);
688
689 Ok(())
690 });
691 }
692
693 #[test]
695 fn fast_response_help_recover_score_quickly() {
696 arbtest(|u| {
697 let strategy = arb_strategy(u)?;
698 let response_time = arb_response_time_fast(u, strategy.slow_threshold)?;
699
700 let mut scorer = PeerScorer::new(strategy);
701 let peer_id = PeerId::random();
702
703 let initial_score = scorer.get_score(&peer_id);
704
705 scorer.update_score(peer_id, SyncResult::Timeout);
707 let score_after_timeout = scorer.get_score(&peer_id);
708
709 assert!(
711 score_after_timeout < initial_score,
712 "Score after timeout ({score_after_timeout}) should be lower than initial score ({initial_score})"
713 );
714
715 scorer.update_score(peer_id, SyncResult::Success(response_time));
717 let score_after_success = scorer.get_score(&peer_id);
718
719 assert!(
721 score_after_success > initial_score,
722 "Score after success ({score_after_success}) should be greater than initial score ({initial_score})"
723 );
724
725 Ok(())
726 });
727 }
728
729 #[test]
731 fn pruning_inactive_peers_resets_scores() {
732 arbtest(|u| {
733 let strategy = arb_strategy(u)?;
734 let mut scorer = PeerScorer::new(strategy);
735 let peer_id = PeerId::random();
736
737 scorer.update_score(peer_id, SyncResult::Success(Duration::from_millis(100)));
739
740 assert!(scorer.get_scores().contains_key(&peer_id));
742
743 scorer.reset_inactive_peers_scores(Duration::from_millis(0));
745
746 assert!(!scorer.get_scores().contains_key(&peer_id));
748 assert_eq!(scorer.get_score(&peer_id), strategy.initial_score(peer_id));
749
750 Ok(())
751 });
752 }
753}