//! The provisioner subsystem is responsible for assembling the parachains
//! inherent data (availability bitfields, backed candidates and disputes)
//! that goes into a relay chain block.

#![deny(missing_docs, unused_crate_dependencies)]
use bitvec::vec::BitVec;
use futures::{
channel::oneshot::{self, Canceled},
future::BoxFuture,
prelude::*,
stream::FuturesUnordered,
FutureExt,
};
use futures_timer::Delay;
use polkadot_node_subsystem::{
messages::{
Ancestors, BackableCandidateRef, CandidateBackingMessage, ChainApiMessage,
ProspectiveParachainsMessage, ProvisionableData, ProvisionerInherentData,
ProvisionerMessage,
},
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
SubsystemError,
};
use polkadot_node_subsystem_util::{request_availability_cores, TimeoutExt};
use polkadot_primitives::{
BackedCandidate, CandidateEvent, CoreIndex, CoreState, Hash, Id as ParaId,
SignedAvailabilityBitfield, ValidatorIndex,
};
use sc_consensus_slots::time_until_next_slot;
use schnellru::{ByLength, LruMap};
use std::{
collections::{BTreeMap, HashMap},
time::Duration,
};
mod disputes;
mod error;
mod metrics;
pub use self::metrics::*;
use error::{Error, FatalResult};
#[cfg(test)]
mod tests;
/// How long to wait after a leaf is activated before its inherent data is considered ready.
const PRE_PROPOSE_TIMEOUT: Duration = Duration::from_millis(2000);
/// Timeout for assembling and sending the inherent data, so background tasks cannot hang around forever.
const SEND_INHERENT_DATA_TIMEOUT: Duration = Duration::from_millis(500);
const LOG_TARGET: &str = "parachain::provisioner";
/// The provisioner subsystem.
pub struct ProvisionerSubsystem {
metrics: Metrics,
}
impl ProvisionerSubsystem {
/// Create a new instance of the `ProvisionerSubsystem`.
pub fn new(metrics: Metrics) -> Self {
Self { metrics }
}
}
/// The provisioner state kept for a single relay parent.
pub struct PerRelayParent {
leaf: ActivatedLeaf,
signed_bitfields: Vec<SignedAvailabilityBitfield>,
is_inherent_ready: bool,
awaiting_inherent: Vec<oneshot::Sender<ProvisionerInherentData>>,
}
impl PerRelayParent {
fn new(leaf: ActivatedLeaf) -> Self {
Self {
leaf,
signed_bitfields: Vec::new(),
is_inherent_ready: false,
awaiting_inherent: Vec::new(),
}
}
}
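// `InherentDelays` fire when a leaf's pre-propose timeout elapses, `SlotDelays` fire
// shortly after the next slot starts (used for the debug inherent), and
// `InherentReceivers` yield the debug inherent data assembled in the background.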
type InherentDelays = FuturesUnordered<BoxFuture<'static, Hash>>;
type SlotDelays = FuturesUnordered<BoxFuture<'static, Hash>>;
type InherentReceivers =
FuturesUnordered<BoxFuture<'static, (Hash, Result<ProvisionerInherentData, Canceled>)>>;
#[overseer::subsystem(Provisioner, error=SubsystemError, prefix=self::overseer)]
impl<Context> ProvisionerSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = async move {
run(ctx, self.metrics)
.await
.map_err(|e| SubsystemError::with_origin("provisioner", e))
}
.boxed();
SpawnedSubsystem { name: "provisioner-subsystem", future }
}
}
#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
async fn run<Context>(mut ctx: Context, metrics: Metrics) -> FatalResult<()> {
let mut inherent_delays = InherentDelays::new();
let mut inherent_receivers = InherentReceivers::new();
let mut slot_delays = SlotDelays::new();
let mut per_relay_parent = HashMap::new();
let mut inherents = LruMap::new(ByLength::new(16));
loop {
let result = run_iteration(
&mut ctx,
&mut per_relay_parent,
&mut inherent_delays,
&mut inherent_receivers,
&mut inherents,
&mut slot_delays,
&metrics,
)
.await;
match result {
Ok(()) => break,
err => crate::error::log_error(err)?,
}
}
Ok(())
}
#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
async fn run_iteration<Context>(
ctx: &mut Context,
per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
inherent_delays: &mut InherentDelays,
inherent_receivers: &mut InherentReceivers,
inherents: &mut LruMap<Hash, ProvisionerInherentData>,
slot_delays: &mut SlotDelays,
metrics: &Metrics,
) -> Result<(), Error> {
loop {
futures::select! {
from_overseer = ctx.recv().fuse() => {
match from_overseer.map_err(Error::OverseerExited)? {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) =>
handle_active_leaves_update(ctx, update, per_relay_parent, inherent_delays, slot_delays, inherents, metrics).await?,
FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Communication { msg } => {
handle_communication(ctx, per_relay_parent, msg, metrics).await?;
},
}
},
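// A new slot started for one of our leaves: kick off a background build of a
// "debug" inherent, to be compared later against what actually lands on-chain.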
hash = slot_delays.select_next_some() => {
gum::debug!(target: LOG_TARGET, leaf_hash=?hash, "Slot start, preparing debug inherent");
let Some(state) = per_relay_parent.get_mut(&hash) else {
continue
};
let (inherent_tx, inherent_rx) = oneshot::channel();
let task = inherent_rx.map(move |res| (hash, res)).boxed();
inherent_receivers.push(task);
send_inherent_data_bg(ctx, &state, vec![inherent_tx], metrics.clone()).await?;
},
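// A background debug-inherent build finished; cache the result by relay parent.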
(hash, inherent_data) = inherent_receivers.select_next_some() => {
let Ok(inherent_data) = inherent_data else {
continue
};
gum::trace!(
target: LOG_TARGET,
relay_parent = ?hash,
"Debug Inherent Data became ready"
);
inherents.insert(hash, inherent_data);
}
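// The pre-propose delay for a leaf elapsed: the inherent is now considered
// ready, so serve any requests that were queued while we were waiting.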
hash = inherent_delays.select_next_some() => {
if let Some(state) = per_relay_parent.get_mut(&hash) {
state.is_inherent_ready = true;
gum::trace!(
target: LOG_TARGET,
relay_parent = ?hash,
"Inherent Data became ready"
);
let return_senders = std::mem::take(&mut state.awaiting_inherent);
if !return_senders.is_empty() {
send_inherent_data_bg(ctx, &state, return_senders, metrics.clone()).await?;
}
}
}
}
}
}
#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
async fn handle_active_leaves_update<Context>(
ctx: &mut Context,
update: ActiveLeavesUpdate,
per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
inherent_delays: &mut InherentDelays,
slot_delays: &mut SlotDelays,
inherents: &mut LruMap<Hash, ProvisionerInherentData>,
metrics: &Metrics,
) -> Result<(), Error> {
gum::trace!(target: LOG_TARGET, "Handle ActiveLeavesUpdate");
for deactivated in &update.deactivated {
per_relay_parent.remove(deactivated);
}
let Some(leaf) = update.activated else { return Ok(()) };
gum::trace!(target: LOG_TARGET, leaf_hash=?leaf.hash, "Adding delay");
let delay_fut = Delay::new(PRE_PROPOSE_TIMEOUT).map(move |_| leaf.hash).boxed();
per_relay_parent.insert(leaf.hash, PerRelayParent::new(leaf.clone()));
inherent_delays.push(delay_fut);
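// Also schedule a debug-inherent build for shortly after the next slot boundary
// (the slot duration is hardcoded to 6 seconds here).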
let slot_delay = time_until_next_slot(Duration::from_millis(6000));
gum::debug!(target: LOG_TARGET, leaf_hash=?leaf.hash, "Expecting next slot in {}ms", slot_delay.as_millis());
let slot_delay_task =
Delay::new(slot_delay + PRE_PROPOSE_TIMEOUT).map(move |_| leaf.hash).boxed();
slot_delays.push(slot_delay_task);
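// Compare the debug inherent we prepared at this leaf's parent against the
// candidates that actually got backed on-chain in this leaf, and report the difference.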
let Ok(Ok(candidate_events)) =
polkadot_node_subsystem_util::request_candidate_events(leaf.hash, ctx.sender())
.await
.await
else {
gum::warn!(target: LOG_TARGET, leaf_hash=?leaf.hash, "Failed to fetch candidate events");
return Ok(());
};
let in_block_count = candidate_events
.into_iter()
.filter(|event| matches!(event, CandidateEvent::CandidateBacked(_, _, _, _)))
.count() as isize;
let (tx, rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::BlockHeader(leaf.hash, tx)).await;
let Ok(Some(header)) = rx.await.unwrap_or_else(|err| {
gum::warn!(target: LOG_TARGET, hash = ?leaf.hash, ?err, "Missing header for block");
Ok(None)
}) else {
return Ok(());
};
gum::trace!(target: LOG_TARGET, hash = ?header.parent_hash, "Looking up debug inherent");
let Some(inherent) = inherents.get(&header.parent_hash) else { return Ok(()) };
let diff = inherent.backed_candidates.len() as isize - in_block_count;
gum::debug!(target: LOG_TARGET,
?diff,
?in_block_count,
local_count = ?inherent.backed_candidates.len(),
leaf_hash=?leaf.hash, "Offchain vs on-chain backing update");
metrics.observe_backable_vs_in_block(diff);
Ok(())
}
#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
async fn handle_communication<Context>(
ctx: &mut Context,
per_relay_parent: &mut HashMap<Hash, PerRelayParent>,
message: ProvisionerMessage,
metrics: &Metrics,
) -> Result<(), Error> {
match message {
ProvisionerMessage::RequestInherentData(relay_parent, return_sender) => {
gum::trace!(target: LOG_TARGET, ?relay_parent, "Inherent data got requested.");
if let Some(state) = per_relay_parent.get_mut(&relay_parent) {
if state.is_inherent_ready {
gum::trace!(target: LOG_TARGET, ?relay_parent, "Calling send_inherent_data.");
send_inherent_data_bg(ctx, &state, vec![return_sender], metrics.clone())
.await?;
} else {
gum::trace!(
target: LOG_TARGET,
?relay_parent,
"Queuing inherent data request (inherent data not yet ready)."
);
state.awaiting_inherent.push(return_sender);
}
}
},
ProvisionerMessage::ProvisionableData(relay_parent, data) => {
if let Some(state) = per_relay_parent.get_mut(&relay_parent) {
let _timer = metrics.time_provisionable_data();
gum::trace!(target: LOG_TARGET, ?relay_parent, "Received provisionable data: {:?}", &data);
note_provisionable_data(state, data);
}
},
}
Ok(())
}
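/// Assemble the inherent data in a background task and answer `return_senders` from
/// there, so the subsystem's main loop is not blocked while candidates and disputes
/// are selected.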
#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
async fn send_inherent_data_bg<Context>(
ctx: &mut Context,
per_relay_parent: &PerRelayParent,
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
metrics: Metrics,
) -> Result<(), Error> {
let leaf = per_relay_parent.leaf.clone();
let signed_bitfields = per_relay_parent.signed_bitfields.clone();
let mut sender = ctx.sender().clone();
let bg = async move {
let _timer = metrics.time_request_inherent_data();
gum::trace!(
target: LOG_TARGET,
relay_parent = ?leaf.hash,
"Sending inherent data in background."
);
let send_result =
send_inherent_data(&leaf, &signed_bitfields, return_senders, &mut sender, &metrics)
.timeout(SEND_INHERENT_DATA_TIMEOUT)
.map(|v| match v {
Some(r) => r,
None => Err(Error::SendInherentDataTimeout),
});
match send_result.await {
Err(err) => {
if let Error::CanceledBackedCandidates(_) = err {
gum::debug!(
target: LOG_TARGET,
err = ?err,
"Failed to assemble or send inherent data - block got likely obsoleted already."
);
} else {
gum::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data");
}
metrics.on_inherent_data_request(Err(()));
},
Ok(()) => {
metrics.on_inherent_data_request(Ok(()));
gum::debug!(
target: LOG_TARGET,
signed_bitfield_count = signed_bitfields.len(),
leaf_hash = ?leaf.hash,
"inherent data sent successfully"
);
metrics.observe_inherent_data_bitfields_count(signed_bitfields.len());
},
}
};
ctx.spawn("send-inherent-data", bg.boxed())
.map_err(|_| Error::FailedToSpawnBackgroundTask)?;
Ok(())
}
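/// Record provisionable data for a relay parent. Only bitfields are tracked here;
/// dispute selection happens separately when the inherent is assembled, and
/// misbehavior reports are ignored.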
fn note_provisionable_data(
per_relay_parent: &mut PerRelayParent,
provisionable_data: ProvisionableData,
) {
match provisionable_data {
ProvisionableData::Bitfield(_, signed_bitfield) => {
per_relay_parent.signed_bitfields.push(signed_bitfield)
},
ProvisionableData::MisbehaviorReport(_, _, _) => {},
ProvisionableData::Dispute(_, _) => {},
}
}
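/// The availability bitfield of a single occupied core: one bit per validator.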
type CoreAvailability = BitVec<u8, bitvec::order::Lsb0>;
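/// Assemble the full `ProvisionerInherentData`: fetch the availability cores, select
/// disputes, pick one bitfield per validator and the backable candidates, then answer
/// every waiting requester with a clone of the result.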
async fn send_inherent_data(
leaf: &ActivatedLeaf,
bitfields: &[SignedAvailabilityBitfield],
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
from_job: &mut impl overseer::ProvisionerSenderTrait,
metrics: &Metrics,
) -> Result<(), Error> {
gum::trace!(
target: LOG_TARGET,
relay_parent = ?leaf.hash,
"Requesting availability cores"
);
let availability_cores = request_availability_cores(leaf.hash, from_job)
.await
.await
.map_err(Error::CanceledAvailabilityCores)??;
gum::trace!(
target: LOG_TARGET,
relay_parent = ?leaf.hash,
"Selecting disputes"
);
let disputes = disputes::prioritized_selection::select_disputes(from_job, metrics, leaf).await;
gum::trace!(
target: LOG_TARGET,
relay_parent = ?leaf.hash,
"Selected disputes"
);
let bitfields = select_availability_bitfields(&availability_cores, bitfields, &leaf.hash);
gum::trace!(
target: LOG_TARGET,
relay_parent = ?leaf.hash,
"Selected bitfields"
);
let candidates = select_candidates(&availability_cores, &bitfields, leaf, from_job).await?;
gum::trace!(
target: LOG_TARGET,
relay_parent = ?leaf.hash,
"Selected candidates"
);
gum::debug!(
target: LOG_TARGET,
availability_cores_len = availability_cores.len(),
disputes_count = disputes.len(),
bitfields_count = bitfields.len(),
candidates_count = candidates.len(),
leaf_hash = ?leaf.hash,
"inherent data prepared",
);
let inherent_data =
ProvisionerInherentData { bitfields, backed_candidates: candidates, disputes };
gum::trace!(
target: LOG_TARGET,
relay_parent = ?leaf.hash,
"Sending back inherent data to requesters."
);
for return_sender in return_senders {
return_sender
.send(inherent_data.clone())
.map_err(|_data| Error::InherentDataReturnChannel)?;
}
Ok(())
}
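/// Select the availability bitfields to include in the inherent.
///
/// We pick all the bitfields we can, subject to:
/// - at most one bitfield per validator (preferring the one with more bits set),
/// - the bitfield length must match the number of cores,
/// - no bit may be set for an unoccupied core.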
fn select_availability_bitfields(
cores: &[CoreState],
bitfields: &[SignedAvailabilityBitfield],
leaf_hash: &Hash,
) -> Vec<SignedAvailabilityBitfield> {
let mut selected: BTreeMap<ValidatorIndex, SignedAvailabilityBitfield> = BTreeMap::new();
gum::debug!(
target: LOG_TARGET,
bitfields_count = bitfields.len(),
?leaf_hash,
"bitfields count before selection"
);
'a: for bitfield in bitfields.iter().cloned() {
if bitfield.payload().0.len() != cores.len() {
gum::debug!(target: LOG_TARGET, ?leaf_hash, "dropping bitfield due to length mismatch");
continue;
}
let is_better = selected
.get(&bitfield.validator_index())
.map_or(true, |b| b.payload().0.count_ones() < bitfield.payload().0.count_ones());
if !is_better {
gum::trace!(
target: LOG_TARGET,
val_idx = bitfield.validator_index().0,
?leaf_hash,
"dropping bitfield due to duplication - the better one is kept"
);
continue;
}
for (idx, _) in cores.iter().enumerate().filter(|v| !v.1.is_occupied()) {
if *bitfield.payload().0.get(idx).as_deref().unwrap_or(&false) {
gum::debug!(
target: LOG_TARGET,
val_idx = bitfield.validator_index().0,
?leaf_hash,
"dropping invalid bitfield - bit is set for an unoccupied core"
);
continue 'a;
}
}
let _ = selected.insert(bitfield.validator_index(), bitfield);
}
gum::debug!(
target: LOG_TARGET,
?leaf_hash,
"selected {} of all {} bitfields (each bitfield is from a unique validator)",
selected.len(),
bitfields.len()
);
selected.into_values().collect()
}
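/// Work out which paras have a scheduled core at the block under construction and,
/// for each of them, ask prospective parachains for that many backable candidates,
/// passing along the hashes of candidates still pending availability as ancestors.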
async fn request_backable_candidates(
availability_cores: &[CoreState],
bitfields: &[SignedAvailabilityBitfield],
leaf: &ActivatedLeaf,
sender: &mut impl overseer::ProvisionerSenderTrait,
) -> Result<HashMap<ParaId, Vec<BackableCandidateRef>>, Error> {
let block_number_under_construction = leaf.number + 1;
let mut scheduled_cores_per_para: BTreeMap<ParaId, usize> = BTreeMap::new();
let mut ancestors: HashMap<ParaId, Ancestors> =
HashMap::with_capacity(availability_cores.len());
for (core_idx, core) in availability_cores.iter().enumerate() {
let core_idx = CoreIndex(core_idx as u32);
match core {
CoreState::Scheduled(scheduled_core) => {
*scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
},
CoreState::Occupied(occupied_core) => {
let is_available = bitfields_indicate_availability(
core_idx.0 as usize,
bitfields,
&occupied_core.availability,
);
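// Three cases: the occupied candidate became available (remember it as an
// ancestor and schedule whatever is next up on the freed core), it timed out
// (schedule the time-out successor), or it is still pending availability
// (only remember it as an ancestor).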
if is_available {
ancestors
.entry(occupied_core.para_id())
.or_default()
.insert(occupied_core.candidate_hash);
if let Some(ref scheduled_core) = occupied_core.next_up_on_available {
*scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
}
} else if occupied_core.time_out_at <= block_number_under_construction {
if let Some(ref scheduled_core) = occupied_core.next_up_on_time_out {
*scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1;
}
} else {
ancestors
.entry(occupied_core.para_id())
.or_default()
.insert(occupied_core.candidate_hash);
}
},
CoreState::Free => continue,
};
}
let mut selected_candidates: HashMap<ParaId, Vec<BackableCandidateRef>> =
HashMap::with_capacity(scheduled_cores_per_para.len());
for (para_id, core_count) in scheduled_cores_per_para {
let para_ancestors = ancestors.remove(&para_id).unwrap_or_default();
let response =
get_backable_candidates(leaf.hash, para_id, para_ancestors, core_count as u32, sender)
.await?;
if response.is_empty() {
gum::debug!(
target: LOG_TARGET,
leaf_hash = ?leaf.hash,
?para_id,
"No backable candidate returned by prospective parachains",
);
continue;
}
selected_candidates.insert(para_id, response);
}
Ok(selected_candidates)
}
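/// Select the backable candidates via prospective parachains, then exchange the
/// references for full `BackedCandidate`s via the candidate backing subsystem.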
async fn select_candidates(
availability_cores: &[CoreState],
bitfields: &[SignedAvailabilityBitfield],
leaf: &ActivatedLeaf,
sender: &mut impl overseer::ProvisionerSenderTrait,
) -> Result<Vec<BackedCandidate>, Error> {
gum::trace!(
target: LOG_TARGET,
leaf_hash=?leaf.hash,
"before GetBackedCandidates"
);
let selected_candidates =
request_backable_candidates(availability_cores, bitfields, leaf, sender).await?;
gum::debug!(target: LOG_TARGET, ?selected_candidates, "Got backable candidates");
let (tx, rx) = oneshot::channel();
sender.send_unbounded_message(CandidateBackingMessage::GetBackableCandidates {
candidates: selected_candidates.clone(),
sender: tx,
});
let candidates = rx.await.map_err(Error::CanceledBackedCandidates)?;
gum::trace!(
target: LOG_TARGET,
leaf_hash=?leaf.hash,
"Got {} backed candidates", candidates.len()
);
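// Allow at most one candidate carrying a new validation code into the block: once
// one is taken, a further code-upgrading candidate cuts off the remainder of its
// para's chain.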
let mut with_validation_code = false;
let mut merged_candidates = Vec::with_capacity(availability_cores.len());
for para_candidates in candidates.into_values() {
for candidate in para_candidates {
if candidate.candidate().commitments.new_validation_code.is_some() {
if with_validation_code {
break;
} else {
with_validation_code = true;
}
}
merged_candidates.push(candidate);
}
}
gum::debug!(
target: LOG_TARGET,
n_candidates = merged_candidates.len(),
n_cores = availability_cores.len(),
leaf=?leaf.hash,
"Selected backed candidates",
);
Ok(merged_candidates)
}
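/// Ask the prospective parachains subsystem for up to `count` backable candidates
/// of `para_id` at the given leaf, on top of the provided ancestors.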
async fn get_backable_candidates(
leaf: Hash,
para_id: ParaId,
ancestors: Ancestors,
count: u32,
sender: &mut impl overseer::ProvisionerSenderTrait,
) -> Result<Vec<BackableCandidateRef>, Error> {
let (tx, rx) = oneshot::channel();
sender
.send_message(ProspectiveParachainsMessage::GetBackableCandidates {
leaf,
para_id,
count,
ancestors,
sender: tx,
})
.await;
rx.await.map_err(Error::CanceledBackableCandidates)
}
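/// Determine whether the occupied core's candidate is available.
///
/// Takes a transverse slice of the signed bitfields at `core_idx`, ORs it into the
/// core's availability vector (one bit per validator), and checks for a 2/3 supermajority.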
fn bitfields_indicate_availability(
core_idx: usize,
bitfields: &[SignedAvailabilityBitfield],
availability: &CoreAvailability,
) -> bool {
let mut availability = availability.clone();
let availability_len = availability.len();
for bitfield in bitfields {
let validator_idx = bitfield.validator_index().0 as usize;
match availability.get_mut(validator_idx) {
None => {
gum::warn!(
target: LOG_TARGET,
validator_idx = %validator_idx,
availability_len = %availability_len,
"attempted to set a transverse bit at idx {} which is greater than bitfield size {}",
validator_idx,
availability_len,
);
return false;
},
Some(mut bit_mut) => *bit_mut |= bitfield.payload().0[core_idx],
}
}
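// Available once at least two thirds of the validators voted for it.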
3 * availability.count_ones() >= 2 * availability.len()
}