use anyhow::Result;
use std::sync::Arc;
use subxt::backend::legacy::LegacyRpcMethods;
use subxt::backend::rpc::RpcClient;
use subxt::{OnlineClient, PolkadotConfig};
use tokio::sync::RwLock;
use tracing::{debug, info};
fn is_insecure_endpoint(endpoint: &str) -> bool {
endpoint.starts_with("ws://") || endpoint.starts_with("http://")
}
#[derive(Debug, Clone)]
pub struct TransferInfo {
pub from: String,
pub to: String,
pub amount: String,
pub block_number: u32,
pub event_index: usize,
}
pub struct BlockchainMonitor {
client: Arc<RwLock<OnlineClient<PolkadotConfig>>>,
rpc_methods: Arc<RwLock<LegacyRpcMethods<PolkadotConfig>>>,
endpoint: String,
}
impl BlockchainMonitor {
pub async fn new(endpoint: &str) -> Result<Self> {
let client = Self::create_client(endpoint).await?;
let rpc_methods = Self::create_rpc_methods(endpoint).await?;
Ok(Self {
client: Arc::new(RwLock::new(client)),
rpc_methods: Arc::new(RwLock::new(rpc_methods)),
endpoint: endpoint.to_string(),
})
}
async fn create_client(endpoint: &str) -> Result<OnlineClient<PolkadotConfig>> {
if is_insecure_endpoint(endpoint) {
debug!("Using insecure connection for endpoint: {}", endpoint);
Ok(OnlineClient::<PolkadotConfig>::from_insecure_url(endpoint).await?)
} else {
Ok(OnlineClient::<PolkadotConfig>::from_url(endpoint).await?)
}
}
async fn create_rpc_methods(endpoint: &str) -> Result<LegacyRpcMethods<PolkadotConfig>> {
let rpc = if is_insecure_endpoint(endpoint) {
RpcClient::from_insecure_url(endpoint).await?
} else {
RpcClient::from_url(endpoint).await?
};
Ok(LegacyRpcMethods::new(rpc))
}
pub async fn reconnect(&self) -> Result<()> {
info!("Reconnecting to blockchain at: {}", self.endpoint);
let new_client = Self::create_client(&self.endpoint).await?;
let new_rpc_methods = Self::create_rpc_methods(&self.endpoint).await?;
let mut client_guard = self.client.write().await;
let mut rpc_guard = self.rpc_methods.write().await;
*client_guard = new_client;
*rpc_guard = new_rpc_methods;
info!("Successfully reconnected to blockchain");
Ok(())
}
pub async fn get_current_block(&self) -> Result<u32> {
let client = self.client.read().await;
let block = client.blocks().at_latest().await?;
Ok(block.number())
}
pub async fn get_latest_transfers(&self) -> Result<Vec<TransferInfo>> {
let client = self.client.read().await;
let block = client.blocks().at_latest().await?;
Self::get_transfers_from_block(&client, block).await
}
pub async fn get_transfers_at_block(&self, block_number: u32) -> Result<Vec<TransferInfo>> {
let client = self.client.read().await;
let rpc_methods = self.rpc_methods.read().await;
let block_hash = rpc_methods
.chain_get_block_hash(Some(block_number.into()))
.await?
.ok_or_else(|| anyhow::anyhow!("Block {} not found", block_number))?;
let block = client.blocks().at(block_hash).await?;
Self::get_transfers_from_block(&client, block).await
}
async fn get_transfers_from_block(
client: &OnlineClient<PolkadotConfig>,
block: subxt::blocks::Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
) -> Result<Vec<TransferInfo>> {
let _ = client; let mut transfers = Vec::new();
let block_num = block.number();
let events = block.events().await?;
for (idx, event) in events.iter().enumerate() {
if let Ok(ev) = event {
if ev.pallet_name() == "Balances" && ev.variant_name() == "Transfer" {
if let Some(transfer) = Self::extract_transfer(&ev, block_num, idx) {
transfers.push(transfer);
}
}
}
}
Ok(transfers)
}
fn extract_transfer(
ev: &subxt::events::EventDetails<PolkadotConfig>,
block_number: u32,
event_index: usize,
) -> Option<TransferInfo> {
let fields = ev.field_values().ok()?;
let (from, to, amount) = match fields {
subxt::ext::scale_value::Composite::Named(named_fields) => {
let mut from = None;
let mut to = None;
let mut amount = None;
for (name, value) in named_fields {
match name.as_str() {
"from" => from = extract_account_hex(&value),
"to" => to = extract_account_hex(&value),
"amount" => amount = Some(value.to_string()),
_ => {}
}
}
(from?, to?, amount?)
}
subxt::ext::scale_value::Composite::Unnamed(unnamed_fields) => {
if unnamed_fields.len() < 3 {
return None;
}
let from = extract_account_hex(&unnamed_fields[0])?;
let to = extract_account_hex(&unnamed_fields[1])?;
let amount = unnamed_fields[2].to_string();
(from, to, amount)
}
};
Some(TransferInfo {
from,
to,
amount,
block_number,
event_index,
})
}
pub async fn poll_transfers<F>(
&self,
mut last_block: u32,
interval: tokio::time::Duration,
mut callback: F,
) -> Result<()>
where
F: FnMut(Vec<TransferInfo>) -> Result<()>,
{
let mut ticker = tokio::time::interval(interval);
loop {
ticker.tick().await;
let current_block = self.get_current_block().await?;
if current_block > last_block {
let transfers = self.get_latest_transfers().await?;
if !transfers.is_empty() {
info!(
"Found {} transfers in block {}",
transfers.len(),
current_block
);
callback(transfers)?;
}
last_block = current_block;
}
}
}
pub fn endpoint(&self) -> &str {
&self.endpoint
}
pub fn is_block_likely_pruned(
current_block: u32,
target_block: u32,
retention_blocks: u32,
) -> bool {
if target_block > current_block {
return false;
}
current_block - target_block > retention_blocks
}
}
fn extract_account_hex(value: &subxt::ext::scale_value::Value<u32>) -> Option<String> {
let value_str = value.to_string();
let cleaned = value_str
.trim_start_matches('(')
.trim_end_matches(')')
.trim_start_matches('(')
.trim_end_matches(')');
let bytes: Vec<u8> = cleaned
.split(',')
.filter_map(|s| s.trim().parse::<u8>().ok())
.collect();
if bytes.len() == 32 {
Some(bytes.iter().map(|b| format!("{:02x}", b)).collect())
} else {
debug!("Invalid account bytes length: {}", bytes.len());
None
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_extract_account_hex() {
let test_str = "((126, 85, 233, 164, 31, 92, 185, 17, 101, 198, 143, 31, 141, 41, 187, 43, 115, 147, 93, 29, 237, 199, 253, 100, 235, 33, 224, 71, 168, 155, 113, 242))";
let cleaned = test_str
.trim_start_matches('(')
.trim_end_matches(')')
.trim_start_matches('(')
.trim_end_matches(')');
let bytes: Vec<u8> = cleaned
.split(',')
.filter_map(|s| s.trim().parse::<u8>().ok())
.collect();
assert_eq!(bytes.len(), 32);
let hex: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
assert_eq!(hex.len(), 64); assert_eq!(&hex[0..2], "7e"); assert_eq!(&hex[2..4], "55"); }
}