Skip to main content

sof_tx/submit/
client.rs

1//! Submission client implementation and mode orchestration.
2
3use std::{
4    sync::Arc,
5    time::{Duration, Instant},
6};
7
8use solana_signature::Signature;
9use solana_signer::signers::Signers;
10use solana_transaction::versioned::VersionedTransaction;
11
12use super::{
13    DirectSubmitConfig, DirectSubmitTransport, RpcSubmitConfig, RpcSubmitTransport, SignedTx,
14    SubmitError, SubmitMode, SubmitReliability, SubmitResult,
15};
16use crate::{
17    builder::TxBuilder,
18    providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
19    routing::{RoutingPolicy, SignatureDeduper, select_targets},
20};
21
22/// Transaction submission client that orchestrates RPC and direct submit modes.
23pub struct TxSubmitClient {
24    /// Blockhash source used by builder submit path.
25    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
26    /// Leader source used by direct/hybrid paths.
27    leader_provider: Arc<dyn LeaderProvider>,
28    /// Optional backup validator targets.
29    backups: Vec<LeaderTarget>,
30    /// Direct routing policy.
31    policy: RoutingPolicy,
32    /// Signature dedupe window.
33    deduper: SignatureDeduper,
34    /// Optional RPC transport.
35    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
36    /// Optional direct transport.
37    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
38    /// RPC tuning.
39    rpc_config: RpcSubmitConfig,
40    /// Direct tuning.
41    direct_config: DirectSubmitConfig,
42}
43
44impl TxSubmitClient {
45    /// Creates a submission client with no transports preconfigured.
46    #[must_use]
47    pub fn new(
48        blockhash_provider: Arc<dyn RecentBlockhashProvider>,
49        leader_provider: Arc<dyn LeaderProvider>,
50    ) -> Self {
51        Self {
52            blockhash_provider,
53            leader_provider,
54            backups: Vec::new(),
55            policy: RoutingPolicy::default(),
56            deduper: SignatureDeduper::new(Duration::from_secs(10)),
57            rpc_transport: None,
58            direct_transport: None,
59            rpc_config: RpcSubmitConfig::default(),
60            direct_config: DirectSubmitConfig::default(),
61        }
62    }
63
64    /// Sets optional backup validators.
65    #[must_use]
66    pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
67        self.backups = backups;
68        self
69    }
70
71    /// Sets routing policy.
72    #[must_use]
73    pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
74        self.policy = policy.normalized();
75        self
76    }
77
78    /// Sets dedupe TTL.
79    #[must_use]
80    pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
81        self.deduper = SignatureDeduper::new(ttl);
82        self
83    }
84
85    /// Sets RPC transport.
86    #[must_use]
87    pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
88        self.rpc_transport = Some(transport);
89        self
90    }
91
92    /// Sets direct transport.
93    #[must_use]
94    pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
95        self.direct_transport = Some(transport);
96        self
97    }
98
99    /// Sets RPC submit tuning.
100    #[must_use]
101    pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
102        self.rpc_config = config;
103        self
104    }
105
106    /// Sets direct submit tuning.
107    #[must_use]
108    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
109        self.direct_config = config.normalized();
110        self
111    }
112
113    /// Sets direct/hybrid reliability profile.
114    #[must_use]
115    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
116        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
117        self
118    }
119
120    /// Builds, signs, and submits a transaction in one API call.
121    ///
122    /// # Errors
123    ///
124    /// Returns [`SubmitError`] when blockhash lookup, signing, dedupe, routing, or submission
125    /// fails.
126    pub async fn submit_builder<T>(
127        &mut self,
128        builder: TxBuilder,
129        signers: &T,
130        mode: SubmitMode,
131    ) -> Result<SubmitResult, SubmitError>
132    where
133        T: Signers + ?Sized,
134    {
135        let blockhash = self
136            .blockhash_provider
137            .latest_blockhash()
138            .ok_or(SubmitError::MissingRecentBlockhash)?;
139        let tx = builder
140            .build_and_sign(blockhash, signers)
141            .map_err(|source| SubmitError::Build { source })?;
142        self.submit_transaction(tx, mode).await
143    }
144
145    /// Submits one signed `VersionedTransaction`.
146    ///
147    /// # Errors
148    ///
149    /// Returns [`SubmitError`] when encoding, dedupe, routing, or submission fails.
150    pub async fn submit_transaction(
151        &mut self,
152        tx: VersionedTransaction,
153        mode: SubmitMode,
154    ) -> Result<SubmitResult, SubmitError> {
155        let signature = tx.signatures.first().copied();
156        let tx_bytes =
157            bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
158        self.submit_bytes(tx_bytes, signature, mode).await
159    }
160
161    /// Submits externally signed transaction bytes.
162    ///
163    /// # Errors
164    ///
165    /// Returns [`SubmitError`] when decoding, dedupe, routing, or submission fails.
166    pub async fn submit_signed(
167        &mut self,
168        signed_tx: SignedTx,
169        mode: SubmitMode,
170    ) -> Result<SubmitResult, SubmitError> {
171        let tx_bytes = match signed_tx {
172            SignedTx::VersionedTransactionBytes(bytes) => bytes,
173            SignedTx::WireTransactionBytes(bytes) => bytes,
174        };
175        let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
176            .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
177        let signature = tx.signatures.first().copied();
178        self.submit_bytes(tx_bytes, signature, mode).await
179    }
180
181    /// Submits raw tx bytes after dedupe check.
182    async fn submit_bytes(
183        &mut self,
184        tx_bytes: Vec<u8>,
185        signature: Option<Signature>,
186        mode: SubmitMode,
187    ) -> Result<SubmitResult, SubmitError> {
188        self.enforce_dedupe(signature)?;
189        match mode {
190            SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
191            SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
192            SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
193        }
194    }
195
196    /// Applies signature dedupe policy.
197    fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
198        if let Some(signature) = signature {
199            let now = Instant::now();
200            if !self.deduper.check_and_insert(signature, now) {
201                return Err(SubmitError::DuplicateSignature);
202            }
203        }
204        Ok(())
205    }
206
207    /// Submits through RPC path only.
208    async fn submit_rpc_only(
209        &self,
210        tx_bytes: Vec<u8>,
211        signature: Option<Signature>,
212        mode: SubmitMode,
213    ) -> Result<SubmitResult, SubmitError> {
214        let rpc = self
215            .rpc_transport
216            .as_ref()
217            .ok_or(SubmitError::MissingRpcTransport)?;
218        let rpc_signature = rpc
219            .submit_rpc(&tx_bytes, &self.rpc_config)
220            .await
221            .map_err(|source| SubmitError::Rpc { source })?;
222        Ok(SubmitResult {
223            signature,
224            mode,
225            direct_target: None,
226            rpc_signature: Some(rpc_signature),
227            used_rpc_fallback: false,
228        })
229    }
230
231    /// Submits through direct path only.
232    async fn submit_direct_only(
233        &self,
234        tx_bytes: Vec<u8>,
235        signature: Option<Signature>,
236        mode: SubmitMode,
237    ) -> Result<SubmitResult, SubmitError> {
238        let direct = self
239            .direct_transport
240            .as_ref()
241            .ok_or(SubmitError::MissingDirectTransport)?;
242        let targets = select_targets(self.leader_provider.as_ref(), &self.backups, self.policy);
243        if targets.is_empty() {
244            return Err(SubmitError::NoDirectTargets);
245        }
246        let direct_config = self.direct_config.clone().normalized();
247        let target = direct
248            .submit_direct(&tx_bytes, &targets, self.policy, &direct_config)
249            .await
250            .map_err(|source| SubmitError::Direct { source })?;
251        Ok(SubmitResult {
252            signature,
253            mode,
254            direct_target: Some(target),
255            rpc_signature: None,
256            used_rpc_fallback: false,
257        })
258    }
259
260    /// Submits through hybrid mode (direct first, RPC fallback).
261    async fn submit_hybrid(
262        &self,
263        tx_bytes: Vec<u8>,
264        signature: Option<Signature>,
265        mode: SubmitMode,
266    ) -> Result<SubmitResult, SubmitError> {
267        let direct = self
268            .direct_transport
269            .as_ref()
270            .ok_or(SubmitError::MissingDirectTransport)?;
271        let rpc = self
272            .rpc_transport
273            .as_ref()
274            .ok_or(SubmitError::MissingRpcTransport)?;
275
276        let direct_config = self.direct_config.clone().normalized();
277        let targets = select_targets(self.leader_provider.as_ref(), &self.backups, self.policy);
278        if !targets.is_empty() {
279            for _ in 0..direct_config.hybrid_direct_attempts {
280                if let Ok(target) = direct
281                    .submit_direct(&tx_bytes, &targets, self.policy, &direct_config)
282                    .await
283                {
284                    return Ok(SubmitResult {
285                        signature,
286                        mode,
287                        direct_target: Some(target),
288                        rpc_signature: None,
289                        used_rpc_fallback: false,
290                    });
291                }
292            }
293        }
294
295        let rpc_signature = rpc
296            .submit_rpc(&tx_bytes, &self.rpc_config)
297            .await
298            .map_err(|source| SubmitError::Rpc { source })?;
299        Ok(SubmitResult {
300            signature,
301            mode,
302            direct_target: None,
303            rpc_signature: Some(rpc_signature),
304            used_rpc_fallback: true,
305        })
306    }
307}