use std::{
collections::{HashMap, HashSet, VecDeque, hash_map::Entry},
time::{Duration, Instant},
};
use sof_types::SignatureBytes;
use crate::providers::{LeaderProvider, LeaderTarget};
/// Initial capacity reserved for both the signature map and the eviction
/// queue when a [`SignatureDeduper`] is created via [`SignatureDeduper::new`].
const INITIAL_SIGNATURE_DEDUPER_CAPACITY: usize = 4_096;
/// Knobs that decide how many targets a transaction is fanned out to.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct RoutingPolicy {
    /// Number of upcoming leaders [`select_targets`] requests from the leader
    /// provider, in addition to the current leader.
    pub next_leaders: usize,
    /// Number of entries [`select_targets`] takes from the static backup
    /// list. When that list is empty, the same number of additional upcoming
    /// leaders is requested instead.
    pub backup_validators: usize,
    /// Limit on parallel sends; [`RoutingPolicy::normalized`] raises it to at
    /// least 1.
    ///
    /// NOTE(review): not consumed by the code visible here — presumably
    /// enforced by the sender; confirm against the caller.
    pub max_parallel_sends: usize,
}

impl Default for RoutingPolicy {
    /// Two upcoming leaders, one backup validator, four parallel sends.
    fn default() -> Self {
        Self {
            next_leaders: 2,
            backup_validators: 1,
            max_parallel_sends: 4,
        }
    }
}

impl RoutingPolicy {
    /// Returns a copy of the policy that is safe to act on.
    ///
    /// Only `max_parallel_sends` is adjusted: it is raised to at least 1.
    /// The two count fields are passed through untouched.
    #[must_use]
    pub fn normalized(mut self) -> Self {
        self.max_parallel_sends = self.max_parallel_sends.max(1);
        self
    }
}
/// Builds the ordered, de-duplicated list of targets a transaction is sent to.
///
/// Targets are appended in priority order, and a target whose `tpu_addr` has
/// already been selected is skipped:
///
/// 1. the provider's current leader, when it reports one;
/// 2. upcoming leaders — `policy.next_leaders` of them, plus
///    `policy.backup_validators` extra when `backups` is empty, so later
///    leaders stand in for the missing static backups;
/// 3. the first `policy.backup_validators` entries of `backups`. A backup
///    that duplicates an already selected address is dropped and is not
///    replaced by a later entry.
///
/// `policy.max_parallel_sends` is not consulted here; the returned list is
/// not truncated to it.
#[must_use]
pub fn select_targets<P>(
    leader_provider: &P,
    backups: &[LeaderTarget],
    policy: RoutingPolicy,
) -> Vec<LeaderTarget>
where
    P: LeaderProvider + ?Sized,
{
    // Ceiling for the up-front allocation. The policy counts are
    // caller-controlled and may be arbitrarily large (for example
    // `usize::MAX` meaning "every leader"); passing such a value to
    // `with_capacity` panics with a capacity overflow. The collections still
    // grow on demand beyond this hint.
    const MAX_PREALLOCATED_TARGETS: usize = 64;

    let policy = policy.normalized();
    let dynamic_backup_count = if backups.is_empty() {
        policy.backup_validators
    } else {
        0
    };
    let requested_next = policy.next_leaders.saturating_add(dynamic_backup_count);
    // No more static backups can be taken than are actually configured.
    let static_backup_count = backups.len().min(policy.backup_validators);
    let capacity_hint = 1_usize
        .saturating_add(requested_next)
        .saturating_add(static_backup_count)
        .min(MAX_PREALLOCATED_TARGETS);

    // `seen` tracks TPU addresses so each address is selected at most once.
    let mut seen = HashSet::with_capacity(capacity_hint);
    let mut selected = Vec::with_capacity(capacity_hint);

    if let Some(current) = leader_provider.current_leader()
        && seen.insert(current.tpu_addr)
    {
        selected.push(current);
    }
    for target in leader_provider.next_leaders(requested_next) {
        if seen.insert(target.tpu_addr) {
            selected.push(target);
        }
    }
    for target in backups.iter().take(policy.backup_validators) {
        if seen.insert(target.tpu_addr) {
            selected.push(target.clone());
        }
    }
    selected
}
/// Time-windowed filter that rejects signatures it has accepted recently.
///
/// A signature is remembered from the instant it is first accepted until at
/// least `ttl` has elapsed. Expired entries are only purged lazily, at the
/// start of [`SignatureDeduper::check_and_insert`].
#[derive(Debug)]
pub struct SignatureDeduper {
    /// Minimum age at which a remembered signature becomes evictable.
    ttl: Duration,
    /// Remembered signatures mapped to the instant each was accepted.
    seen: HashMap<SignatureBytes, Instant>,
    /// Accepted signatures in insertion order; eviction pops from the front.
    order: VecDeque<(SignatureBytes, Instant)>,
}

impl SignatureDeduper {
    /// Creates an empty deduper.
    ///
    /// `ttl` is raised to at least one millisecond. Both internal collections
    /// are pre-sized to `INITIAL_SIGNATURE_DEDUPER_CAPACITY`.
    #[must_use]
    pub fn new(ttl: Duration) -> Self {
        let ttl = ttl.max(Duration::from_millis(1));
        let seen = HashMap::with_capacity(INITIAL_SIGNATURE_DEDUPER_CAPACITY);
        let order = VecDeque::with_capacity(INITIAL_SIGNATURE_DEDUPER_CAPACITY);
        Self { ttl, seen, order }
    }

    /// Records `signature` as seen at `now` unless it is already remembered.
    ///
    /// Expired entries are evicted first. Returns `true` when the signature
    /// was newly recorded and `false` when it is a duplicate. A duplicate
    /// does not refresh the stored timestamp.
    pub fn check_and_insert(&mut self, signature: SignatureBytes, now: Instant) -> bool {
        self.evict_expired(now);

        let Entry::Vacant(slot) = self.seen.entry(signature) else {
            return false;
        };
        slot.insert(now);
        self.order.push_back((signature, now));
        true
    }

    /// Number of signatures currently remembered, including any that have
    /// expired but have not yet been evicted.
    #[must_use]
    pub fn len(&self) -> usize {
        self.seen.len()
    }

    /// Returns `true` when no signature is currently remembered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.seen.is_empty()
    }

    /// Drops entries from the front of the queue whose age relative to `now`
    /// has reached `ttl`, stopping at the first entry that is still fresh.
    fn evict_expired(&mut self, now: Instant) {
        while let Some(&(signature, first_seen)) = self.order.front() {
            let age = now.saturating_duration_since(first_seen);
            if age < self.ttl {
                break;
            }
            self.order.pop_front();

            // Only forget the signature if the map still holds this exact
            // timestamp for it.
            let still_current = self
                .seen
                .get(&signature)
                .is_some_and(|stored| *stored == first_seen);
            if still_current {
                let _ = self.seen.remove(&signature);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use sof_support::{bench::avg_ns_per_iteration, env_support::read_positive_usize};

    use super::*;
    use crate::providers::{LeaderTarget, StaticLeaderProvider};

    /// Builds a loopback target listening on `port`, passing `None` as the
    /// first constructor argument.
    fn target(port: u16) -> LeaderTarget {
        let addr = std::net::SocketAddr::from(([127, 0, 0, 1], port));
        LeaderTarget::new(None, addr)
    }

    /// Encodes `index` little-endian into the first eight bytes of an
    /// otherwise zeroed 64-byte signature, so distinct indices give distinct
    /// signatures.
    fn make_signature(index: usize) -> [u8; 64] {
        let mut signature = [0_u8; 64];
        signature[..8].copy_from_slice(&(index as u64).to_le_bytes());
        signature
    }

    /// Deduper with the same TTL clamp as `SignatureDeduper::new` but with
    /// collections that start without any reserved capacity.
    fn signature_deduper_baseline(ttl: Duration) -> SignatureDeduper {
        SignatureDeduper {
            ttl: ttl.max(Duration::from_millis(1)),
            seen: HashMap::new(),
            order: VecDeque::new(),
        }
    }

    /// Reads the iteration count and the TTL in milliseconds shared by the
    /// profiling fixtures.
    fn profile_settings() -> (usize, u64) {
        let iterations = read_positive_usize("SOF_TX_SIGNATURE_DEDUPER_PROFILE_ITERS", 50_000);
        let ttl_ms = read_positive_usize("SOF_TX_SIGNATURE_DEDUPER_PROFILE_TTL_MS", 10_000);
        (iterations, u64::try_from(ttl_ms).unwrap_or(10_000))
    }

    /// Inserts `iterations` distinct signatures, one nanosecond apart starting
    /// at `base`, asserting that every one of them is accepted.
    fn insert_unique(deduper: &mut SignatureDeduper, iterations: usize, base: Instant) {
        for index in 0..iterations {
            let signature = make_signature(index);
            assert!(deduper.check_and_insert(
                SignatureBytes::from(signature),
                base + Duration::from_nanos(index as u64)
            ));
        }
    }

    #[test]
    fn select_targets_prefers_current_next_then_backups() {
        let upcoming = vec![target(9002), target(9003)];
        let provider = StaticLeaderProvider::new(Some(target(9001)), upcoming);
        let backups = [target(9004), target(9005)];
        let policy = RoutingPolicy {
            next_leaders: 2,
            backup_validators: 1,
            max_parallel_sends: 8,
        };

        let selected = select_targets(&provider, &backups, policy);

        let expected: Vec<LeaderTarget> = [9001, 9002, 9003, 9004]
            .into_iter()
            .map(target)
            .collect();
        assert_eq!(selected, expected);
    }

    #[test]
    fn select_targets_uses_dynamic_backups_when_static_backups_are_absent() {
        let upcoming: Vec<LeaderTarget> = (9011..=9014).map(target).collect();
        let provider = StaticLeaderProvider::new(Some(target(9010)), upcoming);
        let policy = RoutingPolicy {
            next_leaders: 2,
            backup_validators: 2,
            max_parallel_sends: 8,
        };

        let selected = select_targets(&provider, &[], policy);

        let expected: Vec<LeaderTarget> = (9010..=9014).map(target).collect();
        assert_eq!(selected, expected);
    }

    #[test]
    fn deduper_rejects_recent_duplicate_and_allows_after_ttl() {
        let signature = SignatureBytes::from([7_u8; 64]);
        let started = Instant::now();
        let at = |millis: u64| started + Duration::from_millis(millis);
        let mut deduper = SignatureDeduper::new(Duration::from_millis(25));

        // First sighting is accepted, a repeat inside the TTL is rejected,
        // and a repeat after the TTL is accepted again.
        assert!(deduper.check_and_insert(signature, at(0)));
        assert!(!deduper.check_and_insert(signature, at(5)));
        assert!(deduper.check_and_insert(signature, at(30)));
    }

    #[test]
    #[ignore = "profiling fixture for signature dedupe churn"]
    fn signature_deduper_profile_fixture() {
        let (iterations, ttl_ms) = profile_settings();
        let mut deduper = SignatureDeduper::new(Duration::from_millis(ttl_ms));

        let start = Instant::now();
        let now = Instant::now();
        insert_unique(&mut deduper, iterations, now);
        let elapsed = start.elapsed();

        let avg_ns = avg_ns_per_iteration(elapsed, iterations);
        println!(
            "signature_deduper_profile_fixture iterations={} ttl_ms={} entries={} elapsed_us={} avg_ns_per_iteration={} avg_us_per_iteration={:.3}",
            iterations,
            ttl_ms,
            deduper.len(),
            elapsed.as_micros(),
            avg_ns,
            avg_ns as f64 / 1_000.0
        );
    }

    #[test]
    #[ignore = "profiling fixture for signature deduper allocation churn"]
    fn signature_deduper_allocation_profile_fixture() {
        let (iterations, ttl_ms) = profile_settings();
        let ttl = Duration::from_millis(ttl_ms);
        let mut baseline = signature_deduper_baseline(ttl);
        let mut optimized = SignatureDeduper::new(ttl);
        let now = Instant::now();

        let baseline_started = Instant::now();
        insert_unique(&mut baseline, iterations, now);
        let baseline_elapsed = baseline_started.elapsed();

        let optimized_started = Instant::now();
        insert_unique(&mut optimized, iterations, now);
        let optimized_elapsed = optimized_started.elapsed();

        println!(
            "signature_deduper_allocation_profile_fixture iterations={} baseline_us={} optimized_us={}",
            iterations,
            baseline_elapsed.as_micros(),
            optimized_elapsed.as_micros(),
        );
    }
}