Skip to main content

uhash_prover/cpu/
parallel.rs

//! Multi-threaded CPU solver backed by a persistent thread pool.
//!
//! Worker threads are created once and reused across batches, so each batch
//! avoids both thread-creation overhead and fresh 2 MB `UniversalHash`
//! scratchpad allocations.
5
6use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
8use std::sync::{Arc, Mutex};
9
10use anyhow::Result;
11use uhash_core::{meets_difficulty, UniversalHash};
12
13use crate::solver::{ProofResult, Solver};
14
15/// A single unit of work dispatched to one worker.
16struct Job {
17    challenge: Arc<Vec<u8>>,
18    start_nonce: u64,
19    total_lanes: usize,
20    tid: usize,
21    thread_count: usize,
22    /// `Some(difficulty)` → proof-search mode; `None` → benchmark mode.
23    difficulty: Option<u32>,
24    found: Arc<AtomicBool>,
25    winner: Arc<Mutex<ProofResult>>,
26    /// Shared counter for actual hashes computed across all workers.
27    actual_hashes: Arc<AtomicUsize>,
28}
29
30/// Pool of persistent worker threads.
31struct WorkerPool {
32    thread_count: usize,
33    /// One sender per worker — each worker blocks on its own receiver.
34    job_txs: Vec<SyncSender<Option<Job>>>,
35    /// Workers send `()` here after finishing a job; main collects N signals.
36    done_rx: Receiver<()>,
37}
38
39impl WorkerPool {
40    fn new(thread_count: usize) -> Self {
41        // Capacity = thread_count so all workers can signal without blocking.
42        let (done_tx, done_rx) = sync_channel::<()>(thread_count);
43        let mut job_txs = Vec::with_capacity(thread_count);
44
45        for _ in 0..thread_count {
46            let (job_tx, job_rx) = sync_channel::<Option<Job>>(1);
47            let done_tx = done_tx.clone();
48
49            std::thread::spawn(move || {
50                // Allocate the 2 MB scratchpad once for the lifetime of this thread.
51                let mut hasher = UniversalHash::new();
52                // Input buffer grown to the right size on first use and reused.
53                let mut input = Vec::with_capacity(128);
54
55                while let Ok(Some(job)) = job_rx.recv() {
56                    let mut lane = job.tid;
57                    let mut count = 0usize;
58
59                    if let Some(d) = job.difficulty {
60                        // Proof-search: check difficulty, stop early on found.
61                        while lane < job.total_lanes && !job.found.load(Ordering::Relaxed) {
62                            let nonce = job.start_nonce.saturating_add(lane as u64);
63                            input.clear();
64                            input.extend_from_slice(&job.challenge);
65                            input.extend_from_slice(&nonce.to_le_bytes());
66                            let hash = hasher.hash(&input);
67                            count += 1;
68                            if meets_difficulty(&hash, d) {
69                                if !job.found.swap(true, Ordering::Relaxed) {
70                                    let mut guard =
71                                        job.winner.lock().expect("winner mutex poisoned");
72                                    *guard = Some((nonce, hash));
73                                }
74                                break;
75                            }
76                            lane += job.thread_count;
77                        }
78                    } else {
79                        // Benchmark: hash all assigned lanes, no difficulty check.
80                        while lane < job.total_lanes {
81                            let nonce = job.start_nonce.saturating_add(lane as u64);
82                            input.clear();
83                            input.extend_from_slice(&job.challenge);
84                            input.extend_from_slice(&nonce.to_le_bytes());
85                            let _ = hasher.hash(&input);
86                            count += 1;
87                            lane += job.thread_count;
88                        }
89                    }
90
91                    job.actual_hashes.fetch_add(count, Ordering::Relaxed);
92                    let _ = done_tx.send(());
93                }
94                // `None` received → thread exits cleanly.
95            });
96
97            job_txs.push(job_tx);
98        }
99
100        WorkerPool {
101            thread_count,
102            job_txs,
103            done_rx,
104        }
105    }
106
107    /// Send one job to every worker and block until all complete.
108    /// Returns `(proof_result, actual_hashes_computed)`.
109    fn dispatch(
110        &mut self,
111        challenge: Arc<Vec<u8>>,
112        start_nonce: u64,
113        total_lanes: usize,
114        difficulty: Option<u32>,
115    ) -> (ProofResult, usize) {
116        let found = Arc::new(AtomicBool::new(false));
117        let winner = Arc::new(Mutex::new(None::<(u64, [u8; 32])>));
118        let actual_hashes = Arc::new(AtomicUsize::new(0));
119
120        for (tid, tx) in self.job_txs.iter().enumerate() {
121            let job = Job {
122                challenge: Arc::clone(&challenge),
123                start_nonce,
124                total_lanes,
125                tid,
126                thread_count: self.thread_count,
127                difficulty,
128                found: Arc::clone(&found),
129                winner: Arc::clone(&winner),
130                actual_hashes: Arc::clone(&actual_hashes),
131            };
132            tx.send(Some(job))
133                .expect("worker channel closed unexpectedly");
134        }
135
136        // Collect completion signal from every worker.
137        for _ in 0..self.thread_count {
138            self.done_rx.recv().expect("worker done signal lost");
139        }
140
141        let result = *winner.lock().expect("winner mutex poisoned");
142        (result, actual_hashes.load(Ordering::Relaxed))
143    }
144}
145
146impl Drop for WorkerPool {
147    fn drop(&mut self) {
148        // Signal each worker to exit by sending `None`.
149        for tx in &self.job_txs {
150            let _ = tx.send(None);
151        }
152        // Workers will exit after receiving None; OS reclaims threads.
153    }
154}
155
156/// Multi-threaded CPU solver backed by a persistent thread pool.
157///
158/// Threads and their 2 MB scratchpads are allocated once on first use and
159/// reused for every subsequent batch, eliminating per-batch overhead.
160pub struct ParallelCpuSolver {
161    threads: usize,
162    pool: Option<WorkerPool>,
163}
164
165impl ParallelCpuSolver {
166    /// Create a new parallel solver with the given thread count.
167    /// If `threads` is 0, it will be resolved in `recommended_lanes`.
168    pub fn new(threads: usize) -> Self {
169        Self {
170            threads,
171            pool: None,
172        }
173    }
174
175    fn effective_threads(&self) -> usize {
176        if self.threads == 0 {
177            std::thread::available_parallelism()
178                .map(|n| n.get())
179                .unwrap_or(1)
180        } else {
181            self.threads
182        }
183    }
184
185    fn ensure_pool(&mut self) {
186        if self.pool.is_none() {
187            let n = self.effective_threads();
188            self.pool = Some(WorkerPool::new(n));
189        }
190    }
191}
192
193impl Solver for ParallelCpuSolver {
194    fn backend_name(&self) -> &'static str {
195        "cpu"
196    }
197
198    fn recommended_lanes(&mut self, requested: usize) -> usize {
199        if requested == 0 {
200            self.effective_threads() * 1024
201        } else {
202            requested
203        }
204    }
205
206    fn find_proof_batch(
207        &mut self,
208        header_without_nonce: &[u8],
209        start_nonce: u64,
210        lanes: usize,
211        difficulty: u32,
212    ) -> Result<(ProofResult, usize)> {
213        self.ensure_pool();
214        let challenge = Arc::new(header_without_nonce.to_vec());
215        let (result, actual) =
216            self.pool
217                .as_mut()
218                .unwrap()
219                .dispatch(challenge, start_nonce, lanes, Some(difficulty));
220        Ok((result, actual))
221    }
222
223    fn benchmark_hashes(
224        &mut self,
225        header_without_nonce: &[u8],
226        start_nonce: u64,
227        lanes: usize,
228    ) -> Result<usize> {
229        self.ensure_pool();
230        let challenge = Arc::new(header_without_nonce.to_vec());
231        let (_, actual) = self
232            .pool
233            .as_mut()
234            .unwrap()
235            .dispatch(challenge, start_nonce, lanes, None);
236        Ok(actual)
237    }
238}