1use std::{
4 collections::HashMap,
5 hash::{Hash, Hasher},
6 sync::{
7 Arc,
8 atomic::{AtomicU64, Ordering},
9 },
10 time::{Duration, SystemTime},
11};
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use sof_types::SignatureBytes;
16use thiserror::Error;
17
18use crate::{providers::LeaderTarget, routing::RoutingPolicy};
19
/// Selects which transport path(s) a submission goes through.
///
/// NOTE(review): the dispatch for each mode lives in the submit client, not in this
/// file. The per-variant descriptions below follow the variant names and the
/// transport traits declared here.
///
/// Variant order is part of the serialized form for index-based serde formats, so
/// append new variants rather than reordering them.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum SubmitMode {
    /// RPC path only (`RpcSubmitTransport`).
    RpcOnly,
    /// Jito path only (`JitoSubmitTransport`).
    JitoOnly,
    /// Direct-to-target path only (`DirectSubmitTransport`).
    DirectOnly,
    /// Mixed path; tuned by `DirectSubmitConfig::hybrid_direct_attempts` and
    /// `DirectSubmitConfig::hybrid_rpc_broadcast`.
    Hybrid,
}
32
/// Reliability tier used to pick a `DirectSubmitConfig` preset via
/// `DirectSubmitConfig::from_reliability`.
///
/// `Balanced` is the `Default`, and is the preset returned by
/// `DirectSubmitConfig::default()`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
pub enum SubmitReliability {
    /// Shortest timeouts and fewest rounds/attempts. Its preset also disables
    /// `hybrid_rpc_broadcast`.
    LowLatency,
    /// Middle-ground preset; the default tier.
    #[default]
    Balanced,
    /// Longest timeouts, most rounds/attempts, and longest Agave rebroadcast window.
    HighReliability,
}
44
/// An already-signed transaction in serialized byte form.
///
/// NOTE(review): the two variants presumably differ in encoding. Decoding happens
/// elsewhere; a failure surfaces as `SubmitError::DecodeSignedBytes`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum SignedTx {
    /// Bytes of a serialized versioned transaction (per the variant name).
    VersionedTransactionBytes(Vec<u8>),
    /// Bytes already in wire format (per the variant name).
    WireTransactionBytes(Vec<u8>),
}
53
/// Options passed to `RpcSubmitTransport::submit_rpc`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RpcSubmitConfig {
    /// Skip-preflight flag forwarded to the RPC transport; `true` by default.
    pub skip_preflight: bool,
    /// Optional preflight commitment level as a string; `None` by default, which sets
    /// no explicit commitment.
    pub preflight_commitment: Option<String>,
}

impl Default for RpcSubmitConfig {
    /// Skips preflight and leaves the preflight commitment unset.
    fn default() -> Self {
        let preflight_commitment = None;
        Self {
            preflight_commitment,
            skip_preflight: true,
        }
    }
}
71
/// Options passed to `JitoSubmitTransport::submit_jito`.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct JitoSubmitConfig {
    /// Bundle-only flag; `false` by default (derived `Default`).
    /// NOTE(review): exact semantics are defined by the Jito transport implementation.
    pub bundle_only: bool,
}
81
/// Successful result of `JitoSubmitTransport::submit_jito`.
///
/// Both fields are optional; the derived `Default` leaves them `None`.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct JitoSubmitResponse {
    /// Transaction signature reported by Jito, if any.
    pub transaction_signature: Option<String>,
    /// Bundle identifier reported by Jito, if any.
    pub bundle_id: Option<String>,
}
90
/// Tuning knobs for the direct (and hybrid) submit paths.
///
/// Build one with `DirectSubmitConfig::from_reliability` (or `Default`, which uses the
/// `Balanced` tier). Call `DirectSubmitConfig::normalized` to clamp zero counts and
/// zero intervals.
///
/// NOTE(review): the knobs are consumed by `DirectSubmitTransport` implementations and
/// the submit client. Field descriptions below that go beyond `normalized`/preset
/// behavior follow the field names.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DirectSubmitConfig {
    /// Timeout applied to a single direct target.
    pub per_target_timeout: Duration,
    /// Overall time budget for the direct path.
    pub global_timeout: Duration,
    /// Number of direct target rounds; `normalized` raises 0 to 1.
    pub direct_target_rounds: usize,
    /// Number of direct submit attempts; `normalized` raises 0 to 1.
    pub direct_submit_attempts: usize,
    /// Number of direct attempts in hybrid mode; `normalized` raises 0 to 1.
    pub hybrid_direct_attempts: usize,
    /// Interval between rebroadcasts; `normalized` raises a zero interval to 1 ms.
    pub rebroadcast_interval: Duration,
    /// Enables Agave-style rebroadcasting (`true` in every preset).
    pub agave_rebroadcast_enabled: bool,
    /// How long Agave-style rebroadcasting keeps going.
    pub agave_rebroadcast_window: Duration,
    /// Interval between Agave-style rebroadcasts; `normalized` raises zero to 1 ms.
    pub agave_rebroadcast_interval: Duration,
    /// Whether hybrid mode also broadcasts over RPC (`false` only in `LowLatency`).
    pub hybrid_rpc_broadcast: bool,
    /// Enables latency-aware target ordering (`true` in every preset).
    pub latency_aware_targeting: bool,
    /// Timeout for each latency probe.
    pub latency_probe_timeout: Duration,
    /// Port used for latency probes, if any (`Some(8899)` in every preset).
    pub latency_probe_port: Option<u16>,
    /// Upper bound on probed targets; `normalized` raises 0 to 1.
    pub latency_probe_max_targets: usize,
}
123
124impl DirectSubmitConfig {
125 #[must_use]
127 pub const fn from_reliability(reliability: SubmitReliability) -> Self {
128 match reliability {
129 SubmitReliability::LowLatency => Self {
130 per_target_timeout: Duration::from_millis(200),
131 global_timeout: Duration::from_millis(1_200),
132 direct_target_rounds: 3,
133 direct_submit_attempts: 3,
134 hybrid_direct_attempts: 2,
135 rebroadcast_interval: Duration::from_millis(90),
136 agave_rebroadcast_enabled: true,
137 agave_rebroadcast_window: Duration::from_secs(30),
138 agave_rebroadcast_interval: Duration::from_millis(700),
139 hybrid_rpc_broadcast: false,
140 latency_aware_targeting: true,
141 latency_probe_timeout: Duration::from_millis(80),
142 latency_probe_port: Some(8899),
143 latency_probe_max_targets: 128,
144 },
145 SubmitReliability::Balanced => Self {
146 per_target_timeout: Duration::from_millis(300),
147 global_timeout: Duration::from_millis(1_800),
148 direct_target_rounds: 4,
149 direct_submit_attempts: 4,
150 hybrid_direct_attempts: 3,
151 rebroadcast_interval: Duration::from_millis(110),
152 agave_rebroadcast_enabled: true,
153 agave_rebroadcast_window: Duration::from_secs(45),
154 agave_rebroadcast_interval: Duration::from_millis(800),
155 hybrid_rpc_broadcast: true,
156 latency_aware_targeting: true,
157 latency_probe_timeout: Duration::from_millis(120),
158 latency_probe_port: Some(8899),
159 latency_probe_max_targets: 128,
160 },
161 SubmitReliability::HighReliability => Self {
162 per_target_timeout: Duration::from_millis(450),
163 global_timeout: Duration::from_millis(3_200),
164 direct_target_rounds: 6,
165 direct_submit_attempts: 5,
166 hybrid_direct_attempts: 4,
167 rebroadcast_interval: Duration::from_millis(140),
168 agave_rebroadcast_enabled: true,
169 agave_rebroadcast_window: Duration::from_secs(70),
170 agave_rebroadcast_interval: Duration::from_millis(900),
171 hybrid_rpc_broadcast: true,
172 latency_aware_targeting: true,
173 latency_probe_timeout: Duration::from_millis(160),
174 latency_probe_port: Some(8899),
175 latency_probe_max_targets: 128,
176 },
177 }
178 }
179
180 #[must_use]
182 pub const fn normalized(self) -> Self {
183 let direct_target_rounds = if self.direct_target_rounds == 0 {
184 1
185 } else {
186 self.direct_target_rounds
187 };
188 let direct_submit_attempts = if self.direct_submit_attempts == 0 {
189 1
190 } else {
191 self.direct_submit_attempts
192 };
193 let hybrid_direct_attempts = if self.hybrid_direct_attempts == 0 {
194 1
195 } else {
196 self.hybrid_direct_attempts
197 };
198 let latency_probe_max_targets = if self.latency_probe_max_targets == 0 {
199 1
200 } else {
201 self.latency_probe_max_targets
202 };
203 let rebroadcast_interval = if self.rebroadcast_interval.is_zero() {
204 Duration::from_millis(1)
205 } else {
206 self.rebroadcast_interval
207 };
208 let agave_rebroadcast_interval = if self.agave_rebroadcast_interval.is_zero() {
209 Duration::from_millis(1)
210 } else {
211 self.agave_rebroadcast_interval
212 };
213 Self {
214 per_target_timeout: self.per_target_timeout,
215 global_timeout: self.global_timeout,
216 direct_target_rounds,
217 direct_submit_attempts,
218 hybrid_direct_attempts,
219 rebroadcast_interval,
220 agave_rebroadcast_enabled: self.agave_rebroadcast_enabled,
221 agave_rebroadcast_window: self.agave_rebroadcast_window,
222 agave_rebroadcast_interval,
223 hybrid_rpc_broadcast: self.hybrid_rpc_broadcast,
224 latency_aware_targeting: self.latency_aware_targeting,
225 latency_probe_timeout: self.latency_probe_timeout,
226 latency_probe_port: self.latency_probe_port,
227 latency_probe_max_targets,
228 }
229 }
230}
231
232impl Default for DirectSubmitConfig {
233 fn default() -> Self {
234 Self::from_reliability(SubmitReliability::default())
235 }
236}
237
/// Error returned by the transport traits in this module (`RpcSubmitTransport`,
/// `JitoSubmitTransport`, `DirectSubmitTransport`).
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SubmitTransportError {
    /// The transport's configuration is invalid.
    #[error("transport configuration invalid: {message}")]
    Config {
        /// Human-readable description of the configuration problem.
        message: String,
    },
    /// Any other transport failure.
    #[error("transport failure: {message}")]
    Failure {
        /// Human-readable failure description.
        message: String,
    },
}
254
/// Top-level error for a submission attempt.
///
/// NOTE(review): with `thiserror`, a field named `source` is also returned from
/// `Error::source()`. The variants below interpolate `{source}` into their message as
/// well, so chained error reporters print the inner message twice.
#[derive(Debug, Error)]
pub enum SubmitError {
    /// No recent blockhash was available from the blockhash provider.
    #[error("blockhash provider returned no recent blockhash")]
    MissingRecentBlockhash,
    /// The signed transaction bytes could not be decoded.
    #[error("failed to decode signed transaction bytes: {source}")]
    DecodeSignedBytes {
        /// Underlying `bincode` decode error.
        source: Box<bincode::ErrorKind>,
    },
    /// The signature was already seen inside the dedupe window.
    #[error("duplicate signature suppressed by dedupe window")]
    DuplicateSignature,
    /// The RPC path was needed but no RPC transport is configured.
    #[error("rpc transport is not configured")]
    MissingRpcTransport,
    /// The Jito path was needed but no Jito transport is configured.
    #[error("jito transport is not configured")]
    MissingJitoTransport,
    /// The direct path was needed but no direct transport is configured.
    #[error("direct transport is not configured")]
    MissingDirectTransport,
    /// Target resolution produced no direct targets.
    #[error("no direct targets resolved from leader/backups")]
    NoDirectTargets,
    /// The direct transport returned an error.
    #[error("direct submit failed: {source}")]
    Direct {
        /// Error from the direct transport.
        source: SubmitTransportError,
    },
    /// The RPC transport returned an error.
    #[error("rpc submit failed: {source}")]
    Rpc {
        /// Error from the RPC transport.
        source: SubmitTransportError,
    },
    /// The Jito transport returned an error.
    #[error("jito submit failed: {source}")]
    Jito {
        /// Error from the Jito transport.
        source: SubmitTransportError,
    },
    /// An internal synchronization primitive failed (e.g. a poisoned lock; per the name).
    #[error("internal synchronization failure: {message}")]
    InternalSync {
        /// Human-readable description of the failure.
        message: String,
    },
    /// The toxic-flow guard rejected the submission.
    #[error("submission rejected by toxic-flow guard: {reason}")]
    ToxicFlow {
        /// Why the guard rejected the submission.
        reason: TxToxicFlowRejectionReason,
    },
}
313
/// Summary of a completed submission.
///
/// NOTE(review): which fields are populated depends on the path taken. That logic lives
/// in the submit client; the descriptions below follow field names and the transport
/// return types in this file.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SubmitResult {
    /// Transaction signature, when known.
    pub signature: Option<SignatureBytes>,
    /// Mode the submission ran under.
    pub mode: SubmitMode,
    /// Target presumably reported by `DirectSubmitTransport::submit_direct`.
    pub direct_target: Option<LeaderTarget>,
    /// String presumably returned by `RpcSubmitTransport::submit_rpc`.
    pub rpc_signature: Option<String>,
    /// Presumably `JitoSubmitResponse::transaction_signature` from the Jito path.
    pub jito_signature: Option<String>,
    /// Presumably `JitoSubmitResponse::bundle_id` from the Jito path.
    pub jito_bundle_id: Option<String>,
    /// Whether an RPC fallback was used.
    pub used_rpc_fallback: bool,
    /// Number of direct targets selected.
    pub selected_target_count: usize,
    /// Number of distinct identities among the selected targets.
    pub selected_identity_count: usize,
}
336
/// Overall quality grade of the control-plane view used to gate submissions.
///
/// `TxFlowSafetySnapshot::is_safe` accepts only `Stable`; every other grade is unsafe.
/// Variant order is part of the serialized form for index-based serde formats.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxFlowSafetyQuality {
    /// Control plane is current and consistent; the only grade `is_safe` accepts.
    Stable,
    /// State is provisional (not yet settled).
    Provisional,
    /// State is at risk of being reorganized away.
    ReorgRisk,
    /// State is out of date.
    Stale,
    /// Control plane is degraded.
    Degraded,
    /// Part of the control plane is missing.
    IncompleteControlPlane,
}
353
/// Individual problem listed in `TxFlowSafetySnapshot::issues`.
///
/// Descriptions follow the variant names. Variant order is part of the serialized
/// form for index-based serde formats.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxFlowSafetyIssue {
    /// Replay continuity recovery has not finished.
    ReplayRecoveryPending,
    /// Control-plane data is missing.
    MissingControlPlane,
    /// Control-plane data is stale.
    StaleControlPlane,
    /// Control-plane data is degraded.
    DegradedControlPlane,
    /// The observed state carries reorg risk.
    ReorgRisk,
    /// The observed state is provisional.
    Provisional,
}
370
/// Point-in-time safety view returned by `TxFlowSafetySource::toxic_flow_snapshot`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxFlowSafetySnapshot {
    /// Overall quality grade.
    pub quality: TxFlowSafetyQuality,
    /// Individual issues contributing to the grade.
    pub issues: Vec<TxFlowSafetyIssue>,
    /// Current control-plane state version, if known.
    pub current_state_version: Option<u64>,
    /// Whether replay recovery is still in progress; `is_safe` returns `false` while set.
    pub replay_recovery_pending: bool,
}
383
384impl TxFlowSafetySnapshot {
385 #[must_use]
387 pub const fn is_safe(&self) -> bool {
388 matches!(self.quality, TxFlowSafetyQuality::Stable) && !self.replay_recovery_pending
389 }
390}
391
/// Source of the current control-plane safety state.
///
/// Implementations must be `Send + Sync`.
pub trait TxFlowSafetySource: Send + Sync {
    /// Returns the current `TxFlowSafetySnapshot`.
    fn toxic_flow_snapshot(&self) -> TxFlowSafetySnapshot;
}
397
/// Key under which a submission can be suppressed for a TTL (see `TxSuppressionCache`).
///
/// `Hash` is implemented by hand rather than derived.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxSubmitSuppressionKey {
    /// Suppress by transaction signature.
    Signature(SignatureBytes),
    /// Suppress by a caller-defined 32-byte opportunity key.
    Opportunity([u8; 32]),
    /// Suppress by a caller-defined 32-byte account-set key.
    AccountSet([u8; 32]),
    /// Suppress by slot window.
    SlotWindow {
        /// Slot the key refers to.
        slot: u64,
        /// Window value paired with `slot`.
        /// NOTE(review): units/meaning are defined by the caller.
        window: u64,
    },
}
415
416impl Hash for TxSubmitSuppressionKey {
417 fn hash<H: Hasher>(&self, state: &mut H) {
418 match self {
419 Self::Signature(signature) => {
420 0_u8.hash(state);
421 signature.as_array().hash(state);
422 }
423 Self::Opportunity(key) => {
424 1_u8.hash(state);
425 key.hash(state);
426 }
427 Self::AccountSet(key) => {
428 2_u8.hash(state);
429 key.hash(state);
430 }
431 Self::SlotWindow { slot, window } => {
432 3_u8.hash(state);
433 slot.hash(state);
434 window.hash(state);
435 }
436 }
437 }
438}
439
/// Per-submission context consulted by the toxic-flow guard.
///
/// NOTE(review): how each field is checked is implemented outside this file. The
/// pairings below with `TxSubmitGuardPolicy` follow the field names.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxSubmitContext {
    /// Keys to check and record for suppression (presumably fed to
    /// `TxSuppressionCache`).
    pub suppression_keys: Vec<TxSubmitSuppressionKey>,
    /// State version the decision was made against; presumably compared with the
    /// current version under `TxSubmitGuardPolicy::max_state_version_drift`.
    pub decision_state_version: Option<u64>,
    /// When the opportunity was created; presumably checked against
    /// `TxSubmitGuardPolicy::max_opportunity_age`.
    pub opportunity_created_at: Option<SystemTime>,
}
450
/// Thresholds applied by the toxic-flow guard before a submission is allowed.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TxSubmitGuardPolicy {
    /// Require a stable control plane (default `true`).
    pub require_stable_control_plane: bool,
    /// Reject while replay recovery is pending (default `true`).
    pub reject_on_replay_recovery_pending: bool,
    /// Maximum tolerated state-version drift (default `Some(4)`); `None` presumably
    /// disables the check.
    pub max_state_version_drift: Option<u64>,
    /// Maximum tolerated opportunity age (default 750 ms); `None` presumably disables
    /// the check.
    pub max_opportunity_age: Option<Duration>,
    /// How long suppression keys stay active (default 750 ms).
    pub suppression_ttl: Duration,
}

impl Default for TxSubmitGuardPolicy {
    /// Strict defaults: stable control plane required, replay recovery rejected,
    /// drift limit 4, and a 750 ms window for both opportunity age and suppression TTL.
    fn default() -> Self {
        let window = Duration::from_millis(750);
        Self {
            suppression_ttl: window,
            max_opportunity_age: Some(window),
            max_state_version_drift: Some(4),
            reject_on_replay_recovery_pending: true,
            require_stable_control_plane: true,
        }
    }
}
477
/// Reason the toxic-flow guard rejected a submission (carried by `SubmitError::ToxicFlow`).
#[derive(Debug, Error, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxToxicFlowRejectionReason {
    /// Control-plane quality was not acceptable.
    #[error("control-plane quality {quality:?} is not safe for submit")]
    UnsafeControlPlane {
        /// Quality grade observed at decision time.
        quality: TxFlowSafetyQuality,
    },
    /// Replay continuity recovery was still pending.
    #[error("submit source is still recovering replay continuity")]
    ReplayRecoveryPending,
    /// An active suppression key matched.
    #[error("submit suppressed by active key")]
    Suppressed,
    /// State-version drift exceeded the configured maximum.
    #[error("state version drift {drift} exceeded maximum {max_allowed}")]
    StateDrift {
        /// Observed drift.
        drift: u64,
        /// Configured maximum drift.
        max_allowed: u64,
    },
    /// The opportunity was older than the configured maximum age.
    #[error("opportunity age {age_ms}ms exceeded maximum {max_allowed_ms}ms")]
    OpportunityStale {
        /// Observed age in milliseconds.
        age_ms: u64,
        /// Configured maximum age in milliseconds.
        max_allowed_ms: u64,
    },
}
510
/// Classification of a submit outcome reported through `TxSubmitOutcomeReporter`.
///
/// `TxToxicFlowTelemetry` counts only `LeaderMissed`, `BlockhashStale`, the four
/// `RejectedDueTo*` kinds and `Suppressed`; the other kinds are not counted. Variant
/// order is part of the serialized form for index-based serde formats.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxSubmitOutcomeKind {
    /// Accepted by the direct path.
    DirectAccepted,
    /// Accepted by the RPC path.
    RpcAccepted,
    /// Accepted by the Jito path.
    JitoAccepted,
    /// Transaction landed.
    Landed,
    /// Transaction expired.
    Expired,
    /// Transaction was dropped.
    Dropped,
    /// The targeted leader was missed.
    LeaderMissed,
    /// Submitted with a stale blockhash.
    BlockhashStale,
    /// The route was unhealthy.
    UnhealthyRoute,
    /// Rejected because the opportunity or state was stale.
    RejectedDueToStaleness,
    /// Rejected because of reorg risk.
    RejectedDueToReorgRisk,
    /// Rejected because of state-version drift.
    RejectedDueToStateDrift,
    /// Rejected because replay recovery was pending.
    RejectedDueToReplayRecovery,
    /// Suppressed by an active suppression key.
    Suppressed,
}
543
/// One submit outcome event, passed to `TxSubmitOutcomeReporter::record_outcome`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxSubmitOutcome {
    /// What happened.
    pub kind: TxSubmitOutcomeKind,
    /// Transaction signature, when known.
    pub signature: Option<SignatureBytes>,
    /// Mode the submission ran under.
    pub mode: SubmitMode,
    /// State version associated with the outcome, if any.
    pub state_version: Option<u64>,
    /// Opportunity age in milliseconds; `TxToxicFlowTelemetry` records it whenever it
    /// is `Some`.
    pub opportunity_age_ms: Option<u64>,
}
558
/// Sink for submit outcomes; `TxToxicFlowTelemetry` implements it.
///
/// Implementations must be `Send + Sync`, and `record_outcome` takes `&self`.
pub trait TxSubmitOutcomeReporter: Send + Sync {
    /// Records one outcome.
    fn record_outcome(&self, outcome: &TxSubmitOutcome);
}
564
/// Plain-data copy of `TxToxicFlowTelemetry` counters, produced by its `snapshot` method.
///
/// Counters are cumulative since the telemetry sink was created.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxToxicFlowTelemetrySnapshot {
    /// Count of `RejectedDueToStaleness` outcomes.
    pub rejected_due_to_staleness: u64,
    /// Count of `RejectedDueToReorgRisk` outcomes.
    pub rejected_due_to_reorg_risk: u64,
    /// Count of `RejectedDueToStateDrift` outcomes.
    pub rejected_due_to_state_drift: u64,
    /// Count of `BlockhashStale` outcomes.
    pub submit_on_stale_blockhash: u64,
    /// Despite the name, a cumulative count of `LeaderMissed` outcomes, not a rate.
    pub leader_route_miss_rate: u64,
    /// Most recently recorded opportunity age in milliseconds, when one is available.
    pub opportunity_age_at_send_ms: Option<u64>,
    /// Count of `RejectedDueToReplayRecovery` outcomes.
    pub rejected_due_to_replay_recovery: u64,
    /// Count of `Suppressed` outcomes.
    pub suppressed_submissions: u64,
}
585
/// `AtomicU64` wrapper aligned to 64 bytes.
///
/// Each telemetry counter therefore sits in its own 64-byte-aligned slot, which avoids
/// false sharing between neighbouring counters on targets with 64-byte cache lines.
#[derive(Debug, Default)]
#[repr(align(64))]
struct CacheAlignedAtomicU64(AtomicU64);

impl CacheAlignedAtomicU64 {
    /// Delegates to `AtomicU64::load`.
    fn load(&self, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.load(ordering)
    }

    /// Delegates to `AtomicU64::swap`, returning the previous value.
    fn swap(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.swap(value, ordering)
    }

    /// Delegates to `AtomicU64::fetch_add` (wrapping), returning the previous value.
    fn fetch_add(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.fetch_add(value, ordering)
    }
}
607
/// Lock-free counters summarising submit outcomes seen by the toxic-flow guard.
///
/// Updated through `record` (or the `TxSubmitOutcomeReporter` impl) and read through
/// `snapshot`. `Default` starts every counter at zero; `shared()` wraps a fresh
/// instance in an `Arc`.
#[derive(Debug, Default)]
pub struct TxToxicFlowTelemetry {
    /// Count of `RejectedDueToStaleness` outcomes.
    rejected_due_to_staleness: CacheAlignedAtomicU64,
    /// Count of `RejectedDueToReorgRisk` outcomes.
    rejected_due_to_reorg_risk: CacheAlignedAtomicU64,
    /// Count of `RejectedDueToStateDrift` outcomes.
    rejected_due_to_state_drift: CacheAlignedAtomicU64,
    /// Count of `BlockhashStale` outcomes.
    submit_on_stale_blockhash: CacheAlignedAtomicU64,
    /// Count of `LeaderMissed` outcomes (a count despite the name).
    leader_route_miss_rate: CacheAlignedAtomicU64,
    /// Last recorded opportunity age; see `record`/`snapshot` for how it is stored
    /// and decoded.
    opportunity_age_at_send_ms: CacheAlignedAtomicU64,
    /// Count of `RejectedDueToReplayRecovery` outcomes.
    rejected_due_to_replay_recovery: CacheAlignedAtomicU64,
    /// Count of `Suppressed` outcomes.
    suppressed_submissions: CacheAlignedAtomicU64,
}
628
629impl TxToxicFlowTelemetry {
630 #[must_use]
632 pub fn shared() -> Arc<Self> {
633 Arc::new(Self::default())
634 }
635
636 pub fn record(&self, outcome: &TxSubmitOutcome) {
638 if let Some(age_ms) = outcome.opportunity_age_ms {
639 let _ = self
640 .opportunity_age_at_send_ms
641 .swap(age_ms, Ordering::Relaxed);
642 }
643 match outcome.kind {
644 TxSubmitOutcomeKind::RejectedDueToStaleness => {
645 let _ = self
646 .rejected_due_to_staleness
647 .fetch_add(1, Ordering::Relaxed);
648 }
649 TxSubmitOutcomeKind::RejectedDueToReorgRisk => {
650 let _ = self
651 .rejected_due_to_reorg_risk
652 .fetch_add(1, Ordering::Relaxed);
653 }
654 TxSubmitOutcomeKind::RejectedDueToStateDrift => {
655 let _ = self
656 .rejected_due_to_state_drift
657 .fetch_add(1, Ordering::Relaxed);
658 }
659 TxSubmitOutcomeKind::RejectedDueToReplayRecovery => {
660 let _ = self
661 .rejected_due_to_replay_recovery
662 .fetch_add(1, Ordering::Relaxed);
663 }
664 TxSubmitOutcomeKind::Suppressed => {
665 let _ = self.suppressed_submissions.fetch_add(1, Ordering::Relaxed);
666 }
667 TxSubmitOutcomeKind::LeaderMissed => {
668 let _ = self.leader_route_miss_rate.fetch_add(1, Ordering::Relaxed);
669 }
670 TxSubmitOutcomeKind::DirectAccepted
671 | TxSubmitOutcomeKind::RpcAccepted
672 | TxSubmitOutcomeKind::JitoAccepted
673 | TxSubmitOutcomeKind::Landed
674 | TxSubmitOutcomeKind::Expired
675 | TxSubmitOutcomeKind::Dropped
676 | TxSubmitOutcomeKind::UnhealthyRoute => {}
677 TxSubmitOutcomeKind::BlockhashStale => {
678 let _ = self
679 .submit_on_stale_blockhash
680 .fetch_add(1, Ordering::Relaxed);
681 }
682 }
683 }
684
685 #[must_use]
687 pub fn snapshot(&self) -> TxToxicFlowTelemetrySnapshot {
688 let age_ms = self.opportunity_age_at_send_ms.load(Ordering::Relaxed);
689 TxToxicFlowTelemetrySnapshot {
690 rejected_due_to_staleness: self.rejected_due_to_staleness.load(Ordering::Relaxed),
691 rejected_due_to_reorg_risk: self.rejected_due_to_reorg_risk.load(Ordering::Relaxed),
692 rejected_due_to_state_drift: self.rejected_due_to_state_drift.load(Ordering::Relaxed),
693 submit_on_stale_blockhash: self.submit_on_stale_blockhash.load(Ordering::Relaxed),
694 leader_route_miss_rate: self.leader_route_miss_rate.load(Ordering::Relaxed),
695 opportunity_age_at_send_ms: if age_ms == 0 { None } else { Some(age_ms) },
696 rejected_due_to_replay_recovery: self
697 .rejected_due_to_replay_recovery
698 .load(Ordering::Relaxed),
699 suppressed_submissions: self.suppressed_submissions.load(Ordering::Relaxed),
700 }
701 }
702}
703
704impl TxSubmitOutcomeReporter for TxToxicFlowTelemetry {
705 fn record_outcome(&self, outcome: &TxSubmitOutcome) {
706 self.record(outcome);
707 }
708}
709
710#[derive(Debug, Default)]
712pub(crate) struct TxSuppressionCache {
713 entries: HashMap<TxSubmitSuppressionKey, SystemTime>,
715}
716
717impl TxSuppressionCache {
718 pub(crate) fn is_suppressed(
720 &mut self,
721 keys: &[TxSubmitSuppressionKey],
722 now: SystemTime,
723 ttl: Duration,
724 ) -> bool {
725 self.evict_expired(now, ttl);
726 keys.iter().any(|key| self.entries.contains_key(key))
727 }
728
729 pub(crate) fn insert_all(&mut self, keys: &[TxSubmitSuppressionKey], now: SystemTime) {
731 for key in keys {
732 let _ = self.entries.insert(key.clone(), now);
733 }
734 }
735
736 fn evict_expired(&mut self, now: SystemTime, ttl: Duration) {
738 self.entries.retain(|_, inserted_at| {
739 now.duration_since(*inserted_at)
740 .map(|elapsed| elapsed <= ttl)
741 .unwrap_or(false)
742 });
743 }
744}
745
/// Transport that submits serialized transaction bytes over RPC.
///
/// Implementations must be `Send + Sync`; the async method is expanded by `async_trait`.
#[async_trait]
pub trait RpcSubmitTransport: Send + Sync {
    /// Submits `tx_bytes` using `config`.
    ///
    /// On success returns a `String`; NOTE(review): presumably the transaction
    /// signature reported by the RPC node (cf. `SubmitResult::rpc_signature`).
    ///
    /// # Errors
    /// Returns `SubmitTransportError` when the transport is misconfigured or fails.
    async fn submit_rpc(
        &self,
        tx_bytes: &[u8],
        config: &RpcSubmitConfig,
    ) -> Result<String, SubmitTransportError>;
}
756
/// Transport that submits serialized transaction bytes through Jito.
///
/// Implementations must be `Send + Sync`; the async method is expanded by `async_trait`.
#[async_trait]
pub trait JitoSubmitTransport: Send + Sync {
    /// Submits `tx_bytes` using `config` and returns the signature / bundle id that Jito
    /// reported (either may be absent).
    ///
    /// # Errors
    /// Returns `SubmitTransportError` when the transport is misconfigured or fails.
    async fn submit_jito(
        &self,
        tx_bytes: &[u8],
        config: &JitoSubmitConfig,
    ) -> Result<JitoSubmitResponse, SubmitTransportError>;
}
767
/// Transport that sends serialized transaction bytes directly to leader targets.
///
/// Implementations must be `Send + Sync`; the async method is expanded by `async_trait`.
#[async_trait]
pub trait DirectSubmitTransport: Send + Sync {
    /// Sends `tx_bytes` to `targets` under `policy` and `config`.
    ///
    /// On success returns a `LeaderTarget`; NOTE(review): presumably the target that
    /// accepted the transaction (cf. `SubmitResult::direct_target`).
    ///
    /// # Errors
    /// Returns `SubmitTransportError` when the transport is misconfigured or fails.
    async fn submit_direct(
        &self,
        tx_bytes: &[u8],
        targets: &[LeaderTarget],
        policy: RoutingPolicy,
        config: &DirectSubmitConfig,
    ) -> Result<LeaderTarget, SubmitTransportError>;
}