Skip to main content

sof_tx/
routing.rs

1//! Routing policy, target selection, and signature-level duplicate suppression.
2
3use std::{
4    collections::{HashMap, HashSet},
5    time::{Duration, Instant},
6};
7
8use solana_signature::Signature;
9
10use crate::providers::{LeaderProvider, LeaderTarget};
11
/// Tunable limits that shape target selection for direct and hybrid submit paths.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RoutingPolicy {
    /// How many upcoming leaders are requested in addition to the current leader.
    pub next_leaders: usize,
    /// Maximum number of static backup validators added after the leaders.
    pub backup_validators: usize,
    /// Upper bound on concurrent direct sends.
    pub max_parallel_sends: usize,
}
22
23impl Default for RoutingPolicy {
24    fn default() -> Self {
25        Self {
26            next_leaders: 2,
27            backup_validators: 1,
28            max_parallel_sends: 4,
29        }
30    }
31}
32
33impl RoutingPolicy {
34    /// Returns a normalized policy with bounded minimums.
35    #[must_use]
36    pub fn normalized(self) -> Self {
37        Self {
38            next_leaders: self.next_leaders,
39            backup_validators: self.backup_validators,
40            max_parallel_sends: self.max_parallel_sends.max(1),
41        }
42    }
43}
44
45/// Selects leader/backup targets in deterministic order.
46#[must_use]
47pub fn select_targets(
48    leader_provider: &dyn LeaderProvider,
49    backups: &[LeaderTarget],
50    policy: RoutingPolicy,
51) -> Vec<LeaderTarget> {
52    let policy = policy.normalized();
53    let mut seen = HashSet::new();
54    let mut selected = Vec::new();
55
56    if let Some(current) = leader_provider.current_leader()
57        && seen.insert(current.tpu_addr)
58    {
59        selected.push(current);
60    }
61
62    for target in leader_provider.next_leaders(policy.next_leaders) {
63        if seen.insert(target.tpu_addr) {
64            selected.push(target);
65        }
66    }
67
68    for target in backups.iter().take(policy.backup_validators) {
69        if seen.insert(target.tpu_addr) {
70            selected.push(target.clone());
71        }
72    }
73
74    selected
75}
76
77/// Deduplicates transaction signatures for a bounded time window.
78#[derive(Debug)]
79pub struct SignatureDeduper {
80    /// Time-to-live for seen signatures.
81    ttl: Duration,
82    /// Last seen timestamps by signature.
83    seen: HashMap<Signature, Instant>,
84}
85
86impl SignatureDeduper {
87    /// Creates a dedupe window with a minimum TTL of one millisecond.
88    #[must_use]
89    pub fn new(ttl: Duration) -> Self {
90        Self {
91            ttl: ttl.max(Duration::from_millis(1)),
92            seen: HashMap::new(),
93        }
94    }
95
96    /// Returns true when signature is new (and records it), false when duplicate.
97    pub fn check_and_insert(&mut self, signature: Signature, now: Instant) -> bool {
98        self.evict_expired(now);
99        if self.seen.contains_key(&signature) {
100            return false;
101        }
102        let _ = self.seen.insert(signature, now);
103        true
104    }
105
106    /// Returns number of signatures currently tracked.
107    #[must_use]
108    pub fn len(&self) -> usize {
109        self.seen.len()
110    }
111
112    /// Returns true when no signatures are currently tracked.
113    #[must_use]
114    pub fn is_empty(&self) -> bool {
115        self.seen.is_empty()
116    }
117
118    /// Removes all expired signature entries.
119    fn evict_expired(&mut self, now: Instant) {
120        let ttl = self.ttl;
121        self.seen
122            .retain(|_, first_seen| now.saturating_duration_since(*first_seen) < ttl);
123    }
124}
125
#[cfg(test)]
mod tests {
    use std::net::{Ipv4Addr, SocketAddr};

    use super::*;
    use crate::providers::{LeaderTarget, StaticLeaderProvider};

    /// Builds a `LeaderTarget` for 127.0.0.1 on the given port.
    fn target(port: u16) -> LeaderTarget {
        LeaderTarget::new(None, SocketAddr::from((Ipv4Addr::LOCALHOST, port)))
    }

    /// Current leader comes first, then upcoming leaders, then the allowed backups.
    #[test]
    fn select_targets_prefers_current_next_then_backups() {
        let upcoming = vec![target(9002), target(9003)];
        let provider = StaticLeaderProvider::new(Some(target(9001)), upcoming);
        let backups = [target(9004), target(9005)];
        let policy = RoutingPolicy {
            next_leaders: 2,
            backup_validators: 1,
            max_parallel_sends: 8,
        };

        let selected = select_targets(&provider, &backups, policy);

        let expected = Vec::from([9001_u16, 9002, 9003, 9004].map(target));
        assert_eq!(selected, expected);
    }

    /// A repeat inside the TTL is rejected; the same signature is accepted once the TTL passed.
    #[test]
    fn deduper_rejects_recent_duplicate_and_allows_after_ttl() {
        let signature = Signature::from([7_u8; 64]);
        let start = Instant::now();
        let mut deduper = SignatureDeduper::new(Duration::from_millis(25));

        let first = deduper.check_and_insert(signature, start);
        let inside_window = deduper.check_and_insert(signature, start + Duration::from_millis(5));
        let after_window = deduper.check_and_insert(signature, start + Duration::from_millis(30));

        assert!(first);
        assert!(!inside_window);
        assert!(after_window);
    }
}