1use std::{
4 sync::Arc,
5 time::{Duration, Instant},
6};
7
8use solana_signature::Signature;
9use solana_signer::signers::Signers;
10use solana_transaction::versioned::VersionedTransaction;
11
12use super::{
13 DirectSubmitConfig, DirectSubmitTransport, RpcSubmitConfig, RpcSubmitTransport, SignedTx,
14 SubmitError, SubmitMode, SubmitReliability, SubmitResult,
15};
16use crate::{
17 builder::TxBuilder,
18 providers::{LeaderProvider, LeaderTarget, RecentBlockhashProvider},
19 routing::{RoutingPolicy, SignatureDeduper, select_targets},
20};
21
22pub struct TxSubmitClient {
24 blockhash_provider: Arc<dyn RecentBlockhashProvider>,
26 leader_provider: Arc<dyn LeaderProvider>,
28 backups: Vec<LeaderTarget>,
30 policy: RoutingPolicy,
32 deduper: SignatureDeduper,
34 rpc_transport: Option<Arc<dyn RpcSubmitTransport>>,
36 direct_transport: Option<Arc<dyn DirectSubmitTransport>>,
38 rpc_config: RpcSubmitConfig,
40 direct_config: DirectSubmitConfig,
42}
43
44impl TxSubmitClient {
45 #[must_use]
47 pub fn new(
48 blockhash_provider: Arc<dyn RecentBlockhashProvider>,
49 leader_provider: Arc<dyn LeaderProvider>,
50 ) -> Self {
51 Self {
52 blockhash_provider,
53 leader_provider,
54 backups: Vec::new(),
55 policy: RoutingPolicy::default(),
56 deduper: SignatureDeduper::new(Duration::from_secs(10)),
57 rpc_transport: None,
58 direct_transport: None,
59 rpc_config: RpcSubmitConfig::default(),
60 direct_config: DirectSubmitConfig::default(),
61 }
62 }
63
64 #[must_use]
66 pub fn with_backups(mut self, backups: Vec<LeaderTarget>) -> Self {
67 self.backups = backups;
68 self
69 }
70
71 #[must_use]
73 pub fn with_routing_policy(mut self, policy: RoutingPolicy) -> Self {
74 self.policy = policy.normalized();
75 self
76 }
77
78 #[must_use]
80 pub fn with_dedupe_ttl(mut self, ttl: Duration) -> Self {
81 self.deduper = SignatureDeduper::new(ttl);
82 self
83 }
84
85 #[must_use]
87 pub fn with_rpc_transport(mut self, transport: Arc<dyn RpcSubmitTransport>) -> Self {
88 self.rpc_transport = Some(transport);
89 self
90 }
91
92 #[must_use]
94 pub fn with_direct_transport(mut self, transport: Arc<dyn DirectSubmitTransport>) -> Self {
95 self.direct_transport = Some(transport);
96 self
97 }
98
99 #[must_use]
101 pub fn with_rpc_config(mut self, config: RpcSubmitConfig) -> Self {
102 self.rpc_config = config;
103 self
104 }
105
106 #[must_use]
108 pub const fn with_direct_config(mut self, config: DirectSubmitConfig) -> Self {
109 self.direct_config = config.normalized();
110 self
111 }
112
113 #[must_use]
115 pub const fn with_reliability(mut self, reliability: SubmitReliability) -> Self {
116 self.direct_config = DirectSubmitConfig::from_reliability(reliability);
117 self
118 }
119
120 pub async fn submit_builder<T>(
127 &mut self,
128 builder: TxBuilder,
129 signers: &T,
130 mode: SubmitMode,
131 ) -> Result<SubmitResult, SubmitError>
132 where
133 T: Signers + ?Sized,
134 {
135 let blockhash = self
136 .blockhash_provider
137 .latest_blockhash()
138 .ok_or(SubmitError::MissingRecentBlockhash)?;
139 let tx = builder
140 .build_and_sign(blockhash, signers)
141 .map_err(|source| SubmitError::Build { source })?;
142 self.submit_transaction(tx, mode).await
143 }
144
145 pub async fn submit_transaction(
151 &mut self,
152 tx: VersionedTransaction,
153 mode: SubmitMode,
154 ) -> Result<SubmitResult, SubmitError> {
155 let signature = tx.signatures.first().copied();
156 let tx_bytes =
157 bincode::serialize(&tx).map_err(|source| SubmitError::DecodeSignedBytes { source })?;
158 self.submit_bytes(tx_bytes, signature, mode).await
159 }
160
161 pub async fn submit_signed(
167 &mut self,
168 signed_tx: SignedTx,
169 mode: SubmitMode,
170 ) -> Result<SubmitResult, SubmitError> {
171 let tx_bytes = match signed_tx {
172 SignedTx::VersionedTransactionBytes(bytes) => bytes,
173 SignedTx::WireTransactionBytes(bytes) => bytes,
174 };
175 let tx: VersionedTransaction = bincode::deserialize(&tx_bytes)
176 .map_err(|source| SubmitError::DecodeSignedBytes { source })?;
177 let signature = tx.signatures.first().copied();
178 self.submit_bytes(tx_bytes, signature, mode).await
179 }
180
181 async fn submit_bytes(
183 &mut self,
184 tx_bytes: Vec<u8>,
185 signature: Option<Signature>,
186 mode: SubmitMode,
187 ) -> Result<SubmitResult, SubmitError> {
188 self.enforce_dedupe(signature)?;
189 match mode {
190 SubmitMode::RpcOnly => self.submit_rpc_only(tx_bytes, signature, mode).await,
191 SubmitMode::DirectOnly => self.submit_direct_only(tx_bytes, signature, mode).await,
192 SubmitMode::Hybrid => self.submit_hybrid(tx_bytes, signature, mode).await,
193 }
194 }
195
196 fn enforce_dedupe(&mut self, signature: Option<Signature>) -> Result<(), SubmitError> {
198 if let Some(signature) = signature {
199 let now = Instant::now();
200 if !self.deduper.check_and_insert(signature, now) {
201 return Err(SubmitError::DuplicateSignature);
202 }
203 }
204 Ok(())
205 }
206
207 async fn submit_rpc_only(
209 &self,
210 tx_bytes: Vec<u8>,
211 signature: Option<Signature>,
212 mode: SubmitMode,
213 ) -> Result<SubmitResult, SubmitError> {
214 let rpc = self
215 .rpc_transport
216 .as_ref()
217 .ok_or(SubmitError::MissingRpcTransport)?;
218 let rpc_signature = rpc
219 .submit_rpc(&tx_bytes, &self.rpc_config)
220 .await
221 .map_err(|source| SubmitError::Rpc { source })?;
222 Ok(SubmitResult {
223 signature,
224 mode,
225 direct_target: None,
226 rpc_signature: Some(rpc_signature),
227 used_rpc_fallback: false,
228 })
229 }
230
231 async fn submit_direct_only(
233 &self,
234 tx_bytes: Vec<u8>,
235 signature: Option<Signature>,
236 mode: SubmitMode,
237 ) -> Result<SubmitResult, SubmitError> {
238 let direct = self
239 .direct_transport
240 .as_ref()
241 .ok_or(SubmitError::MissingDirectTransport)?;
242 let targets = select_targets(self.leader_provider.as_ref(), &self.backups, self.policy);
243 if targets.is_empty() {
244 return Err(SubmitError::NoDirectTargets);
245 }
246 let direct_config = self.direct_config.clone().normalized();
247 let target = direct
248 .submit_direct(&tx_bytes, &targets, self.policy, &direct_config)
249 .await
250 .map_err(|source| SubmitError::Direct { source })?;
251 Ok(SubmitResult {
252 signature,
253 mode,
254 direct_target: Some(target),
255 rpc_signature: None,
256 used_rpc_fallback: false,
257 })
258 }
259
260 async fn submit_hybrid(
262 &self,
263 tx_bytes: Vec<u8>,
264 signature: Option<Signature>,
265 mode: SubmitMode,
266 ) -> Result<SubmitResult, SubmitError> {
267 let direct = self
268 .direct_transport
269 .as_ref()
270 .ok_or(SubmitError::MissingDirectTransport)?;
271 let rpc = self
272 .rpc_transport
273 .as_ref()
274 .ok_or(SubmitError::MissingRpcTransport)?;
275
276 let direct_config = self.direct_config.clone().normalized();
277 let targets = select_targets(self.leader_provider.as_ref(), &self.backups, self.policy);
278 if !targets.is_empty() {
279 for _ in 0..direct_config.hybrid_direct_attempts {
280 if let Ok(target) = direct
281 .submit_direct(&tx_bytes, &targets, self.policy, &direct_config)
282 .await
283 {
284 return Ok(SubmitResult {
285 signature,
286 mode,
287 direct_target: Some(target),
288 rpc_signature: None,
289 used_rpc_fallback: false,
290 });
291 }
292 }
293 }
294
295 let rpc_signature = rpc
296 .submit_rpc(&tx_bytes, &self.rpc_config)
297 .await
298 .map_err(|source| SubmitError::Rpc { source })?;
299 Ok(SubmitResult {
300 signature,
301 mode,
302 direct_target: None,
303 rpc_signature: Some(rpc_signature),
304 used_rpc_fallback: true,
305 })
306 }
307}