use crate::{
types::{Balance, CreatedOutput, LedgerIndex, Migration, Receipt, TreasuryOutput},
workers::{
consensus::{metadata::WhiteFlagMetadata, state::validate_ledger_state, white_flag},
error::Error,
event::{MessageReferenced, MilestoneConfirmed, OutputConsumed, OutputCreated},
pruning::{condition::should_prune, config::PruningConfig, prune},
snapshot::{condition::should_snapshot, config::SnapshotConfig, worker::SnapshotWorker},
storage::{self, StorageBackend},
},
};
use bee_message::{
address::Address,
milestone::MilestoneIndex,
output::{Output, OutputId},
payload::{milestone::MilestoneId, receipt::ReceiptPayload, transaction::TransactionId, Payload},
MessageId,
};
use bee_runtime::{event::Bus, node::Node, shutdown_stream::ShutdownStream, worker::Worker};
use bee_tangle::{ConflictReason, Tangle, TangleWorker};
use async_trait::async_trait;
use futures::{channel::oneshot, stream::StreamExt};
use log::{debug, error, info, warn};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use std::any::TypeId;
/// Margin added to the tangle's `below_max_depth` to obtain the minimum accepted value of `snapshot.depth`.
pub(crate) const EXTRA_SNAPSHOT_DEPTH: u32 = 5;
/// Margin added to the tangle's `below_max_depth`; that sum, added to the effective snapshot depth, is the minimum
/// accepted value of `pruning.delay`.
pub(crate) const EXTRA_PRUNING_DEPTH: u32 = 5;
/// Commands accepted by the event loop spawned in `ConsensusWorker::start`.
///
/// Every `Fetch*` variant carries a oneshot sender; the reply is a tuple of the storage lookup result and the ledger
/// index held by the worker when the lookup was served.
// NOTE(review): the nested reply-channel types below are presumably what triggers `clippy::type_complexity`.
#[allow(clippy::type_complexity)]
pub enum ConsensusWorkerCommand {
/// Requests confirmation of the milestone carried by the message with this id.
ConfirmMilestone(MessageId),
/// Requests the balance of an address; answered with the result of `storage::fetch_balance`.
FetchBalance(Address, oneshot::Sender<(Result<Option<Balance>, Error>, LedgerIndex)>),
/// Requests a single output by id; answered with the result of `storage::fetch_output`.
FetchOutput(
OutputId,
oneshot::Sender<(Result<Option<CreatedOutput>, Error>, LedgerIndex)>,
),
/// Requests the output ids stored for an address; answered with the result of
/// `storage::fetch_outputs_for_ed25519_address`.
FetchOutputs(
Address,
oneshot::Sender<(Result<Option<Vec<OutputId>>, Error>, LedgerIndex)>,
),
}
/// Handle to the consensus worker, returned by its `Worker::start` implementation.
pub struct ConsensusWorker {
/// Sending half of the unbounded channel whose receiving half is drained by the task spawned in `start`;
/// `ConsensusWorkerCommand`s pushed here are processed one at a time by that task.
pub tx: mpsc::UnboundedSender<ConsensusWorkerCommand>,
}
pub(crate) async fn migration_from_milestone(
milestone_index: MilestoneIndex,
milestone_id: MilestoneId,
receipt: &ReceiptPayload,
consumed_treasury: TreasuryOutput,
) -> Result<Migration, Error> {
let receipt = Receipt::new(receipt.clone(), milestone_index);
receipt.validate(&consumed_treasury)?;
let created_treasury = TreasuryOutput::new(
match receipt.inner().transaction() {
Payload::TreasuryTransaction(treasury) => match treasury.output() {
Output::Treasury(output) => output.clone(),
Output::SignatureLockedDustAllowance(_) | Output::SignatureLockedSingle(_) => {
return Err(Error::UnsupportedOutputKind(treasury.output().kind()));
}
},
Payload::Milestone(_) | Payload::Indexation(_) | Payload::Receipt(_) | Payload::Transaction(_) => {
return Err(Error::UnsupportedPayloadKind(receipt.inner().transaction().kind()));
}
},
milestone_id,
);
Ok(Migration::new(receipt, consumed_treasury, created_treasury))
}
async fn confirm<N: Node>(
tangle: &Tangle<N::Backend>,
storage: &N::Backend,
bus: &Bus<'static>,
message_id: MessageId,
ledger_index: &mut LedgerIndex,
receipt_migrated_at: &mut MilestoneIndex,
) -> Result<(), Error>
where
N::Backend: StorageBackend,
{
let message = tangle
.get(&message_id)
.await
.ok_or(Error::MilestoneMessageNotFound(message_id))?;
let milestone = match message.payload() {
Some(Payload::Milestone(milestone)) => milestone,
_ => return Err(Error::NoMilestonePayload),
};
if milestone.essence().index() != MilestoneIndex(**ledger_index + 1) {
return Err(Error::NonContiguousMilestones(
*milestone.essence().index(),
**ledger_index,
));
}
let mut metadata = WhiteFlagMetadata::new(milestone.essence().index());
white_flag(tangle, storage, message.parents(), &mut metadata).await?;
if !metadata.merkle_proof.eq(&milestone.essence().merkle_proof()) {
return Err(Error::MerkleProofMismatch(
milestone.essence().index(),
hex::encode(metadata.merkle_proof),
hex::encode(milestone.essence().merkle_proof()),
));
}
metadata.referenced_messages += 1;
metadata.excluded_no_transaction_messages.push(message_id);
let migration = if let Some(Payload::Receipt(receipt)) = milestone.essence().receipt() {
let milestone_id = milestone.id();
let transaction_id = TransactionId::new(milestone_id.as_ref().to_vec().try_into().unwrap());
for (index, fund) in receipt.funds().iter().enumerate() {
metadata.created_outputs.insert(
OutputId::new(transaction_id, index as u16)?,
CreatedOutput::new(message_id, Output::from(fund.output().clone())),
);
metadata
.balance_diffs
.amount_add(*fund.output().address(), fund.output().amount())?;
}
if receipt.migrated_at() < *receipt_migrated_at {
return Err(Error::DecreasingReceiptMigratedAtIndex(
receipt.migrated_at(),
*receipt_migrated_at,
));
} else {
*receipt_migrated_at = receipt.migrated_at();
}
if receipt.last() {
*receipt_migrated_at = *receipt_migrated_at + 1;
}
Some(
migration_from_milestone(
milestone.essence().index(),
milestone_id,
receipt,
storage::fetch_unspent_treasury_output(storage)?,
)
.await?,
)
} else {
None
};
storage::apply_milestone(
&*storage,
metadata.index,
&metadata.created_outputs,
&metadata.consumed_outputs,
&metadata.balance_diffs,
&migration,
)?;
*ledger_index = LedgerIndex(milestone.essence().index());
tangle.update_confirmed_milestone_index(milestone.essence().index());
for message_id in metadata.excluded_no_transaction_messages.iter() {
tangle
.update_metadata(message_id, |message_metadata| {
message_metadata.set_conflict(ConflictReason::None);
message_metadata.reference(milestone.essence().timestamp());
})
.await;
bus.dispatch(MessageReferenced {
message_id: *message_id,
});
}
for (message_id, conflict) in metadata.excluded_conflicting_messages.iter() {
tangle
.update_metadata(message_id, |message_metadata| {
message_metadata.set_conflict(*conflict);
message_metadata.reference(milestone.essence().timestamp());
})
.await;
bus.dispatch(MessageReferenced {
message_id: *message_id,
});
}
for message_id in metadata.included_messages.iter() {
tangle
.update_metadata(message_id, |message_metadata| {
message_metadata.set_conflict(ConflictReason::None);
message_metadata.reference(milestone.essence().timestamp());
})
.await;
bus.dispatch(MessageReferenced {
message_id: *message_id,
});
}
info!(
"Confirmed milestone {}: referenced {}, no transaction {}, conflicting {}, included {}, consumed {}, created {}, receipt {}.",
milestone.essence().index(),
metadata.referenced_messages,
metadata.excluded_no_transaction_messages.len(),
metadata.excluded_conflicting_messages.len(),
metadata.included_messages.len(),
metadata.consumed_outputs.len(),
metadata.created_outputs.len(),
milestone.essence().receipt().is_some()
);
bus.dispatch(MilestoneConfirmed {
message_id,
index: milestone.essence().index(),
timestamp: milestone.essence().timestamp(),
referenced_messages: metadata.referenced_messages,
excluded_no_transaction_messages: metadata.excluded_no_transaction_messages,
excluded_conflicting_messages: metadata.excluded_conflicting_messages,
included_messages: metadata.included_messages,
consumed_outputs: metadata.consumed_outputs.len(),
created_outputs: metadata.created_outputs.len(),
receipt: migration.is_some(),
});
for (output_id, created_output) in metadata.created_outputs {
bus.dispatch(OutputCreated {
message_id: *created_output.message_id(),
output_id,
output: created_output.inner().clone(),
});
}
for (output_id, (created_output, _consumed_output)) in metadata.consumed_outputs {
bus.dispatch(OutputConsumed {
message_id: *created_output.message_id(),
output_id,
output: created_output.inner().clone(),
});
}
Ok(())
}
#[async_trait]
impl<N: Node> Worker<N> for ConsensusWorker
where
N::Backend: StorageBackend,
{
type Config = (SnapshotConfig, PruningConfig);
type Error = Error;
fn dependencies() -> &'static [TypeId] {
vec![TypeId::of::<TangleWorker>(), TypeId::of::<SnapshotWorker>()].leak()
}
async fn start(node: &mut N, config: Self::Config) -> Result<Self, Self::Error> {
let (snapshot_config, pruning_config) = config;
let (tx, rx) = mpsc::unbounded_channel();
let tangle = node.resource::<Tangle<N::Backend>>();
let storage = node.storage();
let bus = node.bus();
validate_ledger_state(&*storage)?;
let bmd = tangle.config().below_max_depth();
let snapshot_depth_min = bmd + EXTRA_SNAPSHOT_DEPTH;
let snapshot_depth = if snapshot_config.depth() < snapshot_depth_min {
warn!(
"Configuration value for \"snapshot.depth\" is too low ({}), value changed to {}.",
snapshot_config.depth(),
snapshot_depth_min
);
snapshot_depth_min
} else {
snapshot_config.depth()
};
let snapshot_pruning_delta = bmd + EXTRA_PRUNING_DEPTH;
let pruning_delay_min = snapshot_depth + snapshot_pruning_delta;
let pruning_delay = if pruning_config.delay() < pruning_delay_min {
warn!(
"Configuration value for \"pruning.delay\" is too low ({}), value changed to {}.",
pruning_config.delay(),
pruning_delay_min
);
pruning_delay_min
} else {
pruning_config.delay()
};
let mut ledger_index = storage::fetch_ledger_index(&*storage)?.unwrap();
let mut receipt_migrated_at = MilestoneIndex(0);
node.spawn::<Self, _, _>(|shutdown| async move {
info!("Running.");
let mut receiver = ShutdownStream::new(shutdown, UnboundedReceiverStream::new(rx));
while let Some(event) = receiver.next().await {
match event {
ConsensusWorkerCommand::ConfirmMilestone(message_id) => {
if let Err(e) = confirm::<N>(
&tangle,
&storage,
&bus,
message_id,
&mut ledger_index,
&mut receipt_migrated_at,
)
.await
{
error!("Confirmation error on {}: {}.", message_id, e);
panic!("Aborting due to unexpected ledger error.");
}
if !tangle.is_confirmed() {
continue;
}
match should_snapshot(&tangle, ledger_index, snapshot_depth, &snapshot_config) {
Ok(()) => {
}
Err(reason) => {
debug!("Snapshotting skipped: {:?}", reason);
}
}
match should_prune(&tangle, ledger_index, pruning_delay, &pruning_config) {
Ok((start_index, target_index)) => {
if let Err(e) =
prune::prune(&tangle, &storage, &bus, start_index, target_index, &pruning_config)
.await
{
error!("Pruning failed: {:?}.", e);
}
}
Err(reason) => {
debug!("Pruning skipped: {:?}", reason);
}
}
}
ConsensusWorkerCommand::FetchBalance(address, sender) => {
if let Err(e) = sender.send((storage::fetch_balance(&*storage, &address), ledger_index)) {
error!("Error while sending balance: {:?}", e);
}
}
ConsensusWorkerCommand::FetchOutput(output_id, sender) => {
if let Err(e) = sender.send((storage::fetch_output(&*storage, &output_id), ledger_index)) {
error!("Error while sending output: {:?}", e);
}
}
ConsensusWorkerCommand::FetchOutputs(address, sender) => match address {
Address::Ed25519(address) => {
if let Err(e) = sender.send((
storage::fetch_outputs_for_ed25519_address(&*storage, &address),
ledger_index,
)) {
error!("Error while sending output: {:?}", e);
}
}
},
}
}
info!("Stopped.");
});
Ok(Self { tx })
}
}