bittensor_rs/connect/
monitor.rs

1//! Blockchain monitoring for Bittensor Substrate chain
2//!
3//! Simple, robust implementation following KISS principle.
4
5use anyhow::Result;
6use std::sync::Arc;
7use subxt::backend::legacy::LegacyRpcMethods;
8use subxt::backend::rpc::RpcClient;
9use subxt::{OnlineClient, PolkadotConfig};
10use tokio::sync::RwLock;
11use tracing::{debug, info};
12
/// Returns `true` when `endpoint` uses a plaintext scheme (`ws://` or `http://`).
///
/// The comparison is a case-sensitive prefix check; any other scheme (or no
/// scheme at all) is reported as not insecure.
fn is_insecure_endpoint(endpoint: &str) -> bool {
    const PLAINTEXT_SCHEMES: [&str; 2] = ["ws://", "http://"];
    PLAINTEXT_SCHEMES
        .iter()
        .any(|scheme| endpoint.starts_with(*scheme))
}
16
/// Transfer event data
///
/// One record is produced per `Balances` / `Transfer` event whose fields
/// could be extracted from a block.
#[derive(Debug, Clone)]
pub struct TransferInfo {
    /// Sender account as 64 lowercase hex characters (32 bytes, no `0x` prefix).
    pub from: String,
    /// Recipient account, encoded the same way as `from`.
    pub to: String,
    /// Transferred amount, kept as the string rendering of the decoded event field.
    pub amount: String,
    /// Number of the block the event was read from.
    pub block_number: u32,
    /// Zero-based position of the event in the block's event iteration.
    pub event_index: usize,
}
26
/// Simple blockchain monitor for Bittensor transfers
///
/// Both connection handles sit behind `Arc<RwLock<..>>` so that `reconnect`
/// can swap them through `&self` while other methods hold read access.
pub struct BlockchainMonitor {
    /// Client used to fetch blocks and their events.
    client: Arc<RwLock<OnlineClient<PolkadotConfig>>>,
    /// Legacy RPC interface, used to resolve a block number to its hash.
    rpc_methods: Arc<RwLock<LegacyRpcMethods<PolkadotConfig>>>,
    /// Endpoint URL given at construction; reused when reconnecting.
    endpoint: String,
}
33
34impl BlockchainMonitor {
35    /// Connect to blockchain endpoint
36    pub async fn new(endpoint: &str) -> Result<Self> {
37        let client = Self::create_client(endpoint).await?;
38        let rpc_methods = Self::create_rpc_methods(endpoint).await?;
39        Ok(Self {
40            client: Arc::new(RwLock::new(client)),
41            rpc_methods: Arc::new(RwLock::new(rpc_methods)),
42            endpoint: endpoint.to_string(),
43        })
44    }
45
46    /// Create a new OnlineClient for the endpoint
47    async fn create_client(endpoint: &str) -> Result<OnlineClient<PolkadotConfig>> {
48        if is_insecure_endpoint(endpoint) {
49            debug!("Using insecure connection for endpoint: {}", endpoint);
50            Ok(OnlineClient::<PolkadotConfig>::from_insecure_url(endpoint).await?)
51        } else {
52            Ok(OnlineClient::<PolkadotConfig>::from_url(endpoint).await?)
53        }
54    }
55
56    /// Create RPC methods client for the endpoint
57    async fn create_rpc_methods(endpoint: &str) -> Result<LegacyRpcMethods<PolkadotConfig>> {
58        let rpc = if is_insecure_endpoint(endpoint) {
59            RpcClient::from_insecure_url(endpoint).await?
60        } else {
61            RpcClient::from_url(endpoint).await?
62        };
63        Ok(LegacyRpcMethods::new(rpc))
64    }
65
66    /// Reconnect to the blockchain endpoint
67    pub async fn reconnect(&self) -> Result<()> {
68        info!("Reconnecting to blockchain at: {}", self.endpoint);
69        let new_client = Self::create_client(&self.endpoint).await?;
70        let new_rpc_methods = Self::create_rpc_methods(&self.endpoint).await?;
71
72        // Update both clients atomically under write locks
73        let mut client_guard = self.client.write().await;
74        let mut rpc_guard = self.rpc_methods.write().await;
75        *client_guard = new_client;
76        *rpc_guard = new_rpc_methods;
77
78        info!("Successfully reconnected to blockchain");
79        Ok(())
80    }
81
82    /// Get current block number
83    pub async fn get_current_block(&self) -> Result<u32> {
84        let client = self.client.read().await;
85        let block = client.blocks().at_latest().await?;
86        Ok(block.number())
87    }
88
89    /// Get transfers from latest block
90    pub async fn get_latest_transfers(&self) -> Result<Vec<TransferInfo>> {
91        let client = self.client.read().await;
92        let block = client.blocks().at_latest().await?;
93        Self::get_transfers_from_block(&client, block).await
94    }
95
96    /// Get transfers from a specific block number
97    pub async fn get_transfers_at_block(&self, block_number: u32) -> Result<Vec<TransferInfo>> {
98        // Acquire locks in consistent order (client first, then rpc_methods) to prevent deadlock
99        let client = self.client.read().await;
100        let rpc_methods = self.rpc_methods.read().await;
101        let block_hash = rpc_methods
102            .chain_get_block_hash(Some(block_number.into()))
103            .await?
104            .ok_or_else(|| anyhow::anyhow!("Block {} not found", block_number))?;
105
106        let block = client.blocks().at(block_hash).await?;
107        Self::get_transfers_from_block(&client, block).await
108    }
109
110    /// Extract transfers from a block
111    async fn get_transfers_from_block(
112        client: &OnlineClient<PolkadotConfig>,
113        block: subxt::blocks::Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
114    ) -> Result<Vec<TransferInfo>> {
115        let _ = client; // Used for type consistency, block already contains client ref
116        let mut transfers = Vec::new();
117        let block_num = block.number();
118        let events = block.events().await?;
119
120        for (idx, event) in events.iter().enumerate() {
121            if let Ok(ev) = event {
122                if ev.pallet_name() == "Balances" && ev.variant_name() == "Transfer" {
123                    if let Some(transfer) = Self::extract_transfer(&ev, block_num, idx) {
124                        transfers.push(transfer);
125                    }
126                }
127            }
128        }
129
130        Ok(transfers)
131    }
132
133    /// Extract transfer details from event
134    fn extract_transfer(
135        ev: &subxt::events::EventDetails<PolkadotConfig>,
136        block_number: u32,
137        event_index: usize,
138    ) -> Option<TransferInfo> {
139        let fields = ev.field_values().ok()?;
140
141        // Handle both named and unnamed fields
142        let (from, to, amount) = match fields {
143            subxt::ext::scale_value::Composite::Named(named_fields) => {
144                let mut from = None;
145                let mut to = None;
146                let mut amount = None;
147
148                for (name, value) in named_fields {
149                    match name.as_str() {
150                        "from" => from = extract_account_hex(&value),
151                        "to" => to = extract_account_hex(&value),
152                        "amount" => amount = Some(value.to_string()),
153                        _ => {}
154                    }
155                }
156
157                (from?, to?, amount?)
158            }
159            subxt::ext::scale_value::Composite::Unnamed(unnamed_fields) => {
160                if unnamed_fields.len() < 3 {
161                    return None;
162                }
163
164                let from = extract_account_hex(&unnamed_fields[0])?;
165                let to = extract_account_hex(&unnamed_fields[1])?;
166                let amount = unnamed_fields[2].to_string();
167
168                (from, to, amount)
169            }
170        };
171
172        Some(TransferInfo {
173            from,
174            to,
175            amount,
176            block_number,
177            event_index,
178        })
179    }
180
181    /// Poll for new transfers continuously
182    pub async fn poll_transfers<F>(
183        &self,
184        mut last_block: u32,
185        interval: tokio::time::Duration,
186        mut callback: F,
187    ) -> Result<()>
188    where
189        F: FnMut(Vec<TransferInfo>) -> Result<()>,
190    {
191        let mut ticker = tokio::time::interval(interval);
192
193        loop {
194            ticker.tick().await;
195
196            let current_block = self.get_current_block().await?;
197
198            if current_block > last_block {
199                // Get transfers from latest block only
200                let transfers = self.get_latest_transfers().await?;
201
202                if !transfers.is_empty() {
203                    info!(
204                        "Found {} transfers in block {}",
205                        transfers.len(),
206                        current_block
207                    );
208                    callback(transfers)?;
209                }
210
211                last_block = current_block;
212            }
213        }
214    }
215
216    /// Get the endpoint URL
217    pub fn endpoint(&self) -> &str {
218        &self.endpoint
219    }
220
221    /// Check if a block is likely too old to be available on non-archive nodes.
222    /// Non-archive nodes typically keep ~256 blocks (~51 minutes at 12s/block).
223    pub fn is_block_likely_pruned(
224        current_block: u32,
225        target_block: u32,
226        retention_blocks: u32,
227    ) -> bool {
228        if target_block > current_block {
229            return false;
230        }
231        current_block - target_block > retention_blocks
232    }
233}
234
235/// Extract account hex from nested composite value
236fn extract_account_hex(value: &subxt::ext::scale_value::Value<u32>) -> Option<String> {
237    // Convert to string and extract byte values from nested structure
238    let value_str = value.to_string();
239
240    // Handle nested composite: ((byte1, byte2, ...))
241    let cleaned = value_str
242        .trim_start_matches('(')
243        .trim_end_matches(')')
244        .trim_start_matches('(')
245        .trim_end_matches(')');
246
247    // Parse comma-separated bytes
248    let bytes: Vec<u8> = cleaned
249        .split(',')
250        .filter_map(|s| s.trim().parse::<u8>().ok())
251        .collect();
252
253    // Must be exactly 32 bytes for AccountId32
254    if bytes.len() == 32 {
255        Some(bytes.iter().map(|b| format!("{:02x}", b)).collect())
256    } else {
257        debug!("Invalid account bytes length: {}", bytes.len());
258        None
259    }
260}
261
262#[cfg(test)]
263mod tests {
264
265    #[test]
266    fn test_extract_account_hex() {
267        // Test with a mock Value that produces the expected string format
268        // In reality, this would be a subxt Value, but we can test the logic
269        // by directly testing the string parsing
270
271        // Simulate the nested composite format: ((byte1, byte2, ...))
272        let test_str = "((126, 85, 233, 164, 31, 92, 185, 17, 101, 198, 143, 31, 141, 41, 187, 43, 115, 147, 93, 29, 237, 199, 253, 100, 235, 33, 224, 71, 168, 155, 113, 242))";
273
274        // Extract bytes from the string
275        let cleaned = test_str
276            .trim_start_matches('(')
277            .trim_end_matches(')')
278            .trim_start_matches('(')
279            .trim_end_matches(')');
280
281        let bytes: Vec<u8> = cleaned
282            .split(',')
283            .filter_map(|s| s.trim().parse::<u8>().ok())
284            .collect();
285
286        assert_eq!(bytes.len(), 32);
287
288        let hex: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
289        assert_eq!(hex.len(), 64); // 32 bytes = 64 hex chars
290        assert_eq!(&hex[0..2], "7e"); // First byte is 126 = 0x7e
291        assert_eq!(&hex[2..4], "55"); // Second byte is 85 = 0x55
292    }
293}