1use std::sync::atomic::AtomicU32;
2
3use crate::{
4 cache::ClimbCache,
5 querier::{Connection, QueryClient},
6 signing::SigningClient,
7};
8use anyhow::{bail, Result};
9use deadpool::managed::{Manager, Metrics, RecycleResult};
10use layer_climb_address::*;
11use layer_climb_config::ChainConfig;
12use tokio::sync::Mutex;
13
/// Pool manager that produces [`SigningClient`]s whose keys are all derived
/// from a single mnemonic, each at its own derivation index.
///
/// It implements deadpool's `Manager` trait further down in this file, so it
/// is meant to be handed to a deadpool managed pool.
pub struct SigningClientPoolManager {
    /// Mnemonic phrase every signer is derived from.
    pub mnemonic: String,
    /// Next derivation index to hand out; advanced atomically each time a
    /// signer is created.
    pub derivation_index: AtomicU32,
    /// Chain configuration passed to every client and used to turn a public
    /// key into an address.
    pub chain_config: ChainConfig,
    /// Optional balance top-up configuration. When `None`, no balance checks
    /// or transfers are performed.
    pub balance_maintainer: Option<BalanceMaintainer>,
    /// Cache that is cloned into every client created by this manager.
    pub cache: ClimbCache,
    /// Connection settings that are cloned into every client created by this
    /// manager.
    pub connection: Connection,
}
23
24impl SigningClientPoolManager {
25 pub fn new_mnemonic(
26 mnemonic: String,
27 chain_config: ChainConfig,
28 start_index: Option<u32>,
29 connection: Option<Connection>,
30 ) -> Self {
31 let connection = connection.unwrap_or_default();
32 Self {
33 mnemonic,
34 chain_config,
35 derivation_index: AtomicU32::new(start_index.unwrap_or_default()),
36 balance_maintainer: None,
37 cache: ClimbCache::new(connection.rpc.clone()),
38 connection,
39 }
40 }
41
42 pub async fn with_minimum_balance(
52 mut self,
53 threshhold: u128,
56 amount: u128,
59 funder: Option<SigningClient>,
62 denom: Option<String>,
63 ) -> Result<Self> {
64 let query_client =
67 QueryClient::new(self.chain_config.clone(), Some(self.connection.clone())).await?;
68
69 let balance_maintainer = match funder {
70 Some(funder) => BalanceMaintainer {
71 client: Mutex::new(funder),
72 query_client,
73 threshhold,
74 amount,
75 denom,
76 },
77 None => BalanceMaintainer {
78 client: Mutex::new(self.create_client(None).await?),
79 query_client,
80 threshhold,
81 amount,
82 denom,
83 },
84 };
85
86 self.balance_maintainer = Some(balance_maintainer);
87 Ok(self)
88 }
89
90 fn create_signer(&self) -> Result<KeySigner> {
91 match &self.chain_config.address_kind {
92 layer_climb_config::AddrKind::Cosmos { .. } => {
93 let index = self
94 .derivation_index
95 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
96
97 KeySigner::new_mnemonic_str(&self.mnemonic, Some(&cosmos_hub_derivation(index)?))
98 }
99 layer_climb_config::AddrKind::Evm => {
100 bail!("EVM address kind is not supported (yet)")
101 }
102 }
103 }
104
105 async fn create_client(&self, signer: Option<KeySigner>) -> Result<SigningClient> {
106 let signer: KeySigner = match signer {
107 Some(signer) => signer,
108 None => self.create_signer()?,
109 };
110
111 SigningClient::new_with_cache(
112 self.chain_config.clone(),
113 signer,
114 self.cache.clone(),
115 Some(self.connection.clone()),
116 )
117 .await
118 }
119
120 async fn maybe_top_up(&self, addr: Address) -> Result<()> {
121 if let Some(balance_maintainer) = &self.balance_maintainer {
122 let current_balance = balance_maintainer
123 .query_client
124 .balance(addr.clone(), balance_maintainer.denom.clone())
125 .await?
126 .unwrap_or_default();
127 if current_balance < balance_maintainer.threshhold {
128 let amount = balance_maintainer.amount - current_balance;
129 {
131 let funder = balance_maintainer.client.lock().await;
132
133 tracing::debug!(
134 "Balance on {} is {}, below {}, sending {} to top-up from {}",
135 addr,
136 current_balance,
137 balance_maintainer.threshhold,
138 amount,
139 funder.addr
140 );
141
142 funder
143 .transfer(amount, &addr, balance_maintainer.denom.as_deref(), None)
144 .await?;
145 }
146 }
147 }
148
149 Ok(())
150 }
151}
152
/// Configuration and clients used to keep pool addresses funded.
pub struct BalanceMaintainer {
    /// Client that pays for top-ups. It sits behind an async mutex, so only
    /// one task can use the funder at a time.
    pub client: Mutex<SigningClient>,
    /// Client used to look up the current balance of an address.
    query_client: QueryClient,
    /// A top-up is triggered when an address holds strictly less than this.
    threshhold: u128,
    /// Balance an address is topped up to; the transfer sends the difference
    /// between this value and the current balance.
    amount: u128,
    /// Denom forwarded to the balance query and the transfer.
    /// NOTE(review): `None` presumably means the chain's default denom; confirm.
    denom: Option<String>,
}
161
162impl Manager for SigningClientPoolManager {
163 type Type = SigningClient;
164 type Error = anyhow::Error;
165
166 async fn create(&self) -> Result<SigningClient> {
167 let signer = self.create_signer()?;
172 let addr = self
173 .chain_config
174 .address_from_pub_key(&signer.public_key().await?)?;
175
176 self.maybe_top_up(addr).await?;
177 let client = self.create_client(Some(signer)).await?;
178
179 tracing::debug!("POOL CREATED CLIENT {}", client.addr);
180
181 Ok(client)
182 }
183
184 async fn recycle(
185 &self,
186 client: &mut SigningClient,
187 _: &Metrics,
188 ) -> RecycleResult<anyhow::Error> {
189 tracing::debug!("POOL RECYCLING CLIENT {}", client.addr);
190 self.maybe_top_up(client.addr.clone()).await?;
191
192 Ok(())
193 }
194}