use core::future::Future;
use std::time::{Duration, SystemTime};
use serai_db::*;
use serai_task::ContinuallyRan;
use crate::{
HasEvents, GlobalSession, NetworksLatestCosignedBlock, RequestNotableCosigns,
intend::{GlobalSessionsChannel, BlockEventData, BlockEvents},
};
create_db!(
SubstrateCosignEvaluator {
CurrentlyEvaluatedGlobalSession: () -> ([u8; 32], GlobalSession),
}
);
db_channel!(
SubstrateCosignEvaluatorChannels {
CosignedBlocks: () -> (u64, u64),
}
);
fn currently_evaluated_global_session_strict(
txn: &mut impl DbTxn,
block_number: u64,
) -> ([u8; 32], GlobalSession) {
let mut res = {
let existing = match CurrentlyEvaluatedGlobalSession::get(txn) {
Some(existing) => existing,
None => {
let first = GlobalSessionsChannel::try_recv(txn)
.expect("fetching latest global session yet none declared");
CurrentlyEvaluatedGlobalSession::set(txn, &first);
first
}
};
assert!(
existing.1.start_block_number <= block_number,
"candidate's start block number exceeds our block number"
);
existing
};
if let Some(next) = GlobalSessionsChannel::peek(txn) {
assert!(
block_number <= next.1.start_block_number,
"currently_evaluated_global_session_strict wasn't called incrementally"
);
if block_number == next.1.start_block_number {
GlobalSessionsChannel::try_recv(txn).unwrap();
CurrentlyEvaluatedGlobalSession::set(txn, &next);
res = next;
}
}
res
}
pub(crate) struct CosignEvaluatorTask<D: Db, R: RequestNotableCosigns> {
pub(crate) db: D,
pub(crate) request: R,
}
impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D, R> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let mut known_cosign = None;
let mut made_progress = false;
loop {
let mut txn = self.db.txn();
let Some(BlockEventData { block_number, has_events }) = BlockEvents::try_recv(&mut txn)
else {
break;
};
match has_events {
HasEvents::Notable => {
let (global_session, global_session_info) =
currently_evaluated_global_session_strict(&mut txn, block_number);
let mut weight_cosigned = 0;
for set in global_session_info.sets {
if NetworksLatestCosignedBlock::get(&txn, global_session, set.network)
.map(|signed_cosign| signed_cosign.cosign.block_number) ==
Some(block_number)
{
weight_cosigned +=
global_session_info.stakes.get(&set.network).ok_or_else(|| {
"ValidatorSet in global session yet didn't have its stake".to_string()
})?;
}
}
if weight_cosigned < (((global_session_info.total_stake * 83) / 100) + 1) {
self
.request
.request_notable_cosigns(global_session)
.await
.map_err(|e| format!("{e:?}"))?;
return Err(format!(
"notable block (#{block_number}) wasn't yet cosigned. this should resolve shortly",
));
}
}
HasEvents::NonNotable => {
let known_cosigned = if let Some(known_cosign) = known_cosign {
known_cosign >= block_number
} else {
known_cosign = None;
false
};
if !known_cosigned {
let (global_session, global_session_info) =
currently_evaluated_global_session_strict(&mut txn, block_number);
let mut weight_cosigned = 0;
let mut lowest_common_block: Option<u64> = None;
for set in global_session_info.sets {
let Some(cosign) =
NetworksLatestCosignedBlock::get(&txn, global_session, set.network)
else {
continue;
};
if cosign.cosign.block_number >= block_number {
weight_cosigned +=
global_session_info.stakes.get(&set.network).ok_or_else(|| {
"ValidatorSet in global session yet didn't have its stake".to_string()
})?;
}
lowest_common_block = lowest_common_block
.map(|existing| existing.min(cosign.cosign.block_number))
.or(Some(cosign.cosign.block_number));
}
if weight_cosigned < (((global_session_info.total_stake * 83) / 100) + 1) {
self
.request
.request_notable_cosigns(global_session)
.await
.map_err(|e| format!("{e:?}"))?;
return Err(format!(
"block (#{block_number}) wasn't yet cosigned. this should resolve shortly",
));
}
known_cosign = lowest_common_block;
}
}
HasEvents::No => {}
}
CosignedBlocks::send(
&mut txn,
&(
block_number,
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs(),
),
);
txn.commit();
made_progress = true;
}
Ok(made_progress)
}
}
}