Skip to main content

sof_tx/submit/
types.rs

1//! Shared submission types, errors, and transport traits.
2
3use std::{
4    collections::HashMap,
5    hash::{Hash, Hasher},
6    sync::{
7        Arc,
8        atomic::{AtomicU64, Ordering},
9    },
10    time::{Duration, SystemTime},
11};
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use solana_signature::Signature;
16use thiserror::Error;
17
18use crate::{builder::BuilderError, providers::LeaderTarget, routing::RoutingPolicy};
19
/// Runtime submit mode.
///
/// Selects which transport path a submission is routed through. The mode chosen by the caller
/// is reported back in [`SubmitResult::mode`] and [`TxSubmitOutcome::mode`].
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum SubmitMode {
    /// Submit only through JSON-RPC.
    RpcOnly,
    /// Submit only through Jito block engine.
    JitoOnly,
    /// Submit only through direct leader/validator targets.
    DirectOnly,
    /// Submit direct first, then RPC fallback on failure.
    ///
    /// Tuned by [`DirectSubmitConfig::hybrid_direct_attempts`] and
    /// [`DirectSubmitConfig::hybrid_rpc_broadcast`].
    Hybrid,
}
32
/// Reliability profile for direct and hybrid submission behavior.
///
/// Each profile maps to one preset of timeouts and retry counters through
/// [`DirectSubmitConfig::from_reliability`]. [`SubmitReliability::Balanced`] is the default.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
pub enum SubmitReliability {
    /// Fastest path with minimal retrying.
    LowLatency,
    /// Balanced latency and retry behavior.
    #[default]
    Balanced,
    /// Aggressive retrying before giving up.
    HighReliability,
}
44
/// Signed transaction payload variants accepted by submit APIs.
///
/// Both variants own their byte buffer; the variant tells the submit path how the bytes are
/// encoded.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum SignedTx {
    /// Bincode-serialized `VersionedTransaction` bytes.
    VersionedTransactionBytes(Vec<u8>),
    /// Wire-format transaction bytes.
    WireTransactionBytes(Vec<u8>),
}
53
/// Tuning knobs for the RPC submit path.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RpcSubmitConfig {
    /// When `true`, preflight simulation is skipped.
    pub skip_preflight: bool,
    /// Preflight commitment, as a string, when one should be sent.
    pub preflight_commitment: Option<String>,
}

impl Default for RpcSubmitConfig {
    /// Skips preflight and sets no explicit preflight commitment.
    fn default() -> Self {
        let skip_preflight = true;
        let preflight_commitment = None;
        Self {
            skip_preflight,
            preflight_commitment,
        }
    }
}
71
/// Jito block-engine submit tuning.
///
/// The derived `Default` leaves `bundle_only` disabled.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct JitoSubmitConfig {
    /// Enables revert protection by sending through bundle-only mode.
    ///
    /// This applies to the JSON-RPC transaction path. The gRPC transport is bundle-based and
    /// therefore always behaves as a bundle submission.
    pub bundle_only: bool,
}
81
/// Successful Jito submission metadata.
///
/// Returned by [`JitoSubmitTransport::submit_jito`]. Both fields are optional because each one
/// is tied to a different Jito transport (JSON-RPC vs gRPC).
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct JitoSubmitResponse {
    /// Jito JSON-RPC returned transaction signature, when available.
    pub transaction_signature: Option<String>,
    /// Jito gRPC returned bundle UUID, when available.
    pub bundle_id: Option<String>,
}
90
/// Direct submit tuning.
///
/// Presets for each [`SubmitReliability`] profile are available through
/// [`DirectSubmitConfig::from_reliability`]. Hand-built values can be passed through
/// [`DirectSubmitConfig::normalized`], which raises zero retry counters and zero rebroadcast
/// intervals to their minimum valid values.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DirectSubmitConfig {
    /// Per-target send timeout.
    pub per_target_timeout: Duration,
    /// Global send budget for one submission.
    pub global_timeout: Duration,
    /// Number of rounds to iterate across selected direct targets.
    ///
    /// `normalized` raises `0` to `1`.
    pub direct_target_rounds: usize,
    /// Number of direct-only submit attempts (target selection can refresh per attempt).
    ///
    /// `normalized` raises `0` to `1`.
    pub direct_submit_attempts: usize,
    /// Number of direct submit attempts in `Hybrid` mode before RPC fallback.
    ///
    /// `normalized` raises `0` to `1`.
    pub hybrid_direct_attempts: usize,
    /// Delay between direct rebroadcast attempts/rounds (Agave-like pacing).
    ///
    /// `normalized` raises a zero duration to one millisecond.
    pub rebroadcast_interval: Duration,
    /// Enables Agave-style post-ack rebroadcast persistence for direct submits.
    pub agave_rebroadcast_enabled: bool,
    /// Maximum time budget for background rebroadcast persistence.
    pub agave_rebroadcast_window: Duration,
    /// Delay between background rebroadcast cycles.
    ///
    /// `normalized` raises a zero duration to one millisecond.
    pub agave_rebroadcast_interval: Duration,
    /// When true, `Hybrid` mode broadcasts to RPC even after direct send succeeds.
    pub hybrid_rpc_broadcast: bool,
    /// Enables latency-aware ordering of direct targets before submit.
    pub latency_aware_targeting: bool,
    /// Timeout used for per-target TCP latency probes.
    pub latency_probe_timeout: Duration,
    /// Optional extra TCP port to probe (in addition to target TPU port).
    pub latency_probe_port: Option<u16>,
    /// Max number of targets to probe per submission.
    ///
    /// `normalized` raises `0` to `1`.
    pub latency_probe_max_targets: usize,
}
123
124impl DirectSubmitConfig {
125    /// Builds a direct-submit config from a reliability profile.
126    #[must_use]
127    pub const fn from_reliability(reliability: SubmitReliability) -> Self {
128        match reliability {
129            SubmitReliability::LowLatency => Self {
130                per_target_timeout: Duration::from_millis(200),
131                global_timeout: Duration::from_millis(1_200),
132                direct_target_rounds: 3,
133                direct_submit_attempts: 3,
134                hybrid_direct_attempts: 2,
135                rebroadcast_interval: Duration::from_millis(90),
136                agave_rebroadcast_enabled: true,
137                agave_rebroadcast_window: Duration::from_secs(30),
138                agave_rebroadcast_interval: Duration::from_millis(700),
139                hybrid_rpc_broadcast: false,
140                latency_aware_targeting: true,
141                latency_probe_timeout: Duration::from_millis(80),
142                latency_probe_port: Some(8899),
143                latency_probe_max_targets: 128,
144            },
145            SubmitReliability::Balanced => Self {
146                per_target_timeout: Duration::from_millis(300),
147                global_timeout: Duration::from_millis(1_800),
148                direct_target_rounds: 4,
149                direct_submit_attempts: 4,
150                hybrid_direct_attempts: 3,
151                rebroadcast_interval: Duration::from_millis(110),
152                agave_rebroadcast_enabled: true,
153                agave_rebroadcast_window: Duration::from_secs(45),
154                agave_rebroadcast_interval: Duration::from_millis(800),
155                hybrid_rpc_broadcast: true,
156                latency_aware_targeting: true,
157                latency_probe_timeout: Duration::from_millis(120),
158                latency_probe_port: Some(8899),
159                latency_probe_max_targets: 128,
160            },
161            SubmitReliability::HighReliability => Self {
162                per_target_timeout: Duration::from_millis(450),
163                global_timeout: Duration::from_millis(3_200),
164                direct_target_rounds: 6,
165                direct_submit_attempts: 5,
166                hybrid_direct_attempts: 4,
167                rebroadcast_interval: Duration::from_millis(140),
168                agave_rebroadcast_enabled: true,
169                agave_rebroadcast_window: Duration::from_secs(70),
170                agave_rebroadcast_interval: Duration::from_millis(900),
171                hybrid_rpc_broadcast: true,
172                latency_aware_targeting: true,
173                latency_probe_timeout: Duration::from_millis(160),
174                latency_probe_port: Some(8899),
175                latency_probe_max_targets: 128,
176            },
177        }
178    }
179
180    /// Returns this config with minimum valid retry counters.
181    #[must_use]
182    pub const fn normalized(self) -> Self {
183        let direct_target_rounds = if self.direct_target_rounds == 0 {
184            1
185        } else {
186            self.direct_target_rounds
187        };
188        let direct_submit_attempts = if self.direct_submit_attempts == 0 {
189            1
190        } else {
191            self.direct_submit_attempts
192        };
193        let hybrid_direct_attempts = if self.hybrid_direct_attempts == 0 {
194            1
195        } else {
196            self.hybrid_direct_attempts
197        };
198        let latency_probe_max_targets = if self.latency_probe_max_targets == 0 {
199            1
200        } else {
201            self.latency_probe_max_targets
202        };
203        let rebroadcast_interval = if self.rebroadcast_interval.is_zero() {
204            Duration::from_millis(1)
205        } else {
206            self.rebroadcast_interval
207        };
208        let agave_rebroadcast_interval = if self.agave_rebroadcast_interval.is_zero() {
209            Duration::from_millis(1)
210        } else {
211            self.agave_rebroadcast_interval
212        };
213        Self {
214            per_target_timeout: self.per_target_timeout,
215            global_timeout: self.global_timeout,
216            direct_target_rounds,
217            direct_submit_attempts,
218            hybrid_direct_attempts,
219            rebroadcast_interval,
220            agave_rebroadcast_enabled: self.agave_rebroadcast_enabled,
221            agave_rebroadcast_window: self.agave_rebroadcast_window,
222            agave_rebroadcast_interval,
223            hybrid_rpc_broadcast: self.hybrid_rpc_broadcast,
224            latency_aware_targeting: self.latency_aware_targeting,
225            latency_probe_timeout: self.latency_probe_timeout,
226            latency_probe_port: self.latency_probe_port,
227            latency_probe_max_targets,
228        }
229    }
230}
231
232impl Default for DirectSubmitConfig {
233    fn default() -> Self {
234        Self::from_reliability(SubmitReliability::default())
235    }
236}
237
/// Low-level transport errors surfaced by submit backends.
///
/// This is the error type returned by the transport traits in this module. [`SubmitError`]
/// wraps it in its `Direct`, `Rpc`, and `Jito` variants.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SubmitTransportError {
    /// Invalid transport configuration.
    #[error("transport configuration invalid: {message}")]
    Config {
        /// Human-readable description.
        message: String,
    },
    /// Transport operation failed.
    #[error("transport failure: {message}")]
    Failure {
        /// Human-readable description.
        message: String,
    },
}
254
/// Submission-level errors.
///
/// Covers build/decode failures, missing transports, per-transport failures (each wrapping a
/// [`SubmitTransportError`]), and rejections raised by the toxic-flow guard before any
/// transport is used.
#[derive(Debug, Error)]
pub enum SubmitError {
    /// Could not build/sign transaction for builder submit path.
    #[error("failed to build/sign transaction: {source}")]
    Build {
        /// Builder-layer failure.
        source: BuilderError,
    },
    /// No blockhash available for builder submit path.
    #[error("blockhash provider returned no recent blockhash")]
    MissingRecentBlockhash,
    /// Signed bytes could not be decoded into a transaction.
    #[error("failed to decode signed transaction bytes: {source}")]
    DecodeSignedBytes {
        /// Bincode decode error.
        source: Box<bincode::ErrorKind>,
    },
    /// Duplicate signature was suppressed by dedupe window.
    #[error("duplicate signature suppressed by dedupe window")]
    DuplicateSignature,
    /// RPC mode requested but no RPC transport was configured.
    #[error("rpc transport is not configured")]
    MissingRpcTransport,
    /// Jito mode requested but no Jito transport was configured.
    #[error("jito transport is not configured")]
    MissingJitoTransport,
    /// Direct mode requested but no direct transport was configured.
    #[error("direct transport is not configured")]
    MissingDirectTransport,
    /// No direct targets resolved from routing inputs.
    #[error("no direct targets resolved from leader/backups")]
    NoDirectTargets,
    /// Direct transport failure.
    #[error("direct submit failed: {source}")]
    Direct {
        /// Direct transport error.
        source: SubmitTransportError,
    },
    /// RPC transport failure.
    #[error("rpc submit failed: {source}")]
    Rpc {
        /// RPC transport error.
        source: SubmitTransportError,
    },
    /// Jito transport failure.
    #[error("jito submit failed: {source}")]
    Jito {
        /// Jito transport error.
        source: SubmitTransportError,
    },
    /// Internal synchronization failure.
    #[error("internal synchronization failure: {message}")]
    InternalSync {
        /// Synchronization error details.
        message: String,
    },
    /// Submit attempt was rejected by the toxic-flow guard.
    #[error("submission rejected by toxic-flow guard: {reason}")]
    ToxicFlow {
        /// Structured reason for the rejection.
        reason: TxToxicFlowRejectionReason,
    },
}
319
/// Summary of a successful submission.
///
/// The optional fields are path-specific: each one is populated only when the corresponding
/// transport path (direct, RPC, Jito JSON-RPC, Jito gRPC) produced a value.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SubmitResult {
    /// Signature parsed from submitted transaction bytes.
    pub signature: Option<Signature>,
    /// Mode selected by caller.
    pub mode: SubmitMode,
    /// Target chosen by direct path when applicable.
    pub direct_target: Option<LeaderTarget>,
    /// RPC-returned signature string when RPC path succeeded.
    pub rpc_signature: Option<String>,
    /// Jito block-engine returned transaction signature when the JSON-RPC path succeeded.
    pub jito_signature: Option<String>,
    /// Jito block-engine returned bundle UUID when the gRPC bundle path succeeded.
    pub jito_bundle_id: Option<String>,
    /// True when RPC fallback was used from hybrid mode.
    pub used_rpc_fallback: bool,
    /// Number of direct targets selected for submit attempt that succeeded.
    pub selected_target_count: usize,
    /// Number of unique validator identities in selected direct targets.
    pub selected_identity_count: usize,
}
342
/// Coarse toxic-flow quality used by submit guards.
///
/// Only [`TxFlowSafetyQuality::Stable`] is treated as safe by
/// [`TxFlowSafetySnapshot::is_safe`]; every other variant marks the snapshot unsafe.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxFlowSafetyQuality {
    /// Required inputs are present, coherent, and safe to use.
    Stable,
    /// Inputs exist but have not reached a stable confirmation boundary yet.
    Provisional,
    /// Inputs exist but the current branch still carries material reorg risk.
    ReorgRisk,
    /// Inputs are present but stale.
    Stale,
    /// Inputs are present but mutually inconsistent.
    Degraded,
    /// Required inputs are still missing.
    IncompleteControlPlane,
}
359
/// One concrete toxic-flow issue reported by a submit guard source.
///
/// Issues are carried in [`TxFlowSafetySnapshot::issues`] as the detail behind the coarse
/// [`TxFlowSafetyQuality`] classification.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxFlowSafetyIssue {
    /// Submit source is still recovering from replay/checkpoint continuity.
    ReplayRecoveryPending,
    /// Control-plane inputs are still missing.
    MissingControlPlane,
    /// Control-plane inputs are stale.
    StaleControlPlane,
    /// Control-plane inputs are inconsistent.
    DegradedControlPlane,
    /// Current branch carries reorg risk.
    ReorgRisk,
    /// Current branch is still provisional.
    Provisional,
}
376
/// Current toxic-flow safety snapshot exposed by one submit guard source.
///
/// Produced by [`TxFlowSafetySource::toxic_flow_snapshot`]. Use
/// [`TxFlowSafetySnapshot::is_safe`] for the combined verdict over `quality` and
/// `replay_recovery_pending`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxFlowSafetySnapshot {
    /// Coarse quality classification for the current control-plane view.
    pub quality: TxFlowSafetyQuality,
    /// Concrete issues behind `quality`.
    pub issues: Vec<TxFlowSafetyIssue>,
    /// Current upstream state version when known.
    pub current_state_version: Option<u64>,
    /// True when replay recovery is still pending.
    pub replay_recovery_pending: bool,
}
389
390impl TxFlowSafetySnapshot {
391    /// Returns true when the current snapshot is strategy-safe.
392    #[must_use]
393    pub const fn is_safe(&self) -> bool {
394        matches!(self.quality, TxFlowSafetyQuality::Stable) && !self.replay_recovery_pending
395    }
396}
397
/// Dynamic source of toxic-flow safety state for submit guards.
///
/// Implementations must be `Send + Sync` and return an owned snapshot on every call.
pub trait TxFlowSafetySource: Send + Sync {
    /// Returns the latest toxic-flow safety snapshot.
    fn toxic_flow_snapshot(&self) -> TxFlowSafetySnapshot;
}
403
/// One key used to suppress repeated submission attempts for the same opportunity.
///
/// Keys of different variants never compare equal, even when their payload bytes match (for
/// example `Opportunity` and `AccountSet` with the same 32 bytes).
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxSubmitSuppressionKey {
    /// Suppress by signature.
    Signature(Signature),
    /// Suppress by opaque opportunity identifier.
    Opportunity([u8; 32]),
    /// Suppress by hashed account set identifier.
    AccountSet([u8; 32]),
    /// Suppress by slot-window key.
    SlotWindow {
        /// Slot associated with the opportunity.
        slot: u64,
        /// Window width used to group nearby opportunities.
        window: u64,
    },
}
421
422impl Hash for TxSubmitSuppressionKey {
423    fn hash<H: Hasher>(&self, state: &mut H) {
424        match self {
425            Self::Signature(signature) => {
426                0_u8.hash(state);
427                signature.as_array().hash(state);
428            }
429            Self::Opportunity(key) => {
430                1_u8.hash(state);
431                key.hash(state);
432            }
433            Self::AccountSet(key) => {
434                2_u8.hash(state);
435                key.hash(state);
436            }
437            Self::SlotWindow { slot, window } => {
438                3_u8.hash(state);
439                slot.hash(state);
440                window.hash(state);
441            }
442        }
443    }
444}
445
/// Call-site context used by toxic-flow guards.
///
/// The derived `Default` carries no suppression keys, no decision state version, and no
/// opportunity timestamp.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxSubmitContext {
    /// Additional suppression keys to apply before submit.
    pub suppression_keys: Vec<TxSubmitSuppressionKey>,
    /// State version used when the submit decision was made.
    pub decision_state_version: Option<u64>,
    /// Timestamp when the opportunity or decision was created.
    pub opportunity_created_at: Option<SystemTime>,
}
456
/// Policy that decides when the toxic-flow guard rejects a submit attempt.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TxSubmitGuardPolicy {
    /// When `true`, reject unless the flow-safety source reports `Stable`.
    pub require_stable_control_plane: bool,
    /// When `true`, reject while the flow-safety source reports replay recovery pending.
    pub reject_on_replay_recovery_pending: bool,
    /// Largest tolerated gap between the decision and current state versions, if limited.
    pub max_state_version_drift: Option<u64>,
    /// Largest tolerated opportunity age at submit time, if limited.
    pub max_opportunity_age: Option<Duration>,
    /// Time-to-live applied to built-in suppression keys.
    pub suppression_ttl: Duration,
}

impl Default for TxSubmitGuardPolicy {
    /// Enables both control-plane checks, allows a state drift of up to 4, and uses a 750 ms
    /// window for both the maximum opportunity age and the suppression TTL.
    fn default() -> Self {
        let window = Duration::from_millis(750);
        Self {
            require_stable_control_plane: true,
            reject_on_replay_recovery_pending: true,
            max_state_version_drift: Some(4),
            max_opportunity_age: Some(window),
            suppression_ttl: window,
        }
    }
}
483
/// Concrete reason one submit attempt was rejected before transport.
///
/// Carried by `SubmitError::ToxicFlow`. The `Display` text of each variant comes from its
/// `#[error]` attribute.
#[derive(Debug, Error, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxToxicFlowRejectionReason {
    /// Control-plane quality is not safe enough.
    #[error("control-plane quality {quality:?} is not safe for submit")]
    UnsafeControlPlane {
        /// Current quality observed from the guard source.
        quality: TxFlowSafetyQuality,
    },
    /// Replay recovery is still pending.
    #[error("submit source is still recovering replay continuity")]
    ReplayRecoveryPending,
    /// One suppression key is still active.
    #[error("submit suppressed by active key")]
    Suppressed,
    /// State version drift exceeded policy.
    #[error("state version drift {drift} exceeded maximum {max_allowed}")]
    StateDrift {
        /// Observed drift between decision and current state versions.
        drift: u64,
        /// Maximum drift allowed by policy.
        max_allowed: u64,
    },
    /// Opportunity age exceeded policy.
    #[error("opportunity age {age_ms}ms exceeded maximum {max_allowed_ms}ms")]
    OpportunityStale {
        /// Observed age in milliseconds.
        age_ms: u64,
        /// Maximum age allowed in milliseconds.
        max_allowed_ms: u64,
    },
}
516
/// Final or immediate outcome classification for one submit attempt.
///
/// [`TxToxicFlowTelemetry::record`] increments a counter for the rejection kinds,
/// `Suppressed`, `LeaderMissed`, and `BlockhashStale`; the remaining kinds do not touch any
/// counter there.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum TxSubmitOutcomeKind {
    /// Direct path accepted the transaction.
    DirectAccepted,
    /// RPC path accepted the transaction.
    RpcAccepted,
    /// Jito block-engine accepted the transaction.
    JitoAccepted,
    /// Transaction landed on chain.
    Landed,
    /// Transaction expired before landing.
    Expired,
    /// Transaction was dropped before landing.
    Dropped,
    /// Route missed the intended leader window.
    LeaderMissed,
    /// Submit used a stale blockhash.
    BlockhashStale,
    /// Selected route was unhealthy.
    UnhealthyRoute,
    /// Submit was rejected due to stale inputs.
    RejectedDueToStaleness,
    /// Submit was rejected due to reorg risk.
    RejectedDueToReorgRisk,
    /// Submit was rejected due to state drift.
    RejectedDueToStateDrift,
    /// Submit was rejected by replay recovery pending.
    RejectedDueToReplayRecovery,
    /// Submit was suppressed by a built-in key.
    Suppressed,
}
549
/// Structured outcome record for toxic-flow telemetry/reporting.
///
/// Passed by reference to [`TxSubmitOutcomeReporter::record_outcome`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxSubmitOutcome {
    /// Outcome classification.
    pub kind: TxSubmitOutcomeKind,
    /// Transaction signature when available.
    pub signature: Option<Signature>,
    /// Mode selected for the submit attempt.
    pub mode: SubmitMode,
    /// Current state version at outcome time when known.
    pub state_version: Option<u64>,
    /// Opportunity age in milliseconds when known.
    pub opportunity_age_ms: Option<u64>,
}
564
/// Callback surface for external outcome sinks.
///
/// Implementations must be `Send + Sync`. The method takes `&self`, so any state an
/// implementation updates has to use interior mutability.
pub trait TxSubmitOutcomeReporter: Send + Sync {
    /// Records one structured outcome.
    fn record_outcome(&self, outcome: &TxSubmitOutcome);
}
570
/// Snapshot of built-in toxic-flow counters collected by [`TxSubmitClient`](crate::submit::TxSubmitClient).
///
/// Produced by [`TxToxicFlowTelemetry::snapshot`]. Every `u64` field is a running total of
/// recorded outcomes of the matching [`TxSubmitOutcomeKind`].
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TxToxicFlowTelemetrySnapshot {
    /// Number of submit attempts rejected due to stale inputs.
    pub rejected_due_to_staleness: u64,
    /// Number of submit attempts rejected due to reorg risk.
    pub rejected_due_to_reorg_risk: u64,
    /// Number of submit attempts rejected due to state drift.
    pub rejected_due_to_state_drift: u64,
    /// Number of recorded outcomes classified as [`TxSubmitOutcomeKind::BlockhashStale`].
    pub submit_on_stale_blockhash: u64,
    /// Number of route misses classified as leader misses.
    ///
    /// Despite the field name this is a running count, not a ratio.
    pub leader_route_miss_rate: u64,
    /// Most recent opportunity age recorded, in milliseconds.
    pub opportunity_age_at_send_ms: Option<u64>,
    /// Number of submits rejected while replay recovery was pending.
    pub rejected_due_to_replay_recovery: u64,
    /// Number of submits suppressed by a built-in key.
    pub suppressed_submissions: u64,
}
591
/// Atomic `u64` counter aligned to 64 bytes, used by hot telemetry updates.
#[derive(Debug, Default)]
#[repr(align(64))]
struct CacheAlignedAtomicU64(AtomicU64);

impl CacheAlignedAtomicU64 {
    /// Reads the counter using the given memory ordering.
    fn load(&self, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.load(ordering)
    }

    /// Replaces the counter with `value` and hands back what was stored before.
    fn swap(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.swap(value, ordering)
    }

    /// Adds `value` to the counter and hands back what was stored before the addition.
    fn fetch_add(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.fetch_add(value, ordering)
    }
}
613
614/// In-memory telemetry counters for toxic-flow outcomes.
615#[derive(Debug, Default)]
616pub struct TxToxicFlowTelemetry {
617    /// Number of stale-input rejections.
618    rejected_due_to_staleness: CacheAlignedAtomicU64,
619    /// Number of reorg-risk rejections.
620    rejected_due_to_reorg_risk: CacheAlignedAtomicU64,
621    /// Number of state-drift rejections.
622    rejected_due_to_state_drift: CacheAlignedAtomicU64,
623    /// Number of accepted submits observed with stale blockhash state.
624    submit_on_stale_blockhash: CacheAlignedAtomicU64,
625    /// Number of leader-route misses.
626    leader_route_miss_rate: CacheAlignedAtomicU64,
627    /// Last opportunity age seen by the client.
628    opportunity_age_at_send_ms: CacheAlignedAtomicU64,
629    /// Number of replay-recovery rejections.
630    rejected_due_to_replay_recovery: CacheAlignedAtomicU64,
631    /// Number of suppressed submissions.
632    suppressed_submissions: CacheAlignedAtomicU64,
633}
634
635impl TxToxicFlowTelemetry {
636    /// Returns a shareable telemetry sink.
637    #[must_use]
638    pub fn shared() -> Arc<Self> {
639        Arc::new(Self::default())
640    }
641
642    /// Records one structured outcome.
643    pub fn record(&self, outcome: &TxSubmitOutcome) {
644        if let Some(age_ms) = outcome.opportunity_age_ms {
645            let _ = self
646                .opportunity_age_at_send_ms
647                .swap(age_ms, Ordering::Relaxed);
648        }
649        match outcome.kind {
650            TxSubmitOutcomeKind::RejectedDueToStaleness => {
651                let _ = self
652                    .rejected_due_to_staleness
653                    .fetch_add(1, Ordering::Relaxed);
654            }
655            TxSubmitOutcomeKind::RejectedDueToReorgRisk => {
656                let _ = self
657                    .rejected_due_to_reorg_risk
658                    .fetch_add(1, Ordering::Relaxed);
659            }
660            TxSubmitOutcomeKind::RejectedDueToStateDrift => {
661                let _ = self
662                    .rejected_due_to_state_drift
663                    .fetch_add(1, Ordering::Relaxed);
664            }
665            TxSubmitOutcomeKind::RejectedDueToReplayRecovery => {
666                let _ = self
667                    .rejected_due_to_replay_recovery
668                    .fetch_add(1, Ordering::Relaxed);
669            }
670            TxSubmitOutcomeKind::Suppressed => {
671                let _ = self.suppressed_submissions.fetch_add(1, Ordering::Relaxed);
672            }
673            TxSubmitOutcomeKind::LeaderMissed => {
674                let _ = self.leader_route_miss_rate.fetch_add(1, Ordering::Relaxed);
675            }
676            TxSubmitOutcomeKind::DirectAccepted
677            | TxSubmitOutcomeKind::RpcAccepted
678            | TxSubmitOutcomeKind::JitoAccepted
679            | TxSubmitOutcomeKind::Landed
680            | TxSubmitOutcomeKind::Expired
681            | TxSubmitOutcomeKind::Dropped
682            | TxSubmitOutcomeKind::UnhealthyRoute => {}
683            TxSubmitOutcomeKind::BlockhashStale => {
684                let _ = self
685                    .submit_on_stale_blockhash
686                    .fetch_add(1, Ordering::Relaxed);
687            }
688        }
689    }
690
691    /// Returns the current telemetry snapshot.
692    #[must_use]
693    pub fn snapshot(&self) -> TxToxicFlowTelemetrySnapshot {
694        let age_ms = self.opportunity_age_at_send_ms.load(Ordering::Relaxed);
695        TxToxicFlowTelemetrySnapshot {
696            rejected_due_to_staleness: self.rejected_due_to_staleness.load(Ordering::Relaxed),
697            rejected_due_to_reorg_risk: self.rejected_due_to_reorg_risk.load(Ordering::Relaxed),
698            rejected_due_to_state_drift: self.rejected_due_to_state_drift.load(Ordering::Relaxed),
699            submit_on_stale_blockhash: self.submit_on_stale_blockhash.load(Ordering::Relaxed),
700            leader_route_miss_rate: self.leader_route_miss_rate.load(Ordering::Relaxed),
701            opportunity_age_at_send_ms: if age_ms == 0 { None } else { Some(age_ms) },
702            rejected_due_to_replay_recovery: self
703                .rejected_due_to_replay_recovery
704                .load(Ordering::Relaxed),
705            suppressed_submissions: self.suppressed_submissions.load(Ordering::Relaxed),
706        }
707    }
708}
709
710impl TxSubmitOutcomeReporter for TxToxicFlowTelemetry {
711    fn record_outcome(&self, outcome: &TxSubmitOutcome) {
712        self.record(outcome);
713    }
714}
715
/// Internal suppression map used by the submit client.
///
/// Maps each suppression key to the time it was last inserted. The cache stores no TTL of its
/// own; the caller supplies one on every `is_suppressed` check.
#[derive(Debug, Default)]
pub(crate) struct TxSuppressionCache {
    /// Active suppression entries keyed by opportunity identity.
    entries: HashMap<TxSubmitSuppressionKey, SystemTime>,
}
722
723impl TxSuppressionCache {
724    /// Returns true when at least one key is still active inside `ttl`.
725    pub(crate) fn is_suppressed(
726        &mut self,
727        keys: &[TxSubmitSuppressionKey],
728        now: SystemTime,
729        ttl: Duration,
730    ) -> bool {
731        self.evict_expired(now, ttl);
732        keys.iter().any(|key| self.entries.contains_key(key))
733    }
734
735    /// Inserts all provided suppression keys with the current timestamp.
736    pub(crate) fn insert_all(&mut self, keys: &[TxSubmitSuppressionKey], now: SystemTime) {
737        for key in keys {
738            let _ = self.entries.insert(key.clone(), now);
739        }
740    }
741
742    /// Removes entries older than the current TTL window.
743    fn evict_expired(&mut self, now: SystemTime, ttl: Duration) {
744        self.entries.retain(|_, inserted_at| {
745            now.duration_since(*inserted_at)
746                .map(|elapsed| elapsed <= ttl)
747                .unwrap_or(false)
748        });
749    }
750}
751
/// RPC transport interface.
///
/// Implementations must be `Send + Sync`. The method is made async through `#[async_trait]`.
#[async_trait]
pub trait RpcSubmitTransport: Send + Sync {
    /// Submits transaction bytes to RPC and returns signature string.
    ///
    /// # Errors
    ///
    /// Returns a [`SubmitTransportError`] when the transport reports a failure.
    async fn submit_rpc(
        &self,
        tx_bytes: &[u8],
        config: &RpcSubmitConfig,
    ) -> Result<String, SubmitTransportError>;
}
762
/// Jito transport interface.
///
/// Implementations must be `Send + Sync`. The method is made async through `#[async_trait]`.
#[async_trait]
pub trait JitoSubmitTransport: Send + Sync {
    /// Submits transaction bytes to Jito block engine and returns Jito-specific acceptance data.
    ///
    /// # Errors
    ///
    /// Returns a [`SubmitTransportError`] when the transport reports a failure.
    async fn submit_jito(
        &self,
        tx_bytes: &[u8],
        config: &JitoSubmitConfig,
    ) -> Result<JitoSubmitResponse, SubmitTransportError>;
}
773
/// Direct transport interface.
///
/// Implementations must be `Send + Sync`. The method is made async through `#[async_trait]`.
#[async_trait]
pub trait DirectSubmitTransport: Send + Sync {
    /// Submits transaction bytes to direct targets and returns the first successful target.
    ///
    /// `policy` is passed by value, while `targets` and `config` are borrowed for the call.
    ///
    /// # Errors
    ///
    /// Returns a [`SubmitTransportError`] when the transport reports a failure.
    async fn submit_direct(
        &self,
        tx_bytes: &[u8],
        targets: &[LeaderTarget],
        policy: RoutingPolicy,
        config: &DirectSubmitConfig,
    ) -> Result<LeaderTarget, SubmitTransportError>;
}