layer_climb_core/
transaction.rs

1use crate::prelude::*;
2use crate::querier::tx::AnyTxResponse;
3use crate::signing::middleware::{SigningMiddlewareMapBody, SigningMiddlewareMapResp};
4use std::sync::{
5    atomic::{AtomicBool, AtomicU64},
6    Arc,
7};
8
9use layer_climb_signer::TxSigner;
10
/// Collects everything needed to build, sign and broadcast a single transaction.
///
/// Create one with [`TxBuilder::new`], adjust it through the `set_*` methods, then
/// consume it with `broadcast` or `broadcast_raw`.
pub struct TxBuilder<'a> {
    /// Client used for every network call: account lookup, gas simulation,
    /// block height, broadcasting and polling. Its `chain_config` also supplies
    /// the chain id and the default gas denom/price.
    pub querier: &'a QueryClient,
    /// Produces the signer info and the signature for the transaction.
    pub signer: &'a dyn TxSigner,

    /// Address whose base account is queried for the account number and sequence.
    /// Must be set whenever that query is needed: when `account_number` is unset,
    /// or when `sequence_strategy` is unset, `Query`, or a `QueryAndIncrement`
    /// that has not queried yet.
    pub sender: Option<Address>,

    /// How many blocks past the current height until the tx is considered invalid.
    /// If not set, the default is 10 blocks.
    pub tx_timeout_blocks: Option<u64>,
    /// For manually overriding the sequence number, e.g. parallel transactions
    /// (multiple *messages* in a tx do not need this).
    /// If not set, the sequence is queried from the sender's account each time.
    pub sequence_strategy: Option<SequenceStrategy>,

    /// The account number of the sender. If not set, it will be queried from the sender's account.
    pub account_number: Option<u64>,

    /// Memo placed in the tx body. If not set, an empty memo is used.
    pub memo: Option<String>,

    /// The coin to pay as the fee. When set it is used as-is as the fee amount
    /// (it is *not* multiplied by the gas units).
    /// If not set, the fee is derived from querier.chain_config (without hitting the network)
    /// as `ceil(gas_price * gas_units)` in the chain's gas denom.
    pub gas_coin: Option<layer_climb_proto::Coin>,

    /// The gas limit, in gas units.
    /// If not set, it is derived from an on-chain simulation: the simulated gas used
    /// multiplied by `gas_simulate_multiplier`, rounded up.
    pub gas_units_or_simulate: Option<u64>,

    /// A multiplier to use for simulated gas units.
    /// If not set, a default of 1.5 will be used.
    pub gas_simulate_multiplier: Option<f32>,

    /// The broadcast mode to use. If not set, the default is `Sync`.
    pub broadcast_mode: Option<layer_climb_proto::tx::BroadcastMode>,

    /// Whether broadcasting should poll for the tx landing on chain before returning.
    /// `TxBuilder::new` sets this to true.
    pub broadcast_poll: bool,

    /// The duration to sleep between polling for the tx landing on chain.
    /// If not set, the default is 1 second.
    pub broadcast_poll_sleep_duration: Option<std::time::Duration>,

    /// The duration to wait before giving up on polling for the tx landing on chain.
    /// If not set, the default is 30 seconds.
    pub broadcast_poll_timeout_duration: Option<std::time::Duration>,

    /// Middleware applied, in order, to the tx body before gas simulation and signing.
    pub middleware_map_body: Option<Arc<Vec<SigningMiddlewareMapBody>>>,

    /// Middleware applied, in order, to the response of a successful broadcast.
    pub middleware_map_resp: Option<Arc<Vec<SigningMiddlewareMapResp>>>,
}
62
63impl<'a> TxBuilder<'a> {
64    const DEFAULT_TX_TIMEOUT_BLOCKS: u64 = 10;
65    const DEFAULT_GAS_MULTIPLIER: f32 = 1.5;
66    const DEFAULT_BROADCAST_MODE: layer_climb_proto::tx::BroadcastMode =
67        layer_climb_proto::tx::BroadcastMode::Sync;
68    const DEFAULT_BROADCAST_POLL_SLEEP_DURATION: std::time::Duration =
69        std::time::Duration::from_secs(1);
70    const DEFAULT_BROADCAST_POLL_TIMEOUT_DURATION: std::time::Duration =
71        std::time::Duration::from_secs(30);
72
73    pub fn new(querier: &'a QueryClient, signer: &'a dyn TxSigner) -> Self {
74        Self {
75            querier,
76            signer,
77            gas_coin: None,
78            sender: None,
79            memo: None,
80            tx_timeout_blocks: None,
81            sequence_strategy: None,
82            gas_units_or_simulate: None,
83            gas_simulate_multiplier: None,
84            account_number: None,
85            broadcast_mode: None,
86            broadcast_poll: true,
87            broadcast_poll_sleep_duration: None,
88            broadcast_poll_timeout_duration: None,
89            middleware_map_body: None,
90            middleware_map_resp: None,
91        }
92    }
93
94    pub fn set_tx_timeout_blocks(&mut self, tx_timeout_blocks: u64) -> &mut Self {
95        self.tx_timeout_blocks = Some(tx_timeout_blocks);
96        self
97    }
98
99    pub fn set_memo(&mut self, memo: impl Into<String>) -> &mut Self {
100        self.memo = Some(memo.into());
101        self
102    }
103
104    pub fn set_sequence_strategy(&mut self, sequence_strategy: SequenceStrategy) -> &mut Self {
105        self.sequence_strategy = Some(sequence_strategy);
106        self
107    }
108
109    pub fn set_sender(&mut self, sender: Address) -> &mut Self {
110        self.sender = Some(sender);
111        self
112    }
113
114    pub fn set_gas_coin(&mut self, gas_coin: layer_climb_proto::Coin) -> &mut Self {
115        self.gas_coin = Some(gas_coin);
116        self
117    }
118
119    pub fn set_gas_units_or_simulate(&mut self, gas_units: Option<u64>) -> &mut Self {
120        self.gas_units_or_simulate = gas_units;
121        self
122    }
123
124    pub fn set_gas_simulate_multiplier(&mut self, gas_multiplier: f32) -> &mut Self {
125        self.gas_simulate_multiplier = Some(gas_multiplier);
126        self
127    }
128
129    pub fn set_account_number(&mut self, account_number: u64) -> &mut Self {
130        self.account_number = Some(account_number);
131        self
132    }
133
134    pub fn set_broadcast_mode(
135        &mut self,
136        broadcast_mode: layer_climb_proto::tx::BroadcastMode,
137    ) -> &mut Self {
138        self.broadcast_mode = Some(broadcast_mode);
139        self
140    }
141
142    pub fn set_broadcast_poll(&mut self, broadcast_poll: bool) -> &mut Self {
143        self.broadcast_poll = broadcast_poll;
144        self
145    }
146
147    pub fn set_broadcast_poll_sleep_duration(
148        &mut self,
149        broadcast_poll_sleep_duration: std::time::Duration,
150    ) -> &mut Self {
151        self.broadcast_poll_sleep_duration = Some(broadcast_poll_sleep_duration);
152        self
153    }
154
155    pub fn set_broadcast_poll_timeout_duration(
156        &mut self,
157        broadcast_poll_timeout_duration: std::time::Duration,
158    ) -> &mut Self {
159        self.broadcast_poll_timeout_duration = Some(broadcast_poll_timeout_duration);
160        self
161    }
162
163    pub fn set_middleware_map_body(
164        &mut self,
165        middleware_map_body: Arc<Vec<SigningMiddlewareMapBody>>,
166    ) -> &mut Self {
167        self.middleware_map_body = Some(middleware_map_body);
168        self
169    }
170
171    pub fn set_middleware_map_resp(
172        &mut self,
173        middleware_map_resp: Arc<Vec<SigningMiddlewareMapResp>>,
174    ) -> &mut Self {
175        self.middleware_map_resp = Some(middleware_map_resp);
176        self
177    }
178
179    async fn query_base_account(&self) -> Result<layer_climb_proto::auth::BaseAccount> {
180        self.querier
181            .base_account(
182                self.sender
183                    .as_ref()
184                    .with_context(|| "must provide a sender if no sequence")?,
185            )
186            .await
187    }
188
189    pub async fn broadcast(
190        self,
191        messages: impl IntoIterator<Item = layer_climb_proto::Any>,
192    ) -> Result<layer_climb_proto::abci::TxResponse> {
193        let messages = messages.into_iter().collect();
194        let resp = self.broadcast_raw(messages).await?;
195
196        match resp {
197            AnyTxResponse::Abci(tx_response) => Ok(tx_response),
198            AnyTxResponse::Rpc(_) => Err(anyhow!(
199                "Unexpected AnyTxResponse type - did you mean to call broadcast_raw instead?"
200            )),
201        }
202    }
203
204    pub async fn simulate_gas(
205        &self,
206        signer_info: layer_climb_proto::tx::SignerInfo,
207        account_number: u64,
208        // mutable so we can set the timeout_height here
209        tx_body: &mut layer_climb_proto::tx::TxBody,
210    ) -> Result<layer_climb_proto::abci::GasInfo> {
211        let fee = FeeCalculation::Simulation {
212            chain_config: &self.querier.chain_config,
213        }
214        .calculate()?;
215
216        let simulate_tx_resp = self
217            .querier
218            .simulate_tx(
219                self.sign_tx(signer_info, account_number, tx_body, fee, true)
220                    .await?,
221            )
222            .await?;
223
224        simulate_tx_resp
225            .gas_info
226            .context("unable to get gas from simulation")
227    }
228
229    pub async fn current_sequence(&self) -> Result<u64> {
230        let sequence = match &self.sequence_strategy {
231            Some(sequence_strategy) => match sequence_strategy.kind {
232                SequenceStrategyKind::Query => {
233                    let base_account = self.query_base_account().await?;
234                    base_account.sequence
235                }
236                SequenceStrategyKind::QueryAndIncrement => {
237                    if !sequence_strategy
238                        .has_queried
239                        .load(std::sync::atomic::Ordering::SeqCst)
240                    {
241                        let base_account = self.query_base_account().await?;
242                        sequence_strategy
243                            .has_queried
244                            .store(true, std::sync::atomic::Ordering::SeqCst);
245                        sequence_strategy
246                            .value
247                            .store(base_account.sequence, std::sync::atomic::Ordering::SeqCst);
248                        base_account.sequence
249                    } else {
250                        sequence_strategy
251                            .value
252                            .load(std::sync::atomic::Ordering::SeqCst)
253                    }
254                }
255                SequenceStrategyKind::SetAndIncrement(_) => sequence_strategy
256                    .value
257                    .load(std::sync::atomic::Ordering::SeqCst),
258                SequenceStrategyKind::Constant(n) => n,
259            },
260            None => {
261                let base_account = self.query_base_account().await?;
262                base_account.sequence
263            }
264        };
265
266        tracing::debug!(
267            "{} is Using sequence: {}",
268            self.signer.address(&self.querier.chain_config).await?,
269            sequence
270        );
271
272        Ok(sequence)
273    }
274
275    /// Typically do _not_ want to do this directly, use `broadcast` instead
276    /// however, in a case where you do not want to wait for the tx to be committed, you can use this
277    /// (and if the original tx response is AnyTxResponse::Rpc, it will stay that way)
278    pub async fn broadcast_raw(
279        self,
280        messages: Vec<layer_climb_proto::Any>,
281    ) -> Result<AnyTxResponse> {
282        let account_number = match self.account_number {
283            Some(account_number) => account_number,
284            None => self.query_base_account().await?.account_number,
285        };
286
287        let mut body = layer_climb_proto::tx::TxBody {
288            messages,
289            memo: self.memo.as_deref().unwrap_or("").to_string(),
290            timeout_height: 0, // will be set later so we don't get delayed by other async calls before we send
291            extension_options: Default::default(),
292            non_critical_extension_options: Default::default(),
293        };
294
295        if let Some(middleware) = self.middleware_map_body.as_ref() {
296            for middleware in middleware.iter() {
297                body = match middleware.map_body(body).await {
298                    Ok(req) => req,
299                    Err(e) => return Err(e),
300                }
301            }
302        }
303
304        let gas_units = match self.gas_units_or_simulate {
305            Some(gas_units) => gas_units,
306            None => {
307                let gas_multiplier = self
308                    .gas_simulate_multiplier
309                    .unwrap_or(Self::DEFAULT_GAS_MULTIPLIER);
310
311                let signer_info = self
312                    .signer
313                    .signer_info(
314                        self.current_sequence().await?,
315                        layer_climb_proto::tx::signing::SignMode::Unspecified,
316                    )
317                    .await?;
318
319                let gas_info = self
320                    .simulate_gas(signer_info, account_number, &mut body)
321                    .await?;
322
323                (gas_info.gas_used as f32 * gas_multiplier).ceil() as u64
324            }
325        };
326
327        let fee = match self.gas_coin.clone() {
328            Some(gas_coin) => FeeCalculation::RealCoin {
329                gas_coin,
330                gas_units,
331            }
332            .calculate()?,
333            None => FeeCalculation::RealNetwork {
334                chain_config: &self.querier.chain_config,
335                gas_units,
336            }
337            .calculate()?,
338        };
339
340        let signer_info = self
341            .signer
342            .signer_info(
343                self.current_sequence().await?,
344                layer_climb_proto::tx::signing::SignMode::Direct,
345            )
346            .await?;
347
348        let tx_bytes = self
349            .sign_tx(signer_info, account_number, &mut body, fee, false)
350            .await?;
351        let broadcast_mode = self.broadcast_mode.unwrap_or(Self::DEFAULT_BROADCAST_MODE);
352
353        let tx_response = self
354            .querier
355            .broadcast_tx_bytes(tx_bytes, broadcast_mode)
356            .await?;
357
358        if tx_response.code() != 0 {
359            bail!(
360                "tx failed with code: {}, codespace: {}, raw_log: {}",
361                tx_response.code(),
362                tx_response.codespace(),
363                tx_response.raw_log()
364            );
365        }
366
367        let mut tx_response = if self.broadcast_poll {
368            let sleep_duration = self
369                .broadcast_poll_sleep_duration
370                .unwrap_or(Self::DEFAULT_BROADCAST_POLL_SLEEP_DURATION);
371            let timeout_duration = self
372                .broadcast_poll_timeout_duration
373                .unwrap_or(Self::DEFAULT_BROADCAST_POLL_TIMEOUT_DURATION);
374
375            AnyTxResponse::Abci(
376                self.querier
377                    .poll_until_tx_ready(tx_response.tx_hash(), sleep_duration, timeout_duration)
378                    .await?
379                    .tx_response,
380            )
381        } else {
382            tx_response
383        };
384
385        if tx_response.code() != 0 {
386            bail!(
387                "tx failed with code: {}, codespace: {}, raw_log: {}",
388                tx_response.code(),
389                tx_response.codespace(),
390                tx_response.raw_log()
391            );
392        }
393
394        // TODO not sure about this... should increase even if failed?
395        if let Some(sequence) = self.sequence_strategy {
396            match sequence.kind {
397                SequenceStrategyKind::QueryAndIncrement => {
398                    sequence
399                        .value
400                        .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
401                }
402                SequenceStrategyKind::SetAndIncrement(_) => {
403                    sequence
404                        .value
405                        .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
406                }
407                _ => {}
408            }
409        }
410
411        if let Some(middleware) = self.middleware_map_resp.as_ref() {
412            for middleware in middleware.iter() {
413                tx_response = match middleware.map_resp(tx_response).await {
414                    Ok(req) => req,
415                    Err(e) => return Err(e),
416                }
417            }
418        }
419
420        Ok(tx_response)
421    }
422
423    async fn sign_tx(
424        &self,
425        signer_info: layer_climb_proto::tx::SignerInfo,
426        account_number: u64,
427        // mutable so we can set the timeout_height here
428        body: &mut layer_climb_proto::tx::TxBody,
429        fee: layer_climb_proto::tx::Fee,
430        simulate_only: bool,
431    ) -> Result<Vec<u8>> {
432        #[allow(deprecated)]
433        let auth_info = layer_climb_proto::tx::AuthInfo {
434            signer_infos: vec![signer_info],
435            fee: Some(fee),
436            tip: None,
437        };
438
439        let block_height = self.querier.block_height().await?;
440
441        let tx_timeout_blocks = self
442            .tx_timeout_blocks
443            .unwrap_or(Self::DEFAULT_TX_TIMEOUT_BLOCKS);
444
445        // latest possible time we can grab the current block height
446        body.timeout_height = block_height + tx_timeout_blocks;
447
448        let sign_doc = layer_climb_proto::tx::SignDoc {
449            body_bytes: proto_into_bytes(body)?,
450            auth_info_bytes: proto_into_bytes(&auth_info)?,
451            chain_id: self.querier.chain_config.chain_id.to_string(),
452            account_number,
453        };
454
455        let signature = match simulate_only {
456            true => Vec::new(),
457            false => self.signer.sign(&sign_doc).await?,
458        };
459
460        let tx_raw = layer_climb_proto::tx::TxRaw {
461            body_bytes: sign_doc.body_bytes.clone(),
462            auth_info_bytes: sign_doc.auth_info_bytes.clone(),
463            signatures: vec![signature],
464        };
465
466        proto_into_bytes(&tx_raw)
467    }
468}
469
/// How a transaction sequence number is obtained, plus the shared state that
/// the incrementing strategies need.
///
/// `value` and `has_queried` sit behind `Arc`s, so clones of a strategy share
/// the same counter and flag.
#[derive(Clone, Debug)]
pub struct SequenceStrategy {
    /// Which strategy to apply.
    pub kind: SequenceStrategyKind,
    /// The shared sequence counter.
    pub value: Arc<AtomicU64>,
    /// Whether the counter has already been filled in from a query.
    pub has_queried: Arc<AtomicBool>,
}

impl SequenceStrategy {
    /// Creates the strategy for `kind`, with `has_queried` cleared.
    ///
    /// The counter starts at the number carried by `SetAndIncrement` or
    /// `Constant`, and at 0 (a placeholder) for the two querying kinds.
    pub fn new(kind: SequenceStrategyKind) -> Self {
        let initial = match kind {
            SequenceStrategyKind::SetAndIncrement(n) | SequenceStrategyKind::Constant(n) => n,
            // placeholder only, will be ignored
            SequenceStrategyKind::Query | SequenceStrategyKind::QueryAndIncrement => 0,
        };

        Self {
            kind,
            value: Arc::new(AtomicU64::new(initial)),
            has_queried: Arc::new(AtomicBool::new(false)),
        }
    }
}

/// The available ways of choosing a sequence number.
#[derive(Clone, Debug)]
pub enum SequenceStrategyKind {
    /// Always query
    Query,
    /// Query the first time, and then increment each successful tx
    QueryAndIncrement,
    /// Set to this the first time, and then increment each successful tx
    SetAndIncrement(u64),
    /// Set to this each time
    Constant(u64),
}
503
/// The inputs from which a transaction fee is computed by `calculate`.
pub enum FeeCalculation<'a> {
    /// Fee for a gas simulation: a zero amount in the chain's gas denom and a gas limit of 0.
    Simulation {
        /// Supplies the gas denom.
        chain_config: &'a ChainConfig,
    },
    /// Fee priced from the chain config: `ceil(gas_price * gas_units)` in the chain's gas denom.
    RealNetwork {
        /// Supplies the gas denom and gas price.
        chain_config: &'a ChainConfig,
        /// Becomes the gas limit.
        gas_units: u64,
    },
    /// Fee paid with an explicit coin, used as-is for the fee amount.
    RealCoin {
        /// The fee amount.
        gas_coin: layer_climb_proto::Coin,
        /// Becomes the gas limit.
        gas_units: u64,
    },
}
517
518impl FeeCalculation<'_> {
519    pub fn calculate(&self) -> Result<layer_climb_proto::tx::Fee> {
520        let (gas_coin, gas_limit) = match self {
521            Self::Simulation { chain_config } => (new_coin(0, &chain_config.gas_denom), 0),
522            Self::RealNetwork {
523                chain_config,
524                gas_units,
525            } => {
526                let amount = (chain_config.gas_price * *gas_units as f32).ceil() as u128;
527                (new_coin(amount, &chain_config.gas_denom), *gas_units)
528            }
529            Self::RealCoin {
530                gas_coin,
531                gas_units,
532            } => (gas_coin.clone(), *gas_units),
533        };
534
535        Ok(layer_climb_proto::tx::Fee {
536            amount: vec![gas_coin],
537            gas_limit,
538            payer: "".to_string(),
539            granter: "".to_string(),
540        })
541    }
542}