Skip to main content

sof_tx/submit/
types.rs

1//! Shared submission types, errors, and transport traits.
2
3use std::{
4    collections::HashMap,
5    hash::{Hash, Hasher},
6    sync::{
7        Arc,
8        atomic::{AtomicU64, Ordering},
9    },
10    time::{Duration, SystemTime},
11};
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use solana_signature::Signature;
16use thiserror::Error;
17
18use crate::{builder::BuilderError, providers::LeaderTarget, routing::RoutingPolicy};
19
20/// Runtime submit mode.
21#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
22pub enum SubmitMode {
23    /// Submit only through JSON-RPC.
24    RpcOnly,
25    /// Submit only through direct leader/validator targets.
26    DirectOnly,
27    /// Submit direct first, then RPC fallback on failure.
28    Hybrid,
29}
30
31/// Reliability profile for direct and hybrid submission behavior.
32#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
33pub enum SubmitReliability {
34    /// Fastest path with minimal retrying.
35    LowLatency,
36    /// Balanced latency and retry behavior.
37    #[default]
38    Balanced,
39    /// Aggressive retrying before giving up.
40    HighReliability,
41}
42
/// Already-signed transaction payloads that the submit APIs accept.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SignedTx {
    /// Bytes of a bincode-serialized `VersionedTransaction`.
    VersionedTransactionBytes(Vec<u8>),
    /// Transaction bytes in wire format.
    WireTransactionBytes(Vec<u8>),
}
51
/// Knobs for the RPC submission path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcSubmitConfig {
    /// When `true`, preflight simulation is skipped.
    pub skip_preflight: bool,
    /// Commitment string to use for preflight, if any.
    pub preflight_commitment: Option<String>,
}

impl Default for RpcSubmitConfig {
    /// Defaults to skipping preflight with no preflight commitment set.
    fn default() -> Self {
        let skip_preflight = true;
        let preflight_commitment = None;
        Self {
            skip_preflight,
            preflight_commitment,
        }
    }
}
69
/// Knobs for the direct (leader/validator) submission path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DirectSubmitConfig {
    /// Send timeout applied to each individual target.
    pub per_target_timeout: Duration,
    /// Overall send budget for a single submission.
    pub global_timeout: Duration,
    /// How many rounds to iterate over the selected direct targets.
    pub direct_target_rounds: usize,
    /// How many direct-only submit attempts to make (target selection can refresh per attempt).
    pub direct_submit_attempts: usize,
    /// How many direct attempts `Hybrid` mode makes before falling back to RPC.
    pub hybrid_direct_attempts: usize,
    /// Pause between direct rebroadcast attempts/rounds (Agave-like pacing).
    pub rebroadcast_interval: Duration,
    /// Turns on Agave-style post-ack rebroadcast persistence for direct submits.
    pub agave_rebroadcast_enabled: bool,
    /// Upper bound on the time spent on background rebroadcast persistence.
    pub agave_rebroadcast_window: Duration,
    /// Pause between background rebroadcast cycles.
    pub agave_rebroadcast_interval: Duration,
    /// When `true`, `Hybrid` mode also broadcasts to RPC after a successful direct send.
    pub hybrid_rpc_broadcast: bool,
    /// Turns on latency-aware ordering of direct targets before submit.
    pub latency_aware_targeting: bool,
    /// Timeout for each per-target TCP latency probe.
    pub latency_probe_timeout: Duration,
    /// Extra TCP port to probe besides the target TPU port, if any.
    pub latency_probe_port: Option<u16>,
    /// Cap on the number of targets probed per submission.
    pub latency_probe_max_targets: usize,
}
102
103impl DirectSubmitConfig {
104    /// Builds a direct-submit config from a reliability profile.
105    #[must_use]
106    pub const fn from_reliability(reliability: SubmitReliability) -> Self {
107        match reliability {
108            SubmitReliability::LowLatency => Self {
109                per_target_timeout: Duration::from_millis(200),
110                global_timeout: Duration::from_millis(1_200),
111                direct_target_rounds: 3,
112                direct_submit_attempts: 3,
113                hybrid_direct_attempts: 2,
114                rebroadcast_interval: Duration::from_millis(90),
115                agave_rebroadcast_enabled: true,
116                agave_rebroadcast_window: Duration::from_secs(30),
117                agave_rebroadcast_interval: Duration::from_millis(700),
118                hybrid_rpc_broadcast: false,
119                latency_aware_targeting: true,
120                latency_probe_timeout: Duration::from_millis(80),
121                latency_probe_port: Some(8899),
122                latency_probe_max_targets: 128,
123            },
124            SubmitReliability::Balanced => Self {
125                per_target_timeout: Duration::from_millis(300),
126                global_timeout: Duration::from_millis(1_800),
127                direct_target_rounds: 4,
128                direct_submit_attempts: 4,
129                hybrid_direct_attempts: 3,
130                rebroadcast_interval: Duration::from_millis(110),
131                agave_rebroadcast_enabled: true,
132                agave_rebroadcast_window: Duration::from_secs(45),
133                agave_rebroadcast_interval: Duration::from_millis(800),
134                hybrid_rpc_broadcast: true,
135                latency_aware_targeting: true,
136                latency_probe_timeout: Duration::from_millis(120),
137                latency_probe_port: Some(8899),
138                latency_probe_max_targets: 128,
139            },
140            SubmitReliability::HighReliability => Self {
141                per_target_timeout: Duration::from_millis(450),
142                global_timeout: Duration::from_millis(3_200),
143                direct_target_rounds: 6,
144                direct_submit_attempts: 5,
145                hybrid_direct_attempts: 4,
146                rebroadcast_interval: Duration::from_millis(140),
147                agave_rebroadcast_enabled: true,
148                agave_rebroadcast_window: Duration::from_secs(70),
149                agave_rebroadcast_interval: Duration::from_millis(900),
150                hybrid_rpc_broadcast: true,
151                latency_aware_targeting: true,
152                latency_probe_timeout: Duration::from_millis(160),
153                latency_probe_port: Some(8899),
154                latency_probe_max_targets: 128,
155            },
156        }
157    }
158
159    /// Returns this config with minimum valid retry counters.
160    #[must_use]
161    pub const fn normalized(self) -> Self {
162        let direct_target_rounds = if self.direct_target_rounds == 0 {
163            1
164        } else {
165            self.direct_target_rounds
166        };
167        let direct_submit_attempts = if self.direct_submit_attempts == 0 {
168            1
169        } else {
170            self.direct_submit_attempts
171        };
172        let hybrid_direct_attempts = if self.hybrid_direct_attempts == 0 {
173            1
174        } else {
175            self.hybrid_direct_attempts
176        };
177        let latency_probe_max_targets = if self.latency_probe_max_targets == 0 {
178            1
179        } else {
180            self.latency_probe_max_targets
181        };
182        let rebroadcast_interval = if self.rebroadcast_interval.is_zero() {
183            Duration::from_millis(1)
184        } else {
185            self.rebroadcast_interval
186        };
187        let agave_rebroadcast_interval = if self.agave_rebroadcast_interval.is_zero() {
188            Duration::from_millis(1)
189        } else {
190            self.agave_rebroadcast_interval
191        };
192        Self {
193            per_target_timeout: self.per_target_timeout,
194            global_timeout: self.global_timeout,
195            direct_target_rounds,
196            direct_submit_attempts,
197            hybrid_direct_attempts,
198            rebroadcast_interval,
199            agave_rebroadcast_enabled: self.agave_rebroadcast_enabled,
200            agave_rebroadcast_window: self.agave_rebroadcast_window,
201            agave_rebroadcast_interval,
202            hybrid_rpc_broadcast: self.hybrid_rpc_broadcast,
203            latency_aware_targeting: self.latency_aware_targeting,
204            latency_probe_timeout: self.latency_probe_timeout,
205            latency_probe_port: self.latency_probe_port,
206            latency_probe_max_targets,
207        }
208    }
209}
210
211impl Default for DirectSubmitConfig {
212    fn default() -> Self {
213        Self::from_reliability(SubmitReliability::default())
214    }
215}
216
217/// Low-level transport errors surfaced by submit backends.
218#[derive(Debug, Error, Clone, Eq, PartialEq)]
219pub enum SubmitTransportError {
220    /// Invalid transport configuration.
221    #[error("transport configuration invalid: {message}")]
222    Config {
223        /// Human-readable description.
224        message: String,
225    },
226    /// Transport operation failed.
227    #[error("transport failure: {message}")]
228    Failure {
229        /// Human-readable description.
230        message: String,
231    },
232}
233
234/// Submission-level errors.
235#[derive(Debug, Error)]
236pub enum SubmitError {
237    /// Could not build/sign transaction for builder submit path.
238    #[error("failed to build/sign transaction: {source}")]
239    Build {
240        /// Builder-layer failure.
241        source: BuilderError,
242    },
243    /// No blockhash available for builder submit path.
244    #[error("blockhash provider returned no recent blockhash")]
245    MissingRecentBlockhash,
246    /// Signed bytes could not be decoded into a transaction.
247    #[error("failed to decode signed transaction bytes: {source}")]
248    DecodeSignedBytes {
249        /// Bincode decode error.
250        source: Box<bincode::ErrorKind>,
251    },
252    /// Duplicate signature was suppressed by dedupe window.
253    #[error("duplicate signature suppressed by dedupe window")]
254    DuplicateSignature,
255    /// RPC mode requested but no RPC transport was configured.
256    #[error("rpc transport is not configured")]
257    MissingRpcTransport,
258    /// Direct mode requested but no direct transport was configured.
259    #[error("direct transport is not configured")]
260    MissingDirectTransport,
261    /// No direct targets resolved from routing inputs.
262    #[error("no direct targets resolved from leader/backups")]
263    NoDirectTargets,
264    /// Direct transport failure.
265    #[error("direct submit failed: {source}")]
266    Direct {
267        /// Direct transport error.
268        source: SubmitTransportError,
269    },
270    /// RPC transport failure.
271    #[error("rpc submit failed: {source}")]
272    Rpc {
273        /// RPC transport error.
274        source: SubmitTransportError,
275    },
276    /// Internal synchronization failure.
277    #[error("internal synchronization failure: {message}")]
278    InternalSync {
279        /// Synchronization error details.
280        message: String,
281    },
282    /// Submit attempt was rejected by the toxic-flow guard.
283    #[error("submission rejected by toxic-flow guard: {reason}")]
284    ToxicFlow {
285        /// Structured reason for the rejection.
286        reason: TxToxicFlowRejectionReason,
287    },
288}
289
290/// Summary of a successful submission.
291#[derive(Debug, Clone, Eq, PartialEq)]
292pub struct SubmitResult {
293    /// Signature parsed from submitted transaction bytes.
294    pub signature: Option<Signature>,
295    /// Mode selected by caller.
296    pub mode: SubmitMode,
297    /// Target chosen by direct path when applicable.
298    pub direct_target: Option<LeaderTarget>,
299    /// RPC-returned signature string when RPC path succeeded.
300    pub rpc_signature: Option<String>,
301    /// True when RPC fallback was used from hybrid mode.
302    pub used_rpc_fallback: bool,
303    /// Number of direct targets selected for submit attempt that succeeded.
304    pub selected_target_count: usize,
305    /// Number of unique validator identities in selected direct targets.
306    pub selected_identity_count: usize,
307}
308
309/// Coarse toxic-flow quality used by submit guards.
310#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
311pub enum TxFlowSafetyQuality {
312    /// Required inputs are present, coherent, and safe to use.
313    Stable,
314    /// Inputs exist but have not reached a stable confirmation boundary yet.
315    Provisional,
316    /// Inputs exist but the current branch still carries material reorg risk.
317    ReorgRisk,
318    /// Inputs are present but stale.
319    Stale,
320    /// Inputs are present but mutually inconsistent.
321    Degraded,
322    /// Required inputs are still missing.
323    IncompleteControlPlane,
324}
325
326/// One concrete toxic-flow issue reported by a submit guard source.
327#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
328pub enum TxFlowSafetyIssue {
329    /// Submit source is still recovering from replay/checkpoint continuity.
330    ReplayRecoveryPending,
331    /// Control-plane inputs are still missing.
332    MissingControlPlane,
333    /// Control-plane inputs are stale.
334    StaleControlPlane,
335    /// Control-plane inputs are inconsistent.
336    DegradedControlPlane,
337    /// Current branch carries reorg risk.
338    ReorgRisk,
339    /// Current branch is still provisional.
340    Provisional,
341}
342
343/// Current toxic-flow safety snapshot exposed by one submit guard source.
344#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
345pub struct TxFlowSafetySnapshot {
346    /// Coarse quality classification for the current control-plane view.
347    pub quality: TxFlowSafetyQuality,
348    /// Concrete issues behind `quality`.
349    pub issues: Vec<TxFlowSafetyIssue>,
350    /// Current upstream state version when known.
351    pub current_state_version: Option<u64>,
352    /// True when replay recovery is still pending.
353    pub replay_recovery_pending: bool,
354}
355
356impl TxFlowSafetySnapshot {
357    /// Returns true when the current snapshot is strategy-safe.
358    #[must_use]
359    pub const fn is_safe(&self) -> bool {
360        matches!(self.quality, TxFlowSafetyQuality::Stable) && !self.replay_recovery_pending
361    }
362}
363
364/// Dynamic source of toxic-flow safety state for submit guards.
365pub trait TxFlowSafetySource: Send + Sync {
366    /// Returns the latest toxic-flow safety snapshot.
367    fn toxic_flow_snapshot(&self) -> TxFlowSafetySnapshot;
368}
369
370/// One key used to suppress repeated submission attempts for the same opportunity.
371#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
372pub enum TxSubmitSuppressionKey {
373    /// Suppress by signature.
374    Signature(Signature),
375    /// Suppress by opaque opportunity identifier.
376    Opportunity([u8; 32]),
377    /// Suppress by hashed account set identifier.
378    AccountSet([u8; 32]),
379    /// Suppress by slot-window key.
380    SlotWindow {
381        /// Slot associated with the opportunity.
382        slot: u64,
383        /// Window width used to group nearby opportunities.
384        window: u64,
385    },
386}
387
388impl Hash for TxSubmitSuppressionKey {
389    fn hash<H: Hasher>(&self, state: &mut H) {
390        match self {
391            Self::Signature(signature) => {
392                0_u8.hash(state);
393                signature.as_array().hash(state);
394            }
395            Self::Opportunity(key) => {
396                1_u8.hash(state);
397                key.hash(state);
398            }
399            Self::AccountSet(key) => {
400                2_u8.hash(state);
401                key.hash(state);
402            }
403            Self::SlotWindow { slot, window } => {
404                3_u8.hash(state);
405                slot.hash(state);
406                window.hash(state);
407            }
408        }
409    }
410}
411
412/// Call-site context used by toxic-flow guards.
413#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
414pub struct TxSubmitContext {
415    /// Additional suppression keys to apply before submit.
416    pub suppression_keys: Vec<TxSubmitSuppressionKey>,
417    /// State version used when the submit decision was made.
418    pub decision_state_version: Option<u64>,
419    /// Timestamp when the opportunity or decision was created.
420    pub opportunity_created_at: Option<SystemTime>,
421}
422
/// Rules that decide when the toxic-flow guard rejects a submit.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TxSubmitGuardPolicy {
    /// Reject unless the flow-safety source reports `Stable`.
    pub require_stable_control_plane: bool,
    /// Reject while the flow-safety source reports replay recovery as pending.
    pub reject_on_replay_recovery_pending: bool,
    /// Largest tolerated gap between the decision and current state versions.
    pub max_state_version_drift: Option<u64>,
    /// Largest tolerated age of an opportunity at submit time.
    pub max_opportunity_age: Option<Duration>,
    /// Time-to-live of built-in suppression keys.
    pub suppression_ttl: Duration,
}

impl Default for TxSubmitGuardPolicy {
    /// Enables both rejection flags, allows a state drift of 4, and uses 750 ms for both
    /// the maximum opportunity age and the suppression TTL.
    fn default() -> Self {
        let window = Duration::from_millis(750);
        Self {
            require_stable_control_plane: true,
            reject_on_replay_recovery_pending: true,
            max_state_version_drift: Some(4),
            max_opportunity_age: Some(window),
            suppression_ttl: window,
        }
    }
}
449
450/// Concrete reason one submit attempt was rejected before transport.
451#[derive(Debug, Error, Clone, Eq, PartialEq, Serialize, Deserialize)]
452pub enum TxToxicFlowRejectionReason {
453    /// Control-plane quality is not safe enough.
454    #[error("control-plane quality {quality:?} is not safe for submit")]
455    UnsafeControlPlane {
456        /// Current quality observed from the guard source.
457        quality: TxFlowSafetyQuality,
458    },
459    /// Replay recovery is still pending.
460    #[error("submit source is still recovering replay continuity")]
461    ReplayRecoveryPending,
462    /// One suppression key is still active.
463    #[error("submit suppressed by active key")]
464    Suppressed,
465    /// State version drift exceeded policy.
466    #[error("state version drift {drift} exceeded maximum {max_allowed}")]
467    StateDrift {
468        /// Observed drift between decision and current state versions.
469        drift: u64,
470        /// Maximum drift allowed by policy.
471        max_allowed: u64,
472    },
473    /// Opportunity age exceeded policy.
474    #[error("opportunity age {age_ms}ms exceeded maximum {max_allowed_ms}ms")]
475    OpportunityStale {
476        /// Observed age in milliseconds.
477        age_ms: u64,
478        /// Maximum age allowed in milliseconds.
479        max_allowed_ms: u64,
480    },
481}
482
483/// Final or immediate outcome classification for one submit attempt.
484#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
485pub enum TxSubmitOutcomeKind {
486    /// Direct path accepted the transaction.
487    DirectAccepted,
488    /// RPC path accepted the transaction.
489    RpcAccepted,
490    /// Transaction landed on chain.
491    Landed,
492    /// Transaction expired before landing.
493    Expired,
494    /// Transaction was dropped before landing.
495    Dropped,
496    /// Route missed the intended leader window.
497    LeaderMissed,
498    /// Submit used a stale blockhash.
499    BlockhashStale,
500    /// Selected route was unhealthy.
501    UnhealthyRoute,
502    /// Submit was rejected due to stale inputs.
503    RejectedDueToStaleness,
504    /// Submit was rejected due to reorg risk.
505    RejectedDueToReorgRisk,
506    /// Submit was rejected due to state drift.
507    RejectedDueToStateDrift,
508    /// Submit was rejected by replay recovery pending.
509    RejectedDueToReplayRecovery,
510    /// Submit was suppressed by a built-in key.
511    Suppressed,
512}
513
514/// Structured outcome record for toxic-flow telemetry/reporting.
515#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
516pub struct TxSubmitOutcome {
517    /// Outcome classification.
518    pub kind: TxSubmitOutcomeKind,
519    /// Transaction signature when available.
520    pub signature: Option<Signature>,
521    /// Mode selected for the submit attempt.
522    pub mode: SubmitMode,
523    /// Current state version at outcome time when known.
524    pub state_version: Option<u64>,
525    /// Opportunity age in milliseconds when known.
526    pub opportunity_age_ms: Option<u64>,
527}
528
529/// Callback surface for external outcome sinks.
530pub trait TxSubmitOutcomeReporter: Send + Sync {
531    /// Records one structured outcome.
532    fn record_outcome(&self, outcome: &TxSubmitOutcome);
533}
534
535/// Snapshot of built-in toxic-flow counters collected by [`TxSubmitClient`](crate::submit::TxSubmitClient).
536#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
537pub struct TxToxicFlowTelemetrySnapshot {
538    /// Number of submit attempts rejected due to stale inputs.
539    pub rejected_due_to_staleness: u64,
540    /// Number of submit attempts rejected due to reorg risk.
541    pub rejected_due_to_reorg_risk: u64,
542    /// Number of submit attempts rejected due to state drift.
543    pub rejected_due_to_state_drift: u64,
544    /// Number of accepted submits emitted while the source was stale.
545    pub submit_on_stale_blockhash: u64,
546    /// Number of route misses classified as leader misses.
547    pub leader_route_miss_rate: u64,
548    /// Most recent opportunity age seen at submit time.
549    pub opportunity_age_at_send_ms: Option<u64>,
550    /// Number of submits rejected while replay recovery was pending.
551    pub rejected_due_to_replay_recovery: u64,
552    /// Number of submits suppressed by a built-in key.
553    pub suppressed_submissions: u64,
554}
555
556/// In-memory telemetry counters for toxic-flow outcomes.
557#[derive(Debug, Default)]
558pub struct TxToxicFlowTelemetry {
559    /// Number of stale-input rejections.
560    rejected_due_to_staleness: AtomicU64,
561    /// Number of reorg-risk rejections.
562    rejected_due_to_reorg_risk: AtomicU64,
563    /// Number of state-drift rejections.
564    rejected_due_to_state_drift: AtomicU64,
565    /// Number of accepted submits observed with stale blockhash state.
566    submit_on_stale_blockhash: AtomicU64,
567    /// Number of leader-route misses.
568    leader_route_miss_rate: AtomicU64,
569    /// Last opportunity age seen by the client.
570    opportunity_age_at_send_ms: AtomicU64,
571    /// Number of replay-recovery rejections.
572    rejected_due_to_replay_recovery: AtomicU64,
573    /// Number of suppressed submissions.
574    suppressed_submissions: AtomicU64,
575}
576
577impl TxToxicFlowTelemetry {
578    /// Returns a shareable telemetry sink.
579    #[must_use]
580    pub fn shared() -> Arc<Self> {
581        Arc::new(Self::default())
582    }
583
584    /// Records one structured outcome.
585    pub fn record(&self, outcome: &TxSubmitOutcome) {
586        if let Some(age_ms) = outcome.opportunity_age_ms {
587            let _ = self
588                .opportunity_age_at_send_ms
589                .swap(age_ms, Ordering::Relaxed);
590        }
591        match outcome.kind {
592            TxSubmitOutcomeKind::RejectedDueToStaleness => {
593                let _ = self
594                    .rejected_due_to_staleness
595                    .fetch_add(1, Ordering::Relaxed);
596            }
597            TxSubmitOutcomeKind::RejectedDueToReorgRisk => {
598                let _ = self
599                    .rejected_due_to_reorg_risk
600                    .fetch_add(1, Ordering::Relaxed);
601            }
602            TxSubmitOutcomeKind::RejectedDueToStateDrift => {
603                let _ = self
604                    .rejected_due_to_state_drift
605                    .fetch_add(1, Ordering::Relaxed);
606            }
607            TxSubmitOutcomeKind::RejectedDueToReplayRecovery => {
608                let _ = self
609                    .rejected_due_to_replay_recovery
610                    .fetch_add(1, Ordering::Relaxed);
611            }
612            TxSubmitOutcomeKind::Suppressed => {
613                let _ = self.suppressed_submissions.fetch_add(1, Ordering::Relaxed);
614            }
615            TxSubmitOutcomeKind::LeaderMissed => {
616                let _ = self.leader_route_miss_rate.fetch_add(1, Ordering::Relaxed);
617            }
618            TxSubmitOutcomeKind::DirectAccepted
619            | TxSubmitOutcomeKind::RpcAccepted
620            | TxSubmitOutcomeKind::Landed
621            | TxSubmitOutcomeKind::Expired
622            | TxSubmitOutcomeKind::Dropped
623            | TxSubmitOutcomeKind::UnhealthyRoute => {}
624            TxSubmitOutcomeKind::BlockhashStale => {
625                let _ = self
626                    .submit_on_stale_blockhash
627                    .fetch_add(1, Ordering::Relaxed);
628            }
629        }
630    }
631
632    /// Returns the current telemetry snapshot.
633    #[must_use]
634    pub fn snapshot(&self) -> TxToxicFlowTelemetrySnapshot {
635        let age_ms = self.opportunity_age_at_send_ms.load(Ordering::Relaxed);
636        TxToxicFlowTelemetrySnapshot {
637            rejected_due_to_staleness: self.rejected_due_to_staleness.load(Ordering::Relaxed),
638            rejected_due_to_reorg_risk: self.rejected_due_to_reorg_risk.load(Ordering::Relaxed),
639            rejected_due_to_state_drift: self.rejected_due_to_state_drift.load(Ordering::Relaxed),
640            submit_on_stale_blockhash: self.submit_on_stale_blockhash.load(Ordering::Relaxed),
641            leader_route_miss_rate: self.leader_route_miss_rate.load(Ordering::Relaxed),
642            opportunity_age_at_send_ms: if age_ms == 0 { None } else { Some(age_ms) },
643            rejected_due_to_replay_recovery: self
644                .rejected_due_to_replay_recovery
645                .load(Ordering::Relaxed),
646            suppressed_submissions: self.suppressed_submissions.load(Ordering::Relaxed),
647        }
648    }
649}
650
651impl TxSubmitOutcomeReporter for TxToxicFlowTelemetry {
652    fn record_outcome(&self, outcome: &TxSubmitOutcome) {
653        self.record(outcome);
654    }
655}
656
657/// Internal suppression map used by the submit client.
658#[derive(Debug, Default)]
659pub(crate) struct TxSuppressionCache {
660    /// Active suppression entries keyed by opportunity identity.
661    entries: HashMap<TxSubmitSuppressionKey, SystemTime>,
662}
663
664impl TxSuppressionCache {
665    /// Returns true when at least one key is still active inside `ttl`.
666    pub(crate) fn is_suppressed(
667        &mut self,
668        keys: &[TxSubmitSuppressionKey],
669        now: SystemTime,
670        ttl: Duration,
671    ) -> bool {
672        self.evict_expired(now, ttl);
673        keys.iter().any(|key| self.entries.contains_key(key))
674    }
675
676    /// Inserts all provided suppression keys with the current timestamp.
677    pub(crate) fn insert_all(&mut self, keys: &[TxSubmitSuppressionKey], now: SystemTime) {
678        for key in keys {
679            let _ = self.entries.insert(key.clone(), now);
680        }
681    }
682
683    /// Removes entries older than the current TTL window.
684    fn evict_expired(&mut self, now: SystemTime, ttl: Duration) {
685        self.entries.retain(|_, inserted_at| {
686            now.duration_since(*inserted_at)
687                .map(|elapsed| elapsed <= ttl)
688                .unwrap_or(false)
689        });
690    }
691}
692
693/// RPC transport interface.
694#[async_trait]
695pub trait RpcSubmitTransport: Send + Sync {
696    /// Submits transaction bytes to RPC and returns signature string.
697    async fn submit_rpc(
698        &self,
699        tx_bytes: &[u8],
700        config: &RpcSubmitConfig,
701    ) -> Result<String, SubmitTransportError>;
702}
703
704/// Direct transport interface.
705#[async_trait]
706pub trait DirectSubmitTransport: Send + Sync {
707    /// Submits transaction bytes to direct targets and returns the first successful target.
708    async fn submit_direct(
709        &self,
710        tx_bytes: &[u8],
711        targets: &[LeaderTarget],
712        policy: RoutingPolicy,
713        config: &DirectSubmitConfig,
714    ) -> Result<LeaderTarget, SubmitTransportError>;
715}