Skip to main content

sof_tx/submit/
types.rs

1//! Shared submission types, errors, and transport traits.
2
3use std::{
4    collections::HashMap,
5    hash::{Hash, Hasher},
6    sync::{
7        Arc,
8        atomic::{AtomicU64, Ordering},
9    },
10    time::{Duration, SystemTime},
11};
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use solana_signature::Signature;
16use thiserror::Error;
17
18use crate::{builder::BuilderError, providers::LeaderTarget, routing::RoutingPolicy};
19
20/// Runtime submit mode.
21#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
22pub enum SubmitMode {
23    /// Submit only through JSON-RPC.
24    RpcOnly,
25    /// Submit only through Jito block engine.
26    JitoOnly,
27    /// Submit only through direct leader/validator targets.
28    DirectOnly,
29    /// Submit direct first, then RPC fallback on failure.
30    Hybrid,
31}
32
33/// Reliability profile for direct and hybrid submission behavior.
34#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
35pub enum SubmitReliability {
36    /// Fastest path with minimal retrying.
37    LowLatency,
38    /// Balanced latency and retry behavior.
39    #[default]
40    Balanced,
41    /// Aggressive retrying before giving up.
42    HighReliability,
43}
44
45/// Signed transaction payload variants accepted by submit APIs.
46#[derive(Debug, Clone, Eq, PartialEq)]
47pub enum SignedTx {
48    /// Bincode-serialized `VersionedTransaction` bytes.
49    VersionedTransactionBytes(Vec<u8>),
50    /// Wire-format transaction bytes.
51    WireTransactionBytes(Vec<u8>),
52}
53
54/// RPC submit tuning.
55#[derive(Debug, Clone, Eq, PartialEq)]
56pub struct RpcSubmitConfig {
57    /// Skip preflight simulation when true.
58    pub skip_preflight: bool,
59    /// Optional preflight commitment string.
60    pub preflight_commitment: Option<String>,
61}
62
63impl Default for RpcSubmitConfig {
64    fn default() -> Self {
65        Self {
66            skip_preflight: true,
67            preflight_commitment: None,
68        }
69    }
70}
71
72/// Jito block-engine submit tuning.
73#[derive(Debug, Clone, Eq, PartialEq, Default)]
74pub struct JitoSubmitConfig {
75    /// Enables revert protection by sending through bundle-only mode.
76    pub bundle_only: bool,
77}
78
79/// Direct submit tuning.
80#[derive(Debug, Clone, Eq, PartialEq)]
81pub struct DirectSubmitConfig {
82    /// Per-target send timeout.
83    pub per_target_timeout: Duration,
84    /// Global send budget for one submission.
85    pub global_timeout: Duration,
86    /// Number of rounds to iterate across selected direct targets.
87    pub direct_target_rounds: usize,
88    /// Number of direct-only submit attempts (target selection can refresh per attempt).
89    pub direct_submit_attempts: usize,
90    /// Number of direct submit attempts in `Hybrid` mode before RPC fallback.
91    pub hybrid_direct_attempts: usize,
92    /// Delay between direct rebroadcast attempts/rounds (Agave-like pacing).
93    pub rebroadcast_interval: Duration,
94    /// Enables Agave-style post-ack rebroadcast persistence for direct submits.
95    pub agave_rebroadcast_enabled: bool,
96    /// Maximum time budget for background rebroadcast persistence.
97    pub agave_rebroadcast_window: Duration,
98    /// Delay between background rebroadcast cycles.
99    pub agave_rebroadcast_interval: Duration,
100    /// When true, `Hybrid` mode broadcasts to RPC even after direct send succeeds.
101    pub hybrid_rpc_broadcast: bool,
102    /// Enables latency-aware ordering of direct targets before submit.
103    pub latency_aware_targeting: bool,
104    /// Timeout used for per-target TCP latency probes.
105    pub latency_probe_timeout: Duration,
106    /// Optional extra TCP port to probe (in addition to target TPU port).
107    pub latency_probe_port: Option<u16>,
108    /// Max number of targets to probe per submission.
109    pub latency_probe_max_targets: usize,
110}
111
112impl DirectSubmitConfig {
113    /// Builds a direct-submit config from a reliability profile.
114    #[must_use]
115    pub const fn from_reliability(reliability: SubmitReliability) -> Self {
116        match reliability {
117            SubmitReliability::LowLatency => Self {
118                per_target_timeout: Duration::from_millis(200),
119                global_timeout: Duration::from_millis(1_200),
120                direct_target_rounds: 3,
121                direct_submit_attempts: 3,
122                hybrid_direct_attempts: 2,
123                rebroadcast_interval: Duration::from_millis(90),
124                agave_rebroadcast_enabled: true,
125                agave_rebroadcast_window: Duration::from_secs(30),
126                agave_rebroadcast_interval: Duration::from_millis(700),
127                hybrid_rpc_broadcast: false,
128                latency_aware_targeting: true,
129                latency_probe_timeout: Duration::from_millis(80),
130                latency_probe_port: Some(8899),
131                latency_probe_max_targets: 128,
132            },
133            SubmitReliability::Balanced => Self {
134                per_target_timeout: Duration::from_millis(300),
135                global_timeout: Duration::from_millis(1_800),
136                direct_target_rounds: 4,
137                direct_submit_attempts: 4,
138                hybrid_direct_attempts: 3,
139                rebroadcast_interval: Duration::from_millis(110),
140                agave_rebroadcast_enabled: true,
141                agave_rebroadcast_window: Duration::from_secs(45),
142                agave_rebroadcast_interval: Duration::from_millis(800),
143                hybrid_rpc_broadcast: true,
144                latency_aware_targeting: true,
145                latency_probe_timeout: Duration::from_millis(120),
146                latency_probe_port: Some(8899),
147                latency_probe_max_targets: 128,
148            },
149            SubmitReliability::HighReliability => Self {
150                per_target_timeout: Duration::from_millis(450),
151                global_timeout: Duration::from_millis(3_200),
152                direct_target_rounds: 6,
153                direct_submit_attempts: 5,
154                hybrid_direct_attempts: 4,
155                rebroadcast_interval: Duration::from_millis(140),
156                agave_rebroadcast_enabled: true,
157                agave_rebroadcast_window: Duration::from_secs(70),
158                agave_rebroadcast_interval: Duration::from_millis(900),
159                hybrid_rpc_broadcast: true,
160                latency_aware_targeting: true,
161                latency_probe_timeout: Duration::from_millis(160),
162                latency_probe_port: Some(8899),
163                latency_probe_max_targets: 128,
164            },
165        }
166    }
167
168    /// Returns this config with minimum valid retry counters.
169    #[must_use]
170    pub const fn normalized(self) -> Self {
171        let direct_target_rounds = if self.direct_target_rounds == 0 {
172            1
173        } else {
174            self.direct_target_rounds
175        };
176        let direct_submit_attempts = if self.direct_submit_attempts == 0 {
177            1
178        } else {
179            self.direct_submit_attempts
180        };
181        let hybrid_direct_attempts = if self.hybrid_direct_attempts == 0 {
182            1
183        } else {
184            self.hybrid_direct_attempts
185        };
186        let latency_probe_max_targets = if self.latency_probe_max_targets == 0 {
187            1
188        } else {
189            self.latency_probe_max_targets
190        };
191        let rebroadcast_interval = if self.rebroadcast_interval.is_zero() {
192            Duration::from_millis(1)
193        } else {
194            self.rebroadcast_interval
195        };
196        let agave_rebroadcast_interval = if self.agave_rebroadcast_interval.is_zero() {
197            Duration::from_millis(1)
198        } else {
199            self.agave_rebroadcast_interval
200        };
201        Self {
202            per_target_timeout: self.per_target_timeout,
203            global_timeout: self.global_timeout,
204            direct_target_rounds,
205            direct_submit_attempts,
206            hybrid_direct_attempts,
207            rebroadcast_interval,
208            agave_rebroadcast_enabled: self.agave_rebroadcast_enabled,
209            agave_rebroadcast_window: self.agave_rebroadcast_window,
210            agave_rebroadcast_interval,
211            hybrid_rpc_broadcast: self.hybrid_rpc_broadcast,
212            latency_aware_targeting: self.latency_aware_targeting,
213            latency_probe_timeout: self.latency_probe_timeout,
214            latency_probe_port: self.latency_probe_port,
215            latency_probe_max_targets,
216        }
217    }
218}
219
220impl Default for DirectSubmitConfig {
221    fn default() -> Self {
222        Self::from_reliability(SubmitReliability::default())
223    }
224}
225
226/// Low-level transport errors surfaced by submit backends.
227#[derive(Debug, Error, Clone, Eq, PartialEq)]
228pub enum SubmitTransportError {
229    /// Invalid transport configuration.
230    #[error("transport configuration invalid: {message}")]
231    Config {
232        /// Human-readable description.
233        message: String,
234    },
235    /// Transport operation failed.
236    #[error("transport failure: {message}")]
237    Failure {
238        /// Human-readable description.
239        message: String,
240    },
241}
242
243/// Submission-level errors.
244#[derive(Debug, Error)]
245pub enum SubmitError {
246    /// Could not build/sign transaction for builder submit path.
247    #[error("failed to build/sign transaction: {source}")]
248    Build {
249        /// Builder-layer failure.
250        source: BuilderError,
251    },
252    /// No blockhash available for builder submit path.
253    #[error("blockhash provider returned no recent blockhash")]
254    MissingRecentBlockhash,
255    /// Signed bytes could not be decoded into a transaction.
256    #[error("failed to decode signed transaction bytes: {source}")]
257    DecodeSignedBytes {
258        /// Bincode decode error.
259        source: Box<bincode::ErrorKind>,
260    },
261    /// Duplicate signature was suppressed by dedupe window.
262    #[error("duplicate signature suppressed by dedupe window")]
263    DuplicateSignature,
264    /// RPC mode requested but no RPC transport was configured.
265    #[error("rpc transport is not configured")]
266    MissingRpcTransport,
267    /// Jito mode requested but no Jito transport was configured.
268    #[error("jito transport is not configured")]
269    MissingJitoTransport,
270    /// Direct mode requested but no direct transport was configured.
271    #[error("direct transport is not configured")]
272    MissingDirectTransport,
273    /// No direct targets resolved from routing inputs.
274    #[error("no direct targets resolved from leader/backups")]
275    NoDirectTargets,
276    /// Direct transport failure.
277    #[error("direct submit failed: {source}")]
278    Direct {
279        /// Direct transport error.
280        source: SubmitTransportError,
281    },
282    /// RPC transport failure.
283    #[error("rpc submit failed: {source}")]
284    Rpc {
285        /// RPC transport error.
286        source: SubmitTransportError,
287    },
288    /// Jito transport failure.
289    #[error("jito submit failed: {source}")]
290    Jito {
291        /// Jito transport error.
292        source: SubmitTransportError,
293    },
294    /// Internal synchronization failure.
295    #[error("internal synchronization failure: {message}")]
296    InternalSync {
297        /// Synchronization error details.
298        message: String,
299    },
300    /// Submit attempt was rejected by the toxic-flow guard.
301    #[error("submission rejected by toxic-flow guard: {reason}")]
302    ToxicFlow {
303        /// Structured reason for the rejection.
304        reason: TxToxicFlowRejectionReason,
305    },
306}
307
308/// Summary of a successful submission.
309#[derive(Debug, Clone, Eq, PartialEq)]
310pub struct SubmitResult {
311    /// Signature parsed from submitted transaction bytes.
312    pub signature: Option<Signature>,
313    /// Mode selected by caller.
314    pub mode: SubmitMode,
315    /// Target chosen by direct path when applicable.
316    pub direct_target: Option<LeaderTarget>,
317    /// RPC-returned signature string when RPC path succeeded.
318    pub rpc_signature: Option<String>,
319    /// Jito block-engine returned signature string when Jito path succeeded.
320    pub jito_signature: Option<String>,
321    /// True when RPC fallback was used from hybrid mode.
322    pub used_rpc_fallback: bool,
323    /// Number of direct targets selected for submit attempt that succeeded.
324    pub selected_target_count: usize,
325    /// Number of unique validator identities in selected direct targets.
326    pub selected_identity_count: usize,
327}
328
329/// Coarse toxic-flow quality used by submit guards.
330#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
331pub enum TxFlowSafetyQuality {
332    /// Required inputs are present, coherent, and safe to use.
333    Stable,
334    /// Inputs exist but have not reached a stable confirmation boundary yet.
335    Provisional,
336    /// Inputs exist but the current branch still carries material reorg risk.
337    ReorgRisk,
338    /// Inputs are present but stale.
339    Stale,
340    /// Inputs are present but mutually inconsistent.
341    Degraded,
342    /// Required inputs are still missing.
343    IncompleteControlPlane,
344}
345
346/// One concrete toxic-flow issue reported by a submit guard source.
347#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
348pub enum TxFlowSafetyIssue {
349    /// Submit source is still recovering from replay/checkpoint continuity.
350    ReplayRecoveryPending,
351    /// Control-plane inputs are still missing.
352    MissingControlPlane,
353    /// Control-plane inputs are stale.
354    StaleControlPlane,
355    /// Control-plane inputs are inconsistent.
356    DegradedControlPlane,
357    /// Current branch carries reorg risk.
358    ReorgRisk,
359    /// Current branch is still provisional.
360    Provisional,
361}
362
363/// Current toxic-flow safety snapshot exposed by one submit guard source.
364#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
365pub struct TxFlowSafetySnapshot {
366    /// Coarse quality classification for the current control-plane view.
367    pub quality: TxFlowSafetyQuality,
368    /// Concrete issues behind `quality`.
369    pub issues: Vec<TxFlowSafetyIssue>,
370    /// Current upstream state version when known.
371    pub current_state_version: Option<u64>,
372    /// True when replay recovery is still pending.
373    pub replay_recovery_pending: bool,
374}
375
376impl TxFlowSafetySnapshot {
377    /// Returns true when the current snapshot is strategy-safe.
378    #[must_use]
379    pub const fn is_safe(&self) -> bool {
380        matches!(self.quality, TxFlowSafetyQuality::Stable) && !self.replay_recovery_pending
381    }
382}
383
384/// Dynamic source of toxic-flow safety state for submit guards.
385pub trait TxFlowSafetySource: Send + Sync {
386    /// Returns the latest toxic-flow safety snapshot.
387    fn toxic_flow_snapshot(&self) -> TxFlowSafetySnapshot;
388}
389
390/// One key used to suppress repeated submission attempts for the same opportunity.
391#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
392pub enum TxSubmitSuppressionKey {
393    /// Suppress by signature.
394    Signature(Signature),
395    /// Suppress by opaque opportunity identifier.
396    Opportunity([u8; 32]),
397    /// Suppress by hashed account set identifier.
398    AccountSet([u8; 32]),
399    /// Suppress by slot-window key.
400    SlotWindow {
401        /// Slot associated with the opportunity.
402        slot: u64,
403        /// Window width used to group nearby opportunities.
404        window: u64,
405    },
406}
407
408impl Hash for TxSubmitSuppressionKey {
409    fn hash<H: Hasher>(&self, state: &mut H) {
410        match self {
411            Self::Signature(signature) => {
412                0_u8.hash(state);
413                signature.as_array().hash(state);
414            }
415            Self::Opportunity(key) => {
416                1_u8.hash(state);
417                key.hash(state);
418            }
419            Self::AccountSet(key) => {
420                2_u8.hash(state);
421                key.hash(state);
422            }
423            Self::SlotWindow { slot, window } => {
424                3_u8.hash(state);
425                slot.hash(state);
426                window.hash(state);
427            }
428        }
429    }
430}
431
432/// Call-site context used by toxic-flow guards.
433#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
434pub struct TxSubmitContext {
435    /// Additional suppression keys to apply before submit.
436    pub suppression_keys: Vec<TxSubmitSuppressionKey>,
437    /// State version used when the submit decision was made.
438    pub decision_state_version: Option<u64>,
439    /// Timestamp when the opportunity or decision was created.
440    pub opportunity_created_at: Option<SystemTime>,
441}
442
443/// Policy controlling toxic-flow submit rejection.
444#[derive(Debug, Clone, Eq, PartialEq)]
445pub struct TxSubmitGuardPolicy {
446    /// Reject when the flow-safety source is not `Stable`.
447    pub require_stable_control_plane: bool,
448    /// Reject when the flow-safety source reports replay recovery pending.
449    pub reject_on_replay_recovery_pending: bool,
450    /// Maximum allowed drift between decision and current state versions.
451    pub max_state_version_drift: Option<u64>,
452    /// Maximum allowed age for one opportunity before submit.
453    pub max_opportunity_age: Option<Duration>,
454    /// TTL applied to built-in suppression keys.
455    pub suppression_ttl: Duration,
456}
457
458impl Default for TxSubmitGuardPolicy {
459    fn default() -> Self {
460        Self {
461            require_stable_control_plane: true,
462            reject_on_replay_recovery_pending: true,
463            max_state_version_drift: Some(4),
464            max_opportunity_age: Some(Duration::from_millis(750)),
465            suppression_ttl: Duration::from_millis(750),
466        }
467    }
468}
469
470/// Concrete reason one submit attempt was rejected before transport.
471#[derive(Debug, Error, Clone, Eq, PartialEq, Serialize, Deserialize)]
472pub enum TxToxicFlowRejectionReason {
473    /// Control-plane quality is not safe enough.
474    #[error("control-plane quality {quality:?} is not safe for submit")]
475    UnsafeControlPlane {
476        /// Current quality observed from the guard source.
477        quality: TxFlowSafetyQuality,
478    },
479    /// Replay recovery is still pending.
480    #[error("submit source is still recovering replay continuity")]
481    ReplayRecoveryPending,
482    /// One suppression key is still active.
483    #[error("submit suppressed by active key")]
484    Suppressed,
485    /// State version drift exceeded policy.
486    #[error("state version drift {drift} exceeded maximum {max_allowed}")]
487    StateDrift {
488        /// Observed drift between decision and current state versions.
489        drift: u64,
490        /// Maximum drift allowed by policy.
491        max_allowed: u64,
492    },
493    /// Opportunity age exceeded policy.
494    #[error("opportunity age {age_ms}ms exceeded maximum {max_allowed_ms}ms")]
495    OpportunityStale {
496        /// Observed age in milliseconds.
497        age_ms: u64,
498        /// Maximum age allowed in milliseconds.
499        max_allowed_ms: u64,
500    },
501}
502
503/// Final or immediate outcome classification for one submit attempt.
504#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
505pub enum TxSubmitOutcomeKind {
506    /// Direct path accepted the transaction.
507    DirectAccepted,
508    /// RPC path accepted the transaction.
509    RpcAccepted,
510    /// Jito block-engine accepted the transaction.
511    JitoAccepted,
512    /// Transaction landed on chain.
513    Landed,
514    /// Transaction expired before landing.
515    Expired,
516    /// Transaction was dropped before landing.
517    Dropped,
518    /// Route missed the intended leader window.
519    LeaderMissed,
520    /// Submit used a stale blockhash.
521    BlockhashStale,
522    /// Selected route was unhealthy.
523    UnhealthyRoute,
524    /// Submit was rejected due to stale inputs.
525    RejectedDueToStaleness,
526    /// Submit was rejected due to reorg risk.
527    RejectedDueToReorgRisk,
528    /// Submit was rejected due to state drift.
529    RejectedDueToStateDrift,
530    /// Submit was rejected by replay recovery pending.
531    RejectedDueToReplayRecovery,
532    /// Submit was suppressed by a built-in key.
533    Suppressed,
534}
535
536/// Structured outcome record for toxic-flow telemetry/reporting.
537#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
538pub struct TxSubmitOutcome {
539    /// Outcome classification.
540    pub kind: TxSubmitOutcomeKind,
541    /// Transaction signature when available.
542    pub signature: Option<Signature>,
543    /// Mode selected for the submit attempt.
544    pub mode: SubmitMode,
545    /// Current state version at outcome time when known.
546    pub state_version: Option<u64>,
547    /// Opportunity age in milliseconds when known.
548    pub opportunity_age_ms: Option<u64>,
549}
550
551/// Callback surface for external outcome sinks.
552pub trait TxSubmitOutcomeReporter: Send + Sync {
553    /// Records one structured outcome.
554    fn record_outcome(&self, outcome: &TxSubmitOutcome);
555}
556
557/// Snapshot of built-in toxic-flow counters collected by [`TxSubmitClient`](crate::submit::TxSubmitClient).
558#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
559pub struct TxToxicFlowTelemetrySnapshot {
560    /// Number of submit attempts rejected due to stale inputs.
561    pub rejected_due_to_staleness: u64,
562    /// Number of submit attempts rejected due to reorg risk.
563    pub rejected_due_to_reorg_risk: u64,
564    /// Number of submit attempts rejected due to state drift.
565    pub rejected_due_to_state_drift: u64,
566    /// Number of accepted submits emitted while the source was stale.
567    pub submit_on_stale_blockhash: u64,
568    /// Number of route misses classified as leader misses.
569    pub leader_route_miss_rate: u64,
570    /// Most recent opportunity age seen at submit time.
571    pub opportunity_age_at_send_ms: Option<u64>,
572    /// Number of submits rejected while replay recovery was pending.
573    pub rejected_due_to_replay_recovery: u64,
574    /// Number of submits suppressed by a built-in key.
575    pub suppressed_submissions: u64,
576}
577
578/// One cache-line-aligned atomic counter used by hot telemetry updates.
579#[derive(Debug, Default)]
580#[repr(align(64))]
581struct CacheAlignedAtomicU64(AtomicU64);
582
583impl CacheAlignedAtomicU64 {
584    /// Loads the current counter value.
585    fn load(&self, ordering: Ordering) -> u64 {
586        self.0.load(ordering)
587    }
588
589    /// Stores a new value and returns the previous one.
590    fn swap(&self, value: u64, ordering: Ordering) -> u64 {
591        self.0.swap(value, ordering)
592    }
593
594    /// Increments the counter and returns the previous value.
595    fn fetch_add(&self, value: u64, ordering: Ordering) -> u64 {
596        self.0.fetch_add(value, ordering)
597    }
598}
599
600/// In-memory telemetry counters for toxic-flow outcomes.
601#[derive(Debug, Default)]
602pub struct TxToxicFlowTelemetry {
603    /// Number of stale-input rejections.
604    rejected_due_to_staleness: CacheAlignedAtomicU64,
605    /// Number of reorg-risk rejections.
606    rejected_due_to_reorg_risk: CacheAlignedAtomicU64,
607    /// Number of state-drift rejections.
608    rejected_due_to_state_drift: CacheAlignedAtomicU64,
609    /// Number of accepted submits observed with stale blockhash state.
610    submit_on_stale_blockhash: CacheAlignedAtomicU64,
611    /// Number of leader-route misses.
612    leader_route_miss_rate: CacheAlignedAtomicU64,
613    /// Last opportunity age seen by the client.
614    opportunity_age_at_send_ms: CacheAlignedAtomicU64,
615    /// Number of replay-recovery rejections.
616    rejected_due_to_replay_recovery: CacheAlignedAtomicU64,
617    /// Number of suppressed submissions.
618    suppressed_submissions: CacheAlignedAtomicU64,
619}
620
621impl TxToxicFlowTelemetry {
622    /// Returns a shareable telemetry sink.
623    #[must_use]
624    pub fn shared() -> Arc<Self> {
625        Arc::new(Self::default())
626    }
627
628    /// Records one structured outcome.
629    pub fn record(&self, outcome: &TxSubmitOutcome) {
630        if let Some(age_ms) = outcome.opportunity_age_ms {
631            let _ = self
632                .opportunity_age_at_send_ms
633                .swap(age_ms, Ordering::Relaxed);
634        }
635        match outcome.kind {
636            TxSubmitOutcomeKind::RejectedDueToStaleness => {
637                let _ = self
638                    .rejected_due_to_staleness
639                    .fetch_add(1, Ordering::Relaxed);
640            }
641            TxSubmitOutcomeKind::RejectedDueToReorgRisk => {
642                let _ = self
643                    .rejected_due_to_reorg_risk
644                    .fetch_add(1, Ordering::Relaxed);
645            }
646            TxSubmitOutcomeKind::RejectedDueToStateDrift => {
647                let _ = self
648                    .rejected_due_to_state_drift
649                    .fetch_add(1, Ordering::Relaxed);
650            }
651            TxSubmitOutcomeKind::RejectedDueToReplayRecovery => {
652                let _ = self
653                    .rejected_due_to_replay_recovery
654                    .fetch_add(1, Ordering::Relaxed);
655            }
656            TxSubmitOutcomeKind::Suppressed => {
657                let _ = self.suppressed_submissions.fetch_add(1, Ordering::Relaxed);
658            }
659            TxSubmitOutcomeKind::LeaderMissed => {
660                let _ = self.leader_route_miss_rate.fetch_add(1, Ordering::Relaxed);
661            }
662            TxSubmitOutcomeKind::DirectAccepted
663            | TxSubmitOutcomeKind::RpcAccepted
664            | TxSubmitOutcomeKind::JitoAccepted
665            | TxSubmitOutcomeKind::Landed
666            | TxSubmitOutcomeKind::Expired
667            | TxSubmitOutcomeKind::Dropped
668            | TxSubmitOutcomeKind::UnhealthyRoute => {}
669            TxSubmitOutcomeKind::BlockhashStale => {
670                let _ = self
671                    .submit_on_stale_blockhash
672                    .fetch_add(1, Ordering::Relaxed);
673            }
674        }
675    }
676
677    /// Returns the current telemetry snapshot.
678    #[must_use]
679    pub fn snapshot(&self) -> TxToxicFlowTelemetrySnapshot {
680        let age_ms = self.opportunity_age_at_send_ms.load(Ordering::Relaxed);
681        TxToxicFlowTelemetrySnapshot {
682            rejected_due_to_staleness: self.rejected_due_to_staleness.load(Ordering::Relaxed),
683            rejected_due_to_reorg_risk: self.rejected_due_to_reorg_risk.load(Ordering::Relaxed),
684            rejected_due_to_state_drift: self.rejected_due_to_state_drift.load(Ordering::Relaxed),
685            submit_on_stale_blockhash: self.submit_on_stale_blockhash.load(Ordering::Relaxed),
686            leader_route_miss_rate: self.leader_route_miss_rate.load(Ordering::Relaxed),
687            opportunity_age_at_send_ms: if age_ms == 0 { None } else { Some(age_ms) },
688            rejected_due_to_replay_recovery: self
689                .rejected_due_to_replay_recovery
690                .load(Ordering::Relaxed),
691            suppressed_submissions: self.suppressed_submissions.load(Ordering::Relaxed),
692        }
693    }
694}
695
696impl TxSubmitOutcomeReporter for TxToxicFlowTelemetry {
697    fn record_outcome(&self, outcome: &TxSubmitOutcome) {
698        self.record(outcome);
699    }
700}
701
702/// Internal suppression map used by the submit client.
703#[derive(Debug, Default)]
704pub(crate) struct TxSuppressionCache {
705    /// Active suppression entries keyed by opportunity identity.
706    entries: HashMap<TxSubmitSuppressionKey, SystemTime>,
707}
708
709impl TxSuppressionCache {
710    /// Returns true when at least one key is still active inside `ttl`.
711    pub(crate) fn is_suppressed(
712        &mut self,
713        keys: &[TxSubmitSuppressionKey],
714        now: SystemTime,
715        ttl: Duration,
716    ) -> bool {
717        self.evict_expired(now, ttl);
718        keys.iter().any(|key| self.entries.contains_key(key))
719    }
720
721    /// Inserts all provided suppression keys with the current timestamp.
722    pub(crate) fn insert_all(&mut self, keys: &[TxSubmitSuppressionKey], now: SystemTime) {
723        for key in keys {
724            let _ = self.entries.insert(key.clone(), now);
725        }
726    }
727
728    /// Removes entries older than the current TTL window.
729    fn evict_expired(&mut self, now: SystemTime, ttl: Duration) {
730        self.entries.retain(|_, inserted_at| {
731            now.duration_since(*inserted_at)
732                .map(|elapsed| elapsed <= ttl)
733                .unwrap_or(false)
734        });
735    }
736}
737
738/// RPC transport interface.
739#[async_trait]
740pub trait RpcSubmitTransport: Send + Sync {
741    /// Submits transaction bytes to RPC and returns signature string.
742    async fn submit_rpc(
743        &self,
744        tx_bytes: &[u8],
745        config: &RpcSubmitConfig,
746    ) -> Result<String, SubmitTransportError>;
747}
748
749/// Jito transport interface.
750#[async_trait]
751pub trait JitoSubmitTransport: Send + Sync {
752    /// Submits transaction bytes to Jito block engine and returns signature string.
753    async fn submit_jito(
754        &self,
755        tx_bytes: &[u8],
756        config: &JitoSubmitConfig,
757    ) -> Result<String, SubmitTransportError>;
758}
759
760/// Direct transport interface.
761#[async_trait]
762pub trait DirectSubmitTransport: Send + Sync {
763    /// Submits transaction bytes to direct targets and returns the first successful target.
764    async fn submit_direct(
765        &self,
766        tx_bytes: &[u8],
767        targets: &[LeaderTarget],
768        policy: RoutingPolicy,
769        config: &DirectSubmitConfig,
770    ) -> Result<LeaderTarget, SubmitTransportError>;
771}