Skip to main content

sof_tx/
routing.rs

1//! Routing policy, target selection, and signature-level duplicate suppression.
2
3use std::{
4    collections::{HashMap, HashSet, VecDeque, hash_map::Entry},
5    time::{Duration, Instant},
6};
7
8use sof_types::SignatureBytes;
9
10use crate::providers::{LeaderProvider, LeaderTarget};
11
12/// Initial storage reserved for the signature dedupe window before it grows.
13const INITIAL_SIGNATURE_DEDUPER_CAPACITY: usize = 4_096;
14
15/// Routing controls used for direct and hybrid submit paths.
16#[derive(Debug, Clone, Copy, Eq, PartialEq)]
17pub struct RoutingPolicy {
18    /// Number of upcoming leaders to include after current leader.
19    pub next_leaders: usize,
20    /// Number of static backup validators to include.
21    pub backup_validators: usize,
22    /// Maximum concurrent direct sends.
23    pub max_parallel_sends: usize,
24}
25
26impl Default for RoutingPolicy {
27    fn default() -> Self {
28        Self {
29            next_leaders: 2,
30            backup_validators: 1,
31            max_parallel_sends: 4,
32        }
33    }
34}
35
36impl RoutingPolicy {
37    /// Returns a normalized policy with bounded minimums.
38    #[must_use]
39    pub fn normalized(self) -> Self {
40        Self {
41            next_leaders: self.next_leaders,
42            backup_validators: self.backup_validators,
43            max_parallel_sends: self.max_parallel_sends.max(1),
44        }
45    }
46}
47
48/// Selects leader/backup targets in deterministic order.
49#[must_use]
50pub fn select_targets<P>(
51    leader_provider: &P,
52    backups: &[LeaderTarget],
53    policy: RoutingPolicy,
54) -> Vec<LeaderTarget>
55where
56    P: LeaderProvider + ?Sized,
57{
58    let policy = policy.normalized();
59    let dynamic_backup_count = if backups.is_empty() {
60        policy.backup_validators
61    } else {
62        0
63    };
64    let requested_next = policy.next_leaders.saturating_add(dynamic_backup_count);
65    let estimated_targets = 1_usize
66        .saturating_add(requested_next)
67        .saturating_add(policy.backup_validators);
68    let mut seen = HashSet::with_capacity(estimated_targets);
69    let mut selected = Vec::with_capacity(estimated_targets);
70
71    if let Some(current) = leader_provider.current_leader()
72        && seen.insert(current.tpu_addr)
73    {
74        selected.push(current);
75    }
76
77    for target in leader_provider.next_leaders(requested_next) {
78        if seen.insert(target.tpu_addr) {
79            selected.push(target);
80        }
81    }
82
83    for target in backups.iter().take(policy.backup_validators) {
84        if seen.insert(target.tpu_addr) {
85            selected.push(target.clone());
86        }
87    }
88
89    selected
90}
91
92/// Deduplicates transaction signatures for a bounded time window.
93#[derive(Debug)]
94pub struct SignatureDeduper {
95    /// Time-to-live for seen signatures.
96    ttl: Duration,
97    /// Last seen timestamps by signature.
98    seen: HashMap<SignatureBytes, Instant>,
99    /// Arrival order for bounded eviction without rescanning the whole map.
100    order: VecDeque<(SignatureBytes, Instant)>,
101}
102
103impl SignatureDeduper {
104    /// Creates a dedupe window with a minimum TTL of one millisecond.
105    #[must_use]
106    pub fn new(ttl: Duration) -> Self {
107        Self {
108            ttl: ttl.max(Duration::from_millis(1)),
109            seen: HashMap::with_capacity(INITIAL_SIGNATURE_DEDUPER_CAPACITY),
110            order: VecDeque::with_capacity(INITIAL_SIGNATURE_DEDUPER_CAPACITY),
111        }
112    }
113
114    /// Returns true when signature is new (and records it), false when duplicate.
115    pub fn check_and_insert(&mut self, signature: SignatureBytes, now: Instant) -> bool {
116        self.evict_expired(now);
117        match self.seen.entry(signature) {
118            Entry::Occupied(_) => false,
119            Entry::Vacant(entry) => {
120                entry.insert(now);
121                self.order.push_back((signature, now));
122                true
123            }
124        }
125    }
126
127    /// Returns number of signatures currently tracked.
128    #[must_use]
129    pub fn len(&self) -> usize {
130        self.seen.len()
131    }
132
133    /// Returns true when no signatures are currently tracked.
134    #[must_use]
135    pub fn is_empty(&self) -> bool {
136        self.seen.is_empty()
137    }
138
139    /// Removes all expired signature entries.
140    fn evict_expired(&mut self, now: Instant) {
141        while let Some((signature, first_seen)) = self.order.front().copied() {
142            if now.saturating_duration_since(first_seen) < self.ttl {
143                break;
144            }
145            self.order.pop_front();
146            if self.seen.get(&signature).copied() == Some(first_seen) {
147                let _ = self.seen.remove(&signature);
148            }
149        }
150    }
151}
152
153#[cfg(test)]
154mod tests {
155    use sof_support::{bench::avg_ns_per_iteration, env_support::read_positive_usize};
156
157    use super::*;
158    use crate::providers::{LeaderTarget, StaticLeaderProvider};
159
160    fn target(port: u16) -> LeaderTarget {
161        LeaderTarget::new(None, std::net::SocketAddr::from(([127, 0, 0, 1], port)))
162    }
163
164    #[test]
165    fn select_targets_prefers_current_next_then_backups() {
166        let provider =
167            StaticLeaderProvider::new(Some(target(9001)), vec![target(9002), target(9003)]);
168        let backups = vec![target(9004), target(9005)];
169        let selected = select_targets(
170            &provider,
171            &backups,
172            RoutingPolicy {
173                next_leaders: 2,
174                backup_validators: 1,
175                max_parallel_sends: 8,
176            },
177        );
178        assert_eq!(
179            selected,
180            vec![target(9001), target(9002), target(9003), target(9004)]
181        );
182    }
183
184    #[test]
185    fn select_targets_uses_dynamic_backups_when_static_backups_are_absent() {
186        let provider = StaticLeaderProvider::new(
187            Some(target(9010)),
188            vec![target(9011), target(9012), target(9013), target(9014)],
189        );
190        let selected = select_targets(
191            &provider,
192            &[],
193            RoutingPolicy {
194                next_leaders: 2,
195                backup_validators: 2,
196                max_parallel_sends: 8,
197            },
198        );
199        assert_eq!(
200            selected,
201            vec![
202                target(9010),
203                target(9011),
204                target(9012),
205                target(9013),
206                target(9014)
207            ]
208        );
209    }
210
211    #[test]
212    fn deduper_rejects_recent_duplicate_and_allows_after_ttl() {
213        let signature = SignatureBytes::from([7_u8; 64]);
214        let now = Instant::now();
215        let mut deduper = SignatureDeduper::new(Duration::from_millis(25));
216        assert!(deduper.check_and_insert(signature, now));
217        assert!(!deduper.check_and_insert(signature, now + Duration::from_millis(5)));
218        assert!(deduper.check_and_insert(signature, now + Duration::from_millis(30)));
219    }
220
221    #[test]
222    #[ignore = "profiling fixture for signature dedupe churn"]
223    fn signature_deduper_profile_fixture() {
224        let iterations = read_positive_usize("SOF_TX_SIGNATURE_DEDUPER_PROFILE_ITERS", 50_000);
225        let ttl_ms = u64::try_from(read_positive_usize(
226            "SOF_TX_SIGNATURE_DEDUPER_PROFILE_TTL_MS",
227            10_000,
228        ))
229        .unwrap_or(10_000);
230        let mut deduper = SignatureDeduper::new(Duration::from_millis(ttl_ms));
231        let start = Instant::now();
232        let now = Instant::now();
233
234        for index in 0..iterations {
235            let mut signature = [0_u8; 64];
236            signature[..8].copy_from_slice(&(index as u64).to_le_bytes());
237            assert!(deduper.check_and_insert(
238                SignatureBytes::from(signature),
239                now + Duration::from_nanos(index as u64)
240            ));
241        }
242
243        let elapsed = start.elapsed();
244        let avg_ns = avg_ns_per_iteration(elapsed, iterations);
245        println!(
246            "signature_deduper_profile_fixture iterations={} ttl_ms={} entries={} elapsed_us={} avg_ns_per_iteration={} avg_us_per_iteration={:.3}",
247            iterations,
248            ttl_ms,
249            deduper.len(),
250            elapsed.as_micros(),
251            avg_ns,
252            avg_ns as f64 / 1_000.0
253        );
254    }
255
256    #[test]
257    #[ignore = "profiling fixture for signature deduper allocation churn"]
258    fn signature_deduper_allocation_profile_fixture() {
259        let iterations = read_positive_usize("SOF_TX_SIGNATURE_DEDUPER_PROFILE_ITERS", 50_000);
260        let ttl_ms = u64::try_from(read_positive_usize(
261            "SOF_TX_SIGNATURE_DEDUPER_PROFILE_TTL_MS",
262            10_000,
263        ))
264        .unwrap_or(10_000);
265        let ttl = Duration::from_millis(ttl_ms);
266        let mut baseline = signature_deduper_baseline(ttl);
267        let mut optimized = SignatureDeduper::new(ttl);
268        let now = Instant::now();
269
270        let baseline_started = Instant::now();
271        for index in 0..iterations {
272            let signature = make_signature(index);
273            assert!(baseline.check_and_insert(
274                SignatureBytes::from(signature),
275                now + Duration::from_nanos(index as u64)
276            ));
277        }
278        let baseline_elapsed = baseline_started.elapsed();
279
280        let optimized_started = Instant::now();
281        for index in 0..iterations {
282            let signature = make_signature(index);
283            assert!(optimized.check_and_insert(
284                SignatureBytes::from(signature),
285                now + Duration::from_nanos(index as u64)
286            ));
287        }
288        let optimized_elapsed = optimized_started.elapsed();
289
290        println!(
291            "signature_deduper_allocation_profile_fixture iterations={} baseline_us={} optimized_us={}",
292            iterations,
293            baseline_elapsed.as_micros(),
294            optimized_elapsed.as_micros(),
295        );
296    }
297
298    fn signature_deduper_baseline(ttl: Duration) -> SignatureDeduper {
299        SignatureDeduper {
300            ttl: ttl.max(Duration::from_millis(1)),
301            seen: HashMap::new(),
302            order: VecDeque::new(),
303        }
304    }
305
306    fn make_signature(index: usize) -> [u8; 64] {
307        let mut signature = [0_u8; 64];
308        signature[..8].copy_from_slice(&(index as u64).to_le_bytes());
309        signature
310    }
311}