use crate::{error::GetOnchainDisputesError, metrics, LOG_TARGET};
use futures::channel::oneshot;
use polkadot_node_primitives::{dispute_is_inactive, CandidateVotes, DisputeStatus, Timestamp};
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest},
overseer, ActivatedLeaf,
};
use polkadot_primitives::{
supermajority_threshold, CandidateHash, DisputeState, DisputeStatement, DisputeStatementSet,
Hash, MultiDisputeStatementSet, SessionIndex, ValidDisputeStatementKind, ValidatorIndex,
};
use std::{
collections::{BTreeMap, HashMap},
time::{SystemTime, UNIX_EPOCH},
};
#[cfg(test)]
mod tests;
/// Upper bound on the total number of dispute votes that `vote_selection` returns.
///
/// A dispute whose votes would push the running total above this value is not selected.
#[cfg(not(test))]
pub const MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME: usize = 200_000;
/// Test-build value of the vote cap (much smaller than the non-test value).
// NOTE(review): presumably lowered so tests can reach the limit with small fixtures - confirm.
#[cfg(test)]
pub const MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME: usize = 200;
/// Maximum number of disputes whose votes are requested in a single `request_votes` call made
/// by `vote_selection`.
#[cfg(not(test))]
const VOTES_SELECTION_BATCH_SIZE: usize = 1_100;
/// Test-build value of the batch size (much smaller than the non-test value).
#[cfg(test)]
const VOTES_SELECTION_BATCH_SIZE: usize = 11;
/// Selects the dispute statements to provide as inherent data for `leaf`.
///
/// The selection runs through these steps:
/// 1. Fetch the disputes known onchain at `leaf.hash`. Every failure is logged and treated as
///    an empty onchain set, so the selection always continues.
/// 2. Fetch the recent disputes via `request_disputes`.
/// 3. Keep only recent disputes for which `is_confirmed_concluded()` holds or which are
///    already present in the onchain set.
/// 4. Partition them with `partition_recent_disputes`.
/// 5. Pick the votes to forward with `vote_selection`.
/// 6. Convert the picked votes with `make_multi_dispute_statement_set`.
///
/// `metrics` is updated with the number of fetched onchain disputes, the partition sizes and
/// (through `make_multi_dispute_statement_set`) the statement counts.
pub async fn select_disputes<Sender>(
    sender: &mut Sender,
    metrics: &metrics::Metrics,
    leaf: &ActivatedLeaf,
) -> MultiDisputeStatementSet
where
    Sender: overseer::ProvisionerSenderTrait,
{
    gum::trace!(
        target: LOG_TARGET,
        ?leaf,
        "Selecting disputes for inherent data using prioritized selection"
    );

    // The onchain view drives the prioritization below. No error is fatal here: each one is
    // logged at its own level and selection falls back to an empty onchain set.
    let onchain = match get_onchain_disputes(sender, leaf.hash).await {
        Ok(fetched) => {
            gum::trace!(
                target: LOG_TARGET,
                ?leaf,
                "Successfully fetched {} onchain disputes",
                fetched.len()
            );
            fetched
        },
        Err(err) => {
            match err {
                GetOnchainDisputesError::NotSupported(runtime_api_err, relay_parent) => {
                    gum::error!(
                        target: LOG_TARGET,
                        ?runtime_api_err,
                        ?relay_parent,
                        "Can't fetch onchain disputes, because ParachainHost runtime api version is old. Will continue with empty onchain disputes set.",
                    );
                },
                GetOnchainDisputesError::Channel => {
                    gum::debug!(
                        target: LOG_TARGET,
                        "Channel error occurred while fetching onchain disputes. Will continue with empty onchain disputes set.",
                    );
                },
                GetOnchainDisputesError::Execution(runtime_api_err, parent_hash) => {
                    gum::warn!(
                        target: LOG_TARGET,
                        ?runtime_api_err,
                        ?parent_hash,
                        "Unexpected execution error occurred while fetching onchain votes. Will continue with empty onchain disputes set.",
                    );
                },
            }
            HashMap::new()
        },
    };
    metrics.on_fetched_onchain_disputes(onchain.len() as u64);

    gum::trace!(target: LOG_TARGET, ?leaf, "Fetching recent disputes");
    let recent_disputes = request_disputes(sender).await;
    gum::trace!(
        target: LOG_TARGET,
        ?leaf,
        "Got {} recent disputes and {} onchain disputes.",
        recent_disputes.len(),
        onchain.len(),
    );

    gum::trace!(target: LOG_TARGET, ?leaf, "Filtering recent disputes");
    // A dispute that is already onchain is kept even if it is not confirmed/concluded locally.
    let recent_disputes = recent_disputes
        .into_iter()
        .filter(|(key, dispute_status)| {
            dispute_status.is_confirmed_concluded() || onchain.contains_key(key)
        })
        .collect::<BTreeMap<_, _>>();

    gum::trace!(target: LOG_TARGET, ?leaf, "Partitioning recent disputes");
    let partitioned = partition_recent_disputes(recent_disputes, &onchain);
    metrics.on_partition_recent_disputes(&partitioned);

    if !partitioned.inactive_unknown_onchain.is_empty() {
        gum::warn!(
            target: LOG_TARGET,
            ?leaf,
            "Got {} inactive unknown onchain disputes. This should not happen in normal conditions!",
            partitioned.inactive_unknown_onchain.len()
        );
    }

    gum::trace!(target: LOG_TARGET, ?leaf, "Vote selection for recent disputes");
    let result = vote_selection(sender, partitioned, &onchain).await;

    gum::trace!(target: LOG_TARGET, ?leaf, "Convert to multi dispute statement set");
    make_multi_dispute_statement_set(metrics, result)
}
/// Fetches the votes of the partitioned disputes in batches and picks the ones to forward.
///
/// Disputes are processed in the order produced by `PartitionedDisputes::into_iter`, at most
/// `VOTES_SELECTION_BATCH_SIZE` of them per `request_votes` call. For a dispute present in
/// `onchain` only the votes accepted by `is_vote_worth_to_keep` are retained; for a dispute
/// absent from `onchain` every returned vote is retained.
///
/// A dispute enters the result only if all of its retained votes fit under
/// `MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME`. The first dispute that does not fit ends the
/// selection: it is left out entirely and no further batches are requested.
async fn vote_selection<Sender>(
    sender: &mut Sender,
    partitioned: PartitionedDisputes,
    onchain: &HashMap<(SessionIndex, CandidateHash), DisputeState>,
) -> BTreeMap<(SessionIndex, CandidateHash), CandidateVotes>
where
    Sender: overseer::ProvisionerSenderTrait,
{
    let mut disputes: Vec<_> = partitioned.into_iter().collect();
    let mut result = BTreeMap::new();
    let mut total_votes_len = 0;
    let mut request_votes_counter = 0;

    'batches: while !disputes.is_empty() {
        gum::trace!(target: LOG_TARGET, "has to process {} disputes left", disputes.len());

        // Take the next batch from the front so the priority order is preserved.
        let batch_size = VOTES_SELECTION_BATCH_SIZE.min(disputes.len());
        let batch: Vec<_> = disputes.drain(..batch_size).collect();
        request_votes_counter += 1;

        gum::trace!(target: LOG_TARGET, "requesting onchain votes",);
        let votes = super::request_votes(sender, batch)
            .await
            .into_iter()
            .map(|(session_index, candidate_hash, mut votes)| {
                // Votes are only filtered when the dispute has an onchain state to compare
                // against; otherwise all of them pass through untouched.
                if let Some(onchain_state) = onchain.get(&(session_index, candidate_hash)) {
                    votes.valid.retain(|validator_idx, (statement_kind, _)| {
                        is_vote_worth_to_keep(
                            validator_idx,
                            DisputeStatement::Valid(statement_kind.clone()),
                            onchain_state,
                        )
                    });
                    votes.invalid.retain(|validator_idx, (statement_kind, _)| {
                        is_vote_worth_to_keep(
                            validator_idx,
                            DisputeStatement::Invalid(*statement_kind),
                            onchain_state,
                        )
                    });
                }
                (session_index, candidate_hash, votes)
            })
            .collect::<Vec<_>>();
        gum::trace!(target: LOG_TARGET, "got {} onchain votes after processing", votes.len());

        for (session_index, candidate_hash, selected_votes) in votes {
            let votes_len = selected_votes.valid.raw().len() + selected_votes.invalid.len();
            if votes_len + total_votes_len > MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME {
                // A dispute is never added partially: stop here and skip everything after it.
                break 'batches;
            }
            result.insert((session_index, candidate_hash), selected_votes);
            total_votes_len += votes_len;
        }
    }

    // Single exit point, reached both when the cap was hit and when all disputes were handled.
    gum::trace!(
        target: LOG_TARGET,
        ?request_votes_counter,
        ?total_votes_len,
        "vote_selection DisputeCoordinatorMessage::QueryCandidateVotes counter",
    );
    result
}
/// Recent disputes grouped by local activity and by what the onchain state says about them.
///
/// The groups are filled by `partition_recent_disputes`: "inactive"/"active" reflects the
/// result of `dispute_is_inactive`, "unknown onchain" means the dispute has no entry in the
/// onchain map, and "concluded"/"unconcluded" reflects `concluded_onchain`.
#[derive(Default)]
pub(crate) struct PartitionedDisputes {
    /// Locally inactive disputes with no onchain entry. Yielded first by `into_iter`.
    pub inactive_unknown_onchain: Vec<(SessionIndex, CandidateHash)>,
    /// Locally inactive disputes that are not concluded onchain. Yielded second.
    pub inactive_unconcluded_onchain: Vec<(SessionIndex, CandidateHash)>,
    /// Locally active disputes with no onchain entry. Yielded third.
    pub active_unknown_onchain: Vec<(SessionIndex, CandidateHash)>,
    /// Locally active disputes that are not concluded onchain. Yielded fourth.
    pub active_unconcluded_onchain: Vec<(SessionIndex, CandidateHash)>,
    /// Locally active disputes that are concluded onchain. Yielded last.
    pub active_concluded_onchain: Vec<(SessionIndex, CandidateHash)>,
    /// Locally inactive disputes that are concluded onchain. Never yielded by `into_iter`.
    pub inactive_concluded_onchain: Vec<(SessionIndex, CandidateHash)>,
}

impl PartitionedDisputes {
    /// Creates a partition set with every group empty.
    fn new() -> PartitionedDisputes {
        Self::default()
    }

    /// Consumes the partitions and yields the disputes group by group, in priority order.
    ///
    /// `inactive_concluded_onchain` is not part of the iteration.
    // NOTE(review): skipping that group looks deliberate (nothing left to do for a dispute that
    // is inactive locally and concluded onchain) - confirm.
    fn into_iter(self) -> impl Iterator<Item = (SessionIndex, CandidateHash)> {
        // Exhaustive destructuring: a newly added group must be handled here explicitly.
        let Self {
            inactive_unknown_onchain,
            inactive_unconcluded_onchain,
            active_unknown_onchain,
            active_unconcluded_onchain,
            active_concluded_onchain,
            inactive_concluded_onchain: _,
        } = self;

        inactive_unknown_onchain
            .into_iter()
            .chain(inactive_unconcluded_onchain)
            .chain(active_unknown_onchain)
            .chain(active_unconcluded_onchain)
            .chain(active_concluded_onchain)
    }
}
/// Returns the current system time as whole seconds since the Unix epoch.
///
/// If the system clock is set before the epoch, a warning is logged and `0` is returned.
fn secs_since_epoch() -> Timestamp {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or_else(|clock_err| {
            gum::warn!(
                target: LOG_TARGET,
                err = ?clock_err,
                "Error getting system time."
            );
            0
        })
}
/// Tells whether the onchain votes are sufficient to conclude the dispute either way.
///
/// The threshold is `supermajority_threshold` of the length of `validators_for`; the dispute
/// counts as concluded once the set bits of `validators_for` or of `validators_against` reach
/// that threshold.
fn concluded_onchain(onchain_state: &DisputeState) -> bool {
    let threshold = supermajority_threshold(onchain_state.validators_for.len());

    if onchain_state.validators_for.count_ones() >= threshold {
        return true;
    }
    onchain_state.validators_against.count_ones() >= threshold
}
/// Sorts the recent disputes into the groups of `PartitionedDisputes`.
///
/// Each dispute is classified along two axes:
/// * locally inactive or active, as decided by `dispute_is_inactive` for the current time;
/// * unknown onchain (no entry in `onchain`), concluded onchain or unconcluded onchain, as
///   decided by `concluded_onchain`.
///
/// Within every group the disputes keep the iteration order of `recent`.
fn partition_recent_disputes(
    recent: BTreeMap<(SessionIndex, CandidateHash), DisputeStatus>,
    onchain: &HashMap<(SessionIndex, CandidateHash), DisputeState>,
) -> PartitionedDisputes {
    let mut partitioned = PartitionedDisputes::new();
    // Sampled once so that every dispute is judged against the same instant.
    let time_now = secs_since_epoch();

    for (key, dispute_status) in recent {
        let inactive = dispute_is_inactive(&dispute_status, &time_now);
        // `None` means the dispute is unknown onchain.
        let onchain_concluded = onchain.get(&key).map(|state| concluded_onchain(state));

        let bucket = match (inactive, onchain_concluded) {
            (true, None) => &mut partitioned.inactive_unknown_onchain,
            (true, Some(true)) => &mut partitioned.inactive_concluded_onchain,
            (true, Some(false)) => &mut partitioned.inactive_unconcluded_onchain,
            (false, None) => &mut partitioned.active_unknown_onchain,
            (false, Some(true)) => &mut partitioned.active_concluded_onchain,
            (false, Some(false)) => &mut partitioned.active_unconcluded_onchain,
        };
        bucket.push(key);
    }

    partitioned
}
/// Decides whether an offchain vote should be kept, given what is already recorded onchain.
///
/// * Backing votes (`BackingValid`, `BackingSeconded`) are always kept.
/// * A vote of a validator recorded onchain both for and against is dropped.
/// * A vote that contradicts the side recorded onchain for that validator is kept.
/// * Any other vote is kept only if the validator has no onchain vote at all.
///
/// A validator index without a corresponding bit in the onchain bitfields is treated as "no
/// vote recorded".
fn is_vote_worth_to_keep(
    validator_index: &ValidatorIndex,
    dispute_statement: DisputeStatement,
    onchain_state: &DisputeState,
) -> bool {
    // `true` for a "valid" statement, `false` for an "invalid" one.
    let offchain_vote = match dispute_statement {
        DisputeStatement::Valid(ValidDisputeStatementKind::BackingValid(_)) |
        DisputeStatement::Valid(ValidDisputeStatementKind::BackingSeconded(_)) => return true,
        DisputeStatement::Valid(_) => true,
        DisputeStatement::Invalid(_) => false,
    };

    let bit_position = validator_index.0 as usize;
    let in_validators_for = onchain_state
        .validators_for
        .get(bit_position)
        .as_deref()
        .copied()
        .unwrap_or(false);
    let in_validators_against = onchain_state
        .validators_against
        .get(bit_position)
        .as_deref()
        .copied()
        .unwrap_or(false);

    match (in_validators_for, in_validators_against) {
        // Both sides are already recorded onchain for this validator.
        (true, true) => false,
        // Nothing is recorded onchain for this validator yet.
        (false, false) => true,
        // Only "for" is recorded onchain: keep the vote only if it says "invalid".
        (true, false) => !offchain_vote,
        // Only "against" is recorded onchain: keep the vote only if it says "valid".
        (false, true) => offchain_vote,
    }
}
/// Asks the dispute coordinator for the recent disputes.
///
/// The request is sent as an unbounded message. If the response channel is dropped before an
/// answer arrives, a warning is logged and an empty map is returned.
async fn request_disputes(
    sender: &mut impl overseer::ProvisionerSenderTrait,
) -> BTreeMap<(SessionIndex, CandidateHash), DisputeStatus> {
    let (tx, rx) = oneshot::channel();
    sender.send_unbounded_message(DisputeCoordinatorMessage::RecentDisputes(tx));

    match rx.await {
        Ok(recent_disputes) => recent_disputes,
        Err(err) => {
            gum::warn!(target: LOG_TARGET, err=?err, "Unable to gather recent disputes");
            BTreeMap::new()
        },
    }
}
fn make_multi_dispute_statement_set(
metrics: &metrics::Metrics,
dispute_candidate_votes: BTreeMap<(SessionIndex, CandidateHash), CandidateVotes>,
) -> MultiDisputeStatementSet {
dispute_candidate_votes
.into_iter()
.map(|((session_index, candidate_hash), votes)| {
let valid_statements = votes
.valid
.into_iter()
.map(|(i, (s, sig))| (DisputeStatement::Valid(s), i, sig));
let invalid_statements = votes
.invalid
.into_iter()
.map(|(i, (s, sig))| (DisputeStatement::Invalid(s), i, sig));
metrics.inc_valid_statements_by(valid_statements.len());
metrics.inc_invalid_statements_by(invalid_statements.len());
metrics.inc_dispute_statement_sets_by(1);
DisputeStatementSet {
candidate_hash,
session: session_index,
statements: valid_statements.chain(invalid_statements).collect(),
}
})
.collect()
}
/// Fetches the disputes known onchain at `relay_parent` through the runtime API.
///
/// On success the disputes are returned keyed by `(SessionIndex, CandidateHash)`.
///
/// # Errors
///
/// * `GetOnchainDisputesError::Channel` - the response channel was dropped without an answer.
/// * `GetOnchainDisputesError::Execution` - the runtime API reported an execution error.
/// * `GetOnchainDisputesError::NotSupported` - the runtime API reported the request as not
///   supported.
pub async fn get_onchain_disputes<Sender>(
    sender: &mut Sender,
    relay_parent: Hash,
) -> Result<HashMap<(SessionIndex, CandidateHash), DisputeState>, GetOnchainDisputesError>
where
    Sender: overseer::ProvisionerSenderTrait,
{
    gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching on-chain disputes");

    let (tx, rx) = oneshot::channel();
    let request = RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Disputes(tx));
    sender.send_message(request).await;

    let response = rx.await.map_err(|_| GetOnchainDisputesError::Channel)?;
    let disputes = response.map_err(|e| match e {
        RuntimeApiError::Execution { .. } => GetOnchainDisputesError::Execution(e, relay_parent),
        RuntimeApiError::NotSupported { .. } => {
            GetOnchainDisputesError::NotSupported(e, relay_parent)
        },
    })?;

    Ok(disputes
        .into_iter()
        .map(|(session, candidate_hash, state)| ((session, candidate_hash), state))
        .collect())
}