use std::{
collections::HashMap,
collections::HashSet,
net::SocketAddr,
panic::{AssertUnwindSafe, catch_unwind},
sync::{
Arc, Mutex, OnceLock, Weak,
atomic::{AtomicBool, Ordering},
mpsc::{self as std_mpsc, SyncSender, TrySendError},
},
thread,
time::{Duration, Instant, SystemTime},
};
use sof_support::{short_vec::decode_short_u16_len_prefix, time_support::duration_millis_u64};
use sof_types::SignatureBytes;
use tokio::{
net::TcpStream,
sync::mpsc,
task::JoinSet,
time::{sleep, timeout},
};
use super::{
DirectSubmitConfig, DirectSubmitTransport, JitoSubmitConfig, JitoSubmitTransport,
RpcSubmitConfig, RpcSubmitTransport, SignedTx, SubmitError, SubmitMode, SubmitPlan,
SubmitReliability, SubmitResult, SubmitRoute, SubmitStrategy, SubmitTransportError,
TxFlowSafetyQuality, TxFlowSafetySource, TxSubmitClientBuilder, TxSubmitContext,
TxSubmitGuardPolicy, TxSubmitOutcome, TxSubmitOutcomeKind, TxSubmitOutcomeReporter,
TxToxicFlowRejectionReason, TxToxicFlowTelemetry, TxToxicFlowTelemetrySnapshot,
};
use crate::{
providers::{
LeaderProvider, LeaderTarget, RecentBlockhashProvider, RpcRecentBlockhashProvider,
StaticLeaderProvider,
},
routing::{RoutingPolicy, SignatureDeduper, select_targets},
submit::{JsonRpcTransport, types::TxSuppressionCache},
};
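/// Client for submitting signed transactions over RPC, Jito, and
/// direct-to-leader routes, layering signature deduplication, toxic-flow
/// guards, suppression, and outcome telemetry on top of the transports.
///
/// A minimal usage sketch; the endpoint URL is a placeholder and `mode`
/// stands in for whichever [`SubmitMode`] the caller wants:
///
/// ```ignore
/// let mut client = TxSubmitClient::rpc_only("https://rpc.example.com")?;
/// let result = client.submit_signed(signed_tx, mode).await?;
/// ```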
pub struct TxSubmitClient {
blockhash_provider: Arc<dyn RecentBlockhashProvider>,
on_demand_blockhash_provider: Option<Arc<RpcRecentBlockhashProvider>>,
leader_provider: Arc<dyn LeaderProvider>,
backups: Vec<LeaderTarget>,
policy: RoutingPolicy,
deduper: SignatureDeduper,
rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
rpc_config: RpcSubmitConfig,
jito_config: JitoSubmitConfig,
direct_config: DirectSubmitConfig,
flow_safety_source: Option<Arc<dyn TxFlowSafetySource>>,
guard_policy: TxSubmitGuardPolicy,
suppression: TxSuppressionCache,
telemetry: Arc<TxToxicFlowTelemetry>,
outcome_reporter: Option<OutcomeReporterHandle>,
}
impl TxSubmitClient {
#[must_use]
pub fn builder() -> TxSubmitClientBuilder {
TxSubmitClientBuilder::new()
}
#[must_use]
pub fn new(
blockhash_provider: Arc<dyn RecentBlockhashProvider>,
leader_provider: Arc<dyn LeaderProvider>,
) -> Self {
Self {
blockhash_provider,
on_demand_blockhash_provider: None,
leader_provider,
backups: Vec::new(),
policy: RoutingPolicy::default(),
deduper: SignatureDeduper::new(Duration::from_secs(10)),
rpc_transport: None,
direct_transport: None,
jito_transport: None,
rpc_config: RpcSubmitConfig::default(),
jito_config: JitoSubmitConfig::default(),
direct_config: DirectSubmitConfig::default(),
flow_safety_source: None,
guard_policy: TxSubmitGuardPolicy::default(),
suppression: TxSuppressionCache::default(),
telemetry: TxToxicFlowTelemetry::shared(),
outcome_reporter: None,
}
}
#[must_use]
pub fn blockhash_only(blockhash_provider: Arc<dyn RecentBlockhashProvider>) -> Self {
Self::new(
blockhash_provider,
Arc::new(StaticLeaderProvider::default()),
)
}
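    /// Builds a client whose recent blockhash comes from the given RPC
    /// endpoint, registering the same provider for on-demand refreshes via
    /// [`Self::refresh_latest_blockhash_bytes`].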
pub fn blockhash_via_rpc(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
let blockhash_provider = Arc::new(RpcRecentBlockhashProvider::new(rpc_url.into())?);
Ok(Self::blockhash_only(blockhash_provider.clone())
.with_rpc_blockhash_provider(blockhash_provider))
}
pub fn rpc_only(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
let rpc_url = rpc_url.into();
let client = Self::blockhash_via_rpc(rpc_url.clone())?;
let rpc_transport = Arc::new(JsonRpcTransport::new(rpc_url)?);
Ok(client.with_rpc_transport(rpc_transport))
}
#[must_use]
pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
self.backups = backups;
self
}
#[must_use]
pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
self.policy = policy.normalized();
self
}
#[must_use]
pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
self.deduper = SignatureDeduper::new(ttl);
self
}
#[must_use]
pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
self.rpc_transport = Some(transport);
self
}
#[must_use]
pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
self.direct_transport = Some(transport);
self
}
#[must_use]
pub fn with_jito_transport(mut self, transport: Arc<dyn JitoSubmitTransport>) -> Self {
self.jito_transport = Some(transport);
self
}
#[must_use]
pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
self.rpc_config = config;
self
}
#[must_use]
pub const fn with_jito_config(mut self, config: JitoSubmitConfig) -> Self {
self.jito_config = config;
self
}
#[must_use]
pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
self.direct_config = config.normalized();
self
}
#[must_use]
pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
self.direct_config = DirectSubmitConfig::from_reliability(reliability);
self
}
#[must_use]
pub fn with_flow_safety_source(mut self, source: Arc<dyn TxFlowSafetySource>) -> Self {
self.flow_safety_source = Some(source);
self
}
#[must_use]
pub const fn with_guard_policy(mut self, policy: TxSubmitGuardPolicy) -> Self {
self.guard_policy = policy;
self
}
#[must_use]
pub fn with_outcome_reporter(mut self, reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
self.outcome_reporter = Some(OutcomeReporterHandle::new(reporter));
self
}
#[must_use]
pub fn with_rpc_blockhash_provider(
mut self,
provider: Arc<RpcRecentBlockhashProvider>,
) -> Self {
self.on_demand_blockhash_provider = Some(provider);
self
}
#[must_use]
pub fn toxic_flow_telemetry(&self) -> TxToxicFlowTelemetrySnapshot {
self.telemetry.snapshot()
}
pub fn record_external_outcome(&self, outcome: &TxSubmitOutcome) {
record_external_outcome_shared(&self.telemetry, self.outcome_reporter.as_ref(), outcome);
}
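    /// Submits a signed transaction using the legacy [`SubmitMode`] entry
    /// point. All four `submit_signed*` variants funnel into the same
    /// pipeline: the mode becomes a [`SubmitPlan`], and a default
    /// [`TxSubmitContext`] is filled in where the caller supplies none.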
pub async fn submit_signed(
&mut self,
signed_tx: SignedTx,
mode: SubmitMode,
) -> Result<SubmitResult, SubmitError> {
self.submit_signed_with_context(signed_tx, mode, TxSubmitContext::default())
.await
}
pub async fn submit_signed_via(
&mut self,
signed_tx: SignedTx,
plan: SubmitPlan,
) -> Result<SubmitResult, SubmitError> {
self.submit_signed_with_context_via(signed_tx, plan, TxSubmitContext::default())
.await
}
pub async fn submit_signed_with_context(
&mut self,
signed_tx: SignedTx,
mode: SubmitMode,
context: TxSubmitContext,
) -> Result<SubmitResult, SubmitError> {
self.submit_signed_with_context_via(signed_tx, SubmitPlan::from(mode), context)
.await
}
pub async fn submit_signed_with_context_via(
&mut self,
signed_tx: SignedTx,
plan: SubmitPlan,
context: TxSubmitContext,
) -> Result<SubmitResult, SubmitError> {
let tx_bytes = match signed_tx {
SignedTx::VersionedTransactionBytes(bytes) => bytes,
SignedTx::WireTransactionBytes(bytes) => bytes,
};
let signature = extract_first_signature(&tx_bytes)?;
self.submit_bytes(tx_bytes, signature, plan, context).await
}
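    /// Forces a refresh through the on-demand RPC blockhash provider when
    /// one is configured, then returns the latest cached blockhash bytes.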
pub async fn refresh_latest_blockhash_bytes(
&self,
) -> Result<Option<[u8; 32]>, SubmitTransportError> {
if let Some(provider) = &self.on_demand_blockhash_provider {
let _ = provider.refresh().await?;
}
Ok(self.latest_blockhash_bytes())
}
#[must_use]
pub fn latest_blockhash_bytes(&self) -> Option<[u8; 32]> {
self.blockhash_provider.latest_blockhash()
}
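    /// Core submit pipeline: normalize the plan, verify every planned route
    /// has a configured transport, run the toxic-flow guards, dedupe by
    /// signature, then dispatch according to the plan's strategy.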
async fn submit_bytes(
&mut self,
tx_bytes: Vec<u8>,
signature: Option<SignatureBytes>,
plan: SubmitPlan,
context: TxSubmitContext,
) -> Result<SubmitResult, SubmitError> {
let plan = plan.into_normalized();
self.validate_submit_plan(&plan)?;
self.enforce_toxic_flow_guards(signature, &plan, &context)?;
self.enforce_dedupe(signature)?;
let tx_bytes = Arc::<[u8]>::from(tx_bytes);
match plan.strategy {
SubmitStrategy::OrderedFallback => {
self.submit_routes_in_order(tx_bytes, signature, plan).await
}
SubmitStrategy::AllAtOnce => {
self.submit_routes_all_at_once(tx_bytes, signature, plan)
.await
}
}
}
fn enforce_dedupe(&mut self, signature: Option<SignatureBytes>) -> Result<(), SubmitError> {
if let Some(signature) = signature {
let now = Instant::now();
if !self.deduper.check_and_insert(signature, now) {
return Err(SubmitError::DuplicateSignature);
}
}
Ok(())
}
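    /// Applies the pre-submit safety guards in order: opportunity age,
    /// suppression cache, replay-recovery state, control-plane quality, and
    /// state-version drift. The first failing guard short-circuits with a
    /// recorded rejection outcome; suppression keys are only inserted for a
    /// transaction that passes every guard.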
fn enforce_toxic_flow_guards(
&mut self,
signature: Option<SignatureBytes>,
plan: &SubmitPlan,
context: &TxSubmitContext,
) -> Result<(), SubmitError> {
let legacy_mode = plan.legacy_mode();
let now = SystemTime::now();
let opportunity_age_ms = context
.opportunity_created_at
.and_then(|created_at| now.duration_since(created_at).ok())
.map(duration_millis_u64);
if let Some(age_ms) = opportunity_age_ms
&& let Some(max_age) = self.guard_policy.max_opportunity_age
{
let max_allowed_ms = duration_millis_u64(max_age);
if age_ms > max_allowed_ms {
return Err(self.reject_with_outcome(
TxToxicFlowRejectionReason::OpportunityStale {
age_ms,
max_allowed_ms,
},
TxSubmitOutcomeKind::RejectedDueToStaleness,
RejectionMetadata {
signature,
plan: plan.clone(),
legacy_mode,
state_version: None,
opportunity_age_ms,
},
));
}
}
if self.suppression.is_suppressed(
&context.suppression_keys,
now,
self.guard_policy.suppression_ttl,
) {
return Err(self.reject_with_outcome(
TxToxicFlowRejectionReason::Suppressed,
TxSubmitOutcomeKind::Suppressed,
RejectionMetadata {
signature,
plan: plan.clone(),
legacy_mode,
state_version: None,
opportunity_age_ms,
},
));
}
if let Some(source) = &self.flow_safety_source {
let snapshot = source.toxic_flow_snapshot();
if self.guard_policy.reject_on_replay_recovery_pending
&& snapshot.replay_recovery_pending
{
return Err(self.reject_with_outcome(
TxToxicFlowRejectionReason::ReplayRecoveryPending,
TxSubmitOutcomeKind::RejectedDueToReplayRecovery,
RejectionMetadata {
signature,
plan: plan.clone(),
legacy_mode,
state_version: snapshot.current_state_version,
opportunity_age_ms,
},
));
}
if self.guard_policy.require_stable_control_plane
&& !matches!(snapshot.quality, TxFlowSafetyQuality::Stable)
{
let outcome_kind = match snapshot.quality {
TxFlowSafetyQuality::ReorgRisk | TxFlowSafetyQuality::Provisional => {
TxSubmitOutcomeKind::RejectedDueToReorgRisk
}
TxFlowSafetyQuality::Stale => TxSubmitOutcomeKind::RejectedDueToStaleness,
TxFlowSafetyQuality::Degraded
| TxFlowSafetyQuality::IncompleteControlPlane
| TxFlowSafetyQuality::Stable => TxSubmitOutcomeKind::Suppressed,
};
return Err(self.reject_with_outcome(
TxToxicFlowRejectionReason::UnsafeControlPlane {
quality: snapshot.quality,
},
outcome_kind,
RejectionMetadata {
signature,
plan: plan.clone(),
legacy_mode,
state_version: snapshot.current_state_version,
opportunity_age_ms,
},
));
}
if let (Some(decision_version), Some(current_version), Some(max_allowed)) = (
context.decision_state_version,
snapshot.current_state_version,
self.guard_policy.max_state_version_drift,
) {
let drift = current_version.saturating_sub(decision_version);
if drift > max_allowed {
return Err(self.reject_with_outcome(
TxToxicFlowRejectionReason::StateDrift { drift, max_allowed },
TxSubmitOutcomeKind::RejectedDueToStateDrift,
RejectionMetadata {
signature,
plan: plan.clone(),
legacy_mode,
state_version: Some(current_version),
opportunity_age_ms,
},
));
}
}
}
self.suppression.insert_all(&context.suppression_keys, now);
Ok(())
}
fn reject_with_outcome(
&self,
reason: TxToxicFlowRejectionReason,
outcome_kind: TxSubmitOutcomeKind,
metadata: RejectionMetadata,
) -> SubmitError {
let outcome = TxSubmitOutcome {
kind: outcome_kind,
signature: metadata.signature,
route: None,
plan: metadata.plan,
legacy_mode: metadata.legacy_mode,
rpc_signature: None,
jito_signature: None,
jito_bundle_id: None,
state_version: metadata.state_version,
opportunity_age_ms: metadata.opportunity_age_ms,
};
self.record_external_outcome(&outcome);
SubmitError::ToxicFlow { reason }
}
fn validate_submit_plan(&self, plan: &SubmitPlan) -> Result<(), SubmitError> {
if plan.routes.is_empty() {
return Err(SubmitError::InternalSync {
message: "submit plan must contain at least one route".to_owned(),
});
}
for route in &plan.routes {
match route {
SubmitRoute::Rpc if self.rpc_transport.is_none() => {
return Err(SubmitError::MissingRpcTransport);
}
SubmitRoute::Jito if self.jito_transport.is_none() => {
return Err(SubmitError::MissingJitoTransport);
}
SubmitRoute::Direct if self.direct_transport.is_none() => {
return Err(SubmitError::MissingDirectTransport);
}
SubmitRoute::Rpc | SubmitRoute::Jito | SubmitRoute::Direct => {}
}
}
Ok(())
}
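    /// Tries the planned routes sequentially and returns on the first
    /// success. A successful direct submit additionally spawns a
    /// best-effort Agave rebroadcast and, when `hybrid_rpc_broadcast` is
    /// enabled and an RPC route appears later in the plan, a background RPC
    /// broadcast of the same bytes.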
async fn submit_routes_in_order(
&self,
tx_bytes: Arc<[u8]>,
signature: Option<SignatureBytes>,
plan: SubmitPlan,
) -> Result<SubmitResult, SubmitError> {
let legacy_mode = plan.legacy_mode();
let mut last_error = None;
let task_context = self.route_task_context();
for (route_idx, route) in plan.routes.iter().copied().enumerate() {
let next_idx = route_idx.saturating_add(1);
let has_later_routes = plan.routes.get(next_idx).is_some();
let direct_mode = if has_later_routes {
DirectExecutionMode::Fallback
} else {
DirectExecutionMode::Standalone
};
match submit_one_route_task(
route,
Arc::clone(&tx_bytes),
task_context.clone(),
direct_mode,
)
.await
{
Ok(outcome) => {
self.record_route_outcome(signature, &plan, &outcome);
if matches!(outcome.route, SubmitRoute::Direct) {
self.spawn_agave_rebroadcast(
Arc::clone(&tx_bytes),
&self.direct_config.clone().normalized(),
);
if has_later_routes
&& self.direct_config.hybrid_rpc_broadcast
&& plan
.routes
.iter()
.skip(next_idx)
.any(|next| *next == SubmitRoute::Rpc)
{
self.spawn_background_rpc_broadcast(
Arc::clone(&tx_bytes),
signature,
plan.clone(),
);
}
}
return Ok(SubmitResult {
signature,
plan,
legacy_mode,
first_success_route: Some(outcome.route),
successful_routes: vec![outcome.route],
direct_target: outcome.direct_target,
rpc_signature: outcome.rpc_signature,
jito_signature: outcome.jito_signature,
jito_bundle_id: outcome.jito_bundle_id,
used_fallback_route: route_idx > 0,
selected_target_count: outcome.selected_target_count,
selected_identity_count: outcome.selected_identity_count,
});
}
Err(error) => last_error = Some(error),
}
}
Err(last_error.unwrap_or_else(|| SubmitError::InternalSync {
message: "ordered submit plan completed without a route outcome".to_owned(),
}))
}
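    /// Races every planned route concurrently and returns the first
    /// successful outcome; late successes still feed telemetry from their
    /// spawned tasks. Failures are collected per route index so the
    /// surfaced error is the earliest route's failure rather than whichever
    /// task happened to finish last.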
async fn submit_routes_all_at_once(
&self,
tx_bytes: Arc<[u8]>,
signature: Option<SignatureBytes>,
plan: SubmitPlan,
) -> Result<SubmitResult, SubmitError> {
let legacy_mode = plan.legacy_mode();
let task_context = self.route_task_context();
let direct_transport = self.direct_transport.clone();
let leader_provider = self.leader_provider.clone();
let backups = self.backups.clone();
let policy = self.policy;
let direct_config = self.direct_config.clone().normalized();
let (result_tx, mut result_rx) = mpsc::unbounded_channel();
for (route_idx, route) in plan.routes.iter().copied().enumerate() {
let task_context = task_context.clone();
let tx_bytes = Arc::clone(&tx_bytes);
let result_tx = result_tx.clone();
let telemetry = Arc::clone(&self.telemetry);
let reporter = self.outcome_reporter.clone();
let flow_safety_source = self.flow_safety_source.clone();
let plan_for_task = plan.clone();
let direct_transport = direct_transport.clone();
let leader_provider = leader_provider.clone();
let backups = backups.clone();
let direct_config = direct_config.clone();
tokio::spawn(async move {
let result = submit_one_route_task(
route,
Arc::clone(&tx_bytes),
task_context,
DirectExecutionMode::Standalone,
)
.await;
if let Ok(outcome) = &result {
record_route_outcome_shared(
&telemetry,
reporter.as_ref(),
flow_safety_source.as_ref(),
signature,
&plan_for_task,
outcome,
);
if matches!(outcome.route, SubmitRoute::Direct)
&& direct_config.agave_rebroadcast_enabled
&& !direct_config.agave_rebroadcast_window.is_zero()
&& let Some(direct_transport) = direct_transport
{
spawn_agave_rebroadcast_task(
Arc::clone(&tx_bytes),
direct_transport,
leader_provider,
backups,
policy,
direct_config.clone(),
);
}
}
drop(result_tx.send((route_idx, result)));
});
}
drop(result_tx);
let mut errors_by_route: Vec<Option<SubmitError>> = std::iter::repeat_with(|| None)
.take(plan.routes.len())
.collect();
while let Some((route_idx, result)) = result_rx.recv().await {
match result {
Ok(outcome) => {
return Ok(SubmitResult {
signature,
plan,
legacy_mode,
first_success_route: Some(outcome.route),
successful_routes: vec![outcome.route],
direct_target: outcome.direct_target,
rpc_signature: outcome.rpc_signature,
jito_signature: outcome.jito_signature,
jito_bundle_id: outcome.jito_bundle_id,
used_fallback_route: false,
selected_target_count: outcome.selected_target_count,
selected_identity_count: outcome.selected_identity_count,
});
}
Err(error) => {
if let Some(slot) = errors_by_route.get_mut(route_idx) {
*slot = Some(error);
}
}
}
}
Err(errors_by_route
.into_iter()
.flatten()
.next()
.unwrap_or_else(|| SubmitError::InternalSync {
message: "all-at-once submit plan completed without a route outcome".to_owned(),
}))
}
fn record_route_outcome(
&self,
signature: Option<SignatureBytes>,
plan: &SubmitPlan,
outcome: &RouteSubmitOutcome,
) {
record_route_outcome_shared(
&self.telemetry,
self.outcome_reporter.as_ref(),
self.flow_safety_source.as_ref(),
signature,
plan,
outcome,
);
}
fn route_task_context(&self) -> RouteTaskContext {
RouteTaskContext {
rpc_transport: self.rpc_transport.clone(),
jito_transport: self.jito_transport.clone(),
direct_transport: self.direct_transport.clone(),
leader_provider: self.leader_provider.clone(),
backups: Arc::from(self.backups.clone()),
policy: self.policy,
rpc_config: self.rpc_config.clone(),
jito_config: self.jito_config.clone(),
direct_config: self.direct_config.clone().normalized(),
}
}
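    /// Spawns the post-success Agave rebroadcast loop when it is enabled
    /// and its window is non-zero; without a direct transport this is a
    /// silent no-op.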
fn spawn_agave_rebroadcast(&self, tx_bytes: Arc<[u8]>, direct_config: &DirectSubmitConfig) {
if !direct_config.agave_rebroadcast_enabled
|| direct_config.agave_rebroadcast_window.is_zero()
{
return;
}
let Some(direct_transport) = self.direct_transport.clone() else {
return;
};
spawn_agave_rebroadcast_task(
tx_bytes,
direct_transport,
self.leader_provider.clone(),
self.backups.clone(),
self.policy,
direct_config.clone(),
);
}
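    /// Fire-and-forget RPC broadcast for the hybrid direct path: a
    /// successful send is recorded as an RPC outcome, while failures are
    /// deliberately ignored because the direct route already succeeded.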
fn spawn_background_rpc_broadcast(
&self,
tx_bytes: Arc<[u8]>,
signature: Option<SignatureBytes>,
plan: SubmitPlan,
) {
let Some(rpc) = self.rpc_transport.clone() else {
return;
};
let rpc_config = self.rpc_config.clone();
let telemetry = Arc::clone(&self.telemetry);
let reporter = self.outcome_reporter.clone();
let flow_safety_source = self.flow_safety_source.clone();
tokio::spawn(async move {
if let Ok(rpc_signature) = rpc.submit_rpc(tx_bytes.as_ref(), &rpc_config).await {
let outcome = RouteSubmitOutcome {
route: SubmitRoute::Rpc,
direct_target: None,
rpc_signature: Some(rpc_signature),
jito_signature: None,
jito_bundle_id: None,
selected_target_count: 0,
selected_identity_count: 0,
};
record_route_outcome_shared(
&telemetry,
reporter.as_ref(),
flow_safety_source.as_ref(),
signature,
&plan,
&outcome,
);
}
});
}
}
#[derive(Debug, Clone)]
struct RejectionMetadata {
signature: Option<SignatureBytes>,
plan: SubmitPlan,
legacy_mode: Option<SubmitMode>,
state_version: Option<u64>,
opportunity_age_ms: Option<u64>,
}
#[derive(Clone)]
struct RouteTaskContext {
rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
jito_transport: Option<Arc<dyn JitoSubmitTransport>>,
direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
leader_provider: Arc<dyn LeaderProvider>,
backups: Arc<[LeaderTarget]>,
policy: RoutingPolicy,
rpc_config: RpcSubmitConfig,
jito_config: JitoSubmitConfig,
direct_config: DirectSubmitConfig,
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum DirectExecutionMode {
Standalone,
Fallback,
}
#[derive(Debug)]
struct RouteSubmitOutcome {
route: SubmitRoute,
direct_target: Option<LeaderTarget>,
rpc_signature: Option<String>,
jito_signature: Option<String>,
jito_bundle_id: Option<String>,
selected_target_count: usize,
selected_identity_count: usize,
}
fn record_external_outcome_shared(
telemetry: &Arc<TxToxicFlowTelemetry>,
reporter: Option<&OutcomeReporterHandle>,
outcome: &TxSubmitOutcome,
) {
telemetry.record(outcome);
if let Some(reporter) = reporter {
reporter.dispatch(telemetry, outcome.clone());
}
}
fn record_route_outcome_shared(
telemetry: &Arc<TxToxicFlowTelemetry>,
reporter: Option<&OutcomeReporterHandle>,
flow_safety_source: Option<&Arc<dyn TxFlowSafetySource>>,
signature: Option<SignatureBytes>,
plan: &SubmitPlan,
outcome: &RouteSubmitOutcome,
) {
let kind = match outcome.route {
SubmitRoute::Rpc => TxSubmitOutcomeKind::RpcAccepted,
SubmitRoute::Jito => TxSubmitOutcomeKind::JitoAccepted,
SubmitRoute::Direct => TxSubmitOutcomeKind::DirectAccepted,
};
let outcome = TxSubmitOutcome {
kind,
signature,
route: Some(outcome.route),
plan: plan.clone(),
legacy_mode: plan.legacy_mode(),
rpc_signature: outcome.rpc_signature.clone(),
jito_signature: outcome.jito_signature.clone(),
jito_bundle_id: outcome.jito_bundle_id.clone(),
state_version: flow_safety_source
.and_then(|source| source.toxic_flow_snapshot().current_state_version),
opportunity_age_ms: None,
};
record_external_outcome_shared(telemetry, reporter, &outcome);
}
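/// Handle to the shared reporter worker. If the worker thread cannot be
/// spawned, the handle degrades to `Unavailable`, which records every later
/// dispatch as a reporter-unavailable telemetry event instead of failing
/// the submit path.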
#[derive(Clone)]
enum OutcomeReporterHandle {
Ready(Arc<OutcomeReporterDispatcher>),
Unavailable,
}
impl OutcomeReporterHandle {
fn new(reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Self {
match OutcomeReporterDispatcher::shared(reporter) {
Ok(dispatcher) => Self::Ready(dispatcher),
Err(error) => {
eprintln!("sof-tx: failed to start external outcome reporter worker: {error}");
Self::Unavailable
}
}
}
fn dispatch(&self, telemetry: &Arc<TxToxicFlowTelemetry>, outcome: TxSubmitOutcome) {
match self {
Self::Ready(dispatcher) => match dispatcher.dispatch(outcome) {
ReporterDispatchStatus::Enqueued => {}
ReporterDispatchStatus::DroppedFull => telemetry.record_reporter_drop(),
ReporterDispatchStatus::Unavailable => telemetry.record_reporter_unavailable(),
},
Self::Unavailable => telemetry.record_reporter_unavailable(),
}
}
}
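/// Dispatches outcomes to the external reporter over a bounded channel
/// serviced by a dedicated thread, so a slow or panicking reporter can
/// never stall the async submit path. The two flags rate-limit the
/// queue-full and worker-stopped warnings to one line per transition.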
struct OutcomeReporterDispatcher {
tx: SyncSender<TxSubmitOutcome>,
queue_full_warned: AtomicBool,
unavailable_warned: AtomicBool,
}
impl OutcomeReporterDispatcher {
#[cfg(not(test))]
const QUEUE_CAPACITY: usize = 1024;
#[cfg(test)]
const QUEUE_CAPACITY: usize = 8;
fn shared(reporter: Arc<dyn TxSubmitOutcomeReporter>) -> Result<Arc<Self>, std::io::Error> {
let key = reporter_identity(&reporter);
let registry = outcome_reporter_registry();
let mut registry = registry
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
registry.retain(|_key, dispatcher| dispatcher.strong_count() > 0);
if let Some(existing) = registry.get(&key).and_then(Weak::upgrade) {
return Ok(existing);
}
let (tx, rx) = std_mpsc::sync_channel::<TxSubmitOutcome>(Self::QUEUE_CAPACITY);
thread::Builder::new()
.name("sof-tx-outcome-reporter".to_owned())
.spawn(move || {
while let Ok(outcome) = rx.recv() {
drop(catch_unwind(AssertUnwindSafe(|| {
reporter.record_outcome(&outcome);
})));
}
})?;
let dispatcher = Arc::new(Self {
tx,
queue_full_warned: AtomicBool::new(false),
unavailable_warned: AtomicBool::new(false),
});
let _ = registry.insert(key, Arc::downgrade(&dispatcher));
Ok(dispatcher)
}
fn dispatch(&self, outcome: TxSubmitOutcome) -> ReporterDispatchStatus {
match self.tx.try_send(outcome) {
Ok(()) => {
self.queue_full_warned.store(false, Ordering::Relaxed);
ReporterDispatchStatus::Enqueued
}
Err(TrySendError::Disconnected(_)) => {
if !self.unavailable_warned.swap(true, Ordering::Relaxed) {
eprintln!(
"sof-tx: external outcome reporter worker stopped; dropping reporter outcomes"
);
}
ReporterDispatchStatus::Unavailable
}
Err(TrySendError::Full(_)) => {
if !self.queue_full_warned.swap(true, Ordering::Relaxed) {
eprintln!(
"sof-tx: external outcome reporter queue is full; dropping reporter outcomes until it drains"
);
}
ReporterDispatchStatus::DroppedFull
}
}
}
}
enum ReporterDispatchStatus {
Enqueued,
DroppedFull,
Unavailable,
}
static OUTCOME_REPORTER_REGISTRY: OnceLock<Mutex<HashMap<usize, Weak<OutcomeReporterDispatcher>>>> =
OnceLock::new();
fn outcome_reporter_registry() -> &'static Mutex<HashMap<usize, Weak<OutcomeReporterDispatcher>>> {
OUTCOME_REPORTER_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}
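/// Keys the registry by the reporter's data pointer: the fat `dyn` pointer
/// is cast to a thin one, so clones of the same `Arc` share a dispatcher
/// while distinct reporter instances each get their own worker.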
fn reporter_identity(reporter: &Arc<dyn TxSubmitOutcomeReporter>) -> usize {
Arc::as_ptr(reporter) as *const () as usize
}
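/// Reads the first signature from serialized transaction bytes: a
/// short-vec (compact-u16) count prefix followed by 64-byte signatures. A
/// zero count is valid and yields `None`; bytes that end mid-signature are
/// an error.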
fn extract_first_signature(tx_bytes: &[u8]) -> Result<Option<SignatureBytes>, SubmitError> {
let Some((signature_count, offset)) = decode_short_u16_len_prefix(tx_bytes) else {
return Err(decode_signed_bytes_error(
"transaction bytes did not contain a valid signature vector prefix",
));
};
if signature_count == 0 {
return Ok(None);
}
let signature_end = offset.saturating_add(64);
let Some(signature_bytes) = tx_bytes.get(offset..signature_end) else {
return Err(decode_signed_bytes_error(
"transaction bytes ended before the first signature completed",
));
};
let mut signature = [0_u8; 64];
signature.copy_from_slice(signature_bytes);
Ok(Some(SignatureBytes::new(signature)))
}
fn decode_signed_bytes_error(message: &'static str) -> SubmitError {
SubmitError::DecodeSignedBytes {
source: Box::new(bincode::ErrorKind::Custom(message.to_owned())),
}
}
async fn submit_one_route_task(
route: SubmitRoute,
tx_bytes: Arc<[u8]>,
task_context: RouteTaskContext,
direct_mode: DirectExecutionMode,
) -> Result<RouteSubmitOutcome, SubmitError> {
match route {
SubmitRoute::Rpc => {
let rpc = task_context
.rpc_transport
.ok_or(SubmitError::MissingRpcTransport)?;
let rpc_signature = rpc
.submit_rpc(tx_bytes.as_ref(), &task_context.rpc_config)
.await
.map_err(|source| SubmitError::Rpc { source })?;
Ok(RouteSubmitOutcome {
route,
direct_target: None,
rpc_signature: Some(rpc_signature),
jito_signature: None,
jito_bundle_id: None,
selected_target_count: 0,
selected_identity_count: 0,
})
}
SubmitRoute::Jito => {
let jito = task_context
.jito_transport
.ok_or(SubmitError::MissingJitoTransport)?;
let response = jito
.submit_jito(tx_bytes.as_ref(), &task_context.jito_config)
.await
.map_err(|source| SubmitError::Jito { source })?;
Ok(RouteSubmitOutcome {
route,
direct_target: None,
rpc_signature: None,
jito_signature: response.transaction_signature,
jito_bundle_id: response.bundle_id,
selected_target_count: 0,
selected_identity_count: 0,
})
}
SubmitRoute::Direct => {
let direct = task_context
.direct_transport
.ok_or(SubmitError::MissingDirectTransport)?;
let attempt_timeout = direct_attempt_timeout(&task_context.direct_config);
let attempt_count = match direct_mode {
DirectExecutionMode::Standalone => {
task_context.direct_config.direct_submit_attempts
}
DirectExecutionMode::Fallback => task_context.direct_config.hybrid_direct_attempts,
};
let mut last_error = None;
for attempt_idx in 0..attempt_count {
let mut targets = select_and_rank_targets(
task_context.leader_provider.as_ref(),
task_context.backups.as_ref(),
task_context.policy,
&task_context.direct_config,
)
.await;
rotate_targets_for_attempt(&mut targets, attempt_idx, task_context.policy);
let (selected_target_count, selected_identity_count) = summarize_targets(&targets);
if targets.is_empty() {
if matches!(direct_mode, DirectExecutionMode::Fallback) {
break;
}
return Err(SubmitError::NoDirectTargets);
}
match timeout(
attempt_timeout,
direct.submit_direct(
tx_bytes.as_ref(),
&targets,
task_context.policy,
&task_context.direct_config,
),
)
.await
{
Ok(Ok(target)) => {
return Ok(RouteSubmitOutcome {
route,
direct_target: Some(target),
rpc_signature: None,
jito_signature: None,
jito_bundle_id: None,
selected_target_count,
selected_identity_count,
});
}
Ok(Err(source)) => last_error = Some(source),
Err(_elapsed) => {
last_error = Some(SubmitTransportError::Failure {
message: format!(
"direct submit attempt timed out after {}ms",
attempt_timeout.as_millis()
),
});
}
}
if attempt_idx < attempt_count.saturating_sub(1) {
sleep(task_context.direct_config.rebroadcast_interval).await;
}
}
Err(SubmitError::Direct {
source: last_error.unwrap_or_else(|| SubmitTransportError::Failure {
message: "direct submit attempts exhausted".to_owned(),
}),
})
}
}
}
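/// Background rebroadcast loop for the direct route: until the configured
/// window elapses, periodically re-select targets and re-send the same
/// bytes, ignoring per-iteration results. An overflowing deadline
/// computation falls back to "now", which makes the loop exit immediately.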
#[cfg(not(test))]
fn spawn_agave_rebroadcast_task(
tx_bytes: Arc<[u8]>,
direct_transport: Arc<dyn DirectSubmitTransport>,
leader_provider: Arc<dyn LeaderProvider>,
backups: Vec<LeaderTarget>,
policy: RoutingPolicy,
direct_config: DirectSubmitConfig,
) {
tokio::spawn(async move {
let deadline = Instant::now()
.checked_add(direct_config.agave_rebroadcast_window)
.unwrap_or_else(Instant::now);
loop {
let now = Instant::now();
if now >= deadline {
break;
}
let sleep_for = deadline
.saturating_duration_since(now)
.min(direct_config.agave_rebroadcast_interval);
if !sleep_for.is_zero() {
sleep(sleep_for).await;
}
if Instant::now() >= deadline {
break;
}
let targets = select_and_rank_targets(
leader_provider.as_ref(),
backups.as_slice(),
policy,
&direct_config,
)
.await;
if targets.is_empty() {
continue;
}
drop(
timeout(
direct_attempt_timeout(&direct_config),
direct_transport.submit_direct(
tx_bytes.as_ref(),
&targets,
policy,
&direct_config,
),
)
.await,
);
}
});
}
#[cfg(test)]
fn spawn_agave_rebroadcast_task(
_tx_bytes: Arc<[u8]>,
_direct_transport: Arc<dyn DirectSubmitTransport>,
_leader_provider: Arc<dyn LeaderProvider>,
_backups: Vec<LeaderTarget>,
_policy: RoutingPolicy,
_direct_config: DirectSubmitConfig,
) {
}
async fn select_and_rank_targets(
leader_provider: &(impl LeaderProvider + ?Sized),
backups: &[LeaderTarget],
policy: RoutingPolicy,
direct_config: &DirectSubmitConfig,
) -> Vec<LeaderTarget> {
let targets = select_targets(leader_provider, backups, policy);
rank_targets_by_latency(targets, direct_config).await
}
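/// Orders up to `latency_probe_max_targets` (at least one) leading targets
/// by measured TCP connect time, probing them concurrently via a
/// `JoinSet`. Unprobed or unreachable targets sort last, ties keep their
/// original order, and any targets beyond the probe window are appended
/// unchanged.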
async fn rank_targets_by_latency(
targets: Vec<LeaderTarget>,
direct_config: &DirectSubmitConfig,
) -> Vec<LeaderTarget> {
if targets.len() <= 1 || !direct_config.latency_aware_targeting {
return targets;
}
let probe_timeout = direct_config.latency_probe_timeout;
let probe_port = direct_config.latency_probe_port;
let probe_count = targets
.len()
.min(direct_config.latency_probe_max_targets.max(1));
    let mut latencies: Vec<Option<u128>> = vec![None; probe_count];
let mut probes = JoinSet::new();
for (idx, target) in targets.iter().take(probe_count).cloned().enumerate() {
probes.spawn(async move {
(
idx,
probe_target_latency(&target, probe_port, probe_timeout).await,
)
});
}
while let Some(result) = probes.join_next().await {
if let Ok((idx, latency)) = result
&& idx < latencies.len()
&& let Some(slot) = latencies.get_mut(idx)
{
*slot = latency;
}
}
let mut ranked = targets
.iter()
.take(probe_count)
.cloned()
.enumerate()
.collect::<Vec<_>>();
ranked.sort_by_key(|(idx, _target)| {
(
latencies.get(*idx).copied().flatten().unwrap_or(u128::MAX),
*idx,
)
});
let mut output = ranked
.into_iter()
.map(|(_idx, target)| target)
.collect::<Vec<_>>();
output.extend(targets.iter().skip(probe_count).cloned());
output
}
async fn probe_target_latency(
target: &LeaderTarget,
probe_port: Option<u16>,
probe_timeout: Duration,
) -> Option<u128> {
let mut ports = vec![target.tpu_addr.port()];
if let Some(port) = probe_port
&& port != target.tpu_addr.port()
{
ports.push(port);
}
let ip = target.tpu_addr.ip();
let mut best = None::<u128>;
for port in ports {
if let Some(latency) = probe_tcp_latency(ip, port, probe_timeout).await {
best = Some(best.map_or(latency, |current| current.min(latency)));
}
}
best
}
async fn probe_tcp_latency(
ip: std::net::IpAddr,
port: u16,
timeout_duration: Duration,
) -> Option<u128> {
let start = Instant::now();
let addr = SocketAddr::new(ip, port);
let stream = timeout(timeout_duration, TcpStream::connect(addr))
.await
.ok()?
.ok()?;
drop(stream);
Some(start.elapsed().as_millis())
}
fn summarize_targets(targets: &[LeaderTarget]) -> (usize, usize) {
let selected_target_count = targets.len();
let selected_identity_count = targets
.iter()
.filter_map(|target| target.identity)
.collect::<HashSet<_>>()
.len();
(selected_target_count, selected_identity_count)
}
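/// Rotates the target list between retry attempts so each attempt leads
/// with a different slice. The stride is the normalized policy's
/// `max_parallel_sends`, so consecutive attempts start past the targets
/// the previous attempt is expected to have covered.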
fn rotate_targets_for_attempt(
targets: &mut [LeaderTarget],
attempt_idx: usize,
policy: RoutingPolicy,
) {
if attempt_idx == 0 || targets.len() <= 1 {
return;
}
let normalized = policy.normalized();
let stride = normalized.max_parallel_sends.max(1);
let rotation = attempt_idx
.saturating_mul(stride)
.checked_rem(targets.len())
.unwrap_or(0);
if rotation > 0 {
targets.rotate_left(rotation);
}
}
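/// Upper bound for a single direct submit attempt: the sum of the global,
/// per-target, and rebroadcast-interval budgets, floored at eight seconds
/// so aggressively small configs cannot starve an attempt.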
fn direct_attempt_timeout(direct_config: &DirectSubmitConfig) -> Duration {
direct_config
.global_timeout
.saturating_add(direct_config.per_target_timeout)
.saturating_add(direct_config.rebroadcast_interval)
.max(Duration::from_secs(8))
}
#[cfg(test)]
#[allow(clippy::panic)]
mod tests {
use super::*;
#[derive(Debug)]
struct NoopOutcomeReporter;
impl TxSubmitOutcomeReporter for NoopOutcomeReporter {
fn record_outcome(&self, _outcome: &TxSubmitOutcome) {}
}
#[test]
fn reporter_dispatcher_reuses_existing_instance() {
let reporter: Arc<dyn TxSubmitOutcomeReporter> = Arc::new(NoopOutcomeReporter);
let first = match OutcomeReporterDispatcher::shared(Arc::clone(&reporter)) {
Ok(dispatcher) => dispatcher,
Err(error) => panic!("first dispatcher failed: {error}"),
};
let second = match OutcomeReporterDispatcher::shared(reporter) {
Ok(dispatcher) => dispatcher,
Err(error) => panic!("second dispatcher failed: {error}"),
};
assert!(Arc::ptr_eq(&first, &second));
}
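    // A hedged sanity check of `extract_first_signature`, assuming the
    // standard compact-u16 short-vec encoding (a single 0x00 or 0x01 byte
    // for counts of zero and one).
    #[test]
    fn extract_first_signature_handles_prefix_and_truncation() {
        // Zero signatures: a valid prefix with nothing to extract.
        assert!(matches!(extract_first_signature(&[0_u8]), Ok(None)));
        // One signature: a 0x01 prefix followed by 64 signature bytes.
        let mut bytes = vec![1_u8];
        bytes.extend([7_u8; 64]);
        assert!(matches!(extract_first_signature(&bytes), Ok(Some(_))));
        // Bytes that end mid-signature must error rather than succeed.
        assert!(extract_first_signature(&[1_u8, 7_u8]).is_err());
    }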
}