1use std::{
4 collections::{HashMap, HashSet},
5 time::{Duration, Instant},
6};
7
8use solana_signature::Signature;
9
10use crate::providers::{LeaderProvider, LeaderTarget};
11
/// Tunable limits applied when choosing where a transaction is sent.
///
/// The type is a small `Copy` value; build it field by field or start from
/// `RoutingPolicy::default()`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RoutingPolicy {
    /// Count handed to `LeaderProvider::next_leaders` when `select_targets`
    /// asks for upcoming leaders.
    pub next_leaders: usize,
    /// Upper bound on how many entries of the backup list `select_targets`
    /// may add.
    pub backup_validators: usize,
    /// Clamped to at least 1 by `normalized`.
    ///
    /// NOTE(review): presumably the cap on concurrent sends; the consumer is
    /// not in this file — confirm against the send path.
    pub max_parallel_sends: usize,
}
22
23impl Default for RoutingPolicy {
24 fn default() -> Self {
25 Self {
26 next_leaders: 2,
27 backup_validators: 1,
28 max_parallel_sends: 4,
29 }
30 }
31}
32
33impl RoutingPolicy {
34 #[must_use]
36 pub fn normalized(self) -> Self {
37 Self {
38 next_leaders: self.next_leaders,
39 backup_validators: self.backup_validators,
40 max_parallel_sends: self.max_parallel_sends.max(1),
41 }
42 }
43}
44
45#[must_use]
47pub fn select_targets(
48 leader_provider: &dyn LeaderProvider,
49 backups: &[LeaderTarget],
50 policy: RoutingPolicy,
51) -> Vec<LeaderTarget> {
52 let policy = policy.normalized();
53 let mut seen = HashSet::new();
54 let mut selected = Vec::new();
55
56 if let Some(current) = leader_provider.current_leader()
57 && seen.insert(current.tpu_addr)
58 {
59 selected.push(current);
60 }
61
62 for target in leader_provider.next_leaders(policy.next_leaders) {
63 if seen.insert(target.tpu_addr) {
64 selected.push(target);
65 }
66 }
67
68 for target in backups.iter().take(policy.backup_validators) {
69 if seen.insert(target.tpu_addr) {
70 selected.push(target.clone());
71 }
72 }
73
74 selected
75}
76
77#[derive(Debug)]
79pub struct SignatureDeduper {
80 ttl: Duration,
82 seen: HashMap<Signature, Instant>,
84}
85
86impl SignatureDeduper {
87 #[must_use]
89 pub fn new(ttl: Duration) -> Self {
90 Self {
91 ttl: ttl.max(Duration::from_millis(1)),
92 seen: HashMap::new(),
93 }
94 }
95
96 pub fn check_and_insert(&mut self, signature: Signature, now: Instant) -> bool {
98 self.evict_expired(now);
99 if self.seen.contains_key(&signature) {
100 return false;
101 }
102 let _ = self.seen.insert(signature, now);
103 true
104 }
105
106 #[must_use]
108 pub fn len(&self) -> usize {
109 self.seen.len()
110 }
111
112 #[must_use]
114 pub fn is_empty(&self) -> bool {
115 self.seen.is_empty()
116 }
117
118 fn evict_expired(&mut self, now: Instant) {
120 let ttl = self.ttl;
121 self.seen
122 .retain(|_, first_seen| now.saturating_duration_since(*first_seen) < ttl);
123 }
124}
125
#[cfg(test)]
mod tests {
    use super::*;
    use crate::providers::{LeaderTarget, StaticLeaderProvider};

    /// Loopback target that differs from its siblings only by port.
    fn target(port: u16) -> LeaderTarget {
        let addr = std::net::SocketAddr::from(([127, 0, 0, 1], port));
        LeaderTarget::new(None, addr)
    }

    /// Current leader comes first, then upcoming leaders, then as many
    /// backups as the policy allows.
    #[test]
    fn select_targets_prefers_current_next_then_backups() {
        let upcoming = vec![target(9002), target(9003)];
        let provider = StaticLeaderProvider::new(Some(target(9001)), upcoming);
        let backups = vec![target(9004), target(9005)];
        let policy = RoutingPolicy {
            next_leaders: 2,
            backup_validators: 1,
            max_parallel_sends: 8,
        };

        let selected = select_targets(&provider, &backups, policy);

        let expected = vec![target(9001), target(9002), target(9003), target(9004)];
        assert_eq!(selected, expected);
    }

    /// A repeat inside the TTL is rejected; the same signature is accepted
    /// again once the TTL has elapsed.
    #[test]
    fn deduper_rejects_recent_duplicate_and_allows_after_ttl() {
        let start = Instant::now();
        let at = |millis: u64| start + Duration::from_millis(millis);
        let signature = Signature::from([7_u8; 64]);
        let mut deduper = SignatureDeduper::new(Duration::from_millis(25));

        assert!(deduper.check_and_insert(signature, at(0)));
        assert!(!deduper.check_and_insert(signature, at(5)));
        assert!(deduper.check_and_insert(signature, at(30)));
    }
}