use futures::channel::mpsc;
use futures::StreamExt;
use pocx_protocol::{JsonRpcClient, ProtocolError, SubmitNonceParams, SubmitNonceResult};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::Instant;
/// A nonce submission that passed the per-account filter and is travelling
/// through the channel to the background handler task.
#[derive(Debug, Clone)]
pub struct QueuedSubmission {
    /// Parameters forwarded unchanged to the upstream `submit_nonce` call.
    pub params: SubmitNonceParams,
}
/// Handler-side bookkeeping wrapped around a [`QueuedSubmission`].
#[derive(Debug, Clone)]
struct TimestampedSubmission {
    /// The submission as it was received from the channel.
    submission: QueuedSubmission,
    /// Moment the handler placed the submission in its pending queue; used to
    /// detect entries that have waited too long.
    queued_at: Instant,
    /// Number of failed upstream attempts made so far for this submission.
    retry_count: u32,
}
/// Per-account record of the best (lowest) raw quality seen for each of the
/// most recently observed block hashes.
#[derive(Debug)]
struct AccountFilter {
    /// Tracked block hashes in arrival order; the front is the oldest and is
    /// the first to be evicted.
    block_hashes: VecDeque<String>,
    /// Best raw quality recorded so far, keyed by block hash.
    best_qualities: HashMap<String, u64>,
}
/// Front end of the submission pipeline: filters incoming nonces per account
/// and forwards the survivors to a background handler task.
///
/// Both fields are shared handles, so a clone refers to the same filter table
/// and feeds the same channel.
#[derive(Clone)]
pub struct SubmissionQueue {
    /// Best-quality filter state, keyed by account id.
    account_filters: Arc<RwLock<HashMap<String, AccountFilter>>>,
    /// Sending half of the unbounded channel drained by the handler task.
    tx_submit: mpsc::UnboundedSender<QueuedSubmission>,
}
impl SubmissionQueue {
/// Creates a queue with an empty filter table and starts the background
/// handler that submits queued nonces through `client`.
///
/// The handler is started with `tokio::task::spawn`, so this presumably has
/// to be called from inside a Tokio runtime.
pub fn new(client: JsonRpcClient) -> Self {
    let (sender, receiver) = mpsc::unbounded();
    // The handler owns the receiving half for the lifetime of the task.
    Self::start_handler(client, receiver);
    Self {
        account_filters: Arc::new(RwLock::new(HashMap::new())),
        tx_submit: sender,
    }
}
/// Filters a submission against the best quality already seen for the same
/// account and block hash, and queues it for the handler task if it is an
/// improvement.
///
/// A lower `raw_quality` is treated as better. For each account only the
/// `MAX_BLOCKS` most recently seen block hashes are tracked; when a new hash
/// arrives beyond that limit the oldest one is evicted.
///
/// Returns `true` when the submission was handed to the channel, and `false`
/// when it was filtered out or the channel rejected the send.
pub async fn submit(&self, params: SubmitNonceParams) -> bool {
    // Number of distinct block hashes remembered per account.
    const MAX_BLOCKS: usize = 3;

    // Log-friendly prefix: at most the first eight characters of `s`.
    // Unlike `&s[..8]`, this never panics on short input or when byte 8 is
    // not a character boundary.
    fn short(s: &str) -> &str {
        match s.char_indices().nth(8) {
            Some((end, _)) => &s[..end],
            None => s,
        }
    }

    let account_id = params.account_id.clone();
    let block_hash = params.block_hash.clone();
    let raw_quality = params.raw_quality;

    // The write guard stays alive until the function returns, so the filter
    // update and the channel send below happen as one step relative to
    // other callers.
    let mut filters = self.account_filters.write().await;
    let account_filter = filters
        .entry(account_id.clone())
        .or_insert_with(|| AccountFilter {
            block_hashes: VecDeque::new(),
            best_qualities: HashMap::new(),
        });

    match account_filter.best_qualities.get(&block_hash).copied() {
        // Known block, and the new quality is not an improvement.
        Some(best_quality) if raw_quality >= best_quality => {
            log::debug!(
                "Filtered submission: account={}, raw_quality {} not better than best {} for block {}",
                short(&account_id),
                raw_quality,
                best_quality,
                short(&block_hash)
            );
            return false;
        }
        // Known block with a strictly better quality: record the new best.
        Some(best_quality) => {
            account_filter
                .best_qualities
                .insert(block_hash.clone(), raw_quality);
            log::info!(
                "Updated best raw_quality for account={}, block {}: {} -> {}",
                short(&account_id),
                short(&block_hash),
                best_quality,
                raw_quality
            );
        }
        // First submission for this block hash: make room, then track it.
        None => {
            if account_filter.block_hashes.len() >= MAX_BLOCKS {
                if let Some(oldest_hash) = account_filter.block_hashes.pop_front() {
                    account_filter.best_qualities.remove(&oldest_hash);
                    log::debug!(
                        "Evicted oldest block hash for account={}: {}",
                        short(&account_id),
                        short(&oldest_hash)
                    );
                }
            }
            account_filter.block_hashes.push_back(block_hash.clone());
            account_filter
                .best_qualities
                .insert(block_hash.clone(), raw_quality);
            log::info!(
                "New block hash detected for account={}: {} with raw_quality {}",
                short(&account_id),
                short(&block_hash),
                raw_quality
            );
        }
    }

    let submission = QueuedSubmission { params };
    self.tx_submit.unbounded_send(submission).is_ok()
}
/// Spawns the background task that drains `rx` and submits each queued nonce
/// to the upstream through `client`.
///
/// Each loop iteration:
/// 1. Moves everything currently waiting in the channel into a FIFO pending
///    queue, dropping submissions that are not better (lower `raw_quality`)
///    than one already queued for the same (account, block hash).
/// 2. Pops the front item. It is dropped if it has waited longer than
///    `STALE_TIMEOUT` or has been superseded by a better queued item;
///    otherwise it is submitted upstream.
/// 3. On a rate-limit or any other non-terminal error the item is retried up
///    to `MAX_RETRIES` times with exponential backoff starting at
///    `BASE_DELAY_MS`. The task sleeps for the backoff, so no other item is
///    processed in the meantime.
/// 4. With nothing pending, waits for the next channel message. The task
///    exits once the channel has ended.
fn start_handler(client: JsonRpcClient, mut rx: mpsc::UnboundedReceiver<QueuedSubmission>) {
    // Log-friendly prefix: at most the first eight characters of `s`.
    // Unlike `&s[..8]`, this never panics on short input or when byte 8 is
    // not a character boundary. A panic in the spawned task would silently
    // stop all further submissions.
    fn short(s: &str) -> &str {
        match s.char_indices().nth(8) {
            Some((end, _)) => &s[..end],
            None => s,
        }
    }

    tokio::task::spawn(async move {
        const STALE_TIMEOUT: Duration = Duration::from_secs(240);
        const MAX_RETRIES: u32 = 5;
        const BASE_DELAY_MS: u64 = 1000;

        let mut pending_queue: VecDeque<TimestampedSubmission> = VecDeque::new();
        // Best raw_quality currently queued, keyed by (account id, block hash).
        let mut in_queue_best: HashMap<(String, String), u64> = HashMap::new();

        log::info!("SubmissionQueue handler task started");
        loop {
            let now = Instant::now();

            // Drain whatever is immediately available without waiting.
            while let Ok(new_submission) = rx.try_recv() {
                let key = (
                    new_submission.params.account_id.clone(),
                    new_submission.params.block_hash.clone(),
                );
                let quality = new_submission.params.raw_quality;
                let should_add = match in_queue_best.get(&key).copied() {
                    Some(best_in_queue) if quality >= best_in_queue => {
                        log::debug!(
                            "Dropping new submission: worse than queued ({} >= {})",
                            quality,
                            best_in_queue
                        );
                        false
                    }
                    // Either nothing is queued for this key or this one is better.
                    _ => {
                        in_queue_best.insert(key, quality);
                        true
                    }
                };
                if should_add {
                    pending_queue.push_back(TimestampedSubmission {
                        submission: new_submission,
                        queued_at: now,
                        retry_count: 0,
                    });
                }
            }

            if let Some(mut item) = pending_queue.pop_front() {
                let key = (
                    item.submission.params.account_id.clone(),
                    item.submission.params.block_hash.clone(),
                );
                let age = now.duration_since(item.queued_at);
                if age > STALE_TIMEOUT {
                    log::warn!(
                        "Dropping stale submission: account={}, block={}, age={}s",
                        short(&item.submission.params.account_id),
                        short(&item.submission.params.block_hash),
                        age.as_secs()
                    );
                    // Clear the dedup entry only when it describes this item.
                    // A fresher, better submission for the same key may still
                    // be queued and must keep its entry.
                    if in_queue_best.get(&key) == Some(&item.submission.params.raw_quality) {
                        in_queue_best.remove(&key);
                    }
                    continue;
                }
                if let Some(&best_in_queue) = in_queue_best.get(&key) {
                    if item.submission.params.raw_quality > best_in_queue {
                        // A better submission for the same key sits further
                        // back in the queue; skip this one.
                        log::debug!(
                            "Dropping queued item: better submission in queue ({} > {})",
                            item.submission.params.raw_quality,
                            best_in_queue
                        );
                        continue;
                    }
                }
                log::debug!(
                    "Processing submission: account={}, block={}, raw_quality={}, retry={}/{}",
                    short(&item.submission.params.account_id),
                    short(&item.submission.params.block_hash),
                    item.submission.params.raw_quality,
                    item.retry_count,
                    MAX_RETRIES
                );
                match client.submit_nonce(item.submission.params.clone()).await {
                    Ok(result) => {
                        log_submission_accepted(&item.submission.params, &result);
                        in_queue_best.remove(&key);
                    }
                    Err(ProtocolError::RateLimited) => {
                        log_server_busy(&item.submission.params);
                        item.retry_count += 1;
                        if item.retry_count > MAX_RETRIES {
                            log::warn!(
                                "Max retries exceeded: account={}, block={}",
                                short(&item.submission.params.account_id),
                                short(&item.submission.params.block_hash)
                            );
                            in_queue_best.remove(&key);
                        } else {
                            // Exponential backoff: 1s, 2s, 4s, ...
                            let delay_ms = BASE_DELAY_MS * 2u64.pow(item.retry_count - 1);
                            log::info!(
                                "Retrying in {}ms (attempt {}/{})",
                                delay_ms,
                                item.retry_count,
                                MAX_RETRIES
                            );
                            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
                            // Back to the front so it is the next item tried.
                            pending_queue.push_front(item);
                        }
                    }
                    Err(ProtocolError::InvalidSubmission(msg)) => {
                        log_submission_not_accepted(
                            &item.submission.params,
                            &format!("Rejected: {}", msg),
                        );
                        in_queue_best.remove(&key);
                    }
                    Err(ProtocolError::StaleSubmission) => {
                        log_submission_not_accepted(&item.submission.params, "Stale submission");
                        in_queue_best.remove(&key);
                    }
                    Err(e) => {
                        // Every remaining error is treated as transient and retried.
                        log::warn!("Network error: {}", e);
                        item.retry_count += 1;
                        if item.retry_count > MAX_RETRIES {
                            log::warn!(
                                "Max retries exceeded after network errors: account={}, block={}",
                                short(&item.submission.params.account_id),
                                short(&item.submission.params.block_hash)
                            );
                            in_queue_best.remove(&key);
                        } else {
                            let delay_ms = BASE_DELAY_MS * 2u64.pow(item.retry_count - 1);
                            log::info!(
                                "Retrying after network error in {}ms (attempt {}/{})",
                                delay_ms,
                                item.retry_count,
                                MAX_RETRIES
                            );
                            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
                            pending_queue.push_front(item);
                        }
                    }
                }
            } else {
                // Nothing pending: wait for the next message. The queue is
                // empty here, so the submission is accepted without a dedup
                // comparison.
                if let Some(new_submission) = rx.next().await {
                    let key = (
                        new_submission.params.account_id.clone(),
                        new_submission.params.block_hash.clone(),
                    );
                    in_queue_best.insert(key, new_submission.params.raw_quality);
                    pending_queue.push_back(TimestampedSubmission {
                        submission: new_submission,
                        queued_at: Instant::now(),
                        retry_count: 0,
                    });
                } else {
                    // The stream has ended, so there is nothing left to wait for.
                    break;
                }
            }
        }
        log::warn!("SubmissionQueue handler task exited - stream ended");
    });
}
}
/// Logs, at info level, that the upstream accepted a submission.
///
/// Height, account and nonce are taken from the request `params`; the raw
/// quality and PoC time are the values reported back in `result`.
fn log_submission_accepted(params: &SubmitNonceParams, result: &SubmitNonceResult) {
    let (height, account, nonce) = (&params.height, &params.account_id, &params.nonce);
    let (raw_quality, poc_time) = (&result.raw_quality, &result.poc_time);
    log::info!(
        "Submitted to upstream: height={}, account={}, nonce={}, raw_quality={}, poc_time={}",
        height,
        account,
        nonce,
        raw_quality,
        poc_time
    );
}
/// Logs, at error level, that the upstream refused a submission, together
/// with the caller-supplied reason `msg`.
fn log_submission_not_accepted(params: &SubmitNonceParams, msg: &str) {
    let (height, account, nonce) = (&params.height, &params.account_id, &params.nonce);
    log::error!(
        "Upstream rejected submission: height={}, account={}, nonce={}, message={}",
        height,
        account,
        nonce,
        msg
    );
}
/// Logs, at info level, that the upstream reported being busy and that the
/// submission will be retried.
fn log_server_busy(params: &SubmitNonceParams) {
    let (height, account, nonce) = (&params.height, &params.account_id, &params.nonce);
    log::info!(
        "Upstream server busy (will retry): height={}, account={}, nonce={}",
        height,
        account,
        nonce
    );
}