Skip to main content

sof_tx/
routing.rs

1//! Routing policy, target selection, and signature-level duplicate suppression.
2
3use std::{
4    collections::{HashMap, HashSet},
5    time::{Duration, Instant},
6};
7
8use solana_signature::Signature;
9
10use crate::providers::{LeaderProvider, LeaderTarget};
11
/// Tunable limits that shape target selection for direct and hybrid submit paths.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct RoutingPolicy {
    /// How many upcoming leaders to include in addition to the current leader.
    pub next_leaders: usize,
    /// How many backup validators to include.
    pub backup_validators: usize,
    /// Upper bound on concurrent direct sends.
    pub max_parallel_sends: usize,
}

impl Default for RoutingPolicy {
    /// Defaults to two upcoming leaders, one backup validator, and four parallel sends.
    fn default() -> Self {
        const NEXT_LEADERS: usize = 2;
        const BACKUP_VALIDATORS: usize = 1;
        const MAX_PARALLEL_SENDS: usize = 4;

        Self {
            next_leaders: NEXT_LEADERS,
            backup_validators: BACKUP_VALIDATORS,
            max_parallel_sends: MAX_PARALLEL_SENDS,
        }
    }
}

impl RoutingPolicy {
    /// Returns a copy of this policy that always permits at least one parallel send.
    ///
    /// `next_leaders` and `backup_validators` are carried over unchanged, so zero
    /// remains a valid value for both of them.
    #[must_use]
    pub fn normalized(self) -> Self {
        // Only the send concurrency has a floor; a zero here is lifted to one.
        let max_parallel_sends = match self.max_parallel_sends {
            0 => 1,
            sends => sends,
        };
        Self {
            max_parallel_sends,
            ..self
        }
    }
}
44
45/// Selects leader/backup targets in deterministic order.
46#[must_use]
47pub fn select_targets(
48    leader_provider: &dyn LeaderProvider,
49    backups: &[LeaderTarget],
50    policy: RoutingPolicy,
51) -> Vec<LeaderTarget> {
52    let policy = policy.normalized();
53    let mut seen = HashSet::new();
54    let mut selected = Vec::new();
55    let dynamic_backup_count = if backups.is_empty() {
56        policy.backup_validators
57    } else {
58        0
59    };
60
61    if let Some(current) = leader_provider.current_leader()
62        && seen.insert(current.tpu_addr)
63    {
64        selected.push(current);
65    }
66
67    let requested_next = policy.next_leaders.saturating_add(dynamic_backup_count);
68    for target in leader_provider.next_leaders(requested_next) {
69        if seen.insert(target.tpu_addr) {
70            selected.push(target);
71        }
72    }
73
74    for target in backups.iter().take(policy.backup_validators) {
75        if seen.insert(target.tpu_addr) {
76            selected.push(target.clone());
77        }
78    }
79
80    selected
81}
82
83/// Deduplicates transaction signatures for a bounded time window.
84#[derive(Debug)]
85pub struct SignatureDeduper {
86    /// Time-to-live for seen signatures.
87    ttl: Duration,
88    /// Last seen timestamps by signature.
89    seen: HashMap<Signature, Instant>,
90}
91
92impl SignatureDeduper {
93    /// Creates a dedupe window with a minimum TTL of one millisecond.
94    #[must_use]
95    pub fn new(ttl: Duration) -> Self {
96        Self {
97            ttl: ttl.max(Duration::from_millis(1)),
98            seen: HashMap::new(),
99        }
100    }
101
102    /// Returns true when signature is new (and records it), false when duplicate.
103    pub fn check_and_insert(&mut self, signature: Signature, now: Instant) -> bool {
104        self.evict_expired(now);
105        if self.seen.contains_key(&signature) {
106            return false;
107        }
108        let _ = self.seen.insert(signature, now);
109        true
110    }
111
112    /// Returns number of signatures currently tracked.
113    #[must_use]
114    pub fn len(&self) -> usize {
115        self.seen.len()
116    }
117
118    /// Returns true when no signatures are currently tracked.
119    #[must_use]
120    pub fn is_empty(&self) -> bool {
121        self.seen.is_empty()
122    }
123
124    /// Removes all expired signature entries.
125    fn evict_expired(&mut self, now: Instant) {
126        let ttl = self.ttl;
127        self.seen
128            .retain(|_, first_seen| now.saturating_duration_since(*first_seen) < ttl);
129    }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use crate::providers::{LeaderTarget, StaticLeaderProvider};

    /// Builds a loopback target on `port`, passing `None` as the first constructor argument.
    fn target(port: u16) -> LeaderTarget {
        let addr = std::net::SocketAddr::from(([127, 0, 0, 1], port));
        LeaderTarget::new(None, addr)
    }

    /// Builds one loopback target per port, preserving the given order.
    fn targets(ports: &[u16]) -> Vec<LeaderTarget> {
        ports.iter().copied().map(target).collect()
    }

    #[test]
    fn select_targets_prefers_current_next_then_backups() {
        let provider = StaticLeaderProvider::new(Some(target(9001)), targets(&[9002, 9003]));
        let backups = targets(&[9004, 9005]);
        let policy = RoutingPolicy {
            next_leaders: 2,
            backup_validators: 1,
            max_parallel_sends: 8,
        };

        let selected = select_targets(&provider, &backups, policy);

        // Only the first static backup is taken because `backup_validators` is 1.
        assert_eq!(selected, targets(&[9001, 9002, 9003, 9004]));
    }

    #[test]
    fn select_targets_uses_dynamic_backups_when_static_backups_are_absent() {
        let provider = StaticLeaderProvider::new(
            Some(target(9010)),
            targets(&[9011, 9012, 9013, 9014]),
        );
        let policy = RoutingPolicy {
            next_leaders: 2,
            backup_validators: 2,
            max_parallel_sends: 8,
        };

        let selected = select_targets(&provider, &[], policy);

        // With no static backups, two extra upcoming leaders fill the backup slots.
        assert_eq!(selected, targets(&[9010, 9011, 9012, 9013, 9014]));
    }

    #[test]
    fn deduper_rejects_recent_duplicate_and_allows_after_ttl() {
        let signature = Signature::from([7_u8; 64]);
        let start = Instant::now();
        let at = |millis: u64| start + Duration::from_millis(millis);
        let mut deduper = SignatureDeduper::new(Duration::from_millis(25));

        // First sighting is accepted.
        assert!(deduper.check_and_insert(signature, at(0)));
        // Inside the 25 ms window the same signature is a duplicate.
        assert!(!deduper.check_and_insert(signature, at(5)));
        // Past the window the entry has expired and is accepted again.
        assert!(deduper.check_and_insert(signature, at(30)));
    }
}