1use std::{
4 collections::{HashMap, HashSet, VecDeque, hash_map::Entry},
5 time::{Duration, Instant},
6};
7
8use sof_types::SignatureBytes;
9
10use crate::providers::{LeaderProvider, LeaderTarget};
11
12const INITIAL_SIGNATURE_DEDUPER_CAPACITY: usize = 4_096;
14
15#[derive(Debug, Clone, Copy, Eq, PartialEq)]
17pub struct RoutingPolicy {
18 pub next_leaders: usize,
20 pub backup_validators: usize,
22 pub max_parallel_sends: usize,
24}
25
26impl Default for RoutingPolicy {
27 fn default() -> Self {
28 Self {
29 next_leaders: 2,
30 backup_validators: 1,
31 max_parallel_sends: 4,
32 }
33 }
34}
35
36impl RoutingPolicy {
37 #[must_use]
39 pub fn normalized(self) -> Self {
40 Self {
41 next_leaders: self.next_leaders,
42 backup_validators: self.backup_validators,
43 max_parallel_sends: self.max_parallel_sends.max(1),
44 }
45 }
46}
47
48#[must_use]
50pub fn select_targets<P>(
51 leader_provider: &P,
52 backups: &[LeaderTarget],
53 policy: RoutingPolicy,
54) -> Vec<LeaderTarget>
55where
56 P: LeaderProvider + ?Sized,
57{
58 let policy = policy.normalized();
59 let dynamic_backup_count = if backups.is_empty() {
60 policy.backup_validators
61 } else {
62 0
63 };
64 let requested_next = policy.next_leaders.saturating_add(dynamic_backup_count);
65 let estimated_targets = 1_usize
66 .saturating_add(requested_next)
67 .saturating_add(policy.backup_validators);
68 let mut seen = HashSet::with_capacity(estimated_targets);
69 let mut selected = Vec::with_capacity(estimated_targets);
70
71 if let Some(current) = leader_provider.current_leader()
72 && seen.insert(current.tpu_addr)
73 {
74 selected.push(current);
75 }
76
77 for target in leader_provider.next_leaders(requested_next) {
78 if seen.insert(target.tpu_addr) {
79 selected.push(target);
80 }
81 }
82
83 for target in backups.iter().take(policy.backup_validators) {
84 if seen.insert(target.tpu_addr) {
85 selected.push(target.clone());
86 }
87 }
88
89 selected
90}
91
92#[derive(Debug)]
94pub struct SignatureDeduper {
95 ttl: Duration,
97 seen: HashMap<SignatureBytes, Instant>,
99 order: VecDeque<(SignatureBytes, Instant)>,
101}
102
103impl SignatureDeduper {
104 #[must_use]
106 pub fn new(ttl: Duration) -> Self {
107 Self {
108 ttl: ttl.max(Duration::from_millis(1)),
109 seen: HashMap::with_capacity(INITIAL_SIGNATURE_DEDUPER_CAPACITY),
110 order: VecDeque::with_capacity(INITIAL_SIGNATURE_DEDUPER_CAPACITY),
111 }
112 }
113
114 pub fn check_and_insert(&mut self, signature: SignatureBytes, now: Instant) -> bool {
116 self.evict_expired(now);
117 match self.seen.entry(signature) {
118 Entry::Occupied(_) => false,
119 Entry::Vacant(entry) => {
120 entry.insert(now);
121 self.order.push_back((signature, now));
122 true
123 }
124 }
125 }
126
127 #[must_use]
129 pub fn len(&self) -> usize {
130 self.seen.len()
131 }
132
133 #[must_use]
135 pub fn is_empty(&self) -> bool {
136 self.seen.is_empty()
137 }
138
139 fn evict_expired(&mut self, now: Instant) {
141 while let Some((signature, first_seen)) = self.order.front().copied() {
142 if now.saturating_duration_since(first_seen) < self.ttl {
143 break;
144 }
145 self.order.pop_front();
146 if self.seen.get(&signature).copied() == Some(first_seen) {
147 let _ = self.seen.remove(&signature);
148 }
149 }
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use sof_support::{bench::avg_ns_per_iteration, env_support::read_positive_usize};
156
157 use super::*;
158 use crate::providers::{LeaderTarget, StaticLeaderProvider};
159
160 fn target(port: u16) -> LeaderTarget {
161 LeaderTarget::new(None, std::net::SocketAddr::from(([127, 0, 0, 1], port)))
162 }
163
164 #[test]
165 fn select_targets_prefers_current_next_then_backups() {
166 let provider =
167 StaticLeaderProvider::new(Some(target(9001)), vec![target(9002), target(9003)]);
168 let backups = vec![target(9004), target(9005)];
169 let selected = select_targets(
170 &provider,
171 &backups,
172 RoutingPolicy {
173 next_leaders: 2,
174 backup_validators: 1,
175 max_parallel_sends: 8,
176 },
177 );
178 assert_eq!(
179 selected,
180 vec![target(9001), target(9002), target(9003), target(9004)]
181 );
182 }
183
184 #[test]
185 fn select_targets_uses_dynamic_backups_when_static_backups_are_absent() {
186 let provider = StaticLeaderProvider::new(
187 Some(target(9010)),
188 vec![target(9011), target(9012), target(9013), target(9014)],
189 );
190 let selected = select_targets(
191 &provider,
192 &[],
193 RoutingPolicy {
194 next_leaders: 2,
195 backup_validators: 2,
196 max_parallel_sends: 8,
197 },
198 );
199 assert_eq!(
200 selected,
201 vec![
202 target(9010),
203 target(9011),
204 target(9012),
205 target(9013),
206 target(9014)
207 ]
208 );
209 }
210
211 #[test]
212 fn deduper_rejects_recent_duplicate_and_allows_after_ttl() {
213 let signature = SignatureBytes::from([7_u8; 64]);
214 let now = Instant::now();
215 let mut deduper = SignatureDeduper::new(Duration::from_millis(25));
216 assert!(deduper.check_and_insert(signature, now));
217 assert!(!deduper.check_and_insert(signature, now + Duration::from_millis(5)));
218 assert!(deduper.check_and_insert(signature, now + Duration::from_millis(30)));
219 }
220
221 #[test]
222 #[ignore = "profiling fixture for signature dedupe churn"]
223 fn signature_deduper_profile_fixture() {
224 let iterations = read_positive_usize("SOF_TX_SIGNATURE_DEDUPER_PROFILE_ITERS", 50_000);
225 let ttl_ms = u64::try_from(read_positive_usize(
226 "SOF_TX_SIGNATURE_DEDUPER_PROFILE_TTL_MS",
227 10_000,
228 ))
229 .unwrap_or(10_000);
230 let mut deduper = SignatureDeduper::new(Duration::from_millis(ttl_ms));
231 let start = Instant::now();
232 let now = Instant::now();
233
234 for index in 0..iterations {
235 let mut signature = [0_u8; 64];
236 signature[..8].copy_from_slice(&(index as u64).to_le_bytes());
237 assert!(deduper.check_and_insert(
238 SignatureBytes::from(signature),
239 now + Duration::from_nanos(index as u64)
240 ));
241 }
242
243 let elapsed = start.elapsed();
244 let avg_ns = avg_ns_per_iteration(elapsed, iterations);
245 println!(
246 "signature_deduper_profile_fixture iterations={} ttl_ms={} entries={} elapsed_us={} avg_ns_per_iteration={} avg_us_per_iteration={:.3}",
247 iterations,
248 ttl_ms,
249 deduper.len(),
250 elapsed.as_micros(),
251 avg_ns,
252 avg_ns as f64 / 1_000.0
253 );
254 }
255
256 #[test]
257 #[ignore = "profiling fixture for signature deduper allocation churn"]
258 fn signature_deduper_allocation_profile_fixture() {
259 let iterations = read_positive_usize("SOF_TX_SIGNATURE_DEDUPER_PROFILE_ITERS", 50_000);
260 let ttl_ms = u64::try_from(read_positive_usize(
261 "SOF_TX_SIGNATURE_DEDUPER_PROFILE_TTL_MS",
262 10_000,
263 ))
264 .unwrap_or(10_000);
265 let ttl = Duration::from_millis(ttl_ms);
266 let mut baseline = signature_deduper_baseline(ttl);
267 let mut optimized = SignatureDeduper::new(ttl);
268 let now = Instant::now();
269
270 let baseline_started = Instant::now();
271 for index in 0..iterations {
272 let signature = make_signature(index);
273 assert!(baseline.check_and_insert(
274 SignatureBytes::from(signature),
275 now + Duration::from_nanos(index as u64)
276 ));
277 }
278 let baseline_elapsed = baseline_started.elapsed();
279
280 let optimized_started = Instant::now();
281 for index in 0..iterations {
282 let signature = make_signature(index);
283 assert!(optimized.check_and_insert(
284 SignatureBytes::from(signature),
285 now + Duration::from_nanos(index as u64)
286 ));
287 }
288 let optimized_elapsed = optimized_started.elapsed();
289
290 println!(
291 "signature_deduper_allocation_profile_fixture iterations={} baseline_us={} optimized_us={}",
292 iterations,
293 baseline_elapsed.as_micros(),
294 optimized_elapsed.as_micros(),
295 );
296 }
297
298 fn signature_deduper_baseline(ttl: Duration) -> SignatureDeduper {
299 SignatureDeduper {
300 ttl: ttl.max(Duration::from_millis(1)),
301 seen: HashMap::new(),
302 order: VecDeque::new(),
303 }
304 }
305
306 fn make_signature(index: usize) -> [u8; 64] {
307 let mut signature = [0_u8; 64];
308 signature[..8].copy_from_slice(&(index as u64).to_le_bytes());
309 signature
310 }
311}