1use anyhow::Result;
6use std::sync::Arc;
7use subxt::backend::legacy::LegacyRpcMethods;
8use subxt::backend::rpc::RpcClient;
9use subxt::{OnlineClient, PolkadotConfig};
10use tokio::sync::RwLock;
11use tracing::{debug, info};
12
/// Reports whether `endpoint` starts with an unencrypted scheme.
///
/// Only the literal, case-sensitive prefixes `ws://` and `http://` count as
/// insecure; anything else (including an empty string) yields `false`.
fn is_insecure_endpoint(endpoint: &str) -> bool {
    const PLAINTEXT_SCHEMES: [&str; 2] = ["ws://", "http://"];

    PLAINTEXT_SCHEMES
        .iter()
        .any(|scheme| endpoint.starts_with(*scheme))
}
16
/// One `Balances.Transfer` event, flattened into plain owned values.
#[derive(Clone, Debug)]
pub struct TransferInfo {
    /// Sending account, as produced by the event decoder (a lowercase hex
    /// string of the account bytes).
    pub from: String,
    /// Receiving account, in the same encoding as `from`.
    pub to: String,
    /// Transferred amount, kept as the textual rendering of the decoded value.
    pub amount: String,
    /// Number of the block that contained the event.
    pub block_number: u32,
    /// Position of the event within that block's event list.
    pub event_index: usize,
}
26
27pub struct BlockchainMonitor {
29 client: Arc<RwLock<OnlineClient<PolkadotConfig>>>,
30 rpc_methods: Arc<RwLock<LegacyRpcMethods<PolkadotConfig>>>,
31 endpoint: String,
32}
33
34impl BlockchainMonitor {
35 pub async fn new(endpoint: &str) -> Result<Self> {
37 let client = Self::create_client(endpoint).await?;
38 let rpc_methods = Self::create_rpc_methods(endpoint).await?;
39 Ok(Self {
40 client: Arc::new(RwLock::new(client)),
41 rpc_methods: Arc::new(RwLock::new(rpc_methods)),
42 endpoint: endpoint.to_string(),
43 })
44 }
45
46 async fn create_client(endpoint: &str) -> Result<OnlineClient<PolkadotConfig>> {
48 if is_insecure_endpoint(endpoint) {
49 debug!("Using insecure connection for endpoint: {}", endpoint);
50 Ok(OnlineClient::<PolkadotConfig>::from_insecure_url(endpoint).await?)
51 } else {
52 Ok(OnlineClient::<PolkadotConfig>::from_url(endpoint).await?)
53 }
54 }
55
56 async fn create_rpc_methods(endpoint: &str) -> Result<LegacyRpcMethods<PolkadotConfig>> {
58 let rpc = if is_insecure_endpoint(endpoint) {
59 RpcClient::from_insecure_url(endpoint).await?
60 } else {
61 RpcClient::from_url(endpoint).await?
62 };
63 Ok(LegacyRpcMethods::new(rpc))
64 }
65
66 pub async fn reconnect(&self) -> Result<()> {
68 info!("Reconnecting to blockchain at: {}", self.endpoint);
69 let new_client = Self::create_client(&self.endpoint).await?;
70 let new_rpc_methods = Self::create_rpc_methods(&self.endpoint).await?;
71
72 let mut client_guard = self.client.write().await;
74 let mut rpc_guard = self.rpc_methods.write().await;
75 *client_guard = new_client;
76 *rpc_guard = new_rpc_methods;
77
78 info!("Successfully reconnected to blockchain");
79 Ok(())
80 }
81
82 pub async fn get_current_block(&self) -> Result<u32> {
84 let client = self.client.read().await;
85 let block = client.blocks().at_latest().await?;
86 Ok(block.number())
87 }
88
89 pub async fn get_latest_transfers(&self) -> Result<Vec<TransferInfo>> {
91 let client = self.client.read().await;
92 let block = client.blocks().at_latest().await?;
93 Self::get_transfers_from_block(&client, block).await
94 }
95
96 pub async fn get_transfers_at_block(&self, block_number: u32) -> Result<Vec<TransferInfo>> {
98 let client = self.client.read().await;
100 let rpc_methods = self.rpc_methods.read().await;
101 let block_hash = rpc_methods
102 .chain_get_block_hash(Some(block_number.into()))
103 .await?
104 .ok_or_else(|| anyhow::anyhow!("Block {} not found", block_number))?;
105
106 let block = client.blocks().at(block_hash).await?;
107 Self::get_transfers_from_block(&client, block).await
108 }
109
110 async fn get_transfers_from_block(
112 client: &OnlineClient<PolkadotConfig>,
113 block: subxt::blocks::Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
114 ) -> Result<Vec<TransferInfo>> {
115 let _ = client; let mut transfers = Vec::new();
117 let block_num = block.number();
118 let events = block.events().await?;
119
120 for (idx, event) in events.iter().enumerate() {
121 if let Ok(ev) = event {
122 if ev.pallet_name() == "Balances" && ev.variant_name() == "Transfer" {
123 if let Some(transfer) = Self::extract_transfer(&ev, block_num, idx) {
124 transfers.push(transfer);
125 }
126 }
127 }
128 }
129
130 Ok(transfers)
131 }
132
133 fn extract_transfer(
135 ev: &subxt::events::EventDetails<PolkadotConfig>,
136 block_number: u32,
137 event_index: usize,
138 ) -> Option<TransferInfo> {
139 let fields = ev.field_values().ok()?;
140
141 let (from, to, amount) = match fields {
143 subxt::ext::scale_value::Composite::Named(named_fields) => {
144 let mut from = None;
145 let mut to = None;
146 let mut amount = None;
147
148 for (name, value) in named_fields {
149 match name.as_str() {
150 "from" => from = extract_account_hex(&value),
151 "to" => to = extract_account_hex(&value),
152 "amount" => amount = Some(value.to_string()),
153 _ => {}
154 }
155 }
156
157 (from?, to?, amount?)
158 }
159 subxt::ext::scale_value::Composite::Unnamed(unnamed_fields) => {
160 if unnamed_fields.len() < 3 {
161 return None;
162 }
163
164 let from = extract_account_hex(&unnamed_fields[0])?;
165 let to = extract_account_hex(&unnamed_fields[1])?;
166 let amount = unnamed_fields[2].to_string();
167
168 (from, to, amount)
169 }
170 };
171
172 Some(TransferInfo {
173 from,
174 to,
175 amount,
176 block_number,
177 event_index,
178 })
179 }
180
181 pub async fn poll_transfers<F>(
183 &self,
184 mut last_block: u32,
185 interval: tokio::time::Duration,
186 mut callback: F,
187 ) -> Result<()>
188 where
189 F: FnMut(Vec<TransferInfo>) -> Result<()>,
190 {
191 let mut ticker = tokio::time::interval(interval);
192
193 loop {
194 ticker.tick().await;
195
196 let current_block = self.get_current_block().await?;
197
198 if current_block > last_block {
199 let transfers = self.get_latest_transfers().await?;
201
202 if !transfers.is_empty() {
203 info!(
204 "Found {} transfers in block {}",
205 transfers.len(),
206 current_block
207 );
208 callback(transfers)?;
209 }
210
211 last_block = current_block;
212 }
213 }
214 }
215
216 pub fn endpoint(&self) -> &str {
218 &self.endpoint
219 }
220
221 pub fn is_block_likely_pruned(
224 current_block: u32,
225 target_block: u32,
226 retention_blocks: u32,
227 ) -> bool {
228 if target_block > current_block {
229 return false;
230 }
231 current_block - target_block > retention_blocks
232 }
233}
234
235fn extract_account_hex(value: &subxt::ext::scale_value::Value<u32>) -> Option<String> {
237 let value_str = value.to_string();
239
240 let cleaned = value_str
242 .trim_start_matches('(')
243 .trim_end_matches(')')
244 .trim_start_matches('(')
245 .trim_end_matches(')');
246
247 let bytes: Vec<u8> = cleaned
249 .split(',')
250 .filter_map(|s| s.trim().parse::<u8>().ok())
251 .collect();
252
253 if bytes.len() == 32 {
255 Some(bytes.iter().map(|b| format!("{:02x}", b)).collect())
256 } else {
257 debug!("Invalid account bytes length: {}", bytes.len());
258 None
259 }
260}
261
#[cfg(test)]
mod tests {

    /// Replays the parse-and-hex steps used for account values on a literal
    /// rendering of a 32-byte account and checks the resulting hex prefix.
    #[test]
    fn test_extract_account_hex() {
        let rendered = "((126, 85, 233, 164, 31, 92, 185, 17, 101, 198, 143, 31, 141, 41, 187, 43, 115, 147, 93, 29, 237, 199, 253, 100, 235, 33, 224, 71, 168, 155, 113, 242))";

        // One pass per side is enough: `trim_*_matches` strips repeats.
        let inner = rendered.trim_start_matches('(').trim_end_matches(')');

        let mut bytes: Vec<u8> = Vec::new();
        for piece in inner.split(',') {
            if let Ok(byte) = piece.trim().parse::<u8>() {
                bytes.push(byte);
            }
        }

        assert_eq!(bytes.len(), 32);

        let mut hex = String::new();
        for byte in &bytes {
            hex.push_str(&format!("{:02x}", byte));
        }

        assert_eq!(hex.len(), 64);
        assert_eq!(&hex[0..2], "7e");
        assert_eq!(&hex[2..4], "55");
    }
}