use ccp_msr::MSRModeEnforcer;
use ccp_randomx::cache::CacheHandle;
use ccp_randomx::dataset::DatasetHandle;
use ccp_randomx::Dataset;
use ccp_randomx::RandomXFlags;
use ccp_shared::types::*;
use ccp_utils::run_utils::run_unordered;
use cpu_utils::CPUTopology;
use super::config::CUProverConfig;
use super::proving_thread::ProvingThreadAsync;
use super::proving_thread::ProvingThreadFacade;
use super::proving_thread_utils::ThreadAllocator;
use super::status::CUStatus;
use super::status::ToCUStatus;
use super::CUResult;
use crate::utility_thread::message::ToUtilityInlet;
#[derive(Debug)]
pub struct CUProver {
threads: nonempty::NonEmpty<ProvingThreadAsync>,
pinned_core_id: PhysicalCoreId,
randomx_flags: RandomXFlags,
cpu_topology: CPUTopology,
dataset: Dataset,
status: CUStatus,
}
impl CUProver {
pub(crate) async fn create(
config: CUProverConfig,
to_utility: ToUtilityInlet,
msr_enforcer: MSRModeEnforcer,
core_id: PhysicalCoreId,
) -> CUResult<Self> {
let topology = CPUTopology::new()?;
let mut threads = ThreadAllocator::new(config.threads_per_core_policy, core_id, &topology)?
.allocate(msr_enforcer, to_utility)?;
let thread = &mut threads.head;
let dataset = thread.allocate_dataset(config.randomx_flags).await?;
let prover = Self {
threads,
pinned_core_id: core_id,
randomx_flags: config.randomx_flags,
cpu_topology: topology,
dataset,
status: CUStatus::Idle,
};
Ok(prover)
}
pub(crate) async fn new_epoch(&mut self, epoch: EpochParameters, cu_id: CUID) -> CUResult<()> {
self.pause().await?;
self.status = CUStatus::Running { cu_id };
let thread = &mut self.threads.head;
let randomx_flags = self.randomx_flags;
let cache = thread.create_cache(epoch, cu_id, randomx_flags).await?;
let dataset_handle = self.dataset.handle();
let cache_handle = cache.handle();
self.initialize_dataset(epoch, cache_handle, dataset_handle.clone())
.await?;
self.run_proving_jobs(epoch, dataset_handle, cu_id).await
}
#[allow(clippy::needless_lifetimes)]
pub(crate) async fn pin<'threads>(&'threads mut self, core_id: PhysicalCoreId) -> CUResult<()> {
use super::proving_thread_utils::RoundRobinDistributor;
use super::proving_thread_utils::ThreadDistributionPolicy;
use futures::FutureExt;
let logical_cores = self.cpu_topology.logical_cores_for_physical(core_id)?;
let distributor = RoundRobinDistributor {};
let closure = |thread_id: usize, thread: &'threads mut ProvingThreadAsync| {
let core_id = distributor.distribute(thread_id, &logical_cores);
thread.pin(core_id).boxed()
};
run_unordered(self.threads.iter_mut(), closure).await?;
Ok(())
}
#[allow(clippy::needless_lifetimes)]
pub(crate) async fn pause<'threads>(&'threads mut self) -> CUResult<()> {
use futures::FutureExt;
let closure = |_: usize, thread: &'threads mut ProvingThreadAsync| thread.pause().boxed();
run_unordered(self.threads.iter_mut(), closure).await?;
self.status = CUStatus::Idle;
Ok(())
}
pub(crate) async fn stop_nonblocking<'threads>(&'threads self) -> CUResult<()> {
use futures::FutureExt;
let closure =
|_: usize, thread: &'threads ProvingThreadAsync| thread.stop_nonblocking().boxed();
run_unordered(self.threads.iter(), closure).await?;
Ok(())
}
pub(crate) async fn join(self) -> CUResult<()> {
use futures::FutureExt;
let closure = |_: usize, thread: ProvingThreadAsync| thread.join().boxed();
run_unordered(self.threads.into_iter(), closure).await?;
Ok(())
}
pub(crate) async fn stop_join(self) -> CUResult<()> {
use futures::FutureExt;
let closure = |_: usize, thread: ProvingThreadAsync| thread.stop_join().boxed();
run_unordered(self.threads.into_iter(), closure).await?;
Ok(())
}
pub(crate) fn pinned_core_id(&self) -> PhysicalCoreId {
self.pinned_core_id
}
#[allow(clippy::needless_lifetimes)]
async fn initialize_dataset<'threads>(
&'threads mut self,
epoch: EpochParameters,
cache: CacheHandle,
dataset: DatasetHandle,
) -> CUResult<()> {
use futures::FutureExt;
let threads_number = self.threads.len() as u64;
let dataset_size = dataset.items_count();
let closure = |thread_id: usize, thread: &'threads mut ProvingThreadAsync| {
let thread_id = thread_id as u64;
let start_item = (dataset_size * thread_id) / threads_number;
let next_start_item = (dataset_size * (thread_id + 1)) / threads_number;
let items_count = next_start_item - start_item;
thread
.initialize_dataset(
epoch,
cache.clone(),
dataset.clone(),
start_item,
items_count,
)
.boxed()
};
run_unordered(self.threads.iter_mut(), closure).await?;
Ok(())
}
#[allow(clippy::needless_lifetimes)]
async fn run_proving_jobs<'threads>(
&'threads mut self,
epoch: EpochParameters,
dataset: DatasetHandle,
cu_id: CUID,
) -> CUResult<()> {
use futures::FutureExt;
let randomx_flags = self.randomx_flags;
let closure = |_: usize, thread: &'threads mut ProvingThreadAsync| {
thread
.run_cc_job(epoch, dataset.clone(), randomx_flags, cu_id)
.boxed()
};
run_unordered(self.threads.iter_mut(), closure).await?;
Ok(())
}
}
impl ToCUStatus for CUProver {
fn status(&self) -> CUStatus {
self.status
}
}