Skip to main content

gmsol_solana_utils/
bundle_builder.rs

1use std::{collections::HashMap, ops::Deref};
2
3use futures_util::{stream::FuturesOrdered, FutureExt, StreamExt};
4use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig};
5use solana_sdk::{
6    commitment_config::CommitmentConfig, message::VersionedMessage, packet::PACKET_DATA_SIZE,
7    pubkey::Pubkey, signature::Signature, signer::Signer, transaction::VersionedTransaction,
8};
9
10use crate::{
11    address_lookup_table::AddressLookupTables,
12    client::SendAndConfirm,
13    cluster::Cluster,
14    instruction_group::{AtomicGroupOptions, ComputeBudgetOptions, ParallelGroupOptions},
15    signer::TransactionSigners,
16    transaction_builder::{default_before_sign, TransactionBuilder},
17    transaction_group::TransactionGroupOptions,
18    utils::{inspect_transaction, WithSlot},
19    AtomicGroup, ParallelGroup, TransactionGroup,
20};
21
22const TRANSACTION_SIZE_LIMIT: usize = PACKET_DATA_SIZE;
23/// Default max instruction for one transaction.
24pub const DEFAULT_MAX_INSTRUCTIONS_FOR_ONE_TX: usize = 14;
25
/// Options that control how a bundle is assembled.
#[derive(Debug, Clone)]
pub struct BundleOptions {
    /// Whether to force one transaction.
    pub force_one_transaction: bool,
    /// Maximum packet size. `None` means the built-in default limit is used.
    pub max_packet_size: Option<usize>,
    /// Maximum number of instructions allowed in one transaction.
    pub max_instructions_for_one_tx: usize,
}
36
37impl Default for BundleOptions {
38    fn default() -> Self {
39        Self {
40            force_one_transaction: false,
41            max_packet_size: None,
42            max_instructions_for_one_tx: DEFAULT_MAX_INSTRUCTIONS_FOR_ONE_TX,
43        }
44    }
45}
46
47/// Create Bundle Options.
48#[derive(Debug, Clone, Default)]
49pub struct CreateBundleOptions {
50    /// Cluster.
51    pub cluster: Cluster,
52    /// Commitment config.
53    pub commitment: CommitmentConfig,
54    /// Bundle options.
55    pub options: BundleOptions,
56}
57
58/// Send Bundle Options.
59#[derive(Debug, Clone, Default)]
60pub struct SendBundleOptions {
61    /// Whether to send without compute budget.
62    pub without_compute_budget: bool,
63    /// Set the compute unit price.
64    pub compute_unit_price_micro_lamports: Option<u64>,
65    /// Set the min priority lamports.
66    /// `None` means the value is left unchanged.
67    pub compute_unit_min_priority_lamports: Option<u64>,
68    /// Whether to continue on error.
69    pub continue_on_error: bool,
70    /// RPC config.
71    pub config: RpcSendTransactionConfig,
72    /// Whether to trace transaction error.
73    pub disable_error_tracing: bool,
74    /// Cluster of the inspector url.
75    pub inspector_cluster: Option<Cluster>,
76}
77
78/// Builder for transaction bundle.
79pub struct BundleBuilder<'a, C> {
80    ctx: Ctx<'a, C>,
81    options: BundleOptions,
82    groups: Vec<ParallelGroup>,
83    luts: AddressLookupTables,
84}
85
86impl<C> BundleBuilder<'_, C> {
87    /// Create a new [`BundleBuilder`] for the given cluster.
88    pub fn new(cluster: Cluster) -> Self {
89        Self::new_with_options(CreateBundleOptions {
90            cluster,
91            ..Default::default()
92        })
93    }
94
95    /// Create a new [`BundleBuilder`] with the given options.
96    pub fn new_with_options(options: CreateBundleOptions) -> Self {
97        let rpc = options.cluster.rpc(options.commitment);
98
99        Self::from_rpc_client_with_options(rpc, options.options)
100    }
101
102    /// Replaces the bundle options with the given.
103    pub fn set_options(&mut self, options: BundleOptions) -> &mut Self {
104        self.options = options;
105        self
106    }
107
108    /// Create a new [`BundleBuilder`] from [`RpcClient`].
109    pub fn from_rpc_client(client: RpcClient) -> Self {
110        Self::from_rpc_client_with_options(client, Default::default())
111    }
112
113    /// Create a new [`BundleBuilder`] from [`RpcClient`] with the given options.
114    pub fn from_rpc_client_with_options(client: RpcClient, options: BundleOptions) -> Self {
115        Self {
116            groups: Default::default(),
117            options,
118            ctx: Ctx {
119                client,
120                cfg_signers: Default::default(),
121                signers: Default::default(),
122            },
123            luts: Default::default(),
124        }
125    }
126
127    /// Get packet size.
128    pub fn packet_size(&self) -> usize {
129        self.options
130            .max_packet_size
131            .unwrap_or(TRANSACTION_SIZE_LIMIT)
132    }
133
134    /// Get the client.
135    pub fn client(&self) -> &RpcClient {
136        &self.ctx.client
137    }
138
139    /// Is empty.
140    pub fn is_empty(&self) -> bool {
141        self.groups.is_empty()
142    }
143
144    /// Get total number of transactions.
145    pub fn len(&self) -> usize {
146        self.groups.iter().map(|pg| pg.len()).sum()
147    }
148
149    /// Try clone empty.
150    pub fn try_clone_empty(&self) -> crate::Result<Self> {
151        let cluster = self.ctx.client.url().parse()?;
152        let commitment = self.ctx.client.commitment();
153        Ok(Self::new_with_options(CreateBundleOptions {
154            cluster,
155            commitment,
156            options: self.options.clone(),
157        }))
158    }
159
160    /// Push a [`ParallelGroup`].
161    pub fn push_parallel_group(&mut self, group: ParallelGroup) -> &mut Self {
162        if !group.is_empty() {
163            self.groups.push(group);
164        }
165        self
166    }
167}
168
169impl<'a, C: Deref<Target = impl Signer> + Clone> BundleBuilder<'a, C> {
170    fn register_transaction_builder(
171        &mut self,
172        txn: TransactionBuilder<'a, C>,
173        options: AtomicGroupOptions,
174    ) -> AtomicGroup {
175        txn.into_atomic_group(
176            &mut self.ctx.cfg_signers,
177            &mut self.ctx.signers,
178            &mut self.luts,
179            options,
180        )
181    }
182
183    /// Push a [`TransactionBuilder`] with options.
184    #[allow(clippy::result_large_err)]
185    pub fn try_push_with_opts(
186        &mut self,
187        txn: TransactionBuilder<'a, C>,
188        new_transaction: bool,
189    ) -> Result<&mut Self, (TransactionBuilder<'a, C>, crate::Error)> {
190        let ag = self.register_transaction_builder(
191            txn,
192            AtomicGroupOptions {
193                is_mergeable: !new_transaction,
194            },
195        );
196        self.push_parallel_group(ParallelGroup::with_options(
197            [ag],
198            ParallelGroupOptions {
199                is_mergeable: !new_transaction,
200            },
201        ));
202        Ok(self)
203    }
204
205    /// Push multiple transactions that can be sent simultaneously.
206    pub fn push_parallel(&mut self) -> PushParallel<'_, 'a, C> {
207        self.push_parallel_with_options(Default::default())
208    }
209
210    /// Push multiple transactions that can be sent simultaneously with the given options.
211    pub fn push_parallel_with_options(
212        &mut self,
213        options: ParallelGroupOptions,
214    ) -> PushParallel<'_, 'a, C> {
215        PushParallel::new(self, options)
216    }
217
218    /// Try to push a [`TransactionBuilder`] to the builder.
219    #[allow(clippy::result_large_err)]
220    #[inline]
221    pub fn try_push(
222        &mut self,
223        txn: TransactionBuilder<'a, C>,
224    ) -> Result<&mut Self, (TransactionBuilder<'a, C>, crate::Error)> {
225        self.try_push_with_opts(txn, false)
226    }
227
228    /// Push a [`TransactionBuilder`].
229    pub fn push(&mut self, txn: TransactionBuilder<'a, C>) -> crate::Result<&mut Self> {
230        self.try_push(txn).map_err(|(_, err)| err)
231    }
232
233    /// Push [`TransactionBuilder`]s.
234    pub fn push_many(
235        &mut self,
236        txns: impl IntoIterator<Item = TransactionBuilder<'a, C>>,
237        new_transaction: bool,
238    ) -> crate::Result<&mut Self> {
239        for (idx, txn) in txns.into_iter().enumerate() {
240            self.try_push_with_opts(txn, (idx == 0) && new_transaction)
241                .map_err(|(_, err)| err)?;
242        }
243        Ok(self)
244    }
245
246    /// Returns the transaction groups.
247    pub fn into_parallel_groups(self) -> Vec<ParallelGroup> {
248        self.groups
249    }
250
251    /// Insert all the transaction groups of `other` into `self`.
252    ///
253    /// If `new_transaction` is `true`, then a new transaction will be created before pushing.
254    pub fn append(&mut self, other: Self, new_transaction: bool) -> crate::Result<()> {
255        let Self {
256            mut groups,
257            ctx:
258                Ctx {
259                    mut cfg_signers,
260                    signers,
261                    ..
262                },
263            luts,
264            ..
265        } = other;
266
267        if let Some(first) = groups.first_mut() {
268            first.set_is_mergeable(first.is_mergeable() && !new_transaction);
269        }
270
271        self.groups.append(&mut groups);
272        self.ctx.cfg_signers.merge(&mut cfg_signers);
273        self.ctx.signers.extend(signers);
274        self.luts.extend(luts);
275
276        Ok(())
277    }
278
279    /// Get the reference of the address lookup table cache.
280    pub fn luts(&self) -> &AddressLookupTables {
281        &self.luts
282    }
283
284    /// Get the mutable reference of the address lookup table cache.
285    pub fn luts_mut(&mut self) -> &mut AddressLookupTables {
286        &mut self.luts
287    }
288
289    /// Build the [`Bundle`].
290    pub fn build(self) -> crate::Result<Bundle<'a, C>> {
291        self.build_with_options(|options| {
292            Ok(TransactionGroupOptions {
293                max_transaction_size: options.max_packet_size.unwrap_or(TRANSACTION_SIZE_LIMIT),
294                max_instructions_per_tx: options.max_instructions_for_one_tx,
295                memo: None,
296                memo_signers: None,
297                extra_compute_units: None,
298            })
299        })
300    }
301
302    /// Build [`Bundle`] with options builder `f`.
303    pub fn build_with_options(
304        self,
305        f: impl FnOnce(BundleOptions) -> crate::Result<TransactionGroupOptions>,
306    ) -> crate::Result<Bundle<'a, C>> {
307        let Self {
308            groups,
309            options,
310            ctx,
311            luts,
312        } = self;
313        let mut group = TransactionGroup::with_options_and_luts(f(options)?, luts);
314        for pg in groups {
315            group.add(pg)?;
316        }
317        group.optimize(false);
318        Ok(Bundle { ctx, group })
319    }
320}
321
322struct Ctx<'a, C> {
323    client: RpcClient,
324    cfg_signers: TransactionSigners<C>,
325    signers: HashMap<Pubkey, &'a dyn Signer>,
326}
327
328/// Push multiple transactions that can be sent simultaneously to the [`BundleBuilder`].
329pub struct PushParallel<'a, 'ctx, C> {
330    bundle: &'a mut BundleBuilder<'ctx, C>,
331    pg: Option<ParallelGroup>,
332}
333
334impl<'a, 'ctx, C> PushParallel<'a, 'ctx, C> {
335    fn new(bundle: &'a mut BundleBuilder<'ctx, C>, options: ParallelGroupOptions) -> Self {
336        Self {
337            bundle,
338            pg: Some(ParallelGroup::with_options([], options)),
339        }
340    }
341}
342
343impl<'ctx, C: Deref<Target = impl Signer> + Clone> PushParallel<'_, 'ctx, C> {
344    /// Add a [`TransactionBuilder`] to the parallel group with the given options.
345    pub fn add_with_options(
346        &mut self,
347        txn: TransactionBuilder<'ctx, C>,
348        options: AtomicGroupOptions,
349    ) -> &mut Self {
350        let ag = self.bundle.register_transaction_builder(txn, options);
351        self.pg.as_mut().expect("the builder is dropped").add(ag);
352        self
353    }
354
355    /// Add a [`TransactionBuilder`] to the parallel group.
356    pub fn add(&mut self, txn: TransactionBuilder<'ctx, C>) -> &mut Self {
357        self.add_with_options(txn, Default::default())
358    }
359}
360
361impl<C> Drop for PushParallel<'_, '_, C> {
362    fn drop(&mut self) {
363        if let Some(pg) = self.pg.take() {
364            self.bundle.push_parallel_group(pg);
365        }
366    }
367}
368
369/// A bundle of transactions.
370pub struct Bundle<'a, C> {
371    ctx: Ctx<'a, C>,
372    group: TransactionGroup,
373}
374
375impl<C: Deref<Target = impl Signer> + Clone> Bundle<'_, C> {
376    /// Is empty.
377    pub fn is_empty(&self) -> bool {
378        self.group.is_empty()
379    }
380
381    /// Get total number of transactions.
382    pub fn len(&self) -> usize {
383        self.group.len()
384    }
385
386    /// Returns the inner [`TransactionGroup`].
387    pub fn into_group(self) -> TransactionGroup {
388        self.group
389    }
390
391    /// Consumes the [`Bundle`] and runs `f` with the inner [`TransactionGroup`] and [`TransactionSigners`].
392    pub fn with_inner<'a, T>(
393        &'a self,
394        f: impl FnOnce(&'a RpcClient, &'a TransactionGroup, TransactionSigners<&'a dyn Signer>) -> T,
395    ) -> T {
396        let Self { ctx, group } = self;
397        let Ctx {
398            client,
399            cfg_signers,
400            signers,
401        } = ctx;
402
403        let mut transaction_signers = cfg_signers.to_local();
404        transaction_signers.extend(signers.clone().into_values());
405        f(client, group, transaction_signers)
406    }
407
408    /// Estimate execution fee.
409    pub fn estimate_execution_fee(
410        &self,
411        compute_unit_price_micro_lamports: Option<u64>,
412        compute_unit_min_priority_lamports: Option<u64>,
413    ) -> u64 {
414        self.group.estimate_execution_fee(
415            compute_unit_price_micro_lamports,
416            compute_unit_min_priority_lamports,
417        )
418    }
419
420    /// Send all in order and returns the signatures of the success transactions.
421    pub async fn send_all(
422        self,
423        skip_preflight: bool,
424    ) -> Result<Vec<Signature>, (Vec<Signature>, crate::Error)> {
425        match self
426            .send_all_with_opts(
427                SendBundleOptions {
428                    config: RpcSendTransactionConfig {
429                        skip_preflight,
430                        ..Default::default()
431                    },
432                    ..Default::default()
433                },
434                default_before_sign,
435            )
436            .await
437        {
438            Ok(signatures) => Ok(signatures
439                .into_iter()
440                .map(|with_slot| with_slot.into_value())
441                .collect()),
442            Err((signatures, err)) => Err((
443                signatures
444                    .into_iter()
445                    .map(|with_slot| with_slot.into_value())
446                    .collect(),
447                err,
448            )),
449        }
450    }
451
452    /// Send all in order with the given options and returns the signatures of the success transactions.
453    pub async fn send_all_with_opts(
454        self,
455        opts: SendBundleOptions,
456        before_sign: impl FnMut(&VersionedMessage) -> crate::Result<()>,
457    ) -> Result<Vec<WithSlot<Signature>>, (Vec<WithSlot<Signature>>, crate::Error)> {
458        let SendBundleOptions {
459            without_compute_budget,
460            compute_unit_price_micro_lamports,
461            compute_unit_min_priority_lamports,
462            continue_on_error,
463            mut config,
464            disable_error_tracing,
465            inspector_cluster,
466        } = opts;
467        config.preflight_commitment = config
468            .preflight_commitment
469            .or(Some(self.ctx.client.commitment().commitment));
470
471        let Self {
472            ctx:
473                Ctx {
474                    client,
475                    cfg_signers,
476                    signers,
477                },
478            group,
479        } = self;
480
481        let latest_hash = client
482            .get_latest_blockhash()
483            .await
484            .map_err(|err| (vec![], Box::new(err).into()))?;
485
486        let mut transaction_signers = cfg_signers.to_local();
487        transaction_signers.extend(signers.into_values());
488
489        let txns = group
490            .to_transactions_with_options(
491                &transaction_signers,
492                latest_hash,
493                false,
494                ComputeBudgetOptions {
495                    without_compute_budget,
496                    compute_unit_price_micro_lamports,
497                    compute_unit_min_priority_lamports,
498                },
499                before_sign,
500            )
501            .collect::<crate::Result<Vec<_>>>()
502            .map_err(|err| (vec![], err))?;
503        send_all_txns(
504            &client,
505            txns,
506            config,
507            continue_on_error,
508            !disable_error_tracing,
509            inspector_cluster,
510        )
511        .await
512    }
513}
514
515async fn send_all_txns(
516    client: &RpcClient,
517    txns: Vec<Vec<VersionedTransaction>>,
518    config: RpcSendTransactionConfig,
519    continue_on_error: bool,
520    enable_tracing: bool,
521    inspector_cluster: Option<Cluster>,
522) -> Result<Vec<WithSlot<Signature>>, (Vec<WithSlot<Signature>>, crate::Error)> {
523    let size = txns.iter().map(|txns| txns.len()).sum();
524    let mut signatures = Vec::with_capacity(size);
525    let mut error = None;
526    for (batch_idx, txns) in txns.into_iter().enumerate() {
527        let mut batch = txns
528            .iter().enumerate()
529            .map(|(idx, txn)| {
530                tracing::debug!(
531                    %batch_idx,
532                    commitment = ?client.commitment(),
533                    ?config,
534                    "sending transaction {idx}"
535                );
536                let inspector_cluster = inspector_cluster.clone();
537                client
538                    .send_and_confirm_transaction_with_config(txn, config)
539                    .then(move |res| match res {
540                        Ok(signature) => {
541                            std::future::ready(Ok(signature))
542                        }
543                        Err(err) => {
544                            if enable_tracing {
545                                let cluster = inspector_cluster
546                                    .clone()
547                                    .or_else(|| client.url().parse().ok());
548                                let inspector_url =
549                                    inspect_transaction(&txn.message, cluster.as_ref(), false);
550                                let hash = txn.message.recent_blockhash();
551                                tracing::error!(%err, %hash, ?config, "[batch {batch_idx}] transaction {idx} failed: {inspector_url}");
552                            }
553                            std::future::ready(Err(err))
554                        }
555                    })
556            })
557            .collect::<FuturesOrdered<_>>();
558        while let Some(res) = batch.next().await {
559            match res {
560                Ok(signature) => signatures.push(signature),
561                Err(err) => {
562                    error = Some(Box::new(err).into());
563                    if !continue_on_error {
564                        break;
565                    }
566                }
567            }
568        }
569    }
570    match error {
571        None => Ok(signatures),
572        Some(err) => Err((signatures, err)),
573    }
574}