1use std::sync::atomic::AtomicU32;
2
3use crate::{cache::ClimbCache, querier::Connection, signing::SigningClient};
4use anyhow::{bail, Result};
5use deadpool::managed::{Manager, Metrics, RecycleResult};
6use layer_climb_address::*;
7use layer_climb_config::ChainConfig;
8use tokio::sync::Mutex;
9
10pub struct SigningClientPoolManager {
12 pub mnemonic: String,
13 pub derivation_index: AtomicU32,
14 pub chain_config: ChainConfig,
15 pub balance_maintainer: Option<BalanceMaintainer>,
16 pub cache: ClimbCache,
17 pub connection: Connection,
18}
19
20impl SigningClientPoolManager {
21 pub fn new_mnemonic(
22 mnemonic: String,
23 chain_config: ChainConfig,
24 start_index: Option<u32>,
25 connection: Option<Connection>,
26 ) -> Self {
27 let connection = connection.unwrap_or_default();
28 Self {
29 mnemonic,
30 chain_config,
31 derivation_index: AtomicU32::new(start_index.unwrap_or_default()),
32 balance_maintainer: None,
33 cache: ClimbCache::new(connection.rpc.clone()),
34 connection,
35 }
36 }
37
38 pub async fn with_minimum_balance(
48 mut self,
49 threshhold: u128,
52 amount: u128,
55 funder: Option<SigningClient>,
58 denom: Option<String>,
59 ) -> Result<Self> {
60 let balance_maintainer = match funder {
61 Some(funder) => BalanceMaintainer {
62 client: Mutex::new(funder),
63 threshhold,
64 amount,
65 denom,
66 },
67 None => BalanceMaintainer {
68 client: Mutex::new(self.create_client().await?),
69 threshhold,
70 amount,
71 denom,
72 },
73 };
74
75 self.balance_maintainer = Some(balance_maintainer);
76 Ok(self)
77 }
78
79 async fn create_client(&self) -> Result<SigningClient> {
80 let signer: KeySigner = match &self.chain_config.address_kind {
81 layer_climb_config::AddrKind::Cosmos { .. } => {
82 let index = self
83 .derivation_index
84 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
85
86 KeySigner::new_mnemonic_str(&self.mnemonic, Some(&cosmos_hub_derivation(index)?))?
87 }
88 layer_climb_config::AddrKind::Eth => {
89 bail!("Eth address kind is not supported (yet)")
90 }
91 };
92
93 SigningClient::new_with_cache(
94 self.chain_config.clone(),
95 signer,
96 self.cache.clone(),
97 Some(self.connection.clone()),
98 )
99 .await
100 }
101
102 async fn maybe_top_up(&self, client: &SigningClient) -> Result<()> {
103 if let Some(balance_maintainer) = &self.balance_maintainer {
104 let current_balance = client
105 .querier
106 .balance(client.addr.clone(), balance_maintainer.denom.clone())
107 .await?
108 .unwrap_or_default();
109 if current_balance < balance_maintainer.threshhold {
110 let amount = balance_maintainer.amount - current_balance;
111 {
113 let funder = balance_maintainer.client.lock().await;
114
115 tracing::debug!(
116 "Balance on {} is {}, below {}, sending {} to top-up from {}",
117 client.addr,
118 current_balance,
119 balance_maintainer.threshhold,
120 amount,
121 funder.addr
122 );
123
124 funder
125 .transfer(
126 amount,
127 &client.addr,
128 balance_maintainer.denom.as_deref(),
129 None,
130 )
131 .await?;
132 }
133 }
134 }
135
136 Ok(())
137 }
138}
139
140pub struct BalanceMaintainer {
142 client: Mutex<SigningClient>,
143 threshhold: u128,
144 amount: u128,
145 denom: Option<String>,
146}
147
148impl Manager for SigningClientPoolManager {
149 type Type = SigningClient;
150 type Error = anyhow::Error;
151
152 async fn create(&self) -> Result<SigningClient> {
153 let client = self.create_client().await?;
154 tracing::debug!("POOL CREATED CLIENT {}", client.addr);
155 self.maybe_top_up(&client).await?;
156
157 Ok(client)
158 }
159
160 async fn recycle(
161 &self,
162 client: &mut SigningClient,
163 _: &Metrics,
164 ) -> RecycleResult<anyhow::Error> {
165 tracing::debug!("POOL RECYCLING CLIENT {}", client.addr);
166 self.maybe_top_up(client).await?;
167
168 Ok(())
169 }
170}