opt-in-miner 0.4.1

Opt-in Monero/Wownero mining library for transparent application monetization
Documentation
use std::{
    sync::{
        Arc, Mutex,
        atomic::{AtomicBool, AtomicU64, Ordering},
        mpsc::Sender,
    },
    thread::{self, JoinHandle},
    time::{Duration, Instant},
};

#[cfg(feature = "wownero")]
use rust_randomwow::{Context, Hasher, Output};
#[cfg(not(feature = "wownero"))]
use rust_randomx::{Context, Hasher, Output};

use crate::{job::Job, throttle::Throttle};

/// A hash result that met a job's threshold, ready to be reported.
///
/// Instances are built by the mining threads and delivered through the
/// `Sender<Share>` handed to the worker.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Share {
    /// Identifier of the job the share was found for.
    pub job_id: String,
    /// Nonce that produced the hash, as a number.
    pub nonce_value: u32,
    /// Hex encoding of the nonce's little-endian bytes.
    pub nonce_hex: String,
    /// Hex encoding of the full hash bytes.
    pub hash_hex: String,
}

/// Handle to the set of mining threads started by [`Worker::new`].
pub struct Worker {
    /// Join handles of the spawned mining threads; joined by `stop`.
    handles: Vec<JoinHandle<()>>,
    /// Flag polled by every mining thread; cleared by `stop` to end the loops.
    running: Arc<AtomicBool>,
    /// Sending half of the watch channel that publishes the current job
    /// (`None` until the first `set_job`).
    job_sender: watch::Sender<Option<Job>>,
}

impl Worker {
    pub fn new(
        thread_count: usize,
        light: bool,
        throttle: Throttle,
        share_sender: Sender<Share>,
        hash_count: Arc<AtomicU64>,
    ) -> Self {
        let running = Arc::new(AtomicBool::new(true));
        let (job_sender, job_receiver) = watch::channel(None);
        let context_cache: Arc<Mutex<Option<Arc<Context>>>> = Arc::new(Mutex::new(None));

        let mut handles = Vec::with_capacity(thread_count);
        for thread_index in 0..thread_count {
            let share_sender = share_sender.clone();
            let running = running.clone();
            let job_receiver = job_receiver.clone();
            let context_cache = context_cache.clone();
            let throttle = throttle.clone();
            let hash_count = hash_count.clone();

            handles.push(thread::spawn(move || {
                mine(MineParams {
                    thread_index,
                    thread_count,
                    light,
                    context_cache,
                    job_receiver,
                    share_sender,
                    running,
                    throttle,
                    hash_count,
                });
            }));
        }

        Self {
            handles,
            running,
            job_sender,
        }
    }

    pub fn set_job(&self, job: Job) {
        self.job_sender.send(Some(job));
    }

    pub fn stop(self) {
        self.running.store(false, Ordering::Relaxed);
        for handle in self.handles {
            let _ = handle.join();
        }
    }
}

/// Byte offset inside a job's hashing blob at which the 4-byte little-endian
/// nonce is written before hashing.
const NONCE_OFFSET: usize = 39;

/// Everything a single mining thread needs, bundled so `mine` takes one argument.
struct MineParams {
    /// Zero-based index of this thread; added to the starting nonce.
    thread_index: usize,
    /// Total number of mining threads; used as the nonce stride.
    thread_count: usize,
    /// Negated and passed as the second argument of `Context::new`.
    light: bool,
    /// Context slot shared by all threads, keyed by the job seed it was built for.
    context_cache: Arc<Mutex<Option<Arc<Context>>>>,
    /// This thread's receiving half of the job watch channel.
    job_receiver: watch::Receiver<Option<Job>>,
    /// Channel on which qualifying shares are reported.
    share_sender: Sender<Share>,
    /// Cleared to ask the thread to leave its mining loop.
    running: Arc<AtomicBool>,
    /// Source of the duty-cycle fraction used to pace hashing.
    throttle: Throttle,
    /// Shared counter incremented once per computed hash.
    hash_count: Arc<AtomicU64>,
}

/// Mining loop executed by each worker thread until `running` is cleared.
///
/// Each pass of the outer loop fetches the current job, makes sure a hasher
/// exists for the job's seed (building or reusing the shared context), and then
/// walks nonces `start, start + thread_count, ...` (below `u32::MAX`), writing
/// each one into the hashing blob at `NONCE_OFFSET`. Every hash result is
/// checked against the job threshold and reported through `share_sender` when
/// it qualifies. The inner loop ends as soon as the job changes or the thread is
/// asked to stop.
///
/// # Panics
///
/// Panics if the shared context mutex or the watch channel mutex is poisoned.
// NOTE(review): `Duration::mul_f32` panics on a negative or non-finite factor,
// which would happen below if `throttle.fraction()` can return a value <= 0 -
// confirm the range guaranteed by `Throttle`.
#[allow(clippy::too_many_lines)]
fn mine(mut params: MineParams) {
    // Pause used whenever there is currently nothing this thread can hash.
    const IDLE_PAUSE: Duration = Duration::from_millis(100);

    let MineParams {
        thread_index,
        thread_count,
        light,
        context_cache,
        ref mut job_receiver,
        ref share_sender,
        running,
        ref throttle,
        ref hash_count,
    } = params;
    let mut current_seed: Vec<u8> = Vec::new();
    let mut hasher: Option<Hasher> = None;

    while running.load(Ordering::Relaxed) {
        let Some(job) = job_receiver.get() else {
            thread::sleep(IDLE_PAUSE);
            job_receiver.wait();
            continue;
        };

        if job.seed != current_seed {
            // The lock is held while the context is built so that only one
            // thread constructs it; the others then reuse the cached instance.
            let mut cache = context_cache.lock().unwrap();
            let need_new = cache.as_ref().is_none_or(|cached| cached.key() != job.seed);
            if need_new {
                *cache = Some(Arc::new(Context::new(&job.seed, !light)));
            }
            let context = cache.clone().unwrap();
            drop(cache);
            match &mut hasher {
                Some(existing) => existing.update(context),
                None => hasher = Some(Hasher::new(context)),
            }
            current_seed.clone_from(&job.seed);
        }

        // No hasher exists only when the job's seed equals the initial empty
        // seed. Pause before retrying: an immediate `continue` would re-fetch
        // the same job and spin at full speed.
        let Some(hasher) = &mut hasher else {
            thread::sleep(IDLE_PAUSE);
            continue;
        };

        // A blob too short to hold the nonce cannot be mined; pause instead of
        // spinning until a different job is published.
        if job.hashing_blob.len() < NONCE_OFFSET + 4 {
            thread::sleep(IDLE_PAUSE);
            continue;
        }
        let mut blob = job.hashing_blob.clone();

        // Each thread derives its own starting point from the wall clock and
        // then advances by `thread_count`.
        let random_offset = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |d| d.subsec_nanos());
        let start_nonce = random_offset.wrapping_add(thread_index as u32);
        let step = thread_count as u32;
        let threshold = job.threshold;
        let mut nonces = (start_nonce..u32::MAX).step_by(thread_count);

        let Some(first_nonce) = nonces.next() else {
            continue;
        };
        blob[NONCE_OFFSET..NONCE_OFFSET + 4].copy_from_slice(&first_nonce.to_le_bytes());
        hasher.hash_first(&blob);
        let mut previous_nonce = first_nonce;
        let mut batch_start = Instant::now();

        for nonce in nonces {
            if !running.load(Ordering::Relaxed) || job_receiver.has_changed() {
                // Finish the in-flight hash; its result is intentionally discarded.
                let _ = hasher.hash_last();
                break;
            }

            blob[NONCE_OFFSET..NONCE_OFFSET + 4].copy_from_slice(&nonce.to_le_bytes());
            // The output is paired with `previous_nonce`: presumably `hash_next`
            // returns the result for the previously supplied blob while starting
            // the new one - confirm against the hashing crate.
            let output = hasher.hash_next(&blob);
            hash_count.fetch_add(1, Ordering::Relaxed);
            check_and_submit(output, previous_nonce, threshold, &job.id, share_sender);
            previous_nonce = nonce;

            // Roughly every 16th hash of this thread, sleep long enough that
            // hashing time makes up `fraction` of the batch's wall time.
            if (nonce / step).is_multiple_of(16) {
                let fraction = throttle.fraction();
                if fraction < 1.0 {
                    let hash_duration = batch_start.elapsed();
                    let sleep = hash_duration.mul_f32((1.0 - fraction) / fraction);
                    thread::sleep(sleep);
                }
                batch_start = Instant::now();
            }
        }
    }
}

/// Reports `nonce` as a share when its hash is below the job threshold.
///
/// Bytes 24..32 of the hash are read as a little-endian `u64`. If that value is
/// strictly less than `threshold`, a [`Share`] carrying the job id, the nonce
/// (numeric and hex of its little-endian bytes) and the hex-encoded hash is
/// sent on `share_sender`. A failed send is ignored.
///
/// # Panics
///
/// Panics if the hash output is shorter than 32 bytes.
fn check_and_submit(
    output: Output,
    nonce: u32,
    threshold: u64,
    job_id: &str,
    share_sender: &Sender<Share>,
) {
    let digest: &[u8] = output.as_ref();

    let mut compare_word = [0u8; 8];
    compare_word.copy_from_slice(&digest[24..32]);

    // Not good enough for this job: nothing to report.
    if u64::from_le_bytes(compare_word) >= threshold {
        return;
    }

    let share = Share {
        job_id: String::from(job_id),
        nonce_value: nonce,
        nonce_hex: hex::encode(nonce.to_le_bytes()),
        hash_hex: hex::encode(digest),
    };
    // The receiving side may already be gone; that is not an error here.
    let _ = share_sender.send(share);
}

/// Minimal single-value watch channel built on a mutex and a condition variable.
///
/// The channel always holds exactly one value. Every [`Sender::send`] replaces
/// it and bumps a version counter; each [`Receiver`] remembers the version it
/// last read so it can tell whether a newer value is available.
pub mod watch {
    use std::sync::{Arc, Condvar, Mutex};

    /// State shared by the sender and all receivers.
    struct Inner<T> {
        /// Current value together with the number of sends performed so far.
        value: Mutex<(T, u64)>,
        /// Notified after every send so blocked receivers re-check the version.
        notify: Condvar,
    }

    /// Writing half of the channel.
    pub struct Sender<T> {
        inner: Arc<Inner<T>>,
    }

    impl<T> Sender<T> {
        /// Replaces the stored value and wakes every waiting receiver.
        ///
        /// # Panics
        ///
        /// Panics if the channel mutex is poisoned.
        pub fn send(&self, value: T) {
            let mut guard = self.inner.value.lock().unwrap();
            guard.0 = value;
            guard.1 += 1;
            // Release the lock before notifying so woken receivers can take it
            // immediately.
            drop(guard);
            self.inner.notify.notify_all();
        }
    }

    /// Reading half of the channel; each receiver tracks its own "seen" version.
    pub struct Receiver<T> {
        inner: Arc<Inner<T>>,
        /// Version of the value most recently returned by `get`.
        seen_version: u64,
    }

    impl<T> Clone for Receiver<T> {
        /// Creates another receiver on the same channel that starts with this
        /// receiver's seen version.
        fn clone(&self) -> Self {
            Self {
                inner: Arc::clone(&self.inner),
                seen_version: self.seen_version,
            }
        }
    }

    impl<T: Clone> Receiver<T> {
        /// Returns a clone of the current value and marks it as seen.
        ///
        /// # Panics
        ///
        /// Panics if the channel mutex is poisoned.
        pub fn get(&mut self) -> T {
            let guard = self.inner.value.lock().unwrap();
            self.seen_version = guard.1;
            guard.0.clone()
        }

        /// Reports whether a value newer than the last one read is available.
        ///
        /// # Panics
        ///
        /// Panics if the channel mutex is poisoned.
        pub fn has_changed(&self) -> bool {
            let guard = self.inner.value.lock().unwrap();
            guard.1 != self.seen_version
        }

        /// Blocks until a value newer than the last one read is available.
        ///
        /// Returns immediately if such a value already exists. Spurious
        /// condition-variable wake-ups are absorbed: the call only returns once
        /// the version really differs from the seen one.
        ///
        /// # Panics
        ///
        /// Panics if the channel mutex is poisoned.
        // The guard has to be handed to the condvar, so it cannot be dropped
        // any earlier.
        #[allow(clippy::significant_drop_tightening)]
        pub fn wait(&self) {
            let seen = self.seen_version;
            let guard = self.inner.value.lock().unwrap();
            let _guard = self
                .inner
                .notify
                .wait_while(guard, |state| state.1 == seen)
                .unwrap();
        }
    }

    /// Creates a channel holding `initial`.
    ///
    /// The returned receiver treats `initial` as already seen, so
    /// `has_changed` is `false` until the first send.
    pub fn channel<T>(initial: T) -> (Sender<T>, Receiver<T>) {
        let inner = Arc::new(Inner {
            value: Mutex::new((initial, 0)),
            notify: Condvar::new(),
        });
        (
            Sender {
                inner: Arc::clone(&inner),
            },
            Receiver {
                inner,
                seen_version: 0,
            },
        )
    }
}