1use std::{
4 collections::HashMap,
5 hash::{Hash, Hasher},
6 sync::{
7 Arc,
8 atomic::{AtomicU64, Ordering},
9 },
10 time::{Duration, SystemTime},
11};
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use solana_signature::Signature;
16use thiserror::Error;
17
18use crate::{builder::BuilderError, providers::LeaderTarget, routing::RoutingPolicy};
19
20#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
22pub enum SubmitMode {
23 RpcOnly,
25 DirectOnly,
27 Hybrid,
29}
30
31#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
33pub enum SubmitReliability {
34 LowLatency,
36 #[default]
38 Balanced,
39 HighReliability,
41}
42
43#[derive(Debug, Clone, Eq, PartialEq)]
45pub enum SignedTx {
46 VersionedTransactionBytes(Vec<u8>),
48 WireTransactionBytes(Vec<u8>),
50}
51
52#[derive(Debug, Clone, Eq, PartialEq)]
54pub struct RpcSubmitConfig {
55 pub skip_preflight: bool,
57 pub preflight_commitment: Option<String>,
59}
60
61impl Default for RpcSubmitConfig {
62 fn default() -> Self {
63 Self {
64 skip_preflight: true,
65 preflight_commitment: None,
66 }
67 }
68}
69
70#[derive(Debug, Clone, Eq, PartialEq)]
72pub struct DirectSubmitConfig {
73 pub per_target_timeout: Duration,
75 pub global_timeout: Duration,
77 pub direct_target_rounds: usize,
79 pub direct_submit_attempts: usize,
81 pub hybrid_direct_attempts: usize,
83 pub rebroadcast_interval: Duration,
85 pub agave_rebroadcast_enabled: bool,
87 pub agave_rebroadcast_window: Duration,
89 pub agave_rebroadcast_interval: Duration,
91 pub hybrid_rpc_broadcast: bool,
93 pub latency_aware_targeting: bool,
95 pub latency_probe_timeout: Duration,
97 pub latency_probe_port: Option<u16>,
99 pub latency_probe_max_targets: usize,
101}
102
103impl DirectSubmitConfig {
104 #[must_use]
106 pub const fn from_reliability(reliability: SubmitReliability) -> Self {
107 match reliability {
108 SubmitReliability::LowLatency => Self {
109 per_target_timeout: Duration::from_millis(200),
110 global_timeout: Duration::from_millis(1_200),
111 direct_target_rounds: 3,
112 direct_submit_attempts: 3,
113 hybrid_direct_attempts: 2,
114 rebroadcast_interval: Duration::from_millis(90),
115 agave_rebroadcast_enabled: true,
116 agave_rebroadcast_window: Duration::from_secs(30),
117 agave_rebroadcast_interval: Duration::from_millis(700),
118 hybrid_rpc_broadcast: false,
119 latency_aware_targeting: true,
120 latency_probe_timeout: Duration::from_millis(80),
121 latency_probe_port: Some(8899),
122 latency_probe_max_targets: 128,
123 },
124 SubmitReliability::Balanced => Self {
125 per_target_timeout: Duration::from_millis(300),
126 global_timeout: Duration::from_millis(1_800),
127 direct_target_rounds: 4,
128 direct_submit_attempts: 4,
129 hybrid_direct_attempts: 3,
130 rebroadcast_interval: Duration::from_millis(110),
131 agave_rebroadcast_enabled: true,
132 agave_rebroadcast_window: Duration::from_secs(45),
133 agave_rebroadcast_interval: Duration::from_millis(800),
134 hybrid_rpc_broadcast: true,
135 latency_aware_targeting: true,
136 latency_probe_timeout: Duration::from_millis(120),
137 latency_probe_port: Some(8899),
138 latency_probe_max_targets: 128,
139 },
140 SubmitReliability::HighReliability => Self {
141 per_target_timeout: Duration::from_millis(450),
142 global_timeout: Duration::from_millis(3_200),
143 direct_target_rounds: 6,
144 direct_submit_attempts: 5,
145 hybrid_direct_attempts: 4,
146 rebroadcast_interval: Duration::from_millis(140),
147 agave_rebroadcast_enabled: true,
148 agave_rebroadcast_window: Duration::from_secs(70),
149 agave_rebroadcast_interval: Duration::from_millis(900),
150 hybrid_rpc_broadcast: true,
151 latency_aware_targeting: true,
152 latency_probe_timeout: Duration::from_millis(160),
153 latency_probe_port: Some(8899),
154 latency_probe_max_targets: 128,
155 },
156 }
157 }
158
159 #[must_use]
161 pub const fn normalized(self) -> Self {
162 let direct_target_rounds = if self.direct_target_rounds == 0 {
163 1
164 } else {
165 self.direct_target_rounds
166 };
167 let direct_submit_attempts = if self.direct_submit_attempts == 0 {
168 1
169 } else {
170 self.direct_submit_attempts
171 };
172 let hybrid_direct_attempts = if self.hybrid_direct_attempts == 0 {
173 1
174 } else {
175 self.hybrid_direct_attempts
176 };
177 let latency_probe_max_targets = if self.latency_probe_max_targets == 0 {
178 1
179 } else {
180 self.latency_probe_max_targets
181 };
182 let rebroadcast_interval = if self.rebroadcast_interval.is_zero() {
183 Duration::from_millis(1)
184 } else {
185 self.rebroadcast_interval
186 };
187 let agave_rebroadcast_interval = if self.agave_rebroadcast_interval.is_zero() {
188 Duration::from_millis(1)
189 } else {
190 self.agave_rebroadcast_interval
191 };
192 Self {
193 per_target_timeout: self.per_target_timeout,
194 global_timeout: self.global_timeout,
195 direct_target_rounds,
196 direct_submit_attempts,
197 hybrid_direct_attempts,
198 rebroadcast_interval,
199 agave_rebroadcast_enabled: self.agave_rebroadcast_enabled,
200 agave_rebroadcast_window: self.agave_rebroadcast_window,
201 agave_rebroadcast_interval,
202 hybrid_rpc_broadcast: self.hybrid_rpc_broadcast,
203 latency_aware_targeting: self.latency_aware_targeting,
204 latency_probe_timeout: self.latency_probe_timeout,
205 latency_probe_port: self.latency_probe_port,
206 latency_probe_max_targets,
207 }
208 }
209}
210
211impl Default for DirectSubmitConfig {
212 fn default() -> Self {
213 Self::from_reliability(SubmitReliability::default())
214 }
215}
216
217#[derive(Debug, Error, Clone, Eq, PartialEq)]
219pub enum SubmitTransportError {
220 #[error("transport configuration invalid: {message}")]
222 Config {
223 message: String,
225 },
226 #[error("transport failure: {message}")]
228 Failure {
229 message: String,
231 },
232}
233
234#[derive(Debug, Error)]
236pub enum SubmitError {
237 #[error("failed to build/sign transaction: {source}")]
239 Build {
240 source: BuilderError,
242 },
243 #[error("blockhash provider returned no recent blockhash")]
245 MissingRecentBlockhash,
246 #[error("failed to decode signed transaction bytes: {source}")]
248 DecodeSignedBytes {
249 source: Box<bincode::ErrorKind>,
251 },
252 #[error("duplicate signature suppressed by dedupe window")]
254 DuplicateSignature,
255 #[error("rpc transport is not configured")]
257 MissingRpcTransport,
258 #[error("direct transport is not configured")]
260 MissingDirectTransport,
261 #[error("no direct targets resolved from leader/backups")]
263 NoDirectTargets,
264 #[error("direct submit failed: {source}")]
266 Direct {
267 source: SubmitTransportError,
269 },
270 #[error("rpc submit failed: {source}")]
272 Rpc {
273 source: SubmitTransportError,
275 },
276 #[error("internal synchronization failure: {message}")]
278 InternalSync {
279 message: String,
281 },
282 #[error("submission rejected by toxic-flow guard: {reason}")]
284 ToxicFlow {
285 reason: TxToxicFlowRejectionReason,
287 },
288}
289
290#[derive(Debug, Clone, Eq, PartialEq)]
292pub struct SubmitResult {
293 pub signature: Option<Signature>,
295 pub mode: SubmitMode,
297 pub direct_target: Option<LeaderTarget>,
299 pub rpc_signature: Option<String>,
301 pub used_rpc_fallback: bool,
303 pub selected_target_count: usize,
305 pub selected_identity_count: usize,
307}
308
309#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
311pub enum TxFlowSafetyQuality {
312 Stable,
314 Provisional,
316 ReorgRisk,
318 Stale,
320 Degraded,
322 IncompleteControlPlane,
324}
325
326#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
328pub enum TxFlowSafetyIssue {
329 ReplayRecoveryPending,
331 MissingControlPlane,
333 StaleControlPlane,
335 DegradedControlPlane,
337 ReorgRisk,
339 Provisional,
341}
342
343#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
345pub struct TxFlowSafetySnapshot {
346 pub quality: TxFlowSafetyQuality,
348 pub issues: Vec<TxFlowSafetyIssue>,
350 pub current_state_version: Option<u64>,
352 pub replay_recovery_pending: bool,
354}
355
356impl TxFlowSafetySnapshot {
357 #[must_use]
359 pub const fn is_safe(&self) -> bool {
360 matches!(self.quality, TxFlowSafetyQuality::Stable) && !self.replay_recovery_pending
361 }
362}
363
364pub trait TxFlowSafetySource: Send + Sync {
366 fn toxic_flow_snapshot(&self) -> TxFlowSafetySnapshot;
368}
369
370#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
372pub enum TxSubmitSuppressionKey {
373 Signature(Signature),
375 Opportunity([u8; 32]),
377 AccountSet([u8; 32]),
379 SlotWindow {
381 slot: u64,
383 window: u64,
385 },
386}
387
388impl Hash for TxSubmitSuppressionKey {
389 fn hash<H: Hasher>(&self, state: &mut H) {
390 match self {
391 Self::Signature(signature) => {
392 0_u8.hash(state);
393 signature.as_array().hash(state);
394 }
395 Self::Opportunity(key) => {
396 1_u8.hash(state);
397 key.hash(state);
398 }
399 Self::AccountSet(key) => {
400 2_u8.hash(state);
401 key.hash(state);
402 }
403 Self::SlotWindow { slot, window } => {
404 3_u8.hash(state);
405 slot.hash(state);
406 window.hash(state);
407 }
408 }
409 }
410}
411
412#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
414pub struct TxSubmitContext {
415 pub suppression_keys: Vec<TxSubmitSuppressionKey>,
417 pub decision_state_version: Option<u64>,
419 pub opportunity_created_at: Option<SystemTime>,
421}
422
423#[derive(Debug, Clone, Eq, PartialEq)]
425pub struct TxSubmitGuardPolicy {
426 pub require_stable_control_plane: bool,
428 pub reject_on_replay_recovery_pending: bool,
430 pub max_state_version_drift: Option<u64>,
432 pub max_opportunity_age: Option<Duration>,
434 pub suppression_ttl: Duration,
436}
437
438impl Default for TxSubmitGuardPolicy {
439 fn default() -> Self {
440 Self {
441 require_stable_control_plane: true,
442 reject_on_replay_recovery_pending: true,
443 max_state_version_drift: Some(4),
444 max_opportunity_age: Some(Duration::from_millis(750)),
445 suppression_ttl: Duration::from_millis(750),
446 }
447 }
448}
449
450#[derive(Debug, Error, Clone, Eq, PartialEq, Serialize, Deserialize)]
452pub enum TxToxicFlowRejectionReason {
453 #[error("control-plane quality {quality:?} is not safe for submit")]
455 UnsafeControlPlane {
456 quality: TxFlowSafetyQuality,
458 },
459 #[error("submit source is still recovering replay continuity")]
461 ReplayRecoveryPending,
462 #[error("submit suppressed by active key")]
464 Suppressed,
465 #[error("state version drift {drift} exceeded maximum {max_allowed}")]
467 StateDrift {
468 drift: u64,
470 max_allowed: u64,
472 },
473 #[error("opportunity age {age_ms}ms exceeded maximum {max_allowed_ms}ms")]
475 OpportunityStale {
476 age_ms: u64,
478 max_allowed_ms: u64,
480 },
481}
482
483#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
485pub enum TxSubmitOutcomeKind {
486 DirectAccepted,
488 RpcAccepted,
490 Landed,
492 Expired,
494 Dropped,
496 LeaderMissed,
498 BlockhashStale,
500 UnhealthyRoute,
502 RejectedDueToStaleness,
504 RejectedDueToReorgRisk,
506 RejectedDueToStateDrift,
508 RejectedDueToReplayRecovery,
510 Suppressed,
512}
513
514#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
516pub struct TxSubmitOutcome {
517 pub kind: TxSubmitOutcomeKind,
519 pub signature: Option<Signature>,
521 pub mode: SubmitMode,
523 pub state_version: Option<u64>,
525 pub opportunity_age_ms: Option<u64>,
527}
528
529pub trait TxSubmitOutcomeReporter: Send + Sync {
531 fn record_outcome(&self, outcome: &TxSubmitOutcome);
533}
534
535#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
537pub struct TxToxicFlowTelemetrySnapshot {
538 pub rejected_due_to_staleness: u64,
540 pub rejected_due_to_reorg_risk: u64,
542 pub rejected_due_to_state_drift: u64,
544 pub submit_on_stale_blockhash: u64,
546 pub leader_route_miss_rate: u64,
548 pub opportunity_age_at_send_ms: Option<u64>,
550 pub rejected_due_to_replay_recovery: u64,
552 pub suppressed_submissions: u64,
554}
555
556#[derive(Debug, Default)]
558#[repr(align(64))]
559struct CacheAlignedAtomicU64(AtomicU64);
560
561impl CacheAlignedAtomicU64 {
562 fn load(&self, ordering: Ordering) -> u64 {
564 self.0.load(ordering)
565 }
566
567 fn swap(&self, value: u64, ordering: Ordering) -> u64 {
569 self.0.swap(value, ordering)
570 }
571
572 fn fetch_add(&self, value: u64, ordering: Ordering) -> u64 {
574 self.0.fetch_add(value, ordering)
575 }
576}
577
578#[derive(Debug, Default)]
580pub struct TxToxicFlowTelemetry {
581 rejected_due_to_staleness: CacheAlignedAtomicU64,
583 rejected_due_to_reorg_risk: CacheAlignedAtomicU64,
585 rejected_due_to_state_drift: CacheAlignedAtomicU64,
587 submit_on_stale_blockhash: CacheAlignedAtomicU64,
589 leader_route_miss_rate: CacheAlignedAtomicU64,
591 opportunity_age_at_send_ms: CacheAlignedAtomicU64,
593 rejected_due_to_replay_recovery: CacheAlignedAtomicU64,
595 suppressed_submissions: CacheAlignedAtomicU64,
597}
598
599impl TxToxicFlowTelemetry {
600 #[must_use]
602 pub fn shared() -> Arc<Self> {
603 Arc::new(Self::default())
604 }
605
606 pub fn record(&self, outcome: &TxSubmitOutcome) {
608 if let Some(age_ms) = outcome.opportunity_age_ms {
609 let _ = self
610 .opportunity_age_at_send_ms
611 .swap(age_ms, Ordering::Relaxed);
612 }
613 match outcome.kind {
614 TxSubmitOutcomeKind::RejectedDueToStaleness => {
615 let _ = self
616 .rejected_due_to_staleness
617 .fetch_add(1, Ordering::Relaxed);
618 }
619 TxSubmitOutcomeKind::RejectedDueToReorgRisk => {
620 let _ = self
621 .rejected_due_to_reorg_risk
622 .fetch_add(1, Ordering::Relaxed);
623 }
624 TxSubmitOutcomeKind::RejectedDueToStateDrift => {
625 let _ = self
626 .rejected_due_to_state_drift
627 .fetch_add(1, Ordering::Relaxed);
628 }
629 TxSubmitOutcomeKind::RejectedDueToReplayRecovery => {
630 let _ = self
631 .rejected_due_to_replay_recovery
632 .fetch_add(1, Ordering::Relaxed);
633 }
634 TxSubmitOutcomeKind::Suppressed => {
635 let _ = self.suppressed_submissions.fetch_add(1, Ordering::Relaxed);
636 }
637 TxSubmitOutcomeKind::LeaderMissed => {
638 let _ = self.leader_route_miss_rate.fetch_add(1, Ordering::Relaxed);
639 }
640 TxSubmitOutcomeKind::DirectAccepted
641 | TxSubmitOutcomeKind::RpcAccepted
642 | TxSubmitOutcomeKind::Landed
643 | TxSubmitOutcomeKind::Expired
644 | TxSubmitOutcomeKind::Dropped
645 | TxSubmitOutcomeKind::UnhealthyRoute => {}
646 TxSubmitOutcomeKind::BlockhashStale => {
647 let _ = self
648 .submit_on_stale_blockhash
649 .fetch_add(1, Ordering::Relaxed);
650 }
651 }
652 }
653
654 #[must_use]
656 pub fn snapshot(&self) -> TxToxicFlowTelemetrySnapshot {
657 let age_ms = self.opportunity_age_at_send_ms.load(Ordering::Relaxed);
658 TxToxicFlowTelemetrySnapshot {
659 rejected_due_to_staleness: self.rejected_due_to_staleness.load(Ordering::Relaxed),
660 rejected_due_to_reorg_risk: self.rejected_due_to_reorg_risk.load(Ordering::Relaxed),
661 rejected_due_to_state_drift: self.rejected_due_to_state_drift.load(Ordering::Relaxed),
662 submit_on_stale_blockhash: self.submit_on_stale_blockhash.load(Ordering::Relaxed),
663 leader_route_miss_rate: self.leader_route_miss_rate.load(Ordering::Relaxed),
664 opportunity_age_at_send_ms: if age_ms == 0 { None } else { Some(age_ms) },
665 rejected_due_to_replay_recovery: self
666 .rejected_due_to_replay_recovery
667 .load(Ordering::Relaxed),
668 suppressed_submissions: self.suppressed_submissions.load(Ordering::Relaxed),
669 }
670 }
671}
672
673impl TxSubmitOutcomeReporter for TxToxicFlowTelemetry {
674 fn record_outcome(&self, outcome: &TxSubmitOutcome) {
675 self.record(outcome);
676 }
677}
678
679#[derive(Debug, Default)]
681pub(crate) struct TxSuppressionCache {
682 entries: HashMap<TxSubmitSuppressionKey, SystemTime>,
684}
685
686impl TxSuppressionCache {
687 pub(crate) fn is_suppressed(
689 &mut self,
690 keys: &[TxSubmitSuppressionKey],
691 now: SystemTime,
692 ttl: Duration,
693 ) -> bool {
694 self.evict_expired(now, ttl);
695 keys.iter().any(|key| self.entries.contains_key(key))
696 }
697
698 pub(crate) fn insert_all(&mut self, keys: &[TxSubmitSuppressionKey], now: SystemTime) {
700 for key in keys {
701 let _ = self.entries.insert(key.clone(), now);
702 }
703 }
704
705 fn evict_expired(&mut self, now: SystemTime, ttl: Duration) {
707 self.entries.retain(|_, inserted_at| {
708 now.duration_since(*inserted_at)
709 .map(|elapsed| elapsed <= ttl)
710 .unwrap_or(false)
711 });
712 }
713}
714
715#[async_trait]
717pub trait RpcSubmitTransport: Send + Sync {
718 async fn submit_rpc(
720 &self,
721 tx_bytes: &[u8],
722 config: &RpcSubmitConfig,
723 ) -> Result<String, SubmitTransportError>;
724}
725
726#[async_trait]
728pub trait DirectSubmitTransport: Send + Sync {
729 async fn submit_direct(
731 &self,
732 tx_bytes: &[u8],
733 targets: &[LeaderTarget],
734 policy: RoutingPolicy,
735 config: &DirectSubmitConfig,
736 ) -> Result<LeaderTarget, SubmitTransportError>;
737}