layer_climb_core/
pool.rs

1use std::sync::atomic::AtomicU32;
2
3use crate::{cache::ClimbCache, querier::Connection, signing::SigningClient};
4use anyhow::{bail, Result};
5use deadpool::managed::{Manager, Metrics, RecycleResult};
6use layer_climb_address::*;
7use layer_climb_config::ChainConfig;
8use tokio::sync::Mutex;
9
10/// Currently this only works with mnemonic phrases
11pub struct SigningClientPoolManager {
12    pub mnemonic: String,
13    pub derivation_index: AtomicU32,
14    pub chain_config: ChainConfig,
15    pub balance_maintainer: Option<BalanceMaintainer>,
16    pub cache: ClimbCache,
17    pub connection: Connection,
18}
19
20impl SigningClientPoolManager {
21    pub fn new_mnemonic(
22        mnemonic: String,
23        chain_config: ChainConfig,
24        start_index: Option<u32>,
25        connection: Option<Connection>,
26    ) -> Self {
27        let connection = connection.unwrap_or_default();
28        Self {
29            mnemonic,
30            chain_config,
31            derivation_index: AtomicU32::new(start_index.unwrap_or_default()),
32            balance_maintainer: None,
33            cache: ClimbCache::new(connection.rpc.clone()),
34            connection,
35        }
36    }
37
38    // Setting this has a few implications:
39    // 1. on each client hand-out, it will query for the balance (no locking at all, just another query)
40    // 2. if the balance is below the threshhold set here, then it will lock the funding client for the transfer
41    //
42    // in other words, while the pool itself is completely async and can be parallelized, the balance maintainer
43    // does crate an async await across all clients who need to top-up an account, if they happen at the same time
44    //
45    // This isn't a major performance impact, but nevertheless,
46    // it's recommended to tune the values so that it's reasonably infrequent
47    pub async fn with_minimum_balance(
48        mut self,
49        // the minimum balance to maintain
50        // set this to as low as reasonable, to reduce unnecessary transfers
51        threshhold: u128,
52        // the amount to send to top up the account
53        // set this to as high as reasonable, to reduce unnecessary transfers
54        amount: u128,
55        // if set, it will use this client to fund the account
56        // otherwise, it will use the first derivation index, and bump it for subsequent clients
57        funder: Option<SigningClient>,
58        denom: Option<String>,
59    ) -> Result<Self> {
60        let balance_maintainer = match funder {
61            Some(funder) => BalanceMaintainer {
62                client: Mutex::new(funder),
63                threshhold,
64                amount,
65                denom,
66            },
67            None => BalanceMaintainer {
68                client: Mutex::new(self.create_client().await?),
69                threshhold,
70                amount,
71                denom,
72            },
73        };
74
75        self.balance_maintainer = Some(balance_maintainer);
76        Ok(self)
77    }
78
79    async fn create_client(&self) -> Result<SigningClient> {
80        let signer: KeySigner = match &self.chain_config.address_kind {
81            layer_climb_config::AddrKind::Cosmos { .. } => {
82                let index = self
83                    .derivation_index
84                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
85
86                KeySigner::new_mnemonic_str(&self.mnemonic, Some(&cosmos_hub_derivation(index)?))?
87            }
88            layer_climb_config::AddrKind::Eth => {
89                bail!("Eth address kind is not supported (yet)")
90            }
91        };
92
93        SigningClient::new_with_cache(
94            self.chain_config.clone(),
95            signer,
96            self.cache.clone(),
97            Some(self.connection.clone()),
98        )
99        .await
100    }
101
102    async fn maybe_top_up(&self, client: &SigningClient) -> Result<()> {
103        if let Some(balance_maintainer) = &self.balance_maintainer {
104            let current_balance = client
105                .querier
106                .balance(client.addr.clone(), balance_maintainer.denom.clone())
107                .await?
108                .unwrap_or_default();
109            if current_balance < balance_maintainer.threshhold {
110                let amount = balance_maintainer.amount - current_balance;
111                // just a scope to ensure we always drop the lock
112                {
113                    let funder = balance_maintainer.client.lock().await;
114
115                    tracing::debug!(
116                        "Balance on {} is {}, below {}, sending {} to top-up from {}",
117                        client.addr,
118                        current_balance,
119                        balance_maintainer.threshhold,
120                        amount,
121                        funder.addr
122                    );
123
124                    funder
125                        .transfer(
126                            amount,
127                            &client.addr,
128                            balance_maintainer.denom.as_deref(),
129                            None,
130                        )
131                        .await?;
132                }
133            }
134        }
135
136        Ok(())
137    }
138}
139
140// just a helper struct to keep track of the balance maintainer
141pub struct BalanceMaintainer {
142    client: Mutex<SigningClient>,
143    threshhold: u128,
144    amount: u128,
145    denom: Option<String>,
146}
147
148impl Manager for SigningClientPoolManager {
149    type Type = SigningClient;
150    type Error = anyhow::Error;
151
152    async fn create(&self) -> Result<SigningClient> {
153        let client = self.create_client().await?;
154        tracing::debug!("POOL CREATED CLIENT {}", client.addr);
155        self.maybe_top_up(&client).await?;
156
157        Ok(client)
158    }
159
160    async fn recycle(
161        &self,
162        client: &mut SigningClient,
163        _: &Metrics,
164    ) -> RecycleResult<anyhow::Error> {
165        tracing::debug!("POOL RECYCLING CLIENT {}", client.addr);
166        self.maybe_top_up(client).await?;
167
168        Ok(())
169    }
170}