use crate::tools::pow::pow_measure_from_data_hash;
use crate::tools::pow_required_estimator::PowRequiredEstimator;
use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
use crate::tools::types::{Hash, Pow, Salt};
use futures::stream::{FuturesUnordered, StreamExt};
use log::trace;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
pub struct PowJobStatus {
pub label: String,
pub pow_min: Pow,
pub best_pow_so_far: Pow,
}
type JobId = u64;
struct JobEntry {
label: String,
pow_min: Pow,
best_pow_so_far: Pow,
}
#[derive(Default)]
pub struct JobTracker {
next_id: JobId,
jobs: HashMap<JobId, JobEntry>,
last_work_time_millis: i64,
}
impl JobTracker {
pub fn add(&mut self, label: &str, pow_min: Pow) -> JobId {
let job_id = self.next_id;
self.next_id += 1;
self.jobs.insert(job_id, JobEntry { label: label.to_string(), pow_min, best_pow_so_far: Pow(0) });
job_id
}
pub fn update(&mut self, job_id: JobId, best_pow_so_far: Pow) {
if let Some(entry) = self.jobs.get_mut(&job_id) {
entry.best_pow_so_far = best_pow_so_far;
}
}
pub fn remove(&mut self, job_id: JobId) {
self.jobs.remove(&job_id);
}
pub fn mark_work(&mut self, now_millis: i64) {
self.last_work_time_millis = now_millis;
}
pub fn is_busy(&self, now_millis: i64, within_millis: i64) -> bool {
!self.jobs.is_empty() || (self.last_work_time_millis != 0 && now_millis - self.last_work_time_millis <= within_millis)
}
pub fn snapshot(&self) -> Vec<PowJobStatus> {
self.jobs.values().map(|entry| PowJobStatus {
label: entry.label.clone(),
pow_min: entry.pow_min,
best_pow_so_far: entry.best_pow_so_far,
}).collect()
}
}
struct TrackedJobGuard {
tracker: Arc<Mutex<JobTracker>>,
job_id: JobId,
}
impl TrackedJobGuard {
fn new(tracker: Arc<Mutex<JobTracker>>, label: &str, pow_min: Pow) -> Self {
let now = RealTimeProvider.current_time_millis().0;
let job_id = {
let mut tracker = tracker.lock().unwrap();
let job_id = tracker.add(label, pow_min);
tracker.mark_work(now);
job_id
};
Self { tracker, job_id }
}
fn update(&self, best_pow_so_far: Pow) {
self.tracker.lock().unwrap().update(self.job_id, best_pow_so_far);
}
}
impl Drop for TrackedJobGuard {
fn drop(&mut self) {
let now = RealTimeProvider.current_time_millis().0;
let mut tracker = self.tracker.lock().unwrap();
tracker.remove(self.job_id);
tracker.mark_work(now);
}
}
#[async_trait::async_trait]
pub trait PowGenerator: Send + Sync {
fn pool_size(&self) -> usize;
async fn run_chunk(&self, slot: usize, chunk_iterations: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)>;
fn tracker(&self) -> &Arc<Mutex<JobTracker>>;
fn active_jobs(&self) -> Vec<PowJobStatus> {
self.tracker().lock().unwrap().snapshot()
}
fn is_pow_busy(&self, within_millis: i64) -> bool {
let now = RealTimeProvider.current_time_millis().0;
self.tracker().lock().unwrap().is_busy(now, within_millis)
}
async fn generate_best_effort(&self, label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
run_pool(self, label, Some(iteration_limit), pow_min, data_hash).await
}
async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
run_pool(self, label, None, pow_min, data_hash).await
}
}
const CHUNK_ITERATIONS: usize = 4 * 1024;
type SlotFuture<'a> = Pin<Box<dyn Future<Output = (usize, usize, anyhow::Result<(Salt, Pow, Hash)>)> + Send + 'a>>;
pub async fn run_pool<'a, G: PowGenerator + ?Sized>(
generator: &'a G,
label: &'a str,
iteration_cap: Option<usize>,
pow_min: Pow,
data_hash: Hash,
) -> anyhow::Result<(Salt, Pow, Hash)> {
let tracker = generator.tracker().clone();
let guard = TrackedJobGuard::new(tracker, label, pow_min);
let real_time_provider = RealTimeProvider;
let mut estimator = PowRequiredEstimator::new(real_time_provider.current_time_millis(), label, pow_min);
let pool_size = generator.pool_size().max(1);
let mut remaining_iterations: Option<usize> = iteration_cap;
let mut best = {
let seed_salt = Salt::random();
let (seed_pow, seed_hash) = pow_measure_from_data_hash(&data_hash, &seed_salt)?;
(seed_salt, seed_pow, seed_hash)
};
guard.update(best.1);
if best.1 >= pow_min {
return Ok(best);
}
let mut in_flight: FuturesUnordered<SlotFuture<'a>> = FuturesUnordered::new();
for slot in 0..pool_size {
let chunk_size = pick_next_chunk_size(&mut remaining_iterations);
if chunk_size == 0 {
break;
}
in_flight.push(Box::pin(async move {
let chunk_result = generator.run_chunk(slot, chunk_size, pow_min, data_hash).await;
(slot, chunk_size, chunk_result)
}));
}
while let Some((slot, chunk_size, chunk_result)) = in_flight.next().await {
let chunk_best = chunk_result?;
if chunk_best.1 > best.1 {
best = chunk_best;
guard.update(best.1);
}
if best.1 >= pow_min {
return Ok(best);
}
if let Some(progress) = estimator.record_batch_and_estimate(real_time_provider.current_time_millis(), chunk_size, best.1) {
trace!("{}", progress);
}
let next_chunk_size = pick_next_chunk_size(&mut remaining_iterations);
if next_chunk_size == 0 {
continue;
}
in_flight.push(Box::pin(async move {
let chunk_result = generator.run_chunk(slot, next_chunk_size, pow_min, data_hash).await;
(slot, next_chunk_size, chunk_result)
}));
}
Ok(best)
}
fn pick_next_chunk_size(remaining_iterations: &mut Option<usize>) -> usize {
match remaining_iterations {
Some(0) => 0,
Some(remaining) => {
let chunk_size = (*remaining).min(CHUNK_ITERATIONS);
*remaining -= chunk_size;
chunk_size
}
None => CHUNK_ITERATIONS,
}
}
pub fn run_pool_chunk(chunk_iterations: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
let mut best = {
let salt = Salt::random();
let (pow, hash) = pow_measure_from_data_hash(&data_hash, &salt)?;
(salt, pow, hash)
};
if best.1 >= pow_min {
return Ok(best);
}
for _ in 1..chunk_iterations {
let salt = Salt::random();
let (pow, hash) = pow_measure_from_data_hash(&data_hash, &salt)?;
if pow > best.1 {
best = (salt, pow, hash);
if best.1 >= pow_min {
return Ok(best);
}
}
}
Ok(best)
}
#[cfg(test)]
mod tests {
use crate::tools::pow_generator::pow_generator::{JobTracker, TrackedJobGuard};
use crate::tools::types::Pow;
use std::sync::{Arc, Mutex};
#[test]
fn job_tracker_round_trip() {
let mut tracker = JobTracker::default();
assert!(tracker.snapshot().is_empty());
let job_a = tracker.add("rpc", Pow(18));
let job_b = tracker.add("post", Pow(22));
tracker.update(job_a, Pow(7));
tracker.update(job_b, Pow(13));
tracker.update(99999, Pow(255));
let mut snapshot = tracker.snapshot();
snapshot.sort_by(|a, b| a.label.cmp(&b.label));
assert_eq!(snapshot.len(), 2);
assert_eq!(snapshot[0].label, "post");
assert_eq!(snapshot[0].pow_min, Pow(22));
assert_eq!(snapshot[0].best_pow_so_far, Pow(13));
assert_eq!(snapshot[1].label, "rpc");
assert_eq!(snapshot[1].pow_min, Pow(18));
assert_eq!(snapshot[1].best_pow_so_far, Pow(7));
tracker.remove(job_a);
let remaining = tracker.snapshot();
assert_eq!(remaining.len(), 1);
assert_eq!(remaining[0].label, "post");
tracker.remove(job_b);
assert!(tracker.snapshot().is_empty());
}
#[test]
fn tracked_job_guard_removes_on_drop() {
let tracker = Arc::new(Mutex::new(JobTracker::default()));
{
let _guard = TrackedJobGuard::new(tracker.clone(), "rpc", Pow(18));
assert_eq!(tracker.lock().unwrap().snapshot().len(), 1);
}
assert!(tracker.lock().unwrap().snapshot().is_empty());
}
#[test]
fn tracked_job_guard_update_writes_through() {
let tracker = Arc::new(Mutex::new(JobTracker::default()));
let guard = TrackedJobGuard::new(tracker.clone(), "rpc", Pow(18));
guard.update(Pow(42));
let snapshot = tracker.lock().unwrap().snapshot();
assert_eq!(snapshot.len(), 1);
assert_eq!(snapshot[0].label, "rpc");
assert_eq!(snapshot[0].pow_min, Pow(18));
assert_eq!(snapshot[0].best_pow_so_far, Pow(42));
}
#[test]
fn is_busy_tracks_active_jobs_and_recent_work_window() {
let mut tracker = JobTracker::default();
assert!(!tracker.is_busy(1_000_000, 1000));
let job = tracker.add("rpc", Pow(18));
tracker.mark_work(1_000_000);
assert!(tracker.is_busy(1_000_000, 1000));
assert!(tracker.is_busy(9_999_999, 1000));
tracker.remove(job);
tracker.mark_work(2_000_000);
assert!(tracker.is_busy(2_000_500, 1000)); assert!(tracker.is_busy(2_001_000, 1000)); assert!(!tracker.is_busy(2_001_500, 1000)); }
#[test]
fn tracked_job_guard_marks_work_via_real_clock() {
use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
let tracker = Arc::new(Mutex::new(JobTracker::default()));
{
let _guard = TrackedJobGuard::new(tracker.clone(), "rpc", Pow(18));
assert!(tracker.lock().unwrap().is_busy(0, 0));
}
assert!(tracker.lock().unwrap().snapshot().is_empty());
let now = RealTimeProvider.current_time_millis().0;
assert!(tracker.lock().unwrap().is_busy(now, 1000));
assert!(!tracker.lock().unwrap().is_busy(now + 60_000, 1000));
}
#[tokio::test]
async fn run_pool_returns_consistent_sample_when_iteration_limit_is_zero() {
use crate::tools::pow::{pow_compute_data_hash, pow_measure_from_data_hash};
use crate::tools::pow_generator::pow_generator::PowGenerator;
use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
use crate::tools::types::Pow;
let data_hash = pow_compute_data_hash(&[b"zero-budget"]);
let generator = SingleThreadedPowGenerator::new();
let (salt, achieved_pow, _) = generator.generate_best_effort("zero", 0, Pow(255), data_hash).await.unwrap();
let (recomputed_pow, _) = pow_measure_from_data_hash(&data_hash, &salt).unwrap();
assert_eq!(recomputed_pow, achieved_pow);
}
#[tokio::test]
async fn run_pool_tracker_clears_after_completion() {
use crate::tools::pow::pow_compute_data_hash;
use crate::tools::pow_generator::pow_generator::PowGenerator;
use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
use crate::tools::types::Pow;
let data_hash = pow_compute_data_hash(&[b"tracker-cleanup"]);
let generator = SingleThreadedPowGenerator::new();
let _ = generator.generate("clean", Pow(0), data_hash).await.unwrap();
assert!(generator.active_jobs().is_empty());
}
#[tokio::test]
async fn run_pool_returns_as_soon_as_pow_min_is_met() {
use crate::tools::pow::pow_compute_data_hash;
use crate::tools::pow_generator::pow_generator::PowGenerator;
use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
use crate::tools::types::Pow;
const POW_MIN: Pow = Pow(8);
let data_hash = pow_compute_data_hash(&[b"early-exit"]);
let generator = SingleThreadedPowGenerator::new();
let (_, achieved_pow, _) = generator.generate("early", POW_MIN, data_hash).await.unwrap();
assert!(achieved_pow >= POW_MIN);
}
#[tokio::test]
async fn run_pool_pow_min_zero_returns_consistent_salt_and_pow() {
use crate::tools::pow::{pow_compute_data_hash, pow_measure_from_data_hash};
use crate::tools::pow_generator::pow_generator::PowGenerator;
use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
use crate::tools::types::Pow;
let generator = SingleThreadedPowGenerator::new();
for trial in 0u32..256 {
let data_hash = pow_compute_data_hash(&[&trial.to_le_bytes()]);
let (salt, achieved_pow, _) = generator.generate_best_effort("regression", 1, Pow(0), data_hash).await.unwrap();
let (recomputed_pow, _) = pow_measure_from_data_hash(&data_hash, &salt).unwrap();
assert_eq!(recomputed_pow, achieved_pow, "trial {}: salt and pow drifted apart", trial);
}
}
#[tokio::test]
async fn run_pool_returns_consistent_sample_when_budget_exhausted() {
use crate::tools::pow::{pow_compute_data_hash, pow_measure_from_data_hash};
use crate::tools::pow_generator::pow_generator::PowGenerator;
use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
use crate::tools::types::Pow;
let generator = SingleThreadedPowGenerator::new();
for trial in 0u32..256 {
let data_hash = pow_compute_data_hash(&[b"exhaust", &trial.to_le_bytes()]);
let (salt, achieved_pow, _) = generator.generate_best_effort("exhaust", 1, Pow(255), data_hash).await.unwrap();
let (recomputed_pow, _) = pow_measure_from_data_hash(&data_hash, &salt).unwrap();
assert_eq!(recomputed_pow, achieved_pow, "trial {}: returned salt does not produce returned pow", trial);
}
}
}