Skip to main content

sof_tx/
routing.rs

1//! Routing policy, target selection, and signature-level duplicate suppression.
2
3use std::{
4    collections::{HashMap, HashSet},
5    time::{Duration, Instant},
6};
7
8use solana_signature::Signature;
9
10use crate::providers::{LeaderProvider, LeaderTarget};
11
12/// Routing controls used for direct and hybrid submit paths.
13#[derive(Debug, Clone, Copy, Eq, PartialEq)]
14pub struct RoutingPolicy {
15    /// Number of upcoming leaders to include after current leader.
16    pub next_leaders: usize,
17    /// Number of static backup validators to include.
18    pub backup_validators: usize,
19    /// Maximum concurrent direct sends.
20    pub max_parallel_sends: usize,
21}
22
23impl Default for RoutingPolicy {
24    fn default() -> Self {
25        Self {
26            next_leaders: 2,
27            backup_validators: 1,
28            max_parallel_sends: 4,
29        }
30    }
31}
32
33impl RoutingPolicy {
34    /// Returns a normalized policy with bounded minimums.
35    #[must_use]
36    pub fn normalized(self) -> Self {
37        Self {
38            next_leaders: self.next_leaders,
39            backup_validators: self.backup_validators,
40            max_parallel_sends: self.max_parallel_sends.max(1),
41        }
42    }
43}
44
45/// Selects leader/backup targets in deterministic order.
46#[must_use]
47pub fn select_targets<P>(
48    leader_provider: &P,
49    backups: &[LeaderTarget],
50    policy: RoutingPolicy,
51) -> Vec<LeaderTarget>
52where
53    P: LeaderProvider + ?Sized,
54{
55    let policy = policy.normalized();
56    let dynamic_backup_count = if backups.is_empty() {
57        policy.backup_validators
58    } else {
59        0
60    };
61    let requested_next = policy.next_leaders.saturating_add(dynamic_backup_count);
62    let estimated_targets = 1_usize
63        .saturating_add(requested_next)
64        .saturating_add(policy.backup_validators);
65    let mut seen = HashSet::with_capacity(estimated_targets);
66    let mut selected = Vec::with_capacity(estimated_targets);
67
68    if let Some(current) = leader_provider.current_leader()
69        && seen.insert(current.tpu_addr)
70    {
71        selected.push(current);
72    }
73
74    for target in leader_provider.next_leaders(requested_next) {
75        if seen.insert(target.tpu_addr) {
76            selected.push(target);
77        }
78    }
79
80    for target in backups.iter().take(policy.backup_validators) {
81        if seen.insert(target.tpu_addr) {
82            selected.push(target.clone());
83        }
84    }
85
86    selected
87}
88
89/// Deduplicates transaction signatures for a bounded time window.
90#[derive(Debug)]
91pub struct SignatureDeduper {
92    /// Time-to-live for seen signatures.
93    ttl: Duration,
94    /// Last seen timestamps by signature.
95    seen: HashMap<Signature, Instant>,
96}
97
98impl SignatureDeduper {
99    /// Creates a dedupe window with a minimum TTL of one millisecond.
100    #[must_use]
101    pub fn new(ttl: Duration) -> Self {
102        Self {
103            ttl: ttl.max(Duration::from_millis(1)),
104            seen: HashMap::new(),
105        }
106    }
107
108    /// Returns true when signature is new (and records it), false when duplicate.
109    pub fn check_and_insert(&mut self, signature: Signature, now: Instant) -> bool {
110        self.evict_expired(now);
111        if self.seen.contains_key(&signature) {
112            return false;
113        }
114        let _ = self.seen.insert(signature, now);
115        true
116    }
117
118    /// Returns number of signatures currently tracked.
119    #[must_use]
120    pub fn len(&self) -> usize {
121        self.seen.len()
122    }
123
124    /// Returns true when no signatures are currently tracked.
125    #[must_use]
126    pub fn is_empty(&self) -> bool {
127        self.seen.is_empty()
128    }
129
130    /// Removes all expired signature entries.
131    fn evict_expired(&mut self, now: Instant) {
132        let ttl = self.ttl;
133        self.seen
134            .retain(|_, first_seen| now.saturating_duration_since(*first_seen) < ttl);
135    }
136}
137
138#[cfg(test)]
139mod tests {
140    use super::*;
141    use crate::providers::{LeaderTarget, StaticLeaderProvider};
142
143    fn target(port: u16) -> LeaderTarget {
144        LeaderTarget::new(None, std::net::SocketAddr::from(([127, 0, 0, 1], port)))
145    }
146
147    #[test]
148    fn select_targets_prefers_current_next_then_backups() {
149        let provider =
150            StaticLeaderProvider::new(Some(target(9001)), vec![target(9002), target(9003)]);
151        let backups = vec![target(9004), target(9005)];
152        let selected = select_targets(
153            &provider,
154            &backups,
155            RoutingPolicy {
156                next_leaders: 2,
157                backup_validators: 1,
158                max_parallel_sends: 8,
159            },
160        );
161        assert_eq!(
162            selected,
163            vec![target(9001), target(9002), target(9003), target(9004)]
164        );
165    }
166
167    #[test]
168    fn select_targets_uses_dynamic_backups_when_static_backups_are_absent() {
169        let provider = StaticLeaderProvider::new(
170            Some(target(9010)),
171            vec![target(9011), target(9012), target(9013), target(9014)],
172        );
173        let selected = select_targets(
174            &provider,
175            &[],
176            RoutingPolicy {
177                next_leaders: 2,
178                backup_validators: 2,
179                max_parallel_sends: 8,
180            },
181        );
182        assert_eq!(
183            selected,
184            vec![
185                target(9010),
186                target(9011),
187                target(9012),
188                target(9013),
189                target(9014)
190            ]
191        );
192    }
193
194    #[test]
195    fn deduper_rejects_recent_duplicate_and_allows_after_ttl() {
196        let signature = Signature::from([7_u8; 64]);
197        let now = Instant::now();
198        let mut deduper = SignatureDeduper::new(Duration::from_millis(25));
199        assert!(deduper.check_and_insert(signature, now));
200        assert!(!deduper.check_and_insert(signature, now + Duration::from_millis(5)));
201        assert!(deduper.check_and_insert(signature, now + Duration::from_millis(30)));
202    }
203}