1use std::{
4 collections::HashMap,
5 hash::{Hash, Hasher},
6 sync::{
7 Arc,
8 atomic::{AtomicU64, Ordering},
9 },
10 time::{Duration, SystemTime},
11};
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use solana_signature::Signature;
16use thiserror::Error;
17
18use crate::{builder::BuilderError, providers::LeaderTarget, routing::RoutingPolicy};
19
20#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
22pub enum SubmitMode {
23 RpcOnly,
25 JitoOnly,
27 DirectOnly,
29 Hybrid,
31}
32
33#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
35pub enum SubmitReliability {
36 LowLatency,
38 #[default]
40 Balanced,
41 HighReliability,
43}
44
45#[derive(Debug, Clone, Eq, PartialEq)]
47pub enum SignedTx {
48 VersionedTransactionBytes(Vec<u8>),
50 WireTransactionBytes(Vec<u8>),
52}
53
54#[derive(Debug, Clone, Eq, PartialEq)]
56pub struct RpcSubmitConfig {
57 pub skip_preflight: bool,
59 pub preflight_commitment: Option<String>,
61}
62
63impl Default for RpcSubmitConfig {
64 fn default() -> Self {
65 Self {
66 skip_preflight: true,
67 preflight_commitment: None,
68 }
69 }
70}
71
72#[derive(Debug, Clone, Eq, PartialEq, Default)]
74pub struct JitoSubmitConfig {
75 pub bundle_only: bool,
80}
81
82#[derive(Debug, Clone, Eq, PartialEq, Default)]
84pub struct JitoSubmitResponse {
85 pub transaction_signature: Option<String>,
87 pub bundle_id: Option<String>,
89}
90
91#[derive(Debug, Clone, Eq, PartialEq)]
93pub struct DirectSubmitConfig {
94 pub per_target_timeout: Duration,
96 pub global_timeout: Duration,
98 pub direct_target_rounds: usize,
100 pub direct_submit_attempts: usize,
102 pub hybrid_direct_attempts: usize,
104 pub rebroadcast_interval: Duration,
106 pub agave_rebroadcast_enabled: bool,
108 pub agave_rebroadcast_window: Duration,
110 pub agave_rebroadcast_interval: Duration,
112 pub hybrid_rpc_broadcast: bool,
114 pub latency_aware_targeting: bool,
116 pub latency_probe_timeout: Duration,
118 pub latency_probe_port: Option<u16>,
120 pub latency_probe_max_targets: usize,
122}
123
124impl DirectSubmitConfig {
125 #[must_use]
127 pub const fn from_reliability(reliability: SubmitReliability) -> Self {
128 match reliability {
129 SubmitReliability::LowLatency => Self {
130 per_target_timeout: Duration::from_millis(200),
131 global_timeout: Duration::from_millis(1_200),
132 direct_target_rounds: 3,
133 direct_submit_attempts: 3,
134 hybrid_direct_attempts: 2,
135 rebroadcast_interval: Duration::from_millis(90),
136 agave_rebroadcast_enabled: true,
137 agave_rebroadcast_window: Duration::from_secs(30),
138 agave_rebroadcast_interval: Duration::from_millis(700),
139 hybrid_rpc_broadcast: false,
140 latency_aware_targeting: true,
141 latency_probe_timeout: Duration::from_millis(80),
142 latency_probe_port: Some(8899),
143 latency_probe_max_targets: 128,
144 },
145 SubmitReliability::Balanced => Self {
146 per_target_timeout: Duration::from_millis(300),
147 global_timeout: Duration::from_millis(1_800),
148 direct_target_rounds: 4,
149 direct_submit_attempts: 4,
150 hybrid_direct_attempts: 3,
151 rebroadcast_interval: Duration::from_millis(110),
152 agave_rebroadcast_enabled: true,
153 agave_rebroadcast_window: Duration::from_secs(45),
154 agave_rebroadcast_interval: Duration::from_millis(800),
155 hybrid_rpc_broadcast: true,
156 latency_aware_targeting: true,
157 latency_probe_timeout: Duration::from_millis(120),
158 latency_probe_port: Some(8899),
159 latency_probe_max_targets: 128,
160 },
161 SubmitReliability::HighReliability => Self {
162 per_target_timeout: Duration::from_millis(450),
163 global_timeout: Duration::from_millis(3_200),
164 direct_target_rounds: 6,
165 direct_submit_attempts: 5,
166 hybrid_direct_attempts: 4,
167 rebroadcast_interval: Duration::from_millis(140),
168 agave_rebroadcast_enabled: true,
169 agave_rebroadcast_window: Duration::from_secs(70),
170 agave_rebroadcast_interval: Duration::from_millis(900),
171 hybrid_rpc_broadcast: true,
172 latency_aware_targeting: true,
173 latency_probe_timeout: Duration::from_millis(160),
174 latency_probe_port: Some(8899),
175 latency_probe_max_targets: 128,
176 },
177 }
178 }
179
180 #[must_use]
182 pub const fn normalized(self) -> Self {
183 let direct_target_rounds = if self.direct_target_rounds == 0 {
184 1
185 } else {
186 self.direct_target_rounds
187 };
188 let direct_submit_attempts = if self.direct_submit_attempts == 0 {
189 1
190 } else {
191 self.direct_submit_attempts
192 };
193 let hybrid_direct_attempts = if self.hybrid_direct_attempts == 0 {
194 1
195 } else {
196 self.hybrid_direct_attempts
197 };
198 let latency_probe_max_targets = if self.latency_probe_max_targets == 0 {
199 1
200 } else {
201 self.latency_probe_max_targets
202 };
203 let rebroadcast_interval = if self.rebroadcast_interval.is_zero() {
204 Duration::from_millis(1)
205 } else {
206 self.rebroadcast_interval
207 };
208 let agave_rebroadcast_interval = if self.agave_rebroadcast_interval.is_zero() {
209 Duration::from_millis(1)
210 } else {
211 self.agave_rebroadcast_interval
212 };
213 Self {
214 per_target_timeout: self.per_target_timeout,
215 global_timeout: self.global_timeout,
216 direct_target_rounds,
217 direct_submit_attempts,
218 hybrid_direct_attempts,
219 rebroadcast_interval,
220 agave_rebroadcast_enabled: self.agave_rebroadcast_enabled,
221 agave_rebroadcast_window: self.agave_rebroadcast_window,
222 agave_rebroadcast_interval,
223 hybrid_rpc_broadcast: self.hybrid_rpc_broadcast,
224 latency_aware_targeting: self.latency_aware_targeting,
225 latency_probe_timeout: self.latency_probe_timeout,
226 latency_probe_port: self.latency_probe_port,
227 latency_probe_max_targets,
228 }
229 }
230}
231
232impl Default for DirectSubmitConfig {
233 fn default() -> Self {
234 Self::from_reliability(SubmitReliability::default())
235 }
236}
237
238#[derive(Debug, Error, Clone, Eq, PartialEq)]
240pub enum SubmitTransportError {
241 #[error("transport configuration invalid: {message}")]
243 Config {
244 message: String,
246 },
247 #[error("transport failure: {message}")]
249 Failure {
250 message: String,
252 },
253}
254
255#[derive(Debug, Error)]
257pub enum SubmitError {
258 #[error("failed to build/sign transaction: {source}")]
260 Build {
261 source: BuilderError,
263 },
264 #[error("blockhash provider returned no recent blockhash")]
266 MissingRecentBlockhash,
267 #[error("failed to decode signed transaction bytes: {source}")]
269 DecodeSignedBytes {
270 source: Box<bincode::ErrorKind>,
272 },
273 #[error("duplicate signature suppressed by dedupe window")]
275 DuplicateSignature,
276 #[error("rpc transport is not configured")]
278 MissingRpcTransport,
279 #[error("jito transport is not configured")]
281 MissingJitoTransport,
282 #[error("direct transport is not configured")]
284 MissingDirectTransport,
285 #[error("no direct targets resolved from leader/backups")]
287 NoDirectTargets,
288 #[error("direct submit failed: {source}")]
290 Direct {
291 source: SubmitTransportError,
293 },
294 #[error("rpc submit failed: {source}")]
296 Rpc {
297 source: SubmitTransportError,
299 },
300 #[error("jito submit failed: {source}")]
302 Jito {
303 source: SubmitTransportError,
305 },
306 #[error("internal synchronization failure: {message}")]
308 InternalSync {
309 message: String,
311 },
312 #[error("submission rejected by toxic-flow guard: {reason}")]
314 ToxicFlow {
315 reason: TxToxicFlowRejectionReason,
317 },
318}
319
320#[derive(Debug, Clone, Eq, PartialEq)]
322pub struct SubmitResult {
323 pub signature: Option<Signature>,
325 pub mode: SubmitMode,
327 pub direct_target: Option<LeaderTarget>,
329 pub rpc_signature: Option<String>,
331 pub jito_signature: Option<String>,
333 pub jito_bundle_id: Option<String>,
335 pub used_rpc_fallback: bool,
337 pub selected_target_count: usize,
339 pub selected_identity_count: usize,
341}
342
343#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
345pub enum TxFlowSafetyQuality {
346 Stable,
348 Provisional,
350 ReorgRisk,
352 Stale,
354 Degraded,
356 IncompleteControlPlane,
358}
359
360#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
362pub enum TxFlowSafetyIssue {
363 ReplayRecoveryPending,
365 MissingControlPlane,
367 StaleControlPlane,
369 DegradedControlPlane,
371 ReorgRisk,
373 Provisional,
375}
376
377#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
379pub struct TxFlowSafetySnapshot {
380 pub quality: TxFlowSafetyQuality,
382 pub issues: Vec<TxFlowSafetyIssue>,
384 pub current_state_version: Option<u64>,
386 pub replay_recovery_pending: bool,
388}
389
390impl TxFlowSafetySnapshot {
391 #[must_use]
393 pub const fn is_safe(&self) -> bool {
394 matches!(self.quality, TxFlowSafetyQuality::Stable) && !self.replay_recovery_pending
395 }
396}
397
398pub trait TxFlowSafetySource: Send + Sync {
400 fn toxic_flow_snapshot(&self) -> TxFlowSafetySnapshot;
402}
403
404#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
406pub enum TxSubmitSuppressionKey {
407 Signature(Signature),
409 Opportunity([u8; 32]),
411 AccountSet([u8; 32]),
413 SlotWindow {
415 slot: u64,
417 window: u64,
419 },
420}
421
422impl Hash for TxSubmitSuppressionKey {
423 fn hash<H: Hasher>(&self, state: &mut H) {
424 match self {
425 Self::Signature(signature) => {
426 0_u8.hash(state);
427 signature.as_array().hash(state);
428 }
429 Self::Opportunity(key) => {
430 1_u8.hash(state);
431 key.hash(state);
432 }
433 Self::AccountSet(key) => {
434 2_u8.hash(state);
435 key.hash(state);
436 }
437 Self::SlotWindow { slot, window } => {
438 3_u8.hash(state);
439 slot.hash(state);
440 window.hash(state);
441 }
442 }
443 }
444}
445
446#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
448pub struct TxSubmitContext {
449 pub suppression_keys: Vec<TxSubmitSuppressionKey>,
451 pub decision_state_version: Option<u64>,
453 pub opportunity_created_at: Option<SystemTime>,
455}
456
457#[derive(Debug, Clone, Eq, PartialEq)]
459pub struct TxSubmitGuardPolicy {
460 pub require_stable_control_plane: bool,
462 pub reject_on_replay_recovery_pending: bool,
464 pub max_state_version_drift: Option<u64>,
466 pub max_opportunity_age: Option<Duration>,
468 pub suppression_ttl: Duration,
470}
471
472impl Default for TxSubmitGuardPolicy {
473 fn default() -> Self {
474 Self {
475 require_stable_control_plane: true,
476 reject_on_replay_recovery_pending: true,
477 max_state_version_drift: Some(4),
478 max_opportunity_age: Some(Duration::from_millis(750)),
479 suppression_ttl: Duration::from_millis(750),
480 }
481 }
482}
483
484#[derive(Debug, Error, Clone, Eq, PartialEq, Serialize, Deserialize)]
486pub enum TxToxicFlowRejectionReason {
487 #[error("control-plane quality {quality:?} is not safe for submit")]
489 UnsafeControlPlane {
490 quality: TxFlowSafetyQuality,
492 },
493 #[error("submit source is still recovering replay continuity")]
495 ReplayRecoveryPending,
496 #[error("submit suppressed by active key")]
498 Suppressed,
499 #[error("state version drift {drift} exceeded maximum {max_allowed}")]
501 StateDrift {
502 drift: u64,
504 max_allowed: u64,
506 },
507 #[error("opportunity age {age_ms}ms exceeded maximum {max_allowed_ms}ms")]
509 OpportunityStale {
510 age_ms: u64,
512 max_allowed_ms: u64,
514 },
515}
516
517#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
519pub enum TxSubmitOutcomeKind {
520 DirectAccepted,
522 RpcAccepted,
524 JitoAccepted,
526 Landed,
528 Expired,
530 Dropped,
532 LeaderMissed,
534 BlockhashStale,
536 UnhealthyRoute,
538 RejectedDueToStaleness,
540 RejectedDueToReorgRisk,
542 RejectedDueToStateDrift,
544 RejectedDueToReplayRecovery,
546 Suppressed,
548}
549
550#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
552pub struct TxSubmitOutcome {
553 pub kind: TxSubmitOutcomeKind,
555 pub signature: Option<Signature>,
557 pub mode: SubmitMode,
559 pub state_version: Option<u64>,
561 pub opportunity_age_ms: Option<u64>,
563}
564
565pub trait TxSubmitOutcomeReporter: Send + Sync {
567 fn record_outcome(&self, outcome: &TxSubmitOutcome);
569}
570
571#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
573pub struct TxToxicFlowTelemetrySnapshot {
574 pub rejected_due_to_staleness: u64,
576 pub rejected_due_to_reorg_risk: u64,
578 pub rejected_due_to_state_drift: u64,
580 pub submit_on_stale_blockhash: u64,
582 pub leader_route_miss_rate: u64,
584 pub opportunity_age_at_send_ms: Option<u64>,
586 pub rejected_due_to_replay_recovery: u64,
588 pub suppressed_submissions: u64,
590}
591
592#[derive(Debug, Default)]
594#[repr(align(64))]
595struct CacheAlignedAtomicU64(AtomicU64);
596
597impl CacheAlignedAtomicU64 {
598 fn load(&self, ordering: Ordering) -> u64 {
600 self.0.load(ordering)
601 }
602
603 fn swap(&self, value: u64, ordering: Ordering) -> u64 {
605 self.0.swap(value, ordering)
606 }
607
608 fn fetch_add(&self, value: u64, ordering: Ordering) -> u64 {
610 self.0.fetch_add(value, ordering)
611 }
612}
613
614#[derive(Debug, Default)]
616pub struct TxToxicFlowTelemetry {
617 rejected_due_to_staleness: CacheAlignedAtomicU64,
619 rejected_due_to_reorg_risk: CacheAlignedAtomicU64,
621 rejected_due_to_state_drift: CacheAlignedAtomicU64,
623 submit_on_stale_blockhash: CacheAlignedAtomicU64,
625 leader_route_miss_rate: CacheAlignedAtomicU64,
627 opportunity_age_at_send_ms: CacheAlignedAtomicU64,
629 rejected_due_to_replay_recovery: CacheAlignedAtomicU64,
631 suppressed_submissions: CacheAlignedAtomicU64,
633}
634
635impl TxToxicFlowTelemetry {
636 #[must_use]
638 pub fn shared() -> Arc<Self> {
639 Arc::new(Self::default())
640 }
641
642 pub fn record(&self, outcome: &TxSubmitOutcome) {
644 if let Some(age_ms) = outcome.opportunity_age_ms {
645 let _ = self
646 .opportunity_age_at_send_ms
647 .swap(age_ms, Ordering::Relaxed);
648 }
649 match outcome.kind {
650 TxSubmitOutcomeKind::RejectedDueToStaleness => {
651 let _ = self
652 .rejected_due_to_staleness
653 .fetch_add(1, Ordering::Relaxed);
654 }
655 TxSubmitOutcomeKind::RejectedDueToReorgRisk => {
656 let _ = self
657 .rejected_due_to_reorg_risk
658 .fetch_add(1, Ordering::Relaxed);
659 }
660 TxSubmitOutcomeKind::RejectedDueToStateDrift => {
661 let _ = self
662 .rejected_due_to_state_drift
663 .fetch_add(1, Ordering::Relaxed);
664 }
665 TxSubmitOutcomeKind::RejectedDueToReplayRecovery => {
666 let _ = self
667 .rejected_due_to_replay_recovery
668 .fetch_add(1, Ordering::Relaxed);
669 }
670 TxSubmitOutcomeKind::Suppressed => {
671 let _ = self.suppressed_submissions.fetch_add(1, Ordering::Relaxed);
672 }
673 TxSubmitOutcomeKind::LeaderMissed => {
674 let _ = self.leader_route_miss_rate.fetch_add(1, Ordering::Relaxed);
675 }
676 TxSubmitOutcomeKind::DirectAccepted
677 | TxSubmitOutcomeKind::RpcAccepted
678 | TxSubmitOutcomeKind::JitoAccepted
679 | TxSubmitOutcomeKind::Landed
680 | TxSubmitOutcomeKind::Expired
681 | TxSubmitOutcomeKind::Dropped
682 | TxSubmitOutcomeKind::UnhealthyRoute => {}
683 TxSubmitOutcomeKind::BlockhashStale => {
684 let _ = self
685 .submit_on_stale_blockhash
686 .fetch_add(1, Ordering::Relaxed);
687 }
688 }
689 }
690
691 #[must_use]
693 pub fn snapshot(&self) -> TxToxicFlowTelemetrySnapshot {
694 let age_ms = self.opportunity_age_at_send_ms.load(Ordering::Relaxed);
695 TxToxicFlowTelemetrySnapshot {
696 rejected_due_to_staleness: self.rejected_due_to_staleness.load(Ordering::Relaxed),
697 rejected_due_to_reorg_risk: self.rejected_due_to_reorg_risk.load(Ordering::Relaxed),
698 rejected_due_to_state_drift: self.rejected_due_to_state_drift.load(Ordering::Relaxed),
699 submit_on_stale_blockhash: self.submit_on_stale_blockhash.load(Ordering::Relaxed),
700 leader_route_miss_rate: self.leader_route_miss_rate.load(Ordering::Relaxed),
701 opportunity_age_at_send_ms: if age_ms == 0 { None } else { Some(age_ms) },
702 rejected_due_to_replay_recovery: self
703 .rejected_due_to_replay_recovery
704 .load(Ordering::Relaxed),
705 suppressed_submissions: self.suppressed_submissions.load(Ordering::Relaxed),
706 }
707 }
708}
709
710impl TxSubmitOutcomeReporter for TxToxicFlowTelemetry {
711 fn record_outcome(&self, outcome: &TxSubmitOutcome) {
712 self.record(outcome);
713 }
714}
715
716#[derive(Debug, Default)]
718pub(crate) struct TxSuppressionCache {
719 entries: HashMap<TxSubmitSuppressionKey, SystemTime>,
721}
722
723impl TxSuppressionCache {
724 pub(crate) fn is_suppressed(
726 &mut self,
727 keys: &[TxSubmitSuppressionKey],
728 now: SystemTime,
729 ttl: Duration,
730 ) -> bool {
731 self.evict_expired(now, ttl);
732 keys.iter().any(|key| self.entries.contains_key(key))
733 }
734
735 pub(crate) fn insert_all(&mut self, keys: &[TxSubmitSuppressionKey], now: SystemTime) {
737 for key in keys {
738 let _ = self.entries.insert(key.clone(), now);
739 }
740 }
741
742 fn evict_expired(&mut self, now: SystemTime, ttl: Duration) {
744 self.entries.retain(|_, inserted_at| {
745 now.duration_since(*inserted_at)
746 .map(|elapsed| elapsed <= ttl)
747 .unwrap_or(false)
748 });
749 }
750}
751
752#[async_trait]
754pub trait RpcSubmitTransport: Send + Sync {
755 async fn submit_rpc(
757 &self,
758 tx_bytes: &[u8],
759 config: &RpcSubmitConfig,
760 ) -> Result<String, SubmitTransportError>;
761}
762
763#[async_trait]
765pub trait JitoSubmitTransport: Send + Sync {
766 async fn submit_jito(
768 &self,
769 tx_bytes: &[u8],
770 config: &JitoSubmitConfig,
771 ) -> Result<JitoSubmitResponse, SubmitTransportError>;
772}
773
774#[async_trait]
776pub trait DirectSubmitTransport: Send + Sync {
777 async fn submit_direct(
779 &self,
780 tx_bytes: &[u8],
781 targets: &[LeaderTarget],
782 policy: RoutingPolicy,
783 config: &DirectSubmitConfig,
784 ) -> Result<LeaderTarget, SubmitTransportError>;
785}