1use std::{
4 collections::HashMap,
5 hash::{Hash, Hasher},
6 sync::{
7 Arc,
8 atomic::{AtomicU64, Ordering},
9 },
10 time::{Duration, SystemTime},
11};
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use solana_signature::Signature;
16use thiserror::Error;
17
18use crate::{builder::BuilderError, providers::LeaderTarget, routing::RoutingPolicy};
19
/// Selects which transport path a submission is routed through.
///
/// NOTE(review): the routing logic that interprets these variants lives outside
/// this section; the descriptions below follow the variant names.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum SubmitMode {
    /// Use the RPC transport only.
    RpcOnly,
    /// Use the Jito transport only.
    JitoOnly,
    /// Use the direct (leader-targeted) transport only.
    DirectOnly,
    /// Combine transports; presumably direct first with RPC as fallback/broadcast — confirm
    /// against the submit client.
    Hybrid,
}
32
/// Reliability preset that `DirectSubmitConfig::from_reliability` expands into concrete
/// timeouts, attempt counts, and rebroadcast settings.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
pub enum SubmitReliability {
    /// Preset with the shortest timeouts and fewest attempts.
    LowLatency,
    /// Middle preset; this is the `Default` variant.
    #[default]
    Balanced,
    /// Preset with the longest timeouts and most attempts.
    HighReliability,
}
44
/// An already-signed transaction supplied as raw bytes.
///
/// Both variants carry an owned byte buffer; they differ only in the declared encoding.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum SignedTx {
    /// Bytes of a versioned transaction.
    /// NOTE(review): presumably the bincode-serialized `VersionedTransaction` — confirm.
    VersionedTransactionBytes(Vec<u8>),
    /// Bytes in wire format.
    /// NOTE(review): presumably sent as-is without re-encoding — confirm.
    WireTransactionBytes(Vec<u8>),
}
53
/// Options passed to an RPC submit transport alongside the transaction bytes.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RpcSubmitConfig {
    /// Whether preflight should be skipped for the submission.
    pub skip_preflight: bool,
    /// Optional commitment level to use for preflight; `None` leaves it unspecified.
    pub preflight_commitment: Option<String>,
}

impl Default for RpcSubmitConfig {
    /// Default options: preflight is skipped and no preflight commitment is set.
    fn default() -> Self {
        let skip_preflight = true;
        let preflight_commitment = None;
        Self {
            skip_preflight,
            preflight_commitment,
        }
    }
}
71
/// Options passed to a Jito submit transport alongside the transaction bytes.
///
/// The derived `Default` sets `bundle_only` to `false`.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct JitoSubmitConfig {
    /// Bundle-only flag.
    /// NOTE(review): presumably forwarded to the Jito endpoint's bundle-only option — confirm.
    pub bundle_only: bool,
}
78
/// Tuning knobs for the direct (leader-targeted) submit path.
///
/// Presets are produced by `from_reliability`; `normalized` clamps zero counts and
/// zero rebroadcast intervals. NOTE(review): the fields are consumed by transport code
/// outside this section, so the per-field notes below follow the field names.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DirectSubmitConfig {
    /// Timeout budget for a single target.
    pub per_target_timeout: Duration,
    /// Timeout budget for the whole direct submission.
    pub global_timeout: Duration,
    /// Number of rounds over the target list (never 0 after `normalized`).
    pub direct_target_rounds: usize,
    /// Number of direct submit attempts (never 0 after `normalized`).
    pub direct_submit_attempts: usize,
    /// Number of direct attempts in hybrid mode (never 0 after `normalized`).
    pub hybrid_direct_attempts: usize,
    /// Interval between rebroadcasts (never zero after `normalized`).
    pub rebroadcast_interval: Duration,
    /// Enables the Agave rebroadcast behavior.
    pub agave_rebroadcast_enabled: bool,
    /// Window during which Agave rebroadcast applies.
    pub agave_rebroadcast_window: Duration,
    /// Interval between Agave rebroadcasts (never zero after `normalized`).
    pub agave_rebroadcast_interval: Duration,
    /// Enables the RPC broadcast in hybrid mode.
    pub hybrid_rpc_broadcast: bool,
    /// Enables latency-aware target selection.
    pub latency_aware_targeting: bool,
    /// Timeout for a latency probe.
    pub latency_probe_timeout: Duration,
    /// Port used for latency probes; `None` leaves it unspecified.
    pub latency_probe_port: Option<u16>,
    /// Upper bound on the number of probed targets (never 0 after `normalized`).
    pub latency_probe_max_targets: usize,
}
111
112impl DirectSubmitConfig {
113 #[must_use]
115 pub const fn from_reliability(reliability: SubmitReliability) -> Self {
116 match reliability {
117 SubmitReliability::LowLatency => Self {
118 per_target_timeout: Duration::from_millis(200),
119 global_timeout: Duration::from_millis(1_200),
120 direct_target_rounds: 3,
121 direct_submit_attempts: 3,
122 hybrid_direct_attempts: 2,
123 rebroadcast_interval: Duration::from_millis(90),
124 agave_rebroadcast_enabled: true,
125 agave_rebroadcast_window: Duration::from_secs(30),
126 agave_rebroadcast_interval: Duration::from_millis(700),
127 hybrid_rpc_broadcast: false,
128 latency_aware_targeting: true,
129 latency_probe_timeout: Duration::from_millis(80),
130 latency_probe_port: Some(8899),
131 latency_probe_max_targets: 128,
132 },
133 SubmitReliability::Balanced => Self {
134 per_target_timeout: Duration::from_millis(300),
135 global_timeout: Duration::from_millis(1_800),
136 direct_target_rounds: 4,
137 direct_submit_attempts: 4,
138 hybrid_direct_attempts: 3,
139 rebroadcast_interval: Duration::from_millis(110),
140 agave_rebroadcast_enabled: true,
141 agave_rebroadcast_window: Duration::from_secs(45),
142 agave_rebroadcast_interval: Duration::from_millis(800),
143 hybrid_rpc_broadcast: true,
144 latency_aware_targeting: true,
145 latency_probe_timeout: Duration::from_millis(120),
146 latency_probe_port: Some(8899),
147 latency_probe_max_targets: 128,
148 },
149 SubmitReliability::HighReliability => Self {
150 per_target_timeout: Duration::from_millis(450),
151 global_timeout: Duration::from_millis(3_200),
152 direct_target_rounds: 6,
153 direct_submit_attempts: 5,
154 hybrid_direct_attempts: 4,
155 rebroadcast_interval: Duration::from_millis(140),
156 agave_rebroadcast_enabled: true,
157 agave_rebroadcast_window: Duration::from_secs(70),
158 agave_rebroadcast_interval: Duration::from_millis(900),
159 hybrid_rpc_broadcast: true,
160 latency_aware_targeting: true,
161 latency_probe_timeout: Duration::from_millis(160),
162 latency_probe_port: Some(8899),
163 latency_probe_max_targets: 128,
164 },
165 }
166 }
167
168 #[must_use]
170 pub const fn normalized(self) -> Self {
171 let direct_target_rounds = if self.direct_target_rounds == 0 {
172 1
173 } else {
174 self.direct_target_rounds
175 };
176 let direct_submit_attempts = if self.direct_submit_attempts == 0 {
177 1
178 } else {
179 self.direct_submit_attempts
180 };
181 let hybrid_direct_attempts = if self.hybrid_direct_attempts == 0 {
182 1
183 } else {
184 self.hybrid_direct_attempts
185 };
186 let latency_probe_max_targets = if self.latency_probe_max_targets == 0 {
187 1
188 } else {
189 self.latency_probe_max_targets
190 };
191 let rebroadcast_interval = if self.rebroadcast_interval.is_zero() {
192 Duration::from_millis(1)
193 } else {
194 self.rebroadcast_interval
195 };
196 let agave_rebroadcast_interval = if self.agave_rebroadcast_interval.is_zero() {
197 Duration::from_millis(1)
198 } else {
199 self.agave_rebroadcast_interval
200 };
201 Self {
202 per_target_timeout: self.per_target_timeout,
203 global_timeout: self.global_timeout,
204 direct_target_rounds,
205 direct_submit_attempts,
206 hybrid_direct_attempts,
207 rebroadcast_interval,
208 agave_rebroadcast_enabled: self.agave_rebroadcast_enabled,
209 agave_rebroadcast_window: self.agave_rebroadcast_window,
210 agave_rebroadcast_interval,
211 hybrid_rpc_broadcast: self.hybrid_rpc_broadcast,
212 latency_aware_targeting: self.latency_aware_targeting,
213 latency_probe_timeout: self.latency_probe_timeout,
214 latency_probe_port: self.latency_probe_port,
215 latency_probe_max_targets,
216 }
217 }
218}
219
220impl Default for DirectSubmitConfig {
221 fn default() -> Self {
222 Self::from_reliability(SubmitReliability::default())
223 }
224}
225
/// Error reported by a submit transport implementation.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SubmitTransportError {
    /// The transport's configuration is invalid.
    #[error("transport configuration invalid: {message}")]
    Config {
        /// Human-readable description of the configuration problem.
        message: String,
    },
    /// The transport failed while submitting.
    #[error("transport failure: {message}")]
    Failure {
        /// Human-readable description of the failure.
        message: String,
    },
}
242
/// Top-level error type for the submit path.
///
/// Fields named `source` are both interpolated into the message and, by `thiserror`
/// convention, exposed as the error's source.
#[derive(Debug, Error)]
pub enum SubmitError {
    /// Building or signing the transaction failed.
    #[error("failed to build/sign transaction: {source}")]
    Build {
        /// Underlying builder error.
        source: BuilderError,
    },
    /// The blockhash provider had no recent blockhash to offer.
    #[error("blockhash provider returned no recent blockhash")]
    MissingRecentBlockhash,
    /// Signed transaction bytes could not be decoded.
    #[error("failed to decode signed transaction bytes: {source}")]
    DecodeSignedBytes {
        /// Underlying bincode decode error.
        source: Box<bincode::ErrorKind>,
    },
    /// The signature was already seen inside the dedupe window.
    #[error("duplicate signature suppressed by dedupe window")]
    DuplicateSignature,
    /// The selected mode needs an RPC transport but none is configured.
    #[error("rpc transport is not configured")]
    MissingRpcTransport,
    /// The selected mode needs a Jito transport but none is configured.
    #[error("jito transport is not configured")]
    MissingJitoTransport,
    /// The selected mode needs a direct transport but none is configured.
    #[error("direct transport is not configured")]
    MissingDirectTransport,
    /// No direct targets could be resolved from the leader/backup set.
    #[error("no direct targets resolved from leader/backups")]
    NoDirectTargets,
    /// The direct transport returned an error.
    #[error("direct submit failed: {source}")]
    Direct {
        /// Underlying transport error.
        source: SubmitTransportError,
    },
    /// The RPC transport returned an error.
    #[error("rpc submit failed: {source}")]
    Rpc {
        /// Underlying transport error.
        source: SubmitTransportError,
    },
    /// The Jito transport returned an error.
    #[error("jito submit failed: {source}")]
    Jito {
        /// Underlying transport error.
        source: SubmitTransportError,
    },
    /// An internal synchronization primitive failed.
    #[error("internal synchronization failure: {message}")]
    InternalSync {
        /// Human-readable description of the failure.
        message: String,
    },
    /// The toxic-flow guard rejected the submission.
    #[error("submission rejected by toxic-flow guard: {reason}")]
    ToxicFlow {
        /// Why the guard rejected the submission.
        reason: TxToxicFlowRejectionReason,
    },
}
307
/// Summary of a completed submission.
///
/// NOTE(review): this struct is populated by code outside this section; the field notes
/// below follow the field names and types.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SubmitResult {
    /// Transaction signature, when one is known.
    pub signature: Option<Signature>,
    /// Mode the submission was made with.
    pub mode: SubmitMode,
    /// Direct target associated with the submission, if any.
    pub direct_target: Option<LeaderTarget>,
    /// Signature string returned by the RPC transport, if any.
    pub rpc_signature: Option<String>,
    /// Signature string returned by the Jito transport, if any.
    pub jito_signature: Option<String>,
    /// Whether the RPC fallback was used.
    pub used_rpc_fallback: bool,
    /// Number of targets selected for the submission.
    pub selected_target_count: usize,
    /// Number of identities selected for the submission.
    /// NOTE(review): presumably distinct validator identities among the targets — confirm.
    pub selected_identity_count: usize,
}
328
/// Overall quality classification of the state a submit decision is based on.
///
/// Only `Stable` is treated as safe by `TxFlowSafetySnapshot::is_safe`.
/// NOTE(review): the producer of these classifications is outside this section; variant
/// notes follow the names.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxFlowSafetyQuality {
    /// State is stable; the only variant considered safe.
    Stable,
    /// State is provisional.
    Provisional,
    /// State is at risk of a reorg.
    ReorgRisk,
    /// State is stale.
    Stale,
    /// State is degraded.
    Degraded,
    /// Control-plane information is incomplete.
    IncompleteControlPlane,
}
345
/// Individual issue attached to a `TxFlowSafetySnapshot`.
///
/// NOTE(review): the code that raises these issues is outside this section; variant notes
/// follow the names.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxFlowSafetyIssue {
    /// Replay recovery has not completed yet.
    ReplayRecoveryPending,
    /// Control-plane data is missing.
    MissingControlPlane,
    /// Control-plane data is stale.
    StaleControlPlane,
    /// Control-plane data is degraded.
    DegradedControlPlane,
    /// The state is at risk of a reorg.
    ReorgRisk,
    /// The state is provisional.
    Provisional,
}
362
/// Point-in-time safety view returned by a `TxFlowSafetySource`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxFlowSafetySnapshot {
    /// Overall quality classification.
    pub quality: TxFlowSafetyQuality,
    /// Issues reported together with the classification.
    pub issues: Vec<TxFlowSafetyIssue>,
    /// Current state version, when known.
    pub current_state_version: Option<u64>,
    /// Whether replay recovery is still pending; `is_safe` returns `false` while this is set.
    pub replay_recovery_pending: bool,
}
375
376impl TxFlowSafetySnapshot {
377 #[must_use]
379 pub const fn is_safe(&self) -> bool {
380 matches!(self.quality, TxFlowSafetyQuality::Stable) && !self.replay_recovery_pending
381 }
382}
383
/// Provider of toxic-flow safety snapshots; implementations must be shareable across threads.
pub trait TxFlowSafetySource: Send + Sync {
    /// Returns the source's current safety snapshot.
    fn toxic_flow_snapshot(&self) -> TxFlowSafetySnapshot;
}
389
/// Key under which a submission can be suppressed for a limited time.
///
/// `Hash` is implemented by hand elsewhere in this file; `Eq`/`PartialEq` are derived.
/// NOTE(review): how callers derive the 32-byte keys is not visible here.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxSubmitSuppressionKey {
    /// Suppress by transaction signature.
    Signature(Signature),
    /// Suppress by a 32-byte opportunity identifier.
    Opportunity([u8; 32]),
    /// Suppress by a 32-byte account-set identifier.
    AccountSet([u8; 32]),
    /// Suppress by slot and window.
    SlotWindow {
        /// Slot component of the key.
        slot: u64,
        /// Window component of the key.
        /// NOTE(review): units (slots vs. an index) are not visible here — confirm.
        window: u64,
    },
}
407
408impl Hash for TxSubmitSuppressionKey {
409 fn hash<H: Hasher>(&self, state: &mut H) {
410 match self {
411 Self::Signature(signature) => {
412 0_u8.hash(state);
413 signature.as_array().hash(state);
414 }
415 Self::Opportunity(key) => {
416 1_u8.hash(state);
417 key.hash(state);
418 }
419 Self::AccountSet(key) => {
420 2_u8.hash(state);
421 key.hash(state);
422 }
423 Self::SlotWindow { slot, window } => {
424 3_u8.hash(state);
425 slot.hash(state);
426 window.hash(state);
427 }
428 }
429 }
430}
431
/// Per-submission context consulted by the toxic-flow guard.
///
/// The derived `Default` yields no suppression keys and `None` for both optional fields.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxSubmitContext {
    /// Keys under which this submission may be suppressed.
    pub suppression_keys: Vec<TxSubmitSuppressionKey>,
    /// State version the submit decision was based on, when known.
    pub decision_state_version: Option<u64>,
    /// Wall-clock time at which the opportunity was created, when known.
    pub opportunity_created_at: Option<SystemTime>,
}
442
/// Thresholds and switches for the toxic-flow submit guard.
///
/// NOTE(review): the guard that enforces this policy lives outside this section; field
/// notes follow the field names.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TxSubmitGuardPolicy {
    /// Require a stable control plane before submitting.
    pub require_stable_control_plane: bool,
    /// Reject submissions while replay recovery is pending.
    pub reject_on_replay_recovery_pending: bool,
    /// Largest tolerated state-version drift; `None` disables the limit.
    pub max_state_version_drift: Option<u64>,
    /// Largest tolerated opportunity age; `None` disables the limit.
    pub max_opportunity_age: Option<Duration>,
    /// How long a suppression key stays active.
    pub suppression_ttl: Duration,
}

impl Default for TxSubmitGuardPolicy {
    /// Strict defaults: both safety switches on, drift capped at 4 versions, and a
    /// 750 ms limit for both opportunity age and suppression TTL.
    fn default() -> Self {
        let max_state_version_drift = Some(4);
        let max_opportunity_age = Some(Duration::from_millis(750));
        let suppression_ttl = Duration::from_millis(750);
        Self {
            require_stable_control_plane: true,
            reject_on_replay_recovery_pending: true,
            max_state_version_drift,
            max_opportunity_age,
            suppression_ttl,
        }
    }
}
469
/// Reason the toxic-flow guard rejected a submission.
#[derive(Debug, Error, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxToxicFlowRejectionReason {
    /// The control-plane quality was not safe for submitting.
    #[error("control-plane quality {quality:?} is not safe for submit")]
    UnsafeControlPlane {
        /// Quality classification that caused the rejection.
        quality: TxFlowSafetyQuality,
    },
    /// The submit source was still recovering replay continuity.
    #[error("submit source is still recovering replay continuity")]
    ReplayRecoveryPending,
    /// An active suppression key matched the submission.
    #[error("submit suppressed by active key")]
    Suppressed,
    /// The state version drifted further than allowed.
    #[error("state version drift {drift} exceeded maximum {max_allowed}")]
    StateDrift {
        /// Observed drift.
        drift: u64,
        /// Maximum drift that was allowed.
        max_allowed: u64,
    },
    /// The opportunity was older than allowed.
    #[error("opportunity age {age_ms}ms exceeded maximum {max_allowed_ms}ms")]
    OpportunityStale {
        /// Observed opportunity age in milliseconds.
        age_ms: u64,
        /// Maximum opportunity age that was allowed, in milliseconds.
        max_allowed_ms: u64,
    },
}
502
/// Classification of what happened to a submission.
///
/// `TxToxicFlowTelemetry::record` counts the four `RejectedDueTo*` kinds plus `Suppressed`,
/// `LeaderMissed`, and `BlockhashStale`; the remaining kinds do not touch any counter.
/// NOTE(review): the code that assigns these kinds is outside this section; variant notes
/// follow the names.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxSubmitOutcomeKind {
    /// The direct transport accepted the transaction.
    DirectAccepted,
    /// The RPC transport accepted the transaction.
    RpcAccepted,
    /// The Jito transport accepted the transaction.
    JitoAccepted,
    /// The transaction landed.
    Landed,
    /// The transaction expired.
    Expired,
    /// The transaction was dropped.
    Dropped,
    /// The intended leader was missed.
    LeaderMissed,
    /// The blockhash was stale at submit time.
    BlockhashStale,
    /// The route was unhealthy.
    UnhealthyRoute,
    /// Rejected before sending because of staleness.
    RejectedDueToStaleness,
    /// Rejected before sending because of reorg risk.
    RejectedDueToReorgRisk,
    /// Rejected before sending because of state-version drift.
    RejectedDueToStateDrift,
    /// Rejected before sending because replay recovery was pending.
    RejectedDueToReplayRecovery,
    /// Suppressed by an active suppression key.
    Suppressed,
}
535
/// Outcome record handed to a `TxSubmitOutcomeReporter`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxSubmitOutcome {
    /// What happened to the submission.
    pub kind: TxSubmitOutcomeKind,
    /// Transaction signature, when known.
    pub signature: Option<Signature>,
    /// Mode the submission was made with.
    pub mode: SubmitMode,
    /// State version associated with the outcome, when known.
    pub state_version: Option<u64>,
    /// Opportunity age in milliseconds, when known.
    pub opportunity_age_ms: Option<u64>,
}
550
/// Sink for submit outcomes; implementations must be shareable across threads.
pub trait TxSubmitOutcomeReporter: Send + Sync {
    /// Records a single outcome. Takes `&self`, so implementations that keep state need
    /// interior mutability.
    fn record_outcome(&self, outcome: &TxSubmitOutcome);
}
556
/// Plain-value copy of the counters held by `TxToxicFlowTelemetry`.
///
/// All `u64` fields are running totals since the telemetry value was created.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxToxicFlowTelemetrySnapshot {
    /// Number of `RejectedDueToStaleness` outcomes.
    pub rejected_due_to_staleness: u64,
    /// Number of `RejectedDueToReorgRisk` outcomes.
    pub rejected_due_to_reorg_risk: u64,
    /// Number of `RejectedDueToStateDrift` outcomes.
    pub rejected_due_to_state_drift: u64,
    /// Number of `BlockhashStale` outcomes.
    pub submit_on_stale_blockhash: u64,
    /// Number of `LeaderMissed` outcomes (a running count, despite the `rate` in the name).
    pub leader_route_miss_rate: u64,
    /// Most recently recorded opportunity age in milliseconds, if one is available.
    pub opportunity_age_at_send_ms: Option<u64>,
    /// Number of `RejectedDueToReplayRecovery` outcomes.
    pub rejected_due_to_replay_recovery: u64,
    /// Number of `Suppressed` outcomes.
    pub suppressed_submissions: u64,
}
577
/// An [`AtomicU64`] forced to 64-byte alignment.
///
/// NOTE(review): presumably sized to a cache line so adjacent counters do not share one —
/// confirm the target's cache-line size.
#[derive(Debug, Default)]
#[repr(align(64))]
struct CacheAlignedAtomicU64(AtomicU64);

impl CacheAlignedAtomicU64 {
    /// Reads the current value with the given memory ordering.
    fn load(&self, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.load(ordering)
    }

    /// Stores `value` and returns the value that was there before.
    fn swap(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.swap(value, ordering)
    }

    /// Adds `value` (wrapping on overflow) and returns the value from before the addition.
    fn fetch_add(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.fetch_add(value, ordering)
    }
}
599
/// Lock-free counters fed by submit outcomes.
///
/// Every field is its own 64-byte-aligned atomic. The derived `Default` starts all of
/// them at zero. Values are written by `record` and read back by `snapshot`.
#[derive(Debug, Default)]
pub struct TxToxicFlowTelemetry {
    /// Count of `RejectedDueToStaleness` outcomes.
    rejected_due_to_staleness: CacheAlignedAtomicU64,
    /// Count of `RejectedDueToReorgRisk` outcomes.
    rejected_due_to_reorg_risk: CacheAlignedAtomicU64,
    /// Count of `RejectedDueToStateDrift` outcomes.
    rejected_due_to_state_drift: CacheAlignedAtomicU64,
    /// Count of `BlockhashStale` outcomes.
    submit_on_stale_blockhash: CacheAlignedAtomicU64,
    /// Count of `LeaderMissed` outcomes.
    leader_route_miss_rate: CacheAlignedAtomicU64,
    /// Last opportunity age seen by `record`; overwritten on each update, not accumulated.
    opportunity_age_at_send_ms: CacheAlignedAtomicU64,
    /// Count of `RejectedDueToReplayRecovery` outcomes.
    rejected_due_to_replay_recovery: CacheAlignedAtomicU64,
    /// Count of `Suppressed` outcomes.
    suppressed_submissions: CacheAlignedAtomicU64,
}
620
621impl TxToxicFlowTelemetry {
622 #[must_use]
624 pub fn shared() -> Arc<Self> {
625 Arc::new(Self::default())
626 }
627
628 pub fn record(&self, outcome: &TxSubmitOutcome) {
630 if let Some(age_ms) = outcome.opportunity_age_ms {
631 let _ = self
632 .opportunity_age_at_send_ms
633 .swap(age_ms, Ordering::Relaxed);
634 }
635 match outcome.kind {
636 TxSubmitOutcomeKind::RejectedDueToStaleness => {
637 let _ = self
638 .rejected_due_to_staleness
639 .fetch_add(1, Ordering::Relaxed);
640 }
641 TxSubmitOutcomeKind::RejectedDueToReorgRisk => {
642 let _ = self
643 .rejected_due_to_reorg_risk
644 .fetch_add(1, Ordering::Relaxed);
645 }
646 TxSubmitOutcomeKind::RejectedDueToStateDrift => {
647 let _ = self
648 .rejected_due_to_state_drift
649 .fetch_add(1, Ordering::Relaxed);
650 }
651 TxSubmitOutcomeKind::RejectedDueToReplayRecovery => {
652 let _ = self
653 .rejected_due_to_replay_recovery
654 .fetch_add(1, Ordering::Relaxed);
655 }
656 TxSubmitOutcomeKind::Suppressed => {
657 let _ = self.suppressed_submissions.fetch_add(1, Ordering::Relaxed);
658 }
659 TxSubmitOutcomeKind::LeaderMissed => {
660 let _ = self.leader_route_miss_rate.fetch_add(1, Ordering::Relaxed);
661 }
662 TxSubmitOutcomeKind::DirectAccepted
663 | TxSubmitOutcomeKind::RpcAccepted
664 | TxSubmitOutcomeKind::JitoAccepted
665 | TxSubmitOutcomeKind::Landed
666 | TxSubmitOutcomeKind::Expired
667 | TxSubmitOutcomeKind::Dropped
668 | TxSubmitOutcomeKind::UnhealthyRoute => {}
669 TxSubmitOutcomeKind::BlockhashStale => {
670 let _ = self
671 .submit_on_stale_blockhash
672 .fetch_add(1, Ordering::Relaxed);
673 }
674 }
675 }
676
677 #[must_use]
679 pub fn snapshot(&self) -> TxToxicFlowTelemetrySnapshot {
680 let age_ms = self.opportunity_age_at_send_ms.load(Ordering::Relaxed);
681 TxToxicFlowTelemetrySnapshot {
682 rejected_due_to_staleness: self.rejected_due_to_staleness.load(Ordering::Relaxed),
683 rejected_due_to_reorg_risk: self.rejected_due_to_reorg_risk.load(Ordering::Relaxed),
684 rejected_due_to_state_drift: self.rejected_due_to_state_drift.load(Ordering::Relaxed),
685 submit_on_stale_blockhash: self.submit_on_stale_blockhash.load(Ordering::Relaxed),
686 leader_route_miss_rate: self.leader_route_miss_rate.load(Ordering::Relaxed),
687 opportunity_age_at_send_ms: if age_ms == 0 { None } else { Some(age_ms) },
688 rejected_due_to_replay_recovery: self
689 .rejected_due_to_replay_recovery
690 .load(Ordering::Relaxed),
691 suppressed_submissions: self.suppressed_submissions.load(Ordering::Relaxed),
692 }
693 }
694}
695
696impl TxSubmitOutcomeReporter for TxToxicFlowTelemetry {
697 fn record_outcome(&self, outcome: &TxSubmitOutcome) {
698 self.record(outcome);
699 }
700}
701
/// Crate-internal cache of active suppression keys.
///
/// The derived `Default` starts with an empty map.
#[derive(Debug, Default)]
pub(crate) struct TxSuppressionCache {
    /// Maps each key to the wall-clock time at which it was last inserted.
    entries: HashMap<TxSubmitSuppressionKey, SystemTime>,
}
708
709impl TxSuppressionCache {
710 pub(crate) fn is_suppressed(
712 &mut self,
713 keys: &[TxSubmitSuppressionKey],
714 now: SystemTime,
715 ttl: Duration,
716 ) -> bool {
717 self.evict_expired(now, ttl);
718 keys.iter().any(|key| self.entries.contains_key(key))
719 }
720
721 pub(crate) fn insert_all(&mut self, keys: &[TxSubmitSuppressionKey], now: SystemTime) {
723 for key in keys {
724 let _ = self.entries.insert(key.clone(), now);
725 }
726 }
727
728 fn evict_expired(&mut self, now: SystemTime, ttl: Duration) {
730 self.entries.retain(|_, inserted_at| {
731 now.duration_since(*inserted_at)
732 .map(|elapsed| elapsed <= ttl)
733 .unwrap_or(false)
734 });
735 }
736}
737
/// Transport that submits transaction bytes over RPC; must be shareable across threads.
#[async_trait]
pub trait RpcSubmitTransport: Send + Sync {
    /// Submits `tx_bytes` using the given RPC options.
    ///
    /// Returns a string on success (NOTE(review): presumably the transaction signature as
    /// reported by the RPC endpoint — confirm).
    ///
    /// # Errors
    ///
    /// Returns a [`SubmitTransportError`] when the transport is misconfigured or fails.
    async fn submit_rpc(
        &self,
        tx_bytes: &[u8],
        config: &RpcSubmitConfig,
    ) -> Result<String, SubmitTransportError>;
}
748
/// Transport that submits transaction bytes via Jito; must be shareable across threads.
#[async_trait]
pub trait JitoSubmitTransport: Send + Sync {
    /// Submits `tx_bytes` using the given Jito options.
    ///
    /// Returns a string on success (NOTE(review): presumably a signature or bundle
    /// identifier reported by Jito — confirm).
    ///
    /// # Errors
    ///
    /// Returns a [`SubmitTransportError`] when the transport is misconfigured or fails.
    async fn submit_jito(
        &self,
        tx_bytes: &[u8],
        config: &JitoSubmitConfig,
    ) -> Result<String, SubmitTransportError>;
}
759
/// Transport that submits transaction bytes directly to leader targets; must be shareable
/// across threads.
#[async_trait]
pub trait DirectSubmitTransport: Send + Sync {
    /// Submits `tx_bytes` to the candidate `targets` under the given routing policy and
    /// direct-submit configuration.
    ///
    /// Returns a [`LeaderTarget`] on success (NOTE(review): presumably the target that
    /// accepted the transaction — confirm against implementations).
    ///
    /// # Errors
    ///
    /// Returns a [`SubmitTransportError`] when the transport is misconfigured or fails.
    async fn submit_direct(
        &self,
        tx_bytes: &[u8],
        targets: &[LeaderTarget],
        policy: RoutingPolicy,
        config: &DirectSubmitConfig,
    ) -> Result<LeaderTarget, SubmitTransportError>;
}