1use std::{
4 collections::{HashMap, VecDeque},
5 hash::{Hash, Hasher},
6 sync::{
7 Arc,
8 atomic::{AtomicU64, Ordering},
9 },
10 time::{Duration, SystemTime},
11};
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use sof_types::SignatureBytes;
16use thiserror::Error;
17
18use crate::{providers::LeaderTarget, routing::RoutingPolicy};
19
20#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
22pub enum SubmitMode {
23 RpcOnly,
25 JitoOnly,
27 DirectOnly,
29 Hybrid,
31}
32
33#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)]
35pub enum SubmitRoute {
36 Rpc,
38 Jito,
40 Direct,
42}
43
44#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
46pub enum SubmitStrategy {
47 #[default]
49 OrderedFallback,
50 AllAtOnce,
52}
53
54#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
56pub struct SubmitPlan {
57 pub routes: Vec<SubmitRoute>,
59 pub strategy: SubmitStrategy,
61}
62
63impl SubmitPlan {
64 #[must_use]
66 pub fn new(routes: Vec<SubmitRoute>, strategy: SubmitStrategy) -> Self {
67 Self { routes, strategy }.into_normalized()
68 }
69
70 #[must_use]
72 pub fn into_normalized(mut self) -> Self {
73 let mut seen_rpc = false;
74 let mut seen_jito = false;
75 let mut seen_direct = false;
76 self.routes.retain(|route| match route {
77 SubmitRoute::Rpc if seen_rpc => false,
78 SubmitRoute::Rpc => {
79 seen_rpc = true;
80 true
81 }
82 SubmitRoute::Jito if seen_jito => false,
83 SubmitRoute::Jito => {
84 seen_jito = true;
85 true
86 }
87 SubmitRoute::Direct if seen_direct => false,
88 SubmitRoute::Direct => {
89 seen_direct = true;
90 true
91 }
92 });
93 self
94 }
95
96 #[must_use]
98 pub fn normalized(&self) -> Self {
99 self.clone().into_normalized()
100 }
101
102 #[must_use]
104 pub fn rpc_only() -> Self {
105 Self::new(vec![SubmitRoute::Rpc], SubmitStrategy::OrderedFallback)
106 }
107
108 #[must_use]
110 pub fn jito_only() -> Self {
111 Self::new(vec![SubmitRoute::Jito], SubmitStrategy::OrderedFallback)
112 }
113
114 #[must_use]
116 pub fn direct_only() -> Self {
117 Self::new(vec![SubmitRoute::Direct], SubmitStrategy::OrderedFallback)
118 }
119
120 #[must_use]
122 pub fn ordered(routes: Vec<SubmitRoute>) -> Self {
123 Self::new(routes, SubmitStrategy::OrderedFallback)
124 }
125
126 #[must_use]
128 pub fn hybrid() -> Self {
129 Self::ordered(vec![SubmitRoute::Direct, SubmitRoute::Rpc])
130 }
131
132 #[must_use]
134 pub fn all_at_once(routes: Vec<SubmitRoute>) -> Self {
135 Self::new(routes, SubmitStrategy::AllAtOnce)
136 }
137
138 #[must_use]
140 pub fn legacy_mode(&self) -> Option<SubmitMode> {
141 match (self.strategy, self.routes.as_slice()) {
142 (SubmitStrategy::OrderedFallback, [SubmitRoute::Rpc]) => Some(SubmitMode::RpcOnly),
143 (SubmitStrategy::OrderedFallback, [SubmitRoute::Jito]) => Some(SubmitMode::JitoOnly),
144 (SubmitStrategy::OrderedFallback, [SubmitRoute::Direct]) => {
145 Some(SubmitMode::DirectOnly)
146 }
147 (SubmitStrategy::OrderedFallback, [SubmitRoute::Direct, SubmitRoute::Rpc]) => {
148 Some(SubmitMode::Hybrid)
149 }
150 _ => None,
151 }
152 }
153}
154
155impl Default for SubmitPlan {
156 fn default() -> Self {
157 Self::rpc_only()
158 }
159}
160
161impl From<SubmitMode> for SubmitPlan {
162 fn from(value: SubmitMode) -> Self {
163 match value {
164 SubmitMode::RpcOnly => Self::rpc_only(),
165 SubmitMode::JitoOnly => Self::jito_only(),
166 SubmitMode::DirectOnly => Self::direct_only(),
167 SubmitMode::Hybrid => Self::hybrid(),
168 }
169 }
170}
171
172#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
174pub enum SubmitReliability {
175 LowLatency,
177 #[default]
179 Balanced,
180 HighReliability,
182}
183
/// Signed transaction payload, tagged by how the bytes were produced.
///
/// NOTE(review): the exact encoding expected for each variant is defined by the
/// consumers of this type, which are outside this file.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum SignedTx {
    /// Bytes of a versioned transaction.
    VersionedTransactionBytes(Vec<u8>),
    /// Bytes of a wire-format transaction.
    WireTransactionBytes(Vec<u8>),
}
192
/// Options passed to an RPC submit transport.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RpcSubmitConfig {
    /// Whether preflight should be skipped.
    pub skip_preflight: bool,
    /// Optional preflight commitment level, passed through as a string.
    pub preflight_commitment: Option<String>,
}

/// Defaults to skipping preflight with no explicit commitment.
impl Default for RpcSubmitConfig {
    fn default() -> Self {
        let preflight_commitment = None;
        Self {
            skip_preflight: true,
            preflight_commitment,
        }
    }
}
210
/// Options passed to a Jito submit transport.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct JitoSubmitConfig {
    /// Bundle-only flag forwarded to the transport; `false` by default.
    ///
    /// NOTE(review): the precise effect is defined by the transport
    /// implementation, which is not part of this file.
    pub bundle_only: bool,
}
220
/// Identifiers returned by a Jito submit transport.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct JitoSubmitResponse {
    /// Transaction signature reported by the transport, if any.
    pub transaction_signature: Option<String>,
    /// Bundle identifier reported by the transport, if any.
    pub bundle_id: Option<String>,
}
229
/// Tuning knobs for direct submission.
///
/// NOTE(review): these values are interpreted by the direct transport and
/// submit client, which are outside this file; the per-field notes restate the
/// field names rather than describe verified behavior.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DirectSubmitConfig {
    /// Timeout for a single target.
    pub per_target_timeout: Duration,
    /// Overall timeout for the direct submit.
    pub global_timeout: Duration,
    /// Number of direct target rounds.
    pub direct_target_rounds: usize,
    /// Number of direct submit attempts.
    pub direct_submit_attempts: usize,
    /// Number of direct attempts in hybrid mode.
    pub hybrid_direct_attempts: usize,
    /// Interval between rebroadcasts.
    pub rebroadcast_interval: Duration,
    /// Whether Agave rebroadcast is enabled.
    pub agave_rebroadcast_enabled: bool,
    /// Window over which Agave rebroadcast applies.
    pub agave_rebroadcast_window: Duration,
    /// Interval between Agave rebroadcasts.
    pub agave_rebroadcast_interval: Duration,
    /// Whether hybrid mode also broadcasts over RPC.
    pub hybrid_rpc_broadcast: bool,
    /// Whether target selection is latency-aware.
    pub latency_aware_targeting: bool,
    /// Timeout for a latency probe.
    pub latency_probe_timeout: Duration,
    /// Optional port used for latency probes.
    pub latency_probe_port: Option<u16>,
    /// Maximum number of targets to latency-probe.
    pub latency_probe_max_targets: usize,
}
262
263impl DirectSubmitConfig {
264 #[must_use]
266 pub const fn from_reliability(reliability: SubmitReliability) -> Self {
267 match reliability {
268 SubmitReliability::LowLatency => Self {
269 per_target_timeout: Duration::from_millis(200),
270 global_timeout: Duration::from_millis(1_200),
271 direct_target_rounds: 3,
272 direct_submit_attempts: 3,
273 hybrid_direct_attempts: 2,
274 rebroadcast_interval: Duration::from_millis(90),
275 agave_rebroadcast_enabled: true,
276 agave_rebroadcast_window: Duration::from_secs(30),
277 agave_rebroadcast_interval: Duration::from_millis(700),
278 hybrid_rpc_broadcast: false,
279 latency_aware_targeting: true,
280 latency_probe_timeout: Duration::from_millis(80),
281 latency_probe_port: Some(8899),
282 latency_probe_max_targets: 128,
283 },
284 SubmitReliability::Balanced => Self {
285 per_target_timeout: Duration::from_millis(300),
286 global_timeout: Duration::from_millis(1_800),
287 direct_target_rounds: 4,
288 direct_submit_attempts: 4,
289 hybrid_direct_attempts: 3,
290 rebroadcast_interval: Duration::from_millis(110),
291 agave_rebroadcast_enabled: true,
292 agave_rebroadcast_window: Duration::from_secs(45),
293 agave_rebroadcast_interval: Duration::from_millis(800),
294 hybrid_rpc_broadcast: true,
295 latency_aware_targeting: true,
296 latency_probe_timeout: Duration::from_millis(120),
297 latency_probe_port: Some(8899),
298 latency_probe_max_targets: 128,
299 },
300 SubmitReliability::HighReliability => Self {
301 per_target_timeout: Duration::from_millis(450),
302 global_timeout: Duration::from_millis(3_200),
303 direct_target_rounds: 6,
304 direct_submit_attempts: 5,
305 hybrid_direct_attempts: 4,
306 rebroadcast_interval: Duration::from_millis(140),
307 agave_rebroadcast_enabled: true,
308 agave_rebroadcast_window: Duration::from_secs(70),
309 agave_rebroadcast_interval: Duration::from_millis(900),
310 hybrid_rpc_broadcast: true,
311 latency_aware_targeting: true,
312 latency_probe_timeout: Duration::from_millis(160),
313 latency_probe_port: Some(8899),
314 latency_probe_max_targets: 128,
315 },
316 }
317 }
318
319 #[must_use]
321 pub const fn normalized(self) -> Self {
322 let per_target_timeout = if self.per_target_timeout.is_zero() {
323 Duration::from_millis(1)
324 } else {
325 self.per_target_timeout
326 };
327 let global_timeout = if self.global_timeout.is_zero() {
328 Duration::from_millis(1)
329 } else {
330 self.global_timeout
331 };
332 let direct_target_rounds = if self.direct_target_rounds == 0 {
333 1
334 } else {
335 self.direct_target_rounds
336 };
337 let direct_submit_attempts = if self.direct_submit_attempts == 0 {
338 1
339 } else {
340 self.direct_submit_attempts
341 };
342 let hybrid_direct_attempts = if self.hybrid_direct_attempts == 0 {
343 1
344 } else {
345 self.hybrid_direct_attempts
346 };
347 let latency_probe_max_targets = if self.latency_probe_max_targets == 0 {
348 1
349 } else {
350 self.latency_probe_max_targets
351 };
352 let rebroadcast_interval = if self.rebroadcast_interval.is_zero() {
353 Duration::from_millis(1)
354 } else {
355 self.rebroadcast_interval
356 };
357 let agave_rebroadcast_interval = if self.agave_rebroadcast_interval.is_zero() {
358 Duration::from_millis(1)
359 } else {
360 self.agave_rebroadcast_interval
361 };
362 let latency_probe_timeout = if self.latency_probe_timeout.is_zero() {
363 Duration::from_millis(1)
364 } else {
365 self.latency_probe_timeout
366 };
367 Self {
368 per_target_timeout,
369 global_timeout,
370 direct_target_rounds,
371 direct_submit_attempts,
372 hybrid_direct_attempts,
373 rebroadcast_interval,
374 agave_rebroadcast_enabled: self.agave_rebroadcast_enabled,
375 agave_rebroadcast_window: self.agave_rebroadcast_window,
376 agave_rebroadcast_interval,
377 hybrid_rpc_broadcast: self.hybrid_rpc_broadcast,
378 latency_aware_targeting: self.latency_aware_targeting,
379 latency_probe_timeout,
380 latency_probe_port: self.latency_probe_port,
381 latency_probe_max_targets,
382 }
383 }
384}
385
386impl Default for DirectSubmitConfig {
387 fn default() -> Self {
388 Self::from_reliability(SubmitReliability::default())
389 }
390}
391
392#[derive(Debug, Error, Clone, Eq, PartialEq)]
394pub enum SubmitTransportError {
395 #[error("transport configuration invalid: {message}")]
397 Config {
398 message: String,
400 },
401 #[error("transport failure: {message}")]
403 Failure {
404 message: String,
406 },
407}
408
409#[derive(Debug, Error)]
411pub enum SubmitError {
412 #[error("blockhash provider returned no recent blockhash")]
414 MissingRecentBlockhash,
415 #[error("failed to decode signed transaction bytes: {source}")]
417 DecodeSignedBytes {
418 source: Box<bincode::ErrorKind>,
420 },
421 #[error("duplicate signature suppressed by dedupe window")]
423 DuplicateSignature,
424 #[error("rpc transport is not configured")]
426 MissingRpcTransport,
427 #[error("jito transport is not configured")]
429 MissingJitoTransport,
430 #[error("direct transport is not configured")]
432 MissingDirectTransport,
433 #[error("no direct targets resolved from leader/backups")]
435 NoDirectTargets,
436 #[error("direct submit failed: {source}")]
438 Direct {
439 source: SubmitTransportError,
441 },
442 #[error("rpc submit failed: {source}")]
444 Rpc {
445 source: SubmitTransportError,
447 },
448 #[error("jito submit failed: {source}")]
450 Jito {
451 source: SubmitTransportError,
453 },
454 #[error("internal synchronization failure: {message}")]
456 InternalSync {
457 message: String,
459 },
460 #[error("submission rejected by toxic-flow guard: {reason}")]
462 ToxicFlow {
463 reason: TxToxicFlowRejectionReason,
465 },
466}
467
468#[derive(Debug, Clone, Eq, PartialEq)]
470pub struct SubmitResult {
471 pub signature: Option<SignatureBytes>,
473 pub plan: SubmitPlan,
475 pub legacy_mode: Option<SubmitMode>,
477 pub first_success_route: Option<SubmitRoute>,
479 pub successful_routes: Vec<SubmitRoute>,
487 pub direct_target: Option<LeaderTarget>,
489 pub rpc_signature: Option<String>,
491 pub jito_signature: Option<String>,
494 pub jito_bundle_id: Option<String>,
496 pub used_fallback_route: bool,
498 pub selected_target_count: usize,
500 pub selected_identity_count: usize,
502}
503
504#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
506pub enum TxFlowSafetyQuality {
507 Stable,
509 Provisional,
511 ReorgRisk,
513 Stale,
515 Degraded,
517 IncompleteControlPlane,
519}
520
521#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
523pub enum TxFlowSafetyIssue {
524 ReplayRecoveryPending,
526 MissingControlPlane,
528 StaleControlPlane,
530 DegradedControlPlane,
532 ReorgRisk,
534 Provisional,
536}
537
538#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
540pub struct TxFlowSafetySnapshot {
541 pub quality: TxFlowSafetyQuality,
543 pub issues: Vec<TxFlowSafetyIssue>,
545 pub current_state_version: Option<u64>,
547 pub replay_recovery_pending: bool,
549}
550
551impl TxFlowSafetySnapshot {
552 #[must_use]
554 pub const fn is_safe(&self) -> bool {
555 matches!(self.quality, TxFlowSafetyQuality::Stable) && !self.replay_recovery_pending
556 }
557}
558
559pub trait TxFlowSafetySource: Send + Sync {
561 fn toxic_flow_snapshot(&self) -> TxFlowSafetySnapshot;
563}
564
565#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
567pub enum TxSubmitSuppressionKey {
568 Signature(SignatureBytes),
570 Opportunity([u8; 32]),
572 AccountSet([u8; 32]),
574 SlotWindow {
576 slot: u64,
578 window: u64,
580 },
581}
582
583impl Hash for TxSubmitSuppressionKey {
584 fn hash<H: Hasher>(&self, state: &mut H) {
585 match self {
586 Self::Signature(signature) => {
587 0_u8.hash(state);
588 signature.as_array().hash(state);
589 }
590 Self::Opportunity(key) => {
591 1_u8.hash(state);
592 key.hash(state);
593 }
594 Self::AccountSet(key) => {
595 2_u8.hash(state);
596 key.hash(state);
597 }
598 Self::SlotWindow { slot, window } => {
599 3_u8.hash(state);
600 slot.hash(state);
601 window.hash(state);
602 }
603 }
604 }
605}
606
607#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
609pub struct TxSubmitContext {
610 pub suppression_keys: Vec<TxSubmitSuppressionKey>,
612 pub decision_state_version: Option<u64>,
614 pub opportunity_created_at: Option<SystemTime>,
616}
617
/// Policy knobs for the submit guard.
///
/// NOTE(review): the guard that enforces these lives outside this file; the
/// field notes restate the field names.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TxSubmitGuardPolicy {
    /// Whether a stable control plane is required.
    pub require_stable_control_plane: bool,
    /// Whether to reject while replay recovery is pending.
    pub reject_on_replay_recovery_pending: bool,
    /// Maximum tolerated state-version drift; `None` means no limit is set.
    pub max_state_version_drift: Option<u64>,
    /// Maximum tolerated opportunity age; `None` means no limit is set.
    pub max_opportunity_age: Option<Duration>,
    /// Time-to-live for suppression keys.
    pub suppression_ttl: Duration,
}

/// Defaults: both rejections enabled, drift limit 4, age limit and suppression
/// TTL of 750 ms each.
impl Default for TxSubmitGuardPolicy {
    fn default() -> Self {
        let window = Duration::from_millis(750);
        Self {
            require_stable_control_plane: true,
            reject_on_replay_recovery_pending: true,
            max_state_version_drift: Some(4),
            max_opportunity_age: Some(window),
            suppression_ttl: window,
        }
    }
}
644
645#[derive(Debug, Error, Clone, Eq, PartialEq, Serialize, Deserialize)]
647pub enum TxToxicFlowRejectionReason {
648 #[error("control-plane quality {quality:?} is not safe for submit")]
650 UnsafeControlPlane {
651 quality: TxFlowSafetyQuality,
653 },
654 #[error("submit source is still recovering replay continuity")]
656 ReplayRecoveryPending,
657 #[error("submit suppressed by active key")]
659 Suppressed,
660 #[error("state version drift {drift} exceeded maximum {max_allowed}")]
662 StateDrift {
663 drift: u64,
665 max_allowed: u64,
667 },
668 #[error("opportunity age {age_ms}ms exceeded maximum {max_allowed_ms}ms")]
670 OpportunityStale {
671 age_ms: u64,
673 max_allowed_ms: u64,
675 },
676}
677
678#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
680pub enum TxSubmitOutcomeKind {
681 DirectAccepted,
683 RpcAccepted,
685 JitoAccepted,
687 Landed,
689 Expired,
691 Dropped,
693 LeaderMissed,
695 BlockhashStale,
697 UnhealthyRoute,
699 RejectedDueToStaleness,
701 RejectedDueToReorgRisk,
703 RejectedDueToStateDrift,
705 RejectedDueToReplayRecovery,
707 Suppressed,
709}
710
711#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
717pub struct TxSubmitOutcome {
718 pub kind: TxSubmitOutcomeKind,
720 pub signature: Option<SignatureBytes>,
722 pub route: Option<SubmitRoute>,
724 pub plan: SubmitPlan,
726 pub legacy_mode: Option<SubmitMode>,
728 pub rpc_signature: Option<String>,
730 pub jito_signature: Option<String>,
732 pub jito_bundle_id: Option<String>,
734 pub state_version: Option<u64>,
736 pub opportunity_age_ms: Option<u64>,
738}
739
740pub trait TxSubmitOutcomeReporter: Send + Sync {
749 fn record_outcome(&self, outcome: &TxSubmitOutcome);
751}
752
753#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
755pub struct TxToxicFlowTelemetrySnapshot {
756 pub reporter_outcomes_dropped: u64,
758 pub reporter_outcomes_unavailable: u64,
760 pub direct_accepted: u64,
762 pub rpc_accepted: u64,
764 pub jito_accepted: u64,
766 pub rejected_due_to_staleness: u64,
768 pub rejected_due_to_reorg_risk: u64,
770 pub rejected_due_to_state_drift: u64,
772 pub submit_on_stale_blockhash: u64,
774 pub leader_route_miss_rate: u64,
776 pub opportunity_age_at_send_ms: Option<u64>,
778 pub rejected_due_to_replay_recovery: u64,
780 pub suppressed_submissions: u64,
782}
783
/// `AtomicU64` wrapper aligned to 64 bytes.
///
/// NOTE(review): presumably meant to give each counter its own cache line and
/// avoid false sharing; the 64-byte line size is an assumption.
#[derive(Debug, Default)]
#[repr(align(64))]
struct CacheAlignedAtomicU64(AtomicU64);

impl CacheAlignedAtomicU64 {
    /// Loads the current value.
    fn load(&self, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.load(ordering)
    }

    /// Stores `value` and returns the previous value.
    fn swap(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.swap(value, ordering)
    }

    /// Adds `value` and returns the previous value.
    fn fetch_add(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.fetch_add(value, ordering)
    }
}
805
806#[derive(Debug, Default)]
808pub struct TxToxicFlowTelemetry {
809 reporter_outcomes_dropped: CacheAlignedAtomicU64,
811 reporter_outcomes_unavailable: CacheAlignedAtomicU64,
813 direct_accepted: CacheAlignedAtomicU64,
815 rpc_accepted: CacheAlignedAtomicU64,
817 jito_accepted: CacheAlignedAtomicU64,
819 rejected_due_to_staleness: CacheAlignedAtomicU64,
821 rejected_due_to_reorg_risk: CacheAlignedAtomicU64,
823 rejected_due_to_state_drift: CacheAlignedAtomicU64,
825 submit_on_stale_blockhash: CacheAlignedAtomicU64,
827 leader_route_miss_rate: CacheAlignedAtomicU64,
829 opportunity_age_at_send_ms: CacheAlignedAtomicU64,
831 rejected_due_to_replay_recovery: CacheAlignedAtomicU64,
833 suppressed_submissions: CacheAlignedAtomicU64,
835}
836
837impl TxToxicFlowTelemetry {
838 #[must_use]
840 pub fn shared() -> Arc<Self> {
841 Arc::new(Self::default())
842 }
843
844 pub fn record(&self, outcome: &TxSubmitOutcome) {
846 if let Some(age_ms) = outcome.opportunity_age_ms {
847 let _ = self
848 .opportunity_age_at_send_ms
849 .swap(age_ms, Ordering::Relaxed);
850 }
851 match outcome.kind {
852 TxSubmitOutcomeKind::DirectAccepted => {
853 let _ = self.direct_accepted.fetch_add(1, Ordering::Relaxed);
854 }
855 TxSubmitOutcomeKind::RpcAccepted => {
856 let _ = self.rpc_accepted.fetch_add(1, Ordering::Relaxed);
857 }
858 TxSubmitOutcomeKind::JitoAccepted => {
859 let _ = self.jito_accepted.fetch_add(1, Ordering::Relaxed);
860 }
861 TxSubmitOutcomeKind::RejectedDueToStaleness => {
862 let _ = self
863 .rejected_due_to_staleness
864 .fetch_add(1, Ordering::Relaxed);
865 }
866 TxSubmitOutcomeKind::RejectedDueToReorgRisk => {
867 let _ = self
868 .rejected_due_to_reorg_risk
869 .fetch_add(1, Ordering::Relaxed);
870 }
871 TxSubmitOutcomeKind::RejectedDueToStateDrift => {
872 let _ = self
873 .rejected_due_to_state_drift
874 .fetch_add(1, Ordering::Relaxed);
875 }
876 TxSubmitOutcomeKind::RejectedDueToReplayRecovery => {
877 let _ = self
878 .rejected_due_to_replay_recovery
879 .fetch_add(1, Ordering::Relaxed);
880 }
881 TxSubmitOutcomeKind::Suppressed => {
882 let _ = self.suppressed_submissions.fetch_add(1, Ordering::Relaxed);
883 }
884 TxSubmitOutcomeKind::LeaderMissed => {
885 let _ = self.leader_route_miss_rate.fetch_add(1, Ordering::Relaxed);
886 }
887 TxSubmitOutcomeKind::Landed
888 | TxSubmitOutcomeKind::Expired
889 | TxSubmitOutcomeKind::Dropped
890 | TxSubmitOutcomeKind::UnhealthyRoute => {}
891 TxSubmitOutcomeKind::BlockhashStale => {
892 let _ = self
893 .submit_on_stale_blockhash
894 .fetch_add(1, Ordering::Relaxed);
895 }
896 }
897 }
898
899 pub(crate) fn record_reporter_drop(&self) {
901 let _ = self
902 .reporter_outcomes_dropped
903 .fetch_add(1, Ordering::Relaxed);
904 }
905
906 pub(crate) fn record_reporter_unavailable(&self) {
908 let _ = self
909 .reporter_outcomes_unavailable
910 .fetch_add(1, Ordering::Relaxed);
911 }
912
913 #[must_use]
915 pub fn snapshot(&self) -> TxToxicFlowTelemetrySnapshot {
916 let age_ms = self.opportunity_age_at_send_ms.load(Ordering::Relaxed);
917 TxToxicFlowTelemetrySnapshot {
918 reporter_outcomes_dropped: self.reporter_outcomes_dropped.load(Ordering::Relaxed),
919 reporter_outcomes_unavailable: self
920 .reporter_outcomes_unavailable
921 .load(Ordering::Relaxed),
922 direct_accepted: self.direct_accepted.load(Ordering::Relaxed),
923 rpc_accepted: self.rpc_accepted.load(Ordering::Relaxed),
924 jito_accepted: self.jito_accepted.load(Ordering::Relaxed),
925 rejected_due_to_staleness: self.rejected_due_to_staleness.load(Ordering::Relaxed),
926 rejected_due_to_reorg_risk: self.rejected_due_to_reorg_risk.load(Ordering::Relaxed),
927 rejected_due_to_state_drift: self.rejected_due_to_state_drift.load(Ordering::Relaxed),
928 submit_on_stale_blockhash: self.submit_on_stale_blockhash.load(Ordering::Relaxed),
929 leader_route_miss_rate: self.leader_route_miss_rate.load(Ordering::Relaxed),
930 opportunity_age_at_send_ms: if age_ms == 0 { None } else { Some(age_ms) },
931 rejected_due_to_replay_recovery: self
932 .rejected_due_to_replay_recovery
933 .load(Ordering::Relaxed),
934 suppressed_submissions: self.suppressed_submissions.load(Ordering::Relaxed),
935 }
936 }
937}
938
939impl TxSubmitOutcomeReporter for TxToxicFlowTelemetry {
940 fn record_outcome(&self, outcome: &TxSubmitOutcome) {
941 self.record(outcome);
942 }
943}
944
945#[derive(Debug, Default)]
947pub(crate) struct TxSuppressionCache {
948 entries: HashMap<TxSubmitSuppressionKey, SystemTime>,
950 order: VecDeque<(TxSubmitSuppressionKey, SystemTime)>,
952}
953
954impl TxSuppressionCache {
955 pub(crate) fn is_suppressed(
957 &mut self,
958 keys: &[TxSubmitSuppressionKey],
959 now: SystemTime,
960 ttl: Duration,
961 ) -> bool {
962 self.evict_expired(now, ttl);
963 keys.iter().any(|key| self.entries.contains_key(key))
964 }
965
966 pub(crate) fn insert_all(&mut self, keys: &[TxSubmitSuppressionKey], now: SystemTime) {
968 for key in keys {
969 let _ = self.entries.insert(key.clone(), now);
970 self.order.push_back((key.clone(), now));
971 }
972 }
973
974 fn evict_expired(&mut self, now: SystemTime, ttl: Duration) {
976 while let Some((_, front_inserted_at)) = self.order.front() {
977 let still_live = now
978 .duration_since(*front_inserted_at)
979 .map(|elapsed| elapsed <= ttl)
980 .unwrap_or(false);
981 if still_live {
982 break;
983 }
984 let Some((key, queued_inserted_at)) = self.order.pop_front() else {
985 break;
986 };
987 if self.entries.get(&key) == Some(&queued_inserted_at) {
988 let _ = self.entries.remove(&key);
989 }
990 }
991 }
992}
993
994#[async_trait]
996pub trait RpcSubmitTransport: Send + Sync {
997 async fn submit_rpc(
999 &self,
1000 tx_bytes: &[u8],
1001 config: &RpcSubmitConfig,
1002 ) -> Result<String, SubmitTransportError>;
1003}
1004
1005#[async_trait]
1007pub trait JitoSubmitTransport: Send + Sync {
1008 async fn submit_jito(
1010 &self,
1011 tx_bytes: &[u8],
1012 config: &JitoSubmitConfig,
1013 ) -> Result<JitoSubmitResponse, SubmitTransportError>;
1014}
1015
1016#[async_trait]
1018pub trait DirectSubmitTransport: Send + Sync {
1019 async fn submit_direct(
1021 &self,
1022 tx_bytes: &[u8],
1023 targets: &[LeaderTarget],
1024 policy: RoutingPolicy,
1025 config: &DirectSubmitConfig,
1026 ) -> Result<LeaderTarget, SubmitTransportError>;
1027}
1028
#[cfg(test)]
mod tests {
    use super::*;
    use std::{hint::black_box, time::Instant};

    use sof_support::bench::profile_iterations;

    /// Manual profiling run: churns 64 keys through insert + lookup and prints
    /// timing to stderr. Ignored by default.
    #[test]
    #[ignore = "profiling fixture for submit suppression cache churn"]
    fn suppression_cache_profile_fixture() {
        let iterations = profile_iterations(50_000);
        let ttl = Duration::from_millis(750);
        let base = SystemTime::UNIX_EPOCH + Duration::from_secs(1);
        let keys: Vec<TxSubmitSuppressionKey> = (0_u8..64)
            .map(|fill| TxSubmitSuppressionKey::Opportunity([fill; 32]))
            .collect();
        let mut cache = TxSuppressionCache::default();

        let started = Instant::now();
        for (iteration, key) in keys.iter().cycle().take(iterations).enumerate() {
            // The offset wraps every 2000 iterations, so `now` is not monotonic here.
            let offset_ms = u64::try_from(iteration % 2_000).unwrap_or(0);
            let now = base + Duration::from_millis(offset_ms);
            let single = std::slice::from_ref(key);
            cache.insert_all(single, now);
            black_box(cache.is_suppressed(single, now, ttl));
        }
        let elapsed = started.elapsed();
        let avg_ns_per_iteration = elapsed.as_nanos() / u128::try_from(iterations).unwrap_or(1);
        let avg_us_per_iteration = avg_ns_per_iteration as f64 / 1_000.0;

        eprintln!(
            "suppression_cache_profile_fixture iterations={} elapsed_us={} avg_ns_per_iteration={} avg_us_per_iteration={:.3} entries={}",
            iterations,
            elapsed.as_micros(),
            avg_ns_per_iteration,
            avg_us_per_iteration,
            cache.entries.len(),
        );
    }

    /// A re-inserted key must stay suppressed until `ttl` after the refresh,
    /// not after the first insert.
    #[test]
    fn suppression_cache_keeps_refreshed_entry_live() {
        let ttl = Duration::from_millis(750);
        let key = TxSubmitSuppressionKey::Opportunity([7_u8; 32]);
        let single = std::slice::from_ref(&key);
        let first_inserted_at = SystemTime::UNIX_EPOCH + Duration::from_secs(1);
        let refreshed_at = first_inserted_at + Duration::from_millis(500);

        let mut cache = TxSuppressionCache::default();
        cache.insert_all(single, first_inserted_at);
        cache.insert_all(single, refreshed_at);

        let shortly_after_refresh = refreshed_at + Duration::from_millis(100);
        assert!(cache.is_suppressed(single, shortly_after_refresh, ttl));

        let just_past_ttl = refreshed_at + ttl + Duration::from_millis(1);
        assert!(!cache.is_suppressed(single, just_past_ttl, ttl));
    }

    /// Zero timeouts are raised to one millisecond by `normalized`.
    #[test]
    fn direct_submit_config_clamps_zero_timeouts() {
        let raw = DirectSubmitConfig {
            per_target_timeout: Duration::ZERO,
            global_timeout: Duration::ZERO,
            direct_target_rounds: 1,
            direct_submit_attempts: 1,
            hybrid_direct_attempts: 1,
            rebroadcast_interval: Duration::from_millis(5),
            agave_rebroadcast_enabled: false,
            agave_rebroadcast_window: Duration::ZERO,
            agave_rebroadcast_interval: Duration::from_millis(5),
            hybrid_rpc_broadcast: false,
            latency_aware_targeting: true,
            latency_probe_timeout: Duration::ZERO,
            latency_probe_port: None,
            latency_probe_max_targets: 1,
        };
        let normalized = raw.normalized();
        let one_ms = Duration::from_millis(1);

        assert_eq!(normalized.per_target_timeout, one_ms);
        assert_eq!(normalized.global_timeout, one_ms);
        assert_eq!(normalized.latency_probe_timeout, one_ms);
    }
}