Skip to main content

sof_tx/submit/
client.rs

1//! Submission client implementation and mode orchestration.
2
3use std::{
4    sync::{Arc, Mutex},
5    time::{Duration, Instant},
6};
7
8use solana_signature::Signature;
9use solana_signer::signers::Signers;
10use solana_transaction::versioned::VersionedTransaction;
11
12use super::{
13    DirectSubmitConfig, DirectSubmitTransport, RpcSubmitConfig, RpcSubmitTransport, SignedTx,
14    SubmitError, SubmitMode, SubmitReliability, SubmitResult,
15};
16use crate::{
17    builder::TxBuilder,
18    providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
19    routing::{RoutingPolicy, SignatureDeduper, select_targets},
20};
21
22/// Transaction submission client that orchestrates RPC and direct submit modes.
23pub struct TxSubmitClient {
24    /// Blockhash source used by builder submit path.
25    blockhash_provider: Arc<dyn RecentBlockhashProvider>,
26    /// Leader source used by direct/hybrid paths.
27    leader_provider: Arc<dyn LeaderProvider>,
28    /// Optional backup validator targets.
29    backups: Vec<LeaderTarget>,
30    /// Direct routing policy.
31    policy: RoutingPolicy,
32    /// Signature dedupe window.
33    deduper: Mutex<SignatureDeduper>,
34    /// Optional RPC transport.
35    rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
36    /// Optional direct transport.
37    direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
38    /// RPC tuning.
39    rpc_config: RpcSubmitConfig,
40    /// Direct tuning.
41    direct_config: DirectSubmitConfig,
42}
43
44impl TxSubmitClient {
45    /// Creates a submission client with no transports preconfigured.
46    #[must_use]
47    pub fn new(
48        blockhash_provider: Arc<dyn RecentBlockhashProvider>,
49        leader_provider: Arc<dyn LeaderProvider>,
50    ) -> Self {
51        Self {
52            blockhash_provider,
53            leader_provider,
54            backups: Vec::new(),
55            policy: RoutingPolicy::default(),
56            deduper: Mutex::new(SignatureDeduper::new(Duration::from_secs(10))),
57            rpc_transport: None,
58            direct_transport: None,
59            rpc_config: RpcSubmitConfig::default(),
60            direct_config: DirectSubmitConfig::default(),
61        }
62    }
63
64    /// Sets optional backup validators.
65    #[must_use]
66    pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
67        self.backups = backups;
68        self
69    }
70
71    /// Sets routing policy.
72    #[must_use]
73    pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
74        self.policy = policy.normalized();
75        self
76    }
77
78    /// Sets dedupe TTL.
79    #[must_use]
80    pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
81        self.deduper = Mutex::new(SignatureDeduper::new(ttl));
82        self
83    }
84
85    /// Sets RPC transport.
86    #[must_use]
87    pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
88        self.rpc_transport = Some(transport);
89        self
90    }
91
92    /// Sets direct transport.
93    #[must_use]
94    pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
95        self.direct_transport = Some(transport);
96        self
97    }
98
99    /// Sets RPC submit tuning.
100    #[must_use]
101    pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
102        self.rpc_config = config;
103        self
104    }
105
106    /// Sets direct submit tuning.
107    #[must_use]
108    pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
109        self.direct_config = config.normalized();
110        self
111    }
112
113    /// Sets direct/hybrid reliability profile.
114    #[must_use]
115    pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
116        self.direct_config = DirectSubmitConfig::from_reliability(reliability);
117        self
118    }
119
120    /// Builds, signs, and submits a transaction in one API call.
121    ///
122    /// # Errors
123    ///
124    /// Returns [`SubmitError`] when blockhash lookup, signing, dedupe, routing, or submission
125    /// fails.
126    pub async fn submit_builder<T>(
127        &self,
128        builder: TxBuilder,
129        signers: &T,
130        mode: SubmitMode,
131    ) -> Result<SubmitResult, SubmitError>
132    where
133        T: Signers + ?Sized,
134    {
135        let blockhash = self
136            .blockhash_provider
137            .latest_blockhash()
138            .ok_or(SubmitError::MissingRecentBlockhash)?;
139        let tx = builder
140            .build_and_sign(blockhash, signers)
141            .map_err(|source| SubmitError::Build { source })?;
142        self.submit_transaction(tx, mode).await
143    }
144
145    /// Submits one signed `VersionedTransaction`.
146    ///
147    /// # Errors
148    ///
149    /// Returns [`SubmitError`] when encoding, dedupe, routing, or submission fails.
150    pub async fn submit_transaction(
151        &self,
152        tx: VersionedTransaction,
153        mode: SubmitMode,
154    ) -> Result<SubmitResult, SubmitError> {
155        let signature = tx.signatures.first().copied();
156        let tx_bytes =
157            bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
158        self.submit_bytes(tx_bytes, signature, mode).await
159    }
160
161    /// Submits externally signed transaction bytes.
162    ///
163    /// # Errors
164    ///
165    /// Returns [`SubmitError`] when decoding, dedupe, routing, or submission fails.
166    pub async fn submit_signed(
167        &self,
168        signed_tx: SignedTx,
169        mode: SubmitMode,
170    ) -> Result<SubmitResult, SubmitError> {
171        let tx_bytes = match signed_tx {
172            SignedTx::VersionedTransactionBytes(bytes) => bytes,
173            SignedTx::WireTransactionBytes(bytes) => bytes,
174        };
175        let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
176            .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
177        let signature = tx.signatures.first().copied();
178        self.submit_bytes(tx_bytes, signature, mode).await
179    }
180
181    /// Submits raw tx bytes after dedupe check.
182    async fn submit_bytes(
183        &self,
184        tx_bytes: Vec<u8>,
185        signature: Option<Signature>,
186        mode: SubmitMode,
187    ) -> Result<SubmitResult, SubmitError> {
188        self.enforce_dedupe(signature)?;
189        match mode {
190            SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
191            SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
192            SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
193        }
194    }
195
196    /// Applies signature dedupe policy.
197    fn enforce_dedupe(&self, signature: Option<Signature>) -> Result<(), SubmitError> {
198        if let Some(signature) = signature {
199            let now = Instant::now();
200            let mut deduper =
201                self.deduper
202                    .lock()
203                    .map_err(|poisoned| SubmitError::InternalSync {
204                        message: poisoned.to_string(),
205                    })?;
206            if !deduper.check_and_insert(signature, now) {
207                return Err(SubmitError::DuplicateSignature);
208            }
209        }
210        Ok(())
211    }
212
213    /// Submits through RPC path only.
214    async fn submit_rpc_only(
215        &self,
216        tx_bytes: Vec<u8>,
217        signature: Option<Signature>,
218        mode: SubmitMode,
219    ) -> Result<SubmitResult, SubmitError> {
220        let rpc = self
221            .rpc_transport
222            .as_ref()
223            .ok_or(SubmitError::MissingRpcTransport)?;
224        let rpc_signature = rpc
225            .submit_rpc(&tx_bytes, &self.rpc_config)
226            .await
227            .map_err(|source| SubmitError::Rpc { source })?;
228        Ok(SubmitResult {
229            signature,
230            mode,
231            direct_target: None,
232            rpc_signature: Some(rpc_signature),
233            used_rpc_fallback: false,
234        })
235    }
236
237    /// Submits through direct path only.
238    async fn submit_direct_only(
239        &self,
240        tx_bytes: Vec<u8>,
241        signature: Option<Signature>,
242        mode: SubmitMode,
243    ) -> Result<SubmitResult, SubmitError> {
244        let direct = self
245            .direct_transport
246            .as_ref()
247            .ok_or(SubmitError::MissingDirectTransport)?;
248        let targets = select_targets(self.leader_provider.as_ref(), &self.backups, self.policy);
249        if targets.is_empty() {
250            return Err(SubmitError::NoDirectTargets);
251        }
252        let direct_config = self.direct_config.clone().normalized();
253        let target = direct
254            .submit_direct(&tx_bytes, &targets, self.policy, &direct_config)
255            .await
256            .map_err(|source| SubmitError::Direct { source })?;
257        Ok(SubmitResult {
258            signature,
259            mode,
260            direct_target: Some(target),
261            rpc_signature: None,
262            used_rpc_fallback: false,
263        })
264    }
265
266    /// Submits through hybrid mode (direct first, RPC fallback).
267    async fn submit_hybrid(
268        &self,
269        tx_bytes: Vec<u8>,
270        signature: Option<Signature>,
271        mode: SubmitMode,
272    ) -> Result<SubmitResult, SubmitError> {
273        let direct = self
274            .direct_transport
275            .as_ref()
276            .ok_or(SubmitError::MissingDirectTransport)?;
277        let rpc = self
278            .rpc_transport
279            .as_ref()
280            .ok_or(SubmitError::MissingRpcTransport)?;
281
282        let direct_config = self.direct_config.clone().normalized();
283        let targets = select_targets(self.leader_provider.as_ref(), &self.backups, self.policy);
284        if !targets.is_empty() {
285            for _ in 0..direct_config.hybrid_direct_attempts {
286                if let Ok(target) = direct
287                    .submit_direct(&tx_bytes, &targets, self.policy, &direct_config)
288                    .await
289                {
290                    return Ok(SubmitResult {
291                        signature,
292                        mode,
293                        direct_target: Some(target),
294                        rpc_signature: None,
295                        used_rpc_fallback: false,
296                    });
297                }
298            }
299        }
300
301        let rpc_signature = rpc
302            .submit_rpc(&tx_bytes, &self.rpc_config)
303            .await
304            .map_err(|source| SubmitError::Rpc { source })?;
305        Ok(SubmitResult {
306            signature,
307            mode,
308            direct_target: None,
309            rpc_signature: Some(rpc_signature),
310            used_rpc_fallback: true,
311        })
312    }
313}