Skip to main content

arc_malachitebft_sync/
scoring.rs

1use core::fmt;
2use std::collections::HashMap;
3use std::time::{Duration, Instant};
4
5use rand::distributions::weighted::WeightedIndex;
6use rand::distributions::Distribution;
7use rand::Rng;
8use tracing::debug;
9
10use malachitebft_peer::PeerId;
11
12pub mod ema;
13pub mod metrics;
14
15use metrics::Metrics;
16
/// Outcome of a sync request sent to a peer.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum SyncResult {
    /// The peer responded successfully; carries the observed response time.
    Success(Duration),

    /// The request timed out before the peer responded.
    Timeout,

    /// The request failed.
    Failure,
}
29
30pub type Score = f64;
31
32/// Strategy for scoring peers based on sync results
33pub trait ScoringStrategy: Send + Sync {
34    /// Initial score for new peers.
35    ///
36    /// ## Important
37    /// The initial score MUST be in the `0.0..=1.0` range.
38    fn initial_score(&self, peer_id: PeerId) -> Score;
39
40    /// Update the peer score based on previous score and sync result
41    ///
42    /// ## Important
43    /// The updated score must be in the `0.0..=1.0` range.
44    fn update_score(&mut self, previous_score: Score, result: SyncResult) -> Score;
45}
46
/// Selector for the available peer-scoring strategies.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum Strategy {
    /// Exponential moving average strategy (the default).
    #[default]
    Ema,
}
53
54#[derive(Copy, Clone)]
55pub struct PeerScore {
56    score: Score,
57    last_update: Instant,
58}
59
60impl PeerScore {
61    pub fn new(score: Score) -> Self {
62        Self {
63            score,
64            last_update: Instant::now(),
65        }
66    }
67}
68
69impl fmt::Debug for PeerScore {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        // Round score to 3 decimal places for readability
72        fn round_score(score: Score) -> f64 {
73            (score * 1000.0).round() / 1000.0
74        }
75
76        f.debug_struct("PeerScore")
77            .field("score", &round_score(self.score))
78            .field("last_update", &self.last_update.elapsed())
79            .finish()
80    }
81}
82
83/// Tracks peer scores using a scoring strategy
84pub struct PeerScorer {
85    scores: HashMap<PeerId, PeerScore>,
86    strategy: Box<dyn ScoringStrategy>,
87}
88
89impl PeerScorer {
90    /// Create a new peer scorer with specified strategy
91    pub fn new(strategy: impl ScoringStrategy + 'static) -> Self {
92        Self {
93            scores: HashMap::new(),
94            strategy: Box::new(strategy),
95        }
96    }
97
98    /// Update a peer's score based on the result of a sync request, recording the result in metrics.
99    /// Returns the new score.
100    pub fn update_score_with_metrics(
101        &mut self,
102        peer_id: PeerId,
103        result: SyncResult,
104        metrics: &Metrics,
105    ) -> Score {
106        let new_score = self.update_score(peer_id, result);
107        metrics.observe_score(peer_id, new_score);
108        new_score
109    }
110
111    /// Update a peer's score based on the result of a sync request.
112    /// Returns the new score.
113    pub fn update_score(&mut self, peer_id: PeerId, result: SyncResult) -> Score {
114        let peer_score = self
115            .scores
116            .entry(peer_id)
117            .or_insert_with(|| PeerScore::new(self.strategy.initial_score(peer_id)));
118
119        let previous_score = peer_score.score;
120
121        debug!("Updating score for peer {peer_id}");
122        debug!("  Result = {result:?}");
123        debug!("    Prev = {previous_score}");
124
125        let new_score = self.strategy.update_score(previous_score, result);
126        debug!("     New = {new_score}");
127
128        peer_score.score = new_score;
129        peer_score.last_update = Instant::now();
130
131        new_score
132    }
133
134    /// Get the current score for a peer
135    pub fn get_score(&self, peer_id: &PeerId) -> Score {
136        self.scores
137            .get(peer_id)
138            .map(|p| p.score)
139            .unwrap_or(self.strategy.initial_score(*peer_id))
140    }
141
142    /// Get all peer scores
143    pub fn get_scores(&self) -> &HashMap<PeerId, PeerScore> {
144        &self.scores
145    }
146
147    /// Select a peer using weighted probabilistic selection
148    pub fn select_peer<R: Rng>(&self, peers: &[PeerId], rng: &mut R) -> Option<PeerId> {
149        if peers.is_empty() {
150            return None;
151        }
152
153        let scores = peers.iter().map(|id| self.get_score(id).max(0.0));
154
155        // Sample from peers using a weighted distribution based on their scores
156        let distr = WeightedIndex::new(scores).ok()?;
157        let index = distr.sample(rng);
158
159        assert!(index < peers.len(), "Index out of bounds");
160        Some(peers[index])
161    }
162
163    /// Prune peers whose scores have not been updated for the specified duration,
164    /// effectively resetting their score to the initial score.
165    ///
166    /// A peer might be inactive because they were not picked for a long time
167    /// due to their score being very low. Resetting their score gives them a chance to participate again.
168    ///
169    /// Note that by resetting the score we can also reduce the score of a peer,
170    /// if the peer had a high score but was inactive for a long time.
171    pub fn reset_inactive_peers_scores(&mut self, inactive_threshold: Duration) {
172        let now = Instant::now();
173
174        self.scores
175            .retain(|_, score| now.duration_since(score.last_update) < inactive_threshold);
176    }
177}
178
179impl Default for PeerScorer {
180    fn default() -> Self {
181        Self::new(ema::ExponentialMovingAverage::default())
182    }
183}
184
#[cfg(test)]
mod tests {
    // Property-based tests for `PeerScorer` and the EMA strategy, driven by `arbtest`.
    //
    // Each `arbtest` closure draws its inputs from `Unstructured` sequentially, so the
    // order of the `arb_*` calls inside a property determines which inputs are generated.
    use super::*;
    use arbtest::arbtest;
    use rand::rngs::StdRng;
    use rand::SeedableRng;
    use std::collections::HashSet;
    use std::ops::RangeInclusive;

    use arbtest::arbitrary::{Result, Unstructured};

    /// Any response time between 10 ms and 5000 ms, inclusive.
    fn arb_response_time(u: &mut Unstructured) -> Result<Duration> {
        u.int_in_range(10..=5000).map(Duration::from_millis)
    }

    /// A response time from 10 ms up to `slow_threshold - 10 ms`, i.e. below the threshold.
    ///
    /// Requires `slow_threshold` to be at least 20 ms, otherwise the subtraction or the
    /// range is invalid. `arb_strategy` only produces thresholds of 1000..=5000 ms.
    fn arb_response_time_fast(u: &mut Unstructured, slow_threshold: Duration) -> Result<Duration> {
        let max = slow_threshold.as_millis() as u64 - 10;
        u.int_in_range(10..=max).map(Duration::from_millis)
    }

    /// A response time from `slow_threshold` up to five times `slow_threshold`, inclusive.
    fn arb_response_time_slow(u: &mut Unstructured, slow_threshold: Duration) -> Result<Duration> {
        let min = slow_threshold.as_millis() as u64;
        let max = slow_threshold.as_millis() as u64 * 5;
        u.int_in_range(min..=max).map(Duration::from_millis)
    }

    /// Any sync result: a success with an arbitrary response time, a timeout, or a failure.
    fn arb_sync_result(u: &mut Unstructured) -> Result<SyncResult> {
        let result_type = u.int_in_range(0..=2)?;

        Ok(match result_type {
            0 => SyncResult::Success(arb_response_time(u)?),
            1 => SyncResult::Timeout,
            2 => SyncResult::Failure,
            _ => unreachable!(),
        })
    }

    /// A successful result whose response time is below `slow_threshold`.
    fn arb_sync_result_success_fast(
        u: &mut Unstructured,
        slow_threshold: Duration,
    ) -> Result<SyncResult> {
        Ok(SyncResult::Success(arb_response_time_fast(
            u,
            slow_threshold,
        )?))
    }

    /// Either a timeout or a failure.
    fn arb_sync_result_failure(u: &mut Unstructured) -> Result<SyncResult> {
        let result_type = u.int_in_range(0..=1)?;
        Ok(match result_type {
            0 => SyncResult::Timeout,
            1 => SyncResult::Failure,
            _ => unreachable!(),
        })
    }

    /// An EMA strategy whose alphas are picked from small fixed sets and whose
    /// slow threshold is 1000..=5000 ms.
    fn arb_strategy(u: &mut Unstructured) -> Result<ema::ExponentialMovingAverage> {
        let alpha_success = u.choose(&[0.20, 0.25, 0.30])?;
        let alpha_timeout = u.choose(&[0.10, 0.15, 0.20])?;
        let alpha_failure = u.choose(&[0.10, 0.15, 0.20])?;
        let slow_threshold = u.int_in_range(1000..=5000)?;

        Ok(ema::ExponentialMovingAverage::new(
            *alpha_success,
            *alpha_timeout,
            *alpha_failure,
            Duration::from_millis(slow_threshold),
        ))
    }

    /// A vector whose length is drawn from `size`, with each element generated by `f`.
    fn arb_vec<T>(
        u: &mut Unstructured,
        f: impl Fn(&mut Unstructured) -> Result<T>,
        size: RangeInclusive<usize>,
    ) -> Result<Vec<T>> {
        let size = u.int_in_range(size)?;
        (0..size).map(|_| f(u)).collect::<Result<Vec<T>>>()
    }

    // Property: Scores are bounded between 0.0 and 1.0
    #[test]
    fn scores_are_bounded() {
        // Case 1: a sequence of fast successes only.
        arbtest(|u| {
            let strategy = arb_strategy(u)?;
            let results = arb_vec(
                u,
                |u| arb_sync_result_success_fast(u, strategy.slow_threshold),
                10..=100,
            )?;

            let mut scorer = PeerScorer::new(strategy);
            let peer_id = PeerId::random();

            // Initial score should be bounded
            let initial_score = scorer.get_score(&peer_id);
            assert!((0.0..=1.0).contains(&initial_score));

            // All updated scores should remain bounded
            for result in results {
                scorer.update_score(peer_id, result);
                let score = scorer.get_score(&peer_id);
                assert!(
                    (0.0..=1.0).contains(&score),
                    "Score {score} is out of bounds after result {result:?}",
                );
            }

            Ok(())
        });

        // Case 2: a sequence of timeouts and failures only.
        arbtest(|u| {
            let strategy = arb_strategy(u)?;
            let results = arb_vec(u, arb_sync_result_failure, 10..=100)?;

            let mut scorer = PeerScorer::new(strategy);
            let peer_id = PeerId::random();

            // Initial score should be bounded
            let initial_score = scorer.get_score(&peer_id);
            assert!((0.0..=1.0).contains(&initial_score));

            // All updated scores should remain bounded
            for result in results {
                scorer.update_score(peer_id, result);
                let score = scorer.get_score(&peer_id);
                assert!(
                    (0.0..=1.0).contains(&score),
                    "Score {score} is out of bounds after result {result:?}",
                );
            }

            Ok(())
        });
    }

    // Property: Fast responses should improve the score
    #[test]
    fn fast_responses_improve_score() {
        arbtest(|u| {
            let strategy = arb_strategy(u)?;
            let response_time = arb_response_time_fast(u, strategy.slow_threshold)?;

            let mut scorer = PeerScorer::new(strategy);
            let peer_id = PeerId::random();

            let initial_score = scorer.get_score(&peer_id);
            // Call the strategy directly so that no score is stored in the scorer.
            let update_score = scorer
                .strategy
                .update_score(initial_score, SyncResult::Success(response_time));

            assert!(
                update_score > initial_score,
                "Fast response decreased score: {initial_score} -> {update_score}",
            );

            Ok(())
        });
    }

    // Property: Slow responses should decrease the score if it was already high
    #[test]
    fn slow_responses_decrease_high_score() {
        arbtest(|u| {
            let strategy = arb_strategy(u)?;
            let response_time = arb_response_time_slow(u, strategy.slow_threshold)?;

            let mut scorer = PeerScorer::new(strategy);

            // Start from the maximum possible score.
            let initial_score = 1.0;
            let update_score = scorer
                .strategy
                .update_score(initial_score, SyncResult::Success(response_time));

            assert!(
                update_score < initial_score,
                "Slow response ({response_time:?}) should decrease score: {initial_score} -> {update_score}",
            );

            Ok(())
        });
    }

    // Property: Failures and timeouts should decrease scores
    #[test]
    fn failures_decrease_score() {
        arbtest(|u| {
            let strategy = arb_strategy(u)?;
            let failure_type = u.choose(&[SyncResult::Timeout, SyncResult::Failure])?;

            let mut scorer = PeerScorer::new(strategy);
            let peer_id = PeerId::random();

            let initial_score = scorer.get_score(&peer_id);
            let update_score = scorer.strategy.update_score(initial_score, *failure_type);

            assert!(
                update_score < initial_score,
                "Failure/timeout should decrease score: {initial_score} -> {update_score} for {failure_type:?}",
            );

            Ok(())
        });
    }

    // Property: Peer selection should be deterministic with same RNG seed
    #[test]
    fn peer_selection_is_deterministic() {
        arbtest(|u| {
            let peer_count = u.int_in_range(2usize..=10)?;
            let seed = u.arbitrary()?;
            let results = arb_vec(u, arb_sync_result, 0..=50)?;

            let peers: Vec<_> = (0..peer_count).map(|_| PeerId::random()).collect();

            let mut scorer1 = PeerScorer::default();
            let mut scorer2 = PeerScorer::default();

            // Apply same updates to both scorers, cycling through the peers
            for (i, result) in results.into_iter().enumerate() {
                let peer_id = peers[i % peers.len()];
                scorer1.update_score(peer_id, result);
                scorer2.update_score(peer_id, result);
            }

            // Select peers with same RNG seed
            let mut rng1 = StdRng::seed_from_u64(seed);
            let mut rng2 = StdRng::seed_from_u64(seed);

            for _ in 0..10 {
                let selection1 = scorer1.select_peer(&peers, &mut rng1);
                let selection2 = scorer2.select_peer(&peers, &mut rng2);
                assert_eq!(selection1, selection2);
            }

            Ok(())
        });
    }

    // Property: All peers should be selectable (no peer gets zero probability)
    #[test]
    fn all_peers_selectable() {
        arbtest(|u| {
            let peer_count = u.int_in_range(2_usize..=6)?;
            let results = arb_vec(u, arb_sync_result, 0..=20)?;

            let peers: Vec<PeerId> = (0..peer_count).map(|_| PeerId::random()).collect();
            let mut scorer = PeerScorer::default();

            // Apply random updates, cycling through the peers
            for (i, result) in results.iter().enumerate() {
                let peer_id = peers[i % peers.len()];
                scorer.update_score(peer_id, *result);
            }

            // Collect selections over many iterations
            let mut rng = StdRng::seed_from_u64(42);
            let mut selected_peers = HashSet::new();

            for _ in 0..1000 {
                if let Some(selected) = scorer.select_peer(&peers, &mut rng) {
                    selected_peers.insert(selected);
                }
            }

            // Each peer should be selected at least once with high probability.
            // To tolerate statistical variation, only require that at least 80% of
            // the peers were selected.
            let selection_ratio = selected_peers.len() as f64 / peers.len() as f64;
            assert!(
                selection_ratio >= 0.8,
                "Only {}/{} peers were selected",
                selected_peers.len(),
                peers.len()
            );

            Ok(())
        });
    }

    // Property: Higher scoring peers should be selected more frequently
    #[test]
    fn higher_scores_selected_more_frequently() {
        arbtest(|u| {
            let good_results = arb_vec(
                u,
                |u| {
                    u.choose_iter([
                        SyncResult::Success(Duration::from_millis(50)),
                        SyncResult::Success(Duration::from_millis(100)),
                    ])
                },
                5..=15,
            )?;

            let bad_results = arb_vec(
                u,
                |u| u.choose_iter([SyncResult::Timeout, SyncResult::Failure]),
                5..=15,
            )?;

            let good_peer = PeerId::random();
            let bad_peer = PeerId::random();
            assert_ne!(good_peer, bad_peer, "Peers should be distinct");

            let peers = vec![good_peer, bad_peer];

            let mut scorer = PeerScorer::default();

            // Give good peer good results
            for result in good_results {
                scorer.update_score(good_peer, result);
            }

            // Give bad peer bad results
            for result in bad_results {
                scorer.update_score(bad_peer, result);
            }

            let good_score = scorer.get_score(&good_peer);
            let bad_score = scorer.get_score(&bad_peer);

            // Only test if there's a meaningful difference in scores
            if good_score > bad_score + 0.1 {
                let mut rng = StdRng::seed_from_u64(123);
                let mut good_selections = 0;
                let mut bad_selections = 0;

                for _ in 0..1000 {
                    match scorer.select_peer(&peers, &mut rng) {
                        Some(peer) if peer == good_peer => good_selections += 1,
                        Some(peer) if peer == bad_peer => bad_selections += 1,
                        _ => {}
                    }
                }

                assert!(
                    good_selections > bad_selections,
                    "Good peer (score: {good_score}) selected {good_selections} times, bad peer (score: {bad_score}) selected {bad_selections} times"
                );
            }

            Ok(())
        });
    }

    // Property: Score updates should be monotonic for sequences of same result type
    // Note: This property does not hold for the EMA strategy, so the test is ignored.
    // It is kept because it can be useful for strategies whose score changes are more deterministic.
    #[test]
    #[ignore]
    fn monotonic_score_updates() {
        arbtest(|u| {
            let strategy = arb_strategy(u)?;
            let result = arb_sync_result(u)?;
            let update_count = u.int_in_range(1_usize..=20)?;

            let mut scorer = PeerScorer::new(strategy);
            let mut current_score = scorer.strategy.initial_score(PeerId::random());
            let mut scores = vec![current_score];

            println!(
                "Testing monotonicity for result {:?} over {} updates, threshold={:?}",
                result, update_count, strategy.slow_threshold
            );

            // Feed the same result repeatedly and record every intermediate score
            for _ in 0..update_count {
                current_score = scorer.strategy.update_score(current_score, result);
                scores.push(current_score);
            }

            // Check monotonicity based on result type
            match result {
                SyncResult::Success(rt) if rt < strategy.slow_threshold => {
                    // For fast response, scores should increase
                    for window in scores.windows(2) {
                        let diff = window[1] - window[0];
                        assert!(
                            diff >= 0.0,
                            "Fast response ({rt:?}) should improve score: {} -> {}",
                            window[0],
                            window[1]
                        );
                    }
                }
                SyncResult::Success(rt) => {
                    // For slow responses, scores should decrease
                    for window in scores.windows(2) {
                        assert!(
                            window[1] <= window[0],
                            "Slow response ({rt:?}) should decrease score: {} -> {}",
                            window[0],
                            window[1]
                        );
                    }
                }
                SyncResult::Timeout | SyncResult::Failure => {
                    // For failures, scores should decrease
                    for window in scores.windows(2) {
                        assert!(
                            window[1] <= window[0],
                            "Timeouts and failures should decrease score: {} -> {}",
                            window[0],
                            window[1]
                        );
                    }
                }
            }

            Ok(())
        });
    }

    // Property: Empty peer list should return None
    #[test]
    fn empty_peer_list_returns_none() {
        arbtest(|u| {
            let seed = u.arbitrary()?;

            let scorer = PeerScorer::default();
            let mut rng = StdRng::seed_from_u64(seed);
            let result = scorer.select_peer(&[], &mut rng);
            assert_eq!(result, None);
            Ok(())
        });
    }

    // Property: Single peer should always be selected
    #[test]
    fn single_peer_always_selected() {
        arbtest(|u| {
            let seed = u.arbitrary()?;
            let results = arb_vec(u, arb_sync_result, 0..=10)?;

            let peer = PeerId::random();
            let peers = vec![peer];
            let mut scorer = PeerScorer::default();

            // Apply some updates
            for result in results {
                scorer.update_score(peer, result);
            }

            let mut rng = StdRng::seed_from_u64(seed);
            for _ in 0..10 {
                let selected = scorer.select_peer(&peers, &mut rng);
                assert_eq!(selected, Some(peer));
            }

            Ok(())
        });
    }

    // Property: Response time affects success score quality
    #[test]
    fn response_time_affects_success_score() {
        arbtest(|u| {
            let strategy = arb_strategy(u)?;
            let fast_time = u.int_in_range(10_u64..=100)?;
            let slow_time = u.int_in_range(1000_u64..=5000)?;

            let mut scorer = PeerScorer::new(strategy);
            let initial_score = scorer.strategy.initial_score(PeerId::random());

            let fast_result = SyncResult::Success(Duration::from_millis(fast_time));
            let slow_result = SyncResult::Success(Duration::from_millis(slow_time));

            // Both updates start from the same initial score
            let fast_score = scorer.strategy.update_score(initial_score, fast_result);
            let slow_score = scorer.strategy.update_score(initial_score, slow_result);

            assert!(
                fast_score >= slow_score,
                "Fast response ({fast_time} ms) should score >= slow response ({slow_time} ms): {fast_score} vs {slow_score}"
            );

            Ok(())
        });
    }

    // Property: Updating a peer's score does not affect other peers' scores
    #[test]
    fn updating_one_peer_does_not_affect_others() {
        arbtest(|u| {
            let strategy = arb_strategy(u)?;
            let results = arb_vec(u, arb_sync_result, 0..=10)?;

            let mut scorer = PeerScorer::new(strategy);
            let peer1 = PeerId::random();
            let peer2 = PeerId::random();

            // Update peer1 with some results
            for result in &results {
                scorer.update_score(peer1, *result);
            }

            // Get initial score for peer2
            let initial_score_peer2 = scorer.get_score(&peer2);

            // Update peer1 again
            for result in &results {
                scorer.update_score(peer1, *result);
            }

            // Score for peer2 should remain unchanged
            let final_score_peer2 = scorer.get_score(&peer2);
            assert_eq!(initial_score_peer2, final_score_peer2);

            Ok(())
        });
    }

    // Property: After a timeout, a single fast response lifts the score above the initial score
    #[test]
    fn fast_response_help_recover_score_quickly() {
        arbtest(|u| {
            let strategy = arb_strategy(u)?;
            let response_time = arb_response_time_fast(u, strategy.slow_threshold)?;

            let mut scorer = PeerScorer::new(strategy);
            let peer_id = PeerId::random();

            let initial_score = scorer.get_score(&peer_id);

            // Apply a timeout
            scorer.update_score(peer_id, SyncResult::Timeout);
            let score_after_timeout = scorer.get_score(&peer_id);

            // Score after the timeout should be lower than the initial score
            assert!(
                score_after_timeout < initial_score,
                "Score after timeout ({score_after_timeout}) should be lower than initial score ({initial_score})"
            );

            // Apply a success
            scorer.update_score(peer_id, SyncResult::Success(response_time));
            let score_after_success = scorer.get_score(&peer_id);

            // Score after success should be higher than initial score
            assert!(
                score_after_success > initial_score,
                "Score after success ({score_after_success}) should be greater than initial score ({initial_score})"
            );

            Ok(())
        });
    }

    // Property: Pruning inactive peers resets their scores
    #[test]
    fn pruning_inactive_peers_resets_scores() {
        arbtest(|u| {
            let strategy = arb_strategy(u)?;
            let mut scorer = PeerScorer::new(strategy);
            let peer_id = PeerId::random();

            // Update score for the peer
            scorer.update_score(peer_id, SyncResult::Success(Duration::from_millis(100)));

            // Ensure the peer is present
            assert!(scorer.get_scores().contains_key(&peer_id));

            // Prune with a zero threshold: an elapsed duration is never `< 0`,
            // so every tracked peer is removed
            scorer.reset_inactive_peers_scores(Duration::from_millis(0));

            // Peer should be removed and report the strategy's initial score again
            assert!(!scorer.get_scores().contains_key(&peer_id));
            assert_eq!(scorer.get_score(&peer_id), strategy.initial_score(peer_id));

            Ok(())
        });
    }
}