1use std::sync::atomic::AtomicU32;
2
3use crate::{
4 cache::ClimbCache,
5 querier::{Connection, QueryClient},
6 signing::SigningClient,
7};
8use anyhow::{bail, Error, Result};
9use deadpool::managed::{Manager, Metrics, Object, PoolError, RecycleResult};
10use layer_climb_address::*;
11use layer_climb_config::ChainConfig;
12use layer_climb_signer::{cosmos_hub_derivation, KeySigner, TxSigner};
13use tokio::sync::Mutex;
14
15pub struct SigningClientPoolManager {
17 pub mnemonic: String,
18 pub derivation_index: AtomicU32,
19 pub chain_config: ChainConfig,
20 pub balance_maintainer: Option<BalanceMaintainer>,
21 pub cache: ClimbCache,
22 pub connection: Connection,
23}
24
25impl SigningClientPoolManager {
26 pub fn new_mnemonic(
27 mnemonic: String,
28 chain_config: ChainConfig,
29 start_index: Option<u32>,
30 connection: Option<Connection>,
31 ) -> Self {
32 let connection = connection.unwrap_or_default();
33 Self {
34 mnemonic,
35 chain_config,
36 derivation_index: AtomicU32::new(start_index.unwrap_or_default()),
37 balance_maintainer: None,
38 cache: ClimbCache::new(connection.rpc.clone()),
39 connection,
40 }
41 }
42
43 pub async fn with_minimum_balance(
53 mut self,
54 threshhold: u128,
57 amount: u128,
60 funder: Option<SigningClient>,
63 denom: Option<String>,
64 ) -> Result<Self> {
65 let query_client =
68 QueryClient::new(self.chain_config.clone(), Some(self.connection.clone())).await?;
69
70 let balance_maintainer = match funder {
71 Some(funder) => BalanceMaintainer {
72 client: Mutex::new(funder),
73 query_client,
74 threshhold,
75 amount,
76 denom,
77 },
78 None => BalanceMaintainer {
79 client: Mutex::new(self.create_client(None).await?),
80 query_client,
81 threshhold,
82 amount,
83 denom,
84 },
85 };
86
87 self.balance_maintainer = Some(balance_maintainer);
88 Ok(self)
89 }
90
91 fn create_signer(&self) -> Result<KeySigner> {
92 match &self.chain_config.address_kind {
93 layer_climb_address::AddrKind::Cosmos { .. } => {
94 let index = self
95 .derivation_index
96 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
97
98 KeySigner::new_mnemonic_str(&self.mnemonic, Some(&cosmos_hub_derivation(index)?))
99 }
100 layer_climb_address::AddrKind::Evm => {
101 bail!("EVM address kind is not supported (yet)")
102 }
103 }
104 }
105
106 async fn create_client(&self, signer: Option<KeySigner>) -> Result<SigningClient> {
107 let signer: KeySigner = match signer {
108 Some(signer) => signer,
109 None => self.create_signer()?,
110 };
111
112 SigningClient::new_with_cache(
113 self.chain_config.clone(),
114 signer,
115 self.cache.clone(),
116 Some(self.connection.clone()),
117 )
118 .await
119 }
120
121 async fn maybe_top_up(&self, addr: Address) -> Result<()> {
122 if let Some(balance_maintainer) = &self.balance_maintainer {
123 let current_balance = balance_maintainer
124 .query_client
125 .balance(addr.clone(), balance_maintainer.denom.clone())
126 .await?
127 .unwrap_or_default();
128 if current_balance < balance_maintainer.threshhold {
129 let amount = balance_maintainer.amount - current_balance;
130 {
132 let funder = balance_maintainer.client.lock().await;
133
134 tracing::debug!(
135 "Balance on {} is {}, below {}, sending {} to top-up from {}",
136 addr,
137 current_balance,
138 balance_maintainer.threshhold,
139 amount,
140 funder.addr
141 );
142
143 funder
144 .transfer(amount, &addr, balance_maintainer.denom.as_deref(), None)
145 .await?;
146 }
147 }
148 }
149
150 Ok(())
151 }
152}
153
154pub struct BalanceMaintainer {
156 pub client: Mutex<SigningClient>,
157 query_client: QueryClient,
158 threshhold: u128,
159 amount: u128,
160 denom: Option<String>,
161}
162
163impl Manager for SigningClientPoolManager {
164 type Type = SigningClient;
165 type Error = anyhow::Error;
166
167 async fn create(&self) -> Result<SigningClient> {
168 let signer = self.create_signer()?;
173 let addr = self
174 .chain_config
175 .address_from_pub_key(&signer.public_key().await?)?;
176
177 self.maybe_top_up(addr).await?;
178 let client = self.create_client(Some(signer)).await?;
179
180 tracing::debug!("POOL CREATED CLIENT {}", client.addr);
181
182 Ok(client)
183 }
184
185 async fn recycle(
186 &self,
187 client: &mut SigningClient,
188 _: &Metrics,
189 ) -> RecycleResult<anyhow::Error> {
190 tracing::debug!("POOL RECYCLING CLIENT {}", client.addr);
191
192 Ok(())
193 }
194}
195
196#[derive(Clone)]
197pub struct SigningClientPool {
198 pub pool: deadpool::managed::Pool<SigningClientPoolManager>,
199}
200
201impl SigningClientPool {
202 pub fn new(pool: deadpool::managed::Pool<SigningClientPoolManager>) -> Self {
203 Self { pool }
204 }
205
206 pub async fn get(&self) -> Result<Object<SigningClientPoolManager>, PoolError<Error>> {
207 let client = self.pool.get().await?;
208 self.pool
209 .manager()
210 .maybe_top_up(client.addr.clone())
211 .await?;
212 Ok(client)
213 }
214}