use crate::tools::pow_required_estimator::PowRequiredEstimator;
use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
use crate::tools::tools;
use crate::tools::types::{Hash, Pow, Salt};
use log::trace;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
/// Point-in-time status of one tracked PoW job, as returned by
/// `JobTracker::snapshot`.
pub struct PowJobStatus {
/// Label supplied when the job was registered.
pub label: String,
/// The `pow_min` value supplied when the job was registered.
pub pow_min: Pow,
/// Best value reported for the job so far; `Pow(0)` until something is reported.
pub best_pow_so_far: Pow,
}
/// Identifier that `JobTracker::add` assigns to each registered job.
type JobId = u64;
/// Internal per-job record stored by `JobTracker`; copied into a
/// `PowJobStatus` when a snapshot is taken.
struct JobEntry {
// Label supplied when the job was added.
label: String,
// The `pow_min` value supplied when the job was added.
pow_min: Pow,
// Progress value reported for the job; initialised to `Pow(0)` on add.
best_pow_so_far: Pow,
}
/// Registry of currently tracked PoW jobs, keyed by a numeric job id.
///
/// The derived `Default` yields an empty registry whose first id is 0.
#[derive(Default)]
pub struct JobTracker {
// Id that will be handed to the next job passed to `add`.
next_id: JobId,
// All jobs that have been added and not yet removed.
jobs: HashMap<JobId, JobEntry>,
}
impl JobTracker {
    /// Registers a new job and returns the id that identifies it in later
    /// `update` / `remove` calls.
    ///
    /// The job starts with `best_pow_so_far` set to `Pow(0)`. Ids are handed
    /// out sequentially and are not reused after `remove`.
    pub fn add(&mut self, label: &str, pow_min: Pow) -> JobId {
        let job_id = self.next_id;
        self.next_id += 1;
        self.jobs.insert(
            job_id,
            JobEntry { label: label.to_string(), pow_min, best_pow_so_far: Pow(0) },
        );
        job_id
    }

    /// Reports a proof-of-work value reached by job `job_id`.
    ///
    /// The stored value only ever increases: callers may report the result
    /// of each individual batch, and a weaker batch must not make the
    /// tracked "best so far" go backwards. Unknown ids are ignored.
    pub fn update(&mut self, job_id: JobId, best_pow_so_far: Pow) {
        if let Some(entry) = self.jobs.get_mut(&job_id) {
            if best_pow_so_far > entry.best_pow_so_far {
                entry.best_pow_so_far = best_pow_so_far;
            }
        }
    }

    /// Stops tracking job `job_id`. Removing an unknown id is a no-op.
    pub fn remove(&mut self, job_id: JobId) {
        self.jobs.remove(&job_id);
    }

    /// Returns a copy of the status of every tracked job.
    ///
    /// The order of the returned entries follows `HashMap` iteration order
    /// and is therefore unspecified.
    pub fn snapshot(&self) -> Vec<PowJobStatus> {
        self.jobs
            .values()
            .map(|entry| PowJobStatus {
                label: entry.label.clone(),
                pow_min: entry.pow_min,
                best_pow_so_far: entry.best_pow_so_far,
            })
            .collect()
    }
}
/// Handle for one job registered in a shared `JobTracker`.
///
/// The `Drop` implementation for this type removes the job from the tracker.
struct TrackedJobGuard {
// Shared tracker the job was registered with.
tracker: Arc<Mutex<JobTracker>>,
// Id returned by `JobTracker::add` for this job.
job_id: JobId,
}
impl TrackedJobGuard {
    /// Registers a job with `label` and `pow_min` in `tracker` and returns a
    /// guard that remembers the id the tracker assigned.
    ///
    /// # Panics
    /// Panics if the tracker mutex is poisoned.
    fn new(tracker: Arc<Mutex<JobTracker>>, label: &str, pow_min: Pow) -> Self {
        // Keep the lock scoped to the registration so it is released before
        // `tracker` is moved into the guard.
        let job_id = {
            let mut jobs = tracker.lock().unwrap();
            jobs.add(label, pow_min)
        };
        Self { tracker, job_id }
    }

    /// Forwards `best_pow_so_far` to the tracker entry owned by this guard.
    ///
    /// # Panics
    /// Panics if the tracker mutex is poisoned.
    fn update(&self, best_pow_so_far: Pow) {
        let mut jobs = self.tracker.lock().unwrap();
        jobs.update(self.job_id, best_pow_so_far);
    }
}
impl Drop for TrackedJobGuard {
    /// Removes this guard's job from the tracker.
    ///
    /// Never panics on a poisoned mutex: a panic raised from `drop` while the
    /// thread is already unwinding aborts the process, and skipping the
    /// removal would leave a stale job in the tracker forever. The poisoned
    /// lock is therefore recovered and the job is removed regardless.
    fn drop(&mut self) {
        let mut tracker = match self.tracker.lock() {
            Ok(tracker) => tracker,
            Err(poisoned) => poisoned.into_inner(),
        };
        tracker.remove(self.job_id);
    }
}
#[async_trait::async_trait]
pub trait PowGenerator: Send + Sync {
async fn generate_best_effort(&self, label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)>;
async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)>;
fn active_jobs(&self) -> Vec<PowJobStatus>;
fn tracker(&self) -> &Arc<Mutex<JobTracker>>;
async fn generate_best_effort_tracked(&self, label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
let _guard = TrackedJobGuard::new(self.tracker().clone(), label, pow_min);
self.generate_best_effort(label, iteration_limit, pow_min, data_hash).await
}
}
/// Repeatedly asks `generator` for best-effort batches until one of them
/// reaches `pow_min`, and returns that result.
///
/// While the loop runs, the job is registered in `tracker` and updated
/// after every unsuccessful batch; a progress estimate is logged at trace
/// level and the task yields between batches.
///
/// # Errors
/// Propagates any error returned by `generate_best_effort`.
pub async fn generate_loop(
    generator: &(dyn PowGenerator + '_),
    tracker: &Arc<Mutex<JobTracker>>,
    label: &str,
    pow_min: Pow,
    data_hash: Hash,
) -> anyhow::Result<(Salt, Pow, Hash)> {
    /// Iteration limit handed to the generator for each batch.
    const BATCH_SIZE: usize = 64 * 1024;

    let clock = RealTimeProvider::default();
    let mut estimator = PowRequiredEstimator::new(clock.current_time_millis(), label, pow_min);
    // Unregisters the job on every exit path, including `?` and the early return.
    let job = TrackedJobGuard::new(Arc::clone(tracker), label, pow_min);

    loop {
        let (salt, batch_pow, hash) =
            generator.generate_best_effort(label, BATCH_SIZE, pow_min, data_hash).await?;
        if batch_pow >= pow_min {
            return Ok((salt, batch_pow, hash));
        }

        job.update(batch_pow);
        let now = clock.current_time_millis();
        let progress = estimator.record_batch_and_estimate(now, BATCH_SIZE, batch_pow);
        trace!("{}", progress);
        tools::yield_now().await;
    }
}
#[cfg(test)]
mod tests {
    use crate::tools::pow_generator::pow_generator::{JobTracker, TrackedJobGuard};
    use crate::tools::types::Pow;
    use std::sync::{Arc, Mutex};

    /// Adds two jobs, updates them (plus one unknown id), and removes them
    /// again, checking the snapshot after each stage.
    #[test]
    fn job_tracker_round_trip() {
        let mut tracker = JobTracker::default();
        assert_eq!(tracker.snapshot().len(), 0);

        let rpc_id = tracker.add("rpc", Pow(18));
        let post_id = tracker.add("post", Pow(22));
        tracker.update(rpc_id, Pow(7));
        tracker.update(post_id, Pow(13));
        // An id that was never handed out must be ignored.
        tracker.update(99999, Pow(255));

        // Snapshot order is unspecified, so sort by label before comparing.
        let mut statuses = tracker.snapshot();
        statuses.sort_by(|lhs, rhs| lhs.label.cmp(&rhs.label));
        assert_eq!(statuses.len(), 2);
        let (post, rpc) = (&statuses[0], &statuses[1]);
        assert_eq!(post.label, "post");
        assert_eq!(post.pow_min, Pow(22));
        assert_eq!(post.best_pow_so_far, Pow(13));
        assert_eq!(rpc.label, "rpc");
        assert_eq!(rpc.pow_min, Pow(18));
        assert_eq!(rpc.best_pow_so_far, Pow(7));

        tracker.remove(rpc_id);
        let after_first_removal = tracker.snapshot();
        assert_eq!(after_first_removal.len(), 1);
        assert_eq!(after_first_removal[0].label, "post");

        tracker.remove(post_id);
        assert_eq!(tracker.snapshot().len(), 0);
    }

    /// Dropping the guard must unregister its job.
    #[test]
    fn tracked_job_guard_removes_on_drop() {
        let shared = Arc::new(Mutex::new(JobTracker::default()));
        let guard = TrackedJobGuard::new(Arc::clone(&shared), "rpc", Pow(18));
        assert_eq!(shared.lock().unwrap().snapshot().len(), 1);
        drop(guard);
        assert!(shared.lock().unwrap().snapshot().is_empty());
    }

    /// An update made through the guard must be visible in the tracker.
    #[test]
    fn tracked_job_guard_update_writes_through() {
        let shared = Arc::new(Mutex::new(JobTracker::default()));
        let guard = TrackedJobGuard::new(Arc::clone(&shared), "rpc", Pow(18));
        guard.update(Pow(42));

        let statuses = shared.lock().unwrap().snapshot();
        assert_eq!(statuses.len(), 1);
        let only = &statuses[0];
        assert_eq!(only.label, "rpc");
        assert_eq!(only.pow_min, Pow(18));
        assert_eq!(only.best_pow_so_far, Pow(42));
    }
}