rustywallet_electrum/
batch.rs

1//! Optimized batch request utilities.
2//!
3//! This module provides utilities for efficiently batching multiple
4//! Electrum requests into single network round-trips.
5
6use std::collections::HashMap;
7
8use crate::client::ElectrumClient;
9use crate::error::Result;
10use crate::types::{Balance, TxHistory, Utxo};
11
/// Maximum number of requests per batch.
///
/// Within this module it is the chunk length used when address lists are
/// split up, and both the default and the upper bound for the chunk size of
/// `ParallelBatchExecutor`.
pub const MAX_BATCH_SIZE: usize = 100;
14
15/// Batch request builder for efficient multi-address queries.
16pub struct BatchRequest<'a> {
17    client: &'a ElectrumClient,
18    balance_addresses: Vec<String>,
19    utxo_addresses: Vec<String>,
20    history_addresses: Vec<String>,
21    transactions: Vec<String>,
22}
23
24impl<'a> BatchRequest<'a> {
25    /// Create a new batch request builder.
26    pub fn new(client: &'a ElectrumClient) -> Self {
27        Self {
28            client,
29            balance_addresses: Vec::new(),
30            utxo_addresses: Vec::new(),
31            history_addresses: Vec::new(),
32            transactions: Vec::new(),
33        }
34    }
35
36    /// Add addresses for balance queries.
37    pub fn balances(mut self, addresses: impl IntoIterator<Item = impl Into<String>>) -> Self {
38        self.balance_addresses.extend(addresses.into_iter().map(|a| a.into()));
39        self
40    }
41
42    /// Add a single address for balance query.
43    pub fn balance(mut self, address: impl Into<String>) -> Self {
44        self.balance_addresses.push(address.into());
45        self
46    }
47
48    /// Add addresses for UTXO queries.
49    pub fn utxos(mut self, addresses: impl IntoIterator<Item = impl Into<String>>) -> Self {
50        self.utxo_addresses.extend(addresses.into_iter().map(|a| a.into()));
51        self
52    }
53
54    /// Add a single address for UTXO query.
55    pub fn utxo(mut self, address: impl Into<String>) -> Self {
56        self.utxo_addresses.push(address.into());
57        self
58    }
59
60    /// Add addresses for history queries.
61    pub fn histories(mut self, addresses: impl IntoIterator<Item = impl Into<String>>) -> Self {
62        self.history_addresses.extend(addresses.into_iter().map(|a| a.into()));
63        self
64    }
65
66    /// Add a single address for history query.
67    pub fn history(mut self, address: impl Into<String>) -> Self {
68        self.history_addresses.push(address.into());
69        self
70    }
71
72    /// Add transaction IDs to fetch.
73    pub fn transactions(mut self, txids: impl IntoIterator<Item = impl Into<String>>) -> Self {
74        self.transactions.extend(txids.into_iter().map(|t| t.into()));
75        self
76    }
77
78    /// Add a single transaction ID to fetch.
79    pub fn transaction(mut self, txid: impl Into<String>) -> Self {
80        self.transactions.push(txid.into());
81        self
82    }
83
84    /// Execute the batch request.
85    pub async fn execute(self) -> Result<BatchResponse> {
86        let mut response = BatchResponse::new();
87
88        // Execute balance queries
89        if !self.balance_addresses.is_empty() {
90            let addresses: Vec<&str> = self.balance_addresses.iter().map(|s| s.as_str()).collect();
91            let balances = self.client.get_balances(&addresses).await?;
92            
93            for (addr, bal) in self.balance_addresses.into_iter().zip(balances) {
94                response.balances.insert(addr, bal);
95            }
96        }
97
98        // Execute UTXO queries in batches
99        for chunk in self.utxo_addresses.chunks(MAX_BATCH_SIZE) {
100            for addr in chunk {
101                let utxos = self.client.list_unspent(addr).await?;
102                response.utxos.insert(addr.clone(), utxos);
103            }
104        }
105
106        // Execute history queries in batches
107        for chunk in self.history_addresses.chunks(MAX_BATCH_SIZE) {
108            for addr in chunk {
109                let history = self.client.get_history(addr).await?;
110                response.histories.insert(addr.clone(), history);
111            }
112        }
113
114        // Execute transaction queries in batches
115        for chunk in self.transactions.chunks(MAX_BATCH_SIZE) {
116            for txid in chunk {
117                let tx = self.client.get_transaction(txid).await?;
118                response.transactions.insert(txid.clone(), tx);
119            }
120        }
121
122        Ok(response)
123    }
124}
125
126/// Response from a batch request.
127#[derive(Debug, Clone, Default)]
128pub struct BatchResponse {
129    /// Balance results by address
130    pub balances: HashMap<String, Balance>,
131    /// UTXO results by address
132    pub utxos: HashMap<String, Vec<Utxo>>,
133    /// History results by address
134    pub histories: HashMap<String, Vec<TxHistory>>,
135    /// Transaction results by txid
136    pub transactions: HashMap<String, String>,
137}
138
139impl BatchResponse {
140    /// Create a new empty response.
141    pub fn new() -> Self {
142        Self::default()
143    }
144
145    /// Get balance for an address.
146    pub fn get_balance(&self, address: &str) -> Option<&Balance> {
147        self.balances.get(address)
148    }
149
150    /// Get UTXOs for an address.
151    pub fn get_utxos(&self, address: &str) -> Option<&[Utxo]> {
152        self.utxos.get(address).map(|v| v.as_slice())
153    }
154
155    /// Get history for an address.
156    pub fn get_history(&self, address: &str) -> Option<&[TxHistory]> {
157        self.histories.get(address).map(|v| v.as_slice())
158    }
159
160    /// Get a transaction by txid.
161    pub fn get_transaction(&self, txid: &str) -> Option<&str> {
162        self.transactions.get(txid).map(|s| s.as_str())
163    }
164
165    /// Get total confirmed balance across all addresses.
166    pub fn total_confirmed(&self) -> u64 {
167        self.balances.values().map(|b| b.confirmed).sum()
168    }
169
170    /// Get total unconfirmed balance across all addresses.
171    pub fn total_unconfirmed(&self) -> i64 {
172        self.balances.values().map(|b| b.unconfirmed).sum()
173    }
174
175    /// Get all UTXOs across all addresses.
176    pub fn all_utxos(&self) -> Vec<(&str, &Utxo)> {
177        self.utxos
178            .iter()
179            .flat_map(|(addr, utxos)| utxos.iter().map(move |u| (addr.as_str(), u)))
180            .collect()
181    }
182
183    /// Get total UTXO value across all addresses.
184    pub fn total_utxo_value(&self) -> u64 {
185        self.utxos.values().flat_map(|v| v.iter()).map(|u| u.value).sum()
186    }
187
188    /// Get addresses with non-zero balance.
189    pub fn funded_addresses(&self) -> Vec<&str> {
190        self.balances
191            .iter()
192            .filter(|(_, b)| b.has_balance())
193            .map(|(a, _)| a.as_str())
194            .collect()
195    }
196
197    /// Check if any address has balance.
198    pub fn has_any_balance(&self) -> bool {
199        self.balances.values().any(|b| b.has_balance())
200    }
201}
202
203/// Parallel batch executor for very large address sets.
204pub struct ParallelBatchExecutor<'a> {
205    client: &'a ElectrumClient,
206    chunk_size: usize,
207}
208
209impl<'a> ParallelBatchExecutor<'a> {
210    /// Create a new parallel executor.
211    pub fn new(client: &'a ElectrumClient) -> Self {
212        Self {
213            client,
214            chunk_size: MAX_BATCH_SIZE,
215        }
216    }
217
218    /// Set the chunk size for parallel execution.
219    pub fn chunk_size(mut self, size: usize) -> Self {
220        self.chunk_size = size.min(MAX_BATCH_SIZE);
221        self
222    }
223
224    /// Get balances for many addresses in parallel chunks.
225    pub async fn get_balances(&self, addresses: &[&str]) -> Result<HashMap<String, Balance>> {
226        let mut results = HashMap::new();
227
228        for chunk in addresses.chunks(self.chunk_size) {
229            let balances = self.client.get_balances(chunk).await?;
230            
231            for (addr, bal) in chunk.iter().zip(balances) {
232                results.insert(addr.to_string(), bal);
233            }
234        }
235
236        Ok(results)
237    }
238
239    /// Get UTXOs for many addresses.
240    pub async fn list_unspent(&self, addresses: &[&str]) -> Result<HashMap<String, Vec<Utxo>>> {
241        let mut results = HashMap::new();
242
243        for addr in addresses {
244            let utxos = self.client.list_unspent(addr).await?;
245            results.insert(addr.to_string(), utxos);
246        }
247
248        Ok(results)
249    }
250
251    /// Get history for many addresses.
252    pub async fn get_histories(&self, addresses: &[&str]) -> Result<HashMap<String, Vec<TxHistory>>> {
253        let mut results = HashMap::new();
254
255        for addr in addresses {
256            let history = self.client.get_history(addr).await?;
257            results.insert(addr.to_string(), history);
258        }
259
260        Ok(results)
261    }
262}
263
264/// Scan addresses until gap limit is reached.
265pub struct GapLimitScanner<'a> {
266    client: &'a ElectrumClient,
267    gap_limit: usize,
268}
269
270impl<'a> GapLimitScanner<'a> {
271    /// Create a new gap limit scanner.
272    pub fn new(client: &'a ElectrumClient, gap_limit: usize) -> Self {
273        Self { client, gap_limit }
274    }
275
276    /// Scan addresses and return those with balance.
277    ///
278    /// Stops when `gap_limit` consecutive addresses have no history.
279    pub async fn scan<F>(&self, mut address_generator: F) -> Result<Vec<(usize, String, Balance)>>
280    where
281        F: FnMut(usize) -> String,
282    {
283        let mut results = Vec::new();
284        let mut consecutive_empty = 0;
285        let mut index = 0;
286
287        while consecutive_empty < self.gap_limit {
288            let address = address_generator(index);
289            let history = self.client.get_history(&address).await?;
290
291            if history.is_empty() {
292                consecutive_empty += 1;
293            } else {
294                consecutive_empty = 0;
295                let balance = self.client.get_balance(&address).await?;
296                results.push((index, address, balance));
297            }
298
299            index += 1;
300        }
301
302        Ok(results)
303    }
304
305    /// Scan with batch queries for better performance.
306    pub async fn scan_batch<F>(&self, mut address_generator: F, batch_size: usize) -> Result<Vec<(usize, String, Balance)>>
307    where
308        F: FnMut(usize) -> String,
309    {
310        let mut results = Vec::new();
311        let mut consecutive_empty = 0;
312        let mut index = 0;
313
314        while consecutive_empty < self.gap_limit {
315            // Generate a batch of addresses
316            let batch: Vec<(usize, String)> = (0..batch_size)
317                .map(|i| {
318                    let idx = index + i;
319                    (idx, address_generator(idx))
320                })
321                .collect();
322
323            let addresses: Vec<&str> = batch.iter().map(|(_, a)| a.as_str()).collect();
324            let balances = self.client.get_balances(&addresses).await?;
325
326            let mut batch_empty = true;
327            for ((idx, addr), balance) in batch.into_iter().zip(balances) {
328                if balance.has_balance() {
329                    results.push((idx, addr, balance));
330                    consecutive_empty = 0;
331                    batch_empty = false;
332                } else {
333                    consecutive_empty += 1;
334                    if consecutive_empty >= self.gap_limit {
335                        break;
336                    }
337                }
338            }
339
340            if batch_empty {
341                // All addresses in batch were empty
342                break;
343            }
344
345            index += batch_size;
346        }
347
348        Ok(results)
349    }
350}
351
#[cfg(test)]
mod tests {
    use super::*;

    /// Balance totals and the funded-address helpers reflect inserted entries.
    #[test]
    fn test_batch_response() {
        let mut response = BatchResponse::new();

        for (addr, confirmed, unconfirmed) in [("addr1", 1000, 0), ("addr2", 2000, 100)] {
            response
                .balances
                .insert(addr.to_string(), Balance { confirmed, unconfirmed });
        }

        let funded = response.funded_addresses();

        assert_eq!(response.total_confirmed(), 3000);
        assert_eq!(response.total_unconfirmed(), 100);
        assert!(response.has_any_balance());
        assert_eq!(funded.len(), 2);
    }

    /// UTXO aggregation counts and sums every output of every address.
    #[test]
    fn test_batch_response_utxos() {
        let outputs = vec![
            Utxo { txid: "tx1".into(), vout: 0, value: 1000, height: 100 },
            Utxo { txid: "tx2".into(), vout: 1, value: 2000, height: 101 },
        ];

        let mut response = BatchResponse::new();
        response.utxos.insert(String::from("addr1"), outputs);

        assert_eq!(response.total_utxo_value(), 3000);
        assert_eq!(response.all_utxos().len(), 2);
    }

    /// The batch limit is positive and does not exceed 1000.
    #[test]
    fn test_max_batch_size() {
        assert!((1..=1000).contains(&MAX_BATCH_SIZE));
    }
}
396}