1use std::sync::atomic::AtomicU32;
2
3use crate::{
4 cache::ClimbCache,
5 querier::{Connection, QueryClient},
6 signing::SigningClient,
7};
8use anyhow::{bail, Error, Result};
9use deadpool::managed::{Manager, Metrics, Object, PoolError, RecycleResult};
10use layer_climb_address::*;
11use layer_climb_config::ChainConfig;
12use layer_climb_signer::{cosmos_hub_derivation, KeySigner, TxSigner};
13use tokio::sync::Mutex;
14
15pub struct SigningClientPoolManager {
17 pub mnemonic: String,
18 pub derivation_index: AtomicU32,
19 pub chain_config: ChainConfig,
20 pub balance_maintainer: Option<BalanceMaintainer>,
21 pub cache: ClimbCache,
22 pub connection: Connection,
23}
24
25impl SigningClientPoolManager {
26 pub fn new_mnemonic(
27 mnemonic: String,
28 chain_config: ChainConfig,
29 start_index: Option<u32>,
30 connection: Option<Connection>,
31 ) -> Self {
32 let connection = connection.unwrap_or_default();
33 Self {
34 mnemonic,
35 chain_config,
36 derivation_index: AtomicU32::new(start_index.unwrap_or_default()),
37 balance_maintainer: None,
38 cache: ClimbCache::new(connection.rpc.clone()),
39 connection,
40 }
41 }
42
43 pub async fn address(&self, index: u32) -> Result<Address, PoolError<Error>> {
44 let signer =
45 KeySigner::new_mnemonic_str(&self.mnemonic, Some(&cosmos_hub_derivation(index)?))?;
46
47 let addr = self
48 .chain_config
49 .address_from_pub_key(&signer.public_key().await?)?;
50
51 Ok(addr)
52 }
53
54 pub async fn with_minimum_balance(
64 mut self,
65 threshhold: u128,
68 amount: u128,
71 funder: Option<SigningClient>,
74 denom: Option<String>,
75 ) -> Result<Self> {
76 let query_client =
79 QueryClient::new(self.chain_config.clone(), Some(self.connection.clone())).await?;
80
81 let balance_maintainer = match funder {
82 Some(funder) => {
83 let addr = funder.addr.clone();
84 BalanceMaintainer {
85 client: Mutex::new(funder),
86 addr,
87 query_client,
88 threshhold,
89 amount,
90 denom,
91 }
92 }
93 None => {
94 let funder = self.create_client(None).await?;
95 let addr = funder.addr.clone();
96
97 BalanceMaintainer {
98 client: Mutex::new(funder),
99 addr,
100 query_client,
101 threshhold,
102 amount,
103 denom,
104 }
105 }
106 };
107
108 self.balance_maintainer = Some(balance_maintainer);
109 Ok(self)
110 }
111
112 fn create_signer(&self) -> Result<KeySigner> {
113 match &self.chain_config.address_kind {
114 layer_climb_address::AddrKind::Cosmos { .. } => {
115 let index = self
116 .derivation_index
117 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
118
119 KeySigner::new_mnemonic_str(&self.mnemonic, Some(&cosmos_hub_derivation(index)?))
120 }
121 layer_climb_address::AddrKind::Evm => {
122 bail!("EVM address kind is not supported (yet)")
123 }
124 }
125 }
126
127 async fn create_client(&self, signer: Option<KeySigner>) -> Result<SigningClient> {
128 let signer: KeySigner = match signer {
129 Some(signer) => signer,
130 None => self.create_signer()?,
131 };
132
133 SigningClient::new_with_cache(
134 self.chain_config.clone(),
135 signer,
136 self.cache.clone(),
137 Some(self.connection.clone()),
138 )
139 .await
140 }
141
142 async fn maybe_top_up(&self, addr: Address) -> Result<()> {
143 if let Some(balance_maintainer) = &self.balance_maintainer {
144 let current_balance = balance_maintainer
145 .query_client
146 .balance(addr.clone(), balance_maintainer.denom.clone())
147 .await?
148 .unwrap_or_default();
149 if current_balance < balance_maintainer.threshhold {
150 let amount = balance_maintainer.amount - current_balance;
151 {
153 let funder = balance_maintainer.client.lock().await;
154
155 tracing::debug!(
156 "[top-up] Balance on {} is {}, below {}, sending {} from {}",
157 addr,
158 current_balance,
159 balance_maintainer.threshhold,
160 amount,
161 funder.addr
162 );
163
164 funder
165 .transfer(amount, &addr, balance_maintainer.denom.as_deref(), None)
166 .await?;
167 }
168 }
169 }
170
171 Ok(())
172 }
173}
174
175pub struct BalanceMaintainer {
177 pub client: Mutex<SigningClient>,
178 pub addr: Address,
179 query_client: QueryClient,
180 threshhold: u128,
181 amount: u128,
182 denom: Option<String>,
183}
184
185impl Manager for SigningClientPoolManager {
186 type Type = SigningClient;
187 type Error = anyhow::Error;
188
189 async fn create(&self) -> Result<SigningClient> {
190 let signer = self.create_signer()?;
195 let addr = self
196 .chain_config
197 .address_from_pub_key(&signer.public_key().await?)?;
198
199 self.maybe_top_up(addr).await?;
200 let client = self.create_client(Some(signer)).await?;
201
202 tracing::debug!("POOL CREATED CLIENT {}", client.addr);
203
204 Ok(client)
205 }
206
207 async fn recycle(
208 &self,
209 client: &mut SigningClient,
210 _: &Metrics,
211 ) -> RecycleResult<anyhow::Error> {
212 tracing::debug!("POOL RECYCLING CLIENT {}", client.addr);
213
214 Ok(())
215 }
216}
217
218#[derive(Clone)]
219pub struct SigningClientPool {
220 pool: deadpool::managed::Pool<SigningClientPoolManager>,
221}
222
223impl SigningClientPool {
224 pub fn new(pool: deadpool::managed::Pool<SigningClientPoolManager>) -> Self {
225 Self { pool }
226 }
227
228 pub fn inner(&self) -> &deadpool::managed::Pool<SigningClientPoolManager> {
229 &self.pool
230 }
231
232 pub fn manager(&self) -> &SigningClientPoolManager {
233 self.pool.manager()
234 }
235
236 pub async fn get(&self) -> Result<Object<SigningClientPoolManager>, PoolError<Error>> {
237 let client = self.pool.get().await?;
238 self.pool
239 .manager()
240 .maybe_top_up(client.addr.clone())
241 .await?;
242 Ok(client)
243 }
244}