// sof_tx/submit/types.rs

//! Shared submission types, errors, and transport traits.
2
3use std::{
4    collections::HashMap,
5    hash::{Hash, Hasher},
6    sync::{
7        Arc,
8        atomic::{AtomicU64, Ordering},
9    },
10    time::{Duration, SystemTime},
11};
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use sof_types::SignatureBytes;
16use thiserror::Error;
17
18use crate::{providers::LeaderTarget, routing::RoutingPolicy};
19
/// Runtime submit mode: selects which transport path(s) a submission uses.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum SubmitMode {
    /// Submit only through JSON-RPC.
    RpcOnly,
    /// Submit only through the Jito block engine.
    JitoOnly,
    /// Submit only through direct leader/validator targets.
    DirectOnly,
    /// Submit through the direct path first, then fall back to RPC on failure.
    Hybrid,
}
32
/// Reliability profile for direct and hybrid submission behavior.
///
/// Defaults to [`SubmitReliability::Balanced`].
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
pub enum SubmitReliability {
    /// Fastest path with minimal retrying.
    LowLatency,
    /// Balanced latency and retry behavior (the default profile).
    #[default]
    Balanced,
    /// Aggressive retrying before giving up.
    HighReliability,
}
44
/// Signed transaction payload variants accepted by submit APIs.
///
/// Both variants carry raw bytes; the variant records which encoding the bytes use.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum SignedTx {
    /// Bincode-serialized `VersionedTransaction` bytes.
    VersionedTransactionBytes(Vec<u8>),
    /// Wire-format transaction bytes.
    WireTransactionBytes(Vec<u8>),
}
53
/// Tuning knobs for the JSON-RPC submit path.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RpcSubmitConfig {
    /// When true, preflight simulation is skipped.
    pub skip_preflight: bool,
    /// Optional commitment level (as a string) to use for preflight.
    pub preflight_commitment: Option<String>,
}

impl Default for RpcSubmitConfig {
    /// Default RPC tuning: preflight is skipped and no preflight commitment is set.
    fn default() -> Self {
        let skip_preflight = true;
        let preflight_commitment = None;
        Self {
            skip_preflight,
            preflight_commitment,
        }
    }
}
71
/// Jito block-engine submit tuning.
///
/// The derived default leaves `bundle_only` disabled.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct JitoSubmitConfig {
    /// Enables revert protection by sending through bundle-only mode.
    ///
    /// This applies to the JSON-RPC transaction path. The gRPC transport is bundle-based and
    /// therefore always behaves as a bundle submission.
    pub bundle_only: bool,
}
81
/// Successful Jito submission metadata.
///
/// Both fields are optional; the derived default leaves both unset.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct JitoSubmitResponse {
    /// Transaction signature returned by Jito JSON-RPC, when available.
    pub transaction_signature: Option<String>,
    /// Bundle UUID returned by Jito gRPC, when available.
    pub bundle_id: Option<String>,
}
90
/// Direct submit tuning.
///
/// Covers timeouts, retry counters, rebroadcast pacing, and latency-probe settings for the
/// direct submit path.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DirectSubmitConfig {
    /// Per-target send timeout.
    pub per_target_timeout: Duration,
    /// Global send budget for one submission.
    pub global_timeout: Duration,
    /// Number of rounds to iterate across selected direct targets.
    pub direct_target_rounds: usize,
    /// Number of direct-only submit attempts (target selection can refresh per attempt).
    pub direct_submit_attempts: usize,
    /// Number of direct submit attempts in `Hybrid` mode before RPC fallback.
    pub hybrid_direct_attempts: usize,
    /// Delay between direct rebroadcast attempts/rounds (Agave-like pacing).
    pub rebroadcast_interval: Duration,
    /// Enables Agave-style post-ack rebroadcast persistence for direct submits.
    pub agave_rebroadcast_enabled: bool,
    /// Maximum time budget for background rebroadcast persistence.
    pub agave_rebroadcast_window: Duration,
    /// Delay between background rebroadcast cycles.
    pub agave_rebroadcast_interval: Duration,
    /// When true, `Hybrid` mode broadcasts to RPC even after direct send succeeds.
    pub hybrid_rpc_broadcast: bool,
    /// Enables latency-aware ordering of direct targets before submit.
    pub latency_aware_targeting: bool,
    /// Timeout used for per-target TCP latency probes.
    pub latency_probe_timeout: Duration,
    /// Optional extra TCP port to probe (in addition to target TPU port).
    pub latency_probe_port: Option<u16>,
    /// Max number of targets to probe per submission.
    pub latency_probe_max_targets: usize,
}
123
124impl DirectSubmitConfig {
125    /// Builds a direct-submit config from a reliability profile.
126    #[must_use]
127    pub const fn from_reliability(reliability: SubmitReliability) -> Self {
128        match reliability {
129            SubmitReliability::LowLatency => Self {
130                per_target_timeout: Duration::from_millis(200),
131                global_timeout: Duration::from_millis(1_200),
132                direct_target_rounds: 3,
133                direct_submit_attempts: 3,
134                hybrid_direct_attempts: 2,
135                rebroadcast_interval: Duration::from_millis(90),
136                agave_rebroadcast_enabled: true,
137                agave_rebroadcast_window: Duration::from_secs(30),
138                agave_rebroadcast_interval: Duration::from_millis(700),
139                hybrid_rpc_broadcast: false,
140                latency_aware_targeting: true,
141                latency_probe_timeout: Duration::from_millis(80),
142                latency_probe_port: Some(8899),
143                latency_probe_max_targets: 128,
144            },
145            SubmitReliability::Balanced => Self {
146                per_target_timeout: Duration::from_millis(300),
147                global_timeout: Duration::from_millis(1_800),
148                direct_target_rounds: 4,
149                direct_submit_attempts: 4,
150                hybrid_direct_attempts: 3,
151                rebroadcast_interval: Duration::from_millis(110),
152                agave_rebroadcast_enabled: true,
153                agave_rebroadcast_window: Duration::from_secs(45),
154                agave_rebroadcast_interval: Duration::from_millis(800),
155                hybrid_rpc_broadcast: true,
156                latency_aware_targeting: true,
157                latency_probe_timeout: Duration::from_millis(120),
158                latency_probe_port: Some(8899),
159                latency_probe_max_targets: 128,
160            },
161            SubmitReliability::HighReliability => Self {
162                per_target_timeout: Duration::from_millis(450),
163                global_timeout: Duration::from_millis(3_200),
164                direct_target_rounds: 6,
165                direct_submit_attempts: 5,
166                hybrid_direct_attempts: 4,
167                rebroadcast_interval: Duration::from_millis(140),
168                agave_rebroadcast_enabled: true,
169                agave_rebroadcast_window: Duration::from_secs(70),
170                agave_rebroadcast_interval: Duration::from_millis(900),
171                hybrid_rpc_broadcast: true,
172                latency_aware_targeting: true,
173                latency_probe_timeout: Duration::from_millis(160),
174                latency_probe_port: Some(8899),
175                latency_probe_max_targets: 128,
176            },
177        }
178    }
179
180    /// Returns this config with minimum valid retry counters.
181    #[must_use]
182    pub const fn normalized(self) -> Self {
183        let direct_target_rounds = if self.direct_target_rounds == 0 {
184            1
185        } else {
186            self.direct_target_rounds
187        };
188        let direct_submit_attempts = if self.direct_submit_attempts == 0 {
189            1
190        } else {
191            self.direct_submit_attempts
192        };
193        let hybrid_direct_attempts = if self.hybrid_direct_attempts == 0 {
194            1
195        } else {
196            self.hybrid_direct_attempts
197        };
198        let latency_probe_max_targets = if self.latency_probe_max_targets == 0 {
199            1
200        } else {
201            self.latency_probe_max_targets
202        };
203        let rebroadcast_interval = if self.rebroadcast_interval.is_zero() {
204            Duration::from_millis(1)
205        } else {
206            self.rebroadcast_interval
207        };
208        let agave_rebroadcast_interval = if self.agave_rebroadcast_interval.is_zero() {
209            Duration::from_millis(1)
210        } else {
211            self.agave_rebroadcast_interval
212        };
213        Self {
214            per_target_timeout: self.per_target_timeout,
215            global_timeout: self.global_timeout,
216            direct_target_rounds,
217            direct_submit_attempts,
218            hybrid_direct_attempts,
219            rebroadcast_interval,
220            agave_rebroadcast_enabled: self.agave_rebroadcast_enabled,
221            agave_rebroadcast_window: self.agave_rebroadcast_window,
222            agave_rebroadcast_interval,
223            hybrid_rpc_broadcast: self.hybrid_rpc_broadcast,
224            latency_aware_targeting: self.latency_aware_targeting,
225            latency_probe_timeout: self.latency_probe_timeout,
226            latency_probe_port: self.latency_probe_port,
227            latency_probe_max_targets,
228        }
229    }
230}
231
232impl Default for DirectSubmitConfig {
233    fn default() -> Self {
234        Self::from_reliability(SubmitReliability::default())
235    }
236}
237
/// Low-level transport errors surfaced by submit backends.
///
/// Both variants carry only a human-readable message, so the type stays `Clone` and
/// comparable.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SubmitTransportError {
    /// Invalid transport configuration.
    #[error("transport configuration invalid: {message}")]
    Config {
        /// Human-readable description.
        message: String,
    },
    /// Transport operation failed.
    #[error("transport failure: {message}")]
    Failure {
        /// Human-readable description.
        message: String,
    },
}
254
/// Submission-level errors.
///
/// Covers missing inputs or transports, per-transport failures (wrapping
/// [`SubmitTransportError`]), dedupe suppression, and toxic-flow guard rejections.
#[derive(Debug, Error)]
pub enum SubmitError {
    /// No blockhash available for unsigned submit path.
    #[error("blockhash provider returned no recent blockhash")]
    MissingRecentBlockhash,
    /// Signed bytes could not be decoded into a transaction.
    #[error("failed to decode signed transaction bytes: {source}")]
    DecodeSignedBytes {
        /// Bincode decode error.
        source: Box<bincode::ErrorKind>,
    },
    /// Duplicate signature was suppressed by dedupe window.
    #[error("duplicate signature suppressed by dedupe window")]
    DuplicateSignature,
    /// RPC mode requested but no RPC transport was configured.
    #[error("rpc transport is not configured")]
    MissingRpcTransport,
    /// Jito mode requested but no Jito transport was configured.
    #[error("jito transport is not configured")]
    MissingJitoTransport,
    /// Direct mode requested but no direct transport was configured.
    #[error("direct transport is not configured")]
    MissingDirectTransport,
    /// No direct targets resolved from routing inputs.
    #[error("no direct targets resolved from leader/backups")]
    NoDirectTargets,
    /// Direct transport failure.
    #[error("direct submit failed: {source}")]
    Direct {
        /// Direct transport error.
        source: SubmitTransportError,
    },
    /// RPC transport failure.
    #[error("rpc submit failed: {source}")]
    Rpc {
        /// RPC transport error.
        source: SubmitTransportError,
    },
    /// Jito transport failure.
    #[error("jito submit failed: {source}")]
    Jito {
        /// Jito transport error.
        source: SubmitTransportError,
    },
    /// Internal synchronization failure.
    #[error("internal synchronization failure: {message}")]
    InternalSync {
        /// Synchronization error details.
        message: String,
    },
    /// Submit attempt was rejected by the toxic-flow guard.
    #[error("submission rejected by toxic-flow guard: {reason}")]
    ToxicFlow {
        /// Structured reason for the rejection.
        reason: TxToxicFlowRejectionReason,
    },
}
313
/// Summary of a successful submission.
///
/// Path-specific fields are `Option`s and are only populated when the corresponding
/// path applies.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SubmitResult {
    /// Signature parsed from submitted transaction bytes.
    pub signature: Option<SignatureBytes>,
    /// Mode selected by caller.
    pub mode: SubmitMode,
    /// Target chosen by direct path when applicable.
    pub direct_target: Option<LeaderTarget>,
    /// RPC-returned signature string when RPC path succeeded.
    pub rpc_signature: Option<String>,
    /// Jito block-engine returned transaction signature when the JSON-RPC path succeeded.
    pub jito_signature: Option<String>,
    /// Jito block-engine returned bundle UUID when the gRPC bundle path succeeded.
    pub jito_bundle_id: Option<String>,
    /// True when RPC fallback was used from hybrid mode.
    pub used_rpc_fallback: bool,
    /// Number of direct targets selected for submit attempt that succeeded.
    pub selected_target_count: usize,
    /// Number of unique validator identities in selected direct targets.
    pub selected_identity_count: usize,
}
336
/// Coarse toxic-flow quality used by submit guards.
///
/// Only `Stable` is treated as safe by [`TxFlowSafetySnapshot::is_safe`].
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxFlowSafetyQuality {
    /// Required inputs are present, coherent, and safe to use.
    Stable,
    /// Inputs exist but have not reached a stable confirmation boundary yet.
    Provisional,
    /// Inputs exist but the current branch still carries material reorg risk.
    ReorgRisk,
    /// Inputs are present but stale.
    Stale,
    /// Inputs are present but mutually inconsistent.
    Degraded,
    /// Required inputs are still missing.
    IncompleteControlPlane,
}
353
/// One concrete toxic-flow issue reported by a submit guard source.
///
/// A [`TxFlowSafetySnapshot`] carries a list of these alongside its coarse quality.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxFlowSafetyIssue {
    /// Submit source is still recovering from replay/checkpoint continuity.
    ReplayRecoveryPending,
    /// Control-plane inputs are still missing.
    MissingControlPlane,
    /// Control-plane inputs are stale.
    StaleControlPlane,
    /// Control-plane inputs are inconsistent.
    DegradedControlPlane,
    /// Current branch carries reorg risk.
    ReorgRisk,
    /// Current branch is still provisional.
    Provisional,
}
370
/// Current toxic-flow safety snapshot exposed by one submit guard source.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxFlowSafetySnapshot {
    /// Coarse quality classification for the current control-plane view.
    pub quality: TxFlowSafetyQuality,
    /// Concrete issues behind `quality`.
    pub issues: Vec<TxFlowSafetyIssue>,
    /// Current upstream state version when known.
    pub current_state_version: Option<u64>,
    /// True when replay recovery is still pending.
    pub replay_recovery_pending: bool,
}
383
384impl TxFlowSafetySnapshot {
385    /// Returns true when the current snapshot is strategy-safe.
386    #[must_use]
387    pub const fn is_safe(&self) -> bool {
388        matches!(self.quality, TxFlowSafetyQuality::Stable) && !self.replay_recovery_pending
389    }
390}
391
/// Dynamic source of toxic-flow safety state for submit guards.
///
/// Implementors must be `Send + Sync`.
pub trait TxFlowSafetySource: Send + Sync {
    /// Returns the latest toxic-flow safety snapshot.
    fn toxic_flow_snapshot(&self) -> TxFlowSafetySnapshot;
}
397
/// One key used to suppress repeated submission attempts for the same opportunity.
///
/// `Hash` is implemented by hand for this type rather than derived.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxSubmitSuppressionKey {
    /// Suppress by signature.
    Signature(SignatureBytes),
    /// Suppress by opaque opportunity identifier.
    Opportunity([u8; 32]),
    /// Suppress by hashed account set identifier.
    AccountSet([u8; 32]),
    /// Suppress by slot-window key.
    SlotWindow {
        /// Slot associated with the opportunity.
        slot: u64,
        /// Window width used to group nearby opportunities.
        window: u64,
    },
}
415
416impl Hash for TxSubmitSuppressionKey {
417    fn hash<H: Hasher>(&self, state: &mut H) {
418        match self {
419            Self::Signature(signature) => {
420                0_u8.hash(state);
421                signature.as_array().hash(state);
422            }
423            Self::Opportunity(key) => {
424                1_u8.hash(state);
425                key.hash(state);
426            }
427            Self::AccountSet(key) => {
428                2_u8.hash(state);
429                key.hash(state);
430            }
431            Self::SlotWindow { slot, window } => {
432                3_u8.hash(state);
433                slot.hash(state);
434                window.hash(state);
435            }
436        }
437    }
438}
439
/// Call-site context used by toxic-flow guards.
///
/// The derived default has no suppression keys and leaves both optional fields unset.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxSubmitContext {
    /// Additional suppression keys to apply before submit.
    pub suppression_keys: Vec<TxSubmitSuppressionKey>,
    /// State version used when the submit decision was made.
    pub decision_state_version: Option<u64>,
    /// Timestamp when the opportunity or decision was created.
    pub opportunity_created_at: Option<SystemTime>,
}
450
/// Policy that decides when the toxic-flow guard rejects a submit attempt.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TxSubmitGuardPolicy {
    /// Reject when the flow-safety source is not `Stable`.
    pub require_stable_control_plane: bool,
    /// Reject when the flow-safety source reports replay recovery pending.
    pub reject_on_replay_recovery_pending: bool,
    /// Maximum allowed drift between decision and current state versions; `None` sets no limit.
    pub max_state_version_drift: Option<u64>,
    /// Maximum allowed age for one opportunity before submit; `None` sets no limit.
    pub max_opportunity_age: Option<Duration>,
    /// TTL applied to built-in suppression keys.
    pub suppression_ttl: Duration,
}

impl Default for TxSubmitGuardPolicy {
    /// Default policy: both control-plane checks enabled, state drift capped at 4, and a
    /// 750 ms window for both opportunity age and suppression TTL.
    fn default() -> Self {
        // The same 750 ms window bounds opportunity age and key suppression.
        let freshness_window = Duration::from_millis(750);
        Self {
            require_stable_control_plane: true,
            reject_on_replay_recovery_pending: true,
            max_state_version_drift: Some(4),
            max_opportunity_age: Some(freshness_window),
            suppression_ttl: freshness_window,
        }
    }
}
477
/// Concrete reason one submit attempt was rejected before transport.
///
/// Carried by `SubmitError::ToxicFlow`.
#[derive(Debug, Error, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxToxicFlowRejectionReason {
    /// Control-plane quality is not safe enough.
    #[error("control-plane quality {quality:?} is not safe for submit")]
    UnsafeControlPlane {
        /// Current quality observed from the guard source.
        quality: TxFlowSafetyQuality,
    },
    /// Replay recovery is still pending.
    #[error("submit source is still recovering replay continuity")]
    ReplayRecoveryPending,
    /// One suppression key is still active.
    #[error("submit suppressed by active key")]
    Suppressed,
    /// State version drift exceeded policy.
    #[error("state version drift {drift} exceeded maximum {max_allowed}")]
    StateDrift {
        /// Observed drift between decision and current state versions.
        drift: u64,
        /// Maximum drift allowed by policy.
        max_allowed: u64,
    },
    /// Opportunity age exceeded policy.
    #[error("opportunity age {age_ms}ms exceeded maximum {max_allowed_ms}ms")]
    OpportunityStale {
        /// Observed age in milliseconds.
        age_ms: u64,
        /// Maximum age allowed in milliseconds.
        max_allowed_ms: u64,
    },
}
510
/// Final or immediate outcome classification for one submit attempt.
///
/// Some kinds bump a counter in `TxToxicFlowTelemetry::record`; the rest are recorded
/// without a dedicated counter.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxSubmitOutcomeKind {
    /// Direct path accepted the transaction.
    DirectAccepted,
    /// RPC path accepted the transaction.
    RpcAccepted,
    /// Jito block-engine accepted the transaction.
    JitoAccepted,
    /// Transaction landed on chain.
    Landed,
    /// Transaction expired before landing.
    Expired,
    /// Transaction was dropped before landing.
    Dropped,
    /// Route missed the intended leader window.
    LeaderMissed,
    /// Submit used a stale blockhash.
    BlockhashStale,
    /// Selected route was unhealthy.
    UnhealthyRoute,
    /// Submit was rejected due to stale inputs.
    RejectedDueToStaleness,
    /// Submit was rejected due to reorg risk.
    RejectedDueToReorgRisk,
    /// Submit was rejected due to state drift.
    RejectedDueToStateDrift,
    /// Submit was rejected because replay recovery was pending.
    RejectedDueToReplayRecovery,
    /// Submit was suppressed by a built-in key.
    Suppressed,
}
543
/// Structured outcome record for toxic-flow telemetry/reporting.
///
/// Passed by reference to [`TxSubmitOutcomeReporter::record_outcome`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxSubmitOutcome {
    /// Outcome classification.
    pub kind: TxSubmitOutcomeKind,
    /// Transaction signature when available.
    pub signature: Option<SignatureBytes>,
    /// Mode selected for the submit attempt.
    pub mode: SubmitMode,
    /// Current state version at outcome time when known.
    pub state_version: Option<u64>,
    /// Opportunity age in milliseconds when known.
    pub opportunity_age_ms: Option<u64>,
}
558
/// Callback surface for external outcome sinks.
///
/// Implementors must be `Send + Sync`; recording takes `&self`.
pub trait TxSubmitOutcomeReporter: Send + Sync {
    /// Records one structured outcome.
    fn record_outcome(&self, outcome: &TxSubmitOutcome);
}
564
/// Snapshot of built-in toxic-flow counters collected by [`TxSubmitClient`](crate::submit::TxSubmitClient).
///
/// A plain-value copy of the counters; the derived default is all zeros with no recorded
/// opportunity age.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxToxicFlowTelemetrySnapshot {
    /// Number of submit attempts rejected due to stale inputs.
    pub rejected_due_to_staleness: u64,
    /// Number of submit attempts rejected due to reorg risk.
    pub rejected_due_to_reorg_risk: u64,
    /// Number of submit attempts rejected due to state drift.
    pub rejected_due_to_state_drift: u64,
    /// Number of accepted submits emitted while the source was stale.
    pub submit_on_stale_blockhash: u64,
    /// Number of route misses classified as leader misses (a running count, despite the
    /// `_rate` suffix).
    pub leader_route_miss_rate: u64,
    /// Most recent opportunity age seen at submit time.
    pub opportunity_age_at_send_ms: Option<u64>,
    /// Number of submits rejected while replay recovery was pending.
    pub rejected_due_to_replay_recovery: u64,
    /// Number of submits suppressed by a built-in key.
    pub suppressed_submissions: u64,
}
585
/// A 64-byte-aligned atomic `u64` counter used by hot telemetry updates.
///
/// The alignment keeps each counter on its own 64-byte boundary.
#[derive(Debug, Default)]
#[repr(align(64))]
struct CacheAlignedAtomicU64(AtomicU64);

impl CacheAlignedAtomicU64 {
    /// Reads the current value with the given memory ordering.
    fn load(&self, ordering: Ordering) -> u64 {
        let Self(counter) = self;
        counter.load(ordering)
    }

    /// Replaces the value with `value` and returns the one that was stored before.
    fn swap(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(counter) = self;
        counter.swap(value, ordering)
    }

    /// Adds `value` to the counter and returns the value held before the addition.
    fn fetch_add(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(counter) = self;
        counter.fetch_add(value, ordering)
    }
}
607
608/// In-memory telemetry counters for toxic-flow outcomes.
609#[derive(Debug, Default)]
610pub struct TxToxicFlowTelemetry {
611    /// Number of stale-input rejections.
612    rejected_due_to_staleness: CacheAlignedAtomicU64,
613    /// Number of reorg-risk rejections.
614    rejected_due_to_reorg_risk: CacheAlignedAtomicU64,
615    /// Number of state-drift rejections.
616    rejected_due_to_state_drift: CacheAlignedAtomicU64,
617    /// Number of accepted submits observed with stale blockhash state.
618    submit_on_stale_blockhash: CacheAlignedAtomicU64,
619    /// Number of leader-route misses.
620    leader_route_miss_rate: CacheAlignedAtomicU64,
621    /// Last opportunity age seen by the client.
622    opportunity_age_at_send_ms: CacheAlignedAtomicU64,
623    /// Number of replay-recovery rejections.
624    rejected_due_to_replay_recovery: CacheAlignedAtomicU64,
625    /// Number of suppressed submissions.
626    suppressed_submissions: CacheAlignedAtomicU64,
627}
628
629impl TxToxicFlowTelemetry {
630    /// Returns a shareable telemetry sink.
631    #[must_use]
632    pub fn shared() -> Arc<Self> {
633        Arc::new(Self::default())
634    }
635
636    /// Records one structured outcome.
637    pub fn record(&self, outcome: &TxSubmitOutcome) {
638        if let Some(age_ms) = outcome.opportunity_age_ms {
639            let _ = self
640                .opportunity_age_at_send_ms
641                .swap(age_ms, Ordering::Relaxed);
642        }
643        match outcome.kind {
644            TxSubmitOutcomeKind::RejectedDueToStaleness => {
645                let _ = self
646                    .rejected_due_to_staleness
647                    .fetch_add(1, Ordering::Relaxed);
648            }
649            TxSubmitOutcomeKind::RejectedDueToReorgRisk => {
650                let _ = self
651                    .rejected_due_to_reorg_risk
652                    .fetch_add(1, Ordering::Relaxed);
653            }
654            TxSubmitOutcomeKind::RejectedDueToStateDrift => {
655                let _ = self
656                    .rejected_due_to_state_drift
657                    .fetch_add(1, Ordering::Relaxed);
658            }
659            TxSubmitOutcomeKind::RejectedDueToReplayRecovery => {
660                let _ = self
661                    .rejected_due_to_replay_recovery
662                    .fetch_add(1, Ordering::Relaxed);
663            }
664            TxSubmitOutcomeKind::Suppressed => {
665                let _ = self.suppressed_submissions.fetch_add(1, Ordering::Relaxed);
666            }
667            TxSubmitOutcomeKind::LeaderMissed => {
668                let _ = self.leader_route_miss_rate.fetch_add(1, Ordering::Relaxed);
669            }
670            TxSubmitOutcomeKind::DirectAccepted
671            | TxSubmitOutcomeKind::RpcAccepted
672            | TxSubmitOutcomeKind::JitoAccepted
673            | TxSubmitOutcomeKind::Landed
674            | TxSubmitOutcomeKind::Expired
675            | TxSubmitOutcomeKind::Dropped
676            | TxSubmitOutcomeKind::UnhealthyRoute => {}
677            TxSubmitOutcomeKind::BlockhashStale => {
678                let _ = self
679                    .submit_on_stale_blockhash
680                    .fetch_add(1, Ordering::Relaxed);
681            }
682        }
683    }
684
685    /// Returns the current telemetry snapshot.
686    #[must_use]
687    pub fn snapshot(&self) -> TxToxicFlowTelemetrySnapshot {
688        let age_ms = self.opportunity_age_at_send_ms.load(Ordering::Relaxed);
689        TxToxicFlowTelemetrySnapshot {
690            rejected_due_to_staleness: self.rejected_due_to_staleness.load(Ordering::Relaxed),
691            rejected_due_to_reorg_risk: self.rejected_due_to_reorg_risk.load(Ordering::Relaxed),
692            rejected_due_to_state_drift: self.rejected_due_to_state_drift.load(Ordering::Relaxed),
693            submit_on_stale_blockhash: self.submit_on_stale_blockhash.load(Ordering::Relaxed),
694            leader_route_miss_rate: self.leader_route_miss_rate.load(Ordering::Relaxed),
695            opportunity_age_at_send_ms: if age_ms == 0 { None } else { Some(age_ms) },
696            rejected_due_to_replay_recovery: self
697                .rejected_due_to_replay_recovery
698                .load(Ordering::Relaxed),
699            suppressed_submissions: self.suppressed_submissions.load(Ordering::Relaxed),
700        }
701    }
702}
703
704impl TxSubmitOutcomeReporter for TxToxicFlowTelemetry {
705    fn record_outcome(&self, outcome: &TxSubmitOutcome) {
706        self.record(outcome);
707    }
708}
709
/// Internal suppression map used by the submit client.
///
/// Maps each suppression key to the time it was inserted.
#[derive(Debug, Default)]
pub(crate) struct TxSuppressionCache {
    /// Active suppression entries keyed by opportunity identity; the value is the insertion time.
    entries: HashMap<TxSubmitSuppressionKey, SystemTime>,
}
716
717impl TxSuppressionCache {
718    /// Returns true when at least one key is still active inside `ttl`.
719    pub(crate) fn is_suppressed(
720        &mut self,
721        keys: &[TxSubmitSuppressionKey],
722        now: SystemTime,
723        ttl: Duration,
724    ) -> bool {
725        self.evict_expired(now, ttl);
726        keys.iter().any(|key| self.entries.contains_key(key))
727    }
728
729    /// Inserts all provided suppression keys with the current timestamp.
730    pub(crate) fn insert_all(&mut self, keys: &[TxSubmitSuppressionKey], now: SystemTime) {
731        for key in keys {
732            let _ = self.entries.insert(key.clone(), now);
733        }
734    }
735
736    /// Removes entries older than the current TTL window.
737    fn evict_expired(&mut self, now: SystemTime, ttl: Duration) {
738        self.entries.retain(|_, inserted_at| {
739            now.duration_since(*inserted_at)
740                .map(|elapsed| elapsed <= ttl)
741                .unwrap_or(false)
742        });
743    }
744}
745
/// RPC transport interface.
///
/// Implementors must be `Send + Sync`; the method is async via `async_trait`.
#[async_trait]
pub trait RpcSubmitTransport: Send + Sync {
    /// Submits transaction bytes to RPC and returns signature string.
    ///
    /// # Errors
    ///
    /// Returns a [`SubmitTransportError`] when the submission fails.
    async fn submit_rpc(
        &self,
        tx_bytes: &[u8],
        config: &RpcSubmitConfig,
    ) -> Result<String, SubmitTransportError>;
}
756
/// Jito transport interface.
///
/// Implementors must be `Send + Sync`; the method is async via `async_trait`.
#[async_trait]
pub trait JitoSubmitTransport: Send + Sync {
    /// Submits transaction bytes to Jito block engine and returns Jito-specific acceptance data.
    ///
    /// # Errors
    ///
    /// Returns a [`SubmitTransportError`] when the submission fails.
    async fn submit_jito(
        &self,
        tx_bytes: &[u8],
        config: &JitoSubmitConfig,
    ) -> Result<JitoSubmitResponse, SubmitTransportError>;
}
767
/// Direct transport interface.
///
/// Implementors must be `Send + Sync`; the method is async via `async_trait`.
#[async_trait]
pub trait DirectSubmitTransport: Send + Sync {
    /// Submits transaction bytes to direct targets and returns the first successful target.
    ///
    /// # Errors
    ///
    /// Returns a [`SubmitTransportError`] when the submission fails.
    async fn submit_direct(
        &self,
        tx_bytes: &[u8],
        targets: &[LeaderTarget],
        policy: RoutingPolicy,
        config: &DirectSubmitConfig,
    ) -> Result<LeaderTarget, SubmitTransportError>;
}