layer_climb_core/
pool.rs

1use std::sync::atomic::AtomicU32;
2
3use crate::{
4    cache::ClimbCache,
5    querier::{Connection, QueryClient},
6    signing::SigningClient,
7};
8use anyhow::{bail, Result};
9use deadpool::managed::{Manager, Metrics, RecycleResult};
10use layer_climb_address::*;
11use layer_climb_config::ChainConfig;
12use tokio::sync::Mutex;
13
14/// Currently this only works with mnemonic phrases
15pub struct SigningClientPoolManager {
16    pub mnemonic: String,
17    pub derivation_index: AtomicU32,
18    pub chain_config: ChainConfig,
19    pub balance_maintainer: Option<BalanceMaintainer>,
20    pub cache: ClimbCache,
21    pub connection: Connection,
22}
23
24impl SigningClientPoolManager {
25    pub fn new_mnemonic(
26        mnemonic: String,
27        chain_config: ChainConfig,
28        start_index: Option<u32>,
29        connection: Option<Connection>,
30    ) -> Self {
31        let connection = connection.unwrap_or_default();
32        Self {
33            mnemonic,
34            chain_config,
35            derivation_index: AtomicU32::new(start_index.unwrap_or_default()),
36            balance_maintainer: None,
37            cache: ClimbCache::new(connection.rpc.clone()),
38            connection,
39        }
40    }
41
42    // Setting this has a few implications:
43    // 1. on each client hand-out, it will query for the balance (no locking at all, just another query)
44    // 2. if the balance is below the threshhold set here, then it will lock the funding client for the transfer
45    //
46    // in other words, while the pool itself is completely async and can be parallelized, the balance maintainer
47    // does crate an async await across all clients who need to top-up an account, if they happen at the same time
48    //
49    // This isn't a major performance impact, but nevertheless,
50    // it's recommended to tune the values so that it's reasonably infrequent
51    pub async fn with_minimum_balance(
52        mut self,
53        // the minimum balance to maintain
54        // set this to as low as reasonable, to reduce unnecessary transfers
55        threshhold: u128,
56        // the amount to send to top up the account
57        // set this to as high as reasonable, to reduce unnecessary transfers
58        amount: u128,
59        // if set, it will use this client to fund the account
60        // otherwise, it will use the first derivation index, and bump it for subsequent clients
61        funder: Option<SigningClient>,
62        denom: Option<String>,
63    ) -> Result<Self> {
64        // keep a separate query client so we can get balances
65        // without locking the funder client
66        let query_client =
67            QueryClient::new(self.chain_config.clone(), Some(self.connection.clone())).await?;
68
69        let balance_maintainer = match funder {
70            Some(funder) => BalanceMaintainer {
71                client: Mutex::new(funder),
72                query_client,
73                threshhold,
74                amount,
75                denom,
76            },
77            None => BalanceMaintainer {
78                client: Mutex::new(self.create_client(None).await?),
79                query_client,
80                threshhold,
81                amount,
82                denom,
83            },
84        };
85
86        self.balance_maintainer = Some(balance_maintainer);
87        Ok(self)
88    }
89
90    fn create_signer(&self) -> Result<KeySigner> {
91        match &self.chain_config.address_kind {
92            layer_climb_config::AddrKind::Cosmos { .. } => {
93                let index = self
94                    .derivation_index
95                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
96
97                KeySigner::new_mnemonic_str(&self.mnemonic, Some(&cosmos_hub_derivation(index)?))
98            }
99            layer_climb_config::AddrKind::Evm => {
100                bail!("EVM address kind is not supported (yet)")
101            }
102        }
103    }
104
105    async fn create_client(&self, signer: Option<KeySigner>) -> Result<SigningClient> {
106        let signer: KeySigner = match signer {
107            Some(signer) => signer,
108            None => self.create_signer()?,
109        };
110
111        SigningClient::new_with_cache(
112            self.chain_config.clone(),
113            signer,
114            self.cache.clone(),
115            Some(self.connection.clone()),
116        )
117        .await
118    }
119
120    async fn maybe_top_up(&self, addr: Address) -> Result<()> {
121        if let Some(balance_maintainer) = &self.balance_maintainer {
122            let current_balance = balance_maintainer
123                .query_client
124                .balance(addr.clone(), balance_maintainer.denom.clone())
125                .await?
126                .unwrap_or_default();
127            if current_balance < balance_maintainer.threshhold {
128                let amount = balance_maintainer.amount - current_balance;
129                // just a scope to ensure we always drop the lock
130                {
131                    let funder = balance_maintainer.client.lock().await;
132
133                    tracing::debug!(
134                        "Balance on {} is {}, below {}, sending {} to top-up from {}",
135                        addr,
136                        current_balance,
137                        balance_maintainer.threshhold,
138                        amount,
139                        funder.addr
140                    );
141
142                    funder
143                        .transfer(amount, &addr, balance_maintainer.denom.as_deref(), None)
144                        .await?;
145                }
146            }
147        }
148
149        Ok(())
150    }
151}
152
153// just a helper struct to keep track of the balance maintainer
154pub struct BalanceMaintainer {
155    pub client: Mutex<SigningClient>,
156    query_client: QueryClient,
157    threshhold: u128,
158    amount: u128,
159    denom: Option<String>,
160}
161
162impl Manager for SigningClientPoolManager {
163    type Type = SigningClient;
164    type Error = anyhow::Error;
165
166    async fn create(&self) -> Result<SigningClient> {
167        // it's possible that the client hasn't ever been funded
168        // which would cause an error when trying to create it (specifically in base account)
169        // so if we're configured to use a funder, let's get the raw address
170        // before we create the client
171        let signer = self.create_signer()?;
172        let addr = self
173            .chain_config
174            .address_from_pub_key(&signer.public_key().await?)?;
175
176        self.maybe_top_up(addr).await?;
177        let client = self.create_client(Some(signer)).await?;
178
179        tracing::debug!("POOL CREATED CLIENT {}", client.addr);
180
181        Ok(client)
182    }
183
184    async fn recycle(
185        &self,
186        client: &mut SigningClient,
187        _: &Metrics,
188    ) -> RecycleResult<anyhow::Error> {
189        tracing::debug!("POOL RECYCLING CLIENT {}", client.addr);
190        self.maybe_top_up(client.addr.clone()).await?;
191
192        Ok(())
193    }
194}