use std::{
sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicU64, Ordering},
mpsc::Sender,
},
thread::{self, JoinHandle},
time::{Duration, Instant},
};
#[cfg(feature = "wownero")]
use rust_randomwow::{Context, Hasher, Output};
#[cfg(not(feature = "wownero"))]
use rust_randomx::{Context, Hasher, Output};
use crate::{job::Job, throttle::Throttle};
pub struct Share {
pub job_id: String,
pub nonce_value: u32,
pub nonce_hex: String,
pub hash_hex: String,
}
pub struct Worker {
handles: Vec<JoinHandle<()>>,
running: Arc<AtomicBool>,
job_sender: watch::Sender<Option<Job>>,
}
impl Worker {
pub fn new(
thread_count: usize,
light: bool,
throttle: Throttle,
share_sender: Sender<Share>,
hash_count: Arc<AtomicU64>,
) -> Self {
let running = Arc::new(AtomicBool::new(true));
let (job_sender, job_receiver) = watch::channel(None);
let context_cache: Arc<Mutex<Option<Arc<Context>>>> = Arc::new(Mutex::new(None));
let mut handles = Vec::with_capacity(thread_count);
for thread_index in 0..thread_count {
let share_sender = share_sender.clone();
let running = running.clone();
let job_receiver = job_receiver.clone();
let context_cache = context_cache.clone();
let throttle = throttle.clone();
let hash_count = hash_count.clone();
handles.push(thread::spawn(move || {
mine(MineParams {
thread_index,
thread_count,
light,
context_cache,
job_receiver,
share_sender,
running,
throttle,
hash_count,
});
}));
}
Self {
handles,
running,
job_sender,
}
}
pub fn set_job(&self, job: Job) {
self.job_sender.send(Some(job));
}
pub fn stop(self) {
self.running.store(false, Ordering::Relaxed);
for handle in self.handles {
let _ = handle.join();
}
}
}
const NONCE_OFFSET: usize = 39;
struct MineParams {
thread_index: usize,
thread_count: usize,
light: bool,
context_cache: Arc<Mutex<Option<Arc<Context>>>>,
job_receiver: watch::Receiver<Option<Job>>,
share_sender: Sender<Share>,
running: Arc<AtomicBool>,
throttle: Throttle,
hash_count: Arc<AtomicU64>,
}
#[allow(clippy::too_many_lines)]
fn mine(mut params: MineParams) {
let MineParams {
thread_index,
thread_count,
light,
context_cache,
ref mut job_receiver,
ref share_sender,
running,
ref throttle,
ref hash_count,
} = params;
let mut current_seed: Vec<u8> = Vec::new();
let mut hasher: Option<Hasher> = None;
while running.load(Ordering::Relaxed) {
let Some(job) = job_receiver.get() else {
thread::sleep(Duration::from_millis(100));
job_receiver.wait();
continue;
};
if job.seed != current_seed {
let mut cache = context_cache.lock().unwrap();
let need_new = cache.as_ref().is_none_or(|cached| cached.key() != job.seed);
if need_new {
*cache = Some(Arc::new(Context::new(&job.seed, !light)));
}
let context = cache.clone().unwrap();
drop(cache);
match &mut hasher {
Some(existing) => existing.update(context),
None => hasher = Some(Hasher::new(context)),
}
current_seed.clone_from(&job.seed);
}
let Some(hasher) = &mut hasher else { continue };
let mut blob = job.hashing_blob.clone();
if blob.len() < NONCE_OFFSET + 4 {
continue;
}
let random_offset = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0, |d| d.subsec_nanos());
let start_nonce = random_offset.wrapping_add(thread_index as u32);
let step = thread_count as u32;
let threshold = job.threshold;
let mut nonces = (start_nonce..u32::MAX).step_by(thread_count);
let Some(first_nonce) = nonces.next() else {
continue;
};
blob[NONCE_OFFSET..NONCE_OFFSET + 4].copy_from_slice(&first_nonce.to_le_bytes());
hasher.hash_first(&blob);
let mut previous_nonce = first_nonce;
let mut batch_start = Instant::now();
for nonce in nonces {
if !running.load(Ordering::Relaxed) || job_receiver.has_changed() {
let _ = hasher.hash_last();
break;
}
blob[NONCE_OFFSET..NONCE_OFFSET + 4].copy_from_slice(&nonce.to_le_bytes());
let output = hasher.hash_next(&blob);
hash_count.fetch_add(1, Ordering::Relaxed);
check_and_submit(output, previous_nonce, threshold, &job.id, share_sender);
previous_nonce = nonce;
if (nonce / step).is_multiple_of(16) {
let fraction = throttle.fraction();
if fraction < 1.0 {
let hash_duration = batch_start.elapsed();
let sleep = hash_duration.mul_f32((1.0 - fraction) / fraction);
thread::sleep(sleep);
}
batch_start = Instant::now();
}
}
}
}
fn check_and_submit(
output: Output,
nonce: u32,
threshold: u64,
job_id: &str,
share_sender: &Sender<Share>,
) {
let hash_bytes: &[u8] = output.as_ref();
let mut tail = [0u8; 8];
tail.copy_from_slice(&hash_bytes[24..32]);
let hash_value = u64::from_le_bytes(tail);
if hash_value < threshold {
let _ = share_sender.send(Share {
job_id: job_id.into(),
nonce_value: nonce,
nonce_hex: hex::encode(nonce.to_le_bytes()),
hash_hex: hex::encode(hash_bytes),
});
}
}
pub mod watch {
use std::sync::{Arc, Condvar, Mutex};
struct Inner<T> {
value: Mutex<(T, u64)>,
notify: Condvar,
}
pub struct Sender<T> {
inner: Arc<Inner<T>>,
}
impl<T> Sender<T> {
pub fn send(&self, value: T) {
let mut guard = self.inner.value.lock().unwrap();
guard.0 = value;
guard.1 += 1;
drop(guard);
self.inner.notify.notify_all();
}
}
pub struct Receiver<T> {
inner: Arc<Inner<T>>,
seen_version: u64,
}
impl<T: Clone> Receiver<T> {
pub fn get(&mut self) -> T {
let guard = self.inner.value.lock().unwrap();
self.seen_version = guard.1;
guard.0.clone()
}
pub fn has_changed(&self) -> bool {
let guard = self.inner.value.lock().unwrap();
guard.1 != self.seen_version
}
#[allow(clippy::significant_drop_tightening)]
pub fn wait(&self) {
let guard = self.inner.value.lock().unwrap();
if guard.1 != self.seen_version {
return;
}
let _guard = self.inner.notify.wait(guard).unwrap();
}
pub fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
seen_version: self.seen_version,
}
}
}
pub fn channel<T>(initial: T) -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Inner {
value: Mutex::new((initial, 0)),
notify: Condvar::new(),
});
(
Sender {
inner: inner.clone(),
},
Receiver {
inner,
seen_version: 0,
},
)
}
}