1use std::{
4 collections::HashMap,
5 hash::{Hash, Hasher},
6 sync::{
7 Arc,
8 atomic::{AtomicU64, Ordering},
9 },
10 time::{Duration, SystemTime},
11};
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use sof_types::SignatureBytes;
16use thiserror::Error;
17
18use crate::{providers::LeaderTarget, routing::RoutingPolicy};
19
/// Coarse submit mode that maps onto a [`SubmitPlan`].
///
/// Every variant converts into a plan through `From<SubmitMode> for SubmitPlan`, and
/// [`SubmitPlan::legacy_mode`] maps the four matching plans back to a mode.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum SubmitMode {
    /// Converts to the single-route plan `[Rpc]`.
    RpcOnly,
    /// Converts to the single-route plan `[Jito]`.
    JitoOnly,
    /// Converts to the single-route plan `[Direct]`.
    DirectOnly,
    /// Converts to the ordered plan `[Direct, Rpc]`.
    Hybrid,
}
32
/// One route a [`SubmitPlan`] can list.
///
/// Each variant has a matching transport trait in this file (`RpcSubmitTransport`,
/// `JitoSubmitTransport`, `DirectSubmitTransport`).
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum SubmitRoute {
    /// The RPC route.
    Rpc,
    /// The Jito route.
    Jito,
    /// The direct route (the direct transport receives a list of `LeaderTarget`s).
    Direct,
}
43
/// How the routes of a [`SubmitPlan`] are meant to be executed.
///
/// NOTE(review): the executor is not part of this section; the variant descriptions below are
/// read off the names and should be confirmed against the submit client.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
pub enum SubmitStrategy {
    /// Default strategy; presumably tries routes in plan order and falls back on failure.
    #[default]
    OrderedFallback,
    /// Presumably fires every configured route at the same time.
    AllAtOnce,
}
53
/// A list of submit routes plus the strategy used to execute them.
///
/// The constructors on this type remove repeated routes. Because both fields are public (and the
/// type is deserializable), a value built by hand may still contain repeats; call
/// [`SubmitPlan::normalized`] or [`SubmitPlan::into_normalized`] to clean it up.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct SubmitPlan {
    /// Routes in plan order.
    pub routes: Vec<SubmitRoute>,
    /// Strategy applied to `routes`.
    pub strategy: SubmitStrategy,
}
62
63impl SubmitPlan {
64 #[must_use]
66 pub fn new(routes: Vec<SubmitRoute>, strategy: SubmitStrategy) -> Self {
67 Self { routes, strategy }.into_normalized()
68 }
69
70 #[must_use]
72 pub fn into_normalized(mut self) -> Self {
73 let mut seen_rpc = false;
74 let mut seen_jito = false;
75 let mut seen_direct = false;
76 self.routes.retain(|route| match route {
77 SubmitRoute::Rpc if seen_rpc => false,
78 SubmitRoute::Rpc => {
79 seen_rpc = true;
80 true
81 }
82 SubmitRoute::Jito if seen_jito => false,
83 SubmitRoute::Jito => {
84 seen_jito = true;
85 true
86 }
87 SubmitRoute::Direct if seen_direct => false,
88 SubmitRoute::Direct => {
89 seen_direct = true;
90 true
91 }
92 });
93 self
94 }
95
96 #[must_use]
98 pub fn normalized(&self) -> Self {
99 self.clone().into_normalized()
100 }
101
102 #[must_use]
104 pub fn rpc_only() -> Self {
105 Self::new(vec![SubmitRoute::Rpc], SubmitStrategy::OrderedFallback)
106 }
107
108 #[must_use]
110 pub fn jito_only() -> Self {
111 Self::new(vec![SubmitRoute::Jito], SubmitStrategy::OrderedFallback)
112 }
113
114 #[must_use]
116 pub fn direct_only() -> Self {
117 Self::new(vec![SubmitRoute::Direct], SubmitStrategy::OrderedFallback)
118 }
119
120 #[must_use]
122 pub fn ordered(routes: Vec<SubmitRoute>) -> Self {
123 Self::new(routes, SubmitStrategy::OrderedFallback)
124 }
125
126 #[must_use]
128 pub fn hybrid() -> Self {
129 Self::ordered(vec![SubmitRoute::Direct, SubmitRoute::Rpc])
130 }
131
132 #[must_use]
134 pub fn all_at_once(routes: Vec<SubmitRoute>) -> Self {
135 Self::new(routes, SubmitStrategy::AllAtOnce)
136 }
137
138 #[must_use]
140 pub fn legacy_mode(&self) -> Option<SubmitMode> {
141 match (self.strategy, self.routes.as_slice()) {
142 (SubmitStrategy::OrderedFallback, [SubmitRoute::Rpc]) => Some(SubmitMode::RpcOnly),
143 (SubmitStrategy::OrderedFallback, [SubmitRoute::Jito]) => Some(SubmitMode::JitoOnly),
144 (SubmitStrategy::OrderedFallback, [SubmitRoute::Direct]) => {
145 Some(SubmitMode::DirectOnly)
146 }
147 (SubmitStrategy::OrderedFallback, [SubmitRoute::Direct, SubmitRoute::Rpc]) => {
148 Some(SubmitMode::Hybrid)
149 }
150 _ => None,
151 }
152 }
153}
154
155impl Default for SubmitPlan {
156 fn default() -> Self {
157 Self::rpc_only()
158 }
159}
160
161impl From<SubmitMode> for SubmitPlan {
162 fn from(value: SubmitMode) -> Self {
163 match value {
164 SubmitMode::RpcOnly => Self::rpc_only(),
165 SubmitMode::JitoOnly => Self::jito_only(),
166 SubmitMode::DirectOnly => Self::direct_only(),
167 SubmitMode::Hybrid => Self::hybrid(),
168 }
169 }
170}
171
/// Reliability tier used to pick a [`DirectSubmitConfig`] preset via
/// [`DirectSubmitConfig::from_reliability`].
///
/// In those presets the timeouts, attempt counts and intervals grow from `LowLatency` through
/// `Balanced` to `HighReliability`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
pub enum SubmitReliability {
    /// Preset with the shortest timeouts and fewest attempts.
    LowLatency,
    /// Middle preset; this is the default tier.
    #[default]
    Balanced,
    /// Preset with the longest timeouts and most attempts.
    HighReliability,
}
183
/// An already-signed transaction carried as raw bytes.
///
/// NOTE(review): the difference between the two encodings is decided by the code that consumes
/// this enum, which is outside this section; confirm there before relying on either variant.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum SignedTx {
    /// Bytes labelled as a versioned transaction (presumably its serialized form; confirm).
    VersionedTransactionBytes(Vec<u8>),
    /// Bytes labelled as wire-format transaction bytes (presumably sent as-is; confirm).
    WireTransactionBytes(Vec<u8>),
}
192
/// Options passed to [`RpcSubmitTransport::submit_rpc`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RpcSubmitConfig {
    /// Whether the RPC submit should skip preflight.
    pub skip_preflight: bool,
    /// Optional preflight commitment level, as a string.
    pub preflight_commitment: Option<String>,
}

impl Default for RpcSubmitConfig {
    /// Defaults to skipping preflight with no explicit preflight commitment.
    fn default() -> Self {
        let skip_preflight = true;
        let preflight_commitment = None;
        Self {
            skip_preflight,
            preflight_commitment,
        }
    }
}
210
/// Options passed to [`JitoSubmitTransport::submit_jito`].
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct JitoSubmitConfig {
    /// Defaults to `false` through the derived `Default`.
    ///
    /// NOTE(review): presumably restricts the submission to bundle handling only; the transport
    /// that interprets this flag is outside this section, so confirm there.
    pub bundle_only: bool,
}
220
/// Value returned by a successful [`JitoSubmitTransport::submit_jito`] call.
///
/// Both fields are optional and default to `None`.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct JitoSubmitResponse {
    /// Transaction signature reported by the transport, if any.
    pub transaction_signature: Option<String>,
    /// Bundle identifier reported by the transport, if any.
    pub bundle_id: Option<String>,
}
229
/// Tuning knobs handed to [`DirectSubmitTransport::submit_direct`].
///
/// Presets come from [`DirectSubmitConfig::from_reliability`]; [`DirectSubmitConfig::normalized`]
/// clamps the count fields to at least 1 and the two rebroadcast intervals to at least 1 ms.
///
/// NOTE(review): how each knob is consumed is decided by the direct transport and submit client,
/// which are outside this section. The field notes below are read off the names; confirm them.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DirectSubmitConfig {
    /// Timeout applied per target (presumably per send attempt to one target).
    pub per_target_timeout: Duration,
    /// Overall timeout for a direct submit (presumably across all targets and rounds).
    pub global_timeout: Duration,
    /// Number of target rounds; clamped to at least 1 by `normalized`.
    pub direct_target_rounds: usize,
    /// Number of direct submit attempts; clamped to at least 1 by `normalized`.
    pub direct_submit_attempts: usize,
    /// Number of direct attempts in hybrid operation; clamped to at least 1 by `normalized`.
    pub hybrid_direct_attempts: usize,
    /// Interval between rebroadcasts; clamped to at least 1 ms by `normalized`.
    pub rebroadcast_interval: Duration,
    /// Enables the Agave rebroadcast behaviour.
    pub agave_rebroadcast_enabled: bool,
    /// Window during which Agave rebroadcast applies.
    pub agave_rebroadcast_window: Duration,
    /// Interval between Agave rebroadcasts; clamped to at least 1 ms by `normalized`.
    pub agave_rebroadcast_interval: Duration,
    /// Enables an RPC broadcast in hybrid operation.
    pub hybrid_rpc_broadcast: bool,
    /// Enables latency-aware target selection.
    pub latency_aware_targeting: bool,
    /// Timeout for a latency probe.
    pub latency_probe_timeout: Duration,
    /// Optional port used for latency probes; every preset sets `Some(8899)`.
    pub latency_probe_port: Option<u16>,
    /// Upper bound on probed targets; clamped to at least 1 by `normalized`.
    pub latency_probe_max_targets: usize,
}
262
263impl DirectSubmitConfig {
264 #[must_use]
266 pub const fn from_reliability(reliability: SubmitReliability) -> Self {
267 match reliability {
268 SubmitReliability::LowLatency => Self {
269 per_target_timeout: Duration::from_millis(200),
270 global_timeout: Duration::from_millis(1_200),
271 direct_target_rounds: 3,
272 direct_submit_attempts: 3,
273 hybrid_direct_attempts: 2,
274 rebroadcast_interval: Duration::from_millis(90),
275 agave_rebroadcast_enabled: true,
276 agave_rebroadcast_window: Duration::from_secs(30),
277 agave_rebroadcast_interval: Duration::from_millis(700),
278 hybrid_rpc_broadcast: false,
279 latency_aware_targeting: true,
280 latency_probe_timeout: Duration::from_millis(80),
281 latency_probe_port: Some(8899),
282 latency_probe_max_targets: 128,
283 },
284 SubmitReliability::Balanced => Self {
285 per_target_timeout: Duration::from_millis(300),
286 global_timeout: Duration::from_millis(1_800),
287 direct_target_rounds: 4,
288 direct_submit_attempts: 4,
289 hybrid_direct_attempts: 3,
290 rebroadcast_interval: Duration::from_millis(110),
291 agave_rebroadcast_enabled: true,
292 agave_rebroadcast_window: Duration::from_secs(45),
293 agave_rebroadcast_interval: Duration::from_millis(800),
294 hybrid_rpc_broadcast: true,
295 latency_aware_targeting: true,
296 latency_probe_timeout: Duration::from_millis(120),
297 latency_probe_port: Some(8899),
298 latency_probe_max_targets: 128,
299 },
300 SubmitReliability::HighReliability => Self {
301 per_target_timeout: Duration::from_millis(450),
302 global_timeout: Duration::from_millis(3_200),
303 direct_target_rounds: 6,
304 direct_submit_attempts: 5,
305 hybrid_direct_attempts: 4,
306 rebroadcast_interval: Duration::from_millis(140),
307 agave_rebroadcast_enabled: true,
308 agave_rebroadcast_window: Duration::from_secs(70),
309 agave_rebroadcast_interval: Duration::from_millis(900),
310 hybrid_rpc_broadcast: true,
311 latency_aware_targeting: true,
312 latency_probe_timeout: Duration::from_millis(160),
313 latency_probe_port: Some(8899),
314 latency_probe_max_targets: 128,
315 },
316 }
317 }
318
319 #[must_use]
321 pub const fn normalized(self) -> Self {
322 let direct_target_rounds = if self.direct_target_rounds == 0 {
323 1
324 } else {
325 self.direct_target_rounds
326 };
327 let direct_submit_attempts = if self.direct_submit_attempts == 0 {
328 1
329 } else {
330 self.direct_submit_attempts
331 };
332 let hybrid_direct_attempts = if self.hybrid_direct_attempts == 0 {
333 1
334 } else {
335 self.hybrid_direct_attempts
336 };
337 let latency_probe_max_targets = if self.latency_probe_max_targets == 0 {
338 1
339 } else {
340 self.latency_probe_max_targets
341 };
342 let rebroadcast_interval = if self.rebroadcast_interval.is_zero() {
343 Duration::from_millis(1)
344 } else {
345 self.rebroadcast_interval
346 };
347 let agave_rebroadcast_interval = if self.agave_rebroadcast_interval.is_zero() {
348 Duration::from_millis(1)
349 } else {
350 self.agave_rebroadcast_interval
351 };
352 Self {
353 per_target_timeout: self.per_target_timeout,
354 global_timeout: self.global_timeout,
355 direct_target_rounds,
356 direct_submit_attempts,
357 hybrid_direct_attempts,
358 rebroadcast_interval,
359 agave_rebroadcast_enabled: self.agave_rebroadcast_enabled,
360 agave_rebroadcast_window: self.agave_rebroadcast_window,
361 agave_rebroadcast_interval,
362 hybrid_rpc_broadcast: self.hybrid_rpc_broadcast,
363 latency_aware_targeting: self.latency_aware_targeting,
364 latency_probe_timeout: self.latency_probe_timeout,
365 latency_probe_port: self.latency_probe_port,
366 latency_probe_max_targets,
367 }
368 }
369}
370
371impl Default for DirectSubmitConfig {
372 fn default() -> Self {
373 Self::from_reliability(SubmitReliability::default())
374 }
375}
376
/// Error returned by the submit transport traits in this file.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SubmitTransportError {
    /// The transport configuration is invalid.
    #[error("transport configuration invalid: {message}")]
    Config {
        /// Human-readable description of the configuration problem.
        message: String,
    },
    /// The transport failed while submitting.
    #[error("transport failure: {message}")]
    Failure {
        /// Human-readable description of the failure.
        message: String,
    },
}
393
/// Top-level error for a submit operation.
///
/// The three route variants (`Direct`, `Rpc`, `Jito`) wrap the [`SubmitTransportError`] produced
/// by the corresponding transport.
#[derive(Debug, Error)]
pub enum SubmitError {
    /// The blockhash provider had no recent blockhash to offer.
    #[error("blockhash provider returned no recent blockhash")]
    MissingRecentBlockhash,
    /// Signed transaction bytes could not be decoded.
    #[error("failed to decode signed transaction bytes: {source}")]
    DecodeSignedBytes {
        /// Underlying bincode decoding error.
        source: Box<bincode::ErrorKind>,
    },
    /// The signature was already seen within the dedupe window.
    #[error("duplicate signature suppressed by dedupe window")]
    DuplicateSignature,
    /// An RPC submit was needed but no RPC transport is configured.
    #[error("rpc transport is not configured")]
    MissingRpcTransport,
    /// A Jito submit was needed but no Jito transport is configured.
    #[error("jito transport is not configured")]
    MissingJitoTransport,
    /// A direct submit was needed but no direct transport is configured.
    #[error("direct transport is not configured")]
    MissingDirectTransport,
    /// No direct targets could be resolved from the leader and backups.
    #[error("no direct targets resolved from leader/backups")]
    NoDirectTargets,
    /// The direct route failed.
    #[error("direct submit failed: {source}")]
    Direct {
        /// Error reported by the direct transport.
        source: SubmitTransportError,
    },
    /// The RPC route failed.
    #[error("rpc submit failed: {source}")]
    Rpc {
        /// Error reported by the RPC transport.
        source: SubmitTransportError,
    },
    /// The Jito route failed.
    #[error("jito submit failed: {source}")]
    Jito {
        /// Error reported by the Jito transport.
        source: SubmitTransportError,
    },
    /// An internal synchronization primitive failed.
    #[error("internal synchronization failure: {message}")]
    InternalSync {
        /// Human-readable description of the failure.
        message: String,
    },
    /// The toxic-flow guard rejected the submission.
    #[error("submission rejected by toxic-flow guard: {reason}")]
    ToxicFlow {
        /// Why the guard rejected the submission.
        reason: TxToxicFlowRejectionReason,
    },
}
452
/// Summary of a submit operation.
///
/// NOTE(review): the code that fills this struct is outside this section. The field notes below
/// are read off names and types and should be confirmed against the submit client.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SubmitResult {
    /// Transaction signature, when known.
    pub signature: Option<SignatureBytes>,
    /// Plan associated with this submit.
    pub plan: SubmitPlan,
    /// Legacy mode associated with the plan, if any (see `SubmitPlan::legacy_mode`).
    pub legacy_mode: Option<SubmitMode>,
    /// Route recorded as the first success, if any.
    pub first_success_route: Option<SubmitRoute>,
    /// Routes recorded as successful.
    pub successful_routes: Vec<SubmitRoute>,
    /// Leader target associated with the direct route, if any (presumably the value returned by
    /// `DirectSubmitTransport::submit_direct`).
    pub direct_target: Option<LeaderTarget>,
    /// Signature string from the RPC route, if any.
    pub rpc_signature: Option<String>,
    /// Signature string from the Jito route, if any.
    pub jito_signature: Option<String>,
    /// Bundle id from the Jito route, if any.
    pub jito_bundle_id: Option<String>,
    /// Whether a fallback route was used.
    pub used_fallback_route: bool,
    /// Number of targets selected.
    pub selected_target_count: usize,
    /// Number of identities selected.
    pub selected_identity_count: usize,
}
488
/// Quality classification carried by a [`TxFlowSafetySnapshot`].
///
/// Only `Stable` is accepted by [`TxFlowSafetySnapshot::is_safe`]. What produces the other
/// classifications is decided by the snapshot source, which is outside this section.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxFlowSafetyQuality {
    /// The only quality treated as safe.
    Stable,
    /// Not treated as safe.
    Provisional,
    /// Not treated as safe.
    ReorgRisk,
    /// Not treated as safe.
    Stale,
    /// Not treated as safe.
    Degraded,
    /// Not treated as safe.
    IncompleteControlPlane,
}
505
/// Individual issue listed in [`TxFlowSafetySnapshot::issues`].
///
/// NOTE(review): the conditions that raise each issue live in the snapshot source, outside this
/// section; the variant names are the only description available here.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxFlowSafetyIssue {
    /// Replay recovery is pending.
    ReplayRecoveryPending,
    /// Control-plane data is missing.
    MissingControlPlane,
    /// Control-plane data is stale.
    StaleControlPlane,
    /// Control-plane data is degraded.
    DegradedControlPlane,
    /// There is reorg risk.
    ReorgRisk,
    /// State is provisional.
    Provisional,
}
522
/// Point-in-time safety report returned by [`TxFlowSafetySource::toxic_flow_snapshot`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxFlowSafetySnapshot {
    /// Overall quality classification.
    pub quality: TxFlowSafetyQuality,
    /// Issues reported alongside the classification.
    pub issues: Vec<TxFlowSafetyIssue>,
    /// Current state version, when the source knows one.
    pub current_state_version: Option<u64>,
    /// Whether replay recovery is still pending; when `true`, `is_safe` returns `false`.
    pub replay_recovery_pending: bool,
}
535
536impl TxFlowSafetySnapshot {
537 #[must_use]
539 pub const fn is_safe(&self) -> bool {
540 matches!(self.quality, TxFlowSafetyQuality::Stable) && !self.replay_recovery_pending
541 }
542}
543
/// Provider of [`TxFlowSafetySnapshot`]s.
///
/// Implementations must be `Send + Sync`.
pub trait TxFlowSafetySource: Send + Sync {
    /// Returns the current safety snapshot.
    fn toxic_flow_snapshot(&self) -> TxFlowSafetySnapshot;
}
549
/// Key under which a submission can be suppressed.
///
/// Used as the map key of `TxSuppressionCache`. `Hash` is implemented by hand for this type
/// while `Eq`/`PartialEq` are derived.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxSubmitSuppressionKey {
    /// Keyed by transaction signature.
    Signature(SignatureBytes),
    /// Keyed by a 32-byte opportunity identifier.
    Opportunity([u8; 32]),
    /// Keyed by a 32-byte account-set identifier.
    AccountSet([u8; 32]),
    /// Keyed by a slot/window pair.
    SlotWindow {
        /// Slot component of the key.
        slot: u64,
        /// Window component of the key.
        window: u64,
    },
}
567
568impl Hash for TxSubmitSuppressionKey {
569 fn hash<H: Hasher>(&self, state: &mut H) {
570 match self {
571 Self::Signature(signature) => {
572 0_u8.hash(state);
573 signature.as_array().hash(state);
574 }
575 Self::Opportunity(key) => {
576 1_u8.hash(state);
577 key.hash(state);
578 }
579 Self::AccountSet(key) => {
580 2_u8.hash(state);
581 key.hash(state);
582 }
583 Self::SlotWindow { slot, window } => {
584 3_u8.hash(state);
585 slot.hash(state);
586 window.hash(state);
587 }
588 }
589 }
590}
591
/// Per-submission context; every field defaults to empty/`None`.
///
/// NOTE(review): the guard that consumes this context is outside this section. The field notes
/// are read off the names and the matching `TxSubmitGuardPolicy` fields; confirm them.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxSubmitContext {
    /// Suppression keys associated with this submission.
    pub suppression_keys: Vec<TxSubmitSuppressionKey>,
    /// State version the submit decision was based on, if known.
    pub decision_state_version: Option<u64>,
    /// When the opportunity was created, if known.
    pub opportunity_created_at: Option<SystemTime>,
}
602
/// Policy knobs for the submit guard.
///
/// NOTE(review): enforcement happens outside this section; the field notes are read off the
/// names and the rejection reasons in `TxToxicFlowRejectionReason`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TxSubmitGuardPolicy {
    /// Require a stable control plane before submitting.
    pub require_stable_control_plane: bool,
    /// Reject while replay recovery is pending.
    pub reject_on_replay_recovery_pending: bool,
    /// Maximum tolerated state-version drift; `None` presumably disables the check.
    pub max_state_version_drift: Option<u64>,
    /// Maximum tolerated opportunity age; `None` presumably disables the check.
    pub max_opportunity_age: Option<Duration>,
    /// Time-to-live for suppression keys.
    pub suppression_ttl: Duration,
}
617
618impl Default for TxSubmitGuardPolicy {
619 fn default() -> Self {
620 Self {
621 require_stable_control_plane: true,
622 reject_on_replay_recovery_pending: true,
623 max_state_version_drift: Some(4),
624 max_opportunity_age: Some(Duration::from_millis(750)),
625 suppression_ttl: Duration::from_millis(750),
626 }
627 }
628}
629
/// Reason attached to `SubmitError::ToxicFlow` when the guard rejects a submission.
#[derive(Debug, Error, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxToxicFlowRejectionReason {
    /// The control-plane quality was not acceptable.
    #[error("control-plane quality {quality:?} is not safe for submit")]
    UnsafeControlPlane {
        /// Quality that triggered the rejection.
        quality: TxFlowSafetyQuality,
    },
    /// Replay recovery was still pending.
    #[error("submit source is still recovering replay continuity")]
    ReplayRecoveryPending,
    /// An active suppression key matched.
    #[error("submit suppressed by active key")]
    Suppressed,
    /// State version drift exceeded the allowed maximum.
    #[error("state version drift {drift} exceeded maximum {max_allowed}")]
    StateDrift {
        /// Drift that was measured.
        drift: u64,
        /// Maximum drift that was allowed.
        max_allowed: u64,
    },
    /// The opportunity was older than the allowed maximum.
    #[error("opportunity age {age_ms}ms exceeded maximum {max_allowed_ms}ms")]
    OpportunityStale {
        /// Opportunity age in milliseconds.
        age_ms: u64,
        /// Maximum allowed age in milliseconds.
        max_allowed_ms: u64,
    },
}
662
/// Classification of a [`TxSubmitOutcome`].
///
/// `TxToxicFlowTelemetry::record` keeps a counter for every kind except `Landed`, `Expired`,
/// `Dropped` and `UnhealthyRoute`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxSubmitOutcomeKind {
    /// Accepted on the direct route.
    DirectAccepted,
    /// Accepted on the RPC route.
    RpcAccepted,
    /// Accepted on the Jito route.
    JitoAccepted,
    /// The transaction landed.
    Landed,
    /// The transaction expired.
    Expired,
    /// The transaction was dropped.
    Dropped,
    /// The leader was missed.
    LeaderMissed,
    /// The blockhash was stale.
    BlockhashStale,
    /// The route was unhealthy.
    UnhealthyRoute,
    /// Rejected due to staleness.
    RejectedDueToStaleness,
    /// Rejected due to reorg risk.
    RejectedDueToReorgRisk,
    /// Rejected due to state drift.
    RejectedDueToStateDrift,
    /// Rejected because replay recovery was pending.
    RejectedDueToReplayRecovery,
    /// Suppressed by a suppression key.
    Suppressed,
}
695
/// One outcome event handed to a [`TxSubmitOutcomeReporter`].
///
/// NOTE(review): the producer of these events is outside this section; the field notes are read
/// off names and types.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxSubmitOutcome {
    /// What happened.
    pub kind: TxSubmitOutcomeKind,
    /// Transaction signature, when known.
    pub signature: Option<SignatureBytes>,
    /// Route the outcome relates to, if any.
    pub route: Option<SubmitRoute>,
    /// Plan associated with the submission.
    pub plan: SubmitPlan,
    /// Legacy mode associated with the plan, if any.
    pub legacy_mode: Option<SubmitMode>,
    /// Signature string from the RPC route, if any.
    pub rpc_signature: Option<String>,
    /// Signature string from the Jito route, if any.
    pub jito_signature: Option<String>,
    /// Bundle id from the Jito route, if any.
    pub jito_bundle_id: Option<String>,
    /// State version associated with the outcome, if known.
    pub state_version: Option<u64>,
    /// Opportunity age in milliseconds, if known; `TxToxicFlowTelemetry::record` stores it.
    pub opportunity_age_ms: Option<u64>,
}
724
/// Sink for [`TxSubmitOutcome`] events.
///
/// Implementations must be `Send + Sync`. `TxToxicFlowTelemetry` implements this trait by
/// counting outcomes.
pub trait TxSubmitOutcomeReporter: Send + Sync {
    /// Records one outcome.
    fn record_outcome(&self, outcome: &TxSubmitOutcome);
}
737
/// Plain-value copy of the counters held by `TxToxicFlowTelemetry`, produced by its `snapshot`
/// method.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxToxicFlowTelemetrySnapshot {
    /// Incremented by `record_reporter_drop`.
    pub reporter_outcomes_dropped: u64,
    /// Incremented by `record_reporter_unavailable`.
    pub reporter_outcomes_unavailable: u64,
    /// Count of `DirectAccepted` outcomes.
    pub direct_accepted: u64,
    /// Count of `RpcAccepted` outcomes.
    pub rpc_accepted: u64,
    /// Count of `JitoAccepted` outcomes.
    pub jito_accepted: u64,
    /// Count of `RejectedDueToStaleness` outcomes.
    pub rejected_due_to_staleness: u64,
    /// Count of `RejectedDueToReorgRisk` outcomes.
    pub rejected_due_to_reorg_risk: u64,
    /// Count of `RejectedDueToStateDrift` outcomes.
    pub rejected_due_to_state_drift: u64,
    /// Count of `BlockhashStale` outcomes.
    pub submit_on_stale_blockhash: u64,
    /// Count of `LeaderMissed` outcomes (a running total despite the `_rate` suffix).
    pub leader_route_miss_rate: u64,
    /// Opportunity age in milliseconds from the most recent outcome that carried one, or `None`
    /// when no age is available.
    pub opportunity_age_at_send_ms: Option<u64>,
    /// Count of `RejectedDueToReplayRecovery` outcomes.
    pub rejected_due_to_replay_recovery: u64,
    /// Count of `Suppressed` outcomes.
    pub suppressed_submissions: u64,
}
768
/// An `AtomicU64` forced to 64-byte alignment so each counter occupies its own 64-byte slot.
#[derive(Debug, Default)]
#[repr(align(64))]
struct CacheAlignedAtomicU64(AtomicU64);

impl CacheAlignedAtomicU64 {
    /// Reads the current value with the given memory ordering.
    fn load(&self, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.load(ordering)
    }

    /// Stores `value` and returns the value that was there before.
    fn swap(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.swap(value, ordering)
    }

    /// Adds `value` (wrapping on overflow) and returns the value from before the addition.
    fn fetch_add(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.fetch_add(value, ordering)
    }
}
790
/// Atomic counters for submit outcomes.
///
/// Each counter is a separate 64-byte-aligned atomic. All counters start at zero through the
/// derived `Default`; read them with `snapshot`.
#[derive(Debug, Default)]
pub struct TxToxicFlowTelemetry {
    /// Bumped by `record_reporter_drop`.
    reporter_outcomes_dropped: CacheAlignedAtomicU64,
    /// Bumped by `record_reporter_unavailable`.
    reporter_outcomes_unavailable: CacheAlignedAtomicU64,
    /// Count of `DirectAccepted` outcomes.
    direct_accepted: CacheAlignedAtomicU64,
    /// Count of `RpcAccepted` outcomes.
    rpc_accepted: CacheAlignedAtomicU64,
    /// Count of `JitoAccepted` outcomes.
    jito_accepted: CacheAlignedAtomicU64,
    /// Count of `RejectedDueToStaleness` outcomes.
    rejected_due_to_staleness: CacheAlignedAtomicU64,
    /// Count of `RejectedDueToReorgRisk` outcomes.
    rejected_due_to_reorg_risk: CacheAlignedAtomicU64,
    /// Count of `RejectedDueToStateDrift` outcomes.
    rejected_due_to_state_drift: CacheAlignedAtomicU64,
    /// Count of `BlockhashStale` outcomes.
    submit_on_stale_blockhash: CacheAlignedAtomicU64,
    /// Count of `LeaderMissed` outcomes.
    leader_route_miss_rate: CacheAlignedAtomicU64,
    /// Backing slot for the last opportunity age seen by `record`; zero means none recorded.
    opportunity_age_at_send_ms: CacheAlignedAtomicU64,
    /// Count of `RejectedDueToReplayRecovery` outcomes.
    rejected_due_to_replay_recovery: CacheAlignedAtomicU64,
    /// Count of `Suppressed` outcomes.
    suppressed_submissions: CacheAlignedAtomicU64,
}
821
822impl TxToxicFlowTelemetry {
823 #[must_use]
825 pub fn shared() -> Arc<Self> {
826 Arc::new(Self::default())
827 }
828
829 pub fn record(&self, outcome: &TxSubmitOutcome) {
831 if let Some(age_ms) = outcome.opportunity_age_ms {
832 let _ = self
833 .opportunity_age_at_send_ms
834 .swap(age_ms, Ordering::Relaxed);
835 }
836 match outcome.kind {
837 TxSubmitOutcomeKind::DirectAccepted => {
838 let _ = self.direct_accepted.fetch_add(1, Ordering::Relaxed);
839 }
840 TxSubmitOutcomeKind::RpcAccepted => {
841 let _ = self.rpc_accepted.fetch_add(1, Ordering::Relaxed);
842 }
843 TxSubmitOutcomeKind::JitoAccepted => {
844 let _ = self.jito_accepted.fetch_add(1, Ordering::Relaxed);
845 }
846 TxSubmitOutcomeKind::RejectedDueToStaleness => {
847 let _ = self
848 .rejected_due_to_staleness
849 .fetch_add(1, Ordering::Relaxed);
850 }
851 TxSubmitOutcomeKind::RejectedDueToReorgRisk => {
852 let _ = self
853 .rejected_due_to_reorg_risk
854 .fetch_add(1, Ordering::Relaxed);
855 }
856 TxSubmitOutcomeKind::RejectedDueToStateDrift => {
857 let _ = self
858 .rejected_due_to_state_drift
859 .fetch_add(1, Ordering::Relaxed);
860 }
861 TxSubmitOutcomeKind::RejectedDueToReplayRecovery => {
862 let _ = self
863 .rejected_due_to_replay_recovery
864 .fetch_add(1, Ordering::Relaxed);
865 }
866 TxSubmitOutcomeKind::Suppressed => {
867 let _ = self.suppressed_submissions.fetch_add(1, Ordering::Relaxed);
868 }
869 TxSubmitOutcomeKind::LeaderMissed => {
870 let _ = self.leader_route_miss_rate.fetch_add(1, Ordering::Relaxed);
871 }
872 TxSubmitOutcomeKind::Landed
873 | TxSubmitOutcomeKind::Expired
874 | TxSubmitOutcomeKind::Dropped
875 | TxSubmitOutcomeKind::UnhealthyRoute => {}
876 TxSubmitOutcomeKind::BlockhashStale => {
877 let _ = self
878 .submit_on_stale_blockhash
879 .fetch_add(1, Ordering::Relaxed);
880 }
881 }
882 }
883
884 pub(crate) fn record_reporter_drop(&self) {
886 let _ = self
887 .reporter_outcomes_dropped
888 .fetch_add(1, Ordering::Relaxed);
889 }
890
891 pub(crate) fn record_reporter_unavailable(&self) {
893 let _ = self
894 .reporter_outcomes_unavailable
895 .fetch_add(1, Ordering::Relaxed);
896 }
897
898 #[must_use]
900 pub fn snapshot(&self) -> TxToxicFlowTelemetrySnapshot {
901 let age_ms = self.opportunity_age_at_send_ms.load(Ordering::Relaxed);
902 TxToxicFlowTelemetrySnapshot {
903 reporter_outcomes_dropped: self.reporter_outcomes_dropped.load(Ordering::Relaxed),
904 reporter_outcomes_unavailable: self
905 .reporter_outcomes_unavailable
906 .load(Ordering::Relaxed),
907 direct_accepted: self.direct_accepted.load(Ordering::Relaxed),
908 rpc_accepted: self.rpc_accepted.load(Ordering::Relaxed),
909 jito_accepted: self.jito_accepted.load(Ordering::Relaxed),
910 rejected_due_to_staleness: self.rejected_due_to_staleness.load(Ordering::Relaxed),
911 rejected_due_to_reorg_risk: self.rejected_due_to_reorg_risk.load(Ordering::Relaxed),
912 rejected_due_to_state_drift: self.rejected_due_to_state_drift.load(Ordering::Relaxed),
913 submit_on_stale_blockhash: self.submit_on_stale_blockhash.load(Ordering::Relaxed),
914 leader_route_miss_rate: self.leader_route_miss_rate.load(Ordering::Relaxed),
915 opportunity_age_at_send_ms: if age_ms == 0 { None } else { Some(age_ms) },
916 rejected_due_to_replay_recovery: self
917 .rejected_due_to_replay_recovery
918 .load(Ordering::Relaxed),
919 suppressed_submissions: self.suppressed_submissions.load(Ordering::Relaxed),
920 }
921 }
922}
923
924impl TxSubmitOutcomeReporter for TxToxicFlowTelemetry {
925 fn record_outcome(&self, outcome: &TxSubmitOutcome) {
926 self.record(outcome);
927 }
928}
929
/// Map from suppression key to the time it was inserted.
///
/// Entries older than the caller-supplied TTL are evicted whenever `is_suppressed` is called.
#[derive(Debug, Default)]
pub(crate) struct TxSuppressionCache {
    /// Insertion time of every currently tracked key.
    entries: HashMap<TxSubmitSuppressionKey, SystemTime>,
}
936
937impl TxSuppressionCache {
938 pub(crate) fn is_suppressed(
940 &mut self,
941 keys: &[TxSubmitSuppressionKey],
942 now: SystemTime,
943 ttl: Duration,
944 ) -> bool {
945 self.evict_expired(now, ttl);
946 keys.iter().any(|key| self.entries.contains_key(key))
947 }
948
949 pub(crate) fn insert_all(&mut self, keys: &[TxSubmitSuppressionKey], now: SystemTime) {
951 for key in keys {
952 let _ = self.entries.insert(key.clone(), now);
953 }
954 }
955
956 fn evict_expired(&mut self, now: SystemTime, ttl: Duration) {
958 self.entries.retain(|_, inserted_at| {
959 now.duration_since(*inserted_at)
960 .map(|elapsed| elapsed <= ttl)
961 .unwrap_or(false)
962 });
963 }
964}
965
/// Transport for the RPC route.
///
/// Implementations must be `Send + Sync`.
#[async_trait]
pub trait RpcSubmitTransport: Send + Sync {
    /// Submits `tx_bytes` using `config`.
    ///
    /// Returns a `String` on success (stored as `rpc_signature` in results, so presumably the
    /// transaction signature; confirm against the implementation).
    ///
    /// # Errors
    ///
    /// Returns a [`SubmitTransportError`] when the transport cannot complete the submit.
    async fn submit_rpc(
        &self,
        tx_bytes: &[u8],
        config: &RpcSubmitConfig,
    ) -> Result<String, SubmitTransportError>;
}
976
/// Transport for the Jito route.
///
/// Implementations must be `Send + Sync`.
#[async_trait]
pub trait JitoSubmitTransport: Send + Sync {
    /// Submits `tx_bytes` using `config`.
    ///
    /// Returns a [`JitoSubmitResponse`] with an optional transaction signature and an optional
    /// bundle id.
    ///
    /// # Errors
    ///
    /// Returns a [`SubmitTransportError`] when the transport cannot complete the submit.
    async fn submit_jito(
        &self,
        tx_bytes: &[u8],
        config: &JitoSubmitConfig,
    ) -> Result<JitoSubmitResponse, SubmitTransportError>;
}
987
/// Transport for the direct route.
///
/// Implementations must be `Send + Sync`.
#[async_trait]
pub trait DirectSubmitTransport: Send + Sync {
    /// Submits `tx_bytes` to `targets` under the given routing `policy` and `config`.
    ///
    /// Returns one [`LeaderTarget`] on success (presumably the target that accepted the
    /// transaction; confirm against the implementation).
    ///
    /// # Errors
    ///
    /// Returns a [`SubmitTransportError`] when the transport cannot complete the submit.
    async fn submit_direct(
        &self,
        tx_bytes: &[u8],
        targets: &[LeaderTarget],
        policy: RoutingPolicy,
        config: &DirectSubmitConfig,
    ) -> Result<LeaderTarget, SubmitTransportError>;
}