use std::{
collections::{HashMap, VecDeque},
hash::{Hash, Hasher},
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::{Duration, SystemTime},
};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use sof_types::SignatureBytes;
use thiserror::Error;
use crate::{providers::LeaderTarget, routing::RoutingPolicy};
/// Legacy single-value selector for how a transaction is submitted.
///
/// Every variant converts into a [`SubmitPlan`] through `From<SubmitMode>`, and
/// [`SubmitPlan::legacy_mode`] maps a matching plan back to one of these values.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum SubmitMode {
/// Corresponds to an ordered plan containing only [`SubmitRoute::Rpc`].
RpcOnly,
/// Corresponds to an ordered plan containing only [`SubmitRoute::Jito`].
JitoOnly,
/// Corresponds to an ordered plan containing only [`SubmitRoute::Direct`].
DirectOnly,
/// Corresponds to an ordered plan of [`SubmitRoute::Direct`] followed by [`SubmitRoute::Rpc`].
Hybrid,
}
/// One concrete route a transaction can be submitted through.
///
/// Routes are the elements of [`SubmitPlan::routes`].
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum SubmitRoute {
/// The RPC route (see [`RpcSubmitTransport`]).
Rpc,
/// The Jito route (see [`JitoSubmitTransport`]).
Jito,
/// The direct route, which submits to leader targets (see [`DirectSubmitTransport`]).
Direct,
}
/// How the routes of a [`SubmitPlan`] are meant to be executed.
///
/// NOTE(review): the execution logic is not in this file; the per-variant
/// descriptions below are inferred from the names and should be confirmed
/// against the submit client.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
pub enum SubmitStrategy {
/// Default strategy. Presumably tries routes in plan order, moving to the
/// next route only when the previous one fails.
#[default]
OrderedFallback,
/// Presumably dispatches to every route in the plan concurrently.
AllAtOnce,
}
/// A list of submit routes plus the strategy used to execute them.
///
/// The fields are public, so a plan built by hand may contain repeated routes;
/// [`SubmitPlan::new`] and [`SubmitPlan::into_normalized`] remove repeats.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct SubmitPlan {
/// Routes in the order they were requested.
pub routes: Vec<SubmitRoute>,
/// Strategy applied to `routes`.
pub strategy: SubmitStrategy,
}
impl SubmitPlan {
#[must_use]
pub fn new(routes: Vec<SubmitRoute>, strategy: SubmitStrategy) -> Self {
Self { routes, strategy }.into_normalized()
}
#[must_use]
pub fn into_normalized(mut self) -> Self {
let mut seen_rpc = false;
let mut seen_jito = false;
let mut seen_direct = false;
self.routes.retain(|route| match route {
SubmitRoute::Rpc if seen_rpc => false,
SubmitRoute::Rpc => {
seen_rpc = true;
true
}
SubmitRoute::Jito if seen_jito => false,
SubmitRoute::Jito => {
seen_jito = true;
true
}
SubmitRoute::Direct if seen_direct => false,
SubmitRoute::Direct => {
seen_direct = true;
true
}
});
self
}
#[must_use]
pub fn normalized(&self) -> Self {
self.clone().into_normalized()
}
#[must_use]
pub fn rpc_only() -> Self {
Self::new(vec![SubmitRoute::Rpc], SubmitStrategy::OrderedFallback)
}
#[must_use]
pub fn jito_only() -> Self {
Self::new(vec![SubmitRoute::Jito], SubmitStrategy::OrderedFallback)
}
#[must_use]
pub fn direct_only() -> Self {
Self::new(vec![SubmitRoute::Direct], SubmitStrategy::OrderedFallback)
}
#[must_use]
pub fn ordered(routes: Vec<SubmitRoute>) -> Self {
Self::new(routes, SubmitStrategy::OrderedFallback)
}
#[must_use]
pub fn hybrid() -> Self {
Self::ordered(vec![SubmitRoute::Direct, SubmitRoute::Rpc])
}
#[must_use]
pub fn all_at_once(routes: Vec<SubmitRoute>) -> Self {
Self::new(routes, SubmitStrategy::AllAtOnce)
}
#[must_use]
pub fn legacy_mode(&self) -> Option<SubmitMode> {
match (self.strategy, self.routes.as_slice()) {
(SubmitStrategy::OrderedFallback, [SubmitRoute::Rpc]) => Some(SubmitMode::RpcOnly),
(SubmitStrategy::OrderedFallback, [SubmitRoute::Jito]) => Some(SubmitMode::JitoOnly),
(SubmitStrategy::OrderedFallback, [SubmitRoute::Direct]) => {
Some(SubmitMode::DirectOnly)
}
(SubmitStrategy::OrderedFallback, [SubmitRoute::Direct, SubmitRoute::Rpc]) => {
Some(SubmitMode::Hybrid)
}
_ => None,
}
}
}
/// The default plan is [`SubmitPlan::rpc_only`].
impl Default for SubmitPlan {
fn default() -> Self {
Self::rpc_only()
}
}
/// Converts a legacy [`SubmitMode`] into the equivalent [`SubmitPlan`].
///
/// Each mode maps to the constructor of the same name, so the result always
/// uses [`SubmitStrategy::OrderedFallback`].
impl From<SubmitMode> for SubmitPlan {
fn from(value: SubmitMode) -> Self {
match value {
SubmitMode::RpcOnly => Self::rpc_only(),
SubmitMode::JitoOnly => Self::jito_only(),
SubmitMode::DirectOnly => Self::direct_only(),
SubmitMode::Hybrid => Self::hybrid(),
}
}
}
/// Preset tier used to pick a [`DirectSubmitConfig`] via
/// [`DirectSubmitConfig::from_reliability`].
///
/// In those presets, timeouts, attempt counts and rebroadcast intervals all
/// increase from `LowLatency` to `HighReliability`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
pub enum SubmitReliability {
/// Preset with the shortest timeouts and fewest attempts.
LowLatency,
/// Default preset, between the other two.
#[default]
Balanced,
/// Preset with the longest timeouts and most attempts.
HighReliability,
}
/// Raw bytes of an already-signed transaction, tagged by encoding.
///
/// NOTE(review): how the two encodings differ is not visible in this file;
/// confirm against the code that decodes them.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum SignedTx {
/// Bytes of a versioned transaction (presumably its serialized form).
VersionedTransactionBytes(Vec<u8>),
/// Bytes in wire format (presumably ready to send as-is).
WireTransactionBytes(Vec<u8>),
}
/// Options handed to [`RpcSubmitTransport::submit_rpc`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RpcSubmitConfig {
/// Whether preflight should be skipped; `true` in the `Default` impl.
pub skip_preflight: bool,
/// Optional commitment level for preflight, as a string; `None` in the
/// `Default` impl. Accepted values are defined by the transport, not here.
pub preflight_commitment: Option<String>,
}
/// Defaults to skipping preflight with no explicit preflight commitment.
impl Default for RpcSubmitConfig {
fn default() -> Self {
Self {
// Hand-written impl because the derived default for `bool` would be `false`.
skip_preflight: true,
preflight_commitment: None,
}
}
}
/// Options handed to [`JitoSubmitTransport::submit_jito`].
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct JitoSubmitConfig {
/// Bundle-only flag; `false` by default (derived).
/// NOTE(review): its exact effect is defined by the transport — confirm there.
pub bundle_only: bool,
}
/// Identifiers returned by [`JitoSubmitTransport::submit_jito`]; either may be absent.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct JitoSubmitResponse {
/// Transaction signature reported by the transport, if any.
pub transaction_signature: Option<String>,
/// Bundle identifier reported by the transport, if any.
pub bundle_id: Option<String>,
}
/// Tuning knobs handed to [`DirectSubmitTransport::submit_direct`].
///
/// Presets come from [`DirectSubmitConfig::from_reliability`]. Hand-built values
/// can be passed through [`DirectSubmitConfig::normalized`], which raises zero
/// timeouts, intervals and counts to a minimum of one (1 ms for durations).
///
/// NOTE(review): the fields are consumed by transport code outside this file;
/// the per-field descriptions are inferred from names and should be confirmed.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DirectSubmitConfig {
/// Timeout for a single target. Zero is raised to 1 ms by `normalized`.
pub per_target_timeout: Duration,
/// Overall timeout for the direct submit. Zero is raised to 1 ms by `normalized`.
pub global_timeout: Duration,
/// Number of rounds over the target list. Zero is raised to 1 by `normalized`.
pub direct_target_rounds: usize,
/// Number of direct submit attempts. Zero is raised to 1 by `normalized`.
pub direct_submit_attempts: usize,
/// Number of direct attempts when used in a hybrid plan. Zero is raised to 1 by `normalized`.
pub hybrid_direct_attempts: usize,
/// Interval between rebroadcasts. Zero is raised to 1 ms by `normalized`.
pub rebroadcast_interval: Duration,
/// Enables the Agave rebroadcast behavior.
pub agave_rebroadcast_enabled: bool,
/// Window during which Agave rebroadcast runs. Not clamped by `normalized`.
pub agave_rebroadcast_window: Duration,
/// Interval between Agave rebroadcasts. Zero is raised to 1 ms by `normalized`.
pub agave_rebroadcast_interval: Duration,
/// Whether a hybrid plan also broadcasts over RPC.
pub hybrid_rpc_broadcast: bool,
/// Enables latency-aware target selection.
pub latency_aware_targeting: bool,
/// Timeout for one latency probe. Zero is raised to 1 ms by `normalized`.
pub latency_probe_timeout: Duration,
/// Port used for latency probes, if any; every preset uses `Some(8899)`.
pub latency_probe_port: Option<u16>,
/// Upper bound on probed targets. Zero is raised to 1 by `normalized`.
pub latency_probe_max_targets: usize,
}
impl DirectSubmitConfig {
    /// Returns the preset configuration for the given reliability tier.
    ///
    /// All three presets enable Agave rebroadcast and latency-aware targeting,
    /// probe on port 8899 and cap probing at 128 targets. They differ in
    /// timeouts, attempt counts, rebroadcast pacing, and in whether the hybrid
    /// RPC broadcast is on (off only for `LowLatency`).
    #[must_use]
    pub const fn from_reliability(reliability: SubmitReliability) -> Self {
        match reliability {
            // Tightest timeouts and fewest attempts; no hybrid RPC broadcast.
            SubmitReliability::LowLatency => Self {
                per_target_timeout: Duration::from_millis(200),
                global_timeout: Duration::from_millis(1_200),
                direct_target_rounds: 3,
                direct_submit_attempts: 3,
                hybrid_direct_attempts: 2,
                rebroadcast_interval: Duration::from_millis(90),
                agave_rebroadcast_enabled: true,
                agave_rebroadcast_window: Duration::from_secs(30),
                agave_rebroadcast_interval: Duration::from_millis(700),
                hybrid_rpc_broadcast: false,
                latency_aware_targeting: true,
                latency_probe_timeout: Duration::from_millis(80),
                latency_probe_port: Some(8899),
                latency_probe_max_targets: 128,
            },
            // Middle tier; also the `Default` configuration.
            SubmitReliability::Balanced => Self {
                per_target_timeout: Duration::from_millis(300),
                global_timeout: Duration::from_millis(1_800),
                direct_target_rounds: 4,
                direct_submit_attempts: 4,
                hybrid_direct_attempts: 3,
                rebroadcast_interval: Duration::from_millis(110),
                agave_rebroadcast_enabled: true,
                agave_rebroadcast_window: Duration::from_secs(45),
                agave_rebroadcast_interval: Duration::from_millis(800),
                hybrid_rpc_broadcast: true,
                latency_aware_targeting: true,
                latency_probe_timeout: Duration::from_millis(120),
                latency_probe_port: Some(8899),
                latency_probe_max_targets: 128,
            },
            // Longest timeouts and most attempts.
            SubmitReliability::HighReliability => Self {
                per_target_timeout: Duration::from_millis(450),
                global_timeout: Duration::from_millis(3_200),
                direct_target_rounds: 6,
                direct_submit_attempts: 5,
                hybrid_direct_attempts: 4,
                rebroadcast_interval: Duration::from_millis(140),
                agave_rebroadcast_enabled: true,
                agave_rebroadcast_window: Duration::from_secs(70),
                agave_rebroadcast_interval: Duration::from_millis(900),
                hybrid_rpc_broadcast: true,
                latency_aware_targeting: true,
                latency_probe_timeout: Duration::from_millis(160),
                latency_probe_port: Some(8899),
                latency_probe_max_targets: 128,
            },
        }
    }

    /// Returns this configuration with zero-valued knobs raised to a usable minimum.
    ///
    /// Zero timeouts and intervals become 1 ms and zero counts become 1.
    /// `agave_rebroadcast_window`, the boolean flags and `latency_probe_port`
    /// are passed through unchanged.
    #[must_use]
    pub const fn normalized(self) -> Self {
        // Raises a zero duration to one millisecond; other values pass through.
        const fn nonzero_duration(value: Duration) -> Duration {
            if value.is_zero() {
                Duration::from_millis(1)
            } else {
                value
            }
        }

        // Raises a zero count to one; other values pass through.
        const fn nonzero_count(value: usize) -> usize {
            if value == 0 { 1 } else { value }
        }

        Self {
            per_target_timeout: nonzero_duration(self.per_target_timeout),
            global_timeout: nonzero_duration(self.global_timeout),
            direct_target_rounds: nonzero_count(self.direct_target_rounds),
            direct_submit_attempts: nonzero_count(self.direct_submit_attempts),
            hybrid_direct_attempts: nonzero_count(self.hybrid_direct_attempts),
            rebroadcast_interval: nonzero_duration(self.rebroadcast_interval),
            agave_rebroadcast_enabled: self.agave_rebroadcast_enabled,
            // Deliberately left as-is: a zero window is not clamped.
            agave_rebroadcast_window: self.agave_rebroadcast_window,
            agave_rebroadcast_interval: nonzero_duration(self.agave_rebroadcast_interval),
            hybrid_rpc_broadcast: self.hybrid_rpc_broadcast,
            latency_aware_targeting: self.latency_aware_targeting,
            latency_probe_timeout: nonzero_duration(self.latency_probe_timeout),
            latency_probe_port: self.latency_probe_port,
            latency_probe_max_targets: nonzero_count(self.latency_probe_max_targets),
        }
    }
}
/// The default configuration is the preset for the default reliability tier
/// (`SubmitReliability::Balanced`).
impl Default for DirectSubmitConfig {
fn default() -> Self {
Self::from_reliability(SubmitReliability::default())
}
}
/// Error returned by the submit transport traits in this module.
///
/// Both variants carry a free-form message that appears in the `Display` output.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SubmitTransportError {
/// The transport's configuration is invalid.
#[error("transport configuration invalid: {message}")]
Config {
/// Human-readable description of the configuration problem.
message: String,
},
/// The transport failed while submitting.
#[error("transport failure: {message}")]
Failure {
/// Human-readable description of the failure.
message: String,
},
}
/// Top-level error for a submit attempt.
///
/// The variants are constructed by submit code outside this file; the
/// descriptions below restate the `Display` messages.
#[derive(Debug, Error)]
pub enum SubmitError {
/// The blockhash provider returned no recent blockhash.
#[error("blockhash provider returned no recent blockhash")]
MissingRecentBlockhash,
/// Signed transaction bytes could not be decoded.
#[error("failed to decode signed transaction bytes: {source}")]
DecodeSignedBytes {
/// Underlying bincode decode error.
source: Box<bincode::ErrorKind>,
},
/// The signature was suppressed by the dedupe window.
#[error("duplicate signature suppressed by dedupe window")]
DuplicateSignature,
/// A plan needs the RPC route but no RPC transport is configured.
#[error("rpc transport is not configured")]
MissingRpcTransport,
/// A plan needs the Jito route but no Jito transport is configured.
#[error("jito transport is not configured")]
MissingJitoTransport,
/// A plan needs the direct route but no direct transport is configured.
#[error("direct transport is not configured")]
MissingDirectTransport,
/// No direct targets could be resolved from the leader/backups.
#[error("no direct targets resolved from leader/backups")]
NoDirectTargets,
/// The direct route failed.
#[error("direct submit failed: {source}")]
Direct {
/// Transport error reported by the direct route.
source: SubmitTransportError,
},
/// The RPC route failed.
#[error("rpc submit failed: {source}")]
Rpc {
/// Transport error reported by the RPC route.
source: SubmitTransportError,
},
/// The Jito route failed.
#[error("jito submit failed: {source}")]
Jito {
/// Transport error reported by the Jito route.
source: SubmitTransportError,
},
/// An internal synchronization failure occurred.
#[error("internal synchronization failure: {message}")]
InternalSync {
/// Human-readable description of the failure.
message: String,
},
/// The submission was rejected by the toxic-flow guard.
#[error("submission rejected by toxic-flow guard: {reason}")]
ToxicFlow {
/// Why the guard rejected the submission.
reason: TxToxicFlowRejectionReason,
},
}
/// Summary of a submit attempt.
///
/// NOTE(review): this struct is populated by submit code outside this file;
/// the field descriptions are inferred from names and types — confirm against
/// the producer.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SubmitResult {
/// Transaction signature, when known.
pub signature: Option<SignatureBytes>,
/// Plan that was executed.
pub plan: SubmitPlan,
/// Legacy mode equivalent of `plan`, when one exists.
pub legacy_mode: Option<SubmitMode>,
/// First route that reported success, if any.
pub first_success_route: Option<SubmitRoute>,
/// All routes that reported success.
pub successful_routes: Vec<SubmitRoute>,
/// Leader target reached by the direct route, if any.
pub direct_target: Option<LeaderTarget>,
/// Signature string returned by the RPC route, if any.
pub rpc_signature: Option<String>,
/// Signature string returned by the Jito route, if any.
pub jito_signature: Option<String>,
/// Bundle id returned by the Jito route, if any.
pub jito_bundle_id: Option<String>,
/// Whether a fallback route was used.
pub used_fallback_route: bool,
/// Number of targets selected for the direct route.
pub selected_target_count: usize,
/// Number of distinct identities among the selected targets (presumably validators).
pub selected_identity_count: usize,
}
/// Coarse quality rating reported in a [`TxFlowSafetySnapshot`].
///
/// Only `Stable` is accepted by [`TxFlowSafetySnapshot::is_safe`]. The other
/// variants are named after the condition they report; their exact criteria
/// are defined by the [`TxFlowSafetySource`] implementation.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxFlowSafetyQuality {
/// The only quality treated as safe for submit.
Stable,
/// State is provisional.
Provisional,
/// State is at risk of a reorg.
ReorgRisk,
/// State is stale.
Stale,
/// State is degraded.
Degraded,
/// Control-plane information is incomplete.
IncompleteControlPlane,
}
/// Individual issue listed in [`TxFlowSafetySnapshot::issues`].
///
/// The variants are named after the condition they flag; how each is detected
/// is up to the [`TxFlowSafetySource`] implementation.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxFlowSafetyIssue {
/// Replay recovery is still pending.
ReplayRecoveryPending,
/// Control-plane data is missing.
MissingControlPlane,
/// Control-plane data is stale.
StaleControlPlane,
/// Control-plane data is degraded.
DegradedControlPlane,
/// There is a reorg risk.
ReorgRisk,
/// State is provisional.
Provisional,
}
/// Point-in-time safety report produced by a [`TxFlowSafetySource`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxFlowSafetySnapshot {
/// Overall quality rating.
pub quality: TxFlowSafetyQuality,
/// Detailed issues accompanying the rating. Not consulted by `is_safe`.
pub issues: Vec<TxFlowSafetyIssue>,
/// Current state version known to the source, if any.
pub current_state_version: Option<u64>,
/// Whether replay recovery is still pending; makes the snapshot unsafe when `true`.
pub replay_recovery_pending: bool,
}
impl TxFlowSafetySnapshot {
    /// Reports whether this snapshot is safe to submit against.
    ///
    /// A snapshot is safe only when replay recovery is not pending and the
    /// quality is [`TxFlowSafetyQuality::Stable`]. The `issues` list is not
    /// consulted.
    #[must_use]
    pub const fn is_safe(&self) -> bool {
        // Pending replay recovery vetoes submit regardless of quality.
        if self.replay_recovery_pending {
            return false;
        }
        matches!(self.quality, TxFlowSafetyQuality::Stable)
    }
}
/// Provider of [`TxFlowSafetySnapshot`] values.
///
/// Implementors must be `Send + Sync`.
pub trait TxFlowSafetySource: Send + Sync {
/// Returns the current safety snapshot.
fn toxic_flow_snapshot(&self) -> TxFlowSafetySnapshot;
}
/// Key under which a submission can be suppressed by `TxSuppressionCache`.
///
/// `Eq`/`PartialEq` are derived while `Hash` is implemented by hand below;
/// the two must stay consistent (equal keys must hash equally).
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxSubmitSuppressionKey {
/// Keyed by transaction signature.
Signature(SignatureBytes),
/// Keyed by a 32-byte opportunity identifier chosen by the caller.
Opportunity([u8; 32]),
/// Keyed by a 32-byte account-set identifier chosen by the caller.
AccountSet([u8; 32]),
/// Keyed by a slot together with a window value.
SlotWindow {
/// Slot component of the key.
slot: u64,
/// Window component of the key (its unit is defined by the caller).
window: u64,
},
}
impl Hash for TxSubmitSuppressionKey {
    /// Feeds a one-byte variant tag followed by the variant's payload into `state`.
    ///
    /// The tag keeps `Opportunity` and `AccountSet` keys with identical bytes
    /// from hashing identically.
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Variant tag goes in first, then the payload.
        let tag: u8 = match self {
            Self::Signature(_) => 0,
            Self::Opportunity(_) => 1,
            Self::AccountSet(_) => 2,
            Self::SlotWindow { .. } => 3,
        };
        tag.hash(state);

        match self {
            Self::Signature(signature) => signature.as_array().hash(state),
            Self::Opportunity(bytes) | Self::AccountSet(bytes) => bytes.hash(state),
            Self::SlotWindow { slot, window } => {
                slot.hash(state);
                window.hash(state);
            }
        }
    }
}
/// Per-submission context.
///
/// NOTE(review): presumably evaluated against [`TxSubmitGuardPolicy`] by guard
/// code outside this file — confirm against the caller.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxSubmitContext {
/// Suppression keys associated with this submission.
pub suppression_keys: Vec<TxSubmitSuppressionKey>,
/// State version the submit decision was based on, if known.
pub decision_state_version: Option<u64>,
/// When the opportunity was created, if known.
pub opportunity_created_at: Option<SystemTime>,
}
/// Thresholds and switches for the toxic-flow submit guard.
///
/// NOTE(review): enforcement lives outside this file; the field descriptions
/// are inferred from names and the `Default` values — confirm against the guard.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TxSubmitGuardPolicy {
/// Require a stable control plane before submitting.
pub require_stable_control_plane: bool,
/// Reject while replay recovery is pending.
pub reject_on_replay_recovery_pending: bool,
/// Maximum tolerated state-version drift; `None` presumably disables the check.
pub max_state_version_drift: Option<u64>,
/// Maximum tolerated opportunity age; `None` presumably disables the check.
pub max_opportunity_age: Option<Duration>,
/// How long suppression keys stay active.
pub suppression_ttl: Duration,
}
/// Default policy: both boolean checks enabled, a drift limit of 4 state
/// versions, and 750 ms for both the maximum opportunity age and the
/// suppression TTL.
impl Default for TxSubmitGuardPolicy {
fn default() -> Self {
Self {
require_stable_control_plane: true,
reject_on_replay_recovery_pending: true,
max_state_version_drift: Some(4),
max_opportunity_age: Some(Duration::from_millis(750)),
suppression_ttl: Duration::from_millis(750),
}
}
}
/// Reason carried by `SubmitError::ToxicFlow` when the guard rejects a submission.
#[derive(Debug, Error, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxToxicFlowRejectionReason {
/// The reported control-plane quality is not safe for submit.
#[error("control-plane quality {quality:?} is not safe for submit")]
UnsafeControlPlane {
/// Quality that triggered the rejection.
quality: TxFlowSafetyQuality,
},
/// The submit source is still recovering replay continuity.
#[error("submit source is still recovering replay continuity")]
ReplayRecoveryPending,
/// An active suppression key matched the submission.
#[error("submit suppressed by active key")]
Suppressed,
/// State version drift exceeded the allowed maximum.
#[error("state version drift {drift} exceeded maximum {max_allowed}")]
StateDrift {
/// Observed drift.
drift: u64,
/// Maximum drift allowed.
max_allowed: u64,
},
/// The opportunity was older than the allowed maximum.
#[error("opportunity age {age_ms}ms exceeded maximum {max_allowed_ms}ms")]
OpportunityStale {
/// Observed age in milliseconds.
age_ms: u64,
/// Maximum age allowed, in milliseconds.
max_allowed_ms: u64,
},
}
/// Classification of a [`TxSubmitOutcome`].
///
/// `TxToxicFlowTelemetry::record` keeps a counter for every kind except
/// `Landed`, `Expired`, `Dropped` and `UnhealthyRoute`. Which code emits each
/// kind is not visible in this file.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxSubmitOutcomeKind {
/// Accepted by the direct route.
DirectAccepted,
/// Accepted by the RPC route.
RpcAccepted,
/// Accepted by the Jito route.
JitoAccepted,
/// The transaction landed.
Landed,
/// The transaction expired.
Expired,
/// The transaction was dropped.
Dropped,
/// The leader was missed.
LeaderMissed,
/// The blockhash was stale.
BlockhashStale,
/// The route was unhealthy.
UnhealthyRoute,
/// Rejected because of staleness.
RejectedDueToStaleness,
/// Rejected because of reorg risk.
RejectedDueToReorgRisk,
/// Rejected because of state drift.
RejectedDueToStateDrift,
/// Rejected because replay recovery was pending.
RejectedDueToReplayRecovery,
/// Suppressed before submit.
Suppressed,
}
/// One reported submit outcome, delivered to a [`TxSubmitOutcomeReporter`].
///
/// NOTE(review): populated by submit code outside this file; the field
/// descriptions are inferred from names and types.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxSubmitOutcome {
/// What happened.
pub kind: TxSubmitOutcomeKind,
/// Transaction signature, when known.
pub signature: Option<SignatureBytes>,
/// Route the outcome refers to, if any.
pub route: Option<SubmitRoute>,
/// Plan in effect for the submission.
pub plan: SubmitPlan,
/// Legacy mode equivalent of `plan`, when one exists.
pub legacy_mode: Option<SubmitMode>,
/// Signature string returned by the RPC route, if any.
pub rpc_signature: Option<String>,
/// Signature string returned by the Jito route, if any.
pub jito_signature: Option<String>,
/// Bundle id returned by the Jito route, if any.
pub jito_bundle_id: Option<String>,
/// State version associated with the outcome, if known.
pub state_version: Option<u64>,
/// Opportunity age in milliseconds, if known; mirrored into telemetry by
/// `TxToxicFlowTelemetry::record`.
pub opportunity_age_ms: Option<u64>,
}
/// Sink for [`TxSubmitOutcome`] values.
///
/// Implementors must be `Send + Sync`; `record_outcome` takes `&self`, so any
/// state they update needs interior mutability.
pub trait TxSubmitOutcomeReporter: Send + Sync {
/// Records a single outcome.
fn record_outcome(&self, outcome: &TxSubmitOutcome);
}
/// Plain-value copy of the counters held by `TxToxicFlowTelemetry`.
///
/// Produced by `TxToxicFlowTelemetry::snapshot`. Each counter is read
/// independently, so a snapshot taken during concurrent updates is not a
/// single atomic view.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxToxicFlowTelemetrySnapshot {
/// Incremented by `record_reporter_drop`.
pub reporter_outcomes_dropped: u64,
/// Incremented by `record_reporter_unavailable`.
pub reporter_outcomes_unavailable: u64,
/// Number of `DirectAccepted` outcomes recorded.
pub direct_accepted: u64,
/// Number of `RpcAccepted` outcomes recorded.
pub rpc_accepted: u64,
/// Number of `JitoAccepted` outcomes recorded.
pub jito_accepted: u64,
/// Number of `RejectedDueToStaleness` outcomes recorded.
pub rejected_due_to_staleness: u64,
/// Number of `RejectedDueToReorgRisk` outcomes recorded.
pub rejected_due_to_reorg_risk: u64,
/// Number of `RejectedDueToStateDrift` outcomes recorded.
pub rejected_due_to_state_drift: u64,
/// Number of `BlockhashStale` outcomes recorded.
pub submit_on_stale_blockhash: u64,
/// Number of `LeaderMissed` outcomes recorded. Despite the name this is a
/// running count, not a rate.
pub leader_route_miss_rate: u64,
/// Most recently recorded opportunity age in milliseconds, if any.
pub opportunity_age_at_send_ms: Option<u64>,
/// Number of `RejectedDueToReplayRecovery` outcomes recorded.
pub rejected_due_to_replay_recovery: u64,
/// Number of `Suppressed` outcomes recorded.
pub suppressed_submissions: u64,
}
/// `AtomicU64` wrapper aligned to 64 bytes.
///
/// The alignment gives every counter its own 64-byte slot (presumably one cache
/// line, so neighbouring counters do not share a line).
#[derive(Debug, Default)]
#[repr(align(64))]
struct CacheAlignedAtomicU64(AtomicU64);

impl CacheAlignedAtomicU64 {
    /// Reads the current value with the given memory ordering.
    fn load(&self, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.load(ordering)
    }

    /// Stores `value` and returns the previous value.
    fn swap(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.swap(value, ordering)
    }

    /// Adds `value` (wrapping on overflow) and returns the previous value.
    fn fetch_add(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.fetch_add(value, ordering)
    }
}
/// Lock-free counters for submit outcomes.
///
/// Every field is a separately aligned atomic, so the struct can be shared
/// (see `TxToxicFlowTelemetry::shared`) and updated through `&self`. Field
/// meanings mirror `TxToxicFlowTelemetrySnapshot`.
#[derive(Debug, Default)]
pub struct TxToxicFlowTelemetry {
// Incremented by `record_reporter_drop`.
reporter_outcomes_dropped: CacheAlignedAtomicU64,
// Incremented by `record_reporter_unavailable`.
reporter_outcomes_unavailable: CacheAlignedAtomicU64,
// Per-kind outcome counters, incremented by `record`.
direct_accepted: CacheAlignedAtomicU64,
rpc_accepted: CacheAlignedAtomicU64,
jito_accepted: CacheAlignedAtomicU64,
rejected_due_to_staleness: CacheAlignedAtomicU64,
rejected_due_to_reorg_risk: CacheAlignedAtomicU64,
rejected_due_to_state_drift: CacheAlignedAtomicU64,
submit_on_stale_blockhash: CacheAlignedAtomicU64,
// Count of `LeaderMissed` outcomes (a running count, not a rate).
leader_route_miss_rate: CacheAlignedAtomicU64,
// Last-value gauge rather than a counter: overwritten by `record` and decoded
// by `snapshot`; a raw 0 (the default) means nothing has been recorded.
opportunity_age_at_send_ms: CacheAlignedAtomicU64,
rejected_due_to_replay_recovery: CacheAlignedAtomicU64,
suppressed_submissions: CacheAlignedAtomicU64,
}
impl TxToxicFlowTelemetry {
#[must_use]
pub fn shared() -> Arc<Self> {
Arc::new(Self::default())
}
pub fn record(&self, outcome: &TxSubmitOutcome) {
if let Some(age_ms) = outcome.opportunity_age_ms {
let _ = self
.opportunity_age_at_send_ms
.swap(age_ms, Ordering::Relaxed);
}
match outcome.kind {
TxSubmitOutcomeKind::DirectAccepted => {
let _ = self.direct_accepted.fetch_add(1, Ordering::Relaxed);
}
TxSubmitOutcomeKind::RpcAccepted => {
let _ = self.rpc_accepted.fetch_add(1, Ordering::Relaxed);
}
TxSubmitOutcomeKind::JitoAccepted => {
let _ = self.jito_accepted.fetch_add(1, Ordering::Relaxed);
}
TxSubmitOutcomeKind::RejectedDueToStaleness => {
let _ = self
.rejected_due_to_staleness
.fetch_add(1, Ordering::Relaxed);
}
TxSubmitOutcomeKind::RejectedDueToReorgRisk => {
let _ = self
.rejected_due_to_reorg_risk
.fetch_add(1, Ordering::Relaxed);
}
TxSubmitOutcomeKind::RejectedDueToStateDrift => {
let _ = self
.rejected_due_to_state_drift
.fetch_add(1, Ordering::Relaxed);
}
TxSubmitOutcomeKind::RejectedDueToReplayRecovery => {
let _ = self
.rejected_due_to_replay_recovery
.fetch_add(1, Ordering::Relaxed);
}
TxSubmitOutcomeKind::Suppressed => {
let _ = self.suppressed_submissions.fetch_add(1, Ordering::Relaxed);
}
TxSubmitOutcomeKind::LeaderMissed => {
let _ = self.leader_route_miss_rate.fetch_add(1, Ordering::Relaxed);
}
TxSubmitOutcomeKind::Landed
| TxSubmitOutcomeKind::Expired
| TxSubmitOutcomeKind::Dropped
| TxSubmitOutcomeKind::UnhealthyRoute => {}
TxSubmitOutcomeKind::BlockhashStale => {
let _ = self
.submit_on_stale_blockhash
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub(crate) fn record_reporter_drop(&self) {
let _ = self
.reporter_outcomes_dropped
.fetch_add(1, Ordering::Relaxed);
}
pub(crate) fn record_reporter_unavailable(&self) {
let _ = self
.reporter_outcomes_unavailable
.fetch_add(1, Ordering::Relaxed);
}
#[must_use]
pub fn snapshot(&self) -> TxToxicFlowTelemetrySnapshot {
let age_ms = self.opportunity_age_at_send_ms.load(Ordering::Relaxed);
TxToxicFlowTelemetrySnapshot {
reporter_outcomes_dropped: self.reporter_outcomes_dropped.load(Ordering::Relaxed),
reporter_outcomes_unavailable: self
.reporter_outcomes_unavailable
.load(Ordering::Relaxed),
direct_accepted: self.direct_accepted.load(Ordering::Relaxed),
rpc_accepted: self.rpc_accepted.load(Ordering::Relaxed),
jito_accepted: self.jito_accepted.load(Ordering::Relaxed),
rejected_due_to_staleness: self.rejected_due_to_staleness.load(Ordering::Relaxed),
rejected_due_to_reorg_risk: self.rejected_due_to_reorg_risk.load(Ordering::Relaxed),
rejected_due_to_state_drift: self.rejected_due_to_state_drift.load(Ordering::Relaxed),
submit_on_stale_blockhash: self.submit_on_stale_blockhash.load(Ordering::Relaxed),
leader_route_miss_rate: self.leader_route_miss_rate.load(Ordering::Relaxed),
opportunity_age_at_send_ms: if age_ms == 0 { None } else { Some(age_ms) },
rejected_due_to_replay_recovery: self
.rejected_due_to_replay_recovery
.load(Ordering::Relaxed),
suppressed_submissions: self.suppressed_submissions.load(Ordering::Relaxed),
}
}
}
/// Lets the telemetry struct be used directly as an outcome reporter.
impl TxSubmitOutcomeReporter for TxToxicFlowTelemetry {
// Delegates to the inherent `record` method.
fn record_outcome(&self, outcome: &TxSubmitOutcome) {
self.record(outcome);
}
}
/// Time-bounded set of suppression keys.
///
/// `entries` holds the latest insertion time per key. `order` is an
/// insertion-ordered queue that eviction walks from the front. Re-inserting a
/// key adds another queue element, so eviction compares timestamps to tell a
/// superseded element from the live one.
#[derive(Debug, Default)]
pub(crate) struct TxSuppressionCache {
// Latest insertion time for each active key.
entries: HashMap<TxSubmitSuppressionKey, SystemTime>,
// Every insertion in arrival order; may contain several elements per key.
order: VecDeque<(TxSubmitSuppressionKey, SystemTime)>,
}
impl TxSuppressionCache {
    /// Reports whether any of `keys` is currently active.
    ///
    /// Entries older than `ttl` relative to `now` are evicted first, which is
    /// why this takes `&mut self`.
    pub(crate) fn is_suppressed(
        &mut self,
        keys: &[TxSubmitSuppressionKey],
        now: SystemTime,
        ttl: Duration,
    ) -> bool {
        self.evict_expired(now, ttl);
        for key in keys {
            if self.entries.contains_key(key) {
                return true;
            }
        }
        false
    }

    /// Marks every key in `keys` as active from `now`.
    ///
    /// A key that is already present has its timestamp refreshed. No eviction
    /// happens here.
    pub(crate) fn insert_all(&mut self, keys: &[TxSubmitSuppressionKey], now: SystemTime) {
        for key in keys.iter().cloned() {
            let _ = self.entries.insert(key.clone(), now);
            self.order.push_back((key, now));
        }
    }

    /// Pops queue elements from the front until one is still within `ttl`.
    ///
    /// A front element whose timestamp lies after `now` is treated as expired,
    /// because `duration_since` fails for it.
    fn evict_expired(&mut self, now: SystemTime, ttl: Duration) {
        loop {
            let front_is_live = match self.order.front() {
                None => return,
                Some((_, inserted_at)) => {
                    matches!(now.duration_since(*inserted_at), Ok(elapsed) if elapsed <= ttl)
                }
            };
            if front_is_live {
                return;
            }

            let Some((key, queued_at)) = self.order.pop_front() else {
                return;
            };
            // Only drop the map entry if this queue element is the latest
            // insertion for the key; a refreshed key carries a newer timestamp
            // and must survive until its own queue element expires.
            if self.entries.get(&key) == Some(&queued_at) {
                let _ = self.entries.remove(&key);
            }
        }
    }
}
/// Transport for the RPC submit route.
#[async_trait]
pub trait RpcSubmitTransport: Send + Sync {
/// Submits `tx_bytes` using `config`.
///
/// Returns a string on success (presumably the transaction signature —
/// confirm against implementors) or a [`SubmitTransportError`].
async fn submit_rpc(
&self,
tx_bytes: &[u8],
config: &RpcSubmitConfig,
) -> Result<String, SubmitTransportError>;
}
/// Transport for the Jito submit route.
#[async_trait]
pub trait JitoSubmitTransport: Send + Sync {
/// Submits `tx_bytes` using `config`.
///
/// Returns the identifiers reported for the submission, or a
/// [`SubmitTransportError`].
async fn submit_jito(
&self,
tx_bytes: &[u8],
config: &JitoSubmitConfig,
) -> Result<JitoSubmitResponse, SubmitTransportError>;
}
/// Transport for the direct submit route.
#[async_trait]
pub trait DirectSubmitTransport: Send + Sync {
/// Submits `tx_bytes` to the given `targets` under `policy` and `config`.
///
/// Returns one [`LeaderTarget`] on success (presumably the target that
/// accepted the transaction — confirm against implementors) or a
/// [`SubmitTransportError`].
async fn submit_direct(
&self,
tx_bytes: &[u8],
targets: &[LeaderTarget],
policy: RoutingPolicy,
config: &DirectSubmitConfig,
) -> Result<LeaderTarget, SubmitTransportError>;
}
// Unit tests for the suppression cache and `DirectSubmitConfig::normalized`.
#[cfg(test)]
mod tests {
use super::*;
use std::{hint::black_box, time::Instant};
use sof_support::bench::profile_iterations;
// Ignored by default: a timing fixture that churns the suppression cache and
// prints the average cost per insert + lookup to stderr. It asserts nothing.
#[test]
#[ignore = "profiling fixture for submit suppression cache churn"]
fn suppression_cache_profile_fixture() {
let iterations = profile_iterations(50_000);
let ttl = Duration::from_millis(750);
let base = SystemTime::UNIX_EPOCH + Duration::from_secs(1);
// 64 distinct keys, reused round-robin below.
let keys = (0_u8..64)
.map(|value| TxSubmitSuppressionKey::Opportunity([value; 32]))
.collect::<Vec<_>>();
let mut cache = TxSuppressionCache::default();
let started = Instant::now();
for (iteration, key) in keys.iter().cycle().take(iterations).enumerate() {
// `now` cycles through a 2-second window, so it periodically jumps backwards.
let now = base + Duration::from_millis(u64::try_from(iteration % 2_000).unwrap_or(0));
cache.insert_all(std::slice::from_ref(key), now);
// `black_box` keeps the optimizer from discarding the lookup.
black_box(cache.is_suppressed(std::slice::from_ref(key), now, ttl));
}
let elapsed = started.elapsed();
let avg_ns_per_iteration = elapsed.as_nanos() / u128::try_from(iterations).unwrap_or(1);
let avg_us_per_iteration = avg_ns_per_iteration as f64 / 1_000.0;
eprintln!(
"suppression_cache_profile_fixture iterations={} elapsed_us={} avg_ns_per_iteration={} avg_us_per_iteration={:.3} entries={}",
iterations,
elapsed.as_micros(),
avg_ns_per_iteration,
avg_us_per_iteration,
cache.entries.len(),
);
}
// Re-inserting a key must restart its TTL: evicting the older queue element
// must not remove the refreshed map entry.
#[test]
fn suppression_cache_keeps_refreshed_entry_live() {
let mut cache = TxSuppressionCache::default();
let key = TxSubmitSuppressionKey::Opportunity([7_u8; 32]);
let ttl = Duration::from_millis(750);
let first_inserted_at = SystemTime::UNIX_EPOCH + Duration::from_secs(1);
let refreshed_at = first_inserted_at + Duration::from_millis(500);
cache.insert_all(std::slice::from_ref(&key), first_inserted_at);
cache.insert_all(std::slice::from_ref(&key), refreshed_at);
// 600 ms after the first insert, 100 ms after the refresh: still suppressed.
assert!(cache.is_suppressed(
std::slice::from_ref(&key),
refreshed_at + Duration::from_millis(100),
ttl,
));
// 1 ms past the refreshed TTL: no longer suppressed.
assert!(!cache.is_suppressed(
std::slice::from_ref(&key),
refreshed_at + ttl + Duration::from_millis(1),
ttl,
));
}
// Zero timeouts must be raised to 1 ms by `normalized`.
#[test]
fn direct_submit_config_clamps_zero_timeouts() {
let normalized = DirectSubmitConfig {
per_target_timeout: Duration::ZERO,
global_timeout: Duration::ZERO,
direct_target_rounds: 1,
direct_submit_attempts: 1,
hybrid_direct_attempts: 1,
rebroadcast_interval: Duration::from_millis(5),
agave_rebroadcast_enabled: false,
agave_rebroadcast_window: Duration::ZERO,
agave_rebroadcast_interval: Duration::from_millis(5),
hybrid_rpc_broadcast: false,
latency_aware_targeting: true,
latency_probe_timeout: Duration::ZERO,
latency_probe_port: None,
latency_probe_max_targets: 1,
}
.normalized();
assert_eq!(normalized.per_target_timeout, Duration::from_millis(1));
assert_eq!(normalized.global_timeout, Duration::from_millis(1));
assert_eq!(normalized.latency_probe_timeout, Duration::from_millis(1));
}
}