layer_climb_core/
pool.rs

1use std::sync::atomic::AtomicU32;
2
3use crate::{
4    cache::ClimbCache,
5    querier::{Connection, QueryClient},
6    signing::SigningClient,
7};
8use anyhow::{bail, Error, Result};
9use deadpool::managed::{Manager, Metrics, Object, PoolError, RecycleResult};
10use layer_climb_address::*;
11use layer_climb_config::ChainConfig;
12use layer_climb_signer::{cosmos_hub_derivation, KeySigner, TxSigner};
13use tokio::sync::Mutex;
14
15/// Currently this only works with mnemonic phrases
16pub struct SigningClientPoolManager {
17    pub mnemonic: String,
18    pub derivation_index: AtomicU32,
19    pub chain_config: ChainConfig,
20    pub balance_maintainer: Option<BalanceMaintainer>,
21    pub cache: ClimbCache,
22    pub connection: Connection,
23}
24
25impl SigningClientPoolManager {
26    pub fn new_mnemonic(
27        mnemonic: String,
28        chain_config: ChainConfig,
29        start_index: Option<u32>,
30        connection: Option<Connection>,
31    ) -> Self {
32        let connection = connection.unwrap_or_default();
33        Self {
34            mnemonic,
35            chain_config,
36            derivation_index: AtomicU32::new(start_index.unwrap_or_default()),
37            balance_maintainer: None,
38            cache: ClimbCache::new(connection.rpc.clone()),
39            connection,
40        }
41    }
42
43    // Setting this has a few implications:
44    // 1. on each client hand-out, it will query for the balance (no locking at all, just another query)
45    // 2. if the balance is below the threshhold set here, then it will lock the funding client for the transfer
46    //
47    // in other words, while the pool itself is completely async and can be parallelized, the balance maintainer
48    // does crate an async await across all clients who need to top-up an account, if they happen at the same time
49    //
50    // This isn't a major performance impact, but nevertheless,
51    // it's recommended to tune the values so that it's reasonably infrequent
52    pub async fn with_minimum_balance(
53        mut self,
54        // the minimum balance to maintain
55        // set this to as low as reasonable, to reduce unnecessary transfers
56        threshhold: u128,
57        // the amount to send to top up the account
58        // set this to as high as reasonable, to reduce unnecessary transfers
59        amount: u128,
60        // if set, it will use this client to fund the account
61        // otherwise, it will use the first derivation index, and bump it for subsequent clients
62        funder: Option<SigningClient>,
63        denom: Option<String>,
64    ) -> Result<Self> {
65        // keep a separate query client so we can get balances
66        // without locking the funder client
67        let query_client =
68            QueryClient::new(self.chain_config.clone(), Some(self.connection.clone())).await?;
69
70        let balance_maintainer = match funder {
71            Some(funder) => BalanceMaintainer {
72                client: Mutex::new(funder),
73                query_client,
74                threshhold,
75                amount,
76                denom,
77            },
78            None => BalanceMaintainer {
79                client: Mutex::new(self.create_client(None).await?),
80                query_client,
81                threshhold,
82                amount,
83                denom,
84            },
85        };
86
87        self.balance_maintainer = Some(balance_maintainer);
88        Ok(self)
89    }
90
91    fn create_signer(&self) -> Result<KeySigner> {
92        match &self.chain_config.address_kind {
93            layer_climb_address::AddrKind::Cosmos { .. } => {
94                let index = self
95                    .derivation_index
96                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
97
98                KeySigner::new_mnemonic_str(&self.mnemonic, Some(&cosmos_hub_derivation(index)?))
99            }
100            layer_climb_address::AddrKind::Evm => {
101                bail!("EVM address kind is not supported (yet)")
102            }
103        }
104    }
105
106    async fn create_client(&self, signer: Option<KeySigner>) -> Result<SigningClient> {
107        let signer: KeySigner = match signer {
108            Some(signer) => signer,
109            None => self.create_signer()?,
110        };
111
112        SigningClient::new_with_cache(
113            self.chain_config.clone(),
114            signer,
115            self.cache.clone(),
116            Some(self.connection.clone()),
117        )
118        .await
119    }
120
121    async fn maybe_top_up(&self, addr: Address) -> Result<()> {
122        if let Some(balance_maintainer) = &self.balance_maintainer {
123            let current_balance = balance_maintainer
124                .query_client
125                .balance(addr.clone(), balance_maintainer.denom.clone())
126                .await?
127                .unwrap_or_default();
128            if current_balance < balance_maintainer.threshhold {
129                let amount = balance_maintainer.amount - current_balance;
130                // just a scope to ensure we always drop the lock
131                {
132                    let funder = balance_maintainer.client.lock().await;
133
134                    tracing::debug!(
135                        "Balance on {} is {}, below {}, sending {} to top-up from {}",
136                        addr,
137                        current_balance,
138                        balance_maintainer.threshhold,
139                        amount,
140                        funder.addr
141                    );
142
143                    funder
144                        .transfer(amount, &addr, balance_maintainer.denom.as_deref(), None)
145                        .await?;
146                }
147            }
148        }
149
150        Ok(())
151    }
152}
153
154// just a helper struct to keep track of the balance maintainer
155pub struct BalanceMaintainer {
156    pub client: Mutex<SigningClient>,
157    query_client: QueryClient,
158    threshhold: u128,
159    amount: u128,
160    denom: Option<String>,
161}
162
163impl Manager for SigningClientPoolManager {
164    type Type = SigningClient;
165    type Error = anyhow::Error;
166
167    async fn create(&self) -> Result<SigningClient> {
168        // it's possible that the client hasn't ever been funded
169        // which would cause an error when trying to create it (specifically in base account)
170        // so if we're configured to use a funder, let's get the raw address
171        // before we create the client
172        let signer = self.create_signer()?;
173        let addr = self
174            .chain_config
175            .address_from_pub_key(&signer.public_key().await?)?;
176
177        self.maybe_top_up(addr).await?;
178        let client = self.create_client(Some(signer)).await?;
179
180        tracing::debug!("POOL CREATED CLIENT {}", client.addr);
181
182        Ok(client)
183    }
184
185    async fn recycle(
186        &self,
187        client: &mut SigningClient,
188        _: &Metrics,
189    ) -> RecycleResult<anyhow::Error> {
190        tracing::debug!("POOL RECYCLING CLIENT {}", client.addr);
191
192        Ok(())
193    }
194}
195
196#[derive(Clone)]
197pub struct SigningClientPool {
198    pub pool: deadpool::managed::Pool<SigningClientPoolManager>,
199}
200
201impl SigningClientPool {
202    pub fn new(pool: deadpool::managed::Pool<SigningClientPoolManager>) -> Self {
203        Self { pool }
204    }
205
206    pub async fn get(&self) -> Result<Object<SigningClientPoolManager>, PoolError<Error>> {
207        let client = self.pool.get().await?;
208        self.pool
209            .manager()
210            .maybe_top_up(client.addr.clone())
211            .await?;
212        Ok(client)
213    }
214}