//! uhash_prover/cpu/parallel.rs — multi-threaded CPU solver backend.

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};

use anyhow::Result;
use uhash_core::{meets_difficulty, UniversalHash};

use crate::solver::{ProofResult, Solver};
14
15struct Job {
17 challenge: Arc<Vec<u8>>,
18 start_nonce: u64,
19 total_lanes: usize,
20 tid: usize,
21 thread_count: usize,
22 difficulty: Option<u32>,
24 found: Arc<AtomicBool>,
25 winner: Arc<Mutex<ProofResult>>,
26 actual_hashes: Arc<AtomicUsize>,
28}
29
30struct WorkerPool {
32 thread_count: usize,
33 job_txs: Vec<SyncSender<Option<Job>>>,
35 done_rx: Receiver<()>,
37}
38
39impl WorkerPool {
40 fn new(thread_count: usize) -> Self {
41 let (done_tx, done_rx) = sync_channel::<()>(thread_count);
43 let mut job_txs = Vec::with_capacity(thread_count);
44
45 for _ in 0..thread_count {
46 let (job_tx, job_rx) = sync_channel::<Option<Job>>(1);
47 let done_tx = done_tx.clone();
48
49 std::thread::spawn(move || {
50 let mut hasher = UniversalHash::new();
52 let mut input = Vec::with_capacity(128);
54
55 while let Ok(Some(job)) = job_rx.recv() {
56 let mut lane = job.tid;
57 let mut count = 0usize;
58
59 if let Some(d) = job.difficulty {
60 while lane < job.total_lanes && !job.found.load(Ordering::Relaxed) {
62 let nonce = job.start_nonce.saturating_add(lane as u64);
63 input.clear();
64 input.extend_from_slice(&job.challenge);
65 input.extend_from_slice(&nonce.to_le_bytes());
66 let hash = hasher.hash(&input);
67 count += 1;
68 if meets_difficulty(&hash, d) {
69 if !job.found.swap(true, Ordering::Relaxed) {
70 let mut guard =
71 job.winner.lock().expect("winner mutex poisoned");
72 *guard = Some((nonce, hash));
73 }
74 break;
75 }
76 lane += job.thread_count;
77 }
78 } else {
79 while lane < job.total_lanes {
81 let nonce = job.start_nonce.saturating_add(lane as u64);
82 input.clear();
83 input.extend_from_slice(&job.challenge);
84 input.extend_from_slice(&nonce.to_le_bytes());
85 let _ = hasher.hash(&input);
86 count += 1;
87 lane += job.thread_count;
88 }
89 }
90
91 job.actual_hashes.fetch_add(count, Ordering::Relaxed);
92 let _ = done_tx.send(());
93 }
94 });
96
97 job_txs.push(job_tx);
98 }
99
100 WorkerPool {
101 thread_count,
102 job_txs,
103 done_rx,
104 }
105 }
106
107 fn dispatch(
110 &mut self,
111 challenge: Arc<Vec<u8>>,
112 start_nonce: u64,
113 total_lanes: usize,
114 difficulty: Option<u32>,
115 ) -> (ProofResult, usize) {
116 let found = Arc::new(AtomicBool::new(false));
117 let winner = Arc::new(Mutex::new(None::<(u64, [u8; 32])>));
118 let actual_hashes = Arc::new(AtomicUsize::new(0));
119
120 for (tid, tx) in self.job_txs.iter().enumerate() {
121 let job = Job {
122 challenge: Arc::clone(&challenge),
123 start_nonce,
124 total_lanes,
125 tid,
126 thread_count: self.thread_count,
127 difficulty,
128 found: Arc::clone(&found),
129 winner: Arc::clone(&winner),
130 actual_hashes: Arc::clone(&actual_hashes),
131 };
132 tx.send(Some(job))
133 .expect("worker channel closed unexpectedly");
134 }
135
136 for _ in 0..self.thread_count {
138 self.done_rx.recv().expect("worker done signal lost");
139 }
140
141 let result = *winner.lock().expect("winner mutex poisoned");
142 (result, actual_hashes.load(Ordering::Relaxed))
143 }
144}
145
146impl Drop for WorkerPool {
147 fn drop(&mut self) {
148 for tx in &self.job_txs {
150 let _ = tx.send(None);
151 }
152 }
154}
155
156pub struct ParallelCpuSolver {
161 threads: usize,
162 pool: Option<WorkerPool>,
163}
164
165impl ParallelCpuSolver {
166 pub fn new(threads: usize) -> Self {
169 Self {
170 threads,
171 pool: None,
172 }
173 }
174
175 fn effective_threads(&self) -> usize {
176 if self.threads == 0 {
177 std::thread::available_parallelism()
178 .map(|n| n.get())
179 .unwrap_or(1)
180 } else {
181 self.threads
182 }
183 }
184
185 fn ensure_pool(&mut self) {
186 if self.pool.is_none() {
187 let n = self.effective_threads();
188 self.pool = Some(WorkerPool::new(n));
189 }
190 }
191}
192
193impl Solver for ParallelCpuSolver {
194 fn backend_name(&self) -> &'static str {
195 "cpu"
196 }
197
198 fn recommended_lanes(&mut self, requested: usize) -> usize {
199 if requested == 0 {
200 self.effective_threads() * 1024
201 } else {
202 requested
203 }
204 }
205
206 fn find_proof_batch(
207 &mut self,
208 header_without_nonce: &[u8],
209 start_nonce: u64,
210 lanes: usize,
211 difficulty: u32,
212 ) -> Result<(ProofResult, usize)> {
213 self.ensure_pool();
214 let challenge = Arc::new(header_without_nonce.to_vec());
215 let (result, actual) =
216 self.pool
217 .as_mut()
218 .unwrap()
219 .dispatch(challenge, start_nonce, lanes, Some(difficulty));
220 Ok((result, actual))
221 }
222
223 fn benchmark_hashes(
224 &mut self,
225 header_without_nonce: &[u8],
226 start_nonce: u64,
227 lanes: usize,
228 ) -> Result<usize> {
229 self.ensure_pool();
230 let challenge = Arc::new(header_without_nonce.to_vec());
231 let (_, actual) = self
232 .pool
233 .as_mut()
234 .unwrap()
235 .dispatch(challenge, start_nonce, lanes, None);
236 Ok(actual)
237 }
238}