1use std::{
4 collections::{HashMap, HashSet},
5 time::{Duration, Instant},
6};
7
8use solana_signature::Signature;
9
10use crate::providers::{LeaderProvider, LeaderTarget};
11
/// Tunables that decide how many targets [`select_targets`] picks for a send.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RoutingPolicy {
    /// Number of upcoming leaders requested from the leader provider.
    pub next_leaders: usize,
    /// Number of backup validators to include. They are taken from the static
    /// backup list when one is supplied; otherwise the same number of extra
    /// upcoming leaders is requested instead.
    pub backup_validators: usize,
    /// Parallel-send limit. Within this module it is only clamped to at least 1
    /// by [`RoutingPolicy::normalized`]; NOTE(review): presumably enforced by
    /// the sending code — confirm against the caller.
    pub max_parallel_sends: usize,
}
22
23impl Default for RoutingPolicy {
24 fn default() -> Self {
25 Self {
26 next_leaders: 2,
27 backup_validators: 1,
28 max_parallel_sends: 4,
29 }
30 }
31}
32
33impl RoutingPolicy {
34 #[must_use]
36 pub fn normalized(self) -> Self {
37 Self {
38 next_leaders: self.next_leaders,
39 backup_validators: self.backup_validators,
40 max_parallel_sends: self.max_parallel_sends.max(1),
41 }
42 }
43}
44
45#[must_use]
47pub fn select_targets<P>(
48 leader_provider: &P,
49 backups: &[LeaderTarget],
50 policy: RoutingPolicy,
51) -> Vec<LeaderTarget>
52where
53 P: LeaderProvider + ?Sized,
54{
55 let policy = policy.normalized();
56 let dynamic_backup_count = if backups.is_empty() {
57 policy.backup_validators
58 } else {
59 0
60 };
61 let requested_next = policy.next_leaders.saturating_add(dynamic_backup_count);
62 let estimated_targets = 1_usize
63 .saturating_add(requested_next)
64 .saturating_add(policy.backup_validators);
65 let mut seen = HashSet::with_capacity(estimated_targets);
66 let mut selected = Vec::with_capacity(estimated_targets);
67
68 if let Some(current) = leader_provider.current_leader()
69 && seen.insert(current.tpu_addr)
70 {
71 selected.push(current);
72 }
73
74 for target in leader_provider.next_leaders(requested_next) {
75 if seen.insert(target.tpu_addr) {
76 selected.push(target);
77 }
78 }
79
80 for target in backups.iter().take(policy.backup_validators) {
81 if seen.insert(target.tpu_addr) {
82 selected.push(target.clone());
83 }
84 }
85
86 selected
87}
88
89#[derive(Debug)]
91pub struct SignatureDeduper {
92 ttl: Duration,
94 seen: HashMap<Signature, Instant>,
96}
97
98impl SignatureDeduper {
99 #[must_use]
101 pub fn new(ttl: Duration) -> Self {
102 Self {
103 ttl: ttl.max(Duration::from_millis(1)),
104 seen: HashMap::new(),
105 }
106 }
107
108 pub fn check_and_insert(&mut self, signature: Signature, now: Instant) -> bool {
110 self.evict_expired(now);
111 if self.seen.contains_key(&signature) {
112 return false;
113 }
114 let _ = self.seen.insert(signature, now);
115 true
116 }
117
118 #[must_use]
120 pub fn len(&self) -> usize {
121 self.seen.len()
122 }
123
124 #[must_use]
126 pub fn is_empty(&self) -> bool {
127 self.seen.is_empty()
128 }
129
130 fn evict_expired(&mut self, now: Instant) {
132 let ttl = self.ttl;
133 self.seen
134 .retain(|_, first_seen| now.saturating_duration_since(*first_seen) < ttl);
135 }
136}
137
#[cfg(test)]
mod tests {
    use super::*;
    use crate::providers::{LeaderTarget, StaticLeaderProvider};

    /// Builds a target on the loopback address with the given `port`.
    fn target(port: u16) -> LeaderTarget {
        let addr = std::net::SocketAddr::from(([127, 0, 0, 1], port));
        LeaderTarget::new(None, addr)
    }

    /// Builds one loopback target per port, in iteration order.
    fn targets(ports: impl IntoIterator<Item = u16>) -> Vec<LeaderTarget> {
        ports.into_iter().map(target).collect()
    }

    #[test]
    fn select_targets_prefers_current_next_then_backups() {
        let policy = RoutingPolicy {
            next_leaders: 2,
            backup_validators: 1,
            max_parallel_sends: 8,
        };
        let provider = StaticLeaderProvider::new(Some(target(9001)), targets(9002..=9003));
        let backups = targets(9004..=9005);

        let selected = select_targets(&provider, &backups, policy);

        // Only the first static backup fits the budget of one.
        assert_eq!(selected, targets(9001..=9004));
    }

    #[test]
    fn select_targets_uses_dynamic_backups_when_static_backups_are_absent() {
        let policy = RoutingPolicy {
            next_leaders: 2,
            backup_validators: 2,
            max_parallel_sends: 8,
        };
        let provider = StaticLeaderProvider::new(Some(target(9010)), targets(9011..=9014));

        let selected = select_targets(&provider, &[], policy);

        // With no static backups, two extra upcoming leaders fill the budget.
        assert_eq!(selected, targets(9010..=9014));
    }

    #[test]
    fn deduper_rejects_recent_duplicate_and_allows_after_ttl() {
        let first_seen = Instant::now();
        let within_ttl = first_seen + Duration::from_millis(5);
        let after_ttl = first_seen + Duration::from_millis(30);
        let signature = Signature::from([7_u8; 64]);
        let mut deduper = SignatureDeduper::new(Duration::from_millis(25));

        assert!(deduper.check_and_insert(signature, first_seen));
        assert!(!deduper.check_and_insert(signature, within_ttl));
        assert!(deduper.check_and_insert(signature, after_ttl));
    }
}