1use std::{
4 collections::{HashMap, HashSet},
5 time::{Duration, Instant},
6};
7
8use solana_signature::Signature;
9
10use crate::providers::{LeaderProvider, LeaderTarget};
11
/// Tunable counts that shape how many send targets are picked.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RoutingPolicy {
    /// How many upcoming leaders `select_targets` requests from the leader
    /// provider, in addition to the current leader.
    pub next_leaders: usize,
    /// How many backup targets `select_targets` adds: taken from the static
    /// backup list when one is supplied, otherwise requested as additional
    /// upcoming leaders.
    pub backup_validators: usize,
    /// Never zero after [`RoutingPolicy::normalized`].
    ///
    /// NOTE(review): presumably the cap on concurrent sends; it is not read
    /// by the target selection in this file, so confirm against the consumer.
    pub max_parallel_sends: usize,
}

impl Default for RoutingPolicy {
    /// Two upcoming leaders, one backup validator, four parallel sends.
    fn default() -> Self {
        Self {
            next_leaders: 2,
            backup_validators: 1,
            max_parallel_sends: 4,
        }
    }
}

impl RoutingPolicy {
    /// Returns a copy whose `max_parallel_sends` is at least 1.
    ///
    /// `next_leaders` and `backup_validators` are carried over unchanged.
    #[must_use]
    pub fn normalized(self) -> Self {
        let max_parallel_sends = self.max_parallel_sends.max(1);
        Self {
            max_parallel_sends,
            ..self
        }
    }
}
44
45#[must_use]
47pub fn select_targets(
48 leader_provider: &dyn LeaderProvider,
49 backups: &[LeaderTarget],
50 policy: RoutingPolicy,
51) -> Vec<LeaderTarget> {
52 let policy = policy.normalized();
53 let mut seen = HashSet::new();
54 let mut selected = Vec::new();
55 let dynamic_backup_count = if backups.is_empty() {
56 policy.backup_validators
57 } else {
58 0
59 };
60
61 if let Some(current) = leader_provider.current_leader()
62 && seen.insert(current.tpu_addr)
63 {
64 selected.push(current);
65 }
66
67 let requested_next = policy.next_leaders.saturating_add(dynamic_backup_count);
68 for target in leader_provider.next_leaders(requested_next) {
69 if seen.insert(target.tpu_addr) {
70 selected.push(target);
71 }
72 }
73
74 for target in backups.iter().take(policy.backup_validators) {
75 if seen.insert(target.tpu_addr) {
76 selected.push(target.clone());
77 }
78 }
79
80 selected
81}
82
83#[derive(Debug)]
85pub struct SignatureDeduper {
86 ttl: Duration,
88 seen: HashMap<Signature, Instant>,
90}
91
92impl SignatureDeduper {
93 #[must_use]
95 pub fn new(ttl: Duration) -> Self {
96 Self {
97 ttl: ttl.max(Duration::from_millis(1)),
98 seen: HashMap::new(),
99 }
100 }
101
102 pub fn check_and_insert(&mut self, signature: Signature, now: Instant) -> bool {
104 self.evict_expired(now);
105 if self.seen.contains_key(&signature) {
106 return false;
107 }
108 let _ = self.seen.insert(signature, now);
109 true
110 }
111
112 #[must_use]
114 pub fn len(&self) -> usize {
115 self.seen.len()
116 }
117
118 #[must_use]
120 pub fn is_empty(&self) -> bool {
121 self.seen.is_empty()
122 }
123
124 fn evict_expired(&mut self, now: Instant) {
126 let ttl = self.ttl;
127 self.seen
128 .retain(|_, first_seen| now.saturating_duration_since(*first_seen) < ttl);
129 }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use crate::providers::{LeaderTarget, StaticLeaderProvider};

    /// Loopback target on `port`, built with `None` as the first constructor argument.
    fn target(port: u16) -> LeaderTarget {
        let addr = std::net::SocketAddr::from(([127, 0, 0, 1], port));
        LeaderTarget::new(None, addr)
    }

    /// One loopback target per port, in the given order.
    fn targets(ports: &[u16]) -> Vec<LeaderTarget> {
        ports.iter().copied().map(target).collect()
    }

    #[test]
    fn select_targets_prefers_current_next_then_backups() {
        let policy = RoutingPolicy {
            next_leaders: 2,
            backup_validators: 1,
            max_parallel_sends: 8,
        };
        let provider = StaticLeaderProvider::new(Some(target(9001)), targets(&[9002, 9003]));
        let backups = targets(&[9004, 9005]);

        let selected = select_targets(&provider, &backups, policy);

        // Only the first backup is used because `backup_validators` is 1.
        assert_eq!(selected, targets(&[9001, 9002, 9003, 9004]));
    }

    #[test]
    fn select_targets_uses_dynamic_backups_when_static_backups_are_absent() {
        let policy = RoutingPolicy {
            next_leaders: 2,
            backup_validators: 2,
            max_parallel_sends: 8,
        };
        let provider =
            StaticLeaderProvider::new(Some(target(9010)), targets(&[9011, 9012, 9013, 9014]));

        let selected = select_targets(&provider, &[], policy);

        // With no static backups, two extra upcoming leaders fill the backup quota.
        assert_eq!(selected, targets(&[9010, 9011, 9012, 9013, 9014]));
    }

    #[test]
    fn deduper_rejects_recent_duplicate_and_allows_after_ttl() {
        let ttl = Duration::from_millis(25);
        let start = Instant::now();
        let signature = Signature::from([7_u8; 64]);
        let mut deduper = SignatureDeduper::new(ttl);

        // First sighting is accepted.
        assert!(deduper.check_and_insert(signature, start));
        // 5ms later is still inside the 25ms window.
        assert!(!deduper.check_and_insert(signature, start + Duration::from_millis(5)));
        // 30ms later the entry has expired.
        assert!(deduper.check_and_insert(signature, start + Duration::from_millis(30)));
    }
}