rustywallet_vanity/
distributed.rs

1//! Distributed vanity address search.
2//!
3//! This module provides infrastructure for distributing vanity address
4//! search across multiple workers, either locally or over a network.
5
6use crate::address_type::AddressType;
7use crate::error::VanityError;
8use crate::result::{SearchStats, VanityResult};
9use rustywallet_keys::private_key::PrivateKey;
10use serde::{Deserialize, Serialize};
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15/// A work unit for distributed search.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct WorkUnit {
18    /// Unique identifier for this work unit
19    pub id: u64,
20    /// Pattern to search for
21    pub pattern: String,
22    /// Address type to generate
23    pub address_type: String,
24    /// Whether pattern matching is case-insensitive
25    pub case_insensitive: bool,
26    /// Number of keys to check in this unit
27    pub key_count: usize,
28    /// Whether to use testnet
29    pub testnet: bool,
30}
31
32impl WorkUnit {
33    /// Create a new work unit.
34    pub fn new(
35        id: u64,
36        pattern: &str,
37        address_type: AddressType,
38        case_insensitive: bool,
39        key_count: usize,
40        testnet: bool,
41    ) -> Self {
42        Self {
43            id,
44            pattern: pattern.to_string(),
45            address_type: format!("{:?}", address_type),
46            case_insensitive,
47            key_count,
48            testnet,
49        }
50    }
51
52    /// Parse address type from string.
53    pub fn get_address_type(&self) -> Result<AddressType, VanityError> {
54        match self.address_type.as_str() {
55            "P2PKH" => Ok(AddressType::P2PKH),
56            "P2WPKH" => Ok(AddressType::P2WPKH),
57            "P2TR" => Ok(AddressType::P2TR),
58            "Ethereum" => Ok(AddressType::Ethereum),
59            _ => Err(VanityError::InvalidConfig(format!(
60                "Unknown address type: {}",
61                self.address_type
62            ))),
63        }
64    }
65}
66
67/// Result from processing a work unit.
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct WorkResult {
70    /// Work unit ID
71    pub work_id: u64,
72    /// Worker ID
73    pub worker_id: String,
74    /// Whether a match was found
75    pub found: bool,
76    /// The matching address (if found)
77    pub address: Option<String>,
78    /// The private key in WIF format (if found)
79    pub private_key_wif: Option<String>,
80    /// Number of keys checked
81    pub keys_checked: u64,
82    /// Time taken in milliseconds
83    pub duration_ms: u64,
84}
85
86impl WorkResult {
87    /// Create a result indicating no match found.
88    pub fn not_found(work_id: u64, worker_id: &str, keys_checked: u64, duration_ms: u64) -> Self {
89        Self {
90            work_id,
91            worker_id: worker_id.to_string(),
92            found: false,
93            address: None,
94            private_key_wif: None,
95            keys_checked,
96            duration_ms,
97        }
98    }
99
100    /// Create a result indicating a match was found.
101    pub fn found(
102        work_id: u64,
103        worker_id: &str,
104        address: &str,
105        private_key_wif: &str,
106        keys_checked: u64,
107        duration_ms: u64,
108    ) -> Self {
109        Self {
110            work_id,
111            worker_id: worker_id.to_string(),
112            found: true,
113            address: Some(address.to_string()),
114            private_key_wif: Some(private_key_wif.to_string()),
115            keys_checked,
116            duration_ms,
117        }
118    }
119}
120
121/// Coordinator for distributed vanity search.
122///
123/// The coordinator manages work distribution and result collection
124/// across multiple workers.
125pub struct SearchCoordinator {
126    /// Pattern to search for
127    pattern: String,
128    /// Address type
129    address_type: AddressType,
130    /// Case insensitive matching
131    case_insensitive: bool,
132    /// Use testnet
133    testnet: bool,
134    /// Keys per work unit
135    keys_per_unit: usize,
136    /// Next work unit ID
137    next_id: AtomicU64,
138    /// Total keys checked
139    total_checked: AtomicU64,
140    /// Whether search is complete
141    found: AtomicBool,
142    /// Start time
143    start_time: Instant,
144}
145
146impl SearchCoordinator {
147    /// Create a new search coordinator.
148    pub fn new(
149        pattern: &str,
150        address_type: AddressType,
151        case_insensitive: bool,
152        testnet: bool,
153    ) -> Self {
154        Self {
155            pattern: pattern.to_string(),
156            address_type,
157            case_insensitive,
158            testnet,
159            keys_per_unit: 100_000,
160            next_id: AtomicU64::new(0),
161            total_checked: AtomicU64::new(0),
162            found: AtomicBool::new(false),
163            start_time: Instant::now(),
164        }
165    }
166
167    /// Set the number of keys per work unit.
168    pub fn keys_per_unit(mut self, count: usize) -> Self {
169        self.keys_per_unit = count;
170        self
171    }
172
173    /// Get the next work unit.
174    ///
175    /// Returns `None` if a match has already been found.
176    pub fn next_work_unit(&self) -> Option<WorkUnit> {
177        if self.found.load(Ordering::Relaxed) {
178            return None;
179        }
180
181        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
182        Some(WorkUnit::new(
183            id,
184            &self.pattern,
185            self.address_type,
186            self.case_insensitive,
187            self.keys_per_unit,
188            self.testnet,
189        ))
190    }
191
192    /// Submit a work result.
193    ///
194    /// Returns `true` if this result contains the winning match.
195    pub fn submit_result(&self, result: &WorkResult) -> bool {
196        self.total_checked.fetch_add(result.keys_checked, Ordering::Relaxed);
197
198        if result.found {
199            self.found.store(true, Ordering::Relaxed);
200            return true;
201        }
202
203        false
204    }
205
206    /// Check if search is complete.
207    pub fn is_complete(&self) -> bool {
208        self.found.load(Ordering::Relaxed)
209    }
210
211    /// Get current statistics.
212    pub fn stats(&self) -> SearchStats {
213        let elapsed = self.start_time.elapsed();
214        let total = self.total_checked.load(Ordering::Relaxed);
215        let rate = if elapsed.as_secs_f64() > 0.0 {
216            total as f64 / elapsed.as_secs_f64()
217        } else {
218            0.0
219        };
220
221        SearchStats {
222            attempts: total,
223            elapsed,
224            rate,
225        }
226    }
227}
228
229/// A worker for distributed vanity search.
230///
231/// Workers process work units and report results back to the coordinator.
232pub struct SearchWorker {
233    /// Worker identifier
234    id: String,
235    /// Stop signal
236    stop: Arc<AtomicBool>,
237}
238
239impl SearchWorker {
240    /// Create a new search worker.
241    pub fn new(id: &str) -> Self {
242        Self {
243            id: id.to_string(),
244            stop: Arc::new(AtomicBool::new(false)),
245        }
246    }
247
248    /// Get the worker ID.
249    pub fn id(&self) -> &str {
250        &self.id
251    }
252
253    /// Get a stop signal handle.
254    pub fn stop_signal(&self) -> Arc<AtomicBool> {
255        Arc::clone(&self.stop)
256    }
257
258    /// Signal the worker to stop.
259    pub fn stop(&self) {
260        self.stop.store(true, Ordering::Relaxed);
261    }
262
263    /// Process a work unit.
264    pub fn process(&self, work: &WorkUnit) -> Result<WorkResult, VanityError> {
265        use crate::pattern::Pattern;
266
267        let start = Instant::now();
268        let address_type = work.get_address_type()?;
269        let pattern = Pattern::prefix(&work.pattern)?;
270
271        // Search with limit
272        let mut checked = 0u64;
273        while checked < work.key_count as u64 {
274            if self.stop.load(Ordering::Relaxed) {
275                break;
276            }
277
278            // Generate and check a batch
279            let batch_size = 1000.min((work.key_count as u64 - checked) as usize);
280            
281            for _ in 0..batch_size {
282                let key = PrivateKey::random();
283                let address = address_type.derive_address(&key, work.testnet)
284                    .map_err(VanityError::AddressError)?;
285
286                if pattern.matches(&address, !work.case_insensitive) {
287                    let duration_ms = start.elapsed().as_millis() as u64;
288                    let wif = key.to_wif(rustywallet_keys::network::Network::Mainnet);
289                    return Ok(WorkResult::found(
290                        work.id,
291                        &self.id,
292                        &address,
293                        &wif,
294                        checked + 1,
295                        duration_ms,
296                    ));
297                }
298                checked += 1;
299            }
300        }
301
302        let duration_ms = start.elapsed().as_millis() as u64;
303        Ok(WorkResult::not_found(work.id, &self.id, checked, duration_ms))
304    }
305}
306
307/// Configuration for distributed search.
308#[derive(Debug, Clone, Serialize, Deserialize)]
309pub struct DistributedConfig {
310    /// Number of local workers
311    pub local_workers: usize,
312    /// Keys per work unit
313    pub keys_per_unit: usize,
314    /// Progress report interval in seconds
315    pub report_interval_secs: u64,
316}
317
318impl Default for DistributedConfig {
319    fn default() -> Self {
320        Self {
321            local_workers: num_cpus(),
322            keys_per_unit: 100_000,
323            report_interval_secs: 5,
324        }
325    }
326}
327
328/// Get the number of CPUs.
329fn num_cpus() -> usize {
330    std::thread::available_parallelism()
331        .map(|p| p.get())
332        .unwrap_or(4)
333}
334
335/// Run a distributed search with local workers.
336///
337/// This function spawns multiple worker threads and coordinates
338/// the search across them.
339pub fn run_distributed_search<F>(
340    pattern: &str,
341    address_type: AddressType,
342    case_insensitive: bool,
343    testnet: bool,
344    config: DistributedConfig,
345    mut progress_callback: F,
346) -> Result<Option<VanityResult>, VanityError>
347where
348    F: FnMut(&SearchStats),
349{
350    use crate::pattern::Pattern;
351    use std::sync::mpsc;
352    use std::thread;
353
354    let coordinator = Arc::new(SearchCoordinator::new(
355        pattern,
356        address_type,
357        case_insensitive,
358        testnet,
359    ).keys_per_unit(config.keys_per_unit));
360
361    let (tx, rx) = mpsc::channel::<WorkResult>();
362    let mut handles = Vec::new();
363
364    // Spawn workers
365    for i in 0..config.local_workers {
366        let coord = Arc::clone(&coordinator);
367        let tx = tx.clone();
368        let worker_id = format!("worker-{}", i);
369
370        let handle = thread::spawn(move || {
371            let worker = SearchWorker::new(&worker_id);
372
373            while let Some(work) = coord.next_work_unit() {
374                match worker.process(&work) {
375                    Ok(result) => {
376                        let found = result.found;
377                        let _ = tx.send(result);
378                        if found {
379                            break;
380                        }
381                    }
382                    Err(_) => break,
383                }
384            }
385        });
386
387        handles.push(handle);
388    }
389
390    // Drop the original sender so rx will close when all workers finish
391    drop(tx);
392
393    // Collect results
394    let mut result: Option<VanityResult> = None;
395    let mut last_report = Instant::now();
396    let report_interval = Duration::from_secs(config.report_interval_secs);
397
398    for work_result in rx {
399        coordinator.submit_result(&work_result);
400
401        if work_result.found {
402            if let (Some(addr), Some(wif)) = (&work_result.address, &work_result.private_key_wif) {
403                let key = PrivateKey::from_wif(wif)
404                    .map_err(|e| VanityError::GenerationFailed(e.to_string()))?;
405
406                let matched = Pattern::prefix(pattern)?;
407
408                result = Some(VanityResult::new(
409                    key,
410                    addr.clone(),
411                    matched,
412                    coordinator.stats(),
413                ));
414            }
415            break;
416        }
417
418        // Progress report
419        if last_report.elapsed() >= report_interval {
420            progress_callback(&coordinator.stats());
421            last_report = Instant::now();
422        }
423    }
424
425    // Wait for all workers to finish
426    for handle in handles {
427        let _ = handle.join();
428    }
429
430    // Final progress report
431    progress_callback(&coordinator.stats());
432
433    Ok(result)
434}
435
436#[cfg(test)]
437mod tests {
438    use super::*;
439
440    #[test]
441    fn test_work_unit_creation() {
442        let work = WorkUnit::new(1, "1A", AddressType::P2PKH, false, 1000, false);
443        assert_eq!(work.id, 1);
444        assert_eq!(work.pattern, "1A");
445        assert_eq!(work.key_count, 1000);
446    }
447
448    #[test]
449    fn test_work_result_not_found() {
450        let result = WorkResult::not_found(1, "worker-0", 1000, 100);
451        assert!(!result.found);
452        assert!(result.address.is_none());
453    }
454
455    #[test]
456    fn test_work_result_found() {
457        let result = WorkResult::found(1, "worker-0", "1ABC123", "WIF123", 500, 50);
458        assert!(result.found);
459        assert_eq!(result.address, Some("1ABC123".to_string()));
460    }
461
462    #[test]
463    fn test_coordinator_work_distribution() {
464        let coord = SearchCoordinator::new("1A", AddressType::P2PKH, false, false)
465            .keys_per_unit(1000);
466
467        let work1 = coord.next_work_unit().unwrap();
468        let work2 = coord.next_work_unit().unwrap();
469
470        assert_eq!(work1.id, 0);
471        assert_eq!(work2.id, 1);
472    }
473
474    #[test]
475    fn test_coordinator_completion() {
476        let coord = SearchCoordinator::new("1A", AddressType::P2PKH, false, false);
477
478        assert!(!coord.is_complete());
479
480        let result = WorkResult::found(0, "worker-0", "1ABC", "WIF", 100, 10);
481        coord.submit_result(&result);
482
483        assert!(coord.is_complete());
484        assert!(coord.next_work_unit().is_none());
485    }
486
487    #[test]
488    fn test_distributed_config_default() {
489        let config = DistributedConfig::default();
490        assert!(config.local_workers > 0);
491        assert!(config.keys_per_unit > 0);
492    }
493}