1use std::{collections::HashMap, ops::Deref};
2
3use futures_util::{stream::FuturesOrdered, FutureExt, StreamExt};
4use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig};
5use solana_sdk::{
6 commitment_config::CommitmentConfig, message::VersionedMessage, packet::PACKET_DATA_SIZE,
7 pubkey::Pubkey, signature::Signature, signer::Signer, transaction::VersionedTransaction,
8};
9
10use crate::{
11 address_lookup_table::AddressLookupTables,
12 client::SendAndConfirm,
13 cluster::Cluster,
14 instruction_group::{AtomicGroupOptions, ComputeBudgetOptions, ParallelGroupOptions},
15 signer::TransactionSigners,
16 transaction_builder::{default_before_sign, TransactionBuilder},
17 transaction_group::TransactionGroupOptions,
18 utils::{inspect_transaction, WithSlot},
19 AtomicGroup, ParallelGroup, TransactionGroup,
20};
21
22const TRANSACTION_SIZE_LIMIT: usize = PACKET_DATA_SIZE;
23pub const DEFAULT_MAX_INSTRUCTIONS_FOR_ONE_TX: usize = 14;
25
26#[derive(Debug, Clone)]
28pub struct BundleOptions {
29 pub force_one_transaction: bool,
31 pub max_packet_size: Option<usize>,
33 pub max_instructions_for_one_tx: usize,
35}
36
37impl Default for BundleOptions {
38 fn default() -> Self {
39 Self {
40 force_one_transaction: false,
41 max_packet_size: None,
42 max_instructions_for_one_tx: DEFAULT_MAX_INSTRUCTIONS_FOR_ONE_TX,
43 }
44 }
45}
46
47#[derive(Debug, Clone, Default)]
49pub struct CreateBundleOptions {
50 pub cluster: Cluster,
52 pub commitment: CommitmentConfig,
54 pub options: BundleOptions,
56}
57
58#[derive(Debug, Clone, Default)]
60pub struct SendBundleOptions {
61 pub without_compute_budget: bool,
63 pub compute_unit_price_micro_lamports: Option<u64>,
65 pub compute_unit_min_priority_lamports: Option<u64>,
68 pub continue_on_error: bool,
70 pub config: RpcSendTransactionConfig,
72 pub disable_error_tracing: bool,
74 pub inspector_cluster: Option<Cluster>,
76}
77
78pub struct BundleBuilder<'a, C> {
80 ctx: Ctx<'a, C>,
81 options: BundleOptions,
82 groups: Vec<ParallelGroup>,
83 luts: AddressLookupTables,
84}
85
86impl<C> BundleBuilder<'_, C> {
87 pub fn new(cluster: Cluster) -> Self {
89 Self::new_with_options(CreateBundleOptions {
90 cluster,
91 ..Default::default()
92 })
93 }
94
95 pub fn new_with_options(options: CreateBundleOptions) -> Self {
97 let rpc = options.cluster.rpc(options.commitment);
98
99 Self::from_rpc_client_with_options(rpc, options.options)
100 }
101
102 pub fn set_options(&mut self, options: BundleOptions) -> &mut Self {
104 self.options = options;
105 self
106 }
107
108 pub fn from_rpc_client(client: RpcClient) -> Self {
110 Self::from_rpc_client_with_options(client, Default::default())
111 }
112
113 pub fn from_rpc_client_with_options(client: RpcClient, options: BundleOptions) -> Self {
115 Self {
116 groups: Default::default(),
117 options,
118 ctx: Ctx {
119 client,
120 cfg_signers: Default::default(),
121 signers: Default::default(),
122 },
123 luts: Default::default(),
124 }
125 }
126
127 pub fn packet_size(&self) -> usize {
129 self.options
130 .max_packet_size
131 .unwrap_or(TRANSACTION_SIZE_LIMIT)
132 }
133
134 pub fn client(&self) -> &RpcClient {
136 &self.ctx.client
137 }
138
139 pub fn is_empty(&self) -> bool {
141 self.groups.is_empty()
142 }
143
144 pub fn len(&self) -> usize {
146 self.groups.iter().map(|pg| pg.len()).sum()
147 }
148
149 pub fn try_clone_empty(&self) -> crate::Result<Self> {
151 let cluster = self.ctx.client.url().parse()?;
152 let commitment = self.ctx.client.commitment();
153 Ok(Self::new_with_options(CreateBundleOptions {
154 cluster,
155 commitment,
156 options: self.options.clone(),
157 }))
158 }
159
160 pub fn push_parallel_group(&mut self, group: ParallelGroup) -> &mut Self {
162 if !group.is_empty() {
163 self.groups.push(group);
164 }
165 self
166 }
167}
168
169impl<'a, C: Deref<Target = impl Signer> + Clone> BundleBuilder<'a, C> {
170 fn register_transaction_builder(
171 &mut self,
172 txn: TransactionBuilder<'a, C>,
173 options: AtomicGroupOptions,
174 ) -> AtomicGroup {
175 txn.into_atomic_group(
176 &mut self.ctx.cfg_signers,
177 &mut self.ctx.signers,
178 &mut self.luts,
179 options,
180 )
181 }
182
183 #[allow(clippy::result_large_err)]
185 pub fn try_push_with_opts(
186 &mut self,
187 txn: TransactionBuilder<'a, C>,
188 new_transaction: bool,
189 ) -> Result<&mut Self, (TransactionBuilder<'a, C>, crate::Error)> {
190 let ag = self.register_transaction_builder(
191 txn,
192 AtomicGroupOptions {
193 is_mergeable: !new_transaction,
194 },
195 );
196 self.push_parallel_group(ParallelGroup::with_options(
197 [ag],
198 ParallelGroupOptions {
199 is_mergeable: !new_transaction,
200 },
201 ));
202 Ok(self)
203 }
204
205 pub fn push_parallel(&mut self) -> PushParallel<'_, 'a, C> {
207 self.push_parallel_with_options(Default::default())
208 }
209
210 pub fn push_parallel_with_options(
212 &mut self,
213 options: ParallelGroupOptions,
214 ) -> PushParallel<'_, 'a, C> {
215 PushParallel::new(self, options)
216 }
217
218 #[allow(clippy::result_large_err)]
220 #[inline]
221 pub fn try_push(
222 &mut self,
223 txn: TransactionBuilder<'a, C>,
224 ) -> Result<&mut Self, (TransactionBuilder<'a, C>, crate::Error)> {
225 self.try_push_with_opts(txn, false)
226 }
227
228 pub fn push(&mut self, txn: TransactionBuilder<'a, C>) -> crate::Result<&mut Self> {
230 self.try_push(txn).map_err(|(_, err)| err)
231 }
232
233 pub fn push_many(
235 &mut self,
236 txns: impl IntoIterator<Item = TransactionBuilder<'a, C>>,
237 new_transaction: bool,
238 ) -> crate::Result<&mut Self> {
239 for (idx, txn) in txns.into_iter().enumerate() {
240 self.try_push_with_opts(txn, (idx == 0) && new_transaction)
241 .map_err(|(_, err)| err)?;
242 }
243 Ok(self)
244 }
245
246 pub fn into_parallel_groups(self) -> Vec<ParallelGroup> {
248 self.groups
249 }
250
251 pub fn append(&mut self, other: Self, new_transaction: bool) -> crate::Result<()> {
255 let Self {
256 mut groups,
257 ctx:
258 Ctx {
259 mut cfg_signers,
260 signers,
261 ..
262 },
263 luts,
264 ..
265 } = other;
266
267 if let Some(first) = groups.first_mut() {
268 first.set_is_mergeable(first.is_mergeable() && !new_transaction);
269 }
270
271 self.groups.append(&mut groups);
272 self.ctx.cfg_signers.merge(&mut cfg_signers);
273 self.ctx.signers.extend(signers);
274 self.luts.extend(luts);
275
276 Ok(())
277 }
278
279 pub fn luts(&self) -> &AddressLookupTables {
281 &self.luts
282 }
283
284 pub fn luts_mut(&mut self) -> &mut AddressLookupTables {
286 &mut self.luts
287 }
288
289 pub fn build(self) -> crate::Result<Bundle<'a, C>> {
291 self.build_with_options(|options| {
292 Ok(TransactionGroupOptions {
293 max_transaction_size: options.max_packet_size.unwrap_or(TRANSACTION_SIZE_LIMIT),
294 max_instructions_per_tx: options.max_instructions_for_one_tx,
295 memo: None,
296 memo_signers: None,
297 extra_compute_units: None,
298 })
299 })
300 }
301
302 pub fn build_with_options(
304 self,
305 f: impl FnOnce(BundleOptions) -> crate::Result<TransactionGroupOptions>,
306 ) -> crate::Result<Bundle<'a, C>> {
307 let Self {
308 groups,
309 options,
310 ctx,
311 luts,
312 } = self;
313 let mut group = TransactionGroup::with_options_and_luts(f(options)?, luts);
314 for pg in groups {
315 group.add(pg)?;
316 }
317 group.optimize(false);
318 Ok(Bundle { ctx, group })
319 }
320}
321
322struct Ctx<'a, C> {
323 client: RpcClient,
324 cfg_signers: TransactionSigners<C>,
325 signers: HashMap<Pubkey, &'a dyn Signer>,
326}
327
328pub struct PushParallel<'a, 'ctx, C> {
330 bundle: &'a mut BundleBuilder<'ctx, C>,
331 pg: Option<ParallelGroup>,
332}
333
334impl<'a, 'ctx, C> PushParallel<'a, 'ctx, C> {
335 fn new(bundle: &'a mut BundleBuilder<'ctx, C>, options: ParallelGroupOptions) -> Self {
336 Self {
337 bundle,
338 pg: Some(ParallelGroup::with_options([], options)),
339 }
340 }
341}
342
343impl<'ctx, C: Deref<Target = impl Signer> + Clone> PushParallel<'_, 'ctx, C> {
344 pub fn add_with_options(
346 &mut self,
347 txn: TransactionBuilder<'ctx, C>,
348 options: AtomicGroupOptions,
349 ) -> &mut Self {
350 let ag = self.bundle.register_transaction_builder(txn, options);
351 self.pg.as_mut().expect("the builder is dropped").add(ag);
352 self
353 }
354
355 pub fn add(&mut self, txn: TransactionBuilder<'ctx, C>) -> &mut Self {
357 self.add_with_options(txn, Default::default())
358 }
359}
360
361impl<C> Drop for PushParallel<'_, '_, C> {
362 fn drop(&mut self) {
363 if let Some(pg) = self.pg.take() {
364 self.bundle.push_parallel_group(pg);
365 }
366 }
367}
368
369pub struct Bundle<'a, C> {
371 ctx: Ctx<'a, C>,
372 group: TransactionGroup,
373}
374
375impl<C: Deref<Target = impl Signer> + Clone> Bundle<'_, C> {
376 pub fn is_empty(&self) -> bool {
378 self.group.is_empty()
379 }
380
381 pub fn len(&self) -> usize {
383 self.group.len()
384 }
385
386 pub fn into_group(self) -> TransactionGroup {
388 self.group
389 }
390
391 pub fn with_inner<'a, T>(
393 &'a self,
394 f: impl FnOnce(&'a RpcClient, &'a TransactionGroup, TransactionSigners<&'a dyn Signer>) -> T,
395 ) -> T {
396 let Self { ctx, group } = self;
397 let Ctx {
398 client,
399 cfg_signers,
400 signers,
401 } = ctx;
402
403 let mut transaction_signers = cfg_signers.to_local();
404 transaction_signers.extend(signers.clone().into_values());
405 f(client, group, transaction_signers)
406 }
407
408 pub fn estimate_execution_fee(
410 &self,
411 compute_unit_price_micro_lamports: Option<u64>,
412 compute_unit_min_priority_lamports: Option<u64>,
413 ) -> u64 {
414 self.group.estimate_execution_fee(
415 compute_unit_price_micro_lamports,
416 compute_unit_min_priority_lamports,
417 )
418 }
419
420 pub async fn send_all(
422 self,
423 skip_preflight: bool,
424 ) -> Result<Vec<Signature>, (Vec<Signature>, crate::Error)> {
425 match self
426 .send_all_with_opts(
427 SendBundleOptions {
428 config: RpcSendTransactionConfig {
429 skip_preflight,
430 ..Default::default()
431 },
432 ..Default::default()
433 },
434 default_before_sign,
435 )
436 .await
437 {
438 Ok(signatures) => Ok(signatures
439 .into_iter()
440 .map(|with_slot| with_slot.into_value())
441 .collect()),
442 Err((signatures, err)) => Err((
443 signatures
444 .into_iter()
445 .map(|with_slot| with_slot.into_value())
446 .collect(),
447 err,
448 )),
449 }
450 }
451
452 pub async fn send_all_with_opts(
454 self,
455 opts: SendBundleOptions,
456 before_sign: impl FnMut(&VersionedMessage) -> crate::Result<()>,
457 ) -> Result<Vec<WithSlot<Signature>>, (Vec<WithSlot<Signature>>, crate::Error)> {
458 let SendBundleOptions {
459 without_compute_budget,
460 compute_unit_price_micro_lamports,
461 compute_unit_min_priority_lamports,
462 continue_on_error,
463 mut config,
464 disable_error_tracing,
465 inspector_cluster,
466 } = opts;
467 config.preflight_commitment = config
468 .preflight_commitment
469 .or(Some(self.ctx.client.commitment().commitment));
470
471 let Self {
472 ctx:
473 Ctx {
474 client,
475 cfg_signers,
476 signers,
477 },
478 group,
479 } = self;
480
481 let latest_hash = client
482 .get_latest_blockhash()
483 .await
484 .map_err(|err| (vec![], Box::new(err).into()))?;
485
486 let mut transaction_signers = cfg_signers.to_local();
487 transaction_signers.extend(signers.into_values());
488
489 let txns = group
490 .to_transactions_with_options(
491 &transaction_signers,
492 latest_hash,
493 false,
494 ComputeBudgetOptions {
495 without_compute_budget,
496 compute_unit_price_micro_lamports,
497 compute_unit_min_priority_lamports,
498 },
499 before_sign,
500 )
501 .collect::<crate::Result<Vec<_>>>()
502 .map_err(|err| (vec![], err))?;
503 send_all_txns(
504 &client,
505 txns,
506 config,
507 continue_on_error,
508 !disable_error_tracing,
509 inspector_cluster,
510 )
511 .await
512 }
513}
514
515async fn send_all_txns(
516 client: &RpcClient,
517 txns: Vec<Vec<VersionedTransaction>>,
518 config: RpcSendTransactionConfig,
519 continue_on_error: bool,
520 enable_tracing: bool,
521 inspector_cluster: Option<Cluster>,
522) -> Result<Vec<WithSlot<Signature>>, (Vec<WithSlot<Signature>>, crate::Error)> {
523 let size = txns.iter().map(|txns| txns.len()).sum();
524 let mut signatures = Vec::with_capacity(size);
525 let mut error = None;
526 for (batch_idx, txns) in txns.into_iter().enumerate() {
527 let mut batch = txns
528 .iter().enumerate()
529 .map(|(idx, txn)| {
530 tracing::debug!(
531 %batch_idx,
532 commitment = ?client.commitment(),
533 ?config,
534 "sending transaction {idx}"
535 );
536 let inspector_cluster = inspector_cluster.clone();
537 client
538 .send_and_confirm_transaction_with_config(txn, config)
539 .then(move |res| match res {
540 Ok(signature) => {
541 std::future::ready(Ok(signature))
542 }
543 Err(err) => {
544 if enable_tracing {
545 let cluster = inspector_cluster
546 .clone()
547 .or_else(|| client.url().parse().ok());
548 let inspector_url =
549 inspect_transaction(&txn.message, cluster.as_ref(), false);
550 let hash = txn.message.recent_blockhash();
551 tracing::error!(%err, %hash, ?config, "[batch {batch_idx}] transaction {idx} failed: {inspector_url}");
552 }
553 std::future::ready(Err(err))
554 }
555 })
556 })
557 .collect::<FuturesOrdered<_>>();
558 while let Some(res) = batch.next().await {
559 match res {
560 Ok(signature) => signatures.push(signature),
561 Err(err) => {
562 error = Some(Box::new(err).into());
563 if !continue_on_error {
564 break;
565 }
566 }
567 }
568 }
569 }
570 match error {
571 None => Ok(signatures),
572 Some(err) => Err((signatures, err)),
573 }
574}