Skip to main content

sof_tx/submit/
types.rs

1//! Shared submission types, errors, and transport traits.
2
3use std::{
4    collections::HashMap,
5    hash::{Hash, Hasher},
6    sync::{
7        Arc,
8        atomic::{AtomicU64, Ordering},
9    },
10    time::{Duration, SystemTime},
11};
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use sof_types::SignatureBytes;
16use thiserror::Error;
17
18use crate::{providers::LeaderTarget, routing::RoutingPolicy};
19
20/// Legacy runtime submit presets.
21#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
22pub enum SubmitMode {
23    /// Submit only through JSON-RPC.
24    RpcOnly,
25    /// Submit only through Jito block engine.
26    JitoOnly,
27    /// Submit only through direct leader/validator targets.
28    DirectOnly,
29    /// Submit direct first, then RPC fallback on failure.
30    Hybrid,
31}
32
33/// One concrete submit route.
34#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)]
35pub enum SubmitRoute {
36    /// JSON-RPC submission.
37    Rpc,
38    /// Jito block-engine submission.
39    Jito,
40    /// Direct leader/validator submission.
41    Direct,
42}
43
44/// Route execution policy for one submission attempt.
45#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
46pub enum SubmitStrategy {
47    /// Execute routes in order and stop at the first accepted route.
48    #[default]
49    OrderedFallback,
50    /// Execute all configured routes at the same time.
51    AllAtOnce,
52}
53
54/// Route-plan based submission configuration.
55#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
56pub struct SubmitPlan {
57    /// Configured submit routes.
58    pub routes: Vec<SubmitRoute>,
59    /// Route execution strategy.
60    pub strategy: SubmitStrategy,
61}
62
63impl SubmitPlan {
64    /// Creates one normalized plan.
65    #[must_use]
66    pub fn new(routes: Vec<SubmitRoute>, strategy: SubmitStrategy) -> Self {
67        Self { routes, strategy }.into_normalized()
68    }
69
70    /// Normalizes this plan in place while preserving route order.
71    #[must_use]
72    pub fn into_normalized(mut self) -> Self {
73        let mut seen_rpc = false;
74        let mut seen_jito = false;
75        let mut seen_direct = false;
76        self.routes.retain(|route| match route {
77            SubmitRoute::Rpc if seen_rpc => false,
78            SubmitRoute::Rpc => {
79                seen_rpc = true;
80                true
81            }
82            SubmitRoute::Jito if seen_jito => false,
83            SubmitRoute::Jito => {
84                seen_jito = true;
85                true
86            }
87            SubmitRoute::Direct if seen_direct => false,
88            SubmitRoute::Direct => {
89                seen_direct = true;
90                true
91            }
92        });
93        self
94    }
95
96    /// Returns a normalized clone of this plan.
97    #[must_use]
98    pub fn normalized(&self) -> Self {
99        self.clone().into_normalized()
100    }
101
102    /// Builds a single-route RPC fallback plan.
103    #[must_use]
104    pub fn rpc_only() -> Self {
105        Self::new(vec![SubmitRoute::Rpc], SubmitStrategy::OrderedFallback)
106    }
107
108    /// Builds a single-route Jito plan.
109    #[must_use]
110    pub fn jito_only() -> Self {
111        Self::new(vec![SubmitRoute::Jito], SubmitStrategy::OrderedFallback)
112    }
113
114    /// Builds a single-route direct plan.
115    #[must_use]
116    pub fn direct_only() -> Self {
117        Self::new(vec![SubmitRoute::Direct], SubmitStrategy::OrderedFallback)
118    }
119
120    /// Builds one custom ordered-fallback plan.
121    #[must_use]
122    pub fn ordered(routes: Vec<SubmitRoute>) -> Self {
123        Self::new(routes, SubmitStrategy::OrderedFallback)
124    }
125
126    /// Builds the legacy direct-then-RPC fallback plan.
127    #[must_use]
128    pub fn hybrid() -> Self {
129        Self::ordered(vec![SubmitRoute::Direct, SubmitRoute::Rpc])
130    }
131
132    /// Builds one concurrent all-route plan.
133    #[must_use]
134    pub fn all_at_once(routes: Vec<SubmitRoute>) -> Self {
135        Self::new(routes, SubmitStrategy::AllAtOnce)
136    }
137
138    /// Returns the matching legacy preset when this plan is one exact legacy shape.
139    #[must_use]
140    pub fn legacy_mode(&self) -> Option<SubmitMode> {
141        match (self.strategy, self.routes.as_slice()) {
142            (SubmitStrategy::OrderedFallback, [SubmitRoute::Rpc]) => Some(SubmitMode::RpcOnly),
143            (SubmitStrategy::OrderedFallback, [SubmitRoute::Jito]) => Some(SubmitMode::JitoOnly),
144            (SubmitStrategy::OrderedFallback, [SubmitRoute::Direct]) => {
145                Some(SubmitMode::DirectOnly)
146            }
147            (SubmitStrategy::OrderedFallback, [SubmitRoute::Direct, SubmitRoute::Rpc]) => {
148                Some(SubmitMode::Hybrid)
149            }
150            _ => None,
151        }
152    }
153}
154
155impl Default for SubmitPlan {
156    fn default() -> Self {
157        Self::rpc_only()
158    }
159}
160
161impl From<SubmitMode> for SubmitPlan {
162    fn from(value: SubmitMode) -> Self {
163        match value {
164            SubmitMode::RpcOnly => Self::rpc_only(),
165            SubmitMode::JitoOnly => Self::jito_only(),
166            SubmitMode::DirectOnly => Self::direct_only(),
167            SubmitMode::Hybrid => Self::hybrid(),
168        }
169    }
170}
171
172/// Reliability profile for direct and hybrid submission behavior.
173#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
174pub enum SubmitReliability {
175    /// Fastest path with minimal retrying.
176    LowLatency,
177    /// Balanced latency and retry behavior.
178    #[default]
179    Balanced,
180    /// Aggressive retrying before giving up.
181    HighReliability,
182}
183
/// Already-signed transaction payload, in one of the encodings the submit APIs accept.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SignedTx {
    /// `VersionedTransaction` serialized with bincode.
    VersionedTransactionBytes(Vec<u8>),
    /// Transaction in wire format.
    WireTransactionBytes(Vec<u8>),
}
192
/// Tuning knobs for JSON-RPC submission.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RpcSubmitConfig {
    /// When `true`, preflight simulation is skipped.
    pub skip_preflight: bool,
    /// Preflight commitment, given as a string; `None` means no commitment is specified.
    pub preflight_commitment: Option<String>,
}

impl Default for RpcSubmitConfig {
    /// Skips preflight and leaves the preflight commitment unspecified.
    fn default() -> Self {
        let preflight_commitment = None;
        Self {
            preflight_commitment,
            skip_preflight: true,
        }
    }
}
210
/// Tuning knobs for Jito block-engine submission.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct JitoSubmitConfig {
    /// Requests revert protection by submitting in bundle-only mode.
    ///
    /// Only the JSON-RPC transaction path consults this flag. The gRPC transport always
    /// submits bundles, so it behaves as bundle-only regardless of this setting.
    pub bundle_only: bool,
}
220
/// Metadata returned by a successful Jito submission.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct JitoSubmitResponse {
    /// Transaction signature reported by the Jito JSON-RPC path, if any.
    pub transaction_signature: Option<String>,
    /// Bundle UUID reported by the Jito gRPC path, if any.
    pub bundle_id: Option<String>,
}
229
/// Tuning knobs for direct (leader/validator) submission.
///
/// Usually built with `DirectSubmitConfig::from_reliability`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DirectSubmitConfig {
    /// Timeout for a send to one target.
    pub per_target_timeout: Duration,
    /// Overall send budget for one submission.
    pub global_timeout: Duration,
    /// How many passes to make across the selected direct targets.
    pub direct_target_rounds: usize,
    /// How many direct-only submit attempts to make; targets may be reselected per attempt.
    pub direct_submit_attempts: usize,
    /// How many direct attempts `Hybrid` mode makes before falling back to RPC.
    pub hybrid_direct_attempts: usize,
    /// Pause between direct rebroadcast attempts/rounds (Agave-like pacing).
    pub rebroadcast_interval: Duration,
    /// Turns on Agave-style rebroadcast persistence after a direct send is acknowledged.
    pub agave_rebroadcast_enabled: bool,
    /// Longest time background rebroadcast persistence may run.
    pub agave_rebroadcast_window: Duration,
    /// Pause between background rebroadcast cycles.
    pub agave_rebroadcast_interval: Duration,
    /// When `true`, `Hybrid` mode also broadcasts through RPC after a successful direct send.
    pub hybrid_rpc_broadcast: bool,
    /// Orders direct targets by measured latency before submitting.
    pub latency_aware_targeting: bool,
    /// Timeout for each per-target TCP latency probe.
    pub latency_probe_timeout: Duration,
    /// Extra TCP port to probe, on top of the target's TPU port.
    pub latency_probe_port: Option<u16>,
    /// Upper bound on how many targets are probed per submission.
    pub latency_probe_max_targets: usize,
}
262
263impl DirectSubmitConfig {
264    /// Builds a direct-submit config from a reliability profile.
265    #[must_use]
266    pub const fn from_reliability(reliability: SubmitReliability) -> Self {
267        match reliability {
268            SubmitReliability::LowLatency => Self {
269                per_target_timeout: Duration::from_millis(200),
270                global_timeout: Duration::from_millis(1_200),
271                direct_target_rounds: 3,
272                direct_submit_attempts: 3,
273                hybrid_direct_attempts: 2,
274                rebroadcast_interval: Duration::from_millis(90),
275                agave_rebroadcast_enabled: true,
276                agave_rebroadcast_window: Duration::from_secs(30),
277                agave_rebroadcast_interval: Duration::from_millis(700),
278                hybrid_rpc_broadcast: false,
279                latency_aware_targeting: true,
280                latency_probe_timeout: Duration::from_millis(80),
281                latency_probe_port: Some(8899),
282                latency_probe_max_targets: 128,
283            },
284            SubmitReliability::Balanced => Self {
285                per_target_timeout: Duration::from_millis(300),
286                global_timeout: Duration::from_millis(1_800),
287                direct_target_rounds: 4,
288                direct_submit_attempts: 4,
289                hybrid_direct_attempts: 3,
290                rebroadcast_interval: Duration::from_millis(110),
291                agave_rebroadcast_enabled: true,
292                agave_rebroadcast_window: Duration::from_secs(45),
293                agave_rebroadcast_interval: Duration::from_millis(800),
294                hybrid_rpc_broadcast: true,
295                latency_aware_targeting: true,
296                latency_probe_timeout: Duration::from_millis(120),
297                latency_probe_port: Some(8899),
298                latency_probe_max_targets: 128,
299            },
300            SubmitReliability::HighReliability => Self {
301                per_target_timeout: Duration::from_millis(450),
302                global_timeout: Duration::from_millis(3_200),
303                direct_target_rounds: 6,
304                direct_submit_attempts: 5,
305                hybrid_direct_attempts: 4,
306                rebroadcast_interval: Duration::from_millis(140),
307                agave_rebroadcast_enabled: true,
308                agave_rebroadcast_window: Duration::from_secs(70),
309                agave_rebroadcast_interval: Duration::from_millis(900),
310                hybrid_rpc_broadcast: true,
311                latency_aware_targeting: true,
312                latency_probe_timeout: Duration::from_millis(160),
313                latency_probe_port: Some(8899),
314                latency_probe_max_targets: 128,
315            },
316        }
317    }
318
319    /// Returns this config with minimum valid retry counters.
320    #[must_use]
321    pub const fn normalized(self) -> Self {
322        let direct_target_rounds = if self.direct_target_rounds == 0 {
323            1
324        } else {
325            self.direct_target_rounds
326        };
327        let direct_submit_attempts = if self.direct_submit_attempts == 0 {
328            1
329        } else {
330            self.direct_submit_attempts
331        };
332        let hybrid_direct_attempts = if self.hybrid_direct_attempts == 0 {
333            1
334        } else {
335            self.hybrid_direct_attempts
336        };
337        let latency_probe_max_targets = if self.latency_probe_max_targets == 0 {
338            1
339        } else {
340            self.latency_probe_max_targets
341        };
342        let rebroadcast_interval = if self.rebroadcast_interval.is_zero() {
343            Duration::from_millis(1)
344        } else {
345            self.rebroadcast_interval
346        };
347        let agave_rebroadcast_interval = if self.agave_rebroadcast_interval.is_zero() {
348            Duration::from_millis(1)
349        } else {
350            self.agave_rebroadcast_interval
351        };
352        Self {
353            per_target_timeout: self.per_target_timeout,
354            global_timeout: self.global_timeout,
355            direct_target_rounds,
356            direct_submit_attempts,
357            hybrid_direct_attempts,
358            rebroadcast_interval,
359            agave_rebroadcast_enabled: self.agave_rebroadcast_enabled,
360            agave_rebroadcast_window: self.agave_rebroadcast_window,
361            agave_rebroadcast_interval,
362            hybrid_rpc_broadcast: self.hybrid_rpc_broadcast,
363            latency_aware_targeting: self.latency_aware_targeting,
364            latency_probe_timeout: self.latency_probe_timeout,
365            latency_probe_port: self.latency_probe_port,
366            latency_probe_max_targets,
367        }
368    }
369}
370
371impl Default for DirectSubmitConfig {
372    fn default() -> Self {
373        Self::from_reliability(SubmitReliability::default())
374    }
375}
376
377/// Low-level transport errors surfaced by submit backends.
378#[derive(Debug, Error, Clone, Eq, PartialEq)]
379pub enum SubmitTransportError {
380    /// Invalid transport configuration.
381    #[error("transport configuration invalid: {message}")]
382    Config {
383        /// Human-readable description.
384        message: String,
385    },
386    /// Transport operation failed.
387    #[error("transport failure: {message}")]
388    Failure {
389        /// Human-readable description.
390        message: String,
391    },
392}
393
394/// Submission-level errors.
395#[derive(Debug, Error)]
396pub enum SubmitError {
397    /// No blockhash available for unsigned submit path.
398    #[error("blockhash provider returned no recent blockhash")]
399    MissingRecentBlockhash,
400    /// Signed bytes could not be decoded into a transaction.
401    #[error("failed to decode signed transaction bytes: {source}")]
402    DecodeSignedBytes {
403        /// Bincode decode error.
404        source: Box<bincode::ErrorKind>,
405    },
406    /// Duplicate signature was suppressed by dedupe window.
407    #[error("duplicate signature suppressed by dedupe window")]
408    DuplicateSignature,
409    /// RPC mode requested but no RPC transport was configured.
410    #[error("rpc transport is not configured")]
411    MissingRpcTransport,
412    /// Jito mode requested but no Jito transport was configured.
413    #[error("jito transport is not configured")]
414    MissingJitoTransport,
415    /// Direct mode requested but no direct transport was configured.
416    #[error("direct transport is not configured")]
417    MissingDirectTransport,
418    /// No direct targets resolved from routing inputs.
419    #[error("no direct targets resolved from leader/backups")]
420    NoDirectTargets,
421    /// Direct transport failure.
422    #[error("direct submit failed: {source}")]
423    Direct {
424        /// Direct transport error.
425        source: SubmitTransportError,
426    },
427    /// RPC transport failure.
428    #[error("rpc submit failed: {source}")]
429    Rpc {
430        /// RPC transport error.
431        source: SubmitTransportError,
432    },
433    /// Jito transport failure.
434    #[error("jito submit failed: {source}")]
435    Jito {
436        /// Jito transport error.
437        source: SubmitTransportError,
438    },
439    /// Internal synchronization failure.
440    #[error("internal synchronization failure: {message}")]
441    InternalSync {
442        /// Synchronization error details.
443        message: String,
444    },
445    /// Submit attempt was rejected by the toxic-flow guard.
446    #[error("submission rejected by toxic-flow guard: {reason}")]
447    ToxicFlow {
448        /// Structured reason for the rejection.
449        reason: TxToxicFlowRejectionReason,
450    },
451}
452
453/// Summary of a successful submission.
454#[derive(Debug, Clone, Eq, PartialEq)]
455pub struct SubmitResult {
456    /// Signature parsed from submitted transaction bytes.
457    pub signature: Option<SignatureBytes>,
458    /// Route plan selected by caller.
459    pub plan: SubmitPlan,
460    /// Legacy preset used by caller when one exact preset was selected.
461    pub legacy_mode: Option<SubmitMode>,
462    /// First successful route observed before this submit call returned.
463    pub first_success_route: Option<SubmitRoute>,
464    ///
465    /// Later background accepts from concurrently configured routes are counted by built-in
466    /// telemetry and are best-effort through [`TxSubmitOutcomeReporter`], including any
467    /// route-specific acceptance metadata, rather than retroactively mutating this synchronous
468    /// result. Reporter delivery failures are reflected in
469    /// [`TxToxicFlowTelemetrySnapshot::reporter_outcomes_dropped`] and
470    /// [`TxToxicFlowTelemetrySnapshot::reporter_outcomes_unavailable`].
471    pub successful_routes: Vec<SubmitRoute>,
472    /// Target chosen by direct path when applicable.
473    pub direct_target: Option<LeaderTarget>,
474    /// RPC-returned signature string when RPC path succeeded before return.
475    pub rpc_signature: Option<String>,
476    /// Jito block-engine returned transaction signature when the JSON-RPC path succeeded before
477    /// return.
478    pub jito_signature: Option<String>,
479    /// Jito block-engine returned bundle UUID when the gRPC bundle path succeeded before return.
480    pub jito_bundle_id: Option<String>,
481    /// True when one later ordered-fallback route accepted the submit before return.
482    pub used_fallback_route: bool,
483    /// Number of direct targets selected for submit attempt that succeeded.
484    pub selected_target_count: usize,
485    /// Number of unique validator identities in selected direct targets.
486    pub selected_identity_count: usize,
487}
488
489/// Coarse toxic-flow quality used by submit guards.
490#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
491pub enum TxFlowSafetyQuality {
492    /// Required inputs are present, coherent, and safe to use.
493    Stable,
494    /// Inputs exist but have not reached a stable confirmation boundary yet.
495    Provisional,
496    /// Inputs exist but the current branch still carries material reorg risk.
497    ReorgRisk,
498    /// Inputs are present but stale.
499    Stale,
500    /// Inputs are present but mutually inconsistent.
501    Degraded,
502    /// Required inputs are still missing.
503    IncompleteControlPlane,
504}
505
506/// One concrete toxic-flow issue reported by a submit guard source.
507#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
508pub enum TxFlowSafetyIssue {
509    /// Submit source is still recovering from replay/checkpoint continuity.
510    ReplayRecoveryPending,
511    /// Control-plane inputs are still missing.
512    MissingControlPlane,
513    /// Control-plane inputs are stale.
514    StaleControlPlane,
515    /// Control-plane inputs are inconsistent.
516    DegradedControlPlane,
517    /// Current branch carries reorg risk.
518    ReorgRisk,
519    /// Current branch is still provisional.
520    Provisional,
521}
522
523/// Current toxic-flow safety snapshot exposed by one submit guard source.
524#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
525pub struct TxFlowSafetySnapshot {
526    /// Coarse quality classification for the current control-plane view.
527    pub quality: TxFlowSafetyQuality,
528    /// Concrete issues behind `quality`.
529    pub issues: Vec<TxFlowSafetyIssue>,
530    /// Current upstream state version when known.
531    pub current_state_version: Option<u64>,
532    /// True when replay recovery is still pending.
533    pub replay_recovery_pending: bool,
534}
535
536impl TxFlowSafetySnapshot {
537    /// Returns true when the current snapshot is strategy-safe.
538    #[must_use]
539    pub const fn is_safe(&self) -> bool {
540        matches!(self.quality, TxFlowSafetyQuality::Stable) && !self.replay_recovery_pending
541    }
542}
543
544/// Dynamic source of toxic-flow safety state for submit guards.
545pub trait TxFlowSafetySource: Send + Sync {
546    /// Returns the latest toxic-flow safety snapshot.
547    fn toxic_flow_snapshot(&self) -> TxFlowSafetySnapshot;
548}
549
550/// One key used to suppress repeated submission attempts for the same opportunity.
551#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
552pub enum TxSubmitSuppressionKey {
553    /// Suppress by signature.
554    Signature(SignatureBytes),
555    /// Suppress by opaque opportunity identifier.
556    Opportunity([u8; 32]),
557    /// Suppress by hashed account set identifier.
558    AccountSet([u8; 32]),
559    /// Suppress by slot-window key.
560    SlotWindow {
561        /// Slot associated with the opportunity.
562        slot: u64,
563        /// Window width used to group nearby opportunities.
564        window: u64,
565    },
566}
567
568impl Hash for TxSubmitSuppressionKey {
569    fn hash<H: Hasher>(&self, state: &mut H) {
570        match self {
571            Self::Signature(signature) => {
572                0_u8.hash(state);
573                signature.as_array().hash(state);
574            }
575            Self::Opportunity(key) => {
576                1_u8.hash(state);
577                key.hash(state);
578            }
579            Self::AccountSet(key) => {
580                2_u8.hash(state);
581                key.hash(state);
582            }
583            Self::SlotWindow { slot, window } => {
584                3_u8.hash(state);
585                slot.hash(state);
586                window.hash(state);
587            }
588        }
589    }
590}
591
592/// Call-site context used by toxic-flow guards.
593#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
594pub struct TxSubmitContext {
595    /// Additional suppression keys to apply before submit.
596    pub suppression_keys: Vec<TxSubmitSuppressionKey>,
597    /// State version used when the submit decision was made.
598    pub decision_state_version: Option<u64>,
599    /// Timestamp when the opportunity or decision was created.
600    pub opportunity_created_at: Option<SystemTime>,
601}
602
/// Rules deciding when the toxic-flow guard rejects a submit.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TxSubmitGuardPolicy {
    /// Reject unless the flow-safety source reports `Stable`.
    pub require_stable_control_plane: bool,
    /// Reject while the flow-safety source reports pending replay recovery.
    pub reject_on_replay_recovery_pending: bool,
    /// Largest tolerated gap between the decision and current state versions.
    pub max_state_version_drift: Option<u64>,
    /// Oldest an opportunity may be at submit time.
    pub max_opportunity_age: Option<Duration>,
    /// How long built-in suppression keys stay active.
    pub suppression_ttl: Duration,
}

impl Default for TxSubmitGuardPolicy {
    /// Strict defaults: both safety checks on, drift capped at 4 versions, and a 750 ms limit
    /// on both opportunity age and suppression TTL.
    fn default() -> Self {
        let horizon = Duration::from_millis(750);
        Self {
            require_stable_control_plane: true,
            reject_on_replay_recovery_pending: true,
            max_state_version_drift: Some(4),
            max_opportunity_age: Some(horizon),
            suppression_ttl: horizon,
        }
    }
}
629
630/// Concrete reason one submit attempt was rejected before transport.
631#[derive(Debug, Error, Clone, Eq, PartialEq, Serialize, Deserialize)]
632pub enum TxToxicFlowRejectionReason {
633    /// Control-plane quality is not safe enough.
634    #[error("control-plane quality {quality:?} is not safe for submit")]
635    UnsafeControlPlane {
636        /// Current quality observed from the guard source.
637        quality: TxFlowSafetyQuality,
638    },
639    /// Replay recovery is still pending.
640    #[error("submit source is still recovering replay continuity")]
641    ReplayRecoveryPending,
642    /// One suppression key is still active.
643    #[error("submit suppressed by active key")]
644    Suppressed,
645    /// State version drift exceeded policy.
646    #[error("state version drift {drift} exceeded maximum {max_allowed}")]
647    StateDrift {
648        /// Observed drift between decision and current state versions.
649        drift: u64,
650        /// Maximum drift allowed by policy.
651        max_allowed: u64,
652    },
653    /// Opportunity age exceeded policy.
654    #[error("opportunity age {age_ms}ms exceeded maximum {max_allowed_ms}ms")]
655    OpportunityStale {
656        /// Observed age in milliseconds.
657        age_ms: u64,
658        /// Maximum age allowed in milliseconds.
659        max_allowed_ms: u64,
660    },
661}
662
663/// Final or immediate outcome classification for one submit attempt.
664#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
665pub enum TxSubmitOutcomeKind {
666    /// Direct path accepted the transaction.
667    DirectAccepted,
668    /// RPC path accepted the transaction.
669    RpcAccepted,
670    /// Jito block-engine accepted the transaction.
671    JitoAccepted,
672    /// Transaction landed on chain.
673    Landed,
674    /// Transaction expired before landing.
675    Expired,
676    /// Transaction was dropped before landing.
677    Dropped,
678    /// Route missed the intended leader window.
679    LeaderMissed,
680    /// Submit used a stale blockhash.
681    BlockhashStale,
682    /// Selected route was unhealthy.
683    UnhealthyRoute,
684    /// Submit was rejected due to stale inputs.
685    RejectedDueToStaleness,
686    /// Submit was rejected due to reorg risk.
687    RejectedDueToReorgRisk,
688    /// Submit was rejected due to state drift.
689    RejectedDueToStateDrift,
690    /// Submit was rejected by replay recovery pending.
691    RejectedDueToReplayRecovery,
692    /// Submit was suppressed by a built-in key.
693    Suppressed,
694}
695
696/// Structured outcome record for toxic-flow telemetry/reporting.
697///
698/// Route-level accepts may carry richer metadata than the synchronous [`SubmitResult`] when one
699/// different route already returned first. Consumers that care about later accepts should observe
700/// this surface rather than expecting the original [`SubmitResult`] to mutate.
701#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
702pub struct TxSubmitOutcome {
703    /// Outcome classification.
704    pub kind: TxSubmitOutcomeKind,
705    /// Transaction signature when available.
706    pub signature: Option<SignatureBytes>,
707    /// Concrete accepted route when the outcome came from one route-level success.
708    pub route: Option<SubmitRoute>,
709    /// Route plan selected for the submit attempt.
710    pub plan: SubmitPlan,
711    /// Legacy preset used for the submit attempt when applicable.
712    pub legacy_mode: Option<SubmitMode>,
713    /// RPC-returned signature metadata when the RPC route accepted.
714    pub rpc_signature: Option<String>,
715    /// Jito-returned transaction signature metadata when the Jito route accepted.
716    pub jito_signature: Option<String>,
717    /// Jito-returned bundle UUID when the gRPC bundle route accepted.
718    pub jito_bundle_id: Option<String>,
719    /// Current state version at outcome time when known.
720    pub state_version: Option<u64>,
721    /// Opportunity age in milliseconds when known.
722    pub opportunity_age_ms: Option<u64>,
723}
724
725/// Callback surface for external outcome sinks.
726///
727/// `TxSubmitClient` delivers these callbacks asynchronously through one bounded FIFO dispatcher per
728/// reporter instance, shared across clients that use that same reporter. That keeps the reporter
729/// off the synchronous submit hot path while preserving outcome order for events that are
730/// successfully queued. Under sustained pressure or dispatcher startup failure, reporter delivery
731/// is best-effort and built-in telemetry remains the authoritative always-inline outcome counter
732/// surface.
733pub trait TxSubmitOutcomeReporter: Send + Sync {
734    /// Records one structured outcome.
735    fn record_outcome(&self, outcome: &TxSubmitOutcome);
736}
737
738/// Snapshot of built-in toxic-flow counters collected by [`TxSubmitClient`](crate::submit::TxSubmitClient).
739#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
740pub struct TxToxicFlowTelemetrySnapshot {
741    /// Number of external reporter outcomes dropped because the reporter queue was full.
742    pub reporter_outcomes_dropped: u64,
743    /// Number of outcomes that could not reach the external reporter because the worker was unavailable.
744    pub reporter_outcomes_unavailable: u64,
745    /// Number of direct-route accepts observed.
746    pub direct_accepted: u64,
747    /// Number of RPC-route accepts observed.
748    pub rpc_accepted: u64,
749    /// Number of Jito-route accepts observed.
750    pub jito_accepted: u64,
751    /// Number of submit attempts rejected due to stale inputs.
752    pub rejected_due_to_staleness: u64,
753    /// Number of submit attempts rejected due to reorg risk.
754    pub rejected_due_to_reorg_risk: u64,
755    /// Number of submit attempts rejected due to state drift.
756    pub rejected_due_to_state_drift: u64,
757    /// Number of accepted submits emitted while the source was stale.
758    pub submit_on_stale_blockhash: u64,
759    /// Number of route misses classified as leader misses.
760    pub leader_route_miss_rate: u64,
761    /// Most recent opportunity age seen at submit time.
762    pub opportunity_age_at_send_ms: Option<u64>,
763    /// Number of submits rejected while replay recovery was pending.
764    pub rejected_due_to_replay_recovery: u64,
765    /// Number of submits suppressed by a built-in key.
766    pub suppressed_submissions: u64,
767}
768
/// `AtomicU64` wrapper aligned (and therefore padded) to 64 bytes.
///
/// Each hot telemetry counter sits on its own 64-byte boundary, presumably so that
/// concurrently updated counters do not share a cache line.
#[derive(Default, Debug)]
#[repr(align(64))]
struct CacheAlignedAtomicU64(AtomicU64);

impl CacheAlignedAtomicU64 {
    /// Reads the counter using `ordering`.
    fn load(&self, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.load(ordering)
    }

    /// Replaces the counter with `value` and hands back the value it held before.
    fn swap(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.swap(value, ordering)
    }

    /// Adds `value` (wrapping, as `AtomicU64::fetch_add` does) and hands back the prior value.
    fn fetch_add(&self, value: u64, ordering: Ordering) -> u64 {
        let Self(inner) = self;
        inner.fetch_add(value, ordering)
    }
}
790
791/// In-memory telemetry counters for toxic-flow outcomes.
792#[derive(Debug, Default)]
793pub struct TxToxicFlowTelemetry {
794    /// Number of external reporter outcomes dropped because the reporter queue was full.
795    reporter_outcomes_dropped: CacheAlignedAtomicU64,
796    /// Number of outcomes that could not reach the external reporter because the worker was unavailable.
797    reporter_outcomes_unavailable: CacheAlignedAtomicU64,
798    /// Number of direct-route accepts.
799    direct_accepted: CacheAlignedAtomicU64,
800    /// Number of RPC-route accepts.
801    rpc_accepted: CacheAlignedAtomicU64,
802    /// Number of Jito-route accepts.
803    jito_accepted: CacheAlignedAtomicU64,
804    /// Number of stale-input rejections.
805    rejected_due_to_staleness: CacheAlignedAtomicU64,
806    /// Number of reorg-risk rejections.
807    rejected_due_to_reorg_risk: CacheAlignedAtomicU64,
808    /// Number of state-drift rejections.
809    rejected_due_to_state_drift: CacheAlignedAtomicU64,
810    /// Number of accepted submits observed with stale blockhash state.
811    submit_on_stale_blockhash: CacheAlignedAtomicU64,
812    /// Number of leader-route misses.
813    leader_route_miss_rate: CacheAlignedAtomicU64,
814    /// Last opportunity age seen by the client.
815    opportunity_age_at_send_ms: CacheAlignedAtomicU64,
816    /// Number of replay-recovery rejections.
817    rejected_due_to_replay_recovery: CacheAlignedAtomicU64,
818    /// Number of suppressed submissions.
819    suppressed_submissions: CacheAlignedAtomicU64,
820}
821
822impl TxToxicFlowTelemetry {
823    /// Returns a shareable telemetry sink.
824    #[must_use]
825    pub fn shared() -> Arc<Self> {
826        Arc::new(Self::default())
827    }
828
829    /// Records one structured outcome.
830    pub fn record(&self, outcome: &TxSubmitOutcome) {
831        if let Some(age_ms) = outcome.opportunity_age_ms {
832            let _ = self
833                .opportunity_age_at_send_ms
834                .swap(age_ms, Ordering::Relaxed);
835        }
836        match outcome.kind {
837            TxSubmitOutcomeKind::DirectAccepted => {
838                let _ = self.direct_accepted.fetch_add(1, Ordering::Relaxed);
839            }
840            TxSubmitOutcomeKind::RpcAccepted => {
841                let _ = self.rpc_accepted.fetch_add(1, Ordering::Relaxed);
842            }
843            TxSubmitOutcomeKind::JitoAccepted => {
844                let _ = self.jito_accepted.fetch_add(1, Ordering::Relaxed);
845            }
846            TxSubmitOutcomeKind::RejectedDueToStaleness => {
847                let _ = self
848                    .rejected_due_to_staleness
849                    .fetch_add(1, Ordering::Relaxed);
850            }
851            TxSubmitOutcomeKind::RejectedDueToReorgRisk => {
852                let _ = self
853                    .rejected_due_to_reorg_risk
854                    .fetch_add(1, Ordering::Relaxed);
855            }
856            TxSubmitOutcomeKind::RejectedDueToStateDrift => {
857                let _ = self
858                    .rejected_due_to_state_drift
859                    .fetch_add(1, Ordering::Relaxed);
860            }
861            TxSubmitOutcomeKind::RejectedDueToReplayRecovery => {
862                let _ = self
863                    .rejected_due_to_replay_recovery
864                    .fetch_add(1, Ordering::Relaxed);
865            }
866            TxSubmitOutcomeKind::Suppressed => {
867                let _ = self.suppressed_submissions.fetch_add(1, Ordering::Relaxed);
868            }
869            TxSubmitOutcomeKind::LeaderMissed => {
870                let _ = self.leader_route_miss_rate.fetch_add(1, Ordering::Relaxed);
871            }
872            TxSubmitOutcomeKind::Landed
873            | TxSubmitOutcomeKind::Expired
874            | TxSubmitOutcomeKind::Dropped
875            | TxSubmitOutcomeKind::UnhealthyRoute => {}
876            TxSubmitOutcomeKind::BlockhashStale => {
877                let _ = self
878                    .submit_on_stale_blockhash
879                    .fetch_add(1, Ordering::Relaxed);
880            }
881        }
882    }
883
884    /// Records one dropped external-reporter outcome.
885    pub(crate) fn record_reporter_drop(&self) {
886        let _ = self
887            .reporter_outcomes_dropped
888            .fetch_add(1, Ordering::Relaxed);
889    }
890
891    /// Records one unavailable external-reporter outcome.
892    pub(crate) fn record_reporter_unavailable(&self) {
893        let _ = self
894            .reporter_outcomes_unavailable
895            .fetch_add(1, Ordering::Relaxed);
896    }
897
898    /// Returns the current telemetry snapshot.
899    #[must_use]
900    pub fn snapshot(&self) -> TxToxicFlowTelemetrySnapshot {
901        let age_ms = self.opportunity_age_at_send_ms.load(Ordering::Relaxed);
902        TxToxicFlowTelemetrySnapshot {
903            reporter_outcomes_dropped: self.reporter_outcomes_dropped.load(Ordering::Relaxed),
904            reporter_outcomes_unavailable: self
905                .reporter_outcomes_unavailable
906                .load(Ordering::Relaxed),
907            direct_accepted: self.direct_accepted.load(Ordering::Relaxed),
908            rpc_accepted: self.rpc_accepted.load(Ordering::Relaxed),
909            jito_accepted: self.jito_accepted.load(Ordering::Relaxed),
910            rejected_due_to_staleness: self.rejected_due_to_staleness.load(Ordering::Relaxed),
911            rejected_due_to_reorg_risk: self.rejected_due_to_reorg_risk.load(Ordering::Relaxed),
912            rejected_due_to_state_drift: self.rejected_due_to_state_drift.load(Ordering::Relaxed),
913            submit_on_stale_blockhash: self.submit_on_stale_blockhash.load(Ordering::Relaxed),
914            leader_route_miss_rate: self.leader_route_miss_rate.load(Ordering::Relaxed),
915            opportunity_age_at_send_ms: if age_ms == 0 { None } else { Some(age_ms) },
916            rejected_due_to_replay_recovery: self
917                .rejected_due_to_replay_recovery
918                .load(Ordering::Relaxed),
919            suppressed_submissions: self.suppressed_submissions.load(Ordering::Relaxed),
920        }
921    }
922}
923
924impl TxSubmitOutcomeReporter for TxToxicFlowTelemetry {
925    fn record_outcome(&self, outcome: &TxSubmitOutcome) {
926        self.record(outcome);
927    }
928}
929
930/// Internal suppression map used by the submit client.
931#[derive(Debug, Default)]
932pub(crate) struct TxSuppressionCache {
933    /// Active suppression entries keyed by opportunity identity.
934    entries: HashMap<TxSubmitSuppressionKey, SystemTime>,
935}
936
937impl TxSuppressionCache {
938    /// Returns true when at least one key is still active inside `ttl`.
939    pub(crate) fn is_suppressed(
940        &mut self,
941        keys: &[TxSubmitSuppressionKey],
942        now: SystemTime,
943        ttl: Duration,
944    ) -> bool {
945        self.evict_expired(now, ttl);
946        keys.iter().any(|key| self.entries.contains_key(key))
947    }
948
949    /// Inserts all provided suppression keys with the current timestamp.
950    pub(crate) fn insert_all(&mut self, keys: &[TxSubmitSuppressionKey], now: SystemTime) {
951        for key in keys {
952            let _ = self.entries.insert(key.clone(), now);
953        }
954    }
955
956    /// Removes entries older than the current TTL window.
957    fn evict_expired(&mut self, now: SystemTime, ttl: Duration) {
958        self.entries.retain(|_, inserted_at| {
959            now.duration_since(*inserted_at)
960                .map(|elapsed| elapsed <= ttl)
961                .unwrap_or(false)
962        });
963    }
964}
965
966/// RPC transport interface.
967#[async_trait]
968pub trait RpcSubmitTransport: Send + Sync {
969    /// Submits transaction bytes to RPC and returns signature string.
970    async fn submit_rpc(
971        &self,
972        tx_bytes: &[u8],
973        config: &RpcSubmitConfig,
974    ) -> Result<String, SubmitTransportError>;
975}
976
977/// Jito transport interface.
978#[async_trait]
979pub trait JitoSubmitTransport: Send + Sync {
980    /// Submits transaction bytes to Jito block engine and returns Jito-specific acceptance data.
981    async fn submit_jito(
982        &self,
983        tx_bytes: &[u8],
984        config: &JitoSubmitConfig,
985    ) -> Result<JitoSubmitResponse, SubmitTransportError>;
986}
987
988/// Direct transport interface.
989#[async_trait]
990pub trait DirectSubmitTransport: Send + Sync {
991    /// Submits transaction bytes to direct targets and returns the first successful target.
992    async fn submit_direct(
993        &self,
994        tx_bytes: &[u8],
995        targets: &[LeaderTarget],
996        policy: RoutingPolicy,
997        config: &DirectSubmitConfig,
998    ) -> Result<LeaderTarget, SubmitTransportError>;
999}