1use std::{
4 collections::HashMap,
5 hash::{Hash, Hasher},
6 sync::{
7 Arc,
8 atomic::{AtomicU64, Ordering},
9 },
10 time::{Duration, SystemTime},
11};
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use solana_signature::Signature;
16use thiserror::Error;
17
18use crate::{builder::BuilderError, providers::LeaderTarget, routing::RoutingPolicy};
19
20#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
22pub enum SubmitMode {
23 RpcOnly,
25 DirectOnly,
27 Hybrid,
29}
30
31#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
33pub enum SubmitReliability {
34 LowLatency,
36 #[default]
38 Balanced,
39 HighReliability,
41}
42
43#[derive(Debug, Clone, Eq, PartialEq)]
45pub enum SignedTx {
46 VersionedTransactionBytes(Vec<u8>),
48 WireTransactionBytes(Vec<u8>),
50}
51
52#[derive(Debug, Clone, Eq, PartialEq)]
54pub struct RpcSubmitConfig {
55 pub skip_preflight: bool,
57 pub preflight_commitment: Option<String>,
59}
60
61impl Default for RpcSubmitConfig {
62 fn default() -> Self {
63 Self {
64 skip_preflight: true,
65 preflight_commitment: None,
66 }
67 }
68}
69
70#[derive(Debug, Clone, Eq, PartialEq)]
72pub struct DirectSubmitConfig {
73 pub per_target_timeout: Duration,
75 pub global_timeout: Duration,
77 pub direct_target_rounds: usize,
79 pub direct_submit_attempts: usize,
81 pub hybrid_direct_attempts: usize,
83 pub rebroadcast_interval: Duration,
85 pub agave_rebroadcast_enabled: bool,
87 pub agave_rebroadcast_window: Duration,
89 pub agave_rebroadcast_interval: Duration,
91 pub hybrid_rpc_broadcast: bool,
93 pub latency_aware_targeting: bool,
95 pub latency_probe_timeout: Duration,
97 pub latency_probe_port: Option<u16>,
99 pub latency_probe_max_targets: usize,
101}
102
103impl DirectSubmitConfig {
104 #[must_use]
106 pub const fn from_reliability(reliability: SubmitReliability) -> Self {
107 match reliability {
108 SubmitReliability::LowLatency => Self {
109 per_target_timeout: Duration::from_millis(200),
110 global_timeout: Duration::from_millis(1_200),
111 direct_target_rounds: 3,
112 direct_submit_attempts: 3,
113 hybrid_direct_attempts: 2,
114 rebroadcast_interval: Duration::from_millis(90),
115 agave_rebroadcast_enabled: true,
116 agave_rebroadcast_window: Duration::from_secs(30),
117 agave_rebroadcast_interval: Duration::from_millis(700),
118 hybrid_rpc_broadcast: false,
119 latency_aware_targeting: true,
120 latency_probe_timeout: Duration::from_millis(80),
121 latency_probe_port: Some(8899),
122 latency_probe_max_targets: 128,
123 },
124 SubmitReliability::Balanced => Self {
125 per_target_timeout: Duration::from_millis(300),
126 global_timeout: Duration::from_millis(1_800),
127 direct_target_rounds: 4,
128 direct_submit_attempts: 4,
129 hybrid_direct_attempts: 3,
130 rebroadcast_interval: Duration::from_millis(110),
131 agave_rebroadcast_enabled: true,
132 agave_rebroadcast_window: Duration::from_secs(45),
133 agave_rebroadcast_interval: Duration::from_millis(800),
134 hybrid_rpc_broadcast: true,
135 latency_aware_targeting: true,
136 latency_probe_timeout: Duration::from_millis(120),
137 latency_probe_port: Some(8899),
138 latency_probe_max_targets: 128,
139 },
140 SubmitReliability::HighReliability => Self {
141 per_target_timeout: Duration::from_millis(450),
142 global_timeout: Duration::from_millis(3_200),
143 direct_target_rounds: 6,
144 direct_submit_attempts: 5,
145 hybrid_direct_attempts: 4,
146 rebroadcast_interval: Duration::from_millis(140),
147 agave_rebroadcast_enabled: true,
148 agave_rebroadcast_window: Duration::from_secs(70),
149 agave_rebroadcast_interval: Duration::from_millis(900),
150 hybrid_rpc_broadcast: true,
151 latency_aware_targeting: true,
152 latency_probe_timeout: Duration::from_millis(160),
153 latency_probe_port: Some(8899),
154 latency_probe_max_targets: 128,
155 },
156 }
157 }
158
159 #[must_use]
161 pub const fn normalized(self) -> Self {
162 let direct_target_rounds = if self.direct_target_rounds == 0 {
163 1
164 } else {
165 self.direct_target_rounds
166 };
167 let direct_submit_attempts = if self.direct_submit_attempts == 0 {
168 1
169 } else {
170 self.direct_submit_attempts
171 };
172 let hybrid_direct_attempts = if self.hybrid_direct_attempts == 0 {
173 1
174 } else {
175 self.hybrid_direct_attempts
176 };
177 let latency_probe_max_targets = if self.latency_probe_max_targets == 0 {
178 1
179 } else {
180 self.latency_probe_max_targets
181 };
182 let rebroadcast_interval = if self.rebroadcast_interval.is_zero() {
183 Duration::from_millis(1)
184 } else {
185 self.rebroadcast_interval
186 };
187 let agave_rebroadcast_interval = if self.agave_rebroadcast_interval.is_zero() {
188 Duration::from_millis(1)
189 } else {
190 self.agave_rebroadcast_interval
191 };
192 Self {
193 per_target_timeout: self.per_target_timeout,
194 global_timeout: self.global_timeout,
195 direct_target_rounds,
196 direct_submit_attempts,
197 hybrid_direct_attempts,
198 rebroadcast_interval,
199 agave_rebroadcast_enabled: self.agave_rebroadcast_enabled,
200 agave_rebroadcast_window: self.agave_rebroadcast_window,
201 agave_rebroadcast_interval,
202 hybrid_rpc_broadcast: self.hybrid_rpc_broadcast,
203 latency_aware_targeting: self.latency_aware_targeting,
204 latency_probe_timeout: self.latency_probe_timeout,
205 latency_probe_port: self.latency_probe_port,
206 latency_probe_max_targets,
207 }
208 }
209}
210
211impl Default for DirectSubmitConfig {
212 fn default() -> Self {
213 Self::from_reliability(SubmitReliability::default())
214 }
215}
216
217#[derive(Debug, Error, Clone, Eq, PartialEq)]
219pub enum SubmitTransportError {
220 #[error("transport configuration invalid: {message}")]
222 Config {
223 message: String,
225 },
226 #[error("transport failure: {message}")]
228 Failure {
229 message: String,
231 },
232}
233
234#[derive(Debug, Error)]
236pub enum SubmitError {
237 #[error("failed to build/sign transaction: {source}")]
239 Build {
240 source: BuilderError,
242 },
243 #[error("blockhash provider returned no recent blockhash")]
245 MissingRecentBlockhash,
246 #[error("failed to decode signed transaction bytes: {source}")]
248 DecodeSignedBytes {
249 source: Box<bincode::ErrorKind>,
251 },
252 #[error("duplicate signature suppressed by dedupe window")]
254 DuplicateSignature,
255 #[error("rpc transport is not configured")]
257 MissingRpcTransport,
258 #[error("direct transport is not configured")]
260 MissingDirectTransport,
261 #[error("no direct targets resolved from leader/backups")]
263 NoDirectTargets,
264 #[error("direct submit failed: {source}")]
266 Direct {
267 source: SubmitTransportError,
269 },
270 #[error("rpc submit failed: {source}")]
272 Rpc {
273 source: SubmitTransportError,
275 },
276 #[error("internal synchronization failure: {message}")]
278 InternalSync {
279 message: String,
281 },
282 #[error("submission rejected by toxic-flow guard: {reason}")]
284 ToxicFlow {
285 reason: TxToxicFlowRejectionReason,
287 },
288}
289
290#[derive(Debug, Clone, Eq, PartialEq)]
292pub struct SubmitResult {
293 pub signature: Option<Signature>,
295 pub mode: SubmitMode,
297 pub direct_target: Option<LeaderTarget>,
299 pub rpc_signature: Option<String>,
301 pub used_rpc_fallback: bool,
303 pub selected_target_count: usize,
305 pub selected_identity_count: usize,
307}
308
309#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
311pub enum TxFlowSafetyQuality {
312 Stable,
314 Provisional,
316 ReorgRisk,
318 Stale,
320 Degraded,
322 IncompleteControlPlane,
324}
325
326#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
328pub enum TxFlowSafetyIssue {
329 ReplayRecoveryPending,
331 MissingControlPlane,
333 StaleControlPlane,
335 DegradedControlPlane,
337 ReorgRisk,
339 Provisional,
341}
342
343#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
345pub struct TxFlowSafetySnapshot {
346 pub quality: TxFlowSafetyQuality,
348 pub issues: Vec<TxFlowSafetyIssue>,
350 pub current_state_version: Option<u64>,
352 pub replay_recovery_pending: bool,
354}
355
356impl TxFlowSafetySnapshot {
357 #[must_use]
359 pub const fn is_safe(&self) -> bool {
360 matches!(self.quality, TxFlowSafetyQuality::Stable) && !self.replay_recovery_pending
361 }
362}
363
364pub trait TxFlowSafetySource: Send + Sync {
366 fn toxic_flow_snapshot(&self) -> TxFlowSafetySnapshot;
368}
369
370#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
372pub enum TxSubmitSuppressionKey {
373 Signature(Signature),
375 Opportunity([u8; 32]),
377 AccountSet([u8; 32]),
379 SlotWindow {
381 slot: u64,
383 window: u64,
385 },
386}
387
388impl Hash for TxSubmitSuppressionKey {
389 fn hash<H: Hasher>(&self, state: &mut H) {
390 match self {
391 Self::Signature(signature) => {
392 0_u8.hash(state);
393 signature.as_array().hash(state);
394 }
395 Self::Opportunity(key) => {
396 1_u8.hash(state);
397 key.hash(state);
398 }
399 Self::AccountSet(key) => {
400 2_u8.hash(state);
401 key.hash(state);
402 }
403 Self::SlotWindow { slot, window } => {
404 3_u8.hash(state);
405 slot.hash(state);
406 window.hash(state);
407 }
408 }
409 }
410}
411
412#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
414pub struct TxSubmitContext {
415 pub suppression_keys: Vec<TxSubmitSuppressionKey>,
417 pub decision_state_version: Option<u64>,
419 pub opportunity_created_at: Option<SystemTime>,
421}
422
423#[derive(Debug, Clone, Eq, PartialEq)]
425pub struct TxSubmitGuardPolicy {
426 pub require_stable_control_plane: bool,
428 pub reject_on_replay_recovery_pending: bool,
430 pub max_state_version_drift: Option<u64>,
432 pub max_opportunity_age: Option<Duration>,
434 pub suppression_ttl: Duration,
436}
437
438impl Default for TxSubmitGuardPolicy {
439 fn default() -> Self {
440 Self {
441 require_stable_control_plane: true,
442 reject_on_replay_recovery_pending: true,
443 max_state_version_drift: Some(4),
444 max_opportunity_age: Some(Duration::from_millis(750)),
445 suppression_ttl: Duration::from_millis(750),
446 }
447 }
448}
449
450#[derive(Debug, Error, Clone, Eq, PartialEq, Serialize, Deserialize)]
452pub enum TxToxicFlowRejectionReason {
453 #[error("control-plane quality {quality:?} is not safe for submit")]
455 UnsafeControlPlane {
456 quality: TxFlowSafetyQuality,
458 },
459 #[error("submit source is still recovering replay continuity")]
461 ReplayRecoveryPending,
462 #[error("submit suppressed by active key")]
464 Suppressed,
465 #[error("state version drift {drift} exceeded maximum {max_allowed}")]
467 StateDrift {
468 drift: u64,
470 max_allowed: u64,
472 },
473 #[error("opportunity age {age_ms}ms exceeded maximum {max_allowed_ms}ms")]
475 OpportunityStale {
476 age_ms: u64,
478 max_allowed_ms: u64,
480 },
481}
482
483#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
485pub enum TxSubmitOutcomeKind {
486 DirectAccepted,
488 RpcAccepted,
490 Landed,
492 Expired,
494 Dropped,
496 LeaderMissed,
498 BlockhashStale,
500 UnhealthyRoute,
502 RejectedDueToStaleness,
504 RejectedDueToReorgRisk,
506 RejectedDueToStateDrift,
508 RejectedDueToReplayRecovery,
510 Suppressed,
512}
513
514#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
516pub struct TxSubmitOutcome {
517 pub kind: TxSubmitOutcomeKind,
519 pub signature: Option<Signature>,
521 pub mode: SubmitMode,
523 pub state_version: Option<u64>,
525 pub opportunity_age_ms: Option<u64>,
527}
528
529pub trait TxSubmitOutcomeReporter: Send + Sync {
531 fn record_outcome(&self, outcome: &TxSubmitOutcome);
533}
534
535#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
537pub struct TxToxicFlowTelemetrySnapshot {
538 pub rejected_due_to_staleness: u64,
540 pub rejected_due_to_reorg_risk: u64,
542 pub rejected_due_to_state_drift: u64,
544 pub submit_on_stale_blockhash: u64,
546 pub leader_route_miss_rate: u64,
548 pub opportunity_age_at_send_ms: Option<u64>,
550 pub rejected_due_to_replay_recovery: u64,
552 pub suppressed_submissions: u64,
554}
555
556#[derive(Debug, Default)]
558pub struct TxToxicFlowTelemetry {
559 rejected_due_to_staleness: AtomicU64,
561 rejected_due_to_reorg_risk: AtomicU64,
563 rejected_due_to_state_drift: AtomicU64,
565 submit_on_stale_blockhash: AtomicU64,
567 leader_route_miss_rate: AtomicU64,
569 opportunity_age_at_send_ms: AtomicU64,
571 rejected_due_to_replay_recovery: AtomicU64,
573 suppressed_submissions: AtomicU64,
575}
576
577impl TxToxicFlowTelemetry {
578 #[must_use]
580 pub fn shared() -> Arc<Self> {
581 Arc::new(Self::default())
582 }
583
584 pub fn record(&self, outcome: &TxSubmitOutcome) {
586 if let Some(age_ms) = outcome.opportunity_age_ms {
587 let _ = self
588 .opportunity_age_at_send_ms
589 .swap(age_ms, Ordering::Relaxed);
590 }
591 match outcome.kind {
592 TxSubmitOutcomeKind::RejectedDueToStaleness => {
593 let _ = self
594 .rejected_due_to_staleness
595 .fetch_add(1, Ordering::Relaxed);
596 }
597 TxSubmitOutcomeKind::RejectedDueToReorgRisk => {
598 let _ = self
599 .rejected_due_to_reorg_risk
600 .fetch_add(1, Ordering::Relaxed);
601 }
602 TxSubmitOutcomeKind::RejectedDueToStateDrift => {
603 let _ = self
604 .rejected_due_to_state_drift
605 .fetch_add(1, Ordering::Relaxed);
606 }
607 TxSubmitOutcomeKind::RejectedDueToReplayRecovery => {
608 let _ = self
609 .rejected_due_to_replay_recovery
610 .fetch_add(1, Ordering::Relaxed);
611 }
612 TxSubmitOutcomeKind::Suppressed => {
613 let _ = self.suppressed_submissions.fetch_add(1, Ordering::Relaxed);
614 }
615 TxSubmitOutcomeKind::LeaderMissed => {
616 let _ = self.leader_route_miss_rate.fetch_add(1, Ordering::Relaxed);
617 }
618 TxSubmitOutcomeKind::DirectAccepted
619 | TxSubmitOutcomeKind::RpcAccepted
620 | TxSubmitOutcomeKind::Landed
621 | TxSubmitOutcomeKind::Expired
622 | TxSubmitOutcomeKind::Dropped
623 | TxSubmitOutcomeKind::UnhealthyRoute => {}
624 TxSubmitOutcomeKind::BlockhashStale => {
625 let _ = self
626 .submit_on_stale_blockhash
627 .fetch_add(1, Ordering::Relaxed);
628 }
629 }
630 }
631
632 #[must_use]
634 pub fn snapshot(&self) -> TxToxicFlowTelemetrySnapshot {
635 let age_ms = self.opportunity_age_at_send_ms.load(Ordering::Relaxed);
636 TxToxicFlowTelemetrySnapshot {
637 rejected_due_to_staleness: self.rejected_due_to_staleness.load(Ordering::Relaxed),
638 rejected_due_to_reorg_risk: self.rejected_due_to_reorg_risk.load(Ordering::Relaxed),
639 rejected_due_to_state_drift: self.rejected_due_to_state_drift.load(Ordering::Relaxed),
640 submit_on_stale_blockhash: self.submit_on_stale_blockhash.load(Ordering::Relaxed),
641 leader_route_miss_rate: self.leader_route_miss_rate.load(Ordering::Relaxed),
642 opportunity_age_at_send_ms: if age_ms == 0 { None } else { Some(age_ms) },
643 rejected_due_to_replay_recovery: self
644 .rejected_due_to_replay_recovery
645 .load(Ordering::Relaxed),
646 suppressed_submissions: self.suppressed_submissions.load(Ordering::Relaxed),
647 }
648 }
649}
650
651impl TxSubmitOutcomeReporter for TxToxicFlowTelemetry {
652 fn record_outcome(&self, outcome: &TxSubmitOutcome) {
653 self.record(outcome);
654 }
655}
656
657#[derive(Debug, Default)]
659pub(crate) struct TxSuppressionCache {
660 entries: HashMap<TxSubmitSuppressionKey, SystemTime>,
662}
663
664impl TxSuppressionCache {
665 pub(crate) fn is_suppressed(
667 &mut self,
668 keys: &[TxSubmitSuppressionKey],
669 now: SystemTime,
670 ttl: Duration,
671 ) -> bool {
672 self.evict_expired(now, ttl);
673 keys.iter().any(|key| self.entries.contains_key(key))
674 }
675
676 pub(crate) fn insert_all(&mut self, keys: &[TxSubmitSuppressionKey], now: SystemTime) {
678 for key in keys {
679 let _ = self.entries.insert(key.clone(), now);
680 }
681 }
682
683 fn evict_expired(&mut self, now: SystemTime, ttl: Duration) {
685 self.entries.retain(|_, inserted_at| {
686 now.duration_since(*inserted_at)
687 .map(|elapsed| elapsed <= ttl)
688 .unwrap_or(false)
689 });
690 }
691}
692
693#[async_trait]
695pub trait RpcSubmitTransport: Send + Sync {
696 async fn submit_rpc(
698 &self,
699 tx_bytes: &[u8],
700 config: &RpcSubmitConfig,
701 ) -> Result<String, SubmitTransportError>;
702}
703
704#[async_trait]
706pub trait DirectSubmitTransport: Send + Sync {
707 async fn submit_direct(
709 &self,
710 tx_bytes: &[u8],
711 targets: &[LeaderTarget],
712 policy: RoutingPolicy,
713 config: &DirectSubmitConfig,
714 ) -> Result<LeaderTarget, SubmitTransportError>;
715}