use futures::channel::mpsc;
use futures::StreamExt;
use pocx_protocol::{JsonRpcClient, ProtocolError, SubmitNonceParams, SubmitNonceResult};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;
/// A nonce submission that passed the best-quality filter and is waiting in
/// the channel to be sent upstream by the handler task.
#[derive(Debug, Clone)]
struct GlobalQueuedSubmission {
    /// The submission parameters, forwarded unchanged to the upstream client.
    params: SubmitNonceParams,
}
/// Filtering queue in front of the upstream submit call.
///
/// Clones share the same underlying state: the tracked data sits behind
/// `Arc`s and the sender half of the channel is itself cloneable.
#[derive(Clone)]
pub struct GlobalBestQueue {
    /// Tracked block hashes in insertion order; the front entry is the next
    /// one to be evicted.
    block_hashes: Arc<RwLock<VecDeque<String>>>,
    /// Best (lowest) `raw_quality` accepted so far, keyed by block hash.
    best_qualities: Arc<RwLock<HashMap<String, u64>>>,
    /// Sender side of the channel drained by the background handler task.
    tx_submit: mpsc::UnboundedSender<GlobalQueuedSubmission>,
}
impl GlobalBestQueue {
    /// Creates a queue and spawns the background task that forwards accepted
    /// submissions to `client`.
    ///
    /// # Panics
    ///
    /// The handler is started with `tokio::task::spawn`, which panics when
    /// called outside of a Tokio runtime, so this must be called from within
    /// one.
    pub fn new(client: JsonRpcClient) -> Self {
        let (tx_submit, rx_submit) = mpsc::unbounded();
        let block_hashes = Arc::new(RwLock::new(VecDeque::new()));
        let best_qualities = Arc::new(RwLock::new(HashMap::new()));
        Self::start_handler(client, rx_submit);
        Self {
            block_hashes,
            best_qualities,
            tx_submit,
        }
    }

    /// Returns at most the first 16 bytes of `hash` for use in log lines.
    ///
    /// Uses checked slicing so that a hash shorter than 16 bytes (or one where
    /// byte 16 is not a character boundary) is logged in full instead of
    /// panicking.
    fn hash_prefix(hash: &str) -> &str {
        hash.get(..16).unwrap_or(hash)
    }

    /// Queues `params` for upstream submission if it improves on the best
    /// quality seen so far for its block hash.
    ///
    /// A submission for an already-tracked block hash is accepted only when
    /// its `raw_quality` is strictly lower than the recorded best. A
    /// submission for an untracked block hash is always accepted; if the
    /// tracker is already full, the oldest tracked hash is evicted first.
    ///
    /// Returns `true` when the submission was handed to the handler channel,
    /// and `false` when it was filtered out or the channel is closed.
    pub async fn submit(&self, params: SubmitNonceParams) -> bool {
        /// Maximum number of block hashes tracked at the same time.
        const MAX_BLOCKS: usize = 3;

        let block_hash = &params.block_hash;
        let raw_quality = params.raw_quality;

        // Both guards are held until the end of the function so the
        // check-and-update below is atomic with respect to other callers.
        let mut hashes = self.block_hashes.write().await;
        let mut qualities = self.best_qualities.write().await;

        if let Some(&best_quality) = qualities.get(block_hash) {
            if raw_quality >= best_quality {
                log::debug!(
                    "Filtered submission: raw_quality {} not better than best {} for block {}",
                    raw_quality,
                    best_quality,
                    Self::hash_prefix(block_hash)
                );
                return false;
            }
            qualities.insert(block_hash.clone(), raw_quality);
        } else {
            // New block hash: make room first so that no more than
            // MAX_BLOCKS hashes are ever tracked.
            if hashes.len() >= MAX_BLOCKS {
                if let Some(oldest_hash) = hashes.pop_front() {
                    qualities.remove(&oldest_hash);
                    log::debug!(
                        "Evicted oldest block hash: {}",
                        Self::hash_prefix(&oldest_hash)
                    );
                }
            }
            hashes.push_back(block_hash.clone());
            qualities.insert(block_hash.clone(), raw_quality);
        }

        let submission = GlobalQueuedSubmission { params };
        self.tx_submit.unbounded_send(submission).is_ok()
    }

    /// Spawns the task that drains `rx` and sends each queued submission
    /// upstream through `client`, logging the outcome of every call.
    ///
    /// Submissions are processed one at a time in the order received. The
    /// loop ends when `rx` yields `None`.
    fn start_handler(
        client: JsonRpcClient,
        mut rx: mpsc::UnboundedReceiver<GlobalQueuedSubmission>,
    ) {
        tokio::task::spawn(async move {
            while let Some(submission) = rx.next().await {
                // The parameters are cloned because they are still needed
                // for logging after the call.
                match client.submit_nonce(submission.params.clone()).await {
                    Ok(result) => {
                        log_submission_accepted(&submission.params, &result);
                    }
                    Err(ProtocolError::RateLimited) => {
                        log_server_busy(&submission.params);
                    }
                    Err(ProtocolError::InvalidSubmission(msg)) => {
                        log_submission_not_accepted(
                            &submission.params,
                            &format!("Rejected: {}", msg),
                        );
                    }
                    Err(ProtocolError::StaleSubmission) => {
                        log_submission_not_accepted(&submission.params, "Stale submission");
                    }
                    Err(e) => {
                        log_submission_failed(&submission.params, &e.to_string());
                    }
                }
            }
        });
    }
}
/// Logs, at info level, a submission for which the upstream call returned `Ok`.
///
/// Only the trailing 8 bytes of the account id are printed. The quality and
/// PoC time are taken from the upstream `result`, not from `params`.
fn log_submission_accepted(params: &SubmitNonceParams, result: &SubmitNonceResult) {
    let tail_start = params.account_id.len().saturating_sub(8);
    // Checked slicing: if `tail_start` is not on a character boundary, log
    // the whole id instead of panicking.
    let account_tail = params
        .account_id
        .get(tail_start..)
        .unwrap_or(&params.account_id[..]);
    log::info!(
        "Submitted: height={}, account=...{}, raw_quality={}, poc_time={}s",
        params.height,
        account_tail,
        result.raw_quality,
        result.poc_time
    );
}
/// Emits a warning that an upstream submission failed, including the
/// submission's height, account id and nonce together with the error text.
fn log_submission_failed(params: &SubmitNonceParams, err: &str) {
    let (height, account, nonce) = (&params.height, &params.account_id, &params.nonce);
    log::warn!(
        "Upstream submission failed (global best): height={}, account={}, nonce={}, error={}",
        height,
        account,
        nonce,
        err
    );
}
/// Emits an error-level record that upstream did not accept a submission,
/// including the submission's height, account id and nonce together with
/// the explanatory `msg`.
fn log_submission_not_accepted(params: &SubmitNonceParams, msg: &str) {
    let (height, account, nonce) = (&params.height, &params.account_id, &params.nonce);
    log::error!(
        "Upstream rejected submission (global best): height={}, account={}, nonce={}, message={}",
        height,
        account,
        nonce,
        msg
    );
}
/// Emits an info-level record that the upstream server reported itself busy
/// for the given submission (height, account id and nonce).
fn log_server_busy(params: &SubmitNonceParams) {
    let (height, account, nonce) = (&params.height, &params.account_id, &params.nonce);
    log::info!(
        "Upstream server busy (global best): height={}, account={}, nonce={}",
        height,
        account,
        nonce
    );
}