use crate::config::{SubmissionMode, UpstreamConfig};
use crate::error::{Error, Result};
use crate::queue::{GlobalBestQueue, SubmissionQueue};
use log::{debug, error, info};
use pocx_protocol::{JsonRpcClient, MiningInfo, SubmitNonceParams, SubmitNonceResult};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tokio::time::{Duration, Instant};
#[derive(Clone)]
enum SubmissionHandler {
Pool(SubmissionQueue),
Wallet(GlobalBestQueue),
}
impl SubmissionHandler {
async fn submit(&self, params: SubmitNonceParams) -> bool {
match self {
Self::Pool(queue) => queue.submit(params).await,
Self::Wallet(queue) => queue.submit(params).await,
}
}
}
#[derive(Clone)]
pub struct PoolManager {
upstream_name: String,
client: JsonRpcClient,
current_mining_info: Arc<RwLock<Option<CachedMiningInfo>>>,
cache_ttl: Duration,
fetch_lock: Arc<Mutex<()>>, submission_handler: SubmissionHandler,
block_time_secs: u64,
}
struct CachedMiningInfo {
info: MiningInfo,
cached_at: Instant,
}
impl PoolManager {
pub fn new(upstream: &UpstreamConfig, cache_ttl_secs: u64, timeout_secs: u64) -> Result<Self> {
let timeout = Duration::from_secs(timeout_secs);
let url = upstream
.build_url()
.ok_or_else(|| Error::Config("HTTP/HTTPS transport requires valid URL".to_string()))?;
info!(
"Connecting to upstream '{}' via HTTP: {}",
upstream.name, url
);
let mut client = JsonRpcClient::new(&url)
.map_err(|e| {
Error::Pool(format!(
"Failed to create HTTP client for {}: {}",
upstream.name, e
))
})?
.with_timeout(timeout);
if let Some(token) = upstream.get_auth_token_or_exit().map_err(Error::Config)? {
client = client.with_auth_token(token);
}
let submission_handler = match upstream.submission_mode {
SubmissionMode::Pool => {
info!("Using Pool submission mode (per-account best tracking)");
SubmissionHandler::Pool(SubmissionQueue::new(client.clone()))
}
SubmissionMode::Wallet => {
info!("Using Wallet submission mode (global best tracking)");
SubmissionHandler::Wallet(GlobalBestQueue::new(client.clone()))
}
};
Ok(Self {
upstream_name: upstream.name.clone(),
client,
current_mining_info: Arc::new(RwLock::new(None)),
cache_ttl: Duration::from_secs(cache_ttl_secs),
fetch_lock: Arc::new(Mutex::new(())),
submission_handler,
block_time_secs: upstream.block_time_secs,
})
}
pub async fn get_mining_info(&self) -> Result<MiningInfo> {
{
let cached = self.current_mining_info.read().await;
if let Some(cached_info) = cached.as_ref() {
if cached_info.cached_at.elapsed() < self.cache_ttl {
debug!(
"Returning cached mining info (age: {:?})",
cached_info.cached_at.elapsed()
);
return Ok(cached_info.info.clone());
}
}
}
let _fetch_guard = self.fetch_lock.lock().await;
{
let cached = self.current_mining_info.read().await;
if let Some(cached_info) = cached.as_ref() {
if cached_info.cached_at.elapsed() < self.cache_ttl {
debug!("Cache updated by another task, returning cached info");
return Ok(cached_info.info.clone());
}
}
}
let info = self.fetch_mining_info_from_upstream().await?;
{
let mut cached = self.current_mining_info.write().await;
let signature_changed = cached
.as_ref()
.map(|c| c.info.generation_signature != info.generation_signature)
.unwrap_or(true);
if signature_changed {
info!(
"New block detected: height={}, gen_sig={}",
info.height,
&info.generation_signature[..16]
);
}
*cached = Some(CachedMiningInfo {
info: info.clone(),
cached_at: Instant::now(),
});
}
Ok(info)
}
pub async fn submit_nonce(&self, params: SubmitNonceParams) -> Result<SubmitNonceResult> {
self.submission_handler.submit(params.clone()).await;
let poc_time = crate::time_bending::calculate_time_bended_deadline(
params.raw_quality,
params.base_target,
self.block_time_secs,
);
Ok(SubmitNonceResult::new(params.raw_quality, poc_time))
}
async fn fetch_mining_info_from_upstream(&self) -> Result<MiningInfo> {
debug!("Fetching mining info from upstream: {}", self.upstream_name);
match self.client.get_mining_info().await {
Ok(info) => {
debug!(
"Got mining info from '{}': height={}, base_target={}",
self.upstream_name, info.height, info.base_target
);
Ok(info)
}
Err(e) => {
error!(
"Failed to get mining info from '{}': {}",
self.upstream_name, e
);
Err(Error::Protocol(e))
}
}
}
}