layer_climb_core/
pool.rs

1use std::sync::atomic::AtomicU32;
2
3use crate::{
4    cache::ClimbCache,
5    querier::{Connection, QueryClient},
6    signing::SigningClient,
7};
8use anyhow::{bail, Error, Result};
9use deadpool::managed::{Manager, Metrics, Object, PoolError, RecycleResult};
10use layer_climb_address::*;
11use layer_climb_config::ChainConfig;
12use layer_climb_signer::{cosmos_hub_derivation, KeySigner, TxSigner};
13use tokio::sync::Mutex;
14
15/// Currently this only works with mnemonic phrases
16pub struct SigningClientPoolManager {
17    pub mnemonic: String,
18    pub derivation_index: AtomicU32,
19    pub chain_config: ChainConfig,
20    pub balance_maintainer: Option<BalanceMaintainer>,
21    pub cache: ClimbCache,
22    pub connection: Connection,
23}
24
25impl SigningClientPoolManager {
26    pub fn new_mnemonic(
27        mnemonic: String,
28        chain_config: ChainConfig,
29        start_index: Option<u32>,
30        connection: Option<Connection>,
31    ) -> Self {
32        let connection = connection.unwrap_or_default();
33        Self {
34            mnemonic,
35            chain_config,
36            derivation_index: AtomicU32::new(start_index.unwrap_or_default()),
37            balance_maintainer: None,
38            cache: ClimbCache::new(connection.rpc.clone()),
39            connection,
40        }
41    }
42
43    pub async fn address(&self, index: u32) -> Result<Address, PoolError<Error>> {
44        let signer =
45            KeySigner::new_mnemonic_str(&self.mnemonic, Some(&cosmos_hub_derivation(index)?))?;
46
47        let addr = self
48            .chain_config
49            .address_from_pub_key(&signer.public_key().await?)?;
50
51        Ok(addr)
52    }
53
54    // Setting this has a few implications:
55    // 1. on each client hand-out, it will query for the balance (no locking at all, just another query)
56    // 2. if the balance is below the threshhold set here, then it will lock the funding client for the transfer
57    //
58    // in other words, while the pool itself is completely async and can be parallelized, the balance maintainer
59    // does crate an async await across all clients who need to top-up an account, if they happen at the same time
60    //
61    // This isn't a major performance impact, but nevertheless,
62    // it's recommended to tune the values so that it's reasonably infrequent
63    pub async fn with_minimum_balance(
64        mut self,
65        // the minimum balance to maintain
66        // set this to as low as reasonable, to reduce unnecessary transfers
67        threshhold: u128,
68        // the amount to send to top up the account
69        // set this to as high as reasonable, to reduce unnecessary transfers
70        amount: u128,
71        // if set, it will use this client to fund the account
72        // otherwise, it will use the first derivation index, and bump it for subsequent clients
73        funder: Option<SigningClient>,
74        denom: Option<String>,
75    ) -> Result<Self> {
76        // keep a separate query client so we can get balances
77        // without locking the funder client
78        let query_client =
79            QueryClient::new(self.chain_config.clone(), Some(self.connection.clone())).await?;
80
81        let balance_maintainer = match funder {
82            Some(funder) => {
83                let addr = funder.addr.clone();
84                BalanceMaintainer {
85                    client: Mutex::new(funder),
86                    addr,
87                    query_client,
88                    threshhold,
89                    amount,
90                    denom,
91                }
92            }
93            None => {
94                let funder = self.create_client(None).await?;
95                let addr = funder.addr.clone();
96
97                BalanceMaintainer {
98                    client: Mutex::new(funder),
99                    addr,
100                    query_client,
101                    threshhold,
102                    amount,
103                    denom,
104                }
105            }
106        };
107
108        self.balance_maintainer = Some(balance_maintainer);
109        Ok(self)
110    }
111
112    fn create_signer(&self) -> Result<KeySigner> {
113        match &self.chain_config.address_kind {
114            layer_climb_address::AddrKind::Cosmos { .. } => {
115                let index = self
116                    .derivation_index
117                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
118
119                KeySigner::new_mnemonic_str(&self.mnemonic, Some(&cosmos_hub_derivation(index)?))
120            }
121            layer_climb_address::AddrKind::Evm => {
122                bail!("EVM address kind is not supported (yet)")
123            }
124        }
125    }
126
127    async fn create_client(&self, signer: Option<KeySigner>) -> Result<SigningClient> {
128        let signer: KeySigner = match signer {
129            Some(signer) => signer,
130            None => self.create_signer()?,
131        };
132
133        SigningClient::new_with_cache(
134            self.chain_config.clone(),
135            signer,
136            self.cache.clone(),
137            Some(self.connection.clone()),
138        )
139        .await
140    }
141
142    async fn maybe_top_up(&self, addr: Address) -> Result<()> {
143        if let Some(balance_maintainer) = &self.balance_maintainer {
144            let current_balance = balance_maintainer
145                .query_client
146                .balance(addr.clone(), balance_maintainer.denom.clone())
147                .await?
148                .unwrap_or_default();
149            if current_balance < balance_maintainer.threshhold {
150                let amount = balance_maintainer.amount - current_balance;
151                // just a scope to ensure we always drop the lock
152                {
153                    let funder = balance_maintainer.client.lock().await;
154
155                    tracing::debug!(
156                        "[top-up] Balance on {} is {}, below {}, sending {} from {}",
157                        addr,
158                        current_balance,
159                        balance_maintainer.threshhold,
160                        amount,
161                        funder.addr
162                    );
163
164                    funder
165                        .transfer(amount, &addr, balance_maintainer.denom.as_deref(), None)
166                        .await?;
167                }
168            }
169        }
170
171        Ok(())
172    }
173}
174
175// just a helper struct to keep track of the balance maintainer
176pub struct BalanceMaintainer {
177    pub client: Mutex<SigningClient>,
178    pub addr: Address,
179    query_client: QueryClient,
180    threshhold: u128,
181    amount: u128,
182    denom: Option<String>,
183}
184
185impl Manager for SigningClientPoolManager {
186    type Type = SigningClient;
187    type Error = anyhow::Error;
188
189    async fn create(&self) -> Result<SigningClient> {
190        // it's possible that the client hasn't ever been funded
191        // which would cause an error when trying to create it (specifically in base account)
192        // so if we're configured to use a funder, let's get the raw address
193        // before we create the client
194        let signer = self.create_signer()?;
195        let addr = self
196            .chain_config
197            .address_from_pub_key(&signer.public_key().await?)?;
198
199        self.maybe_top_up(addr).await?;
200        let client = self.create_client(Some(signer)).await?;
201
202        tracing::debug!("POOL CREATED CLIENT {}", client.addr);
203
204        Ok(client)
205    }
206
207    async fn recycle(
208        &self,
209        client: &mut SigningClient,
210        _: &Metrics,
211    ) -> RecycleResult<anyhow::Error> {
212        tracing::debug!("POOL RECYCLING CLIENT {}", client.addr);
213
214        Ok(())
215    }
216}
217
218#[derive(Clone)]
219pub struct SigningClientPool {
220    pool: deadpool::managed::Pool<SigningClientPoolManager>,
221}
222
223impl SigningClientPool {
224    pub fn new(pool: deadpool::managed::Pool<SigningClientPoolManager>) -> Self {
225        Self { pool }
226    }
227
228    pub fn inner(&self) -> &deadpool::managed::Pool<SigningClientPoolManager> {
229        &self.pool
230    }
231
232    pub fn manager(&self) -> &SigningClientPoolManager {
233        self.pool.manager()
234    }
235
236    pub async fn get(&self) -> Result<Object<SigningClientPoolManager>, PoolError<Error>> {
237        let client = self.pool.get().await?;
238        self.pool
239            .manager()
240            .maybe_top_up(client.addr.clone())
241            .await?;
242        Ok(client)
243    }
244}